Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:If a free-list-only segment is generated while a merge of the top-level segment is underway, add the new segment to the merge inputs immediately. Also, if auto-checkpoints are enabled, schedule a checkpoint after each block is moved within an lsm_work(nmerge=1) call.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | block-redirects
Files: files | file ages | folders
SHA1: 89b4286682648710710d9c150bb127c16eb78501
User & Date: dan 2013-01-21 16:53:00.951
Context
2013-01-21
19:50
Add tests for block-redirects to lsmtest. check-in: eec16b0f2f user: dan tags: block-redirects
16:53
If a free-list-only segment is generated while a merge of the top-level segment is underway, add the new segment to the merge inputs immediately. Also, if auto-checkpoints are enabled, schedule a checkpoint after each block is moved within an lsm_work(nmerge=1) call. check-in: 89b4286682 user: dan tags: block-redirects
14:23
Fix LSM_INFO_PAGE_DUMP so that it works with redirected blocks. check-in: deb0ccacd4 user: dan tags: block-redirects
Changes
Unified Diff Show Whitespace Changes Patch
Changes to src/lsm_sorted.c.
4775
4776
4777
4778
4779
4780
4781
4782





























































4783
4784
4785
4786
4787
4788
4789
4790
4791
4792
4793
4794
4795
4796
4797
4798
4799

4800
4801
4802
4803
4804
4805
4806
4807
4808
4809
4810

4811
4812
4813
4814
4815
4816
4817
      bRet = 0;
    }
    *pRc = rc;
  }
  assert( *pRc==LSM_OK || bRet==0 );
  return bRet;
}






























































int lsmSaveWorker(lsm_db *pDb, int bFlush){
  Snapshot *p = pDb->pWorker;
  if( p->freelist.nEntry>pDb->nMaxFreelist ){
    int rc = sortedNewToplevel(pDb, TREE_NONE, 0);
    if( rc!=LSM_OK ) return rc;
  }
  return lsmCheckpointSaveWorker(pDb, bFlush);
}

static int doLsmSingleWork(
  lsm_db *pDb, 
  int bShutdown,
  int nMerge,                     /* Minimum segments to merge together */
  int nPage,                      /* Number of pages to write to disk */
  int *pnWrite,                   /* OUT: Pages actually written to disk */
  int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
){

  int rc = LSM_OK;                /* Return code */
  int bDirty = 0;
  int nMax = nPage;               /* Maximum pages to write to disk */
  int nRem = nPage;
  int bCkpt = 0;

  /* Open the worker 'transaction'. It will be closed before this function
  ** returns.  */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);
  if( rc!=LSM_OK ) return rc;


  /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
  ** that this call stops writing when the auto-checkpoint is due. The
  ** caller will do the checkpoint, then possibly call this function again. */
  if( bShutdown==0 && pDb->nAutockpt ){
    u32 nSync;
    u32 nUnsync;








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>



|













>











>







4775
4776
4777
4778
4779
4780
4781
4782
4783
4784
4785
4786
4787
4788
4789
4790
4791
4792
4793
4794
4795
4796
4797
4798
4799
4800
4801
4802
4803
4804
4805
4806
4807
4808
4809
4810
4811
4812
4813
4814
4815
4816
4817
4818
4819
4820
4821
4822
4823
4824
4825
4826
4827
4828
4829
4830
4831
4832
4833
4834
4835
4836
4837
4838
4839
4840
4841
4842
4843
4844
4845
4846
4847
4848
4849
4850
4851
4852
4853
4854
4855
4856
4857
4858
4859
4860
4861
4862
4863
4864
4865
4866
4867
4868
4869
4870
4871
4872
4873
4874
4875
4876
4877
4878
4879
4880
      bRet = 0;
    }
    *pRc = rc;
  }
  assert( *pRc==LSM_OK || bRet==0 );
  return bRet;
}

/*
** Create a new free-list only top-level segment. Return LSM_OK if successful
** or an LSM error code if some error occurs.
*/
static int sortedNewFreelistOnly(lsm_db *pDb){
  int rc;
  Level *pLvl;

  pLvl = pDb->pWorker->pLevel;
  if( pLvl->pNext!=0 || pLvl->nRight==0 ){
    pLvl = 0;
  }

  rc = sortedNewToplevel(pDb, TREE_NONE, 0);
  if( rc==LSM_OK && pLvl ){
    Level *pNew = pDb->pWorker->pLevel;
    assert( pNew->pNext==pLvl );

    if( pLvl->pSplitKey==0 ){
      sortedSplitkey(pDb, pLvl, &rc);
    }
    if( rc==LSM_OK && pLvl->iSplitTopic==0 ){
      /* Add the new top-level to the rhs of pLvl. */
      Merge *pMerge;

      Segment *aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, 
          sizeof(Segment) * (pLvl->nRight + 1), &rc
      );
      if( rc==LSM_OK ){
        memcpy(&aRhs[1], pLvl->aRhs, sizeof(Segment) * pLvl->nRight);
        aRhs[0] = pNew->lhs;
        lsmFree(pDb->pEnv, pLvl->aRhs);
        pLvl->aRhs = aRhs;
        pLvl->nRight++;
      }

      /* Also add an entry to the Merge object */
      pMerge = (Merge *)lsmMallocZeroRc(pDb->pEnv, 
          sizeof(Merge) + sizeof(MergeInput)*(pLvl->pMerge->nInput+1), &rc
      );
      if( rc==LSM_OK ){
        memcpy(pMerge, pLvl->pMerge, sizeof(Merge));
        pMerge->aInput = (MergeInput *)&pMerge[1];
        memcpy(&pMerge->aInput[1], pLvl->pMerge->aInput, 
            sizeof(MergeInput)*(pLvl->pMerge->nInput+1)
        );
        pMerge->aInput[0].iPg = aRhs[0].iFirst;
        pMerge->aInput[0].iCell = 0;
        pMerge->nInput++;
        lsmFree(pDb->pEnv, pLvl->pMerge);
        pLvl->pMerge = pMerge;

        sortedFreeLevel(pDb->pEnv, pNew);
        pDb->pWorker->pLevel = pLvl;
      }
    }
  }

  return rc;
}

