Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Remove the lsmFsPageWrite() function, so that pages can only be written immediately after they are created - not loaded from the database and then made writable.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | compression-hooks
Files: files | file ages | folders
SHA1: 29bd2611a60d780265a087993c05acf60853cb65
User & Date: dan 2012-10-17 11:31:09.498
Context
2012-10-19
11:25
Changes to support building b-trees without using the page numbers of unfinished pages. check-in: d54af93981 user: dan tags: compression-hooks
2012-10-17
11:31
Remove the lsmFsPageWrite() function, so that pages can only be written immediately after they are created - not loaded from the database and then made writable. check-in: 29bd2611a6 user: dan tags: compression-hooks
2012-10-16
15:26
Change page numbers to 8-byte numbers (from 4). This is required to support compressed databases, where a page number is a byte offset in the database file. check-in: 5d266a717d user: dan tags: compression-hooks
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsmInt.h.
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
  TreeHeader treehdr;             /* Local copy of tree-header */
  u32 aSnapshot[LSM_META_PAGE_SIZE / sizeof(u32)];
};

/*
** A Segment identifies one run of pages: the first and last page of the
** run, the root page number (if there is one) and the size of the run
** in pages.
*/
struct Segment {
  Pgno iFirst;                     /* First page of this run */
  Pgno iLast;                      /* Last page of this run */
  Pgno iRoot;                     /* Root page number (if any) */
  int nSize;                      /* Size of this run in pages */
};

/*
** iSplitTopic/pSplitKey/nSplitKey:
**   If nRight>0, this buffer contains a copy of the largest key that has
**   already been written to the left-hand-side of the level.
*/







|
|







339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
  TreeHeader treehdr;             /* Local copy of tree-header */
  u32 aSnapshot[LSM_META_PAGE_SIZE / sizeof(u32)];
};

/*
** A Segment identifies one run of pages: the first and last page of the
** run, the root page number (if there is one) and the size of the run
** in pages.
*/
struct Segment {
  Pgno iFirst;                     /* First page of this run */
  Pgno iLast;                      /* Last page of this run */
  Pgno iRoot;                      /* Root page number (if any) */
  int nSize;                       /* Size of this run in pages */
};

/*
** iSplitTopic/pSplitKey/nSplitKey:
**   If nRight>0, this buffer contains a copy of the largest key that has
**   already been written to the left-hand-side of the level.
*/
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647

void lsmSortedSplitkey(lsm_db *, Level *, int *);

/* Reading sorted run content. */
int lsmFsDbPageGet(FileSystem *, Pgno, Page **);
int lsmFsDbPageNext(Segment *, Page *, int eDir, Page **);

int lsmFsPageWrite(Page *);
u8 *lsmFsPageData(Page *, int *);
int lsmFsPageRelease(Page *);
int lsmFsPagePersist(Page *);
void lsmFsPageRef(Page *);
Pgno lsmFsPageNumber(Page *);

int lsmFsNRead(FileSystem *);







<







633
634
635
636
637
638
639

640
641
642
643
644
645
646

void lsmSortedSplitkey(lsm_db *, Level *, int *);

/* Reading sorted run content. */
int lsmFsDbPageGet(FileSystem *, Pgno, Page **);
int lsmFsDbPageNext(Segment *, Page *, int eDir, Page **);


u8 *lsmFsPageData(Page *, int *);
int lsmFsPageRelease(Page *);
int lsmFsPagePersist(Page *);
void lsmFsPageRef(Page *);
Pgno lsmFsPageNumber(Page *);

int lsmFsNRead(FileSystem *);
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710

int lsmFlushTreeToDisk(lsm_db *pDb);

void lsmSortedRemap(lsm_db *pDb);

void lsmSortedFreeLevel(lsm_env *pEnv, Level *);

int lsmSortedFlushDb(lsm_db *);
int lsmSortedAdvanceAll(lsm_db *pDb);

int lsmSortedLoadMerge(lsm_db *, Level *, u32 *, int *);
int lsmSortedLoadFreelist(lsm_db *pDb, void **, int *);

void *lsmSortedSplitKey(Level *pLevel, int *pnByte);








<







695
696
697
698
699
700
701

702
703
704
705
706
707
708

int lsmFlushTreeToDisk(lsm_db *pDb);

void lsmSortedRemap(lsm_db *pDb);

void lsmSortedFreeLevel(lsm_env *pEnv, Level *);


int lsmSortedAdvanceAll(lsm_db *pDb);

int lsmSortedLoadMerge(lsm_db *, Level *, u32 *, int *);
int lsmSortedLoadFreelist(lsm_db *pDb, void **, int *);

