SQLite4
Check-in [2351f01937]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Truncate away any free blocks at the end of the database file when the system is shutdown (last connection disconnects).
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 2351f01937462a6749453aba4ce25685a5a1155c
User & Date: dan 2013-01-17 12:22:48
Context
2013-01-17
19:13
Fix an lsm bug causing it to choose the wrong block to reuse. check-in: 2ff461b422 user: dan tags: trunk
12:22
Truncate away any free blocks at the end of the database file when the system is shutdown (last connection disconnects). check-in: 2351f01937 user: dan tags: trunk
2013-01-15
17:39
Fix a typo in lsmusr.wiki. check-in: ace47a5829 user: dan tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/lsmInt.h.

522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
...
678
679
680
681
682
683
684

685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
...
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
...
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
...
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
  u32 nWrite;                     /* Total number of pages written to disk */
};
#define LSM_INITIAL_SNAPSHOT_ID 11

/*
** Functions from file "lsm_ckpt.c".
*/
int lsmCheckpointWrite(lsm_db *, u32 *);
int lsmCheckpointLevels(lsm_db *, int, void **, int *);
int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);

int lsmCheckpointRecover(lsm_db *);
int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **);

int lsmCheckpointLoadWorker(lsm_db *pDb);
................................................................................
int lsmFsPageWritable(Page *);

/* Functions to read, write and sync the log file. */
int lsmFsWriteLog(FileSystem *pFS, i64 iOff, LsmString *pStr);
int lsmFsSyncLog(FileSystem *pFS);
int lsmFsReadLog(FileSystem *pFS, i64 iOff, int nRead, LsmString *pStr);
int lsmFsTruncateLog(FileSystem *pFS, i64 nByte);

int lsmFsCloseAndDeleteLog(FileSystem *pFS);

void lsmFsDeferClose(FileSystem *pFS, LsmFile **pp);

/* And to sync the db file */
int lsmFsSyncDb(FileSystem *, int);

void lsmFsFlushWaiting(FileSystem *, int *);

/* Used by lsm_info(ARRAY_STRUCTURE) and lsm_config(MMAP) */
int lsmInfoArrayStructure(lsm_db *pDb, Pgno iFirst, char **pzOut);
int lsmInfoArrayPages(lsm_db *pDb, Pgno iFirst, char **pzOut);
int lsmConfigMmap(lsm_db *pDb, int *piParam);

int lsmEnvOpen(lsm_env *, const char *, lsm_file **);
int lsmEnvClose(lsm_env *pEnv, lsm_file *pFile);
int lsmEnvLock(lsm_env *pEnv, lsm_file *pFile, int iLock, int eLock);

................................................................................
/* 
** Functions from file "lsm_sorted.c".
*/
int lsmInfoPageDump(lsm_db *, Pgno, int, char **);
void lsmSortedCleanup(lsm_db *);
int lsmSortedAutoWork(lsm_db *, int nUnit);

int lsmSortedWalkFreelist(lsm_db *, int (*)(void *, int, i64), void *);

int lsmSaveWorker(lsm_db *, int);

int lsmFlushTreeToDisk(lsm_db *pDb);

void lsmSortedRemap(lsm_db *pDb);

................................................................................

void *lsmSortedSplitKey(Level *pLevel, int *pnByte);

void lsmSortedSaveTreeCursors(lsm_db *);

int lsmMCursorNew(lsm_db *, MultiCursor **);
void lsmMCursorClose(MultiCursor *);
int lsmMCursorSeek(MultiCursor *, void *, int , int);
int lsmMCursorFirst(MultiCursor *);
int lsmMCursorPrev(MultiCursor *);
int lsmMCursorLast(MultiCursor *);
int lsmMCursorValid(MultiCursor *);
int lsmMCursorNext(MultiCursor *);
int lsmMCursorKey(MultiCursor *, void **, int *);
int lsmMCursorValue(MultiCursor *, void **, int *);
................................................................................
int lsmTreeInUse(lsm_db *db, u32 iLsmId, int *pbInUse);
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId);

int lsmDbMultiProc(lsm_db *);
void lsmDbDeferredClose(lsm_db *, lsm_file *, LsmFile *);
LsmFile *lsmDbRecycleFd(lsm_db *);

int lsmWalkFreelist(lsm_db *, int (*)(void *, int, i64), void *);


/**************************************************************************
** functions in lsm_str.c
*/
void lsmStringInit(LsmString*, lsm_env *pEnv);
int lsmStringExtend(LsmString*, int);







|







 







>










|







 







|







 







|







 







|







522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
...
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
...
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
...
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
...
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
  u32 nWrite;                     /* Total number of pages written to disk */
};
#define LSM_INITIAL_SNAPSHOT_ID 11

/*
** Functions from file "lsm_ckpt.c".
*/
int lsmCheckpointWrite(lsm_db *, int, u32 *);
int lsmCheckpointLevels(lsm_db *, int, void **, int *);
int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);

int lsmCheckpointRecover(lsm_db *);
int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **);

int lsmCheckpointLoadWorker(lsm_db *pDb);
................................................................................
int lsmFsPageWritable(Page *);

/* Functions to read, write and sync the log file. */
int lsmFsWriteLog(FileSystem *pFS, i64 iOff, LsmString *pStr);
int lsmFsSyncLog(FileSystem *pFS);
int lsmFsReadLog(FileSystem *pFS, i64 iOff, int nRead, LsmString *pStr);
int lsmFsTruncateLog(FileSystem *pFS, i64 nByte);
int lsmFsTruncateDb(FileSystem *pFS, i64 nByte);
int lsmFsCloseAndDeleteLog(FileSystem *pFS);

void lsmFsDeferClose(FileSystem *pFS, LsmFile **pp);

/* And to sync the db file */
int lsmFsSyncDb(FileSystem *, int);

void lsmFsFlushWaiting(FileSystem *, int *);

/* Used by lsm_info(ARRAY_STRUCTURE) and lsm_config(MMAP) */
int lsmInfoArrayStructure(lsm_db *pDb, int bBlock, Pgno iFirst, char **pzOut);
int lsmInfoArrayPages(lsm_db *pDb, Pgno iFirst, char **pzOut);
int lsmConfigMmap(lsm_db *pDb, int *piParam);

int lsmEnvOpen(lsm_env *, const char *, lsm_file **);
int lsmEnvClose(lsm_env *pEnv, lsm_file *pFile);
int lsmEnvLock(lsm_env *pEnv, lsm_file *pFile, int iLock, int eLock);

................................................................................
/* 
** Functions from file "lsm_sorted.c".
*/
int lsmInfoPageDump(lsm_db *, Pgno, int, char **);
void lsmSortedCleanup(lsm_db *);
int lsmSortedAutoWork(lsm_db *, int nUnit);

int lsmSortedWalkFreelist(lsm_db *, int, int (*)(void *, int, i64), void *);

int lsmSaveWorker(lsm_db *, int);

int lsmFlushTreeToDisk(lsm_db *pDb);

void lsmSortedRemap(lsm_db *pDb);

................................................................................

void *lsmSortedSplitKey(Level *pLevel, int *pnByte);

void lsmSortedSaveTreeCursors(lsm_db *);

int lsmMCursorNew(lsm_db *, MultiCursor **);
void lsmMCursorClose(MultiCursor *);
int lsmMCursorSeek(MultiCursor *, int, void *, int , int);
int lsmMCursorFirst(MultiCursor *);
int lsmMCursorPrev(MultiCursor *);
int lsmMCursorLast(MultiCursor *);
int lsmMCursorValid(MultiCursor *);
int lsmMCursorNext(MultiCursor *);
int lsmMCursorKey(MultiCursor *, void **, int *);
int lsmMCursorValue(MultiCursor *, void **, int *);
................................................................................
int lsmTreeInUse(lsm_db *db, u32 iLsmId, int *pbInUse);
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId);

int lsmDbMultiProc(lsm_db *);
void lsmDbDeferredClose(lsm_db *, lsm_file *, LsmFile *);
LsmFile *lsmDbRecycleFd(lsm_db *);

