Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Remove the LSM_WORK_OPTIMIZE flag. Add free-list management related tests and fixes.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 91912a39ca5e4b3334feb5ce0fc2c67265975846
User & Date: dan 2012-11-07 20:08:06.987
Context
2012-11-08
11:59
Set a flag on levels that consist entirely of freelist entries. Use this flag to avoid counter-productive merges during database optimization. check-in: 48bd83a17a user: dan tags: trunk
2012-11-07
20:08
Remove the LSM_WORK_OPTIMIZE flag. Add free-list management related tests and fixes. check-in: 91912a39ca user: dan tags: trunk
2012-11-06
19:14
Fix lsmview.tcl so that it can view databases compressed with zlib. check-in: 7268cf7535 user: dan tags: trunk
Changes
Unified Diff Show Whitespace Changes Patch
Changes to lsm-test/lsmtest_func.c.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

#include "lsmtest.h"


int do_work(int nArg, char **azArg){
  struct Option {
    const char *zName;
  } aOpt [] = {
    { "-optimize" },
    { "-npage" },
    { 0 }
  };

  lsm_db *pDb;
  int rc;
  int i;
  const char *zDb;
  int flags = 0;
  int nWork = (1<<30);

  if( nArg==0 ) goto usage;
  zDb = azArg[nArg-1];
  for(i=0; i<(nArg-1); i++){
    int iSel;
    rc = testArgSelect(aOpt, "option", azArg[i], &iSel);
    if( rc ) return rc;
    switch( iSel ){
      case 0:
        flags |= LSM_WORK_OPTIMIZE;



        break;
      case 1:
        i++;
        if( i==(nArg-1) ) goto usage;
        nWork = atoi(azArg[i]);
        break;
    }
  }

  rc = lsm_new(0, &pDb);
  if( rc!=LSM_OK ){
    testPrintError("lsm_open(): rc=%d\n", rc);
  }else{
    rc = lsm_open(pDb, zDb);
    if( rc!=LSM_OK ){
      testPrintError("lsm_open(): rc=%d\n", rc);
    }else{
      rc = lsm_work(pDb, flags, nWork, 0);
      if( rc!=LSM_OK ){
        testPrintError("lsm_work(): rc=%d\n", rc);
      }
    }
  }
  if( rc==LSM_OK ){
    rc = lsm_checkpoint(pDb, 0);








|








|










<
>
>
>

















|







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

#include "lsmtest.h"


int do_work(int nArg, char **azArg){
  struct Option {
    const char *zName;
  } aOpt [] = {
    { "-nmerge" },
    { "-npage" },
    { 0 }
  };

  lsm_db *pDb;
  int rc;
  int i;
  const char *zDb;
  int nMerge = 1;
  int nWork = (1<<30);

  if( nArg==0 ) goto usage;
  zDb = azArg[nArg-1];
  for(i=0; i<(nArg-1); i++){
    int iSel;
    rc = testArgSelect(aOpt, "option", azArg[i], &iSel);
    if( rc ) return rc;
    switch( iSel ){
      case 0:

        i++;
        if( i==(nArg-1) ) goto usage;
        nMerge = atoi(azArg[i]);
        break;
      case 1:
        i++;
        if( i==(nArg-1) ) goto usage;
        nWork = atoi(azArg[i]);
        break;
    }
  }

  rc = lsm_new(0, &pDb);
  if( rc!=LSM_OK ){
    testPrintError("lsm_open(): rc=%d\n", rc);
  }else{
    rc = lsm_open(pDb, zDb);
    if( rc!=LSM_OK ){
      testPrintError("lsm_open(): rc=%d\n", rc);
    }else{
      rc = lsm_work(pDb, nMerge, nWork, 0);
      if( rc!=LSM_OK ){
        testPrintError("lsm_work(): rc=%d\n", rc);
      }
    }
  }
  if( rc==LSM_OK ){
    rc = lsm_checkpoint(pDb, 0);
Changes to src/kvlsm.c.
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
      lsm_flush(p->pDb);
      break;
    }

    case SQLITE4_KVCTRL_LSM_MERGE: {
      int nPage = *(int*)pArg;
      int nWrite = 0;
      lsm_work(p->pDb, LSM_WORK_OPTIMIZE, nPage, &nWrite);
      *(int*)pArg = nWrite;
      break;
    }

    case SQLITE4_KVCTRL_LSM_CHECKPOINT: {
      lsm_checkpoint(p->pDb, 0);
      break;







|







387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
      lsm_flush(p->pDb);
      break;
    }

    case SQLITE4_KVCTRL_LSM_MERGE: {
      int nPage = *(int*)pArg;
      int nWrite = 0;
      lsm_work(p->pDb, 0, nPage, &nWrite);
      *(int*)pArg = nWrite;
      break;
    }

    case SQLITE4_KVCTRL_LSM_CHECKPOINT: {
      lsm_checkpoint(p->pDb, 0);
      break;
Changes to src/lsm.h.
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
**   Describe the race condition this function is subject to. Or remove
**   it somehow.
*/
int lsm_ckpt_size(lsm_db *, int *pnByte);

/*
** This function is called by a thread to work on the database structure.
** The actual operations performed by this function depend on the value 
** passed as the "flags" parameter:
**
** LSM_WORK_OPTIMIZE:
**   If nMerge suitable arrays cannot be found, where nMerge is as 
**   configured by LSM_CONFIG_NMERGE, merge together any arrays that
**   can be found. This is usually used to optimize the database by 
**   merging the whole thing into one big array.
*/
int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite);

#define LSM_WORK_OPTIMIZE        0x00000002

int lsm_flush(lsm_db *pDb);

/*
** Attempt to checkpoint the current database snapshot. Return an LSM
** error code if an error occurs or LSM_OK otherwise.
**







<
<
<
<
<
<
<
<

|
<
<







469
470
471
472
473
474
475








476
477


478
479
480
481
482
483
484
**   Describe the race condition this function is subject to. Or remove
**   it somehow.
*/
int lsm_ckpt_size(lsm_db *, int *pnByte);

/*
** This function is called by a thread to work on the database structure.








*/
int lsm_work(lsm_db *pDb, int nMerge, int nPage, int *pnWrite);



int lsm_flush(lsm_db *pDb);

/*
** Attempt to checkpoint the current database snapshot. Return an LSM
** error code if an error occurs or LSM_OK otherwise.
**
Changes to src/lsmInt.h.
694
695
696
697
698
699
700

701
702
703
704
705
706
707
*/
int lsmInfoPageDump(lsm_db *, Pgno, int, char **);
void lsmSortedCleanup(lsm_db *);
int lsmSortedAutoWork(lsm_db *, int nUnit);

int lsmSortedWalkFreelist(lsm_db *, int (*)(void *, int, i64), void *);



int lsmFlushTreeToDisk(lsm_db *pDb);

void lsmSortedRemap(lsm_db *pDb);

void lsmSortedFreeLevel(lsm_env *pEnv, Level *);








>







694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
*/
int lsmInfoPageDump(lsm_db *, Pgno, int, char **);
void lsmSortedCleanup(lsm_db *);
int lsmSortedAutoWork(lsm_db *, int nUnit);

int lsmSortedWalkFreelist(lsm_db *, int (*)(void *, int, i64), void *);

int lsmSaveWorker(lsm_db *, int);

int lsmFlushTreeToDisk(lsm_db *pDb);

void lsmSortedRemap(lsm_db *pDb);

void lsmSortedFreeLevel(lsm_env *pEnv, Level *);

Changes to src/lsm_ckpt.c.
355
356
357
358
359
360
361

362
363
364
365
366
367
368
    ckptSetValue(p, iOut++, pDb->treehdr.oldcksum1, pRc);
  }else{
    for(; iOut<=CKPT_HDR_LO_CKSUM2; iOut++){
      ckptSetValue(p, iOut, pDb->pShmhdr->aSnap2[iOut], pRc);
    }
  }


  *piOut = iOut;
}

static void ckptExportAppendlist(
  lsm_db *db,                     /* Database connection */
  CkptBuffer *p,                  /* Checkpoint buffer to write to */
  int *piOut,                     /* IN/OUT: Offset within checkpoint buffer */







>







355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
    ckptSetValue(p, iOut++, pDb->treehdr.oldcksum1, pRc);
  }else{
    for(; iOut<=CKPT_HDR_LO_CKSUM2; iOut++){
      ckptSetValue(p, iOut, pDb->pShmhdr->aSnap2[iOut], pRc);
    }
  }

  assert( *pRc || iOut==CKPT_HDR_LO_CKSUM2+1 );
  *piOut = iOut;
}

