SQLite4
Check-in [ebca1063ac]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Rework the free block list storage so that it scales properly. Currently some test cases fail.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | freelist-rework
Files: files | file ages | folders
SHA1: ebca1063ac267424900688999dd0cea022d9d360
User & Date: dan 2012-10-29 20:04:54
Context
2012-10-30
16:27
Fix a couple of problems in the code that handles free block lists. check-in: 5c3e17cc90 user: dan tags: freelist-rework
2012-10-29
20:04
Rework the free block list storage so that it scales properly. Currently some test cases fail. check-in: ebca1063ac user: dan tags: freelist-rework
09:19
Fix a couple of crashes and a memory leak in OOM tests. check-in: 90f46bd082 user: dan tags: trunk
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/lsmInt.h.

    69     69   #define LSM_CKPT_MIN_FREELIST     6
    70     70   #define LSM_CKPT_MAX_REFREE       2
    71     71   #define LSM_CKPT_MIN_NONLSM       (LSM_CKPT_MIN_FREELIST - LSM_CKPT_MAX_REFREE)
    72     72   
    73     73   typedef struct Database Database;
    74     74   typedef struct DbLog DbLog;
    75     75   typedef struct FileSystem FileSystem;
           76  +typedef struct Freelist Freelist;
           77  +typedef struct FreelistEntry FreelistEntry;
    76     78   typedef struct Level Level;
    77     79   typedef struct LogMark LogMark;
    78     80   typedef struct LogRegion LogRegion;
    79     81   typedef struct LogWriter LogWriter;
    80     82   typedef struct LsmString LsmString;
    81     83   typedef struct Mempool Mempool;
    82     84   typedef struct Merge Merge;
................................................................................
   318    320     int nTransOpen;                 /* Number of opened write transactions */
   319    321     int nTransAlloc;                /* Allocated size of aTrans[] array */
   320    322     TransMark *aTrans;              /* Array of marks for transaction rollback */
   321    323     IntArray rollback;              /* List of tree-nodes to roll back */
   322    324   
   323    325     /* Worker context */
   324    326     Snapshot *pWorker;              /* Worker snapshot (or NULL) */
          327  +  Freelist *pFreelist;            /* See sortedNewToplevel() */
          328  +  int bUseFreelist;               /* True to use pFreelist */
   325    329   
   326    330     /* Debugging message callback */
   327    331     void (*xLog)(void *, int, const char *);
   328    332     void *pLogCtx;
   329    333   
   330    334     /* Work done notification callback */
   331    335     void (*xWork)(lsm_db *, void *);
................................................................................
   446    450   };
   447    451   
   448    452   /* Return true if shm-sequence "a" is larger than or equal to "b" */
   449    453   #define shm_sequence_ge(a, b) (((u32)a-(u32)b) < (1<<30))
   450    454   
   451    455   #define LSM_APPLIST_SZ 4
   452    456   
   453         -typedef struct Freelist Freelist;
   454         -typedef struct FreelistEntry FreelistEntry;
   455         -
   456    457   /*
   457         -** An instance of the following structure stores the current database free
   458         -** block list. The free list is a list of blocks that are not currently
   459         -** used by the worker snapshot. Assocated with each block in the list is the
   460         -** snapshot id of the most recent snapshot that did actually use the block.
          458  +** An instance of the following structure stores the in-memory part of
          459  +** the current free block list. This structure is to the free block list
          460  +** as the in-memory tree is to the users database content. The contents 
          461  +** of the free block list is found by merging the in-memory components 
          462  +** with those stored in the LSM, just as the contents of the database is
          463  +** found by merging the in-memory tree with the user data entries in the
          464  +** LSM.
          465  +**
          466  +** Each FreelistEntry structure in the array represents either an insert
          467  +** or delete operation on the free-list. For deletes, the FreelistEntry.iId
          468  +** field is set to -1. For inserts, it is set to zero or greater. 
          469  +**
          470  +** The array of FreelistEntry structures is always sorted in order of
          471  +** block number (ascending).
          472  +**
          473  +** When the in-memory free block list is written into the LSM, each insert
          474  +** operation is written separately. The entry key is the bitwise inverse
          475  +** of the block number as a 32-bit big-endian integer. This is done so that
          476  +** the entries in the LSM are sorted in descending order of block id. 
          477  +** The associated value is the snapshot id, formated as a varint.
   461    478   */
   462    479   struct Freelist {
   463    480     FreelistEntry *aEntry;          /* Free list entries */
   464    481     int nEntry;                     /* Number of valid slots in aEntry[] */
   465    482     int nAlloc;                     /* Allocated size of aEntry[] */
   466    483   };
   467    484   struct FreelistEntry {
................................................................................
   687    704   
   688    705   /* 
   689    706   ** Functions from file "lsm_sorted.c".
   690    707   */
   691    708   int lsmInfoPageDump(lsm_db *, Pgno, int, char **);
   692    709   void lsmSortedCleanup(lsm_db *);
   693    710   int lsmSortedAutoWork(lsm_db *, int nUnit);
          711  +
          712  +int lsmSortedWalkFreelist(lsm_db *, int (*)(void *, int, i64), void *);
   694    713   
   695    714   int lsmFlushTreeToDisk(lsm_db *pDb);
   696    715   
   697    716   void lsmSortedRemap(lsm_db *pDb);
   698    717   
   699    718   void lsmSortedFreeLevel(lsm_env *pEnv, Level *);
   700    719   

Changes to src/lsm_ckpt.c.

   379    379     for(i=0; i<LSM_APPLIST_SZ; i++){
   380    380       ckptAppend64(p, piOut, aiAppend[i], pRc);
   381    381     }
   382    382   };
   383    383   
   384    384   static int ckptExportSnapshot( 
   385    385     lsm_db *pDb,                    /* Connection handle */
   386         -  int nOvfl,                      /* Number of free-list entries in LSM */
   387    386     int bLog,                       /* True to update log-offset fields */
   388    387     i64 iId,                        /* Checkpoint id */
   389    388     int bCksum,                     /* If true, include checksums */
   390    389     void **ppCkpt,                  /* OUT: Buffer containing checkpoint */
   391    390     int *pnCkpt                     /* OUT: Size of checkpoint in bytes */
   392    391   ){
   393    392     int rc = LSM_OK;                /* Return Code */
................................................................................
   395    394     Snapshot *pSnap = pDb->pWorker; /* Worker snapshot */
   396    395     int nLevel = 0;                 /* Number of levels in checkpoint */
   397    396     int iLevel;                     /* Used to count out nLevel levels */
   398    397     int iOut = 0;                   /* Current offset in aCkpt[] */
   399    398     Level *pLevel;                  /* Level iterator */
   400    399     int i;                          /* Iterator used while serializing freelist */
   401    400     CkptBuffer ckpt;
   402         -  int nFree;
   403         - 
   404         -  nFree = pSnap->freelist.nEntry;
   405         -  if( nOvfl>=0 ){
   406         -    nFree -=  nOvfl;
   407         -  }else{
   408         -    assert( 0 );
   409         -    nOvfl = pDb->pShmhdr->aSnap2[CKPT_HDR_OVFL];
   410         -  }
   411    401   
   412    402     /* Initialize the output buffer */
   413    403     memset(&ckpt, 0, sizeof(CkptBuffer));
   414    404     ckpt.pEnv = pDb->pEnv;
   415    405     iOut = CKPT_HDR_SIZE;
   416    406   
   417    407     /* Write the log offset into the checkpoint. */
................................................................................
   428    418     for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nLevel; pLevel=pLevel->pNext){
   429    419       ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
   430    420       iLevel++;
   431    421     }
   432    422   
   433    423     /* Write the freelist */
   434    424     if( rc==LSM_OK ){
          425  +    int nFree = pSnap->freelist.nEntry;
   435    426       ckptSetValue(&ckpt, iOut++, nFree, &rc);
   436    427       for(i=0; i<nFree; i++){
   437    428         FreelistEntry *p = &pSnap->freelist.aEntry[i];
   438    429         ckptSetValue(&ckpt, iOut++, p->iBlk, &rc);
   439    430         ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc);
   440    431         ckptSetValue(&ckpt, iOut++, p->iId & 0xFFFFFFFF, &rc);
   441    432       }
