/ Check-in [25131e70]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Performance optimization on balance_nonroot() and related routines. 2.6% faster overall with a size increase of less than 750 bytes.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 25131e7062125e91c2d60ed2cadf134dd7609124
User & Date: drh 2015-06-24 00:05:45
Context
2015-06-24
01:07
Remove an invalid NEVER() and add a test case to cover it. check-in: 14b73d20 user: drh tags: trunk
00:05
Performance optimization on balance_nonroot() and related routines. 2.6% faster overall with a size increase of less than 750 bytes. check-in: 25131e70 user: drh tags: trunk
2015-06-23
23:31
Mark some branches as unreachable after the recent change that recognizes mismatch result set sizes on compound SELECT statements sooner. check-in: c8d1f305 user: drh tags: trunk
21:35
Testability improvement. Closed-Leaf check-in: eed6a331 user: drh tags: btree-opt2
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/btree.c.

   952    952   ** the page, 1 means the second cell, and so forth) return a pointer
   953    953   ** to the cell content.
   954    954   **
   955    955   ** This routine works only for pages that do not contain overflow cells.
   956    956   */
   957    957   #define findCell(P,I) \
   958    958     ((P)->aData + ((P)->maskPage & get2byte(&(P)->aCellIdx[2*(I)])))
   959         -#define findCellv2(D,M,O,I) (D+(M&get2byte(D+(O+2*(I)))))
   960         -
   961         -
   962         -/*
   963         -** This a more complex version of findCell() that works for
   964         -** pages that do contain overflow cells.
   965         -*/
   966         -static u8 *findOverflowCell(MemPage *pPage, int iCell){
   967         -  int i;
   968         -  assert( sqlite3_mutex_held(pPage->pBt->mutex) );
   969         -  for(i=pPage->nOverflow-1; i>=0; i--){
   970         -    int k;
   971         -    k = pPage->aiOvfl[i];
   972         -    if( k<=iCell ){
   973         -      if( k==iCell ){
   974         -        return pPage->apOvfl[i];
   975         -      }
   976         -      iCell--;
   977         -    }
   978         -  }
   979         -  return findCell(pPage, iCell);
   980         -}
   981    959   
   982    960   /*
   983    961   ** This is common tail processing for btreeParseCellPtr() and
   984    962   ** btreeParseCellPtrIndex() for the case when the cell does not fit entirely
   985    963   ** on a single B-tree page.  Make necessary adjustments to the CellInfo
   986    964   ** structure.
   987    965   */
