Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a couple of problems in the code that handles free block lists.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | freelist-rework
Files: files | file ages | folders
SHA1: 5c3e17cc908e66de2bef3580ace33222cf6ea38f
User & Date: dan 2012-10-30 16:27:31.322
Context
2012-10-30
17:19
Fix another free block list related bug. check-in: f9e75777b0 user: dan tags: freelist-rework
16:27
Fix a couple of problems in the code that handles free block lists. check-in: 5c3e17cc90 user: dan tags: freelist-rework
2012-10-29
20:04
Rework the free block list storage so that it scales properly. Currently some test cases fail. check-in: ebca1063ac user: dan tags: freelist-rework
Changes
Side-by-Side Diff Show Whitespace Changes Patch
Changes to src/lsm_shared.c.
438
439
440
441
442
443
444

445
446
447
448
449
450
451
452
453
454
455
456
457
458

459
460
461
462
463
464
465
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467







+














+







/* 
** Callback used by lsmWalkFreelist().
*/
static int walkFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){
  WalkFreelistCtx *p = (WalkFreelistCtx *)pCtx;
  Freelist *pFree = p->pFreelist;

  if( pFree ){
  while( (p->iFree < pFree->nEntry) ){
    FreelistEntry *pEntry = &pFree->aEntry[p->iFree];
    if( pEntry->iBlk>iBlk ){
      break;
    }else{
      p->iFree++;
      if( pEntry->iId>=0 
       && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) 
      ){
        return 1;
      }
      if( pEntry->iBlk==iBlk ) return 0;
    }
  }
  }

  return p->xUsr(p->pUsrctx, iBlk, iSnapshot);
}

/*
** The database handle passed as the first argument must be the worker
** connection. This function iterates through the contents of the current
473
474
475
476
477
478
479


480
481
482
483
484
485













486
487
488



489

490
491
492



493
494
495
496
497
498
499
475
476
477
478
479
480
481
482
483






484
485
486
487
488
489
490
491
492
493
494
495
496
497


498
499
500
501
502



503
504
505
506
507
508
509
510
511
512







+
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+

-
-
+
+
+

+
-
-
-
+
+
+







*/
int lsmWalkFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  int (*x)(void *, int, i64),     /* Callback function */
  void *pCtx                      /* First argument to pass to callback */
){
  int rc;
  int iCtx;

  WalkFreelistCtx ctx;
  ctx.pDb = pDb;
  ctx.pFreelist = &pDb->pWorker->freelist;
  ctx.iFree = 0;
  ctx.xUsr = x;
  ctx.pUsrctx = pCtx;
  WalkFreelistCtx ctx[2];

  ctx[0].pDb = pDb;
  ctx[0].pFreelist = &pDb->pWorker->freelist;
  ctx[0].iFree = 0;
  ctx[0].xUsr = walkFreelistCb;
  ctx[0].pUsrctx = (void *)&ctx[1];

  ctx[1].pDb = pDb;
  ctx[1].pFreelist = pDb->pFreelist;
  ctx[1].iFree = 0;
  ctx[1].xUsr = x;
  ctx[1].pUsrctx = pCtx;

  rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx);
  if( rc==LSM_OK ){
  rc = lsmSortedWalkFreelist(pDb, walkFreelistCb, (void *)&ctx[0]);

  for(iCtx=0; iCtx<2; iCtx++){
    int i;
    WalkFreelistCtx *p = &ctx[iCtx];
    for(i=ctx.iFree; i<ctx.pFreelist->nEntry; i++){
      FreelistEntry *pEntry = &ctx.pFreelist->aEntry[i];
      if( pEntry->iId>=0 && ctx.xUsr(ctx.pUsrctx, pEntry->iBlk, pEntry->iId) ){
    for(i=p->iFree; p->pFreelist && rc==LSM_OK && i<p->pFreelist->nEntry; i++){
      FreelistEntry *pEntry = &p->pFreelist->aEntry[i];
      if( pEntry->iId>=0 && p->xUsr(p->pUsrctx, pEntry->iBlk, pEntry->iId) ){
        return 1;
      }
    }
  }

  return rc;
}
555
556
557
558
559
560
561
562

563
564
565
566
567

568
569
570
571
572
573
574
568
569
570
571
572
573
574

575
576
577
578
579

580
581
582
583
584
585
586
587







-
+




-
+







  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, &iRet);

  /* If a block was found in the free block list, use it and remove it from 
  ** the list. Otherwise, if no suitable block was found, allocate one from
  ** the end of the file.  */
  if( rc==LSM_OK ){
    if( iRet>0 ){
#ifdef LSM_LOG_BLOCKS
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "reusing block %d", iRet);
#endif
      rc = freelistAppend(pDb, iRet, -1);
    }else{
#ifdef LSM_LOG_BLOCKS
#ifdef LSM_LOG_FREELIST
      lsmLogMessage(pDb, 0, "extending file to %d blocks", iRet);
#endif
      iRet = ++(p->nBlock);
    }
  }

  assert( iRet>0 || rc!=LSM_OK );
603
604
605
606
607
608
609




610
611
612
613
614
615
616
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633







+
+
+
+







