SQLite4
Check-in [f512ea3c4d]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Remove dead code. Fix a read-lock related problem causing the multi-threaded tests to fail.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | freelist-rework
Files: files | file ages | folders
SHA1: f512ea3c4d806460fd3b384f4cb02af3fa49473e
User & Date: dan 2012-10-31 18:46:38
Context
2012-10-31
19:27
Fix a crash in the check-blocks assert that may occur following an OOM condition. Leaf check-in: 503f49b0cc user: dan tags: freelist-rework
18:46
Remove dead code. Fix a read-lock related problem causing the multi-threaded tests to fail. check-in: f512ea3c4d user: dan tags: freelist-rework
16:37
Fix a compressed mode bug unrelated to the free block list. check-in: 6bf6b00b8b user: dan tags: freelist-rework
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_tdb3.c.

894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
int test_lsm_lomem_open(
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = 
    "page_size=256 block_size=65536 write_buffer=16384 "
    "max_freelist=4 autocheckpoint=32768 "
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

int test_lsm_zip_open(
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = 
    "page_size=256 block_size=65536 write_buffer=16384 "
    "max_freelist=4 autocheckpoint=32768 compression=1"
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

lsm_db *tdb_lsm(TestDb *pDb){
  if( pDb->pMethods->xClose==test_lsm_close ){







|












|







894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
int test_lsm_lomem_open(
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = 
    "page_size=256 block_size=65536 write_buffer=16384 "
    "max_freelist=2 autocheckpoint=32768 "
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

int test_lsm_zip_open(
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = 
    "page_size=256 block_size=65536 write_buffer=16384 "
    "max_freelist=2 autocheckpoint=32768 compression=1"
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

lsm_db *tdb_lsm(TestDb *pDb){
  if( pDb->pMethods->xClose==test_lsm_close ){

Changes to src/lsmInt.h.

58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
...
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
...
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
...
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802

/* "mmap" mode is currently only used in environments with 64-bit address 
** spaces. The following macro is used to test for this.  */
#define LSM_IS_64_BIT (sizeof(void*)==8)

#define LSM_AUTOWORK_QUANT 32

/* Minimum number of free-list entries to store in the checkpoint, assuming
** the free-list contains this many entries. i.e. if overflow is required,
** the first LSM_CKPT_MIN_FREELIST entries are stored in the checkpoint and
** the remainder in an LSM system entry.  */
#define LSM_CKPT_MIN_FREELIST     6
#define LSM_CKPT_MAX_REFREE       2
#define LSM_CKPT_MIN_NONLSM       (LSM_CKPT_MIN_FREELIST - LSM_CKPT_MAX_REFREE)

typedef struct Database Database;
typedef struct DbLog DbLog;
typedef struct FileSystem FileSystem;
typedef struct Freelist Freelist;
typedef struct FreelistEntry FreelistEntry;
typedef struct Level Level;
typedef struct LogMark LogMark;
................................................................................
  i64 iId;                        /* Snapshot id */
  i64 iLogOff;                    /* Log file offset */

  /* Used by worker snapshots only */
  int nBlock;                     /* Number of blocks in database file */
  Pgno aiAppend[LSM_APPLIST_SZ];  /* Append point list */
  Freelist freelist;              /* Free block list */
  int nFreelistOvfl;              /* Number of extra free-list entries in LSM */
  u32 nWrite;                     /* Total number of pages written to disk */
};
#define LSM_INITIAL_SNAPSHOT_ID 11

/*
** Functions from file "lsm_ckpt.c".
*/
int lsmCheckpointWrite(lsm_db *, u32 *);
int lsmCheckpointLevels(lsm_db *, int, void **, int *);
int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);

int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *);
int lsmCheckpointOverflowRequired(lsm_db *pDb);
int lsmCheckpointOverflowLoad(lsm_db *pDb, Freelist *);

int lsmCheckpointRecover(lsm_db *);
int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **);

int lsmCheckpointLoadWorker(lsm_db *pDb);
int lsmCheckpointStore(lsm_db *pDb, int);

int lsmCheckpointLoad(lsm_db *pDb, int *);
................................................................................
u32 lsmCheckpointNWrite(u32 *, int);
i64 lsmCheckpointLogOffset(u32 *);
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);

int lsmCheckpointSaveWorker(lsm_db *pDb, int, int);
int lsmDatabaseFull(lsm_db *pDb);
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog, u32 *pnWrite);


/* 
** Functions from file "lsm_tree.c".
*/
................................................................................
void lsmDbDatabaseRelease(lsm_db *);

int lsmBeginReadTrans(lsm_db *);
int lsmBeginWriteTrans(lsm_db *);
int lsmBeginFlush(lsm_db *);

int lsmBeginWork(lsm_db *);
void lsmFinishWork(lsm_db *, int, int, int *);

int lsmFinishRecovery(lsm_db *);
void lsmFinishReadTrans(lsm_db *);
int lsmFinishWriteTrans(lsm_db *, int);
int lsmFinishFlush(lsm_db *, int);

int lsmSnapshotSetFreelist(lsm_db *, int *, int);







<
<
<
<
<
<
<
<







 







<











<
<
<
<







 







|







 







|







58
59
60
61
62
63
64








65
66
67
68
69
70
71
...
489
490
491
492
493
494
495

496
497
498
499
500
501
502
503
504
505
506




507
508
509
510
511
512
513
...
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
...
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789

/* "mmap" mode is currently only used in environments with 64-bit address 
** spaces. The following macro is used to test for this.  */
#define LSM_IS_64_BIT (sizeof(void*)==8)

#define LSM_AUTOWORK_QUANT 32









typedef struct Database Database;
typedef struct DbLog DbLog;
typedef struct FileSystem FileSystem;
typedef struct Freelist Freelist;
typedef struct FreelistEntry FreelistEntry;
typedef struct Level Level;
typedef struct LogMark LogMark;
................................................................................
  i64 iId;                        /* Snapshot id */
  i64 iLogOff;                    /* Log file offset */

  /* Used by worker snapshots only */
  int nBlock;                     /* Number of blocks in database file */
  Pgno aiAppend[LSM_APPLIST_SZ];  /* Append point list */
  Freelist freelist;              /* Free block list */

  u32 nWrite;                     /* Total number of pages written to disk */
};
#define LSM_INITIAL_SNAPSHOT_ID 11

/*
** Functions from file "lsm_ckpt.c".
*/
int lsmCheckpointWrite(lsm_db *, u32 *);
int lsmCheckpointLevels(lsm_db *, int, void **, int *);
int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);





