SQLite4
Check-in [91912a39ca]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Remove the LSM_WORK_OPTIMIZE flag. Add free-list management related tests and fixes.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 91912a39ca5e4b3334feb5ce0fc2c67265975846
User & Date: dan 2012-11-07 20:08:06
Context
2012-11-08
11:59
Set a flag on levels that consist entirely of freelist entries. Use this flag to avoid counter-productive merges during database optimization. check-in: 48bd83a17a user: dan tags: trunk
2012-11-07
20:08
Remove the LSM_WORK_OPTIMIZE flag. Add free-list management related tests and fixes. check-in: 91912a39ca user: dan tags: trunk
2012-11-06
19:14
Fix lsmview.tcl so that it can view databases compressed with zlib. check-in: 7268cf7535 user: dan tags: trunk
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_func.c.

     2      2   #include "lsmtest.h"
     3      3   
     4      4   
     5      5   int do_work(int nArg, char **azArg){
     6      6     struct Option {
     7      7       const char *zName;
     8      8     } aOpt [] = {
     9         -    { "-optimize" },
            9  +    { "-nmerge" },
    10     10       { "-npage" },
    11     11       { 0 }
    12     12     };
    13     13   
    14     14     lsm_db *pDb;
    15     15     int rc;
    16     16     int i;
    17     17     const char *zDb;
    18         -  int flags = 0;
           18  +  int nMerge = 1;
    19     19     int nWork = (1<<30);
    20     20   
    21     21     if( nArg==0 ) goto usage;
    22     22     zDb = azArg[nArg-1];
    23     23     for(i=0; i<(nArg-1); i++){
    24     24       int iSel;
    25     25       rc = testArgSelect(aOpt, "option", azArg[i], &iSel);
    26     26       if( rc ) return rc;
    27     27       switch( iSel ){
    28     28         case 0:
    29         -        flags |= LSM_WORK_OPTIMIZE;
           29  +        i++;
           30  +        if( i==(nArg-1) ) goto usage;
           31  +        nMerge = atoi(azArg[i]);
    30     32           break;
    31     33         case 1:
    32     34           i++;
    33     35           if( i==(nArg-1) ) goto usage;
    34     36           nWork = atoi(azArg[i]);
    35     37           break;
    36     38       }
................................................................................
    40     42     if( rc!=LSM_OK ){
    41     43       testPrintError("lsm_open(): rc=%d\n", rc);
    42     44     }else{
    43     45       rc = lsm_open(pDb, zDb);
    44     46       if( rc!=LSM_OK ){
    45     47         testPrintError("lsm_open(): rc=%d\n", rc);
    46     48       }else{
    47         -      rc = lsm_work(pDb, flags, nWork, 0);
           49  +      rc = lsm_work(pDb, nMerge, nWork, 0);
    48     50         if( rc!=LSM_OK ){
    49     51           testPrintError("lsm_work(): rc=%d\n", rc);
    50     52         }
    51     53       }
    52     54     }
    53     55     if( rc==LSM_OK ){
    54     56       rc = lsm_checkpoint(pDb, 0);

Changes to src/kvlsm.c.

   387    387         lsm_flush(p->pDb);
   388    388         break;
   389    389       }
   390    390   
   391    391       case SQLITE4_KVCTRL_LSM_MERGE: {
   392    392         int nPage = *(int*)pArg;
   393    393         int nWrite = 0;
   394         -      lsm_work(p->pDb, LSM_WORK_OPTIMIZE, nPage, &nWrite);
          394  +      lsm_work(p->pDb, 0, nPage, &nWrite);
   395    395         *(int*)pArg = nWrite;
   396    396         break;
   397    397       }
   398    398   
   399    399       case SQLITE4_KVCTRL_LSM_CHECKPOINT: {
   400    400         lsm_checkpoint(p->pDb, 0);
   401    401         break;

Changes to src/lsm.h.

   469    469   **   Describe the race condition this function is subject to. Or remove
   470    470   **   it somehow.
   471    471   */
   472    472   int lsm_ckpt_size(lsm_db *, int *pnByte);
   473    473   
   474    474   /*
   475    475   ** This function is called by a thread to work on the database structure.
   476         -** The actual operations performed by this function depend on the value 
   477         -** passed as the "flags" parameter:
   478         -**
   479         -** LSM_WORK_OPTIMIZE:
   480         -**   If nMerge suitable arrays cannot be found, where nMerge is as 
   481         -**   configured by LSM_CONFIG_NMERGE, merge together any arrays that
   482         -**   can be found. This is usually used to optimize the database by 
   483         -**   merging the whole thing into one big array.
   484    476   */
   485         -int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite);
   486         -
   487         -#define LSM_WORK_OPTIMIZE        0x00000002
          477  +int lsm_work(lsm_db *pDb, int nMerge, int nPage, int *pnWrite);
   488    478   
   489    479   int lsm_flush(lsm_db *pDb);
   490    480   
   491    481   /*
   492    482   ** Attempt to checkpoint the current database snapshot. Return an LSM
   493    483   ** error code if an error occurs or LSM_OK otherwise.
   494    484   **

Changes to src/lsmInt.h.

   694    694   */
   695    695   int lsmInfoPageDump(lsm_db *, Pgno, int, char **);
   696    696   void lsmSortedCleanup(lsm_db *);
   697    697   int lsmSortedAutoWork(lsm_db *, int nUnit);
   698    698   
   699    699   int lsmSortedWalkFreelist(lsm_db *, int (*)(void *, int, i64), void *);
   700    700   
          701  +int lsmSaveWorker(lsm_db *, int);
   701    702   
   702    703   int lsmFlushTreeToDisk(lsm_db *pDb);
   703    704   
   704    705   void lsmSortedRemap(lsm_db *pDb);
   705    706   
   706    707   void lsmSortedFreeLevel(lsm_env *pEnv, Level *);
   707    708   

Changes to src/lsm_ckpt.c.

   355    355       ckptSetValue(p, iOut++, pDb->treehdr.oldcksum1, pRc);
   356    356     }else{
   357    357       for(; iOut<=CKPT_HDR_LO_CKSUM2; iOut++){
   358    358         ckptSetValue(p, iOut, pDb->pShmhdr->aSnap2[iOut], pRc);
   359    359       }
   360    360     }
   361    361   
          362  +  assert( *pRc || iOut==CKPT_HDR_LO_CKSUM2+1 );
   362    363     *piOut = iOut;
   363    364   }
   364    365   
   365    366   static void ckptExportAppendlist(
   366    367     lsm_db *db,                     /* Database connection */
   367    368     CkptBuffer *p,                  /* Checkpoint buffer to write to */
   368    369     int *piOut,                     /* IN/OUT: Offset within checkpoint buffer */
................................................................................
   412    413     iLevel = 0;
   413    414     for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nLevel; pLevel=pLevel->pNext){
   414    415       ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
   415    416       iLevel++;
   416    417     }
   417    418   
   418    419     /* Write the freelist */
          420  +  assert( pSnap->freelist.nEntry<=pDb->nMaxFreelist );
   419    421     if( rc==LSM_OK ){
   420    422       int nFree = pSnap->freelist.nEntry;
   421    423       ckptSetValue(&ckpt, iOut++, nFree, &rc);
   422    424       for(i=0; i<nFree; i++){
   423    425         FreelistEntry *p = &pSnap->freelist.aEntry[i];
   424    426         ckptSetValue(&ckpt, iOut++, p->iBlk, &rc);
   425    427         ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc);
................................................................................
   447    449     iOut += 2;
   448    450     assert( iOut<=1024 );
   449    451   
   450    452   #ifdef LSM_LOG_FREELIST
   451    453     lsmLogMessage(pDb, rc, 
   452    454         "ckptExportSnapshot(): id=%lld freelist: %d", iId, pSnap->freelist.nEntry
   453    455     );
          456  +  for(i=0; i<pSnap->freelist.nEntry; i++){
          457  +  lsmLogMessage(pDb, rc, 
          458  +      "ckptExportSnapshot(): iBlk=%d id=%lld", 
          459  +      pSnap->freelist.aEntry[i].iBlk,
          460  +      pSnap->freelist.aEntry[i].iId
          461  +  );
          462  +  }
   454    463   #endif
   455    464   
   456    465     *ppCkpt = (void *)ckpt.aCkpt;
   457    466     if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut;
   458    467     return rc;
   459    468   }
   460    469   

