Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -440,22 +440,24 @@ */ static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){ WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx; Freelist *pFree = p->pFreelist; - while( (p->iFree < pFree->nEntry) ){ - FreelistEntry *pEntry = &pFree->aEntry[p->iFree]; - if( pEntry->iBlk>iBlk ){ - break; - }else{ - p->iFree++; - if( pEntry->iId>=0 - && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) - ){ - return 1; - } - if( pEntry->iBlk==iBlk ) return 0; + if( pFree ){ + while( (p->iFree < pFree->nEntry) ){ + FreelistEntry *pEntry = &pFree->aEntry[p->iFree]; + if( pEntry->iBlk>iBlk ){ + break; + }else{ + p->iFree++; + if( pEntry->iId>=0 + && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) + ){ + return 1; + } + if( pEntry->iBlk==iBlk ) return 0; + } } } return p->xUsr(p->pUsrctx, iBlk, iSnapshot); } @@ -475,23 +477,34 @@ lsm_db *pDb, /* Database handle (must be worker) */ int (*x)(void *, int, i64), /* Callback function */ void *pCtx /* First argument to pass to callback */ ){ int rc; - WalkFreelistCtx ctx; - ctx.pDb = pDb; - ctx.pFreelist = &pDb->pWorker->freelist; - ctx.iFree = 0; - ctx.xUsr = x; - ctx.pUsrctx = pCtx; - - rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx); - if( rc==LSM_OK ){ + int iCtx; + + WalkFreelistCtx ctx[2]; + + ctx[0].pDb = pDb; + ctx[0].pFreelist = &pDb->pWorker->freelist; + ctx[0].iFree = 0; + ctx[0].xUsr = walkFreelistCb; + ctx[0].pUsrctx = (void *)&ctx[1]; + + ctx[1].pDb = pDb; + ctx[1].pFreelist = pDb->pFreelist; + ctx[1].iFree = 0; + ctx[1].xUsr = x; + ctx[1].pUsrctx = pCtx; + + rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx[0]); + + for(iCtx=0; iCtx<2; iCtx++){ int i; - for(i=ctx.iFree; i<ctx.pFreelist->nEntry; i++){ - FreelistEntry *pEntry = &ctx.pFreelist->aEntry[i]; - if( pEntry->iId>=0 && ctx.xUsr(ctx.pUsrctx, pEntry->iBlk, pEntry->iId) ){ + WalkFreelistCtx *p = &ctx[iCtx]; + for(i=p->iFree; 
p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry; i++){ + FreelistEntry *pEntry = &p->pFreelist->aEntry[i]; + if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){ return 1; } } } @@ -557,16 +570,16 @@ /* If a block was found in the free block list, use it and remove it from ** the list. Otherwise, if no suitable block was found, allocate one from ** the end of the file. */ if( rc==LSM_OK ){ if( iRet>0 ){ -#ifdef LSM_LOG_BLOCKS +#ifdef LSM_LOG_FREELIST lsmLogMessage(pDb, 0, "reusing block %d", iRet); #endif rc = freelistAppend(pDb, iRet, -1); }else{ -#ifdef LSM_LOG_BLOCKS +#ifdef LSM_LOG_FREELIST lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet); #endif iRet = ++(p->nBlock); } } @@ -605,10 +618,14 @@ ** until (at least) after the next checkpoint. */ int lsmBlockRefree(lsm_db *pDb, int iBlk){ int rc = LSM_OK; /* Return code */ Snapshot *p = pDb->pWorker; + +#ifdef LSM_LOG_FREELIST + lsmLogMessage(pDb, LSM_OK, "lsmBlockRefree(): Refree block %d", iBlk); +#endif if( iBlk==p->nBlock ){ p->nBlock--; }else{ rc = freelistAppend(pDb, iBlk, 0); Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -2325,11 +2325,10 @@ int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1; u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4]; lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId); *ppVal = aVal; *pnVal = 8; - pCsr->pDb->bUseFreelist = 1; } break; } default: { @@ -3727,16 +3726,19 @@ ** The cursor passed as the first argument is being used as the input for ** a merge operation. When this function is called, *piFlags contains the ** database entry flags for the current entry. The entry about to be written ** to the output. ** +** Note that this function only has to work for cursors configured to +** iterate forwards (not backwards). 
*/ static void mergeRangeDeletes(MultiCursor *pCsr, int *piFlags){ int f = *piFlags; int iKey = pCsr->aTree[1]; int i; + assert( pCsr->flags & CURSOR_NEXT_OK ); if( pCsr->flags & CURSOR_IGNORE_DELETE ){ /* The ignore-delete flag is set when the output of the merge will form ** the oldest level in the database. In this case there is no point in ** retaining any range-delete flags. */ assert( (f & LSM_POINT_DELETE)==0 ); @@ -3748,11 +3750,12 @@ f |= (LSM_START_DELETE|LSM_END_DELETE); } } for(i=LSM_MAX(0, iKey+1-CURSOR_DATA_SEGMENT); inPtr; i++){ - if( pCsr->aPtr[i].eType & LSM_END_DELETE ){ + SegmentPtr *pPtr = &pCsr->aPtr[i]; + if( pPtr->pPg && (pPtr->eType & LSM_END_DELETE) ){ f |= (LSM_START_DELETE|LSM_END_DELETE); } } if( (f & LSM_START_DELETE) && (f & LSM_END_DELETE) && (f & LSM_INSERT)==0 ){ @@ -3776,10 +3779,15 @@ pSeg = &pMW->pLevel->lhs; /* Pull the next record out of the source cursor. */ lsmMCursorKey(pCsr, &pKey, &nKey); eType = pCsr->eType; + + if( eType & LSM_SYSTEMKEY ){ + int i; + i = 1; + } /* Figure out if the output record may have a different pointer value ** than the previous. This is the case if the current key is identical to ** a key that appears in the lowest level run being merged. If so, set ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, @@ -3890,12 +3898,13 @@ Segment *pDel = 0; /* Delete separators from this segment */ int nWrite = 0; /* Number of database pages written */ Freelist freelist; assert( pnOvfl ); - pDb->pFreelist = &freelist; assert( pDb->bUseFreelist==0 ); + pDb->pFreelist = &freelist; + pDb->bUseFreelist = 1; memset(&freelist, 0, sizeof(freelist)); /* Allocate the new level structure to write to. */ pNext = lsmDbSnapshotLevel(pDb->pWorker); pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);