Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Further changes to ensure that a pages page number is not required until after its content has been assembled.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | compression-hooks
Files: files | file ages | folders
SHA1: c03eeda99fb06f7f246098853396e3d9d9402aff
User & Date: dan 2012-10-19 16:16:54.246
Context
2012-10-20
15:57
Change lsm_sorted.c so that it does not use the Segment.iLastPg variable directly. This variables meaning is slightly different for compressed databases. check-in: db2407a7af user: dan tags: compression-hooks
2012-10-19
16:16
Further changes to ensure that a pages page number is not required until after its content has been assembled. check-in: c03eeda99f user: dan tags: compression-hooks
11:25
Changes to support building b-trees without using the page numbers of unfinished pages. check-in: d54af93981 user: dan tags: compression-hooks
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsmInt.h.
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
** A structure describing an ongoing merge. There is an instance of this
** structure for every Level currently undergoing a merge in the worker
** snapshot.
**
** It is assumed that code that uses an instance of this structure has
** access to the associated Level struct.
**
** bHierReadonly:
**   True if the b-tree hierarchy is currently read-only.
**
** iOutputOff:
**   The byte offset to write to next within the last page of the 
**   output segment.
*/
struct MergeInput {
  Pgno iPg;                       /* Page on which next input is stored */
  int iCell;                      /* Cell containing next input to merge */
};
struct Merge {
  int nInput;                     /* Number of input runs being merged */
  MergeInput *aInput;             /* Array nInput entries in size */
  MergeInput splitkey;            /* Location in file of current splitkey */
  int nSkip;                      /* Number of separators entries to skip */
  int iOutputOff;                 /* Write offset on output page */
  Pgno iCurrentPtr;               /* Current pointer value */
  int bHierReadonly;              /* True if b-tree heirarchies are read-only */
};

/* 
** The first argument to this macro is a pointer to a Segment structure.
** Returns true if the structure instance indicates that the separators
** array is valid.
*/







<
<
<















<







369
370
371
372
373
374
375



376
377
378
379
380
381
382
383
384
385
386
387
388
389
390

391
392
393
394
395
396
397
** A structure describing an ongoing merge. There is an instance of this
** structure for every Level currently undergoing a merge in the worker
** snapshot.
**
** It is assumed that code that uses an instance of this structure has
** access to the associated Level struct.
**



** iOutputOff:
**   The byte offset to write to next within the last page of the 
**   output segment.
*/
struct MergeInput {
  Pgno iPg;                       /* Page on which next input is stored */
  int iCell;                      /* Cell containing next input to merge */
};
struct Merge {
  int nInput;                     /* Number of input runs being merged */
  MergeInput *aInput;             /* Array nInput entries in size */
  MergeInput splitkey;            /* Location in file of current splitkey */
  int nSkip;                      /* Number of separators entries to skip */
  int iOutputOff;                 /* Write offset on output page */
  Pgno iCurrentPtr;               /* Current pointer value */

};

/* 
** The first argument to this macro is a pointer to a Segment structure.
** Returns true if the structure instance indicates that the separators
** array is valid.
*/
Changes to src/lsm_ckpt.c.
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
  if( !pMerge ) return LSM_NOMEM_BKPT;
  pLevel->pMerge = pMerge;

  /* Populate the Merge object. */
  pMerge->aInput = (MergeInput *)&pMerge[1];
  pMerge->nInput = nInput;
  pMerge->iOutputOff = -1;
  pMerge->bHierReadonly = 1;
  pMerge->nSkip = (int)aInt[iIn++];
  for(i=0; i<nInput; i++){
    pMerge->aInput[i].iPg = (Pgno)aInt[iIn++];
    pMerge->aInput[i].iCell = (int)aInt[iIn++];
  }
  pMerge->splitkey.iPg = (Pgno)aInt[iIn++];
  pMerge->splitkey.iCell = (int)aInt[iIn++];







<







491
492
493
494
495
496
497