void *lsmSortedSplitKey(Level *pLevel, int *pnByte);

Changes to src/lsm_file.c.
1073
1074
1075
1076
1077
1078
1079
1080
1081

1082
1083
1084
1085
1086
1087
1088
  for(i=LSM_APPLIST_SZ-1; iRet==0 && i>=0; i--){
    if( (iRet = aiAppend[i]) ) aiAppend[i] = 0;
  }
  return iRet;
}

/*
** Append a page to file iFile. Return a reference to it. lsmFsPageWrite()
** has already been called on the returned reference.

*/
int lsmFsSortedAppend(
  FileSystem *pFS, 
  Snapshot *pSnapshot,
  Segment *p, 
  Page **ppOut
){







|
|
>







1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
  for(i=LSM_APPLIST_SZ-1; iRet==0 && i>=0; i--){
    if( (iRet = aiAppend[i]) ) aiAppend[i] = 0;
  }
  return iRet;
}

/*
** Append a page to file iFile. Set the ref-count to 1 and return a pointer
** to it. The page is writable until either lsmFsPagePersist() is called on 
** it or the ref-count drops to zero.
*/
int lsmFsSortedAppend(
  FileSystem *pFS, 
  Snapshot *pSnapshot,
  Segment *p, 
  Page **ppOut
){
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
** set *pnData to the size of the meta-page in bytes before returning.
*/
u8 *lsmFsMetaPageData(MetaPage *pPg, int *pnData){
  /* The size is only reported when the caller supplies somewhere to
  ** store it. */
  if( pnData!=0 ){
    *pnData = pPg->pFS->nMetasize;
  }
  return pPg->aData;
}

/*
** Mark page pPg as dirty so that the file-system writes it back to disk
** the next time its reference count drops to zero. This function only
** sets the PAGE_DIRTY flag on the page; it always returns LSM_OK.
*/
int lsmFsPageWrite(Page *pPg){
  pPg->flags = pPg->flags | PAGE_DIRTY;
  return LSM_OK;
}

/*
** Report whether page pPg may currently be written to. Returns 1 if the
** PAGE_DIRTY flag is set on the page and 0 otherwise.
*/
int lsmFsPageWritable(Page *pPg){
  int bDirty = (pPg->flags & PAGE_DIRTY)!=0;
  return bDirty;
}








<
<
<
<
<
<
<
<
<







1275
1276
1277
1278
1279
1280
1281









1282
1283
1284
1285
1286
1287
1288
** set *pnData to the size of the meta-page in bytes before returning.
*/
u8 *lsmFsMetaPageData(MetaPage *pPg, int *pnData){
  /* The size is only reported when the caller supplies somewhere to
  ** store it. */
  if( pnData!=0 ){
    *pnData = pPg->pFS->nMetasize;
  }
  return pPg->aData;
}










/*
** Report whether page pPg may currently be written to. Returns 1 if the
** PAGE_DIRTY flag is set on the page and 0 otherwise.
*/
int lsmFsPageWritable(Page *pPg){
  int bDirty = (pPg->flags & PAGE_DIRTY)!=0;
  return bDirty;
}

Changes to src/lsm_sorted.c.
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963


2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
  Segment *pSeg;
  Hierarchy *p;
 
  pSeg = &pMW->pLevel->lhs;
  p = &pMW->hier;

  if( p->apHier==0 && pSeg->iRoot!=0 ){
    int bHierReadonly = pMW->pLevel->pMerge->bHierReadonly;
    FileSystem *pFS = pMW->pDb->pFS;
    lsm_env *pEnv = pMW->pDb->pEnv;
    Page **apHier = 0;
    int nHier = 0;
    int iPg = pSeg->iRoot;



    do {
      Page *pPg = 0;
      u8 *aData;
      int nData;
      int flags;

      rc = lsmFsDbPageGet(pFS, iPg, &pPg);
      if( rc!=LSM_OK ) break;

      aData = fsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      if( flags&SEGMENT_BTREE_FLAG ){
        Page **apNew = (Page **)lsmRealloc(
            pEnv, apHier, sizeof(Page *)*(nHier+1)
        );
        if( apNew==0 ){
          rc = LSM_NOMEM_BKPT;
          break;
        }
        if( bHierReadonly==0 ) lsmFsPageWrite(pPg);
        apHier = apNew;
        memmove(&apHier[1], &apHier[0], sizeof(Page *) * nHier);
        nHier++;

        apHier[0] = pPg;
        iPg = pageGetPtr(aData, nData);
      }else{







<





>
>




















<







2951
2952
2953
2954
2955
2956
2957

2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984

2985
2986
2987
2988
2989
2990
2991
  Segment *pSeg;
  Hierarchy *p;
 
  pSeg = &pMW->pLevel->lhs;
  p = &pMW->hier;

  if( p->apHier==0 && pSeg->iRoot!=0 ){

    FileSystem *pFS = pMW->pDb->pFS;
    lsm_env *pEnv = pMW->pDb->pEnv;
    Page **apHier = 0;
    int nHier = 0;
    int iPg = pSeg->iRoot;

    assert( pMW->pLevel->pMerge->bHierReadonly );

    do {
      Page *pPg = 0;
      u8 *aData;
      int nData;
      int flags;

      rc = lsmFsDbPageGet(pFS, iPg, &pPg);
      if( rc!=LSM_OK ) break;

      aData = fsPageData(pPg, &nData);
      flags = pageGetFlags(aData, nData);
      if( flags&SEGMENT_BTREE_FLAG ){
        Page **apNew = (Page **)lsmRealloc(
            pEnv, apHier, sizeof(Page *)*(nHier+1)
        );
        if( apNew==0 ){
          rc = LSM_NOMEM_BKPT;
          break;
        }

        apHier = apNew;
        memmove(&apHier[1], &apHier[0], sizeof(Page *) * nHier);
        nHier++;

        apHier[0] = pPg;
        iPg = pageGetPtr(aData, nData);
      }else{
3032
3033
3034
3035
3036
3037
3038
3039
3040
3041
3042
3043
3044
3045
3046
**
**   4. The pointer in the page footer of a b-tree page points to a page
**      that contains keys equal to or larger than the largest key on the
**      b-tree page.
**
** The reason for having the page footer pointer point to the right-child
** (instead of the left) is that doing things this way makes the 
** segWriterMoveHierarchy() operation less complicated (since the pointers 
** that need to be updated are all stored as fixed-size integers within the 
** page footer, not varints in page records).
**
** Records may not span b-tree pages. If this function is called to add a
** record larger than (page-size / 4) bytes, then a pointer to the indexed
** array page that contains the main record is added to the b-tree instead.
** In this case the record format is:







|







3032
3033
3034
3035
3036
3037
3038
3039
3040
3041
3042
3043
3044
3045
3046
**
**   4. The pointer in the page footer of a b-tree page points to a page
**      that contains keys equal to or larger than the largest key on the
**      b-tree page.
**
** The reason for having the page footer pointer point to the right-child
** (instead of the left) is that doing things this way makes the 
** mergeWorkerMoveHierarchy() operation less complicated (since the pointers 
** that need to be updated are all stored as fixed-size integers within the 
** page footer, not varints in page records).
**
** Records may not span b-tree pages. If this function is called to add a
** record larger than (page-size / 4) bytes, then a pointer to the indexed
** array page that contains the main record is added to the b-tree instead.
** In this case the record format is:
3229
3230
3231
3232
3233
3234
3235

3236

3237
3238
3239
3240
3241
3242
3243
3244
3245
3246
  rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, &pNext);
  assert( rc!=LSM_OK || pSeg->iFirst>0 );

  if( rc==LSM_OK ){
    u8 *aData;                    /* Data buffer belonging to page pNext */
    int nData;                    /* Size of aData[] in bytes */


    lsmFsPageRelease(pMW->pPage);

    pMW->pPage = pNext;
    pMW->pLevel->pMerge->iOutputOff = 0;

    aData = fsPageData(pNext, &nData);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0);
    lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr);

    pMW->nWork++;
  }







>

>


<







3229
3230
3231
3232
3233
3234
3235
3236
3237
3238
3239
3240

3241
3242
3243
3244
3245
3246
3247
  rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, &pNext);
  assert( rc!=LSM_OK || pSeg->iFirst>0 );

  if( rc==LSM_OK ){
    u8 *aData;                    /* Data buffer belonging to page pNext */
    int nData;                    /* Size of aData[] in bytes */

    /* Release the completed output page. */
    lsmFsPageRelease(pMW->pPage);

    pMW->pPage = pNext;
    pMW->pLevel->pMerge->iOutputOff = 0;

    aData = fsPageData(pNext, &nData);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0);
    lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr);

    pMW->nWork++;
  }
3337
3338
3339
3340
3341
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357
3358
3359
3360
3361
3362
3363
3364
  ** subsequent pages.
  **
  ** The header space is:
  **
  **     1) record type - 1 byte.
  **     2) Page-pointer-offset - 1 varint
  **     3) Key size - 1 varint
  **     4) Value size - 1 varint (SORTED_WRITE only)
  */
  rc = lsmMCursorValue(pCsr, &pVal, &nVal);
  if( rc==LSM_OK ){
    nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
    if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

    /* If the entire header will not fit on page pPg, or if page pPg is 
     ** marked read-only, advance to the next page of the output run. */
    iOff = pMerge->iOutputOff;
    if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
      iFPtr = *pCsr->pPrevMergePtr;
      iRPtr = iPtr - iFPtr;

      iOff = 0;
      nRec = 0;
      rc = mergeWorkerNextPage(pMW, iFPtr);
      pPg = pMW->pPage;
    }
  }