int lsmCheckpointRecover(lsm_db *);
int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **);

int lsmCheckpointLoadWorker(lsm_db *pDb);
int lsmCheckpointStore(lsm_db *pDb, int);

int lsmCheckpointLoad(lsm_db *pDb, int *);
................................................................................
u32 lsmCheckpointNWrite(u32 *, int);
i64 lsmCheckpointLogOffset(u32 *);
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);

int lsmCheckpointSaveWorker(lsm_db *pDb, int);
int lsmDatabaseFull(lsm_db *pDb);
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog, u32 *pnWrite);


/* 
** Functions from file "lsm_tree.c".
*/
................................................................................
void lsmDbDatabaseRelease(lsm_db *);

int lsmBeginReadTrans(lsm_db *);
int lsmBeginWriteTrans(lsm_db *);
int lsmBeginFlush(lsm_db *);

int lsmBeginWork(lsm_db *);
void lsmFinishWork(lsm_db *, int, int *);

int lsmFinishRecovery(lsm_db *);
void lsmFinishReadTrans(lsm_db *);
int lsmFinishWriteTrans(lsm_db *, int);
int lsmFinishFlush(lsm_db *, int);

int lsmSnapshotSetFreelist(lsm_db *, int *, int);

Changes to src/lsm_ckpt.c.

163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
...
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
...
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
....
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
....
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
 + (((x)&0x00FF0000)>>8)  + (((x)&0xFF000000)>>24) \
)

static const int one = 1;
#define LSM_LITTLE_ENDIAN (*(u8 *)(&one))

/* Sizes, in integers, of various parts of the checkpoint. */
#define CKPT_HDR_SIZE         9
#define CKPT_LOGPTR_SIZE      4
#define CKPT_APPENDLIST_SIZE  (LSM_APPLIST_SZ * 2)

/* A #define to describe each integer in the checkpoint header. */
#define CKPT_HDR_ID_MSW   0
#define CKPT_HDR_ID_LSW   1
#define CKPT_HDR_NCKPT    2
#define CKPT_HDR_NBLOCK   3
#define CKPT_HDR_BLKSZ    4
#define CKPT_HDR_NLEVEL   5
#define CKPT_HDR_PGSZ     6
#define CKPT_HDR_OVFL     7
#define CKPT_HDR_NWRITE   8

#define CKPT_HDR_LO_MSW     9
#define CKPT_HDR_LO_LSW    10
#define CKPT_HDR_LO_CKSUM1 11
#define CKPT_HDR_LO_CKSUM2 12

typedef struct CkptBuffer CkptBuffer;

/*
** Dynamic buffer used to accumulate data for a checkpoint.
*/
struct CkptBuffer {
................................................................................
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, 0, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);
................................................................................
    *pnVal = 0;
    *paVal = 0;
  }

  return rc;
}

/*
** The worker lock must be held to call this function.
**
** The function serializes and returns the data that should be stored as
** the FREELIST system record.
*/
int lsmCheckpointOverflow(
  lsm_db *pDb,                    /* Database handle (must hold worker lock) */
  void **ppVal,                   /* OUT: lsmMalloc'd buffer */
  int *pnVal,                     /* OUT: Size of *ppVal in bytes */
  int *pnOvfl                     /* OUT: Number of freelist entries in buf */
){
  assert( 0 );
#if 0
  int rc = LSM_OK;
  int nRet;
  Snapshot *p = pDb->pWorker;

  assert( lsmShmAssertWorker(pDb) );
  assert( pnOvfl && ppVal && pnVal );
  assert( pDb->nMaxFreelist>=2 && pDb->nMaxFreelist<=LSM_MAX_FREELIST_ENTRIES );

  if( p->nFreelistOvfl ){
    rc = lsmCheckpointOverflowLoad(pDb, &p->freelist);
    if( rc!=LSM_OK ) return rc;
    p->nFreelistOvfl = 0;
  }

  if( p->freelist.nEntry<=pDb->nMaxFreelist ){
    nRet = 0;
    *pnVal = 0;
    *ppVal = 0;
  }else{
    int i;                        /* Iterator variable */
    int iOut = 0;                 /* Current size of blob in ckpt */
    CkptBuffer ckpt;              /* Used to build FREELIST blob */

    nRet = (p->freelist.nEntry - pDb->nMaxFreelist);

    memset(&ckpt, 0, sizeof(CkptBuffer));
    ckpt.pEnv = pDb->pEnv;
    for(i=p->freelist.nEntry-nRet; rc==LSM_OK && i<p->freelist.nEntry; i++){
      FreelistEntry *pEntry = &p->freelist.aEntry[i];
      ckptSetValue(&ckpt, iOut++, pEntry->iBlk, &rc);
      ckptSetValue(&ckpt, iOut++, (pEntry->iId >> 32) & 0xFFFFFFFF, &rc);
      ckptSetValue(&ckpt, iOut++, pEntry->iId & 0xFFFFFFFF, &rc);
    }
    ckptChangeEndianness(ckpt.aCkpt, iOut);

    *ppVal = ckpt.aCkpt;
    *pnVal = iOut*sizeof(u32);
  }

  *pnOvfl = nRet;
  return rc;
#endif
}