498
499
500
501
502
503
504
  if( !pMerge ) return LSM_NOMEM_BKPT;
  pLevel->pMerge = pMerge;

  /* Populate the Merge object. */
  pMerge->aInput = (MergeInput *)&pMerge[1];
  pMerge->nInput = nInput;
  pMerge->iOutputOff = -1;

  pMerge->nSkip = (int)aInt[iIn++];
  for(i=0; i<nInput; i++){
    pMerge->aInput[i].iPg = (Pgno)aInt[iIn++];
    pMerge->aInput[i].iCell = (int)aInt[iIn++];
  }
  pMerge->splitkey.iPg = (Pgno)aInt[iIn++];
  pMerge->splitkey.iCell = (int)aInt[iIn++];
Changes to src/lsm_file.c.
589
590
591
592
593
594
595

596
597
598
599
600
601
602
  return pPage->aData;
}

/*
** Return the page number of a page.
*/
Pgno lsmFsPageNumber(Page *pPage){

  return pPage ? pPage->iPg : 0;
}

/*
** Page pPg is currently part of the LRU list belonging to pFS. Remove
** it from the list. pPg->pLruNext and pPg->pLruPrev are cleared by this
** operation.







>







589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
  return pPage->aData;
}

/*
** Return the page number of a page.
*/
Pgno lsmFsPageNumber(Page *pPage){
  assert( (pPage->flags & PAGE_DIRTY)==0 );
  return pPage ? pPage->iPg : 0;
}

/*
** Page pPg is currently part of the LRU list belonging to pFS. Remove
** it from the list. pPg->pLruNext and pPg->pLruPrev are cleared by this
** operation.
Changes to src/lsm_sorted.c.
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918
2919
  MergeWorker *pMW,               /* Merge worker */
  int bSep                        /* True for separators run */
){
  Segment *pSeg;                  /* Segment being written */
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  int rc = LSM_OK;                /* Return code */
  int i;
  Pgno iRight = 0;
  Page **apHier = pMW->hier.apHier;
  int nHier = pMW->hier.nHier;

  assert( nHier>0 && pMW->pLevel->pMerge->bHierReadonly );
  pSeg = &pMW->pLevel->lhs;

  for(i=0; rc==LSM_OK && i<nHier; i++){
    Page *pNew = 0;
    rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, &pNew);
    assert( rc==LSM_OK );








<



<







2901
2902
2903
2904
2905
2906
2907

2908
2909
2910

2911
2912
2913
2914
2915
2916
2917
  MergeWorker *pMW,               /* Merge worker */
  int bSep                        /* True for separators run */
){
  Segment *pSeg;                  /* Segment being written */
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  int rc = LSM_OK;                /* Return code */
  int i;

  Page **apHier = pMW->hier.apHier;
  int nHier = pMW->hier.nHier;


  pSeg = &pMW->pLevel->lhs;

  for(i=0; rc==LSM_OK && i<nHier; i++){
    Page *pNew = 0;
    rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, &pNew);
    assert( rc==LSM_OK );

2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
        ** since sometimes n1>n2, the page content and footer must be copied 
        ** separately. */
        int nEntry = pageGetNRec(a2, n2);
        int iEof1 = SEGMENT_EOF(n1, nEntry);
        int iEof2 = SEGMENT_EOF(n2, nEntry);
        memcpy(a1, a2, iEof2);
        memcpy(&a1[iEof1], &a2[iEof2], n2 - iEof2);
        if( iRight ) lsmPutU64(&a1[SEGMENT_POINTER_OFFSET(n1)], iRight);
        lsmFsPageRelease(apHier[i]);
        apHier[i] = pNew;
        iRight = lsmFsPageNumber(pNew);
      }else{
        lsmPutU16(&a1[SEGMENT_FLAGS_OFFSET(n1)], SEGMENT_BTREE_FLAG);
        lsmPutU16(&a1[SEGMENT_NRECORD_OFFSET(n1)], 0);
        lsmPutU64(&a1[SEGMENT_POINTER_OFFSET(n1)], 0);
        i = i - 1;
        lsmFsPageRelease(pNew);
      }
    }
  }

