Index: lsm-test/lsmtest_tdb.c ================================================================== --- lsm-test/lsmtest_tdb.c +++ lsm-test/lsmtest_tdb.c @@ -611,11 +611,13 @@ int (*xOpen)(const char *zFilename, int bClear, TestDb **ppDb); } aLib[] = { { "sqlite3", "testdb.sqlite", sql_open }, { "lsm_small", "testdb.lsm_small", test_lsm_small_open }, { "lsm_lomem", "testdb.lsm_lomem", test_lsm_lomem_open }, +#ifdef HAVE_ZLIB { "lsm_zip", "testdb.lsm_zip", test_lsm_zip_open }, +#endif { "lsm", "testdb.lsm", test_lsm_open }, #ifdef LSM_MUTEX_PTHREADS { "lsm_mt2", "testdb.lsm_mt2", test_lsm_mt2 }, { "lsm_mt3", "testdb.lsm_mt3", test_lsm_mt3 }, #endif Index: lsm-test/lsmtest_tdb3.c ================================================================== --- lsm-test/lsmtest_tdb3.c +++ lsm-test/lsmtest_tdb3.c @@ -395,31 +395,41 @@ /************************************************************************* ************************************************************************** ** Begin test compression hooks. */ +#ifdef HAVE_ZLIB +#include + static int testZipBound(void *pCtx, int nSrc){ - assert( 0 ); - return 0; + return compressBound(nSrc); } static int testZipCompress( - void *pCtx, /* Context pointer */ - char *aOut, int *pnOut, /* OUT: Buffer containing compressed data */ - const char *aIn, int nIn /* Buffer containing input data */ + void *pCtx, /* Context pointer */ + char *aOut, int *pnOut, /* OUT: Buffer containing compressed data */ + const char *aIn, int nIn /* Buffer containing input data */ ){ - assert( 0 ); - return 0; + uLongf n = *pnOut; /* In/out buffer size for compress() */ + int rc; /* compress() return code */ + + rc = compress((Bytef*)aOut, &n, (Bytef*)aIn, nIn); + *pnOut = n; + return (rc==Z_OK ? 0 : LSM_ERROR); } static int testZipUncompress( - void *pCtx, /* Context pointer */ - char *aOut, int *pnOut, /* OUT: Buffer containing uncompressed data */ - const char *aIn, int nIn /* Buffer containing input data */ + void *pCtx, /* Context pointer */ + char *aOut, int *pnOut, /* OUT: Buffer containing uncompressed data */ + const char *aIn, int nIn /* Buffer containing input data */ ){ - assert( 0 ); - return 0; + uLongf n = *pnOut; /* In/out buffer size for uncompress() */ + int rc; /* uncompress() return code */ + + rc = uncompress((Bytef*)aOut, &n, (Bytef*)aIn, nIn); + *pnOut = n; + return (rc==Z_OK ? 0 : LSM_ERROR); } static int testConfigureCompression(lsm_db *pDb){ static lsm_compress zip = { 1, sizeof(lsm_compress), @@ -428,10 +438,11 @@ testZipCompress, /* xCompress method */ testZipUncompress /* xUncompress method */ }; return lsm_config(pDb, LSM_CONFIG_SET_COMPRESSION, &zip); } +#endif /* ifdef HAVE_ZLIB */ /* ** End test compression hooks. ************************************************************************** *************************************************************************/ @@ -688,11 +699,13 @@ { "max_freelist", 0, LSM_CONFIG_MAX_FREELIST }, { "multi_proc", 0, LSM_CONFIG_MULTIPLE_PROCESSES }, { "worker_nmerge", 1, LSM_CONFIG_NMERGE }, { "test_no_recovery", 0, TEST_NO_RECOVERY }, { "threads", 0, TEST_THREADS }, +#ifdef HAVE_ZLIB { "compression", 0, TEST_COMPRESSION }, +#endif { 0, 0 } }; const char *z = zStr; int nThread = 1; @@ -740,13 +753,15 @@ pLsm->bNoRecovery = iVal; break; case TEST_THREADS: nThread = iVal; break; +#ifdef HAVE_ZLIB case TEST_COMPRESSION: testConfigureCompression(db); break; +#endif } } } }else if( z!=zStart ){ goto syntax_error; Index: src/lsm_file.c ================================================================== --- src/lsm_file.c +++ src/lsm_file.c @@ -170,10 +170,11 @@ /* If this is a compressed database, a pointer to the compression methods. ** For an uncompressed database, a NULL pointer. */ lsm_compress *pCompress; u8 *aBuffer; /* Buffer to compress into */ + int nBuffer; /* Allocated size of aBuffer[] in bytes */ /* mmap() mode things */ int bUseMmap; /* True to use mmap() to access db file */ void *pMap; /* Current mapping of database file */ i64 nMap; /* Bytes mapped at pMap */ @@ -978,10 +979,20 @@ rc = fsBlockNext(pFS, fsPageToBlock(pFS, iOff), &iBlk); *piRes = fsFirstPageOnBlock(pFS, iBlk) + iAdd - (iEob - iOff + 1); return rc; } + +static int fsAllocateBuffer(FileSystem *pFS){ + assert( pFS->pCompress ); + if( pFS->aBuffer==0 ){ + pFS->nBuffer = pFS->pCompress->xBound(pFS->pCompress->pCtx, pFS->nPagesize); + pFS->aBuffer = lsmMalloc(pFS->pEnv, LSM_MAX(pFS->nBuffer, pFS->nPagesize)); + if( pFS->aBuffer==0 ) return LSM_NOMEM_BKPT; + } + return LSM_OK; +} /* ** This function is only called in compressed database mode. It reads and ** uncompresses the compressed data for page pPg from the database and ** populates the pPg->aData[] buffer and pPg->nCompress field. @@ -990,23 +1001,41 @@ */ static int fsReadPagedata( FileSystem *pFS, /* File-system handle */ Page *pPg /* Page to read and uncompress data for */ ){ - i64 iOff; + lsm_compress *p = pFS->pCompress; + i64 iOff = pPg->iPg; u8 aSz[6]; int rc; - assert( pFS->pCompress && pPg->nCompress==0 ); - iOff = pPg->iPg; + assert( p && pPg->nCompress==0 ); + + if( fsAllocateBuffer(pFS) ) return LSM_NOMEM; rc = fsReadData(pFS, iOff, aSz, sizeof(aSz)); if( rc==LSM_OK ){ pPg->nCompress = (int)lsmGetU24(aSz); rc = fsAddOffset(pFS, iOff, 3, &iOff); - if( rc==LSM_OK ) rc = fsReadData(pFS, iOff, pPg->aData, pPg->nCompress); + if( rc==LSM_OK ){ + if( pPg->nCompress>pFS->nBuffer ){ + rc = LSM_CORRUPT_BKPT; + }else{ + rc = fsReadData(pFS, iOff, pFS->aBuffer, pPg->nCompress); + } + if( rc==LSM_OK ){ + int n = pFS->nBuffer; + rc = p->xUncompress(p->pCtx, + (char *)pPg->aData, &n, + (const char *)pFS->aBuffer, pPg->nCompress + ); + if( rc==LSM_OK && n!=pPg->nData ){ + rc = LSM_CORRUPT_BKPT; + } + } + } } return rc; } /* @@ -1064,10 +1093,11 @@ p->pFS = pFS; assert( p->flags==0 || p->flags==PAGE_FREE ); if( pFS->pCompress==0 && (fsIsLast(pFS, iPg) || fsIsFirst(pFS, iPg)) ){ p->flags |= PAGE_SHORT; } + p->nData = pFS->nPagesize - (p->flags & PAGE_SHORT); #ifdef LSM_DEBUG memset(p->aData, 0x56, pFS->nPagesize); #endif assert( p->pLruNext==0 && p->pLruPrev==0 ); @@ -1085,11 +1115,10 @@ /* If the xRead() call was successful (or not attempted), link the ** page into the page-cache hash-table. Otherwise, if it failed, ** free the buffer. */ if( rc==LSM_OK ){ p->pHashNext = pFS->apHash[iHash]; - p->nData = pFS->nPagesize - (p->flags & PAGE_SHORT); pFS->apHash[iHash] = p; }else{ fsPageBufferFree(p); p = 0; } @@ -1461,37 +1490,38 @@ /* ** Mark the sorted run passed as the second argument as finished. */ int lsmFsSortedFinish(FileSystem *pFS, Segment *p){ int rc = LSM_OK; - if( p ){ + if( p && p->iLastPg ){ const int nPagePerBlock = (pFS->nBlocksize / pFS->nPagesize); + int iBlk; /* Check if the last page of this run happens to be the last of a block. ** If it is, then an extra block has already been allocated for this run. ** Shift this extra block back to the free-block list. ** ** Otherwise, add the first free page in the last block used by the run ** to the lAppend list. */ - if( (p->iLastPg % nPagePerBlock)==0 ){ - Page *pLast; - rc = fsPageGet(pFS, p->iLastPg, 0, &pLast); - if( rc==LSM_OK ){ - int iPg = (int)lsmGetU32(&pLast->aData[pFS->nPagesize-4]); - int iBlk = fsPageToBlock(pFS, iPg); - lsmBlockRefree(pFS->pDb, iBlk); - lsmFsPageRelease(pLast); - } - }else{ + iBlk = fsPageToBlock(pFS, p->iLastPg); + if( fsLastPageOnBlock(pFS, fsPageToBlock(pFS, p->iLastPg) )!=p->iLastPg ){ int i; u32 *aiAppend = pFS->pDb->pWorker->aiAppend; for(i=0; iiLastPg+1; break; } + } + }else if( pFS->pCompress==0 ){ + Page *pLast; + rc = fsPageGet(pFS, p->iLastPg, 0, &pLast); + if( rc==LSM_OK ){ + int iPg = (int)lsmGetU32(&pLast->aData[pFS->nPagesize-4]); + lsmBlockRefree(pFS->pDb, fsPageToBlock(pFS, iPg)); + lsmFsPageRelease(pLast); } } } return rc; } @@ -1649,12 +1679,14 @@ /* Write as much data as is possible at iApp (usually all of it). */ if( rc==LSM_OK ){ int nSpace = fsLastPageOnBlock(pFS, fsPageToBlock(pFS, iApp)) - iApp + 1; nWrite = LSM_MIN(nData, nSpace); nRem = nData - nWrite; - rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aData, nWrite); - + assert( nWrite>=0 ); + if( nWrite!=0 ){ + rc = lsmEnvWrite(pFS->pEnv, pFS->fdDb, iApp, aData, nWrite); + } iApp += nWrite; } /* If required, allocate a new block and write the rest of the data ** into it. Set the next and previous block pointers to link the new @@ -1702,21 +1734,20 @@ ** ** If buffer pFS->aBuffer[] has not been allocated then this function ** allocates it. If this fails, LSM_NOMEM is returned. Otherwise, LSM_OK. */ static int fsCompressIntoBuffer(FileSystem *pFS, Page *pPg){ - /* TODO: Fill in a real version of this function */ + lsm_compress *p = pFS->pCompress; - if( pFS->aBuffer==0 ){ - pFS->aBuffer = lsmMalloc(pFS->pEnv, pFS->nPagesize); - if( pFS->aBuffer==0 ) return LSM_NOMEM_BKPT; - } - + if( fsAllocateBuffer(pFS) ) return LSM_NOMEM; assert( pPg->nData==pFS->nPagesize ); - memcpy(pFS->aBuffer, pPg->aData, pFS->nPagesize); - pPg->nCompress = pFS->nPagesize; - return LSM_OK; + + pPg->nCompress = pFS->nBuffer; + return p->xCompress(p->pCtx, + (char *)pFS->aBuffer, &pPg->nCompress, + (const char *)pPg->aData, pPg->nData + ); } /* ** If the page passed as an argument is dirty, update the database file ** (or mapping of the database file) with its current contents and mark