Index: src/lsmInt.h ================================================================== --- src/lsmInt.h +++ src/lsmInt.h @@ -71,10 +71,12 @@ #define LSM_CKPT_MIN_NONLSM (LSM_CKPT_MIN_FREELIST - LSM_CKPT_MAX_REFREE) typedef struct Database Database; typedef struct DbLog DbLog; typedef struct FileSystem FileSystem; +typedef struct Freelist Freelist; +typedef struct FreelistEntry FreelistEntry; typedef struct Level Level; typedef struct LogMark LogMark; typedef struct LogRegion LogRegion; typedef struct LogWriter LogWriter; typedef struct LsmString LsmString; @@ -320,10 +322,12 @@ TransMark *aTrans; /* Array of marks for transaction rollback */ IntArray rollback; /* List of tree-nodes to roll back */ /* Worker context */ Snapshot *pWorker; /* Worker snapshot (or NULL) */ + Freelist *pFreelist; /* See sortedNewToplevel() */ + int bUseFreelist; /* True to use pFreelist */ /* Debugging message callback */ void (*xLog)(void *, int, const char *); void *pLogCtx; @@ -448,18 +452,31 @@ /* Return true if shm-sequence "a" is larger than or equal to "b" */ #define shm_sequence_ge(a, b) (((u32)a-(u32)b) < (1<<30)) #define LSM_APPLIST_SZ 4 -typedef struct Freelist Freelist; -typedef struct FreelistEntry FreelistEntry; - /* -** An instance of the following structure stores the current database free -** block list. The free list is a list of blocks that are not currently -** used by the worker snapshot. Assocated with each block in the list is the -** snapshot id of the most recent snapshot that did actually use the block. +** An instance of the following structure stores the in-memory part of +** the current free block list. This structure is to the free block list +** as the in-memory tree is to the user's database content. The contents +** of the free block list are found by merging the in-memory components +** with those stored in the LSM, just as the contents of the database are +** found by merging the in-memory tree with the user data entries in the +** LSM.
+** +** Each FreelistEntry structure in the array represents either an insert +** or delete operation on the free-list. For deletes, the FreelistEntry.iId +** field is set to -1. For inserts, it is set to zero or greater. +** +** The array of FreelistEntry structures is always sorted in order of +** block number (ascending). +** +** When the in-memory free block list is written into the LSM, each insert +** or delete (as a point-delete) is written as a separate system key: the +** bitwise inverse of the block number as a 32-bit big-endian integer, so +** that LSM entries are sorted in descending order of block id. The value +** is the snapshot id as an 8-byte integer written by lsmPutU64(). */ struct Freelist { FreelistEntry *aEntry; /* Free list entries */ int nEntry; /* Number of valid slots in aEntry[] */ int nAlloc; /* Allocated size of aEntry[] */ @@ -689,10 +706,13 @@ ** Functions from file "lsm_sorted.c". */ int lsmInfoPageDump(lsm_db *, Pgno, int, char **); void lsmSortedCleanup(lsm_db *); int lsmSortedAutoWork(lsm_db *, int nUnit); + +int lsmSortedWalkFreelist(lsm_db *, int (*)(void *, int, i64), void *); +int lsmWalkFreelist(lsm_db *, int (*)(void *, int, i64), void *); /* lsm_shared.c */ int lsmFlushTreeToDisk(lsm_db *pDb); void lsmSortedRemap(lsm_db *pDb); Index: src/lsm_ckpt.c ================================================================== --- src/lsm_ckpt.c +++ src/lsm_ckpt.c @@ -381,11 +381,10 @@ } }; static int ckptExportSnapshot( lsm_db *pDb, /* Connection handle */ - int nOvfl, /* Number of free-list entries in LSM */ int bLog, /* True to update log-offset fields */ i64 iId, /* Checkpoint id */ int bCksum, /* If true, include checksums */ void **ppCkpt, /* OUT: Buffer containing checkpoint */ int *pnCkpt /* OUT: Size of checkpoint in bytes */ @@ -397,19 +396,10 @@ int iLevel; /* Used to count out nLevel levels */ int iOut = 0; /* Current offset in aCkpt[] */ Level *pLevel; /* Level iterator */ int i; /* Iterator used while serializing freelist */ CkptBuffer ckpt; - int nFree; - - nFree = pSnap->freelist.nEntry; - if( nOvfl>=0 ){ - 
nFree -= nOvfl; - }else{ - assert( 0 ); - nOvfl = pDb->pShmhdr->aSnap2[CKPT_HDR_OVFL]; - } /* Initialize the output buffer */ memset(&ckpt, 0, sizeof(CkptBuffer)); ckpt.pEnv = pDb->pEnv; iOut = CKPT_HDR_SIZE; @@ -430,10 +420,11 @@ iLevel++; } /* Write the freelist */ if( rc==LSM_OK ){ + int nFree = pSnap->freelist.nEntry; ckptSetValue(&ckpt, iOut++, nFree, &rc); for(i=0; ifreelist.aEntry[i]; ckptSetValue(&ckpt, iOut++, p->iBlk, &rc); ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc); @@ -448,11 +439,11 @@ ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc); ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc); ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc); ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc); ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc); - ckptSetValue(&ckpt, CKPT_HDR_OVFL, (nOvfl?nOvfl:pSnap->nFreelistOvfl), &rc); + ckptSetValue(&ckpt, CKPT_HDR_OVFL, 0, &rc); ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc); if( bCksum ){ ckptAddChecksum(&ckpt, iOut, &rc); }else{ @@ -462,11 +453,11 @@ iOut += 2; assert( iOut<=1024 ); #ifdef LSM_LOG_FREELIST lsmLogMessage(pDb, rc, - "ckptExportSnapshot(): id=%d freelist: %d/%d", (int)iId, nFree, nOvfl + "ckptExportSnapshot(): id=%lld freelist: %d", iId, pSnap->freelist.nEntry ); #endif *ppCkpt = (void *)ckpt.aCkpt; if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut; @@ -678,10 +669,12 @@ lsm_db *pDb, /* Database handle (must hold worker lock) */ void **ppVal, /* OUT: lsmMalloc'd buffer */ int *pnVal, /* OUT: Size of *ppVal in bytes */ int *pnOvfl /* OUT: Number of freelist entries in buf */ ){ + assert( 0 ); +#if 0 int rc = LSM_OK; int nRet; Snapshot *p = pDb->pWorker; assert( lsmShmAssertWorker(pDb) ); @@ -719,10 +712,11 @@ *pnVal = iOut*sizeof(u32); } *pnOvfl = nRet; return rc; +#endif } /* ** The connection must be the worker in order to call this function. 
** @@ -731,10 +725,11 @@ ** to save the current worker snapshot, a new top-level LSM segment must ** be created so that some of them can be written to the LSM. */ int lsmCheckpointOverflowRequired(lsm_db *pDb){ Snapshot *p = pDb->pWorker; + assert( 0 ); assert( lsmShmAssertWorker(pDb) ); return (p->freelist.nEntry > pDb->nMaxFreelist || p->nFreelistOvfl>0); } /* @@ -745,10 +740,12 @@ */ int lsmCheckpointOverflowLoad( lsm_db *pDb, Freelist *pFreelist ){ + assert( 0 ); +#if 0 int rc; int nVal = 0; void *pVal = 0; assert( lsmShmAssertWorker(pDb) ); @@ -809,10 +806,11 @@ lsmFree(pDb->pEnv, pVal); } return rc; +#endif } /* ** Read the checkpoint id from meta-page pPg. */ @@ -1152,27 +1150,21 @@ ShmHeader *pShm = pDb->pShmhdr; void *p = 0; int n = 0; int rc; -#if 0 -if( bFlush ){ - printf("pushing %p tree to %d\n", (void *)pDb, pSnap->iId+1); - fflush(stdout); -} -#endif - assert( lsmFsIntegrityCheck(pDb) ); - rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n); + rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId+1, 1, &p, &n); if( rc!=LSM_OK ) return rc; assert( ckptChecksumOk((u32 *)p) ); assert( n<=LSM_META_PAGE_SIZE ); memcpy(pShm->aSnap2, p, n); lsmShmBarrier(pDb); memcpy(pShm->aSnap1, p, n); lsmFree(pDb->pEnv, p); + assert( lsmFsIntegrityCheck(pDb) ); return LSM_OK; } /* ** This function is used to determine the snapshot-id of the most recently Index: src/lsm_file.c ================================================================== --- src/lsm_file.c +++ src/lsm_file.c @@ -2163,10 +2163,35 @@ aUsed[iBlk-1] = 1; } } } } + +/* +** Context object passed through lsmWalkFreelist() to checkFreelistCb(). +*/ +typedef struct CheckFreelistCtx CheckFreelistCtx; +struct CheckFreelistCtx { + u8 *aUsed; /* aUsed[iBlk-1] is set once block iBlk is seen */ + int nBlock; /* Number of entries in aUsed[] */ +}; + +/* +** lsmWalkFreelist() callback used by lsmFsIntegrityCheck(). Marks free +** block iBlk as accounted for, asserting that it is within range and has +** not already been marked. Always returns 0 so that the walk continues. +*/ +static int checkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){ + CheckFreelistCtx *p = (CheckFreelistCtx *)pCtx; + + assert( iBlk>=1 ); + assert( iBlk<=p->nBlock ); + assert( p->aUsed[iBlk-1]==0 ); + p->aUsed[iBlk-1] = 1; + + return 0; +} /* ** This function checks that all blocks in the database file are accounted
** for. For each block, exactly one of the following must be true: ** @@ -2178,10 +2194,12 @@ ** ** If no errors are found, non-zero is returned. If an error is found, an ** assert() fails. */ int lsmFsIntegrityCheck(lsm_db *pDb){ + CheckFreelistCtx ctx; + int rc; FileSystem *pFS = pDb->pFS; int i; int j; Freelist freelist = {0, 0, 0}; u8 *aUsed; @@ -2190,13 +2207,13 @@ int nBlock = pWorker->nBlock; aUsed = lsmMallocZero(pDb->pEnv, nBlock); if( aUsed==0 ){ /* Malloc has failed. Since this function is only called within debug - ** builds, this probably means the user is running an OOM injection test. - ** Regardless, it will not be possible to run the integrity-check at this - ** time, so assume the database is Ok and return non-zero. */ + ** builds, this probably means the user is running an OOM injection test. + ** Regardless, it will not be possible to run the integrity-check at this + ** time, so assume the database is Ok and return non-zero. */ return 1; } for(pLevel=pWorker->pLevel; pLevel; pLevel=pLevel->pNext){ int i; @@ -2204,31 +2221,23 @@ for(i=0; i<pLevel->nRight; i++){ checkBlocks(pFS, &pLevel->aRhs[i], 0, nBlock, aUsed); } } - if( pWorker->nFreelistOvfl ){ - int rc = lsmCheckpointOverflowLoad(pDb, &freelist); - assert( rc==LSM_OK || rc==LSM_NOMEM ); - if( rc!=LSM_OK ) return 1; - } - - for(j=0; j<2; j++){ - Freelist *pFreelist; - if( j==0 ) pFreelist = &pWorker->freelist; - if( j==1 ) pFreelist = &freelist; - - for(i=0; i<pFreelist->nEntry; i++){ - u32 iBlk = pFreelist->aEntry[i].iBlk; - assert( iBlk<=nBlock ); - assert( aUsed[iBlk-1]==0 ); - aUsed[iBlk-1] = 1; - } - } + /* Mark all blocks in the free-list as used */ + ctx.aUsed = aUsed; + ctx.nBlock = nBlock; + rc = lsmWalkFreelist(pDb, checkFreelistCb, (void *)&ctx); + assert( rc==LSM_OK || rc==LSM_NOMEM ); + if( rc!=LSM_OK ){ + /* As for the malloc() failure above, the check cannot be completed, so + ** assume the database is Ok rather than check a partial aUsed[]. */ + lsmFree(pDb->pEnv, aUsed); + return 1; + } for(i=0; ipEnv, aUsed); lsmFree(pDb->pEnv, freelist.aEntry); return 1; } Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -82,65 +82,57 @@ #else # define assertNotInFreelist(x,y) #endif
/* -** Append an entry to the free-list. +** Add or update free-list entry iBlk (sorted). If (iId==-1), it is a delete. */ -int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){ +static int freelistAppend(lsm_db *db, int iBlk, i64 iId){ + lsm_env *pEnv = db->pEnv; + Freelist *p; + int i; - /* Assert that this is not an attempt to insert a duplicate block number */ -#if 0 - assertNotInFreelist(p, iBlk); -#endif + assert( iId==-1 || iId>=0 ); + p = db->bUseFreelist ? db->pFreelist : &db->pWorker->freelist; /* Extend the space allocated for the freelist, if required */ assert( p->nAlloc>=p->nEntry ); if( p->nAlloc==p->nEntry ){ int nNew; + int nByte; FreelistEntry *aNew; nNew = (p->nAlloc==0 ? 4 : p->nAlloc*2); - aNew = (FreelistEntry *)lsmRealloc(pEnv, p->aEntry, - sizeof(FreelistEntry)*nNew); + nByte = sizeof(FreelistEntry) * nNew; + aNew = (FreelistEntry *)lsmRealloc(pEnv, p->aEntry, nByte); if( !aNew ) return LSM_NOMEM_BKPT; p->nAlloc = nNew; p->aEntry = aNew; } - /* Append the new entry to the freelist */ - p->aEntry[p->nEntry].iBlk = iBlk; - p->aEntry[p->nEntry].iId = iId; - p->nEntry++; + for(i=0; i<p->nEntry; i++){ + assert( i==0 || p->aEntry[i].iBlk > p->aEntry[i-1].iBlk ); + if( p->aEntry[i].iBlk>=iBlk ) break; + } + /* i is now the index of the first entry with a block number >= iBlk */ + if( i<p->nEntry && p->aEntry[i].iBlk==iBlk ){ + /* Clobber an existing entry */ + p->aEntry[i].iId = iId; + }else{ + /* Insert a new entry into the list */ + int nByte = sizeof(FreelistEntry)*(p->nEntry-i); + memmove(&p->aEntry[i+1], &p->aEntry[i], nByte); + p->aEntry[i].iBlk = iBlk; + p->aEntry[i].iId = iId; + p->nEntry++; + } return LSM_OK; } -static int flInsertEntry(lsm_env *pEnv, Freelist *p, int iBlk){ - int rc; - - rc = lsmFreelistAppend(pEnv, p, iBlk, 1); - if( rc==LSM_OK ){ - memmove(&p->aEntry[1], &p->aEntry[0], sizeof(FreelistEntry)*(p->nEntry-1)); - p->aEntry[0].iBlk = iBlk; - p->aEntry[0].iId = 1; - } - return rc; -} - -/* -** Remove the first entry of the free-list.
-*/ -static void flRemoveEntry0(Freelist *p){ - int nNew = p->nEntry - 1; - assert( nNew>=0 ); - memmove(&p->aEntry[0], &p->aEntry[1], sizeof(FreelistEntry) * nNew); - p->nEntry = nNew; -} - -/* -** tHIS Function frees all resources held by the Database structure passed +/* +** This function frees all resources held by the Database structure passed ** as the only argument. */ static void freeDatabase(lsm_env *pEnv, Database *p){ assert( holdingGlobalMutex(pEnv) ); if( p ){ @@ -426,10 +418,137 @@ void lsmDbSnapshotSetLevel(Snapshot *pSnap, Level *pLevel){ pSnap->pLevel = pLevel; } +/* TODO: Shuffle things around to get rid of this */ +static int firstSnapshotInUse(lsm_db *, i64 *); + +/* +** Context object used by the lsmWalkFreelist() utility. +*/ +typedef struct WalkFreelistCtx WalkFreelistCtx; +struct WalkFreelistCtx { + lsm_db *pDb; + Freelist *pFreelist; + int iFree; + int (*xUsr)(void *, int, i64); /* User callback function */ + void *pUsrctx; /* User callback context */ + int bDone; /* Set once xUsr() returns non-zero */ +}; + +/* +** Callback passed to lsmSortedWalkFreelist() by lsmWalkFreelist(). Before +** the LSM entry for block iBlk is passed to the user callback, any unvisited +** in-memory entries with block numbers less than or equal to iBlk are merged +** in. An in-memory entry for iBlk itself overrides the LSM entry (and hides +** it entirely if the in-memory entry is a delete). +*/ +static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){ + WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx; + Freelist *pFree = p->pFreelist; + + while( (p->iFree < pFree->nEntry) ){ + FreelistEntry *pEntry = &pFree->aEntry[p->iFree]; + if( pEntry->iBlk>iBlk ){ + break; + }else{ + p->iFree++; + if( pEntry->iId>=0 + && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) + ){ + p->bDone = 1; + return 1; + } + if( pEntry->iBlk==iBlk ) return 0; + } + } + + p->bDone = (p->xUsr(p->pUsrctx, iBlk, iSnapshot)!=0); + return p->bDone; +} + +/* +** The database handle passed as the first argument must be the worker +** connection. This function iterates through the contents of the current +** free block list, invoking the supplied callback once for each list +** element. +** +** The difference between this function and lsmSortedWalkFreelist() is +** that lsmSortedWalkFreelist() only considers those free-list elements +** stored within the LSM.
This function also merges in any in-memory +** elements. +** +** If the callback returns non-zero, the walk stops early and LSM_OK is +** returned. Otherwise, LSM_OK or the error code returned by +** lsmSortedWalkFreelist() is returned. +*/ +int lsmWalkFreelist( + lsm_db *pDb, /* Database handle (must be worker) */ + int (*x)(void *, int, i64), /* Callback function */ + void *pCtx /* First argument to pass to callback */ +){ + int rc; + WalkFreelistCtx ctx; + ctx.pDb = pDb; + ctx.pFreelist = &pDb->pWorker->freelist; + ctx.iFree = 0; + ctx.xUsr = x; + ctx.pUsrctx = pCtx; + ctx.bDone = 0; + + rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx); + + /* Visit any in-memory entries not consumed by walkFreelistCb(), unless + ** the callback has already asked for the walk to stop. */ + if( rc==LSM_OK && ctx.bDone==0 ){ + int i; + for(i=ctx.iFree; i<ctx.pFreelist->nEntry; i++){ + FreelistEntry *pEntry = &ctx.pFreelist->aEntry[i]; + if( pEntry->iId>=0 && ctx.xUsr(ctx.pUsrctx, pEntry->iBlk, pEntry->iId) ){ + break; + } + } + } + + return rc; +} + +typedef struct FindFreeblockCtx FindFreeblockCtx; +struct FindFreeblockCtx { + i64 iInUse; + int iRet; +}; + +/* +** lsmWalkFreelist() callback used by findFreeblock(). Stops the walk at +** the first block whose snapshot id is less than FindFreeblockCtx.iInUse. +*/ +static int findFreeblockCb(void *pCtx, int iBlk, i64 iSnapshot){ + FindFreeblockCtx *p = (FindFreeblockCtx *)pCtx; + if( iSnapshot<p->iInUse ){ + p->iRet = iBlk; + return 1; + } + return 0; +} + +/* +** Set *piRet to the first block visited by lsmWalkFreelist() whose snapshot +** id is less than iInUse, or to 0 if there is no such block. Return LSM_OK +** or an LSM error code. +*/ +static int findFreeblock(lsm_db *pDb, i64 iInUse, int *piRet){ + int rc; /* Return code */ + FindFreeblockCtx ctx; /* Context object */ + + ctx.iInUse = iInUse; + ctx.iRet = 0; + rc = lsmWalkFreelist(pDb, findFreeblockCb, (void *)&ctx); + *piRet = ctx.iRet; + return rc; +} /* ** Allocate a new database file block to write data to, either by extending ** the database file or by recycling a free-list entry. The worker snapshot ** must be held in order to call this function. @@ -437,60 +532,50 @@ ** If successful, *piBlk is set to the block number allocated and LSM_OK is ** returned.
*/ int lsmBlockAllocate(lsm_db *pDb, int *piBlk){ Snapshot *p = pDb->pWorker; - Freelist *pFree; /* Database free list */ int iRet = 0; /* Block number of allocated block */ int rc = LSM_OK; - - assert( pDb->pWorker ); - - pFree = &p->freelist; - if( pFree->nEntry>0 ){ - /* The first block on the free list was freed as part of the work done - ** to create the snapshot with id iFree. So, we can reuse this block if - ** snapshot iFree or later has been checkpointed and all currently - ** active clients are reading from snapshot iFree or later. */ - i64 iFree = pFree->aEntry[0].iId; - int bInUse = 0; - - /* The "is in use" bit */ - rc = lsmLsmInUse(pDb, iFree, &bInUse); - - /* The "has been checkpointed" bit */ - if( rc==LSM_OK && bInUse==0 ){ - i64 iId = 0; - rc = lsmCheckpointSynced(pDb, &iId, 0, 0); - if( rc!=LSM_OK || iIdaEntry[0].iBlk; - flRemoveEntry0(pFree); - assert( iRet!=0 ); - } -#ifdef LSM_LOG_BLOCKS - lsmLogMessage( - pDb, 0, "%s reusing block %d%s", (iRet==0 ? "not " : ""), - pFree->aEntry[0].iBlk, - bInUse==0 ? "" : bInUse==1 ? " (client)" : " (unsynced)" - ); -#endif - } - - /* If no block was allocated from the free-list, allocate one at the - ** end of the file. */ - if( rc==LSM_OK && iRet==0 ){ - iRet = ++pDb->pWorker->nBlock; -#ifdef LSM_LOG_BLOCKS - lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet); -#endif - } - - *piBlk = iRet; - return LSM_OK; + i64 iInUse = 0; /* Snapshot id still in use */ + + assert( p ); + + /* Set iInUse to the smallest snapshot id that is either: + ** + ** * Currently in use by a database client, + ** * May be used by a database client in the future, or + ** * Is the most recently checkpointed snapshot (i.e. the one that will + ** be used following recovery if a failure occurs at this point). 
+ */ + rc = lsmCheckpointSynced(pDb, &iInUse, 0, 0); + if( rc==LSM_OK && iInUse==0 ) iInUse = p->iId; + if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse); + + /* Query the free block list for a suitable block */ + if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet); + + /* If a block was found in the free block list, use it and remove it from + ** the list. Otherwise, if no suitable block was found, allocate one from + ** the end of the file. */ + if( rc==LSM_OK ){ + if( iRet>0 ){ +#ifdef LSM_LOG_BLOCKS + lsmLogMessage(pDb, 0, "reusing block %d", iRet); +#endif + rc = freelistAppend(pDb, iRet, -1); + }else{ + iRet = ++(p->nBlock); +#ifdef LSM_LOG_BLOCKS + lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet); +#endif + } + } + + assert( iRet>0 || rc!=LSM_OK ); + *piBlk = (rc==LSM_OK ? iRet : 0); + return rc; } /* ** Free a database block. The worker snapshot must be held in order to call ** this function. @@ -498,18 +583,17 @@ ** If successful, LSM_OK is returned. Otherwise, an lsm error code (e.g. ** LSM_NOMEM). */ int lsmBlockFree(lsm_db *pDb, int iBlk){ Snapshot *p = pDb->pWorker; - assert( lsmShmAssertWorker(pDb) ); - /* TODO: Should assert() that lsmCheckpointOverflow() has not been called */ + #ifdef LSM_LOG_FREELIST lsmLogMessage(pDb, LSM_OK, "lsmBlockFree(): Free block %d", iBlk); #endif - return lsmFreelistAppend(pDb->pEnv, &p->freelist, iBlk, p->iId); + return freelistAppend(pDb, iBlk, p->iId); } /* ** Refree a database block. The worker snapshot must be held in order to call ** this function. @@ -525,11 +609,11 @@ Snapshot *p = pDb->pWorker; if( iBlk==p->nBlock ){ p->nBlock--; }else{ - rc = flInsertEntry(pDb->pEnv, &p->freelist, iBlk); + rc = freelistAppend(pDb, iBlk, 0); } return rc; } @@ -960,10 +1044,48 @@ return LSM_OK; } *pbInUse = 0; return rc; } + +/* +** This function is called by worker connections to determine the smallest +** snapshot id that is currently in use by a database client.
The worker +** connection uses this result to determine whether or not it is safe to +** recycle a database block. +*/ +static int firstSnapshotInUse( + lsm_db *db, /* Database handle */ + i64 *piInUse /* IN/OUT: Smallest snapshot id in use */ +){ + ShmHeader *pShm = db->pShmhdr; + i64 iInUse = *piInUse; + int i; + + assert( iInUse>0 ); + for(i=0; i<LSM_LOCK_NREADER; i++){ + ShmReader *p = &pShm->aReader[i]; + if( p->iLsmId ){ + i64 iThis = p->iLsmId; + if( iThis!=0 && iInUse>iThis ){ + int rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0); + if( rc==LSM_OK ){ + p->iLsmId = 0; + lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0); + }else if( rc==LSM_BUSY ){ + iInUse = iThis; + }else{ + /* Some error other than LSM_BUSY. Return the error code to + ** the caller in this case. */ + return rc; + } + } + } + } + *piInUse = iInUse; + return LSM_OK; +} int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){ if( db->treehdr.iUsedShmid==iShmid ){ *pbInUse = 1; return LSM_OK; Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -194,10 +194,11 @@ Blob key; /* Cache of current key (or NULL) */ Blob val; /* Cache of current value */ /* All the component cursors: */ TreeCursor *apTreeCsr[2]; /* Up to two tree cursors */ + int iFree; /* Number of in-memory free-list entries visited */ SegmentPtr *aPtr; /* Array of segment pointers */ int nPtr; /* Size of array aPtr[] */ BtreeCursor *pBtCsr; /* b-tree cursor (db writes only) */ /* Comparison results */ @@ -210,29 +211,26 @@ /* Used by worker cursors only */ Pgno *pPrevMergePtr; }; -#define CURSOR_DATA_TREE0 0 /* Current tree cursor */ -#define CURSOR_DATA_TREE1 1 /* The "old" tree, if any */ -#define CURSOR_DATA_SYSTEM 2 +/* +** The following constants are used to assign integers to each component +** cursor of a multi-cursor.
+*/ +#define CURSOR_DATA_TREE0 0 /* Current tree cursor (apTreeCsr[0]) */ +#define CURSOR_DATA_TREE1 1 /* The "old" tree, if any (apTreeCsr[1]) */ +#define CURSOR_DATA_SYSTEM 2 /* Free-list entries (new-toplevel only) */ #define CURSOR_DATA_SEGMENT 3 /* ** CURSOR_IGNORE_DELETE ** If set, this cursor will not visit SORTED_DELETE keys. ** -** CURSOR_NEW_SYSTEM -** If set, then after all user data from the in-memory tree and any other -** cursors has been visited, the cursor visits the live (uncommitted) -** versions of the two system keys: FREELIST AND LEVELS. This is used when -** flushing the in-memory tree to disk - the new free-list and levels record -** are flushed along with it. -** -** CURSOR_AT_FREELIST -** This flag is set when sub-cursor CURSOR_DATA_SYSTEM is actually -** pointing at a free list. +** CURSOR_FLUSH_FREELIST +** This cursor is being used to create a new toplevel. It should also +** iterate through the contents of the in-memory free block list. ** ** CURSOR_IGNORE_SYSTEM ** If set, this cursor ignores system keys. ** ** CURSOR_NEXT_OK @@ -249,12 +247,11 @@ ** Cursor has undergone a successful lsm_csr_seek(LSM_SEEK_EQ) operation. ** The key and value are stored in MultiCursor.key and MultiCursor.val ** respectively. 
*/ #define CURSOR_IGNORE_DELETE 0x00000001 -#define CURSOR_NEW_SYSTEM 0x00000002 -#define CURSOR_AT_FREELIST 0x00000004 +#define CURSOR_FLUSH_FREELIST 0x00000002 #define CURSOR_IGNORE_SYSTEM 0x00000010 #define CURSOR_NEXT_OK 0x00000020 #define CURSOR_PREV_OK 0x00000040 #define CURSOR_READ_SEPARATORS 0x00000080 #define CURSOR_SEEK_EQ 0x00000100 @@ -1909,17 +1906,29 @@ lsmTreeCursorValue(pTreeCsr, &pVal, &nVal); } break; } - case CURSOR_DATA_SYSTEM: - if( pCsr->flags & CURSOR_AT_FREELIST ){ - pKey = (void *)"FREELIST"; - nKey = 8; - eType = LSM_SYSTEMKEY | LSM_INSERT; + case CURSOR_DATA_SYSTEM: { + Snapshot *pWorker = pCsr->pDb->pWorker; + if( (pCsr->flags & CURSOR_FLUSH_FREELIST) + && pWorker && pWorker->freelist.nEntry > pCsr->iFree + ){ + int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1; + FreelistEntry *pEntry = &pWorker->freelist.aEntry[iEntry]; + u32 i = ~((u32)(pEntry->iBlk)); + lsmPutU32(pCsr->pSystemVal, i); + pKey = pCsr->pSystemVal; + nKey = 4; + if( pEntry->iId>=0 ){ + eType = LSM_SYSTEMKEY | LSM_INSERT; + }else{ + eType = LSM_SYSTEMKEY | LSM_POINT_DELETE; + } } break; + } default: { int iPtr = iKey - CURSOR_DATA_SEGMENT; assert( iPtr>=0 ); if( iPtr==pCsr->nPtr ){ @@ -2252,14 +2261,18 @@ /* ** If the free-block list is not empty, then have this cursor visit a key ** with (a) the system bit set, and (b) the key "FREELIST" and (c) a value ** blob containing the serialized free-block list. */ -static void multiCursorVisitFreelist(MultiCursor *pCsr, int *pnOvfl){ +static int multiCursorVisitFreelist(MultiCursor *pCsr, int *pnOvfl){ + int rc = LSM_OK; + assert( pCsr ); pCsr->pnOvfl = pnOvfl; - pCsr->flags |= CURSOR_NEW_SYSTEM; + pCsr->flags |= CURSOR_FLUSH_FREELIST; + pCsr->pSystemVal = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc); + return rc; } /* ** Allocate and return a new database cursor. 
*/ @@ -2304,18 +2317,22 @@ *pnVal = 0; } break; } - case CURSOR_DATA_SYSTEM: - if( pCsr->flags & CURSOR_AT_FREELIST ){ - void *aVal; - rc = lsmCheckpointOverflow(pCsr->pDb, &aVal, pnVal, pCsr->pnOvfl); - assert( pCsr->pSystemVal==0 ); - *ppVal = pCsr->pSystemVal = aVal; + case CURSOR_DATA_SYSTEM: { + Snapshot *pWorker = pCsr->pDb->pWorker; + if( pWorker && pWorker->freelist.nEntry > pCsr->iFree ){ + int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1; + u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4]; + lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId); + *ppVal = aVal; + *pnVal = 8; + pCsr->pDb->bUseFreelist = 1; } break; + } default: { int iPtr = iVal-CURSOR_DATA_SEGMENT; if( iPtrnPtr ){ SegmentPtr *pPtr = &pCsr->aPtr[iPtr]; @@ -2326,10 +2343,60 @@ } } } assert( rc==LSM_OK || (*ppVal==0 && *pnVal==0) ); + return rc; +} + +/* +** This function is called by worker connections to walk the part of the +** free-list stored within the LSM data structure. +*/ +int lsmSortedWalkFreelist( + lsm_db *pDb, /* Database handle */ + int (*x)(void *, int, i64), /* Callback function */ + void *pCtx /* First argument to pass to callback */ +){ + MultiCursor *pCsr; /* Cursor used to read db */ + int rc = LSM_OK; /* Return Code */ + Snapshot *pSnap = 0; + + assert( pDb->pWorker ); + rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap); + if( rc!=LSM_OK ) return rc; + + pCsr = multiCursorNew(pDb, &rc); + if( pCsr ){ + rc = multiCursorAddAll(pCsr, pSnap); + pCsr->flags |= CURSOR_IGNORE_DELETE; + } + + if( rc==LSM_OK ){ + rc = lsmMCursorLast(pCsr); + while( rc==LSM_OK && lsmMCursorValid(pCsr) && rtIsSystem(pCsr->eType) ){ + void *pKey; int nKey; + void *pVal; int nVal; + + rc = lsmMCursorKey(pCsr, &pKey, &nKey); + if( rc==LSM_OK ) rc = lsmMCursorValue(pCsr, &pVal, &nVal); + if( rc==LSM_OK && (nKey!=4 || nVal!=8) ) rc = LSM_CORRUPT_BKPT; + + if( rc==LSM_OK ){ + int iBlk; + i64 iSnap; + iBlk = (int)(~(lsmGetU32((u8 *)pKey))); + iSnap = (i64)lsmGetU64((u8 *)pVal); + if( 
x(pCtx, iBlk, iSnap) ) break; + rc = lsmMCursorPrev(pCsr); + } + } + } + + lsmMCursorClose(pCsr); + lsmFreeSnapshot(pDb->pEnv, pSnap); + return rc; } int lsmSortedLoadFreelist( lsm_db *pDb, /* Database handle (must be worker) */ @@ -2448,14 +2515,11 @@ } if( rc==LSM_OK && pCsr->apTreeCsr[1] ){ rc = lsmTreeCursorEnd(pCsr->apTreeCsr[1], bLast); } - if( pCsr->flags & CURSOR_NEW_SYSTEM ){ - assert( bLast==0 ); - pCsr->flags |= CURSOR_AT_FREELIST; - } + pCsr->iFree = 0; for(i=0; rc==LSM_OK && inPtr; i++){ SegmentPtr *pPtr = &pCsr->aPtr[i]; Level *pLvl = pPtr->pLevel; @@ -2624,12 +2688,11 @@ Pgno iPgno = 0; /* FC pointer value */ if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE; assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE ); - assert( (pCsr->flags & CURSOR_NEW_SYSTEM)==0 ); - assert( (pCsr->flags & CURSOR_AT_FREELIST)==0 ); + assert( (pCsr->flags & CURSOR_FLUSH_FREELIST)==0 ); assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel ); pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ); rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[0], pKey, nKey, eESeek, &bStop); if( rc==LSM_OK && bStop==0 ){ @@ -2759,14 +2822,13 @@ rc = lsmTreeCursorPrev(pTreeCsr); }else{ rc = lsmTreeCursorNext(pTreeCsr); } }else if( iKey==CURSOR_DATA_SYSTEM ){ - assert( pCsr->flags & CURSOR_AT_FREELIST ); - assert( pCsr->flags & CURSOR_NEW_SYSTEM ); + assert( pCsr->flags & CURSOR_FLUSH_FREELIST ); assert( bReverse==0 ); - pCsr->flags &= ~CURSOR_AT_FREELIST; + pCsr->iFree++; }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){ assert( bReverse==0 && pCsr->pBtCsr ); rc = btreeCursorNext(pCsr->pBtCsr); }else{ rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse); @@ -3825,13 +3887,17 @@ MultiCursor *pCsr = 0; Level *pNext = 0; /* The current top level */ Level *pNew; /* The new level itself */ Segment *pDel = 0; /* Delete separators from this segment */ int nWrite = 0; /* Number of database pages written */ - + Freelist freelist; assert( pnOvfl ); + 
pDb->pFreelist = &freelist; + assert( pDb->bUseFreelist==0 ); + memset(&freelist, 0, sizeof(freelist)); + /* Allocate the new level structure to write to. */ pNext = lsmDbSnapshotLevel(pDb->pWorker); pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc); if( pNew ){ pNew->pNext = pNext; @@ -3842,12 +3908,14 @@ ** segment contains everything in the tree and pointers to the next segment ** in the database (if any). */ pCsr = multiCursorNew(pDb, &rc); if( pCsr ){ pCsr->pDb = pDb; - multiCursorVisitFreelist(pCsr, pnOvfl); - rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree); + rc = multiCursorVisitFreelist(pCsr, pnOvfl); + if( rc==LSM_OK ){ + rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree); + } if( rc==LSM_OK && pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){ pDel = &pNext->lhs; rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr); } @@ -3903,16 +3971,28 @@ #if 0 lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel"); #endif if( rc==LSM_OK ){ + if( freelist.nEntry ){ + Freelist *p = &pDb->pWorker->freelist; + lsmFree(pDb->pEnv, p->aEntry); + memcpy(p, &freelist, sizeof(freelist)); + freelist.aEntry = 0; + }else{ + pDb->pWorker->freelist.nEntry = 0; + } + assertBtreeOk(pDb, &pNew->lhs); sortedInvokeWorkHook(pDb); } if( pnWrite ) *pnWrite = nWrite; pDb->pWorker->nWrite += nWrite; + pDb->pFreelist = 0; + pDb->bUseFreelist = 0; + lsmFree(pDb->pEnv, freelist.aEntry); return rc; } /* ** The nMerge levels in the LSM beginning with pLevel consist of a @@ -4369,11 +4449,10 @@ int nOvfl = 0; int bFlush = 0; int nMax = nPage; /* Maximum pages to write to disk */ int nRem = nPage; int bCkpt = 0; - int bToplevel = 0; /* Open the worker 'transaction'. It will be closed before this function ** returns. 
*/ assert( pDb->pWorker==0 ); rc = lsmBeginWork(pDb); @@ -4405,21 +4484,19 @@ if( sortedDbIsFull(pDb) ){ int nPg = 0; rc = sortedWork(pDb, nRem, 0, 1, &nPg); nRem -= nPg; assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) ); - bToplevel = 1; } if( rc==LSM_OK && nRem>0 ){ int nPg = 0; rc = sortedNewToplevel(pDb, TREE_OLD, &nOvfl, &nPg); nRem -= nPg; if( rc==LSM_OK && pDb->nTransOpen>0 ){ lsmTreeDiscardOld(pDb); } bFlush = 1; - bToplevel = 0; } } /* If nPage is still greater than zero, do some merging. */ if( rc==LSM_OK && nRem>0 && bShutdown==0 ){ @@ -4426,23 +4503,27 @@ int nPg = 0; int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0); rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg); nRem -= nPg; if( nPg ){ - bToplevel = 1; nOvfl = 0; } } - if( rc==LSM_OK && bToplevel && lsmCheckpointOverflowRequired(pDb) ){ + /* If the in-memory part of the free-list is too large, write a new + ** top-level containing just the in-memory free-list entries to disk. + */ + if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > LSM_MAX_FREELIST_ENTRIES ){ + int nPg = 0; while( rc==LSM_OK && sortedDbIsFull(pDb) ){ - int nPg = 0; rc = sortedWork(pDb, 16, 0, 1, &nPg); + nRem -= nPg; } - if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){ - rc = sortedNewToplevel(pDb, TREE_NONE, &nOvfl, 0); + if( rc==LSM_OK ){ + rc = sortedNewToplevel(pDb, TREE_NONE, &nOvfl, &nPg); } + nRem -= nPg; } if( rc==LSM_OK && (nRem!=nMax) ){ lsmFinishWork(pDb, bFlush, nOvfl, &rc); }else{