SQLite4
Check-in [90c2fae338]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fixes for compressed database mode. Some test cases pass. Many do not.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | compression-hooks
Files: files | file ages | folders
SHA1: 90c2fae338c8374dd4d028aca769ac3546cdb012
User & Date: dan 2012-10-23 19:54:23
Context
2012-10-24
18:26
Various fixes. check-in: 728d8cf5ae user: dan tags: compression-hooks
2012-10-23
19:54
Fixes for compressed database mode. Some test cases pass. Many do not. check-in: 90c2fae338 user: dan tags: compression-hooks
2012-10-22
20:05
Add some code to support compressed databases to lsm_file.c. Does not currently work. check-in: 3c45b911fe user: dan tags: compression-hooks
Changes
Hide Diffs Unified Diffs Show Whitespace Changes Patch

Changes to lsm-test/lsmtest.h.

74
75
76
77
78
79
80

81
82
83
84
85
86
87
** Functions in wrapper3.c. This file contains the tdb wrapper for lsm.
** The wrapper for lsm is a bit more involved than the others, as it 
** includes code for a couple of different lsm configurations, and for
** various types of fault injection and robustness testing.
*/
int test_lsm_open(const char *zFilename, int bClear, TestDb **ppDb);
int test_lsm_lomem_open(const char *zFilename, int bClear, TestDb **ppDb);

int test_lsm_small_open(const char *zFilename, int bClear, TestDb **ppDb);
int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb);
int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb);

/* Functions in testutil.c. */
int  testPrngInit(void);
u32  testPrngValue(u32 iVal);







>







74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
** Functions in wrapper3.c. This file contains the tdb wrapper for lsm.
** The wrapper for lsm is a bit more involved than the others, as it 
** includes code for a couple of different lsm configurations, and for
** various types of fault injection and robustness testing.
*/
int test_lsm_open(const char *zFilename, int bClear, TestDb **ppDb);
int test_lsm_lomem_open(const char *zFilename, int bClear, TestDb **ppDb);
int test_lsm_zip_open(const char *zFilename, int bClear, TestDb **ppDb);
int test_lsm_small_open(const char *zFilename, int bClear, TestDb **ppDb);
int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb);
int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb);

/* Functions in testutil.c. */
int  testPrngInit(void);
u32  testPrngValue(u32 iVal);

Changes to lsm-test/lsmtest1.c.

189
190
191
192
193
194
195

196
197
198
199

200
201
202
203
204
205
206
      memcpy(pKey1, pKey2, nKey1+1);
      testDatasourceEntry(pData, iKey2, &pKey2, &nKey2, 0, 0);

      testScanCompare(pDb2, pDb, 0, 0, 0,         0, 0,         &rc);
      testScanCompare(pDb2, pDb, 0, 0, 0,         pKey2, nKey2, &rc);
      testScanCompare(pDb2, pDb, 0, pKey1, nKey1, 0, 0,         &rc);
      testScanCompare(pDb2, pDb, 0, pKey1, nKey1, pKey2, nKey2, &rc);

      testScanCompare(pDb2, pDb, 1, 0, 0,         0, 0,         &rc);
      testScanCompare(pDb2, pDb, 1, 0, 0,         pKey2, nKey2, &rc);
      testScanCompare(pDb2, pDb, 1, pKey1, nKey1, 0, 0,         &rc);
      testScanCompare(pDb2, pDb, 1, pKey1, nKey1, pKey2, nKey2, &rc);

      testFree(pKey1);
    }
    tdb_close(pDb2);
  }

  /* Test some lookups. */
  for(j=0; rc==0 && j<nLookupTest; j++){







>




>







189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
      memcpy(pKey1, pKey2, nKey1+1);
      testDatasourceEntry(pData, iKey2, &pKey2, &nKey2, 0, 0);

      testScanCompare(pDb2, pDb, 0, 0, 0,         0, 0,         &rc);
      testScanCompare(pDb2, pDb, 0, 0, 0,         pKey2, nKey2, &rc);
      testScanCompare(pDb2, pDb, 0, pKey1, nKey1, 0, 0,         &rc);
      testScanCompare(pDb2, pDb, 0, pKey1, nKey1, pKey2, nKey2, &rc);
#if 0
      testScanCompare(pDb2, pDb, 1, 0, 0,         0, 0,         &rc);
      testScanCompare(pDb2, pDb, 1, 0, 0,         pKey2, nKey2, &rc);
      testScanCompare(pDb2, pDb, 1, pKey1, nKey1, 0, 0,         &rc);
      testScanCompare(pDb2, pDb, 1, pKey1, nKey1, pKey2, nKey2, &rc);
#endif
      testFree(pKey1);
    }
    tdb_close(pDb2);
  }

  /* Test some lookups. */
  for(j=0; rc==0 && j<nLookupTest; j++){

Changes to lsm-test/lsmtest_tdb.c.

609
610
611
612
613
614
615

616
617
618
619
620
621
622
  const char *zName;
  const char *zDefaultDb;
  int (*xOpen)(const char *zFilename, int bClear, TestDb **ppDb);
} aLib[] = {
  { "sqlite3",      "testdb.sqlite",    sql_open },
  { "lsm_small",    "testdb.lsm_small", test_lsm_small_open },
  { "lsm_lomem",    "testdb.lsm_lomem", test_lsm_lomem_open },

  { "lsm",          "testdb.lsm",       test_lsm_open },
#ifdef LSM_MUTEX_PTHREADS
  { "lsm_mt2",      "testdb.lsm_mt2",   test_lsm_mt2 },
  { "lsm_mt3",      "testdb.lsm_mt3",   test_lsm_mt3 },
#endif
#ifdef HAVE_LEVELDB
  { "leveldb",      "testdb.leveldb",   test_leveldb_open },







>







609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
  const char *zName;
  const char *zDefaultDb;
  int (*xOpen)(const char *zFilename, int bClear, TestDb **ppDb);
} aLib[] = {
  { "sqlite3",      "testdb.sqlite",    sql_open },
  { "lsm_small",    "testdb.lsm_small", test_lsm_small_open },
  { "lsm_lomem",    "testdb.lsm_lomem", test_lsm_lomem_open },
  { "lsm_zip",      "testdb.lsm_zip",   test_lsm_zip_open },
  { "lsm",          "testdb.lsm",       test_lsm_open },
#ifdef LSM_MUTEX_PTHREADS
  { "lsm_mt2",      "testdb.lsm_mt2",   test_lsm_mt2 },
  { "lsm_mt3",      "testdb.lsm_mt3",   test_lsm_mt3 },
#endif
#ifdef HAVE_LEVELDB
  { "leveldb",      "testdb.leveldb",   test_leveldb_open },

Changes to lsm-test/lsmtest_tdb3.c.

388
389
390
391
392
393
394












































395
396
397
398
399
400
401
...
613
614
615
616
617
618
619

620
621
622
623
624
625
626
...
641
642
643
644
645
646
647

648
649
650
651
652
653
654
...
692
693
694
695
696
697
698



699
700
701
702
703
704
705
...
831
832
833
834
835
836
837













838
839
840
841
842
843
844
  sqlite3_free(zFree);
}
/*
** End test VFS code.
**************************************************************************
*************************************************************************/













































static int test_lsm_close(TestDb *pTestDb){
  int i;
  int rc = LSM_OK;
  LsmDb *pDb = (LsmDb *)pTestDb;

  lsm_csr_close(pDb->pCsr);
  lsm_close(pDb->db);
................................................................................
static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *p = (LsmDb *)pArg;
  if( p->xWork ) p->xWork(db, p->pWorkCtx);
}

#define TEST_NO_RECOVERY -1
#define TEST_THREADS     -2


static int test_lsm_config_str(
  LsmDb *pLsm,
  lsm_db *db, 
  int bWorker,
  const char *zStr,
  int *pnThread
................................................................................
    { "use_log",          0, LSM_CONFIG_USE_LOG },
    { "nmerge",           0, LSM_CONFIG_NMERGE },
    { "max_freelist",     0, LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",       0, LSM_CONFIG_MULTIPLE_PROCESSES },
    { "worker_nmerge",    1, LSM_CONFIG_NMERGE },
    { "test_no_recovery", 0, TEST_NO_RECOVERY },
    { "threads",          0, TEST_THREADS },

    { 0, 0 }
  };
  const char *z = zStr;
  int nThread = 1;

  assert( db );
  while( z[0] ){
................................................................................
          switch( eParam ){
            case TEST_NO_RECOVERY:
              pLsm->bNoRecovery = iVal;
              break;
            case TEST_THREADS:
              nThread = iVal;
              break;



          }
        }
      }
    }else if( z!=zStart ){
      goto syntax_error;
    }
  }
................................................................................
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = 
    "page_size=256 block_size=65536 write_buffer=16384 "
    "max_freelist=4 autocheckpoint=32768 "
    "mmap=0 "













  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

