SQLite4
Check-in [ae3c8da44d]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add padding records to segments in compressed databases in order to avoid partial writes to segments that have already been synced to disk.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | compression-hooks
Files: files | file ages | folders
SHA1: ae3c8da44d25df955cfe102f7e06afb1d161bf11
User & Date: dan 2012-10-28 10:07:03
Context
2012-10-28
11:28
Enhance the file-format to allow padding records smaller than 6 bytes in length. check-in: 2ba0368e76 user: dan tags: compression-hooks
10:07
Add padding records to segments in compressed databases in order to avoid partial writes to segments that have already been synced to disk. check-in: ae3c8da44d user: dan tags: compression-hooks
2012-10-27
08:57
Change the format of compressed page records slightly so that the file format supports inserting padding records into sorted runs. check-in: 0b940bfe17 user: dan tags: compression-hooks
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest2.c.

200
201
202
203
204
205
206
207
208

209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
...
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266








267
268
269
270
271
272
273

274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
...
294
295
296
297
298
299
300

301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
...
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
...
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
...
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420

421
422

423
424
425
426
427
428
429
430
431
432
433
434
435
436
** This function is a no-op if *pRc is non-zero when it is called.
**
** Open the LSM database identified by zFile and compute its checksum
** (a string, as returned by testCksumDatabase()). If the checksum is
** identical to zExpect1 or, if it is not NULL, zExpect2, the test passes.
** Otherwise, print an error message and set *pRc to 1.
*/
void testCompareCksumLsmdb(
  const char *zFile,              /* Path to LSM database */

  const char *zExpect1,           /* Expected checksum 1 */
  const char *zExpect2,           /* Expected checksum 2 (or NULL) */
  int *pRc                        /* IN/OUT: Test case error code */
){
  if( *pRc==0 ){
    char zCksum[TEST_CKSUM_BYTES];
    TestDb *pDb;

    pDb = testOpen("lsm", 0, pRc);
    testCksumDatabase(pDb, zCksum);
    testClose(&pDb);

    if( *pRc==0 ){
      int r1 = 0;
      int r2 = -1;

................................................................................
** should really be in this file.
*************************************************************************/

/*
** This test verifies that if a system crash occurs while doing merge work
** on the db, no data is lost.
*/
static void crash_test1(int *pRc){
  const char *DBNAME = "testdb.lsm";

  const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 12, 16, 200, 200};

  const int nRow = 5000;          /* Database size */
  const int nIter = 200;          /* Number of test iterations */
  const int nWork = 20;           /* Maximum lsm_work() calls per iteration */
  const int nPage = 15;           /* Pages per lsm_work call */

  int i;
  int iDot = 0;
  Datasource *pData;
  CksumDb *pCksumDb;
  TestDb *pDb;









  /* Allocate datasource. And calculate the expected checksums. */
  pData = testDatasourceNew(&defn);
  pCksumDb = testCksumArrayNew(pData, nRow, nRow, 1);

  /* Setup and save the initial database. */
  testSetupSavedLsmdb(

      "page_size=1024 block_size=65536 write_buffer=16384 nmerge=7", 
      DBNAME, pData, 5000, pRc
  );

  for(i=0; i<nIter && *pRc==0; i++){
    int iWork;
    int testrc = 0;

    testCaseProgress(i, nIter, testCaseNDot(), &iDot);

    /* Restore and open the database. */
    testRestoreLsmdb(DBNAME);
    testrc = tdb_lsm_open("safety=2", DBNAME, 0, &pDb);
    assert( testrc==0 );

    /* Call lsm_work() on the db */
    tdb_lsm_prepare_sync_crash(pDb, 1 + (i%(nWork*2)));
    for(iWork=0; testrc==0 && iWork<nWork; iWork++){
      int nWrite = 0;
      lsm_db *db = tdb_lsm(pDb);
................................................................................
      testrc = lsm_work(db, 0, nPage, &nWrite);
      assert( testrc!=0 || nWrite>0 );
      if( testrc==0 ) testrc = lsm_checkpoint(db, 0);
    }
    tdb_close(pDb);

    /* Check that the database content is still correct */

    testCompareCksumLsmdb(DBNAME, testCksumArrayGet(pCksumDb, nRow), 0, pRc);
  }

  testCksumArrayFree(pCksumDb);
  testDatasourceFree(pData);
}

/*
** This test verifies that if a system crash occurs while committing a
** transaction to the log file, no earlier transactions are lost or damaged.
*/
static void crash_test2(int *pRc){
  const char *DBNAME = "testdb.lsm";
  const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 12, 16, 1000, 1000};

  const int nIter = 200;
  const int nInsert = 20;

  int i;
................................................................................
      testDatasourceEntry(pData, 100+iIns, &pKey, &nKey, &pVal, &nVal);
      testrc = tdb_write(pDb, pKey, nKey, pVal, nVal);
      if( testrc ) break;
    }
    tdb_close(pDb);

    /* Check that no data was lost when the system crashed. */
    testCompareCksumLsmdb(DBNAME, 
      testCksumArrayGet(pCksumDb, 100 + iIns),
      testCksumArrayGet(pCksumDb, 100 + iIns + 1),
      pRc
    );
  }

  testDatasourceFree(pData);
................................................................................
}

/*
** This test verifies that if a system crash occurs when checkpointing
** the database, data is not lost (assuming that any writes not synced
** to the db have been synced into the log file).
*/
static void crash_test3(int *pRc){
  const char *DBNAME = "testdb.lsm";
  const int nIter = 100;
  const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 12, 16, 1000, 1000};

  int i;
  int iDot = 0;
  Datasource *pData;
................................................................................

      /* Schedule a crash simulation then close the db. */
      tdb_lsm_prepare_sync_crash(pDb, 1 + (i%2));
      tdb_close(pDb);

      /* Open the database and check that the crash did not cause any
      ** data loss.  */
      testCompareCksumLsmdb(DBNAME, 
        testCksumArrayGet(pCksumDb, 110 + iOpen*10), 0,
        pRc
      );
    }
  }

  testDatasourceFree(pData);
  testCksumArrayFree(pCksumDb);
}

void do_crash_test(const char *zPattern, int *pRc){
  struct Test {
    const char *zTest;
    void (*x)(int *);

  } aTest [] = {
    { "crash.lsm.1", crash_test1 },

    { "crash.lsm.2", crash_test2 },
    { "crash.lsm.3", crash_test3 },
  };
  int i;

  for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
    struct Test *p = &aTest[i];
    if( testCaseBegin(pRc, zPattern, "%s", p->zTest) ){
      p->x(pRc);
      testCaseFinish(*pRc);
    }
  }
}








|

>








|







 







|

<












>
>
>
>
>
>
>
>






<
>
|
|
|









|







 







>
|










|







 







|







 







|







 







|













|
>

|
>
|
|






|





200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
...
246
247
248
249
250
251
252
253
254

255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280

