Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Various fixes.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | compression-hooks
Files: files | file ages | folders
SHA1: 728d8cf5ae8a051841c6a6afe6fd02203e482514
User & Date: dan 2012-10-24 18:26:11.919
Context
2012-10-24
18:33
Fix memory leaks in compressed database mode. check-in: 083e3a6c0f user: dan tags: compression-hooks
18:26
Various fixes. check-in: 728d8cf5ae user: dan tags: compression-hooks
2012-10-23
19:54
Fixes for compressed database mode. Some test cases pass. Many do not. check-in: 90c2fae338 user: dan tags: compression-hooks
Changes
Unified Diff Ignore Whitespace Patch
Changes to lsm-test/lsmtest1.c.
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
      memcpy(pKey1, pKey2, nKey1+1);
      testDatasourceEntry(pData, iKey2, &pKey2, &nKey2, 0, 0);

      testScanCompare(pDb2, pDb, 0, 0, 0,         0, 0,         &rc);
      testScanCompare(pDb2, pDb, 0, 0, 0,         pKey2, nKey2, &rc);
      testScanCompare(pDb2, pDb, 0, pKey1, nKey1, 0, 0,         &rc);
      testScanCompare(pDb2, pDb, 0, pKey1, nKey1, pKey2, nKey2, &rc);
#if 0
      testScanCompare(pDb2, pDb, 1, 0, 0,         0, 0,         &rc);
      testScanCompare(pDb2, pDb, 1, 0, 0,         pKey2, nKey2, &rc);
      testScanCompare(pDb2, pDb, 1, pKey1, nKey1, 0, 0,         &rc);
      testScanCompare(pDb2, pDb, 1, pKey1, nKey1, pKey2, nKey2, &rc);
