SQLite4
Changes On Branch block-redirects
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Changes In Branch block-redirects Excluding Merge-Ins

This is equivalent to a diff from 4d1b506594 to 520f3729b8

2013-01-28
08:56
Merge block-redirects branch with trunk. This changes the lsm file format. check-in: 647229e983 user: dan tags: trunk
2013-01-26
20:18
Handle calls to lsm_work() with the nPage parameter set to not greater than zero. Remove some dead code. Leaf check-in: 520f3729b8 user: dan tags: block-redirects
19:17
Fix problems with redirected blocks in compressed databases. check-in: 930b7e4507 user: dan tags: block-redirects
2013-01-20
00:07
Enable the AUTOINCREMENT feature. check-in: 5442b20bf6 user: drh tags: trunk
2013-01-19
20:07
Change the lsm file-format to allow a small number of blocks belonging to the oldest segment in the database to be moved without modifying their content. This makes it easier to compact a database file to the minimum required size. check-in: 09251cee6a user: dan tags: block-redirects
19:49
Insert a value in place of NULL in an INTEGER PRIMARY KEY, even if the IPK column is omitted from the VALUES list in the INSERT statement. check-in: 4d1b506594 user: drh tags: trunk
16:14
Inserting NULL into a INTEGER PRIMARY KEY fills that key with the next available integer value. The sqlite4_last_insert_rowid() function now works for those cases. check-in: 697ee9faad user: drh tags: trunk

Changes to lsm-test/README.

    24     24                 the system recovers and other clients proceed unaffected if
    25     25                 a process fails in the middle of a write transaction.
    26     26   
    27     27                 The difference from lsmtest2.c is that this file tests
    28     28                 live-recovery (recovery from a failure that occurs while other
    29     29                 clients are still running) whereas lsmtest2.c tests recovery
    30     30                 from a system or power failure.
           31  +
           32  +  lsmtest9.c: More data tests. These focus on testing that calling
           33  +              lsm_work(nMerge=1) to compact the database does not corrupt it.
           34  +              In other words, that databases containing block-redirects
           35  +              can be read and written.
    31     36   
    32     37   
    33     38   
    34     39   

Changes to lsm-test/lsmtest.h.

   215    215   void test_data_1(const char *, const char *, int *pRc);
   216    216   void test_data_2(const char *, const char *, int *pRc);
   217    217   void test_data_3(const char *, const char *, int *pRc);
   218    218   void testDbContents(TestDb *, Datasource *, int, int, int, int, int, int *);
   219    219   void testCaseProgress(int, int, int, int *);
   220    220   int testCaseNDot(void);
   221    221   
          222  +void testCompareDb(Datasource *, int, int, TestDb *, TestDb *, int *);
          223  +int testControlDb(TestDb **ppDb);
          224  +
   222    225   typedef struct CksumDb CksumDb;
   223    226   CksumDb *testCksumArrayNew(Datasource *, int, int, int);
   224    227   char *testCksumArrayGet(CksumDb *, int);
   225    228   void testCksumArrayFree(CksumDb *);
   226    229   void testCaseStart(int *pRc, char *zFmt, ...);
   227    230   void testCaseFinish(int rc);
   228    231   void testCaseSkip(void);
................................................................................
   230    233   
   231    234   #define TEST_CKSUM_BYTES 29
   232    235   int testCksumDatabase(TestDb *pDb, char *zOut);
   233    236   int testCountDatabase(TestDb *pDb);
   234    237   void testCompareInt(int, int, int *);
   235    238   void testCompareStr(const char *z1, const char *z2, int *pRc);
   236    239   
          240  +/* lsmtest9.c */
          241  +void test_data_4(const char *, const char *, int *pRc);
          242  +
   237    243   
   238    244   /*
   239    245   ** Similar to the Tcl_GetIndexFromObjStruct() Tcl library function.
   240    246   */
   241    247   #define testArgSelect(w,x,y,z) testArgSelectX(w,x,sizeof(w[0]),y,z)
   242    248   int testArgSelectX(void *, const char *, int, const char *, int *);
   243    249   
   244    250   #ifdef __cplusplus
   245    251   }  /* End of the 'extern "C"' block */
   246    252   #endif
   247    253   
   248    254   #endif

Changes to lsm-test/lsmtest1.c.

    88     88     zRet = testMallocPrintf("data.%s.%s.%d.%d", 
    89     89         zSystem, zData, pTest->nRow, pTest->nVerify
    90     90     );
    91     91     testFree(zData);
    92     92     return zRet;
    93     93   }
    94     94   
    95         -static int testControlDb(TestDb **ppDb){
           95  +int testControlDb(TestDb **ppDb){
    96     96   #ifdef HAVE_KYOTOCABINET
    97     97     return tdb_open("kyotocabinet", "tmp.db", 1, ppDb);
    98     98   #else
    99     99     return tdb_open("sqlite3", ":memory:", 1, ppDb);
   100    100   #endif
   101    101   }
   102    102   
................................................................................
   344    344       if( testCaseBegin(pRc, zPattern, "%s", zName) ){
   345    345         doDataTest1(zSystem, &aTest[i], pRc);
   346    346       }
   347    347       testFree(zName);
   348    348     }
   349    349   }
   350    350   
   351         -static void testCompareDb(
          351  +void testCompareDb(
   352    352     Datasource *pData,
   353    353     int nData,
   354    354     int iSeed,
   355    355     TestDb *pControl,
   356    356     TestDb *pDb,
   357    357     int *pRc
   358    358   ){

Added lsm-test/lsmtest9.c.

            1  +
            2  +#include "lsmtest.h"
            3  +
            4  +#define DATA_SEQUENTIAL TEST_DATASOURCE_SEQUENCE
            5  +#define DATA_RANDOM     TEST_DATASOURCE_RANDOM
            6  +
            7  +typedef struct Datatest4 Datatest4;
            8  +
            9  +/*
           10  +** Test overview:
           11  +**
           12  +**   1. Insert (Datatest4.nRec) records into a database.
           13  +**
           14  +**   2. Repeat (Datatest4.nRepeat) times:
           15  +**
           16  +**      2a. Delete 2/3 of the records in the database.
           17  +**
           18  +**      2b. Run lsm_work(nMerge=1).
           19  +**
           20  +**      2c. Insert as many records as were deleted in 2a.
           21  +**
           22  +**      2d. Check database content is as expected.
           23  +**
           24  +**      2e. If (Datatest4.bReopen) is true, close and reopen the database.
           25  +*/
           26  +struct Datatest4 {
           27  +  /* Datasource definition */
           28  +  DatasourceDefn defn;
           29  +
           30  +  int nRec;
           31  +  int nRepeat;
           32  +  int bReopen;
           33  +};
           34  +
           35  +static void doDataTest4(
           36  +  const char *zSystem,            /* Database system to test */
           37  +  Datatest4 *p,                   /* Structure containing test parameters */
           38  +  int *pRc                        /* OUT: Error code */
           39  +){
           40  +  lsm_db *db = 0;
           41  +  TestDb *pDb;
           42  +  TestDb *pControl;
           43  +  Datasource *pData;
           44  +  int i;
           45  +  int rc = 0;
           46  +  int iDot = 0;
           47  +
           48  +  int nRecOn3 = (p->nRec / 3);
           49  +  int iData = 0;
           50  +
           51  +  /* Start the test case, open a database and allocate the datasource. */
           52  +  rc = testControlDb(&pControl);
           53  +  pDb = testOpen(zSystem, 1, &rc);
           54  +  pData = testDatasourceNew(&p->defn);
           55  +  if( rc==0 ) db = tdb_lsm(pDb);
           56  +
           57  +  testWriteDatasourceRange(pControl, pData, iData, nRecOn3*3, &rc);
           58  +  testWriteDatasourceRange(pDb,      pData, iData, nRecOn3*3, &rc);
           59  +
           60  +  for(i=0; rc==0 && i<p->nRepeat; i++){
           61  +
           62  +    testDeleteDatasourceRange(pControl, pData, iData, nRecOn3*2, &rc);
           63  +    testDeleteDatasourceRange(pDb,      pData, iData, nRecOn3*2, &rc);
           64  +
           65  +    if( db ){
           66  +      int nDone;
           67  +#if 0
           68  +      fprintf(stderr, "lsm_work() start...\n"); fflush(stderr);
           69  +#endif
           70  +      do {
           71  +        nDone = 0;
           72  +        rc = lsm_work(db, 1, (1<<30), &nDone);
           73  +      }while( rc==0 && nDone>0 );
           74  +#if 0 
           75  +      fprintf(stderr, "lsm_work() done...\n"); fflush(stderr);
           76  +#endif
           77  +    }
           78  +
           79  +if( i+1<p->nRepeat ){
           80  +    iData += (nRecOn3*2);
           81  +    testWriteDatasourceRange(pControl, pData, iData+nRecOn3, nRecOn3*2, &rc);
           82  +    testWriteDatasourceRange(pDb,      pData, iData+nRecOn3, nRecOn3*2, &rc);
           83  +
           84  +    testCompareDb(pData, nRecOn3*3, iData, pControl, pDb, &rc);
           85  +
           86  +    /* If Datatest4.bReopen is true, close and reopen the database */
           87  +    if( p->bReopen ){
           88  +      testReopen(&pDb, &rc);
           89  +      if( rc==0 ) db = tdb_lsm(pDb);
           90  +    }
           91  +}
           92  +
           93  +    /* Update the progress dots... */
           94  +    testCaseProgress(i, p->nRepeat, testCaseNDot(), &iDot);
           95  +  }
           96  +
           97  +  testClose(&pDb);
           98  +  testClose(&pControl);
           99  +  testDatasourceFree(pData);
          100  +  testCaseFinish(rc);
          101  +  *pRc = rc;
          102  +}
          103  +
          104  +static char *getName4(const char *zSystem, Datatest4 *pTest){
          105  +  char *zRet;
          106  +  char *zData;
          107  +  zData = testDatasourceName(&pTest->defn);
          108  +  zRet = testMallocPrintf("data4.%s.%s.%d.%d.%d", 
          109  +      zSystem, zData, pTest->nRec, pTest->nRepeat, pTest->bReopen
          110  +  );
          111  +  testFree(zData);
          112  +  return zRet;
          113  +}
          114  +
          115  +void test_data_4(
          116  +  const char *zSystem,            /* Database system name */
          117  +  const char *zPattern,           /* Run test cases that match this pattern */
          118  +  int *pRc                        /* IN/OUT: Error code */
          119  +){
          120  +  Datatest4 aTest[] = {
          121  +      /* defn,                                 nRec, nRepeat, bReopen */
          122  +    { {DATA_RANDOM,     20,25,     500,600}, 10000,      10,       0   },
          123  +    { {DATA_RANDOM,     20,25,     500,600}, 10000,      10,       1   },
          124  +  };
          125  +
          126  +  int i;
          127  +
          128  +  for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
          129  +    char *zName = getName4(zSystem, &aTest[i]);
          130  +    if( testCaseBegin(pRc, zPattern, "%s", zName) ){
          131  +      doDataTest4(zSystem, &aTest[i], pRc);
          132  +    }
          133  +    testFree(zName);
          134  +  }
          135  +}
          136  +
          137  +
          138  +

Changes to lsm-test/lsmtest_func.c.

    42     42     if( rc!=LSM_OK ){
    43     43       testPrintError("lsm_open(): rc=%d\n", rc);
    44     44     }else{
    45     45       rc = lsm_open(pDb, zDb);
    46     46       if( rc!=LSM_OK ){
    47     47         testPrintError("lsm_open(): rc=%d\n", rc);
    48     48       }else{
           49  +      int n = -1;
           50  +      lsm_config(pDb, LSM_CONFIG_BLOCK_SIZE, &n);
           51  +      n = n*2;
           52  +      lsm_config(pDb, LSM_CONFIG_AUTOCHECKPOINT, &n);
           53  +
    49     54         rc = lsm_work(pDb, nMerge, nWork, 0);
    50     55         if( rc!=LSM_OK ){
    51     56           testPrintError("lsm_work(): rc=%d\n", rc);
    52     57         }
    53     58       }
    54     59     }
    55     60     if( rc==LSM_OK ){

Changes to lsm-test/lsmtest_main.c.

   459    459   
   460    460     for(j=0; tdb_system_name(j); j++){
   461    461       rc = 0;
   462    462   
   463    463       test_data_1(tdb_system_name(j), zPattern, &rc);
   464    464       test_data_2(tdb_system_name(j), zPattern, &rc);
   465    465       test_data_3(tdb_system_name(j), zPattern, &rc);
          466  +    test_data_4(tdb_system_name(j), zPattern, &rc);
   466    467       test_rollback(tdb_system_name(j), zPattern, &rc);
   467    468       test_mc(tdb_system_name(j), zPattern, &rc);
   468    469       test_mt(tdb_system_name(j), zPattern, &rc);
   469    470   
   470    471       if( rc ) nFail++;
   471    472     }
   472    473   

Changes to lsm-test/lsmtest_tdb3.c.

   984    984   }
   985    985   
   986    986   int test_lsm_lomem_open(
   987    987     const char *zFilename, 
   988    988     int bClear, 
   989    989     TestDb **ppDb
   990    990   ){
          991  +    /* "max_freelist=4 autocheckpoint=32768 " */
   991    992     const char *zCfg = 
   992    993       "page_size=256 block_size=65536 autoflush=16384 "
   993         -    "max_freelist=2 autocheckpoint=32768 "
          994  +    "autocheckpoint=32768 "
   994    995       "mmap=0 "
   995    996     ;
   996    997     return testLsmOpen(zCfg, zFilename, bClear, ppDb);
   997    998   }
   998    999   
   999   1000   int test_lsm_zip_open(
  1000   1001     const char *zFilename, 
  1001   1002     int bClear, 
  1002   1003     TestDb **ppDb
  1003   1004   ){
  1004   1005     const char *zCfg = 
  1005   1006       "page_size=256 block_size=65536 autoflush=16384 "
  1006         -    "max_freelist=2 autocheckpoint=32768 compression=1"
  1007         -    "mmap=0 "
         1007  +    "autocheckpoint=32768 compression=1 mmap=0 "
  1008   1008     ;
  1009   1009     return testLsmOpen(zCfg, zFilename, bClear, ppDb);
  1010   1010   }
  1011   1011   
  1012   1012   lsm_db *tdb_lsm(TestDb *pDb){
  1013   1013     if( pDb->pMethods->xClose==test_lsm_close ){
  1014   1014       return ((LsmDb *)pDb)->db;

Changes to main.mk.

   290    290   EXTHDR += \
   291    291     $(TOP)/ext/icu/sqliteicu.h
   292    292   
   293    293   LSMTESTSRC = $(TOP)/lsm-test/lsmtest1.c $(TOP)/lsm-test/lsmtest2.c           \
   294    294                $(TOP)/lsm-test/lsmtest3.c $(TOP)/lsm-test/lsmtest4.c           \
   295    295                $(TOP)/lsm-test/lsmtest5.c $(TOP)/lsm-test/lsmtest6.c           \
   296    296                $(TOP)/lsm-test/lsmtest7.c $(TOP)/lsm-test/lsmtest8.c           \
          297  +             $(TOP)/lsm-test/lsmtest9.c                                      \
   297    298                $(TOP)/lsm-test/lsmtest_datasource.c \
   298    299                $(TOP)/lsm-test/lsmtest_func.c $(TOP)/lsm-test/lsmtest_io.c     \
   299    300                $(TOP)/lsm-test/lsmtest_main.c $(TOP)/lsm-test/lsmtest_mem.c    \
   300    301                $(TOP)/lsm-test/lsmtest_tdb.c $(TOP)/lsm-test/lsmtest_tdb3.c    \
   301    302                $(TOP)/lsm-test/lsmtest_util.c 
   302    303   
   303    304   LSMTESTHDR = $(TOP)/lsm-test/lsmtest.h $(TOP)/lsm-test/lsmtest_tdb.h

Changes to src/lsmInt.h.

    79     79   typedef struct LsmString LsmString;
    80     80   typedef struct Mempool Mempool;
    81     81   typedef struct Merge Merge;
    82     82   typedef struct MergeInput MergeInput;
    83     83   typedef struct MetaPage MetaPage;
    84     84   typedef struct MultiCursor MultiCursor;
    85     85   typedef struct Page Page;
           86  +typedef struct Redirect Redirect;
    86     87   typedef struct Segment Segment;
    87     88   typedef struct SegmentMerger SegmentMerger;
    88     89   typedef struct ShmChunk ShmChunk;
    89     90   typedef struct ShmHeader ShmHeader;
    90     91   typedef struct ShmReader ShmReader;
    91     92   typedef struct Snapshot Snapshot;
    92     93   typedef struct TransMark TransMark;
................................................................................
   139    140   
   140    141   /*
   141    142   ** Hard limit on the number of free-list entries that may be stored in 
   142    143   ** a checkpoint (the remainder are stored as a system record in the LSM).
   143    144   ** See also LSM_CONFIG_MAX_FREELIST.
   144    145   */
   145    146   #define LSM_MAX_FREELIST_ENTRIES 24
          147  +
          148  +#define LSM_MAX_BLOCK_REDIRECTS 16
   146    149   
   147    150   #define LSM_ATTEMPTS_BEFORE_PROTOCOL 10000
   148    151   
   149    152   
   150    153   /*
   151    154   ** Each entry stored in the LSM (or in-memory tree structure) has an
   152    155   ** associated mask of the following flags.
................................................................................
   186    189   */
   187    190   typedef struct IntArray IntArray;
   188    191   struct IntArray {
   189    192     int nAlloc;
   190    193     int nArray;
   191    194     u32 *aArray;
   192    195   };
          196  +
          197  +struct Redirect {
          198  +  int n;                          /* Number of redirects */
          199  +  struct RedirectEntry {
          200  +    int iFrom;
          201  +    int iTo;
          202  +  } *a;
          203  +};
   193    204   
   194    205   /*
   195    206   ** An instance of this structure represents a point in the history of the
   196    207   ** tree structure to roll back to. Refer to comments in lsm_tree.c for 
   197    208   ** details.
   198    209   */
   199    210   struct TreeMark {
................................................................................
   306    317     lsm_compress compress;          /* Compression callbacks */
   307    318   
   308    319     /* Sub-system handles */
   309    320     FileSystem *pFS;                /* On-disk portion of database */
   310    321     Database *pDatabase;            /* Database shared data */
   311    322   
   312    323     /* Client transaction context */
   313         -  Snapshot *pClient;              /* Client snapshot (non-NULL in read trans) */
          324  +  Snapshot *pClient;              /* Client snapshot */
   314    325     int iReader;                    /* Read lock held (-1 == unlocked) */
   315    326     MultiCursor *pCsr;              /* List of all open cursors */
   316    327     LogWriter *pLogWriter;          /* Context for writing to the log file */
   317    328     int nTransOpen;                 /* Number of opened write transactions */
   318    329     int nTransAlloc;                /* Allocated size of aTrans[] array */
   319    330     TransMark *aTrans;              /* Array of marks for transaction rollback */
   320    331     IntArray rollback;              /* List of tree-nodes to roll back */
   321    332   
   322    333     /* Worker context */
   323    334     Snapshot *pWorker;              /* Worker snapshot (or NULL) */
   324    335     Freelist *pFreelist;            /* See sortedNewToplevel() */
   325    336     int bUseFreelist;               /* True to use pFreelist */
          337  +  int bIncrMerge;                 /* True if currently doing a merge */
   326    338   
   327    339     /* Debugging message callback */
   328    340     void (*xLog)(void *, int, const char *);
   329    341     void *pLogCtx;
   330    342   
   331    343     /* Work done notification callback */
   332    344     void (*xWork)(lsm_db *, void *);
................................................................................
   343    355   };
   344    356   
   345    357   struct Segment {
   346    358     Pgno iFirst;                     /* First page of this run */
   347    359     Pgno iLastPg;                    /* Last page of this run */
   348    360     Pgno iRoot;                      /* Root page number (if any) */
   349    361     int nSize;                       /* Size of this run in pages */
          362  +
          363  +  Redirect *pRedirect;             /* Block redirects (or NULL) */
   350    364   };
   351    365   
   352    366   /*
   353    367   ** iSplitTopic/pSplitKey/nSplitKey:
   354    368   **   If nRight>0, this buffer contains a copy of the largest key that has
   355    369   **   already been written to the left-hand-side of the level.
   356    370   */
................................................................................
   510    524   ** Database below for futher details.
   511    525   */
   512    526   struct Snapshot {
   513    527     Database *pDatabase;            /* Database this snapshot belongs to */
   514    528     Level *pLevel;                  /* Pointer to level 0 of snapshot (or NULL) */
   515    529     i64 iId;                        /* Snapshot id */
   516    530     i64 iLogOff;                    /* Log file offset */
          531  +  Redirect redirect;              /* Block redirection array */
   517    532   
   518    533     /* Used by worker snapshots only */
   519    534     int nBlock;                     /* Number of blocks in database file */
   520    535     Pgno aiAppend[LSM_APPLIST_SZ];  /* Append point list */
   521    536     Freelist freelist;              /* Free block list */
   522    537     u32 nWrite;                     /* Total number of pages written to disk */
   523    538   };
................................................................................
   650    665   
   651    666   int lsmFsSectorSize(FileSystem *);
   652    667   
   653    668   void lsmSortedSplitkey(lsm_db *, Level *, int *);
   654    669   
   655    670   /* Reading sorted run content. */
   656    671   int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg);
   657         -int lsmFsDbPageGet(FileSystem *, Pgno, Page **);
          672  +int lsmFsDbPageGet(FileSystem *, Segment *, Pgno, Page **);
   658    673   int lsmFsDbPageNext(Segment *, Page *, int eDir, Page **);
   659    674   
   660    675   u8 *lsmFsPageData(Page *, int *);
   661    676   int lsmFsPageRelease(Page *);
   662    677   int lsmFsPagePersist(Page *);
   663    678   void lsmFsPageRef(Page *);
   664    679   Pgno lsmFsPageNumber(Page *);
................................................................................
   704    719   int lsmEnvShmMap(lsm_env *, lsm_file *, int, int, void **); 
   705    720   void lsmEnvShmBarrier(lsm_env *);
   706    721   void lsmEnvShmUnmap(lsm_env *, lsm_file *, int);
   707    722   
   708    723   void lsmEnvSleep(lsm_env *, int);
   709    724   
   710    725   int lsmFsReadSyncedId(lsm_db *db, int, i64 *piVal);
          726  +
          727  +int lsmFsSegmentContainsPg(FileSystem *pFS, Segment *, Pgno, int *);
          728  +Pgno lsmFsRedirectPage(FileSystem *, Redirect *, Pgno);
   711    729   
   712    730   /*
   713    731   ** End of functions from "lsm_file.c".
   714    732   **************************************************************************/
   715    733   
   716    734   /* 
   717    735   ** Functions from file "lsm_sorted.c".
................................................................................
   823    841   void lsmSnapshotSetCkptid(Snapshot *, i64);
   824    842   
   825    843   Level *lsmDbSnapshotLevel(Snapshot *);
   826    844   void lsmDbSnapshotSetLevel(Snapshot *, Level *);
   827    845   
   828    846   void lsmDbRecoveryComplete(lsm_db *, int);
   829    847   
   830         -int lsmBlockAllocate(lsm_db *, int *);
          848  +int lsmBlockAllocate(lsm_db *, int, int *);
   831    849   int lsmBlockFree(lsm_db *, int);
   832    850   int lsmBlockRefree(lsm_db *, int);
   833    851   
   834    852   void lsmFreelistDeltaBegin(lsm_db *);
   835    853   void lsmFreelistDeltaEnd(lsm_db *);
   836    854   int lsmFreelistDelta(lsm_db *pDb);
   837    855   

Changes to src/lsm_ckpt.c.

    63     63   **     6. For each segment in the merge:
    64     64   **        5a. Page number of next cell to read during merge (this field
    65     65   **            is 64-bits - 2 integers)
    66     66   **        5b. Cell number of next cell to read during merge
    67     67   **     7. Page containing current split-key (64-bits - 2 integers).
    68     68   **     8. Cell within page containing current split-key.
    69     69   **     9. Current pointer value (64-bits - 2 integers).
           70  +**
           71  +**   The block redirect array:
           72  +**
           73  +**     1. Number of redirections (maximum LSM_MAX_BLOCK_REDIRECTS).
           74  +**     2. For each redirection:
           75  +**        a. "from" block number
           76  +**        b. "to" block number
    70     77   **
    71     78   **   The in-memory freelist entries. Each entry is either an insert or a
    72     79   **   delete. The in-memory freelist is to the free-block-list as the
    73     80   **   in-memory tree is to the users database content.
    74     81   **
    75     82   **     1. Number of free-list entries stored in checkpoint header.
    76     83   **     2. Number of free blocks (in total).
................................................................................
   415    422   
   416    423     /* Serialize nLevel levels. */
   417    424     iLevel = 0;
   418    425     for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nLevel; pLevel=pLevel->pNext){
   419    426       ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
   420    427       iLevel++;
   421    428     }
          429  +
          430  +  /* Write the block-redirect list */
          431  +  ckptSetValue(&ckpt, iOut++, pSnap->redirect.n, &rc);
          432  +  for(i=0; i<pSnap->redirect.n; i++){
          433  +    ckptSetValue(&ckpt, iOut++, pSnap->redirect.a[i].iFrom, &rc);
          434  +    ckptSetValue(&ckpt, iOut++, pSnap->redirect.a[i].iTo, &rc);
          435  +  }
   422    436   
   423    437     /* Write the freelist */
   424    438     assert( pSnap->freelist.nEntry<=pDb->nMaxFreelist );
   425    439     if( rc==LSM_OK ){
   426    440       int nFree = pSnap->freelist.nEntry;
   427    441       ckptSetValue(&ckpt, iOut++, nFree, &rc);
   428    442       for(i=0; i<nFree; i++){
................................................................................
   904    918         return LSM_PROTOCOL;
   905    919       }
   906    920     }
   907    921   
   908    922     rc = lsmCheckpointDeserialize(pDb, 1, pShm->aSnap1, &pDb->pWorker);
   909    923     if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase;
   910    924   
          925  +#if 0
   911    926     assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
          927  +#endif
   912    928     return rc;
   913    929   }
   914    930   
   915    931   int lsmCheckpointDeserialize(
   916    932     lsm_db *pDb, 
   917    933     int bInclFreelist,              /* If true, deserialize free-list */
   918    934     u32 *aCkpt, 
................................................................................
   919    935     Snapshot **ppSnap
   920    936   ){
   921    937     int rc = LSM_OK;
   922    938     Snapshot *pNew;
   923    939   
   924    940     pNew = (Snapshot *)lsmMallocZeroRc(pDb->pEnv, sizeof(Snapshot), &rc);
   925    941     if( rc==LSM_OK ){
          942  +    Level *pLvl;
   926    943       int nFree;
   927    944       int i;
   928    945       int nLevel = (int)aCkpt[CKPT_HDR_NLEVEL];
   929    946       int iIn = CKPT_HDR_SIZE + CKPT_APPENDLIST_SIZE + CKPT_LOGPTR_SIZE;
   930    947   
   931    948       pNew->iId = lsmCheckpointId(aCkpt, 0);
   932    949       pNew->nBlock = aCkpt[CKPT_HDR_NBLOCK];
................................................................................
   935    952       pNew->iLogOff = lsmCheckpointLogOffset(aCkpt);
   936    953   
   937    954       /* Make a copy of the append-list */
   938    955       for(i=0; i<LSM_APPLIST_SZ; i++){
   939    956         u32 *a = &aCkpt[CKPT_HDR_SIZE + CKPT_LOGPTR_SIZE + i*2];
   940    957         pNew->aiAppend[i] = ckptRead64(a);
   941    958       }
          959  +
          960  +    /* Read the block-redirect list */
          961  +    pNew->redirect.n = aCkpt[iIn++];
          962  +    if( pNew->redirect.n ){
          963  +      pNew->redirect.a = lsmMallocZeroRc(pDb->pEnv, 
          964  +          (sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS), &rc
          965  +      );
          966  +      if( rc==LSM_OK ){
          967  +        for(i=0; i<pNew->redirect.n; i++){
          968  +          pNew->redirect.a[i].iFrom = aCkpt[iIn++];
          969  +          pNew->redirect.a[i].iTo = aCkpt[iIn++];
          970  +        }
          971  +      }
          972  +      for(pLvl=pNew->pLevel; pLvl->pNext; pLvl=pLvl->pNext);
          973  +      if( pLvl->nRight ){
          974  +        pLvl->aRhs[pLvl->nRight-1].pRedirect = &pNew->redirect;
          975  +      }else{
          976  +        pLvl->lhs.pRedirect = &pNew->redirect;
          977  +      }
          978  +    }
   942    979   
   943    980       /* Copy the free-list */
   944         -    if( bInclFreelist ){
          981  +    if( rc==LSM_OK && bInclFreelist ){
   945    982         nFree = aCkpt[iIn++];
   946    983         if( nFree ){
   947    984           pNew->freelist.aEntry = (FreelistEntry *)lsmMallocZeroRc(
   948    985               pDb->pEnv, sizeof(FreelistEntry)*nFree, &rc
   949    986           );
   950    987           if( rc==LSM_OK ){
   951    988             int i;
................................................................................
  1009   1046   int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush){
  1010   1047     Snapshot *pSnap = pDb->pWorker;
  1011   1048     ShmHeader *pShm = pDb->pShmhdr;
  1012   1049     void *p = 0;
  1013   1050     int n = 0;
  1014   1051     int rc;
  1015   1052   
  1016         -  rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId+1, 1, &p, &n);
         1053  +  pSnap->iId++;
         1054  +  rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId, 1, &p, &n);
  1017   1055     if( rc!=LSM_OK ) return rc;
  1018   1056     assert( ckptChecksumOk((u32 *)p) );
  1019   1057   
  1020   1058     assert( n<=LSM_META_PAGE_SIZE );
  1021   1059     memcpy(pShm->aSnap2, p, n);
  1022   1060     lsmShmBarrier(pDb);
  1023   1061     memcpy(pShm->aSnap1, p, n);

Changes to src/lsm_file.c.

   875    875       i64 nMin = (i64)nBlock * (i64)pFS->nBlocksize;
   876    876       fsGrowMapping(pFS, nMin, &rc);
   877    877       if( rc!=LSM_OK ) return rc;
   878    878     }
   879    879     return lsmEnvSync(pFS->pEnv, pFS->fdDb);
   880    880   }
   881    881   
   882         -static int fsPageGet(FileSystem *, Pgno, int, Page **, int *);
          882  +static int fsPageGet(FileSystem *, Segment *, Pgno, int, Page **, int *);
          883  +
          884  +static int fsRedirectBlock(Redirect *p, int iBlk){
          885  +  if( p ){
          886  +    int i;
          887  +    for(i=0; i<p->n; i++){
          888  +      if( iBlk==p->a[i].iFrom ) return p->a[i].iTo;
          889  +    }
          890  +  }
          891  +  assert( iBlk!=0 );
          892  +  return iBlk;
          893  +}
          894  +
          895  +Pgno lsmFsRedirectPage(FileSystem *pFS, Redirect *pRedir, Pgno iPg){
          896  +  Pgno iReal = iPg;
          897  +
          898  +  if( pRedir ){
          899  +    const int nPagePerBlock = (
          900  +        pFS->pCompress ? pFS->nBlocksize : (pFS->nBlocksize / pFS->nPagesize)
          901  +    );
          902  +    int iBlk = fsPageToBlock(pFS, iPg);
          903  +    int i;
          904  +    for(i=0; i<pRedir->n; i++){
          905  +      int iFrom = pRedir->a[i].iFrom;
          906  +      if( iFrom>iBlk ) break;
          907  +      if( iFrom==iBlk ){
          908  +        int iTo = pRedir->a[i].iTo;
          909  +        iReal = iPg - (Pgno)(iFrom - iTo) * nPagePerBlock;
          910  +        if( iTo==1 ){
          911  +          iReal += (fsFirstPageOnBlock(pFS, 1)-1);
          912  +        }
          913  +        break;
          914  +      }
          915  +    }
          916  +  }
          917  +
          918  +  assert( iReal!=0 );
          919  +  return iReal;
          920  +}
   883    921   
   884    922   /*
   885    923   ** Parameter iBlock is a database file block. This function reads the value 
   886    924   ** stored in the blocks "next block" pointer and stores it in *piNext.
   887    925   ** LSM_OK is returned if everything is successful, or an LSM error code
   888    926   ** otherwise.
   889    927   */
   890    928   static int fsBlockNext(
   891    929     FileSystem *pFS,                /* File-system object handle */
          930  +  Segment *pSeg,                  /* Use this segment for block redirects */
   892    931     int iBlock,                     /* Read field from this block */
   893    932     int *piNext                     /* OUT: Next block in linked list */
   894    933   ){
   895    934     int rc;
          935  +  int iRead;                      /* Read block from here */
          936  +  
          937  +  if( pSeg ){
          938  +    iRead = fsRedirectBlock(pSeg->pRedirect, iBlock);
          939  +  }else{
          940  +    iRead = iBlock;
          941  +  }
   896    942   
   897    943     assert( pFS->bUseMmap==0 || pFS->pCompress==0 );
   898    944     if( pFS->pCompress ){
   899    945       i64 iOff;                     /* File offset to read data from */
   900    946       u8 aNext[4];                  /* 4-byte pointer read from db file */
   901    947   
   902         -    iOff = (i64)iBlock * pFS->nBlocksize - sizeof(aNext);
          948  +    iOff = (i64)iRead * pFS->nBlocksize - sizeof(aNext);
   903    949       rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aNext, sizeof(aNext));
   904    950       if( rc==LSM_OK ){
   905    951         *piNext = (int)lsmGetU32(aNext);
   906    952       }
   907    953     }else{
   908    954       const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
   909    955       Page *pLast;
   910         -    rc = fsPageGet(pFS, iBlock*nPagePerBlock, 0, &pLast, 0);
          956  +    rc = fsPageGet(pFS, 0, iRead*nPagePerBlock, 0, &pLast, 0);
   911    957       if( rc==LSM_OK ){
   912    958         *piNext = lsmGetU32(&pLast->aData[pFS->nPagesize-4]);
   913    959         lsmFsPageRelease(pLast);
   914    960       }
   915    961     }
          962  +
          963  +  if( pSeg ){
          964  +    *piNext = fsRedirectBlock(pSeg->pRedirect, *piNext);
          965  +  }
   916    966     return rc;
   917    967   }
   918    968   
   919    969   /*
   920    970   ** Return the page number of the last page on the same block as page iPg.
   921    971   */
   922    972   Pgno fsLastPageOnPagesBlock(FileSystem *pFS, Pgno iPg){
................................................................................
   924    974   }
   925    975   
   926    976   /*
   927    977   ** This function is only called in compressed database mode.
   928    978   */
   929    979   static int fsReadData(
   930    980     FileSystem *pFS,                /* File-system handle */
          981  +  Segment *pSeg,                  /* Block redirection */
   931    982     i64 iOff,                       /* Read data from this offset */
   932    983     u8 *aData,                      /* Buffer to read data into */
   933    984     int nData                       /* Number of bytes to read */
   934    985   ){
   935    986     i64 iEob;                       /* End of block */
   936    987     int nRead;
   937    988     int rc;
................................................................................
   941    992     iEob = fsLastPageOnPagesBlock(pFS, iOff) + 1;
   942    993     nRead = LSM_MIN(iEob - iOff, nData);
   943    994   
   944    995     rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nRead);
   945    996     if( rc==LSM_OK && nRead!=nData ){
   946    997       int iBlk;
   947    998   
   948         -    rc = fsBlockNext(pFS, fsPageToBlock(pFS, iOff), &iBlk);
          999  +    rc = fsBlockNext(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk);
   949   1000       if( rc==LSM_OK ){
   950   1001         i64 iOff2 = fsFirstPageOnBlock(pFS, iBlk);
   951   1002         rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff2, &aData[nRead], nData-nRead);
   952   1003       }
   953   1004     }
   954   1005   
   955   1006     return rc;
