SQLite4
Check-in [64895935bc]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Reuse existing lsm_cursor objects instead of always allocating new ones.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 64895935bc469d0971d39fa12098aa38b2af3233
User & Date: dan 2013-02-28 19:09:47
Context
2013-03-01
19:06
Add sqltest.c. Containing tests used to compare the performance of different sqlite versions. check-in: c9a4437853 user: dan tags: trunk
2013-02-28
19:09
Reuse existing lsm_cursor objects instead of always allocating new ones. check-in: 64895935bc user: dan tags: trunk
15:56
Minor lsm optimizations. check-in: a02dacd5bd user: dan tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/lsmInt.h.

302
303
304
305
306
307
308












309
310
311
312
313
314
315
...
342
343
344
345
346
347
348


349
350
351
352
353
354
355
...
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799

800
801
802
803
804
805
806
**   lock on lock region iLock, then the following is true:
**
**       (mLock & ((iLock+32-1) << 1))
**
**   Or for an EXCLUSIVE lock:
**
**       (mLock & ((iLock-1) << 1))












*/
struct lsm_db {

  /* Database handle configuration */
  lsm_env *pEnv;                            /* runtime environment */
  int (*xCmp)(void *, int, void *, int);    /* Compare function */

................................................................................
  MultiCursor *pCsr;              /* List of all open cursors */
  LogWriter *pLogWriter;          /* Context for writing to the log file */
  int nTransOpen;                 /* Number of opened write transactions */
  int nTransAlloc;                /* Allocated size of aTrans[] array */
  TransMark *aTrans;              /* Array of marks for transaction rollback */
  IntArray rollback;              /* List of tree-nodes to roll back */
  int bDiscardOld;                /* True if lsmTreeDiscardOld() was called */



  /* Worker context */
  Snapshot *pWorker;              /* Worker snapshot (or NULL) */
  Freelist *pFreelist;            /* See sortedNewToplevel() */
  int bUseFreelist;               /* True to use pFreelist */
  int bIncrMerge;                 /* True if currently doing a merge */

................................................................................
int lsmSortedLoadFreelist(lsm_db *pDb, void **, int *);

void *lsmSortedSplitKey(Level *pLevel, int *pnByte);

void lsmSortedSaveTreeCursors(lsm_db *);

int lsmMCursorNew(lsm_db *, MultiCursor **);
void lsmMCursorClose(MultiCursor *);
int lsmMCursorSeek(MultiCursor *, int, void *, int , int);
int lsmMCursorFirst(MultiCursor *);
int lsmMCursorPrev(MultiCursor *);
int lsmMCursorLast(MultiCursor *);
int lsmMCursorValid(MultiCursor *);
int lsmMCursorNext(MultiCursor *);
int lsmMCursorKey(MultiCursor *, void **, int *);
int lsmMCursorValue(MultiCursor *, void **, int *);
int lsmMCursorType(MultiCursor *, int *);
lsm_db *lsmMCursorDb(MultiCursor *);


int lsmSaveCursors(lsm_db *pDb);
int lsmRestoreCursors(lsm_db *pDb);

void lsmSortedDumpStructure(lsm_db *pDb, Snapshot *, int, int, const char *);
void lsmFsDumpBlocklists(lsm_db *);








>
>
>
>
>
>
>
>
>
>
>
>







 







>
>







 







|










>







302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
...
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
...
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
**   lock on lock region iLock, then the following is true:
**
**       (mLock & ((iLock+32-1) << 1))
**
**   Or for an EXCLUSIVE lock:
**
**       (mLock & ((iLock-1) << 1))
** 
** pCsr:
**   Points to the head of a linked list that contains all currently open
**   cursors. Once this list becomes empty, the user has no outstanding
**   cursors and the database handle can be successfully closed.
**
** pCsrCache:
**   This list contains cursor objects that have been closed using
**   lsm_csr_close(). Each time a cursor is closed, it is shifted from 
**   the pCsr list to this list. When a new cursor is opened, this list
**   is inspected to see if there exists a cursor object that can be
**   reused. This is an optimization only.
*/
struct lsm_db {

