SQLite4
Check-in [a7de625f13]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Merge range-delete branch back into trunk.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: a7de625f13d7a02d7a55e2e50210c949ada37a4d
User & Date: dan 2012-10-15 19:36:53
Context
2012-10-17
19:29
Fix various issues with tcl test cases. check-in: ae7d9c68ef user: dan tags: trunk
2012-10-16
15:26
Change page numbers to 8-byte numbers (from 4). This is required to support compressed databases, where a page number is a byte offset in the database file. check-in: 5d266a717d user: dan tags: compression-hooks
2012-10-15
19:36
Merge range-delete branch back into trunk. check-in: a7de625f13 user: dan tags: trunk
19:34
Fix a case in live-recovery from a writer crash. Leaf check-in: 80abdbea2d user: dan tags: range-delete
2012-10-03
09:24
Minor changes to the lsmperf.tcl script. check-in: 45e59053e7 user: dan tags: trunk
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest.h.

    42     42     DatabaseMethods const *pMethods;          /* Database methods */
    43     43     const char *zLibrary;                     /* Library name for tdb_open() */
    44     44   };
    45     45   struct DatabaseMethods {
    46     46     int (*xClose)(TestDb *);
    47     47     int (*xWrite)(TestDb *, void *, int , void *, int);
    48     48     int (*xDelete)(TestDb *, void *, int);
           49  +  int (*xDeleteRange)(TestDb *, void *, int, void *, int);
    49     50     int (*xFetch)(TestDb *, void *, int, void **, int *);
    50     51     int (*xScan)(TestDb *, void *, int, void *, int, void *, int,
    51     52       void (*)(void *, void *, int , void *, int)
    52     53     );
    53     54     int (*xBegin)(TestDb *, int);
    54     55     int (*xCommit)(TestDb *, int);
    55     56     int (*xRollback)(TestDb *, int);
................................................................................
   117    118   void test_failed(void);
   118    119   
   119    120   char *testMallocPrintf(const char *zFormat, ...);
   120    121   char *testMallocVPrintf(const char *zFormat, va_list ap);
   121    122   int testGlobMatch(const char *zPattern, const char *zStr);
   122    123   
   123    124   void testScanCompare(TestDb *, TestDb *, int, void *, int, void *, int, int *);
          125  +void testFetchCompare(TestDb *, TestDb *, void *, int, int *);
   124    126   
   125    127   void *testMalloc(int);
          128  +void *testMallocCopy(void *pCopy, int nByte);
   126    129   void *testRealloc(void *, int);
   127    130   void testFree(void *);
   128    131   
   129    132   /* testio.c */
   130    133   int testVfsConfigureDb(TestDb *pDb);
   131    134   
   132    135   /* testfunc.c */
................................................................................
   190    193   void testWriteDatasource(TestDb *, Datasource *, int, int *);
   191    194   void testWriteDatasourceRange(TestDb *, Datasource *, int, int, int *);
   192    195   void testDeleteDatasource(TestDb *, Datasource *, int, int *);
   193    196   void testDeleteDatasourceRange(TestDb *, Datasource *, int, int, int *);
   194    197   
   195    198   
   196    199   /* test1.c */
   197         -void test_data_1(TestDb *pDb, int *pRc);
   198         -void test_data_2(TestDb *pDb, int *pRc);
   199         -void test_data_3(const char *, const char *, int *pRc);
          200  +void test_data_1(const char *, const char *, int *pRc);
          201  +void test_data_2(const char *, const char *, int *pRc);
   200    202   void testDbContents(TestDb *, Datasource *, int, int, int, int, int, int *);
   201    203   void testCaseProgress(int, int, int, int *);
   202    204   int testCaseNDot(void);
   203    205   
   204    206   typedef struct CksumDb CksumDb;
   205    207   CksumDb *testCksumArrayNew(Datasource *, int, int, int);
   206    208   char *testCksumArrayGet(CksumDb *, int);

Changes to lsm-test/lsmtest1.c.

     1      1   
     2      2   #include "lsmtest.h"
     3      3   
     4      4   #define DATA_SEQUENTIAL TEST_DATASOURCE_SEQUENCE
     5      5   #define DATA_RANDOM     TEST_DATASOURCE_RANDOM
     6      6   
     7         -typedef struct Datatest Datatest;
     8         -typedef struct MinMax MinMax;
     9         -
    10         -struct MinMax {
    11         -  int nMin;
    12         -  int nMax;
    13         -};
            7  +typedef struct Datatest1 Datatest1;
            8  +typedef struct Datatest2 Datatest2;
    14      9   
    15     10   /*
    16     11   ** An instance of the following structure contains parameters used to
    17     12   ** customize the test function in this file. Test procedure:
    18     13   **
    19     14   **   1. Create a data-source based on the "datasource definition" vars.
    20     15   **
    21     16   **   2. Insert nRow key value pairs into the database.
    22     17   **
    23     18   **   3. Delete all keys from the database. Deletes are done in the same 
    24     19   **      order as the inserts.
    25     20   **
    26         -** During steps 2 and 3 above, after each Datatest.nVerify inserts or
           21  +** During steps 2 and 3 above, after each Datatest1.nVerify inserts or
    27     22   ** deletes, the following:
    28     23   **
    29     24   **   a. Run Datasource.nTest key lookups and check the results are as expected.
    30     25   **
    31     26   **   b. If Datasource.bTestScan is true, run a handful (8) of range
    32     27   **      queries (scanning forwards and backwards). Check that the results
    33     28   **      are as expected.
    34     29   **
    35     30   **   c. Close and reopen the database. Then run (a) and (b) again.
    36     31   */
    37         -struct Datatest {
           32  +struct Datatest1 {
    38     33     /* Datasource definition */
    39     34     DatasourceDefn defn;
    40     35   
    41     36     /* Test procedure parameters */
    42     37     int nRow;                       /* Number of rows to insert then delete */
    43     38     int nVerify;                    /* How often to verify the db contents */
    44     39     int nTest;                      /* Number of keys to test (0==all) */
    45     40     int bTestScan;                  /* True to do scan tests */
    46     41   };
    47     42   
    48         -static char *getName(const char *zSystem, Datatest *pTest){
           43  +/*
           44  +** An instance of the following data structure is used to describe the
           45  +** second type of test case in this file. The chief difference between 
           46  +** these tests and those described by Datatest1 is that these tests also
           47  +** experiment with range-delete operations. Tests proceed as follows:
           48  +**
           49  +**     1. Open the datasource described by Datatest2.defn. 
           50  +**
           51  +**     2. Open a connection on an empty database.
           52  +**
           53  +**     3. Do this Datatest2.nIter times:
           54  +**
           55  +**        a) Insert Datatest2.nWrite key-value pairs from the datasource.
           56  +**
           57  +**        b) Select two pseudo-random keys and use them as the start
           58  +**           and end points of a range-delete operation.
           59  +**
           60  +**        c) Verify that the contents of the database are as expected (see
           61  +**           below for details).
           62  +**
           63  +**        d) Close and then reopen the database handle.
           64  +**
           65  +**        e) Verify that the contents of the database are still as expected.
           66  +**
           67  +** The inserts and range deletes are run twice - once on the database being
           68  +** tested and once using a control system (sqlite3, kc etc. - something that 
           69  +** works). In order to verify that the contents of the db being tested are
           70  +** correct, the test runs a bunch of scans and lookups on both the test and
           71  +** control databases. If the results are the same, the test passes.
           72  +*/
           73  +struct Datatest2 {
           74  +  DatasourceDefn defn;
           75  +  int nRange;
           76  +  int nWrite;                     /* Number of writes per iteration */
           77  +  int nIter;                      /* Total number of iterations to run */
           78  +};
           79  +
           80  +/*
           81  +** Generate a unique name for the test case pTest with database system
           82  +** zSystem.
           83  +*/
           84  +static char *getName(const char *zSystem, Datatest1 *pTest){
    49     85     char *zRet;
    50     86     char *zData;
    51     87     zData = testDatasourceName(&pTest->defn);
    52     88     zRet = testMallocPrintf("data.%s.%s.%d.%d", 
    53     89         zSystem, zData, pTest->nRow, pTest->nVerify
    54     90     );
    55     91     testFree(zData);
................................................................................
   211    247       void *pCtx, void *pKey, int nKey, void *pVal, int nVal
   212    248   ){
   213    249     printf("%s\n", (char *)pKey);
   214    250     fflush(stdout);
   215    251   }
   216    252   #endif
   217    253   
   218         -static void doDataTest(
          254  +static void doDataTest1(
   219    255     const char *zSystem,            /* Database system to test */
   220         -  Datatest *p,                    /* Structure containing test parameters */
          256  +  Datatest1 *p,                   /* Structure containing test parameters */
   221    257     int *pRc                        /* OUT: Error code */
   222    258   ){
   223    259     int i;
   224    260     int iDot;
   225    261     int rc = LSM_OK;
   226    262     Datasource *pData;
   227    263     TestDb *pDb;
................................................................................
   276    312     testDatasourceFree(pData);
   277    313     tdb_close(pDb);
   278    314     testCaseFinish(rc);
   279    315     *pRc = rc;
   280    316   }
   281    317   
   282    318   
   283         -void test_data_3(
          319  +void test_data_1(
   284    320     const char *zSystem,            /* Database system name */
   285    321     const char *zPattern,           /* Run test cases that match this pattern */
   286    322     int *pRc                        /* IN/OUT: Error code */
   287    323   ){
   288         -  Datatest aTest[] = {
          324  +  Datatest1 aTest[] = {
   289    325       { {DATA_RANDOM,     20,25,     100,200},       1000,  250, 1000, 1},
   290    326       { {DATA_RANDOM,     8,10,      100,200},       1000,  250, 1000, 1},
   291    327       { {DATA_RANDOM,     8,10,      10,20},         1000,  250, 1000, 1},
   292    328       { {DATA_RANDOM,     8,10,      1000,2000},     1000,  250, 1000, 1},
   293    329       { {DATA_RANDOM,     8,100,     10000,20000},    100,   25,  100, 1},
   294    330       { {DATA_RANDOM,     80,100,    10,20},         1000,  250, 1000, 1},
   295    331       { {DATA_RANDOM,     5000,6000, 10,20},          100,   25,  100, 1},
................................................................................
   302    338     };
   303    339   
   304    340     int i;
   305    341   
   306    342     for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
   307    343       char *zName = getName(zSystem, &aTest[i]);
   308    344       if( testCaseBegin(pRc, zPattern, "%s", zName) ){
   309         -      doDataTest(zSystem, &aTest[i], pRc);
          345  +      doDataTest1(zSystem, &aTest[i], pRc);
          346  +    }
          347  +    testFree(zName);
          348  +  }
          349  +}
          350  +
          351  +static void testCompareDb(
          352  +  Datasource *pData,
          353  +  int nData,
          354  +  int iSeed,
          355  +  TestDb *pControl,
          356  +  TestDb *pDb,
          357  +  int *pRc
          358  +){
          359  +  int i;
          360  +
          361  +  testScanCompare(pControl, pDb, 0, 0, 0,         0, 0,         pRc);
          362  +  testScanCompare(pControl, pDb, 1, 0, 0,         0, 0,         pRc);
          363  +
          364  +  if( *pRc==0 ){
          365  +    int iKey1;
          366  +    int iKey2;
          367  +    void *pKey1; int nKey1;       /* Start key */
          368  +    void *pKey2; int nKey2;       /* Final key */
          369  +
          370  +    iKey1 = testPrngValue(iSeed) % nData;
          371  +    iKey2 = testPrngValue(iSeed+1) % nData;
          372  +    testDatasourceEntry(pData, iKey1, &pKey2, &nKey1, 0, 0);
          373  +    pKey1 = testMalloc(nKey1+1);
          374  +    memcpy(pKey1, pKey2, nKey1+1);
          375  +    testDatasourceEntry(pData, iKey2, &pKey2, &nKey2, 0, 0);
          376  +
          377  +    testScanCompare(pControl, pDb, 0, 0, 0,         pKey2, nKey2, pRc);
          378  +    testScanCompare(pControl, pDb, 0, pKey1, nKey1, 0, 0,         pRc);
          379  +    testScanCompare(pControl, pDb, 0, pKey1, nKey1, pKey2, nKey2, pRc);
          380  +    testScanCompare(pControl, pDb, 1, 0, 0,         pKey2, nKey2, pRc);
          381  +    testScanCompare(pControl, pDb, 1, pKey1, nKey1, 0, 0,         pRc);
          382  +    testScanCompare(pControl, pDb, 1, pKey1, nKey1, pKey2, nKey2, pRc);
          383  +    testFree(pKey1);
          384  +  }
          385  +
          386  +  for(i=0; i<nData && *pRc==0; i++){
          387  +    void *pKey; int nKey;
          388  +    testDatasourceEntry(pData, i, &pKey, &nKey, 0, 0);
          389  +    testFetchCompare(pControl, pDb, pKey, nKey, pRc);
          390  +  }
          391  +}
          392  +
          393  +static void doDataTest2(
          394  +  const char *zSystem,            /* Database system to test */
          395  +  Datatest2 *p,                   /* Structure containing test parameters */
          396  +  int *pRc                        /* OUT: Error code */
          397  +){
          398  +  TestDb *pDb;
          399  +  TestDb *pControl;
          400  +  Datasource *pData;
          401  +  int i;
          402  +  int rc = LSM_OK;
          403  +  int iDot = 0;
          404  +
          405  +  /* Start the test case, open a database and allocate the datasource. */
          406  +  pDb = testOpen(zSystem, 1, &rc);
          407  +  pData = testDatasourceNew(&p->defn);
          408  +  rc = testControlDb(&pControl);
          409  +
          410  +  if( tdb_lsm(pDb) ){
          411  +    int nBuf = 32 * 1024 * 1024;
          412  +    lsm_config(tdb_lsm(pDb), LSM_CONFIG_WRITE_BUFFER, &nBuf);
          413  +  }
          414  +
          415  +  for(i=0; rc==0 && i<p->nIter; i++){
          416  +    void *pKey1; int nKey1;
          417  +    void *pKey2; int nKey2;
          418  +    int ii;
          419  +    int nRange = MIN(p->nIter*p->nWrite, p->nRange);
          420  +
          421  +    for(ii=0; rc==0 && ii<p->nWrite; ii++){
          422  +      int iKey = (i*p->nWrite + ii) % p->nRange;
          423  +      testWriteDatasource(pControl, pData, iKey, &rc);
          424  +      testWriteDatasource(pDb, pData, iKey, &rc);
          425  +    }
          426  +
          427  +    testDatasourceEntry(pData, i+1000000, &pKey1, &nKey1, 0, 0);
          428  +    pKey1 = testMallocCopy(pKey1, nKey1);
          429  +    testDatasourceEntry(pData, i+2000000, &pKey2, &nKey2, 0, 0);
          430  +
          431  +    testDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2, &rc);
          432  +    testDeleteRange(pControl, pKey1, nKey1, pKey2, nKey2, &rc);
          433  +    testFree(pKey1);
          434  +
          435  +    testCompareDb(pData, nRange, i, pControl, pDb, &rc);
          436  +    testReopen(&pDb, &rc);
          437  +    testCompareDb(pData, nRange, i, pControl, pDb, &rc);
          438  +
          439  +    /* Update the progress dots... */
          440  +    testCaseProgress(i, p->nIter, testCaseNDot(), &iDot);
          441  +  }
          442  +
          443  +  testClose(&pDb);
          444  +  testClose(&pControl);
          445  +  testDatasourceFree(pData);
          446  +  testCaseFinish(rc);
          447  +  *pRc = rc;
          448  +}
          449  +
          450  +static char *getName2(const char *zSystem, Datatest2 *pTest){
          451  +  char *zRet;
          452  +  char *zData;
          453  +  zData = testDatasourceName(&pTest->defn);
          454  +  zRet = testMallocPrintf("data2.%s.%s.%d.%d.%d", 
          455  +      zSystem, zData, pTest->nRange, pTest->nWrite, pTest->nIter
          456  +  );
          457  +  testFree(zData);
          458  +  return zRet;
          459  +}
          460  +
          461  +void test_data_2(
          462  +  const char *zSystem,            /* Database system name */
          463  +  const char *zPattern,           /* Run test cases that match this pattern */
          464  +  int *pRc                        /* IN/OUT: Error code */
          465  +){
          466  +  Datatest2 aTest[] = {
          467  +      /* defn,                                 nRange, nWrite, nIter */
          468  +    { {DATA_RANDOM,     20,25,     100,200},   10000,  10,     50   },
          469  +    { {DATA_RANDOM,     20,25,     100,200},   10000,  200,    50   },
          470  +    { {DATA_RANDOM,     20,25,     100,200},   100,    10,     1000 },
          471  +    { {DATA_RANDOM,     20,25,     100,200},   100,    200,    50   },
          472  +
          473  +#if 0
          474  +    { {DATA_RANDOM,     20,25,     100,200},       200, 50 }
          475  +#endif
          476  +  };
          477  +
          478  +  int i;
          479  +
          480  +  for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
          481  +    char *zName = getName2(zSystem, &aTest[i]);
          482  +    if( testCaseBegin(pRc, zPattern, "%s", zName) ){
          483  +      doDataTest2(zSystem, &aTest[i], pRc);
   310    484       }
   311    485       testFree(zName);
   312    486     }
   313    487   }
          488  +

Changes to lsm-test/lsmtest8.c.

   301    301   }
   302    302   
   303    303   void do_writer_crash_test(const char *zPattern, int *pRc){
   304    304     struct Test {
   305    305       const char *zName;
   306    306       void (*xFunc)(int *);
   307    307     } aTest[] = {
   308         -    { "writercrash2.lsm", doWriterCrash2 },
   309    308       { "writercrash1.lsm", doWriterCrash1 },
          309  +    { "writercrash2.lsm", doWriterCrash2 },
   310    310     };
   311    311     int i;
   312    312     for(i=0; i<ArraySize(aTest); i++){
   313    313       struct Test *p = &aTest[i];
   314    314       if( testCaseBegin(pRc, zPattern, p->zName) ){
   315    315         p->xFunc(pRc);
   316    316         testCaseFinish(*pRc);
   317    317       }
   318    318     }
   319    319   
   320    320   }
   321    321   
   322    322   

Changes to lsm-test/lsmtest_main.c.

    69     69   void testDelete(
    70     70     TestDb *pDb,                    /* Database handle */
    71     71     void *pKey, int nKey,           /* Key to query database for */
    72     72     int *pRc                        /* IN/OUT: Error code */
    73     73   ){
    74     74     if( *pRc==0 ){
    75     75       int rc;
    76         -    rc = tdb_delete(pDb, pKey, nKey);
           76  +    *pRc = rc = tdb_delete(pDb, pKey, nKey);
           77  +    testSetError(rc);
           78  +  }
           79  +}
           80  +void testDeleteRange(
           81  +  TestDb *pDb,                    /* Database handle */
           82  +  void *pKey1, int nKey1,
           83  +  void *pKey2, int nKey2,
           84  +  int *pRc                        /* IN/OUT: Error code */
           85  +){
           86  +  if( *pRc==0 ){
           87  +    int rc;
           88  +    *pRc = rc = tdb_delete_range(pDb, pKey1, nKey1, pKey2, nKey2);
    77     89       testSetError(rc);
    78     90     }
    79     91   }
    80     92   
    81     93   void testBegin(TestDb *pDb, int iTrans, int *pRc){
    82     94     if( *pRc==0 ){
    83     95       int rc;
................................................................................
   120    132     const char *zVal,               /* Value to write */
   121    133     int *pRc                        /* IN/OUT: Error code */
   122    134   ){
   123    135     int nVal = (zVal ? strlen(zVal) : 0);
   124    136     testFetch(pDb, (void *)zKey, strlen(zKey), (void *)zVal, nVal, pRc);
   125    137   }
   126    138   
   127         -static void testFetchCompare(
          139  +void testFetchCompare(
   128    140     TestDb *pDb1, 
   129    141     TestDb *pDb2, 
   130    142     void *pKey, int nKey, 
   131    143     int *pRc
   132    144   ){
   133    145     int rc;
   134    146     void *pDbVal1;
   135    147     void *pDbVal2;
   136    148     int nDbVal1;
   137    149     int nDbVal2;
          150  +
          151  +  static int nCall = 0;
          152  +  nCall++;
   138    153   
   139    154     rc = tdb_fetch(pDb1, pKey, nKey, &pDbVal1, &nDbVal1);
   140    155     testSetError(rc);
   141    156   
   142    157     rc = tdb_fetch(pDb2, pKey, nKey, &pDbVal2, &nDbVal2);
   143    158     testSetError(rc);
   144    159   
................................................................................
   181    196     void *pVal, int nVal
   182    197   ){
   183    198     ScanResult *p = (ScanResult *)pCtx;
   184    199     u8 *aKey = (u8 *)pKey;
   185    200     u8 *aVal = (u8 *)pVal;
   186    201     int i;
   187    202   
   188         -  if( test_scan_debug ) printf("%.*s\n", nKey, (char *)pKey);
          203  +  if( test_scan_debug ){
          204  +    printf("%d: %.*s\n", p->nRow, nKey, (char *)pKey);
          205  +    fflush(stdout);
          206  +  }
   189    207   #if 0
   190    208     if( test_scan_debug ) printf("%.20s\n", (char *)pVal);
   191    209   #endif
   192    210   
   193    211   #if 0
   194    212     /* Check tdb_fetch() matches */
   195    213     int rc = 0;
................................................................................
   369    387   ** Allocations made using testMalloc() should be freed using testFree().
   370    388   */
   371    389   void *testMalloc(int n){
   372    390     void *pRet = malloc(n);
   373    391     memset(pRet, 0, n);
   374    392     return pRet;
   375    393   }
          394  +
          395  +void *testMallocCopy(void *pCopy, int nByte){
          396  +  void *pRet = testMalloc(nByte);
          397  +  memcpy(pRet, pCopy, nByte);
          398  +  return pRet;
          399  +}
   376    400   
   377    401   void *testRealloc(void *p, int n){
   378    402     return realloc(p, n);
   379    403   }
   380    404   
   381    405   /*
   382    406   ** Free an allocation made by an earlier call to testMalloc().
................................................................................
   432    456     if( nArg==1 ){
   433    457       zPattern = azArg[0];
   434    458     }
   435    459   
   436    460     for(j=0; tdb_system_name(j); j++){
   437    461       rc = 0;
   438    462   
   439         -    test_data_3(tdb_system_name(j), zPattern, &rc);
          463  +    test_data_1(tdb_system_name(j), zPattern, &rc);
          464  +    test_data_2(tdb_system_name(j), zPattern, &rc);
   440    465       test_rollback(tdb_system_name(j), zPattern, &rc);
   441    466       test_mc(tdb_system_name(j), zPattern, &rc);
   442    467       test_mt(tdb_system_name(j), zPattern, &rc);
   443    468   
   444    469       if( rc ) nFail++;
   445    470     }
   446    471   

Changes to lsm-test/lsmtest_tdb.c.

   177    177   }
   178    178   
   179    179   static int test_leveldb_open(const char *zFilename, int bClear, TestDb **ppDb){
   180    180     static const DatabaseMethods LeveldbMethods = {
   181    181       test_leveldb_close,
   182    182       test_leveldb_write,
   183    183       test_leveldb_delete,
          184  +    0,
   184    185       test_leveldb_fetch,
   185    186       test_leveldb_scan,
   186    187       error_transaction_function,
   187    188       error_transaction_function,
   188    189       error_transaction_function
   189    190     };
   190    191   
................................................................................
   307    308   **   (nOpenTrans-1) nested write transactions open.
   308    309   */
   309    310   struct SqlDb {
   310    311     TestDb base;
   311    312     sqlite3 *db;
   312    313     sqlite3_stmt *pInsert;
   313    314     sqlite3_stmt *pDelete;
          315  +  sqlite3_stmt *pDeleteRange;
   314    316     sqlite3_stmt *pFetch;
   315    317     sqlite3_stmt *apScan[8];
   316    318   
   317    319     int nOpenTrans;
   318    320   
   319    321     /* Used by sql_fetch() to allocate space for results */
   320    322     int nAlloc;
................................................................................
   321    323     u8 *aAlloc;
   322    324   };
   323    325   
   324    326   static int sql_close(TestDb *pTestDb){
   325    327     SqlDb *pDb = (SqlDb *)pTestDb;
   326    328     sqlite3_finalize(pDb->pInsert);
   327    329     sqlite3_finalize(pDb->pDelete);
          330  +  sqlite3_finalize(pDb->pDeleteRange);
   328    331     sqlite3_finalize(pDb->pFetch);
   329    332     sqlite3_finalize(pDb->apScan[0]);
   330    333     sqlite3_finalize(pDb->apScan[1]);
   331    334     sqlite3_finalize(pDb->apScan[2]);
   332    335     sqlite3_finalize(pDb->apScan[3]);
   333    336     sqlite3_finalize(pDb->apScan[4]);
   334    337     sqlite3_finalize(pDb->apScan[5]);
................................................................................
   356    359   
   357    360   static int sql_delete(TestDb *pTestDb, void *pKey, int nKey){
   358    361     SqlDb *pDb = (SqlDb *)pTestDb;
   359    362     sqlite3_bind_blob(pDb->pDelete, 1, pKey, nKey, SQLITE_STATIC);
   360    363     sqlite3_step(pDb->pDelete);
   361    364     return sqlite3_reset(pDb->pDelete);
   362    365   }
          366  +
          367  +static int sql_delete_range(
          368  +  TestDb *pTestDb, 
          369  +  void *pKey1, int nKey1,
          370  +  void *pKey2, int nKey2
          371  +){
          372  +  SqlDb *pDb = (SqlDb *)pTestDb;
          373  +  sqlite3_bind_blob(pDb->pDeleteRange, 1, pKey1, nKey1, SQLITE_STATIC);
          374  +  sqlite3_bind_blob(pDb->pDeleteRange, 2, pKey2, nKey2, SQLITE_STATIC);
          375  +  sqlite3_step(pDb->pDeleteRange);
          376  +  return sqlite3_reset(pDb->pDeleteRange);
          377  +}
   363    378   
   364    379   static int sql_fetch(
   365    380     TestDb *pTestDb, 
   366    381     void *pKey, 
   367    382     int nKey, 
   368    383     void **ppVal, 
   369    384     int *pnVal
................................................................................
   508    523   }
   509    524   
   510    525   static int sql_open(const char *zFilename, int bClear, TestDb **ppDb){
   511    526     static const DatabaseMethods SqlMethods = {
   512    527       sql_close,
   513    528       sql_write,
   514    529       sql_delete,
          530  +    sql_delete_range,
   515    531       sql_fetch,
   516    532       sql_scan,
   517    533       sql_begin,
   518    534       sql_commit,
   519    535       sql_rollback
   520    536     };
   521    537     const char *zCreate = "CREATE TABLE IF NOT EXISTS t1(k PRIMARY KEY, v)";
   522    538     const char *zInsert = "REPLACE INTO t1 VALUES(?, ?)";
   523    539     const char *zDelete = "DELETE FROM t1 WHERE k = ?";
          540  +  const char *zRange = "DELETE FROM t1 WHERE k>? AND k<?";
   524    541     const char *zFetch  = "SELECT v FROM t1 WHERE k = ?";
   525    542   
   526    543     const char *zScan0  = "SELECT * FROM t1 WHERE k BETWEEN ?1 AND ?2 ORDER BY k";
   527    544     const char *zScan1  = "SELECT * FROM t1 WHERE k <= ?2 ORDER BY k";
   528    545     const char *zScan2  = "SELECT * FROM t1 WHERE k >= ?1 ORDER BY k";
   529    546     const char *zScan3  = "SELECT * FROM t1 ORDER BY k";
   530    547   
................................................................................
   546    563     memset(pDb, 0, sizeof(SqlDb));
   547    564     pDb->base.pMethods = &SqlMethods;
   548    565   
   549    566     if( 0!=(rc = sqlite3_open(zFilename, &pDb->db))
   550    567      || 0!=(rc = sqlite3_exec(pDb->db, zCreate, 0, 0, 0))
   551    568      || 0!=(rc = sqlite3_prepare_v2(pDb->db, zInsert, -1, &pDb->pInsert, 0))
   552    569      || 0!=(rc = sqlite3_prepare_v2(pDb->db, zDelete, -1, &pDb->pDelete, 0))
          570  +   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zRange, -1, &pDb->pDeleteRange, 0))
   553    571      || 0!=(rc = sqlite3_prepare_v2(pDb->db, zFetch, -1, &pDb->pFetch, 0))
   554    572      || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan0, -1, &pDb->apScan[0], 0))
   555    573      || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan1, -1, &pDb->apScan[1], 0))
   556    574      || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan2, -1, &pDb->apScan[2], 0))
   557    575      || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan3, -1, &pDb->apScan[3], 0))
   558    576      || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan4, -1, &pDb->apScan[4], 0))
   559    577      || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan5, -1, &pDb->apScan[5], 0))