|












<







3338
3339
3340
3341
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357

3358
3359
3360
3361
3362
3363
3364
  ** subsequent pages.
  **
  ** The header space is:
  **
  **     1) record type - 1 byte.
  **     2) Page-pointer-offset - 1 varint
  **     3) Key size - 1 varint
  **     4) Value size - 1 varint (only if LSM_INSERT flag is set)
  */
  rc = lsmMCursorValue(pCsr, &pVal, &nVal);
  if( rc==LSM_OK ){
    nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
    if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

    /* If the entire header will not fit on page pPg, or if page pPg is 
     ** marked read-only, advance to the next page of the output run. */
    iOff = pMerge->iOutputOff;
    if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
      iFPtr = *pCsr->pPrevMergePtr;
      iRPtr = iPtr - iFPtr;

      iOff = 0;
      nRec = 0;
      rc = mergeWorkerNextPage(pMW, iFPtr);
      pPg = pMW->pPage;
    }
  }

3476
3477
3478
3479
3480
3481
3482



3483
3484
3485
3486
3487
3488
3489
    /* Store the location of the split-key */
    iPtr = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
    if( iPtr<pCsr->nPtr ){
      pMerge->splitkey = pMerge->aInput[iPtr];
    }else{
      btreeCursorSplitkey(pCsr->pBtCsr, &pMerge->splitkey);
    }



  }

  lsmMCursorClose(pCsr);
  lsmFsPageRelease(pMW->pPage);

  for(i=0; i<2; i++){
    Hierarchy *p = &pMW->hier;







>
>
>







3476
3477
3478
3479
3480
3481
3482
3483
3484
3485
3486
3487
3488
3489
3490
3491
3492
    /* Store the location of the split-key */
    iPtr = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
    if( iPtr<pCsr->nPtr ){
      pMerge->splitkey = pMerge->aInput[iPtr];
    }else{
      btreeCursorSplitkey(pCsr->pBtCsr, &pMerge->splitkey);
    }
    
    pMerge->iOutputOff = -1;
    pMerge->bHierReadonly = 1;
  }

  lsmMCursorClose(pCsr);
  lsmFsPageRelease(pMW->pPage);

  for(i=0; i<2; i++){
    Hierarchy *p = &pMW->hier;
3498
3499
3500
3501
3502
3503
3504





3505
3506
3507
3508
3509
3510
3511
  }

  pMW->pCsr = 0;
  pMW->pPage = 0;
  pMW->pPage = 0;
}