................................................................................
   959   1010   ** Parameter iBlock is a database file block. This function reads the value 
   960   1011   ** stored in the blocks "previous block" pointer and stores it in *piPrev.
   961   1012   ** LSM_OK is returned if everything is successful, or an LSM error code
   962   1013   ** otherwise.
   963   1014   */
   964   1015   static int fsBlockPrev(
   965   1016     FileSystem *pFS,                /* File-system object handle */
         1017  +  Segment *pSeg,                  /* Use this segment for block redirects */
   966   1018     int iBlock,                     /* Read field from this block */
   967   1019     int *piPrev                     /* OUT: Previous block in linked list */
   968   1020   ){
   969   1021     int rc = LSM_OK;                /* Return code */
   970   1022   
   971   1023     assert( pFS->bUseMmap==0 || pFS->pCompress==0 );
   972   1024     assert( iBlock>0 );
   973   1025   
   974   1026     if( pFS->pCompress ){
   975   1027       i64 iOff = fsFirstPageOnBlock(pFS, iBlock) - 4;
   976   1028       u8 aPrev[4];                  /* 4-byte pointer read from db file */
   977   1029       rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aPrev, sizeof(aPrev));
   978   1030       if( rc==LSM_OK ){
   979         -      *piPrev = (int)lsmGetU32(aPrev);
         1031  +      Redirect *pRedir = (pSeg ? pSeg->pRedirect : 0);
         1032  +      *piPrev = fsRedirectBlock(pRedir, (int)lsmGetU32(aPrev));
   980   1033       }
   981   1034     }else{
   982   1035       assert( 0 );
   983   1036     }
   984   1037     return rc;
   985   1038   }
   986   1039   
