SQLite4
Check-in [99b59dacbd]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment: Fix further bugs.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | rework-flow-control
Files: files | file ages | folders
SHA1: 99b59dacbd9cb7bea0d3205c8ecb8029b5e321db
User & Date: dan 2012-09-24 16:04:06
Context
2012-09-24
17:18
Fix a problem preventing log file space from being reclaimed. check-in: b9f122f4e3 user: dan tags: rework-flow-control
16:04
Fix further bugs. check-in: 99b59dacbd user: dan tags: rework-flow-control
10:55
Fix problems introduced in the previous checkin. check-in: 1cd7d6ca93 user: dan tags: rework-flow-control
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_tdb3.c.

622
623
624
625
626
627
628

629
630
631
632
633
634
635
...
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
    int eParam;
  } aParam[] = {
    { "write_buffer",     0, LSM_CONFIG_WRITE_BUFFER },
    { "page_size",        0, LSM_CONFIG_PAGE_SIZE },
    { "block_size",       0, LSM_CONFIG_BLOCK_SIZE },
    { "safety",           0, LSM_CONFIG_SAFETY },
    { "autowork",         0, LSM_CONFIG_AUTOWORK },

    { "log_size",         0, LSM_CONFIG_LOG_SIZE },
    { "mmap",             0, LSM_CONFIG_MMAP },
    { "use_log",          0, LSM_CONFIG_USE_LOG },
    { "nmerge",           0, LSM_CONFIG_NMERGE },
    { "max_freelist",     0, LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",       0, LSM_CONFIG_MULTIPLE_PROCESSES },
    { "worker_nmerge",    1, LSM_CONFIG_NMERGE },
................................................................................

/*
** Open a test database through testLsmOpen() using a fixed configuration
** string with small page, block, write-buffer and freelist settings.
**
** zFilename, bClear and ppDb are forwarded to testLsmOpen() untouched,
** and whatever testLsmOpen() returns is returned to the caller.
*/
int test_lsm_lomem_open(
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  int rc;                         /* Result of testLsmOpen() */
  const char *zSettings;          /* Space-separated "name=value" pairs */

  zSettings = "page_size=256 block_size=65536 write_buffer=16384 max_freelist=4";
  rc = testLsmOpen(zSettings, zFilename, bClear, ppDb);
  return rc;
}

lsm_db *tdb_lsm(TestDb *pDb){
  if( pDb->pMethods->xClose==test_lsm_close ){
    return ((LsmDb *)pDb)->db;
  }







>







 







|







622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
...
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
    int eParam;
  } aParam[] = {
    { "write_buffer",     0, LSM_CONFIG_WRITE_BUFFER },
    { "page_size",        0, LSM_CONFIG_PAGE_SIZE },
    { "block_size",       0, LSM_CONFIG_BLOCK_SIZE },
    { "safety",           0, LSM_CONFIG_SAFETY },
    { "autowork",         0, LSM_CONFIG_AUTOWORK },
    { "autocheckpoint",   0, LSM_CONFIG_AUTOCHECKPOINT },
    { "log_size",         0, LSM_CONFIG_LOG_SIZE },
    { "mmap",             0, LSM_CONFIG_MMAP },
    { "use_log",          0, LSM_CONFIG_USE_LOG },
    { "nmerge",           0, LSM_CONFIG_NMERGE },
    { "max_freelist",     0, LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",       0, LSM_CONFIG_MULTIPLE_PROCESSES },
    { "worker_nmerge",    1, LSM_CONFIG_NMERGE },
................................................................................

/*
** Open a test database through testLsmOpen() using a fixed configuration
** string with small page, block, write-buffer, freelist and
** autocheckpoint settings.
**
** zFilename, bClear and ppDb are forwarded to testLsmOpen() untouched,
** and whatever testLsmOpen() returns is returned to the caller.
*/
int test_lsm_lomem_open(
  const char *zFilename,
  int bClear,
  TestDb **ppDb
){
  /* Adjacent literals are concatenated by the compiler into the single
  ** space-separated string that testLsmOpen() receives; the split only
  ** keeps each source line within 80 columns. */
  const char *zSettings =
    "page_size=256 block_size=65536 write_buffer=16384 "
    "max_freelist=4 autocheckpoint=32768";

  return testLsmOpen(zSettings, zFilename, bClear, ppDb);
}

lsm_db *tdb_lsm(TestDb *pDb){
  if( pDb->pMethods->xClose==test_lsm_close ){
    return ((LsmDb *)pDb)->db;
  }

Changes to src/lsm_ckpt.c.

426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, nOvfl, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);







|







426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, (nOvfl?nOvfl:pSnap->nFreelistOvfl), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);

Changes to src/lsm_shared.c.

628
629
630
631
632
633
634



635
636
637
638
639
640
641
** been flushed to disk. The significance of this is that once the snapshot
** created to hold the updated state of the database is synced to disk, log
** file space can be recycled.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){
  /* If no error has occurred, serialize the worker snapshot and write
  ** it to shared memory.  */



  if( *pRc==LSM_OK ){
    *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl);
  }

  if( pDb->pWorker ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;







>
>
>







628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
** been flushed to disk. The significance of this is that once the snapshot
** created to hold the updated state of the database is synced to disk, log
** file space can be recycled.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){
  /* If no error has occurred, serialize the worker snapshot and write
  ** it to shared memory.  */

  assert( pDb->pWorker );
  assert( pDb->pWorker->nFreelistOvfl==0 || nOvfl==0 );
  if( *pRc==LSM_OK ){
    *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl);
  }

  if( pDb->pWorker ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;

Changes to src/lsm_sorted.c.

4144
4145
4146
4147
4148
4149
4150

4151
4152
4153
4154
4155
4156
4157
....
4190
4191
4192
4193
4194
4195
4196
4197
4198
4199
4200
4201
4202
4203
4204
....
4232
4233
4234
4235
4236
4237
4238
4239
4240
4241
4242
4243
4244
4245
4246
4247
4248
4249
4250
4251
4252
4253
4254
4255
4256

4257
4258
4259
4260
4261





4262
4263
4264
4265
4266
4267
4268
....
4273
4274
4275
4276
4277
4278
4279

4280
4281
4282
4283
4284
4285
4286
        rc = sortedNewToplevel(pDb, 1, &nOvfl, &nPg);
        nRem -= nPg;
        if( rc==LSM_OK && pDb->nTransOpen>0 ){
          lsmTreeDiscardOld(pDb);
        }
        bFlush = 1;
        bToplevel = 0;

      }
    }
  }

  /* If nPage is still greater than zero, do some merging. */
  if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
    int nPg = 0;
