SQLite4
Check-in [db2407a7af]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Change lsm_sorted.c so that it does not use the Segment.iLastPg variable directly. This variable's meaning is slightly different for compressed databases.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | compression-hooks
Files: files | file ages | folders
SHA1: db2407a7af0fd4dfdb2dca021b4f871ddf3410ca
User & Date: dan 2012-10-20 15:57:54
Context
2012-10-22
20:05
Add some code to support compressed databases to lsm_file.c. Does not currently work. check-in: 3c45b911fe user: dan tags: compression-hooks
2012-10-20
15:57
Change lsm_sorted.c so that it does not use the Segment.iLastPg variable directly. This variable's meaning is slightly different for compressed databases. check-in: db2407a7af user: dan tags: compression-hooks
2012-10-19
16:16
Further changes to ensure that a page's page number is not required until after its content has been assembled. check-in: c03eeda99f user: dan tags: compression-hooks
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/lsmInt.h.

338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
...
626
627
628
629
630
631
632

633
634
635
636
637
638
639
...
643
644
645
646
647
648
649

650
651
652
653
654
655
656
  ShmHeader *pShmhdr;             /* Live shared-memory header */
  TreeHeader treehdr;             /* Local copy of tree-header */
  u32 aSnapshot[LSM_META_PAGE_SIZE / sizeof(u32)];
};

/*
** Describes a single run of pages. The first, last and root pages are
** identified by page number (type Pgno); the size is a count of pages.
*/
struct Segment {
  Pgno iFirst;                     /* First page of this run */
  Pgno iLast;                      /* Last page of this run */
  Pgno iRoot;                      /* Root page number (if any) */
  int nSize;                       /* Size of this run in pages */
};

/*
** iSplitTopic/pSplitKey/nSplitKey:
**   If nRight>0, this buffer contains a copy of the largest key that has
................................................................................
FileSystem *lsmPageFS(Page *);

int lsmFsSectorSize(FileSystem *);

void lsmSortedSplitkey(lsm_db *, Level *, int *);

/* Reading sorted run content. */

int lsmFsDbPageGet(FileSystem *, Pgno, Page **);
int lsmFsDbPageNext(Segment *, Page *, int eDir, Page **);

u8 *lsmFsPageData(Page *, int *);
int lsmFsPageRelease(Page *);
int lsmFsPagePersist(Page *);
void lsmFsPageRef(Page *);
................................................................................
int lsmFsNWrite(FileSystem *);

int lsmFsMetaPageGet(FileSystem *, int, int, MetaPage **);
int lsmFsMetaPageRelease(MetaPage *);
u8 *lsmFsMetaPageData(MetaPage *, int *);

#ifdef LSM_DEBUG

int lsmFsIntegrityCheck(lsm_db *);
#endif

int lsmFsPageWritable(Page *);

/* Functions to read, write and sync the log file. */
int lsmFsWriteLog(FileSystem *pFS, i64 iOff, LsmString *pStr);







|







 







>







 







>







338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
...
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
...
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
  ShmHeader *pShmhdr;             /* Live shared-memory header */
  TreeHeader treehdr;             /* Local copy of tree-header */
  u32 aSnapshot[LSM_META_PAGE_SIZE / sizeof(u32)];
};

/*
** Describes a single run of pages. The first, last and root pages are
** identified by page number (type Pgno); the size is a count of pages.
**
** In the compressed database file format a page number is a byte offset
** into the file, and the last page of a segment is identified by the
** offset of the last byte of its record rather than the first. For this
** reason code outside the file-system layer should avoid interpreting
** iLastPg directly; lsmFsDbPageLast() fetches the last page of a segment.
*/
struct Segment {
  Pgno iFirst;                     /* First page of this run */
  Pgno iLastPg;                    /* Last page of this run */
  Pgno iRoot;                      /* Root page number (if any) */
  int nSize;                       /* Size of this run in pages */
};

/*
** iSplitTopic/pSplitKey/nSplitKey:
**   If nRight>0, this buffer contains a copy of the largest key that has
................................................................................
FileSystem *lsmPageFS(Page *);

int lsmFsSectorSize(FileSystem *);

void lsmSortedSplitkey(lsm_db *, Level *, int *);

/* Reading sorted run content. */
int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg);
int lsmFsDbPageGet(FileSystem *, Pgno, Page **);
int lsmFsDbPageNext(Segment *, Page *, int eDir, Page **);

u8 *lsmFsPageData(Page *, int *);
int lsmFsPageRelease(Page *);
int lsmFsPagePersist(Page *);
void lsmFsPageRef(Page *);
................................................................................
int lsmFsNWrite(FileSystem *);

int lsmFsMetaPageGet(FileSystem *, int, int, MetaPage **);
int lsmFsMetaPageRelease(MetaPage *);
u8 *lsmFsMetaPageData(MetaPage *, int *);

#ifdef LSM_DEBUG
int lsmFsDbPageIsLast(Segment *pSeg, Page *pPg);
int lsmFsIntegrityCheck(lsm_db *);
#endif

int lsmFsPageWritable(Page *);

/* Functions to read, write and sync the log file. */
int lsmFsWriteLog(FileSystem *pFS, i64 iOff, LsmString *pStr);

Changes to src/lsm_ckpt.c.

271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
...
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
  CkptBuffer *p, 
  int *piOut, 
  int *pRc
){
  int iOut = *piOut;

  ckptSetValue(p, iOut++, pSeg->iFirst, pRc);
  ckptSetValue(p, iOut++, pSeg->iLast, pRc);
  ckptSetValue(p, iOut++, pSeg->iRoot, pRc);
  ckptSetValue(p, iOut++, pSeg->nSize, pRc);

  *piOut = iOut;
}