................................................................................
   446    437     ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
   447    438     ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
   448    439     ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
   449    440     ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
   450    441     ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
   451    442     ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
   452    443     ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
   453         -  ckptSetValue(&ckpt, CKPT_HDR_OVFL, (nOvfl?nOvfl:pSnap->nFreelistOvfl), &rc);
          444  +  ckptSetValue(&ckpt, CKPT_HDR_OVFL, 0, &rc);
   454    445     ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);
   455    446   
   456    447     if( bCksum ){
   457    448       ckptAddChecksum(&ckpt, iOut, &rc);
   458    449     }else{
   459    450       ckptSetValue(&ckpt, iOut, 0, &rc);
   460    451       ckptSetValue(&ckpt, iOut+1, 0, &rc);
   461    452     }
   462    453     iOut += 2;
   463    454     assert( iOut<=1024 );
   464    455   
   465    456   #ifdef LSM_LOG_FREELIST
   466    457     lsmLogMessage(pDb, rc, 
   467         -      "ckptExportSnapshot(): id=%d freelist: %d/%d", (int)iId, nFree, nOvfl
          458  +      "ckptExportSnapshot(): id=%lld freelist: %d", iId, pSnap->freelist.nEntry
   468    459     );
   469    460   #endif
   470    461   
   471    462     *ppCkpt = (void *)ckpt.aCkpt;
   472    463     if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut;
   473    464     return rc;
   474    465   }
................................................................................
   676    667   */
   677    668   int lsmCheckpointOverflow(
   678    669     lsm_db *pDb,                    /* Database handle (must hold worker lock) */
   679    670     void **ppVal,                   /* OUT: lsmMalloc'd buffer */
   680    671     int *pnVal,                     /* OUT: Size of *ppVal in bytes */
   681    672     int *pnOvfl                     /* OUT: Number of freelist entries in buf */
   682    673   ){
          674  +  assert( 0 );
          675  +#if 0
   683    676     int rc = LSM_OK;
   684    677     int nRet;
   685    678     Snapshot *p = pDb->pWorker;
   686    679   
   687    680     assert( lsmShmAssertWorker(pDb) );
   688    681     assert( pnOvfl && ppVal && pnVal );
   689    682     assert( pDb->nMaxFreelist>=2 && pDb->nMaxFreelist<=LSM_MAX_FREELIST_ENTRIES );
................................................................................
   717    710   
   718    711       *ppVal = ckpt.aCkpt;
   719    712       *pnVal = iOut*sizeof(u32);
   720    713     }
   721    714   
   722    715     *pnOvfl = nRet;
   723    716     return rc;
          717  +#endif
   724    718   }
   725    719   
   726    720   /*
   727    721   ** The connection must be the worker in order to call this function.
   728    722   **
   729    723   ** True is returned if there are currently too many free-list entries
   730    724   ** in-memory to store in a checkpoint. Before calling CheckpointSaveWorker()
   731    725   ** to save the current worker snapshot, a new top-level LSM segment must
   732    726   ** be created so that some of them can be written to the LSM. 
   733    727   */
   734    728   int lsmCheckpointOverflowRequired(lsm_db *pDb){
   735    729     Snapshot *p = pDb->pWorker;
          730  +  assert( 0 );
   736    731     assert( lsmShmAssertWorker(pDb) );
   737    732     return (p->freelist.nEntry > pDb->nMaxFreelist || p->nFreelistOvfl>0);
   738    733   }
   739    734   
   740    735   /*
   741    736   ** Connection pDb must be the worker to call this function.
   742    737   **
................................................................................
   743    738   ** Load the FREELIST record from the database. Decode it and append the
   744    739   ** results to list pFreelist.
   745    740   */
   746    741   int lsmCheckpointOverflowLoad(
   747    742     lsm_db *pDb,
   748    743     Freelist *pFreelist
   749    744   ){
          745  +  assert( 0 );
          746  +#if 0
   750    747     int rc;
   751    748     int nVal = 0;
   752    749     void *pVal = 0;
   753    750     assert( lsmShmAssertWorker(pDb) );
   754    751   
   755    752     /* Load the blob of data from the LSM. If that is successful (and the
   756    753     ** blob is greater than zero bytes in size), decode the contents and
................................................................................
   807    804   #endif
   808    805       }
   809    806   
   810    807       lsmFree(pDb->pEnv, pVal);
   811    808     }
   812    809   
   813    810     return rc;
          811  +#endif
   814    812   }
   815    813   
   816    814   /*
   817    815   ** Read the checkpoint id from meta-page pPg.
   818    816   */
   819    817   static i64 ckptLoadId(MetaPage *pPg){
   820    818     i64 ret = 0;
................................................................................
  1150   1148   int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){
  1151   1149     Snapshot *pSnap = pDb->pWorker;
  1152   1150     ShmHeader *pShm = pDb->pShmhdr;
  1153   1151     void *p = 0;
  1154   1152     int n = 0;
  1155   1153     int rc;
  1156   1154   
  1157         -#if 0
  1158         -if( bFlush ){
  1159         -  printf("pushing %p tree to %d\n", (void *)pDb, pSnap->iId+1);
  1160         -  fflush(stdout);
  1161         -}
  1162         -#endif
  1163         -  assert( lsmFsIntegrityCheck(pDb) );
  1164         -  rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n);
         1155  +  rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId+1, 1, &p, &n);
  1165   1156     if( rc!=LSM_OK ) return rc;
  1166   1157     assert( ckptChecksumOk((u32 *)p) );
  1167   1158   
  1168   1159     assert( n<=LSM_META_PAGE_SIZE );
  1169   1160     memcpy(pShm->aSnap2, p, n);
  1170   1161     lsmShmBarrier(pDb);
  1171   1162     memcpy(pShm->aSnap1, p, n);
  1172   1163     lsmFree(pDb->pEnv, p);
  1173   1164   
         1165  +  assert( lsmFsIntegrityCheck(pDb) );
  1174   1166     return LSM_OK;
  1175   1167   }
  1176   1168   
  1177   1169   /*
  1178   1170   ** This function is used to determine the snapshot-id of the most recently
  1179   1171   ** checkpointed snapshot. Variable ShmHeader.iMetaPage indicates which of
  1180   1172   ** the two meta-pages said snapshot resides on (if any). 

Changes to src/lsm_file.c.

  2161   2161         if( bExtra && iLast==fsLastPageOnPagesBlock(pFS, iLast) ){
  2162   2162           fsBlockNext(pFS, iLastBlk, &iBlk);
  2163   2163           aUsed[iBlk-1] = 1;
  2164   2164         }
  2165   2165       }
  2166   2166     }
  2167   2167   }
         2168  +
         2169  +typedef struct CheckFreelistCtx CheckFreelistCtx;
         2170  +typedef struct CheckFreelistCtx {
         2171  +  u8 *aUsed;
         2172  +  int nBlock;
         2173  +};
         2174  +static int checkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
         2175  +  CheckFreelistCtx *p = (CheckFreelistCtx *)pCtx;
         2176  +
         2177  +  assert( iBlk>=1 );
         2178  +  assert( iBlk<=p->nBlock );
         2179  +  assert( p->aUsed[iBlk-1]==0 );
         2180  +  p->aUsed[iBlk-1] = 1;
         2181  +
         2182  +  return 0;
         2183  +}
  2168   2184   
  2169   2185   /*
  2170   2186   ** This function checks that all blocks in the database file are accounted
  2171   2187   ** for. For each block, exactly one of the following must be true:
  2172   2188   **
  2173   2189   **   + the block is part of a sorted run, or
  2174   2190   **   + the block is on the free-block list
................................................................................
  2176   2192   ** This function also checks that there are no references to blocks with
  2177   2193   ** out-of-range block numbers.
  2178   2194   **
  2179   2195   ** If no errors are found, non-zero is returned. If an error is found, an
  2180   2196   ** assert() fails.
  2181   2197   */
  2182   2198   int lsmFsIntegrityCheck(lsm_db *pDb){
         2199  +  CheckFreelistCtx ctx;
  2183   2200     FileSystem *pFS = pDb->pFS;
  2184   2201     int i;
  2185   2202     int j;
  2186   2203     Freelist freelist = {0, 0, 0};
  2187   2204     u8 *aUsed;
  2188   2205     Level *pLevel;
  2189   2206     Snapshot *pWorker = pDb->pWorker;
  2190   2207     int nBlock = pWorker->nBlock;
  2191   2208   
  2192   2209     aUsed = lsmMallocZero(pDb->pEnv, nBlock);
  2193   2210     if( aUsed==0 ){
  2194   2211       /* Malloc has failed. Since this function is only called within debug
  2195         -     ** builds, this probably means the user is running an OOM injection test.
  2196         -     ** Regardless, it will not be possible to run the integrity-check at this
  2197         -     ** time, so assume the database is Ok and return non-zero. */
         2212  +    ** builds, this probably means the user is running an OOM injection test.
         2213  +    ** Regardless, it will not be possible to run the integrity-check at this
         2214  +    ** time, so assume the database is Ok and return non-zero. */
  2198   2215       return 1;
  2199   2216     }
  2200   2217   
  2201   2218     for(pLevel=pWorker->pLevel; pLevel; pLevel=pLevel->pNext){
  2202   2219       int i;
  2203   2220       checkBlocks(pFS, &pLevel->lhs, (pLevel->nRight!=0), nBlock, aUsed);
  2204   2221       for(i=0; i<pLevel->nRight; i++){
  2205   2222         checkBlocks(pFS, &pLevel->aRhs[i], 0, nBlock, aUsed);
  2206   2223       }
  2207   2224     }
  2208   2225   
  2209         -  if( pWorker->nFreelistOvfl ){
  2210         -    int rc = lsmCheckpointOverflowLoad(pDb, &freelist);
  2211         -    assert( rc==LSM_OK || rc==LSM_NOMEM );
  2212         -    if( rc!=LSM_OK ) return 1;
  2213         -  }
  2214         -
  2215         -  for(j=0; j<2; j++){
  2216         -    Freelist *pFreelist;
  2217         -    if( j==0 ) pFreelist = &pWorker->freelist;
  2218         -    if( j==1 ) pFreelist = &freelist;
  2219         -
  2220         -    for(i=0; i<pFreelist->nEntry; i++){
  2221         -      u32 iBlk = pFreelist->aEntry[i].iBlk;
  2222         -      assert( iBlk<=nBlock );
  2223         -      assert( aUsed[iBlk-1]==0 );
  2224         -      aUsed[iBlk-1] = 1;
  2225         -    }
  2226         -  }
         2226  +  /* Mark all blocks in the free-list as used */
         2227  +  ctx.aUsed = aUsed;
         2228  +  ctx.nBlock = nBlock;
         2229  +  lsmWalkFreelist(pDb, checkFreelistCb, (void *)&ctx);
  2227   2230   
  2228   2231     for(i=0; i<nBlock; i++) assert( aUsed[i]==1 );
  2229         -
  2230   2232     lsmFree(pDb->pEnv, aUsed);
  2231   2233     lsmFree(pDb->pEnv, freelist.aEntry);
  2232   2234   
  2233   2235     return 1;
  2234   2236   }
  2235   2237   
  2236   2238   #ifndef NDEBUG