................................................................................
  int rc;
  int nWrite = 0;
  int bCkpt = 0;

  do {
    int nThis = 0;
    bCkpt = 0;
    rc = doLsmSingleWork(pDb, 0, flags, nPage, &nThis, &bCkpt);
    nWrite += nThis;
    if( rc==LSM_OK && bCkpt ){
      rc = lsm_checkpoint(pDb, 0);
    }
  }while( rc==LSM_OK && (nWrite<nPage && bCkpt) );

  if( pnWrite ){
................................................................................
*/
int lsmSortedAutoWork(
  lsm_db *pDb,                    /* Database handle */
  int nUnit                       /* Pages of data written to in-memory tree */
){
  int rc;                         /* Return code */
  int nRemaining;                 /* Units of work to do before returning */
  int nDepth;                     /* Current height of tree (longest path) */
  int nWrite;                     /* Pages written */
  Level *pLevel;                  /* Used to iterate through levels */
  int bRestore = 0;

  assert( pDb->pWorker==0 );
  assert( pDb->nTransOpen>0 );

  /* Determine how many units of work to do before returning. One unit of
  ** work is achieved by writing one page (~4KB) of merged data.  */
  nRemaining = nDepth = 0;
  for(pLevel=lsmDbSnapshotLevel(pDb->pClient); pLevel; pLevel=pLevel->pNext){
    /* nDepth += LSM_MAX(1, pLevel->nRight); */
    nDepth += 1;
  }
  nRemaining = nUnit * nDepth;

  if( lsmTreeHasOld(pDb) ){

    bRestore = 1;
    rc = lsmSaveCursors(pDb);
    if( rc!=LSM_OK ) return rc;
  }






  rc = doLsmWork(pDb, LSM_WORK_FLUSH, nRemaining, 0);
  if( rc==LSM_BUSY ) rc = LSM_OK;

  if( bRestore && pDb->pCsr ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
    pDb->pClient = 0;
    rc = lsmCheckpointLoad(pDb, 0);
................................................................................
      rc = lsmRestoreCursors(pDb);
    }
  }

#if 0
  lsmLogMessage(pDb, 0, "auto-work: %d pages", nWrite);
#endif

  return rc;
}