static void ckptExportLevel(
................................................................................
/*
** Read the four integers describing one segment from aIn[], beginning at
** index *piIn, and copy them into *pSegment. The values are consumed in
** the order iFirst, iLast, iRoot, nSize. Before returning, *piIn is
** advanced past the four values read.
**
** The assert() statements require all four fields of *pSegment to be
** zero on entry.
*/
static void ckptNewSegment(
  u32 *aIn,                       /* Array of integers to read from */
  int *piIn,                      /* IN/OUT: Index of next value in aIn[] */
  Segment *pSegment               /* Populate this structure */
){
  const int iBase = *piIn;        /* Index of the first field in aIn[] */

  assert( pSegment->iFirst==0 && pSegment->iLast==0 );
  assert( pSegment->nSize==0 && pSegment->iRoot==0 );

  pSegment->iFirst = aIn[iBase+0];
  pSegment->iLast  = aIn[iBase+1];
  pSegment->iRoot  = aIn[iBase+2];
  pSegment->nSize  = aIn[iBase+3];

  *piIn = iBase + 4;
}

static int ckptSetupMerge(lsm_db *pDb, u32 *aInt, int *piIn, Level *pLevel){







|







 







|


|







271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
...
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
  CkptBuffer *p, 
  int *piOut, 
  int *pRc
){
  int iOut = *piOut;

  ckptSetValue(p, iOut++, pSeg->iFirst, pRc);
  ckptSetValue(p, iOut++, pSeg->iLastPg, pRc);
  ckptSetValue(p, iOut++, pSeg->iRoot, pRc);
  ckptSetValue(p, iOut++, pSeg->nSize, pRc);

  *piOut = iOut;
}

static void ckptExportLevel(
................................................................................
/*
** Read the four integers describing one segment from aIn[], beginning at
** index *piIn, and copy them into *pSegment. The values are consumed in
** the order iFirst, iLastPg, iRoot, nSize. Before returning, *piIn is
** advanced past the four values read.
**
** The assert() statements require all four fields of *pSegment to be
** zero on entry.
*/
static void ckptNewSegment(
  u32 *aIn,                       /* Array of integers to read from */
  int *piIn,                      /* IN/OUT: Index of next value in aIn[] */
  Segment *pSegment               /* Populate this structure */
){
  const int iBase = *piIn;        /* Index of the first field in aIn[] */

  assert( pSegment->iFirst==0 && pSegment->iLastPg==0 );
  assert( pSegment->nSize==0 && pSegment->iRoot==0 );

  pSegment->iFirst  = aIn[iBase+0];
  pSegment->iLastPg = aIn[iBase+1];
  pSegment->iRoot   = aIn[iBase+2];
  pSegment->nSize   = aIn[iBase+3];

  *piIn = iBase + 4;
}

static int ckptSetupMerge(lsm_db *pDb, u32 *aInt, int *piIn, Level *pLevel){

Changes to src/lsm_file.c.

6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
..
81
82
83
84
85
86
87































88
89
90
91
92
93
94
95
96
97
98
99
100
101
...
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
...
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
....
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
....
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
....
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
....
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
....
1186
1187
1188
1189
1190
1191
1192








1193
1194
1195
1196
1197
1198
1199
....
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
....
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
....
1579
1580
1581
1582
1583
1584
1585










**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
** 
** DATABASE FILE FORMAT
**
** The following database file format concepts are used by the code in
** this file to read and write the database file.
**
** Pages:
**
**   A database file is divided into pages. The first 8KB of the file consists
................................................................................
**     lsmFsOpenLog
**     lsmFsWriteLog
**     lsmFsSyncLog
**     lsmFsReadLog
**     lsmFsTruncateLog
**     lsmFsCloseAndDeleteLog
**































*/
#include "lsmInt.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>


/*
** File-system object. Each database connection allocates a single instance
** of the following structure. It is used for all access to the database and
** log files.
**
** pLruFirst, pLruLast:
................................................................................
  Segment *pRun, 
  Segment *pIgnore, 
  int iFirst, 
  int iLast
){
  return (pRun!=pIgnore && (
        (pRun->iFirst>=iFirst && pRun->iFirst<=iLast)
     || (pRun->iLast>=iFirst && pRun->iLast<=iLast)
  ));
}

static int fsLevelEndsBetween(
  Level *pLevel, 
  Segment *pIgnore, 
  int iFirst, 
................................................................................
  if( pDel->iFirst ){
    int rc = LSM_OK;

    int iBlk;
    int iLastBlk;

    iBlk = fsPageToBlock(pFS, pDel->iFirst);
    iLastBlk = fsPageToBlock(pFS, pDel->iLast);

    /* Mark all blocks currently used by this sorted run as free */
    while( iBlk && rc==LSM_OK ){
      int iNext = 0;
      if( iBlk!=iLastBlk ){
        rc = fsBlockNext(pFS, iBlk, &iNext);
      }else if( bZero==0 && pDel->iLast!=fsLastPageOnBlock(pFS, iLastBlk) ){
        break;
      }
      rc = fsFreeBlock(pFS, pSnapshot, pDel, iBlk);
      iBlk = iNext;
    }

    if( bZero ) memset(pDel, 0, sizeof(Segment));
................................................................................
      return LSM_OK;
    }else if( fsIsFirst(pFS, iPg) ){
      iPg = lsmGetU32(&pPg->aData[pFS->nPagesize-4]);
    }else{
      iPg--;
    }
  }else{
    if( pRun && iPg==pRun->iLast ){
      *ppNext = 0;
      return LSM_OK;
    }else if( fsIsLast(pFS, iPg) ){
      iPg = lsmGetU32(&pPg->aData[pFS->nPagesize-4]);
    }else{
      iPg++;
    }
................................................................................
  Page **ppOut
){
  int rc = LSM_OK;
  Page *pPg = 0;
  *ppOut = 0;
  int iApp = 0;
  int iNext = 0;
  int iPrev = p->iLast;

  if( iPrev==0 ){
    iApp = findAppendPoint(pFS);
  }else if( fsIsLast(pFS, iPrev) ){
    Page *pLast = 0;
    rc = fsPageGet(pFS, iPrev, 0, &pLast);
    if( rc!=LSM_OK ) return rc;
................................................................................
  rc = fsPageGet(pFS, iApp, 1, &pPg);
  assert( rc==LSM_OK || pPg==0 );

  /* If this is the first or last page of a block, fill in the pointer 
   ** value at the end of the new page. */
  if( rc==LSM_OK ){
    p->nSize++;
    p->iLast = iApp;
    if( p->iFirst==0 ) p->iFirst = iApp;
    pPg->flags |= PAGE_DIRTY;

    if( fsIsLast(pFS, iApp) ){
      lsmPutU32(&pPg->aData[pFS->nPagesize-4], iNext);
    }else 
      if( fsIsFirst(pFS, iApp) ){
................................................................................
    /* Check if the last page of this run happens to be the last of a block.
    ** If it is, then an extra block has already been allocated for this run.
    ** Shift this extra block back to the free-block list. 
    **
    ** Otherwise, add the first free page in the last block used by the run
    ** to the lAppend list.
    */
    if( (p->iLast % nPagePerBlock)==0 ){
      Page *pLast;
      rc = fsPageGet(pFS, p->iLast, 0, &pLast);
      if( rc==LSM_OK ){
        int iPg = (int)lsmGetU32(&pLast->aData[pFS->nPagesize-4]);
        int iBlk = fsPageToBlock(pFS, iPg);
        lsmBlockRefree(pFS->pDb, iBlk);
        lsmFsPageRelease(pLast);
      }
    }else{
      int i;
      u32 *aiAppend = pFS->pDb->pWorker->aiAppend;
      for(i=0; i<LSM_APPLIST_SZ; i++){
        if( aiAppend[i]==0 ){
          aiAppend[i] = p->iLast+1;
          break;
        }
      }
    }
  }
  return rc;
}
................................................................................
/*
** Fetch a reference to database page iPg by calling fsPageGet(). The
** value returned by fsPageGet() is handed straight back to the caller.
*/
int lsmFsDbPageGet(FileSystem *pFS, Pgno iPg, Page **ppPg){
  int rc;                         /* Return code from fsPageGet() */

  assert( pFS );
  rc = fsPageGet(pFS, iPg, 0, ppPg);
  return rc;
}









/*
** Return a reference to meta-page iPg. If successful, LSM_OK is returned
** and *ppPg populated with the new page reference. The reference should
** be released by the caller using lsmFsPageRelease().
**
** Otherwise, if an error occurs, *ppPg is set to NULL and an LSM error 
................................................................................
  }else{
    FileSystem *pFS = pDb->pFS;
    LsmString str;
    int iBlk;
    int iLastBlk;
   
    iBlk = fsPageToBlock(pFS, pArray->iFirst);
    iLastBlk = fsPageToBlock(pFS, pArray->iLast);

    lsmStringInit(&str, pDb->pEnv);
    lsmStringAppendf(&str, "%d", pArray->iFirst);
    while( iBlk!=iLastBlk ){
      lsmStringAppendf(&str, " %d", fsLastPageOnBlock(pFS, iBlk));
      fsBlockNext(pFS, iBlk, &iBlk);
      lsmStringAppendf(&str, " %d", fsFirstPageOnBlock(pFS, iBlk));
    }
    lsmStringAppendf(&str, " %d", pArray->iLast);

    *pzOut = str.z;
  }

  if( bUnlock ){
    int rcwork = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcwork);
................................................................................
  if( pSeg ){
    if( pSeg && pSeg->nSize>0 ){
      const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);

      int iBlk;
      int iLastBlk;
      iBlk = fsPageToBlock(pFS, pSeg->iFirst);
      iLastBlk = fsPageToBlock(pFS, pSeg->iLast);

      while( iBlk ){
        assert( iBlk<=nUsed );
        /* assert( aUsed[iBlk-1]==0 ); */
        aUsed[iBlk-1] = 1;
        if( iBlk!=iLastBlk ){
          fsBlockNext(pFS, iBlk, &iBlk);
        }else{
          iBlk = 0;
        }
      }

      if( bExtra && (pSeg->iLast % nPagePerBlock)==0 ){
        fsBlockNext(pFS, iLastBlk, &iBlk);
        aUsed[iBlk-1] = 1;
      }
    }
  }
}