................................................................................
   644    662   int tdb_write(TestDb *pDb, void *pKey, int nKey, void *pVal, int nVal){
   645    663     return pDb->pMethods->xWrite(pDb, pKey, nKey, pVal, nVal);
   646    664   }
   647    665   
   648    666   int tdb_delete(TestDb *pDb, void *pKey, int nKey){
   649    667     return pDb->pMethods->xDelete(pDb, pKey, nKey);
   650    668   }
          669  +
          670  +int tdb_delete_range(
          671  +    TestDb *pDb, void *pKey1, int nKey1, void *pKey2, int nKey2
          672  +){
          673  +  return pDb->pMethods->xDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2);
          674  +}
   651    675   
   652    676   int tdb_fetch(TestDb *pDb, void *pKey, int nKey, void **ppVal, int *pnVal){
   653    677     return pDb->pMethods->xFetch(pDb, pKey, nKey, ppVal, pnVal);
   654    678   }
   655    679   
   656    680   int tdb_scan(
   657    681     TestDb *pDb,                    /* Database handle */

Changes to lsm-test/lsmtest_tdb.h.

    46     46   int tdb_write(TestDb *pDb, void *pKey, int nKey, void *pVal, int nVal);
    47     47   
    48     48   /*
    49     49   ** Delete a key from the database.
    50     50   */
    51     51   int tdb_delete(TestDb *pDb, void *pKey, int nKey);
    52     52   
           53  +/*
           54  +** Delete a range of keys from the database.
           55  +*/
           56  +int tdb_delete_range(TestDb *, void *pKey1, int nKey1, void *pKey2, int nKey2);
           57  +
    53     58   /*
    54     59   ** Query the database for key (pKey/nKey). If no entry is found, set *ppVal
    55     60   ** to 0 and *pnVal to -1 before returning. Otherwise, set *ppVal and *pnVal
    56     61   ** to a pointer to and size of the value associated with (pKey/nKey).
    57     62   */
    58     63   int tdb_fetch(TestDb *pDb, void *pKey, int nKey, void **ppVal, int *pnVal);
    59     64   

Changes to lsm-test/lsmtest_tdb3.c.

   450    450     return lsm_write(pDb->db, pKey, nKey, pVal, nVal);
   451    451   }
   452    452   
   453    453   static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){
   454    454     LsmDb *pDb = (LsmDb *)pTestDb;
   455    455     return lsm_delete(pDb->db, pKey, nKey);
   456    456   }
          457  +
          458  +static int test_lsm_delete_range(
          459  +  TestDb *pTestDb, 
          460  +  void *pKey1, int nKey1,
          461  +  void *pKey2, int nKey2
          462  +){
          463  +  LsmDb *pDb = (LsmDb *)pTestDb;
          464  +  return lsm_delete_range(pDb->db, pKey1, nKey1, pKey2, nKey2);
          465  +}
   457    466   
   458    467   static int test_lsm_fetch(
   459    468     TestDb *pTestDb, 
   460    469     void *pKey, 
   461    470     int nKey, 
   462    471     void **ppVal, 
   463    472     int *pnVal
................................................................................
   726    735     int bClear, 
   727    736     TestDb **ppDb
   728    737   ){
   729    738     static const DatabaseMethods LsmMethods = {
   730    739       test_lsm_close,
   731    740       test_lsm_write,
   732    741       test_lsm_delete,
          742  +    test_lsm_delete_range,
   733    743       test_lsm_fetch,
   734    744       test_lsm_scan,
   735    745       test_lsm_begin,
   736    746       test_lsm_commit,
   737    747       test_lsm_rollback
   738    748     };
   739    749   
................................................................................
  1111   1121       }
  1112   1122     }
  1113   1123   
  1114   1124     return rc;
  1115   1125   }
  1116   1126   
  1117   1127   
  1118         -static int test_lsm_mt(
  1119         -  const char *zFilename,          /* File to open */
  1120         -  int nWorker,                    /* Either 1 or 2, for worker threads */
  1121         -  int bClear,                     /* True to delete any existing db */
  1122         -  TestDb **ppDb                   /* OUT: TestDb database handle */
  1123         -){
  1124         -  LsmDb *pDb;
  1125         -  int rc;
  1126         -
  1127         -  rc = test_lsm_open(zFilename, bClear, ppDb);
  1128         -  pDb = (LsmDb *)*ppDb;
  1129         -
  1130         -  assert( nWorker==1 || nWorker==2 );
  1131         -
  1132         -  /* Turn off auto-work and configure a work-hook on the client connection. */
  1133         -  if( rc==0 ){
  1134         -    int bAutowork = 0;
  1135         -    lsm_config(pDb->db, LSM_CONFIG_AUTOWORK, &bAutowork);
  1136         -    lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);
  1137         -  }
  1138         -
  1139         -  if( rc==0 ){
  1140         -    pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker);
  1141         -    memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker);
  1142         -    pDb->nWorker = nWorker;
  1143         -
  1144         -    rc = mt_start_worker(pDb, 0, zFilename, 0, LSM_WORK_FLUSH, 
  1145         -        nWorker==1 ? 512 : 0, 1
  1146         -    );
  1147         -  }
  1148         -
  1149         -  if( rc==0 && nWorker==2 ){
  1150         -    rc = mt_start_worker(pDb, 1, zFilename, 0, 0, 512, 0);
  1151         -  }
  1152         -
  1153         -  return rc;
  1154         -}
  1155         -
  1156   1128   int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb){
  1157         -  return test_lsm_mt(zFilename, 1, bClear, ppDb);
         1129  +  const char *zCfg = "threads=2";
         1130  +  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
  1158   1131   }
  1159   1132   
  1160   1133   int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb){
  1161         -  return test_lsm_mt(zFilename, 2, bClear, ppDb);
         1134  +  const char *zCfg = "threads=3";
         1135  +  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
  1162   1136   }
  1163         -
  1164   1137   
  1165   1138   #else
  1166   1139   static void mt_shutdown(LsmDb *pDb) { 
  1167   1140     unused_parameter(pDb); 
  1168   1141   }
  1169   1142   int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){
  1170   1143     unused_parameter(zFilename);
  1171   1144     unused_parameter(bClear);
  1172   1145     unused_parameter(ppDb);
  1173   1146     testPrintError("threads unavailable - recompile with LSM_MUTEX_PTHREADS\n");
  1174   1147     return 1;
  1175   1148   }
  1176   1149   #endif

Changes to src/lsm.h.

   376    376   /*
   377    377   ** Delete a value from the database. No error is returned if the specified
   378    378   ** key value does not exist in the database.
   379    379   */
   380    380   int lsm_delete(lsm_db *, const void *pKey, int nKey);
   381    381   
   382    382   /*
   383         -** Delete all database entries with keys that are greater than or equal to
   384         -** (pKey1/nKey1) and smaller than or equal to (pKey2/nKey2).
          383  +** Delete all database entries with keys that are greater than (pKey1/nKey1) 
          384  +** and smaller than (pKey2/nKey2). Note that keys (pKey1/nKey1) and
          385  +** (pKey2/nKey2) themselves, if they exist in the database, are not deleted.
          386  +**
          387  +** Return LSM_OK if successful, or an LSM error code otherwise.
   385    388   */
   386    389   int lsm_delete_range(lsm_db *, 
   387    390       const void *pKey1, int nKey1, const void *pKey2, int nKey2
   388    391   );
   389    392   
   390    393   /*
   391    394   ** The lsm_tree_size() function reports on the current state of the 

Changes to src/lsmInt.h.

   143    143   ** a checkpoint (the remainder are stored as a system record in the LSM).
   144    144   ** See also LSM_CONFIG_MAX_FREELIST.
   145    145   */
   146    146   #define LSM_MAX_FREELIST_ENTRIES 100
   147    147   
   148    148   #define LSM_ATTEMPTS_BEFORE_PROTOCOL 10000
   149    149   
          150  +
          151  +/*
          152  +** Each entry stored in the LSM (or in-memory tree structure) has an
          153  +** associated mask of the following flags.
          154  +*/
          155  +#define LSM_START_DELETE 0x01     /* Start of open-ended delete range */
          156  +#define LSM_END_DELETE   0x02     /* End of open-ended delete range */
          157  +#define LSM_POINT_DELETE 0x04     /* Delete this key */
          158  +#define LSM_INSERT       0x08     /* Insert this key and value */
          159  +#define LSM_SEPARATOR    0x10     /* True if entry is separator key only */
          160  +#define LSM_SYSTEMKEY    0x20     /* True if entry is a system key (FREELIST) */
          161  +
   150    162   /*
   151    163   ** A string that can grow by appending.
   152    164   */
   153    165   struct LsmString {
   154    166     lsm_env *pEnv;              /* Run-time environment */
   155    167     int n;                      /* Size of string.  -1 indicates error */
   156    168     int nAlloc;                 /* Space allocated for z[] */
................................................................................
   374    386   };
   375    387   struct Merge {
   376    388     int nInput;                     /* Number of input runs being merged */
   377    389     MergeInput *aInput;             /* Array nInput entries in size */
   378    390     MergeInput splitkey;            /* Location in file of current splitkey */
   379    391     int nSkip;                      /* Number of separators entries to skip */
   380    392     int iOutputOff;                 /* Write offset on output page */
          393  +  Pgno iCurrentPtr;               /* Current pointer value */
   381    394     int bHierReadonly;              /* True if b-tree heirarchies are read-only */
   382    395   };
   383    396   
   384    397   /* 
   385    398   ** The first argument to this macro is a pointer to a Segment structure.
   386    399   ** Returns true if the structure instance indicates that the separators
   387    400   ** array is valid.
................................................................................
   539    552   void lsmTreeCursorDestroy(TreeCursor *);
   540    553   
   541    554   int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes);
   542    555   int lsmTreeCursorNext(TreeCursor *pCsr);
   543    556   int lsmTreeCursorPrev(TreeCursor *pCsr);
   544    557   int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast);
   545    558   void lsmTreeCursorReset(TreeCursor *pCsr);
   546         -int lsmTreeCursorKey(TreeCursor *pCsr, void **ppKey, int *pnKey);
          559  +int lsmTreeCursorKey(TreeCursor *pCsr, int *pFlags, void **ppKey, int *pnKey);
          560  +int lsmTreeCursorFlags(TreeCursor *pCsr);
   547    561   int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal);
   548    562   int lsmTreeCursorValid(TreeCursor *pCsr);
   549    563   int lsmTreeCursorSave(TreeCursor *pCsr);
   550    564   
          565  +void lsmFlagsToString(int flags, char *zFlags);
   551    566   
   552    567   /* 
   553    568   ** Functions from file "mem.c".
   554    569   */
   555    570   int lsmPoolNew(lsm_env *pEnv, Mempool **ppPool);
   556    571   void lsmPoolDestroy(lsm_env *pEnv, Mempool *pPool);
   557    572   void *lsmPoolMalloc(lsm_env *pEnv, Mempool *pPool, int nByte);

Changes to src/lsm_ckpt.c.

    60     60   **     4. If nRight>0, The number of segments involved in the merge
    61     61   **     5. if nRight>0, Current nSkip value (see Merge structure defn.),
    62     62   **     6. For each segment in the merge:
    63     63   **        5a. Page number of next cell to read during merge
    64     64   **        5b. Cell number of next cell to read during merge
    65     65   **     7. Page containing current split-key.
    66     66   **     8. Cell within page containing current split-key.
           67  +**     9. Current pointer value.
    67     68   **
    68     69   **   The freelist. 
    69     70   **
    70     71   **     1. Number of free-list entries stored in checkpoint header.
    71     72   **     2. For each entry:
    72     73   **        2a. Block number of free block.
    73     74   **        2b. MSW of associated checkpoint id.
................................................................................
   308    309       ckptSetValue(p, iOut++, pMerge->nSkip, pRc);
   309    310       for(i=0; i<pMerge->nInput; i++){
   310    311         ckptSetValue(p, iOut++, pMerge->aInput[i].iPg, pRc);
   311    312         ckptSetValue(p, iOut++, pMerge->aInput[i].iCell, pRc);
   312    313       }
   313    314       ckptSetValue(p, iOut++, pMerge->splitkey.iPg, pRc);
   314    315       ckptSetValue(p, iOut++, pMerge->splitkey.iCell, pRc);
          316  +    ckptSetValue(p, iOut++, pMerge->iCurrentPtr, pRc);
   315    317     }
   316    318   
   317    319     *piOut = iOut;
   318    320   }
   319    321   
   320    322   /*
   321    323   ** Populate the log offset fields of the checkpoint buffer. 4 values.
................................................................................
   497    499     pMerge->nSkip = (int)aInt[iIn++];
   498    500     for(i=0; i<nInput; i++){
   499    501       pMerge->aInput[i].iPg = (Pgno)aInt[iIn++];
   500    502       pMerge->aInput[i].iCell = (int)aInt[iIn++];
   501    503     }
   502    504     pMerge->splitkey.iPg = (Pgno)aInt[iIn++];
   503    505     pMerge->splitkey.iCell = (int)aInt[iIn++];
          506  +  pMerge->iCurrentPtr = (int)aInt[iIn++];
   504    507   
   505    508     /* Set *piIn and return LSM_OK. */
   506    509     *piIn = iIn;
   507    510     return LSM_OK;
   508    511   }
   509    512   
   510    513   