lsm_db *tdb_lsm(TestDb *pDb){
  if( pDb->pMethods->xClose==test_lsm_close ){
    return ((LsmDb *)pDb)->db;







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>







 







>







 







>
>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>







388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
...
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
...
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
...
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
...
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
  sqlite3_free(zFree);
}
/*
** End test VFS code.
**************************************************************************
*************************************************************************/

/*************************************************************************
**************************************************************************
** Begin test compression hooks.
*/

/*
** Placeholder xBound callback for the test lsm_compress object. It is not
** implemented yet: it asserts unconditionally, and returns 0 if assert()
** is compiled out. Both parameters are unused.
*/
static int testZipBound(void *pCtx, int nSrc){
  assert( 0 );                   /* Not implemented */
  return 0;
}

/*
** Placeholder xCompress callback for the test lsm_compress object. It is
** not implemented yet: it asserts unconditionally, and returns 0 without
** touching aOut or *pnOut if assert() is compiled out.
*/
static int testZipCompress(
  void *pCtx,                    /* Context pointer */
  char *aOut, int *pnOut,        /* OUT: Buffer containing compressed data */
  const char *aIn, int nIn       /* Buffer containing input data */
){
  assert( 0 );                   /* Not implemented */
  return 0;
}

/*
** Placeholder xUncompress callback for the test lsm_compress object. It is
** not implemented yet: it asserts unconditionally, and returns 0 without
** touching aOut or *pnOut if assert() is compiled out.
*/
static int testZipUncompress(
  void *pCtx,                    /* Context pointer */
  char *aOut, int *pnOut,        /* OUT: Buffer containing uncompressed data */
  const char *aIn, int nIn       /* Buffer containing input data */
){
  assert( 0 );                   /* Not implemented */
  return 0;
}

/*
** Register the test compression callbacks with database handle pDb by
** handing a statically allocated lsm_compress object to lsm_config() with
** the LSM_CONFIG_SET_COMPRESSION option. The value returned by lsm_config()
** is returned to the caller unchanged.
*/
static int testConfigureCompression(lsm_db *pDb){
  static lsm_compress sTestZip = {
    1,                            /* NOTE(review): meaning of field - confirm */
    sizeof(lsm_compress),         /* Size of the lsm_compress structure */
    0,                            /* Context pointer (unused) */
    testZipBound,                 /* xBound method */
    testZipCompress,              /* xCompress method */
    testZipUncompress             /* xUncompress method */
  };
  int rc;

  rc = lsm_config(pDb, LSM_CONFIG_SET_COMPRESSION, &sTestZip);
  return rc;
}

/*
** End test compression hooks.
**************************************************************************
*************************************************************************/

static int test_lsm_close(TestDb *pTestDb){
  int i;
  int rc = LSM_OK;
  LsmDb *pDb = (LsmDb *)pTestDb;

  lsm_csr_close(pDb->pCsr);
  lsm_close(pDb->db);
................................................................................
/*
** Work-hook trampoline. pArg is the LsmDb wrapper object. If the wrapper
** has an xWork callback installed, invoke it with the database handle and
** the wrapper's pWorkCtx pointer. Otherwise this is a no-op.
*/
static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *pWrapper = (LsmDb *)pArg;
  if( pWrapper->xWork==0 ) return;
  pWrapper->xWork(db, pWrapper->pWorkCtx);
}

/*
** Parameter codes for options handled by the test wrapper itself. They are
** matched by name in the option table ("test_no_recovery", "threads" and
** "compression" respectively) and dispatched by a switch on eParam.
*/
#define TEST_NO_RECOVERY -1
#define TEST_THREADS     -2
#define TEST_COMPRESSION -3

static int test_lsm_config_str(
  LsmDb *pLsm,
  lsm_db *db, 
  int bWorker,
  const char *zStr,
  int *pnThread
................................................................................
    { "use_log",          0, LSM_CONFIG_USE_LOG },
    { "nmerge",           0, LSM_CONFIG_NMERGE },
    { "max_freelist",     0, LSM_CONFIG_MAX_FREELIST },
    { "multi_proc",       0, LSM_CONFIG_MULTIPLE_PROCESSES },
    { "worker_nmerge",    1, LSM_CONFIG_NMERGE },
    { "test_no_recovery", 0, TEST_NO_RECOVERY },
    { "threads",          0, TEST_THREADS },
    { "compression",      0, TEST_COMPRESSION },
    { 0, 0 }
  };
  const char *z = zStr;
  int nThread = 1;

  assert( db );
  while( z[0] ){
................................................................................
          switch( eParam ){
            case TEST_NO_RECOVERY:
              pLsm->bNoRecovery = iVal;
              break;
            case TEST_THREADS:
              nThread = iVal;
              break;
            case TEST_COMPRESSION:
              testConfigureCompression(db);
              break;
          }
        }
      }
    }else if( z!=zStart ){
      goto syntax_error;
    }
  }
................................................................................
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = 
    "page_size=256 block_size=65536 write_buffer=16384 "
    "max_freelist=4 autocheckpoint=32768 "
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

