SQLite4
Check-in [5c3e17cc90]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a couple of problems in the code that handles free block lists.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | freelist-rework
Files: files | file ages | folders
SHA1: 5c3e17cc908e66de2bef3580ace33222cf6ea38f
User & Date: dan 2012-10-30 16:27:31
Context
2012-10-30
17:19
Fix another free block list related bug. check-in: f9e75777b0 user: dan tags: freelist-rework
16:27
Fix a couple of problems in the code that handles free block lists. check-in: 5c3e17cc90 user: dan tags: freelist-rework
2012-10-29
20:04
Rework the free block list storage so that it scales properly. Currently some test cases fail. check-in: ebca1063ac user: dan tags: freelist-rework
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/lsm_shared.c.

438
439
440
441
442
443
444

445
446
447
448
449
450
451
452
453
454
455
456

457
458
459
460
461
462
463
...
473
474
475
476
477
478
479


480

481
482
483






484
485
486
487
488


489
490


491
492
493
494
495
496
497
498
499
...
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
...
603
604
605
606
607
608
609




610
611
612
613
614
615
616
/* 
** Callback used by lsmWalkFreelist().
**
** pCtx points to a WalkFreelistCtx. Each call supplies one free-block entry
** (iBlk, iSnapshot). Before that entry is forwarded to the user callback
** p->xUsr, every not-yet-visited entry of the in-memory array
** p->pFreelist->aEntry[] whose block number is less than or equal to iBlk 
** is visited first, advancing the cursor p->iFree as it goes.
**
** NOTE(review): this only produces a correctly merged sequence if aEntry[]
** is sorted by iBlk and successive calls arrive in ascending iBlk order -
** neither is visible from here, confirm against the callers.
**
** Return values:
**
**   * 1 if p->xUsr returned non-zero for an in-memory entry.
**   * 0 if an in-memory entry has the same block number as iBlk. In this
**     case the (iBlk, iSnapshot) pair passed in is not forwarded at all.
**   * Otherwise, whatever p->xUsr returns for (iBlk, iSnapshot).
*/
static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
  Freelist *pFree = p->pFreelist;


  while( (p->iFree < pFree->nEntry) ){
    FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
    if( pEntry->iBlk>iBlk ){
      /* Next in-memory entry lies beyond iBlk. Leave it for a later call. */
      break;
    }else{
      p->iFree++;
      /* Entries with a negative iId are consumed but never reported to the
      ** user callback. Presumably they mark blocks removed from the list
      ** (see the freelistAppend(.., -1) call made when a block is reused) -
      ** TODO confirm.  */
      if( pEntry->iId>=0 
       && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) 
      ){
        return 1;
      }
      /* The in-memory entry supersedes the incoming entry for the same
      ** block, so drop the incoming (iBlk, iSnapshot) pair.  */
      if( pEntry->iBlk==iBlk ) return 0;

    }
  }

  return p->xUsr(p->pUsrctx, iBlk, iSnapshot);
}

/*
................................................................................
*/
int lsmWalkFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  int (*x)(void *, int, i64),     /* Callback function */
  void *pCtx                      /* First argument to pass to callback */
){
  int rc;


  WalkFreelistCtx ctx;

  /* Set up a context that merges the worker snapshot's in-memory freelist
  ** (starting from its first entry) into the entries reported by
  ** lsmSortedWalkFreelist(), forwarding the result to user callback x.  */
  ctx.pDb = pDb;
  ctx.pFreelist = &pDb->pWorker->freelist;
  ctx.iFree = 0;






  ctx.xUsr = x;
  ctx.pUsrctx = pCtx;

  rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx);
  if( rc==LSM_OK ){


    /* walkFreelistCb() only consumes in-memory entries up to the last block
    ** number it was called with. Report whatever remains from ctx.iFree
    ** onwards, skipping entries with a negative iId. If the user callback
    ** returns non-zero, stop and return 1.  */
    int i;
    for(i=ctx.iFree; i<ctx.pFreelist->nEntry; i++){


      FreelistEntry *pEntry = &ctx.pFreelist->aEntry[i];
      if( pEntry->iId>=0 && ctx.xUsr(ctx.pUsrctx, pEntry->iBlk, pEntry->iId) ){
        return 1;
      }
    }
  }

  return rc;
}
................................................................................
  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);

  /* If a block was found in the free block list, use it and remove it from 
  ** the list. Otherwise, if no suitable block was found, allocate one from
  ** the end of the file.  */
  if( rc==LSM_OK ){
    if( iRet>0 ){
#ifdef LSM_LOG_BLOCKS
      lsmLogMessage(pDb, 0, "reusing block %d", iRet);
#endif
      rc = freelistAppend(pDb, iRet, -1);
    }else{
#ifdef LSM_LOG_BLOCKS
      lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
#endif
      iRet = ++(p->nBlock);
    }
  }

  assert( iRet>0 || rc!=LSM_OK );
