SQLite4
Check-in [cf2ef747ad]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Merge rework-flow-control branch with trunk.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: cf2ef747ad1dacebe3439128eb4488396d005079
User & Date: dan 2012-09-26 15:38:48
Context
2012-09-27
04:59
Fix a deadlock in multi-threaded test code. check-in: e16b04ca69 user: dan tags: trunk
2012-09-26
15:38
Merge rework-flow-control branch with trunk. check-in: cf2ef747ad user: dan tags: trunk
15:23
Fix invocation of work-hook following an lsmTreeMakeOld() call. Leaf check-in: 0a45bfc7a4 user: dan tags: rework-flow-control
2012-09-20
19:33
Add lsm_tree_size() and lsm_ckpt_size(). check-in: 5062ffb017 user: dan tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest2.c.

287
288
289
290
291
292
293
294
295

296
297
298
299
300
301
302
    assert( testrc==0 );

    /* Call lsm_work() on the db */
    tdb_lsm_prepare_sync_crash(pDb, 1 + (i%(nWork*2)));
    for(iWork=0; testrc==0 && iWork<nWork; iWork++){
      int nWrite = 0;
      lsm_db *db = tdb_lsm(pDb);
      testrc = lsm_work(db, LSM_WORK_CHECKPOINT, nPage, &nWrite);
      assert( testrc!=0 || nWrite>0 );

    }
    tdb_close(pDb);

    /* Check that the database content is still correct */
    testCompareCksumLsmdb(DBNAME, testCksumArrayGet(pCksumDb, nRow), 0, pRc);
  }








|

>







287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
    assert( testrc==0 );

    /* Call lsm_work() on the db */
    tdb_lsm_prepare_sync_crash(pDb, 1 + (i%(nWork*2)));
    for(iWork=0; testrc==0 && iWork<nWork; iWork++){
      int nWrite = 0;
      lsm_db *db = tdb_lsm(pDb);
      testrc = lsm_work(db, 0, nPage, &nWrite);
      assert( testrc!=0 || nWrite>0 );
      if( testrc==0 ) testrc = lsm_checkpoint(db, 0);
    }
    tdb_close(pDb);

    /* Check that the database content is still correct */
    testCompareCksumLsmdb(DBNAME, testCksumArrayGet(pCksumDb, nRow), 0, pRc);
  }

Changes to lsm-test/lsmtest8.c.

12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
..
28
29
30
31
32
33
34


35







36



37
38
39
40
41
42
43
..
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
...
167
168
169
170
171
172
173

174



175
176
177
178
179
180
181
182
*/
#include "lsmInt.h"

#include "lsmtest.h"

typedef struct SetupStep SetupStep;
struct SetupStep {
  int workflags;                  /* Flags to pass to lsm_work() */
  int iInsStart;                  /* First key-value from ds to insert */
  int nIns;                       /* Number of rows to insert */
  int iDelStart;                  /* First key from ds to delete */
  int nDel;                       /* Number of rows to delete */
};

static void doSetupStep(
................................................................................
  Datasource *pData, 
  const SetupStep *pStep, 
  int *pRc
){
  testWriteDatasourceRange(pDb, pData, pStep->iInsStart, pStep->nIns, pRc);
  testDeleteDatasourceRange(pDb, pData, pStep->iDelStart, pStep->nDel, pRc);
  if( *pRc==0 ){


    lsm_db *db = tdb_lsm(pDb);







    *pRc = lsm_work(db, pStep->workflags, 0, 0);



  }
}

static void doSetupStepArray(
  TestDb *pDb, 
  Datasource *pData, 
  const SetupStep *aStep, 
................................................................................
    assert( rc==0 );
  }
}

static void setupDatabase1(TestDb *pDb, Datasource **ppData){
  const SetupStep aStep[] = {
    { 0,                                  1,     2000, 0, 0 },
    { LSM_WORK_CHECKPOINT|LSM_WORK_FLUSH, 0,     0, 0, 0 },
    { 0,                                  10001, 1000, 0, 0 },
  };
  const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 12, 16, 100, 500};
  Datasource *pData;

  pData = testDatasourceNew(&defn);
  doSetupStepArray(pDb, pData, aStep, ArraySize(aStep));
................................................................................
    pHdr = getShmHeader(zCopy);
    if( rc==0 && memcmp(&pHdr->hdr1, &pHdr->hdr2, sizeof(pHdr->hdr1)) ){
      rc = 1;
    }
    testFree(pHdr);

    if( rc==0 ){

      db = tdb_lsm(pDb);



      rc = lsm_work(db, LSM_WORK_FLUSH|LSM_WORK_CHECKPOINT, 0, 0);
    }

    testCksumDatabase(pDb, zCksum2);
    testCompareStr(zCksum, zCksum2, &rc);
  }

  testDatasourceFree(pData);







|







 







>
>

>
>
>
>
>
>
>
|
>
>
>







 







|







 







>

>
>
>
|







12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
..
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
..
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
...
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
*/
#include "lsmInt.h"

#include "lsmtest.h"

typedef struct SetupStep SetupStep;
struct SetupStep {
  int bFlush;                     /* Flush to disk and checkpoint */
  int iInsStart;                  /* First key-value from ds to insert */
  int nIns;                       /* Number of rows to insert */
  int iDelStart;                  /* First key from ds to delete */
  int nDel;                       /* Number of rows to delete */
};

static void doSetupStep(
................................................................................
  Datasource *pData, 
  const SetupStep *pStep, 
  int *pRc
){
  testWriteDatasourceRange(pDb, pData, pStep->iInsStart, pStep->nIns, pRc);
  testDeleteDatasourceRange(pDb, pData, pStep->iDelStart, pStep->nDel, pRc);
  if( *pRc==0 ){
    int nSave = -1;
    int nBuf = 64;
    lsm_db *db = tdb_lsm(pDb);

    lsm_config(db, LSM_CONFIG_WRITE_BUFFER, &nSave);
    lsm_config(db, LSM_CONFIG_WRITE_BUFFER, &nBuf);
    lsm_begin(db, 1);
    lsm_commit(db, 0);
    lsm_config(db, LSM_CONFIG_WRITE_BUFFER, &nSave);

    *pRc = lsm_work(db, LSM_WORK_FLUSH, 0, 0);
    if( *pRc==0 ){
      *pRc = lsm_checkpoint(db, 0);
    }
  }
}

static void doSetupStepArray(
  TestDb *pDb, 
  Datasource *pData, 
  const SetupStep *aStep, 
................................................................................
    assert( rc==0 );
  }
}

static void setupDatabase1(TestDb *pDb, Datasource **ppData){
  const SetupStep aStep[] = {
    { 0,                                  1,     2000, 0, 0 },
    { 1,                                  0,     0, 0, 0 },
    { 0,                                  10001, 1000, 0, 0 },
  };
  const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 12, 16, 100, 500};
  Datasource *pData;

  pData = testDatasourceNew(&defn);
  doSetupStepArray(pDb, pData, aStep, ArraySize(aStep));
................................................................................
    pHdr = getShmHeader(zCopy);
    if( rc==0 && memcmp(&pHdr->hdr1, &pHdr->hdr2, sizeof(pHdr->hdr1)) ){
      rc = 1;
    }
    testFree(pHdr);

    if( rc==0 ){
      int nBuf = 64;
      db = tdb_lsm(pDb);
      lsm_config(db, LSM_CONFIG_WRITE_BUFFER, &nBuf);
      lsm_begin(db, 1);
      lsm_commit(db, 0);
      rc = lsm_work(db, LSM_WORK_FLUSH, 0, 0);
    }

    testCksumDatabase(pDb, zCksum2);
    testCompareStr(zCksum, zCksum2, &rc);
  }

  testDatasourceFree(pData);

Changes to lsm-test/lsmtest_func.c.

11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
..
46
47
48
49
50
51
52



53
54
55
56
57
58
59
    { 0 }
  };

  lsm_db *pDb;
  int rc;
  int i;
  const char *zDb;
  int flags = LSM_WORK_CHECKPOINT;
  int nWork = (1<<30);

  if( nArg==0 ) goto usage;
  zDb = azArg[nArg-1];
  for(i=0; i<(nArg-1); i++){
    int iSel;
    rc = testArgSelect(aOpt, "option", azArg[i], &iSel);
................................................................................
    }else{
      rc = lsm_work(pDb, flags, nWork, 0);
      if( rc!=LSM_OK ){
        testPrintError("lsm_work(): rc=%d\n", rc);
      }
    }
  }




  lsm_close(pDb);
  return rc;

 usage:
  testPrintUsage("?-optimize? ?-n N? DATABASE");
  return -1;







|







 







>
>
>







11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
..
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
    { 0 }
  };

  lsm_db *pDb;
  int rc;
  int i;
  const char *zDb;
  int flags = LSM_WORK_FLUSH;
  int nWork = (1<<30);

  if( nArg==0 ) goto usage;
  zDb = azArg[nArg-1];
  for(i=0; i<(nArg-1); i++){
    int iSel;
    rc = testArgSelect(aOpt, "option", azArg[i], &iSel);
................................................................................
    }else{
      rc = lsm_work(pDb, flags, nWork, 0);
      if( rc!=LSM_OK ){
        testPrintError("lsm_work(): rc=%d\n", rc);
      }
    }
  }
  if( rc==LSM_OK ){
    rc = lsm_checkpoint(pDb, 0);
  }

  lsm_close(pDb);
  return rc;

 usage:
  testPrintUsage("?-optimize? ?-n N? DATABASE");
  return -1;

Changes to lsm-test/lsmtest_tdb3.c.

25
26
27
28
29
30
31

32
33
34
35
36
37
38
...
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
...
622
623
624
625
626
627
628

629
630
631
632
633
634
635
...
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
...
928
929
930
931
932
933
934
935
936
937
938
939







940
941

942


943
944

945
946
947
948
949
950
951
952


953
954




955

956
957
958
959
960
961
962
963
964
965
966
967
968
...
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
....
1041
1042
1043
1044
1045
1046
1047

1048
1049

1050
1051
1052
1053
1054
1055
1056
1057
1058

1059
1060
1061
1062



1063

1064

1065
1066
1067
1068
1069
1070
1071
....
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097

1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
....
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
  pthread_cond_t worker_cond;     /* Condition var the worker waits on */
  pthread_mutex_t worker_mutex;   /* Mutex used with worker_cond */
  int bDoWork;                    /* Set to true by client when there is work */
  int worker_rc;                  /* Store error code here */

  int lsm_work_flags;             /* Flags to pass to lsm_work() */
  int lsm_work_npage;             /* nPage parameter to pass to lsm_work() */

};
#else
struct LsmWorker { int worker_rc; };
#endif

static void mt_shutdown(LsmDb *);

................................................................................
*/
static void xLog(void *pCtx, int rc, const char *z){
  unused_parameter(rc);
  /* fprintf(stderr, "lsm: rc=%d \"%s\"\n", rc, z); */
  if( pCtx ) fprintf(stderr, "%s: ", (char *)pCtx);
  fprintf(stderr, "%s\n", z);
  fflush(stderr);

}

static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *p = (LsmDb *)pArg;
  if( p->xWork ) p->xWork(db, p->pWorkCtx);
}

................................................................................
    int eParam;
  } aParam[] = {
    { "write_buffer",     0, LSM_CONFIG_WRITE_BUFFER },
    { "page_size",        0, LSM_CONFIG_PAGE_SIZE },
    { "block_size",       0, LSM_CONFIG_BLOCK_SIZE },
    { "safety",           0, LSM_CONFIG_SAFETY },
    { "autowork",         0, LSM_CONFIG_AUTOWORK },

    { "log_size",         0, LSM_CONFIG_LOG_SIZE },
    { "mmap",             0, LSM_CONFIG_MMAP },
    { "use_log",          0, LSM_CONFIG_USE_LOG },
    { "nmerge",           0, LSM_CONFIG_NMERGE },
    { "max_freelist",     0, LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",       0, LSM_CONFIG_MULTIPLE_PROCESSES },
    { "worker_nmerge",    1, LSM_CONFIG_NMERGE },
................................................................................

int test_lsm_lomem_open(
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = 
    "page_size=256 block_size=65536 write_buffer=16384 max_freelist=4";
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

lsm_db *tdb_lsm(TestDb *pDb){
  if( pDb->pMethods->xClose==test_lsm_close ){
    return ((LsmDb *)pDb)->db;
  }
................................................................................
static void *worker_main(void *pArg){
  LsmWorker *p = (LsmWorker *)pArg;
  lsm_db *pWorker;                /* Connection to access db through */

  pthread_mutex_lock(&p->worker_mutex);
  while( (pWorker = p->pWorker) ){
    int nWrite = 0;
    int rc;

    /* Do some work. If an error occurs, exit. */
    pthread_mutex_unlock(&p->worker_mutex);
    if( (p->lsm_work_flags & LSM_WORK_CHECKPOINT)==0 ){







      int nSleep = 0;
      while( 1 ){

        int nByte = 0;


        lsm_ckpt_size(pWorker, &nByte);
        if( nByte<(32*1024*1024) ) break;

        mt_signal_worker(p->pDb, 1);
        usleep(1000);
        nSleep++;
      }
#if 0
      if( nSleep ) printf("nSleep=%d (worker)\n", nSleep);
#endif
    }


    rc = lsm_work(pWorker, p->lsm_work_flags, p->lsm_work_npage, &nWrite);
/*    printf("# worked %d units\n", nWrite); */




    pthread_mutex_lock(&p->worker_mutex);

    if( rc!=LSM_OK && rc!=LSM_BUSY ){
      p->worker_rc = rc;
      break;
    }

    if( nWrite && (p->lsm_work_flags & LSM_WORK_CHECKPOINT)==0 ){
      mt_signal_worker(p->pDb, 1);
    }

    /* If the call to lsm_work() indicates that there is nothing more
    ** to do at this point, wait on the condition variable. The thread will
    ** wake up when it is signaled either because the client thread has
    ** flushed an in-memory tree into the db file or when the connection
................................................................................
      if( p->pWorker && p->bDoWork==0 ){
        pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
      }
      p->bDoWork = 0;
    }
  }
  pthread_mutex_unlock(&p->worker_mutex);
  /* printf("# worker EXIT\n"); */
  
  return 0;
}


static void mt_stop_worker(LsmDb *pDb, int iWorker){
  LsmWorker *p = &pDb->aWorker[iWorker];
................................................................................
/*
** Launch worker thread iWorker for database connection pDb.
*/
static int mt_start_worker(
  LsmDb *pDb,                     /* Main database structure */
  int iWorker,                    /* Worker number to start */
  const char *zFilename,          /* File name of database to open */

  int flags,                      /* flags parameter to lsm_work() */
  int nPage                       /* nPage parameter to lsm_work() */

){
  int rc = 0;                     /* Return code */
  LsmWorker *p;                   /* Object to initialize */

  assert( iWorker<pDb->nWorker );

  p = &pDb->aWorker[iWorker];
  p->lsm_work_flags = flags;
  p->lsm_work_npage = nPage;

  p->pDb = pDb;

  /* Open the worker connection */
  if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker);



  if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);

lsm_config_log(p->pWorker, xLog, (void *)"worker");


  /* Configure the work-hook */
  if( rc==0 ){
    lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
  }

  /* Kick off the worker thread. */
................................................................................
  lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);

  pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker);
  memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker);
  pDb->nWorker = nWorker;

  if( nWorker==1 ){
    int flags = LSM_WORK_CHECKPOINT|LSM_WORK_FLUSH;
    rc = mt_start_worker(pDb, 0, zFilename, flags, 2048);
  }else{

    rc = mt_start_worker(pDb, 0, zFilename, LSM_WORK_FLUSH, 1024);
    if( rc==LSM_OK ){
      rc = mt_start_worker(pDb, 1, zFilename, LSM_WORK_CHECKPOINT, 0);
    }
  }

  return rc;
}


................................................................................
  }

  if( rc==0 ){
    pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker);
    memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker);
    pDb->nWorker = nWorker;

    rc = mt_start_worker(pDb, 0, zFilename, LSM_WORK_CHECKPOINT, 
        nWorker==1 ? 512 : 0
    );
  }

  if( rc==0 && nWorker==2 ){
    rc = mt_start_worker(pDb, 1, zFilename, 0, 512);
  }

  return rc;
}