Changes to src/lsm_file.c.

   725    725     Page **ppPg                     /* OUT: New page handle */
   726    726   ){
   727    727     Page *p;
   728    728     int iHash;
   729    729     int rc = LSM_OK;
   730    730   
   731    731     assert( iPg>=fsFirstPageOnBlock(pFS, 1) );
          732  +  *ppPg = 0;
   732    733   
   733    734     if( pFS->bUseMmap ){
   734    735       i64 iEnd = (i64)iPg * pFS->nPagesize;
   735    736       fsGrowMapping(pFS, iEnd, &rc);
   736    737       if( rc!=LSM_OK ) return rc;
   737    738   
   738    739       if( pFS->pFree ){

Changes to src/lsm_main.c.

   475    475         break;
   476    476     }
   477    477   
   478    478     va_end(ap);
   479    479     return rc;
   480    480   }
   481    481   
   482         -/* 
   483         -** Write a new value into the database.
   484         -*/
   485         -int lsm_write(
   486         -  lsm_db *pDb,                    /* Database connection */
          482  +static int doWriteOp(
          483  +  lsm_db *pDb,
          484  +  int bDeleteRange,
   487    485     const void *pKey, int nKey,     /* Key to write or delete */
   488    486     const void *pVal, int nVal      /* Value to write. Or nVal==-1 for a delete */
   489    487   ){
   490    488     int rc = LSM_OK;                /* Return code */
   491    489     int bCommit = 0;                /* True to commit before returning */
   492    490   
   493    491     if( pDb->nTransOpen==0 ){
   494    492       bCommit = 1;
   495    493       rc = lsm_begin(pDb, 1);
   496    494     }
   497    495   
   498    496     if( rc==LSM_OK ){
   499         -    rc = lsmLogWrite(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
          497  +    if( bDeleteRange==0 ){
          498  +      rc = lsmLogWrite(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
          499  +    }else{
          500  +      /* TODO */
          501  +    }
   500    502     }
   501    503   
   502    504     lsmSortedSaveTreeCursors(pDb);
   503    505   
   504    506     if( rc==LSM_OK ){
   505    507       int pgsz = lsmFsPageSize(pDb->pFS);
   506    508       int nQuant = LSM_AUTOWORK_QUANT * pgsz;
................................................................................
   509    511       int nDiff;
   510    512   
   511    513       if( nQuant>pDb->nTreeLimit ){
   512    514         nQuant = pDb->nTreeLimit;
   513    515       }
   514    516   
   515    517       nBefore = lsmTreeSize(pDb);
   516         -    rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
          518  +    if( bDeleteRange ){
          519  +      rc = lsmTreeDelete(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
          520  +    }else{
          521  +      rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
          522  +    }
          523  +
   517    524       nAfter = lsmTreeSize(pDb);
   518    525       nDiff = (nAfter/nQuant) - (nBefore/nQuant);
   519    526       if( rc==LSM_OK && pDb->bAutowork && nDiff!=0 ){
   520    527         rc = lsmSortedAutoWork(pDb, nDiff * LSM_AUTOWORK_QUANT);
   521    528       }
   522    529     }
   523    530   
................................................................................
   529    536       }else{
   530    537         lsm_rollback(pDb, 0);
   531    538       }
   532    539     }
   533    540   
   534    541     return rc;
   535    542   }
          543  +
          544  +/* 
          545  +** Write a new value into the database.
          546  +*/
          547  +int lsm_write(
          548  +  lsm_db *db,                     /* Database connection */
          549  +  const void *pKey, int nKey,     /* Key to write or delete */
          550  +  const void *pVal, int nVal      /* Value to write. Or nVal==-1 for a delete */
          551  +){
          552  +  return doWriteOp(db, 0, pKey, nKey, pVal, nVal);
          553  +}
   536    554   
   537    555   /*
   538    556   ** Delete a value from the database. 
   539    557   */
   540         -int lsm_delete(lsm_db *pDb, const void *pKey, int nKey){
   541         -  return lsm_write(pDb, pKey, nKey, 0, -1);
          558  +int lsm_delete(lsm_db *db, const void *pKey, int nKey){
          559  +  return doWriteOp(db, 0, pKey, nKey, 0, -1);
          560  +}
          561  +
          562  +/*
          563  +** Delete a range of database keys.
          564  +*/
          565  +int lsm_delete_range(
          566  +  lsm_db *db,                     /* Database handle */
          567  +  const void *pKey1, int nKey1,   /* Lower bound of range to delete */
          568  +  const void *pKey2, int nKey2    /* Upper bound of range to delete */
          569  +){
          570  +  int rc = LSM_OK;
          571  +  if( db->xCmp((void *)pKey1, nKey1, (void *)pKey2, nKey2)<0 ){
          572  +    rc = doWriteOp(db, 1, pKey1, nKey1, pKey2, nKey2);
          573  +  }
          574  +  return rc;
   542    575   }
   543    576   
   544    577   /*
   545    578   ** Open a new cursor handle. 
   546    579   **
   547    580   ** If there are currently no other open cursor handles, and no open write
   548    581   ** transaction, open a read transaction here.

Changes to src/lsm_shared.c.

   631    631   /*
   632    632   ** Argument bFlush is true if the contents of the in-memory tree has just
   633    633   ** been flushed to disk. The significance of this is that once the snapshot
   634    634   ** created to hold the updated state of the database is synced to disk, log
   635    635   ** file space can be recycled.
   636    636   */
   637    637   void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){
   638         -  /* If no error has occurred, serialize the worker snapshot and write
   639         -  ** it to shared memory.  */
   640         -
   641         -  assert( pDb->pWorker );
   642         -  assert( pDb->pWorker->nFreelistOvfl==0 || nOvfl==0 );
   643         -  if( *pRc==LSM_OK ){
   644         -    *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl);
   645         -  }
   646         -
          638  +  assert( *pRc!=0 || pDb->pWorker );
   647    639     if( pDb->pWorker ){
          640  +    /* If no error has occurred, serialize the worker snapshot and write
          641  +    ** it to shared memory.  */
          642  +    assert( pDb->pWorker->nFreelistOvfl==0 || nOvfl==0 );
          643  +    if( *pRc==LSM_OK ){
          644  +      *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl);
          645  +    }
   648    646       lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
   649    647       pDb->pWorker = 0;
   650    648     }
   651    649   
   652    650     lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
   653    651   }
   654    652   

Changes to src/lsm_sorted.c.

    38     38   **   (N==0). And on most pages the first record that starts on the page will
    39     39   **   not start at byte offset 0. For example:
    40     40   **
    41     41   **      aaaaa bbbbb ccc <footer>    cc eeeee fffff g <footer>    gggg....
    42     42   **
    43     43   ** RECORD FORMAT:
    44     44   ** 
    45         -**   Each record in a sorted file is either a WRITE, a DELETE, or a 
    46         -**   SEPARATOR. 
           45  +**   The first byte of the record is a flags byte. It is a combination
           46  +**   of the following flags (defined in lsmInt.h):
    47     47   **
    48         -**   The first byte of the record indicates the type, as follows:
           48  +**       LSM_START_DELETE
           49  +**       LSM_END_DELETE 
           50  +**       LSM_POINT_DELETE
           51  +**       LSM_INSERT    
           52  +**       LSM_SEPARATOR
           53  +**       LSM_SYSTEMKEY
    49     54   **
    50         -**     SORTED_SEPARATOR:  0x01
    51         -**     SORTED_WRITE:      0x02
    52         -**     SORTED_DELETE:     0x03
    53         -**
    54         -**   If the sorted file contains pointers, then immediately following the
    55         -**   type byte is a pointer to the smallest key in the next file that is larger
    56         -**   than the key in the current record. The pointer is encoded as a varint.
    57         -**   When added to the 32-bit page number stored in the footer, it is the 
    58         -**   page number of the page that contains the smallest key in the next sorted 
    59         -**   file that is larger than this key. 
           55  +**   Immediately following the type byte is a pointer to the smallest key 
           56  +**   in the next file that is larger than the key in the current record. The 
           57  +**   pointer is encoded as a varint. When added to the 32-bit page number 
           58  +**   stored in the footer, it is the page number of the page that contains the
           59  +**   smallest key in the next sorted file that is larger than this key. 
    60     60   **
    61     61   **   Next is the number of bytes in the key, encoded as a varint.
    62     62   **
    63         -**   If the record is a SORTED_WRITE, the number of bytes in the value, as
           63  +**   If the LSM_INSERT flag is set, the number of bytes in the value, as
    64     64   **   a varint, is next.
    65     65   **
    66         -**   Finally, the blob of data containing the key, and for SORTED_WRITE
           66  +**   Finally, the blob of data containing the key, and for LSM_INSERT
    67     67   **   records, the value as well.
    68         -**
    69         -** SEPARATOR ARRAYS:
    70         -**
    71         -**   Each time a sorted run that spans more than one page is constructed, a
    72         -**   separators array is also constructed. A separators array contains normal
    73         -**   pages and b-tree pages. Both types of pages use the same format (as 
    74         -**   above).
    75         -**
    76         -**   The normal pages in the separators array contain a SORTED_SEPARATOR 
    77         -**   record with a copy of the first key on each page of the main array
    78         -**   except the leftmost. If a main array page contains no keys, then there
    79         -**   is no corresponding entry in the separators array.
    80     68   */
    81     69   
    82     70   #ifndef _LSM_INT_H
    83     71   # include "lsmInt.h"
    84     72   #endif
    85     73   #include "sqlite4.h"            /* only for sqlite4_snprintf() */
    86         -/* 
    87         -** Record types for user data.
    88         -*/
    89         -#define SORTED_SEPARATOR 0x01
    90         -#define SORTED_WRITE     0x02
    91         -#define SORTED_DELETE    0x03
    92         -
    93         -#define SORTED_SYSTEM_DATA 0x10
    94         -
    95         -/* 
    96         -** Record types for system data (e.g. free-block list, secondary storage
    97         -** management entries). These are the same as the user types with the
    98         -** most-significant bit set.
    99         -*/
   100         -#define SORTED_SYSTEM_SEPARATOR (SORTED_SYSTEM_DATA | SORTED_SEPARATOR)
   101         -#define SORTED_SYSTEM_WRITE     (SORTED_SYSTEM_DATA | SORTED_WRITE)
   102         -#define SORTED_SYSTEM_DELETE    (SORTED_SYSTEM_DATA | SORTED_DELETE)
   103     74   
   104     75   /*
   105     76   ** Macros to help decode record types.
   106     77   */
   107         -#define rtTopic(eType)       ((eType) & 0xF0)
   108         -#define rtIsDelete(eType)    (((eType) & 0x0F)==SORTED_DELETE)
   109         -#define rtIsSeparator(eType) (((eType) & 0x0F)==SORTED_SEPARATOR)
   110         -#define rtIsWrite(eType)     (((eType) & 0x0F)==SORTED_WRITE)
           78  +#define rtTopic(eType)       ((eType) & LSM_SYSTEMKEY)
           79  +#define rtIsDelete(eType)    (((eType) & 0x0F)==LSM_POINT_DELETE)
           80  +
           81  +#define rtIsSeparator(eType) (((eType) & LSM_SEPARATOR)!=0)
           82  +#define rtIsWrite(eType)     (((eType) & LSM_INSERT)!=0)
           83  +#define rtIsSystem(eType)    (((eType) & LSM_SYSTEMKEY)!=0)
   111     84   
   112     85   /*
   113     86   ** The following macros are used to access a page footer.
   114     87   */
   115     88   #define SEGMENT_NRECORD_OFFSET(pgsz)        ((pgsz) - 2)
   116     89   #define SEGMENT_FLAGS_OFFSET(pgsz)          ((pgsz) - 2 - 2)
   117     90   #define SEGMENT_POINTER_OFFSET(pgsz)        ((pgsz) - 2 - 2 - 4)
................................................................................
   230    203     /* Comparison results */
   231    204     int nTree;                      /* Size of aTree[] array */
   232    205     int *aTree;                     /* Array of comparison results */
   233    206   
   234    207     /* Used by cursors flushing the in-memory tree only */
   235    208     int *pnOvfl;                    /* Number of free-list entries to store */
   236    209     void *pSystemVal;               /* Pointer to buffer to free */
          210  +
          211  +  /* Used by worker cursors only */
          212  +  Pgno *pPrevMergePtr;
   237    213   };
   238    214   
   239    215   #define CURSOR_DATA_TREE0     0   /* Current tree cursor */
   240    216   #define CURSOR_DATA_TREE1     1   /* The "old" tree, if any */
   241    217   #define CURSOR_DATA_SYSTEM    2
   242    218   #define CURSOR_DATA_SEGMENT   3
   243    219   
................................................................................
   264    240   **
   265    241   ** CURSOR_PREV_OK
   266    242   **   Set if it is Ok to call lsm_csr_prev().
   267    243   **
   268    244   ** CURSOR_READ_SEPARATORS
   269    245   **   Set if this cursor should visit the separator keys in segment 
   270    246   **   aPtr[nPtr-1].
          247  +**
          248  +** CURSOR_SEEK_EQ
          249  +**   Cursor has undergone a successful lsm_csr_seek(LSM_SEEK_EQ) operation.
          250  +**   The key and value are stored in MultiCursor.key and MultiCursor.val
          251  +**   respectively.
   271    252   */
   272    253   #define CURSOR_IGNORE_DELETE    0x00000001
   273    254   #define CURSOR_NEW_SYSTEM       0x00000002
   274    255   #define CURSOR_AT_FREELIST      0x00000004
   275    256   #define CURSOR_IGNORE_SYSTEM    0x00000010
   276    257   #define CURSOR_NEXT_OK          0x00000020
   277    258   #define CURSOR_PREV_OK          0x00000040
   278    259   #define CURSOR_READ_SEPARATORS  0x00000080
          260  +#define CURSOR_SEEK_EQ          0x00000100
   279    261   
   280    262   typedef struct MergeWorker MergeWorker;
   281    263   typedef struct Hierarchy Hierarchy;
   282    264   
   283    265   struct Hierarchy {
   284    266     Page **apHier;
   285    267     int nHier;
................................................................................
   610    592       }
   611    593       if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT;
   612    594   
   613    595       rc = pageGetBtreeKey(
   614    596           pCsr->aPg[iPg].pPage, iCell,
   615    597           &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
   616    598       );
   617         -    pCsr->eType |= SORTED_SEPARATOR;
          599  +    pCsr->eType |= LSM_SEPARATOR;
   618    600     }
   619    601   
   620    602     return rc;
   621    603   }
   622    604   
   623    605   static int btreeCursorPtr(u8 *aData, int nData, int iCell){
   624    606     int nCell;
................................................................................
   918    900             if( pCsr->aPg[i].iCell>0 ) break;
   919    901           }
   920    902           assert( i>=0 );
   921    903           rc = pageGetBtreeKey(
   922    904               pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1,
   923    905               &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
   924    906           );
   925         -        pCsr->eType |= SORTED_SEPARATOR;
          907  +        pCsr->eType |= LSM_SEPARATOR;
   926    908   
   927    909         }else{
   928    910           rc = btreeCursorLoadKey(pCsr);
   929    911         }
   930    912       }
   931    913     }
   932    914     return rc;
................................................................................
  1108   1090   
  1109   1091   static int segmentPtrAdvance(
  1110   1092     MultiCursor *pCsr, 
  1111   1093     SegmentPtr *pPtr,
  1112   1094     int bReverse
  1113   1095   ){
  1114   1096     int eDir = (bReverse ? -1 : 1);
         1097  +  Level *pLvl = pPtr->pLevel;
  1115   1098     do {
  1116   1099       int rc;
  1117   1100       int iCell;                    /* Number of new cell in page */
         1101  +    int svFlags = 0;              /* SegmentPtr.eType before advance */
  1118   1102   
  1119   1103       iCell = pPtr->iCell + eDir;
  1120   1104       assert( pPtr->pPg );
  1121   1105       assert( iCell<=pPtr->nCell && iCell>=-1 );
         1106  +
         1107  +    if( bReverse && pPtr->pSeg!=&pPtr->pLevel->lhs ){
         1108  +      svFlags = pPtr->eType;
         1109  +      assert( svFlags );
         1110  +    }
  1122   1111   
  1123   1112       if( iCell>=pPtr->nCell || iCell<0 ){
  1124   1113         do {
  1125   1114           rc = segmentPtrNextPage(pPtr, eDir); 
  1126   1115         }while( rc==LSM_OK 
  1127   1116              && pPtr->pPg 
  1128   1117              && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG) ) 
  1129   1118         );
  1130   1119         if( rc!=LSM_OK ) return rc;
  1131   1120         iCell = bReverse ? (pPtr->nCell-1) : 0;
  1132   1121       }
  1133   1122       rc = segmentPtrLoadCell(pPtr, iCell);
  1134   1123       if( rc!=LSM_OK ) return rc;
         1124  +
         1125  +    if( svFlags && pPtr->pPg ){
         1126  +      int res = sortedKeyCompare(pCsr->pDb->xCmp,
         1127  +          rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
         1128  +          pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
         1129  +      );
         1130  +      if( res<0 ) segmentPtrReset(pPtr);
         1131  +    }
         1132  +    if( pPtr->pPg==0 && (svFlags & LSM_END_DELETE) ){
         1133  +      rc = lsmFsDbPageGet(pCsr->pDb->pFS, pPtr->pSeg->iFirst, &pPtr->pPg);
         1134  +      if( rc!=LSM_OK ) return rc;
         1135  +      pPtr->eType = LSM_START_DELETE | (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0);
         1136  +      pPtr->pKey = pLvl->pSplitKey;
         1137  +      pPtr->nKey = pLvl->nSplitKey;
         1138  +    }
         1139  +
  1135   1140     }while( pCsr 
  1136   1141          && pPtr->pPg 
  1137   1142          && segmentPtrIgnoreSeparators(pCsr, pPtr)
  1138   1143          && rtIsSeparator(pPtr->eType)
  1139   1144     );
  1140   1145   
  1141   1146     return LSM_OK;
................................................................................
  1161   1166   ** points at either the first (bLast==0) or last (bLast==1) cell in the valid
  1162   1167   ** region of the segment defined by pPtr->iFirst and pPtr->iLast.
  1163   1168   **
  1164   1169   ** Return LSM_OK if successful or an lsm error code if something goes
  1165   1170   ** wrong (IO error, OOM etc.).
  1166   1171   */
  1167   1172   static int segmentPtrEnd(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){
         1173  +  Level *pLvl = pPtr->pLevel;
  1168   1174     int rc = LSM_OK;
  1169   1175     FileSystem *pFS = pCsr->pDb->pFS;
  1170   1176     int bIgnore;
  1171   1177   
  1172   1178     segmentPtrEndPage(pFS, pPtr, bLast, &rc);
  1173   1179     while( rc==LSM_OK && pPtr->pPg 
  1174   1180         && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
  1175   1181     ){
  1176   1182       rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1));
  1177   1183     }
  1178         -  
         1184  +
  1179   1185     if( rc==LSM_OK && pPtr->pPg ){
  1180   1186       rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0);
  1181   1187     }
  1182         -
         1188  +  
  1183   1189     bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr);
  1184   1190     if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){
  1185   1191       rc = segmentPtrAdvance(pCsr, pPtr, bLast);
  1186   1192     }
  1187   1193   
         1194  +
         1195  +  if( bLast && rc==LSM_OK && pPtr->pPg
         1196  +   && pPtr->pSeg==&pLvl->lhs 
         1197  +   && pLvl->nRight && (pPtr->eType & LSM_START_DELETE)
         1198  +  ){
         1199  +    pPtr->iCell++;
         1200  +    pPtr->eType = LSM_END_DELETE | (pLvl->iSplitTopic);
         1201  +    pPtr->pKey = pLvl->pSplitKey;
         1202  +    pPtr->nKey = pLvl->nSplitKey;
         1203  +    pPtr->pVal = 0;
         1204  +    pPtr->nVal = 0;
         1205  +  }
         1206  +
  1188   1207     return rc;
  1189   1208   }
  1190   1209   
  1191   1210   static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){
  1192   1211     assert( pPtr->pPg );
  1193   1212     *ppKey = pPtr->pKey;
  1194   1213     *pnKey = pPtr->nKey;
................................................................................
  1291   1310       if( eSeek==LSM_SEEK_GE ) return (res<=0);
  1292   1311     }
  1293   1312   
  1294   1313     return 1;
  1295   1314   }
  1296   1315   #endif
  1297   1316   
  1298         -int segmentPtrSeek(
         1317  +static int segmentPtrSearchOversized(
  1299   1318     MultiCursor *pCsr,              /* Cursor context */
  1300   1319     SegmentPtr *pPtr,               /* Pointer to seek */
  1301         -  void *pKey, int nKey,           /* Key to seek to */
  1302         -  int eSeek,                      /* Search bias - see above */
  1303         -  int *piPtr                      /* OUT: FC pointer */
         1320  +  void *pKey, int nKey            /* Key to seek to */
  1304   1321   ){
  1305   1322     int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  1306         -  int res;                        /* Result of comparison operation */
  1307   1323     int rc = LSM_OK;
  1308         -  int iMin;
  1309         -  int iMax;
  1310         -  int iPtrOut = 0;
  1311         -
  1312         -  const int iTopic = 0;
  1313   1324   
  1314   1325     /* If the OVERSIZED flag is set, then there is no pointer in the
  1315   1326     ** upper level to the next page in the segment that contains at least
  1316   1327     ** one key. So compare the largest key on the current page with the
  1317   1328     ** key being sought (pKey/nKey). If (pKey/nKey) is larger, advance
  1318   1329     ** to the next page in the segment that contains at least one key. 
  1319   1330     */
................................................................................
  1329   1340           pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1
  1330   1341       );
  1331   1342   
  1332   1343       /* If the loaded key is >= than (pKey/nKey), break out of the loop.
  1333   1344       ** If (pKey/nKey) is present in this array, it must be on the current 
  1334   1345       ** page.  */
  1335   1346       res = sortedKeyCompare(
  1336         -        xCmp, iLastTopic, pLastKey, nLastKey, iTopic, pKey, nKey
         1347  +        xCmp, iLastTopic, pLastKey, nLastKey, 0, pKey, nKey
  1337   1348       );
  1338   1349       if( res>=0 ) break;
  1339   1350   
  1340   1351       /* Advance to the next page that contains at least one key. */
  1341   1352       pNext = pPtr->pPg;
  1342   1353       lsmFsPageRef(pNext);
  1343   1354       while( 1 ){
................................................................................
  1360   1371       if( pNext==0 ) break;
  1361   1372       segmentPtrSetPage(pPtr, pNext);
  1362   1373   
  1363   1374       /* This should probably be an LSM_CORRUPT error. */
  1364   1375       assert( rc!=LSM_OK || (pPtr->flags & PGFTR_SKIP_THIS_FLAG) );
  1365   1376     }
  1366   1377   
         1378  +  return rc;
         1379  +}
         1380  +
         1381  +static int ptrFwdPointer(
         1382  +  Page *pPage,
         1383  +  int iCell,
         1384  +  Segment *pSeg,
         1385  +  Pgno *piPtr,
         1386  +  int *pbFound
         1387  +){
         1388  +  Page *pPg = pPage;
         1389  +  int iFirst = iCell;
         1390  +  int rc = LSM_OK;
         1391  +
         1392  +  do {
         1393  +    Page *pNext = 0;
         1394  +    u8 *aData;
         1395  +    int nData;
         1396  +
         1397  +    aData = lsmFsPageData(pPg, &nData);
         1398  +    if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0 ){
         1399  +      int i;
         1400  +      int nCell = pageGetNRec(aData, nData);
         1401  +      for(i=iFirst; i<nCell; i++){
         1402  +        u8 eType = *pageGetCell(aData, nData, i);
         1403  +        if( (eType & LSM_START_DELETE)==0 ){
         1404  +          *pbFound = 1;
         1405  +          *piPtr = pageGetRecordPtr(aData, nData, i) + pageGetPtr(aData, nData);
         1406  +          lsmFsPageRelease(pPg);
         1407  +          return LSM_OK;
         1408  +        }
         1409  +      }
         1410  +    }
         1411  +
         1412  +    rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
         1413  +    lsmFsPageRelease(pPg);
         1414  +    pPg = pNext;
         1415  +    iFirst = 0;
         1416  +  }while( pPg && rc==LSM_OK );
         1417  +  lsmFsPageRelease(pPg);
         1418  +
         1419  +  *pbFound = 0;
         1420  +  return rc;
         1421  +}
         1422  +
         1423  +static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){
         1424  +  int rc;
         1425  +  rc = segmentPtrEnd(pCsr, pPtr, 0);
         1426  +  while( pPtr->pPg && rc==LSM_OK ){
         1427  +    int res = sortedKeyCompare(pCsr->pDb->xCmp,
         1428  +        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey,
         1429  +        rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
         1430  +    );
         1431  +    if( res<=0 ) break;
         1432  +    rc = segmentPtrAdvance(pCsr, pPtr, 0);
         1433  +  }
         1434  +  return rc;
         1435  +}
         1436  +
         1437  +
         1438  +/*
         1439  +** This function is called as part of a SEEK_GE op on a multi-cursor if the 
         1440  +** FC pointer read from segment *pPtr comes from an entry with the 
         1441  +** LSM_START_DELETE flag set. In this case the pointer value cannot be 
         1442  +** trusted. Instead, the pointer that should be followed is that associated
         1443  +** with the next entry in *pPtr that does not have LSM_START_DELETE set.
         1444  +**
         1445  +** Why the pointers can't be trusted:
         1446  +**
         1447  +**
         1448  +**
         1449  +** TODO: This is a stop-gap solution:
         1450  +** 
         1451  +**   At the moment, this function is called from within segmentPtrSeek(), 
         1452  +**   as part of the initial lsmMCursorSeek() call. However, consider a 
         1453  +**   database where the following has occurred:
         1454  +**
         1455  +**      1. A range delete removes keys 1..9999 using a range delete.
         1456  +**      2. Keys 1 through 9999 are reinserted.
         1457  +**      3. The levels containing the ops in 1. and 2. above are merged. Call
         1458  +**         this level N. Level N contains FC pointers to level N+1.
         1459  +**
         1460  +**   Then, if the user attempts to query for (key>=2 LIMIT 10), the 
         1461  +**   lsmMCursorSeek() call will iterate through 9998 entries searching for a 
         1462  +**   pointer down to the level N+1 that is never actually used. It would be
         1463  +**   much better if the multi-cursor could do this lazily - only seek to the
         1464  +**   level (N+1) page after the user has moved the cursor on level N passed
         1465  +**   the big range-delete.
         1466  +*/
         1467  +static int segmentPtrFwdPointer(
         1468  +  MultiCursor *pCsr,              /* Multi-cursor pPtr belongs to */
         1469  +  SegmentPtr *pPtr,               /* Segment-pointer to extract FC ptr from */
         1470  +  Pgno *piPtr                     /* OUT: FC pointer value */
         1471  +){
         1472  +  Level *pLvl = pPtr->pLevel;
         1473  +  Level *pNext = pLvl->pNext;
         1474  +  Page *pPg = pPtr->pPg;
         1475  +  int rc;
         1476  +  int bFound;
         1477  +  Pgno iOut = 0;
         1478  +
         1479  +  if( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[pLvl->nRight-1] ){
         1480  +    if( pNext==0 
         1481  +        || (pNext->nRight==0 && pNext->lhs.iRoot)
         1482  +        || (pNext->nRight!=0 && pNext->aRhs[0].iRoot)
         1483  +      ){
         1484  +      /* Do nothing. The pointer will not be used anyway. */
         1485  +      return LSM_OK;
         1486  +    }
         1487  +  }else{
         1488  +    if( pPtr[1].pSeg->iRoot ){
         1489  +      return LSM_OK;
         1490  +    }
         1491  +  }
         1492  +
         1493  +  /* Search for a pointer within the current segment. */
         1494  +  lsmFsPageRef(pPg);
         1495  +  rc = ptrFwdPointer(pPg, pPtr->iCell, pPtr->pSeg, &iOut, &bFound);
         1496  +
         1497  +  if( rc==LSM_OK && bFound==0 ){
         1498  +    /* This case happens when pPtr points to the left-hand-side of a segment
         1499  +    ** currently undergoing an incremental merge. In this case, jump to the
         1500  +    ** oldest segment in the right-hand-side of the same level and continue
         1501  +    ** searching. But - do not consider any keys smaller than the levels
         1502  +    ** split-key. */
         1503  +    SegmentPtr ptr;
         1504  +
         1505  +    if( pPtr->pLevel->nRight==0 || pPtr->pSeg!=&pPtr->pLevel->lhs ){
         1506  +      return LSM_CORRUPT_BKPT;
         1507  +    }
         1508  +
         1509  +    memset(&ptr, 0, sizeof(SegmentPtr));
         1510  +    ptr.pLevel = pPtr->pLevel;
         1511  +    ptr.pSeg = &ptr.pLevel->aRhs[ptr.pLevel->nRight-1];
         1512  +    rc = sortedRhsFirst(pCsr, ptr.pLevel, &ptr);
         1513  +    if( rc==LSM_OK ){
         1514  +      rc = ptrFwdPointer(ptr.pPg, ptr.iCell, ptr.pSeg, &iOut, &bFound);
         1515  +      ptr.pPg = 0;
         1516  +    }
         1517  +    segmentPtrReset(&ptr);
         1518  +  }
         1519  +
         1520  +  *piPtr = iOut;
         1521  +  return rc;
         1522  +}
         1523  +
         1524  +static int segmentPtrSeek(
         1525  +  MultiCursor *pCsr,              /* Cursor context */
         1526  +  SegmentPtr *pPtr,               /* Pointer to seek */
         1527  +  void *pKey, int nKey,           /* Key to seek to */
         1528  +  int eSeek,                      /* Search bias - see above */
         1529  +  int *piPtr,                     /* OUT: FC pointer */
         1530  +  int *pbStop
         1531  +){
         1532  +  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
         1533  +  int res;                        /* Result of comparison operation */
         1534  +  int rc = LSM_OK;
         1535  +  int iMin;
         1536  +  int iMax;
         1537  +  int iPtrOut = 0;
         1538  +  const int iTopic = 0;
         1539  +
         1540  +  /* If the current page contains an oversized entry, then there are no
         1541  +  ** pointers to one or more of the subsequent pages in the sorted run.
         1542  +  ** The following call ensures that the segment-ptr points to the correct 
         1543  +  ** page in this case.  */
         1544  +  rc = segmentPtrSearchOversized(pCsr, pPtr, pKey, nKey);
  1367   1545     iPtrOut = pPtr->iPtr;
  1368   1546   
  1369   1547     /* Assert that this page is the right page of this segment for the key
  1370   1548     ** that we are searching for. Do this by loading page (iPg-1) and testing
  1371   1549     ** that pKey/nKey is greater than all keys on that page, and then by 
  1372   1550     ** loading (iPg+1) and testing that pKey/nKey is smaller than all
  1373         -  ** the keys it houses.  */
         1551  +  ** the keys it houses.  
         1552  +  **
         1553  +  ** TODO: With range-deletes in the tree, the test described above may fail.
         1554  +  */
  1374   1555   #if 0
  1375   1556     assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) );
  1376   1557   #endif
  1377   1558   
  1378   1559     assert( pPtr->nCell>0 
  1379   1560          || pPtr->pSeg->nSize==1 
  1380   1561          || lsmFsPageNumber(pPtr->pPg)==pPtr->pSeg->iLast
................................................................................
  1413   1594       }
  1414   1595   
  1415   1596       if( rc==LSM_OK ){
  1416   1597         assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) );
  1417   1598         if( res ){
  1418   1599           rc = segmentPtrLoadCell(pPtr, iMin);
  1419   1600         }
         1601  +      assert( rc!=LSM_OK || res>0 || iPtrOut==(pPtr->iPtr + pPtr->iPgPtr) );
  1420   1602   
  1421   1603         if( rc==LSM_OK ){
  1422   1604           switch( eSeek ){
  1423         -          case LSM_SEEK_EQ:
  1424         -            if( res!=0 || rtIsSeparator(pPtr->eType) ) segmentPtrReset(pPtr);
         1605  +          case LSM_SEEK_EQ: {
         1606  +            int eType = pPtr->eType;
         1607  +            if( (res<0 && (eType & LSM_START_DELETE))
         1608  +             || (res>0 && (eType & LSM_END_DELETE))
         1609  +             || (res==0 && (eType & LSM_POINT_DELETE))
         1610  +            ){
         1611  +              *pbStop = 1;
         1612  +            }else if( res==0 && (eType & LSM_INSERT) ){
         1613  +              lsm_env *pEnv = pCsr->pDb->pEnv;
         1614  +              *pbStop = 1;
         1615  +              pCsr->eType = pPtr->eType;
         1616  +              rc = sortedBlobSet(pEnv, &pCsr->key, pPtr->pKey, pPtr->nKey);
         1617  +              if( rc==LSM_OK ){
         1618  +                rc = sortedBlobSet(pEnv, &pCsr->val, pPtr->pVal, pPtr->nVal);
         1619  +              }
         1620  +              pCsr->flags |= CURSOR_SEEK_EQ;
         1621  +            }
         1622  +            segmentPtrReset(pPtr);
  1425   1623               break;
         1624  +          }
  1426   1625             case LSM_SEEK_LE:
  1427   1626               if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1);
  1428   1627               break;
  1429         -          case LSM_SEEK_GE:
  1430         -            if( res<0 ) rc = segmentPtrAdvance(pCsr, pPtr, 0);
         1628  +          case LSM_SEEK_GE: {
         1629  +            /* Figure out if we need to 'skip' the pointer forward or not */
         1630  +            if( (res<=0 && (pPtr->eType & LSM_START_DELETE)) 
         1631  +             || (res>0  && (pPtr->eType & LSM_END_DELETE)) 
         1632  +            ){
         1633  +              rc = segmentPtrFwdPointer(pCsr, pPtr, &iPtrOut);
         1634  +            }
         1635  +            if( res<0 && rc==LSM_OK ){
         1636  +              rc = segmentPtrAdvance(pCsr, pPtr, 0);
         1637  +            }
  1431   1638               break;
         1639  +          }
  1432   1640           }
  1433   1641         }
  1434   1642       }
  1435   1643   
  1436   1644       /* If the cursor seek has found a separator key, and this cursor is
  1437   1645       ** supposed to ignore separators keys, advance to the next entry.  */
  1438         -    if( rc==LSM_OK && pPtr->pPg 
         1646  +    if( rc==LSM_OK && pPtr->pPg
  1439   1647        && segmentPtrIgnoreSeparators(pCsr, pPtr) 
  1440   1648        && rtIsSeparator(pPtr->eType)
  1441   1649       ){
         1650  +      assert( eSeek!=LSM_SEEK_EQ );
  1442   1651         rc = segmentPtrAdvance(pCsr, pPtr, eSeek==LSM_SEEK_LE);
  1443   1652       }
  1444   1653     }
  1445   1654   
  1446   1655     assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
  1447   1656     *piPtr = iPtrOut;
  1448   1657     return rc;
