SQLite4
Check-in [bb42813797]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add lsmtest tests that focus on recovering from the failure of a writer process. And fixes for the same.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: bb42813797ffdec8d931023997213fa5e754d021
User & Date: dan 2012-09-13 15:24:53
Context
2012-09-13
18:13
Add tests for dealing with inconsistent tree-headers in shared-memory. check-in: 4ea78ff1f3 user: dan tags: trunk
15:24
Add lsmtest tests that focus on recovering from the failure of a writer process. And fixes for the same. check-in: bb42813797 user: dan tags: trunk
2012-09-12
17:23
Fix a problem in the lsm_info(LOG_STRUCTURE) command causing errors in tcl tests. check-in: 6e5e429ea7 user: dan tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/README.

15
16
17
18
19
20
21









22
23
24
25
  lsmtest4.c: Multi-client tests.

  lsmtest5.c: Multi-client tests with a different thread for each client.

  lsmtest6.c: OOM injection tests.

  lsmtest7.c: API tests.




















>
>
>
>
>
>
>
>
>




15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  lsmtest4.c: Multi-client tests.

  lsmtest5.c: Multi-client tests with a different thread for each client.

  lsmtest6.c: OOM injection tests.

  lsmtest7.c: API tests.

  lsmtest8.c: Writer crash tests. Tests in this file attempt to verify that
              the system recovers and other clients proceed unaffected if
              a process fails in the middle of a write transaction.

              The difference from lsmtest2.c is that this file tests
              live-recovery (recovery from a failure that occurs while other
              clients are still running) whereas lsmtest2.c tests recovery
              from a system or power failure.




Changes to lsm-test/lsmtest.h.

151
152
153
154
155
156
157

158
159
160
161



162
163
164
165
166
167
168

/* lsmtest6.c */
void test_oom(const char *zPattern, int *pRc);
void testDeleteLsmdb(const char *zFile);

void testSaveLsmdb(const char *zFile);
void testRestoreLsmdb(const char *zFile);


/* lsmtest7.c */
void test_api(const char *zPattern, int *pRc);




/*************************************************************************
** Interface to functionality in test_datasource.c.
*/
typedef struct Datasource Datasource;
typedef struct DatasourceDefn DatasourceDefn;

struct DatasourceDefn {







>




>
>
>







151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172

/* lsmtest6.c */
void test_oom(const char *zPattern, int *pRc);
void testDeleteLsmdb(const char *zFile);

void testSaveLsmdb(const char *zFile);
void testRestoreLsmdb(const char *zFile);
void testCopyLsmdb(const char *zFrom, const char *zTo);

/* lsmtest7.c */
void test_api(const char *zPattern, int *pRc);

/* lsmtest8.c */
void do_writer_crash_test(const char *zPattern, int *pRc);

/*************************************************************************
** Interface to functionality in test_datasource.c.
*/
typedef struct Datasource Datasource;
typedef struct DatasourceDefn DatasourceDefn;

struct DatasourceDefn {

Changes to lsm-test/lsmtest6.c.

280
281
282
283
284
285
286















287
288
289
290
291
292
293
    testFree(aBuf);

    close(fd1);
    close(fd2);
  }
}

















/*
** File zFile is the path to an LSM database. This function makes backups
** of the database file and its log as follows:
**
**     cp $(zFile)     $(zFile)-save
**     cp $(zFile)-log $(zFile)-save-log







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
    testFree(aBuf);

    close(fd1);
    close(fd2);
  }
}

/*
** Copy the LSM database zFrom to zTo, together with the companion files
** named by appending "-log" and "-shm" to each path. The three destination
** paths are unlinked (shm, log, then database) before copy_file() is
** invoked for the database, log and shm files in that order.
*/
void testCopyLsmdb(const char *zFrom, const char *zTo){
  char *zSrcLog = testMallocPrintf("%s-log", zFrom);
  char *zDstLog = testMallocPrintf("%s-log", zTo);
  char *zSrcShm = testMallocPrintf("%s-shm", zFrom);
  char *zDstShm = testMallocPrintf("%s-shm", zTo);

  /* Clear out anything left at the destination names. */
  unlink(zDstShm);
  unlink(zDstLog);
  unlink(zTo);

  /* Copy the database file followed by its two companions. */
  copy_file(zFrom, zTo);
  copy_file(zSrcLog, zDstLog);
  copy_file(zSrcShm, zDstShm);

  testFree(zSrcLog);
  testFree(zDstLog);
  testFree(zSrcShm);
  testFree(zDstShm);
}

/*
** File zFile is the path to an LSM database. This function makes backups
** of the database file and its log as follows:
**
**     cp $(zFile)     $(zFile)-save
**     cp $(zFile)-log $(zFile)-save-log

Added lsm-test/lsmtest8.c.









































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148

/*
** This file contains test cases to verify that "live-recovery" works
** correctly following a mid-transaction failure of a writer process.
*/