/*
** The connection must be the worker in order to call this function.
**
** True is returned if there are currently too many free-list entries
** in-memory to store in a checkpoint. Before calling CheckpointSaveWorker()
** to save the current worker snapshot, a new top-level LSM segment must
** be created so that some of them can be written to the LSM. 
*/
int lsmCheckpointOverflowRequired(lsm_db *pDb){
  Snapshot *p = pDb->pWorker;
  assert( 0 );
  assert( lsmShmAssertWorker(pDb) );
  return (p->freelist.nEntry > pDb->nMaxFreelist || p->nFreelistOvfl>0);
}

/*
** Connection pDb must be the worker to call this function.
**
** Load the FREELIST record from the database. Decode it and append the
** results to list pFreelist.
*/
int lsmCheckpointOverflowLoad(
  lsm_db *pDb,
  Freelist *pFreelist
){
  assert( 0 );
#if 0
  int rc;
  int nVal = 0;
  void *pVal = 0;
  assert( lsmShmAssertWorker(pDb) );

  /* Load the blob of data from the LSM. If that is successful (and the
  ** blob is greater than zero bytes in size), decode the contents and
  ** merge them into the current contents of *pFreelist.  */
  rc = lsmSortedLoadFreelist(pDb, &pVal, &nVal);
  if( pVal ){
    u32 *aFree = (u32 *)pVal;
    int nFree = nVal / sizeof(int);
    ckptChangeEndianness(aFree, nFree);
    if( (nFree % 3) ){
      rc = LSM_CORRUPT_BKPT;
    }else{
      int iNew = 0;               /* Offset of next element in aFree[] */
      int iOld = 0;               /* Next element in freelist fl */
      Freelist fl = *pFreelist;   /* Original contents of *pFreelist */

      memset(pFreelist, 0, sizeof(Freelist));
      while( rc==LSM_OK && (iNew<nFree || iOld<fl.nEntry) ){
        int iBlk;
        i64 iId;

        if( iOld>=fl.nEntry ){
          iBlk = aFree[iNew];
          iId = ((i64)(aFree[iNew+1])<<32) + (i64)aFree[iNew+2];
          iNew += 3;
        }else if( iNew>=nFree ){
          iBlk = fl.aEntry[iOld].iBlk;
          iId = fl.aEntry[iOld].iId;
          iOld += 1;
        }else{
          iId = ((i64)(aFree[iNew+1])<<32) + (i64)aFree[iNew+2];
          if( iId<fl.aEntry[iOld].iId ){
            iBlk = aFree[iNew];
            iNew += 3;
          }else{
            iBlk = fl.aEntry[iOld].iBlk;
            iId = fl.aEntry[iOld].iId;
            iOld += 1;
          }
        }

        rc = lsmFreelistAppend(pDb->pEnv, pFreelist, iBlk, iId);
      }
      lsmFree(pDb->pEnv, fl.aEntry);

#ifdef LSM_DEBUG
      if( rc==LSM_OK ){
        int i;
        for(i=1; rc==LSM_OK && i<pFreelist->nEntry; i++){
          assert( pFreelist->aEntry[i].iId >= pFreelist->aEntry[i-1].iId );
        }
        assert( pFreelist->nEntry==(fl.nEntry + nFree/3) );
      }
#endif
    }

    lsmFree(pDb->pEnv, pVal);
  }

  return rc;
#endif
}

