SQLite4
Check-in [752517c1cf]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Various fixes and tests for range-deletes.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | range-delete-fixes
Files: files | file ages | folders
SHA1: 752517c1cfa07f002ddbcc68fa34d04056e4de8b
User & Date: dan 2012-11-03 19:06:11
Context
2012-11-03
19:51
Fix a problem in writing free-list entries to the LSM. check-in: b3b4c58d9a user: dan tags: range-delete-fixes
19:06
Various fixes and tests for range-deletes. check-in: 752517c1cf user: dan tags: range-delete-fixes
2012-11-02
20:13
Change free-list deletes to use range-deletes instead of point-deletes (so that they can be coalesced when segments are merged). This has revealed problems with the range-delete code. check-in: 9374c3a283 user: dan tags: range-delete-fixes
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest.h.

106
107
108
109
110
111
112

113
114
115
116
117
118
119
...
196
197
198
199
200
201
202

203
204
205
206
207
208
209
TestDb *testOpen(const char *zSystem, int, int *pRc);
void testReopen(TestDb **ppDb, int *pRc);
void testClose(TestDb **ppDb);

void testFetch(TestDb *, void *, int, void *, int, int *);
void testWrite(TestDb *, void *, int, void *, int, int *);
void testDelete(TestDb *, void *, int, int *);

void testWriteStr(TestDb *, const char *, const char *zVal, int *pRc);
void testFetchStr(TestDb *, const char *, const char *, int *pRc);

void testBegin(TestDb *pDb, int iTrans, int *pRc);
void testCommit(TestDb *pDb, int iTrans, int *pRc);

void test_failed(void);
................................................................................
void testDeleteDatasource(TestDb *, Datasource *, int, int *);
void testDeleteDatasourceRange(TestDb *, Datasource *, int, int, int *);


/* test1.c */
void test_data_1(const char *, const char *, int *pRc);
void test_data_2(const char *, const char *, int *pRc);

void testDbContents(TestDb *, Datasource *, int, int, int, int, int, int *);
void testCaseProgress(int, int, int, int *);
int testCaseNDot(void);

typedef struct CksumDb CksumDb;
CksumDb *testCksumArrayNew(Datasource *, int, int, int);
char *testCksumArrayGet(CksumDb *, int);







>







 







>







106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
...
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
TestDb *testOpen(const char *zSystem, int, int *pRc);
void testReopen(TestDb **ppDb, int *pRc);
void testClose(TestDb **ppDb);

void testFetch(TestDb *, void *, int, void *, int, int *);
void testWrite(TestDb *, void *, int, void *, int, int *);
void testDelete(TestDb *, void *, int, int *);
void testDeleteRange(TestDb *, void *, int, void *, int, int *);
void testWriteStr(TestDb *, const char *, const char *zVal, int *pRc);
void testFetchStr(TestDb *, const char *, const char *, int *pRc);

void testBegin(TestDb *pDb, int iTrans, int *pRc);
void testCommit(TestDb *pDb, int iTrans, int *pRc);

void test_failed(void);
................................................................................
void testDeleteDatasource(TestDb *, Datasource *, int, int *);
void testDeleteDatasourceRange(TestDb *, Datasource *, int, int, int *);


/* test1.c */
void test_data_1(const char *, const char *, int *pRc);
void test_data_2(const char *, const char *, int *pRc);
void test_data_3(const char *, const char *, int *pRc);
void testDbContents(TestDb *, Datasource *, int, int, int, int, int, int *);
void testCaseProgress(int, int, int, int *);
int testCaseNDot(void);

typedef struct CksumDb CksumDb;
CksumDb *testCksumArrayNew(Datasource *, int, int, int);
char *testCksumArrayGet(CksumDb *, int);

Changes to lsm-test/lsmtest1.c.

481
482
483
484
485
486
487
488





































































































































    char *zName = getName2(zSystem, &aTest[i]);
    if( testCaseBegin(pRc, zPattern, "%s", zName) ){
      doDataTest2(zSystem, &aTest[i], pRc);
    }
    testFree(zName);
  }
}














































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
    char *zName = getName2(zSystem, &aTest[i]);
    if( testCaseBegin(pRc, zPattern, "%s", zName) ){
      doDataTest2(zSystem, &aTest[i], pRc);
    }
    testFree(zName);
  }
}

/*************************************************************************
** Test case data3.*
*/

/*
** Parameters for a single "data3" test case. An instance is passed to
** doDataTest3() to drive the test and to getName3() to build the test
** case name.
*/
typedef struct Datatest3 Datatest3;
struct Datatest3 {
  int nRange;                     /* Keys are between 1 and this value, incl. */
  int nIter;                      /* Number of iterations */
  int nWrite;                     /* Number of writes per iteration */
  int nDelete;                    /* Number of deletes per iteration */

  /* doDataTest3() picks each value size as
  **
  **     (prng % (nValMax - nValMin)) + nValMin
  **
  ** so sizes fall in the range nValMin..(nValMax-1) and nValMax must be
  ** strictly greater than nValMin (otherwise the modulus is zero). */
  int nValMin;                    /* Minimum value size for writes */
  int nValMax;                    /* Upper bound (exclusive) on value size */
};

/*
** Serialize iVal into the four bytes aBuf[0..3], most significant byte
** first (aBuf[0] receives bits 24..31, aBuf[3] receives bits 0..7).
*/
void testPutU32(u8 *aBuf, u32 iVal){
  int iByte;

  /* Fill the buffer from the last byte backwards, peeling off the low
  ** eight bits of the value on each step. */
  for(iByte=3; iByte>=0; iByte--){
    aBuf[iByte] = iVal & 0xFF;
    iVal >>= 8;
  }
}

/*
** Format iKey as a zero-padded, five digit decimal string (for example
** 42 becomes "00042") and store it, followed by a nul-terminator, in aBuf.
**
** aBuf must point to a buffer of at least 6 bytes. All callers in this
** file pass a "u8 aKey[6]" array and use the first 5 bytes as the key.
**
** iKey must be in the range 0..99999. This is checked by an assert(). In
** builds where assert() is compiled out, the bounded snprintf() call
** guarantees that no more than 6 bytes are ever written to aBuf, even if
** an out-of-range (or negative) key is passed.
*/
void dt3PutKey(u8 *aBuf, int iKey){
  assert( iKey<100000 && iKey>=0 );
  snprintf((char *)aBuf, 6, "%.5d", iKey);
}