................................................................................
  1531   1740   
  1532   1741   static int seekInSegment(
  1533   1742     MultiCursor *pCsr, 
  1534   1743     SegmentPtr *pPtr,
  1535   1744     void *pKey, int nKey,
  1536   1745     int iPg,                        /* Page to search */
  1537   1746     int eSeek,                      /* Search bias - see above */
  1538         -  int *piPtr                      /* OUT: FC pointer */
         1747  +  int *piPtr,                     /* OUT: FC pointer */
         1748  +  int *pbStop                     /* OUT: Stop search flag */
  1539   1749   ){
  1540   1750     int iPtr = iPg;
  1541   1751     int rc = LSM_OK;
  1542   1752   
  1543   1753     if( pPtr->pSeg->iRoot ){
  1544   1754       Page *pPg;
  1545   1755       assert( pPtr->pSeg->iRoot!=0 );
................................................................................
  1551   1761       }
  1552   1762       if( rc==LSM_OK ){
  1553   1763         rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iPtr);
  1554   1764       }
  1555   1765     }
  1556   1766   
  1557   1767     if( rc==LSM_OK ){
  1558         -    rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, eSeek, piPtr);
         1768  +    rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, eSeek, piPtr, pbStop);
  1559   1769     }
  1560   1770     return rc;
  1561   1771   }
  1562   1772   
  1563   1773   /*
  1564   1774   ** Seek each segment pointer in the array of (pLvl->nRight+1) at aPtr[].
         1775  +**
         1776  +** pbStop:
         1777  +**   This parameter is only significant if parameter eSeek is set to
         1778  +**   LSM_SEEK_EQ. In this case, it is set to true before returning if
         1779  +**   the seek operation is finished. This can happen in two ways:
         1780  +**   
         1781  +**     a) A key matching (pKey/nKey) is found, or
         1782  +**     b) A point-delete or range-delete deleting the key is found.
         1783  +**
         1784  +**   In case (a), the multi-cursor CURSOR_SEEK_EQ flag is set and the pCsr->key
         1785  +**   and pCsr->val blobs populated before returning.
  1565   1786   */
  1566   1787   static int seekInLevel(
  1567   1788     MultiCursor *pCsr,              /* Sorted cursor object to seek */
  1568         -  Level *pLvl,                    /* Level to seek within */
  1569         -  SegmentPtr *aPtr,               /* Pointer to array of (nRight+1) SPs */
         1789  +  SegmentPtr *aPtr,               /* Pointer to array of (nRhs+1) SPs */
  1570   1790     int eSeek,                      /* Search bias - see above */
  1571   1791     void *pKey, int nKey,           /* Key to search for */
  1572         -  Pgno *piPgno                    /* IN/OUT: fraction cascade pointer (or 0) */
         1792  +  Pgno *piPgno,                   /* IN/OUT: fraction cascade pointer (or 0) */
         1793  +  int *pbStop                     /* OUT: See above */
  1573   1794   ){
         1795  +  Level *pLvl = aPtr[0].pLevel;   /* Level to seek within */
  1574   1796     int rc = LSM_OK;                /* Return code */
  1575   1797     int iOut = 0;                   /* Pointer to return to caller */
  1576   1798     int res = -1;                   /* Result of xCmp(pKey, split) */
  1577   1799     int nRhs = pLvl->nRight;        /* Number of right-hand-side segments */
         1800  +  int bStop = 0;
  1578   1801   
  1579   1802     /* If this is a composite level (one currently undergoing an incremental
  1580   1803     ** merge), figure out if the search key is larger or smaller than the
  1581   1804     ** levels split-key.  */
  1582   1805     if( nRhs ){
  1583   1806       res = sortedKeyCompare(pCsr->pDb->xCmp, 0, pKey, nKey, 
  1584   1807           pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
................................................................................
  1588   1811     /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
  1589   1812     ** is not a composite level and there is no split-key). Search the 
  1590   1813     ** left-hand-side of the level in this case.  */
  1591   1814     if( res<0 ){
  1592   1815       int iPtr = 0;
  1593   1816       if( nRhs==0 ) iPtr = *piPgno;
  1594   1817   
  1595         -    rc = seekInSegment(pCsr, &aPtr[0], pKey, nKey, iPtr, eSeek, &iOut);
         1818  +    rc = seekInSegment(pCsr, &aPtr[0], pKey, nKey, iPtr, eSeek, &iOut, &bStop);
  1596   1819       if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
  1597   1820         res = 0;
  1598   1821       }
  1599   1822     }
  1600   1823     
  1601   1824     if( res>=0 ){
  1602   1825       int iPtr = *piPgno;
  1603   1826       int i;
  1604         -    for(i=1; rc==LSM_OK && i<=nRhs; i++){
         1827  +    for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
  1605   1828         iOut = 0;
  1606         -      rc = seekInSegment(pCsr, &aPtr[i], pKey, nKey, iPtr, eSeek, &iOut);
         1829  +      rc = seekInSegment(pCsr, &aPtr[i], pKey, nKey, iPtr, eSeek, &iOut,&bStop);
  1607   1830         iPtr = iOut;
  1608   1831       }
  1609   1832   
  1610   1833       if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){
  1611   1834         rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
  1612   1835       }
  1613   1836     }
  1614   1837   
         1838  +  assert( eSeek==LSM_SEEK_EQ || bStop==0 );
  1615   1839     *piPgno = iOut;
         1840  +  *pbStop = bStop;
  1616   1841     return rc;
  1617   1842   }
  1618   1843   
  1619   1844   static void multiCursorGetKey(
  1620   1845     MultiCursor *pCsr, 
  1621   1846     int iKey,
  1622   1847     int *peType,                    /* OUT: Key type (SORTED_WRITE etc.) */
................................................................................
  1631   1856       case CURSOR_DATA_TREE0:
  1632   1857       case CURSOR_DATA_TREE1: {
  1633   1858         TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
  1634   1859         if( lsmTreeCursorValid(pTreeCsr) ){
  1635   1860           int nVal;
  1636   1861           void *pVal;
  1637   1862   
  1638         -        lsmTreeCursorKey(pTreeCsr, &pKey, &nKey);
         1863  +        lsmTreeCursorKey(pTreeCsr, &eType, &pKey, &nKey);
  1639   1864           lsmTreeCursorValue(pTreeCsr, &pVal, &nVal);
  1640         -        eType = (nVal<0) ? SORTED_DELETE : SORTED_WRITE;
  1641   1865         }
  1642   1866         break;
  1643   1867       }
  1644   1868   
  1645   1869       case CURSOR_DATA_SYSTEM:
  1646   1870         if( pCsr->flags & CURSOR_AT_FREELIST ){
  1647   1871           pKey = (void *)"FREELIST";
  1648   1872           nKey = 8;
  1649         -        eType = SORTED_SYSTEM_WRITE;
         1873  +        eType = LSM_SYSTEMKEY | LSM_INSERT;
  1650   1874         }
  1651   1875         break;
  1652   1876   
  1653   1877       default: {
  1654   1878         int iPtr = iKey - CURSOR_DATA_SEGMENT;
  1655   1879         assert( iPtr>=0 );
  1656   1880         if( iPtr==pCsr->nPtr ){
................................................................................
  1672   1896     }
  1673   1897   
  1674   1898     if( peType ) *peType = eType;
  1675   1899     if( pnKey ) *pnKey = nKey;
  1676   1900     if( ppKey ) *ppKey = pKey;
  1677   1901   }
  1678   1902   
         1903  +static int sortedDbKeyCompare(
         1904  +  int (*xCmp)(void *, int, void *, int),
         1905  +  int iLhsFlags, void *pLhsKey, int nLhsKey,
         1906  +  int iRhsFlags, void *pRhsKey, int nRhsKey
         1907  +){
         1908  +  int res;
         1909  +
         1910  +  /* Compare the keys, including the system flag. */
         1911  +  res = sortedKeyCompare(xCmp, 
         1912  +    rtTopic(iLhsFlags), pLhsKey, nLhsKey,
         1913  +    rtTopic(iRhsFlags), pRhsKey, nRhsKey
         1914  +  );
         1915  +
         1916  +  /* If a key has the LSM_START_DELETE flag set, but not the LSM_INSERT or
         1917  +  ** LSM_POINT_DELETE flags, it is considered a delta larger. This prevents
         1918  +  ** the beginning of an open-ended set from masking a database entry or
         1919  +  ** delete at a lower level.  */
         1920  +  if( res==0 ){
         1921  +    const int insdel = LSM_POINT_DELETE|LSM_INSERT;
         1922  +    int iDel1 = 0;
         1923  +    int iDel2 = 0;
         1924  +    if( LSM_START_DELETE==(iLhsFlags & (LSM_START_DELETE|insdel)) ) iDel1 = +1;
         1925  +    if( LSM_END_DELETE  ==(iLhsFlags & (LSM_END_DELETE  |insdel)) ) iDel1 = -1;
         1926  +    if( LSM_START_DELETE==(iRhsFlags & (LSM_START_DELETE|insdel)) ) iDel2 = +1;
         1927  +    if( LSM_END_DELETE  ==(iRhsFlags & (LSM_END_DELETE  |insdel)) ) iDel2 = -1;
         1928  +    res = (iDel1 - iDel2);
         1929  +  }
         1930  +
         1931  +  return res;
         1932  +}
  1679   1933   
  1680   1934   static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){
  1681   1935     int i1;
  1682   1936     int i2;
  1683   1937     int iRes;
  1684   1938     void *pKey1; int nKey1; int eType1;
  1685   1939     void *pKey2; int nKey2; int eType2;
  1686         -
  1687         -  int mul = (bReverse ? -1 : 1);
         1940  +  const int mul = (bReverse ? -1 : 1);
  1688   1941   
  1689   1942     assert( pCsr->aTree && iOut<pCsr->nTree );
  1690   1943     if( iOut>=(pCsr->nTree/2) ){
  1691   1944       i1 = (iOut - pCsr->nTree/2) * 2;
  1692   1945       i2 = i1 + 1;
  1693   1946     }else{
  1694   1947       i1 = pCsr->aTree[iOut*2];
................................................................................
  1701   1954     if( pKey1==0 ){
  1702   1955       iRes = i2;
  1703   1956     }else if( pKey2==0 ){
  1704   1957       iRes = i1;
  1705   1958     }else{
  1706   1959       int res;
  1707   1960   
  1708         -    res = (rtTopic(eType1) - rtTopic(eType2));
  1709         -    if( res==0 ){
  1710         -      res = pCsr->pDb->xCmp(pKey1, nKey1, pKey2, nKey2);
  1711         -    }
         1961  +    /* Compare the keys */
         1962  +    res = sortedDbKeyCompare(pCsr->pDb->xCmp, 
         1963  +        eType1, pKey1, nKey1, eType2, pKey2, nKey2
         1964  +    );
         1965  +
  1712   1966       res = res * mul;
  1713         -
  1714   1967       if( res==0 ){
  1715   1968         iRes = (rtIsSeparator(eType1) ? i2 : i1);
  1716   1969       }else if( res<0 ){
  1717   1970         iRes = i1;
  1718   1971       }else{
  1719   1972         iRes = i2;
  1720   1973       }
  1721   1974     }
  1722   1975   
  1723   1976     pCsr->aTree[iOut] = iRes;
  1724   1977   }
  1725   1978   
  1726         -static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){
  1727         -  int rc;
  1728         -  rc = segmentPtrEnd(pCsr, pPtr, 0);
  1729         -  while( pPtr->pPg && rc==LSM_OK ){
  1730         -    int res = sortedKeyCompare(pCsr->pDb->xCmp,
  1731         -        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey,
  1732         -        rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
  1733         -    );
  1734         -    if( res<=0 ) break;
  1735         -    rc = segmentPtrAdvance(pCsr, pPtr, 0);
  1736         -  }
  1737         -  return rc;
  1738         -}
  1739         -
  1740   1979   /*
  1741   1980   ** This function advances segment pointer iPtr belonging to multi-cursor
  1742   1981   ** pCsr forward (bReverse==0) or backward (bReverse!=0).
  1743   1982   **
  1744   1983   ** If the segment pointer points to a segment that is part of a composite
  1745         -** level, then the following special cases are handled.
         1984  +** level, then the following special case is handled.
  1746   1985   **
  1747   1986   **   * If iPtr is the lhs of a composite level, and the cursor is being
  1748   1987   **     advanced forwards, and segment iPtr is at EOF, move all pointers
  1749   1988   **     that correspond to rhs segments of the same level to the first
  1750   1989   **     key in their respective data.
  1751         -**
  1752         -**   * If iPtr is on the rhs of a composite level, and the cursor is being
  1753         -**     reversed, and the new key is smaller than the split-key, set the
  1754         -**     segment pointer to point to EOF.
  1755   1990   */
  1756   1991   static int segmentCursorAdvance(
  1757   1992     MultiCursor *pCsr, 
  1758   1993     int iPtr,
  1759   1994     int bReverse
  1760   1995   ){
  1761   1996     int rc;
................................................................................
  1768   2003     bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight);
  1769   2004   
  1770   2005     if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
  1771   2006      && bReverse==0                                /* csr advanced forwards */
  1772   2007      && pPtr->pPg==0                               /* segment at EOF */
  1773   2008     ){
  1774   2009       int i;
  1775         -
  1776   2010       for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
  1777   2011         rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
  1778   2012       }
  1779         -
  1780   2013       for(i=pCsr->nTree-1; i>0; i--){
  1781   2014         multiCursorDoCompare(pCsr, i, 0);
  1782   2015       }
  1783   2016     }
  1784   2017   
  1785         -  else if( pPtr->pPg && bComposite && bReverse && pPtr->pSeg!=&pLvl->lhs ){
  1786         -    int res = sortedKeyCompare(pCsr->pDb->xCmp,
  1787         -        rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
  1788         -        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
  1789         -    );
  1790         -    if( res<0 ){
  1791         -      segmentPtrReset(pPtr);
  1792         -    }
  1793         -  }
  1794         -
  1795   2018     return rc;
  1796   2019   }
  1797   2020   
  1798   2021   static void mcursorFreeComponents(MultiCursor *pCsr){
  1799   2022     int i;
  1800   2023     lsm_env *pEnv = pCsr->pDb->pEnv;
  1801   2024   
................................................................................
  2077   2300       rc = multiCursorAddAll(pCsr, pDb->pWorker);
  2078   2301       pCsr->flags |= CURSOR_IGNORE_DELETE;
  2079   2302     }
  2080   2303     
  2081   2304     if( rc==LSM_OK ){
  2082   2305       rc = lsmMCursorLast(pCsr);
  2083   2306       if( rc==LSM_OK 
  2084         -     && pCsr->eType==SORTED_SYSTEM_WRITE 
         2307  +     && rtIsWrite(pCsr->eType) && rtIsSystem(pCsr->eType)
  2085   2308        && pCsr->key.nData==8 
  2086   2309        && 0==memcmp(pCsr->key.pData, "FREELIST", 8)
  2087   2310       ){
  2088   2311         void *pVal; int nVal;         /* Value read from database */
  2089   2312         rc = lsmMCursorValue(pCsr, &pVal, &nVal);
  2090   2313         if( rc==LSM_OK ){
  2091   2314           *ppVal = lsmMallocRc(pDb->pEnv, nVal, &rc);
................................................................................
  2124   2347     if( *pRc==LSM_OK ){
  2125   2348       void *pKey;
  2126   2349       int nKey;
  2127   2350       multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pKey, &nKey);
  2128   2351       *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pKey, nKey);
  2129   2352     }
  2130   2353   }
         2354  +
         2355  +static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
         2356  +  int eType = pCsr->eType;
         2357  +  int iKey;
         2358  +  int i;
         2359  +  int rdmask = 0;
         2360  +  
         2361  +  assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) );
         2362  +  if( pCsr->flags & CURSOR_NEXT_OK ){
         2363  +    rdmask = LSM_END_DELETE;
         2364  +  }else{
         2365  +    rdmask = LSM_START_DELETE;
         2366  +  }
         2367  +
         2368  +  if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){
         2369  +    if( (eType & LSM_INSERT)==0 ) return 0;
         2370  +  }
         2371  +  if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
         2372  +    return 0;
         2373  +  }
         2374  +
         2375  +  /* Check if this key has already been deleted by a range-delete */
         2376  +  iKey = pCsr->aTree[1];
         2377  +  if( (iKey>0 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[0]))) 
         2378  +   || (iKey>1 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[1]))) 
         2379  +  ){
         2380  +    return 0;
         2381  +  }
         2382  +  for(i=CURSOR_DATA_SEGMENT; i<iKey; i++){
         2383  +    int iPtr = i-CURSOR_DATA_SEGMENT;
         2384  +    if( pCsr->aPtr[iPtr].pPg && (pCsr->aPtr[iPtr].eType & rdmask) ){
         2385  +      return 0;
         2386  +    }
         2387  +  }
         2388  +
         2389  +  return 1;
         2390  +}
  2131   2391   
  2132   2392   static int multiCursorEnd(MultiCursor *pCsr, int bLast){
  2133   2393     int rc = LSM_OK;
  2134   2394     int i;
  2135   2395   
  2136   2396     pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);
         2397  +  pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK);
         2398  +
  2137   2399     if( pCsr->apTreeCsr[0] ){
  2138   2400       rc = lsmTreeCursorEnd(pCsr->apTreeCsr[0], bLast);
  2139   2401     }
  2140   2402     if( rc==LSM_OK && pCsr->apTreeCsr[1] ){
  2141   2403       rc = lsmTreeCursorEnd(pCsr->apTreeCsr[1], bLast);
  2142   2404     }
  2143   2405   
