Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment: Fix a problem in free-list management.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | rework-flow-control
Files: files | file ages | folders
SHA1: 57444405e3e3a0c2c5f204e0669bfbcc9dfb889f
User & Date: dan 2012-09-26 11:57:36.528
Context
2012-09-26
14:07
Fix a broken assert() statement. check-in: 797f6c5578 user: dan tags: rework-flow-control
11:57
Fix a problem in free-list management. check-in: 57444405e3 user: dan tags: rework-flow-control
2012-09-25
19:13
Fix bug in recycling of shared memory space. check-in: 156b93d03b user: dan tags: rework-flow-control
Changes
Unified Diff Ignore Whitespace Patch
Changes to lsm-test/lsmtest_tdb3.c.
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
      if( p->pWorker && p->bDoWork==0 ){
        pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
      }
      p->bDoWork = 0;
    }
  }
  pthread_mutex_unlock(&p->worker_mutex);
  printf("# worker EXIT %d\n", p->worker_rc);
  
  return 0;
}


static void mt_stop_worker(LsmDb *pDb, int iWorker){
  LsmWorker *p = &pDb->aWorker[iWorker];







<







989
990
991
992
993
994
995

996
997
998
999
1000
1001
1002
      if( p->pWorker && p->bDoWork==0 ){
        pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
      }
      p->bDoWork = 0;
    }
  }
  pthread_mutex_unlock(&p->worker_mutex);

  
  return 0;
}


static void mt_stop_worker(LsmDb *pDb, int iWorker){
  LsmWorker *p = &pDb->aWorker[iWorker];
Changes to src/lsm_ckpt.c.
382
383
384
385
386
387
388

389
390
391
392
393
394
395
  CkptBuffer ckpt;
  int nFree;
 
  nFree = pSnap->freelist.nEntry;
  if( nOvfl>=0 ){
    nFree -=  nOvfl;
  }else{

    nOvfl = pDb->pShmhdr->aSnap2[CKPT_HDR_OVFL];
  }

  /* Initialize the output buffer */
  memset(&ckpt, 0, sizeof(CkptBuffer));
  ckpt.pEnv = pDb->pEnv;
  iOut = CKPT_HDR_SIZE;







>







382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
  CkptBuffer ckpt;
  int nFree;
 
  nFree = pSnap->freelist.nEntry;
  if( nOvfl>=0 ){
    nFree -=  nOvfl;
  }else{
    assert( 0 );
    nOvfl = pDb->pShmhdr->aSnap2[CKPT_HDR_OVFL];
  }

  /* Initialize the output buffer */
  memset(&ckpt, 0, sizeof(CkptBuffer));
  ckpt.pEnv = pDb->pEnv;
  iOut = CKPT_HDR_SIZE;
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);
  }
  iOut += 2;
  assert( iOut<=1024 );

#if 0
  lsmLogMessage(pDb, rc, 
      "ckptExportSnapshot(): id=%d freelist: %d/%d", (int)iId, nFree, nOvfl
  );
#endif

  *ppCkpt = (void *)ckpt.aCkpt;
  if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut;







|







439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);
  }
  iOut += 2;
  assert( iOut<=1024 );

#ifdef LSM_LOG_FREELIST
  lsmLogMessage(pDb, rc, 
      "ckptExportSnapshot(): id=%d freelist: %d/%d", (int)iId, nFree, nOvfl
  );
#endif

  *ppCkpt = (void *)ckpt.aCkpt;
  if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut;
1129
1130
1131
1132
1133
1134
1135

1136
1137
1138
1139
1140
1141
1142

#if 0
if( bFlush ){
  printf("pushing %p tree to %d\n", (void *)pDb, pSnap->iId+1);
  fflush(stdout);
}
#endif

  rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aSnap2, p, n);
  lsmShmBarrier(pDb);







>







1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144

#if 0
if( bFlush ){
  printf("pushing %p tree to %d\n", (void *)pDb, pSnap->iId+1);
  fflush(stdout);
}
#endif
  assert( lsmFsIntegrityCheck(pDb) );
  rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aSnap2, p, n);
  lsmShmBarrier(pDb);
Changes to src/lsm_shared.c.
85
86
87
88
89
90
91

92

93
94
95
96
97
98
99