................................................................................

  for(i=0; i<nBlock; i++) assert( aUsed[i]==1 );

  lsmFree(pDb->pEnv, aUsed);
  lsmFree(pDb->pEnv, freelist.aEntry);
  return 1;
}

















|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>






<







 







|







 







|






|







 







|







 







|







 







|







 







|

|











|







 







>
>
>
>
>
>
>
>







 







|








|







 







|












|







 







>
>
>
>
>
>
>
>
>
>
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
..
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124

125
126
127
128
129
130
131
...
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
...
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
....
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
....
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
....
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
....
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
....
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
....
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
....
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
....
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
** 
** NORMAL DATABASE FILE FORMAT
**
** The following database file format concepts are used by the code in
** this file to read and write the database file.
**
** Pages:
**
**   A database file is divided into pages. The first 8KB of the file consists
................................................................................
**     lsmFsOpenLog
**     lsmFsWriteLog
**     lsmFsSyncLog
**     lsmFsReadLog
**     lsmFsTruncateLog
**     lsmFsCloseAndDeleteLog
**
** COMPRESSED DATABASE FILE FORMAT
**
** The compressed database file format is very similar to the normal format.
** The file still begins with two 4KB meta-pages (which are never compressed).
** It is still divided into blocks.
**
** The first and last four bytes of each block are reserved for 32-bit 
** pointer values. Similar to the way four bytes are carved from the end of 
** the first and last page of each block in uncompressed databases. From
** the point of view of the upper layer, all pages are the same size - this
** is different from the uncompressed format where the first and last pages
** on each block are 4 bytes smaller than the others.
**
** Pages are stored in variable length compressed form, as follows:
**
**     * Number of bytes in compressed page image, as a varint.
**
**     * Compressed page image.
**
**     * Number of bytes in compressed page image, as a varint. Except,
**       the first byte of the varint is moved so that it is the last
**       byte (i.e. for a 4 byte varint: ABCD -> BCDA). This is done
**       to make it possible to iterate through a packed array of compressed 
**       pages in reverse order.
**
** A page number is a byte offset into the database file. So the smallest
** possible page number is 8192 (immediately after the two meta-pages).
** The first and root page of a segment are identified by a page number
** corresponding to the byte offset of the first byte in the corresponding
** page record. The last page of a segment is identified by the byte offset
** of the last byte in its record.
*/
#include "lsmInt.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>


/*
** File-system object. Each database connection allocates a single instance
** of the following structure. It is used for all access to the database and
** log files.
**
** pLruFirst, pLruLast:
................................................................................
  Segment *pRun, 
  Segment *pIgnore, 
  int iFirst, 
  int iLast
){
  return (pRun!=pIgnore && (
        (pRun->iFirst>=iFirst && pRun->iFirst<=iLast)
     || (pRun->iLastPg>=iFirst && pRun->iLastPg<=iLast)
  ));
}

static int fsLevelEndsBetween(
  Level *pLevel, 
  Segment *pIgnore, 
  int iFirst, 
................................................................................
  if( pDel->iFirst ){
    int rc = LSM_OK;

    int iBlk;
    int iLastBlk;

    iBlk = fsPageToBlock(pFS, pDel->iFirst);
    iLastBlk = fsPageToBlock(pFS, pDel->iLastPg);

    /* Mark all blocks currently used by this sorted run as free */
    while( iBlk && rc==LSM_OK ){
      int iNext = 0;
      if( iBlk!=iLastBlk ){
        rc = fsBlockNext(pFS, iBlk, &iNext);
      }else if( bZero==0 && pDel->iLastPg!=fsLastPageOnBlock(pFS, iLastBlk) ){
        break;
      }
      rc = fsFreeBlock(pFS, pSnapshot, pDel, iBlk);
      iBlk = iNext;
    }

    if( bZero ) memset(pDel, 0, sizeof(Segment));