281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
...
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
...
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
...
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
...
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
** This function is a no-op if *pRc is non-zero when it is called.
**
** Open the LSM database identified by zFile and compute its checksum
** (a string, as returned by testCksumDatabase()). If the checksum is
** identical to zExpect1 or, if it is not NULL, zExpect2, the test passes.
** Otherwise, print an error message and set *pRc to 1.
*/
static void testCompareCksumLsmdb(
  const char *zFile,              /* Path to LSM database */
  int bCompress,                  /* True if db is compressed */
  const char *zExpect1,           /* Expected checksum 1 */
  const char *zExpect2,           /* Expected checksum 2 (or NULL) */
  int *pRc                        /* IN/OUT: Test case error code */
){
  if( *pRc==0 ){
    char zCksum[TEST_CKSUM_BYTES];
    TestDb *pDb;

    *pRc = tdb_lsm_open((bCompress?"compression=1 mmap=0":""), zFile, 0, &pDb);
    testCksumDatabase(pDb, zCksum);
    testClose(&pDb);

    if( *pRc==0 ){
      int r1 = 0;
      int r2 = -1;

................................................................................
** should really be in this file.
*************************************************************************/

/*
** This test verifies that if a system crash occurs while doing merge work
** on the db, no data is lost.
*/
static void crash_test1(int bCompress, int *pRc){
  const char *DBNAME = "testdb.lsm";

  const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 12, 16, 200, 200};

  const int nRow = 5000;          /* Database size */
  const int nIter = 200;          /* Number of test iterations */
  const int nWork = 20;           /* Maximum lsm_work() calls per iteration */
  const int nPage = 15;           /* Pages per lsm_work call */

  int i;
  int iDot = 0;
  Datasource *pData;
  CksumDb *pCksumDb;
  TestDb *pDb;
  char *zCfg;

  const char *azConfig[2] = {
    "page_size=1024 block_size=65536 write_buffer=16384 safety=2 mmap=0", 
    "page_size=1024 block_size=65536 write_buffer=16384 safety=2 "
    " compression=1 mmap=0"
  };
  assert( bCompress==0 || bCompress==1 );

  /* Allocate datasource. And calculate the expected checksums. */
  pData = testDatasourceNew(&defn);
  pCksumDb = testCksumArrayNew(pData, nRow, nRow, 1);

  /* Setup and save the initial database. */


  zCfg = testMallocPrintf("%s nmerge=7", azConfig[bCompress]);
  testSetupSavedLsmdb(zCfg, DBNAME, pData, 5000, pRc);
  testFree(zCfg);

  for(i=0; i<nIter && *pRc==0; i++){
    int iWork;
    int testrc = 0;

    testCaseProgress(i, nIter, testCaseNDot(), &iDot);

    /* Restore and open the database. */
    testRestoreLsmdb(DBNAME);
    testrc = tdb_lsm_open(azConfig[bCompress], DBNAME, 0, &pDb);
    assert( testrc==0 );

    /* Call lsm_work() on the db */
    tdb_lsm_prepare_sync_crash(pDb, 1 + (i%(nWork*2)));
    for(iWork=0; testrc==0 && iWork<nWork; iWork++){
      int nWrite = 0;
      lsm_db *db = tdb_lsm(pDb);
................................................................................
      testrc = lsm_work(db, 0, nPage, &nWrite);
      assert( testrc!=0 || nWrite>0 );
      if( testrc==0 ) testrc = lsm_checkpoint(db, 0);
    }
    tdb_close(pDb);

    /* Check that the database content is still correct */
    testCompareCksumLsmdb(DBNAME, 
        bCompress, testCksumArrayGet(pCksumDb, nRow), 0, pRc);
  }

  testCksumArrayFree(pCksumDb);
  testDatasourceFree(pData);
}

/*
** This test verifies that if a system crash occurs while committing a
** transaction to the log file, no earlier transactions are lost or damaged.
*/
static void crash_test2(int bCompress, int *pRc){
  const char *DBNAME = "testdb.lsm";
  const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 12, 16, 1000, 1000};

  const int nIter = 200;
  const int nInsert = 20;

  int i;
................................................................................
      testDatasourceEntry(pData, 100+iIns, &pKey, &nKey, &pVal, &nVal);
      testrc = tdb_write(pDb, pKey, nKey, pVal, nVal);
      if( testrc ) break;
    }
    tdb_close(pDb);

    /* Check that no data was lost when the system crashed. */
    testCompareCksumLsmdb(DBNAME, bCompress,
      testCksumArrayGet(pCksumDb, 100 + iIns),
      testCksumArrayGet(pCksumDb, 100 + iIns + 1),
      pRc
    );
  }

  testDatasourceFree(pData);
................................................................................
}

/*
** This test verifies that if a system crash occurs when checkpointing
** the database, data is not lost (assuming that any writes not synced
** to the db have been synced into the log file).
*/
static void crash_test3(int bCompress, int *pRc){
  const char *DBNAME = "testdb.lsm";
  const int nIter = 100;
  const DatasourceDefn defn = {TEST_DATASOURCE_RANDOM, 12, 16, 1000, 1000};

  int i;
  int iDot = 0;
  Datasource *pData;
................................................................................

      /* Schedule a crash simulation then close the db. */
      tdb_lsm_prepare_sync_crash(pDb, 1 + (i%2));
      tdb_close(pDb);

      /* Open the database and check that the crash did not cause any
      ** data loss.  */
      testCompareCksumLsmdb(DBNAME, bCompress,
        testCksumArrayGet(pCksumDb, 110 + iOpen*10), 0,
        pRc
      );
    }
  }

  testDatasourceFree(pData);
  testCksumArrayFree(pCksumDb);
}

void do_crash_test(const char *zPattern, int *pRc){
  struct Test {
    const char *zTest;
    void (*x)(int, int *);
    int bCompress;
  } aTest [] = {
    { "crash.lsm.1",     crash_test1, 0 },
    { "crash.lsm_zip.1", crash_test1, 1 },
    { "crash.lsm.2",     crash_test2, 0 },
    { "crash.lsm.3",     crash_test3, 0 },
  };
  int i;

  for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
    struct Test *p = &aTest[i];
    if( testCaseBegin(pRc, zPattern, "%s", p->zTest) ){
      p->x(p->bCompress, pRc);
      testCaseFinish(*pRc);
    }
  }
}

Changes to lsm-test/lsmtest_tdb3.c.

366
367
368
369
370
371
372

373
374
375
376
377
378
379
        int iOpt = testPrngValue(iSeed++) % 3;
        switch( iOpt ){
          case 0:
            break;

          case 1:
            testPrngArray(iSeed++, (u32 *)aOld, pDb->szSector/4);


          case 2:
            pEnv->xWrite(
                pFile, (lsm_i64)i * pDb->szSector, aOld, pDb->szSector
            );
            break;
        }







>







366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
        int iOpt = testPrngValue(iSeed++) % 3;
        switch( iOpt ){
          case 0:
            break;

          case 1:
            testPrngArray(iSeed++, (u32 *)aOld, pDb->szSector/4);
            /* Fall-through */

          case 2:
            pEnv->xWrite(
                pFile, (lsm_i64)i * pDb->szSector, aOld, pDb->szSector
            );
            break;
        }

Changes to src/lsmInt.h.

615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
int lsmFsFileid(lsm_db *pDb, void **ppId, int *pnId);

/* Creating, populating, gobbling and deleting sorted runs. */
void lsmFsGobble(lsm_db *, Segment *, Pgno *, int);
int lsmFsSortedDelete(FileSystem *, Snapshot *, int, Segment *);
int lsmFsSortedFinish(FileSystem *, Segment *);
int lsmFsSortedAppend(FileSystem *, Snapshot *, Segment *, Page **);
int lsmFsPhantomMaterialize(FileSystem *, Snapshot *, Segment *);