/*
** Append an entry to the free-list.
*/
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){

  /* Assert that this is not an attempt to insert a duplicate block number */

  assertNotInFreelist(p, iBlk);


  /* Extend the space allocated for the freelist, if required */
  assert( p->nAlloc>=p->nEntry );
  if( p->nAlloc==p->nEntry ){
    int nNew; 
    FreelistEntry *aNew;








>

>







85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

/*
** Append an entry to the free-list.
*/
int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){

  /* Assert that this is not an attempt to insert a duplicate block number */
#if 0
  assertNotInFreelist(p, iBlk);
#endif

  /* Extend the space allocated for the freelist, if required */
  assert( p->nAlloc>=p->nEntry );
  if( p->nAlloc==p->nEntry ){
    int nNew; 
    FreelistEntry *aNew;

497
498
499
500
501
502
503



504
505
506
507
508
509
510
** LSM_NOMEM).
*/
int lsmBlockFree(lsm_db *pDb, int iBlk){
  /* Snapshot whose free-list receives block iBlk (taken from pDb->pWorker) */
  Snapshot *p = pDb->pWorker;

  /* NOTE(review): presumably verifies that this connection currently holds
  ** the worker role - confirm against lsmShmAssertWorker(). */
  assert( lsmShmAssertWorker(pDb) );
  /* TODO: Should assert() that lsmCheckpointOverflow() has not been called */




  /* Append iBlk to p->freelist, passing the snapshot id p->iId along with
  ** it. The return value of lsmFreelistAppend() is returned unchanged. */
  return lsmFreelistAppend(pDb->pEnv, &p->freelist, iBlk, p->iId);
}

/*
** Refree a database block. The worker snapshot must be held in order to call 
** this function.







>
>
>







499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
** LSM_NOMEM).
*/
int lsmBlockFree(lsm_db *pDb, int iBlk){
  /* Snapshot whose free-list receives block iBlk (taken from pDb->pWorker) */
  Snapshot *p = pDb->pWorker;

  /* NOTE(review): presumably verifies that this connection currently holds
  ** the worker role - confirm against lsmShmAssertWorker(). */
  assert( lsmShmAssertWorker(pDb) );
  /* TODO: Should assert() that lsmCheckpointOverflow() has not been called */
#ifdef LSM_LOG_FREELIST
  /* Optional trace of each freed block number; compiled in only when
  ** LSM_LOG_FREELIST is defined. */
  lsmLogMessage(pDb, LSM_OK, "lsmBlockFree(): Free block %d", iBlk);
#endif

  /* Append iBlk to p->freelist, passing the snapshot id p->iId along with
  ** it. The return value of lsmFreelistAppend() is returned unchanged. */
  return lsmFreelistAppend(pDb->pEnv, &p->freelist, iBlk, p->iId);
}

/*
** Refree a database block. The worker snapshot must be held in order to call 
** this function.
810
811
812
813
814
815
816


817
818
819

820



821
822
823
824
825
826
827
828
829
830
** structure have already been restored. In either case, this function 
** merely releases locks and other resources held by the write-transaction.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){
  int rc = LSM_OK;                /* Only ever set by lsmSortedAutoWork() */


  /* Call lsmLogEnd() and then lsmTreeEndTransaction(), passing bCommit
  ** through to both. */
  lsmLogEnd(pDb, bCommit);
  lsmTreeEndTransaction(pDb, bCommit);

  /* On commit, if lsmTreeSize() now exceeds pDb->nTreeLimit, call
  ** lsmTreeMakeOld() and, when pDb->bAutowork is set, lsmSortedAutoWork().
  ** In this version both calls happen after lsmTreeEndTransaction(). rc is
  ** still LSM_OK when this condition is evaluated, so the rc test is always
  ** true here. */
  if( rc==LSM_OK && bCommit && lsmTreeSize(pDb)>pDb->nTreeLimit ){

    lsmTreeMakeOld(pDb);



    if( pDb->bAutowork ){
      rc = lsmSortedAutoWork(pDb, 1);
    }
  }

  /* The WRITER lock is released unconditionally, whatever the value of rc. */
  lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
  return rc;
}


/*







>
>

<

>

>
>
>
|
|
<







815
816
817
818
819
820
821
822
823
824

825
826
827
828
829
830
831
832

833
834
835
836
837
838
839
** structure have already been restored. In either case, this function 
** merely releases locks and other resources held by the write-transaction.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){
  int rc = LSM_OK;                /* Only ever set by lsmSortedAutoWork() */
  int bAutowork = 0;              /* Copy of pDb->bAutowork, if limit exceeded */

  lsmLogEnd(pDb, bCommit);

  /* On commit, if lsmTreeSize() exceeds pDb->nTreeLimit, call
  ** lsmTreeMakeOld() here, before lsmTreeEndTransaction() is called below.
  ** pDb->bAutowork is captured so that the auto-work call can be made
  ** after the tree transaction has been ended. rc is still LSM_OK when
  ** this condition is evaluated, so the rc test is always true here. */
  if( rc==LSM_OK && bCommit && lsmTreeSize(pDb)>pDb->nTreeLimit ){
    bAutowork = pDb->bAutowork;
    lsmTreeMakeOld(pDb);
  }
  lsmTreeEndTransaction(pDb, bCommit);

  /* bAutowork is non-zero only if the block above ran and pDb->bAutowork
  ** was set. */
  if( rc==LSM_OK && bAutowork ){
    rc = lsmSortedAutoWork(pDb, 1);

  }

  /* The WRITER lock is released unconditionally, whatever the value of rc. */
  lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
  return rc;
}