int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb){
  return test_lsm_mt(zFilename, 1, bClear, ppDb);







>







 







<







 







>







 







|







 







|



|
>
>
>
>
>
>
>
|
|
>
|
>
>
|
<
>
|
|
|
|

|

|
>
>
|

>
>
>
>

>





|







 







<







 







>

|
>









>




>
>
>

>

>







 







|
|

>
|

|







 







|
|




|







25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
...
595
596
597
598
599
600
601

602
603
604
605
606
607
608
...
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
...
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
...
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954

955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
...
989
990
991
992
993
994
995

996
997
998
999
1000
1001
1002
....
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
....
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
....
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
  pthread_cond_t worker_cond;     /* Condition var the worker waits on */
  pthread_mutex_t worker_mutex;   /* Mutex used with worker_cond */
  int bDoWork;                    /* Set to true by client when there is work */
  int worker_rc;                  /* Store error code here */

  int lsm_work_flags;             /* Flags to pass to lsm_work() */
  int lsm_work_npage;             /* nPage parameter to pass to lsm_work() */
  int bCkpt;                      /* True to call lsm_checkpoint() */
};
#else
struct LsmWorker { int worker_rc; };
#endif

static void mt_shutdown(LsmDb *);

................................................................................
*/
static void xLog(void *pCtx, int rc, const char *z){
  unused_parameter(rc);
  /* fprintf(stderr, "lsm: rc=%d \"%s\"\n", rc, z); */
  if( pCtx ) fprintf(stderr, "%s: ", (char *)pCtx);
  fprintf(stderr, "%s\n", z);
  fflush(stderr);

}

static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *p = (LsmDb *)pArg;
  if( p->xWork ) p->xWork(db, p->pWorkCtx);
}

................................................................................
    int eParam;
  } aParam[] = {
    { "write_buffer",     0, LSM_CONFIG_WRITE_BUFFER },
    { "page_size",        0, LSM_CONFIG_PAGE_SIZE },
    { "block_size",       0, LSM_CONFIG_BLOCK_SIZE },
    { "safety",           0, LSM_CONFIG_SAFETY },
    { "autowork",         0, LSM_CONFIG_AUTOWORK },
    { "autocheckpoint",   0, LSM_CONFIG_AUTOCHECKPOINT },
    { "log_size",         0, LSM_CONFIG_LOG_SIZE },
    { "mmap",             0, LSM_CONFIG_MMAP },
    { "use_log",          0, LSM_CONFIG_USE_LOG },
    { "nmerge",           0, LSM_CONFIG_NMERGE },
    { "max_freelist",     0, LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",       0, LSM_CONFIG_MULTIPLE_PROCESSES },
    { "worker_nmerge",    1, LSM_CONFIG_NMERGE },
................................................................................

int test_lsm_lomem_open(
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = 
    "page_size=256 block_size=65536 write_buffer=16384 max_freelist=4 autocheckpoint=32768";
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

lsm_db *tdb_lsm(TestDb *pDb){
  if( pDb->pMethods->xClose==test_lsm_close ){
    return ((LsmDb *)pDb)->db;
  }
................................................................................
static void *worker_main(void *pArg){
  LsmWorker *p = (LsmWorker *)pArg;
  lsm_db *pWorker;                /* Connection to access db through */

  pthread_mutex_lock(&p->worker_mutex);
  while( (pWorker = p->pWorker) ){
    int nWrite = 0;
    int rc = LSM_OK;

    /* Do some work. If an error occurs, exit. */
    pthread_mutex_unlock(&p->worker_mutex);
    if( p->bCkpt==0 ){
      static const int nLimit = 16*1024*1024;
      static const int nIncr = 4*1024*1024;
      int nMax = 100;
      int nByte = 0;

      lsm_ckpt_size(pWorker, &nByte);
      if( nByte>nLimit ){
        int nSleep = 0;
        while( nByte>nLimit ){
          nMax = nMax<<1;
          nByte -= nIncr;
        }
        while( nSleep<nMax ){
          lsm_ckpt_size(pWorker, &nByte);

          if( nByte<nLimit ) break;
          mt_signal_worker(p->pDb, 1);
          usleep(1000);
          nSleep++;
        }
#if 0
      if( nSleep ) printf("nSleep=%d/%d (worker)\n", nSleep, nMax);
#endif
      }
    }
    if( p->lsm_work_npage ){
      rc = lsm_work(pWorker, p->lsm_work_flags, p->lsm_work_npage, &nWrite);
/*    printf("# worked %d units\n", nWrite); */
    }
    if( rc==LSM_OK && p->bCkpt ){
      rc = lsm_checkpoint(pWorker, 0);
    }
    pthread_mutex_lock(&p->worker_mutex);

    if( rc!=LSM_OK && rc!=LSM_BUSY ){
      p->worker_rc = rc;
      break;
    }

    if( nWrite && p->bCkpt==0 ){
      mt_signal_worker(p->pDb, 1);
    }

    /* If the call to lsm_work() indicates that there is nothing more
    ** to do at this point, wait on the condition variable. The thread will
    ** wake up when it is signaled either because the client thread has
    ** flushed an in-memory tree into the db file or when the connection
................................................................................
      if( p->pWorker && p->bDoWork==0 ){
        pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
      }
      p->bDoWork = 0;
    }
  }
  pthread_mutex_unlock(&p->worker_mutex);

  
  return 0;
}


static void mt_stop_worker(LsmDb *pDb, int iWorker){
  LsmWorker *p = &pDb->aWorker[iWorker];
................................................................................
/*
** Launch worker thread iWorker for database connection pDb.
*/
static int mt_start_worker(
  LsmDb *pDb,                     /* Main database structure */
  int iWorker,                    /* Worker number to start */
  const char *zFilename,          /* File name of database to open */
  const char *zCfg,
  int flags,                      /* flags parameter to lsm_work() */
  int nPage,                      /* nPage parameter to lsm_work() */
  int bCkpt                       /* True to call lsm_checkpoint() */
){
  int rc = 0;                     /* Return code */
  LsmWorker *p;                   /* Object to initialize */

  assert( iWorker<pDb->nWorker );

  p = &pDb->aWorker[iWorker];
  p->lsm_work_flags = flags;
  p->lsm_work_npage = nPage;
  p->bCkpt = bCkpt;
  p->pDb = pDb;

  /* Open the worker connection */
  if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker);
  if( zCfg ){
    test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0);
  }
  if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);
#if 0
lsm_config_log(p->pWorker, xLog, (void *)"worker");
#endif

  /* Configure the work-hook */
  if( rc==0 ){
    lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
  }

  /* Kick off the worker thread. */
................................................................................
  lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);

  pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker);
  memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker);
  pDb->nWorker = nWorker;

  if( nWorker==1 ){
    int flags = LSM_WORK_FLUSH;
    rc = mt_start_worker(pDb, 0, zFilename, zCfg, flags, 2048, 1);
  }else{
    int flags = LSM_WORK_FLUSH;
    rc = mt_start_worker(pDb, 0, zFilename, zCfg, flags, 1024, 0);
    if( rc==LSM_OK ){
      rc = mt_start_worker(pDb, 1, zFilename, zCfg, 0, 0, 1);
    }
  }

  return rc;
}


................................................................................
  }

  if( rc==0 ){
    pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker);
    memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker);
    pDb->nWorker = nWorker;

    rc = mt_start_worker(pDb, 0, zFilename, 0, LSM_WORK_FLUSH, 
        nWorker==1 ? 512 : 0, 1
    );
  }

  if( rc==0 && nWorker==2 ){
    rc = mt_start_worker(pDb, 1, zFilename, 0, 0, 512, 0);
  }

  return rc;
}

int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb){
  return test_lsm_mt(zFilename, 1, bClear, ppDb);

Changes to src/kvlsm.c.

385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
      int nWrite = 0;
      lsm_work(p->pDb, LSM_WORK_OPTIMIZE, nPage, &nWrite);
      *(int*)pArg = nWrite;
      break;
    }

    case SQLITE4_KVCTRL_LSM_CHECKPOINT: {
      lsm_work(p->pDb, LSM_WORK_CHECKPOINT, 0, 0);
      break;
    }


    default:
      rc = SQLITE4_NOTFOUND;
      break;







|







385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
      int nWrite = 0;
      lsm_work(p->pDb, LSM_WORK_OPTIMIZE, nPage, &nWrite);
      *(int*)pArg = nWrite;
      break;
    }

    case SQLITE4_KVCTRL_LSM_CHECKPOINT: {
      lsm_checkpoint(p->pDb, 0);
      break;
    }


    default:
      rc = SQLITE4_NOTFOUND;
      break;

Changes to src/lsm.h.

163
164
165
166
167
168
169



170
171
172
173
174
175
176
...
197
198
199
200
201
202
203

204
205
206
207
208
209
210
...
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447













448
449
450
451
452
453
454
**       2 (full):   Full robustness. A system crash may not corrupt the
**                   database file. Following recovery the database file
**                   contains all successfully committed transactions.
**
**   LSM_CONFIG_AUTOWORK
**     A read/write integer parameter.
**



**   LSM_CONFIG_MMAP
**     A read/write integer parameter. True to use mmap() to access the 
**     database file. False otherwise.
**
**   LSM_CONFIG_USE_LOG
**     A read/write boolean parameter. True (the default) to use the log
**     file normally. False otherwise.
................................................................................
#define LSM_CONFIG_AUTOWORK            5
#define LSM_CONFIG_LOG_SIZE            6
#define LSM_CONFIG_MMAP                7
#define LSM_CONFIG_USE_LOG             8
#define LSM_CONFIG_NMERGE              9
#define LSM_CONFIG_MAX_FREELIST       10
#define LSM_CONFIG_MULTIPLE_PROCESSES 11


#define LSM_SAFETY_OFF    0
#define LSM_SAFETY_NORMAL 1
#define LSM_SAFETY_FULL   2


/*
................................................................................
**
** RACE CONDITION:
**   Describe the race condition this function is subject to. Or remove
**   it somehow.
*/
int lsm_ckpt_size(lsm_db *, int *pnByte);


/*
** This function is called by a thread to work on the database structure.
** The actual operations performed by this function depend on the value 
** passed as the "flags" parameter:
**
** LSM_WORK_FLUSH:
**   Attempt to flush the contents of the in-memory tree to disk.
**
** LSM_WORK_CHECKPOINT:
**   Write a checkpoint (if one exists in memory) to the database file.
**
** LSM_WORK_OPTIMIZE:
**   If nMerge suitable arrays cannot be found, where nMerge is as 
**   configured by LSM_CONFIG_NMERGE, merge together any arrays that
**   can be found. This is usually used to optimize the database by 
**   merging the whole thing into one big array.
*/
int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite);

#define LSM_WORK_FLUSH           0x00000001
#define LSM_WORK_CHECKPOINT      0x00000002
#define LSM_WORK_OPTIMIZE        0x00000004

/* 













** Open and close a database cursor.
*/
int lsm_csr_open(lsm_db *pDb, lsm_cursor **ppCsr);
int lsm_csr_close(lsm_cursor *pCsr);

/* 
** If the fourth parameter is LSM_SEEK_EQ, LSM_SEEK_GE or LSM_SEEK_LE,







>
>
>







 







>







 







<








<
<
<









<
|

|
>
>
>
>
>
>
>
>
>
>
>
>
>







163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
...
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
...
420
421
422
423
424
425
426

427
428
429
430
431
432
433
434



435
436
437
438
439
440
441
442
443

444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
**       2 (full):   Full robustness. A system crash may not corrupt the
**                   database file. Following recovery the database file
**                   contains all successfully committed transactions.
**
**   LSM_CONFIG_AUTOWORK
**     A read/write integer parameter.
**
**   LSM_CONFIG_AUTOCHECKPOINT
**     A read/write integer parameter.
**
**   LSM_CONFIG_MMAP
**     A read/write integer parameter. True to use mmap() to access the 
**     database file. False otherwise.
**
**   LSM_CONFIG_USE_LOG
**     A read/write boolean parameter. True (the default) to use the log
**     file normally. False otherwise.
................................................................................
#define LSM_CONFIG_AUTOWORK            5
#define LSM_CONFIG_LOG_SIZE            6
#define LSM_CONFIG_MMAP                7
#define LSM_CONFIG_USE_LOG             8
#define LSM_CONFIG_NMERGE              9
#define LSM_CONFIG_MAX_FREELIST       10
#define LSM_CONFIG_MULTIPLE_PROCESSES 11
#define LSM_CONFIG_AUTOCHECKPOINT     12

#define LSM_SAFETY_OFF    0
#define LSM_SAFETY_NORMAL 1
#define LSM_SAFETY_FULL   2


/*
................................................................................
**
** RACE CONDITION:
**   Describe the race condition this function is subject to. Or remove
**   it somehow.
*/
int lsm_ckpt_size(lsm_db *, int *pnByte);


/*
** This function is called by a thread to work on the database structure.
** The actual operations performed by this function depend on the value 
** passed as the "flags" parameter:
**
** LSM_WORK_FLUSH:
**   Attempt to flush the contents of the in-memory tree to disk.
**



** LSM_WORK_OPTIMIZE:
**   If nMerge suitable arrays cannot be found, where nMerge is as 
**   configured by LSM_CONFIG_NMERGE, merge together any arrays that
**   can be found. This is usually used to optimize the database by 
**   merging the whole thing into one big array.
*/
int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite);

#define LSM_WORK_FLUSH           0x00000001

#define LSM_WORK_OPTIMIZE        0x00000002

/*
** Attempt to checkpoint the current database snapshot. Return an LSM
** error code if an error occurs or LSM_OK otherwise.
**
** If the current snapshot has already been checkpointed, calling this 
** function is a no-op. In this case if pnByte is not NULL, *pnByte is
** set to 0. Or, if the current snapshot is successfully checkpointed
** by this function and pbCkpt is not NULL, *pnByte is set to the number
** of bytes written to the database file since the previous checkpoint
** (the same measure as returned by lsm_ckpt_size()).
*/
int lsm_checkpoint(lsm_db *pDb, int *pnByte);

/*
** Open and close a database cursor.
*/
int lsm_csr_open(lsm_db *pDb, lsm_cursor **ppCsr);
int lsm_csr_close(lsm_cursor *pCsr);

/* 
** If the fourth parameter is LSM_SEEK_EQ, LSM_SEEK_GE or LSM_SEEK_LE,

Changes to src/lsmInt.h.

38
39
40
41
42
43
44
45
46
47
48

49
50
51
52
53
54
55
56
57
58
59
60
61
...
290
291
292
293
294
295
296

297
298
299
300
301
302
303
...
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
...
498
499
500
501
502
503
504

505
506
507
508
509
510
511
...
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
...
672
673
674
675
676
677
678
679
680
681


682
683
684
685
686
687
688
...
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
# endif
#endif

/*
** Default values for various data structure parameters. These may be
** overridden by calls to lsm_config().
*/
#define LSM_PAGE_SIZE   4096
#define LSM_BLOCK_SIZE  (2 * 1024 * 1024)
#define LSM_TREE_BYTES  (2 * 1024 * 1024)


#define LSM_DEFAULT_LOG_SIZE (128*1024)
#define LSM_DEFAULT_NMERGE   4

/* Places where a NULL needs to be changed to a real lsm_env pointer
** are marked with NEED_ENV */
#define NEED_ENV ((lsm_env*)0)

/* Initial values for log file checksums. These are only used if the 
** database file does not contain a valid checkpoint.  */
#define LSM_CKSUM0_INIT 42
#define LSM_CKSUM1_INIT 42

#define LSM_META_PAGE_SIZE 4096
................................................................................
  int nMerge;                     /* Configured by LSM_CONFIG_NMERGE */
  int nLogSz;                     /* Configured by LSM_CONFIG_LOG_SIZE */
  int bUseLog;                    /* Configured by LSM_CONFIG_USE_LOG */
  int nDfltPgsz;                  /* Configured by LSM_CONFIG_PAGE_SIZE */
  int nDfltBlksz;                 /* Configured by LSM_CONFIG_BLOCK_SIZE */
  int nMaxFreelist;               /* Configured by LSM_CONFIG_MAX_FREELIST */
  int bMmap;                      /* Configured by LSM_CONFIG_MMAP */

  int bMultiProc;                 /* Configured by L_C_MULTIPLE_PROCESSES */

  /* Sub-system handles */
  FileSystem *pFS;                /* On-disk portion of database */
  Database *pDatabase;            /* Database shared data */

  /* Client transaction context */