................................................................................
  2173   2435     if( rc==LSM_OK ){
  2174   2436       rc = multiCursorAllocTree(pCsr);
  2175   2437     }
  2176   2438     if( rc==LSM_OK ){
  2177   2439       for(i=pCsr->nTree-1; i>0; i--){
  2178   2440         multiCursorDoCompare(pCsr, i, bLast);
  2179   2441       }
  2180         -    pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK);
  2181   2442     }
  2182   2443   
  2183   2444     multiCursorCacheKey(pCsr, &rc);
  2184         -  if( rc==LSM_OK && (
  2185         -        ((pCsr->flags & CURSOR_IGNORE_DELETE) && rtIsDelete(pCsr->eType))
  2186         -     || ((pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(pCsr->eType))
  2187         -  )){
         2445  +  if( rc==LSM_OK && mcursorLocationOk(pCsr, 0)==0 ){
  2188   2446       if( bLast ){
  2189   2447         rc = lsmMCursorPrev(pCsr);
  2190   2448       }else{
  2191   2449         rc = lsmMCursorNext(pCsr);
  2192   2450       }
  2193   2451     }
  2194   2452   
................................................................................
  2255   2513     lsmTreeCursorReset(pCsr->apTreeCsr[1]);
  2256   2514     for(i=0; i<pCsr->nPtr; i++){
  2257   2515       segmentPtrReset(&pCsr->aPtr[i]);
  2258   2516     }
  2259   2517     pCsr->key.nData = 0;
  2260   2518   }
  2261   2519   
  2262         -static void treeCursorSeek(
         2520  +static int treeCursorSeek(
         2521  +  MultiCursor *pCsr,
  2263   2522     TreeCursor *pTreeCsr, 
  2264   2523     void *pKey, int nKey, 
  2265         -  int eSeek
         2524  +  int eSeek,
         2525  +  int *pbStop
  2266   2526   ){
         2527  +  int rc = LSM_OK;
  2267   2528     if( pTreeCsr ){
  2268   2529       int res = 0;
  2269   2530       lsmTreeCursorSeek(pTreeCsr, pKey, nKey, &res);
  2270   2531       switch( eSeek ){
  2271         -      case LSM_SEEK_EQ:
  2272         -        if( res!=0 ){
  2273         -          lsmTreeCursorReset(pTreeCsr);
         2532  +      case LSM_SEEK_EQ: {
         2533  +        int eType = lsmTreeCursorFlags(pTreeCsr);
         2534  +        if( (res<0 && (eType & LSM_START_DELETE))
         2535  +         || (res>0 && (eType & LSM_END_DELETE))
         2536  +         || (res==0 && (eType & LSM_POINT_DELETE))
         2537  +        ){
         2538  +          *pbStop = 1;
         2539  +        }else if( res==0 && (eType & LSM_INSERT) ){
         2540  +          lsm_env *pEnv = pCsr->pDb->pEnv;
         2541  +          void *p; int n;         /* Key/value from tree-cursor */
         2542  +          *pbStop = 1;
         2543  +          pCsr->flags |= CURSOR_SEEK_EQ;
         2544  +          rc = lsmTreeCursorKey(pTreeCsr, &pCsr->eType, &p, &n);
         2545  +          if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->key, p, n);
         2546  +          if( rc==LSM_OK ) rc = lsmTreeCursorValue(pTreeCsr, &p, &n);
         2547  +          if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->val, p, n);
  2274   2548           }
         2549  +        lsmTreeCursorReset(pTreeCsr);
  2275   2550           break;
         2551  +      }
  2276   2552         case LSM_SEEK_GE:
  2277   2553           if( res<0 && lsmTreeCursorValid(pTreeCsr) ){
  2278   2554             lsmTreeCursorNext(pTreeCsr);
  2279   2555           }
  2280   2556           break;
  2281   2557         default:
  2282   2558           if( res>0 ){
  2283   2559             assert( lsmTreeCursorValid(pTreeCsr) );
  2284   2560             lsmTreeCursorPrev(pTreeCsr);
  2285   2561           }
  2286   2562           break;
  2287   2563       }
  2288   2564     }
         2565  +  return rc;
  2289   2566   }
  2290   2567   
  2291   2568   
  2292         -static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
  2293         -  int eType = pCsr->eType;
  2294         -
  2295         -  if( ((pCsr->flags & CURSOR_IGNORE_DELETE) && rtIsDelete(eType) && !bDeleteOk)
  2296         -   || ((pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(pCsr->eType)!=0)
  2297         -  ){
  2298         -    return 0;
  2299         -  }
  2300         -
  2301         -  return 1;
  2302         -}
  2303         -
  2304   2569   /*
  2305   2570   ** Seek the cursor.
  2306   2571   */
  2307   2572   int lsmMCursorSeek(MultiCursor *pCsr, void *pKey, int nKey, int eSeek){
  2308   2573     int eESeek = eSeek;             /* Effective eSeek parameter */
  2309         -  int rc = LSM_OK;
  2310         -  int iPtr = 0;
  2311         -  Pgno iPgno = 0; 
  2312         -  Level *pLvl;
         2574  +  int bStop = 0;                  /* Set to true to halt search operation */
         2575  +  int rc = LSM_OK;                /* Return code */
         2576  +  int iPtr = 0;                   /* Used to iterate through pCsr->aPtr[] */
         2577  +  Pgno iPgno = 0;                 /* FC pointer value */
  2313   2578   
  2314   2579     if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;
         2580  +
  2315   2581     assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );
  2316         -
  2317   2582     assert( (pCsr->flags & CURSOR_NEW_SYSTEM)==0 );
  2318   2583     assert( (pCsr->flags & CURSOR_AT_FREELIST)==0 );
  2319   2584     assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel );
  2320   2585   
  2321         -  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);
  2322         -  treeCursorSeek(pCsr->apTreeCsr[0], pKey, nKey, eESeek);
  2323         -  treeCursorSeek(pCsr->apTreeCsr[1], pKey, nKey, eESeek);
         2586  +  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ);
         2587  +  rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[0], pKey, nKey, eESeek, &bStop);
         2588  +  if( rc==LSM_OK && bStop==0 ){
         2589  +    rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[1], pKey, nKey, eESeek, &bStop);
         2590  +  }
  2324   2591   
  2325   2592     /* Seek all segment pointers. */
  2326         -  for(pLvl=pCsr->pDb->pClient->pLevel; rc==LSM_OK && pLvl; pLvl=pLvl->pNext){
         2593  +  for(iPtr=0; iPtr<pCsr->nPtr && rc==LSM_OK && bStop==0; iPtr++){
  2327   2594       SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
  2328         -    iPtr += (1+pLvl->nRight);
  2329         -    assert( pPtr->pSeg==&pLvl->lhs );
  2330         -    rc = seekInLevel(pCsr, pLvl, pPtr, eESeek, pKey, nKey, &iPgno);
  2331         -  }
  2332         -
  2333         -  if( rc==LSM_OK ){
  2334         -    rc = multiCursorAllocTree(pCsr);
  2335         -  }
  2336         -  if( rc==LSM_OK ){
  2337         -    int i;
  2338         -    for(i=pCsr->nTree-1; i>0; i--){
  2339         -      multiCursorDoCompare(pCsr, i, eESeek==LSM_SEEK_LE);
  2340         -    }
  2341         -    if( eSeek==LSM_SEEK_GE ) pCsr->flags |= CURSOR_NEXT_OK;
  2342         -    if( eSeek==LSM_SEEK_LE ) pCsr->flags |= CURSOR_PREV_OK;
  2343         -  }
  2344         -
  2345         -  multiCursorCacheKey(pCsr, &rc);
  2346         -  if( rc==LSM_OK && 0==mcursorLocationOk(pCsr, eSeek==LSM_SEEK_LEFAST) ){
  2347         -    switch( eESeek ){
  2348         -      case LSM_SEEK_EQ:
  2349         -        lsmMCursorReset(pCsr);
  2350         -        break;
  2351         -      case LSM_SEEK_GE:
  2352         -        rc = lsmMCursorNext(pCsr);
  2353         -        break;
  2354         -      default:
  2355         -        rc = lsmMCursorPrev(pCsr);
  2356         -        break;
         2595  +    assert( pPtr->pSeg==&pPtr->pLevel->lhs );
         2596  +    rc = seekInLevel(pCsr, pPtr, eESeek, pKey, nKey, &iPgno, &bStop);
         2597  +    iPtr += pPtr->pLevel->nRight;
         2598  +  }
         2599  +
         2600  +  if( eSeek!=LSM_SEEK_EQ ){
         2601  +    if( rc==LSM_OK ){
         2602  +      rc = multiCursorAllocTree(pCsr);
         2603  +    }
         2604  +    if( rc==LSM_OK ){
         2605  +      int i;
         2606  +      for(i=pCsr->nTree-1; i>0; i--){
         2607  +        multiCursorDoCompare(pCsr, i, eESeek==LSM_SEEK_LE);
         2608  +      }
         2609  +      if( eSeek==LSM_SEEK_GE ) pCsr->flags |= CURSOR_NEXT_OK;
         2610  +      if( eSeek==LSM_SEEK_LE ) pCsr->flags |= CURSOR_PREV_OK;
         2611  +    }
         2612  +
         2613  +    multiCursorCacheKey(pCsr, &rc);
         2614  +    if( rc==LSM_OK && eSeek!=LSM_SEEK_LEFAST && 0==mcursorLocationOk(pCsr, 0) ){
         2615  +      switch( eESeek ){
         2616  +        case LSM_SEEK_EQ:
         2617  +          lsmMCursorReset(pCsr);
         2618  +          break;
         2619  +        case LSM_SEEK_GE:
         2620  +          rc = lsmMCursorNext(pCsr);
         2621  +          break;
         2622  +        default:
         2623  +          rc = lsmMCursorPrev(pCsr);
         2624  +          break;
         2625  +      }
  2357   2626       }
  2358   2627     }
  2359   2628   
  2360   2629     return rc;
  2361   2630   }
  2362   2631   
  2363   2632   int lsmMCursorValid(MultiCursor *pCsr){
  2364   2633     int res = 0;
  2365         -  if( pCsr->aTree ){
         2634  +  if( pCsr->flags & CURSOR_SEEK_EQ ){
         2635  +    res = 1;
         2636  +  }else if( pCsr->aTree ){
  2366   2637       int iKey = pCsr->aTree[1];
  2367   2638       if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
  2368   2639         res = lsmTreeCursorValid(pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]);
  2369   2640       }else{
  2370   2641         void *pKey; 
  2371   2642         multiCursorGetKey(pCsr, iKey, 0, &pKey, 0);
  2372   2643         res = pKey!=0;
................................................................................
  2388   2659   
  2389   2660     /* Check the current key value. If it is not greater than (if bReverse==0)
  2390   2661     ** or less than (if bReverse!=0) the key currently cached in pCsr->key, 
  2391   2662     ** then the cursor has not yet been successfully advanced.  
  2392   2663     */
  2393   2664     multiCursorGetKey(pCsr, pCsr->aTree[1], &eNewType, &pNew, &nNew);
  2394   2665     if( pNew ){
  2395         -    int res = rtTopic(eNewType) - rtTopic(pCsr->eType);
  2396         -    if( res==0 ){
  2397         -      res = pCsr->pDb->xCmp(pNew, nNew, pCsr->key.pData, pCsr->key.nData);
  2398         -    }
         2666  +    int res = sortedDbKeyCompare(pCsr->pDb->xCmp, 
         2667  +        eNewType, pNew, nNew, pCsr->eType, pCsr->key.pData, pCsr->key.nData
         2668  +    );
  2399   2669       if( (bReverse==0 && res<=0) || (bReverse!=0 && res>=0) ){
  2400   2670         return 0;
  2401   2671       }
         2672  +
         2673  +    multiCursorCacheKey(pCsr, pRc);
         2674  +    assert( pCsr->eType==eNewType );
         2675  +
         2676  +    /* If this cursor is configured to skip deleted keys, and the current
         2677  +    ** cursor points to a SORTED_DELETE entry, then the cursor has not been 
         2678  +    ** successfully advanced.  
         2679  +    **
         2680  +    ** Similarly, if the cursor is configured to skip system keys and the
         2681  +    ** current cursor points to a system key, it has not yet been advanced.
         2682  +     */
         2683  +    if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0;
  2402   2684     }
  2403         -
  2404         -  multiCursorCacheKey(pCsr, pRc);
  2405         -  assert( pCsr->eType==eNewType );
  2406         -
  2407         -  /* If this cursor is configured to skip deleted keys, and the current
  2408         -  ** cursor points to a SORTED_DELETE entry, then the cursor has not been 
  2409         -  ** successfully advanced.  
  2410         -  **
  2411         -  ** Similarly, if the cursor is configured to skip system keys and the
  2412         -  ** current cursor points to a system key, it has not yet been advanced.
  2413         -  */
  2414         -  if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0;
  2415   2685     return 1;
  2416   2686   }
  2417   2687   
  2418   2688   static int multiCursorAdvance(MultiCursor *pCsr, int bReverse){
  2419   2689     int rc = LSM_OK;                /* Return Code */
  2420   2690     if( lsmMCursorValid(pCsr) ){
  2421   2691       do {
  2422   2692         int iKey = pCsr->aTree[1];
         2693  +
         2694  +      /* If this multi-cursor is advancing forwards, and the sub-cursor
         2695  +      ** being advanced is the one that separator keys may be being read
         2696  +      ** from, record the current absolute pointer value.  */
         2697  +      if( pCsr->pPrevMergePtr ){
         2698  +        if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
         2699  +          assert( pCsr->pBtCsr );
         2700  +          *pCsr->pPrevMergePtr = pCsr->pBtCsr->iPtr;
         2701  +        }else if( pCsr->pBtCsr==0 && pCsr->nPtr>0
         2702  +               && iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr-1) 
         2703  +        ){
         2704  +          SegmentPtr *pPtr = &pCsr->aPtr[iKey-CURSOR_DATA_SEGMENT];
         2705  +          *pCsr->pPrevMergePtr = pPtr->iPtr+pPtr->iPgPtr;
         2706  +        }
         2707  +      }
         2708  +
  2423   2709         if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
  2424   2710           TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
  2425   2711           if( bReverse ){
  2426   2712             rc = lsmTreeCursorPrev(pTreeCsr);
  2427   2713           }else{
  2428   2714             rc = lsmTreeCursorNext(pTreeCsr);
  2429   2715           }
................................................................................
  2434   2720           pCsr->flags &= ~CURSOR_AT_FREELIST;
  2435   2721         }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
  2436   2722           assert( bReverse==0 && pCsr->pBtCsr );
  2437   2723           rc = btreeCursorNext(pCsr->pBtCsr);
  2438   2724         }else{
  2439   2725           rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse);
  2440   2726         }
  2441         -
  2442   2727         if( rc==LSM_OK ){
  2443   2728           int i;
  2444   2729           for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){
  2445   2730             multiCursorDoCompare(pCsr, i, bReverse);
  2446   2731           }
  2447   2732         }
  2448   2733       }while( mcursorAdvanceOk(pCsr, bReverse, &rc)==0 );
................................................................................
  2457   2742   
  2458   2743   int lsmMCursorPrev(MultiCursor *pCsr){
  2459   2744     if( (pCsr->flags & CURSOR_PREV_OK)==0 ) return LSM_MISUSE_BKPT;
  2460   2745     return multiCursorAdvance(pCsr, 1);
  2461   2746   }
  2462   2747   
  2463   2748   int lsmMCursorKey(MultiCursor *pCsr, void **ppKey, int *pnKey){
  2464         -  int iKey = pCsr->aTree[1];
         2749  +  if( pCsr->flags & CURSOR_SEEK_EQ ){
         2750  +    *pnKey = pCsr->key.nData;
         2751  +    *ppKey = pCsr->key.pData;
         2752  +  }else{
         2753  +    int iKey = pCsr->aTree[1];
  2465   2754   
  2466         -  if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
  2467         -    TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
  2468         -    lsmTreeCursorKey(pTreeCsr, ppKey, pnKey);
  2469         -  }else{
  2470         -    int nKey;
         2755  +    if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
         2756  +      TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
         2757  +      lsmTreeCursorKey(pTreeCsr, 0, ppKey, pnKey);
         2758  +    }else{
         2759  +      int nKey;
  2471   2760   
  2472   2761   #ifndef NDEBUG
  2473         -    void *pKey;
  2474         -    int eType;
  2475         -    multiCursorGetKey(pCsr, iKey, &eType, &pKey, &nKey);
  2476         -    assert( eType==pCsr->eType );
  2477         -    assert( nKey==pCsr->key.nData );
  2478         -    assert( memcmp(pKey, pCsr->key.pData, nKey)==0 );
         2762  +      void *pKey;
         2763  +      int eType;
         2764  +      multiCursorGetKey(pCsr, iKey, &eType, &pKey, &nKey);
         2765  +      assert( eType==pCsr->eType );
         2766  +      assert( nKey==pCsr->key.nData );
         2767  +      assert( memcmp(pKey, pCsr->key.pData, nKey)==0 );
  2479   2768   #endif
  2480   2769   
  2481         -    nKey = pCsr->key.nData;
  2482         -    if( nKey==0 ){
  2483         -      *ppKey = 0;
  2484         -    }else{
  2485         -      *ppKey = pCsr->key.pData;
         2770  +      nKey = pCsr->key.nData;
         2771  +      if( nKey==0 ){
         2772  +        *ppKey = 0;
         2773  +      }else{
         2774  +        *ppKey = pCsr->key.pData;
         2775  +      }
         2776  +      *pnKey = nKey; 
  2486   2777       }
  2487         -    *pnKey = nKey; 
  2488   2778     }
  2489   2779     return LSM_OK;
  2490   2780   }
  2491   2781   
  2492   2782   int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){
  2493   2783     void *pVal;
  2494   2784     int nVal;
  2495   2785     int rc;
  2496         -
  2497         -  assert( pCsr->aTree );
  2498         -  assert( rtIsDelete(pCsr->eType)==0 || !(pCsr->flags & CURSOR_IGNORE_DELETE) );
  2499         -
  2500         -  rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pVal, &nVal);
  2501         -  if( pVal && rc==LSM_OK ){
  2502         -    rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pVal, nVal);
         2786  +  if( pCsr->flags & CURSOR_SEEK_EQ ){
         2787  +    rc = LSM_OK;
         2788  +    nVal = pCsr->val.nData;
  2503   2789       pVal = pCsr->val.pData;
  2504         -  }
         2790  +  }else{
  2505   2791   
  2506         -  if( rc!=LSM_OK ){
  2507         -    pVal = 0;
  2508         -    nVal = 0;
         2792  +    assert( pCsr->aTree );
         2793  +    assert( mcursorLocationOk(pCsr, (pCsr->flags & CURSOR_IGNORE_DELETE)) );
         2794  +
         2795  +    rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pVal, &nVal);
         2796  +    if( pVal && rc==LSM_OK ){
         2797  +      rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pVal, nVal);
         2798  +      pVal = pCsr->val.pData;
         2799  +    }
         2800  +
         2801  +    if( rc!=LSM_OK ){
         2802  +      pVal = 0;
         2803  +      nVal = 0;
         2804  +    }
  2509   2805     }
  2510   2806     *ppVal = pVal;
  2511   2807     *pnVal = nVal;
  2512   2808     return rc;
  2513   2809   }
  2514   2810   
  2515   2811   int lsmMCursorType(MultiCursor *pCsr, int *peType){
................................................................................
  2530   2826     int nKey;
  2531   2827     int eType;
  2532   2828   
  2533   2829     nRec = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
  2534   2830     iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec-1)]);
  2535   2831     eType = aData[iOff++];
  2536   2832     assert( eType==0 
  2537         -       || eType==SORTED_SEPARATOR 
  2538         -       || eType==SORTED_SYSTEM_SEPARATOR 
         2833  +       || eType==(LSM_SYSTEMKEY|LSM_SEPARATOR) 
         2834  +       || eType==(LSM_SEPARATOR)
  2539   2835     );
  2540   2836   
  2541   2837     iOff += lsmVarintGet32(&aData[iOff], &nKey);
  2542   2838     iOff += lsmVarintGet32(&aData[iOff], &nKey);
  2543   2839   
  2544   2840     return iOff + (eType ? nKey : 0);
  2545   2841   }
................................................................................
  2856   3152     lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], nRec+1);
  2857   3153   
  2858   3154     if( bIndirect ){
  2859   3155       aData[iOff++] = 0x00;
  2860   3156       iOff += lsmVarintPut32(&aData[iOff], iPtr);
  2861   3157       iOff += lsmVarintPut32(&aData[iOff], iKeyPg);
  2862   3158     }else{
  2863         -    aData[iOff++] = (u8)(iTopic | SORTED_SEPARATOR);
         3159  +    aData[iOff++] = (u8)(iTopic | LSM_SEPARATOR);
  2864   3160       iOff += lsmVarintPut32(&aData[iOff], iPtr);
  2865   3161       iOff += lsmVarintPut32(&aData[iOff], nKey);
  2866   3162       memcpy(&aData[iOff], pKey, nKey);
  2867   3163     }
  2868   3164   
  2869   3165     if( iLevel>0 ){
  2870   3166       int iRight = lsmFsPageNumber(p->apHier[iLevel-1]);
................................................................................
  3002   3298     pSeg = &pMW->pLevel->lhs;
  3003   3299   
  3004   3300     pPg = pMW->pPage;
  3005   3301     aData = fsPageData(pPg, &nData);
  3006   3302     nRec = pageGetNRec(aData, nData);
  3007   3303     iFPtr = pageGetPtr(aData, nData);
  3008   3304   
  3009         -  /* If iPtr is 0, set it to the same value as the absolute pointer 
  3010         -  ** stored as part of the previous record.  */
  3011         -  if( iPtr==0 ){
  3012         -    iPtr = iFPtr;
  3013         -    if( nRec ) iPtr += pageGetRecordPtr(aData, nData, nRec-1);
  3014         -  }
  3015         -
  3016   3305     /* Calculate the relative pointer value to write to this record */
  3017   3306     iRPtr = iPtr - iFPtr;
  3018   3307     /* assert( iRPtr>=0 ); */
  3019   3308        
  3020   3309     /* Figure out how much space is required by the new record. The space
  3021   3310     ** required is divided into two sections: the header and the body. The
  3022   3311     ** header consists of the intial varint fields. The body are the blobs 
................................................................................
  3036   3325       nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
  3037   3326       if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);
  3038   3327   
  3039   3328       /* If the entire header will not fit on page pPg, or if page pPg is 
  3040   3329        ** marked read-only, advance to the next page of the output run. */
  3041   3330       iOff = pMerge->iOutputOff;
  3042   3331       if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
  3043         -      iFPtr = iFPtr + (nRec ? pageGetRecordPtr(aData, nData, nRec-1) : 0);
         3332  +      iFPtr = *pCsr->pPrevMergePtr;
  3044   3333         iRPtr = iPtr - iFPtr;
         3334  +
  3045   3335         iOff = 0;
  3046   3336         nRec = 0;
  3047   3337         rc = mergeWorkerNextPage(pMW, iFPtr);
  3048   3338         pPg = pMW->pPage;
  3049   3339       }
  3050   3340     }
  3051   3341   