int lsmWalkFreelist(lsm_db *, int, int (*)(void *, int, i64), void *);


/**************************************************************************
** functions in lsm_str.c
*/
void lsmStringInit(LsmString*, lsm_env *pEnv);
int lsmStringExtend(LsmString*, int);

Changes to src/lsm_file.c.

412
413
414
415
416
417
418








419
420
421
422
423
424
425
....
2265
2266
2267
2268
2269
2270
2271
2272





2273
2274
2275
2276
2277
2278
2279
....
2299
2300
2301
2302
2303
2304
2305







2306
2307
2308
2309
2310
2311
2312

2313
2314
2315
2316
2317
2318
2319
....
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
/*
** Truncate the log file to nByte bytes in size.
*/
int lsmFsTruncateLog(FileSystem *pFS, i64 nByte){
  if( pFS->fdLog==0 ) return LSM_OK;
  return lsmEnvTruncate(pFS->pEnv, pFS->fdLog, nByte);
}









/*
** Close the log file. Then delete it from the file-system. This function
** is called during database shutdown only.
*/
int lsmFsCloseAndDeleteLog(FileSystem *pFS){
  char *zDel;
................................................................................
** This function implements the lsm_info(LSM_INFO_ARRAY_STRUCTURE) request.
** If successful, *pzOut is set to point to a nul-terminated string 
** containing the array structure and LSM_OK is returned. The caller should
** eventually free the string using lsmFree().
**
** If an error occurs, *pzOut is set to NULL and an LSM error code returned.
*/
int lsmInfoArrayStructure(lsm_db *pDb, Pgno iFirst, char **pzOut){





  int rc = LSM_OK;
  Snapshot *pWorker;              /* Worker snapshot */
  Segment *pArray = 0;            /* Array to report on */
  int bUnlock = 0;

  *pzOut = 0;
  if( iFirst==0 ) return LSM_ERROR;
................................................................................
    int iBlk;
    int iLastBlk;
   
    iBlk = fsPageToBlock(pFS, pArray->iFirst);
    iLastBlk = fsPageToBlock(pFS, pArray->iLastPg);

    lsmStringInit(&str, pDb->pEnv);







    lsmStringAppendf(&str, "%d", pArray->iFirst);
    while( iBlk!=iLastBlk ){
      lsmStringAppendf(&str, " %d", fsLastPageOnBlock(pFS, iBlk));
      fsBlockNext(pFS, iBlk, &iBlk);
      lsmStringAppendf(&str, " %d", fsFirstPageOnBlock(pFS, iBlk));
    }
    lsmStringAppendf(&str, " %d", pArray->iLastPg);


    *pzOut = str.z;
  }

  if( bUnlock ){
    int rcwork = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcwork);
................................................................................
      checkBlocks(pFS, &pLevel->aRhs[i], 0, nBlock, aUsed);
    }
  }

  /* Mark all blocks in the free-list as used */
  ctx.aUsed = aUsed;
  ctx.nBlock = nBlock;
  rc = lsmWalkFreelist(pDb, checkFreelistCb, (void *)&ctx);

  if( rc==LSM_OK ){
    for(i=0; i<nBlock; i++) assert( aUsed[i]!=0 );
  }

  lsmFree(pDb->pEnv, aUsed);
  lsmFree(pDb->pEnv, freelist.aEntry);







>
>
>
>
>
>
>
>







 







|
>
>
>
>
>







 







>
>
>
>
>
>
>
|
|
|
|
|
|
|
>







 







|







412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
....
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
....
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
....
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
/*
** Truncate the log file to nByte bytes in size.
*/
int lsmFsTruncateLog(FileSystem *pFS, i64 nByte){
  if( pFS->fdLog==0 ) return LSM_OK;
  return lsmEnvTruncate(pFS->pEnv, pFS->fdLog, nByte);
}

/*
** Truncate the log file to nByte bytes in size.
*/
int lsmFsTruncateDb(FileSystem *pFS, i64 nByte){
  if( pFS->fdDb==0 ) return LSM_OK;
  return lsmEnvTruncate(pFS->pEnv, pFS->fdDb, nByte);
}

/*
** Close the log file. Then delete it from the file-system. This function
** is called during database shutdown only.
*/
int lsmFsCloseAndDeleteLog(FileSystem *pFS){
  char *zDel;
................................................................................
** This function implements the lsm_info(LSM_INFO_ARRAY_STRUCTURE) request.
** If successful, *pzOut is set to point to a nul-terminated string 
** containing the array structure and LSM_OK is returned. The caller should
** eventually free the string using lsmFree().
**
** If an error occurs, *pzOut is set to NULL and an LSM error code returned.
*/
int lsmInfoArrayStructure(
  lsm_db *pDb, 
  int bBlock,                     /* True for block numbers only */
  Pgno iFirst,
  char **pzOut
){
  int rc = LSM_OK;
  Snapshot *pWorker;              /* Worker snapshot */
  Segment *pArray = 0;            /* Array to report on */
  int bUnlock = 0;

  *pzOut = 0;
  if( iFirst==0 ) return LSM_ERROR;
................................................................................
    int iBlk;
    int iLastBlk;
   
    iBlk = fsPageToBlock(pFS, pArray->iFirst);
    iLastBlk = fsPageToBlock(pFS, pArray->iLastPg);

    lsmStringInit(&str, pDb->pEnv);
    if( bBlock ){
      lsmStringAppendf(&str, "%d", iBlk);
      while( iBlk!=iLastBlk ){
        fsBlockNext(pFS, iBlk, &iBlk);
        lsmStringAppendf(&str, " %d", iBlk);
      }
    }else{
      lsmStringAppendf(&str, "%d", pArray->iFirst);
      while( iBlk!=iLastBlk ){
        lsmStringAppendf(&str, " %d", fsLastPageOnBlock(pFS, iBlk));
        fsBlockNext(pFS, iBlk, &iBlk);
        lsmStringAppendf(&str, " %d", fsFirstPageOnBlock(pFS, iBlk));
      }
      lsmStringAppendf(&str, " %d", pArray->iLastPg);
    }

    *pzOut = str.z;
  }

  if( bUnlock ){
    int rcwork = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcwork);
................................................................................
      checkBlocks(pFS, &pLevel->aRhs[i], 0, nBlock, aUsed);
    }
  }

  /* Mark all blocks in the free-list as used */
  ctx.aUsed = aUsed;
  ctx.nBlock = nBlock;
  rc = lsmWalkFreelist(pDb, 0, checkFreelistCb, (void *)&ctx);

  if( rc==LSM_OK ){
    for(i=0; i<nBlock; i++) assert( aUsed[i]!=0 );
  }

  lsmFree(pDb->pEnv, aUsed);
  lsmFree(pDb->pEnv, freelist.aEntry);

Changes to src/lsm_main.c.

435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
...
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
...
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
  int rc;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  lsmStringInit(&s, pDb->pEnv);
  rc = lsmWalkFreelist(pDb, infoFreelistCb, &s);
  if( rc!=LSM_OK ){
    lsmFree(pDb->pEnv, s.z);
  }else{
    *pzOut = s.z;
  }

  /* Release the snapshot and return */