................................................................................
   997   1050     nByte  = (aBuf[0] & 0x7F) << 14;
   998   1051     nByte += (aBuf[1] & 0x7F) << 7;
   999   1052     nByte += (aBuf[2] & 0x7F);
  1000   1053     *pbFree = !(aBuf[1] & 0x80);
  1001   1054     return nByte;
  1002   1055   }
  1003   1056   
  1004         -static int fsSubtractOffset(FileSystem *pFS, i64 iOff, int iSub, i64 *piRes){
         1057  +static int fsSubtractOffset(
         1058  +  FileSystem *pFS, 
         1059  +  Segment *pSeg,
         1060  +  i64 iOff, 
         1061  +  int iSub, 
         1062  +  i64 *piRes
         1063  +){
  1005   1064     i64 iStart;
  1006   1065     int iBlk = 0;
  1007   1066     int rc;
  1008   1067   
  1009   1068     assert( pFS->pCompress );
  1010   1069   
  1011   1070     iStart = fsFirstPageOnBlock(pFS, fsPageToBlock(pFS, iOff));
  1012   1071     if( (iOff-iSub)>=iStart ){
  1013   1072       *piRes = (iOff-iSub);
  1014   1073       return LSM_OK;
  1015   1074     }
  1016   1075   
  1017         -  rc = fsBlockPrev(pFS, fsPageToBlock(pFS, iOff), &iBlk);
         1076  +  rc = fsBlockPrev(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk);
  1018   1077     *piRes = fsLastPageOnBlock(pFS, iBlk) - iSub + (iOff - iStart + 1);
  1019   1078     return rc;
  1020   1079   }
  1021   1080   
  1022         -static int fsAddOffset(FileSystem *pFS, i64 iOff, int iAdd, i64 *piRes){
         1081  +static int fsAddOffset(
         1082  +  FileSystem *pFS, 
         1083  +  Segment *pSeg,
         1084  +  i64 iOff, 
         1085  +  int iAdd, 
         1086  +  i64 *piRes
         1087  +){
  1023   1088     i64 iEob;
  1024   1089     int iBlk;
  1025   1090     int rc;
  1026   1091   
  1027   1092     assert( pFS->pCompress );
  1028   1093   
  1029   1094     iEob = fsLastPageOnPagesBlock(pFS, iOff);
  1030   1095     if( (iOff+iAdd)<=iEob ){
  1031   1096       *piRes = (iOff+iAdd);
  1032   1097       return LSM_OK;
  1033   1098     }
  1034   1099   
  1035         -  rc = fsBlockNext(pFS, fsPageToBlock(pFS, iOff), &iBlk);
         1100  +  rc = fsBlockNext(pFS, pSeg, fsPageToBlock(pFS, iOff), &iBlk);
  1036   1101     *piRes = fsFirstPageOnBlock(pFS, iBlk) + iAdd - (iEob - iOff + 1);
  1037   1102     return rc;
  1038   1103   }
  1039   1104   
  1040   1105   static int fsAllocateBuffer(FileSystem *pFS, int bWrite){
  1041   1106     u8 **pp;                        /* Pointer to either aIBuffer or aOBuffer */
  1042   1107   
................................................................................
  1066   1131   ** uncompresses the compressed data for page pPg from the database and
  1067   1132   ** populates the pPg->aData[] buffer and pPg->nCompress field.
  1068   1133   **
  1069   1134   ** LSM_OK is returned if successful, or an LSM error code otherwise.
  1070   1135   */
  1071   1136   static int fsReadPagedata(
  1072   1137     FileSystem *pFS,                /* File-system handle */
         1138  +  Segment *pSeg,                  /* pPg is part of this segment */
  1073   1139     Page *pPg,                      /* Page to read and uncompress data for */
  1074   1140     int *pnSpace                    /* OUT: Total bytes of free space */
  1075   1141   ){
  1076   1142     lsm_compress *p = pFS->pCompress;
  1077   1143     i64 iOff = pPg->iPg;
  1078   1144     u8 aSz[3];
  1079   1145     int rc;
  1080   1146   
  1081   1147     assert( p && pPg->nCompress==0 );
  1082   1148   
  1083   1149     if( fsAllocateBuffer(pFS, 0) ) return LSM_NOMEM;
  1084   1150   
  1085         -  rc = fsReadData(pFS, iOff, aSz, sizeof(aSz));
         1151  +  rc = fsReadData(pFS, pSeg, iOff, aSz, sizeof(aSz));
  1086   1152   
  1087   1153     if( rc==LSM_OK ){
  1088   1154       int bFree;
  1089   1155       if( aSz[0] & 0x80 ){
  1090   1156         pPg->nCompress = (int)getRecordSize(aSz, &bFree);
  1091   1157       }else{
  1092   1158         pPg->nCompress = (int)aSz[0] - sizeof(aSz)*2;
................................................................................
  1095   1161       if( bFree ){
  1096   1162         if( pnSpace ){
  1097   1163           *pnSpace = pPg->nCompress + sizeof(aSz)*2;
  1098   1164         }else{
  1099   1165           rc = LSM_CORRUPT_BKPT;
  1100   1166         }
  1101   1167       }else{
  1102         -      rc = fsAddOffset(pFS, iOff, 3, &iOff);
         1168  +      rc = fsAddOffset(pFS, pSeg, iOff, 3, &iOff);
  1103   1169         if( rc==LSM_OK ){
  1104   1170           if( pPg->nCompress>pFS->nBuffer ){
  1105   1171             rc = LSM_CORRUPT_BKPT;
  1106   1172           }else{
  1107         -          rc = fsReadData(pFS, iOff, pFS->aIBuffer, pPg->nCompress);
         1173  +          rc = fsReadData(pFS, pSeg, iOff, pFS->aIBuffer, pPg->nCompress);
  1108   1174           }
  1109   1175           if( rc==LSM_OK ){
  1110   1176             int n = pFS->nPagesize;
  1111   1177             rc = p->xUncompress(p->pCtx, 
  1112   1178                 (char *)pPg->aData, &n, 
  1113   1179                 (const char *)pFS->aIBuffer, pPg->nCompress
  1114   1180             );
................................................................................
  1123   1189   }
  1124   1190   
  1125   1191   /*
  1126   1192   ** Return a handle for a database page.
  1127   1193   */
  1128   1194   static int fsPageGet(
  1129   1195     FileSystem *pFS,                /* File-system handle */
         1196  +  Segment *pSeg,                  /* Block redirection to use (or NULL) */
  1130   1197     Pgno iPg,                       /* Page id */
  1131   1198     int noContent,                  /* True to not load content from disk */
  1132   1199     Page **ppPg,                    /* OUT: New page handle */
  1133   1200     int *pnSpace                    /* OUT: Bytes of free space */
  1134   1201   ){
  1135   1202     Page *p;
  1136   1203     int iHash;
  1137   1204     int rc = LSM_OK;
  1138   1205   
         1206  +  /* In most cases iReal is the same as iPg. Except, if pSeg->pRedirect is 
         1207  +  ** not NULL, and the block containing iPg has been redirected, then iReal
         1208  +  ** is the page number after redirection.  */
         1209  +  Pgno iReal = lsmFsRedirectPage(pFS, (pSeg ? pSeg->pRedirect : 0), iPg);
         1210  +
  1139   1211     assert( iPg>=fsFirstPageOnBlock(pFS, 1) );
         1212  +  assert( iReal>=fsFirstPageOnBlock(pFS, 1) );
  1140   1213     *ppPg = 0;
  1141   1214   
  1142   1215     assert( pFS->bUseMmap==0 || pFS->pCompress==0 );
  1143   1216     if( pFS->bUseMmap ){
  1144   1217       Page *pTest;
  1145         -    i64 iEnd = (i64)iPg * pFS->nPagesize;
         1218  +    i64 iEnd = (i64)iReal * pFS->nPagesize;
  1146   1219       fsGrowMapping(pFS, iEnd, &rc);
  1147   1220       if( rc!=LSM_OK ) return rc;
  1148   1221   
  1149   1222       p = 0;
  1150   1223       for(pTest=pFS->pWaiting; pTest; pTest=pTest->pNextWaiting){
  1151         -      if( pTest->iPg==iPg ){
         1224  +      if( pTest->iPg==iReal ){
         1225  +        assert( iReal==iPg );
  1152   1226           p = pTest;
  1153   1227           p->nRef++;
  1154   1228           *ppPg = p;
  1155   1229           return LSM_OK;
  1156   1230         }
  1157   1231       }
  1158   1232       if( pFS->pFree ){
................................................................................
  1161   1235         assert( p->nRef==0 );
  1162   1236       }else{
  1163   1237         p = lsmMallocZeroRc(pFS->pEnv, sizeof(Page), &rc);
  1164   1238         if( rc ) return rc;
  1165   1239         fsPageAddToLru(pFS, p);
  1166   1240         p->pFS = pFS;
  1167   1241       }
  1168         -    p->aData = &((u8 *)pFS->pMap)[pFS->nPagesize * (i64)(iPg-1)];
  1169         -    p->iPg = iPg;
         1242  +    p->aData = &((u8 *)pFS->pMap)[pFS->nPagesize * (iReal-1)];
         1243  +    p->iPg = iReal;
  1170   1244     }else{
  1171   1245   
  1172   1246       /* Search the hash-table for the page */
  1173         -    iHash = fsHashKey(pFS->nHash, iPg);
         1247  +    iHash = fsHashKey(pFS->nHash, iReal);
  1174   1248       for(p=pFS->apHash[iHash]; p; p=p->pHashNext){
  1175         -      if( p->iPg==iPg) break;
         1249  +      if( p->iPg==iReal) break;
  1176   1250       }
  1177   1251   
  1178   1252       if( p==0 ){
  1179   1253         rc = fsPageBuffer(pFS, 1, &p);
  1180   1254         if( rc==LSM_OK ){
  1181   1255           int nSpace = 0;
  1182         -        p->iPg = iPg;
         1256  +        p->iPg = iReal;
  1183   1257           p->nRef = 0;
  1184   1258           p->pFS = pFS;
  1185   1259           assert( p->flags==0 || p->flags==PAGE_FREE );
  1186   1260   
  1187   1261   #ifdef LSM_DEBUG
  1188   1262           memset(p->aData, 0x56, pFS->nPagesize);
  1189   1263   #endif
  1190   1264           assert( p->pLruNext==0 && p->pLruPrev==0 );
  1191   1265           if( noContent==0 ){
  1192   1266             if( pFS->pCompress ){
  1193         -            rc = fsReadPagedata(pFS, p, &nSpace);
         1267  +            rc = fsReadPagedata(pFS, pSeg, p, &nSpace);
  1194   1268             }else{
  1195   1269               int nByte = pFS->nPagesize;
  1196         -            i64 iOff = (i64)(iPg-1) * pFS->nPagesize;
         1270  +            i64 iOff = (i64)(iReal-1) * pFS->nPagesize;
  1197   1271               rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, p->aData, nByte);
  1198   1272             }
  1199   1273             pFS->nRead++;
  1200   1274           }
  1201   1275   
  1202   1276           /* If the xRead() call was successful (or not attempted), link the
  1203   1277            ** page into the page-cache hash-table. Otherwise, if it failed,
................................................................................
  1217   1291   
  1218   1292       assert( (rc==LSM_OK && (p || (pnSpace && *pnSpace)))
  1219   1293            || (rc!=LSM_OK && p==0) 
  1220   1294       );
  1221   1295     }
  1222   1296   
  1223   1297     if( rc==LSM_OK && p ){
  1224         -    if( pFS->pCompress==0 && (fsIsLast(pFS, iPg) || fsIsFirst(pFS, iPg)) ){
         1298  +    if( pFS->pCompress==0 && (fsIsLast(pFS, iReal) || fsIsFirst(pFS, iReal)) ){
  1225   1299         p->nData = pFS->nPagesize - 4;
  1226         -      if( fsIsFirst(pFS, iPg) && p->nRef==0 ){
         1300  +      if( fsIsFirst(pFS, iReal) && p->nRef==0 ){
  1227   1301           p->aData += 4;
  1228   1302           p->flags |= PAGE_HASPREV;
  1229   1303         }
  1230   1304       }else{
  1231   1305         p->nData = pFS->nPagesize;
  1232   1306       }
  1233   1307       pFS->nOut += (p->nRef==0);
................................................................................
  1359   1433       iBlk = fsPageToBlock(pFS, pDel->iFirst);
  1360   1434       iLastBlk = fsPageToBlock(pFS, pDel->iLastPg);
  1361   1435   
  1362   1436       /* Mark all blocks currently used by this sorted run as free */
  1363   1437       while( iBlk && rc==LSM_OK ){
  1364   1438         int iNext = 0;
  1365   1439         if( iBlk!=iLastBlk ){
  1366         -        rc = fsBlockNext(pFS, iBlk, &iNext);
         1440  +        rc = fsBlockNext(pFS, pDel, iBlk, &iNext);
  1367   1441         }else if( bZero==0 && pDel->iLastPg!=fsLastPageOnBlock(pFS, iLastBlk) ){
  1368   1442           break;
  1369   1443         }
  1370   1444         rc = fsFreeBlock(pFS, pSnapshot, pDel, iBlk);
  1371   1445         iBlk = iNext;
  1372   1446       }
         1447  +
         1448  +    if( pDel->pRedirect ){
         1449  +      assert( pDel->pRedirect==&pSnapshot->redirect );
         1450  +      pSnapshot->redirect.n = 0;
         1451  +    }
  1373   1452   
  1374   1453       if( bZero ) memset(pDel, 0, sizeof(Segment));
  1375   1454     }
  1376   1455     return LSM_OK;
  1377   1456   }
  1378   1457   
  1379   1458   static Pgno firstOnBlock(FileSystem *pFS, int iBlk, Pgno *aPgno, int nPgno){
................................................................................
  1384   1463       if( fsPageToBlock(pFS, iPg)==iBlk && (iRet==0 || iPg<iRet) ){
  1385   1464         iRet = iPg;
  1386   1465       }
  1387   1466     }
  1388   1467     return iRet;
  1389   1468   }
  1390   1469   
         1470  +#ifndef NDEBUG
         1471  +/*
         1472  +** Return true if page iPg, which is a part of segment p, lies on
         1473  +** a redirected block. 
         1474  +*/
         1475  +static int fsPageRedirects(FileSystem *pFS, Segment *p, Pgno iPg){
         1476  +  return (iPg!=0 && iPg!=lsmFsRedirectPage(pFS, p->pRedirect, iPg));
         1477  +}
         1478  +
         1479  +/*
         1480  +** Return true if the second argument is not NULL and any of the first
         1481  +** last or root pages lie on a redirected block. 
         1482  +*/
         1483  +static int fsSegmentRedirects(FileSystem *pFS, Segment *p){
         1484  +  return (p && (
         1485  +      fsPageRedirects(pFS, p, p->iFirst)
         1486  +   || fsPageRedirects(pFS, p, p->iRoot)
         1487  +   || fsPageRedirects(pFS, p, p->iLastPg)
         1488  +  ));
         1489  +}
         1490  +#endif
         1491  +
  1391   1492   /*
  1392   1493   ** Argument aPgno is an array of nPgno page numbers. All pages belong to
  1393   1494   ** the segment pRun. This function gobbles from the start of the run to the
  1394   1495   ** first page that appears in aPgno[] (i.e. so that the aPgno[] entry is
  1395   1496   ** the new first page of the run).
  1396   1497   */
  1397   1498   void lsmFsGobble(
................................................................................
  1402   1503   ){
  1403   1504     int rc = LSM_OK;
  1404   1505     FileSystem *pFS = pDb->pFS;
  1405   1506     Snapshot *pSnapshot = pDb->pWorker;
  1406   1507     int iBlk;
  1407   1508   
  1408   1509     assert( pRun->nSize>0 );
         1510  +  assert( 0==fsSegmentRedirects(pFS, pRun) );
         1511  +  assert( nPgno>0 && 0==fsPageRedirects(pFS, pRun, aPgno[0]) );
         1512  +
  1409   1513     iBlk = fsPageToBlock(pFS, pRun->iFirst);
  1410   1514     pRun->nSize += (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
  1411   1515   
  1412   1516     while( rc==LSM_OK ){
  1413   1517       int iNext = 0;
  1414   1518       Pgno iFirst = firstOnBlock(pFS, iBlk, aPgno, nPgno);
  1415   1519       if( iFirst ){
  1416   1520         pRun->iFirst = iFirst;
  1417   1521         break;
  1418   1522       }
  1419         -    rc = fsBlockNext(pFS, iBlk, &iNext);
         1523  +    rc = fsBlockNext(pFS, pRun, iBlk, &iNext);
  1420   1524       if( rc==LSM_OK ) rc = fsFreeBlock(pFS, pSnapshot, pRun, iBlk);
  1421   1525       pRun->nSize -= (
  1422   1526           1 + fsLastPageOnBlock(pFS, iBlk) - fsFirstPageOnBlock(pFS, iBlk)
  1423   1527       );
  1424   1528       iBlk = iNext;
  1425   1529     }
  1426   1530   
  1427   1531     pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
  1428   1532     assert( pRun->nSize>0 );
  1429   1533   }
  1430   1534   
         1535  +/*
         1536  +** This function is only used in compressed database mode.
         1537  +**
         1538  +** Argument iPg is the page number (byte offset) of a page within segment
         1539  +** pSeg. The page record, including all headers, is nByte bytes in size.
         1540  +** Before returning, set *piNext to the page number of the next page in
         1541  +** the segment, or to zero if iPg is the last.
         1542  +**
         1543  +** In other words, do:
         1544  +**
         1545  +**   *piNext = iPg + nByte;
         1546  +**
         1547  +** But take block overflow and redirection into account.
         1548  +*/
  1431   1549   static int fsNextPageOffset(
  1432   1550     FileSystem *pFS,                /* File system object */
  1433   1551     Segment *pSeg,                  /* Segment to move within */
  1434   1552     Pgno iPg,                       /* Offset of current page */
  1435   1553     int nByte,                      /* Size of current page including headers */
  1436   1554     Pgno *piNext                    /* OUT: Offset of next page. Or zero (EOF) */
  1437   1555   ){
  1438   1556     Pgno iNext;
  1439   1557     int rc;
  1440   1558   
  1441   1559     assert( pFS->pCompress );
  1442   1560   
  1443         -  rc = fsAddOffset(pFS, iPg, nByte-1, &iNext);
         1561  +  rc = fsAddOffset(pFS, pSeg, iPg, nByte-1, &iNext);
  1444   1562     if( pSeg && iNext==pSeg->iLastPg ){
  1445   1563       iNext = 0;
  1446   1564     }else if( rc==LSM_OK ){
  1447         -    rc = fsAddOffset(pFS, iNext, 1, &iNext);
         1565  +    rc = fsAddOffset(pFS, pSeg, iNext, 1, &iNext);
  1448   1566     }
  1449   1567   
  1450   1568     *piNext = iNext;
  1451   1569     return rc;
  1452   1570   }
  1453   1571   
  1454         -static int fsGetPageBefore(FileSystem *pFS, i64 iOff, Pgno *piPrev){
         1572  +static int fsGetPageBefore(
         1573  +  FileSystem *pFS, 
         1574  +  Segment *pSeg, 
         1575  +  i64 iOff, 
         1576  +  Pgno *piPrev
         1577  +){
  1455   1578     u8 aSz[3];
  1456   1579     int rc;
  1457   1580     i64 iRead;
  1458   1581   
  1459         -  rc = fsSubtractOffset(pFS, iOff, sizeof(aSz), &iRead);
  1460         -  if( rc==LSM_OK ) rc = fsReadData(pFS, iRead, aSz, sizeof(aSz));
         1582  +  rc = fsSubtractOffset(pFS, pSeg, iOff, sizeof(aSz), &iRead);
         1583  +  if( rc==LSM_OK ) rc = fsReadData(pFS, pSeg, iRead, aSz, sizeof(aSz));
  1461   1584   
  1462   1585     if( rc==LSM_OK ){
  1463   1586       int bFree;
  1464   1587       int nSz;
  1465   1588       if( aSz[2] & 0x80 ){
  1466   1589         nSz = getRecordSize(aSz, &bFree) + sizeof(aSz)*2;
  1467   1590       }else{
  1468   1591         nSz = (int)(aSz[2] & 0x7F);
  1469   1592         bFree = 1;
  1470   1593       }
  1471         -    rc = fsSubtractOffset(pFS, iOff, nSz, piPrev);
         1594  +    rc = fsSubtractOffset(pFS, pSeg, iOff, nSz, piPrev);
  1472   1595     }
  1473   1596   
  1474   1597     return rc;
  1475   1598   }
  1476   1599   
  1477   1600   /*
  1478   1601   ** The first argument to this function is a valid reference to a database
................................................................................
  1496   1619   ** caller using lsmFsPageRelease().
  1497   1620   */
  1498   1621   int lsmFsDbPageNext(Segment *pRun, Page *pPg, int eDir, Page **ppNext){
  1499   1622     int rc = LSM_OK;
  1500   1623     FileSystem *pFS = pPg->pFS;
  1501   1624     Pgno iPg = pPg->iPg;
  1502   1625   
         1626  +  assert( 0==fsSegmentRedirects(pFS, pRun) );
  1503   1627     if( pFS->pCompress ){
  1504   1628       int nSpace = pPg->nCompress + 2*3;
  1505   1629   
  1506   1630       do {
  1507   1631         if( eDir>0 ){
  1508   1632           rc = fsNextPageOffset(pFS, pRun, iPg, nSpace, &iPg);
  1509   1633         }else{
  1510   1634           if( iPg==pRun->iFirst ){
  1511   1635             iPg = 0;
  1512   1636           }else{
  1513         -          rc = fsGetPageBefore(pFS, iPg, &iPg);
         1637  +          rc = fsGetPageBefore(pFS, pRun, iPg, &iPg);
  1514   1638           }
  1515   1639         }
  1516   1640   
  1517   1641         nSpace = 0;
  1518   1642         if( iPg!=0 ){
  1519         -        rc = fsPageGet(pFS, iPg, 0, ppNext, &nSpace);
         1643  +        rc = fsPageGet(pFS, pRun, iPg, 0, ppNext, &nSpace);
  1520   1644           assert( (*ppNext==0)==(rc!=LSM_OK || nSpace>0) );
  1521   1645         }else{
  1522   1646           *ppNext = 0;
  1523   1647         }
  1524   1648       }while( nSpace>0 && rc==LSM_OK );
  1525   1649   
  1526   1650     }else{
         1651  +    Redirect *pRedir = pRun ? pRun->pRedirect : 0;
  1527   1652       assert( eDir==1 || eDir==-1 );
  1528   1653       if( eDir<0 ){
  1529   1654         if( pRun && iPg==pRun->iFirst ){
  1530   1655           *ppNext = 0;
  1531   1656           return LSM_OK;
  1532   1657         }else if( fsIsFirst(pFS, iPg) ){
  1533   1658           assert( pPg->flags & PAGE_HASPREV );
  1534   1659           iPg = fsLastPageOnBlock(pFS, lsmGetU32(&pPg->aData[-4]));
  1535   1660         }else{
  1536   1661           iPg--;
  1537   1662         }
  1538   1663       }else{
  1539         -      if( pRun && iPg==pRun->iLastPg ){
  1540         -        *ppNext = 0;
  1541         -        return LSM_OK;
  1542         -      }else if( fsIsLast(pFS, iPg) ){
  1543         -        iPg = fsFirstPageOnBlock(pFS, lsmGetU32(&pPg->aData[pFS->nPagesize-4]));
         1664  +      if( pRun ){
         1665  +        if( iPg==pRun->iLastPg ){
         1666  +          *ppNext = 0;
         1667  +          return LSM_OK;
         1668  +        }
         1669  +      }
         1670  +
         1671  +      if( fsIsLast(pFS, iPg) ){
         1672  +        int iBlk = fsRedirectBlock(
         1673  +            pRedir, lsmGetU32(&pPg->aData[pFS->nPagesize-4])
         1674  +        );
         1675  +        iPg = fsFirstPageOnBlock(pFS, iBlk);
  1544   1676         }else{
  1545   1677           iPg++;
  1546   1678         }
  1547   1679       }
  1548         -    rc = fsPageGet(pFS, iPg, 0, ppNext, 0);
         1680  +    rc = fsPageGet(pFS, pRun, iPg, 0, ppNext, 0);
  1549   1681     }
  1550   1682   
  1551   1683     return rc;
  1552   1684   }
  1553   1685   
  1554   1686   static Pgno findAppendPoint(FileSystem *pFS, Level *pLvl){
  1555   1687     int i;
................................................................................
  1588   1720     int rc = LSM_OK;
  1589   1721     Page *pPg = 0;
  1590   1722     *ppOut = 0;
  1591   1723     int iApp = 0;
  1592   1724     int iNext = 0;
  1593   1725     Segment *p = &pLvl->lhs;
  1594   1726     int iPrev = p->iLastPg;
         1727  +
         1728  +  assert( p->pRedirect==0 );
  1595   1729   
  1596   1730     if( pFS->pCompress || bDefer ){
  1597   1731       /* In compressed database mode the page is not assigned a page number
  1598   1732       ** or location in the database file at this point. This will be done
  1599   1733       ** by the lsmFsPagePersist() call.  */
  1600   1734       rc = fsPageBuffer(pFS, 1, &pPg);
  1601   1735       if( rc==LSM_OK ){
................................................................................
  1611   1745         pFS->nOut++;
  1612   1746       }
  1613   1747     }else{
  1614   1748       if( iPrev==0 ){
  1615   1749         iApp = findAppendPoint(pFS, pLvl);
  1616   1750       }else if( fsIsLast(pFS, iPrev) ){
  1617   1751         int iNext;
  1618         -      rc = fsBlockNext(pFS, fsPageToBlock(pFS, iPrev), &iNext);
         1752  +      rc = fsBlockNext(pFS, 0, fsPageToBlock(pFS, iPrev), &iNext);
  1619   1753         if( rc!=LSM_OK ) return rc;
  1620   1754         iApp = fsFirstPageOnBlock(pFS, iNext);
  1621   1755       }else{
  1622   1756         iApp = iPrev + 1;
  1623   1757       }
  1624   1758   
  1625   1759       /* If this is the first page allocated, or if the page allocated is the
  1626   1760       ** last in the block, also allocate the next block here.  */
  1627   1761       if( iApp==0 || fsIsLast(pFS, iApp) ){
  1628   1762         int iNew;                     /* New block number */
  1629   1763   
  1630         -      rc = lsmBlockAllocate(pFS->pDb, &iNew);
         1764  +      rc = lsmBlockAllocate(pFS->pDb, 0, &iNew);
  1631   1765         if( rc!=LSM_OK ) return rc;
  1632   1766         if( iApp==0 ){
  1633   1767           iApp = fsFirstPageOnBlock(pFS, iNew);
  1634   1768         }else{
  1635   1769           iNext = fsFirstPageOnBlock(pFS, iNew);
  1636   1770         }
  1637   1771       }
  1638   1772   
  1639   1773       /* Grab the new page. */
  1640   1774       pPg = 0;
  1641         -    rc = fsPageGet(pFS, iApp, 1, &pPg, 0);
         1775  +    rc = fsPageGet(pFS, 0, iApp, 1, &pPg, 0);
  1642   1776       assert( rc==LSM_OK || pPg==0 );
  1643   1777   
  1644   1778       /* If this is the first or last page of a block, fill in the pointer 
  1645   1779        ** value at the end of the new page. */
  1646   1780       if( rc==LSM_OK ){
  1647   1781         p->nSize++;
  1648   1782         p->iLastPg = iApp;
................................................................................
  1663   1797   
  1664   1798   /*
  1665   1799   ** Mark the sorted run passed as the second argument as finished. 
  1666   1800   */
  1667   1801   int lsmFsSortedFinish(FileSystem *pFS, Segment *p){
  1668   1802     int rc = LSM_OK;
  1669   1803     if( p && p->iLastPg ){
         1804  +    assert( p->pRedirect==0 );
  1670   1805   
  1671   1806       /* Check if the last page of this run happens to be the last of a block.
  1672   1807       ** If it is, then an extra block has already been allocated for this run.
  1673   1808       ** Shift this extra block back to the free-block list. 
  1674   1809       **
  1675   1810       ** Otherwise, add the first free page in the last block used by the run
  1676   1811       ** to the lAppend list.
................................................................................
  1682   1817           if( aiAppend[i]==0 ){
  1683   1818             aiAppend[i] = p->iLastPg+1;
  1684   1819             break;
  1685   1820           }
  1686   1821         }
  1687   1822       }else if( pFS->pCompress==0 ){
  1688   1823         Page *pLast;
  1689         -      rc = fsPageGet(pFS, p->iLastPg, 0, &pLast, 0);
         1824  +      rc = fsPageGet(pFS, 0, p->iLastPg, 0, &pLast, 0);
  1690   1825         if( rc==LSM_OK ){
  1691   1826           int iBlk = (int)lsmGetU32(&pLast->aData[pFS->nPagesize-4]);
  1692   1827           lsmBlockRefree(pFS->pDb, iBlk);
  1693   1828           lsmFsPageRelease(pLast);
  1694   1829         }
  1695   1830       }else{
  1696   1831         int iBlk = 0;
  1697         -      rc = fsBlockNext(pFS, fsPageToBlock(pFS, p->iLastPg), &iBlk);
         1832  +      rc = fsBlockNext(pFS, p, fsPageToBlock(pFS, p->iLastPg), &iBlk);
  1698   1833         if( rc==LSM_OK ){
  1699   1834           lsmBlockRefree(pFS->pDb, iBlk);
  1700   1835         }
  1701   1836       }
  1702   1837     }
  1703   1838     return rc;
  1704   1839   }
  1705   1840   
  1706   1841   /*
  1707   1842   ** Obtain a reference to page number iPg.
  1708   1843   */
  1709         -int lsmFsDbPageGet(FileSystem *pFS, Pgno iPg, Page **ppPg){
         1844  +int lsmFsDbPageGet(FileSystem *pFS, Segment *pSeg, Pgno iPg, Page **ppPg){
  1710   1845     assert( pFS );
  1711         -  return fsPageGet(pFS, iPg, 0, ppPg, 0);
         1846  +  return fsPageGet(pFS, pSeg, iPg, 0, ppPg, 0);
  1712   1847   }
  1713   1848   
  1714   1849   /*
  1715   1850   ** Obtain a reference to the last page in the segment passed as the 
  1716   1851   ** second argument.
  1717   1852   */
  1718   1853   int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg){
................................................................................
  1719   1854     int rc;
  1720   1855     Pgno iPg = pSeg->iLastPg;
  1721   1856     if( pFS->pCompress ){
  1722   1857       int nSpace;
  1723   1858       iPg++;
  1724   1859       do {
  1725   1860         nSpace = 0;
  1726         -      rc = fsGetPageBefore(pFS, iPg, &iPg);
         1861  +      rc = fsGetPageBefore(pFS, pSeg, iPg, &iPg);
  1727   1862         if( rc==LSM_OK ){
  1728         -        rc = fsPageGet(pFS, iPg, 0, ppPg, &nSpace);
         1863  +        rc = fsPageGet(pFS, pSeg, iPg, 0, ppPg, &nSpace);
  1729   1864         }
  1730   1865       }while( rc==LSM_OK && nSpace>0 );
  1731   1866   
  1732   1867     }else{
  1733         -    rc = fsPageGet(pFS, iPg, 0, ppPg, 0);
         1868  +    rc = fsPageGet(pFS, pSeg, iPg, 0, ppPg, 0);
  1734   1869     }
  1735   1870     return rc;
  1736   1871   }
  1737   1872   
  1738   1873   /*
  1739   1874   ** Return a reference to meta-page iPg. If successful, LSM_OK is returned
  1740   1875   ** and *ppPg populated with the new page reference. The reference should
................................................................................
  1826   1961   
  1827   1962   /*
  1828   1963   ** Return true if page is currently writable.
  1829   1964   */
  1830   1965   int lsmFsPageWritable(Page *pPg){
  1831   1966     return (pPg->flags & PAGE_DIRTY) ? 1 : 0;
  1832   1967   }
         1968  +
         1969  +static void fsMovePage(
         1970  +  FileSystem *pFS, 
         1971  +  Segment *pSeg, 
         1972  +  int iTo, 
         1973  +  int iFrom, 
         1974  +  Pgno *piPg
         1975  +){
         1976  +  Pgno iPg = *piPg;
         1977  +  if( iFrom==fsPageToBlock(pFS, iPg) ){
         1978  +    const int nPagePerBlock = (
         1979  +        pFS->pCompress ? pFS ->nBlocksize : (pFS->nBlocksize / pFS->nPagesize)
         1980  +    );
         1981  +    *piPg = iPg - (Pgno)(iFrom - iTo) * nPagePerBlock;
         1982  +  }
         1983  +}
         1984  +
         1985  +/*
         1986  +** Copy the contents of block iFrom to block iTo. 
         1987  +**
         1988  +** It is safe to assume that there are no outstanding references to pages 
         1989  +** on block iTo. And that block iFrom is not currently being written. In
         1990  +** other words, the data can be read and written directly.
         1991  +*/
         1992  +int lsmFsMoveBlock(FileSystem *pFS, Segment *pSeg, int iTo, int iFrom){
         1993  +  Snapshot *p = pFS->pDb->pWorker;
         1994  +  int rc = LSM_OK;
         1995  +
         1996  +  i64 iFromOff = (i64)(iFrom-1) * pFS->nBlocksize;
         1997  +  i64 iToOff = (i64)(iTo-1) * pFS->nBlocksize;
         1998  +  
         1999  +  assert( iTo!=1 );
         2000  +  assert( iFrom>iTo );
         2001  +
         2002  +  if( pFS->bUseMmap ){
         2003  +    fsGrowMapping(pFS, (i64)iFrom * pFS->nBlocksize, &rc);
         2004  +    if( rc==LSM_OK ){
         2005  +      u8 *aMap = (u8 *)(pFS->pMap);
         2006  +      memcpy(&aMap[iToOff], &aMap[iFromOff], pFS->nBlocksize);
         2007  +    }
         2008  +  }else{
         2009  +    int nSz = pFS->nPagesize;
         2010  +    u8 *aData = (u8 *)lsmMallocRc(pFS->pEnv, nSz, &rc);
         2011  +    if( rc==LSM_OK ){
         2012  +      const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
         2013  +      int i;
         2014  +      for(i=0; rc==LSM_OK && i<nPagePerBlock; i++){
         2015  +        i64 iOff = iFromOff + i*nSz;
         2016  +        rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nSz);
         2017  +        if( rc==LSM_OK ){
         2018  +          iOff = iToOff + i*nSz;
         2019  +          rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, aData, nSz);
         2020  +        }
         2021  +      }
         2022  +    }
         2023  +  }
         2024  +
         2025  +  /* Update append-point list if necessary */
         2026  +  if( rc==LSM_OK ){
         2027  +    int i;
         2028  +    for(i=0; i<LSM_APPLIST_SZ; i++){
         2029  +      if( fsPageToBlock(pFS, p->aiAppend[i])==iFrom ){
         2030  +        const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
         2031  +        p->aiAppend[i] -= (i64)(iFrom-iTo) * nPagePerBlock;
         2032  +      }
         2033  +    }
         2034  +  }
         2035  +
         2036  +  /* Update the Segment structure itself */
         2037  +  fsMovePage(pFS, pSeg, iTo, iFrom, &pSeg->iFirst);
         2038  +  fsMovePage(pFS, pSeg, iTo, iFrom, &pSeg->iLastPg);
         2039  +  fsMovePage(pFS, pSeg, iTo, iFrom, &pSeg->iRoot);
         2040  +
         2041  +  return rc;
         2042  +}
  1833   2043   
  1834   2044   /*
  1835   2045   ** Append raw data to a segment. This function is only used in compressed
  1836   2046   ** database mode.
  1837   2047   */
  1838   2048   static Pgno fsAppendData(
  1839   2049     FileSystem *pFS,                /* File-system handle */
................................................................................
  1853   2063   
  1854   2064       /* If this is the first data written into the segment, find an append-point
  1855   2065       ** or allocate a new block.  */
  1856   2066       if( iApp==1 ){
  1857   2067         pSeg->iFirst = iApp = findAppendPoint(pFS, 0);
  1858   2068         if( iApp==0 ){
  1859   2069           int iBlk;
  1860         -        rc = lsmBlockAllocate(pFS->pDb, &iBlk);
         2070  +        rc = lsmBlockAllocate(pFS->pDb, 0, &iBlk);
  1861   2071           pSeg->iFirst = iApp = fsFirstPageOnBlock(pFS, iBlk);
  1862   2072         }
  1863   2073       }
  1864   2074       iRet = iApp;
  1865   2075   
  1866   2076       /* Write as much data as is possible at iApp (usually all of it). */
  1867   2077       iLastOnBlock = fsLastPageOnPagesBlock(pFS, iApp);
................................................................................
  1882   2092       assert( nRem<=0 || (iApp-1)==iLastOnBlock );
  1883   2093       if( rc==LSM_OK && (iApp-1)==iLastOnBlock ){
  1884   2094         u8 aPtr[4];                 /* Space to serialize a u32 */
  1885   2095         int iBlk;                   /* New block number */
  1886   2096   
  1887   2097         if( nWrite>0 ){
  1888   2098           /* Allocate a new block. */
  1889         -        rc = lsmBlockAllocate(pFS->pDb, &iBlk);
         2099  +        rc = lsmBlockAllocate(pFS->pDb, 0, &iBlk);
  1890   2100   
  1891   2101           /* Set the "next" pointer on the old block */
  1892   2102           if( rc==LSM_OK ){
  1893   2103             assert( iApp==(fsPageToBlock(pFS, iApp)*pFS->nBlocksize)-4 );
  1894   2104             lsmPutU32(aPtr, iBlk);
  1895   2105             rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aPtr, sizeof(aPtr));
  1896   2106           }
................................................................................
  1902   2112             iWrite = fsFirstPageOnBlock(pFS, iBlk);
  1903   2113             rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iWrite-4, aPtr, sizeof(aPtr));
  1904   2114             if( nRem>0 ) iApp = iWrite;
  1905   2115           }
  1906   2116         }else{
  1907   2117           /* The next block is already allocated. */
  1908   2118           assert( nRem>0 );
  1909         -        rc = fsBlockNext(pFS, fsPageToBlock(pFS, iApp), &iBlk);
         2119  +        assert( pSeg->pRedirect==0 );
         2120  +        rc = fsBlockNext(pFS, 0, fsPageToBlock(pFS, iApp), &iBlk);
  1910   2121           iRet = iApp = fsFirstPageOnBlock(pFS, iBlk);
  1911   2122         }
  1912   2123   
  1913   2124         /* Write the remaining data into the new block */
  1914   2125         if( rc==LSM_OK && nRem>0 ){
  1915   2126           rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, &aData[nWrite], nRem);
  1916   2127           iApp += nRem;
................................................................................
  1963   2174     if( fsIsLast(pFS, iPrev) ){
  1964   2175       /* Grab the first page on the next block (which has already be
  1965   2176       ** allocated). In this case set *piPrev to tell the caller to set
  1966   2177       ** the "previous block" pointer in the first 4 bytes of the page.
  1967   2178       */
  1968   2179       int iNext;
  1969   2180       int iBlk = fsPageToBlock(pFS, iPrev);
  1970         -    rc = fsBlockNext(pFS, iBlk, &iNext);
         2181  +    assert( pSeg->pRedirect==0 );
         2182  +    rc = fsBlockNext(pFS, 0, iBlk, &iNext);
  1971   2183       if( rc!=LSM_OK ) return rc;
  1972   2184       *piNew = fsFirstPageOnBlock(pFS, iNext);
  1973   2185       *piPrev = iBlk;
  1974   2186     }else{
  1975   2187       *piNew = iPrev+1;
  1976   2188       if( fsIsLast(pFS, *piNew) ){
  1977   2189         /* Allocate the next block here. */
  1978   2190         int iBlk;
  1979         -      rc = lsmBlockAllocate(pFS->pDb, &iBlk);
         2191  +      rc = lsmBlockAllocate(pFS->pDb, 0, &iBlk);
  1980   2192         if( rc!=LSM_OK ) return rc;
  1981   2193         *piNext = iBlk;
  1982   2194       }
  1983   2195     }
  1984   2196   
  1985   2197     pSeg->nSize++;
  1986   2198     pSeg->iLastPg = *piNew;
................................................................................
  2327   2539       iBlk = fsPageToBlock(pFS, pArray->iFirst);
  2328   2540       iLastBlk = fsPageToBlock(pFS, pArray->iLastPg);
  2329   2541   
  2330   2542       lsmStringInit(&str, pDb->pEnv);
  2331   2543       if( bBlock ){
  2332   2544         lsmStringAppendf(&str, "%d", iBlk);
  2333   2545         while( iBlk!=iLastBlk ){
  2334         -        fsBlockNext(pFS, iBlk, &iBlk);
         2546  +        fsBlockNext(pFS, pArray, iBlk, &iBlk);
  2335   2547           lsmStringAppendf(&str, " %d", iBlk);
  2336   2548         }
  2337   2549       }else{
  2338   2550         lsmStringAppendf(&str, "%d", pArray->iFirst);
  2339   2551         while( iBlk!=iLastBlk ){
  2340   2552           lsmStringAppendf(&str, " %d", fsLastPageOnBlock(pFS, iBlk));
  2341         -        fsBlockNext(pFS, iBlk, &iBlk);
         2553  +        fsBlockNext(pFS, pArray, iBlk, &iBlk);
  2342   2554           lsmStringAppendf(&str, " %d", fsFirstPageOnBlock(pFS, iBlk));
  2343   2555         }
  2344   2556         lsmStringAppendf(&str, " %d", pArray->iLastPg);
  2345   2557       }
  2346   2558   
  2347   2559       *pzOut = str.z;
  2348   2560     }
................................................................................
  2349   2561   
  2350   2562     if( bUnlock ){
  2351   2563       int rcwork = LSM_BUSY;
  2352   2564       lsmFinishWork(pDb, 0, &rcwork);
  2353   2565     }
  2354   2566     return rc;
  2355   2567   }
         2568  +
         2569  +int lsmFsSegmentContainsPg(
         2570  +  FileSystem *pFS, 
         2571  +  Segment *pSeg, 
         2572  +  Pgno iPg, 
         2573  +  int *pbRes
         2574  +){
         2575  +  Redirect *pRedir = pSeg->pRedirect;
         2576  +  int rc = LSM_OK;
         2577  +  int iBlk;
         2578  +  int iLastBlk;
         2579  +  int iPgBlock;                   /* Block containing page iPg */
         2580  +
         2581  +  iPgBlock = fsPageToBlock(pFS, pSeg->iFirst);
         2582  +  iBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iFirst));
         2583  +  iLastBlk = fsRedirectBlock(pRedir, fsPageToBlock(pFS, pSeg->iLastPg));
         2584  +
         2585  +  while( iBlk!=iLastBlk && iBlk!=iPgBlock && rc==LSM_OK ){
         2586  +    rc = fsBlockNext(pFS, pSeg, iBlk, &iBlk);
         2587  +  }
         2588  +
         2589  +  *pbRes = (iBlk==iPgBlock);
         2590  +  return rc;
         2591  +}
  2356   2592   
  2357   2593   /*
  2358   2594   ** This function implements the lsm_info(LSM_INFO_ARRAY_PAGES) request.
  2359   2595   ** If successful, *pzOut is set to point to a nul-terminated string 
  2360   2596   ** containing the array structure and LSM_OK is returned. The caller should
  2361   2597   ** eventually free the string using lsmFree().
  2362   2598   **
  2363   2599   ** If an error occurs, *pzOut is set to NULL and an LSM error code returned.
  2364   2600   */
  2365   2601   int lsmInfoArrayPages(lsm_db *pDb, Pgno iFirst, char **pzOut){
  2366   2602     int rc = LSM_OK;
  2367   2603     Snapshot *pWorker;              /* Worker snapshot */
  2368         -  Segment *pArray = 0;            /* Array to report on */
         2604  +  Segment *pSeg = 0;              /* Array to report on */
  2369   2605     int bUnlock = 0;
  2370   2606   
  2371   2607     *pzOut = 0;
  2372   2608     if( iFirst==0 ) return LSM_ERROR;
  2373   2609   
  2374   2610     /* Obtain the worker snapshot */
  2375   2611     pWorker = pDb->pWorker;
................................................................................
  2377   2613       rc = lsmBeginWork(pDb);
  2378   2614       if( rc!=LSM_OK ) return rc;
  2379   2615       pWorker = pDb->pWorker;
  2380   2616       bUnlock = 1;
  2381   2617     }
  2382   2618   
  2383   2619     /* Search for the array that starts on page iFirst */
  2384         -  pArray = findSegment(pWorker, iFirst);
         2620  +  pSeg = findSegment(pWorker, iFirst);
  2385   2621   
  2386         -  if( pArray==0 ){
         2622  +  if( pSeg==0 ){
  2387   2623       /* Could not find the requested array. This is an error. */
  2388   2624       rc = LSM_ERROR;
  2389   2625     }else{
  2390   2626       Page *pPg = 0;
  2391   2627       FileSystem *pFS = pDb->pFS;
  2392   2628       LsmString str;
  2393   2629   
  2394   2630       lsmStringInit(&str, pDb->pEnv);
  2395         -    rc = lsmFsDbPageGet(pFS, iFirst, &pPg);
         2631  +    rc = lsmFsDbPageGet(pFS, pSeg, iFirst, &pPg);
  2396   2632       while( rc==LSM_OK && pPg ){
  2397   2633         Page *pNext = 0;
  2398   2634         lsmStringAppendf(&str, " %lld", lsmFsPageNumber(pPg));
  2399         -      rc = lsmFsDbPageNext(pArray, pPg, 1, &pNext);
         2635  +      rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
  2400   2636         lsmFsPageRelease(pPg);
  2401   2637         pPg = pNext;
  2402   2638       }
  2403   2639   
  2404   2640       if( rc!=LSM_OK ){
  2405   2641         lsmFree(pDb->pEnv, str.z);
  2406   2642       }else{
................................................................................
  2447   2683     int bExtra,                     /* If true, count the "next" block if any */
  2448   2684     int nUsed,
  2449   2685     u8 *aUsed
  2450   2686   ){
  2451   2687     if( pSeg ){
  2452   2688       if( pSeg && pSeg->nSize>0 ){
  2453   2689         int rc;
  2454         -      Pgno iLast = pSeg->iLastPg;
  2455         -      int iBlk;
  2456         -      int iLastBlk;
  2457         -      int bLastIsLastOnBlock;
         2690  +      int iBlk;                   /* Current block (during iteration) */
         2691  +      int iLastBlk;               /* Last block of segment */
         2692  +      int iFirstBlk;              /* First block of segment */
         2693  +      int bLastIsLastOnBlock;     /* True iLast is the last on its block */
  2458   2694   
  2459         -      iBlk = fsPageToBlock(pFS, pSeg->iFirst);
         2695  +      assert( 0==fsSegmentRedirects(pFS, pSeg) );
         2696  +      iBlk = iFirstBlk = fsPageToBlock(pFS, pSeg->iFirst);
  2460   2697         iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg);
  2461         -      bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==iLast);
         2698  +
         2699  +      bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==pSeg->iLastPg);
  2462   2700         assert( iBlk>0 );
  2463   2701   
  2464         -      /* If the first page of this run is also the first page of its first
  2465         -      ** block, set the flag to indicate that the first page of iBlk is 
  2466         -      ** in use.  */
  2467         -      if( fsFirstPageOnBlock(pFS, iBlk)==pSeg->iFirst ){
  2468         -        assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_FIRST_PG)==0 );
  2469         -        aUsed[iBlk-1] |= INTEGRITY_CHECK_FIRST_PG;
  2470         -      }
  2471         -
  2472   2702         do {
  2473   2703           /* iBlk is a part of this sorted run. */
  2474   2704           aUsed[iBlk-1] |= INTEGRITY_CHECK_USED;
         2705  +
         2706  +        /* If the first page of this block is also part of the segment,
         2707  +        ** set the flag to indicate that the first page of iBlk is in use.  
         2708  +        */
         2709  +        if( fsFirstPageOnBlock(pFS, iBlk)==pSeg->iFirst || iBlk!=iFirstBlk ){
         2710  +          assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_FIRST_PG)==0 );
         2711  +          aUsed[iBlk-1] |= INTEGRITY_CHECK_FIRST_PG;
         2712  +        }
  2475   2713   
  2476   2714           /* Unless the sorted run finishes before the last page on this block, 
  2477   2715           ** the last page of this block is also in use.  */
  2478   2716           if( iBlk!=iLastBlk || bLastIsLastOnBlock ){
  2479   2717             assert( (aUsed[iBlk-1] & INTEGRITY_CHECK_LAST_PG)==0 );
  2480   2718             aUsed[iBlk-1] |= INTEGRITY_CHECK_LAST_PG;
  2481   2719           }
................................................................................
  2482   2720   
  2483   2721           /* Special case. The sorted run being scanned is the output run of
  2484   2722           ** a level currently undergoing an incremental merge. The sorted
  2485   2723           ** run ends on the last page of iBlk, but the next block has already
  2486   2724           ** been allocated. So mark it as in use as well.  */
  2487   2725           if( iBlk==iLastBlk && bLastIsLastOnBlock && bExtra ){
  2488   2726             int iExtra = 0;
  2489         -          rc = fsBlockNext(pFS, iBlk, &iExtra);
         2727  +          rc = fsBlockNext(pFS, pSeg, iBlk, &iExtra);
  2490   2728             assert( rc==LSM_OK );
  2491   2729   
  2492   2730             assert( aUsed[iExtra-1]==0 );
  2493   2731             aUsed[iExtra-1] |= INTEGRITY_CHECK_USED;
  2494   2732             aUsed[iExtra-1] |= INTEGRITY_CHECK_FIRST_PG;
  2495   2733             aUsed[iExtra-1] |= INTEGRITY_CHECK_LAST_PG;
  2496   2734           }
................................................................................
  2497   2735   
  2498   2736           /* Move on to the next block in the sorted run. Or set iBlk to zero
  2499   2737           ** in order to break out of the loop if this was the last block in
  2500   2738           ** the run.  */
  2501   2739           if( iBlk==iLastBlk ){
  2502   2740             iBlk = 0;
  2503   2741           }else{
  2504         -          rc = fsBlockNext(pFS, iBlk, &iBlk);
         2742  +          rc = fsBlockNext(pFS, pSeg, iBlk, &iBlk);
  2505   2743             assert( rc==LSM_OK );
  2506   2744           }
  2507   2745         }while( iBlk );
  2508   2746       }
  2509   2747     }
  2510   2748   }
  2511   2749   

Changes to src/lsm_main.c.

    37     37   
    38     38     /* If there is at least one cursor or a write transaction open, the database
    39     39     ** handle must be holding a pointer to a client snapshot. And the reverse 
    40     40     ** - if there are no open cursors and no write transactions then there must 
    41     41     ** not be a client snapshot.  */
    42     42     assert( (pDb->pCsr!=0 || pDb->nTransOpen>0)==(pDb->iReader>=0) );
    43     43   
           44  +  assert( pDb->iReader<0 || pDb->pClient!=0 );
           45  +
    44     46     assert( pDb->nTransOpen>=0 );
    45     47   }
    46     48   #else
    47     49   # define assert_db_state(x) 
    48     50   #endif
    49     51   
    50     52   /*
................................................................................
   371    373   
   372    374     assert( *pbUnlock==0 );
   373    375     if( !pDb->pWorker ){
   374    376       rc = lsmBeginWork(pDb);
   375    377       if( rc!=LSM_OK ) return rc;
   376    378       *pbUnlock = 1;
   377    379     }
   378         -  *pp = pDb->pWorker;
          380  +  if( pp ) *pp = pDb->pWorker;
   379    381     return rc;
   380    382   }
   381    383   
   382    384   static void infoFreeWorker(lsm_db *pDb, int bUnlock){
   383    385     if( bUnlock ){
   384    386       int rcdummy = LSM_BUSY;
   385    387       lsmFinishWork(pDb, 0, &rcdummy);
................................................................................
   529    531         break;
   530    532       }
   531    533   
   532    534       case LSM_INFO_PAGE_HEX_DUMP:
   533    535       case LSM_INFO_PAGE_ASCII_DUMP: {
   534    536         Pgno pgno = va_arg(ap, Pgno);
   535    537         char **pzVal = va_arg(ap, char **);
   536         -      rc = lsmInfoPageDump(pDb, pgno, (eParam==LSM_INFO_PAGE_HEX_DUMP), pzVal);
          538  +      int bUnlock = 0;
          539  +      rc = infoGetWorker(pDb, 0, &bUnlock);
          540  +      if( rc==LSM_OK ){
          541  +        int bHex = (eParam==LSM_INFO_PAGE_HEX_DUMP);
          542  +        rc = lsmInfoPageDump(pDb, pgno, bHex, pzVal);
          543  +      }
          544  +      infoFreeWorker(pDb, bUnlock);
   537    545         break;
   538    546       }
   539    547   
   540    548       case LSM_INFO_LOG_STRUCTURE: {
   541    549         char **pzVal = va_arg(ap, char **);
   542    550         rc = lsmInfoLogStructure(pDb, pzVal);
   543    551         break;

Changes to src/lsm_shared.c.

   161    161     DbTruncateCtx *p = (DbTruncateCtx *)pCtx;
   162    162     if( iBlk!=p->nBlock || (p->iInUse>=0 && iSnapshot>=p->iInUse) ) return 1;
   163    163     p->nBlock--;
   164    164     return 0;
   165    165   }
   166    166   
   167    167   static int dbTruncate(lsm_db *pDb, i64 iInUse){
   168         -  int rc;
          168  +  int rc = LSM_OK;
          169  +#if 0
   169    170     int i;
   170    171     DbTruncateCtx ctx;
   171    172   
   172    173     assert( pDb->pWorker );
   173    174     ctx.nBlock = pDb->pWorker->nBlock;
   174    175     ctx.iInUse = iInUse;
   175    176   
................................................................................
   184    185         lsmLogMessage(pDb, 0, 
   185    186             "dbTruncate(): truncated db to %d blocks",ctx.nBlock
   186    187         );
   187    188       }
   188    189   #endif
   189    190       pDb->pWorker->nBlock = ctx.nBlock;
   190    191     }
          192  +#endif
   191    193     return rc;
   192    194   }
   193    195   
   194    196   
   195    197   /*
   196    198   ** This function is called during database shutdown (when the number of
   197    199   ** connections drops from one to zero). It truncates the database file
................................................................................
   623    625   }
   624    626   
   625    627   
   626    628   typedef struct FindFreeblockCtx FindFreeblockCtx;
   627    629   struct FindFreeblockCtx {
   628    630     i64 iInUse;
   629    631     int iRet;
          632  +  int bNotOne;
   630    633   };
   631    634   
   632    635   static int findFreeblockCb(void *pCtx, int iBlk, i64 iSnapshot){
   633    636     FindFreeblockCtx *p = (FindFreeblockCtx *)pCtx;
   634         -  if( iSnapshot<p->iInUse ){
          637  +  if( iSnapshot<p->iInUse && (iBlk!=1 || p->bNotOne==0) ){
   635    638       p->iRet = iBlk;
   636    639       return 1;
   637    640     }
   638    641     return 0;
   639    642   }
   640    643   
   641         -static int findFreeblock(lsm_db *pDb, i64 iInUse, int *piRet){
          644  +static int findFreeblock(lsm_db *pDb, i64 iInUse, int bNotOne, int *piRet){
   642    645     int rc;                         /* Return code */
   643    646     FindFreeblockCtx ctx;           /* Context object */
   644    647   
   645         -#ifdef LSM_LOG_BLOCKS
   646         -  char *zList = 0;
   647         -  lsmInfoFreelist(pDb, &zList);
   648         -  lsmLogMessage(pDb, 0, 
   649         -      "findFreeblock(): iInUse=%lld freelist=%s", iInUse, zList);
   650         -  lsmFree(pDb->pEnv, zList);
   651         -#endif
   652         -
   653    648     ctx.iInUse = iInUse;
   654    649     ctx.iRet = 0;
          650  +  ctx.bNotOne = bNotOne;
   655    651     rc = lsmWalkFreelist(pDb, 0, findFreeblockCb, (void *)&ctx);
   656    652     *piRet = ctx.iRet;
   657    653   
   658         -#ifdef LSM_LOG_BLOCKS
   659         -  lsmLogMessage(pDb, 0, "findFreeblock(): returning block %d", *piRet);
   660         -#endif
   661    654     return rc;
   662    655   }
   663    656   
   664    657   /*
   665    658   ** Allocate a new database file block to write data to, either by extending
   666    659   ** the database file or by recycling a free-list entry. The worker snapshot 
   667    660   ** must be held in order to call this function.
   668    661   **
   669    662   ** If successful, *piBlk is set to the block number allocated and LSM_OK is
   670    663   ** returned. Otherwise, *piBlk is zeroed and an lsm error code returned.
   671    664   */
   672         -int lsmBlockAllocate(lsm_db *pDb, int *piBlk){
          665  +int lsmBlockAllocate(lsm_db *pDb, int iBefore, int *piBlk){
   673    666     Snapshot *p = pDb->pWorker;
   674    667     int iRet = 0;                   /* Block number of allocated block */
   675    668     int rc = LSM_OK;
   676    669     i64 iInUse = 0;                 /* Snapshot id still in use */
   677    670     i64 iSynced = 0;                /* Snapshot id synced to disk */
   678    671   
   679    672     assert( p );
   680    673   
   681    674   #ifdef LSM_LOG_FREELIST
   682    675     {
          676  +    static int nCall = 0;
   683    677       char *zFree = 0;
          678  +    nCall++;
   684    679       rc = lsmInfoFreelist(pDb, &zFree);
   685    680       if( rc!=LSM_OK ) return rc;
   686         -    lsmLogMessage(pDb, 0, "lsmBlockAllocate(): freelist: %s", zFree);
          681  +    lsmLogMessage(pDb, 0, "lsmBlockAllocate(): %d freelist: %s", nCall, zFree);
   687    682       lsmFree(pDb->pEnv, zFree);
   688    683     }
   689    684   #endif
   690    685   
   691    686     /* Set iInUse to the smallest snapshot id that is either:
   692    687     **
   693    688     **   * Currently in use by a database client,
................................................................................
   694    689     **   * May be used by a database client in the future, or
   695    690     **   * Is the most recently checkpointed snapshot (i.e. the one that will
   696    691     **     be used following recovery if a failure occurs at this point).
   697    692     */
   698    693     rc = lsmCheckpointSynced(pDb, &iSynced, 0, 0);
   699    694     if( rc==LSM_OK && iSynced==0 ) iSynced = p->iId;
   700    695     iInUse = iSynced;
   701         -  if( rc==LSM_OK && pDb->pClient ) iInUse = LSM_MIN(iInUse, pDb->pClient->iId);
          696  +  if( rc==LSM_OK && pDb->iReader>=0 ){
          697  +    assert( pDb->pClient );
          698  +    iInUse = LSM_MIN(iInUse, pDb->pClient->iId);
          699  +  }
   702    700     if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);
   703    701   
   704    702   #ifdef LSM_LOG_FREELIST
   705    703     {
   706    704       lsmLogMessage(pDb, 0, "lsmBlockAllocate(): "
   707    705           "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)", 
   708         -        iInUse, iSynced, (pDb->pClient ? pDb->pClient->iId : 0)
          706  +        iInUse, iSynced, (pDb->iReader>=0 ? pDb->pClient->iId : 0)
   709    707       );
   710    708     }
   711    709   #endif
   712    710   
   713    711     /* Query the free block list for a suitable block */
   714         -  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);
          712  +  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet);
   715    713   
   716         -  /* If a block was found in the free block list, use it and remove it from 
   717         -  ** the list. Otherwise, if no suitable block was found, allocate one from
   718         -  ** the end of the file.  */
   719         -  if( rc==LSM_OK ){
          714  +  if( iBefore>0 && (iRet<=0 || iRet>=iBefore) ){
          715  +    iRet = 0;
          716  +
          717  +  }else if( rc==LSM_OK ){
          718  +    /* If a block was found in the free block list, use it and remove it from 
          719  +    ** the list. Otherwise, if no suitable block was found, allocate one from
          720  +    ** the end of the file.  */
   720    721       if( iRet>0 ){
   721    722   #ifdef LSM_LOG_FREELIST
   722    723         lsmLogMessage(pDb, 0, 
   723    724             "reusing block %d (snapshot-in-use=%lld)", iRet, iInUse);
   724    725   #endif
   725    726         rc = freelistAppend(pDb, iRet, -1);
   726    727         if( rc==LSM_OK ){
................................................................................
   730    731         iRet = ++(p->nBlock);
   731    732   #ifdef LSM_LOG_FREELIST
   732    733         lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
   733    734   #endif
   734    735       }
   735    736     }
   736    737   
   737         -  assert( iRet>0 || rc!=LSM_OK );
          738  +  assert( iBefore>0 || iRet>0 || rc!=LSM_OK );
   738    739     *piBlk = iRet;
   739    740     return rc;
   740    741   }
   741    742   
   742    743   /*
   743    744   ** Free a database block. The worker snapshot must be held in order to call 
   744    745   ** this function.
................................................................................
   867    868     return rc;
   868    869   }
   869    870   
   870    871   void lsmFreeSnapshot(lsm_env *pEnv, Snapshot *p){
   871    872     if( p ){
   872    873       lsmSortedFreeLevel(pEnv, p->pLevel);
   873    874       lsmFree(pEnv, p->freelist.aEntry);
          875  +    lsmFree(pEnv, p->redirect.a);
   874    876       lsmFree(pEnv, p);
   875    877     }
   876    878   }
   877    879   
   878    880   /*
   879    881   ** Argument bFlush is true if the contents of the in-memory tree has just
   880    882   ** been flushed to disk. The significance of this is that once the snapshot

Changes to src/lsm_sorted.c.

    68     68   */
    69     69   
    70     70   #ifndef _LSM_INT_H
    71     71   # include "lsmInt.h"
    72     72   #endif
    73     73   #include "sqlite4.h"            /* only for sqlite4_snprintf() */
    74     74   
           75  +#define LSM_LOG_STRUCTURE 0
           76  +#define LSM_LOG_DATA      0
           77  +
    75     78   /*
    76     79   ** Macros to help decode record types.
    77     80   */
    78     81   #define rtTopic(eType)       ((eType) & LSM_SYSTEMKEY)
    79     82   #define rtIsDelete(eType)    (((eType) & 0x0F)==LSM_POINT_DELETE)
    80     83   
    81     84   #define rtIsSeparator(eType) (((eType) & LSM_SEPARATOR)!=0)
................................................................................
   391    394   static void sortedBlobFree(Blob *pBlob){
   392    395     assert( pBlob->pEnv || pBlob->pData==0 );
   393    396     if( pBlob->pData ) lsmFree(pBlob->pEnv, pBlob->pData);
   394    397     memset(pBlob, 0, sizeof(Blob));
   395    398   }
   396    399   
   397    400   static int sortedReadData(
          401  +  Segment *pSeg,
   398    402     Page *pPg,
   399    403     int iOff,
   400    404     int nByte,
   401    405     void **ppData,
   402    406     Blob *pBlob
   403    407   ){
   404    408     int rc = LSM_OK;
................................................................................
   444    448         assert( nRem>=0 );
   445    449         if( nRem==0 ) break;
   446    450         i -= iEnd;
   447    451   
   448    452         /* Grab the next page in the segment */
   449    453   
   450    454         do {
   451         -        rc = lsmFsDbPageNext(0, pPg, 1, &pNext);
          455  +        rc = lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
   452    456           if( rc==LSM_OK && pNext==0 ){
   453    457             rc = LSM_CORRUPT_BKPT;
   454    458           }
   455    459           if( rc ) break;
   456    460           lsmFsPageRelease(pPg);
   457    461           pPg = pNext;
   458    462           aData = fsPageData(pPg, &nData);
................................................................................
   505    509     assert( iCell<pageGetNRec(aData, nData) && iCell>=0 );
   506    510     aCell = pageGetCell(aData, nData, iCell);
   507    511     lsmVarintGet64(&aCell[1], &iRet);
   508    512     return iRet;
   509    513   }
   510    514   
   511    515   static u8 *pageGetKey(
          516  +  Segment *pSeg,                  /* Segment pPg belongs to */
   512    517     Page *pPg,                      /* Page to read from */
   513    518     int iCell,                      /* Index of cell on page to read */
   514    519     int *piTopic,                   /* OUT: Topic associated with this key */
   515    520     int *pnKey,                     /* OUT: Size of key in bytes */
   516    521     Blob *pBlob                     /* If required, use this for dynamic memory */
   517    522   ){
   518    523     u8 *pKey;
................................................................................
   531    536     pKey += lsmVarintGet32(pKey, &nDummy);
   532    537     pKey += lsmVarintGet32(pKey, pnKey);
   533    538     if( rtIsWrite(eType) ){
   534    539       pKey += lsmVarintGet32(pKey, &nDummy);
   535    540     }
   536    541     *piTopic = rtTopic(eType);
   537    542   
   538         -  sortedReadData(pPg, pKey-aData, *pnKey, (void **)&pKey, pBlob);
          543  +  sortedReadData(pSeg, pPg, pKey-aData, *pnKey, (void **)&pKey, pBlob);
   539    544     return pKey;
   540    545   }
   541    546   
   542    547   static int pageGetKeyCopy(
   543    548     lsm_env *pEnv,                  /* Environment handle */
          549  +  Segment *pSeg,                  /* Segment pPg belongs to */
   544    550     Page *pPg,                      /* Page to read from */
   545    551     int iCell,                      /* Index of cell on page to read */
   546    552     int *piTopic,                   /* OUT: Topic associated with this key */
   547    553     Blob *pBlob                     /* If required, use this for dynamic memory */
   548    554   ){
   549    555     int rc = LSM_OK;
   550    556     int nKey;
   551    557     u8 *aKey;
   552    558   
   553         -  aKey = pageGetKey(pPg, iCell, piTopic, &nKey, pBlob);
          559  +  aKey = pageGetKey(pSeg, pPg, iCell, piTopic, &nKey, pBlob);
   554    560     assert( (void *)aKey!=pBlob->pData || nKey==pBlob->nData );
   555    561     if( (void *)aKey!=pBlob->pData ){
   556    562       rc = sortedBlobSet(pEnv, pBlob, aKey, nKey);
   557    563     }
   558    564   
   559    565     return rc;
   560    566   }
................................................................................
   575    581     return iRef;
   576    582   }
   577    583   
   578    584   #define GETVARINT64(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet64((a), &(i)))
   579    585   #define GETVARINT32(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet32((a), &(i)))
   580    586   
   581    587   static int pageGetBtreeKey(
          588  +  Segment *pSeg,                  /* Segment page pPg belongs to */
   582    589     Page *pPg,
   583    590     int iKey, 
   584    591     Pgno *piPtr, 
   585    592     int *piTopic, 
   586    593     void **ppKey,
   587    594     int *pnKey,
   588    595     Blob *pBlob
................................................................................
   601    608     aCell += GETVARINT64(aCell, *piPtr);
   602    609   
   603    610     if( eType==0 ){
   604    611       int rc;
   605    612       Pgno iRef;                  /* Page number of referenced page */
   606    613       Page *pRef;
   607    614       aCell += GETVARINT64(aCell, iRef);
   608         -    rc = lsmFsDbPageGet(lsmPageFS(pPg), iRef, &pRef);
          615  +    rc = lsmFsDbPageGet(lsmPageFS(pPg), pSeg, iRef, &pRef);
   609    616       if( rc!=LSM_OK ) return rc;
   610         -    pageGetKeyCopy(lsmPageEnv(pPg), pRef, 0, &eType, pBlob);
          617  +    pageGetKeyCopy(lsmPageEnv(pPg), pSeg, pRef, 0, &eType, pBlob);
   611    618       lsmFsPageRelease(pRef);
   612    619       *ppKey = pBlob->pData;
   613    620       *pnKey = pBlob->nData;
   614    621     }else{
   615    622       aCell += GETVARINT32(aCell, *pnKey);
   616    623       *ppKey = aCell;
   617    624     }
................................................................................
   632    639       int iCell = pCsr->aPg[iPg].iCell;
   633    640       while( iCell<0 && (--iPg)>=0 ){
   634    641         iCell = pCsr->aPg[iPg].iCell-1;
   635    642       }
   636    643       if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT;
   637    644   
   638    645       rc = pageGetBtreeKey(
          646  +        pCsr->pSeg,
   639    647           pCsr->aPg[iPg].pPage, iCell,
   640    648           &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
   641    649       );
   642    650       pCsr->eType |= LSM_SEPARATOR;
   643    651     }
   644    652   
   645    653     return rc;
................................................................................
   693    701       if( pCsr->iPg>=0 ){
   694    702         pCsr->aPg[pCsr->iPg].iCell++;
   695    703   
   696    704         iLoad = btreeCursorPtr(aData, nData, pPg->iCell);
   697    705         do {
   698    706           Page *pLoad;
   699    707           pCsr->iPg++;
   700         -        rc = lsmFsDbPageGet(pCsr->pFS, iLoad, &pLoad);
          708  +        rc = lsmFsDbPageGet(pCsr->pFS, pCsr->pSeg, iLoad, &pLoad);
   701    709           pCsr->aPg[pCsr->iPg].pPage = pLoad;
   702    710           pCsr->aPg[pCsr->iPg].iCell = 0;
   703    711           if( rc==LSM_OK ){
   704    712             if( pCsr->iPg==(pCsr->nDepth-1) ) break;
   705    713             aData = fsPageData(pLoad, &nData);
   706    714             iLoad = btreeCursorPtr(aData, nData, 0);
   707    715           }
................................................................................
   738    746     int rc;
   739    747   
   740    748     Page *pPg = 0;
   741    749     FileSystem *pFS = pCsr->pFS;
   742    750     int iPg = pCsr->pSeg->iRoot;
   743    751   
   744    752     do {
   745         -    rc = lsmFsDbPageGet(pFS, iPg, &pPg);
          753  +    rc = lsmFsDbPageGet(pFS, pCsr->pSeg, iPg, &pPg);
   746    754       assert( (rc==LSM_OK)==(pPg!=0) );
   747    755       if( rc==LSM_OK ){
   748    756         u8 *aData;
   749    757         int nData;
   750    758         int flags;
   751    759   
   752    760         aData = fsPageData(pPg, &nData);
................................................................................
   828    836     int rc = LSM_OK;
   829    837   
   830    838     if( p->iPg ){
   831    839       lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
   832    840       int iCell;                    /* Current cell number on leaf page */
   833    841       Pgno iLeaf;                   /* Page number of current leaf page */
   834    842       int nDepth;                   /* Depth of b-tree structure */
          843  +    Segment *pSeg = pCsr->pSeg;
   835    844   
   836    845       /* Decode the MergeInput structure */
   837    846       iLeaf = p->iPg;
   838    847       nDepth = (p->iCell & 0x00FF);
   839    848       iCell = (p->iCell >> 8) - 1;
   840    849   
   841    850       /* Allocate the BtreeCursor.aPg[] array */
   842    851       assert( pCsr->aPg==0 );
   843    852       pCsr->aPg = (BtreePg *)lsmMallocZeroRc(pEnv, sizeof(BtreePg) * nDepth, &rc);
   844    853   
   845    854       /* Populate the last entry of the aPg[] array */
   846    855       if( rc==LSM_OK ){
          856  +      Page **pp = &pCsr->aPg[nDepth-1].pPage;
   847    857         pCsr->iPg = nDepth-1;
   848    858         pCsr->nDepth = nDepth;
   849    859         pCsr->aPg[pCsr->iPg].iCell = iCell;
   850         -      rc = lsmFsDbPageGet(pCsr->pFS, iLeaf, &pCsr->aPg[nDepth-1].pPage);
          860  +      rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLeaf, pp);
   851    861       }
   852    862   
   853    863       /* Populate any other aPg[] array entries */
   854    864       if( rc==LSM_OK && nDepth>1 ){
   855    865         Blob blob = {0,0,0};
   856    866         void *pSeek;
   857    867         int nSeek;
   858    868         int iTopicSeek;
   859    869         int iPg = 0;
   860         -      int iLoad = pCsr->pSeg->iRoot;
          870  +      int iLoad = pSeg->iRoot;
   861    871         Page *pPg = pCsr->aPg[nDepth-1].pPage;
   862    872    
   863    873         if( pageObjGetNRec(pPg)==0 ){
   864    874           /* This can happen when pPg is the right-most leaf in the b-tree.
   865    875           ** In this case, set the iTopicSeek/pSeek/nSeek key to a value
   866    876           ** greater than any real key.  */
   867    877           assert( iCell==-1 );
   868    878           iTopicSeek = 1000;
   869    879           pSeek = 0;
   870    880           nSeek = 0;
   871    881         }else{
   872    882           Pgno dummy;
   873         -        rc = pageGetBtreeKey(pPg,
          883  +        rc = pageGetBtreeKey(pSeg, pPg,
   874    884               0, &dummy, &iTopicSeek, &pSeek, &nSeek, &pCsr->blob
   875    885           );
   876    886         }
   877    887   
   878    888         do {
   879    889           Page *pPg;
   880         -        rc = lsmFsDbPageGet(pCsr->pFS, iLoad, &pPg);
          890  +        rc = lsmFsDbPageGet(pCsr->pFS, pSeg, iLoad, &pPg);
   881    891           assert( rc==LSM_OK || pPg==0 );
   882    892           if( rc==LSM_OK ){
   883    893             u8 *aData;                  /* Buffer containing page data */
   884    894             int nData;                  /* Size of aData[] in bytes */
   885    895             int iMin;
   886    896             int iMax;
   887    897             int iCell;
................................................................................
   897    907             while( iMax>=iMin ){
   898    908               int iTry = (iMin+iMax)/2;
   899    909               void *pKey; int nKey;         /* Key for cell iTry */
   900    910               int iTopic;                   /* Topic for key pKeyT/nKeyT */
   901    911               Pgno iPtr;                    /* Pointer for cell iTry */
   902    912               int res;                      /* (pSeek - pKeyT) */
   903    913   
   904         -            rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopic, &pKey, &nKey,&blob);
          914  +            rc = pageGetBtreeKey(
          915  +                pSeg, pPg, iTry, &iPtr, &iTopic, &pKey, &nKey, &blob
          916  +            );
   905    917               if( rc!=LSM_OK ) break;
   906    918   
   907    919               res = sortedKeyCompare(
   908    920                   xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey
   909    921               );
   910    922               assert( res!=0 );
   911    923   
................................................................................
   917    929                 iMin = iTry+1;
   918    930               }
   919    931             }
   920    932   
   921    933             pCsr->aPg[iPg].pPage = pPg;
   922    934             pCsr->aPg[iPg].iCell = iCell;
   923    935             iPg++;
   924         -          assert( iPg!=nDepth-1 || iLoad==iLeaf );
          936  +          assert( iPg!=nDepth-1 
          937  +               || lsmFsRedirectPage(pCsr->pFS, pSeg->pRedirect, iLoad)==iLeaf
          938  +          );
   925    939           }
   926    940         }while( rc==LSM_OK && iPg<(nDepth-1) );
   927    941         sortedBlobFree(&blob);
   928    942       }
   929    943   
   930    944       /* Load the current key and pointer */
   931    945       if( rc==LSM_OK ){
................................................................................
   939    953         if( pBtreePg->iCell<0 ){
   940    954           Pgno dummy;
   941    955           int i;
   942    956           for(i=pCsr->iPg-1; i>=0; i--){
   943    957             if( pCsr->aPg[i].iCell>0 ) break;
   944    958           }
   945    959           assert( i>=0 );
   946         -        rc = pageGetBtreeKey(
          960  +        rc = pageGetBtreeKey(pSeg,
   947    961               pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1,
   948    962               &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
   949    963           );
   950    964           pCsr->eType |= LSM_SEPARATOR;
   951    965   
   952    966         }else{
   953    967           rc = btreeCursorLoadKey(pCsr);
................................................................................
   996   1010     FileSystem *pFS,
   997   1011     SegmentPtr *pPtr,              /* Load page into this SegmentPtr object */
   998   1012     int iNew                       /* Page number of new page */
   999   1013   ){
  1000   1014     Page *pPg = 0;                 /* The new page */
  1001   1015     int rc;                        /* Return Code */
  1002   1016   
  1003         -  rc = lsmFsDbPageGet(pFS, iNew, &pPg);
         1017  +  rc = lsmFsDbPageGet(pFS, pPtr->pSeg, iNew, &pPg);
  1004   1018     assert( rc==LSM_OK || pPg==0 );
  1005   1019     segmentPtrSetPage(pPtr, pPg);
  1006   1020   
  1007   1021     return rc;
  1008   1022   }
  1009   1023   
  1010   1024   static int segmentPtrReadData(
  1011   1025     SegmentPtr *pPtr,
  1012   1026     int iOff,
  1013   1027     int nByte,
  1014   1028     void **ppData,
  1015   1029     Blob *pBlob
  1016   1030   ){
  1017         -  return sortedReadData(pPtr->pPg, iOff, nByte, ppData, pBlob);
         1031  +  return sortedReadData(pPtr->pSeg, pPtr->pPg, iOff, nByte, ppData, pBlob);
  1018   1032   }
  1019   1033   
  1020   1034   static int segmentPtrNextPage(
  1021   1035     SegmentPtr *pPtr,              /* Load page into this SegmentPtr object */
  1022   1036     int eDir                       /* +1 for next(), -1 for prev() */
  1023   1037   ){
  1024   1038     Page *pNext;                   /* New page to load */
................................................................................
  1069   1083         pPtr->pVal = 0;
  1070   1084       }
  1071   1085     }
  1072   1086   
  1073   1087     return rc;
  1074   1088   }
  1075   1089   
  1076         -void lsmSortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){
         1090  +
         1091  +static Segment *sortedSplitkeySegment(Level *pLevel){
         1092  +  Merge *pMerge = pLevel->pMerge;
         1093  +  MergeInput *p = &pMerge->splitkey;
         1094  +  Segment *pSeg;
         1095  +  int i;
         1096  +
         1097  +  for(i=0; i<pMerge->nInput; i++){
         1098  +    if( p->iPg==pMerge->aInput[i].iPg ) break;
         1099  +  }
         1100  +  if( pMerge->nInput==(pLevel->nRight+1) && i>=(pMerge->nInput-1) ){
         1101  +    pSeg = &pLevel->pNext->lhs;
         1102  +  }else{
         1103  +    pSeg = &pLevel->aRhs[i];
         1104  +  }
         1105  +
         1106  +  return pSeg;
         1107  +}
         1108  +
         1109  +static void sortedSplitkey(lsm_db *pDb, Level *pLevel, int *pRc){
         1110  +  Segment *pSeg;
  1077   1111     Page *pPg = 0;
  1078   1112     lsm_env *pEnv = pDb->pEnv;      /* Environment handle */
  1079   1113     int rc = *pRc;
  1080   1114     Merge *pMerge = pLevel->pMerge;
  1081   1115   
         1116  +  pSeg = sortedSplitkeySegment(pLevel);
  1082   1117     if( rc==LSM_OK ){
  1083         -    rc = lsmFsDbPageGet(pDb->pFS, pMerge->splitkey.iPg, &pPg);
         1118  +    rc = lsmFsDbPageGet(pDb->pFS, pSeg, pMerge->splitkey.iPg, &pPg);
  1084   1119     }
  1085   1120     if( rc==LSM_OK ){
  1086   1121       int iTopic;
  1087   1122       Blob blob = {0, 0, 0, 0};
  1088   1123       u8 *aData;
  1089   1124       int nData;
  1090   1125     
  1091   1126       aData = lsmFsPageData(pPg, &nData);
  1092   1127       if( pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG ){
  1093   1128         void *pKey;
  1094   1129         int nKey;
  1095   1130         Pgno dummy;
  1096         -      rc = pageGetBtreeKey(
         1131  +      rc = pageGetBtreeKey(pSeg,
  1097   1132             pPg, pMerge->splitkey.iCell, &dummy, &iTopic, &pKey, &nKey, &blob
  1098   1133         );
  1099   1134         if( rc==LSM_OK && blob.pData!=pKey ){
  1100   1135           rc = sortedBlobSet(pEnv, &blob, pKey, nKey);
  1101   1136         }
  1102   1137       }else{
  1103         -      rc = pageGetKeyCopy(pEnv, pPg, pMerge->splitkey.iCell, &iTopic, &blob);
         1138  +      rc = pageGetKeyCopy(
         1139  +          pEnv, pSeg, pPg, pMerge->splitkey.iCell, &iTopic, &blob
         1140  +      );
  1104   1141       }
  1105   1142   
  1106   1143       pLevel->iSplitTopic = iTopic;
  1107   1144       pLevel->pSplitKey = blob.pData;
  1108   1145       pLevel->nSplitKey = blob.nData;
  1109   1146       lsmFsPageRelease(pPg);
  1110   1147     }
................................................................................
  1168   1205       if( svFlags && pPtr->pPg ){
  1169   1206         int res = sortedKeyCompare(pCsr->pDb->xCmp,
  1170   1207             rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
  1171   1208             pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
  1172   1209         );
  1173   1210         if( res<0 ) segmentPtrReset(pPtr);
  1174   1211       }
         1212  +
  1175   1213       if( pPtr->pPg==0 && (svFlags & LSM_END_DELETE) ){
  1176         -      rc = lsmFsDbPageGet(pCsr->pDb->pFS, pPtr->pSeg->iFirst, &pPtr->pPg);
         1214  +      Segment *pSeg = pPtr->pSeg;
         1215  +      rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, pSeg->iFirst, &pPtr->pPg);
  1177   1216         if( rc!=LSM_OK ) return rc;
  1178         -      pPtr->eType = LSM_START_DELETE | (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0);
         1217  +      pPtr->eType = LSM_START_DELETE | LSM_POINT_DELETE;
         1218  +      pPtr->eType |= (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0);
  1179   1219         pPtr->pKey = pLvl->pSplitKey;
  1180   1220         pPtr->nKey = pLvl->nSplitKey;
  1181   1221       }
  1182   1222   
  1183   1223     }while( pCsr 
  1184   1224          && pPtr->pPg 
  1185   1225          && segmentPtrIgnoreSeparators(pCsr, pPtr)
................................................................................
  1192   1232   static void segmentPtrEndPage(
  1193   1233     FileSystem *pFS, 
  1194   1234     SegmentPtr *pPtr, 
  1195   1235     int bLast, 
  1196   1236     int *pRc
  1197   1237   ){
  1198   1238     if( *pRc==LSM_OK ){
         1239  +    Segment *pSeg = pPtr->pSeg;
  1199   1240       Page *pNew = 0;
  1200   1241       if( bLast ){
  1201         -      *pRc = lsmFsDbPageLast(pFS, pPtr->pSeg, &pNew);
         1242  +      *pRc = lsmFsDbPageLast(pFS, pSeg, &pNew);
  1202   1243       }else{
  1203         -      *pRc = lsmFsDbPageGet(pFS, pPtr->pSeg->iFirst, &pNew);
         1244  +      *pRc = lsmFsDbPageGet(pFS, pSeg, pSeg->iFirst, &pNew);
  1204   1245       }
  1205   1246       segmentPtrSetPage(pPtr, pNew);
  1206   1247     }
  1207   1248   }
  1208   1249   
  1209   1250   
  1210   1251   /*
................................................................................
  1226   1267         && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
  1227   1268     ){
  1228   1269       rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1));
  1229   1270     }
  1230   1271   
  1231   1272     if( rc==LSM_OK && pPtr->pPg ){
  1232   1273       rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0);
         1274  +    if( rc==LSM_OK && bLast && pPtr->pSeg!=&pLvl->lhs ){
         1275  +      int res = sortedKeyCompare(pCsr->pDb->xCmp,
         1276  +          rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
         1277  +          pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
         1278  +      );
         1279  +      if( res<0 ) segmentPtrReset(pPtr);
         1280  +    }
  1233   1281     }
  1234   1282     
  1235   1283     bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr);
  1236   1284     if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){
  1237   1285       rc = segmentPtrAdvance(pCsr, pPtr, bLast);
  1238   1286     }
  1239   1287   
  1240         -
         1288  +#if 0
  1241   1289     if( bLast && rc==LSM_OK && pPtr->pPg
  1242   1290      && pPtr->pSeg==&pLvl->lhs 
  1243   1291      && pLvl->nRight && (pPtr->eType & LSM_START_DELETE)
  1244   1292     ){
  1245   1293       pPtr->iCell++;
  1246   1294       pPtr->eType = LSM_END_DELETE | (pLvl->iSplitTopic);
  1247   1295       pPtr->pKey = pLvl->pSplitKey;
  1248   1296       pPtr->nKey = pLvl->nSplitKey;
  1249   1297       pPtr->pVal = 0;
  1250   1298       pPtr->nVal = 0;
  1251   1299     }
         1300  +#endif
  1252   1301   
  1253   1302     return rc;
  1254   1303   }
  1255   1304   
  1256   1305   static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){
  1257   1306     assert( pPtr->pPg );
  1258   1307     *ppKey = pPtr->pKey;
................................................................................
  1289   1338     int iTopic = 0;                 /* TODO: Fix me */
  1290   1339   
  1291   1340     for(eDir=-1; eDir<=1; eDir+=2){
  1292   1341       Page *pTest = pPtr->pPg;
  1293   1342   
  1294   1343       lsmFsPageRef(pTest);
  1295   1344       while( pTest ){
         1345  +      Segment *pSeg = pPtr->pSeg;
  1296   1346         Page *pNext;
  1297   1347   
  1298         -      int rc = lsmFsDbPageNext(pPtr->pSeg, pTest, eDir, &pNext);
         1348  +      int rc = lsmFsDbPageNext(pSeg, pTest, eDir, &pNext);
  1299   1349         lsmFsPageRelease(pTest);
  1300   1350         if( rc ) return 1;
  1301   1351         pTest = pNext;
  1302   1352   
  1303   1353         if( pTest ){
  1304   1354           int nData;
  1305   1355           u8 *aData = fsPageData(pTest, &nData);
................................................................................
  1309   1359             int nPgKey;
  1310   1360             int iPgTopic;
  1311   1361             u8 *pPgKey;
  1312   1362             int res;
  1313   1363             int iCell;
  1314   1364   
  1315   1365             iCell = ((eDir < 0) ? (nCell-1) : 0);
  1316         -          pPgKey = pageGetKey(pTest, iCell, &iPgTopic, &nPgKey, &blob);
         1366  +          pPgKey = pageGetKey(pSeg, pTest, iCell, &iPgTopic, &nPgKey, &blob);
  1317   1367             res = iTopic - iPgTopic;
  1318   1368             if( res==0 ) res = pCsr->pDb->xCmp(pKey, nKey, pPgKey, nPgKey);
  1319   1369             if( (eDir==1 && res>0) || (eDir==-1 && res<0) ){
  1320   1370               /* Taking this branch means something has gone wrong. */
  1321   1371               char *zMsg = lsmMallocPrintf(pEnv, "Key \"%s\" is not on page %d", 
  1322   1372                   keyToString(pEnv, pKey, nKey), lsmFsPageNumber(pPtr->pPg)
  1323   1373               );
................................................................................
  1379   1429       u8 *pLastKey;
  1380   1430       int nLastKey;
  1381   1431       int iLastTopic;
  1382   1432       int res;                      /* Result of comparison */
  1383   1433       Page *pNext;
  1384   1434   
  1385   1435       /* Load the last key on the current page. */
  1386         -    pLastKey = pageGetKey(
         1436  +    pLastKey = pageGetKey(pPtr->pSeg,
  1387   1437           pPtr->pPg, pPtr->nCell-1, &iLastTopic, &nLastKey, &pPtr->blob1
  1388   1438       );
  1389   1439   
  1390   1440       /* If the loaded key is >= than (pKey/nKey), break out of the loop.
  1391   1441       ** If (pKey/nKey) is present in this array, it must be on the current 
  1392   1442       ** page.  */
  1393   1443       res = sortedKeyCompare(
................................................................................
  1722   1772     do {
  1723   1773       Pgno *piFirst = 0;
  1724   1774       if( aPg ){
  1725   1775         aPg[i++] = iPg;
  1726   1776         piFirst = &aPg[i];
  1727   1777       }
  1728   1778   
  1729         -    rc = lsmFsDbPageGet(pCsr->pDb->pFS, iPg, &pPg);
         1779  +    rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, iPg, &pPg);
  1730   1780       assert( rc==LSM_OK || pPg==0 );
  1731   1781       if( rc==LSM_OK ){
  1732   1782         u8 *aData;                  /* Buffer containing page data */
  1733   1783         int nData;                  /* Size of aData[] in bytes */
  1734   1784         int iMin;
  1735   1785         int iMax;
  1736   1786         int nRec;
................................................................................
  1748   1798         while( iMax>=iMin ){
  1749   1799           int iTry = (iMin+iMax)/2;
  1750   1800           void *pKeyT; int nKeyT;       /* Key for cell iTry */
  1751   1801           int iTopicT;                  /* Topic for key pKeyT/nKeyT */
  1752   1802           Pgno iPtr;                    /* Pointer associated with cell iTry */
  1753   1803           int res;                      /* (pKey - pKeyT) */
  1754   1804   
  1755         -        rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob);
         1805  +        rc = pageGetBtreeKey(
         1806  +            pSeg, pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob
         1807  +        );
  1756   1808           if( rc!=LSM_OK ) break;
  1757   1809           if( piFirst && pKeyT==blob.pData ){
  1758   1810             *piFirst = pageGetBtreeRef(pPg, iTry);
  1759   1811             piFirst = 0;
  1760   1812             i++;
  1761   1813           }
  1762   1814   
................................................................................
  1869   1921       );
  1870   1922       if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
  1871   1923         res = 0;
  1872   1924       }
  1873   1925     }
  1874   1926     
  1875   1927     if( res>=0 ){
         1928  +    int bHit = 0;                 /* True if at least one rhs is not EOF */
  1876   1929       int iPtr = *piPgno;
  1877   1930       int i;
  1878   1931       for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
         1932  +      SegmentPtr *pPtr = &aPtr[i];
  1879   1933         iOut = 0;
  1880   1934         rc = seekInSegment(
  1881         -          pCsr, &aPtr[i], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
         1935  +          pCsr, pPtr, iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
  1882   1936         );
  1883   1937         iPtr = iOut;
         1938  +
         1939  +      /* If the segment-pointer has settled on a key that is smaller than
         1940  +      ** the splitkey, invalidate the segment-pointer.  */
         1941  +      if( pPtr->pPg ){
         1942  +        res = sortedKeyCompare(pCsr->pDb->xCmp, 
         1943  +            rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey, 
         1944  +            pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
         1945  +        );
         1946  +        if( res<0 ) segmentPtrReset(pPtr);
         1947  +      }
         1948  +
         1949  +      if( aPtr[i].pKey ) bHit = 1;
  1884   1950       }
  1885   1951   
  1886         -    if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){
         1952  +    if( rc==LSM_OK && eSeek==LSM_SEEK_LE && bHit==0 ){
  1887   1953         rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
  1888   1954       }
  1889   1955     }
  1890   1956   
  1891   1957     assert( eSeek==LSM_SEEK_EQ || bStop==0 );
  1892   1958     *piPgno = iOut;
  1893   1959     *pbStop = bStop;
................................................................................
  2089   2155     MultiCursor *pCsr, 
  2090   2156     int iPtr,
  2091   2157     int bReverse
  2092   2158   ){
  2093   2159     int rc;
  2094   2160     SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
  2095   2161     Level *pLvl = pPtr->pLevel;
  2096         -  int bComposite;
         2162  +  int bComposite;                 /* True if pPtr is part of composite level */
  2097   2163   
         2164  +  /* Advance the segment-pointer object. */
  2098   2165     rc = segmentPtrAdvance(pCsr, pPtr, bReverse);
  2099   2166     if( rc!=LSM_OK ) return rc;
         2167  +
  2100   2168     bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight);
         2169  +  if( bComposite && pPtr->pPg==0 ){
         2170  +    int bFix = 0;
         2171  +    if( (bReverse==0)==(pPtr->pSeg==&pLvl->lhs) ){
         2172  +      int i;
         2173  +      if( bReverse ){
         2174  +        SegmentPtr *pLhs = &pCsr->aPtr[iPtr - 1 - (pPtr->pSeg - pLvl->aRhs)];
         2175  +        for(i=0; i<pLvl->nRight; i++){
         2176  +          if( pLhs[i+1].pPg ) break;
         2177  +        }
         2178  +        if( i==pLvl->nRight ){
         2179  +          bFix = 1;
         2180  +          rc = segmentPtrEnd(pCsr, pLhs, 1);
         2181  +        }
         2182  +      }else{
         2183  +        bFix = 1;
         2184  +        for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
         2185  +          rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
         2186  +        }
         2187  +      }
         2188  +    }
  2101   2189   
         2190  +    if( bFix ){
         2191  +      int i;
         2192  +      for(i=pCsr->nTree-1; i>0; i--){
         2193  +        multiCursorDoCompare(pCsr, i, bReverse);
         2194  +      }
         2195  +    }
         2196  +  }
         2197  +
         2198  +#if 0
  2102   2199     if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
  2103   2200      && bReverse==0                                /* csr advanced forwards */
  2104   2201      && pPtr->pPg==0                               /* segment at EOF */
  2105   2202     ){
  2106   2203       int i;
  2107   2204       for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
  2108   2205         rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
  2109   2206       }
  2110   2207       for(i=pCsr->nTree-1; i>0; i--){
  2111   2208         multiCursorDoCompare(pCsr, i, 0);
  2112   2209       }
  2113   2210     }
         2211  +#endif
  2114   2212   
  2115   2213     return rc;
  2116   2214   }
  2117   2215   
  2118   2216   static void mcursorFreeComponents(MultiCursor *pCsr){
  2119   2217     int i;
  2120   2218     lsm_env *pEnv = pCsr->pDb->pEnv;
................................................................................
  2213   2311     for(i=0; i<nRhs; i++){
  2214   2312       pCsr->aPtr[i].pSeg = &pLvl->aRhs[i];
  2215   2313       pCsr->aPtr[i].pLevel = pLvl;
  2216   2314     }
  2217   2315   
  2218   2316     return LSM_OK;
  2219   2317   }
         2318  +
         2319  +static void multiCursorAddOne(MultiCursor *pCsr, Level *pLvl, int *pRc){
         2320  +  if( *pRc==LSM_OK ){
         2321  +    int iPtr = pCsr->nPtr;
         2322  +    int i;
         2323  +    pCsr->aPtr[iPtr].pLevel = pLvl;
         2324  +    pCsr->aPtr[iPtr].pSeg = &pLvl->lhs;
         2325  +    iPtr++;
         2326  +    for(i=0; i<pLvl->nRight; i++){
         2327  +      pCsr->aPtr[iPtr].pLevel = pLvl;
         2328  +      pCsr->aPtr[iPtr].pSeg = &pLvl->aRhs[i];
         2329  +      iPtr++;
         2330  +    }
         2331  +
         2332  +    if( pLvl->nRight && pLvl->pSplitKey==0 ){
         2333  +      sortedSplitkey(pCsr->pDb, pLvl, pRc);
         2334  +    }
         2335  +    pCsr->nPtr = iPtr;
         2336  +  }
         2337  +}
  2220   2338   
  2221   2339   static int multiCursorAddAll(MultiCursor *pCsr, Snapshot *pSnap){
  2222   2340     Level *pLvl;
  2223   2341     int nPtr = 0;
  2224   2342     int iPtr = 0;
  2225   2343     int rc = LSM_OK;
  2226   2344   
................................................................................
  2232   2350       ** complete level may confuse it.  */
  2233   2351       if( pLvl->flags & LEVEL_INCOMPLETE ) continue;
  2234   2352       nPtr += (1 + pLvl->nRight);
  2235   2353     }
  2236   2354   
  2237   2355     assert( pCsr->aPtr==0 );
  2238   2356     pCsr->aPtr = lsmMallocZeroRc(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nPtr, &rc);
  2239         -  if( rc==LSM_OK ) pCsr->nPtr = nPtr;
  2240   2357   
  2241         -  for(pLvl=pSnap->pLevel; pLvl && rc==LSM_OK; pLvl=pLvl->pNext){
  2242         -    int i;
  2243         -    if( pLvl->flags & LEVEL_INCOMPLETE ) continue;
  2244         -    pCsr->aPtr[iPtr].pLevel = pLvl;
  2245         -    pCsr->aPtr[iPtr].pSeg = &pLvl->lhs;
  2246         -    iPtr++;
  2247         -    for(i=0; i<pLvl->nRight; i++){
  2248         -      pCsr->aPtr[iPtr].pLevel = pLvl;
  2249         -      pCsr->aPtr[iPtr].pSeg = &pLvl->aRhs[i];
  2250         -      iPtr++;
  2251         -    }
  2252         -
  2253         -    if( pLvl->nRight && pLvl->pSplitKey==0 ){
  2254         -      lsmSortedSplitkey(pCsr->pDb, pLvl, &rc);
         2358  +  for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){
         2359  +    if( (pLvl->flags & LEVEL_INCOMPLETE)==0 ){
         2360  +      multiCursorAddOne(pCsr, pLvl, &rc);
  2255   2361       }
  2256   2362     }
  2257   2363   
  2258   2364     return rc;
  2259   2365   }
  2260   2366   
  2261   2367   static int multiCursorInit(MultiCursor *pCsr, Snapshot *pSnap){
................................................................................
  2408   2514     void *pCtx                      /* First argument to pass to callback */
  2409   2515   ){
  2410   2516     MultiCursor *pCsr;              /* Cursor used to read db */
  2411   2517     int rc = LSM_OK;                /* Return Code */
  2412   2518     Snapshot *pSnap = 0;
  2413   2519   
  2414   2520     assert( pDb->pWorker );
  2415         -  rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap);
  2416         -  if( rc!=LSM_OK ) return rc;
         2521  +  if( pDb->bIncrMerge ){
         2522  +    rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap);
         2523  +    if( rc!=LSM_OK ) return rc;
         2524  +  }else{
         2525  +    pSnap = pDb->pWorker;
         2526  +  }
  2417   2527   
  2418   2528     pCsr = multiCursorNew(pDb, &rc);
  2419   2529     if( pCsr ){
  2420   2530       rc = multiCursorAddAll(pCsr, pSnap);
  2421   2531       pCsr->flags |= CURSOR_IGNORE_DELETE;
  2422   2532     }
  2423   2533     
................................................................................
  2444   2554           if( x(pCtx, iBlk, iSnap) ) break;
  2445   2555           rc = multiCursorAdvance(pCsr, !bReverse);
  2446   2556         }
  2447   2557       }
  2448   2558     }
  2449   2559   
  2450   2560     lsmMCursorClose(pCsr);
  2451         -  lsmFreeSnapshot(pDb->pEnv, pSnap);
         2561  +  if( pSnap!=pDb->pWorker ){
         2562  +    lsmFreeSnapshot(pDb->pEnv, pSnap);
         2563  +  }
  2452   2564   
  2453   2565     return rc;
  2454   2566   }
  2455   2567   
  2456   2568   int lsmSortedLoadFreelist(
  2457   2569     lsm_db *pDb,                    /* Database handle (must be worker) */
  2458   2570     void **ppVal,                   /* OUT: Blob containing LSM free-list */
................................................................................
  2516   2628     if( *pRc==LSM_OK ){
  2517   2629       void *pKey;
  2518   2630       int nKey;
  2519   2631       multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pKey, &nKey);
  2520   2632       *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pKey, nKey);
  2521   2633     }
  2522   2634   }
         2635  +
         2636  +#ifndef NDEBUG
         2637  +static void assertCursorTree(MultiCursor *pCsr){
         2638  +  int bRev = !!(pCsr->flags & CURSOR_PREV_OK);
         2639  +  int *aSave = pCsr->aTree;
         2640  +  int nSave = pCsr->nTree;
         2641  +  int rc;
         2642  +
         2643  +  pCsr->aTree = 0;
         2644  +  pCsr->nTree = 0;
         2645  +  rc = multiCursorAllocTree(pCsr);
         2646  +  if( rc==LSM_OK ){
         2647  +    int i;
         2648  +    for(i=pCsr->nTree-1; i>0; i--){
         2649  +      multiCursorDoCompare(pCsr, i, bRev);
         2650  +    }
         2651  +
         2652  +    assert( nSave==pCsr->nTree 
         2653  +        && 0==memcmp(aSave, pCsr->aTree, sizeof(int)*nSave)
         2654  +    );
         2655  +
         2656  +    lsmFree(pCsr->pDb->pEnv, pCsr->aTree);
         2657  +  }
         2658  +
         2659  +  pCsr->aTree = aSave;
         2660  +  pCsr->nTree = nSave;
         2661  +}
         2662  +#else
         2663  +# define assertCursorTree(x)
         2664  +#endif
  2523   2665   
  2524   2666   static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
  2525   2667     int eType = pCsr->eType;
  2526   2668     int iKey;
  2527   2669     int i;
  2528         -  int rdmask = 0;
         2670  +  int rdmask;
  2529   2671     
  2530   2672     assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) );
  2531         -  if( pCsr->flags & CURSOR_NEXT_OK ){
  2532         -    rdmask = LSM_END_DELETE;
  2533         -  }else{
  2534         -    rdmask = LSM_START_DELETE;
  2535         -  }
         2673  +  assertCursorTree(pCsr);
  2536   2674   
         2675  +  rdmask = (pCsr->flags & CURSOR_NEXT_OK) ? LSM_END_DELETE : LSM_START_DELETE;
         2676  +
         2677  +  /* If the cursor does not currently point to an actual database key (i.e.
         2678  +  ** it points to a delete key, or the start or end of a range-delete), and
         2679  +  ** the CURSOR_IGNORE_DELETE flag is set, skip past this entry.  */
  2537   2680     if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){
  2538   2681       if( (eType & LSM_INSERT)==0 ) return 0;
  2539   2682     }
         2683  +
         2684  +  /* If the cursor points to a system key (free-list entry), and the
         2685  +  ** CURSOR_IGNORE_SYSTEM flag is set, skip thie entry.  */
  2540   2686     if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
  2541   2687       return 0;
  2542   2688     }
  2543   2689   
  2544         -  /* Check if this key has already been deleted by a range-delete */
         2690  +#ifndef NDEBUG
         2691  +  /* This block fires assert() statements to check one of the assumptions
         2692  +  ** in the comment below - that if the lhs sub-cursor of a level undergoing
         2693  +  ** a merge is valid, then all the rhs sub-cursors must be at EOF. 
         2694  +  **
         2695  +  ** Also assert that all rhs sub-cursors are either at EOF or point to
         2696  +  ** a key that is not less than the level split-key.  */
         2697  +  for(i=0; i<pCsr->nPtr; i++){
         2698  +    SegmentPtr *pPtr = &pCsr->aPtr[i];
         2699  +    Level *pLvl = pPtr->pLevel;
         2700  +    if( pLvl->nRight && pPtr->pPg ){
         2701  +      if( pPtr->pSeg==&pLvl->lhs ){
         2702  +        int j;
         2703  +        for(j=0; j<pLvl->nRight; j++) assert( pPtr[j+1].pPg==0 );
         2704  +      }else{
         2705  +        int res = sortedKeyCompare(pCsr->pDb->xCmp, 
         2706  +            rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
         2707  +            pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
         2708  +        );
         2709  +        assert( res>=0 );
         2710  +      }
         2711  +    }
         2712  +  }
         2713  +#endif
         2714  +
         2715  +  /* Now check if this key has already been deleted by a range-delete. If 
         2716  +  ** so, skip past it.
         2717  +  **
         2718  +  ** Assume, for the moment, that the tree contains no levels currently 
         2719  +  ** undergoing incremental merge, and that this cursor is iterating forwards
         2720  +  ** through the database keys. The cursor currently points to a key in
         2721  +  ** level L. This key has already been deleted if any of the sub-cursors
         2722  +  ** that point to levels newer than L (or to the in-memory tree) point to
         2723  +  ** a key greater than the current key with the LSM_END_DELETE flag set.
         2724  +  **
         2725  +  ** Or, if the cursor is iterating backwards through data keys, if any
         2726  +  ** such sub-cursor points to a key smaller than the current key with the
         2727  +  ** LSM_START_DELETE flag set.
         2728  +  **
         2729  +  ** Why it works with levels undergoing a merge too:
         2730  +  **
         2731  +  ** When a cursor iterates forwards, the sub-cursors for the rhs of a 
         2732  +  ** level are only activated once the lhs reaches EOF. So when iterating
         2733  +  ** forwards, the keys visited are the same as if the level was completely
         2734  +  ** merged.
         2735  +  **
         2736  +  ** If the cursor is iterating backwards, then the lhs sub-cursor is not 
         2737  +  ** initialized until the last of the rhs sub-cursors has reached EOF.
         2738  +  ** Additionally, if the START_DELETE flag is set on the last entry (in
         2739  +  ** reverse order - so the entry with the smallest key) of a rhs sub-cursor,
         2740  +  ** then a pseudo-key equal to the levels split-key with the END_DELETE
         2741  +  ** flag set is visited by the sub-cursor.
         2742  +  */ 
  2545   2743     iKey = pCsr->aTree[1];
  2546   2744     for(i=0; i<iKey; i++){
  2547   2745       int csrflags;
  2548   2746       multiCursorGetKey(pCsr, i, &csrflags, 0, 0);
  2549   2747       if( (rdmask & csrflags) ){
  2550   2748         const int SD_ED = (LSM_START_DELETE|LSM_END_DELETE);
  2551   2749         if( (csrflags & SD_ED)==SD_ED 
................................................................................
  2560   2758             continue;
  2561   2759           }
  2562   2760         }
  2563   2761         return 0;
  2564   2762       }
  2565   2763     }
  2566   2764   
  2567         -#if 0
  2568         -  if( (iKey>0 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[0]))) 
  2569         -   || (iKey>1 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[1]))) 
  2570         -  ){
  2571         -    return 0;
  2572         -  }
  2573         -  if( iKey>CURSOR_DATA_SYSTEM && (pCsr->flags & CURSOR_FLUSH_FREELIST) ){
  2574         -    int eType;
  2575         -    multiCursorGetKey(pCsr, CURSOR_DATA_SYSTEM, &eType, 0, 0);
  2576         -    if( rdmask & eType ) return 0;
  2577         -  }
  2578         -
  2579         -  for(i=CURSOR_DATA_SEGMENT; i<iKey; i++){
  2580         -    int iPtr = i-CURSOR_DATA_SEGMENT;
  2581         -    if( pCsr->aPtr[iPtr].pPg && (pCsr->aPtr[iPtr].eType & rdmask) ){
  2582         -      return 0;
  2583         -    }
  2584         -  }
  2585         -#endif
  2586         -
         2765  +  /* The current cursor position is one this cursor should visit. Return 1. */
  2587   2766     return 1;
  2588   2767   }
         2768  +
         2769  +static int multiCursorSetupTree(MultiCursor *pCsr, int bRev){
         2770  +  int rc;
         2771  +
         2772  +  rc = multiCursorAllocTree(pCsr);
         2773  +  if( rc==LSM_OK ){
         2774  +    int i;
         2775  +    for(i=pCsr->nTree-1; i>0; i--){
         2776  +      multiCursorDoCompare(pCsr, i, bRev);
         2777  +    }
         2778  +  }
         2779  +
         2780  +  assertCursorTree(pCsr);
         2781  +  multiCursorCacheKey(pCsr, &rc);
         2782  +
         2783  +  if( rc==LSM_OK && mcursorLocationOk(pCsr, 0)==0 ){
         2784  +    rc = multiCursorAdvance(pCsr, bRev);
         2785  +  }
         2786  +  return rc;
         2787  +}
         2788  +
  2589   2789   
  2590   2790   static int multiCursorEnd(MultiCursor *pCsr, int bLast){
  2591   2791     int rc = LSM_OK;
  2592   2792     int i;
  2593   2793   
  2594   2794     pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);
  2595   2795     pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK);
  2596         -
  2597         -  if( pCsr->apTreeCsr[0] ){
  2598         -    rc = lsmTreeCursorEnd(pCsr->apTreeCsr[0], bLast);
  2599         -  }
  2600         -  if( rc==LSM_OK && pCsr->apTreeCsr[1] ){
  2601         -    rc = lsmTreeCursorEnd(pCsr->apTreeCsr[1], bLast);
  2602         -  }
  2603         -
  2604   2796     pCsr->iFree = 0;
         2797  +
         2798  +  /* Position the two in-memory tree cursors */
         2799  +  for(i=0; rc==LSM_OK && i<2; i++){
         2800  +    if( pCsr->apTreeCsr[i] ){
         2801  +      rc = lsmTreeCursorEnd(pCsr->apTreeCsr[i], bLast);
         2802  +    }
         2803  +  }
  2605   2804   
  2606   2805     for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
  2607   2806       SegmentPtr *pPtr = &pCsr->aPtr[i];
  2608   2807       Level *pLvl = pPtr->pLevel;
         2808  +    int iRhs;
         2809  +    int bHit = 0;
  2609   2810   
  2610         -    rc = segmentPtrEnd(pCsr, pPtr, bLast);
  2611         -    if( rc==LSM_OK && bLast==0 && pLvl->nRight && pPtr->pSeg==&pLvl->lhs ){
  2612         -      int iRhs;
  2613         -      for(iRhs=1+i; rc==LSM_OK && iRhs<1+i+pLvl->nRight; iRhs++){
  2614         -        SegmentPtr *pRhs = &pCsr->aPtr[iRhs];
  2615         -        if( pPtr->pPg==0 ){
  2616         -          rc = sortedRhsFirst(pCsr, pLvl, pRhs);
         2811  +    if( bLast ){
         2812  +      for(iRhs=0; iRhs<pLvl->nRight && rc==LSM_OK; iRhs++){
         2813  +        rc = segmentPtrEnd(pCsr, &pPtr[iRhs+1], 1);
         2814  +        if( pPtr[iRhs+1].pPg ) bHit = 1;
         2815  +      }
         2816  +      if( bHit==0 && rc==LSM_OK ){
         2817  +        rc = segmentPtrEnd(pCsr, pPtr, 1);
         2818  +      }else{
         2819  +        segmentPtrReset(pPtr);
         2820  +      }
         2821  +    }else{
         2822  +      int bLhs = (pPtr->pSeg==&pLvl->lhs);
         2823  +      assert( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[0] );
         2824  +
         2825  +      if( bLhs ){
         2826  +        rc = segmentPtrEnd(pCsr, pPtr, 0);
         2827  +        if( pPtr->pKey ) bHit = 1;
         2828  +      }
         2829  +      for(iRhs=0; iRhs<pLvl->nRight && rc==LSM_OK; iRhs++){
         2830  +        if( bHit ){
         2831  +          segmentPtrReset(&pPtr[iRhs+1]);
  2617   2832           }else{
  2618         -          segmentPtrReset(pRhs);
         2833  +          rc = sortedRhsFirst(pCsr, pLvl, &pPtr[iRhs+bLhs]);
  2619   2834           }
  2620   2835         }
  2621         -      i += pLvl->nRight;
  2622   2836       }
         2837  +    i += pLvl->nRight;
  2623   2838     }
  2624   2839   
         2840  +  /* And the b-tree cursor, if applicable */
  2625   2841     if( rc==LSM_OK && pCsr->pBtCsr ){
  2626   2842       assert( bLast==0 );
  2627   2843       rc = btreeCursorFirst(pCsr->pBtCsr);
  2628   2844     }
  2629   2845   
  2630   2846     if( rc==LSM_OK ){
  2631         -    rc = multiCursorAllocTree(pCsr);
         2847  +    rc = multiCursorSetupTree(pCsr, bLast);
  2632   2848     }
  2633         -  if( rc==LSM_OK ){
  2634         -    for(i=pCsr->nTree-1; i>0; i--){
  2635         -      multiCursorDoCompare(pCsr, i, bLast);
  2636         -    }
  2637         -  }
  2638         -
  2639         -  multiCursorCacheKey(pCsr, &rc);
  2640         -  if( rc==LSM_OK && mcursorLocationOk(pCsr, 0)==0 ){
  2641         -    if( bLast ){
  2642         -      rc = lsmMCursorPrev(pCsr);
  2643         -    }else{
  2644         -      rc = lsmMCursorNext(pCsr);
  2645         -    }
  2646         -  }
  2647         -
         2849  +  
  2648   2850     return rc;
  2649   2851   }
  2650   2852   
  2651   2853   
  2652   2854   int mcursorSave(MultiCursor *pCsr){
  2653   2855     int rc = LSM_OK;
  2654   2856     if( pCsr->aTree ){
................................................................................
  2882   3084   
  2883   3085       /* If this cursor is configured to skip deleted keys, and the current
  2884   3086       ** cursor points to a SORTED_DELETE entry, then the cursor has not been 
  2885   3087       ** successfully advanced.  
  2886   3088       **
  2887   3089       ** Similarly, if the cursor is configured to skip system keys and the
  2888   3090       ** current cursor points to a system key, it has not yet been advanced.
  2889         -     */
         3091  +    */
  2890   3092       if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0;
  2891   3093     }
  2892   3094     return 1;
  2893   3095   }
  2894   3096   
  2895   3097   static void flCsrAdvance(MultiCursor *pCsr){
  2896   3098     assert( pCsr->flags & CURSOR_FLUSH_FREELIST );
................................................................................
  2920   3122   }
  2921   3123   
  2922   3124   static int multiCursorAdvance(MultiCursor *pCsr, int bReverse){
  2923   3125     int rc = LSM_OK;                /* Return Code */
  2924   3126     if( lsmMCursorValid(pCsr) ){
  2925   3127       do {
  2926   3128         int iKey = pCsr->aTree[1];
         3129  +
         3130  +      assertCursorTree(pCsr);
  2927   3131   
  2928   3132         /* If this multi-cursor is advancing forwards, and the sub-cursor
  2929   3133         ** being advanced is the one that separator keys may be being read
  2930   3134         ** from, record the current absolute pointer value.  */
  2931   3135         if( pCsr->pPrevMergePtr ){
  2932   3136           if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
  2933   3137             assert( pCsr->pBtCsr );
................................................................................
  2958   3162           rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse);
  2959   3163         }
  2960   3164         if( rc==LSM_OK ){
  2961   3165           int i;
  2962   3166           for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){
  2963   3167             multiCursorDoCompare(pCsr, i, bReverse);
  2964   3168           }
         3169  +        assertCursorTree(pCsr);
  2965   3170         }
  2966   3171       }while( mcursorAdvanceOk(pCsr, bReverse, &rc)==0 );
  2967   3172     }
  2968   3173     return rc;
  2969   3174   }
  2970   3175   
  2971   3176   int lsmMCursorNext(MultiCursor *pCsr){
................................................................................
  3197   3402   
  3198   3403       do {
  3199   3404         Page *pPg = 0;
  3200   3405         u8 *aData;
  3201   3406         int nData;
  3202   3407         int flags;
  3203   3408   
  3204         -      rc = lsmFsDbPageGet(pFS, iPg, &pPg);
         3409  +      rc = lsmFsDbPageGet(pFS, pSeg, iPg, &pPg);
  3205   3410         if( rc!=LSM_OK ) break;
  3206   3411   
  3207   3412         aData = fsPageData(pPg, &nData);
  3208   3413         flags = pageGetFlags(aData, nData);
  3209   3414         if( flags&SEGMENT_BTREE_FLAG ){
  3210   3415           Page **apNew = (Page **)lsmRealloc(
  3211   3416               pEnv, apHier, sizeof(Page *)*(nHier+1)
................................................................................
  3643   3848   
  3644   3849     if( pCsr->pBtCsr ){
  3645   3850       rc = LSM_OK;
  3646   3851       iFPtr = pMW->pLevel->pNext->lhs.iFirst;
  3647   3852     }else if( pCsr->nPtr>0 ){
  3648   3853       Segment *pSeg;
  3649   3854       pSeg = pCsr->aPtr[pCsr->nPtr-1].pSeg;
  3650         -    rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg->iFirst, &pPg);
         3855  +    rc = lsmFsDbPageGet(pMW->pDb->pFS, pSeg, pSeg->iFirst, &pPg);
  3651   3856       if( rc==LSM_OK ){
  3652   3857         u8 *aData;                    /* Buffer for page pPg */
  3653   3858         int nData;                    /* Size of aData[] in bytes */
  3654   3859         aData = fsPageData(pPg, &nData);
  3655   3860         iFPtr = pageGetPtr(aData, nData);
  3656   3861         lsmFsPageRelease(pPg);
  3657   3862       }
................................................................................
  3779   3984       }
  3780   3985     }
  3781   3986   
  3782   3987     return rc;
  3783   3988   }
  3784   3989   
  3785   3990   
  3786         -static int multiCursorSetupTree(MultiCursor *pCsr, int bRev){
  3787         -  int rc;
  3788         -
  3789         -  assert( pCsr->aTree==0 );
  3790         -
  3791         -  rc = multiCursorAllocTree(pCsr);
  3792         -  if( rc==LSM_OK ){
  3793         -    int i;
  3794         -    for(i=pCsr->nTree-1; i>0; i--){
  3795         -      multiCursorDoCompare(pCsr, i, bRev);
  3796         -    }
  3797         -  }
  3798         -
  3799         -  multiCursorCacheKey(pCsr, &rc);
  3800         -  return rc;
  3801         -}
  3802         -
  3803   3991   /*
  3804   3992   ** Free all resources allocated by mergeWorkerInit().
  3805   3993   */
  3806   3994   static void mergeWorkerShutdown(MergeWorker *pMW, int *pRc){
  3807   3995     int i;                          /* Iterator variable */
  3808   3996     int rc = *pRc;
  3809   3997     MultiCursor *pCsr = pMW->pCsr;
................................................................................
  3985   4173   
  3986   4174     iVal = pCsr->aTree[1];
  3987   4175     mergeRangeDeletes(pCsr, &iVal, &eType);
  3988   4176   
  3989   4177     if( eType!=0 ){
  3990   4178       if( pMW->aGobble ){
  3991   4179         int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
  3992         -      if( iGobble<pCsr->nPtr ){
         4180  +      if( iGobble<pCsr->nPtr && iGobble>=0 ){
  3993   4181           SegmentPtr *pGobble = &pCsr->aPtr[iGobble];
  3994   4182           if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
  3995   4183             pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg);
  3996   4184           }
  3997   4185         }
  3998   4186       }
  3999   4187   
................................................................................
  4015   4203       }
  4016   4204     }
  4017   4205   
  4018   4206     /* Advance the cursor to the next input record (assuming one exists). */
  4019   4207     assert( lsmMCursorValid(pMW->pCsr) );
  4020   4208     if( rc==LSM_OK ) rc = lsmMCursorNext(pMW->pCsr);
  4021   4209   
  4022         -  /* If the cursor is at EOF, the merge is finished. Release all page
  4023         -  ** references currently held by the merge worker and inform the 
  4024         -  ** FileSystem object that no further pages will be appended to either 
  4025         -  ** the main or separators array. 
  4026         -  */
  4027         -  if( rc==LSM_OK && !lsmMCursorValid(pMW->pCsr) ){
  4028         -
  4029         -    mergeWorkerShutdown(pMW, &rc);
  4030         -    if( pSeg->iFirst ){
  4031         -      rc = lsmFsSortedFinish(pDb->pFS, pSeg);
  4032         -    }
  4033         -
  4034         -#ifdef LSM_DEBUG_EXPENSIVE
  4035         -    if( rc==LSM_OK ){
  4036         -#if 0
  4037         -      rc = assertBtreeOk(pDb, pSeg);
  4038         -      if( pMW->pCsr->pBtCsr ){
  4039         -        Segment *pNext = &pMW->pLevel->pNext->lhs;
  4040         -        rc = assertPointersOk(pDb, pSeg, pNext, 0);
  4041         -      }
  4042         -#endif
  4043         -    }
  4044         -#endif
  4045         -  }
  4046   4210     return rc;
  4047   4211   }
  4048   4212   
  4049   4213   static int mergeWorkerDone(MergeWorker *pMW){
  4050   4214     return pMW->pCsr==0 || !lsmMCursorValid(pMW->pCsr);
  4051   4215   }
  4052   4216   