/* Functions to retrieve the lsm_env pointer from a FileSystem or Page object */
lsm_env *lsmFsEnv(FileSystem *);
lsm_env *lsmPageEnv(Page *);
FileSystem *lsmPageFS(Page *);

int lsmFsSectorSize(FileSystem *);







|







615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
int lsmFsFileid(lsm_db *pDb, void **ppId, int *pnId);

/* Creating, populating, gobbling and deleting sorted runs. */
void lsmFsGobble(lsm_db *, Segment *, Pgno *, int);
int lsmFsSortedDelete(FileSystem *, Snapshot *, int, Segment *);
int lsmFsSortedFinish(FileSystem *, Segment *);
int lsmFsSortedAppend(FileSystem *, Snapshot *, Segment *, Page **);
int lsmFsSortedPadding(FileSystem *, Snapshot *, Segment *);

/* Functions to retrieve the lsm_env pointer from a FileSystem or Page object */
lsm_env *lsmFsEnv(FileSystem *);
lsm_env *lsmPageEnv(Page *);
FileSystem *lsmPageFS(Page *);

int lsmFsSectorSize(FileSystem *);

Changes to src/lsm_file.c.

174
175
176
177
178
179
180

181
182
183
184
185
186
187
...
516
517
518
519
520
521
522


523
524
525
526
527
528
529
...
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
...
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
...
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
...
998
999
1000
1001
1002
1003
1004



1005
1006
1007
1008
1009
1010
1011
....
1013
1014
1015
1016
1017
1018
1019
1020

1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035







1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050

1051
1052
1053
1054
1055
1056
1057
....
1058
1059
1060
1061
1062
1063
1064
1065

1066
1067
1068
1069
1070
1071
1072
....
1101
1102
1103
1104
1105
1106
1107

1108
1109
1110
1111
1112
1113
1114
....
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140

1141
1142
1143
1144
1145
1146

1147

1148
1149
1150
1151
1152
1153
1154
1155
1156
....
1309
1310
1311
1312
1313
1314
1315
1316






1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
....
1364
1365
1366
1367
1368
1369
1370

1371
1372
1373
1374



1375
1376


1377
1378
1379


1380






1381
1382
1383
1384
1385
1386

1387
1388
1389
1390
1391
1392
1393
1394
....
1402
1403
1404
1405
1406
1407
1408

1409
1410
1411

1412
1413
1414
1415
1416
1417
1418
....
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
....
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
....
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543






1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561

1562
1563
1564



1565
1566

1567



1568


1569
1570
1571
1572
1573
1574
1575
....
1677
1678
1679
1680
1681
1682
1683

1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699

1700
1701

1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713

1714
1715
1716
1717

1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729

1730
1731
1732







1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
....
1823
1824
1825
1826
1827
1828
1829


























































1830
1831
1832
1833
1834
1835
1836
....
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
....
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
....
2094
2095
2096
2097
2098
2099
2100

2101
2102
2103
2104
2105
2106
  int nPagesize;                  /* Database page-size in bytes */
  int nBlocksize;                 /* Database block-size in bytes */

  /* r/w file descriptors for both files. */
  LsmFile *pLsmFile;
  lsm_file *fdDb;                 /* Database file */
  lsm_file *fdLog;                /* Log file */


  /* If this is a compressed database, a pointer to the compression methods.
  ** For an uncompressed database, a NULL pointer.  */
  lsm_compress *pCompress;
  u8 *aBuffer;                    /* Buffer to compress into */
  int nBuffer;                    /* Allocated size of aBuffer[] in bytes */

................................................................................
        pFS->fdDb = fsOpenFile(pFS, 0, &rc);
      }
    }

    if( rc!=LSM_OK ){
      lsmFsClose(pFS);
      pFS = 0;


    }
  }

  pDb->pFS = pFS;
  return rc;
}

................................................................................
      }
      lsmSortedRemap(pFS->pDb);
    }
    *pRc = rc;
  }
}

static int fsPageGet(FileSystem *, Pgno, int, Page **);

/*
** Parameter iBlock is a database file block. This function reads the value 
** stored in the blocks "next block" pointer and stores it in *piNext.
** LSM_OK is returned if everything is successful, or an LSM error code
** otherwise.
*/
................................................................................
    rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aNext, sizeof(aNext));
    if( rc==LSM_OK ){
      *piNext = (int)lsmGetU32(aNext);
    }
  }else{
    const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
    Page *pLast;
    rc = fsPageGet(pFS, iBlock*nPagePerBlock, 0, &pLast);
    if( rc==LSM_OK ){
      *piNext = fsPageToBlock(pFS, lsmGetU32(&pLast->aData[pFS->nPagesize-4]));
      lsmFsPageRelease(pLast);
    }
  }
  return rc;
}
................................................................................
  aBuf[2] = (u8)nByte | 0x80;
}
static int getRecordSize(u8 *aBuf, int *pbFree){
  int nByte;
  nByte  = (aBuf[0] & 0x7F) << 14;
  nByte += (aBuf[1] & 0x7F) << 7;
  nByte += (aBuf[2] & 0x7F);
  *pbFree = !!(aBuf[1] & 0x80);
  return nByte;
}

static int fsSubtractOffset(FileSystem *pFS, i64 iOff, int iSub, i64 *piRes){
  i64 iStart;
  int iBlk;
  int rc;
................................................................................
  return rc;
}

static int fsAllocateBuffer(FileSystem *pFS){
  assert( pFS->pCompress );
  if( pFS->aBuffer==0 ){
    pFS->nBuffer = pFS->pCompress->xBound(pFS->pCompress->pCtx, pFS->nPagesize);



    pFS->aBuffer = lsmMalloc(pFS->pEnv, LSM_MAX(pFS->nBuffer, pFS->nPagesize));
    if( pFS->aBuffer==0 ) return LSM_NOMEM_BKPT;
  }
  return LSM_OK;
}

/*
................................................................................
** uncompresses the compressed data for page pPg from the database and
** populates the pPg->aData[] buffer and pPg->nCompress field.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
static int fsReadPagedata(
  FileSystem *pFS,                /* File-system handle */
  Page *pPg                       /* Page to read and uncompress data for */

){
  lsm_compress *p = pFS->pCompress;
  i64 iOff = pPg->iPg;
  u8 aSz[6];
  int rc;

  assert( p && pPg->nCompress==0 );

  if( fsAllocateBuffer(pFS) ) return LSM_NOMEM;

  rc = fsReadData(pFS, iOff, aSz, sizeof(aSz));

  if( rc==LSM_OK ){
    int bFree;
    pPg->nCompress = (int)getRecordSize(aSz, &bFree);







    rc = fsAddOffset(pFS, iOff, 3, &iOff);
    if( rc==LSM_OK ){
      if( pPg->nCompress>pFS->nBuffer ){
        rc = LSM_CORRUPT_BKPT;
      }else{
        rc = fsReadData(pFS, iOff, pFS->aBuffer, pPg->nCompress);
      }
      if( rc==LSM_OK ){
        int n = pFS->nBuffer;
        rc = p->xUncompress(p->pCtx, 
            (char *)pPg->aData, &n, 
            (const char *)pFS->aBuffer, pPg->nCompress
        );
        if( rc==LSM_OK && n!=pPg->nData ){
          rc = LSM_CORRUPT_BKPT;

        }
      }
    }
  }
  return rc;
}