#endif
      testFree(pKey1);
    }
    tdb_close(pDb2);
  }

  /* Test some lookups. */
  for(j=0; rc==0 && j<nLookupTest; j++){







<




<







189
190
191
192
193
194
195

196
197
198
199

200
201
202
203
204
205
206
      memcpy(pKey1, pKey2, nKey1+1);
      testDatasourceEntry(pData, iKey2, &pKey2, &nKey2, 0, 0);

      testScanCompare(pDb2, pDb, 0, 0, 0,         0, 0,         &rc);
      testScanCompare(pDb2, pDb, 0, 0, 0,         pKey2, nKey2, &rc);
      testScanCompare(pDb2, pDb, 0, pKey1, nKey1, 0, 0,         &rc);
      testScanCompare(pDb2, pDb, 0, pKey1, nKey1, pKey2, nKey2, &rc);

      testScanCompare(pDb2, pDb, 1, 0, 0,         0, 0,         &rc);
      testScanCompare(pDb2, pDb, 1, 0, 0,         pKey2, nKey2, &rc);
      testScanCompare(pDb2, pDb, 1, pKey1, nKey1, 0, 0,         &rc);
      testScanCompare(pDb2, pDb, 1, pKey1, nKey1, pKey2, nKey2, &rc);

      testFree(pKey1);
    }
    tdb_close(pDb2);
  }

  /* Test some lookups. */
  for(j=0; rc==0 && j<nLookupTest; j++){
Changes to src/lsm_file.c.
96
97
98
99
100
101
102
103

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120


121
122
123
124
125
126
127
** the first and last page of each block in uncompressed databases. From
** the point of view of the upper layer, all pages are the same size - this
** is different from the uncompressed format where the first and last pages
** on each block are 4 bytes smaller than the others.
**
** Pages are stored in variable length compressed form, as follows:
**
**     * Number of bytes in compressed page image, as a varint.

**
**     * Compressed page image.
**
**     * Number of bytes in compressed page image, as a varint. Except,
**       the first byte of the varint is moved so that it is the last
**       byte (i.e. for a 4 byte varint: ABCD -> BCDA). This is done
**       to make it possible to iterate through a packed array of compressed 
**       pages in reverse order.
**
** A page number is a byte offset into the database file. So the smallest
** possible page number is 8192 (immediately after the two meta-pages).
** The first and root page of a segment are identified by a page number
** corresponding to the byte offset of the first byte in the corresponding
** page record. The last page of a segment is identified by the byte offset
** of the last byte in its record.
**
** Unlike uncompressed pages, compressed page records may span blocks.


**
** Sometimes, in order to avoid touching sectors that contain synced data
** when writing, it is necessary to insert unused space between compressed
** page records. This can be done as follows:
**
**     * For less than 4 bytes of empty space, a series of 0x00 bytes.
**







|
>



|
<
<
<
|









>
>







96
97
98
99
100
101
102
103
104
105
106
107
108



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
** the first and last page of each block in uncompressed databases. From
** the point of view of the upper layer, all pages are the same size - this
** is different from the uncompressed format where the first and last pages
** on each block are 4 bytes smaller than the others.
**
** Pages are stored in variable length compressed form, as follows:
**
**     * Number of bytes in compressed page image, as a 3-byte big-endian
**       integer.
**
**     * Compressed page image.
**
**     * The number of bytes in the compressed page image, again as a 3-byte



**       big-endian integer.
**
** A page number is a byte offset into the database file. So the smallest
** possible page number is 8192 (immediately after the two meta-pages).
** The first and root page of a segment are identified by a page number
** corresponding to the byte offset of the first byte in the corresponding
** page record. The last page of a segment is identified by the byte offset
** of the last byte in its record.
**
** Unlike uncompressed pages, compressed page records may span blocks.
**
** TODO:
**
** Sometimes, in order to avoid touching sectors that contain synced data
** when writing, it is necessary to insert unused space between compressed
** page records. This can be done as follows:
**
**     * For less than 4 bytes of empty space, a series of 0x00 bytes.
**
208
209
210
211
212
213
214
215
216
217
218
219
220





221
222
223
224
225
226
227
*/
struct Page {
  u8 *aData;                      /* Buffer containing page data */
  int nData;                      /* Bytes of usable data at aData[] */
  Pgno iPg;                       /* Page number */
  int nRef;                       /* Number of outstanding references */
  int flags;                      /* Combination of PAGE_XXX flags */
  int nCompress;                  /* Compressed size (or 0 for uncomp. db) */
  Segment *pSeg;                  /* Segment this page will be written to */
  Page *pHashNext;                /* Next page in hash table slot */
  Page *pLruNext;                 /* Next page in LRU list */
  Page *pLruPrev;                 /* Previous page in LRU list */
  FileSystem *pFS;                /* File system that owns this page */





};

/*
** Meta-data page handle. There are two meta-data pages at the start of
** the database file, each FileSystem.nMetasize bytes in size.
*/
struct MetaPage {







<
<




>
>
>
>
>







208
209
210
211
212
213
214


215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
*/
struct Page {
  u8 *aData;                      /* Buffer containing page data */
  int nData;                      /* Bytes of usable data at aData[] */
  Pgno iPg;                       /* Page number */
  int nRef;                       /* Number of outstanding references */
  int flags;                      /* Combination of PAGE_XXX flags */


  Page *pHashNext;                /* Next page in hash table slot */
  Page *pLruNext;                 /* Next page in LRU list */
  Page *pLruPrev;                 /* Previous page in LRU list */
  FileSystem *pFS;                /* File system that owns this page */

  /* Only used in compressed database mode: */
  int nCompress;                  /* Compressed size (or 0 for uncomp. db) */
  int nCompressPrev;              /* Compressed size of prev page */
  Segment *pSeg;                  /* Segment this page will be written to */
};

/*
** Meta-data page handle. There are two meta-data pages at the start of
** the database file, each FileSystem.nMetasize bytes in size.
*/
struct MetaPage {
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
#define BLOCK1_HDR_SIZE(pgsz)  LSM_MAX(1, 8192/(pgsz))

/*
** If NDEBUG is not defined, set a breakpoint in function lsmIoerrBkpt()
** to catch IO errors. 
*/
#ifndef NDEBUG
static int lsmIoerrBkpt(){
  static int nErr = 0;
  nErr++;
}
static int IOERR_WRAPPER(int rc){
  if( rc!=LSM_OK ) lsmIoerrBkpt();
  return rc;
}







|







248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
#define BLOCK1_HDR_SIZE(pgsz)  LSM_MAX(1, 8192/(pgsz))

/*
** If NDEBUG is not defined, set a breakpoint in function lsmIoerrBkpt()
** to catch IO errors. 
*/
#ifndef NDEBUG
static void lsmIoerrBkpt(){
  static int nErr = 0;
  nErr++;
}
static int IOERR_WRAPPER(int rc){
  if( rc!=LSM_OK ) lsmIoerrBkpt();
  return rc;
}
895
896
897
898
899
900
901













































































902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920

921

922
923

924
925
926
927
928
929
930
931
      i64 iOff2 = fsFirstPageOnBlock(pFS, iBlk);
      rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff2, &aData[nRead], nData-nRead);
    }
  }

  return rc;
}














































