................................................................................
      return LSM_OK;
    }else if( fsIsFirst(pFS, iPg) ){
      iPg = lsmGetU32(&pPg->aData[pFS->nPagesize-4]);
    }else{
      iPg--;
    }
  }else{
    if( pRun && iPg==pRun->iLastPg ){
      *ppNext = 0;
      return LSM_OK;
    }else if( fsIsLast(pFS, iPg) ){
      iPg = lsmGetU32(&pPg->aData[pFS->nPagesize-4]);
    }else{
      iPg++;
    }
................................................................................
  Page **ppOut
){
  int rc = LSM_OK;
  Page *pPg = 0;
  *ppOut = 0;
  int iApp = 0;
  int iNext = 0;
  int iPrev = p->iLastPg;

  if( iPrev==0 ){
    iApp = findAppendPoint(pFS);
  }else if( fsIsLast(pFS, iPrev) ){
    Page *pLast = 0;
    rc = fsPageGet(pFS, iPrev, 0, &pLast);
    if( rc!=LSM_OK ) return rc;
................................................................................
  rc = fsPageGet(pFS, iApp, 1, &pPg);
  assert( rc==LSM_OK || pPg==0 );

  /* If this is the first or last page of a block, fill in the pointer 
   ** value at the end of the new page. */
  if( rc==LSM_OK ){
    p->nSize++;
    p->iLastPg = iApp;
    if( p->iFirst==0 ) p->iFirst = iApp;
    pPg->flags |= PAGE_DIRTY;

    if( fsIsLast(pFS, iApp) ){
      lsmPutU32(&pPg->aData[pFS->nPagesize-4], iNext);
    }else 
      if( fsIsFirst(pFS, iApp) ){
................................................................................
    /* Check if the last page of this run happens to be the last of a block.
    ** If it is, then an extra block has already been allocated for this run.
    ** Shift this extra block back to the free-block list. 
    **
    ** Otherwise, add the first free page in the last block used by the run
    ** to the lAppend list.
    */
    if( (p->iLastPg % nPagePerBlock)==0 ){
      Page *pLast;
      rc = fsPageGet(pFS, p->iLastPg, 0, &pLast);
      if( rc==LSM_OK ){
        int iPg = (int)lsmGetU32(&pLast->aData[pFS->nPagesize-4]);
        int iBlk = fsPageToBlock(pFS, iPg);
        lsmBlockRefree(pFS->pDb, iBlk);
        lsmFsPageRelease(pLast);
      }
    }else{
      int i;
      u32 *aiAppend = pFS->pDb->pWorker->aiAppend;
      for(i=0; i<LSM_APPLIST_SZ; i++){
        if( aiAppend[i]==0 ){
          aiAppend[i] = p->iLastPg+1;
          break;
        }
      }
    }
  }
  return rc;
}
................................................................................
/*
** Fetch a reference to database page iPg by calling fsPageGet(). The
** value returned by fsPageGet() is handed straight back to the caller.
*/
int lsmFsDbPageGet(FileSystem *pFS, Pgno iPg, Page **ppPg){
  int rc;                         /* Return code from fsPageGet() */

  assert( pFS );
  rc = fsPageGet(pFS, iPg, 0, ppPg);
  return rc;
}

/*
** Fetch a reference to the final page of segment pSeg. The page is looked
** up using the page number stored in pSeg->iLastPg, and the value returned
** by fsPageGet() is handed straight back to the caller.
*/
int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg){
  Pgno iLast = pSeg->iLastPg;     /* Page number of last page in segment */
  int rc;                         /* Return code from fsPageGet() */

  rc = fsPageGet(pFS, iLast, 0, ppPg);
  return rc;
}

/*
** Return a reference to meta-page iPg. If successful, LSM_OK is returned
** and *ppPg populated with the new page reference. The reference should
** be released by the caller using lsmFsPageRelease().
**
** Otherwise, if an error occurs, *ppPg is set to NULL and an LSM error 
................................................................................
  }else{
    FileSystem *pFS = pDb->pFS;
    LsmString str;
    int iBlk;
    int iLastBlk;
   
    iBlk = fsPageToBlock(pFS, pArray->iFirst);
    iLastBlk = fsPageToBlock(pFS, pArray->iLastPg);

    lsmStringInit(&str, pDb->pEnv);
    lsmStringAppendf(&str, "%d", pArray->iFirst);
    while( iBlk!=iLastBlk ){
      lsmStringAppendf(&str, " %d", fsLastPageOnBlock(pFS, iBlk));
      fsBlockNext(pFS, iBlk, &iBlk);
      lsmStringAppendf(&str, " %d", fsFirstPageOnBlock(pFS, iBlk));
    }
    lsmStringAppendf(&str, " %d", pArray->iLastPg);

    *pzOut = str.z;
  }

  if( bUnlock ){
    int rcwork = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcwork);
................................................................................
  if( pSeg ){
    if( pSeg && pSeg->nSize>0 ){
      const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);

      int iBlk;
      int iLastBlk;
      iBlk = fsPageToBlock(pFS, pSeg->iFirst);
      iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg);

      while( iBlk ){
        assert( iBlk<=nUsed );
        /* assert( aUsed[iBlk-1]==0 ); */
        aUsed[iBlk-1] = 1;
        if( iBlk!=iLastBlk ){
          fsBlockNext(pFS, iBlk, &iBlk);
        }else{
          iBlk = 0;
        }
      }

      if( bExtra && (pSeg->iLastPg % nPagePerBlock)==0 ){
        fsBlockNext(pFS, iLastBlk, &iBlk);
        aUsed[iBlk-1] = 1;
      }
    }
  }
}

................................................................................

  for(i=0; i<nBlock; i++) assert( aUsed[i]==1 );

  lsmFree(pDb->pEnv, aUsed);
  lsmFree(pDb->pEnv, freelist.aEntry);
  return 1;
}

#ifndef NDEBUG
/*
** Test whether page pPg is the final page of segment pSeg, judged by
** comparing the page's number against pSeg->iLastPg. Returns 1 if the
** two match and 0 if they do not. This function is only invoked as part
** of assert() conditions.
*/
int lsmFsDbPageIsLast(Segment *pSeg, Page *pPg){
  if( pPg->iPg!=pSeg->iLastPg ){
    return 0;
  }
  return 1;
}
#endif

Changes to src/lsm_main.c.

331
332
333
334
335
336
337
338
339
340
341
342
343
344
345

  va_end(ap);
  return rc;
}

/*
** Append a description of segment pSeg to string pStr. The text consists
** of the prefix zPre followed by the segment's first page, last page, root
** page and size, formatted as "{first last root size}".
*/
void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){
  Pgno iFirst = pSeg->iFirst;     /* First page of the segment */
  Pgno iLast = pSeg->iLast;       /* Last page of the segment */
  Pgno iRoot = pSeg->iRoot;       /* Root page of the segment */
  int nSize = pSeg->nSize;        /* Size of the segment in pages */

  lsmStringAppendf(pStr, "%s{%d %d %d %d}", zPre,
      iFirst, iLast, iRoot, nSize
  );
}

static int infoGetWorker(lsm_db *pDb, Snapshot **pp, int *pbUnlock){
  int rc = LSM_OK;

  assert( *pbUnlock==0 );







|







331
332
333
334
335
336
337
338
339
340
341
342
343
344
345

  va_end(ap);
  return rc;
}

/*
** Append a description of segment pSeg to string pStr. The text consists
** of the prefix zPre followed by the segment's first page, last page, root
** page and size, formatted as "{first last root size}".
*/
void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){
  Pgno iFirst = pSeg->iFirst;     /* First page of the segment */
  Pgno iLast = pSeg->iLastPg;     /* Last page of the segment */
  Pgno iRoot = pSeg->iRoot;       /* Root page of the segment */
  int nSize = pSeg->nSize;        /* Size of the segment in pages */

  lsmStringAppendf(pStr, "%s{%d %d %d %d}", zPre,
      iFirst, iLast, iRoot, nSize
  );
}