#ifdef LSM_DEBUG
  if( rc==LSM_OK ){
    for(i=0; i<nHier; i++) assert( lsmFsPageWritable(apHier[i]) );
  }
#endif

  if( rc==LSM_OK ){
    pMW->pLevel->pMerge->bHierReadonly = 0;
  }
  return rc;
}

/*
** Allocate and populate the MergeWorker.apHier[] array.
*/
static int mergeWorkerLoadHierarchy(MergeWorker *pMW){







<


<
















<
<
<







2930
2931
2932
2933
2934
2935
2936

2937
2938

2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954



2955
2956
2957
2958
2959
2960
2961
        ** since sometimes n1>n2, the page content and footer must be copied 
        ** separately. */
        int nEntry = pageGetNRec(a2, n2);
        int iEof1 = SEGMENT_EOF(n1, nEntry);
        int iEof2 = SEGMENT_EOF(n2, nEntry);
        memcpy(a1, a2, iEof2);
        memcpy(&a1[iEof1], &a2[iEof2], n2 - iEof2);

        lsmFsPageRelease(apHier[i]);
        apHier[i] = pNew;

      }else{
        lsmPutU16(&a1[SEGMENT_FLAGS_OFFSET(n1)], SEGMENT_BTREE_FLAG);
        lsmPutU16(&a1[SEGMENT_NRECORD_OFFSET(n1)], 0);
        lsmPutU64(&a1[SEGMENT_POINTER_OFFSET(n1)], 0);
        i = i - 1;
        lsmFsPageRelease(pNew);
      }
    }
  }

#ifdef LSM_DEBUG
  if( rc==LSM_OK ){
    for(i=0; i<nHier; i++) assert( lsmFsPageWritable(apHier[i]) );
  }
#endif




  return rc;
}

