SQLite4
Check-in [a7de625f13]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Merge range-delete branch back into trunk.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: a7de625f13d7a02d7a55e2e50210c949ada37a4d
User & Date: dan 2012-10-15 19:36:53
Context
2012-10-17
19:29
Fix various issues with tcl test cases. check-in: ae7d9c68ef user: dan tags: trunk
2012-10-16
15:26
Change page numbers to 8-byte numbers (from 4). This is required to support compressed databases, where a page number is a byte offset in the database file. check-in: 5d266a717d user: dan tags: compression-hooks
2012-10-15
19:36
Merge range-delete branch back into trunk. check-in: a7de625f13 user: dan tags: trunk
19:34
Fix a case in live-recovery from a writer crash. Leaf check-in: 80abdbea2d user: dan tags: range-delete
2012-10-03
09:24
Minor changes to the lsmperf.tcl script. check-in: 45e59053e7 user: dan tags: trunk
Changes
Hide Diffs Unified Diffs Show Whitespace Changes Patch

Changes to lsm-test/lsmtest.h.

42
43
44
45
46
47
48

49
50
51
52
53
54
55
...
117
118
119
120
121
122
123

124
125

126
127
128
129
130
131
132
...
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
  DatabaseMethods const *pMethods;          /* Database methods */
  const char *zLibrary;                     /* Library name for tdb_open() */
};
struct DatabaseMethods {
  int (*xClose)(TestDb *);
  int (*xWrite)(TestDb *, void *, int , void *, int);
  int (*xDelete)(TestDb *, void *, int);

  int (*xFetch)(TestDb *, void *, int, void **, int *);
  int (*xScan)(TestDb *, void *, int, void *, int, void *, int,
    void (*)(void *, void *, int , void *, int)
  );
  int (*xBegin)(TestDb *, int);
  int (*xCommit)(TestDb *, int);
  int (*xRollback)(TestDb *, int);
................................................................................
void test_failed(void);

char *testMallocPrintf(const char *zFormat, ...);
char *testMallocVPrintf(const char *zFormat, va_list ap);
int testGlobMatch(const char *zPattern, const char *zStr);

void testScanCompare(TestDb *, TestDb *, int, void *, int, void *, int, int *);


void *testMalloc(int);

void *testRealloc(void *, int);
void testFree(void *);

/* testio.c */
int testVfsConfigureDb(TestDb *pDb);

/* testfunc.c */
................................................................................
void testWriteDatasource(TestDb *, Datasource *, int, int *);
void testWriteDatasourceRange(TestDb *, Datasource *, int, int, int *);
void testDeleteDatasource(TestDb *, Datasource *, int, int *);
void testDeleteDatasourceRange(TestDb *, Datasource *, int, int, int *);


/* test1.c */
void test_data_1(TestDb *pDb, int *pRc);
void test_data_2(TestDb *pDb, int *pRc);
void test_data_3(const char *, const char *, int *pRc);
void testDbContents(TestDb *, Datasource *, int, int, int, int, int, int *);
void testCaseProgress(int, int, int, int *);
int testCaseNDot(void);

typedef struct CksumDb CksumDb;
CksumDb *testCksumArrayNew(Datasource *, int, int, int);
char *testCksumArrayGet(CksumDb *, int);







>







 







>


>







 







|
<
|







42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
...
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
...
193
194
195
196
197
198
199
200

201
202
203
204
205
206
207
208
  DatabaseMethods const *pMethods;          /* Database methods */
  const char *zLibrary;                     /* Library name for tdb_open() */
};
struct DatabaseMethods {
  int (*xClose)(TestDb *);
  int (*xWrite)(TestDb *, void *, int , void *, int);
  int (*xDelete)(TestDb *, void *, int);
  int (*xDeleteRange)(TestDb *, void *, int, void *, int);
  int (*xFetch)(TestDb *, void *, int, void **, int *);
  int (*xScan)(TestDb *, void *, int, void *, int, void *, int,
    void (*)(void *, void *, int , void *, int)
  );
  int (*xBegin)(TestDb *, int);
  int (*xCommit)(TestDb *, int);
  int (*xRollback)(TestDb *, int);
................................................................................
void test_failed(void);

char *testMallocPrintf(const char *zFormat, ...);
char *testMallocVPrintf(const char *zFormat, va_list ap);
int testGlobMatch(const char *zPattern, const char *zStr);

void testScanCompare(TestDb *, TestDb *, int, void *, int, void *, int, int *);
void testFetchCompare(TestDb *, TestDb *, void *, int, int *);

void *testMalloc(int);
void *testMallocCopy(void *pCopy, int nByte);
void *testRealloc(void *, int);
void testFree(void *);

/* testio.c */
int testVfsConfigureDb(TestDb *pDb);

/* testfunc.c */
................................................................................
void testWriteDatasource(TestDb *, Datasource *, int, int *);
void testWriteDatasourceRange(TestDb *, Datasource *, int, int, int *);
void testDeleteDatasource(TestDb *, Datasource *, int, int *);
void testDeleteDatasourceRange(TestDb *, Datasource *, int, int, int *);


/* test1.c */
void test_data_1(const char *, const char *, int *pRc);

void test_data_2(const char *, const char *, int *pRc);
void testDbContents(TestDb *, Datasource *, int, int, int, int, int, int *);
void testCaseProgress(int, int, int, int *);
int testCaseNDot(void);

typedef struct CksumDb CksumDb;
CksumDb *testCksumArrayNew(Datasource *, int, int, int);
char *testCksumArrayGet(CksumDb *, int);

Changes to lsm-test/lsmtest1.c.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47









































48
49
50
51
52
53
54
55
...
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
...
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
...
302
303
304
305
306
307
308
309
310
311
312
313












































































































































#include "lsmtest.h"

#define DATA_SEQUENTIAL TEST_DATASOURCE_SEQUENCE
#define DATA_RANDOM     TEST_DATASOURCE_RANDOM

typedef struct Datatest Datatest;
typedef struct MinMax MinMax;

struct MinMax {
  int nMin;
  int nMax;
};

/*
** An instance of the following structure contains parameters used to
** customize the test function in this file. Test procedure:
**
**   1. Create a data-source based on the "datasource definition" vars.
**
**   2. Insert nRow key value pairs into the database.
**
**   3. Delete all keys from the database. Deletes are done in the same 
**      order as the inserts.
**
** During steps 2 and 3 above, after each Datatest.nVerify inserts or
** deletes, the following:
**
**   a. Run Datasource.nTest key lookups and check the results are as expected.
**
**   b. If Datasource.bTestScan is true, run a handful (8) of range
**      queries (scanning forwards and backwards). Check that the results
**      are as expected.
**
**   c. Close and reopen the database. Then run (a) and (b) again.
*/
struct Datatest {
  /* Datasource definition */
  DatasourceDefn defn;

  /* Test procedure parameters */
  int nRow;                       /* Number of rows to insert then delete */
  int nVerify;                    /* How often to verify the db contents */
  int nTest;                      /* Number of keys to test (0==all) */
  int bTestScan;                  /* True to do scan tests */
};










































static char *getName(const char *zSystem, Datatest *pTest){
  char *zRet;
  char *zData;
  zData = testDatasourceName(&pTest->defn);
  zRet = testMallocPrintf("data.%s.%s.%d.%d", 
      zSystem, zData, pTest->nRow, pTest->nVerify
  );
  testFree(zData);
................................................................................
    void *pCtx, void *pKey, int nKey, void *pVal, int nVal
){
  printf("%s\n", (char *)pKey);
  fflush(stdout);
}
#endif

static void doDataTest(
  const char *zSystem,            /* Database system to test */
  Datatest *p,                    /* Structure containing test parameters */
  int *pRc                        /* OUT: Error code */
){
  int i;
  int iDot;
  int rc = LSM_OK;
  Datasource *pData;
  TestDb *pDb;
................................................................................
  testDatasourceFree(pData);
  tdb_close(pDb);
  testCaseFinish(rc);
  *pRc = rc;
}


void test_data_3(
  const char *zSystem,            /* Database system name */
  const char *zPattern,           /* Run test cases that match this pattern */
  int *pRc                        /* IN/OUT: Error code */
){
  Datatest aTest[] = {
    { {DATA_RANDOM,     20,25,     100,200},       1000,  250, 1000, 1},
    { {DATA_RANDOM,     8,10,      100,200},       1000,  250, 1000, 1},
    { {DATA_RANDOM,     8,10,      10,20},         1000,  250, 1000, 1},
    { {DATA_RANDOM,     8,10,      1000,2000},     1000,  250, 1000, 1},
    { {DATA_RANDOM,     8,100,     10000,20000},    100,   25,  100, 1},
    { {DATA_RANDOM,     80,100,    10,20},         1000,  250, 1000, 1},
    { {DATA_RANDOM,     5000,6000, 10,20},          100,   25,  100, 1},
................................................................................
  };

  int i;

  for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
    char *zName = getName(zSystem, &aTest[i]);
    if( testCaseBegin(pRc, zPattern, "%s", zName) ){
      doDataTest(zSystem, &aTest[i], pRc);
    }
    testFree(zName);
  }
}

















































































































































|
|
<
<
<
<
<












|










|










>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|







 







|

|







 







|




|







 







|




>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8





9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
...
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
...
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
...
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488

#include "lsmtest.h"

#define DATA_SEQUENTIAL TEST_DATASOURCE_SEQUENCE
#define DATA_RANDOM     TEST_DATASOURCE_RANDOM

typedef struct Datatest1 Datatest1;
typedef struct Datatest2 Datatest2;






/*
** An instance of the following structure contains parameters used to
** customize the test function in this file. Test procedure:
**
**   1. Create a data-source based on the "datasource definition" vars.
**
**   2. Insert nRow key value pairs into the database.
**
**   3. Delete all keys from the database. Deletes are done in the same 
**      order as the inserts.
**
** During steps 2 and 3 above, after each Datatest1.nVerify inserts or
** deletes, the following:
**
**   a. Run Datasource.nTest key lookups and check the results are as expected.
**
**   b. If Datasource.bTestScan is true, run a handful (8) of range
**      queries (scanning forwards and backwards). Check that the results
**      are as expected.
**
**   c. Close and reopen the database. Then run (a) and (b) again.
*/
struct Datatest1 {
  /* Datasource definition */
  DatasourceDefn defn;

  /* Test procedure parameters */
  int nRow;                       /* Number of rows to insert then delete */
  int nVerify;                    /* How often to verify the db contents */
  int nTest;                      /* Number of keys to test (0==all) */
  int bTestScan;                  /* True to do scan tests */
};

/*
** An instance of the following data structure is used to describe the
** second type of test case in this file. The chief difference between 
** these tests and those described by Datatest1 is that these tests also
** experiment with range-delete operations. Tests proceed as follows:
**
**     1. Open the datasource described by Datatest2.defn. 
**
**     2. Open a connection on an empty database.
**
**     3. Do this Datatest2.nIter times:
**
**        a) Insert Datatest2.nWrite key-value pairs from the datasource.
**
**        b) Select two pseudo-random keys and use them as the start
**           and end points of a range-delete operation.
**
**        c) Verify that the contents of the database are as expected (see
**           below for details).
**
**        d) Close and then reopen the database handle.
**
**        e) Verify that the contents of the database are still as expected.
**
** The inserts and range deletes are run twice - once on the database being
** tested and once using a control system (sqlite3, kc etc. - something that 
** works). In order to verify that the contents of the db being tested are
** correct, the test runs a bunch of scans and lookups on both the test and
** control databases. If the results are the same, the test passes.
*/
struct Datatest2 {
  DatasourceDefn defn;
  int nRange;
  int nWrite;                     /* Number of writes per iteration */
  int nIter;                      /* Total number of iterations to run */
};

/*
** Generate a unique name for the test case pTest with database system
** zSystem.
*/
static char *getName(const char *zSystem, Datatest1 *pTest){
  char *zRet;
  char *zData;
  zData = testDatasourceName(&pTest->defn);
  zRet = testMallocPrintf("data.%s.%s.%d.%d", 
      zSystem, zData, pTest->nRow, pTest->nVerify
  );
  testFree(zData);
................................................................................
    void *pCtx, void *pKey, int nKey, void *pVal, int nVal
){
  printf("%s\n", (char *)pKey);
  fflush(stdout);
}
#endif

static void doDataTest1(
  const char *zSystem,            /* Database system to test */
  Datatest1 *p,                   /* Structure containing test parameters */
  int *pRc                        /* OUT: Error code */
){
  int i;
  int iDot;
  int rc = LSM_OK;
  Datasource *pData;
  TestDb *pDb;
................................................................................
  testDatasourceFree(pData);
  tdb_close(pDb);
  testCaseFinish(rc);
  *pRc = rc;
}


void test_data_1(
  const char *zSystem,            /* Database system name */
  const char *zPattern,           /* Run test cases that match this pattern */
  int *pRc                        /* IN/OUT: Error code */
){
  Datatest1 aTest[] = {
    { {DATA_RANDOM,     20,25,     100,200},       1000,  250, 1000, 1},
    { {DATA_RANDOM,     8,10,      100,200},       1000,  250, 1000, 1},
    { {DATA_RANDOM,     8,10,      10,20},         1000,  250, 1000, 1},
    { {DATA_RANDOM,     8,10,      1000,2000},     1000,  250, 1000, 1},
    { {DATA_RANDOM,     8,100,     10000,20000},    100,   25,  100, 1},
    { {DATA_RANDOM,     80,100,    10,20},         1000,  250, 1000, 1},
    { {DATA_RANDOM,     5000,6000, 10,20},          100,   25,  100, 1},
................................................................................
  };

  int i;

  for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
    char *zName = getName(zSystem, &aTest[i]);
    if( testCaseBegin(pRc, zPattern, "%s", zName) ){
      doDataTest1(zSystem, &aTest[i], pRc);
    }
    testFree(zName);
  }
}

static void testCompareDb(
  Datasource *pData,
  int nData,
  int iSeed,
  TestDb *pControl,
  TestDb *pDb,
  int *pRc
){
  int i;

  testScanCompare(pControl, pDb, 0, 0, 0,         0, 0,         pRc);
  testScanCompare(pControl, pDb, 1, 0, 0,         0, 0,         pRc);

  if( *pRc==0 ){
    int iKey1;
    int iKey2;
    void *pKey1; int nKey1;       /* Start key */
    void *pKey2; int nKey2;       /* Final key */

    iKey1 = testPrngValue(iSeed) % nData;
    iKey2 = testPrngValue(iSeed+1) % nData;
    testDatasourceEntry(pData, iKey1, &pKey2, &nKey1, 0, 0);
    pKey1 = testMalloc(nKey1+1);
    memcpy(pKey1, pKey2, nKey1+1);
    testDatasourceEntry(pData, iKey2, &pKey2, &nKey2, 0, 0);

    testScanCompare(pControl, pDb, 0, 0, 0,         pKey2, nKey2, pRc);
    testScanCompare(pControl, pDb, 0, pKey1, nKey1, 0, 0,         pRc);
    testScanCompare(pControl, pDb, 0, pKey1, nKey1, pKey2, nKey2, pRc);
    testScanCompare(pControl, pDb, 1, 0, 0,         pKey2, nKey2, pRc);
    testScanCompare(pControl, pDb, 1, pKey1, nKey1, 0, 0,         pRc);
    testScanCompare(pControl, pDb, 1, pKey1, nKey1, pKey2, nKey2, pRc);
    testFree(pKey1);
  }

  for(i=0; i<nData && *pRc==0; i++){
    void *pKey; int nKey;
    testDatasourceEntry(pData, i, &pKey, &nKey, 0, 0);
    testFetchCompare(pControl, pDb, pKey, nKey, pRc);
  }
}

static void doDataTest2(
  const char *zSystem,            /* Database system to test */
  Datatest2 *p,                   /* Structure containing test parameters */
  int *pRc                        /* OUT: Error code */
){
  TestDb *pDb;
  TestDb *pControl;
  Datasource *pData;
  int i;
  int rc = LSM_OK;
  int iDot = 0;

  /* Start the test case, open a database and allocate the datasource. */
  pDb = testOpen(zSystem, 1, &rc);
  pData = testDatasourceNew(&p->defn);
  rc = testControlDb(&pControl);

  if( tdb_lsm(pDb) ){
    int nBuf = 32 * 1024 * 1024;
    lsm_config(tdb_lsm(pDb), LSM_CONFIG_WRITE_BUFFER, &nBuf);
  }

  for(i=0; rc==0 && i<p->nIter; i++){
    void *pKey1; int nKey1;
    void *pKey2; int nKey2;
    int ii;
    int nRange = MIN(p->nIter*p->nWrite, p->nRange);

    for(ii=0; rc==0 && ii<p->nWrite; ii++){
      int iKey = (i*p->nWrite + ii) % p->nRange;
      testWriteDatasource(pControl, pData, iKey, &rc);
      testWriteDatasource(pDb, pData, iKey, &rc);
    }

    testDatasourceEntry(pData, i+1000000, &pKey1, &nKey1, 0, 0);
    pKey1 = testMallocCopy(pKey1, nKey1);
    testDatasourceEntry(pData, i+2000000, &pKey2, &nKey2, 0, 0);

    testDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2, &rc);
    testDeleteRange(pControl, pKey1, nKey1, pKey2, nKey2, &rc);
    testFree(pKey1);

    testCompareDb(pData, nRange, i, pControl, pDb, &rc);
    testReopen(&pDb, &rc);
    testCompareDb(pData, nRange, i, pControl, pDb, &rc);

    /* Update the progress dots... */
    testCaseProgress(i, p->nIter, testCaseNDot(), &iDot);
  }

  testClose(&pDb);
  testClose(&pControl);
  testDatasourceFree(pData);
  testCaseFinish(rc);
  *pRc = rc;
}

static char *getName2(const char *zSystem, Datatest2 *pTest){
  char *zRet;
  char *zData;
  zData = testDatasourceName(&pTest->defn);
  zRet = testMallocPrintf("data2.%s.%s.%d.%d.%d", 
      zSystem, zData, pTest->nRange, pTest->nWrite, pTest->nIter
  );
  testFree(zData);
  return zRet;
}

void test_data_2(
  const char *zSystem,            /* Database system name */
  const char *zPattern,           /* Run test cases that match this pattern */
  int *pRc                        /* IN/OUT: Error code */
){
  Datatest2 aTest[] = {
      /* defn,                                 nRange, nWrite, nIter */
    { {DATA_RANDOM,     20,25,     100,200},   10000,  10,     50   },
    { {DATA_RANDOM,     20,25,     100,200},   10000,  200,    50   },
    { {DATA_RANDOM,     20,25,     100,200},   100,    10,     1000 },
    { {DATA_RANDOM,     20,25,     100,200},   100,    200,    50   },

#if 0
    { {DATA_RANDOM,     20,25,     100,200},       200, 50 }
#endif
  };

  int i;

  for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
    char *zName = getName2(zSystem, &aTest[i]);
    if( testCaseBegin(pRc, zPattern, "%s", zName) ){
      doDataTest2(zSystem, &aTest[i], pRc);
    }
    testFree(zName);
  }
}

Changes to lsm-test/lsmtest8.c.

301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
}

void do_writer_crash_test(const char *zPattern, int *pRc){
  struct Test {
    const char *zName;
    void (*xFunc)(int *);
  } aTest[] = {
    { "writercrash2.lsm", doWriterCrash2 },
    { "writercrash1.lsm", doWriterCrash1 },
  };
  int i;
  for(i=0; i<ArraySize(aTest); i++){
    struct Test *p = &aTest[i];
    if( testCaseBegin(pRc, zPattern, p->zName) ){
      p->xFunc(pRc);
      testCaseFinish(*pRc);
    }
  }

}









|
|













301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
}

void do_writer_crash_test(const char *zPattern, int *pRc){
  struct Test {
    const char *zName;
    void (*xFunc)(int *);
  } aTest[] = {
    { "writercrash1.lsm", doWriterCrash1 },
    { "writercrash2.lsm", doWriterCrash2 },
  };
  int i;
  for(i=0; i<ArraySize(aTest); i++){
    struct Test *p = &aTest[i];
    if( testCaseBegin(pRc, zPattern, p->zName) ){
      p->xFunc(pRc);
      testCaseFinish(*pRc);
    }
  }

}


Changes to lsm-test/lsmtest_main.c.

69
70
71
72
73
74
75
76












77
78
79
80
81
82
83
...
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137



138
139
140
141
142
143
144
...
181
182
183
184
185
186
187
188



189
190
191
192
193
194
195
...
369
370
371
372
373
374
375






376
377
378
379
380
381
382
...
432
433
434
435
436
437
438
439

440
441
442
443
444
445
446
void testDelete(
  TestDb *pDb,                    /* Database handle */
  void *pKey, int nKey,           /* Key to query database for */
  int *pRc                        /* IN/OUT: Error code */
){
  if( *pRc==0 ){
    int rc;
    rc = tdb_delete(pDb, pKey, nKey);












    testSetError(rc);
  }
}

