SQLite4
Check-in [f512ea3c4d]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Remove dead code. Fix a read-lock related problem causing the multi-threaded tests to fail.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | freelist-rework
Files: files | file ages | folders
SHA1: f512ea3c4d806460fd3b384f4cb02af3fa49473e
User & Date: dan 2012-10-31 18:46:38
Context
2012-10-31
19:27
Fix a crash in the check-blocks assert that may occur following an OOM condition. Leaf check-in: 503f49b0cc user: dan tags: freelist-rework
18:46
Remove dead code. Fix a read-lock related problem causing the multi-threaded tests to fail. check-in: f512ea3c4d user: dan tags: freelist-rework
16:37
Fix a compressed mode bug unrelated to the free block list. check-in: 6bf6b00b8b user: dan tags: freelist-rework
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_tdb3.c.

   894    894   int test_lsm_lomem_open(
   895    895     const char *zFilename, 
   896    896     int bClear, 
   897    897     TestDb **ppDb
   898    898   ){
   899    899     const char *zCfg = 
   900    900       "page_size=256 block_size=65536 write_buffer=16384 "
   901         -    "max_freelist=4 autocheckpoint=32768 "
          901  +    "max_freelist=2 autocheckpoint=32768 "
   902    902       "mmap=0 "
   903    903     ;
   904    904     return testLsmOpen(zCfg, zFilename, bClear, ppDb);
   905    905   }
   906    906   
   907    907   int test_lsm_zip_open(
   908    908     const char *zFilename, 
   909    909     int bClear, 
   910    910     TestDb **ppDb
   911    911   ){
   912    912     const char *zCfg = 
   913    913       "page_size=256 block_size=65536 write_buffer=16384 "
   914         -    "max_freelist=4 autocheckpoint=32768 compression=1"
          914  +    "max_freelist=2 autocheckpoint=32768 compression=1"
   915    915       "mmap=0 "
   916    916     ;
   917    917     return testLsmOpen(zCfg, zFilename, bClear, ppDb);
   918    918   }
   919    919   
   920    920   lsm_db *tdb_lsm(TestDb *pDb){
   921    921     if( pDb->pMethods->xClose==test_lsm_close ){

Changes to src/lsmInt.h.

    58     58   
    59     59   /* "mmap" mode is currently only used in environments with 64-bit address 
    60     60   ** spaces. The following macro is used to test for this.  */
    61     61   #define LSM_IS_64_BIT (sizeof(void*)==8)
    62     62   
    63     63   #define LSM_AUTOWORK_QUANT 32
    64     64   
    65         -/* Minimum number of free-list entries to store in the checkpoint, assuming
    66         -** the free-list contains this many entries. i.e. if overflow is required,
    67         -** the first LSM_CKPT_MIN_FREELIST entries are stored in the checkpoint and
    68         -** the remainder in an LSM system entry.  */
    69         -#define LSM_CKPT_MIN_FREELIST     6
    70         -#define LSM_CKPT_MAX_REFREE       2
    71         -#define LSM_CKPT_MIN_NONLSM       (LSM_CKPT_MIN_FREELIST - LSM_CKPT_MAX_REFREE)
    72         -
    73     65   typedef struct Database Database;
    74     66   typedef struct DbLog DbLog;
    75     67   typedef struct FileSystem FileSystem;
    76     68   typedef struct Freelist Freelist;
    77     69   typedef struct FreelistEntry FreelistEntry;
    78     70   typedef struct Level Level;
    79     71   typedef struct LogMark LogMark;
................................................................................
   497    489     i64 iId;                        /* Snapshot id */
   498    490     i64 iLogOff;                    /* Log file offset */
   499    491   
   500    492     /* Used by worker snapshots only */
   501    493     int nBlock;                     /* Number of blocks in database file */
   502    494     Pgno aiAppend[LSM_APPLIST_SZ];  /* Append point list */
   503    495     Freelist freelist;              /* Free block list */
   504         -  int nFreelistOvfl;              /* Number of extra free-list entries in LSM */
   505    496     u32 nWrite;                     /* Total number of pages written to disk */
   506    497   };
   507    498   #define LSM_INITIAL_SNAPSHOT_ID 11
   508    499   
   509    500   /*
   510    501   ** Functions from file "lsm_ckpt.c".
   511    502   */
   512    503   int lsmCheckpointWrite(lsm_db *, u32 *);
   513    504   int lsmCheckpointLevels(lsm_db *, int, void **, int *);
   514    505   int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);
   515    506   
   516         -int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *);
   517         -int lsmCheckpointOverflowRequired(lsm_db *pDb);
   518         -int lsmCheckpointOverflowLoad(lsm_db *pDb, Freelist *);
   519         -
   520    507   int lsmCheckpointRecover(lsm_db *);
   521    508   int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **);
   522    509   
   523    510   int lsmCheckpointLoadWorker(lsm_db *pDb);
   524    511   int lsmCheckpointStore(lsm_db *pDb, int);
   525    512   
   526    513   int lsmCheckpointLoad(lsm_db *pDb, int *);