/*
** Read the checkpoint id from meta-page pPg.
*/
static i64 ckptLoadId(MetaPage *pPg){
  i64 ret = 0;
  if( pPg ){
    int nData;
................................................................................
    for(i=0; i<LSM_APPLIST_SZ; i++){
      u32 *a = &aCkpt[CKPT_HDR_SIZE + CKPT_LOGPTR_SIZE + i*2];
      pNew->aiAppend[i] = ckptRead64(a);
    }

    /* Copy the free-list */
    if( bInclFreelist ){
      pNew->nFreelistOvfl = aCkpt[CKPT_HDR_OVFL];
      nFree = aCkpt[iIn++];
      if( nFree ){
        pNew->freelist.aEntry = (FreelistEntry *)lsmMallocZeroRc(
            pDb->pEnv, sizeof(FreelistEntry)*nFree, &rc
        );
        if( rc==LSM_OK ){
          int i;
................................................................................
**
** This function updates the shared-memory worker and client snapshots with
** the new snapshot produced by the work performed by pDb.
**
** If successful, LSM_OK is returned. Otherwise, if an error occurs, an LSM
** error code is returned.
*/
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){
  Snapshot *pSnap = pDb->pWorker;
  ShmHeader *pShm = pDb->pShmhdr;
  void *p = 0;
  int n = 0;
  int rc;

  rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId+1, 1, &p, &n);







|











<
|

|
|
|
|







 







<







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







<







 







|







163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181

182
183
184
185
186
187
188
189
190
191
192
193
194
...
436
437
438
439
440
441
442

443
444
445
446
447
448
449
...
653
654
655
656
657
658
659
























































































































































660
661
662
663
664
665
666
...
922
923
924
925
926
927
928

929
930
931
932
933
934
935
...
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
 + (((x)&0x00FF0000)>>8)  + (((x)&0xFF000000)>>24) \
)

static const int one = 1;
#define LSM_LITTLE_ENDIAN (*(u8 *)(&one))

/* Sizes, in integers, of various parts of the checkpoint. */
#define CKPT_HDR_SIZE         8
#define CKPT_LOGPTR_SIZE      4
#define CKPT_APPENDLIST_SIZE  (LSM_APPLIST_SZ * 2)

/* A #define to describe each integer in the checkpoint header. */
#define CKPT_HDR_ID_MSW   0
#define CKPT_HDR_ID_LSW   1
#define CKPT_HDR_NCKPT    2
#define CKPT_HDR_NBLOCK   3
#define CKPT_HDR_BLKSZ    4
#define CKPT_HDR_NLEVEL   5
#define CKPT_HDR_PGSZ     6

#define CKPT_HDR_NWRITE   7

#define CKPT_HDR_LO_MSW     8
#define CKPT_HDR_LO_LSW     9
#define CKPT_HDR_LO_CKSUM1 10
#define CKPT_HDR_LO_CKSUM2 11

typedef struct CkptBuffer CkptBuffer;

/*
** Dynamic buffer used to accumulate data for a checkpoint.
*/
struct CkptBuffer {
................................................................................
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);

  ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);
................................................................................
    *pnVal = 0;
    *paVal = 0;
  }

  return rc;
}

























































































































































/*
** Read the checkpoint id from meta-page pPg.
*/
static i64 ckptLoadId(MetaPage *pPg){
  i64 ret = 0;
  if( pPg ){
    int nData;
................................................................................
    for(i=0; i<LSM_APPLIST_SZ; i++){
      u32 *a = &aCkpt[CKPT_HDR_SIZE + CKPT_LOGPTR_SIZE + i*2];
      pNew->aiAppend[i] = ckptRead64(a);
    }

    /* Copy the free-list */
    if( bInclFreelist ){

      nFree = aCkpt[iIn++];
      if( nFree ){
        pNew->freelist.aEntry = (FreelistEntry *)lsmMallocZeroRc(
            pDb->pEnv, sizeof(FreelistEntry)*nFree, &rc
        );
        if( rc==LSM_OK ){
          int i;
................................................................................
**
** This function updates the shared-memory worker and client snapshots with
** the new snapshot produced by the work performed by pDb.
**
** If successful, LSM_OK is returned. Otherwise, if an error occurs, an LSM
** error code is returned.
*/
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush){
  Snapshot *pSnap = pDb->pWorker;
  ShmHeader *pShm = pDb->pShmhdr;
  void *p = 0;
  int n = 0;
  int rc;

  rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId+1, 1, &p, &n);

Changes to src/lsm_file.c.

2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
    lsmStringAppendf(&str, " %d", pArray->iLastPg);

    *pzOut = str.z;
  }

  if( bUnlock ){
    int rcwork = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcwork);
  }
  return rc;
}

/*
** The following macros are used by the integrity-check code. Associated with
** each block in the database is an 8-bit bit mask (the entry in the aUsed[]







|







2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
    lsmStringAppendf(&str, " %d", pArray->iLastPg);

    *pzOut = str.z;
  }

  if( bUnlock ){
    int rcwork = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcwork);
  }
  return rc;
}

/*
** The following macros are used by the integrity-check code. Associated with
** each block in the database is an 8-bit bit mask (the entry in the aUsed[]

Changes to src/lsm_main.c.

368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
...
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
  *pp = pDb->pWorker;
  return rc;
}

static void infoFreeWorker(lsm_db *pDb, int bUnlock){
  if( bUnlock ){
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcdummy);
  }
}

int lsmStructList(
  lsm_db *pDb,                    /* Database handle */
  char **pzOut                    /* OUT: Nul-terminated string (tcl list) */
){
................................................................................
  int rc;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  lsmStringInit(&s, pDb->pEnv);
  lsmStringAppendf(&s, "%d+%d",pWorker->freelist.nEntry,pWorker->nFreelistOvfl);
  for(i=0; i<pWorker->freelist.nEntry; i++){
    FreelistEntry *p = &pWorker->freelist.aEntry[i];
    lsmStringAppendf(&s, " {%d %d}", p->iBlk, (int)p->iId);
  }
  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;

  /* Release the snapshot and return */







|







 







|







368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
...
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
  *pp = pDb->pWorker;
  return rc;
}

static void infoFreeWorker(lsm_db *pDb, int bUnlock){
  if( bUnlock ){
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcdummy);
  }
}