/*
** Open an lsm test database with the compression hooks enabled.
**
** The configuration string below is passed to testLsmOpen() together with
** the file name, the bClear flag and the output pointer. The value returned
** by testLsmOpen() is returned unchanged.
**
** Each option in the string must be followed by a space. Adjacent string
** literals are concatenated by the compiler, so a literal that does not end
** in a space runs straight into the first option of the next literal.
*/
int test_lsm_zip_open(
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
  const char *zCfg = 
    "page_size=256 block_size=65536 write_buffer=16384 "
    "max_freelist=4 autocheckpoint=32768 compression=1 "
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

lsm_db *tdb_lsm(TestDb *pDb){
  if( pDb->pMethods->xClose==test_lsm_close ){
    return ((LsmDb *)pDb)->db;

Changes to src/lsm.h.

211
212
213
214
215
216
217













218
219
220
221
222
223
224
225
226
227
228
229
230



231
232
233
234
235
236
237
**
**   LSM_CONFIG_MULTIPLE_PROCESSES
**     A read/write boolean parameter. This parameter may only be set before
**     lsm_open() has been called. If true, the library uses shared-memory
**     and posix advisory locks to co-ordinate access by clients from within
**     multiple processes. Otherwise, if false, all database clients must be 
**     located in the same process. The default value is true.













*/
#define LSM_CONFIG_WRITE_BUFFER        1
#define LSM_CONFIG_PAGE_SIZE           2
#define LSM_CONFIG_SAFETY              3
#define LSM_CONFIG_BLOCK_SIZE          4
#define LSM_CONFIG_AUTOWORK            5
#define LSM_CONFIG_LOG_SIZE            6
#define LSM_CONFIG_MMAP                7
#define LSM_CONFIG_USE_LOG             8
#define LSM_CONFIG_NMERGE              9
#define LSM_CONFIG_MAX_FREELIST       10
#define LSM_CONFIG_MULTIPLE_PROCESSES 11
#define LSM_CONFIG_AUTOCHECKPOINT     12




#define LSM_SAFETY_OFF    0
#define LSM_SAFETY_NORMAL 1
#define LSM_SAFETY_FULL   2


/*







>
>
>
>
>
>
>
>
>
>
>
>
>













>
>
>







211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
**
**   LSM_CONFIG_MULTIPLE_PROCESSES
**     A read/write boolean parameter. This parameter may only be set before
**     lsm_open() has been called. If true, the library uses shared-memory
**     and posix advisory locks to co-ordinate access by clients from within
**     multiple processes. Otherwise, if false, all database clients must be 
**     located in the same process. The default value is true.
**
**   LSM_CONFIG_SET_COMPRESSION
**     Set the compression methods used to compress and decompress database
**     content. The argument to this option should be a pointer to a structure
**     of type lsm_compress. The lsm_config() method takes a copy of the 
**     structure's contents.
**
**     This option may only be used before lsm_open() is called. Invoking it
**     after lsm_open() has been called results in an LSM_MISUSE error.
**
**   LSM_CONFIG_GET_COMPRESSION
**     Query the compression methods used to compress and decompress database
**     content.
*/
#define LSM_CONFIG_WRITE_BUFFER        1
#define LSM_CONFIG_PAGE_SIZE           2
#define LSM_CONFIG_SAFETY              3
#define LSM_CONFIG_BLOCK_SIZE          4
#define LSM_CONFIG_AUTOWORK            5
#define LSM_CONFIG_LOG_SIZE            6
#define LSM_CONFIG_MMAP                7
#define LSM_CONFIG_USE_LOG             8
#define LSM_CONFIG_NMERGE              9
#define LSM_CONFIG_MAX_FREELIST       10
#define LSM_CONFIG_MULTIPLE_PROCESSES 11
#define LSM_CONFIG_AUTOCHECKPOINT     12

/* Compression-method options. See the LSM_CONFIG_SET_COMPRESSION and
** LSM_CONFIG_GET_COMPRESSION entries in the comment above. */
#define LSM_CONFIG_SET_COMPRESSION    13
#define LSM_CONFIG_GET_COMPRESSION    14

#define LSM_SAFETY_OFF    0
#define LSM_SAFETY_NORMAL 1
#define LSM_SAFETY_FULL   2


/*

Changes to src/lsmInt.h.

300
301
302
303
304
305
306

307
308
309
310
311
312
313
  int bUseLog;                    /* Configured by LSM_CONFIG_USE_LOG */
  int nDfltPgsz;                  /* Configured by LSM_CONFIG_PAGE_SIZE */
  int nDfltBlksz;                 /* Configured by LSM_CONFIG_BLOCK_SIZE */
  int nMaxFreelist;               /* Configured by LSM_CONFIG_MAX_FREELIST */
  int bMmap;                      /* Configured by LSM_CONFIG_MMAP */
  int nAutockpt;                  /* Configured by LSM_CONFIG_AUTOCHECKPOINT */
  int bMultiProc;                 /* Configured by L_C_MULTIPLE_PROCESSES */


  /* Sub-system handles */
  FileSystem *pFS;                /* On-disk portion of database */
  Database *pDatabase;            /* Database shared data */

  /* Client transaction context */
  Snapshot *pClient;              /* Client snapshot (non-NULL in read trans) */







>







300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
  int bUseLog;                    /* Configured by LSM_CONFIG_USE_LOG */
  int nDfltPgsz;                  /* Configured by LSM_CONFIG_PAGE_SIZE */
  int nDfltBlksz;                 /* Configured by LSM_CONFIG_BLOCK_SIZE */
  int nMaxFreelist;               /* Configured by LSM_CONFIG_MAX_FREELIST */
  int bMmap;                      /* Configured by LSM_CONFIG_MMAP */
  int nAutockpt;                  /* Configured by LSM_CONFIG_AUTOCHECKPOINT */
  int bMultiProc;                 /* Configured by L_C_MULTIPLE_PROCESSES */
  lsm_compress compress;          /* Compression callbacks */

  /* Sub-system handles */
  FileSystem *pFS;                /* On-disk portion of database */
  Database *pDatabase;            /* Database shared data */

  /* Client transaction context */
  Snapshot *pClient;              /* Client snapshot (non-NULL in read trans) */

Changes to src/lsm_file.c.

167
168
169
170
171
172
173

174
175
176
177
178
179
180
...
239
240
241
242
243
244
245
















246
247
248
249
250
251
252
...
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
...
453
454
455
456
457
458
459



460
461
462
463
464
465
466
...
585
586
587
588
589
590
591




592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
...
608
609
610
611
612
613
614




615
616
617
618
619
620
621
622
623
624




625
626
627
628
629
630
631
632
633
634


635
636
637
638
639
640
641

642
643
644
645
646
647


648
649
650
651
652
653
654
655


656
657
658
659
660
661
662
663
664
665
666
667
...
798
799
800
801
802
803
804





































805
806
807
808
809
810
811
812
813
814
815
816

817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
...
843
844
845
846
847
848
849

850
851
852
853
854
855
856
...
909
910
911
912
913
914
915
916


917
918
919
920
921
922
923
...
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
....
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
....
1233
1234
1235
1236
1237
1238
1239
1240




1241
1242
1243
1244
1245
1246
1247
....
1290
1291
1292
1293
1294
1295
1296

















1297
1298
1299
1300
1301
1302
1303
....
1334
1335
1336
1337
1338
1339
1340

1341
1342
1343
1344
1345
1346
1347
....
1384
1385
1386
1387
1388
1389
1390






























1391
1392
1393
1394
1395
1396
1397
1398
1399

1400
1401
1402
1403
1404
1405
1406
....
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527

1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
....
1610
1611
1612
1613
1614
1615
1616

1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631







1632
1633
1634
1635
1636
1637
1638
....
1859
1860
1861
1862
1863
1864
1865


1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
....
1908
1909
1910
1911
1912
1913
1914


1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
  LsmFile *pLsmFile;
  lsm_file *fdDb;                 /* Database file */
  lsm_file *fdLog;                /* Log file */

  /* If this is a compressed database, a pointer to the compression methods.
  ** For an uncompressed database, a NULL pointer.  */
  lsm_compress *pCompress;


  /* mmap() mode things */
  int bUseMmap;                   /* True to use mmap() to access db file */
  void *pMap;                     /* Current mapping of database file */
  i64 nMap;                       /* Bytes mapped at pMap */
  Page *pFree;

................................................................................

/*
** Number of pgsz byte pages omitted from the start of block 1. The start
** of block 1 contains two 4096 byte meta pages (8192 bytes in total).
**
** The result is 8192/pgsz using integer division, but never less than 1.
*/
#define BLOCK1_HDR_SIZE(pgsz)  LSM_MAX(1, 8192/(pgsz))


















/*
** Wrappers around the VFS methods of the lsm_env object:
**
**     lsmEnvOpen()
**     lsmEnvRead()
**     lsmEnvWrite()
................................................................................
/*
** Invoke the xRead method of environment pEnv to read nRead bytes from
** offset iOff of file pFile into buffer pRead. Returns whatever xRead
** returns.
*/
static int lsmEnvRead(
  lsm_env *pEnv,                  /* Environment supplying xRead */
  lsm_file *pFile,                /* File handle to read from */
  lsm_i64 iOff,                   /* Offset passed to xRead */
  void *pRead,                    /* Buffer passed to xRead */
  int nRead                       /* Byte count passed to xRead */
){
  int rc = pEnv->xRead(pFile, iOff, pRead, nRead);
  return rc;
}
/*
** Invoke the xWrite method of environment pEnv to write the nWrite bytes
** in buffer pWrite to offset iOff of file pFile. Returns whatever xWrite
** returns.
*/
static int lsmEnvWrite(
  lsm_env *pEnv,                  /* Environment supplying xWrite */
  lsm_file *pFile,                /* File handle to write to */
  lsm_i64 iOff,                   /* Offset passed to xWrite */
  void *pWrite,                   /* Buffer passed to xWrite */
  int nWrite                      /* Byte count passed to xWrite */
){
  int rc = pEnv->xWrite(pFile, iOff, pWrite, nWrite);
  return rc;
}
/* Invoke pEnv->xSync() on file pFile and return its result. */
static int lsmEnvSync(lsm_env *pEnv, lsm_file *pFile){
  int rc = pEnv->xSync(pFile);
  return rc;
}
/* Invoke pEnv->xSectorSize() on file pFile and return its result. */
static int lsmEnvSectorSize(lsm_env *pEnv, lsm_file *pFile){
  int szSector = pEnv->xSectorSize(pFile);
  return szSector;
}
/* Invoke pEnv->xClose() on file pFile and return its result. */
int lsmEnvClose(lsm_env *pEnv, lsm_file *pFile){
  int rc = pEnv->xClose(pFile);
  return rc;
}
/* Invoke pEnv->xTruncate() on file pFile with size argument nByte and
** return its result. */
static int lsmEnvTruncate(lsm_env *pEnv, lsm_file *pFile, lsm_i64 nByte){
  int rc = pEnv->xTruncate(pFile, nByte);
  return rc;
}
/* Invoke pEnv->xUnlink() for the file named zDel and return its result.
** Unlike the other wrappers, the environment itself is passed as the first
** argument of the method. */
static int lsmEnvUnlink(lsm_env *pEnv, const char *zDel){
  int rc = pEnv->xUnlink(pEnv, zDel);
  return rc;
}
static int lsmEnvRemap(
  lsm_env *pEnv, 
  lsm_file *pFile, 
  i64 szMin,
  void **ppMap,
  i64 *pszMap
................................................................................
    pFS->zLog = &pFS->zDb[nDb+1];
    pFS->nPagesize = LSM_DFLT_PAGE_SIZE;
    pFS->nBlocksize = LSM_DFLT_BLOCK_SIZE;
    pFS->nMetasize = 4 * 1024;
    pFS->pDb = pDb;
    pFS->pEnv = pDb->pEnv;
    pFS->bUseMmap = pDb->bMmap;




    /* Make a copy of the database and log file names. */
    memcpy(pFS->zDb, zDb, nDb+1);
    memcpy(pFS->zLog, zDb, nDb);
    memcpy(&pFS->zLog[nDb], "-log", 5);

    /* Allocate the hash-table here. At some point, it should be changed
................................................................................
/*
** Set the block size stored in file-system object pFS to nBlocksize. The
** value is stored as supplied; no validation is performed here.
*/
void lsmFsSetBlockSize(FileSystem *pFS, int nBlocksize){
  pFS->nBlocksize = nBlocksize;
}

/*
** Return the page number of the first page on block iBlock. Blocks are
** numbered starting from 1.




*/
static Pgno fsFirstPageOnBlock(FileSystem *pFS, int iBlock){
  Pgno iPg;
  if( pFS->pCompress ){
    if( iBlock==1 ){
      iPg = pFS->nMetasize * 2;
    }else{
      iPg = pFS->nBlocksize * (Pgno)(iBlock-1);
    }
  }else{
    const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
    if( iBlock==1 ){
      iPg = 1 + ((pFS->nMetasize*2 + pFS->nPagesize - 1) / pFS->nPagesize);
    }else{
      iPg = 1 + (iBlock-1) * nPagePerBlock;
................................................................................
  }
  return iPg;
}

/*
** Return the page number of the last page on block iBlock. Blocks are
** numbered starting from 1.
**
** For a compressed database (pFS->pCompress is not NULL) the value is
** (nBlocksize * iBlock - 1). Otherwise it is iBlock multiplied by the
** number of pages that fit in one block.
*/
static Pgno fsLastPageOnBlock(FileSystem *pFS, int iBlock){
  Pgno iLast;
  if( pFS->pCompress==0 ){
    int nPerBlock = pFS->nBlocksize / pFS->nPagesize;
    iLast = iBlock * nPerBlock;
  }else{
    iLast = pFS->nBlocksize * (Pgno)iBlock - 1;
  }
  return iLast;
}





/*
** Return the number of the block that page iPg belongs to. Blocks are
** numbered starting from 1.
**
** For a compressed database iPg is divided directly by the block size.
** Otherwise (iPg-1) is divided by the number of pages per block.
*/
static int fsPageToBlock(FileSystem *pFS, Pgno iPg){
  int iBlk;
  if( pFS->pCompress ){
    iBlk = (iPg / pFS->nBlocksize) + 1;
  }else{
    int nPerBlock = pFS->nBlocksize / pFS->nPagesize;
    iBlk = 1 + ((iPg-1) / nPerBlock);
  }
  return iBlk;
}

/*
** Return true if page iPg is the last page on its block.
**
** Not implemented for compressed databases: in that case the function
** asserts, and returns 0 if assert() is compiled out.
*/
static int fsIsLast(FileSystem *pFS, Pgno iPg){
  int nPerBlock;                  /* Pages per block */

  if( pFS->pCompress ){
    assert( 0 );
    return 0;
  }

  /* A non-zero page number that is an exact multiple of the number of
  ** pages per block is the final page of its block. */
  nPerBlock = pFS->nBlocksize / pFS->nPagesize;
  return ( iPg!=0 && (iPg % nPerBlock)==0 );
}

/*
** Return true if page iPg is the first page on its block.
**
** Not implemented for compressed databases: in that case the function
** asserts, and returns 0 if assert() is compiled out.
*/
static int fsIsFirst(FileSystem *pFS, Pgno iPg){
  int nPerBlock;                  /* Pages per block */

  if( pFS->pCompress ){
    assert( 0 );
    return 0;
  }

  nPerBlock = pFS->nBlocksize / pFS->nPagesize;

  /* The usual case: the page number is one past a multiple of the
  ** pages-per-block count. */
  if( (iPg % nPerBlock)==1 ) return 1;

  /* Otherwise only the first usable page of block 1 qualifies. */
  return ( iPg<nPerBlock && iPg==fsFirstPageOnBlock(pFS, 1) );
}

/*
** Given a page reference, return a pointer to the in-memory buffer of the
** pages contents. If parameter pnData is not NULL, set *pnData to the size
** of the buffer in bytes before returning.
*/
u8 *lsmFsPageData(Page *pPage, int *pnData){
................................................................................
        pFix->aData = &aData[pFS->nPagesize * (i64)(pFix->iPg-1)];
      }
      lsmSortedRemap(pFS->pDb);
    }
    *pRc = rc;
  }
}






































/* fsBlockNext() is defined later in this file. Declare it here so that
** fsReadData() can call it; a repeated declaration is harmless. */
static int fsBlockNext(FileSystem *pFS, int iBlock, int *piNext);

/*
** This function is only called in compressed database mode.
**
** Read nData bytes of data starting at file offset iOff into buffer aData.
** If the requested range extends past the end of the block that contains
** iOff (as reported by fsLastPageOnBlock()), only the bytes up to the end
** of that block are read from iOff. The remainder is read from the first
** offset of the next block in the chain, as reported by fsBlockNext().
**
** Returns LSM_OK if successful, or the error code returned by
** lsmEnvRead() or fsBlockNext() otherwise.
**
** NOTE(review): at most one block boundary is crossed. A read spanning
** three or more blocks is not handled.
*/
static int fsReadData(
  FileSystem *pFS,                /* File-system handle */
  i64 iOff,                       /* Read data from this offset */
  u8 *aData,                      /* Buffer to read data into */
  int nData                       /* Number of bytes to read */
){
  i64 iEob;                       /* End of block */
  int nRead;                      /* Bytes to read from the first block */
  int rc;                         /* Return code */

  assert( pFS->pCompress );

  /* Never read more than nData bytes, and never read past the end of the
  ** block containing iOff. */
  iEob = fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iOff)) + 1;
  nRead = nData;
  if( (iEob - iOff)<(i64)nData ){
    nRead = (int)(iEob - iOff);
  }

  rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nRead);
  if( rc==LSM_OK && nRead!=nData ){
    int iBlk;                     /* Next block in the chain */

    rc = fsBlockNext(pFS, fsPageToBlock(pFS, iOff), &iBlk);
    if( rc==LSM_OK ){
      /* The rest of the data starts at the beginning of the next block. */
      i64 iOff2 = fsFirstPageOnBlock(pFS, iBlk);
      rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff2, &aData[nRead], nData-nRead);
    }
  }

  return rc;
}