................................................................................
  3211   3501         iFPtr = pageGetPtr(aData, nData);
  3212   3502         lsmFsPageRelease(pPg);
  3213   3503       }
  3214   3504     }
  3215   3505   
  3216   3506     if( rc==LSM_OK ){
  3217   3507       rc = mergeWorkerNextPage(pMW, iFPtr);
         3508  +    if( pCsr->pPrevMergePtr ) *pCsr->pPrevMergePtr = iFPtr;
  3218   3509     }
  3219   3510   
  3220   3511     return rc;
  3221   3512   }
         3513  +
         3514  +/*
         3515  +** The cursor passed as the first argument is being used as the input for
         3516  +** a merge operation. When this function is called, *piFlags contains the
         3517  +** database entry flags for the current entry. The entry about to be written
         3518  +** to the output.
         3519  +**
         3520  +*/
         3521  +static void mergeRangeDeletes(MultiCursor *pCsr, int *piFlags){
         3522  +  int f = *piFlags;
         3523  +  int iKey = pCsr->aTree[1];
         3524  +  int i;
         3525  +
         3526  +  if( pCsr->flags & CURSOR_IGNORE_DELETE ){
         3527  +    /* The ignore-delete flag is set when the output of the merge will form
         3528  +    ** the oldest level in the database. In this case there is no point in
         3529  +    ** retaining any range-delete flags.  */
         3530  +    assert( (f & LSM_POINT_DELETE)==0 );
         3531  +    f &= ~(LSM_START_DELETE|LSM_END_DELETE);
         3532  +  }else{
         3533  +    if( iKey==0 ){
         3534  +      int btreeflags = lsmTreeCursorFlags(pCsr->apTreeCsr[1]);
         3535  +      if( btreeflags & LSM_END_DELETE ){
         3536  +        f |= (LSM_START_DELETE|LSM_END_DELETE);
         3537  +      }
         3538  +    }
         3539  +
         3540  +    for(i=LSM_MAX(0, iKey+1-CURSOR_DATA_SEGMENT); i<pCsr->nPtr; i++){
         3541  +      if( pCsr->aPtr[i].eType & LSM_END_DELETE ){
         3542  +        f |= (LSM_START_DELETE|LSM_END_DELETE);
         3543  +      }
         3544  +    }
         3545  +
         3546  +    if( (f & LSM_START_DELETE) && (f & LSM_END_DELETE) && (f & LSM_INSERT)==0 ){
         3547  +      f = 0;
         3548  +    }
         3549  +  }
         3550  +
         3551  +  *piFlags = f;
         3552  +}
  3222   3553   
  3223   3554   static int mergeWorkerStep(MergeWorker *pMW){
  3224   3555     lsm_db *pDb = pMW->pDb;       /* Database handle */
  3225   3556     MultiCursor *pCsr;            /* Cursor to read input data from */
  3226   3557     int rc = LSM_OK;              /* Return code */
  3227   3558     int eType;                    /* SORTED_SEPARATOR, WRITE or DELETE */
  3228   3559     void *pKey; int nKey;         /* Key */
  3229   3560     Segment *pSeg;                /* Output segment */
  3230         -  int iPtr = 0;
         3561  +  Pgno iPtr;
  3231   3562   
  3232   3563     pCsr = pMW->pCsr;
  3233   3564     pSeg = &pMW->pLevel->lhs;
  3234   3565   
  3235   3566     /* Pull the next record out of the source cursor. */
  3236   3567     lsmMCursorKey(pCsr, &pKey, &nKey);
  3237   3568     eType = pCsr->eType;
................................................................................
  3238   3569   
  3239   3570     /* Figure out if the output record may have a different pointer value
  3240   3571     ** than the previous. This is the case if the current key is identical to
  3241   3572     ** a key that appears in the lowest level run being merged. If so, set 
  3242   3573     ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, 
  3243   3574     ** indicating that the output pointer value should be a copy of the pointer 
  3244   3575     ** value written with the previous key.  */
         3576  +  iPtr = (pCsr->pPrevMergePtr ? *pCsr->pPrevMergePtr : 0);
  3245   3577     if( pCsr->pBtCsr ){
  3246   3578       BtreeCursor *pBtCsr = pCsr->pBtCsr;
  3247   3579       if( pBtCsr->pKey ){
  3248   3580         int res = rtTopic(pBtCsr->eType) - rtTopic(eType);
  3249   3581         if( res==0 ) res = pDb->xCmp(pBtCsr->pKey, pBtCsr->nKey, pKey, nKey);
  3250   3582         if( 0==res ) iPtr = pBtCsr->iPtr;
  3251         -
  3252   3583         assert( res>=0 );
  3253   3584       }
  3254   3585     }else if( pCsr->nPtr ){
  3255   3586       SegmentPtr *pPtr = &pCsr->aPtr[pCsr->nPtr-1];
  3256   3587       if( pPtr->pPg
  3257   3588        && 0==pDb->xCmp(pPtr->pKey, pPtr->nKey, pKey, nKey)
  3258   3589       ){
  3259   3590         iPtr = pPtr->iPtr+pPtr->iPgPtr;
  3260   3591       }
  3261   3592     }
  3262   3593   
  3263         -  if( pMW->aGobble ){
  3264         -    int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
  3265         -    if( iGobble<pCsr->nPtr ){
  3266         -      SegmentPtr *pGobble = &pCsr->aPtr[iGobble];
  3267         -      if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
  3268         -        pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg);
  3269         -      }
  3270         -    }
  3271         -  }
  3272         -
  3273         -  /* If this is a separator key and we know that the output pointer has not
  3274         -  ** changed, there is no point in writing an output record. Otherwise,
  3275         -  ** proceed. */
  3276         -  if( rtIsSeparator(eType)==0 || iPtr!=0 ){
  3277         -    int iSPtr = 0;                /* Separators require a pointer here */
  3278         -
  3279         -    if( pMW->pPage==0 ){
  3280         -      rc = mergeWorkerFirstPage(pMW);
  3281         -    }
  3282         -
  3283         -    /* Write the record into the main run. */
  3284         -    if( rc==LSM_OK ){
  3285         -      rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pCsr, iPtr, &iSPtr);
         3594  +  mergeRangeDeletes(pCsr, &eType);
         3595  +
         3596  +  if( eType!=0 ){
         3597  +    if( pMW->aGobble ){
         3598  +      int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
         3599  +      if( iGobble<pCsr->nPtr ){
         3600  +        SegmentPtr *pGobble = &pCsr->aPtr[iGobble];
         3601  +        if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
         3602  +          pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg);
         3603  +        }
         3604  +      }
         3605  +    }
         3606  +
         3607  +    /* If this is a separator key and we know that the output pointer has not
         3608  +    ** changed, there is no point in writing an output record. Otherwise,
         3609  +    ** proceed. */
         3610  +    if( rtIsSeparator(eType)==0 || iPtr!=0 ){
         3611  +      int iSPtr = 0;                /* Separators require a pointer here */
         3612  +
         3613  +      if( pMW->pPage==0 ){
         3614  +        rc = mergeWorkerFirstPage(pMW);
         3615  +      }
         3616  +
         3617  +      /* Write the record into the main run. */
         3618  +      if( rc==LSM_OK ){
         3619  +        rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pCsr, iPtr, &iSPtr);
         3620  +      }
  3286   3621       }
  3287   3622     }
  3288   3623   
  3289   3624     /* Advance the cursor to the next input record (assuming one exists). */
  3290   3625     assert( lsmMCursorValid(pMW->pCsr) );
  3291   3626     if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr);
  3292   3627   
................................................................................
  3341   3676     int *pnWrite                    /* OUT: Number of database pages written */
  3342   3677   ){
  3343   3678     int rc = LSM_OK;                /* Return Code */
  3344   3679     MultiCursor *pCsr = 0;
  3345   3680     Level *pNext = 0;               /* The current top level */
  3346   3681     Level *pNew;                    /* The new level itself */
  3347   3682     Segment *pDel = 0;              /* Delete separators from this segment */
  3348         -  int iLeftPtr = 0;
         3683  +  Pgno iLeftPtr = 0;
  3349   3684     int nWrite = 0;                 /* Number of database pages written */
  3350   3685   
  3351   3686     assert( pnOvfl );
  3352   3687   
  3353   3688     /* Allocate the new level structure to write to. */
  3354   3689     pNext = lsmDbSnapshotLevel(pDb->pWorker);
  3355   3690     pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
................................................................................
  3382   3717       memset(&merge, 0, sizeof(Merge));
  3383   3718       memset(&mergeworker, 0, sizeof(MergeWorker));
  3384   3719   
  3385   3720       pNew->pMerge = &merge;
  3386   3721       mergeworker.pDb = pDb;
  3387   3722       mergeworker.pLevel = pNew;
  3388   3723       mergeworker.pCsr = pCsr;
         3724  +    pCsr->pPrevMergePtr = &iLeftPtr;
  3389   3725   
  3390   3726       /* Mark the separators array for the new level as a "phantom". */
  3391   3727       mergeworker.bFlush = 1;
  3392   3728   
  3393   3729       /* Allocate the first page of the output segment. */
  3394   3730       rc = mergeWorkerNextPage(&mergeworker, iLeftPtr);
  3395   3731   
................................................................................
  3413   3749     }
  3414   3750   
  3415   3751     if( rc==LSM_OK ){
  3416   3752       sortedInvokeWorkHook(pDb);
  3417   3753     }
  3418   3754   
  3419   3755   #if 0
  3420         -  lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
         3756  +  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel");
  3421   3757   #endif
  3422   3758   
  3423   3759     if( pnWrite ) *pnWrite = nWrite;
  3424   3760     pDb->pWorker->nWrite += nWrite;
  3425   3761     return rc;
  3426   3762   }
  3427   3763   