static int mergeWorkerFirstPage(MergeWorker *pMW){
  int rc;                         /* Return code */
  Page *pPg = 0;                  /* First page of run pSeg */
  int iFPtr;                      /* Pointer value read from footer of pPg */
  MultiCursor *pCsr = pMW->pCsr;

  assert( pMW->pPage==0 );







>
>
>
>
>







3501
3502
3503
3504
3505
3506
3507
3508
3509
3510
3511
3512
3513
3514
3515
3516
3517
3518
3519
  }

  pMW->pCsr = 0;
  pMW->pPage = 0;
  pMW->pPage = 0;
}

/*
** The MergeWorker passed as the only argument is working to merge two or
** more existing segments together (not to flush an in-memory tree). It
** has not yet written the first key to the first page of the output.
*/
static int mergeWorkerFirstPage(MergeWorker *pMW){
  int rc;                         /* Return code */
  Page *pPg = 0;                  /* First page of run pSeg */
  int iFPtr;                      /* Pointer value read from footer of pPg */
  MultiCursor *pCsr = pMW->pCsr;

  assert( pMW->pPage==0 );
3885
3886
3887
3888
3889
3890
3891
3892
3893
3894
3895
3896
3897
3898
3899
      rc = lsmFsDbPageNext(pSeg, pPg, -1, &pNext);
      lsmFsPageRelease(pPg);
      pPg = pNext;
    }

    if( rc==LSM_OK ){
      pMW->pPage = pPg;
      if( pLevel->pMerge->iOutputOff>=0 ) rc = lsmFsPageWrite(pPg);
    }
  }
  return rc;
}