/*
Changes to src/lsm_sorted.c.
3603
3604
3605
3606
3607
3608
3609
3610
3611
3612
3613
3614
3615
3616
3617
    sortedFreeLevel(pDb->pEnv, pNew);
  }

  if( rc==LSM_OK ){
    sortedInvokeWorkHook(pDb);
  }

  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  if( pnWrite ) *pnWrite = nWrite;
  pDb->pWorker->nWrite += nWrite;
#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel");
#endif
  return rc;
}







<







3603
3604
3605
3606
3607
3608
3609

3610
3611
3612
3613
3614
3615
3616
    sortedFreeLevel(pDb->pEnv, pNew);
  }

  if( rc==LSM_OK ){
    sortedInvokeWorkHook(pDb);
  }


  if( pnWrite ) *pnWrite = nWrite;
  pDb->pWorker->nWrite += nWrite;
#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel");
#endif
  return rc;
}
3632
3633
3634
3635
3636
3637
3638









3639
3640
3641
3642
3643
3644
3645

3646
3647
3648
3649
3650
3651
3652
3653
3654
3655
3656

3657
3658
3659
3660
3661
3662
3663
  Level **ppNew                   /* New, merged, level */
){
  int rc = LSM_OK;                /* Return Code */
  Level *pNew;                    /* New Level object */
  int bUseNext = 0;               /* True to link in next separators */
  Merge *pMerge;                  /* New Merge object */
  int nByte;                      /* Bytes of space allocated at pMerge */










  /* Allocate the new Level object */
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  if( pNew ){
    pNew->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, 
                                        nMerge * sizeof(Segment), &rc);
  }


  /* Populate the new Level object */
  if( rc==LSM_OK ){
    Level *pNext = 0;             /* Level following pNew */
    int i;
    Level *pTopLevel;
    Level *p = pLevel;
    Level **pp;
    pNew->nRight = nMerge;
    pNew->iAge = pLevel->iAge+1;
    for(i=0; i<nMerge; i++){

      pNext = p->pNext;
      pNew->aRhs[i] = p->lhs;
      sortedFreeLevel(pDb->pEnv, p);
      p = pNext;
    }

    /* Replace the old levels with the new. */







>
>
>
>
>
>
>
>
>







>











>







3631
3632
3633
3634
3635
3636
3637
3638
3639
3640
3641
3642
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
3653
3654
3655
3656
3657
3658
3659
3660
3661
3662
3663
3664
3665
3666
3667
3668
3669
3670
3671
3672
3673
  Level **ppNew                   /* New, merged, level */
){
  int rc = LSM_OK;                /* Return Code */
  Level *pNew;                    /* New Level object */
  int bUseNext = 0;               /* True to link in next separators */
  Merge *pMerge;                  /* New Merge object */
  int nByte;                      /* Bytes of space allocated at pMerge */

#ifdef LSM_DEBUG
  int iLevel;
  Level *pX = pLevel;
  for(iLevel=0; iLevel<nMerge; iLevel++){
    assert( pX->nRight==0 );
    pX = pX->pNext;
  }
#endif

  /* Allocate the new Level object */
  pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
  if( pNew ){
    pNew->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, 
                                        nMerge * sizeof(Segment), &rc);
  }


  /* Populate the new Level object */
  if( rc==LSM_OK ){
    Level *pNext = 0;             /* Level following pNew */
    int i;
    Level *pTopLevel;
    Level *p = pLevel;
    Level **pp;
    pNew->nRight = nMerge;
    pNew->iAge = pLevel->iAge+1;
    for(i=0; i<nMerge; i++){
      assert( p->nRight==0 );
      pNext = p->pNext;
      pNew->aRhs[i] = p->lhs;
      sortedFreeLevel(pDb->pEnv, p);
      p = pNext;
    }

    /* Replace the old levels with the new. */
3839
3840
3841
3842
3843
3844
3845

3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
          nBest = nThis;
        }
      }
      if( pLevel->nRight ){
        if( pLevel->nRight>nBest ){
          nBest = pLevel->nRight;
          pBest = pLevel;

          nThis = 0;
          pThis = 0;
        }
      }else{
        pThis = pLevel;
        nThis = 1;
      }
    }
  }
  if( nThis>nBest ){







>
|
|
<







3849
3850
3851
3852
3853
3854
3855
3856
3857
3858

3859
3860
3861
3862
3863
3864
3865
          nBest = nThis;
        }
      }
      if( pLevel->nRight ){
        if( pLevel->nRight>nBest ){
          nBest = pLevel->nRight;
          pBest = pLevel;
        }
        nThis = 0;
        pThis = 0;

      }else{
        pThis = pLevel;
        nThis = 1;
      }
    }
  }
  if( nThis>nBest ){
3872
3873
3874
3875
3876
3877
3878


3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889
3890
3891
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
  }

  return rc;
}