................................................................................
** the freelist. Refreeing a block is different from freeing it, as a refreed
** block may be reused immediately, whereas a freed block cannot be reused 
** until (at least) after the next checkpoint.
*/
int lsmBlockRefree(lsm_db *pDb, int iBlk){
  int rc = LSM_OK;                /* Return code */
  Snapshot *p = pDb->pWorker;





  if( iBlk==p->nBlock ){
    p->nBlock--;
  }else{
    rc = freelistAppend(pDb, iBlk, 0);
  }








>
|
|
|
|
|
|
|
|
|
|
|
|
>







 







>
>
|
>
|
|
|
>
>
>
>
>
>
|
|

|
<
>
>

<
>
>
|
|







 







|




|







 







>
>
>
>







438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
...
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498

499
500
501

502
503
504
505
506
507
508
509
510
511
512
...
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
...
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
/* 
** Callback used by lsmWalkFreelist().
**
** pCtx points to a WalkFreelistCtx. Each call supplies one free-block entry
** (iBlk, iSnapshot). If the context has an in-memory freelist attached
** (p->pFreelist!=0), then before the entry is forwarded to p->xUsr, every
** not-yet-visited entry of p->pFreelist->aEntry[] whose block number is
** less than or equal to iBlk is visited first, advancing the cursor
** p->iFree as it goes.
**
** NOTE(review): this only produces a correctly merged sequence if aEntry[]
** is sorted by iBlk and successive calls arrive in ascending iBlk order -
** neither is visible from here, confirm against the callers.
**
** Return values:
**
**   * 1 if p->xUsr returned non-zero for an in-memory entry.
**   * 0 if an in-memory entry has the same block number as iBlk. In this
**     case the (iBlk, iSnapshot) pair passed in is not forwarded at all.
**   * Otherwise, whatever p->xUsr returns for (iBlk, iSnapshot).
*/
static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
  Freelist *pFree = p->pFreelist;

  /* A context with no freelist simply passes every entry through. */
  if( pFree ){
    while( (p->iFree < pFree->nEntry) ){
      FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
      if( pEntry->iBlk>iBlk ){
        /* Next in-memory entry lies beyond iBlk. Leave it for a later call. */
        break;
      }else{
        p->iFree++;
        /* Entries with a negative iId are consumed but never reported to
        ** p->xUsr. Presumably they mark blocks removed from the list (see
        ** the freelistAppend(.., -1) call made when a block is reused) -
        ** TODO confirm.  */
        if( pEntry->iId>=0 
            && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) 
          ){
          return 1;
        }
        /* The in-memory entry supersedes the incoming entry for the same
        ** block, so drop the incoming (iBlk, iSnapshot) pair.  */
        if( pEntry->iBlk==iBlk ) return 0;
      }
    }
  }

  return p->xUsr(p->pUsrctx, iBlk, iSnapshot);
}

