Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Change the format of compressed page records slightly so that the file format supports inserting padding records into sorted runs.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | compression-hooks
Files: files | file ages | folders
SHA1: 0b940bfe17487aaec091c1915715104a982af507
User & Date: dan 2012-10-27 08:57:49.368
Context
2012-10-28
10:07
Add padding records to segments in compressed databases in order to avoid partial writes to segments that have already been synced to disk. check-in: ae3c8da44d user: dan tags: compression-hooks
2012-10-27
08:57
Change the format of compressed page records slightly so that the file format supports inserting padding records into sorted runs. check-in: 0b940bfe17 user: dan tags: compression-hooks
2012-10-26
18:08
Enable assert() checking for lost blocks in compressed database mode. check-in: 6e7bc9099c user: dan tags: compression-hooks
Changes
Unified Diff Show Whitespace Changes Patch
Changes to lsm-test/lsmtest_main.c.
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353




1354


1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
  if( pClose ) fclose(pClose);
  pEnv->xClose(pOut);

  return rc;
}

static int do_insert(int nArg, char **azArg){
  const char *zConfig = 0;
  const char *zDb = "lsm";
  TestDb *pDb = 0;
  int i;
  int rc;
  const int nRow = 1 * 1000 * 1000;

  DatasourceDefn defn = { TEST_DATASOURCE_RANDOM, 8, 15, 80, 150 };
  Datasource *pData = 0;

  if( nArg>2 ){
    testPrintError("Usage: insert ?DATABASE? ?LSM-CONFIG?\n");
    return 1;
  }
  if( nArg==1 ){ zDb = azArg[0]; }
  if( nArg==2 ){ zConfig = azArg[1]; }

  testMallocUninstall(tdb_lsm_env());




  rc = tdb_open(zDb, 0, 1, &pDb);


  if( rc!=0 ){
    testPrintError("Error opening db \"%s\": %d\n", zDb, rc);
  }else{
    InsertWriteHook hook;
    memset(&hook, 0, sizeof(hook));
    hook.pOut = fopen("writelog.txt", "w");

    pData = testDatasourceNew(&defn);
    tdb_lsm_config_work_hook(pDb, do_insert_work_hook, 0);
    tdb_lsm_write_hook(pDb, do_insert_write_hook, (void *)&hook);
    if( zConfig ){
      rc = tdb_lsm_config_str(pDb, zConfig);
    }

    if( rc==0 ){
      for(i=0; i<nRow; i++){
        void *pKey; int nKey;     /* Database key to insert */
        void *pVal; int nVal;     /* Database value to insert */
        testDatasourceEntry(pData, i, &pKey, &nKey, &pVal, &nVal);
        tdb_write(pDb, pKey, nKey, pVal, nVal);







<









|
|



<


>
>
>
>

>
>










<
<
<







1329
1330
1331
1332
1333
1334
1335

1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349

1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368



1369
1370
1371
1372
1373
1374
1375
  if( pClose ) fclose(pClose);
  pEnv->xClose(pOut);

  return rc;
}

static int do_insert(int nArg, char **azArg){

  const char *zDb = "lsm";
  TestDb *pDb = 0;
  int i;
  int rc;
  const int nRow = 1 * 1000 * 1000;

  DatasourceDefn defn = { TEST_DATASOURCE_RANDOM, 8, 15, 80, 150 };
  Datasource *pData = 0;

  if( nArg>1 ){
    testPrintError("Usage: insert ?DATABASE?\n");
    return 1;
  }
  if( nArg==1 ){ zDb = azArg[0]; }


  testMallocUninstall(tdb_lsm_env());
  for(i=0; zDb[i] && zDb[i]!='='; i++);
  if( zDb[i] ){
    rc = tdb_lsm_open(zDb, "testdb.lsm", 1, &pDb);
  }else{
  rc = tdb_open(zDb, 0, 1, &pDb);
  }

  if( rc!=0 ){
    testPrintError("Error opening db \"%s\": %d\n", zDb, rc);
  }else{
    InsertWriteHook hook;
    memset(&hook, 0, sizeof(hook));
    hook.pOut = fopen("writelog.txt", "w");

    pData = testDatasourceNew(&defn);
    tdb_lsm_config_work_hook(pDb, do_insert_work_hook, 0);
    tdb_lsm_write_hook(pDb, do_insert_write_hook, (void *)&hook);




    if( rc==0 ){
      for(i=0; i<nRow; i++){
        void *pKey; int nKey;     /* Database key to insert */
        void *pVal; int nVal;     /* Database value to insert */
        testDatasourceEntry(pData, i, &pKey, &nKey, &pVal, &nVal);
        tdb_write(pDb, pKey, nKey, pVal, nVal);
Changes to src/lsm_file.c.
96
97
98
99
100
101
102





103





104



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130

131
132
133
134
135
136
137
138
** the first and last page of each block in uncompressed databases. From
** the point of view of the upper layer, all pages are the same size - this
** is different from the uncompressed format where the first and last pages
** on each block are 4 bytes smaller than the others.
**
** Pages are stored in variable length compressed form, as follows:
**





**     * Number of bytes in compressed page image, as a 3-byte big-endian





**       integer.



**
**     * Compressed page image.
**
**     * The number of bytes in the compressed page image, again as a 3-byte
**       big-endian integer.
**
** A page number is a byte offset into the database file. So the smallest
** possible page number is 8192 (immediately after the two meta-pages).
** The first and root page of a segment are identified by a page number
** corresponding to the byte offset of the first byte in the corresponding
** page record. The last page of a segment is identified by the byte offset
** of the last byte in its record.
**
** Unlike uncompressed pages, compressed page records may span blocks.
**
** TODO:
**
** Sometimes, in order to avoid touching sectors that contain synced data
** when writing, it is necessary to insert unused space between compressed
** page records. This can be done as follows:
**
**     * For less than 4 bytes of empty space, a series of 0x00 bytes.
**
**     * For 4 or more bytes, the block of free space begins with an 
**       0x01 byte, followed by a varint containing the total size of the 
**       free space. Similarly, it ends with an (ABCD -> BCDA) transformed

**       varint and a final 0x01 byte.
*/
#include "lsmInt.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>








>
>
>
>
>
|
>
>
>
>
>
|
>
>
>



<
|










<
<




|

|
|
|
>
|







96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120

121
122
123
124
125
126
127
128
129
130
131


132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
** the first and last page of each block in uncompressed databases. From
** the point of view of the upper layer, all pages are the same size - this
** is different from the uncompressed format where the first and last pages
** on each block are 4 bytes smaller than the others.
**
** Pages are stored in variable length compressed form, as follows:
**
**     * 3-byte size field containing the size of the compressed page image
**       in bytes. The most significant bit of each byte of the size field
**       is always set. The remaining 7 bits are used to store a 21-bit
**       integer value (in big-endian order - the first byte in the field
**       contains the most significant 7 bits). Since the maximum allowed 
**       size of a compressed page image is (2^17 - 1) bytes, there are
**       actually 4 unused bits in the size field.
**
**       In other words, if the size of the compressed page image is nSz,
**       the header can be serialized as follows:
**
**         u8 aHdr[3]
**         aHdr[0] = 0x80 | (u8)(nSz >> 14);
**         aHdr[1] = 0x80 | (u8)(nSz >>  7);
**         aHdr[2] = 0x80 | (u8)(nSz >>  0);
**
**     * Compressed page image.
**

**     * A second copy of the 3-byte record header.
**
** A page number is a byte offset into the database file. So the smallest
** possible page number is 8192 (immediately after the two meta-pages).
** The first and root page of a segment are identified by a page number
** corresponding to the byte offset of the first byte in the corresponding
** page record. The last page of a segment is identified by the byte offset
** of the last byte in its record.
**
** Unlike uncompressed pages, compressed page records may span blocks.
**


** Sometimes, in order to avoid touching sectors that contain synced data
** when writing, it is necessary to insert unused space between compressed
** page records. This can be done as follows:
**
**     * For fewer than 6 bytes of empty space, a series of 0x00 bytes.
**
**     * For 6 or more bytes of empty space, a record similar to a 
**       compressed page record is added to the segment. A padding record
**       is distinguished from a compressed page record by the most 
**       significant bit of the second byte of the size field, which is
**       cleared instead of set. 
*/
#include "lsmInt.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

931
932
933
934
935
936
937
938
939
940


941
942
943

944
945
946


947
948
949
950
951
952
953
  }else{
    assert( 0 );
  }
  return rc;
}

/*
** Encode and decode routines for 24-bit big-endian integers.
*/
/*
** Decode the 24-bit big-endian unsigned integer stored in the first three
** bytes of aBuf[] and return it. aBuf[0] supplies the most significant byte.
*/
static u32 lsmGetU24(u8 *aBuf){
  u32 iRes = (u32)aBuf[0];
  iRes = (iRes << 8) + (u32)aBuf[1];
  iRes = (iRes << 8) + (u32)aBuf[2];
  return iRes;
}
/*
** Write the low 24 bits of iVal into the first three bytes of aBuf[] in
** big-endian order (aBuf[0] receives the most significant byte). Any bits
** of iVal above bit 23 are discarded.
*/
static void lsmPutU24(u8 *aBuf, u32 iVal){
  int i;
  /* Fill from the least significant byte backwards, shifting as we go. */
  for(i=2; i>=0; i--){
    aBuf[i] = (u8)iVal;
    iVal >>= 8;
  }
}

static int fsSubtractOffset(FileSystem *pFS, i64 iOff, int iSub, i64 *piRes){
  i64 iStart;
  int iBlk;
  int rc;








|

|
>
>
|

|
>
|
|
|
>
>







942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
  }else{
    assert( 0 );
  }
  return rc;
}

/*
** Encode and decode routines for record size fields.
*/
/*
** Serialize nByte into the 3-byte record size field aBuf[]. Each output
** byte carries 7 bits of the value, most significant group first.
**
** The top bit of the first and third bytes is always set. The top bit of
** the second byte is set when bFree is zero and cleared when bFree is
** non-zero.
*/
static void putRecordSize(u8 *aBuf, int nByte, int bFree){
  u8 flag = bFree ? 0x00 : 0x80;      /* Top bit of the second byte */
  u8 hi   = (u8)(nByte >> 14);
  u8 mid  = (u8)(nByte >> 7) & 0x7F;
  u8 lo   = (u8)nByte;

  aBuf[0] = hi | 0x80;
  aBuf[1] = mid | flag;
  aBuf[2] = lo | 0x80;
}
/*
** Decode the 3-byte record size field stored in aBuf[] and return the size
** value. The value is assembled from the low 7 bits of each byte, with
** aBuf[0] holding the most significant group.
**
** Before returning, *pbFree is set to 1 if the field describes a padding
** (free-space) record, or to 0 if it describes a compressed page record.
** This mirrors putRecordSize(): a padding record is the one whose second
** byte has its most significant bit cleared rather than set.
*/
static int getRecordSize(u8 *aBuf, int *pbFree){
  int nByte;
  nByte  = (aBuf[0] & 0x7F) << 14;
  nByte += (aBuf[1] & 0x7F) << 7;
  nByte += (aBuf[2] & 0x7F);
  /* Bit 0x80 of the second byte is SET for page records, so the record is
  ** free space only when that bit is clear. */
  *pbFree = !(aBuf[1] & 0x80);
  return nByte;
}

static int fsSubtractOffset(FileSystem *pFS, i64 iOff, int iSub, i64 *piRes){
  i64 iStart;
  int iBlk;
  int rc;

1011
1012
1013
1014
1015
1016
1017

1018
1019
1020
1021
1022
1023
1024
1025
  assert( p && pPg->nCompress==0 );

  if( fsAllocateBuffer(pFS) ) return LSM_NOMEM;

  rc = fsReadData(pFS, iOff, aSz, sizeof(aSz));

  if( rc==LSM_OK ){

    pPg->nCompress = (int)lsmGetU24(aSz);
    rc = fsAddOffset(pFS, iOff, 3, &iOff);
    if( rc==LSM_OK ){
      if( pPg->nCompress>pFS->nBuffer ){
        rc = LSM_CORRUPT_BKPT;
      }else{
        rc = fsReadData(pFS, iOff, pFS->aBuffer, pPg->nCompress);
      }







>
|







1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
  assert( p && pPg->nCompress==0 );

  if( fsAllocateBuffer(pFS) ) return LSM_NOMEM;

  rc = fsReadData(pFS, iOff, aSz, sizeof(aSz));

  if( rc==LSM_OK ){
    int bFree;
    pPg->nCompress = (int)getRecordSize(aSz, &bFree);
    rc = fsAddOffset(pFS, iOff, 3, &iOff);
    if( rc==LSM_OK ){
      if( pPg->nCompress>pFS->nBuffer ){
        rc = LSM_CORRUPT_BKPT;
      }else{
        rc = fsReadData(pFS, iOff, pFS->aBuffer, pPg->nCompress);
      }
1317
1318
1319
1320
1321
1322
1323

1324
1325
1326
1327
1328
1329
1330
1331
1332
  u8 aSz[3];
  int rc;
  i64 iRead;

  rc = fsSubtractOffset(pFS, iOff, sizeof(aSz), &iRead);
  if( rc==LSM_OK ) rc = fsReadData(pFS, iRead, aSz, sizeof(aSz));
  if( rc==LSM_OK ){

    int nSz = lsmGetU24(aSz) + sizeof(aSz)*2;
    rc = fsSubtractOffset(pFS, iOff, nSz, piPrev);
  }

  return rc;
}

/*
** The first argument to this function is a valid reference to a database







>
|
|







1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
  u8 aSz[3];
  int rc;
  i64 iRead;

  rc = fsSubtractOffset(pFS, iOff, sizeof(aSz), &iRead);
  if( rc==LSM_OK ) rc = fsReadData(pFS, iRead, aSz, sizeof(aSz));
  if( rc==LSM_OK ){
    int bFree;
    int nSz = getRecordSize(aSz, &bFree);
    rc = fsSubtractOffset(pFS, iOff, nSz + sizeof(aSz)*2, piPrev);
  }

  return rc;
}

/*
** The first argument to this function is a valid reference to a database
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
      u8 aSz[3];                  /* pPg->nCompress as a 24-bit big-endian */
      assert( pPg->pSeg && pPg->iPg==0 && pPg->nCompress==0 );

      /* Compress the page image. */
      rc = fsCompressIntoBuffer(pFS, pPg);

      /* Serialize the compressed size into buffer aSz[] */
      lsmPutU24(aSz, pPg->nCompress);

      /* Write the serialized page record into the database file. */
      pPg->iPg = fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc);
      fsAppendData(pFS, pPg->pSeg, pFS->aBuffer, pPg->nCompress, &rc);
      fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc);

      /* Now that it has a page number, insert the page into the hash table */







|







1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
      u8 aSz[3];                  /* pPg->nCompress as a 3-byte record size field */
      assert( pPg->pSeg && pPg->iPg==0 && pPg->nCompress==0 );

      /* Compress the page image. */
      rc = fsCompressIntoBuffer(pFS, pPg);

      /* Serialize the compressed size into buffer aSz[] */
      putRecordSize(aSz, pPg->nCompress, 0);

      /* Write the serialized page record into the database file. */
      pPg->iPg = fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc);
      fsAppendData(pFS, pPg->pSeg, pFS->aBuffer, pPg->nCompress, &rc);
      fsAppendData(pFS, pPg->pSeg, aSz, sizeof(aSz), &rc);

      /* Now that it has a page number, insert the page into the hash table */