Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a couple of problems in the code that handles free block lists.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | freelist-rework
Files: files | file ages | folders
SHA1: 5c3e17cc908e66de2bef3580ace33222cf6ea38f
User & Date: dan 2012-10-30 16:27:31.322
Context
2012-10-30
17:19
Fix another free block list related bug. check-in: f9e75777b0 user: dan tags: freelist-rework
16:27
Fix a couple of problems in the code that handles free block lists. check-in: 5c3e17cc90 user: dan tags: freelist-rework
2012-10-29
20:04
Rework the free block list storage so that it scales properly. Currently some test cases fail. check-in: ebca1063ac user: dan tags: freelist-rework
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsm_shared.c.
438
439
440
441
442
443
444

445
446
447
448
449
450
451
452
453
454
455
456

457
458
459
460
461
462
463
/* 
** Callback used by lsmWalkFreelist().
**
** pCtx points to a WalkFreelistCtx. Each call receives one (iBlk, iSnapshot)
** pair. Before that pair is forwarded to the wrapped callback p->xUsr, every
** entry of the in-memory list p->pFreelist at index p->iFree or later whose
** iBlk is less than or equal to iBlk is consumed (p->iFree is advanced past
** it) and, if its iId is non-negative, reported to p->xUsr.
**
** If an in-memory entry has the same block number as iBlk, the incoming
** pair is not forwarded and 0 is returned.
**
** Returns 1 as soon as p->xUsr returns non-zero for an in-memory entry.
** Otherwise returns 0, or the value p->xUsr returns for the incoming pair.
**
** NOTE(review): this version dereferences p->pFreelist unconditionally.
** NOTE(review): presumably aEntry[] is sorted by ascending iBlk and the
** caller delivers blocks in ascending order - confirm.
*/
static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
  Freelist *pFree = p->pFreelist;


  while( (p->iFree < pFree->nEntry) ){
    FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
    if( pEntry->iBlk>iBlk ){
      /* Entries from here on lie beyond iBlk. Leave them for a later call. */
      break;
    }else{
      /* Advance the cursor before invoking the callback, so the entry counts
      ** as consumed even if the callback asks for the walk to stop.  */
      p->iFree++;
      /* Entries with a negative iId are skipped, never reported.
      ** NOTE(review): presumably a negative iId marks a removed block.  */
      if( pEntry->iId>=0 
       && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) 
      ){
        return 1;
      }
      /* An in-memory entry for the same block replaces the incoming pair. */
      if( pEntry->iBlk==iBlk ) return 0;

    }
  }

  /* No in-memory entry matched iBlk. Forward the incoming pair unchanged. */
  return p->xUsr(p->pUsrctx, iBlk, iSnapshot);
}

/*







>
|
|
|
|
|
|
|
|
|
|
|
|
>







438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
/* 
** Callback used by lsmWalkFreelist().
**
** pCtx points to a WalkFreelistCtx. Each call receives one (iBlk, iSnapshot)
** pair. Before that pair is forwarded to the wrapped callback p->xUsr, every
** entry of the in-memory list p->pFreelist at index p->iFree or later whose
** iBlk is less than or equal to iBlk is consumed (p->iFree is advanced past
** it) and, if its iId is non-negative, reported to p->xUsr.
**
** If an in-memory entry has the same block number as iBlk, the incoming
** pair is not forwarded and 0 is returned.
**
** p->pFreelist may be NULL, in which case the incoming pair is forwarded
** directly.
**
** Returns 1 as soon as p->xUsr returns non-zero for an in-memory entry.
** Otherwise returns 0, or the value p->xUsr returns for the incoming pair.
**
** NOTE(review): presumably aEntry[] is sorted by ascending iBlk and the
** caller delivers blocks in ascending order - confirm.
*/
static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
  Freelist *pFree = p->pFreelist;

  /* A context without an in-memory list simply passes the pair through. */
  if( pFree ){
    while( (p->iFree < pFree->nEntry) ){
      FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
      if( pEntry->iBlk>iBlk ){
        /* Entries from here on lie beyond iBlk. Leave them for later. */
        break;
      }else{
        /* Advance the cursor before invoking the callback, so the entry
        ** counts as consumed even if the callback asks to stop.  */
        p->iFree++;
        /* Entries with a negative iId are skipped, never reported.
        ** NOTE(review): presumably a negative iId marks a removed block.  */
        if( pEntry->iId>=0 
            && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) 
          ){
          return 1;
        }
        /* An in-memory entry for the same block replaces the incoming pair. */
        if( pEntry->iBlk==iBlk ) return 0;
      }
    }
  }

  /* No in-memory entry matched iBlk. Forward the incoming pair unchanged. */
  return p->xUsr(p->pUsrctx, iBlk, iSnapshot);
}