................................................................................
      rc = lsmStructList(pDb, pzVal);
      break;
    }

    case LSM_INFO_ARRAY_STRUCTURE: {
      Pgno pgno = va_arg(ap, Pgno);
      char **pzVal = va_arg(ap, char **);
      rc = lsmInfoArrayStructure(pDb, pgno, pzVal);
      break;
    }

    case LSM_INFO_ARRAY_PAGES: {
      Pgno pgno = va_arg(ap, Pgno);
      char **pzVal = va_arg(ap, char **);
      rc = lsmInfoArrayPages(pDb, pgno, pzVal);
................................................................................

/*
** Attempt to seek the cursor to the database entry specified by pKey/nKey.
** If an error occurs (e.g. an OOM or IO error), return an LSM error code.
** Otherwise, return LSM_OK.
*/
int lsm_csr_seek(lsm_cursor *pCsr, const void *pKey, int nKey, int eSeek){
  return lsmMCursorSeek((MultiCursor *)pCsr, (void *)pKey, nKey, eSeek);
}

int lsm_csr_next(lsm_cursor *pCsr){
  return lsmMCursorNext((MultiCursor *)pCsr);
}

int lsm_csr_prev(lsm_cursor *pCsr){







|







 







|







 







|







435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
...
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
...
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
  int rc;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  lsmStringInit(&s, pDb->pEnv);
  rc = lsmWalkFreelist(pDb, 1, infoFreelistCb, &s);
  if( rc!=LSM_OK ){
    lsmFree(pDb->pEnv, s.z);
  }else{
    *pzOut = s.z;
  }

  /* Release the snapshot and return */
................................................................................
      rc = lsmStructList(pDb, pzVal);
      break;
    }

    case LSM_INFO_ARRAY_STRUCTURE: {
      Pgno pgno = va_arg(ap, Pgno);
      char **pzVal = va_arg(ap, char **);
      rc = lsmInfoArrayStructure(pDb, 0, pgno, pzVal);
      break;
    }

    case LSM_INFO_ARRAY_PAGES: {
      Pgno pgno = va_arg(ap, Pgno);
      char **pzVal = va_arg(ap, char **);
      rc = lsmInfoArrayPages(pDb, pgno, pzVal);
................................................................................

/*
** Attempt to seek the cursor to the database entry specified by pKey/nKey.
** If an error occurs (e.g. an OOM or IO error), return an LSM error code.
** Otherwise, return LSM_OK.
*/
int lsm_csr_seek(lsm_cursor *pCsr, const void *pKey, int nKey, int eSeek){
  return lsmMCursorSeek((MultiCursor *)pCsr, 0, (void *)pKey, nKey, eSeek);
}

int lsm_csr_next(lsm_cursor *pCsr){
  return lsmMCursorNext((MultiCursor *)pCsr);
}

int lsm_csr_prev(lsm_cursor *pCsr){

Changes to src/lsm_shared.c.

176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
...
426
427
428
429
430
431
432

433
434
435
436
437
438
439
440
441
442
443

444
445
446
447
448
449


450
451
452
453
454
455
456
457
458
459
...
472
473
474
475
476
477
478

479
480
481

482
483
484
485
486
487

488



489

490
491
492
493

494



495

496
497
498
499
500
501
502
503

504


505
506
507
508
509
510
511
512
513









































514
515
516
517
518
519
520
...
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
...
559
560
561
562
563
564
565

566
567










568
569
570
571
572
573
574
575
576
577

578
579









580
581
582
583
584
585
586
...
587
588
589
590
591
592
593



594
595
596
597
598
599
600
...
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668

669
670
671
672
673
674
675
...
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710




711
712
713
714
715
716
717
....
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
      rc = lsmTreeLoadHeader(pDb, 0);
      if( rc==LSM_OK && (lsmTreeHasOld(pDb) || lsmTreeSize(pDb)>0) ){
        rc = lsmFlushTreeToDisk(pDb);
      }

      /* Write a checkpoint to disk. */
      if( rc==LSM_OK ){
        rc = lsmCheckpointWrite(pDb, 0);
      }

      /* If the checkpoint was written successfully, delete the log file */
      if( rc==LSM_OK ){
        Database *p = pDb->pDatabase;
        lsmFsCloseAndDeleteLog(pDb->pFS);
        if( p->pFile ) lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
................................................................................

/* 
** Context object used by the lsmWalkFreelist() utility. 
*/
typedef struct WalkFreelistCtx WalkFreelistCtx;
struct WalkFreelistCtx {
  lsm_db *pDb;

  Freelist *pFreelist;
  int iFree;
  int (*xUsr)(void *, int, i64);  /* User callback function */
  void *pUsrctx;                  /* User callback context */
};

/* 
** Callback used by lsmWalkFreelist().
*/
static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;

  Freelist *pFree = p->pFreelist;

  if( pFree ){
    while( (p->iFree < pFree->nEntry) ){
      FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
      if( pEntry->iBlk>iBlk ){


        break;
      }else{
        p->iFree++;
        if( pEntry->iId>=0 
            && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) 
          ){
          return 1;
        }
        if( pEntry->iBlk==iBlk ) return 0;
      }
................................................................................
** The difference between this function and lsmSortedWalkFreelist() is
** that lsmSortedWalkFreelist() only considers those free-list elements
** stored within the LSM. This function also merges in any in-memory 
** elements.
*/
int lsmWalkFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */

  int (*x)(void *, int, i64),     /* Callback function */
  void *pCtx                      /* First argument to pass to callback */
){

  int rc;
  int iCtx;

  WalkFreelistCtx ctx[2];

  ctx[0].pDb = pDb;

  ctx[0].pFreelist = &pDb->pWorker->freelist;



  ctx[0].iFree = 0;

  ctx[0].xUsr = walkFreelistCb;
  ctx[0].pUsrctx = (void *)&ctx[1];

  ctx[1].pDb = pDb;

  ctx[1].pFreelist = pDb->pFreelist;



  ctx[1].iFree = 0;

  ctx[1].xUsr = x;
  ctx[1].pUsrctx = pCtx;

  rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx[0]);

  for(iCtx=0; iCtx<2; iCtx++){
    int i;
    WalkFreelistCtx *p = &ctx[iCtx];

    for(i=p->iFree; p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry; i++){


      FreelistEntry *pEntry = &p->pFreelist->aEntry[i];
      if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){
        return LSM_OK;
      }
    }
  }

  return rc;
}










































typedef struct FindFreeblockCtx FindFreeblockCtx;
struct FindFreeblockCtx {
  i64 iInUse;
  int iRet;
};

................................................................................
  lsmLogMessage(pDb, 0, 
      "findFreeblock(): iInUse=%lld freelist=%s", iInUse, zList);
  lsmFree(pDb->pEnv, zList);
#endif

  ctx.iInUse = iInUse;
  ctx.iRet = 0;
  rc = lsmWalkFreelist(pDb, findFreeblockCb, (void *)&ctx);
  *piRet = ctx.iRet;

#ifdef LSM_LOG_BLOCKS
  lsmLogMessage(pDb, 0, "findFreeblock(): returning block %d", *piRet);
#endif
  return rc;
}
................................................................................
** returned. Otherwise, *piBlk is zeroed and an lsm error code returned.
*/
int lsmBlockAllocate(lsm_db *pDb, int *piBlk){
  Snapshot *p = pDb->pWorker;
  int iRet = 0;                   /* Block number of allocated block */
  int rc = LSM_OK;
  i64 iInUse = 0;                 /* Snapshot id still in use */


  assert( p );











  /* Set iInUse to the smallest snapshot id that is either:
  **
  **   * Currently in use by a database client,
  **   * May be used by a database client in the future, or
  **   * Is the most recently checkpointed snapshot (i.e. the one that will
  **     be used following recovery if a failure occurs at this point).
  */
  rc = lsmCheckpointSynced(pDb, &iInUse, 0, 0);
  if( rc==LSM_OK && iInUse==0 ) iInUse = p->iId;

  if( rc==LSM_OK && pDb->pClient ) iInUse = LSM_MIN(iInUse, pDb->pClient->iId);
  if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);










  /* Query the free block list for a suitable block */
  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);

  /* If a block was found in the free block list, use it and remove it from 
  ** the list. Otherwise, if no suitable block was found, allocate one from
  ** the end of the file.  */
................................................................................
  if( rc==LSM_OK ){
    if( iRet>0 ){
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, 
          "reusing block %d (snapshot-in-use=%lld)", iRet, iInUse);
#endif
      rc = freelistAppend(pDb, iRet, -1);



    }else{
      iRet = ++(p->nBlock);
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
#endif
    }
  }
