SQLite4
Check-in [5c3e17cc90]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a couple of problems in the code that handles free block lists.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | freelist-rework
Files: files | file ages | folders
SHA1: 5c3e17cc908e66de2bef3580ace33222cf6ea38f
User & Date: dan 2012-10-30 16:27:31
Context
2012-10-30
17:19
Fix another free block list related bug. check-in: f9e75777b0 user: dan tags: freelist-rework
16:27
Fix a couple of problems in the code that handles free block lists. check-in: 5c3e17cc90 user: dan tags: freelist-rework
2012-10-29
20:04
Rework the free block list storage so that it scales properly. Currently some test cases fail. check-in: ebca1063ac user: dan tags: freelist-rework
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/lsm_shared.c.

   438    438   /* 
   439    439   ** Callback used by lsmWalkFreelist().
   440    440   */
   441    441   static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
   442    442     WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
   443    443     Freelist *pFree = p->pFreelist;
   444    444   
   445         -  while( (p->iFree < pFree->nEntry) ){
   446         -    FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
   447         -    if( pEntry->iBlk>iBlk ){
   448         -      break;
   449         -    }else{
   450         -      p->iFree++;
   451         -      if( pEntry->iId>=0 
   452         -       && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) 
   453         -      ){
   454         -        return 1;
          445  +  if( pFree ){
          446  +    while( (p->iFree < pFree->nEntry) ){
          447  +      FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
          448  +      if( pEntry->iBlk>iBlk ){
          449  +        break;
          450  +      }else{
          451  +        p->iFree++;
          452  +        if( pEntry->iId>=0 
          453  +            && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) 
          454  +          ){
          455  +          return 1;
          456  +        }
          457  +        if( pEntry->iBlk==iBlk ) return 0;
   455    458         }
   456         -      if( pEntry->iBlk==iBlk ) return 0;
   457    459       }
   458    460     }
   459    461   
   460    462     return p->xUsr(p->pUsrctx, iBlk, iSnapshot);
   461    463   }
   462    464   
   463    465   /*
................................................................................
   473    475   */
   474    476   int lsmWalkFreelist(
   475    477     lsm_db *pDb,                    /* Database handle (must be worker) */
   476    478     int (*x)(void *, int, i64),     /* Callback function */
   477    479     void *pCtx                      /* First argument to pass to callback */
   478    480   ){
   479    481     int rc;
   480         -  WalkFreelistCtx ctx;
   481         -  ctx.pDb = pDb;
   482         -  ctx.pFreelist = &pDb->pWorker->freelist;
   483         -  ctx.iFree = 0;
   484         -  ctx.xUsr = x;
   485         -  ctx.pUsrctx = pCtx;
          482  +  int iCtx;
   486    483   
   487         -  rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx);
   488         -  if( rc==LSM_OK ){
          484  +  WalkFreelistCtx ctx[2];
          485  +
          486  +  ctx[0].pDb = pDb;
          487  +  ctx[0].pFreelist = &pDb->pWorker->freelist;
          488  +  ctx[0].iFree = 0;
          489  +  ctx[0].xUsr = walkFreelistCb;
          490  +  ctx[0].pUsrctx = (void *)&ctx[1];
          491  +
          492  +  ctx[1].pDb = pDb;
          493  +  ctx[1].pFreelist = pDb->pFreelist;
          494  +  ctx[1].iFree = 0;
          495  +  ctx[1].xUsr = x;
          496  +  ctx[1].pUsrctx = pCtx;
          497  +
          498  +  rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx[0]);
          499  +
          500  +  for(iCtx=0; iCtx<2; iCtx++){
   489    501       int i;
   490         -    for(i=ctx.iFree; i<ctx.pFreelist->nEntry; i++){
   491         -      FreelistEntry *pEntry = &ctx.pFreelist->aEntry[i];
   492         -      if( pEntry->iId>=0 && ctx.xUsr(ctx.pUsrctx, pEntry->iBlk, pEntry->iId) ){
          502  +    WalkFreelistCtx *p = &ctx[iCtx];
          503  +    for(i=p->iFree; p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry; i++){
          504  +      FreelistEntry *pEntry = &p->pFreelist->aEntry[i];
          505  +      if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){
   493    506           return 1;
   494    507         }
   495    508       }
   496    509     }
   497    510   
   498    511     return rc;
   499    512   }
................................................................................
   555    568     if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);
   556    569   
   557    570     /* If a block was found in the free block list, use it and remove it from 
   558    571     ** the list. Otherwise, if no suitable block was found, allocate one from
   559    572     ** the end of the file.  */
   560    573     if( rc==LSM_OK ){
   561    574       if( iRet>0 ){
   562         -#ifdef LSM_LOG_BLOCKS
          575  +#ifdef LSM_LOG_FREELIST
   563    576         lsmLogMessage(pDb, 0, "reusing block %d", iRet);
   564    577   #endif
   565    578         rc = freelistAppend(pDb, iRet, -1);
   566    579       }else{
   567         -#ifdef LSM_LOG_BLOCKS
          580  +#ifdef LSM_LOG_FREELIST
   568    581         lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
   569    582   #endif
   570    583         iRet = ++(p->nBlock);
   571    584       }
   572    585     }
   573    586   
   574    587     assert( iRet>0 || rc!=LSM_OK );