................................................................................
/*
** Return a handle for a database page.
*/
static int fsPageGet(
  FileSystem *pFS,                /* File-system handle */
  Pgno iPg,                       /* Page id */
  int noContent,                  /* True to not load content from disk */
  Page **ppPg                     /* OUT: New page handle */

){
  Page *p;
  int iHash;
  int rc = LSM_OK;

  assert( iPg>=fsFirstPageOnBlock(pFS, 1) );
  *ppPg = 0;
................................................................................
    for(p=pFS->apHash[iHash]; p; p=p->pHashNext){
      if( p->iPg==iPg) break;
    }

    if( p==0 ){
      rc = fsPageBuffer(pFS, 1, &p);
      if( rc==LSM_OK ){

        p->iPg = iPg;
        p->nRef = 0;
        p->pFS = pFS;
        assert( p->flags==0 || p->flags==PAGE_FREE );
        if( pFS->pCompress==0 && (fsIsLast(pFS, iPg) || fsIsFirst(pFS, iPg)) ){
          p->flags |= PAGE_SHORT;
        }
................................................................................

#ifdef LSM_DEBUG
        memset(p->aData, 0x56, pFS->nPagesize);
#endif
        assert( p->pLruNext==0 && p->pLruPrev==0 );
        if( noContent==0 ){
          if( pFS->pCompress ){
            rc = fsReadPagedata(pFS, p);
          }else{
            int nByte = pFS->nPagesize;
            i64 iOff = (i64)(iPg-1) * pFS->nPagesize;
            rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, p->aData, nByte);
          }
          pFS->nRead++;
        }

        /* If the xRead() call was successful (or not attempted), link the
         ** page into the page-cache hash-table. Otherwise, if it failed,
         ** free the buffer. */
        if( rc==LSM_OK ){
          p->pHashNext = pFS->apHash[iHash];
          pFS->apHash[iHash] = p;
        }else{
          fsPageBufferFree(p);
          p = 0;

        }
      }
    }else if( p->nRef==0 ){
      fsPageRemoveFromLru(pFS, p);
    }


    assert( (rc==LSM_OK && p) || (rc!=LSM_OK && p==0) );

  }
  if( rc==LSM_OK ){
    pFS->nOut += (p->nRef==0);
    p->nRef++;
  }
  *ppPg = p;
  return rc;
}

................................................................................
    iBlk = iNext;
  }

  pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
  assert( pRun->nSize>0 );
}

static int fsNextPageOffset(Segment *pSeg, Page *pPg, Pgno *piNext){






  Pgno iNext;
  int rc;

  assert( pPg->pFS->pCompress );

  rc = fsAddOffset(pPg->pFS, pPg->iPg, 2*3 + pPg->nCompress - 1, &iNext);
  if( pSeg && iNext==pSeg->iLastPg ){
    iNext = 0;
  }else if( rc==LSM_OK ){
    rc = fsAddOffset(pPg->pFS, iNext, 1, &iNext);
  }

  *piNext = iNext;
  return rc;
}

static int fsGetPageBefore(FileSystem *pFS, i64 iOff, Pgno *piPrev){
................................................................................
** is set to point to it and LSM_OK is returned. Otherwise, if an error 
** occurs, *ppNext is set to NULL and and lsm error code returned.
**
** Page references returned by this function should be released by the 
** caller using lsmFsPageRelease().
*/
int lsmFsDbPageNext(Segment *pRun, Page *pPg, int eDir, Page **ppNext){

  FileSystem *pFS = pPg->pFS;
  Pgno iPg = pPg->iPg;

  if( pFS->pCompress ){



    if( eDir<0 ){
      int rc = LSM_OK;


      if( iPg==pRun->iFirst || (rc = fsGetPageBefore(pFS, iPg, &iPg)) ){
        *ppNext = 0;
        return LSM_OK;


      }






    }else{
      int rc = fsNextPageOffset(pRun, pPg, &iPg);
      if( rc!=LSM_OK || iPg==0 ){
        *ppNext = 0;
        return rc;
      }

    }
  }else{
    assert( eDir==1 || eDir==-1 );
    if( eDir<0 ){
      if( pRun && iPg==pRun->iFirst ){
        *ppNext = 0;
        return LSM_OK;
      }else if( fsIsFirst(pFS, iPg) ){
................................................................................
        return LSM_OK;
      }else if( fsIsLast(pFS, iPg) ){
        iPg = lsmGetU32(&pPg->aData[pFS->nPagesize-4]);
      }else{
        iPg++;
      }
    }

  }

  return fsPageGet(pFS, iPg, 0, ppNext);

}

static Pgno findAppendPoint(FileSystem *pFS){
  int i;
  Pgno *aiAppend = pFS->pDb->pWorker->aiAppend;
  u32 iRet = 0;

................................................................................
      }else{
        iNext = fsFirstPageOnBlock(pFS, iNew);
      }
    }

    /* Grab the new page. */
    pPg = 0;
    rc = fsPageGet(pFS, iApp, 1, &pPg);
    assert( rc==LSM_OK || pPg==0 );

    /* If this is the first or last page of a block, fill in the pointer 
     ** value at the end of the new page. */
    if( rc==LSM_OK ){
      p->nSize++;
      p->iLastPg = iApp;
................................................................................

/*
** Mark the sorted run passed as the second argument as finished. 
*/
int lsmFsSortedFinish(FileSystem *pFS, Segment *p){
  int rc = LSM_OK;
  if( p && p->iLastPg ){
    const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
    int iBlk;

    /* Check if the last page of this run happens to be the last of a block.
    ** If it is, then an extra block has already been allocated for this run.
    ** Shift this extra block back to the free-block list. 
    **
    ** Otherwise, add the first free page in the last block used by the run
................................................................................
        if( aiAppend[i]==0 ){
          aiAppend[i] = p->iLastPg+1;
          break;
        }
      }
    }else if( pFS->pCompress==0 ){
      Page *pLast;
      rc = fsPageGet(pFS, p->iLastPg, 0, &pLast);
      if( rc==LSM_OK ){
        int iPg = (int)lsmGetU32(&pLast->aData[pFS->nPagesize-4]);
        lsmBlockRefree(pFS->pDb, fsPageToBlock(pFS, iPg));
        lsmFsPageRelease(pLast);
      }






    }
  }
  return rc;
}

/*
** Obtain a reference to page number iPg.
*/
int lsmFsDbPageGet(FileSystem *pFS, Pgno iPg, Page **ppPg){
  assert( pFS );
  return fsPageGet(pFS, iPg, 0, ppPg);
}

/*
** Obtain a reference to the last page in the segment passed as the 
** second argument.
*/
int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg){

  Pgno iLast = pSeg->iLastPg;
  if( pFS->pCompress ){
    int rc;



    rc = fsGetPageBefore(pFS, iLast+1, &iLast);
    if( rc!=LSM_OK ) return rc;

  }



  return fsPageGet(pFS, iLast, 0, ppPg);


}

/*
** Return a reference to meta-page iPg. If successful, LSM_OK is returned
** and *ppPg populated with the new page reference. The reference should
** be released by the caller using lsmFsPageRelease().
**
................................................................................
){
  Pgno iRet = 0;
  int rc = *pRc;
  assert( pFS->pCompress );
  if( rc==LSM_OK ){
    int nRem;
    int nWrite;

    Pgno iApp = pSeg->iLastPg+1;

    /* If this is the first data written into the segment, find an append-point
    ** or allocate a new block.  */
    if( iApp==1 ){
      pSeg->iFirst = iApp = findAppendPoint(pFS);
      if( iApp==0 ){
        int iBlk;
        rc = lsmBlockAllocate(pFS->pDb, &iBlk);
        pSeg->iFirst = iApp = fsFirstPageOnBlock(pFS, iBlk);
      }
    }

    iRet = iApp;

    /* Write as much data as is possible at iApp (usually all of it). */

    if( rc==LSM_OK ){
      int nSpace = fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iApp)) - iApp + 1;

      nWrite = LSM_MIN(nData, nSpace);
      nRem = nData - nWrite;
      assert( nWrite>=0 );
      if( nWrite!=0 ){
        rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aData, nWrite);
      }
      iApp += nWrite;
    }

    /* If required, allocate a new block and write the rest of the data
    ** into it. Set the next and previous block pointers to link the new
    ** block to the old.  */

    if( rc==LSM_OK && nRem ){
      u8 aPtr[4];                 /* Space to serialize a u32 */
      int iBlk;                   /* New block number */


      /* Allocate a new block. */
      rc = lsmBlockAllocate(pFS->pDb, &iBlk);

      /* Set the "next" pointer on the old block */
      if( rc==LSM_OK ){
        assert( iApp==(fsPageToBlock(pFS, iApp)*pFS->nBlocksize)-4 );
        lsmPutU32(aPtr, iBlk);
        rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aPtr, sizeof(aPtr));
      }

      /* Set the "prev" pointer on the new block */
      if( rc==LSM_OK ){

        lsmPutU32(aPtr, fsPageToBlock(pFS, iApp));
        iApp = fsFirstPageOnBlock(pFS, iBlk);
        rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp-4, aPtr, sizeof(aPtr));







      }

      /* Write the remaining data into the new block */
      if( rc==LSM_OK ){
        rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, &aData[nWrite], nRem);
        iApp += nRem;
      }
    }

    pSeg->iLastPg = iApp-1;
    *pRc = rc;