int lsmStructList(
  lsm_db *pDb,                    /* Database handle */
  char **pzOut                    /* OUT: Nul-terminated string (tcl list) */
){
................................................................................
  int rc;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  lsmStringInit(&s, pDb->pEnv);
  lsmStringAppendf(&s, "%d", pWorker->freelist.nEntry);
  for(i=0; i<pWorker->freelist.nEntry; i++){
    FreelistEntry *p = &pWorker->freelist.aEntry[i];
    lsmStringAppendf(&s, " {%d %d}", p->iBlk, (int)p->iId);
  }
  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;

  /* Release the snapshot and return */

Changes to src/lsm_shared.c.

559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578

579
580
581
582
583
584
585
...
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
...
816
817
818
819
820
821
822

823
824
825
826
827
828
829
830
831
832
833
834
835
836
...
852
853
854
855
856
857
858








859
860
861
862
863
864
865
....
1092
1093
1094
1095
1096
1097
1098

1099
1100
1101
1102
1103
1104
1105
  **   * May be used by a database client in the future, or
  **   * Is the most recently checkpointed snapshot (i.e. the one that will
  **     be used following recovery if a failure occurs at this point).
  */
  rc = lsmCheckpointSynced(pDb, &iInUse, 0, 0);
  if( rc==LSM_OK && iInUse==0 ) iInUse = p->iId;
  if( rc==LSM_OK && pDb->pClient ) iInUse = LSM_MIN(iInUse, pDb->pClient->iId);

  if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);

  /* Query the free block list for a suitable block */
  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);

  /* If a block was found in the free block list, use it and remove it from 
  ** the list. Otherwise, if no suitable block was found, allocate one from
  ** the end of the file.  */
  if( rc==LSM_OK ){
    if( iRet>0 ){
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "reusing block %d", iRet);

#endif
      rc = freelistAppend(pDb, iRet, -1);
    }else{
      iRet = ++(p->nBlock);
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
#endif
................................................................................

/*
** Argument bFlush is true if the contents of the in-memory tree has just
** been flushed to disk. The significance of this is that once the snapshot
** created to hold the updated state of the database is synced to disk, log
** file space can be recycled.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int nOvfl, int *pRc){
  assert( *pRc!=0 || pDb->pWorker );
  if( pDb->pWorker ){
    /* If no error has occurred, serialize the worker snapshot and write
    ** it to shared memory.  */
    assert( pDb->pWorker->nFreelistOvfl==0 || nOvfl==0 );
    if( *pRc==LSM_OK ){
      *pRc = lsmCheckpointSaveWorker(pDb, bFlush, nOvfl);
    }
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
}
................................................................................
      }
      if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
#if 0
if( rc==LSM_OK && pDb->pClient ){

  printf("reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n",
      (void *)pDb,
      (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid, 
      (int)pDb->treehdr.root.iTransId,
      (int)pDb->treehdr.iOldShmid
  );
  fflush(stdout);
}
#endif
  }

  if( rc!=LSM_OK ){
    lsmReleaseReadlock(pDb);
  }
................................................................................

#if 0
  if( pClient ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
    pDb->pClient = 0;
  }
#endif








  if( pDb->iReader>=0 ) lsmReleaseReadlock(pDb);
}

/*
** Open a write transaction.
*/
int lsmBeginWriteTrans(lsm_db *pDb){
................................................................................
          ** the caller in this case.  */
          return rc;
        }
      }
    }
  }


  return LSM_OK;
}

int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){
  if( db->treehdr.iUsedShmid==iShmid ){
    *pbInUse = 1;
    return LSM_OK;







<











|
>







 







|




<

|







 







>
|





<







 







>
>
>
>
>
>
>
>







 







>







559
560
561
562
563
564
565

566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
...
728
729
730
731
732
733
734
735
736
737
738
739

740
741
742
743
744
745
746
747
748
...
815
816
817
818
819
820
821
822
823
824
825
826
827
828

829
830
831
832
833
834
835
...
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
....
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
  **   * May be used by a database client in the future, or
  **   * Is the most recently checkpointed snapshot (i.e. the one that will
  **     be used following recovery if a failure occurs at this point).
  */
  rc = lsmCheckpointSynced(pDb, &iInUse, 0, 0);
  if( rc==LSM_OK && iInUse==0 ) iInUse = p->iId;
  if( rc==LSM_OK && pDb->pClient ) iInUse = LSM_MIN(iInUse, pDb->pClient->iId);

  if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);

  /* Query the free block list for a suitable block */
  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);

  /* If a block was found in the free block list, use it and remove it from 
  ** the list. Otherwise, if no suitable block was found, allocate one from
  ** the end of the file.  */
  if( rc==LSM_OK ){
    if( iRet>0 ){
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, 
          "reusing block %d (snapshot-in-use=%lld)", iRet, iInUse);
#endif
      rc = freelistAppend(pDb, iRet, -1);
    }else{
      iRet = ++(p->nBlock);
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
#endif
................................................................................

/*
** Argument bFlush is true if the contents of the in-memory tree has just
** been flushed to disk. The significance of this is that once the snapshot
** created to hold the updated state of the database is synced to disk, log
** file space can be recycled.
*/
void lsmFinishWork(lsm_db *pDb, int bFlush, int *pRc){
  assert( *pRc!=0 || pDb->pWorker );
  if( pDb->pWorker ){
    /* If no error has occurred, serialize the worker snapshot and write
    ** it to shared memory.  */

    if( *pRc==LSM_OK ){
      *pRc = lsmCheckpointSaveWorker(pDb, bFlush);
    }
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
}
................................................................................
      }
      if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
#if 0
if( rc==LSM_OK && pDb->pClient ){
  fprintf(stderr, 
      "reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n",
      (void *)pDb,
      (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid, 
      (int)pDb->treehdr.root.iTransId,
      (int)pDb->treehdr.iOldShmid
  );

}
#endif
  }

  if( rc!=LSM_OK ){
    lsmReleaseReadlock(pDb);
  }
................................................................................

#if 0
  if( pClient ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
    pDb->pClient = 0;
  }
#endif

#if 0
if( pDb->pClient && pDb->iReader>=0 ){
  fprintf(stderr, 
      "finished reading %p: snapshot:%d\n", (void *)pDb, (int)pDb->pClient->iId
  );
}
#endif
  if( pDb->iReader>=0 ) lsmReleaseReadlock(pDb);
}

