Index: main.mk ================================================================== --- main.mk +++ main.mk @@ -268,10 +268,11 @@ parse.c \ # Header files used by all library source files. # HDR = \ + $(TOP)/src/btInt.h \ $(TOP)/src/hash.h \ $(TOP)/src/hwtime.h \ keywordhash.h \ $(TOP)/src/kv.h \ $(TOP)/src/lsm.h \ Index: src/btInt.h ================================================================== --- src/btInt.h +++ src/btInt.h @@ -11,10 +11,12 @@ ************************************************************************* ** */ #include "bt.h" + +/* #define BT_STDERR_DEBUG 1 */ typedef sqlite4_int64 i64; typedef sqlite4_uint64 u64; typedef unsigned int u32; typedef unsigned short u16; Index: src/bt_main.c ================================================================== --- src/bt_main.c +++ src/bt_main.c @@ -289,11 +289,29 @@ u8 *ptr = btCellPtrFind(aData, nData, i); fprintf(f, "%s%d", i==0?"":" ", (int)btGetU16(ptr)); } fprintf(f, ")\n"); + for(i=0; ipDb->pPager); u32 pgno; /* Page number for next page to load */ int rc = SQLITE4_OK; /* Return Code */ @@ -386,17 +405,21 @@ } if( rc!=SQLITE4_OK ) break; assert( iHi==iLo ); pCsr->aiCell[pCsr->nPg-1] = iHi; +#if 0 +printPage(stderr, pgno, aData, pgsz); +#endif + if( aData[0] & BT_PGFLAGS_INTERNAL ){ if( iHi==nCell ){ pgno = btGetU32(&aData[1]); }else{ u8 *pCell; int nByte; - pCell = btCellFind(aData, pgsz, 0); + pCell = btCellFind(aData, pgsz, iHi); pCell += sqlite4BtVarintGet32(pCell, &nByte); pCell += nByte; pgno = btGetU32(pCell); } }else{ @@ -404,23 +427,38 @@ if( res!=0 ){ assert( BT_SEEK_LEFAST<0 && BT_SEEK_LE<0 ); if( eSeek<0 ){ rc = sqlite4BtCsrPrev(pCsr); - if( rc==SQLITE4_OK ) rc = SQLITE4_INEXACT; - }else if( eSeek==BT_SEEK_EQ || iHi==nCell ){ + }else if( eSeek==BT_SEEK_EQ ){ rc = SQLITE4_NOTFOUND; + }else if( iHi==nCell ){ + if( bUpdate ){ + rc = SQLITE4_NOTFOUND; + }else{ + rc = sqlite4BtCsrNext(pCsr); + } }else{ rc = SQLITE4_INEXACT; } + if( rc==SQLITE4_OK ) rc = SQLITE4_INEXACT; } } } } return rc; } + +int sqlite4BtCsrSeek( + bt_cursor *pCsr, + const void *pK, /* Key to seek for */ + int nK, /* Size of key pK in bytes */ + int eSeek /* Seek mode (a BT_SEEK_XXX constant) */ +){ + return btCsrSeek(pCsr, pK, nK, eSeek, 0); +} /* ** This function seeks the cursor as required for either sqlite4BtCsrFirst() ** (if parameter bLast is false) or sqlite4BtCsrLast() (if bLast is true). */ @@ -560,10 +598,11 @@ u8 *pCell; int iCell = pCsr->aiCell[pCsr->nPg-1]; int nK; aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]); + assert( btCellCount(aData, pgsz)>iCell ); pCell = btCellFind(aData, pgsz, iCell); pCell += sqlite4BtVarintGet32(pCell, &nK); if( nK==0 ){ assert( 0 ); @@ -767,19 +806,25 @@ /* Populated by btGatherSiblings */ int nIn; /* Number of sibling pages */ BtPage *apPg[5]; /* Array of sibling pages */ + int nCell; /* Number of input cells */ + /* Array populated by btBalanceMeasure */ int *anCellSz; + /* Populated in btBalance() */ int anOut[5]; /* Cell counts for output pages */ /* Variables used by btBalanceOutput */ int nOut; /* Number of output pages */ int iOut; /* Current output page */ u8 *apOut[5]; /* Buffers to assemble output in */ + KeyValue aPCell[5]; /* Cells to push into the parent page */ + u8 *pTmp; /* Space for apCell[x].pKey if required */ + int iTmp; /* Offset to free space within pTmp */ }; static int btGatherSiblings(BalanceCtx *p){ bt_cursor *pCsr = p->pCsr; bt_db * const pDb = pCsr->pDb; @@ -786,11 +831,11 @@ const int pgsz = sqlite4BtPagerPagesize(pDb->pPager); int rc = SQLITE4_OK; int nCell; /* Number of cells in parent page */ u8 *aParent; /* Buffer of parent page */ - int iChild; /* Index of child page */ + int iChild; /* Index of child page within parent */ int nSib; /* Number of siblings */ int iSib; /* Index of left-most sibling page */ int i; @@ -813,16 +858,33 @@ } for(i=0; ipPager, pgno, &p->apPg[i]); + assert( (iSib+i)!=iChild || p->apPg[i]==pCsr->apPage[pCsr->nPg-1] ); } p->nIn = nSib; pCsr->aiCell[pCsr->nPg-2] = iSib; return rc; } + +/* +** Argument pCell points to a cell on an internal node. Decode the +** cell into key-value object *pKV. An internal cell always has +** the same format: +** +** * Number of bytes in the key (nKey) as a varint. +** * nKey bytes of key data. +** * A page pointer, stored as a 32-bit big-endian unsigned. +*/ +static void btInternalCellToKeyValue(u8 *pCell, KeyValue *pKV){ + pKV->pK = pCell + sqlite4BtVarintGet32(pCell, &pKV->nK); + pKV->pgno = btGetU32(&((u8*)pKV->pK)[pKV->nK]); + pKV->pV = 0; + pKV->nV = 0; +} static int btSetChildPgno(bt_db *pDb, BtPage *pPg, int iChild, u32 pgno){ const int pgsz = sqlite4BtPagerPagesize(pDb->pPager); int rc; @@ -845,10 +907,11 @@ } /* Called recursively by btBalance(). todo: Fix this! */ static int btInsertAndBalance(bt_cursor *, int, KeyValue *); static int btDeleteFromPage(bt_cursor *, int); +static int btBalanceIfUnderfull(bt_cursor *pCsr); static int btBalanceMeasure( BalanceCtx *p, /* Description of balance operation */ int iCell, /* Cell number in this iteration */ u8 *pCell, int nByte, /* Binary cell */ @@ -866,40 +929,80 @@ BalanceCtx *p, /* Description of balance operation */ int iCell, /* Cell number in this iteration */ u8 *pCell, int nByte, /* Binary cell to copy to output */ KeyValue *pKV /* Key-value cell to write to output */ ){ - u8 *aOut; /* Buffer for output page */ + u8 *aOut = p->apOut[p->iOut]; /* Buffer for current output page */ int iOff; /* Offset of new cell within page */ int nCell; /* Number of cells already on page */ assert( (pCell==0)!=(pKV==0) ); - /* Scribble the new cell into the output page. */ - aOut = p->apOut[p->iOut]; - iOff = btFreeOffset(aOut, p->pgsz); - if( iOff==0 ) iOff = (p->bLeaf ? 1 : 5); - nCell = btCellCount(aOut, p->pgsz); - btPutU16(btCellPtrFind(aOut, p->pgsz, nCell), iOff); - if( pCell ){ - memcpy(&aOut[iOff], pCell, nByte); - iOff += nByte; - }else{ - iOff += btKVCellWrite(pKV, p->pgsz, &aOut[iOff]); - } - btPutU16(&aOut[p->pgsz-2], nCell+1); - btPutU16(&aOut[p->pgsz-6], iOff); - - if( (iCell+1)==p->anOut[p->iOut] ){ - /* That was the last cell for this page. Fill in the rest of the - ** output page footer and so on. Then increment BalanceCtx.iOut so - ** that the next call writes to the next page. */ - int nFree; /* Free space remaining on output page */ - nFree = p->pgsz - iOff - (6 + 2*(nCell+1)); - aOut[0] = (p->bLeaf ? 0 : BT_PGFLAGS_INTERNAL); - btPutU16(&aOut[p->pgsz-4], nFree); + if( p->bLeaf==0 && iCell==p->anOut[p->iOut] ){ + /* This cell is destined for the parent page of the siblings being + ** rebalanced. So instead of writing it to a page buffer it is copied + ** into the BalanceCtx.aPCell[] array. + ** + ** When this cell is eventually written to the parent, the accompanying + ** page pointer will be the page number of sibling page p->iOut. This + ** value will be filled in later. + ** + ** The pointer that is currently part of the cell is used as the + ** right-child pointer of page p->iOut. This value is written now. */ + int nKey; + u8 *pKey; + u8 *pCopy; + u32 pgno; + KeyValue *pPKey = &p->aPCell[p->iOut]; + + if( pCell ){ + pKey = pCell + sqlite4BtVarintGet32(pCell, &nKey); + pgno = btGetU32(&pKey[nKey]); + }else{ + pKey = pKV->pK; + nKey = pKV->nK; + pgno = pKV->pgno; + } + + pCopy = &p->pTmp[p->iTmp]; + p->iTmp += nKey; + memcpy(pCopy, pKey, nKey); + pPKey->pK = pCopy; + pPKey->nK = nKey; + + btPutU32(&aOut[1], pgno); p->iOut++; + }else{ + + /* Write the new cell into the output page. */ + iOff = btFreeOffset(aOut, p->pgsz); + if( iOff==0 ) iOff = (p->bLeaf ? 1 : 5); + nCell = btCellCount(aOut, p->pgsz); + btPutU16(btCellPtrFind(aOut, p->pgsz, nCell), iOff); + if( pCell ){ + memcpy(&aOut[iOff], pCell, nByte); + iOff += nByte; + }else{ + iOff += btKVCellWrite(pKV, p->pgsz, &aOut[iOff]); + } + btPutU16(&aOut[p->pgsz-2], nCell+1); + btPutU16(&aOut[p->pgsz-6], iOff); + + if( (iCell+1)==p->anOut[p->iOut] ){ + /* That was the last cell for this page. Fill in the rest of the + ** output page footer and the flags byte at the start of the page. */ + int nFree; /* Free space remaining on output page */ + nFree = p->pgsz - iOff - (6 + 2*(nCell+1)); + aOut[0] = (p->bLeaf ? 0 : BT_PGFLAGS_INTERNAL); + btPutU16(&aOut[p->pgsz-4], nFree); + + /* If the siblings are leaf pages, increment BalanceCtx.iOut here. + ** for internal nodes, it will be incremented by the next call to + ** this function, after a divider cell is pushed into the parent + ** node. */ + p->iOut += p->bLeaf; + } } return SQLITE4_OK; } @@ -913,10 +1016,15 @@ int iCall = 0; int i; /* Used to iterate through KV pairs */ BtPage *pIns = p->pCsr->apPage[p->pCsr->nPg-1]; int iIns = p->pCsr->aiCell[p->pCsr->nPg-1]; + + /* Check that page pIns is actually a member of the ctx.apPg[] array. */ +#ifndef NDEBUG + for(i=0; p->apPg[i]!=pIns; i++) assert( inIn ); +#endif for(iPg=0; iPgnIn && rc==SQLITE4_OK; iPg++){ BtPage *pPg; /* Current page */ u8 *aData; /* Page data */ int nCell; /* Number of cells on page pPg */ @@ -930,144 +1038,188 @@ int nByte; u8 *pCell; if( pPg==pIns && iCell==iIns ){ for(i=0; inKV; i++){ + assert( iCallnCell ); rc = xVisit(p, iCall++, 0, 0, &p->apKV[i]); if( rc!=SQLITE4_OK ) break; } } pCell = btCellFindSize(aData, pgsz, iCell, &nByte); rc = xVisit(p, iCall++, pCell, nByte, 0); } - if( pPg==pIns && iCell==nCell && rc==SQLITE4_OK ){ - for(i=0; inKV; i++){ + if( pPg==pIns && iIns==nCell ){ + for(i=0; inKV && rc==SQLITE4_OK; i++){ + assert( iCallnCell ); rc = xVisit(p, iCall++, 0, 0, &p->apKV[i]); } } + + /* If the siblings being balanced are not leaves, and the page just + ** processed was not the right-most sibling, visit a cell from the + ** parent page. */ + if( p->bLeaf==0 && iPg<(p->nIn-1) && rc==SQLITE4_OK ){ + int iPar = p->pCsr->nPg-2; + u8 *aParent = sqlite4BtPageData(p->pCsr->apPage[iPar]); + u8 *pCell = btCellFind(aParent, pgsz, p->pCsr->aiCell[iPar] + iPg); + KeyValue kv; + btInternalCellToKeyValue(pCell, &kv); + kv.pgno = btGetU32(&aData[1]); + rc = xVisit(p, iCall++, 0, 0, &kv); + } } + assert( rc!=SQLITE4_OK || iCall==p->nCell ); return rc; } -int btBalance(bt_cursor *pCsr, int nKV, KeyValue *apKV){ +int btBalance( + bt_cursor *pCsr, /* Cursor pointed to page to rebalance */ + int bLeaf, /* True if rebalancing leaf pages */ + int nKV, /* Number of entries in apKV[] array */ + KeyValue *apKV /* Extra entries to add while rebalancing */ +){ bt_db * const pDb = pCsr->pDb; const int pgsz = sqlite4BtPagerPagesize(pDb->pPager); - const int nSpacePerPage = (pgsz - 1 - 6); + const int nSpacePerPage = (pgsz - 1 - 6 - (!bLeaf)*4); int iPg; /* Used to iterate through pages */ int iCell; /* Used to iterate through cells */ - int nCell = 0; /* Total number of cells to redistribute */ - int *anCellSz; /* Array containing size in bytes of cells */ - KeyValue aPCell[5]; /* Cells to push into the parent page */ + int anByteOut[5]; /* Bytes of content on each output page */ BtPage *pPar; /* Parent page */ int iSib; /* Index of left-most sibling */ - int nTotal; /* Total bytes of content to distribute */ int rc = SQLITE4_OK; /* Return code */ BalanceCtx ctx; memset(&ctx, 0, sizeof(ctx)); ctx.pCsr = pCsr; ctx.nKV = nKV; ctx.apKV = apKV; ctx.pgsz = pgsz; - ctx.bLeaf = 1; /* todo */ + ctx.bLeaf = bLeaf; - memset(aPCell, 0, sizeof(aPCell)); + memset(anByteOut, 0, sizeof(anByteOut)); /* Gather the sibling pages from which cells will be redistributed into ** the ctx.apPg[] array. */ + assert( bLeaf==0 || bLeaf==1 ); assert( pCsr->nPg>1 ); rc = btGatherSiblings(&ctx); if( rc!=SQLITE4_OK ) goto rebalance_out; pPar = pCsr->apPage[pCsr->nPg-2]; iSib = pCsr->aiCell[pCsr->nPg-2]; - /* Count the number of input cells */ - nCell = 1; + /* Count the number of input cells. */ + ctx.nCell = nKV; for(iPg=0; iPg0 ); /* Allocate and populate the anCellSz[] array */ - anCellSz = ctx.anCellSz = (int*)sqlite4_malloc(pDb->pEnv, sizeof(int)*nCell); - if( anCellSz==0 ){ + ctx.anCellSz = (int*)sqlite4_malloc(pDb->pEnv, sizeof(int)*ctx.nCell); + if( ctx.anCellSz==0 ){ rc = btErrorBkpt(SQLITE4_NOMEM); goto rebalance_out; } rc = btBalanceVisitCells(&ctx, btBalanceMeasure); - /* Now figure out the number of output pages. Set ctx.nOut to this value. */ + /* Now figure out the number of output pages required. Set ctx.nOut to + ** this value. */ iCell = 0; - for(iPg=0; iCellnSpacePerPage ) break; + if( bLeaf==0 && iPg!=0 ){ + /* This cell will be pushed up to the parent node as a divider cell, + ** not written to any output page. */ + iCell++; + } + assert( anByteOut[iPg]==0 ); + for(/* noop */; iCellnSpacePerPage ) break; + anByteOut[iPg] += nByte; } ctx.anOut[iPg] = iCell; } ctx.nOut = iPg; - assert( ctx.anOut[ctx.nOut-1]==nCell ); - - /* Calculate the total size of all cells. */ - nTotal = 0; - for(iCell=0; iCell=0; iPg--){ - int nByte = 0; /* Number of bytes of content on page */ - int nGoal = nTotal / (iPg + 2); - - for( ; iCell>0 && ((nBytectx.anOut[iPg]); iCell--){ - int nThis = (anCellSz[iCell-1] + 2); - if( (nThis + nByte)>nSpacePerPage ) break; - nByte += nThis; - } - assert( iCell<=ctx.anOut[iPg] ); - ctx.anOut[iPg] = iCell; - nTotal = nByte; - } - -#ifndef NDEBUG + int iR = iPg+1; + while( 1 ){ + int nLeft = ctx.anCellSz[ ctx.anOut[iPg]-1 ] + 2; + int nRight = (bLeaf ? nLeft : (ctx.anCellSz[ ctx.anOut[iPg] ] + 2)); + + if( anByteOut[iPg]==nLeft || (anByteOut[iR] + nRight) > anByteOut[iPg] ){ + break; + } + ctx.anOut[iPg]--; + anByteOut[iPg] -= nLeft; + anByteOut[iR] += nRight; + } + } + +#ifdef BT_STDERR_DEBUG { int iDbg; - fprintf(stderr, "btBalance(): nIn=%d anIn[] = ", ctx.nIn); + fprintf(stderr, + "\nbtBalance(): bLeaf=%d nIn=%d anIn[] = ", ctx.bLeaf, ctx.nIn + ); for(iDbg=0; iDbg nOut=%d anOut[] = ", ctx.nOut); for(iDbg=0; iDbg=ctx.nIn ){ rc = sqlite4BtPageAllocate(pDb->pPager, &ctx.apPg[iPg]); @@ -1074,12 +1226,17 @@ if( rc!=SQLITE4_OK ) goto rebalance_out; } btSetBuffer(pDb, ctx.apPg[iPg], ctx.apOut[iPg]); ctx.apOut[iPg] = 0; } + for(iPg=ctx.nOut; iPgnPg--; rc = btDeleteFromPage(pCsr, ctx.nIn-1); } - if( rc==SQLITE4_OK ){ - rc = btInsertAndBalance(pCsr, ctx.nOut-1, aPCell); + iPg = pCsr->nPg; + if( rc==SQLITE4_OK && ctx.nOut>1 ){ + rc = btInsertAndBalance(pCsr, ctx.nOut-1, ctx.aPCell); + } + if( rc==SQLITE4_OK && iPg==pCsr->nPg ){ + rc = btBalanceIfUnderfull(pCsr); } -#ifndef NDEBUG +#ifdef BT_STDERR_DEBUG { u8 *aData = sqlite4BtPageData(pPar); printPage(stderr, sqlite4BtPagePgno(pPar), aData, pgsz); } #endif rebalance_out: - for(iPg=0; iPgpEnv, ctx.anCellSz); return rc; } static int btExtendTree(bt_cursor *pCsr){ bt_db * const pDb = pCsr->pDb; @@ -1183,11 +1352,13 @@ int nFree; /* Contiguous free space on this page */ int nReq = 0; /* Space required for type (a) cells */ int iCell; /* Position to insert new key */ int iWrite; /* Byte offset at which to write new cell */ int i; + int bLeaf; /* True if inserting into leaf page */ BtPage *pLeaf; + /* Bytes of space required on the current page. */ for(i=0; iaiCell[pCsr->nPg-1]; assert( pCsr->nPg>0 ); pLeaf = pCsr->apPage[pCsr->nPg-1]; aData = (u8*)sqlite4BtPageData(pLeaf); + + /* Set the bLeaf variable to true if inserting into a leaf page, or + ** false otherwise. Return SQLITE4_CORRUPT if the page is a leaf but + ** the KeyValue pairs being inserted are suitable for internal nodes, + ** or vice-versa. */ + assert( nKV>0 ); + bLeaf = (apKV[0].pgno==0); + if( (0==(btFlags(aData, pgsz) & BT_PGFLAGS_INTERNAL))!=bLeaf ){ + return btErrorBkpt(SQLITE4_CORRUPT); + } nCell = btCellCount(aData, pgsz); assert( iCell<=btCellCount(aData, pgsz) ); if( nCell==0 ){ /* If the nCell field is zero, then the rest of the header may ** contain invalid values (zeroes - as it may never have been ** initialized). So set our stack variables to values appropriate ** to an empty page explicitly here. */ - iWrite = ((btFlags(aData, pgsz) & BT_PGFLAGS_INTERNAL) ? 5 : 1); + iWrite = (bLeaf ? 1 : 5); nFree = pgsz - iWrite - 6; }else{ if( btFreeContiguous(aData, pgsz)=nReq ){ /* Special case - the new entry will not fit on the page at present ** but would if the page were defragmented. So defragment it before @@ -1228,11 +1409,11 @@ aData = sqlite4BtPageData(pLeaf); /* Make space within the cell pointer array */ if( iCell!=nCell ){ u8 *aFrom = btCellPtrFind(aData, pgsz, nCell-1); - u8 *aTo = btCellPtrFind(aData, pgsz, nCell); + u8 *aTo = btCellPtrFind(aData, pgsz, nCell-1+nKV); memmove(aTo, aFrom, (nCell-iCell) * 2); } for(i=0; inPg==1 ){ rc = btExtendTree(pCsr); } if( rc==SQLITE4_OK ){ - rc = btBalance(pCsr, nKV, apKV); + rc = btBalance(pCsr, bLeaf, nKV, apKV); } } return rc; } @@ -1308,27 +1489,62 @@ btPutU16(&aData[pgsz-4], btFreeSpace(aData, pgsz) + nFreed); } return rc; } + +static int btBalanceIfUnderfull(bt_cursor *pCsr){ + const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager); + int rc = SQLITE4_OK; + int iPg = pCsr->nPg-1; + BtPage *pPg = pCsr->apPage[iPg]; + u8 *aData = sqlite4BtPageData(pPg); + int nCell = btCellCount(aData, pgsz); + int nFree = btFreeSpace(aData, pgsz); + int bLeaf = (0==(btFlags(aData, pgsz) & BT_PGFLAGS_INTERNAL)); + + if( iPg==0 ){ + /* Root page. If it contains no cells at all and is not already + ** a leaf, shorten the tree by one here by copying the contents + ** of the only child into the root. */ + if( nCell==0 && bLeaf==0 ){ + BtPager *pPager = pCsr->pDb->pPager; + u32 pgno = btChildPgno(aData, pgsz, 0); + BtPage *pChild; + + rc = sqlite4BtPageWrite(pPg); + if( rc==SQLITE4_OK ){ + rc = sqlite4BtPageGet(pPager, pgno, &pChild); + } + if( rc==SQLITE4_OK ){ + u8 *a = sqlite4BtPageData(pChild); + memcpy(aData, a, pgsz); + rc = sqlite4BtPageTrim(pChild); + } + } + }else if( nCell==0 || (nFree>(2*pgsz/3) && bLeaf==0) ){ + rc = btBalance(pCsr, bLeaf, 0, 0); + } + return rc; +} /* ** Insert a new key/value pair or replace an existing one. */ int sqlite4BtReplace(bt_db *db, const void *pK, int nK, const void *pV, int nV){ int rc = SQLITE4_OK; bt_cursor csr; btCsrSetup(db, &csr); - rc = sqlite4BtCsrSeek(&csr, pK, nK, BT_SEEK_GE); + rc = btCsrSeek(&csr, pK, nK, BT_SEEK_GE, 1); if( rc==SQLITE4_OK ){ /* The cursor currently points to an entry with key pK/nK. This call ** should therefore replace that entry. So delete it and then re-seek ** the cursor. */ rc = sqlite4BtDelete(&csr); if( rc==SQLITE4_OK ){ - rc = sqlite4BtCsrSeek(&csr, pK, nK, BT_SEEK_GE); + rc = btCsrSeek(&csr, pK, nK, BT_SEEK_GE, 1); } } if( rc==SQLITE4_OK ) rc = btErrorBkpt(SQLITE4_CORRUPT); if( rc==SQLITE4_NOTFOUND || rc==SQLITE4_INEXACT ){ @@ -1342,11 +1558,15 @@ return rc; } int sqlite4BtDelete(bt_cursor *pCsr){ - return btDeleteFromPage(pCsr, 1); + int rc; + rc = btDeleteFromPage(pCsr, 1); + if( rc==SQLITE4_OK ){ + rc = btBalanceIfUnderfull(pCsr); + } } int sqlite4BtSetCookie(bt_db *db, unsigned int iVal){ return sqlite4BtPagerSetCookie(db->pPager, iVal); } Index: src/bt_pager.c ================================================================== --- src/bt_pager.c +++ src/bt_pager.c @@ -109,11 +109,11 @@ memset(aNew, 0, nNew*sizeof(BtPage*)); for(i=0; ihash.nHash; i++){ while( aOld[i] ){ BtPage *pShift = aOld[i]; aOld[i] = pShift->pNextHash; - h = hashkey(nNew, pPg->pgno); + h = hashkey(nNew, pShift->pgno); pShift->pNextHash = aNew[h]; aNew[h] = pShift; } } p->hash.aHash = aNew; @@ -457,10 +457,14 @@ ** Read, write and trim existing database pages. */ int sqlite4BtPageGet(BtPager *p, u32 pgno, BtPage **ppPg){ int rc = SQLITE4_OK; /* Return code */ BtPage *pRet; /* Returned page handle */ + + if( pgno>100000 ){ + return btErrorBkpt(SQLITE4_CORRUPT); + } /* Search the cache for an existing page. */ pRet = btHashSearch(p, pgno); /* If the page is not in the cache, load it from disk */ @@ -498,18 +502,24 @@ pPg->pPager->pDirty = pPg; } return SQLITE4_OK; } +/* +** Decrement the refcount on page pPg. Also, indicate that page pPg is +** no longer in use. +*/ int sqlite4BtPageTrim(BtPage *pPg){ - assert( !"todo" ); + /* assert( !"todo" ); */ return SQLITE4_OK; } int sqlite4BtPageRelease(BtPage *pPg){ - assert( pPg->nRef>=1 ); - pPg->nRef--; + if( pPg ){ + assert( pPg->nRef>=1 ); + pPg->nRef--; + } return SQLITE4_OK; } void sqlite4BtPageReference(BtPage *pPg){ assert( pPg->nRef>=1 ); @@ -534,11 +544,13 @@ }else{ p->dbhdr.nPg = pgno; } } +#ifdef BT_STDERR_DEBUG fprintf(stderr, "allocated page %d\n", pgno); +#endif *ppPg = pPg; return rc; } Index: test/simple3.test ================================================================== --- test/simple3.test +++ test/simple3.test @@ -89,11 +89,34 @@ do_execsql_test 2.5 { SELECT a, length(b) FROM t1 } {1 200 3 200 4 200 5 200 6 200} -#-------------------------------------------------------------------------- + +#------------------------------------------------------------------------- + +proc lshuffle {list} { + set nVal [llength $list] + for {set i 0} {$i < $nVal} {incr i} { + set i2 [expr int(rand()*$nVal)] + set tmp [lindex $list $i] + lset list $i [lindex $list $i2] + lset list $i2 $tmp + } + return $list +} + +proc K {a} { set a } + +proc int_list {nVal} { + set ret [list] + for {set i 0} {$i < $nVal} {incr i} { + lappend ret $i + } + return $ret +} + do_test 3.0 { catch { db close } forcedelete test.db sqlite4 db file:test.db?kv=bt } {} @@ -100,17 +123,48 @@ do_execsql_test 3.1 { CREATE TABLE t1(a PRIMARY KEY, b); } -for {set i 0} {$i < 100} {incr i} { -if {$i==6} breakpoint - lappend rows $i - do_execsql_test 3.2.$i { - INSERT INTO t1 VALUES($i, randomblob(200)); - SELECT a FROM t1 ORDER BY a; - } $rows -} - +set nRow 100000 +set nStep [expr $nRow / 50] + +foreach {tn shuffle_proc} { + 1 K + 2 lshuffle +} { + + set iRow 0 + foreach k [$shuffle_proc [int_list $nRow]] { + incr iRow + + execsql { INSERT INTO t1 VALUES($k, randomblob(100)); } + if {0==($iRow % $nStep)} { + do_execsql_test 4.$tn.1.$iRow { + SELECT count(*) FROM t1; + } $iRow + } + } + + do_test 4.$tn.2 { + set nInitial [db one {SELECT count(*) FROM t1}] + for {set i 0} {$i < $nRow} {incr i} { + set res [execsql {SELECT count(*) FROM t1 WHERE a = $i}] + if {$res!="1"} { error "res = $res for i=$i" } + } + } {} + + set iRow 0 + foreach k [$shuffle_proc [int_list $nRow]] { + incr iRow + + execsql { DELETE FROM t1 WHERE a = $k } + if {0==($iRow % $nStep)} { + do_execsql_test 4.$tn.3.$iRow { + SELECT count(*) FROM t1; + } [expr $nRow - $iRow] + } + } +} finish_test