Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Fix cases involving iteration through split levels where the first part of a range-delete has been merged or annihilated but the second has not. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | range-delete |
Files: | files | file ages | folders |
SHA1: |
45d5b7570e48fc17f9c9b525122b2a35 |
User & Date: | dan 2012-10-11 19:36:20.705 |
Context
2012-10-14
| ||
09:41 | Range-delete related SEEK_GE and SEEK_LE fixes. check-in: d30de7f821 user: dan tags: range-delete | |
2012-10-11
| ||
19:36 | Fix cases involving iteration through split levels where the first part of a range-delete has been merged or annihilated but the second has not. check-in: 45d5b7570e user: dan tags: range-delete | |
2012-10-10
| ||
18:10 | Fixes for range-delete and seek operations. check-in: 1ff4639070 user: dan tags: range-delete | |
Changes
Changes to lsm-test/lsmtest1.c.
︙ | ︙ | |||
424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 | pKey1 = testMallocCopy(pKey1, nKey1); testDatasourceEntry(pData, i+2000000, &pKey2, &nKey2, 0, 0); testDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2, &rc); testDeleteRange(pControl, pKey1, nKey1, pKey2, nKey2, &rc); testFree(pKey1); testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc); testReopen(&pDb, &rc); testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc); /* Update the progress dots... */ testCaseProgress(i, p->nIter, testCaseNDot(), &iDot); } | > > | 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 | pKey1 = testMallocCopy(pKey1, nKey1); testDatasourceEntry(pData, i+2000000, &pKey2, &nKey2, 0, 0); testDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2, &rc); testDeleteRange(pControl, pKey1, nKey1, pKey2, nKey2, &rc); testFree(pKey1); #if 0 testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc); #endif testReopen(&pDb, &rc); testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc); /* Update the progress dots... */ testCaseProgress(i, p->nIter, testCaseNDot(), &iDot); } |
︙ | ︙ |
Changes to lsm-test/lsmtest_main.c.
︙ | ︙ | |||
197 198 199 200 201 202 203 | ){ ScanResult *p = (ScanResult *)pCtx; u8 *aKey = (u8 *)pKey; u8 *aVal = (u8 *)pVal; int i; if( test_scan_debug ){ | | | 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 | ){ ScanResult *p = (ScanResult *)pCtx; u8 *aKey = (u8 *)pKey; u8 *aVal = (u8 *)pVal; int i; if( test_scan_debug ){ printf("%d: %.*s\n", p->nRow, nKey, (char *)pKey); fflush(stdout); } #if 0 if( test_scan_debug ) printf("%.20s\n", (char *)pVal); #endif #if 0 |
︙ | ︙ |
Changes to src/lsm_sorted.c.
︙ | ︙ | |||
1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 | static int segmentPtrAdvance( MultiCursor *pCsr, SegmentPtr *pPtr, int bReverse ){ int eDir = (bReverse ? -1 : 1); do { int rc; int iCell; /* Number of new cell in page */ iCell = pPtr->iCell + eDir; assert( pPtr->pPg ); assert( iCell<=pPtr->nCell && iCell>=-1 ); if( iCell>=pPtr->nCell || iCell<0 ){ do { rc = segmentPtrNextPage(pPtr, eDir); }while( rc==LSM_OK && pPtr->pPg && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG) ) ); if( rc!=LSM_OK ) return rc; iCell = bReverse ? (pPtr->nCell-1) : 0; } rc = segmentPtrLoadCell(pPtr, iCell); if( rc!=LSM_OK ) return rc; }while( pCsr && pPtr->pPg && segmentPtrIgnoreSeparators(pCsr, pPtr) && rtIsSeparator(pPtr->eType) ); return LSM_OK; | > > > > > > > > > > > > > > > > > > > > > > > | 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 | static int segmentPtrAdvance( MultiCursor *pCsr, SegmentPtr *pPtr, int bReverse ){ int eDir = (bReverse ? -1 : 1); Level *pLvl = pPtr->pLevel; do { int rc; int iCell; /* Number of new cell in page */ int svFlags = 0; /* SegmentPtr.eType before advance */ iCell = pPtr->iCell + eDir; assert( pPtr->pPg ); assert( iCell<=pPtr->nCell && iCell>=-1 ); if( bReverse && pPtr->pSeg!=&pPtr->pLevel->lhs ){ svFlags = pPtr->eType; assert( svFlags ); } if( iCell>=pPtr->nCell || iCell<0 ){ do { rc = segmentPtrNextPage(pPtr, eDir); }while( rc==LSM_OK && pPtr->pPg && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG) ) ); if( rc!=LSM_OK ) return rc; iCell = bReverse ? (pPtr->nCell-1) : 0; } rc = segmentPtrLoadCell(pPtr, iCell); if( rc!=LSM_OK ) return rc; if( svFlags && pPtr->pPg ){ int res = sortedKeyCompare(pCsr->pDb->xCmp, rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey, pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey ); if( res<0 ) segmentPtrReset(pPtr); } if( pPtr->pPg==0 && (svFlags & LSM_END_DELETE) ){ rc = lsmFsDbPageGet(pCsr->pDb->pFS, pPtr->pSeg->iFirst, &pPtr->pPg); if( rc!=LSM_OK ) return rc; pPtr->eType = LSM_START_DELETE | (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0); pPtr->pKey = pLvl->pSplitKey; pPtr->nKey = pLvl->nSplitKey; } }while( pCsr && pPtr->pPg && segmentPtrIgnoreSeparators(pCsr, pPtr) && rtIsSeparator(pPtr->eType) ); return LSM_OK; |
︙ | ︙ | |||
1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 | ** points at either the first (bLast==0) or last (bLast==1) cell in the valid ** region of the segment defined by pPtr->iFirst and pPtr->iLast. ** ** Return LSM_OK if successful or an lsm error code if something goes ** wrong (IO error, OOM etc.). */ static int segmentPtrEnd(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){ int rc = LSM_OK; FileSystem *pFS = pCsr->pDb->pFS; int bIgnore; segmentPtrEndPage(pFS, pPtr, bLast, &rc); while( rc==LSM_OK && pPtr->pPg && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG)) ){ rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1)); } | > | | > > > > > > > > > > > > > | 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 | ** points at either the first (bLast==0) or last (bLast==1) cell in the valid ** region of the segment defined by pPtr->iFirst and pPtr->iLast. ** ** Return LSM_OK if successful or an lsm error code if something goes ** wrong (IO error, OOM etc.). */ static int segmentPtrEnd(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){ Level *pLvl = pPtr->pLevel; int rc = LSM_OK; FileSystem *pFS = pCsr->pDb->pFS; int bIgnore; segmentPtrEndPage(pFS, pPtr, bLast, &rc); while( rc==LSM_OK && pPtr->pPg && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG)) ){ rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1)); } if( rc==LSM_OK && pPtr->pPg ){ rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0); } bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr); if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){ rc = segmentPtrAdvance(pCsr, pPtr, bLast); } if( bLast && rc==LSM_OK && pPtr->pPg && pPtr->pSeg==&pLvl->lhs && pLvl->nRight && (pPtr->eType & LSM_START_DELETE) ){ pPtr->iCell++; pPtr->eType = LSM_END_DELETE | (pLvl->iSplitTopic); pPtr->pKey = pLvl->pSplitKey; pPtr->nKey = pLvl->nSplitKey; pPtr->pVal = 0; pPtr->nVal = 0; } return rc; } static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){ assert( pPtr->pPg ); *ppKey = pPtr->pKey; *pnKey = pPtr->nKey; |
︙ | ︙ | |||
1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 | } if( peType ) *peType = eType; if( pnKey ) *pnKey = nKey; if( ppKey ) *ppKey = pKey; } static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){ int i1; int i2; int iRes; void *pKey1; int nKey1; int eType1; void *pKey2; int nKey2; int eType2; | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 | } if( peType ) *peType = eType; if( pnKey ) *pnKey = nKey; if( ppKey ) *ppKey = pKey; } static int sortedDbKeyCompare( int (*xCmp)(void *, int, void *, int), int iLhsFlags, void *pLhsKey, int nLhsKey, int iRhsFlags, void *pRhsKey, int nRhsKey ){ int res; /* Compare the keys, including the system flag. */ res = sortedKeyCompare(xCmp, rtTopic(iLhsFlags), pLhsKey, nLhsKey, rtTopic(iRhsFlags), pRhsKey, nRhsKey ); /* If a key has the LSM_START_DELETE flag set, but not the LSM_INSERT or ** LSM_POINT_DELETE flags, it is considered a delta larger. This prevents ** the beginning of an open-ended set from masking a database entry or ** delete at a lower level. */ if( res==0 ){ const int insdel = LSM_POINT_DELETE|LSM_INSERT; int iDel1 = 0; int iDel2 = 0; if( LSM_START_DELETE==(iLhsFlags & (LSM_START_DELETE|insdel)) ) iDel1 = +1; if( LSM_END_DELETE ==(iLhsFlags & (LSM_END_DELETE |insdel)) ) iDel1 = -1; if( LSM_START_DELETE==(iRhsFlags & (LSM_START_DELETE|insdel)) ) iDel2 = +1; if( LSM_END_DELETE ==(iRhsFlags & (LSM_END_DELETE |insdel)) ) iDel2 = -1; res = (iDel1 - iDel2); } return res; } static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){ int i1; int i2; int iRes; void *pKey1; int nKey1; int eType1; void *pKey2; int nKey2; int eType2; |
︙ | ︙ | |||
1723 1724 1725 1726 1727 1728 1729 | if( pKey1==0 ){ iRes = i2; }else if( pKey2==0 ){ iRes = i1; }else{ int res; | | | | < < < < < < < < < < < < < < < < < | 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 | if( pKey1==0 ){ iRes = i2; }else if( pKey2==0 ){ iRes = i1; }else{ int res; /* Compare the keys */ res = sortedDbKeyCompare(pCsr->pDb->xCmp, eType1, pKey1, nKey1, eType2, pKey2, nKey2 ); res = res * mul; if( res==0 ){ iRes = (rtIsSeparator(eType1) ? i2 : i1); }else if( res<0 ){ iRes = i1; }else{ iRes = i2; |
︙ | ︙ | |||
1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 | } for(i=pCsr->nTree-1; i>0; i--){ multiCursorDoCompare(pCsr, i, 0); } } else if( pPtr->pPg && bComposite && bReverse && pPtr->pSeg!=&pLvl->lhs ){ int res = sortedKeyCompare(pCsr->pDb->xCmp, rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey, pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey ); if( res<0 ){ segmentPtrReset(pPtr); } } return rc; } static void mcursorFreeComponents(MultiCursor *pCsr){ int i; lsm_env *pEnv = pCsr->pDb->pEnv; | > > | 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 | } for(i=pCsr->nTree-1; i>0; i--){ multiCursorDoCompare(pCsr, i, 0); } } #if 0 else if( pPtr->pPg && bComposite && bReverse && pPtr->pSeg!=&pLvl->lhs ){ int res = sortedKeyCompare(pCsr->pDb->xCmp, rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey, pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey ); if( res<0 ){ segmentPtrReset(pPtr); } } #endif return rc; } static void mcursorFreeComponents(MultiCursor *pCsr){ int i; lsm_env *pEnv = pCsr->pDb->pEnv; |
︙ | ︙ | |||
2474 2475 2476 2477 2478 2479 2480 | /* Check the current key value. If it is not greater than (if bReverse==0) ** or less than (if bReverse!=0) the key currently cached in pCsr->key, ** then the cursor has not yet been successfully advanced. */ multiCursorGetKey(pCsr, pCsr->aTree[1], &eNewType, &pNew, &nNew); if( pNew ){ | | < | < > | 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 | /* Check the current key value. If it is not greater than (if bReverse==0) ** or less than (if bReverse!=0) the key currently cached in pCsr->key, ** then the cursor has not yet been successfully advanced. */ multiCursorGetKey(pCsr, pCsr->aTree[1], &eNewType, &pNew, &nNew); if( pNew ){ int res = sortedDbKeyCompare(pCsr->pDb->xCmp, eNewType, pNew, nNew, pCsr->eType, pCsr->key.pData, pCsr->key.nData ); if( (bReverse==0 && res<=0) || (bReverse!=0 && res>=0) ){ return 0; } multiCursorCacheKey(pCsr, pRc); assert( pCsr->eType==eNewType ); |
︙ | ︙ |
Changes to src/lsm_tree.c.
︙ | ︙ | |||
122 123 124 125 126 127 128 129 130 131 132 133 134 135 | ); return 1; } static int assert_delete_ranges_match(lsm_db *); static int treeCountEntries(lsm_db *db); #endif /* ** Container for a key-value pair. Within the *-shm file, each key/value ** pair is stored in a single allocation (which may not actually be ** contiguous in memory). Layout is the TreeKey structure, followed by ** the nKey bytes of key blob, followed by the nValue bytes of value blob | > > | 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | ); return 1; } static int assert_delete_ranges_match(lsm_db *); static int treeCountEntries(lsm_db *db); #else # define assertFlagsOk(x) #endif /* ** Container for a key-value pair. Within the *-shm file, each key/value ** pair is stored in a single allocation (which may not actually be ** contiguous in memory). Layout is the TreeKey structure, followed by ** the nKey bytes of key blob, followed by the nValue bytes of value blob |
︙ | ︙ | |||
409 410 411 412 413 414 415 416 417 418 419 420 421 422 | ** * todo... */ void assert_tree_looks_ok(int rc, Tree *pTree){ } #else # define assert_tree_looks_ok(x,y) #endif #ifdef LSM_DEBUG /* ** Pointer pBlob points to a buffer containing a blob of binary data ** nBlob bytes long. Append the contents of this blob to *pStr, with ** each octet represented by a 2-digit hexadecimal number. For example, | > > > > > > > > > > > > > > > > > > > | 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 | ** * todo... */ void assert_tree_looks_ok(int rc, Tree *pTree){ } #else # define assert_tree_looks_ok(x,y) #endif void lsmFlagsToString(int flags, char *zFlags){ zFlags[0] = (flags & LSM_END_DELETE) ? ']' : '.'; /* Only one of LSM_POINT_DELETE, LSM_INSERT and LSM_SEPARATOR should ever ** be set. If this is not true, write a '?' to the output. */ switch( flags & (LSM_POINT_DELETE|LSM_INSERT|LSM_SEPARATOR) ){ case 0: zFlags[1] = '.'; break; case LSM_POINT_DELETE: zFlags[1] = '-'; break; case LSM_INSERT: zFlags[1] = '+'; break; case LSM_SEPARATOR: zFlags[1] = '^'; break; default: zFlags[1] = '?'; break; } zFlags[2] = (flags & LSM_SYSTEMKEY) ? '*' : '.'; zFlags[3] = (flags & LSM_START_DELETE) ? '[' : '.'; zFlags[4] = '\0'; } #ifdef LSM_DEBUG /* ** Pointer pBlob points to a buffer containing a blob of binary data ** nBlob bytes long. Append the contents of this blob to *pStr, with ** each octet represented by a 2-digit hexadecimal number. For example, |
︙ | ︙ | |||
444 445 446 447 448 449 450 | */ static void lsmAppendIndent(LsmString *pStr, int nIndent){ int i; lsmStringExtend(pStr, nIndent); for(i=0; i<nIndent; i++) lsmStringAppend(pStr, " ", 1); } | < < < < < < < < < < < < < < < < < < < | 465 466 467 468 469 470 471 472 473 474 475 476 477 478 | */ static void lsmAppendIndent(LsmString *pStr, int nIndent){ int i; lsmStringExtend(pStr, nIndent); for(i=0; i<nIndent; i++) lsmStringAppend(pStr, " ", 1); } static void strAppendFlags(LsmString *pStr, u8 flags){ char zFlags[8]; lsmFlagsToString(flags, zFlags); zFlags[4] = ':'; lsmStringAppend(pStr, zFlags, 5); |
︙ | ︙ |