#include "lsmtest.h"

/*
** A single step of a scripted database setup. doSetupStep() writes the
** described range of datasource entries, deletes the described range of
** keys and then calls lsm_work() with the supplied flags.
*/
typedef struct SetupStep {
  int workflags;                  /* Flags handed to lsm_work() */
  int iInsStart;                  /* Index of first datasource entry to write */
  int nIns;                       /* Count of entries to write */
  int iDelStart;                  /* Index of first datasource key to delete */
  int nDel;                       /* Count of keys to delete */
} SetupStep;

/*
** Execute one setup step against database pDb using entries from pData:
** write pStep->nIns entries starting at pStep->iInsStart, delete
** pStep->nDel entries starting at pStep->iDelStart and then, provided
** *pRc is still zero, call lsm_work() with pStep->workflags and store
** its return code in *pRc.
*/
static void doSetupStep(
  TestDb *pDb,
  Datasource *pData,
  const SetupStep *pStep,
  int *pRc
){
  testWriteDatasourceRange(pDb, pData, pStep->iInsStart, pStep->nIns, pRc);
  testDeleteDatasourceRange(pDb, pData, pStep->iDelStart, pStep->nDel, pRc);

  /* Skip the work step if an error has already been recorded. */
  if( *pRc!=0 ) return;
  *pRc = lsm_work(tdb_lsm(pDb), pStep->workflags, 0, 0);
}

/*
** Run the nStep setup steps in array aStep[] against pDb, in order. Each
** step starts with a fresh (zero) error code and is asserted to succeed.
*/
static void doSetupStepArray(
  TestDb *pDb,
  Datasource *pData,
  const SetupStep *aStep,
  int nStep
){
  int iStep = 0;
  while( iStep<nStep ){
    int rcStep = 0;
    doSetupStep(pDb, pData, &aStep[iStep], &rcStep);
    assert( rcStep==0 );
    iStep++;
  }
}

/*
** Populate database pDb using a freshly created TEST_DATASOURCE_RANDOM
** datasource and a fixed three-step script:
**
**    1) Write 2000 entries starting at index 1.
**    2) Call lsm_work() with LSM_WORK_CHECKPOINT|LSM_WORK_FLUSH.
**    3) Write 1000 entries starting at index 10001.
**
** If ppData is not NULL the datasource is handed back to the caller via
** *ppData. Otherwise it is freed before returning.
*/
static void setupDatabase1(TestDb *pDb, Datasource **ppData){
  const DatasourceDefn dsDefn = {TEST_DATASOURCE_RANDOM, 12, 16, 100, 500};
  const SetupStep aSetup[] = {
    { 0,                                  1,     2000, 0, 0 },
    { LSM_WORK_CHECKPOINT|LSM_WORK_FLUSH, 0,     0, 0, 0 },
    { 0,                                  10001, 1000, 0, 0 },
  };
  Datasource *pSrc;

  pSrc = testDatasourceNew(&dsDefn);
  doSetupStepArray(pDb, pSrc, aSetup, ArraySize(aSetup));

  if( ppData==0 ){
    /* Caller does not want the datasource. */
    testDatasourceFree(pSrc);
    return;
  }
  *ppData = pSrc;
}

/*
** This function makes a copy of the three files associated with LSM 
** database zDb (i.e. if zDb is "test.db", it makes copies of "test.db",
** "test.db-log" and "test.db-shm").
**
** It then opens a new database connection to the copy with the xLock() call
** instrumented so that it appears that some other process already connected
** to the db (holding a shared lock on DMS2). This prevents recovery from
** running. Then:
**
**    1) Compare the checksum of the database with zCksum. 
**    2) Write a few keys to the database. Then delete the same keys. 
**    3) Flush the db to disk and run a checkpoint. 
**    4) Compare the checksum of the database with zCksum once more.
**
** If *pRc is non-zero when this function is called it is a no-op, so that
** an error already recorded by the caller is neither ignored nor
** overwritten. Otherwise *pRc is set to the result of the steps above.
*/
static void doLiveRecovery(const char *zDb, const char *zCksum, int *pRc){
  if( *pRc==0 ){
    const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 20, 25, 100, 500};
    Datasource *pData;
    const char *zCopy = "testcopy.lsm";
    char zCksum2[TEST_CKSUM_BYTES];
    TestDb *pDb = 0;
    int rc;

    pData = testDatasourceNew(&defn);

    /* Snapshot the live database and open the copy without recovery. */
    testCopyLsmdb(zDb, zCopy);
    rc = tdb_lsm_open("test_no_recovery=1", zCopy, 0, &pDb);
    if( rc==0 ){
      lsm_db *db;

      /* The copy must initially match the expected checksum. */
      testCksumDatabase(pDb, zCksum2);
      testCompareStr(zCksum, zCksum2, &rc);

      /* Write and then delete the same range of keys. */
      testWriteDatasourceRange(pDb, pData, 1, 10, &rc);
      testDeleteDatasourceRange(pDb, pData, 1, 10, &rc);

      if( rc==0 ){
        db = tdb_lsm(pDb);
        rc = lsm_work(db, LSM_WORK_FLUSH|LSM_WORK_CHECKPOINT, 0, 0);
      }

      /* The contents must still match the expected checksum. */
      testCksumDatabase(pDb, zCksum2);
      testCompareStr(zCksum, zCksum2, &rc);
    }

    testDatasourceFree(pData);
    testClose(&pDb);
    *pRc = rc;
  }
}

