Index: lsm-test/lsmtest_tdb3.c ================================================================== --- lsm-test/lsmtest_tdb3.c +++ lsm-test/lsmtest_tdb3.c @@ -896,11 +896,11 @@ int bClear, TestDb **ppDb ){ const char *zCfg = "page_size=256 block_size=65536 write_buffer=16384 " - "max_freelist=4 autocheckpoint=32768 " + "max_freelist=2 autocheckpoint=32768 " "mmap=0 " ; return testLsmOpen(zCfg, zFilename, bClear, ppDb); } @@ -909,11 +909,11 @@ int bClear, TestDb **ppDb ){ const char *zCfg = "page_size=256 block_size=65536 write_buffer=16384 " - "max_freelist=4 autocheckpoint=32768 compression=1" + "max_freelist=2 autocheckpoint=32768 compression=1" "mmap=0 " ; return testLsmOpen(zCfg, zFilename, bClear, ppDb); } Index: src/lsmInt.h ================================================================== --- src/lsmInt.h +++ src/lsmInt.h @@ -60,18 +60,10 @@ ** spaces. The following macro is used to test for this. */ #define LSM_IS_64_BIT (sizeof(void*)==8) #define LSM_AUTOWORK_QUANT 32 -/* Minimum number of free-list entries to store in the checkpoint, assuming -** the free-list contains this many entries. i.e. if overflow is required, -** the first LSM_CKPT_MIN_FREELIST entries are stored in the checkpoint and -** the remainder in an LSM system entry. */ -#define LSM_CKPT_MIN_FREELIST 6 -#define LSM_CKPT_MAX_REFREE 2 -#define LSM_CKPT_MIN_NONLSM (LSM_CKPT_MIN_FREELIST - LSM_CKPT_MAX_REFREE) - typedef struct Database Database; typedef struct DbLog DbLog; typedef struct FileSystem FileSystem; typedef struct Freelist Freelist; typedef struct FreelistEntry FreelistEntry; @@ -499,11 +491,10 @@ /* Used by worker snapshots only */ int nBlock; /* Number of blocks in database file */ Pgno aiAppend[LSM_APPLIST_SZ]; /* Append point list */ Freelist freelist; /* Free block list */ - int nFreelistOvfl; /* Number of extra free-list entries in LSM */ u32 nWrite; /* Total number of pages written to disk */ }; #define LSM_INITIAL_SNAPSHOT_ID 11 /* @@ -511,14 +502,10 @@ */ int lsmCheckpointWrite(lsm_db *, u32 *); int lsmCheckpointLevels(lsm_db *, int, void **, int *); int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal); -int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *); -int lsmCheckpointOverflowRequired(lsm_db *pDb); -int lsmCheckpointOverflowLoad(lsm_db *pDb, Freelist *); - int lsmCheckpointRecover(lsm_db *); int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **); int lsmCheckpointLoadWorker(lsm_db *pDb); int lsmCheckpointStore(lsm_db *pDb, int); @@ -533,11 +520,11 @@ int lsmCheckpointPgsz(u32 *); int lsmCheckpointBlksz(u32 *); void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog); void lsmCheckpointZeroLogoffset(lsm_db *); -int lsmCheckpointSaveWorker(lsm_db *pDb, int, int); +int lsmCheckpointSaveWorker(lsm_db *pDb, int); int lsmDatabaseFull(lsm_db *pDb); int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog, u32 *pnWrite); /* @@ -790,11 +777,11 @@ int lsmBeginReadTrans(lsm_db *); int lsmBeginWriteTrans(lsm_db *); int lsmBeginFlush(lsm_db *); int lsmBeginWork(lsm_db *); -void lsmFinishWork(lsm_db *, int, int, int *); +void lsmFinishWork(lsm_db *, int, int *); int lsmFinishRecovery(lsm_db *); void lsmFinishReadTrans(lsm_db *); int lsmFinishWriteTrans(lsm_db *, int); int lsmFinishFlush(lsm_db *, int); Index: src/lsm_ckpt.c ================================================================== --- src/lsm_ckpt.c +++ src/lsm_ckpt.c @@ -165,11 +165,11 @@ static const int one = 1; #define LSM_LITTLE_ENDIAN (*(u8 *)(&one)) /* Sizes, in integers, of various parts of the checkpoint. */ -#define CKPT_HDR_SIZE 9 +#define CKPT_HDR_SIZE 8 #define CKPT_LOGPTR_SIZE 4 #define CKPT_APPENDLIST_SIZE (LSM_APPLIST_SZ * 2) /* A #define to describe each integer in the checkpoint header. */ #define CKPT_HDR_ID_MSW 0 @@ -177,17 +177,16 @@ #define CKPT_HDR_NCKPT 2 #define CKPT_HDR_NBLOCK 3 #define CKPT_HDR_BLKSZ 4 #define CKPT_HDR_NLEVEL 5 #define CKPT_HDR_PGSZ 6 -#define CKPT_HDR_OVFL 7 -#define CKPT_HDR_NWRITE 8 +#define CKPT_HDR_NWRITE 7 -#define CKPT_HDR_LO_MSW 9 -#define CKPT_HDR_LO_LSW 10 -#define CKPT_HDR_LO_CKSUM1 11 -#define CKPT_HDR_LO_CKSUM2 12 +#define CKPT_HDR_LO_MSW 8 +#define CKPT_HDR_LO_LSW 9 +#define CKPT_HDR_LO_CKSUM1 10 +#define CKPT_HDR_LO_CKSUM2 11 typedef struct CkptBuffer CkptBuffer; /* ** Dynamic buffer used to accumulate data for a checkpoint. @@ -439,11 +438,10 @@ ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc); ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc); ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc); ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc); ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc); - ckptSetValue(&ckpt, CKPT_HDR_OVFL, 0, &rc); ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc); if( bCksum ){ ckptAddChecksum(&ckpt, iOut, &rc); }else{ @@ -657,162 +655,10 @@ } return rc; } -/* -** The worker lock must be held to call this function. -** -** The function serializes and returns the data that should be stored as -** the FREELIST system record. -*/ -int lsmCheckpointOverflow( - lsm_db *pDb, /* Database handle (must hold worker lock) */ - void **ppVal, /* OUT: lsmMalloc'd buffer */ - int *pnVal, /* OUT: Size of *ppVal in bytes */ - int *pnOvfl /* OUT: Number of freelist entries in buf */ -){ - assert( 0 ); -#if 0 - int rc = LSM_OK; - int nRet; - Snapshot *p = pDb->pWorker; - - assert( lsmShmAssertWorker(pDb) ); - assert( pnOvfl && ppVal && pnVal ); - assert( pDb->nMaxFreelist>=2 && pDb->nMaxFreelist<=LSM_MAX_FREELIST_ENTRIES ); - - if( p->nFreelistOvfl ){ - rc = lsmCheckpointOverflowLoad(pDb, &p->freelist); - if( rc!=LSM_OK ) return rc; - p->nFreelistOvfl = 0; - } - - if( p->freelist.nEntry<=pDb->nMaxFreelist ){ - nRet = 0; - *pnVal = 0; - *ppVal = 0; - }else{ - int i; /* Iterator variable */ - int iOut = 0; /* Current size of blob in ckpt */ - CkptBuffer ckpt; /* Used to build FREELIST blob */ - - nRet = (p->freelist.nEntry - pDb->nMaxFreelist); - - memset(&ckpt, 0, sizeof(CkptBuffer)); - ckpt.pEnv = pDb->pEnv; - for(i=p->freelist.nEntry-nRet; rc==LSM_OK && ifreelist.nEntry; i++){ - FreelistEntry *pEntry = &p->freelist.aEntry[i]; - ckptSetValue(&ckpt, iOut++, pEntry->iBlk, &rc); - ckptSetValue(&ckpt, iOut++, (pEntry->iId >> 32) & 0xFFFFFFFF, &rc); - ckptSetValue(&ckpt, iOut++, pEntry->iId & 0xFFFFFFFF, &rc); - } - ckptChangeEndianness(ckpt.aCkpt, iOut); - - *ppVal = ckpt.aCkpt; - *pnVal = iOut*sizeof(u32); - } - - *pnOvfl = nRet; - return rc; -#endif -} - -/* -** The connection must be the worker in order to call this function. -** -** True is returned if there are currently too many free-list entries -** in-memory to store in a checkpoint. Before calling CheckpointSaveWorker() -** to save the current worker snapshot, a new top-level LSM segment must -** be created so that some of them can be written to the LSM. -*/ -int lsmCheckpointOverflowRequired(lsm_db *pDb){ - Snapshot *p = pDb->pWorker; - assert( 0 ); - assert( lsmShmAssertWorker(pDb) ); - return (p->freelist.nEntry > pDb->nMaxFreelist || p->nFreelistOvfl>0); -} - -/* -** Connection pDb must be the worker to call this function. -** -** Load the FREELIST record from the database. Decode it and append the -** results to list pFreelist. -*/ -int lsmCheckpointOverflowLoad( - lsm_db *pDb, - Freelist *pFreelist -){ - assert( 0 ); -#if 0 - int rc; - int nVal = 0; - void *pVal = 0; - assert( lsmShmAssertWorker(pDb) ); - - /* Load the blob of data from the LSM. If that is successful (and the - ** blob is greater than zero bytes in size), decode the contents and - ** merge them into the current contents of *pFreelist. */ - rc = lsmSortedLoadFreelist(pDb, &pVal, &nVal); - if( pVal ){ - u32 *aFree = (u32 *)pVal; - int nFree = nVal / sizeof(int); - ckptChangeEndianness(aFree, nFree); - if( (nFree % 3) ){ - rc = LSM_CORRUPT_BKPT; - }else{ - int iNew = 0; /* Offset of next element in aFree[] */ - int iOld = 0; /* Next element in freelist fl */ - Freelist fl = *pFreelist; /* Original contents of *pFreelist */ - - memset(pFreelist, 0, sizeof(Freelist)); - while( rc==LSM_OK && (iNew=fl.nEntry ){ - iBlk = aFree[iNew]; - iId = ((i64)(aFree[iNew+1])<<32) + (i64)aFree[iNew+2]; - iNew += 3; - }else if( iNew>=nFree ){ - iBlk = fl.aEntry[iOld].iBlk; - iId = fl.aEntry[iOld].iId; - iOld += 1; - }else{ - iId = ((i64)(aFree[iNew+1])<<32) + (i64)aFree[iNew+2]; - if( iIdpEnv, pFreelist, iBlk, iId); - } - lsmFree(pDb->pEnv, fl.aEntry); - -#ifdef LSM_DEBUG - if( rc==LSM_OK ){ - int i; - for(i=1; rc==LSM_OK && inEntry; i++){ - assert( pFreelist->aEntry[i].iId >= pFreelist->aEntry[i-1].iId ); - } - assert( pFreelist->nEntry==(fl.nEntry + nFree/3) ); - } -#endif - } - - lsmFree(pDb->pEnv, pVal); - } - - return rc; -#endif -} - /* ** Read the checkpoint id from meta-page pPg. */ static i64 ckptLoadId(MetaPage *pPg){ i64 ret = 0; @@ -1078,11 +924,10 @@ pNew->aiAppend[i] = ckptRead64(a); } /* Copy the free-list */ if( bInclFreelist ){ - pNew->nFreelistOvfl = aCkpt[CKPT_HDR_OVFL]; nFree = aCkpt[iIn++]; if( nFree ){ pNew->freelist.aEntry = (FreelistEntry *)lsmMallocZeroRc( pDb->pEnv, sizeof(FreelistEntry)*nFree, &rc ); @@ -1143,11 +988,11 @@ ** the new snapshot produced by the work performed by pDb. ** ** If successful, LSM_OK is returned. Otherwise, if an error occurs, an LSM ** error code is returned. */ -int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){ +int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush){ Snapshot *pSnap = pDb->pWorker; ShmHeader *pShm = pDb->pShmhdr; void *p = 0; int n = 0; int rc; Index: src/lsm_file.c ================================================================== --- src/lsm_file.c +++ src/lsm_file.c @@ -2136,11 +2136,11 @@ *pzOut = str.z; } if( bUnlock ){ int rcwork = LSM_BUSY; - lsmFinishWork(pDb, 0, 0, &rcwork); + lsmFinishWork(pDb, 0, &rcwork); } return rc; } /* Index: src/lsm_main.c ================================================================== --- src/lsm_main.c +++ src/lsm_main.c @@ -370,11 +370,11 @@ } static void infoFreeWorker(lsm_db *pDb, int bUnlock){ if( bUnlock ){ int rcdummy = LSM_BUSY; - lsmFinishWork(pDb, 0, 0, &rcdummy); + lsmFinishWork(pDb, 0, &rcdummy); } } int lsmStructList( lsm_db *pDb, /* Database handle */ @@ -421,11 +421,11 @@ /* Obtain the worker snapshot */ rc = infoGetWorker(pDb, &pWorker, &bUnlock); if( rc!=LSM_OK ) return rc; lsmStringInit(&s, pDb->pEnv); - lsmStringAppendf(&s, "%d+%d",pWorker->freelist.nEntry,pWorker->nFreelistOvfl); + lsmStringAppendf(&s, "%d", pWorker->freelist.nEntry); for(i=0; ifreelist.nEntry; i++){ FreelistEntry *p = &pWorker->freelist.aEntry[i]; lsmStringAppendf(&s, " {%d %d}", p->iBlk, (int)p->iId); } rc = s.n>=0 ? LSM_OK : LSM_NOMEM; Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -561,11 +561,10 @@ ** be used following recovery if a failure occurs at this point). */ rc = lsmCheckpointSynced(pDb, &iInUse, 0, 0); if( rc==LSM_OK && iInUse==0 ) iInUse = p->iId; if( rc==LSM_OK && pDb->pClient ) iInUse = LSM_MIN(iInUse, pDb->pClient->iId); - if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse); /* Query the free block list for a suitable block */ if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet); @@ -573,11 +572,12 @@ ** the list. Otherwise, if no suitable block was found, allocate one from ** the end of the file. */ if( rc==LSM_OK ){ if( iRet>0 ){ #ifdef LSM_LOG_FREELIST - lsmLogMessage(pDb, 0, "reusing block %d", iRet); + lsmLogMessage(pDb, 0, + "reusing block %d (snapshot-in-use=%lld)", iRet, iInUse); #endif rc = freelistAppend(pDb, iRet, -1); }else{ iRet = ++(p->nBlock); #ifdef LSM_LOG_FREELIST @@ -730,18 +730,17 @@ ** Argument bFlush is true if the contents of the in-memory tree has just ** been flushed to disk. The significance of this is that once the snapshot ** created to hold the updated state of the database is synced to disk, log ** file space can be recycled. */ -void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){ +void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){ assert( *pRc!=0 || pDb->pWorker ); if( pDb->pWorker ){ /* If no error has occurred, serialize the worker snapshot and write ** it to shared memory. */ - assert( pDb->pWorker->nFreelistOvfl==0 || nOvfl==0 ); if( *pRc==LSM_OK ){ - *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl); + *pRc = lsmCheckpointSaveWorker(pDb, bFlush); } lsmFreeSnapshot(pDb->pEnv, pDb->pWorker); pDb->pWorker = 0; } @@ -818,17 +817,17 @@ rc = LSM_OK; } } #if 0 if( rc==LSM_OK && pDb->pClient ){ - printf("reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n", + fprintf(stderr, + "reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n", (void *)pDb, (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid, (int)pDb->treehdr.root.iTransId, (int)pDb->treehdr.iOldShmid ); - fflush(stdout); } #endif } if( rc!=LSM_OK ){ @@ -854,10 +853,18 @@ if( pClient ){ lsmFreeSnapshot(pDb->pEnv, pDb->pClient); pDb->pClient = 0; } #endif + +#if 0 +if( pDb->pClient && pDb->iReader>=0 ){ + fprintf(stderr, + "finished reading %p: snapshot:%d\n", (void *)pDb, (int)pDb->pClient->iId + ); +} +#endif if( pDb->iReader>=0 ) lsmReleaseReadlock(pDb); } /* ** Open a write transaction. @@ -1094,10 +1101,11 @@ } } } } + *piInUse = iInUse; return LSM_OK; } int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){ if( db->treehdr.iUsedShmid==iShmid ){ Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -204,11 +204,10 @@ /* Comparison results */ int nTree; /* Size of aTree[] array */ int *aTree; /* Array of comparison results */ /* Used by cursors flushing the in-memory tree only */ - int *pnOvfl; /* Number of free-list entries to store */ void *pSystemVal; /* Pointer to buffer to free */ /* Used by worker cursors only */ Pgno *pPrevMergePtr; }; @@ -2261,15 +2260,12 @@ /* ** If the free-block list is not empty, then have this cursor visit a key ** with (a) the system bit set, and (b) the key "FREELIST" and (c) a value ** blob containing the serialized free-block list. */ -static int multiCursorVisitFreelist(MultiCursor *pCsr, int *pnOvfl){ +static int multiCursorVisitFreelist(MultiCursor *pCsr){ int rc = LSM_OK; - - assert( pCsr ); - pCsr->pnOvfl = pnOvfl; pCsr->flags |= CURSOR_FLUSH_FREELIST; pCsr->pSystemVal = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc); return rc; } @@ -3886,21 +3882,19 @@ } static int sortedNewToplevel( lsm_db *pDb, /* Connection handle */ int eTree, /* One of the TREE_XXX constants */ - int *pnOvfl, /* OUT: Number of free-list entries stored */ int *pnWrite /* OUT: Number of database pages written */ ){ int rc = LSM_OK; /* Return Code */ MultiCursor *pCsr = 0; Level *pNext = 0; /* The current top level */ Level *pNew; /* The new level itself */ Segment *pDel = 0; /* Delete separators from this segment */ int nWrite = 0; /* Number of database pages written */ Freelist freelist; - assert( pnOvfl ); assert( pDb->bUseFreelist==0 ); pDb->pFreelist = &freelist; pDb->bUseFreelist = 1; memset(&freelist, 0, sizeof(freelist)); @@ -3917,11 +3911,11 @@ ** segment contains everything in the tree and pointers to the next segment ** in the database (if any). */ pCsr = multiCursorNew(pDb, &rc); if( pCsr ){ pCsr->pDb = pDb; - rc = multiCursorVisitFreelist(pCsr, pnOvfl); + rc = multiCursorVisitFreelist(pCsr); if( rc==LSM_OK ){ rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree); } if( rc==LSM_OK && pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){ pDel = &pNext->lhs; @@ -3968,26 +3962,21 @@ mergeWorkerShutdown(&mergeworker, &rc); pNew->pMerge = 0; pNew->iAge = 0; } - /* Link the new level into the top of the tree. */ - if( rc==LSM_OK && pNew->lhs.iFirst ){ - if( pDel ) pDel->iRoot = 0; - }else{ + if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){ + assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 ); lsmDbSnapshotSetLevel(pDb->pWorker, pNext); sortedFreeLevel(pDb->pEnv, pNew); - } + }else{ + if( pDel ) pDel->iRoot = 0; #if 0 - if( pNew->lhs.iFirst ){ lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel"); - } #endif - if( rc==LSM_OK ){ - assert( pNew->lhs.iFirst || pDb->pWorker->freelist.nEntry==0 ); if( freelist.nEntry ){ Freelist *p = &pDb->pWorker->freelist; lsmFree(pDb->pEnv, p->aEntry); memcpy(p, &freelist, sizeof(freelist)); freelist.aEntry = 0; @@ -4457,11 +4446,10 @@ int nPage, /* Number of pages to write to disk */ int *pnWrite, /* OUT: Pages actually written to disk */ int *pbCkpt /* OUT: True if an auto-checkpoint is req. */ ){ int rc = LSM_OK; /* Return code */ - int nOvfl = 0; int bDirty = 0; int nMax = nPage; /* Maximum pages to write to disk */ int nRem = nPage; int bCkpt = 0; @@ -4505,17 +4493,17 @@ assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) ); } if( rc==LSM_OK && nRem>0 ){ int nPg = 0; - rc = sortedNewToplevel(pDb, TREE_OLD, &nOvfl, &nPg); + rc = sortedNewToplevel(pDb, TREE_OLD, &nPg); nRem -= nPg; if( rc==LSM_OK ){ if( pDb->nTransOpen>0 ){ lsmTreeDiscardOld(pDb); } - rc = lsmCheckpointSaveWorker(pDb, 1, 0); + rc = lsmCheckpointSaveWorker(pDb, 1); } } } /* If nPage is still greater than zero, do some merging. */ @@ -4526,30 +4514,29 @@ nRem -= nPg; if( nPg ) bDirty = 1; } /* If the in-memory part of the free-list is too large, write a new - ** top-level containing just the in-memory free-list entries to disk. - */ - if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > LSM_MAX_FREELIST_ENTRIES ){ + ** top-level containing just the in-memory free-list entries to disk. */ + if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){ int nPg = 0; while( rc==LSM_OK && sortedDbIsFull(pDb) ){ rc = sortedWork(pDb, 16, 0, 1, &nPg); nRem -= nPg; } if( rc==LSM_OK ){ - rc = sortedNewToplevel(pDb, TREE_NONE, &nOvfl, &nPg); + rc = sortedNewToplevel(pDb, TREE_NONE, &nPg); } nRem -= nPg; if( nPg ) bDirty = 1; } if( rc==LSM_OK && bDirty ){ - lsmFinishWork(pDb, 0, 0, &rc); + lsmFinishWork(pDb, 0, &rc); }else{ int rcdummy = LSM_BUSY; - lsmFinishWork(pDb, 0, 0, &rcdummy); + lsmFinishWork(pDb, 0, &rcdummy); } assert( pDb->pWorker==0 ); if( rc==LSM_OK ){ if( pnWrite ) *pnWrite = (nMax - nRem); @@ -4662,21 +4649,20 @@ ** This function is only called during system shutdown. The contents of ** any in-memory trees present (old or current) are written out to disk. */ int lsmFlushTreeToDisk(lsm_db *pDb){ int rc; - int nOvfl = 0; rc = lsmBeginWork(pDb); while( rc==LSM_OK && sortedDbIsFull(pDb) ){ rc = sortedWork(pDb, 256, 0, 1, 0); } if( rc==LSM_OK ){ - rc = sortedNewToplevel(pDb, TREE_BOTH, &nOvfl, 0); + rc = sortedNewToplevel(pDb, TREE_BOTH, 0); } - lsmFinishWork(pDb, 1, nOvfl, &rc); + lsmFinishWork(pDb, 1, &rc); return rc; } /* ** Return a string representation of the segment passed as the only argument.