SQLite4
Check-in [29bd2611a6]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Remove the lsmFsPageWrite() function. So that pages can only be written immediately after they are created - not loaded from the database and then made writable.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | compression-hooks
Files: files | file ages | folders
SHA1: 29bd2611a60d780265a087993c05acf60853cb65
User & Date: dan 2012-10-17 11:31:09
Context
2012-10-19
11:25
Changes to support building b-trees without using the page numbers of unfinished pages. check-in: d54af93981 user: dan tags: compression-hooks
2012-10-17
11:31
Remove the lsmFsPageWrite() function. So that pages can only be written immediately after they are created - not loaded from the database and then made writable. check-in: 29bd2611a6 user: dan tags: compression-hooks
2012-10-16
15:26
Change page numbers to 8-byte numbers (from 4). This is required to support compressed databases, where a page number is a byte offset in the database file. check-in: 5d266a717d user: dan tags: compression-hooks
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/lsmInt.h.

   339    339     TreeHeader treehdr;             /* Local copy of tree-header */
   340    340     u32 aSnapshot[LSM_META_PAGE_SIZE / sizeof(u32)];
   341    341   };
   342    342   
   343    343   struct Segment {
   344    344     Pgno iFirst;                     /* First page of this run */
   345    345     Pgno iLast;                      /* Last page of this run */
   346         -  Pgno iRoot;                     /* Root page number (if any) */
   347         -  int nSize;                      /* Size of this run in pages */
          346  +  Pgno iRoot;                      /* Root page number (if any) */
          347  +  int nSize;                       /* Size of this run in pages */
   348    348   };
   349    349   
   350    350   /*
   351    351   ** iSplitTopic/pSplitKey/nSplitKey:
   352    352   **   If nRight>0, this buffer contains a copy of the largest key that has
   353    353   **   already been written to the left-hand-side of the level.
   354    354   */
................................................................................
   633    633   
   634    634   void lsmSortedSplitkey(lsm_db *, Level *, int *);
   635    635   
   636    636   /* Reading sorted run content. */
   637    637   int lsmFsDbPageGet(FileSystem *, Pgno, Page **);
   638    638   int lsmFsDbPageNext(Segment *, Page *, int eDir, Page **);
   639    639   
   640         -int lsmFsPageWrite(Page *);
   641    640   u8 *lsmFsPageData(Page *, int *);
   642    641   int lsmFsPageRelease(Page *);
   643    642   int lsmFsPagePersist(Page *);
   644    643   void lsmFsPageRef(Page *);
   645    644   Pgno lsmFsPageNumber(Page *);
   646    645   
   647    646   int lsmFsNRead(FileSystem *);
................................................................................
   696    695   
   697    696   int lsmFlushTreeToDisk(lsm_db *pDb);
   698    697   
   699    698   void lsmSortedRemap(lsm_db *pDb);
   700    699   
   701    700   void lsmSortedFreeLevel(lsm_env *pEnv, Level *);
   702    701   
   703         -int lsmSortedFlushDb(lsm_db *);
   704    702   int lsmSortedAdvanceAll(lsm_db *pDb);
   705    703   
   706    704   int lsmSortedLoadMerge(lsm_db *, Level *, u32 *, int *);
   707    705   int lsmSortedLoadFreelist(lsm_db *pDb, void **, int *);
   708    706   
   709    707   void *lsmSortedSplitKey(Level *pLevel, int *pnByte);
   710    708   