Changes to src/lsm_shared.c.

    80     80     }
    81     81   }
    82     82   #else
    83     83   # define assertNotInFreelist(x,y)
    84     84   #endif
    85     85   
    86     86   /*
    87         -** Append an entry to the free-list.
           87  +** Append an entry to the free-list. If (iId==-1), this is a delete.
    88     88   */
    89         -int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){
           89  +int freelistAppend(lsm_db *db, int iBlk, i64 iId){
           90  +  lsm_env *pEnv = db->pEnv;
           91  +  Freelist *p;
           92  +  int i; 
    90     93   
    91         -  /* Assert that this is not an attempt to insert a duplicate block number */
    92         -#if 0
    93         -  assertNotInFreelist(p, iBlk);
    94         -#endif
           94  +  assert( iId==-1 || iId>=0 );
           95  +  p = db->bUseFreelist ? db->pFreelist : &db->pWorker->freelist;
    95     96   
    96     97     /* Extend the space allocated for the freelist, if required */
    97     98     assert( p->nAlloc>=p->nEntry );
    98     99     if( p->nAlloc==p->nEntry ){
    99    100       int nNew; 
          101  +    int nByte; 
   100    102       FreelistEntry *aNew;
   101    103   
   102    104       nNew = (p->nAlloc==0 ? 4 : p->nAlloc*2);
   103         -    aNew = (FreelistEntry *)lsmRealloc(pEnv, p->aEntry,
   104         -                                       sizeof(FreelistEntry)*nNew);
          105  +    nByte = sizeof(FreelistEntry) * nNew;
          106  +    aNew = (FreelistEntry *)lsmRealloc(pEnv, p->aEntry, nByte);
   105    107       if( !aNew ) return LSM_NOMEM_BKPT;
   106    108       p->nAlloc = nNew;
   107    109       p->aEntry = aNew;
   108    110     }
   109    111   
   110         -  /* Append the new entry to the freelist */
   111         -  p->aEntry[p->nEntry].iBlk = iBlk;
   112         -  p->aEntry[p->nEntry].iId = iId;
   113         -  p->nEntry++;
          112  +  for(i=0; i<p->nEntry; i++){
          113  +    assert( i==0 || p->aEntry[i].iBlk > p->aEntry[i-1].iBlk );
          114  +    if( p->aEntry[i].iBlk>=iBlk ) break;
          115  +  }
          116  +
          117  +  if( i<p->nEntry && p->aEntry[i].iBlk==iBlk ){
          118  +    /* Clobber an existing entry */
          119  +    p->aEntry[i].iId = iId;
          120  +  }else{
          121  +    /* Insert a new entry into the list */
          122  +    int nByte = sizeof(FreelistEntry)*(p->nEntry-i);
          123  +    memmove(&p->aEntry[i+1], &p->aEntry[i], nByte);
          124  +    p->aEntry[i].iBlk = iBlk;
          125  +    p->aEntry[i].iId = iId;
          126  +    p->nEntry++;
          127  +  }
   114    128   
   115    129     return LSM_OK;
   116    130   }
   117    131   
   118         -static int flInsertEntry(lsm_env *pEnv, Freelist *p, int iBlk){
   119         -  int rc;
   120         -
   121         -  rc = lsmFreelistAppend(pEnv, p, iBlk, 1);
   122         -  if( rc==LSM_OK ){
   123         -    memmove(&p->aEntry[1], &p->aEntry[0], sizeof(FreelistEntry)*(p->nEntry-1));
   124         -    p->aEntry[0].iBlk = iBlk;
   125         -    p->aEntry[0].iId = 1;
   126         -  }
   127         -  return rc;
   128         -}
   129         -
   130    132   /*
   131         -** Remove the first entry of the free-list.
   132         -*/
   133         -static void flRemoveEntry0(Freelist *p){
   134         -  int nNew = p->nEntry - 1;
   135         -  assert( nNew>=0 );
   136         -  memmove(&p->aEntry[0], &p->aEntry[1], sizeof(FreelistEntry) * nNew);
   137         -  p->nEntry = nNew;
   138         -}
   139         -
   140         -/*
   141         -** tHIS Function frees all resources held by the Database structure passed
          133  +** This function frees all resources held by the Database structure passed
   142    134   ** as the only argument.
   143    135   */
   144    136   static void freeDatabase(lsm_env *pEnv, Database *p){
   145    137     assert( holdingGlobalMutex(pEnv) );
   146    138     if( p ){
   147    139       /* Free the mutexes */
   148    140       lsmMutexDel(pEnv, p->pClientMutex);
................................................................................
   424    416     return pSnapshot->pLevel;
   425    417   }
   426    418   
   427    419   void lsmDbSnapshotSetLevel(Snapshot *pSnap, Level *pLevel){
   428    420     pSnap->pLevel = pLevel;
   429    421   }
   430    422   
          423  +/* TODO: Shuffle things around to get rid of this */
          424  +static int firstSnapshotInUse(lsm_db *, i64 *);
          425  +
          426  +/* 
          427  +** Context object used by the lsmWalkFreelist() utility. 
          428  +*/
          429  +typedef struct WalkFreelistCtx WalkFreelistCtx;
          430  +struct WalkFreelistCtx {
          431  +  lsm_db *pDb;
          432  +  Freelist *pFreelist;
          433  +  int iFree;
          434  +  int (*xUsr)(void *, int, i64);  /* User callback function */
          435  +  void *pUsrctx;                  /* User callback context */
          436  +};
          437  +
          438  +/* 
          439  +** Callback used by lsmWalkFreelist().
          440  +*/
          441  +static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
          442  +  WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
          443  +  Freelist *pFree = p->pFreelist;
          444  +
          445  +  while( (p->iFree < pFree->nEntry) ){
          446  +    FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
          447  +    if( pEntry->iBlk>iBlk ){
          448  +      break;
          449  +    }else{
          450  +      p->iFree++;
          451  +      if( pEntry->iId>=0 
          452  +       && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) 
          453  +      ){
          454  +        return 1;
          455  +      }
          456  +      if( pEntry->iBlk==iBlk ) return 0;
          457  +    }
          458  +  }
          459  +
          460  +  return p->xUsr(p->pUsrctx, iBlk, iSnapshot);
          461  +}
          462  +
          463  +/*
          464  +** The database handle passed as the first argument must be the worker
          465  +** connection. This function iterates through the contents of the current
          466  +** free block list, invoking the supplied callback once for each list
          467  +** element.
          468  +**
          469  +** The difference between this function and lsmSortedWalkFreelist() is
          470  +** that lsmSortedWalkFreelist() only considers those free-list elements
          471  +** stored within the LSM. This function also merges in any in-memory 
          472  +** elements.
          473  +*/
          474  +int lsmWalkFreelist(
          475  +  lsm_db *pDb,                    /* Database handle (must be worker) */
          476  +  int (*x)(void *, int, i64),     /* Callback function */
          477  +  void *pCtx                      /* First argument to pass to callback */
          478  +){
          479  +  int rc;
          480  +  WalkFreelistCtx ctx;
          481  +  ctx.pDb = pDb;
          482  +  ctx.pFreelist = &pDb->pWorker->freelist;
          483  +  ctx.iFree = 0;
          484  +  ctx.xUsr = x;
          485  +  ctx.pUsrctx = pCtx;
          486  +
          487  +  rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx);
          488  +  if( rc==LSM_OK ){
          489  +    int i;
          490  +    for(i=ctx.iFree; i<ctx.pFreelist->nEntry; i++){
          491  +      FreelistEntry *pEntry = &ctx.pFreelist->aEntry[i];
          492  +      if( pEntry->iId>=0 && ctx.xUsr(ctx.pUsrctx, pEntry->iBlk, pEntry->iId) ){
          493  +        return 1;
          494  +      }
          495  +    }
          496  +  }
          497  +
          498  +  return rc;
          499  +}
          500  +
          501  +typedef struct FindFreeblockCtx FindFreeblockCtx;
          502  +struct FindFreeblockCtx {
          503  +  i64 iInUse;
          504  +  int iRet;
          505  +};
          506  +
          507  +static int findFreeblockCb(void *pCtx, int iBlk, i64 iSnapshot){
          508  +  FindFreeblockCtx *p = (FindFreeblockCtx *)pCtx;
          509  +  if( iSnapshot<p->iInUse ){
          510  +    p->iRet = iBlk;
          511  +    return 1;
          512  +  }
          513  +  return 0;
          514  +}
          515  +
          516  +static int findFreeblock(lsm_db *pDb, i64 iInUse, int *piRet){
          517  +  int rc;                         /* Return code */
          518  +  FindFreeblockCtx ctx;           /* Context object */
          519  +
          520  +  ctx.iInUse = iInUse;
          521  +  ctx.iRet = 0;
          522  +  rc = lsmWalkFreelist(pDb, findFreeblockCb, (void *)&ctx);
          523  +  *piRet = ctx.iRet;
          524  +  return rc;
          525  +}
   431    526   
   432    527   /*
   433    528   ** Allocate a new database file block to write data to, either by extending
   434    529   ** the database file or by recycling a free-list entry. The worker snapshot 
   435    530   ** must be held in order to call this function.
   436    531   **
   437    532   ** If successful, *piBlk is set to the block number allocated and LSM_OK is
   438    533   ** returned. Otherwise, *piBlk is zeroed and an lsm error code returned.
   439    534   */
   440    535   int lsmBlockAllocate(lsm_db *pDb, int *piBlk){
   441    536     Snapshot *p = pDb->pWorker;
   442         -  Freelist *pFree;                /* Database free list */
   443    537     int iRet = 0;                   /* Block number of allocated block */
   444    538     int rc = LSM_OK;
   445         -
   446         -  assert( pDb->pWorker );
   447         - 
   448         -  pFree = &p->freelist;
   449         -  if( pFree->nEntry>0 ){
   450         -    /* The first block on the free list was freed as part of the work done
   451         -    ** to create the snapshot with id iFree. So, we can reuse this block if
   452         -    ** snapshot iFree or later has been checkpointed and all currently 
   453         -    ** active clients are reading from snapshot iFree or later.  */
   454         -    i64 iFree = pFree->aEntry[0].iId;
   455         -    int bInUse = 0;
   456         -
   457         -    /* The "is in use" bit */
   458         -    rc = lsmLsmInUse(pDb, iFree, &bInUse);
   459         -
   460         -    /* The "has been checkpointed" bit */
   461         -    if( rc==LSM_OK && bInUse==0 ){
   462         -      i64 iId = 0;
   463         -      rc = lsmCheckpointSynced(pDb, &iId, 0, 0);
   464         -      if( rc!=LSM_OK || iId<iFree ) bInUse = 2;
   465         -    }
   466         -
   467         -    if( rc==LSM_OK && bInUse==0 ){
   468         -      iRet = pFree->aEntry[0].iBlk;
   469         -      flRemoveEntry0(pFree);
   470         -      assert( iRet!=0 );
   471         -    }
          539  +  i64 iInUse = 0;                 /* Snapshot id still in use */
          540  +
          541  +  assert( p );
          542  +
          543  +  /* Set iInUse to the smallest snapshot id that is either:
          544  +  **
          545  +  **   * Currently in use by a database client,
          546  +  **   * May be used by a database client in the future, or
          547  +  **   * Is the most recently checkpointed snapshot (i.e. the one that will
          548  +  **     be used following recovery if a failure occurs at this point).
          549  +  */
          550  +  rc = lsmCheckpointSynced(pDb, &iInUse, 0, 0);
          551  +  if( rc==LSM_OK && iInUse==0 ) iInUse = p->iId;
          552  +  if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);
          553  +
          554  +  /* Query the free block list for a suitable block */
          555  +  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);
          556  +
          557  +  /* If a block was found in the free block list, use it and remove it from 
          558  +  ** the list. Otherwise, if no suitable block was found, allocate one from
          559  +  ** the end of the file.  */
          560  +  if( rc==LSM_OK ){
          561  +    if( iRet>0 ){
          562  +#ifdef LSM_LOG_BLOCKS
          563  +      lsmLogMessage(pDb, 0, "reusing block %d", iRet);
          564  +#endif
          565  +      rc = freelistAppend(pDb, iRet, -1);
          566  +    }else{
   472    567   #ifdef LSM_LOG_BLOCKS
   473         -    lsmLogMessage(
   474         -        pDb, 0, "%s reusing block %d%s", (iRet==0 ? "not " : ""),
   475         -        pFree->aEntry[0].iBlk, 
   476         -        bInUse==0 ? "" : bInUse==1 ? " (client)" : " (unsynced)"
   477         -    );
          568  +      lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
   478    569   #endif
          570  +      iRet = ++(p->nBlock);
          571  +    }
   479    572     }
   480    573   
   481         -  /* If no block was allocated from the free-list, allocate one at the
   482         -  ** end of the file. */
   483         -  if( rc==LSM_OK && iRet==0 ){
   484         -    iRet = ++pDb->pWorker->nBlock;
   485         -#ifdef LSM_LOG_BLOCKS
   486         -    lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
   487         -#endif
   488         -  }
   489         -
          574  +  assert( iRet>0 || rc!=LSM_OK );
   490    575     *piBlk = iRet;
   491         -  return LSM_OK;
          576  +  return rc;
   492    577   }
   493    578   
   494    579   /*
   495    580   ** Free a database block. The worker snapshot must be held in order to call 
   496    581   ** this function.
   497    582   **
   498    583   ** If successful, LSM_OK is returned. Otherwise, an lsm error code (e.g. 
   499    584   ** LSM_NOMEM).
   500    585   */
   501    586   int lsmBlockFree(lsm_db *pDb, int iBlk){
   502    587     Snapshot *p = pDb->pWorker;
   503         -
   504    588     assert( lsmShmAssertWorker(pDb) );
   505         -  /* TODO: Should assert() that lsmCheckpointOverflow() has not been called */
          589  +
   506    590   #ifdef LSM_LOG_FREELIST
   507    591     lsmLogMessage(pDb, LSM_OK, "lsmBlockFree(): Free block %d", iBlk);
   508    592   #endif
   509    593   
   510         -  return lsmFreelistAppend(pDb->pEnv, &p->freelist, iBlk, p->iId);
          594  +  return freelistAppend(pDb, iBlk, p->iId);
   511    595   }
   512    596   
   513    597   /*
   514    598   ** Refree a database block. The worker snapshot must be held in order to call 
   515    599   ** this function.
   516    600   **
   517    601   ** Refreeing is required when a block is allocated using lsmBlockAllocate()
................................................................................
   523    607   int lsmBlockRefree(lsm_db *pDb, int iBlk){
   524    608     int rc = LSM_OK;                /* Return code */
   525    609     Snapshot *p = pDb->pWorker;
   526    610   
   527    611     if( iBlk==p->nBlock ){
   528    612       p->nBlock--;
   529    613     }else{
   530         -    rc = flInsertEntry(pDb->pEnv, &p->freelist, iBlk);
          614  +    rc = freelistAppend(pDb, iBlk, 0);
   531    615     }
   532    616   
   533    617     return rc;
   534    618   }
   535    619   
   536    620   /*
   537    621   ** If required, copy a database checkpoint from shared memory into the
................................................................................
   958   1042     if( rc==LSM_BUSY ){
   959   1043       *pbInUse = 1;
   960   1044       return LSM_OK;
   961   1045     }
   962   1046     *pbInUse = 0;
   963   1047     return rc;
   964   1048   }
         1049  +
         1050  +/*
         1051  +** This function is called by worker connections to determine the smallest
         1052  +** snapshot id that is currently in use by a database client. The worker
         1053  +** connection uses this result to determine whether or not it is safe to
         1054  +** recycle a database block.
         1055  +*/
         1056  +static int firstSnapshotInUse(
         1057  +  lsm_db *db,                     /* Database handle */
         1058  +  i64 *piInUse                    /* IN/OUT: Smallest snapshot id in use */
         1059  +){
         1060  +  ShmHeader *pShm = db->pShmhdr;
         1061  +  i64 iInUse = *piInUse;
         1062  +  int i;
         1063  +
         1064  +  assert( iInUse>0 );
         1065  +  for(i=0; i<LSM_LOCK_NREADER; i++){
         1066  +    ShmReader *p = &pShm->aReader[i];
         1067  +    if( p->iLsmId ){
         1068  +      i64 iThis = p->iLsmId;
         1069  +      if( iThis!=0 && iInUse>iThis ){
         1070  +        int rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
         1071  +        if( rc==LSM_OK ){
         1072  +          p->iLsmId = 0;
         1073  +          lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
         1074  +        }else if( rc==LSM_BUSY ){
         1075  +          iInUse = iThis;
         1076  +        }else{
         1077  +          /* Some error other than LSM_BUSY. Return the error code to
         1078  +          ** the caller in this case.  */
         1079  +          return rc;
         1080  +        }
         1081  +      }
         1082  +    }
         1083  +  }
         1084  +
         1085  +  return LSM_OK;
         1086  +}
   965   1087   
   966   1088   int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){
   967   1089     if( db->treehdr.iUsedShmid==iShmid ){
   968   1090       *pbInUse = 1;
   969   1091       return LSM_OK;
   970   1092     }
   971   1093     return isInUse(db, 0, iShmid, pbInUse);

Changes to src/lsm_sorted.c.

   192    192   
   193    193     int eType;                      /* Cache of current key type */
   194    194     Blob key;                       /* Cache of current key (or NULL) */
   195    195     Blob val;                       /* Cache of current value */
   196    196   
   197    197     /* All the component cursors: */
   198    198     TreeCursor *apTreeCsr[2];       /* Up to two tree cursors */
          199  +  int iFree;                      /* Next element of free-list (-ve for eof) */
   199    200     SegmentPtr *aPtr;               /* Array of segment pointers */
   200    201     int nPtr;                       /* Size of array aPtr[] */
   201    202     BtreeCursor *pBtCsr;            /* b-tree cursor (db writes only) */
   202    203   
   203    204     /* Comparison results */
   204    205     int nTree;                      /* Size of aTree[] array */
   205    206     int *aTree;                     /* Array of comparison results */
................................................................................
   208    209     int *pnOvfl;                    /* Number of free-list entries to store */
   209    210     void *pSystemVal;               /* Pointer to buffer to free */
   210    211   
   211    212     /* Used by worker cursors only */
   212    213     Pgno *pPrevMergePtr;
   213    214   };
   214    215   
   215         -#define CURSOR_DATA_TREE0     0   /* Current tree cursor */
   216         -#define CURSOR_DATA_TREE1     1   /* The "old" tree, if any */
   217         -#define CURSOR_DATA_SYSTEM    2
          216  +/*
          217  +** The following constants are used to assign integers to each component
          218  +** cursor of a multi-cursor.
          219  +*/
          220  +#define CURSOR_DATA_TREE0     0   /* Current tree cursor (apTreeCsr[0]) */
          221  +#define CURSOR_DATA_TREE1     1   /* The "old" tree, if any (apTreeCsr[1]) */
          222  +#define CURSOR_DATA_SYSTEM    2   /* Free-list entries (new-toplevel only) */
   218    223   #define CURSOR_DATA_SEGMENT   3
   219    224   
   220    225   /*
   221    226   ** CURSOR_IGNORE_DELETE
   222    227   **   If set, this cursor will not visit SORTED_DELETE keys.
   223    228   **
   224         -** CURSOR_NEW_SYSTEM
   225         -**   If set, then after all user data from the in-memory tree and any other
   226         -**   cursors has been visited, the cursor visits the live (uncommitted) 
   227         -**   versions of the two system keys: FREELIST AND LEVELS. This is used when 
   228         -**   flushing the in-memory tree to disk - the new free-list and levels record
   229         -**   are flushed along with it.
   230         -**
   231         -** CURSOR_AT_FREELIST
   232         -**   This flag is set when sub-cursor CURSOR_DATA_SYSTEM is actually
   233         -**   pointing at a free list.
          229  +** CURSOR_FLUSH_FREELIST
          230  +**   This cursor is being used to create a new toplevel. It should also 
          231  +**   iterate through the contents of the in-memory free block list.
   234    232   **
   235    233   ** CURSOR_IGNORE_SYSTEM
   236    234   **   If set, this cursor ignores system keys.
   237    235   **
   238    236   ** CURSOR_NEXT_OK
   239    237   **   Set if it is Ok to call lsm_csr_next().
   240    238   **
................................................................................
   247    245   **
   248    246   ** CURSOR_SEEK_EQ
   249    247   **   Cursor has undergone a successful lsm_csr_seek(LSM_SEEK_EQ) operation.
   250    248   **   The key and value are stored in MultiCursor.key and MultiCursor.val
   251    249   **   respectively.
   252    250   */
   253    251   #define CURSOR_IGNORE_DELETE    0x00000001
   254         -#define CURSOR_NEW_SYSTEM       0x00000002
   255         -#define CURSOR_AT_FREELIST      0x00000004
          252  +#define CURSOR_FLUSH_FREELIST   0x00000002
   256    253   #define CURSOR_IGNORE_SYSTEM    0x00000010
   257    254   #define CURSOR_NEXT_OK          0x00000020
   258    255   #define CURSOR_PREV_OK          0x00000040
   259    256   #define CURSOR_READ_SEPARATORS  0x00000080
   260    257   #define CURSOR_SEEK_EQ          0x00000100
   261    258   
   262    259   typedef struct MergeWorker MergeWorker;