  /* Database handle configuration */
  lsm_env *pEnv;                            /* runtime environment */
  int (*xCmp)(void *, int, void *, int);    /* Compare function */

................................................................................
  MultiCursor *pCsr;              /* List of all open cursors */
  LogWriter *pLogWriter;          /* Context for writing to the log file */
  int nTransOpen;                 /* Number of opened write transactions */
  int nTransAlloc;                /* Allocated size of aTrans[] array */
  TransMark *aTrans;              /* Array of marks for transaction rollback */
  IntArray rollback;              /* List of tree-nodes to roll back */
  int bDiscardOld;                /* True if lsmTreeDiscardOld() was called */

  MultiCursor *pCsrCache;         /* List of all closed cursors */

  /* Worker context */
  Snapshot *pWorker;              /* Worker snapshot (or NULL) */
  Freelist *pFreelist;            /* See sortedNewToplevel() */
  int bUseFreelist;               /* True to use pFreelist */
  int bIncrMerge;                 /* True if currently doing a merge */

................................................................................
int lsmSortedLoadFreelist(lsm_db *pDb, void **, int *);

void *lsmSortedSplitKey(Level *pLevel, int *pnByte);

void lsmSortedSaveTreeCursors(lsm_db *);

/* Obtain a cursor for the database handle; may recycle one from the
** lsm_db.pCsrCache list. */
int lsmMCursorNew(lsm_db *, MultiCursor **);
/* Close a cursor. If the second argument is true the cursor is moved to
** the lsm_db.pCsrCache list for possible reuse instead of being freed. */
void lsmMCursorClose(MultiCursor *, int);
int lsmMCursorSeek(MultiCursor *, int, void *, int , int);
int lsmMCursorFirst(MultiCursor *);
int lsmMCursorPrev(MultiCursor *);
int lsmMCursorLast(MultiCursor *);
int lsmMCursorValid(MultiCursor *);
int lsmMCursorNext(MultiCursor *);
int lsmMCursorKey(MultiCursor *, void **, int *);
int lsmMCursorValue(MultiCursor *, void **, int *);
int lsmMCursorType(MultiCursor *, int *);
lsm_db *lsmMCursorDb(MultiCursor *);
/* Free every cursor currently held on the lsm_db.pCsrCache list. */
void lsmMCursorFreeCache(lsm_db *);

int lsmSaveCursors(lsm_db *pDb);
int lsmRestoreCursors(lsm_db *pDb);

void lsmSortedDumpStructure(lsm_db *pDb, Snapshot *, int, int, const char *);
void lsmFsDumpBlocklists(lsm_db *);

Changes to src/lsm_main.c.

213
214
215
216
217
218
219

220
221
222
223
224
225
226
...
765
766
767
768
769
770
771

772

773
774
775
776
777
778
779
780
781
782
783
784
785
...
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
int lsm_close(lsm_db *pDb){
  int rc = LSM_OK;
  if( pDb ){
    assert_db_state(pDb);
    if( pDb->pCsr || pDb->nTransOpen ){
      rc = LSM_MISUSE_BKPT;
    }else{

      lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
      pDb->pClient = 0;

      assertRwclientLockValue(pDb);

      lsmDbDatabaseRelease(pDb);
      lsmLogClose(pDb);
................................................................................
    assert( pDb->bReadonly );
    rc = lsmBeginRoTrans(pDb);
  }else if( pDb->iReader<0 ){
    rc = lsmBeginReadTrans(pDb);
  }

  /* Allocate the multi-cursor. */

  if( rc==LSM_OK ) rc = lsmMCursorNew(pDb, &pCsr);


  /* If an error has occurred, set the output to NULL and delete any partially
  ** allocated cursor. If this means there are no open cursors, release the
  ** client snapshot.  */
  if( rc!=LSM_OK ){
    lsmMCursorClose(pCsr);
    dbReleaseClientSnapshot(pDb);
  }

  assert_db_state(pDb);
  *ppCsr = (lsm_cursor *)pCsr;
  return rc;
}
................................................................................
/*
** Close a cursor opened using lsm_csr_open().
**
** Passing a NULL cursor is a harmless no-op. This function always
** returns LSM_OK.
*/
int lsm_csr_close(lsm_cursor *p){
  MultiCursor *pMulti = (MultiCursor *)p;
  lsm_db *pDb;

  /* Nothing to do for a NULL handle. */
  if( pMulti==0 ) return LSM_OK;

  /* Look up the owning database handle before the cursor is destroyed;
  ** it is still needed afterwards. */
  pDb = lsmMCursorDb(pMulti);
  assert_db_state(pDb);

  lsmMCursorClose(pMulti);

  /* NOTE(review): presumably this drops the client snapshot only when no
  ** open cursors remain - confirm against dbReleaseClientSnapshot(). */
  dbReleaseClientSnapshot(pDb);
  assert_db_state(pDb);

  return LSM_OK;
}

/*







>







 







>
|
>





|







 







|







213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
...
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
...
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
int lsm_close(lsm_db *pDb){
  int rc = LSM_OK;
  if( pDb ){
    assert_db_state(pDb);
    if( pDb->pCsr || pDb->nTransOpen ){
      rc = LSM_MISUSE_BKPT;
    }else{
      lsmMCursorFreeCache(pDb);
      lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
      pDb->pClient = 0;

      assertRwclientLockValue(pDb);

      lsmDbDatabaseRelease(pDb);
      lsmLogClose(pDb);
................................................................................
    assert( pDb->bReadonly );
    rc = lsmBeginRoTrans(pDb);
  }else if( pDb->iReader<0 ){
    rc = lsmBeginReadTrans(pDb);
  }

  /* Allocate the multi-cursor. */
  if( rc==LSM_OK ){
    rc = lsmMCursorNew(pDb, &pCsr);
  }

  /* If an error has occurred, set the output to NULL and delete any partially
  ** allocated cursor. If this means there are no open cursors, release the
  ** client snapshot.  */
  if( rc!=LSM_OK ){
    lsmMCursorClose(pCsr, 0);
    dbReleaseClientSnapshot(pDb);
  }

  assert_db_state(pDb);
  *ppCsr = (lsm_cursor *)pCsr;
  return rc;
}
................................................................................
/*
** Close a cursor opened using lsm_csr_open().
**
** Passing a NULL cursor is a harmless no-op. This function always
** returns LSM_OK.
*/
int lsm_csr_close(lsm_cursor *p){
  MultiCursor *pMulti = (MultiCursor *)p;
  lsm_db *pDb;

  /* Nothing to do for a NULL handle. */
  if( pMulti==0 ) return LSM_OK;

  /* Look up the owning database handle first; it is still needed after
  ** the cursor has been closed. */
  pDb = lsmMCursorDb(pMulti);
  assert_db_state(pDb);

  /* Second argument is the "cache" flag: user cursors are closed with it
  ** set to 1 rather than 0. */
  lsmMCursorClose(pMulti, 1);

  /* NOTE(review): presumably this drops the client snapshot only when no
  ** open cursors remain - confirm against dbReleaseClientSnapshot(). */
  dbReleaseClientSnapshot(pDb);
  assert_db_state(pDb);

  return LSM_OK;
}

/*

Changes to src/lsm_shared.c.

1147
1148
1149
1150
1151
1152
1153

1154
1155
1156
1157
1158
1159
1160
    rc = lsmTreeLoadHeader(pDb, &iTreehdr);

    /* Load the database snapshot */
    if( rc==LSM_OK ){
      if( lsmCheckpointClientCacheOk(pDb)==0 ){
        lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
        pDb->pClient = 0;

        rc = lsmCheckpointLoad(pDb, &iSnap);
      }else{
        iSnap = 1;
      }
    }

