SQLite4
Check-in [9879e2a63d]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fixes for range-deletes on the in-memory tree structure.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | range-delete
Files: files | file ages | folders
SHA1: 9879e2a63da7187aa829b773db9a65401f224224
User & Date: dan 2012-10-08 17:08:15
Context
2012-10-09
19:55
Fix further bugs in in-memory tree. Progress on writing range-deletes into the database file. check-in: 9081b1c92c user: dan tags: range-delete
2012-10-08
17:08
Fixes for range-deletes on the in-memory tree structure. check-in: 9879e2a63d user: dan tags: range-delete
2012-10-06
20:38
Add tests for range-deletes. Fix some things. Still doesn't work properly. check-in: 178f7d5eca user: dan tags: range-delete
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/lsmInt.h.

143
144
145
146
147
148
149










150
151
152
153
154
155
156
** a checkpoint (the remainder are stored as a system record in the LSM).
** See also LSM_CONFIG_MAX_FREELIST.
*/
#define LSM_MAX_FREELIST_ENTRIES 100

#define LSM_ATTEMPTS_BEFORE_PROTOCOL 10000











/*
** A string that can grow by appending.
*/
struct LsmString {
  lsm_env *pEnv;              /* Run-time environment */
  int n;                      /* Size of string.  -1 indicates error */
  int nAlloc;                 /* Space allocated for z[] */







>
>
>
>
>
>
>
>
>
>







143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
** a checkpoint (the remainder are stored as a system record in the LSM).
** See also LSM_CONFIG_MAX_FREELIST.
*/
#define LSM_MAX_FREELIST_ENTRIES 100

#define LSM_ATTEMPTS_BEFORE_PROTOCOL 10000


/*
** Each entry stored in the LSM (or in-memory tree structure) has an
** associated mask of the following flags.
*/
#define LSM_START_DELETE 0x01     /* Start of open-ended delete range */
#define LSM_END_DELETE   0x02     /* End of open-ended delete range */
#define LSM_POINT_DELETE 0x04     /* Delete this key */
#define LSM_INSERT       0x08     /* Insert this key and value */

/*
** A string that can grow by appending.
*/
struct LsmString {
  lsm_env *pEnv;              /* Run-time environment */
  int n;                      /* Size of string.  -1 indicates error */
  int nAlloc;                 /* Space allocated for z[] */

Changes to src/lsm_tree.c.

96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
...
127
128
129
130
131
132
133


134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
...
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
...
448
449
450
451
452
453
454












455
456
457
458
459

460
461

462
463
464
465
466
467



468
469
470
471
472
473
474
475
476
477
478

479
480
481
482
483
484

485
486

487
488



489
490









491
492
493
494
495
496
497

498
499

500
501
502
503
504
505
506
507
508
...
521
522
523
524
525
526
527
528
529
530
531


532
533
534
535
536
537
538
....
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330

1331
1332

1333
1334








































1335
1336

1337
1338
1339
1340
1341
1342
1343
1344
1345


1346
1347
1348



1349
1350
1351
1352


















































1353
1354
1355

1356
1357
1358
1359
1360
1361
1362
....
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
....
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402


1403
1404
1405
























1406
1407
1408
1409
1410
1411
1412
....
1586
1587
1588
1589
1590
1591
1592
1593

1594
1595
1596
1597
1598
1599
1600
....
1614
1615
1616
1617
1618
1619
1620









1621
1622
1623
1624
1625
1626
1627
....
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
....
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696













1697

1698
1699
1700
1701
1702
1703
1704
....
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
....
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056

2057
2058
2059
2060
2061
2062
2063
....
2191
2192
2193
2194
2195
2196
2197
2198

























struct TreeOld {
  u32 iShmid;                     /* Last shared-memory chunk in use by old */
  u32 iRoot;                      /* Offset of root node in shm file */
  u32 nHeight;                    /* Height of tree structure */
};

/*
** Flags for the TreeKey.flags variable.
*/
#define LSM_START_DELETE 0x01     /* Start of delete range */
#define LSM_END_DELETE   0x02     /* End of delete range */
#define LSM_POINT_DELETE 0x04     /* Delete this key */
#define LSM_INSERT       0x08     /* Insert this key and value */

#ifndef NDEBUG
/*
** assert() that a TreeKey.flags value is sane. Usage:
**
**   assert( assertFlagsOk(pTreeKey->flags) );
*/
static int assertFlagsOk(u8 keyflags){
................................................................................
  assert( (keyflags & LSM_END_DELETE)==0 
       || (keyflags & LSM_START_DELETE)==0 
       || (keyflags & LSM_POINT_DELETE)==0 
  );

  return 1;
}


#endif

/*
** Container for a key-value pair. Within the *-shm file, each key/value
** pair is stored in a single allocation (which may not actually be 
** contiguous in memory). Layout is the TreeKey structure, followed by
** the nKey bytes of key blob, followed by the nValue bytes of value blob
** (if nValue is non-negative).
*/
struct TreeKey {
  int nKey;                       /* Size of pKey in bytes */
  int nValue;                     /* Size of pValue. Or negative. */
  int flags;
};

#define TK_KEY(p) ((void *)&(p)[1])
#define TK_VAL(p) ((void *)(((u8 *)&(p)[1]) + (p)->nKey))

/*
** A single tree node. A node structure may contain up to 3 key/value
................................................................................
  int i;
  lsmStringExtend(pStr, nBlob*2);
  if( pStr->nAlloc==0 ) return;
  for(i=0; i<nBlob; i++){
    u8 c = ((u8*)pBlob)[i];
    if( c>='a' && c<='z' ){
      pStr->z[pStr->n++] = c;
    }else{
      pStr->z[pStr->n++] = "0123456789abcdef"[(c>>4)&0xf];
      pStr->z[pStr->n++] = "0123456789abcdef"[c&0xf];
    }
  }
  pStr->z[pStr->n] = 0;
}

................................................................................
** Append nIndent space (0x20) characters to string *pStr.
*/
static void lsmAppendIndent(LsmString *pStr, int nIndent){
  int i;
  lsmStringExtend(pStr, nIndent);
  for(i=0; i<nIndent; i++) lsmStringAppend(pStr, " ", 1);
}













void dump_node_contents(
  lsm_db *pDb,
  u32 iNode,                      /* Print out hte contents of this node */
  int nIndent,                    /* Number of spaces indentation */

  int nHeight                     /* Height: (0==leaf) (1==parent-of-leaf) */
){

  int i;
  int rc = LSM_OK;
  LsmString s;
  TreeNode *pNode;
  TreeBlob b = {0, 0};




  /* Append the nIndent bytes of space to string s. */
  lsmStringInit(&s, pDb->pEnv);
  if( nIndent ) lsmAppendIndent(&s, nIndent);

  pNode = (TreeNode *)treeShmptr(pDb, iNode, &rc);

  /* Append each key to string s. */
  for(i=0; i<3; i++){
    u32 iPtr = pNode->aiKeyPtr[i];
    if( iPtr ){
      TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TK_LOADKEY, &b, &rc);

      lsmAppendStrBlob(&s, TK_KEY(pKey), pKey->nKey);
      lsmStringAppend(&s, "     ", -1);
    }
  }