................................................................................
  1907   1904   
  1908   1905           lsmTreeCursorKey(pTreeCsr, &eType, &pKey, &nKey);
  1909   1906           lsmTreeCursorValue(pTreeCsr, &pVal, &nVal);
  1910   1907         }
  1911   1908         break;
  1912   1909       }
  1913   1910   
  1914         -    case CURSOR_DATA_SYSTEM:
  1915         -      if( pCsr->flags & CURSOR_AT_FREELIST ){
  1916         -        pKey = (void *)"FREELIST";
  1917         -        nKey = 8;
  1918         -        eType = LSM_SYSTEMKEY | LSM_INSERT;
         1911  +    case CURSOR_DATA_SYSTEM: {
         1912  +      Snapshot *pWorker = pCsr->pDb->pWorker;
         1913  +      if( (pCsr->flags & CURSOR_FLUSH_FREELIST) 
         1914  +       && pWorker && pWorker->freelist.nEntry > pCsr->iFree 
         1915  +      ){
         1916  +        int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1;
         1917  +        FreelistEntry *pEntry = &pWorker->freelist.aEntry[iEntry];
         1918  +        u32 i = ~((u32)(pEntry->iBlk));
         1919  +        lsmPutU32(pCsr->pSystemVal, i);
         1920  +        pKey = pCsr->pSystemVal;
         1921  +        nKey = 4;
         1922  +        if( pEntry->iId>=0 ){
         1923  +          eType = LSM_SYSTEMKEY | LSM_INSERT;
         1924  +        }else{
         1925  +          eType = LSM_SYSTEMKEY | LSM_POINT_DELETE;
         1926  +        }
  1919   1927         }
  1920   1928         break;
         1929  +    }
  1921   1930   
  1922   1931       default: {
  1923   1932         int iPtr = iKey - CURSOR_DATA_SEGMENT;
  1924   1933         assert( iPtr>=0 );
  1925   1934         if( iPtr==pCsr->nPtr ){
  1926   1935           if( pCsr->pBtCsr ){
  1927   1936             pKey = pCsr->pBtCsr->pKey;
................................................................................
  2250   2259   }
  2251   2260   
  2252   2261   /*
  2253   2262   ** If the free-block list is not empty, then have this cursor visit a key
  2254   2263   ** with (a) the system bit set, and (b) the key "FREELIST" and (c) a value 
  2255   2264   ** blob containing the serialized free-block list.
  2256   2265   */
  2257         -static void multiCursorVisitFreelist(MultiCursor *pCsr, int *pnOvfl){
         2266  +static int multiCursorVisitFreelist(MultiCursor *pCsr, int *pnOvfl){
         2267  +  int rc = LSM_OK;
         2268  +
  2258   2269     assert( pCsr );
  2259   2270     pCsr->pnOvfl = pnOvfl;
  2260         -  pCsr->flags |= CURSOR_NEW_SYSTEM;
         2271  +  pCsr->flags |= CURSOR_FLUSH_FREELIST;
         2272  +  pCsr->pSystemVal = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc);
         2273  +  return rc;
  2261   2274   }
  2262   2275   
  2263   2276   /*
  2264   2277   ** Allocate and return a new database cursor.
  2265   2278   */
  2266   2279   int lsmMCursorNew(
  2267   2280     lsm_db *pDb,                    /* Database handle */
................................................................................
  2302   2315         }else{
  2303   2316           *ppVal = 0;
  2304   2317           *pnVal = 0;
  2305   2318         }
  2306   2319         break;
  2307   2320       }
  2308   2321   
  2309         -    case CURSOR_DATA_SYSTEM:
  2310         -      if( pCsr->flags & CURSOR_AT_FREELIST ){
  2311         -        void *aVal;
  2312         -        rc = lsmCheckpointOverflow(pCsr->pDb, &aVal, pnVal, pCsr->pnOvfl);
  2313         -        assert( pCsr->pSystemVal==0 );
  2314         -        *ppVal = pCsr->pSystemVal = aVal;
         2322  +    case CURSOR_DATA_SYSTEM: {
         2323  +      Snapshot *pWorker = pCsr->pDb->pWorker;
         2324  +      if( pWorker && pWorker->freelist.nEntry > pCsr->iFree ){
         2325  +        int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1;
         2326  +        u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4];
         2327  +        lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId);
         2328  +        *ppVal = aVal;
         2329  +        *pnVal = 8;
         2330  +        pCsr->pDb->bUseFreelist = 1;
  2315   2331         }
  2316   2332         break;
         2333  +    }
  2317   2334   
  2318   2335       default: {
  2319   2336         int iPtr = iVal-CURSOR_DATA_SEGMENT;
  2320   2337         if( iPtr<pCsr->nPtr ){
  2321   2338           SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
  2322   2339           if( pPtr->pPg ){
  2323   2340             *ppVal = pPtr->pVal;
................................................................................
  2324   2341             *pnVal = pPtr->nVal;
  2325   2342           }
  2326   2343         }
  2327   2344       }
  2328   2345     }
  2329   2346   
  2330   2347     assert( rc==LSM_OK || (*ppVal==0 && *pnVal==0) );
         2348  +  return rc;
         2349  +}
         2350  +
         2351  +/*
         2352  +** This function is called by worker connections to walk the part of the
         2353  +** free-list stored within the LSM data structure.
         2354  +*/
         2355  +int lsmSortedWalkFreelist(
         2356  +  lsm_db *pDb,                    /* Database handle */
         2357  +  int (*x)(void *, int, i64),     /* Callback function */
         2358  +  void *pCtx                      /* First argument to pass to callback */
         2359  +){
         2360  +  MultiCursor *pCsr;              /* Cursor used to read db */
         2361  +  int rc = LSM_OK;                /* Return Code */
         2362  +  Snapshot *pSnap = 0;
         2363  +
         2364  +  assert( pDb->pWorker );
         2365  +  rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap);
         2366  +  if( rc!=LSM_OK ) return rc;
         2367  +
         2368  +  pCsr = multiCursorNew(pDb, &rc);
         2369  +  if( pCsr ){
         2370  +    rc = multiCursorAddAll(pCsr, pSnap);
         2371  +    pCsr->flags |= CURSOR_IGNORE_DELETE;
         2372  +  }
         2373  +  
         2374  +  if( rc==LSM_OK ){
         2375  +    rc = lsmMCursorLast(pCsr);
         2376  +    while( rc==LSM_OK && lsmMCursorValid(pCsr) && rtIsSystem(pCsr->eType) ){
         2377  +      void *pKey; int nKey;
         2378  +      void *pVal; int nVal;
         2379  +
         2380  +      rc = lsmMCursorKey(pCsr, &pKey, &nKey);
         2381  +      if( rc==LSM_OK ) rc = lsmMCursorValue(pCsr, &pVal, &nVal);
         2382  +      if( rc==LSM_OK && (nKey!=4 || nVal!=8) ) rc = LSM_CORRUPT_BKPT;
         2383  +
         2384  +      if( rc==LSM_OK ){
         2385  +        int iBlk;
         2386  +        i64 iSnap;
         2387  +        iBlk = (int)(~(lsmGetU32((u8 *)pKey)));
         2388  +        iSnap = (i64)lsmGetU64((u8 *)pVal);
         2389  +        if( x(pCtx, iBlk, iSnap) ) break;
         2390  +        rc = lsmMCursorPrev(pCsr);
         2391  +      }
         2392  +    }
         2393  +  }
         2394  +
         2395  +  lsmMCursorClose(pCsr);
         2396  +  lsmFreeSnapshot(pDb->pEnv, pSnap);
         2397  +
  2331   2398     return rc;
  2332   2399   }
  2333   2400   
  2334   2401   int lsmSortedLoadFreelist(
  2335   2402     lsm_db *pDb,                    /* Database handle (must be worker) */
  2336   2403     void **ppVal,                   /* OUT: Blob containing LSM free-list */
  2337   2404     int *pnVal                      /* OUT: Size of *ppVal blob in bytes */
................................................................................
  2446   2513     if( pCsr->apTreeCsr[0] ){
  2447   2514       rc = lsmTreeCursorEnd(pCsr->apTreeCsr[0], bLast);
  2448   2515     }
  2449   2516     if( rc==LSM_OK && pCsr->apTreeCsr[1] ){
  2450   2517       rc = lsmTreeCursorEnd(pCsr->apTreeCsr[1], bLast);
  2451   2518     }
  2452   2519   
  2453         -  if( pCsr->flags & CURSOR_NEW_SYSTEM ){
  2454         -    assert( bLast==0 );
  2455         -    pCsr->flags |= CURSOR_AT_FREELIST;
  2456         -  }
         2520  +  pCsr->iFree = 0;
  2457   2521   
  2458   2522     for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
  2459   2523       SegmentPtr *pPtr = &pCsr->aPtr[i];
  2460   2524       Level *pLvl = pPtr->pLevel;
  2461   2525   
  2462   2526       rc = segmentPtrEnd(pCsr, pPtr, bLast);
  2463   2527       if( rc==LSM_OK && bLast==0 && pLvl->nRight && pPtr->pSeg==&pLvl->lhs ){
................................................................................
  2622   2686     int rc = LSM_OK;                /* Return code */
  2623   2687     int iPtr = 0;                   /* Used to iterate through pCsr->aPtr[] */
  2624   2688     Pgno iPgno = 0;                 /* FC pointer value */
  2625   2689   
  2626   2690     if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;
  2627   2691   
  2628   2692     assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );
  2629         -  assert( (pCsr->flags & CURSOR_NEW_SYSTEM)==0 );
  2630         -  assert( (pCsr->flags & CURSOR_AT_FREELIST)==0 );
         2693  +  assert( (pCsr->flags & CURSOR_FLUSH_FREELIST)==0 );
  2631   2694     assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel );
  2632   2695   
  2633   2696     pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ);
  2634   2697     rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[0], pKey, nKey, eESeek, &bStop);
  2635   2698     if( rc==LSM_OK && bStop==0 ){
  2636   2699       rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[1], pKey, nKey, eESeek, &bStop);
  2637   2700     }
