SQLite4
Check-in [89b4286682]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:If a free-list-only segment is generated while a merge of the top-level segment is underway, add the new segment to the merge inputs immediately. Also, if auto-checkpoints are enabled, schedule a checkpoint after each block is moved within an lsm_work(nmerge=1) call.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | block-redirects
Files: files | file ages | folders
SHA1: 89b4286682648710710d9c150bb127c16eb78501
User & Date: dan 2013-01-21 16:53:00
Context
2013-01-21
19:50
Add tests for block-redirects to lsmtest. check-in: eec16b0f2f user: dan tags: block-redirects
16:53
If a free-list-only segment is generated while a merge of the top-level segment is underway, add the new segment to the merge inputs immediately. Also, if auto-checkpoints are enabled, schedule a checkpoint after each block is moved within an lsm_work(nmerge=1) call. check-in: 89b4286682 user: dan tags: block-redirects
14:23
Fix LSM_INFO_PAGE_DUMP so that it works with redirected blocks. check-in: deb0ccacd4 user: dan tags: block-redirects
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/lsm_sorted.c.

  4775   4775         bRet = 0;
  4776   4776       }
  4777   4777       *pRc = rc;
  4778   4778     }
  4779   4779     assert( *pRc==LSM_OK || bRet==0 );
  4780   4780     return bRet;
  4781   4781   }
         4782  +
         4783  +/*
         4784  +** Create a new free-list only top-level segment. Return LSM_OK if successful
         4785  +** or an LSM error code if some error occurs.
         4786  +*/
         4787  +static int sortedNewFreelistOnly(lsm_db *pDb){
         4788  +  int rc;
         4789  +  Level *pLvl;
         4790  +
         4791  +  pLvl = pDb->pWorker->pLevel;
         4792  +  if( pLvl->pNext!=0 || pLvl->nRight==0 ){
         4793  +    pLvl = 0;
         4794  +  }
         4795  +
         4796  +  rc = sortedNewToplevel(pDb, TREE_NONE, 0);
         4797  +  if( rc==LSM_OK && pLvl ){
         4798  +    Level *pNew = pDb->pWorker->pLevel;
         4799  +    assert( pNew->pNext==pLvl );
         4800  +
         4801  +    if( pLvl->pSplitKey==0 ){
         4802  +      sortedSplitkey(pDb, pLvl, &rc);
         4803  +    }
         4804  +    if( rc==LSM_OK && pLvl->iSplitTopic==0 ){
         4805  +      /* Add the new top-level to the rhs of pLvl. */
         4806  +      Merge *pMerge;
         4807  +
         4808  +      Segment *aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, 
         4809  +          sizeof(Segment) * (pLvl->nRight + 1), &rc
         4810  +      );
         4811  +      if( rc==LSM_OK ){
         4812  +        memcpy(&aRhs[1], pLvl->aRhs, sizeof(Segment) * pLvl->nRight);
         4813  +        aRhs[0] = pNew->lhs;
         4814  +        lsmFree(pDb->pEnv, pLvl->aRhs);
         4815  +        pLvl->aRhs = aRhs;
         4816  +        pLvl->nRight++;
         4817  +      }
         4818  +
         4819  +      /* Also add an entry to the Merge object */
         4820  +      pMerge = (Merge *)lsmMallocZeroRc(pDb->pEnv, 
         4821  +          sizeof(Merge) + sizeof(MergeInput)*(pLvl->pMerge->nInput+1), &rc
         4822  +      );
         4823  +      if( rc==LSM_OK ){
         4824  +        memcpy(pMerge, pLvl->pMerge, sizeof(Merge));
         4825  +        pMerge->aInput = (MergeInput *)&pMerge[1];
         4826  +        memcpy(&pMerge->aInput[1], pLvl->pMerge->aInput, 
         4827  +            sizeof(MergeInput)*(pLvl->pMerge->nInput+1)
         4828  +        );
         4829  +        pMerge->aInput[0].iPg = aRhs[0].iFirst;
         4830  +        pMerge->aInput[0].iCell = 0;
         4831  +        pMerge->nInput++;
         4832  +        lsmFree(pDb->pEnv, pLvl->pMerge);
         4833  +        pLvl->pMerge = pMerge;
         4834  +
         4835  +        sortedFreeLevel(pDb->pEnv, pNew);
         4836  +        pDb->pWorker->pLevel = pLvl;
         4837  +      }
         4838  +    }
         4839  +  }
         4840  +
         4841  +  return rc;
         4842  +}
  4782   4843   
  4783   4844   int lsmSaveWorker(lsm_db *pDb, int bFlush){
  4784   4845     Snapshot *p = pDb->pWorker;
  4785   4846     if( p->freelist.nEntry>pDb->nMaxFreelist ){
  4786         -    int rc = sortedNewToplevel(pDb, TREE_NONE, 0);
         4847  +    int rc = sortedNewFreelistOnly(pDb);
  4787   4848       if( rc!=LSM_OK ) return rc;
  4788   4849     }
  4789   4850     return lsmCheckpointSaveWorker(pDb, bFlush);
  4790   4851   }
  4791   4852   
  4792   4853   static int doLsmSingleWork(
  4793   4854     lsm_db *pDb, 
  4794   4855     int bShutdown,
  4795   4856     int nMerge,                     /* Minimum segments to merge together */
  4796   4857     int nPage,                      /* Number of pages to write to disk */
  4797   4858     int *pnWrite,                   /* OUT: Pages actually written to disk */
  4798   4859     int *pbCkpt                     /* OUT: True if an auto-checkpoint is req. */
  4799   4860   ){
         4861  +  Snapshot *pWorker;              /* Worker snapshot */
  4800   4862     int rc = LSM_OK;                /* Return code */
  4801   4863     int bDirty = 0;
  4802   4864     int nMax = nPage;               /* Maximum pages to write to disk */
  4803   4865     int nRem = nPage;
  4804   4866     int bCkpt = 0;
  4805   4867   
  4806   4868     /* Open the worker 'transaction'. It will be closed before this function
  4807   4869     ** returns.  */
  4808   4870     assert( pDb->pWorker==0 );
  4809   4871     rc = lsmBeginWork(pDb);
  4810   4872     if( rc!=LSM_OK ) return rc;
         4873  +  pWorker = pDb->pWorker;
  4811   4874   
  4812   4875     /* If this connection is doing auto-checkpoints, set nMax (and nRem) so
  4813   4876     ** that this call stops writing when the auto-checkpoint is due. The
  4814   4877     ** caller will do the checkpoint, then possibly call this function again. */
  4815   4878     if( bShutdown==0 && pDb->nAutockpt ){
  4816   4879       u32 nSync;
  4817   4880       u32 nUnsync;
................................................................................
  4873   4936     if( rc==LSM_OK && pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist ){
  4874   4937       int nPg = 0;
  4875   4938       while( rc==LSM_OK && lsmDatabaseFull(pDb) ){
  4876   4939         rc = sortedWork(pDb, 16, nMerge, 1, &nPg);
  4877   4940         nRem -= nPg;
  4878   4941       }
  4879   4942       if( rc==LSM_OK ){
  4880         -      rc = sortedNewToplevel(pDb, TREE_NONE, &nPg);
         4943  +      rc = sortedNewFreelistOnly(pDb);
  4881   4944       }
  4882   4945       nRem -= nPg;
  4883   4946       if( nPg ) bDirty = 1;
  4884   4947     }
  4885   4948   
  4886   4949     if( rc==LSM_OK && bDirty ){
  4887   4950       lsmFinishWork(pDb, 0, &rc);
................................................................................
  4890   4953       lsmFinishWork(pDb, 0, &rcdummy);
  4891   4954     }
  4892   4955     assert( pDb->pWorker==0 );
  4893   4956   
  4894   4957     if( rc==LSM_OK ){
  4895   4958       *pnWrite = (nMax - nRem);
  4896   4959       *pbCkpt = (bCkpt && nRem<=0);
         4960  +    if( nMerge==1 && pDb->nAutockpt>0 && bDirty
         4961  +     && pWorker->pLevel 
         4962  +     && pWorker->pLevel->nRight==0 
         4963  +     && pWorker->pLevel->pNext==0 
         4964  +    ){
         4965  +      *pbCkpt = 1;
         4966  +    }
  4897   4967     }else{
  4898   4968       *pnWrite = 0;
  4899   4969       *pbCkpt = 0;
  4900   4970     }
  4901   4971   
  4902   4972     return rc;
  4903   4973   }