................................................................................
  u32 nWrite;                     /* Total number of pages written to disk */
};
#define LSM_INITIAL_SNAPSHOT_ID 11

/*
** Functions from file "lsm_ckpt.c".
*/
int lsmCheckpointWrite(lsm_db *);
int lsmCheckpointLevels(lsm_db *, int, void **, int *);
int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);

int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *);
int lsmCheckpointOverflowRequired(lsm_db *pDb);
int lsmCheckpointOverflowLoad(lsm_db *pDb, Freelist *);

................................................................................
int lsmCheckpointLoadWorker(lsm_db *pDb);
int lsmCheckpointStore(lsm_db *pDb, int);

int lsmCheckpointLoad(lsm_db *pDb, int *);
int lsmCheckpointLoadOk(lsm_db *pDb, int);

i64 lsmCheckpointId(u32 *, int);

i64 lsmCheckpointLogOffset(u32 *);
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);

int lsmCheckpointSaveWorker(lsm_db *pDb, int, int);
................................................................................
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);
void lsmTreeClear(lsm_db *);
int lsmTreeInit(lsm_db *);
int lsmTreeRepair(lsm_db *);

void lsmTreeMakeOld(lsm_db *pDb, int *pnFlush);
void lsmTreeDiscardOld(lsm_db *pDb);
int lsmTreeHasOld(lsm_db *pDb);

int lsmTreeSize(lsm_db *);
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit);
int lsmTreeLoadHeader(lsm_db *pDb, int *);
int lsmTreeLoadHeaderOk(lsm_db *, int);
................................................................................
** End of functions from "lsm_file.c".
**************************************************************************/

/* 
** Functions from file "lsm_sorted.c".
*/
int lsmInfoPageDump(lsm_db *, Pgno, int, char **);
int lsmSortedFlushTree(lsm_db *, int *);
void lsmSortedCleanup(lsm_db *);
int lsmSortedAutoWork(lsm_db *, int nUnit);



void lsmSortedRemap(lsm_db *pDb);

void lsmSortedFreeLevel(lsm_env *pEnv, Level *);

int lsmSortedFlushDb(lsm_db *);
int lsmSortedAdvanceAll(lsm_db *pDb);
................................................................................
int lsmVarintLen32(int);
int lsmVarintSize(u8 c);

/* 
** Functions from file "main.c".
*/
void lsmLogMessage(lsm_db *, int, const char *, ...);
int lsmFlushToDisk(lsm_db *);

/*
** Functions from file "lsm_log.c".
*/
int lsmLogBegin(lsm_db *pDb);
int lsmLogWrite(lsm_db *, void *, int, void *, int);
int lsmLogCommit(lsm_db *);







|
|
|
<
>
|
|
<
<
<
<







 







>







 







|







 







>







 







|







 







<


>
>







 







<







38
39
40
41
42
43
44
45
46
47

48
49
50




51
52
53
54
55
56
57
...
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
...
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
...
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
...
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
...
670
671
672
673
674
675
676

677
678
679
680
681
682
683
684
685
686
687
...
727
728
729
730
731
732
733

734
735
736
737
738
739
740
# endif
#endif

/*
** Default values for various data structure parameters. These may be
** overridden by calls to lsm_config().
*/
#define LSM_DFLT_PAGE_SIZE       (4 * 1024)
#define LSM_DFLT_BLOCK_SIZE      (2 * 1024 * 1024)
#define LSM_DFLT_WRITE_BUFFER    (2 * 1024 * 1024)

#define LSM_DFLT_AUTOCHECKPOINT  (4 * 1024 * 1024)
#define LSM_DFLT_LOG_SIZE        (128*1024)
#define LSM_DFLT_NMERGE          4





/* Initial values for log file checksums. These are only used if the 
** database file does not contain a valid checkpoint.  */
#define LSM_CKSUM0_INIT 42
#define LSM_CKSUM1_INIT 42

#define LSM_META_PAGE_SIZE 4096
................................................................................
  int nMerge;                     /* Configured by LSM_CONFIG_NMERGE */
  int nLogSz;                     /* Configured by LSM_CONFIG_LOG_SIZE */
  int bUseLog;                    /* Configured by LSM_CONFIG_USE_LOG */
  int nDfltPgsz;                  /* Configured by LSM_CONFIG_PAGE_SIZE */
  int nDfltBlksz;                 /* Configured by LSM_CONFIG_BLOCK_SIZE */
  int nMaxFreelist;               /* Configured by LSM_CONFIG_MAX_FREELIST */
  int bMmap;                      /* Configured by LSM_CONFIG_MMAP */
  int nAutockpt;                  /* Configured by LSM_CONFIG_AUTOCHECKPOINT */
  int bMultiProc;                 /* Configured by L_C_MULTIPLE_PROCESSES */

  /* Sub-system handles */
  FileSystem *pFS;                /* On-disk portion of database */
  Database *pDatabase;            /* Database shared data */

  /* Client transaction context */
................................................................................
  u32 nWrite;                     /* Total number of pages written to disk */
};
#define LSM_INITIAL_SNAPSHOT_ID 11

/*
** Functions from file "lsm_ckpt.c".
*/
int lsmCheckpointWrite(lsm_db *, u32 *);
int lsmCheckpointLevels(lsm_db *, int, void **, int *);
int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);

int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *);
int lsmCheckpointOverflowRequired(lsm_db *pDb);
int lsmCheckpointOverflowLoad(lsm_db *pDb, Freelist *);

................................................................................
int lsmCheckpointLoadWorker(lsm_db *pDb);
int lsmCheckpointStore(lsm_db *pDb, int);

int lsmCheckpointLoad(lsm_db *pDb, int *);
int lsmCheckpointLoadOk(lsm_db *pDb, int);

i64 lsmCheckpointId(u32 *, int);
u32 lsmCheckpointNWrite(u32 *, int);
i64 lsmCheckpointLogOffset(u32 *);
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);

int lsmCheckpointSaveWorker(lsm_db *pDb, int, int);
................................................................................
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);
void lsmTreeClear(lsm_db *);
int lsmTreeInit(lsm_db *);
int lsmTreeRepair(lsm_db *);

void lsmTreeMakeOld(lsm_db *pDb);
void lsmTreeDiscardOld(lsm_db *pDb);
int lsmTreeHasOld(lsm_db *pDb);

int lsmTreeSize(lsm_db *);
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit);
int lsmTreeLoadHeader(lsm_db *pDb, int *);
int lsmTreeLoadHeaderOk(lsm_db *, int);
................................................................................
** End of functions from "lsm_file.c".
**************************************************************************/

/* 
** Functions from file "lsm_sorted.c".
*/
int lsmInfoPageDump(lsm_db *, Pgno, int, char **);

void lsmSortedCleanup(lsm_db *);
int lsmSortedAutoWork(lsm_db *, int nUnit);

int lsmFlushTreeToDisk(lsm_db *pDb);

void lsmSortedRemap(lsm_db *pDb);

void lsmSortedFreeLevel(lsm_env *pEnv, Level *);

int lsmSortedFlushDb(lsm_db *);
int lsmSortedAdvanceAll(lsm_db *pDb);
................................................................................
int lsmVarintLen32(int);
int lsmVarintSize(u8 c);

/* 
** Functions from file "main.c".
*/
void lsmLogMessage(lsm_db *, int, const char *, ...);


/*
** Functions from file "lsm_log.c".
*/
int lsmLogBegin(lsm_db *pDb);
int lsmLogWrite(lsm_db *, void *, int, void *, int);
int lsmLogCommit(lsm_db *);

Changes to src/lsm_ckpt.c.

327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
...
382
383
384
385
386
387
388

389
390
391
392
393
394
395
...
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
...
706
707
708
709
710
711
712
713
714
715
716
717

718
719
720
721
722
723
724
725
726
....
1122
1123
1124
1125
1126
1127
1128







1129
1130
1131
1132
1133
1134
1135
....
1204
1205
1206
1207
1208
1209
1210








1211
1212
1213
1214
1215
1216
1217
  int *piOut, 
  int *pRc
){
  int iOut = *piOut;

  assert( iOut==CKPT_HDR_LO_MSW );

  if( bFlush && pDb->treehdr.iOldShmid ){
    i64 iOff = pDb->treehdr.iOldLog;
    ckptSetValue(p, iOut++, (iOff >> 32) & 0xFFFFFFFF, pRc);
    ckptSetValue(p, iOut++, (iOff & 0xFFFFFFFF), pRc);
    ckptSetValue(p, iOut++, pDb->treehdr.oldcksum0, pRc);
    ckptSetValue(p, iOut++, pDb->treehdr.oldcksum1, pRc);
  }else{
    for(; iOut<=CKPT_HDR_LO_CKSUM2; iOut++){
................................................................................
  CkptBuffer ckpt;
  int nFree;
 
  nFree = pSnap->freelist.nEntry;
  if( nOvfl>=0 ){
    nFree -=  nOvfl;
  }else{

    nOvfl = pDb->pShmhdr->aSnap2[CKPT_HDR_OVFL];
  }

  /* Initialize the output buffer */
  memset(&ckpt, 0, sizeof(CkptBuffer));
  ckpt.pEnv = pDb->pEnv;
  iOut = CKPT_HDR_SIZE;
................................................................................
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, nOvfl, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);
  }
  iOut += 2;
  assert( iOut<=1024 );

#if 0
  lsmLogMessage(pDb, rc, 
      "ckptExportSnapshot(): id=%d freelist: %d/%d", (int)iId, nFree, nOvfl
  );
#endif

  *ppCkpt = (void *)ckpt.aCkpt;
  if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut;
................................................................................
  return rc;
}

/*
** The connection must be the worker in order to call this function.
**
** True is returned if there are currently too many free-list entries
** in-memory to store in a checkpoint. Before calling lsmCheckpointSaveWorker()
** to save the current worker snapshot, a new top-level LSM segment must
** be created so that some of them can be written to the LSM. 
*/
int lsmCheckpointOverflowRequired(lsm_db *pDb){

  assert( lsmShmAssertWorker(pDb) );
  return (pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist);
}

/*
** Connection pDb must be the worker to call this function.
**
** Load the FREELIST record from the database. Decode it and append the
** results to list pFreelist.
................................................................................
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){
  Snapshot *pSnap = pDb->pWorker;
  ShmHeader *pShm = pDb->pShmhdr;
  void *p = 0;
  int n = 0;
  int rc;








  rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aSnap2, p, n);
  lsmShmBarrier(pDb);
................................................................................
    iId = (((i64)lsmGetU32(&aData[CKPT_HDR_ID_MSW*4])) << 32);
    iId += ((i64)lsmGetU32(&aData[CKPT_HDR_ID_LSW*4]));
  }else{
    iId = ((i64)aCkpt[CKPT_HDR_ID_MSW] << 32) + (i64)aCkpt[CKPT_HDR_ID_LSW];
  }
  return iId;
}









i64 lsmCheckpointLogOffset(u32 *aCkpt){
  return ((i64)aCkpt[CKPT_HDR_LO_MSW] << 32) + (i64)aCkpt[CKPT_HDR_LO_LSW];
}

int lsmCheckpointPgsz(u32 *aCkpt){ return (int)aCkpt[CKPT_HDR_PGSZ]; }








|







 







>







 







|











|







 







|




>

|







 







>
>
>
>
>
>
>







 







>
>
>
>
>
>
>
>







327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
...
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
...
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
...
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
....
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
....
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
  int *piOut, 
  int *pRc
){
  int iOut = *piOut;

  assert( iOut==CKPT_HDR_LO_MSW );

  if( bFlush ){
    i64 iOff = pDb->treehdr.iOldLog;
    ckptSetValue(p, iOut++, (iOff >> 32) & 0xFFFFFFFF, pRc);
    ckptSetValue(p, iOut++, (iOff & 0xFFFFFFFF), pRc);
    ckptSetValue(p, iOut++, pDb->treehdr.oldcksum0, pRc);
    ckptSetValue(p, iOut++, pDb->treehdr.oldcksum1, pRc);
  }else{
    for(; iOut<=CKPT_HDR_LO_CKSUM2; iOut++){
................................................................................
  CkptBuffer ckpt;
  int nFree;
 
  nFree = pSnap->freelist.nEntry;
  if( nOvfl>=0 ){
    nFree -=  nOvfl;
  }else{
    assert( 0 );
    nOvfl = pDb->pShmhdr->aSnap2[CKPT_HDR_OVFL];
  }

  /* Initialize the output buffer */
  memset(&ckpt, 0, sizeof(CkptBuffer));
  ckpt.pEnv = pDb->pEnv;
  iOut = CKPT_HDR_SIZE;
................................................................................
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, (nOvfl?nOvfl:pSnap->nFreelistOvfl), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);
  }
  iOut += 2;
  assert( iOut<=1024 );

#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, rc, 
      "ckptExportSnapshot(): id=%d freelist: %d/%d", (int)iId, nFree, nOvfl
  );
#endif

  *ppCkpt = (void *)ckpt.aCkpt;
  if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut;
................................................................................
  return rc;
}

/*
** The connection must be the worker in order to call this function.
**
** True is returned if there are currently too many free-list entries
** in-memory to store in a checkpoint. Before calling CheckpointSaveWorker()
** to save the current worker snapshot, a new top-level LSM segment must
** be created so that some of them can be written to the LSM. 
*/
int lsmCheckpointOverflowRequired(lsm_db *pDb){
  Snapshot *p = pDb->pWorker;
  assert( lsmShmAssertWorker(pDb) );
  return (p->freelist.nEntry > pDb->nMaxFreelist || p->nFreelistOvfl>0);
}

/*
** Connection pDb must be the worker to call this function.
**
** Load the FREELIST record from the database. Decode it and append the
** results to list pFreelist.
................................................................................
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){
  Snapshot *pSnap = pDb->pWorker;
  ShmHeader *pShm = pDb->pShmhdr;
  void *p = 0;
  int n = 0;
  int rc;

#if 0
if( bFlush ){
  printf("pushing %p tree to %d\n", (void *)pDb, pSnap->iId+1);
  fflush(stdout);
}
#endif
  assert( lsmFsIntegrityCheck(pDb) );
  rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aSnap2, p, n);
  lsmShmBarrier(pDb);
................................................................................
    iId = (((i64)lsmGetU32(&aData[CKPT_HDR_ID_MSW*4])) << 32);
    iId += ((i64)lsmGetU32(&aData[CKPT_HDR_ID_LSW*4]));
  }else{
    iId = ((i64)aCkpt[CKPT_HDR_ID_MSW] << 32) + (i64)aCkpt[CKPT_HDR_ID_LSW];
  }
  return iId;
}

u32 lsmCheckpointNWrite(u32 *aCkpt, int bDisk){
  if( bDisk ){
    return lsmGetU32((u8 *)&aCkpt[CKPT_HDR_NWRITE]);
  }else{
    return aCkpt[CKPT_HDR_NWRITE];
  }
}

i64 lsmCheckpointLogOffset(u32 *aCkpt){
  return ((i64)aCkpt[CKPT_HDR_LO_MSW] << 32) + (i64)aCkpt[CKPT_HDR_LO_LSW];
}

int lsmCheckpointPgsz(u32 *aCkpt){ return (int)aCkpt[CKPT_HDR_PGSZ]; }

Changes to src/lsm_file.c.

385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
...
696
697
698
699
700
701
702
703
704
705

706
707
708
709
710
711
712

  nByte = sizeof(FileSystem) + nDb+1 + nDb+4+1;
  pFS = (FileSystem *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
  if( pFS ){
    LsmFile *pLsmFile;
    pFS->zDb = (char *)&pFS[1];
    pFS->zLog = &pFS->zDb[nDb+1];
    pFS->nPagesize = LSM_PAGE_SIZE;
    pFS->nBlocksize = LSM_BLOCK_SIZE;
    pFS->nMetasize = 4 * 1024;
    pFS->pDb = pDb;
    pFS->pEnv = pDb->pEnv;
    pFS->bUseMmap = pDb->bMmap;

    /* Make a copy of the database and log file names. */
    memcpy(pFS->zDb, zDb, nDb+1);