Changes to src/lsm_main.c.

    52     52   */
    53     53   static int xCmp(void *p1, int n1, void *p2, int n2){
    54     54     int res;
    55     55     res = memcmp(p1, p2, LSM_MIN(n1, n2));
    56     56     if( res==0 ) res = (n1-n2);
    57     57     return res;
    58     58   }
           59  +
           60  +static void xLog(void *pCtx, int rc, const char *z){
           61  +  (void)(rc);
           62  +  (void)(pCtx);
           63  +  fprintf(stderr, "%s\n", z);
           64  +  fflush(stderr);
           65  +}
    59     66   
    60     67   /*
    61     68   ** Allocate a new db handle.
    62     69   */
    63     70   int lsm_new(lsm_env *pEnv, lsm_db **ppDb){
    64     71     lsm_db *pDb;
    65     72   
................................................................................
    83     90     pDb->nDfltBlksz = LSM_DFLT_BLOCK_SIZE;
    84     91     pDb->nMerge = LSM_DFLT_NMERGE;
    85     92     pDb->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES;
    86     93     pDb->bUseLog = 1;
    87     94     pDb->iReader = -1;
    88     95     pDb->bMultiProc = 1;
    89     96     pDb->bMmap = LSM_IS_64_BIT;
           97  +  pDb->xLog = xLog;
    90     98     return LSM_OK;
    91     99   }
    92    100   
    93    101   lsm_env *lsm_get_env(lsm_db *pDb){
    94    102     assert( pDb->pEnv );
    95    103     return pDb->pEnv;
    96    104   }