................................................................................
    }
    pPg->flags &= ~PAGE_DIRTY;
    pFS->nWrite++;
  }

  return rc;
}



























































/*
** Increment the reference count on the page object passed as the first
** argument.
*/
void lsmFsPageRef(Page *pPg){
  if( pPg ){
................................................................................
  Segment *pSeg,
  int bExtra,                     /* If true, count the "next" block if any */
  int nUsed,
  u8 *aUsed
){
  if( pSeg ){
    if( pSeg && pSeg->nSize>0 ){
      const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);

      int iBlk;
      int iLastBlk;
      iBlk = fsPageToBlock(pFS, pSeg->iFirst);
      iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg);

      while( iBlk ){
        assert( iBlk<=nUsed );
................................................................................
        if( iBlk!=iLastBlk ){
          fsBlockNext(pFS, iBlk, &iBlk);
        }else{
          iBlk = 0;
        }
      }

      if( pFS->pCompress==0 && bExtra && (pSeg->iLastPg % nPagePerBlock)==0 ){
        fsBlockNext(pFS, iLastBlk, &iBlk);
        aUsed[iBlk-1] = 1;
      }
    }
  }
}

................................................................................
/*
** Return true if pPg happens to be the last page in segment pSeg. Or false
** otherwise. This function is only invoked as part of assert() conditions.
*/
int lsmFsDbPageIsLast(Segment *pSeg, Page *pPg){
  if( pPg->pFS->pCompress ){
    Pgno iNext = 0;

    int rc = fsNextPageOffset(pSeg, pPg, &iNext);
    return (rc!=LSM_OK || iNext==0);
  }
  return (pPg->iPg==pSeg->iLastPg);
}
#endif







>







 







>
>







 







|







 







|







 







|







 







>
>
>







 







|
>



|











>
>
>
>
>
>
>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>







 







|
>







 







>







 







|











|





>






>
|
>

|







 







|
>
>
>
>
>
>



|

|



|







 







>




>
>
>
|
<
>
>
|
|
<
>
>
|
>
>
>
>
>
>
|
<
<

<

>
|







 







>


<
>







 







|







 







<







 







|





>
>
>
>
>
>










|







>
|

|
>
>
>
|
|
>
|
>
>
>
|
>
>







 







>












<



>

<
>












>
|



>
|
|

|
|
|
|
|
|

|
|
>
|
|
|
>
>
>
>
>
>
>



|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|
<







 







|







 







>
|





174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
...
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
...
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
...
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
...
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
....
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
....
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
....
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
....
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
....
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
....
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
....
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405

1406
1407
1408
1409

1410
1411
1412
1413
1414
1415
1416
1417
1418
1419


1420

1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
....
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447

1448
1449
1450
1451
1452
1453
1454
1455
....
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
....
1546
1547
1548
1549
1550
1551
1552

1553
1554
1555
1556
1557
1558
1559
....
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
....
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748

1749
1750
1751
1752
1753

1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
....
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
....
2114
2115
2116
2117
2118
2119
2120
2121

2122
2123
2124
2125
2126
2127
2128
....
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
....
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
  int nPagesize;                  /* Database page-size in bytes */
  int nBlocksize;                 /* Database block-size in bytes */

  /* r/w file descriptors for both files. */
  LsmFile *pLsmFile;
  lsm_file *fdDb;                 /* Database file */
  lsm_file *fdLog;                /* Log file */
  int szSector;                   /* Database file sector size */

  /* If this is a compressed database, a pointer to the compression methods.
  ** For an uncompressed database, a NULL pointer.  */
  lsm_compress *pCompress;
  u8 *aBuffer;                    /* Buffer to compress into */
  int nBuffer;                    /* Allocated size of aBuffer[] in bytes */

................................................................................
        pFS->fdDb = fsOpenFile(pFS, 0, &rc);
      }
    }

    if( rc!=LSM_OK ){
      lsmFsClose(pFS);
      pFS = 0;
    }else{
      pFS->szSector = lsmEnvSectorSize(pFS->pEnv, pFS->fdDb);
    }
  }

  pDb->pFS = pFS;
  return rc;
}

................................................................................
      }
      lsmSortedRemap(pFS->pDb);
    }
    *pRc = rc;
  }
}

static int fsPageGet(FileSystem *, Pgno, int, Page **, int *);

/*
** Parameter iBlock is a database file block. This function reads the value 
** stored in the blocks "next block" pointer and stores it in *piNext.
** LSM_OK is returned if everything is successful, or an LSM error code
** otherwise.
*/
................................................................................
    rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aNext, sizeof(aNext));
    if( rc==LSM_OK ){
      *piNext = (int)lsmGetU32(aNext);
    }
  }else{
    const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
    Page *pLast;
    rc = fsPageGet(pFS, iBlock*nPagePerBlock, 0, &pLast, 0);
    if( rc==LSM_OK ){
      *piNext = fsPageToBlock(pFS, lsmGetU32(&pLast->aData[pFS->nPagesize-4]));
      lsmFsPageRelease(pLast);
    }
  }
  return rc;
}
................................................................................
  aBuf[2] = (u8)nByte | 0x80;
}
static int getRecordSize(u8 *aBuf, int *pbFree){
  int nByte;
  nByte  = (aBuf[0] & 0x7F) << 14;
  nByte += (aBuf[1] & 0x7F) << 7;
  nByte += (aBuf[2] & 0x7F);
  *pbFree = !(aBuf[1] & 0x80);
  return nByte;
}