  printf("%s\n", s.z);

  lsmStringClear(&s);


  for(i=0; i<4 && nHeight>0; i++){
    u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);



    if( iPtr ){
      dump_node_contents(pDb, iPtr, nIndent + 2, nHeight-1);









    }
  }

  tblobFree(pDb, &b);
}

void dump_tree_contents(lsm_db *pDb, const char *zCaption){

  TreeRoot *p = &pDb->treehdr.root;
  printf("\n%s\n", zCaption);

  if( p->iRoot ){
    dump_node_contents(pDb, p->iRoot, 0, p->nHeight-1);
  }
  fflush(stdout);
}

#endif

/*
................................................................................
}

/*
** Return a pointer to the mapping of the TreeKey object that the cursor
** is pointing to. 
*/
static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){
  return (TreeKey *)treeShmkey(pCsr->pDb,
      pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]], 
      TK_LOADVAL, pBlob, pRc
  );


}

/*
** Save the current position of tree cursor pCsr.
*/
int lsmTreeCursorSave(TreeCursor *pCsr){
  int rc = LSM_OK;
................................................................................
       ** TreeNode */
      pCsr->iNode--;
      treeUpdatePtr(db, pCsr, iNew);
    }
  }
}

/*
** Insert a new entry into the in-memory tree.
**

** If the value of the 5th parameter, nVal, is negative, then a delete-marker
** is inserted into the tree. In this case the value pointer, pVal, must be

** NULL.
*/








































