Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -4777,15 +4777,76 @@ *pRc = rc; } assert( *pRc==LSM_OK || bRet==0 ); return bRet; } + +/* +** Create a new free-list only top-level segment. Return LSM_OK if successful +** or an LSM error code if some error occurs. +*/ +static int sortedNewFreelistOnly(lsm_db *pDb){ + int rc; + Level *pLvl; + + pLvl = pDb->pWorker->pLevel; + if( pLvl->pNext!=0 || pLvl->nRight==0 ){ + pLvl = 0; + } + + rc = sortedNewToplevel(pDb, TREE_NONE, 0); + if( rc==LSM_OK && pLvl ){ + Level *pNew = pDb->pWorker->pLevel; + assert( pNew->pNext==pLvl ); + + if( pLvl->pSplitKey==0 ){ + sortedSplitkey(pDb, pLvl, &rc); + } + if( rc==LSM_OK && pLvl->iSplitTopic==0 ){ + /* Add the new top-level to the rhs of pLvl. */ + Merge *pMerge; + + Segment *aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, + sizeof(Segment) * (pLvl->nRight + 1), &rc + ); + if( rc==LSM_OK ){ + memcpy(&aRhs[1], pLvl->aRhs, sizeof(Segment) * pLvl->nRight); + aRhs[0] = pNew->lhs; + lsmFree(pDb->pEnv, pLvl->aRhs); + pLvl->aRhs = aRhs; + pLvl->nRight++; + } + + /* Also add an entry to the Merge object */ + pMerge = (Merge *)lsmMallocZeroRc(pDb->pEnv, + sizeof(Merge) + sizeof(MergeInput)*(pLvl->pMerge->nInput+1), &rc + ); + if( rc==LSM_OK ){ + memcpy(pMerge, pLvl->pMerge, sizeof(Merge)); + pMerge->aInput = (MergeInput *)&pMerge[1]; + memcpy(&pMerge->aInput[1], pLvl->pMerge->aInput, + sizeof(MergeInput)*(pLvl->pMerge->nInput+1) + ); + pMerge->aInput[0].iPg = aRhs[0].iFirst; + pMerge->aInput[0].iCell = 0; + pMerge->nInput++; + lsmFree(pDb->pEnv, pLvl->pMerge); + pLvl->pMerge = pMerge; + + sortedFreeLevel(pDb->pEnv, pNew); + pDb->pWorker->pLevel = pLvl; + } + } + } + + return rc; +} int lsmSaveWorker(lsm_db *pDb, int bFlush){ Snapshot *p = pDb->pWorker; if( p->freelist.nEntry>pDb->nMaxFreelist ){ - int rc = sortedNewToplevel(pDb, TREE_NONE, 0); + int rc = sortedNewFreelistOnly(pDb); if( rc!=LSM_OK ) return rc; } return lsmCheckpointSaveWorker(pDb, bFlush); } @@ -4795,10 +4856,11 @@ int nMerge, /* Minimum segments to merge together */ int nPage, /* Number of pages to write to disk */ int *pnWrite, /* OUT: Pages actually written to disk */ int *pbCkpt /* OUT: True if an auto-checkpoint is req. */ ){ + Snapshot *pWorker; /* Worker snapshot */ int rc = LSM_OK; /* Return code */ int bDirty = 0; int nMax = nPage; /* Maximum pages to write to disk */ int nRem = nPage; int bCkpt = 0; @@ -4806,10 +4868,11 @@ /* Open the worker 'transaction'. It will be closed before this function ** returns. */ assert( pDb->pWorker==0 ); rc = lsmBeginWork(pDb); if( rc!=LSM_OK ) return rc; + pWorker = pDb->pWorker; /* If this connection is doing auto-checkpoints, set nMax (and nRem) so ** that this call stops writing when the auto-checkpoint is due. The ** caller will do the checkpoint, then possibly call this function again. */ if( bShutdown==0 && pDb->nAutockpt ){ @@ -4875,11 +4938,11 @@ while( rc==LSM_OK && lsmDatabaseFull(pDb) ){ rc = sortedWork(pDb, 16, nMerge, 1, &nPg); nRem -= nPg; } if( rc==LSM_OK ){ - rc = sortedNewToplevel(pDb, TREE_NONE, &nPg); + rc = sortedNewFreelistOnly(pDb); } nRem -= nPg; if( nPg ) bDirty = 1; } @@ -4892,10 +4955,17 @@ assert( pDb->pWorker==0 ); if( rc==LSM_OK ){ *pnWrite = (nMax - nRem); *pbCkpt = (bCkpt && nRem<=0); + if( nMerge==1 && pDb->nAutockpt>0 && bDirty + && pWorker->pLevel + && pWorker->pLevel->nRight==0 + && pWorker->pLevel->pNext==0 + ){ + *pbCkpt = 1; + } }else{ *pnWrite = 0; *pbCkpt = 0; }