static int fsSubtractOffset(FileSystem *pFS, i64 iOff, int iSub, i64 *piRes){
  i64 iStart;
  int iBlk;
  int rc;
................................................................................
  return rc;
}

static int fsAllocateBuffer(FileSystem *pFS){
  assert( pFS->pCompress );
  if( pFS->aBuffer==0 ){
    pFS->nBuffer = pFS->pCompress->xBound(pFS->pCompress->pCtx, pFS->nPagesize);
    if( pFS->nBuffer<(pFS->szSector+6) ){
      pFS->nBuffer = pFS->szSector+6;
    }
    pFS->aBuffer = lsmMalloc(pFS->pEnv, LSM_MAX(pFS->nBuffer, pFS->nPagesize));
    if( pFS->aBuffer==0 ) return LSM_NOMEM_BKPT;
  }
  return LSM_OK;
}

/*
................................................................................
** uncompresses the compressed data for page pPg from the database and
** populates the pPg->aData[] buffer and pPg->nCompress field.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
static int fsReadPagedata(
  FileSystem *pFS,                /* File-system handle */
  Page *pPg,                      /* Page to read and uncompress data for */
  int *pnSpace                    /* OUT: Total bytes of free space */
){
  lsm_compress *p = pFS->pCompress;
  i64 iOff = pPg->iPg;
  u8 aSz[3];
  int rc;

  assert( p && pPg->nCompress==0 );

  if( fsAllocateBuffer(pFS) ) return LSM_NOMEM;

  rc = fsReadData(pFS, iOff, aSz, sizeof(aSz));

  if( rc==LSM_OK ){
    int bFree;
    pPg->nCompress = (int)getRecordSize(aSz, &bFree);
    if( bFree ){
      if( pnSpace ){
        *pnSpace = pPg->nCompress + sizeof(aSz)*2;
      }else{
        rc = LSM_CORRUPT_BKPT;
      }
    }else{
      rc = fsAddOffset(pFS, iOff, 3, &iOff);
      if( rc==LSM_OK ){
        if( pPg->nCompress>pFS->nBuffer ){
          rc = LSM_CORRUPT_BKPT;
        }else{
          rc = fsReadData(pFS, iOff, pFS->aBuffer, pPg->nCompress);
        }
        if( rc==LSM_OK ){
          int n = pFS->nPagesize;
          rc = p->xUncompress(p->pCtx, 
              (char *)pPg->aData, &n, 
              (const char *)pFS->aBuffer, pPg->nCompress
              );
          if( rc==LSM_OK && n!=pPg->nData ){
            rc = LSM_CORRUPT_BKPT;
          }
        }
      }
    }
  }
  return rc;
}

................................................................................
/*
** Return a handle for a database page.
*/
static int fsPageGet(
  FileSystem *pFS,                /* File-system handle */
  Pgno iPg,                       /* Page id */
  int noContent,                  /* True to not load content from disk */
  Page **ppPg,                    /* OUT: New page handle */
  int *pnSpace                    /* OUT: Bytes of free space */
){
  Page *p;
  int iHash;
  int rc = LSM_OK;

  assert( iPg>=fsFirstPageOnBlock(pFS, 1) );
  *ppPg = 0;
................................................................................
    for(p=pFS->apHash[iHash]; p; p=p->pHashNext){
      if( p->iPg==iPg) break;
    }

    if( p==0 ){
      rc = fsPageBuffer(pFS, 1, &p);
      if( rc==LSM_OK ){
        int nSpace = 0;
        p->iPg = iPg;
        p->nRef = 0;
        p->pFS = pFS;
        assert( p->flags==0 || p->flags==PAGE_FREE );
        if( pFS->pCompress==0 && (fsIsLast(pFS, iPg) || fsIsFirst(pFS, iPg)) ){
          p->flags |= PAGE_SHORT;
        }
................................................................................

#ifdef LSM_DEBUG
        memset(p->aData, 0x56, pFS->nPagesize);
#endif
        assert( p->pLruNext==0 && p->pLruPrev==0 );
        if( noContent==0 ){
          if( pFS->pCompress ){
            rc = fsReadPagedata(pFS, p, &nSpace);
          }else{
            int nByte = pFS->nPagesize;
            i64 iOff = (i64)(iPg-1) * pFS->nPagesize;
            rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, p->aData, nByte);
          }
          pFS->nRead++;
        }

        /* If the xRead() call was successful (or not attempted), link the
         ** page into the page-cache hash-table. Otherwise, if it failed,
         ** free the buffer. */
        if( rc==LSM_OK && nSpace==0 ){
          p->pHashNext = pFS->apHash[iHash];
          pFS->apHash[iHash] = p;
        }else{
          fsPageBufferFree(p);
          p = 0;
          if( pnSpace ) *pnSpace = nSpace;
        }
      }
    }else if( p->nRef==0 ){
      fsPageRemoveFromLru(pFS, p);
    }

    assert( (rc==LSM_OK && (p || (pnSpace && *pnSpace)))
         || (rc!=LSM_OK && p==0) 
    );
  }
  if( rc==LSM_OK && p ){
    pFS->nOut += (p->nRef==0);
    p->nRef++;
  }
  *ppPg = p;
  return rc;
}

................................................................................
    iBlk = iNext;
  }

  pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
  assert( pRun->nSize>0 );
}

static int fsNextPageOffset(
  FileSystem *pFS,                /* File system object */
  Segment *pSeg,                  /* Segment to move within */
  Pgno iPg,                       /* Offset of current page */
  int nByte,                      /* Size of current page including headers */
  Pgno *piNext                    /* OUT: Offset of next page. Or zero (EOF) */
){
  Pgno iNext;
  int rc;

  assert( pFS->pCompress );

  rc = fsAddOffset(pFS, iPg, nByte-1, &iNext);
  if( pSeg && iNext==pSeg->iLastPg ){
    iNext = 0;
  }else if( rc==LSM_OK ){
    rc = fsAddOffset(pFS, iNext, 1, &iNext);
  }

  *piNext = iNext;
  return rc;
}