static int infoGetWorker(lsm_db *pDb, Snapshot **pp, int *pbUnlock){
  int rc = LSM_OK;

  assert( *pbUnlock==0 );

Changes to src/lsm_sorted.c.

1192
1193
1194
1195
1196
1197
1198
1199


1200

1201
1202
1203
1204
1205
1206
1207
....
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
....
3281
3282
3283
3284
3285
3286
3287


3288
3289
3290
3291
3292
3293
3294
....
3382
3383
3384
3385
3386
3387
3388






































3389
3390
3391
3392
3393
3394
3395
....
3402
3403
3404
3405
3406
3407
3408

3409
3410
3411
3412
3413
3414




3415

3416
3417
3418
3419
3420
3421
3422

3423
3424
3425
3426
3427
3428
3429
....
3437
3438
3439
3440
3441
3442
3443
3444
3445
3446
3447
3448
3449
3450
3451
3452
3453
3454
3455
3456
3457
3458
3459
3460
3461
3462
3463
3464
3465
3466
3467
3468
3469
3470
3471
3472
3473
3474
....
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
3597
3598
3599
3600
3601
3602
3603
3604
3605
3606
3607
3608
3609
3610
3611
3612
3613
3614
3615
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
3630
3631
3632
3633
3634
3635
3636
3637
3638
....
3722
3723
3724
3725
3726
3727
3728
3729
3730
3731
3732
3733
3734
3735
3736
3737
3738
3739
....
3794
3795
3796
3797
3798
3799
3800
3801
3802
3803
3804
3805
3806
3807
3808
....
3818
3819
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835

3836
3837
3838
3839
3840
3841
3842
....
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
....
3962
3963
3964
3965
3966
3967
3968
3969
3970
3971
3972
3973
3974
3975
3976
3977
3978
3979
3980
3981
3982
3983
3984
3985
3986
3987
3988
3989
3990
3991
3992
3993
3994
3995
3996
3997
3998
3999
4000
4001
4002
4003
4004
4005
....
4036
4037
4038
4039
4040
4041
4042
4043
4044
4045
4046
4047
4048
4049
4050
4051
4052
4053
4054
4055
4056
4057
4058
4059
4060
4061
4062
4063
4064
....
4379
4380
4381
4382
4383
4384
4385

4386
4387
4388
4389
4390
4391
4392
....
4589
4590
4591
4592
4593
4594
4595
4596
4597
4598
4599
4600
4601
4602
4603
  FileSystem *pFS, 
  SegmentPtr *pPtr, 
  int bLast, 
  int *pRc
){
  if( *pRc==LSM_OK ){
    Page *pNew = 0;
    Pgno iPg = (bLast ? pPtr->pSeg->iLast : pPtr->pSeg->iFirst);


    *pRc = lsmFsDbPageGet(pFS, iPg, &pNew);

    segmentPtrSetPage(pPtr, pNew);
  }
}


/*
** Try to move the segment pointer passed as the second argument so that it
................................................................................
  */
#if 0
  assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) );
#endif

  assert( pPtr->nCell>0 
       || pPtr->pSeg->nSize==1 
       || lsmFsPageNumber(pPtr->pPg)==pPtr->pSeg->iLast
  );
  if( pPtr->nCell==0 ){
    segmentPtrReset(pPtr);
  }else{
    iMin = 0;
    iMax = pPtr->nCell-1;

................................................................................
** (reference pMW->pPage). Set the page number values in aSave[] as 
** required (see comments above struct MergeWorker for details).
*/
static int mergeWorkerPersistAndRelease(MergeWorker *pMW){
  int rc;
  int i;



  /* Persist the page */
  rc = lsmFsPagePersist(pMW->pPage);

  /* If required, save the page number. */
  for(i=0; i<2; i++){
    if( pMW->aSave[i].bStore ){
      pMW->aSave[i].iPgno = lsmFsPageNumber(pMW->pPage);
................................................................................
      pMerge->iOutputOff = iOff + nCopy;
    }
  }

  return rc;
}








































static int mergeWorkerWrite(
  MergeWorker *pMW,               /* Merge worker object to write into */
  int eType,                      /* One of SORTED_SEPARATOR, WRITE or DELETE */
  void *pKey, int nKey,           /* Key value */
  MultiCursor *pCsr,              /* Read value (if any) from here */
  int iPtr                        /* Absolute value of page pointer, or 0 */
................................................................................
  int nData;                      /* Size of buffer aData[] in bytes */
  int nRec;                       /* Number of records on page pPg */
  int iFPtr;                      /* Value of pointer in footer of pPg */
  int iRPtr;                      /* Value of pointer written into record */
  int iOff;                       /* Current write offset within page pPg */
  Segment *pSeg;                  /* Segment being written */
  int flags = 0;                  /* If != 0, flags value for page footer */

  void *pVal;
  int nVal;

  pMerge = pMW->pLevel->pMerge;    
  pSeg = &pMW->pLevel->lhs;





  pPg = pMW->pPage;

  aData = fsPageData(pPg, &nData);
  nRec = pageGetNRec(aData, nData);
  iFPtr = pageGetPtr(aData, nData);

  /* Calculate the relative pointer value to write to this record */
  iRPtr = iPtr - iFPtr;
  /* assert( iRPtr>=0 ); */

     
  /* Figure out how much space is required by the new record. The space
  ** required is divided into two sections: the header and the body. The
  ** header consists of the intial varint fields. The body are the blobs 
  ** of data that correspond to the key and value data. The entire header 
  ** must be stored on the page. The body may overflow onto the next and
  ** subsequent pages.
................................................................................
  */
  rc = lsmMCursorValue(pCsr, &pVal, &nVal);
  if( rc==LSM_OK ){
    nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
    if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

    /* If the entire header will not fit on page pPg, or if page pPg is 
     ** marked read-only, advance to the next page of the output run. */
    iOff = pMerge->iOutputOff;
    if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
      iFPtr = *pCsr->pPrevMergePtr;
      iRPtr = iPtr - iFPtr;
      iOff = 0;
      nRec = 0;
      rc = mergeWorkerNextPage(pMW, iFPtr);
      pPg = pMW->pPage;
    }
  }

  /* If this record header will be the first on the page, and the page is 
  ** not the very first in the entire run, special actions may need to be 
  ** taken:
  **
  **   * If currently writing the main run, *piPtrOut should be set to
  **     the current page number. The caller will add a key to the separators
  **     array that points to the current page.
  **
  **   * If currently writing the separators array, push a copy of the key
  **     into the b-tree hierarchy.
  */
  if( rc==LSM_OK && nRec==0 && pSeg->iFirst!=pSeg->iLast ){
    assert( pMerge->nSkip>=0 );

    if( pMerge->nSkip==0 ){
      rc = mergeWorkerPushHierarchy(pMW, rtTopic(eType), pKey, nKey);
      assert( pMW->aSave[0].bStore==0 );
      pMW->aSave[0].bStore = 1;
      pMerge->nSkip = keyszToSkip(pMW->pDb->pFS, nKey);
................................................................................
  pMW->pCsr = 0;
  pMW->pPage = 0;
  pMW->pPage = 0;

  *pRc = rc;
}

/*
** The MergeWorker passed as the only argument is merging two or more
** existing segments (it is not flushing an in-memory tree) and has not
** yet written the first key to the first page of the output.
**
** This function picks the footer pointer value for the first output page
** and then calls mergeWorkerNextPage() with it:
**
**   * If the cursor has a b-tree cursor (pCsr->pBtCsr), the value is the
**     first page number of the lhs segment of the next level.
**
**   * Otherwise, it is the pointer read from the footer of the first page
**     of the segment belonging to the last entry in pCsr->aPtr[].
**
** An LSM error code is returned if the page cannot be loaded or if
** mergeWorkerNextPage() fails. LSM_OK is returned otherwise.
*/
static int mergeWorkerFirstPage(MergeWorker *pMW){
  MultiCursor *pCsr = pMW->pCsr;  /* Cursor supplying the merge input */
  int iFPtr = 0;                  /* Footer pointer for first output page */
  int rc;                         /* Return code */

  assert( pMW->pPage==0 );

  if( pCsr->pBtCsr==0 ){
    Segment *pLastSeg = pCsr->aPtr[pCsr->nPtr-1].pSeg;
    Page *pFirst = 0;             /* First page of run pLastSeg */
    u8 *aData;                    /* Buffer for page pFirst */
    int nData;                    /* Size of aData[] in bytes */

    rc = lsmFsDbPageGet(pMW->pDb->pFS, pLastSeg->iFirst, &pFirst);
    if( rc!=LSM_OK ) return rc;

    aData = fsPageData(pFirst, &nData);
    iFPtr = pageGetPtr(aData, nData);
    lsmFsPageRelease(pFirst);
  }else{
    iFPtr = pMW->pLevel->pNext->lhs.iFirst;
  }

  /* As in the original, the bookkeeping below is performed regardless of
  ** whether or not mergeWorkerNextPage() succeeds. */
  rc = mergeWorkerNextPage(pMW, iFPtr);
  if( pCsr->pPrevMergePtr ){
    *pCsr->pPrevMergePtr = iFPtr;
  }
  pMW->aSave[0].bStore = 1;

  return rc;
}

/*
** The cursor passed as the first argument is being used as the input for
** a merge operation. When this function is called, *piFlags contains the
** database entry flags for the current entry (the entry about to be written
** to the output).
**
*/
................................................................................
      }
    }

    /* If this is a separator key and we know that the output pointer has not
    ** changed, there is no point in writing an output record. Otherwise,
    ** proceed. */
    if( rtIsSeparator(eType)==0 || iPtr!=0 ){
      if( pMW->pPage==0 ){
        rc = mergeWorkerFirstPage(pMW);
      }

      /* Write the record into the main run. */
      if( rc==LSM_OK ){
        rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pCsr, iPtr);
      }
    }
  }