static void ckptExportAppendlist(
  lsm_db *db,                     /* Database connection */
  CkptBuffer *p,                  /* Checkpoint buffer to write to */
  int *piOut,                     /* IN/OUT: Offset within checkpoint buffer */
412
413
414
415
416
417
418

419
420
421
422
423
424
425
  iLevel = 0;
  for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nLevel; pLevel=pLevel->pNext){
    ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
    iLevel++;
  }

  /* Write the freelist */

  if( rc==LSM_OK ){
    int nFree = pSnap->freelist.nEntry;
    ckptSetValue(&ckpt, iOut++, nFree, &rc);
    for(i=0; i<nFree; i++){
      FreelistEntry *p = &pSnap->freelist.aEntry[i];
      ckptSetValue(&ckpt, iOut++, p->iBlk, &rc);
      ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc);







>







413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
  iLevel = 0;
  for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nLevel; pLevel=pLevel->pNext){
    ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
    iLevel++;
  }

  /* Write the freelist */
  assert( pSnap->freelist.nEntry<=pDb->nMaxFreelist );
  if( rc==LSM_OK ){
    int nFree = pSnap->freelist.nEntry;
    ckptSetValue(&ckpt, iOut++, nFree, &rc);
    for(i=0; i<nFree; i++){
      FreelistEntry *p = &pSnap->freelist.aEntry[i];
      ckptSetValue(&ckpt, iOut++, p->iBlk, &rc);
      ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc);
447
448
449
450
451
452
453







454
455
456
457
458
459
460
  iOut += 2;
  assert( iOut<=1024 );

#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, rc, 
      "ckptExportSnapshot(): id=%lld freelist: %d", iId, pSnap->freelist.nEntry
  );







#endif

  *ppCkpt = (void *)ckpt.aCkpt;
  if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut;
  return rc;
}








>
>
>
>
>
>
>







449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
  iOut += 2;
  assert( iOut<=1024 );

#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, rc, 
      "ckptExportSnapshot(): id=%lld freelist: %d", iId, pSnap->freelist.nEntry
  );
  for(i=0; i<pSnap->freelist.nEntry; i++){
  lsmLogMessage(pDb, rc, 
      "ckptExportSnapshot(): iBlk=%d id=%lld", 
      pSnap->freelist.aEntry[i].iBlk,
      pSnap->freelist.aEntry[i].iId
  );
  }
#endif

  *ppCkpt = (void *)ckpt.aCkpt;
  if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut;
  return rc;
}

Changes to src/lsm_main.c.
52
53
54
55
56
57
58







59
60
61
62
63
64
65
*/
static int xCmp(void *p1, int n1, void *p2, int n2){
  int res;
  res = memcmp(p1, p2, LSM_MIN(n1, n2));
  if( res==0 ) res = (n1-n2);
  return res;
}








/*
** Allocate a new db handle.
*/
int lsm_new(lsm_env *pEnv, lsm_db **ppDb){
  lsm_db *pDb;








>
>
>
>
>
>
>







52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
*/
static int xCmp(void *p1, int n1, void *p2, int n2){
  int res;
  res = memcmp(p1, p2, LSM_MIN(n1, n2));
  if( res==0 ) res = (n1-n2);
  return res;
}

static void xLog(void *pCtx, int rc, const char *z){
  (void)(rc);
  (void)(pCtx);
  fprintf(stderr, "%s\n", z);
  fflush(stderr);
}

/*
** Allocate a new db handle.
*/
int lsm_new(lsm_env *pEnv, lsm_db **ppDb){
  lsm_db *pDb;

83
84
85
86
87
88
89

90
91
92
93
94
95
96
  pDb->nDfltBlksz = LSM_DFLT_BLOCK_SIZE;
  pDb->nMerge = LSM_DFLT_NMERGE;
  pDb->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES;
  pDb->bUseLog = 1;
  pDb->iReader = -1;
  pDb->bMultiProc = 1;
  pDb->bMmap = LSM_IS_64_BIT;

  return LSM_OK;
}

lsm_env *lsm_get_env(lsm_db *pDb){
  assert( pDb->pEnv );
  return pDb->pEnv;
}







>







90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
  pDb->nDfltBlksz = LSM_DFLT_BLOCK_SIZE;
  pDb->nMerge = LSM_DFLT_NMERGE;
  pDb->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES;
  pDb->bUseLog = 1;
  pDb->iReader = -1;
  pDb->bMultiProc = 1;
  pDb->bMmap = LSM_IS_64_BIT;
  pDb->xLog = xLog;
  return LSM_OK;
}

lsm_env *lsm_get_env(lsm_db *pDb){
  assert( pDb->pEnv );
  return pDb->pEnv;
}
406
407
408
409
410
411
412






413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429

430
431
432
433
434
435
436
437
438
439
440
441
442
  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;

  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);
  *pzOut = s.z;
  return rc;
}







int lsmInfoFreelist(lsm_db *pDb, char **pzOut){
  Snapshot *pWorker;              /* Worker snapshot */
  int bUnlock = 0;
  LsmString s;
  int i;
  int rc;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  lsmStringInit(&s, pDb->pEnv);
  lsmStringAppendf(&s, "%d", pWorker->freelist.nEntry);
  for(i=0; i<pWorker->freelist.nEntry; i++){
    FreelistEntry *p = &pWorker->freelist.aEntry[i];
    lsmStringAppendf(&s, " {%d %d}", p->iBlk, (int)p->iId);

  }
  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;

  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);
  *pzOut = s.z;
  return rc;
}