Changes to src/lsm_file.c.

  1073   1073     for(i=LSM_APPLIST_SZ-1; iRet==0 && i>=0; i--){
  1074   1074       if( (iRet = aiAppend[i]) ) aiAppend[i] = 0;
  1075   1075     }
  1076   1076     return iRet;
  1077   1077   }
  1078   1078   
  1079   1079   /*
  1080         -** Append a page to file iFile. Return a reference to it. lsmFsPageWrite()
  1081         -** has already been called on the returned reference.
         1080  +** Append a page to file iFile. Set the ref-count to 1 and return a pointer
         1081  +** to it. The page is writable until either lsmFsPagePersist() is called on 
         1082  +** it or the ref-count drops to zero.
  1082   1083   */
  1083   1084   int lsmFsSortedAppend(
  1084   1085     FileSystem *pFS, 
  1085   1086     Snapshot *pSnapshot,
  1086   1087     Segment *p, 
  1087   1088     Page **ppOut
  1088   1089   ){
................................................................................
  1274   1275   ** set *pnData to the size of the meta-page in bytes before returning.
  1275   1276   */
  1276   1277   u8 *lsmFsMetaPageData(MetaPage *pPg, int *pnData){
  1277   1278     if( pnData ) *pnData = pPg->pFS->nMetasize;
  1278   1279     return pPg->aData;
  1279   1280   }
  1280   1281   
  1281         -/*
  1282         -** Notify the file-system that the page needs to be written back to disk
  1283         -** when the reference count next drops to zero.
  1284         -*/
  1285         -int lsmFsPageWrite(Page *pPg){
  1286         -  pPg->flags |= PAGE_DIRTY;
  1287         -  return LSM_OK;
  1288         -}
  1289         -
  1290   1282   /*
  1291   1283   ** Return true if page is currently writable.
  1292   1284   */
  1293   1285   int lsmFsPageWritable(Page *pPg){
  1294   1286     return (pPg->flags & PAGE_DIRTY) ? 1 : 0;
  1295   1287   }
  1296   1288   

Changes to src/lsm_sorted.c.

  2951   2951     Segment *pSeg;
  2952   2952     Hierarchy *p;
  2953   2953    
  2954   2954     pSeg = &pMW->pLevel->lhs;
  2955   2955     p = &pMW->hier;
  2956   2956   
  2957   2957     if( p->apHier==0 && pSeg->iRoot!=0 ){
  2958         -    int bHierReadonly = pMW->pLevel->pMerge->bHierReadonly;
  2959   2958       FileSystem *pFS = pMW->pDb->pFS;
  2960   2959       lsm_env *pEnv = pMW->pDb->pEnv;
  2961   2960       Page **apHier = 0;
  2962   2961       int nHier = 0;
  2963   2962       int iPg = pSeg->iRoot;
         2963  +
         2964  +    assert( pMW->pLevel->pMerge->bHierReadonly );
  2964   2965   
  2965   2966       do {
  2966   2967         Page *pPg = 0;
  2967   2968         u8 *aData;
  2968   2969         int nData;
  2969   2970         int flags;
  2970   2971   
................................................................................
  2977   2978           Page **apNew = (Page **)lsmRealloc(
  2978   2979               pEnv, apHier, sizeof(Page *)*(nHier+1)
  2979   2980           );
  2980   2981           if( apNew==0 ){
  2981   2982             rc = LSM_NOMEM_BKPT;
  2982   2983             break;
  2983   2984           }
  2984         -        if( bHierReadonly==0 ) lsmFsPageWrite(pPg);
  2985   2985           apHier = apNew;
  2986   2986           memmove(&apHier[1], &apHier[0], sizeof(Page *) * nHier);
  2987   2987           nHier++;
  2988   2988   
  2989   2989           apHier[0] = pPg;
  2990   2990           iPg = pageGetPtr(aData, nData);
  2991   2991         }else{
................................................................................
  3032   3032   **
  3033   3033   **   4. The pointer in the page footer of a b-tree page points to a page
  3034   3034   **      that contains keys equal to or larger than the largest key on the
  3035   3035   **      b-tree page.
  3036   3036   **
  3037   3037   ** The reason for having the page footer pointer point to the right-child
  3038   3038   ** (instead of the left) is that doing things this way makes the 
  3039         -** segWriterMoveHierarchy() operation less complicated (since the pointers 
         3039  +** mergeWorkerMoveHierarchy() operation less complicated (since the pointers 
  3040   3040   ** that need to be updated are all stored as fixed-size integers within the 
  3041   3041   ** page footer, not varints in page records).
  3042   3042   **
  3043   3043   ** Records may not span b-tree pages. If this function is called to add a
  3044   3044   ** record larger than (page-size / 4) bytes, then a pointer to the indexed
  3045   3045   ** array page that contains the main record is added to the b-tree instead.
  3046   3046   ** In this case the record format is:
................................................................................
  3229   3229     rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, &pNext);
  3230   3230     assert( rc!=LSM_OK || pSeg->iFirst>0 );
  3231   3231   
  3232   3232     if( rc==LSM_OK ){
  3233   3233       u8 *aData;                    /* Data buffer belonging to page pNext */
  3234   3234       int nData;                    /* Size of aData[] in bytes */
  3235   3235   
         3236  +    /* Release the completed output page. */
  3236   3237       lsmFsPageRelease(pMW->pPage);
         3238  +
  3237   3239       pMW->pPage = pNext;
  3238   3240       pMW->pLevel->pMerge->iOutputOff = 0;
  3239         -
  3240   3241       aData = fsPageData(pNext, &nData);
  3241   3242       lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
  3242   3243       lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0);
  3243   3244       lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr);
  3244   3245   
  3245   3246       pMW->nWork++;
  3246   3247     }
................................................................................
  3337   3338     ** subsequent pages.
  3338   3339     **
  3339   3340     ** The header space is:
  3340   3341     **
  3341   3342     **     1) record type - 1 byte.
  3342   3343     **     2) Page-pointer-offset - 1 varint
  3343   3344     **     3) Key size - 1 varint
  3344         -  **     4) Value size - 1 varint (SORTED_WRITE only)
         3345  +  **     4) Value size - 1 varint (only if LSM_INSERT flag is set)
  3345   3346     */
  3346   3347     rc = lsmMCursorValue(pCsr, &pVal, &nVal);
  3347   3348     if( rc==LSM_OK ){
  3348   3349       nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
  3349   3350       if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);
  3350   3351   
  3351   3352       /* If the entire header will not fit on page pPg, or if page pPg is 
  3352   3353        ** marked read-only, advance to the next page of the output run. */
  3353   3354       iOff = pMerge->iOutputOff;
  3354   3355       if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
  3355   3356         iFPtr = *pCsr->pPrevMergePtr;
  3356   3357         iRPtr = iPtr - iFPtr;
  3357         -
  3358   3358         iOff = 0;
  3359   3359         nRec = 0;
  3360   3360         rc = mergeWorkerNextPage(pMW, iFPtr);
  3361   3361         pPg = pMW->pPage;
  3362   3362       }
  3363   3363     }
  3364   3364   