/*
** Return 1 if the level returned by lsmDbSnapshotLevel() for the worker
** snapshot (pDb->pWorker) exists, has iAge==0, and either has a non-zero
** nRight or sortedCountLevels() reports at least pDb->nMerge levels.
** Return 0 otherwise, including when there is no such level.
*/
static int sortedDbIsFull(lsm_db *pDb){
  Level *pTop = lsmDbSnapshotLevel(pDb->pWorker);


  if( pTop && pTop->iAge==0
   && (pTop->nRight || sortedCountLevels(pTop)>=pDb->nMerge)
  ){
    return 1;
  }
  return 0;
}

static int sortedWork(
  lsm_db *pDb,                    /* Database handle. Must be worker. */
  int nWork,                      /* Number of pages of work to do */
  int bOptimize,                  /* True to merge less than nMerge levels */
  int bFlush,                     /* Set if call is to make room for a flush */
  int *pnWrite                    /* OUT: Actual number of pages written */
){
  int rc = LSM_OK;                /* Return Code */
  int nRemaining = nWork;         /* Units of work to do before returning */
  Snapshot *pWorker = pDb->pWorker;

  assert( lsmFsIntegrityCheck(pDb) );
  assert( pWorker );

  if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;

  while( nRemaining>0 ){
    Level *pLevel = 0;

    /* Find a level to work on. */
    rc = sortedSelectLevel(pDb, bOptimize, &pLevel);







>
>



















<

<







3882
3883
3884
3885
3886
3887
3888
3889
3890
3891
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908
3909

3910

3911
3912
3913
3914
3915
3916
3917
  }

  return rc;
}

/*
** Return 1 if either of the following is true, or 0 otherwise:
**
**   * lsmDatabaseFull(pDb) returns non-zero, or
**
**   * the level returned by lsmDbSnapshotLevel() for the worker snapshot
**     (pDb->pWorker) exists, has iAge==0, and either has a non-zero nRight
**     or sortedCountLevels() reports at least pDb->nMerge levels.
*/
static int sortedDbIsFull(lsm_db *pDb){
  Level *pTop = lsmDbSnapshotLevel(pDb->pWorker);

  /* This test is made first; pTop is not examined when it succeeds. */
  if( lsmDatabaseFull(pDb) ) return 1;
  if( pTop && pTop->iAge==0
   && (pTop->nRight || sortedCountLevels(pTop)>=pDb->nMerge)
  ){
    return 1;
  }
  return 0;
}