int lsm_info(lsm_db *pDb, int eParam, ...){
  int rc = LSM_OK;
  va_list ap;
  va_start(ap, eParam);







>
>
>
>
>
>













|
|
|
|
>

<



<







414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445

446
447
448

449
450
451
452
453
454
455
  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;

  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);
  *pzOut = s.z;
  return rc;
}

static int infoFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  LsmString *pStr = (LsmString *)pCtx;
  lsmStringAppendf(pStr, "%s{%d %lld}", (pStr->n?" ":""), iBlk, iSnapshot);
  return 0;
}

int lsmInfoFreelist(lsm_db *pDb, char **pzOut){
  Snapshot *pWorker;              /* Worker snapshot */
  int bUnlock = 0;
  LsmString s;
  int i;
  int rc;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  lsmStringInit(&s, pDb->pEnv);
  rc = lsmWalkFreelist(pDb, infoFreelistCb, &s);
  if( rc!=LSM_OK ){
    lsmFree(pDb->pEnv, s.z);
  }else{
    *pzOut = s.z;
  }


  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);

  return rc;
}

int lsm_info(lsm_db *pDb, int eParam, ...){
  int rc = LSM_OK;
  va_list ap;
  va_start(ap, eParam);
Changes to src/lsm_shared.c.
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){
  assert( *pRc!=0 || pDb->pWorker );
  if( pDb->pWorker ){
    /* If no error has occurred, serialize the worker snapshot and write
    ** it to shared memory.  */
    if( *pRc==LSM_OK ){
      *pRc = lsmCheckpointSaveWorker(pDb, bFlush);
    }
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
}







|







734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){
  assert( *pRc!=0 || pDb->pWorker );
  if( pDb->pWorker ){
    /* If no error has occurred, serialize the worker snapshot and write
    ** it to shared memory.  */
    if( *pRc==LSM_OK ){
      *pRc = lsmSaveWorker(pDb, bFlush);
    }
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
}
Changes to src/lsm_sorted.c.
4044
4045
4046
4047
4048
4049
4050


4051

4052
4053
4054
4055
4056
4057
4058
  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    pCsr->pDb = pDb;
    rc = multiCursorVisitFreelist(pCsr);
    if( rc==LSM_OK ){
      rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
    }


    if( rc==LSM_OK && pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){

      pDel = &pNext->lhs;
      rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);
    }

    if( pNext==0 ){
      multiCursorIgnoreDelete(pCsr);
    }







>
>
|
>







4044
4045
4046
4047
4048
4049
4050
4051
4052
4053
4054
4055
4056
4057
4058
4059
4060
4061
  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    pCsr->pDb = pDb;
    rc = multiCursorVisitFreelist(pCsr);
    if( rc==LSM_OK ){
      rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
    }
    if( rc==LSM_OK 
        && eTree!=TREE_NONE
        && pNext && pNext->pMerge==0 && pNext->lhs.iRoot 
    ){
      pDel = &pNext->lhs;
      rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);
    }

    if( pNext==0 ){
      multiCursorIgnoreDelete(pCsr);
    }
4340
4341
4342
4343
4344
4345
4346
4347
4348
4349
4350
4351
4352
4353
4354
4355



4356
4357
4358
4359
4360
4361
4362
  do {
    nRet++;
    p = p->pNext;
  }while( p && p->iAge==iAge );
  return nRet;
}

static int sortedSelectLevel(lsm_db *pDb, int bOpt, Level **ppOut){
  Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
  int rc = LSM_OK;
  Level *pLevel = 0;            /* Output value */
  Level *pBest = 0;             /* Best level to work on found so far */
  int nBest = pDb->nMerge-1;    /* Number of segments merged at pBest */
  Level *pThis = 0;             /* First in run of levels with age=iAge */
  int nThis = 0;                /* Number of levels starting at pThis */




  /* Find the longest contiguous run of levels not currently undergoing a 
  ** merge with the same age in the structure. Or the level being merged
  ** with the largest number of right-hand segments. Work on it. */
  for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
    if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
      nThis++;
    }else{







|




|



>
>
>







4343
4344
4345
4346
4347
4348
4349
4350
4351
4352
4353
4354
4355
4356
4357
4358
4359
4360
4361
4362
4363
4364
4365
4366
4367
4368
  do {
    nRet++;
    p = p->pNext;
  }while( p && p->iAge==iAge );
  return nRet;
}