int lsmSaveWorker(lsm_db *pDb, int bFlush){
  Snapshot *p = pDb->pWorker;
  if( p->freelist.nEntry>pDb->nMaxFreelist ){
    int rc = sortedNewFreelistOnly(pDb);
    if( rc!=LSM_OK ) return rc;
  }
  return lsmCheckpointSaveWorker(pDb, bFlush);
}

static int doLsmSingleWork(
  lsm_db *pDb, 
  int bShutdown,
  int nMerge,                     /* Minimum segments to merge together */
  int nPage,                      /* Number of pages to write to disk */
  int *pnWrite,                   /* OUT: Pages actually written to disk */
  int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
){
  Snapshot *pWorker;              /* Worker snapshot */
  int rc = LSM_OK;                /* Return code */
  int bDirty = 0;
  int nMax = nPage;               /* Maximum pages to write to disk */
  int nRem = nPage;
  int bCkpt = 0;

  /* Open the worker 'transaction'. It will be closed before this function
  ** returns.  */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);
  if( rc!=LSM_OK ) return rc;
  pWorker = pDb->pWorker;

  /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
  ** that this call stops writing when the auto-checkpoint is due. The
  ** caller will do the checkpoint, then possibly call this function again. */
  if( bShutdown==0 && pDb->nAutockpt ){
    u32 nSync;
    u32 nUnsync;
4873
4874
4875
4876
4877
4878
4879
4880
4881
4882
4883
4884
4885
4886
4887
4888
4889
4890
4891
4892
4893
4894
4895
4896







4897
4898
4899
4900
4901
4902
4903
  if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
    int nPg = 0;
    while( rc==LSM_OK && lsmDatabaseFull(pDb) ){
      rc = sortedWork(pDb, 16, nMerge, 1, &nPg);
      nRem -= nPg;
    }
    if( rc==LSM_OK ){
      rc = sortedNewToplevel(pDb, TREE_NONE, &nPg);
    }
    nRem -= nPg;
    if( nPg ) bDirty = 1;
  }

  if( rc==LSM_OK && bDirty ){
    lsmFinishWork(pDb, 0, &rc);
  }else{
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcdummy);
  }
  assert( pDb->pWorker==0 );

  if( rc==LSM_OK ){
    *pnWrite = (nMax - nRem);
    *pbCkpt = (bCkpt && nRem<=0);







  }else{
    *pnWrite = 0;
    *pbCkpt = 0;
  }

  return rc;
}







|
















>
>
>
>
>
>
>







4936
4937
4938
4939
4940
4941
4942
4943
4944
4945
4946
4947
4948
4949
4950
4951
4952
4953
4954
4955
4956
4957
4958
4959
4960
4961
4962
4963
4964
4965
4966
4967
4968
4969
4970
4971
4972
4973
  if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
    int nPg = 0;
    while( rc==LSM_OK && lsmDatabaseFull(pDb) ){
      rc = sortedWork(pDb, 16, nMerge, 1, &nPg);
      nRem -= nPg;
    }
    if( rc==LSM_OK ){
      rc = sortedNewFreelistOnly(pDb);
    }
    nRem -= nPg;
    if( nPg ) bDirty = 1;
  }

  if( rc==LSM_OK && bDirty ){
    lsmFinishWork(pDb, 0, &rc);
  }else{
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, &rcdummy);
  }
  assert( pDb->pWorker==0 );

  if( rc==LSM_OK ){
    *pnWrite = (nMax - nRem);
    *pbCkpt = (bCkpt && nRem<=0);
    if( nMerge==1 && pDb->nAutockpt>0 && bDirty
     && pWorker->pLevel 
     && pWorker->pLevel->nRight==0 
     && pWorker->pLevel->pNext==0 
    ){
      *pbCkpt = 1;
    }
  }else{
    *pnWrite = 0;
    *pbCkpt = 0;
  }

  return rc;
}