static int sortedWork(
  lsm_db *pDb,                    /* Database handle. Must be worker. */
  int nWork,                      /* Number of pages of work to do */
  int bOptimize,                  /* True to merge less than nMerge levels */
  int bFlush,                     /* Set if call is to make room for a flush */
  int *pnWrite                    /* OUT: Actual number of pages written */
){
  int rc = LSM_OK;                /* Return Code */
  int nRemaining = nWork;         /* Units of work to do before returning */
  Snapshot *pWorker = pDb->pWorker;


  assert( pWorker );

  if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK;

  while( nRemaining>0 ){
    Level *pLevel = 0;

    /* Find a level to work on. */
    rc = sortedSelectLevel(pDb, bOptimize, &pLevel);
3917
3918
3919
3920
3921
3922
3923

3924
3925
3926
3927
3928
3929
3930
      assert( mergeworker.nWork==0 );
      while( rc==LSM_OK 
          && 0==mergeWorkerDone(&mergeworker) 
          && mergeworker.nWork<nRemaining 
      ){
        rc = mergeWorkerStep(&mergeworker);
      }

      nRemaining -= mergeworker.nWork;

      /* Check if the merge operation is completely finished. If so, the
      ** Merge object and the right-hand-side of the level can be deleted. 
      **
      ** Otherwise, gobble up (declare eligible for recycling) any pages
      ** from rhs segments for which the content has been completely merged







>







3927
3928
3929
3930
3931
3932
3933
3934
3935
3936
3937
3938
3939
3940
3941
      assert( mergeworker.nWork==0 );
      while( rc==LSM_OK 
          && 0==mergeWorkerDone(&mergeworker) 
          && mergeworker.nWork<nRemaining 
      ){
        rc = mergeWorkerStep(&mergeworker);
      }
      assert( mergeworker.nWork>0 );
      nRemaining -= mergeworker.nWork;

      /* Check if the merge operation is completely finished. If so, the
      ** Merge object and the right-hand-side of the level can be deleted. 
      **
      ** Otherwise, gobble up (declare eligible for recycling) any pages
      ** from rhs segments for which the content has been completely merged
4000
4001
4002
4003
4004
4005
4006
4007
4008
4009
4010
4011
4012
4013
4014

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif
    }
  }

  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  if( pnWrite ) *pnWrite = (nWork - nRemaining);
  pWorker->nWrite += (nWork - nRemaining);

#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining));
#endif
  return rc;







<







4011
4012
4013
4014
4015
4016
4017

4018
4019
4020
4021
4022
4023
4024

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif
    }
  }


  if( pnWrite ) *pnWrite = (nWork - nRemaining);
  pWorker->nWrite += (nWork - nRemaining);

#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining));
#endif
  return rc;
4146
4147
4148
4149
4150
4151
4152
4153
4154
4155
4156
4157
4158
4159
4160
4161
4162
4163
4164



4165
4166
4167





4168

4169
4170
4171
4172
4173
4174
4175
        rc = sortedNewToplevel(pDb, 1, &nOvfl, &nPg);
        nRem -= nPg;
        if( rc==LSM_OK && pDb->nTransOpen>0 ){
          lsmTreeDiscardOld(pDb);
        }
        bFlush = 1;
        bToplevel = 0;
        nOvfl = 0;
      }
    }
  }

  /* If nPage is still greater than zero, do some merging. */
  if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
    int nPg = 0;
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg);
    nRem -= nPg;
    if( nPg ) bToplevel = 1;



  }

  if( rc==LSM_OK && bToplevel && lsmCheckpointOverflowRequired(pDb) ){





    rc = sortedNewToplevel(pDb, 0, &nOvfl, 0);

  }

  if( rc==LSM_OK && (nRem!=nMax) ){
    rc = lsmSortedFlushDb(pDb);
    lsmFinishWork(pDb, bFlush, nOvfl, &rc);
  }else{
    int rcdummy = LSM_BUSY;







<










|
>
>
>



>
>
>
>
>
|
>







4156
4157
4158
4159
4160
4161
4162

4163
4164
4165
4166
4167
4168
4169
4170
4171
4172
4173
4174
4175
4176
4177
4178
4179
4180
4181
4182
4183
4184
4185
4186
4187
4188
4189
4190
4191
4192
4193
        rc = sortedNewToplevel(pDb, 1, &nOvfl, &nPg);
        nRem -= nPg;
        if( rc==LSM_OK && pDb->nTransOpen>0 ){
          lsmTreeDiscardOld(pDb);
        }
        bFlush = 1;
        bToplevel = 0;

      }
    }
  }

  /* If nPage is still greater than zero, do some merging. */
  if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
    int nPg = 0;
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg);
    nRem -= nPg;
    if( nPg ){
      bToplevel = 1;
      nOvfl = 0;
    }
  }

  if( rc==LSM_OK && bToplevel && lsmCheckpointOverflowRequired(pDb) ){
    while( rc==LSM_OK && sortedDbIsFull(pDb) ){
      int nPg = 0;
      rc = sortedWork(pDb, 16, 0, 1, &nPg);
    }
    if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){
      rc = sortedNewToplevel(pDb, 0, &nOvfl, 0);
    }
  }

  if( rc==LSM_OK && (nRem!=nMax) ){
    rc = lsmSortedFlushDb(pDb);
    lsmFinishWork(pDb, bFlush, nOvfl, &rc);
  }else{
    int rcdummy = LSM_BUSY;