static int fsGetPageBefore(FileSystem *pFS, i64 iOff, Pgno *piPrev){
................................................................................
** is set to point to it and LSM_OK is returned. Otherwise, if an error 
** occurs, *ppNext is set to NULL and and lsm error code returned.
**
** Page references returned by this function should be released by the 
** caller using lsmFsPageRelease().
*/
int lsmFsDbPageNext(Segment *pRun, Page *pPg, int eDir, Page **ppNext){
  int rc = LSM_OK;
  FileSystem *pFS = pPg->pFS;
  Pgno iPg = pPg->iPg;

  if( pFS->pCompress ){
    int nSpace = pPg->nCompress + 2*3;

    do {
      if( eDir>0 ){

        rc = fsNextPageOffset(pFS, pRun, iPg, nSpace, &iPg);
      }else{
        if( iPg==pRun->iFirst ){
          iPg = 0;

        }else{
          rc = fsGetPageBefore(pFS, iPg, &iPg);
        }
      }

      nSpace = 0;
      if( iPg!=0 ){
        rc = fsPageGet(pFS, iPg, 0, ppNext, &nSpace);
        assert( (*ppNext==0)==(rc!=LSM_OK || nSpace>0) );
      }else{


        *ppNext = 0;

      }
    }while( nSpace>0 && rc==LSM_OK );

  }else{
    assert( eDir==1 || eDir==-1 );
    if( eDir<0 ){
      if( pRun && iPg==pRun->iFirst ){
        *ppNext = 0;
        return LSM_OK;
      }else if( fsIsFirst(pFS, iPg) ){
................................................................................
        return LSM_OK;
      }else if( fsIsLast(pFS, iPg) ){
        iPg = lsmGetU32(&pPg->aData[pFS->nPagesize-4]);
      }else{
        iPg++;
      }
    }
    rc = fsPageGet(pFS, iPg, 0, ppNext, 0);
  }


  return rc;
}

static Pgno findAppendPoint(FileSystem *pFS){
  int i;
  Pgno *aiAppend = pFS->pDb->pWorker->aiAppend;
  u32 iRet = 0;

................................................................................
      }else{
        iNext = fsFirstPageOnBlock(pFS, iNew);
      }
    }

    /* Grab the new page. */
    pPg = 0;
    rc = fsPageGet(pFS, iApp, 1, &pPg, 0);
    assert( rc==LSM_OK || pPg==0 );

    /* If this is the first or last page of a block, fill in the pointer 
     ** value at the end of the new page. */
    if( rc==LSM_OK ){
      p->nSize++;
      p->iLastPg = iApp;
................................................................................

/*
** Mark the sorted run passed as the second argument as finished. 
*/
int lsmFsSortedFinish(FileSystem *pFS, Segment *p){
  int rc = LSM_OK;
  if( p && p->iLastPg ){

    int iBlk;

    /* Check if the last page of this run happens to be the last of a block.
    ** If it is, then an extra block has already been allocated for this run.
    ** Shift this extra block back to the free-block list. 
    **
    ** Otherwise, add the first free page in the last block used by the run
................................................................................
        if( aiAppend[i]==0 ){
          aiAppend[i] = p->iLastPg+1;
          break;
        }
      }
    }else if( pFS->pCompress==0 ){
      Page *pLast;
      rc = fsPageGet(pFS, p->iLastPg, 0, &pLast, 0);
      if( rc==LSM_OK ){
        int iPg = (int)lsmGetU32(&pLast->aData[pFS->nPagesize-4]);
        lsmBlockRefree(pFS->pDb, fsPageToBlock(pFS, iPg));
        lsmFsPageRelease(pLast);
      }
    }else{
      int iBlk = 0;
      rc = fsBlockNext(pFS, fsPageToBlock(pFS, p->iLastPg), &iBlk);
      if( rc==LSM_OK ){
        lsmBlockRefree(pFS->pDb, iBlk);
      }
    }
  }
  return rc;
}

/*
** Obtain a reference to page number iPg.
*/
int lsmFsDbPageGet(FileSystem *pFS, Pgno iPg, Page **ppPg){
  assert( pFS );
  return fsPageGet(pFS, iPg, 0, ppPg, 0);
}

/*
** Obtain a reference to the last page in the segment passed as the 
** second argument.
*/
int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg){
  int rc;
  Pgno iPg = pSeg->iLastPg;
  if( pFS->pCompress ){
    int nSpace;
    iPg++;
    do {
      nSpace = 0;
      rc = fsGetPageBefore(pFS, iPg, &iPg);
      if( rc==LSM_OK ){
        rc = fsPageGet(pFS, iPg, 0, ppPg, &nSpace);
      }
    }while( rc==LSM_OK && nSpace>0 );

  }else{
    rc = fsPageGet(pFS, iPg, 0, ppPg, 0);
  }
  return rc;
}

/*
** Return a reference to meta-page iPg. If successful, LSM_OK is returned
** and *ppPg populated with the new page reference. The reference should
** be released by the caller using lsmFsPageRelease().
**
................................................................................
){
  Pgno iRet = 0;
  int rc = *pRc;
  assert( pFS->pCompress );
  if( rc==LSM_OK ){
    int nRem;
    int nWrite;
    Pgno iLastOnBlock;
    Pgno iApp = pSeg->iLastPg+1;

    /* If this is the first data written into the segment, find an append-point
    ** or allocate a new block.  */
    if( iApp==1 ){
      pSeg->iFirst = iApp = findAppendPoint(pFS);
      if( iApp==0 ){
        int iBlk;
        rc = lsmBlockAllocate(pFS->pDb, &iBlk);
        pSeg->iFirst = iApp = fsFirstPageOnBlock(pFS, iBlk);
      }
    }

    iRet = iApp;

    /* Write as much data as is possible at iApp (usually all of it). */
    iLastOnBlock = fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iApp));
    if( rc==LSM_OK ){

      int nSpace = iLastOnBlock - iApp + 1;
      nWrite = LSM_MIN(nData, nSpace);
      nRem = nData - nWrite;
      assert( nWrite>=0 );
      if( nWrite!=0 ){
        rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aData, nWrite);
      }
      iApp += nWrite;
    }

    /* If required, allocate a new block and write the rest of the data
    ** into it. Set the next and previous block pointers to link the new
    ** block to the old.  */
    assert( nRem<=0 || (iApp-1)==iLastOnBlock );
    if( rc==LSM_OK && (iApp-1)==iLastOnBlock ){
      u8 aPtr[4];                 /* Space to serialize a u32 */
      int iBlk;                   /* New block number */

      if( nWrite>0 ){
        /* Allocate a new block. */
        rc = lsmBlockAllocate(pFS->pDb, &iBlk);

        /* Set the "next" pointer on the old block */
        if( rc==LSM_OK ){
          assert( iApp==(fsPageToBlock(pFS, iApp)*pFS->nBlocksize)-4 );
          lsmPutU32(aPtr, iBlk);
          rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aPtr, sizeof(aPtr));
        }

        /* Set the "prev" pointer on the new block */
        if( rc==LSM_OK ){
          Pgno iWrite;
          lsmPutU32(aPtr, fsPageToBlock(pFS, iApp));
          iWrite = fsFirstPageOnBlock(pFS, iBlk);
          rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iWrite-4, aPtr, sizeof(aPtr));
          if( nRem>0 ) iApp = iWrite;
        }
      }else{
        /* The next block is already allocated. */
        assert( nRem>0 );
        rc = fsBlockNext(pFS, fsPageToBlock(pFS, iApp), &iBlk);
        iApp = fsFirstPageOnBlock(pFS, iBlk);
      }

      /* Write the remaining data into the new block */
      if( rc==LSM_OK && nRem>0 ){
        rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, &aData[nWrite], nRem);
        iApp += nRem;
      }
    }

    pSeg->iLastPg = iApp-1;
    *pRc = rc;
................................................................................
    }
    pPg->flags &= ~PAGE_DIRTY;
    pFS->nWrite++;
  }

  return rc;
}