................................................................................
  int *pRc
){
  if( *pRc==LSM_OK && iSz>pFS->nMap ){
    Page *pFix;
    int rc;
    u8 *aOld = pFS->pMap;
    rc = lsmEnvRemap(pFS->pEnv, pFS->fdDb, iSz, &pFS->pMap, &pFS->nMap);
    if( rc==LSM_OK ){
      u8 *aData = (u8 *)pFS->pMap;
      for(pFix=pFS->pLruFirst; pFix; pFix=pFix->pLruNext){

        pFix->aData = &aData[pFS->nPagesize * (i64)(pFix->iPg-1)];
      }
      lsmSortedRemap(pFS->pDb);
    }
    *pRc = rc;
  }
}







|
|







 







|


>







385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
...
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713

  nByte = sizeof(FileSystem) + nDb+1 + nDb+4+1;
  pFS = (FileSystem *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
  if( pFS ){
    LsmFile *pLsmFile;
    pFS->zDb = (char *)&pFS[1];
    pFS->zLog = &pFS->zDb[nDb+1];
    pFS->nPagesize = LSM_DFLT_PAGE_SIZE;
    pFS->nBlocksize = LSM_DFLT_BLOCK_SIZE;
    pFS->nMetasize = 4 * 1024;
    pFS->pDb = pDb;
    pFS->pEnv = pDb->pEnv;
    pFS->bUseMmap = pDb->bMmap;

    /* Make a copy of the database and log file names. */
    memcpy(pFS->zDb, zDb, nDb+1);
................................................................................
  int *pRc
){
  if( *pRc==LSM_OK && iSz>pFS->nMap ){
    Page *pFix;
    int rc;
    u8 *aOld = pFS->pMap;
    rc = lsmEnvRemap(pFS->pEnv, pFS->fdDb, iSz, &pFS->pMap, &pFS->nMap);
    if( rc==LSM_OK && pFS->pMap!=aOld ){
      u8 *aData = (u8 *)pFS->pMap;
      for(pFix=pFS->pLruFirst; pFix; pFix=pFix->pLruNext){
        assert( &aOld[pFS->nPagesize * (i64)(pFix->iPg-1)]==pFix->aData );
        pFix->aData = &aData[pFS->nPagesize * (i64)(pFix->iPg-1)];
      }
      lsmSortedRemap(pFS->pDb);
    }
    *pRc = rc;
  }
}

Changes to src/lsm_log.c.

443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
** A call to this function deletes the LogWriter object allocated by
** lsmLogBegin(). If the transaction is being committed, the shared state
** in *pLog is updated before returning.
*/
void lsmLogEnd(lsm_db *pDb, int bCommit){
  DbLog *pLog;
  LogWriter *p;

  if( pDb->bUseLog==0 ) return;
  p = pDb->pLogWriter;
  pLog = &pDb->treehdr.log;

  if( bCommit ){
    pLog->aRegion[2].iEnd = p->iOff;
    pLog->cksum0 = p->cksum0;
    pLog->cksum1 = p->cksum1;
    if( p->iRegion1End ){







|
|
|







443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
** A call to this function deletes the LogWriter object allocated by
** lsmLogBegin(). If the transaction is being committed, the shared state
** in *pLog is updated before returning.
*/
void lsmLogEnd(lsm_db *pDb, int bCommit){
  DbLog *pLog;
  LogWriter *p;
  p = pDb->pLogWriter;

  if( p==0 ) return;
  pLog = &pDb->treehdr.log;

  if( bCommit ){
    pLog->aRegion[2].iEnd = p->iOff;
    pLog->cksum0 = p->cksum0;
    pLog->cksum1 = p->cksum1;
    if( p->iRegion1End ){

Changes to src/lsm_main.c.

69
70
71
72
73
74
75
76

77
78
79
80
81
82
83
84
85
86
87
88
89
90
...
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
...
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
...
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
...
281
282
283
284
285
286
287









288
289
290
291
292
293
294
...
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
...
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773

774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819

  /* Allocate the new database handle */
  *ppDb = pDb = (lsm_db *)lsmMallocZero(pEnv, sizeof(lsm_db));
  if( pDb==0 ) return LSM_NOMEM_BKPT;

  /* Initialize the new object */
  pDb->pEnv = pEnv;
  pDb->nTreeLimit = LSM_TREE_BYTES;

  pDb->bAutowork = 1;
  pDb->eSafety = LSM_SAFETY_NORMAL;
  pDb->xCmp = xCmp;
  pDb->nLogSz = LSM_DEFAULT_LOG_SIZE;
  pDb->nDfltPgsz = LSM_PAGE_SIZE;
  pDb->nDfltBlksz = LSM_BLOCK_SIZE;
  pDb->nMerge = LSM_DEFAULT_NMERGE;
  pDb->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES;
  pDb->bUseLog = 1;
  pDb->iReader = -1;
  pDb->bMultiProc = 1;
  pDb->bMmap = LSM_IS_64_BIT;
  return LSM_OK;
}
................................................................................
*/
static void dbReleaseClientSnapshot(lsm_db *pDb){
  if( pDb->nTransOpen==0 && pDb->pCsr==0 ){
    lsmFinishReadTrans(pDb);
  }
}

static int dbAutoWork(lsm_db *pDb, int nUnit){
  int rc = LSM_OK;                /* Return code */

  assert( pDb->pWorker==0 );
  assert( pDb->bAutowork );
  assert( nUnit>0 );

  /* If one is required, run a checkpoint. */
#if 0
  rc = lsmCheckpointWrite(pDb);
#endif

  rc = lsmBeginWork(pDb);
  if( rc==LSM_OK ) rc = lsmSortedAutoWork(pDb, nUnit);
  if( pDb->pWorker && pDb->pWorker->pLevel ){
    lsmFinishWork(pDb, 0, -1, &rc);
  }else{
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcdummy);
  }
  return rc;
}

static int getFullpathname(
  lsm_env *pEnv, 
  const char *zRel,
  char **pzAbs
){
  int nAlloc = 0;
  char *zAlloc = 0;
................................................................................

    lsmFree(pDb->pEnv, zFull);
  }

  return rc;
}

/*
** This function flushes the contents of the in-memory tree to disk. It
** returns LSM_OK if successful, or an error code otherwise.
*/
int lsmFlushToDisk(lsm_db *pDb){
  int rc = LSM_OK;                /* Return code */
  int nOvfl = 0;                  /* Number of free-list entries in LSM */

  /* Must not hold the worker snapshot when this is called. */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);

  /* Save the position of each open cursor belonging to pDb. */
  if( rc==LSM_OK ){
    rc = lsmSaveCursors(pDb);
  }

  if( rc==LSM_OK && pDb->bAutowork ){
    rc = lsmSortedAutoWork(pDb, LSM_AUTOWORK_QUANT);
  }
  while( rc==LSM_OK && lsmDatabaseFull(pDb) ){
    rc = lsmSortedAutoWork(pDb, LSM_AUTOWORK_QUANT);
  }

  /* Write the contents of the in-memory tree into the database file and 
  ** update the worker snapshot accordingly. Then flush the contents of 
  ** the db file to disk too. No calls to fsync() are made here - just 
  ** write().  */
  if( rc==LSM_OK ) rc = lsmSortedFlushTree(pDb, &nOvfl);
  lsmFinishWork(pDb, 1, nOvfl, &rc);

  /* Restore the position of any open cursors */
  if( rc==LSM_OK && pDb->pCsr ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
    pDb->pClient = 0;
    rc = lsmCheckpointLoad(pDb, 0);
    if( rc==LSM_OK ){
      rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
    }
    if( rc==LSM_OK ){
      rc = lsmRestoreCursors(pDb);
    }
  }

#if 0
  if( rc==LSM_OK ) lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "flush");
#endif

  return rc;
}

int lsm_close(lsm_db *pDb){
  int rc = LSM_OK;
  if( pDb ){
    assert_db_state(pDb);
    if( pDb->pCsr || pDb->nTransOpen ){
      rc = LSM_MISUSE_BKPT;
................................................................................
  int rc = LSM_OK;
  va_list ap;
  va_start(ap, eParam);

  switch( eParam ){
    case LSM_CONFIG_WRITE_BUFFER: {
      int *piVal = va_arg(ap, int *);
      if( *piVal>0 ){
        pDb->nTreeLimit = *piVal;
      }
      *piVal = pDb->nTreeLimit;
      break;
    }

    case LSM_CONFIG_AUTOWORK: {
................................................................................
      int *piVal = va_arg(ap, int *);
      if( *piVal>=0 ){
        pDb->bAutowork = *piVal;
      }
      *piVal = pDb->bAutowork;
      break;
    }










    case LSM_CONFIG_LOG_SIZE: {
      int *piVal = va_arg(ap, int *);
      if( *piVal>0 ){
        pDb->nLogSz = *piVal;
      }
      *piVal = pDb->nLogSz;
................................................................................
    }

    nBefore = lsmTreeSize(pDb);
    rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
    nAfter = lsmTreeSize(pDb);
    nDiff = (nAfter/nQuant) - (nBefore/nQuant);
    if( rc==LSM_OK && pDb->bAutowork && nDiff!=0 ){
      rc = dbAutoWork(pDb, nDiff * LSM_AUTOWORK_QUANT);
      if( rc==LSM_BUSY ) rc = LSM_OK;
    }
  }

  /* If a transaction was opened at the start of this function, commit it. 
  ** Or, if an error has occurred, roll it back.
  */
  if( bCommit ){
    if( rc==LSM_OK ){
      rc = lsm_commit(pDb, 0);
    }else{
      lsm_rollback(pDb, 0);
    }
  }
................................................................................
    }
  }

  return rc;
}

int lsm_commit(lsm_db *pDb, int iLevel){
  int nFlush = 0;                 /* Number of flushable trees in memory */
  int rc = LSM_OK;

  assert_db_state( pDb );

  /* A value less than zero means close the innermost nested transaction. */
  if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);

  if( iLevel<pDb->nTransOpen ){
    if( iLevel==0 ){


      /* Commit the transaction to disk. */
      if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
      if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
        rc = lsmFsSyncLog(pDb->pFS);
      }

      if( lsmTreeSize(pDb)>pDb->nTreeLimit ){
        lsmTreeMakeOld(pDb, &nFlush);
      }
      lsmFinishWriteTrans(pDb, (rc==LSM_OK));
    }
    pDb->nTransOpen = iLevel;

  }
  dbReleaseClientSnapshot(pDb);

  /* If nFlush is not zero and auto-work is enabled, flush the tree to disk.
  **
  ** If auto-work is enabled and data was written to disk, also sync the 
  ** db and checkpoint the latest snapshot.
  **
  ** Ignore any LSM_BUSY errors that occur during these operations. If
  ** LSM_BUSY does occur, it means some other connection is already working
  ** on flushing the in-memory tree or checkpointing the database. 
  */
  assert( rc!=LSM_BUSY);
  if( rc==LSM_OK ){
    if( nFlush && pDb->bAutowork ){
      rc = lsmFlushToDisk(pDb);
      if( rc==LSM_OK && pDb->bAutowork ){
        rc = lsmCheckpointWrite(pDb);
      }
    }else if( nFlush && pDb->xWork ){
      pDb->xWork(pDb, pDb->pWorkCtx);
    }
  }
  if( rc==LSM_BUSY ) rc = LSM_OK;

  return rc;
}

int lsm_rollback(lsm_db *pDb, int iLevel){
  int rc = LSM_OK;
  assert_db_state( pDb );








|
>



|
|
|
|







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







|







 







>
>
>
>
>
>
>
>
>







 







|
<




|
<







 







<









>






<
<
<
<



|
<

<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
...
101
102
103
104
105
106
107























108
109
110
111
112
113
114
...
168
169
170
171
172
173
174


















































175
176
177
178
179
180
181
...
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
...
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
...
511
512
513
514
515
516
517
518

519
520
521
522
523

524
525
526
527
528
529
530
...
692
693
694
695
696
697
698

699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714




715
716
717
718

719























720
721
722
723
724
725
726

  /* Allocate the new database handle */
  *ppDb = pDb = (lsm_db *)lsmMallocZero(pEnv, sizeof(lsm_db));
  if( pDb==0 ) return LSM_NOMEM_BKPT;

  /* Initialize the new object */
  pDb->pEnv = pEnv;
  pDb->nTreeLimit = LSM_DFLT_WRITE_BUFFER;
  pDb->nAutockpt = LSM_DFLT_AUTOCHECKPOINT;
  pDb->bAutowork = 1;
  pDb->eSafety = LSM_SAFETY_NORMAL;
  pDb->xCmp = xCmp;
  pDb->nLogSz = LSM_DFLT_LOG_SIZE;
  pDb->nDfltPgsz = LSM_DFLT_PAGE_SIZE;
  pDb->nDfltBlksz = LSM_DFLT_BLOCK_SIZE;
  pDb->nMerge = LSM_DFLT_NMERGE;
  pDb->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES;
  pDb->bUseLog = 1;
  pDb->iReader = -1;
  pDb->bMultiProc = 1;
  pDb->bMmap = LSM_IS_64_BIT;
  return LSM_OK;
}
................................................................................
*/
static void dbReleaseClientSnapshot(lsm_db *pDb){
  if( pDb->nTransOpen==0 && pDb->pCsr==0 ){
    lsmFinishReadTrans(pDb);
  }
}
























static int getFullpathname(
  lsm_env *pEnv, 
  const char *zRel,
  char **pzAbs
){
  int nAlloc = 0;
  char *zAlloc = 0;
................................................................................

    lsmFree(pDb->pEnv, zFull);
  }

  return rc;
}




















































int lsm_close(lsm_db *pDb){
  int rc = LSM_OK;
  if( pDb ){
    assert_db_state(pDb);
    if( pDb->pCsr || pDb->nTransOpen ){
      rc = LSM_MISUSE_BKPT;
................................................................................
  int rc = LSM_OK;
  va_list ap;
  va_start(ap, eParam);

  switch( eParam ){
    case LSM_CONFIG_WRITE_BUFFER: {
      int *piVal = va_arg(ap, int *);
      if( *piVal>=0 ){
        pDb->nTreeLimit = *piVal;
      }
      *piVal = pDb->nTreeLimit;
      break;
    }

    case LSM_CONFIG_AUTOWORK: {
................................................................................
      int *piVal = va_arg(ap, int *);
      if( *piVal>=0 ){
        pDb->bAutowork = *piVal;
      }
      *piVal = pDb->bAutowork;
      break;
    }

    case LSM_CONFIG_AUTOCHECKPOINT: {
      int *piVal = va_arg(ap, int *);
      if( *piVal>=0 ){
        pDb->nAutockpt = *piVal;
      }
      *piVal = pDb->nAutockpt;
      break;
    }

    case LSM_CONFIG_LOG_SIZE: {
      int *piVal = va_arg(ap, int *);
      if( *piVal>0 ){
        pDb->nLogSz = *piVal;
      }
      *piVal = pDb->nLogSz;
................................................................................
    }

    nBefore = lsmTreeSize(pDb);
    rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
    nAfter = lsmTreeSize(pDb);
    nDiff = (nAfter/nQuant) - (nBefore/nQuant);
    if( rc==LSM_OK && pDb->bAutowork && nDiff!=0 ){
      rc = lsmSortedAutoWork(pDb, nDiff * LSM_AUTOWORK_QUANT);

    }
  }

  /* If a transaction was opened at the start of this function, commit it. 
  ** Or, if an error has occurred, roll it back.  */

  if( bCommit ){
    if( rc==LSM_OK ){
      rc = lsm_commit(pDb, 0);
    }else{
      lsm_rollback(pDb, 0);
    }
  }
................................................................................
    }
  }

  return rc;
}