/*
** This function is only called in compressed database mode. It reads and
** uncompresses the compressed data for page pPg from the database and
** populates the pPg->aData[] buffer and pPg->nCompress field.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
static int fsReadPagedata(
  FileSystem *pFS,                /* File-system handle */
  Page *pPg                       /* Page to read and uncompress data for */
){
  i64 iOff;
  u8 aVarint[9];
  int rc;

  assert( pFS->pCompress && pPg->nCompress==0 );

  iOff = pPg->iPg;

  rc = fsReadData(pFS, iOff, aVarint, sizeof(aVarint));

  if( rc==LSM_OK ){
    iOff += lsmVarintGet32(aVarint, &pPg->nCompress);

    rc = fsReadData(pFS, iOff, pPg->aData, pPg->nCompress);
  }
  return rc;
}

/*
** Return a handle for a database page.
*/







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>













|



<

>
|
>

|
>
|







898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998

999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
      i64 iOff2 = fsFirstPageOnBlock(pFS, iBlk);
      rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff2, &aData[nRead], nData-nRead);
    }
  }

  return rc;
}

/*
** Parameter iBlock is a database file block. This function reads the value 
** stored in the blocks "previous block" pointer and stores it in *piPrev.
** LSM_OK is returned if everything is successful, or an LSM error code
** otherwise.
*/
static int fsBlockPrev(
  FileSystem *pFS,                /* File-system object handle */
  int iBlock,                     /* Read field from this block */
  int *piPrev                     /* OUT: Previous block in linked list */
){
  int rc = LSM_OK;                /* Return code */

  assert( pFS->bUseMmap==0 || pFS->pCompress==0 );
  assert( iBlock>0 );

  if( pFS->pCompress ){
    i64 iOff = fsFirstPageOnBlock(pFS, iBlock) - 4;
    u8 aPrev[4];                  /* 4-byte pointer read from db file */
    rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aPrev, sizeof(aPrev));
    if( rc==LSM_OK ){
      *piPrev = (int)lsmGetU32(aPrev);
    }
  }else{
    assert( 0 );
  }
  return rc;
}

/*
** Encode and decode routines for 24-bit big-endian integers.
*/
static u32 lsmGetU24(u8 *aBuf){
  return (((u32)aBuf[0]) << 16) + (((u32)aBuf[1]) << 8) + ((u32)aBuf[2]);
}
static void lsmPutU24(u8 *aBuf, u32 iVal){
  aBuf[0] = (u8)(iVal >> 16);
  aBuf[1] = (u8)(iVal >>  8);
  aBuf[2] = (u8)(iVal >>  0);
}

static int fsSubtractOffset(FileSystem *pFS, i64 iOff, int iSub, i64 *piRes){
  i64 iStart;
  int iBlk;
  int rc;

  assert( pFS->pCompress );

  iStart = fsFirstPageOnBlock(pFS, fsPageToBlock(pFS, iOff));
  if( (iOff-iSub)>=iStart ){
    *piRes = (iOff-iSub);
    return LSM_OK;
  }

  rc = fsBlockPrev(pFS, fsPageToBlock(pFS, iOff), &iBlk);
  *piRes = fsLastPageOnBlock(pFS, iBlk) - iSub + (iOff - iStart + 1);
  return rc;
}

