Index: lsm-test/lsmtest_main.c ================================================================== --- lsm-test/lsmtest_main.c +++ lsm-test/lsmtest_main.c @@ -183,11 +183,11 @@ ScanResult *p = (ScanResult *)pCtx; u8 *aKey = (u8 *)pKey; u8 *aVal = (u8 *)pVal; int i; - if( test_scan_debug ) printf("%s ", (char *)pKey); + if( test_scan_debug ) printf("%.20s\n", (char *)pKey); #if 0 /* Check tdb_fetch() matches */ int rc = 0; testFetch(p->pDb, pKey, nKey, pVal, nVal, &rc); @@ -264,13 +264,13 @@ res2.nKey1 = nKey1; res2.pKey1 = pKey1; res2.nKey2 = nKey2; res2.pKey2 = pKey2; res2.bReverse = bReverse; tdb_scan(pDb1, pRes1, bReverse, pKey1, nKey1, pKey2, nKey2, scanCompareCb); -if( test_scan_debug ) printf("\n"); +if( test_scan_debug ) printf("\n\n\n"); tdb_scan(pDb2, pRes2, bReverse, pKey1, nKey1, pKey2, nKey2, scanCompareCb); -if( test_scan_debug ) printf("\n"); +if( test_scan_debug ) printf("\n\n\n"); if( res1.nRow!=res2.nRow || res1.cksum1!=res2.cksum1 || res1.cksum2!=res2.cksum2 ){ @@ -465,10 +465,19 @@ } do_crash_test(zPattern, &rc); return rc; } + +static lsm_db *configure_lsm_db(TestDb *pDb){ + lsm_db *pLsm; + pLsm = tdb_lsm(pDb); + if( pLsm ){ + tdb_lsm_config_str(pDb, "mmap=1 autowork=1 nmerge=4 worker_nmerge=4"); + } + return pLsm; +} int do_speed_tests(int nArg, char **azArg){ struct DbSystem { const char *zLibrary; @@ -592,23 +601,12 @@ testCaseBegin(&rc, 0, "speed.insert.%s", aSys[j].zLibrary); rc = tdb_open(aSys[j].zLibrary, 0, 1, &pDb); if( rc ) return rc; - pLsm = tdb_lsm(pDb); - - if( pLsm ){ - int bMmap = 0; - int nLimit = 2 * 1024 * 1024; - int eSafety = 1; - int bUseLog = 1; - - lsm_config(pLsm, LSM_CONFIG_WRITE_BUFFER, &nLimit); - lsm_config(pLsm, LSM_CONFIG_SAFETY, &eSafety); - lsm_config(pLsm, LSM_CONFIG_MMAP, &bMmap); - lsm_config(pLsm, LSM_CONFIG_USE_LOG, &bUseLog); - } + + pLsm = configure_lsm_db(pDb); testTimeInit(); for(i=0; i1 ){ - testPrintError("Usage: insert ?DATABASE?\n"); + if( nArg>2 ){ + testPrintError("Usage: insert ?DATABASE? ?LSM-CONFIG?\n"); return 1; } - if( nArg==1 ){ - zDb = azArg[0]; - } + if( nArg==1 ){ zDb = azArg[0]; } + if( nArg==2 ){ zConfig = azArg[1]; } testMallocUninstall(tdb_lsm_env()); rc = tdb_open(zDb, 0, 1, &pDb); if( rc!=0 ){ testPrintError("Error opening db \"%s\": %d\n", zDb, rc); }else{ - InsertWriteHook hook; memset(&hook, 0, sizeof(hook)); hook.pOut = fopen("writelog.txt", "w"); pData = testDatasourceNew(&defn); tdb_lsm_config_work_hook(pDb, do_insert_work_hook, 0); tdb_lsm_write_hook(pDb, do_insert_write_hook, (void *)&hook); - - for(i=0; iworker_mutex); - pWorker = p->pWorker; - pthread_mutex_unlock(&p->worker_mutex); - - while( pWorker ){ + while( (pWorker = p->pWorker) ){ int nWrite = 0; int rc; /* Do some work. If an error occurs, exit. */ + pthread_mutex_unlock(&p->worker_mutex); rc = lsm_work(pWorker, p->lsm_work_flags, p->lsm_work_npage, &nWrite); + pthread_mutex_lock(&p->worker_mutex); if( rc!=LSM_OK ){ p->worker_rc = rc; break; } @@ -738,19 +737,17 @@ ** to do at this point, wait on the condition variable. The thread will ** wake up when it is signaled either because the client thread has ** flushed an in-memory tree into the db file or when the connection ** is being closed. */ if( nWrite==0 ){ - pthread_mutex_lock(&p->worker_mutex); if( p->pWorker && p->bDoWork==0 ){ pthread_cond_wait(&p->worker_cond, &p->worker_mutex); } p->bDoWork = 0; - pWorker = p->pWorker; - pthread_mutex_unlock(&p->worker_mutex); } } + pthread_mutex_unlock(&p->worker_mutex); return 0; } @@ -883,16 +880,16 @@ pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker); memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker); pDb->nWorker = nWorker; rc = mt_start_worker(pDb, 0, zFilename, LSM_WORK_CHECKPOINT, - nWorker==1 ? 32 : 0 + nWorker==1 ? 512 : 0 ); } if( rc==0 && nWorker==2 ){ - rc = mt_start_worker(pDb, 1, zFilename, 0, 32); + rc = mt_start_worker(pDb, 1, zFilename, 0, 512); } return rc; } @@ -901,10 +898,95 @@ } int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb){ return test_lsm_mt(zFilename, 2, bClear, ppDb); } + +int test_lsm_config_str( + lsm_db *pDb, + int bWorker, + const char *zStr +){ + + struct CfgParam { + const char *zParam; + int bWorker; + int eParam; + } aParam[] = { + { "write_buffer", 0, LSM_CONFIG_WRITE_BUFFER }, + { "page_size", 0, LSM_CONFIG_PAGE_SIZE }, + { "safety", 0, LSM_CONFIG_SAFETY }, + { "autowork", 0, LSM_CONFIG_AUTOWORK }, + { "log_size", 0, LSM_CONFIG_LOG_SIZE }, + { "mmap", 0, LSM_CONFIG_MMAP }, + { "use_log", 0, LSM_CONFIG_USE_LOG }, + { "nmerge", 0, LSM_CONFIG_NMERGE }, + { "worker_nmerge", 1, LSM_CONFIG_NMERGE }, + { 0, 0 } + }; + char *z = zStr; + + while( z[0] && pDb ){ + char *zStart; + + /* Skip whitespace */ + while( *z==' ' ) z++; + zStart = z; + + while( *z && *z!='=' ) z++; + if( *z ){ + int eParam; + int i; + int iVal; + int rc; + char zParam[32]; + int nParam = z-zStart; + if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error; + + memcpy(zParam, zStart, nParam); + zParam[nParam] = '\0'; + rc = testArgSelect(aParam, "param", zParam, &i); + if( rc!=0 ) return rc; + eParam = aParam[i].eParam; + + z++; + zStart = z; + while( *z>='0' && *z<='9' ) z++; + nParam = z-zStart; + if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error; + memcpy(zParam, zStart, nParam); + zParam[nParam] = '\0'; + iVal = atoi(zParam); + + if( bWorker || aParam[i].bWorker==0 ){ + lsm_config(pDb, eParam, &iVal); + } + }else if( z!=zStart ){ + goto syntax_error; + } + } + + return 0; + syntax_error: + testPrintError("syntax error at: \"%s\"\n", z); + return 1; +} + +int tdb_lsm_config_str(TestDb *pDb, const char *zStr){ + int rc = 0; + if( tdb_lsm(pDb) ){ + int i; + LsmDb *pLsm = (LsmDb *)pDb; + + rc = test_lsm_config_str(pLsm->db, 0, zStr); + for(i=0; rc==0 && inWorker; i++){ + rc = test_lsm_config_str(pLsm->aWorker[i].pWorker, 1, zStr); + } + } + return rc; +} + #else static void mt_shutdown(LsmDb *pDb) { unused_parameter(pDb); } Index: src/lsm.h ================================================================== --- src/lsm.h +++ src/lsm.h @@ -161,19 +161,24 @@ ** database file. False otherwise. ** ** LSM_CONFIG_USE_LOG ** A read/write boolean parameter. True (the default) to use the log ** file normally. False otherwise. +** +** LSM_CONFIG_NMERGE +** A read/write integer parameter. The minimum number of segments to +** merge together at a time. Default value 4. */ #define LSM_CONFIG_WRITE_BUFFER 1 #define LSM_CONFIG_PAGE_SIZE 2 #define LSM_CONFIG_SAFETY 3 #define LSM_CONFIG_BLOCK_SIZE 4 #define LSM_CONFIG_AUTOWORK 5 #define LSM_CONFIG_LOG_SIZE 6 #define LSM_CONFIG_MMAP 7 #define LSM_CONFIG_USE_LOG 8 +#define LSM_CONFIG_NMERGE 9 #define LSM_SAFETY_OFF 0 #define LSM_SAFETY_NORMAL 1 #define LSM_SAFETY_FULL 2 Index: src/lsmInt.h ================================================================== --- src/lsmInt.h +++ src/lsmInt.h @@ -44,10 +44,11 @@ #define LSM_BLOCK_SIZE (2 * 1024 * 1024) #define LSM_TREE_BYTES (2 * 1024 * 1024) #define LSM_ECOLA 4 #define LSM_DEFAULT_LOG_SIZE (128*1024) +#define LSM_DEFAULT_NMERGE 4 /* Places where a NULL needs to be changed to a real lsm_env pointer ** are marked with NEED_ENV */ #define NEED_ENV ((lsm_env*)0) @@ -69,11 +70,10 @@ typedef struct MultiCursor MultiCursor; typedef struct Page Page; typedef struct Segment Segment; typedef struct SegmentMerger SegmentMerger; typedef struct Snapshot Snapshot; -typedef struct SortedRun SortedRun; typedef struct TransMark TransMark; typedef struct Tree Tree; typedef struct TreeMark TreeMark; typedef struct TreeVersion TreeVersion; typedef struct TreeCursor TreeCursor; @@ -170,10 +170,11 @@ int (*xCmp)(void *, int, void *, int); /* Compare function */ int nTreeLimit; /* Maximum size of in-memory tree in bytes */ int bAutowork; /* True to do auto-work after writing */ int eSafety; /* LSM_SAFETY_OFF, NORMAL or FULL */ + int nMerge; /* Configured by LSM_CONFIG_NMERGE */ int nLogSz; /* Configured by LSM_CONFIG_LOG_SIZE */ int bUseLog; /* Configured by LSM_CONFIG_USE_LOG */ int nDfltPgsz; /* Configured by LSM_CONFIG_PAGE_SIZE */ int nDfltBlksz; /* Configured by LSM_CONFIG_BLOCK_SIZE */ @@ -200,22 +201,17 @@ /* Work done notification callback */ void (*xWork)(lsm_db *, void *); void *pWorkCtx; }; -struct SortedRun { +struct Segment { int iFirst; /* First page of this run */ int iLast; /* Last page of this run */ Pgno iRoot; /* Root page number (if any) */ int nSize; /* Size of this run in pages */ }; -struct Segment { - SortedRun run; /* Main array */ - SortedRun sep; /* If sep.iFirst!=0, the separators array */ -}; - /* ** iSplitTopic/pSplitKey/nSplitKey: ** If nRight>0, this buffer contains a copy of the largest key that has ** already been written to the left-hand-side of the level. */ @@ -240,22 +236,20 @@ ** access to the associated Level struct. ** ** bHierReadonly: ** True if the b-tree hierarchy is currently read-only. ** -** aiOutputOff: -** The byte offset to write to next within the last page of the output -** segments main run (aiOutputOff[0]) or separators run (aiOutputOff[1]). -** If either page is read-only, then the associated aiOutputOff[] entry -** is set to a negative value. +** iOutputOff: +** The byte offset to write to next within the last page of the +** output segment. */ struct Merge { int nInput; /* Number of input runs being merged */ MergeInput *aInput; /* Array nInput entries in size */ int nSkip; /* Number of separators entries to skip */ - int aiOutputOff[2]; /* Write offsets on run output pages */ - int bHierReadonly; /* True if b-tree heirarchy is read-only */ + int iOutputOff; /* Write offset on output page */ + int bHierReadonly; /* True if b-tree heirarchies are read-only */ }; struct MergeInput { Pgno iPg; /* Page on which next input is stored */ int iCell; /* Cell containing next input to merge */ }; @@ -369,30 +363,29 @@ void lsmFsSetPageSize(FileSystem *, int); int lsmFsFileid(lsm_db *pDb, void **ppId, int *pnId); /* Creating, populating, gobbling and deleting sorted runs. */ -int lsmFsPhantom(FileSystem *, SortedRun *); -void lsmFsPhantomFree(FileSystem *pFS); -void lsmFsGobble(Snapshot *, SortedRun *, Page *); -int lsmFsSortedDelete(FileSystem *, Snapshot *, int, SortedRun *); -int lsmFsSortedFinish(FileSystem *, SortedRun *); -int lsmFsSortedAppend(FileSystem *, Snapshot *, SortedRun *, Page **); -int lsmFsPhantomMaterialize(FileSystem *, Snapshot *, SortedRun *); +void lsmFsGobble(Snapshot *, Segment *, Page *); +int lsmFsSortedDelete(FileSystem *, Snapshot *, int, Segment *); +int lsmFsSortedFinish(FileSystem *, Segment *); +int lsmFsSortedAppend(FileSystem *, Snapshot *, Segment *, Page **); +int lsmFsPhantomMaterialize(FileSystem *, Snapshot *, Segment *); /* Functions to retrieve the lsm_env pointer from a FileSystem or Page object */ lsm_env *lsmFsEnv(FileSystem *); lsm_env *lsmPageEnv(Page *); +FileSystem *lsmPageFS(Page *); int lsmFsSectorSize(FileSystem *); void lsmSortedSplitkey(lsm_db *, Level *, int *); int lsmFsSetupAppendList(lsm_db *db); /* Reading sorted run content. */ int lsmFsDbPageGet(FileSystem *, Pgno, Page **); -int lsmFsDbPageNext(SortedRun *, Page *, int eDir, Page **); +int lsmFsDbPageNext(Segment *, Page *, int eDir, Page **); int lsmFsPageWrite(Page *); u8 *lsmFsPageData(Page *, int *); int lsmFsPageRelease(Page *); int lsmFsPagePersist(Page *); @@ -437,10 +430,12 @@ */ int lsmInfoPageDump(lsm_db *, Pgno, int, char **); int lsmSortedFlushTree(lsm_db *, int *); void lsmSortedCleanup(lsm_db *); int lsmSortedAutoWork(lsm_db *, int nUnit); + +void lsmSortedRemap(lsm_db *pDb); void lsmSortedFreeLevel(lsm_env *pEnv, Level *); int lsmSortedFlushDb(lsm_db *); int lsmSortedAdvanceAll(lsm_db *pDb); @@ -468,11 +463,10 @@ int lsmSaveCursors(lsm_db *pDb); int lsmRestoreCursors(lsm_db *pDb); void lsmSortedDumpStructure(lsm_db *pDb, Snapshot *, int, int, const char *); -void lsmFsDumpBlockmap(lsm_db *, SortedRun *); void lsmFsDumpBlocklists(lsm_db *); void lsmPutU32(u8 *, u32); u32 lsmGetU32(u8 *); Index: src/lsm_ckpt.c ================================================================== --- src/lsm_ckpt.c +++ src/lsm_ckpt.c @@ -64,16 +64,14 @@ ** 1. Checksum value 1. ** 2. Checksum value 2. ** ** In the above, a segment record is: ** -** 1. First page of main array, -** 2. Last page of main array, -** 3. Size of main array in pages, -** 4. First page of separators array (or 0), -** 5. Last page of separators array (or 0), -** 6. Root page of separators array (or 0). +** 1. First page of array, +** 2. Last page of array, +** 3. Root page of array (or 0), +** 4. Size of array in pages, */ /* ** OVERSIZED CHECKPOINT BLOBS: ** @@ -221,22 +219,14 @@ int *piOut, int *pRc ){ int iOut = *piOut; - ckptSetValue(p, iOut++, pSeg->run.iFirst, pRc); - ckptSetValue(p, iOut++, pSeg->run.iLast, pRc); - ckptSetValue(p, iOut++, pSeg->run.nSize, pRc); - if( segmentHasSeparators(pSeg) ){ - ckptSetValue(p, iOut++, pSeg->sep.iFirst, pRc); - ckptSetValue(p, iOut++, pSeg->sep.iLast, pRc); - ckptSetValue(p, iOut++, pSeg->sep.iRoot, pRc); - }else{ - ckptSetValue(p, iOut++, 0, pRc); - ckptSetValue(p, iOut++, 0, pRc); - ckptSetValue(p, iOut++, 0, pRc); - } + ckptSetValue(p, iOut++, pSeg->iFirst, pRc); + ckptSetValue(p, iOut++, pSeg->iLast, pRc); + ckptSetValue(p, iOut++, pSeg->iRoot, pRc); + ckptSetValue(p, iOut++, pSeg->nSize, pRc); *piOut = iOut; } static void ckptExportLevel( @@ -385,22 +375,16 @@ int *piIn, Segment *pSegment /* Populate this structure */ ){ int iIn = *piIn; - assert( pSegment->run.iFirst==0 && pSegment->run.iLast==0 ); - assert( pSegment->run.nSize==0 && pSegment->run.iRoot==0 ); - assert( pSegment->sep.iFirst==0 && pSegment->sep.iLast==0 ); - assert( pSegment->sep.nSize==0 && pSegment->sep.iRoot==0 ); - - pSegment->run.iFirst = aIn[iIn++]; - pSegment->run.iLast = aIn[iIn++]; - pSegment->run.nSize = aIn[iIn++]; - pSegment->sep.iFirst = aIn[iIn++]; - pSegment->sep.iLast = aIn[iIn++]; - pSegment->sep.iRoot = aIn[iIn++]; - if( pSegment->sep.iFirst ) pSegment->sep.nSize = 1; + assert( pSegment->iFirst==0 && pSegment->iLast==0 ); + assert( pSegment->nSize==0 && pSegment->iRoot==0 ); + pSegment->iFirst = aIn[iIn++]; + pSegment->iLast = aIn[iIn++]; + pSegment->iRoot = aIn[iIn++]; + pSegment->nSize = aIn[iIn++]; *piIn = iIn; } static int ckptSetupMerge(lsm_db *pDb, u32 *aInt, int *piIn, Level *pLevel){ @@ -418,14 +402,13 @@ pLevel->pMerge = pMerge; /* Populate the Merge object. */ pMerge->aInput = (MergeInput *)&pMerge[1]; pMerge->nInput = nInput; - pMerge->aiOutputOff[0] = -1; - pMerge->aiOutputOff[1] = -1; - pMerge->nSkip = (int)aInt[iIn++]; + pMerge->iOutputOff = -1; pMerge->bHierReadonly = 1; + pMerge->nSkip = (int)aInt[iIn++]; for(i=0; iaInput[i].iPg = (Pgno)aInt[iIn++]; pMerge->aInput[i].iCell = (int)aInt[iIn++]; } @@ -496,11 +479,10 @@ static int ckptImport(lsm_db *pDb, void *pCkpt, int nInt, int *pRc){ int ret = 0; if( *pRc==LSM_OK ){ Snapshot *pSnap = pDb->pWorker; - FileSystem *pFS = pDb->pFS; u32 cksum[2] = {0, 0}; u32 *aInt = (u32 *)pCkpt; lsmChecksumBytes((u8 *)aInt, sizeof(u32)*(nInt-2), 0, cksum); if( LSM_LITTLE_ENDIAN ){ @@ -525,11 +507,11 @@ lsmDbSetPagesize(pDb,(int)aInt[CKPT_HDR_PGSZ],(int)aInt[CKPT_HDR_BLKSZ]); /* Import log offset */ ckptImportLog(aInt, &iIn, lsmDatabaseLog(pDb)); - /* Import each level. This loop runs once for each db level. */ + /* Import all levels stored in the checkpoint. */ *pRc = ckptLoadLevels(pDb, aInt, &iIn, nLevel, &pTopLevel); lsmDbSnapshotSetLevel(pSnap, pTopLevel); /* Import the freelist delta */ aDelta = lsmFreelistDeltaPtr(pDb); @@ -683,11 +665,11 @@ int *pnHdrLevel, /* OUT: Levels to write to db header */ void **paVal, /* OUT: Pointer to LEVELS blob */ int *pnVal /* OUT: Size of LEVELS blob in bytes */ ){ int rc = LSM_OK; /* Return code */ - const int SEGMENT_SIZE = 6; /* Size of a checkpoint segment record */ + const int SEGMENT_SIZE = 4; /* Size of a checkpoint segment record */ Level *p; /* Used to iterate through levels */ int nFree; /* Free integers remaining in db header */ int nHdr = 0; /* Number of levels stored in db header */ int nLevels = 0; /* Number of levels stored in LEVELS */ Index: src/lsm_file.c ================================================================== --- src/lsm_file.c +++ src/lsm_file.c @@ -88,46 +88,10 @@ #include #include #include -/* -** A "phantom" run under construction. -** -** Phantom runs are constructed entirely in memory, then written out to -** disk. This is distinct from normal runs, for which each page is written -** to disk as soon as it is completely populated. -** -** They are used when the in-memory tree is flushed to disk. In this case, -** the main run is written directly to disk and the separators run -** accumulated in memory as a phantom run and flushed to disk after the main -** run is completed. This allows the separators run to immediately follow -** the main run in the file - making the entire flush operation a single -** contiguous write. -** -** Before they are flushed to disk, the pages of phantom runs do not have -** page numbers. This means it is not possible to obtain pointers to them. -** In practice, this means that when creating a separators run, a phantom -** run is used to accumulate and write all leaf pages to disk, then a -** second pass is made to populate and append the b-tree hierarchy pages. -*/ -typedef struct PhantomRun PhantomRun; -struct PhantomRun { - SortedRun *pRun; /* Accompanying SortedRun object */ - int nPhantom; /* Number of pages in run */ - int bRunFinished; /* True if the associated run is finished */ - Page *pFirst; /* First page in phantom run */ - Page *pLast; /* Current last page in phantom run */ -}; - -/* -** Maximum number of pages allowed to accumulate in memory when constructing -** a phantom run. If this limit is exceeded, the phantom run is flushed to -** disk even if it is not finished. -*/ -#define FS_MAX_PHANTOM_PAGES 32 - /* ** File-system object. Each database connection allocates a single instance ** of the following structure. It is used for all access to the database and ** log files. @@ -150,11 +114,10 @@ lsm_env *pEnv; /* Environment pointer */ char *zDb; /* Database file name */ int nMetasize; /* Size of meta pages in bytes */ int nPagesize; /* Database page-size in bytes */ int nBlocksize; /* Database block-size in bytes */ - PhantomRun phantom; /* Phantom run currently under construction */ /* r/w file descriptors for both files. */ lsm_file *fdDb; /* Database file */ lsm_file *fdLog; /* Log file */ @@ -180,10 +143,11 @@ /* ** Database page handle. */ struct Page { u8 *aData; /* Buffer containing page data */ + int nData; /* Bytes of usable data at aData[] */ int iPg; /* Page number */ int nRef; /* Number of outstanding references */ int flags; /* Combination of PAGE_XXX flags */ Page *pHashNext; /* Next page in hash table slot */ Page *pLruNext; /* Next page in LRU list */ @@ -213,15 +177,10 @@ ** Number of pgsz byte pages omitted from the start of block 1. The start ** of block 1 contains two 4096 byte meta pages (8192 bytes in total). */ #define BLOCK1_HDR_SIZE(pgsz) LSM_MAX(1, 8192/(pgsz)) -/* -** Return true if the SortedRun passed as the second argument is a phantom -** run currently being constructed by FileSystem object pFS. -*/ -#define isPhantom(pFS, pSorted) ((pSorted) && (pFS)->phantom.pRun==(pSorted)) /* ** Wrappers around the VFS methods of the lsm_env object: ** ** lsmEnvOpen() @@ -415,11 +374,10 @@ if( pFS ){ Page *pPg; lsm_env *pEnv = pFS->pEnv; assert( pFS->nOut==0 ); - pPg = pFS->pLruFirst; while( pPg ){ Page *pNext = pPg->pLruNext; if( pPg->flags & PAGE_FREE ) lsmFree(pEnv, pPg->aData); lsmFree(pEnv, pPg); @@ -598,10 +556,13 @@ pFS->pLruFirst = pPg; } pFS->pLruLast = pPg; } +/* +** Remove page pPg from the hash table. +*/ static void fsPageRemoveFromHash(FileSystem *pFS, Page *pPg){ int iHash; Page **pp; iHash = fsHashKey(pFS->nHash, pPg->iPg); @@ -670,10 +631,12 @@ if( rc==LSM_OK ){ u8 *aData = (u8 *)pFS->pMap; for(pFix=pFS->pLruFirst; pFix; pFix=pFix->pLruNext){ pFix->aData = &aData[pFS->nPagesize * (i64)(pFix->iPg-1)]; } + + lsmSortedRemap(pFS->pDb); } *pRc = rc; } } @@ -740,10 +703,11 @@ /* If the xRead() call was successful (or not attempted), link the ** page into the page-cache hash-table. Otherwise, if it failed, ** free the buffer. */ if( rc==LSM_OK ){ p->pHashNext = pFS->apHash[iHash]; + p->nData = pFS->nPagesize - (p->flags & PAGE_SHORT); pFS->apHash[iHash] = p; }else{ fsPageBufferFree(p); p = 0; } @@ -777,12 +741,12 @@ } return rc; } static int fsRunEndsBetween( - SortedRun *pRun, - SortedRun *pIgnore, + Segment *pRun, + Segment *pIgnore, int iFirst, int iLast ){ return (pRun!=pIgnore && ( (pRun->iFirst>=iFirst && pRun->iFirst<=iLast) @@ -790,25 +754,21 @@ )); } static int fsLevelEndsBetween( Level *pLevel, - SortedRun *pIgnore, + Segment *pIgnore, int iFirst, int iLast ){ int i; - if( fsRunEndsBetween(&pLevel->lhs.run, pIgnore, iFirst, iLast) - || fsRunEndsBetween(&pLevel->lhs.sep, pIgnore, iFirst, iLast) - ){ + if( fsRunEndsBetween(&pLevel->lhs, pIgnore, iFirst, iLast) ){ return 1; } for(i=0; inRight; i++){ - if( fsRunEndsBetween(&pLevel->aRhs[i].run, pIgnore, iFirst, iLast) - || fsRunEndsBetween(&pLevel->aRhs[i].sep, pIgnore, iFirst, iLast) - ){ + if( fsRunEndsBetween(&pLevel->aRhs[i], pIgnore, iFirst, iLast) ){ return 1; } } return 0; @@ -815,11 +775,11 @@ } static int fsFreeBlock( FileSystem *pFS, Snapshot *pSnapshot, - SortedRun *pIgnore, /* Ignore this run when searching */ + Segment *pIgnore, /* Ignore this run when searching */ int iBlk ){ int rc = LSM_OK; /* Return code */ int iFirst; /* First page on block iBlk */ int iLast; /* Last page on block iBlk */ @@ -858,12 +818,12 @@ ** Delete or otherwise recycle the blocks currently occupied by run pDel. */ int lsmFsSortedDelete( FileSystem *pFS, Snapshot *pSnapshot, - int bZero, /* True to zero the SortedRun structure */ - SortedRun *pDel + int bZero, /* True to zero the Segment structure */ + Segment *pDel ){ if( pDel->iFirst ){ int rc = LSM_OK; int iBlk; @@ -882,11 +842,11 @@ } rc = fsFreeBlock(pFS, pSnapshot, pDel, iBlk); iBlk = iNext; } - if( bZero ) memset(pDel, 0, sizeof(SortedRun)); + if( bZero ) memset(pDel, 0, sizeof(Segment)); } return LSM_OK; } /* @@ -895,11 +855,11 @@ ** page is now the first page in its sorted file - all previous pages may ** be considered free. */ void lsmFsGobble( Snapshot *pSnapshot, - SortedRun *pRun, + Segment *pRun, Page *pPg ){ FileSystem *pFS = pPg->pFS; if( pPg->iPg!=pRun->iFirst ){ @@ -944,11 +904,11 @@ ** occurs, *ppNext is set to NULL and and lsm error code returned. ** ** Page references returned by this function should be released by the ** caller using lsmFsPageRelease(). */ -int lsmFsDbPageNext(SortedRun *pRun, Page *pPg, int eDir, Page **ppNext){ +int lsmFsDbPageNext(Segment *pRun, Page *pPg, int eDir, Page **ppNext){ FileSystem *pFS = pPg->pFS; int iPg = pPg->iPg; assert( eDir==1 || eDir==-1 ); @@ -1057,103 +1017,27 @@ for(pLvl=lsmDbSnapshotLevel(db->pWorker); rc==LSM_OK && pLvl; pLvl=pLvl->pNext ){ if( pLvl->nRight==0 ){ - addAppendPoint(db, pLvl->lhs.sep.iLast, &rc); - addAppendPoint(db, pLvl->lhs.run.iLast, &rc); + addAppendPoint(db, pLvl->lhs.iLast, &rc); }else{ int i; for(i=0; inRight; i++){ - addAppendPoint(db, pLvl->aRhs[i].sep.iLast, &rc); - addAppendPoint(db, pLvl->aRhs[i].run.iLast, &rc); + addAppendPoint(db, pLvl->aRhs[i].iLast, &rc); } } } for(pLvl=lsmDbSnapshotLevel(db->pWorker); pLvl; pLvl=pLvl->pNext){ int i; - subAppendPoint(db, pLvl->lhs.sep.iFirst); - subAppendPoint(db, pLvl->lhs.run.iFirst); + subAppendPoint(db, pLvl->lhs.iFirst); for(i=0; inRight; i++){ - subAppendPoint(db, pLvl->aRhs[i].sep.iFirst); - subAppendPoint(db, pLvl->aRhs[i].run.iFirst); - } - } - - return rc; -} - -int lsmFsPhantom(FileSystem *pFS, SortedRun *pRun){ - assert( pFS->phantom.pRun==0 ); - pFS->phantom.pRun = pRun; - return LSM_OK; -} - -void lsmFsPhantomFree(FileSystem *pFS){ - if( pFS->phantom.pRun ){ - Page *pPg; - Page *pNext; - for(pPg=pFS->phantom.pFirst; pPg; pPg=pNext){ - pNext = pPg->pHashNext; - fsPageBufferFree(pPg); - } - memset(&pFS->phantom, 0, sizeof(PhantomRun)); - } -} - -int lsmFsPhantomMaterialize( - FileSystem *pFS, - Snapshot *pSnapshot, - SortedRun *p -){ - int rc = LSM_OK; - if( isPhantom(pFS, p) ){ - PhantomRun *pPhantom = &pFS->phantom; - Page *pPg; - Page *pNext; - int i; - Pgno iFirst = 0; - - /* Search for an existing run in the database that this run can be - ** appended to. See comments surrounding findAppendPoint() for details. */ - iFirst = findAppendPoint(pFS, pPhantom->nPhantom); - - /* If the array can not be written into any partially used block, - ** allocate a new block. The first page of the materialized run will - ** be the second page of the new block (since the first is undersized - ** and can not be used). */ - if( iFirst==0 ){ - int iNew; /* New block */ - lsmBlockAllocate(pFS->pDb, &iNew); - iFirst = fsFirstPageOnBlock(pFS, iNew) + 1; - } - - p->iFirst = iFirst; - p->iLast = iFirst + pPhantom->nPhantom - 1; - assert( 0==fsIsFirst(pFS, p->iFirst) && 0==fsIsLast(pFS, p->iFirst) ); - assert( 0==fsIsFirst(pFS, p->iLast) && 0==fsIsLast(pFS, p->iLast) ); - assert( fsPageToBlock(pFS, p->iFirst)==fsPageToBlock(pFS, p->iLast) ); - - i = iFirst; - for(pPg=pPhantom->pFirst; pPg; pPg=pNext){ - int iHash; - pNext = pPg->pHashNext; - pPg->iPg = i++; - pPg->nRef++; - - iHash = fsHashKey(pFS->nHash, pPg->iPg); - pPg->pHashNext = pFS->apHash[iHash]; - pFS->apHash[iHash] = pPg; - pFS->nOut++; - lsmFsPageRelease(pPg); - } - assert( i==p->iLast+1 ); - - p->nSize = pPhantom->nPhantom; - memset(&pFS->phantom, 0, sizeof(PhantomRun)); - } + subAppendPoint(db, pLvl->aRhs[i].iFirst); + } + } + return rc; } /* ** Append a page to file iFile. Return a reference to it. lsmFsPageWrite() @@ -1160,113 +1044,78 @@ ** has already been called on the returned reference. */ int lsmFsSortedAppend( FileSystem *pFS, Snapshot *pSnapshot, - SortedRun *p, + Segment *p, Page **ppOut ){ int rc = LSM_OK; Page *pPg = 0; *ppOut = 0; - - if( isPhantom(pFS, p) ){ - const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); - int nLimit = (nPagePerBlock - 2 - (fsFirstPageOnBlock(pFS, 1)-1) ); - - if( pFS->phantom.nPhantom>=nLimit ){ - rc = lsmFsPhantomMaterialize(pFS, pSnapshot, p); - if( rc!=LSM_OK ) return rc; - } - } - - if( isPhantom(pFS, p) ){ - rc = fsPageBuffer(pFS, 1, &pPg); - if( rc==LSM_OK ){ - PhantomRun *pPhantom = &pFS->phantom; - pPg->iPg = 0; - pPg->nRef = 1; - pPg->flags |= PAGE_DIRTY; - pPg->pHashNext = 0; - pPg->pLruNext = 0; - pPg->pLruPrev = 0; - pPg->pFS = pFS; - if( pPhantom->pFirst ){ - assert( pPhantom->pLast ); - pPhantom->pLast->pHashNext = pPg; - }else{ - pPhantom->pFirst = pPg; - } - pPhantom->pLast = pPg; - pPhantom->nPhantom++; - } - }else{ - int iApp = 0; - int iNext = 0; - int iPrev = p->iLast; - - if( iPrev==0 ){ - iApp = findAppendPoint(pFS, 0); - }else if( fsIsLast(pFS, iPrev) ){ - Page *pLast = 0; - rc = fsPageGet(pFS, iPrev, 0, &pLast); - if( rc!=LSM_OK ) return rc; - iApp = lsmGetU32(&pLast->aData[pFS->nPagesize-4]); - lsmFsPageRelease(pLast); - }else{ - iApp = iPrev + 1; - } - - /* If this is the first page allocated, or if the page allocated is the - ** last in the block, allocate a new block here. */ - if( iApp==0 || fsIsLast(pFS, iApp) ){ - int iNew; /* New block number */ - - lsmBlockAllocate(pFS->pDb, &iNew); - if( iApp==0 ){ - iApp = fsFirstPageOnBlock(pFS, iNew); - }else{ - iNext = fsFirstPageOnBlock(pFS, iNew); - } - } - - /* Grab the new page. */ - pPg = 0; - rc = fsPageGet(pFS, iApp, 1, &pPg); - assert( rc==LSM_OK || pPg==0 ); - - /* If this is the first or last page of a block, fill in the pointer - ** value at the end of the new page. */ - if( rc==LSM_OK ){ - p->nSize++; - p->iLast = iApp; - if( p->iFirst==0 ) p->iFirst = iApp; - pPg->flags |= PAGE_DIRTY; - - if( fsIsLast(pFS, iApp) ){ - lsmPutU32(&pPg->aData[pFS->nPagesize-4], iNext); - }else - if( fsIsFirst(pFS, iApp) ){ - lsmPutU32(&pPg->aData[pFS->nPagesize-4], iPrev); - } - } + int iApp = 0; + int iNext = 0; + int iPrev = p->iLast; + + if( iPrev==0 ){ + iApp = findAppendPoint(pFS, 0); + }else if( fsIsLast(pFS, iPrev) ){ + Page *pLast = 0; + rc = fsPageGet(pFS, iPrev, 0, &pLast); + if( rc!=LSM_OK ) return rc; + iApp = lsmGetU32(&pLast->aData[pFS->nPagesize-4]); + lsmFsPageRelease(pLast); + }else{ + iApp = iPrev + 1; + } + + /* If this is the first page allocated, or if the page allocated is the + ** last in the block, allocate a new block here. */ + if( iApp==0 || fsIsLast(pFS, iApp) ){ + int iNew; /* New block number */ + + lsmBlockAllocate(pFS->pDb, &iNew); + if( iApp==0 ){ + iApp = fsFirstPageOnBlock(pFS, iNew); + }else{ + iNext = fsFirstPageOnBlock(pFS, iNew); + } + } + + /* Grab the new page. */ + pPg = 0; + rc = fsPageGet(pFS, iApp, 1, &pPg); + assert( rc==LSM_OK || pPg==0 ); + + /* If this is the first or last page of a block, fill in the pointer + ** value at the end of the new page. */ + if( rc==LSM_OK ){ + p->nSize++; + p->iLast = iApp; + if( p->iFirst==0 ) p->iFirst = iApp; + pPg->flags |= PAGE_DIRTY; + + if( fsIsLast(pFS, iApp) ){ + lsmPutU32(&pPg->aData[pFS->nPagesize-4], iNext); + }else + if( fsIsFirst(pFS, iApp) ){ + lsmPutU32(&pPg->aData[pFS->nPagesize-4], iPrev); + } } *ppOut = pPg; return rc; } /* ** Mark the sorted run passed as the second argument as finished. */ -int lsmFsSortedFinish(FileSystem *pFS, SortedRun *p){ +int lsmFsSortedFinish(FileSystem *pFS, Segment *p){ int rc = LSM_OK; if( p ){ const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); - if( pFS->phantom.pRun ) pFS->phantom.bRunFinished = 1; - /* Check if the last page of this run happens to be the last of a block. ** If it is, then an extra block has already been allocated for this run. ** Shift this extra block back to the free-block list. ** ** Otherwise, add the first free page in the last block used by the run @@ -1493,10 +1342,14 @@ ** to which this page belongs. */ lsm_env *lsmPageEnv(Page *pPg) { return pPg->pFS->pEnv; } + +FileSystem *lsmPageFS(Page *pPg){ + return pPg->pFS; +} /* ** Return the sector-size as reported by the log file handle. */ int lsmFsSectorSize(FileSystem *pFS){ @@ -1519,11 +1372,11 @@ } /* ** Helper function for lsmInfoArrayStructure(). */ -static SortedRun *startsWith(SortedRun *pRun, Pgno iFirst){ +static Segment *startsWith(Segment *pRun, Pgno iFirst){ return (iFirst==pRun->iFirst) ? pRun : 0; } /* ** This function implements the lsm_info(LSM_INFO_ARRAY_STRUCTURE) request. @@ -1535,11 +1388,11 @@ */ int lsmInfoArrayStructure(lsm_db *pDb, Pgno iFirst, char **pzOut){ int rc = LSM_OK; Snapshot *pWorker; /* Worker snapshot */ Snapshot *pRelease = 0; /* Snapshot to release */ - SortedRun *pArray = 0; /* Array to report on */ + Segment *pArray = 0; /* Array to report on */ Level *pLvl; /* Used to iterate through db levels */ *pzOut = 0; if( iFirst==0 ) return LSM_ERROR; @@ -1549,17 +1402,14 @@ pRelease = pWorker = lsmDbSnapshotWorker(pDb); } /* Search for the array that starts on page iFirst */ for(pLvl=lsmDbSnapshotLevel(pWorker); pLvl && pArray==0; pLvl=pLvl->pNext){ - if( 0==(pArray = startsWith(&pLvl->lhs.sep, iFirst)) - && 0==(pArray = startsWith(&pLvl->lhs.run, iFirst)) - ){ + if( 0==(pArray = startsWith(&pLvl->lhs, iFirst)) ){ int i; for(i=0; inRight; i++){ - if( (pArray = startsWith(&pLvl->aRhs[i].sep, iFirst)) ) break; - if( (pArray = startsWith(&pLvl->aRhs[i].run, iFirst)) ) break; + if( (pArray = startsWith(&pLvl->aRhs[i], iFirst)) ) break; } } } if( pArray==0 ){ @@ -1589,38 +1439,10 @@ lsmDbSnapshotRelease(pDb->pEnv, pRelease); return rc; } -void lsmFsDumpBlockmap(lsm_db *pDb, SortedRun *p){ - if( p ){ - FileSystem *pFS = pDb->pFS; - int iBlk; - int iLastBlk; - char *zMsg = 0; - LsmString zBlk; - - lsmStringInit(&zBlk, pDb->pEnv); - iBlk = fsPageToBlock(pFS, p->iFirst); - iLastBlk = fsPageToBlock(pFS, p->iLast); - - while( iBlk ){ - lsmStringAppendf(&zBlk, " %d", iBlk); - if( iBlk!=iLastBlk ){ - fsBlockNext(pFS, iBlk, &iBlk); - }else{ - iBlk = 0; - } - } - - zMsg = lsmMallocPrintf(pDb->pEnv, "%d..%d: ", p->iFirst, p->iLast); - lsmLogMessage(pDb, LSM_OK, " % -15s %s", zMsg, zBlk.z); - lsmFree(pDb->pEnv, zMsg); - lsmStringClear(&zBlk); - } -} - #ifdef LSM_EXPENSIVE_DEBUG /* ** Helper function for lsmFsIntegrityCheck() */ static void checkBlocks( @@ -1630,11 +1452,11 @@ u8 *aUsed ){ if( pSeg ){ int i; for(i=0; i<2; i++){ - SortedRun *p = (i ? pSeg->pRun : pSeg->pSep); + Segment *p = (i ? pSeg->pRun : pSeg->pSep); if( p && p->nSize>0 ){ const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); int iBlk; Index: src/lsm_main.c ================================================================== --- src/lsm_main.c +++ src/lsm_main.c @@ -80,10 +80,11 @@ pDb->eSafety = LSM_SAFETY_NORMAL; pDb->xCmp = xCmp; pDb->nLogSz = LSM_DEFAULT_LOG_SIZE; pDb->nDfltPgsz = LSM_PAGE_SIZE; pDb->nDfltBlksz = LSM_BLOCK_SIZE; + pDb->nMerge = LSM_DEFAULT_NMERGE; pDb->bUseLog = 1; return LSM_OK; } @@ -395,10 +396,17 @@ pDb->bUseLog = *piVal; } *piVal = pDb->bUseLog; break; } + + case LSM_CONFIG_NMERGE: { + int *piVal = va_arg(ap, int *); + if( *piVal>1 ) pDb->nMerge = *piVal; + *piVal = pDb->nMerge; + break; + } default: rc = LSM_MISUSE; break; } @@ -407,12 +415,12 @@ return rc; } void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){ lsmStringAppendf(pStr, "%s{%d %d %d %d %d %d}", zPre, - pSeg->sep.iFirst, pSeg->sep.iLast, pSeg->sep.iRoot, - pSeg->run.iFirst, pSeg->run.iLast, pSeg->run.nSize + 0, 0, 0, + pSeg->iFirst, pSeg->iLast, pSeg->nSize ); } int lsmStructList( lsm_db *pDb, /* Database handle */ Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -752,13 +752,12 @@ pOld = p->pClient; pNew->pSnapshotNext = pOld; p->pClient = pNew; assertSnapshotListOk(p); if( pDb->pClient ){ - assert( pDb->pClient==pOld ); - pDb->pClient = p->pClient; - p->pClient->nRef++; + pDb->pClient = pNew; + pNew->nRef++; } lsmMutexLeave(pDb->pEnv, p->pClientMutex); lsmDbSnapshotRelease(pDb->pEnv, pOld); p->bDirty = 0; Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -8,16 +8,10 @@ ** May you find forgiveness for yourself and forgive others. ** May you share freely, never taking more than you give. ** ************************************************************************* ** -** SORTED FILE FORMAT: -** -** A sorted file is divided into pages. The page-size is not stored anywhere -** within the sorted file itself - it must be known in advance in order to -** read the file. The maximum allowed page-size is 64KB. -** ** PAGE FORMAT: ** ** The maximum page size is 65536 bytes. ** ** Since all records are equal to or larger than 2 bytes in size, and @@ -150,12 +144,11 @@ ** main run and an optional sorted run). ** ** * To iterate through the separators array of a segment. */ struct SegmentPtr { - Segment *pSeg; /* Segment to access */ - SortedRun *pRun; /* Points to either pSeg->run or pSeg->sep */ + Segment *pSeg; /* Segment to access */ /* Current page. See segmentPtrLoadPage(). */ Page *pPg; /* Current page */ u16 flags; /* Copy of page flags field */ int nCell; /* Number of cells on pPg */ @@ -199,10 +192,45 @@ int iCurrentPtr; /* Current entry in aPtr[] */ int nPtr; /* Size of aPtr[] array */ SegmentPtr *aPtr; /* Array of segment pointers */ Level *pLevel; /* Pointer to Level object (if nPtr>1) */ }; + +/* +** Used to iterate through the keys stored in a b-tree hierarchy from start +** to finish. Only First() and Next() operations are required. +** +** btreeCursorNew() +** btreeCursorFirst() +** btreeCursorNext() +** btreeCursorFree() +** btreeCursorPosition() +** btreeCursorRestore() +*/ +typedef struct BtreePg BtreePg; +typedef struct BtreeCursor BtreeCursor; +struct BtreePg { + Page *pPage; + int iCell; +}; +struct BtreeCursor { + Segment *pSeg; /* Iterate through this segments btree */ + FileSystem *pFS; /* File system to read pages from */ + int nDepth; /* Allocated size of aPg[] */ + int iPg; /* Current entry in aPg[]. -1 -> EOF. */ + BtreePg *aPg; /* Pages from root to current location */ + + /* Cache of current entry. pKey==0 for EOF. */ + void *pKey; + int nKey; + int eType; + Pgno iPtr; + + /* Storage for key, if not local */ + Blob blob; +}; + /* ** A cursor used for merged searches or iterations through up to one ** Tree structure and any number of sorted files. ** @@ -222,21 +250,28 @@ int flags; /* Mask of CURSOR_XXX flags */ int (*xCmp)(void *, int, void *, int); /* Compare function */ int eType; /* Cache of current key type */ Blob key; /* Cache of current key (or NULL) */ + Blob val; /* Cache of current value */ TreeCursor *pTreeCsr; /* Single tree cursor */ int nSegCsr; /* Size of aSegCsr[] array */ LevelCursor *aSegCsr; /* Array of cursors open on sorted files */ int nTree; int *aTree; + BtreeCursor *pBtCsr; int *pnHdrLevel; void *pSystemVal; Snapshot *pSnap; }; + +#define CURSOR_DATA_TREE 0 +#define CURSOR_DATA_SYSTEM 1 +#define CURSOR_DATA_SEGMENT 2 + /* ** CURSOR_IGNORE_DELETE ** If set, this cursor will not visit SORTED_DELETE keys. ** @@ -271,28 +306,40 @@ #define CURSOR_IGNORE_SYSTEM 0x00000010 #define CURSOR_NEXT_OK 0x00000020 #define CURSOR_PREV_OK 0x00000040 typedef struct MergeWorker MergeWorker; +typedef struct Hierarchy Hierarchy; + +struct Hierarchy { + Page **apHier; + int nHier; +}; + struct MergeWorker { lsm_db *pDb; /* Database handle */ Level *pLevel; /* Worker snapshot Level being merged */ MultiCursor *pCsr; /* Cursor to read new segment contents from */ int bFlush; /* True if this is an in-memory tree flush */ - Page **apHier; /* Separators array b-tree internal nodes */ - int nHier; /* Number of entries in apHier[] */ - Page *apPage[2]; /* Current output pages (0 is main run) */ + Hierarchy hier; /* B-tree hierarchy under construction */ + Page *pPage; /* Current output page */ int nWork; /* Number of calls to mergeWorkerNextPage() */ }; #ifdef LSM_DEBUG_EXPENSIVE -static void assertAllPointersOk(int rc, lsm_db *pDb); -static void assertAllBtreesOk(int rc, lsm_db *); -#else -# define assertAllPointersOk(y, z) -# define assertAllBtreesOk(y, z) +static int assertPointersOk(lsm_db *, Segment *, Segment *, int); +static int assertBtreeOk(lsm_db *, Segment *); #endif + +struct FilePage { u8 *aData; int nData; }; +static u8 *fsPageData(Page *pPg, int *pnData){ + *pnData = ((struct FilePage *)(pPg))->nData; + return ((struct FilePage *)(pPg))->aData; +} +static u8 *fsPageDataPtr(Page *pPg){ + return ((struct FilePage *)(pPg))->aData; +} /* ** Write nVal as a 16-bit unsigned big-endian integer into buffer aOut. */ void lsmPutU16(u8 *aOut, u16 nVal){ @@ -346,93 +393,10 @@ assert( pBlob->pEnv || pBlob->pData==0 ); if( pBlob->pData ) lsmFree(pBlob->pEnv, pBlob->pData); memset(pBlob, 0, sizeof(Blob)); } - -static int pageGetNRec(u8 *aData, int nData){ - return (int)lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]); -} - -static int pageGetPtr(u8 *aData, int nData){ - return (int)lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]); -} - -static int pageGetFlags(u8 *aData, int nData){ - return (int)lsmGetU16(&aData[SEGMENT_FLAGS_OFFSET(nData)]); -} - -static u8 *pageGetCell(u8 *aData, int nData, int iCell){ - return &aData[lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, iCell)])]; -} - -/* -** Return the decoded (possibly relative) pointer value stored in cell -** iCell from page aData/nData. -*/ -static int pageGetRecordPtr(u8 *aData, int nData, int iCell){ - int iRet; /* Return value */ - u8 *aCell; /* Pointer to cell iCell */ - aCell = pageGetCell(aData, nData, iCell); - lsmVarintGet32(&aCell[1], &iRet); - return iRet; -} - -static void segmentPtrSetPage(SegmentPtr *pPtr, Page *pNext){ - lsmFsPageRelease(pPtr->pPg); - if( pNext ){ - int nData; - u8 *aData = lsmFsPageData(pNext, &nData); - pPtr->nCell = pageGetNRec(aData, nData); - pPtr->flags = pageGetFlags(aData, nData); - pPtr->iPtr = pageGetPtr(aData, nData); - } - pPtr->pPg = pNext; -} - -/* -** Load a new page into the SegmentPtr object pPtr. -*/ -static int segmentPtrLoadPage( - FileSystem *pFS, - SegmentPtr *pPtr, /* Load page into this SegmentPtr object */ - int iNew /* Page number of new page */ -){ - Page *pPg = 0; /* The new page */ - int rc; /* Return Code */ - - assert( pPtr->pSeg==0 - || pPtr->pRun==&pPtr->pSeg->run - || pPtr->pRun==&pPtr->pSeg->sep - ); - rc = lsmFsDbPageGet(pFS, iNew, &pPg); - assert( rc==LSM_OK || pPg==0 ); - segmentPtrSetPage(pPtr, pPg); - - return rc; -} - -static int segmentPtrNextPage( - SegmentPtr *pPtr, /* Load page into this SegmentPtr object */ - int eDir /* +1 for next(), -1 for prev() */ -){ - Page *pNext; /* New page to load */ - int rc; /* Return code */ - - assert( eDir==1 || eDir==-1 ); - assert( pPtr->pPg ); - assert( (pPtr->pSeg==0 && eDir>0) - || pPtr->pRun==&pPtr->pSeg->run - || pPtr->pRun==&pPtr->pSeg->sep - ); - - rc = lsmFsDbPageNext(pPtr->pRun, pPtr->pPg, eDir, &pNext); - assert( rc==LSM_OK || pNext==0 ); - segmentPtrSetPage(pPtr, pNext); - return rc; -} - static int sortedReadData( Page *pPg, int iOff, int nByte, void **ppData, @@ -442,11 +406,11 @@ int iEnd; int nData; int nCell; u8 *aData; - aData = lsmFsPageData(pPg, &nData); + aData = fsPageData(pPg, &nData); nCell = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]); iEnd = SEGMENT_EOF(nData, nCell); assert( iEnd>0 && iEnd0 && iEndpPg, iOff, nByte, ppData, pBlob); +static int pageGetNRec(u8 *aData, int nData){ + return (int)lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]); +} + +static int pageGetPtr(u8 *aData, int nData){ + return (int)lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]); +} + +static int pageGetFlags(u8 *aData, int nData){ + return (int)lsmGetU16(&aData[SEGMENT_FLAGS_OFFSET(nData)]); +} + +static u8 *pageGetCell(u8 *aData, int nData, int iCell){ + return &aData[lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, iCell)])]; +} + +/* +** Return the decoded (possibly relative) pointer value stored in cell +** iCell from page aData/nData. +*/ +static int pageGetRecordPtr(u8 *aData, int nData, int iCell){ + int iRet; /* Return value */ + u8 *aCell; /* Pointer to cell iCell */ + + assert( iCell=0 ); + aCell = pageGetCell(aData, nData, iCell); + lsmVarintGet32(&aCell[1], &iRet); + return iRet; } static u8 *pageGetKey( Page *pPg, /* Page to read from */ int iCell, /* Index of cell on page to read */ @@ -527,12 +511,14 @@ int nDummy; int eType; u8 *aData; int nData; - aData = lsmFsPageData(pPg, &nData); + aData = fsPageData(pPg, &nData); + assert( !(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) ); + assert( iCellpData; + *pnKey = pBlob->nData; + }else{ + aCell += lsmVarintGet32(aCell, pnKey); + *ppKey = aCell; + } + if( piTopic ) *piTopic = rtTopic(eType); + + return LSM_OK; +} + +static int btreeCursorLoadKey(BtreeCursor *pCsr){ + int rc = LSM_OK; + if( pCsr->iPg<0 ){ + pCsr->pKey = 0; + pCsr->nKey = 0; + pCsr->eType = 0; + }else{ + int dummy; + rc = pageGetBtreeKey( + pCsr->aPg[pCsr->iPg].pPage, pCsr->aPg[pCsr->iPg].iCell, + &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob + ); + pCsr->eType |= SORTED_SEPARATOR; + } + + return rc; +} + +static int btreeCursorPtr(u8 *aData, int nData, int iCell){ + int nCell; + + nCell = pageGetNRec(aData, nData); + if( iCell>=nCell ){ + return pageGetPtr(aData, nData); + } + return pageGetRecordPtr(aData, nData, iCell); +} + +static int btreeCursorNext(BtreeCursor *pCsr){ + int rc = LSM_OK; + + BtreePg *pPg = &pCsr->aPg[pCsr->iPg]; + int nCell; + u8 *aData; + int nData; + + assert( pCsr->iPg>=0 ); + assert( pCsr->iPg==pCsr->nDepth-1 ); + + aData = fsPageData(pPg->pPage, &nData); + nCell = pageGetNRec(aData, nData); + + assert( pPg->iCell<=nCell ); + + pPg->iCell++; + if( pPg->iCell==nCell ){ + Pgno iLoad; + + /* Up to parent. */ + lsmFsPageRelease(pPg->pPage); + pPg->pPage = 0; + pCsr->iPg--; + while( pCsr->iPg>=0 ){ + pPg = &pCsr->aPg[pCsr->iPg]; + aData = fsPageData(pPg->pPage, &nData); + if( pPg->iCellpPage); + pCsr->iPg--; + } + + /* Read the key */ + rc = btreeCursorLoadKey(pCsr); + + /* Unless the cursor is at EOF, descend to cell -1 (yes, negative one) of + ** the left-most most descendent. */ + if( pCsr->iPg>=0 ){ + pCsr->aPg[pCsr->iPg].iCell++; + + iLoad = btreeCursorPtr(aData, nData, pPg->iCell); + do { + Page *pLoad; + pCsr->iPg++; + rc = lsmFsDbPageGet(pCsr->pFS, iLoad, &pLoad); + pCsr->aPg[pCsr->iPg].pPage = pLoad; + pCsr->aPg[pCsr->iPg].iCell = 0; + if( rc==LSM_OK ){ + if( pCsr->iPg==(pCsr->nDepth-1) ) break; + aData = fsPageData(pLoad, &nData); + iLoad = btreeCursorPtr(aData, nData, 0); + } + }while( rc==LSM_OK && pCsr->iPg<(pCsr->nDepth-1) ); + pCsr->aPg[pCsr->iPg].iCell = -1; + } + + }else{ + rc = btreeCursorLoadKey(pCsr); + } + + if( rc==LSM_OK && pCsr->iPg>=0 ){ + aData = fsPageData(pCsr->aPg[pCsr->iPg].pPage, &nData); + pCsr->iPtr = btreeCursorPtr(aData, nData, pCsr->aPg[pCsr->iPg].iCell+1); + } + + return rc; +} + +static void btreeCursorFree(BtreeCursor *pCsr){ + if( pCsr ){ + int i; + lsm_env *pEnv = lsmFsEnv(pCsr->pFS); + for(i=0; i<=pCsr->iPg; i++){ + lsmFsPageRelease(pCsr->aPg[i].pPage); + } + sortedBlobFree(&pCsr->blob); + lsmFree(pEnv, pCsr->aPg); + lsmFree(pEnv, pCsr); + } +} + +static int btreeCursorFirst(BtreeCursor *pCsr){ + int rc; + + Page *pPg = 0; + FileSystem *pFS = pCsr->pFS; + int iPg = pCsr->pSeg->iRoot; + + do { + rc = lsmFsDbPageGet(pFS, iPg, &pPg); + assert( (rc==LSM_OK)==(pPg!=0) ); + if( rc==LSM_OK ){ + u8 *aData; + int nData; + int flags; + + aData = fsPageData(pPg, &nData); + flags = pageGetFlags(aData, nData); + if( (flags & SEGMENT_BTREE_FLAG)==0 ) break; + + if( (pCsr->nDepth % 8)==0 ){ + int nNew = pCsr->nDepth + 8; + pCsr->aPg = (BtreePg *)lsmReallocOrFreeRc( + lsmFsEnv(pFS), pCsr->aPg, sizeof(BtreePg) * nNew, &rc + ); + if( rc==LSM_OK ){ + memset(&pCsr->aPg[pCsr->nDepth], 0, sizeof(BtreePg) * 8); + } + } + + if( rc==LSM_OK ){ + assert( pCsr->aPg[pCsr->nDepth].iCell==0 ); + pCsr->aPg[pCsr->nDepth].pPage = pPg; + pCsr->nDepth++; + iPg = pageGetRecordPtr(aData, nData, 0); + } + } + }while( rc==LSM_OK ); + lsmFsPageRelease(pPg); + pCsr->iPg = pCsr->nDepth-1; + + if( rc==LSM_OK && pCsr->nDepth ){ + pCsr->aPg[pCsr->iPg].iCell = -1; + rc = btreeCursorNext(pCsr); + } + + return rc; +} + +static void btreeCursorPosition(BtreeCursor *pCsr, MergeInput *p){ + if( pCsr->iPg>=0 ){ + p->iPg = lsmFsPageNumber(pCsr->aPg[pCsr->iPg].pPage); + p->iCell = ((pCsr->aPg[pCsr->iPg].iCell + 1) << 8) + pCsr->nDepth; + }else{ + p->iPg = 0; + p->iCell = 0; + } +} + +static int sortedKeyCompare( + int (*xCmp)(void *, int, void *, int), + int iLhsTopic, void *pLhsKey, int nLhsKey, + int iRhsTopic, void *pRhsKey, int nRhsKey +){ + int res = iLhsTopic - iRhsTopic; + if( res==0 ){ + res = xCmp(pLhsKey, nLhsKey, pRhsKey, nRhsKey); + } + return res; +} + +static int btreeCursorRestore( + BtreeCursor *pCsr, + int (*xCmp)(void *, int, void *, int), + MergeInput *p +){ + int rc = LSM_OK; + + if( p->iPg ){ + lsm_env *pEnv = lsmFsEnv(pCsr->pFS); + int iCell; /* Current cell number on leaf page */ + Pgno iLeaf; /* Page number of current leaf page */ + int nDepth; /* Depth of b-tree structure */ + + /* Decode the MergeInput structure */ + iLeaf = p->iPg; + nDepth = (p->iCell & 0x00FF); + iCell = (p->iCell >> 8) - 1; + + /* Allocate the BtreeCursor.aPg[] array */ + assert( pCsr->aPg==0 ); + pCsr->aPg = (BtreePg *)lsmMallocZeroRc(pEnv, sizeof(BtreePg) * nDepth, &rc); + + /* Populate the last entry of the aPg[] array */ + if( rc==LSM_OK ){ + pCsr->iPg = nDepth-1; + pCsr->nDepth = nDepth; + pCsr->aPg[pCsr->iPg].iCell = iCell; + rc = lsmFsDbPageGet(pCsr->pFS, iLeaf, &pCsr->aPg[nDepth-1].pPage); + } + + /* Populate any other aPg[] array entries */ + if( rc==LSM_OK && nDepth>1 ){ + Blob blob = {0,0,0}; + void *pSeek; + int nSeek; + int iTopicSeek; + int dummy; + + int iPg = 0; + int iLoad = pCsr->pSeg->iRoot; + + rc = pageGetBtreeKey(pCsr->aPg[nDepth-1].pPage, + 0, &dummy, &iTopicSeek, &pSeek, &nSeek, &pCsr->blob + ); + + do { + Page *pPg; + rc = lsmFsDbPageGet(pCsr->pFS, iLoad, &pPg); + assert( rc==LSM_OK || pPg==0 ); + if( rc==LSM_OK ){ + u8 *aData; /* Buffer containing page data */ + int nData; /* Size of aData[] in bytes */ + int iMin; + int iMax; + int iCell; + + aData = fsPageData(pPg, &nData); + assert( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) ); + + iLoad = pageGetPtr(aData, nData); + iCell = pageGetNRec(aData, nData); + iMax = iCell-1; + iMin = 0; + + while( iMax>=iMin ){ + int iTry = (iMin+iMax)/2; + void *pKey; int nKey; /* Key for cell iTry */ + int iTopic; /* Topic for key pKeyT/nKeyT */ + int iPtr; /* Pointer for cell iTry */ + int res; /* (pSeek - pKeyT) */ + + rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopic, &pKey, &nKey,&blob); + if( rc!=LSM_OK ) break; + + res = sortedKeyCompare( + xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey + ); + assert( res!=0 ); + + if( res<0 ){ + iLoad = iPtr; + iCell = iTry; + iMax = iTry-1; + }else{ + iMin = iTry+1; + } + } + + pCsr->aPg[iPg].pPage = pPg; + pCsr->aPg[iPg].iCell = iCell; + iPg++; + assert( iPg!=nDepth-1 || iLoad==iLeaf ); + } + }while( rc==LSM_OK && iPg<(nDepth-1) ); + sortedBlobFree(&blob); + } + + /* Load the current key and pointer */ + if( rc==LSM_OK ){ + BtreePg *pBtreePg; + u8 *aData; + int nData; + + pBtreePg = &pCsr->aPg[pCsr->iPg]; + aData = fsPageData(pBtreePg->pPage, &nData); + pCsr->iPtr = btreeCursorPtr(aData, nData, pBtreePg->iCell+1); + if( pBtreePg->iCell<0 ){ + int dummy; + int i; + for(i=pCsr->iPg-1; i>=0; i--){ + if( pCsr->aPg[i].iCell>0 ) break; + } + assert( i>=0 ); + rc = pageGetBtreeKey( + pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1, + &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob + ); + pCsr->eType |= SORTED_SEPARATOR; + + }else{ + rc = btreeCursorLoadKey(pCsr); + } + } + } + return rc; +} + +static int btreeCursorNew( + lsm_db *pDb, + Segment *pSeg, + BtreeCursor **ppCsr +){ + int rc = LSM_OK; + BtreeCursor *pCsr; + + assert( pSeg->iRoot ); + pCsr = lsmMallocZeroRc(pDb->pEnv, sizeof(BtreeCursor), &rc); + if( pCsr ){ + pCsr->pFS = pDb->pFS; + pCsr->pSeg = pSeg; + pCsr->iPg = -1; + } + + *ppCsr = pCsr; + return rc; +} + +static void segmentPtrSetPage(SegmentPtr *pPtr, Page *pNext){ + lsmFsPageRelease(pPtr->pPg); + if( pNext ){ + int nData; + u8 *aData = fsPageData(pNext, &nData); + pPtr->nCell = pageGetNRec(aData, nData); + pPtr->flags = pageGetFlags(aData, nData); + pPtr->iPtr = pageGetPtr(aData, nData); + } + pPtr->pPg = pNext; +} + +/* +** Load a new page into the SegmentPtr object pPtr. +*/ +static int segmentPtrLoadPage( + FileSystem *pFS, + SegmentPtr *pPtr, /* Load page into this SegmentPtr object */ + int iNew /* Page number of new page */ +){ + Page *pPg = 0; /* The new page */ + int rc; /* Return Code */ + + rc = lsmFsDbPageGet(pFS, iNew, &pPg); + assert( rc==LSM_OK || pPg==0 ); + segmentPtrSetPage(pPtr, pPg); + + return rc; +} + +static int segmentPtrReadData( + SegmentPtr *pPtr, + int iOff, + int nByte, + void **ppData, + Blob *pBlob +){ + return sortedReadData(pPtr->pPg, iOff, nByte, ppData, pBlob); +} + +static int segmentPtrNextPage( + SegmentPtr *pPtr, /* Load page into this SegmentPtr object */ + int eDir /* +1 for next(), -1 for prev() */ +){ + Page *pNext; /* New page to load */ + int rc; /* Return code */ + + assert( eDir==1 || eDir==-1 ); + assert( pPtr->pPg ); + assert( pPtr->pSeg || eDir>0 ); + + rc = lsmFsDbPageNext(pPtr->pSeg, pPtr->pPg, eDir, &pNext); + assert( rc==LSM_OK || pNext==0 ); + segmentPtrSetPage(pPtr, pNext); + return rc; +} static int segmentPtrLoadCell( SegmentPtr *pPtr, /* Load page into this SegmentPtr object */ int iNew /* Cell number of new cell */ ){ @@ -576,11 +985,11 @@ int iOff; /* Offset in aData[] to read from */ int nPgsz; /* Size of page (aData[]) in bytes */ assert( iNewnCell ); pPtr->iCell = iNew; - aData = lsmFsPageData(pPtr->pPg, &nPgsz); + aData = fsPageData(pPtr->pPg, &nPgsz); iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nPgsz, pPtr->iCell)]); pPtr->eType = aData[iOff]; iOff++; iOff += lsmVarintGet32(&aData[iOff], &pPtr->iPgPtr); iOff += lsmVarintGet32(&aData[iOff], &pPtr->nKey); @@ -593,28 +1002,19 @@ ); if( rc==LSM_OK && rtIsWrite(pPtr->eType) ){ rc = segmentPtrReadData( pPtr, iOff+pPtr->nKey, pPtr->nVal, &pPtr->pVal, &pPtr->blob2 ); + }else{ + pPtr->nVal = 0; + pPtr->pVal = 0; } } return rc; } -static int sortedKeyCompare( - int (*xCmp)(void *, int, void *, int), - int iLhsTopic, void *pLhsKey, int nLhsKey, - int iRhsTopic, void *pRhsKey, int nRhsKey -){ - int res = iLhsTopic - iRhsTopic; - if( res==0 ){ - res = xCmp(pLhsKey, nLhsKey, pRhsKey, nRhsKey); - } - return res; -} - void lsmSortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){ lsm_env *pEnv = pDb->pEnv; /* Environment handle */ int rc = *pRc; int i; Merge *pMerge = pLevel->pMerge; @@ -621,14 +1021,12 @@ for(i=0; rc==LSM_OK && inRight; i++){ Page *pPg = 0; int iTopic; Blob blob = {0, 0, 0, 0}; - SortedRun *pRun = &pLevel->aRhs[i].run; - assert( pRun->iFirst!=0 ); - + assert( pLevel->aRhs[i].iFirst!=0 ); rc = lsmFsDbPageGet(pDb->pFS, pMerge->aInput[i].iPg, &pPg); if( rc==LSM_OK ){ rc = pageGetKeyCopy(pEnv, pPg, pMerge->aInput[i].iCell, &iTopic, &blob); } if( rc==LSM_OK ){ @@ -682,21 +1080,18 @@ pCsr->nPtr = nPtr; for(i=0; inRight; i++){ pCsr->aPtr[i+1].pSeg = &pLevel->aRhs[i]; } - for(i=0; inPtr; i++){ - pCsr->aPtr[i].pRun = &pCsr->aPtr[i].pSeg->run; - } } return rc; } static int levelCursorInitRun( lsm_db *pDb, - SortedRun *pRun, + Segment *pSeg, int (*xCmp)(void *, int, void *, int), LevelCursor *pCsr /* Cursor structure to initialize */ ){ int rc = LSM_OK; @@ -708,11 +1103,11 @@ pCsr->aPtr = (SegmentPtr*)lsmMallocZeroRc(pDb->pEnv, sizeof(SegmentPtr)*pCsr->nPtr, &rc ); if( rc==LSM_OK ){ - pCsr->aPtr[0].pRun = pRun; + pCsr->aPtr[0].pSeg = pSeg; } return rc; } @@ -789,11 +1184,11 @@ int bLast, int *pRc ){ if( *pRc==LSM_OK ){ Page *pNew = 0; - Pgno iPg = (bLast ? pPtr->pRun->iLast : pPtr->pRun->iFirst); + Pgno iPg = (bLast ? pPtr->pSeg->iLast : pPtr->pSeg->iFirst); *pRc = lsmFsDbPageGet(pFS, iPg, &pNew); segmentPtrSetPage(pPtr, pNew); } } @@ -813,11 +1208,14 @@ ){ if( *pRc==LSM_OK ){ int rc = LSM_OK; segmentPtrEndPage(pCsr->pFS, pPtr, bLast, &rc); - while( rc==LSM_OK && pPtr->pPg && pPtr->nCell==0 ){ + while( rc==LSM_OK + && pPtr->pPg + && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG)) + ){ rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1)); } if( rc==LSM_OK && pPtr->pPg ){ rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0); } @@ -894,18 +1292,18 @@ lsmFsPageRef(pTest); while( pTest ){ Page *pNext; - int rc = lsmFsDbPageNext(pPtr->pRun, pTest, eDir, &pNext); + int rc = lsmFsDbPageNext(pPtr->pSeg, pTest, eDir, &pNext); lsmFsPageRelease(pTest); pTest = pNext; assert( rc==LSM_OK ); if( pTest ){ int nData; - u8 *aData = lsmFsPageData(pTest, &nData); + u8 *aData = fsPageData(pTest, &nData); int nCell = pageGetNRec(aData, nData); int flags = pageGetFlags(aData, nData); if( nCell && 0==(flags&SEGMENT_BTREE_FLAG) ){ int nPgKey; int iPgTopic; @@ -977,11 +1375,11 @@ if( res==0 ) res = pCsr->xCmp(pLastKey, nLastKey, pKey, nKey); if( res>=0 ) break; /* Advance to the next page that contains at least one key. */ do { - rc = lsmFsDbPageNext(pPtr->pRun, pPtr->pPg, 1, &pNext); + rc = lsmFsDbPageNext(pPtr->pSeg, pPtr->pPg, 1, &pNext); if( pNext==0 ) break; assert( rc==LSM_OK ); segmentPtrSetPage(pPtr, pNext); }while( (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG)) ); if( pNext==0 ) break; @@ -995,17 +1393,17 @@ /* Assert that this page is the right page of this segment for the key ** that we are searching for. Do this by loading page (iPg-1) and testing ** that pKey/nKey is greater than all keys on that page, and then by ** loading (iPg+1) and testing that pKey/nKey is smaller than all ** the keys it houses. */ -#if 0 +#if 1 assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) ); #endif assert( pPtr->nCell>0 - || pPtr->pRun->nSize==1 - || lsmFsPageNumber(pPtr->pPg)==pPtr->pRun->iLast + || pPtr->pSeg->nSize==1 + || lsmFsPageNumber(pPtr->pPg)==pPtr->pSeg->iLast ); if( pPtr->nCell==0 ){ segmentPtrReset(pPtr); }else{ iMin = 0; @@ -1112,73 +1510,52 @@ } pCsr->iCurrentPtr = iBest; } -static int seekInSeparators( +static int seekInBtree( LevelCursor *pCsr, - SegmentPtr *pPtr, /* Segment to seek within */ + Segment *pSeg, void *pKey, int nKey, /* Key to seek to */ - int *piPtr /* OUT: FC pointer */ + Page **ppPg /* OUT: Leaf (sorted-run) page reference */ ){ int rc; int iPg; + Page *pPg = 0; Blob blob = {0, 0, 0}; int iTopic = 0; /* TODO: Fix me */ - SortedRun *pSep = &pPtr->pSeg->sep; - iPg = pSep->iRoot; + iPg = pSeg->iRoot; do { - Page *pPg; rc = lsmFsDbPageGet(pCsr->pFS, iPg, &pPg); + assert( rc==LSM_OK || pPg==0 ); if( rc==LSM_OK ){ u8 *aData; /* Buffer containing page data */ int nData; /* Size of aData[] in bytes */ int iMin; int iMax; int nRec; int flags; - aData = lsmFsPageData(pPg, &nData); + aData = fsPageData(pPg, &nData); flags = pageGetFlags(aData, nData); - if( (flags & SEGMENT_BTREE_FLAG)==0 ){ - lsmFsPageRelease(pPg); - break; - } + if( (flags & SEGMENT_BTREE_FLAG)==0 ) break; iPg = pageGetPtr(aData, nData); nRec = pageGetNRec(aData, nData); iMin = 0; iMax = nRec-1; while( iMax>=iMin ){ - Page *pRef = 0; int iTry = (iMin+iMax)/2; void *pKeyT; int nKeyT; /* Key for cell iTry */ int iTopicT; /* Topic for key pKeyT/nKeyT */ int iPtr; /* Pointer associated with cell iTry */ - u8 *aCell; /* Pointer to cell iTry */ int res; /* (pKey - pKeyT) */ - int eType; - - aCell = pageGetCell(aData, nData, iTry); - eType = *aCell++; - aCell += lsmVarintGet32(aCell, &iPtr); - if( eType==0 ){ - /* If eType==0, then this b-tree cell does not contain a key. - ** Instead, it is a reference to another cell in the same separators - ** array that does contain a key. */ - Pgno iRef; - aCell += lsmVarintGet32(aCell, &iRef); - rc = lsmFsDbPageGet(pCsr->pFS, iRef, &pRef); - if( rc!=LSM_OK ) break; - pKeyT = pageGetKey(pRef, 0, &iTopicT, &nKeyT, &blob); - }else{ - aCell += lsmVarintGet32(aCell, &nKeyT); - pKeyT = (void *)aCell; - iTopicT = rtTopic(eType); - } + + rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob); + if( rc!=LSM_OK ) break; res = iTopic - iTopicT; if( res==0 ) res = pCsr->xCmp(pKey, nKey, pKeyT, nKeyT); if( res<0 ){ @@ -1185,27 +1562,19 @@ iPg = iPtr; iMax = iTry-1; }else{ iMin = iTry+1; } - lsmFsPageRelease(pRef); } lsmFsPageRelease(pPg); + pPg = 0; } }while( rc==LSM_OK ); - if( rc==LSM_OK ){ - assert( pPtr->pRun==&pPtr->pSeg->run ); - pPtr->pRun = pSep; - rc = segmentPtrLoadPage(pCsr->pFS, pPtr, iPg); - if( rc==LSM_OK ){ - rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, 0, piPtr); - } - pPtr->pRun = &pPtr->pSeg->run; - } - sortedBlobFree(&blob); + assert( (rc==LSM_OK)==(pPg!=0) ); + *ppPg = pPg; return rc; } static int seekInSegment( LevelCursor *pCsr, @@ -1216,21 +1585,24 @@ int *piPtr /* OUT: FC pointer */ ){ int iPtr = iPg; int rc = LSM_OK; - assert( pPtr->pRun==&pPtr->pSeg->run ); - - if( segmentHasSeparators(pPtr->pSeg) ){ - rc = seekInSeparators(pCsr, pPtr, pKey, nKey, &iPtr); - }else if( iPtr==0 ){ - iPtr = pPtr->pSeg->run.iFirst; + if( pPtr->pSeg->iRoot ){ + Page *pPg; + assert( pPtr->pSeg->iRoot!=0 ); + rc = seekInBtree(pCsr, pPtr->pSeg, pKey, nKey, &pPg); + if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pPg); + }else{ + if( iPtr==0 ){ + iPtr = pPtr->pSeg->iFirst; + } + if( rc==LSM_OK ){ + rc = segmentPtrLoadPage(pCsr->pFS, pPtr, iPtr); + } } - if( rc==LSM_OK ){ - rc = segmentPtrLoadPage(pCsr->pFS, pPtr, iPtr); - } if( rc==LSM_OK ){ rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, eSeek, piPtr); } return rc; } @@ -1391,10 +1763,13 @@ /* Close the sorted file cursors */ for(i=0; inSegCsr; i++){ segmentCursorClose(pEnv, &pCsr->aSegCsr[i]); } + /* And the b-tree cursor, if any */ + btreeCursorFree(pCsr->pBtCsr); + /* Free allocations */ lsmFree(pEnv, pCsr->aSegCsr); lsmFree(pEnv, pCsr->aTree); lsmFree(pEnv, pCsr->pSystemVal); @@ -1404,10 +1779,11 @@ pCsr->nTree = 0; pCsr->aTree = 0; pCsr->pSystemVal = 0; pCsr->pSnap = 0; pCsr->pTreeCsr = 0; + pCsr->pBtCsr = 0; } void lsmMCursorClose(MultiCursor *pCsr){ if( pCsr ){ lsm_db *pDb = pCsr->pDb; @@ -1422,10 +1798,11 @@ } } /* Free the allocation used to cache the current key, if any. */ sortedBlobFree(&pCsr->key); + sortedBlobFree(&pCsr->val); /* Free the component cursors */ mcursorFreeComponents(pCsr); /* Free the cursor structure itself */ @@ -1434,11 +1811,10 @@ } #define MULTICURSOR_ADDLEVEL_ALL 1 #define MULTICURSOR_ADDLEVEL_RHS 2 #define MULTICURSOR_ADDLEVEL_LHS_SEP 3 -#define MULTICURSOR_ADDLEVEL_RHS_SEP 4 /* ** Add segments belonging to level pLevel to the multi-cursor pCsr. The ** third argument must be one of the following: ** @@ -1450,14 +1826,10 @@ ** ** MULTICURSOR_ADDLEVEL_LHS_SEP ** Add only the lhs segment. And iterate through its separators array, ** not the main run array. ** -** MULTICURSOR_ADDLEVEL_RHS_SEP -** Add only the first segment from the rhs. And iterate through its -** separators array, not the main run array. -** ** RHS and SEP are only used by cursors created to use as data sources when ** creating new segments (either when flushing the in-memory tree to disk or ** when merging existing runs). */ int multiCursorAddLevel( @@ -1464,51 +1836,55 @@ MultiCursor *pCsr, /* Multi-cursor to add segment to */ Level *pLevel, /* Level to add to multi-cursor merge */ int eMode /* A MULTICURSOR_ADDLEVEL_*** constant */ ){ int rc = LSM_OK; - int i; - int nAdd = (eMode==MULTICURSOR_ADDLEVEL_RHS ? pLevel->nRight : 1); assert( eMode==MULTICURSOR_ADDLEVEL_ALL || eMode==MULTICURSOR_ADDLEVEL_RHS || eMode==MULTICURSOR_ADDLEVEL_LHS_SEP ); - for(i=0; ipDb; - - /* Grow the pCsr->aSegCsr array if required */ - if( 0==(pCsr->nSegCsr % 16) ){ - int nByte; - LevelCursor *aNew; - nByte = sizeof(LevelCursor) * (pCsr->nSegCsr+16); - aNew = (LevelCursor *)lsmRealloc(pDb->pEnv, pCsr->aSegCsr, nByte); - if( aNew==0 ) return LSM_NOMEM_BKPT; - memset(&aNew[pCsr->nSegCsr], 0, sizeof(LevelCursor)*16); - pCsr->aSegCsr = aNew; - } - pNew = &pCsr->aSegCsr[pCsr->nSegCsr]; - - switch( eMode ){ - case MULTICURSOR_ADDLEVEL_ALL: - rc = levelCursorInit(pDb, pLevel, pCsr->xCmp, pNew); - break; - - case MULTICURSOR_ADDLEVEL_RHS: - rc = levelCursorInitRun(pDb, &pLevel->aRhs[i].run, pCsr->xCmp, pNew); - break; - - case MULTICURSOR_ADDLEVEL_LHS_SEP: - rc = levelCursorInitRun(pDb, &pLevel->lhs.sep, pCsr->xCmp, pNew); - break; - } - if( pCsr->flags & CURSOR_IGNORE_SYSTEM ){ - pNew->bIgnoreSystem = 1; - } - if( rc==LSM_OK ) pCsr->nSegCsr++; + if( eMode==MULTICURSOR_ADDLEVEL_LHS_SEP ){ + assert( pLevel->lhs.iRoot ); + assert( pCsr->pBtCsr==0 ); + rc = btreeCursorNew(pCsr->pDb, &pLevel->lhs, &pCsr->pBtCsr); + assert( (rc==LSM_OK)==(pCsr->pBtCsr!=0) ); + }else{ + int i; + int nAdd = (eMode==MULTICURSOR_ADDLEVEL_RHS ? pLevel->nRight : 1); + + for(i=0; ipDb; + + /* Grow the pCsr->aSegCsr array if required */ + if( 0==(pCsr->nSegCsr % 16) ){ + int nByte; + LevelCursor *aNew; + nByte = sizeof(LevelCursor) * (pCsr->nSegCsr+16); + aNew = (LevelCursor *)lsmRealloc(pDb->pEnv, pCsr->aSegCsr, nByte); + if( aNew==0 ) return LSM_NOMEM_BKPT; + memset(&aNew[pCsr->nSegCsr], 0, sizeof(LevelCursor)*16); + pCsr->aSegCsr = aNew; + } + pNew = &pCsr->aSegCsr[pCsr->nSegCsr]; + + switch( eMode ){ + case MULTICURSOR_ADDLEVEL_ALL: + rc = levelCursorInit(pDb, pLevel, pCsr->xCmp, pNew); + break; + + case MULTICURSOR_ADDLEVEL_RHS: + rc = levelCursorInitRun(pDb, &pLevel->aRhs[i], pCsr->xCmp, pNew); + break; + } + if( pCsr->flags & CURSOR_IGNORE_SYSTEM ){ + pNew->bIgnoreSystem = 1; + } + if( rc==LSM_OK ) pCsr->nSegCsr++; + } } return rc; } @@ -1523,10 +1899,14 @@ int rc = LSM_OK; /* Return Code */ MultiCursor *pCsr = *ppCsr; /* Allocated multi-cursor */ if( pCsr==0 ){ pCsr = (MultiCursor *)lsmMallocZeroRc(pDb->pEnv, sizeof(MultiCursor), &rc); + if( pCsr ){ + pCsr->pNext = pDb->pCsr; + pDb->pCsr = pCsr; + } } if( rc==LSM_OK ){ if( useTree ){ assert( pDb->pTV ); @@ -1544,10 +1924,27 @@ pCsr = 0; } *ppCsr = pCsr; return rc; } + +void lsmSortedRemap(lsm_db *pDb){ + MultiCursor *pCsr; + for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){ + int i; + if( pCsr->pBtCsr ){ + btreeCursorLoadKey(pCsr->pBtCsr); + } + for(i=0; inSegCsr; i++){ + int iPtr; + LevelCursor *p = &pCsr->aSegCsr[i]; + for(iPtr=0; iPtrnPtr; iPtr++){ + segmentPtrLoadCell(&p->aPtr[iPtr], p->aPtr[iPtr].iCell); + } + } + } +} static void multiCursorReadSeparators(MultiCursor *pCsr){ if( pCsr->nSegCsr>0 ){ pCsr->aSegCsr[pCsr->nSegCsr-1].bIgnoreSeparators = 0; } @@ -1618,24 +2015,16 @@ ){ MultiCursor *pCsr = 0; int rc; rc = multiCursorAllocate(pDb, 0, &pCsr); - if( rc==LSM_OK ){ - pCsr->pNext = pDb->pCsr; - pDb->pCsr = pCsr; - } assert( (rc==LSM_OK)==(pCsr!=0) ); *ppCsr = pCsr; return rc; } -#define CURSOR_DATA_TREE 0 -#define CURSOR_DATA_SYSTEM 1 -#define CURSOR_DATA_SEGMENT 2 - static void multiCursorGetKey( MultiCursor *pCsr, int iKey, int *peType, /* OUT: Key type (SORTED_WRITE etc.) */ void **ppKey, /* OUT: Pointer to buffer containing key */ @@ -1670,11 +2059,15 @@ } break; default: { int iSeg = iKey - CURSOR_DATA_SEGMENT; - if( iSegnSegCsr && segmentCursorValid(&pCsr->aSegCsr[iSeg]) ){ + if( iSeg==pCsr->nSegCsr && pCsr->pBtCsr ){ + pKey = pCsr->pBtCsr->pKey; + nKey = pCsr->pBtCsr->nKey; + eType = pCsr->pBtCsr->eType; + }if( iSegnSegCsr && segmentCursorValid(&pCsr->aSegCsr[iSeg]) ){ segmentCursorKey(&pCsr->aSegCsr[iSeg], &pKey, &nKey); segmentCursorType(&pCsr->aSegCsr[iSeg], &eType); } break; } @@ -1722,10 +2115,11 @@ segmentCursorValue(&pCsr->aSegCsr[iVal-CURSOR_DATA_SEGMENT], ppVal, pnVal); }else{ *ppVal = 0; *pnVal = 0; } + assert( rc==LSM_OK || (*ppVal==0 && *pnVal==0) ); return rc; } int lsmSortedLoadSystem(lsm_db *pDb){ MultiCursor *pCsr = 0; /* Cursor used to retreive free-list */ @@ -1816,12 +2210,15 @@ static int multiCursorAllocTree(MultiCursor *pCsr){ int rc = LSM_OK; if( pCsr->aTree==0 ){ int nByte; /* Bytes of space to allocate */ + int bBtree; /* True if b-tree cursor is present */ + + bBtree = (pCsr->pBtCsr!=0); pCsr->nTree = 2; - while( pCsr->nTree<(CURSOR_DATA_SEGMENT+pCsr->nSegCsr) ){ + while( pCsr->nTree<(CURSOR_DATA_SEGMENT+pCsr->nSegCsr+bBtree) ){ pCsr->nTree = pCsr->nTree*2; } nByte = sizeof(int)*pCsr->nTree*2; pCsr->aTree = (int *)lsmMallocZeroRc(pCsr->pDb->pEnv, nByte, &rc); @@ -1851,10 +2248,15 @@ pCsr->flags |= CURSOR_AT_FREELIST; } for(i=0; rc==LSM_OK && inSegCsr; i++){ rc = segmentCursorEnd(&pCsr->aSegCsr[i], bLast); } + + if( rc==LSM_OK && pCsr->pBtCsr ){ + assert( bLast==0 ); + rc = btreeCursorFirst(pCsr->pBtCsr); + } if( rc==LSM_OK ){ rc = multiCursorAllocTree(pCsr); } @@ -2096,10 +2498,13 @@ pCsr->flags &= ~CURSOR_AT_FREELIST; pCsr->flags |= CURSOR_AT_LEVELS; }else{ pCsr->flags &= ~CURSOR_AT_LEVELS; } + }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nSegCsr) ){ + assert( bReverse==0 && pCsr->pBtCsr ); + rc = btreeCursorNext(pCsr->pBtCsr); }else{ LevelCursor *pLevel = &pCsr->aSegCsr[iKey-CURSOR_DATA_SEGMENT]; rc = segmentCursorAdvance(pLevel, bReverse); } if( rc==LSM_OK ){ @@ -2149,13 +2554,30 @@ } return LSM_OK; } int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){ + void *pVal; + int nVal; + int rc; + assert( pCsr->aTree ); assert( rtIsDelete(pCsr->eType)==0 || !(pCsr->flags & CURSOR_IGNORE_DELETE) ); - return multiCursorGetVal(pCsr, pCsr->aTree[1], ppVal, pnVal); + + rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pVal, &nVal); + if( pVal && rc==LSM_OK ){ + rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pVal, nVal); + pVal = pCsr->val.pData; + } + + if( rc!=LSM_OK ){ + pVal = 0; + nVal = 0; + } + *ppVal = pVal; + *pnVal = nVal; + return rc; } int lsmMCursorType(MultiCursor *pCsr, int *peType){ assert( pCsr->aTree ); multiCursorGetKey(pCsr, pCsr->aTree[1], peType, 0, 0); @@ -2201,34 +2623,36 @@ ** ** The complication is that not all database pages are the same size - due ** to the way the file.c module works some (the first and last in each block) ** are 4 bytes smaller than the others. */ -static int mergeWorkerMoveHierarchy(MergeWorker *pMW){ - SortedRun *pSep; /* Separators run being modified */ +static int mergeWorkerMoveHierarchy( + MergeWorker *pMW, /* Merge worker */ + int bSep /* True for separators run */ +){ + Segment *pSeg; /* Segment being written */ lsm_db *pDb = pMW->pDb; /* Database handle */ int rc = LSM_OK; /* Return code */ int i; int iRight = 0; - int nHier = pMW->nHier; - Page **apHier = pMW->apHier; + Page **apHier = pMW->hier.apHier; + int nHier = pMW->hier.nHier; assert( nHier>0 && pMW->pLevel->pMerge->bHierReadonly ); - - pSep = &pMW->pLevel->lhs.sep; + pSeg = &pMW->pLevel->lhs; for(i=0; rc==LSM_OK && ipFS, pDb->pWorker, pSep, &pNew); + rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, &pNew); assert( rc==LSM_OK ); if( rc==LSM_OK ){ u8 *a1; int n1; u8 *a2; int n2; - a1 = lsmFsPageData(pNew, &n1); - a2 = lsmFsPageData(apHier[i], &n2); + a1 = fsPageData(pNew, &n1); + a2 = fsPageData(apHier[i], &n2); assert( n1==n2 || n1+4==n2 || n2+4==n1 ); if( n1>=n2 ){ /* If n1 (size of the new page) is equal to or greater than n2 (the ** size of the old page), then copy the data into the new page. If @@ -2254,11 +2678,11 @@ } } #ifdef LSM_DEBUG if( rc==LSM_OK ){ - for(i=0; iapHier[i]) ); + for(i=0; ipLevel->pMerge->bHierReadonly = 0; @@ -2269,19 +2693,23 @@ /* ** Allocate and populate the MergeWorker.apHier[] array. */ static int mergeWorkerLoadHierarchy(MergeWorker *pMW){ int rc = LSM_OK; - SortedRun *pSep = &pMW->pLevel->lhs.sep; + Segment *pSeg; + Hierarchy *p; + + pSeg = &pMW->pLevel->lhs; + p = &pMW->hier; - if( pMW->apHier==0 && pSep->iRoot!=0 ){ + if( p->apHier==0 && pSeg->iRoot!=0 ){ int bHierReadonly = pMW->pLevel->pMerge->bHierReadonly; FileSystem *pFS = pMW->pDb->pFS; lsm_env *pEnv = pMW->pDb->pEnv; Page **apHier = 0; int nHier = 0; - int iPg = pSep->iRoot; + int iPg = pSeg->iRoot; do { Page *pPg = 0; u8 *aData; int nData; @@ -2288,11 +2716,11 @@ int flags; rc = lsmFsDbPageGet(pFS, iPg, &pPg); if( rc!=LSM_OK ) break; - aData = lsmFsPageData(pPg, &nData); + aData = fsPageData(pPg, &nData); flags = pageGetFlags(aData, nData); if( flags&SEGMENT_BTREE_FLAG ){ Page **apNew = (Page **)lsmRealloc( pEnv, apHier, sizeof(Page *)*(nHier+1) ); @@ -2312,12 +2740,12 @@ break; } }while( 1 ); if( rc==LSM_OK ){ - pMW->nHier = nHier; - pMW->apHier = apHier; + p->nHier = nHier; + p->apHier = apHier; }else{ int i; for(i=0; ipLevel->lhs; + p = &pMW->hier; rc = mergeWorkerLoadHierarchy(pMW); - /* TODO: What the heck does this do? */ - if( pMW->nHier ){ - aData = lsmFsPageData(pMW->apHier[0], &nData); + /* Obtain the absolute pointer value to store along with the key in the + ** page body. This pointer points to a page that contains keys that are + ** smaller than pKey/nKey. */ + if( p->nHier ){ + aData = fsPageData(p->apHier[0], &nData); iPtr = lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]); }else{ - iPtr = pMW->pLevel->lhs.sep.iFirst; + iPtr = pSeg->iFirst; } - if( pMW->nHier && pMW->pLevel->pMerge->bHierReadonly ){ - rc = mergeWorkerMoveHierarchy(pMW); + if( p->nHier && pMW->pLevel->pMerge->bHierReadonly ){ + rc = mergeWorkerMoveHierarchy(pMW, bSep); if( rc!=LSM_OK ) goto push_hierarchy_out; } /* Determine if the indirect format should be used. */ bIndirect = (nKey*4 > lsmFsPageSize(pMW->pDb->pFS)); @@ -2414,31 +2850,31 @@ ** ** This loop searches for a node with enough space to store the key on, ** starting with the leaf and iterating up towards the root. When the loop ** exits, the key may be written to apHier[iLevel]. */ - for(iLevel=0; iLevel<=pMW->nHier; iLevel++){ + for(iLevel=0; iLevel<=p->nHier; iLevel++){ int nByte; /* Number of free bytes required */ int iRight; /* Right hand pointer from aData[]/nData */ - if( iLevel==pMW->nHier ){ + if( iLevel==p->nHier ){ /* Extend the array and allocate a new root page. */ Page **aNew; aNew = (Page **)lsmRealloc( - pMW->pDb->pEnv, pMW->apHier, sizeof(Page *)*(pMW->nHier+1) + pMW->pDb->pEnv, p->apHier, sizeof(Page *)*(p->nHier+1) ); if( !aNew ){ rc = LSM_NOMEM_BKPT; goto push_hierarchy_out; } - pMW->apHier = aNew; + p->apHier = aNew; }else{ int nFree; /* If the key will fit on this page, break out of the loop. */ - assert( lsmFsPageWritable(pMW->apHier[iLevel]) ); - aData = lsmFsPageData(pMW->apHier[iLevel], &nData); + assert( lsmFsPageWritable(p->apHier[iLevel]) ); + aData = fsPageData(p->apHier[iLevel], &nData); iRight = lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]); if( bIndirect ){ nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(iKeyPg); }else{ nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(nKey) + nKey; @@ -2446,40 +2882,40 @@ nRec = pageGetNRec(aData, nData); nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData); if( nByte<=nFree ) break; /* Otherwise, it is full. Release it. */ - iPtr = lsmFsPageNumber(pMW->apHier[iLevel]); - rc = lsmFsPageRelease(pMW->apHier[iLevel]); + iPtr = lsmFsPageNumber(p->apHier[iLevel]); + rc = lsmFsPageRelease(p->apHier[iLevel]); } /* Allocate a new page for apHier[iLevel]. */ - pMW->apHier[iLevel] = 0; + p->apHier[iLevel] = 0; if( rc==LSM_OK ){ rc = lsmFsSortedAppend( - pDb->pFS, pDb->pWorker, &pMW->pLevel->lhs.sep, &pMW->apHier[iLevel] + pDb->pFS, pDb->pWorker, pSeg, &p->apHier[iLevel] ); } if( rc!=LSM_OK ) goto push_hierarchy_out; - aData = lsmFsPageData(pMW->apHier[iLevel], &nData); + aData = fsPageData(p->apHier[iLevel], &nData); memset(aData, 0, nData); lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG); lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0); if( iLevel>0 ){ - iRight = lsmFsPageNumber(pMW->apHier[iLevel-1]); + iRight = lsmFsPageNumber(p->apHier[iLevel-1]); lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight); } - if( iLevel==pMW->nHier ){ - pMW->nHier++; + if( iLevel==p->nHier ){ + p->nHier++; break; } } /* Write the key into page apHier[iLevel]. */ - aData = lsmFsPageData(pMW->apHier[iLevel], &nData); + aData = fsPageData(p->apHier[iLevel], &nData); iOff = mergeWorkerPageOffset(aData, nData); nRec = pageGetNRec(aData, nData); lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], iOff); @@ -2495,90 +2931,23 @@ iOff += lsmVarintPut32(&aData[iOff], nKey); memcpy(&aData[iOff], pKey, nKey); } if( iLevel>0 ){ - int iRight = lsmFsPageNumber(pMW->apHier[iLevel-1]); + int iRight = lsmFsPageNumber(p->apHier[iLevel-1]); lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight); } /* Write the right-hand pointer of the right-most leaf page of the ** b-tree heirarchy. */ - aData = lsmFsPageData(pMW->apHier[0], &nData); + aData = fsPageData(p->apHier[0], &nData); lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iKeyPg); /* Ensure that the SortedRun.iRoot field is correct. */ - pMW->pLevel->lhs.sep.iRoot = lsmFsPageNumber(pMW->apHier[pMW->nHier-1]); + pSeg->iRoot = lsmFsPageNumber(p->apHier[p->nHier-1]); push_hierarchy_out: - return rc; -} - -/* -** The merge-worker object passed as the first argument to this function -** was used for an in-memory tree flush. If one was required, the separators -** array has been assembled in-memory (as a "phantom"). In this case it -** consists of leaf nodes only, there are no b-tree nodes. This function -** materializes the phantom run (writes it into the db file) and appends -** any required b-tree nodes. -*/ -static int mergeWorkerBuildHierarchy(MergeWorker *pMW){ - int rc = LSM_OK; - - assert( pMW->bFlush ); - assert( pMW->pLevel->lhs.sep.iRoot==0 ); - - if( pMW->apPage[1] ){ - SortedRun *pRun; /* Separators run to materialize */ - lsm_db *db = pMW->pDb; - Blob blob = {0, 0, 0}; - Page *pPg; - int iLast; - - /* Write the leaf pages into the file. They now have page numbers, - ** which can be used as pointers in the b-tree hierarchy. */ - pRun = &pMW->pLevel->lhs.sep; - rc = lsmFsPhantomMaterialize(db->pFS, db->pWorker, pRun); - - if( rc==LSM_OK ){ - rc = lsmFsDbPageGet(db->pFS, pRun->iFirst, &pPg); - } - - iLast = pRun->iLast; - while( rc==LSM_OK && lsmFsPageNumber(pPg)!=iLast ){ - Page *pNext = 0; - - rc = lsmFsDbPageNext(pRun, pPg, 1, &pNext); - lsmFsPageRelease(pPg); - pPg = pNext; - - if( rc==LSM_OK ){ - u8 *aData; - int nData; - aData = lsmFsPageData(pPg, &nData); - if( pageGetNRec(aData, nData)>0 ){ - u8 *pKey; - int nKey; - int iTopic; - Pgno iPg = lsmFsPageNumber(pPg); - - pKey = pageGetKey(pPg, 0, &iTopic, &nKey, &blob); - rc = mergeWorkerPushHierarchy(pMW, iPg, iTopic, pKey, nKey); - } - } - } - - if( pMW->nHier>0 ){ - Page *pRoot = pMW->apHier[pMW->nHier-1]; - pRun->iRoot = lsmFsPageNumber(pRoot); - }else{ - pRun->iRoot = pRun->iFirst; - } - - lsmFsPageRelease(pPg); - sortedBlobFree(&blob); - } return rc; } static int keyszToSkip(FileSystem *pFS, int nKey){ int nPgsz; /* Nominal database page size */ @@ -2586,49 +2955,43 @@ return LSM_MIN(((nKey * 4) / nPgsz), 3); } /* ** Advance to the next page of an output run being populated by merge-worker -** pMW. If bSep is true, the separators run output is advanced by one page. -** Otherwise, the main run. -** -** The footer of the new page is initialized to indicate that it contains +** pMW. The footer of the new page is initialized to indicate that it contains ** zero records. The flags field is cleared. The page footer pointer field ** is set to iFPtr. ** ** If successful, LSM_OK is returned. Otherwise, an error code. */ static int mergeWorkerNextPage( MergeWorker *pMW, /* Merge worker object to append page to */ - int bSep, /* True to append to the separators array */ int iFPtr /* Pointer value for footer of new page */ ){ int rc = LSM_OK; /* Return code */ Page *pNext = 0; /* New page appended to run */ lsm_db *pDb = pMW->pDb; /* Database handle */ - SortedRun *pRun; /* Run to append to */ + Segment *pSeg; /* Run to append to */ - assert( bSep==0 || bSep==1 ); - - pRun = (bSep ? &pMW->pLevel->lhs.sep : &pMW->pLevel->lhs.run); - rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pRun, &pNext); - assert( rc!=LSM_OK || bSep || pRun->iFirst>0 ); + pSeg = &pMW->pLevel->lhs; + rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, &pNext); + assert( rc!=LSM_OK || pSeg->iFirst>0 ); if( rc==LSM_OK ){ u8 *aData; /* Data buffer belonging to page pNext */ int nData; /* Size of aData[] in bytes */ - lsmFsPageRelease(pMW->apPage[bSep]); - pMW->apPage[bSep] = pNext; - pMW->pLevel->pMerge->aiOutputOff[bSep] = 0; + lsmFsPageRelease(pMW->pPage); + pMW->pPage = pNext; + pMW->pLevel->pMerge->iOutputOff = 0; - aData = lsmFsPageData(pNext, &nData); + aData = fsPageData(pNext, &nData); lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0); lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0); lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr); - if( bSep==0 ) pMW->nWork++; + pMW->nWork++; } return rc; } @@ -2655,37 +3018,36 @@ u8 *aData; /* Pointer to buffer of current output page */ int nData; /* Size of aData[] in bytes */ int nRec; /* Number of records on current output page */ int iOff; /* Offset in aData[] to write to */ - assert( lsmFsPageWritable(pMW->apPage[bSep]) ); + assert( lsmFsPageWritable(pMW->pPage) ); - aData = lsmFsPageData(pMW->apPage[bSep], &nData); + aData = fsPageData(pMW->pPage, &nData); nRec = pageGetNRec(aData, nData); - iOff = pMerge->aiOutputOff[bSep]; + iOff = pMerge->iOutputOff; nCopy = LSM_MIN(nRem, SEGMENT_EOF(nData, nRec) - iOff); memcpy(&aData[iOff], &aWrite[nWrite-nRem], nCopy); nRem -= nCopy; if( nRem>0 ){ - rc = mergeWorkerNextPage(pMW, bSep, iFPtr); + rc = mergeWorkerNextPage(pMW, iFPtr); }else{ - pMerge->aiOutputOff[bSep] = iOff + nCopy; + pMerge->iOutputOff = iOff + nCopy; } } return rc; } static int mergeWorkerWrite( MergeWorker *pMW, /* Merge worker object to write into */ - int bSep, /* True to write to separators array */ int eType, /* One of SORTED_SEPARATOR, WRITE or DELETE */ void *pKey, int nKey, /* Key value */ - void *pVal, int nVal, /* Accompanying value, if any */ + MultiCursor *pCsr, /* Read value (if any) from here */ int iPtr, /* Absolute value of page pointer, or 0 */ int *piPtrOut /* OUT: Pointer to write to separators */ ){ int rc = LSM_OK; /* Return code */ Merge *pMerge; /* Persistent part of level merge state */ @@ -2695,21 +3057,20 @@ int nData; /* Size of buffer aData[] in bytes */ int nRec; /* Number of records on page pPg */ int iFPtr; /* Value of pointer in footer of pPg */ int iRPtr; /* Value of pointer written into record */ int iOff; /* Current write offset within page pPg */ - SortedRun *pRun; /* Run being written to */ + Segment *pSeg; /* Segment being written */ int flags = 0; /* If != 0, flags value for page footer */ - - assert( bSep==0 || bSep==1 ); - assert( bSep==0 || rtIsSeparator(eType) ); + void *pVal; + int nVal; pMerge = pMW->pLevel->pMerge; - pRun = (bSep ? &pMW->pLevel->lhs.sep : &pMW->pLevel->lhs.run); + pSeg = &pMW->pLevel->lhs; - pPg = pMW->apPage[bSep]; - aData = lsmFsPageData(pPg, &nData); + pPg = pMW->pPage; + aData = fsPageData(pPg, &nData); nRec = pageGetNRec(aData, nData); iFPtr = pageGetPtr(aData, nData); /* If iPtr is 0, set it to the same value as the absolute pointer ** stored as part of the previous record. */ @@ -2734,24 +3095,26 @@ ** 1) record type - 1 byte. ** 2) Page-pointer-offset - 1 varint ** 3) Key size - 1 varint ** 4) Value size - 1 varint (SORTED_WRITE only) */ - nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey); - if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal); - - /* If the entire header will not fit on page pPg, or if page pPg is - ** marked read-only, advance to the next page of the output run. */ - iOff = pMerge->aiOutputOff[bSep]; - if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){ - iFPtr = iFPtr + (nRec ? pageGetRecordPtr(aData, nData, nRec-1) : 0); - iRPtr = iPtr - iFPtr; - iOff = 0; - nRec = 0; - rc = mergeWorkerNextPage(pMW, bSep, iFPtr); - pPg = pMW->apPage[bSep]; - aData = lsmFsPageData(pPg, &nData); + rc = lsmMCursorValue(pCsr, &pVal, &nVal); + if( rc==LSM_OK ){ + nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey); + if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal); + + /* If the entire header will not fit on page pPg, or if page pPg is + ** marked read-only, advance to the next page of the output run. */ + iOff = pMerge->iOutputOff; + if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){ + iFPtr = iFPtr + (nRec ? pageGetRecordPtr(aData, nData, nRec-1) : 0); + iRPtr = iPtr - iFPtr; + iOff = 0; + nRec = 0; + rc = mergeWorkerNextPage(pMW, iFPtr); + pPg = pMW->pPage; + } } /* If this record header will be the first on the page, and the page is ** not the very first in the entire run, special actions may need to be ** taken: @@ -2761,32 +3124,31 @@ ** array that points to the current page. ** ** * If currently writing the separators array, push a copy of the key ** into the b-tree hierarchy. */ - if( rc==LSM_OK && nRec==0 && pRun->iFirst!=pRun->iLast ){ + if( rc==LSM_OK && nRec==0 && pSeg->iFirst!=pSeg->iLast ){ assert( pMerge->nSkip>=0 ); - if( bSep ){ - if( pMW->bFlush==0 ){ - Pgno iPg = lsmFsPageNumber(pPg); - rc = mergeWorkerPushHierarchy(pMW, iPg, rtTopic(eType), pKey, nKey); - } - }else{ - if( pMerge->nSkip ){ - pMerge->nSkip--; - flags = PGFTR_SKIP_THIS_FLAG; - }else{ - *piPtrOut = lsmFsPageNumber(pPg); - pMerge->nSkip = keyszToSkip(pMW->pDb->pFS, nKey); - } - if( pMerge->nSkip ) flags |= PGFTR_SKIP_NEXT_FLAG; - } + if( pMerge->nSkip==0 ){ + Pgno iPg = lsmFsPageNumber(pPg); + rc = mergeWorkerPushHierarchy(pMW, 0, iPg, rtTopic(eType), pKey, nKey); + } + if( pMerge->nSkip ){ + pMerge->nSkip--; + flags = PGFTR_SKIP_THIS_FLAG; + }else{ + *piPtrOut = lsmFsPageNumber(pPg); + pMerge->nSkip = keyszToSkip(pMW->pDb->pFS, nKey); + } + if( pMerge->nSkip ) flags |= PGFTR_SKIP_NEXT_FLAG; } /* Update the output segment */ if( rc==LSM_OK ){ + aData = fsPageData(pPg, &nData); + /* Update the page footer. */ lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], nRec+1); lsmPutU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec)], iOff); if( flags ) lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], flags); @@ -2793,17 +3155,20 @@ /* Write the entry header into the current page. */ aData[iOff++] = eType; /* 1 */ iOff += lsmVarintPut32(&aData[iOff], iRPtr); /* 2 */ iOff += lsmVarintPut32(&aData[iOff], nKey); /* 3 */ if( rtIsWrite(eType) ) iOff += lsmVarintPut32(&aData[iOff], nVal); /* 4 */ - pMerge->aiOutputOff[bSep] = iOff; + pMerge->iOutputOff = iOff; /* Write the key and data into the segment. */ assert( iFPtr==pageGetPtr(aData, nData) ); - rc = mergeWorkerData(pMW, bSep, iFPtr+iRPtr, pKey, nKey); + rc = mergeWorkerData(pMW, 0, iFPtr+iRPtr, pKey, nKey); if( rc==LSM_OK && rtIsWrite(eType) ){ - rc = mergeWorkerData(pMW, bSep, iFPtr+iRPtr, pVal, nVal); + if( rtTopic(eType)==0 ) rc = lsmMCursorValue(pCsr, &pVal, &nVal); + if( rc==LSM_OK ){ + rc = mergeWorkerData(pMW, 0, iFPtr+iRPtr, pVal, nVal); + } } } return rc; } @@ -2836,90 +3201,115 @@ /* Unless the merge has finished, save the cursor position in the ** Merge.aInput[] array. See function mergeWorkerInit() for the ** code to restore a cursor position based on aInput[]. */ if( pCsr ){ Merge *pMerge = pMW->pLevel->pMerge; + int bBtree = (pCsr->pBtCsr!=0); /* pMerge->nInput==0 indicates that this is a FlushTree() operation. */ assert( pMerge->nInput==0 || pMW->pLevel->nRight>0 ); - assert( pMerge->nInput==0 || pMerge->nInput==pCsr->nSegCsr ); + assert( pMerge->nInput==0 || pMerge->nInput==(pCsr->nSegCsr+bBtree) ); - for(i=0; inInput; i++){ + for(i=0; i<(pMerge->nInput-bBtree); i++){ SegmentPtr *pPtr = &pCsr->aSegCsr[i].aPtr[0]; if( pPtr->pPg ){ pMerge->aInput[i].iPg = lsmFsPageNumber(pPtr->pPg); pMerge->aInput[i].iCell = pPtr->iCell; }else{ pMerge->aInput[i].iPg = 0; pMerge->aInput[i].iCell = 0; } } + if( bBtree && pMerge->nInput ){ + assert( i==pCsr->nSegCsr ); + btreeCursorPosition(pCsr->pBtCsr, &pMerge->aInput[i]); + } } lsmMCursorClose(pCsr); - lsmFsPageRelease(pMW->apPage[0]); - lsmFsPageRelease(pMW->apPage[1]); - for(i=0; inHier; i++){ - lsmFsPageRelease(pMW->apHier[i]); + lsmFsPageRelease(pMW->pPage); + lsmFsPageRelease(pMW->pPage); + + for(i=0; i<2; i++){ + Hierarchy *p = &pMW->hier; + int iPg; + for(iPg=0; iPgnHier; iPg++){ + lsmFsPageRelease(p->apHier[iPg]); + } + lsmFree(pMW->pDb->pEnv, p->apHier); + p->apHier = 0; + p->nHier = 0; } - lsmFree(pMW->pDb->pEnv, pMW->apHier); pMW->pCsr = 0; - pMW->apHier = 0; - pMW->nHier = 0; - pMW->apPage[0] = 0; - pMW->apPage[1] = 0; + pMW->pPage = 0; + pMW->pPage = 0; } static int mergeWorkerFirstPage(MergeWorker *pMW){ int rc; /* Return code */ - SortedRun *pRun; /* Run containing sep. keys to merge in */ - Page *pPg = 0; /* First page of run pRun */ + Page *pPg = 0; /* First page of run pSeg */ + int iFPtr; /* Pointer value read from footer of pPg */ + MultiCursor *pCsr = pMW->pCsr; - assert( pMW->apPage[0]==0 ); + assert( pMW->pPage==0 ); - pRun = pMW->pCsr->aSegCsr[pMW->pCsr->nSegCsr-1].aPtr[0].pRun; - rc = lsmFsDbPageGet(pMW->pDb->pFS, pRun->iFirst, &pPg); + if( pCsr->pBtCsr ){ + rc = LSM_OK; + iFPtr = pMW->pLevel->pNext->lhs.iFirst; + }else{ + Segment *pSeg; + pSeg = pMW->pCsr->aSegCsr[pMW->pCsr->nSegCsr-1].aPtr[0].pSeg; + rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg->iFirst, &pPg); + if( rc==LSM_OK ){ + u8 *aData; /* Buffer for page pPg */ + int nData; /* Size of aData[] in bytes */ + aData = fsPageData(pPg, &nData); + iFPtr = pageGetPtr(aData, nData); + lsmFsPageRelease(pPg); + } + } + if( rc==LSM_OK ){ - u8 *aData; /* Buffer for page pPg */ - int nData; /* Size of aData[] in bytes */ - int iFPtr; /* Pointer value read from footer of pPg */ - aData = lsmFsPageData(pPg, &nData); - iFPtr = pageGetPtr(aData, nData); - lsmFsPageRelease(pPg); - rc = mergeWorkerNextPage(pMW, 0, iFPtr); + rc = mergeWorkerNextPage(pMW, iFPtr); } return rc; } static int mergeWorkerStep(MergeWorker *pMW){ lsm_db *pDb = pMW->pDb; /* Database handle */ MultiCursor *pCsr; /* Cursor to read input data from */ - int rc; /* Return code */ + int rc = LSM_OK; /* Return code */ int eType; /* SORTED_SEPARATOR, WRITE or DELETE */ void *pKey; int nKey; /* Key */ - void *pVal; int nVal; /* Value */ Segment *pSeg; /* Output segment */ int iPtr = 0; pCsr = pMW->pCsr; pSeg = &pMW->pLevel->lhs; /* Pull the next record out of the source cursor. */ lsmMCursorKey(pCsr, &pKey, &nKey); - rc = lsmMCursorValue(pCsr, &pVal, &nVal); eType = pCsr->eType; - if( rc!=LSM_OK ) return rc; /* Figure out if the output record may have a different pointer value ** than the previous. This is the case if the current key is identical to ** a key that appears in the lowest level run being merged. If so, set ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, ** indicating that the output pointer value should be a copy of the pointer ** value written with the previous key. */ - if( pCsr->nSegCsr ){ + if( pCsr->pBtCsr ){ + BtreeCursor *pBtCsr = pCsr->pBtCsr; + if( pBtCsr->pKey ){ + int res = rtTopic(pBtCsr->eType) - rtTopic(eType); + if( res==0 ) res = pDb->xCmp(pBtCsr->pKey, pBtCsr->nKey, pKey, nKey); + if( 0==res ) iPtr = pBtCsr->iPtr; + + assert( res>=0 ); + } + }else if( pCsr->nSegCsr ){ LevelCursor *pPtrs = &pCsr->aSegCsr[pCsr->nSegCsr-1]; if( segmentCursorValid(pPtrs) && 0==pDb->xCmp(pPtrs->aPtr[0].pKey, pPtrs->aPtr[0].nKey, pKey, nKey) ){ iPtr = pPtrs->aPtr[0].iPtr+pPtrs->aPtr[0].iPgPtr; @@ -2930,41 +3320,17 @@ ** changed, there is no point in writing an output record. Otherwise, ** proceed. */ if( rtIsSeparator(eType)==0 || iPtr!=0 ){ int iSPtr = 0; /* Separators require a pointer here */ - if( pMW->apPage[0]==0 ){ + if( pMW->pPage==0 ){ rc = mergeWorkerFirstPage(pMW); } /* Write the record into the main run. */ if( rc==LSM_OK ){ - rc = mergeWorkerWrite(pMW, 0, eType, pKey, nKey, pVal, nVal, iPtr,&iSPtr); - } - - /* If the call to mergeWorkerWrite() above started a new page, then - ** add a SORTED_SEPARATOR key to the separators run. */ - if( rc==LSM_OK && iSPtr ){ - - /* If the separators array has not been started, start it now. */ - if( pMW->apPage[1]==0 ){ - assert( pSeg->run.iFirst!=0 ); - rc = mergeWorkerNextPage(pMW, 1, pSeg->run.iFirst); - if( !pMW->bFlush ) pSeg->sep.iRoot = pSeg->sep.iFirst; - } - - if( rc==LSM_OK ){ - int eSType; /* Type of record for separators array */ - - /* Figure out how many (if any) keys to skip from this point. */ - assert( pMW->apPage[1] && (pSeg->sep.iFirst || pMW->bFlush) ); - pMW->pLevel->pMerge->nSkip = keyszToSkip(pDb->pFS, nKey); - - /* Write the key into the separators array. */ - eSType = rtTopic(eType) | SORTED_SEPARATOR; - rc = mergeWorkerWrite(pMW, 1, eSType, pKey, nKey, 0, 0, iSPtr, 0); - } + rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pCsr, iPtr, &iSPtr); } } /* Advance the cursor to the next input record (assuming one exists). */ assert( lsmMCursorValid(pMW->pCsr) ); @@ -2974,19 +3340,24 @@ ** references currently held by the merge worker and inform the ** FileSystem object that no further pages will be appended to either ** the main or separators array. */ if( rc==LSM_OK && !lsmMCursorValid(pMW->pCsr) ){ - if( pSeg->run.iFirst ){ - rc = lsmFsSortedFinish(pDb->pFS, &pSeg->run); - } - if( rc==LSM_OK && pMW->bFlush ){ - rc = mergeWorkerBuildHierarchy(pMW); - } - if( rc==LSM_OK && pSeg->sep.iFirst ){ - rc = lsmFsSortedFinish(pDb->pFS, &pSeg->sep); - } + if( pSeg->iFirst ){ + rc = lsmFsSortedFinish(pDb->pFS, pSeg); + } + +#ifdef LSM_DEBUG_EXPENSIVE + if( rc==LSM_OK ){ + rc = assertBtreeOk(pDb, pSeg); + if( pMW->pCsr->pBtCsr ){ + Segment *pNext = &pMW->pLevel->pNext->lhs; + rc = assertPointersOk(pDb, pSeg, pNext, 0); + } + } +#endif + mergeWorkerShutdown(pMW); } return rc; } @@ -3014,11 +3385,11 @@ ){ int rc = LSM_OK; /* Return Code */ MultiCursor *pCsr = 0; Level *pNext = 0; /* The current top level */ Level *pNew; /* The new level itself */ - SortedRun *pDel = 0; + Segment *pDel = 0; /* Delete separators from this segment */ int iLeftPtr = 0; /* Allocate the new level structure to write to. */ pNext = lsmDbSnapshotLevel(pDb->pWorker); pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc); @@ -3034,24 +3405,17 @@ rc = multiCursorNew(pDb, pDb->pWorker, (pDb->pTV!=0), 0, &pCsr); if( rc==LSM_OK ){ if( pNext ){ assert( pNext->pMerge==0 || pNext->nRight>0 ); if( pNext->pMerge==0 ){ - if( segmentHasSeparators(&pNext->lhs) ){ + if( pNext->lhs.iRoot ){ rc = multiCursorAddLevel(pCsr, pNext, MULTICURSOR_ADDLEVEL_LHS_SEP); - - /* This call moves any blocks occupied by separators array pDel - ** to the pending list. We do this here, even though pDel will be - ** read while building the new level, so that the blocks will be - ** included in the "FREELIST" entry visited by the cursor (and - ** written into the new top level). */ if( rc==LSM_OK ){ - pDel = &pNext->lhs.sep; - rc = lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 0, pDel); + pDel = &pNext->lhs; } } - iLeftPtr = pNext->lhs.run.iFirst; + iLeftPtr = pNext->lhs.iFirst; } }else{ /* The new level will be the only level in the LSM. There is no reason ** to write out delete keys in this case. */ multiCursorIgnoreDelete(pCsr); @@ -3079,35 +3443,29 @@ mergeworker.pLevel = pNew; mergeworker.pCsr = pCsr; /* Mark the separators array for the new level as a "phantom". */ mergeworker.bFlush = 1; - lsmFsPhantom(pDb->pFS, &pNew->lhs.sep); /* Allocate the first page of the output segment. */ - rc = mergeWorkerNextPage(&mergeworker, 0, iLeftPtr); + rc = mergeWorkerNextPage(&mergeworker, iLeftPtr); /* Do the work to create the new merged segment on disk */ if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr); while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){ rc = mergeWorkerStep(&mergeworker); } - lsmFsPhantomFree(pDb->pFS); mergeWorkerShutdown(&mergeworker); pNew->pMerge = 0; } lsmFreelistDeltaEnd(pDb); - /* Link the new level into the top of the tree. Delete the separators - ** array (if any) that was merged into the new level. */ + /* Link the new level into the top of the tree. */ if( rc==LSM_OK ){ if( pDel ){ - /* lsmFsSortedDelete() has already been called on pDel. So all - ** that is required here is to zero it (so that it is not used by - ** future LSM searches). */ - memset(pDel, 0, sizeof(SortedRun)); + pDel->iRoot = 0; } }else{ lsmDbSnapshotSetLevel(pDb->pWorker, pNext); sortedFreeLevel(pDb->pEnv, pNew); } @@ -3158,12 +3516,10 @@ #if 0 lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "tree flush"); #endif - assertAllBtreesOk(rc, pDb); - assertAllPointersOk(rc, pDb); assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) ); lsmFinishFlush(pDb, rc==LSM_OK); return rc; } @@ -3218,11 +3574,11 @@ for(pp=&pTopLevel; *pp!=pLevel; pp=&((*pp)->pNext)); *pp = pNew; lsmDbSnapshotSetLevel(pDb->pWorker, pTopLevel); /* Determine whether or not the next separators will be linked in */ - if( pNext && pNext->pMerge==0 && segmentHasSeparators(&pNext->lhs) ){ + if( pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){ bUseNext = 1; } } /* Allocate the merge object */ @@ -3236,35 +3592,35 @@ *ppNew = pNew; return rc; } -static int mergeWorkerLoadOutputPage(MergeWorker *pMW, int bSep){ +static int mergeWorkerLoadOutputPage(MergeWorker *pMW){ int rc = LSM_OK; /* Return code */ - SortedRun *pRun; /* Run to load page from */ + Segment *pSeg; /* Run to load page from */ Level *pLevel; pLevel = pMW->pLevel; - pRun = (bSep ? &pLevel->lhs.sep : &pLevel->lhs.run); - if( pRun->iLast ){ + pSeg = &pLevel->lhs; + if( pSeg->iLast ){ Page *pPg; - rc = lsmFsDbPageGet(pMW->pDb->pFS, pRun->iLast, &pPg); + rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg->iLast, &pPg); while( rc==LSM_OK ){ Page *pNext; u8 *aData; int nData; - aData = lsmFsPageData(pPg, &nData); + aData = fsPageData(pPg, &nData); if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0 ) break; - rc = lsmFsDbPageNext(pRun, pPg, -1, &pNext); + rc = lsmFsDbPageNext(pSeg, pPg, -1, &pNext); lsmFsPageRelease(pPg); pPg = pNext; } if( rc==LSM_OK ){ - pMW->apPage[bSep] = pPg; - if( pLevel->pMerge->aiOutputOff[bSep]>=0 ) rc = lsmFsPageWrite(pPg); + pMW->pPage = pPg; + if( pLevel->pMerge->iOutputOff>=0 ) rc = lsmFsPageWrite(pPg); } } return rc; } @@ -3306,20 +3662,19 @@ } multiCursorReadSeparators(pCsr); }else{ multiCursorIgnoreDelete(pCsr); } - assert( pMerge->nInput==pCsr->nSegCsr ); + assert( rc!=LSM_OK || pMerge->nInput==(pCsr->nSegCsr+(pCsr->pBtCsr!=0)) ); pMW->pCsr = pCsr; - /* Load each of the output pages into memory. */ - if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW, 0); - if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW, 1); + /* Load the current output page into memory. */ + if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW); /* Position the cursor. */ if( rc==LSM_OK ){ - if( pMW->apPage[0]==0 ){ + if( pMW->pPage==0 ){ /* The output array is still empty. So position the cursor at the very ** start of the input. */ rc = multiCursorEnd(pCsr, 0); }else{ /* The output array is non-empty. Position the cursor based on the @@ -3336,10 +3691,15 @@ if( rc==LSM_OK && pPtr->nCell>0 ){ rc = segmentPtrLoadCell(pPtr, pInput->iCell); } } } + + if( rc==LSM_OK && pCsr->pBtCsr ){ + assert( i==pCsr->nSegCsr ); + rc = btreeCursorRestore(pCsr->pBtCsr, pCsr->xCmp, &pMerge->aInput[i]); + } if( rc==LSM_OK ){ rc = multiCursorSetupTree(pCsr, 0); } } @@ -3355,11 +3715,10 @@ int nRemaining = nWork; /* Units of work to do before returning */ Snapshot *pWorker = pDb->pWorker; assert( lsmFsIntegrityCheck(pDb) ); assert( pWorker ); - assertAllPointersOk(rc, pDb); if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK; lsmDatabaseDirty(pDb); while( nRemaining>0 ){ @@ -3368,11 +3727,11 @@ /* Find the longest contiguous run of levels not currently undergoing a ** merge with the same age in the structure. Or the level being merged ** with the largest number of right-hand segments. Work on it. */ Level *pBest = 0; - int nBest = 4; + int nBest = pDb->nMerge; Level *pThis = 0; int nThis = 0; for(pLevel = pTopLevel; pLevel; pLevel=pLevel->pNext){ @@ -3442,50 +3801,49 @@ if( rc==LSM_OK ){ if( mergeWorkerDone(&mergeworker)==0 ){ int iGobble = mergeworker.pCsr->aTree[1] - CURSOR_DATA_SEGMENT; if( iGobblenRight ){ SegmentPtr *pGobble = &mergeworker.pCsr->aSegCsr[iGobble].aPtr[0]; - if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){ - lsmFsGobble(pWorker, pGobble->pRun, pGobble->pPg); + if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 + && pGobble->pSeg->iRoot==0 + ){ + lsmFsGobble(pWorker, pGobble->pSeg, pGobble->pPg); } } - }else if( pLevel->lhs.run.iFirst==0 ){ + }else if( pLevel->lhs.iFirst==0 ){ /* If the new level is completely empty, remove it from the ** database snapshot. This can only happen if all input keys were ** annihilated. Since keys are only annihilated if the new level ** is the last in the linked list (contains the most ancient of ** database content), this guarantees that pLevel->pNext==0. */ Level *pTop; /* Top level of worker snapshot */ Level **pp; /* Read/write iterator for Level.pNext list */ assert( pLevel->pNext==0 ); - assert( segmentHasSeparators(&pLevel->lhs)==0 ); /* Remove the level from the worker snapshot. */ pTop = lsmDbSnapshotLevel(pWorker); for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext)); *pp = pLevel->pNext; lsmDbSnapshotSetLevel(pWorker, pTop); /* Free the Level structure. */ - lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->lhs.run); + lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->lhs); sortedFreeLevel(pDb->pEnv, pLevel); }else{ int i; /* Free the separators of the next level, if required. */ if( pLevel->pMerge->nInput > pLevel->nRight ){ - assert( pLevel->pNext ); - assert( segmentHasSeparators(&pLevel->pNext->lhs) ); - lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->pNext->lhs.sep); + assert( pLevel->pNext->lhs.iRoot ); + pLevel->pNext->lhs.iRoot = 0; } /* Free the right-hand-side of pLevel */ for(i=0; inRight; i++){ - lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i].run); - lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i].sep); + lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]); } lsmFree(pDb->pEnv, pLevel->aRhs); pLevel->nRight = 0; pLevel->aRhs = 0; @@ -3510,12 +3868,10 @@ if( pnWrite ){ *pnWrite = (nWork - nRemaining); } - assertAllBtreesOk(rc, pDb); - assertAllPointersOk(rc, pDb); assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) ); return rc; } typedef struct Metric Metric; @@ -3644,15 +4000,15 @@ /* ** Return a string representation of the segment passed as the only argument. ** Space for the returned string is allocated using lsmMalloc(), and should ** be freed by the caller using lsmFree(). */ -static char *segToString(lsm_env *pEnv, SortedRun *pRun, int nMin){ - int nSize = pRun->nSize; - Pgno iRoot = pRun->iRoot; - Pgno iFirst = pRun->iFirst; - Pgno iLast = pRun->iLast; +static char *segToString(lsm_env *pEnv, Segment *pSeg, int nMin){ + int nSize = pSeg->nSize; + Pgno iRoot = pSeg->iRoot; + Pgno iFirst = pSeg->iFirst; + Pgno iLast = pSeg->iLast; char *z; char *z1; char *z2; int nPad; @@ -3681,23 +4037,23 @@ static int fileToString( lsm_env *pEnv, /* For xMalloc() */ char *aBuf, int nBuf, int nMin, - SortedRun *pRun + Segment *pSeg ){ int i = 0; char *zSeg; - zSeg = segToString(pEnv, pRun, nMin); + zSeg = segToString(pEnv, pSeg, nMin); i += sqlite4_snprintf(&aBuf[i], nBuf-i, "%s", zSeg); lsmFree(pEnv, zSeg); return i; } -void sortedDumpPage(lsm_db *pDb, SortedRun *pRun, Page *pPg, int bVals){ +void sortedDumpPage(lsm_db *pDb, Segment *pRun, Page *pPg, int bVals){ Blob blob = {0, 0, 0}; /* Blob used for keys */ LsmString s; int i; int nRec; @@ -3704,11 +4060,11 @@ int iPtr; int flags; u8 *aData; int nData; - aData = lsmFsPageData(pPg, &nData); + aData = fsPageData(pPg, &nData); nRec = pageGetNRec(aData, nData); iPtr = pageGetPtr(aData, nData); flags = pageGetFlags(aData, nData); @@ -3782,11 +4138,11 @@ int eType; int iPgPtr; Page *pRef = 0; /* Pointer to page iRef */ u8 *aCell; - aData = lsmFsPageData(pPg, &nData); + aData = fsPageData(pPg, &nData); aCell = pageGetCell(aData, nData, iCell); eType = *aCell++; aCell += lsmVarintGet32(aCell, &iPgPtr); @@ -3850,11 +4206,11 @@ int iPtr; int flags; int iCell; u8 *aData; int nData; /* Page data and size thereof */ - aData = lsmFsPageData(pPg, &nData); + aData = fsPageData(pPg, &nData); nRec = pageGetNRec(aData, nData); iPtr = pageGetPtr(aData, nData); flags = pageGetFlags(aData, nData); lsmStringInit(&str, pDb->pEnv); @@ -3907,13 +4263,13 @@ lsmDbSnapshotRelease(pDb->pEnv, pRelease); return rc; } -void sortedDumpSegment(lsm_db *pDb, SortedRun *pRun, int bVals){ +void sortedDumpSegment(lsm_db *pDb, Segment *pRun, int bVals){ assert( pDb->xLog ); - if( pRun ){ + if( pRun && pRun->iFirst ){ char *zSeg; Page *pPg; zSeg = segToString(pDb->pEnv, pRun, 0); lsmLogMessage(pDb, LSM_OK, "Segment: %s", zSeg); @@ -3959,27 +4315,21 @@ for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){ char zLeft[1024]; char zRight[1024]; int i = 0; - SortedRun *aLeft[24]; - SortedRun *aRight[24]; + Segment *aLeft[24]; + Segment *aRight[24]; int nLeft = 0; int nRight = 0; Segment *pSeg = &pLevel->lhs; - if( segmentHasSeparators(pSeg) ){ - aLeft[nLeft++] = &pSeg->sep; - } - aLeft[nLeft++] = &pSeg->run; + aLeft[nLeft++] = pSeg; for(i=0; inRight; i++){ - if( segmentHasSeparators(&pLevel->aRhs[i]) ){ - aRight[nRight++] = &pLevel->aRhs[i].sep; - } - aRight[nRight++] = &pLevel->aRhs[i].run; + aRight[nRight++] = &pLevel->aRhs[i]; } for(i=0; ipLevel; pLevel; pLevel=pLevel->pNext){ - int iRhs; - lsmFsDumpBlockmap(pDb, pLevel->lhs.pSep); - lsmFsDumpBlockmap(pDb, pLevel->lhs.pRun); - for(iRhs=0; iRhsnRight; iRhs++){ - lsmFsDumpBlockmap(pDb, pLevel->aRhs[iRhs].pSep); - lsmFsDumpBlockmap(pDb, pLevel->aRhs[iRhs].pRun); - } - } - lsmFsDumpBlocklists(pDb); -#endif - if( bKeys ){ for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){ int i; - sortedDumpSegment(pDb, &pLevel->lhs.sep, 0); - sortedDumpSegment(pDb, &pLevel->lhs.run, bVals); + sortedDumpSegment(pDb, &pLevel->lhs, bVals); for(i=0; inRight; i++){ - if( pLevel->aRhs[i].sep.iFirst>0 ){ - sortedDumpSegment(pDb, &pLevel->aRhs[i].sep, 0); - } - sortedDumpSegment(pDb, &pLevel->aRhs[i].run, bVals); + sortedDumpSegment(pDb, &pLevel->aRhs[i], bVals); } } } } @@ -4061,12 +4393,11 @@ assert( pDb->pWorker ); for(p=lsmDbSnapshotLevel(pDb->pWorker); p && rc==LSM_OK; p=p->pNext){ Merge *pMerge = p->pMerge; if( pMerge ){ - pMerge->aiOutputOff[0] = -1; - pMerge->aiOutputOff[1] = -1; + pMerge->iOutputOff = -1; pMerge->bHierReadonly = 1; } } return LSM_OK; @@ -4077,96 +4408,26 @@ for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){ lsmTreeCursorSave(pCsr->pTreeCsr); } } - -#ifdef LSM_DEBUG_EXPENSIVE - -/* -** Argument iPg is a page number within a separators run. Assert() that for -** each key K on on the page, (pKey1 >= K > pKey2) is true. -** -** Also, if page iPg is a BTREE page, call this function recursively to -** check that the keys on each child page fall into the expected range. -*/ -static void assertBtreeRanges( - lsm_db *pDb, - SortedRun *pRun, - Pgno iPg, /* Database page to load */ - void *pKey1, int nKey1, /* All keys must be >= than this */ - void *pKey2, int nKey2 /* And < than this */ -){ - Blob blob = {0, 0, 0}; - u8 *aData; - int nData; - Page *pPg; - int rc; - int i; - int nRec; - int flags; - - int iPrevTopic = 0; /* Previous topic value */ - u8 *aPrev = 0; /* Buffer pointing to previous key */ - int nPrev = 0; /* Size of aPrev[] in bytes */ - - rc = lsmFsDbPageGet(pDb->pFS, iPg, &pPg); - assert( rc==LSM_OK ); - aData = lsmFsPageData(pPg, &nData); - - nRec = pageGetNRec(aData, nData); - flags = pageGetFlags(aData, nData); - - for(i=0; ixCmp(aKey, nKey, pKey1, nKey1)>=0 ); - assert( pKey2==0 || pDb->xCmp(aKey, nKey, pKey2, nKey2)<0 ); - - if( flags&SEGMENT_BTREE_FLAG ){ - assertBtreeRanges(pDb, pRun, iPtr, aPrev, nPrev, aKey, nKey); - } - aPrev = aKey; - nPrev = nKey; - } - - if( flags&SEGMENT_BTREE_FLAG ){ - int iRight = pageGetPtr(aData, nData); - assertBtreeRanges(pDb, pRun, iRight, aPrev, nPrev, 0, 0); - } - - lsmFsPageRelease(pPg); - sortedBlobFree(&blob); -} - -/* -** Check that the array pOne contains the required pointers to pTwo. -** Array pTwo must be a main array. pOne may be either a separators array -** or another main array. -** -** If an error is encountered, *pzErr is set to point to a buffer containing -** a nul-terminated error message and this function returns immediately. The -** caller should eventually call lsmFree(*pzErr) to free the allocated -** error message buffer. -*/ -static void assertPointersOk( - lsm_db *pDb, /* Database handle */ - SortedRun *pOne, /* Run containing pointers */ - SortedRun *pTwo, /* Run containing pointer targets */ - int bRhs, /* True if pTwo may have been Gobble()d */ - char **pzErr +#ifdef LSM_DEBUG_EXPENSIVE +/* +** This function is only included in the build if LSM_DEBUG_EXPENSIVE is +** defined. Its only purpose is to evaluate various assert() statements to +** verify that the database is well formed in certain respects. +** +** More specifically, it checks that the array pOne contains the required +** pointers to pTwo. Array pTwo must be a main array. pOne may be either a +** separators array or another main array. If pOne does not contain the +** correct set of pointers, an assert() statement fails. +*/ +static int assertPointersOk( + lsm_db *pDb, /* Database handle */ + Segment *pOne, /* Segment containing pointers */ + Segment *pTwo, /* Segment containing pointer targets */ + int bRhs /* True if pTwo may have been Gobble()d */ ){ int rc = LSM_OK; /* Error code */ SegmentPtr ptr1; /* Iterates through pOne */ SegmentPtr ptr2; /* Iterates through pTwo */ Pgno iPrev; @@ -4173,12 +4434,12 @@ assert( pOne && pTwo ); memset(&ptr1, 0, sizeof(ptr1)); memset(&ptr2, 0, sizeof(ptr1)); - ptr1.pRun = pOne; - ptr2.pRun = pTwo; + ptr1.pSeg = pOne; + ptr2.pSeg = pTwo; segmentPtrEndPage(pDb->pFS, &ptr1, 0, &rc); segmentPtrEndPage(pDb->pFS, &ptr2, 0, &rc); /* Check that the footer pointer of the first page of pOne points to ** the first page of pTwo. */ @@ -4201,11 +4462,11 @@ assert( rc==LSM_OK ); }while( rc==LSM_OK && ptr2.pPg && ptr2.nCell==0 ); if( rc!=LSM_OK || ptr2.pPg==0 ) break; iThis = lsmFsPageNumber(ptr2.pPg); - if( (ptr2.flags & PGFTR_SKIP_THIS_FLAG)==0 ){ + if( (ptr2.flags & (PGFTR_SKIP_THIS_FLAG|SEGMENT_BTREE_FLAG))==0 ){ /* Load the first cell in the array pTwo page. */ rc = segmentPtrLoadCell(&ptr2, 0); /* Iterate forwards through pOne, searching for a key that matches the @@ -4221,11 +4482,11 @@ assert( bRhs || ptr1.iPtr+ptr1.iPgPtr==iPrev ); }else if( res>0 ){ assert( 0 ); }else{ assert( ptr1.iPtr+ptr1.iPgPtr==iThis ); - iPrev = lsmFsPageNumber(ptr2.pPg); + iPrev = iThis; break; } rc = segmentPtrAdvance(0, &ptr1, 0); if( ptr1.pPg==0 ){ @@ -4235,126 +4496,71 @@ } } segmentPtrReset(&ptr1); segmentPtrReset(&ptr2); -} - -static int countBtreeKeys(FileSystem *pFS, SortedRun *pRun, Pgno iPg){ -#if 0 - int rc; - Page *pPg; - u8 *aData; - int nData; - int flags; - int nRet; - - rc = lsmFsDbPageGet(pFs, iPg, &pPg); - assert( rc==LSM_OK ); - aData = lsmFsPageData(pPg, &nData); - flags = pageGetFlags(aData, nData); - - if( flags & SEGMENT_BTREE_FLAG ){ - Pgno iRight; - int nRec; - int i; - - iRight = pageGetPtr(aData, nData); - nRec = pageGetNRec(aData, nData); - - nRet = nRec; - nRet += countBtreeKeys(pFS, pRun, iRight); - for(i=0; iiFirst, &pPg); - assert( rc==LSM_OK ); - while( pPg ){ - Page *pNext = 0; - u8 *aData; - int nData; - int flags; - int nRec; - - aData = lsmFsPageData(pPg, &nData); - flags = pageGetFlags(aData, nData); - nRec = pageGetNRec(aData, nData); - - if( (flags & SEGMENT_BTREE_FLAG)==0 && nRec ){ - nRun++; - } - - rc = lsmFsDbPageNext(pPg, 1, &pNext); - assert( rc==LSM_OK ); - lsmFsPageRelease(pPg); - pPg = pNext; - } - - nKey = countBtreeKeys(pFS, pRun, iRoot); - assert( nRun==1+nKey ); -#endif -} - -static void assertAllBtreesOk(int rc, lsm_db *pDb){ -#if 0 - if( rc==LSM_OK ){ - Level *p; - for(p=pDb->pLevel; p; p=p->pNext){ - SortedRun *pSep = p->lhs.pSep; - Pgno iRoot = pSep->iRoot; - if( pSep && iRoot ){ - assertBtreeRanges(pDb, pSep, iRoot, 0, 0, 0, 0); - assertBtreeSize(pDb->pFS, pSep, iRoot); - } - } - } -#endif + return LSM_OK; } /* -** This function is only useful for debugging. +** This function is only included in the build if LSM_DEBUG_EXPENSIVE is +** defined. Its only purpose is to evaluate various assert() statements to +** verify that the database is well formed in certain respects. +** +** More specifically, it checks that the b-tree embedded in array pRun +** contains the correct keys. If not, an assert() fails. */ -static void assertAllPointersOk(int rc, lsm_db *pDb){ - assert( rc!=LSM_OK || pDb->pWorker ); - if( rc==LSM_OK ){ - Level *p; - for(p=lsmDbSnapshotLevel(pDb->pWorker); p; p=p->pNext){ - int i; - - if( segmentHasSeparators(&p->lhs) ){ - assertPointersOk(pDb, &p->lhs.sep, &p->lhs.run, 0, 0); - } - for(i=0; inRight; i++){ - if( segmentHasSeparators(&p->aRhs[i]) ){ - assertPointersOk(pDb, &p->aRhs[i].sep, &p->aRhs[i].run, 1, 0); - } - } - - } - } -} - +static int assertBtreeOk( + lsm_db *pDb, + Segment *pSeg +){ + int rc = LSM_OK; /* Return code */ + if( pSeg->iRoot ){ + Blob blob = {0, 0, 0}; /* Buffer used to cache overflow keys */ + FileSystem *pFS = pDb->pFS; /* File system to read from */ + Page *pPg = 0; /* Main run page */ + BtreeCursor *pCsr = 0; /* Btree cursor */ + + rc = btreeCursorNew(pDb, pSeg, &pCsr); + if( rc==LSM_OK ){ + rc = btreeCursorFirst(pCsr); + } + if( rc==LSM_OK ){ + rc = lsmFsDbPageGet(pFS, pSeg->iFirst, &pPg); + } + + while( rc==LSM_OK ){ + Page *pNext; + u8 *aData; + int nData; + int flags; + + rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext); + lsmFsPageRelease(pPg); + pPg = pNext; + if( pPg==0 ) break; + aData = fsPageData(pPg, &nData); + flags = pageGetFlags(aData, nData); + if( rc==LSM_OK + && 0==((SEGMENT_BTREE_FLAG|PGFTR_SKIP_THIS_FLAG) & flags) + && 0!=pageGetNRec(aData, nData) + ){ + u8 *pKey; + int nKey; + int iTopic; + pKey = pageGetKey(pPg, 0, &iTopic, &nKey, &blob); + assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) ); + assert( lsmFsPageNumber(pPg)==pCsr->iPtr ); + rc = btreeCursorNext(pCsr); + } + } + assert( rc!=LSM_OK || pCsr->pKey==0 ); + + if( pPg ) lsmFsPageRelease(pPg); + + btreeCursorFree(pCsr); + sortedBlobFree(&blob); + } + + return rc; +} #endif /* ifdef LSM_DEBUG_EXPENSIVE */