................................................................................
  int *pnWrite                    /* OUT: Number of database pages written */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  Pgno iLeftPtr = 0;
  int nWrite = 0;                 /* Number of database pages written */

  assert( pnOvfl );

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
................................................................................
  if( pCsr ){
    pCsr->pDb = pDb;
    multiCursorVisitFreelist(pCsr, pnOvfl);
    rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
    if( rc==LSM_OK && pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){
      pDel = &pNext->lhs;
      rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);
      iLeftPtr = pNext->lhs.iFirst;
    }

    if( pNext==0 ){
      multiCursorIgnoreDelete(pCsr);
    }
  }

  if( rc!=LSM_OK ){
    lsmMCursorClose(pCsr);
  }else{

    Merge merge;                  /* Merge object used to create new level */
    MergeWorker mergeworker;      /* MergeWorker object for the same purpose */

    memset(&merge, 0, sizeof(Merge));
    memset(&mergeworker, 0, sizeof(MergeWorker));

    pNew->pMerge = &merge;
................................................................................
    mergeworker.pLevel = pNew;
    mergeworker.pCsr = pCsr;
    pCsr->pPrevMergePtr = &iLeftPtr;

    /* Mark the separators array for the new level as a "phantom". */
    mergeworker.bFlush = 1;

    /* Allocate the first page of the output segment. */
    rc = mergeWorkerNextPage(&mergeworker, iLeftPtr);
    mergeworker.aSave[0].bStore = 1;

    /* Do the work to create the new merged segment on disk */
    if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr);
    while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
      rc = mergeWorkerStep(&mergeworker);
    }

    nWrite = mergeworker.nWork;
................................................................................
    pNew->pMerge = pMerge;
  }

  *ppNew = pNew;
  return rc;
}

/*
** Locate the page of the output segment (pMW->pLevel->lhs) that the merge
** worker should use as its current output page and store a reference to
** it in pMW->pPage.
**
** If the iLast field of the output segment is zero, this function is a
** no-op that returns LSM_OK. Otherwise, page iLast is loaded. For as long
** as the loaded page has the SEGMENT_BTREE_FLAG flag set, it is released
** and replaced by the page returned by lsmFsDbPageNext() with a direction
** argument of -1 (presumably the previous page of the segment - confirm
** against lsmFsDbPageNext()).
**
** LSM_OK is returned if successful. Otherwise, the error code returned by
** lsmFsDbPageGet() or lsmFsDbPageNext() is returned and pMW->pPage is
** left unmodified.
*/
static int mergeWorkerLoadOutputPage(MergeWorker *pMW){
  Level *pLvl = pMW->pLevel;      /* Level being merged into */
  Segment *pOut = &pLvl->lhs;     /* Output segment of pLvl */
  Page *pCur;                     /* Page currently under inspection */
  int rc;                         /* Return code */

  /* No output pages have been written yet. Nothing to load. */
  if( pOut->iLast==0 ) return LSM_OK;

  rc = lsmFsDbPageGet(pMW->pDb->pFS, pOut->iLast, &pCur);
  while( rc==LSM_OK ){
    int nByte;                    /* Size of aByte[] in bytes */
    u8 *aByte = fsPageData(pCur, &nByte);
    Page *pStep;                  /* Page returned by lsmFsDbPageNext() */

    /* Stop at the first page that is not flagged as a b-tree page. */
    if( 0==(pageGetFlags(aByte, nByte) & SEGMENT_BTREE_FLAG) ) break;

    rc = lsmFsDbPageNext(pOut, pCur, -1, &pStep);
    lsmFsPageRelease(pCur);
    pCur = pStep;
  }
  if( rc!=LSM_OK ) return rc;

  pMW->pPage = pCur;
  assert( pLvl->pMerge->iOutputOff<0 );
  return LSM_OK;
}