    /* Take a read-lock on the tree and snapshot just loaded. Then check







>







1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
    rc = lsmTreeLoadHeader(pDb, &iTreehdr);

    /* Load the database snapshot */
    if( rc==LSM_OK ){
      if( lsmCheckpointClientCacheOk(pDb)==0 ){
        lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
        pDb->pClient = 0;
        lsmMCursorFreeCache(pDb);
        rc = lsmCheckpointLoad(pDb, &iSnap);
      }else{
        iSnap = 1;
      }
    }

    /* Take a read-lock on the tree and snapshot just loaded. Then check

Changes to src/lsm_sorted.c.

2241
2242
2243
2244
2245
2246
2247
















2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261



















2262
2263
2264
2265
2266
2267
2268
2269
2270

2271
2272
2273
2274
2275
2276
2277
....
2422
2423
2424
2425
2426
2427
2428



2429
2430
2431
2432
2433
2434
2435
2436





















2437
2438
2439

2440
2441
2442
2443
2444
2445
2446
2447
2448
....
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
....
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
....
4031
4032
4033
4034
4035
4036
4037
4038
4039
4040
4041
4042
4043
4044
4045
....
4285
4286
4287
4288
4289
4290
4291
4292
4293
4294
4295
4296
4297
4298
4299
....
5335
5336
5337
5338
5339
5340
5341

5342
5343
5344
5345
5346
5347
5348
  pCsr->aTree = 0;
  pCsr->pSystemVal = 0;
  pCsr->apTreeCsr[0] = 0;
  pCsr->apTreeCsr[1] = 0;
  pCsr->pBtCsr = 0;
}

















/*
** Close and free the cursor passed as the only argument. Passing NULL is
** a no-op.
*/
void lsmMCursorClose(MultiCursor *pCsr){
  lsm_db *pDb;                    /* Database handle that owns pCsr */
  MultiCursor **ppLink;           /* Walks the links of the lsm_db.pCsr list */

  if( pCsr==0 ) return;
  pDb = pCsr->pDb;

  /* The cursor may or may not be linked into the list headed by
  ** lsm_db.pCsr. Search for the link that points at it and, if one is
  ** found, splice the cursor out.  */
  ppLink = &pDb->pCsr;
  while( *ppLink && *ppLink!=pCsr ){
    ppLink = &(*ppLink)->pNext;
  }
  if( *ppLink ){
    *ppLink = pCsr->pNext;
  }

  /* Release the buffers used to cache the current key and value. */
  sortedBlobFree(&pCsr->key);
  sortedBlobFree(&pCsr->val);

  /* Release the component cursors, then the cursor structure itself. */
  mcursorFreeComponents(pCsr);
  lsmFree(pDb->pEnv, pCsr);
}

#define TREE_NONE 0
#define TREE_OLD  1
#define TREE_BOTH 2

................................................................................
  pCsr->flags |= CURSOR_FLUSH_FREELIST;
  pCsr->pSystemVal = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc);
  return rc;
}