static int sortedSelectLevel(lsm_db *pDb, int nMerge, Level **ppOut){
  Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
  int rc = LSM_OK;
  Level *pLevel = 0;            /* Output value */
  Level *pBest = 0;             /* Best level to work on found so far */
  int nBest;                    /* Number of segments merged at pBest */
  Level *pThis = 0;             /* First in run of levels with age=iAge */
  int nThis = 0;                /* Number of levels starting at pThis */

  assert( nMerge>=1 );
  nBest = LSM_MAX(1, nMerge-1);

  /* Find the longest contiguous run of levels not currently undergoing a 
  ** merge with the same age in the structure. Or the level being merged
  ** with the largest number of right-hand segments. Work on it. */
  for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
    if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
      nThis++;
    }else{
4383
4384
4385
4386
4387
4388
4389
4390
4391
4392
4393
4394
4395
4396
4397
  }
  if( nThis>nBest ){
    assert( pThis );
    pBest = pThis;
    nBest = nThis;
  }

  if( pBest==0 && bOpt && pTopLevel->pNext ){
    pBest = pTopLevel;
    nBest = 2;
  }

  if( pBest ){
    if( pBest->nRight==0 ){
      rc = sortedMergeSetup(pDb, pBest, nBest, ppOut);







|







4389
4390
4391
4392
4393
4394
4395
4396
4397
4398
4399
4400
4401
4402
4403
  }
  if( nThis>nBest ){
    assert( pThis );
    pBest = pThis;
    nBest = nThis;
  }

  if( pBest==0 && nMerge==1 && pTopLevel && pTopLevel->pNext ){
    pBest = pTopLevel;
    nBest = 2;
  }

  if( pBest ){
    if( pBest->nRight==0 ){
      rc = sortedMergeSetup(pDb, pBest, nBest, ppOut);
4414
4415
4416
4417
4418
4419
4420
4421
4422
4423
4424
4425
4426
4427
4428
4429
4430
4431
4432
4433
4434
4435
4436
4437
4438
4439
4440
4441
4442
4443
  }
  return 0;
}

static int sortedWork(
  lsm_db *pDb,                    /* Database handle. Must be worker. */
  int nWork,                      /* Number of pages of work to do */
  int bOptimize,                  /* True to merge less than nMerge levels */
  int bFlush,                     /* Set if call is to make room for a flush */
  int *pnWrite                    /* OUT: Actual number of pages written */
){
  int rc = LSM_OK;                /* Return Code */
  int nRemaining = nWork;         /* Units of work to do before returning */
  Snapshot *pWorker = pDb->pWorker;

  assert( pWorker );
  if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;

  while( nRemaining>0 ){
    Level *pLevel = 0;

    /* Find a level to work on. */
    rc = sortedSelectLevel(pDb, bOptimize, &pLevel);
    assert( rc==LSM_OK || pLevel==0 );

    if( pLevel==0 ){
      /* Could not find any work to do. Finished. */
      break;
    }else{
      MergeWorker mergeworker;    /* State used to work on the level merge */







|














|







4420
4421
4422
4423
4424
4425
4426
4427
4428
4429
4430
4431
4432
4433
4434
4435
4436
4437
4438
4439
4440
4441
4442
4443
4444
4445
4446
4447
4448
4449
  }
  return 0;
}

static int sortedWork(
  lsm_db *pDb,                    /* Database handle. Must be worker. */
  int nWork,                      /* Number of pages of work to do */
  int nMerge,                     /* Try to merge this many levels at once */
  int bFlush,                     /* Set if call is to make room for a flush */
  int *pnWrite                    /* OUT: Actual number of pages written */
){
  int rc = LSM_OK;                /* Return Code */
  int nRemaining = nWork;         /* Units of work to do before returning */
  Snapshot *pWorker = pDb->pWorker;

  assert( pWorker );
  if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;

  while( nRemaining>0 ){
    Level *pLevel = 0;

    /* Find a level to work on. */
    rc = sortedSelectLevel(pDb, nMerge, &pLevel);
    assert( rc==LSM_OK || pLevel==0 );

    if( pLevel==0 ){
      /* Could not find any work to do. Finished. */
      break;
    }else{
      MergeWorker mergeworker;    /* State used to work on the level merge */
4475
4476
4477
4478
4479
4480
4481

4482
4483
4484
4485
4486
4487
4488
4489
4490
4491



4492
4493
4494
4495
4496
4497
4498
          ** database snapshot. This can only happen if all input keys were
          ** annihilated. Since keys are only annihilated if the new level
          ** is the last in the linked list (contains the most ancient of
          ** database content), this guarantees that pLevel->pNext==0.  */ 

          Level *pTop;          /* Top level of worker snapshot */
          Level **pp;           /* Read/write iterator for Level.pNext list */

          assert( pLevel->pNext==0 );

          /* Remove the level from the worker snapshot. */
          pTop = lsmDbSnapshotLevel(pWorker);
          for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext));
          *pp = pLevel->pNext;
          lsmDbSnapshotSetLevel(pWorker, pTop);

          /* Free the Level structure. */
          lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->lhs);



          sortedFreeLevel(pDb->pEnv, pLevel);
        }else{
          int i;

          /* Free the separators of the next level, if required. */
          if( pLevel->pMerge->nInput > pLevel->nRight ){
            assert( pLevel->pNext->lhs.iRoot );







>










>
>
>







4481
4482
4483
4484
4485
4486
4487
4488
4489
4490
4491
4492
4493
4494
4495
4496
4497
4498
4499
4500
4501
4502
4503
4504
4505
4506
4507
4508
          ** database snapshot. This can only happen if all input keys were
          ** annihilated. Since keys are only annihilated if the new level
          ** is the last in the linked list (contains the most ancient of
          ** database content), this guarantees that pLevel->pNext==0.  */ 

          Level *pTop;          /* Top level of worker snapshot */
          Level **pp;           /* Read/write iterator for Level.pNext list */
          int i;
          assert( pLevel->pNext==0 );

          /* Remove the level from the worker snapshot. */
          pTop = lsmDbSnapshotLevel(pWorker);
          for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext));
          *pp = pLevel->pNext;
          lsmDbSnapshotSetLevel(pWorker, pTop);

          /* Free the Level structure. */
          lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->lhs);
          for(i=0; i<pLevel->nRight; i++){
            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]);
          }
          sortedFreeLevel(pDb->pEnv, pLevel);
        }else{
          int i;

          /* Free the separators of the next level, if required. */
          if( pLevel->pMerge->nInput > pLevel->nRight ){
            assert( pLevel->pNext->lhs.iRoot );
4567
4568
4569
4570
4571
4572
4573









4574
4575
4576
4577
4578
4579
4580
4581
4582
4583
4584
4585
      bRet = 0;
    }
    *pRc = rc;
  }
  assert( *pRc==LSM_OK || bRet==0 );
  return bRet;
}










static int doLsmSingleWork(
  lsm_db *pDb, 
  int bShutdown,
  int flags, 
  int nPage,                      /* Number of pages to write to disk */
  int *pnWrite,                   /* OUT: Pages actually written to disk */
  int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
){
  int rc = LSM_OK;                /* Return code */
  int bDirty = 0;
  int nMax = nPage;               /* Maximum pages to write to disk */







>
>
>
>
>
>
>
>
>




|







4577
4578
4579
4580
4581
4582
4583
4584
4585
4586
4587
4588
4589
4590
4591
4592
4593
4594
4595
4596
4597
4598
4599
4600
4601
4602
4603
4604
      bRet = 0;
    }
    *pRc = rc;
  }
  assert( *pRc==LSM_OK || bRet==0 );
  return bRet;
}

int lsmSaveWorker(lsm_db *pDb, int bFlush){
  Snapshot *p = pDb->pWorker;
  if( p->freelist.nEntry>pDb->nMaxFreelist ){
    int rc = sortedNewToplevel(pDb, TREE_NONE, 0);
    if( rc!=LSM_OK ) return rc;
  }
  return lsmCheckpointSaveWorker(pDb, bFlush);
}