/*
473
474
475
476
477
478
479


480

481
482
483






484
485
486
487
488

489

490
491
492
493
494
495
496
497
498
499
*/
int lsmWalkFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  int (*x)(void *, int, i64),     /* Callback function */
  void *pCtx                      /* First argument to pass to callback */
){
  int rc;                         /* Value returned by lsmSortedWalkFreelist() */


  WalkFreelistCtx ctx;            /* State shared with walkFreelistCb() */

  /* Merge the entries of the worker snapshot's in-memory free list
  ** (pDb->pWorker->freelist), starting from its first entry, into the
  ** sequence of pairs delivered by lsmSortedWalkFreelist().  */
  ctx.pDb = pDb;
  ctx.pFreelist = &pDb->pWorker->freelist;
  ctx.iFree = 0;






  /* The caller's callback and its context receive the merged sequence. */
  ctx.xUsr = x;
  ctx.pUsrctx = pCtx;

  /* NOTE(review): lsmSortedWalkFreelist() is not visible here. Presumably it
  ** invokes walkFreelistCb() once per free-block record stored in the
  ** database - confirm.  */
  rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx);
  if( rc==LSM_OK ){

    int i;

    /* Report the in-memory entries that walkFreelistCb() did not consume,
    ** i.e. those at index ctx.iFree and later. Entries with a negative iId
    ** are skipped. A non-zero return from the callback ends the walk and
    ** is reported to the caller as the literal value 1.  */
    for(i=ctx.iFree; i<ctx.pFreelist->nEntry; i++){
      FreelistEntry *pEntry = &ctx.pFreelist->aEntry[i];
      if( pEntry->iId>=0 && ctx.xUsr(ctx.pUsrctx, pEntry->iBlk, pEntry->iId) ){
        return 1;
      }
    }
  }

  return rc;
}







>
>
|
>
|
|
|
>
>
>
>
>
>
|
|

|
|
>

>
|
|
|







475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
*/
int lsmWalkFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  int (*x)(void *, int, i64),     /* Callback function */
  void *pCtx                      /* First argument to pass to callback */
){
  int rc;                         /* Value returned by lsmSortedWalkFreelist() */
  int iCtx;                       /* Index into ctx[] for the final flush */

  /* Two chained merge stages. ctx[0] merges in the worker snapshot's list
  ** and forwards to ctx[1]. ctx[1] merges in pDb->pFreelist and forwards
  ** to the caller's callback.  */
  WalkFreelistCtx ctx[2];

  /* Stage 0: its "user" callback is walkFreelistCb() itself, invoked with
  ** ctx[1] as context, so its output becomes the input of stage 1.  */
  ctx[0].pDb = pDb;
  ctx[0].pFreelist = &pDb->pWorker->freelist;
  ctx[0].iFree = 0;
  ctx[0].xUsr = walkFreelistCb;
  ctx[0].pUsrctx = (void *)&ctx[1];

  /* Stage 1: merges pDb->pFreelist and delivers to the caller's callback.
  ** The flush loop below tolerates a NULL pFreelist here.  */
  ctx[1].pDb = pDb;
  ctx[1].pFreelist = pDb->pFreelist;
  ctx[1].iFree = 0;
  ctx[1].xUsr = x;
  ctx[1].pUsrctx = pCtx;

  /* NOTE(review): lsmSortedWalkFreelist() is not visible here. Presumably it
  ** invokes walkFreelistCb() once per free-block record stored in the
  ** database - confirm.  */
  rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx[0]);

  /* Flush the entries each stage did not consume during the walk. Stage 0
  ** must be flushed first: its entries pass through walkFreelistCb() with
  ** ctx[1], which can advance ctx[1].iFree before stage 1 is flushed.
  ** Entries with a negative iId are skipped. A non-zero return from a
  ** callback ends the walk and is reported as the literal value 1.
  **
  ** NOTE(review): the flush runs whenever rc==LSM_OK. If a callback asked
  ** to stop during lsmSortedWalkFreelist() and that is not reflected in rc,
  ** the callbacks are still invoked here - confirm this is intended.  */
  for(iCtx=0; iCtx<2; iCtx++){
    int i;
    WalkFreelistCtx *p = &ctx[iCtx];
    for(i=p->iFree; p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry; i++){
      FreelistEntry *pEntry = &p->pFreelist->aEntry[i];
      if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){
        return 1;
      }
    }
  }

  return rc;
}
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);

  /* If a block was found in the free block list, use it and remove it from 
  ** the list. Otherwise, if no suitable block was found, allocate one from
  ** the end of the file.  */
  if( rc==LSM_OK ){
    if( iRet>0 ){
#ifdef LSM_LOG_BLOCKS
      lsmLogMessage(pDb, 0, "reusing block %d", iRet);
#endif
      rc = freelistAppend(pDb, iRet, -1);
    }else{
#ifdef LSM_LOG_BLOCKS
      lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
#endif
      iRet = ++(p->nBlock);
    }
  }

  assert( iRet>0 || rc!=LSM_OK );