................................................................................
  3581   3917     pMW->pCsr = pCsr;
  3582   3918   
  3583   3919     /* Load the current output page into memory. */
  3584   3920     if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW);
  3585   3921   
  3586   3922     /* Position the cursor. */
  3587   3923     if( rc==LSM_OK ){
         3924  +    pCsr->pPrevMergePtr = &pMerge->iCurrentPtr;
  3588   3925       if( pMW->pPage==0 ){
  3589   3926         /* The output array is still empty. So position the cursor at the very 
  3590   3927         ** start of the input.  */
  3591   3928         rc = multiCursorEnd(pCsr, 0);
  3592   3929       }else{
  3593   3930         /* The output array is non-empty. Position the cursor based on the
  3594   3931         ** page/cell data saved in the Merge.aInput[] array.  */
................................................................................
  3786   4123               SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i];
  3787   4124               if( pGobble->pSeg->iRoot ){
  3788   4125                 rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i);
  3789   4126               }else if( mergeworker.aGobble[i] ){
  3790   4127                 lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1);
  3791   4128               }
  3792   4129             }
  3793         -#if 0
  3794         -          int iGobble = mergeworker.pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
  3795         -          if( iGobble<pLevel->nRight ){
  3796         -            SegmentPtr *pGobble = &mergeworker.pCsr->aSegCsr[iGobble].aPtr[0];
  3797         -            if( pGobble->pSeg->iRoot ){
  3798         -              rc = sortedBtreeGobble(pDb, mergeworker.pCsr, iGobble);
  3799         -            }else if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
  3800         -              Pgno iPg = lsmFsPageNumber(pGobble->pPg);
  3801         -              lsmFsGobble(pDb, pGobble->pSeg, &iPg, 1);
  3802         -            }
  3803         -          }
  3804         -#endif
  3805         -
  3806   4130           }else if( pLevel->lhs.iFirst==0 ){
  3807   4131             /* If the new level is completely empty, remove it from the 
  3808   4132             ** database snapshot. This can only happen if all input keys were
  3809   4133             ** annihilated. Since keys are only annihilated if the new level
  3810   4134             ** is the last in the linked list (contains the most ancient of
  3811   4135             ** database content), this guarantees that pLevel->pNext==0.  */ 
  3812   4136   
................................................................................
  3848   4172   
  3849   4173         /* Clean up the MergeWorker object initialized above. If no error
  3850   4174         ** has occurred, invoke the work-hook to inform the application that
  3851   4175         ** the database structure has changed. */
  3852   4176         mergeWorkerShutdown(&mergeworker, &rc);
  3853   4177         if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);
  3854   4178   
         4179  +#if 0
         4180  +      lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work");
         4181  +#endif
         4182  +      assertRunInOrder(pDb, &pLevel->lhs);
         4183  +
  3855   4184         /* If bFlush is true and the database is no longer considered "full",
  3856   4185         ** break out of the loop even if nRemaining is still greater than
  3857   4186         ** zero. The caller has an in-memory tree to flush to disk.  */
  3858   4187         if( bFlush && sortedDbIsFull(pDb)==0 ) break;
  3859         -
  3860         -      assertRunInOrder(pDb, &pLevel->lhs);
  3861         -
  3862         -#if 0
  3863         -      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
  3864         -#endif
  3865   4188       }
  3866   4189     }
  3867   4190   
  3868   4191     if( pnWrite ) *pnWrite = (nWork - nRemaining);
  3869   4192     pWorker->nWrite += (nWork - nRemaining);
  3870   4193   
  3871   4194   #ifdef LSM_LOG_WORK
  3872   4195     lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining));
  3873   4196   #endif
  3874   4197     return rc;
  3875   4198   }
  3876   4199   
  3877         -typedef struct Metric Metric;
  3878         -struct Metric {
  3879         -  double fAvgHeight;
  3880         -  int nTotalSz;
  3881         -  int nMinSz;
  3882         -};
  3883         -
  3884         -#if 0
  3885         -static void sortedMeasureDb(lsm_db *pDb, Metric *p){
  3886         -  Level *pLevel;                  /* Used to iterate through levels */
  3887         -  assert( pDb->pWorker );
  3888         -
  3889         -  double fHeight = 0.0;
  3890         -  int nTotalSz = 0;
  3891         -  int nMinSz = 0;
  3892         -
  3893         -  for(pLevel=lsmDbSnapshotLevel(pDb->pWorker); pLevel; pLevel=pLevel->pNext){
  3894         -    const int nLhsSz = pLevel->lhs.run.nSize;
  3895         -    int nRhsSz = 0;
  3896         -    int nSz;
  3897         -    int i;
  3898         -
  3899         -    for(i=0; i<pLevel->nRight; i++){
  3900         -      nRhsSz += pLevel->aRhs[i].run.nSize;
  3901         -    }
  3902         -
  3903         -    nSz = nRhsSz + nLhsSz;
  3904         -    fHeight += (double)nLhsSz/nSz + (double)nRhsSz/nSz * pLevel->nRight;
  3905         -    nTotalSz += nSz;
  3906         -
  3907         -    if( nMinSz==0 || nMinSz>nSz ) nMinSz = nSz;
  3908         -  }
  3909         -
  3910         -  if( nMinSz ){
  3911         -    printf("avg-height=%.2f log2(min/total)=%.2f totalsz=%d minsz=%d\n", 
  3912         -        fHeight, 
  3913         -        log((double)nTotalSz / nMinSz) / log(2),
  3914         -        nTotalSz,
  3915         -        nMinSz
  3916         -    );
  3917         -  }
  3918         -}
  3919         -#endif
  3920         -
  3921   4200   /*
  3922   4201   ** The database connection passed as the first argument must be a worker
  3923   4202   ** connection. This function checks if there exists an "old" in-memory tree
  3924   4203   ** ready to be flushed to disk. If so, *pbOut is set to true before 
  3925   4204   ** returning. Otherwise false.
  3926   4205   **
  3927   4206   ** Normally, LSM_OK is returned. Or, if an error occurs, an LSM error code.
................................................................................
  4354   4633       }else{
  4355   4634         lsmStringAppendf(pStr, "%c", isalnum(z[iChar]) ?z[iChar] : '.');
  4356   4635       }
  4357   4636     }
  4358   4637     return LSM_OK;
  4359   4638   }
  4360   4639   
  4361         -int lsmInfoPageDump(lsm_db *pDb, Pgno iPg, int bHex, char **pzOut){
         4640  +#define INFO_PAGE_DUMP_DATA   0x01
         4641  +#define INFO_PAGE_DUMP_VALUES 0x02
         4642  +#define INFO_PAGE_DUMP_HEX    0x04
         4643  +
         4644  +static int infoPageDump(
         4645  +  lsm_db *pDb,                    /* Database handle */
         4646  +  Pgno iPg,                       /* Page number of page to dump */
         4647  +  int flags,
         4648  +  char **pzOut                    /* OUT: lsmMalloc'd string */
         4649  +){
  4362   4650     int rc = LSM_OK;                /* Return code */
  4363   4651     Page *pPg = 0;                  /* Handle for page iPg */
  4364   4652     int i, j;                       /* Loop counters */
  4365   4653     const int perLine = 16;         /* Bytes per line in the raw hex dump */
         4654  +
         4655  +  int bValues = (flags & INFO_PAGE_DUMP_VALUES);
         4656  +  int bHex = (flags & INFO_PAGE_DUMP_HEX);
         4657  +  int bData = (flags & INFO_PAGE_DUMP_DATA);
  4366   4658   
  4367   4659     *pzOut = 0;
  4368   4660     if( iPg==0 ) return LSM_ERROR;
  4369   4661   
  4370   4662     rc = lsmFsDbPageGet(pDb->pFS, iPg, &pPg);
  4371   4663     if( rc==LSM_OK ){
  4372   4664       Blob blob = {0, 0, 0, 0};
................................................................................
  4400   4692       for(iCell=0; iCell<nRec; iCell++){
  4401   4693         u8 *aKey; int nKey = 0;       /* Key */
  4402   4694         u8 *aVal; int nVal = 0;       /* Value */
  4403   4695         int iPgPtr;
  4404   4696         int eType;
  4405   4697         char cType = '?';
  4406   4698         Pgno iAbsPtr;
         4699  +      char zFlags[8];
  4407   4700   
  4408   4701         infoCellDump(pDb, pPg, iCell, &eType, &iPgPtr,
  4409   4702             &aKey, &nKey, &aVal, &nVal, &blob
  4410   4703         );
  4411   4704         iAbsPtr = iPgPtr + ((flags & SEGMENT_BTREE_FLAG) ? 0 : iPtr);
  4412   4705   
  4413         -      if( rtIsDelete(eType) ) cType = 'D';
  4414         -      if( rtIsWrite(eType) ) cType = 'W';
  4415         -      if( rtIsSeparator(eType) ) cType = 'S';
  4416         -      lsmStringAppendf(&str, "%c %d (%s) ", 
  4417         -          cType, iAbsPtr, (rtTopic(eType) ? "sys" : "usr")
         4706  +      lsmFlagsToString(eType, zFlags);
         4707  +      lsmStringAppendf(&str, "%s %d (%s) ", 
         4708  +          zFlags, iAbsPtr, (rtTopic(eType) ? "sys" : "usr")
  4418   4709         );
  4419   4710         infoAppendBlob(&str, bHex, aKey, nKey); 
  4420         -      if( nVal>0 ){
         4711  +      if( nVal>0 && bValues ){
  4421   4712           lsmStringAppendf(&str, "%*s", nKeyWidth - (nKey*(1+bHex)), "");
  4422   4713           lsmStringAppendf(&str, " ");
  4423   4714           infoAppendBlob(&str, bHex, aVal, nVal); 
  4424   4715         }
  4425   4716         lsmStringAppendf(&str, "\n");
  4426   4717       }
  4427   4718   
  4428         -    lsmStringAppendf(&str, "\n-------------------" 
  4429         -       "-------------------------------------------------------------\n");
  4430         -    lsmStringAppendf(&str, "Page %d\n",
  4431         -                     iPg, (iPg-1)*nData, iPg*nData - 1);
  4432         -    for(i=0; i<nData; i += perLine){
  4433         -      lsmStringAppendf(&str, "%04x: ", i);
  4434         -      for(j=0; j<perLine; j++){
  4435         -        if( i+j>nData ){
  4436         -          lsmStringAppendf(&str, "   ");
  4437         -        }else{
  4438         -          lsmStringAppendf(&str, "%02x ", aData[i+j]);
         4719  +    if( bData ){
         4720  +      lsmStringAppendf(&str, "\n-------------------" 
         4721  +          "-------------------------------------------------------------\n");
         4722  +      lsmStringAppendf(&str, "Page %d\n",
         4723  +          iPg, (iPg-1)*nData, iPg*nData - 1);
         4724  +      for(i=0; i<nData; i += perLine){
         4725  +        lsmStringAppendf(&str, "%04x: ", i);
         4726  +        for(j=0; j<perLine; j++){
         4727  +          if( i+j>nData ){
         4728  +            lsmStringAppendf(&str, "   ");
         4729  +          }else{
         4730  +            lsmStringAppendf(&str, "%02x ", aData[i+j]);
         4731  +          }
         4732  +        }
         4733  +        lsmStringAppendf(&str, "  ");
         4734  +        for(j=0; j<perLine; j++){
         4735  +          if( i+j>nData ){
         4736  +            lsmStringAppendf(&str, " ");
         4737  +          }else{
         4738  +            lsmStringAppendf(&str,"%c", isprint(aData[i+j]) ? aData[i+j] : '.');
         4739  +          }
  4439   4740           }
         4741  +        lsmStringAppendf(&str,"\n");
  4440   4742         }
  4441         -      lsmStringAppendf(&str, "  ");
  4442         -      for(j=0; j<perLine; j++){
  4443         -        if( i+j>nData ){
  4444         -          lsmStringAppendf(&str, " ");
  4445         -        }else{
  4446         -          lsmStringAppendf(&str,"%c", isprint(aData[i+j]) ? aData[i+j] : '.');
  4447         -        }
  4448         -      }
  4449         -      lsmStringAppendf(&str,"\n");
  4450   4743       }
  4451   4744   
  4452   4745       *pzOut = str.z;
  4453   4746       sortedBlobFree(&blob);
  4454   4747       lsmFsPageRelease(pPg);
  4455   4748     }
  4456   4749   
  4457   4750     return rc;
  4458   4751   }
         4752  +
         4753  +int lsmInfoPageDump(
         4754  +  lsm_db *pDb,                    /* Database handle */
         4755  +  Pgno iPg,                       /* Page number of page to dump */
         4756  +  int bHex,                       /* True to output key/value in hex form */
         4757  +  char **pzOut                    /* OUT: lsmMalloc'd string */
         4758  +){
         4759  +  int flags = INFO_PAGE_DUMP_DATA | INFO_PAGE_DUMP_VALUES;
         4760  +  if( bHex ) flags |= INFO_PAGE_DUMP_HEX;
         4761  +  return infoPageDump(pDb, iPg, flags, pzOut);
         4762  +}
  4459   4763   
  4460   4764   void sortedDumpSegment(lsm_db *pDb, Segment *pRun, int bVals){
  4461   4765     assert( pDb->xLog );
  4462   4766     if( pRun && pRun->iFirst ){
         4767  +    int flags = (bVals ? INFO_PAGE_DUMP_VALUES : 0);
  4463   4768       char *zSeg;
  4464   4769       Page *pPg;
  4465   4770   
  4466   4771       zSeg = segToString(pDb->pEnv, pRun, 0);
  4467   4772       lsmLogMessage(pDb, LSM_OK, "Segment: %s", zSeg);
  4468   4773       lsmFree(pDb->pEnv, zSeg);
  4469   4774   
  4470   4775       lsmFsDbPageGet(pDb->pFS, pRun->iFirst, &pPg);
  4471   4776       while( pPg ){
  4472   4777         Page *pNext;
         4778  +      char *z = 0;
         4779  +      infoPageDump(pDb, lsmFsPageNumber(pPg), flags, &z);
         4780  +      lsmLogMessage(pDb, LSM_OK, "%s", z);
         4781  +      lsmFree(pDb->pEnv, z);
         4782  +#if 0
  4473   4783         sortedDumpPage(pDb, pRun, pPg, bVals);
         4784  +#endif
  4474   4785         lsmFsDbPageNext(pRun, pPg, 1, &pNext);
  4475   4786         lsmFsPageRelease(pPg);
  4476   4787         pPg = pNext;
  4477   4788       }
  4478   4789     }
  4479   4790   }
  4480   4791   

Changes to src/lsm_tree.c.

    96     96   
    97     97   struct TreeOld {
    98     98     u32 iShmid;                     /* Last shared-memory chunk in use by old */
    99     99     u32 iRoot;                      /* Offset of root node in shm file */
   100    100     u32 nHeight;                    /* Height of tree structure */
   101    101   };
   102    102   
          103  +#ifndef NDEBUG
          104  +/*
          105  +** assert() that a TreeKey.flags value is sane. Usage:
          106  +**
          107  +**   assert( assertFlagsOk(pTreeKey->flags) );
          108  +*/
          109  +static int assertFlagsOk(u8 keyflags){
          110  +  /* At least one flag must be set. Otherwise, what is this key doing? */
          111  +  assert( keyflags!=0 );
          112  +
          113  +  /* The POINT_DELETE and INSERT flags cannot both be set. */
          114  +  assert( (keyflags & LSM_POINT_DELETE)==0 || (keyflags & LSM_INSERT)==0 );
          115  +
          116  +  /* If both the START_DELETE and END_DELETE flags are set, then the INSERT
          117  +  ** flag must also be set. In other words - the three DELETE flags cannot
          118  +  ** all be set */
          119  +  assert( (keyflags & LSM_END_DELETE)==0 
          120  +       || (keyflags & LSM_START_DELETE)==0 
          121  +       || (keyflags & LSM_POINT_DELETE)==0 
          122  +  );
          123  +
          124  +  return 1;
          125  +}
          126  +
          127  +static int assert_delete_ranges_match(lsm_db *);
          128  +static int treeCountEntries(lsm_db *db);
          129  +#else
          130  +# define assertFlagsOk(x)
          131  +#endif
          132  +
   103    133   /*
   104    134   ** Container for a key-value pair. Within the *-shm file, each key/value
   105    135   ** pair is stored in a single allocation (which may not actually be 
   106    136   ** contiguous in memory). Layout is the TreeKey structure, followed by
   107    137   ** the nKey bytes of key blob, followed by the nValue bytes of value blob
   108    138   ** (if nValue is non-negative).
   109    139   */
   110    140   struct TreeKey {
   111    141     int nKey;                       /* Size of pKey in bytes */
   112    142     int nValue;                     /* Size of pValue. Or negative. */
          143  +  u8 flags;                       /* Various LSM_XXX flags */
   113    144   };
   114    145   
   115    146   #define TK_KEY(p) ((void *)&(p)[1])
   116    147   #define TK_VAL(p) ((void *)(((u8 *)&(p)[1]) + (p)->nKey))
   117    148   
   118    149   /*
   119    150   ** A single tree node. A node structure may contain up to 3 key/value
................................................................................
   281    312     return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, &rcdummy);
   282    313   }
   283    314   
   284    315   static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){
   285    316     return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, pRc);
   286    317   }
   287    318   
          319  +
          320  +#ifndef NDEBUG
          321  +static void assertIsWorkingChild(
          322  +  lsm_db *db, 
          323  +  TreeNode *pNode, 
          324  +  TreeNode *pParent, 
          325  +  int iCell
          326  +){
          327  +  TreeNode *p;
          328  +  int rc = LSM_OK;
          329  +  u32 iPtr = getChildPtr(pParent, WORKING_VERSION, iCell);
          330  +  p = treeShmptr(db, iPtr, &rc);
          331  +  assert( p==pNode || rc!=LSM_OK );
          332  +}
          333  +#else
          334  +# define assertIsWorkingChild(w,x,y,z)
          335  +#endif
          336  +
   288    337   /* Values for the third argument to treeShmkey(). */
   289    338   #define TK_LOADKEY  1
   290    339   #define TK_LOADVAL  2
   291    340   
   292    341   static TreeKey *treeShmkey(
   293    342     lsm_db *pDb,                    /* Database handle */
   294    343     u32 iPtr,                       /* Shmptr to TreeKey struct */
................................................................................
   362    411   **   * todo...
   363    412   */
   364    413   void assert_tree_looks_ok(int rc, Tree *pTree){
   365    414   }
   366    415   #else
   367    416   # define assert_tree_looks_ok(x,y)
   368    417   #endif
          418  +
          419  +void lsmFlagsToString(int flags, char *zFlags){
          420  +
          421  +  zFlags[0] = (flags & LSM_END_DELETE)   ? ']' : '.';
          422  +
          423  +  /* Only one of LSM_POINT_DELETE, LSM_INSERT and LSM_SEPARATOR should ever
          424  +  ** be set. If this is not true, write a '?' to the output.  */
          425  +  switch( flags & (LSM_POINT_DELETE|LSM_INSERT|LSM_SEPARATOR) ){
          426  +    case 0:                zFlags[1] = '.'; break;
          427  +    case LSM_POINT_DELETE: zFlags[1] = '-'; break;
          428  +    case LSM_INSERT:       zFlags[1] = '+'; break;
          429  +    case LSM_SEPARATOR:    zFlags[1] = '^'; break;
          430  +    default:               zFlags[1] = '?'; break;
          431  +  }
          432  +
          433  +  zFlags[2] = (flags & LSM_SYSTEMKEY)    ? '*' : '.';
          434  +  zFlags[3] = (flags & LSM_START_DELETE) ? '[' : '.';
          435  +  zFlags[4] = '\0';
          436  +}
   369    437   
   370    438   #ifdef LSM_DEBUG
   371    439   
   372    440   /*
   373    441   ** Pointer pBlob points to a buffer containing a blob of binary data
   374    442   ** nBlob bytes long. Append the contents of this blob to *pStr, with
   375    443   ** each octet represented by a 2-digit hexadecimal number. For example,
................................................................................
   380    448     int i;
   381    449     lsmStringExtend(pStr, nBlob*2);
   382    450     if( pStr->nAlloc==0 ) return;
   383    451     for(i=0; i<nBlob; i++){
   384    452       u8 c = ((u8*)pBlob)[i];
   385    453       if( c>='a' && c<='z' ){
   386    454         pStr->z[pStr->n++] = c;
   387         -    }else{
          455  +    }else if( c!=0 || nBlob==1 || i!=(nBlob-1) ){
   388    456         pStr->z[pStr->n++] = "0123456789abcdef"[(c>>4)&0xf];
   389    457         pStr->z[pStr->n++] = "0123456789abcdef"[c&0xf];
   390    458       }
   391    459     }
   392    460     pStr->z[pStr->n] = 0;
   393    461   }
   394    462   
................................................................................
   396    464   ** Append nIndent space (0x20) characters to string *pStr.
   397    465   */
   398    466   static void lsmAppendIndent(LsmString *pStr, int nIndent){
   399    467     int i;
   400    468     lsmStringExtend(pStr, nIndent);
   401    469     for(i=0; i<nIndent; i++) lsmStringAppend(pStr, " ", 1);
   402    470   }
          471  +
          472  +static void strAppendFlags(LsmString *pStr, u8 flags){
          473  +  char zFlags[8];
          474  +
          475  +  lsmFlagsToString(flags, zFlags);
          476  +  zFlags[4] = ':';
          477  +
          478  +  lsmStringAppend(pStr, zFlags, 5);
          479  +}
   403    480   
   404    481   void dump_node_contents(
   405    482     lsm_db *pDb,
   406         -  u32 iNode,                      /* Print out hte contents of this node */
   407         -  int nIndent,                    /* Number of spaces indentation */
          483  +  u32 iNode,                      /* Print out the contents of this node */
          484  +  char *zPath,                    /* Path from root to this node */
          485  +  int nPath,                      /* Number of bytes in zPath */
   408    486     int nHeight                     /* Height: (0==leaf) (1==parent-of-leaf) */
   409    487   ){
          488  +  const char *zSpace = "                                           ";
   410    489     int i;
   411    490     int rc = LSM_OK;
   412    491     LsmString s;
   413    492     TreeNode *pNode;
   414    493     TreeBlob b = {0, 0};
   415    494   
   416         -  /* Append the nIndent bytes of space to string s. */
   417         -  lsmStringInit(&s, pDb->pEnv);
   418         -  if( nIndent ) lsmAppendIndent(&s, nIndent);
   419         -
   420    495     pNode = (TreeNode *)treeShmptr(pDb, iNode, &rc);
   421    496   
   422         -  /* Append each key to string s. */
   423         -  for(i=0; i<3; i++){
   424         -    u32 iPtr = pNode->aiKeyPtr[i];
   425         -    if( iPtr ){
   426         -      TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TK_LOADKEY, &b, &rc);
   427         -      lsmAppendStrBlob(&s, TK_KEY(pKey), pKey->nKey);
   428         -      lsmStringAppend(&s, "     ", -1);
   429         -    }
   430         -  }
   431         -
   432         -  printf("%s\n", s.z);
   433         -  lsmStringClear(&s);
   434         -
   435         -  for(i=0; i<4 && nHeight>0; i++){
   436         -    u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);
   437         -    if( iPtr ){
   438         -      dump_node_contents(pDb, iPtr, nIndent + 2, nHeight-1);
          497  +  if( nHeight==0 ){
          498  +    /* Append the nIndent bytes of space to string s. */
          499  +    lsmStringInit(&s, pDb->pEnv);
          500  +
          501  +    /* Append each key to string s. */
          502  +    for(i=0; i<3; i++){
          503  +      u32 iPtr = pNode->aiKeyPtr[i];
          504  +      if( iPtr ){
          505  +        TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TK_LOADKEY, &b,&rc);
          506  +        strAppendFlags(&s, pKey->flags);
          507  +        lsmAppendStrBlob(&s, TK_KEY(pKey), pKey->nKey);
          508  +        lsmStringAppend(&s, "     ", -1);
          509  +      }
          510  +    }
          511  +
          512  +    printf("% 6d %.*sleaf%.*s: %s\n", 
          513  +        iNode, nPath, zPath, 20-nPath-4, zSpace, s.z
          514  +    );
          515  +    lsmStringClear(&s);
          516  +  }else{
          517  +    for(i=0; i<4 && nHeight>0; i++){
          518  +      u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);
          519  +      zPath[nPath] = i+'0';
          520  +      zPath[nPath+1] = '/';
          521  +
          522  +      if( iPtr ){
          523  +        dump_node_contents(pDb, iPtr, zPath, nPath+2, nHeight-1);
          524  +      }
          525  +      if( i!=3 && pNode->aiKeyPtr[i] ){
          526  +        TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TK_LOADKEY, &b,&rc);
          527  +        lsmStringInit(&s, pDb->pEnv);
          528  +        strAppendFlags(&s, pKey->flags);
          529  +        lsmAppendStrBlob(&s, TK_KEY(pKey), pKey->nKey);
          530  +        printf("% 6d %.*s%.*s: %s\n", 
          531  +            iNode, nPath+1, zPath, 20-nPath-1, zSpace, s.z);
          532  +        lsmStringClear(&s);
          533  +      }
   439    534       }
   440    535     }
   441    536   
   442    537     tblobFree(pDb, &b);
   443    538   }
   444    539   
   445    540   void dump_tree_contents(lsm_db *pDb, const char *zCaption){
          541  +  char zPath[64];
   446    542     TreeRoot *p = &pDb->treehdr.root;
   447    543     printf("\n%s\n", zCaption);
          544  +  zPath[0] = '/';
   448    545     if( p->iRoot ){
   449         -    dump_node_contents(pDb, p->iRoot, 0, p->nHeight-1);
          546  +    dump_node_contents(pDb, p->iRoot, zPath, 1, p->nHeight-1);
   450    547     }
   451    548     fflush(stdout);
   452    549   }
   453    550   
   454    551   #endif
   455    552   
   456    553   /*
................................................................................
   469    566   }
   470    567   
   471    568   /*
   472    569   ** Return a pointer to the mapping of the TreeKey object that the cursor
   473    570   ** is pointing to. 
   474    571   */
   475    572   static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){
   476         -  return (TreeKey *)treeShmkey(pCsr->pDb,
          573  +  TreeKey *pRet = (TreeKey *)treeShmkey(pCsr->pDb,
   477    574         pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]], 
   478    575         TK_LOADVAL, pBlob, pRc
   479    576     );
          577  +  assert( pRet==0 || assertFlagsOk(pRet->flags) );
          578  +  return pRet;
   480    579   }
   481    580   
   482    581   /*
   483    582   ** Save the current position of tree cursor pCsr.
   484    583   */
   485    584   int lsmTreeCursorSave(TreeCursor *pCsr){
   486    585     int rc = LSM_OK;
................................................................................
  1074   1173     u32 iShmid;
  1075   1174     ShmChunk *p;
  1076   1175   
  1077   1176     p = treeShmChunkRc(db, db->treehdr.iFirst, &rc);
  1078   1177     iShmid = p->iShmid;
  1079   1178     while( rc==LSM_OK && p ){
  1080   1179       if( p->iNext ){
  1081         -      ShmChunk *pNext = treeShmChunkRc(db, p->iNext, &rc);
  1082         -      if( rc==LSM_OK ){
  1083         -        if( pNext->iShmid!=p->iShmid+1 ){
  1084         -          rc = LSM_CORRUPT_BKPT;
         1180  +      if( p->iNext>=db->treehdr.nChunk ){
         1181  +        rc = LSM_CORRUPT_BKPT;
         1182  +      }else{
         1183  +        ShmChunk *pNext = treeShmChunkRc(db, p->iNext, &rc);
         1184  +        if( rc==LSM_OK ){
         1185  +          if( pNext->iShmid!=p->iShmid+1 ){
         1186  +            rc = LSM_CORRUPT_BKPT;
         1187  +          }
         1188  +          p = pNext;
  1085   1189           }
  1086         -        p = pNext;
  1087   1190         }
  1088   1191       }else{
  1089   1192         p = 0;
  1090   1193       }
  1091   1194       nVisit++;
  1092   1195     }
  1093   1196   
................................................................................
  1163   1266       nSort = 1;
  1164   1267       while( nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
  1165   1268       nByte = sizeof(ShmChunkLoc) * nSort * 2;
  1166   1269       aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);
  1167   1270       iPrevShmid = pMin->iShmid;
  1168   1271   
  1169   1272       /* Fix all shm-ids, if required. */
  1170         -    if( rc==LSM_OK && iMin!=db->treehdr.iFirst ){
         1273  +    if( rc==LSM_OK ){
  1171   1274         iPrevShmid = pMin->iShmid-1;
  1172   1275         for(i=1; i<db->treehdr.nChunk; i++){
  1173   1276           p = treeShmChunk(db, i);
  1174   1277           aSort[i-1].pShm = p;
  1175   1278           aSort[i-1].iLoc = i;
  1176   1279           if( i!=db->treehdr.iFirst ){
  1177   1280             if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
  1178   1281               p->iShmid = iPrevShmid--;
  1179   1282             }
  1180   1283           }
  1181   1284         }
  1182         -      p = treeShmChunk(db, db->treehdr.iFirst);
  1183         -      p->iShmid = iPrevShmid;
         1285  +      if( iMin!=db->treehdr.iFirst ){
         1286  +        p = treeShmChunk(db, db->treehdr.iFirst);
         1287  +        p->iShmid = iPrevShmid;
         1288  +      }
  1184   1289       }
  1185   1290   
  1186   1291       if( rc==LSM_OK ){
  1187   1292         ShmChunkLoc *aSpace = &aSort[nSort];
  1188   1293         for(i=0; i<nSort; i++){
  1189   1294           if( aSort[i].pShm ){
  1190   1295             assert( shm_sequence_ge(aSort[i].pShm->iShmid, iPrevShmid) );
................................................................................
  1242   1347       rc = treeRepairList(db);
  1243   1348     }
  1244   1349   
  1245   1350     memcpy(&db->treehdr, &hdr, sizeof(TreeHeader));
  1246   1351     return rc;
  1247   1352   }
  1248   1353   
  1249         -/*
  1250         -** Insert a new entry into the in-memory tree.
  1251         -**
  1252         -** If the value of the 5th parameter, nVal, is negative, then a delete-marker
  1253         -** is inserted into the tree. In this case the value pointer, pVal, must be
  1254         -** NULL.
  1255         -*/
  1256         -int lsmTreeInsert(
         1354  +static void treeOverwriteKey(lsm_db *db, TreeCursor *pCsr, u32 iKey, int *pRc){
         1355  +  if( *pRc==LSM_OK ){
         1356  +    TreeRoot *p = &db->treehdr.root;
         1357  +    TreeNode *pNew;
         1358  +    u32 iNew;
         1359  +    TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
         1360  +    int iCell = pCsr->aiCell[pCsr->iNode];
         1361  +
         1362  +    /* Create a copy of this node */
         1363  +    if( (pCsr->iNode>0 && pCsr->iNode==(p->nHeight-1)) ){
         1364  +      pNew = copyTreeLeaf(db, (TreeLeaf *)pNode, &iNew, pRc);
         1365  +    }else{
         1366  +      pNew = copyTreeNode(db, pNode, &iNew, pRc);
         1367  +    }
         1368  +
         1369  +    if( pNew ){
         1370  +      /* Modify the value in the new version */
         1371  +      pNew->aiKeyPtr[iCell] = iKey;
         1372  +
         1373  +      /* Change the pointer in the parent (if any) to point at the new 
         1374  +       ** TreeNode */
         1375  +      pCsr->iNode--;
         1376  +      treeUpdatePtr(db, pCsr, iNew);
         1377  +    }
         1378  +  }
         1379  +}
         1380  +
         1381  +static int treeNextIsEndDelete(lsm_db *db, TreeCursor *pCsr){
         1382  +  TreeNode *pNode;
         1383  +  int iNode = pCsr->iNode;
         1384  +  int iCell = pCsr->aiCell[iNode]+1;
         1385  +
         1386  +  /* Cursor currently points to a leaf node. */
         1387  +  assert( pCsr->iNode==(db->treehdr.root.nHeight-1) );
         1388  +
         1389  +  while( iNode>=0 ){
         1390  +    TreeNode *pNode = pCsr->apTreeNode[iNode];
         1391  +    if( iCell<3 && pNode->aiKeyPtr[iCell] ){
         1392  +      int rc = LSM_OK;
         1393  +      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell], &rc);
         1394  +      assert( rc==LSM_OK );
         1395  +      return ((pKey->flags & LSM_END_DELETE) ? 1 : 0);
         1396  +    }
         1397  +    iNode--;
         1398  +    iCell = pCsr->aiCell[iNode];
         1399  +  }
         1400  +
         1401  +  return 0;
         1402  +}
         1403  +
         1404  +static int treePrevIsStartDelete(lsm_db *db, TreeCursor *pCsr){
         1405  +  TreeNode *pNode;
         1406  +  int iNode = pCsr->iNode;
         1407  +
         1408  +  /* Cursor currently points to a leaf node. */
         1409  +  assert( pCsr->iNode==(db->treehdr.root.nHeight-1) );
         1410  +
         1411  +  while( iNode>=0 ){
         1412  +    TreeNode *pNode = pCsr->apTreeNode[iNode];
         1413  +    int iCell = pCsr->aiCell[iNode]-1;
         1414  +    if( iCell>=0 && pNode->aiKeyPtr[iCell] ){
         1415  +      int rc = LSM_OK;
         1416  +      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell], &rc);
         1417  +      assert( rc==LSM_OK );
         1418  +      return ((pKey->flags & LSM_START_DELETE) ? 1 : 0);
         1419  +    }
         1420  +    iNode--;
         1421  +  }
         1422  +
         1423  +  return 0;
         1424  +}
         1425  +
         1426  +
         1427  +static int treeInsertEntry(
  1257   1428     lsm_db *pDb,                    /* Database handle */
         1429  +  int flags,                      /* Flags associated with entry */
  1258   1430     void *pKey,                     /* Pointer to key data */
  1259   1431     int nKey,                       /* Size of key data in bytes */
  1260   1432     void *pVal,                     /* Pointer to value data (or NULL) */
  1261   1433     int nVal                        /* Bytes in value data (or -ve for delete) */
  1262   1434   ){
  1263   1435     int rc = LSM_OK;                /* Return Code */
  1264   1436     TreeKey *pTreeKey;              /* New key-value being inserted */
  1265   1437     u32 iTreeKey;
  1266   1438     TreeRoot *p = &pDb->treehdr.root;
         1439  +  TreeCursor csr;                 /* Cursor to seek to pKey/nKey */
         1440  +  int res;                        /* Result of seek operation on csr */
  1267   1441   
  1268   1442     assert( nVal>=0 || pVal==0 );
  1269   1443     assert_tree_looks_ok(LSM_OK, pTree);
         1444  +  assert( flags==LSM_INSERT       || flags==LSM_POINT_DELETE 
         1445  +       || flags==LSM_START_DELETE || flags==LSM_END_DELETE 
         1446  +  );
  1270   1447   #if 0
  1271   1448     dump_tree_contents(pDb, "before");
  1272   1449   #endif
         1450  +
         1451  +  if( p->iRoot ){
         1452  +    TreeKey *pRes;                /* Key at end of seek operation */
         1453  +    treeCursorInit(pDb, 0, &csr);
         1454  +
         1455  +    /* Seek to the leaf (or internal node) that the new key belongs on */
         1456  +    rc = lsmTreeCursorSeek(&csr, pKey, nKey, &res);
         1457  +    pRes = csrGetKey(&csr, &csr.blob, &rc);
         1458  +    if( rc!=LSM_OK ) return rc;
         1459  +
         1460  +    if( flags==LSM_START_DELETE ){
         1461  +      /* When inserting a start-delete-range entry, if the key that
         1462  +      ** occurs immediately before the new entry is already a START_DELETE,
         1463  +      ** then the new entry is not required.  */
         1464  +      if( (res<=0 && (pRes->flags & LSM_START_DELETE))
         1465  +       || (res>0  && treePrevIsStartDelete(pDb, &csr))
         1466  +      ){ 
         1467  +        goto insert_entry_out;
         1468  +      }
         1469  +    }else if( flags==LSM_END_DELETE ){
         1470  +      /* When inserting an start-delete-range entry, if the key that
         1471  +      ** occurs immediately after the new entry is already an END_DELETE,
         1472  +      ** then the new entry is not required.  */
         1473  +      if( (res<0  && treeNextIsEndDelete(pDb, &csr))
         1474  +       || (res>=0 && (pRes->flags & LSM_END_DELETE))
         1475  +      ){
         1476  +        goto insert_entry_out;
         1477  +      }
         1478  +    }
         1479  +
         1480  +    if( res==0 && (flags & (LSM_END_DELETE|LSM_START_DELETE)) ){
         1481  +      if( pRes->flags & LSM_INSERT ){
         1482  +        nVal = pRes->nValue;
         1483  +        pVal = TK_VAL(pRes);
         1484  +      }
         1485  +      flags = flags | pRes->flags;
         1486  +    }
         1487  +
         1488  +    if( flags & (LSM_INSERT|LSM_POINT_DELETE) ){
         1489  +      if( (res<0 && (pRes->flags & LSM_START_DELETE))
         1490  +       || (res>0 && (pRes->flags & LSM_END_DELETE)) 
         1491  +      ){
         1492  +        flags = flags | (LSM_END_DELETE|LSM_START_DELETE);
         1493  +      }else if( res==0 ){
         1494  +        flags = flags | (pRes->flags & (LSM_END_DELETE|LSM_START_DELETE));
         1495  +      }
         1496  +    }
         1497  +  }else{
         1498  +    memset(&csr, 0, sizeof(TreeCursor));
         1499  +  }
  1273   1500   
  1274   1501     /* Allocate and populate a new key-value pair structure */
  1275   1502     pTreeKey = newTreeKey(pDb, &iTreeKey, pKey, nKey, pVal, nVal, &rc);
  1276   1503     if( rc!=LSM_OK ) return rc;
         1504  +  pTreeKey->flags = flags;
  1277   1505   
  1278   1506     if( p->iRoot==0 ){
  1279   1507       /* The tree is completely empty. Add a new root node and install
  1280   1508       ** (pKey/nKey) as the middle entry. Even though it is a leaf at the
  1281   1509       ** moment, use newTreeNode() to allocate the node (i.e. allocate enough
  1282   1510       ** space for the fields used by interior nodes). This is because the
  1283   1511       ** treeInsert() routine may convert this node to an interior node. */
................................................................................
  1284   1512       TreeNode *pRoot = newTreeNode(pDb, &p->iRoot, &rc);
  1285   1513       if( rc==LSM_OK ){
  1286   1514         assert( p->nHeight==0 );
  1287   1515         pRoot->aiKeyPtr[1] = iTreeKey;
  1288   1516         p->nHeight = 1;
  1289   1517       }
  1290   1518     }else{
  1291         -    TreeCursor csr;
  1292         -    int res;
  1293         -
  1294         -    /* Seek to the leaf (or internal node) that the new key belongs on */
  1295         -    treeCursorInit(pDb, 0, &csr);
  1296         -    lsmTreeCursorSeek(&csr, pKey, nKey, &res);
  1297         -
  1298   1519       if( res==0 ){
  1299   1520         /* The search found a match within the tree. */
  1300         -      TreeNode *pNew;
  1301         -      u32 iNew;
  1302         -      TreeNode *pNode = csr.apTreeNode[csr.iNode];
  1303         -      int iCell = csr.aiCell[csr.iNode];
  1304         -
  1305         -      /* Create a copy of this node */
  1306         -      if( (csr.iNode>0 && csr.iNode==(p->nHeight-1)) ){
  1307         -        pNew = copyTreeLeaf(pDb, (TreeLeaf *)pNode, &iNew, &rc);
  1308         -      }else{
  1309         -        pNew = copyTreeNode(pDb, pNode, &iNew, &rc);
  1310         -      }
  1311         -
  1312         -      if( rc==LSM_OK ){
  1313         -        /* Modify the value in the new version */
  1314         -        pNew->aiKeyPtr[iCell] = iTreeKey;
  1315         -
  1316         -        /* Change the pointer in the parent (if any) to point at the new 
  1317         -        ** TreeNode */
  1318         -        csr.iNode--;
  1319         -        treeUpdatePtr(pDb, &csr, iNew);
  1320         -      }
         1521  +      treeOverwriteKey(pDb, &csr, iTreeKey, &rc);
  1321   1522       }else{
  1322   1523         /* The cursor now points to the leaf node into which the new entry should
  1323   1524         ** be inserted. There may or may not be a free slot within the leaf for
  1324   1525         ** the new key-value pair. 
  1325   1526         **
  1326   1527         ** iSlot is set to the index of the key within pLeaf that the new key
  1327   1528         ** should be inserted to the left of (or to a value 1 greater than the
................................................................................
  1331   1532         int iSlot = csr.aiCell[csr.iNode] + (res<0);
  1332   1533         if( csr.iNode==0 ){
  1333   1534           rc = treeInsert(pDb, &csr, 0, iTreeKey, 0, iSlot);
  1334   1535         }else{
  1335   1536           rc = treeInsertLeaf(pDb, &csr, iTreeKey, iSlot);
  1336   1537         }
  1337   1538       }
  1338         -    tblobFree(pDb, &csr.blob);
  1339   1539     }
  1340   1540   
  1341   1541   #if 0
  1342   1542     dump_tree_contents(pDb, "after");
  1343   1543   #endif
         1544  + insert_entry_out:
         1545  +  tblobFree(pDb, &csr.blob);
  1344   1546     assert_tree_looks_ok(rc, pTree);
  1345   1547     return rc;
  1346   1548   }
         1549  +
         1550  +/*
         1551  +** Insert a new entry into the in-memory tree.
         1552  +**
         1553  +** If the value of the 5th parameter, nVal, is negative, then a delete-marker
         1554  +** is inserted into the tree. In this case the value pointer, pVal, must be
         1555  +** NULL.
         1556  +*/
         1557  +int lsmTreeInsert(
         1558  +  lsm_db *pDb,                    /* Database handle */
         1559  +  void *pKey,                     /* Pointer to key data */
         1560  +  int nKey,                       /* Size of key data in bytes */
         1561  +  void *pVal,                     /* Pointer to value data (or NULL) */
         1562  +  int nVal                        /* Bytes in value data (or -ve for delete) */
         1563  +){
         1564  +  int flags;
         1565  +  if( nVal<0 ){
         1566  +    flags = LSM_POINT_DELETE;
         1567  +  }else{
         1568  +    flags = LSM_INSERT;
         1569  +  }
         1570  +
         1571  +  return treeInsertEntry(pDb, flags, pKey, nKey, pVal, nVal);
         1572  +}
         1573  +
         1574  +static int treeDeleteEntry(lsm_db *db, TreeCursor *pCsr, u32 iNewptr){
         1575  +  TreeRoot *p = &db->treehdr.root;
         1576  +  TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
         1577  +  int iSlot = pCsr->aiCell[pCsr->iNode];
         1578  +  int bLeaf;
         1579  +  int rc = LSM_OK;
         1580  +
         1581  +  assert( pNode->aiKeyPtr[1] );
         1582  +  assert( pNode->aiKeyPtr[iSlot] );
         1583  +  assert( iSlot==0 || iSlot==1 || iSlot==2 );
         1584  +  assert( (pCsr->iNode==(db->treehdr.root.nHeight-1))==(iNewptr==0) );
         1585  +
         1586  +  bLeaf = (pCsr->iNode==(p->nHeight-1) && p->nHeight>1);
         1587  +  
         1588  +  if( pNode->aiKeyPtr[0] || pNode->aiKeyPtr[2] ){
         1589  +    /* There are currently at least 2 keys on this node. So just create
         1590  +    ** a new copy of the node with one of the keys removed. If the node
         1591  +    ** happens to be the root node of the tree, allocate an entire 
         1592  +    ** TreeNode structure instead of just a TreeLeaf.  */
         1593  +    TreeNode *pNew;
         1594  +    u32 iNew;
         1595  +
         1596  +    if( bLeaf ){
         1597  +      pNew = (TreeNode *)newTreeLeaf(db, &iNew, &rc);
         1598  +    }else{
         1599  +      pNew = newTreeNode(db, &iNew, &rc);
         1600  +    }
         1601  +    if( pNew ){
         1602  +      int i;
         1603  +      int iOut = 1;
         1604  +      for(i=0; i<4; i++){
         1605  +        if( i==iSlot ){
         1606  +          i++;
         1607  +          if( bLeaf==0 ) pNew->aiChildPtr[iOut] = iNewptr;
         1608  +          if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
         1609  +          iOut++;
         1610  +        }else if( bLeaf || p->nHeight==1 ){
         1611  +          if( i<3 && pNode->aiKeyPtr[i] ){
         1612  +            pNew->aiKeyPtr[iOut++] = pNode->aiKeyPtr[i];
         1613  +          }
         1614  +        }else{
         1615  +          if( getChildPtr(pNode, WORKING_VERSION, i) ){
         1616  +            pNew->aiChildPtr[iOut] = getChildPtr(pNode, WORKING_VERSION, i);
         1617  +            if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
         1618  +            iOut++;
         1619  +          }
         1620  +        }
         1621  +      }
         1622  +      assert( iOut<=4 );
         1623  +      assert( bLeaf || pNew->aiChildPtr[0]==0 );
         1624  +      pCsr->iNode--;
         1625  +      rc = treeUpdatePtr(db, pCsr, iNew);
         1626  +    }
         1627  +
         1628  +  }else if( pCsr->iNode==0 ){
         1629  +    /* Removing the only key in the root node. iNewptr is the new root. */
         1630  +    assert( iSlot==1 );
         1631  +    db->treehdr.root.iRoot = iNewptr;
         1632  +    db->treehdr.root.nHeight--;
         1633  +
         1634  +  }else{
         1635  +    /* There is only one key on this node and the node is not the root
         1636  +    ** node. Find a peer for this node. Then redistribute the contents of
         1637  +    ** the peer and the parent cell between the parent and either one or
         1638  +    ** two new nodes.  */
         1639  +    TreeNode *pParent;            /* Parent tree node */
         1640  +    int iPSlot;
         1641  +    u32 iPeer;                    /* Pointer to peer leaf node */
         1642  +    int iDir;
         1643  +    TreeNode *pPeer;              /* The peer leaf node */
         1644  +    TreeNode *pNew1; u32 iNew1;   /* First new leaf node */
         1645  +
         1646  +    assert( iSlot==1 );
         1647  +
         1648  +    pParent = pCsr->apTreeNode[pCsr->iNode-1];
         1649  +    iPSlot = pCsr->aiCell[pCsr->iNode-1];
         1650  +
         1651  +    if( iPSlot>0 && getChildPtr(pParent, WORKING_VERSION, iPSlot-1) ){
         1652  +      iDir = -1;
         1653  +    }else{
         1654  +      iDir = +1;
         1655  +    }
         1656  +    iPeer = getChildPtr(pParent, WORKING_VERSION, iPSlot+iDir);
         1657  +    pPeer = (TreeNode *)treeShmptr(db, iPeer, &rc);
         1658  +    assertIsWorkingChild(db, pNode, pParent, iPSlot);
         1659  +
         1660  +    /* Allocate the first new leaf node. This is always required. */
         1661  +    if( bLeaf ){
         1662  +      pNew1 = (TreeNode *)newTreeLeaf(db, &iNew1, &rc);
         1663  +    }else{
         1664  +      pNew1 = (TreeNode *)newTreeNode(db, &iNew1, &rc);
         1665  +    }
         1666  +
         1667  +    if( pPeer->aiKeyPtr[0] && pPeer->aiKeyPtr[2] ){
         1668  +      /* Peer node is completely full. This means that two new leaf nodes
         1669  +      ** and a new parent node are required. */
         1670  +
         1671  +      TreeNode *pNew2; u32 iNew2; /* Second new leaf node */
         1672  +      TreeNode *pNewP; u32 iNewP; /* New parent node */
         1673  +
         1674  +      if( bLeaf ){
         1675  +        pNew2 = (TreeNode *)newTreeLeaf(db, &iNew2, &rc);
         1676  +      }else{
         1677  +        pNew2 = (TreeNode *)newTreeNode(db, &iNew2, &rc);
         1678  +      }
         1679  +      pNewP = copyTreeNode(db, pParent, &iNewP, &rc);
         1680  +
         1681  +      if( iDir==-1 ){
         1682  +        pNew1->aiKeyPtr[1] = pPeer->aiKeyPtr[0];
         1683  +        if( bLeaf==0 ){
         1684  +          pNew1->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 0);
         1685  +          pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 1);
         1686  +        }
         1687  +
         1688  +        pNewP->aiChildPtr[iPSlot-1] = iNew1;
         1689  +        pNewP->aiKeyPtr[iPSlot-1] = pPeer->aiKeyPtr[1];
         1690  +        pNewP->aiChildPtr[iPSlot] = iNew2;
         1691  +
         1692  +        pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[2];
         1693  +        pNew2->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot-1];
         1694  +        if( bLeaf==0 ){
         1695  +          pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 2);
         1696  +          pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 3);
         1697  +          pNew2->aiChildPtr[2] = iNewptr;
         1698  +        }
         1699  +      }else{
         1700  +        pNew1->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot];
         1701  +        if( bLeaf==0 ){
         1702  +          pNew1->aiChildPtr[1] = iNewptr;
         1703  +          pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 0);
         1704  +        }
         1705  +
         1706  +        pNewP->aiChildPtr[iPSlot] = iNew1;
         1707  +        pNewP->aiKeyPtr[iPSlot] = pPeer->aiKeyPtr[0];
         1708  +        pNewP->aiChildPtr[iPSlot+1] = iNew2;
         1709  +
         1710  +        pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[1];
         1711  +        pNew2->aiKeyPtr[1] = pPeer->aiKeyPtr[2];
         1712  +        if( bLeaf==0 ){
         1713  +          pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 1);
         1714  +          pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 2);
         1715  +          pNew2->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 3);
         1716  +        }
         1717  +      }
         1718  +      assert( pCsr->iNode>=1 );
         1719  +      pCsr->iNode -= 2;
         1720  +      if( rc==LSM_OK ){
         1721  +        assert( pNew1->aiKeyPtr[1] && pNew2->aiKeyPtr[1] );
         1722  +        rc = treeUpdatePtr(db, pCsr, iNewP);
         1723  +      }
         1724  +    }else{
         1725  +      int iKOut = 0;
         1726  +      int iPOut = 0;
         1727  +      int i;
         1728  +
         1729  +      pCsr->iNode--;
         1730  +
         1731  +      if( iDir==1 ){
         1732  +        pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
         1733  +        if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
         1734  +      }
         1735  +      for(i=0; i<3; i++){
         1736  +        if( pPeer->aiKeyPtr[i] ){
         1737  +          pNew1->aiKeyPtr[iKOut++] = pPeer->aiKeyPtr[i];
         1738  +        }
         1739  +      }
         1740  +      if( bLeaf==0 ){
         1741  +        for(i=0; i<4; i++){
         1742  +          if( getChildPtr(pPeer, WORKING_VERSION, i) ){
         1743  +            pNew1->aiChildPtr[iPOut++] = getChildPtr(pPeer, WORKING_VERSION, i);
         1744  +          }
         1745  +        }
         1746  +      }
         1747  +      if( iDir==-1 ){
         1748  +        iPSlot--;
         1749  +        pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
         1750  +        if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
         1751  +        pCsr->aiCell[pCsr->iNode] = iPSlot;
         1752  +      }
         1753  +
         1754  +      rc = treeDeleteEntry(db, pCsr, iNew1);
         1755  +    }
         1756  +  }
         1757  +
         1758  +  return rc;
         1759  +}
         1760  +
         1761  +/*
         1762  +** Delete a range of keys from the tree structure (i.e. the lsm_delete_range()
         1763  +** function, not lsm_delete()).
         1764  +**
         1765  +** This is a two step process: 
         1766  +**
         1767  +**     1) Remove all entries currently stored in the tree that have keys
         1768  +**        that fall into the deleted range.
         1769  +**
         1770  +**        TODO: There are surely good ways to optimize this step - removing 
         1771  +**        a range of keys from a b-tree. But for now, this function removes
         1772  +**        them one at a time using the usual approach.
         1773  +**
         1774  +**     2) Unless the largest key smaller than or equal to (pKey1/nKey1) is
         1775  +**        already marked as START_DELETE, insert a START_DELETE key. 
         1776  +**        Similarly, unless the smallest key greater than or equal to
         1777  +**        (pKey2/nKey2) is already START_END, insert a START_END key.
         1778  +*/
         1779  +int lsmTreeDelete(
         1780  +  lsm_db *db,
         1781  +  void *pKey1, int nKey1,         /* Start of range */
         1782  +  void *pKey2, int nKey2          /* End of range */
         1783  +){
         1784  +  int rc = LSM_OK;
         1785  +  int bDone = 0;
         1786  +  TreeRoot *p = &db->treehdr.root;
         1787  +  TreeBlob blob = {0, 0};
         1788  +
         1789  +  /* The range must be sensible - that (key1 < key2). */
         1790  +  assert( db->xCmp(pKey1, nKey1, pKey2, nKey2)<0 );
         1791  +  assert( assert_delete_ranges_match(db) );
         1792  +
         1793  +#if 0
         1794  +  static int nCall = 0;
         1795  +  printf("\n");
         1796  +  nCall++;
         1797  +  printf("%d delete %s .. %s\n", nCall, (char *)pKey1, (char *)pKey2);
         1798  +  dump_tree_contents(db, "before delete");
         1799  +#endif
         1800  +
         1801  +  /* Step 1. This loop runs until the tree contains no keys within the
         1802  +  ** range being deleted. Or until an error occurs. */
         1803  +  while( bDone==0 && rc==LSM_OK ){
         1804  +    int res;
         1805  +    TreeCursor csr;               /* Cursor to seek to first key in range */
         1806  +    void *pDel; int nDel;         /* Key to (possibly) delete this iteration */
         1807  +#ifndef NDEBUG
         1808  +    int nEntry = treeCountEntries(db);
         1809  +#endif
         1810  +
         1811  +    /* Seek the cursor to the first entry in the tree greater than pKey1. */
         1812  +    treeCursorInit(db, 0, &csr);
         1813  +    lsmTreeCursorSeek(&csr, pKey1, nKey1, &res);
         1814  +    if( res<=0 && lsmTreeCursorValid(&csr) ) lsmTreeCursorNext(&csr);
         1815  +
         1816  +    /* If there is no such entry, or if it is greater than pKey2, then the
         1817  +    ** tree now contains no keys in the range being deleted. In this case
         1818  +    ** break out of the loop.  */
         1819  +    bDone = 1;
         1820  +    if( lsmTreeCursorValid(&csr) ){
         1821  +      lsmTreeCursorKey(&csr, 0, &pDel, &nDel);
         1822  +      if( db->xCmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0;
         1823  +    }
         1824  +
         1825  +    if( bDone==0 ){
         1826  +      if( csr.iNode==(p->nHeight-1) ){
         1827  +        /* The element to delete already lies on a leaf node */
         1828  +        rc = treeDeleteEntry(db, &csr, 0);
         1829  +      }else{
         1830  +        /* 1. Overwrite the current key with a copy of the next key in the 
         1831  +        **    tree (key N).
         1832  +        **
         1833  +        ** 2. Seek to key N (cursor will stop at the internal node copy of
         1834  +        **    N). Move to the next key (original copy of N). Delete
         1835  +        **    this entry. 
         1836  +        */
         1837  +        u32 iKey;
         1838  +        TreeKey *pKey;
         1839  +        int iNode = csr.iNode;
         1840  +        lsmTreeCursorNext(&csr);
         1841  +        assert( csr.iNode==(p->nHeight-1) );
         1842  +
         1843  +        iKey = csr.apTreeNode[csr.iNode]->aiKeyPtr[csr.aiCell[csr.iNode]];
         1844  +        lsmTreeCursorPrev(&csr);
         1845  +
         1846  +        treeOverwriteKey(db, &csr, iKey, &rc);
         1847  +        pKey = treeShmkey(db, iKey, TK_LOADKEY, &blob, &rc);
         1848  +        if( pKey ){
         1849  +          rc = lsmTreeCursorSeek(&csr, TK_KEY(pKey), pKey->nKey, &res);
         1850  +        }
         1851  +        if( rc==LSM_OK ){
         1852  +          assert( res==0 && csr.iNode==iNode );
         1853  +          rc = lsmTreeCursorNext(&csr);
         1854  +          if( rc==LSM_OK ){
         1855  +            rc = treeDeleteEntry(db, &csr, 0);
         1856  +          }
         1857  +        }
         1858  +      }
         1859  +    }
         1860  +
         1861  +    /* Clean up any memory allocated by the cursor. */
         1862  +    tblobFree(db, &csr.blob);
         1863  +#if 0
         1864  +    dump_tree_contents(db, "ddd delete");
         1865  +#endif
         1866  +    assert( bDone || treeCountEntries(db)==(nEntry-1) );
         1867  +  }
         1868  +
         1869  +#if 0
         1870  +  dump_tree_contents(db, "during delete");
         1871  +#endif
         1872  +
         1873  +  /* Now insert the START_DELETE and END_DELETE keys. */
         1874  +  if( rc==LSM_OK ){
         1875  +    rc = treeInsertEntry(db, LSM_START_DELETE, pKey1, nKey1, 0, -1);
         1876  +  }
         1877  +#if 0
         1878  +  dump_tree_contents(db, "during delete 2");
         1879  +#endif
         1880  +  if( rc==LSM_OK ){
         1881  +    rc = treeInsertEntry(db, LSM_END_DELETE, pKey2, nKey2, 0, -1);
         1882  +  }
         1883  +
         1884  +#if 0
         1885  +  dump_tree_contents(db, "after delete");
         1886  +#endif
         1887  +
         1888  +  tblobFree(db, &blob);
         1889  +  assert( assert_delete_ranges_match(db) );
         1890  +  return rc;
         1891  +}
  1347   1892   
  1348   1893   /*
  1349   1894   ** Return, in bytes, the amount of memory currently used by the tree 
  1350   1895   ** structure.
  1351   1896   */
  1352   1897   int lsmTreeSize(lsm_db *pDb){
  1353   1898     return pDb->treehdr.nByte;
................................................................................
  1547   2092         if( iCell<3 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
  1548   2093       }
  1549   2094     }
  1550   2095   
  1551   2096   #ifndef NDEBUG
  1552   2097     if( pCsr->iNode>=0 ){
  1553   2098       TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
  1554         -    assert( rc || pDb->xCmp(TK_KEY(pK2), pK2->nKey, TK_KEY(pK1), pK1->nKey)>0 );
         2099  +    assert( rc || pDb->xCmp(TK_KEY(pK2),pK2->nKey,TK_KEY(pK1),pK1->nKey)>=0 );
  1555   2100     }
  1556   2101     tblobFree(pDb, &key1);
  1557   2102   #endif
  1558   2103   
  1559   2104     return rc;
  1560   2105   }
  1561   2106   