/*
** Run a single data3 test case against database system zSystem, using
** the parameters in *p.
**
** Each of the p->nIter iterations does the following:
**
**   1. Writes p->nWrite records. Keys are chosen pseudo-randomly from the
**      range 1..p->nRange and formatted by dt3PutKey(). Values are
**      pseudo-random strings.
**
**   2. Issues p->nDelete range-deletes. Each one uses keys (iKey-1) and
**      (iKey+1) as its boundaries, and the test expects exactly key iKey
**      to be removed by it (only abPresent[iKey] is cleared).
**
**   3. Reopens the database, then fetches every key in 1..p->nRange and
**      checks that it is present or absent as recorded in abPresent[].
**
** The error code is read from and written back to *pRc. The test case is
** closed out with testCaseFinish() before returning.
*/
static void doDataTest3(
  const char *zSystem,            /* Database system to test */
  Datatest3 *p,                   /* Structure containing test parameters */
  int *pRc                        /* OUT: Error code */
){
  int iDot = 0;
  int rc = *pRc;
  TestDb *pDb;
  u8 *abPresent;                  /* Array of boolean */
  char *aVal;                     /* Buffer to hold values */
  int i;
  u32 iSeq = 10;                  /* prng counter */

  /* abPresent[k] is non-zero if key k is expected to be in the database.
  ** NOTE(review): this relies on testMalloc() returning zeroed memory so
  ** that all keys start out as "absent" - confirm against testMalloc(). */
  abPresent = (u8 *)testMalloc(p->nRange+1);
  aVal = (char *)testMalloc(p->nValMax+1);
  pDb = testOpen(zSystem, 1, &rc);

  for(i=0; i<p->nIter && rc==0; i++){
    int ii;

    testCaseProgress(i, p->nIter, testCaseNDot(), &iDot);

    /* Perform nWrite inserts */
    for(ii=0; ii<p->nWrite; ii++){
      u8 aKey[6];
      u32 iKey;
      int nVal;

      iKey = (testPrngValue(iSeq++) % p->nRange) + 1;
      nVal = (testPrngValue(iSeq++) % (p->nValMax - p->nValMin)) + p->nValMin;
      testPrngString(testPrngValue(iSeq++), aVal, nVal);
      dt3PutKey(aKey, iKey);

      /* sizeof(aKey)-1: the key is the 5 digits, without the terminator */
      testWrite(pDb, aKey, sizeof(aKey)-1, aVal, nVal, &rc);
      abPresent[iKey] = 1;
    }

    /* Perform nDelete deletes */
    for(ii=0; ii<p->nDelete; ii++){
      u8 aKey1[6];
      u8 aKey2[6];
      u32 iKey;

      iKey = (testPrngValue(iSeq++) % p->nRange) + 1;
      dt3PutKey(aKey1, iKey-1);
      dt3PutKey(aKey2, iKey+1);

      testDeleteRange(pDb, aKey1, sizeof(aKey1)-1, aKey2, sizeof(aKey2)-1, &rc);
      abPresent[iKey] = 0;
    }

    testReopen(&pDb, &rc);

    /* Verify that the database contents match abPresent[]. A present key
    ** must yield a value of size greater than zero, an absent key must
    ** yield a negative size. */
    for(ii=1; rc==0 && ii<=p->nRange; ii++){
      int nDbVal;
      void *pDbVal;
      u8 aKey[6];
      int dbrc;

      dt3PutKey(aKey, ii);
      dbrc = tdb_fetch(pDb, aKey, sizeof(aKey)-1, &pDbVal, &nDbVal);
      testCompareInt(0, dbrc, &rc);

      if( abPresent[ii] ){
        testCompareInt(1, (nDbVal>0), &rc);
      }else{
        testCompareInt(1, (nDbVal<0), &rc);
      }
    }
  }

  testClose(&pDb);
  testCaseFinish(rc);

  /* Release the working buffers allocated above. Without this they are
  ** leaked once per test case. */
  testFree(abPresent);
  testFree(aVal);

  *pRc = rc;
}

/*
** Build the name of the data3 test case described by *p when run against
** database system zSystem. The name encodes every field of *p.
**
** The string is obtained from testMallocPrintf(); test_data_3() releases
** it with testFree().
*/
static char *getName3(const char *zSystem, Datatest3 *p){
  char *zName;

  zName = testMallocPrintf(
      "data3.%s.%d.%d.%d.%d.(%d..%d)", zSystem,
      p->nRange, p->nIter, p->nWrite, p->nDelete, p->nValMin, p->nValMax
  );
  return zName;
}

/*
** Entry point for the data3.* test cases. Each entry of the aTest[] table
** is run against database system zSystem, provided its name is accepted
** by testCaseBegin() for zPattern. Processing stops at the first entry
** for which *pRc is no longer LSM_OK.
*/
void test_data_3(
  const char *zSystem,            /* Database system name */
  const char *zPattern,           /* Run test cases that match this pattern */
  int *pRc                        /* IN/OUT: Error code */
){
  Datatest3 aTest[] = {
    /* nRange, nIter, nWrite, nDelete, nValMin, nValMax */
    {  100,    1000,  5,      5,       50,      100 },
    {  100,    1000,  2,      2,        5,       10 },
  };
  int iTest = 0;

  while( *pRc==LSM_OK && iTest<ArraySize(aTest) ){
    Datatest3 *pTest = &aTest[iTest];
    char *zName = getName3(zSystem, pTest);

    if( testCaseBegin(pRc, zPattern, "%s", zName) ){
      doDataTest3(zSystem, pTest, pRc);
    }
    testFree(zName);
    iTest++;
  }
}


Changes to lsm-test/lsmtest_main.c.

458
459
460
461
462
463
464

465
466
467
468
469
470
471
  }

  for(j=0; tdb_system_name(j); j++){
    rc = 0;

    test_data_1(tdb_system_name(j), zPattern, &rc);
    test_data_2(tdb_system_name(j), zPattern, &rc);

    test_rollback(tdb_system_name(j), zPattern, &rc);
    test_mc(tdb_system_name(j), zPattern, &rc);
    test_mt(tdb_system_name(j), zPattern, &rc);

    if( rc ) nFail++;
  }