................................................................................
   603    616   ** the freelist. Refreeing a block is different from freeing is, as a refreed
   604    617   ** block may be reused immediately. Whereas a freed block can not be reused 
   605    618   ** until (at least) after the next checkpoint.
   606    619   */
   607    620   int lsmBlockRefree(lsm_db *pDb, int iBlk){
   608    621     int rc = LSM_OK;                /* Return code */
   609    622     Snapshot *p = pDb->pWorker;
          623  +
          624  +#ifdef LSM_LOG_FREELIST
          625  +  lsmLogMessage(pDb, LSM_OK, "lsmBlockRefree(): Refree block %d", iBlk);
          626  +#endif
   610    627   
   611    628     if( iBlk==p->nBlock ){
   612    629       p->nBlock--;
   613    630     }else{
   614    631       rc = freelistAppend(pDb, iBlk, 0);
   615    632     }
   616    633   

Changes to src/lsm_sorted.c.

  2323   2323         Snapshot *pWorker = pCsr->pDb->pWorker;
  2324   2324         if( pWorker && pWorker->freelist.nEntry > pCsr->iFree ){
  2325   2325           int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1;
  2326   2326           u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4];
  2327   2327           lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId);
  2328   2328           *ppVal = aVal;
  2329   2329           *pnVal = 8;
  2330         -        pCsr->pDb->bUseFreelist = 1;
  2331   2330         }
  2332   2331         break;
  2333   2332       }
  2334   2333   
  2335   2334       default: {
  2336   2335         int iPtr = iVal-CURSOR_DATA_SEGMENT;
  2337   2336         if( iPtr<pCsr->nPtr ){
................................................................................
  3725   3724   
  3726   3725   /*
  3727   3726   ** The cursor passed as the first argument is being used as the input for
  3728   3727   ** a merge operation. When this function is called, *piFlags contains the
  3729   3728   ** database entry flags for the current entry. The entry about to be written
  3730   3729   ** to the output.
  3731   3730   **
         3731  +** Note that this function only has to work for cursors configured to 
         3732  +** iterate forwards (not backwards).
  3732   3733   */
  3733   3734   static void mergeRangeDeletes(MultiCursor *pCsr, int *piFlags){
  3734   3735     int f = *piFlags;
  3735   3736     int iKey = pCsr->aTree[1];
  3736   3737     int i;
  3737   3738   
         3739  +  assert( pCsr->flags & CURSOR_NEXT_OK );
  3738   3740     if( pCsr->flags & CURSOR_IGNORE_DELETE ){
  3739   3741       /* The ignore-delete flag is set when the output of the merge will form
  3740   3742       ** the oldest level in the database. In this case there is no point in
  3741   3743       ** retaining any range-delete flags.  */
  3742   3744       assert( (f & LSM_POINT_DELETE)==0 );
  3743   3745       f &= ~(LSM_START_DELETE|LSM_END_DELETE);
  3744   3746     }else{
................................................................................
  3746   3748         int btreeflags = lsmTreeCursorFlags(pCsr->apTreeCsr[1]);
  3747   3749         if( btreeflags & LSM_END_DELETE ){
  3748   3750           f |= (LSM_START_DELETE|LSM_END_DELETE);
  3749   3751         }
  3750   3752       }
  3751   3753   
  3752   3754       for(i=LSM_MAX(0, iKey+1-CURSOR_DATA_SEGMENT); i<pCsr->nPtr; i++){
  3753         -      if( pCsr->aPtr[i].eType & LSM_END_DELETE ){
         3755  +      SegmentPtr *pPtr = &pCsr->aPtr[i];
         3756  +      if( pPtr->pPg && (pPtr->eType & LSM_END_DELETE) ){
  3754   3757           f |= (LSM_START_DELETE|LSM_END_DELETE);
  3755   3758         }
  3756   3759       }
  3757   3760   
  3758   3761       if( (f & LSM_START_DELETE) && (f & LSM_END_DELETE) && (f & LSM_INSERT)==0 ){
  3759   3762         f = 0;
  3760   3763       }
................................................................................
  3774   3777   
  3775   3778     pCsr = pMW->pCsr;
  3776   3779     pSeg = &pMW->pLevel->lhs;
  3777   3780   
  3778   3781     /* Pull the next record out of the source cursor. */
  3779   3782     lsmMCursorKey(pCsr, &pKey, &nKey);
  3780   3783     eType = pCsr->eType;
         3784  +
         3785  +  if( eType & LSM_SYSTEMKEY ){
         3786  +    int i;
         3787  +    i = 1;
         3788  +  }
  3781   3789   
  3782   3790     /* Figure out if the output record may have a different pointer value
  3783   3791     ** than the previous. This is the case if the current key is identical to
  3784   3792     ** a key that appears in the lowest level run being merged. If so, set 
  3785   3793     ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, 
  3786   3794     ** indicating that the output pointer value should be a copy of the pointer 
  3787   3795     ** value written with the previous key.  */
................................................................................
  3888   3896     Level *pNext = 0;               /* The current top level */
  3889   3897     Level *pNew;                    /* The new level itself */
  3890   3898     Segment *pDel = 0;              /* Delete separators from this segment */
  3891   3899     int nWrite = 0;                 /* Number of database pages written */
  3892   3900     Freelist freelist;
  3893   3901     assert( pnOvfl );
  3894   3902   
  3895         -  pDb->pFreelist = &freelist;
  3896   3903     assert( pDb->bUseFreelist==0 );
         3904  +  pDb->pFreelist = &freelist;
         3905  +  pDb->bUseFreelist = 1;
  3897   3906     memset(&freelist, 0, sizeof(freelist));
  3898   3907   
  3899   3908     /* Allocate the new level structure to write to. */
  3900   3909     pNext = lsmDbSnapshotLevel(pDb->pWorker);
  3901   3910     pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  3902   3911     if( pNew ){
  3903   3912       pNew->pNext = pNext;