Index: lsm-test/lsmtest_tdb3.c ================================================================== --- lsm-test/lsmtest_tdb3.c +++ lsm-test/lsmtest_tdb3.c @@ -896,11 +896,11 @@ int bClear, TestDb **ppDb ){ const char *zCfg = "page_size=256 block_size=65536 write_buffer=16384 " - "max_freelist=4 autocheckpoint=32768 " + "max_freelist=2 autocheckpoint=32768 " "mmap=0 " ; return testLsmOpen(zCfg, zFilename, bClear, ppDb); } @@ -909,11 +909,11 @@ int bClear, TestDb **ppDb ){ const char *zCfg = "page_size=256 block_size=65536 write_buffer=16384 " - "max_freelist=4 autocheckpoint=32768 compression=1" + "max_freelist=2 autocheckpoint=32768 compression=1" "mmap=0 " ; return testLsmOpen(zCfg, zFilename, bClear, ppDb); } Index: src/lsmInt.h ================================================================== --- src/lsmInt.h +++ src/lsmInt.h @@ -60,21 +60,15 @@ ** spaces. The following macro is used to test for this. */ #define LSM_IS_64_BIT (sizeof(void*)==8) #define LSM_AUTOWORK_QUANT 32 -/* Minimum number of free-list entries to store in the checkpoint, assuming -** the free-list contains this many entries. i.e. if overflow is required, -** the first LSM_CKPT_MIN_FREELIST entries are stored in the checkpoint and -** the remainder in an LSM system entry. 
*/ -#define LSM_CKPT_MIN_FREELIST 6 -#define LSM_CKPT_MAX_REFREE 2 -#define LSM_CKPT_MIN_NONLSM (LSM_CKPT_MIN_FREELIST - LSM_CKPT_MAX_REFREE) - typedef struct Database Database; typedef struct DbLog DbLog; typedef struct FileSystem FileSystem; +typedef struct Freelist Freelist; +typedef struct FreelistEntry FreelistEntry; typedef struct Level Level; typedef struct LogMark LogMark; typedef struct LogRegion LogRegion; typedef struct LogWriter LogWriter; typedef struct LsmString LsmString; @@ -320,10 +314,12 @@ TransMark *aTrans; /* Array of marks for transaction rollback */ IntArray rollback; /* List of tree-nodes to roll back */ /* Worker context */ Snapshot *pWorker; /* Worker snapshot (or NULL) */ + Freelist *pFreelist; /* See sortedNewToplevel() */ + int bUseFreelist; /* True to use pFreelist */ /* Debugging message callback */ void (*xLog)(void *, int, const char *); void *pLogCtx; @@ -448,18 +444,31 @@ /* Return true if shm-sequence "a" is larger than or equal to "b" */ #define shm_sequence_ge(a, b) (((u32)a-(u32)b) < (1<<30)) #define LSM_APPLIST_SZ 4 -typedef struct Freelist Freelist; -typedef struct FreelistEntry FreelistEntry; - /* -** An instance of the following structure stores the current database free -** block list. The free list is a list of blocks that are not currently -** used by the worker snapshot. Assocated with each block in the list is the -** snapshot id of the most recent snapshot that did actually use the block. +** An instance of the following structure stores the in-memory part of +** the current free block list. This structure is to the free block list +** as the in-memory tree is to the user's database content. The contents +** of the free block list are found by merging the in-memory components +** with those stored in the LSM, just as the contents of the database are +** found by merging the in-memory tree with the user data entries in the +** LSM. 
+** +** Each FreelistEntry structure in the array represents either an insert +** or delete operation on the free-list. For deletes, the FreelistEntry.iId +** field is set to -1. For inserts, it is set to zero or greater. +** +** The array of FreelistEntry structures is always sorted in order of +** block number (ascending). +** +** When the in-memory free block list is written into the LSM, each insert +** operation is written separately. The entry key is the bitwise inverse +** of the block number as a 32-bit big-endian integer. This is done so that +** the entries in the LSM are sorted in descending order of block id. +** The associated value is the snapshot id, formatted as a varint. */ struct Freelist { FreelistEntry *aEntry; /* Free list entries */ int nEntry; /* Number of valid slots in aEntry[] */ int nAlloc; /* Allocated size of aEntry[] */ @@ -482,11 +491,10 @@ /* Used by worker snapshots only */ int nBlock; /* Number of blocks in database file */ Pgno aiAppend[LSM_APPLIST_SZ]; /* Append point list */ Freelist freelist; /* Free block list */ - int nFreelistOvfl; /* Number of extra free-list entries in LSM */ u32 nWrite; /* Total number of pages written to disk */ }; #define LSM_INITIAL_SNAPSHOT_ID 11 /* @@ -494,14 +502,10 @@ */ int lsmCheckpointWrite(lsm_db *, u32 *); int lsmCheckpointLevels(lsm_db *, int, void **, int *); int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal); -int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *); -int lsmCheckpointOverflowRequired(lsm_db *pDb); -int lsmCheckpointOverflowLoad(lsm_db *pDb, Freelist *); - int lsmCheckpointRecover(lsm_db *); int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **); int lsmCheckpointLoadWorker(lsm_db *pDb); int lsmCheckpointStore(lsm_db *pDb, int); @@ -516,11 +520,11 @@ int lsmCheckpointPgsz(u32 *); int lsmCheckpointBlksz(u32 *); void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog); void lsmCheckpointZeroLogoffset(lsm_db *); -int lsmCheckpointSaveWorker(lsm_db *pDb, 
int, int); +int lsmCheckpointSaveWorker(lsm_db *pDb, int); int lsmDatabaseFull(lsm_db *pDb); int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog, u32 *pnWrite); /* @@ -690,10 +694,12 @@ */ int lsmInfoPageDump(lsm_db *, Pgno, int, char **); void lsmSortedCleanup(lsm_db *); int lsmSortedAutoWork(lsm_db *, int nUnit); +int lsmSortedWalkFreelist(lsm_db *, int (*)(void *, int, i64), void *); + int lsmFlushTreeToDisk(lsm_db *pDb); void lsmSortedRemap(lsm_db *pDb); void lsmSortedFreeLevel(lsm_env *pEnv, Level *); @@ -771,11 +777,11 @@ int lsmBeginReadTrans(lsm_db *); int lsmBeginWriteTrans(lsm_db *); int lsmBeginFlush(lsm_db *); int lsmBeginWork(lsm_db *); -void lsmFinishWork(lsm_db *, int, int, int *); +void lsmFinishWork(lsm_db *, int, int *); int lsmFinishRecovery(lsm_db *); void lsmFinishReadTrans(lsm_db *); int lsmFinishWriteTrans(lsm_db *, int); int lsmFinishFlush(lsm_db *, int); Index: src/lsm_ckpt.c ================================================================== --- src/lsm_ckpt.c +++ src/lsm_ckpt.c @@ -165,11 +165,11 @@ static const int one = 1; #define LSM_LITTLE_ENDIAN (*(u8 *)(&one)) /* Sizes, in integers, of various parts of the checkpoint. */ -#define CKPT_HDR_SIZE 9 +#define CKPT_HDR_SIZE 8 #define CKPT_LOGPTR_SIZE 4 #define CKPT_APPENDLIST_SIZE (LSM_APPLIST_SZ * 2) /* A #define to describe each integer in the checkpoint header. */ #define CKPT_HDR_ID_MSW 0 @@ -177,17 +177,16 @@ #define CKPT_HDR_NCKPT 2 #define CKPT_HDR_NBLOCK 3 #define CKPT_HDR_BLKSZ 4 #define CKPT_HDR_NLEVEL 5 #define CKPT_HDR_PGSZ 6 -#define CKPT_HDR_OVFL 7 -#define CKPT_HDR_NWRITE 8 +#define CKPT_HDR_NWRITE 7 -#define CKPT_HDR_LO_MSW 9 -#define CKPT_HDR_LO_LSW 10 -#define CKPT_HDR_LO_CKSUM1 11 -#define CKPT_HDR_LO_CKSUM2 12 +#define CKPT_HDR_LO_MSW 8 +#define CKPT_HDR_LO_LSW 9 +#define CKPT_HDR_LO_CKSUM1 10 +#define CKPT_HDR_LO_CKSUM2 11 typedef struct CkptBuffer CkptBuffer; /* ** Dynamic buffer used to accumulate data for a checkpoint. 
@@ -381,11 +380,10 @@ } }; static int ckptExportSnapshot( lsm_db *pDb, /* Connection handle */ - int nOvfl, /* Number of free-list entries in LSM */ int bLog, /* True to update log-offset fields */ i64 iId, /* Checkpoint id */ int bCksum, /* If true, include checksums */ void **ppCkpt, /* OUT: Buffer containing checkpoint */ int *pnCkpt /* OUT: Size of checkpoint in bytes */ @@ -397,19 +395,10 @@ int iLevel; /* Used to count out nLevel levels */ int iOut = 0; /* Current offset in aCkpt[] */ Level *pLevel; /* Level iterator */ int i; /* Iterator used while serializing freelist */ CkptBuffer ckpt; - int nFree; - - nFree = pSnap->freelist.nEntry; - if( nOvfl>=0 ){ - nFree -= nOvfl; - }else{ - assert( 0 ); - nOvfl = pDb->pShmhdr->aSnap2[CKPT_HDR_OVFL]; - } /* Initialize the output buffer */ memset(&ckpt, 0, sizeof(CkptBuffer)); ckpt.pEnv = pDb->pEnv; iOut = CKPT_HDR_SIZE; @@ -430,10 +419,11 @@ iLevel++; } /* Write the freelist */ if( rc==LSM_OK ){ + int nFree = pSnap->freelist.nEntry; ckptSetValue(&ckpt, iOut++, nFree, &rc); for(i=0; ifreelist.aEntry[i]; ckptSetValue(&ckpt, iOut++, p->iBlk, &rc); ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc); @@ -448,11 +438,10 @@ ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc); ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc); ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc); ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc); ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc); - ckptSetValue(&ckpt, CKPT_HDR_OVFL, (nOvfl?nOvfl:pSnap->nFreelistOvfl), &rc); ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc); if( bCksum ){ ckptAddChecksum(&ckpt, iOut, &rc); }else{ @@ -462,11 +451,11 @@ iOut += 2; assert( iOut<=1024 ); #ifdef LSM_LOG_FREELIST lsmLogMessage(pDb, rc, - "ckptExportSnapshot(): id=%d freelist: %d/%d", (int)iId, nFree, nOvfl + "ckptExportSnapshot(): id=%lld freelist: %d", iId, pSnap->freelist.nEntry ); #endif *ppCkpt = (void *)ckpt.aCkpt; if( pnCkpt ) *pnCkpt = 
sizeof(u32)*iOut; @@ -663,155 +652,10 @@ }else{ *pnVal = 0; *paVal = 0; } - return rc; -} - -/* -** The worker lock must be held to call this function. -** -** The function serializes and returns the data that should be stored as -** the FREELIST system record. -*/ -int lsmCheckpointOverflow( - lsm_db *pDb, /* Database handle (must hold worker lock) */ - void **ppVal, /* OUT: lsmMalloc'd buffer */ - int *pnVal, /* OUT: Size of *ppVal in bytes */ - int *pnOvfl /* OUT: Number of freelist entries in buf */ -){ - int rc = LSM_OK; - int nRet; - Snapshot *p = pDb->pWorker; - - assert( lsmShmAssertWorker(pDb) ); - assert( pnOvfl && ppVal && pnVal ); - assert( pDb->nMaxFreelist>=2 && pDb->nMaxFreelist<=LSM_MAX_FREELIST_ENTRIES ); - - if( p->nFreelistOvfl ){ - rc = lsmCheckpointOverflowLoad(pDb, &p->freelist); - if( rc!=LSM_OK ) return rc; - p->nFreelistOvfl = 0; - } - - if( p->freelist.nEntry<=pDb->nMaxFreelist ){ - nRet = 0; - *pnVal = 0; - *ppVal = 0; - }else{ - int i; /* Iterator variable */ - int iOut = 0; /* Current size of blob in ckpt */ - CkptBuffer ckpt; /* Used to build FREELIST blob */ - - nRet = (p->freelist.nEntry - pDb->nMaxFreelist); - - memset(&ckpt, 0, sizeof(CkptBuffer)); - ckpt.pEnv = pDb->pEnv; - for(i=p->freelist.nEntry-nRet; rc==LSM_OK && ifreelist.nEntry; i++){ - FreelistEntry *pEntry = &p->freelist.aEntry[i]; - ckptSetValue(&ckpt, iOut++, pEntry->iBlk, &rc); - ckptSetValue(&ckpt, iOut++, (pEntry->iId >> 32) & 0xFFFFFFFF, &rc); - ckptSetValue(&ckpt, iOut++, pEntry->iId & 0xFFFFFFFF, &rc); - } - ckptChangeEndianness(ckpt.aCkpt, iOut); - - *ppVal = ckpt.aCkpt; - *pnVal = iOut*sizeof(u32); - } - - *pnOvfl = nRet; - return rc; -} - -/* -** The connection must be the worker in order to call this function. -** -** True is returned if there are currently too many free-list entries -** in-memory to store in a checkpoint. 
Before calling CheckpointSaveWorker() -** to save the current worker snapshot, a new top-level LSM segment must -** be created so that some of them can be written to the LSM. -*/ -int lsmCheckpointOverflowRequired(lsm_db *pDb){ - Snapshot *p = pDb->pWorker; - assert( lsmShmAssertWorker(pDb) ); - return (p->freelist.nEntry > pDb->nMaxFreelist || p->nFreelistOvfl>0); -} - -/* -** Connection pDb must be the worker to call this function. -** -** Load the FREELIST record from the database. Decode it and append the -** results to list pFreelist. -*/ -int lsmCheckpointOverflowLoad( - lsm_db *pDb, - Freelist *pFreelist -){ - int rc; - int nVal = 0; - void *pVal = 0; - assert( lsmShmAssertWorker(pDb) ); - - /* Load the blob of data from the LSM. If that is successful (and the - ** blob is greater than zero bytes in size), decode the contents and - ** merge them into the current contents of *pFreelist. */ - rc = lsmSortedLoadFreelist(pDb, &pVal, &nVal); - if( pVal ){ - u32 *aFree = (u32 *)pVal; - int nFree = nVal / sizeof(int); - ckptChangeEndianness(aFree, nFree); - if( (nFree % 3) ){ - rc = LSM_CORRUPT_BKPT; - }else{ - int iNew = 0; /* Offset of next element in aFree[] */ - int iOld = 0; /* Next element in freelist fl */ - Freelist fl = *pFreelist; /* Original contents of *pFreelist */ - - memset(pFreelist, 0, sizeof(Freelist)); - while( rc==LSM_OK && (iNew=fl.nEntry ){ - iBlk = aFree[iNew]; - iId = ((i64)(aFree[iNew+1])<<32) + (i64)aFree[iNew+2]; - iNew += 3; - }else if( iNew>=nFree ){ - iBlk = fl.aEntry[iOld].iBlk; - iId = fl.aEntry[iOld].iId; - iOld += 1; - }else{ - iId = ((i64)(aFree[iNew+1])<<32) + (i64)aFree[iNew+2]; - if( iIdpEnv, pFreelist, iBlk, iId); - } - lsmFree(pDb->pEnv, fl.aEntry); - -#ifdef LSM_DEBUG - if( rc==LSM_OK ){ - int i; - for(i=1; rc==LSM_OK && inEntry; i++){ - assert( pFreelist->aEntry[i].iId >= pFreelist->aEntry[i-1].iId ); - } - assert( pFreelist->nEntry==(fl.nEntry + nFree/3) ); - } -#endif - } - - lsmFree(pDb->pEnv, pVal); - } - return rc; } 
/* ** Read the checkpoint id from meta-page pPg. @@ -1080,11 +924,10 @@ pNew->aiAppend[i] = ckptRead64(a); } /* Copy the free-list */ if( bInclFreelist ){ - pNew->nFreelistOvfl = aCkpt[CKPT_HDR_OVFL]; nFree = aCkpt[iIn++]; if( nFree ){ pNew->freelist.aEntry = (FreelistEntry *)lsmMallocZeroRc( pDb->pEnv, sizeof(FreelistEntry)*nFree, &rc ); @@ -1145,34 +988,28 @@ ** the new snapshot produced by the work performed by pDb. ** ** If successful, LSM_OK is returned. Otherwise, if an error occurs, an LSM ** error code is returned. */ -int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){ +int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush){ Snapshot *pSnap = pDb->pWorker; ShmHeader *pShm = pDb->pShmhdr; void *p = 0; int n = 0; int rc; -#if 0 -if( bFlush ){ - printf("pushing %p tree to %d\n", (void *)pDb, pSnap->iId+1); - fflush(stdout); -} -#endif - assert( lsmFsIntegrityCheck(pDb) ); - rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n); + rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId+1, 1, &p, &n); if( rc!=LSM_OK ) return rc; assert( ckptChecksumOk((u32 *)p) ); assert( n<=LSM_META_PAGE_SIZE ); memcpy(pShm->aSnap2, p, n); lsmShmBarrier(pDb); memcpy(pShm->aSnap1, p, n); lsmFree(pDb->pEnv, p); + assert( lsmFsIntegrityCheck(pDb) ); return LSM_OK; } /* ** This function is used to determine the snapshot-id of the most recently Index: src/lsm_file.c ================================================================== --- src/lsm_file.c +++ src/lsm_file.c @@ -187,11 +187,12 @@ int szSector; /* Database file sector size */ /* If this is a compressed database, a pointer to the compression methods. ** For an uncompressed database, a NULL pointer. 
*/ lsm_compress *pCompress; - u8 *aBuffer; /* Buffer to compress into */ + u8 *aIBuffer; /* Buffer to uncompress from */ + u8 *aOBuffer; /* Buffer to compress to */ int nBuffer; /* Allocated size of aBuffer[] in bytes */ /* mmap() mode things */ int bUseMmap; /* True to use mmap() to access db file */ void *pMap; /* Current mapping of database file */ @@ -556,11 +557,12 @@ if( pFS->fdDb ) lsmEnvClose(pFS->pEnv, pFS->fdDb ); if( pFS->fdLog ) lsmEnvClose(pFS->pEnv, pFS->fdLog ); lsmFree(pEnv, pFS->pLsmFile); lsmFree(pEnv, pFS->apHash); - lsmFree(pEnv, pFS->aBuffer); + lsmFree(pEnv, pFS->aIBuffer); + lsmFree(pEnv, pFS->aOBuffer); lsmFree(pEnv, pFS); } } void lsmFsDeferClose(FileSystem *pFS, LsmFile **pp){ @@ -1013,20 +1015,31 @@ rc = fsBlockNext(pFS, fsPageToBlock(pFS, iOff), &iBlk); *piRes = fsFirstPageOnBlock(pFS, iBlk) + iAdd - (iEob - iOff + 1); return rc; } -static int fsAllocateBuffer(FileSystem *pFS){ +static int fsAllocateBuffer(FileSystem *pFS, int bWrite){ + u8 **pp; /* Pointer to either aIBuffer or aOBuffer */ + assert( pFS->pCompress ); - if( pFS->aBuffer==0 ){ + + /* If neither buffer has been allocated, figure out how large they + ** should be. Store this value in FileSystem.nBuffer. */ + if( pFS->nBuffer==0 ){ + assert( pFS->aIBuffer==0 && pFS->aOBuffer==0 ); pFS->nBuffer = pFS->pCompress->xBound(pFS->pCompress->pCtx, pFS->nPagesize); if( pFS->nBuffer<(pFS->szSector+6) ){ pFS->nBuffer = pFS->szSector+6; } - pFS->aBuffer = lsmMalloc(pFS->pEnv, LSM_MAX(pFS->nBuffer, pFS->nPagesize)); - if( pFS->aBuffer==0 ) return LSM_NOMEM_BKPT; + } + + pp = (bWrite ? &pFS->aOBuffer : &pFS->aIBuffer); + if( *pp==0 ){ + *pp = lsmMalloc(pFS->pEnv, LSM_MAX(pFS->nBuffer, pFS->nPagesize)); + if( *pp==0 ) return LSM_NOMEM_BKPT; } + return LSM_OK; } /* ** This function is only called in compressed database mode. 
It reads and @@ -1045,11 +1058,11 @@ u8 aSz[3]; int rc; assert( p && pPg->nCompress==0 ); - if( fsAllocateBuffer(pFS) ) return LSM_NOMEM; + if( fsAllocateBuffer(pFS, 0) ) return LSM_NOMEM; rc = fsReadData(pFS, iOff, aSz, sizeof(aSz)); if( rc==LSM_OK ){ int bFree; @@ -1069,18 +1082,18 @@ rc = fsAddOffset(pFS, iOff, 3, &iOff); if( rc==LSM_OK ){ if( pPg->nCompress>pFS->nBuffer ){ rc = LSM_CORRUPT_BKPT; }else{ - rc = fsReadData(pFS, iOff, pFS->aBuffer, pPg->nCompress); + rc = fsReadData(pFS, iOff, pFS->aIBuffer, pPg->nCompress); } if( rc==LSM_OK ){ int n = pFS->nPagesize; rc = p->xUncompress(p->pCtx, (char *)pPg->aData, &n, - (const char *)pFS->aBuffer, pPg->nCompress - ); + (const char *)pFS->aIBuffer, pPg->nCompress + ); if( rc==LSM_OK && n!=pPg->nData ){ rc = LSM_CORRUPT_BKPT; } } } @@ -1534,11 +1547,12 @@ /* If this is the first page allocated, or if the page allocated is the ** last in the block, allocate a new block here. */ if( iApp==0 || fsIsLast(pFS, iApp) ){ int iNew; /* New block number */ - lsmBlockAllocate(pFS->pDb, &iNew); + rc = lsmBlockAllocate(pFS->pDb, &iNew); + if( rc!=LSM_OK ) return rc; if( iApp==0 ){ iApp = fsFirstPageOnBlock(pFS, iNew); }else{ iNext = fsFirstPageOnBlock(pFS, iNew); } @@ -1844,16 +1858,16 @@ ** allocates it. If this fails, LSM_NOMEM is returned. Otherwise, LSM_OK. */ static int fsCompressIntoBuffer(FileSystem *pFS, Page *pPg){ lsm_compress *p = pFS->pCompress; - if( fsAllocateBuffer(pFS) ) return LSM_NOMEM; + if( fsAllocateBuffer(pFS, 1) ) return LSM_NOMEM; assert( pPg->nData==pFS->nPagesize ); pPg->nCompress = pFS->nBuffer; return p->xCompress(p->pCtx, - (char *)pFS->aBuffer, &pPg->nCompress, + (char *)pFS->aOBuffer, &pPg->nCompress, (const char *)pPg->aData, pPg->nData ); } /* @@ -1880,11 +1894,11 @@ /* Serialize the compressed size into buffer aSz[] */ putRecordSize(aSz, pPg->nCompress, 0); /* Write the serialized page record into the database file. 
*/ pPg->iPg = fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc); - fsAppendData(pFS, pPg->pSeg, pFS->aBuffer, pPg->nCompress, &rc); + fsAppendData(pFS, pPg->pSeg, pFS->aOBuffer, pPg->nCompress, &rc); fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc); /* Now that it has a page number, insert the page into the hash table */ iHash = fsHashKey(pFS->nHash, pPg->iPg); pPg->pHashNext = pFS->apHash[iHash]; @@ -1950,12 +1964,12 @@ if( nPad>=6 ){ pSeg->nSize += nPad; nPad -= 6; putRecordSize(aSz, nPad, 1); fsAppendData(pFS, pSeg, aSz, sizeof(aSz), &rc); - memset(pFS->aBuffer, 0, nPad); - fsAppendData(pFS, pSeg, pFS->aBuffer, nPad, &rc); + memset(pFS->aOBuffer, 0, nPad); + fsAppendData(pFS, pSeg, pFS->aOBuffer, nPad, &rc); fsAppendData(pFS, pSeg, aSz, sizeof(aSz), &rc); }else if( nPad>0 ){ u8 aBuf[5] = {0,0,0,0,0}; aBuf[0] = (u8)nPad; aBuf[nPad-1] = (u8)nPad; @@ -2122,15 +2136,38 @@ *pzOut = str.z; } if( bUnlock ){ int rcwork = LSM_BUSY; - lsmFinishWork(pDb, 0, 0, &rcwork); + lsmFinishWork(pDb, 0, &rcwork); } return rc; } +/* +** The following macros are used by the integrity-check code. Associated with +** each block in the database is an 8-bit bit mask (the entry in the aUsed[] +** array). As the integrity-check meanders through the database, it sets the +** following bits to indicate how each block is used. +** +** INTEGRITY_CHECK_FIRST_PG: +** First page of block is in use by sorted run. +** +** INTEGRITY_CHECK_LAST_PG: +** Last page of block is in use by sorted run. +** +** INTEGRITY_CHECK_USED: +** At least one page of the block is in use by a sorted run. +** +** INTEGRITY_CHECK_FREE: +** The free block list contains an entry corresponding to this block. 
+*/ +#define INTEGRITY_CHECK_FIRST_PG 0x01 +#define INTEGRITY_CHECK_LAST_PG 0x02 +#define INTEGRITY_CHECK_USED 0x04 +#define INTEGRITY_CHECK_FREE 0x08 + /* ** Helper function for lsmFsIntegrityCheck() */ static void checkBlocks( FileSystem *pFS, @@ -2139,33 +2176,82 @@ int nUsed, u8 *aUsed ){ if( pSeg ){ if( pSeg && pSeg->nSize>0 ){ + int rc; Pgno iLast = pSeg->iLastPg; int iBlk; int iLastBlk; - iBlk = fsPageToBlock(pFS, pSeg->iFirst); - iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg); - - while( iBlk ){ - assert( iBlk<=nUsed ); - /* assert( aUsed[iBlk-1]==0 ); */ - aUsed[iBlk-1] = 1; - if( iBlk!=iLastBlk ){ - fsBlockNext(pFS, iBlk, &iBlk); - }else{ - iBlk = 0; - } - } - - if( bExtra && iLast==fsLastPageOnPagesBlock(pFS, iLast) ){ - fsBlockNext(pFS, iLastBlk, &iBlk); - aUsed[iBlk-1] = 1; - } - } - } + int bLastIsLastOnBlock; + + iBlk = fsPageToBlock(pFS, pSeg->iFirst); + iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg); + bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==iLast); + assert( iBlk>0 ); + + /* If the first page of this run is also the first page of its first + ** block, set the flag to indicate that the first page of iBlk is + ** in use. */ + if( fsFirstPageOnBlock(pFS, iBlk)==pSeg->iFirst ){ + assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_FIRST_PG)==0 ); + aUsed[iBlk-1] |= INTEGRITY_CHECK_FIRST_PG; + } + + do { + /* iBlk is a part of this sorted run. */ + aUsed[iBlk-1] |= INTEGRITY_CHECK_USED; + + /* Unless the sorted run finishes before the last page on this block, + ** the last page of this block is also in use. */ + if( iBlk!=iLastBlk || bLastIsLastOnBlock ){ + assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_LAST_PG)==0 ); + aUsed[iBlk-1] |= INTEGRITY_CHECK_LAST_PG; + } + + /* Special case. The sorted run being scanned is the output run of + ** a level currently undergoing an incremental merge. The sorted + ** run ends on the last page of iBlk, but the next block has already + ** been allocated. So mark it as in use as well. 
*/ + if( iBlk==iLastBlk && bLastIsLastOnBlock && bExtra ){ + int iExtra = 0; + rc = fsBlockNext(pFS, iBlk, &iExtra); + assert( rc==LSM_OK ); + + assert( aUsed[iExtra-1]==0 ); + aUsed[iExtra-1] |= INTEGRITY_CHECK_USED; + aUsed[iExtra-1] |= INTEGRITY_CHECK_FIRST_PG; + aUsed[iExtra-1] |= INTEGRITY_CHECK_LAST_PG; + } + + /* Move on to the next block in the sorted run. Or set iBlk to zero + ** in order to break out of the loop if this was the last block in + ** the run. */ + if( iBlk==iLastBlk ){ + iBlk = 0; + }else{ + rc = fsBlockNext(pFS, iBlk, &iBlk); + assert( rc==LSM_OK ); + } + }while( iBlk ); + } + } +} + +typedef struct CheckFreelistCtx CheckFreelistCtx; +typedef struct CheckFreelistCtx { + u8 *aUsed; + int nBlock; +}; +static int checkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){ + CheckFreelistCtx *p = (CheckFreelistCtx *)pCtx; + + assert( iBlk>=1 ); + assert( iBlk<=p->nBlock ); + assert( p->aUsed[iBlk-1]==0 ); + p->aUsed[iBlk-1] = INTEGRITY_CHECK_FREE; + return 0; } /* ** This function checks that all blocks in the database file are accounted ** for. For each block, exactly one of the following must be true: @@ -2178,25 +2264,27 @@ ** ** If no errors are found, non-zero is returned. If an error is found, an ** assert() fails. */ int lsmFsIntegrityCheck(lsm_db *pDb){ + CheckFreelistCtx ctx; FileSystem *pFS = pDb->pFS; int i; int j; + int rc; Freelist freelist = {0, 0, 0}; u8 *aUsed; Level *pLevel; Snapshot *pWorker = pDb->pWorker; int nBlock = pWorker->nBlock; aUsed = lsmMallocZero(pDb->pEnv, nBlock); if( aUsed==0 ){ /* Malloc has failed. Since this function is only called within debug - ** builds, this probably means the user is running an OOM injection test. - ** Regardless, it will not be possible to run the integrity-check at this - ** time, so assume the database is Ok and return non-zero. */ + ** builds, this probably means the user is running an OOM injection test. 
+ ** Regardless, it will not be possible to run the integrity-check at this + ** time, so assume the database is Ok and return non-zero. */ return 1; } for(pLevel=pWorker->pLevel; pLevel; pLevel=pLevel->pNext){ int i; @@ -2204,30 +2292,18 @@ for(i=0; inRight; i++){ checkBlocks(pFS, &pLevel->aRhs[i], 0, nBlock, aUsed); } } - if( pWorker->nFreelistOvfl ){ - int rc = lsmCheckpointOverflowLoad(pDb, &freelist); - assert( rc==LSM_OK || rc==LSM_NOMEM ); - if( rc!=LSM_OK ) return 1; - } - - for(j=0; j<2; j++){ - Freelist *pFreelist; - if( j==0 ) pFreelist = &pWorker->freelist; - if( j==1 ) pFreelist = &freelist; - - for(i=0; inEntry; i++){ - u32 iBlk = pFreelist->aEntry[i].iBlk; - assert( iBlk<=nBlock ); - assert( aUsed[iBlk-1]==0 ); - aUsed[iBlk-1] = 1; - } - } - - for(i=0; ipEnv, aUsed); lsmFree(pDb->pEnv, freelist.aEntry); return 1; Index: src/lsm_main.c ================================================================== --- src/lsm_main.c +++ src/lsm_main.c @@ -370,11 +370,11 @@ } static void infoFreeWorker(lsm_db *pDb, int bUnlock){ if( bUnlock ){ int rcdummy = LSM_BUSY; - lsmFinishWork(pDb, 0, 0, &rcdummy); + lsmFinishWork(pDb, 0, &rcdummy); } } int lsmStructList( lsm_db *pDb, /* Database handle */ @@ -421,11 +421,11 @@ /* Obtain the worker snapshot */ rc = infoGetWorker(pDb, &pWorker, &bUnlock); if( rc!=LSM_OK ) return rc; lsmStringInit(&s, pDb->pEnv); - lsmStringAppendf(&s, "%d+%d",pWorker->freelist.nEntry,pWorker->nFreelistOvfl); + lsmStringAppendf(&s, "%d", pWorker->freelist.nEntry); for(i=0; ifreelist.nEntry; i++){ FreelistEntry *p = &pWorker->freelist.aEntry[i]; lsmStringAppendf(&s, " {%d %d}", p->iBlk, (int)p->iId); } rc = s.n>=0 ? LSM_OK : LSM_NOMEM; Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -82,65 +82,57 @@ #else # define assertNotInFreelist(x,y) #endif /* -** Append an entry to the free-list. +** Append an entry to the free-list. If (iId==-1), this is a delete. 
*/ -int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){ +int freelistAppend(lsm_db *db, int iBlk, i64 iId){ + lsm_env *pEnv = db->pEnv; + Freelist *p; + int i; - /* Assert that this is not an attempt to insert a duplicate block number */ -#if 0 - assertNotInFreelist(p, iBlk); -#endif + assert( iId==-1 || iId>=0 ); + p = db->bUseFreelist ? db->pFreelist : &db->pWorker->freelist; /* Extend the space allocated for the freelist, if required */ assert( p->nAlloc>=p->nEntry ); if( p->nAlloc==p->nEntry ){ int nNew; + int nByte; FreelistEntry *aNew; nNew = (p->nAlloc==0 ? 4 : p->nAlloc*2); - aNew = (FreelistEntry *)lsmRealloc(pEnv, p->aEntry, - sizeof(FreelistEntry)*nNew); + nByte = sizeof(FreelistEntry) * nNew; + aNew = (FreelistEntry *)lsmRealloc(pEnv, p->aEntry, nByte); if( !aNew ) return LSM_NOMEM_BKPT; p->nAlloc = nNew; p->aEntry = aNew; } - /* Append the new entry to the freelist */ - p->aEntry[p->nEntry].iBlk = iBlk; - p->aEntry[p->nEntry].iId = iId; - p->nEntry++; + for(i=0; inEntry; i++){ + assert( i==0 || p->aEntry[i].iBlk > p->aEntry[i-1].iBlk ); + if( p->aEntry[i].iBlk>=iBlk ) break; + } + + if( inEntry && p->aEntry[i].iBlk==iBlk ){ + /* Clobber an existing entry */ + p->aEntry[i].iId = iId; + }else{ + /* Insert a new entry into the list */ + int nByte = sizeof(FreelistEntry)*(p->nEntry-i); + memmove(&p->aEntry[i+1], &p->aEntry[i], nByte); + p->aEntry[i].iBlk = iBlk; + p->aEntry[i].iId = iId; + p->nEntry++; + } return LSM_OK; } -static int flInsertEntry(lsm_env *pEnv, Freelist *p, int iBlk){ - int rc; - - rc = lsmFreelistAppend(pEnv, p, iBlk, 1); - if( rc==LSM_OK ){ - memmove(&p->aEntry[1], &p->aEntry[0], sizeof(FreelistEntry)*(p->nEntry-1)); - p->aEntry[0].iBlk = iBlk; - p->aEntry[0].iId = 1; - } - return rc; -} - -/* -** Remove the first entry of the free-list. 
-*/ -static void flRemoveEntry0(Freelist *p){ - int nNew = p->nEntry - 1; - assert( nNew>=0 ); - memmove(&p->aEntry[0], &p->aEntry[1], sizeof(FreelistEntry) * nNew); - p->nEntry = nNew; -} - -/* -** tHIS Function frees all resources held by the Database structure passed +/* +** This function frees all resources held by the Database structure passed ** as the only argument. */ static void freeDatabase(lsm_env *pEnv, Database *p){ assert( holdingGlobalMutex(pEnv) ); if( p ){ @@ -426,10 +418,126 @@ void lsmDbSnapshotSetLevel(Snapshot *pSnap, Level *pLevel){ pSnap->pLevel = pLevel; } +/* TODO: Shuffle things around to get rid of this */ +static int firstSnapshotInUse(lsm_db *, i64 *); + +/* +** Context object used by the lsmWalkFreelist() utility. +*/ +typedef struct WalkFreelistCtx WalkFreelistCtx; +struct WalkFreelistCtx { + lsm_db *pDb; + Freelist *pFreelist; + int iFree; + int (*xUsr)(void *, int, i64); /* User callback function */ + void *pUsrctx; /* User callback context */ +}; + +/* +** Callback used by lsmWalkFreelist(). +*/ +static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){ + WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx; + Freelist *pFree = p->pFreelist; + + if( pFree ){ + while( (p->iFree < pFree->nEntry) ){ + FreelistEntry *pEntry = &pFree->aEntry[p->iFree]; + if( pEntry->iBlk>iBlk ){ + break; + }else{ + p->iFree++; + if( pEntry->iId>=0 + && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) + ){ + return 1; + } + if( pEntry->iBlk==iBlk ) return 0; + } + } + } + + return p->xUsr(p->pUsrctx, iBlk, iSnapshot); +} + +/* +** The database handle passed as the first argument must be the worker +** connection. This function iterates through the contents of the current +** free block list, invoking the supplied callback once for each list +** element. +** +** The difference between this function and lsmSortedWalkFreelist() is +** that lsmSortedWalkFreelist() only considers those free-list elements +** stored within the LSM. 
This function also merges in any in-memory +** elements. +*/ +int lsmWalkFreelist( + lsm_db *pDb, /* Database handle (must be worker) */ + int (*x)(void *, int, i64), /* Callback function */ + void *pCtx /* First argument to pass to callback */ +){ + int rc; + int iCtx; + + WalkFreelistCtx ctx[2]; + + ctx[0].pDb = pDb; + ctx[0].pFreelist = &pDb->pWorker->freelist; + ctx[0].iFree = 0; + ctx[0].xUsr = walkFreelistCb; + ctx[0].pUsrctx = (void *)&ctx[1]; + + ctx[1].pDb = pDb; + ctx[1].pFreelist = pDb->pFreelist; + ctx[1].iFree = 0; + ctx[1].xUsr = x; + ctx[1].pUsrctx = pCtx; + + rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx[0]); + + for(iCtx=0; iCtx<2; iCtx++){ + int i; + WalkFreelistCtx *p = &ctx[iCtx]; + for(i=p->iFree; p->pFreelist && rc==LSM_OK && ipFreelist->nEntry; i++){ + FreelistEntry *pEntry = &p->pFreelist->aEntry[i]; + if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){ + return LSM_OK; + } + } + } + + return rc; +} + +typedef struct FindFreeblockCtx FindFreeblockCtx; +struct FindFreeblockCtx { + i64 iInUse; + int iRet; +}; + +static int findFreeblockCb(void *pCtx, int iBlk, i64 iSnapshot){ + FindFreeblockCtx *p = (FindFreeblockCtx *)pCtx; + if( iSnapshotiInUse ){ + p->iRet = iBlk; + return 1; + } + return 0; +} + +static int findFreeblock(lsm_db *pDb, i64 iInUse, int *piRet){ + int rc; /* Return code */ + FindFreeblockCtx ctx; /* Context object */ + + ctx.iInUse = iInUse; + ctx.iRet = 0; + rc = lsmWalkFreelist(pDb, findFreeblockCb, (void *)&ctx); + *piRet = ctx.iRet; + return rc; +} /* ** Allocate a new database file block to write data to, either by extending ** the database file or by recycling a free-list entry. The worker snapshot ** must be held in order to call this function. @@ -437,60 +545,52 @@ ** If successful, *piBlk is set to the block number allocated and LSM_OK is ** returned. Otherwise, *piBlk is zeroed and an lsm error code returned. 
*/ int lsmBlockAllocate(lsm_db *pDb, int *piBlk){ Snapshot *p = pDb->pWorker; - Freelist *pFree; /* Database free list */ int iRet = 0; /* Block number of allocated block */ int rc = LSM_OK; - - assert( pDb->pWorker ); - - pFree = &p->freelist; - if( pFree->nEntry>0 ){ - /* The first block on the free list was freed as part of the work done - ** to create the snapshot with id iFree. So, we can reuse this block if - ** snapshot iFree or later has been checkpointed and all currently - ** active clients are reading from snapshot iFree or later. */ - i64 iFree = pFree->aEntry[0].iId; - int bInUse = 0; - - /* The "is in use" bit */ - rc = lsmLsmInUse(pDb, iFree, &bInUse); - - /* The "has been checkpointed" bit */ - if( rc==LSM_OK && bInUse==0 ){ - i64 iId = 0; - rc = lsmCheckpointSynced(pDb, &iId, 0, 0); - if( rc!=LSM_OK || iIdaEntry[0].iBlk; - flRemoveEntry0(pFree); - assert( iRet!=0 ); - } -#ifdef LSM_LOG_BLOCKS - lsmLogMessage( - pDb, 0, "%s reusing block %d%s", (iRet==0 ? "not " : ""), - pFree->aEntry[0].iBlk, - bInUse==0 ? "" : bInUse==1 ? " (client)" : " (unsynced)" - ); -#endif - } - - /* If no block was allocated from the free-list, allocate one at the - ** end of the file. */ - if( rc==LSM_OK && iRet==0 ){ - iRet = ++pDb->pWorker->nBlock; -#ifdef LSM_LOG_BLOCKS - lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet); -#endif - } - + i64 iInUse = 0; /* Snapshot id still in use */ + + assert( p ); + + /* Set iInUse to the smallest snapshot id that is either: + ** + ** * Currently in use by a database client, + ** * May be used by a database client in the future, or + ** * Is the most recently checkpointed snapshot (i.e. the one that will + ** be used following recovery if a failure occurs at this point). 
+ */ + rc = lsmCheckpointSynced(pDb, &iInUse, 0, 0); + if( rc==LSM_OK && iInUse==0 ) iInUse = p->iId; + if( rc==LSM_OK && pDb->pClient ) iInUse = LSM_MIN(iInUse, pDb->pClient->iId); + if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse); + + /* Query the free block list for a suitable block */ + if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet); + + /* If a block was found in the free block list, use it and remove it from + ** the list. Otherwise, if no suitable block was found, allocate one from + ** the end of the file. */ + if( rc==LSM_OK ){ + if( iRet>0 ){ +#ifdef LSM_LOG_FREELIST + lsmLogMessage(pDb, 0, + "reusing block %d (snapshot-in-use=%lld)", iRet, iInUse); +#endif + rc = freelistAppend(pDb, iRet, -1); + }else{ + iRet = ++(p->nBlock); +#ifdef LSM_LOG_FREELIST + lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet); +#endif + } + } + + assert( iRet>0 || rc!=LSM_OK ); *piBlk = iRet; - return LSM_OK; + return rc; } /* ** Free a database block. The worker snapshot must be held in order to call ** this function. @@ -498,18 +598,17 @@ ** If successful, LSM_OK is returned. Otherwise, an lsm error code (e.g. ** LSM_NOMEM). */ int lsmBlockFree(lsm_db *pDb, int iBlk){ Snapshot *p = pDb->pWorker; - assert( lsmShmAssertWorker(pDb) ); - /* TODO: Should assert() that lsmCheckpointOverflow() has not been called */ + #ifdef LSM_LOG_FREELIST lsmLogMessage(pDb, LSM_OK, "lsmBlockFree(): Free block %d", iBlk); #endif - return lsmFreelistAppend(pDb->pEnv, &p->freelist, iBlk, p->iId); + return freelistAppend(pDb, iBlk, p->iId); } /* ** Refree a database block. The worker snapshot must be held in order to call ** this function. 
@@ -522,16 +621,15 @@ */ int lsmBlockRefree(lsm_db *pDb, int iBlk){ int rc = LSM_OK; /* Return code */ Snapshot *p = pDb->pWorker; - if( iBlk==p->nBlock ){ - p->nBlock--; - }else{ - rc = flInsertEntry(pDb->pEnv, &p->freelist, iBlk); - } +#ifdef LSM_LOG_FREELIST + lsmLogMessage(pDb, LSM_OK, "lsmBlockRefree(): Refree block %d", iBlk); +#endif + rc = freelistAppend(pDb, iBlk, 0); return rc; } /* ** If required, copy a database checkpoint from shared memory into the @@ -632,18 +730,17 @@ ** Argument bFlush is true if the contents of the in-memory tree has just ** been flushed to disk. The significance of this is that once the snapshot ** created to hold the updated state of the database is synced to disk, log ** file space can be recycled. */ -void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){ +void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){ assert( *pRc!=0 || pDb->pWorker ); if( pDb->pWorker ){ /* If no error has occurred, serialize the worker snapshot and write ** it to shared memory. 
*/ - assert( pDb->pWorker->nFreelistOvfl==0 || nOvfl==0 ); if( *pRc==LSM_OK ){ - *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl); + *pRc = lsmCheckpointSaveWorker(pDb, bFlush); } lsmFreeSnapshot(pDb->pEnv, pDb->pWorker); pDb->pWorker = 0; } @@ -720,17 +817,17 @@ rc = LSM_OK; } } #if 0 if( rc==LSM_OK && pDb->pClient ){ - printf("reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n", + fprintf(stderr, + "reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n", (void *)pDb, (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid, (int)pDb->treehdr.root.iTransId, (int)pDb->treehdr.iOldShmid ); - fflush(stdout); } #endif } if( rc!=LSM_OK ){ @@ -756,10 +853,18 @@ if( pClient ){ lsmFreeSnapshot(pDb->pEnv, pDb->pClient); pDb->pClient = 0; } #endif + +#if 0 +if( pDb->pClient && pDb->iReader>=0 ){ + fprintf(stderr, + "finished reading %p: snapshot:%d\n", (void *)pDb, (int)pDb->pClient->iId + ); +} +#endif if( pDb->iReader>=0 ) lsmReleaseReadlock(pDb); } /* ** Open a write transaction. @@ -960,10 +1065,49 @@ return LSM_OK; } *pbInUse = 0; return rc; } + +/* +** This function is called by worker connections to determine the smallest +** snapshot id that is currently in use by a database client. The worker +** connection uses this result to determine whether or not it is safe to +** recycle a database block. +*/ +static int firstSnapshotInUse( + lsm_db *db, /* Database handle */ + i64 *piInUse /* IN/OUT: Smallest snapshot id in use */ +){ + ShmHeader *pShm = db->pShmhdr; + i64 iInUse = *piInUse; + int i; + + assert( iInUse>0 ); + for(i=0; iaReader[i]; + if( p->iLsmId ){ + i64 iThis = p->iLsmId; + if( iThis!=0 && iInUse>iThis ){ + int rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0); + if( rc==LSM_OK ){ + p->iLsmId = 0; + lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0); + }else if( rc==LSM_BUSY ){ + iInUse = iThis; + }else{ + /* Some error other than LSM_BUSY. Return the error code to + ** the caller in this case. 
*/ + return rc; + } + } + } + } + + *piInUse = iInUse; + return LSM_OK; +} int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){ if( db->treehdr.iUsedShmid==iShmid ){ *pbInUse = 1; return LSM_OK; Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -194,45 +194,42 @@ Blob key; /* Cache of current key (or NULL) */ Blob val; /* Cache of current value */ /* All the component cursors: */ TreeCursor *apTreeCsr[2]; /* Up to two tree cursors */ + int iFree; /* Next element of free-list (-ve for eof) */ SegmentPtr *aPtr; /* Array of segment pointers */ int nPtr; /* Size of array aPtr[] */ BtreeCursor *pBtCsr; /* b-tree cursor (db writes only) */ /* Comparison results */ int nTree; /* Size of aTree[] array */ int *aTree; /* Array of comparison results */ /* Used by cursors flushing the in-memory tree only */ - int *pnOvfl; /* Number of free-list entries to store */ void *pSystemVal; /* Pointer to buffer to free */ /* Used by worker cursors only */ Pgno *pPrevMergePtr; }; -#define CURSOR_DATA_TREE0 0 /* Current tree cursor */ -#define CURSOR_DATA_TREE1 1 /* The "old" tree, if any */ -#define CURSOR_DATA_SYSTEM 2 +/* +** The following constants are used to assign integers to each component +** cursor of a multi-cursor. +*/ +#define CURSOR_DATA_TREE0 0 /* Current tree cursor (apTreeCsr[0]) */ +#define CURSOR_DATA_TREE1 1 /* The "old" tree, if any (apTreeCsr[1]) */ +#define CURSOR_DATA_SYSTEM 2 /* Free-list entries (new-toplevel only) */ #define CURSOR_DATA_SEGMENT 3 /* ** CURSOR_IGNORE_DELETE ** If set, this cursor will not visit SORTED_DELETE keys. ** -** CURSOR_NEW_SYSTEM -** If set, then after all user data from the in-memory tree and any other -** cursors has been visited, the cursor visits the live (uncommitted) -** versions of the two system keys: FREELIST AND LEVELS. 
This is used when -** flushing the in-memory tree to disk - the new free-list and levels record -** are flushed along with it. -** -** CURSOR_AT_FREELIST -** This flag is set when sub-cursor CURSOR_DATA_SYSTEM is actually -** pointing at a free list. +** CURSOR_FLUSH_FREELIST +** This cursor is being used to create a new toplevel. It should also +** iterate through the contents of the in-memory free block list. ** ** CURSOR_IGNORE_SYSTEM ** If set, this cursor ignores system keys. ** ** CURSOR_NEXT_OK @@ -249,12 +246,11 @@ ** Cursor has undergone a successful lsm_csr_seek(LSM_SEEK_EQ) operation. ** The key and value are stored in MultiCursor.key and MultiCursor.val ** respectively. */ #define CURSOR_IGNORE_DELETE 0x00000001 -#define CURSOR_NEW_SYSTEM 0x00000002 -#define CURSOR_AT_FREELIST 0x00000004 +#define CURSOR_FLUSH_FREELIST 0x00000002 #define CURSOR_IGNORE_SYSTEM 0x00000010 #define CURSOR_NEXT_OK 0x00000020 #define CURSOR_PREV_OK 0x00000040 #define CURSOR_READ_SEPARATORS 0x00000080 #define CURSOR_SEEK_EQ 0x00000100 @@ -1909,17 +1905,29 @@ lsmTreeCursorValue(pTreeCsr, &pVal, &nVal); } break; } - case CURSOR_DATA_SYSTEM: - if( pCsr->flags & CURSOR_AT_FREELIST ){ - pKey = (void *)"FREELIST"; - nKey = 8; - eType = LSM_SYSTEMKEY | LSM_INSERT; + case CURSOR_DATA_SYSTEM: { + Snapshot *pWorker = pCsr->pDb->pWorker; + if( (pCsr->flags & CURSOR_FLUSH_FREELIST) + && pWorker && pWorker->freelist.nEntry > pCsr->iFree + ){ + int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1; + FreelistEntry *pEntry = &pWorker->freelist.aEntry[iEntry]; + u32 i = ~((u32)(pEntry->iBlk)); + lsmPutU32(pCsr->pSystemVal, i); + pKey = pCsr->pSystemVal; + nKey = 4; + if( pEntry->iId>=0 ){ + eType = LSM_SYSTEMKEY | LSM_INSERT; + }else{ + eType = LSM_SYSTEMKEY | LSM_POINT_DELETE; + } } break; + } default: { int iPtr = iKey - CURSOR_DATA_SEGMENT; assert( iPtr>=0 ); if( iPtr==pCsr->nPtr ){ @@ -2252,14 +2260,15 @@ /* ** If the free-block list is not empty, then have this cursor visit a key ** with 
(a) the system bit set, and (b) the key "FREELIST" and (c) a value ** blob containing the serialized free-block list. */ -static void multiCursorVisitFreelist(MultiCursor *pCsr, int *pnOvfl){ - assert( pCsr ); - pCsr->pnOvfl = pnOvfl; - pCsr->flags |= CURSOR_NEW_SYSTEM; +static int multiCursorVisitFreelist(MultiCursor *pCsr){ + int rc = LSM_OK; + pCsr->flags |= CURSOR_FLUSH_FREELIST; + pCsr->pSystemVal = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc); + return rc; } /* ** Allocate and return a new database cursor. */ @@ -2304,18 +2313,21 @@ *pnVal = 0; } break; } - case CURSOR_DATA_SYSTEM: - if( pCsr->flags & CURSOR_AT_FREELIST ){ - void *aVal; - rc = lsmCheckpointOverflow(pCsr->pDb, &aVal, pnVal, pCsr->pnOvfl); - assert( pCsr->pSystemVal==0 ); - *ppVal = pCsr->pSystemVal = aVal; + case CURSOR_DATA_SYSTEM: { + Snapshot *pWorker = pCsr->pDb->pWorker; + if( pWorker && pWorker->freelist.nEntry > pCsr->iFree ){ + int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1; + u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4]; + lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId); + *ppVal = aVal; + *pnVal = 8; } break; + } default: { int iPtr = iVal-CURSOR_DATA_SEGMENT; if( iPtrnPtr ){ SegmentPtr *pPtr = &pCsr->aPtr[iPtr]; @@ -2326,10 +2338,60 @@ } } } assert( rc==LSM_OK || (*ppVal==0 && *pnVal==0) ); + return rc; +} + +/* +** This function is called by worker connections to walk the part of the +** free-list stored within the LSM data structure. 
+*/ +int lsmSortedWalkFreelist( + lsm_db *pDb, /* Database handle */ + int (*x)(void *, int, i64), /* Callback function */ + void *pCtx /* First argument to pass to callback */ +){ + MultiCursor *pCsr; /* Cursor used to read db */ + int rc = LSM_OK; /* Return Code */ + Snapshot *pSnap = 0; + + assert( pDb->pWorker ); + rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap); + if( rc!=LSM_OK ) return rc; + + pCsr = multiCursorNew(pDb, &rc); + if( pCsr ){ + rc = multiCursorAddAll(pCsr, pSnap); + pCsr->flags |= CURSOR_IGNORE_DELETE; + } + + if( rc==LSM_OK ){ + rc = lsmMCursorLast(pCsr); + while( rc==LSM_OK && lsmMCursorValid(pCsr) && rtIsSystem(pCsr->eType) ){ + void *pKey; int nKey; + void *pVal; int nVal; + + rc = lsmMCursorKey(pCsr, &pKey, &nKey); + if( rc==LSM_OK ) rc = lsmMCursorValue(pCsr, &pVal, &nVal); + if( rc==LSM_OK && (nKey!=4 || nVal!=8) ) rc = LSM_CORRUPT_BKPT; + + if( rc==LSM_OK ){ + int iBlk; + i64 iSnap; + iBlk = (int)(~(lsmGetU32((u8 *)pKey))); + iSnap = (i64)lsmGetU64((u8 *)pVal); + if( x(pCtx, iBlk, iSnap) ) break; + rc = lsmMCursorPrev(pCsr); + } + } + } + + lsmMCursorClose(pCsr); + lsmFreeSnapshot(pDb->pEnv, pSnap); + return rc; } int lsmSortedLoadFreelist( lsm_db *pDb, /* Database handle (must be worker) */ @@ -2448,14 +2510,11 @@ } if( rc==LSM_OK && pCsr->apTreeCsr[1] ){ rc = lsmTreeCursorEnd(pCsr->apTreeCsr[1], bLast); } - if( pCsr->flags & CURSOR_NEW_SYSTEM ){ - assert( bLast==0 ); - pCsr->flags |= CURSOR_AT_FREELIST; - } + pCsr->iFree = 0; for(i=0; rc==LSM_OK && inPtr; i++){ SegmentPtr *pPtr = &pCsr->aPtr[i]; Level *pLvl = pPtr->pLevel; @@ -2624,12 +2683,11 @@ Pgno iPgno = 0; /* FC pointer value */ if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE; assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE ); - assert( (pCsr->flags & CURSOR_NEW_SYSTEM)==0 ); - assert( (pCsr->flags & CURSOR_AT_FREELIST)==0 ); + assert( (pCsr->flags & CURSOR_FLUSH_FREELIST)==0 ); assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel ); 
pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ); rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[0], pKey, nKey, eESeek, &bStop); if( rc==LSM_OK && bStop==0 ){ @@ -2759,14 +2817,13 @@ rc = lsmTreeCursorPrev(pTreeCsr); }else{ rc = lsmTreeCursorNext(pTreeCsr); } }else if( iKey==CURSOR_DATA_SYSTEM ){ - assert( pCsr->flags & CURSOR_AT_FREELIST ); - assert( pCsr->flags & CURSOR_NEW_SYSTEM ); + assert( pCsr->flags & CURSOR_FLUSH_FREELIST ); assert( bReverse==0 ); - pCsr->flags &= ~CURSOR_AT_FREELIST; + pCsr->iFree++; }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){ assert( bReverse==0 && pCsr->pBtCsr ); rc = btreeCursorNext(pCsr->pBtCsr); }else{ rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse); @@ -3665,16 +3722,19 @@ ** The cursor passed as the first argument is being used as the input for ** a merge operation. When this function is called, *piFlags contains the ** database entry flags for the current entry. The entry about to be written ** to the output. ** +** Note that this function only has to work for cursors configured to +** iterate forwards (not backwards). */ static void mergeRangeDeletes(MultiCursor *pCsr, int *piFlags){ int f = *piFlags; int iKey = pCsr->aTree[1]; int i; + assert( pCsr->flags & CURSOR_NEXT_OK ); if( pCsr->flags & CURSOR_IGNORE_DELETE ){ /* The ignore-delete flag is set when the output of the merge will form ** the oldest level in the database. In this case there is no point in ** retaining any range-delete flags. 
*/ assert( (f & LSM_POINT_DELETE)==0 ); @@ -3686,11 +3746,12 @@ f |= (LSM_START_DELETE|LSM_END_DELETE); } } for(i=LSM_MAX(0, iKey+1-CURSOR_DATA_SEGMENT); inPtr; i++){ - if( pCsr->aPtr[i].eType & LSM_END_DELETE ){ + SegmentPtr *pPtr = &pCsr->aPtr[i]; + if( pPtr->pPg && (pPtr->eType & LSM_END_DELETE) ){ f |= (LSM_START_DELETE|LSM_END_DELETE); } } if( (f & LSM_START_DELETE) && (f & LSM_END_DELETE) && (f & LSM_INSERT)==0 ){ @@ -3714,10 +3775,15 @@ pSeg = &pMW->pLevel->lhs; /* Pull the next record out of the source cursor. */ lsmMCursorKey(pCsr, &pKey, &nKey); eType = pCsr->eType; + + if( eType & LSM_SYSTEMKEY ){ + int i; + i = 1; + } /* Figure out if the output record may have a different pointer value ** than the previous. This is the case if the current key is identical to ** a key that appears in the lowest level run being merged. If so, set ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, @@ -3816,21 +3882,24 @@ } static int sortedNewToplevel( lsm_db *pDb, /* Connection handle */ int eTree, /* One of the TREE_XXX constants */ - int *pnOvfl, /* OUT: Number of free-list entries stored */ int *pnWrite /* OUT: Number of database pages written */ ){ int rc = LSM_OK; /* Return Code */ MultiCursor *pCsr = 0; Level *pNext = 0; /* The current top level */ Level *pNew; /* The new level itself */ Segment *pDel = 0; /* Delete separators from this segment */ int nWrite = 0; /* Number of database pages written */ + Freelist freelist; - assert( pnOvfl ); + assert( pDb->bUseFreelist==0 ); + pDb->pFreelist = &freelist; + pDb->bUseFreelist = 1; + memset(&freelist, 0, sizeof(freelist)); /* Allocate the new level structure to write to. */ pNext = lsmDbSnapshotLevel(pDb->pWorker); pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc); if( pNew ){ @@ -3842,12 +3911,14 @@ ** segment contains everything in the tree and pointers to the next segment ** in the database (if any). 
*/ pCsr = multiCursorNew(pDb, &rc); if( pCsr ){ pCsr->pDb = pDb; - multiCursorVisitFreelist(pCsr, pnOvfl); - rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree); + rc = multiCursorVisitFreelist(pCsr); + if( rc==LSM_OK ){ + rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree); + } if( rc==LSM_OK && pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){ pDel = &pNext->lhs; rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr); } @@ -3883,36 +3954,47 @@ /* Do the work to create the new merged segment on disk */ if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr); while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){ rc = mergeWorkerStep(&mergeworker); } + assert( rc!=LSM_OK || mergeworker.nWork==0 || pNew->lhs.iFirst ); nWrite = mergeworker.nWork; mergeWorkerShutdown(&mergeworker, &rc); pNew->pMerge = 0; pNew->iAge = 0; } - /* Link the new level into the top of the tree. */ - if( rc==LSM_OK ){ - if( pDel ) pDel->iRoot = 0; - }else{ + if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){ + assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 ); lsmDbSnapshotSetLevel(pDb->pWorker, pNext); sortedFreeLevel(pDb->pEnv, pNew); - } + }else{ + if( pDel ) pDel->iRoot = 0; #if 0 - lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel"); + lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel"); #endif - if( rc==LSM_OK ){ + if( freelist.nEntry ){ + Freelist *p = &pDb->pWorker->freelist; + lsmFree(pDb->pEnv, p->aEntry); + memcpy(p, &freelist, sizeof(freelist)); + freelist.aEntry = 0; + }else{ + pDb->pWorker->freelist.nEntry = 0; + } + assertBtreeOk(pDb, &pNew->lhs); sortedInvokeWorkHook(pDb); } if( pnWrite ) *pnWrite = nWrite; pDb->pWorker->nWrite += nWrite; + pDb->pFreelist = 0; + pDb->bUseFreelist = 0; + lsmFree(pDb->pEnv, freelist.aEntry); return rc; } /* ** The nMerge levels in the LSM beginning with pLevel consist of a @@ -4303,11 +4385,11 @@ ** the database structure has changed. 
*/ mergeWorkerShutdown(&mergeworker, &rc); if( rc==LSM_OK ) sortedInvokeWorkHook(pDb); #if 0 - lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work"); + lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work"); #endif assertBtreeOk(pDb, &pLevel->lhs); assertRunInOrder(pDb, &pLevel->lhs); /* If bFlush is true and the database is no longer considered "full", @@ -4364,26 +4446,24 @@ int nPage, /* Number of pages to write to disk */ int *pnWrite, /* OUT: Pages actually written to disk */ int *pbCkpt /* OUT: True if an auto-checkpoint is req. */ ){ int rc = LSM_OK; /* Return code */ - int nOvfl = 0; - int bFlush = 0; + int bDirty = 0; int nMax = nPage; /* Maximum pages to write to disk */ int nRem = nPage; int bCkpt = 0; - int bToplevel = 0; /* Open the worker 'transaction'. It will be closed before this function ** returns. */ assert( pDb->pWorker==0 ); rc = lsmBeginWork(pDb); - assert( rc!=8 ); if( rc!=LSM_OK ) return rc; /* If this connection is doing auto-checkpoints, set nMax (and nRem) so - ** that this call stops writing when the auto-checkpoint is due. */ + ** that this call stops writing when the auto-checkpoint is due. The + ** caller will do the checkpoint, then possibly call this function again. */ if( bShutdown==0 && pDb->nAutockpt ){ u32 nSync; u32 nUnsync; int nPgsz; int nMax; @@ -4400,57 +4480,63 @@ } /* If there exists in-memory data ready to be flushed to disk, attempt ** to flush it now. */ if( sortedTreeHasOld(pDb, &rc) ){ + /* sortedDbIsFull() returns non-zero if either (a) there are too many + ** levels in total in the db, or (b) there are too many levels with the + ** the same age in the db. Either way, call sortedWork() to merge + ** existing segments together until this condition is cleared. 
*/ if( sortedDbIsFull(pDb) ){ int nPg = 0; rc = sortedWork(pDb, nRem, 0, 1, &nPg); nRem -= nPg; assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) ); - bToplevel = 1; } + if( rc==LSM_OK && nRem>0 ){ int nPg = 0; - rc = sortedNewToplevel(pDb, TREE_OLD, &nOvfl, &nPg); + rc = sortedNewToplevel(pDb, TREE_OLD, &nPg); nRem -= nPg; - if( rc==LSM_OK && pDb->nTransOpen>0 ){ - lsmTreeDiscardOld(pDb); + if( rc==LSM_OK ){ + if( pDb->nTransOpen>0 ){ + lsmTreeDiscardOld(pDb); + } + rc = lsmCheckpointSaveWorker(pDb, 1); } - bFlush = 1; - bToplevel = 0; } } /* If nPage is still greater than zero, do some merging. */ if( rc==LSM_OK && nRem>0 && bShutdown==0 ){ int nPg = 0; int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0); rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg); nRem -= nPg; - if( nPg ){ - bToplevel = 1; - nOvfl = 0; - } + if( nPg ) bDirty = 1; } - if( rc==LSM_OK && bToplevel && lsmCheckpointOverflowRequired(pDb) ){ + /* If the in-memory part of the free-list is too large, write a new + ** top-level containing just the in-memory free-list entries to disk. */ + if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){ + int nPg = 0; while( rc==LSM_OK && sortedDbIsFull(pDb) ){ - int nPg = 0; rc = sortedWork(pDb, 16, 0, 1, &nPg); + nRem -= nPg; } - if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){ - rc = sortedNewToplevel(pDb, TREE_NONE, &nOvfl, 0); + if( rc==LSM_OK ){ + rc = sortedNewToplevel(pDb, TREE_NONE, &nPg); } + nRem -= nPg; + if( nPg ) bDirty = 1; } - if( rc==LSM_OK && (nRem!=nMax) ){ - lsmFinishWork(pDb, bFlush, nOvfl, &rc); + if( rc==LSM_OK && bDirty ){ + lsmFinishWork(pDb, 0, &rc); }else{ int rcdummy = LSM_BUSY; - assert( rc!=LSM_OK || bFlush==0 ); - lsmFinishWork(pDb, 0, 0, &rcdummy); + lsmFinishWork(pDb, 0, &rcdummy); } assert( pDb->pWorker==0 ); if( rc==LSM_OK ){ if( pnWrite ) *pnWrite = (nMax - nRem); @@ -4563,21 +4649,20 @@ ** This function is only called during system shutdown. 
The contents of ** any in-memory trees present (old or current) are written out to disk. */ int lsmFlushTreeToDisk(lsm_db *pDb){ int rc; - int nOvfl = 0; rc = lsmBeginWork(pDb); while( rc==LSM_OK && sortedDbIsFull(pDb) ){ rc = sortedWork(pDb, 256, 0, 1, 0); } if( rc==LSM_OK ){ - rc = sortedNewToplevel(pDb, TREE_BOTH, &nOvfl, 0); + rc = sortedNewToplevel(pDb, TREE_BOTH, 0); } - lsmFinishWork(pDb, 1, nOvfl, &rc); + lsmFinishWork(pDb, 1, &rc); return rc; } /* ** Return a string representation of the segment passed as the only argument. @@ -4846,10 +4931,19 @@ if( nVal>0 && bValues ){ lsmStringAppendf(&str, "%*s", nKeyWidth - (nKey*(1+bHex)), ""); lsmStringAppendf(&str, " "); infoAppendBlob(&str, bHex, aVal, nVal); } + if( rtTopic(eType) ){ + int iBlk = (int)~lsmGetU32(aKey); + lsmStringAppendf(&str, " (block=%d", iBlk); + if( nVal>0 ){ + i64 iSnap = lsmGetU64(aVal); + lsmStringAppendf(&str, " snapshot=%lld", iSnap); + } + lsmStringAppendf(&str, ")"); + } lsmStringAppendf(&str, "\n"); } if( bData ){ lsmStringAppendf(&str, "\n-------------------"