static int mergeWorkerInit(
  lsm_db *pDb,                    /* Db connection to do merge work */
  Level *pLevel,                  /* Level to work on merging */
  MergeWorker *pMW                /* Object to initialize */
){
  int rc = LSM_OK;                /* Return code */
  Merge *pMerge = pLevel->pMerge; /* Persistent part of merge state */
................................................................................
  }else{
    multiCursorIgnoreDelete(pCsr);
  }

  assert( rc!=LSM_OK || pMerge->nInput==(pCsr->nPtr+(pCsr->pBtCsr!=0)) );
  pMW->pCsr = pCsr;

  /* Load the current output page and b-tree hierarchy into memory. */
  if( rc==LSM_OK ) rc = mergeWorkerLoadOutputPage(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerLoadHierarchy(pMW);
  if( rc==LSM_OK && pMW->pPage && pMW->hier.nHier==0 ){
    pMW->aSave[0].iPgno = pLevel->lhs.iFirst;
  }

  /* Set MergeWorker.aSave[0].iPgno to contain the */
  if( rc==LSM_OK ){
  }

  /* Position the cursor. */
  if( rc==LSM_OK ){
    pCsr->pPrevMergePtr = &pMerge->iCurrentPtr;
    if( pMW->pPage==0 ){
      /* The output array is still empty. So position the cursor at the very 
      ** start of the input.  */
      rc = multiCursorEnd(pCsr, 0);
    }else{
      /* The output array is non-empty. Position the cursor based on the
      ** page/cell data saved in the Merge.aInput[] array.  */
      int i;
................................................................................
  int bCkpt = 0;
  int bToplevel = 0;

  /* Open the worker 'transaction'. It will be closed before this function
  ** returns.  */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);

  if( rc!=LSM_OK ) return rc;

  /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
  ** that this call stops writing when the auto-checkpoint is due.  */
  if( bShutdown==0 && pDb->nAutockpt ){
    u32 nSync;
    u32 nUnsync;
................................................................................
** Space for the returned string is allocated using lsmMalloc(), and should
** be freed by the caller using lsmFree().
*/
static char *segToString(lsm_env *pEnv, Segment *pSeg, int nMin){
  int nSize = pSeg->nSize;
  Pgno iRoot = pSeg->iRoot;
  Pgno iFirst = pSeg->iFirst;
  Pgno iLast = pSeg->iLast;
  char *z;

  char *z1;
  char *z2;
  int nPad;

  z1 = lsmMallocPrintf(pEnv, "%d.%d", iFirst, iLast);







|
>
>
|
>







 







|







 







>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>






>
>
>
>

>
|
|
|
<
<
|
<
>







 







|

|










|
<
<
<
<
<
<
<
|

|







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







<
<
<
<







 







<







 







<










>







 







<
<
<
<







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







|
<

|



<
<
<
<



|







 







>







 







|







1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
....
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
....
3284
3285
3286
3287
3288
3289
3290
3291
3292
3293
3294
3295
3296
3297
3298
3299
....
3387
3388
3389
3390
3391
3392
3393
3394
3395
3396
3397
3398
3399
3400
3401
3402
3403
3404
3405
3406
3407
3408
3409
3410
3411
3412
3413
3414
3415
3416
3417
3418
3419
3420
3421
3422
3423
3424
3425
3426
3427
3428
3429
3430
3431
3432
3433
3434
3435
3436
3437
3438
....
3445
3446
3447
3448
3449
3450
3451
3452
3453
3454
3455
3456
3457
3458
3459
3460
3461
3462
3463
3464
3465
3466
3467


3468

3469
3470
3471
3472
3473
3474
3475
3476
....
3484
3485
3486
3487
3488
3489
3490
3491
3492
3493
3494
3495
3496
3497
3498
3499
3500
3501
3502
3503
3504







3505
3506
3507
3508
3509
3510
3511
3512
3513
3514
....
3627
3628
3629
3630
3631
3632
3633






































3634
3635
3636
3637
3638
3639
3640
....
3724
3725
3726
3727
3728
3729
3730




3731
3732
3733
3734
3735
3736
3737
....
3792
3793
3794
3795
3796
3797
3798

3799
3800
3801
3802
3803
3804
3805
....
3815
3816
3817
3818
3819
3820
3821

3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
....
3841
3842
3843
3844
3845
3846
3847




3848
3849
3850
3851
3852
3853
3854
....
3955
3956
3957
3958
3959
3960
3961






























3962
3963
3964
3965
3966
3967
3968
....
3999
4000
4001
4002
4003
4004
4005
4006

4007
4008
4009
4010
4011




4012
4013
4014
4015
4016
4017
4018
4019
4020
4021
4022
....
4337
4338
4339
4340
4341
4342
4343
4344
4345
4346
4347
4348
4349
4350
4351
....
4548
4549
4550
4551
4552
4553
4554
4555
4556
4557
4558
4559
4560
4561
4562
  FileSystem *pFS, 
  SegmentPtr *pPtr, 
  int bLast, 
  int *pRc
){
  if( *pRc==LSM_OK ){
    Page *pNew = 0;
    if( bLast ){
      *pRc = lsmFsDbPageLast(pFS, pPtr->pSeg, &pNew);
    }else{
      *pRc = lsmFsDbPageGet(pFS, pPtr->pSeg->iFirst, &pNew);
    }
    segmentPtrSetPage(pPtr, pNew);
  }
}


/*
** Try to move the segment pointer passed as the second argument so that it
................................................................................
  */
#if 0
  assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) );
#endif

  assert( pPtr->nCell>0 
       || pPtr->pSeg->nSize==1 
       || lsmFsDbPageIsLast(pPtr->pPg, pPtr->pSeg)
  );
  if( pPtr->nCell==0 ){
    segmentPtrReset(pPtr);
  }else{
    iMin = 0;
    iMax = pPtr->nCell-1;

................................................................................
** (reference pMW->pPage). Set the page number values in aSave[] as 
** required (see comments above struct MergeWorker for details).
*/
static int mergeWorkerPersistAndRelease(MergeWorker *pMW){
  int rc;
  int i;

  assert( pMW->pPage || (pMW->aSave[0].bStore==0 && pMW->aSave[1].bStore==0) );

  /* Persist the page */
  rc = lsmFsPagePersist(pMW->pPage);

  /* If required, save the page number. */
  for(i=0; i<2; i++){
    if( pMW->aSave[i].bStore ){
      pMW->aSave[i].iPgno = lsmFsPageNumber(pMW->pPage);
................................................................................
      pMerge->iOutputOff = iOff + nCopy;
    }
  }

  return rc;
}


/*
** The MergeWorker passed as the only argument has not yet written the
** first key to the first page of its output. This function allocates
** that first page by calling mergeWorkerNextPage().
**
** The pointer value handed to mergeWorkerNextPage() is:
**
**   * the iFirst field of the lhs segment of the next level, if the
**     input cursor has a b-tree cursor (pBtCsr), or
**
**   * the value read by pageGetPtr() from the first page of the segment
**     attached to the last entry of the cursor's aPtr[] array, if that
**     array is not empty, or
**
**   * zero otherwise.
**
** After mergeWorkerNextPage() has been called, the same value is copied
** to *pPrevMergePtr (if that pointer is not NULL) and aSave[0].bStore is
** set to 1. If lsmFsDbPageGet() fails, its error code is returned
** immediately and none of the above takes place.
*/
static int mergeWorkerFirstPage(MergeWorker *pMW){
  MultiCursor *pInput = pMW->pCsr;  /* Cursor supplying the merge input */
  int iFooterPtr = 0;               /* Pointer value for the new page */
  int rc = LSM_OK;                  /* Return code */

  assert( pMW->pPage==0 );

  if( pInput->pBtCsr ){
    iFooterPtr = pMW->pLevel->pNext->lhs.iFirst;
  }else if( pInput->nPtr>0 ){
    Page *pFirst = 0;               /* First page of segment pLastSeg */
    Segment *pLastSeg = pInput->aPtr[pInput->nPtr-1].pSeg;

    rc = lsmFsDbPageGet(pMW->pDb->pFS, pLastSeg->iFirst, &pFirst);
    if( rc!=LSM_OK ) return rc;

    {
      int nByte;                    /* Size of aByte[] in bytes */
      u8 *aByte = fsPageData(pFirst, &nByte);
      iFooterPtr = pageGetPtr(aByte, nByte);
    }
    lsmFsPageRelease(pFirst);
  }

  rc = mergeWorkerNextPage(pMW, iFooterPtr);
  if( pInput->pPrevMergePtr ) *pInput->pPrevMergePtr = iFooterPtr;
  pMW->aSave[0].bStore = 1;
  return rc;
}

static int mergeWorkerWrite(
  MergeWorker *pMW,               /* Merge worker object to write into */
  int eType,                      /* One of SORTED_SEPARATOR, WRITE or DELETE */
  void *pKey, int nKey,           /* Key value */
  MultiCursor *pCsr,              /* Read value (if any) from here */
  int iPtr                        /* Absolute value of page pointer, or 0 */
................................................................................
  int nData;                      /* Size of buffer aData[] in bytes */
  int nRec;                       /* Number of records on page pPg */
  int iFPtr;                      /* Value of pointer in footer of pPg */
  int iRPtr;                      /* Value of pointer written into record */
  int iOff;                       /* Current write offset within page pPg */
  Segment *pSeg;                  /* Segment being written */
  int flags = 0;                  /* If != 0, flags value for page footer */
  int bFirst = 0;                 /* True for first key of output run */
  void *pVal;
  int nVal;

  pMerge = pMW->pLevel->pMerge;    
  pSeg = &pMW->pLevel->lhs;

  if( pSeg->iFirst==0 && pMW->pPage==0 ){
    rc = mergeWorkerFirstPage(pMW);
    bFirst = 1;
  }
  pPg = pMW->pPage;
  if( pPg ){
    aData = fsPageData(pPg, &nData);
    nRec = pageGetNRec(aData, nData);
    iFPtr = pageGetPtr(aData, nData);


    iRPtr = iPtr - iFPtr;

  }
     
  /* Figure out how much space is required by the new record. The space
  ** required is divided into two sections: the header and the body. The
  ** header consists of the initial varint fields. The body are the blobs 
  ** of data that correspond to the key and value data. The entire header 
  ** must be stored on the page. The body may overflow onto the next and
  ** subsequent pages.
................................................................................
  */
  rc = lsmMCursorValue(pCsr, &pVal, &nVal);
  if( rc==LSM_OK ){
    nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
    if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

    /* If the entire header will not fit on page pPg, or if page pPg is 
    ** marked read-only, advance to the next page of the output run. */
    iOff = pMerge->iOutputOff;
    if( iOff<0 || pPg==0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
      iFPtr = *pCsr->pPrevMergePtr;
      iRPtr = iPtr - iFPtr;
      iOff = 0;
      nRec = 0;
      rc = mergeWorkerNextPage(pMW, iFPtr);
      pPg = pMW->pPage;
    }
  }

  /* If this record header will be the first on the page, and the page is 
  ** not the very first in the entire run, add a copy of the key to the







  ** b-tree hierarchy.
  */
  if( rc==LSM_OK && nRec==0 && bFirst==0 ){
    assert( pMerge->nSkip>=0 );

    if( pMerge->nSkip==0 ){
      rc = mergeWorkerPushHierarchy(pMW, rtTopic(eType), pKey, nKey);
      assert( pMW->aSave[0].bStore==0 );
      pMW->aSave[0].bStore = 1;
      pMerge->nSkip = keyszToSkip(pMW->pDb->pFS, nKey);
................................................................................
  pMW->pCsr = 0;
  pMW->pPage = 0;
  pMW->pPage = 0;

  *pRc = rc;
}







































/*
** The cursor passed as the first argument is being used as the input for
** a merge operation. When this function is called, *piFlags contains the
** database entry flags for the current entry (the entry about to be written
** to the output).
**
*/
................................................................................
      }
    }

    /* If this is a separator key and we know that the output pointer has not
    ** changed, there is no point in writing an output record. Otherwise,
    ** proceed. */
    if( rtIsSeparator(eType)==0 || iPtr!=0 ){




      /* Write the record into the main run. */
      if( rc==LSM_OK ){
        rc = mergeWorkerWrite(pMW, eType, pKey, nKey, pCsr, iPtr);
      }
    }
  }

................................................................................
  int *pnWrite                    /* OUT: Number of database pages written */
){
  int rc = LSM_OK;                /* Return Code */
  MultiCursor *pCsr = 0;
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */

  int nWrite = 0;                 /* Number of database pages written */

  assert( pnOvfl );

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
................................................................................
  if( pCsr ){
    pCsr->pDb = pDb;
    multiCursorVisitFreelist(pCsr, pnOvfl);
    rc = multiCursorAddTree(pCsr, pDb->pWorker, eTree);
    if( rc==LSM_OK && pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){
      pDel = &pNext->lhs;
      rc = btreeCursorNew(pDb, pDel, &pCsr->pBtCsr);

    }

    if( pNext==0 ){
      multiCursorIgnoreDelete(pCsr);
    }
  }

  if( rc!=LSM_OK ){
    lsmMCursorClose(pCsr);
  }else{
    Pgno iLeftPtr = 0;
    Merge merge;                  /* Merge object used to create new level */
    MergeWorker mergeworker;      /* MergeWorker object for the same purpose */

    memset(&merge, 0, sizeof(Merge));
    memset(&mergeworker, 0, sizeof(MergeWorker));

    pNew->pMerge = &merge;
................................................................................
    mergeworker.pLevel = pNew;
    mergeworker.pCsr = pCsr;
    pCsr->pPrevMergePtr = &iLeftPtr;

    /* Mark the separators array for the new level as a "phantom". */
    mergeworker.bFlush = 1;





    /* Do the work to create the new merged segment on disk */
    if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr);
    while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
      rc = mergeWorkerStep(&mergeworker);
    }

    nWrite = mergeworker.nWork;