void testBegin(TestDb *pDb, int iTrans, int *pRc){
  if( *pRc==0 ){
    int rc;
................................................................................
  const char *zVal,               /* Value to write */
  int *pRc                        /* IN/OUT: Error code */
){
  int nVal = (zVal ? strlen(zVal) : 0);
  testFetch(pDb, (void *)zKey, strlen(zKey), (void *)zVal, nVal, pRc);
}

static void testFetchCompare(
  TestDb *pDb1, 
  TestDb *pDb2, 
  void *pKey, int nKey, 
  int *pRc
){
  int rc;
  void *pDbVal1;
  void *pDbVal2;
  int nDbVal1;
  int nDbVal2;




  rc = tdb_fetch(pDb1, pKey, nKey, &pDbVal1, &nDbVal1);
  testSetError(rc);

  rc = tdb_fetch(pDb2, pKey, nKey, &pDbVal2, &nDbVal2);
  testSetError(rc);

................................................................................
  void *pVal, int nVal
){
  ScanResult *p = (ScanResult *)pCtx;
  u8 *aKey = (u8 *)pKey;
  u8 *aVal = (u8 *)pVal;
  int i;

  if( test_scan_debug ) printf("%.*s\n", nKey, (char *)pKey);



#if 0
  if( test_scan_debug ) printf("%.20s\n", (char *)pVal);
#endif

#if 0
  /* Check tdb_fetch() matches */
  int rc = 0;
................................................................................
** Allocations made using testMalloc() should be freed using testFree().
*/
void *testMalloc(int n){
  void *pRet = malloc(n);
  memset(pRet, 0, n);
  return pRet;
}







void *testRealloc(void *p, int n){
  return realloc(p, n);
}

/*
** Free an allocation made by an earlier call to testMalloc().
................................................................................
  if( nArg==1 ){
    zPattern = azArg[0];
  }

  for(j=0; tdb_system_name(j); j++){
    rc = 0;

    test_data_3(tdb_system_name(j), zPattern, &rc);

    test_rollback(tdb_system_name(j), zPattern, &rc);
    test_mc(tdb_system_name(j), zPattern, &rc);
    test_mt(tdb_system_name(j), zPattern, &rc);

    if( rc ) nFail++;
  }








|
>
>
>
>
>
>
>
>
>
>
>
>







 







|










>
>
>







 







|
>
>
>







 







>
>
>
>
>
>







 







|
>







69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
...
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
...
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
...
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
...
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
void testDelete(
  TestDb *pDb,                    /* Database handle */
  void *pKey, int nKey,           /* Key to query database for */
  int *pRc                        /* IN/OUT: Error code */
){
  if( *pRc==0 ){
    int rc;
    *pRc = rc = tdb_delete(pDb, pKey, nKey);
    testSetError(rc);
  }
}
void testDeleteRange(
  TestDb *pDb,                    /* Database handle */
  void *pKey1, int nKey1,
  void *pKey2, int nKey2,
  int *pRc                        /* IN/OUT: Error code */
){
  if( *pRc==0 ){
    int rc;
    *pRc = rc = tdb_delete_range(pDb, pKey1, nKey1, pKey2, nKey2);
    testSetError(rc);
  }
}

void testBegin(TestDb *pDb, int iTrans, int *pRc){
  if( *pRc==0 ){
    int rc;
................................................................................
  const char *zVal,               /* Value to write */
  int *pRc                        /* IN/OUT: Error code */
){
  int nVal = (zVal ? strlen(zVal) : 0);
  testFetch(pDb, (void *)zKey, strlen(zKey), (void *)zVal, nVal, pRc);
}

void testFetchCompare(
  TestDb *pDb1, 
  TestDb *pDb2, 
  void *pKey, int nKey, 
  int *pRc
){
  int rc;
  void *pDbVal1;
  void *pDbVal2;
  int nDbVal1;
  int nDbVal2;

  static int nCall = 0;
  nCall++;

  rc = tdb_fetch(pDb1, pKey, nKey, &pDbVal1, &nDbVal1);
  testSetError(rc);

  rc = tdb_fetch(pDb2, pKey, nKey, &pDbVal2, &nDbVal2);
  testSetError(rc);

................................................................................
  void *pVal, int nVal
){
  ScanResult *p = (ScanResult *)pCtx;
  u8 *aKey = (u8 *)pKey;
  u8 *aVal = (u8 *)pVal;
  int i;

  if( test_scan_debug ){
    printf("%d: %.*s\n", p->nRow, nKey, (char *)pKey);
    fflush(stdout);
  }
#if 0
  if( test_scan_debug ) printf("%.20s\n", (char *)pVal);
#endif

#if 0
  /* Check tdb_fetch() matches */
  int rc = 0;
................................................................................
** Allocations made using testMalloc() should be freed using testFree().
*/
void *testMalloc(int n){
  void *pRet = malloc(n);
  memset(pRet, 0, n);
  return pRet;
}

void *testMallocCopy(void *pCopy, int nByte){
  void *pRet = testMalloc(nByte);
  memcpy(pRet, pCopy, nByte);
  return pRet;
}

void *testRealloc(void *p, int n){
  return realloc(p, n);
}

/*
** Free an allocation made by an earlier call to testMalloc().
................................................................................
  if( nArg==1 ){
    zPattern = azArg[0];
  }

  for(j=0; tdb_system_name(j); j++){
    rc = 0;

    test_data_1(tdb_system_name(j), zPattern, &rc);
    test_data_2(tdb_system_name(j), zPattern, &rc);
    test_rollback(tdb_system_name(j), zPattern, &rc);
    test_mc(tdb_system_name(j), zPattern, &rc);
    test_mt(tdb_system_name(j), zPattern, &rc);

    if( rc ) nFail++;
  }

Changes to lsm-test/lsmtest_tdb.c.

177
178
179
180
181
182
183

184
185
186
187
188
189
190
...
307
308
309
310
311
312
313

314
315
316
317
318
319
320
...
321
322
323
324
325
326
327

328
329
330
331
332
333
334
...
356
357
358
359
360
361
362












363
364
365
366
367
368
369
...
508
509
510
511
512
513
514

515
516
517
518
519
520
521
522
523

524
525
526
527
528
529
530
...
546
547
548
549
550
551
552

553
554
555
556
557
558
559
...
644
645
646
647
648
649
650






651
652
653
654
655
656
657
}

static int test_leveldb_open(const char *zFilename, int bClear, TestDb **ppDb){
  static const DatabaseMethods LeveldbMethods = {
    test_leveldb_close,
    test_leveldb_write,
    test_leveldb_delete,

    test_leveldb_fetch,
    test_leveldb_scan,
    error_transaction_function,
    error_transaction_function,
    error_transaction_function
  };

................................................................................
**   (nOpenTrans-1) nested write transactions open.
*/
struct SqlDb {
  TestDb base;
  sqlite3 *db;
  sqlite3_stmt *pInsert;
  sqlite3_stmt *pDelete;

  sqlite3_stmt *pFetch;
  sqlite3_stmt *apScan[8];

  int nOpenTrans;

  /* Used by sql_fetch() to allocate space for results */
  int nAlloc;
................................................................................
  u8 *aAlloc;
};

static int sql_close(TestDb *pTestDb){
  SqlDb *pDb = (SqlDb *)pTestDb;
  sqlite3_finalize(pDb->pInsert);
  sqlite3_finalize(pDb->pDelete);

  sqlite3_finalize(pDb->pFetch);
  sqlite3_finalize(pDb->apScan[0]);
  sqlite3_finalize(pDb->apScan[1]);
  sqlite3_finalize(pDb->apScan[2]);
  sqlite3_finalize(pDb->apScan[3]);
  sqlite3_finalize(pDb->apScan[4]);
  sqlite3_finalize(pDb->apScan[5]);
................................................................................

static int sql_delete(TestDb *pTestDb, void *pKey, int nKey){
  SqlDb *pDb = (SqlDb *)pTestDb;
  sqlite3_bind_blob(pDb->pDelete, 1, pKey, nKey, SQLITE_STATIC);
  sqlite3_step(pDb->pDelete);
  return sqlite3_reset(pDb->pDelete);
}













static int sql_fetch(
  TestDb *pTestDb, 
  void *pKey, 
  int nKey, 
  void **ppVal, 
  int *pnVal
................................................................................
}

static int sql_open(const char *zFilename, int bClear, TestDb **ppDb){
  static const DatabaseMethods SqlMethods = {
    sql_close,
    sql_write,
    sql_delete,

    sql_fetch,
    sql_scan,
    sql_begin,
    sql_commit,
    sql_rollback
  };
  const char *zCreate = "CREATE TABLE IF NOT EXISTS t1(k PRIMARY KEY, v)";
  const char *zInsert = "REPLACE INTO t1 VALUES(?, ?)";
  const char *zDelete = "DELETE FROM t1 WHERE k = ?";

  const char *zFetch  = "SELECT v FROM t1 WHERE k = ?";

  const char *zScan0  = "SELECT * FROM t1 WHERE k BETWEEN ?1 AND ?2 ORDER BY k";
  const char *zScan1  = "SELECT * FROM t1 WHERE k <= ?2 ORDER BY k";
  const char *zScan2  = "SELECT * FROM t1 WHERE k >= ?1 ORDER BY k";
  const char *zScan3  = "SELECT * FROM t1 ORDER BY k";

................................................................................
  memset(pDb, 0, sizeof(SqlDb));
  pDb->base.pMethods = &SqlMethods;

  if( 0!=(rc = sqlite3_open(zFilename, &pDb->db))
   || 0!=(rc = sqlite3_exec(pDb->db, zCreate, 0, 0, 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zInsert, -1, &pDb->pInsert, 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zDelete, -1, &pDb->pDelete, 0))

   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zFetch, -1, &pDb->pFetch, 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan0, -1, &pDb->apScan[0], 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan1, -1, &pDb->apScan[1], 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan2, -1, &pDb->apScan[2], 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan3, -1, &pDb->apScan[3], 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan4, -1, &pDb->apScan[4], 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan5, -1, &pDb->apScan[5], 0))
................................................................................
int tdb_write(TestDb *pDb, void *pKey, int nKey, void *pVal, int nVal){
  return pDb->pMethods->xWrite(pDb, pKey, nKey, pVal, nVal);
}

int tdb_delete(TestDb *pDb, void *pKey, int nKey){
  return pDb->pMethods->xDelete(pDb, pKey, nKey);
}







int tdb_fetch(TestDb *pDb, void *pKey, int nKey, void **ppVal, int *pnVal){
  return pDb->pMethods->xFetch(pDb, pKey, nKey, ppVal, pnVal);
}

int tdb_scan(
  TestDb *pDb,                    /* Database handle */







>







 







>







 







>







 







>
>
>
>
>
>
>
>
>
>
>
>







 







>









>







 







>







 







>
>
>
>
>
>







177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
...
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
...
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
...
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
...
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
...
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
...
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
}

static int test_leveldb_open(const char *zFilename, int bClear, TestDb **ppDb){
  static const DatabaseMethods LeveldbMethods = {
    test_leveldb_close,
    test_leveldb_write,
    test_leveldb_delete,
    0,
    test_leveldb_fetch,
    test_leveldb_scan,
    error_transaction_function,
    error_transaction_function,
    error_transaction_function
  };

................................................................................
**   (nOpenTrans-1) nested write transactions open.
*/
struct SqlDb {
  TestDb base;
  sqlite3 *db;
  sqlite3_stmt *pInsert;
  sqlite3_stmt *pDelete;
  sqlite3_stmt *pDeleteRange;
  sqlite3_stmt *pFetch;
  sqlite3_stmt *apScan[8];

  int nOpenTrans;

  /* Used by sql_fetch() to allocate space for results */
  int nAlloc;
................................................................................
  u8 *aAlloc;
};

static int sql_close(TestDb *pTestDb){
  SqlDb *pDb = (SqlDb *)pTestDb;
  sqlite3_finalize(pDb->pInsert);
  sqlite3_finalize(pDb->pDelete);
  sqlite3_finalize(pDb->pDeleteRange);
  sqlite3_finalize(pDb->pFetch);
  sqlite3_finalize(pDb->apScan[0]);
  sqlite3_finalize(pDb->apScan[1]);
  sqlite3_finalize(pDb->apScan[2]);
  sqlite3_finalize(pDb->apScan[3]);
  sqlite3_finalize(pDb->apScan[4]);
  sqlite3_finalize(pDb->apScan[5]);
................................................................................

static int sql_delete(TestDb *pTestDb, void *pKey, int nKey){
  SqlDb *pDb = (SqlDb *)pTestDb;
  sqlite3_bind_blob(pDb->pDelete, 1, pKey, nKey, SQLITE_STATIC);
  sqlite3_step(pDb->pDelete);
  return sqlite3_reset(pDb->pDelete);
}

static int sql_delete_range(
  TestDb *pTestDb, 
  void *pKey1, int nKey1,
  void *pKey2, int nKey2
){
  SqlDb *pDb = (SqlDb *)pTestDb;
  sqlite3_bind_blob(pDb->pDeleteRange, 1, pKey1, nKey1, SQLITE_STATIC);
  sqlite3_bind_blob(pDb->pDeleteRange, 2, pKey2, nKey2, SQLITE_STATIC);
  sqlite3_step(pDb->pDeleteRange);
  return sqlite3_reset(pDb->pDeleteRange);
}

static int sql_fetch(
  TestDb *pTestDb, 
  void *pKey, 
  int nKey, 
  void **ppVal, 
  int *pnVal
................................................................................
}

static int sql_open(const char *zFilename, int bClear, TestDb **ppDb){
  static const DatabaseMethods SqlMethods = {
    sql_close,
    sql_write,
    sql_delete,
    sql_delete_range,
    sql_fetch,
    sql_scan,
    sql_begin,
    sql_commit,
    sql_rollback
  };
  const char *zCreate = "CREATE TABLE IF NOT EXISTS t1(k PRIMARY KEY, v)";
  const char *zInsert = "REPLACE INTO t1 VALUES(?, ?)";
  const char *zDelete = "DELETE FROM t1 WHERE k = ?";
  const char *zRange = "DELETE FROM t1 WHERE k>? AND k<?";
  const char *zFetch  = "SELECT v FROM t1 WHERE k = ?";

  const char *zScan0  = "SELECT * FROM t1 WHERE k BETWEEN ?1 AND ?2 ORDER BY k";
  const char *zScan1  = "SELECT * FROM t1 WHERE k <= ?2 ORDER BY k";
  const char *zScan2  = "SELECT * FROM t1 WHERE k >= ?1 ORDER BY k";
  const char *zScan3  = "SELECT * FROM t1 ORDER BY k";

................................................................................
  memset(pDb, 0, sizeof(SqlDb));
  pDb->base.pMethods = &SqlMethods;

  if( 0!=(rc = sqlite3_open(zFilename, &pDb->db))
   || 0!=(rc = sqlite3_exec(pDb->db, zCreate, 0, 0, 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zInsert, -1, &pDb->pInsert, 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zDelete, -1, &pDb->pDelete, 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zRange, -1, &pDb->pDeleteRange, 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zFetch, -1, &pDb->pFetch, 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan0, -1, &pDb->apScan[0], 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan1, -1, &pDb->apScan[1], 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan2, -1, &pDb->apScan[2], 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan3, -1, &pDb->apScan[3], 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan4, -1, &pDb->apScan[4], 0))
   || 0!=(rc = sqlite3_prepare_v2(pDb->db, zScan5, -1, &pDb->apScan[5], 0))
................................................................................
int tdb_write(TestDb *pDb, void *pKey, int nKey, void *pVal, int nVal){
  return pDb->pMethods->xWrite(pDb, pKey, nKey, pVal, nVal);
}

int tdb_delete(TestDb *pDb, void *pKey, int nKey){
  return pDb->pMethods->xDelete(pDb, pKey, nKey);
}

int tdb_delete_range(
    TestDb *pDb, void *pKey1, int nKey1, void *pKey2, int nKey2
){
  return pDb->pMethods->xDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2);
}

int tdb_fetch(TestDb *pDb, void *pKey, int nKey, void **ppVal, int *pnVal){
  return pDb->pMethods->xFetch(pDb, pKey, nKey, ppVal, pnVal);
}

int tdb_scan(
  TestDb *pDb,                    /* Database handle */

Changes to lsm-test/lsmtest_tdb.h.

46
47
48
49
50
51
52





53
54
55
56
57
58
59
int tdb_write(TestDb *pDb, void *pKey, int nKey, void *pVal, int nVal);

/*
** Delete a key from the database.
*/
int tdb_delete(TestDb *pDb, void *pKey, int nKey);






/*
** Query the database for key (pKey/nKey). If no entry is found, set *ppVal
** to 0 and *pnVal to -1 before returning. Otherwise, set *ppVal and *pnVal
** to a pointer to and size of the value associated with (pKey/nKey).
*/
int tdb_fetch(TestDb *pDb, void *pKey, int nKey, void **ppVal, int *pnVal);








>
>
>
>
>







46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
int tdb_write(TestDb *pDb, void *pKey, int nKey, void *pVal, int nVal);

/*
** Delete a key from the database.
*/
int tdb_delete(TestDb *pDb, void *pKey, int nKey);

/*
** Delete a range of keys from the database.
*/
int tdb_delete_range(TestDb *, void *pKey1, int nKey1, void *pKey2, int nKey2);

/*
** Query the database for key (pKey/nKey). If no entry is found, set *ppVal
** to 0 and *pnVal to -1 before returning. Otherwise, set *ppVal and *pnVal
** to a pointer to and size of the value associated with (pKey/nKey).
*/
int tdb_fetch(TestDb *pDb, void *pKey, int nKey, void **ppVal, int *pnVal);

Changes to lsm-test/lsmtest_tdb3.c.

450
451
452
453
454
455
456









457
458
459
460
461
462
463
...
726
727
728
729
730
731
732

733
734
735
736
737
738
739
....
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161

1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
  return lsm_write(pDb->db, pKey, nKey, pVal, nVal);
}

static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){
  LsmDb *pDb = (LsmDb *)pTestDb;
  return lsm_delete(pDb->db, pKey, nKey);
}










static int test_lsm_fetch(
  TestDb *pTestDb, 
  void *pKey, 
  int nKey, 
  void **ppVal, 
  int *pnVal
................................................................................
  int bClear, 
  TestDb **ppDb
){
  static const DatabaseMethods LsmMethods = {
    test_lsm_close,
    test_lsm_write,
    test_lsm_delete,

    test_lsm_fetch,
    test_lsm_scan,
    test_lsm_begin,
    test_lsm_commit,
    test_lsm_rollback
  };

................................................................................
    }
  }

  return rc;
}


static int test_lsm_mt(
  const char *zFilename,          /* File to open */
  int nWorker,                    /* Either 1 or 2, for worker threads */
  int bClear,                     /* True to delete any existing db */
  TestDb **ppDb                   /* OUT: TestDb database handle */
){
  LsmDb *pDb;
  int rc;

  rc = test_lsm_open(zFilename, bClear, ppDb);
  pDb = (LsmDb *)*ppDb;

  assert( nWorker==1 || nWorker==2 );

  /* Turn off auto-work and configure a work-hook on the client connection. */
  if( rc==0 ){
    int bAutowork = 0;
    lsm_config(pDb->db, LSM_CONFIG_AUTOWORK, &bAutowork);
    lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);
  }

  if( rc==0 ){
    pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker);
    memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker);
    pDb->nWorker = nWorker;

    rc = mt_start_worker(pDb, 0, zFilename, 0, LSM_WORK_FLUSH, 
        nWorker==1 ? 512 : 0, 1
    );
  }

  if( rc==0 && nWorker==2 ){
    rc = mt_start_worker(pDb, 1, zFilename, 0, 0, 512, 0);
  }

  return rc;
}

int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb){
  return test_lsm_mt(zFilename, 1, bClear, ppDb);
}

int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb){
  return test_lsm_mt(zFilename, 2, bClear, ppDb);

}


#else
static void mt_shutdown(LsmDb *pDb) { 
  unused_parameter(pDb); 
}
int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){
  unused_parameter(zFilename);
  unused_parameter(bClear);
  unused_parameter(ppDb);
  testPrintError("threads unavailable - recompile with LSM_MUTEX_PTHREADS\n");
  return 1;
}
#endif







>
>
>
>
>
>
>
>
>







 







>







 







|
|
<
<
<
<
<
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<



|
>

<













450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
...
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
....
1121
1122
1123
1124
1125
1126
1127
1128
1129







1130






























1131
1132
1133
1134
1135
1136

1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
  return lsm_write(pDb->db, pKey, nKey, pVal, nVal);
}

static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){
  LsmDb *pDb = (LsmDb *)pTestDb;
  return lsm_delete(pDb->db, pKey, nKey);
}

static int test_lsm_delete_range(
  TestDb *pTestDb, 
  void *pKey1, int nKey1,
  void *pKey2, int nKey2
){
  LsmDb *pDb = (LsmDb *)pTestDb;
  return lsm_delete_range(pDb->db, pKey1, nKey1, pKey2, nKey2);
}

static int test_lsm_fetch(
  TestDb *pTestDb, 
  void *pKey, 
  int nKey, 
  void **ppVal, 
  int *pnVal
................................................................................
  int bClear, 
  TestDb **ppDb
){
  static const DatabaseMethods LsmMethods = {
    test_lsm_close,
    test_lsm_write,
    test_lsm_delete,
    test_lsm_delete_range,
    test_lsm_fetch,
    test_lsm_scan,
    test_lsm_begin,
    test_lsm_commit,
    test_lsm_rollback
  };

................................................................................
    }
  }

  return rc;
}


int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb){
  const char *zCfg = "threads=2";







  return testLsmOpen(zCfg, zFilename, bClear, ppDb);






























}

int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb){
  const char *zCfg = "threads=3";
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}


#else
static void mt_shutdown(LsmDb *pDb) { 
  unused_parameter(pDb); 
}
int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){
  unused_parameter(zFilename);
  unused_parameter(bClear);
  unused_parameter(ppDb);
  testPrintError("threads unavailable - recompile with LSM_MUTEX_PTHREADS\n");
  return 1;
}
#endif

Changes to src/lsm.h.

376
377
378
379
380
381
382
383
384



385
386
387
388
389
390
391
/*
** Delete a value from the database. No error is returned if the specified
** key value does not exist in the database.
*/
int lsm_delete(lsm_db *, const void *pKey, int nKey);

/*
** Delete all database entries with keys that are greater than or equal to
** (pKey1/nKey1) and smaller than or equal to (pKey2/nKey2).



*/
int lsm_delete_range(lsm_db *, 
    const void *pKey1, int nKey1, const void *pKey2, int nKey2
);

/*
** The lsm_tree_size() function reports on the current state of the 







|
|
>
>
>







376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
/*
** Delete a value from the database. No error is returned if the specified
** key value does not exist in the database.
*/
int lsm_delete(lsm_db *, const void *pKey, int nKey);

/*
** Delete all database entries with keys that are greater than (pKey1/nKey1) 
** and smaller than (pKey2/nKey2). Note that keys (pKey1/nKey1) and
** (pKey2/nKey2) themselves, if they exist in the database, are not deleted.
**
** Return LSM_OK if successful, or an LSM error code otherwise.
*/
int lsm_delete_range(lsm_db *, 
    const void *pKey1, int nKey1, const void *pKey2, int nKey2
);

/*
** The lsm_tree_size() function reports on the current state of the 

Changes to src/lsmInt.h.

143
144
145
146
147
148
149












150
151
152
153
154
155
156
...
374
375
376
377
378
379
380

381
382
383
384
385
386
387
...
539
540
541
542
543
544
545
546

547
548
549
550

551
552
553
554
555
556
557
** a checkpoint (the remainder are stored as a system record in the LSM).
** See also LSM_CONFIG_MAX_FREELIST.
*/
#define LSM_MAX_FREELIST_ENTRIES 100

#define LSM_ATTEMPTS_BEFORE_PROTOCOL 10000













/*
** A string that can grow by appending.
*/
struct LsmString {
  lsm_env *pEnv;              /* Run-time environment */
  int n;                      /* Size of string.  -1 indicates error */
  int nAlloc;                 /* Space allocated for z[] */
................................................................................
};
struct Merge {
  int nInput;                     /* Number of input runs being merged */
  MergeInput *aInput;             /* Array nInput entries in size */
  MergeInput splitkey;            /* Location in file of current splitkey */
  int nSkip;                      /* Number of separators entries to skip */
  int iOutputOff;                 /* Write offset on output page */

  int bHierReadonly;              /* True if b-tree heirarchies are read-only */
};

/* 
** The first argument to this macro is a pointer to a Segment structure.
** Returns true if the structure instance indicates that the separators
** array is valid.
................................................................................
void lsmTreeCursorDestroy(TreeCursor *);

int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes);
int lsmTreeCursorNext(TreeCursor *pCsr);
int lsmTreeCursorPrev(TreeCursor *pCsr);
int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast);
void lsmTreeCursorReset(TreeCursor *pCsr);
int lsmTreeCursorKey(TreeCursor *pCsr, void **ppKey, int *pnKey);

int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal);
int lsmTreeCursorValid(TreeCursor *pCsr);
int lsmTreeCursorSave(TreeCursor *pCsr);



/* 
** Functions from file "mem.c".
*/
int lsmPoolNew(lsm_env *pEnv, Mempool **ppPool);
void lsmPoolDestroy(lsm_env *pEnv, Mempool *pPool);
void *lsmPoolMalloc(lsm_env *pEnv, Mempool *pPool, int nByte);







>
>
>
>
>
>
>
>
>
>
>
>







 







>







 







|
>




>







143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
...
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
...
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
** a checkpoint (the remainder are stored as a system record in the LSM).
** See also LSM_CONFIG_MAX_FREELIST.
*/
#define LSM_MAX_FREELIST_ENTRIES 100

#define LSM_ATTEMPTS_BEFORE_PROTOCOL 10000


/*
** Each entry stored in the LSM (or in-memory tree structure) has an
** associated mask of the following flags.
*/
#define LSM_START_DELETE 0x01     /* Start of open-ended delete range */
#define LSM_END_DELETE   0x02     /* End of open-ended delete range */
#define LSM_POINT_DELETE 0x04     /* Delete this key */
#define LSM_INSERT       0x08     /* Insert this key and value */
#define LSM_SEPARATOR    0x10     /* True if entry is separator key only */
#define LSM_SYSTEMKEY    0x20     /* True if entry is a system key (FREELIST) */

/*
** A string that can grow by appending.
*/
struct LsmString {
  lsm_env *pEnv;              /* Run-time environment */
  int n;                      /* Size of string.  -1 indicates error */
  int nAlloc;                 /* Space allocated for z[] */
................................................................................
};
struct Merge {
  int nInput;                     /* Number of input runs being merged */
  MergeInput *aInput;             /* Array nInput entries in size */
  MergeInput splitkey;            /* Location in file of current splitkey */
  int nSkip;                      /* Number of separators entries to skip */
  int iOutputOff;                 /* Write offset on output page */
  Pgno iCurrentPtr;               /* Current pointer value */
  int bHierReadonly;              /* True if b-tree heirarchies are read-only */
};

/* 
** The first argument to this macro is a pointer to a Segment structure.
** Returns true if the structure instance indicates that the separators
** array is valid.
................................................................................
void lsmTreeCursorDestroy(TreeCursor *);

int lsmTreeCursorSeek(TreeCursor *pCsr, void *pKey, int nKey, int *pRes);
int lsmTreeCursorNext(TreeCursor *pCsr);
int lsmTreeCursorPrev(TreeCursor *pCsr);
int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast);
void lsmTreeCursorReset(TreeCursor *pCsr);
int lsmTreeCursorKey(TreeCursor *pCsr, int *pFlags, void **ppKey, int *pnKey);
int lsmTreeCursorFlags(TreeCursor *pCsr);
int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal);
int lsmTreeCursorValid(TreeCursor *pCsr);
int lsmTreeCursorSave(TreeCursor *pCsr);

void lsmFlagsToString(int flags, char *zFlags);

/* 
** Functions from file "mem.c".
*/
int lsmPoolNew(lsm_env *pEnv, Mempool **ppPool);
void lsmPoolDestroy(lsm_env *pEnv, Mempool *pPool);
void *lsmPoolMalloc(lsm_env *pEnv, Mempool *pPool, int nByte);

Changes to src/lsm_ckpt.c.

60
61
62
63
64
65
66

67
68
69
70
71
72
73
...
308
309
310
311
312
313
314

315
316
317
318
319
320
321
...
497
498
499
500
501
502
503

504
505
506
507
508
509
510
**     4. If nRight>0, The number of segments involved in the merge
**     5. if nRight>0, Current nSkip value (see Merge structure defn.),
**     6. For each segment in the merge:
**        5a. Page number of next cell to read during merge
**        5b. Cell number of next cell to read during merge
**     7. Page containing current split-key.
**     8. Cell within page containing current split-key.

**
**   The freelist. 
**
**     1. Number of free-list entries stored in checkpoint header.
**     2. For each entry:
**        2a. Block number of free block.
**        2b. MSW of associated checkpoint id.
................................................................................
    ckptSetValue(p, iOut++, pMerge->nSkip, pRc);
    for(i=0; i<pMerge->nInput; i++){
      ckptSetValue(p, iOut++, pMerge->aInput[i].iPg, pRc);
      ckptSetValue(p, iOut++, pMerge->aInput[i].iCell, pRc);
    }
    ckptSetValue(p, iOut++, pMerge->splitkey.iPg, pRc);
    ckptSetValue(p, iOut++, pMerge->splitkey.iCell, pRc);

  }

  *piOut = iOut;
}

/*
** Populate the log offset fields of the checkpoint buffer. 4 values.
................................................................................
  pMerge->nSkip = (int)aInt[iIn++];
  for(i=0; i<nInput; i++){
    pMerge->aInput[i].iPg = (Pgno)aInt[iIn++];
    pMerge->aInput[i].iCell = (int)aInt[iIn++];
  }
  pMerge->splitkey.iPg = (Pgno)aInt[iIn++];
  pMerge->splitkey.iCell = (int)aInt[iIn++];


  /* Set *piIn and return LSM_OK. */
  *piIn = iIn;
  return LSM_OK;
}









>







 







>







 







>







60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
...
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
...
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
**     4. If nRight>0, The number of segments involved in the merge
**     5. if nRight>0, Current nSkip value (see Merge structure defn.),
**     6. For each segment in the merge:
**        5a. Page number of next cell to read during merge
**        5b. Cell number of next cell to read during merge
**     7. Page containing current split-key.
**     8. Cell within page containing current split-key.
**     9. Current pointer value.
**
**   The freelist. 
**
**     1. Number of free-list entries stored in checkpoint header.
**     2. For each entry:
**        2a. Block number of free block.
**        2b. MSW of associated checkpoint id.
................................................................................
    ckptSetValue(p, iOut++, pMerge->nSkip, pRc);
    for(i=0; i<pMerge->nInput; i++){
      ckptSetValue(p, iOut++, pMerge->aInput[i].iPg, pRc);
      ckptSetValue(p, iOut++, pMerge->aInput[i].iCell, pRc);
    }
    ckptSetValue(p, iOut++, pMerge->splitkey.iPg, pRc);
    ckptSetValue(p, iOut++, pMerge->splitkey.iCell, pRc);
    ckptSetValue(p, iOut++, pMerge->iCurrentPtr, pRc);
  }

  *piOut = iOut;
}

/*
** Populate the log offset fields of the checkpoint buffer. 4 values.
................................................................................
  pMerge->nSkip = (int)aInt[iIn++];
  for(i=0; i<nInput; i++){
    pMerge->aInput[i].iPg = (Pgno)aInt[iIn++];
    pMerge->aInput[i].iCell = (int)aInt[iIn++];
  }
  pMerge->splitkey.iPg = (Pgno)aInt[iIn++];
  pMerge->splitkey.iCell = (int)aInt[iIn++];
  pMerge->iCurrentPtr = (int)aInt[iIn++];

  /* Set *piIn and return LSM_OK. */
  *piIn = iIn;
  return LSM_OK;
}