................................................................................
  4070   4234     int eTree,                      /* One of the TREE_XXX constants */
  4071   4235     int *pnWrite                    /* OUT: Number of database pages written */
  4072   4236   ){
  4073   4237     int rc = LSM_OK;                /* Return Code */
  4074   4238     MultiCursor *pCsr = 0;
  4075   4239     Level *pNext = 0;               /* The current top level */
  4076   4240     Level *pNew;                    /* The new level itself */
  4077         -  Segment *pDel = 0;              /* Delete separators from this segment */
         4241  +  Segment *pLinked = 0;           /* Delete separators from this segment */
         4242  +  Level *pDel = 0;                /* Delete this entire level */
  4078   4243     int nWrite = 0;                 /* Number of database pages written */
  4079   4244     Freelist freelist;
  4080   4245   
  4081   4246     if( eTree!=TREE_NONE ){
  4082   4247       rc = lsmShmCacheChunks(pDb, pDb->treehdr.nChunk);
  4083   4248     }
  4084   4249   
................................................................................
  4101   4266     pCsr = multiCursorNew(pDb, &rc);
  4102   4267     if( pCsr ){
  4103   4268       pCsr->pDb = pDb;
  4104   4269       rc = multiCursorVisitFreelist(pCsr);
  4105   4270       if( rc==LSM_OK ){
  4106   4271         rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
  4107   4272       }
  4108         -    if( rc==LSM_OK 
  4109         -        && pNext && pNext->pMerge==0 && pNext->lhs.iRoot 
  4110         -        && (eTree!=TREE_NONE || (pNext->flags & LEVEL_FREELIST_ONLY))
  4111         -    ){
  4112         -      pDel = &pNext->lhs;
  4113         -      rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);
         4273  +    if( rc==LSM_OK && pNext && pNext->pMerge==0 ){
         4274  +      if( (pNext->flags & LEVEL_FREELIST_ONLY) ){
         4275  +        pDel = pNext;
         4276  +        pCsr->aPtr = lsmMallocZeroRc(pDb->pEnv, sizeof(SegmentPtr), &rc);
         4277  +        multiCursorAddOne(pCsr, pNext, &rc);
         4278  +      }else if( eTree!=TREE_NONE && pNext->lhs.iRoot ){
         4279  +        pLinked = &pNext->lhs;
         4280  +        rc = btreeCursorNew(pDb, pLinked, &pCsr->pBtCsr);
         4281  +      }
  4114   4282       }
  4115   4283   
  4116   4284       /* If this will be the only segment in the database, discard any delete
  4117   4285       ** markers present in the in-memory tree.  */
  4118   4286       if( pNext==0 ){
  4119   4287         multiCursorIgnoreDelete(pCsr);
  4120   4288       }
................................................................................
  4141   4309       mergeworker.bFlush = 1;
  4142   4310   
  4143   4311       /* Do the work to create the new merged segment on disk */
  4144   4312       if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr);
  4145   4313       while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
  4146   4314         rc = mergeWorkerStep(&mergeworker);
  4147   4315       }
         4316  +    mergeWorkerShutdown(&mergeworker, &rc);
  4148   4317       assert( rc!=LSM_OK || mergeworker.nWork==0 || pNew->lhs.iFirst );
  4149         -
         4318  +    if( rc==LSM_OK && pNew->lhs.iFirst ){
         4319  +      rc = lsmFsSortedFinish(pDb->pFS, &pNew->lhs);
         4320  +    }
  4150   4321       nWrite = mergeworker.nWork;
  4151         -    mergeWorkerShutdown(&mergeworker, &rc);
  4152   4322       pNew->flags &= ~LEVEL_INCOMPLETE;
  4153   4323       if( eTree==TREE_NONE ){
  4154   4324         pNew->flags |= LEVEL_FREELIST_ONLY;
  4155   4325       }
  4156   4326       pNew->pMerge = 0;
  4157   4327     }
  4158   4328   
  4159   4329     if( rc!=LSM_OK || pNew->lhs.iFirst==0 ){
  4160   4330       assert( rc!=LSM_OK || pDb->pWorker->freelist.nEntry==0 );
  4161   4331       lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
  4162   4332       sortedFreeLevel(pDb->pEnv, pNew);
  4163   4333     }else{
  4164         -    if( pDel ) pDel->iRoot = 0;
         4334  +    if( pLinked ){
         4335  +      pLinked->iRoot = 0;
         4336  +    }else if( pDel ){
         4337  +      assert( pNew->pNext==pDel );
         4338  +      pNew->pNext = pDel->pNext;
         4339  +      lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 1, &pDel->lhs);
         4340  +      sortedFreeLevel(pDb->pEnv, pDel);
         4341  +    }
  4165   4342   
  4166         -#if 0
  4167         -    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
         4343  +#if LSM_LOG_STRUCTURE
         4344  +    lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, "new-toplevel");
  4168   4345   #endif
  4169   4346   
  4170   4347       if( freelist.nEntry ){
  4171   4348         Freelist *p = &pDb->pWorker->freelist;
  4172   4349         lsmFree(pDb->pEnv, p->aEntry);
  4173   4350         memcpy(p, &freelist, sizeof(freelist));
  4174   4351         freelist.aEntry = 0;
................................................................................
  4250   4427       pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
  4251   4428       pNew->pNext = p;
  4252   4429       for(pp=&pTopLevel; *pp!=pLevel; pp=&((*pp)->pNext));
  4253   4430       *pp = pNew;
  4254   4431       lsmDbSnapshotSetLevel(pDb->pWorker, pTopLevel);
  4255   4432   
  4256   4433       /* Determine whether or not the next separators will be linked in */
  4257         -    if( pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){
         4434  +    if( pNext && pNext->pMerge==0 && pNext->lhs.iRoot && pNext 
         4435  +     && (bFreeOnly==0 || (pNext->flags & LEVEL_FREELIST_ONLY))
         4436  +    ){
  4258   4437         bUseNext = 1;
  4259   4438       }
  4260   4439     }
  4261   4440   
  4262   4441     /* Allocate the merge object */
  4263   4442     nByte = sizeof(Merge) + sizeof(MergeInput) * (nMerge + bUseNext);
  4264   4443     pMerge = (Merge *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
................................................................................
  4299   4478     **      separators attached to the LHS of the following level, or neither.
  4300   4479     **
  4301   4480     ** If the new level is the lowest (oldest) in the db, discard any
  4302   4481     ** delete keys. Key annihilation.
  4303   4482     */
  4304   4483     pCsr = multiCursorNew(pDb, &rc);
  4305   4484     if( pCsr ){
         4485  +    pCsr->flags |= CURSOR_NEXT_OK;
  4306   4486       rc = multiCursorAddRhs(pCsr, pLevel);
  4307   4487     }
  4308   4488     if( rc==LSM_OK && pMerge->nInput > pLevel->nRight ){
  4309   4489       rc = btreeCursorNew(pDb, &pNext->lhs, &pCsr->pBtCsr);
  4310   4490     }else if( pNext ){
  4311   4491       multiCursorReadSeparators(pCsr);
  4312   4492     }else{
................................................................................
  4427   4607     ** with the largest number of right-hand segments. Work on it. */
  4428   4608     for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
  4429   4609       if( pLevel->nRight==0 && pThis && pLevel->iAge==pThis->iAge ){
  4430   4610         nThis++;
  4431   4611       }else{
  4432   4612         if( nThis>nBest ){
  4433   4613           if( (pLevel->iAge!=pThis->iAge+1)
  4434         -            || (pLevel->nRight==0 && sortedCountLevels(pLevel)<=pDb->nMerge)
  4435         -          ){
         4614  +         || (pLevel->nRight==0 && sortedCountLevels(pLevel)<=pDb->nMerge)
         4615  +        ){
  4436   4616             pBest = pThis;
  4437   4617             nBest = nThis;
  4438   4618           }
  4439   4619         }
  4440   4620         if( pLevel->nRight ){
  4441   4621           if( pLevel->nRight>nBest ){
  4442   4622             nBest = pLevel->nRight;
................................................................................
  4491   4671     if( pTop && pTop->iAge==0
  4492   4672      && (pTop->nRight || sortedCountLevels(pTop)>=pDb->nMerge)
  4493   4673     ){
  4494   4674       return 1;
  4495   4675     }
  4496   4676     return 0;
  4497   4677   }
         4678  +
         4679  +typedef struct MoveBlockCtx MoveBlockCtx;
         4680  +struct MoveBlockCtx {
         4681  +  int iSeen;                      /* Previous free block on list */
         4682  +  int iFrom;                      /* Total number of blocks in file */
         4683  +};
         4684  +
         4685  +static int moveBlockCb(void *pCtx, int iBlk, i64 iSnapshot){
         4686  +  MoveBlockCtx *p = (MoveBlockCtx *)pCtx;
         4687  +  assert( p->iFrom==0 );
         4688  +  if( iBlk==(p->iSeen-1) ){
         4689  +    p->iSeen = iBlk;
         4690  +    return 0;
         4691  +  }
         4692  +  p->iFrom = p->iSeen-1;
         4693  +  return 1;
         4694  +}
         4695  +
         4696  +/*
         4697  +** This function is called to further compact a database for which all 
         4698  +** of the content has already been merged into a single segment. If 
         4699  +** possible, it moves the contents of a single block from the end of the
         4700  +** file to a free-block that lies closer to the start of the file (allowing
         4701  +** the file to be eventually truncated).
         4702  +*/
         4703  +static int sortedMoveBlock(lsm_db *pDb, int *pnWrite){
         4704  +  Snapshot *p = pDb->pWorker;
         4705  +  Level *pLvl = lsmDbSnapshotLevel(p);
         4706  +  int iFrom;                      /* Block to move */
         4707  +  int iTo;                        /* Destination to move block to */
         4708  +  int rc;                         /* Return code */
         4709  +
         4710  +  MoveBlockCtx sCtx;
         4711  +
         4712  +  assert( pLvl->pNext==0 && pLvl->nRight==0 );
         4713  +  assert( p->redirect.n<=LSM_MAX_BLOCK_REDIRECTS );
         4714  +
         4715  +  *pnWrite = 0;
         4716  +
         4717  +  /* Check that the redirect array is not already full. If it is, return
         4718  +  ** without moving any database content.  */
         4719  +  if( p->redirect.n>=LSM_MAX_BLOCK_REDIRECTS ) return LSM_OK;
         4720  +
         4721  +  /* Find the last block of content in the database file. Do this by 
         4722  +  ** traversing the free-list in reverse (descending block number) order.
         4723  +  ** The first block not on the free list is the one that will be moved.
         4724  +  ** Since the db consists of a single segment, there is no ambiguity as
         4725  +  ** to which segment the block belongs to.  */
         4726  +  sCtx.iSeen = p->nBlock+1;
         4727  +  sCtx.iFrom = 0;
         4728  +  rc = lsmWalkFreelist(pDb, 1, moveBlockCb, &sCtx);
         4729  +  if( rc!=LSM_OK || sCtx.iFrom==0 ) return rc;
         4730  +  iFrom = sCtx.iFrom;
         4731  +
         4732  +  /* Find the first free block in the database, ignoring block 1. Block
         4733  +  ** 1 is tricky as it is smaller than the other blocks.  */
         4734  +  rc = lsmBlockAllocate(pDb, iFrom, &iTo);
         4735  +  if( rc!=LSM_OK || iTo==0 ) return rc;
         4736  +  assert( iTo!=1 && iTo<iFrom );
         4737  +
         4738  +  rc = lsmFsMoveBlock(pDb->pFS, &pLvl->lhs, iTo, iFrom);
         4739  +  if( rc==LSM_OK ){
         4740  +    if( p->redirect.a==0 ){
         4741  +      int nByte = sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS;
         4742  +      p->redirect.a = lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
         4743  +    }
         4744  +    if( rc==LSM_OK ){
         4745  +      memmove(&p->redirect.a[1], &p->redirect.a[0], 
         4746  +          sizeof(struct RedirectEntry) * p->redirect.n
         4747  +      );
         4748  +      p->redirect.a[0].iFrom = iFrom;
         4749  +      p->redirect.a[0].iTo = iTo;
         4750  +      p->redirect.n++;
         4751  +
         4752  +      rc = lsmBlockFree(pDb, iFrom);
         4753  +
         4754  +      *pnWrite = lsmFsBlockSize(pDb->pFS) / lsmFsPageSize(pDb->pFS);
         4755  +      pLvl->lhs.pRedirect = &p->redirect;
         4756  +    }
         4757  +  }
         4758  +
         4759  +#if LSM_LOG_STRUCTURE
         4760  +  if( rc==LSM_OK ){
         4761  +    char aBuf[64];
         4762  +    sprintf(aBuf, "move-block %d/%d", p->redirect.n-1, LSM_MAX_BLOCK_REDIRECTS);
         4763  +    lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, aBuf);
         4764  +  }
         4765  +#endif
         4766  +  return rc;
         4767  +}
         4768  +
         4769  +/*
         4770  +*/
         4771  +static int mergeInsertFreelistSegments(
         4772  +  lsm_db *pDb, 
         4773  +  int nFree,
         4774  +  MergeWorker *pMW
         4775  +){
         4776  +  int rc = LSM_OK;
         4777  +  if( nFree>0 ){
         4778  +    MultiCursor *pCsr = pMW->pCsr;
         4779  +    Level *pLvl = pMW->pLevel;
         4780  +    SegmentPtr *aNew1;
         4781  +    Segment *aNew2;
         4782  +
         4783  +    Level *pIter;
         4784  +    Level *pNext;
         4785  +    int i = 0;
         4786  +
         4787  +    aNew1 = (SegmentPtr *)lsmMallocZeroRc(
         4788  +        pDb->pEnv, sizeof(SegmentPtr) * (pCsr->nPtr+nFree), &rc
         4789  +    );
         4790  +    if( rc ) return rc;
         4791  +    memcpy(&aNew1[nFree], pCsr->aPtr, sizeof(SegmentPtr)*pCsr->nPtr);
         4792  +    pCsr->nPtr += nFree;
         4793  +    lsmFree(pDb->pEnv, pCsr->aTree);
         4794  +    lsmFree(pDb->pEnv, pCsr->aPtr);
         4795  +    pCsr->aTree = 0;
         4796  +    pCsr->aPtr = aNew1;
         4797  +
         4798  +    aNew2 = (Segment *)lsmMallocZeroRc(
         4799  +        pDb->pEnv, sizeof(Segment) * (pLvl->nRight+nFree), &rc
         4800  +    );
         4801  +    if( rc ) return rc;
         4802  +    memcpy(&aNew2[nFree], pLvl->aRhs, sizeof(Segment)*pLvl->nRight);
         4803  +    pLvl->nRight += nFree;
         4804  +    lsmFree(pDb->pEnv, pLvl->aRhs);
         4805  +    pLvl->aRhs = aNew2;
         4806  +
         4807  +    for(pIter=pDb->pWorker->pLevel; rc==LSM_OK && pIter!=pLvl; pIter=pNext){
         4808  +      Segment *pSeg = &pLvl->aRhs[i];
         4809  +      memcpy(pSeg, &pIter->lhs, sizeof(Segment));
         4810  +
         4811  +      pCsr->aPtr[i].pSeg = pSeg;
         4812  +      pCsr->aPtr[i].pLevel = pLvl;
         4813  +      rc = segmentPtrEnd(pCsr, &pCsr->aPtr[i], 0);
         4814  +
         4815  +      pDb->pWorker->pLevel = pNext = pIter->pNext;
         4816  +      sortedFreeLevel(pDb->pEnv, pIter);
         4817  +      i++;
         4818  +    }
         4819  +    assert( i==nFree );
         4820  +    assert( rc!=LSM_OK || pDb->pWorker->pLevel==pLvl );
         4821  +
         4822  +    for(i=nFree; i<pCsr->nPtr; i++){
         4823  +      pCsr->aPtr[i].pSeg = &pLvl->aRhs[i];
         4824  +    }
         4825  +
         4826  +    lsmFree(pDb->pEnv, pMW->aGobble);
         4827  +    pMW->aGobble = 0;
         4828  +  }
         4829  +  return rc;
         4830  +}
  4498   4831   
  4499   4832   static int sortedWork(
  4500   4833     lsm_db *pDb,                    /* Database handle. Must be worker. */
  4501   4834     int nWork,                      /* Number of pages of work to do */
  4502   4835     int nMerge,                     /* Try to merge this many levels at once */
  4503   4836     int bFlush,                     /* Set if call is to make room for a flush */
  4504   4837     int *pnWrite                    /* OUT: Actual number of pages written */
................................................................................
  4514   4847       Level *pLevel = 0;
  4515   4848   
  4516   4849       /* Find a level to work on. */
  4517   4850       rc = sortedSelectLevel(pDb, nMerge, &pLevel);
  4518   4851       assert( rc==LSM_OK || pLevel==0 );
  4519   4852   
  4520   4853       if( pLevel==0 ){
         4854  +      int nDone = 0;
         4855  +      Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
         4856  +      if( bFlush==0 && nMerge==1 && pTopLevel && pTopLevel->pNext==0 ){
         4857  +        rc = sortedMoveBlock(pDb, &nDone);
         4858  +      }
         4859  +      nRemaining -= nDone;
         4860  +
  4521   4861         /* Could not find any work to do. Finished. */
  4522         -      break;
         4862  +      if( nDone==0 ) break;
  4523   4863       }else{
         4864  +      int bSave = 0;
         4865  +      Freelist freelist = {0, 0, 0};
  4524   4866         MergeWorker mergeworker;    /* State used to work on the level merge */
  4525   4867   
         4868  +      assert( pDb->bIncrMerge==0 );
         4869  +      assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );
         4870  +
         4871  +      pDb->bIncrMerge = 1;
  4526   4872         rc = mergeWorkerInit(pDb, pLevel, &mergeworker);
  4527   4873         assert( mergeworker.nWork==0 );
         4874  +      
  4528   4875         while( rc==LSM_OK 
  4529   4876             && 0==mergeWorkerDone(&mergeworker) 
  4530         -          && mergeworker.nWork<nRemaining 
         4877  +          && (mergeworker.nWork<nRemaining || pDb->bUseFreelist)
  4531   4878         ){
         4879  +        int eType = rtTopic(mergeworker.pCsr->eType);
  4532   4880           rc = mergeWorkerStep(&mergeworker);
         4881  +
         4882  +        /* If the cursor now points at the first entry past the end of the
         4883  +        ** user data (i.e. either to EOF or to the first free-list entry
         4884  +        ** that will be added to the run), then check if it is possible to
         4885  +        ** merge in any free-list entries that are either in-memory or in
         4886  +        ** free-list-only blocks.  */
         4887  +        if( rc==LSM_OK && nMerge==1 && eType==0
         4888  +         && (rtTopic(mergeworker.pCsr->eType) || mergeWorkerDone(&mergeworker))
         4889  +        ){
         4890  +          int nFree = 0;          /* Number of free-list-only levels to merge */
         4891  +          Level *pLvl;
         4892  +          assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );
         4893  +
         4894  +          /* Now check if all levels containing data newer than this one
         4895  +          ** are single-segment free-list only levels. If so, they will be
         4896  +          ** merged in now.  */
         4897  +          for(pLvl=pDb->pWorker->pLevel; 
         4898  +              pLvl!=mergeworker.pLevel && (pLvl->flags & LEVEL_FREELIST_ONLY); 
         4899  +              pLvl=pLvl->pNext
         4900  +          ){
         4901  +            assert( pLvl->nRight==0 );
         4902  +            nFree++;
         4903  +          }
         4904  +          if( pLvl==mergeworker.pLevel ){
         4905  +
         4906  +            rc = mergeInsertFreelistSegments(pDb, nFree, &mergeworker);
         4907  +            if( rc==LSM_OK ){
         4908  +              rc = multiCursorVisitFreelist(mergeworker.pCsr);
         4909  +            }
         4910  +            if( rc==LSM_OK ){
         4911  +              rc = multiCursorSetupTree(mergeworker.pCsr, 0);
         4912  +              pDb->pFreelist = &freelist;
         4913  +              pDb->bUseFreelist = 1;
         4914  +            }
         4915  +          }
         4916  +        }
  4533   4917         }
  4534   4918         nRemaining -= LSM_MAX(mergeworker.nWork, 1);
  4535   4919   
  4536         -      /* Check if the merge operation is completely finished. If so, the
  4537         -      ** Merge object and the right-hand-side of the level can be deleted. 
  4538         -      **
  4539         -      ** Otherwise, gobble up (declare eligible for recycling) any pages
  4540         -      ** from rhs segments for which the content has been completely merged
  4541         -      ** into the lhs of the level.
  4542         -      */
  4543   4920         if( rc==LSM_OK ){
         4921  +        /* Check if the merge operation is completely finished. If not,
         4922  +        ** gobble up (declare eligible for recycling) any pages from rhs
         4923  +        ** segments for which the content has been completely merged into 
         4924  +        ** the lhs of the level.  */
  4544   4925           if( mergeWorkerDone(&mergeworker)==0 ){
  4545   4926             int i;
  4546   4927             for(i=0; i<pLevel->nRight; i++){
  4547   4928               SegmentPtr *pGobble = &mergeworker.pCsr->aPtr[i];
  4548   4929               if( pGobble->pSeg->iRoot ){
  4549   4930                 rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i);
  4550   4931               }else if( mergeworker.aGobble[i] ){
  4551   4932                 lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1);
  4552   4933               }
  4553   4934             }
  4554         -        }else if( pLevel->lhs.iFirst==0 ){
  4555         -          /* If the new level is completely empty, remove it from the 
  4556         -          ** database snapshot. This can only happen if all input keys were
  4557         -          ** annihilated. Since keys are only annihilated if the new level
  4558         -          ** is the last in the linked list (contains the most ancient of
  4559         -          ** database content), this guarantees that pLevel->pNext==0.  */ 
  4560         -
  4561         -          Level *pTop;          /* Top level of worker snapshot */
  4562         -          Level **pp;           /* Read/write iterator for Level.pNext list */
         4935  +        }else{
  4563   4936             int i;
  4564         -          assert( pLevel->pNext==0 );
         4937  +          int bEmpty;
         4938  +          mergeWorkerShutdown(&mergeworker, &rc);
         4939  +          bEmpty = (pLevel->lhs.iFirst==0);
  4565   4940   
  4566         -          /* Remove the level from the worker snapshot. */
  4567         -          pTop = lsmDbSnapshotLevel(pWorker);
  4568         -          for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext));
  4569         -          *pp = pLevel->pNext;
  4570         -          lsmDbSnapshotSetLevel(pWorker, pTop);
         4941  +          if( bEmpty==0 && rc==LSM_OK ){
         4942  +            rc = lsmFsSortedFinish(pDb->pFS, &pLevel->lhs);
         4943  +          }
  4571   4944   
  4572         -          /* Free the Level structure. */
  4573         -          lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->lhs);
         4945  +          if( pDb->bUseFreelist ){
         4946  +            Freelist *p = &pDb->pWorker->freelist;
         4947  +            lsmFree(pDb->pEnv, p->aEntry);
         4948  +            memcpy(p, &freelist, sizeof(freelist));
         4949  +            pDb->bUseFreelist = 0;
         4950  +            pDb->pFreelist = 0;
         4951  +            bSave = 1;
         4952  +          }
         4953  +
  4574   4954             for(i=0; i<pLevel->nRight; i++){
  4575   4955               lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]);
  4576   4956             }
  4577         -          sortedFreeLevel(pDb->pEnv, pLevel);
  4578         -        }else{
  4579         -          int i;
  4580   4957   
  4581         -          /* Free the separators of the next level, if required. */
  4582         -          if( pLevel->pMerge->nInput > pLevel->nRight ){
  4583         -            assert( pLevel->pNext->lhs.iRoot );
  4584         -            pLevel->pNext->lhs.iRoot = 0;
         4958  +          if( bEmpty ){
         4959  +            /* If the new level is completely empty, remove it from the 
         4960  +            ** database snapshot. This can only happen if all input keys were
         4961  +            ** annihilated. Since keys are only annihilated if the new level
         4962  +            ** is the last in the linked list (contains the most ancient of
         4963  +            ** database content), this guarantees that pLevel->pNext==0.  */ 
         4964  +            Level *pTop;          /* Top level of worker snapshot */
         4965  +            Level **pp;           /* Read/write iterator for Level.pNext list */
         4966  +
         4967  +            assert( pLevel->pNext==0 );
         4968  +
         4969  +            /* Remove the level from the worker snapshot. */
         4970  +            pTop = lsmDbSnapshotLevel(pWorker);
         4971  +            for(pp=&pTop; *pp!=pLevel; pp=&((*pp)->pNext));
         4972  +            *pp = pLevel->pNext;
         4973  +            lsmDbSnapshotSetLevel(pWorker, pTop);
         4974  +
         4975  +            /* Free the Level structure. */
         4976  +            sortedFreeLevel(pDb->pEnv, pLevel);
         4977  +          }else{
         4978  +
         4979  +            /* Free the separators of the next level, if required. */
         4980  +            if( pLevel->pMerge->nInput > pLevel->nRight ){
         4981  +              assert( pLevel->pNext->lhs.iRoot );
         4982  +              pLevel->pNext->lhs.iRoot = 0;
         4983  +            }
         4984  +
         4985  +            /* Zero the right-hand-side of pLevel */
         4986  +            lsmFree(pDb->pEnv, pLevel->aRhs);
         4987  +            pLevel->nRight = 0;
         4988  +            pLevel->aRhs = 0;
         4989  +
         4990  +            /* Free the Merge object */
         4991  +            lsmFree(pDb->pEnv, pLevel->pMerge);
         4992  +            pLevel->pMerge = 0;
  4585   4993             }
  4586   4994   
  4587         -          /* Free the right-hand-side of pLevel */
  4588         -          for(i=0; i<pLevel->nRight; i++){
  4589         -            lsmFsSortedDelete(pDb->pFS, pWorker, 1, &pLevel->aRhs[i]);
         4995  +          if( bSave && rc==LSM_OK ){
         4996  +            pDb->bIncrMerge = 0;
         4997  +            rc = lsmSaveWorker(pDb, 0);
  4590   4998             }
  4591         -          lsmFree(pDb->pEnv, pLevel->aRhs);
  4592         -          pLevel->nRight = 0;
  4593         -          pLevel->aRhs = 0;
  4594         -
  4595         -          /* Free the Merge object */
  4596         -          lsmFree(pDb->pEnv, pLevel->pMerge);
  4597         -          pLevel->pMerge = 0;
  4598   4999           }
  4599   5000         }
  4600   5001   
  4601   5002         /* Clean up the MergeWorker object initialized above. If no error
  4602   5003         ** has occurred, invoke the work-hook to inform the application that
  4603   5004         ** the database structure has changed. */
  4604   5005         mergeWorkerShutdown(&mergeworker, &rc);
         5006  +      pDb->bIncrMerge = 0;
  4605   5007         if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);
  4606   5008   
  4607         -#if 0
  4608         -      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
         5009  +#if LSM_LOG_STRUCTURE
         5010  +      lsmSortedDumpStructure(pDb, pDb->pWorker, LSM_LOG_DATA, 0, "work");
  4609   5011   #endif
  4610   5012         assertBtreeOk(pDb, &pLevel->lhs);
  4611   5013         assertRunInOrder(pDb, &pLevel->lhs);
  4612   5014   
  4613   5015         /* If bFlush is true and the database is no longer considered "full",
  4614   5016         ** break out of the loop even if nRemaining is still greater than
  4615   5017         ** zero. The caller has an in-memory tree to flush to disk.  */