static int mergeWorkerInit(
  lsm_db *pDb,                    /* Db connection to do merge work */







|







3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
      rc = lsmFsDbPageNext(pSeg, pPg, -1, &pNext);
      lsmFsPageRelease(pPg);
      pPg = pNext;
    }

    if( rc==LSM_OK ){
      pMW->pPage = pPg;
      assert( pLevel->pMerge->iOutputOff<0 );
    }
  }
  return rc;
}

static int mergeWorkerInit(
  lsm_db *pDb,                    /* Db connection to do merge work */
4335
4336
4337
4338
4339
4340
4341
4342
4343
4344
4345
4346
4347
4348
4349
    }
    if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){
      rc = sortedNewToplevel(pDb, TREE_NONE, &nOvfl, 0);
    }
  }

  if( rc==LSM_OK && (nRem!=nMax) ){
    rc = lsmSortedFlushDb(pDb);
    lsmFinishWork(pDb, bFlush, nOvfl, &rc);
  }else{
    int rcdummy = LSM_BUSY;
    assert( rc!=LSM_OK || bFlush==0 );
    lsmFinishWork(pDb, 0, 0, &rcdummy);
  }
  assert( pDb->pWorker==0 );







<







4343
4344
4345
4346
4347
4348
4349

4350
4351
4352
4353
4354
4355
4356
    }
    if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){
      rc = sortedNewToplevel(pDb, TREE_NONE, &nOvfl, 0);
    }
  }

  if( rc==LSM_OK && (nRem!=nMax) ){

    lsmFinishWork(pDb, bFlush, nOvfl, &rc);
  }else{
    int rcdummy = LSM_BUSY;
    assert( rc!=LSM_OK || bFlush==0 );
    lsmFinishWork(pDb, 0, 0, &rcdummy);
  }
  assert( pDb->pWorker==0 );
4903
4904
4905
4906
4907
4908
4909
4910
4911
4912
4913
4914
4915
4916
4917
4918
4919
4920
4921
4922
4923
4924
4925
4926
4927
4928
4929
4930
4931
4932

  for(p=pLevel; p; p=pNext){
    pNext = p->pNext;
    sortedFreeLevel(pEnv, p);
  }
}

/*
** Walk every level of the worker snapshot (pDb->pWorker must be set).
** For each level that has a Merge object attached, set its iOutputOff
** to -1 and its bHierReadonly flag to 1. Always returns LSM_OK.
**
** NOTE(review): a negative iOutputOff presumably tells the merge code
** that the current output page can no longer be appended to - confirm
** against the merge worker.
*/
int lsmSortedFlushDb(lsm_db *pDb){
  Level *pLvl;

  assert( pDb->pWorker );
  pLvl = lsmDbSnapshotLevel(pDb->pWorker);
  while( pLvl ){
    Merge *pMerge = pLvl->pMerge;
    if( pMerge ){
      pMerge->bHierReadonly = 1;
      pMerge->iOutputOff = -1;
    }
    pLvl = pLvl->pNext;
  }
  return LSM_OK;
}

/*
** Call lsmTreeCursorSave() on both tree cursors (apTreeCsr[0] and
** apTreeCsr[1]) of every multi-cursor in the list headed at pDb->pCsr.
*/
void lsmSortedSaveTreeCursors(lsm_db *pDb){
  MultiCursor *pIter = pDb->pCsr;
  while( pIter ){
    int iTree;
    for(iTree=0; iTree<2; iTree++){
      lsmTreeCursorSave(pIter->apTreeCsr[iTree]);
    }
    pIter = pIter->pNext;
  }
}







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







4910
4911
4912
4913
4914
4915
4916
















4917
4918
4919
4920
4921
4922
4923

  for(p=pLevel; p; p=pNext){
    pNext = p->pNext;
    sortedFreeLevel(pEnv, p);
  }
}

















/*
** Call lsmTreeCursorSave() on both tree cursors (apTreeCsr[0] and
** apTreeCsr[1]) of every multi-cursor in the list headed at pDb->pCsr.
*/
void lsmSortedSaveTreeCursors(lsm_db *pDb){
  MultiCursor *pIter = pDb->pCsr;
  while( pIter ){
    int iTree;
    for(iTree=0; iTree<2; iTree++){
      lsmTreeCursorSave(pIter->apTreeCsr[iTree]);
    }
    pIter = pIter->pNext;
  }
}