Index: src/lsmInt.h ================================================================== --- src/lsmInt.h +++ src/lsmInt.h @@ -689,12 +689,10 @@ void lsmFsDeferClose(FileSystem *pFS, LsmFile **pp); /* And to sync the db file */ int lsmFsSyncDb(FileSystem *); -void lsmFsFlushWaiting(FileSystem *, int *); - /* Used by lsm_info(ARRAY_STRUCTURE) and lsm_config(MMAP) */ int lsmInfoArrayStructure(lsm_db *pDb, Pgno iFirst, char **pzOut); int lsmInfoArrayPages(lsm_db *pDb, Pgno iFirst, char **pzOut); int lsmConfigMmap(lsm_db *pDb, int *piParam); Index: src/lsm_file.c ================================================================== --- src/lsm_file.c +++ src/lsm_file.c @@ -197,12 +197,10 @@ int bUseMmap; /* True to use mmap() to access db file */ void *pMap; /* Current mapping of database file */ i64 nMap; /* Bytes mapped at pMap */ Page *pFree; - Page *pWaiting; /* b-tree pages waiting to be written */ - /* Statistics */ int nWrite; /* Total number of pages written */ int nRead; /* Total number of pages read */ /* Page cache parameters for non-mmap() mode */ @@ -241,13 +239,10 @@ /* Only used in compressed database mode: */ int nCompress; /* Compressed size (or 0 for uncomp. db) */ int nCompressPrev; /* Compressed size of prev page */ Segment *pSeg; /* Segment this page will be written to */ - - /* Fix this up somehow */ - Page *pNextWaiting; }; /* ** Meta-data page handle. There are two meta-data pages at the start of ** the database file, each FileSystem.nMetasize bytes in size. @@ -1588,18 +1583,20 @@ ** Mark the sorted run passed as the second argument as finished. */ int lsmFsSortedFinish(FileSystem *pFS, Segment *p){ int rc = LSM_OK; if( p && p->iLastPg ){ + int iBlk; /* Check if the last page of this run happens to be the last of a block. ** If it is, then an extra block has already been allocated for this run. ** Shift this extra block back to the free-block list. ** ** Otherwise, add the first free page in the last block used by the run ** to the lAppend list. */ + iBlk = fsPageToBlock(pFS, p->iLastPg); if( fsLastPageOnPagesBlock(pFS, p->iLastPg)!=p->iLastPg ){ int i; Pgno *aiAppend = pFS->pDb->pWorker->aiAppend; for(i=0; inSize++; pSeg->iLastPg = *piNew; return LSM_OK; } -void lsmFsFlushWaiting(FileSystem *pFS, int *pRc){ - int rc = *pRc; - Page *pPg; - - pPg = pFS->pWaiting; - pFS->pWaiting = 0; - - while( pPg ){ - Page *pNext = pPg->pNextWaiting; - if( rc==LSM_OK ) rc = lsmFsPagePersist(pPg); - assert( pPg->nRef==1 ); - lsmFsPageRelease(pPg); - pPg = pNext; - } - *pRc = rc; -} - /* ** If the page passed as an argument is dirty, update the database file ** (or mapping of the database file) with its current contents and mark ** the page as clean. ** @@ -1962,20 +1942,15 @@ pFS->apHash[iHash] = pPg; pPg->pSeg->nSize += (sizeof(aSz) * 2) + pPg->nCompress; }else{ + i64 iOff; /* Offset to write within database file */ if( pPg->iPg==0 ){ /* No page number has been assigned yet. This occurs with pages used - ** in the b-tree hierarchy. They were not assigned page numbers when - ** they were created as doing so would cause this call to - ** lsmFsPagePersist() to write an out-of-order page. Instead a page - ** number is assigned here so that the page data will be appended - ** to the current segment. - */ - Page **pp; + ** in the b-tree hierarchy. */ int iPrev = 0; int iNext = 0; int iHash; assert( pPg->pSeg->iFirst ); @@ -2002,41 +1977,31 @@ }else{ int nData = pPg->nData; pPg->nData += 4; lsmSortedExpandBtreePage(pPg, nData); } - - pPg->nRef++; - for(pp=&pFS->pWaiting; *pp; pp=&(*pp)->pNextWaiting); - *pp = pPg; - assert( pPg->pNextWaiting==0 ); - - }else{ - i64 iOff; /* Offset to write within database file */ - - iOff = (i64)pFS->nPagesize * (i64)(pPg->iPg-1); - if( pFS->bUseMmap==0 ){ - u8 *aData = pPg->aData - (pPg->flags & PAGE_HASPREV); - rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, aData, pFS->nPagesize); - }else if( pPg->flags & PAGE_FREE ){ - fsGrowMapping(pFS, iOff + pFS->nPagesize, &rc); - if( rc==LSM_OK ){ - u8 *aTo = &((u8 *)(pFS->pMap))[iOff]; - u8 *aFrom = pPg->aData - (pPg->flags & PAGE_HASPREV); - memcpy(aTo, aFrom, pFS->nPagesize); - lsmFree(pFS->pEnv, aFrom); - pPg->aData = aTo + (pPg->flags & PAGE_HASPREV); - pPg->flags &= ~PAGE_FREE; - fsPageAddToLru(pFS, pPg); - } - } - - lsmFsFlushWaiting(pFS, &rc); - pPg->flags &= ~PAGE_DIRTY; - pFS->nWrite++; - } - } + } + + iOff = (i64)pFS->nPagesize * (i64)(pPg->iPg-1); + if( pFS->bUseMmap==0 ){ + u8 *aData = pPg->aData - (pPg->flags & PAGE_HASPREV); + rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, aData, pFS->nPagesize); + }else if( pPg->flags & PAGE_FREE ){ + fsGrowMapping(pFS, iOff + pFS->nPagesize, &rc); + if( rc==LSM_OK ){ + u8 *aTo = &((u8 *)(pFS->pMap))[iOff]; + u8 *aFrom = pPg->aData - (pPg->flags & PAGE_HASPREV); + memcpy(aTo, aFrom, pFS->nPagesize); + lsmFree(pFS->pEnv, aFrom); + pPg->aData = aTo + (pPg->flags & PAGE_HASPREV); + pPg->flags &= ~PAGE_FREE; + fsPageAddToLru(pFS, pPg); + } + } + } + pPg->flags &= ~PAGE_DIRTY; + pFS->nWrite++; } return rc; } @@ -2112,11 +2077,11 @@ int lsmFsPageRelease(Page *pPg){ int rc = LSM_OK; if( pPg ){ assert( pPg->nRef>0 ); pPg->nRef--; - if( pPg->nRef==0 ){ + if( pPg->nRef==0 && pPg->iPg!=0 ){ FileSystem *pFS = pPg->pFS; rc = lsmFsPagePersist(pPg); pFS->nOut--; assert( pPg->pFS->pCompress Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -3406,10 +3406,18 @@ u8 *aData; /* Page data for level iLevel */ int iOff; /* Offset on b-tree page to write record to */ int nRec; /* Initial number of records on b-tree page */ Pgno iPtr; /* Pointer value to accompany pKey/nKey */ + Hierarchy *p; + Segment *pSeg; + + /* If there exists a b-tree hierarchy and it is not loaded into + ** memory, load it now. */ + pSeg = &pMW->pLevel->lhs; + p = &pMW->hier; + assert( pMW->aSave[0].bStore==0 ); assert( pMW->aSave[1].bStore==0 ); rc = mergeWorkerBtreeIndirect(pMW); /* Obtain the absolute pointer value to store along with the key in the @@ -3832,11 +3840,10 @@ /* Persist and release the output page. */ if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW); if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW); if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW); if( rc==LSM_OK ) rc = mergeWorkerAddPadding(pMW); - lsmFsFlushWaiting(pMW->pDb->pFS, &rc); mergeWorkerReleaseAll(pMW); lsmFree(pMW->pDb->pEnv, pMW->aGobble); pMW->aGobble = 0; pMW->pCsr = 0; @@ -4140,11 +4147,11 @@ sortedFreeLevel(pDb->pEnv, pNew); }else{ if( pDel ) pDel->iRoot = 0; #if 0 - lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 1, "new-toplevel"); + lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel"); #endif if( freelist.nEntry ){ Freelist *p = &pDb->pWorker->freelist; lsmFree(pDb->pEnv, p->aEntry); @@ -4579,11 +4586,11 @@ ** the database structure has changed. */ mergeWorkerShutdown(&mergeworker, &rc); if( rc==LSM_OK ) sortedInvokeWorkHook(pDb); #if 0 - lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 1, "work"); + lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work"); #endif assertBtreeOk(pDb, &pLevel->lhs); assertRunInOrder(pDb, &pLevel->lhs); /* If bFlush is true and the database is no longer considered "full", @@ -5124,11 +5131,11 @@ nRec = pageGetNRec(aData, nData); iPtr = pageGetPtr(aData, nData); flags = pageGetFlags(aData, nData); lsmStringInit(&str, pDb->pEnv); - lsmStringAppendf(&str, "Page : %lld (%d bytes)\n", iPg, nData); + lsmStringAppendf(&str, "Page : %d (%d bytes)\n", iPg, nData); lsmStringAppendf(&str, "nRec : %d\n", nRec); lsmStringAppendf(&str, "iPtr : %d\n", iPtr); lsmStringAppendf(&str, "flags: %04x\n", flags); lsmStringAppendf(&str, "\n");