................................................................................
   531    518   u32 lsmCheckpointNWrite(u32 *, int);
   532    519   i64 lsmCheckpointLogOffset(u32 *);
   533    520   int lsmCheckpointPgsz(u32 *);
   534    521   int lsmCheckpointBlksz(u32 *);
   535    522   void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
   536    523   void lsmCheckpointZeroLogoffset(lsm_db *);
   537    524   
   538         -int lsmCheckpointSaveWorker(lsm_db *pDb, int, int);
          525  +int lsmCheckpointSaveWorker(lsm_db *pDb, int);
   539    526   int lsmDatabaseFull(lsm_db *pDb);
   540    527   int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog, u32 *pnWrite);
   541    528   
   542    529   
   543    530   /* 
   544    531   ** Functions from file "lsm_tree.c".
   545    532   */
................................................................................
   788    775   void lsmDbDatabaseRelease(lsm_db *);
   789    776   
   790    777   int lsmBeginReadTrans(lsm_db *);
   791    778   int lsmBeginWriteTrans(lsm_db *);
   792    779   int lsmBeginFlush(lsm_db *);
   793    780   
   794    781   int lsmBeginWork(lsm_db *);
   795         -void lsmFinishWork(lsm_db *, int, int, int *);
          782  +void lsmFinishWork(lsm_db *, int, int *);
   796    783   
   797    784   int lsmFinishRecovery(lsm_db *);
   798    785   void lsmFinishReadTrans(lsm_db *);
   799    786   int lsmFinishWriteTrans(lsm_db *, int);
   800    787   int lsmFinishFlush(lsm_db *, int);
   801    788   
   802    789   int lsmSnapshotSetFreelist(lsm_db *, int *, int);