int lsm_commit(lsm_db *pDb, int iLevel){

  int rc = LSM_OK;

  assert_db_state( pDb );

  /* A value less than zero means close the innermost nested transaction. */
  if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);

  if( iLevel<pDb->nTransOpen ){
    if( iLevel==0 ){
      int bAutowork = 0;

      /* Commit the transaction to disk. */
      if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
      if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
        rc = lsmFsSyncLog(pDb->pFS);
      }




      lsmFinishWriteTrans(pDb, (rc==LSM_OK));
    }
    pDb->nTransOpen = iLevel;
  }

  dbReleaseClientSnapshot(pDb);























  return rc;
}

int lsm_rollback(lsm_db *pDb, int iLevel){
  int rc = LSM_OK;
  assert_db_state( pDb );

Changes to src/lsm_shared.c.

85
86
87
88
89
90
91

92

93
94
95
96
97
98
99
...
171
172
173
174
175
176
177
178
179

180

181


182
183
184
185
186
187
188
189
190


191
192
193
194
195
196
197
...
491
492
493
494
495
496
497



498
499
500
501
502
503
504
...
527
528
529
530
531
532
533
534
535

536
537
538
539
540
541
542
...
555
556
557
558
559
560
561

562
563
564
565
566
567
568
...
572
573
574
575
576
577
578

579
580



581
582
583
584
585
586
587
588

589
590
591
592
593
594
595
...
616
617
618
619
620
621
622



623
624
625
626
627
628
629
...
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
...
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
...
688
689
690
691
692
693
694
695

696












697
698
699
700
701
702
703
...
781
782
783
784
785
786
787



788




789




790



791
792
793
794
795
796
797
798
...
811
812
813
814
815
816
817

818
819
820
821
822
823
824
825
826
827
...
842
843
844
845
846
847
848

849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865



866
867
868
869
870
871
872
....
1193
1194
1195
1196
1197
1198
1199



1200



1201














/*
** Append an entry to the free-list.
*/
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){

  /* Assert that this is not an attempt to insert a duplicate block number */

  assertNotInFreelist(p, iBlk);


  /* Extend the space allocated for the freelist, if required */
  assert( p->nAlloc>=p->nEntry );
  if( p->nAlloc==p->nEntry ){
    int nNew; 
    FreelistEntry *aNew;

................................................................................
    rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL, 0);
    if( rc==LSM_OK ){
      /* Flush the in-memory tree, if required. If there is data to flush,
      ** this will create a new client snapshot in Database.pClient. The
      ** checkpoint (serialization) of this snapshot may be written to disk
      ** by the following block.  */
      rc = lsmTreeLoadHeader(pDb, 0);
      if( rc==LSM_OK && lsmTreeSize(pDb)>0 ){
        int nFlush = 0;

        lsmTreeMakeOld(pDb, &nFlush);

        if( nFlush ) rc = lsmFlushToDisk(pDb);


      }

      /* Write a checkpoint to disk. */
      if( rc==LSM_OK ){
        rc = lsmCheckpointWrite(pDb);
      }

      /* If the checkpoint was written successfully, delete the log file */
      if( rc==LSM_OK && pDb->pFS ){


        Database *p = pDb->pDatabase;
        lsmFsCloseAndDeleteLog(pDb->pFS);
        if( p->pFile ) lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
      }
    }
  }

................................................................................
** LSM_NOMEM).
*/
int lsmBlockFree(lsm_db *pDb, int iBlk){
  Snapshot *p = pDb->pWorker;

  assert( lsmShmAssertWorker(pDb) );
  /* TODO: Should assert() that lsmCheckpointOverflow() has not been called */




  return lsmFreelistAppend(pDb->pEnv, &p->freelist, iBlk, p->iId);
}

/*
** Refree a database block. The worker snapshot must be held in order to call 
** this function.
................................................................................
** database itself.
**
** The WORKER lock must not be held when this is called. This is because
** this function may indirectly call fsync(). And the WORKER lock should
** not be held that long (in case it is required by a client flushing an
** in-memory tree to disk).
*/
int lsmCheckpointWrite(lsm_db *pDb){
  int rc;                         /* Return Code */


  assert( pDb->pWorker==0 );
  assert( 1 || pDb->pClient==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );

  rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0);
  if( rc!=LSM_OK ) return rc;
................................................................................
      i64 iCkpt;                  /* Id of checkpoint just loaded */
      i64 iDisk;                  /* Id of checkpoint already stored in db */
      iCkpt = lsmCheckpointId(pDb->aSnapshot, 0);
      rc = lsmFsMetaPageGet(pDb->pFS, 0, pShm->iMetaPage, &pPg);
      if( rc==LSM_OK ){
        aData = lsmFsMetaPageData(pPg, &nData);
        iDisk = lsmCheckpointId((u32 *)aData, 1);

        lsmFsMetaPageRelease(pPg);
      }
      bDone = (iDisk>=iCkpt);
    }

    if( rc==LSM_OK && bDone==0 ){
      int iMeta = (pShm->iMetaPage % 2) + 1;
................................................................................
      if( pDb->eSafety!=LSM_SAFETY_OFF ){
        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
        rc = lsmFsSyncDb(pDb->pFS);
      }

      if( rc==LSM_OK ) pShm->iMetaPage = iMeta;
#if 0



  lsmLogMessage(pDb, 0, "finish checkpoint %d", 
      (int)lsmCheckpointId(pDb->aSnapshot, 0)
  );
#endif
    }
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);

  return rc;
}

int lsmBeginWork(lsm_db *pDb){
  int rc;

  /* Attempt to take the WORKER lock */
................................................................................
** been flushed to disk. The significance of this is that once the snapshot
** created to hold the updated state of the database is synced to disk, log
** file space can be recycled.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){
  /* If no error has occurred, serialize the worker snapshot and write
  ** it to shared memory.  */



  if( *pRc==LSM_OK ){
    *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl);
  }

  if( pDb->pWorker ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
................................................................................
}

/*
** Begin a read transaction. This function is a no-op if the connection
** passed as the only argument already has an open read transaction.
*/
int lsmBeginReadTrans(lsm_db *pDb){
  const int MAX_READLOCK_ATTEMPTS = 5;
  int rc = LSM_OK;                /* Return code */
  int iAttempt = 0;

  assert( pDb->pWorker==0 );
  assert( (pDb->pClient!=0)==(pDb->iReader>=0) );

  while( rc==LSM_OK && pDb->pClient==0 && (iAttempt++)<MAX_READLOCK_ATTEMPTS ){
................................................................................
    /* Take a read-lock on the tree and snapshot just loaded. Then check
    ** that the shared-memory still contains the same values. If so, proceed.
    ** Otherwise, relinquish the read-lock and retry the whole procedure
    ** (starting with loading the in-memory tree header).  */
    if( rc==LSM_OK ){
      ShmHeader *pShm = pDb->pShmhdr;
      u32 iShmMax = pDb->treehdr.iUsedShmid;
      u32 iShmMin = pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk;
      rc = lsmReadlock(
          pDb, lsmCheckpointId(pDb->aSnapshot, 0), iShmMin, iShmMax
      );
      if( rc==LSM_OK ){
        if( lsmTreeLoadHeaderOk(pDb, iTreehdr)
         && lsmCheckpointLoadOk(pDb, iSnap)
        ){
................................................................................
          rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
          assert( pDb->iReader>=0 );
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }
      if( rc==LSM_BUSY ) rc = LSM_OK;

    }












  }
  if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;

  return rc;
}

/*
................................................................................
** transaction was rolled back, both the log file and in-memory tree 
** structure have already been restored. In either case, this function 
** merely releases locks and other resources held by the write-transaction.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){



  lsmLogEnd(pDb, bCommit);




  lsmTreeEndTransaction(pDb, bCommit);




  lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);



  return LSM_OK;
}


/*
** Return non-zero if the caller is holding the client mutex.
*/
#ifdef LSM_DEBUG
................................................................................

/*
** Obtain a read-lock on database version identified by the combination
** of snapshot iLsm and tree iTree. Return LSM_OK if successful, or
** an LSM error code otherwise.
*/
int lsmReadlock(lsm_db *db, i64 iLsm, u32 iShmMin, u32 iShmMax){

  ShmHeader *pShm = db->pShmhdr;
  int i;
  int rc = LSM_OK;

  assert( db->iReader<0 );
  assert( shm_sequence_ge(iShmMax, iShmMin) );

  /* Search for an exact match. */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
................................................................................
    if( rc==LSM_BUSY ){
      rc = LSM_OK;
    }else{
      ShmReader *p = &pShm->aReader[i];
      p->iLsmId = iLsm;
      p->iTreeId = iShmMax;
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);

      if( rc==LSM_OK ) db->iReader = i;
    }
  }

  /* Search for any usable slot */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( slotIsUsable(p, iLsm, iShmMax, iShmMax) ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      if( rc==LSM_OK && slotIsUsable(p, iLsm, iShmMax, iShmMax) ){
        db->iReader = i;
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }




  return rc;
}

/*
** This is used to check if there exists a read-lock locking a particular
** version of either the in-memory tree or database file. 
**
................................................................................
}
#endif

void lsmShmBarrier(lsm_db *db){
  lsmEnvShmBarrier(db->pEnv);
}





























>

>







 







|
|
>
|
>
|
>
>




|



|
>
>







 







>
>
>







 







|

>







 







>







 







>
|
<
>
>
>








>







 







>
>
>







 







|







 







|







 







|
>
|
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>
>

>
>
>
>

>
>
>
>

>
>
>
|







 







>


<







 







>







|

|







>
>
>







 







>
>
>

>
>
>

>
>
>
>
>
>
>
>
>
>
>
>
>
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
...
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
...
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
...
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
...
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
...
585
586
587
588
589
590
591
592
593

594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
...
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
...
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
...
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
...
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
...
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
...
858
859
860
861
862
863
864
865
866
867

868
869
870
871
872
873
874
...
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
....
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271

/*
** Append an entry to the free-list.
*/
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){

  /* Assert that this is not an attempt to insert a duplicate block number */
#if 0
  assertNotInFreelist(p, iBlk);
#endif

  /* Extend the space allocated for the freelist, if required */
  assert( p->nAlloc>=p->nEntry );
  if( p->nAlloc==p->nEntry ){
    int nNew; 
    FreelistEntry *aNew;

................................................................................
    rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL, 0);
    if( rc==LSM_OK ){
      /* Flush the in-memory tree, if required. If there is data to flush,
      ** this will create a new client snapshot in Database.pClient. The
      ** checkpoint (serialization) of this snapshot may be written to disk
      ** by the following block.  */
      rc = lsmTreeLoadHeader(pDb, 0);
      if( rc==LSM_OK && (lsmTreeHasOld(pDb) || lsmTreeSize(pDb)>0) ){
        assert( pDb->nTransOpen==0 );
        pDb->nTransOpen = 1;
        lsmTreeMakeOld(pDb);
        if( pDb->treehdr.iOldShmid ){
          rc = lsmFlushTreeToDisk(pDb);
        }
        pDb->nTransOpen = 0;
      }

      /* Write a checkpoint to disk. */
      if( rc==LSM_OK ){
        rc = lsmCheckpointWrite(pDb, 0);
      }

      /* If the checkpoint was written successfully, delete the log file */
      if( rc==LSM_OK && pDb->pFS 
       && pDb->treehdr.iOldShmid==0 && pDb->treehdr.nByte==0 
      ){
        Database *p = pDb->pDatabase;
        lsmFsCloseAndDeleteLog(pDb->pFS);
        if( p->pFile ) lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
      }
    }
  }

................................................................................
** LSM_NOMEM).
*/
int lsmBlockFree(lsm_db *pDb, int iBlk){
  Snapshot *p = pDb->pWorker;

  assert( lsmShmAssertWorker(pDb) );
  /* TODO: Should assert() that lsmCheckpointOverflow() has not been called */
#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, LSM_OK, "lsmBlockFree(): Free block %d", iBlk);
#endif

  return lsmFreelistAppend(pDb->pEnv, &p->freelist, iBlk, p->iId);
}

/*
** Refree a database block. The worker snapshot must be held in order to call 
** this function.
................................................................................
** database itself.
**
** The WORKER lock must not be held when this is called. This is because
** this function may indirectly call fsync(). And the WORKER lock should
** not be held that long (in case it is required by a client flushing an
** in-memory tree to disk).
*/
int lsmCheckpointWrite(lsm_db *pDb, u32 *pnWrite){
  int rc;                         /* Return Code */
  u32 nWrite = 0;

  assert( pDb->pWorker==0 );
  assert( 1 || pDb->pClient==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );

  rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0);
  if( rc!=LSM_OK ) return rc;
................................................................................
      i64 iCkpt;                  /* Id of checkpoint just loaded */
      i64 iDisk;                  /* Id of checkpoint already stored in db */
      iCkpt = lsmCheckpointId(pDb->aSnapshot, 0);
      rc = lsmFsMetaPageGet(pDb->pFS, 0, pShm->iMetaPage, &pPg);
      if( rc==LSM_OK ){
        aData = lsmFsMetaPageData(pPg, &nData);
        iDisk = lsmCheckpointId((u32 *)aData, 1);
        nWrite = lsmCheckpointNWrite((u32 *)aData, 1);
        lsmFsMetaPageRelease(pPg);
      }
      bDone = (iDisk>=iCkpt);
    }

    if( rc==LSM_OK && bDone==0 ){
      int iMeta = (pShm->iMetaPage % 2) + 1;
................................................................................
      if( pDb->eSafety!=LSM_SAFETY_OFF ){
        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ){
        pShm->iMetaPage = iMeta;

        nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite;
      }
#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, 0, "finish checkpoint %d", 
      (int)lsmCheckpointId(pDb->aSnapshot, 0)
  );
#endif
    }
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite;
  return rc;
}

int lsmBeginWork(lsm_db *pDb){
  int rc;

  /* Attempt to take the WORKER lock */
................................................................................
** been flushed to disk. The significance of this is that once the snapshot
** created to hold the updated state of the database is synced to disk, log
** file space can be recycled.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){
  /* If no error has occurred, serialize the worker snapshot and write
  ** it to shared memory.  */

  assert( pDb->pWorker );
  assert( pDb->pWorker->nFreelistOvfl==0 || nOvfl==0 );
  if( *pRc==LSM_OK ){
    *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl);
  }

  if( pDb->pWorker ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
................................................................................
}