static int doLsmSingleWork(
  lsm_db *pDb, 
  int bShutdown,
  int nMerge,                     /* Minimum segments to merge together */
  int nPage,                      /* Number of pages to write to disk */
  int *pnWrite,                   /* OUT: Pages actually written to disk */
  int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
){
  int rc = LSM_OK;                /* Return code */
  int bDirty = 0;
  int nMax = nPage;               /* Maximum pages to write to disk */
4595
4596
4597
4598
4599
4600
4601
4602
4603
4604
4605
4606
4607
4608
4609
4610
4611
4612
4613
4614
4615
4616
4617
4618
4619
4620
4621
4622
4623
4624
4625
4626
4627
4628
4629
4630
4631
4632
4633
4634
4635
4636
4637
4638
4639
4640
4641
4642
4643
4644
4645
4646
4647
4648
4649
4650
4651
4652
4653
4654
4655
4656
4657
4658
4659
4660
4661
4662
4663
4664
4665
  /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
  ** that this call stops writing when the auto-checkpoint is due. The
  ** caller will do the checkpoint, then possibly call this function again. */
  if( bShutdown==0 && pDb->nAutockpt ){
    u32 nSync;
    u32 nUnsync;
    int nPgsz;
    int nMax;

    lsmCheckpointSynced(pDb, 0, 0, &nSync);
    nUnsync = lsmCheckpointNWrite(pDb->pShmhdr->aSnap1, 0);
    nPgsz = lsmCheckpointPgsz(pDb->pShmhdr->aSnap1);

    nMax = (pDb->nAutockpt/nPgsz) - (nUnsync-nSync);
    if( nMax<nRem ){
      bCkpt = 1;
      nRem = LSM_MAX(nMax, 0);
    }
  }

  /* If there exists in-memory data ready to be flushed to disk, attempt
  ** to flush it now.  */
  if( sortedTreeHasOld(pDb, &rc) ){
    /* sortedDbIsFull() returns non-zero if either (a) there are too many
    ** levels in total in the db, or (b) there are too many levels with the
    ** the same age in the db. Either way, call sortedWork() to merge 
    ** existing segments together until this condition is cleared.  */
    if( sortedDbIsFull(pDb) ){
      int nPg = 0;
      rc = sortedWork(pDb, nRem, 0, 1, &nPg);
      nRem -= nPg;
      assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
      bDirty = 1;
    }

    if( rc==LSM_OK && nRem>0 ){
      int nPg = 0;
      rc = sortedNewToplevel(pDb, TREE_OLD, &nPg);
      nRem -= nPg;
      if( rc==LSM_OK ){
        if( pDb->nTransOpen>0 ){
          lsmTreeDiscardOld(pDb);
        }
        rc = lsmCheckpointSaveWorker(pDb, 1);
        bDirty = 0;
      }
    }
  }

  /* If nPage is still greater than zero, do some merging. */
  if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
    int nPg = 0;
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg);
    nRem -= nPg;
    if( nPg ) bDirty = 1;
  }

  /* If the in-memory part of the free-list is too large, write a new 
  ** top-level containing just the in-memory free-list entries to disk. */
  if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
    int nPg = 0;
    while( rc==LSM_OK && sortedDbIsFull(pDb) ){
      rc = sortedWork(pDb, 16, 0, 1, &nPg);
      nRem -= nPg;
    }
    if( rc==LSM_OK ){
      rc = sortedNewToplevel(pDb, TREE_NONE, &nPg);
    }
    nRem -= nPg;
    if( nPg ) bDirty = 1;







<





|















|













|








<
|









|







4614
4615
4616
4617
4618
4619
4620

4621
4622
4623
4624
4625
4626
4627
4628
4629
4630
4631
4632
4633
4634
4635
4636
4637
4638
4639
4640
4641
4642
4643
4644
4645
4646
4647
4648
4649
4650
4651
4652
4653
4654
4655
4656
4657
4658
4659
4660
4661
4662
4663
4664

4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
4676
4677
4678
4679
4680
4681
4682
  /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
  ** that this call stops writing when the auto-checkpoint is due. The
  ** caller will do the checkpoint, then possibly call this function again. */
  if( bShutdown==0 && pDb->nAutockpt ){
    u32 nSync;
    u32 nUnsync;
    int nPgsz;


    lsmCheckpointSynced(pDb, 0, 0, &nSync);
    nUnsync = lsmCheckpointNWrite(pDb->pShmhdr->aSnap1, 0);
    nPgsz = lsmCheckpointPgsz(pDb->pShmhdr->aSnap1);

    nMax = LSM_MIN(nMax, (pDb->nAutockpt/nPgsz) - (nUnsync-nSync));
    if( nMax<nRem ){
      bCkpt = 1;
      nRem = LSM_MAX(nMax, 0);
    }
  }

  /* If there exists in-memory data ready to be flushed to disk, attempt
  ** to flush it now.  */
  if( sortedTreeHasOld(pDb, &rc) ){
    /* sortedDbIsFull() returns non-zero if either (a) there are too many
    ** levels in total in the db, or (b) there are too many levels with the
    ** the same age in the db. Either way, call sortedWork() to merge 
    ** existing segments together until this condition is cleared.  */
    if( sortedDbIsFull(pDb) ){
      int nPg = 0;
      rc = sortedWork(pDb, nRem, nMerge, 1, &nPg);
      nRem -= nPg;
      assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
      bDirty = 1;
    }

    if( rc==LSM_OK && nRem>0 ){
      int nPg = 0;
      rc = sortedNewToplevel(pDb, TREE_OLD, &nPg);
      nRem -= nPg;
      if( rc==LSM_OK ){
        if( pDb->nTransOpen>0 ){
          lsmTreeDiscardOld(pDb);
        }
        rc = lsmSaveWorker(pDb, 1);
        bDirty = 0;
      }
    }
  }

  /* If nPage is still greater than zero, do some merging. */
  if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
    int nPg = 0;

    rc = sortedWork(pDb, nRem, nMerge, 0, &nPg);
    nRem -= nPg;
    if( nPg ) bDirty = 1;
  }

  /* If the in-memory part of the free-list is too large, write a new 
  ** top-level containing just the in-memory free-list entries to disk. */
  if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
    int nPg = 0;
    while( rc==LSM_OK && sortedDbIsFull(pDb) ){
      rc = sortedWork(pDb, 16, nMerge, 1, &nPg);
      nRem -= nPg;
    }
    if( rc==LSM_OK ){
      rc = sortedNewToplevel(pDb, TREE_NONE, &nPg);
    }
    nRem -= nPg;
    if( nPg ) bDirty = 1;
4680
4681
4682
4683
4684
4685
4686
4687
4688
4689
4690
4691

4692
4693
4694
4695
4696
4697
4698
4699
4700
4701
4702
4703
4704
4705
4706
4707
4708
4709
4710
4711
4712
4713
4714
4715
4716
4717
4718
4719
4720

4721
4722
4723
4724
4725
4726
4727
4728
    if( pnWrite ) *pnWrite = 0;
    if( pbCkpt ) *pbCkpt = 0;
  }

  return rc;
}

static int doLsmWork(lsm_db *pDb, int flags, int nPage, int *pnWrite){
  int rc;
  int nWrite = 0;
  int bCkpt = 0;


  do {
    int nThis = 0;
    bCkpt = 0;
    rc = doLsmSingleWork(pDb, 0, flags, nPage-nWrite, &nThis, &bCkpt);
    nWrite += nThis;
    if( rc==LSM_OK && bCkpt ){
      rc = lsm_checkpoint(pDb, 0);
    }
  }while( rc==LSM_OK && (nWrite<nPage && bCkpt) );

  if( pnWrite ){
    if( rc==LSM_OK ){
      *pnWrite = nWrite;
    }else{
      *pnWrite = 0;
    }
  }
  return rc;
}