................................................................................
  2757   2820           TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
  2758   2821           if( bReverse ){
  2759   2822             rc = lsmTreeCursorPrev(pTreeCsr);
  2760   2823           }else{
  2761   2824             rc = lsmTreeCursorNext(pTreeCsr);
  2762   2825           }
  2763   2826         }else if( iKey==CURSOR_DATA_SYSTEM ){
  2764         -        assert( pCsr->flags & CURSOR_AT_FREELIST );
  2765         -        assert( pCsr->flags & CURSOR_NEW_SYSTEM );
         2827  +        assert( pCsr->flags & CURSOR_FLUSH_FREELIST );
  2766   2828           assert( bReverse==0 );
  2767         -        pCsr->flags &= ~CURSOR_AT_FREELIST;
         2829  +        pCsr->iFree++;
  2768   2830         }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
  2769   2831           assert( bReverse==0 && pCsr->pBtCsr );
  2770   2832           rc = btreeCursorNext(pCsr->pBtCsr);
  2771   2833         }else{
  2772   2834           rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse);
  2773   2835         }
  2774   2836         if( rc==LSM_OK ){
................................................................................
  3823   3885   ){
  3824   3886     int rc = LSM_OK;                /* Return Code */
  3825   3887     MultiCursor *pCsr = 0;
  3826   3888     Level *pNext = 0;               /* The current top level */
  3827   3889     Level *pNew;                    /* The new level itself */
  3828   3890     Segment *pDel = 0;              /* Delete separators from this segment */
  3829   3891     int nWrite = 0;                 /* Number of database pages written */
  3830         -
         3892  +  Freelist freelist;
  3831   3893     assert( pnOvfl );
  3832   3894   
         3895  +  pDb->pFreelist = &freelist;
         3896  +  assert( pDb->bUseFreelist==0 );
         3897  +  memset(&freelist, 0, sizeof(freelist));
         3898  +
  3833   3899     /* Allocate the new level structure to write to. */
  3834   3900     pNext = lsmDbSnapshotLevel(pDb->pWorker);
  3835   3901     pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  3836   3902     if( pNew ){
  3837   3903       pNew->pNext = pNext;
  3838   3904       lsmDbSnapshotSetLevel(pDb->pWorker, pNew);
  3839   3905     }
................................................................................
  3840   3906   
  3841   3907     /* Create a cursor to gather the data required by the new segment. The new
  3842   3908     ** segment contains everything in the tree and pointers to the next segment
  3843   3909     ** in the database (if any).  */
  3844   3910     pCsr = multiCursorNew(pDb, &rc);
  3845   3911     if( pCsr ){
  3846   3912       pCsr->pDb = pDb;
  3847         -    multiCursorVisitFreelist(pCsr, pnOvfl);
  3848         -    rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
         3913  +    rc = multiCursorVisitFreelist(pCsr, pnOvfl);
         3914  +    if( rc==LSM_OK ){
         3915  +      rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
         3916  +    }
  3849   3917       if( rc==LSM_OK && pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){
  3850   3918         pDel = &pNext->lhs;
  3851   3919         rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);
  3852   3920       }
  3853   3921   
  3854   3922       if( pNext==0 ){
  3855   3923         multiCursorIgnoreDelete(pCsr);
................................................................................
  3901   3969     }
  3902   3970   
  3903   3971   #if 0
  3904   3972     lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel");
  3905   3973   #endif
  3906   3974   
  3907   3975     if( rc==LSM_OK ){
         3976  +    if( freelist.nEntry ){
         3977  +      Freelist *p = &pDb->pWorker->freelist;
         3978  +      lsmFree(pDb->pEnv, p->aEntry);
         3979  +      memcpy(p, &freelist, sizeof(freelist));
         3980  +      freelist.aEntry = 0;
         3981  +    }else{
         3982  +      pDb->pWorker->freelist.nEntry = 0;
         3983  +    }
         3984  +
  3908   3985       assertBtreeOk(pDb, &pNew->lhs);
  3909   3986       sortedInvokeWorkHook(pDb);
  3910   3987     }
  3911   3988   
  3912   3989     if( pnWrite ) *pnWrite = nWrite;
  3913   3990     pDb->pWorker->nWrite += nWrite;
         3991  +  pDb->pFreelist = 0;
         3992  +  pDb->bUseFreelist = 0;
         3993  +  lsmFree(pDb->pEnv, freelist.aEntry);
  3914   3994     return rc;
  3915   3995   }
  3916   3996   
  3917   3997   /*
  3918   3998   ** The nMerge levels in the LSM beginning with pLevel consist of a
  3919   3999   ** left-hand-side segment only. Replace these levels with a single new
  3920   4000   ** level consisting of a new empty segment on the left-hand-side and the
................................................................................
  4367   4447   ){
  4368   4448     int rc = LSM_OK;                /* Return code */
  4369   4449     int nOvfl = 0;
  4370   4450     int bFlush = 0;
  4371   4451     int nMax = nPage;               /* Maximum pages to write to disk */
  4372   4452     int nRem = nPage;
  4373   4453     int bCkpt = 0;
  4374         -  int bToplevel = 0;
  4375   4454   
  4376   4455     /* Open the worker 'transaction'. It will be closed before this function
  4377   4456     ** returns.  */
  4378   4457     assert( pDb->pWorker==0 );
  4379   4458     rc = lsmBeginWork(pDb);
  4380   4459     assert( rc!=8 );
  4381   4460     if( rc!=LSM_OK ) return rc;