|




|







568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);

  /* If a block was found in the free block list, use it and remove it from 
  ** the list. Otherwise, if no suitable block was found, allocate one from
  ** the end of the file.  */
  if( rc==LSM_OK ){
    if( iRet>0 ){
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "reusing block %d", iRet);
#endif
      rc = freelistAppend(pDb, iRet, -1);
    }else{
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
#endif
      iRet = ++(p->nBlock);
    }
  }

  assert( iRet>0 || rc!=LSM_OK );
603
604
605
606
607
608
609




610
611
612
613
614
615
616
** the freelist. Refreeing a block is different from freeing it, as a refreed
** block may be reused immediately. Whereas a freed block can not be reused 
** until (at least) after the next checkpoint.
*/
int lsmBlockRefree(lsm_db *pDb, int iBlk){
  int rc = LSM_OK;                /* Return code */
  Snapshot *p = pDb->pWorker;





  if( iBlk==p->nBlock ){
    p->nBlock--;
  }else{
    rc = freelistAppend(pDb, iBlk, 0);
  }








>
>
>
>







616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
** the freelist. Refreeing a block is different from freeing it, as a refreed
** block may be reused immediately. Whereas a freed block can not be reused 
** until (at least) after the next checkpoint.
*/
int lsmBlockRefree(lsm_db *pDb, int iBlk){
  int rc = LSM_OK;                /* Return code */
  Snapshot *p = pDb->pWorker;

#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, LSM_OK, "lsmBlockRefree(): Refree block %d", iBlk);
#endif

  if( iBlk==p->nBlock ){
    p->nBlock--;
  }else{
    rc = freelistAppend(pDb, iBlk, 0);
  }

Changes to src/lsm_sorted.c.
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
      Snapshot *pWorker = pCsr->pDb->pWorker;
      if( pWorker && pWorker->freelist.nEntry > pCsr->iFree ){
        int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1;
        u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4];
        lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId);
        *ppVal = aVal;
        *pnVal = 8;
        pCsr->pDb->bUseFreelist = 1;
      }
      break;
    }

    default: {
      int iPtr = iVal-CURSOR_DATA_SEGMENT;
      if( iPtr<pCsr->nPtr ){







<







2323
2324
2325
2326
2327
2328
2329

2330
2331
2332
2333
2334
2335
2336
      Snapshot *pWorker = pCsr->pDb->pWorker;
      if( pWorker && pWorker->freelist.nEntry > pCsr->iFree ){
        int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1;
        u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4];
        lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId);
        *ppVal = aVal;
        *pnVal = 8;

      }
      break;
    }

    default: {
      int iPtr = iVal-CURSOR_DATA_SEGMENT;
      if( iPtr<pCsr->nPtr ){
3725
3726
3727
3728
3729
3730
3731


3732
3733
3734
3735
3736
3737

3738
3739
3740
3741
3742
3743
3744
3745
3746
3747
3748
3749
3750
3751
3752

3753
3754
3755
3756
3757
3758
3759
3760

/*
** The cursor passed as the first argument is being used as the input for
** a merge operation. When this function is called, *piFlags contains the
** database entry flags for the current entry, i.e. the entry about to be
** written to the output.
**


*/
static void mergeRangeDeletes(MultiCursor *pCsr, int *piFlags){
  int f = *piFlags;
  int iKey = pCsr->aTree[1];
  int i;


  if( pCsr->flags & CURSOR_IGNORE_DELETE ){
    /* The ignore-delete flag is set when the output of the merge will form
    ** the oldest level in the database. In this case there is no point in
    ** retaining any range-delete flags.  */
    assert( (f & LSM_POINT_DELETE)==0 );
    f &= ~(LSM_START_DELETE|LSM_END_DELETE);
  }else{
    if( iKey==0 ){
      int btreeflags = lsmTreeCursorFlags(pCsr->apTreeCsr[1]);
      if( btreeflags & LSM_END_DELETE ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    for(i=LSM_MAX(0, iKey+1-CURSOR_DATA_SEGMENT); i<pCsr->nPtr; i++){

      if( pCsr->aPtr[i].eType & LSM_END_DELETE ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    if( (f & LSM_START_DELETE) && (f & LSM_END_DELETE) && (f & LSM_INSERT)==0 ){
      f = 0;
    }







>
>






>















>
|







3724
3725
3726
3727
3728
3729
3730
3731
3732
3733
3734
3735
3736
3737
3738
3739
3740
3741
3742
3743
3744
3745
3746
3747
3748
3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759
3760
3761
3762
3763

/*
** The cursor passed as the first argument is being used as the input for
** a merge operation. When this function is called, *piFlags contains the
** database entry flags for the current entry, i.e. the entry about to be
** written to the output.
**
** Note that this function only has to work for cursors configured to 
** iterate forwards (not backwards).
*/
static void mergeRangeDeletes(MultiCursor *pCsr, int *piFlags){
  int f = *piFlags;
  int iKey = pCsr->aTree[1];
  int i;

  assert( pCsr->flags & CURSOR_NEXT_OK );
  if( pCsr->flags & CURSOR_IGNORE_DELETE ){
    /* The ignore-delete flag is set when the output of the merge will form
    ** the oldest level in the database. In this case there is no point in
    ** retaining any range-delete flags.  */
    assert( (f & LSM_POINT_DELETE)==0 );
    f &= ~(LSM_START_DELETE|LSM_END_DELETE);
  }else{
    if( iKey==0 ){
      int btreeflags = lsmTreeCursorFlags(pCsr->apTreeCsr[1]);
      if( btreeflags & LSM_END_DELETE ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    for(i=LSM_MAX(0, iKey+1-CURSOR_DATA_SEGMENT); i<pCsr->nPtr; i++){
      SegmentPtr *pPtr = &pCsr->aPtr[i];
      if( pPtr->pPg && (pPtr->eType & LSM_END_DELETE) ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    if( (f & LSM_START_DELETE) && (f & LSM_END_DELETE) && (f & LSM_INSERT)==0 ){
      f = 0;
    }
3774
3775
3776
3777
3778
3779
3780





3781
3782
3783
3784
3785
3786
3787

  pCsr = pMW->pCsr;
  pSeg = &pMW->pLevel->lhs;

  /* Pull the next record out of the source cursor. */
  lsmMCursorKey(pCsr, &pKey, &nKey);
  eType = pCsr->eType;






  /* Figure out if the output record may have a different pointer value
  ** than the previous. This is the case if the current key is identical to
  ** a key that appears in the lowest level run being merged. If so, set 
  ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, 
  ** indicating that the output pointer value should be a copy of the pointer 
  ** value written with the previous key.  */







>
>
>
>
>







3777
3778
3779
3780
3781
3782
3783
3784
3785
3786
3787
3788
3789
3790
3791
3792
3793
3794
3795

  pCsr = pMW->pCsr;
  pSeg = &pMW->pLevel->lhs;

  /* Pull the next record out of the source cursor. */
  lsmMCursorKey(pCsr, &pKey, &nKey);
  eType = pCsr->eType;

  if( eType & LSM_SYSTEMKEY ){
    int i;
    i = 1;
  }

  /* Figure out if the output record may have a different pointer value
  ** than the previous. This is the case if the current key is identical to
  ** a key that appears in the lowest level run being merged. If so, set 
  ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, 
  ** indicating that the output pointer value should be a copy of the pointer 
  ** value written with the previous key.  */
3888
3889
3890
3891
3892
3893
3894

3895
3896
3897
3898
3899
3900
3901
3902
3903
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int nWrite = 0;                 /* Number of database pages written */
  Freelist freelist;
  assert( pnOvfl );


  pDb->pFreelist = &freelist;
  assert( pDb->bUseFreelist==0 );
  memset(&freelist, 0, sizeof(freelist));

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  if( pNew ){
    pNew->pNext = pNext;







>

|







3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909
3910
3911
3912
  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int nWrite = 0;                 /* Number of database pages written */
  Freelist freelist;
  assert( pnOvfl );

  assert( pDb->bUseFreelist==0 );
  pDb->pFreelist = &freelist;
  pDb->bUseFreelist = 1;
  memset(&freelist, 0, sizeof(freelist));

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  if( pNew ){
    pNew->pNext = pNext;