................................................................................
** database itself.
**
** The WORKER lock must not be held when this is called. This is because
** this function may indirectly call fsync(). And the WORKER lock should
** not be held that long (in case it is required by a client flushing an
** in-memory tree to disk).
*/
int lsmCheckpointWrite(lsm_db *pDb, u32 *pnWrite){
  int rc;                         /* Return Code */
  u32 nWrite = 0;

  assert( pDb->pWorker==0 );
  assert( 1 || pDb->pClient==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );

  rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0);
  if( rc!=LSM_OK ) return rc;

  rc = lsmCheckpointLoad(pDb, 0);
  if( rc==LSM_OK ){

    ShmHeader *pShm = pDb->pShmhdr;
    int bDone = 0;                /* True if checkpoint is already stored */

    /* Check if this checkpoint has already been written to the database
    ** file. If so, set variable bDone to true.  */
    if( pShm->iMetaPage ){
      MetaPage *pPg;              /* Meta page */
................................................................................
      }
      bDone = (iDisk>=iCkpt);
    }

    if( rc==LSM_OK && bDone==0 ){
      int iMeta = (pShm->iMetaPage % 2) + 1;
      if( pDb->eSafety!=LSM_SAFETY_OFF ){
        int nBlock = lsmCheckpointNBlock(pDb->aSnapshot);
        rc = lsmFsSyncDb(pDb->pFS, nBlock);
      }
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
        rc = lsmFsSyncDb(pDb->pFS, 0);
      }
      if( rc==LSM_OK ){
        pShm->iMetaPage = iMeta;
        nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite;
      }
#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, 0, "finish checkpoint %d", 
      (int)lsmCheckpointId(pDb->aSnapshot, 0)
  );
#endif
    }




  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite;
  return rc;
}

................................................................................

int lsm_checkpoint(lsm_db *pDb, int *pnByte){
  int rc;                         /* Return code */
  u32 nWrite = 0;                 /* Number of pages checkpointed */

  /* Attempt the checkpoint. If successful, nWrite is set to the number of
  ** pages written between this and the previous checkpoint.  */
  rc = lsmCheckpointWrite(pDb, &nWrite);

  /* If required, calculate the output variable (bytes of data checkpointed). 
  ** Set it to zero if an error occured.  */
  if( pnByte ){
    int nByte = 0;
    if( rc==LSM_OK && nWrite ){
      nByte = (int)nWrite * lsmFsPageSize(pDb->pFS);







|







 







>











>



|

|
>
>


|







 







>



>






>

>
>
>
|
>




>

>
>
>
|
>



|




>
|
>
>









>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|







 







>


>
>
>
>
>
>
>
>
>
>








|
|
>


>
>
>
>
>
>
>
>
>







 







>
>
>







 







|












>







 







<











|
|
|


>
>
>
>







 







|







176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
...
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
...
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
...
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
...
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
...
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
...
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
...
772
773
774
775
776
777
778

779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
....
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
      rc = lsmTreeLoadHeader(pDb, 0);
      if( rc==LSM_OK && (lsmTreeHasOld(pDb) || lsmTreeSize(pDb)>0) ){
        rc = lsmFlushTreeToDisk(pDb);
      }

      /* Write a checkpoint to disk. */
      if( rc==LSM_OK ){
        rc = lsmCheckpointWrite(pDb, 1, 0);
      }

      /* If the checkpoint was written successfully, delete the log file */
      if( rc==LSM_OK ){
        Database *p = pDb->pDatabase;
        lsmFsCloseAndDeleteLog(pDb->pFS);
        if( p->pFile ) lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
................................................................................

/* 
** Context object used by the lsmWalkFreelist() utility. 
*/
typedef struct WalkFreelistCtx WalkFreelistCtx;
struct WalkFreelistCtx {
  lsm_db *pDb;
  int bReverse;
  Freelist *pFreelist;
  int iFree;
  int (*xUsr)(void *, int, i64);  /* User callback function */
  void *pUsrctx;                  /* User callback context */
};

/* 
** Callback used by lsmWalkFreelist().
*/
static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
  const int iDir = (p->bReverse ? -1 : 1);
  Freelist *pFree = p->pFreelist;

  if( pFree ){
    while( (p->iFree < pFree->nEntry) && p->iFree>=0 ){
      FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
      if( (p->bReverse==0 && pEntry->iBlk>iBlk)
       || (p->bReverse!=0 && pEntry->iBlk<iBlk)
      ){
        break;
      }else{
        p->iFree += iDir;
        if( pEntry->iId>=0 
            && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) 
          ){
          return 1;
        }
        if( pEntry->iBlk==iBlk ) return 0;
      }
................................................................................
** The difference between this function and lsmSortedWalkFreelist() is
** that lsmSortedWalkFreelist() only considers those free-list elements
** stored within the LSM. This function also merges in any in-memory 
** elements.
*/
int lsmWalkFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  int bReverse,                   /* True to iterate from largest to smallest */
  int (*x)(void *, int, i64),     /* Callback function */
  void *pCtx                      /* First argument to pass to callback */
){
  const int iDir = (bReverse ? -1 : 1);
  int rc;
  int iCtx;

  WalkFreelistCtx ctx[2];

  ctx[0].pDb = pDb;
  ctx[0].bReverse = bReverse;
  ctx[0].pFreelist = &pDb->pWorker->freelist;
  if( ctx[0].pFreelist && bReverse ){
    ctx[0].iFree = ctx[0].pFreelist->nEntry-1;
  }else{
    ctx[0].iFree = 0;
  }
  ctx[0].xUsr = walkFreelistCb;
  ctx[0].pUsrctx = (void *)&ctx[1];

  ctx[1].pDb = pDb;
  ctx[1].bReverse = bReverse;
  ctx[1].pFreelist = pDb->pFreelist;
  if( ctx[1].pFreelist && bReverse ){
    ctx[1].iFree = ctx[1].pFreelist->nEntry-1;
  }else{
    ctx[1].iFree = 0;
  }
  ctx[1].xUsr = x;
  ctx[1].pUsrctx = pCtx;

  rc = lsmSortedWalkFreelist(pDb, bReverse, walkFreelistCb, (void *)&ctx[0]);

  for(iCtx=0; iCtx<2; iCtx++){
    int i;
    WalkFreelistCtx *p = &ctx[iCtx];
    for(i=p->iFree; 
        p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry && i>=0;
        i += iDir
    ){
      FreelistEntry *pEntry = &p->pFreelist->aEntry[i];
      if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){
        return LSM_OK;
      }
    }
  }

  return rc;
}

typedef struct DbTruncateCtx DbTruncateCtx;
struct DbTruncateCtx {
  int nBlock;
  i64 iInUse;
};

static int dbTruncateCb(void *pCtx, int iBlk, i64 iSnapshot){
  DbTruncateCtx *p = (DbTruncateCtx *)pCtx;
  if( iBlk!=p->nBlock || iSnapshot>=p->iInUse ) return 1;
  p->nBlock--;
  return 0;
}

static int dbTruncate(lsm_db *pDb, i64 iInUse){
  int rc;
  int i;
  DbTruncateCtx ctx;

  assert( pDb->pWorker );
  ctx.nBlock = pDb->pWorker->nBlock;
  ctx.iInUse = iInUse;

  rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);
  for(i=ctx.nBlock+1; rc==LSM_OK && i<=pDb->pWorker->nBlock; i++){
    rc = freelistAppend(pDb, i, -1);
  }

  if( rc==LSM_OK ){
#ifdef LSM_LOG_FREELIST
    if( ctx.nBlock!=pDb->pWorker->nBlock ){
      lsmLogMessage(pDb, 0, 
          "dbTruncate(): truncated db to %d blocks",ctx.nBlock
      );
    }
#endif
    pDb->pWorker->nBlock = ctx.nBlock;
  }
  return rc;
}


typedef struct FindFreeblockCtx FindFreeblockCtx;
struct FindFreeblockCtx {
  i64 iInUse;
  int iRet;
};

................................................................................
  lsmLogMessage(pDb, 0, 
      "findFreeblock(): iInUse=%lld freelist=%s", iInUse, zList);
  lsmFree(pDb->pEnv, zList);
#endif

  ctx.iInUse = iInUse;
  ctx.iRet = 0;
  rc = lsmWalkFreelist(pDb, 0, findFreeblockCb, (void *)&ctx);
  *piRet = ctx.iRet;

#ifdef LSM_LOG_BLOCKS
  lsmLogMessage(pDb, 0, "findFreeblock(): returning block %d", *piRet);
#endif
  return rc;
}
................................................................................
** returned. Otherwise, *piBlk is zeroed and an lsm error code returned.
*/
int lsmBlockAllocate(lsm_db *pDb, int *piBlk){
  Snapshot *p = pDb->pWorker;
  int iRet = 0;                   /* Block number of allocated block */
  int rc = LSM_OK;
  i64 iInUse = 0;                 /* Snapshot id still in use */
  i64 iSynced = 0;                /* Snapshot id synced to disk */

  assert( p );

#ifdef LSM_LOG_FREELIST
  {
    char *zFree = 0;
    rc = lsmInfoFreelist(pDb, &zFree);
    if( rc!=LSM_OK ) return rc;
    lsmLogMessage(pDb, 0, "lsmBlockAllocate(): freelist: %s", zFree);
    lsmFree(pDb->pEnv, zFree);
  }
#endif

  /* Set iInUse to the smallest snapshot id that is either:
  **
  **   * Currently in use by a database client,
  **   * May be used by a database client in the future, or
  **   * Is the most recently checkpointed snapshot (i.e. the one that will
  **     be used following recovery if a failure occurs at this point).
  */
  rc = lsmCheckpointSynced(pDb, &iSynced, 0, 0);
  if( rc==LSM_OK && iSynced==0 ) iSynced = p->iId;
  iInUse = iSynced;
  if( rc==LSM_OK && pDb->pClient ) iInUse = LSM_MIN(iInUse, pDb->pClient->iId);
  if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);

#ifdef LSM_LOG_FREELIST
  {
    lsmLogMessage(pDb, 0, "lsmBlockAllocate(): "
        "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)", 
        iInUse, iSynced, (pDb->pClient ? pDb->pClient->iId : 0)
    );
  }
#endif

  /* Query the free block list for a suitable block */
  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);

  /* If a block was found in the free block list, use it and remove it from 
  ** the list. Otherwise, if no suitable block was found, allocate one from
  ** the end of the file.  */