/*
** Open a write transaction.
*/
int lsmBeginWriteTrans(lsm_db *pDb){
................................................................................
          ** the caller in this case.  */
          return rc;
        }
      }
    }
  }

  *piInUse = iInUse;
  return LSM_OK;
}

int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){
  if( db->treehdr.iUsedShmid==iShmid ){
    *pbInUse = 1;
    return LSM_OK;

Changes to src/lsm_sorted.c.

202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
....
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
....
3884
3885
3886
3887
3888
3889
3890
3891
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
....
3915
3916
3917
3918
3919
3920
3921
3922
3923
3924
3925
3926
3927
3928
3929
....
3966
3967
3968
3969
3970
3971
3972
3973
3974
3975
3976

3977
3978


3979
3980
3981
3982
3983
3984
3985
3986
3987
3988
3989
3990
3991
3992
3993
3994
3995
....
4455
4456
4457
4458
4459
4460
4461
4462
4463
4464
4465
4466
4467
4468
4469
....
4503
4504
4505
4506
4507
4508
4509
4510
4511
4512
4513
4514
4515
4516
4517
4518
4519
4520
4521
4522
4523
....
4524
4525
4526
4527
4528
4529
4530
4531
4532
4533
4534
4535
4536
4537
4538
4539
4540
4541
4542
4543
4544
4545
4546
4547
4548
4549
4550
4551
4552
4553
4554
4555
4556
4557
....
4660
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
4676
4677
4678
4679
4680
4681
4682
4683
4684
  BtreeCursor *pBtCsr;            /* b-tree cursor (db writes only) */

  /* Comparison results */
  int nTree;                      /* Size of aTree[] array */
  int *aTree;                     /* Array of comparison results */

  /* Used by cursors flushing the in-memory tree only */
  int *pnOvfl;                    /* Number of free-list entries to store */
  void *pSystemVal;               /* Pointer to buffer to free */

  /* Used by worker cursors only */
  Pgno *pPrevMergePtr;
};

/*
................................................................................
}

/*
** If the free-block list is not empty, then have this cursor visit a key
** with (a) the system bit set, and (b) the key "FREELIST" and (c) a value 
** blob containing the serialized free-block list.
*/
static int multiCursorVisitFreelist(MultiCursor *pCsr, int *pnOvfl){
  int rc = LSM_OK;

  assert( pCsr );
  pCsr->pnOvfl = pnOvfl;
  pCsr->flags |= CURSOR_FLUSH_FREELIST;
  pCsr->pSystemVal = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc);
  return rc;
}

/*
** Allocate and return a new database cursor.
................................................................................
    pDb->xWork(pDb, pDb->pWorkCtx);
  }
}

static int sortedNewToplevel(
  lsm_db *pDb,                    /* Connection handle */
  int eTree,                      /* One of the TREE_XXX constants */
  int *pnOvfl,                    /* OUT: Number of free-list entries stored */
  int *pnWrite                    /* OUT: Number of database pages written */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int nWrite = 0;                 /* Number of database pages written */
  Freelist freelist;
  assert( pnOvfl );

  assert( pDb->bUseFreelist==0 );
  pDb->pFreelist = &freelist;
  pDb->bUseFreelist = 1;
  memset(&freelist, 0, sizeof(freelist));

  /* Allocate the new level structure to write to. */
................................................................................

  /* Create a cursor to gather the data required by the new segment. The new
  ** segment contains everything in the tree and pointers to the next segment
  ** in the database (if any).  */
  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    pCsr->pDb = pDb;
    rc = multiCursorVisitFreelist(pCsr, pnOvfl);
    if( rc==LSM_OK ){
      rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
    }
    if( rc==LSM_OK && pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){
      pDel = &pNext->lhs;
      rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);
    }
................................................................................

    nWrite = mergeworker.nWork;
    mergeWorkerShutdown(&mergeworker, &rc);
    pNew->pMerge = 0;
    pNew->iAge = 0;
  }

  /* Link the new level into the top of the tree. */
  if( rc==LSM_OK && pNew->lhs.iFirst ){
    if( pDel ) pDel->iRoot = 0;
  }else{

    lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
    sortedFreeLevel(pDb->pEnv, pNew);


  }

#if 0
  if( pNew->lhs.iFirst ){
    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
  }