/*
** Begin a read transaction. This function is a no-op if the connection
** passed as the only argument already has an open read transaction.
*/
int lsmBeginReadTrans(lsm_db *pDb){
  const int MAX_READLOCK_ATTEMPTS = 10;
  int rc = LSM_OK;                /* Return code */
  int iAttempt = 0;

  assert( pDb->pWorker==0 );
  assert( (pDb->pClient!=0)==(pDb->iReader>=0) );

  while( rc==LSM_OK && pDb->pClient==0 && (iAttempt++)<MAX_READLOCK_ATTEMPTS ){
................................................................................
    /* Take a read-lock on the tree and snapshot just loaded. Then check
    ** that the shared-memory still contains the same values. If so, proceed.
    ** Otherwise, relinquish the read-lock and retry the whole procedure
    ** (starting with loading the in-memory tree header).  */
    if( rc==LSM_OK ){
      ShmHeader *pShm = pDb->pShmhdr;
      u32 iShmMax = pDb->treehdr.iUsedShmid;
      u32 iShmMin = pDb->treehdr.iNextShmid+1-(1<<10);
      rc = lsmReadlock(
          pDb, lsmCheckpointId(pDb->aSnapshot, 0), iShmMin, iShmMax
      );
      if( rc==LSM_OK ){
        if( lsmTreeLoadHeaderOk(pDb, iTreehdr)
         && lsmCheckpointLoadOk(pDb, iSnap)
        ){
................................................................................
          rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
          assert( pDb->iReader>=0 );
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }
      if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
#if 0
if( rc==LSM_OK && pDb->pClient ){
  printf("reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n",
      (void *)pDb,
      (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid, 
      (int)pDb->treehdr.root.iTransId,
      (int)pDb->treehdr.iOldShmid
  );
  fflush(stdout);
}
#endif
  }
  if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;

  return rc;
}

/*
................................................................................
** transaction was rolled back, both the log file and in-memory tree 
** structure have already been restored. In either case, this function 
** merely releases locks and other resources held by the write-transaction.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){
  int rc = LSM_OK;
  int bFlush = 0;

  lsmLogEnd(pDb, bCommit);
  if( rc==LSM_OK && bCommit && lsmTreeSize(pDb)>pDb->nTreeLimit ){
    bFlush = 1;
    lsmTreeMakeOld(pDb);
  }
  lsmTreeEndTransaction(pDb, bCommit);

  if( rc==LSM_OK && bFlush && pDb->bAutowork ){
    rc = lsmSortedAutoWork(pDb, 1);
  }
  lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
  if( bFlush && pDb->bAutowork==0 && pDb->xWork ){
    pDb->xWork(pDb, pDb->pWorkCtx);
  }
  return rc;
}


/*
** Return non-zero if the caller is holding the client mutex.
*/
#ifdef LSM_DEBUG
................................................................................

/*
** Obtain a read-lock on database version identified by the combination
** of snapshot iLsm and tree iTree. Return LSM_OK if successful, or
** an LSM error code otherwise.
*/
int lsmReadlock(lsm_db *db, i64 iLsm, u32 iShmMin, u32 iShmMax){
  int rc = LSM_OK;
  ShmHeader *pShm = db->pShmhdr;
  int i;


  assert( db->iReader<0 );
  assert( shm_sequence_ge(iShmMax, iShmMin) );

  /* Search for an exact match. */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
................................................................................
    if( rc==LSM_BUSY ){
      rc = LSM_OK;
    }else{
      ShmReader *p = &pShm->aReader[i];
      p->iLsmId = iLsm;
      p->iTreeId = iShmMax;
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      assert( rc!=LSM_BUSY );
      if( rc==LSM_OK ) db->iReader = i;
    }
  }

  /* Search for any usable slot */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( slotIsUsable(p, iLsm, iShmMin, iShmMax) ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      if( rc==LSM_OK && slotIsUsable(p, iLsm, iShmMin, iShmMax) ){
        db->iReader = i;
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  if( rc==LSM_OK && db->iReader<0 ){
    rc = LSM_BUSY;
  }
  return rc;
}

/*
** This is used to check if there exists a read-lock locking a particular
** version of either the in-memory tree or database file. 
**
................................................................................
}
#endif

void lsmShmBarrier(lsm_db *db){
  lsmEnvShmBarrier(db->pEnv);
}

int lsm_checkpoint(lsm_db *pDb, int *pnByte){
  int rc;                         /* Return code */
  u32 nWrite = 0;                 /* Number of pages checkpointed */

  /* Attempt the checkpoint. If successful, nWrite is set to the number of
  ** pages written between this and the previous checkpoint.  */
  rc = lsmCheckpointWrite(pDb, &nWrite);

  /* If required, calculate the output variable (bytes of data checkpointed). 
  ** Set it to zero if an error occured.  */
  if( pnByte ){
    int nByte = 0;
    if( rc==LSM_OK && nWrite ){
      nByte = (int)nWrite * lsmFsPageSize(pDb->pFS);
    }
    *pnByte = nByte;
  }

  return rc;
}

Changes to src/lsm_sorted.c.

322
323
324
325
326
327
328



329

330
331
332
333
334
335
336
...
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
...
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
....
3507
3508
3509
3510
3511
3512
3513
3514

3515
3516
3517
3518
3519
3520
3521

3522
3523
3524
3525
3526
3527
3528
....
3582
3583
3584
3585
3586
3587
3588

3589
3590
3591
3592
3593
3594
3595
....
3600
3601
3602
3603
3604
3605
3606
3607
3608
3609
3610
3611
3612
3613
3614
3615
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
3630
3631
3632
3633
3634
3635
3636
3637
3638
3639
3640
3641
3642
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
....
3663
3664
3665
3666
3667
3668
3669









3670
3671
3672
3673
3674
3675
3676

3677
3678
3679
3680
3681
3682
3683
3684
3685
3686
3687

3688
3689
3690
3691
3692
3693
3694
....
3828
3829
3830
3831
3832
3833
3834





















































































3835






3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848

3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889
3890
3891
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910
3911
3912
3913
3914
3915
3916
3917
3918
3919
3920
....
3976
3977
3978
3979
3980
3981
3982







3983
3984
3985
3986
3987
3988
3989
3990
3991
3992
3993
3994


3995
3996
3997
3998
3999
4000
4001
4002
....
4035
4036
4037
4038
4039
4040
4041
















































































































































































4042
4043
4044
4045
4046
4047
4048
....
4049
4050
4051
4052
4053
4054
4055
4056
4057
4058

4059
4060
4061

4062
4063
4064
4065
4066
4067
4068
4069







4070






4071
4072












4073
4074
4075

4076
4077
4078
4079
4080
4081
4082
4083
4084
4085
4086
4087
4088
4089
4090
4091
4092
4093
4094

4095
4096
4097
4098
4099

4100
4101
4102
4103
4104
4105
4106
4107
4108
4109
4110
4111
4112
4113
4114
4115
4116
4117
4118
4119
4120
4121
4122
4123
4124
4125
4126
4127
4128
4129
4130
4131
4132
4133
4134
4135
4136
4137
4138
4139
4140
4141
4142
4143
4144
4145
4146
4147
4148
4149
4150
4151
4152
4153
4154
4155
4156
4157
4158
4159
4160
4161
4162
4163
4164
4165
4166
4167
4168
....
4531
4532
4533
4534
4535
4536
4537
4538


4539
4540
4541
4542
4543
4544
4545
....
4593
4594
4595
4596
4597
4598
4599














































4600
4601
4602
4603
4604
4605
4606
  Page *pPage;                    /* Current output page */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */
};

#ifdef LSM_DEBUG_EXPENSIVE
static int assertPointersOk(lsm_db *, Segment *, Segment *, int);
static int assertBtreeOk(lsm_db *, Segment *);



#endif


struct FilePage { u8 *aData; int nData; };
static u8 *fsPageData(Page *pPg, int *pnData){
  *pnData = ((struct FilePage *)(pPg))->nData;
  return ((struct FilePage *)(pPg))->aData;
}
/*UNUSED static u8 *fsPageDataPtr(Page *pPg){
................................................................................
static int btreeCursorLoadKey(BtreeCursor *pCsr){
  int rc = LSM_OK;
  if( pCsr->iPg<0 ){
    pCsr->pKey = 0;
    pCsr->nKey = 0;
    pCsr->eType = 0;
  }else{
    int iPg;
    for(iPg=pCsr->iPg; iPg>=0; iPg--){
      int iCell = pCsr->aPg[pCsr->iPg].iCell;
      if( iCell>=0 ){
        int dummy;
        rc = pageGetBtreeKey(
            pCsr->aPg[pCsr->iPg].pPage, pCsr->aPg[pCsr->iPg].iCell,
            &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
        );
        pCsr->eType |= SORTED_SEPARATOR;
        break;
      }
    }

    if( iPg<0 ) rc = LSM_CORRUPT_BKPT;
  }

  return rc;
}

static int btreeCursorPtr(u8 *aData, int nData, int iCell){
  int nCell;
................................................................................
  int nData;

  assert( pCsr->iPg>=0 );
  assert( pCsr->iPg==pCsr->nDepth-1 );

  aData = fsPageData(pPg->pPage, &nData);
  nCell = pageGetNRec(aData, nData);

  assert( pPg->iCell<=nCell );

  pPg->iCell++;
  if( pPg->iCell==nCell ){
    Pgno iLoad;

    /* Up to parent. */
    lsmFsPageRelease(pPg->pPage);
    pPg->pPage = 0;
................................................................................
    pDb->xWork(pDb, pDb->pWorkCtx);
  }
}

static int sortedNewToplevel(
  lsm_db *pDb,                    /* Connection handle */
  int bTree,                      /* True to store contents of in-memory tree */
  int *pnOvfl                     /* OUT: Number of free-list entries stored */

){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int iLeftPtr = 0;


  assert( pnOvfl );

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);

................................................................................

    /* Do the work to create the new merged segment on disk */
    if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr);
    while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
      rc = mergeWorkerStep(&mergeworker);
    }


    mergeWorkerShutdown(&mergeworker, &rc);
    pNew->pMerge = 0;
  }

  /* Link the new level into the top of the tree. */
  if( rc==LSM_OK ){
    if( pDel ){
................................................................................
    sortedFreeLevel(pDb->pEnv, pNew);
  }

  if( rc==LSM_OK ){
    sortedInvokeWorkHook(pDb);
  }

  return rc;
}

/*
** Flush the contents of the in-memory tree to a new segment on disk.
** At present, this may occur in two scenarios:
**
**   1. When a transaction has just been committed (by connection pDb), 
**      and the in-memory tree has exceeded the size threshold, or
**
**   2. If the in-memory tree is not empty and the last connection to
**      the database (pDb) is being closed.
**
** In both cases, the connection hold a worker snapshot reference. In
** the first, the connection also holds the in-memory tree write-version.
** In the second, no in-memory tree version reference is held at all.
*/
int lsmSortedFlushTree(
  lsm_db *pDb,                    /* Connection handle */
  int *pnOvfl                     /* OUT: Number of free-list entries written */
){
  int rc;

  assert( pDb->pWorker );

  /* If there is nothing to do, return early. */
  if( lsmTreeHasOld(pDb)==0 && lsmCheckpointOverflowRequired(pDb)==0 ){
    *pnOvfl = 0;
    return LSM_OK;
  }

  rc = sortedNewToplevel(pDb, 1, pnOvfl);
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );

#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "tree flush");
#endif
#if 0
  lsmLogMessage(pDb, rc, "flushed tree to disk");
#endif
  return rc;
}

/*
** The nMerge levels in the LSM beginning with pLevel consist of a
** left-hand-side segment only. Replace these levels with a single new
................................................................................
  Level **ppNew                   /* New, merged, level */
){
  int rc = LSM_OK;                /* Return Code */
  Level *pNew;                    /* New Level object */
  int bUseNext = 0;               /* True to link in next separators */
  Merge *pMerge;                  /* New Merge object */
  int nByte;                      /* Bytes of space allocated at pMerge */










  /* Allocate the new Level object */
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  if( pNew ){
    pNew->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, 
                                        nMerge * sizeof(Segment), &rc);
  }


  /* Populate the new Level object */
  if( rc==LSM_OK ){
    Level *pNext = 0;             /* Level following pNew */
    int i;
    Level *pTopLevel;
    Level *p = pLevel;
    Level **pp;
    pNew->nRight = nMerge;
    pNew->iAge = pLevel->iAge+1;
    for(i=0; i<nMerge; i++){

      pNext = p->pNext;
      pNew->aRhs[i] = p->lhs;
      sortedFreeLevel(pDb->pEnv, p);
      p = pNext;
    }

    /* Replace the old levels with the new. */
................................................................................
    }
    pCsr->flags |= CURSOR_NEXT_OK;
  }

  return rc;
}






















































