/*
................................................................................
*/
int lsmWalkFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  int (*x)(void *, int, i64),     /* Callback function */
  void *pCtx                      /* First argument to pass to callback */
){
  int rc;
  int iCtx;

  /* Two merge contexts are chained together:
  **
  **   ctx[0] merges pDb->pWorker->freelist into the entries reported by
  **          lsmSortedWalkFreelist() and forwards the result to
  **          walkFreelistCb() running on ctx[1].
  **   ctx[1] merges pDb->pFreelist (which may be NULL) into that stream and
  **          forwards the result to the user callback x.
  */
  WalkFreelistCtx ctx[2];

  ctx[0].pDb = pDb;
  ctx[0].pFreelist = &pDb->pWorker->freelist;
  ctx[0].iFree = 0;
  ctx[0].xUsr = walkFreelistCb;
  ctx[0].pUsrctx = (void *)&ctx[1];

  ctx[1].pDb = pDb;
  ctx[1].pFreelist = pDb->pFreelist;
  ctx[1].iFree = 0;
  ctx[1].xUsr = x;
  ctx[1].pUsrctx = pCtx;

  rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx[0]);


  /* walkFreelistCb() only consumes in-memory entries up to the last block
  ** number it was called with. For each context in turn, report whatever
  ** remains from its iFree cursor onwards, skipping entries with a negative
  ** iId. Leftovers from ctx[0] pass through ctx[1] (its xUsr is
  ** walkFreelistCb), so ctx[1].iFree may advance before the second
  ** iteration reads it. Nothing is flushed unless rc==LSM_OK or if the
  ** context has no freelist. If a callback returns non-zero, stop and
  ** return 1.  */
  for(iCtx=0; iCtx<2; iCtx++){
    int i;

    WalkFreelistCtx *p = &ctx[iCtx];
    for(i=p->iFree; p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry; i++){
      FreelistEntry *pEntry = &p->pFreelist->aEntry[i];
      if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){
        return 1;
      }
    }
  }

  return rc;
}
................................................................................
  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);

  /* If a block was found in the free block list, use it and remove it from 
  ** the list. Otherwise, if no suitable block was found, allocate one from
  ** the end of the file.  */
  if( rc==LSM_OK ){
    if( iRet>0 ){
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "reusing block %d", iRet);
#endif
      rc = freelistAppend(pDb, iRet, -1);
    }else{
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
#endif
      iRet = ++(p->nBlock);
    }
  }

  assert( iRet>0 || rc!=LSM_OK );