/*
** Allocate and populate the MergeWorker.apHier[] array.
*/
static int mergeWorkerLoadHierarchy(MergeWorker *pMW){
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
  if( p->apHier==0 && pSeg->iRoot!=0 ){
    FileSystem *pFS = pMW->pDb->pFS;
    lsm_env *pEnv = pMW->pDb->pEnv;
    Page **apHier = 0;
    int nHier = 0;
    int iPg = pSeg->iRoot;

    assert( pMW->pLevel->pMerge->bHierReadonly );

    do {
      Page *pPg = 0;
      u8 *aData;
      int nData;
      int flags;

      rc = lsmFsDbPageGet(pFS, iPg, &pPg);







<
<







2969
2970
2971
2972
2973
2974
2975


2976
2977
2978
2979
2980
2981
2982
  if( p->apHier==0 && pSeg->iRoot!=0 ){
    FileSystem *pFS = pMW->pDb->pFS;
    lsm_env *pEnv = pMW->pDb->pEnv;
    Page **apHier = 0;
    int nHier = 0;
    int iPg = pSeg->iRoot;



    do {
      Page *pPg = 0;
      u8 *aData;
      int nData;
      int flags;

      rc = lsmFsDbPageGet(pFS, iPg, &pPg);
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
3081
3082
3083
3084
3085
**         + Absolute page number of page containing key (varint).
**
** See function seekInBtree() for the code that traverses b-tree pages.
*/

static int mergeWorkerBtreeWrite(
  MergeWorker *pMW,
  u8 eType, 
  Pgno iPtr,
  Pgno iKeyPg,
  void *pKey,
  int nKey
){
  Segment *pSeg = &pMW->pLevel->lhs;
  Hierarchy *p = &pMW->hier;







|







3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
3076
**         + Absolute page number of page containing key (varint).
**
** See function seekInBtree() for the code that traverses b-tree pages.
*/

static int mergeWorkerBtreeWrite(
  MergeWorker *pMW,
  u8 eType,
  Pgno iPtr,
  Pgno iKeyPg,
  void *pKey,
  int nKey
){
  Segment *pSeg = &pMW->pLevel->lhs;
  Hierarchy *p = &pMW->hier;
3098
3099
3100
3101
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
3123
3124
3125

3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
3139
3140
  /* The MergeWorker.apHier[] array contains the right-most leaf of the b-tree
  ** hierarchy, the root node, and all nodes that lie on the path between.
  ** apHier[0] is the right-most leaf and apHier[pMW->nHier-1] is the current
  ** root page.
  **
  ** This loop searches for a node with enough space to store the key on,
  ** starting with the leaf and iterating up towards the root. When the loop
  ** exits, the key may be written to apHier[iLevel].
  */
  for(iLevel=0; iLevel<=p->nHier; iLevel++){
    int nByte;                    /* Number of free bytes required */
    Pgno iRight;                  /* Right hand pointer from aData[]/nData */

    if( iLevel==p->nHier ){
      /* Extend the array and allocate a new root page. */
      Page **aNew;
      aNew = (Page **)lsmRealloc(
          pMW->pDb->pEnv, p->apHier, sizeof(Page *)*(p->nHier+1)
      );
      if( !aNew ){
        return LSM_NOMEM_BKPT;
      }
      p->apHier = aNew;
    }else{
      Page *pOld;
      int nFree;

      /* If the key will fit on this page, break out of the loop. */

      pOld = p->apHier[iLevel];
      assert( lsmFsPageWritable(pOld) );
      aData = fsPageData(pOld, &nData);
      iRight = lsmGetU64(&aData[SEGMENT_POINTER_OFFSET(nData)]);
      if( eType==0 ){
        nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(iKeyPg);
      }else{
        nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(nKey) + nKey;
      }
      nRec = pageGetNRec(aData, nData);
      nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData);
      if( nByte<=nFree ) break;

      /* Otherwise, this page is full. Set the right-hand-child pointer
      ** to iPtr and release it.  */







|
<


<















|
>



<

|

|







3089
3090
3091
3092
3093
3094
3095
3096

3097
3098

3099
3100
3101
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118

3119
3120
3121
3122
3123
3124
3125
3126
3127
3128
3129
  /* The MergeWorker.apHier[] array contains the right-most leaf of the b-tree
  ** hierarchy, the root node, and all nodes that lie on the path between.
  ** apHier[0] is the right-most leaf and apHier[pMW->nHier-1] is the current
  ** root page.
  **
  ** This loop searches for a node with enough space to store the key on,
  ** starting with the leaf and iterating up towards the root. When the loop
  ** exits, the key may be written to apHier[iLevel].  */

  for(iLevel=0; iLevel<=p->nHier; iLevel++){
    int nByte;                    /* Number of free bytes required */


    if( iLevel==p->nHier ){
      /* Extend the array and allocate a new root page. */
      Page **aNew;
      aNew = (Page **)lsmRealloc(
          pMW->pDb->pEnv, p->apHier, sizeof(Page *)*(p->nHier+1)
      );
      if( !aNew ){
        return LSM_NOMEM_BKPT;
      }
      p->apHier = aNew;
    }else{
      Page *pOld;
      int nFree;

      /* If the key will fit on this page, break out of the loop here.
      ** The new entry will be written to page apHier[iLevel]. */
      pOld = p->apHier[iLevel];
      assert( lsmFsPageWritable(pOld) );
      aData = fsPageData(pOld, &nData);

      if( eType==0 ){
        nByte = 2 + 1 + lsmVarintLen32(iPtr) + lsmVarintLen32(iKeyPg);
      }else{
        nByte = 2 + 1 + lsmVarintLen32(iPtr) + lsmVarintLen32(nKey) + nKey;
      }
      nRec = pageGetNRec(aData, nData);
      nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData);
      if( nByte<=nFree ) break;

      /* Otherwise, this page is full. Set the right-hand-child pointer
      ** to iPtr and release it.  */
3155
3156
3157
3158
3159
3160
3161
3162
3163
3164
3165
3166
3167
3168
3169
3170
3171
3172
    }
    if( rc!=LSM_OK ) return rc;

    aData = fsPageData(p->apHier[iLevel], &nData);
    memset(aData, 0, nData);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
    if( iLevel>0 ){
      iRight = lsmFsPageNumber(p->apHier[iLevel-1]);
      lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight);
    }

    if( iLevel==p->nHier ){
      p->nHier++;
      break;
    }
  }








<
<
<
<







3144
3145
3146
3147
3148
3149
3150




3151
3152
3153
3154
3155
3156
3157
    }
    if( rc!=LSM_OK ) return rc;

    aData = fsPageData(p->apHier[iLevel], &nData);
    memset(aData, 0, nData);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);





    if( iLevel==p->nHier ){
      p->nHier++;
      break;
    }
  }

3183
3184
3185
3186
3187
3188
3189
3190
3191
3192
3193
3194
3195
3196
3197
3198
3199
3200
3201
  }else{
    aData[iOff++] = eType;
    iOff += lsmVarintPut32(&aData[iOff], iPtr);
    iOff += lsmVarintPut32(&aData[iOff], nKey);
    memcpy(&aData[iOff], pKey, nKey);
  }

  if( iLevel>0 ){
    Pgno iRight = lsmFsPageNumber(p->apHier[iLevel-1]);
    lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight);
  }

  return rc;
}