static int sortedWork(lsm_db *pDb, int nWork, int bOptimize, int *pnWrite){






  int rc = LSM_OK;                /* Return Code */
  int nRemaining = nWork;         /* Units of work to do before returning */
  Snapshot *pWorker = pDb->pWorker;

  assert( lsmFsIntegrityCheck(pDb) );
  assert( pWorker );

  if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;

  while( nRemaining>0 ){
    Level *pLevel;
    Level *pTopLevel = lsmDbSnapshotLevel(pWorker);


    /* Find the longest contiguous run of levels not currently undergoing a 
    ** merge with the same age in the structure. Or the level being merged
    ** with the largest number of right-hand segments. Work on it.  */
    Level *pBest = 0;
    int nBest = pDb->nMerge;

    Level *pThis = 0;
    int nThis = 0;

    for(pLevel = pTopLevel; pLevel; pLevel=pLevel->pNext){
      if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
        nThis++;
      }else{
        if( nThis>=nBest ){
          pBest = pThis;
          nBest = nThis;
        }
        if( pLevel->nRight ){
          if( pLevel->nRight>=nBest ){
            nBest = pLevel->nRight;
            pBest = pLevel;
            nThis = 0;
            pThis = 0;
          }
        }else{
          pThis = pLevel;
          nThis = 1;
        }
      }
    }
    if( nThis>nBest ){
      assert( pThis );
      pBest = pThis;
      nBest = nThis;
    }

    if( pBest==0 && bOptimize && pTopLevel->pNext ){
      pBest = pTopLevel;
      nBest = 2;
    }

    if( pBest ){
      if( pBest->nRight==0 ){
        rc = sortedMergeSetup(pDb, pBest, nBest, &pLevel);
      }else{
        pLevel = pBest;
      }
    }

    if( pLevel==0 ){
      /* Could not find any work to do. Finished. */
      break;
    }else{
      MergeWorker mergeworker;    /* State used to work on the level merge */

      rc = mergeWorkerInit(pDb, pLevel, &mergeworker);

      assert( mergeworker.nWork==0 );
      while( rc==LSM_OK 
          && 0==mergeWorkerDone(&mergeworker) 
          && mergeworker.nWork<nRemaining 
      ){
        rc = mergeWorkerStep(&mergeworker);
      }
      nRemaining -= mergeworker.nWork;

      /* Check if the merge operation is completely finished. If so, the
      ** Merge object and the right-hand-side of the level can be deleted. 
      **
      ** Otherwise, gobble up (declare eligible for recycling) any pages
      ** from rhs segments for which the content has been completely merged
      ** into the lhs of the level.
................................................................................

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);








#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work");
#endif

    }
  }

  if( pnWrite ){
    *pnWrite = (nWork - nRemaining);
  }
  pWorker->nWrite += (nWork - nRemaining);



  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  return rc;
}

typedef struct Metric Metric;
struct Metric {
  double fAvgHeight;
  int nTotalSz;
................................................................................
        log((double)nTotalSz / nMinSz) / log(2),
        nTotalSz,
        nMinSz
    );
  }
}
#endif

















































































































































































/*
** This function is called in auto-work mode to perform merging work on
** the data structure. It performs enough merging work to prevent the
** height of the tree from growing indefinitely assuming that roughly
** nUnit database pages worth of data have been written to the database
** (i.e. the in-memory tree) since the last call.
................................................................................
*/
int lsmSortedAutoWork(
  lsm_db *pDb,                    /* Database handle */
  int nUnit                       /* Pages of data written to in-memory tree */
){
  int rc;                         /* Return code */
  int nRemaining;                 /* Units of work to do before returning */
  int nDepth;                     /* Current height of tree (longest path) */
  int nWrite;                     /* Pages written */
  Level *pLevel;                  /* Used to iterate through levels */


  assert( lsmFsIntegrityCheck(pDb) );
  assert( pDb->pWorker );


  /* Determine how many units of work to do before returning. One unit of
  ** work is achieved by writing one page (~4KB) of merged data.  */
  nRemaining = nDepth = 0;
  for(pLevel=lsmDbSnapshotLevel(pDb->pWorker); pLevel; pLevel=pLevel->pNext){
    /* nDepth += LSM_MAX(1, pLevel->nRight); */
    nDepth += 1;
  }







  nRemaining = nUnit * nDepth;







  rc = sortedWork(pDb, nRemaining, 0, &nWrite);












#if 0
  lsmLogMessage(pDb, 0, "auto-work: %d pages", nWrite);
#endif

  return rc;
}

/*
** Perform work to merge database segments together.
*/
int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite){
  int rc = LSM_OK;                /* Return code */
  int nOvfl = 0;
  int bFlush = 0;
  int bFinishWork = 0;
  int nWrite = 0;

  /* This function may not be called if pDb has an open read or write
  ** transaction. Return LSM_MISUSE if an application attempts this.  */
  if( pDb->nTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT;

  if( (flags & LSM_WORK_FLUSH) && (flags & LSM_WORK_OPTIMIZE) ){
    rc = lsmBeginWriteTrans(pDb);

    if( rc==LSM_OK ){
      int nDummy;
      lsmTreeMakeOld(pDb, &nDummy);
      lsmFinishWriteTrans(pDb, 1);
      lsmFinishReadTrans(pDb);

    }
    if( rc==LSM_BUSY ) rc = LSM_OK;
  }

  assert( pDb->pWorker==0 );
  if( (flags & LSM_WORK_FLUSH) || nPage>0 ){
    rc = lsmBeginWork(pDb);
    bFinishWork = 1;
  }

  /* If the FLUSH flag is set, try to flush the contents of the in-memory
  ** tree to disk.  */
  if( rc==LSM_OK && ((flags & LSM_WORK_FLUSH)) ){
    rc = lsmTreeLoadHeader(pDb, 0);
    if( rc==LSM_OK 
     && pDb->treehdr.iOldShmid 
     && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff 
    ){
      rc = lsmSortedFlushTree(pDb, &nOvfl);
      bFlush = 1;
    }
  }

  /* If nPage is greater than zero, do some merging. */
  if( rc==LSM_OK && nPage>0 ){
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    rc = sortedWork(pDb, nPage, bOptimize, &nWrite);
    if( rc==LSM_OK && nWrite ){
#if 0
  {
    char *z = 0;
    lsmInfoFreelist(pDb, &z);
    lsmLogMessage(pDb, 0, "work: %d pages", nWrite);
    lsmLogMessage(pDb, 0, "freelist: %s", z);
    lsm_free(lsm_get_env(pDb), z);
  }
#endif
      rc = lsmSortedFlushDb(pDb);
      if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){
        nOvfl = -1;
        rc = sortedNewToplevel(pDb, 0, &nOvfl);
      }
    }
  }

  if( pnWrite ) *pnWrite = nWrite;
  if( bFinishWork ){
    if( nWrite || bFlush ){
      lsmFinishWork(pDb, bFlush, nOvfl, &rc);
    }else{
      int rcdummy = LSM_BUSY;
      lsmFinishWork(pDb, 0, 0, &rcdummy);
    }
  }
  assert( pDb->pWorker==0 );

  /* If the LSM_WORK_CHECKPOINT flag is specified and one is available,
  ** write a checkpoint out to disk.  */
  if( rc==LSM_OK && (flags & LSM_WORK_CHECKPOINT) ){
    rc = lsmCheckpointWrite(pDb);
  }

  return rc;
}

/*
** Return a string representation of the segment passed as the only argument.
** Space for the returned string is allocated using lsmMalloc(), and should
** be freed by the caller using lsmFree().
................................................................................
          fileToString(pDb->pEnv, zLeft, sizeof(zLeft), 28, aLeft[i]); 
        }
        if( i<nRight ){ 
          fileToString(pDb->pEnv, zRight, sizeof(zRight), 28, aRight[i]); 
        }

        if( i==0 ){
          sqlite4_snprintf(zLevel, sizeof(zLevel), "L%d:", iLevel);


        }else{
          zLevel[0] = '\0';
        }

        if( nRight==0 ){
          iPad = 28 - (strlen(zLeft)/2) ;
        }
................................................................................
void lsmSortedSaveTreeCursors(lsm_db *pDb){
  MultiCursor *pCsr;
  for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){
    lsmTreeCursorSave(pCsr->apTreeCsr[0]);
    lsmTreeCursorSave(pCsr->apTreeCsr[1]);
  }
}















































#ifdef LSM_DEBUG_EXPENSIVE
/*
** This function is only included in the build if LSM_DEBUG_EXPENSIVE is 
** defined. Its only purpose is to evaluate various assert() statements to 
** verify that the database is well formed in certain respects.
**







>
>
>

>







 







|
|
|
|
|
|
|
|
|
|
|
|
|
<
<







 







<

<







 







|
>







>







 







>







 







|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
<
<
<
<
<
<
<
<

|
<
<
<







 







>
>
>
>
>
>
>
>
>







>











>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>




<

<



|
<

>
|
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<








<







|







 







>
>
>
>
>
>
>

|

<



<
|
<


>
>
|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|


>

<
|
>



|
|



>
>
>
>
>
>
>

>
>
>
>
>
>

<
>
>
>
>
>
>
>
>
>
>
>
>



>



<
|
<
<
<
|
<
<
<
<
<
<
<
<
<
<
>
|
<
|
<
<
>
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







|
>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
...
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626


627
628
629
630
631
632
633
...
648
649
650
651
652
653
654

655

656
657
658
659
660
661
662
....
3507
3508
3509
3510
3511
3512
3513
3514
3515
3516
3517
3518
3519
3520
3521
3522
3523
3524
3525
3526
3527
3528
3529
3530
....
3584
3585
3586
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
3597
3598
....
3603
3604
3605
3606
3607
3608
3609
3610






















3611










3612
3613



3614
3615
3616
3617
3618
3619
3620
....
3631
3632
3633
3634
3635
3636
3637
3638
3639
3640
3641
3642
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
3653
3654
3655
3656
3657
3658
3659
3660
3661
3662
3663
3664
3665
3666
3667
3668
3669
3670
3671
3672
3673
....
3807
3808
3809
3810
3811
3812
3813
3814
3815
3816
3817
3818
3819
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889
3890
3891
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909

3910

3911
3912
3913
3914

3915
3916
3917
3918














































3919
3920
3921
3922
3923
3924
3925
3926

3927
3928
3929
3930
3931
3932
3933
3934
3935
3936
3937
3938
3939
3940
3941
....
3997
3998
3999
4000
4001
4002
4003
4004
4005
4006
4007
4008
4009
4010
4011
4012
4013

4014
4015
4016

4017

4018
4019
4020
4021
4022
4023
4024
4025
4026
4027
4028
4029
....
4062
4063
4064
4065
4066
4067
4068
4069
4070
4071
4072
4073
4074
4075
4076
4077
4078
4079
4080
4081
4082
4083
4084
4085
4086
4087
4088
4089
4090
4091
4092
4093
4094
4095
4096
4097
4098
4099
4100
4101
4102
4103
4104
4105
4106
4107
4108
4109
4110
4111
4112
4113
4114
4115
4116
4117
4118
4119
4120
4121
4122
4123
4124
4125
4126
4127
4128
4129
4130
4131
4132
4133
4134
4135
4136
4137
4138
4139
4140
4141
4142
4143
4144
4145
4146
4147
4148
4149
4150
4151
4152
4153
4154
4155
4156
4157
4158
4159
4160
4161
4162
4163
4164
4165
4166
4167
4168
4169
4170
4171
4172
4173
4174
4175
4176
4177
4178
4179
4180
4181
4182
4183
4184
4185
4186
4187
4188
4189
4190
4191
4192
4193
4194
4195
4196
4197
4198
4199
4200
4201
4202
4203
4204
4205
4206
4207
4208
4209
4210
4211
4212
4213
4214
4215
4216
4217
4218
4219
4220
4221
4222
4223
4224
4225
4226
4227
4228
4229
4230
4231
4232
4233
4234
4235
4236
4237
4238
4239
4240
4241
4242
4243
4244
4245
4246
4247
4248
4249
4250
4251
....
4252
4253
4254
4255
4256
4257
4258
4259
4260
4261
4262
4263

4264
4265
4266
4267
4268
4269
4270
4271
4272
4273
4274
4275
4276
4277
4278
4279
4280
4281
4282
4283
4284
4285
4286
4287
4288

4289
4290
4291
4292
4293
4294
4295
4296
4297
4298
4299
4300
4301
4302
4303
4304
4305
4306
4307

4308



4309










4310
4311

4312


4313
4314





























































4315
4316
4317
4318
4319
4320
4321
....
4684
4685
4686
4687
4688
4689
4690
4691
4692
4693
4694
4695
4696
4697
4698
4699
4700
....
4748
4749
4750
4751
4752
4753
4754
4755
4756
4757
4758
4759
4760
4761
4762
4763
4764
4765
4766
4767
4768
4769
4770
4771
4772
4773
4774
4775
4776
4777
4778
4779
4780
4781
4782
4783
4784
4785
4786
4787
4788
4789
4790
4791
4792
4793
4794
4795
4796
4797
4798
4799
4800
4801
4802
4803
4804
4805
4806
4807
  Page *pPage;                    /* Current output page */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */
};

#ifdef LSM_DEBUG_EXPENSIVE
static int assertPointersOk(lsm_db *, Segment *, Segment *, int);
static int assertBtreeOk(lsm_db *, Segment *);
static void assertRunInOrder(lsm_db *pDb, Segment *pSeg);
#else
#define assertRunInOrder(x,y)
#endif


struct FilePage { u8 *aData; int nData; };
static u8 *fsPageData(Page *pPg, int *pnData){
  *pnData = ((struct FilePage *)(pPg))->nData;
  return ((struct FilePage *)(pPg))->aData;
}
/*UNUSED static u8 *fsPageDataPtr(Page *pPg){
................................................................................
static int btreeCursorLoadKey(BtreeCursor *pCsr){
  int rc = LSM_OK;
  if( pCsr->iPg<0 ){
    pCsr->pKey = 0;
    pCsr->nKey = 0;
    pCsr->eType = 0;
  }else{
    int dummy;
    int iPg = pCsr->iPg;
    int iCell = pCsr->aPg[iPg].iCell;
    while( iCell<0 && (--iPg)>=0 ){
      iCell = pCsr->aPg[iPg].iCell-1;
    }
    if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT;

    rc = pageGetBtreeKey(
        pCsr->aPg[iPg].pPage, iCell,
        &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
    );
    pCsr->eType |= SORTED_SEPARATOR;


  }

  return rc;
}

static int btreeCursorPtr(u8 *aData, int nData, int iCell){
  int nCell;
................................................................................
  int nData;

  assert( pCsr->iPg>=0 );
  assert( pCsr->iPg==pCsr->nDepth-1 );

  aData = fsPageData(pPg->pPage, &nData);
  nCell = pageGetNRec(aData, nData);

  assert( pPg->iCell<=nCell );

  pPg->iCell++;
  if( pPg->iCell==nCell ){
    Pgno iLoad;

    /* Up to parent. */
    lsmFsPageRelease(pPg->pPage);
    pPg->pPage = 0;
................................................................................
    pDb->xWork(pDb, pDb->pWorkCtx);
  }
}

static int sortedNewToplevel(
  lsm_db *pDb,                    /* Connection handle */
  int bTree,                      /* True to store contents of in-memory tree */
  int *pnOvfl,                    /* OUT: Number of free-list entries stored */
  int *pnWrite                    /* OUT: Number of database pages written */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int iLeftPtr = 0;
  int nWrite = 0;                 /* Number of database pages written */

  assert( pnOvfl );

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);

................................................................................

    /* Do the work to create the new merged segment on disk */
    if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr);
    while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
      rc = mergeWorkerStep(&mergeworker);
    }

    nWrite = mergeworker.nWork;
    mergeWorkerShutdown(&mergeworker, &rc);
    pNew->pMerge = 0;
  }

  /* Link the new level into the top of the tree. */
  if( rc==LSM_OK ){
    if( pDel ){
................................................................................
    sortedFreeLevel(pDb->pEnv, pNew);
  }

  if( rc==LSM_OK ){
    sortedInvokeWorkHook(pDb);
  }

  if( pnWrite ) *pnWrite = nWrite;






















  pDb->pWorker->nWrite += nWrite;










#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel");



#endif
  return rc;
}

/*
** The nMerge levels in the LSM beginning with pLevel consist of a
** left-hand-side segment only. Replace these levels with a single new
................................................................................
  Level **ppNew                   /* New, merged, level */
){
  int rc = LSM_OK;                /* Return Code */
  Level *pNew;                    /* New Level object */
  int bUseNext = 0;               /* True to link in next separators */
  Merge *pMerge;                  /* New Merge object */
  int nByte;                      /* Bytes of space allocated at pMerge */

#ifdef LSM_DEBUG
  int iLevel;
  Level *pX = pLevel;
  for(iLevel=0; iLevel<nMerge; iLevel++){
    assert( pX->nRight==0 );
    pX = pX->pNext;
  }
#endif

  /* Allocate the new Level object */
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  if( pNew ){
    pNew->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, 
                                        nMerge * sizeof(Segment), &rc);
  }


  /* Populate the new Level object */
  if( rc==LSM_OK ){
    Level *pNext = 0;             /* Level following pNew */
    int i;
    Level *pTopLevel;
    Level *p = pLevel;
    Level **pp;
    pNew->nRight = nMerge;
    pNew->iAge = pLevel->iAge+1;
    for(i=0; i<nMerge; i++){
      assert( p->nRight==0 );
      pNext = p->pNext;
      pNew->aRhs[i] = p->lhs;
      sortedFreeLevel(pDb->pEnv, p);
      p = pNext;
    }

    /* Replace the old levels with the new. */
................................................................................
    }
    pCsr->flags |= CURSOR_NEXT_OK;
  }

  return rc;
}

/*
** Argument p points to a level of age N. Return the number of levels in
** the linked list starting at p that have age=N (always at least 1).
*/
static int sortedCountLevels(Level *p){
  int iAge = p->iAge;
  int nRet = 0;
  do {
    nRet++;
    p = p->pNext;
  }while( p && p->iAge==iAge );
  return nRet;
}

static int sortedSelectLevel(lsm_db *pDb, int bOpt, Level **ppOut){
  Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
  int rc = LSM_OK;
  Level *pLevel = 0;            /* Output value */
  Level *pBest = 0;             /* Best level to work on found so far */
  int nBest = pDb->nMerge-1;    /* Number of segments merged at pBest */
  Level *pThis = 0;             /* First in run of levels with age=iAge */
  int nThis = 0;                /* Number of levels starting at pThis */

  /* Find the longest contiguous run of levels not currently undergoing a 
  ** merge with the same age in the structure. Or the level being merged
  ** with the largest number of right-hand segments. Work on it. */
  for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
    if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
      nThis++;
    }else{
      if( nThis>nBest ){
        if( (pLevel->iAge!=pThis->iAge+1)
            || (pLevel->nRight==0 && sortedCountLevels(pLevel)<=pDb->nMerge)
          ){
          pBest = pThis;
          nBest = nThis;
        }
      }
      if( pLevel->nRight ){
        if( pLevel->nRight>nBest ){
          nBest = pLevel->nRight;
          pBest = pLevel;
        }
        nThis = 0;
        pThis = 0;
      }else{
        pThis = pLevel;
        nThis = 1;
      }
    }
  }
  if( nThis>nBest ){
    assert( pThis );
    pBest = pThis;
    nBest = nThis;
  }

  if( pBest==0 && bOpt && pTopLevel->pNext ){
    pBest = pTopLevel;
    nBest = 2;
  }

  if( pBest ){
    if( pBest->nRight==0 ){
      rc = sortedMergeSetup(pDb, pBest, nBest, ppOut);
    }else{
      *ppOut = pBest;
    }
  }

  return rc;
}

static int sortedDbIsFull(lsm_db *pDb){
  Level *pTop = lsmDbSnapshotLevel(pDb->pWorker);

  if( lsmDatabaseFull(pDb) ) return 1;
  if( pTop && pTop->iAge==0
   && (pTop->nRight || sortedCountLevels(pTop)>=pDb->nMerge)
  ){
    return 1;
  }
  return 0;
}

static int sortedWork(
  lsm_db *pDb,                    /* Database handle. Must be worker. */
  int nWork,                      /* Number of pages of work to do */
  int bOptimize,                  /* True to merge less than nMerge levels */
  int bFlush,                     /* Set if call is to make room for a flush */
  int *pnWrite                    /* OUT: Actual number of pages written */
){
  int rc = LSM_OK;                /* Return Code */
  int nRemaining = nWork;         /* Units of work to do before returning */
  Snapshot *pWorker = pDb->pWorker;


  assert( pWorker );

  if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;

  while( nRemaining>0 ){
    Level *pLevel = 0;


    /* Find a level to work on. */
    rc = sortedSelectLevel(pDb, bOptimize, &pLevel);
    assert( rc==LSM_OK || pLevel==0 );















































    if( pLevel==0 ){
      /* Could not find any work to do. Finished. */
      break;
    }else{
      MergeWorker mergeworker;    /* State used to work on the level merge */

      rc = mergeWorkerInit(pDb, pLevel, &mergeworker);

      assert( mergeworker.nWork==0 );
      while( rc==LSM_OK 
          && 0==mergeWorkerDone(&mergeworker) 
          && mergeworker.nWork<nRemaining 
      ){
        rc = mergeWorkerStep(&mergeworker);
      }
      nRemaining -= LSM_MAX(mergeworker.nWork, 1);

      /* Check if the merge operation is completely finished. If so, the
      ** Merge object and the right-hand-side of the level can be deleted. 
      **
      ** Otherwise, gobble up (declare eligible for recycling) any pages
      ** from rhs segments for which the content has been completely merged
      ** into the lhs of the level.
................................................................................

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
      ** zero. The caller has an in-memory tree to flush to disk.  */
      if( bFlush && sortedDbIsFull(pDb)==0 ) break;

      assertRunInOrder(pDb, &pLevel->lhs);

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif

    }
  }


  if( pnWrite ) *pnWrite = (nWork - nRemaining);

  pWorker->nWrite += (nWork - nRemaining);

#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining));
#endif
  return rc;
}