/*
** Allocate and return a new database cursor.
**
** On success LSM_OK is returned and *ppCsr is set to point to the new
** cursor. Otherwise an error code is returned and *ppCsr is set to NULL.
*/
int lsmMCursorNew(
  lsm_db *pDb,                    /* Database handle */
  MultiCursor **ppCsr             /* OUT: Allocated cursor */
){
  int rc = LSM_OK;
  MultiCursor *pNew;

  /* Create the cursor object, then initialize it against the client
  ** snapshot held by the database handle. */
  pNew = multiCursorNew(pDb, &rc);
  if( rc==LSM_OK ){
    rc = multiCursorInit(pNew, pDb->pClient);
  }

  /* On any failure discard whatever was partially built. */
  if( rc!=LSM_OK ){
    lsmMCursorClose(pNew);
    pNew = 0;
  }

  assert( (rc==LSM_OK)==(pNew!=0) );
  *ppCsr = pNew;
  return rc;
}

................................................................................
        iSnap = (i64)lsmGetU64((u8 *)pVal);
        if( x(pCtx, iBlk, iSnap) ) break;
        rc = multiCursorAdvance(pCsr, !bReverse);
      }
    }
  }

  lsmMCursorClose(pCsr);
  if( pSnap!=pDb->pWorker ){
    lsmFreeSnapshot(pDb->pEnv, pSnap);
  }

  return rc;
}

................................................................................
        if( *ppVal ){
          memcpy(*ppVal, pVal, nVal);
          *pnVal = nVal;
        }
      }
    }

    lsmMCursorClose(pCsr);
  }

  return rc;
}

static int multiCursorAllocTree(MultiCursor *pCsr){
  int rc = LSM_OK;
................................................................................
    }else{
      btreeCursorSplitkey(pCsr->pBtCsr, &pMerge->splitkey);
    }
    
    pMerge->iOutputOff = -1;
  }

  lsmMCursorClose(pCsr);

  /* Persist and release the output page. */
  if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerAddPadding(pMW);
  lsmFsFlushWaiting(pMW->pDb->pFS, &rc);
................................................................................
    ** markers present in the in-memory tree.  */
    if( pNext==0 ){
      multiCursorIgnoreDelete(pCsr);
    }
  }

  if( rc!=LSM_OK ){
    lsmMCursorClose(pCsr);
  }else{
    Pgno iLeftPtr = 0;
    Merge merge;                  /* Merge object used to create new level */
    MergeWorker mergeworker;      /* MergeWorker object for the same purpose */

    memset(&merge, 0, sizeof(Merge));
    memset(&mergeworker, 0, sizeof(MergeWorker));
................................................................................
        nUnit, nDepth, nRemaining);