#endif

  if( rc==LSM_OK ){
    assert( pNew->lhs.iFirst || pDb->pWorker->freelist.nEntry==0 );
    if( freelist.nEntry ){
      Freelist *p = &pDb->pWorker->freelist;
      lsmFree(pDb->pEnv, p->aEntry);
      memcpy(p, &freelist, sizeof(freelist));
      freelist.aEntry = 0;
    }else{
      pDb->pWorker->freelist.nEntry = 0;
................................................................................
  int bShutdown,
  int flags, 
  int nPage,                      /* Number of pages to write to disk */
  int *pnWrite,                   /* OUT: Pages actually written to disk */
  int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
){
  int rc = LSM_OK;                /* Return code */
  int nOvfl = 0;
  int bDirty = 0;
  int nMax = nPage;               /* Maximum pages to write to disk */
  int nRem = nPage;
  int bCkpt = 0;

  /* Open the worker 'transaction'. It will be closed before this function
  ** returns.  */
................................................................................
      rc = sortedWork(pDb, nRem, 0, 1, &nPg);
      nRem -= nPg;
      assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
    }

    if( rc==LSM_OK && nRem>0 ){
      int nPg = 0;
      rc = sortedNewToplevel(pDb, TREE_OLD, &nOvfl, &nPg);
      nRem -= nPg;
      if( rc==LSM_OK ){
        if( pDb->nTransOpen>0 ){
          lsmTreeDiscardOld(pDb);
        }
        rc = lsmCheckpointSaveWorker(pDb, 1, 0);
      }
    }
  }

  /* If nPage is still greater than zero, do some merging. */
  if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
    int nPg = 0;
................................................................................
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg);
    nRem -= nPg;
    if( nPg ) bDirty = 1;
  }

  /* If the in-memory part of the free-list is too large, write a new 
  ** top-level containing just the in-memory free-list entries to disk.
  */
  if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > LSM_MAX_FREELIST_ENTRIES ){
    int nPg = 0;
    while( rc==LSM_OK && sortedDbIsFull(pDb) ){
      rc = sortedWork(pDb, 16, 0, 1, &nPg);
      nRem -= nPg;
    }
    if( rc==LSM_OK ){
      rc = sortedNewToplevel(pDb, TREE_NONE, &nOvfl, &nPg);
    }
    nRem -= nPg;
    if( nPg ) bDirty = 1;
  }

  if( rc==LSM_OK && bDirty ){
    lsmFinishWork(pDb, 0, 0, &rc);
  }else{
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcdummy);
  }
  assert( pDb->pWorker==0 );

  if( rc==LSM_OK ){
    if( pnWrite ) *pnWrite = (nMax - nRem);
    if( pbCkpt ) *pbCkpt = (bCkpt && nRem<=0);
  }else{
................................................................................

/*
** This function is only called during system shutdown. The contents of
** any in-memory trees present (old or current) are written out to disk.
*/
int lsmFlushTreeToDisk(lsm_db *pDb){
  int rc;
  int nOvfl = 0;

  rc = lsmBeginWork(pDb);
  while( rc==LSM_OK && sortedDbIsFull(pDb) ){
    rc = sortedWork(pDb, 256, 0, 1, 0);
  }

  if( rc==LSM_OK ){
    rc = sortedNewToplevel(pDb, TREE_BOTH, &nOvfl, 0);
  }
  lsmFinishWork(pDb, 1, nOvfl, &rc);
  return rc;
}

/*
** Return a string representation of the segment passed as the only argument.
** Space for the returned string is allocated using lsmMalloc(), and should
** be freed by the caller using lsmFree().







<







 







|

<
<
<







 







<









<







 







|







 







<
|
<
<
>


>
>
|
<

<

<


<
<







 







<







 







|





|







 







|
<
|






|






|


|







 







<







|

|







202
203
204
205
206
207
208

209
210
211
212
213
214
215
....
2258
2259
2260
2261
2262
2263
2264
2265
2266



2267
2268
2269
2270
2271
2272
2273
....
3880
3881
3882
3883
3884
3885
3886

3887
3888
3889
3890
3891
3892
3893
3894
3895

3896
3897
3898
3899
3900
3901
3902
....
3909
3910
3911
3912
3913
3914
3915
3916
3917
3918
3919
3920
3921
3922
3923
....
3960
3961
3962
3963
3964
3965
3966

3967


3968
3969
3970
3971
3972
3973

3974

3975

3976
3977


3978
3979
3980
3981
3982
3983
3984
....
4444
4445
4446
4447
4448
4449
4450

4451
4452
4453
4454
4455
4456
4457
....
4491
4492
4493
4494
4495
4496
4497
4498
4499
4500
4501
4502
4503
4504
4505
4506
4507
4508
4509
4510
4511
....
4512
4513
4514
4515
4516
4517
4518
4519

4520
4521
4522
4523
4524
4525
4526
4527
4528
4529
4530
4531
4532
4533
4534
4535
4536
4537
4538
4539
4540
4541
4542
4543
4544
....
4647
4648
4649
4650
4651
4652
4653

4654
4655
4656
4657
4658
4659
4660
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
  BtreeCursor *pBtCsr;            /* b-tree cursor (db writes only) */

  /* Comparison results */
  int nTree;                      /* Size of aTree[] array */
  int *aTree;                     /* Array of comparison results */

  /* Used by cursors flushing the in-memory tree only */

  void *pSystemVal;               /* Pointer to buffer to free */

  /* Used by worker cursors only */
  Pgno *pPrevMergePtr;
};

