Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Range-delete related SEEK_GE and SEEK_LE fixes. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | range-delete |
Files: | files | file ages | folders |
SHA1: |
d30de7f821ba18345c1958c4c226de08 |
User & Date: | dan 2012-10-14 09:41:35.754 |
Context
2012-10-15
| ||
14:26 | Fix a problem with lsm_mt2 in lsmtest_tdb3.c. check-in: c025a26642 user: dan tags: range-delete | |
2012-10-14
| ||
09:41 | Range-delete related SEEK_GE and SEEK_LE fixes. check-in: d30de7f821 user: dan tags: range-delete | |
2012-10-11
| ||
19:36 | Fix cases involving iteration through split levels where the first part of a range-delete has been merged or annihilated but the second has not. check-in: 45d5b7570e user: dan tags: range-delete | |
Changes
Changes to lsm-test/lsmtest1.c.
︙ | ︙ | |||
356 357 358 359 360 361 362 | int *pRc ){ int i; testScanCompare(pControl, pDb, 0, 0, 0, 0, 0, pRc); testScanCompare(pControl, pDb, 1, 0, 0, 0, 0, pRc); | < | 356 357 358 359 360 361 362 363 364 365 366 367 368 369 | int *pRc ){ int i; testScanCompare(pControl, pDb, 0, 0, 0, 0, 0, pRc); testScanCompare(pControl, pDb, 1, 0, 0, 0, 0, pRc); if( *pRc==0 ){ int iKey1; int iKey2; void *pKey1; int nKey1; /* Start key */ void *pKey2; int nKey2; /* Final key */ iKey1 = testPrngValue(iSeed) % nData; |
︙ | ︙ | |||
378 379 380 381 382 383 384 | testScanCompare(pControl, pDb, 0, pKey1, nKey1, 0, 0, pRc); testScanCompare(pControl, pDb, 0, pKey1, nKey1, pKey2, nKey2, pRc); testScanCompare(pControl, pDb, 1, 0, 0, pKey2, nKey2, pRc); testScanCompare(pControl, pDb, 1, pKey1, nKey1, 0, 0, pRc); testScanCompare(pControl, pDb, 1, pKey1, nKey1, pKey2, nKey2, pRc); testFree(pKey1); } | < | 377 378 379 380 381 382 383 384 385 386 387 388 389 390 | testScanCompare(pControl, pDb, 0, pKey1, nKey1, 0, 0, pRc); testScanCompare(pControl, pDb, 0, pKey1, nKey1, pKey2, nKey2, pRc); testScanCompare(pControl, pDb, 1, 0, 0, pKey2, nKey2, pRc); testScanCompare(pControl, pDb, 1, pKey1, nKey1, 0, 0, pRc); testScanCompare(pControl, pDb, 1, pKey1, nKey1, pKey2, nKey2, pRc); testFree(pKey1); } for(i=0; i<nData && *pRc==0; i++){ void *pKey; int nKey; testDatasourceEntry(pData, i, &pKey, &nKey, 0, 0); testFetchCompare(pControl, pDb, pKey, nKey, pRc); } } |
︙ | ︙ | |||
424 425 426 427 428 429 430 | pKey1 = testMallocCopy(pKey1, nKey1); testDatasourceEntry(pData, i+2000000, &pKey2, &nKey2, 0, 0); testDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2, &rc); testDeleteRange(pControl, pKey1, nKey1, pKey2, nKey2, &rc); testFree(pKey1); | < < | 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 | pKey1 = testMallocCopy(pKey1, nKey1); testDatasourceEntry(pData, i+2000000, &pKey2, &nKey2, 0, 0); testDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2, &rc); testDeleteRange(pControl, pKey1, nKey1, pKey2, nKey2, &rc); testFree(pKey1); testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc); testReopen(&pDb, &rc); testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc); /* Update the progress dots... */ testCaseProgress(i, p->nIter, testCaseNDot(), &iDot); } |
︙ | ︙ |
Changes to src/lsm_sorted.c.
︙ | ︙ | |||
1310 1311 1312 1313 1314 1315 1316 | if( eSeek==LSM_SEEK_GE ) return (res<=0); } return 1; } #endif | | | < < < < < < < < < < < < < < < < | 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 | if( eSeek==LSM_SEEK_GE ) return (res<=0); } return 1; } #endif static int segmentPtrSearchOversized( MultiCursor *pCsr, /* Cursor context */ SegmentPtr *pPtr, /* Pointer to seek */ void *pKey, int nKey /* Key to seek to */ ){ int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp; int rc = LSM_OK; /* If the OVERSIZED flag is set, then there is no pointer in the ** upper level to the next page in the segment that contains at least ** one key. So compare the largest key on the current page with the ** key being sought (pKey/nKey). If (pKey/nKey) is larger, advance ** to the next page in the segment that contains at least one key. */ |
︙ | ︙ | |||
1356 1357 1358 1359 1360 1361 1362 | pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1 ); /* If the loaded key is >= than (pKey/nKey), break out of the loop. ** If (pKey/nKey) is present in this array, it must be on the current ** page. */ res = sortedKeyCompare( | | | 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 | pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1 ); /* If the loaded key is >= than (pKey/nKey), break out of the loop. ** If (pKey/nKey) is present in this array, it must be on the current ** page. */ res = sortedKeyCompare( xCmp, iLastTopic, pLastKey, nLastKey, 0, pKey, nKey ); if( res>=0 ) break; /* Advance to the next page that contains at least one key. */ pNext = pPtr->pPg; lsmFsPageRef(pNext); while( 1 ){ |
︙ | ︙ | |||
1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 | if( pNext==0 ) break; segmentPtrSetPage(pPtr, pNext); /* This should probably be an LSM_CORRUPT error. */ assert( rc!=LSM_OK || (pPtr->flags & PGFTR_SKIP_THIS_FLAG) ); } iPtrOut = pPtr->iPtr; /* Assert that this page is the right page of this segment for the key ** that we are searching for. Do this by loading page (iPg-1) and testing ** that pKey/nKey is greater than all keys on that page, and then by ** loading (iPg+1) and testing that pKey/nKey is smaller than all | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | > > > | 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 | if( pNext==0 ) break; segmentPtrSetPage(pPtr, pNext); /* This should probably be an LSM_CORRUPT error. */ assert( rc!=LSM_OK || (pPtr->flags & PGFTR_SKIP_THIS_FLAG) ); } return rc; } static int ptrFwdPointer( Page *pPage, int iCell, Segment *pSeg, Pgno *piPtr, int *pbFound ){ Page *pPg = pPage; int iFirst = iCell; int rc = LSM_OK; do { Page *pNext = 0; u8 *aData; int nData; aData = lsmFsPageData(pPg, &nData); if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0 ){ int i; int nCell = pageGetNRec(aData, nData); for(i=iFirst; i<nCell; i++){ u8 eType = *pageGetCell(aData, nData, i); if( (eType & LSM_START_DELETE)==0 ){ *pbFound = 1; *piPtr = pageGetRecordPtr(aData, nData, i) + pageGetPtr(aData, nData); lsmFsPageRelease(pPg); return LSM_OK; } } } rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext); lsmFsPageRelease(pPg); pPg = pNext; iFirst = 0; }while( pPg && rc==LSM_OK ); lsmFsPageRelease(pPg); *pbFound = 0; return rc; } static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){ int rc; rc = segmentPtrEnd(pCsr, pPtr, 0); while( pPtr->pPg && rc==LSM_OK ){ int res = sortedKeyCompare(pCsr->pDb->xCmp, pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey, rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey ); if( res<=0 ) break; rc = segmentPtrAdvance(pCsr, pPtr, 0); } return rc; } /* ** This function is called as part of a SEEK_GE op on a multi-cursor if the ** FC pointer read from segment *pPtr comes from an entry with the ** LSM_START_DELETE flag set. In this case the pointer value cannot be ** trusted. Instead, the pointer that should be followed is that associated ** with the next entry in *pPtr that does not have LSM_START_DELETE set. ** ** Why the pointers can't be trusted: ** ** ** ** TODO: This is a stop-gap solution: ** ** At the moment, this function is called from within segmentPtrSeek(), ** as part of the initial lsmMCursorSeek() call. However, consider a ** database where the following has occurred: ** ** 1. A range delete removes keys 1..9999 using a range delete. ** 2. Keys 1 through 9999 are reinserted. ** 3. The levels containing the ops in 1. and 2. above are merged. Call ** this level N. Level N contains FC pointers to level N+1. ** ** Then, if the user attempts to query for (key>=2 LIMIT 10), the ** lsmMCursorSeek() call will iterate through 9998 entries searching for a ** pointer down to the level N+1 that is never actually used. It would be ** much better if the multi-cursor could do this lazily - only seek to the ** level (N+1) page after the user has moved the cursor on level N passed ** the big range-delete. */ static int segmentPtrFwdPointer( MultiCursor *pCsr, /* Multi-cursor pPtr belongs to */ SegmentPtr *pPtr, /* Segment-pointer to extract FC ptr from */ Pgno *piPtr /* OUT: FC pointer value */ ){ Level *pLvl = pPtr->pLevel; Level *pNext = pLvl->pNext; Page *pPg = pPtr->pPg; int rc; int bFound; Pgno iOut = 0; if( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[pLvl->nRight-1] ){ if( pNext==0 || (pNext->nRight==0 && pNext->lhs.iRoot) || (pNext->nRight!=0 && pNext->aRhs[0].iRoot) ){ /* Do nothing. The pointer will not be used anyway. */ return LSM_OK; } }else{ if( pPtr[1].pSeg->iRoot ){ return LSM_OK; } } /* Search for a pointer within the current segment. */ lsmFsPageRef(pPg); rc = ptrFwdPointer(pPg, pPtr->iCell, pPtr->pSeg, &iOut, &bFound); if( rc==LSM_OK && bFound==0 ){ /* This case happens when pPtr points to the left-hand-side of a segment ** currently undergoing an incremental merge. In this case, jump to the ** oldest segment in the right-hand-side of the same level and continue ** searching. But - do not consider any keys smaller than the levels ** split-key. */ SegmentPtr ptr; if( pPtr->pLevel->nRight==0 || pPtr->pSeg!=&pPtr->pLevel->lhs ){ return LSM_CORRUPT_BKPT; } memset(&ptr, 0, sizeof(SegmentPtr)); ptr.pLevel = pPtr->pLevel; ptr.pSeg = &ptr.pLevel->aRhs[ptr.pLevel->nRight-1]; rc = sortedRhsFirst(pCsr, ptr.pLevel, &ptr); if( rc==LSM_OK ){ rc = ptrFwdPointer(ptr.pPg, ptr.iCell, ptr.pSeg, &iOut, &bFound); ptr.pPg = 0; } segmentPtrReset(&ptr); } *piPtr = iOut; return rc; } static int segmentPtrSeek( MultiCursor *pCsr, /* Cursor context */ SegmentPtr *pPtr, /* Pointer to seek */ void *pKey, int nKey, /* Key to seek to */ int eSeek, /* Search bias - see above */ int *piPtr, /* OUT: FC pointer */ int *pbStop ){ int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp; int res; /* Result of comparison operation */ int rc = LSM_OK; int iMin; int iMax; int iPtrOut = 0; const int iTopic = 0; /* If the current page contains an oversized entry, then there are no ** pointers to one or more of the subsequent pages in the sorted run. ** The following call ensures that the segment-ptr points to the correct ** page in this case. */ rc = segmentPtrSearchOversized(pCsr, pPtr, pKey, nKey); iPtrOut = pPtr->iPtr; /* Assert that this page is the right page of this segment for the key ** that we are searching for. Do this by loading page (iPg-1) and testing ** that pKey/nKey is greater than all keys on that page, and then by ** loading (iPg+1) and testing that pKey/nKey is smaller than all ** the keys it houses. ** ** TODO: With range-deletes in the tree, the test described above may fail. */ #if 0 assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) ); #endif assert( pPtr->nCell>0 || pPtr->pSeg->nSize==1 || lsmFsPageNumber(pPtr->pPg)==pPtr->pSeg->iLast |
︙ | ︙ | |||
1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 | } if( rc==LSM_OK ){ assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) ); if( res ){ rc = segmentPtrLoadCell(pPtr, iMin); } if( rc==LSM_OK ){ switch( eSeek ){ case LSM_SEEK_EQ: { int eType = pPtr->eType; if( (res<0 && (eType & LSM_START_DELETE)) || (res>0 && (eType & LSM_END_DELETE)) | > | 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 | } if( rc==LSM_OK ){ assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) ); if( res ){ rc = segmentPtrLoadCell(pPtr, iMin); } assert( rc!=LSM_OK || res>0 || iPtrOut==(pPtr->iPtr + pPtr->iPgPtr) ); if( rc==LSM_OK ){ switch( eSeek ){ case LSM_SEEK_EQ: { int eType = pPtr->eType; if( (res<0 && (eType & LSM_START_DELETE)) || (res>0 && (eType & LSM_END_DELETE)) |
︙ | ︙ | |||
1466 1467 1468 1469 1470 1471 1472 | } segmentPtrReset(pPtr); break; } case LSM_SEEK_LE: if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1); break; | | > > > > > > > | > > | 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 | } segmentPtrReset(pPtr); break; } case LSM_SEEK_LE: if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1); break; case LSM_SEEK_GE: { /* Figure out if we need to 'skip' the pointer forward or not */ if( (res<=0 && (pPtr->eType & LSM_START_DELETE)) || (res>0 && (pPtr->eType & LSM_END_DELETE)) ){ rc = segmentPtrFwdPointer(pCsr, pPtr, &iPtrOut); } if( res<0 && rc==LSM_OK ){ rc = segmentPtrAdvance(pCsr, pPtr, 0); } break; } } } } /* If the cursor seek has found a separator key, and this cursor is ** supposed to ignore separators keys, advance to the next entry. */ if( rc==LSM_OK && pPtr->pPg |
︙ | ︙ | |||
1808 1809 1810 1811 1812 1813 1814 | iRes = i2; } } pCsr->aTree[iOut] = iRes; } | < < < < < < < < < < < < < < | 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 | iRes = i2; } } pCsr->aTree[iOut] = iRes; } /* ** This function advances segment pointer iPtr belonging to multi-cursor ** pCsr forward (bReverse==0) or backward (bReverse!=0). ** ** If the segment pointer points to a segment that is part of a composite ** level, then the following special cases are handled. ** |
︙ | ︙ | |||
2557 2558 2559 2560 2561 2562 2563 | if( lsmMCursorValid(pCsr) ){ do { int iKey = pCsr->aTree[1]; /* If this multi-cursor is advancing forwards, and the sub-cursor ** being advanced is the one that separator keys may be being read ** from, record the current absolute pointer value. */ | | | > > > | | < < < | 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 | if( lsmMCursorValid(pCsr) ){ do { int iKey = pCsr->aTree[1]; /* If this multi-cursor is advancing forwards, and the sub-cursor ** being advanced is the one that separator keys may be being read ** from, record the current absolute pointer value. */ if( pCsr->pPrevMergePtr ){ if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){ assert( pCsr->pBtCsr ); *pCsr->pPrevMergePtr = pCsr->pBtCsr->iPtr; }else if( pCsr->pBtCsr==0 && pCsr->nPtr>0 && iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr-1) ){ SegmentPtr *pPtr = &pCsr->aPtr[iKey-CURSOR_DATA_SEGMENT]; *pCsr->pPrevMergePtr = pPtr->iPtr+pPtr->iPgPtr; } } if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){ TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]; |
︙ | ︙ | |||
3165 3166 3167 3168 3169 3170 3171 | pSeg = &pMW->pLevel->lhs; pPg = pMW->pPage; aData = fsPageData(pPg, &nData); nRec = pageGetNRec(aData, nData); iFPtr = pageGetPtr(aData, nData); | < < < < < < < | 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327 3328 | pSeg = &pMW->pLevel->lhs; pPg = pMW->pPage; aData = fsPageData(pPg, &nData); nRec = pageGetNRec(aData, nData); iFPtr = pageGetPtr(aData, nData); /* Calculate the relative pointer value to write to this record */ iRPtr = iPtr - iFPtr; /* assert( iRPtr>=0 ); */ /* Figure out how much space is required by the new record. The space ** required is divided into two sections: the header and the body. The ** header consists of the intial varint fields. The body are the blobs |
︙ | ︙ | |||
3554 3555 3556 3557 3558 3559 3560 | int *pnWrite /* OUT: Number of database pages written */ ){ int rc = LSM_OK; /* Return Code */ MultiCursor *pCsr = 0; Level *pNext = 0; /* The current top level */ Level *pNew; /* The new level itself */ Segment *pDel = 0; /* Delete separators from this segment */ | | | 3697 3698 3699 3700 3701 3702 3703 3704 3705 3706 3707 3708 3709 3710 3711 | int *pnWrite /* OUT: Number of database pages written */ ){ int rc = LSM_OK; /* Return Code */ MultiCursor *pCsr = 0; Level *pNext = 0; /* The current top level */ Level *pNew; /* The new level itself */ Segment *pDel = 0; /* Delete separators from this segment */ Pgno iLeftPtr = 0; int nWrite = 0; /* Number of database pages written */ assert( pnOvfl ); /* Allocate the new level structure to write to. */ pNext = lsmDbSnapshotLevel(pDb->pWorker); pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc); |
︙ | ︙ | |||
3585 3586 3587 3588 3589 3590 3591 | iLeftPtr = pNext->lhs.iFirst; } } if( rc!=LSM_OK ){ lsmMCursorClose(pCsr); }else{ | < | | 3728 3729 3730 3731 3732 3733 3734 3735 3736 3737 3738 3739 3740 3741 3742 3743 3744 3745 3746 3747 3748 3749 3750 3751 3752 | iLeftPtr = pNext->lhs.iFirst; } } if( rc!=LSM_OK ){ lsmMCursorClose(pCsr); }else{ Merge merge; /* Merge object used to create new level */ MergeWorker mergeworker; /* MergeWorker object for the same purpose */ memset(&merge, 0, sizeof(Merge)); memset(&mergeworker, 0, sizeof(MergeWorker)); pNew->pMerge = &merge; mergeworker.pDb = pDb; mergeworker.pLevel = pNew; mergeworker.pCsr = pCsr; pCsr->pPrevMergePtr = &iLeftPtr; /* Mark the separators array for the new level as a "phantom". */ mergeworker.bFlush = 1; /* Allocate the first page of the output segment. */ rc = mergeWorkerNextPage(&mergeworker, iLeftPtr); |
︙ | ︙ |