................................................................................
  1629   2174   
  1630   2175   /*
  1631   2176   ** Move the cursor to the first (bLast==0) or last (bLast!=0) entry in the
  1632   2177   ** in-memory tree.
  1633   2178   */
  1634   2179   int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast){
  1635   2180     lsm_db *pDb = pCsr->pDb;
  1636         -  TreeHeader *pHdr = &pDb->treehdr;
  1637   2181     TreeRoot *pRoot = pCsr->pRoot;
  1638   2182     int rc = LSM_OK;
  1639   2183   
  1640   2184     u32 iNodePtr;
  1641   2185     pCsr->iNode = -1;
  1642   2186   
  1643   2187     /* Discard any saved position data */
................................................................................
  1666   2210       }
  1667   2211       pCsr->aiCell[pCsr->iNode] = iCell - (iNodePtr==0 && bLast);
  1668   2212     }
  1669   2213   
  1670   2214     return rc;
  1671   2215   }
  1672   2216   
  1673         -int lsmTreeCursorKey(TreeCursor *pCsr, void **ppKey, int *pnKey){
         2217  +int lsmTreeCursorFlags(TreeCursor *pCsr){
         2218  +  int flags = 0;
         2219  +  if( pCsr && pCsr->iNode>=0 ){
         2220  +    int rc = LSM_OK;
         2221  +    TreeKey *pKey = (TreeKey *)treeShmptr(pCsr->pDb,
         2222  +        pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]], &rc
         2223  +    );
         2224  +    assert( rc==LSM_OK );
         2225  +    flags = pKey->flags;
         2226  +  }
         2227  +  return flags;
         2228  +}
         2229  +
         2230  +int lsmTreeCursorKey(TreeCursor *pCsr, int *pFlags, void **ppKey, int *pnKey){
  1674   2231     TreeKey *pTreeKey;
  1675   2232     int rc = LSM_OK;
  1676   2233   
  1677   2234     assert( lsmTreeCursorValid(pCsr) );
  1678   2235   
  1679   2236     pTreeKey = pCsr->pSave;
  1680   2237     if( !pTreeKey ){
  1681   2238       pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
  1682   2239     }
  1683   2240     if( rc==LSM_OK ){
  1684   2241       *pnKey = pTreeKey->nKey;
         2242  +    if( pFlags ) *pFlags = pTreeKey->flags;
  1685   2243       *ppKey = (void *)&pTreeKey[1];
  1686   2244     }
  1687   2245   
  1688   2246     return rc;
  1689   2247   }
  1690   2248   
  1691   2249   int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal){
................................................................................
  1692   2250     int res = 0;
  1693   2251     int rc;
  1694   2252   
  1695   2253     rc = treeCursorRestore(pCsr, &res);
  1696   2254     if( res==0 ){
  1697   2255       TreeKey *pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
  1698   2256       if( rc==LSM_OK ){
  1699         -      *pnVal = pTreeKey->nValue;
  1700         -      if( pTreeKey->nValue>=0 ){
         2257  +      if( pTreeKey->flags & LSM_INSERT ){
         2258  +        *pnVal = pTreeKey->nValue;
  1701   2259           *ppVal = TK_VAL(pTreeKey);
  1702   2260         }else{
  1703   2261           *ppVal = 0;
         2262  +        *pnVal = -1;
  1704   2263         }
  1705   2264       }
  1706   2265     }else{
  1707   2266       *ppVal = 0;
  1708   2267       *pnVal = 0;
  1709   2268     }
  1710   2269   
................................................................................
  1838   2397       memcpy(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader));
  1839   2398     }
  1840   2399     pShm->bWriter = 0;
  1841   2400     intArrayFree(pDb->pEnv, &pDb->rollback);
  1842   2401   
  1843   2402     return LSM_OK;
  1844   2403   }
         2404  +
         2405  +#ifndef NDEBUG
         2406  +static int assert_delete_ranges_match(lsm_db *db){
         2407  +  int prev = 0;
         2408  +  TreeBlob blob = {0, 0};
         2409  +  TreeCursor csr;               /* Cursor used to iterate through tree */
         2410  +  int rc;
         2411  +
         2412  +  treeCursorInit(db, 0, &csr);
         2413  +  for( rc = lsmTreeCursorEnd(&csr, 0);
         2414  +       rc==LSM_OK && lsmTreeCursorValid(&csr);
         2415  +       rc = lsmTreeCursorNext(&csr)
         2416  +  ){
         2417  +    TreeKey *pKey = csrGetKey(&csr, &blob, &rc);
         2418  +    if( rc!=LSM_OK ) break;
         2419  +    assert( ((prev&LSM_START_DELETE)==0)==((pKey->flags&LSM_END_DELETE)==0) );
         2420  +    prev = pKey->flags;
         2421  +  }
         2422  +
         2423  +  tblobFree(csr.pDb, &csr.blob);
         2424  +
         2425  +  return 1;
         2426  +}
         2427  +
         2428  +static int treeCountEntries(lsm_db *db){
         2429  +  TreeCursor csr;               /* Cursor used to iterate through tree */
         2430  +  int rc;
         2431  +  int nEntry = 0;
         2432  +
         2433  +  treeCursorInit(db, 0, &csr);
         2434  +  for( rc = lsmTreeCursorEnd(&csr, 0);
         2435  +       rc==LSM_OK && lsmTreeCursorValid(&csr);
         2436  +       rc = lsmTreeCursorNext(&csr)
         2437  +  ){
         2438  +    nEntry++;
         2439  +  }
         2440  +
         2441  +  tblobFree(csr.pDb, &csr.blob);
         2442  +
         2443  +  return nEntry;
         2444  +}
         2445  +#endif
         2446  +
  1845   2447