Index: lsm-test/lsmtest1.c ================================================================== --- lsm-test/lsmtest1.c +++ lsm-test/lsmtest1.c @@ -358,11 +358,10 @@ int i; testScanCompare(pControl, pDb, 0, 0, 0, 0, 0, pRc); testScanCompare(pControl, pDb, 1, 0, 0, 0, 0, pRc); -#if 0 if( *pRc==0 ){ int iKey1; int iKey2; void *pKey1; int nKey1; /* Start key */ void *pKey2; int nKey2; /* Final key */ @@ -380,11 +379,10 @@ testScanCompare(pControl, pDb, 1, 0, 0, pKey2, nKey2, pRc); testScanCompare(pControl, pDb, 1, pKey1, nKey1, 0, 0, pRc); testScanCompare(pControl, pDb, 1, pKey1, nKey1, pKey2, nKey2, pRc); testFree(pKey1); } -#endif for(i=0; inIter*p->nWrite), i, pControl, pDb, &rc); -#endif testReopen(&pDb, &rc); testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc); /* Update the progress dots... */ testCaseProgress(i, p->nIter, testCaseNDot(), &iDot); Index: src/lsm_sorted.c ================================================================== --- src/lsm_sorted.c +++ src/lsm_sorted.c @@ -1312,33 +1312,17 @@ return 1; } #endif -int segmentPtrSeek( +static int segmentPtrSearchOversized( MultiCursor *pCsr, /* Cursor context */ SegmentPtr *pPtr, /* Pointer to seek */ - void *pKey, int nKey, /* Key to seek to */ - int eSeek, /* Search bias - see above */ - int *piPtr, /* OUT: FC pointer */ - int *pbStop + void *pKey, int nKey /* Key to seek to */ ){ int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp; - int res; /* Result of comparison operation */ int rc = LSM_OK; - int iMin; - int iMax; - int iPtrOut = 0; - - const int iTopic = 0; - -#if 0 -static int nCall = 0; -nCall++; -printf("in call %d\n", nCall); -fflush(stdout); -#endif /* If the OVERSIZED flag is set, then there is no pointer in the ** upper level to the next page in the segment that contains at least ** one key. So compare the largest key on the current page with the ** key being sought (pKey/nKey). 
If (pKey/nKey) is larger, advance @@ -1358,11 +1342,11 @@ /* If the loaded key is >= than (pKey/nKey), break out of the loop. ** If (pKey/nKey) is present in this array, it must be on the current ** page. */ res = sortedKeyCompare( - xCmp, iLastTopic, pLastKey, nLastKey, iTopic, pKey, nKey + xCmp, iLastTopic, pLastKey, nLastKey, 0, pKey, nKey ); if( res>=0 ) break; /* Advance to the next page that contains at least one key. */ pNext = pPtr->pPg; @@ -1389,17 +1373,187 @@ /* This should probably be an LSM_CORRUPT error. */ assert( rc!=LSM_OK || (pPtr->flags & PGFTR_SKIP_THIS_FLAG) ); } + return rc; +} + +static int ptrFwdPointer( + Page *pPage, + int iCell, + Segment *pSeg, + Pgno *piPtr, + int *pbFound +){ + Page *pPg = pPage; + int iFirst = iCell; + int rc = LSM_OK; + + do { + Page *pNext = 0; + u8 *aData; + int nData; + + aData = lsmFsPageData(pPg, &nData); + if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0 ){ + int i; + int nCell = pageGetNRec(aData, nData); + for(i=iFirst; ipPg && rc==LSM_OK ){ + int res = sortedKeyCompare(pCsr->pDb->xCmp, + pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey, + rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey + ); + if( res<=0 ) break; + rc = segmentPtrAdvance(pCsr, pPtr, 0); + } + return rc; +} + + +/* +** This function is called as part of a SEEK_GE op on a multi-cursor if the +** FC pointer read from segment *pPtr comes from an entry with the +** LSM_START_DELETE flag set. In this case the pointer value cannot be +** trusted. Instead, the pointer that should be followed is that associated +** with the next entry in *pPtr that does not have LSM_START_DELETE set. +** +** Why the pointers can't be trusted: +** +** +** +** TODO: This is a stop-gap solution: +** +** At the moment, this function is called from within segmentPtrSeek(), +** as part of the initial lsmMCursorSeek() call. However, consider a +** database where the following has occurred: +** +** 1. A range delete removes keys 1..9999 using a range delete. +** 2. 
Keys 1 through 9999 are reinserted. +** 3. The levels containing the ops in 1. and 2. above are merged. Call +** this level N. Level N contains FC pointers to level N+1. +** +** Then, if the user attempts to query for (key>=2 LIMIT 10), the +** lsmMCursorSeek() call will iterate through 9998 entries searching for a +** pointer down to the level N+1 that is never actually used. It would be +** much better if the multi-cursor could do this lazily - only seek to the +** level (N+1) page after the user has moved the cursor on level N past +** the big range-delete. +*/ +static int segmentPtrFwdPointer( + MultiCursor *pCsr, /* Multi-cursor pPtr belongs to */ + SegmentPtr *pPtr, /* Segment-pointer to extract FC ptr from */ + Pgno *piPtr /* OUT: FC pointer value */ +){ + Level *pLvl = pPtr->pLevel; + Level *pNext = pLvl->pNext; + Page *pPg = pPtr->pPg; + int rc; + int bFound; + Pgno iOut = 0; + + if( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[pLvl->nRight-1] ){ + if( pNext==0 + || (pNext->nRight==0 && pNext->lhs.iRoot) + || (pNext->nRight!=0 && pNext->aRhs[0].iRoot) + ){ + /* Do nothing. The pointer will not be used anyway. */ + return LSM_OK; + } + }else{ + if( pPtr[1].pSeg->iRoot ){ + return LSM_OK; + } + } + + /* Search for a pointer within the current segment. */ + lsmFsPageRef(pPg); + rc = ptrFwdPointer(pPg, pPtr->iCell, pPtr->pSeg, &iOut, &bFound); + + if( rc==LSM_OK && bFound==0 ){ + /* This case happens when pPtr points to the left-hand-side of a segment + ** currently undergoing an incremental merge. In this case, jump to the + ** oldest segment in the right-hand-side of the same level and continue + ** searching. But - do not consider any keys smaller than the level's + ** split-key. 
*/ + SegmentPtr ptr; + + if( pPtr->pLevel->nRight==0 || pPtr->pSeg!=&pPtr->pLevel->lhs ){ + return LSM_CORRUPT_BKPT; + } + + memset(&ptr, 0, sizeof(SegmentPtr)); + ptr.pLevel = pPtr->pLevel; + ptr.pSeg = &ptr.pLevel->aRhs[ptr.pLevel->nRight-1]; + rc = sortedRhsFirst(pCsr, ptr.pLevel, &ptr); + if( rc==LSM_OK ){ + rc = ptrFwdPointer(ptr.pPg, ptr.iCell, ptr.pSeg, &iOut, &bFound); + ptr.pPg = 0; + } + segmentPtrReset(&ptr); + } + + *piPtr = iOut; + return rc; +} + +static int segmentPtrSeek( + MultiCursor *pCsr, /* Cursor context */ + SegmentPtr *pPtr, /* Pointer to seek */ + void *pKey, int nKey, /* Key to seek to */ + int eSeek, /* Search bias - see above */ + int *piPtr, /* OUT: FC pointer */ + int *pbStop +){ + int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp; + int res; /* Result of comparison operation */ + int rc = LSM_OK; + int iMin; + int iMax; + int iPtrOut = 0; + const int iTopic = 0; + + /* If the current page contains an oversized entry, then there are no + ** pointers to one or more of the subsequent pages in the sorted run. + ** The following call ensures that the segment-ptr points to the correct + ** page in this case. */ + rc = segmentPtrSearchOversized(pCsr, pPtr, pKey, nKey); iPtrOut = pPtr->iPtr; /* Assert that this page is the right page of this segment for the key ** that we are searching for. Do this by loading page (iPg-1) and testing ** that pKey/nKey is greater than all keys on that page, and then by ** loading (iPg+1) and testing that pKey/nKey is smaller than all - ** the keys it houses. */ + ** the keys it houses. + ** + ** TODO: With range-deletes in the tree, the test described above may fail. 
+ */ #if 0 assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) ); #endif assert( pPtr->nCell>0 @@ -1442,10 +1596,11 @@ if( rc==LSM_OK ){ assert( res==0 || (iMin==iMax && iMin>=0 && iMinnCell) ); if( res ){ rc = segmentPtrLoadCell(pPtr, iMin); } + assert( rc!=LSM_OK || res>0 || iPtrOut==(pPtr->iPtr + pPtr->iPgPtr) ); if( rc==LSM_OK ){ switch( eSeek ){ case LSM_SEEK_EQ: { int eType = pPtr->eType; @@ -1468,13 +1623,22 @@ break; } case LSM_SEEK_LE: if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1); break; - case LSM_SEEK_GE: - if( res<0 ) rc = segmentPtrAdvance(pCsr, pPtr, 0); + case LSM_SEEK_GE: { + /* Figure out if we need to 'skip' the pointer forward or not */ + if( (res<=0 && (pPtr->eType & LSM_START_DELETE)) + || (res>0 && (pPtr->eType & LSM_END_DELETE)) + ){ + rc = segmentPtrFwdPointer(pCsr, pPtr, &iPtrOut); + } + if( res<0 && rc==LSM_OK ){ + rc = segmentPtrAdvance(pCsr, pPtr, 0); + } break; + } } } } /* If the cursor seek has found a separator key, and this cursor is @@ -1810,24 +1974,10 @@ } pCsr->aTree[iOut] = iRes; } -static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){ - int rc; - rc = segmentPtrEnd(pCsr, pPtr, 0); - while( pPtr->pPg && rc==LSM_OK ){ - int res = sortedKeyCompare(pCsr->pDb->xCmp, - pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey, - rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey - ); - if( res<=0 ) break; - rc = segmentPtrAdvance(pCsr, pPtr, 0); - } - return rc; -} - /* ** This function advances segment pointer iPtr belonging to multi-cursor ** pCsr forward (bReverse==0) or backward (bReverse!=0). ** ** If the segment pointer points to a segment that is part of a composite @@ -2559,17 +2709,17 @@ int iKey = pCsr->aTree[1]; /* If this multi-cursor is advancing forwards, and the sub-cursor ** being advanced is the one that separator keys may be being read ** from, record the current absolute pointer value. 
*/ - if( pCsr->pPrevMergePtr - && iKey>=CURSOR_DATA_SEGMENT - && iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr-(!pCsr->pBtCsr)) - ){ + if( pCsr->pPrevMergePtr ){ if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){ + assert( pCsr->pBtCsr ); *pCsr->pPrevMergePtr = pCsr->pBtCsr->iPtr; - }else{ + }else if( pCsr->pBtCsr==0 && pCsr->nPtr>0 + && iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr-1) + ){ SegmentPtr *pPtr = &pCsr->aPtr[iKey-CURSOR_DATA_SEGMENT]; *pCsr->pPrevMergePtr = pPtr->iPtr+pPtr->iPgPtr; } } @@ -3167,17 +3317,10 @@ pPg = pMW->pPage; aData = fsPageData(pPg, &nData); nRec = pageGetNRec(aData, nData); iFPtr = pageGetPtr(aData, nData); - /* If iPtr is 0, set it to the same value as the absolute pointer - ** stored as part of the previous record. */ - if( 0 && iPtr==0 ){ - iPtr = iFPtr; - if( nRec ) iPtr += pageGetRecordPtr(aData, nData, nRec-1); - } - /* Calculate the relative pointer value to write to this record */ iRPtr = iPtr - iFPtr; /* assert( iRPtr>=0 ); */ /* Figure out how much space is required by the new record. The space @@ -3556,11 +3699,11 @@ int rc = LSM_OK; /* Return Code */ MultiCursor *pCsr = 0; Level *pNext = 0; /* The current top level */ Level *pNew; /* The new level itself */ Segment *pDel = 0; /* Delete separators from this segment */ - int iLeftPtr = 0; + Pgno iLeftPtr = 0; int nWrite = 0; /* Number of database pages written */ assert( pnOvfl ); /* Allocate the new level structure to write to. */ @@ -3587,11 +3730,10 @@ } if( rc!=LSM_OK ){ lsmMCursorClose(pCsr); }else{ - Pgno iCurrentPtr = 0; Merge merge; /* Merge object used to create new level */ MergeWorker mergeworker; /* MergeWorker object for the same purpose */ memset(&merge, 0, sizeof(Merge)); memset(&mergeworker, 0, sizeof(MergeWorker)); @@ -3598,11 +3740,11 @@ pNew->pMerge = &merge; mergeworker.pDb = pDb; mergeworker.pLevel = pNew; mergeworker.pCsr = pCsr; - pCsr->pPrevMergePtr = &iCurrentPtr; + pCsr->pPrevMergePtr = &iLeftPtr; /* Mark the separators array for the new level as a "phantom". 
*/ mergeworker.bFlush = 1; /* Allocate the first page of the output segment. */