static int mergeWorkerBtreeIndirect(MergeWorker *pMW){
  int rc = LSM_OK;
  if( pMW->iIndirect ){
    Pgno iKeyPg = pMW->aSave[1].iPgno;







<
<
<
<
<







3168
3169
3170
3171
3172
3173
3174





3175
3176
3177
3178
3179
3180
3181
  }else{
    aData[iOff++] = eType;
    iOff += lsmVarintPut32(&aData[iOff], iPtr);
    iOff += lsmVarintPut32(&aData[iOff], nKey);
    memcpy(&aData[iOff], pKey, nKey);
  }






  return rc;
}

static int mergeWorkerBtreeIndirect(MergeWorker *pMW){
  int rc = LSM_OK;
  if( pMW->iIndirect ){
    Pgno iKeyPg = pMW->aSave[1].iPgno;
3222
3223
3224
3225
3226
3227
3228
3229
3230
3231
3232
3233
3234
3235
3236
3237
3238
3239
3240
3241
3242
3243
3244
3245
3246
3247
3248
3249
3250
3251
3252
3253
3254
3255
3256
3257
3258
3259
3260
3261
3262
3263
3264
3265
3266
3267
3268
3269
3270
3271
3272
3273
3274
3275




3276
3277
3278
3279



3280








3281
3282
3283
3284
3285
3286
3287
3288


























3289
3290
3291
3292
3293
3294
3295
  int rc = LSM_OK;                /* Return Code */
  int iLevel;                     /* Level of b-tree hierachy to write to */
  int nData;                      /* Size of aData[] in bytes */
  u8 *aData;                      /* Page data for level iLevel */
  int iOff;                       /* Offset on b-tree page to write record to */
  int nRec;                       /* Initial number of records on b-tree page */
  Pgno iPtr;                      /* Pointer value to accompany pKey/nKey */
  int bIndirect;                  /* True to use an indirect record */

  Hierarchy *p;
  Segment *pSeg;

  /* If there exists a b-tree hierarchy and it is not loaded into 
  ** memory, load it now.  */
  pSeg = &pMW->pLevel->lhs;
  p = &pMW->hier;

  assert( pMW->aSave[0].bStore==0 );
  assert( pMW->aSave[1].bStore==0 );
  rc = mergeWorkerBtreeIndirect(pMW);

  /* Obtain the absolute pointer value to store along with the key in the
  ** page body. This pointer points to a page that contains keys that are
  ** smaller than pKey/nKey.  */
  iPtr = pMW->aSave[0].iPgno;
  assert( iPtr!=0 );

  /* Determine if the indirect format should be used. */
  bIndirect = (nKey*4 > lsmFsPageSize(pMW->pDb->pFS));
  if( bIndirect ){
    pMW->iIndirect = iPtr;
    pMW->aSave[1].bStore = 1;
  }else{
    rc = mergeWorkerBtreeWrite(
        pMW, (u8)(iTopic | LSM_SEPARATOR), iPtr, 0, pKey, nKey
    );
  }

  /* Ensure that the SortedRun.iRoot field is correct. */
  return rc;
}

static int mergeWorkerFinishHierarchy(
  MergeWorker *pMW                /* Merge worker object */
){
  if( pMW->hier.nHier>0 ){
    Page *pPg = pMW->hier.apHier[0];
    int nData;                      /* Size of aData[] in bytes */
    u8 *aData;                      /* Page data for pLeaf */
    Pgno iPtr;

    assert( pPg );
    assert( pMW->aSave[0].bStore==0 );
    iPtr = pMW->aSave[0].iPgno;





    aData = fsPageData(pPg, &nData);
    lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iPtr);
  }












  return LSM_OK;
}