/*
** Writer-crash test body. Opens an "lsm" test database, populates it with
** setupDatabase1() and records its checksum. A write transaction is then
** opened and nWrite entries are written one at a time, starting at
** datasource index iWriteStart. After each write doLiveRecovery() is run
** against "testdb.lsm" with the recorded checksum. The loop stops at the
** first error. The final error code is stored in *pRc.
*/
static void doWriterCrash1(int *pRc){
  const int iWriteStart = 20000;
  const int nWrite = 2000;
  Datasource *pData = 0;
  TestDb *pDb = 0;
  int rc = 0;

  pDb = testOpen("lsm", 1, &rc);
  if( rc==0 ){
    char zCksum[TEST_CKSUM_BYTES];
    int iDot = 0;
    int i = 0;

    setupDatabase1(pDb, &pData);
    testCksumDatabase(pDb, zCksum);
    testBegin(pDb, 2, &rc);

    while( rc==0 && i<nWrite ){
      testCaseProgress(i, nWrite, testCaseNDot(), &iDot);
      testWriteDatasourceRange(pDb, pData, iWriteStart+i, 1, &rc);
      doLiveRecovery("testdb.lsm", zCksum, &rc);
      i++;
    }
  }

  testCommit(pDb, 0, &rc);
  testClose(&pDb);
  testDatasourceFree(pData);
  *pRc = rc;
}

/*
** Entry point for the writer-crash tests. Runs test case
** "writercrash1.lsm" if testCaseBegin() returns non-zero for it and
** pattern zPattern, then reports the result via testCaseFinish().
*/
void do_writer_crash_test(const char *zPattern, int *pRc){
  if( !testCaseBegin(pRc, zPattern, "writercrash1.lsm") ) return;
  doWriterCrash1(pRc);
  testCaseFinish(*pRc);
}


Changes to lsm-test/lsmtest_main.c.

452
453
454
455
456
457
458




459
460
461
462
463
464
465
....
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
  test_api(zPattern, &rc);
  if( rc ) nFail++;

  rc = 0;
  do_crash_test(zPattern, &rc);
  if( rc ) nFail++;





  return (nFail!=0);
}

