Index: lsm-test/README ================================================================== --- lsm-test/README +++ lsm-test/README @@ -26,9 +26,14 @@ The difference from lsmtest2.c is that this file tests live-recovery (recovery from a failure that occurs while other clients are still running) whereas lsmtest2.c tests recovery from a system or power failure. + + lsmtest9.c: More data tests. These focus on testing that calling + lsm_work(nMerge=1) to compact the database does not corrupt it. + In other words, that databases containing block-redirects + can be read and written. Index: lsm-test/lsmtest.h ================================================================== --- lsm-test/lsmtest.h +++ lsm-test/lsmtest.h @@ -217,10 +217,13 @@ void test_data_3(const char *, const char *, int *pRc); void testDbContents(TestDb *, Datasource *, int, int, int, int, int, int *); void testCaseProgress(int, int, int, int *); int testCaseNDot(void); +void testCompareDb(Datasource *, int, int, TestDb *, TestDb *, int *); +int testControlDb(TestDb **ppDb); + typedef struct CksumDb CksumDb; CksumDb *testCksumArrayNew(Datasource *, int, int, int); char *testCksumArrayGet(CksumDb *, int); void testCksumArrayFree(CksumDb *); void testCaseStart(int *pRc, char *zFmt, ...); @@ -232,10 +235,13 @@ int testCksumDatabase(TestDb *pDb, char *zOut); int testCountDatabase(TestDb *pDb); void testCompareInt(int, int, int *); void testCompareStr(const char *z1, const char *z2, int *pRc); +/* lsmtest9.c */ +void test_data_4(const char *, const char *, int *pRc); + /* ** Similar to the Tcl_GetIndexFromObjStruct() Tcl library function. 
*/ #define testArgSelect(w,x,y,z) testArgSelectX(w,x,sizeof(w[0]),y,z) Index: lsm-test/lsmtest1.c ================================================================== --- lsm-test/lsmtest1.c +++ lsm-test/lsmtest1.c @@ -90,11 +90,11 @@ ); testFree(zData); return zRet; } -static int testControlDb(TestDb **ppDb){ +int testControlDb(TestDb **ppDb){ #ifdef HAVE_KYOTOCABINET return tdb_open("kyotocabinet", "tmp.db", 1, ppDb); #else return tdb_open("sqlite3", ":memory:", 1, ppDb); #endif @@ -346,11 +346,11 @@ } testFree(zName); } } -static void testCompareDb( +void testCompareDb( Datasource *pData, int nData, int iSeed, TestDb *pControl, TestDb *pDb, ADDED lsm-test/lsmtest9.c Index: lsm-test/lsmtest9.c ================================================================== --- /dev/null +++ lsm-test/lsmtest9.c @@ -0,0 +1,138 @@ + +#include "lsmtest.h" + +#define DATA_SEQUENTIAL TEST_DATASOURCE_SEQUENCE +#define DATA_RANDOM TEST_DATASOURCE_RANDOM + +typedef struct Datatest4 Datatest4; + +/* +** Test overview: +** +** 1. Insert (Datatest4.nRec) records into a database. +** +** 2. Repeat (Datatest4.nRepeat) times: +** +** 2a. Delete 2/3 of the records in the database. +** +** 2b. Run lsm_work(nMerge=1). +** +** 2c. Insert as many records as were deleted in 2a. +** +** 2d. Check database content is as expected. +** +** 2e. If (Datatest4.bReopen) is true, close and reopen the database. +*/ +struct Datatest4 { + /* Datasource definition */ + DatasourceDefn defn; + + int nRec; + int nRepeat; + int bReopen; +}; + +static void doDataTest4( + const char *zSystem, /* Database system to test */ + Datatest4 *p, /* Structure containing test parameters */ + int *pRc /* OUT: Error code */ +){ + lsm_db *db = 0; + TestDb *pDb; + TestDb *pControl; + Datasource *pData; + int i; + int rc = 0; + int iDot = 0; + + int nRecOn3 = (p->nRec / 3); + int iData = 0; + + /* Start the test case, open a database and allocate the datasource. 
*/ + rc = testControlDb(&pControl); + pDb = testOpen(zSystem, 1, &rc); + pData = testDatasourceNew(&p->defn); + if( rc==0 ) db = tdb_lsm(pDb); + + testWriteDatasourceRange(pControl, pData, iData, nRecOn3*3, &rc); + testWriteDatasourceRange(pDb, pData, iData, nRecOn3*3, &rc); + + for(i=0; rc==0 && inRepeat; i++){ + + testDeleteDatasourceRange(pControl, pData, iData, nRecOn3*2, &rc); + testDeleteDatasourceRange(pDb, pData, iData, nRecOn3*2, &rc); + + if( db ){ + int nDone; +#if 0 + fprintf(stderr, "lsm_work() start...\n"); fflush(stderr); +#endif + do { + nDone = 0; + rc = lsm_work(db, 1, (1<<30), &nDone); + }while( rc==0 && nDone>0 ); +#if 0 + fprintf(stderr, "lsm_work() done...\n"); fflush(stderr); +#endif + } + +if( i+1nRepeat ){ + iData += (nRecOn3*2); + testWriteDatasourceRange(pControl, pData, iData+nRecOn3, nRecOn3*2, &rc); + testWriteDatasourceRange(pDb, pData, iData+nRecOn3, nRecOn3*2, &rc); + + testCompareDb(pData, nRecOn3*3, iData, pControl, pDb, &rc); + + /* If Datatest4.bReopen is true, close and reopen the database */ + if( p->bReopen ){ + testReopen(&pDb, &rc); + if( rc==0 ) db = tdb_lsm(pDb); + } +} + + /* Update the progress dots... 
*/ + testCaseProgress(i, p->nRepeat, testCaseNDot(), &iDot); + } + + testClose(&pDb); + testClose(&pControl); + testDatasourceFree(pData); + testCaseFinish(rc); + *pRc = rc; +} + +static char *getName4(const char *zSystem, Datatest4 *pTest){ + char *zRet; + char *zData; + zData = testDatasourceName(&pTest->defn); + zRet = testMallocPrintf("data4.%s.%s.%d.%d.%d", + zSystem, zData, pTest->nRec, pTest->nRepeat, pTest->bReopen + ); + testFree(zData); + return zRet; +} + +void test_data_4( + const char *zSystem, /* Database system name */ + const char *zPattern, /* Run test cases that match this pattern */ + int *pRc /* IN/OUT: Error code */ +){ + Datatest4 aTest[] = { + /* defn, nRec, nRepeat, bReopen */ + { {DATA_RANDOM, 20,25, 500,600}, 10000, 10, 0 }, + { {DATA_RANDOM, 20,25, 500,600}, 10000, 10, 1 }, + }; + + int i; + + for(i=0; *pRc==LSM_OK && i0, this buffer contains a copy of the largest key that has @@ -512,10 +526,11 @@ struct Snapshot { Database *pDatabase; /* Database this snapshot belongs to */ Level *pLevel; /* Pointer to level 0 of snapshot (or NULL) */ i64 iId; /* Snapshot id */ i64 iLogOff; /* Log file offset */ + Redirect redirect; /* Block redirection array */ /* Used by worker snapshots only */ int nBlock; /* Number of blocks in database file */ Pgno aiAppend[LSM_APPLIST_SZ]; /* Append point list */ Freelist freelist; /* Free block list */ @@ -652,11 +667,11 @@ void lsmSortedSplitkey(lsm_db *, Level *, int *); /* Reading sorted run content. 
*/ int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg); -int lsmFsDbPageGet(FileSystem *, Pgno, Page **); +int lsmFsDbPageGet(FileSystem *, Segment *, Pgno, Page **); int lsmFsDbPageNext(Segment *, Page *, int eDir, Page **); u8 *lsmFsPageData(Page *, int *); int lsmFsPageRelease(Page *); int lsmFsPagePersist(Page *); @@ -706,10 +721,13 @@ void lsmEnvShmUnmap(lsm_env *, lsm_file *, int); void lsmEnvSleep(lsm_env *, int); int lsmFsReadSyncedId(lsm_db *db, int, i64 *piVal); + +int lsmFsSegmentContainsPg(FileSystem *pFS, Segment *, Pgno, int *); +Pgno lsmFsRedirectPage(FileSystem *, Redirect *, Pgno); /* ** End of functions from "lsm_file.c". **************************************************************************/ @@ -825,11 +843,11 @@ Level *lsmDbSnapshotLevel(Snapshot *); void lsmDbSnapshotSetLevel(Snapshot *, Level *); void lsmDbRecoveryComplete(lsm_db *, int); -int lsmBlockAllocate(lsm_db *, int *); +int lsmBlockAllocate(lsm_db *, int, int *); int lsmBlockFree(lsm_db *, int); int lsmBlockRefree(lsm_db *, int); void lsmFreelistDeltaBegin(lsm_db *); void lsmFreelistDeltaEnd(lsm_db *); Index: src/lsm_ckpt.c ================================================================== --- src/lsm_ckpt.c +++ src/lsm_ckpt.c @@ -65,10 +65,17 @@ ** is 64-bits - 2 integers) ** 5b. Cell number of next cell to read during merge ** 7. Page containing current split-key (64-bits - 2 integers). ** 8. Cell within page containing current split-key. ** 9. Current pointer value (64-bits - 2 integers). +** +** The block redirect array: +** +** 1. Number of redirections (maximum LSM_MAX_BLOCK_REDIRECTS). +** 2. For each redirection: +** a. "from" block number +** b. "to" block number ** ** The in-memory freelist entries. Each entry is either an insert or a ** delete. The in-memory freelist is to the free-block-list as the ** in-memory tree is to the users database content. 
** @@ -417,10 +424,17 @@ iLevel = 0; for(pLevel=lsmDbSnapshotLevel(pSnap); iLevelpNext){ ckptExportLevel(pLevel, &ckpt, &iOut, &rc); iLevel++; } + + /* Write the block-redirect list */ + ckptSetValue(&ckpt, iOut++, pSnap->redirect.n, &rc); + for(i=0; iredirect.n; i++){ + ckptSetValue(&ckpt, iOut++, pSnap->redirect.a[i].iFrom, &rc); + ckptSetValue(&ckpt, iOut++, pSnap->redirect.a[i].iTo, &rc); + } /* Write the freelist */ assert( pSnap->freelist.nEntry<=pDb->nMaxFreelist ); if( rc==LSM_OK ){ int nFree = pSnap->freelist.nEntry; @@ -906,11 +920,13 @@ } rc = lsmCheckpointDeserialize(pDb, 1, pShm->aSnap1, &pDb->pWorker); if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase; +#if 0 assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) ); +#endif return rc; } int lsmCheckpointDeserialize( lsm_db *pDb, @@ -921,10 +937,11 @@ int rc = LSM_OK; Snapshot *pNew; pNew = (Snapshot *)lsmMallocZeroRc(pDb->pEnv, sizeof(Snapshot), &rc); if( rc==LSM_OK ){ + Level *pLvl; int nFree; int i; int nLevel = (int)aCkpt[CKPT_HDR_NLEVEL]; int iIn = CKPT_HDR_SIZE + CKPT_APPENDLIST_SIZE + CKPT_LOGPTR_SIZE; @@ -937,13 +954,33 @@ /* Make a copy of the append-list */ for(i=0; iaiAppend[i] = ckptRead64(a); } + + /* Read the block-redirect list */ + pNew->redirect.n = aCkpt[iIn++]; + if( pNew->redirect.n ){ + pNew->redirect.a = lsmMallocZeroRc(pDb->pEnv, + (sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS), &rc + ); + if( rc==LSM_OK ){ + for(i=0; iredirect.n; i++){ + pNew->redirect.a[i].iFrom = aCkpt[iIn++]; + pNew->redirect.a[i].iTo = aCkpt[iIn++]; + } + } + for(pLvl=pNew->pLevel; pLvl->pNext; pLvl=pLvl->pNext); + if( pLvl->nRight ){ + pLvl->aRhs[pLvl->nRight-1].pRedirect = &pNew->redirect; + }else{ + pLvl->lhs.pRedirect = &pNew->redirect; + } + } /* Copy the free-list */ - if( bInclFreelist ){ + if( rc==LSM_OK && bInclFreelist ){ nFree = aCkpt[iIn++]; if( nFree ){ pNew->freelist.aEntry = (FreelistEntry *)lsmMallocZeroRc( pDb->pEnv, sizeof(FreelistEntry)*nFree, &rc ); @@ -1011,11 +1048,12 @@ 
ShmHeader *pShm = pDb->pShmhdr; void *p = 0; int n = 0; int rc; - rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId+1, 1, &p, &n); + pSnap->iId++; + rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId, 1, &p, &n); if( rc!=LSM_OK ) return rc; assert( ckptChecksumOk((u32 *)p) ); assert( n<=LSM_META_PAGE_SIZE ); memcpy(pShm->aSnap2, p, n); Index: src/lsm_file.c ================================================================== --- src/lsm_file.c +++ src/lsm_file.c @@ -877,44 +877,94 @@ if( rc!=LSM_OK ) return rc; } return lsmEnvSync(pFS->pEnv, pFS->fdDb); } -static int fsPageGet(FileSystem *, Pgno, int, Page **, int *); +static int fsPageGet(FileSystem *, Segment *, Pgno, int, Page **, int *); + +static int fsRedirectBlock(Redirect *p, int iBlk){ + if( p ){ + int i; + for(i=0; in; i++){ + if( iBlk==p->a[i].iFrom ) return p->a[i].iTo; + } + } + assert( iBlk!=0 ); + return iBlk; +} + +Pgno lsmFsRedirectPage(FileSystem *pFS, Redirect *pRedir, Pgno iPg){ + Pgno iReal = iPg; + + if( pRedir ){ + const int nPagePerBlock = ( + pFS->pCompress ? pFS->nBlocksize : (pFS->nBlocksize / pFS->nPagesize) + ); + int iBlk = fsPageToBlock(pFS, iPg); + int i; + for(i=0; in; i++){ + int iFrom = pRedir->a[i].iFrom; + if( iFrom>iBlk ) break; + if( iFrom==iBlk ){ + int iTo = pRedir->a[i].iTo; + iReal = iPg - (Pgno)(iFrom - iTo) * nPagePerBlock; + if( iTo==1 ){ + iReal += (fsFirstPageOnBlock(pFS, 1)-1); + } + break; + } + } + } + + assert( iReal!=0 ); + return iReal; +} /* ** Parameter iBlock is a database file block. This function reads the value ** stored in the blocks "next block" pointer and stores it in *piNext. ** LSM_OK is returned if everything is successful, or an LSM error code ** otherwise. 
*/ static int fsBlockNext( FileSystem *pFS, /* File-system object handle */ + Segment *pSeg, /* Use this segment for block redirects */ int iBlock, /* Read field from this block */ int *piNext /* OUT: Next block in linked list */ ){ int rc; + int iRead; /* Read block from here */ + + if( pSeg ){ + iRead = fsRedirectBlock(pSeg->pRedirect, iBlock); + }else{ + iRead = iBlock; + } assert( pFS->bUseMmap==0 || pFS->pCompress==0 ); if( pFS->pCompress ){ i64 iOff; /* File offset to read data from */ u8 aNext[4]; /* 4-byte pointer read from db file */ - iOff = (i64)iBlock * pFS->nBlocksize - sizeof(aNext); + iOff = (i64)iRead * pFS->nBlocksize - sizeof(aNext); rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aNext, sizeof(aNext)); if( rc==LSM_OK ){ *piNext = (int)lsmGetU32(aNext); } }else{ const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); Page *pLast; - rc = fsPageGet(pFS, iBlock*nPagePerBlock, 0, &pLast, 0); + rc = fsPageGet(pFS, 0, iRead*nPagePerBlock, 0, &pLast, 0); if( rc==LSM_OK ){ *piNext = lsmGetU32(&pLast->aData[pFS->nPagesize-4]); lsmFsPageRelease(pLast); } } + + if( pSeg ){ + *piNext = fsRedirectBlock(pSeg->pRedirect, *piNext); + } return rc; } /* ** Return the page number of the last page on the same block as page iPg. @@ -926,10 +976,11 @@ /* ** This function is only called in compressed database mode. 
*/ static int fsReadData( FileSystem *pFS, /* File-system handle */ + Segment *pSeg, /* Block redirection */ i64 iOff, /* Read data from this offset */ u8 *aData, /* Buffer to read data into */ int nData /* Number of bytes to read */ ){ i64 iEob; /* End of block */ @@ -943,11 +994,11 @@ rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nRead); if( rc==LSM_OK && nRead!=nData ){ int iBlk; - rc = fsBlockNext(pFS, fsPageToBlock(pFS, iOff), &iBlk); + rc = fsBlockNext(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk); if( rc==LSM_OK ){ i64 iOff2 = fsFirstPageOnBlock(pFS, iBlk); rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff2, &aData[nRead], nData-nRead); } } @@ -961,10 +1012,11 @@ ** LSM_OK is returned if everything is successful, or an LSM error code ** otherwise. */ static int fsBlockPrev( FileSystem *pFS, /* File-system object handle */ + Segment *pSeg, /* Use this segment for block redirects */ int iBlock, /* Read field from this block */ int *piPrev /* OUT: Previous block in linked list */ ){ int rc = LSM_OK; /* Return code */ @@ -974,11 +1026,12 @@ if( pFS->pCompress ){ i64 iOff = fsFirstPageOnBlock(pFS, iBlock) - 4; u8 aPrev[4]; /* 4-byte pointer read from db file */ rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aPrev, sizeof(aPrev)); if( rc==LSM_OK ){ - *piPrev = (int)lsmGetU32(aPrev); + Redirect *pRedir = (pSeg ? 
pSeg->pRedirect : 0); + *piPrev = fsRedirectBlock(pRedir, (int)lsmGetU32(aPrev)); } }else{ assert( 0 ); } return rc; @@ -999,11 +1052,17 @@ nByte += (aBuf[2] & 0x7F); *pbFree = !(aBuf[1] & 0x80); return nByte; } -static int fsSubtractOffset(FileSystem *pFS, i64 iOff, int iSub, i64 *piRes){ +static int fsSubtractOffset( + FileSystem *pFS, + Segment *pSeg, + i64 iOff, + int iSub, + i64 *piRes +){ i64 iStart; int iBlk = 0; int rc; assert( pFS->pCompress ); @@ -1012,16 +1071,22 @@ if( (iOff-iSub)>=iStart ){ *piRes = (iOff-iSub); return LSM_OK; } - rc = fsBlockPrev(pFS, fsPageToBlock(pFS, iOff), &iBlk); + rc = fsBlockPrev(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk); *piRes = fsLastPageOnBlock(pFS, iBlk) - iSub + (iOff - iStart + 1); return rc; } -static int fsAddOffset(FileSystem *pFS, i64 iOff, int iAdd, i64 *piRes){ +static int fsAddOffset( + FileSystem *pFS, + Segment *pSeg, + i64 iOff, + int iAdd, + i64 *piRes +){ i64 iEob; int iBlk; int rc; assert( pFS->pCompress ); @@ -1030,11 +1095,11 @@ if( (iOff+iAdd)<=iEob ){ *piRes = (iOff+iAdd); return LSM_OK; } - rc = fsBlockNext(pFS, fsPageToBlock(pFS, iOff), &iBlk); + rc = fsBlockNext(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk); *piRes = fsFirstPageOnBlock(pFS, iBlk) + iAdd - (iEob - iOff + 1); return rc; } static int fsAllocateBuffer(FileSystem *pFS, int bWrite){ @@ -1068,10 +1133,11 @@ ** ** LSM_OK is returned if successful, or an LSM error code otherwise. 
*/ static int fsReadPagedata( FileSystem *pFS, /* File-system handle */ + Segment *pSeg, /* pPg is part of this segment */ Page *pPg, /* Page to read and uncompress data for */ int *pnSpace /* OUT: Total bytes of free space */ ){ lsm_compress *p = pFS->pCompress; i64 iOff = pPg->iPg; @@ -1080,11 +1146,11 @@ assert( p && pPg->nCompress==0 ); if( fsAllocateBuffer(pFS, 0) ) return LSM_NOMEM; - rc = fsReadData(pFS, iOff, aSz, sizeof(aSz)); + rc = fsReadData(pFS, pSeg, iOff, aSz, sizeof(aSz)); if( rc==LSM_OK ){ int bFree; if( aSz[0] & 0x80 ){ pPg->nCompress = (int)getRecordSize(aSz, &bFree); @@ -1097,16 +1163,16 @@ *pnSpace = pPg->nCompress + sizeof(aSz)*2; }else{ rc = LSM_CORRUPT_BKPT; } }else{ - rc = fsAddOffset(pFS, iOff, 3, &iOff); + rc = fsAddOffset(pFS, pSeg, iOff, 3, &iOff); if( rc==LSM_OK ){ if( pPg->nCompress>pFS->nBuffer ){ rc = LSM_CORRUPT_BKPT; }else{ - rc = fsReadData(pFS, iOff, pFS->aIBuffer, pPg->nCompress); + rc = fsReadData(pFS, pSeg, iOff, pFS->aIBuffer, pPg->nCompress); } if( rc==LSM_OK ){ int n = pFS->nPagesize; rc = p->xUncompress(p->pCtx, (char *)pPg->aData, &n, @@ -1125,32 +1191,40 @@ /* ** Return a handle for a database page. */ static int fsPageGet( FileSystem *pFS, /* File-system handle */ + Segment *pSeg, /* Block redirection to use (or NULL) */ Pgno iPg, /* Page id */ int noContent, /* True to not load content from disk */ Page **ppPg, /* OUT: New page handle */ int *pnSpace /* OUT: Bytes of free space */ ){ Page *p; int iHash; int rc = LSM_OK; + /* In most cases iReal is the same as iPg. Except, if pSeg->pRedirect is + ** not NULL, and the block containing iPg has been redirected, then iReal + ** is the page number after redirection. */ + Pgno iReal = lsmFsRedirectPage(pFS, (pSeg ? 
pSeg->pRedirect : 0), iPg); + assert( iPg>=fsFirstPageOnBlock(pFS, 1) ); + assert( iReal>=fsFirstPageOnBlock(pFS, 1) ); *ppPg = 0; assert( pFS->bUseMmap==0 || pFS->pCompress==0 ); if( pFS->bUseMmap ){ Page *pTest; - i64 iEnd = (i64)iPg * pFS->nPagesize; + i64 iEnd = (i64)iReal * pFS->nPagesize; fsGrowMapping(pFS, iEnd, &rc); if( rc!=LSM_OK ) return rc; p = 0; for(pTest=pFS->pWaiting; pTest; pTest=pTest->pNextWaiting){ - if( pTest->iPg==iPg ){ + if( pTest->iPg==iReal ){ + assert( iReal==iPg ); p = pTest; p->nRef++; *ppPg = p; return LSM_OK; } @@ -1163,25 +1237,25 @@ p = lsmMallocZeroRc(pFS->pEnv, sizeof(Page), &rc); if( rc ) return rc; fsPageAddToLru(pFS, p); p->pFS = pFS; } - p->aData = &((u8 *)pFS->pMap)[pFS->nPagesize * (i64)(iPg-1)]; - p->iPg = iPg; + p->aData = &((u8 *)pFS->pMap)[pFS->nPagesize * (iReal-1)]; + p->iPg = iReal; }else{ /* Search the hash-table for the page */ - iHash = fsHashKey(pFS->nHash, iPg); + iHash = fsHashKey(pFS->nHash, iReal); for(p=pFS->apHash[iHash]; p; p=p->pHashNext){ - if( p->iPg==iPg) break; + if( p->iPg==iReal) break; } if( p==0 ){ rc = fsPageBuffer(pFS, 1, &p); if( rc==LSM_OK ){ int nSpace = 0; - p->iPg = iPg; + p->iPg = iReal; p->nRef = 0; p->pFS = pFS; assert( p->flags==0 || p->flags==PAGE_FREE ); #ifdef LSM_DEBUG @@ -1188,14 +1262,14 @@ memset(p->aData, 0x56, pFS->nPagesize); #endif assert( p->pLruNext==0 && p->pLruPrev==0 ); if( noContent==0 ){ if( pFS->pCompress ){ - rc = fsReadPagedata(pFS, p, &nSpace); + rc = fsReadPagedata(pFS, pSeg, p, &nSpace); }else{ int nByte = pFS->nPagesize; - i64 iOff = (i64)(iPg-1) * pFS->nPagesize; + i64 iOff = (i64)(iReal-1) * pFS->nPagesize; rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, p->aData, nByte); } pFS->nRead++; } @@ -1219,13 +1293,13 @@ || (rc!=LSM_OK && p==0) ); } if( rc==LSM_OK && p ){ - if( pFS->pCompress==0 && (fsIsLast(pFS, iPg) || fsIsFirst(pFS, iPg)) ){ + if( pFS->pCompress==0 && (fsIsLast(pFS, iReal) || fsIsFirst(pFS, iReal)) ){ p->nData = pFS->nPagesize - 4; - if( fsIsFirst(pFS, 
iPg) && p->nRef==0 ){ + if( fsIsFirst(pFS, iReal) && p->nRef==0 ){ p->aData += 4; p->flags |= PAGE_HASPREV; } }else{ p->nData = pFS->nPagesize; @@ -1361,17 +1435,22 @@ /* Mark all blocks currently used by this sorted run as free */ while( iBlk && rc==LSM_OK ){ int iNext = 0; if( iBlk!=iLastBlk ){ - rc = fsBlockNext(pFS, iBlk, &iNext); + rc = fsBlockNext(pFS, pDel, iBlk, &iNext); }else if( bZero==0 && pDel->iLastPg!=fsLastPageOnBlock(pFS, iLastBlk) ){ break; } rc = fsFreeBlock(pFS, pSnapshot, pDel, iBlk); iBlk = iNext; } + + if( pDel->pRedirect ){ + assert( pDel->pRedirect==&pSnapshot->redirect ); + pSnapshot->redirect.n = 0; + } if( bZero ) memset(pDel, 0, sizeof(Segment)); } return LSM_OK; } @@ -1386,10 +1465,32 @@ } } return iRet; } +#ifndef NDEBUG +/* +** Return true if page iPg, which is a part of segment p, lies on +** a redirected block. +*/ +static int fsPageRedirects(FileSystem *pFS, Segment *p, Pgno iPg){ + return (iPg!=0 && iPg!=lsmFsRedirectPage(pFS, p->pRedirect, iPg)); +} + +/* +** Return true if the second argument is not NULL and any of the first +** last or root pages lie on a redirected block. +*/ +static int fsSegmentRedirects(FileSystem *pFS, Segment *p){ + return (p && ( + fsPageRedirects(pFS, p, p->iFirst) + || fsPageRedirects(pFS, p, p->iRoot) + || fsPageRedirects(pFS, p, p->iLastPg) + )); +} +#endif + /* ** Argument aPgno is an array of nPgno page numbers. All pages belong to ** the segment pRun. This function gobbles from the start of the run to the ** first page that appears in aPgno[] (i.e. so that the aPgno[] entry is ** the new first page of the run). 
@@ -1404,10 +1505,13 @@ FileSystem *pFS = pDb->pFS; Snapshot *pSnapshot = pDb->pWorker; int iBlk; assert( pRun->nSize>0 ); + assert( 0==fsSegmentRedirects(pFS, pRun) ); + assert( nPgno>0 && 0==fsPageRedirects(pFS, pRun, aPgno[0]) ); + iBlk = fsPageToBlock(pFS, pRun->iFirst); pRun->nSize += (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk)); while( rc==LSM_OK ){ int iNext = 0; @@ -1414,11 +1518,11 @@ Pgno iFirst = firstOnBlock(pFS, iBlk, aPgno, nPgno); if( iFirst ){ pRun->iFirst = iFirst; break; } - rc = fsBlockNext(pFS, iBlk, &iNext); + rc = fsBlockNext(pFS, pRun, iBlk, &iNext); if( rc==LSM_OK ) rc = fsFreeBlock(pFS, pSnapshot, pRun, iBlk); pRun->nSize -= ( 1 + fsLastPageOnBlock(pFS, iBlk) - fsFirstPageOnBlock(pFS, iBlk) ); iBlk = iNext; @@ -1426,10 +1530,24 @@ pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk)); assert( pRun->nSize>0 ); } +/* +** This function is only used in compressed database mode. +** +** Argument iPg is the page number (byte offset) of a page within segment +** pSeg. The page record, including all headers, is nByte bytes in size. +** Before returning, set *piNext to the page number of the next page in +** the segment, or to zero if iPg is the last. +** +** In other words, do: +** +** *piNext = iPg + nByte; +** +** But take block overflow and redirection into account. 
+*/ static int fsNextPageOffset( FileSystem *pFS, /* File system object */ Segment *pSeg, /* Segment to move within */ Pgno iPg, /* Offset of current page */ int nByte, /* Size of current page including headers */ @@ -1438,28 +1556,33 @@ Pgno iNext; int rc; assert( pFS->pCompress ); - rc = fsAddOffset(pFS, iPg, nByte-1, &iNext); + rc = fsAddOffset(pFS, pSeg, iPg, nByte-1, &iNext); if( pSeg && iNext==pSeg->iLastPg ){ iNext = 0; }else if( rc==LSM_OK ){ - rc = fsAddOffset(pFS, iNext, 1, &iNext); + rc = fsAddOffset(pFS, pSeg, iNext, 1, &iNext); } *piNext = iNext; return rc; } -static int fsGetPageBefore(FileSystem *pFS, i64 iOff, Pgno *piPrev){ +static int fsGetPageBefore( + FileSystem *pFS, + Segment *pSeg, + i64 iOff, + Pgno *piPrev +){ u8 aSz[3]; int rc; i64 iRead; - rc = fsSubtractOffset(pFS, iOff, sizeof(aSz), &iRead); - if( rc==LSM_OK ) rc = fsReadData(pFS, iRead, aSz, sizeof(aSz)); + rc = fsSubtractOffset(pFS, pSeg, iOff, sizeof(aSz), &iRead); + if( rc==LSM_OK ) rc = fsReadData(pFS, pSeg, iRead, aSz, sizeof(aSz)); if( rc==LSM_OK ){ int bFree; int nSz; if( aSz[2] & 0x80 ){ @@ -1466,11 +1589,11 @@ nSz = getRecordSize(aSz, &bFree) + sizeof(aSz)*2; }else{ nSz = (int)(aSz[2] & 0x7F); bFree = 1; } - rc = fsSubtractOffset(pFS, iOff, nSz, piPrev); + rc = fsSubtractOffset(pFS, pSeg, iOff, nSz, piPrev); } return rc; } @@ -1498,10 +1621,11 @@ int lsmFsDbPageNext(Segment *pRun, Page *pPg, int eDir, Page **ppNext){ int rc = LSM_OK; FileSystem *pFS = pPg->pFS; Pgno iPg = pPg->iPg; + assert( 0==fsSegmentRedirects(pFS, pRun) ); if( pFS->pCompress ){ int nSpace = pPg->nCompress + 2*3; do { if( eDir>0 ){ @@ -1508,24 +1632,25 @@ rc = fsNextPageOffset(pFS, pRun, iPg, nSpace, &iPg); }else{ if( iPg==pRun->iFirst ){ iPg = 0; }else{ - rc = fsGetPageBefore(pFS, iPg, &iPg); + rc = fsGetPageBefore(pFS, pRun, iPg, &iPg); } } nSpace = 0; if( iPg!=0 ){ - rc = fsPageGet(pFS, iPg, 0, ppNext, &nSpace); + rc = fsPageGet(pFS, pRun, iPg, 0, ppNext, &nSpace); assert( (*ppNext==0)==(rc!=LSM_OK || 
nSpace>0) ); }else{ *ppNext = 0; } }while( nSpace>0 && rc==LSM_OK ); }else{ + Redirect *pRedir = pRun ? pRun->pRedirect : 0; assert( eDir==1 || eDir==-1 ); if( eDir<0 ){ if( pRun && iPg==pRun->iFirst ){ *ppNext = 0; return LSM_OK; @@ -1534,20 +1659,27 @@ iPg = fsLastPageOnBlock(pFS, lsmGetU32(&pPg->aData[-4])); }else{ iPg--; } }else{ - if( pRun && iPg==pRun->iLastPg ){ - *ppNext = 0; - return LSM_OK; - }else if( fsIsLast(pFS, iPg) ){ - iPg = fsFirstPageOnBlock(pFS, lsmGetU32(&pPg->aData[pFS->nPagesize-4])); + if( pRun ){ + if( iPg==pRun->iLastPg ){ + *ppNext = 0; + return LSM_OK; + } + } + + if( fsIsLast(pFS, iPg) ){ + int iBlk = fsRedirectBlock( + pRedir, lsmGetU32(&pPg->aData[pFS->nPagesize-4]) + ); + iPg = fsFirstPageOnBlock(pFS, iBlk); }else{ iPg++; } } - rc = fsPageGet(pFS, iPg, 0, ppNext, 0); + rc = fsPageGet(pFS, pRun, iPg, 0, ppNext, 0); } return rc; } @@ -1590,10 +1722,12 @@ *ppOut = 0; int iApp = 0; int iNext = 0; Segment *p = &pLvl->lhs; int iPrev = p->iLastPg; + + assert( p->pRedirect==0 ); if( pFS->pCompress || bDefer ){ /* In compressed database mode the page is not assigned a page number ** or location in the database file at this point. This will be done ** by the lsmFsPagePersist() call. */ @@ -1613,11 +1747,11 @@ }else{ if( iPrev==0 ){ iApp = findAppendPoint(pFS, pLvl); }else if( fsIsLast(pFS, iPrev) ){ int iNext; - rc = fsBlockNext(pFS, fsPageToBlock(pFS, iPrev), &iNext); + rc = fsBlockNext(pFS, 0, fsPageToBlock(pFS, iPrev), &iNext); if( rc!=LSM_OK ) return rc; iApp = fsFirstPageOnBlock(pFS, iNext); }else{ iApp = iPrev + 1; } @@ -1625,11 +1759,11 @@ /* If this is the first page allocated, or if the page allocated is the ** last in the block, also allocate the next block here. 
*/ if( iApp==0 || fsIsLast(pFS, iApp) ){ int iNew; /* New block number */ - rc = lsmBlockAllocate(pFS->pDb, &iNew); + rc = lsmBlockAllocate(pFS->pDb, 0, &iNew); if( rc!=LSM_OK ) return rc; if( iApp==0 ){ iApp = fsFirstPageOnBlock(pFS, iNew); }else{ iNext = fsFirstPageOnBlock(pFS, iNew); @@ -1636,11 +1770,11 @@ } } /* Grab the new page. */ pPg = 0; - rc = fsPageGet(pFS, iApp, 1, &pPg, 0); + rc = fsPageGet(pFS, 0, iApp, 1, &pPg, 0); assert( rc==LSM_OK || pPg==0 ); /* If this is the first or last page of a block, fill in the pointer ** value at the end of the new page. */ if( rc==LSM_OK ){ @@ -1665,10 +1799,11 @@ ** Mark the sorted run passed as the second argument as finished. */ int lsmFsSortedFinish(FileSystem *pFS, Segment *p){ int rc = LSM_OK; if( p && p->iLastPg ){ + assert( p->pRedirect==0 ); /* Check if the last page of this run happens to be the last of a block. ** If it is, then an extra block has already been allocated for this run. ** Shift this extra block back to the free-block list. ** @@ -1684,19 +1819,19 @@ break; } } }else if( pFS->pCompress==0 ){ Page *pLast; - rc = fsPageGet(pFS, p->iLastPg, 0, &pLast, 0); + rc = fsPageGet(pFS, 0, p->iLastPg, 0, &pLast, 0); if( rc==LSM_OK ){ int iBlk = (int)lsmGetU32(&pLast->aData[pFS->nPagesize-4]); lsmBlockRefree(pFS->pDb, iBlk); lsmFsPageRelease(pLast); } }else{ int iBlk = 0; - rc = fsBlockNext(pFS, fsPageToBlock(pFS, p->iLastPg), &iBlk); + rc = fsBlockNext(pFS, p, fsPageToBlock(pFS, p->iLastPg), &iBlk); if( rc==LSM_OK ){ lsmBlockRefree(pFS->pDb, iBlk); } } } @@ -1704,13 +1839,13 @@ } /* ** Obtain a reference to page number iPg. */ -int lsmFsDbPageGet(FileSystem *pFS, Pgno iPg, Page **ppPg){ +int lsmFsDbPageGet(FileSystem *pFS, Segment *pSeg, Pgno iPg, Page **ppPg){ assert( pFS ); - return fsPageGet(pFS, iPg, 0, ppPg, 0); + return fsPageGet(pFS, pSeg, iPg, 0, ppPg, 0); } /* ** Obtain a reference to the last page in the segment passed as the ** second argument. 
@@ -1721,18 +1856,18 @@ if( pFS->pCompress ){ int nSpace; iPg++; do { nSpace = 0; - rc = fsGetPageBefore(pFS, iPg, &iPg); + rc = fsGetPageBefore(pFS, pSeg, iPg, &iPg); if( rc==LSM_OK ){ - rc = fsPageGet(pFS, iPg, 0, ppPg, &nSpace); + rc = fsPageGet(pFS, pSeg, iPg, 0, ppPg, &nSpace); } }while( rc==LSM_OK && nSpace>0 ); }else{ - rc = fsPageGet(pFS, iPg, 0, ppPg, 0); + rc = fsPageGet(pFS, pSeg, iPg, 0, ppPg, 0); } return rc; } /* @@ -1828,10 +1963,85 @@ ** Return true if page is currently writable. */ int lsmFsPageWritable(Page *pPg){ return (pPg->flags & PAGE_DIRTY) ? 1 : 0; } + +static void fsMovePage( + FileSystem *pFS, + Segment *pSeg, + int iTo, + int iFrom, + Pgno *piPg +){ + Pgno iPg = *piPg; + if( iFrom==fsPageToBlock(pFS, iPg) ){ + const int nPagePerBlock = ( + pFS->pCompress ? pFS ->nBlocksize : (pFS->nBlocksize / pFS->nPagesize) + ); + *piPg = iPg - (Pgno)(iFrom - iTo) * nPagePerBlock; + } +} + +/* +** Copy the contents of block iFrom to block iTo. +** +** It is safe to assume that there are no outstanding references to pages +** on block iTo. And that block iFrom is not currently being written. In +** other words, the data can be read and written directly. 
+*/ +int lsmFsMoveBlock(FileSystem *pFS, Segment *pSeg, int iTo, int iFrom){ + Snapshot *p = pFS->pDb->pWorker; + int rc = LSM_OK; + + i64 iFromOff = (i64)(iFrom-1) * pFS->nBlocksize; + i64 iToOff = (i64)(iTo-1) * pFS->nBlocksize; + + assert( iTo!=1 ); + assert( iFrom>iTo ); + + if( pFS->bUseMmap ){ + fsGrowMapping(pFS, (i64)iFrom * pFS->nBlocksize, &rc); + if( rc==LSM_OK ){ + u8 *aMap = (u8 *)(pFS->pMap); + memcpy(&aMap[iToOff], &aMap[iFromOff], pFS->nBlocksize); + } + }else{ + int nSz = pFS->nPagesize; + u8 *aData = (u8 *)lsmMallocRc(pFS->pEnv, nSz, &rc); + if( rc==LSM_OK ){ + const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); + int i; + for(i=0; rc==LSM_OK && ipEnv, pFS->fdDb, iOff, aData, nSz); + if( rc==LSM_OK ){ + iOff = iToOff + i*nSz; + rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, aData, nSz); + } + } + } + } + + /* Update append-point list if necessary */ + if( rc==LSM_OK ){ + int i; + for(i=0; iaiAppend[i])==iFrom ){ + const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); + p->aiAppend[i] -= (i64)(iFrom-iTo) * nPagePerBlock; + } + } + } + + /* Update the Segment structure itself */ + fsMovePage(pFS, pSeg, iTo, iFrom, &pSeg->iFirst); + fsMovePage(pFS, pSeg, iTo, iFrom, &pSeg->iLastPg); + fsMovePage(pFS, pSeg, iTo, iFrom, &pSeg->iRoot); + + return rc; +} /* ** Append raw data to a segment. This function is only used in compressed ** database mode. */ @@ -1855,11 +2065,11 @@ ** or allocate a new block. */ if( iApp==1 ){ pSeg->iFirst = iApp = findAppendPoint(pFS, 0); if( iApp==0 ){ int iBlk; - rc = lsmBlockAllocate(pFS->pDb, &iBlk); + rc = lsmBlockAllocate(pFS->pDb, 0, &iBlk); pSeg->iFirst = iApp = fsFirstPageOnBlock(pFS, iBlk); } } iRet = iApp; @@ -1884,11 +2094,11 @@ u8 aPtr[4]; /* Space to serialize a u32 */ int iBlk; /* New block number */ if( nWrite>0 ){ /* Allocate a new block. 
*/ - rc = lsmBlockAllocate(pFS->pDb, &iBlk); + rc = lsmBlockAllocate(pFS->pDb, 0, &iBlk); /* Set the "next" pointer on the old block */ if( rc==LSM_OK ){ assert( iApp==(fsPageToBlock(pFS, iApp)*pFS->nBlocksize)-4 ); lsmPutU32(aPtr, iBlk); @@ -1904,11 +2114,12 @@ if( nRem>0 ) iApp = iWrite; } }else{ /* The next block is already allocated. */ assert( nRem>0 ); - rc = fsBlockNext(pFS, fsPageToBlock(pFS, iApp), &iBlk); + assert( pSeg->pRedirect==0 ); + rc = fsBlockNext(pFS, 0, fsPageToBlock(pFS, iApp), &iBlk); iRet = iApp = fsFirstPageOnBlock(pFS, iBlk); } /* Write the remaining data into the new block */ if( rc==LSM_OK && nRem>0 ){ @@ -1965,20 +2176,21 @@ ** allocated). In this case set *piPrev to tell the caller to set ** the "previous block" pointer in the first 4 bytes of the page. */ int iNext; int iBlk = fsPageToBlock(pFS, iPrev); - rc = fsBlockNext(pFS, iBlk, &iNext); + assert( pSeg->pRedirect==0 ); + rc = fsBlockNext(pFS, 0, iBlk, &iNext); if( rc!=LSM_OK ) return rc; *piNew = fsFirstPageOnBlock(pFS, iNext); *piPrev = iBlk; }else{ *piNew = iPrev+1; if( fsIsLast(pFS, *piNew) ){ /* Allocate the next block here. 
*/ int iBlk; - rc = lsmBlockAllocate(pFS->pDb, &iBlk); + rc = lsmBlockAllocate(pFS->pDb, 0, &iBlk); if( rc!=LSM_OK ) return rc; *piNext = iBlk; } } @@ -2329,18 +2541,18 @@ lsmStringInit(&str, pDb->pEnv); if( bBlock ){ lsmStringAppendf(&str, "%d", iBlk); while( iBlk!=iLastBlk ){ - fsBlockNext(pFS, iBlk, &iBlk); + fsBlockNext(pFS, pArray, iBlk, &iBlk); lsmStringAppendf(&str, " %d", iBlk); } }else{ lsmStringAppendf(&str, "%d", pArray->iFirst); while( iBlk!=iLastBlk ){ lsmStringAppendf(&str, " %d", fsLastPageOnBlock(pFS, iBlk)); - fsBlockNext(pFS, iBlk, &iBlk); + fsBlockNext(pFS, pArray, iBlk, &iBlk); lsmStringAppendf(&str, " %d", fsFirstPageOnBlock(pFS, iBlk)); } lsmStringAppendf(&str, " %d", pArray->iLastPg); } @@ -2351,10 +2563,34 @@ int rcwork = LSM_BUSY; lsmFinishWork(pDb, 0, &rcwork); } return rc; } + +int lsmFsSegmentContainsPg( + FileSystem *pFS, + Segment *pSeg, + Pgno iPg, + int *pbRes +){ + Redirect *pRedir = pSeg->pRedirect; + int rc = LSM_OK; + int iBlk; + int iLastBlk; + int iPgBlock; /* Block containing page iPg */ + + iPgBlock = fsPageToBlock(pFS, pSeg->iFirst); + iBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iFirst)); + iLastBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iLastPg)); + + while( iBlk!=iLastBlk && iBlk!=iPgBlock && rc==LSM_OK ){ + rc = fsBlockNext(pFS, pSeg, iBlk, &iBlk); + } + + *pbRes = (iBlk==iPgBlock); + return rc; +} /* ** This function implements the lsm_info(LSM_INFO_ARRAY_PAGES) request. ** If successful, *pzOut is set to point to a nul-terminated string ** containing the array structure and LSM_OK is returned. The caller should @@ -2363,11 +2599,11 @@ ** If an error occurs, *pzOut is set to NULL and an LSM error code returned. 
*/ int lsmInfoArrayPages(lsm_db *pDb, Pgno iFirst, char **pzOut){ int rc = LSM_OK; Snapshot *pWorker; /* Worker snapshot */ - Segment *pArray = 0; /* Array to report on */ + Segment *pSeg = 0; /* Array to report on */ int bUnlock = 0; *pzOut = 0; if( iFirst==0 ) return LSM_ERROR; @@ -2379,26 +2615,26 @@ pWorker = pDb->pWorker; bUnlock = 1; } /* Search for the array that starts on page iFirst */ - pArray = findSegment(pWorker, iFirst); + pSeg = findSegment(pWorker, iFirst); - if( pArray==0 ){ + if( pSeg==0 ){ /* Could not find the requested array. This is an error. */ rc = LSM_ERROR; }else{ Page *pPg = 0; FileSystem *pFS = pDb->pFS; LsmString str; lsmStringInit(&str, pDb->pEnv); - rc = lsmFsDbPageGet(pFS, iFirst, &pPg); + rc = lsmFsDbPageGet(pFS, pSeg, iFirst, &pPg); while( rc==LSM_OK && pPg ){ Page *pNext = 0; lsmStringAppendf(&str, " %lld", lsmFsPageNumber(pPg)); - rc = lsmFsDbPageNext(pArray, pPg, 1, &pNext); + rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext); lsmFsPageRelease(pPg); pPg = pNext; } if( rc!=LSM_OK ){ @@ -2449,31 +2685,33 @@ u8 *aUsed ){ if( pSeg ){ if( pSeg && pSeg->nSize>0 ){ int rc; - Pgno iLast = pSeg->iLastPg; - int iBlk; - int iLastBlk; - int bLastIsLastOnBlock; + int iBlk; /* Current block (during iteration) */ + int iLastBlk; /* Last block of segment */ + int iFirstBlk; /* First block of segment */ + int bLastIsLastOnBlock; /* True if pSeg->iLastPg is the last on its block */ - iBlk = fsPageToBlock(pFS, pSeg->iFirst); + assert( 0==fsSegmentRedirects(pFS, pSeg) ); + iBlk = iFirstBlk = fsPageToBlock(pFS, pSeg->iFirst); iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg); - bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==iLast); - assert( iBlk>0 ); - - /* If the first page of this run is also the first page of its first - ** block, set the flag to indicate that the first page of iBlk is - ** in use. 
*/ - if( fsFirstPageOnBlock(pFS, iBlk)==pSeg->iFirst ){ - assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_FIRST_PG)==0 ); - aUsed[iBlk-1] |= INTEGRITY_CHECK_FIRST_PG; - } + + bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==pSeg->iLastPg); + assert( iBlk>0 ); do { /* iBlk is a part of this sorted run. */ aUsed[iBlk-1] |= INTEGRITY_CHECK_USED; + + /* If the first page of this block is also part of the segment, + ** set the flag to indicate that the first page of iBlk is in use. + */ + if( fsFirstPageOnBlock(pFS, iBlk)==pSeg->iFirst || iBlk!=iFirstBlk ){ + assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_FIRST_PG)==0 ); + aUsed[iBlk-1] |= INTEGRITY_CHECK_FIRST_PG; + } /* Unless the sorted run finishes before the last page on this block, ** the last page of this block is also in use. */ if( iBlk!=iLastBlk || bLastIsLastOnBlock ){ assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_LAST_PG)==0 ); @@ -2484,11 +2722,11 @@ ** a level currently undergoing an incremental merge. The sorted ** run ends on the last page of iBlk, but the next block has already ** been allocated. So mark it as in use as well. */ if( iBlk==iLastBlk && bLastIsLastOnBlock && bExtra ){ int iExtra = 0; - rc = fsBlockNext(pFS, iBlk, &iExtra); + rc = fsBlockNext(pFS, pSeg, iBlk, &iExtra); assert( rc==LSM_OK ); assert( aUsed[iExtra-1]==0 ); aUsed[iExtra-1] |= INTEGRITY_CHECK_USED; aUsed[iExtra-1] |= INTEGRITY_CHECK_FIRST_PG; @@ -2499,11 +2737,11 @@ ** in order to break out of the loop if this was the last block in ** the run. */ if( iBlk==iLastBlk ){ iBlk = 0; }else{ - rc = fsBlockNext(pFS, iBlk, &iBlk); + rc = fsBlockNext(pFS, pSeg, iBlk, &iBlk); assert( rc==LSM_OK ); } }while( iBlk ); } } Index: src/lsm_main.c ================================================================== --- src/lsm_main.c +++ src/lsm_main.c @@ -39,10 +39,12 @@ ** handle must be holding a pointer to a client snapshot. And the reverse ** - if there are no open cursors and no write transactions then there must ** not be a client snapshot. 
*/ assert( (pDb->pCsr!=0 || pDb->nTransOpen>0)==(pDb->iReader>=0) ); + assert( pDb->iReader<0 || pDb->pClient!=0 ); + assert( pDb->nTransOpen>=0 ); } #else # define assert_db_state(x) #endif @@ -373,11 +375,11 @@ if( !pDb->pWorker ){ rc = lsmBeginWork(pDb); if( rc!=LSM_OK ) return rc; *pbUnlock = 1; } - *pp = pDb->pWorker; + if( pp ) *pp = pDb->pWorker; return rc; } static void infoFreeWorker(lsm_db *pDb, int bUnlock){ if( bUnlock ){ @@ -531,11 +533,17 @@ case LSM_INFO_PAGE_HEX_DUMP: case LSM_INFO_PAGE_ASCII_DUMP: { Pgno pgno = va_arg(ap, Pgno); char **pzVal = va_arg(ap, char **); - rc = lsmInfoPageDump(pDb, pgno, (eParam==LSM_INFO_PAGE_HEX_DUMP), pzVal); + int bUnlock = 0; + rc = infoGetWorker(pDb, 0, &bUnlock); + if( rc==LSM_OK ){ + int bHex = (eParam==LSM_INFO_PAGE_HEX_DUMP); + rc = lsmInfoPageDump(pDb, pgno, bHex, pzVal); + } + infoFreeWorker(pDb, bUnlock); break; } case LSM_INFO_LOG_STRUCTURE: { char **pzVal = va_arg(ap, char **); Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -163,11 +163,12 @@ p->nBlock--; return 0; } static int dbTruncate(lsm_db *pDb, i64 iInUse){ - int rc; + int rc = LSM_OK; +#if 0 int i; DbTruncateCtx ctx; assert( pDb->pWorker ); ctx.nBlock = pDb->pWorker->nBlock; @@ -186,10 +187,11 @@ ); } #endif pDb->pWorker->nBlock = ctx.nBlock; } +#endif return rc; } /* @@ -625,41 +627,32 @@ typedef struct FindFreeblockCtx FindFreeblockCtx; struct FindFreeblockCtx { i64 iInUse; int iRet; + int bNotOne; }; static int findFreeblockCb(void *pCtx, int iBlk, i64 iSnapshot){ FindFreeblockCtx *p = (FindFreeblockCtx *)pCtx; - if( iSnapshotiInUse ){ + if( iSnapshotiInUse && (iBlk!=1 || p->bNotOne==0) ){ p->iRet = iBlk; return 1; } return 0; } -static int findFreeblock(lsm_db *pDb, i64 iInUse, int *piRet){ +static int findFreeblock(lsm_db *pDb, i64 iInUse, int bNotOne, int *piRet){ int rc; /* Return code */ FindFreeblockCtx ctx; /* Context object */ -#ifdef LSM_LOG_BLOCKS - 
char *zList = 0; - lsmInfoFreelist(pDb, &zList); - lsmLogMessage(pDb, 0, - "findFreeblock(): iInUse=%lld freelist=%s", iInUse, zList); - lsmFree(pDb->pEnv, zList); -#endif - ctx.iInUse = iInUse; ctx.iRet = 0; + ctx.bNotOne = bNotOne; rc = lsmWalkFreelist(pDb, 0, findFreeblockCb, (void *)&ctx); *piRet = ctx.iRet; -#ifdef LSM_LOG_BLOCKS - lsmLogMessage(pDb, 0, "findFreeblock(): returning block %d", *piRet); -#endif return rc; } /* ** Allocate a new database file block to write data to, either by extending @@ -667,11 +660,11 @@ ** must be held in order to call this function. ** ** If successful, *piBlk is set to the block number allocated and LSM_OK is ** returned. Otherwise, *piBlk is zeroed and an lsm error code returned. */ -int lsmBlockAllocate(lsm_db *pDb, int *piBlk){ +int lsmBlockAllocate(lsm_db *pDb, int iBefore, int *piBlk){ Snapshot *p = pDb->pWorker; int iRet = 0; /* Block number of allocated block */ int rc = LSM_OK; i64 iInUse = 0; /* Snapshot id still in use */ i64 iSynced = 0; /* Snapshot id synced to disk */ @@ -678,14 +671,16 @@ assert( p ); #ifdef LSM_LOG_FREELIST { + static int nCall = 0; char *zFree = 0; + nCall++; rc = lsmInfoFreelist(pDb, &zFree); if( rc!=LSM_OK ) return rc; - lsmLogMessage(pDb, 0, "lsmBlockAllocate(): freelist: %s", zFree); + lsmLogMessage(pDb, 0, "lsmBlockAllocate(): %d freelist: %s", nCall, zFree); lsmFree(pDb->pEnv, zFree); } #endif /* Set iInUse to the smallest snapshot id that is either: @@ -696,29 +691,35 @@ ** be used following recovery if a failure occurs at this point). 
*/ rc = lsmCheckpointSynced(pDb, &iSynced, 0, 0); if( rc==LSM_OK && iSynced==0 ) iSynced = p->iId; iInUse = iSynced; - if( rc==LSM_OK && pDb->pClient ) iInUse = LSM_MIN(iInUse, pDb->pClient->iId); + if( rc==LSM_OK && pDb->iReader>=0 ){ + assert( pDb->pClient ); + iInUse = LSM_MIN(iInUse, pDb->pClient->iId); + } if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse); #ifdef LSM_LOG_FREELIST { lsmLogMessage(pDb, 0, "lsmBlockAllocate(): " "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)", - iInUse, iSynced, (pDb->pClient ? pDb->pClient->iId : 0) + iInUse, iSynced, (pDb->iReader>=0 ? pDb->pClient->iId : 0) ); } #endif /* Query the free block list for a suitable block */ - if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet); + if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet); - /* If a block was found in the free block list, use it and remove it from - ** the list. Otherwise, if no suitable block was found, allocate one from - ** the end of the file. */ - if( rc==LSM_OK ){ + if( iBefore>0 && (iRet<=0 || iRet>=iBefore) ){ + iRet = 0; + + }else if( rc==LSM_OK ){ + /* If a block was found in the free block list, use it and remove it from + ** the list. Otherwise, if no suitable block was found, allocate one from + ** the end of the file. 
*/ if( iRet>0 ){ #ifdef LSM_LOG_FREELIST lsmLogMessage(pDb, 0, "reusing block %d (snapshot-in-use=%lld)", iRet, iInUse); #endif @@ -732,11 +733,11 @@ lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet); #endif } } - assert( iRet>0 || rc!=LSM_OK ); + assert( iBefore>0 || iRet>0 || rc!=LSM_OK ); *piBlk = iRet; return rc; } /* @@ -869,10 +870,11 @@ void lsmFreeSnapshot(lsm_env *pEnv, Snapshot *p){ if( p ){ lsmSortedFreeLevel(pEnv, p->pLevel); lsmFree(pEnv, p->freelist.aEntry); + lsmFree(pEnv, p->redirect.a); lsmFree(pEnv, p); } } /* Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -70,10 +70,13 @@ #ifndef _LSM_INT_H # include "lsmInt.h" #endif #include "sqlite4.h" /* only for sqlite4_snprintf() */ +#define LSM_LOG_STRUCTURE 0 +#define LSM_LOG_DATA 0 + /* ** Macros to help decode record types. */ #define rtTopic(eType) ((eType) & LSM_SYSTEMKEY) #define rtIsDelete(eType) (((eType) & 0x0F)==LSM_POINT_DELETE) @@ -393,10 +396,11 @@ if( pBlob->pData ) lsmFree(pBlob->pEnv, pBlob->pData); memset(pBlob, 0, sizeof(Blob)); } static int sortedReadData( + Segment *pSeg, Page *pPg, int iOff, int nByte, void **ppData, Blob *pBlob @@ -446,11 +450,11 @@ i -= iEnd; /* Grab the next page in the segment */ do { - rc = lsmFsDbPageNext(0, pPg, 1, &pNext); + rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext); if( rc==LSM_OK && pNext==0 ){ rc = LSM_CORRUPT_BKPT; } if( rc ) break; lsmFsPageRelease(pPg); @@ -507,10 +511,11 @@ lsmVarintGet64(&aCell[1], &iRet); return iRet; } static u8 *pageGetKey( + Segment *pSeg, /* Segment pPg belongs to */ Page *pPg, /* Page to read from */ int iCell, /* Index of cell on page to read */ int *piTopic, /* OUT: Topic associated with this key */ int *pnKey, /* OUT: Size of key in bytes */ Blob *pBlob /* If required, use this for dynamic memory */ @@ -533,26 +538,27 @@ if( rtIsWrite(eType) ){ pKey += lsmVarintGet32(pKey, &nDummy); } *piTopic = rtTopic(eType); - 
sortedReadData(pPg, pKey-aData, *pnKey, (void **)&pKey, pBlob); + sortedReadData(pSeg, pPg, pKey-aData, *pnKey, (void **)&pKey, pBlob); return pKey; } static int pageGetKeyCopy( lsm_env *pEnv, /* Environment handle */ + Segment *pSeg, /* Segment pPg belongs to */ Page *pPg, /* Page to read from */ int iCell, /* Index of cell on page to read */ int *piTopic, /* OUT: Topic associated with this key */ Blob *pBlob /* If required, use this for dynamic memory */ ){ int rc = LSM_OK; int nKey; u8 *aKey; - aKey = pageGetKey(pPg, iCell, piTopic, &nKey, pBlob); + aKey = pageGetKey(pSeg, pPg, iCell, piTopic, &nKey, pBlob); assert( (void *)aKey!=pBlob->pData || nKey==pBlob->nData ); if( (void *)aKey!=pBlob->pData ){ rc = sortedBlobSet(pEnv, pBlob, aKey, nKey); } @@ -577,10 +583,11 @@ #define GETVARINT64(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet64((a), &(i))) #define GETVARINT32(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet32((a), &(i))) static int pageGetBtreeKey( + Segment *pSeg, /* Segment page pPg belongs to */ Page *pPg, int iKey, Pgno *piPtr, int *piTopic, void **ppKey, @@ -603,13 +610,13 @@ if( eType==0 ){ int rc; Pgno iRef; /* Page number of referenced page */ Page *pRef; aCell += GETVARINT64(aCell, iRef); - rc = lsmFsDbPageGet(lsmPageFS(pPg), iRef, &pRef); + rc = lsmFsDbPageGet(lsmPageFS(pPg), pSeg, iRef, &pRef); if( rc!=LSM_OK ) return rc; - pageGetKeyCopy(lsmPageEnv(pPg), pRef, 0, &eType, pBlob); + pageGetKeyCopy(lsmPageEnv(pPg), pSeg, pRef, 0, &eType, pBlob); lsmFsPageRelease(pRef); *ppKey = pBlob->pData; *pnKey = pBlob->nData; }else{ aCell += GETVARINT32(aCell, *pnKey); @@ -634,10 +641,11 @@ iCell = pCsr->aPg[iPg].iCell-1; } if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT; rc = pageGetBtreeKey( + pCsr->pSeg, pCsr->aPg[iPg].pPage, iCell, &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob ); pCsr->eType |= LSM_SEPARATOR; } @@ -695,11 +703,11 @@ iLoad = btreeCursorPtr(aData, nData, pPg->iCell); do { Page *pLoad; pCsr->iPg++; - rc = 
lsmFsDbPageGet(pCsr->pFS, iLoad, &pLoad); + rc = lsmFsDbPageGet(pCsr->pFS, pCsr->pSeg, iLoad, &pLoad); pCsr->aPg[pCsr->iPg].pPage = pLoad; pCsr->aPg[pCsr->iPg].iCell = 0; if( rc==LSM_OK ){ if( pCsr->iPg==(pCsr->nDepth-1) ) break; aData = fsPageData(pLoad, &nData); @@ -740,11 +748,11 @@ Page *pPg = 0; FileSystem *pFS = pCsr->pFS; int iPg = pCsr->pSeg->iRoot; do { - rc = lsmFsDbPageGet(pFS, iPg, &pPg); + rc = lsmFsDbPageGet(pFS, pCsr->pSeg, iPg, &pPg); assert( (rc==LSM_OK)==(pPg!=0) ); if( rc==LSM_OK ){ u8 *aData; int nData; int flags; @@ -830,10 +838,11 @@ if( p->iPg ){ lsm_env *pEnv = lsmFsEnv(pCsr->pFS); int iCell; /* Current cell number on leaf page */ Pgno iLeaf; /* Page number of current leaf page */ int nDepth; /* Depth of b-tree structure */ + Segment *pSeg = pCsr->pSeg; /* Decode the MergeInput structure */ iLeaf = p->iPg; nDepth = (p->iCell & 0x00FF); iCell = (p->iCell >> 8) - 1; @@ -842,24 +851,25 @@ assert( pCsr->aPg==0 ); pCsr->aPg = (BtreePg *)lsmMallocZeroRc(pEnv, sizeof(BtreePg) * nDepth, &rc); /* Populate the last entry of the aPg[] array */ if( rc==LSM_OK ){ + Page **pp = &pCsr->aPg[nDepth-1].pPage; pCsr->iPg = nDepth-1; pCsr->nDepth = nDepth; pCsr->aPg[pCsr->iPg].iCell = iCell; - rc = lsmFsDbPageGet(pCsr->pFS, iLeaf, &pCsr->aPg[nDepth-1].pPage); + rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLeaf, pp); } /* Populate any other aPg[] array entries */ if( rc==LSM_OK && nDepth>1 ){ Blob blob = {0,0,0}; void *pSeek; int nSeek; int iTopicSeek; int iPg = 0; - int iLoad = pCsr->pSeg->iRoot; + int iLoad = pSeg->iRoot; Page *pPg = pCsr->aPg[nDepth-1].pPage; if( pageObjGetNRec(pPg)==0 ){ /* This can happen when pPg is the right-most leaf in the b-tree. 
** In this case, set the iTopicSeek/pSeek/nSeek key to a value @@ -868,18 +878,18 @@ iTopicSeek = 1000; pSeek = 0; nSeek = 0; }else{ Pgno dummy; - rc = pageGetBtreeKey(pPg, + rc = pageGetBtreeKey(pSeg, pPg, 0, &dummy, &iTopicSeek, &pSeek, &nSeek, &pCsr->blob ); } do { Page *pPg; - rc = lsmFsDbPageGet(pCsr->pFS, iLoad, &pPg); + rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLoad, &pPg); assert( rc==LSM_OK || pPg==0 ); if( rc==LSM_OK ){ u8 *aData; /* Buffer containing page data */ int nData; /* Size of aData[] in bytes */ int iMin; @@ -899,11 +909,13 @@ void *pKey; int nKey; /* Key for cell iTry */ int iTopic; /* Topic for key pKeyT/nKeyT */ Pgno iPtr; /* Pointer for cell iTry */ int res; /* (pSeek - pKeyT) */ - rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopic, &pKey, &nKey,&blob); + rc = pageGetBtreeKey( + pSeg, pPg, iTry, &iPtr, &iTopic, &pKey, &nKey, &blob + ); if( rc!=LSM_OK ) break; res = sortedKeyCompare( xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey ); @@ -919,11 +931,13 @@ } pCsr->aPg[iPg].pPage = pPg; pCsr->aPg[iPg].iCell = iCell; iPg++; - assert( iPg!=nDepth-1 || iLoad==iLeaf ); + assert( iPg!=nDepth-1 + || lsmFsRedirectPage(pCsr->pFS, pSeg->pRedirect, iLoad)==iLeaf + ); } }while( rc==LSM_OK && iPg<(nDepth-1) ); sortedBlobFree(&blob); } @@ -941,11 +955,11 @@ int i; for(i=pCsr->iPg-1; i>=0; i--){ if( pCsr->aPg[i].iCell>0 ) break; } assert( i>=0 ); - rc = pageGetBtreeKey( + rc = pageGetBtreeKey(pSeg, pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1, &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob ); pCsr->eType |= LSM_SEPARATOR; @@ -998,11 +1012,11 @@ int iNew /* Page number of new page */ ){ Page *pPg = 0; /* The new page */ int rc; /* Return Code */ - rc = lsmFsDbPageGet(pFS, iNew, &pPg); + rc = lsmFsDbPageGet(pFS, pPtr->pSeg, iNew, &pPg); assert( rc==LSM_OK || pPg==0 ); segmentPtrSetPage(pPtr, pPg); return rc; } @@ -1012,11 +1026,11 @@ int iOff, int nByte, void **ppData, Blob *pBlob ){ - return sortedReadData(pPtr->pPg, iOff, nByte, ppData, pBlob); + return 
sortedReadData(pPtr->pSeg, pPtr->pPg, iOff, nByte, ppData, pBlob); } static int segmentPtrNextPage( SegmentPtr *pPtr, /* Load page into this SegmentPtr object */ int eDir /* +1 for next(), -1 for prev() */ @@ -1071,18 +1085,39 @@ } return rc; } -void lsmSortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){ + +static Segment *sortedSplitkeySegment(Level *pLevel){ + Merge *pMerge = pLevel->pMerge; + MergeInput *p = &pMerge->splitkey; + Segment *pSeg; + int i; + + for(i=0; inInput; i++){ + if( p->iPg==pMerge->aInput[i].iPg ) break; + } + if( pMerge->nInput==(pLevel->nRight+1) && i>=(pMerge->nInput-1) ){ + pSeg = &pLevel->pNext->lhs; + }else{ + pSeg = &pLevel->aRhs[i]; + } + + return pSeg; +} + +static void sortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){ + Segment *pSeg; Page *pPg = 0; lsm_env *pEnv = pDb->pEnv; /* Environment handle */ int rc = *pRc; Merge *pMerge = pLevel->pMerge; + pSeg = sortedSplitkeySegment(pLevel); if( rc==LSM_OK ){ - rc = lsmFsDbPageGet(pDb->pFS, pMerge->splitkey.iPg, &pPg); + rc = lsmFsDbPageGet(pDb->pFS, pSeg, pMerge->splitkey.iPg, &pPg); } if( rc==LSM_OK ){ int iTopic; Blob blob = {0, 0, 0, 0}; u8 *aData; @@ -1091,18 +1126,20 @@ aData = lsmFsPageData(pPg, &nData); if( pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG ){ void *pKey; int nKey; Pgno dummy; - rc = pageGetBtreeKey( + rc = pageGetBtreeKey(pSeg, pPg, pMerge->splitkey.iCell, &dummy, &iTopic, &pKey, &nKey, &blob ); if( rc==LSM_OK && blob.pData!=pKey ){ rc = sortedBlobSet(pEnv, &blob, pKey, nKey); } }else{ - rc = pageGetKeyCopy(pEnv, pPg, pMerge->splitkey.iCell, &iTopic, &blob); + rc = pageGetKeyCopy( + pEnv, pSeg, pPg, pMerge->splitkey.iCell, &iTopic, &blob + ); } pLevel->iSplitTopic = iTopic; pLevel->pSplitKey = blob.pData; pLevel->nSplitKey = blob.nData; @@ -1170,14 +1207,17 @@ rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey, pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey ); if( res<0 ) segmentPtrReset(pPtr); } + if( pPtr->pPg==0 && (svFlags & LSM_END_DELETE) ){ - rc = 
lsmFsDbPageGet(pCsr->pDb->pFS, pPtr->pSeg->iFirst, &pPtr->pPg); + Segment *pSeg = pPtr->pSeg; + rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, pSeg->iFirst, &pPtr->pPg); if( rc!=LSM_OK ) return rc; - pPtr->eType = LSM_START_DELETE | (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0); + pPtr->eType = LSM_START_DELETE | LSM_POINT_DELETE; + pPtr->eType |= (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0); pPtr->pKey = pLvl->pSplitKey; pPtr->nKey = pLvl->nSplitKey; } }while( pCsr @@ -1194,15 +1234,16 @@ SegmentPtr *pPtr, int bLast, int *pRc ){ if( *pRc==LSM_OK ){ + Segment *pSeg = pPtr->pSeg; Page *pNew = 0; if( bLast ){ - *pRc = lsmFsDbPageLast(pFS, pPtr->pSeg, &pNew); + *pRc = lsmFsDbPageLast(pFS, pSeg, &pNew); }else{ - *pRc = lsmFsDbPageGet(pFS, pPtr->pSeg->iFirst, &pNew); + *pRc = lsmFsDbPageGet(pFS, pSeg, pSeg->iFirst, &pNew); } segmentPtrSetPage(pPtr, pNew); } } @@ -1228,18 +1269,25 @@ rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1)); } if( rc==LSM_OK && pPtr->pPg ){ rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0); + if( rc==LSM_OK && bLast && pPtr->pSeg!=&pLvl->lhs ){ + int res = sortedKeyCompare(pCsr->pDb->xCmp, + rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey, + pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey + ); + if( res<0 ) segmentPtrReset(pPtr); + } } bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr); if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){ rc = segmentPtrAdvance(pCsr, pPtr, bLast); } - +#if 0 if( bLast && rc==LSM_OK && pPtr->pPg && pPtr->pSeg==&pLvl->lhs && pLvl->nRight && (pPtr->eType & LSM_START_DELETE) ){ pPtr->iCell++; @@ -1247,10 +1295,11 @@ pPtr->pKey = pLvl->pSplitKey; pPtr->nKey = pLvl->nSplitKey; pPtr->pVal = 0; pPtr->nVal = 0; } +#endif return rc; } static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){ @@ -1291,13 +1340,14 @@ for(eDir=-1; eDir<=1; eDir+=2){ Page *pTest = pPtr->pPg; lsmFsPageRef(pTest); while( pTest ){ + Segment *pSeg = pPtr->pSeg; Page *pNext; - int rc = lsmFsDbPageNext(pPtr->pSeg, pTest, eDir, 
&pNext); + int rc = lsmFsDbPageNext(pSeg, pTest, eDir, &pNext); lsmFsPageRelease(pTest); if( rc ) return 1; pTest = pNext; if( pTest ){ @@ -1311,11 +1361,11 @@ u8 *pPgKey; int res; int iCell; iCell = ((eDir < 0) ? (nCell-1) : 0); - pPgKey = pageGetKey(pTest, iCell, &iPgTopic, &nPgKey, &blob); + pPgKey = pageGetKey(pSeg, pTest, iCell, &iPgTopic, &nPgKey, &blob); res = iTopic - iPgTopic; if( res==0 ) res = pCsr->pDb->xCmp(pKey, nKey, pPgKey, nPgKey); if( (eDir==1 && res>0) || (eDir==-1 && res<0) ){ /* Taking this branch means something has gone wrong. */ char *zMsg = lsmMallocPrintf(pEnv, "Key \"%s\" is not on page %d", @@ -1381,11 +1431,11 @@ int iLastTopic; int res; /* Result of comparison */ Page *pNext; /* Load the last key on the current page. */ - pLastKey = pageGetKey( + pLastKey = pageGetKey(pPtr->pSeg, pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1 ); /* If the loaded key is >= than (pKey/nKey), break out of the loop. ** If (pKey/nKey) is present in this array, it must be on the current @@ -1724,11 +1774,11 @@ if( aPg ){ aPg[i++] = iPg; piFirst = &aPg[i]; } - rc = lsmFsDbPageGet(pCsr->pDb->pFS, iPg, &pPg); + rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, iPg, &pPg); assert( rc==LSM_OK || pPg==0 ); if( rc==LSM_OK ){ u8 *aData; /* Buffer containing page data */ int nData; /* Size of aData[] in bytes */ int iMin; @@ -1750,11 +1800,13 @@ void *pKeyT; int nKeyT; /* Key for cell iTry */ int iTopicT; /* Topic for key pKeyT/nKeyT */ Pgno iPtr; /* Pointer associated with cell iTry */ int res; /* (pKey - pKeyT) */ - rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob); + rc = pageGetBtreeKey( + pSeg, pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob + ); if( rc!=LSM_OK ) break; if( piFirst && pKeyT==blob.pData ){ *piFirst = pageGetBtreeRef(pPg, iTry); piFirst = 0; i++; @@ -1871,21 +1923,35 @@ res = 0; } } if( res>=0 ){ + int bHit = 0; /* True if at least one rhs is not EOF */ int iPtr = *piPgno; int i; for(i=1; rc==LSM_OK && i<=nRhs && 
bStop==0; i++){ + SegmentPtr *pPtr = &aPtr[i]; iOut = 0; rc = seekInSegment( - pCsr, &aPtr[i], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop + pCsr, pPtr, iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop ); iPtr = iOut; + + /* If the segment-pointer has settled on a key that is smaller than + ** the splitkey, invalidate the segment-pointer. */ + if( pPtr->pPg ){ + res = sortedKeyCompare(pCsr->pDb->xCmp, + rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey, + pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey + ); + if( res<0 ) segmentPtrReset(pPtr); + } + + if( aPtr[i].pKey ) bHit = 1; } - if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){ + if( rc==LSM_OK && eSeek==LSM_SEEK_LE && bHit==0 ){ rc = segmentPtrEnd(pCsr, &aPtr[0], 1); } } assert( eSeek==LSM_SEEK_EQ || bStop==0 ); @@ -2091,16 +2157,47 @@ int bReverse ){ int rc; SegmentPtr *pPtr = &pCsr->aPtr[iPtr]; Level *pLvl = pPtr->pLevel; - int bComposite; + int bComposite; /* True if pPtr is part of composite level */ + /* Advance the segment-pointer object. 
*/ rc = segmentPtrAdvance(pCsr, pPtr, bReverse); if( rc!=LSM_OK ) return rc; + bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight); + if( bComposite && pPtr->pPg==0 ){ + int bFix = 0; + if( (bReverse==0)==(pPtr->pSeg==&pLvl->lhs) ){ + int i; + if( bReverse ){ + SegmentPtr *pLhs = &pCsr->aPtr[iPtr - 1 - (pPtr->pSeg - pLvl->aRhs)]; + for(i=0; inRight; i++){ + if( pLhs[i+1].pPg ) break; + } + if( i==pLvl->nRight ){ + bFix = 1; + rc = segmentPtrEnd(pCsr, pLhs, 1); + } + }else{ + bFix = 1; + for(i=0; rc==LSM_OK && inRight; i++){ + rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]); + } + } + } + if( bFix ){ + int i; + for(i=pCsr->nTree-1; i>0; i--){ + multiCursorDoCompare(pCsr, i, bReverse); + } + } + } + +#if 0 if( bComposite && pPtr->pSeg==&pLvl->lhs /* lhs of composite level */ && bReverse==0 /* csr advanced forwards */ && pPtr->pPg==0 /* segment at EOF */ ){ int i; @@ -2109,10 +2206,11 @@ } for(i=pCsr->nTree-1; i>0; i--){ multiCursorDoCompare(pCsr, i, 0); } } +#endif return rc; } static void mcursorFreeComponents(MultiCursor *pCsr){ @@ -2215,10 +2313,30 @@ pCsr->aPtr[i].pLevel = pLvl; } return LSM_OK; } + +static void multiCursorAddOne(MultiCursor *pCsr, Level *pLvl, int *pRc){ + if( *pRc==LSM_OK ){ + int iPtr = pCsr->nPtr; + int i; + pCsr->aPtr[iPtr].pLevel = pLvl; + pCsr->aPtr[iPtr].pSeg = &pLvl->lhs; + iPtr++; + for(i=0; inRight; i++){ + pCsr->aPtr[iPtr].pLevel = pLvl; + pCsr->aPtr[iPtr].pSeg = &pLvl->aRhs[i]; + iPtr++; + } + + if( pLvl->nRight && pLvl->pSplitKey==0 ){ + sortedSplitkey(pCsr->pDb, pLvl, pRc); + } + pCsr->nPtr = iPtr; + } +} static int multiCursorAddAll(MultiCursor *pCsr, Snapshot *pSnap){ Level *pLvl; int nPtr = 0; int iPtr = 0; @@ -2234,26 +2352,14 @@ nPtr += (1 + pLvl->nRight); } assert( pCsr->aPtr==0 ); pCsr->aPtr = lsmMallocZeroRc(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nPtr, &rc); - if( rc==LSM_OK ) pCsr->nPtr = nPtr; - - for(pLvl=pSnap->pLevel; pLvl && rc==LSM_OK; pLvl=pLvl->pNext){ - int i; - if( pLvl->flags & LEVEL_INCOMPLETE ) 
continue; - pCsr->aPtr[iPtr].pLevel = pLvl; - pCsr->aPtr[iPtr].pSeg = &pLvl->lhs; - iPtr++; - for(i=0; inRight; i++){ - pCsr->aPtr[iPtr].pLevel = pLvl; - pCsr->aPtr[iPtr].pSeg = &pLvl->aRhs[i]; - iPtr++; - } - - if( pLvl->nRight && pLvl->pSplitKey==0 ){ - lsmSortedSplitkey(pCsr->pDb, pLvl, &rc); + + for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){ + if( (pLvl->flags & LEVEL_INCOMPLETE)==0 ){ + multiCursorAddOne(pCsr, pLvl, &rc); } } return rc; } @@ -2410,12 +2516,16 @@ MultiCursor *pCsr; /* Cursor used to read db */ int rc = LSM_OK; /* Return Code */ Snapshot *pSnap = 0; assert( pDb->pWorker ); - rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap); - if( rc!=LSM_OK ) return rc; + if( pDb->bIncrMerge ){ + rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap); + if( rc!=LSM_OK ) return rc; + }else{ + pSnap = pDb->pWorker; + } pCsr = multiCursorNew(pDb, &rc); if( pCsr ){ rc = multiCursorAddAll(pCsr, pSnap); pCsr->flags |= CURSOR_IGNORE_DELETE; @@ -2446,11 +2556,13 @@ } } } lsmMCursorClose(pCsr); - lsmFreeSnapshot(pDb->pEnv, pSnap); + if( pSnap!=pDb->pWorker ){ + lsmFreeSnapshot(pDb->pEnv, pSnap); + } return rc; } int lsmSortedLoadFreelist( @@ -2518,32 +2630,118 @@ int nKey; multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pKey, &nKey); *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pKey, nKey); } } + +#ifndef NDEBUG +static void assertCursorTree(MultiCursor *pCsr){ + int bRev = !!(pCsr->flags & CURSOR_PREV_OK); + int *aSave = pCsr->aTree; + int nSave = pCsr->nTree; + int rc; + + pCsr->aTree = 0; + pCsr->nTree = 0; + rc = multiCursorAllocTree(pCsr); + if( rc==LSM_OK ){ + int i; + for(i=pCsr->nTree-1; i>0; i--){ + multiCursorDoCompare(pCsr, i, bRev); + } + + assert( nSave==pCsr->nTree + && 0==memcmp(aSave, pCsr->aTree, sizeof(int)*nSave) + ); + + lsmFree(pCsr->pDb->pEnv, pCsr->aTree); + } + + pCsr->aTree = aSave; + pCsr->nTree = nSave; +} +#else +# define assertCursorTree(x) +#endif static int mcursorLocationOk(MultiCursor *pCsr, 
int bDeleteOk){ int eType = pCsr->eType; int iKey; int i; - int rdmask = 0; + int rdmask; assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) ); - if( pCsr->flags & CURSOR_NEXT_OK ){ - rdmask = LSM_END_DELETE; - }else{ - rdmask = LSM_START_DELETE; - } + assertCursorTree(pCsr); + rdmask = (pCsr->flags & CURSOR_NEXT_OK) ? LSM_END_DELETE : LSM_START_DELETE; + + /* If the cursor does not currently point to an actual database key (i.e. + ** it points to a delete key, or the start or end of a range-delete), and + ** the CURSOR_IGNORE_DELETE flag is set, skip past this entry. */ if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){ if( (eType & LSM_INSERT)==0 ) return 0; } + + /* If the cursor points to a system key (free-list entry), and the + ** CURSOR_IGNORE_SYSTEM flag is set, skip thie entry. */ if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){ return 0; } - /* Check if this key has already been deleted by a range-delete */ +#ifndef NDEBUG + /* This block fires assert() statements to check one of the assumptions + ** in the comment below - that if the lhs sub-cursor of a level undergoing + ** a merge is valid, then all the rhs sub-cursors must be at EOF. + ** + ** Also assert that all rhs sub-cursors are either at EOF or point to + ** a key that is not less than the level split-key. */ + for(i=0; inPtr; i++){ + SegmentPtr *pPtr = &pCsr->aPtr[i]; + Level *pLvl = pPtr->pLevel; + if( pLvl->nRight && pPtr->pPg ){ + if( pPtr->pSeg==&pLvl->lhs ){ + int j; + for(j=0; jnRight; j++) assert( pPtr[j+1].pPg==0 ); + }else{ + int res = sortedKeyCompare(pCsr->pDb->xCmp, + rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey, + pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey + ); + assert( res>=0 ); + } + } + } +#endif + + /* Now check if this key has already been deleted by a range-delete. If + ** so, skip past it. 
+ ** + ** Assume, for the moment, that the tree contains no levels currently + ** undergoing incremental merge, and that this cursor is iterating forwards + ** through the database keys. The cursor currently points to a key in + ** level L. This key has already been deleted if any of the sub-cursors + ** that point to levels newer than L (or to the in-memory tree) point to + ** a key greater than the current key with the LSM_END_DELETE flag set. + ** + ** Or, if the cursor is iterating backwards through data keys, if any + ** such sub-cursor points to a key smaller than the current key with the + ** LSM_START_DELETE flag set. + ** + ** Why it works with levels undergoing a merge too: + ** + ** When a cursor iterates forwards, the sub-cursors for the rhs of a + ** level are only activated once the lhs reaches EOF. So when iterating + ** forwards, the keys visited are the same as if the level was completely + ** merged. + ** + ** If the cursor is iterating backwards, then the lhs sub-cursor is not + ** initialized until the last of the rhs sub-cursors has reached EOF. + ** Additionally, if the START_DELETE flag is set on the last entry (in + ** reverse order - so the entry with the smallest key) of a rhs sub-cursor, + ** then a pseudo-key equal to the levels split-key with the END_DELETE + ** flag set is visited by the sub-cursor. + */ iKey = pCsr->aTree[1]; for(i=0; i0 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[0]))) - || (iKey>1 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[1]))) - ){ - return 0; - } - if( iKey>CURSOR_DATA_SYSTEM && (pCsr->flags & CURSOR_FLUSH_FREELIST) ){ - int eType; - multiCursorGetKey(pCsr, CURSOR_DATA_SYSTEM, &eType, 0, 0); - if( rdmask & eType ) return 0; - } - - for(i=CURSOR_DATA_SEGMENT; iaPtr[iPtr].pPg && (pCsr->aPtr[iPtr].eType & rdmask) ){ - return 0; - } - } -#endif - + /* The current cursor position is one this cursor should visit. Return 1. 
*/ return 1; } + +static int multiCursorSetupTree(MultiCursor *pCsr, int bRev){ + int rc; + + rc = multiCursorAllocTree(pCsr); + if( rc==LSM_OK ){ + int i; + for(i=pCsr->nTree-1; i>0; i--){ + multiCursorDoCompare(pCsr, i, bRev); + } + } + + assertCursorTree(pCsr); + multiCursorCacheKey(pCsr, &rc); + + if( rc==LSM_OK && mcursorLocationOk(pCsr, 0)==0 ){ + rc = multiCursorAdvance(pCsr, bRev); + } + return rc; +} + static int multiCursorEnd(MultiCursor *pCsr, int bLast){ int rc = LSM_OK; int i; pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK); pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK); - - if( pCsr->apTreeCsr[0] ){ - rc = lsmTreeCursorEnd(pCsr->apTreeCsr[0], bLast); - } - if( rc==LSM_OK && pCsr->apTreeCsr[1] ){ - rc = lsmTreeCursorEnd(pCsr->apTreeCsr[1], bLast); - } - pCsr->iFree = 0; + + /* Position the two in-memory tree cursors */ + for(i=0; rc==LSM_OK && i<2; i++){ + if( pCsr->apTreeCsr[i] ){ + rc = lsmTreeCursorEnd(pCsr->apTreeCsr[i], bLast); + } + } for(i=0; rc==LSM_OK && inPtr; i++){ SegmentPtr *pPtr = &pCsr->aPtr[i]; Level *pLvl = pPtr->pLevel; - - rc = segmentPtrEnd(pCsr, pPtr, bLast); - if( rc==LSM_OK && bLast==0 && pLvl->nRight && pPtr->pSeg==&pLvl->lhs ){ - int iRhs; - for(iRhs=1+i; rc==LSM_OK && iRhs<1+i+pLvl->nRight; iRhs++){ - SegmentPtr *pRhs = &pCsr->aPtr[iRhs]; - if( pPtr->pPg==0 ){ - rc = sortedRhsFirst(pCsr, pLvl, pRhs); + int iRhs; + int bHit = 0; + + if( bLast ){ + for(iRhs=0; iRhsnRight && rc==LSM_OK; iRhs++){ + rc = segmentPtrEnd(pCsr, &pPtr[iRhs+1], 1); + if( pPtr[iRhs+1].pPg ) bHit = 1; + } + if( bHit==0 && rc==LSM_OK ){ + rc = segmentPtrEnd(pCsr, pPtr, 1); + }else{ + segmentPtrReset(pPtr); + } + }else{ + int bLhs = (pPtr->pSeg==&pLvl->lhs); + assert( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[0] ); + + if( bLhs ){ + rc = segmentPtrEnd(pCsr, pPtr, 0); + if( pPtr->pKey ) bHit = 1; + } + for(iRhs=0; iRhsnRight && rc==LSM_OK; iRhs++){ + if( bHit ){ + segmentPtrReset(&pPtr[iRhs+1]); }else{ - segmentPtrReset(pRhs); + rc = 
sortedRhsFirst(pCsr, pLvl, &pPtr[iRhs+bLhs]); } } - i += pLvl->nRight; } + i += pLvl->nRight; } + /* And the b-tree cursor, if applicable */ if( rc==LSM_OK && pCsr->pBtCsr ){ assert( bLast==0 ); rc = btreeCursorFirst(pCsr->pBtCsr); } if( rc==LSM_OK ){ - rc = multiCursorAllocTree(pCsr); - } - if( rc==LSM_OK ){ - for(i=pCsr->nTree-1; i>0; i--){ - multiCursorDoCompare(pCsr, i, bLast); - } - } - - multiCursorCacheKey(pCsr, &rc); - if( rc==LSM_OK && mcursorLocationOk(pCsr, 0)==0 ){ - if( bLast ){ - rc = lsmMCursorPrev(pCsr); - }else{ - rc = lsmMCursorNext(pCsr); - } - } - + rc = multiCursorSetupTree(pCsr, bLast); + } + return rc; } int mcursorSave(MultiCursor *pCsr){ @@ -2884,11 +3086,11 @@ ** cursor points to a SORTED_DELETE entry, then the cursor has not been ** successfully advanced. ** ** Similarly, if the cursor is configured to skip system keys and the ** current cursor points to a system key, it has not yet been advanced. - */ + */ if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0; } return 1; } @@ -2922,10 +3124,12 @@ static int multiCursorAdvance(MultiCursor *pCsr, int bReverse){ int rc = LSM_OK; /* Return Code */ if( lsmMCursorValid(pCsr) ){ do { int iKey = pCsr->aTree[1]; + + assertCursorTree(pCsr); /* If this multi-cursor is advancing forwards, and the sub-cursor ** being advanced is the one that separator keys may be being read ** from, record the current absolute pointer value. 
*/ if( pCsr->pPrevMergePtr ){ @@ -2960,10 +3164,11 @@ if( rc==LSM_OK ){ int i; for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){ multiCursorDoCompare(pCsr, i, bReverse); } + assertCursorTree(pCsr); } }while( mcursorAdvanceOk(pCsr, bReverse, &rc)==0 ); } return rc; } @@ -3199,11 +3404,11 @@ Page *pPg = 0; u8 *aData; int nData; int flags; - rc = lsmFsDbPageGet(pFS, iPg, &pPg); + rc = lsmFsDbPageGet(pFS, pSeg, iPg, &pPg); if( rc!=LSM_OK ) break; aData = fsPageData(pPg, &nData); flags = pageGetFlags(aData, nData); if( flags&SEGMENT_BTREE_FLAG ){ @@ -3645,11 +3850,11 @@ rc = LSM_OK; iFPtr = pMW->pLevel->pNext->lhs.iFirst; }else if( pCsr->nPtr>0 ){ Segment *pSeg; pSeg = pCsr->aPtr[pCsr->nPtr-1].pSeg; - rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg->iFirst, &pPg); + rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg, pSeg->iFirst, &pPg); if( rc==LSM_OK ){ u8 *aData; /* Buffer for page pPg */ int nData; /* Size of aData[] in bytes */ aData = fsPageData(pPg, &nData); iFPtr = pageGetPtr(aData, nData); @@ -3781,27 +3986,10 @@ return rc; } -static int multiCursorSetupTree(MultiCursor *pCsr, int bRev){ - int rc; - - assert( pCsr->aTree==0 ); - - rc = multiCursorAllocTree(pCsr); - if( rc==LSM_OK ){ - int i; - for(i=pCsr->nTree-1; i>0; i--){ - multiCursorDoCompare(pCsr, i, bRev); - } - } - - multiCursorCacheKey(pCsr, &rc); - return rc; -} - /* ** Free all resources allocated by mergeWorkerInit(). */ static void mergeWorkerShutdown(MergeWorker *pMW, int *pRc){ int i; /* Iterator variable */ @@ -3987,11 +4175,11 @@ mergeRangeDeletes(pCsr, &iVal, &eType); if( eType!=0 ){ if( pMW->aGobble ){ int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT; - if( iGobblenPtr ){ + if( iGobblenPtr && iGobble>=0 ){ SegmentPtr *pGobble = &pCsr->aPtr[iGobble]; if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){ pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg); } } @@ -4017,34 +4205,10 @@ /* Advance the cursor to the next input record (assuming one exists). 
*/ assert( lsmMCursorValid(pMW->pCsr) ); if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr); - /* If the cursor is at EOF, the merge is finished. Release all page - ** references currently held by the merge worker and inform the - ** FileSystem object that no further pages will be appended to either - ** the main or separators array. - */ - if( rc==LSM_OK && !lsmMCursorValid(pMW->pCsr) ){ - - mergeWorkerShutdown(pMW, &rc); - if( pSeg->iFirst ){ - rc = lsmFsSortedFinish(pDb->pFS, pSeg); - } - -#ifdef LSM_DEBUG_EXPENSIVE - if( rc==LSM_OK ){ -#if 0 - rc = assertBtreeOk(pDb, pSeg); - if( pMW->pCsr->pBtCsr ){ - Segment *pNext = &pMW->pLevel->pNext->lhs; - rc = assertPointersOk(pDb, pSeg, pNext, 0); - } -#endif - } -#endif - } return rc; } static int mergeWorkerDone(MergeWorker *pMW){ return pMW->pCsr==0 || !lsmMCursorValid(pMW->pCsr); @@ -4072,11 +4236,12 @@ ){ int rc = LSM_OK; /* Return Code */ MultiCursor *pCsr = 0; Level *pNext = 0; /* The current top level */ Level *pNew; /* The new level itself */ - Segment *pDel = 0; /* Delete separators from this segment */ + Segment *pLinked = 0; /* Delete separators from this segment */ + Level *pDel = 0; /* Delete this entire level */ int nWrite = 0; /* Number of database pages written */ Freelist freelist; if( eTree!=TREE_NONE ){ rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk); @@ -4103,16 +4268,19 @@ pCsr->pDb = pDb; rc = multiCursorVisitFreelist(pCsr); if( rc==LSM_OK ){ rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree); } - if( rc==LSM_OK - && pNext && pNext->pMerge==0 && pNext->lhs.iRoot - && (eTree!=TREE_NONE || (pNext->flags & LEVEL_FREELIST_ONLY)) - ){ - pDel = &pNext->lhs; - rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr); + if( rc==LSM_OK && pNext && pNext->pMerge==0 ){ + if( (pNext->flags & LEVEL_FREELIST_ONLY) ){ + pDel = pNext; + pCsr->aPtr = lsmMallocZeroRc(pDb->pEnv, sizeof(SegmentPtr), &rc); + multiCursorAddOne(pCsr, pNext, &rc); + }else if( eTree!=TREE_NONE && pNext->lhs.iRoot ){ + pLinked = &pNext->lhs; + rc = 
btreeCursorNew(pDb, pLinked, &pCsr->pBtCsr); + } } /* If this will be the only segment in the database, discard any delete ** markers present in the in-memory tree. */ if( pNext==0 ){ @@ -4143,14 +4311,16 @@ /* Do the work to create the new merged segment on disk */ if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr); while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){ rc = mergeWorkerStep(&mergeworker); } + mergeWorkerShutdown(&mergeworker, &rc); assert( rc!=LSM_OK || mergeworker.nWork==0 || pNew->lhs.iFirst ); - + if( rc==LSM_OK && pNew->lhs.iFirst ){ + rc = lsmFsSortedFinish(pDb->pFS, &pNew->lhs); + } nWrite = mergeworker.nWork; - mergeWorkerShutdown(&mergeworker, &rc); pNew->flags &= ~LEVEL_INCOMPLETE; if( eTree==TREE_NONE ){ pNew->flags |= LEVEL_FREELIST_ONLY; } pNew->pMerge = 0; @@ -4159,14 +4329,21 @@ if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){ assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 ); lsmDbSnapshotSetLevel(pDb->pWorker, pNext); sortedFreeLevel(pDb->pEnv, pNew); }else{ - if( pDel ) pDel->iRoot = 0; + if( pLinked ){ + pLinked->iRoot = 0; + }else if( pDel ){ + assert( pNew->pNext==pDel ); + pNew->pNext = pDel->pNext; + lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 1, &pDel->lhs); + sortedFreeLevel(pDb->pEnv, pDel); + } -#if 0 - lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel"); +#if LSM_LOG_STRUCTURE + lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, "new-toplevel"); #endif if( freelist.nEntry ){ Freelist *p = &pDb->pWorker->freelist; lsmFree(pDb->pEnv, p->aEntry); @@ -4252,11 +4429,13 @@ for(pp=&pTopLevel; *pp!=pLevel; pp=&((*pp)->pNext)); *pp = pNew; lsmDbSnapshotSetLevel(pDb->pWorker, pTopLevel); /* Determine whether or not the next separators will be linked in */ - if( pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){ + if( pNext && pNext->pMerge==0 && pNext->lhs.iRoot && pNext + && (bFreeOnly==0 || (pNext->flags & LEVEL_FREELIST_ONLY)) + ){ bUseNext = 1; } } /* Allocate the merge object */ @@ -4301,10 +4480,11 @@ ** If the new 
level is the lowest (oldest) in the db, discard any ** delete keys. Key annihilation. */ pCsr = multiCursorNew(pDb, &rc); if( pCsr ){ + pCsr->flags |= CURSOR_NEXT_OK; rc = multiCursorAddRhs(pCsr, pLevel); } if( rc==LSM_OK && pMerge->nInput > pLevel->nRight ){ rc = btreeCursorNew(pDb, &pNext->lhs, &pCsr->pBtCsr); }else if( pNext ){ @@ -4429,12 +4609,12 @@ if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){ nThis++; }else{ if( nThis>nBest ){ if( (pLevel->iAge!=pThis->iAge+1) - || (pLevel->nRight==0 && sortedCountLevels(pLevel)<=pDb->nMerge) - ){ + || (pLevel->nRight==0 && sortedCountLevels(pLevel)<=pDb->nMerge) + ){ pBest = pThis; nBest = nThis; } } if( pLevel->nRight ){ @@ -4493,10 +4673,163 @@ ){ return 1; } return 0; } + +typedef struct MoveBlockCtx MoveBlockCtx; +struct MoveBlockCtx { + int iSeen; /* Previous free block on list */ + int iFrom; /* Total number of blocks in file */ +}; + +static int moveBlockCb(void *pCtx, int iBlk, i64 iSnapshot){ + MoveBlockCtx *p = (MoveBlockCtx *)pCtx; + assert( p->iFrom==0 ); + if( iBlk==(p->iSeen-1) ){ + p->iSeen = iBlk; + return 0; + } + p->iFrom = p->iSeen-1; + return 1; +} + +/* +** This function is called to further compact a database for which all +** of the content has already been merged into a single segment. If +** possible, it moves the contents of a single block from the end of the +** file to a free-block that lies closer to the start of the file (allowing +** the file to be eventually truncated). +*/ +static int sortedMoveBlock(lsm_db *pDb, int *pnWrite){ + Snapshot *p = pDb->pWorker; + Level *pLvl = lsmDbSnapshotLevel(p); + int iFrom; /* Block to move */ + int iTo; /* Destination to move block to */ + int rc; /* Return code */ + + MoveBlockCtx sCtx; + + assert( pLvl->pNext==0 && pLvl->nRight==0 ); + assert( p->redirect.n<=LSM_MAX_BLOCK_REDIRECTS ); + + *pnWrite = 0; + + /* Check that the redirect array is not already full. If it is, return + ** without moving any database content. 
*/ + if( p->redirect.n>=LSM_MAX_BLOCK_REDIRECTS ) return LSM_OK; + + /* Find the last block of content in the database file. Do this by + ** traversing the free-list in reverse (descending block number) order. + ** The first block not on the free list is the one that will be moved. + ** Since the db consists of a single segment, there is no ambiguity as + ** to which segment the block belongs to. */ + sCtx.iSeen = p->nBlock+1; + sCtx.iFrom = 0; + rc = lsmWalkFreelist(pDb, 1, moveBlockCb, &sCtx); + if( rc!=LSM_OK || sCtx.iFrom==0 ) return rc; + iFrom = sCtx.iFrom; + + /* Find the first free block in the database, ignoring block 1. Block + ** 1 is tricky as it is smaller than the other blocks. */ + rc = lsmBlockAllocate(pDb, iFrom, &iTo); + if( rc!=LSM_OK || iTo==0 ) return rc; + assert( iTo!=1 && iTopFS, &pLvl->lhs, iTo, iFrom); + if( rc==LSM_OK ){ + if( p->redirect.a==0 ){ + int nByte = sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS; + p->redirect.a = lsmMallocZeroRc(pDb->pEnv, nByte, &rc); + } + if( rc==LSM_OK ){ + memmove(&p->redirect.a[1], &p->redirect.a[0], + sizeof(struct RedirectEntry) * p->redirect.n + ); + p->redirect.a[0].iFrom = iFrom; + p->redirect.a[0].iTo = iTo; + p->redirect.n++; + + rc = lsmBlockFree(pDb, iFrom); + + *pnWrite = lsmFsBlockSize(pDb->pFS) / lsmFsPageSize(pDb->pFS); + pLvl->lhs.pRedirect = &p->redirect; + } + } + +#if LSM_LOG_STRUCTURE + if( rc==LSM_OK ){ + char aBuf[64]; + sprintf(aBuf, "move-block %d/%d", p->redirect.n-1, LSM_MAX_BLOCK_REDIRECTS); + lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, aBuf); + } +#endif + return rc; +} + +/* +*/ +static int mergeInsertFreelistSegments( + lsm_db *pDb, + int nFree, + MergeWorker *pMW +){ + int rc = LSM_OK; + if( nFree>0 ){ + MultiCursor *pCsr = pMW->pCsr; + Level *pLvl = pMW->pLevel; + SegmentPtr *aNew1; + Segment *aNew2; + + Level *pIter; + Level *pNext; + int i = 0; + + aNew1 = (SegmentPtr *)lsmMallocZeroRc( + pDb->pEnv, sizeof(SegmentPtr) * (pCsr->nPtr+nFree), &rc + ); + 
if( rc ) return rc; + memcpy(&aNew1[nFree], pCsr->aPtr, sizeof(SegmentPtr)*pCsr->nPtr); + pCsr->nPtr += nFree; + lsmFree(pDb->pEnv, pCsr->aTree); + lsmFree(pDb->pEnv, pCsr->aPtr); + pCsr->aTree = 0; + pCsr->aPtr = aNew1; + + aNew2 = (Segment *)lsmMallocZeroRc( + pDb->pEnv, sizeof(Segment) * (pLvl->nRight+nFree), &rc + ); + if( rc ) return rc; + memcpy(&aNew2[nFree], pLvl->aRhs, sizeof(Segment)*pLvl->nRight); + pLvl->nRight += nFree; + lsmFree(pDb->pEnv, pLvl->aRhs); + pLvl->aRhs = aNew2; + + for(pIter=pDb->pWorker->pLevel; rc==LSM_OK && pIter!=pLvl; pIter=pNext){ + Segment *pSeg = &pLvl->aRhs[i]; + memcpy(pSeg, &pIter->lhs, sizeof(Segment)); + + pCsr->aPtr[i].pSeg = pSeg; + pCsr->aPtr[i].pLevel = pLvl; + rc = segmentPtrEnd(pCsr, &pCsr->aPtr[i], 0); + + pDb->pWorker->pLevel = pNext = pIter->pNext; + sortedFreeLevel(pDb->pEnv, pIter); + i++; + } + assert( i==nFree ); + assert( rc!=LSM_OK || pDb->pWorker->pLevel==pLvl ); + + for(i=nFree; inPtr; i++){ + pCsr->aPtr[i].pSeg = &pLvl->aRhs[i]; + } + + lsmFree(pDb->pEnv, pMW->aGobble); + pMW->aGobble = 0; + } + return rc; +} static int sortedWork( lsm_db *pDb, /* Database handle. Must be worker. */ int nWork, /* Number of pages of work to do */ int nMerge, /* Try to merge this many levels at once */ @@ -4516,33 +4849,81 @@ /* Find a level to work on. */ rc = sortedSelectLevel(pDb, nMerge, &pLevel); assert( rc==LSM_OK || pLevel==0 ); if( pLevel==0 ){ + int nDone = 0; + Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker); + if( bFlush==0 && nMerge==1 && pTopLevel && pTopLevel->pNext==0 ){ + rc = sortedMoveBlock(pDb, &nDone); + } + nRemaining -= nDone; + /* Could not find any work to do. Finished. 
*/ - break; + if( nDone==0 ) break; }else{ + int bSave = 0; + Freelist freelist = {0, 0, 0}; MergeWorker mergeworker; /* State used to work on the level merge */ + assert( pDb->bIncrMerge==0 ); + assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 ); + + pDb->bIncrMerge = 1; rc = mergeWorkerInit(pDb, pLevel, &mergeworker); assert( mergeworker.nWork==0 ); + while( rc==LSM_OK && 0==mergeWorkerDone(&mergeworker) - && mergeworker.nWorkbUseFreelist) ){ + int eType = rtTopic(mergeworker.pCsr->eType); rc = mergeWorkerStep(&mergeworker); + + /* If the cursor now points at the first entry past the end of the + ** user data (i.e. either to EOF or to the first free-list entry + ** that will be added to the run), then check if it is possible to + ** merge in any free-list entries that are either in-memory or in + ** free-list-only blocks. */ + if( rc==LSM_OK && nMerge==1 && eType==0 + && (rtTopic(mergeworker.pCsr->eType) || mergeWorkerDone(&mergeworker)) + ){ + int nFree = 0; /* Number of free-list-only levels to merge */ + Level *pLvl; + assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 ); + + /* Now check if all levels containing data newer than this one + ** are single-segment free-list only levels. If so, they will be + ** merged in now. */ + for(pLvl=pDb->pWorker->pLevel; + pLvl!=mergeworker.pLevel && (pLvl->flags & LEVEL_FREELIST_ONLY); + pLvl=pLvl->pNext + ){ + assert( pLvl->nRight==0 ); + nFree++; + } + if( pLvl==mergeworker.pLevel ){ + + rc = mergeInsertFreelistSegments(pDb, nFree, &mergeworker); + if( rc==LSM_OK ){ + rc = multiCursorVisitFreelist(mergeworker.pCsr); + } + if( rc==LSM_OK ){ + rc = multiCursorSetupTree(mergeworker.pCsr, 0); + pDb->pFreelist = &freelist; + pDb->bUseFreelist = 1; + } + } + } } nRemaining -= LSM_MAX(mergeworker.nWork, 1); - /* Check if the merge operation is completely finished. If so, the - ** Merge object and the right-hand-side of the level can be deleted. 
- ** - ** Otherwise, gobble up (declare eligible for recycling) any pages - ** from rhs segments for which the content has been completely merged - ** into the lhs of the level. - */ if( rc==LSM_OK ){ + /* Check if the merge operation is completely finished. If not, + ** gobble up (declare eligible for recycling) any pages from rhs + ** segments for which the content has been completely merged into + ** the lhs of the level. */ if( mergeWorkerDone(&mergeworker)==0 ){ int i; for(i=0; inRight; i++){ SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i]; if( pGobble->pSeg->iRoot ){ @@ -4549,65 +4930,86 @@ rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i); }else if( mergeworker.aGobble[i] ){ lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1); } } - }else if( pLevel->lhs.iFirst==0 ){ - /* If the new level is completely empty, remove it from the - ** database snapshot. This can only happen if all input keys were - ** annihilated. Since keys are only annihilated if the new level - ** is the last in the linked list (contains the most ancient of - ** database content), this guarantees that pLevel->pNext==0. */ - - Level *pTop; /* Top level of worker snapshot */ - Level **pp; /* Read/write iterator for Level.pNext list */ - int i; - assert( pLevel->pNext==0 ); - - /* Remove the level from the worker snapshot. */ - pTop = lsmDbSnapshotLevel(pWorker); - for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext)); - *pp = pLevel->pNext; - lsmDbSnapshotSetLevel(pWorker, pTop); - - /* Free the Level structure. */ - lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->lhs); - for(i=0; inRight; i++){ - lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]); - } - sortedFreeLevel(pDb->pEnv, pLevel); - }else{ - int i; - - /* Free the separators of the next level, if required. 
*/ - if( pLevel->pMerge->nInput > pLevel->nRight ){ - assert( pLevel->pNext->lhs.iRoot ); - pLevel->pNext->lhs.iRoot = 0; - } - - /* Free the right-hand-side of pLevel */ - for(i=0; inRight; i++){ - lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]); - } - lsmFree(pDb->pEnv, pLevel->aRhs); - pLevel->nRight = 0; - pLevel->aRhs = 0; - - /* Free the Merge object */ - lsmFree(pDb->pEnv, pLevel->pMerge); - pLevel->pMerge = 0; + }else{ + int i; + int bEmpty; + mergeWorkerShutdown(&mergeworker, &rc); + bEmpty = (pLevel->lhs.iFirst==0); + + if( bEmpty==0 && rc==LSM_OK ){ + rc = lsmFsSortedFinish(pDb->pFS, &pLevel->lhs); + } + + if( pDb->bUseFreelist ){ + Freelist *p = &pDb->pWorker->freelist; + lsmFree(pDb->pEnv, p->aEntry); + memcpy(p, &freelist, sizeof(freelist)); + pDb->bUseFreelist = 0; + pDb->pFreelist = 0; + bSave = 1; + } + + for(i=0; inRight; i++){ + lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]); + } + + if( bEmpty ){ + /* If the new level is completely empty, remove it from the + ** database snapshot. This can only happen if all input keys were + ** annihilated. Since keys are only annihilated if the new level + ** is the last in the linked list (contains the most ancient of + ** database content), this guarantees that pLevel->pNext==0. */ + Level *pTop; /* Top level of worker snapshot */ + Level **pp; /* Read/write iterator for Level.pNext list */ + + assert( pLevel->pNext==0 ); + + /* Remove the level from the worker snapshot. */ + pTop = lsmDbSnapshotLevel(pWorker); + for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext)); + *pp = pLevel->pNext; + lsmDbSnapshotSetLevel(pWorker, pTop); + + /* Free the Level structure. */ + sortedFreeLevel(pDb->pEnv, pLevel); + }else{ + + /* Free the separators of the next level, if required. 
*/ + if( pLevel->pMerge->nInput > pLevel->nRight ){ + assert( pLevel->pNext->lhs.iRoot ); + pLevel->pNext->lhs.iRoot = 0; + } + + /* Zero the right-hand-side of pLevel */ + lsmFree(pDb->pEnv, pLevel->aRhs); + pLevel->nRight = 0; + pLevel->aRhs = 0; + + /* Free the Merge object */ + lsmFree(pDb->pEnv, pLevel->pMerge); + pLevel->pMerge = 0; + } + + if( bSave && rc==LSM_OK ){ + pDb->bIncrMerge = 0; + rc = lsmSaveWorker(pDb, 0); + } } } /* Clean up the MergeWorker object initialized above. If no error ** has occurred, invoke the work-hook to inform the application that ** the database structure has changed. */ mergeWorkerShutdown(&mergeworker, &rc); + pDb->bIncrMerge = 0; if( rc==LSM_OK ) sortedInvokeWorkHook(pDb); -#if 0 - lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work"); +#if LSM_LOG_STRUCTURE + lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, "work"); #endif assertBtreeOk(pDb, &pLevel->lhs); assertRunInOrder(pDb, &pLevel->lhs); /* If bFlush is true and the database is no longer considered "full", @@ -4651,15 +5053,23 @@ *pRc = rc; } assert( *pRc==LSM_OK || bRet==0 ); return bRet; } + +/* +** Create a new free-list only top-level segment. Return LSM_OK if successful +** or an LSM error code if some error occurs. +*/ +static int sortedNewFreelistOnly(lsm_db *pDb){ + return sortedNewToplevel(pDb, TREE_NONE, 0); +} int lsmSaveWorker(lsm_db *pDb, int bFlush){ Snapshot *p = pDb->pWorker; if( p->freelist.nEntry>pDb->nMaxFreelist ){ - int rc = sortedNewToplevel(pDb, TREE_NONE, 0); + int rc = sortedNewFreelistOnly(pDb); if( rc!=LSM_OK ) return rc; } return lsmCheckpointSaveWorker(pDb, bFlush); } @@ -4669,21 +5079,25 @@ int nMerge, /* Minimum segments to merge together */ int nPage, /* Number of pages to write to disk */ int *pnWrite, /* OUT: Pages actually written to disk */ int *pbCkpt /* OUT: True if an auto-checkpoint is req. 
*/ ){ + Snapshot *pWorker; /* Worker snapshot */ int rc = LSM_OK; /* Return code */ int bDirty = 0; int nMax = nPage; /* Maximum pages to write to disk */ int nRem = nPage; int bCkpt = 0; + + assert( nPage>0 ); /* Open the worker 'transaction'. It will be closed before this function ** returns. */ assert( pDb->pWorker==0 ); rc = lsmBeginWork(pDb); if( rc!=LSM_OK ) return rc; + pWorker = pDb->pWorker; /* If this connection is doing auto-checkpoints, set nMax (and nRem) so ** that this call stops writing when the auto-checkpoint is due. The ** caller will do the checkpoint, then possibly call this function again. */ if( bShutdown==0 && pDb->nAutockpt ){ @@ -4749,50 +5163,57 @@ while( rc==LSM_OK && lsmDatabaseFull(pDb) ){ rc = sortedWork(pDb, 16, nMerge, 1, &nPg); nRem -= nPg; } if( rc==LSM_OK ){ - rc = sortedNewToplevel(pDb, TREE_NONE, &nPg); + rc = sortedNewFreelistOnly(pDb); } nRem -= nPg; if( nPg ) bDirty = 1; } + + if( rc==LSM_OK ){ + *pnWrite = (nMax - nRem); + *pbCkpt = (bCkpt && nRem<=0); + if( nMerge==1 && pDb->nAutockpt>0 && *pnWrite>0 + && pWorker->pLevel + && pWorker->pLevel->nRight==0 + && pWorker->pLevel->pNext==0 + ){ + *pbCkpt = 1; + } + } if( rc==LSM_OK && bDirty ){ lsmFinishWork(pDb, 0, &rc); }else{ int rcdummy = LSM_BUSY; lsmFinishWork(pDb, 0, &rcdummy); - } - assert( pDb->pWorker==0 ); - - if( rc==LSM_OK ){ - *pnWrite = (nMax - nRem); - *pbCkpt = (bCkpt && nRem<=0); - }else{ *pnWrite = 0; - *pbCkpt = 0; } - + assert( pDb->pWorker==0 ); return rc; } static int doLsmWork(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){ - int rc; - int nWrite = 0; - int bCkpt = 0; + int rc = LSM_OK; /* Return code */ + int nWrite = 0; /* Number of pages written */ assert( nMerge>=1 ); - do { - int nThis = 0; - bCkpt = 0; - rc = doLsmSingleWork(pDb, 0, nMerge, nPage-nWrite, &nThis, &bCkpt); - nWrite += nThis; - if( rc==LSM_OK && bCkpt ){ - rc = lsm_checkpoint(pDb, 0); - } - }while( rc==LSM_OK && (nWrite0 ){ + int bCkpt = 0; + do { + int nThis = 0; + bCkpt = 0; + rc = 
doLsmSingleWork(pDb, 0, nMerge, nPage-nWrite, &nThis, &bCkpt); + nWrite += nThis; + if( rc==LSM_OK && bCkpt ){ + rc = lsm_checkpoint(pDb, 0); + } + }while( rc==LSM_OK && (nWritepFS, iRef, &pRef); - aKey = pageGetKey(pRef, 0, &iTopic, &nKey, &blob); + lsmFsDbPageGet(pDb->pFS, pRun, iRef, &pRef); + aKey = pageGetKey(pRun, pRef, 0, &iTopic, &nKey, &blob); }else{ aCell += lsmVarintGet32(aCell, &nKey); if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal); - sortedReadData(pPg, (aCell-aData), nKey+nVal, (void **)&aKey, &blob); + sortedReadData(0, pPg, (aCell-aData), nKey+nVal, (void **)&aKey, &blob); aVal = &aKey[nKey]; iTopic = eType; } lsmStringAppendf(&s, "%s%2X:", (i==0?"":" "), iTopic); @@ -5053,11 +5474,12 @@ sortedBlobFree(&blob); } static void infoCellDump( - lsm_db *pDb, + lsm_db *pDb, /* Database handle */ + Segment *pSeg, /* Segment page belongs to */ int bIndirect, /* True to follow indirect refs */ Page *pPg, int iCell, int *peType, int *piPgPtr, @@ -5082,12 +5504,12 @@ if( eType==0 ){ int dummy; Pgno iRef; /* Page number of referenced page */ aCell += lsmVarintGet64(aCell, &iRef); if( bIndirect ){ - lsmFsDbPageGet(pDb->pFS, iRef, &pRef); - pageGetKeyCopy(pDb->pEnv, pRef, 0, &dummy, pBlob); + lsmFsDbPageGet(pDb->pFS, pSeg, iRef, &pRef); + pageGetKeyCopy(pDb->pEnv, pSeg, pRef, 0, &dummy, pBlob); aKey = (u8 *)pBlob->pData; nKey = pBlob->nData; lsmFsPageRelease(pRef); }else{ aKey = (u8 *)""; @@ -5094,11 +5516,11 @@ nKey = 11; } }else{ aCell += lsmVarintGet32(aCell, &nKey); if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal); - sortedReadData(pPg, (aCell-aData), nKey+nVal, (void **)&aKey, pBlob); + sortedReadData(pSeg, pPg, (aCell-aData), nKey+nVal, (void **)&aKey, pBlob); aVal = &aKey[nKey]; } if( peType ) *peType = eType; if( piPgPtr ) *piPgPtr = iPgPtr; @@ -5133,20 +5555,42 @@ ){ int rc = LSM_OK; /* Return code */ Page *pPg = 0; /* Handle for page iPg */ int i, j; /* Loop counters */ const int perLine = 16; /* Bytes per line in the raw hex 
dump */ + Segment *pSeg = 0; + Snapshot *pSnap; int bValues = (flags & INFO_PAGE_DUMP_VALUES); int bHex = (flags & INFO_PAGE_DUMP_HEX); int bData = (flags & INFO_PAGE_DUMP_DATA); int bIndirect = (flags & INFO_PAGE_DUMP_INDIRECT); *pzOut = 0; if( iPg==0 ) return LSM_ERROR; - rc = lsmFsDbPageGet(pDb->pFS, iPg, &pPg); + assert( pDb->pClient || pDb->pWorker ); + pSnap = pDb->pClient; + if( pSnap==0 ) pSnap = pDb->pWorker; + if( pSnap->redirect.n>0 ){ + Level *pLvl; + int bUse = 0; + for(pLvl=pSnap->pLevel; pLvl->pNext; pLvl=pLvl->pNext); + pSeg = (pLvl->nRight==0 ? &pLvl->lhs : &pLvl->aRhs[pLvl->nRight-1]); + rc = lsmFsSegmentContainsPg(pDb->pFS, pSeg, iPg, &bUse); + if( bUse==0 ){ + pSeg = 0; + } + } + + /* iPg is a real page number (not subject to redirection). So it is safe + ** to pass a NULL in place of the segment pointer as the second argument + ** to lsmFsDbPageGet() here. */ + if( rc==LSM_OK ){ + rc = lsmFsDbPageGet(pDb->pFS, 0, iPg, &pPg); + } + if( rc==LSM_OK ){ Blob blob = {0, 0, 0, 0}; int nKeyWidth = 0; LsmString str; int nRec; @@ -5167,25 +5611,26 @@ lsmStringAppendf(&str, "flags: %04x\n", flags); lsmStringAppendf(&str, "\n"); for(iCell=0; iCellnKeyWidth ) nKeyWidth = nKey; } if( bHex ) nKeyWidth = nKeyWidth * 2; for(iCell=0; iCellpEnv, pRun, 0); lsmLogMessage(pDb, LSM_OK, "Segment: %s", zSeg); lsmFree(pDb->pEnv, zSeg); - lsmFsDbPageGet(pDb->pFS, pRun->iFirst, &pPg); + lsmFsDbPageGet(pDb->pFS, pRun, pRun->iFirst, &pPg); while( pPg ){ Page *pNext; char *z = 0; infoPageDump(pDb, lsmFsPageNumber(pPg), flags, &z); lsmLogMessage(pDb, LSM_OK, "%s", z); @@ -5294,18 +5739,26 @@ int bVals, /* Output the values from each segment */ const char *zWhy /* Caption to print near top of dump */ ){ Snapshot *pDump = pSnap; Level *pTopLevel; + char *zFree = 0; + assert( pSnap ); pTopLevel = lsmDbSnapshotLevel(pDump); if( pDb->xLog && pTopLevel ){ + static int nCall = 0; Level *pLevel; int iLevel = 0; - lsmLogMessage(pDb, LSM_OK, "Database structure (%s)", zWhy); + nCall++; 
+ lsmLogMessage(pDb, LSM_OK, "Database structure %d (%s)", nCall, zWhy); + +#if 0 + if( nCall==1031 || nCall==1032 ) bKeys=1; +#endif for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){ char zLeft[1024]; char zRight[1024]; int i = 0; @@ -5372,10 +5825,16 @@ sortedDumpSegment(pDb, &pLevel->aRhs[i], bVals); } } } } + + lsmInfoFreelist(pDb, &zFree); + lsmLogMessage(pDb, LSM_OK, "Freelist: %s", zFree); + lsmFree(pDb->pEnv, zFree); + + assert( lsmFsIntegrityCheck(pDb) ); } void lsmSortedFreeLevel(lsm_env *pEnv, Level *pLevel){ Level *pNext; Level *p; @@ -5410,11 +5869,11 @@ static void assertRunInOrder(lsm_db *pDb, Segment *pSeg){ Page *pPg = 0; Blob blob1 = {0, 0, 0, 0}; Blob blob2 = {0, 0, 0, 0}; - lsmFsDbPageGet(pDb->pFS, pSeg->iFirst, &pPg); + lsmFsDbPageGet(pDb->pFS, pSeg, pSeg->iFirst, &pPg); while( pPg ){ u8 *aData; int nData; Page *pNext; aData = lsmFsPageData(pPg, &nData); @@ -5421,21 +5880,21 @@ if( 0==(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) ){ int i; int nRec = pageGetNRec(aData, nData); for(i=0; ipEnv, pPg, i, &iTopic1, &blob1); + pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i, &iTopic1, &blob1); if( i==0 && blob2.nData ){ assert( sortedKeyCompare( pDb->xCmp, iTopic2, blob2.pData, blob2.nData, iTopic1, blob1.pData, blob1.nData )<0 ); } if( i<(nRec-1) ){ - pageGetKeyCopy(pDb->pEnv, pPg, i+1, &iTopic2, &blob2); + pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i+1, &iTopic2, &blob2); assert( sortedKeyCompare( pDb->xCmp, iTopic1, blob1.pData, blob1.nData, iTopic2, blob2.pData, blob2.nData )<0 ); } @@ -5565,11 +6024,11 @@ rc = btreeCursorNew(pDb, pSeg, &pCsr); if( rc==LSM_OK ){ rc = btreeCursorFirst(pCsr); } if( rc==LSM_OK ){ - rc = lsmFsDbPageGet(pFS, pSeg->iFirst, &pPg); + rc = lsmFsDbPageGet(pFS, pSeg, pSeg->iFirst, &pPg); } while( rc==LSM_OK ){ Page *pNext; u8 *aData; @@ -5587,11 +6046,11 @@ && 0!=pageGetNRec(aData, nData) ){ u8 *pKey; int nKey; int iTopic; - pKey = pageGetKey(pPg, 0, &iTopic, &nKey, &blob); + pKey = pageGetKey(pSeg, pPg, 0, &iTopic, &nKey, &blob); 
assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) ); assert( lsmFsPageNumber(pPg)==pCsr->iPtr ); rc = btreeCursorNext(pCsr); } } Index: tool/lsmview.tcl ================================================================== --- tool/lsmview.tcl +++ tool/lsmview.tcl @@ -41,11 +41,11 @@ } namespace import ::autoscroll::* ############################################################################# proc exec_lsmtest_show {args} { - set fd [open [list |lsmtest show {*}$args]] + set fd [open [list |lsmtest show {*}$args 2>/dev/null]] set res "" while {![eof $fd]} { set line [gets $fd] if {[regexp {^\#.*} $line]} continue if {[regexp {^Leaked*} $line]} continue