................................................................................
   406    414     rc = s.n>=0 ? LSM_OK : LSM_NOMEM;
   407    415   
   408    416     /* Release the snapshot and return */
   409    417     infoFreeWorker(pDb, bUnlock);
   410    418     *pzOut = s.z;
   411    419     return rc;
   412    420   }
          421  +
          422  +static int infoFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
          423  +  LsmString *pStr = (LsmString *)pCtx;
          424  +  lsmStringAppendf(pStr, "%s{%d %lld}", (pStr->n?" ":""), iBlk, iSnapshot);
          425  +  return 0;
          426  +}
   413    427   
   414    428   int lsmInfoFreelist(lsm_db *pDb, char **pzOut){
   415    429     Snapshot *pWorker;              /* Worker snapshot */
   416    430     int bUnlock = 0;
   417    431     LsmString s;
   418    432     int i;
   419    433     int rc;
   420    434   
   421    435     /* Obtain the worker snapshot */
   422    436     rc = infoGetWorker(pDb, &pWorker, &bUnlock);
   423    437     if( rc!=LSM_OK ) return rc;
   424    438   
   425    439     lsmStringInit(&s, pDb->pEnv);
   426         -  lsmStringAppendf(&s, "%d", pWorker->freelist.nEntry);
   427         -  for(i=0; i<pWorker->freelist.nEntry; i++){
   428         -    FreelistEntry *p = &pWorker->freelist.aEntry[i];
   429         -    lsmStringAppendf(&s, " {%d %d}", p->iBlk, (int)p->iId);
          440  +  rc = lsmWalkFreelist(pDb, infoFreelistCb, &s);
          441  +  if( rc!=LSM_OK ){
          442  +    lsmFree(pDb->pEnv, s.z);
          443  +  }else{
          444  +    *pzOut = s.z;
   430    445     }
   431         -  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;
   432    446   
   433    447     /* Release the snapshot and return */
   434    448     infoFreeWorker(pDb, bUnlock);
   435         -  *pzOut = s.z;
   436    449     return rc;
   437    450   }
   438    451   
   439    452   int lsm_info(lsm_db *pDb, int eParam, ...){
   440    453     int rc = LSM_OK;
   441    454     va_list ap;
   442    455     va_start(ap, eParam);

Changes to src/lsm_shared.c.

   734    734   */
   735    735   void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){
   736    736     assert( *pRc!=0 || pDb->pWorker );
   737    737     if( pDb->pWorker ){
   738    738       /* If no error has occurred, serialize the worker snapshot and write
   739    739       ** it to shared memory.  */
   740    740       if( *pRc==LSM_OK ){
   741         -      *pRc = lsmCheckpointSaveWorker(pDb, bFlush);
          741  +      *pRc = lsmSaveWorker(pDb, bFlush);
   742    742       }
   743    743       lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
   744    744       pDb->pWorker = 0;
   745    745     }
   746    746   
   747    747     lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
   748    748   }

Changes to src/lsm_sorted.c.

  4044   4044     pCsr = multiCursorNew(pDb, &rc);
  4045   4045     if( pCsr ){
  4046   4046       pCsr->pDb = pDb;
  4047   4047       rc = multiCursorVisitFreelist(pCsr);
  4048   4048       if( rc==LSM_OK ){
  4049   4049         rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
  4050   4050       }
  4051         -    if( rc==LSM_OK && pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){
         4051  +    if( rc==LSM_OK 
         4052  +        && eTree!=TREE_NONE
         4053  +        && pNext && pNext->pMerge==0 && pNext->lhs.iRoot 
         4054  +    ){
  4052   4055         pDel = &pNext->lhs;
  4053   4056         rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);
  4054   4057       }
  4055   4058   
  4056   4059       if( pNext==0 ){
  4057   4060         multiCursorIgnoreDelete(pCsr);
  4058   4061       }
................................................................................
  4340   4343     do {
  4341   4344       nRet++;
  4342   4345       p = p->pNext;
  4343   4346     }while( p && p->iAge==iAge );
  4344   4347     return nRet;
  4345   4348   }
  4346   4349   
  4347         -static int sortedSelectLevel(lsm_db *pDb, int bOpt, Level **ppOut){
         4350  +static int sortedSelectLevel(lsm_db *pDb, int nMerge, Level **ppOut){
  4348   4351     Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
  4349   4352     int rc = LSM_OK;
  4350   4353     Level *pLevel = 0;            /* Output value */
  4351   4354     Level *pBest = 0;             /* Best level to work on found so far */
  4352         -  int nBest = pDb->nMerge-1;    /* Number of segments merged at pBest */
         4355  +  int nBest;                    /* Number of segments merged at pBest */
  4353   4356     Level *pThis = 0;             /* First in run of levels with age=iAge */
  4354   4357     int nThis = 0;                /* Number of levels starting at pThis */
  4355   4358   
         4359  +  assert( nMerge>=1 );
         4360  +  nBest = LSM_MAX(1, nMerge-1);
         4361  +
  4356   4362     /* Find the longest contiguous run of levels not currently undergoing a 
  4357   4363     ** merge with the same age in the structure. Or the level being merged
  4358   4364     ** with the largest number of right-hand segments. Work on it. */
  4359   4365     for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
  4360   4366       if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
  4361   4367         nThis++;
  4362   4368       }else{
................................................................................
  4383   4389     }
  4384   4390     if( nThis>nBest ){
  4385   4391       assert( pThis );
  4386   4392       pBest = pThis;
  4387   4393       nBest = nThis;
  4388   4394     }
  4389   4395   
  4390         -  if( pBest==0 && bOpt && pTopLevel->pNext ){
         4396  +  if( pBest==0 && nMerge==1 && pTopLevel && pTopLevel->pNext ){
  4391   4397       pBest = pTopLevel;
  4392   4398       nBest = 2;
  4393   4399     }
  4394   4400   
  4395   4401     if( pBest ){
  4396   4402       if( pBest->nRight==0 ){
  4397   4403         rc = sortedMergeSetup(pDb, pBest, nBest, ppOut);
................................................................................
  4414   4420     }
  4415   4421     return 0;
  4416   4422   }
  4417   4423   
  4418   4424   static int sortedWork(
  4419   4425     lsm_db *pDb,                    /* Database handle. Must be worker. */
  4420   4426     int nWork,                      /* Number of pages of work to do */
  4421         -  int bOptimize,                  /* True to merge less than nMerge levels */
         4427  +  int nMerge,                     /* Try to merge this many levels at once */
  4422   4428     int bFlush,                     /* Set if call is to make room for a flush */
  4423   4429     int *pnWrite                    /* OUT: Actual number of pages written */
  4424   4430   ){
  4425   4431     int rc = LSM_OK;                /* Return Code */
  4426   4432     int nRemaining = nWork;         /* Units of work to do before returning */
  4427   4433     Snapshot *pWorker = pDb->pWorker;
  4428   4434   
................................................................................
  4429   4435     assert( pWorker );
  4430   4436     if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;
  4431   4437   
  4432   4438     while( nRemaining>0 ){
  4433   4439       Level *pLevel = 0;
  4434   4440   
  4435   4441       /* Find a level to work on. */
  4436         -    rc = sortedSelectLevel(pDb, bOptimize, &pLevel);
         4442  +    rc = sortedSelectLevel(pDb, nMerge, &pLevel);
  4437   4443       assert( rc==LSM_OK || pLevel==0 );
  4438   4444   
  4439   4445       if( pLevel==0 ){
  4440   4446         /* Could not find any work to do. Finished. */
  4441   4447         break;
  4442   4448       }else{
  4443   4449         MergeWorker mergeworker;    /* State used to work on the level merge */
................................................................................
  4475   4481             ** database snapshot. This can only happen if all input keys were
  4476   4482             ** annihilated. Since keys are only annihilated if the new level
  4477   4483             ** is the last in the linked list (contains the most ancient of
  4478   4484             ** database content), this guarantees that pLevel->pNext==0.  */ 
  4479   4485   
  4480   4486             Level *pTop;          /* Top level of worker snapshot */
  4481   4487             Level **pp;           /* Read/write iterator for Level.pNext list */
         4488  +          int i;
  4482   4489             assert( pLevel->pNext==0 );
  4483   4490   
  4484   4491             /* Remove the level from the worker snapshot. */
  4485   4492             pTop = lsmDbSnapshotLevel(pWorker);
  4486   4493             for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext));
  4487   4494             *pp = pLevel->pNext;
  4488   4495             lsmDbSnapshotSetLevel(pWorker, pTop);
  4489   4496   
  4490   4497             /* Free the Level structure. */
  4491   4498             lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->lhs);
         4499  +          for(i=0; i<pLevel->nRight; i++){
         4500  +            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]);
         4501  +          }
  4492   4502             sortedFreeLevel(pDb->pEnv, pLevel);
  4493   4503           }else{
  4494   4504             int i;
  4495   4505   
  4496   4506             /* Free the separators of the next level, if required. */
  4497   4507             if( pLevel->pMerge->nInput > pLevel->nRight ){
  4498   4508               assert( pLevel->pNext->lhs.iRoot );
................................................................................
  4567   4577         bRet = 0;
  4568   4578       }
  4569   4579       *pRc = rc;
  4570   4580     }
  4571   4581     assert( *pRc==LSM_OK || bRet==0 );
  4572   4582     return bRet;
  4573   4583   }
         4584  +
         4585  +int lsmSaveWorker(lsm_db *pDb, int bFlush){
         4586  +  Snapshot *p = pDb->pWorker;
         4587  +  if( p->freelist.nEntry>pDb->nMaxFreelist ){
         4588  +    int rc = sortedNewToplevel(pDb, TREE_NONE, 0);
         4589  +    if( rc!=LSM_OK ) return rc;
         4590  +  }
         4591  +  return lsmCheckpointSaveWorker(pDb, bFlush);
         4592  +}
  4574   4593   
  4575   4594   static int doLsmSingleWork(
  4576   4595     lsm_db *pDb, 
  4577   4596     int bShutdown,
  4578         -  int flags, 
         4597  +  int nMerge,                     /* Minimum segments to merge together */
  4579   4598     int nPage,                      /* Number of pages to write to disk */
  4580   4599     int *pnWrite,                   /* OUT: Pages actually written to disk */
  4581   4600     int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
  4582   4601   ){
  4583   4602     int rc = LSM_OK;                /* Return code */
  4584   4603     int bDirty = 0;
  4585   4604     int nMax = nPage;               /* Maximum pages to write to disk */
................................................................................
  4595   4614     /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
  4596   4615     ** that this call stops writing when the auto-checkpoint is due. The
  4597   4616     ** caller will do the checkpoint, then possibly call this function again. */
  4598   4617     if( bShutdown==0 && pDb->nAutockpt ){
  4599   4618       u32 nSync;
  4600   4619       u32 nUnsync;
  4601   4620       int nPgsz;
  4602         -    int nMax;
  4603   4621   
  4604   4622       lsmCheckpointSynced(pDb, 0, 0, &nSync);
  4605   4623       nUnsync = lsmCheckpointNWrite(pDb->pShmhdr->aSnap1, 0);
  4606   4624       nPgsz = lsmCheckpointPgsz(pDb->pShmhdr->aSnap1);
  4607   4625   
  4608         -    nMax = (pDb->nAutockpt/nPgsz) - (nUnsync-nSync);
         4626  +    nMax = LSM_MIN(nMax, (pDb->nAutockpt/nPgsz) - (nUnsync-nSync));
  4609   4627       if( nMax<nRem ){
  4610   4628         bCkpt = 1;
  4611   4629         nRem = LSM_MAX(nMax, 0);
  4612   4630       }
  4613   4631     }
  4614   4632   
  4615   4633     /* If there exists in-memory data ready to be flushed to disk, attempt
................................................................................
  4617   4635     if( sortedTreeHasOld(pDb, &rc) ){
  4618   4636       /* sortedDbIsFull() returns non-zero if either (a) there are too many
  4619   4637       ** levels in total in the db, or (b) there are too many levels with the
  4620   4638       ** the same age in the db. Either way, call sortedWork() to merge 
  4621   4639       ** existing segments together until this condition is cleared.  */
  4622   4640       if( sortedDbIsFull(pDb) ){
  4623   4641         int nPg = 0;
  4624         -      rc = sortedWork(pDb, nRem, 0, 1, &nPg);
         4642  +      rc = sortedWork(pDb, nRem, nMerge, 1, &nPg);
  4625   4643         nRem -= nPg;
  4626   4644         assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
  4627   4645         bDirty = 1;
  4628   4646       }
  4629   4647   
  4630   4648       if( rc==LSM_OK && nRem>0 ){
  4631   4649         int nPg = 0;
  4632   4650         rc = sortedNewToplevel(pDb, TREE_OLD, &nPg);
  4633   4651         nRem -= nPg;
  4634   4652         if( rc==LSM_OK ){
  4635   4653           if( pDb->nTransOpen>0 ){
  4636   4654             lsmTreeDiscardOld(pDb);
  4637   4655           }
  4638         -        rc = lsmCheckpointSaveWorker(pDb, 1);
         4656  +        rc = lsmSaveWorker(pDb, 1);
  4639   4657           bDirty = 0;
  4640   4658         }
  4641   4659       }
  4642   4660     }
  4643   4661   
  4644   4662     /* If nPage is still greater than zero, do some merging. */
  4645   4663     if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
  4646   4664       int nPg = 0;
  4647         -    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
  4648         -    rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg);
         4665  +    rc = sortedWork(pDb, nRem, nMerge, 0, &nPg);
  4649   4666       nRem -= nPg;
  4650   4667       if( nPg ) bDirty = 1;
  4651   4668     }
  4652   4669   
  4653   4670     /* If the in-memory part of the free-list is too large, write a new 
  4654   4671     ** top-level containing just the in-memory free-list entries to disk. */
  4655   4672     if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
  4656   4673       int nPg = 0;
  4657   4674       while( rc==LSM_OK && sortedDbIsFull(pDb) ){
  4658         -      rc = sortedWork(pDb, 16, 0, 1, &nPg);
         4675  +      rc = sortedWork(pDb, 16, nMerge, 1, &nPg);
  4659   4676         nRem -= nPg;
  4660   4677       }
  4661   4678       if( rc==LSM_OK ){
  4662   4679         rc = sortedNewToplevel(pDb, TREE_NONE, &nPg);
  4663   4680       }
  4664   4681       nRem -= nPg;
  4665   4682       if( nPg ) bDirty = 1;