static lsm_db *configure_lsm_db(TestDb *pDb){
  lsm_db *pLsm;
  pLsm = tdb_lsm(pDb);
  if( pLsm ){
................................................................................
    memset(&hook, 0, sizeof(hook));
    hook.pOut = fopen("writelog.txt", "w");

    pData = testDatasourceNew(&defn);
    tdb_lsm_config_work_hook(pDb, do_insert_work_hook, 0);
    tdb_lsm_write_hook(pDb, do_insert_write_hook, (void *)&hook);
    if( zConfig ){
      rc = test_lsm_config_str(tdb_lsm(pDb), 0, zConfig);
    }

    if( rc==0 ){
      for(i=0; i<nRow; i++){
        void *pKey; int nKey;     /* Database key to insert */
        void *pVal; int nVal;     /* Database value to insert */
        testDatasourceEntry(pData, i, &pKey, &nKey, &pVal, &nVal);







>
>
>
>







 







|







452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
....
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
  test_api(zPattern, &rc);
  if( rc ) nFail++;

  rc = 0;
  do_crash_test(zPattern, &rc);
  if( rc ) nFail++;

  rc = 0;
  do_writer_crash_test(zPattern, &rc);
  if( rc ) nFail++;

  return (nFail!=0);
}

static lsm_db *configure_lsm_db(TestDb *pDb){
  lsm_db *pLsm;
  pLsm = tdb_lsm(pDb);
  if( pLsm ){
................................................................................
    memset(&hook, 0, sizeof(hook));
    hook.pOut = fopen("writelog.txt", "w");

    pData = testDatasourceNew(&defn);
    tdb_lsm_config_work_hook(pDb, do_insert_work_hook, 0);
    tdb_lsm_write_hook(pDb, do_insert_write_hook, (void *)&hook);
    if( zConfig ){
      rc = tdb_lsm_config_str(pDb, zConfig);
    }

    if( rc==0 ){
      for(i=0; i<nRow; i++){
        void *pKey; int nKey;     /* Database key to insert */
        void *pVal; int nVal;     /* Database value to insert */
        testDatasourceEntry(pData, i, &pKey, &nKey, &pVal, &nVal);

Changes to lsm-test/lsmtest_tdb.h.

151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
void tdb_lsm_safety(TestDb *pDb, int eMode);
void tdb_lsm_prepare_system_crash(TestDb *pDb);
void tdb_lsm_prepare_sync_crash(TestDb *pDb, int iSync);

void tdb_lsm_config_work_hook(TestDb *pDb, void (*)(lsm_db *, void *), void *);
void tdb_lsm_write_hook(TestDb *, void(*)(void*,int,lsm_i64,int,int), void*);

int test_lsm_config_str(lsm_db *pDb, int bWorker, const char *zStr);
int tdb_lsm_config_str(TestDb *pDb, const char *zStr);

#ifdef __cplusplus
}  /* End of the 'extern "C"' block */
#endif

#endif







<







151
152
153
154
155
156
157

158
159
160
161
162
163
164
void tdb_lsm_safety(TestDb *pDb, int eMode);
void tdb_lsm_prepare_system_crash(TestDb *pDb);
void tdb_lsm_prepare_sync_crash(TestDb *pDb, int iSync);

void tdb_lsm_config_work_hook(TestDb *pDb, void (*)(lsm_db *, void *), void *);
void tdb_lsm_write_hook(TestDb *, void(*)(void*,int,lsm_i64,int,int), void*);


int tdb_lsm_config_str(TestDb *pDb, const char *zStr);

#ifdef __cplusplus
}  /* End of the 'extern "C"' block */
#endif

#endif

Changes to lsm-test/lsmtest_tdb3.c.

90
91
92
93
94
95
96



97
98
99
100
101
102
103
...
312
313
314
315
316
317
318




319
320
321
322
323
324
325
...
577
578
579
580
581
582
583

584
585

586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607

608
609
610
611

612
613
614
615
616
617
618
619
...
637
638
639
640
641
642
643

644
645









646
647
648
649
650
651
652
...
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
  int nAutoCrash;                 /* Number of syncs until a crash */
  int bPrepareCrash;              /* True to store writes in memory */

  /* Unsynced data (while crash testing) */
  int szSector;                   /* Assumed size of disk sectors (512B) */
  FileData aFile[2];              /* Database and log file data */




  /* Work hook redirection */
  void (*xWork)(lsm_db *, void *);
  void *pWorkCtx;

  /* IO logging hook */
  void (*xWriteHook)(void *, int, lsm_i64, int, int);
  void *pWriteCtx;
................................................................................
  unused_parameter(pEnv);
  return pRealEnv->xUnlink(pRealEnv, zFile);
}

/*
** xLock method of the test environment. The request is forwarded
** unchanged to the xLock method of the environment returned by
** tdb_lsm_env(), using the wrapped file handle.
*/
static int testEnvLock(lsm_file *pFile, int iLock, int eType){
  LsmFile *pTestFile = (LsmFile *)pFile;
  lsm_env *pReal = tdb_lsm_env();
  return pReal->xLock(pTestFile->pReal, iLock, eType);
}

static int testEnvShmMap(lsm_file *pFile, int iRegion, int sz, void **pp){
  LsmFile *p = (LsmFile *)pFile;
  lsm_env *pRealEnv = tdb_lsm_env();
  return pRealEnv->xShmMap(p->pReal, iRegion, sz, pp);
................................................................................
}

/*
** Work-hook trampoline. pArg is the LsmDb handle; if it has an xWork
** callback configured, that callback is invoked with its context pointer.
*/
static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *pLsm = (LsmDb *)pArg;
  if( pLsm->xWork==0 ) return;
  pLsm->xWork(db, pLsm->pWorkCtx);
}



int test_lsm_config_str(

  lsm_db *pDb, 
  int bWorker,
  const char *zStr
){
  
  struct CfgParam {
    const char *zParam;
    int bWorker;
    int eParam;
  } aParam[] = {
    { "write_buffer",   0, LSM_CONFIG_WRITE_BUFFER },
    { "page_size",      0, LSM_CONFIG_PAGE_SIZE },
    { "block_size",     0, LSM_CONFIG_BLOCK_SIZE },
    { "safety",         0, LSM_CONFIG_SAFETY },
    { "autowork",       0, LSM_CONFIG_AUTOWORK },
    { "log_size",       0, LSM_CONFIG_LOG_SIZE },
    { "mmap",           0, LSM_CONFIG_MMAP },
    { "use_log",        0, LSM_CONFIG_USE_LOG },
    { "nmerge",         0, LSM_CONFIG_NMERGE },
    { "max_freelist",   0, LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",     0, LSM_CONFIG_MULTIPLE_PROCESSES },
    { "worker_nmerge",  1, LSM_CONFIG_NMERGE },

    { 0, 0 }
  };
  const char *z = zStr;


  while( z[0] && pDb ){
    const char *zStart;

    /* Skip whitespace */
    while( *z==' ' ) z++;
    zStart = z;

    while( *z && *z!='=' ) z++;
................................................................................
      while( *z>='0' && *z<='9' ) z++;
      nParam = z-zStart;
      if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error;
      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      iVal = atoi(zParam);


      if( bWorker || aParam[i].bWorker==0 ){
        lsm_config(pDb, eParam, &iVal);









      }
    }else if( z!=zStart ){
      goto syntax_error;
    }
  }

  return 0;
................................................................................

/*
** Apply configuration string zStr to test database pDb. This is a no-op
** returning 0 unless tdb_lsm(pDb) is non-NULL. The main connection is
** configured first. When LSM_MUTEX_PTHREADS is defined, each worker
** connection is then configured in turn, stopping at the first error.
*/
int tdb_lsm_config_str(TestDb *pDb, const char *zStr){
  LsmDb *pLsm = (LsmDb *)pDb;
  int rc;

  if( tdb_lsm(pDb)==0 ) return 0;

  rc = test_lsm_config_str(pLsm->db, 0, zStr);
#ifdef LSM_MUTEX_PTHREADS
  {
    int iWorker = 0;
    while( rc==0 && iWorker<pLsm->nWorker ){
      rc = test_lsm_config_str(pLsm->aWorker[iWorker].pWorker, 1, zStr);
      iWorker++;
    }
  }
#endif
  return rc;
}

static int testLsmOpen(







>
>
>







 







>
>
>
>







 







>


>
|



<





|
|
|
|
|
|
|
|
|
|
|
|
>




>
|







 







>
|
|
>
>
>
>
>
>
>
>
>







 







|


|







90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
...
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
...
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598

599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
...
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
...
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
  int nAutoCrash;                 /* Number of syncs until a crash */
  int bPrepareCrash;              /* True to store writes in memory */

  /* Unsynced data (while crash testing) */
  int szSector;                   /* Assumed size of disk sectors (512B) */
  FileData aFile[2];              /* Database and log file data */

  /* Other test instrumentation */
  int bNoRecovery;                /* If true, assume DMS2 is locked */

  /* Work hook redirection */
  void (*xWork)(lsm_db *, void *);
  void *pWorkCtx;

  /* IO logging hook */
  void (*xWriteHook)(void *, int, lsm_i64, int, int);
  void *pWriteCtx;
................................................................................
  unused_parameter(pEnv);
  return pRealEnv->xUnlink(pRealEnv, zFile);
}

/*
** xLock method of the test environment. If the owning database handle has
** bNoRecovery set, a request for an exclusive lock on lock number 2 is
** refused with LSM_BUSY without consulting the real environment. All other
** requests are forwarded to the xLock method of the environment returned
** by tdb_lsm_env().
*/
static int testEnvLock(lsm_file *pFile, int iLock, int eType){
  LsmFile *pTestFile = (LsmFile *)pFile;
  lsm_env *pReal = tdb_lsm_env();
  int bSimulateBusy = 0;

  if( iLock==2 && eType==LSM_LOCK_EXCL ){
    bSimulateBusy = pTestFile->pDb->bNoRecovery;
  }
  if( bSimulateBusy ) return LSM_BUSY;
  return pReal->xLock(pTestFile->pReal, iLock, eType);
}

static int testEnvShmMap(lsm_file *pFile, int iRegion, int sz, void **pp){
  LsmFile *p = (LsmFile *)pFile;
  lsm_env *pRealEnv = tdb_lsm_env();
  return pRealEnv->xShmMap(p->pReal, iRegion, sz, pp);
................................................................................
}

/*
** Work-hook trampoline. pArg is the LsmDb handle; if it has an xWork
** callback configured, that callback is invoked with its context pointer.
*/
static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *pLsm = (LsmDb *)pArg;
  if( pLsm->xWork==0 ) return;
  pLsm->xWork(db, pLsm->pWorkCtx);
}

#define TEST_NO_RECOVERY -1

int test_lsm_config_str(
  LsmDb *pLsm,
  lsm_db *db, 
  int bWorker,
  const char *zStr
){

  struct CfgParam {
    const char *zParam;
    int bWorker;
    int eParam;
  } aParam[] = {
    { "write_buffer",     0, LSM_CONFIG_WRITE_BUFFER },
    { "page_size",        0, LSM_CONFIG_PAGE_SIZE },
    { "block_size",       0, LSM_CONFIG_BLOCK_SIZE },
    { "safety",           0, LSM_CONFIG_SAFETY },
    { "autowork",         0, LSM_CONFIG_AUTOWORK },
    { "log_size",         0, LSM_CONFIG_LOG_SIZE },
    { "mmap",             0, LSM_CONFIG_MMAP },
    { "use_log",          0, LSM_CONFIG_USE_LOG },
    { "nmerge",           0, LSM_CONFIG_NMERGE },
    { "max_freelist",     0, LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",       0, LSM_CONFIG_MULTIPLE_PROCESSES },
    { "worker_nmerge",    1, LSM_CONFIG_NMERGE },
    { "test_no_recovery", 0, TEST_NO_RECOVERY },
    { 0, 0 }
  };
  const char *z = zStr;

  assert( db );
  while( z[0] ){
    const char *zStart;

    /* Skip whitespace */
    while( *z==' ' ) z++;
    zStart = z;

    while( *z && *z!='=' ) z++;
................................................................................
      while( *z>='0' && *z<='9' ) z++;
      nParam = z-zStart;
      if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error;
      memcpy(zParam, zStart, nParam);
      zParam[nParam] = '\0';
      iVal = atoi(zParam);

      if( eParam>0 ){
        if( bWorker || aParam[i].bWorker==0 ){
          lsm_config(db, eParam, &iVal);
        }
      }else{
        if( pLsm ){
          switch( eParam ){
            case TEST_NO_RECOVERY:
              pLsm->bNoRecovery = iVal;
              break;
          }
        }
      }
    }else if( z!=zStart ){
      goto syntax_error;
    }
  }

  return 0;
................................................................................

/*
** Apply configuration string zStr to test database pDb. This is a no-op
** returning 0 unless tdb_lsm(pDb) is non-NULL. The main connection is
** configured first, passing the LsmDb handle along with it. When
** LSM_MUTEX_PTHREADS is defined, each worker connection is then
** configured in turn (with a NULL LsmDb handle), stopping at the first
** error.
*/
int tdb_lsm_config_str(TestDb *pDb, const char *zStr){
  LsmDb *pLsm = (LsmDb *)pDb;
  int rc;

  if( tdb_lsm(pDb)==0 ) return 0;

  rc = test_lsm_config_str(pLsm, pLsm->db, 0, zStr);
#ifdef LSM_MUTEX_PTHREADS
  {
    int iWorker = 0;
    while( rc==0 && iWorker<pLsm->nWorker ){
      rc = test_lsm_config_str(0, pLsm->aWorker[iWorker].pWorker, 1, zStr);
      iWorker++;
    }
  }
#endif
  return rc;
}

static int testLsmOpen(

Changes to main.mk.

298
299
300
301
302
303
304
305

306
307
308
309
310
311
312
  $(TOP)/ext/rtree/rtree.h
EXTHDR += \
  $(TOP)/ext/icu/sqliteicu.h

LSMTESTSRC = $(TOP)/lsm-test/lsmtest1.c $(TOP)/lsm-test/lsmtest2.c           \
             $(TOP)/lsm-test/lsmtest3.c $(TOP)/lsm-test/lsmtest4.c           \
             $(TOP)/lsm-test/lsmtest5.c $(TOP)/lsm-test/lsmtest6.c           \
             $(TOP)/lsm-test/lsmtest7.c $(TOP)/lsm-test/lsmtest_datasource.c \

             $(TOP)/lsm-test/lsmtest_func.c $(TOP)/lsm-test/lsmtest_io.c     \
             $(TOP)/lsm-test/lsmtest_main.c $(TOP)/lsm-test/lsmtest_mem.c    \
             $(TOP)/lsm-test/lsmtest_tdb.c $(TOP)/lsm-test/lsmtest_tdb3.c    \
             $(TOP)/lsm-test/lsmtest_util.c 

LSMTESTHDR = $(TOP)/lsm-test/lsmtest.h $(TOP)/lsm-test/lsmtest_tdb.h








|
>







298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
  $(TOP)/ext/rtree/rtree.h
EXTHDR += \
  $(TOP)/ext/icu/sqliteicu.h

LSMTESTSRC = $(TOP)/lsm-test/lsmtest1.c $(TOP)/lsm-test/lsmtest2.c           \
             $(TOP)/lsm-test/lsmtest3.c $(TOP)/lsm-test/lsmtest4.c           \
             $(TOP)/lsm-test/lsmtest5.c $(TOP)/lsm-test/lsmtest6.c           \
             $(TOP)/lsm-test/lsmtest7.c $(TOP)/lsm-test/lsmtest8.c           \
             $(TOP)/lsm-test/lsmtest_datasource.c \
             $(TOP)/lsm-test/lsmtest_func.c $(TOP)/lsm-test/lsmtest_io.c     \
             $(TOP)/lsm-test/lsmtest_main.c $(TOP)/lsm-test/lsmtest_mem.c    \
             $(TOP)/lsm-test/lsmtest_tdb.c $(TOP)/lsm-test/lsmtest_tdb3.c    \
             $(TOP)/lsm-test/lsmtest_util.c 

LSMTESTHDR = $(TOP)/lsm-test/lsmtest.h $(TOP)/lsm-test/lsmtest_tdb.h

Changes to src/lsm_tree.c.

1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
....
1133
1134
1135
1136
1137
1138
1139

1140
1141
1142
1143
1144
1145
1146
....
1164
1165
1166
1167
1168
1169
1170

1171
1172
1173
1174
1175
1176
1177

1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
....
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201








1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212

1213
1214
1215
1216
1217
1218
1219
....
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
*/
typedef struct ShmChunkLoc ShmChunkLoc;
struct ShmChunkLoc {
  ShmChunk *pShm;
  u32 iLoc;
};

/*
** The array aShm1[] is of size (nSz*2) elements. The first and second nSz 
** elements are both sorted in order of iShmid. This function merges the two
** runs and writes the sorted results over the top of aShm1[].
**
** An entry with a NULL pShm pointer is only taken from a run when the
** other run is exhausted or also has a NULL entry at its head, so such
** entries are placed after the populated ones.
**
** Argument aSpace[] points to an array of at least (nSz*2) elements that
** can be used as temporary storage space while sorting.
*/
static void treeSortByShmid(ShmChunkLoc *aShm1, int nSz, ShmChunkLoc *aSpace){
  ShmChunkLoc *aShm2 = &aShm1[nSz];
  int i1 = 0;
  int i2 = 0;
  int iOut = 0;

  while( i1<nSz || i2<nSz ){
    if( i1==nSz || (i2!=nSz && aShm1[i1].pShm==0) ){
      aSpace[iOut] = aShm2[i2++];
    }else if( i2==nSz || aShm2[i2].pShm==0 ){
      aSpace[iOut] = aShm1[i1++];
    }else{
      assert( aShm1[i1].pShm && aShm2[i2].pShm );
      if( shm_sequence_ge(aShm1[i1].pShm->iShmid, aShm2[i2].pShm->iShmid) ){
        aSpace[iOut] = aShm2[i2++];
      }else{
        aSpace[iOut] = aShm1[i1++];
      }
    }
    iOut++;
  }

  /* Copy back all (nSz*2) merged ShmChunkLoc elements. The element size
  ** must be that of the array element type, not of a ShmChunk pointer,
  ** otherwise only part of the merged output is copied. */
  memcpy(aShm1, aSpace, sizeof(ShmChunkLoc) * nSz*2);
}

/*
** This function checks that the linked list of shared memory chunks 
** that starts at chunk db->treehdr.iFirst:
**
**   1) Includes all chunks in the shared-memory region, and
**   2) Links them together in order of ascending shm-id.
**
................................................................................
      if( pNode->iV2>iTransId ){
        pNode->iV2Child = 0;
        pNode->iV2Ptr = 0;
        pNode->iV2 = 0;
      }
      rc = lsmTreeCursorNext(&csr);
    }


    db->treehdr.nHeight++;
  }

  return rc;
}

................................................................................
  /* Fix the shm-id values on any chunks with a shm-id greater than or 
  ** equal to treehdr.iNextShmid. Then do a merge-sort of all chunks to 
  ** fix the ShmChunk.iNext pointers.
  */
  if( rc==LSM_OK ){
    int nSort;
    int nByte;

    ShmChunkLoc *aSort;

    /* Allocate space for a merge sort. */
    nSort = 1;
    while( nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
    nByte = sizeof(ShmChunkLoc) * nSort * 2;
    aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);


    /* Fix all shm-ids, if required. */
    if( rc==LSM_OK && iMin!=db->treehdr.iFirst ){
      u32 iPrevShmid = pMin->iShmid-1;
      for(i=1; i<db->treehdr.nChunk; i++){
        p = treeShmChunk(db, i);
        aSort[i-1].pShm = p;
        aSort[i-1].iLoc = i;
        if( i!=db->treehdr.iFirst ){
          if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
            p->iShmid = iPrevShmid--;
................................................................................
      }
      p = treeShmChunk(db, db->treehdr.iFirst);
      p->iShmid = iPrevShmid;
    }

    if( rc==LSM_OK ){
      ShmChunkLoc *aSpace = &aSort[nSort];
      for(i=2; i<=nSort; i+=2){
        int nSz;
        for(nSz=1; (i & (1 << (nSz-1)))==0; nSz++){
          treeSortByShmid(&aSort[i - nSz*2], nSz, aSpace);








        }
      }

      for(i=0; i<nSort-1; i++){
        if( aSort[i].pShm ){
          aSort[i].pShm->iNext = aSort[i+1].iLoc;
        }
      }
      assert( aSort[nSort-1].iLoc==0 );

      rc = treeCheckLinkedList(db);

    }
  }

  return rc;
}

/*
................................................................................
** If the header is successfully loaded and parameter piRead is not NULL,
** is is set to 1 if the header was loaded from ShmHeader.hdr1, or 2 if
** the header was loaded from ShmHeader.hdr2.
*/
int lsmTreeLoadHeader(lsm_db *pDb, int *piRead){
  int nRem = LSM_ATTEMPTS_BEFORE_PROTOCOL;
  while( (nRem--)>0 ){
    int rc;
    ShmHeader *pShm = pDb->pShmhdr;

    memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
    if( treeHeaderChecksumOk(&pDb->treehdr) ){
      if( piRead ) *piRead = 1;
      return LSM_OK;
    }







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







>







 







>







>



|







 







|
|
|
|
>
>
>
>
>
>
>
>



<
<
<
<
<
<
<

>







 







<







1031
1032
1033
1034
1035
1036
1037

































1038
1039
1040
1041
1042
1043
1044
....
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
....
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
....
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182







1183
1184
1185
1186
1187
1188
1189
1190
1191
....
1771
1772
1773
1774
1775
1776
1777

1778
1779
1780
1781
1782
1783
1784
*/
typedef struct ShmChunkLoc ShmChunkLoc;
struct ShmChunkLoc {
  ShmChunk *pShm;
  u32 iLoc;
};


































/*
** This function checks that the linked list of shared memory chunks 
** that starts at chunk db->treehdr.iFirst:
**
**   1) Includes all chunks in the shared-memory region, and
**   2) Links them together in order of ascending shm-id.
**
................................................................................
      if( pNode->iV2>iTransId ){
        pNode->iV2Child = 0;
        pNode->iV2Ptr = 0;
        pNode->iV2 = 0;
      }
      rc = lsmTreeCursorNext(&csr);
    }
    tblobFree(csr.pDb, &csr.blob);

    db->treehdr.nHeight++;
  }

  return rc;
}

................................................................................
  /* Fix the shm-id values on any chunks with a shm-id greater than or 
  ** equal to treehdr.iNextShmid. Then do a merge-sort of all chunks to 
  ** fix the ShmChunk.iNext pointers.
  */
  if( rc==LSM_OK ){
    int nSort;
    int nByte;
    u32 iPrevShmid;
    ShmChunkLoc *aSort;

    /* Allocate space for a merge sort. */
    nSort = 1;
    while( nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
    nByte = sizeof(ShmChunkLoc) * nSort * 2;
    aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);
    iPrevShmid = pMin->iShmid;

    /* Fix all shm-ids, if required. */
    if( rc==LSM_OK && iMin!=db->treehdr.iFirst ){
      iPrevShmid = pMin->iShmid-1;
      for(i=1; i<db->treehdr.nChunk; i++){
        p = treeShmChunk(db, i);
        aSort[i-1].pShm = p;
        aSort[i-1].iLoc = i;
        if( i!=db->treehdr.iFirst ){
          if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
            p->iShmid = iPrevShmid--;
................................................................................
      }
      p = treeShmChunk(db, db->treehdr.iFirst);
      p->iShmid = iPrevShmid;
    }

    if( rc==LSM_OK ){
      ShmChunkLoc *aSpace = &aSort[nSort];
      for(i=0; i<nSort; i++){
        if( aSort[i].pShm ){
          assert( shm_sequence_ge(aSort[i].pShm->iShmid, iPrevShmid) );
          assert( aSpace[aSort[i].pShm->iShmid - iPrevShmid].pShm==0 );
          aSpace[aSort[i].pShm->iShmid - iPrevShmid] = aSort[i];
        }
      }

      if( aSpace[nSort-1].pShm ) aSpace[nSort-1].pShm->iNext = 0;
      for(i=0; i<nSort-1; i++){
        if( aSpace[i].pShm ){
          aSpace[i].pShm->iNext = aSpace[i+1].iLoc;
        }
      }








      rc = treeCheckLinkedList(db);
      lsmFree(db->pEnv, aSort);
    }
  }

  return rc;
}

/*
................................................................................
** If the header is successfully loaded and parameter piRead is not NULL,
** is is set to 1 if the header was loaded from ShmHeader.hdr1, or 2 if
** the header was loaded from ShmHeader.hdr2.
*/
int lsmTreeLoadHeader(lsm_db *pDb, int *piRead){
  int nRem = LSM_ATTEMPTS_BEFORE_PROTOCOL;
  while( (nRem--)>0 ){

    ShmHeader *pShm = pDb->pShmhdr;

    memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
    if( treeHeaderChecksumOk(&pDb->treehdr) ){
      if( piRead ) *piRead = 1;
      return LSM_OK;
    }