#endif
    assert( nRemaining>=0 );
    rc = doLsmWork(pDb, pDb->nMerge, nRemaining, 0);
    if( rc==LSM_BUSY ) rc = LSM_OK;

    if( bRestore && pDb->pCsr ){

      lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
      pDb->pClient = 0;
      rc = lsmCheckpointLoad(pDb, 0);
      if( rc==LSM_OK ){
        rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
      }
      if( rc==LSM_OK ){







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|













>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
|

|
|

|
|
>







 







>
>
>








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
|
>

|







 







|







 







|







 







|







 







|







 







>







2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
....
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
....
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
....
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
....
4092
4093
4094
4095
4096
4097
4098
4099
4100
4101
4102
4103
4104
4105
4106
....
4346
4347
4348
4349
4350
4351
4352
4353
4354
4355
4356
4357
4358
4359
4360
....
5396
5397
5398
5399
5400
5401
5402
5403
5404
5405
5406
5407
5408
5409
5410
  pCsr->aTree = 0;
  pCsr->pSystemVal = 0;
  pCsr->apTreeCsr[0] = 0;
  pCsr->apTreeCsr[1] = 0;
  pCsr->pBtCsr = 0;
}

/*
** Free every cursor on the lsm_db.pCsrCache list and leave the list
** empty.
*/
void lsmMCursorFreeCache(lsm_db *pDb){
  MultiCursor *pCached = pDb->pCsrCache;

  while( pCached ){
    /* Read the link before the cursor is freed by lsmMCursorClose(). */
    MultiCursor *pFollowing = pCached->pNext;
    lsmMCursorClose(pCached, 0);
    pCached = pFollowing;
  }
  pDb->pCsrCache = 0;
}

/*
** Close the cursor passed as the first argument. Passing NULL is a no-op.
**
** In all cases the cursor is first removed from the list of open cursors
** (lsm_db.pCsr), if it is part of it. Then:
**
**   * If bCache is false, the cursor and everything it owns is freed.
**
**   * If bCache is true, the cursor is reset and pushed onto the
**     lsm_db.pCsrCache list for possible reuse instead of being deleted.
*/
void lsmMCursorClose(MultiCursor *pCsr, int bCache){
  lsm_db *pDb;                    /* Database handle that owns pCsr */
  MultiCursor **ppLink;           /* Walks the links of the lsm_db.pCsr list */
  int iPtr;                       /* Used to iterate through segment-pointers */

  if( pCsr==0 ) return;
  pDb = pCsr->pDb;

  /* The cursor may or may not be linked into the list headed by
  ** lsm_db.pCsr. Search for the link that points at it and, if one is
  ** found, splice the cursor out.  */
  ppLink = &pDb->pCsr;
  while( *ppLink && *ppLink!=pCsr ){
    ppLink = &(*ppLink)->pNext;
  }
  if( *ppLink ){
    *ppLink = pCsr->pNext;
  }

  if( !bCache ){
    /* Real deletion: release the cached key and value buffers, the
    ** component cursors and finally the cursor structure itself. */
    sortedBlobFree(&pCsr->key);
    sortedBlobFree(&pCsr->val);
    mcursorFreeComponents(pCsr);
    lsmFree(pDb->pEnv, pCsr);
    return;
  }

  /* Caching the cursor. Release any page references it holds. */
  assert( !pCsr->pBtCsr );
  for(iPtr=0; iPtr<pCsr->nPtr; iPtr++){
    lsmFsPageRelease(pCsr->aPtr[iPtr].pPg);
    pCsr->aPtr[iPtr].pPg = 0;
  }

  /* Reset both tree cursors. */
  lsmTreeCursorReset(pCsr->apTreeCsr[0]);
  lsmTreeCursorReset(pCsr->apTreeCsr[1]);

  /* Push the cursor onto the front of the pCsrCache list. */
  pCsr->pNext = pDb->pCsrCache;
  pDb->pCsrCache = pCsr;
}

#define TREE_NONE 0
#define TREE_OLD  1
#define TREE_BOTH 2

................................................................................
  pCsr->flags |= CURSOR_FLUSH_FREELIST;
  pCsr->pSystemVal = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc);
  return rc;
}