Changes to src/lsm_file.c.

725
726
727
728
729
730
731

732
733
734
735
736
737
738
  Page **ppPg                     /* OUT: New page handle */
){
  Page *p;
  int iHash;
  int rc = LSM_OK;

  assert( iPg>=fsFirstPageOnBlock(pFS, 1) );


  if( pFS->bUseMmap ){
    i64 iEnd = (i64)iPg * pFS->nPagesize;
    fsGrowMapping(pFS, iEnd, &rc);
    if( rc!=LSM_OK ) return rc;

    if( pFS->pFree ){







>







725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
  Page **ppPg                     /* OUT: New page handle */
){
  Page *p;
  int iHash;
  int rc = LSM_OK;

  assert( iPg>=fsFirstPageOnBlock(pFS, 1) );
  *ppPg = 0;

  if( pFS->bUseMmap ){
    i64 iEnd = (i64)iPg * pFS->nPagesize;
    fsGrowMapping(pFS, iEnd, &rc);
    if( rc!=LSM_OK ) return rc;

    if( pFS->pFree ){

Changes to src/lsm_main.c.

475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498

499



500
501
502
503
504
505
506
...
509
510
511
512
513
514
515



516


517
518
519
520
521
522
523
...
529
530
531
532
533
534
535











536
537
538
539
540
541















542
543
544
545
546
547
548
      break;
  }

  va_end(ap);
  return rc;
}

/* 
** Write a new value into the database.
*/
int lsm_write(
  lsm_db *pDb,                    /* Database connection */
  const void *pKey, int nKey,     /* Key to write or delete */
  const void *pVal, int nVal      /* Value to write. Or nVal==-1 for a delete */
){
  int rc = LSM_OK;                /* Return code */
  int bCommit = 0;                /* True to commit before returning */

  if( pDb->nTransOpen==0 ){
    bCommit = 1;
    rc = lsm_begin(pDb, 1);
  }

  if( rc==LSM_OK ){

    rc = lsmLogWrite(pDb, (void *)pKey, nKey, (void *)pVal, nVal);



  }

  lsmSortedSaveTreeCursors(pDb);

  if( rc==LSM_OK ){
    int pgsz = lsmFsPageSize(pDb->pFS);
    int nQuant = LSM_AUTOWORK_QUANT * pgsz;
................................................................................
    int nDiff;

    if( nQuant>pDb->nTreeLimit ){
      nQuant = pDb->nTreeLimit;
    }

    nBefore = lsmTreeSize(pDb);



    rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal);


    nAfter = lsmTreeSize(pDb);
    nDiff = (nAfter/nQuant) - (nBefore/nQuant);
    if( rc==LSM_OK && pDb->bAutowork && nDiff!=0 ){
      rc = lsmSortedAutoWork(pDb, nDiff * LSM_AUTOWORK_QUANT);
    }
  }

................................................................................
    }else{
      lsm_rollback(pDb, 0);
    }
  }

  return rc;
}












/*
** Delete a value from the database. 
*/
int lsm_delete(lsm_db *pDb, const void *pKey, int nKey){
  return lsm_write(pDb, pKey, nKey, 0, -1);















}

/*
** Open a new cursor handle. 
**
** If there are currently no other open cursor handles, and no open write
** transaction, open a read transaction here.







|
|
|
<
<












>
|
>
>
>







 







>
>
>
|
>
>







 







>
>
>
>
>
>
>
>
>
>
>




|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







475
476
477
478
479
480
481
482
483
484


485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
...
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
...
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
      break;
  }

  va_end(ap);
  return rc;
}

static int doWriteOp(
  lsm_db *pDb,
  int bDeleteRange,


  const void *pKey, int nKey,     /* Key to write or delete */
  const void *pVal, int nVal      /* Value to write. Or nVal==-1 for a delete */
){
  int rc = LSM_OK;                /* Return code */
  int bCommit = 0;                /* True to commit before returning */

  if( pDb->nTransOpen==0 ){
    bCommit = 1;
    rc = lsm_begin(pDb, 1);
  }

  if( rc==LSM_OK ){
    if( bDeleteRange==0 ){
      rc = lsmLogWrite(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
    }else{
      /* TODO */
    }
  }

  lsmSortedSaveTreeCursors(pDb);

  if( rc==LSM_OK ){
    int pgsz = lsmFsPageSize(pDb->pFS);
    int nQuant = LSM_AUTOWORK_QUANT * pgsz;
................................................................................
    int nDiff;

    if( nQuant>pDb->nTreeLimit ){
      nQuant = pDb->nTreeLimit;
    }

    nBefore = lsmTreeSize(pDb);
    if( bDeleteRange ){
      rc = lsmTreeDelete(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
    }else{
      rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
    }

    nAfter = lsmTreeSize(pDb);
    nDiff = (nAfter/nQuant) - (nBefore/nQuant);
    if( rc==LSM_OK && pDb->bAutowork && nDiff!=0 ){
      rc = lsmSortedAutoWork(pDb, nDiff * LSM_AUTOWORK_QUANT);
    }
  }

................................................................................
    }else{
      lsm_rollback(pDb, 0);
    }
  }

  return rc;
}

/* 
** Write a new value into the database.
*/
int lsm_write(
  lsm_db *db,                     /* Database connection */
  const void *pKey, int nKey,     /* Key to write or delete */
  const void *pVal, int nVal      /* Value to write. Or nVal==-1 for a delete */
){
  return doWriteOp(db, 0, pKey, nKey, pVal, nVal);
}

/*
** Delete a value from the database. 
*/
int lsm_delete(lsm_db *db, const void *pKey, int nKey){
  return doWriteOp(db, 0, pKey, nKey, 0, -1);
}

/*
** Delete a range of database keys.
*/
int lsm_delete_range(
  lsm_db *db,                     /* Database handle */
  const void *pKey1, int nKey1,   /* Lower bound of range to delete */
  const void *pKey2, int nKey2    /* Upper bound of range to delete */
){
  int rc = LSM_OK;
  if( db->xCmp((void *)pKey1, nKey1, (void *)pKey2, nKey2)<0 ){
    rc = doWriteOp(db, 1, pKey1, nKey1, pKey2, nKey2);
  }
  return rc;
}

/*
** Open a new cursor handle. 
**
** If there are currently no other open cursor handles, and no open write
** transaction, open a read transaction here.

Changes to src/lsm_shared.c.

631
632
633
634
635
636
637


638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
/*
** Argument bFlush is true if the contents of the in-memory tree has just
** been flushed to disk. The significance of this is that once the snapshot
** created to hold the updated state of the database is synced to disk, log
** file space can be recycled.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){


  /* If no error has occurred, serialize the worker snapshot and write
  ** it to shared memory.  */

  assert( pDb->pWorker );
  assert( pDb->pWorker->nFreelistOvfl==0 || nOvfl==0 );
  if( *pRc==LSM_OK ){
    *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl);
  }

  if( pDb->pWorker ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
}








>
>
|
|
<
<




<
<







631
632
633
634
635
636
637
638
639
640
641


642
643
644
645


646
647
648
649
650
651
652
/*
** Argument bFlush is true if the contents of the in-memory tree has just
** been flushed to disk. The significance of this is that once the snapshot
** created to hold the updated state of the database is synced to disk, log
** file space can be recycled.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){
  assert( *pRc!=0 || pDb->pWorker );
  if( pDb->pWorker ){
    /* If no error has occurred, serialize the worker snapshot and write
    ** it to shared memory.  */


    assert( pDb->pWorker->nFreelistOvfl==0 || nOvfl==0 );
    if( *pRc==LSM_OK ){
      *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl);
    }


    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
}

Changes to src/lsm_sorted.c.

38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108

109
110

111
112
113
114
115
116
117
...
230
231
232
233
234
235
236



237
238
239
240
241
242
243
...
264
265
266
267
268
269
270





271
272
273
274
275
276
277
278

279
280
281
282
283
284
285
...
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
...
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
....
1108
1109
1110
1111
1112
1113
1114

1115
1116
1117

1118
1119
1120
1121





1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
















1135
1136
1137
1138
1139
1140
1141
....
1161
1162
1163
1164
1165
1166
1167

1168
1169
1170
1171
1172
1173
1174
....
1181
1182
1183
1184
1185
1186
1187













1188
1189
1190
1191
1192
1193
1194
....
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
....
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
....
1360
1361
1362
1363
1364
1365
1366







































































































































































1367
1368
1369
1370
1371
1372
1373



1374
1375
1376
1377
1378
1379
1380
....
1413
1414
1415
1416
1417
1418
1419

1420
1421
1422
1423
















1424
1425

1426
1427
1428
1429







1430

1431
1432
1433
1434

1435
1436
1437
1438
1439
1440
1441

1442
1443
1444
1445
1446
1447
1448
....
1531
1532
1533
1534
1535
1536
1537
1538

1539
1540
1541
1542
1543
1544
1545
....
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564











1565
1566
1567
1568
1569
1570
1571
1572

1573

1574
1575
1576
1577

1578
1579
1580
1581
1582
1583
1584
....
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614

1615

1616
1617
1618
1619
1620
1621
1622
....
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
....
1672
1673
1674
1675
1676
1677
1678






























1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
....
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710

1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
....
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
....
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
....
2124
2125
2126
2127
2128
2129
2130





































2131
2132
2133
2134
2135
2136


2137
2138
2139
2140
2141
2142
2143
....
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
....
2255
2256
2257
2258
2259
2260
2261
2262

2263
2264
2265

2266

2267
2268
2269
2270
2271
2272
2273














2274

2275

2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288

2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322

2323
2324

2325
2326

2327
2328
2329
2330

2331
2332

2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358

2359
2360
2361
2362
2363
2364


2365
2366
2367
2368
2369
2370
2371
2372
....
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398

2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414

2415
2416
2417
2418
2419
2420
2421
2422
















2423
2424
2425
2426
2427
2428
2429
....
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
....
2457
2458
2459
2460
2461
2462
2463




2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
....
2482
2483
2484
2485
2486
2487
2488

2489
2490
2491
2492
2493
2494
2495





2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509

2510
2511
2512
2513
2514
2515
2516
....
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
....
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
....
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
....
3036
3037
3038
3039
3040
3041
3042
3043
3044

3045
3046
3047
3048
3049
3050
3051
....
3211
3212
3213
3214
3215
3216
3217

3218
3219
3220
3221








































3222
3223
3224
3225
3226
3227
3228
3229
3230
3231
3232
3233
3234
3235
3236
3237
....
3238
3239
3240
3241
3242
3243
3244

3245
3246
3247
3248
3249
3250
3251
3252
3253
3254
3255
3256
3257
3258
3259
3260
3261
3262



3263
3264
3265
3266
3267
3268
3269
....
3281
3282
3283
3284
3285
3286
3287

3288
3289
3290
3291
3292
3293
3294
....
3341
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
....
3382
3383
3384
3385
3386
3387
3388

3389
3390
3391
3392
3393
3394
3395
....
3413
3414
3415
3416
3417
3418
3419
3420
3421
3422
3423
3424
3425
3426
3427
....
3581
3582
3583
3584
3585
3586
3587

3588
3589
3590
3591
3592
3593
3594
....
3786
3787
3788
3789
3790
3791
3792
3793
3794
3795
3796
3797
3798
3799
3800
3801
3802
3803
3804
3805
3806
3807
3808
3809
3810
3811
3812
....
3848
3849
3850
3851
3852
3853
3854





3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889
3890
3891
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910
3911
3912
3913
3914
3915
3916
3917
3918
3919
3920
3921
3922
3923
3924
3925
3926
3927
....
4354
4355
4356
4357
4358
4359
4360
4361









4362
4363
4364
4365




4366
4367
4368
4369
4370
4371
4372
....
4400
4401
4402
4403
4404
4405
4406

4407
4408
4409
4410
4411
4412
4413
4414
4415
4416
4417
4418
4419
4420
4421
4422
4423
4424
4425
4426
4427

4428
4429
4430
4431
4432
4433
4434
....
4444
4445
4446
4447
4448
4449
4450

4451
4452
4453
4454
4455
4456
4457
4458











4459
4460
4461
4462

4463
4464
4465
4466
4467
4468
4469
4470
4471
4472





4473

4474
4475
4476
4477
4478
4479
4480
**   (N==0). And on most pages the first record that starts on the page will
**   not start at byte offset 0. For example:
**
**      aaaaa bbbbb ccc <footer>    cc eeeee fffff g <footer>    gggg....
**
** RECORD FORMAT:
** 
**   Each record in a sorted file is either a WRITE, a DELETE, or a 
**   SEPARATOR. 
**
**   The first byte of the record indicates the type, as follows:
**
**     SORTED_SEPARATOR:  0x01
**     SORTED_WRITE:      0x02
**     SORTED_DELETE:     0x03

**
**   If the sorted file contains pointers, then immediately following the
**   type byte is a pointer to the smallest key in the next file that is larger
**   than the key in the current record. The pointer is encoded as a varint.
**   When added to the 32-bit page number stored in the footer, it is the 
**   page number of the page that contains the smallest key in the next sorted 
**   file that is larger than this key. 
**
**   Next is the number of bytes in the key, encoded as a varint.
**
**   If the record is a SORTED_WRITE, the number of bytes in the value, as
**   a varint, is next.
**
**   Finally, the blob of data containing the key, and for SORTED_WRITE
**   records, the value as well.
**
** SEPARATOR ARRAYS:
**
**   Each time a sorted run that spans more than one page is constructed, a
**   separators array is also constructed. A separators array contains normal
**   pages and b-tree pages. Both types of pages use the same format (as 
**   above).
**
**   The normal pages in the separators array contain a SORTED_SEPARATOR 
**   record with a copy of the first key on each page of the main array
**   except the leftmost. If a main array page contains no keys, then there
**   is no corresponding entry in the separators array.
*/

#ifndef _LSM_INT_H
# include "lsmInt.h"
#endif
#include "sqlite4.h"            /* only for sqlite4_snprintf() */
/* 
** Record types for user data.
*/
#define SORTED_SEPARATOR 0x01
#define SORTED_WRITE     0x02
#define SORTED_DELETE    0x03

#define SORTED_SYSTEM_DATA 0x10

/* 
** Record types for system data (e.g. free-block list, secondary storage
** management entries). These are the same as the user types with the
** most-significant bit set.
*/
#define SORTED_SYSTEM_SEPARATOR (SORTED_SYSTEM_DATA | SORTED_SEPARATOR)
#define SORTED_SYSTEM_WRITE     (SORTED_SYSTEM_DATA | SORTED_WRITE)
#define SORTED_SYSTEM_DELETE    (SORTED_SYSTEM_DATA | SORTED_DELETE)

/*
** Macros to help decode record types.
*/
#define rtTopic(eType)       ((eType) & 0xF0)
#define rtIsDelete(eType)    (((eType) & 0x0F)==SORTED_DELETE)

#define rtIsSeparator(eType) (((eType) & 0x0F)==SORTED_SEPARATOR)
#define rtIsWrite(eType)     (((eType) & 0x0F)==SORTED_WRITE)


/*
** The following macros are used to access a page footer.
*/
#define SEGMENT_NRECORD_OFFSET(pgsz)        ((pgsz) - 2)
#define SEGMENT_FLAGS_OFFSET(pgsz)          ((pgsz) - 2 - 2)
#define SEGMENT_POINTER_OFFSET(pgsz)        ((pgsz) - 2 - 2 - 4)
................................................................................
  /* Comparison results */
  int nTree;                      /* Size of aTree[] array */
  int *aTree;                     /* Array of comparison results */

  /* Used by cursors flushing the in-memory tree only */
  int *pnOvfl;                    /* Number of free-list entries to store */
  void *pSystemVal;               /* Pointer to buffer to free */



};

#define CURSOR_DATA_TREE0     0   /* Current tree cursor */
#define CURSOR_DATA_TREE1     1   /* The "old" tree, if any */
#define CURSOR_DATA_SYSTEM    2
#define CURSOR_DATA_SEGMENT   3

................................................................................
**
** CURSOR_PREV_OK
**   Set if it is Ok to call lsm_csr_prev().
**
** CURSOR_READ_SEPARATORS
**   Set if this cursor should visit the separator keys in segment 
**   aPtr[nPtr-1].





*/
#define CURSOR_IGNORE_DELETE    0x00000001
#define CURSOR_NEW_SYSTEM       0x00000002
#define CURSOR_AT_FREELIST      0x00000004
#define CURSOR_IGNORE_SYSTEM    0x00000010
#define CURSOR_NEXT_OK          0x00000020
#define CURSOR_PREV_OK          0x00000040
#define CURSOR_READ_SEPARATORS  0x00000080


typedef struct MergeWorker MergeWorker;
typedef struct Hierarchy Hierarchy;

struct Hierarchy {
  Page **apHier;
  int nHier;
................................................................................
    }
    if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT;

    rc = pageGetBtreeKey(
        pCsr->aPg[iPg].pPage, iCell,
        &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
    );
    pCsr->eType |= SORTED_SEPARATOR;
  }

  return rc;
}

static int btreeCursorPtr(u8 *aData, int nData, int iCell){
  int nCell;
................................................................................
          if( pCsr->aPg[i].iCell>0 ) break;
        }
        assert( i>=0 );
        rc = pageGetBtreeKey(
            pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1,
            &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
        );
        pCsr->eType |= SORTED_SEPARATOR;

      }else{
        rc = btreeCursorLoadKey(pCsr);
      }
    }
  }
  return rc;
................................................................................

static int segmentPtrAdvance(
  MultiCursor *pCsr, 
  SegmentPtr *pPtr,
  int bReverse
){
  int eDir = (bReverse ? -1 : 1);

  do {
    int rc;
    int iCell;                    /* Number of new cell in page */


    iCell = pPtr->iCell + eDir;
    assert( pPtr->pPg );
    assert( iCell<=pPtr->nCell && iCell>=-1 );






    if( iCell>=pPtr->nCell || iCell<0 ){
      do {
        rc = segmentPtrNextPage(pPtr, eDir); 
      }while( rc==LSM_OK 
           && pPtr->pPg 
           && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG) ) 
      );
      if( rc!=LSM_OK ) return rc;
      iCell = bReverse ? (pPtr->nCell-1) : 0;
    }
    rc = segmentPtrLoadCell(pPtr, iCell);
    if( rc!=LSM_OK ) return rc;
















  }while( pCsr 
       && pPtr->pPg 
       && segmentPtrIgnoreSeparators(pCsr, pPtr)
       && rtIsSeparator(pPtr->eType)
  );

  return LSM_OK;
................................................................................
** points at either the first (bLast==0) or last (bLast==1) cell in the valid
** region of the segment defined by pPtr->iFirst and pPtr->iLast.
**
** Return LSM_OK if successful or an lsm error code if something goes
** wrong (IO error, OOM etc.).
*/
static int segmentPtrEnd(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){

  int rc = LSM_OK;
  FileSystem *pFS = pCsr->pDb->pFS;
  int bIgnore;

  segmentPtrEndPage(pFS, pPtr, bLast, &rc);
  while( rc==LSM_OK && pPtr->pPg 
      && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
................................................................................
  }

  bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr);
  if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){
    rc = segmentPtrAdvance(pCsr, pPtr, bLast);
  }














  return rc;
}

static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){
  assert( pPtr->pPg );
  *ppKey = pPtr->pKey;
  *pnKey = pPtr->nKey;
................................................................................
    if( eSeek==LSM_SEEK_GE ) return (res<=0);
  }

  return 1;
}
#endif

int segmentPtrSeek(
  MultiCursor *pCsr,              /* Cursor context */
  SegmentPtr *pPtr,               /* Pointer to seek */
  void *pKey, int nKey,           /* Key to seek to */
  int eSeek,                      /* Search bias - see above */
  int *piPtr                      /* OUT: FC pointer */
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res;                        /* Result of comparison operation */
  int rc = LSM_OK;
  int iMin;
  int iMax;
  int iPtrOut = 0;

  const int iTopic = 0;

  /* If the OVERSIZED flag is set, then there is no pointer in the
  ** upper level to the next page in the segment that contains at least
  ** one key. So compare the largest key on the current page with the
  ** key being sought (pKey/nKey). If (pKey/nKey) is larger, advance
  ** to the next page in the segment that contains at least one key. 
  */
................................................................................
        pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1
    );

    /* If the loaded key is >= than (pKey/nKey), break out of the loop.
    ** If (pKey/nKey) is present in this array, it must be on the current 
    ** page.  */
    res = sortedKeyCompare(
        xCmp, iLastTopic, pLastKey, nLastKey, iTopic, pKey, nKey
    );
    if( res>=0 ) break;

    /* Advance to the next page that contains at least one key. */
    pNext = pPtr->pPg;
    lsmFsPageRef(pNext);
    while( 1 ){
................................................................................
    if( pNext==0 ) break;
    segmentPtrSetPage(pPtr, pNext);

    /* This should probably be an LSM_CORRUPT error. */
    assert( rc!=LSM_OK || (pPtr->flags & PGFTR_SKIP_THIS_FLAG) );
  }








































































































































































  iPtrOut = pPtr->iPtr;

  /* Assert that this page is the right page of this segment for the key
  ** that we are searching for. Do this by loading page (iPg-1) and testing
  ** that pKey/nKey is greater than all keys on that page, and then by 
  ** loading (iPg+1) and testing that pKey/nKey is smaller than all
  ** the keys it houses.  */



#if 0
  assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) );
#endif

  assert( pPtr->nCell>0 
       || pPtr->pSeg->nSize==1 
       || lsmFsPageNumber(pPtr->pPg)==pPtr->pSeg->iLast
................................................................................
    }

    if( rc==LSM_OK ){
      assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) );
      if( res ){
        rc = segmentPtrLoadCell(pPtr, iMin);
      }


      if( rc==LSM_OK ){
        switch( eSeek ){
          case LSM_SEEK_EQ:
















            if( res!=0 || rtIsSeparator(pPtr->eType) ) segmentPtrReset(pPtr);
            break;

          case LSM_SEEK_LE:
            if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1);
            break;
          case LSM_SEEK_GE:







            if( res<0 ) rc = segmentPtrAdvance(pCsr, pPtr, 0);

            break;
        }
      }
    }


    /* If the cursor seek has found a separator key, and this cursor is
    ** supposed to ignore separators keys, advance to the next entry.  */
    if( rc==LSM_OK && pPtr->pPg 
     && segmentPtrIgnoreSeparators(pCsr, pPtr) 
     && rtIsSeparator(pPtr->eType)
    ){

      rc = segmentPtrAdvance(pCsr, pPtr, eSeek==LSM_SEEK_LE);
    }
  }

  assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
  *piPtr = iPtrOut;
  return rc;
................................................................................

static int seekInSegment(
  MultiCursor *pCsr, 
  SegmentPtr *pPtr,
  void *pKey, int nKey,
  int iPg,                        /* Page to search */
  int eSeek,                      /* Search bias - see above */
  int *piPtr                      /* OUT: FC pointer */

){
  int iPtr = iPg;
  int rc = LSM_OK;

  if( pPtr->pSeg->iRoot ){
    Page *pPg;
    assert( pPtr->pSeg->iRoot!=0 );
................................................................................
    }
    if( rc==LSM_OK ){
      rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iPtr);
    }
  }

  if( rc==LSM_OK ){
    rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, eSeek, piPtr);
  }
  return rc;
}

/*
** Seek each segment pointer in the array of (pLvl->nRight+1) at aPtr[].











*/
static int seekInLevel(
  MultiCursor *pCsr,              /* Sorted cursor object to seek */
  Level *pLvl,                    /* Level to seek within */
  SegmentPtr *aPtr,               /* Pointer to array of (nRight+1) SPs */
  int eSeek,                      /* Search bias - see above */
  void *pKey, int nKey,           /* Key to search for */
  Pgno *piPgno                    /* IN/OUT: fraction cascade pointer (or 0) */

){

  int rc = LSM_OK;                /* Return code */
  int iOut = 0;                   /* Pointer to return to caller */
  int res = -1;                   /* Result of xCmp(pKey, split) */
  int nRhs = pLvl->nRight;        /* Number of right-hand-side segments */


  /* If this is a composite level (one currently undergoing an incremental
  ** merge), figure out if the search key is larger or smaller than the
  ** levels split-key.  */
  if( nRhs ){
    res = sortedKeyCompare(pCsr->pDb->xCmp, 0, pKey, nKey, 
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
................................................................................
  /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
  ** is not a composite level and there is no split-key). Search the 
  ** left-hand-side of the level in this case.  */
  if( res<0 ){
    int iPtr = 0;
    if( nRhs==0 ) iPtr = *piPgno;

    rc = seekInSegment(pCsr, &aPtr[0], pKey, nKey, iPtr, eSeek, &iOut);
    if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
      res = 0;
    }
  }
  
  if( res>=0 ){
    int iPtr = *piPgno;
    int i;
    for(i=1; rc==LSM_OK && i<=nRhs; i++){
      iOut = 0;
      rc = seekInSegment(pCsr, &aPtr[i], pKey, nKey, iPtr, eSeek, &iOut);
      iPtr = iOut;
    }

    if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){
      rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
    }
  }


  *piPgno = iOut;

  return rc;
}