int lsmTreeInsert(
  lsm_db *pDb,                    /* Database handle */

  void *pKey,                     /* Pointer to key data */
  int nKey,                       /* Size of key data in bytes */
  void *pVal,                     /* Pointer to value data (or NULL) */
  int nVal                        /* Bytes in value data (or -ve for delete) */
){
  int rc = LSM_OK;                /* Return Code */
  TreeKey *pTreeKey;              /* New key-value being inserted */
  u32 iTreeKey;
  TreeRoot *p = &pDb->treehdr.root;



  assert( nVal>=0 || pVal==0 );
  assert_tree_looks_ok(LSM_OK, pTree);



#if 0
  dump_tree_contents(pDb, "before");
#endif



















































  /* Allocate and populate a new key-value pair structure */
  pTreeKey = newTreeKey(pDb, &iTreeKey, pKey, nKey, pVal, nVal, &rc);
  if( rc!=LSM_OK ) return rc;


  if( p->iRoot==0 ){
    /* The tree is completely empty. Add a new root node and install
    ** (pKey/nKey) as the middle entry. Even though it is a leaf at the
    ** moment, use newTreeNode() to allocate the node (i.e. allocate enough
    ** space for the fields used by interior nodes). This is because the
    ** treeInsert() routine may convert this node to an interior node. */
................................................................................
    TreeNode *pRoot = newTreeNode(pDb, &p->iRoot, &rc);
    if( rc==LSM_OK ){
      assert( p->nHeight==0 );
      pRoot->aiKeyPtr[1] = iTreeKey;
      p->nHeight = 1;
    }
  }else{
    TreeCursor csr;
    int res;

    /* Seek to the leaf (or internal node) that the new key belongs on */
    treeCursorInit(pDb, 0, &csr);
    lsmTreeCursorSeek(&csr, pKey, nKey, &res);

    if( res==0 ){
      /* The search found a match within the tree. */
      treeOverwriteKey(pDb, &csr, iTreeKey, &rc);
    }else{
      /* The cursor now points to the leaf node into which the new entry should
      ** be inserted. There may or may not be a free slot within the leaf for
      ** the new key-value pair. 
................................................................................
      int iSlot = csr.aiCell[csr.iNode] + (res<0);
      if( csr.iNode==0 ){
        rc = treeInsert(pDb, &csr, 0, iTreeKey, 0, iSlot);
      }else{
        rc = treeInsertLeaf(pDb, &csr, iTreeKey, iSlot);
      }
    }
    tblobFree(pDb, &csr.blob);
  }

#if 0
  dump_tree_contents(pDb, "after");
#endif


  assert_tree_looks_ok(rc, pTree);
  return rc;
}

























static int treeDeleteEntry(lsm_db *db, TreeCursor *pCsr, u32 iNewptr){
  TreeRoot *p = &db->treehdr.root;
  TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
  int iSlot = pCsr->aiCell[pCsr->iNode];
  int bLeaf;
  int rc = LSM_OK;
................................................................................
    }
  }

  return rc;
}

/*
** Delete a range of keys from the tree structure.

**
** This is a two step process: 
**
**     1) Remove all entries currently stored in the tree that have keys
**        that fall into the deleted range.
**
**        TODO: There are surely good ways to optimize this step - removing 
................................................................................
  int rc = LSM_OK;
  int bDone = 0;
  TreeRoot *p = &db->treehdr.root;
  TreeBlob blob = {0, 0};

  /* The range must be sensible - that (key1 < key2). */
  assert( db->xCmp(pKey1, nKey1, pKey2, nKey2)<0 );










  /* Step 1. This loop runs until the tree contains no keys within the
  ** range being deleted. Or until an error occurs. */
  while( bDone==0 && rc==LSM_OK ){
    int res;
    TreeCursor csr;               /* Cursor to seek to first key in range */
    void *pDel; int nDel;         /* Key to (possibly) delete this iteration */
................................................................................
    bDone = 1;
    if( lsmTreeCursorValid(&csr) ){
      lsmTreeCursorKey(&csr, &pDel, &nDel);
      if( db->xCmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0;
    }

    if( bDone==0 ){
#if 0
dump_tree_contents(db, "BEFORE");
fprintf(stderr, "DELETE %s\n", (char *)pDel);
static nCall = 0;
nCall ++;
fprintf(stderr, "%d\n", nCall);
#endif

      if( csr.iNode==(p->nHeight-1) ){
        /* The element to delete already lies on a leaf node */
        rc = treeDeleteEntry(db, &csr, 0);
      }else{
        /* 1. Overwrite the current key with a copy of the next key in the 
        **    tree (key N).
        **
................................................................................
        if( pKey ){
          rc = lsmTreeCursorSeek(&csr, TK_KEY(pKey), pKey->nKey, &res);
        }
        if( rc==LSM_OK ){
          assert( res==0 && csr.iNode==iNode );
          rc = lsmTreeCursorNext(&csr);
          if( rc==LSM_OK ){
#if 0
dump_tree_contents(db, "DURING");
#endif
            rc = treeDeleteEntry(db, &csr, 0);
          }
        }
      }
#if 0
dump_tree_contents(db, "AFTER");
#endif
    }

    /* Clean up any memory allocated by the cursor. */
    tblobFree(db, &csr.blob);
  }














  tblobFree(db, &blob);

  return rc;
}

/*
** Return, in bytes, the amount of memory currently used by the tree 
** structure.
*/
................................................................................

/*
** Move the cursor to the first (bLast==0) or last (bLast!=0) entry in the
** in-memory tree.
*/
int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast){
  lsm_db *pDb = pCsr->pDb;
  TreeHeader *pHdr = &pDb->treehdr;
  TreeRoot *pRoot = pCsr->pRoot;
  int rc = LSM_OK;

  u32 iNodePtr;
  pCsr->iNode = -1;

  /* Discard any saved position data */
................................................................................
  int res = 0;
  int rc;

  rc = treeCursorRestore(pCsr, &res);
  if( res==0 ){
    TreeKey *pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
    if( rc==LSM_OK ){
      *pnVal = pTreeKey->nValue;
      if( pTreeKey->nValue>=0 ){
        *ppVal = TK_VAL(pTreeKey);
      }else{
        *ppVal = 0;

      }
    }
  }else{
    *ppVal = 0;
    *pnVal = 0;
  }

................................................................................
    memcpy(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader));
  }
  pShm->bWriter = 0;
  intArrayFree(pDb->pEnv, &pDb->rollback);

  return LSM_OK;
}
































<
<
<
<
<
<
<
<







 







>
>












|







 







|







 







>
>
>
>
>
>
>
>
>
>
>
>



|
|
>


>






>
>
>
|
|
<

<
<
|
|
|
|
|
>
|
|
|
|

<
>
|
<
>
|
|
>
>
>
|
|
>
>
>
>
>
>
>
>
>







>


>

|







 







|



>
>







 







|
|
<
>
|
<
>
|
<
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|

>









>
>



>
>
>




>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>



>







 







<
<
<
<
<
<
<







 







<





>
>



>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|
>







 







>
>
>
>
>
>
>
>
>







 







<
<
<
<
<
<
<
<







 







<
<
<




<
<
<






>
>
>
>
>
>
>
>
>
>
>
>
>

>







 







<







 







|
|



>







 








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
96
97
98
99
100
101
102








103
104
105
106
107
108
109
...
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
...
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
...
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480

481


482
483
484
485
486
487
488
489
490
491
492

493
494

495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
...
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
....
1346
1347
1348
1349
1350
1351
1352
1353
1354

1355
1356

1357
1358

1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
....
1484
1485
1486
1487
1488
1489
1490







1491
1492
1493
1494
1495
1496
1497
....
1504
1505
1506
1507
1508
1509
1510

1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
....
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
....
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
....
1786
1787
1788
1789
1790
1791
1792








1793
1794
1795
1796
1797
1798
1799
....
1815
1816
1817
1818
1819
1820
1821



1822
1823
1824
1825



1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
....
2131
2132
2133
2134
2135
2136
2137

2138
2139
2140
2141
2142
2143
2144
....
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
....
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371

struct TreeOld {
  u32 iShmid;                     /* Last shared-memory chunk in use by old */
  u32 iRoot;                      /* Offset of root node in shm file */
  u32 nHeight;                    /* Height of tree structure */
};









#ifndef NDEBUG
/*
** assert() that a TreeKey.flags value is sane. Usage:
**
**   assert( assertFlagsOk(pTreeKey->flags) );
*/
static int assertFlagsOk(u8 keyflags){
................................................................................
  assert( (keyflags & LSM_END_DELETE)==0 
       || (keyflags & LSM_START_DELETE)==0 
       || (keyflags & LSM_POINT_DELETE)==0 
  );

  return 1;
}

static int assert_delete_ranges_match(lsm_db *);
#endif

/*
** Container for a key-value pair. Within the *-shm file, each key/value
** pair is stored in a single allocation (which may not actually be 
** contiguous in memory). Layout is the TreeKey structure, followed by
** the nKey bytes of key blob, followed by the nValue bytes of value blob
** (if nValue is non-negative).
*/
struct TreeKey {
  int nKey;                       /* Size of pKey in bytes */
  int nValue;                     /* Size of pValue. Or negative. */
  u8 flags;                       /* Various LSM_XXX flags */
};

#define TK_KEY(p) ((void *)&(p)[1])
#define TK_VAL(p) ((void *)(((u8 *)&(p)[1]) + (p)->nKey))

/*
** A single tree node. A node structure may contain up to 3 key/value
................................................................................
  int i;
  lsmStringExtend(pStr, nBlob*2);
  if( pStr->nAlloc==0 ) return;
  for(i=0; i<nBlob; i++){
    u8 c = ((u8*)pBlob)[i];
    if( c>='a' && c<='z' ){
      pStr->z[pStr->n++] = c;
    }else if( c!=0 || nBlob==1 || i!=(nBlob-1) ){
      pStr->z[pStr->n++] = "0123456789abcdef"[(c>>4)&0xf];
      pStr->z[pStr->n++] = "0123456789abcdef"[c&0xf];
    }
  }
  pStr->z[pStr->n] = 0;
}

................................................................................
** Append nIndent space (0x20) characters to string *pStr.
*/
static void lsmAppendIndent(LsmString *pStr, int nIndent){
  int i;
  lsmStringExtend(pStr, nIndent);
  for(i=0; i<nIndent; i++) lsmStringAppend(pStr, " ", 1);
}

static void strAppendFlags(LsmString *pStr, u8 flags){
  char zFlags[5];

  zFlags[0] = (flags & LSM_END_DELETE)   ? 'E' : '.';
  zFlags[1] = (flags & LSM_START_DELETE) ? 'S' : '.';
  zFlags[2] = (flags & LSM_POINT_DELETE) ? 'P' : '.';
  zFlags[3] = (flags & LSM_INSERT)       ? 'I' : '.';
  zFlags[4] = ':';

  lsmStringAppend(pStr, zFlags, 5);
}

void dump_node_contents(
  lsm_db *pDb,
  u32 iNode,                      /* Print out the contents of this node */
  char *zPath,                    /* Path from root to this node */
  int nPath,                      /* Number of bytes in zPath */
  int nHeight                     /* Height: (0==leaf) (1==parent-of-leaf) */
){
  const char *zSpace = "                                           ";
  int i;
  int rc = LSM_OK;
  LsmString s;
  TreeNode *pNode;
  TreeBlob b = {0, 0};

  pNode = (TreeNode *)treeShmptr(pDb, iNode, &rc);

  if( nHeight==0 ){
    /* Append the nIndent bytes of space to string s. */
    lsmStringInit(&s, pDb->pEnv);




    /* Append each key to string s. */
    for(i=0; i<3; i++){
      u32 iPtr = pNode->aiKeyPtr[i];
      if( iPtr ){
        TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TK_LOADKEY, &b,&rc);
        strAppendFlags(&s, pKey->flags);
        lsmAppendStrBlob(&s, TK_KEY(pKey), pKey->nKey);
        lsmStringAppend(&s, "     ", -1);
      }
    }


    printf("%.*sleaf%.*s: %s\n", nPath, zPath, 20-nPath-4, zSpace, s.z);
    lsmStringClear(&s);

  }else{
    for(i=0; i<4 && nHeight>0; i++){
      u32 iPtr = getChildPtr(pNode, pDb->treehdr.root.iTransId, i);
      zPath[nPath] = i+'0';
      zPath[nPath+1] = '/';

      if( iPtr ){
        dump_node_contents(pDb, iPtr, zPath, nPath+2, nHeight-1);
      }
      if( i!=3 && pNode->aiKeyPtr[i] ){
        TreeKey *pKey = treeShmkey(pDb, pNode->aiKeyPtr[i], TK_LOADKEY, &b,&rc);
        lsmStringInit(&s, pDb->pEnv);
        strAppendFlags(&s, pKey->flags);
        lsmAppendStrBlob(&s, TK_KEY(pKey), pKey->nKey);
        printf("%.*s%.*s: %s\n", nPath+1, zPath, 20-nPath-1, zSpace, s.z);
        lsmStringClear(&s);
      }
    }
  }

  tblobFree(pDb, &b);
}

void dump_tree_contents(lsm_db *pDb, const char *zCaption){
  char zPath[64];
  TreeRoot *p = &pDb->treehdr.root;
  printf("\n%s\n", zCaption);
  zPath[0] = '/';
  if( p->iRoot ){
    dump_node_contents(pDb, p->iRoot, zPath, 1, p->nHeight-1);
  }
  fflush(stdout);
}

#endif

/*
................................................................................
}

/*
** Return a pointer to the mapping of the TreeKey object that the cursor
** is pointing to. 
*/
static TreeKey *csrGetKey(TreeCursor *pCsr, TreeBlob *pBlob, int *pRc){
  TreeKey *pRet = (TreeKey *)treeShmkey(pCsr->pDb,
      pCsr->apTreeNode[pCsr->iNode]->aiKeyPtr[pCsr->aiCell[pCsr->iNode]], 
      TK_LOADVAL, pBlob, pRc
  );
  assertFlagsOk(pRet->flags);
  return pRet;
}

/*
** Save the current position of tree cursor pCsr.
*/
int lsmTreeCursorSave(TreeCursor *pCsr){
  int rc = LSM_OK;
................................................................................
       ** TreeNode */
      pCsr->iNode--;
      treeUpdatePtr(db, pCsr, iNew);
    }
  }
}