static int keyszToSkip(FileSystem *pFS, int nKey){
  int nPgsz;                /* Nominal database page size */
  nPgsz = lsmFsPageSize(pFS);
  return LSM_MIN(((nKey * 4) / nPgsz), 3);
}



























/*
** Advance to the next page of an output run being populated by merge-worker
** pMW. The footer of the new page is initialized to indicate that it contains
** zero records. The flags field is cleared. The page footer pointer field
** is set to iFPtr.
**







<




















|
<















<
<
|
|
|

<
<
|
>
>
>
>



|
>
>
>
|
>
>
>
>
>
>
>
>
|







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







3202
3203
3204
3205
3206
3207
3208

3209
3210
3211
3212
3213
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224
3225
3226
3227
3228
3229

3230
3231
3232
3233
3234
3235
3236
3237
3238
3239
3240
3241
3242
3243
3244


3245
3246
3247
3248


3249
3250
3251
3252
3253
3254
3255
3256
3257
3258
3259
3260
3261
3262
3263
3264
3265
3266
3267
3268
3269
3270
3271
3272
3273
3274
3275
3276
3277
3278
3279
3280
3281
3282
3283
3284
3285
3286
3287
3288
3289
3290
3291
3292
3293
3294
3295
3296
3297
3298
3299
3300
3301
3302
3303
3304
3305
3306
3307
3308
3309
3310
  int rc = LSM_OK;                /* Return Code */
  int iLevel;                     /* Level of b-tree hierachy to write to */
  int nData;                      /* Size of aData[] in bytes */
  u8 *aData;                      /* Page data for level iLevel */
  int iOff;                       /* Offset on b-tree page to write record to */
  int nRec;                       /* Initial number of records on b-tree page */
  Pgno iPtr;                      /* Pointer value to accompany pKey/nKey */


  Hierarchy *p;
  Segment *pSeg;

  /* If there exists a b-tree hierarchy and it is not loaded into 
  ** memory, load it now.  */
  pSeg = &pMW->pLevel->lhs;
  p = &pMW->hier;

  assert( pMW->aSave[0].bStore==0 );
  assert( pMW->aSave[1].bStore==0 );
  rc = mergeWorkerBtreeIndirect(pMW);

  /* Obtain the absolute pointer value to store along with the key in the
  ** page body. This pointer points to a page that contains keys that are
  ** smaller than pKey/nKey.  */
  iPtr = pMW->aSave[0].iPgno;
  assert( iPtr!=0 );

  /* Determine if the indirect format should be used. */
  if( (nKey*4 > lsmFsPageSize(pMW->pDb->pFS)) ){

    pMW->iIndirect = iPtr;
    pMW->aSave[1].bStore = 1;
  }else{
    rc = mergeWorkerBtreeWrite(
        pMW, (u8)(iTopic | LSM_SEPARATOR), iPtr, 0, pKey, nKey
    );
  }

  /* Ensure that the SortedRun.iRoot field is correct. */
  return rc;
}