static void multiCursorGetKey(
  MultiCursor *pCsr, 
  int iKey,
  int *peType,                    /* OUT: Key type (SORTED_WRITE etc.) */
................................................................................
    case CURSOR_DATA_TREE0:
    case CURSOR_DATA_TREE1: {
      TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
      if( lsmTreeCursorValid(pTreeCsr) ){
        int nVal;
        void *pVal;

        lsmTreeCursorKey(pTreeCsr, &pKey, &nKey);
        lsmTreeCursorValue(pTreeCsr, &pVal, &nVal);
        eType = (nVal<0) ? SORTED_DELETE : SORTED_WRITE;
      }
      break;
    }

    case CURSOR_DATA_SYSTEM:
      if( pCsr->flags & CURSOR_AT_FREELIST ){
        pKey = (void *)"FREELIST";
        nKey = 8;
        eType = SORTED_SYSTEM_WRITE;
      }
      break;

    default: {
      int iPtr = iKey - CURSOR_DATA_SEGMENT;
      assert( iPtr>=0 );
      if( iPtr==pCsr->nPtr ){
................................................................................
  }

  if( peType ) *peType = eType;
  if( pnKey ) *pnKey = nKey;
  if( ppKey ) *ppKey = pKey;
}
































static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){
  int i1;
  int i2;
  int iRes;
  void *pKey1; int nKey1; int eType1;
  void *pKey2; int nKey2; int eType2;

  int mul = (bReverse ? -1 : 1);

  assert( pCsr->aTree && iOut<pCsr->nTree );
  if( iOut>=(pCsr->nTree/2) ){
    i1 = (iOut - pCsr->nTree/2) * 2;
    i2 = i1 + 1;
  }else{
    i1 = pCsr->aTree[iOut*2];
................................................................................
  if( pKey1==0 ){
    iRes = i2;
  }else if( pKey2==0 ){
    iRes = i1;
  }else{
    int res;

    res = (rtTopic(eType1) - rtTopic(eType2));
    if( res==0 ){
      res = pCsr->pDb->xCmp(pKey1, nKey1, pKey2, nKey2);

    }
    res = res * mul;

    if( res==0 ){
      iRes = (rtIsSeparator(eType1) ? i2 : i1);
    }else if( res<0 ){
      iRes = i1;
    }else{
      iRes = i2;
    }
  }

  pCsr->aTree[iOut] = iRes;
}

static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){
  int rc;
  rc = segmentPtrEnd(pCsr, pPtr, 0);
  while( pPtr->pPg && rc==LSM_OK ){
    int res = sortedKeyCompare(pCsr->pDb->xCmp,
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey,
        rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
    );
    if( res<=0 ) break;
    rc = segmentPtrAdvance(pCsr, pPtr, 0);
  }
  return rc;
}

/*
** This function advances segment pointer iPtr belonging to multi-cursor
** pCsr forward (bReverse==0) or backward (bReverse!=0).
**
** If the segment pointer points to a segment that is part of a composite
** level, then the following special cases are handled.
**
**   * If iPtr is the lhs of a composite level, and the cursor is being
**     advanced forwards, and segment iPtr is at EOF, move all pointers
**     that correspond to rhs segments of the same level to the first
**     key in their respective data.
**
**   * If iPtr is on the rhs of a composite level, and the cursor is being
**     reversed, and the new key is smaller than the split-key, set the
**     segment pointer to point to EOF.
*/
static int segmentCursorAdvance(
  MultiCursor *pCsr, 
  int iPtr,
  int bReverse
){
  int rc;
................................................................................
  bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight);

  if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
   && bReverse==0                                /* csr advanced forwards */
   && pPtr->pPg==0                               /* segment at EOF */
  ){
    int i;

    for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
      rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
    }

    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, 0);
    }
  }

  else if( pPtr->pPg && bComposite && bReverse && pPtr->pSeg!=&pLvl->lhs ){
    int res = sortedKeyCompare(pCsr->pDb->xCmp,
        rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
    );
    if( res<0 ){
      segmentPtrReset(pPtr);
    }
  }

  return rc;
}

static void mcursorFreeComponents(MultiCursor *pCsr){
  int i;
  lsm_env *pEnv = pCsr->pDb->pEnv;

................................................................................
    rc = multiCursorAddAll(pCsr, pDb->pWorker);
    pCsr->flags |= CURSOR_IGNORE_DELETE;
  }
  
  if( rc==LSM_OK ){
    rc = lsmMCursorLast(pCsr);
    if( rc==LSM_OK 
     && pCsr->eType==SORTED_SYSTEM_WRITE 
     && pCsr->key.nData==8 
     && 0==memcmp(pCsr->key.pData, "FREELIST", 8)
    ){
      void *pVal; int nVal;         /* Value read from database */
      rc = lsmMCursorValue(pCsr, &pVal, &nVal);
      if( rc==LSM_OK ){
        *ppVal = lsmMallocRc(pDb->pEnv, nVal, &rc);
................................................................................
  if( *pRc==LSM_OK ){
    void *pKey;
    int nKey;
    multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pKey, &nKey);
    *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pKey, nKey);
  }
}






































static int multiCursorEnd(MultiCursor *pCsr, int bLast){
  int rc = LSM_OK;
  int i;

  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);


  if( pCsr->apTreeCsr[0] ){
    rc = lsmTreeCursorEnd(pCsr->apTreeCsr[0], bLast);
  }
  if( rc==LSM_OK && pCsr->apTreeCsr[1] ){
    rc = lsmTreeCursorEnd(pCsr->apTreeCsr[1], bLast);
  }

................................................................................
  if( rc==LSM_OK ){
    rc = multiCursorAllocTree(pCsr);
  }
  if( rc==LSM_OK ){
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, bLast);
    }
    pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK);
  }

  multiCursorCacheKey(pCsr, &rc);
  if( rc==LSM_OK && (
        ((pCsr->flags & CURSOR_IGNORE_DELETE) && rtIsDelete(pCsr->eType))
     || ((pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(pCsr->eType))
  )){
    if( bLast ){
      rc = lsmMCursorPrev(pCsr);
    }else{
      rc = lsmMCursorNext(pCsr);
    }
  }

................................................................................
  lsmTreeCursorReset(pCsr->apTreeCsr[1]);
  for(i=0; i<pCsr->nPtr; i++){
    segmentPtrReset(&pCsr->aPtr[i]);
  }
  pCsr->key.nData = 0;
}

static void treeCursorSeek(

  TreeCursor *pTreeCsr, 
  void *pKey, int nKey, 
  int eSeek

){

  if( pTreeCsr ){
    int res = 0;
    lsmTreeCursorSeek(pTreeCsr, pKey, nKey, &res);
    switch( eSeek ){
      case LSM_SEEK_EQ:
        if( res!=0 ){
          lsmTreeCursorReset(pTreeCsr);














        }

        break;

      case LSM_SEEK_GE:
        if( res<0 && lsmTreeCursorValid(pTreeCsr) ){
          lsmTreeCursorNext(pTreeCsr);
        }
        break;
      default:
        if( res>0 ){
          assert( lsmTreeCursorValid(pTreeCsr) );
          lsmTreeCursorPrev(pTreeCsr);
        }
        break;
    }
  }

}


static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
  int eType = pCsr->eType;

  if( ((pCsr->flags & CURSOR_IGNORE_DELETE) && rtIsDelete(eType) && !bDeleteOk)
   || ((pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(pCsr->eType)!=0)
  ){
    return 0;
  }

  return 1;
}

/*
** Seek the cursor.
*/
int lsmMCursorSeek(MultiCursor *pCsr, void *pKey, int nKey, int eSeek){
  int eESeek = eSeek;             /* Effective eSeek parameter */
  int rc = LSM_OK;
  int iPtr = 0;
  Pgno iPgno = 0; 
  Level *pLvl;

  if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;
  assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );

  assert( (pCsr->flags & CURSOR_NEW_SYSTEM)==0 );
  assert( (pCsr->flags & CURSOR_AT_FREELIST)==0 );
  assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel );

  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);
  treeCursorSeek(pCsr->apTreeCsr[0], pKey, nKey, eESeek);

  treeCursorSeek(pCsr->apTreeCsr[1], pKey, nKey, eESeek);


  /* Seek all segment pointers. */
  for(pLvl=pCsr->pDb->pClient->pLevel; rc==LSM_OK && pLvl; pLvl=pLvl->pNext){

    SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
    iPtr += (1+pLvl->nRight);
    assert( pPtr->pSeg==&pLvl->lhs );
    rc = seekInLevel(pCsr, pLvl, pPtr, eESeek, pKey, nKey, &iPgno);

  }


  if( rc==LSM_OK ){
    rc = multiCursorAllocTree(pCsr);
  }
  if( rc==LSM_OK ){
    int i;
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, eESeek==LSM_SEEK_LE);
    }
    if( eSeek==LSM_SEEK_GE ) pCsr->flags |= CURSOR_NEXT_OK;
    if( eSeek==LSM_SEEK_LE ) pCsr->flags |= CURSOR_PREV_OK;
  }

  multiCursorCacheKey(pCsr, &rc);
  if( rc==LSM_OK && 0==mcursorLocationOk(pCsr, eSeek==LSM_SEEK_LEFAST) ){
    switch( eESeek ){
      case LSM_SEEK_EQ:
        lsmMCursorReset(pCsr);
        break;
      case LSM_SEEK_GE:
        rc = lsmMCursorNext(pCsr);
        break;
      default:
        rc = lsmMCursorPrev(pCsr);
        break;
    }
  }


  return rc;
}

int lsmMCursorValid(MultiCursor *pCsr){
  int res = 0;


  if( pCsr->aTree ){
    int iKey = pCsr->aTree[1];
    if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
      res = lsmTreeCursorValid(pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]);
    }else{
      void *pKey; 
      multiCursorGetKey(pCsr, iKey, 0, &pKey, 0);
      res = pKey!=0;
................................................................................

  /* Check the current key value. If it is not greater than (if bReverse==0)
  ** or less than (if bReverse!=0) the key currently cached in pCsr->key, 
  ** then the cursor has not yet been successfully advanced.  
  */
  multiCursorGetKey(pCsr, pCsr->aTree[1], &eNewType, &pNew, &nNew);
  if( pNew ){
    int res = rtTopic(eNewType) - rtTopic(pCsr->eType);
    if( res==0 ){
      res = pCsr->pDb->xCmp(pNew, nNew, pCsr->key.pData, pCsr->key.nData);
    }

    if( (bReverse==0 && res<=0) || (bReverse!=0 && res>=0) ){
      return 0;
    }
  }

  multiCursorCacheKey(pCsr, pRc);
  assert( pCsr->eType==eNewType );

  /* If this cursor is configured to skip deleted keys, and the current
  ** cursor points to a SORTED_DELETE entry, then the cursor has not been 
  ** successfully advanced.  
  **
  ** Similarly, if the cursor is configured to skip system keys and the
  ** current cursor points to a system key, it has not yet been advanced.
  */
  if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0;

  return 1;
}

static int multiCursorAdvance(MultiCursor *pCsr, int bReverse){
  int rc = LSM_OK;                /* Return Code */
  if( lsmMCursorValid(pCsr) ){
    do {
      int iKey = pCsr->aTree[1];
















      if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
        TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
        if( bReverse ){
          rc = lsmTreeCursorPrev(pTreeCsr);
        }else{
          rc = lsmTreeCursorNext(pTreeCsr);
        }
................................................................................
        pCsr->flags &= ~CURSOR_AT_FREELIST;
      }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
        assert( bReverse==0 && pCsr->pBtCsr );
        rc = btreeCursorNext(pCsr->pBtCsr);
      }else{
        rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse);
      }

      if( rc==LSM_OK ){
        int i;
        for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){
          multiCursorDoCompare(pCsr, i, bReverse);
        }
      }
    }while( mcursorAdvanceOk(pCsr, bReverse, &rc)==0 );
................................................................................

int lsmMCursorPrev(MultiCursor *pCsr){
  if( (pCsr->flags & CURSOR_PREV_OK)==0 ) return LSM_MISUSE_BKPT;
  return multiCursorAdvance(pCsr, 1);
}

int lsmMCursorKey(MultiCursor *pCsr, void **ppKey, int *pnKey){




  int iKey = pCsr->aTree[1];

  if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
    TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
    lsmTreeCursorKey(pTreeCsr, ppKey, pnKey);
  }else{
    int nKey;

#ifndef NDEBUG
    void *pKey;
    int eType;
    multiCursorGetKey(pCsr, iKey, &eType, &pKey, &nKey);
................................................................................
    if( nKey==0 ){
      *ppKey = 0;
    }else{
      *ppKey = pCsr->key.pData;
    }
    *pnKey = nKey; 
  }

  return LSM_OK;
}

int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){
  void *pVal;
  int nVal;
  int rc;






  assert( pCsr->aTree );
  assert( rtIsDelete(pCsr->eType)==0 || !(pCsr->flags & CURSOR_IGNORE_DELETE) );

  rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pVal, &nVal);
  if( pVal && rc==LSM_OK ){
    rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pVal, nVal);
    pVal = pCsr->val.pData;
  }

  if( rc!=LSM_OK ){
    pVal = 0;
    nVal = 0;
  }

  *ppVal = pVal;
  *pnVal = nVal;
  return rc;
}

int lsmMCursorType(MultiCursor *pCsr, int *peType){
  assert( pCsr->aTree );
................................................................................
  int nKey;
  int eType;

  nRec = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
  iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec-1)]);
  eType = aData[iOff++];
  assert( eType==0 
       || eType==SORTED_SEPARATOR 
       || eType==SORTED_SYSTEM_SEPARATOR 
  );

  iOff += lsmVarintGet32(&aData[iOff], &nKey);
  iOff += lsmVarintGet32(&aData[iOff], &nKey);

  return iOff + (eType ? nKey : 0);
}
................................................................................
  lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], nRec+1);

  if( bIndirect ){
    aData[iOff++] = 0x00;
    iOff += lsmVarintPut32(&aData[iOff], iPtr);
    iOff += lsmVarintPut32(&aData[iOff], iKeyPg);
  }else{
    aData[iOff++] = (u8)(iTopic | SORTED_SEPARATOR);
    iOff += lsmVarintPut32(&aData[iOff], iPtr);
    iOff += lsmVarintPut32(&aData[iOff], nKey);
    memcpy(&aData[iOff], pKey, nKey);
  }

  if( iLevel>0 ){
    int iRight = lsmFsPageNumber(p->apHier[iLevel-1]);
................................................................................
  pSeg = &pMW->pLevel->lhs;

  pPg = pMW->pPage;
  aData = fsPageData(pPg, &nData);
  nRec = pageGetNRec(aData, nData);
  iFPtr = pageGetPtr(aData, nData);

  /* If iPtr is 0, set it to the same value as the absolute pointer 
  ** stored as part of the previous record.  */
  if( iPtr==0 ){
    iPtr = iFPtr;
    if( nRec ) iPtr += pageGetRecordPtr(aData, nData, nRec-1);
  }

  /* Calculate the relative pointer value to write to this record */
  iRPtr = iPtr - iFPtr;
  /* assert( iRPtr>=0 ); */
     
  /* Figure out how much space is required by the new record. The space
  ** required is divided into two sections: the header and the body. The
  ** header consists of the intial varint fields. The body are the blobs 
................................................................................
    nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
    if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

    /* If the entire header will not fit on page pPg, or if page pPg is 
     ** marked read-only, advance to the next page of the output run. */
    iOff = pMerge->iOutputOff;
    if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
      iFPtr = iFPtr + (nRec ? pageGetRecordPtr(aData, nData, nRec-1) : 0);
      iRPtr = iPtr - iFPtr;

      iOff = 0;
      nRec = 0;
      rc = mergeWorkerNextPage(pMW, iFPtr);
      pPg = pMW->pPage;
    }
  }

................................................................................
      iFPtr = pageGetPtr(aData, nData);
      lsmFsPageRelease(pPg);
    }
  }

  if( rc==LSM_OK ){
    rc = mergeWorkerNextPage(pMW, iFPtr);

  }

  return rc;
}









































static int mergeWorkerStep(MergeWorker *pMW){
  lsm_db *pDb = pMW->pDb;       /* Database handle */
  MultiCursor *pCsr;            /* Cursor to read input data from */
  int rc = LSM_OK;              /* Return code */
  int eType;                    /* SORTED_SEPARATOR, WRITE or DELETE */
  void *pKey; int nKey;         /* Key */
  Segment *pSeg;                /* Output segment */
  int iPtr = 0;

  pCsr = pMW->pCsr;
  pSeg = &pMW->pLevel->lhs;

  /* Pull the next record out of the source cursor. */
  lsmMCursorKey(pCsr, &pKey, &nKey);
  eType = pCsr->eType;
................................................................................

  /* Figure out if the output record may have a different pointer value
  ** than the previous. This is the case if the current key is identical to
  ** a key that appears in the lowest level run being merged. If so, set 
  ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, 
  ** indicating that the output pointer value should be a copy of the pointer 
  ** value written with the previous key.  */

  if( pCsr->pBtCsr ){
    BtreeCursor *pBtCsr = pCsr->pBtCsr;
    if( pBtCsr->pKey ){
      int res = rtTopic(pBtCsr->eType) - rtTopic(eType);
      if( res==0 ) res = pDb->xCmp(pBtCsr->pKey, pBtCsr->nKey, pKey, nKey);
      if( 0==res ) iPtr = pBtCsr->iPtr;

      assert( res>=0 );
    }
  }else if( pCsr->nPtr ){
    SegmentPtr *pPtr = &pCsr->aPtr[pCsr->nPtr-1];
    if( pPtr->pPg
     && 0==pDb->xCmp(pPtr->pKey, pPtr->nKey, pKey, nKey)
    ){
      iPtr = pPtr->iPtr+pPtr->iPgPtr;
    }
  }




  if( pMW->aGobble ){
    int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
    if( iGobble<pCsr->nPtr ){
      SegmentPtr *pGobble = &pCsr->aPtr[iGobble];
      if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
        pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg);
      }
................................................................................
    }

    /* Write the record into the main run. */
    if( rc==LSM_OK ){
      rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pCsr, iPtr, &iSPtr);
    }
  }


  /* Advance the cursor to the next input record (assuming one exists). */
  assert( lsmMCursorValid(pMW->pCsr) );
  if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr);

  /* If the cursor is at EOF, the merge is finished. Release all page
  ** references currently held by the merge worker and inform the 
................................................................................
  int *pnWrite                    /* OUT: Number of database pages written */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int iLeftPtr = 0;
  int nWrite = 0;                 /* Number of database pages written */

  assert( pnOvfl );

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
................................................................................
    memset(&merge, 0, sizeof(Merge));
    memset(&mergeworker, 0, sizeof(MergeWorker));

    pNew->pMerge = &merge;
    mergeworker.pDb = pDb;
    mergeworker.pLevel = pNew;
    mergeworker.pCsr = pCsr;


    /* Mark the separators array for the new level as a "phantom". */
    mergeworker.bFlush = 1;

    /* Allocate the first page of the output segment. */
    rc = mergeWorkerNextPage(&mergeworker, iLeftPtr);

................................................................................
  }

  if( rc==LSM_OK ){
    sortedInvokeWorkHook(pDb);
  }

#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
#endif

  if( pnWrite ) *pnWrite = nWrite;
  pDb->pWorker->nWrite += nWrite;
  return rc;
}

................................................................................
  pMW->pCsr = pCsr;

  /* Load the current output page into memory. */
  if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW);

  /* Position the cursor. */
  if( rc==LSM_OK ){

    if( pMW->pPage==0 ){
      /* The output array is still empty. So position the cursor at the very 
      ** start of the input.  */
      rc = multiCursorEnd(pCsr, 0);
    }else{
      /* The output array is non-empty. Position the cursor based on the
      ** page/cell data saved in the Merge.aInput[] array.  */
................................................................................
            SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i];
            if( pGobble->pSeg->iRoot ){
              rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i);
            }else if( mergeworker.aGobble[i] ){
              lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1);
            }
          }
#if 0
          int iGobble = mergeworker.pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
          if( iGobble<pLevel->nRight ){
            SegmentPtr *pGobble = &mergeworker.pCsr->aSegCsr[iGobble].aPtr[0];
            if( pGobble->pSeg->iRoot ){
              rc = sortedBtreeGobble(pDb, mergeworker.pCsr, iGobble);
            }else if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
              Pgno iPg = lsmFsPageNumber(pGobble->pPg);
              lsmFsGobble(pDb, pGobble->pSeg, &iPg, 1);
            }
          }
#endif

        }else if( pLevel->lhs.iFirst==0 ){
          /* If the new level is completely empty, remove it from the 
          ** database snapshot. This can only happen if all input keys were
          ** annihilated. Since keys are only annihilated if the new level
          ** is the last in the linked list (contains the most ancient of
          ** database content), this guarantees that pLevel->pNext==0.  */ 

................................................................................

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);






      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
      ** zero. The caller has an in-memory tree to flush to disk.  */
      if( bFlush && sortedDbIsFull(pDb)==0 ) break;

      assertRunInOrder(pDb, &pLevel->lhs);

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif
    }
  }

  if( pnWrite ) *pnWrite = (nWork - nRemaining);
  pWorker->nWrite += (nWork - nRemaining);

#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining));
#endif
  return rc;
}

typedef struct Metric Metric;
struct Metric {
  double fAvgHeight;
  int nTotalSz;
  int nMinSz;
};

#if 0
static void sortedMeasureDb(lsm_db *pDb, Metric *p){
  Level *pLevel;                  /* Used to iterate through levels */
  assert( pDb->pWorker );

  double fHeight = 0.0;
  int nTotalSz = 0;
  int nMinSz = 0;

  for(pLevel=lsmDbSnapshotLevel(pDb->pWorker); pLevel; pLevel=pLevel->pNext){
    const int nLhsSz = pLevel->lhs.run.nSize;
    int nRhsSz = 0;
    int nSz;
    int i;

    for(i=0; i<pLevel->nRight; i++){
      nRhsSz += pLevel->aRhs[i].run.nSize;
    }

    nSz = nRhsSz + nLhsSz;
    fHeight += (double)nLhsSz/nSz + (double)nRhsSz/nSz * pLevel->nRight;
    nTotalSz += nSz;

    if( nMinSz==0 || nMinSz>nSz ) nMinSz = nSz;
  }

  if( nMinSz ){
    printf("avg-height=%.2f log2(min/total)=%.2f totalsz=%d minsz=%d\n", 
        fHeight, 
        log((double)nTotalSz / nMinSz) / log(2),
        nTotalSz,
        nMinSz
    );
  }
}
#endif

/*
** The database connection passed as the first argument must be a worker
** connection. This function checks if there exists an "old" in-memory tree
** ready to be flushed to disk. If so, *pbOut is set to true before 
** returning. Otherwise false.
**
** Normally, LSM_OK is returned. Or, if an error occurs, an LSM error code.
................................................................................
    }else{
      lsmStringAppendf(pStr, "%c", isalnum(z[iChar]) ?z[iChar] : '.');
    }
  }
  return LSM_OK;
}

int lsmInfoPageDump(lsm_db *pDb, Pgno iPg, int bHex, char **pzOut){









  int rc = LSM_OK;                /* Return code */
  Page *pPg = 0;                  /* Handle for page iPg */
  int i, j;                       /* Loop counters */
  const int perLine = 16;         /* Bytes per line in the raw hex dump */





  *pzOut = 0;
  if( iPg==0 ) return LSM_ERROR;

  rc = lsmFsDbPageGet(pDb->pFS, iPg, &pPg);
  if( rc==LSM_OK ){
    Blob blob = {0, 0, 0, 0};
................................................................................
    for(iCell=0; iCell<nRec; iCell++){
      u8 *aKey; int nKey = 0;       /* Key */
      u8 *aVal; int nVal = 0;       /* Value */
      int iPgPtr;
      int eType;
      char cType = '?';
      Pgno iAbsPtr;


      infoCellDump(pDb, pPg, iCell, &eType, &iPgPtr,
          &aKey, &nKey, &aVal, &nVal, &blob
      );
      iAbsPtr = iPgPtr + ((flags & SEGMENT_BTREE_FLAG) ? 0 : iPtr);

      if( rtIsDelete(eType) ) cType = 'D';
      if( rtIsWrite(eType) ) cType = 'W';
      if( rtIsSeparator(eType) ) cType = 'S';
      lsmStringAppendf(&str, "%c %d (%s) ", 
          cType, iAbsPtr, (rtTopic(eType) ? "sys" : "usr")
      );
      infoAppendBlob(&str, bHex, aKey, nKey); 
      if( nVal>0 ){
        lsmStringAppendf(&str, "%*s", nKeyWidth - (nKey*(1+bHex)), "");
        lsmStringAppendf(&str, " ");
        infoAppendBlob(&str, bHex, aVal, nVal); 
      }
      lsmStringAppendf(&str, "\n");
    }


    lsmStringAppendf(&str, "\n-------------------" 
       "-------------------------------------------------------------\n");
    lsmStringAppendf(&str, "Page %d\n",
                     iPg, (iPg-1)*nData, iPg*nData - 1);
    for(i=0; i<nData; i += perLine){
      lsmStringAppendf(&str, "%04x: ", i);
      for(j=0; j<perLine; j++){
................................................................................
          lsmStringAppendf(&str, " ");
        }else{
          lsmStringAppendf(&str,"%c", isprint(aData[i+j]) ? aData[i+j] : '.');
        }
      }
      lsmStringAppendf(&str,"\n");
    }


    *pzOut = str.z;
    sortedBlobFree(&blob);
    lsmFsPageRelease(pPg);
  }

  return rc;
}












void sortedDumpSegment(lsm_db *pDb, Segment *pRun, int bVals){
  assert( pDb->xLog );
  if( pRun && pRun->iFirst ){

    char *zSeg;
    Page *pPg;

    zSeg = segToString(pDb->pEnv, pRun, 0);
    lsmLogMessage(pDb, LSM_OK, "Segment: %s", zSeg);
    lsmFree(pDb->pEnv, zSeg);

    lsmFsDbPageGet(pDb->pFS, pRun->iFirst, &pPg);
    while( pPg ){
      Page *pNext;





      sortedDumpPage(pDb, pRun, pPg, bVals);

      lsmFsDbPageNext(pRun, pPg, 1, &pNext);
      lsmFsPageRelease(pPg);
      pPg = pNext;
    }
  }
}








|
|

|
|
|
|
|
>

<
|
|
|
|
|



|


|

<
<
<
<
<
<
<
<
<
<
<
<






<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<




|
|
>
|
|
>







 







>
>
>







 







>
>
>
>
>








>







 







|







 







|







 







>



>




>
>
>
>
>













>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>







 







>
>
>
>
>
>
>
>
>
>
>
>
>







 







|


|
<
<


<

<
<
<
<
<







 







|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>






|
>
>
>







 







>



|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|

>



|
>
>
>
>
>
>
>
|
>




>







>







 







|
>







 







|






>
>
>
>
>
>
>
>
>
>
>



<
|


|
>

>




>







 







|








|

|








>

>







 







|

<








|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







<
|







 







|
|
|
>
|

<












<
<
<
<
<
<
<
<
<
<
<
<
<
<





|





<
<
<
<







 







<



<





<
<
<
<
<
<
<
<
<
<







 







|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>






>
>







 







<



|
<
<
<







 







|
>


|
>

>




|
<
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>

>

>













>



<
<
<
<
<
<
<
<
<
<
<
<





|
|
|
|
|
|
|
|




|
|
>
|
|
>

<
>

<
|
|
>


>













|












>






>
>
|







 







|
<
|
<
>



<












>








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







<







 







>
>
>
>




|







 







>







>
>
>
>
>

|
|











>







 







|
|







 







|







 







<
<
<
<
<
<
<







 