static int fsAddOffset(FileSystem *pFS, i64 iOff, int iAdd, i64 *piRes){
  i64 iEob;
  int iBlk;
  int rc;

  assert( pFS->pCompress );

  iEob = fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iOff));
  if( (iOff+iAdd)<=iEob ){
    *piRes = (iOff+iAdd);
    return LSM_OK;
  }

  rc = fsBlockNext(pFS, fsPageToBlock(pFS, iOff), &iBlk);
  *piRes = fsFirstPageOnBlock(pFS, iBlk) + iAdd - (iEob - iOff + 1);
  return rc;
}

/*
** This function is only called in compressed database mode. It reads and
** uncompresses the compressed data for page pPg from the database and
** populates the pPg->aData[] buffer and pPg->nCompress field.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
static int fsReadPagedata(
  FileSystem *pFS,                /* File-system handle */
  Page *pPg                       /* Page to read and uncompress data for */
){
  i64 iOff;
  u8 aSz[6];
  int rc;

  assert( pFS->pCompress && pPg->nCompress==0 );

  iOff = pPg->iPg;

  rc = fsReadData(pFS, iOff, aSz, sizeof(aSz));

  if( rc==LSM_OK ){
    pPg->nCompress = (int)lsmGetU24(aSz);
    rc = fsAddOffset(pFS, iOff, 3, &iOff);
    if( rc==LSM_OK ) rc = fsReadData(pFS, iOff, pPg->aData, pPg->nCompress);
  }
  return rc;
}

/*
** Return a handle for a database page.
*/
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
    assert( (rc==LSM_OK && p) || (rc!=LSM_OK && p==0) );
  }
  if( rc==LSM_OK ){
    pFS->nOut += (p->nRef==0);
    p->nRef++;
  }
  *ppPg = p;
  return rc;
}

/*
** Parameter iBlock is a database file block. This function reads the value 
** stored in the blocks "previous block" pointer and stores it in *piPrev.
** LSM_OK is returned if everything is successful, or an LSM error code
** otherwise.
*/
static int fsBlockPrev(
  FileSystem *pFS,                /* File-system object handle */
  int iBlock,                     /* Read field from this block */
  int *piPrev                     /* OUT: Previous block in linked list */
){
  int rc = LSM_OK;                /* Return code */

  assert( pFS->bUseMmap==0 || pFS->pCompress==0 );
  assert( iBlock>0 );

  if( pFS->pCompress ){
    i64 iOff = (i64)(iBlock-1) * pFS->nBlocksize;
    u8 aPrev[4];                  /* 4-byte pointer read from db file */
    rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iOff, aPrev, sizeof(aPrev));
    if( rc==LSM_OK ){
      *piPrev = (int)lsmGetU32(aPrev);
    }
  }else{
    assert( 0 );
  }
  return rc;
}


static int fsRunEndsBetween(
  Segment *pRun, 
  Segment *pIgnore, 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







1100
1101
1102
1103
1104
1105
1106





























1107
1108
1109
1110
1111
1112
1113
    assert( (rc==LSM_OK && p) || (rc!=LSM_OK && p==0) );
  }
  if( rc==LSM_OK ){
    pFS->nOut += (p->nRef==0);
    p->nRef++;
  }
  *ppPg = p;





























  return rc;
}


static int fsRunEndsBetween(
  Segment *pRun, 
  Segment *pIgnore, 
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226


1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238


1239
1240
1241





1242
1243
1244
1245
1246
1247
1248
  }

  pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
  assert( pRun->nSize>0 );
}