Changes to src/lsm_ckpt.c.

   163    163    + (((x)&0x00FF0000)>>8)  + (((x)&0xFF000000)>>24) \
   164    164   )
   165    165   
   166    166   static const int one = 1;
   167    167   #define LSM_LITTLE_ENDIAN (*(u8 *)(&one))
   168    168   
   169    169   /* Sizes, in integers, of various parts of the checkpoint. */
   170         -#define CKPT_HDR_SIZE         9
          170  +#define CKPT_HDR_SIZE         8
   171    171   #define CKPT_LOGPTR_SIZE      4
   172    172   #define CKPT_APPENDLIST_SIZE  (LSM_APPLIST_SZ * 2)
   173    173   
   174    174   /* A #define to describe each integer in the checkpoint header. */
   175    175   #define CKPT_HDR_ID_MSW   0
   176    176   #define CKPT_HDR_ID_LSW   1
   177    177   #define CKPT_HDR_NCKPT    2
   178    178   #define CKPT_HDR_NBLOCK   3
   179    179   #define CKPT_HDR_BLKSZ    4
   180    180   #define CKPT_HDR_NLEVEL   5
   181    181   #define CKPT_HDR_PGSZ     6
   182         -#define CKPT_HDR_OVFL     7
   183         -#define CKPT_HDR_NWRITE   8
          182  +#define CKPT_HDR_NWRITE   7
   184    183   
   185         -#define CKPT_HDR_LO_MSW     9
   186         -#define CKPT_HDR_LO_LSW    10
   187         -#define CKPT_HDR_LO_CKSUM1 11
   188         -#define CKPT_HDR_LO_CKSUM2 12
          184  +#define CKPT_HDR_LO_MSW     8
          185  +#define CKPT_HDR_LO_LSW     9
          186  +#define CKPT_HDR_LO_CKSUM1 10
          187  +#define CKPT_HDR_LO_CKSUM2 11
   189    188   
   190    189   typedef struct CkptBuffer CkptBuffer;
   191    190   
   192    191   /*
   193    192   ** Dynamic buffer used to accumulate data for a checkpoint.
   194    193   */
   195    194   struct CkptBuffer {
................................................................................
   437    436     ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
   438    437     ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
   439    438     ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
   440    439     ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
   441    440     ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
   442    441     ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
   443    442     ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
   444         -  ckptSetValue(&ckpt, CKPT_HDR_OVFL, 0, &rc);
   445    443     ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);
   446    444   
   447    445     if( bCksum ){
   448    446       ckptAddChecksum(&ckpt, iOut, &rc);
   449    447     }else{
   450    448       ckptSetValue(&ckpt, iOut, 0, &rc);
   451    449       ckptSetValue(&ckpt, iOut+1, 0, &rc);
................................................................................
   655    653       *pnVal = 0;
   656    654       *paVal = 0;
   657    655     }
   658    656   
   659    657     return rc;
   660    658   }
   661    659   
   662         -/*
   663         -** The worker lock must be held to call this function.
   664         -**
   665         -** The function serializes and returns the data that should be stored as
   666         -** the FREELIST system record.
   667         -*/
   668         -int lsmCheckpointOverflow(
   669         -  lsm_db *pDb,                    /* Database handle (must hold worker lock) */
   670         -  void **ppVal,                   /* OUT: lsmMalloc'd buffer */
   671         -  int *pnVal,                     /* OUT: Size of *ppVal in bytes */
   672         -  int *pnOvfl                     /* OUT: Number of freelist entries in buf */
   673         -){
   674         -  assert( 0 );
   675         -#if 0
   676         -  int rc = LSM_OK;
   677         -  int nRet;
   678         -  Snapshot *p = pDb->pWorker;
   679         -
   680         -  assert( lsmShmAssertWorker(pDb) );
   681         -  assert( pnOvfl && ppVal && pnVal );
   682         -  assert( pDb->nMaxFreelist>=2 && pDb->nMaxFreelist<=LSM_MAX_FREELIST_ENTRIES );
   683         -
   684         -  if( p->nFreelistOvfl ){
   685         -    rc = lsmCheckpointOverflowLoad(pDb, &p->freelist);
   686         -    if( rc!=LSM_OK ) return rc;
   687         -    p->nFreelistOvfl = 0;
   688         -  }
   689         -
   690         -  if( p->freelist.nEntry<=pDb->nMaxFreelist ){
   691         -    nRet = 0;
   692         -    *pnVal = 0;
   693         -    *ppVal = 0;
   694         -  }else{
   695         -    int i;                        /* Iterator variable */
   696         -    int iOut = 0;                 /* Current size of blob in ckpt */
   697         -    CkptBuffer ckpt;              /* Used to build FREELIST blob */
   698         -
   699         -    nRet = (p->freelist.nEntry - pDb->nMaxFreelist);
   700         -
   701         -    memset(&ckpt, 0, sizeof(CkptBuffer));
   702         -    ckpt.pEnv = pDb->pEnv;
   703         -    for(i=p->freelist.nEntry-nRet; rc==LSM_OK && i<p->freelist.nEntry; i++){
   704         -      FreelistEntry *pEntry = &p->freelist.aEntry[i];
   705         -      ckptSetValue(&ckpt, iOut++, pEntry->iBlk, &rc);
   706         -      ckptSetValue(&ckpt, iOut++, (pEntry->iId >> 32) & 0xFFFFFFFF, &rc);
   707         -      ckptSetValue(&ckpt, iOut++, pEntry->iId & 0xFFFFFFFF, &rc);
   708         -    }
   709         -    ckptChangeEndianness(ckpt.aCkpt, iOut);
   710         -
   711         -    *ppVal = ckpt.aCkpt;
   712         -    *pnVal = iOut*sizeof(u32);
   713         -  }
   714         -
   715         -  *pnOvfl = nRet;
   716         -  return rc;
   717         -#endif
   718         -}
   719         -
   720         -/*
   721         -** The connection must be the worker in order to call this function.
   722         -**
   723         -** True is returned if there are currently too many free-list entries
   724         -** in-memory to store in a checkpoint. Before calling CheckpointSaveWorker()
   725         -** to save the current worker snapshot, a new top-level LSM segment must
   726         -** be created so that some of them can be written to the LSM. 
   727         -*/
   728         -int lsmCheckpointOverflowRequired(lsm_db *pDb){
   729         -  Snapshot *p = pDb->pWorker;
   730         -  assert( 0 );
   731         -  assert( lsmShmAssertWorker(pDb) );
   732         -  return (p->freelist.nEntry > pDb->nMaxFreelist || p->nFreelistOvfl>0);
   733         -}
   734         -
   735         -/*
   736         -** Connection pDb must be the worker to call this function.
   737         -**
   738         -** Load the FREELIST record from the database. Decode it and append the
   739         -** results to list pFreelist.
   740         -*/
   741         -int lsmCheckpointOverflowLoad(
   742         -  lsm_db *pDb,
   743         -  Freelist *pFreelist
   744         -){
   745         -  assert( 0 );
   746         -#if 0
   747         -  int rc;
   748         -  int nVal = 0;
   749         -  void *pVal = 0;
   750         -  assert( lsmShmAssertWorker(pDb) );
   751         -
   752         -  /* Load the blob of data from the LSM. If that is successful (and the
   753         -  ** blob is greater than zero bytes in size), decode the contents and
   754         -  ** merge them into the current contents of *pFreelist.  */
   755         -  rc = lsmSortedLoadFreelist(pDb, &pVal, &nVal);
   756         -  if( pVal ){
   757         -    u32 *aFree = (u32 *)pVal;
   758         -    int nFree = nVal / sizeof(int);
   759         -    ckptChangeEndianness(aFree, nFree);
   760         -    if( (nFree % 3) ){
   761         -      rc = LSM_CORRUPT_BKPT;
   762         -    }else{
   763         -      int iNew = 0;               /* Offset of next element in aFree[] */
   764         -      int iOld = 0;               /* Next element in freelist fl */
   765         -      Freelist fl = *pFreelist;   /* Original contents of *pFreelist */
   766         -
   767         -      memset(pFreelist, 0, sizeof(Freelist));
   768         -      while( rc==LSM_OK && (iNew<nFree || iOld<fl.nEntry) ){
   769         -        int iBlk;
   770         -        i64 iId;
   771         -
   772         -        if( iOld>=fl.nEntry ){
   773         -          iBlk = aFree[iNew];
   774         -          iId = ((i64)(aFree[iNew+1])<<32) + (i64)aFree[iNew+2];
   775         -          iNew += 3;
   776         -        }else if( iNew>=nFree ){
   777         -          iBlk = fl.aEntry[iOld].iBlk;
   778         -          iId = fl.aEntry[iOld].iId;
   779         -          iOld += 1;
   780         -        }else{
   781         -          iId = ((i64)(aFree[iNew+1])<<32) + (i64)aFree[iNew+2];
   782         -          if( iId<fl.aEntry[iOld].iId ){
   783         -            iBlk = aFree[iNew];
   784         -            iNew += 3;
   785         -          }else{
   786         -            iBlk = fl.aEntry[iOld].iBlk;
   787         -            iId = fl.aEntry[iOld].iId;
   788         -            iOld += 1;
   789         -          }
   790         -        }
   791         -
   792         -        rc = lsmFreelistAppend(pDb->pEnv, pFreelist, iBlk, iId);
   793         -      }
   794         -      lsmFree(pDb->pEnv, fl.aEntry);
   795         -
   796         -#ifdef LSM_DEBUG
   797         -      if( rc==LSM_OK ){
   798         -        int i;
   799         -        for(i=1; rc==LSM_OK && i<pFreelist->nEntry; i++){
   800         -          assert( pFreelist->aEntry[i].iId >= pFreelist->aEntry[i-1].iId );
   801         -        }
   802         -        assert( pFreelist->nEntry==(fl.nEntry + nFree/3) );
   803         -      }
   804         -#endif
   805         -    }
   806         -
   807         -    lsmFree(pDb->pEnv, pVal);
   808         -  }
   809         -
   810         -  return rc;
   811         -#endif
   812         -}
   813         -
   814    660   /*
   815    661   ** Read the checkpoint id from meta-page pPg.
   816    662   */
   817    663   static i64 ckptLoadId(MetaPage *pPg){
   818    664     i64 ret = 0;
   819    665     if( pPg ){
   820    666       int nData;
................................................................................
  1076    922       for(i=0; i<LSM_APPLIST_SZ; i++){
  1077    923         u32 *a = &aCkpt[CKPT_HDR_SIZE + CKPT_LOGPTR_SIZE + i*2];
  1078    924         pNew->aiAppend[i] = ckptRead64(a);
  1079    925       }
  1080    926   
  1081    927       /* Copy the free-list */
  1082    928       if( bInclFreelist ){
  1083         -      pNew->nFreelistOvfl = aCkpt[CKPT_HDR_OVFL];
  1084    929         nFree = aCkpt[iIn++];
  1085    930         if( nFree ){
  1086    931           pNew->freelist.aEntry = (FreelistEntry *)lsmMallocZeroRc(
  1087    932               pDb->pEnv, sizeof(FreelistEntry)*nFree, &rc
  1088    933           );
  1089    934           if( rc==LSM_OK ){
  1090    935             int i;
................................................................................
  1141    986   **
  1142    987   ** This function updates the shared-memory worker and client snapshots with
  1143    988   ** the new snapshot produced by the work performed by pDb.
  1144    989   **
  1145    990   ** If successful, LSM_OK is returned. Otherwise, if an error occurs, an LSM
  1146    991   ** error code is returned.
  1147    992   */
  1148         -int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){
          993  +int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush){
  1149    994     Snapshot *pSnap = pDb->pWorker;
  1150    995     ShmHeader *pShm = pDb->pShmhdr;
  1151    996     void *p = 0;
  1152    997     int n = 0;
  1153    998     int rc;
  1154    999   
  1155   1000     rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId+1, 1, &p, &n);

Changes to src/lsm_file.c.

  2134   2134       lsmStringAppendf(&str, " %d", pArray->iLastPg);
  2135   2135   
  2136   2136       *pzOut = str.z;
  2137   2137     }
  2138   2138   
  2139   2139     if( bUnlock ){
  2140   2140       int rcwork = LSM_BUSY;
  2141         -    lsmFinishWork(pDb, 0, 0, &rcwork);
         2141  +    lsmFinishWork(pDb, 0, &rcwork);
  2142   2142     }
  2143   2143     return rc;
  2144   2144   }
  2145   2145   
  2146   2146   /*
  2147   2147   ** The following macros are used by the integrity-check code. Associated with
  2148   2148   ** each block in the database is an 8-bit bit mask (the entry in the aUsed[]

Changes to src/lsm_main.c.

   368    368     *pp = pDb->pWorker;
   369    369     return rc;
   370    370   }
   371    371   
   372    372   static void infoFreeWorker(lsm_db *pDb, int bUnlock){
   373    373     if( bUnlock ){
   374    374       int rcdummy = LSM_BUSY;
   375         -    lsmFinishWork(pDb, 0, 0, &rcdummy);
          375  +    lsmFinishWork(pDb, 0, &rcdummy);
   376    376     }
   377    377   }
   378    378   
   379    379   int lsmStructList(
   380    380     lsm_db *pDb,                    /* Database handle */
   381    381     char **pzOut                    /* OUT: Nul-terminated string (tcl list) */
   382    382   ){
................................................................................
   419    419     int rc;
   420    420   
   421    421     /* Obtain the worker snapshot */
   422    422     rc = infoGetWorker(pDb, &pWorker, &bUnlock);
   423    423     if( rc!=LSM_OK ) return rc;
   424    424   
   425    425     lsmStringInit(&s, pDb->pEnv);
   426         -  lsmStringAppendf(&s, "%d+%d",pWorker->freelist.nEntry,pWorker->nFreelistOvfl);
          426  +  lsmStringAppendf(&s, "%d", pWorker->freelist.nEntry);
   427    427     for(i=0; i<pWorker->freelist.nEntry; i++){
   428    428       FreelistEntry *p = &pWorker->freelist.aEntry[i];
   429    429       lsmStringAppendf(&s, " {%d %d}", p->iBlk, (int)p->iId);
   430    430     }
   431    431     rc = s.n>=0 ? LSM_OK : LSM_NOMEM;
   432    432   
   433    433     /* Release the snapshot and return */

Changes to src/lsm_shared.c.

   559    559     **   * May be used by a database client in the future, or
   560    560     **   * Is the most recently checkpointed snapshot (i.e. the one that will
   561    561     **     be used following recovery if a failure occurs at this point).
   562    562     */
   563    563     rc = lsmCheckpointSynced(pDb, &iInUse, 0, 0);
   564    564     if( rc==LSM_OK && iInUse==0 ) iInUse = p->iId;
   565    565     if( rc==LSM_OK && pDb->pClient ) iInUse = LSM_MIN(iInUse, pDb->pClient->iId);
   566         -
   567    566     if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);
   568    567   
   569    568     /* Query the free block list for a suitable block */
   570    569     if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);
   571    570   
   572    571     /* If a block was found in the free block list, use it and remove it from 
   573    572     ** the list. Otherwise, if no suitable block was found, allocate one from
   574    573     ** the end of the file.  */
   575    574     if( rc==LSM_OK ){
   576    575       if( iRet>0 ){
   577    576   #ifdef LSM_LOG_FREELIST
   578         -      lsmLogMessage(pDb, 0, "reusing block %d", iRet);
          577  +      lsmLogMessage(pDb, 0, 
          578  +          "reusing block %d (snapshot-in-use=%lld)", iRet, iInUse);
   579    579   #endif
   580    580         rc = freelistAppend(pDb, iRet, -1);
   581    581       }else{
   582    582         iRet = ++(p->nBlock);
   583    583   #ifdef LSM_LOG_FREELIST
   584    584         lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
   585    585   #endif
................................................................................
   728    728   
   729    729   /*
   730    730   ** Argument bFlush is true if the contents of the in-memory tree has just
   731    731   ** been flushed to disk. The significance of this is that once the snapshot
   732    732   ** created to hold the updated state of the database is synced to disk, log
   733    733   ** file space can be recycled.
   734    734   */
   735         -void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){
          735  +void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){
   736    736     assert( *pRc!=0 || pDb->pWorker );
   737    737     if( pDb->pWorker ){
   738    738       /* If no error has occurred, serialize the worker snapshot and write
   739    739       ** it to shared memory.  */
   740         -    assert( pDb->pWorker->nFreelistOvfl==0 || nOvfl==0 );
   741    740       if( *pRc==LSM_OK ){
   742         -      *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl);
          741  +      *pRc = lsmCheckpointSaveWorker(pDb, bFlush);
   743    742       }
   744    743       lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
   745    744       pDb->pWorker = 0;
   746    745     }
   747    746   
   748    747     lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
   749    748   }