................................................................................
  1571   1549       **
  1572   1550       ** Check to see if iFreeBlk should be coalesced onto the end of iStart.
  1573   1551       */
  1574   1552       if( iFreeBlk && iEnd+3>=iFreeBlk ){
  1575   1553         nFrag = iFreeBlk - iEnd;
  1576   1554         if( iEnd>iFreeBlk ) return SQLITE_CORRUPT_BKPT;
  1577   1555         iEnd = iFreeBlk + get2byte(&data[iFreeBlk+2]);
  1578         -      if( iEnd > pPage->pBt->usableSize ) return SQLITE_CORRUPT_BKPT;
         1556  +      if( NEVER(iEnd > pPage->pBt->usableSize) ) return SQLITE_CORRUPT_BKPT;
  1579   1557         iSize = iEnd - iStart;
  1580   1558         iFreeBlk = get2byte(&data[iFreeBlk]);
  1581   1559       }
  1582   1560     
  1583   1561       /* If iPtr is another freeblock (that is, if iPtr is not the freelist
  1584   1562       ** pointer in the page header) then check to see if iStart should be
  1585   1563       ** coalesced onto the end of iPtr.
................................................................................
  6241   6219       if( iChild ){
  6242   6220         put4byte(pCell, iChild);
  6243   6221       }
  6244   6222       j = pPage->nOverflow++;
  6245   6223       assert( j<(int)(sizeof(pPage->apOvfl)/sizeof(pPage->apOvfl[0])) );
  6246   6224       pPage->apOvfl[j] = pCell;
  6247   6225       pPage->aiOvfl[j] = (u16)i;
         6226  +
         6227  +    /* When multiple overflows occur, they are always sequential and in
         6228  +    ** sorted order.  This invariants arise because multiple overflows can
         6229  +    ** only occur when inserting divider cells into the parent page during
         6230  +    ** balancing, and the dividers are adjacent and sorted.
         6231  +    */
         6232  +    assert( j==0 || pPage->aiOvfl[j-1]<(u16)i ); /* Overflows in sorted order */
         6233  +    assert( j==0 || i==pPage->aiOvfl[j-1]+1 );   /* Overflows are sequential */
  6248   6234     }else{
  6249   6235       int rc = sqlite3PagerWrite(pPage->pDbPage);
  6250   6236       if( rc!=SQLITE_OK ){
  6251   6237         *pRC = rc;
  6252   6238         return;
  6253   6239       }
  6254   6240       assert( sqlite3PagerIswriteable(pPage->pDbPage) );
................................................................................
  6277   6263         ** the entry for the overflow page into the pointer map.
  6278   6264         */
  6279   6265         ptrmapPutOvflPtr(pPage, pCell, pRC);
  6280   6266       }
  6281   6267   #endif
  6282   6268     }
  6283   6269   }
         6270  +
         6271  +/*
         6272  +** A CellArray object contains a cache of pointers and sizes for a
         6273  +** consecutive sequence of cells that might be held multiple pages.
         6274  +*/
         6275  +typedef struct CellArray CellArray;
         6276  +struct CellArray {
         6277  +  int nCell;              /* Number of cells in apCell[] */
         6278  +  MemPage *pRef;          /* Reference page */
         6279  +  u8 **apCell;            /* All cells begin balanced */
         6280  +  u16 *szCell;            /* Local size of all cells in apCell[] */
         6281  +};
         6282  +
         6283  +/*
         6284  +** Make sure the cell sizes at idx, idx+1, ..., idx+N-1 have been
         6285  +** computed.
         6286  +*/
         6287  +static void populateCellCache(CellArray *p, int idx, int N){
         6288  +  assert( idx>=0 && idx+N<=p->nCell );
         6289  +  while( N>0 ){
         6290  +    assert( p->apCell[idx]!=0 );
         6291  +    if( p->szCell[idx]==0 ){
         6292  +      p->szCell[idx] = p->pRef->xCellSize(p->pRef, p->apCell[idx]);
         6293  +    }else{
         6294  +      assert( CORRUPT_DB ||
         6295  +              p->szCell[idx]==p->pRef->xCellSize(p->pRef, p->apCell[idx]) );
         6296  +    }
         6297  +    idx++;
         6298  +    N--;
         6299  +  }
         6300  +}
         6301  +
         6302  +/*
         6303  +** Return the size of the Nth element of the cell array
         6304  +*/
         6305  +static SQLITE_NOINLINE u16 computeCellSize(CellArray *p, int N){
         6306  +  assert( N>=0 && N<p->nCell );
         6307  +  assert( p->szCell[N]==0 );
         6308  +  p->szCell[N] = p->pRef->xCellSize(p->pRef, p->apCell[N]);
         6309  +  return p->szCell[N];
         6310  +}
         6311  +static u16 cachedCellSize(CellArray *p, int N){
         6312  +  assert( N>=0 && N<p->nCell );
         6313  +  if( p->szCell[N] ) return p->szCell[N];
         6314  +  return computeCellSize(p, N);
         6315  +}
  6284   6316   
  6285   6317   /*
  6286   6318   ** Array apCell[] contains pointers to nCell b-tree page cells. The 
  6287   6319   ** szCell[] array contains the size in bytes of each cell. This function
  6288   6320   ** replaces the current contents of page pPg with the contents of the cell
  6289   6321   ** array.
  6290   6322   **
................................................................................
  6291   6323   ** Some of the cells in apCell[] may currently be stored in pPg. This
  6292   6324   ** function works around problems caused by this by making a copy of any 
  6293   6325   ** such cells before overwriting the page data.
  6294   6326   **
  6295   6327   ** The MemPage.nFree field is invalidated by this function. It is the 
  6296   6328   ** responsibility of the caller to set it correctly.
  6297   6329   */
  6298         -static void rebuildPage(
         6330  +static int rebuildPage(
  6299   6331     MemPage *pPg,                   /* Edit this page */
  6300   6332     int nCell,                      /* Final number of cells on page */
  6301   6333     u8 **apCell,                    /* Array of cells */
  6302   6334     u16 *szCell                     /* Array of cell sizes */
  6303   6335   ){
  6304   6336     const int hdr = pPg->hdrOffset;          /* Offset of header on pPg */
  6305   6337     u8 * const aData = pPg->aData;           /* Pointer to data for pPg */
................................................................................
  6316   6348     pData = pEnd;
  6317   6349     for(i=0; i<nCell; i++){
  6318   6350       u8 *pCell = apCell[i];
  6319   6351       if( pCell>aData && pCell<pEnd ){
  6320   6352         pCell = &pTmp[pCell - aData];
  6321   6353       }
  6322   6354       pData -= szCell[i];
  6323         -    memcpy(pData, pCell, szCell[i]);
  6324   6355       put2byte(pCellptr, (pData - aData));
  6325   6356       pCellptr += 2;
         6357  +    if( pData < pCellptr ) return SQLITE_CORRUPT_BKPT;
         6358  +    memcpy(pData, pCell, szCell[i]);
  6326   6359       assert( szCell[i]==pPg->xCellSize(pPg, pCell) || CORRUPT_DB );
  6327         -    testcase( szCell[i]==pPg->xCellSize(pPg,pCell) );
         6360  +    testcase( szCell[i]!=pPg->xCellSize(pPg,pCell) );
  6328   6361     }
  6329   6362   
  6330   6363     /* The pPg->nFree field is now set incorrectly. The caller will fix it. */
  6331   6364     pPg->nCell = nCell;
  6332   6365     pPg->nOverflow = 0;
  6333   6366   
  6334   6367     put2byte(&aData[hdr+1], 0);
  6335   6368     put2byte(&aData[hdr+3], pPg->nCell);
  6336   6369     put2byte(&aData[hdr+5], pData - aData);
  6337   6370     aData[hdr+7] = 0x00;
         6371  +  return SQLITE_OK;
  6338   6372   }
  6339   6373   
  6340   6374   /*
  6341   6375   ** Array apCell[] contains nCell pointers to b-tree cells. Array szCell
  6342   6376   ** contains the size in bytes of each such cell. This function attempts to 
  6343   6377   ** add the cells stored in the array to page pPg. If it cannot (because 
  6344   6378   ** the page needs to be defragmented before the cells will fit), non-zero
................................................................................
  6363   6397   ** cells in apCell[], then the cells do not fit and non-zero is returned.
  6364   6398   */
  6365   6399   static int pageInsertArray(
  6366   6400     MemPage *pPg,                   /* Page to add cells to */
  6367   6401     u8 *pBegin,                     /* End of cell-pointer array */
  6368   6402     u8 **ppData,                    /* IN/OUT: Page content -area pointer */
  6369   6403     u8 *pCellptr,                   /* Pointer to cell-pointer area */
         6404  +  int iFirst,                     /* Index of first cell to add */
  6370   6405     int nCell,                      /* Number of cells to add to pPg */
  6371         -  u8 **apCell,                    /* Array of cells */
  6372         -  u16 *szCell                     /* Array of cell sizes */
         6406  +  CellArray *pCArray              /* Array of cells */
  6373   6407   ){
  6374   6408     int i;
  6375   6409     u8 *aData = pPg->aData;
  6376   6410     u8 *pData = *ppData;
  6377   6411     const int bFreelist = aData[1] || aData[2];
         6412  +  int iEnd = iFirst + nCell;
  6378   6413     assert( CORRUPT_DB || pPg->hdrOffset==0 );    /* Never called on page 1 */
  6379         -  for(i=0; i<nCell; i++){
  6380         -    int sz = szCell[i];
  6381         -    int rc;
         6414  +  for(i=iFirst; i<iEnd; i++){
         6415  +    int sz, rc;
  6382   6416       u8 *pSlot;
         6417  +    sz = cachedCellSize(pCArray, i);
  6383   6418       if( bFreelist==0 || (pSlot = pageFindSlot(pPg, sz, &rc, 0))==0 ){
  6384   6419         pData -= sz;
  6385   6420         if( pData<pBegin ) return 1;
  6386   6421         pSlot = pData;
  6387   6422       }
  6388         -    memcpy(pSlot, apCell[i], sz);
         6423  +    memcpy(pSlot, pCArray->apCell[i], sz);
  6389   6424       put2byte(pCellptr, (pSlot - aData));
  6390   6425       pCellptr += 2;
  6391   6426     }
  6392   6427     *ppData = pData;
  6393   6428     return 0;
  6394   6429   }
  6395   6430   
................................................................................
  6400   6435   ** within the body of pPg to the pPg free-list. The cell-pointers and other
  6401   6436   ** fields of the page are not updated.
  6402   6437   **
  6403   6438   ** This function returns the total number of cells added to the free-list.
  6404   6439   */
  6405   6440   static int pageFreeArray(
  6406   6441     MemPage *pPg,                   /* Page to edit */
         6442  +  int iFirst,                     /* First cell to delete */
  6407   6443     int nCell,                      /* Cells to delete */
  6408         -  u8 **apCell,                    /* Array of cells */
  6409         -  u16 *szCell                     /* Array of cell sizes */
         6444  +  CellArray *pCArray              /* Array of cells */
  6410   6445   ){
  6411   6446     u8 * const aData = pPg->aData;
  6412   6447     u8 * const pEnd = &aData[pPg->pBt->usableSize];
  6413   6448     u8 * const pStart = &aData[pPg->hdrOffset + 8 + pPg->childPtrSize];
  6414   6449     int nRet = 0;
  6415   6450     int i;
         6451  +  int iEnd = iFirst + nCell;
  6416   6452     u8 *pFree = 0;
  6417   6453     int szFree = 0;
  6418   6454   
  6419         -  for(i=0; i<nCell; i++){
  6420         -    u8 *pCell = apCell[i];
         6455  +  for(i=iFirst; i<iEnd; i++){
         6456  +    u8 *pCell = pCArray->apCell[i];
  6421   6457       if( pCell>=pStart && pCell<pEnd ){
  6422         -      int sz = szCell[i];
         6458  +      int sz;
         6459  +      /* No need to use cachedCellSize() here.  The sizes of all cells that
         6460  +      ** are to be freed have already been computing while deciding which
         6461  +      ** cells need freeing */
         6462  +      sz = pCArray->szCell[i];  assert( sz>0 );
  6423   6463         if( pFree!=(pCell + sz) ){
  6424   6464           if( pFree ){
  6425   6465             assert( pFree>aData && (pFree - aData)<65536 );
  6426   6466             freeSpace(pPg, (u16)(pFree - aData), szFree);
  6427   6467           }
  6428   6468           pFree = pCell;
  6429   6469           szFree = sz;
................................................................................
  6450   6490   **
  6451   6491   ** This routine makes the necessary adjustments to pPg so that it contains
  6452   6492   ** the correct cells after being balanced.
  6453   6493   **
  6454   6494   ** The pPg->nFree field is invalid when this function returns. It is the
  6455   6495   ** responsibility of the caller to set it correctly.
  6456   6496   */
  6457         -static void editPage(
         6497  +static int editPage(
  6458   6498     MemPage *pPg,                   /* Edit this page */
  6459   6499     int iOld,                       /* Index of first cell currently on page */
  6460   6500     int iNew,                       /* Index of new first cell on page */
  6461   6501     int nNew,                       /* Final number of cells on page */
  6462         -  u8 **apCell,                    /* Array of cells */
  6463         -  u16 *szCell                     /* Array of cell sizes */
         6502  +  CellArray *pCArray              /* Array of cells and sizes */
  6464   6503   ){
  6465   6504     u8 * const aData = pPg->aData;
  6466   6505     const int hdr = pPg->hdrOffset;
  6467   6506     u8 *pBegin = &pPg->aCellIdx[nNew * 2];
  6468   6507     int nCell = pPg->nCell;       /* Cells stored on pPg */
  6469   6508     u8 *pData;
  6470   6509     u8 *pCellptr;
................................................................................
  6475   6514   #ifdef SQLITE_DEBUG
  6476   6515     u8 *pTmp = sqlite3PagerTempSpace(pPg->pBt->pPager);
  6477   6516     memcpy(pTmp, aData, pPg->pBt->usableSize);
  6478   6517   #endif
  6479   6518   
  6480   6519     /* Remove cells from the start and end of the page */
  6481   6520     if( iOld<iNew ){
  6482         -    int nShift = pageFreeArray(
  6483         -        pPg, iNew-iOld, &apCell[iOld], &szCell[iOld]
  6484         -    );
         6521  +    int nShift = pageFreeArray(pPg, iOld, iNew-iOld, pCArray);
  6485   6522       memmove(pPg->aCellIdx, &pPg->aCellIdx[nShift*2], nCell*2);
  6486   6523       nCell -= nShift;
  6487   6524     }
  6488   6525     if( iNewEnd < iOldEnd ){
  6489         -    nCell -= pageFreeArray(
  6490         -        pPg, iOldEnd-iNewEnd, &apCell[iNewEnd], &szCell[iNewEnd]
  6491         -    );
         6526  +    nCell -= pageFreeArray(pPg, iNewEnd, iOldEnd - iNewEnd, pCArray);
  6492   6527     }
  6493   6528   
  6494   6529     pData = &aData[get2byteNotZero(&aData[hdr+5])];
  6495   6530     if( pData<pBegin ) goto editpage_fail;
  6496   6531   
  6497   6532     /* Add cells to the start of the page */
  6498   6533     if( iNew<iOld ){
  6499   6534       int nAdd = MIN(nNew,iOld-iNew);
  6500   6535       assert( (iOld-iNew)<nNew || nCell==0 || CORRUPT_DB );
  6501   6536       pCellptr = pPg->aCellIdx;
  6502   6537       memmove(&pCellptr[nAdd*2], pCellptr, nCell*2);
  6503   6538       if( pageInsertArray(
  6504   6539             pPg, pBegin, &pData, pCellptr,
  6505         -          nAdd, &apCell[iNew], &szCell[iNew]
         6540  +          iNew, nAdd, pCArray
  6506   6541       ) ) goto editpage_fail;
  6507   6542       nCell += nAdd;
  6508   6543     }
  6509   6544   
  6510   6545     /* Add any overflow cells */
  6511   6546     for(i=0; i<pPg->nOverflow; i++){
  6512   6547       int iCell = (iOld + pPg->aiOvfl[i]) - iNew;
  6513   6548       if( iCell>=0 && iCell<nNew ){
  6514   6549         pCellptr = &pPg->aCellIdx[iCell * 2];
  6515   6550         memmove(&pCellptr[2], pCellptr, (nCell - iCell) * 2);
  6516   6551         nCell++;
  6517   6552         if( pageInsertArray(
  6518   6553               pPg, pBegin, &pData, pCellptr,
  6519         -            1, &apCell[iCell + iNew], &szCell[iCell + iNew]
         6554  +            iCell+iNew, 1, pCArray
  6520   6555         ) ) goto editpage_fail;
  6521   6556       }
  6522   6557     }
  6523   6558   
  6524   6559     /* Append cells to the end of the page */
  6525   6560     pCellptr = &pPg->aCellIdx[nCell*2];
  6526   6561     if( pageInsertArray(
  6527   6562           pPg, pBegin, &pData, pCellptr,
  6528         -        nNew-nCell, &apCell[iNew+nCell], &szCell[iNew+nCell]
         6563  +        iNew+nCell, nNew-nCell, pCArray
  6529   6564     ) ) goto editpage_fail;
  6530   6565   
  6531   6566     pPg->nCell = nNew;
  6532   6567     pPg->nOverflow = 0;
  6533   6568   
  6534   6569     put2byte(&aData[hdr+3], pPg->nCell);
  6535   6570     put2byte(&aData[hdr+5], pData - aData);
  6536   6571   
  6537   6572   #ifdef SQLITE_DEBUG
  6538   6573     for(i=0; i<nNew && !CORRUPT_DB; i++){
  6539         -    u8 *pCell = apCell[i+iNew];
         6574  +    u8 *pCell = pCArray->apCell[i+iNew];
  6540   6575       int iOff = get2byte(&pPg->aCellIdx[i*2]);
  6541   6576       if( pCell>=aData && pCell<&aData[pPg->pBt->usableSize] ){
  6542   6577         pCell = &pTmp[pCell - aData];
  6543   6578       }
  6544         -    assert( 0==memcmp(pCell, &aData[iOff], szCell[i+iNew]) );
         6579  +    assert( 0==memcmp(pCell, &aData[iOff],
         6580  +            pCArray->pRef->xCellSize(pCArray->pRef, pCArray->apCell[i+iNew])) );
  6545   6581     }
  6546   6582   #endif
  6547   6583   
  6548         -  return;
         6584  +  return SQLITE_OK;
  6549   6585    editpage_fail:
  6550   6586     /* Unable to edit this page. Rebuild it from scratch instead. */
  6551         -  rebuildPage(pPg, nNew, &apCell[iNew], &szCell[iNew]);
         6587  +  populateCellCache(pCArray, iNew, nNew);
         6588  +  return rebuildPage(pPg, nNew, &pCArray->apCell[iNew], &pCArray->szCell[iNew]);
  6552   6589   }
  6553   6590   
  6554   6591   /*
  6555   6592   ** The following parameters determine how many adjacent pages get involved
  6556   6593   ** in a balancing operation.  NN is the number of neighbors on either side
  6557   6594   ** of the page that participate in the balancing operation.  NB is the
  6558   6595   ** total number of pages that participate, including the target page and
................................................................................
  6616   6653       u8 *pCell = pPage->apOvfl[0];
  6617   6654       u16 szCell = pPage->xCellSize(pPage, pCell);
  6618   6655       u8 *pStop;
  6619   6656   
  6620   6657       assert( sqlite3PagerIswriteable(pNew->pDbPage) );
  6621   6658       assert( pPage->aData[0]==(PTF_INTKEY|PTF_LEAFDATA|PTF_LEAF) );
  6622   6659       zeroPage(pNew, PTF_INTKEY|PTF_LEAFDATA|PTF_LEAF);
  6623         -    rebuildPage(pNew, 1, &pCell, &szCell);
         6660  +    rc = rebuildPage(pNew, 1, &pCell, &szCell);
         6661  +    if( NEVER(rc) ) return rc;
  6624   6662       pNew->nFree = pBt->usableSize - pNew->cellOffset - 2 - szCell;
  6625   6663   
  6626   6664       /* If this is an auto-vacuum database, update the pointer map
  6627   6665       ** with entries for the new page, and any pointer from the 
  6628   6666       ** cell on the page to an overflow page. If either of these
  6629   6667       ** operations fails, the return code is set, but the contents
  6630   6668       ** of the parent page are still manipulated by thh code below.
................................................................................
  6820   6858     MemPage *pParent,               /* Parent page of siblings being balanced */
  6821   6859     int iParentIdx,                 /* Index of "the page" in pParent */
  6822   6860     u8 *aOvflSpace,                 /* page-size bytes of space for parent ovfl */
  6823   6861     int isRoot,                     /* True if pParent is a root-page */
  6824   6862     int bBulk                       /* True if this call is part of a bulk load */
  6825   6863   ){
  6826   6864     BtShared *pBt;               /* The whole database */
  6827         -  int nCell = 0;               /* Number of cells in apCell[] */
  6828   6865     int nMaxCells = 0;           /* Allocated size of apCell, szCell, aFrom. */
  6829   6866     int nNew = 0;                /* Number of pages in apNew[] */
  6830   6867     int nOld;                    /* Number of pages in apOld[] */
  6831   6868     int i, j, k;                 /* Loop counters */
  6832   6869     int nxDiv;                   /* Next divider slot in pParent->aCell[] */
  6833   6870     int rc = SQLITE_OK;          /* The return code */
  6834   6871     u16 leafCorrection;          /* 4 if pPage is a leaf.  0 if not */
  6835   6872     int leafData;                /* True if pPage is a leaf of a LEAFDATA tree */
  6836   6873     int usableSpace;             /* Bytes in pPage beyond the header */
  6837   6874     int pageFlags;               /* Value of pPage->aData[0] */
  6838         -  int subtotal;                /* Subtotal of bytes in cells on one page */
  6839   6875     int iSpace1 = 0;             /* First unused byte of aSpace1[] */
  6840   6876     int iOvflSpace = 0;          /* First unused byte of aOvflSpace[] */
  6841   6877     int szScratch;               /* Size of scratch memory requested */
  6842   6878     MemPage *apOld[NB];          /* pPage and up to two siblings */
  6843   6879     MemPage *apNew[NB+2];        /* pPage and up to NB siblings after balancing */
  6844   6880     u8 *pRight;                  /* Location in parent of right-sibling pointer */
  6845   6881     u8 *apDiv[NB-1];             /* Divider cells in pParent */
  6846         -  int cntNew[NB+2];            /* Index in aCell[] of cell after i-th page */
  6847         -  int cntOld[NB+2];            /* Old index in aCell[] after i-th page */
         6882  +  int cntNew[NB+2];            /* Index in b.paCell[] of cell after i-th page */
         6883  +  int cntOld[NB+2];            /* Old index in b.apCell[] */
  6848   6884     int szNew[NB+2];             /* Combined size of cells placed on i-th page */
  6849         -  u8 **apCell = 0;             /* All cells begin balanced */
  6850         -  u16 *szCell;                 /* Local size of all cells in apCell[] */
  6851   6885     u8 *aSpace1;                 /* Space for copies of dividers cells */
  6852   6886     Pgno pgno;                   /* Temp var to store a page number in */
  6853   6887     u8 abDone[NB+2];             /* True after i'th new page is populated */
  6854   6888     Pgno aPgno[NB+2];            /* Page numbers of new pages before shuffling */
  6855   6889     Pgno aPgOrder[NB+2];         /* Copy of aPgno[] used for sorting pages */
  6856   6890     u16 aPgFlags[NB+2];          /* flags field of new pages before shuffling */
         6891  +  CellArray b;                  /* Parsed information on cells being balanced */
  6857   6892   
  6858   6893     memset(abDone, 0, sizeof(abDone));
         6894  +  b.nCell = 0;
         6895  +  b.apCell = 0;
  6859   6896     pBt = pParent->pBt;
  6860   6897     assert( sqlite3_mutex_held(pBt->mutex) );
  6861   6898     assert( sqlite3PagerIswriteable(pParent->pDbPage) );
  6862   6899   
  6863   6900   #if 0
  6864   6901     TRACE(("BALANCE: begin page %d child of %d\n", pPage->pgno, pParent->pgno));
  6865   6902   #endif
................................................................................
  6960   6997     ** alignment */
  6961   6998     nMaxCells = (nMaxCells + 3)&~3;
  6962   6999   
  6963   7000     /*
  6964   7001     ** Allocate space for memory structures
  6965   7002     */
  6966   7003     szScratch =
  6967         -       nMaxCells*sizeof(u8*)                       /* apCell */
  6968         -     + nMaxCells*sizeof(u16)                       /* szCell */
         7004  +       nMaxCells*sizeof(u8*)                       /* b.apCell */
         7005  +     + nMaxCells*sizeof(u16)                       /* b.szCell */
  6969   7006        + pBt->pageSize;                              /* aSpace1 */
  6970   7007   
  6971   7008     /* EVIDENCE-OF: R-28375-38319 SQLite will never request a scratch buffer
  6972   7009     ** that is more than 6 times the database page size. */
  6973   7010     assert( szScratch<=6*(int)pBt->pageSize );
  6974         -  apCell = sqlite3ScratchMalloc( szScratch ); 
  6975         -  if( apCell==0 ){
         7011  +  b.apCell = sqlite3ScratchMalloc( szScratch ); 
         7012  +  if( b.apCell==0 ){
  6976   7013       rc = SQLITE_NOMEM;
  6977   7014       goto balance_cleanup;
  6978   7015     }
  6979         -  szCell = (u16*)&apCell[nMaxCells];
  6980         -  aSpace1 = (u8*)&szCell[nMaxCells];
         7016  +  b.szCell = (u16*)&b.apCell[nMaxCells];
         7017  +  aSpace1 = (u8*)&b.szCell[nMaxCells];
  6981   7018     assert( EIGHT_BYTE_ALIGNMENT(aSpace1) );
  6982   7019   
  6983   7020     /*
  6984   7021     ** Load pointers to all cells on sibling pages and the divider cells
  6985         -  ** into the local apCell[] array.  Make copies of the divider cells
         7022  +  ** into the local b.apCell[] array.  Make copies of the divider cells
  6986   7023     ** into space obtained from aSpace1[]. The divider cells have already
  6987   7024     ** been removed from pParent.
  6988   7025     **
  6989   7026     ** If the siblings are on leaf pages, then the child pointers of the
  6990   7027     ** divider cells are stripped from the cells before they are copied
  6991         -  ** into aSpace1[].  In this way, all cells in apCell[] are without
         7028  +  ** into aSpace1[].  In this way, all cells in b.apCell[] are without
  6992   7029     ** child pointers.  If siblings are not leaves, then all cell in
  6993         -  ** apCell[] include child pointers.  Either way, all cells in apCell[]
         7030  +  ** b.apCell[] include child pointers.  Either way, all cells in b.apCell[]
  6994   7031     ** are alike.
  6995   7032     **
  6996   7033     ** leafCorrection:  4 if pPage is a leaf.  0 if pPage is not a leaf.
  6997   7034     **       leafData:  1 if pPage holds key+data and pParent holds only keys.
  6998   7035     */
  6999         -  leafCorrection = apOld[0]->leaf*4;
  7000         -  leafData = apOld[0]->intKeyLeaf;
         7036  +  b.pRef = apOld[0];
         7037  +  leafCorrection = b.pRef->leaf*4;
         7038  +  leafData = b.pRef->intKeyLeaf;
  7001   7039     for(i=0; i<nOld; i++){
  7002         -    int limit;
  7003   7040       MemPage *pOld = apOld[i];
         7041  +    int limit = pOld->nCell;
         7042  +    u8 *aData = pOld->aData;
         7043  +    u16 maskPage = pOld->maskPage;
         7044  +    u8 *piCell = aData + pOld->cellOffset;
         7045  +    u8 *piEnd;
  7004   7046   
  7005   7047       /* Verify that all sibling pages are of the same "type" (table-leaf,
  7006   7048       ** table-interior, index-leaf, or index-interior).
  7007   7049       */
  7008   7050       if( pOld->aData[0]!=apOld[0]->aData[0] ){
  7009   7051         rc = SQLITE_CORRUPT_BKPT;
  7010   7052         goto balance_cleanup;
  7011   7053       }
  7012   7054   
  7013         -    limit = pOld->nCell+pOld->nOverflow;
         7055  +    /* Load b.apCell[] with pointers to all cells in pOld.  If pOld
         7056  +    ** constains overflow cells, include them in the b.apCell[] array
         7057  +    ** in the correct spot.
         7058  +    **
         7059  +    ** Note that when there are multiple overflow cells, it is always the
         7060  +    ** case that they are sequential and adjacent.  This invariant arises
         7061  +    ** because multiple overflows can only occurs when inserting divider
         7062  +    ** cells into a parent on a prior balance, and divider cells are always
         7063  +    ** adjacent and are inserted in order.  There is an assert() tagged
         7064  +    ** with "NOTE 1" in the overflow cell insertion loop to prove this
         7065  +    ** invariant.
         7066  +    **
         7067  +    ** This must be done in advance.  Once the balance starts, the cell
         7068  +    ** offset section of the btree page will be overwritten and we will no
         7069  +    ** long be able to find the cells if a pointer to each cell is not saved
         7070  +    ** first.
         7071  +    */
         7072  +    memset(&b.szCell[b.nCell], 0, sizeof(b.szCell[0])*limit);
  7014   7073       if( pOld->nOverflow>0 ){
  7015         -      for(j=0; j<limit; j++){
  7016         -        assert( nCell<nMaxCells );
  7017         -        apCell[nCell] = findOverflowCell(pOld, j);
  7018         -        szCell[nCell] = pOld->xCellSize(pOld, apCell[nCell]);
  7019         -        nCell++;
  7020         -      }
  7021         -    }else{
  7022         -      u8 *aData = pOld->aData;
  7023         -      u16 maskPage = pOld->maskPage;
  7024         -      u16 cellOffset = pOld->cellOffset;
         7074  +      memset(&b.szCell[b.nCell+limit], 0, sizeof(b.szCell[0])*pOld->nOverflow);
         7075  +      limit = pOld->aiOvfl[0];
  7025   7076         for(j=0; j<limit; j++){
  7026         -        assert( nCell<nMaxCells );
  7027         -        apCell[nCell] = findCellv2(aData, maskPage, cellOffset, j);
  7028         -        szCell[nCell] = pOld->xCellSize(pOld, apCell[nCell]);
  7029         -        nCell++;
         7077  +        b.apCell[b.nCell] = aData + (maskPage & get2byte(piCell));
         7078  +        piCell += 2;
         7079  +        b.nCell++;
  7030   7080         }
  7031         -    }       
  7032         -    cntOld[i] = nCell;
         7081  +      for(k=0; k<pOld->nOverflow; k++){
         7082  +        assert( k==0 || pOld->aiOvfl[k-1]+1==pOld->aiOvfl[k] );/* NOTE 1 */
         7083  +        b.apCell[b.nCell] = pOld->apOvfl[k];
         7084  +        b.nCell++;
         7085  +      }
         7086  +      limit = pOld->nCell - pOld->aiOvfl[0];
         7087  +    }
         7088  +    piEnd = aData + pOld->cellOffset + 2*pOld->nCell;
         7089  +    while( piCell<piEnd ){
         7090  +      assert( b.nCell<nMaxCells );
         7091  +      b.apCell[b.nCell] = aData + (maskPage & get2byte(piCell));
         7092  +      piCell += 2;
         7093  +      b.nCell++;
         7094  +    }
         7095  +
         7096  +    cntOld[i] = b.nCell;
  7033   7097       if( i<nOld-1 && !leafData){
  7034   7098         u16 sz = (u16)szNew[i];
  7035   7099         u8 *pTemp;
  7036         -      assert( nCell<nMaxCells );
  7037         -      szCell[nCell] = sz;
         7100  +      assert( b.nCell<nMaxCells );
         7101  +      b.szCell[b.nCell] = sz;
  7038   7102         pTemp = &aSpace1[iSpace1];
  7039   7103         iSpace1 += sz;
  7040   7104         assert( sz<=pBt->maxLocal+23 );
  7041   7105         assert( iSpace1 <= (int)pBt->pageSize );
  7042   7106         memcpy(pTemp, apDiv[i], sz);
  7043         -      apCell[nCell] = pTemp+leafCorrection;
         7107  +      b.apCell[b.nCell] = pTemp+leafCorrection;
  7044   7108         assert( leafCorrection==0 || leafCorrection==4 );
  7045         -      szCell[nCell] = szCell[nCell] - leafCorrection;
         7109  +      b.szCell[b.nCell] = b.szCell[b.nCell] - leafCorrection;
  7046   7110         if( !pOld->leaf ){
  7047   7111           assert( leafCorrection==0 );
  7048   7112           assert( pOld->hdrOffset==0 );
  7049   7113           /* The right pointer of the child page pOld becomes the left
  7050   7114           ** pointer of the divider cell */
  7051         -        memcpy(apCell[nCell], &pOld->aData[8], 4);
         7115  +        memcpy(b.apCell[b.nCell], &pOld->aData[8], 4);
  7052   7116         }else{
  7053   7117           assert( leafCorrection==4 );
  7054         -        while( szCell[nCell]<4 ){
         7118  +        while( b.szCell[b.nCell]<4 ){
  7055   7119             /* Do not allow any cells smaller than 4 bytes. If a smaller cell
  7056   7120             ** does exist, pad it with 0x00 bytes. */
  7057         -          assert( szCell[nCell]==3 || CORRUPT_DB );
  7058         -          assert( apCell[nCell]==&aSpace1[iSpace1-3] || CORRUPT_DB );
         7121  +          assert( b.szCell[b.nCell]==3 || CORRUPT_DB );
         7122  +          assert( b.apCell[b.nCell]==&aSpace1[iSpace1-3] || CORRUPT_DB );
  7059   7123             aSpace1[iSpace1++] = 0x00;
  7060         -          szCell[nCell]++;
         7124  +          b.szCell[b.nCell]++;
  7061   7125           }
  7062   7126         }
  7063         -      nCell++;
         7127  +      b.nCell++;
  7064   7128       }
  7065   7129     }
  7066   7130   
  7067   7131     /*
  7068         -  ** Figure out the number of pages needed to hold all nCell cells.
         7132  +  ** Figure out the number of pages needed to hold all b.nCell cells.
  7069   7133     ** Store this number in "k".  Also compute szNew[] which is the total
  7070   7134     ** size of all cells on the i-th page and cntNew[] which is the index
  7071         -  ** in apCell[] of the cell that divides page i from page i+1.  
  7072         -  ** cntNew[k] should equal nCell.
         7135  +  ** in b.apCell[] of the cell that divides page i from page i+1.  
         7136  +  ** cntNew[k] should equal b.nCell.
  7073   7137     **
  7074   7138     ** Values computed by this block:
  7075   7139     **
  7076   7140     **           k: The total number of sibling pages
  7077   7141     **    szNew[i]: Spaced used on the i-th sibling page.
  7078         -  **   cntNew[i]: Index in apCell[] and szCell[] for the first cell to
         7142  +  **   cntNew[i]: Index in b.apCell[] and b.szCell[] for the first cell to
  7079   7143     **              the right of the i-th sibling page.
  7080   7144     ** usableSpace: Number of bytes of space available on each sibling.
  7081   7145     ** 
  7082   7146     */
  7083   7147     usableSpace = pBt->usableSize - 12 + leafCorrection;
  7084         -  for(subtotal=k=i=0; i<nCell; i++){
  7085         -    assert( i<nMaxCells );
  7086         -    subtotal += szCell[i] + 2;
  7087         -    if( subtotal > usableSpace ){
  7088         -      szNew[k] = subtotal - szCell[i] - 2;
  7089         -      cntNew[k] = i;
  7090         -      if( leafData ){ i--; }
  7091         -      subtotal = 0;
  7092         -      k++;
  7093         -      if( k>NB+1 ){ rc = SQLITE_CORRUPT_BKPT; goto balance_cleanup; }
  7094         -    }
  7095         -  }
  7096         -  szNew[k] = subtotal;
  7097         -  cntNew[k] = nCell;
  7098         -  k++;
         7148  +  for(i=0; i<nOld; i++){
         7149  +    MemPage *p = apOld[i];
         7150  +    szNew[i] = usableSpace - p->nFree;
         7151  +    if( szNew[i]<0 ){ rc = SQLITE_CORRUPT_BKPT; goto balance_cleanup; }
         7152  +    for(j=0; j<p->nOverflow; j++){
         7153  +      szNew[i] += 2 + p->xCellSize(p, p->apOvfl[j]);
         7154  +    }
         7155  +    cntNew[i] = cntOld[i];
         7156  +  }
         7157  +  k = nOld;
         7158  +  for(i=0; i<k; i++){
         7159  +    int sz;
         7160  +    while( szNew[i]>usableSpace ){
         7161  +      if( i+1>=k ){
         7162  +        k = i+2;
         7163  +        if( k>NB+2 ){ rc = SQLITE_CORRUPT_BKPT; goto balance_cleanup; }
         7164  +        szNew[k-1] = 0;
         7165  +        cntNew[k-1] = b.nCell;
         7166  +      }
         7167  +      sz = 2 + cachedCellSize(&b, cntNew[i]-1);
         7168  +      szNew[i] -= sz;
         7169  +      if( !leafData ){
         7170  +        if( cntNew[i]<b.nCell ){
         7171  +          sz = 2 + cachedCellSize(&b, cntNew[i]);
         7172  +        }else{
         7173  +          sz = 0;
         7174  +        }
         7175  +      }
         7176  +      szNew[i+1] += sz;
         7177  +      cntNew[i]--;
         7178  +    }
         7179  +    while( cntNew[i]<b.nCell ){
         7180  +      sz = 2 + cachedCellSize(&b, cntNew[i]);
         7181  +      if( szNew[i]+sz>usableSpace ) break;
         7182  +      szNew[i] += sz;
         7183  +      cntNew[i]++;
         7184  +      if( !leafData ){
         7185  +        if( cntNew[i]<b.nCell ){
         7186  +          sz = 2 + cachedCellSize(&b, cntNew[i]);
         7187  +        }else{
         7188  +          sz = 0;
         7189  +        }
         7190  +      }
         7191  +      szNew[i+1] -= sz;
         7192  +    }
         7193  +    if( cntNew[i]>=b.nCell ){
         7194  +      k = i+1;
         7195  +    }else if( cntNew[i] - (i>0 ? cntNew[i-1] : 0) <= 0 ){
         7196  +      rc = SQLITE_CORRUPT_BKPT;
         7197  +      goto balance_cleanup;
         7198  +    }
         7199  +  }
  7099   7200   
  7100   7201     /*
  7101   7202     ** The packing computed by the previous block is biased toward the siblings
  7102   7203     ** on the left side (siblings with smaller keys). The left siblings are
  7103   7204     ** always nearly full, while the right-most sibling might be nearly empty.
  7104   7205     ** The next block of code attempts to adjust the packing of siblings to
  7105   7206     ** get a better balance.
................................................................................
  7112   7213       int szRight = szNew[i];  /* Size of sibling on the right */
  7113   7214       int szLeft = szNew[i-1]; /* Size of sibling on the left */
  7114   7215       int r;              /* Index of right-most cell in left sibling */
  7115   7216       int d;              /* Index of first cell to the left of right sibling */
  7116   7217   
  7117   7218       r = cntNew[i-1] - 1;
  7118   7219       d = r + 1 - leafData;
  7119         -    assert( d<nMaxCells );
  7120         -    assert( r<nMaxCells );
  7121         -    while( szRight==0 
  7122         -       || (!bBulk && szRight+szCell[d]+2<=szLeft-(szCell[r]+2)) 
  7123         -    ){
  7124         -      szRight += szCell[d] + 2;
  7125         -      szLeft -= szCell[r] + 2;
  7126         -      cntNew[i-1]--;
  7127         -      r = cntNew[i-1] - 1;
  7128         -      d = r + 1 - leafData;
         7220  +    (void)cachedCellSize(&b, d);
         7221  +    while(1){
         7222  +      assert( d<nMaxCells );
         7223  +      assert( r<nMaxCells );
         7224  +      (void)cachedCellSize(&b, r);
         7225  +      if( szRight!=0
         7226  +       && (bBulk || szRight+b.szCell[d]+2 > szLeft-(b.szCell[r]+2)) ){
         7227  +        break;
         7228  +      }
         7229  +      szRight += b.szCell[d] + 2;
         7230  +      szLeft -= b.szCell[r] + 2;
         7231  +      cntNew[i-1] = r;
         7232  +      if( cntNew[i-1] <= 0 ){
         7233  +        rc = SQLITE_CORRUPT_BKPT;
         7234  +        goto balance_cleanup;
         7235  +      }
         7236  +      r--;
         7237  +      d--;
  7129   7238       }
  7130   7239       szNew[i] = szRight;
  7131   7240       szNew[i-1] = szLeft;
  7132   7241     }
  7133   7242   
  7134   7243     /* Sanity check:  For a non-corrupt database file one of the follwing
  7135   7244     ** must be true:
................................................................................
  7160   7269       }else{
  7161   7270         assert( i>0 );
  7162   7271         rc = allocateBtreePage(pBt, &pNew, &pgno, (bBulk ? 1 : pgno), 0);
  7163   7272         if( rc ) goto balance_cleanup;
  7164   7273         zeroPage(pNew, pageFlags);
  7165   7274         apNew[i] = pNew;
  7166   7275         nNew++;
  7167         -      cntOld[i] = nCell;
         7276  +      cntOld[i] = b.nCell;
  7168   7277   
  7169   7278         /* Set the pointer-map entry for the new sibling page. */
  7170   7279         if( ISAUTOVACUUM ){
  7171   7280           ptrmapPut(pBt, pNew->pgno, PTRMAP_BTREE, pParent->pgno, &rc);
  7172   7281           if( rc!=SQLITE_OK ){
  7173   7282             goto balance_cleanup;
  7174   7283           }
................................................................................
  7265   7374       MemPage *pNew = apNew[0];
  7266   7375       u8 *aOld = pNew->aData;
  7267   7376       int cntOldNext = pNew->nCell + pNew->nOverflow;
  7268   7377       int usableSize = pBt->usableSize;
  7269   7378       int iNew = 0;
  7270   7379       int iOld = 0;
  7271   7380   
  7272         -    for(i=0; i<nCell; i++){
  7273         -      u8 *pCell = apCell[i];
         7381  +    for(i=0; i<b.nCell; i++){
         7382  +      u8 *pCell = b.apCell[i];
  7274   7383         if( i==cntOldNext ){
  7275   7384           MemPage *pOld = (++iOld)<nNew ? apNew[iOld] : apOld[iOld];
  7276   7385           cntOldNext += pOld->nCell + pOld->nOverflow + !leafData;
  7277   7386           aOld = pOld->aData;
  7278   7387         }
  7279   7388         if( i==cntNew[iNew] ){
  7280   7389           pNew = apNew[++iNew];
................................................................................
  7291   7400          || pNew->pgno!=aPgno[iOld]
  7292   7401          || pCell<aOld
  7293   7402          || pCell>=&aOld[usableSize]
  7294   7403         ){
  7295   7404           if( !leafCorrection ){
  7296   7405             ptrmapPut(pBt, get4byte(pCell), PTRMAP_BTREE, pNew->pgno, &rc);
  7297   7406           }
  7298         -        if( szCell[i]>pNew->minLocal ){
         7407  +        if( cachedCellSize(&b,i)>pNew->minLocal ){
  7299   7408             ptrmapPutOvflPtr(pNew, pCell, &rc);
  7300   7409           }
         7410  +        if( rc ) goto balance_cleanup;
  7301   7411         }
  7302   7412       }
  7303   7413     }
  7304   7414   
  7305   7415     /* Insert new divider cells into pParent. */
  7306   7416     for(i=0; i<nNew-1; i++){
  7307   7417       u8 *pCell;
  7308   7418       u8 *pTemp;
  7309   7419       int sz;
  7310   7420       MemPage *pNew = apNew[i];
  7311   7421       j = cntNew[i];
  7312   7422   
  7313   7423       assert( j<nMaxCells );
  7314         -    pCell = apCell[j];
  7315         -    sz = szCell[j] + leafCorrection;
         7424  +    assert( b.apCell[j]!=0 );
         7425  +    pCell = b.apCell[j];
         7426  +    sz = b.szCell[j] + leafCorrection;
  7316   7427       pTemp = &aOvflSpace[iOvflSpace];
  7317   7428       if( !pNew->leaf ){
  7318   7429         memcpy(&pNew->aData[8], pCell, 4);
  7319   7430       }else if( leafData ){
  7320   7431         /* If the tree is a leaf-data tree, and the siblings are leaves, 
  7321         -      ** then there is no divider cell in apCell[]. Instead, the divider 
         7432  +      ** then there is no divider cell in b.apCell[]. Instead, the divider 
  7322   7433         ** cell consists of the integer key for the right-most cell of 
  7323   7434         ** the sibling-page assembled above only.
  7324   7435         */
  7325   7436         CellInfo info;
  7326   7437         j--;
  7327         -      pNew->xParseCell(pNew, apCell[j], &info);
         7438  +      pNew->xParseCell(pNew, b.apCell[j], &info);
  7328   7439         pCell = pTemp;
  7329   7440         sz = 4 + putVarint(&pCell[4], info.nKey);
  7330   7441         pTemp = 0;
  7331   7442       }else{
  7332   7443         pCell -= 4;
  7333   7444         /* Obscure case for non-leaf-data trees: If the cell at pCell was
  7334   7445         ** previously stored on a leaf node, and its reported size was 4
................................................................................
  7337   7448         ** any cell). But it is important to pass the correct size to 
  7338   7449         ** insertCell(), so reparse the cell now.
  7339   7450         **
  7340   7451         ** Note that this can never happen in an SQLite data file, as all
  7341   7452         ** cells are at least 4 bytes. It only happens in b-trees used
  7342   7453         ** to evaluate "IN (SELECT ...)" and similar clauses.
  7343   7454         */
  7344         -      if( szCell[j]==4 ){
         7455  +      if( b.szCell[j]==4 ){
  7345   7456           assert(leafCorrection==4);
  7346   7457           sz = pParent->xCellSize(pParent, pCell);
  7347   7458         }
  7348   7459       }
  7349   7460       iOvflSpace += sz;
  7350   7461       assert( sz<=pBt->maxLocal+23 );
  7351   7462       assert( iOvflSpace <= (int)pBt->pageSize );
................................................................................
  7395   7506         ** only after iPg+1 has already been updated. */
  7396   7507         assert( cntNew[iPg]>=cntOld[iPg] || abDone[iPg+1] );
  7397   7508   
  7398   7509         if( iPg==0 ){
  7399   7510           iNew = iOld = 0;
  7400   7511           nNewCell = cntNew[0];
  7401   7512         }else{
  7402         -        iOld = iPg<nOld ? (cntOld[iPg-1] + !leafData) : nCell;
         7513  +        iOld = iPg<nOld ? (cntOld[iPg-1] + !leafData) : b.nCell;
  7403   7514           iNew = cntNew[iPg-1] + !leafData;
  7404   7515           nNewCell = cntNew[iPg] - iNew;
  7405   7516         }
  7406   7517   
  7407         -      editPage(apNew[iPg], iOld, iNew, nNewCell, apCell, szCell);
         7518  +      rc = editPage(apNew[iPg], iOld, iNew, nNewCell, &b);
         7519  +      if( rc ) goto balance_cleanup;
  7408   7520         abDone[iPg]++;
  7409   7521         apNew[iPg]->nFree = usableSpace-szNew[iPg];
  7410   7522         assert( apNew[iPg]->nOverflow==0 );
  7411   7523         assert( apNew[iPg]->nCell==nNewCell );
  7412   7524       }
  7413   7525     }
  7414   7526   
................................................................................
  7451   7563         u32 key = get4byte(&apNew[i]->aData[8]);
  7452   7564         ptrmapPut(pBt, key, PTRMAP_BTREE, apNew[i]->pgno, &rc);
  7453   7565       }
  7454   7566     }
  7455   7567   
  7456   7568     assert( pParent->isInit );
  7457   7569     TRACE(("BALANCE: finished: old=%d new=%d cells=%d\n",
  7458         -          nOld, nNew, nCell));
         7570  +          nOld, nNew, b.nCell));
  7459   7571   
  7460   7572     /* Free any old pages that were not reused as new pages.
  7461   7573     */
  7462   7574     for(i=nNew; i<nOld; i++){
  7463   7575       freePage(apOld[i], &rc);
  7464   7576     }
  7465   7577   
................................................................................
  7474   7586     }
  7475   7587   #endif
  7476   7588   
  7477   7589     /*
  7478   7590     ** Cleanup before returning.
  7479   7591     */
  7480   7592   balance_cleanup:
  7481         -  sqlite3ScratchFree(apCell);
         7593  +  sqlite3ScratchFree(b.apCell);
  7482   7594     for(i=0; i<nOld; i++){
  7483   7595       releasePage(apOld[i]);
  7484   7596     }
  7485   7597     for(i=0; i<nNew; i++){
  7486   7598       releasePage(apNew[i]);
  7487   7599     }
  7488   7600   
................................................................................
  7925   8037     if( !pPage->leaf ){
  7926   8038       MemPage *pLeaf = pCur->apPage[pCur->iPage];
  7927   8039       int nCell;
  7928   8040       Pgno n = pCur->apPage[iCellDepth+1]->pgno;
  7929   8041       unsigned char *pTmp;
  7930   8042   
  7931   8043       pCell = findCell(pLeaf, pLeaf->nCell-1);
  7932         -    if( pCell<&pLeaf->aData[4] ) return SQLITE_CORRUPT_BKPT;
         8044  +    if( NEVER(pCell<&pLeaf->aData[4]) ) return SQLITE_CORRUPT_BKPT;
  7933   8045       nCell = pLeaf->xCellSize(pLeaf, pCell);
  7934   8046       assert( MX_CELL_SIZE(pBt) >= nCell );
  7935   8047       pTmp = pBt->pTmpSpace;
  7936   8048       assert( pTmp!=0 );
  7937   8049       rc = sqlite3PagerWrite(pLeaf->pDbPage);
  7938   8050       insertCell(pPage, iCellIdx, pCell-4, nCell+4, pTmp, n, &rc);
  7939   8051       dropCell(pLeaf, pLeaf->nCell-1, nCell, &rc);