................................................................................
  3476   3476       /* Store the location of the split-key */
  3477   3477       iPtr = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
  3478   3478       if( iPtr<pCsr->nPtr ){
  3479   3479         pMerge->splitkey = pMerge->aInput[iPtr];
  3480   3480       }else{
  3481   3481         btreeCursorSplitkey(pCsr->pBtCsr, &pMerge->splitkey);
  3482   3482       }
         3483  +    
         3484  +    pMerge->iOutputOff = -1;
         3485  +    pMerge->bHierReadonly = 1;
  3483   3486     }
  3484   3487   
  3485   3488     lsmMCursorClose(pCsr);
  3486   3489     lsmFsPageRelease(pMW->pPage);
  3487   3490   
  3488   3491     for(i=0; i<2; i++){
  3489   3492       Hierarchy *p = &pMW->hier;
................................................................................
  3498   3501     }
  3499   3502   
  3500   3503     pMW->pCsr = 0;
  3501   3504     pMW->pPage = 0;
  3502   3505     pMW->pPage = 0;
  3503   3506   }
  3504   3507   
         3508  +/*
         3509  +** The MergeWorker passed as the only argument is working to merge two or
         3510  +** more existing segments together (not to flush an in-memory tree). It
         3511  +** has not yet written the first key to the first page of the output.
         3512  +*/
  3505   3513   static int mergeWorkerFirstPage(MergeWorker *pMW){
  3506   3514     int rc;                         /* Return code */
  3507   3515     Page *pPg = 0;                  /* First page of run pSeg */
  3508   3516     int iFPtr;                      /* Pointer value read from footer of pPg */
  3509   3517     MultiCursor *pCsr = pMW->pCsr;
  3510   3518   
  3511   3519     assert( pMW->pPage==0 );
................................................................................
  3885   3893         rc = lsmFsDbPageNext(pSeg, pPg, -1, &pNext);
  3886   3894         lsmFsPageRelease(pPg);
  3887   3895         pPg = pNext;
  3888   3896       }
  3889   3897   
  3890   3898       if( rc==LSM_OK ){
  3891   3899         pMW->pPage = pPg;
  3892         -      if( pLevel->pMerge->iOutputOff>=0 ) rc = lsmFsPageWrite(pPg);
         3900  +      assert( pLevel->pMerge->iOutputOff<0 );
  3893   3901       }
  3894   3902     }
  3895   3903     return rc;
  3896   3904   }
  3897   3905   
  3898   3906   static int mergeWorkerInit(
  3899   3907     lsm_db *pDb,                    /* Db connection to do merge work */
................................................................................
  4335   4343       }
  4336   4344       if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){
  4337   4345         rc = sortedNewToplevel(pDb, TREE_NONE, &nOvfl, 0);
  4338   4346       }
  4339   4347     }
  4340   4348   
  4341   4349     if( rc==LSM_OK && (nRem!=nMax) ){
  4342         -    rc = lsmSortedFlushDb(pDb);
  4343   4350       lsmFinishWork(pDb, bFlush, nOvfl, &rc);
  4344   4351     }else{
  4345   4352       int rcdummy = LSM_BUSY;
  4346   4353       assert( rc!=LSM_OK || bFlush==0 );
  4347   4354       lsmFinishWork(pDb, 0, 0, &rcdummy);
  4348   4355     }
  4349   4356     assert( pDb->pWorker==0 );
................................................................................
  4903   4910   
  4904   4911     for(p=pLevel; p; p=pNext){
  4905   4912       pNext = p->pNext;
  4906   4913       sortedFreeLevel(pEnv, p);
  4907   4914     }
  4908   4915   }
  4909   4916   
  4910         -int lsmSortedFlushDb(lsm_db *pDb){
  4911         -  int rc = LSM_OK;
  4912         -  Level *p;
  4913         -
  4914         -  assert( pDb->pWorker );
  4915         -  for(p=lsmDbSnapshotLevel(pDb->pWorker); p && rc==LSM_OK; p=p->pNext){
  4916         -    Merge *pMerge = p->pMerge;
  4917         -    if( pMerge ){
  4918         -      pMerge->iOutputOff = -1;
  4919         -      pMerge->bHierReadonly = 1;
  4920         -    }
  4921         -  }
  4922         -
  4923         -  return LSM_OK;
  4924         -}
  4925         -
  4926   4917   void lsmSortedSaveTreeCursors(lsm_db *pDb){
  4927   4918     MultiCursor *pCsr;
  4928   4919     for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){
  4929   4920       lsmTreeCursorSave(pCsr->apTreeCsr[0]);
  4930   4921       lsmTreeCursorSave(pCsr->apTreeCsr[1]);
  4931   4922     }
  4932   4923   }