................................................................................
  4403   4482     ** to flush it now.  */
  4404   4483     if( sortedTreeHasOld(pDb, &rc) ){
  4405   4484       if( sortedDbIsFull(pDb) ){
  4406   4485         int nPg = 0;
  4407   4486         rc = sortedWork(pDb, nRem, 0, 1, &nPg);
  4408   4487         nRem -= nPg;
  4409   4488         assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
  4410         -      bToplevel = 1;
  4411   4489       }
  4412   4490       if( rc==LSM_OK && nRem>0 ){
  4413   4491         int nPg = 0;
  4414   4492         rc = sortedNewToplevel(pDb, TREE_OLD, &nOvfl, &nPg);
  4415   4493         nRem -= nPg;
  4416   4494         if( rc==LSM_OK && pDb->nTransOpen>0 ){
  4417   4495           lsmTreeDiscardOld(pDb);
  4418   4496         }
  4419   4497         bFlush = 1;
  4420         -      bToplevel = 0;
  4421   4498       }
  4422   4499     }
  4423   4500   
  4424   4501     /* If nPage is still greater than zero, do some merging. */
  4425   4502     if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
  4426   4503       int nPg = 0;
  4427   4504       int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
  4428   4505       rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg);
  4429   4506       nRem -= nPg;
  4430   4507       if( nPg ){
  4431         -      bToplevel = 1;
  4432   4508         nOvfl = 0;
  4433   4509       }
  4434   4510     }
  4435   4511   
  4436         -  if( rc==LSM_OK && bToplevel && lsmCheckpointOverflowRequired(pDb) ){
         4512  +  /* If the in-memory part of the free-list is too large, write a new 
         4513  +  ** top-level containing just the in-memory free-list entries to disk.
         4514  +  */
         4515  +  if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > LSM_MAX_FREELIST_ENTRIES ){
         4516  +    int nPg = 0;
  4437   4517       while( rc==LSM_OK && sortedDbIsFull(pDb) ){
  4438         -      int nPg = 0;
  4439   4518         rc = sortedWork(pDb, 16, 0, 1, &nPg);
         4519  +      nRem -= nPg;
  4440   4520       }
  4441         -    if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){
  4442         -      rc = sortedNewToplevel(pDb, TREE_NONE, &nOvfl, 0);
         4521  +    if( rc==LSM_OK ){
         4522  +      rc = sortedNewToplevel(pDb, TREE_NONE, &nOvfl, &nPg);
  4443   4523       }
         4524  +    nRem -= nPg;
  4444   4525     }
  4445   4526   
  4446   4527     if( rc==LSM_OK && (nRem!=nMax) ){
  4447   4528       lsmFinishWork(pDb, bFlush, nOvfl, &rc);
  4448   4529     }else{
  4449   4530       int rcdummy = LSM_BUSY;
  4450   4531       assert( rc!=LSM_OK || bFlush==0 );