Index: src/lsmInt.h ================================================================== --- src/lsmInt.h +++ src/lsmInt.h @@ -638,11 +638,11 @@ /* Creating, populating, gobbling and deleting sorted runs. */ void lsmFsGobble(lsm_db *, Segment *, Pgno *, int); int lsmFsSortedDelete(FileSystem *, Snapshot *, int, Segment *); int lsmFsSortedFinish(FileSystem *, Segment *); -int lsmFsSortedAppend(FileSystem *, Snapshot *, Segment *, int, Page **); +int lsmFsSortedAppend(FileSystem *, Snapshot *, Level *, int, Page **); int lsmFsSortedPadding(FileSystem *, Snapshot *, Segment *); /* Functions to retrieve the lsm_env pointer from a FileSystem or Page object */ lsm_env *lsmFsEnv(FileSystem *); lsm_env *lsmPageEnv(Page *); Index: src/lsm_ckpt.c ================================================================== --- src/lsm_ckpt.c +++ src/lsm_ckpt.c @@ -884,12 +884,15 @@ int rc; ShmHeader *pShm = pDb->pShmhdr; int nInt1; int nInt2; - /* Must be holding the WORKER lock to do this */ - assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) ); + /* Must be holding the WORKER lock to do this. Or DMS2. */ + assert( + lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) + || lsmShmAssertLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL) + ); /* Check that the two snapshots match. If not, repair them. */ nInt1 = pShm->aSnap1[CKPT_HDR_NCKPT]; nInt2 = pShm->aSnap2[CKPT_HDR_NCKPT]; if( nInt1!=nInt2 || memcmp(pShm->aSnap1, pShm->aSnap2, nInt2*sizeof(u32)) ){ @@ -901,10 +904,12 @@ return LSM_PROTOCOL; } } rc = lsmCheckpointDeserialize(pDb, 1, pShm->aSnap1, &pDb->pWorker); + if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase; + assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) ); return rc; } int lsmCheckpointDeserialize( Index: src/lsm_file.c ================================================================== --- src/lsm_file.c +++ src/lsm_file.c @@ -1549,38 +1549,50 @@ } return rc; } -static Pgno findAppendPoint(FileSystem *pFS){ +static Pgno findAppendPoint(FileSystem *pFS, Level *pLvl){ int i; Pgno *aiAppend = pFS->pDb->pWorker->aiAppend; u32 iRet = 0; for(i=LSM_APPLIST_SZ-1; iRet==0 && i>=0; i--){ - if( (iRet = aiAppend[i]) ) aiAppend[i] = 0; + if( (iRet = aiAppend[i]) ){ + if( pLvl ){ + int iBlk = fsPageToBlock(pFS, iRet); + int j; + for(j=0; iRet && jnRight; j++){ + if( fsPageToBlock(pFS, pLvl->aRhs[j].iLastPg)==iBlk ){ + iRet = 0; + } + } + } + if( iRet ) aiAppend[i] = 0; + } } return iRet; } /* -** Append a page to file iFile. Set the ref-count to 1 and return a pointer -** to it. The page is writable until either lsmFsPagePersist() is called on -** it or the ref-count drops to zero. +** Append a page to the left-hand-side of pLvl. Set the ref-count to 1 and +** return a pointer to it. The page is writable until either +** lsmFsPagePersist() is called on it or the ref-count drops to zero. */ int lsmFsSortedAppend( FileSystem *pFS, Snapshot *pSnapshot, - Segment *p, + Level *pLvl, int bDefer, Page **ppOut ){ int rc = LSM_OK; Page *pPg = 0; *ppOut = 0; int iApp = 0; int iNext = 0; + Segment *p = &pLvl->lhs; int iPrev = p->iLastPg; if( pFS->pCompress || bDefer ){ /* In compressed database mode the page is not assigned a page number ** or location in the database file at this point. This will be done @@ -1598,11 +1610,11 @@ pPg->nRef = 1; pFS->nOut++; } }else{ if( iPrev==0 ){ - iApp = findAppendPoint(pFS); + iApp = findAppendPoint(pFS, pLvl); }else if( fsIsLast(pFS, iPrev) ){ int iNext; rc = fsBlockNext(pFS, fsPageToBlock(pFS, iPrev), &iNext); if( rc!=LSM_OK ) return rc; iApp = fsFirstPageOnBlock(pFS, iNext); @@ -1840,11 +1852,11 @@ Pgno iApp = pSeg->iLastPg+1; /* If this is the first data written into the segment, find an append-point ** or allocate a new block. */ if( iApp==1 ){ - pSeg->iFirst = iApp = findAppendPoint(pFS); + pSeg->iFirst = iApp = findAppendPoint(pFS, 0); if( iApp==0 ){ int iBlk; rc = lsmBlockAllocate(pFS->pDb, &iBlk); pSeg->iFirst = iApp = fsFirstPageOnBlock(pFS, iBlk); } Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -148,10 +148,88 @@ /* Free the memory allocated for the Database struct itself */ lsmFree(pEnv, p); } } + +typedef struct DbTruncateCtx DbTruncateCtx; +struct DbTruncateCtx { + int nBlock; + i64 iInUse; +}; + +static int dbTruncateCb(void *pCtx, int iBlk, i64 iSnapshot){ + DbTruncateCtx *p = (DbTruncateCtx *)pCtx; + if( iBlk!=p->nBlock || (p->iInUse>=0 && iSnapshot>=p->iInUse) ) return 1; + p->nBlock--; + return 0; +} + +static int dbTruncate(lsm_db *pDb, i64 iInUse){ + int rc; + int i; + DbTruncateCtx ctx; + + assert( pDb->pWorker ); + ctx.nBlock = pDb->pWorker->nBlock; + ctx.iInUse = iInUse; + + rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx); + for(i=ctx.nBlock+1; rc==LSM_OK && i<=pDb->pWorker->nBlock; i++){ + rc = freelistAppend(pDb, i, -1); + } + + if( rc==LSM_OK ){ +#ifdef LSM_LOG_FREELIST + if( ctx.nBlock!=pDb->pWorker->nBlock ){ + lsmLogMessage(pDb, 0, + "dbTruncate(): truncated db to %d blocks",ctx.nBlock + ); + } +#endif + pDb->pWorker->nBlock = ctx.nBlock; + } + return rc; +} + + +/* +** This function is called during database shutdown (when the number of +** connections drops from one to zero). It truncates the database file +** to as small a size as possible without truncating away any blocks that +** contain data. +*/ +static int dbTruncateFile(lsm_db *pDb){ + int rc; + + assert( pDb->pWorker==0 ); + assert( lsmShmAssertLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL) ); + rc = lsmCheckpointLoadWorker(pDb); + + if( rc==LSM_OK ){ + DbTruncateCtx ctx; + + /* Walk the database free-block-list in reverse order. Set ctx.nBlock + ** to the block number of the last block in the database that actually + ** contains data. */ + ctx.nBlock = pDb->pWorker->nBlock; + ctx.iInUse = -1; + rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx); + + /* If the last block that contains data is not already the last block in + ** the database file, truncate the database file so that it is. */ + if( rc==LSM_OK && ctx.nBlock!=pDb->pWorker->nBlock ){ + rc = lsmFsTruncateDb( + pDb->pFS, (i64)ctx.nBlock*lsmFsBlockSize(pDb->pFS) + ); + } + } + + lsmFreeSnapshot(pDb->pEnv, pDb->pWorker); + pDb->pWorker = 0; + return rc; +} static void doDbDisconnect(lsm_db *pDb){ int rc; /* Block for an exclusive lock on DMS1. This lock serializes all calls @@ -181,13 +259,15 @@ /* Write a checkpoint to disk. */ if( rc==LSM_OK ){ rc = lsmCheckpointWrite(pDb, 1, 0); } - /* If the checkpoint was written successfully, delete the log file */ + /* If the checkpoint was written successfully, delete the log file + ** and, if possible, truncate the database file. */ if( rc==LSM_OK ){ Database *p = pDb->pDatabase; + dbTruncateFile(pDb); lsmFsCloseAndDeleteLog(pDb->pFS); if( p->pFile ) lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1); } } } @@ -540,50 +620,10 @@ } return rc; } -typedef struct DbTruncateCtx DbTruncateCtx; -struct DbTruncateCtx { - int nBlock; - i64 iInUse; -}; - -static int dbTruncateCb(void *pCtx, int iBlk, i64 iSnapshot){ - DbTruncateCtx *p = (DbTruncateCtx *)pCtx; - if( iBlk!=p->nBlock || iSnapshot>=p->iInUse ) return 1; - p->nBlock--; - return 0; -} - -static int dbTruncate(lsm_db *pDb, i64 iInUse){ - int rc; - int i; - DbTruncateCtx ctx; - - assert( pDb->pWorker ); - ctx.nBlock = pDb->pWorker->nBlock; - ctx.iInUse = iInUse; - - rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx); - for(i=ctx.nBlock+1; rc==LSM_OK && i<=pDb->pWorker->nBlock; i++){ - rc = freelistAppend(pDb, i, -1); - } - - if( rc==LSM_OK ){ -#ifdef LSM_LOG_FREELIST - if( ctx.nBlock!=pDb->pWorker->nBlock ){ - lsmLogMessage(pDb, 0, - "dbTruncate(): truncated db to %d blocks",ctx.nBlock - ); - } -#endif - pDb->pWorker->nBlock = ctx.nBlock; - } - return rc; -} - typedef struct FindFreeblockCtx FindFreeblockCtx; struct FindFreeblockCtx { i64 iInUse; int iRet; @@ -821,11 +861,10 @@ rc = lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL, 0); /* Deserialize the current worker snapshot */ if( rc==LSM_OK ){ rc = lsmCheckpointLoadWorker(pDb); - if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase; } return rc; } void lsmFreeSnapshot(lsm_env *pEnv, Snapshot *p){ Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -3106,22 +3106,19 @@ */ static int mergeWorkerMoveHierarchy( MergeWorker *pMW, /* Merge worker */ int bSep /* True for separators run */ ){ - Segment *pSeg; /* Segment being written */ lsm_db *pDb = pMW->pDb; /* Database handle */ int rc = LSM_OK; /* Return code */ int i; Page **apHier = pMW->hier.apHier; int nHier = pMW->hier.nHier; - pSeg = &pMW->pLevel->lhs; - for(i=0; rc==LSM_OK && ipFS, pDb->pWorker, pSeg, 1, &pNew); + rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pMW->pLevel, 1, &pNew); assert( rc==LSM_OK ); if( rc==LSM_OK ){ u8 *a1; int n1; u8 *a2; int n2; @@ -3295,11 +3292,10 @@ Pgno iPtr, Pgno iKeyPg, void *pKey, int nKey ){ - Segment *pSeg = &pMW->pLevel->lhs; Hierarchy *p = &pMW->hier; lsm_db *pDb = pMW->pDb; /* Database handle */ int rc = LSM_OK; /* Return Code */ int iLevel; /* Level of b-tree hierachy to write to */ int nData; /* Size of aData[] in bytes */ @@ -3363,11 +3359,11 @@ /* Allocate a new page for apHier[iLevel]. */ p->apHier[iLevel] = 0; if( rc==LSM_OK ){ rc = lsmFsSortedAppend( - pDb->pFS, pDb->pWorker, pSeg, 1, &p->apHier[iLevel] + pDb->pFS, pDb->pWorker, pMW->pLevel, 1, &p->apHier[iLevel] ); } if( rc!=LSM_OK ) return rc; aData = fsPageData(p->apHier[iLevel], &nData); @@ -3563,13 +3559,12 @@ int rc = LSM_OK; /* Return code */ Page *pNext = 0; /* New page appended to run */ lsm_db *pDb = pMW->pDb; /* Database handle */ Segment *pSeg; /* Run to append to */ - pSeg = &pMW->pLevel->lhs; - rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, 0, &pNext); - assert( rc!=LSM_OK || pSeg->iFirst>0 || pMW->pDb->compress.xCompress ); + rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pMW->pLevel, 0, &pNext); + assert( rc || pMW->pLevel->lhs.iFirst>0 || pMW->pDb->compress.xCompress ); if( rc==LSM_OK ){ u8 *aData; /* Data buffer belonging to page pNext */ int nData; /* Size of aData[] in bytes */ @@ -4917,10 +4912,11 @@ } if( rc==LSM_OK ){ rc = sortedNewToplevel(pDb, TREE_BOTH, 0); } + lsmFinishWork(pDb, 1, &rc); return rc; } /*