................................................................................
** the freelist. Refreeing a block is different from freeing it, as a refreed
** block may be reused immediately, whereas a freed block cannot be reused 
** until (at least) after the next checkpoint.
*/
int lsmBlockRefree(lsm_db *pDb, int iBlk){
  int rc = LSM_OK;                /* Return code */
  Snapshot *p = pDb->pWorker;

#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, LSM_OK, "lsmBlockRefree(): Refree block %d", iBlk);
#endif

  if( iBlk==p->nBlock ){
    p->nBlock--;
  }else{
    rc = freelistAppend(pDb, iBlk, 0);
  }

Changes to src/lsm_sorted.c.

2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
....
3725
3726
3727
3728
3729
3730
3731


3732
3733
3734
3735
3736
3737

3738
3739
3740
3741
3742
3743
3744
....
3746
3747
3748
3749
3750
3751
3752

3753
3754
3755
3756
3757
3758
3759
3760
....
3774
3775
3776
3777
3778
3779
3780





3781
3782
3783
3784
3785
3786
3787
....
3888
3889
3890
3891
3892
3893
3894

3895
3896
3897
3898
3899
3900
3901
3902
3903
      Snapshot *pWorker = pCsr->pDb->pWorker;
      if( pWorker && pWorker->freelist.nEntry > pCsr->iFree ){
        int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1;
        u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4];
        lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId);
        *ppVal = aVal;
        *pnVal = 8;
        pCsr->pDb->bUseFreelist = 1;
      }
      break;
    }

    default: {
      int iPtr = iVal-CURSOR_DATA_SEGMENT;
      if( iPtr<pCsr->nPtr ){
................................................................................

/*
** The cursor passed as the first argument is being used as the input for
** a merge operation. When this function is called, *piFlags contains the
** database entry flags for the current entry (the entry about to be written
** to the output).
**


*/
static void mergeRangeDeletes(MultiCursor *pCsr, int *piFlags){
  int f = *piFlags;
  int iKey = pCsr->aTree[1];
  int i;


  if( pCsr->flags & CURSOR_IGNORE_DELETE ){
    /* The ignore-delete flag is set when the output of the merge will form
    ** the oldest level in the database. In this case there is no point in
    ** retaining any range-delete flags.  */
    assert( (f & LSM_POINT_DELETE)==0 );
    f &= ~(LSM_START_DELETE|LSM_END_DELETE);
  }else{
................................................................................
      int btreeflags = lsmTreeCursorFlags(pCsr->apTreeCsr[1]);
      if( btreeflags & LSM_END_DELETE ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    for(i=LSM_MAX(0, iKey+1-CURSOR_DATA_SEGMENT); i<pCsr->nPtr; i++){

      if( pCsr->aPtr[i].eType & LSM_END_DELETE ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    if( (f & LSM_START_DELETE) && (f & LSM_END_DELETE) && (f & LSM_INSERT)==0 ){
      f = 0;
    }
................................................................................

  pCsr = pMW->pCsr;
  pSeg = &pMW->pLevel->lhs;

  /* Pull the next record out of the source cursor. */
  lsmMCursorKey(pCsr, &pKey, &nKey);
  eType = pCsr->eType;






  /* Figure out if the output record may have a different pointer value
  ** than the previous. This is the case if the current key is identical to
  ** a key that appears in the lowest level run being merged. If so, set 
  ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, 
  ** indicating that the output pointer value should be a copy of the pointer 
  ** value written with the previous key.  */
................................................................................
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int nWrite = 0;                 /* Number of database pages written */
  Freelist freelist;
  assert( pnOvfl );


  pDb->pFreelist = &freelist;
  assert( pDb->bUseFreelist==0 );
  memset(&freelist, 0, sizeof(freelist));

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  if( pNew ){
    pNew->pNext = pNext;







<







 







>
>






>







 







>
|







 







>
>
>
>
>







 







>

|







2323
2324
2325
2326
2327
2328
2329

2330
2331
2332
2333
2334
2335
2336
....
3724
3725
3726
3727
3728
3729
3730
3731
3732
3733
3734
3735
3736
3737
3738
3739
3740
3741
3742
3743
3744
3745
3746
....
3748
3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
3763
....
3777
3778
3779
3780
3781
3782
3783
3784
3785
3786
3787
3788
3789
3790
3791
3792
3793
3794
3795
....
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910
3911
3912
      Snapshot *pWorker = pCsr->pDb->pWorker;
      if( pWorker && pWorker->freelist.nEntry > pCsr->iFree ){
        int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1;
        u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4];
        lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId);
        *ppVal = aVal;
        *pnVal = 8;

      }
      break;
    }

    default: {
      int iPtr = iVal-CURSOR_DATA_SEGMENT;
      if( iPtr<pCsr->nPtr ){
................................................................................

/*
** The cursor passed as the first argument is being used as the input for
** a merge operation. When this function is called, *piFlags contains the
** database entry flags for the current entry (the entry about to be written
** to the output).
**
** Note that this function only has to work for cursors configured to 
** iterate forwards (not backwards).
*/
static void mergeRangeDeletes(MultiCursor *pCsr, int *piFlags){
  int f = *piFlags;
  int iKey = pCsr->aTree[1];
  int i;

  assert( pCsr->flags & CURSOR_NEXT_OK );
  if( pCsr->flags & CURSOR_IGNORE_DELETE ){
    /* The ignore-delete flag is set when the output of the merge will form
    ** the oldest level in the database. In this case there is no point in
    ** retaining any range-delete flags.  */
    assert( (f & LSM_POINT_DELETE)==0 );
    f &= ~(LSM_START_DELETE|LSM_END_DELETE);
  }else{
................................................................................
      int btreeflags = lsmTreeCursorFlags(pCsr->apTreeCsr[1]);
      if( btreeflags & LSM_END_DELETE ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    for(i=LSM_MAX(0, iKey+1-CURSOR_DATA_SEGMENT); i<pCsr->nPtr; i++){
      SegmentPtr *pPtr = &pCsr->aPtr[i];
      if( pPtr->pPg && (pPtr->eType & LSM_END_DELETE) ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    if( (f & LSM_START_DELETE) && (f & LSM_END_DELETE) && (f & LSM_INSERT)==0 ){
      f = 0;
    }
................................................................................

  pCsr = pMW->pCsr;
  pSeg = &pMW->pLevel->lhs;

  /* Pull the next record out of the source cursor. */
  lsmMCursorKey(pCsr, &pKey, &nKey);
  eType = pCsr->eType;

  if( eType & LSM_SYSTEMKEY ){
    int i;
    i = 1;
  }

  /* Figure out if the output record may have a different pointer value
  ** than the previous. This is the case if the current key is identical to
  ** a key that appears in the lowest level run being merged. If so, set 
  ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, 
  ** indicating that the output pointer value should be a copy of the pointer 
  ** value written with the previous key.  */
................................................................................
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int nWrite = 0;                 /* Number of database pages written */
  Freelist freelist;
  assert( pnOvfl );

  assert( pDb->bUseFreelist==0 );
  pDb->pFreelist = &freelist;
  pDb->bUseFreelist = 1;
  memset(&freelist, 0, sizeof(freelist));

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  if( pNew ){
    pNew->pNext = pNext;