static int treeNextIsEndDelete(lsm_db *db, TreeCursor *pCsr){
  TreeNode *pNode;

  int iNode = pCsr->iNode;
  int iCell = pCsr->aiCell[iNode]+1;


  /* Cursor currently points to a leaf node. */

  assert( pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    if( iCell<3 && pNode->aiKeyPtr[iCell] ){
      int rc = LSM_OK;
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell], &rc);
      assert( rc==LSM_OK );
      return ((pKey->flags & LSM_END_DELETE) ? 1 : 0);
    }
    iNode--;
    iCell = pCsr->aiCell[iNode];
  }

  return 0;
}

static int treePrevIsStartDelete(lsm_db *db, TreeCursor *pCsr){
  TreeNode *pNode;
  int iNode = pCsr->iNode;

  /* Cursor currently points to a leaf node. */
  assert( pCsr->iNode==(db->treehdr.root.nHeight-1) );

  while( iNode>=0 ){
    TreeNode *pNode = pCsr->apTreeNode[iNode];
    int iCell = pCsr->aiCell[iNode]-1;
    if( iCell>=0 && pNode->aiKeyPtr[iCell] ){
      int rc = LSM_OK;
      TreeKey *pKey = treeShmptr(db, pNode->aiKeyPtr[iCell], &rc);
      assert( rc==LSM_OK );
      return ((pKey->flags & LSM_START_DELETE) ? 1 : 0);
    }
    iNode--;
  }

  return 0;
}