/*
** Allocate and return a new database cursor.
**
** This function should only be called to allocate user cursors, because
** it may recycle a cursor from lsm_db.pCsrCache instead of building a
** new one.
**
** On success LSM_OK is returned and *ppCsr is set to point to the cursor.
** Otherwise an error code is returned and *ppCsr is set to NULL.
*/
int lsmMCursorNew(
  lsm_db *pDb,                    /* Database handle */
  MultiCursor **ppCsr             /* OUT: Allocated cursor */
){
  MultiCursor *pCsr = pDb->pCsrCache;
  int rc = LSM_OK;

  if( pCsr==0 ){
    /* Nothing available for reuse. Build a cursor from scratch. */
    pCsr = multiCursorNew(pDb, &rc);
    if( rc==LSM_OK ){
      rc = multiCursorInit(pCsr, pDb->pClient);
    }
  }else{
    int bNeedOld;                 /* True if an old-tree cursor is required */

    /* Pop the cursor off the pCsrCache list and push it onto the list of
    ** open cursors. */
    pDb->pCsrCache = pCsr->pNext;
    pCsr->pNext = pDb->pCsr;
    pDb->pCsr = pCsr;

    /* The recycled cursor is nearly ready for use. The one thing that may
    ** be out of date is the cursor on the old in-memory tree: it may exist
    ** when no longer required, or be required but missing. */
    bNeedOld = (
        lsmTreeHasOld(pDb) && pDb->treehdr.iOldLog!=pDb->pClient->iLogOff
    );
    if( bNeedOld ){
      if( pCsr->apTreeCsr[1]==0 ){
        rc = lsmTreeCursorNew(pDb, 1, &pCsr->apTreeCsr[1]);
      }
    }else if( pCsr->apTreeCsr[1] ){
      lsmTreeCursorDestroy(pCsr->apTreeCsr[1]);
      pCsr->apTreeCsr[1] = 0;
    }
  }

  /* On any failure destroy the cursor outright (it is never returned to
  ** the cache) and report NULL to the caller. */
  if( rc!=LSM_OK ){
    lsmMCursorClose(pCsr, 0);
    pCsr = 0;
  }

  assert( (rc==LSM_OK)==(pCsr!=0) );
  *ppCsr = pCsr;
  return rc;
}

................................................................................
        iSnap = (i64)lsmGetU64((u8 *)pVal);
        if( x(pCtx, iBlk, iSnap) ) break;
        rc = multiCursorAdvance(pCsr, !bReverse);
      }
    }
  }

  lsmMCursorClose(pCsr, 0);
  if( pSnap!=pDb->pWorker ){
    lsmFreeSnapshot(pDb->pEnv, pSnap);
  }

  return rc;
}

................................................................................
        if( *ppVal ){
          memcpy(*ppVal, pVal, nVal);
          *pnVal = nVal;
        }
      }
    }

    lsmMCursorClose(pCsr, 0);
  }

  return rc;
}

static int multiCursorAllocTree(MultiCursor *pCsr){
  int rc = LSM_OK;
................................................................................
    }else{
      btreeCursorSplitkey(pCsr->pBtCsr, &pMerge->splitkey);
    }
    
    pMerge->iOutputOff = -1;
  }

  lsmMCursorClose(pCsr, 0);

  /* Persist and release the output page. */
  if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerAddPadding(pMW);
  lsmFsFlushWaiting(pMW->pDb->pFS, &rc);
................................................................................
    ** markers present in the in-memory tree.  */
    if( pNext==0 ){
      multiCursorIgnoreDelete(pCsr);
    }
  }

  if( rc!=LSM_OK ){
    lsmMCursorClose(pCsr, 0);
  }else{
    Pgno iLeftPtr = 0;
    Merge merge;                  /* Merge object used to create new level */
    MergeWorker mergeworker;      /* MergeWorker object for the same purpose */

    memset(&merge, 0, sizeof(Merge));
    memset(&mergeworker, 0, sizeof(MergeWorker));
................................................................................
        nUnit, nDepth, nRemaining);
#endif
    assert( nRemaining>=0 );
    rc = doLsmWork(pDb, pDb->nMerge, nRemaining, 0);
    if( rc==LSM_BUSY ) rc = LSM_OK;

    if( bRestore && pDb->pCsr ){
      lsmMCursorFreeCache(pDb);
      lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
      pDb->pClient = 0;
      rc = lsmCheckpointLoad(pDb, 0);
      if( rc==LSM_OK ){
        rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
      }
      if( rc==LSM_OK ){