>







458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
  }

  for(j=0; tdb_system_name(j); j++){
    rc = 0;

    test_data_1(tdb_system_name(j), zPattern, &rc);
    test_data_2(tdb_system_name(j), zPattern, &rc);
    test_data_3(tdb_system_name(j), zPattern, &rc);
    test_rollback(tdb_system_name(j), zPattern, &rc);
    test_mc(tdb_system_name(j), zPattern, &rc);
    test_mt(tdb_system_name(j), zPattern, &rc);

    if( rc ) nFail++;
  }

Changes to src/lsm.h.

480
481
482
483
484
485
486


487
488
489
490
491
492
493
**   can be found. This is usually used to optimize the database by 
**   merging the whole thing into one big array.
*/
int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite);

#define LSM_WORK_FLUSH           0x00000001
#define LSM_WORK_OPTIMIZE        0x00000002



/*
** Attempt to checkpoint the current database snapshot. Return an LSM
** error code if an error occurs or LSM_OK otherwise.
**
** If the current snapshot has already been checkpointed, calling this 
** function is a no-op. In this case if pnByte is not NULL, *pnByte is







>
>







480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
**   can be found. This is usually used to optimize the database by 
**   merging the whole thing into one big array.
*/
int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite);

#define LSM_WORK_FLUSH           0x00000001
#define LSM_WORK_OPTIMIZE        0x00000002

int lsm_flush(lsm_db *pDb);

/*
** Attempt to checkpoint the current database snapshot. Return an LSM
** error code if an error occurs or LSM_OK otherwise.
**
** If the current snapshot has already been checkpointed, calling this 
** function is a no-op. In this case if pnByte is not NULL, *pnByte is

Changes to src/lsmInt.h.

528
529
530
531
532
533
534
535
536
537
538
539
540
541
542


/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);
void lsmTreeClear(lsm_db *);
int lsmTreeInit(lsm_db *);
int lsmTreeRepair(lsm_db *);

void lsmTreeMakeOld(lsm_db *pDb);
void lsmTreeDiscardOld(lsm_db *pDb);
int lsmTreeHasOld(lsm_db *pDb);








<







528
529
530
531
532
533
534

535
536
537
538
539
540
541


/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);

int lsmTreeInit(lsm_db *);
int lsmTreeRepair(lsm_db *);

void lsmTreeMakeOld(lsm_db *pDb);
void lsmTreeDiscardOld(lsm_db *pDb);
int lsmTreeHasOld(lsm_db *pDb);

Changes to src/lsm_sorted.c.

2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011

2012
2013
2014
2015

2016
2017
2018
2019
2020
2021
2022
....
2049
2050
2051
2052
2053
2054
2055
2056






2057
2058
2059
2060
2061
2062
2063
....
2517
2518
2519
2520
2521
2522
2523






















2524
2525
2526
2527
2528
2529
2530
....
2534
2535
2536
2537
2538
2539
2540

2541
2542
2543
2544
2545
2546
2547
....
3811
3812
3813
3814
3815
3816
3817
3818
3819
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
....
3854
3855
3856
3857
3858
3859
3860

3861




3862
3863
3864
3865
3866
3867
3868
....
4083
4084
4085
4086
4087
4088
4089
4090
4091
4092
4093
4094
4095
4096
4097
....
4500
4501
4502
4503
4504
4505
4506
4507
4508
4509
4510
4511
4512
4513
4514
....
4701
4702
4703
4704
4705
4706
4707

























4708
4709
4710
4711
4712
4713
4714
    rtTopic(iRhsFlags), pRhsKey, nRhsKey
  );

  /* If a key has the LSM_START_DELETE flag set, but not the LSM_INSERT or
  ** LSM_POINT_DELETE flags, it is considered a delta larger. This prevents
  ** the beginning of an open-ended set from masking a database entry or
  ** delete at a lower level.  */
  if( res==0 ){
    const int insdel = LSM_POINT_DELETE|LSM_INSERT;
    int iDel1 = 0;
    int iDel2 = 0;

    if( LSM_START_DELETE==(iLhsFlags & (LSM_START_DELETE|insdel)) ) iDel1 = +1;
    if( LSM_END_DELETE  ==(iLhsFlags & (LSM_END_DELETE  |insdel)) ) iDel1 = -1;
    if( LSM_START_DELETE==(iRhsFlags & (LSM_START_DELETE|insdel)) ) iDel2 = +1;
    if( LSM_END_DELETE  ==(iRhsFlags & (LSM_END_DELETE  |insdel)) ) iDel2 = -1;

    res = (iDel1 - iDel2);
  }

  return res;
}

static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){
................................................................................
    /* Compare the keys */
    res = sortedDbKeyCompare(pCsr,
        eType1, pKey1, nKey1, eType2, pKey2, nKey2
    );

    res = res * mul;
    if( res==0 ){
      iRes = (rtIsSeparator(eType1) ? i2 : i1);






    }else if( res<0 ){
      iRes = i1;
    }else{
      iRes = i2;
    }
  }

................................................................................
  }
  if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
    return 0;
  }

  /* Check if this key has already been deleted by a range-delete */
  iKey = pCsr->aTree[1];






















  if( (iKey>0 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[0]))) 
   || (iKey>1 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[1]))) 
  ){
    return 0;
  }
  if( iKey>CURSOR_DATA_SYSTEM && (pCsr->flags & CURSOR_FLUSH_FREELIST) ){
    int eType;
................................................................................

  for(i=CURSOR_DATA_SEGMENT; i<iKey; i++){
    int iPtr = i-CURSOR_DATA_SEGMENT;
    if( pCsr->aPtr[iPtr].pPg && (pCsr->aPtr[iPtr].eType & rdmask) ){
      return 0;
    }
  }


  return 1;
}