/*
** Add a padding record to the segment passed as the third argument.
*/
int lsmFsSortedPadding(
  FileSystem *pFS, 
  Snapshot *pSnapshot,
  Segment *pSeg
){
  int rc = LSM_OK;
  if( pFS->pCompress ){
    Pgno iLast2;
    Pgno iLast = pSeg->iLastPg;     /* Current last page of segment */
    int nPad;                       /* Bytes of padding required */
    u8 aSz[3];

    nPad = pFS->szSector - 1 - (iLast % pFS->szSector);
    if( nPad==0 
     || (nPad==4 && iLast==fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iLast)) )
    ){
      return LSM_OK;
    }

    iLast2 = (1 + (iLast + 6)/pFS->szSector) * pFS->szSector - 1;
    assert( fsPageToBlock(pFS, iLast)==fsPageToBlock(pFS, iLast2) );
    nPad = iLast2 - iLast;

    if( iLast2>fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iLast)) ){
      nPad -= 4;
      if( nPad<6 ){
        nPad += (pFS->szSector - 4);
      }
    }
    assert( nPad>=6 );

#if 0
    printf("padding segment with %d bytes at %d...\n", nPad, (int)iLast);
#endif

    pSeg->nSize += nPad;
    nPad -= 6;
    putRecordSize(aSz, nPad, 1);

    fsAppendData(pFS, pSeg, aSz, sizeof(aSz), &rc);
    memset(pFS->aBuffer, 0, nPad);
    fsAppendData(pFS, pSeg, pFS->aBuffer, nPad, &rc);
    fsAppendData(pFS, pSeg, aSz, sizeof(aSz), &rc);


    assert( rc!=LSM_OK 
     || pSeg->iLastPg==fsLastPageOnBlock(pFS, fsPageToBlock(pFS, pSeg->iLastPg))
     || ((pSeg->iLastPg + 1) % pFS->szSector)==0
    );
  }

  return rc;
}


/*
** Increment the reference count on the page object passed as the first
** argument.
*/
void lsmFsPageRef(Page *pPg){
  if( pPg ){
................................................................................
  Segment *pSeg,
  int bExtra,                     /* If true, count the "next" block if any */
  int nUsed,
  u8 *aUsed
){
  if( pSeg ){
    if( pSeg && pSeg->nSize>0 ){
      Pgno iLast = pSeg->iLastPg;

      int iBlk;
      int iLastBlk;
      iBlk = fsPageToBlock(pFS, pSeg->iFirst);
      iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg);

      while( iBlk ){
        assert( iBlk<=nUsed );
................................................................................
        if( iBlk!=iLastBlk ){
          fsBlockNext(pFS, iBlk, &iBlk);
        }else{
          iBlk = 0;
        }
      }

      if( bExtra && iLast==fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iLast)) ){
        fsBlockNext(pFS, iLastBlk, &iBlk);
        aUsed[iBlk-1] = 1;
      }
    }
  }
}

................................................................................
/*
** Return true if pPg happens to be the last page in segment pSeg. Or false
** otherwise. This function is only invoked as part of assert() conditions.
*/
int lsmFsDbPageIsLast(Segment *pSeg, Page *pPg){
  if( pPg->pFS->pCompress ){
    Pgno iNext = 0;
    int rc;
    rc = fsNextPageOffset(pPg->pFS, pSeg, pPg->iPg, pPg->nCompress+6, &iNext);
    return (rc!=LSM_OK || iNext==0);
  }
  return (pPg->iPg==pSeg->iLastPg);
}
#endif

Changes to src/lsm_sorted.c.

3270
3271
3272
3273
3274
3275
3276







3277
3278
3279
3280
3281
3282
3283
....
3443
3444
3445
3446
3447
3448
3449
3450
3451
3452
3453
3454
3455
3456
3457
....
3619
3620
3621
3622
3623
3624
3625

3626
3627
3628
3629
3630
3631
3632
    lsmFree(pMW->pDb->pEnv, pMW->hier.apHier);
    pMW->hier.apHier = 0;
    pMW->hier.nHier = 0;
  }

  return rc;
}








static int keyszToSkip(FileSystem *pFS, int nKey){
  int nPgsz;                /* Nominal database page size */
  nPgsz = lsmFsPageSize(pFS);
  return LSM_MIN(((nKey * 4) / nPgsz), 3);
}

................................................................................
  Merge *pMerge;                  /* Persistent part of level merge state */
  int nHdr;                       /* Space required for this record header */
  Page *pPg;                      /* Page to write to */
  u8 *aData;                      /* Data buffer for page pWriter->pPage */
  int nData;                      /* Size of buffer aData[] in bytes */
  int nRec;                       /* Number of records on page pPg */
  int iFPtr;                      /* Value of pointer in footer of pPg */
  int iRPtr;                      /* Value of pointer written into record */
  int iOff;                       /* Current write offset within page pPg */
  Segment *pSeg;                  /* Segment being written */
  int flags = 0;                  /* If != 0, flags value for page footer */
  int bFirst = 0;                 /* True for first key of output run */
  void *pVal;
  int nVal;

................................................................................

  lsmMCursorClose(pCsr);

  /* Persist and release the output page. */
  if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW);


  lsmFree(pMW->pDb->pEnv, pMW->aGobble);
  pMW->aGobble = 0;
  pMW->pCsr = 0;
  pMW->pPage = 0;
  pMW->pPage = 0;








>
>
>
>
>
>
>







 







|







 







>







3270
3271
3272
3273
3274
3275
3276
3277
3278
3279
3280
3281
3282
3283
3284
3285
3286
3287
3288
3289
3290
....
3450
3451
3452
3453
3454
3455
3456
3457
3458
3459
3460
3461
3462
3463
3464
....
3626
3627
3628
3629
3630
3631
3632
3633
3634
3635
3636
3637
3638
3639
3640
    lsmFree(pMW->pDb->pEnv, pMW->hier.apHier);
    pMW->hier.apHier = 0;
    pMW->hier.nHier = 0;
  }

  return rc;
}

static int mergeWorkerAddPadding(
  MergeWorker *pMW                /* Merge worker object */
){
  FileSystem *pFS = pMW->pDb->pFS;
  return lsmFsSortedPadding(pFS, pMW->pDb->pWorker, &pMW->pLevel->lhs);
}

static int keyszToSkip(FileSystem *pFS, int nKey){
  int nPgsz;                /* Nominal database page size */
  nPgsz = lsmFsPageSize(pFS);
  return LSM_MIN(((nKey * 4) / nPgsz), 3);
}

................................................................................
  Merge *pMerge;                  /* Persistent part of level merge state */
  int nHdr;                       /* Space required for this record header */
  Page *pPg;                      /* Page to write to */
  u8 *aData;                      /* Data buffer for page pWriter->pPage */
  int nData;                      /* Size of buffer aData[] in bytes */
  int nRec;                       /* Number of records on page pPg */
  int iFPtr;                      /* Value of pointer in footer of pPg */
  int iRPtr = 0;                  /* Value of pointer written into record */
  int iOff;                       /* Current write offset within page pPg */
  Segment *pSeg;                  /* Segment being written */
  int flags = 0;                  /* If != 0, flags value for page footer */
  int bFirst = 0;                 /* True for first key of output run */
  void *pVal;
  int nVal;

................................................................................

  lsmMCursorClose(pCsr);

  /* Persist and release the output page. */
  if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerAddPadding(pMW);

  lsmFree(pMW->pDb->pEnv, pMW->aGobble);
  pMW->aGobble = 0;
  pMW->pCsr = 0;
  pMW->pPage = 0;
  pMW->pPage = 0;