................................................................................
  if( rc==LSM_OK ){
    if( iRet>0 ){
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, 
          "reusing block %d (snapshot-in-use=%lld)", iRet, iInUse);
#endif
      rc = freelistAppend(pDb, iRet, -1);
      if( rc==LSM_OK ){
        rc = dbTruncate(pDb, iInUse);
      }
    }else{
      iRet = ++(p->nBlock);
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
#endif
    }
  }
................................................................................
** database itself.
**
** The WORKER lock must not be held when this is called. This is because
** this function may indirectly call fsync(). And the WORKER lock should
** not be held that long (in case it is required by a client flushing an
** in-memory tree to disk).
*/
int lsmCheckpointWrite(lsm_db *pDb, int bTruncate, u32 *pnWrite){
  int rc;                         /* Return Code */
  u32 nWrite = 0;

  assert( pDb->pWorker==0 );
  assert( 1 || pDb->pClient==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );

  rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0);
  if( rc!=LSM_OK ) return rc;

  rc = lsmCheckpointLoad(pDb, 0);
  if( rc==LSM_OK ){
    int nBlock = lsmCheckpointNBlock(pDb->aSnapshot);
    ShmHeader *pShm = pDb->pShmhdr;
    int bDone = 0;                /* True if checkpoint is already stored */

    /* Check if this checkpoint has already been written to the database
    ** file. If so, set variable bDone to true.  */
    if( pShm->iMetaPage ){
      MetaPage *pPg;              /* Meta page */
................................................................................
      }
      bDone = (iDisk>=iCkpt);
    }

    if( rc==LSM_OK && bDone==0 ){
      int iMeta = (pShm->iMetaPage % 2) + 1;
      if( pDb->eSafety!=LSM_SAFETY_OFF ){

        rc = lsmFsSyncDb(pDb->pFS, nBlock);
      }
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
        rc = lsmFsSyncDb(pDb->pFS, 0);
      }
      if( rc==LSM_OK ){
        pShm->iMetaPage = iMeta;
        nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite;
      }
#ifdef LSM_LOG_WORK
      lsmLogMessage(pDb, 0, "finish checkpoint %d", 
          (int)lsmCheckpointId(pDb->aSnapshot, 0)
      );
#endif
    }

    if( rc==LSM_OK && bTruncate ){
      rc = lsmFsTruncateDb(pDb->pFS, (i64)nBlock*lsmFsBlockSize(pDb->pFS));
    }
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite;
  return rc;
}

................................................................................

int lsm_checkpoint(lsm_db *pDb, int *pnByte){
  int rc;                         /* Return code */
  u32 nWrite = 0;                 /* Number of pages checkpointed */

  /* Attempt the checkpoint. If successful, nWrite is set to the number of
  ** pages written between this and the previous checkpoint.  */
  rc = lsmCheckpointWrite(pDb, 0, &nWrite);

  /* If required, calculate the output variable (bytes of data checkpointed). 
  ** Set it to zero if an error occured.  */
  if( pnByte ){
    int nByte = 0;
    if( rc==LSM_OK && nWrite ){
      nByte = (int)nWrite * lsmFsPageSize(pDb->pFS);

Changes to src/lsm_sorted.c.

1566
1567
1568
1569
1570
1571
1572

1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
....
1702
1703
1704
1705
1706
1707
1708

1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
....
1783
1784
1785
1786
1787
1788
1789

1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
....
1830
1831
1832
1833
1834
1835
1836

1837
1838
1839
1840
1841
1842
1843
....
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863

1864

1865
1866
1867
1868
1869
1870
1871
1872
1873
1874

1875

1876
1877
1878
1879
1880
1881
1882
....
2384
2385
2386
2387
2388
2389
2390


2391
2392
2393
2394
2395
2396

2397
2398
2399
2400
2401
2402
2403
....
2408
2409
2410
2411
2412
2413
2414

2415




2416
2417
2418
2419
2420
2421
2422
....
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
....
2646
2647
2648
2649
2650
2651
2652

2653

2654
2655
2656
2657
2658
2659
2660
....
2745
2746
2747
2748
2749
2750
2751



2752


2753
2754
2755
2756
2757



2758
2759
2760
2761
2762
2763
2764
....
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
....
4357
4358
4359
4360
4361
4362
4363

4364

4365
4366
4367
4368
4369
4370
4371
....
4930
4931
4932
4933
4934
4935
4936
4937
4938
4939
4940
4941
4942
4943

4944
4945
4946
4947
4948









4949
4950
4951
4952
4953
4954
4955
....
5285
5286
5287
5288
5289
5290
5291








5292
5293
5294
5295
5296
5297
5298
5299
5300
5301
5302
5303
5304
5305
5306
5307
5308
5309
5310
5311
5312
5313
5314
5315
5316
5317
5318
5319
5320
5321
5322
5323
5324
5325
  *piPtr = iOut;
  return rc;
}

static int segmentPtrSeek(
  MultiCursor *pCsr,              /* Cursor context */
  SegmentPtr *pPtr,               /* Pointer to seek */

  void *pKey, int nKey,           /* Key to seek to */
  int eSeek,                      /* Search bias - see above */
  int *piPtr,                     /* OUT: FC pointer */
  int *pbStop
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res;                        /* Result of comparison operation */
  int rc = LSM_OK;
  int iMin;
  int iMax;
  Pgno iPtrOut = 0;
  const int iTopic = 0;

  /* If the current page contains an oversized entry, then there are no
  ** pointers to one or more of the subsequent pages in the sorted run.
  ** The following call ensures that the segment-ptr points to the correct 
  ** page in this case.  */
  rc = segmentPtrSearchOversized(pCsr, pPtr, pKey, nKey);
  iPtrOut = pPtr->iPtr;
................................................................................
  *piPtr = iPtrOut;
  return rc;
}

static int seekInBtree(
  MultiCursor *pCsr,              /* Multi-cursor object */
  Segment *pSeg,                  /* Seek within this segment */

  void *pKey, int nKey,           /* Key to seek to */
  Pgno *aPg,                      /* OUT: Page numbers */
  Page **ppPg                     /* OUT: Leaf (sorted-run) page reference */
){
  int i = 0;
  int rc;
  int iPg;
  Page *pPg = 0;
  Blob blob = {0, 0, 0};
  int iTopic = 0;                 /* TODO: Fix me */

  iPg = pSeg->iRoot;
  do {
    Pgno *piFirst = 0;
    if( aPg ){
      aPg[i++] = iPg;
      piFirst = &aPg[i];
................................................................................
  }
  return rc;
}

static int seekInSegment(
  MultiCursor *pCsr, 
  SegmentPtr *pPtr,

  void *pKey, int nKey,
  int iPg,                        /* Page to search */
  int eSeek,                      /* Search bias - see above */
  int *piPtr,                     /* OUT: FC pointer */
  int *pbStop                     /* OUT: Stop search flag */
){
  int iPtr = iPg;
  int rc = LSM_OK;

  if( pPtr->pSeg->iRoot ){
    Page *pPg;
    assert( pPtr->pSeg->iRoot!=0 );
    rc = seekInBtree(pCsr, pPtr->pSeg, pKey, nKey, 0, &pPg);
    if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pPg);
  }else{
    if( iPtr==0 ){
      iPtr = pPtr->pSeg->iFirst;
    }
    if( rc==LSM_OK ){
      rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iPtr);
    }
  }

  if( rc==LSM_OK ){
    rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, eSeek, piPtr, pbStop);
  }
  return rc;
}