|

>







 







>




>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>








|







 







>






<











>
>
>







 







>







 







|







 







>







 







|







 







>







 







<
<
<
<
<
<
<
<
<
<
<
<
<







 







>
>
>
>
>




<
<
<
<
<
<












<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







|
>
>
>
>
>
>
>
>
>




>
>
>
>







 







>






|
<
<
|
|


|







>







 







>








>
>
>
>
>
>
>
>
>
>
>




>










>
>
>
>
>

>







38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

55
56
57
58
59
60
61
62
63
64
65
66
67












68
69
70
71
72
73

















74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
...
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
...
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
...
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
...
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
....
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
....
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
....
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
....
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320


1321
1322

1323





1324
1325
1326
1327
1328
1329
1330
....
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
....
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
....
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
....
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
....
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788

1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
....
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
....
1856
1857
1858
1859
1860
1861
1862
1863
1864

1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
....
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939

1940
1941
1942
1943
1944
1945
1946
1947
....
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966

1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978














1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989




1990
1991
1992
1993
1994
1995
1996
....
2003
2004
2005
2006
2007
2008
2009

2010
2011
2012

2013
2014
2015
2016
2017










2018
2019
2020
2021
2022
2023
2024
....
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
....
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
....
2435
2436
2437
2438
2439
2440
2441

2442
2443
2444
2445



2446
2447
2448
2449
2450
2451
2452
....
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532

2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568












2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592

2593
2594

2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
....
2659
2660
2661
2662
2663
2664
2665
2666

2667

2668
2669
2670
2671

2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
....
2720
2721
2722
2723
2724
2725
2726

2727
2728
2729
2730
2731
2732
2733
....
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
....
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
....
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
....
3152
3153
3154
3155
3156
3157
3158
3159
3160
3161
3162
3163
3164
3165
3166
....
3298
3299
3300
3301
3302
3303
3304







3305
3306
3307
3308
3309
3310
3311
....
3325
3326
3327
3328
3329
3330
3331
3332
3333
3334
3335
3336
3337
3338
3339
3340
3341
....
3501
3502
3503
3504
3505
3506
3507
3508
3509
3510
3511
3512
3513
3514
3515
3516
3517
3518
3519
3520
3521
3522
3523
3524
3525
3526
3527
3528
3529
3530
3531
3532
3533
3534
3535
3536
3537
3538
3539
3540
3541
3542
3543
3544
3545
3546
3547
3548
3549
3550
3551
3552
3553
3554
3555
3556
3557
3558
3559
3560
3561
3562
3563
3564
3565
3566
3567
3568
....
3569
3570
3571
3572
3573
3574
3575
3576
3577
3578
3579
3580
3581
3582

3583
3584
3585
3586
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
3597
3598
3599
3600
3601
3602
3603
....
3615
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
....
3676
3677
3678
3679
3680
3681
3682
3683
3684
3685
3686
3687
3688
3689
3690
....
3717
3718
3719
3720
3721
3722
3723
3724
3725
3726
3727
3728
3729
3730
3731
....
3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
3763
....
3917
3918
3919
3920
3921
3922
3923
3924
3925
3926
3927
3928
3929
3930
3931
....
4123
4124
4125
4126
4127
4128
4129













4130
4131
4132
4133
4134
4135
4136
....
4172
4173
4174
4175
4176
4177
4178
4179
4180
4181
4182
4183
4184
4185
4186
4187






4188
4189
4190
4191
4192
4193
4194
4195
4196
4197
4198
4199












































4200
4201
4202
4203
4204
4205
4206
....
4633
4634
4635
4636
4637
4638
4639
4640
4641
4642
4643
4644
4645
4646
4647
4648
4649
4650
4651
4652
4653
4654
4655
4656
4657
4658
4659
4660
4661
4662
4663
4664
....
4692
4693
4694
4695
4696
4697
4698
4699
4700
4701
4702
4703
4704
4705
4706


4707
4708
4709
4710
4711
4712
4713
4714
4715
4716
4717
4718
4719
4720
4721
4722
4723
4724
4725
4726
....
4736
4737
4738
4739
4740
4741
4742
4743
4744
4745
4746
4747
4748
4749
4750
4751
4752
4753
4754
4755
4756
4757
4758
4759
4760
4761
4762
4763
4764
4765
4766
4767
4768
4769
4770
4771
4772
4773
4774
4775
4776
4777
4778
4779
4780
4781
4782
4783
4784
4785
4786
4787
4788
4789
4790
4791
**   (N==0). And on most pages the first record that starts on the page will
**   not start at byte offset 0. For example:
**
**      aaaaa bbbbb ccc <footer>    cc eeeee fffff g <footer>    gggg....
**
** RECORD FORMAT:
** 
**   The first byte of the record is a flags byte. It is a combination
**   of the following flags (defined in lsmInt.h):
**
**       LSM_START_DELETE
**       LSM_END_DELETE 
**       LSM_POINT_DELETE
**       LSM_INSERT    
**       LSM_SEPARATOR
**       LSM_SYSTEMKEY
**

**   Immediately following the type byte is a pointer to the smallest key 
**   in the next file that is larger than the key in the current record. The 
**   pointer is encoded as a varint. When added to the 32-bit page number 
**   stored in the footer, it is the page number of the page that contains the
**   smallest key in the next sorted file that is larger than this key. 
**
**   Next is the number of bytes in the key, encoded as a varint.
**
**   If the LSM_INSERT flag is set, the number of bytes in the value, as
**   a varint, is next.
**
**   Finally, the blob of data containing the key, and for LSM_INSERT
**   records, the value as well.












*/

#ifndef _LSM_INT_H
# include "lsmInt.h"
#endif
#include "sqlite4.h"            /* only for sqlite4_snprintf() */


















/*
** Macros to help decode record types.
*/
#define rtTopic(eType)       ((eType) & LSM_SYSTEMKEY)
#define rtIsDelete(eType)    (((eType) & 0x0F)==LSM_POINT_DELETE)

#define rtIsSeparator(eType) (((eType) & LSM_SEPARATOR)!=0)
#define rtIsWrite(eType)     (((eType) & LSM_INSERT)!=0)
#define rtIsSystem(eType)    (((eType) & LSM_SYSTEMKEY)!=0)

/*
** The following macros are used to access a page footer.
*/
#define SEGMENT_NRECORD_OFFSET(pgsz)        ((pgsz) - 2)
#define SEGMENT_FLAGS_OFFSET(pgsz)          ((pgsz) - 2 - 2)
#define SEGMENT_POINTER_OFFSET(pgsz)        ((pgsz) - 2 - 2 - 4)
................................................................................
  /* Comparison results */
  int nTree;                      /* Size of aTree[] array */
  int *aTree;                     /* Array of comparison results */

  /* Used by cursors flushing the in-memory tree only */
  int *pnOvfl;                    /* Number of free-list entries to store */
  void *pSystemVal;               /* Pointer to buffer to free */

  /* Used by worker cursors only */
  Pgno *pPrevMergePtr;
};

#define CURSOR_DATA_TREE0     0   /* Current tree cursor */
#define CURSOR_DATA_TREE1     1   /* The "old" tree, if any */
#define CURSOR_DATA_SYSTEM    2
#define CURSOR_DATA_SEGMENT   3

................................................................................
**
** CURSOR_PREV_OK
**   Set if it is Ok to call lsm_csr_prev().
**
** CURSOR_READ_SEPARATORS
**   Set if this cursor should visit the separator keys in segment 
**   aPtr[nPtr-1].
**
** CURSOR_SEEK_EQ
**   Cursor has undergone a successful lsm_csr_seek(LSM_SEEK_EQ) operation.
**   The key and value are stored in MultiCursor.key and MultiCursor.val
**   respectively.
*/
#define CURSOR_IGNORE_DELETE    0x00000001
#define CURSOR_NEW_SYSTEM       0x00000002
#define CURSOR_AT_FREELIST      0x00000004
#define CURSOR_IGNORE_SYSTEM    0x00000010
#define CURSOR_NEXT_OK          0x00000020
#define CURSOR_PREV_OK          0x00000040
#define CURSOR_READ_SEPARATORS  0x00000080
#define CURSOR_SEEK_EQ          0x00000100

typedef struct MergeWorker MergeWorker;
typedef struct Hierarchy Hierarchy;

struct Hierarchy {
  Page **apHier;
  int nHier;
................................................................................
    }
    if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT;

    rc = pageGetBtreeKey(
        pCsr->aPg[iPg].pPage, iCell,
        &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
    );
    pCsr->eType |= LSM_SEPARATOR;
  }

  return rc;
}

static int btreeCursorPtr(u8 *aData, int nData, int iCell){
  int nCell;
................................................................................
          if( pCsr->aPg[i].iCell>0 ) break;
        }
        assert( i>=0 );
        rc = pageGetBtreeKey(
            pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1,
            &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
        );
        pCsr->eType |= LSM_SEPARATOR;

      }else{
        rc = btreeCursorLoadKey(pCsr);
      }
    }
  }
  return rc;
................................................................................

static int segmentPtrAdvance(
  MultiCursor *pCsr, 
  SegmentPtr *pPtr,
  int bReverse
){
  int eDir = (bReverse ? -1 : 1);
  Level *pLvl = pPtr->pLevel;
  do {
    int rc;
    int iCell;                    /* Number of new cell in page */
    int svFlags = 0;              /* SegmentPtr.eType before advance */

    iCell = pPtr->iCell + eDir;
    assert( pPtr->pPg );
    assert( iCell<=pPtr->nCell && iCell>=-1 );

    if( bReverse && pPtr->pSeg!=&pPtr->pLevel->lhs ){
      svFlags = pPtr->eType;
      assert( svFlags );
    }

    if( iCell>=pPtr->nCell || iCell<0 ){
      do {
        rc = segmentPtrNextPage(pPtr, eDir); 
      }while( rc==LSM_OK 
           && pPtr->pPg 
           && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG) ) 
      );
      if( rc!=LSM_OK ) return rc;
      iCell = bReverse ? (pPtr->nCell-1) : 0;
    }
    rc = segmentPtrLoadCell(pPtr, iCell);
    if( rc!=LSM_OK ) return rc;

    if( svFlags && pPtr->pPg ){
      int res = sortedKeyCompare(pCsr->pDb->xCmp,
          rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
          pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
      );
      if( res<0 ) segmentPtrReset(pPtr);
    }
    if( pPtr->pPg==0 && (svFlags & LSM_END_DELETE) ){
      rc = lsmFsDbPageGet(pCsr->pDb->pFS, pPtr->pSeg->iFirst, &pPtr->pPg);
      if( rc!=LSM_OK ) return rc;
      pPtr->eType = LSM_START_DELETE | (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0);
      pPtr->pKey = pLvl->pSplitKey;
      pPtr->nKey = pLvl->nSplitKey;
    }

  }while( pCsr 
       && pPtr->pPg 
       && segmentPtrIgnoreSeparators(pCsr, pPtr)
       && rtIsSeparator(pPtr->eType)
  );

  return LSM_OK;
................................................................................
** points at either the first (bLast==0) or last (bLast==1) cell in the valid
** region of the segment defined by pPtr->iFirst and pPtr->iLast.
**
** Return LSM_OK if successful or an lsm error code if something goes
** wrong (IO error, OOM etc.).
*/
static int segmentPtrEnd(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){
  Level *pLvl = pPtr->pLevel;
  int rc = LSM_OK;
  FileSystem *pFS = pCsr->pDb->pFS;
  int bIgnore;

  segmentPtrEndPage(pFS, pPtr, bLast, &rc);
  while( rc==LSM_OK && pPtr->pPg 
      && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
................................................................................
  }
  
  bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr);
  if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){
    rc = segmentPtrAdvance(pCsr, pPtr, bLast);
  }


  if( bLast && rc==LSM_OK && pPtr->pPg
   && pPtr->pSeg==&pLvl->lhs 
   && pLvl->nRight && (pPtr->eType & LSM_START_DELETE)
  ){
    pPtr->iCell++;
    pPtr->eType = LSM_END_DELETE | (pLvl->iSplitTopic);
    pPtr->pKey = pLvl->pSplitKey;
    pPtr->nKey = pLvl->nSplitKey;
    pPtr->pVal = 0;
    pPtr->nVal = 0;
  }

  return rc;
}

static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){
  assert( pPtr->pPg );
  *ppKey = pPtr->pKey;
  *pnKey = pPtr->nKey;
................................................................................
    if( eSeek==LSM_SEEK_GE ) return (res<=0);
  }

  return 1;
}
#endif

static int segmentPtrSearchOversized(
  MultiCursor *pCsr,              /* Cursor context */
  SegmentPtr *pPtr,               /* Pointer to seek */
  void *pKey, int nKey            /* Key to seek to */


){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;

  int rc = LSM_OK;






  /* If the OVERSIZED flag is set, then there is no pointer in the
  ** upper level to the next page in the segment that contains at least
  ** one key. So compare the largest key on the current page with the
  ** key being sought (pKey/nKey). If (pKey/nKey) is larger, advance
  ** to the next page in the segment that contains at least one key. 
  */
................................................................................
        pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1
    );

    /* If the loaded key is >= than (pKey/nKey), break out of the loop.
    ** If (pKey/nKey) is present in this array, it must be on the current 
    ** page.  */
    res = sortedKeyCompare(
        xCmp, iLastTopic, pLastKey, nLastKey, 0, pKey, nKey
    );
    if( res>=0 ) break;

    /* Advance to the next page that contains at least one key. */
    pNext = pPtr->pPg;
    lsmFsPageRef(pNext);
    while( 1 ){
................................................................................
    if( pNext==0 ) break;
    segmentPtrSetPage(pPtr, pNext);

    /* This should probably be an LSM_CORRUPT error. */
    assert( rc!=LSM_OK || (pPtr->flags & PGFTR_SKIP_THIS_FLAG) );
  }

  return rc;
}

static int ptrFwdPointer(
  Page *pPage,
  int iCell,
  Segment *pSeg,
  Pgno *piPtr,
  int *pbFound
){
  Page *pPg = pPage;
  int iFirst = iCell;
  int rc = LSM_OK;

  do {
    Page *pNext = 0;
    u8 *aData;
    int nData;

    aData = lsmFsPageData(pPg, &nData);
    if( (pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG)==0 ){
      int i;
      int nCell = pageGetNRec(aData, nData);
      for(i=iFirst; i<nCell; i++){
        u8 eType = *pageGetCell(aData, nData, i);
        if( (eType & LSM_START_DELETE)==0 ){
          *pbFound = 1;
          *piPtr = pageGetRecordPtr(aData, nData, i) + pageGetPtr(aData, nData);
          lsmFsPageRelease(pPg);
          return LSM_OK;
        }
      }
    }

    rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
    lsmFsPageRelease(pPg);
    pPg = pNext;
    iFirst = 0;
  }while( pPg && rc==LSM_OK );
  lsmFsPageRelease(pPg);

  *pbFound = 0;
  return rc;
}

static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){
  int rc;
  rc = segmentPtrEnd(pCsr, pPtr, 0);
  while( pPtr->pPg && rc==LSM_OK ){
    int res = sortedKeyCompare(pCsr->pDb->xCmp,
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey,
        rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
    );
    if( res<=0 ) break;
    rc = segmentPtrAdvance(pCsr, pPtr, 0);
  }
  return rc;
}


/*
** This function is called as part of a SEEK_GE op on a multi-cursor if the 
** FC pointer read from segment *pPtr comes from an entry with the 
** LSM_START_DELETE flag set. In this case the pointer value cannot be 
** trusted. Instead, the pointer that should be followed is that associated
** with the next entry in *pPtr that does not have LSM_START_DELETE set.
**
** Why the pointers can't be trusted:
**
**
**
** TODO: This is a stop-gap solution:
** 
**   At the moment, this function is called from within segmentPtrSeek(), 
**   as part of the initial lsmMCursorSeek() call. However, consider a 
**   database where the following has occurred:
**
**      1. A range delete removes keys 1..9999 using a range delete.
**      2. Keys 1 through 9999 are reinserted.
**      3. The levels containing the ops in 1. and 2. above are merged. Call
**         this level N. Level N contains FC pointers to level N+1.
**
**   Then, if the user attempts to query for (key>=2 LIMIT 10), the 
**   lsmMCursorSeek() call will iterate through 9998 entries searching for a 
**   pointer down to the level N+1 that is never actually used. It would be
**   much better if the multi-cursor could do this lazily - only seek to the
**   level (N+1) page after the user has moved the cursor on level N passed
**   the big range-delete.
*/
static int segmentPtrFwdPointer(
  MultiCursor *pCsr,              /* Multi-cursor pPtr belongs to */
  SegmentPtr *pPtr,               /* Segment-pointer to extract FC ptr from */
  Pgno *piPtr                     /* OUT: FC pointer value */
){
  Level *pLvl = pPtr->pLevel;
  Level *pNext = pLvl->pNext;
  Page *pPg = pPtr->pPg;
  int rc;
  int bFound;
  Pgno iOut = 0;

  if( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[pLvl->nRight-1] ){
    if( pNext==0 
        || (pNext->nRight==0 && pNext->lhs.iRoot)
        || (pNext->nRight!=0 && pNext->aRhs[0].iRoot)
      ){
      /* Do nothing. The pointer will not be used anyway. */
      return LSM_OK;
    }
  }else{
    if( pPtr[1].pSeg->iRoot ){
      return LSM_OK;
    }
  }

  /* Search for a pointer within the current segment. */
  lsmFsPageRef(pPg);
  rc = ptrFwdPointer(pPg, pPtr->iCell, pPtr->pSeg, &iOut, &bFound);

  if( rc==LSM_OK && bFound==0 ){
    /* This case happens when pPtr points to the left-hand-side of a segment
    ** currently undergoing an incremental merge. In this case, jump to the
    ** oldest segment in the right-hand-side of the same level and continue
    ** searching. But - do not consider any keys smaller than the levels
    ** split-key. */
    SegmentPtr ptr;

    if( pPtr->pLevel->nRight==0 || pPtr->pSeg!=&pPtr->pLevel->lhs ){
      return LSM_CORRUPT_BKPT;
    }

    memset(&ptr, 0, sizeof(SegmentPtr));
    ptr.pLevel = pPtr->pLevel;
    ptr.pSeg = &ptr.pLevel->aRhs[ptr.pLevel->nRight-1];
    rc = sortedRhsFirst(pCsr, ptr.pLevel, &ptr);
    if( rc==LSM_OK ){
      rc = ptrFwdPointer(ptr.pPg, ptr.iCell, ptr.pSeg, &iOut, &bFound);
      ptr.pPg = 0;
    }
    segmentPtrReset(&ptr);
  }

  *piPtr = iOut;
  return rc;
}

static int segmentPtrSeek(
  MultiCursor *pCsr,              /* Cursor context */
  SegmentPtr *pPtr,               /* Pointer to seek */
  void *pKey, int nKey,           /* Key to seek to */
  int eSeek,                      /* Search bias - see above */
  int *piPtr,                     /* OUT: FC pointer */
  int *pbStop
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res;                        /* Result of comparison operation */
  int rc = LSM_OK;
  int iMin;
  int iMax;
  int iPtrOut = 0;
  const int iTopic = 0;

  /* If the current page contains an oversized entry, then there are no
  ** pointers to one or more of the subsequent pages in the sorted run.
  ** The following call ensures that the segment-ptr points to the correct 
  ** page in this case.  */
  rc = segmentPtrSearchOversized(pCsr, pPtr, pKey, nKey);
  iPtrOut = pPtr->iPtr;

  /* Assert that this page is the right page of this segment for the key
  ** that we are searching for. Do this by loading page (iPg-1) and testing
  ** that pKey/nKey is greater than all keys on that page, and then by 
  ** loading (iPg+1) and testing that pKey/nKey is smaller than all
  ** the keys it houses.  
  **
  ** TODO: With range-deletes in the tree, the test described above may fail.
  */
#if 0
  assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) );
#endif

  assert( pPtr->nCell>0 
       || pPtr->pSeg->nSize==1 
       || lsmFsPageNumber(pPtr->pPg)==pPtr->pSeg->iLast
................................................................................
    }

    if( rc==LSM_OK ){
      assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) );
      if( res ){
        rc = segmentPtrLoadCell(pPtr, iMin);
      }
      assert( rc!=LSM_OK || res>0 || iPtrOut==(pPtr->iPtr + pPtr->iPgPtr) );

      if( rc==LSM_OK ){
        switch( eSeek ){
          case LSM_SEEK_EQ: {
            int eType = pPtr->eType;
            if( (res<0 && (eType & LSM_START_DELETE))
             || (res>0 && (eType & LSM_END_DELETE))
             || (res==0 && (eType & LSM_POINT_DELETE))
            ){
              *pbStop = 1;
            }else if( res==0 && (eType & LSM_INSERT) ){
              lsm_env *pEnv = pCsr->pDb->pEnv;
              *pbStop = 1;
              pCsr->eType = pPtr->eType;
              rc = sortedBlobSet(pEnv, &pCsr->key, pPtr->pKey, pPtr->nKey);
              if( rc==LSM_OK ){
                rc = sortedBlobSet(pEnv, &pCsr->val, pPtr->pVal, pPtr->nVal);
              }
              pCsr->flags |= CURSOR_SEEK_EQ;
            }
            segmentPtrReset(pPtr);
            break;
          }
          case LSM_SEEK_LE:
            if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1);
            break;
          case LSM_SEEK_GE: {
            /* Figure out if we need to 'skip' the pointer forward or not */
            if( (res<=0 && (pPtr->eType & LSM_START_DELETE)) 
             || (res>0  && (pPtr->eType & LSM_END_DELETE)) 
            ){
              rc = segmentPtrFwdPointer(pCsr, pPtr, &iPtrOut);
            }
            if( res<0 && rc==LSM_OK ){
              rc = segmentPtrAdvance(pCsr, pPtr, 0);
            }
            break;
          }
        }
      }
    }

    /* If the cursor seek has found a separator key, and this cursor is
    ** supposed to ignore separators keys, advance to the next entry.  */
    if( rc==LSM_OK && pPtr->pPg
     && segmentPtrIgnoreSeparators(pCsr, pPtr) 
     && rtIsSeparator(pPtr->eType)
    ){
      assert( eSeek!=LSM_SEEK_EQ );
      rc = segmentPtrAdvance(pCsr, pPtr, eSeek==LSM_SEEK_LE);
    }
  }

  assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
  *piPtr = iPtrOut;
  return rc;
................................................................................

static int seekInSegment(
  MultiCursor *pCsr, 
  SegmentPtr *pPtr,
  void *pKey, int nKey,
  int iPg,                        /* Page to search */
  int eSeek,                      /* Search bias - see above */
  int *piPtr,                     /* OUT: FC pointer */
  int *pbStop                     /* OUT: Stop search flag */
){
  int iPtr = iPg;
  int rc = LSM_OK;

  if( pPtr->pSeg->iRoot ){
    Page *pPg;
    assert( pPtr->pSeg->iRoot!=0 );
................................................................................
    }
    if( rc==LSM_OK ){
      rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iPtr);
    }
  }

  if( rc==LSM_OK ){
    rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, eSeek, piPtr, pbStop);
  }
  return rc;
}

/*
** Seek each segment pointer in the array of (pLvl->nRight+1) at aPtr[].
**
** pbStop:
**   This parameter is only significant if parameter eSeek is set to
**   LSM_SEEK_EQ. In this case, it is set to true before returning if
**   the seek operation is finished. This can happen in two ways:
**   
**     a) A key matching (pKey/nKey) is found, or
**     b) A point-delete or range-delete deleting the key is found.
**
**   In case (a), the multi-cursor CURSOR_SEEK_EQ flag is set and the pCsr->key
**   and pCsr->val blobs populated before returning.
*/
static int seekInLevel(
  MultiCursor *pCsr,              /* Sorted cursor object to seek */

  SegmentPtr *aPtr,               /* Pointer to array of (nRhs+1) SPs */
  int eSeek,                      /* Search bias - see above */
  void *pKey, int nKey,           /* Key to search for */
  Pgno *piPgno,                   /* IN/OUT: fraction cascade pointer (or 0) */
  int *pbStop                     /* OUT: See above */
){
  Level *pLvl = aPtr[0].pLevel;   /* Level to seek within */
  int rc = LSM_OK;                /* Return code */
  int iOut = 0;                   /* Pointer to return to caller */
  int res = -1;                   /* Result of xCmp(pKey, split) */
  int nRhs = pLvl->nRight;        /* Number of right-hand-side segments */
  int bStop = 0;

  /* If this is a composite level (one currently undergoing an incremental
  ** merge), figure out if the search key is larger or smaller than the
  ** levels split-key.  */
  if( nRhs ){
    res = sortedKeyCompare(pCsr->pDb->xCmp, 0, pKey, nKey, 
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
................................................................................
  /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
  ** is not a composite level and there is no split-key). Search the 
  ** left-hand-side of the level in this case.  */
  if( res<0 ){
    int iPtr = 0;
    if( nRhs==0 ) iPtr = *piPgno;

    rc = seekInSegment(pCsr, &aPtr[0], pKey, nKey, iPtr, eSeek, &iOut, &bStop);
    if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
      res = 0;
    }
  }
  
  if( res>=0 ){
    int iPtr = *piPgno;
    int i;
    for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
      iOut = 0;
      rc = seekInSegment(pCsr, &aPtr[i], pKey, nKey, iPtr, eSeek, &iOut,&bStop);
      iPtr = iOut;
    }

    if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){
      rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
    }
  }

  assert( eSeek==LSM_SEEK_EQ || bStop==0 );
  *piPgno = iOut;
  *pbStop = bStop;
  return rc;
}