static int multiCursorEnd(MultiCursor *pCsr, int bLast){
  int rc = LSM_OK;
  int i;
................................................................................
  if( pCsr->flags & CURSOR_IGNORE_DELETE ){
    /* The ignore-delete flag is set when the output of the merge will form
    ** the oldest level in the database. In this case there is no point in
    ** retaining any range-delete flags.  */
    assert( (f & LSM_POINT_DELETE)==0 );
    f &= ~(LSM_START_DELETE|LSM_END_DELETE);
  }else{
#if 0
    if( iKey==0 ){
      int btreeflags = lsmTreeCursorFlags(pCsr->apTreeCsr[1]);
      if( btreeflags & LSM_END_DELETE ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    for(i=LSM_MAX(0, iKey+1-CURSOR_DATA_SEGMENT); i<pCsr->nPtr; i++){
      SegmentPtr *pPtr = &pCsr->aPtr[i];
      if( pPtr->pPg && (pPtr->eType & LSM_END_DELETE) ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }
#endif
    for(i=0; i<(CURSOR_DATA_SEGMENT + pCsr->nPtr); i++){
      if( i!=iKey ){
        int eType;
        void *pKey;
        int nKey;
        int res;
        multiCursorGetKey(pCsr, i, &eType, &pKey, &nKey);

        if( pKey ){
          res = sortedKeyCompare(pCsr->pDb->xCmp, 
              rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData,
              rtTopic(eType), pKey, nKey
              );
          assert( res<=0 );
          if( res==0 ){
            if( (f & (LSM_INSERT|LSM_POINT_DELETE))==0 ){
              if( eType & LSM_INSERT ){
                f |= LSM_INSERT;
                *piVal = i;
              }
................................................................................
                f |= LSM_POINT_DELETE;
              }
            }
            f |= (eType & (LSM_END_DELETE|LSM_START_DELETE));
          }

          if( i>iKey && (eType & LSM_END_DELETE) && res<0 ){

            f |= (LSM_END_DELETE|LSM_START_DELETE);




          }
        }
      }
    }

    assert( (f & LSM_INSERT)==0 || (f & LSM_POINT_DELETE)==0 );
    if( (f & LSM_START_DELETE) 
................................................................................
  if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){
    assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 );
    lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
    sortedFreeLevel(pDb->pEnv, pNew);
  }else{
    if( pDel ) pDel->iRoot = 0;

#if 1
    lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel");
#endif

    if( freelist.nEntry ){
      Freelist *p = &pDb->pWorker->freelist;
      lsmFree(pDb->pEnv, p->aEntry);
      memcpy(p, &freelist, sizeof(freelist));
................................................................................

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

#if 1
      lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work");
#endif
      assertBtreeOk(pDb, &pLevel->lhs);
      assertRunInOrder(pDb, &pLevel->lhs);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
................................................................................

  /* This function may not be called if pDb has an open read or write
  ** transaction. Return LSM_MISUSE if an application attempts this.  */
  if( pDb->nTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT;

  return doLsmWork(pDb, flags, nPage, pnWrite);
}


























/*
** This function is called in auto-work mode to perform merging work on
** the data structure. It performs enough merging work to prevent the
** height of the tree from growing indefinitely assuming that roughly
** nUnit database pages worth of data have been written to the database
** (i.e. the in-memory tree) since the last call.







|
|


>
|
|
|
|
>







 







|
>
>
>
>
>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<












|







 







>
|
>
>
>
>







 







|







 







|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
....
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
....
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
....
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
....
3842
3843
3844
3845
3846
3847
3848















3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
....
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889
....
4104
4105
4106
4107
4108
4109
4110
4111
4112
4113
4114
4115
4116
4117
4118
....
4521
4522
4523
4524
4525
4526
4527
4528
4529
4530
4531
4532
4533
4534
4535
....
4722
4723
4724
4725
4726
4727
4728
4729
4730
4731
4732
4733
4734
4735
4736
4737
4738
4739
4740
4741
4742
4743
4744
4745
4746
4747
4748
4749
4750
4751
4752
4753
4754
4755
4756
4757
4758
4759
4760
    rtTopic(iRhsFlags), pRhsKey, nRhsKey
  );

  /* If a key has the LSM_START_DELETE flag set, but not the LSM_INSERT or
  ** LSM_POINT_DELETE flags, it is considered a delta larger. This prevents
  ** the beginning of an open-ended set from masking a database entry or
  ** delete at a lower level.  */
  if( res==0 && (pCsr->flags & CURSOR_IGNORE_DELETE) ){
    const int m = LSM_POINT_DELETE|LSM_INSERT|LSM_END_DELETE |LSM_START_DELETE;
    int iDel1 = 0;
    int iDel2 = 0;

    if( LSM_START_DELETE==(iLhsFlags & m) ) iDel1 = +1;
    if( LSM_END_DELETE  ==(iLhsFlags & m) ) iDel1 = -1;
    if( LSM_START_DELETE==(iRhsFlags & m) ) iDel2 = +1;
    if( LSM_END_DELETE  ==(iRhsFlags & m) ) iDel2 = -1;

    res = (iDel1 - iDel2);
  }

  return res;
}

static void multiCursorDoCompare(MultiCursor *pCsr, int iOut, int bReverse){
................................................................................
    /* Compare the keys */
    res = sortedDbKeyCompare(pCsr,
        eType1, pKey1, nKey1, eType2, pKey2, nKey2
    );

    res = res * mul;
    if( res==0 ){
      /* The two keys are identical. Normally, this means that the key from
      ** the newer run clobbers the old. However, if the newer key is a
      ** separator key, or a range-delete-boundary only, do not allow it
      ** to clobber an older entry.  */
      int nc1 = (eType1 & (LSM_INSERT|LSM_POINT_DELETE))==0;
      int nc2 = (eType2 & (LSM_INSERT|LSM_POINT_DELETE))==0;
      iRes = (nc1 > nc2) ? i2 : i1;
    }else if( res<0 ){
      iRes = i1;
    }else{
      iRes = i2;
    }
  }

................................................................................
  }
  if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
    return 0;
  }

  /* Check if this key has already been deleted by a range-delete */
  iKey = pCsr->aTree[1];
  for(i=0; i<iKey; i++){
    int csrflags;
    multiCursorGetKey(pCsr, i, &csrflags, 0, 0);
    if( (rdmask & csrflags) ){
      const int SD_ED = (LSM_START_DELETE|LSM_END_DELETE);
      if( (csrflags & SD_ED)==SD_ED 
       || (pCsr->flags & CURSOR_IGNORE_DELETE)==0
      ){
        void *pKey; int nKey;
        multiCursorGetKey(pCsr, i, 0, &pKey, &nKey);
        if( 0==sortedKeyCompare(pCsr->pDb->xCmp,
              rtTopic(eType), pCsr->key.pData, pCsr->key.nData,
              rtTopic(csrflags), pKey, nKey
        )){
          continue;
        }
      }
      return 0;
    }
  }

#if 0
  if( (iKey>0 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[0]))) 
   || (iKey>1 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[1]))) 
  ){
    return 0;
  }
  if( iKey>CURSOR_DATA_SYSTEM && (pCsr->flags & CURSOR_FLUSH_FREELIST) ){
    int eType;
................................................................................

  for(i=CURSOR_DATA_SEGMENT; i<iKey; i++){
    int iPtr = i-CURSOR_DATA_SEGMENT;
    if( pCsr->aPtr[iPtr].pPg && (pCsr->aPtr[iPtr].eType & rdmask) ){
      return 0;
    }
  }
#endif

  return 1;
}