................................................................................
  4680   4697       if( pnWrite ) *pnWrite = 0;
  4681   4698       if( pbCkpt ) *pbCkpt = 0;
  4682   4699     }
  4683   4700   
  4684   4701     return rc;
  4685   4702   }
  4686   4703   
  4687         -static int doLsmWork(lsm_db *pDb, int flags, int nPage, int *pnWrite){
         4704  +static int doLsmWork(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){
  4688   4705     int rc;
  4689   4706     int nWrite = 0;
  4690   4707     int bCkpt = 0;
  4691   4708   
         4709  +  assert( nMerge>=1 );
  4692   4710     do {
  4693   4711       int nThis = 0;
  4694   4712       bCkpt = 0;
  4695         -    rc = doLsmSingleWork(pDb, 0, flags, nPage-nWrite, &nThis, &bCkpt);
         4713  +    rc = doLsmSingleWork(pDb, 0, nMerge, nPage-nWrite, &nThis, &bCkpt);
  4696   4714       nWrite += nThis;
  4697   4715       if( rc==LSM_OK && bCkpt ){
  4698   4716         rc = lsm_checkpoint(pDb, 0);
  4699   4717       }
  4700   4718     }while( rc==LSM_OK && (nWrite<nPage && bCkpt) );
  4701   4719   
  4702   4720     if( pnWrite ){
................................................................................
  4708   4726     }
  4709   4727     return rc;
  4710   4728   }
  4711   4729   
  4712   4730   /*
  4713   4731   ** Perform work to merge database segments together.
  4714   4732   */
  4715         -int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite){
         4733  +int lsm_work(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){
  4716   4734   
  4717   4735     /* This function may not be called if pDb has an open read or write
  4718   4736     ** transaction. Return LSM_MISUSE if an application attempts this.  */
  4719   4737     if( pDb->nTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT;
  4720   4738   
  4721         -  return doLsmWork(pDb, flags, nPage, pnWrite);
         4739  +  if( nMerge<=0 ) nMerge = pDb->nMerge;
         4740  +  return doLsmWork(pDb, nMerge, nPage, pnWrite);
  4722   4741   }
  4723   4742   
  4724   4743   int lsm_flush(lsm_db *db){
  4725   4744     int rc;
  4726   4745   
  4727   4746     if( db->nTransOpen>0 || db->pCsr ){
  4728   4747       rc = LSM_MISUSE_BKPT;
................................................................................
  4782   4801       int nRemaining;               /* Units of work to do before returning */
  4783   4802   
  4784   4803       nRemaining = nUnit * nDepth;
  4785   4804   #ifdef LSM_LOG_WORK
  4786   4805       lsmLogMessage(pDb, rc, "lsmSortedAutoWork(): %d*%d = %d pages", 
  4787   4806           nUnit, nDepth, nRemaining);
  4788   4807   #endif
  4789         -    rc = doLsmWork(pDb, 0, nRemaining, 0);
         4808  +    rc = doLsmWork(pDb, pDb->nMerge, nRemaining, 0);
  4790   4809       if( rc==LSM_BUSY ) rc = LSM_OK;
  4791   4810   
  4792   4811       if( bRestore && pDb->pCsr ){
  4793   4812         lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
  4794   4813         pDb->pClient = 0;
  4795   4814         rc = lsmCheckpointLoad(pDb, 0);
  4796   4815         if( rc==LSM_OK ){
................................................................................
  4810   4829   ** any in-memory trees present (old or current) are written out to disk.
  4811   4830   */
  4812   4831   int lsmFlushTreeToDisk(lsm_db *pDb){
  4813   4832     int rc;
  4814   4833   
  4815   4834     rc = lsmBeginWork(pDb);
  4816   4835     while( rc==LSM_OK && sortedDbIsFull(pDb) ){
  4817         -    rc = sortedWork(pDb, 256, 0, 1, 0);
         4836  +    rc = sortedWork(pDb, 256, pDb->nMerge, 1, 0);
  4818   4837     }
  4819   4838   
  4820   4839     if( rc==LSM_OK ){
  4821   4840       rc = sortedNewToplevel(pDb, TREE_BOTH, 0);
  4822   4841     }
  4823   4842     lsmFinishWork(pDb, 1, &rc);
  4824   4843     return rc;

Changes to test/ckpt1.test.

    72     72     INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  4K
    73     73     INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   --  8K
    74     74     INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 16K
    75     75     INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 32K
    76     76     INSERT INTO t1 SELECT randstr(100,100), randstr(100,100) FROM t1;   -- 64K
    77     77   }
    78     78   do_test 3.2 {
    79         -  sqlite4_lsm_work db main -optimize 1000000
           79  +  sqlite4_lsm_work db main -nmerge 1 -npage 1000000
    80     80     execsql { SELECT count(*) FROM t1 }
    81     81   } {65536}
    82     82   do_test 3.3 {
    83     83     db close
    84     84     sqlite4 db test.db
    85     85     execsql { SELECT count(*) FROM t1 }
    86     86   } {65536}
    87     87   do_test 3.4 {
    88     88     execsql { INSERT INTO t1 VALUES(randstr(100,100), randstr(100,100)) }
    89         -  sqlite4_lsm_work db main -optimize 1000000
           89  +  sqlite4_lsm_work db main -nmerge 1 -npage 1000000
    90     90     execsql { SELECT count(*) FROM t1 }
    91     91   } {65537}
    92     92   
    93     93   finish_test
    94     94   

Changes to test/lsm1.test.

    48     48   proc do_contents_test {tn res} {
    49     49     set con [contents]
    50     50     set res2 [list]
    51     51     foreach r $res {lappend res2 $r}
    52     52     uplevel do_test $tn [list [list set {} $con]] [list $res2]
    53     53   }
    54     54   
    55         -if 1 {
    56         -
    57     55   do_test 1.1 {
    58     56     reopen
    59     57     db write abc def
    60     58     db close
    61     59   } {}
    62     60   
    63     61   do_test 1.2 {
................................................................................
    88     86     db write ccc three
    89     87     db write ddd four
    90     88     db write eee five
    91     89     db write fff six
    92     90     reopen
    93     91     db delete_range a bbb
    94     92     reopen
           93  +breakpoint
    95     94     db work 10 
    96     95   } {1}
    97     96   
    98     97   do_contents_test 2.2 { {bbb two} {ccc three} {ddd four} {eee five} {fff six} }
    99     98   
   100     99   
   101    100   #-------------------------------------------------------------------------
................................................................................
   179    178   
   180    179     db config {nmerge 3}
   181    180     db work 10
   182    181   
   183    182     contents
   184    183   } {{10 ten} {17 seventeen} {20 twenty} {30 thirty} {40 forty}}
   185    184   
   186         -}
   187         -
   188    185   do_test 5.3 {
   189    186     reopen 1
   190    187     db config {nmerge 4}
   191    188   
   192    189     dbwrite { 10 ten    }  ; db flush
   193    190     dbwrite { 20 twenty }  ; db flush
   194    191     dbwrite { 30 thirty }  ; db flush

Added test/lsm2.test.

            1  +# 2012 November 02
            2  +#
            3  +# The author disclaims copyright to this source code.  In place of
            4  +# a legal notice, here is a blessing:
            5  +#
            6  +#    May you do good and not evil.
            7  +#    May you find forgiveness for yourself and forgive others.
            8  +#    May you share freely, never taking more than you give.
            9  +#
           10  +#***********************************************************************
           11  +#
           12  +set testdir [file dirname $argv0]
           13  +source $testdir/tester.tcl
           14  +set testprefix lsm2
           15  +db close
           16  +
           17  +expr srand(0)
           18  +
           19  +proc insert_data {nRow} {
           20  +  for {set i 0} {$i < $nRow} {incr i} {
           21  +    set key [string range [expr rand()] 0 10]
           22  +    set val [string repeat [string range [expr rand()] 0 10] 100]
           23  +    db write $key $val
           24  +  }
           25  +}
           26  +
           27  +do_test 1.1 {
           28  +  forcedelete test.db
           29  +  lsm_open db test.db [list mmap 0 block_size [expr 256*1024]]
           30  +  insert_data 10000
           31  +} {}
           32  +
           33  +do_test 1.2 {
           34  +  db flush
           35  +  db delete_range 0 9
           36  +  db flush
           37  +  db work 1 1000000
           38  +  db checkpoint
           39  +
           40  +  db begin  1
           41  +  insert_data 10000
           42  +  db commit 0
           43  +} {}
           44  +
           45  +finish_test

Changes to test/permutations.test.

   131    131   #
   132    132   lappend ::testsuitelist xxx
   133    133   
   134    134   test_suite "src4" -prefix "" -description {
   135    135   } -files {
   136    136     simple.test simple2.test
   137    137     log3.test 
   138         -  lsm1.test
          138  +  lsm1.test lsm2.test
   139    139     csr1.test
   140    140     ckpt1.test
   141    141     mc1.test
   142    142   
   143    143     aggerror.test
   144    144     attach.test
   145    145     autoindex1.test

Changes to test/test_lsm.c.

   139    139       Tcl_SetResult(interp, (char *)sqlite4TestErrorName(rc), TCL_STATIC);
   140    140       return TCL_ERROR;
   141    141     }
   142    142     return TCL_OK;
   143    143   }
   144    144   
   145    145   /*
   146         -** TCLCMD:    sqlite4_lsm_work DB DBNAME ?SWITCHES? ?N?
          146  +** TCLCMD:    sqlite4_lsm_work DB DBNAME ?-nmerge N? ?-npage N?
   147    147   */
   148    148   static int test_sqlite4_lsm_work(
   149    149     void * clientData,
   150    150     Tcl_Interp *interp,
   151    151     int objc,
   152    152     Tcl_Obj *CONST objv[]
   153    153   ){
   154    154     struct Switch {
   155    155       const char *zSwitch;
   156         -    int flags;
   157    156     } aSwitch[] = {
   158         -    { "-optimize",   LSM_WORK_OPTIMIZE }, 
   159         -    { 0, 0 }
          157  +    { "-nmerge" },
          158  +    { "-npage" },
          159  +    { 0 }
   160    160     };
   161    161   
   162         -  int flags = 0;
          162  +  int nMerge = 1;
   163    163     int nPage = 0;
   164    164     const char *zDb;
   165    165     const char *zName;
   166    166     int i;
   167    167     int rc;
   168    168     sqlite4 *db;
   169    169     lsm_db *pLsm;
   170    170     int nWork;
   171    171   
   172         -  if( objc<3 ){
   173         -    Tcl_WrongNumArgs(interp, 1, objv, "DB DBNAME ?SWITCHES? ?N?");
          172  +  if( objc!=3 && objc!=5 && objc!=7 ){
          173  +    Tcl_WrongNumArgs(interp, 1, objv, "DB DBNAME ?-nmerge N? ?-npage N?");
   174    174       return TCL_ERROR;
   175    175     }
   176    176     zDb = Tcl_GetString(objv[1]);
   177    177     zName = Tcl_GetString(objv[2]);
   178    178   
   179         -  for(i=3; i<objc; i++){
   180         -    const char *z = Tcl_GetString(objv[i]);
   181         -
   182         -    if( z[0]=='-' ){
   183         -      int iIdx;
   184         -      rc = Tcl_GetIndexFromObjStruct(
   185         -          interp, objv[i], aSwitch, sizeof(aSwitch[0]), "switch", 0, &iIdx
   186         -      );
   187         -      if( rc!=TCL_OK ) return rc;
   188         -      flags |= aSwitch[iIdx].flags;
   189         -    }else{
   190         -      rc = Tcl_GetIntFromObj(interp, objv[i], &nPage);
   191         -      if( rc!=TCL_OK ) return rc;
   192         -    }
          179  +  for(i=3; i<objc; i+=2){
          180  +    int iIdx;
          181  +    int iVal;
          182  +    rc = Tcl_GetIndexFromObjStruct(
          183  +        interp, objv[i], aSwitch, sizeof(aSwitch[0]), "switch", 0, &iIdx
          184  +        );
          185  +    if( rc!=TCL_OK ) return rc;
          186  +    rc = Tcl_GetIntFromObj(interp, objv[i+1], &iVal);
          187  +    if( rc!=TCL_OK ) return rc;
          188  +    if( iIdx==0 ) nMerge = iVal;
          189  +    if( iIdx==1 ) nPage = iVal;
   193    190     }
   194    191   
   195    192     rc = getDbPointer(interp, zDb, &db);
   196    193     if( rc!=TCL_OK ) return rc;
   197    194   
   198    195     rc = sqlite4_kvstore_control(db, zName, SQLITE4_KVCTRL_LSM_HANDLE, &pLsm);
   199    196     if( rc==SQLITE4_OK ){
   200         -    rc = lsm_work(pLsm, flags, nPage, &nWork);
          197  +    rc = lsm_work(pLsm, nMerge, nPage, &nWork);
   201    198     }
   202    199     if( rc!=SQLITE4_OK ){
   203    200       Tcl_SetResult(interp, (char *)sqlite4TestErrorName(rc), TCL_STATIC);
   204    201       return TCL_ERROR;
   205    202     }
   206    203   
   207    204     Tcl_SetObjResult(interp, Tcl_NewIntObj(nWork));
................................................................................
   516    513       /*  1 */ {"write",        2, "KEY VALUE"},
   517    514       /*  2 */ {"delete",       1, "KEY"},
   518    515       /*  3 */ {"delete_range", 2, "START-KEY END-KEY"},
   519    516       /*  4 */ {"begin",        1, "LEVEL"},
   520    517       /*  5 */ {"commit",       1, "LEVEL"},
   521    518       /*  6 */ {"rollback",     1, "LEVEL"},
   522    519       /*  7 */ {"csr_open",     1, "CSR"},
   523         -    /*  8 */ {"work",        -1, "NPAGE ?SWITCHES?"},
          520  +    /*  8 */ {"work",        -1, "?NMERGE? NPAGE"},
   524    521       /*  9 */ {"flush",        0, ""},
   525    522       /* 10 */ {"config",       1, "LIST"},
          523  +    /* 11 */ {"checkpoint",   0, ""},
   526    524       {0, 0, 0}
   527    525     };
   528    526     int iCmd;
   529    527     int rc;
   530    528     TclLsm *p = (TclLsm *)clientData;
   531    529   
   532    530     if( objc<2 ){
................................................................................
   618    616             (ClientData)pCsr, test_lsm_cursor_del
   619    617         );
   620    618         Tcl_SetObjResult(interp, objv[2]);
   621    619         return TCL_OK;
   622    620       }
   623    621   
   624    622       case 8: assert( 0==strcmp(aCmd[8].zCmd, "work") ); {
   625         -      int nWork;
          623  +      int nWork = 0;
          624  +      int nMerge = 1;
   626    625         int nWrite = 0;
   627         -      int flags = 0;
   628    626         int i;
   629    627   
   630         -      rc = Tcl_GetIntFromObj(interp, objv[2], &nWork);
          628  +      if( objc==3 ){
          629  +        rc = Tcl_GetIntFromObj(interp, objv[2], &nWork);
          630  +      }else if( objc==4 ){
          631  +        rc = Tcl_GetIntFromObj(interp, objv[2], &nMerge);
          632  +        if( rc!=TCL_OK ) return rc;
          633  +        rc = Tcl_GetIntFromObj(interp, objv[3], &nWork);
          634  +      }else{
          635  +        Tcl_WrongNumArgs(interp, 2, objv, "?NMERGE? NWRITE");
          636  +        return TCL_ERROR;
          637  +      }
   631    638         if( rc!=TCL_OK ) return rc;
   632    639   
   633         -      for(i=3; i<objc; i++){
   634         -        int iOpt;
   635         -        const char *azOpt[] = { "-optimize", "-flush", 0 };
   636         -
   637         -        rc = Tcl_GetIndexFromObj(interp, objv[i], azOpt, "option", 0, &iOpt);
   638         -        if( rc!=TCL_OK ) return rc;
   639         -
   640         -        if( iOpt==0 ) flags |= LSM_WORK_OPTIMIZE;
   641         -      }
   642         -
   643         -      rc = lsm_work(p->db, flags, nWork, &nWrite);
          640  +      rc = lsm_work(p->db, nMerge, nWork, &nWrite);
   644    641         if( rc!=LSM_OK ) return test_lsm_error(interp, "lsm_work", rc);
   645    642         Tcl_SetObjResult(interp, Tcl_NewIntObj(nWrite));
   646    643         return TCL_OK;
   647    644       }
   648    645   
   649    646       case 9: assert( 0==strcmp(aCmd[9].zCmd, "flush") ); {
   650    647         rc = lsm_flush(p->db);
   651    648         return test_lsm_error(interp, "lsm_flush", rc);
   652    649       }
   653    650   
   654    651       case 10: assert( 0==strcmp(aCmd[10].zCmd, "config") ); {
   655    652         return testConfigureLsm(interp, p->db, objv[2]);
   656    653       }
          654  +
          655  +    case 11: assert( 0==strcmp(aCmd[11].zCmd, "checkpoint") ); {
          656  +      rc = lsm_checkpoint(p->db, 0);
          657  +      return test_lsm_error(interp, "lsm_checkpoint", rc);
          658  +    }
   657    659   
   658    660       default:
   659    661         assert( 0 );
   660    662     }
   661    663   
   662    664     Tcl_AppendResult(interp, "internal error", 0);
   663    665     return TCL_ERROR;

Changes to test/tester.tcl.

  1518   1518   # a single b-tree structure. Because this annihilates all delete keys,
  1519   1519   # the next rowid allocated for each table with an IPK will be as expected
  1520   1520   # by SQLite 3 tests.
  1521   1521   #
  1522   1522   proc optimize_db {} { 
  1523   1523     #catch { 
  1524   1524       sqlite4_lsm_flush db main 
  1525         -    sqlite4_lsm_work db main -opt 100000 
         1525  +    sqlite4_lsm_work db main -nmerge 1 -npage 100000 
  1526   1526       sqlite4_lsm_checkpoint db main
  1527   1527     #}
  1528   1528     return ""
  1529   1529   }
  1530   1530   
  1531   1531   
  1532   1532   # If the library is compiled with the SQLITE4_DEFAULT_AUTOVACUUM macro set
  1533   1533   # to non-zero, then set the global variable $AUTOVACUUM to 1.
  1534   1534   set AUTOVACUUM $sqlite_options(default_autovacuum)
  1535   1535   
  1536   1536   source $testdir/malloc_common.tcl