................................................................................
  4649   5051         bRet = 0;
  4650   5052       }
  4651   5053       *pRc = rc;
  4652   5054     }
  4653   5055     assert( *pRc==LSM_OK || bRet==0 );
  4654   5056     return bRet;
  4655   5057   }
         5058  +
         5059  +/*
         5060  +** Create a new free-list only top-level segment. Return LSM_OK if successful
         5061  +** or an LSM error code if some error occurs.
         5062  +*/
         5063  +static int sortedNewFreelistOnly(lsm_db *pDb){
         5064  +  return sortedNewToplevel(pDb, TREE_NONE, 0);
         5065  +}
  4656   5066   
  4657   5067   int lsmSaveWorker(lsm_db *pDb, int bFlush){
  4658   5068     Snapshot *p = pDb->pWorker;
  4659   5069     if( p->freelist.nEntry>pDb->nMaxFreelist ){
  4660         -    int rc = sortedNewToplevel(pDb, TREE_NONE, 0);
         5070  +    int rc = sortedNewFreelistOnly(pDb);
  4661   5071       if( rc!=LSM_OK ) return rc;
  4662   5072     }
  4663   5073     return lsmCheckpointSaveWorker(pDb, bFlush);
  4664   5074   }
  4665   5075   
  4666   5076   static int doLsmSingleWork(
  4667   5077     lsm_db *pDb, 
  4668   5078     int bShutdown,
  4669   5079     int nMerge,                     /* Minimum segments to merge together */
  4670   5080     int nPage,                      /* Number of pages to write to disk */
  4671   5081     int *pnWrite,                   /* OUT: Pages actually written to disk */
  4672   5082     int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
  4673   5083   ){
         5084  +  Snapshot *pWorker;              /* Worker snapshot */
  4674   5085     int rc = LSM_OK;                /* Return code */
  4675   5086     int bDirty = 0;
  4676   5087     int nMax = nPage;               /* Maximum pages to write to disk */
  4677   5088     int nRem = nPage;
  4678   5089     int bCkpt = 0;
         5090  +
         5091  +  assert( nPage>0 );
  4679   5092   
  4680   5093     /* Open the worker 'transaction'. It will be closed before this function
  4681   5094     ** returns.  */
  4682   5095     assert( pDb->pWorker==0 );
  4683   5096     rc = lsmBeginWork(pDb);
  4684   5097     if( rc!=LSM_OK ) return rc;
         5098  +  pWorker = pDb->pWorker;
  4685   5099   
  4686   5100     /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
  4687   5101     ** that this call stops writing when the auto-checkpoint is due. The
  4688   5102     ** caller will do the checkpoint, then possibly call this function again. */
  4689   5103     if( bShutdown==0 && pDb->nAutockpt ){
  4690   5104       u32 nSync;
  4691   5105       u32 nUnsync;
................................................................................
  4747   5161     if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
  4748   5162       int nPg = 0;
  4749   5163       while( rc==LSM_OK && lsmDatabaseFull(pDb) ){
  4750   5164         rc = sortedWork(pDb, 16, nMerge, 1, &nPg);
  4751   5165         nRem -= nPg;
  4752   5166       }
  4753   5167       if( rc==LSM_OK ){
  4754         -      rc = sortedNewToplevel(pDb, TREE_NONE, &nPg);
         5168  +      rc = sortedNewFreelistOnly(pDb);
  4755   5169       }
  4756   5170       nRem -= nPg;
  4757   5171       if( nPg ) bDirty = 1;
  4758   5172     }
         5173  +
         5174  +  if( rc==LSM_OK ){
         5175  +    *pnWrite = (nMax - nRem);
         5176  +    *pbCkpt = (bCkpt && nRem<=0);
         5177  +    if( nMerge==1 && pDb->nAutockpt>0 && *pnWrite>0
         5178  +     && pWorker->pLevel 
         5179  +     && pWorker->pLevel->nRight==0 
         5180  +     && pWorker->pLevel->pNext==0 
         5181  +    ){
         5182  +      *pbCkpt = 1;
         5183  +    }
         5184  +  }
  4759   5185   
  4760   5186     if( rc==LSM_OK && bDirty ){
  4761   5187       lsmFinishWork(pDb, 0, &rc);
  4762   5188     }else{
  4763   5189       int rcdummy = LSM_BUSY;
  4764   5190       lsmFinishWork(pDb, 0, &rcdummy);
  4765         -  }
  4766         -  assert( pDb->pWorker==0 );
  4767         -
  4768         -  if( rc==LSM_OK ){
  4769         -    *pnWrite = (nMax - nRem);
  4770         -    *pbCkpt = (bCkpt && nRem<=0);
  4771         -  }else{
  4772   5191       *pnWrite = 0;
  4773         -    *pbCkpt = 0;
  4774   5192     }
  4775         -
         5193  +  assert( pDb->pWorker==0 );
  4776   5194     return rc;
  4777   5195   }
  4778   5196   
  4779   5197   static int doLsmWork(lsm_db *pDb, int nMerge, int nPage, int *pnWrite){
  4780         -  int rc;
  4781         -  int nWrite = 0;
  4782         -  int bCkpt = 0;
         5198  +  int rc = LSM_OK;                /* Return code */
         5199  +  int nWrite = 0;                 /* Number of pages written */
  4783   5200   
  4784   5201     assert( nMerge>=1 );
  4785         -  do {
  4786         -    int nThis = 0;
  4787         -    bCkpt = 0;
  4788         -    rc = doLsmSingleWork(pDb, 0, nMerge, nPage-nWrite, &nThis, &bCkpt);
  4789         -    nWrite += nThis;
  4790         -    if( rc==LSM_OK && bCkpt ){
  4791         -      rc = lsm_checkpoint(pDb, 0);
  4792         -    }
  4793         -  }while( rc==LSM_OK && (nWrite<nPage && bCkpt) );
         5202  +
         5203  +  if( nPage>0 ){
         5204  +    int bCkpt = 0;
         5205  +    do {
         5206  +      int nThis = 0;
         5207  +      bCkpt = 0;
         5208  +      rc = doLsmSingleWork(pDb, 0, nMerge, nPage-nWrite, &nThis, &bCkpt);
         5209  +      nWrite += nThis;
         5210  +      if( rc==LSM_OK && bCkpt ){
         5211  +        rc = lsm_checkpoint(pDb, 0);
         5212  +      }
         5213  +    }while( rc==LSM_OK && (nWrite<nPage && bCkpt) );
         5214  +  }
  4794   5215   
  4795   5216     if( pnWrite ){
  4796   5217       if( rc==LSM_OK ){
  4797   5218         *pnWrite = nWrite;
  4798   5219       }else{
  4799   5220         *pnWrite = 0;
  4800   5221       }
................................................................................
  5018   5439       eType = *aCell++;
  5019   5440       assert( (flags & SEGMENT_BTREE_FLAG) || eType!=0 );
  5020   5441       aCell += lsmVarintGet32(aCell, &iPgPtr);
  5021   5442   
  5022   5443       if( eType==0 ){
  5023   5444         Pgno iRef;                  /* Page number of referenced page */
  5024   5445         aCell += lsmVarintGet64(aCell, &iRef);
  5025         -      lsmFsDbPageGet(pDb->pFS, iRef, &pRef);
  5026         -      aKey = pageGetKey(pRef, 0, &iTopic, &nKey, &blob);
         5446  +      lsmFsDbPageGet(pDb->pFS, pRun, iRef, &pRef);
         5447  +      aKey = pageGetKey(pRun, pRef, 0, &iTopic, &nKey, &blob);
  5027   5448       }else{
  5028   5449         aCell += lsmVarintGet32(aCell, &nKey);
  5029   5450         if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal);
  5030         -      sortedReadData(pPg, (aCell-aData), nKey+nVal, (void **)&aKey, &blob);
         5451  +      sortedReadData(0, pPg, (aCell-aData), nKey+nVal, (void **)&aKey, &blob);
  5031   5452         aVal = &aKey[nKey];
  5032   5453         iTopic = eType;
  5033   5454       }
  5034   5455   
  5035   5456       lsmStringAppendf(&s, "%s%2X:", (i==0?"":" "), iTopic);
  5036   5457       for(iChar=0; iChar<nKey; iChar++){
  5037   5458         lsmStringAppendf(&s, "%c", isalnum(aKey[iChar]) ? aKey[iChar] : '.');
................................................................................
  5051   5472     lsmLogMessage(pDb, LSM_OK, "      Page %d: %s", lsmFsPageNumber(pPg), s.z);
  5052   5473     lsmStringClear(&s);
  5053   5474   
  5054   5475     sortedBlobFree(&blob);
  5055   5476   }
  5056   5477   
  5057   5478   static void infoCellDump(
  5058         -  lsm_db *pDb,
         5479  +  lsm_db *pDb,                    /* Database handle */
         5480  +  Segment *pSeg,                  /* Segment page belongs to */
  5059   5481     int bIndirect,                  /* True to follow indirect refs */
  5060   5482     Page *pPg,
  5061   5483     int iCell,
  5062   5484     int *peType,
  5063   5485     int *piPgPtr,
  5064   5486     u8 **paKey, int *pnKey,
  5065   5487     u8 **paVal, int *pnVal,
................................................................................
  5080   5502     aCell += lsmVarintGet32(aCell, &iPgPtr);
  5081   5503   
  5082   5504     if( eType==0 ){
  5083   5505       int dummy;
  5084   5506       Pgno iRef;                  /* Page number of referenced page */
  5085   5507       aCell += lsmVarintGet64(aCell, &iRef);
  5086   5508       if( bIndirect ){
  5087         -      lsmFsDbPageGet(pDb->pFS, iRef, &pRef);
  5088         -      pageGetKeyCopy(pDb->pEnv, pRef, 0, &dummy, pBlob);
         5509  +      lsmFsDbPageGet(pDb->pFS, pSeg, iRef, &pRef);
         5510  +      pageGetKeyCopy(pDb->pEnv, pSeg, pRef, 0, &dummy, pBlob);
  5089   5511         aKey = (u8 *)pBlob->pData;
  5090   5512         nKey = pBlob->nData;
  5091   5513         lsmFsPageRelease(pRef);
  5092   5514       }else{
  5093   5515         aKey = (u8 *)"<indirect>";
  5094   5516         nKey = 11;
  5095   5517       }
  5096   5518     }else{
  5097   5519       aCell += lsmVarintGet32(aCell, &nKey);
  5098   5520       if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal);
  5099         -    sortedReadData(pPg, (aCell-aData), nKey+nVal, (void **)&aKey, pBlob);
         5521  +    sortedReadData(pSeg, pPg, (aCell-aData), nKey+nVal, (void **)&aKey, pBlob);
  5100   5522       aVal = &aKey[nKey];
  5101   5523     }
  5102   5524   
  5103   5525     if( peType ) *peType = eType;
  5104   5526     if( piPgPtr ) *piPgPtr = iPgPtr;
  5105   5527     if( paKey ) *paKey = aKey;
  5106   5528     if( paVal ) *paVal = aVal;
................................................................................
  5131   5553     int flags,
  5132   5554     char **pzOut                    /* OUT: lsmMalloc'd string */
  5133   5555   ){
  5134   5556     int rc = LSM_OK;                /* Return code */
  5135   5557     Page *pPg = 0;                  /* Handle for page iPg */
  5136   5558     int i, j;                       /* Loop counters */
  5137   5559     const int perLine = 16;         /* Bytes per line in the raw hex dump */
         5560  +  Segment *pSeg = 0;
         5561  +  Snapshot *pSnap;
  5138   5562   
  5139   5563     int bValues = (flags & INFO_PAGE_DUMP_VALUES);
  5140   5564     int bHex = (flags & INFO_PAGE_DUMP_HEX);
  5141   5565     int bData = (flags & INFO_PAGE_DUMP_DATA);
  5142   5566     int bIndirect = (flags & INFO_PAGE_DUMP_INDIRECT);
  5143   5567   
  5144   5568     *pzOut = 0;
  5145   5569     if( iPg==0 ) return LSM_ERROR;
  5146   5570   
  5147         -  rc = lsmFsDbPageGet(pDb->pFS, iPg, &pPg);
         5571  +  assert( pDb->pClient || pDb->pWorker );
         5572  +  pSnap = pDb->pClient;
         5573  +  if( pSnap==0 ) pSnap = pDb->pWorker;
         5574  +  if( pSnap->redirect.n>0 ){
         5575  +    Level *pLvl;
         5576  +    int bUse = 0;
         5577  +    for(pLvl=pSnap->pLevel; pLvl->pNext; pLvl=pLvl->pNext);
         5578  +    pSeg = (pLvl->nRight==0 ? &pLvl->lhs : &pLvl->aRhs[pLvl->nRight-1]);
         5579  +    rc = lsmFsSegmentContainsPg(pDb->pFS, pSeg, iPg, &bUse);
         5580  +    if( bUse==0 ){
         5581  +      pSeg = 0;
         5582  +    }
         5583  +  }
         5584  +
         5585  +  /* iPg is a real page number (not subject to redirection). So it is safe 
         5586  +  ** to pass a NULL in place of the segment pointer as the second argument
         5587  +  ** to lsmFsDbPageGet() here.  */
         5588  +  if( rc==LSM_OK ){
         5589  +    rc = lsmFsDbPageGet(pDb->pFS, 0, iPg, &pPg);
         5590  +  }
         5591  +
  5148   5592     if( rc==LSM_OK ){
  5149   5593       Blob blob = {0, 0, 0, 0};
  5150   5594       int nKeyWidth = 0;
  5151   5595       LsmString str;
  5152   5596       int nRec;
  5153   5597       int iPtr;
  5154   5598       int flags;
................................................................................
  5165   5609       lsmStringAppendf(&str, "nRec : %d\n", nRec);
  5166   5610       lsmStringAppendf(&str, "iPtr : %d\n", iPtr);
  5167   5611       lsmStringAppendf(&str, "flags: %04x\n", flags);
  5168   5612       lsmStringAppendf(&str, "\n");
  5169   5613   
  5170   5614       for(iCell=0; iCell<nRec; iCell++){
  5171   5615         int nKey;
  5172         -      infoCellDump(pDb, bIndirect, pPg, iCell, 0, 0, 0, &nKey, 0, 0, &blob);
         5616  +      infoCellDump(
         5617  +          pDb, pSeg, bIndirect, pPg, iCell, 0, 0, 0, &nKey, 0, 0, &blob
         5618  +      );
  5173   5619         if( nKey>nKeyWidth ) nKeyWidth = nKey;
  5174   5620       }
  5175   5621       if( bHex ) nKeyWidth = nKeyWidth * 2;
  5176   5622   
  5177   5623       for(iCell=0; iCell<nRec; iCell++){
  5178   5624         u8 *aKey; int nKey = 0;       /* Key */
  5179   5625         u8 *aVal; int nVal = 0;       /* Value */
  5180   5626         int iPgPtr;
  5181   5627         int eType;
  5182         -      char cType = '?';
  5183   5628         Pgno iAbsPtr;
  5184   5629         char zFlags[8];
  5185   5630   
  5186         -      infoCellDump(pDb, bIndirect, pPg, iCell, &eType, &iPgPtr,
         5631  +      infoCellDump(pDb, pSeg, bIndirect, pPg, iCell, &eType, &iPgPtr,
  5187   5632             &aKey, &nKey, &aVal, &nVal, &blob
  5188   5633         );
  5189   5634         iAbsPtr = iPgPtr + ((flags & SEGMENT_BTREE_FLAG) ? 0 : iPtr);
  5190   5635   
  5191   5636         lsmFlagsToString(eType, zFlags);
  5192   5637         lsmStringAppendf(&str, "%s %d (%s) ", 
  5193   5638             zFlags, iAbsPtr, (rtTopic(eType) ? "sys" : "usr")
................................................................................
  5262   5707       char *zSeg;
  5263   5708       Page *pPg;
  5264   5709   
  5265   5710       zSeg = segToString(pDb->pEnv, pRun, 0);
  5266   5711       lsmLogMessage(pDb, LSM_OK, "Segment: %s", zSeg);
  5267   5712       lsmFree(pDb->pEnv, zSeg);
  5268   5713   
  5269         -    lsmFsDbPageGet(pDb->pFS, pRun->iFirst, &pPg);
         5714  +    lsmFsDbPageGet(pDb->pFS, pRun, pRun->iFirst, &pPg);
  5270   5715       while( pPg ){
  5271   5716         Page *pNext;
  5272   5717         char *z = 0;
  5273   5718         infoPageDump(pDb, lsmFsPageNumber(pPg), flags, &z);
  5274   5719         lsmLogMessage(pDb, LSM_OK, "%s", z);
  5275   5720         lsmFree(pDb->pEnv, z);
  5276   5721   #if 0
................................................................................
  5292   5737     Snapshot *pSnap,                /* Snapshot to dump */
  5293   5738     int bKeys,                      /* Output the keys from each segment */
  5294   5739     int bVals,                      /* Output the values from each segment */
  5295   5740     const char *zWhy                /* Caption to print near top of dump */
  5296   5741   ){
  5297   5742     Snapshot *pDump = pSnap;
  5298   5743     Level *pTopLevel;
         5744  +  char *zFree = 0;
         5745  +
  5299   5746   
  5300   5747     assert( pSnap );
  5301   5748     pTopLevel = lsmDbSnapshotLevel(pDump);
  5302   5749     if( pDb->xLog && pTopLevel ){
         5750  +    static int nCall = 0;
  5303   5751       Level *pLevel;
  5304   5752       int iLevel = 0;
  5305   5753   
  5306         -    lsmLogMessage(pDb, LSM_OK, "Database structure (%s)", zWhy);
         5754  +    nCall++;
         5755  +    lsmLogMessage(pDb, LSM_OK, "Database structure %d (%s)", nCall, zWhy);
         5756  +
         5757  +#if 0
         5758  +    if( nCall==1031 || nCall==1032 ) bKeys=1;
         5759  +#endif
  5307   5760   
  5308   5761       for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
  5309   5762         char zLeft[1024];
  5310   5763         char zRight[1024];
  5311   5764         int i = 0;
  5312   5765   
  5313   5766         Segment *aLeft[24];  
................................................................................
  5370   5823           sortedDumpSegment(pDb, &pLevel->lhs, bVals);
  5371   5824           for(i=0; i<pLevel->nRight; i++){
  5372   5825             sortedDumpSegment(pDb, &pLevel->aRhs[i], bVals);
  5373   5826           }
  5374   5827         }
  5375   5828       }
  5376   5829     }
         5830  +
         5831  +  lsmInfoFreelist(pDb, &zFree);
         5832  +  lsmLogMessage(pDb, LSM_OK, "Freelist: %s", zFree);
         5833  +  lsmFree(pDb->pEnv, zFree);
         5834  +
         5835  +  assert( lsmFsIntegrityCheck(pDb) );
  5377   5836   }
  5378   5837   
  5379   5838   void lsmSortedFreeLevel(lsm_env *pEnv, Level *pLevel){
  5380   5839     Level *pNext;
  5381   5840     Level *p;
  5382   5841   
  5383   5842     for(p=pLevel; p; p=pNext){
................................................................................
  5408   5867   
  5409   5868   #ifdef LSM_DEBUG_EXPENSIVE
  5410   5869   static void assertRunInOrder(lsm_db *pDb, Segment *pSeg){
  5411   5870     Page *pPg = 0;
  5412   5871     Blob blob1 = {0, 0, 0, 0};
  5413   5872     Blob blob2 = {0, 0, 0, 0};
  5414   5873   
  5415         -  lsmFsDbPageGet(pDb->pFS, pSeg->iFirst, &pPg);
         5874  +  lsmFsDbPageGet(pDb->pFS, pSeg, pSeg->iFirst, &pPg);
  5416   5875     while( pPg ){
  5417   5876       u8 *aData; int nData;
  5418   5877       Page *pNext;
  5419   5878   
  5420   5879       aData = lsmFsPageData(pPg, &nData);
  5421   5880       if( 0==(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) ){
  5422   5881         int i;
  5423   5882         int nRec = pageGetNRec(aData, nData);
  5424   5883         for(i=0; i<nRec; i++){
  5425   5884           int iTopic1, iTopic2;
  5426         -        pageGetKeyCopy(pDb->pEnv, pPg, i, &iTopic1, &blob1);
         5885  +        pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i, &iTopic1, &blob1);
  5427   5886   
  5428   5887           if( i==0 && blob2.nData ){
  5429   5888             assert( sortedKeyCompare(
  5430   5889                   pDb->xCmp, iTopic2, blob2.pData, blob2.nData,
  5431   5890                   iTopic1, blob1.pData, blob1.nData
  5432   5891             )<0 );
  5433   5892           }
  5434   5893   
  5435   5894           if( i<(nRec-1) ){
  5436         -          pageGetKeyCopy(pDb->pEnv, pPg, i+1, &iTopic2, &blob2);
         5895  +          pageGetKeyCopy(pDb->pEnv, pSeg, pPg, i+1, &iTopic2, &blob2);
  5437   5896             assert( sortedKeyCompare(
  5438   5897                   pDb->xCmp, iTopic1, blob1.pData, blob1.nData,
  5439   5898                   iTopic2, blob2.pData, blob2.nData
  5440   5899             )<0 );
  5441   5900           }
  5442   5901         }
  5443   5902       }
................................................................................
  5563   6022       BtreeCursor *pCsr = 0;        /* Btree cursor */
  5564   6023   
  5565   6024       rc = btreeCursorNew(pDb, pSeg, &pCsr);
  5566   6025       if( rc==LSM_OK ){
  5567   6026         rc = btreeCursorFirst(pCsr);
  5568   6027       }
  5569   6028       if( rc==LSM_OK ){
  5570         -      rc = lsmFsDbPageGet(pFS, pSeg->iFirst, &pPg);
         6029  +      rc = lsmFsDbPageGet(pFS, pSeg, pSeg->iFirst, &pPg);
  5571   6030       }
  5572   6031   
  5573   6032       while( rc==LSM_OK ){
  5574   6033         Page *pNext;
  5575   6034         u8 *aData;
  5576   6035         int nData;
  5577   6036         int flags;
................................................................................
  5585   6044         if( rc==LSM_OK 
  5586   6045          && 0==((SEGMENT_BTREE_FLAG|PGFTR_SKIP_THIS_FLAG) & flags)
  5587   6046          && 0!=pageGetNRec(aData, nData)
  5588   6047         ){
  5589   6048           u8 *pKey;
  5590   6049           int nKey;
  5591   6050           int iTopic;
  5592         -        pKey = pageGetKey(pPg, 0, &iTopic, &nKey, &blob);
         6051  +        pKey = pageGetKey(pSeg, pPg, 0, &iTopic, &nKey, &blob);
  5593   6052           assert( nKey==pCsr->nKey && 0==memcmp(pKey, pCsr->pKey, nKey) );
  5594   6053           assert( lsmFsPageNumber(pPg)==pCsr->iPtr );
  5595   6054           rc = btreeCursorNext(pCsr);
  5596   6055         }
  5597   6056       }
  5598   6057       assert( rc!=LSM_OK || pCsr->pKey==0 );
  5599   6058   

Changes to tool/lsmview.tcl.

    39     39     }
    40     40     namespace export scrollable
    41     41   }
    42     42   namespace import ::autoscroll::*
    43     43   #############################################################################
    44     44   
    45     45   proc exec_lsmtest_show {args} {
    46         -  set fd [open [list |lsmtest show {*}$args]]
           46  +  set fd [open [list |lsmtest show {*}$args 2>/dev/null]]
    47     47     set res ""
    48     48     while {![eof $fd]} { 
    49     49       set line [gets $fd]
    50     50       if {[regexp {^\#.*} $line]} continue
    51     51       if {[regexp {^Leaked*} $line]} continue
    52     52       append res $line
    53     53       append res "\n"