static int multiCursorEnd(MultiCursor *pCsr, int bLast){
  int rc = LSM_OK;
  int i;
................................................................................
  if( pCsr->flags & CURSOR_IGNORE_DELETE ){
    /* The ignore-delete flag is set when the output of the merge will form
    ** the oldest level in the database. In this case there is no point in
    ** retaining any range-delete flags.  */
    assert( (f & LSM_POINT_DELETE)==0 );
    f &= ~(LSM_START_DELETE|LSM_END_DELETE);
  }else{















    for(i=0; i<(CURSOR_DATA_SEGMENT + pCsr->nPtr); i++){
      if( i!=iKey ){
        int eType;
        void *pKey;
        int nKey;
        int res;
        multiCursorGetKey(pCsr, i, &eType, &pKey, &nKey);

        if( pKey ){
          res = sortedKeyCompare(pCsr->pDb->xCmp, 
              rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData,
              rtTopic(eType), pKey, nKey
          );
          assert( res<=0 );
          if( res==0 ){
            if( (f & (LSM_INSERT|LSM_POINT_DELETE))==0 ){
              if( eType & LSM_INSERT ){
                f |= LSM_INSERT;
                *piVal = i;
              }
................................................................................
                f |= LSM_POINT_DELETE;
              }
            }
            f |= (eType & (LSM_END_DELETE|LSM_START_DELETE));
          }

          if( i>iKey && (eType & LSM_END_DELETE) && res<0 ){
            if( f & (LSM_INSERT|LSM_POINT_DELETE) ){
              f |= (LSM_END_DELETE|LSM_START_DELETE);
            }else{
              f = 0;
            }
            break;
          }
        }
      }
    }

    assert( (f & LSM_INSERT)==0 || (f & LSM_POINT_DELETE)==0 );
    if( (f & LSM_START_DELETE) 
................................................................................
  if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){
    assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 );
    lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
    sortedFreeLevel(pDb->pEnv, pNew);
  }else{
    if( pDel ) pDel->iRoot = 0;

#if 0
    lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel");
#endif

    if( freelist.nEntry ){
      Freelist *p = &pDb->pWorker->freelist;
      lsmFree(pDb->pEnv, p->aEntry);
      memcpy(p, &freelist, sizeof(freelist));
................................................................................

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work");
#endif
      assertBtreeOk(pDb, &pLevel->lhs);
      assertRunInOrder(pDb, &pLevel->lhs);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
................................................................................

  /* This function may not be called if pDb has an open read or write
  ** transaction. Return LSM_MISUSE if an application attempts this.  */
  if( pDb->nTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT;

  return doLsmWork(pDb, flags, nPage, pnWrite);
}

/*
** Flush the contents of the in-memory tree to disk.
**
** This function may not be called while the connection has an open
** transaction (db->nTransOpen>0) or an open cursor (db->pCsr). In either
** case LSM_MISUSE_BKPT is returned and nothing else is done.
**
** Otherwise a write transaction is opened. If that succeeds the tree is
** flushed to disk and the in-memory tree is rotated and discarded, then
** the write transaction is committed and its result becomes the return
** value. If the write transaction cannot be opened it is rolled back and
** the error code from lsmBeginWriteTrans() is returned. In both cases
** the read transaction is closed before returning.
**
** Any value returned by lsmFlushTreeToDisk() is not examined here.
*/
int lsm_flush(lsm_db *db){
  int rc;

  if( db->nTransOpen>0 || db->pCsr ) return LSM_MISUSE_BKPT;

  rc = lsmBeginWriteTrans(db);
  if( rc!=LSM_OK ){
    /* Could not open the write transaction - roll back */
    lsmFinishWriteTrans(db, 0);
  }else{
    lsmFlushTreeToDisk(db);
    lsmTreeDiscardOld(db);
    lsmTreeMakeOld(db);
    lsmTreeDiscardOld(db);

    /* Commit */
    rc = lsmFinishWriteTrans(db, 1);
  }
  lsmFinishReadTrans(db);

  return rc;
}

/*
** This function is called in auto-work mode to perform merging work on
** the data structure. It performs enough merging work to prevent the
** height of the tree from growing indefinitely assuming that roughly
** nUnit database pages worth of data have been written to the database
** (i.e. the in-memory tree) since the last call.

Changes to src/lsm_tree.c.

1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
      rc = treeUpdatePtr(pDb, pCsr, iNew);
    }
  }

  return rc;
}

/*
** Empty the contents of the in-memory tree.
**
** This only resets fields of the tree header held in pDb->treehdr: the
** root is reset (transaction id 1, no root node, height 0), the byte
** count is zeroed and iUsedShmid is set to one less than iNextShmid.
** No memory is released by this function itself.
*/
void lsmTreeClear(lsm_db *pDb){
  pDb->treehdr.root.iTransId = 1;
  pDb->treehdr.root.iRoot = 0;
  pDb->treehdr.root.nHeight = 0;
  pDb->treehdr.nByte = 0;
  /* NOTE(review): presumably marks every shm chunk allocated so far as
  ** belonging to the discarded tree - confirm against the allocator. */
  pDb->treehdr.iUsedShmid = pDb->treehdr.iNextShmid-1;
}

void lsmTreeMakeOld(lsm_db *pDb){
  if( pDb->treehdr.iOldShmid==0 ){
    pDb->treehdr.iOldLog = pDb->treehdr.log.aRegion[2].iEnd;
    pDb->treehdr.oldcksum0 = pDb->treehdr.log.cksum0;
    pDb->treehdr.oldcksum1 = pDb->treehdr.log.cksum1;
    pDb->treehdr.iOldShmid = pDb->treehdr.iNextShmid-1;
    memcpy(&pDb->treehdr.oldroot, &pDb->treehdr.root, sizeof(TreeRoot));







<
<
<
<
<
<
<
<
<
<
<







1047
1048
1049
1050
1051
1052
1053











1054
1055
1056
1057
1058
1059
1060
      rc = treeUpdatePtr(pDb, pCsr, iNew);
    }
  }

  return rc;
}












void lsmTreeMakeOld(lsm_db *pDb){
  if( pDb->treehdr.iOldShmid==0 ){
    pDb->treehdr.iOldLog = pDb->treehdr.log.aRegion[2].iEnd;
    pDb->treehdr.oldcksum0 = pDb->treehdr.log.cksum0;
    pDb->treehdr.oldcksum1 = pDb->treehdr.log.cksum1;
    pDb->treehdr.iOldShmid = pDb->treehdr.iNextShmid-1;
    memcpy(&pDb->treehdr.oldroot, &pDb->treehdr.root, sizeof(TreeRoot));

Added test/lsm1.test.

































































































































































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# 2012 November 02
#
# The author disclaims copyright to this source code.  In place of
# a legal notice, here is a blessing:
#
#    May you do good and not evil.
#    May you find forgiveness for yourself and forgive others.
#    May you share freely, never taking more than you give.
#
#***********************************************************************
#
set testdir [file dirname $argv0]
source $testdir/tester.tcl
set testprefix lsm1
db close


# Close the [db] handle if one is open, then open test.db again as [db]
# with a fixed configuration. If bClear is true, the database file is
# deleted first so that the new handle starts from an empty database.
proc reopen {{bClear 0}} {
  catch { db close }
  if {$bClear} {
    forcedelete test.db
  }
  lsm_open db test.db [list mmap 0 nmerge 2 autowork 0]
}

# Return the entire contents of the database as a list of {key value}
# pairs, in the order a cursor visits them when stepping from the first
# entry onwards.
proc contents {} {
  db csr_open csr
  set rows {}
  csr first
  while {[csr valid]} {
    lappend rows [list [csr key] [csr value]]
    csr next
  }
  csr close
  return $rows
}

# Open a temporary cursor, seek it to exactly $key and return the value
# the cursor reports. The cursor is closed before returning.
proc fetch {key} {
  db csr_open csr
  csr seek $key eq
  set result [csr value]
  csr close
  return $result
}

# Write a batch of records to the database. The argument is a flat list
# of alternating keys and values: {key1 value1 key2 value2 ...}.
proc dbwrite {list} {
  foreach {key value} $list {
    db write $key $value
  }
}

# Run test case $tn, checking that the current database contents (as
# returned by [contents]) are equal to $res. $res is a list of {key value}
# pairs.
proc do_contents_test {tn res} {
  # Read the database now, not when do_test evaluates its script.
  set con [contents]

  # Rebuild the expected list element by element so that it is in canonical
  # list form, regardless of the whitespace the caller used when writing it.
  set res2 [list]
  foreach r $res {lappend res2 $r}

  # The test script is just "set {} <contents>", which evaluates to the
  # contents captured above. The [list] wrappers keep the script and the
  # expected result as single arguments when [uplevel] joins its arguments.
  uplevel do_test $tn [list [list set {} $con]] [list $res2]
}

if 1 {

do_test 1.1 {
  reopen
  db write abc def
  db close
} {}

do_test 1.2 {
  reopen
  db csr_open csr
  csr seek abc eq
} {}

do_test 1.3 { 
  list [csr valid] [csr key] [csr value] 
} {1 abc def}

do_test 1.4 { 
  db delete abc
  csr seek abc eq
  csr valid
} {0}

do_test 1.5 { csr close } {}
do_test 1.6 { db close  } {}


do_test 2.1 {
  forcedelete test.db
  reopen
  db write aaa one
  db write bbb two
  db write ccc three
  db write ddd four
  db write eee five
  db write fff six
  reopen
  db delete_range a bbb
  reopen
  db work 10 
} {1}

do_contents_test 2.2 { {bbb two} {ccc three} {ddd four} {eee five} {fff six} }


#-------------------------------------------------------------------------

# The following populates the db with a single age=1 segment, containing
# the six keys inserted below.
do_test 3.1 {
  reopen 1
  db write aaa one
  db write ddd four
  db write fff six
  reopen
  db write bbb two
  db write ccc three
  db write eee five
  reopen
  db work 10
} {1}

do_test 3.2 {
  db write bx seven
  reopen
  db delete_range aaa bx
  reopen
  db work 10
} {2}

do_contents_test 3.3 { 
  {aaa one} {bx seven} {ccc three} {ddd four} {eee five} {fff six}
}

do_test 3.4 { fetch ddd } four

#-------------------------------------------------------------------------
#
do_test 4.1 {
  reopen 1
  dbwrite { 222 helloworld }
  db flush
  db delete_range 111 222
  db delete_range 222 333
  db flush
  contents
} {{222 helloworld}}

do_test 4.2 { fetch 222 } helloworld

#-------------------------------------------------------------------------
#
do_test 5.1 {
  reopen 1

  dbwrite { 10 ten    }  ; db flush
  dbwrite { 20 twenty }  ; db flush
  db work 10

  dbwrite { 30 thirty }  ; db flush
  dbwrite { 40 forty  }  ; db flush
  db work 10

  db delete_range 11 29  ; db flush
  db delete_range 20 39  ; db flush
  db work 10

  contents
} {{10 ten} {40 forty}}

do_test 5.2 {
  reopen 1
  db config {nmerge 4}

  dbwrite { 10 ten    }  ; db flush
  dbwrite { 20 twenty }  ; db flush
  dbwrite { 30 thirty }  ; db flush
  dbwrite { 40 forty  }  ; db flush
  db work 10

  db delete_range 10 17  ; db flush
  dbwrite {17 seventeen} ; db flush
  db delete_range 10 17  ; db flush

  db config {nmerge 3}
  db work 10

  contents
} {{10 ten} {17 seventeen} {20 twenty} {30 thirty} {40 forty}}

}

do_test 5.3 {
  reopen 1
  db config {nmerge 4}

  dbwrite { 10 ten    }  ; db flush
  dbwrite { 20 twenty }  ; db flush
  dbwrite { 30 thirty }  ; db flush
  dbwrite { 40 forty  }  ; db flush
  db work 10

  db delete_range 10 17  ; db flush
  db delete_range 12 19  ; db flush
  dbwrite {17 seventeen} ; db flush

  db config {nmerge 3}
  db work 10

  contents
} {{10 ten} {17 seventeen} {20 twenty} {30 thirty} {40 forty}}

finish_test

Changes to test/permutations.test.

131
132
133
134
135
136
137

138
139
140
141
142
143
144
#
lappend ::testsuitelist xxx

test_suite "src4" -prefix "" -description {
} -files {
  simple.test simple2.test
  log3.test 

  csr1.test
  ckpt1.test
  mc1.test

  aggerror.test
  attach.test
  autoindex1.test







>







131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#
lappend ::testsuitelist xxx

test_suite "src4" -prefix "" -description {
} -files {
  simple.test simple2.test
  log3.test 
  lsm1.test
  csr1.test
  ckpt1.test
  mc1.test

  aggerror.test
  attach.test
  autoindex1.test

Changes to test/test_lsm.c.

286
287
288
289
290
291
292


















































293
294
295
296
297
298
299
...
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474


475
476
477
478
479
480
481
...
592
593
594
595
596
597
598








599
600
601
602
603
604
605
...
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
    Tcl_SetResult(interp, (char *)sqlite4TestErrorName(rc), TCL_STATIC);
    return TCL_ERROR;
  }

  Tcl_ResetResult(interp);
  return TCL_OK;
}



















































typedef struct TclLsmCursor TclLsmCursor;
typedef struct TclLsm TclLsm;

struct TclLsm {
  lsm_db *db;
};
................................................................................
  Tcl_Obj *CONST objv[]
){
  struct Subcmd {
    const char *zCmd;
    int nArg;
    const char *zUsage;
  } aCmd[] = {
    /* 0 */ {"close",        0, ""},
    /* 1 */ {"write",        2, "KEY VALUE"},
    /* 2 */ {"delete",       1, "KEY"},
    /* 3 */ {"delete_range", 2, "START-KEY END-KEY"},
    /* 4 */ {"begin",        1, "LEVEL"},
    /* 5 */ {"commit",       1, "LEVEL"},
    /* 6 */ {"rollback",     1, "LEVEL"},
    /* 7 */ {"csr_open",     1, "CSR"},
    /* 8 */ {"work",        -1, "NPAGE ?SWITCHES?"},


    {0, 0, 0}
  };
  int iCmd;
  int rc;
  TclLsm *p = (TclLsm *)clientData;

  if( objc<2 ){
................................................................................

      rc = lsm_work(p->db, flags, nWork, &nWrite);
      if( rc!=LSM_OK ) return test_lsm_error(interp, "lsm_work", rc);
      Tcl_SetObjResult(interp, Tcl_NewIntObj(nWrite));
      return TCL_OK;
    }










    default:
      assert( 0 );
  }

  Tcl_AppendResult(interp, "internal error", 0);
  return TCL_ERROR;
................................................................................
  if( rc!=LSM_OK ){
    test_lsm_del((void *)p);
    test_lsm_error(interp, "lsm_new", rc);
    return TCL_ERROR;
  }

  if( objc==4 ){
    struct Lsmconfig {
      const char *zOpt;
      int eOpt;
    } aConfig[] = {
      { "write_buffer",     LSM_CONFIG_WRITE_BUFFER },
      { "page_size",        LSM_CONFIG_PAGE_SIZE },
      { "block_size",       LSM_CONFIG_BLOCK_SIZE },
      { "safety",           LSM_CONFIG_SAFETY },
      { "autowork",         LSM_CONFIG_AUTOWORK },
      { "autocheckpoint",   LSM_CONFIG_AUTOCHECKPOINT },
      { "log_size",         LSM_CONFIG_LOG_SIZE },
      { "mmap",             LSM_CONFIG_MMAP },
      { "use_log",          LSM_CONFIG_USE_LOG },
      { "nmerge",           LSM_CONFIG_NMERGE },
      { "max_freelist",     LSM_CONFIG_MAX_FREELIST },
      { "multi_proc",       LSM_CONFIG_MULTIPLE_PROCESSES },
      { 0, 0 }
    };
    int nElem;
    int i;
    Tcl_Obj **apElem;

    rc = Tcl_ListObjGetElements(interp, objv[3], &nElem, &apElem);
    for(i=0; rc==TCL_OK && i<nElem; i+=2){
      int iOpt;
      rc = Tcl_GetIndexFromObjStruct(
          interp, apElem[i], aConfig, sizeof(aConfig[0]), "option", 0, &iOpt
      );
      if( rc==TCL_OK ){
        if( i==(nElem-1) ){
          Tcl_ResetResult(interp);
          Tcl_AppendResult(interp, "option \"", Tcl_GetString(apElem[i]), 
              "\" requires an argument", 0
          );
          rc = TCL_ERROR;
        }else{
          int iVal;
          rc = Tcl_GetIntFromObj(interp, apElem[i+1], &iVal);
          if( rc==TCL_OK ){
            lsm_config(p->db, aConfig[iOpt].eOpt, &iVal);
          }
        }
      }
    }
    if( rc!=TCL_OK ){ 
      test_lsm_del((void *)p);
      return rc;
    }
  }

  lsm_config_log(p->db, xLog, 0);







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|
|
|
|
|
|
|
|
|
>
>







 







>
>
>
>
>
>
>
>







 







|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
...
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
...
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
...
699
700
701
702
703
704
705
706











































707
708
709
710
711
712
713
    Tcl_SetResult(interp, (char *)sqlite4TestErrorName(rc), TCL_STATIC);
    return TCL_ERROR;
  }

  Tcl_ResetResult(interp);
  return TCL_OK;
}

