Index: src/btree.c ================================================================== --- src/btree.c +++ src/btree.c @@ -1149,28 +1149,27 @@ int cellOffset; /* Offset to the cell pointer array */ int cbrk; /* Offset to the cell content area */ int nCell; /* Number of cells on the page */ unsigned char *data; /* The page data */ unsigned char *temp; /* Temp area for cell content */ + unsigned char *src; /* Source of content */ int iCellFirst; /* First allowable cell index */ int iCellLast; /* Last possible cell index */ assert( sqlite3PagerIswriteable(pPage->pDbPage) ); assert( pPage->pBt!=0 ); assert( pPage->pBt->usableSize <= SQLITE_MAX_PAGE_SIZE ); assert( pPage->nOverflow==0 ); assert( sqlite3_mutex_held(pPage->pBt->mutex) ); - temp = sqlite3PagerTempSpace(pPage->pBt->pPager); - data = pPage->aData; + temp = 0; + src = data = pPage->aData; hdr = pPage->hdrOffset; cellOffset = pPage->cellOffset; nCell = pPage->nCell; assert( nCell==get2byte(&data[hdr+3]) ); usableSize = pPage->pBt->usableSize; - cbrk = get2byte(&data[hdr+5]); - memcpy(&temp[cbrk], &data[cbrk], usableSize - cbrk); cbrk = usableSize; iCellFirst = cellOffset + 2*nCell; iCellLast = usableSize - 4; for(i=0; iiCellLast ){ return SQLITE_CORRUPT_BKPT; } #endif assert( pc>=iCellFirst && pc<=iCellLast ); - size = cellSizePtr(pPage, &temp[pc]); + size = cellSizePtr(pPage, &src[pc]); cbrk -= size; #if defined(SQLITE_ENABLE_OVERSIZE_CELL_CHECK) if( cbrk=iCellFirst ); testcase( cbrk+size==usableSize ); testcase( pc+size==usableSize ); - memcpy(&data[cbrk], &temp[pc], size); put2byte(pAddr, cbrk); + if( temp==0 ){ + int x; + if( cbrk==pc ) continue; + temp = sqlite3PagerTempSpace(pPage->pBt->pPager); + x = get2byte(&data[hdr+5]); + memcpy(&temp[x], &data[x], (cbrk+size) - x); + src = temp; + } + memcpy(&data[cbrk], &src[pc], size); } assert( cbrk>=iCellFirst ); put2byte(&data[hdr+5], cbrk); data[hdr+1] = 0; data[hdr+2] = 0; @@ -1214,10 +1221,68 @@ if( cbrk-iCellFirst!=pPage->nFree ){ return 
SQLITE_CORRUPT_BKPT; } return SQLITE_OK; } + +/* +** Search the free-list on page pPg for space to store a cell nByte bytes in +** size. If one can be found, return a pointer to the space and remove it +** from the free-list. +** +** If no suitable space can be found on the free-list, return NULL. +** +** This function may detect corruption within pPg. If it does and argument +** pRc is non-NULL, then *pRc is set to SQLITE_CORRUPT and NULL is returned. +** Or, if corruption is detected and pRc is NULL, NULL is returned and the +** corruption goes unreported. +** +** If a slot of at least nByte bytes is found but cannot be used because +** there are already at least 60 fragmented bytes on the page, return NULL. +** In this case, if pbDefrag parameter is not NULL, set *pbDefrag to true. +*/ +static u8 *pageFindSlot(MemPage *pPg, int nByte, int *pRc, int *pbDefrag){ + const int hdr = pPg->hdrOffset; + u8 * const aData = pPg->aData; + int iAddr; + int pc; + int usableSize = pPg->pBt->usableSize; + + for(iAddr=hdr+1; (pc = get2byte(&aData[iAddr]))>0; iAddr=pc){ + int size; /* Size of the free slot */ + if( pc>usableSize-4 || pc=nByte ){ + int x = size - nByte; + testcase( x==4 ); + testcase( x==3 ); + if( x<4 ){ + if( aData[hdr+7]>=60 ){ + if( pbDefrag ) *pbDefrag = 1; + return 0; + } + /* Remove the slot from the free-list. Update the number of + ** fragmented bytes within the page. */ + memcpy(&aData[iAddr], &aData[pc], 2); + aData[hdr+7] += (u8)x; + }else if( size+pc > usableSize ){ + if( pRc ) *pRc = SQLITE_CORRUPT_BKPT; + return 0; + }else{ + /* The slot remains on the free-list. Reduce its size to account + ** for the portion used by the new allocation. */ + put2byte(&aData[pc+2], x); + } + return &aData[pc + x]; + } + } + + return 0; +} /* ** Allocate nByte bytes of space from within the B-Tree page passed ** as the first argument. Write into *pIdx the index into pPage->aData[] ** of the first byte of allocated space. 
Return either SQLITE_OK or @@ -1265,46 +1330,27 @@ */ testcase( gap+2==top ); testcase( gap+1==top ); testcase( gap==top ); if( gap+2<=top && (data[hdr+1] || data[hdr+2]) ){ - int pc, addr; - for(addr=hdr+1; (pc = get2byte(&data[addr]))>0; addr=pc){ - int size; /* Size of the free slot */ - if( pc>usableSize-4 || pc=nByte ){ - int x = size - nByte; - testcase( x==4 ); - testcase( x==3 ); - if( x<4 ){ - if( data[hdr+7]>=60 ) goto defragment_page; - /* Remove the slot from the free-list. Update the number of - ** fragmented bytes within the page. */ - memcpy(&data[addr], &data[pc], 2); - data[hdr+7] += (u8)x; - }else if( size+pc > usableSize ){ - return SQLITE_CORRUPT_BKPT; - }else{ - /* The slot remains on the free-list. Reduce its size to account - ** for the portion used by the new allocation. */ - put2byte(&data[pc+2], x); - } - *pIdx = pc + x; - return SQLITE_OK; - } + int rc = SQLITE_OK; + int bDefrag = 0; + u8 *pSpace = pageFindSlot(pPage, nByte, &rc, &bDefrag); + if( rc ) return rc; + if( bDefrag ) goto defragment_page; + if( pSpace ){ + *pIdx = pSpace - data; + return SQLITE_OK; } } /* The request could not be fulfilled using a freelist slot. Check ** to see if defragmentation is necessary. 
*/ testcase( gap+2+nByte==top ); if( gap+2+nByte>top ){ -defragment_page: + defragment_page: testcase( pPage->nCell==0 ); rc = defragmentPage(pPage); if( rc ) return rc; top = get2byteNotZero(&data[hdr+5]); assert( gap+nByte<=top ); @@ -1348,11 +1394,11 @@ unsigned char *data = pPage->aData; /* Page content */ assert( pPage->pBt!=0 ); assert( sqlite3PagerIswriteable(pPage->pDbPage) ); assert( iStart>=pPage->hdrOffset+6+pPage->childPtrSize ); - assert( iEnd <= pPage->pBt->usableSize ); + assert( CORRUPT_DB || iEnd <= pPage->pBt->usableSize ); assert( sqlite3_mutex_held(pPage->pBt->mutex) ); assert( iSize>=4 ); /* Minimum cell size is 4 */ assert( iStart<=iLast ); /* Overwrite deleted information with zeros when the secure_delete @@ -5936,49 +5982,259 @@ #endif } } /* -** Add a list of cells to a page. The page should be initially empty. -** The cells are guaranteed to fit on the page. +** Array apCell[] contains pointers to nCell b-tree page cells. The +** szCell[] array contains the size in bytes of each cell. This function +** replaces the current contents of page pPg with the contents of the cell +** array. +** +** Some of the cells in apCell[] may currently be stored in pPg. This +** function works around problems caused by this by making a copy of any +** such cells before overwriting the page data. +** +** The MemPage.nFree field is invalidated by this function. It is the +** responsibility of the caller to set it correctly. 
+*/ +static void rebuildPage( + MemPage *pPg, /* Edit this page */ + int nCell, /* Final number of cells on page */ + u8 **apCell, /* Array of cells */ + u16 *szCell /* Array of cell sizes */ +){ + const int hdr = pPg->hdrOffset; /* Offset of header on pPg */ + u8 * const aData = pPg->aData; /* Pointer to data for pPg */ + const int usableSize = pPg->pBt->usableSize; + u8 * const pEnd = &aData[usableSize]; + int i; + u8 *pCellptr = pPg->aCellIdx; + u8 *pTmp = sqlite3PagerTempSpace(pPg->pBt->pPager); + u8 *pData; + + i = get2byte(&aData[hdr+5]); + memcpy(&pTmp[i], &aData[i], usableSize - i); + + pData = pEnd; + for(i=0; iaData && pCellnFree field is now set incorrectly. The caller will fix it. */ + pPg->nCell = nCell; + pPg->nOverflow = 0; + + put2byte(&aData[hdr+1], 0); + put2byte(&aData[hdr+3], pPg->nCell); + put2byte(&aData[hdr+5], pData - aData); + aData[hdr+7] = 0x00; +} + +/* +** Array apCell[] contains nCell pointers to b-tree cells. Array szCell +** contains the size in bytes of each such cell. This function attempts to +** add the cells stored in the array to page pPg. If it cannot (because +** the page needs to be defragmented before the cells will fit), non-zero +** is returned. Otherwise, if the cells are added successfully, zero is +** returned. +** +** Argument pCellptr points to the first entry in the cell-pointer array +** (part of page pPg) to populate. After cell apCell[0] is written to the +** page body, a 16-bit offset is written to pCellptr. And so on, for each +** cell in the array. It is the responsibility of the caller to ensure +** that it is safe to overwrite this part of the cell-pointer array. +** +** When this function is called, *ppData points to the start of the +** content area on page pPg. If the size of the content area is extended, +** *ppData is updated to point to the new start of the content area +** before returning. 
+** +** Finally, argument pBegin points to the byte immediately following the +** end of the space required by this page for the cell-pointer area (for +** all cells - not just those inserted by the current call). If the content +** area must be extended to before this point in order to accommodate all +** cells in apCell[], then the cells do not fit and non-zero is returned. +*/ +static int pageInsertArray( + MemPage *pPg, /* Page to add cells to */ + u8 *pBegin, /* End of cell-pointer array */ + u8 **ppData, /* IN/OUT: Page content-area pointer */ + u8 *pCellptr, /* Pointer to cell-pointer area */ + int nCell, /* Number of cells to add to pPg */ + u8 **apCell, /* Array of cells */ + u16 *szCell /* Array of cell sizes */ +){ + int i; + u8 *aData = pPg->aData; + u8 *pData = *ppData; + const int bFreelist = aData[1] || aData[2]; + assert( CORRUPT_DB || pPg->hdrOffset==0 ); /* Never called on page 1 */ + for(i=0; iaData; + u8 * const pEnd = &aData[pPg->pBt->usableSize]; + u8 * const pStart = &aData[pPg->hdrOffset + 8 + pPg->childPtrSize]; + int nRet = 0; + int i; + u8 *pFree = 0; + int szFree = 0; + + for(i=0; i=pStart && pCellpEnd ) return 0; + }else{ + pFree = pCell; + szFree += sz; + } + nRet++; + } + } + if( pFree ) freeSpace(pPg, pFree - aData, szFree); + return nRet; +} + +/* +** The pPg->nFree field is invalid when this function returns. It is the +** responsibility of the caller to set it correctly. 
*/ -static void assemblePage( - MemPage *pPage, /* The page to be assembled */ - int nCell, /* The number of cells to add to this page */ - u8 **apCell, /* Pointers to cell bodies */ - u16 *aSize /* Sizes of the cells */ +static void editPage( + MemPage *pPg, /* Edit this page */ + int iOld, /* Index of first cell currently on page */ + int iNew, /* Index of new first cell on page */ + int nNew, /* Final number of cells on page */ + u8 **apCell, /* Array of cells */ + u16 *szCell /* Array of cell sizes */ ){ - int i; /* Loop counter */ - u8 *pCellptr; /* Address of next cell pointer */ - int cellbody; /* Address of next cell body */ - u8 * const data = pPage->aData; /* Pointer to data for pPage */ - const int hdr = pPage->hdrOffset; /* Offset of header on pPage */ - const int nUsable = pPage->pBt->usableSize; /* Usable size of page */ - - assert( pPage->nOverflow==0 ); - assert( sqlite3_mutex_held(pPage->pBt->mutex) ); - assert( nCell>=0 && nCell<=(int)MX_CELL(pPage->pBt) - && (int)MX_CELL(pPage->pBt)<=10921); - assert( sqlite3PagerIswriteable(pPage->pDbPage) ); - - /* Check that the page has just been zeroed by zeroPage() */ - assert( pPage->nCell==0 ); - assert( get2byteNotZero(&data[hdr+5])==nUsable ); - - pCellptr = &pPage->aCellIdx[nCell*2]; - cellbody = nUsable; - for(i=nCell-1; i>=0; i--){ - u16 sz = aSize[i]; - pCellptr -= 2; - cellbody -= sz; - put2byte(pCellptr, cellbody); - memcpy(&data[cellbody], apCell[i], sz); - } - put2byte(&data[hdr+3], nCell); - put2byte(&data[hdr+5], cellbody); - pPage->nFree -= (nCell*2 + nUsable - cellbody); - pPage->nCell = (u16)nCell; + u8 * const aData = pPg->aData; + const int hdr = pPg->hdrOffset; + u8 *pBegin = &pPg->aCellIdx[nNew * 2]; + int nCell = pPg->nCell; /* Cells stored on pPg */ + u8 *pData; + u8 *pCellptr; + int i; + int iOldEnd = iOld + pPg->nCell + pPg->nOverflow; + int iNewEnd = iNew + nNew; + +#ifdef SQLITE_DEBUG + u8 *pTmp = sqlite3PagerTempSpace(pPg->pBt->pPager); + memcpy(pTmp, aData, 
pPg->pBt->usableSize); +#endif + + /* Remove cells from the start and end of the page */ + if( iOldaCellIdx, &pPg->aCellIdx[nShift*2], nCell*2); + nCell -= nShift; + } + if( iNewEnd < iOldEnd ){ + nCell -= pageFreeArray( + pPg, iOldEnd-iNewEnd, &apCell[iNewEnd], &szCell[iNewEnd] + ); + } + + pData = &aData[get2byte(&aData[hdr+5])]; + if( pDataaCellIdx; + memmove(&pCellptr[nAdd*2], pCellptr, nCell*2); + if( pageInsertArray( + pPg, pBegin, &pData, pCellptr, + nAdd, &apCell[iNew], &szCell[iNew] + ) ) goto editpage_fail; + nCell += nAdd; + } + + /* Add any overflow cells */ + for(i=0; inOverflow; i++){ + int iCell = (iOld + pPg->aiOvfl[i]) - iNew; + if( iCell>=0 && iCellaCellIdx[iCell * 2]; + memmove(&pCellptr[2], pCellptr, (nCell - iCell) * 2); + nCell++; + if( pageInsertArray( + pPg, pBegin, &pData, pCellptr, + 1, &apCell[iCell + iNew], &szCell[iCell + iNew] + ) ) goto editpage_fail; + } + } + + /* Append cells to the end of the page */ + pCellptr = &pPg->aCellIdx[nCell*2]; + if( pageInsertArray( + pPg, pBegin, &pData, pCellptr, + nNew-nCell, &apCell[iNew+nCell], &szCell[iNew+nCell] + ) ) goto editpage_fail; + + pPg->nCell = nNew; + pPg->nOverflow = 0; + + put2byte(&aData[hdr+3], pPg->nCell); + put2byte(&aData[hdr+5], pData - aData); + +#ifdef SQLITE_DEBUG + for(i=0; iaCellIdx[i*2]); + if( pCell>=aData && pCell<&aData[pPg->pBt->usableSize] ){ + pCell = &pTmp[pCell - aData]; + } + assert( 0==memcmp(pCell, &aData[iOff], szCell[i+iNew]) ); + } +#endif + + return; + editpage_fail: + /* Unable to edit this page. Rebuild it from scratch instead. */ + rebuildPage(pPg, nNew, &apCell[iNew], &szCell[iNew]); } /* ** The following parameters determine how many adjacent pages get involved ** in a balancing operation. 
NN is the number of neighbors on either side @@ -6046,11 +6302,12 @@ u8 *pStop; assert( sqlite3PagerIswriteable(pNew->pDbPage) ); assert( pPage->aData[0]==(PTF_INTKEY|PTF_LEAFDATA|PTF_LEAF) ); zeroPage(pNew, PTF_INTKEY|PTF_LEAFDATA|PTF_LEAF); - assemblePage(pNew, 1, &pCell, &szCell); + rebuildPage(pNew, 1, &pCell, &szCell); + pNew->nFree = pBt->usableSize - pNew->cellOffset - 2 - szCell; /* If this is an auto-vacuum database, update the pointer map ** with entries for the new page, and any pointer from the ** cell on the page to an overflow page. If either of these ** operations fails, the return code is set, but the contents @@ -6265,21 +6522,25 @@ int subtotal; /* Subtotal of bytes in cells on one page */ int iSpace1 = 0; /* First unused byte of aSpace1[] */ int iOvflSpace = 0; /* First unused byte of aOvflSpace[] */ int szScratch; /* Size of scratch memory requested */ MemPage *apOld[NB]; /* pPage and up to two siblings */ - MemPage *apCopy[NB]; /* Private copies of apOld[] pages */ MemPage *apNew[NB+2]; /* pPage and up to NB siblings after balancing */ u8 *pRight; /* Location in parent of right-sibling pointer */ u8 *apDiv[NB-1]; /* Divider cells in pParent */ int cntNew[NB+2]; /* Index in aCell[] of cell after i-th page */ + int cntOld[NB+2]; /* Old index in aCell[] after i-th page */ int szNew[NB+2]; /* Combined size of cells place on i-th page */ u8 **apCell = 0; /* All cells begin balanced */ u16 *szCell; /* Local size of all cells in apCell[] */ u8 *aSpace1; /* Space for copies of dividers cells */ Pgno pgno; /* Temp var to store a page number in */ + u8 abDone[NB+2]; /* True after i'th new page is populated */ + Pgno aPgno[NB+2]; /* Page numbers of new pages before shuffling */ + u16 aPgFlags[NB+2]; /* flags field of new pages before shuffling */ + memset(abDone, 0, sizeof(abDone)); pBt = pParent->pBt; assert( sqlite3_mutex_held(pBt->mutex) ); assert( sqlite3PagerIswriteable(pParent->pDbPage) ); #if 0 @@ -6384,16 +6645,14 @@ nMaxCells = (nMaxCells + 
3)&~3; /* ** Allocate space for memory structures */ - k = pBt->pageSize + ROUND8(sizeof(MemPage)); szScratch = nMaxCells*sizeof(u8*) /* apCell */ + nMaxCells*sizeof(u16) /* szCell */ - + pBt->pageSize /* aSpace1 */ - + k*nOld; /* Page copies (apCopy) */ + + pBt->pageSize; /* aSpace1 */ apCell = sqlite3ScratchMalloc( szScratch ); if( apCell==0 ){ rc = SQLITE_NOMEM; goto balance_cleanup; } @@ -6402,12 +6661,12 @@ assert( EIGHT_BYTE_ALIGNMENT(aSpace1) ); /* ** Load pointers to all cells on sibling pages and the divider cells ** into the local apCell[] array. Make copies of the divider cells - ** into space obtained from aSpace1[] and remove the divider cells - ** from pParent. + ** into space obtained from aSpace1[]. The divider cells have already + ** been removed from pParent. ** ** If the siblings are on leaf pages, then the child pointers of the ** divider cells are stripped from the cells before they are copied ** into aSpace1[]. In this way, all cells in apCell[] are without ** child pointers. If siblings are not leaves, then all cell in @@ -6419,19 +6678,11 @@ */ leafCorrection = apOld[0]->leaf*4; leafData = apOld[0]->intKeyLeaf; for(i=0; ipageSize + k*i]; - memcpy(pOld, apOld[i], sizeof(MemPage)); - pOld->aData = (void*)&pOld[1]; - memcpy(pOld->aData, apOld[i]->aData, pBt->pageSize); + MemPage *pOld = apOld[i]; limit = pOld->nCell+pOld->nOverflow; if( pOld->nOverflow>0 ){ for(j=0; jusableSize - 12 + leafCorrection; for(subtotal=k=i=0; i usableSpace ){ - szNew[k] = subtotal - szCell[i]; + szNew[k] = subtotal - szCell[i] - 2; cntNew[k] = i; if( leafData ){ i--; } subtotal = 0; k++; if( k>NB+1 ){ rc = SQLITE_CORRUPT_BKPT; goto balance_cleanup; } @@ -6556,14 +6808,14 @@ */ #if 0 assert( cntNew[0]>0 || (pParent->pgno==1 && pParent->nCell==0) ); #endif - TRACE(("BALANCE: old: %d %d %d ", - apOld[0]->pgno, - nOld>=2 ? apOld[1]->pgno : 0, - nOld>=3 ? 
apOld[2]->pgno : 0 + TRACE(("BALANCE: old: %d(nc=%d) %d(nc=%d) %d(nc=%d)\n", + apOld[0]->pgno, apOld[0]->nCell, + nOld>=2 ? apOld[1]->pgno : 0, nOld>=2 ? apOld[1]->nCell : 0, + nOld>=3 ? apOld[2]->pgno : 0, nOld>=3 ? apOld[2]->nCell : 0 )); /* ** Allocate k new pages. Reuse old pages where possible. */ @@ -6582,12 +6834,14 @@ if( rc ) goto balance_cleanup; }else{ assert( i>0 ); rc = allocateBtreePage(pBt, &pNew, &pgno, (bBulk ? 1 : pgno), 0); if( rc ) goto balance_cleanup; + zeroPage(pNew, pageFlags); apNew[i] = pNew; nNew++; + cntOld[i] = nCell; /* Set the pointer-map entry for the new sibling page. */ if( ISAUTOVACUUM ){ ptrmapPut(pBt, pNew->pgno, PTRMAP_BTREE, pParent->pgno, &rc); if( rc!=SQLITE_OK ){ @@ -6595,139 +6849,228 @@ } } } } - /* Free any old pages that were not reused as new pages. - */ - while( ipgno; - int minI = i; - for(j=i+1; jpgno<(unsigned)minV ){ - minI = j; - minV = apNew[j]->pgno; - } - } - if( minI>i ){ - MemPage *pT; - pT = apNew[i]; - apNew[i] = apNew[minI]; - apNew[minI] = pT; - } - } - TRACE(("new: %d(%d) %d(%d) %d(%d) %d(%d) %d(%d)\n", - apNew[0]->pgno, szNew[0], + ** Reassign page numbers so that the new pages are in ascending order. + ** This helps to keep entries in the disk file in order so that a scan + ** of the table is closer to a linear scan through the file. That in turn + ** helps the operating system to deliver pages from the disk more rapidly. + ** + ** An O(n^2) insertion sort algorithm is used, but since n is never more + ** than (NB+2) (a small constant), that should not be a problem. + ** + ** When NB==3, this one optimization makes the database about 25% faster + ** for large insertions and deletions. 
+ */ + for(i=0; ipgno; + aPgFlags[i] = apNew[i]->pDbPage->flags; + for(j=0; ji ){ + sqlite3PagerRekey(apNew[iBest]->pDbPage, pBt->nPage+iBest+1, 0); + } + sqlite3PagerRekey(apNew[i]->pDbPage, pgno, aPgFlags[iBest]); + apNew[i]->pgno = pgno; + } + } + + TRACE(("BALANCE: new: %d(%d nc=%d) %d(%d nc=%d) %d(%d nc=%d) " + "%d(%d nc=%d) %d(%d nc=%d)\n", + apNew[0]->pgno, szNew[0], cntNew[0], nNew>=2 ? apNew[1]->pgno : 0, nNew>=2 ? szNew[1] : 0, + nNew>=2 ? cntNew[1] - cntNew[0] - !leafData : 0, nNew>=3 ? apNew[2]->pgno : 0, nNew>=3 ? szNew[2] : 0, + nNew>=3 ? cntNew[2] - cntNew[1] - !leafData : 0, nNew>=4 ? apNew[3]->pgno : 0, nNew>=4 ? szNew[3] : 0, - nNew>=5 ? apNew[4]->pgno : 0, nNew>=5 ? szNew[4] : 0)); + nNew>=4 ? cntNew[3] - cntNew[2] - !leafData : 0, + nNew>=5 ? apNew[4]->pgno : 0, nNew>=5 ? szNew[4] : 0, + nNew>=5 ? cntNew[4] - cntNew[3] - !leafData : 0 + )); assert( sqlite3PagerIswriteable(pParent->pDbPage) ); put4byte(pRight, apNew[nNew-1]->pgno); - /* - ** Evenly distribute the data in apCell[] across the new pages. - ** Insert divider cells into pParent as necessary. + /* If the sibling pages are not leaves, ensure that the right-child pointer + ** of the right-most new sibling page is set to the value that was + ** originally in the same field of the right-most old sibling page. */ + if( (pageFlags & PTF_LEAF)==0 && nOld!=nNew ){ + MemPage *pOld = (nNew>nOld ? apNew : apOld)[nOld-1]; + memcpy(&apNew[nNew-1]->aData[8], &pOld->aData[8], 4); + } + + /* Make any required updates to pointer map entries associated with + ** cells stored on sibling pages following the balance operation. Pointer + ** map entries associated with divider cells are set by the insertCell() + ** routine. The associated pointer map entries are: + ** + ** a) if the cell contains a reference to an overflow chain, the + ** entry associated with the first page in the overflow chain, and + ** + ** b) if the sibling pages are not leaves, the child page associated + ** with the cell. 
+ ** + ** If the sibling pages are not leaves, then the pointer map entry + ** associated with the right-child of each sibling may also need to be + ** updated. This happens below, after the sibling pages have been + ** populated, not here. */ - j = 0; - for(i=0; iaData; + int cntOldNext = pNew->nCell + pNew->nOverflow; + int usableSize = pBt->usableSize; + int iNew = 0; + int iOld = 0; + + for(i=0; inCell + pOld->nOverflow + !leafData; + aOld = pOld->aData; + } + if( i==cntNew[iNew] ){ + pNew = apNew[++iNew]; + if( !leafData ) continue; + } + + /* Cell pCell is destined for new sibling page pNew. Originally, it + ** was either part of sibling page iOld (possibly an overflow cell), + ** or else the divider cell to the left of sibling page iOld. So, + ** if sibling page iOld had the same page number as pNew, and if + ** pCell really was a part of sibling page iOld (not a divider or + ** overflow cell), we can skip updating the pointer map entries. */ + if( pNew->pgno!=aPgno[iOld] || pCell=&aOld[usableSize] ){ + if( !leafCorrection ){ + ptrmapPut(pBt, get4byte(pCell), PTRMAP_BTREE, pNew->pgno, &rc); + } + if( szCell[i]>pNew->minLocal ){ + ptrmapPutOvflPtr(pNew, pCell, &rc); + } + } + } + } + + /* Insert new divider cells into pParent. */ + for(i=0; inCell>0 || (nNew==1 && cntNew[0]==0) ); - assert( pNew->nOverflow==0 ); - j = cntNew[i]; - /* If the sibling page assembled above was not the right-most sibling, - ** insert a divider cell into the parent page. - */ - assert( ileaf ){ - memcpy(&pNew->aData[8], pCell, 4); - }else if( leafData ){ - /* If the tree is a leaf-data tree, and the siblings are leaves, - ** then there is no divider cell in apCell[]. Instead, the divider - ** cell consists of the integer key for the right-most cell of - ** the sibling-page assembled above only. 
- */ - CellInfo info; - j--; - btreeParseCellPtr(pNew, apCell[j], &info); - pCell = pTemp; - sz = 4 + putVarint(&pCell[4], info.nKey); - pTemp = 0; + assert( jleaf ){ + memcpy(&pNew->aData[8], pCell, 4); + }else if( leafData ){ + /* If the tree is a leaf-data tree, and the siblings are leaves, + ** then there is no divider cell in apCell[]. Instead, the divider + ** cell consists of the integer key for the right-most cell of + ** the sibling-page assembled above only. + */ + CellInfo info; + j--; + btreeParseCellPtr(pNew, apCell[j], &info); + pCell = pTemp; + sz = 4 + putVarint(&pCell[4], info.nKey); + pTemp = 0; + }else{ + pCell -= 4; + /* Obscure case for non-leaf-data trees: If the cell at pCell was + ** previously stored on a leaf node, and its reported size was 4 + ** bytes, then it may actually be smaller than this + ** (see btreeParseCellPtr(), 4 bytes is the minimum size of + ** any cell). But it is important to pass the correct size to + ** insertCell(), so reparse the cell now. + ** + ** Note that this can never happen in an SQLite data file, as all + ** cells are at least 4 bytes. It only happens in b-trees used + ** to evaluate "IN (SELECT ...)" and similar clauses. + */ + if( szCell[j]==4 ){ + assert(leafCorrection==4); + sz = cellSizePtr(pParent, pCell); + } + } + iOvflSpace += sz; + assert( sz<=pBt->maxLocal+23 ); + assert( iOvflSpace <= (int)pBt->pageSize ); + insertCell(pParent, nxDiv+i, pCell, sz, pTemp, pNew->pgno, &rc); + if( rc!=SQLITE_OK ) goto balance_cleanup; + assert( sqlite3PagerIswriteable(pParent->pDbPage) ); + } + + /* Now update the actual sibling pages. The order in which they are updated + ** is important, as this code needs to avoid disrupting any page from which + ** cells may still need to be read. In practice, this means: + ** + ** 1) If cells are to be removed from the start of the page and shifted + ** to the left-hand sibling, it is not safe to update the page until + ** the left-hand sibling (apNew[i-1]) has already been updated. 
+ ** + ** 2) If cells are to be removed from the end of the page and shifted + ** to the right-hand sibling, it is not safe to update the page until + ** the right-hand sibling (apNew[i+1]) has already been updated. + ** + ** If neither of the above apply, the page is safe to update. + */ + for(i=0; i=nNew ? i-nNew : nNew-1-i); + if( abDone[iPg]==0 + && (iPg==0 || cntOld[iPg-1]>=cntNew[iPg-1] || abDone[iPg-1]) + && (cntNew[iPg]>=cntOld[iPg] || abDone[iPg+1]) + ){ + int iNew; + int iOld; + int nNewCell; + + if( iPg==0 ){ + iNew = iOld = 0; + nNewCell = cntNew[0]; }else{ - pCell -= 4; - /* Obscure case for non-leaf-data trees: If the cell at pCell was - ** previously stored on a leaf node, and its reported size was 4 - ** bytes, then it may actually be smaller than this - ** (see btreeParseCellPtr(), 4 bytes is the minimum size of - ** any cell). But it is important to pass the correct size to - ** insertCell(), so reparse the cell now. - ** - ** Note that this can never happen in an SQLite data file, as all - ** cells are at least 4 bytes. It only happens in b-trees used - ** to evaluate "IN (SELECT ...)" and similar clauses. 
- */ - if( szCell[j]==4 ){ - assert(leafCorrection==4); - sz = cellSizePtr(pParent, pCell); - } - } - iOvflSpace += sz; - assert( sz<=pBt->maxLocal+23 ); - assert( iOvflSpace <= (int)pBt->pageSize ); - insertCell(pParent, nxDiv, pCell, sz, pTemp, pNew->pgno, &rc); - if( rc!=SQLITE_OK ) goto balance_cleanup; - assert( sqlite3PagerIswriteable(pParent->pDbPage) ); - - j++; - nxDiv++; - } - } - assert( j==nCell ); + iOld = iPgnFree = usableSpace-szNew[iPg]; + assert( apNew[iPg]->nOverflow==0 ); + assert( apNew[iPg]->nCell==nNewCell ); + } + } + assert( memcmp(abDone, "\01\01\01\01\01", nNew)==0 ); + assert( nOld>0 ); assert( nNew>0 ); - if( (pageFlags & PTF_LEAF)==0 ){ - u8 *zChild = &apCopy[nOld-1]->aData[8]; - memcpy(&apNew[nNew-1]->aData[8], zChild, 4); - } if( isRoot && pParent->nCell==0 && pParent->hdrOffset<=apNew[0]->nFree ){ /* The root page of the b-tree now contains no cells. The only sibling ** page is the right-child of the parent. Copy the contents of the ** child page into the parent, decreasing the overall height of the @@ -6741,125 +7084,48 @@ ** The second assert below verifies that the child page is defragmented ** (it must be, as it was just reconstructed using assemblePage()). This ** is important if the parent page happens to be page 1 of the database ** image. */ assert( nNew==1 ); - assert( apNew[0]->nFree == - (get2byte(&apNew[0]->aData[5])-apNew[0]->cellOffset-apNew[0]->nCell*2) - ); - copyNodeContent(apNew[0], pParent, &rc); - freePage(apNew[0], &rc); - }else if( ISAUTOVACUUM ){ - /* Fix the pointer-map entries for all the cells that were shifted around. - ** There are several different types of pointer-map entries that need to - ** be dealt with by this routine. Some of these have been set already, but - ** many have not. The following is a summary: - ** - ** 1) The entries associated with new sibling pages that were not - ** siblings when this function was called. These have already - ** been set. 
We don't need to worry about old siblings that were - ** moved to the free-list - the freePage() code has taken care - ** of those. - ** - ** 2) The pointer-map entries associated with the first overflow - ** page in any overflow chains used by new divider cells. These - ** have also already been taken care of by the insertCell() code. - ** - ** 3) If the sibling pages are not leaves, then the child pages of - ** cells stored on the sibling pages may need to be updated. - ** - ** 4) If the sibling pages are not internal intkey nodes, then any - ** overflow pages used by these cells may need to be updated - ** (internal intkey nodes never contain pointers to overflow pages). - ** - ** 5) If the sibling pages are not leaves, then the pointer-map - ** entries for the right-child pages of each sibling may need - ** to be updated. - ** - ** Cases 1 and 2 are dealt with above by other code. The next - ** block deals with cases 3 and 4 and the one after that, case 5. Since - ** setting a pointer map entry is a relatively expensive operation, this - ** code only sets pointer map entries for child or overflow pages that have - ** actually moved between pages. */ - MemPage *pNew = apNew[0]; - MemPage *pOld = apCopy[0]; - int nOverflow = pOld->nOverflow; - int iNextOld = pOld->nCell + nOverflow; - int iOverflow = (nOverflow ? pOld->aiOvfl[0] : -1); - j = 0; /* Current 'old' sibling page */ - k = 0; /* Current 'new' sibling page */ - for(i=0; inCell + pOld->nOverflow; - if( pOld->nOverflow ){ - nOverflow = pOld->nOverflow; - iOverflow = i + !leafData + pOld->aiOvfl[0]; - } - isDivider = !leafData; - } - - assert(nOverflow>0 || iOverflowaiOvfl[0]==pOld->aiOvfl[1]-1); - assert(nOverflow<3 || pOld->aiOvfl[1]==pOld->aiOvfl[2]-1); - if( i==iOverflow ){ - isDivider = 1; - if( (--nOverflow)>0 ){ - iOverflow++; - } - } - - if( i==cntNew[k] ){ - /* Cell i is the cell immediately following the last cell on new - ** sibling page k. 
If the siblings are not leaf pages of an - ** intkey b-tree, then cell i is a divider cell. */ - pNew = apNew[++k]; - if( !leafData ) continue; - } - assert( jpgno!=pNew->pgno ){ - if( !leafCorrection ){ - ptrmapPut(pBt, get4byte(apCell[i]), PTRMAP_BTREE, pNew->pgno, &rc); - } - if( szCell[i]>pNew->minLocal ){ - ptrmapPutOvflPtr(pNew, apCell[i], &rc); - } - } - } - - if( !leafCorrection ){ - for(i=0; iaData[8]); - ptrmapPut(pBt, key, PTRMAP_BTREE, apNew[i]->pgno, &rc); - } - } + rc = defragmentPage(apNew[0]); + if( rc==SQLITE_OK ){ + assert( apNew[0]->nFree == + (get2byte(&apNew[0]->aData[5])-apNew[0]->cellOffset-apNew[0]->nCell*2) + ); + copyNodeContent(apNew[0], pParent, &rc); + freePage(apNew[0], &rc); + } + }else if( ISAUTOVACUUM && !leafCorrection ){ + /* Fix the pointer map entries associated with the right-child of each + ** sibling page. All other pointer map entries have already been taken + ** care of. */ + for(i=0; iaData[8]); + ptrmapPut(pBt, key, PTRMAP_BTREE, apNew[i]->pgno, &rc); + } + } + + assert( pParent->isInit ); + TRACE(("BALANCE: finished: old=%d new=%d cells=%d\n", + nOld, nNew, nCell)); + + /* Free any old pages that were not reused as new pages. + */ + for(i=nNew; iisInit ){ /* The ptrmapCheckPages() contains assert() statements that verify that ** all pointer map pages are set correctly. This is helpful while ** debugging. This is usually disabled because a corrupt database may ** cause an assert() statement to fail. */ ptrmapCheckPages(apNew, nNew); ptrmapCheckPages(&pParent, 1); -#endif } - - assert( pParent->isInit ); - TRACE(("BALANCE: finished: old=%d new=%d cells=%d\n", - nOld, nNew, nCell)); +#endif /* ** Cleanup before returning. 
*/ balance_cleanup: Index: src/pager.c ================================================================== --- src/pager.c +++ src/pager.c @@ -6844,10 +6844,22 @@ } return SQLITE_OK; } #endif + +/* +** The page handle passed as the first argument refers to a dirty page +** with a page number other than iNew. This function changes the page's +** page number to iNew and sets the value of the PgHdr.flags field to +** the value passed as the third parameter. +*/ +void sqlite3PagerRekey(DbPage *pPg, Pgno iNew, u16 flags){ + assert( pPg->pgno!=iNew ); + pPg->flags = flags; + sqlite3PcacheMove(pPg, iNew); +} /* ** Return a pointer to the data for the specified page. */ void *sqlite3PagerGetData(DbPage *pPg){ @@ -7242,7 +7254,8 @@ int sqlite3PagerWalFramesize(Pager *pPager){ assert( pPager->eState>=PAGER_READER ); return sqlite3WalFramesize(pPager->pWal); } #endif + #endif /* SQLITE_OMIT_DISKIO */ Index: src/pager.h ================================================================== --- src/pager.h +++ src/pager.h @@ -185,10 +185,12 @@ void sqlite3PagerClearCache(Pager *); int sqlite3SectorSize(sqlite3_file *); /* Functions used to truncate the database file. */ void sqlite3PagerTruncateImage(Pager*,Pgno); + +void sqlite3PagerRekey(DbPage*, Pgno, u16); #if defined(SQLITE_HAS_CODEC) && !defined(SQLITE_OMIT_WAL) void *sqlite3PagerCodec(DbPage *); #endif