................................................................................
   816    815         }
   817    816         if( rc==LSM_BUSY ){
   818    817           rc = LSM_OK;
   819    818         }
   820    819       }
   821    820   #if 0
   822    821   if( rc==LSM_OK && pDb->pClient ){
   823         -  printf("reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n",
          822  +  fprintf(stderr, 
          823  +      "reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n",
   824    824         (void *)pDb,
   825    825         (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid, 
   826    826         (int)pDb->treehdr.root.iTransId,
   827    827         (int)pDb->treehdr.iOldShmid
   828    828     );
   829         -  fflush(stdout);
   830    829   }
   831    830   #endif
   832    831     }
   833    832   
   834    833     if( rc!=LSM_OK ){
   835    834       lsmReleaseReadlock(pDb);
   836    835     }
................................................................................
   852    851   
   853    852   #if 0
   854    853     if( pClient ){
   855    854       lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
   856    855       pDb->pClient = 0;
   857    856     }
   858    857   #endif
          858  +
          859  +#if 0
          860  +if( pDb->pClient && pDb->iReader>=0 ){
          861  +  fprintf(stderr, 
          862  +      "finished reading %p: snapshot:%d\n", (void *)pDb, (int)pDb->pClient->iId
          863  +  );
          864  +}
          865  +#endif
   859    866     if( pDb->iReader>=0 ) lsmReleaseReadlock(pDb);
   860    867   }
   861    868   
   862    869   /*
   863    870   ** Open a write transaction.
   864    871   */
   865    872   int lsmBeginWriteTrans(lsm_db *pDb){
................................................................................
  1092   1099             ** the caller in this case.  */
  1093   1100             return rc;
  1094   1101           }
  1095   1102         }
  1096   1103       }
  1097   1104     }
  1098   1105   
         1106  +  *piInUse = iInUse;
  1099   1107     return LSM_OK;
  1100   1108   }
  1101   1109   
  1102   1110   int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){
  1103   1111     if( db->treehdr.iUsedShmid==iShmid ){
  1104   1112       *pbInUse = 1;
  1105   1113       return LSM_OK;

Changes to src/lsm_sorted.c.

   202    202     BtreeCursor *pBtCsr;            /* b-tree cursor (db writes only) */
   203    203   
   204    204     /* Comparison results */
   205    205     int nTree;                      /* Size of aTree[] array */
   206    206     int *aTree;                     /* Array of comparison results */
   207    207   
   208    208     /* Used by cursors flushing the in-memory tree only */
   209         -  int *pnOvfl;                    /* Number of free-list entries to store */
   210    209     void *pSystemVal;               /* Pointer to buffer to free */
   211    210   
   212    211     /* Used by worker cursors only */
   213    212     Pgno *pPrevMergePtr;
   214    213   };
   215    214   
   216    215   /*
................................................................................
  2259   2258   }
  2260   2259   
  2261   2260   /*
  2262   2261   ** If the free-block list is not empty, then have this cursor visit a key
  2263   2262   ** with (a) the system bit set, and (b) the key "FREELIST" and (c) a value 
  2264   2263   ** blob containing the serialized free-block list.
  2265   2264   */
  2266         -static int multiCursorVisitFreelist(MultiCursor *pCsr, int *pnOvfl){
         2265  +static int multiCursorVisitFreelist(MultiCursor *pCsr){
  2267   2266     int rc = LSM_OK;
  2268         -
  2269         -  assert( pCsr );
  2270         -  pCsr->pnOvfl = pnOvfl;
  2271   2267     pCsr->flags |= CURSOR_FLUSH_FREELIST;
  2272   2268     pCsr->pSystemVal = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc);
  2273   2269     return rc;
  2274   2270   }
  2275   2271   
  2276   2272   /*
  2277   2273   ** Allocate and return a new database cursor.
................................................................................
  3884   3880       pDb->xWork(pDb, pDb->pWorkCtx);
  3885   3881     }
  3886   3882   }
  3887   3883   
  3888   3884   static int sortedNewToplevel(
  3889   3885     lsm_db *pDb,                    /* Connection handle */
  3890   3886     int eTree,                      /* One of the TREE_XXX constants */
  3891         -  int *pnOvfl,                    /* OUT: Number of free-list entries stored */
  3892   3887     int *pnWrite                    /* OUT: Number of database pages written */
  3893   3888   ){
  3894   3889     int rc = LSM_OK;                /* Return Code */
  3895   3890     MultiCursor *pCsr = 0;
  3896   3891     Level *pNext = 0;               /* The current top level */
  3897   3892     Level *pNew;                    /* The new level itself */
  3898   3893     Segment *pDel = 0;              /* Delete separators from this segment */
  3899   3894     int nWrite = 0;                 /* Number of database pages written */
  3900   3895     Freelist freelist;
  3901         -  assert( pnOvfl );
  3902   3896   
  3903   3897     assert( pDb->bUseFreelist==0 );
  3904   3898     pDb->pFreelist = &freelist;
  3905   3899     pDb->bUseFreelist = 1;
  3906   3900     memset(&freelist, 0, sizeof(freelist));
  3907   3901   
  3908   3902     /* Allocate the new level structure to write to. */
................................................................................
  3915   3909   
  3916   3910     /* Create a cursor to gather the data required by the new segment. The new
  3917   3911     ** segment contains everything in the tree and pointers to the next segment
  3918   3912     ** in the database (if any).  */
  3919   3913     pCsr = multiCursorNew(pDb, &rc);
  3920   3914     if( pCsr ){
  3921   3915       pCsr->pDb = pDb;
  3922         -    rc = multiCursorVisitFreelist(pCsr, pnOvfl);
         3916  +    rc = multiCursorVisitFreelist(pCsr);
  3923   3917       if( rc==LSM_OK ){
  3924   3918         rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
  3925   3919       }
  3926   3920       if( rc==LSM_OK && pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){
  3927   3921         pDel = &pNext->lhs;
  3928   3922         rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);
  3929   3923       }
................................................................................
  3966   3960   
  3967   3961       nWrite = mergeworker.nWork;
  3968   3962       mergeWorkerShutdown(&mergeworker, &rc);
  3969   3963       pNew->pMerge = 0;
  3970   3964       pNew->iAge = 0;
  3971   3965     }
  3972   3966   
  3973         -  /* Link the new level into the top of the tree. */
  3974         -  if( rc==LSM_OK && pNew->lhs.iFirst ){
  3975         -    if( pDel ) pDel->iRoot = 0;
  3976         -  }else{
         3967  +  if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){
         3968  +    assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 );
  3977   3969       lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
  3978   3970       sortedFreeLevel(pDb->pEnv, pNew);
  3979         -  }
         3971  +  }else{
         3972  +    if( pDel ) pDel->iRoot = 0;
  3980   3973   
  3981   3974   #if 0
  3982         -  if( pNew->lhs.iFirst ){
  3983   3975       lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
  3984         -  }
  3985   3976   #endif
  3986   3977   
  3987         -  if( rc==LSM_OK ){
  3988         -    assert( pNew->lhs.iFirst || pDb->pWorker->freelist.nEntry==0 );
  3989   3978       if( freelist.nEntry ){
  3990   3979         Freelist *p = &pDb->pWorker->freelist;
  3991   3980         lsmFree(pDb->pEnv, p->aEntry);
  3992   3981         memcpy(p, &freelist, sizeof(freelist));
  3993   3982         freelist.aEntry = 0;
  3994   3983       }else{
  3995   3984         pDb->pWorker->freelist.nEntry = 0;
................................................................................
  4455   4444     int bShutdown,
  4456   4445     int flags, 
  4457   4446     int nPage,                      /* Number of pages to write to disk */
  4458   4447     int *pnWrite,                   /* OUT: Pages actually written to disk */
  4459   4448     int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
  4460   4449   ){
  4461   4450     int rc = LSM_OK;                /* Return code */
  4462         -  int nOvfl = 0;
  4463   4451     int bDirty = 0;
  4464   4452     int nMax = nPage;               /* Maximum pages to write to disk */
  4465   4453     int nRem = nPage;
  4466   4454     int bCkpt = 0;
  4467   4455   
  4468   4456     /* Open the worker 'transaction'. It will be closed before this function
  4469   4457     ** returns.  */
................................................................................
  4503   4491         rc = sortedWork(pDb, nRem, 0, 1, &nPg);
  4504   4492         nRem -= nPg;
  4505   4493         assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
  4506   4494       }
  4507   4495   
  4508   4496       if( rc==LSM_OK && nRem>0 ){
  4509   4497         int nPg = 0;
  4510         -      rc = sortedNewToplevel(pDb, TREE_OLD, &nOvfl, &nPg);
         4498  +      rc = sortedNewToplevel(pDb, TREE_OLD, &nPg);
  4511   4499         nRem -= nPg;
  4512   4500         if( rc==LSM_OK ){
  4513   4501           if( pDb->nTransOpen>0 ){
  4514   4502             lsmTreeDiscardOld(pDb);
  4515   4503           }
  4516         -        rc = lsmCheckpointSaveWorker(pDb, 1, 0);
         4504  +        rc = lsmCheckpointSaveWorker(pDb, 1);
  4517   4505         }
  4518   4506       }
  4519   4507     }
  4520   4508   
  4521   4509     /* If nPage is still greater than zero, do some merging. */
  4522   4510     if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
  4523   4511       int nPg = 0;
................................................................................
  4524   4512       int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
  4525   4513       rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg);
  4526   4514       nRem -= nPg;
  4527   4515       if( nPg ) bDirty = 1;
  4528   4516     }
  4529   4517   
  4530   4518     /* If the in-memory part of the free-list is too large, write a new 
  4531         -  ** top-level containing just the in-memory free-list entries to disk.
  4532         -  */
  4533         -  if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > LSM_MAX_FREELIST_ENTRIES ){
         4519  +  ** top-level containing just the in-memory free-list entries to disk. */
         4520  +  if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
  4534   4521       int nPg = 0;
  4535   4522       while( rc==LSM_OK && sortedDbIsFull(pDb) ){
  4536   4523         rc = sortedWork(pDb, 16, 0, 1, &nPg);
  4537   4524         nRem -= nPg;
  4538   4525       }
  4539   4526       if( rc==LSM_OK ){
  4540         -      rc = sortedNewToplevel(pDb, TREE_NONE, &nOvfl, &nPg);
         4527  +      rc = sortedNewToplevel(pDb, TREE_NONE, &nPg);
  4541   4528       }
  4542   4529       nRem -= nPg;
  4543   4530       if( nPg ) bDirty = 1;
  4544   4531     }
  4545   4532   
  4546   4533     if( rc==LSM_OK && bDirty ){
  4547         -    lsmFinishWork(pDb, 0, 0, &rc);
         4534  +    lsmFinishWork(pDb, 0, &rc);
  4548   4535     }else{
  4549   4536       int rcdummy = LSM_BUSY;
  4550         -    lsmFinishWork(pDb, 0, 0, &rcdummy);
         4537  +    lsmFinishWork(pDb, 0, &rcdummy);
  4551   4538     }
  4552   4539     assert( pDb->pWorker==0 );
  4553   4540   
  4554   4541     if( rc==LSM_OK ){
  4555   4542       if( pnWrite ) *pnWrite = (nMax - nRem);
  4556   4543       if( pbCkpt ) *pbCkpt = (bCkpt && nRem<=0);
  4557   4544     }else{
................................................................................
  4660   4647   
  4661   4648   /*
  4662   4649   ** This function is only called during system shutdown. The contents of
  4663   4650   ** any in-memory trees present (old or current) are written out to disk.
  4664   4651   */
  4665   4652   int lsmFlushTreeToDisk(lsm_db *pDb){
  4666   4653     int rc;
  4667         -  int nOvfl = 0;
  4668   4654   
  4669   4655     rc = lsmBeginWork(pDb);
  4670   4656     while( rc==LSM_OK && sortedDbIsFull(pDb) ){
  4671   4657       rc = sortedWork(pDb, 256, 0, 1, 0);
  4672   4658     }
  4673   4659   
  4674   4660     if( rc==LSM_OK ){
  4675         -    rc = sortedNewToplevel(pDb, TREE_BOTH, &nOvfl, 0);
         4661  +    rc = sortedNewToplevel(pDb, TREE_BOTH, 0);
  4676   4662     }
  4677         -  lsmFinishWork(pDb, 1, nOvfl, &rc);
         4663  +  lsmFinishWork(pDb, 1, &rc);
  4678   4664     return rc;
  4679   4665   }
  4680   4666   
  4681   4667   /*
  4682   4668   ** Return a string representation of the segment passed as the only argument.
  4683   4669   ** Space for the returned string is allocated using lsmMalloc(), and should
  4684   4670   ** be freed by the caller using lsmFree().