/*
** Apply a list of configuration settings to LSM handle db.
**
** pObj is interpreted as a Tcl list of alternating OPTION VALUE elements.
** Each OPTION must match one of the names in aConfig[] below, and each
** VALUE must be an integer. For every pair, lsm_config() is invoked with
** the corresponding LSM_CONFIG_XXX constant and a pointer to the value.
**
** Processing stops at the first problem: a list that cannot be parsed, an
** unrecognized option, an option with no following value, or a
** non-integer value. In those cases the failing Tcl return code is
** returned; for a missing value the interpreter result is set to an
** explanatory message. Otherwise the result of the last Tcl call is
** returned (TCL_OK for an empty list).
**
** NOTE: the value returned by lsm_config() itself is not inspected.
*/
static int testConfigureLsm(Tcl_Interp *interp, lsm_db *db, Tcl_Obj *pObj){
  struct Lsmconfig {
    const char *zOpt;
    int eOpt;
  } aConfig[] = {
    { "write_buffer",     LSM_CONFIG_WRITE_BUFFER },
    { "page_size",        LSM_CONFIG_PAGE_SIZE },
    { "block_size",       LSM_CONFIG_BLOCK_SIZE },
    { "safety",           LSM_CONFIG_SAFETY },
    { "autowork",         LSM_CONFIG_AUTOWORK },
    { "autocheckpoint",   LSM_CONFIG_AUTOCHECKPOINT },
    { "log_size",         LSM_CONFIG_LOG_SIZE },
    { "mmap",             LSM_CONFIG_MMAP },
    { "use_log",          LSM_CONFIG_USE_LOG },
    { "nmerge",           LSM_CONFIG_NMERGE },
    { "max_freelist",     LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",       LSM_CONFIG_MULTIPLE_PROCESSES },
    { 0, 0 }
  };
  Tcl_Obj **apElem;               /* Elements of list pObj */
  int nElem;                      /* Number of entries in apElem[] */
  int iElem;                      /* Index of current OPTION in apElem[] */
  int rc;                         /* Tcl return code */

  rc = Tcl_ListObjGetElements(interp, pObj, &nElem, &apElem);
  if( rc!=TCL_OK ) return rc;

  for(iElem=0; iElem<nElem; iElem+=2){
    int iOpt;                     /* Index of matched entry in aConfig[] */
    int iVal;                     /* Integer value for this option */

    rc = Tcl_GetIndexFromObjStruct(
        interp, apElem[iElem], aConfig, sizeof(aConfig[0]), "option", 0, &iOpt
    );
    if( rc!=TCL_OK ) break;

    /* An option name in the final list position has no value to go with it. */
    if( iElem==(nElem-1) ){
      Tcl_ResetResult(interp);
      /* Tcl_AppendResult() is variadic: the terminator must be a null
      ** pointer, not a plain integer 0. */
      Tcl_AppendResult(interp, "option \"", Tcl_GetString(apElem[iElem]),
          "\" requires an argument", (char *)0
      );
      rc = TCL_ERROR;
      break;
    }

    rc = Tcl_GetIntFromObj(interp, apElem[iElem+1], &iVal);
    if( rc!=TCL_OK ) break;

    lsm_config(db, aConfig[iOpt].eOpt, &iVal);
  }

  return rc;
}

typedef struct TclLsmCursor TclLsmCursor;
typedef struct TclLsm TclLsm;

/*
** Wrapper object holding the LSM database handle used by the Tcl test
** command implementation in this file.
*/
struct TclLsm {
  lsm_db *db;                     /* The wrapped LSM database handle */
};
................................................................................
  Tcl_Obj *CONST objv[]
){
  struct Subcmd {
    const char *zCmd;
    int nArg;
    const char *zUsage;
  } aCmd[] = {
    /*  0 */ {"close",        0, ""},
    /*  1 */ {"write",        2, "KEY VALUE"},
    /*  2 */ {"delete",       1, "KEY"},
    /*  3 */ {"delete_range", 2, "START-KEY END-KEY"},
    /*  4 */ {"begin",        1, "LEVEL"},
    /*  5 */ {"commit",       1, "LEVEL"},
    /*  6 */ {"rollback",     1, "LEVEL"},
    /*  7 */ {"csr_open",     1, "CSR"},
    /*  8 */ {"work",        -1, "NPAGE ?SWITCHES?"},
    /*  9 */ {"flush",        0, ""},
    /* 10 */ {"config",       1, "LIST"},
    {0, 0, 0}
  };
  int iCmd;
  int rc;
  TclLsm *p = (TclLsm *)clientData;

  if( objc<2 ){
................................................................................

      rc = lsm_work(p->db, flags, nWork, &nWrite);
      if( rc!=LSM_OK ) return test_lsm_error(interp, "lsm_work", rc);
      Tcl_SetObjResult(interp, Tcl_NewIntObj(nWrite));
      return TCL_OK;
    }

    case 9: assert( 0==strcmp(aCmd[9].zCmd, "flush") ); {
      rc = lsm_flush(p->db);
      return test_lsm_error(interp, "lsm_flush", rc);
    }

    case 10: assert( 0==strcmp(aCmd[10].zCmd, "config") ); {
      return testConfigureLsm(interp, p->db, objv[2]);
    }

    default:
      assert( 0 );
  }

  Tcl_AppendResult(interp, "internal error", 0);
  return TCL_ERROR;
................................................................................
  if( rc!=LSM_OK ){
    test_lsm_del((void *)p);
    test_lsm_error(interp, "lsm_new", rc);
    return TCL_ERROR;
  }

  if( objc==4 ){
    rc = testConfigureLsm(interp, p->db, objv[3]);











































    if( rc!=TCL_OK ){ 
      test_lsm_del((void *)p);
      return rc;
    }
  }

  lsm_config_log(p->db, xLog, 0);