static void multiCursorGetKey(
  MultiCursor *pCsr, 
  int iKey,
  int *peType,                    /* OUT: Key type (SORTED_WRITE etc.) */
................................................................................
    case CURSOR_DATA_TREE0:
    case CURSOR_DATA_TREE1: {
      TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
      if( lsmTreeCursorValid(pTreeCsr) ){
        int nVal;
        void *pVal;

        lsmTreeCursorKey(pTreeCsr, &eType, &pKey, &nKey);
        lsmTreeCursorValue(pTreeCsr, &pVal, &nVal);

      }
      break;
    }

    case CURSOR_DATA_SYSTEM:
      if( pCsr->flags & CURSOR_AT_FREELIST ){
        pKey = (void *)"FREELIST";
        nKey = 8;
        eType = LSM_SYSTEMKEY | LSM_INSERT;
      }
      break;

    default: {
      int iPtr = iKey - CURSOR_DATA_SEGMENT;
      assert( iPtr>=0 );
      if( iPtr==pCsr->nPtr ){
................................................................................
  }

  if( peType ) *peType = eType;
  if( pnKey ) *pnKey = nKey;
  if( ppKey ) *ppKey = pKey;
}

static int sortedDbKeyCompare(
  int (*xCmp)(void *, int, void *, int),
  int iLhsFlags, void *pLhsKey, int nLhsKey,
  int iRhsFlags, void *pRhsKey, int nRhsKey
){
  int res;

  /* Compare the keys, including the system flag. */
  res = sortedKeyCompare(xCmp, 
    rtTopic(iLhsFlags), pLhsKey, nLhsKey,
    rtTopic(iRhsFlags), pRhsKey, nRhsKey
  );

  /* If a key has the LSM_START_DELETE flag set, but not the LSM_INSERT or
  ** LSM_POINT_DELETE flags, it is considered a delta larger. This prevents
  ** the beginning of an open-ended set from masking a database entry or
  ** delete at a lower level.  */
  if( res==0 ){
    const int insdel = LSM_POINT_DELETE|LSM_INSERT;
    int iDel1 = 0;
    int iDel2 = 0;
    if( LSM_START_DELETE==(iLhsFlags & (LSM_START_DELETE|insdel)) ) iDel1 = +1;
    if( LSM_END_DELETE  ==(iLhsFlags & (LSM_END_DELETE  |insdel)) ) iDel1 = -1;
    if( LSM_START_DELETE==(iRhsFlags & (LSM_START_DELETE|insdel)) ) iDel2 = +1;
    if( LSM_END_DELETE  ==(iRhsFlags & (LSM_END_DELETE  |insdel)) ) iDel2 = -1;
    res = (iDel1 - iDel2);
  }

  return res;
}

static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){
  int i1;
  int i2;
  int iRes;
  void *pKey1; int nKey1; int eType1;
  void *pKey2; int nKey2; int eType2;

  const int mul = (bReverse ? -1 : 1);

  assert( pCsr->aTree && iOut<pCsr->nTree );
  if( iOut>=(pCsr->nTree/2) ){
    i1 = (iOut - pCsr->nTree/2) * 2;
    i2 = i1 + 1;
  }else{
    i1 = pCsr->aTree[iOut*2];
................................................................................
  if( pKey1==0 ){
    iRes = i2;
  }else if( pKey2==0 ){
    iRes = i1;
  }else{
    int res;

    /* Compare the keys */
    res = sortedDbKeyCompare(pCsr->pDb->xCmp, 
        eType1, pKey1, nKey1, eType2, pKey2, nKey2
    );

    res = res * mul;

    if( res==0 ){
      iRes = (rtIsSeparator(eType1) ? i2 : i1);
    }else if( res<0 ){
      iRes = i1;
    }else{
      iRes = i2;
    }
  }

  pCsr->aTree[iOut] = iRes;
}















/*
** This function advances segment pointer iPtr belonging to multi-cursor
** pCsr forward (bReverse==0) or backward (bReverse!=0).
**
** If the segment pointer points to a segment that is part of a composite
** level, then the following special case is handled.
**
**   * If iPtr is the lhs of a composite level, and the cursor is being
**     advanced forwards, and segment iPtr is at EOF, move all pointers
**     that correspond to rhs segments of the same level to the first
**     key in their respective data.




*/
static int segmentCursorAdvance(
  MultiCursor *pCsr, 
  int iPtr,
  int bReverse
){
  int rc;
................................................................................
  bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight);

  if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
   && bReverse==0                                /* csr advanced forwards */
   && pPtr->pPg==0                               /* segment at EOF */
  ){
    int i;

    for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
      rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
    }

    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, 0);
    }
  }











  return rc;
}

static void mcursorFreeComponents(MultiCursor *pCsr){
  int i;
  lsm_env *pEnv = pCsr->pDb->pEnv;

................................................................................
    rc = multiCursorAddAll(pCsr, pDb->pWorker);
    pCsr->flags |= CURSOR_IGNORE_DELETE;
  }
  
  if( rc==LSM_OK ){
    rc = lsmMCursorLast(pCsr);
    if( rc==LSM_OK 
     && rtIsWrite(pCsr->eType) && rtIsSystem(pCsr->eType)
     && pCsr->key.nData==8 
     && 0==memcmp(pCsr->key.pData, "FREELIST", 8)
    ){
      void *pVal; int nVal;         /* Value read from database */
      rc = lsmMCursorValue(pCsr, &pVal, &nVal);
      if( rc==LSM_OK ){
        *ppVal = lsmMallocRc(pDb->pEnv, nVal, &rc);
................................................................................
  if( *pRc==LSM_OK ){
    void *pKey;
    int nKey;
    multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pKey, &nKey);
    *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pKey, nKey);
  }
}

static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
  int eType = pCsr->eType;
  int iKey;
  int i;
  int rdmask = 0;
  
  assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) );
  if( pCsr->flags & CURSOR_NEXT_OK ){
    rdmask = LSM_END_DELETE;
  }else{
    rdmask = LSM_START_DELETE;
  }

  if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){
    if( (eType & LSM_INSERT)==0 ) return 0;
  }
  if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
    return 0;
  }

  /* Check if this key has already been deleted by a range-delete */
  iKey = pCsr->aTree[1];
  if( (iKey>0 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[0]))) 
   || (iKey>1 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[1]))) 
  ){
    return 0;
  }
  for(i=CURSOR_DATA_SEGMENT; i<iKey; i++){
    int iPtr = i-CURSOR_DATA_SEGMENT;
    if( pCsr->aPtr[iPtr].pPg && (pCsr->aPtr[iPtr].eType & rdmask) ){
      return 0;
    }
  }

  return 1;
}

static int multiCursorEnd(MultiCursor *pCsr, int bLast){
  int rc = LSM_OK;
  int i;

  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);
  pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK);

  if( pCsr->apTreeCsr[0] ){
    rc = lsmTreeCursorEnd(pCsr->apTreeCsr[0], bLast);
  }
  if( rc==LSM_OK && pCsr->apTreeCsr[1] ){
    rc = lsmTreeCursorEnd(pCsr->apTreeCsr[1], bLast);
  }

................................................................................
  if( rc==LSM_OK ){
    rc = multiCursorAllocTree(pCsr);
  }
  if( rc==LSM_OK ){
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, bLast);
    }

  }

  multiCursorCacheKey(pCsr, &rc);
  if( rc==LSM_OK && mcursorLocationOk(pCsr, 0)==0 ){



    if( bLast ){
      rc = lsmMCursorPrev(pCsr);
    }else{
      rc = lsmMCursorNext(pCsr);
    }
  }

................................................................................
  lsmTreeCursorReset(pCsr->apTreeCsr[1]);
  for(i=0; i<pCsr->nPtr; i++){
    segmentPtrReset(&pCsr->aPtr[i]);
  }
  pCsr->key.nData = 0;
}

static int treeCursorSeek(
  MultiCursor *pCsr,
  TreeCursor *pTreeCsr, 
  void *pKey, int nKey, 
  int eSeek,
  int *pbStop
){
  int rc = LSM_OK;
  if( pTreeCsr ){
    int res = 0;
    lsmTreeCursorSeek(pTreeCsr, pKey, nKey, &res);
    switch( eSeek ){
      case LSM_SEEK_EQ: {

        int eType = lsmTreeCursorFlags(pTreeCsr);
        if( (res<0 && (eType & LSM_START_DELETE))
         || (res>0 && (eType & LSM_END_DELETE))
         || (res==0 && (eType & LSM_POINT_DELETE))
        ){
          *pbStop = 1;
        }else if( res==0 && (eType & LSM_INSERT) ){
          lsm_env *pEnv = pCsr->pDb->pEnv;
          void *p; int n;         /* Key/value from tree-cursor */
          *pbStop = 1;
          pCsr->flags |= CURSOR_SEEK_EQ;
          rc = lsmTreeCursorKey(pTreeCsr, &pCsr->eType, &p, &n);
          if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->key, p, n);
          if( rc==LSM_OK ) rc = lsmTreeCursorValue(pTreeCsr, &p, &n);
          if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->val, p, n);
        }
        lsmTreeCursorReset(pTreeCsr);
        break;
      }
      case LSM_SEEK_GE:
        if( res<0 && lsmTreeCursorValid(pTreeCsr) ){
          lsmTreeCursorNext(pTreeCsr);
        }
        break;
      default:
        if( res>0 ){
          assert( lsmTreeCursorValid(pTreeCsr) );
          lsmTreeCursorPrev(pTreeCsr);
        }
        break;
    }
  }
  return rc;
}














/*
** Seek the cursor.
*/
int lsmMCursorSeek(MultiCursor *pCsr, void *pKey, int nKey, int eSeek){
  int eESeek = eSeek;             /* Effective eSeek parameter */
  int bStop = 0;                  /* Set to true to halt search operation */
  int rc = LSM_OK;                /* Return code */
  int iPtr = 0;                   /* Used to iterate through pCsr->aPtr[] */
  Pgno iPgno = 0;                 /* FC pointer value */

  if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;

  assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );
  assert( (pCsr->flags & CURSOR_NEW_SYSTEM)==0 );
  assert( (pCsr->flags & CURSOR_AT_FREELIST)==0 );
  assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel );

  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ);
  rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[0], pKey, nKey, eESeek, &bStop);
  if( rc==LSM_OK && bStop==0 ){
    rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[1], pKey, nKey, eESeek, &bStop);
  }

  /* Seek all segment pointers. */

  for(iPtr=0; iPtr<pCsr->nPtr && rc==LSM_OK && bStop==0; iPtr++){
    SegmentPtr *pPtr = &pCsr->aPtr[iPtr];

    assert( pPtr->pSeg==&pPtr->pLevel->lhs );
    rc = seekInLevel(pCsr, pPtr, eESeek, pKey, nKey, &iPgno, &bStop);
    iPtr += pPtr->pLevel->nRight;
  }

  if( eSeek!=LSM_SEEK_EQ ){
    if( rc==LSM_OK ){
      rc = multiCursorAllocTree(pCsr);
    }
    if( rc==LSM_OK ){
      int i;
      for(i=pCsr->nTree-1; i>0; i--){
        multiCursorDoCompare(pCsr, i, eESeek==LSM_SEEK_LE);
      }
      if( eSeek==LSM_SEEK_GE ) pCsr->flags |= CURSOR_NEXT_OK;
      if( eSeek==LSM_SEEK_LE ) pCsr->flags |= CURSOR_PREV_OK;
    }

    multiCursorCacheKey(pCsr, &rc);
    if( rc==LSM_OK && eSeek!=LSM_SEEK_LEFAST && 0==mcursorLocationOk(pCsr, 0) ){
      switch( eESeek ){
        case LSM_SEEK_EQ:
          lsmMCursorReset(pCsr);
          break;
        case LSM_SEEK_GE:
          rc = lsmMCursorNext(pCsr);
          break;
        default:
          rc = lsmMCursorPrev(pCsr);
          break;
      }
    }
  }

  return rc;
}

int lsmMCursorValid(MultiCursor *pCsr){
  int res = 0;
  if( pCsr->flags & CURSOR_SEEK_EQ ){
    res = 1;
  }else if( pCsr->aTree ){
    int iKey = pCsr->aTree[1];
    if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
      res = lsmTreeCursorValid(pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]);
    }else{
      void *pKey; 
      multiCursorGetKey(pCsr, iKey, 0, &pKey, 0);
      res = pKey!=0;
................................................................................

  /* Check the current key value. If it is not greater than (if bReverse==0)
  ** or less than (if bReverse!=0) the key currently cached in pCsr->key, 
  ** then the cursor has not yet been successfully advanced.  
  */
  multiCursorGetKey(pCsr, pCsr->aTree[1], &eNewType, &pNew, &nNew);
  if( pNew ){
    int res = sortedDbKeyCompare(pCsr->pDb->xCmp, 

        eNewType, pNew, nNew, pCsr->eType, pCsr->key.pData, pCsr->key.nData

    );
    if( (bReverse==0 && res<=0) || (bReverse!=0 && res>=0) ){
      return 0;
    }


    multiCursorCacheKey(pCsr, pRc);
    assert( pCsr->eType==eNewType );

    /* If this cursor is configured to skip deleted keys, and the current
    ** cursor points to a SORTED_DELETE entry, then the cursor has not been 
    ** successfully advanced.  
    **
    ** Similarly, if the cursor is configured to skip system keys and the
    ** current cursor points to a system key, it has not yet been advanced.
     */
    if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0;
  }
  return 1;
}

static int multiCursorAdvance(MultiCursor *pCsr, int bReverse){
  int rc = LSM_OK;                /* Return Code */
  if( lsmMCursorValid(pCsr) ){
    do {
      int iKey = pCsr->aTree[1];

      /* If this multi-cursor is advancing forwards, and the sub-cursor
      ** being advanced is the one that separator keys may be being read
      ** from, record the current absolute pointer value.  */
      if( pCsr->pPrevMergePtr ){
        if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
          assert( pCsr->pBtCsr );
          *pCsr->pPrevMergePtr = pCsr->pBtCsr->iPtr;
        }else if( pCsr->pBtCsr==0 && pCsr->nPtr>0
               && iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr-1) 
        ){
          SegmentPtr *pPtr = &pCsr->aPtr[iKey-CURSOR_DATA_SEGMENT];
          *pCsr->pPrevMergePtr = pPtr->iPtr+pPtr->iPgPtr;
        }
      }

      if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
        TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
        if( bReverse ){
          rc = lsmTreeCursorPrev(pTreeCsr);
        }else{
          rc = lsmTreeCursorNext(pTreeCsr);
        }
................................................................................
        pCsr->flags &= ~CURSOR_AT_FREELIST;
      }else if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
        assert( bReverse==0 && pCsr->pBtCsr );
        rc = btreeCursorNext(pCsr->pBtCsr);
      }else{
        rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse);
      }

      if( rc==LSM_OK ){
        int i;
        for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){
          multiCursorDoCompare(pCsr, i, bReverse);
        }
      }
    }while( mcursorAdvanceOk(pCsr, bReverse, &rc)==0 );
................................................................................

int lsmMCursorPrev(MultiCursor *pCsr){
  if( (pCsr->flags & CURSOR_PREV_OK)==0 ) return LSM_MISUSE_BKPT;
  return multiCursorAdvance(pCsr, 1);
}

int lsmMCursorKey(MultiCursor *pCsr, void **ppKey, int *pnKey){
  if( pCsr->flags & CURSOR_SEEK_EQ ){
    *pnKey = pCsr->key.nData;
    *ppKey = pCsr->key.pData;
  }else{
    int iKey = pCsr->aTree[1];

    if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
      TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];
      lsmTreeCursorKey(pTreeCsr, 0, ppKey, pnKey);
    }else{
      int nKey;

#ifndef NDEBUG
      void *pKey;
      int eType;
      multiCursorGetKey(pCsr, iKey, &eType, &pKey, &nKey);
................................................................................
      if( nKey==0 ){
        *ppKey = 0;
      }else{
        *ppKey = pCsr->key.pData;
      }
      *pnKey = nKey; 
    }
  }
  return LSM_OK;
}

int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){
  void *pVal;
  int nVal;
  int rc;
  if( pCsr->flags & CURSOR_SEEK_EQ ){
    rc = LSM_OK;
    nVal = pCsr->val.nData;
    pVal = pCsr->val.pData;
  }else{

    assert( pCsr->aTree );
    assert( mcursorLocationOk(pCsr, (pCsr->flags & CURSOR_IGNORE_DELETE)) );

    rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pVal, &nVal);
    if( pVal && rc==LSM_OK ){
      rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pVal, nVal);
      pVal = pCsr->val.pData;
    }

    if( rc!=LSM_OK ){
      pVal = 0;
      nVal = 0;
    }
  }
  *ppVal = pVal;
  *pnVal = nVal;
  return rc;
}

int lsmMCursorType(MultiCursor *pCsr, int *peType){
  assert( pCsr->aTree );
................................................................................
  int nKey;
  int eType;

  nRec = lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
  iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nData, nRec-1)]);
  eType = aData[iOff++];
  assert( eType==0 
       || eType==(LSM_SYSTEMKEY|LSM_SEPARATOR) 
       || eType==(LSM_SEPARATOR)
  );

  iOff += lsmVarintGet32(&aData[iOff], &nKey);
  iOff += lsmVarintGet32(&aData[iOff], &nKey);

  return iOff + (eType ? nKey : 0);
}
................................................................................
  lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], nRec+1);

  if( bIndirect ){
    aData[iOff++] = 0x00;
    iOff += lsmVarintPut32(&aData[iOff], iPtr);
    iOff += lsmVarintPut32(&aData[iOff], iKeyPg);
  }else{
    aData[iOff++] = (u8)(iTopic | LSM_SEPARATOR);
    iOff += lsmVarintPut32(&aData[iOff], iPtr);
    iOff += lsmVarintPut32(&aData[iOff], nKey);
    memcpy(&aData[iOff], pKey, nKey);
  }

  if( iLevel>0 ){
    int iRight = lsmFsPageNumber(p->apHier[iLevel-1]);
................................................................................
  pSeg = &pMW->pLevel->lhs;

  pPg = pMW->pPage;
  aData = fsPageData(pPg, &nData);
  nRec = pageGetNRec(aData, nData);
  iFPtr = pageGetPtr(aData, nData);








  /* Calculate the relative pointer value to write to this record */
  iRPtr = iPtr - iFPtr;
  /* assert( iRPtr>=0 ); */
     
  /* Figure out how much space is required by the new record. The space
  ** required is divided into two sections: the header and the body. The
  ** header consists of the intial varint fields. The body are the blobs 
................................................................................
    nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
    if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

    /* If the entire header will not fit on page pPg, or if page pPg is 
     ** marked read-only, advance to the next page of the output run. */
    iOff = pMerge->iOutputOff;
    if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
      iFPtr = *pCsr->pPrevMergePtr;
      iRPtr = iPtr - iFPtr;

      iOff = 0;
      nRec = 0;
      rc = mergeWorkerNextPage(pMW, iFPtr);
      pPg = pMW->pPage;
    }
  }

................................................................................
      iFPtr = pageGetPtr(aData, nData);
      lsmFsPageRelease(pPg);
    }
  }

  if( rc==LSM_OK ){
    rc = mergeWorkerNextPage(pMW, iFPtr);
    if( pCsr->pPrevMergePtr ) *pCsr->pPrevMergePtr = iFPtr;
  }

  return rc;
}

/*
** The cursor passed as the first argument is being used as the input for
** a merge operation. When this function is called, *piFlags contains the
** database entry flags for the current entry. The entry about to be written
** to the output.
**
*/
static void mergeRangeDeletes(MultiCursor *pCsr, int *piFlags){
  int f = *piFlags;
  int iKey = pCsr->aTree[1];
  int i;

  if( pCsr->flags & CURSOR_IGNORE_DELETE ){
    /* The ignore-delete flag is set when the output of the merge will form
    ** the oldest level in the database. In this case there is no point in
    ** retaining any range-delete flags.  */
    assert( (f & LSM_POINT_DELETE)==0 );
    f &= ~(LSM_START_DELETE|LSM_END_DELETE);
  }else{
    if( iKey==0 ){
      int btreeflags = lsmTreeCursorFlags(pCsr->apTreeCsr[1]);
      if( btreeflags & LSM_END_DELETE ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    for(i=LSM_MAX(0, iKey+1-CURSOR_DATA_SEGMENT); i<pCsr->nPtr; i++){
      if( pCsr->aPtr[i].eType & LSM_END_DELETE ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    if( (f & LSM_START_DELETE) && (f & LSM_END_DELETE) && (f & LSM_INSERT)==0 ){
      f = 0;
    }
  }

  *piFlags = f;
}

static int mergeWorkerStep(MergeWorker *pMW){
  lsm_db *pDb = pMW->pDb;       /* Database handle */
  MultiCursor *pCsr;            /* Cursor to read input data from */
  int rc = LSM_OK;              /* Return code */
  int eType;                    /* SORTED_SEPARATOR, WRITE or DELETE */
  void *pKey; int nKey;         /* Key */
  Segment *pSeg;                /* Output segment */
  Pgno iPtr;

  pCsr = pMW->pCsr;
  pSeg = &pMW->pLevel->lhs;

  /* Pull the next record out of the source cursor. */
  lsmMCursorKey(pCsr, &pKey, &nKey);
  eType = pCsr->eType;
................................................................................

  /* Figure out if the output record may have a different pointer value
  ** than the previous. This is the case if the current key is identical to
  ** a key that appears in the lowest level run being merged. If so, set 
  ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, 
  ** indicating that the output pointer value should be a copy of the pointer 
  ** value written with the previous key.  */
  iPtr = (pCsr->pPrevMergePtr ? *pCsr->pPrevMergePtr : 0);
  if( pCsr->pBtCsr ){
    BtreeCursor *pBtCsr = pCsr->pBtCsr;
    if( pBtCsr->pKey ){
      int res = rtTopic(pBtCsr->eType) - rtTopic(eType);
      if( res==0 ) res = pDb->xCmp(pBtCsr->pKey, pBtCsr->nKey, pKey, nKey);
      if( 0==res ) iPtr = pBtCsr->iPtr;

      assert( res>=0 );
    }
  }else if( pCsr->nPtr ){
    SegmentPtr *pPtr = &pCsr->aPtr[pCsr->nPtr-1];
    if( pPtr->pPg
     && 0==pDb->xCmp(pPtr->pKey, pPtr->nKey, pKey, nKey)
    ){
      iPtr = pPtr->iPtr+pPtr->iPgPtr;
    }
  }

  mergeRangeDeletes(pCsr, &eType);

  if( eType!=0 ){
    if( pMW->aGobble ){
      int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
      if( iGobble<pCsr->nPtr ){
        SegmentPtr *pGobble = &pCsr->aPtr[iGobble];
        if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
          pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg);
        }
................................................................................
      }

      /* Write the record into the main run. */
      if( rc==LSM_OK ){
        rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pCsr, iPtr, &iSPtr);
      }
    }
  }

  /* Advance the cursor to the next input record (assuming one exists). */
  assert( lsmMCursorValid(pMW->pCsr) );
  if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr);

  /* If the cursor is at EOF, the merge is finished. Release all page
  ** references currently held by the merge worker and inform the 
................................................................................
  int *pnWrite                    /* OUT: Number of database pages written */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  Pgno iLeftPtr = 0;
  int nWrite = 0;                 /* Number of database pages written */

  assert( pnOvfl );

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
................................................................................
    memset(&merge, 0, sizeof(Merge));
    memset(&mergeworker, 0, sizeof(MergeWorker));

    pNew->pMerge = &merge;
    mergeworker.pDb = pDb;
    mergeworker.pLevel = pNew;
    mergeworker.pCsr = pCsr;
    pCsr->pPrevMergePtr = &iLeftPtr;

    /* Mark the separators array for the new level as a "phantom". */
    mergeworker.bFlush = 1;

    /* Allocate the first page of the output segment. */
    rc = mergeWorkerNextPage(&mergeworker, iLeftPtr);

................................................................................
  }

  if( rc==LSM_OK ){
    sortedInvokeWorkHook(pDb);
  }

#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel");
#endif

  if( pnWrite ) *pnWrite = nWrite;
  pDb->pWorker->nWrite += nWrite;
  return rc;
}

................................................................................
  pMW->pCsr = pCsr;

  /* Load the current output page into memory. */
  if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW);

  /* Position the cursor. */
  if( rc==LSM_OK ){
    pCsr->pPrevMergePtr = &pMerge->iCurrentPtr;
    if( pMW->pPage==0 ){
      /* The output array is still empty. So position the cursor at the very 
      ** start of the input.  */
      rc = multiCursorEnd(pCsr, 0);
    }else{
      /* The output array is non-empty. Position the cursor based on the
      ** page/cell data saved in the Merge.aInput[] array.  */
................................................................................
            SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i];
            if( pGobble->pSeg->iRoot ){
              rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i);
            }else if( mergeworker.aGobble[i] ){
              lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1);
            }
          }













        }else if( pLevel->lhs.iFirst==0 ){
          /* If the new level is completely empty, remove it from the 
          ** database snapshot. This can only happen if all input keys were
          ** annihilated. Since keys are only annihilated if the new level
          ** is the last in the linked list (contains the most ancient of
          ** database content), this guarantees that pLevel->pNext==0.  */ 

................................................................................

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work");
#endif
      assertRunInOrder(pDb, &pLevel->lhs);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
      ** zero. The caller has an in-memory tree to flush to disk.  */
      if( bFlush && sortedDbIsFull(pDb)==0 ) break;






    }
  }

  if( pnWrite ) *pnWrite = (nWork - nRemaining);
  pWorker->nWrite += (nWork - nRemaining);

#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining));
#endif
  return rc;
}













