** the freelist. Refreeing a block is different from freeing is, as a refreed
** block may be reused immediately. Whereas a freed block can not be reused 
** until (at least) after the next checkpoint.
*/
int lsmBlockRefree(lsm_db *pDb, int iBlk){
  int rc = LSM_OK;                /* Return code */
  Snapshot *p = pDb->pWorker;

#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, LSM_OK, "lsmBlockRefree(): Refree block %d", iBlk);
#endif

  if( iBlk==p->nBlock ){
    p->nBlock--;
  }else{
    rc = freelistAppend(pDb, iBlk, 0);
  }

Changes to src/lsm_sorted.c.
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2323
2324
2325
2326
2327
2328
2329

2330
2331
2332
2333
2334
2335
2336







-







      Snapshot *pWorker = pCsr->pDb->pWorker;
      if( pWorker && pWorker->freelist.nEntry > pCsr->iFree ){
        int iEntry = pWorker->freelist.nEntry - pCsr->iFree - 1;
        u8 *aVal = &((u8 *)(pCsr->pSystemVal))[4];
        lsmPutU64(aVal, pWorker->freelist.aEntry[iEntry].iId);
        *ppVal = aVal;
        *pnVal = 8;
        pCsr->pDb->bUseFreelist = 1;
      }
      break;
    }

    default: {
      int iPtr = iVal-CURSOR_DATA_SEGMENT;
      if( iPtr<pCsr->nPtr ){
3725
3726
3727
3728
3729
3730
3731


3732
3733
3734
3735
3736
3737

3738
3739
3740
3741
3742
3743
3744
3745
3746
3747
3748
3749
3750
3751
3752

3753

3754
3755
3756
3757
3758
3759
3760
3724
3725
3726
3727
3728
3729
3730
3731
3732
3733
3734
3735
3736
3737
3738
3739
3740
3741
3742
3743
3744
3745
3746
3747
3748
3749
3750
3751
3752
3753
3754
3755

3756
3757
3758
3759
3760
3761
3762
3763







+
+






+















+
-
+








/*
** The cursor passed as the first argument is being used as the input for
** a merge operation. When this function is called, *piFlags contains the
** database entry flags for the current entry. The entry about to be written
** to the output.
**
** Note that this function only has to work for cursors configured to 
** iterate forwards (not backwards).
*/
static void mergeRangeDeletes(MultiCursor *pCsr, int *piFlags){
  int f = *piFlags;
  int iKey = pCsr->aTree[1];
  int i;

  assert( pCsr->flags & CURSOR_NEXT_OK );
  if( pCsr->flags & CURSOR_IGNORE_DELETE ){
    /* The ignore-delete flag is set when the output of the merge will form
    ** the oldest level in the database. In this case there is no point in
    ** retaining any range-delete flags.  */
    assert( (f & LSM_POINT_DELETE)==0 );
    f &= ~(LSM_START_DELETE|LSM_END_DELETE);
  }else{
    if( iKey==0 ){
      int btreeflags = lsmTreeCursorFlags(pCsr->apTreeCsr[1]);
      if( btreeflags & LSM_END_DELETE ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    for(i=LSM_MAX(0, iKey+1-CURSOR_DATA_SEGMENT); i<pCsr->nPtr; i++){
      SegmentPtr *pPtr = &pCsr->aPtr[i];
      if( pCsr->aPtr[i].eType & LSM_END_DELETE ){
      if( pPtr->pPg && (pPtr->eType & LSM_END_DELETE) ){
        f |= (LSM_START_DELETE|LSM_END_DELETE);
      }
    }

    if( (f & LSM_START_DELETE) && (f & LSM_END_DELETE) && (f & LSM_INSERT)==0 ){
      f = 0;
    }
3774
3775
3776
3777
3778
3779
3780





3781
3782
3783
3784
3785
3786
3787
3777
3778
3779
3780
3781
3782
3783
3784
3785
3786
3787
3788
3789
3790
3791
3792
3793
3794
3795







+
+
+
+
+








  pCsr = pMW->pCsr;
  pSeg = &pMW->pLevel->lhs;

  /* Pull the next record out of the source cursor. */
  lsmMCursorKey(pCsr, &pKey, &nKey);
  eType = pCsr->eType;

  if( eType & LSM_SYSTEMKEY ){
    int i;
    i = 1;
  }

  /* Figure out if the output record may have a different pointer value
  ** than the previous. This is the case if the current key is identical to
  ** a key that appears in the lowest level run being merged. If so, set 
  ** iPtr to the absolute pointer value. If not, leave iPtr set to zero, 
  ** indicating that the output pointer value should be a copy of the pointer 
  ** value written with the previous key.  */
3888
3889
3890
3891
3892
3893
3894

3895
3896

3897
3898
3899
3900
3901
3902
3903
3896
3897
3898
3899
3900
3901
3902
3903
3904

3905
3906
3907
3908
3909
3910
3911
3912







+

-
+







  Level *pNext = 0;               /* The current top level */
  Level *pNew;                    /* The new level itself */
  Segment *pDel = 0;              /* Delete separators from this segment */
  int nWrite = 0;                 /* Number of database pages written */
  Freelist freelist;
  assert( pnOvfl );

  assert( pDb->bUseFreelist==0 );
  pDb->pFreelist = &freelist;
  assert( pDb->bUseFreelist==0 );
  pDb->bUseFreelist = 1;
  memset(&freelist, 0, sizeof(freelist));

  /* Allocate the new level structure to write to. */
  pNext = lsmDbSnapshotLevel(pDb->pWorker);
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  if( pNew ){
    pNew->pNext = pNext;