/*
................................................................................
}

/*
** If the free-block list is not empty, then have this cursor visit a key
** with (a) the system bit set, and (b) the key "FREELIST" and (c) a value 
** blob containing the serialized free-block list.
*/
static int multiCursorVisitFreelist(MultiCursor *pCsr){
  int rc = LSM_OK;



  pCsr->flags |= CURSOR_FLUSH_FREELIST;
  pCsr->pSystemVal = lsmMallocRc(pCsr->pDb->pEnv, 4 + 8, &rc);
  return rc;
}

/*
** Allocate and return a new database cursor.
................................................................................
    pDb->xWork(pDb, pDb->pWorkCtx);
  }
}

static int sortedNewToplevel(
  lsm_db *pDb,                    /* Connection handle */
  int eTree,                      /* One of the TREE_XXX constants */

  int *pnWrite                    /* OUT: Number of database pages written */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int nWrite = 0;                 /* Number of database pages written */
  Freelist freelist;


  assert( pDb->bUseFreelist==0 );
  pDb->pFreelist = &freelist;
  pDb->bUseFreelist = 1;
  memset(&freelist, 0, sizeof(freelist));

  /* Allocate the new level structure to write to. */
................................................................................

  /* Create a cursor to gather the data required by the new segment. The new
  ** segment contains everything in the tree and pointers to the next segment
  ** in the database (if any).  */
  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    pCsr->pDb = pDb;
    rc = multiCursorVisitFreelist(pCsr);
    if( rc==LSM_OK ){
      rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
    }
    if( rc==LSM_OK && pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){
      pDel = &pNext->lhs;
      rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);
    }
................................................................................

    nWrite = mergeworker.nWork;
    mergeWorkerShutdown(&mergeworker, &rc);
    pNew->pMerge = 0;
    pNew->iAge = 0;
  }


  if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){


    assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 );
    lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
    sortedFreeLevel(pDb->pEnv, pNew);
  }else{
    if( pDel ) pDel->iRoot = 0;


#if 0

    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");

#endif



    if( freelist.nEntry ){
      Freelist *p = &pDb->pWorker->freelist;
      lsmFree(pDb->pEnv, p->aEntry);
      memcpy(p, &freelist, sizeof(freelist));
      freelist.aEntry = 0;
    }else{
      pDb->pWorker->freelist.nEntry = 0;
................................................................................
  int bShutdown,
  int flags, 
  int nPage,                      /* Number of pages to write to disk */
  int *pnWrite,                   /* OUT: Pages actually written to disk */
  int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
){
  int rc = LSM_OK;                /* Return code */

  int bDirty = 0;
  int nMax = nPage;               /* Maximum pages to write to disk */
  int nRem = nPage;
  int bCkpt = 0;

  /* Open the worker 'transaction'. It will be closed before this function
  ** returns.  */
................................................................................
      rc = sortedWork(pDb, nRem, 0, 1, &nPg);
      nRem -= nPg;
      assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
    }

    if( rc==LSM_OK && nRem>0 ){
      int nPg = 0;
      rc = sortedNewToplevel(pDb, TREE_OLD, &nPg);
      nRem -= nPg;
      if( rc==LSM_OK ){
        if( pDb->nTransOpen>0 ){
          lsmTreeDiscardOld(pDb);
        }
        rc = lsmCheckpointSaveWorker(pDb, 1);
      }
    }
  }

  /* If nPage is still greater than zero, do some merging. */
  if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
    int nPg = 0;
................................................................................
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg);
    nRem -= nPg;
    if( nPg ) bDirty = 1;
  }

  /* If the in-memory part of the free-list is too large, write a new 
  ** top-level containing just the in-memory free-list entries to disk. */

  if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
    int nPg = 0;
    while( rc==LSM_OK && sortedDbIsFull(pDb) ){
      rc = sortedWork(pDb, 16, 0, 1, &nPg);
      nRem -= nPg;
    }
    if( rc==LSM_OK ){
      rc = sortedNewToplevel(pDb, TREE_NONE, &nPg);
    }
    nRem -= nPg;
    if( nPg ) bDirty = 1;
  }

  if( rc==LSM_OK && bDirty ){
    lsmFinishWork(pDb, 0, &rc);
  }else{
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcdummy);
  }
  assert( pDb->pWorker==0 );

  if( rc==LSM_OK ){
    if( pnWrite ) *pnWrite = (nMax - nRem);
    if( pbCkpt ) *pbCkpt = (bCkpt && nRem<=0);
  }else{
................................................................................

/*
** This function is only called during system shutdown. The contents of
** any in-memory trees present (old or current) are written out to disk.
*/
int lsmFlushTreeToDisk(lsm_db *pDb){
  int rc;


  rc = lsmBeginWork(pDb);
  while( rc==LSM_OK && sortedDbIsFull(pDb) ){
    rc = sortedWork(pDb, 256, 0, 1, 0);
  }

  if( rc==LSM_OK ){
    rc = sortedNewToplevel(pDb, TREE_BOTH, 0);
  }
  lsmFinishWork(pDb, 1, &rc);
  return rc;
}

/*
** Return a string representation of the segment passed as the only argument.
** Space for the returned string is allocated using lsmMalloc(), and should
** be freed by the caller using lsmFree().