/*
** The database connection passed as the first argument must be a worker
** connection. This function checks if there exists an "old" in-memory tree
** ready to be flushed to disk. If so, *pbOut is set to true before 
** returning. Otherwise false.
**
** Normally, LSM_OK is returned. Or, if an error occurs, an LSM error code.
................................................................................
    }else{
      lsmStringAppendf(pStr, "%c", isalnum(z[iChar]) ?z[iChar] : '.');
    }
  }
  return LSM_OK;
}

#define INFO_PAGE_DUMP_DATA   0x01
#define INFO_PAGE_DUMP_VALUES 0x02
#define INFO_PAGE_DUMP_HEX    0x04

static int infoPageDump(
  lsm_db *pDb,                    /* Database handle */
  Pgno iPg,                       /* Page number of page to dump */
  int flags,
  char **pzOut                    /* OUT: lsmMalloc'd string */
){
  int rc = LSM_OK;                /* Return code */
  Page *pPg = 0;                  /* Handle for page iPg */
  int i, j;                       /* Loop counters */
  const int perLine = 16;         /* Bytes per line in the raw hex dump */

  int bValues = (flags & INFO_PAGE_DUMP_VALUES);
  int bHex = (flags & INFO_PAGE_DUMP_HEX);
  int bData = (flags & INFO_PAGE_DUMP_DATA);

  *pzOut = 0;
  if( iPg==0 ) return LSM_ERROR;

  rc = lsmFsDbPageGet(pDb->pFS, iPg, &pPg);
  if( rc==LSM_OK ){
    Blob blob = {0, 0, 0, 0};
................................................................................
    for(iCell=0; iCell<nRec; iCell++){
      u8 *aKey; int nKey = 0;       /* Key */
      u8 *aVal; int nVal = 0;       /* Value */
      int iPgPtr;
      int eType;
      char cType = '?';
      Pgno iAbsPtr;
      char zFlags[8];

      infoCellDump(pDb, pPg, iCell, &eType, &iPgPtr,
          &aKey, &nKey, &aVal, &nVal, &blob
      );
      iAbsPtr = iPgPtr + ((flags & SEGMENT_BTREE_FLAG) ? 0 : iPtr);

      lsmFlagsToString(eType, zFlags);


      lsmStringAppendf(&str, "%s %d (%s) ", 
          zFlags, iAbsPtr, (rtTopic(eType) ? "sys" : "usr")
      );
      infoAppendBlob(&str, bHex, aKey, nKey); 
      if( nVal>0 && bValues ){
        lsmStringAppendf(&str, "%*s", nKeyWidth - (nKey*(1+bHex)), "");
        lsmStringAppendf(&str, " ");
        infoAppendBlob(&str, bHex, aVal, nVal); 
      }
      lsmStringAppendf(&str, "\n");
    }

    if( bData ){
      lsmStringAppendf(&str, "\n-------------------" 
          "-------------------------------------------------------------\n");
      lsmStringAppendf(&str, "Page %d\n",
          iPg, (iPg-1)*nData, iPg*nData - 1);
      for(i=0; i<nData; i += perLine){
        lsmStringAppendf(&str, "%04x: ", i);
        for(j=0; j<perLine; j++){
................................................................................
            lsmStringAppendf(&str, " ");
          }else{
            lsmStringAppendf(&str,"%c", isprint(aData[i+j]) ? aData[i+j] : '.');
          }
        }
        lsmStringAppendf(&str,"\n");
      }
    }

    *pzOut = str.z;
    sortedBlobFree(&blob);
    lsmFsPageRelease(pPg);
  }

  return rc;
}

int lsmInfoPageDump(
  lsm_db *pDb,                    /* Database handle */
  Pgno iPg,                       /* Page number of page to dump */
  int bHex,                       /* True to output key/value in hex form */
  char **pzOut                    /* OUT: lsmMalloc'd string */
){
  int flags = INFO_PAGE_DUMP_DATA | INFO_PAGE_DUMP_VALUES;
  if( bHex ) flags |= INFO_PAGE_DUMP_HEX;
  return infoPageDump(pDb, iPg, flags, pzOut);
}

void sortedDumpSegment(lsm_db *pDb, Segment *pRun, int bVals){
  assert( pDb->xLog );
  if( pRun && pRun->iFirst ){
    int flags = (bVals ? INFO_PAGE_DUMP_VALUES : 0);
    char *zSeg;
    Page *pPg;

    zSeg = segToString(pDb->pEnv, pRun, 0);
    lsmLogMessage(pDb, LSM_OK, "Segment: %s", zSeg);
    lsmFree(pDb->pEnv, zSeg);

    lsmFsDbPageGet(pDb->pFS, pRun->iFirst, &pPg);
    while( pPg ){
      Page *pNext;
      char *z = 0;
      infoPageDump(pDb, lsmFsPageNumber(pPg), flags, &z);
      lsmLogMessage(pDb, LSM_OK, "%s", z);
      lsmFree(pDb->pEnv, z);
#if 0
      sortedDumpPage(pDb, pRun, pPg, bVals);
#endif
      lsmFsDbPageNext(pRun, pPg, 1, &pNext);
      lsmFsPageRelease(pPg);
      pPg = pNext;
    }
  }
}

Changes to src/lsm_tree.c.

96
97
98
99
100
101
102






























103
104
105
106
107
108
109
110
111
112

113
114
115
116
117
118
119
...
281
282
283
284
285
286
287


















288
289
290
291
292
293
294
...
362
363
364
365
366
367
368



















369
370
371
372
373
374
375
...
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
...
396
397
398
399
400
401
402









403
404
405
406
407

408
409

410
411
412
413
414
415



416
417
418
419
420
421
422
423
424
425
426

427
428
429
430
431
432


433
434

435
436



437
438










439
440
441
442
443
444
445

446
447

448
449
450
451
452
453
454
455
456
...
469
470
471
472
473
474
475
476
477
478
479


480
481
482
483
484
485
486
....
1074
1075
1076
1077
1078
1079
1080



1081
1082
1083
1084
1085
1086
1087

1088
1089
1090
1091
1092
1093
1094
....
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181

1182
1183
1184

1185
1186
1187
1188
1189
1190
1191
....
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255



































































1256
1257

1258
1259
1260
1261
1262
1263
1264
1265
1266


1267
1268
1269



1270
1271
1272
1273


















































1274
1275
1276

1277
1278
1279
1280
1281
1282
1283
....
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320

1321
1322
1323
1324
1325
1326
1327
....
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343


1344
1345
1346























































































































































































































































































































































1347
1348
1349
1350
1351
1352
1353
....
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
....
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
....
1666
1667
1668
1669
1670
1671
1672













1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684

1685
1686
1687
1688
1689
1690
1691
....
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703

1704
1705
1706
1707
1708
1709
1710
....
1838
1839
1840
1841
1842
1843
1844
1845












































struct TreeOld {
  u32 iShmid;                     /* Last shared-memory chunk in use by old */
  u32 iRoot;                      /* Offset of root node in shm file */
  u32 nHeight;                    /* Height of tree structure */
};































/*
** Container for a key-value pair. Within the *-shm file, each key/value
** pair is stored in a single allocation (which may not actually be 
** contiguous in memory). Layout is the TreeKey structure, followed by
** the nKey bytes of key blob, followed by the nValue bytes of value blob
** (if nValue is non-negative).
*/
struct TreeKey {
  int nKey;                       /* Size of pKey in bytes */
  int nValue;                     /* Size of pValue. Or negative. */

};

#define TK_KEY(p) ((void *)&(p)[1])
#define TK_VAL(p) ((void *)(((u8 *)&(p)[1]) + (p)->nKey))

/*
** A single tree node. A node structure may contain up to 3 key/value
................................................................................
  return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, &rcdummy);
}

static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){
  return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, pRc);
}



















/* Values for the third argument to treeShmkey(). */
#define TK_LOADKEY  1
#define TK_LOADVAL  2

static TreeKey *treeShmkey(
  lsm_db *pDb,                    /* Database handle */
  u32 iPtr,                       /* Shmptr to TreeKey struct */
................................................................................
**   * todo...
*/
void assert_tree_looks_ok(int rc, Tree *pTree){
}
#else
# define assert_tree_looks_ok(x,y)
#endif




















#ifdef LSM_DEBUG

/*
** Pointer pBlob points to a buffer containing a blob of binary data
** nBlob bytes long. Append the contents of this blob to *pStr, with
** each octet represented by a 2-digit hexadecimal number. For example,
................................................................................
  int i;
  lsmStringExtend(pStr, nBlob*2);
  if( pStr->nAlloc==0 ) return;
  for(i=0; i<nBlob; i++){
    u8 c = ((u8*)pBlob)[i];
    if( c>='a' && c<='z' ){
      pStr->z[pStr->n++] = c;
    }else{
      pStr->z[pStr->n++] = "0123456789abcdef"[(c>>4)&0xf];
      pStr->z[pStr->n++] = "0123456789abcdef"[c&0xf];
    }
  }
  pStr->z[pStr->n] = 0;
}

................................................................................
** Append nIndent space (0x20) characters to string *pStr.
*/
static void lsmAppendIndent(LsmString *pStr, int nIndent){
  int i;
  lsmStringExtend(pStr, nIndent);
  for(i=0; i<nIndent; i++) lsmStringAppend(pStr, " ", 1);
}










void dump_node_contents(
  lsm_db *pDb,
  u32 iNode,                      /* Print out hte contents of this node */
  int nIndent,                    /* Number of spaces indentation */

  int nHeight                     /* Height: (0==leaf) (1==parent-of-leaf) */
){

  int i;
  int rc = LSM_OK;
  LsmString s;
  TreeNode *pNode;
  TreeBlob b = {0, 0};




  /* Append the nIndent bytes of space to string s. */
  lsmStringInit(&s, pDb->pEnv);
  if( nIndent ) lsmAppendIndent(&s, nIndent);

  pNode = (TreeNode *)treeShmptr(pDb, iNode, &rc);

  /* Append each key to string s. */
  for(i=0; i<3; i++){
    u32 iPtr = pNode->aiKeyPtr[i];
    if( iPtr ){
      TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TK_LOADKEY, &b, &rc);

      lsmAppendStrBlob(&s, TK_KEY(pKey), pKey->nKey);
      lsmStringAppend(&s, "     ", -1);
    }
  }

  printf("%s\n", s.z);


  lsmStringClear(&s);


  for(i=0; i<4 && nHeight>0; i++){
    u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);



    if( iPtr ){
      dump_node_contents(pDb, iPtr, nIndent + 2, nHeight-1);










    }
  }

  tblobFree(pDb, &b);
}

void dump_tree_contents(lsm_db *pDb, const char *zCaption){

  TreeRoot *p = &pDb->treehdr.root;
  printf("\n%s\n", zCaption);

  if( p->iRoot ){
    dump_node_contents(pDb, p->iRoot, 0, p->nHeight-1);
  }
  fflush(stdout);
}

#endif

/*
................................................................................
}

/*
** Return a pointer to the mapping of the TreeKey object that the cursor
** is pointing to. 
*/
static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){
  return (TreeKey *)treeShmkey(pCsr->pDb,
      pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]], 
      TK_LOADVAL, pBlob, pRc
  );


}

/*
** Save the current position of tree cursor pCsr.
*/
int lsmTreeCursorSave(TreeCursor *pCsr){
  int rc = LSM_OK;
................................................................................
  u32 iShmid;
  ShmChunk *p;

  p = treeShmChunkRc(db, db->treehdr.iFirst, &rc);
  iShmid = p->iShmid;
  while( rc==LSM_OK && p ){
    if( p->iNext ){



      ShmChunk *pNext = treeShmChunkRc(db, p->iNext, &rc);
      if( rc==LSM_OK ){
        if( pNext->iShmid!=p->iShmid+1 ){
          rc = LSM_CORRUPT_BKPT;
        }
        p = pNext;
      }

    }else{
      p = 0;
    }
    nVisit++;
  }

  if( rc==LSM_OK && nVisit!=db->treehdr.nChunk-1 ){
................................................................................
    nSort = 1;
    while( nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
    nByte = sizeof(ShmChunkLoc) * nSort * 2;
    aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);
    iPrevShmid = pMin->iShmid;

    /* Fix all shm-ids, if required. */
    if( rc==LSM_OK && iMin!=db->treehdr.iFirst ){
      iPrevShmid = pMin->iShmid-1;
      for(i=1; i<db->treehdr.nChunk; i++){
        p = treeShmChunk(db, i);
        aSort[i-1].pShm = p;
        aSort[i-1].iLoc = i;
        if( i!=db->treehdr.iFirst ){
          if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
            p->iShmid = iPrevShmid--;
          }
        }
      }

      p = treeShmChunk(db, db->treehdr.iFirst);
      p->iShmid = iPrevShmid;
    }


    if( rc==LSM_OK ){
      ShmChunkLoc *aSpace = &aSort[nSort];
      for(i=0; i<nSort; i++){
        if( aSort[i].pShm ){
          assert( shm_sequence_ge(aSort[i].pShm->iShmid, iPrevShmid) );
          assert( aSpace[aSort[i].pShm->iShmid - iPrevShmid].pShm==0 );
................................................................................
    rc = treeRepairList(db);
  }

  memcpy(&db->treehdr, &hdr, sizeof(TreeHeader));
  return rc;
}

/*
** Insert a new entry into the in-memory tree.
**
** If the value of the 5th parameter, nVal, is negative, then a delete-marker
** is inserted into the tree. In this case the value pointer, pVal, must be
** NULL.
*/



































































int lsmTreeInsert(
  lsm_db *pDb,                    /* Database handle */

  void *pKey,                     /* Pointer to key data */
  int nKey,                       /* Size of key data in bytes */
  void *pVal,                     /* Pointer to value data (or NULL) */
  int nVal                        /* Bytes in value data (or -ve for delete) */
){
  int rc = LSM_OK;                /* Return Code */
  TreeKey *pTreeKey;              /* New key-value being inserted */
  u32 iTreeKey;
  TreeRoot *p = &pDb->treehdr.root;



  assert( nVal>=0 || pVal==0 );
  assert_tree_looks_ok(LSM_OK, pTree);



#if 0
  dump_tree_contents(pDb, "before");
#endif



















































  /* Allocate and populate a new key-value pair structure */
  pTreeKey = newTreeKey(pDb, &iTreeKey, pKey, nKey, pVal, nVal, &rc);
  if( rc!=LSM_OK ) return rc;


  if( p->iRoot==0 ){
    /* The tree is completely empty. Add a new root node and install
    ** (pKey/nKey) as the middle entry. Even though it is a leaf at the
    ** moment, use newTreeNode() to allocate the node (i.e. allocate enough
    ** space for the fields used by interior nodes). This is because the
    ** treeInsert() routine may convert this node to an interior node. */
................................................................................
    TreeNode *pRoot = newTreeNode(pDb, &p->iRoot, &rc);
    if( rc==LSM_OK ){
      assert( p->nHeight==0 );
      pRoot->aiKeyPtr[1] = iTreeKey;
      p->nHeight = 1;
    }
  }else{
    TreeCursor csr;
    int res;

    /* Seek to the leaf (or internal node) that the new key belongs on */
    treeCursorInit(pDb, 0, &csr);
    lsmTreeCursorSeek(&csr, pKey, nKey, &res);

    if( res==0 ){
      /* The search found a match within the tree. */
      TreeNode *pNew;
      u32 iNew;
      TreeNode *pNode = csr.apTreeNode[csr.iNode];
      int iCell = csr.aiCell[csr.iNode];

      /* Create a copy of this node */
      if( (csr.iNode>0 && csr.iNode==(p->nHeight-1)) ){
        pNew = copyTreeLeaf(pDb, (TreeLeaf *)pNode, &iNew, &rc);
      }else{
        pNew = copyTreeNode(pDb, pNode, &iNew, &rc);
      }

      if( rc==LSM_OK ){
        /* Modify the value in the new version */
        pNew->aiKeyPtr[iCell] = iTreeKey;

        /* Change the pointer in the parent (if any) to point at the new 
        ** TreeNode */
        csr.iNode--;
        treeUpdatePtr(pDb, &csr, iNew);
      }

    }else{
      /* The cursor now points to the leaf node into which the new entry should
      ** be inserted. There may or may not be a free slot within the leaf for
      ** the new key-value pair. 
      **
      ** iSlot is set to the index of the key within pLeaf that the new key
      ** should be inserted to the left of (or to a value 1 greater than the
................................................................................
      int iSlot = csr.aiCell[csr.iNode] + (res<0);
      if( csr.iNode==0 ){
        rc = treeInsert(pDb, &csr, 0, iTreeKey, 0, iSlot);
      }else{
        rc = treeInsertLeaf(pDb, &csr, iTreeKey, iSlot);
      }
    }
    tblobFree(pDb, &csr.blob);
  }

#if 0
  dump_tree_contents(pDb, "after");
#endif


  assert_tree_looks_ok(rc, pTree);
  return rc;
}
























































































































































































































































































































































/*
** Return, in bytes, the amount of memory currently used by the tree 
** structure.
*/
int lsmTreeSize(lsm_db *pDb){
  return pDb->treehdr.nByte;
................................................................................
      if( iCell<3 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
    }
  }

#ifndef NDEBUG
  if( pCsr->iNode>=0 ){
    TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
    assert( rc || pDb->xCmp(TK_KEY(pK2), pK2->nKey, TK_KEY(pK1), pK1->nKey)>0 );
  }
  tblobFree(pDb, &key1);
#endif

  return rc;
}

................................................................................

/*
** Move the cursor to the first (bLast==0) or last (bLast!=0) entry in the
** in-memory tree.
*/
int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast){
  lsm_db *pDb = pCsr->pDb;
  TreeHeader *pHdr = &pDb->treehdr;
  TreeRoot *pRoot = pCsr->pRoot;
  int rc = LSM_OK;

  u32 iNodePtr;
  pCsr->iNode = -1;

  /* Discard any saved position data */
................................................................................
    }
    pCsr->aiCell[pCsr->iNode] = iCell - (iNodePtr==0 && bLast);
  }

  return rc;
}














int lsmTreeCursorKey(TreeCursor *pCsr, void **ppKey, int *pnKey){
  TreeKey *pTreeKey;
  int rc = LSM_OK;

  assert( lsmTreeCursorValid(pCsr) );

  pTreeKey = pCsr->pSave;
  if( !pTreeKey ){
    pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
  }
  if( rc==LSM_OK ){
    *pnKey = pTreeKey->nKey;

    *ppKey = (void *)&pTreeKey[1];
  }

  return rc;
}

int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal){
................................................................................
  int res = 0;
  int rc;

  rc = treeCursorRestore(pCsr, &res);
  if( res==0 ){
    TreeKey *pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
    if( rc==LSM_OK ){
      *pnVal = pTreeKey->nValue;
      if( pTreeKey->nValue>=0 ){
        *ppVal = TK_VAL(pTreeKey);
      }else{
        *ppVal = 0;

      }
    }
  }else{
    *ppVal = 0;
    *pnVal = 0;
  }

................................................................................
    memcpy(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader));
  }
  pShm->bWriter = 0;
  intArrayFree(pDb->pEnv, &pDb->rollback);

  return LSM_OK;
}



















































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>










>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|







 







>
>
>
>
>
>
>
>
>



|
|
>


>






>
>
>
|
|
<
<
<






>





|
>
>
|
<
>
|
|
>
>
>
|
|
>
>
>
>
>
>
>
>
>
>







>


>

|







 







|



>
>







 







>
>
>







>







 







|











>



>







 







|
|
|
|
|
|
<
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|

>









>
>



>
>
>




>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>



>







 







<
<
<
<
<
<
<


<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
>







 







<





>
>



>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|







 







<







 







>
>
>
>
>
>
>
>
>
>
>
>
>
|











>







 







|
|



>







 








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
...
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
...
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
...
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
...
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499



500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515

516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
...
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
....
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
....
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
....
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359

1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
....
1512
1513
1514
1515
1516
1517
1518







1519
1520





















1521
1522
1523
1524
1525
1526
1527
1528
....
1532
1533
1534
1535
1536
1537
1538

1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
....
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
....
2174
2175
2176
2177
2178
2179
2180

2181
2182
2183
2184
2185
2186
2187
....
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
....
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
....
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447

struct TreeOld {
  u32 iShmid;                     /* Last shared-memory chunk in use by old */
  u32 iRoot;                      /* Offset of root node in shm file */
  u32 nHeight;                    /* Height of tree structure */
};

#ifndef NDEBUG
/*
** assert() that a TreeKey.flags value is sane. Usage:
**
**   assert( assertFlagsOk(pTreeKey->flags) );
*/
static int assertFlagsOk(u8 keyflags){
  /* At least one flag must be set. Otherwise, what is this key doing? */
  assert( keyflags!=0 );

  /* The POINT_DELETE and INSERT flags cannot both be set. */
  assert( (keyflags & LSM_POINT_DELETE)==0 || (keyflags & LSM_INSERT)==0 );

  /* If both the START_DELETE and END_DELETE flags are set, then the INSERT
  ** flag must also be set. In other words - the three DELETE flags cannot
  ** all be set */
  assert( (keyflags & LSM_END_DELETE)==0 
       || (keyflags & LSM_START_DELETE)==0 
       || (keyflags & LSM_POINT_DELETE)==0 
  );

  return 1;
}

static int assert_delete_ranges_match(lsm_db *);
static int treeCountEntries(lsm_db *db);
#else
# define assertFlagsOk(x)
#endif

/*
** Container for a key-value pair. Within the *-shm file, each key/value
** pair is stored in a single allocation (which may not actually be 
** contiguous in memory). Layout is the TreeKey structure, followed by
** the nKey bytes of key blob, followed by the nValue bytes of value blob
** (if nValue is non-negative).
*/
struct TreeKey {
  int nKey;                       /* Size of pKey in bytes */
  int nValue;                     /* Size of pValue. Or negative. */
  u8 flags;                       /* Various LSM_XXX flags */
};

#define TK_KEY(p) ((void *)&(p)[1])
#define TK_VAL(p) ((void *)(((u8 *)&(p)[1]) + (p)->nKey))

/*
** A single tree node. A node structure may contain up to 3 key/value
................................................................................
  return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, &rcdummy);
}

static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){
  return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, pRc);
}


#ifndef NDEBUG
static void assertIsWorkingChild(
  lsm_db *db, 
  TreeNode *pNode, 
  TreeNode *pParent, 
  int iCell
){
  TreeNode *p;
  int rc = LSM_OK;
  u32 iPtr = getChildPtr(pParent, WORKING_VERSION, iCell);
  p = treeShmptr(db, iPtr, &rc);
  assert( p==pNode || rc!=LSM_OK );
}
#else
# define assertIsWorkingChild(w,x,y,z)
#endif

/* Values for the third argument to treeShmkey(). */
#define TK_LOADKEY  1
#define TK_LOADVAL  2

static TreeKey *treeShmkey(
  lsm_db *pDb,                    /* Database handle */
  u32 iPtr,                       /* Shmptr to TreeKey struct */
................................................................................
**   * todo...
*/
void assert_tree_looks_ok(int rc, Tree *pTree){
}
#else
# define assert_tree_looks_ok(x,y)
#endif

void lsmFlagsToString(int flags, char *zFlags){

  zFlags[0] = (flags & LSM_END_DELETE)   ? ']' : '.';

  /* Only one of LSM_POINT_DELETE, LSM_INSERT and LSM_SEPARATOR should ever
  ** be set. If this is not true, write a '?' to the output.  */
  switch( flags & (LSM_POINT_DELETE|LSM_INSERT|LSM_SEPARATOR) ){
    case 0:                zFlags[1] = '.'; break;
    case LSM_POINT_DELETE: zFlags[1] = '-'; break;
    case LSM_INSERT:       zFlags[1] = '+'; break;
    case LSM_SEPARATOR:    zFlags[1] = '^'; break;
    default:               zFlags[1] = '?'; break;
  }

  zFlags[2] = (flags & LSM_SYSTEMKEY)    ? '*' : '.';
  zFlags[3] = (flags & LSM_START_DELETE) ? '[' : '.';
  zFlags[4] = '\0';
}

#ifdef LSM_DEBUG

/*
** Pointer pBlob points to a buffer containing a blob of binary data
** nBlob bytes long. Append the contents of this blob to *pStr, with
** each octet represented by a 2-digit hexadecimal number. For example,
................................................................................
  int i;
  lsmStringExtend(pStr, nBlob*2);
  if( pStr->nAlloc==0 ) return;
  for(i=0; i<nBlob; i++){
    u8 c = ((u8*)pBlob)[i];
    if( c>='a' && c<='z' ){
      pStr->z[pStr->n++] = c;
    }else if( c!=0 || nBlob==1 || i!=(nBlob-1) ){
      pStr->z[pStr->n++] = "0123456789abcdef"[(c>>4)&0xf];
      pStr->z[pStr->n++] = "0123456789abcdef"[c&0xf];
    }
  }
  pStr->z[pStr->n] = 0;
}

................................................................................
** Append nIndent space (0x20) characters to string *pStr.
*/
static void lsmAppendIndent(LsmString *pStr, int nIndent){
  int i;
  lsmStringExtend(pStr, nIndent);
  for(i=0; i<nIndent; i++) lsmStringAppend(pStr, " ", 1);
}

static void strAppendFlags(LsmString *pStr, u8 flags){
  char zFlags[8];

  lsmFlagsToString(flags, zFlags);
  zFlags[4] = ':';

  lsmStringAppend(pStr, zFlags, 5);
}

void dump_node_contents(
  lsm_db *pDb,
  u32 iNode,                      /* Print out the contents of this node */
  char *zPath,                    /* Path from root to this node */
  int nPath,                      /* Number of bytes in zPath */
  int nHeight                     /* Height: (0==leaf) (1==parent-of-leaf) */
){
  const char *zSpace = "                                           ";
  int i;
  int rc = LSM_OK;
  LsmString s;
  TreeNode *pNode;
  TreeBlob b = {0, 0};

  pNode = (TreeNode *)treeShmptr(pDb, iNode, &rc);

  if( nHeight==0 ){
    /* Append the nIndent bytes of space to string s. */
    lsmStringInit(&s, pDb->pEnv);




    /* Append each key to string s. */
    for(i=0; i<3; i++){
      u32 iPtr = pNode->aiKeyPtr[i];
      if( iPtr ){
        TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TK_LOADKEY, &b,&rc);
        strAppendFlags(&s, pKey->flags);
        lsmAppendStrBlob(&s, TK_KEY(pKey), pKey->nKey);
        lsmStringAppend(&s, "     ", -1);
      }
    }

    printf("% 6d %.*sleaf%.*s: %s\n", 
        iNode, nPath, zPath, 20-nPath-4, zSpace, s.z
    );
    lsmStringClear(&s);

  }else{
    for(i=0; i<4 && nHeight>0; i++){
      u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);
      zPath[nPath] = i+'0';
      zPath[nPath+1] = '/';

      if( iPtr ){
        dump_node_contents(pDb, iPtr, zPath, nPath+2, nHeight-1);
      }
      if( i!=3 && pNode->aiKeyPtr[i] ){
        TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TK_LOADKEY, &b,&rc);
        lsmStringInit(&s, pDb->pEnv);
        strAppendFlags(&s, pKey->flags);
        lsmAppendStrBlob(&s, TK_KEY(pKey), pKey->nKey);
        printf("% 6d %.*s%.*s: %s\n", 
            iNode, nPath+1, zPath, 20-nPath-1, zSpace, s.z);
        lsmStringClear(&s);
      }
    }
  }

  tblobFree(pDb, &b);
}

void dump_tree_contents(lsm_db *pDb, const char *zCaption){
  char zPath[64];
  TreeRoot *p = &pDb->treehdr.root;
  printf("\n%s\n", zCaption);
  zPath[0] = '/';
  if( p->iRoot ){
    dump_node_contents(pDb, p->iRoot, zPath, 1, p->nHeight-1);
  }
  fflush(stdout);
}

#endif