................................................................................
    pNew->pMerge = pMerge;
  }

  *ppNew = pNew;
  return rc;
}































static int mergeWorkerInit(
  lsm_db *pDb,                    /* Db connection to do merge work */
  Level *pLevel,                  /* Level to work on merging */
  MergeWorker *pMW                /* Object to initialize */
){
  int rc = LSM_OK;                /* Return code */
  Merge *pMerge = pLevel->pMerge; /* Persistent part of merge state */
................................................................................
  }else{
    multiCursorIgnoreDelete(pCsr);
  }

  assert( rc!=LSM_OK || pMerge->nInput==(pCsr->nPtr+(pCsr->pBtCsr!=0)) );
  pMW->pCsr = pCsr;

  /* Load the b-tree hierarchy into memory. */

  if( rc==LSM_OK ) rc = mergeWorkerLoadHierarchy(pMW);
  if( rc==LSM_OK && pMW->hier.nHier==0 ){
    pMW->aSave[0].iPgno = pLevel->lhs.iFirst;
  }





  /* Position the cursor. */
  if( rc==LSM_OK ){
    pCsr->pPrevMergePtr = &pMerge->iCurrentPtr;
    if( pLevel->lhs.iFirst==0 ){
      /* The output array is still empty. So position the cursor at the very 
      ** start of the input.  */
      rc = multiCursorEnd(pCsr, 0);
    }else{
      /* The output array is non-empty. Position the cursor based on the
      ** page/cell data saved in the Merge.aInput[] array.  */
      int i;
................................................................................
  int bCkpt = 0;
  int bToplevel = 0;

  /* Open the worker 'transaction'. It will be closed before this function
  ** returns.  */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);
  assert( rc!=8 );
  if( rc!=LSM_OK ) return rc;

  /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
  ** that this call stops writing when the auto-checkpoint is due.  */
  if( bShutdown==0 && pDb->nAutockpt ){
    u32 nSync;
    u32 nUnsync;
................................................................................
** Space for the returned string is allocated using lsmMalloc(), and should
** be freed by the caller using lsmFree().
*/
static char *segToString(lsm_env *pEnv, Segment *pSeg, int nMin){
  int nSize = pSeg->nSize;
  Pgno iRoot = pSeg->iRoot;
  Pgno iFirst = pSeg->iFirst;
  Pgno iLast = pSeg->iLastPg;
  char *z;

  char *z1;
  char *z2;
  int nPad;

  z1 = lsmMallocPrintf(pEnv, "%d.%d", iFirst, iLast);