/*
** Seek each segment pointer in the array of (pLvl->nRight+1) at aPtr[].
**
................................................................................
**   In case (a), the multi-cursor CURSOR_SEEK_EQ flag is set and the pCsr->key
**   and pCsr->val blobs populated before returning.
*/
static int seekInLevel(
  MultiCursor *pCsr,              /* Sorted cursor object to seek */
  SegmentPtr *aPtr,               /* Pointer to array of (nRhs+1) SPs */
  int eSeek,                      /* Search bias - see above */

  void *pKey, int nKey,           /* Key to search for */
  Pgno *piPgno,                   /* IN/OUT: fraction cascade pointer (or 0) */
  int *pbStop                     /* OUT: See above */
){
  Level *pLvl = aPtr[0].pLevel;   /* Level to seek within */
  int rc = LSM_OK;                /* Return code */
  int iOut = 0;                   /* Pointer to return to caller */
................................................................................
  int nRhs = pLvl->nRight;        /* Number of right-hand-side segments */
  int bStop = 0;

  /* If this is a composite level (one currently undergoing an incremental
  ** merge), figure out if the search key is larger or smaller than the
  ** levels split-key.  */
  if( nRhs ){
    res = sortedKeyCompare(pCsr->pDb->xCmp, 0, pKey, nKey, 
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
    );
  }

  /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
  ** is not a composite level and there is no split-key). Search the 
  ** left-hand-side of the level in this case.  */
  if( res<0 ){
    int iPtr = 0;
    if( nRhs==0 ) iPtr = *piPgno;


    rc = seekInSegment(pCsr, &aPtr[0], pKey, nKey, iPtr, eSeek, &iOut, &bStop);

    if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
      res = 0;
    }
  }
  
  if( res>=0 ){
    int iPtr = *piPgno;
    int i;
    for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
      iOut = 0;

      rc = seekInSegment(pCsr, &aPtr[i], pKey, nKey, iPtr, eSeek, &iOut,&bStop);

      iPtr = iOut;
    }

    if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){
      rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
    }
  }
................................................................................
    }
  }

  assert( rc==LSM_OK || (*ppVal==0 && *pnVal==0) );
  return rc;
}



/*
** This function is called by worker connections to walk the part of the
** free-list stored within the LSM data structure.
*/
int lsmSortedWalkFreelist(
  lsm_db *pDb,                    /* Database handle */

  int (*x)(void *, int, i64),     /* Callback function */
  void *pCtx                      /* First argument to pass to callback */
){
  MultiCursor *pCsr;              /* Cursor used to read db */
  int rc = LSM_OK;                /* Return Code */
  Snapshot *pSnap = 0;

................................................................................
  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    rc = multiCursorAddAll(pCsr, pSnap);
    pCsr->flags |= CURSOR_IGNORE_DELETE;
  }
  
  if( rc==LSM_OK ){

    rc = lsmMCursorLast(pCsr);




    while( rc==LSM_OK && lsmMCursorValid(pCsr) && rtIsSystem(pCsr->eType) ){
      void *pKey; int nKey;
      void *pVal; int nVal;

      rc = lsmMCursorKey(pCsr, &pKey, &nKey);
      if( rc==LSM_OK ) rc = lsmMCursorValue(pCsr, &pVal, &nVal);
      if( rc==LSM_OK && (nKey!=4 || nVal!=8) ) rc = LSM_CORRUPT_BKPT;
................................................................................

      if( rc==LSM_OK ){
        int iBlk;
        i64 iSnap;
        iBlk = (int)(~(lsmGetU32((u8 *)pKey)));
        iSnap = (i64)lsmGetU64((u8 *)pVal);
        if( x(pCtx, iBlk, iSnap) ) break;
        rc = lsmMCursorPrev(pCsr);
      }
    }
  }

  lsmMCursorClose(pCsr);
  lsmFreeSnapshot(pDb->pEnv, pSnap);

................................................................................
  return rc;
}

int mcursorRestore(lsm_db *pDb, MultiCursor *pCsr){
  int rc;
  rc = multiCursorInit(pCsr, pDb->pClient);
  if( rc==LSM_OK && pCsr->key.pData ){

    rc = lsmMCursorSeek(pCsr, pCsr->key.pData, pCsr->key.nData, +1);

  }
  return rc;
}

int lsmSaveCursors(lsm_db *pDb){
  int rc = LSM_OK;
  MultiCursor *pCsr;
................................................................................
  return rc;
}


/*
** Seek the cursor.
*/



int lsmMCursorSeek(MultiCursor *pCsr, void *pKey, int nKey, int eSeek){


  int eESeek = eSeek;             /* Effective eSeek parameter */
  int bStop = 0;                  /* Set to true to halt search operation */
  int rc = LSM_OK;                /* Return code */
  int iPtr = 0;                   /* Used to iterate through pCsr->aPtr[] */
  Pgno iPgno = 0;                 /* FC pointer value */




  if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;

  assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );
  assert( (pCsr->flags & CURSOR_FLUSH_FREELIST)==0 );
  assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel );

................................................................................
    rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[1], pKey, nKey, eESeek, &bStop);
  }

  /* Seek all segment pointers. */
  for(iPtr=0; iPtr<pCsr->nPtr && rc==LSM_OK && bStop==0; iPtr++){
    SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
    assert( pPtr->pSeg==&pPtr->pLevel->lhs );
    rc = seekInLevel(pCsr, pPtr, eESeek, pKey, nKey, &iPgno, &bStop);
    iPtr += pPtr->pLevel->nRight;
  }

  if( eSeek!=LSM_SEEK_EQ ){
    if( rc==LSM_OK ){
      rc = multiCursorAllocTree(pCsr);
    }
................................................................................
    ** a key equal to the one multi-cursor currently points to. Record the
    ** page number of each b-tree page and the leaf. The segment may be
    ** gobbled up to (but not including) the first of these page numbers.
    */
    assert( pSeg->iRoot>0 );
    aPg = lsmMallocZeroRc(pDb->pEnv, sizeof(Pgno)*32, &rc);
    if( rc==LSM_OK ){

      rc = seekInBtree(pCsr, pSeg, pCsr->key.pData, pCsr->key.nData, aPg, 0); 

    }

    if( rc==LSM_OK ){
      for(nPg=0; aPg[nPg]; nPg++);
      lsmFsGobble(pDb, pSeg, aPg, nPg);
    }

................................................................................
  lsmFree(pEnv, z1);
  lsmFree(pEnv, z2);

  return z;
}