typedef struct Metric Metric;
struct Metric {
  double fAvgHeight;
  int nTotalSz;
................................................................................
        log((double)nTotalSz / nMinSz) / log(2),
        nTotalSz,
        nMinSz
    );
  }
}
#endif

/*
** The database connection passed as the first argument must be a worker
** connection. This function checks if there exists an "old" in-memory tree
** ready to be flushed to disk. If so, *pbOut is set to true before 
** returning. Otherwise false.
**
** Normally, LSM_OK is returned. Or, if an error occurs, an LSM error code.
*/
static int sortedTreeHasOld(lsm_db *pDb, int *pbOut){
  int rc = LSM_OK;

  assert( pDb->pWorker );
  if( pDb->nTransOpen==0 ){
    rc = lsmTreeLoadHeader(pDb, 0);
  }

  if( rc==LSM_OK 
   && pDb->treehdr.iOldShmid
   && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff 
  ){
    *pbOut = 1;
  }else{
    *pbOut = 0;
  }
  return rc;
}

static int doLsmSingleWork(
  lsm_db *pDb, 
  int bShutdown,
  int flags, 
  int nPage,                      /* Number of pages to write to disk */
  int *pnWrite,                   /* OUT: Pages actually written to disk */
  int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
){
  int rc = LSM_OK;                /* Return code */
  int nOvfl = 0;
  int bFlush = 0;
  int nMax = nPage;               /* Maximum pages to write to disk */
  int nRem = nPage;
  int bCkpt = 0;
  int bToplevel = 0;

  /* Open the worker 'transaction'. It will be closed before this function
  ** returns.  */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);
  if( rc!=LSM_OK ) return rc;

  /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
  ** that this call stops writing when the auto-checkpoint is due.  */
  if( bShutdown==0 && pDb->nAutockpt ){
    u32 nSync;
    u32 nUnsync;
    int nPgsz;
    int nMax;

    lsmCheckpointSynced(pDb, 0, 0, &nSync);
    nUnsync = lsmCheckpointNWrite(pDb->pShmhdr->aSnap1, 0);
    nPgsz = lsmCheckpointPgsz(pDb->pShmhdr->aSnap1);

    nMax = (pDb->nAutockpt/nPgsz) - (nUnsync-nSync);
    if( nMax<nRem ){
      bCkpt = 1;
      nRem = LSM_MAX(nMax, 0);
    }
  }

  /* If the FLUSH flag is set, there exists in-memory ready to be flushed
  ** to disk and there are lsm_db.nMerge or fewer age=0 levels, flush the 
  ** data to disk now.  */
  if( (flags & LSM_WORK_FLUSH) ){
    int bOld;
    rc = sortedTreeHasOld(pDb, &bOld);
    if( bOld ){
      if( sortedDbIsFull(pDb) ){
        int nPg = 0;
        rc = sortedWork(pDb, nRem, 0, 1, &nPg);
        nRem -= nPg;
        assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
        bToplevel = 1;
      }

      if( rc==LSM_OK && nRem>0 ){
        int nPg = 0;
        rc = sortedNewToplevel(pDb, 1, &nOvfl, &nPg);
        nRem -= nPg;
        if( rc==LSM_OK && pDb->nTransOpen>0 ){
          lsmTreeDiscardOld(pDb);
        }
        bFlush = 1;
        bToplevel = 0;
      }
    }
  }

  /* If nPage is still greater than zero, do some merging. */
  if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
    int nPg = 0;
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg);
    nRem -= nPg;
    if( nPg ){
      bToplevel = 1;
      nOvfl = 0;
    }
  }

  if( rc==LSM_OK && bToplevel && lsmCheckpointOverflowRequired(pDb) ){
    while( rc==LSM_OK && sortedDbIsFull(pDb) ){
      int nPg = 0;
      rc = sortedWork(pDb, 16, 0, 1, &nPg);
    }
    if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){
      rc = sortedNewToplevel(pDb, 0, &nOvfl, 0);
    }
  }

  if( rc==LSM_OK && (nRem!=nMax) ){
    rc = lsmSortedFlushDb(pDb);
    lsmFinishWork(pDb, bFlush, nOvfl, &rc);
  }else{
    int rcdummy = LSM_BUSY;
    assert( rc!=LSM_OK || bFlush==0 );
    lsmFinishWork(pDb, 0, 0, &rcdummy);
  }
  assert( pDb->pWorker==0 );

  if( rc==LSM_OK ){
    if( pnWrite ) *pnWrite = (nMax - nRem);
    if( pbCkpt ) *pbCkpt = (bCkpt && nRem<=0);
  }else{
    if( pnWrite ) *pnWrite = 0;
    if( pbCkpt ) *pbCkpt = 0;
  }

  return rc;
}

static int doLsmWork(lsm_db *pDb, int flags, int nPage, int *pnWrite){
  int rc;
  int nWrite = 0;
  int bCkpt = 0;

  do {
    int nThis = 0;
    bCkpt = 0;
    rc = doLsmSingleWork(pDb, 0, flags, nPage-nWrite, &nThis, &bCkpt);
    nWrite += nThis;
    if( rc==LSM_OK && bCkpt ){
      rc = lsm_checkpoint(pDb, 0);
    }
  }while( rc==LSM_OK && (nWrite<nPage && bCkpt) );

  if( pnWrite ){
    if( rc==LSM_OK ){
      *pnWrite = nWrite;
    }else{
      *pnWrite = 0;
    }
  }
  return rc;
}

/*
** Perform work to merge database segments together.
*/
int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite){

  /* This function may not be called if pDb has an open read or write
  ** transaction. Return LSM_MISUSE if an application attempts this.  */
  if( pDb->nTransOpen || pDb->pCsr ) return LSM_MISUSE_BKPT;

  return doLsmWork(pDb, flags, nPage, pnWrite);
}

/*
** This function is called in auto-work mode to perform merging work on
** the data structure. It performs enough merging work to prevent the
** height of the tree from growing indefinitely assuming that roughly
** nUnit database pages worth of data have been written to the database
** (i.e. the in-memory tree) since the last call.
................................................................................
*/
int lsmSortedAutoWork(
  lsm_db *pDb,                    /* Database handle */
  int nUnit                       /* Pages of data written to in-memory tree */
){
  int rc;                         /* Return code */
  int nRemaining;                 /* Units of work to do before returning */
  int nDepth = 0;                 /* Current height of tree (longest path) */
  int nWrite;                     /* Pages written */
  Level *pLevel;                  /* Used to iterate through levels */
  int bRestore = 0;


  assert( pDb->pWorker==0 );
  assert( pDb->nTransOpen>0 );

  /* Determine how many units of work to do before returning. One unit of
  ** work is achieved by writing one page (~4KB) of merged data.  */
  nRemaining = 0;
  for(pLevel=lsmDbSnapshotLevel(pDb->pClient); pLevel; pLevel=pLevel->pNext){
    /* nDepth += LSM_MAX(1, pLevel->nRight); */
    nDepth += 1;
  }
  if( lsmTreeHasOld(pDb) ){
    nDepth += 1;
    bRestore = 1;
    rc = lsmSaveCursors(pDb);
    if( rc!=LSM_OK ) return rc;
  }

  nRemaining = nUnit * nDepth;
#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, rc, "lsmSortedAutoWork(): %d*%d = %d pages", 
      nUnit, nDepth, nRemaining);
#endif
  rc = doLsmWork(pDb, LSM_WORK_FLUSH, nRemaining, 0);
  if( rc==LSM_BUSY ) rc = LSM_OK;


  if( bRestore && pDb->pCsr ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
    pDb->pClient = 0;
    rc = lsmCheckpointLoad(pDb, 0);
    if( rc==LSM_OK ){
      rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
    }
    if( rc==LSM_OK ){
      rc = lsmRestoreCursors(pDb);
    }
  }

#if 0
  lsmLogMessage(pDb, 0, "auto-work: %d pages", nWrite);
#endif

  return rc;
}


int lsmFlushTreeToDisk(lsm_db *pDb){



  int rc;










  rc = doLsmSingleWork(pDb, 1, LSM_WORK_FLUSH, (1<<30), 0, 0);
  if( rc==LSM_OK ){

    lsmTreeMakeOld(pDb);


    rc = doLsmSingleWork(pDb, 1, LSM_WORK_FLUSH, (1<<30), 0, 0);
  }





























































  return rc;
}

/*
** Return a string representation of the segment passed as the only argument.
** Space for the returned string is allocated using lsmMalloc(), and should
** be freed by the caller using lsmFree().
................................................................................
          fileToString(pDb->pEnv, zLeft, sizeof(zLeft), 28, aLeft[i]); 
        }
        if( i<nRight ){ 
          fileToString(pDb->pEnv, zRight, sizeof(zRight), 28, aRight[i]); 
        }

        if( i==0 ){
          sqlite4_snprintf(zLevel, sizeof(zLevel), "L%d: (age=%d)", 
              iLevel, pLevel->iAge
          );
        }else{
          zLevel[0] = '\0';
        }

        if( nRight==0 ){
          iPad = 28 - (strlen(zLeft)/2) ;
        }
................................................................................
void lsmSortedSaveTreeCursors(lsm_db *pDb){
  MultiCursor *pCsr;
  for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){
    lsmTreeCursorSave(pCsr->apTreeCsr[0]);
    lsmTreeCursorSave(pCsr->apTreeCsr[1]);
  }
}

#ifdef LSM_DEBUG_EXPENSIVE
static void assertRunInOrder(lsm_db *pDb, Segment *pSeg){
  Page *pPg = 0;
  Blob blob1 = {0, 0, 0, 0};
  Blob blob2 = {0, 0, 0, 0};

  lsmFsDbPageGet(pDb->pFS, pSeg->iFirst, &pPg);
  while( pPg ){
    u8 *aData; int nData;
    Page *pNext;

    aData = lsmFsPageData(pPg, &nData);
    if( 0==(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) ){
      int i;
      int nRec = pageGetNRec(aData, nData);
      for(i=0; i<nRec; i++){
        int iTopic1, iTopic2;
        pageGetKeyCopy(pDb->pEnv, pPg, i, &iTopic1, &blob1);

        if( i==0 && blob2.nData ){
          assert( sortedKeyCompare(
                pDb->xCmp, iTopic2, blob2.pData, blob2.nData,
                iTopic1, blob1.pData, blob1.nData
          )<0 );
        }

        if( i<(nRec-1) ){
          pageGetKeyCopy(pDb->pEnv, pPg, i+1, &iTopic2, &blob2);
          assert( sortedKeyCompare(
                pDb->xCmp, iTopic1, blob1.pData, blob1.nData,
                iTopic2, blob2.pData, blob2.nData
          )<0 );
        }
      }
    }

    lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
    lsmFsPageRelease(pPg);
    pPg = pNext;
  }

  sortedBlobFree(&blob1);
  sortedBlobFree(&blob2);
}
#endif

#ifdef LSM_DEBUG_EXPENSIVE
/*
** This function is only included in the build if LSM_DEBUG_EXPENSIVE is 
** defined. Its only purpose is to evaluate various assert() statements to 
** verify that the database is well formed in certain respects.
**

Changes to src/lsm_tree.c.

959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
  pDb->treehdr.root.iTransId = 1;
  pDb->treehdr.root.iRoot = 0;
  pDb->treehdr.root.nHeight = 0;
  pDb->treehdr.nByte = 0;
  pDb->treehdr.iUsedShmid = pDb->treehdr.iNextShmid-1;
}

void lsmTreeMakeOld(lsm_db *pDb, int *pnFlush){
  if( pDb->treehdr.iOldShmid ){
    *pnFlush = 2;
  }else{
    *pnFlush = 1;
    pDb->treehdr.iOldLog = pDb->treehdr.log.aRegion[2].iEnd;
    pDb->treehdr.oldcksum0 = pDb->treehdr.log.cksum0;
    pDb->treehdr.oldcksum1 = pDb->treehdr.log.cksum1;
    pDb->treehdr.iOldShmid = pDb->treehdr.iNextShmid-1;
    memcpy(&pDb->treehdr.oldroot, &pDb->treehdr.root, sizeof(TreeRoot));

    pDb->treehdr.root.iTransId = 1;







|
|
<
<
<







959
960
961
962
963
964
965
966
967



968
969
970
971
972
973
974
  pDb->treehdr.root.iTransId = 1;
  pDb->treehdr.root.iRoot = 0;
  pDb->treehdr.root.nHeight = 0;
  pDb->treehdr.nByte = 0;
  pDb->treehdr.iUsedShmid = pDb->treehdr.iNextShmid-1;
}

void lsmTreeMakeOld(lsm_db *pDb){
  if( pDb->treehdr.iOldShmid==0 ){



    pDb->treehdr.iOldLog = pDb->treehdr.log.aRegion[2].iEnd;
    pDb->treehdr.oldcksum0 = pDb->treehdr.log.cksum0;
    pDb->treehdr.oldcksum1 = pDb->treehdr.log.cksum1;
    pDb->treehdr.iOldShmid = pDb->treehdr.iNextShmid-1;
    memcpy(&pDb->treehdr.oldroot, &pDb->treehdr.root, sizeof(TreeRoot));

    pDb->treehdr.root.iTransId = 1;

Changes to tool/lsmperf.tcl.

154
155
156
157
158
159
160
161
162
163
164




165
166
167
168
  append script $data2
  append script $data3

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

do_write_test x.png 40 20000 40000 1000 {
  LSM     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
  LevelDB leveldb
}















|
|


>
>
>
>




154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
  append script $data2
  append script $data3

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

do_write_test x.png 60 25000 0 40 {
  lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
  LevelDB leveldb
}
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-st     "mmap=1 multi_proc=0 safety=1 threads=1 autowork=1"
# LevelDB leveldb
# SQLite sqlite3




Changes to www/lsm.wiki.

700
701
702
703
704
705
706
707

708













































  <li> Sync the database file again.

  <li> Update the shared-memory variable to indicate the meta-page written in
       step 5.

  <li> Drop the CHECKPOINTER lock.
</ol>
























































>

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
  <li> Sync the database file again.

  <li> Update the shared-memory variable to indicate the meta-page written in
       step 5.

  <li> Drop the CHECKPOINTER lock.
</ol>

<h1>5. Scheduling Policies</h1>

<p>
When a client writes to a database, the in-memory tree and log file are
updated by the client itself before the lsm_write() call returns. Eventually, 
once sufficient writes have accumulated in memory, the client marks the 
current tree as "old", and subsequent writes are accumulated in a new tree.

<p>
In order to prevent the in-memory tree and log file from growing indefinitely,
at some point in the future the following must occur:

<ul>
  <li>The contents of the old tree must be written into the database file
      (a WORKER lock operation). Once this is done the memory used to store the
      old tree is available for reuse.

  <li>A checkpoint operation must take place to sync the data into the 
      database file and update the database header (a CHECKPOINT lock 
      operation). Once this has been done the log file space that was used 
      to store the data may be reclaimed.
</ul>

<p>
In addition to the above, it is necessary to perform a certain amount of 
work on the database to merge existing levels together. This is not just
to speed up queries - there is a hard limit of roughly 40 levels to stop
database snapshots from growing overly large.

<p><b> Explicit Calls to lsm_work() and lsm_checkpoint() </b>

<p><b> Compulsory work </b>

<ul>
  <li><p> If a writer tries to mark a tree as "old", but there is already an
       old tree in-memory, the writer attempts to grab the WORKER lock and
       write both the old and new tree to a new database level.

      <p> If the WORKER lock cannot be obtained immediately, block until it
       can be
</ul>

<p><b> Auto work </b>