static int mergeWorkerFinishHierarchy(
  MergeWorker *pMW                /* Merge worker object */
){


  int i;                          /* Used to loop through apHier[] */
  int rc = LSM_OK;                /* Return code */
  Pgno iPtr;                      /* New right-hand-child pointer value */



  iPtr = pMW->aSave[0].iPgno;
  for(i=0; i<pMW->hier.nHier && rc==LSM_OK; i++){
    Page *pPg = pMW->hier.apHier[i];
    int nData;                    /* Size of aData[] in bytes */
    u8 *aData;                    /* Page data for pPg */

    aData = fsPageData(pPg, &nData);
    lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iPtr);

    rc = lsmFsPagePersist(pPg);
    iPtr = lsmFsPageNumber(pPg);
    lsmFsPageRelease(pPg);
  }

  if( pMW->hier.nHier ){
    pMW->pLevel->lhs.iRoot = iPtr;
    lsmFree(pMW->pDb->pEnv, pMW->hier.apHier);
    pMW->hier.apHier = 0;
    pMW->hier.nHier = 0;
  }

  return rc;
}

static int keyszToSkip(FileSystem *pFS, int nKey){
  int nPgsz;                /* Nominal database page size */
  nPgsz = lsmFsPageSize(pFS);
  return LSM_MIN(((nKey * 4) / nPgsz), 3);
}

/*
** Release the reference to the current output page of merge-worker *pMW
** (reference pMW->pPage). Set the page number values in aSave[] as 
** required (see comments above struct MergeWorker for details).
*/
static int mergeWorkerPersistAndRelease(MergeWorker *pMW){
  int rc;
  int i;

  /* Persist the page */
  rc = lsmFsPagePersist(pMW->pPage);

  /* If required, save the page number. */
  for(i=0; i<2; i++){
    if( pMW->aSave[i].bStore ){
      pMW->aSave[i].iPgno = lsmFsPageNumber(pMW->pPage);
      pMW->aSave[i].bStore = 0;
    }
  }

  /* Release the completed output page. */
  lsmFsPageRelease(pMW->pPage);
  pMW->pPage = 0;
  return rc;
}

/*
** Advance to the next page of an output run being populated by merge-worker
** pMW. The footer of the new page is initialized to indicate that it contains
** zero records. The flags field is cleared. The page footer pointer field
** is set to iFPtr.
**
3307
3308
3309
3310
3311
3312
3313
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
3324
3325
3326
3327
3328
3329
3330
3331
3332
3333
3334
3335
3336
3337
3338
3339
3340
  pSeg = &pMW->pLevel->lhs;
  rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, &pNext);
  assert( rc!=LSM_OK || pSeg->iFirst>0 );

  if( rc==LSM_OK ){
    u8 *aData;                    /* Data buffer belonging to page pNext */
    int nData;                    /* Size of aData[] in bytes */
    int i;

    lsmFsPagePersist(pMW->pPage);
    for(i=0; i<2; i++){
      if( pMW->aSave[i].bStore ){
        pMW->aSave[i].iPgno = lsmFsPageNumber(pMW->pPage);
        pMW->aSave[i].bStore = 0;
      }
    }

    /* Release the completed output page. */
    lsmFsPageRelease(pMW->pPage);

    pMW->pPage = pNext;
    pMW->pLevel->pMerge->iOutputOff = 0;
    aData = fsPageData(pNext, &nData);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0);
    lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr);

    pMW->nWork++;
  }

  return rc;
}