/*
................................................................................
*/
static int fsReadPagedata(
  FileSystem *pFS,                /* File-system handle */
  Page *pPg                       /* Page to read and uncompress data for */
){
  i64 iOff;
  u8 aVarint[9];


  assert( pFS->pCompress && pPg->nCompress==0 );

  iOff = pPg->iPg;
  rc = fsReadData(pFS, iOff, aVarint, sizeof(aVarint));
  if( rc==LSM_OK ){
    iOff += lsmVarintGet32(aVarint, &pPg->nCompress);
................................................................................
    if( p==0 ){
      rc = fsPageBuffer(pFS, 1, &p);
      if( rc==LSM_OK ){
        p->iPg = iPg;
        p->nRef = 0;
        p->pFS = pFS;
        assert( p->flags==0 || p->flags==PAGE_FREE );
        if( fsIsLast(pFS, iPg) || fsIsFirst(pFS, iPg) ) p->flags |= PAGE_SHORT;



#ifdef LSM_DEBUG
        memset(p->aData, 0x56, pFS->nPagesize);
#endif
        assert( p->pLruNext==0 && p->pLruPrev==0 );
        if( noContent==0 ){
          if( pFS->pCompress ){
................................................................................
  }else{
    assert( 0 );
  }
  return rc;
}


/*
** Parameter iBlock is a database file block. This function reads the value 
** stored in the block's "next block" pointer and stores it in *piNext.
** LSM_OK is returned if everything is successful, or an LSM error code
** otherwise.
**
** In compressed mode the pointer is read directly from the final 4 bytes
** of the block. Otherwise it is read from the final 4 bytes of the last
** page on the block, and the page number stored there is converted to a
** block number with fsPageToBlock().
*/
static int fsBlockNext(
  FileSystem *pFS,                /* File-system object handle */
  int iBlock,                     /* Read field from this block */
  int *piNext                     /* OUT: Next block in linked list */
){
  int rc;

  assert( pFS->bUseMmap==0 || pFS->pCompress==0 );
  if( pFS->pCompress ){
    u8 aNext[4];                  /* 4-byte pointer read from db file */
    i64 iOff;                     /* File offset of the 4-byte pointer */

    /* aNext[] must be declared before sizeof(aNext) can be used. */
    iOff = (i64)iBlock * pFS->nBlocksize - (i64)sizeof(aNext);
    rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aNext, sizeof(aNext));
    if( rc==LSM_OK ){
      *piNext = (int)lsmGetU32(aNext);
    }
  }else{
    const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
    Page *pLast;
    rc = fsPageGet(pFS, iBlock*nPagePerBlock, 0, &pLast);
    if( rc==LSM_OK ){
      *piNext = fsPageToBlock(pFS, lsmGetU32(&pLast->aData[pFS->nPagesize-4]));
      lsmFsPageRelease(pLast);
    }
  }
  return rc;
}

static int fsRunEndsBetween(
  Segment *pRun, 
  Segment *pIgnore, 
  Pgno iFirst, 
  Pgno iLast
){
  return (pRun!=pIgnore && (
................................................................................
  int nByte;

  assert( pFS->pCompress );

  iEob = 1 + fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iPg));
  nByte = 2 * lsmVarintLen32(pPg->nCompress) + pPg->nCompress;

  if( (iPg + nByte)<=iEob && (iPg + nByte - 1)==pSeg->iLastPg ){
    *piNext = 0;
  }else if( (iPg + nByte)>=iEob ){
    int iNext;
    Pgno iNextPg;

    rc = fsBlockNext(pFS, fsPageToBlock(pFS, iPg), &iNext);
    iNextPg = fsFirstPageOnBlock(pFS, iNext) + (nByte - (iEob-iPg));
    if( pSeg->iLastPg==(iNextPg-1) ){
      iNextPg = 0;
    }
    *piNext = iNextPg;
  }else{
    *piNext = iPg + nByte;
  }

................................................................................
  FileSystem *pFS = pPg->pFS;
  Pgno iPg = pPg->iPg;

  if( pFS->pCompress ){
    if( eDir<0 ){
      assert( 0 );
    }else{
      rc = fsNextPageOffset(pRun, pPg, &iPg);




    }
  }else{
    assert( eDir==1 || eDir==-1 );
    if( eDir<0 ){
      if( pRun && iPg==pRun->iFirst ){
        *ppNext = 0;
        return LSM_OK;
................................................................................
  int rc = LSM_OK;
  Page *pPg = 0;
  *ppOut = 0;
  int iApp = 0;
  int iNext = 0;
  int iPrev = p->iLastPg;


















  if( iPrev==0 ){
    iApp = findAppendPoint(pFS);
  }else if( fsIsLast(pFS, iPrev) ){
    int iNext;
    rc = fsBlockNext(pFS, fsPageToBlock(pFS, iPrev), &iNext);
    if( rc!=LSM_OK ) return rc;
    iApp = fsFirstPageOnBlock(pFS, iNext);
................................................................................
    if( fsIsLast(pFS, iApp) ){
      lsmPutU32(&pPg->aData[pFS->nPagesize-4], iNext);
    }else 
      if( fsIsFirst(pFS, iApp) ){
        lsmPutU32(&pPg->aData[pFS->nPagesize-4], iPrev);
      }
  }


  *ppOut = pPg;
  return rc;
}

/*
** Mark the sorted run passed as the second argument as finished. 
................................................................................
/*
** Obtain a reference to page number iPg and store it in *ppPg. The return
** code is whatever fsPageGet() returns.
*/
int lsmFsDbPageGet(FileSystem *pFS, Pgno iPg, Page **ppPg){
  int rc;
  assert( pFS );
  rc = fsPageGet(pFS, iPg, 0, ppPg);
  return rc;
}































/*
** Obtain a reference to the last page in the segment passed as the 
** second argument, and store it in *ppPg. The return code is whatever
** fsPageGet() returns.
**
** Not implemented for compressed databases: an assert() fails in that
** case.
*/
int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg){
  Pgno iPg;                       /* Page number of last page in segment */
  int rc;

  iPg = pSeg->iLastPg;
  if( pFS->pCompress ){
    assert( 0 );
  }
  rc = fsPageGet(pFS, iPg, 0, ppPg);
  return rc;
}

/*
** Return a reference to meta-page iPg. If successful, LSM_OK is returned
** and *ppPg populated with the new page reference. The reference should
................................................................................
  if( rc==LSM_OK ){
    int nRem;
    int nWrite;
    Pgno iApp = pSeg->iLastPg+1;

    /* If this is the first data written into the segment, find an append-point
    ** or allocate a new block.  */
    if( iApp==0 ){
      iApp = findAppendPoint(pFS);
    }
    if( iApp==0 ){
      int iBlk;
      rc = lsmBlockAllocate(pFS->pDb, &iBlk);
      iApp = fsFirstPageOnBlock(pFS, iBlk);

    }

    iRet = iApp;

    /* Write as much data as is possible at iApp (usually all of it). */
    if( rc==LSM_OK ){
      int nSpace = fsLastPageOnBlock(pFS, fsPageToBlock(iApp)) - iApp - 1;
      nWrite = LSM_MAX(nData, nSpace);
      nRem = nData - nWrite;
      rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aData, nWrite);

      iApp += nWrite;
    }

    /* If required, allocate a new block and write the rest of the data
................................................................................
*/
int lsmFsPagePersist(Page *pPg){
  int rc = LSM_OK;
  if( pPg && (pPg->flags & PAGE_DIRTY) ){
    FileSystem *pFS = pPg->pFS;

    if( pFS->pCompress ){

      u8 aVarint[10];             /* pPg->nCompress as a varint */
      int nVarint;                /* Length of varint stored in aVarint[] */
      assert( pPg->pSeg && pPg->iPg==0 && pPg->nCompress==0 );

      /* Compress the page image. */
      rc = fsCompressIntoBuffer(pFS, pPg);

      /* Serialize the compressed size into buffer aVarint[] */
      nVarint = lsmVarintPut64(aVarint, pPg->nCompress);
      aVarint[nVarint] = aVarint[0];

      /* Write the serialized page record into the database file. */
      pPg->iPg = fsAppendData(pSeg, aVarint, nVarint, &rc);
      fsAppendData(pSeg, pFS->aBuffer, pPg->nCompress, &rc);
      fsAppendData(pSeg, &aVarint[1], nVarint, &rc);








    }else{
      i64 iOff;                   /* Offset to write within database file */
      iOff = (i64)pFS->nPagesize * (i64)(pPg->iPg-1);
      if( pFS->bUseMmap==0 ){
        rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, pPg->aData,pFS->nPagesize);
      }else if( pPg->flags & PAGE_FREE ){
................................................................................
** This function also checks that there are no references to blocks with
** out-of-range block numbers.
**
** If no errors are found, non-zero is returned. If an error is found, an
** assert() fails.
*/
int lsmFsIntegrityCheck(lsm_db *pDb){


  int i;
  int j;
  Freelist freelist = {0, 0, 0};
  FileSystem *pFS = pDb->pFS;
  u8 *aUsed;
  Level *pLevel;
  Snapshot *pWorker = pDb->pWorker;
  int nBlock = pWorker->nBlock;

  aUsed = lsmMallocZero(pDb->pEnv, nBlock);
  if( aUsed==0 ){
................................................................................
    }
  }

  for(i=0; i<nBlock; i++) assert( aUsed[i]==1 );

  lsmFree(pDb->pEnv, aUsed);
  lsmFree(pDb->pEnv, freelist.aEntry);


  return 1;
}

#ifndef NDEBUG
/*
** Return true if pPg happens to be the last page in segment pSeg. Or false
** otherwise. This function is only invoked as part of assert() conditions.
**
** For a compressed database the answer is derived from
** fsNextPageOffset(): the page is treated as the last one if that call
** fails or reports a next-page offset of zero. Otherwise the page number
** is compared directly against pSeg->iLastPg.
*/
int lsmFsDbPageIsLast(Segment *pSeg, Page *pPg){
  if( pPg->pFS->pCompress ){
    Pgno iNext = 0;
    /* rc must be declared locally; it was previously used undeclared. */
    int rc = fsNextPageOffset(pSeg, pPg, &iNext);
    return (rc!=LSM_OK || iNext==0);
  }
  return (pPg->iPg==pSeg->iLastPg);
}
#endif







>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|





|


|


|





|


|


|







 







>
>
>







 







>
>
>
>





|

|







 







>
>
>
>



|






>
>
>
>










>
>


<
<
<
<
|
>
|
|
<



>
>


<
<
<
<
|
<
>
>
|



<







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>












>




|








|







 







>







 







|
>
>







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







|







|







 







|
>
>
>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>








|
>







 







|
|
<



|
>






|
|







 







>












|
|
|
>
>
>
>
>
>
>







 







>
>



<







 







>
>











|





167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
...
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
...
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
...
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
...
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
...
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670




671
672
673
674

675
676
677
678
679
680
681




682

683
684
685
686
687
688

689
690
691
692
693
694
695
...
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
...
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
...
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
....
1051
1052
1053
1054
1055
1056
1057

































1058
1059
1060
1061
1062
1063
1064
....
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
....
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
....
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
....
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
....
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
....
1603
1604
1605
1606
1607
1608
1609
1610
1611

1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
....
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
....
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967

1968
1969
1970
1971
1972
1973
1974
....
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
  LsmFile *pLsmFile;
  lsm_file *fdDb;                 /* Database file */
  lsm_file *fdLog;                /* Log file */

  /* If this is a compressed database, a pointer to the compression methods.
  ** For an uncompressed database, a NULL pointer.  */
  lsm_compress *pCompress;
  u8 *aBuffer;                    /* Buffer to compress into */

  /* mmap() mode things */
  int bUseMmap;                   /* True to use mmap() to access db file */
  void *pMap;                     /* Current mapping of database file */
  i64 nMap;                       /* Bytes mapped at pMap */
  Page *pFree;

................................................................................

/*
** Number of pgsz byte pages omitted from the start of block 1. The start
** of block 1 contains two 4096 byte meta pages (8192 bytes in total).
*/
#define BLOCK1_HDR_SIZE(pgsz)  LSM_MAX(1, 8192/(pgsz))

/*
** If NDEBUG is not defined, set a breakpoint in function lsmIoerrBkpt()
** to catch IO errors. 
*/
#ifndef NDEBUG
/*
** Debugging hook: set a breakpoint in this function to catch IO errors.
** Each call increments a static counter; the updated count is returned
** so that the function, which is declared to return int, always yields
** a well-defined value.
*/
static int lsmIoerrBkpt(void){
  static int nErr = 0;            /* Number of calls made so far */
  nErr++;
  return nErr;
}
/*
** Pass return code rc through unchanged. If rc is anything other than
** LSM_OK, lsmIoerrBkpt() is invoked first so that a debugger breakpoint
** set there fires.
*/
static int IOERR_WRAPPER(int rc){
  if( rc==LSM_OK ){
    return rc;
  }
  lsmIoerrBkpt();
  return rc;
}
#else
# define IOERR_WRAPPER(rc) (rc)
#endif

/*
** Wrappers around the VFS methods of the lsm_env object:
**
**     lsmEnvOpen()
**     lsmEnvRead()
**     lsmEnvWrite()
................................................................................
/*
** Invoke the xRead method of environment pEnv on file pFile, asking for
** nRead bytes at offset iOff to be placed in buffer pRead. The method's
** return code is routed through IOERR_WRAPPER() before being returned.
*/
static int lsmEnvRead(
  lsm_env *pEnv, 
  lsm_file *pFile, 
  lsm_i64 iOff, 
  void *pRead, 
  int nRead
){
  int rc = pEnv->xRead(pFile, iOff, pRead, nRead);
  return IOERR_WRAPPER(rc);
}
/*
** Invoke the xWrite method of environment pEnv on file pFile, passing
** the nWrite bytes in buffer pWrite and the offset iOff. The const
** qualifier is cast away because xWrite is called with a plain (void *).
** The method's return code is routed through IOERR_WRAPPER().
*/
static int lsmEnvWrite(
  lsm_env *pEnv, 
  lsm_file *pFile, 
  lsm_i64 iOff, 
  const void *pWrite, 
  int nWrite
){
  int rc = pEnv->xWrite(pFile, iOff, (void *)pWrite, nWrite);
  return IOERR_WRAPPER(rc);
}
/*
** Invoke the xSync method of environment pEnv on file pFile and return
** its result via IOERR_WRAPPER().
*/
static int lsmEnvSync(lsm_env *pEnv, lsm_file *pFile){
  int rc = pEnv->xSync(pFile);
  return IOERR_WRAPPER(rc);
}
/*
** Return the value reported by the xSectorSize method of environment
** pEnv for file pFile. The value is not passed through IOERR_WRAPPER().
*/
static int lsmEnvSectorSize(lsm_env *pEnv, lsm_file *pFile){
  int szSector = pEnv->xSectorSize(pFile);
  return szSector;
}
/*
** Invoke the xClose method of environment pEnv on file pFile and return
** its result via IOERR_WRAPPER().
*/
int lsmEnvClose(lsm_env *pEnv, lsm_file *pFile){
  int rc = pEnv->xClose(pFile);
  return IOERR_WRAPPER(rc);
}
/*
** Invoke the xTruncate method of environment pEnv on file pFile with
** size argument nByte, returning its result via IOERR_WRAPPER().
*/
static int lsmEnvTruncate(lsm_env *pEnv, lsm_file *pFile, lsm_i64 nByte){
  int rc = pEnv->xTruncate(pFile, nByte);
  return IOERR_WRAPPER(rc);
}
/*
** Invoke the xUnlink method of environment pEnv for the file named zDel.
** Unlike the file-handle methods, xUnlink receives the environment
** itself as its first argument. The result is returned via
** IOERR_WRAPPER().
*/
static int lsmEnvUnlink(lsm_env *pEnv, const char *zDel){
  int rc = pEnv->xUnlink(pEnv, zDel);
  return IOERR_WRAPPER(rc);
}
static int lsmEnvRemap(
  lsm_env *pEnv, 
  lsm_file *pFile, 
  i64 szMin,
  void **ppMap,
  i64 *pszMap
................................................................................
    pFS->zLog = &pFS->zDb[nDb+1];
    pFS->nPagesize = LSM_DFLT_PAGE_SIZE;
    pFS->nBlocksize = LSM_DFLT_BLOCK_SIZE;
    pFS->nMetasize = 4 * 1024;
    pFS->pDb = pDb;
    pFS->pEnv = pDb->pEnv;
    pFS->bUseMmap = pDb->bMmap;
    if( pDb->compress.xCompress ){
      pFS->pCompress = &pDb->compress;
    }

    /* Make a copy of the database and log file names. */
    memcpy(pFS->zDb, zDb, nDb+1);
    memcpy(pFS->zLog, zDb, nDb);
    memcpy(&pFS->zLog[nDb], "-log", 5);

    /* Allocate the hash-table here. At some point, it should be changed
................................................................................
/*
** Record nBlocksize as the block size used by file-system object pFS.
** The value is stored as-is; no validation is performed here.
*/
void lsmFsSetBlockSize(
  FileSystem *pFS,                /* File-system object to configure */
  int nBlocksize                  /* New value for pFS->nBlocksize */
){
  pFS->nBlocksize = nBlocksize;
}

/*
** Return the page number of the first page on block iBlock. Blocks are
** numbered starting from 1.
**
** For a compressed database, page numbers are byte offsets. The first
** page on each block is the byte offset immediately following the 4-byte
** "previous block" pointer at the start of each block.
*/
static Pgno fsFirstPageOnBlock(FileSystem *pFS, int iBlock){
  Pgno iPg;
  if( pFS->pCompress ){
    if( iBlock==1 ){
      iPg = pFS->nMetasize * 2 + 4;
    }else{
      iPg = pFS->nBlocksize * (Pgno)(iBlock-1) + 4;
    }
  }else{
    const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize);
    if( iBlock==1 ){
      iPg = 1 + ((pFS->nMetasize*2 + pFS->nPagesize - 1) / pFS->nPagesize);
    }else{
      iPg = 1 + (iBlock-1) * nPagePerBlock;
................................................................................
  }
  return iPg;
}

/*
** Return the page number of the last page on block iBlock. Blocks are
** numbered starting from 1.
**
** For a compressed database, page numbers are byte offsets. The last
** page on each block is the byte offset of the byte immediately before 
** the 4-byte "next block" pointer at the end of each block.
*/
static Pgno fsLastPageOnBlock(FileSystem *pFS, int iBlock){
  if( pFS->pCompress==0 ){
    /* Uncompressed: blocks hold a whole number of fixed-size pages. */
    const int nPerBlk = (pFS->nBlocksize / pFS->nPagesize);
    return iBlock * nPerBlk;
  }

  /* Compressed: end of block, less one byte, less the 4-byte pointer. */
  return pFS->nBlocksize * (Pgno)iBlock - 1 - 4;
}

/*
** Map page number iPg to the number of the block that contains it.
** Block numbering starts at 1.
**
** In compressed mode iPg is divided by the block size directly; in
** uncompressed mode (iPg-1) is divided by the number of pages per block.
*/
static int fsPageToBlock(FileSystem *pFS, Pgno iPg){
  if( pFS->pCompress==0 ){
    return 1 + ((iPg-1) / (pFS->nBlocksize / pFS->nPagesize));
  }
  return (iPg / pFS->nBlocksize) + 1;
}

/*
** Return non-zero if page iPg is the final page of the block it lives on,
** i.e. iPg is non-zero and an exact multiple of the pages-per-block count.
**
** This function is only called in non-compressed database mode.
*/
static int fsIsLast(FileSystem *pFS, Pgno iPg){
  const int nPerBlk = (pFS->nBlocksize / pFS->nPagesize);
  assert( !pFS->pCompress );
  if( iPg==0 ){
    return 0;
  }
  return ( (iPg % nPerBlk)==0 );
}


/*
** Return non-zero if page iPg is the first page on its block. That is
** the case when iPg leaves remainder 1 modulo the pages-per-block count,
** or when iPg is smaller than that count and equals the first page of
** block 1 as reported by fsFirstPageOnBlock().
**
** This function is only called in non-compressed database mode.
*/
static int fsIsFirst(FileSystem *pFS, Pgno iPg){
  const int nPerBlk = (pFS->nBlocksize / pFS->nPagesize);

  assert( !pFS->pCompress );

  if( (iPg % nPerBlk)==1 ){
    return 1;
  }
  if( iPg<nPerBlk ){
    /* Candidate for the first page of block 1. */
    return ( iPg==fsFirstPageOnBlock(pFS, 1) );
  }
  return 0;
}


/*
** Given a page reference, return a pointer to the in-memory buffer of the
** pages contents. If parameter pnData is not NULL, set *pnData to the size
** of the buffer in bytes before returning.
*/
u8 *lsmFsPageData(Page *pPage, int *pnData){
................................................................................
        pFix->aData = &aData[pFS->nPagesize * (i64)(pFix->iPg-1)];
      }
      lsmSortedRemap(pFS->pDb);
    }
    *pRc = rc;
  }
}

static int fsPageGet(FileSystem *, Pgno, int, Page **);

/*
** Read the "next block" pointer belonging to database file block iBlock
** and write the block number it designates to *piNext. Returns LSM_OK on
** success or an LSM error code otherwise, in which case *piNext is left
** untouched.
**
** In compressed mode the pointer is the final 4 bytes of the block and is
** read straight from the file as a block number. In uncompressed mode it
** is the final 4 bytes of the last page on the block and holds a page
** number, which is converted to a block number with fsPageToBlock().
*/
static int fsBlockNext(
  FileSystem *pFS,                /* File-system object handle */
  int iBlock,                     /* Read field from this block */
  int *piNext                     /* OUT: Next block in linked list */
){
  int rc;                         /* Return code */

  assert( pFS->bUseMmap==0 || pFS->pCompress==0 );

  if( pFS->pCompress==0 ){
    const int nPerBlk = (pFS->nBlocksize / pFS->nPagesize);
    Page *pTail;                  /* Last page on block iBlock */

    rc = fsPageGet(pFS, iBlock*nPerBlk, 0, &pTail);
    if( rc==LSM_OK ){
      *piNext = fsPageToBlock(pFS, lsmGetU32(&pTail->aData[pFS->nPagesize-4]));
      lsmFsPageRelease(pTail);
    }
  }else{
    u8 aPtr[4];                   /* 4-byte pointer read from db file */
    i64 iPtrOff;                  /* File offset of the pointer */

    iPtrOff = (i64)iBlock * pFS->nBlocksize - sizeof(aPtr);
    rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iPtrOff, aPtr, sizeof(aPtr));
    if( rc==LSM_OK ){
      *piNext = (int)lsmGetU32(aPtr);
    }
  }

  return rc;
}

/*
** Read nData bytes starting at file offset iOff into buffer aData. If the
** requested range runs past the usable end of the block containing iOff,
** the remainder is read from the start of the block that fsBlockNext()
** reports as its successor. At most one such block boundary is crossed.
**
** This function is only called in compressed database mode.
*/
static int fsReadData(
  FileSystem *pFS,                /* File-system handle */
  i64 iOff,                       /* Read data from this offset */
  u8 *aData,                      /* Buffer to read data into */
  int nData                       /* Number of bytes to read */
){
  int iBlk;                       /* Block containing offset iOff */
  i64 iEnd;                       /* Offset one past usable end of iBlk */
  int nFirst;                     /* Bytes obtained by the first read */
  int rc;                         /* Return code */

  assert( pFS->pCompress );

  iBlk = fsPageToBlock(pFS, iOff);
  iEnd = fsLastPageOnBlock(pFS, iBlk) + 1;
  nFirst = LSM_MIN(iEnd - iOff, nData);

  rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aData, nFirst);
  if( rc!=LSM_OK || nFirst==nData ){
    return rc;
  }

  /* The request spills over into the following block. */
  {
    int iNextBlk;
    rc = fsBlockNext(pFS, iBlk, &iNextBlk);
    if( rc==LSM_OK ){
      i64 iOff2 = fsFirstPageOnBlock(pFS, iNextBlk);
      rc = lsmEnvRead(
          pFS->pEnv, pFS->fdDb, iOff2, &aData[nFirst], nData-nFirst
      );
    }
  }

  return rc;
}

/*
................................................................................
*/
static int fsReadPagedata(
  FileSystem *pFS,                /* File-system handle */
  Page *pPg                       /* Page to read and uncompress data for */
){
  i64 iOff;
  u8 aVarint[9];
  int rc;

  assert( pFS->pCompress && pPg->nCompress==0 );

  iOff = pPg->iPg;
  rc = fsReadData(pFS, iOff, aVarint, sizeof(aVarint));
  if( rc==LSM_OK ){
    iOff += lsmVarintGet32(aVarint, &pPg->nCompress);
................................................................................
    if( p==0 ){
      rc = fsPageBuffer(pFS, 1, &p);
      if( rc==LSM_OK ){
        p->iPg = iPg;
        p->nRef = 0;
        p->pFS = pFS;
        assert( p->flags==0 || p->flags==PAGE_FREE );
        if( pFS->pCompress==0 && (fsIsLast(pFS, iPg) || fsIsFirst(pFS, iPg)) ){
          p->flags |= PAGE_SHORT;
        }

#ifdef LSM_DEBUG
        memset(p->aData, 0x56, pFS->nPagesize);
#endif
        assert( p->pLruNext==0 && p->pLruPrev==0 );
        if( noContent==0 ){
          if( pFS->pCompress ){
................................................................................
  }else{
    assert( 0 );
  }
  return rc;
}



































static int fsRunEndsBetween(
  Segment *pRun, 
  Segment *pIgnore, 
  Pgno iFirst, 
  Pgno iLast
){
  return (pRun!=pIgnore && (
................................................................................
  int nByte;

  assert( pFS->pCompress );

  iEob = 1 + fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iPg));
  nByte = 2 * lsmVarintLen32(pPg->nCompress) + pPg->nCompress;

  if( pSeg && (iPg + nByte)<=iEob && (iPg + nByte - 1)==pSeg->iLastPg ){
    *piNext = 0;
  }else if( (iPg + nByte)>=iEob ){
    int iNext;
    Pgno iNextPg;

    rc = fsBlockNext(pFS, fsPageToBlock(pFS, iPg), &iNext);
    iNextPg = fsFirstPageOnBlock(pFS, iNext) + (nByte - (iEob-iPg));
    if( pSeg && pSeg->iLastPg==(iNextPg-1) ){
      iNextPg = 0;
    }
    *piNext = iNextPg;
  }else{
    *piNext = iPg + nByte;
  }

................................................................................
  FileSystem *pFS = pPg->pFS;
  Pgno iPg = pPg->iPg;

  if( pFS->pCompress ){
    if( eDir<0 ){
      assert( 0 );
    }else{
      int rc = fsNextPageOffset(pRun, pPg, &iPg);
      if( rc!=LSM_OK || iPg==0 ){
        *ppNext = 0;
        return rc;
      }
    }
  }else{
    assert( eDir==1 || eDir==-1 );
    if( eDir<0 ){
      if( pRun && iPg==pRun->iFirst ){
        *ppNext = 0;
        return LSM_OK;
................................................................................
  int rc = LSM_OK;
  Page *pPg = 0;
  *ppOut = 0;
  int iApp = 0;
  int iNext = 0;
  int iPrev = p->iLastPg;

  if( pFS->pCompress ){
    /* In compressed database mode the page is not assigned a page number
    ** or location in the database file at this point. This will be done
    ** by the lsmFsPagePersist() call.  */
    rc = fsPageBuffer(pFS, 1, &pPg);
    if( rc==LSM_OK ){
      pPg->pFS = pFS;
      pPg->pSeg = p;
      pPg->iPg = 0;
      pPg->flags = PAGE_DIRTY;
      pPg->nData = pFS->nPagesize;
      assert( pPg->aData );

      pPg->nRef = 1;
      pFS->nOut++;
    }
  }else{
    if( iPrev==0 ){
      iApp = findAppendPoint(pFS);
    }else if( fsIsLast(pFS, iPrev) ){
      int iNext;
      rc = fsBlockNext(pFS, fsPageToBlock(pFS, iPrev), &iNext);
      if( rc!=LSM_OK ) return rc;
      iApp = fsFirstPageOnBlock(pFS, iNext);
................................................................................
      if( fsIsLast(pFS, iApp) ){
        lsmPutU32(&pPg->aData[pFS->nPagesize-4], iNext);
      }else 
        if( fsIsFirst(pFS, iApp) ){
          lsmPutU32(&pPg->aData[pFS->nPagesize-4], iPrev);
        }
    }
  }

  *ppOut = pPg;
  return rc;
}

/*
** Mark the sorted run passed as the second argument as finished. 
................................................................................
/*
** Fetch a reference to database page iPg and store it in *ppPg. The
** file-system handle pFS must not be NULL. The return code of the
** underlying fsPageGet() call (made with its third argument set to 0)
** is handed back to the caller unchanged.
*/
int lsmFsDbPageGet(FileSystem *pFS, Pgno iPg, Page **ppPg){
  int rc;                         /* Return code from fsPageGet() */
  assert( pFS );
  rc = fsPageGet(pFS, iPg, 0, ppPg);
  return rc;
}

/*
** Read the integer whose encoding ends at byte offset iPg of the database
** file, decoding it backwards from its final byte, and store the result
** in *pnVal. The return value is the result of the underlying
** lsmEnvRead() call.
**
** When iPg is at least 4 bytes past the first page offset of its block,
** the 5 bytes ending at iPg are read and decoded as follows (b is the
** byte at iPg):
**
**   b<=240:        value is b
**   241<=b<=248:   value is 240 + 256*(b-241) + the preceding byte
**   otherwise:     value is the three preceding bytes, big-endian, plus
**                  a fourth, more significant byte when b==250
**
** *pnVal is written only if the read succeeds.
**
** NOTE(review): when iPg lies within 4 bytes of the start of its block,
** the bytes from the block's leading 4-byte pointer up to iPg are read
** but *pnVal is NOT set. Handling of an integer that spans a block
** boundary appears to be unimplemented - confirm before relying on the
** output in that case.
*/
static int fsReadReverseVarint32(FileSystem *pFS, Pgno iPg, int *pnVal){
  int rc;
  Pgno iFirst;

  iFirst = fsFirstPageOnBlock(pFS, fsPageToBlock(pFS, iPg));
  if( (iPg - iFirst)<4 ){
    int nRead = 4 + (1 + iPg - iFirst);
    u8 aRead[5 + 4];              /* Space for varint + ptr */
    rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iFirst-4, aRead, nRead);

  }else{
    u8 aRead[5];                  /* Space for varint + ptr */

    rc = lsmEnvRead(
        pFS->pEnv, pFS->fdDb, iPg+1-sizeof(aRead), aRead, sizeof(aRead)
    );

    /* Only decode if the read succeeded. Otherwise aRead[] holds
    ** uninitialized stack bytes and must not be examined. */
    if( rc==LSM_OK ){
      if( aRead[4]<=240 ){
        *pnVal = aRead[4];
      }else if( aRead[4]<=248 ){
        *pnVal = 240 + 256 * (aRead[4]-241) + aRead[3];
      }else{
        *pnVal = ((int)(aRead[1])<<16) + ((int)(aRead[2])<<8) + (int)(aRead[3]);
        if( aRead[4]==250 ) *pnVal += (((int)aRead[0]) << 24);
      }
    }

  }
  return rc;
}

/*
** Obtain a reference to the last page in the segment passed as the 
** second argument.
**
** For a compressed database, fsReadReverseVarint32() is first called on
** pSeg->iLastPg. If that call fails, *ppPg is set to NULL and the error
** code is returned. In all other cases the result of fsPageGet() on
** pSeg->iLastPg is returned.
**
** NOTE(review): the size obtained from fsReadReverseVarint32() is not yet
** used to locate the start of the final page record; in compressed mode
** iLastPg is still passed to fsPageGet() unchanged. Confirm the intended
** computation before relying on this path.
*/
int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg){
  Pgno iLast = pSeg->iLastPg;
  if( pFS->pCompress ){
    int rc;                       /* Return code of the varint read */
    int nCompress;                /* Size read back from the file */

    rc = fsReadReverseVarint32(pFS, iLast, &nCompress);
    if( rc!=LSM_OK ){
      /* Do not swallow the IO error, and do not leave *ppPg dangling. */
      *ppPg = 0;
      return rc;
    }
  }
  return fsPageGet(pFS, iLast, 0, ppPg);
}

/*
** Return a reference to meta-page iPg. If successful, LSM_OK is returned
** and *ppPg populated with the new page reference. The reference should
................................................................................
  if( rc==LSM_OK ){
    int nRem;
    int nWrite;
    Pgno iApp = pSeg->iLastPg+1;

    /* If this is the first data written into the segment, find an append-point
    ** or allocate a new block.  */
    if( iApp==1 ){
      pSeg->iFirst = iApp = findAppendPoint(pFS);

      if( iApp==0 ){
        int iBlk;
        rc = lsmBlockAllocate(pFS->pDb, &iBlk);
        pSeg->iFirst = iApp = fsFirstPageOnBlock(pFS, iBlk);
      }
    }

    iRet = iApp;

    /* Write as much data as is possible at iApp (usually all of it). */
    if( rc==LSM_OK ){
      int nSpace = fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iApp)) - iApp + 1;
      nWrite = LSM_MIN(nData, nSpace);
      nRem = nData - nWrite;
      rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aData, nWrite);

      iApp += nWrite;
    }

    /* If required, allocate a new block and write the rest of the data
................................................................................
*/
int lsmFsPagePersist(Page *pPg){
  int rc = LSM_OK;
  if( pPg && (pPg->flags & PAGE_DIRTY) ){
    FileSystem *pFS = pPg->pFS;

    if( pFS->pCompress ){
      int iHash;                  /* Hash key of assigned page number */
      u8 aVarint[10];             /* pPg->nCompress as a varint */
      int nVarint;                /* Length of varint stored in aVarint[] */
      assert( pPg->pSeg && pPg->iPg==0 && pPg->nCompress==0 );

      /* Compress the page image. */
      rc = fsCompressIntoBuffer(pFS, pPg);

      /* Serialize the compressed size into buffer aVarint[] */
      nVarint = lsmVarintPut64(aVarint, pPg->nCompress);
      aVarint[nVarint] = aVarint[0];

      /* Write the serialized page record into the database file. */
      pPg->iPg = fsAppendData(pFS, pPg->pSeg, aVarint, nVarint, &rc);
      fsAppendData(pFS, pPg->pSeg, pFS->aBuffer, pPg->nCompress, &rc);
      fsAppendData(pFS, pPg->pSeg, &aVarint[1], nVarint, &rc);

      /* Now that it has a page number, insert the page into the hash table */
      iHash = fsHashKey(pFS->nHash, pPg->iPg);
      pPg->pHashNext = pFS->apHash[iHash];
      pFS->apHash[iHash] = pPg;

      pPg->pSeg->nSize += (nVarint * 2) + pPg->nCompress;

    }else{
      i64 iOff;                   /* Offset to write within database file */
      iOff = (i64)pFS->nPagesize * (i64)(pPg->iPg-1);
      if( pFS->bUseMmap==0 ){
        rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, pPg->aData,pFS->nPagesize);
      }else if( pPg->flags & PAGE_FREE ){
................................................................................
** This function also checks that there are no references to blocks with
** out-of-range block numbers.
**
** If no errors are found, non-zero is returned. If an error is found, an
** assert() fails.
*/
int lsmFsIntegrityCheck(lsm_db *pDb){
  FileSystem *pFS = pDb->pFS;
  if( pFS->pCompress==0 ){
    int i;
    int j;
    Freelist freelist = {0, 0, 0};

    u8 *aUsed;
    Level *pLevel;
    Snapshot *pWorker = pDb->pWorker;
    int nBlock = pWorker->nBlock;

    aUsed = lsmMallocZero(pDb->pEnv, nBlock);
    if( aUsed==0 ){
................................................................................
      }
    }

    for(i=0; i<nBlock; i++) assert( aUsed[i]==1 );

    lsmFree(pDb->pEnv, aUsed);
    lsmFree(pDb->pEnv, freelist.aEntry);
  }

  return 1;
}

#ifndef NDEBUG
/*
** Report whether pPg is the final page of segment pSeg: non-zero if so,
** zero if not. Intended for use inside assert() expressions only.
**
** In compressed mode the page counts as last when fsNextPageOffset()
** either fails or yields a next-page offset of zero. In uncompressed mode
** the page number is simply compared with pSeg->iLastPg.
*/
int lsmFsDbPageIsLast(Segment *pSeg, Page *pPg){
  if( pPg->pFS->pCompress==0 ){
    return (pPg->iPg==pSeg->iLastPg);
  }else{
    Pgno iFollowing = 0;          /* Offset of the page after pPg, or 0 */
    if( fsNextPageOffset(pSeg, pPg, &iFollowing)!=LSM_OK ){
      return 1;
    }
    return (iFollowing==0);
  }
}
#endif

Changes to src/lsm_main.c.

319
320
321
322
323
324
325

















326
327
328
329
330
331
332
        ** in multi-process mode.  */
        *piVal = lsmDbMultiProc(pDb);
      }else{
        pDb->bMultiProc = *piVal = (*piVal!=0);
      }
      break;
    }


















    default:
      rc = LSM_MISUSE;
      break;
  }

  va_end(ap);







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
        ** in multi-process mode.  */
        *piVal = lsmDbMultiProc(pDb);
      }else{
        pDb->bMultiProc = *piVal = (*piVal!=0);
      }
      break;
    }

    case LSM_CONFIG_SET_COMPRESSION: {
      int *p = va_arg(ap, lsm_compress *);
      if( pDb->pDatabase ){
        /* If lsm_open() has been called, this call is against the rules. */
        rc = LSM_MISUSE_BKPT;
      }else{
        memcpy(&pDb->compress, p, sizeof(lsm_compress));
      }
      break;
    }

    case LSM_CONFIG_GET_COMPRESSION: {
      int *p = va_arg(ap, lsm_compress *);
      memcpy(p, &pDb->compress, sizeof(lsm_compress));
      break;
    }

    default:
      rc = LSM_MISUSE;
      break;
  }

  va_end(ap);