/*
** Perform work to merge database segments together.
*/
int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite){

  /* This function may not be called if pDb has an open read or write
  ** transaction. Return LSM_MISUSE if an application attempts this.  */
  if( pDb->nTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT;


  return doLsmWork(pDb, flags, nPage, pnWrite);
}

int lsm_flush(lsm_db *db){
  int rc;

  if( db->nTransOpen>0 || db->pCsr ){
    rc = LSM_MISUSE_BKPT;







|




>



|



















|





>
|







4697
4698
4699
4700
4701
4702
4703
4704
4705
4706
4707
4708
4709
4710
4711
4712
4713
4714
4715
4716
4717
4718
4719
4720
4721
4722
4723
4724
4725
4726
4727
4728
4729
4730
4731
4732
4733
4734
4735
4736
4737
4738
4739
4740
4741
4742
4743
4744
4745
4746
4747
    if( pnWrite ) *pnWrite = 0;
    if( pbCkpt ) *pbCkpt = 0;
  }

  return rc;
}

static int doLsmWork(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){
  int rc;
  int nWrite = 0;
  int bCkpt = 0;

  assert( nMerge>=1 );
  do {
    int nThis = 0;
    bCkpt = 0;
    rc = doLsmSingleWork(pDb, 0, nMerge, nPage-nWrite, &nThis, &bCkpt);
    nWrite += nThis;
    if( rc==LSM_OK && bCkpt ){
      rc = lsm_checkpoint(pDb, 0);
    }
  }while( rc==LSM_OK && (nWrite<nPage && bCkpt) );

  if( pnWrite ){
    if( rc==LSM_OK ){
      *pnWrite = nWrite;
    }else{
      *pnWrite = 0;
    }
  }
  return rc;
}

/*
** Perform work to merge database segments together.
*/
int lsm_work(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){

  /* This function may not be called if pDb has an open read or write
  ** transaction. Return LSM_MISUSE if an application attempts this.  */
  if( pDb->nTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT;

  if( nMerge<=0 ) nMerge = pDb->nMerge;
  return doLsmWork(pDb, nMerge, nPage, pnWrite);
}

int lsm_flush(lsm_db *db){
  int rc;

  if( db->nTransOpen>0 || db->pCsr ){
    rc = LSM_MISUSE_BKPT;
4782
4783
4784
4785
4786
4787
4788
4789
4790
4791
4792
4793
4794
4795
4796
    int nRemaining;               /* Units of work to do before returning */

    nRemaining = nUnit * nDepth;
#ifdef LSM_LOG_WORK
    lsmLogMessage(pDb, rc, "lsmSortedAutoWork(): %d*%d = %d pages", 
        nUnit, nDepth, nRemaining);
#endif
    rc = doLsmWork(pDb, 0, nRemaining, 0);
    if( rc==LSM_BUSY ) rc = LSM_OK;

    if( bRestore && pDb->pCsr ){
      lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
      pDb->pClient = 0;
      rc = lsmCheckpointLoad(pDb, 0);
      if( rc==LSM_OK ){







|







4801
4802
4803
4804
4805
4806
4807
4808
4809
4810
4811
4812
4813
4814
4815
    int nRemaining;               /* Units of work to do before returning */

    nRemaining = nUnit * nDepth;
#ifdef LSM_LOG_WORK
    lsmLogMessage(pDb, rc, "lsmSortedAutoWork(): %d*%d = %d pages", 
        nUnit, nDepth, nRemaining);
#endif
    rc = doLsmWork(pDb, pDb->nMerge, nRemaining, 0);
    if( rc==LSM_BUSY ) rc = LSM_OK;

    if( bRestore && pDb->pCsr ){
      lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
      pDb->pClient = 0;
      rc = lsmCheckpointLoad(pDb, 0);
      if( rc==LSM_OK ){
4810
4811
4812
4813
4814
4815
4816
4817
4818
4819
4820
4821
4822
4823
4824
** any in-memory trees present (old or current) are written out to disk.
*/
int lsmFlushTreeToDisk(lsm_db *pDb){
  int rc;

  rc = lsmBeginWork(pDb);
  while( rc==LSM_OK && sortedDbIsFull(pDb) ){
    rc = sortedWork(pDb, 256, 0, 1, 0);
  }

  if( rc==LSM_OK ){
    rc = sortedNewToplevel(pDb, TREE_BOTH, 0);
  }
  lsmFinishWork(pDb, 1, &rc);
  return rc;







|







4829
4830
4831
4832
4833
4834
4835
4836
4837
4838
4839
4840
4841
4842
4843
** any in-memory trees present (old or current) are written out to disk.
*/
int lsmFlushTreeToDisk(lsm_db *pDb){
  int rc;

  rc = lsmBeginWork(pDb);
  while( rc==LSM_OK && sortedDbIsFull(pDb) ){
    rc = sortedWork(pDb, 256, pDb->nMerge, 1, 0);
  }

  if( rc==LSM_OK ){
    rc = sortedNewToplevel(pDb, TREE_BOTH, 0);
  }
  lsmFinishWork(pDb, 1, &rc);
  return rc;
Changes to test/ckpt1.test.
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  4K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  8K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 16K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 32K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 64K
}
do_test 3.2 {
  sqlite4_lsm_work db main -optimize 1000000
  execsql { SELECT count(*) FROM t1 }
} {65536}
do_test 3.3 {
  db close
  sqlite4 db test.db
  execsql { SELECT count(*) FROM t1 }
} {65536}
do_test 3.4 {
  execsql { INSERT INTO t1 VALUES(randstr(100,100), randstr(100,100)) }
  sqlite4_lsm_work db main -optimize 1000000
  execsql { SELECT count(*) FROM t1 }
} {65537}

finish_test








|









|





72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  4K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  8K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 16K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 32K
  INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 64K
}
do_test 3.2 {
  sqlite4_lsm_work db main -nmerge 1 -npage 1000000
  execsql { SELECT count(*) FROM t1 }
} {65536}
do_test 3.3 {
  db close
  sqlite4 db test.db
  execsql { SELECT count(*) FROM t1 }
} {65536}
do_test 3.4 {
  execsql { INSERT INTO t1 VALUES(randstr(100,100), randstr(100,100)) }
  sqlite4_lsm_work db main -nmerge 1 -npage 1000000
  execsql { SELECT count(*) FROM t1 }
} {65537}

finish_test

Changes to test/lsm1.test.
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
proc do_contents_test {tn res} {
  set con [contents]
  set res2 [list]
  foreach r $res {lappend res2 $r}
  uplevel do_test $tn [list [list set {} $con]] [list $res2]
}

if 1 {

do_test 1.1 {
  reopen
  db write abc def
  db close
} {}

do_test 1.2 {







<
<







48
49
50
51
52
53
54


55
56
57
58
59
60
61
proc do_contents_test {tn res} {
  set con [contents]
  set res2 [list]
  foreach r $res {lappend res2 $r}
  uplevel do_test $tn [list [list set {} $con]] [list $res2]
}



do_test 1.1 {
  reopen
  db write abc def
  db close
} {}

do_test 1.2 {
88
89
90
91
92
93
94

95
96
97
98
99
100
101
  db write ccc three
  db write ddd four
  db write eee five
  db write fff six
  reopen
  db delete_range a bbb
  reopen

  db work 10 
} {1}

do_contents_test 2.2 { {bbb two} {ccc three} {ddd four} {eee five} {fff six} }


#-------------------------------------------------------------------------







>







86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
  db write ccc three
  db write ddd four
  db write eee five
  db write fff six
  reopen
  db delete_range a bbb
  reopen
breakpoint
  db work 10 
} {1}

do_contents_test 2.2 { {bbb two} {ccc three} {ddd four} {eee five} {fff six} }


#-------------------------------------------------------------------------
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194

  db config {nmerge 3}
  db work 10

  contents
} {{10 ten} {17 seventeen} {20 twenty} {30 thirty} {40 forty}}

}

do_test 5.3 {
  reopen 1
  db config {nmerge 4}

  dbwrite { 10 ten    }  ; db flush
  dbwrite { 20 twenty }  ; db flush
  dbwrite { 30 thirty }  ; db flush







<
<







178
179
180
181
182
183
184


185
186
187
188
189
190
191

  db config {nmerge 3}
  db work 10

  contents
} {{10 ten} {17 seventeen} {20 twenty} {30 thirty} {40 forty}}



do_test 5.3 {
  reopen 1
  db config {nmerge 4}

  dbwrite { 10 ten    }  ; db flush
  dbwrite { 20 twenty }  ; db flush
  dbwrite { 30 thirty }  ; db flush
Added test/lsm2.test.


























































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 2012 November 02
#
# The author disclaims copyright to this source code.  In place of
# a legal notice, here is a blessing:
#
#    May you do good and not evil.
#    May you find forgiveness for yourself and forgive others.
#    May you share freely, never taking more than you give.
#
#***********************************************************************
#
set testdir [file dirname $argv0]
source $testdir/tester.tcl
set testprefix lsm2
db close

expr srand(0)

proc insert_data {nRow} {
  for {set i 0} {$i < $nRow} {incr i} {
    set key [string range [expr rand()] 0 10]
    set val [string repeat [string range [expr rand()] 0 10] 100]
    db write $key $val
  }
}

do_test 1.1 {
  forcedelete test.db
  lsm_open db test.db [list mmap 0 block_size [expr 256*1024]]
  insert_data 10000
} {}

do_test 1.2 {
  db flush
  db delete_range 0 9
  db flush
  db work 1 1000000
  db checkpoint

  db begin  1
  insert_data 10000
  db commit 0
} {}

finish_test
Changes to test/permutations.test.
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#
lappend ::testsuitelist xxx

test_suite "src4" -prefix "" -description {
} -files {
  simple.test simple2.test
  log3.test 
  lsm1.test
  csr1.test
  ckpt1.test
  mc1.test

  aggerror.test
  attach.test
  autoindex1.test







|







131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#
lappend ::testsuitelist xxx

test_suite "src4" -prefix "" -description {
} -files {
  simple.test simple2.test
  log3.test 
  lsm1.test lsm2.test
  csr1.test
  ckpt1.test
  mc1.test

  aggerror.test
  attach.test
  autoindex1.test
Changes to test/test_lsm.c.
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158

159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183

184
185
186
187
188
189
190
191
192


193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
    Tcl_SetResult(interp, (char *)sqlite4TestErrorName(rc), TCL_STATIC);
    return TCL_ERROR;
  }
  return TCL_OK;
}

/*
** TCLCMD:    sqlite4_lsm_work DB DBNAME ?SWITCHES? ?N?
*/
static int test_sqlite4_lsm_work(
  void * clientData,
  Tcl_Interp *interp,
  int objc,
  Tcl_Obj *CONST objv[]
){
  struct Switch {
    const char *zSwitch;
    int flags;
  } aSwitch[] = {
    { "-optimize",   LSM_WORK_OPTIMIZE }, 

    { 0, 0 }
  };

  int flags = 0;
  int nPage = 0;
  const char *zDb;
  const char *zName;
  int i;
  int rc;
  sqlite4 *db;
  lsm_db *pLsm;
  int nWork;

  if( objc<3 ){
    Tcl_WrongNumArgs(interp, 1, objv, "DB DBNAME ?SWITCHES? ?N?");
    return TCL_ERROR;
  }
  zDb = Tcl_GetString(objv[1]);
  zName = Tcl_GetString(objv[2]);

  for(i=3; i<objc; i++){
    const char *z = Tcl_GetString(objv[i]);

    if( z[0]=='-' ){
      int iIdx;

      rc = Tcl_GetIndexFromObjStruct(
          interp, objv[i], aSwitch, sizeof(aSwitch[0]), "switch", 0, &iIdx
      );
      if( rc!=TCL_OK ) return rc;
      flags |= aSwitch[iIdx].flags;
    }else{
      rc = Tcl_GetIntFromObj(interp, objv[i], &nPage);
      if( rc!=TCL_OK ) return rc;
    }


  }

  rc = getDbPointer(interp, zDb, &db);
  if( rc!=TCL_OK ) return rc;

  rc = sqlite4_kvstore_control(db, zName, SQLITE4_KVCTRL_LSM_HANDLE, &pLsm);
  if( rc==SQLITE4_OK ){
    rc = lsm_work(pLsm, flags, nPage, &nWork);
  }
  if( rc!=SQLITE4_OK ){
    Tcl_SetResult(interp, (char *)sqlite4TestErrorName(rc), TCL_STATIC);
    return TCL_ERROR;
  }

  Tcl_SetObjResult(interp, Tcl_NewIntObj(nWork));







|









<

|
>
|


|









|
|





|
<
<
<

>




<
<
|

<
>
>







|







139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155

156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179



180
181
182
183
184
185


186
187

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
    Tcl_SetResult(interp, (char *)sqlite4TestErrorName(rc), TCL_STATIC);
    return TCL_ERROR;
  }
  return TCL_OK;
}

/*
** TCLCMD:    sqlite4_lsm_work DB DBNAME ?-nmerge N? ?-npage N?
*/
static int test_sqlite4_lsm_work(
  void * clientData,
  Tcl_Interp *interp,
  int objc,
  Tcl_Obj *CONST objv[]
){
  struct Switch {
    const char *zSwitch;

  } aSwitch[] = {
    { "-nmerge" },
    { "-npage" },
    { 0 }
  };

  int nMerge = 1;
  int nPage = 0;
  const char *zDb;
  const char *zName;
  int i;
  int rc;
  sqlite4 *db;
  lsm_db *pLsm;
  int nWork;

  if( objc!=3 && objc!=5 && objc!=7 ){
    Tcl_WrongNumArgs(interp, 1, objv, "DB DBNAME ?-nmerge N? ?-npage N?");
    return TCL_ERROR;
  }
  zDb = Tcl_GetString(objv[1]);
  zName = Tcl_GetString(objv[2]);

  for(i=3; i<objc; i+=2){



      int iIdx;
    int iVal;
      rc = Tcl_GetIndexFromObjStruct(
          interp, objv[i], aSwitch, sizeof(aSwitch[0]), "switch", 0, &iIdx
      );
      if( rc!=TCL_OK ) return rc;


    rc = Tcl_GetIntFromObj(interp, objv[i+1], &iVal);
      if( rc!=TCL_OK ) return rc;

    if( iIdx==0 ) nMerge = iVal;
    if( iIdx==1 ) nPage = iVal;
  }

  rc = getDbPointer(interp, zDb, &db);
  if( rc!=TCL_OK ) return rc;

  rc = sqlite4_kvstore_control(db, zName, SQLITE4_KVCTRL_LSM_HANDLE, &pLsm);
  if( rc==SQLITE4_OK ){
    rc = lsm_work(pLsm, nMerge, nPage, &nWork);
  }
  if( rc!=SQLITE4_OK ){
    Tcl_SetResult(interp, (char *)sqlite4TestErrorName(rc), TCL_STATIC);
    return TCL_ERROR;
  }

  Tcl_SetObjResult(interp, Tcl_NewIntObj(nWork));
516
517
518
519
520
521
522
523
524
525

526
527
528
529
530
531
532
    /*  1 */ {"write",        2, "KEY VALUE"},
    /*  2 */ {"delete",       1, "KEY"},
    /*  3 */ {"delete_range", 2, "START-KEY END-KEY"},
    /*  4 */ {"begin",        1, "LEVEL"},
    /*  5 */ {"commit",       1, "LEVEL"},
    /*  6 */ {"rollback",     1, "LEVEL"},
    /*  7 */ {"csr_open",     1, "CSR"},
    /*  8 */ {"work",        -1, "NPAGE ?SWITCHES?"},
    /*  9 */ {"flush",        0, ""},
    /* 10 */ {"config",       1, "LIST"},

    {0, 0, 0}
  };
  int iCmd;
  int rc;
  TclLsm *p = (TclLsm *)clientData;

  if( objc<2 ){







|


>







513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
    /*  1 */ {"write",        2, "KEY VALUE"},
    /*  2 */ {"delete",       1, "KEY"},
    /*  3 */ {"delete_range", 2, "START-KEY END-KEY"},
    /*  4 */ {"begin",        1, "LEVEL"},
    /*  5 */ {"commit",       1, "LEVEL"},
    /*  6 */ {"rollback",     1, "LEVEL"},
    /*  7 */ {"csr_open",     1, "CSR"},
    /*  8 */ {"work",        -1, "?NMERGE? NPAGE"},
    /*  9 */ {"flush",        0, ""},
    /* 10 */ {"config",       1, "LIST"},
    /* 11 */ {"checkpoint",   0, ""},
    {0, 0, 0}
  };
  int iCmd;
  int rc;
  TclLsm *p = (TclLsm *)clientData;

  if( objc<2 ){
618
619
620
621
622
623
624
625

626
627
628
629

630


631
632
633

634
635

636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656





657
658
659
660
661
662
663
          (ClientData)pCsr, test_lsm_cursor_del
      );
      Tcl_SetObjResult(interp, objv[2]);
      return TCL_OK;
    }

    case 8: assert( 0==strcmp(aCmd[8].zCmd, "work") ); {
      int nWork;

      int nWrite = 0;
      int flags = 0;
      int i;


      rc = Tcl_GetIntFromObj(interp, objv[2], &nWork);


      if( rc!=TCL_OK ) return rc;

      for(i=3; i<objc; i++){

        int iOpt;
        const char *azOpt[] = { "-optimize", "-flush", 0 };


        rc = Tcl_GetIndexFromObj(interp, objv[i], azOpt, "option", 0, &iOpt);
        if( rc!=TCL_OK ) return rc;

        if( iOpt==0 ) flags |= LSM_WORK_OPTIMIZE;
      }

      rc = lsm_work(p->db, flags, nWork, &nWrite);
      if( rc!=LSM_OK ) return test_lsm_error(interp, "lsm_work", rc);
      Tcl_SetObjResult(interp, Tcl_NewIntObj(nWrite));
      return TCL_OK;
    }

    case 9: assert( 0==strcmp(aCmd[9].zCmd, "flush") ); {
      rc = lsm_flush(p->db);
      return test_lsm_error(interp, "lsm_flush", rc);
    }

    case 10: assert( 0==strcmp(aCmd[10].zCmd, "config") ); {
      return testConfigureLsm(interp, p->db, objv[2]);
    }






    default:
      assert( 0 );
  }

  Tcl_AppendResult(interp, "internal error", 0);
  return TCL_ERROR;







|
>

<


>

>
>

|
<
>
|
<
>
|
<


<
<
<
|













>
>
>
>
>







616
617
618
619
620
621
622
623
624
625

626
627
628
629
630
631
632
633

634
635

636
637

638
639



640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
          (ClientData)pCsr, test_lsm_cursor_del
      );
      Tcl_SetObjResult(interp, objv[2]);
      return TCL_OK;
    }

    case 8: assert( 0==strcmp(aCmd[8].zCmd, "work") ); {
      int nWork = 0;
      int nMerge = 1;
      int nWrite = 0;

      int i;

      if( objc==3 ){
      rc = Tcl_GetIntFromObj(interp, objv[2], &nWork);
      }else if( objc==4 ){
        rc = Tcl_GetIntFromObj(interp, objv[2], &nMerge);
      if( rc!=TCL_OK ) return rc;
        rc = Tcl_GetIntFromObj(interp, objv[3], &nWork);

      }else{
        Tcl_WrongNumArgs(interp, 2, objv, "?NMERGE? NWRITE");

        return TCL_ERROR;
      }

        if( rc!=TCL_OK ) return rc;




      rc = lsm_work(p->db, nMerge, nWork, &nWrite);
      if( rc!=LSM_OK ) return test_lsm_error(interp, "lsm_work", rc);
      Tcl_SetObjResult(interp, Tcl_NewIntObj(nWrite));
      return TCL_OK;
    }

    case 9: assert( 0==strcmp(aCmd[9].zCmd, "flush") ); {
      rc = lsm_flush(p->db);
      return test_lsm_error(interp, "lsm_flush", rc);
    }

    case 10: assert( 0==strcmp(aCmd[10].zCmd, "config") ); {
      return testConfigureLsm(interp, p->db, objv[2]);
    }

    case 11: assert( 0==strcmp(aCmd[11].zCmd, "checkpoint") ); {
      rc = lsm_checkpoint(p->db, 0);
      return test_lsm_error(interp, "lsm_checkpoint", rc);
    }

    default:
      assert( 0 );
  }

  Tcl_AppendResult(interp, "internal error", 0);
  return TCL_ERROR;
Changes to test/tester.tcl.
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
# a single b-tree structure. Because this annihilates all delete keys,
# the next rowid allocated for each table with an IPK will be as expected
# by SQLite 3 tests.
#
proc optimize_db {} { 
  #catch { 
    sqlite4_lsm_flush db main 
    sqlite4_lsm_work db main -opt 100000 
    sqlite4_lsm_checkpoint db main
  #}
  return ""
}


# If the library is compiled with the SQLITE4_DEFAULT_AUTOVACUUM macro set







|







1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
# a single b-tree structure. Because this annihilates all delete keys,
# the next rowid allocated for each table with an IPK will be as expected
# by SQLite 3 tests.
#
proc optimize_db {} { 
  #catch { 
    sqlite4_lsm_flush db main 
    sqlite4_lsm_work db main -nmerge 1 -npage 100000 
    sqlite4_lsm_checkpoint db main
  #}
  return ""
}


# If the library is compiled with the SQLITE4_DEFAULT_AUTOVACUUM macro set