/*
................................................................................
}

/*
** Return a pointer to the mapping of the TreeKey object that the cursor
** is pointing to. 
*/
static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){
  TreeKey *pRet = (TreeKey *)treeShmkey(pCsr->pDb,
      pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]], 
      TK_LOADVAL, pBlob, pRc
  );
  assert( pRet==0 || assertFlagsOk(pRet->flags) );
  return pRet;
}

/*
** Save the current position of tree cursor pCsr.
*/
int lsmTreeCursorSave(TreeCursor *pCsr){
  int rc = LSM_OK;
................................................................................
  u32 iShmid;
  ShmChunk *p;

  p = treeShmChunkRc(db, db->treehdr.iFirst, &rc);
  iShmid = p->iShmid;
  while( rc==LSM_OK && p ){
    if( p->iNext ){
      if( p->iNext>=db->treehdr.nChunk ){
        rc = LSM_CORRUPT_BKPT;
      }else{
        ShmChunk *pNext = treeShmChunkRc(db, p->iNext, &rc);
        if( rc==LSM_OK ){
          if( pNext->iShmid!=p->iShmid+1 ){
            rc = LSM_CORRUPT_BKPT;
          }
          p = pNext;
        }
      }
    }else{
      p = 0;
    }
    nVisit++;
  }

  if( rc==LSM_OK && nVisit!=db->treehdr.nChunk-1 ){
................................................................................
    nSort = 1;
    while( nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
    nByte = sizeof(ShmChunkLoc) * nSort * 2;
    aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);
    iPrevShmid = pMin->iShmid;

    /* Fix all shm-ids, if required. */
    if( rc==LSM_OK ){
      iPrevShmid = pMin->iShmid-1;
      for(i=1; i<db->treehdr.nChunk; i++){
        p = treeShmChunk(db, i);
        aSort[i-1].pShm = p;
        aSort[i-1].iLoc = i;
        if( i!=db->treehdr.iFirst ){
          if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
            p->iShmid = iPrevShmid--;
          }
        }
      }
      if( iMin!=db->treehdr.iFirst ){
        p = treeShmChunk(db, db->treehdr.iFirst);
        p->iShmid = iPrevShmid;
      }
    }

    if( rc==LSM_OK ){
      ShmChunkLoc *aSpace = &aSort[nSort];
      for(i=0; i<nSort; i++){
        if( aSort[i].pShm ){
          assert( shm_sequence_ge(aSort[i].pShm->iShmid, iPrevShmid) );
          assert( aSpace[aSort[i].pShm->iShmid - iPrevShmid].pShm==0 );
................................................................................
    rc = treeRepairList(db);
  }

  memcpy(&db->treehdr, &hdr, sizeof(TreeHeader));
  return rc;
}

static void treeOverwriteKey(lsm_db *db, TreeCursor *pCsr, u32 iKey, int *pRc){
  if( *pRc==LSM_OK ){
    TreeRoot *p = &db->treehdr.root;
    TreeNode *pNew;
    u32 iNew;
    TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];

    int iCell = pCsr->aiCell[pCsr->iNode];

    /* Create a copy of this node */
    if( (pCsr->iNode>0 && pCsr->iNode==(p->nHeight-1)) ){
      pNew = copyTreeLeaf(db, (TreeLeaf *)pNode, &iNew, pRc);
    }else{
      pNew = copyTreeNode(db, pNode, &iNew, pRc);
    }

    if( pNew ){
      /* Modify the value in the new version */
      pNew->aiKeyPtr[iCell] = iKey;

      /* Change the pointer in the parent (if any) to point at the new 
       ** TreeNode */
      pCsr->iNode--;
      treeUpdatePtr(db, pCsr, iNew);
    }
  }
}

static int treeNextIsEndDelete(lsm_db *db, TreeCursor *pCsr){
  TreeNode *pNode;
  int iNode = pCsr->iNode;
  int iCell = pCsr->aiCell[iNode]+1;

  /* Cursor currently points to a leaf node. */
  assert( pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    if( iCell<3 && pNode->aiKeyPtr[iCell] ){
      int rc = LSM_OK;
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell], &rc);
      assert( rc==LSM_OK );
      return ((pKey->flags & LSM_END_DELETE) ? 1 : 0);
    }
    iNode--;
    iCell = pCsr->aiCell[iNode];
  }

  return 0;
}

static int treePrevIsStartDelete(lsm_db *db, TreeCursor *pCsr){
  TreeNode *pNode;
  int iNode = pCsr->iNode;

  /* Cursor currently points to a leaf node. */
  assert( pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    int iCell = pCsr->aiCell[iNode]-1;
    if( iCell>=0 && pNode->aiKeyPtr[iCell] ){
      int rc = LSM_OK;
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell], &rc);
      assert( rc==LSM_OK );
      return ((pKey->flags & LSM_START_DELETE) ? 1 : 0);
    }
    iNode--;
  }

  return 0;
}


static int treeInsertEntry(
  lsm_db *pDb,                    /* Database handle */
  int flags,                      /* Flags associated with entry */
  void *pKey,                     /* Pointer to key data */
  int nKey,                       /* Size of key data in bytes */
  void *pVal,                     /* Pointer to value data (or NULL) */
  int nVal                        /* Bytes in value data (or -ve for delete) */
){
  int rc = LSM_OK;                /* Return Code */
  TreeKey *pTreeKey;              /* New key-value being inserted */
  u32 iTreeKey;
  TreeRoot *p = &pDb->treehdr.root;
  TreeCursor csr;                 /* Cursor to seek to pKey/nKey */
  int res;                        /* Result of seek operation on csr */

  assert( nVal>=0 || pVal==0 );
  assert_tree_looks_ok(LSM_OK, pTree);
  assert( flags==LSM_INSERT       || flags==LSM_POINT_DELETE 
       || flags==LSM_START_DELETE || flags==LSM_END_DELETE 
  );
#if 0
  dump_tree_contents(pDb, "before");
#endif

  if( p->iRoot ){
    TreeKey *pRes;                /* Key at end of seek operation */
    treeCursorInit(pDb, 0, &csr);

    /* Seek to the leaf (or internal node) that the new key belongs on */
    rc = lsmTreeCursorSeek(&csr, pKey, nKey, &res);
    pRes = csrGetKey(&csr, &csr.blob, &rc);
    if( rc!=LSM_OK ) return rc;

    if( flags==LSM_START_DELETE ){
      /* When inserting a start-delete-range entry, if the key that
      ** occurs immediately before the new entry is already a START_DELETE,
      ** then the new entry is not required.  */
      if( (res<=0 && (pRes->flags & LSM_START_DELETE))
       || (res>0  && treePrevIsStartDelete(pDb, &csr))
      ){ 
        goto insert_entry_out;
      }
    }else if( flags==LSM_END_DELETE ){
      /* When inserting an start-delete-range entry, if the key that
      ** occurs immediately after the new entry is already an END_DELETE,
      ** then the new entry is not required.  */
      if( (res<0  && treeNextIsEndDelete(pDb, &csr))
       || (res>=0 && (pRes->flags & LSM_END_DELETE))
      ){
        goto insert_entry_out;
      }
    }

    if( res==0 && (flags & (LSM_END_DELETE|LSM_START_DELETE)) ){
      if( pRes->flags & LSM_INSERT ){
        nVal = pRes->nValue;
        pVal = TK_VAL(pRes);
      }
      flags = flags | pRes->flags;
    }

    if( flags & (LSM_INSERT|LSM_POINT_DELETE) ){
      if( (res<0 && (pRes->flags & LSM_START_DELETE))
       || (res>0 && (pRes->flags & LSM_END_DELETE)) 
      ){
        flags = flags | (LSM_END_DELETE|LSM_START_DELETE);
      }else if( res==0 ){
        flags = flags | (pRes->flags & (LSM_END_DELETE|LSM_START_DELETE));
      }
    }
  }else{
    memset(&csr, 0, sizeof(TreeCursor));
  }

  /* Allocate and populate a new key-value pair structure */
  pTreeKey = newTreeKey(pDb, &iTreeKey, pKey, nKey, pVal, nVal, &rc);
  if( rc!=LSM_OK ) return rc;
  pTreeKey->flags = flags;

  if( p->iRoot==0 ){
    /* The tree is completely empty. Add a new root node and install
    ** (pKey/nKey) as the middle entry. Even though it is a leaf at the
    ** moment, use newTreeNode() to allocate the node (i.e. allocate enough
    ** space for the fields used by interior nodes). This is because the
    ** treeInsert() routine may convert this node to an interior node. */
................................................................................
    TreeNode *pRoot = newTreeNode(pDb, &p->iRoot, &rc);
    if( rc==LSM_OK ){
      assert( p->nHeight==0 );
      pRoot->aiKeyPtr[1] = iTreeKey;
      p->nHeight = 1;
    }
  }else{







    if( res==0 ){
      /* The search found a match within the tree. */





















      treeOverwriteKey(pDb, &csr, iTreeKey, &rc);
    }else{
      /* The cursor now points to the leaf node into which the new entry should
      ** be inserted. There may or may not be a free slot within the leaf for
      ** the new key-value pair. 
      **
      ** iSlot is set to the index of the key within pLeaf that the new key
      ** should be inserted to the left of (or to a value 1 greater than the
................................................................................
      int iSlot = csr.aiCell[csr.iNode] + (res<0);
      if( csr.iNode==0 ){
        rc = treeInsert(pDb, &csr, 0, iTreeKey, 0, iSlot);
      }else{
        rc = treeInsertLeaf(pDb, &csr, iTreeKey, iSlot);
      }
    }

  }

#if 0
  dump_tree_contents(pDb, "after");
#endif
 insert_entry_out:
  tblobFree(pDb, &csr.blob);
  assert_tree_looks_ok(rc, pTree);
  return rc;
}

/*
** Insert a new entry into the in-memory tree.
**
** If the value of the 5th parameter, nVal, is negative, then a delete-marker
** is inserted into the tree. In this case the value pointer, pVal, must be
** NULL.
*/
int lsmTreeInsert(
  lsm_db *pDb,                    /* Database handle */
  void *pKey,                     /* Pointer to key data */
  int nKey,                       /* Size of key data in bytes */
  void *pVal,                     /* Pointer to value data (or NULL) */
  int nVal                        /* Bytes in value data (or -ve for delete) */
){
  int flags;
  if( nVal<0 ){
    flags = LSM_POINT_DELETE;
  }else{
    flags = LSM_INSERT;
  }

  return treeInsertEntry(pDb, flags, pKey, nKey, pVal, nVal);
}

static int treeDeleteEntry(lsm_db *db, TreeCursor *pCsr, u32 iNewptr){
  TreeRoot *p = &db->treehdr.root;
  TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
  int iSlot = pCsr->aiCell[pCsr->iNode];
  int bLeaf;
  int rc = LSM_OK;

  assert( pNode->aiKeyPtr[1] );
  assert( pNode->aiKeyPtr[iSlot] );
  assert( iSlot==0 || iSlot==1 || iSlot==2 );
  assert( (pCsr->iNode==(db->treehdr.root.nHeight-1))==(iNewptr==0) );

  bLeaf = (pCsr->iNode==(p->nHeight-1) && p->nHeight>1);
  
  if( pNode->aiKeyPtr[0] || pNode->aiKeyPtr[2] ){
    /* There are currently at least 2 keys on this node. So just create
    ** a new copy of the node with one of the keys removed. If the node
    ** happens to be the root node of the tree, allocate an entire 
    ** TreeNode structure instead of just a TreeLeaf.  */
    TreeNode *pNew;
    u32 iNew;

    if( bLeaf ){
      pNew = (TreeNode *)newTreeLeaf(db, &iNew, &rc);
    }else{
      pNew = newTreeNode(db, &iNew, &rc);
    }
    if( pNew ){
      int i;
      int iOut = 1;
      for(i=0; i<4; i++){
        if( i==iSlot ){
          i++;
          if( bLeaf==0 ) pNew->aiChildPtr[iOut] = iNewptr;
          if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
          iOut++;
        }else if( bLeaf || p->nHeight==1 ){
          if( i<3 && pNode->aiKeyPtr[i] ){
            pNew->aiKeyPtr[iOut++] = pNode->aiKeyPtr[i];
          }
        }else{
          if( getChildPtr(pNode, WORKING_VERSION, i) ){
            pNew->aiChildPtr[iOut] = getChildPtr(pNode, WORKING_VERSION, i);
            if( i<3 ) pNew->aiKeyPtr[iOut] = pNode->aiKeyPtr[i];
            iOut++;
          }
        }
      }
      assert( iOut<=4 );
      assert( bLeaf || pNew->aiChildPtr[0]==0 );
      pCsr->iNode--;
      rc = treeUpdatePtr(db, pCsr, iNew);
    }

  }else if( pCsr->iNode==0 ){
    /* Removing the only key in the root node. iNewptr is the new root. */
    assert( iSlot==1 );
    db->treehdr.root.iRoot = iNewptr;
    db->treehdr.root.nHeight--;

  }else{
    /* There is only one key on this node and the node is not the root
    ** node. Find a peer for this node. Then redistribute the contents of
    ** the peer and the parent cell between the parent and either one or
    ** two new nodes.  */
    TreeNode *pParent;            /* Parent tree node */
    int iPSlot;
    u32 iPeer;                    /* Pointer to peer leaf node */
    int iDir;
    TreeNode *pPeer;              /* The peer leaf node */
    TreeNode *pNew1; u32 iNew1;   /* First new leaf node */

    assert( iSlot==1 );

    pParent = pCsr->apTreeNode[pCsr->iNode-1];
    iPSlot = pCsr->aiCell[pCsr->iNode-1];

    if( iPSlot>0 && getChildPtr(pParent, WORKING_VERSION, iPSlot-1) ){
      iDir = -1;
    }else{
      iDir = +1;
    }
    iPeer = getChildPtr(pParent, WORKING_VERSION, iPSlot+iDir);
    pPeer = (TreeNode *)treeShmptr(db, iPeer, &rc);
    assertIsWorkingChild(db, pNode, pParent, iPSlot);

    /* Allocate the first new leaf node. This is always required. */
    if( bLeaf ){
      pNew1 = (TreeNode *)newTreeLeaf(db, &iNew1, &rc);
    }else{
      pNew1 = (TreeNode *)newTreeNode(db, &iNew1, &rc);
    }

    if( pPeer->aiKeyPtr[0] && pPeer->aiKeyPtr[2] ){
      /* Peer node is completely full. This means that two new leaf nodes
      ** and a new parent node are required. */

      TreeNode *pNew2; u32 iNew2; /* Second new leaf node */
      TreeNode *pNewP; u32 iNewP; /* New parent node */

      if( bLeaf ){
        pNew2 = (TreeNode *)newTreeLeaf(db, &iNew2, &rc);
      }else{
        pNew2 = (TreeNode *)newTreeNode(db, &iNew2, &rc);
      }
      pNewP = copyTreeNode(db, pParent, &iNewP, &rc);

      if( iDir==-1 ){
        pNew1->aiKeyPtr[1] = pPeer->aiKeyPtr[0];
        if( bLeaf==0 ){
          pNew1->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 0);
          pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 1);
        }

        pNewP->aiChildPtr[iPSlot-1] = iNew1;
        pNewP->aiKeyPtr[iPSlot-1] = pPeer->aiKeyPtr[1];
        pNewP->aiChildPtr[iPSlot] = iNew2;

        pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[2];
        pNew2->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot-1];
        if( bLeaf==0 ){
          pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 2);
          pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 3);
          pNew2->aiChildPtr[2] = iNewptr;
        }
      }else{
        pNew1->aiKeyPtr[1] = pParent->aiKeyPtr[iPSlot];
        if( bLeaf==0 ){
          pNew1->aiChildPtr[1] = iNewptr;
          pNew1->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 0);
        }

        pNewP->aiChildPtr[iPSlot] = iNew1;
        pNewP->aiKeyPtr[iPSlot] = pPeer->aiKeyPtr[0];
        pNewP->aiChildPtr[iPSlot+1] = iNew2;

        pNew2->aiKeyPtr[0] = pPeer->aiKeyPtr[1];
        pNew2->aiKeyPtr[1] = pPeer->aiKeyPtr[2];
        if( bLeaf==0 ){
          pNew2->aiChildPtr[0] = getChildPtr(pPeer, WORKING_VERSION, 1);
          pNew2->aiChildPtr[1] = getChildPtr(pPeer, WORKING_VERSION, 2);
          pNew2->aiChildPtr[2] = getChildPtr(pPeer, WORKING_VERSION, 3);
        }
      }
      assert( pCsr->iNode>=1 );
      pCsr->iNode -= 2;
      if( rc==LSM_OK ){
        assert( pNew1->aiKeyPtr[1] && pNew2->aiKeyPtr[1] );
        rc = treeUpdatePtr(db, pCsr, iNewP);
      }
    }else{
      int iKOut = 0;
      int iPOut = 0;
      int i;

      pCsr->iNode--;

      if( iDir==1 ){
        pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
        if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
      }
      for(i=0; i<3; i++){
        if( pPeer->aiKeyPtr[i] ){
          pNew1->aiKeyPtr[iKOut++] = pPeer->aiKeyPtr[i];
        }
      }
      if( bLeaf==0 ){
        for(i=0; i<4; i++){
          if( getChildPtr(pPeer, WORKING_VERSION, i) ){
            pNew1->aiChildPtr[iPOut++] = getChildPtr(pPeer, WORKING_VERSION, i);
          }
        }
      }
      if( iDir==-1 ){
        iPSlot--;
        pNew1->aiKeyPtr[iKOut++] = pParent->aiKeyPtr[iPSlot];
        if( bLeaf==0 ) pNew1->aiChildPtr[iPOut++] = iNewptr;
        pCsr->aiCell[pCsr->iNode] = iPSlot;
      }

      rc = treeDeleteEntry(db, pCsr, iNew1);
    }
  }

  return rc;
}

/*
** Delete a range of keys from the tree structure (i.e. the lsm_delete_range()
** function, not lsm_delete()).
**
** This is a two step process: 
**
**     1) Remove all entries currently stored in the tree that have keys
**        that fall into the deleted range.
**
**        TODO: There are surely good ways to optimize this step - removing 
**        a range of keys from a b-tree. But for now, this function removes
**        them one at a time using the usual approach.
**
**     2) Unless the largest key smaller than or equal to (pKey1/nKey1) is
**        already marked as START_DELETE, insert a START_DELETE key. 
**        Similarly, unless the smallest key greater than or equal to
**        (pKey2/nKey2) is already START_END, insert a START_END key.
*/
int lsmTreeDelete(
  lsm_db *db,
  void *pKey1, int nKey1,         /* Start of range */
  void *pKey2, int nKey2          /* End of range */
){
  int rc = LSM_OK;
  int bDone = 0;
  TreeRoot *p = &db->treehdr.root;
  TreeBlob blob = {0, 0};

  /* The range must be sensible - that (key1 < key2). */
  assert( db->xCmp(pKey1, nKey1, pKey2, nKey2)<0 );
  assert( assert_delete_ranges_match(db) );

#if 0
  static int nCall = 0;
  printf("\n");
  nCall++;
  printf("%d delete %s .. %s\n", nCall, (char *)pKey1, (char *)pKey2);
  dump_tree_contents(db, "before delete");
#endif

  /* Step 1. This loop runs until the tree contains no keys within the
  ** range being deleted. Or until an error occurs. */
  while( bDone==0 && rc==LSM_OK ){
    int res;
    TreeCursor csr;               /* Cursor to seek to first key in range */
    void *pDel; int nDel;         /* Key to (possibly) delete this iteration */
#ifndef NDEBUG
    int nEntry = treeCountEntries(db);
#endif

    /* Seek the cursor to the first entry in the tree greater than pKey1. */
    treeCursorInit(db, 0, &csr);
    lsmTreeCursorSeek(&csr, pKey1, nKey1, &res);
    if( res<=0 && lsmTreeCursorValid(&csr) ) lsmTreeCursorNext(&csr);

    /* If there is no such entry, or if it is greater than pKey2, then the
    ** tree now contains no keys in the range being deleted. In this case
    ** break out of the loop.  */
    bDone = 1;
    if( lsmTreeCursorValid(&csr) ){
      lsmTreeCursorKey(&csr, 0, &pDel, &nDel);
      if( db->xCmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0;
    }

    if( bDone==0 ){
      if( csr.iNode==(p->nHeight-1) ){
        /* The element to delete already lies on a leaf node */
        rc = treeDeleteEntry(db, &csr, 0);
      }else{
        /* 1. Overwrite the current key with a copy of the next key in the 
        **    tree (key N).
        **
        ** 2. Seek to key N (cursor will stop at the internal node copy of
        **    N). Move to the next key (original copy of N). Delete
        **    this entry. 
        */
        u32 iKey;
        TreeKey *pKey;
        int iNode = csr.iNode;
        lsmTreeCursorNext(&csr);
        assert( csr.iNode==(p->nHeight-1) );

        iKey = csr.apTreeNode[csr.iNode]->aiKeyPtr[csr.aiCell[csr.iNode]];
        lsmTreeCursorPrev(&csr);

        treeOverwriteKey(db, &csr, iKey, &rc);
        pKey = treeShmkey(db, iKey, TK_LOADKEY, &blob, &rc);
        if( pKey ){
          rc = lsmTreeCursorSeek(&csr, TK_KEY(pKey), pKey->nKey, &res);
        }
        if( rc==LSM_OK ){
          assert( res==0 && csr.iNode==iNode );
          rc = lsmTreeCursorNext(&csr);
          if( rc==LSM_OK ){
            rc = treeDeleteEntry(db, &csr, 0);
          }
        }
      }
    }

    /* Clean up any memory allocated by the cursor. */
    tblobFree(db, &csr.blob);
#if 0
    dump_tree_contents(db, "ddd delete");
#endif
    assert( bDone || treeCountEntries(db)==(nEntry-1) );
  }

#if 0
  dump_tree_contents(db, "during delete");
#endif

  /* Now insert the START_DELETE and END_DELETE keys. */
  if( rc==LSM_OK ){
    rc = treeInsertEntry(db, LSM_START_DELETE, pKey1, nKey1, 0, -1);
  }
#if 0
  dump_tree_contents(db, "during delete 2");
#endif
  if( rc==LSM_OK ){
    rc = treeInsertEntry(db, LSM_END_DELETE, pKey2, nKey2, 0, -1);
  }

#if 0
  dump_tree_contents(db, "after delete");
#endif

  tblobFree(db, &blob);
  assert( assert_delete_ranges_match(db) );
  return rc;
}

/*
** Return, in bytes, the amount of memory currently used by the tree 
** structure.
*/
int lsmTreeSize(lsm_db *pDb){
  return pDb->treehdr.nByte;
................................................................................
      if( iCell<3 && pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[iCell] ) break;
    }
  }

#ifndef NDEBUG
  if( pCsr->iNode>=0 ){
    TreeKey *pK2 = csrGetKey(pCsr, &pCsr->blob, &rc);
    assert( rc || pDb->xCmp(TK_KEY(pK2),pK2->nKey,TK_KEY(pK1),pK1->nKey)>=0 );
  }
  tblobFree(pDb, &key1);
#endif

  return rc;
}

................................................................................

/*
** Move the cursor to the first (bLast==0) or last (bLast!=0) entry in the
** in-memory tree.
*/
int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast){
  lsm_db *pDb = pCsr->pDb;

  TreeRoot *pRoot = pCsr->pRoot;
  int rc = LSM_OK;

  u32 iNodePtr;
  pCsr->iNode = -1;

  /* Discard any saved position data */
................................................................................
    }
    pCsr->aiCell[pCsr->iNode] = iCell - (iNodePtr==0 && bLast);
  }

  return rc;
}

int lsmTreeCursorFlags(TreeCursor *pCsr){
  int flags = 0;
  if( pCsr && pCsr->iNode>=0 ){
    int rc = LSM_OK;
    TreeKey *pKey = (TreeKey *)treeShmptr(pCsr->pDb,
        pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]], &rc
    );
    assert( rc==LSM_OK );
    flags = pKey->flags;
  }
  return flags;
}

int lsmTreeCursorKey(TreeCursor *pCsr, int *pFlags, void **ppKey, int *pnKey){
  TreeKey *pTreeKey;
  int rc = LSM_OK;

  assert( lsmTreeCursorValid(pCsr) );

  pTreeKey = pCsr->pSave;
  if( !pTreeKey ){
    pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
  }
  if( rc==LSM_OK ){
    *pnKey = pTreeKey->nKey;
    if( pFlags ) *pFlags = pTreeKey->flags;
    *ppKey = (void *)&pTreeKey[1];
  }

  return rc;
}

int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal){
................................................................................
  int res = 0;
  int rc;

  rc = treeCursorRestore(pCsr, &res);
  if( res==0 ){
    TreeKey *pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
    if( rc==LSM_OK ){
      if( pTreeKey->flags & LSM_INSERT ){
        *pnVal = pTreeKey->nValue;
        *ppVal = TK_VAL(pTreeKey);
      }else{
        *ppVal = 0;
        *pnVal = -1;
      }
    }
  }else{
    *ppVal = 0;
    *pnVal = 0;
  }

................................................................................
    memcpy(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader));
  }
  pShm->bWriter = 0;
  intArrayFree(pDb->pEnv, &pDb->rollback);

  return LSM_OK;
}

#ifndef NDEBUG
static int assert_delete_ranges_match(lsm_db *db){
  int prev = 0;
  TreeBlob blob = {0, 0};
  TreeCursor csr;               /* Cursor used to iterate through tree */
  int rc;

  treeCursorInit(db, 0, &csr);
  for( rc = lsmTreeCursorEnd(&csr, 0);
       rc==LSM_OK && lsmTreeCursorValid(&csr);
       rc = lsmTreeCursorNext(&csr)
  ){
    TreeKey *pKey = csrGetKey(&csr, &blob, &rc);
    if( rc!=LSM_OK ) break;
    assert( ((prev&LSM_START_DELETE)==0)==((pKey->flags&LSM_END_DELETE)==0) );
    prev = pKey->flags;
  }

  tblobFree(csr.pDb, &csr.blob);

  return 1;
}

static int treeCountEntries(lsm_db *db){
  TreeCursor csr;               /* Cursor used to iterate through tree */
  int rc;
  int nEntry = 0;

  treeCursorInit(db, 0, &csr);
  for( rc = lsmTreeCursorEnd(&csr, 0);
       rc==LSM_OK && lsmTreeCursorValid(&csr);
       rc = lsmTreeCursorNext(&csr)
  ){
    nEntry++;
  }

  tblobFree(csr.pDb, &csr.blob);

  return nEntry;
}
#endif