int lsmFlushTreeToDisk(lsm_db *pDb){
  int rc;
  rc = doLsmSingleWork(pDb, 1, LSM_WORK_FLUSH, (1<<30), 0, 0);
  if( rc==LSM_OK ){







>







 







|







 







|









|




<
<

>





>
>
>
>
>







 







>







4144
4145
4146
4147
4148
4149
4150
4151
4152
4153
4154
4155
4156
4157
4158
....
4191
4192
4193
4194
4195
4196
4197
4198
4199
4200
4201
4202
4203
4204
4205
....
4233
4234
4235
4236
4237
4238
4239
4240
4241
4242
4243
4244
4245
4246
4247
4248
4249
4250
4251
4252
4253
4254


4255
4256
4257
4258
4259
4260
4261
4262
4263
4264
4265
4266
4267
4268
4269
4270
4271
4272
4273
....
4278
4279
4280
4281
4282
4283
4284
4285
4286
4287
4288
4289
4290
4291
4292
        rc = sortedNewToplevel(pDb, 1, &nOvfl, &nPg);
        nRem -= nPg;
        if( rc==LSM_OK && pDb->nTransOpen>0 ){
          lsmTreeDiscardOld(pDb);
        }
        bFlush = 1;
        bToplevel = 0;
        nOvfl = 0;
      }
    }
  }

  /* If nPage is still greater than zero, do some merging. */
  if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
    int nPg = 0;
................................................................................
  int rc;
  int nWrite = 0;
  int bCkpt = 0;

  do {
    int nThis = 0;
    bCkpt = 0;
    rc = doLsmSingleWork(pDb, 0, flags, nPage-nWrite, &nThis, &bCkpt);
    nWrite += nThis;
    if( rc==LSM_OK && bCkpt ){
      rc = lsm_checkpoint(pDb, 0);
    }
  }while( rc==LSM_OK && (nWrite<nPage && bCkpt) );

  if( pnWrite ){
................................................................................
*/
int lsmSortedAutoWork(
  lsm_db *pDb,                    /* Database handle */
  int nUnit                       /* Pages of data written to in-memory tree */
){
  int rc;                         /* Return code */
  int nRemaining;                 /* Units of work to do before returning */
  int nDepth = 0;                 /* Current height of tree (longest path) */
  int nWrite;                     /* Pages written */
  Level *pLevel;                  /* Used to iterate through levels */
  int bRestore = 0;

  assert( pDb->pWorker==0 );
  assert( pDb->nTransOpen>0 );

  /* Determine how many units of work to do before returning. One unit of
  ** work is achieved by writing one page (~4KB) of merged data.  */
  nRemaining = 0;
  for(pLevel=lsmDbSnapshotLevel(pDb->pClient); pLevel; pLevel=pLevel->pNext){
    /* nDepth += LSM_MAX(1, pLevel->nRight); */
    nDepth += 1;
  }


  if( lsmTreeHasOld(pDb) ){
    nDepth += 1;
    bRestore = 1;
    rc = lsmSaveCursors(pDb);
    if( rc!=LSM_OK ) return rc;
  }

  nRemaining = nUnit * nDepth;
#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, rc, "lsmSortedAutoWork(): %d*%d = %d pages", 
      nUnit, nDepth, nRemaining);
#endif
  rc = doLsmWork(pDb, LSM_WORK_FLUSH, nRemaining, 0);
  if( rc==LSM_BUSY ) rc = LSM_OK;

  if( bRestore && pDb->pCsr ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
    pDb->pClient = 0;
    rc = lsmCheckpointLoad(pDb, 0);
................................................................................
      rc = lsmRestoreCursors(pDb);
    }
  }

#if 0
  lsmLogMessage(pDb, 0, "auto-work: %d pages", nWrite);
#endif

  return rc;
}

int lsmFlushTreeToDisk(lsm_db *pDb){
  int rc;
  rc = doLsmSingleWork(pDb, 1, LSM_WORK_FLUSH, (1<<30), 0, 0);
  if( rc==LSM_OK ){