static int fileToString(
  lsm_env *pEnv,                  /* For xMalloc() */
  char *aBuf, 
  int nBuf, 
  int nMin,
  Segment *pSeg
){
  int i = 0;

  char *zSeg;

  zSeg = segToString(pEnv, pSeg, nMin);
  i += sqlite4_snprintf(&aBuf[i], nBuf-i, "%s", zSeg);
  lsmFree(pEnv, zSeg);










  return i;
}

void sortedDumpPage(lsm_db *pDb, Segment *pRun, Page *pPg, int bVals){
  Blob blob = {0, 0, 0};         /* Blob used for keys */
  LsmString s;
................................................................................

      Segment *pSeg = &pLevel->lhs;
      aLeft[nLeft++] = pSeg;

      for(i=0; i<pLevel->nRight; i++){
        aRight[nRight++] = &pLevel->aRhs[i];
      }









      for(i=0; i<nLeft || i<nRight; i++){
        int iPad = 0;
        char zLevel[32];
        zLeft[0] = '\0';
        zRight[0] = '\0';

        if( i<nLeft ){ 
          fileToString(pDb->pEnv, zLeft, sizeof(zLeft), 28, aLeft[i]); 
        }
        if( i<nRight ){ 
          fileToString(pDb->pEnv, zRight, sizeof(zRight), 28, aRight[i]); 
        }

        if( i==0 ){
          sqlite4_snprintf(zLevel, sizeof(zLevel), "L%d: (age=%d) (flags=%.4x)",
              iLevel, (int)pLevel->iAge, (int)pLevel->flags
          );
        }else{
          zLevel[0] = '\0';
        }

        if( nRight==0 ){
          iPad = 28 - (strlen(zLeft)/2) ;
        }

        lsmLogMessage(pDb, LSM_OK, "% 7s % *s% -35s %s", 
            zLevel, iPad, "", zLeft, zRight
        );
      }

      iLevel++;
    }








>











<







 







>









<







 







>












|











|







 







>







 







|











>
|
>










>
|
>







 







>
>






>







 







>
|
>
>
>
>







 







|







 







>
|
>







 







>
>
>
|
>
>





>
>
>







 







|







 







>
|
>







 







|






>
|

|
|
|
>
>
>
>
>
>
>
>
>







 







>
>
>
>
>
>
>
>








|


|











|


|







1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584

1585
1586
1587
1588
1589
1590
1591
....
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718

1719
1720
1721
1722
1723
1724
1725
....
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
....
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
....
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
....
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
....
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
....
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
....
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
....
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
....
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
....
4381
4382
4383
4384
4385
4386
4387
4388
4389
4390
4391
4392
4393
4394
4395
4396
4397
....
4956
4957
4958
4959
4960
4961
4962
4963
4964
4965
4966
4967
4968
4969
4970
4971
4972
4973
4974
4975
4976
4977
4978
4979
4980
4981
4982
4983
4984
4985
4986
4987
4988
4989
4990
4991
....
5321
5322
5323
5324
5325
5326
5327
5328
5329
5330
5331
5332
5333
5334
5335
5336
5337
5338
5339
5340
5341
5342
5343
5344
5345
5346
5347
5348
5349
5350
5351
5352
5353
5354
5355
5356
5357
5358
5359
5360
5361
5362
5363
5364
5365
5366
5367
5368
5369
  *piPtr = iOut;
  return rc;
}

static int segmentPtrSeek(
  MultiCursor *pCsr,              /* Cursor context */
  SegmentPtr *pPtr,               /* Pointer to seek */
  int iTopic,                     /* Key topic to seek to */
  void *pKey, int nKey,           /* Key to seek to */
  int eSeek,                      /* Search bias - see above */
  int *piPtr,                     /* OUT: FC pointer */
  int *pbStop
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res;                        /* Result of comparison operation */
  int rc = LSM_OK;
  int iMin;
  int iMax;
  Pgno iPtrOut = 0;


  /* If the current page contains an oversized entry, then there are no
  ** pointers to one or more of the subsequent pages in the sorted run.
  ** The following call ensures that the segment-ptr points to the correct 
  ** page in this case.  */
  rc = segmentPtrSearchOversized(pCsr, pPtr, pKey, nKey);
  iPtrOut = pPtr->iPtr;
................................................................................
  *piPtr = iPtrOut;
  return rc;
}

static int seekInBtree(
  MultiCursor *pCsr,              /* Multi-cursor object */
  Segment *pSeg,                  /* Seek within this segment */
  int iTopic,
  void *pKey, int nKey,           /* Key to seek to */
  Pgno *aPg,                      /* OUT: Page numbers */
  Page **ppPg                     /* OUT: Leaf (sorted-run) page reference */
){
  int i = 0;
  int rc;
  int iPg;
  Page *pPg = 0;
  Blob blob = {0, 0, 0};


  iPg = pSeg->iRoot;
  do {
    Pgno *piFirst = 0;
    if( aPg ){
      aPg[i++] = iPg;
      piFirst = &aPg[i];
................................................................................
  }
  return rc;
}

static int seekInSegment(
  MultiCursor *pCsr, 
  SegmentPtr *pPtr,
  int iTopic,
  void *pKey, int nKey,
  int iPg,                        /* Page to search */
  int eSeek,                      /* Search bias - see above */
  int *piPtr,                     /* OUT: FC pointer */
  int *pbStop                     /* OUT: Stop search flag */
){
  int iPtr = iPg;
  int rc = LSM_OK;

  if( pPtr->pSeg->iRoot ){
    Page *pPg;
    assert( pPtr->pSeg->iRoot!=0 );
    rc = seekInBtree(pCsr, pPtr->pSeg, iTopic, pKey, nKey, 0, &pPg);
    if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pPg);
  }else{
    if( iPtr==0 ){
      iPtr = pPtr->pSeg->iFirst;
    }
    if( rc==LSM_OK ){
      rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iPtr);
    }
  }

  if( rc==LSM_OK ){
    rc = segmentPtrSeek(pCsr, pPtr, iTopic, pKey, nKey, eSeek, piPtr, pbStop);
  }
  return rc;
}

/*
** Seek each segment pointer in the array of (pLvl->nRight+1) at aPtr[].
**
................................................................................
**   In case (a), the multi-cursor CURSOR_SEEK_EQ flag is set and the pCsr->key
**   and pCsr->val blobs populated before returning.
*/
static int seekInLevel(
  MultiCursor *pCsr,              /* Sorted cursor object to seek */
  SegmentPtr *aPtr,               /* Pointer to array of (nRhs+1) SPs */
  int eSeek,                      /* Search bias - see above */
  int iTopic,                     /* Key topic to search for */
  void *pKey, int nKey,           /* Key to search for */
  Pgno *piPgno,                   /* IN/OUT: fraction cascade pointer (or 0) */
  int *pbStop                     /* OUT: See above */
){
  Level *pLvl = aPtr[0].pLevel;   /* Level to seek within */
  int rc = LSM_OK;                /* Return code */
  int iOut = 0;                   /* Pointer to return to caller */
................................................................................
  int nRhs = pLvl->nRight;        /* Number of right-hand-side segments */
  int bStop = 0;

  /* If this is a composite level (one currently undergoing an incremental
  ** merge), figure out if the search key is larger or smaller than the
  ** levels split-key.  */
  if( nRhs ){
    res = sortedKeyCompare(pCsr->pDb->xCmp, iTopic, pKey, nKey, 
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
    );
  }

  /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
  ** is not a composite level and there is no split-key). Search the 
  ** left-hand-side of the level in this case.  */
  if( res<0 ){
    int iPtr = 0;
    if( nRhs==0 ) iPtr = *piPgno;

    rc = seekInSegment(
        pCsr, &aPtr[0], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
    );
    if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
      res = 0;
    }
  }
  
  if( res>=0 ){
    int iPtr = *piPgno;
    int i;
    for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
      iOut = 0;
      rc = seekInSegment(
          pCsr, &aPtr[i], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
      );
      iPtr = iOut;
    }

    if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){
      rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
    }
  }