static int treeInsertEntry(
  lsm_db *pDb,                    /* Database handle */
  int flags,                      /* Flags associated with entry */
  void *pKey,                     /* Pointer to key data */
  int nKey,                       /* Size of key data in bytes */
  void *pVal,                     /* Pointer to value data (or NULL) */
  int nVal                        /* Bytes in value data (or -ve for delete) */
){
  int rc = LSM_OK;                /* Return Code */
  TreeKey *pTreeKey;              /* New key-value being inserted */
  u32 iTreeKey;
  TreeRoot *p = &pDb->treehdr.root;
  TreeCursor csr;                 /* Cursor to seek to pKey/nKey */
  int res;                        /* Result of seek operation on csr */

  assert( nVal>=0 || pVal==0 );
  assert_tree_looks_ok(LSM_OK, pTree);
  assert( flags==LSM_INSERT       || flags==LSM_POINT_DELETE 
       || flags==LSM_START_DELETE || flags==LSM_END_DELETE 
  );
#if 0
  dump_tree_contents(pDb, "before");
#endif

  if( p->iRoot ){
    TreeKey *pRes;                /* Key at end of seek operation */
    treeCursorInit(pDb, 0, &csr);

    /* Seek to the leaf (or internal node) that the new key belongs on */
    rc = lsmTreeCursorSeek(&csr, pKey, nKey, &res);
    pRes = csrGetKey(&csr, &csr.blob, &rc);
    if( rc!=LSM_OK ) return rc;

    if( flags==LSM_START_DELETE ){
      /* When inserting a start-delete-range entry, if the key that
      ** occurs immediately before the new entry is already a START_DELETE,
      ** then the new entry is not required.  */
      if( (res<=0 && (pRes->flags & LSM_START_DELETE))
       || (res>0  && treePrevIsStartDelete(pDb, &csr))
      ){ 
        goto insert_entry_out;
      }
    }else if( flags==LSM_END_DELETE ){
      /* When inserting an start-delete-range entry, if the key that
      ** occurs immediately after the new entry is already an END_DELETE,
      ** then the new entry is not required.  */
      if( (res<0  && treeNextIsEndDelete(pDb, &csr))
       || (res>=0 && (pRes->flags & LSM_END_DELETE))
      ){
        goto insert_entry_out;
      }
    }

    if( res==0 && (flags & (LSM_END_DELETE|LSM_START_DELETE)) ){
      if( pRes->flags & LSM_INSERT ){
        nVal = pRes->nValue;
        pVal = TK_VAL(pRes);
      }
      flags = flags | pRes->flags;
    }

    if( flags & (LSM_INSERT|LSM_POINT_DELETE) ){
      if( (res<0 && (pRes->flags & LSM_START_DELETE))
       || (res>0 && (pRes->flags & LSM_END_DELETE)) 
      ){
        flags = flags | (LSM_END_DELETE|LSM_START_DELETE);
      }else if( res==0 ){
        flags = flags | (pRes->flags & (LSM_END_DELETE|LSM_START_DELETE));
      }
    }
  }else{
    memset(&csr, 0, sizeof(TreeCursor));
  }

  /* Allocate and populate a new key-value pair structure */
  pTreeKey = newTreeKey(pDb, &iTreeKey, pKey, nKey, pVal, nVal, &rc);
  if( rc!=LSM_OK ) return rc;
  pTreeKey->flags = flags;

  if( p->iRoot==0 ){
    /* The tree is completely empty. Add a new root node and install
    ** (pKey/nKey) as the middle entry. Even though it is a leaf at the
    ** moment, use newTreeNode() to allocate the node (i.e. allocate enough
    ** space for the fields used by interior nodes). This is because the
    ** treeInsert() routine may convert this node to an interior node. */
................................................................................
    TreeNode *pRoot = newTreeNode(pDb, &p->iRoot, &rc);
    if( rc==LSM_OK ){
      assert( p->nHeight==0 );
      pRoot->aiKeyPtr[1] = iTreeKey;
      p->nHeight = 1;
    }
  }else{







    if( res==0 ){
      /* The search found a match within the tree. */
      treeOverwriteKey(pDb, &csr, iTreeKey, &rc);
    }else{
      /* The cursor now points to the leaf node into which the new entry should
      ** be inserted. There may or may not be a free slot within the leaf for
      ** the new key-value pair. 
................................................................................
      int iSlot = csr.aiCell[csr.iNode] + (res<0);
      if( csr.iNode==0 ){
        rc = treeInsert(pDb, &csr, 0, iTreeKey, 0, iSlot);
      }else{
        rc = treeInsertLeaf(pDb, &csr, iTreeKey, iSlot);
      }
    }

  }

#if 0
  dump_tree_contents(pDb, "after");
#endif
 insert_entry_out:
  tblobFree(pDb, &csr.blob);
  assert_tree_looks_ok(rc, pTree);
  return rc;
}

/*
** Insert a new entry into the in-memory tree.
**
** If the value of the 5th parameter, nVal, is negative, then a delete-marker
** is inserted into the tree. In this case the value pointer, pVal, must be
** NULL.
*/
int lsmTreeInsert(
  lsm_db *pDb,                    /* Database handle */
  void *pKey,                     /* Pointer to key data */
  int nKey,                       /* Size of key data in bytes */
  void *pVal,                     /* Pointer to value data (or NULL) */
  int nVal                        /* Bytes in value data (or -ve for delete) */
){
  int flags;
  if( nVal<0 ){
    flags = LSM_POINT_DELETE;
  }else{
    flags = LSM_INSERT;
  }

  return treeInsertEntry(pDb, flags, pKey, nKey, pVal, nVal);
}

static int treeDeleteEntry(lsm_db *db, TreeCursor *pCsr, u32 iNewptr){
  TreeRoot *p = &db->treehdr.root;
  TreeNode *pNode = pCsr->apTreeNode[pCsr->iNode];
  int iSlot = pCsr->aiCell[pCsr->iNode];
  int bLeaf;
  int rc = LSM_OK;
................................................................................
    }
  }

  return rc;
}

/*
** Delete a range of keys from the tree structure (i.e. the lsm_delete_range()
** function, not lsm_delete()).
**
** This is a two step process: 
**
**     1) Remove all entries currently stored in the tree that have keys
**        that fall into the deleted range.
**
**        TODO: There are surely good ways to optimize this step - removing 
................................................................................
  int rc = LSM_OK;
  int bDone = 0;
  TreeRoot *p = &db->treehdr.root;
  TreeBlob blob = {0, 0};

  /* The range must be sensible - that (key1 < key2). */
  assert( db->xCmp(pKey1, nKey1, pKey2, nKey2)<0 );
  assert( assert_delete_ranges_match(db) );

#if 0
  static int nCall = 0;
  printf("\n");
  nCall++;
  printf("%d delete %s .. %s\n", nCall, (char *)pKey1, (char *)pKey2);
  dump_tree_contents(db, "before delete");
#endif

  /* Step 1. This loop runs until the tree contains no keys within the
  ** range being deleted. Or until an error occurs. */
  while( bDone==0 && rc==LSM_OK ){
    int res;
    TreeCursor csr;               /* Cursor to seek to first key in range */
    void *pDel; int nDel;         /* Key to (possibly) delete this iteration */
................................................................................
    bDone = 1;
    if( lsmTreeCursorValid(&csr) ){
      lsmTreeCursorKey(&csr, &pDel, &nDel);
      if( db->xCmp(pDel, nDel, pKey2, nKey2)<0 ) bDone = 0;
    }

    if( bDone==0 ){








      if( csr.iNode==(p->nHeight-1) ){
        /* The element to delete already lies on a leaf node */
        rc = treeDeleteEntry(db, &csr, 0);
      }else{
        /* 1. Overwrite the current key with a copy of the next key in the 
        **    tree (key N).
        **
................................................................................
        if( pKey ){
          rc = lsmTreeCursorSeek(&csr, TK_KEY(pKey), pKey->nKey, &res);
        }
        if( rc==LSM_OK ){
          assert( res==0 && csr.iNode==iNode );
          rc = lsmTreeCursorNext(&csr);
          if( rc==LSM_OK ){



            rc = treeDeleteEntry(db, &csr, 0);
          }
        }
      }



    }

    /* Clean up any memory allocated by the cursor. */
    tblobFree(db, &csr.blob);
  }

  /* dump_tree_contents(db, "during delete"); */

  /* Now insert the START_DELETE and END_DELETE keys. */
  if( rc==LSM_OK ){
    rc = treeInsertEntry(db, LSM_START_DELETE, pKey1, nKey1, 0, -1);
  }
  /* dump_tree_contents(db, "during delete 2"); */
  if( rc==LSM_OK ){
    rc = treeInsertEntry(db, LSM_END_DELETE, pKey2, nKey2, 0, -1);
  }

  /* dump_tree_contents(db, "after delete"); */

  tblobFree(db, &blob);
  assert( assert_delete_ranges_match(db) );
  return rc;
}

/*
** Return, in bytes, the amount of memory currently used by the tree 
** structure.
*/
................................................................................

/*
** Move the cursor to the first (bLast==0) or last (bLast!=0) entry in the
** in-memory tree.
*/
int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast){
  lsm_db *pDb = pCsr->pDb;

  TreeRoot *pRoot = pCsr->pRoot;
  int rc = LSM_OK;

  u32 iNodePtr;
  pCsr->iNode = -1;

  /* Discard any saved position data */
................................................................................
  int res = 0;
  int rc;

  rc = treeCursorRestore(pCsr, &res);
  if( res==0 ){
    TreeKey *pTreeKey = csrGetKey(pCsr, &pCsr->blob, &rc);
    if( rc==LSM_OK ){
      if( pTreeKey->flags & LSM_INSERT ){
        *pnVal = pTreeKey->nValue;
        *ppVal = TK_VAL(pTreeKey);
      }else{
        *ppVal = 0;
        *pnVal = -1;
      }
    }
  }else{
    *ppVal = 0;
    *pnVal = 0;
  }

................................................................................
    memcpy(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader));
  }
  pShm->bWriter = 0;
  intArrayFree(pDb->pEnv, &pDb->rollback);

  return LSM_OK;
}

static int assert_delete_ranges_match(lsm_db *db){
  int prev = 0;
  TreeBlob blob = {0, 0};
  TreeCursor csr;               /* Cursor used to iterate through tree */
  int rc;
  u32 iTransId = db->treehdr.root.iTransId;

  treeCursorInit(db, 0, &csr);
  for( rc = lsmTreeCursorEnd(&csr, 0);
       rc==LSM_OK && lsmTreeCursorValid(&csr);
       rc = lsmTreeCursorNext(&csr)
  ){
    TreeKey *pKey = csrGetKey(&csr, &blob, &rc);
    if( rc!=LSM_OK ) break;
    assert( ((prev&LSM_START_DELETE)==0)==((pKey->flags&LSM_END_DELETE)==0) );
    prev = pKey->flags;
  }

  tblobFree(csr.pDb, &csr.blob);

  return 1;
}