static int fsNextPageOffset(Segment *pSeg, Page *pPg, Pgno *piNext){
  int rc = LSM_OK;                /* Return code */
  FileSystem *pFS = pPg->pFS;
  Pgno iPg = pPg->iPg;
  i64 iEob;
  int nByte;

  assert( pFS->pCompress );

  iEob = 1 + fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iPg));
  nByte = 2 * lsmVarintLen32(pPg->nCompress) + pPg->nCompress;



  if( pSeg && (iPg + nByte)<=iEob && (iPg + nByte - 1)==pSeg->iLastPg ){
    *piNext = 0;
  }else if( (iPg + nByte)>=iEob ){
    int iNext;
    Pgno iNextPg;

    rc = fsBlockNext(pFS, fsPageToBlock(pFS, iPg), &iNext);
    iNextPg = fsFirstPageOnBlock(pFS, iNext) + (nByte - (iEob-iPg));
    if( pSeg && pSeg->iLastPg==(iNextPg-1) ){
      iNextPg = 0;
    }


    *piNext = iNextPg;
  }else{
    *piNext = iPg + nByte;





  }

  return rc;
}

/*
** The first argument to this function is a valid reference to a database







<
<
|
<
|

|

<
|
>
>
|
|
|
<
|
<
|
<
<
<
<
|
>
>
|
|
|
>
>
>
>
>







1263
1264
1265
1266
1267
1268
1269


1270

1271
1272
1273
1274

1275
1276
1277
1278
1279
1280

1281

1282




1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
  }

  pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
  assert( pRun->nSize>0 );
}

static int fsNextPageOffset(Segment *pSeg, Page *pPg, Pgno *piNext){


  Pgno iNext;

  int rc;

  assert( pPg->pFS->pCompress );


  rc = fsAddOffset(pPg->pFS, pPg->iPg, 2*3 + pPg->nCompress, &iNext);
  if( pSeg && pSeg->iLastPg==(iNext-1) ){
    iNext = 0;
  }

  *piNext = iNext;

  return rc;

}





static int fsGetPageBefore(FileSystem *pFS, i64 iOff, Pgno *piPrev){
  u8 aSz[3];
  int rc;
  i64 iRead;

  rc = fsSubtractOffset(pFS, iOff, sizeof(aSz), &iRead);
  if( rc==LSM_OK ) rc = fsReadData(pFS, iRead, aSz, sizeof(aSz));
  if( rc==LSM_OK ){
    int nSz = lsmGetU24(aSz) + sizeof(aSz)*2;
    rc = fsSubtractOffset(pFS, iOff, nSz, piPrev);
  }

  return rc;
}

/*
** The first argument to this function is a valid reference to a database
1267
1268
1269
1270
1271
1272
1273


1274


1275
1276
1277
1278
1279
1280
1281
*/
int lsmFsDbPageNext(Segment *pRun, Page *pPg, int eDir, Page **ppNext){
  FileSystem *pFS = pPg->pFS;
  Pgno iPg = pPg->iPg;

  if( pFS->pCompress ){
    if( eDir<0 ){


      assert( 0 );


    }else{
      int rc = fsNextPageOffset(pRun, pPg, &iPg);
      if( rc!=LSM_OK || iPg==0 ){
        *ppNext = 0;
        return rc;
      }
    }







>
>
|
>
>







1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
*/
int lsmFsDbPageNext(Segment *pRun, Page *pPg, int eDir, Page **ppNext){
  FileSystem *pFS = pPg->pFS;
  Pgno iPg = pPg->iPg;

  if( pFS->pCompress ){
    if( eDir<0 ){
      int rc = LSM_OK;
      if( iPg==pRun->iFirst || (rc = fsGetPageBefore(pFS, iPg, &iPg)) ){
        *ppNext = 0;
        return LSM_OK;
      }
    }else{
      int rc = fsNextPageOffset(pRun, pPg, &iPg);
      if( rc!=LSM_OK || iPg==0 ){
        *ppNext = 0;
        return rc;
      }
    }
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488

1489
1490
1491
1492
1493
1494
1495
** Obtain a reference to page number iPg.
*/
int lsmFsDbPageGet(FileSystem *pFS, Pgno iPg, Page **ppPg){
  assert( pFS );
  return fsPageGet(pFS, iPg, 0, ppPg);
}

static int fsReadReverseVarint32(FileSystem *pFS, Pgno iPg, int *pnVal){
  int rc;
  Pgno iFirst;

  iFirst = fsFirstPageOnBlock(pFS, fsPageToBlock(pFS, iPg));
  if( (iPg - iFirst)<4 ){
    int nRead = 4 + (1 + iPg - iFirst);
    u8 aRead[5 + 4];              /* Space for varint + ptr */
    rc = lsmEnvRead(pFS->pEnv, pFS->fdDb, iFirst-4, aRead, nRead);

  }else{
    u8 aRead[5];                  /* Space for varint + ptr */

    rc = lsmEnvRead(
        pFS->pEnv, pFS->fdDb, iPg+1-sizeof(aRead), aRead, sizeof(aRead)
    );

    if( aRead[4]<=240 ){
      *pnVal = aRead[4];
    }else if( aRead[4]<=248 ){
      *pnVal = 240 + 256 * (aRead[4]-241) + aRead[3];
    }else{
      *pnVal = ((int)(aRead[1])<<16) + ((int)(aRead[2])<<8) + (int)(aRead[3]);
      if( aRead[4]==250 ) *pnVal += (((int)aRead[0]) << 24);
    }

  }
  return rc;
}

/*
** Obtain a reference to the last page in the segment passed as the 
** second argument.
*/
int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg){
  Pgno iLast = pSeg->iLastPg;
  if( pFS->pCompress ){
    int nCompress;
    rc = fsReadReverseVarint32(pFS, iLast, &nCompress);

  }
  return fsPageGet(pFS, iLast, 0, ppPg);
}

/*
** Return a reference to meta-page iPg. If successful, LSM_OK is returned
** and *ppPg populated with the new page reference. The reference should







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







|
|
>







1499
1500
1501
1502
1503
1504
1505






























1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
** Obtain a reference to page number iPg.
*/
int lsmFsDbPageGet(FileSystem *pFS, Pgno iPg, Page **ppPg){
  assert( pFS );
  return fsPageGet(pFS, iPg, 0, ppPg);
}































/*
** Obtain a reference to the last page in the segment passed as the 
** second argument.
*/
int lsmFsDbPageLast(FileSystem *pFS, Segment *pSeg, Page **ppPg){
  Pgno iLast = pSeg->iLastPg;
  if( pFS->pCompress ){
    int rc;
    rc = fsGetPageBefore(pFS, iLast+1, &iLast);
    if( rc!=LSM_OK ) return rc;
  }
  return fsPageGet(pFS, iLast, 0, ppPg);
}

/*
** Return a reference to meta-page iPg. If successful, LSM_OK is returned
** and *ppPg populated with the new page reference. The reference should
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
int lsmFsPagePersist(Page *pPg){
  int rc = LSM_OK;
  if( pPg && (pPg->flags & PAGE_DIRTY) ){
    FileSystem *pFS = pPg->pFS;

    if( pFS->pCompress ){
      int iHash;                  /* Hash key of assigned page number */
      u8 aVarint[10];             /* pPg->nCompress as a varint */
      int nVarint;                /* Length of varint stored in aVarint[] */
      assert( pPg->pSeg && pPg->iPg==0 && pPg->nCompress==0 );

      /* Compress the page image. */
      rc = fsCompressIntoBuffer(pFS, pPg);

      /* Serialize the compressed size into buffer aVarint[] */
      nVarint = lsmVarintPut64(aVarint, pPg->nCompress);
      aVarint[nVarint] = aVarint[0];

      /* Write the serialized page record into the database file. */
      pPg->iPg = fsAppendData(pFS, pPg->pSeg, aVarint, nVarint, &rc);
      fsAppendData(pFS, pPg->pSeg, pFS->aBuffer, pPg->nCompress, &rc);
      fsAppendData(pFS, pPg->pSeg, &aVarint[1], nVarint, &rc);

      /* Now that it has a page number, insert the page into the hash table */
      iHash = fsHashKey(pFS->nHash, pPg->iPg);
      pPg->pHashNext = pFS->apHash[iHash];
      pFS->apHash[iHash] = pPg;

      pPg->pSeg->nSize += (nVarint * 2) + pPg->nCompress;

    }else{
      i64 iOff;                   /* Offset to write within database file */
      iOff = (i64)pFS->nPagesize * (i64)(pPg->iPg-1);
      if( pFS->bUseMmap==0 ){
        rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, pPg->aData,pFS->nPagesize);
      }else if( pPg->flags & PAGE_FREE ){







|
<





|
|
<


|

|






|







1727
1728
1729
1730
1731
1732
1733
1734

1735
1736
1737
1738
1739
1740
1741

1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
int lsmFsPagePersist(Page *pPg){
  int rc = LSM_OK;
  if( pPg && (pPg->flags & PAGE_DIRTY) ){
    FileSystem *pFS = pPg->pFS;

    if( pFS->pCompress ){
      int iHash;                  /* Hash key of assigned page number */
      u8 aSz[3];                  /* pPg->nCompress as a 24-bit big-endian */

      assert( pPg->pSeg && pPg->iPg==0 && pPg->nCompress==0 );

      /* Compress the page image. */
      rc = fsCompressIntoBuffer(pFS, pPg);

      /* Serialize the compressed size into buffer aSz[] */
      lsmPutU24(aSz, pPg->nCompress);


      /* Write the serialized page record into the database file. */
      pPg->iPg = fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc);
      fsAppendData(pFS, pPg->pSeg, pFS->aBuffer, pPg->nCompress, &rc);
      fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc);

      /* Now that it has a page number, insert the page into the hash table */
      iHash = fsHashKey(pFS->nHash, pPg->iPg);
      pPg->pHashNext = pFS->apHash[iHash];
      pFS->apHash[iHash] = pPg;

      pPg->pSeg->nSize += (sizeof(aSz) * 2) + pPg->nCompress;

    }else{
      i64 iOff;                   /* Offset to write within database file */
      iOff = (i64)pFS->nPagesize * (i64)(pPg->iPg-1);
      if( pFS->bUseMmap==0 ){
        rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iOff, pPg->aData,pFS->nPagesize);
      }else if( pPg->flags & PAGE_FREE ){
Changes to src/lsm_sorted.c.
2169
2170
2171
2172
2173
2174
2175

2176
2177
2178
2179
2180
2181
2182
2183
2184

2185
2186
2187
2188
2189
2190
2191
static int multiCursorAddAll(MultiCursor *pCsr, Snapshot *pSnap){
  Level *pLvl;
  int nPtr = 0;
  int iPtr = 0;
  int rc = LSM_OK;

  for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){

    nPtr += (1 + pLvl->nRight);
  }

  assert( pCsr->aPtr==0 );
  pCsr->aPtr = lsmMallocZeroRc(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nPtr, &rc);
  if( rc==LSM_OK ) pCsr->nPtr = nPtr;

  for(pLvl=pSnap->pLevel; pLvl && rc==LSM_OK; pLvl=pLvl->pNext){
    int i;

    pCsr->aPtr[iPtr].pLevel = pLvl;
    pCsr->aPtr[iPtr].pSeg = &pLvl->lhs;
    iPtr++;
    for(i=0; i<pLvl->nRight; i++){
      pCsr->aPtr[iPtr].pLevel = pLvl;
      pCsr->aPtr[iPtr].pSeg = &pLvl->aRhs[i];
      iPtr++;







>









>







2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
static int multiCursorAddAll(MultiCursor *pCsr, Snapshot *pSnap){
  Level *pLvl;
  int nPtr = 0;
  int iPtr = 0;
  int rc = LSM_OK;

  for(pLvl=pSnap->pLevel; pLvl; pLvl=pLvl->pNext){
    if( pLvl->iAge<0 ) continue;
    nPtr += (1 + pLvl->nRight);
  }

  assert( pCsr->aPtr==0 );
  pCsr->aPtr = lsmMallocZeroRc(pCsr->pDb->pEnv, sizeof(SegmentPtr) * nPtr, &rc);
  if( rc==LSM_OK ) pCsr->nPtr = nPtr;

  for(pLvl=pSnap->pLevel; pLvl && rc==LSM_OK; pLvl=pLvl->pNext){
    int i;
    if( pLvl->iAge<0 ) continue;
    pCsr->aPtr[iPtr].pLevel = pLvl;
    pCsr->aPtr[iPtr].pSeg = &pLvl->lhs;
    iPtr++;
    for(i=0; i<pLvl->nRight; i++){
      pCsr->aPtr[iPtr].pLevel = pLvl;
      pCsr->aPtr[iPtr].pSeg = &pLvl->aRhs[i];
      iPtr++;
3833
3834
3835
3836
3837
3838
3839

3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856

3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
    Merge merge;                  /* Merge object used to create new level */
    MergeWorker mergeworker;      /* MergeWorker object for the same purpose */

    memset(&merge, 0, sizeof(Merge));
    memset(&mergeworker, 0, sizeof(MergeWorker));

    pNew->pMerge = &merge;

    mergeworker.pDb = pDb;
    mergeworker.pLevel = pNew;
    mergeworker.pCsr = pCsr;
    pCsr->pPrevMergePtr = &iLeftPtr;

    /* Mark the separators array for the new level as a "phantom". */
    mergeworker.bFlush = 1;

    /* Do the work to create the new merged segment on disk */
    if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr);
    while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
      rc = mergeWorkerStep(&mergeworker);
    }

    nWrite = mergeworker.nWork;
    mergeWorkerShutdown(&mergeworker, &rc);
    pNew->pMerge = 0;

  }

  /* Link the new level into the top of the tree. */
  if( rc==LSM_OK ){
    if( pDel ) pDel->iRoot = 0;
  }else{
    lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
    sortedFreeLevel(pDb->pEnv, pNew);
  }

#if 1
  lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
#endif

  if( rc==LSM_OK ){
    assertBtreeOk(pDb, &pNew->lhs);
    sortedInvokeWorkHook(pDb);
  }








>

















>










|
|







3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
    Merge merge;                  /* Merge object used to create new level */
    MergeWorker mergeworker;      /* MergeWorker object for the same purpose */

    memset(&merge, 0, sizeof(Merge));
    memset(&mergeworker, 0, sizeof(MergeWorker));

    pNew->pMerge = &merge;
    pNew->iAge = -1;
    mergeworker.pDb = pDb;
    mergeworker.pLevel = pNew;
    mergeworker.pCsr = pCsr;
    pCsr->pPrevMergePtr = &iLeftPtr;

    /* Mark the separators array for the new level as a "phantom". */
    mergeworker.bFlush = 1;

    /* Do the work to create the new merged segment on disk */
    if( rc==LSM_OK ) rc = lsmMCursorFirst(pCsr);
    while( rc==LSM_OK && mergeWorkerDone(&mergeworker)==0 ){
      rc = mergeWorkerStep(&mergeworker);
    }

    nWrite = mergeworker.nWork;
    mergeWorkerShutdown(&mergeworker, &rc);
    pNew->pMerge = 0;
    pNew->iAge = 0;
  }

  /* Link the new level into the top of the tree. */
  if( rc==LSM_OK ){
    if( pDel ) pDel->iRoot = 0;
  }else{
    lsmDbSnapshotSetLevel(pDb->pWorker, pNext);
    sortedFreeLevel(pDb->pEnv, pNew);
  }

#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel");
#endif

  if( rc==LSM_OK ){
    assertBtreeOk(pDb, &pNew->lhs);
    sortedInvokeWorkHook(pDb);
  }

4264
4265
4266
4267
4268
4269
4270
4271
4272
4273
4274
4275
4276
4277
4278
4279

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

#if 1
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif
      assertBtreeOk(pDb, &pLevel->lhs);
      assertRunInOrder(pDb, &pLevel->lhs);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
      ** zero. The caller has an in-memory tree to flush to disk.  */







|
|







4268
4269
4270
4271
4272
4273
4274
4275
4276
4277
4278
4279
4280
4281
4282
4283

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work");
#endif
      assertBtreeOk(pDb, &pLevel->lhs);
      assertRunInOrder(pDb, &pLevel->lhs);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
      ** zero. The caller has an in-memory tree to flush to disk.  */