................................................................................
    }
  }

  assert( rc==LSM_OK || (*ppVal==0 && *pnVal==0) );
  return rc;
}

static int multiCursorAdvance(MultiCursor *pCsr, int bReverse);

/*
** This function is called by worker connections to walk the part of the
** free-list stored within the LSM data structure.
*/
int lsmSortedWalkFreelist(
  lsm_db *pDb,                    /* Database handle */
  int bReverse,                   /* True to iterate from largest to smallest */
  int (*x)(void *, int, i64),     /* Callback function */
  void *pCtx                      /* First argument to pass to callback */
){
  MultiCursor *pCsr;              /* Cursor used to read db */
  int rc = LSM_OK;                /* Return Code */
  Snapshot *pSnap = 0;

................................................................................
  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    rc = multiCursorAddAll(pCsr, pSnap);
    pCsr->flags |= CURSOR_IGNORE_DELETE;
  }
  
  if( rc==LSM_OK ){
    if( bReverse==0 ){
      rc = lsmMCursorLast(pCsr);
    }else{
      rc = lsmMCursorSeek(pCsr, 1, "", 0, LSM_SEEK_GE);
    }

    while( rc==LSM_OK && lsmMCursorValid(pCsr) && rtIsSystem(pCsr->eType) ){
      void *pKey; int nKey;
      void *pVal; int nVal;

      rc = lsmMCursorKey(pCsr, &pKey, &nKey);
      if( rc==LSM_OK ) rc = lsmMCursorValue(pCsr, &pVal, &nVal);
      if( rc==LSM_OK && (nKey!=4 || nVal!=8) ) rc = LSM_CORRUPT_BKPT;
................................................................................

      if( rc==LSM_OK ){
        int iBlk;
        i64 iSnap;
        iBlk = (int)(~(lsmGetU32((u8 *)pKey)));
        iSnap = (i64)lsmGetU64((u8 *)pVal);
        if( x(pCtx, iBlk, iSnap) ) break;
        rc = multiCursorAdvance(pCsr, !bReverse);
      }
    }
  }

  lsmMCursorClose(pCsr);
  lsmFreeSnapshot(pDb->pEnv, pSnap);

................................................................................
  return rc;
}

int mcursorRestore(lsm_db *pDb, MultiCursor *pCsr){
  int rc;
  rc = multiCursorInit(pCsr, pDb->pClient);
  if( rc==LSM_OK && pCsr->key.pData ){
    rc = lsmMCursorSeek(pCsr, 
         rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData, +1
    );
  }
  return rc;
}

int lsmSaveCursors(lsm_db *pDb){
  int rc = LSM_OK;
  MultiCursor *pCsr;
................................................................................
  return rc;
}


/*
** Seek the cursor.
*/
int lsmMCursorSeek(
  MultiCursor *pCsr, 
  int iTopic, 
  void *pKey, int nKey, 
  int eSeek
){
  int eESeek = eSeek;             /* Effective eSeek parameter */
  int bStop = 0;                  /* Set to true to halt search operation */
  int rc = LSM_OK;                /* Return code */
  int iPtr = 0;                   /* Used to iterate through pCsr->aPtr[] */
  Pgno iPgno = 0;                 /* FC pointer value */

  assert( pCsr->apTreeCsr[0]==0 || iTopic==0 );
  assert( pCsr->apTreeCsr[1]==0 || iTopic==0 );

  if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;

  assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );
  assert( (pCsr->flags & CURSOR_FLUSH_FREELIST)==0 );
  assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel );

................................................................................
    rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[1], pKey, nKey, eESeek, &bStop);
  }

  /* Seek all segment pointers. */
  for(iPtr=0; iPtr<pCsr->nPtr && rc==LSM_OK && bStop==0; iPtr++){
    SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
    assert( pPtr->pSeg==&pPtr->pLevel->lhs );
    rc = seekInLevel(pCsr, pPtr, eESeek, iTopic, pKey, nKey, &iPgno, &bStop);
    iPtr += pPtr->pLevel->nRight;
  }

  if( eSeek!=LSM_SEEK_EQ ){
    if( rc==LSM_OK ){
      rc = multiCursorAllocTree(pCsr);
    }
................................................................................
    ** a key equal to the one multi-cursor currently points to. Record the
    ** page number of each b-tree page and the leaf. The segment may be
    ** gobbled up to (but not including) the first of these page numbers.
    */
    assert( pSeg->iRoot>0 );
    aPg = lsmMallocZeroRc(pDb->pEnv, sizeof(Pgno)*32, &rc);
    if( rc==LSM_OK ){
      rc = seekInBtree(pCsr, pSeg, 
          rtTopic(pCsr->eType), pCsr->key.pData, pCsr->key.nData, aPg, 0
      ); 
    }

    if( rc==LSM_OK ){
      for(nPg=0; aPg[nPg]; nPg++);
      lsmFsGobble(pDb, pSeg, aPg, nPg);
    }

................................................................................
  lsmFree(pEnv, z1);
  lsmFree(pEnv, z2);

  return z;
}

static int fileToString(
  lsm_db *pDb,                    /* For xMalloc() */
  char *aBuf, 
  int nBuf, 
  int nMin,
  Segment *pSeg
){
  int i = 0;
  if( pSeg ){
    char *zSeg;

    zSeg = segToString(pDb->pEnv, pSeg, nMin);
    i += sqlite4_snprintf(&aBuf[i], nBuf-i, "%s", zSeg);
    lsmFree(pDb->pEnv, zSeg);

#ifdef LSM_LOG_FREELIST
    lsmInfoArrayStructure(pDb, 1, pSeg->iFirst, &zSeg);
    i += sqlite4_snprintf(&aBuf[i], nBuf-i, "    (%s)", zSeg);
    lsmFree(pDb->pEnv, zSeg);
#endif
  }else{
    aBuf[0] = '\0';
  }

  return i;
}

void sortedDumpPage(lsm_db *pDb, Segment *pRun, Page *pPg, int bVals){
  Blob blob = {0, 0, 0};         /* Blob used for keys */
  LsmString s;
................................................................................

      Segment *pSeg = &pLevel->lhs;
      aLeft[nLeft++] = pSeg;

      for(i=0; i<pLevel->nRight; i++){
        aRight[nRight++] = &pLevel->aRhs[i];
      }

#ifdef LSM_LOG_FREELIST
      if( nRight ){
        memmove(&aRight[1], aRight, sizeof(aRight[0])*nRight);
        aRight[0] = 0;
        nRight++;
      }
#endif

      for(i=0; i<nLeft || i<nRight; i++){
        int iPad = 0;
        char zLevel[32];
        zLeft[0] = '\0';
        zRight[0] = '\0';

        if( i<nLeft ){ 
          fileToString(pDb, zLeft, sizeof(zLeft), 24, aLeft[i]); 
        }
        if( i<nRight ){ 
          fileToString(pDb, zRight, sizeof(zRight), 24, aRight[i]); 
        }

        if( i==0 ){
          sqlite4_snprintf(zLevel, sizeof(zLevel), "L%d: (age=%d) (flags=%.4x)",
              iLevel, (int)pLevel->iAge, (int)pLevel->flags
          );
        }else{
          zLevel[0] = '\0';
        }

        if( nRight==0 ){
          iPad = 10;
        }

        lsmLogMessage(pDb, LSM_OK, "% 25s % *s% -35s %s", 
            zLevel, iPad, "", zLeft, zRight
        );
      }

      iLevel++;
    }