/*







<

<
<
<
<
<
<
<
|
<
<







<







3322
3323
3324
3325
3326
3327
3328

3329







3330


3331
3332
3333
3334
3335
3336
3337

3338
3339
3340
3341
3342
3343
3344
  pSeg = &pMW->pLevel->lhs;
  rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, &pNext);
  assert( rc!=LSM_OK || pSeg->iFirst>0 );

  if( rc==LSM_OK ){
    u8 *aData;                    /* Data buffer belonging to page pNext */
    int nData;                    /* Size of aData[] in bytes */









    rc = mergeWorkerPersistAndRelease(pMW);



    pMW->pPage = pNext;
    pMW->pLevel->pMerge->iOutputOff = 0;
    aData = fsPageData(pNext, &nData);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0);
    lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr);

    pMW->nWork++;
  }

  return rc;
}

/*
3565
3566
3567
3568
3569
3570
3571
3572
3573
3574
3575
3576
3577
3578
3579
3580
3581
3582
3583
3584
3585
3586
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
3597
3598
3599
3600
3601
3602
3603
3604
3605
3606
3607
3608
3609


3610
3611
3612
3613
3614
3615
3616
    if( iPtr<pCsr->nPtr ){
      pMerge->splitkey = pMerge->aInput[iPtr];
    }else{
      btreeCursorSplitkey(pCsr->pBtCsr, &pMerge->splitkey);
    }
    
    pMerge->iOutputOff = -1;
    pMerge->bHierReadonly = 1;
  }

  lsmMCursorClose(pCsr);

  /* Persist and release the output page. */
  rc = lsmFsPagePersist(pMW->pPage);
  for(i=0; i<2; i++){
    if( pMW->aSave[i].bStore ){
      pMW->aSave[i].iPgno = lsmFsPageNumber(pMW->pPage);
      pMW->aSave[i].bStore = 0;
    }
  }
  lsmFsPageRelease(pMW->pPage);

  if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW);
  if( rc==LSM_OK && p->nHier ){
    pMW->pLevel->lhs.iRoot = lsmFsPageNumber(p->apHier[p->nHier-1]);
  }

  for(i=0; i<2; i++){
    Hierarchy *p = &pMW->hier;
    int iPg;
    for(iPg=0; iPg<p->nHier; iPg++){
      int rc2 = lsmFsPageRelease(p->apHier[iPg]);
      if( rc==LSM_OK ) rc = rc2;
    }
    lsmFree(pMW->pDb->pEnv, p->apHier);
    p->apHier = 0;
    p->nHier = 0;
  }

  lsmFree(pMW->pDb->pEnv, pMW->aGobble);
  pMW->aGobble = 0;
  pMW->pCsr = 0;
  pMW->pPage = 0;
  pMW->pPage = 0;


}

/*
** The MergeWorker passed as the only argument is working to merge two or
** more existing segments together (not to flush an in-memory tree). It
** has not yet written the first key to the first page of the output.
*/







<





<
<
<
<
<
<
<
<
<
|
|
<
<
<
<
<
<
<
<
<
|
<
<
<
<
<






>
>







3569
3570
3571
3572
3573
3574
3575

3576
3577
3578
3579
3580









3581
3582









3583





3584
3585
3586
3587
3588
3589
3590
3591
3592
3593
3594
3595
3596
3597
3598
    if( iPtr<pCsr->nPtr ){
      pMerge->splitkey = pMerge->aInput[iPtr];
    }else{
      btreeCursorSplitkey(pCsr->pBtCsr, &pMerge->splitkey);
    }
    
    pMerge->iOutputOff = -1;

  }

  lsmMCursorClose(pCsr);

  /* Persist and release the output page. */









  if( rc==LSM_OK ) rc = mergeWorkerPersistAndRelease(pMW);
  if( rc==LSM_OK ) rc = mergeWorkerBtreeIndirect(pMW);









  if( rc==LSM_OK ) rc = mergeWorkerFinishHierarchy(pMW);






  lsmFree(pMW->pDb->pEnv, pMW->aGobble);
  pMW->aGobble = 0;
  pMW->pCsr = 0;
  pMW->pPage = 0;
  pMW->pPage = 0;

  *pRc = rc;
}

/*
** The MergeWorker passed as the only argument is working to merge two or
** more existing segments together (not to flush an in-memory tree). It
** has not yet written the first key to the first page of the output.
*/