Changes to src/lsm_sorted.c.

1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
....
3322
3323
3324
3325
3326
3327
3328
3329
3330
3331
3332
3333
3334
3335
3336
....
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
....
4264
4265
4266
4267
4268
4269
4270
4271
4272
4273
4274
4275
4276
4277
4278
4279
  */
#if 0
  assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) );
#endif

  assert( pPtr->nCell>0 
       || pPtr->pSeg->nSize==1 
       || lsmFsDbPageIsLast(pPtr->pPg, pPtr->pSeg)
  );
  if( pPtr->nCell==0 ){
    segmentPtrReset(pPtr);
  }else{
    iMin = 0;
    iMax = pPtr->nCell-1;

................................................................................
  int rc = LSM_OK;                /* Return code */
  Page *pNext = 0;                /* New page appended to run */
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  Segment *pSeg;                  /* Run to append to */

  pSeg = &pMW->pLevel->lhs;
  rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, &pNext);
  assert( rc!=LSM_OK || pSeg->iFirst>0 );

  if( rc==LSM_OK ){
    u8 *aData;                    /* Data buffer belonging to page pNext */
    int nData;                    /* Size of aData[] in bytes */

    rc = mergeWorkerPersistAndRelease(pMW);

................................................................................
  if( rc==LSM_OK ){
    if( pDel ) pDel->iRoot = 0;
  }else{
    lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
    sortedFreeLevel(pDb->pEnv, pNew);
  }

#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel");
#endif

  if( rc==LSM_OK ){
    assertBtreeOk(pDb, &pNew->lhs);
    sortedInvokeWorkHook(pDb);
  }

................................................................................

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work");
#endif
      assertBtreeOk(pDb, &pLevel->lhs);
      assertRunInOrder(pDb, &pLevel->lhs);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
      ** zero. The caller has an in-memory tree to flush to disk.  */







|







 







|







 







|
|







 







|
|







1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
....
3322
3323
3324
3325
3326
3327
3328
3329
3330
3331
3332
3333
3334
3335
3336
....
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
....
4264
4265
4266
4267
4268
4269
4270
4271
4272
4273
4274
4275
4276
4277
4278
4279
  */
#if 0
  assert( assertKeyLocation(pCsr, pPtr, pKey, nKey) );
#endif

  assert( pPtr->nCell>0 
       || pPtr->pSeg->nSize==1 
       || lsmFsDbPageIsLast(pPtr->pSeg, pPtr->pPg)
  );
  if( pPtr->nCell==0 ){
    segmentPtrReset(pPtr);
  }else{
    iMin = 0;
    iMax = pPtr->nCell-1;

................................................................................
  int rc = LSM_OK;                /* Return code */
  Page *pNext = 0;                /* New page appended to run */
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  Segment *pSeg;                  /* Run to append to */

  pSeg = &pMW->pLevel->lhs;
  rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, &pNext);
  assert( rc!=LSM_OK || pSeg->iFirst>0 || pMW->pDb->compress.xCompress );

  if( rc==LSM_OK ){
    u8 *aData;                    /* Data buffer belonging to page pNext */
    int nData;                    /* Size of aData[] in bytes */

    rc = mergeWorkerPersistAndRelease(pMW);

................................................................................
  if( rc==LSM_OK ){
    if( pDel ) pDel->iRoot = 0;
  }else{
    lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
    sortedFreeLevel(pDb->pEnv, pNew);
  }

#if 1
  lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
#endif

  if( rc==LSM_OK ){
    assertBtreeOk(pDb, &pNew->lhs);
    sortedInvokeWorkHook(pDb);
  }

................................................................................

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

#if 1
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif
      assertBtreeOk(pDb, &pLevel->lhs);
      assertRunInOrder(pDb, &pLevel->lhs);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
      ** zero. The caller has an in-memory tree to flush to disk.  */