SQLite4
Check-in [5d266a717d]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Change page numbers to 8-byte numbers (from 4). This is required to support compressed databases, where a page number is a byte offset in the database file.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | compression-hooks
Files: files | file ages | folders
SHA1: 5d266a717da6db072c1a21b11fbc7029ca9e55a0
User & Date: dan 2012-10-16 15:26:52
Context
2012-10-17
11:31
Remove the lsmFsPageWrite() function. So that pages can only be written immediately after they are created - not loaded from the database and then made writable. check-in: 29bd2611a6 user: dan tags: compression-hooks
2012-10-16
15:26
Change page numbers to 8-byte numbers (from 4). This is required to support compressed databases, where a page number is a byte offset in the database file. check-in: 5d266a717d user: dan tags: compression-hooks
2012-10-15
19:36
Merge range-delete branch back into trunk. check-in: a7de625f13 user: dan tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/lsm.h.

29
30
31
32
33
34
35

36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
..
76
77
78
79
80
81
82

















83
84
85
86
87
88
89
...
188
189
190
191
192
193
194





195
196
197
198
199
200
201
typedef struct lsm_file lsm_file;           /* OS file handle */

/* 64-bit integer type used for file offsets. */
typedef long long int lsm_i64;              /* 64-bit signed integer type */

/* Forward reference */
typedef struct lsm_env lsm_env;             /* Runtime environment */


/* Candidate values for the 3rd argument to lsm_env.xLock() */
#define LSM_LOCK_UNLOCK 0
#define LSM_LOCK_SHARED 1
#define LSM_LOCK_EXCL   2

/*
** Run-time environment used by LSM
*/
struct lsm_env {
  int nByte;                 /* Size of this structure in bytes */
  int iVersion;              /* Version number of this structure */
  /****** file i/o ***********************************************/
  void *pVfsCtx;
  int (*xFullpath)(lsm_env*, const char *, char *, int *);
  int (*xOpen)(lsm_env*, const char *, lsm_file **);
  int (*xRead)(lsm_file *, lsm_i64, void *, int);
  int (*xWrite)(lsm_file *, lsm_i64, void *, int);
  int (*xTruncate)(lsm_file *, lsm_i64);
................................................................................
  void (*xMutexEnter)(lsm_mutex *);         /* Grab a mutex */
  int (*xMutexTry)(lsm_mutex *);            /* Attempt to obtain a mutex */
  void (*xMutexLeave)(lsm_mutex *);         /* Leave a mutex */
  int (*xMutexHeld)(lsm_mutex *);           /* Return true if mutex is held */
  int (*xMutexNotHeld)(lsm_mutex *);        /* Return true if mutex not held */
  /****** other ****************************************************/
  int (*xSleep)(lsm_env*, int microseconds);


















  /* New fields may be added in future releases, in which case the
  ** iVersion value will increase. */
};

/* 
** Values that may be passed as the second argument to xMutexStatic. 
................................................................................
**     stored elsewhere in the database).
**
**     There is no reason for an application to configure or query this
**     parameter. It is only present because configuring a small value
**     makes certain parts of the lsm code easier to test.
**
**   LSM_CONFIG_MULTIPLE_PROCESSES





*/
#define LSM_CONFIG_WRITE_BUFFER        1
#define LSM_CONFIG_PAGE_SIZE           2
#define LSM_CONFIG_SAFETY              3
#define LSM_CONFIG_BLOCK_SIZE          4
#define LSM_CONFIG_AUTOWORK            5
#define LSM_CONFIG_LOG_SIZE            6







>











|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>
>
>
>







29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
..
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
...
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
typedef struct lsm_file lsm_file;           /* OS file handle */

/* 64-bit integer type used for file offsets. */
typedef long long int lsm_i64;              /* 64-bit signed integer type */

/* Forward reference */
typedef struct lsm_env lsm_env;             /* Runtime environment */
typedef struct lsm_compress lsm_compress;   /* Compression library functions */

/* Candidate values for the 3rd argument to lsm_env.xLock() */
#define LSM_LOCK_UNLOCK 0
#define LSM_LOCK_SHARED 1
#define LSM_LOCK_EXCL   2

/*
** Run-time environment used by LSM
*/
struct lsm_env {
  int nByte;                 /* Size of this structure in bytes */
  int iVersion;              /* Version number of this structure (1) */
  /****** file i/o ***********************************************/
  void *pVfsCtx;
  int (*xFullpath)(lsm_env*, const char *, char *, int *);
  int (*xOpen)(lsm_env*, const char *, lsm_file **);
  int (*xRead)(lsm_file *, lsm_i64, void *, int);
  int (*xWrite)(lsm_file *, lsm_i64, void *, int);
  int (*xTruncate)(lsm_file *, lsm_i64);
................................................................................
  void (*xMutexEnter)(lsm_mutex *);         /* Grab a mutex */
  int (*xMutexTry)(lsm_mutex *);            /* Attempt to obtain a mutex */
  void (*xMutexLeave)(lsm_mutex *);         /* Leave a mutex */
  int (*xMutexHeld)(lsm_mutex *);           /* Return true if mutex is held */
  int (*xMutexNotHeld)(lsm_mutex *);        /* Return true if mutex not held */
  /****** other ****************************************************/
  int (*xSleep)(lsm_env*, int microseconds);

  /* New fields may be added in future releases, in which case the
  ** iVersion value will increase. */
};

/*
** The compression library interface.
*/
struct lsm_compress {
  int nByte;                 /* Size of this structure in bytes */
  int iVersion;              /* Version number of this structure (1) */

  /* Compression library functions */
  void *pCtx;
  int (*xBound)(void *, int nSrc);
  int (*xCompress)(void *, char *, int *, const char *, int);
  int (*xUncompress)(void *, char *, int *, const char *, int);

  /* New fields may be added in future releases, in which case the
  ** iVersion value will increase. */
};

/* 
** Values that may be passed as the second argument to xMutexStatic. 
................................................................................
**     stored elsewhere in the database).
**
**     There is no reason for an application to configure or query this
**     parameter. It is only present because configuring a small value
**     makes certain parts of the lsm code easier to test.
**
**   LSM_CONFIG_MULTIPLE_PROCESSES
**     A read/write boolean parameter. This parameter may only be set before
**     lsm_open() has been called. If true, the library uses shared-memory
**     and posix advisory locks to co-ordinate access by clients from within
**     multiple processes. Otherwise, if false, all database clients must be 
**     located in the same process. The default value is true.
*/
#define LSM_CONFIG_WRITE_BUFFER        1
#define LSM_CONFIG_PAGE_SIZE           2
#define LSM_CONFIG_SAFETY              3
#define LSM_CONFIG_BLOCK_SIZE          4
#define LSM_CONFIG_AUTOWORK            5
#define LSM_CONFIG_LOG_SIZE            6

Changes to src/lsmInt.h.

99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114

typedef unsigned char u8;
typedef unsigned short int u16;
typedef unsigned int u32;
typedef lsm_i64 i64;
typedef unsigned long long int u64;

/* A page number is an integer. */
typedef int Pgno;

#ifdef LSM_DEBUG
int lsmErrorBkpt(int);
#else
# define lsmErrorBkpt(x) (x)
#endif








|
|







99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114

typedef unsigned char u8;
typedef unsigned short int u16;
typedef unsigned int u32;
typedef lsm_i64 i64;
typedef unsigned long long int u64;

/* A page number is a 64-bit integer. */
typedef i64 Pgno;

#ifdef LSM_DEBUG
int lsmErrorBkpt(int);
#else
# define lsmErrorBkpt(x) (x)
#endif

Changes to src/lsm_file.c.

1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
  }
  return rc;
}

/*
** Obtain a reference to page number iPg.
*/
int lsmFsDbPageGet(FileSystem *pFS, int iPg, Page **ppPg){
  assert( pFS );
  return fsPageGet(pFS, iPg, 0, ppPg);
}

/*
** Return a reference to meta-page iPg. If successful, LSM_OK is returned
** and *ppPg populated with the new page reference. The reference should







|







1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
  }
  return rc;
}

/*
** Obtain a reference to page number iPg.
*/
int lsmFsDbPageGet(FileSystem *pFS, Pgno iPg, Page **ppPg){
  assert( pFS );
  return fsPageGet(pFS, iPg, 0, ppPg);
}

/*
** Return a reference to meta-page iPg. If successful, LSM_OK is returned
** and *ppPg populated with the new page reference. The reference should

Changes to src/lsm_sorted.c.

25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
..
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
...
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
...
317
318
319
320
321
322
323






















324
325
326
327
328
329
330
...
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
...
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
...
523
524
525
526
527
528
529
530
531
532
533
534
535

536
537
538
539
540
541
542
543
544
545
546
547
548
...
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
...
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
...
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829

830
831
832
833
834
835
836
...
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
...
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
....
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
....
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
....
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
....
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
....
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
....
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
....
3056
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
....
3080
3081
3082
3083
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
....
3099
3100
3101
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
....
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
3139
3140
3141
3142
3143
....
3159
3160
3161
3162
3163
3164
3165
3166
3167
3168
3169
3170
3171
3172
3173
3174
3175
3176
3177
3178
3179
3180
....
3191
3192
3193
3194
3195
3196
3197
3198
3199
3200
3201
3202
3203
3204
3205
....
3213
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224
3225
3226
3227
....
4540
4541
4542
4543
4544
4545
4546
4547
4548
4549
4550
4551
4552
4553
4554
....
4600
4601
4602
4603
4604
4605
4606
4607
4608
4609
4610
4611
4612
4613
4614
**
**   The footer consists of the following values (starting at the end of
**   the page and continuing backwards towards the start). All values are
**   stored as unsigned big-endian integers.
**
**     * Number of records on page (2 bytes).
**     * Flags field (2 bytes).
**     * Left-hand pointer value (4 bytes).
**     * The starting offset of each record (2 bytes per record).
**
**   Records may span pages. Unless it happens to be an exact fit, the part
**   of the final record that starts on page X that does not fit on page X
**   is stored at the start of page (X+1). This means there may be pages where
**   (N==0). And on most pages the first record that starts on the page will
**   not start at byte offset 0. For example:
................................................................................
#define rtIsSystem(eType)    (((eType) & LSM_SYSTEMKEY)!=0)

/*
** The following macros are used to access a page footer.
*/
#define SEGMENT_NRECORD_OFFSET(pgsz)        ((pgsz) - 2)
#define SEGMENT_FLAGS_OFFSET(pgsz)          ((pgsz) - 2 - 2)
#define SEGMENT_POINTER_OFFSET(pgsz)        ((pgsz) - 2 - 2 - 4)
#define SEGMENT_CELLPTR_OFFSET(pgsz, iCell) ((pgsz) - 2 - 2 - 4 - 2 - (iCell)*2)

#define SEGMENT_EOF(pgsz, nEntry) SEGMENT_CELLPTR_OFFSET(pgsz, nEntry)

#define SEGMENT_BTREE_FLAG     0x0001
#define PGFTR_SKIP_NEXT_FLAG   0x0002
#define PGFTR_SKIP_THIS_FLAG   0x0004

................................................................................
  Level *pLevel;                /* Level object segment is part of */
  Segment *pSeg;                /* Segment to access */

  /* Current page. See segmentPtrLoadPage(). */
  Page *pPg;                    /* Current page */
  u16 flags;                    /* Copy of page flags field */
  int nCell;                    /* Number of cells on pPg */
  int iPtr;                     /* Base cascade pointer */

  /* Current cell. See segmentPtrLoadCell() */
  int iCell;                    /* Current record within page pPg */
  int eType;                    /* Type of current record */
  int iPgPtr;                   /* Cascade pointer offset */
  void *pKey; int nKey;         /* Key associated with current record */
  void *pVal; int nVal;         /* Current record value (eType==WRITE only) */

  /* Blobs used to allocate buffers for pKey and pVal as required */
  Blob blob1;
  Blob blob2;
};
................................................................................

u32 lsmGetU32(u8 *aOut){
  return ((u32)aOut[0] << 24) 
       + ((u32)aOut[1] << 16) 
       + ((u32)aOut[2] << 8) 
       + ((u32)aOut[3]);
}























static int sortedBlobGrow(lsm_env *pEnv, Blob *pBlob, int nData){
  assert( pBlob->pEnv==pEnv || (pBlob->pEnv==0 && pBlob->pData==0) );
  if( pBlob->nAlloc<nData ){
    pBlob->pData = lsmReallocOrFree(pEnv, pBlob->pData, nData);
    if( !pBlob->pData ) return LSM_NOMEM;
    pBlob->nAlloc = nData;
................................................................................
  return rc;
}

static int pageGetNRec(u8 *aData, int nData){
  return (int)lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
}

static int pageGetPtr(u8 *aData, int nData){
  return (int)lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]);
}

static int pageGetFlags(u8 *aData, int nData){
  return (int)lsmGetU16(&aData[SEGMENT_FLAGS_OFFSET(nData)]);
}

static u8 *pageGetCell(u8 *aData, int nData, int iCell){
................................................................................
  return pageGetNRec(aData, nData);
}

/*
** Return the decoded (possibly relative) pointer value stored in cell 
** iCell from page aData/nData.
*/
static int pageGetRecordPtr(u8 *aData, int nData, int iCell){
  int iRet;                       /* Return value */
  u8 *aCell;                      /* Pointer to cell iCell */

  assert( iCell<pageGetNRec(aData, nData) && iCell>=0 );
  aCell = pageGetCell(aData, nData, iCell);
  lsmVarintGet32(&aCell[1], &iRet);
  return iRet;
}

static u8 *pageGetKey(
  Page *pPg,                      /* Page to read from */
  int iCell,                      /* Index of cell on page to read */
  int *piTopic,                   /* OUT: Topic associated with this key */
................................................................................
  int nData;
  u8 *aCell;

  aData = fsPageData(pPg, &nData);
  aCell = pageGetCell(aData, nData, iKey);
  assert( aCell[0]==0 );
  aCell++;
  aCell += lsmVarintGet32(aCell, &iRef);
  lsmVarintGet32(aCell, &iRef);
  assert( iRef>0 );
  return iRef;
}


#define GETVARINT32(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet32((a), &(i)))

static int pageGetBtreeKey(
  Page *pPg,
  int iKey, 
  int *piPtr, 
  int *piTopic, 
  void **ppKey,
  int *pnKey,
  Blob *pBlob
){
  u8 *aData;
  int nData;
................................................................................

  aData = fsPageData(pPg, &nData);
  assert( SEGMENT_BTREE_FLAG & pageGetFlags(aData, nData) );
  assert( iKey>=0 && iKey<pageGetNRec(aData, nData) );

  aCell = pageGetCell(aData, nData, iKey);
  eType = *aCell++;
  aCell += GETVARINT32(aCell, *piPtr);

  if( eType==0 ){
    int rc;
    Pgno iRef;                  /* Page number of referenced page */
    Page *pRef;
    aCell += GETVARINT32(aCell, iRef);
    rc = lsmFsDbPageGet(lsmPageFS(pPg), iRef, &pRef);
    if( rc!=LSM_OK ) return rc;
    pageGetKeyCopy(lsmPageEnv(pPg), pRef, 0, &eType, pBlob);
    lsmFsPageRelease(pRef);
    *ppKey = pBlob->pData;
    *pnKey = pBlob->nData;
  }else{
................................................................................
static int btreeCursorLoadKey(BtreeCursor *pCsr){
  int rc = LSM_OK;
  if( pCsr->iPg<0 ){
    pCsr->pKey = 0;
    pCsr->nKey = 0;
    pCsr->eType = 0;
  }else{
    int dummy;
    int iPg = pCsr->iPg;
    int iCell = pCsr->aPg[iPg].iCell;
    while( iCell<0 && (--iPg)>=0 ){
      iCell = pCsr->aPg[iPg].iCell-1;
    }
    if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT;

................................................................................

    /* Populate any other aPg[] array entries */
    if( rc==LSM_OK && nDepth>1 ){
      Blob blob = {0,0,0};
      void *pSeek;
      int nSeek;
      int iTopicSeek;
      int dummy;
      int iPg = 0;
      int iLoad = pCsr->pSeg->iRoot;
      Page *pPg = pCsr->aPg[nDepth-1].pPage;
 
      if( pageObjGetNRec(pPg)==0 ){
        /* This can happen when pPg is the right-most leaf in the b-tree.
        ** In this case, set the iTopicSeek/pSeek/nSeek key to a value
        ** greater than any real key.  */
        assert( iCell==-1 );
        iTopicSeek = 1000;
        pSeek = 0;
        nSeek = 0;
      }else{

        rc = pageGetBtreeKey(pPg,
            0, &dummy, &iTopicSeek, &pSeek, &nSeek, &pCsr->blob
        );
      }

      do {
        Page *pPg;
................................................................................
          iMax = iCell-1;
          iMin = 0;

          while( iMax>=iMin ){
            int iTry = (iMin+iMax)/2;
            void *pKey; int nKey;         /* Key for cell iTry */
            int iTopic;                   /* Topic for key pKeyT/nKeyT */
            int iPtr;                     /* Pointer for cell iTry */
            int res;                      /* (pSeek - pKeyT) */

            rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopic, &pKey, &nKey,&blob);
            if( rc!=LSM_OK ) break;

            res = sortedKeyCompare(
                xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey
................................................................................
      u8 *aData;
      int nData;

      pBtreePg = &pCsr->aPg[pCsr->iPg];
      aData = fsPageData(pBtreePg->pPage, &nData);
      pCsr->iPtr = btreeCursorPtr(aData, nData, pBtreePg->iCell+1);
      if( pBtreePg->iCell<0 ){
        int dummy;
        int i;
        for(i=pCsr->iPg-1; i>=0; i--){
          if( pCsr->aPg[i].iCell>0 ) break;
        }
        assert( i>=0 );
        rc = pageGetBtreeKey(
            pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1,
................................................................................

    assert( iNew<pPtr->nCell );
    pPtr->iCell = iNew;
    aData = fsPageData(pPtr->pPg, &nPgsz);
    iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nPgsz, pPtr->iCell)]);
    pPtr->eType = aData[iOff];
    iOff++;
    iOff += GETVARINT32(&aData[iOff], pPtr->iPgPtr);
    iOff += GETVARINT32(&aData[iOff], pPtr->nKey);
    if( rtIsWrite(pPtr->eType) ){
      iOff += GETVARINT32(&aData[iOff], pPtr->nVal);
    }
    assert( pPtr->nKey>=0 );

    rc = segmentPtrReadData(
................................................................................
    u8 *aData;
    int nData;
  
    aData = lsmFsPageData(pPg, &nData);
    if( pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG ){
      void *pKey;
      int nKey;
      int dummy;
      rc = pageGetBtreeKey(
          pPg, pMerge->splitkey.iCell, &dummy, &iTopic, &pKey, &nKey, &blob
      );
      if( rc==LSM_OK && blob.pData!=pKey ){
        rc = sortedBlobSet(pEnv, &blob, pKey, nKey);
      }
    }else{
................................................................................
  int *pbStop
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res;                        /* Result of comparison operation */
  int rc = LSM_OK;
  int iMin;
  int iMax;
  int iPtrOut = 0;
  const int iTopic = 0;

  /* If the current page contains an oversized entry, then there are no
  ** pointers to one or more of the subsequent pages in the sorted run.
  ** The following call ensures that the segment-ptr points to the correct 
  ** page in this case.  */
  rc = segmentPtrSearchOversized(pCsr, pPtr, pKey, nKey);
................................................................................

      iMin = 0;
      iMax = nRec-1;
      while( iMax>=iMin ){
        int iTry = (iMin+iMax)/2;
        void *pKeyT; int nKeyT;       /* Key for cell iTry */
        int iTopicT;                  /* Topic for key pKeyT/nKeyT */
        int iPtr;                     /* Pointer associated with cell iTry */
        int res;                      /* (pKey - pKeyT) */

        rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob);
        if( rc!=LSM_OK ) break;
        if( piFirst && pKeyT==blob.pData ){
          *piFirst = pageGetBtreeRef(pPg, iTry);
          piFirst = 0;
................................................................................
  MergeWorker *pMW,               /* Merge worker */
  int bSep                        /* True for separators run */
){
  Segment *pSeg;                  /* Segment being written */
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  int rc = LSM_OK;                /* Return code */
  int i;
  int iRight = 0;
  Page **apHier = pMW->hier.apHier;
  int nHier = pMW->hier.nHier;

  assert( nHier>0 && pMW->pLevel->pMerge->bHierReadonly );
  pSeg = &pMW->pLevel->lhs;

  for(i=0; rc==LSM_OK && i<nHier; i++){
................................................................................
        ** since sometimes n1>n2, the page content and footer must be copied 
        ** separately. */
        int nEntry = pageGetNRec(a2, n2);
        int iEof1 = SEGMENT_EOF(n1, nEntry);
        int iEof2 = SEGMENT_EOF(n2, nEntry);
        memcpy(a1, a2, iEof2);
        memcpy(&a1[iEof1], &a2[iEof2], n2 - iEof2);
        if( iRight ) lsmPutU32(&a1[SEGMENT_POINTER_OFFSET(n1)], iRight);
        lsmFsPageRelease(apHier[i]);
        apHier[i] = pNew;
        iRight = lsmFsPageNumber(pNew);
      }else{
        lsmPutU16(&a1[SEGMENT_FLAGS_OFFSET(n1)], SEGMENT_BTREE_FLAG);
        lsmPutU16(&a1[SEGMENT_NRECORD_OFFSET(n1)], 0);
        lsmPutU32(&a1[SEGMENT_POINTER_OFFSET(n1)], 0);
        i = i - 1;
        lsmFsPageRelease(pNew);
      }
    }
  }

#ifdef LSM_DEBUG
................................................................................
  rc = mergeWorkerLoadHierarchy(pMW);

  /* Obtain the absolute pointer value to store along with the key in the
  ** page body. This pointer points to a page that contains keys that are
  ** smaller than pKey/nKey.  */
  if( p->nHier ){
    aData = fsPageData(p->apHier[0], &nData);
    iPtr = lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]);
  }else{
    iPtr = pSeg->iFirst;
  }

  if( p->nHier && pMW->pLevel->pMerge->bHierReadonly ){
    rc = mergeWorkerMoveHierarchy(pMW, bSep);
    if( rc!=LSM_OK ) goto push_hierarchy_out;
................................................................................
  **
  ** This loop searches for a node with enough space to store the key on,
  ** starting with the leaf and iterating up towards the root. When the loop
  ** exits, the key may be written to apHier[iLevel].
  */
  for(iLevel=0; iLevel<=p->nHier; iLevel++){
    int nByte;                    /* Number of free bytes required */
    int iRight;                   /* Right hand pointer from aData[]/nData */

    if( iLevel==p->nHier ){
      /* Extend the array and allocate a new root page. */
      Page **aNew;
      aNew = (Page **)lsmRealloc(
          pMW->pDb->pEnv, p->apHier, sizeof(Page *)*(p->nHier+1)
      );
................................................................................
      p->apHier = aNew;
    }else{
      int nFree;

      /* If the key will fit on this page, break out of the loop. */
      assert( lsmFsPageWritable(p->apHier[iLevel]) );
      aData = fsPageData(p->apHier[iLevel], &nData);
      iRight = lsmGetU32(&aData[SEGMENT_POINTER_OFFSET(nData)]);
      if( bIndirect ){
        nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(iKeyPg);
      }else{
        nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(nKey) + nKey;
      }
      nRec = pageGetNRec(aData, nData);
      nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData);
................................................................................

    aData = fsPageData(p->apHier[iLevel], &nData);
    memset(aData, 0, nData);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
    if( iLevel>0 ){
      iRight = lsmFsPageNumber(p->apHier[iLevel-1]);
      lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight);
    }

    if( iLevel==p->nHier ){
      p->nHier++;
      break;
    }
  }
................................................................................
    aData[iOff++] = (u8)(iTopic | LSM_SEPARATOR);
    iOff += lsmVarintPut32(&aData[iOff], iPtr);
    iOff += lsmVarintPut32(&aData[iOff], nKey);
    memcpy(&aData[iOff], pKey, nKey);
  }

  if( iLevel>0 ){
    int iRight = lsmFsPageNumber(p->apHier[iLevel-1]);
    lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight);
  }

  /* Write the right-hand pointer of the right-most leaf page of the 
  ** b-tree heirarchy. */
  aData = fsPageData(p->apHier[0], &nData);
  lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iKeyPg);

  /* Ensure that the SortedRun.iRoot field is correct. */
  pSeg->iRoot = lsmFsPageNumber(p->apHier[p->nHier-1]);

push_hierarchy_out:
  return rc;
}
................................................................................
** zero records. The flags field is cleared. The page footer pointer field
** is set to iFPtr.
**
** If successful, LSM_OK is returned. Otherwise, an error code.
*/
static int mergeWorkerNextPage(
  MergeWorker *pMW,               /* Merge worker object to append page to */
  int iFPtr                       /* Pointer value for footer of new page */
){
  int rc = LSM_OK;                /* Return code */
  Page *pNext = 0;                /* New page appended to run */
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  Segment *pSeg;                  /* Run to append to */

  pSeg = &pMW->pLevel->lhs;
................................................................................
    lsmFsPageRelease(pMW->pPage);
    pMW->pPage = pNext;
    pMW->pLevel->pMerge->iOutputOff = 0;

    aData = fsPageData(pNext, &nData);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0);
    lsmPutU32(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr);

    pMW->nWork++;
  }

  return rc;
}

................................................................................
    aCell = pageGetCell(aData, nData, i);
    eType = *aCell++;
    assert( (flags & SEGMENT_BTREE_FLAG) || eType!=0 );
    aCell += lsmVarintGet32(aCell, &iPgPtr);

    if( eType==0 ){
      Pgno iRef;                  /* Page number of referenced page */
      aCell += lsmVarintGet32(aCell, &iRef);
      lsmFsDbPageGet(pDb->pFS, iRef, &pRef);
      aKey = pageGetKey(pRef, 0, &iTopic, &nKey, &blob);
    }else{
      aCell += lsmVarintGet32(aCell, &nKey);
      if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal);
      sortedReadData(pPg, (aCell-aData), nKey+nVal, (void **)&aKey, &blob);
      aVal = &aKey[nKey];
................................................................................
  aCell = pageGetCell(aData, nData, iCell);
  eType = *aCell++;
  aCell += lsmVarintGet32(aCell, &iPgPtr);

  if( eType==0 ){
    int dummy;
    Pgno iRef;                  /* Page number of referenced page */
    aCell += lsmVarintGet32(aCell, &iRef);
    lsmFsDbPageGet(pDb->pFS, iRef, &pRef);
    pageGetKeyCopy(pDb->pEnv, pRef, 0, &dummy, pBlob);
    aKey = (u8 *)pBlob->pData;
    nKey = pBlob->nData;
    lsmFsPageRelease(pRef);
  }else{
    aCell += lsmVarintGet32(aCell, &nKey);







|







 







|
|







 







|




|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|
|







 







|
|




|







 







|
|




>





|







 







|





|







 







|







 







<













>







 







|







 







|







 







|







 







|







 







|







 







|







 







|







 







|






|







 







|







 







|







 







|







 







|







 







|
|





|







 







|







 







|







 







|







 







|







25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
..
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
...
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
...
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
...
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
...
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
...
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
...
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
...
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
...
832
833
834
835
836
837
838

839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
...
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
...
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
....
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
....
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
....
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
....
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
....
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
2896
....
2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
....
3079
3080
3081
3082
3083
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
....
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
....
3122
3123
3124
3125
3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136
....
3152
3153
3154
3155
3156
3157
3158
3159
3160
3161
3162
3163
3164
3165
3166
....
3182
3183
3184
3185
3186
3187
3188
3189
3190
3191
3192
3193
3194
3195
3196
3197
3198
3199
3200
3201
3202
3203
....
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224
3225
3226
3227
3228
....
3236
3237
3238
3239
3240
3241
3242
3243
3244
3245
3246
3247
3248
3249
3250
....
4563
4564
4565
4566
4567
4568
4569
4570
4571
4572
4573
4574
4575
4576
4577
....
4623
4624
4625
4626
4627
4628
4629
4630
4631
4632
4633
4634
4635
4636
4637
**
**   The footer consists of the following values (starting at the end of
**   the page and continuing backwards towards the start). All values are
**   stored as unsigned big-endian integers.
**
**     * Number of records on page (2 bytes).
**     * Flags field (2 bytes).
**     * Left-hand pointer value (8 bytes).
**     * The starting offset of each record (2 bytes per record).
**
**   Records may span pages. Unless it happens to be an exact fit, the part
**   of the final record that starts on page X that does not fit on page X
**   is stored at the start of page (X+1). This means there may be pages where
**   (N==0). And on most pages the first record that starts on the page will
**   not start at byte offset 0. For example:
................................................................................
#define rtIsSystem(eType)    (((eType) & LSM_SYSTEMKEY)!=0)

/*
** The following macros are used to access a page footer.
*/
#define SEGMENT_NRECORD_OFFSET(pgsz)        ((pgsz) - 2)
#define SEGMENT_FLAGS_OFFSET(pgsz)          ((pgsz) - 2 - 2)
#define SEGMENT_POINTER_OFFSET(pgsz)        ((pgsz) - 2 - 2 - 8)
#define SEGMENT_CELLPTR_OFFSET(pgsz, iCell) ((pgsz) - 2 - 2 - 8 - 2 - (iCell)*2)

#define SEGMENT_EOF(pgsz, nEntry) SEGMENT_CELLPTR_OFFSET(pgsz, nEntry)

#define SEGMENT_BTREE_FLAG     0x0001
#define PGFTR_SKIP_NEXT_FLAG   0x0002
#define PGFTR_SKIP_THIS_FLAG   0x0004

................................................................................
  Level *pLevel;                /* Level object segment is part of */
  Segment *pSeg;                /* Segment to access */

  /* Current page. See segmentPtrLoadPage(). */
  Page *pPg;                    /* Current page */
  u16 flags;                    /* Copy of page flags field */
  int nCell;                    /* Number of cells on pPg */
  Pgno iPtr;                    /* Base cascade pointer */

  /* Current cell. See segmentPtrLoadCell() */
  int iCell;                    /* Current record within page pPg */
  int eType;                    /* Type of current record */
  Pgno iPgPtr;                  /* Cascade pointer offset */
  void *pKey; int nKey;         /* Key associated with current record */
  void *pVal; int nVal;         /* Current record value (eType==WRITE only) */

  /* Blobs used to allocate buffers for pKey and pVal as required */
  Blob blob1;
  Blob blob2;
};
................................................................................

u32 lsmGetU32(u8 *aOut){
  return ((u32)aOut[0] << 24) 
       + ((u32)aOut[1] << 16) 
       + ((u32)aOut[2] << 8) 
       + ((u32)aOut[3]);
}

u32 lsmGetU64(u8 *aOut){
  return ((u64)aOut[0] << 56) 
       + ((u64)aOut[1] << 48) 
       + ((u64)aOut[2] << 40) 
       + ((u64)aOut[3] << 32) 
       + ((u64)aOut[4] << 24)
       + ((u32)aOut[5] << 16) 
       + ((u32)aOut[6] << 8) 
       + ((u32)aOut[7]);
}

void lsmPutU64(u8 *aOut, u64 nVal){
  aOut[0] = (u8)((nVal>>56) & 0xFF);
  aOut[1] = (u8)((nVal>>48) & 0xFF);
  aOut[2] = (u8)((nVal>>40) & 0xFF);
  aOut[3] = (u8)((nVal>>32) & 0xFF);
  aOut[4] = (u8)((nVal>>24) & 0xFF);
  aOut[5] = (u8)((nVal>>16) & 0xFF);
  aOut[6] = (u8)((nVal>> 8) & 0xFF);
  aOut[7] = (u8)((nVal    ) & 0xFF);
}

static int sortedBlobGrow(lsm_env *pEnv, Blob *pBlob, int nData){
  assert( pBlob->pEnv==pEnv || (pBlob->pEnv==0 && pBlob->pData==0) );
  if( pBlob->nAlloc<nData ){
    pBlob->pData = lsmReallocOrFree(pEnv, pBlob->pData, nData);
    if( !pBlob->pData ) return LSM_NOMEM;
    pBlob->nAlloc = nData;
................................................................................
  return rc;
}

static int pageGetNRec(u8 *aData, int nData){
  return (int)lsmGetU16(&aData[SEGMENT_NRECORD_OFFSET(nData)]);
}

static Pgno pageGetPtr(u8 *aData, int nData){
  return (Pgno)lsmGetU64(&aData[SEGMENT_POINTER_OFFSET(nData)]);
}

static int pageGetFlags(u8 *aData, int nData){
  return (int)lsmGetU16(&aData[SEGMENT_FLAGS_OFFSET(nData)]);
}

static u8 *pageGetCell(u8 *aData, int nData, int iCell){
................................................................................
  return pageGetNRec(aData, nData);
}

/*
** Return the decoded (possibly relative) pointer value stored in cell 
** iCell from page aData/nData.
*/
static Pgno pageGetRecordPtr(u8 *aData, int nData, int iCell){
  Pgno iRet;                      /* Return value */
  u8 *aCell;                      /* Pointer to cell iCell */

  assert( iCell<pageGetNRec(aData, nData) && iCell>=0 );
  aCell = pageGetCell(aData, nData, iCell);
  lsmVarintGet64(&aCell[1], &iRet);
  return iRet;
}

static u8 *pageGetKey(
  Page *pPg,                      /* Page to read from */
  int iCell,                      /* Index of cell on page to read */
  int *piTopic,                   /* OUT: Topic associated with this key */
................................................................................
  int nData;
  u8 *aCell;

  aData = fsPageData(pPg, &nData);
  aCell = pageGetCell(aData, nData, iKey);
  assert( aCell[0]==0 );
  aCell++;
  aCell += lsmVarintGet64(aCell, &iRef);
  lsmVarintGet64(aCell, &iRef);
  assert( iRef>0 );
  return iRef;
}

#define GETVARINT64(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet64((a), &(i)))
#define GETVARINT32(a, i) (((i)=((u8*)(a))[0])<=240?1:lsmVarintGet32((a), &(i)))

static int pageGetBtreeKey(
  Page *pPg,
  int iKey, 
  Pgno *piPtr, 
  int *piTopic, 
  void **ppKey,
  int *pnKey,
  Blob *pBlob
){
  u8 *aData;
  int nData;
................................................................................

  aData = fsPageData(pPg, &nData);
  assert( SEGMENT_BTREE_FLAG & pageGetFlags(aData, nData) );
  assert( iKey>=0 && iKey<pageGetNRec(aData, nData) );

  aCell = pageGetCell(aData, nData, iKey);
  eType = *aCell++;
  aCell += GETVARINT64(aCell, *piPtr);

  if( eType==0 ){
    int rc;
    Pgno iRef;                  /* Page number of referenced page */
    Page *pRef;
    aCell += GETVARINT64(aCell, iRef);
    rc = lsmFsDbPageGet(lsmPageFS(pPg), iRef, &pRef);
    if( rc!=LSM_OK ) return rc;
    pageGetKeyCopy(lsmPageEnv(pPg), pRef, 0, &eType, pBlob);
    lsmFsPageRelease(pRef);
    *ppKey = pBlob->pData;
    *pnKey = pBlob->nData;
  }else{
................................................................................
static int btreeCursorLoadKey(BtreeCursor *pCsr){
  int rc = LSM_OK;
  if( pCsr->iPg<0 ){
    pCsr->pKey = 0;
    pCsr->nKey = 0;
    pCsr->eType = 0;
  }else{
    Pgno dummy;
    int iPg = pCsr->iPg;
    int iCell = pCsr->aPg[iPg].iCell;
    while( iCell<0 && (--iPg)>=0 ){
      iCell = pCsr->aPg[iPg].iCell-1;
    }
    if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT;

................................................................................

    /* Populate any other aPg[] array entries */
    if( rc==LSM_OK && nDepth>1 ){
      Blob blob = {0,0,0};
      void *pSeek;
      int nSeek;
      int iTopicSeek;

      int iPg = 0;
      int iLoad = pCsr->pSeg->iRoot;
      Page *pPg = pCsr->aPg[nDepth-1].pPage;
 
      if( pageObjGetNRec(pPg)==0 ){
        /* This can happen when pPg is the right-most leaf in the b-tree.
        ** In this case, set the iTopicSeek/pSeek/nSeek key to a value
        ** greater than any real key.  */
        assert( iCell==-1 );
        iTopicSeek = 1000;
        pSeek = 0;
        nSeek = 0;
      }else{
        Pgno dummy;
        rc = pageGetBtreeKey(pPg,
            0, &dummy, &iTopicSeek, &pSeek, &nSeek, &pCsr->blob
        );
      }

      do {
        Page *pPg;
................................................................................
          iMax = iCell-1;
          iMin = 0;

          while( iMax>=iMin ){
            int iTry = (iMin+iMax)/2;
            void *pKey; int nKey;         /* Key for cell iTry */
            int iTopic;                   /* Topic for key pKeyT/nKeyT */
            Pgno iPtr;                    /* Pointer for cell iTry */
            int res;                      /* (pSeek - pKeyT) */

            rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopic, &pKey, &nKey,&blob);
            if( rc!=LSM_OK ) break;

            res = sortedKeyCompare(
                xCmp, iTopicSeek, pSeek, nSeek, iTopic, pKey, nKey
................................................................................
      u8 *aData;
      int nData;

      pBtreePg = &pCsr->aPg[pCsr->iPg];
      aData = fsPageData(pBtreePg->pPage, &nData);
      pCsr->iPtr = btreeCursorPtr(aData, nData, pBtreePg->iCell+1);
      if( pBtreePg->iCell<0 ){
        Pgno dummy;
        int i;
        for(i=pCsr->iPg-1; i>=0; i--){
          if( pCsr->aPg[i].iCell>0 ) break;
        }
        assert( i>=0 );
        rc = pageGetBtreeKey(
            pCsr->aPg[i].pPage, pCsr->aPg[i].iCell-1,
................................................................................

    assert( iNew<pPtr->nCell );
    pPtr->iCell = iNew;
    aData = fsPageData(pPtr->pPg, &nPgsz);
    iOff = lsmGetU16(&aData[SEGMENT_CELLPTR_OFFSET(nPgsz, pPtr->iCell)]);
    pPtr->eType = aData[iOff];
    iOff++;
    iOff += GETVARINT64(&aData[iOff], pPtr->iPgPtr);
    iOff += GETVARINT32(&aData[iOff], pPtr->nKey);
    if( rtIsWrite(pPtr->eType) ){
      iOff += GETVARINT32(&aData[iOff], pPtr->nVal);
    }
    assert( pPtr->nKey>=0 );

    rc = segmentPtrReadData(
................................................................................
    u8 *aData;
    int nData;
  
    aData = lsmFsPageData(pPg, &nData);
    if( pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG ){
      void *pKey;
      int nKey;
      Pgno dummy;
      rc = pageGetBtreeKey(
          pPg, pMerge->splitkey.iCell, &dummy, &iTopic, &pKey, &nKey, &blob
      );
      if( rc==LSM_OK && blob.pData!=pKey ){
        rc = sortedBlobSet(pEnv, &blob, pKey, nKey);
      }
    }else{
................................................................................
  int *pbStop
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res;                        /* Result of comparison operation */
  int rc = LSM_OK;
  int iMin;
  int iMax;
  Pgno iPtrOut = 0;
  const int iTopic = 0;

  /* If the current page contains an oversized entry, then there are no
  ** pointers to one or more of the subsequent pages in the sorted run.
  ** The following call ensures that the segment-ptr points to the correct 
  ** page in this case.  */
  rc = segmentPtrSearchOversized(pCsr, pPtr, pKey, nKey);
................................................................................

      iMin = 0;
      iMax = nRec-1;
      while( iMax>=iMin ){
        int iTry = (iMin+iMax)/2;
        void *pKeyT; int nKeyT;       /* Key for cell iTry */
        int iTopicT;                  /* Topic for key pKeyT/nKeyT */
        Pgno iPtr;                    /* Pointer associated with cell iTry */
        int res;                      /* (pKey - pKeyT) */

        rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob);
        if( rc!=LSM_OK ) break;
        if( piFirst && pKeyT==blob.pData ){
          *piFirst = pageGetBtreeRef(pPg, iTry);
          piFirst = 0;
................................................................................
  MergeWorker *pMW,               /* Merge worker */
  int bSep                        /* True for separators run */
){
  Segment *pSeg;                  /* Segment being written */
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  int rc = LSM_OK;                /* Return code */
  int i;
  Pgno iRight = 0;
  Page **apHier = pMW->hier.apHier;
  int nHier = pMW->hier.nHier;

  assert( nHier>0 && pMW->pLevel->pMerge->bHierReadonly );
  pSeg = &pMW->pLevel->lhs;

  for(i=0; rc==LSM_OK && i<nHier; i++){
................................................................................
        ** since sometimes n1>n2, the page content and footer must be copied 
        ** separately. */
        int nEntry = pageGetNRec(a2, n2);
        int iEof1 = SEGMENT_EOF(n1, nEntry);
        int iEof2 = SEGMENT_EOF(n2, nEntry);
        memcpy(a1, a2, iEof2);
        memcpy(&a1[iEof1], &a2[iEof2], n2 - iEof2);
        if( iRight ) lsmPutU64(&a1[SEGMENT_POINTER_OFFSET(n1)], iRight);
        lsmFsPageRelease(apHier[i]);
        apHier[i] = pNew;
        iRight = lsmFsPageNumber(pNew);
      }else{
        lsmPutU16(&a1[SEGMENT_FLAGS_OFFSET(n1)], SEGMENT_BTREE_FLAG);
        lsmPutU16(&a1[SEGMENT_NRECORD_OFFSET(n1)], 0);
        lsmPutU64(&a1[SEGMENT_POINTER_OFFSET(n1)], 0);
        i = i - 1;
        lsmFsPageRelease(pNew);
      }
    }
  }

#ifdef LSM_DEBUG
................................................................................
  rc = mergeWorkerLoadHierarchy(pMW);

  /* Obtain the absolute pointer value to store along with the key in the
  ** page body. This pointer points to a page that contains keys that are
  ** smaller than pKey/nKey.  */
  if( p->nHier ){
    aData = fsPageData(p->apHier[0], &nData);
    iPtr = lsmGetU64(&aData[SEGMENT_POINTER_OFFSET(nData)]);
  }else{
    iPtr = pSeg->iFirst;
  }

  if( p->nHier && pMW->pLevel->pMerge->bHierReadonly ){
    rc = mergeWorkerMoveHierarchy(pMW, bSep);
    if( rc!=LSM_OK ) goto push_hierarchy_out;
................................................................................
  **
  ** This loop searches for a node with enough space to store the key on,
  ** starting with the leaf and iterating up towards the root. When the loop
  ** exits, the key may be written to apHier[iLevel].
  */
  for(iLevel=0; iLevel<=p->nHier; iLevel++){
    int nByte;                    /* Number of free bytes required */
    Pgno iRight;                  /* Right hand pointer from aData[]/nData */

    if( iLevel==p->nHier ){
      /* Extend the array and allocate a new root page. */
      Page **aNew;
      aNew = (Page **)lsmRealloc(
          pMW->pDb->pEnv, p->apHier, sizeof(Page *)*(p->nHier+1)
      );
................................................................................
      p->apHier = aNew;
    }else{
      int nFree;

      /* If the key will fit on this page, break out of the loop. */
      assert( lsmFsPageWritable(p->apHier[iLevel]) );
      aData = fsPageData(p->apHier[iLevel], &nData);
      iRight = lsmGetU64(&aData[SEGMENT_POINTER_OFFSET(nData)]);
      if( bIndirect ){
        nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(iKeyPg);
      }else{
        nByte = 2 + 1 + lsmVarintLen32(iRight) + lsmVarintLen32(nKey) + nKey;
      }
      nRec = pageGetNRec(aData, nData);
      nFree = SEGMENT_EOF(nData, nRec) - mergeWorkerPageOffset(aData, nData);
................................................................................

    aData = fsPageData(p->apHier[iLevel], &nData);
    memset(aData, 0, nData);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
    if( iLevel>0 ){
      iRight = lsmFsPageNumber(p->apHier[iLevel-1]);
      lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight);
    }

    if( iLevel==p->nHier ){
      p->nHier++;
      break;
    }
  }
................................................................................
    aData[iOff++] = (u8)(iTopic | LSM_SEPARATOR);
    iOff += lsmVarintPut32(&aData[iOff], iPtr);
    iOff += lsmVarintPut32(&aData[iOff], nKey);
    memcpy(&aData[iOff], pKey, nKey);
  }

  if( iLevel>0 ){
    Pgno iRight = lsmFsPageNumber(p->apHier[iLevel-1]);
    lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iRight);
  }

  /* Write the right-hand pointer of the right-most leaf page of the 
  ** b-tree heirarchy. */
  aData = fsPageData(p->apHier[0], &nData);
  lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iKeyPg);

  /* Ensure that the SortedRun.iRoot field is correct. */
  pSeg->iRoot = lsmFsPageNumber(p->apHier[p->nHier-1]);

push_hierarchy_out:
  return rc;
}
................................................................................
** zero records. The flags field is cleared. The page footer pointer field
** is set to iFPtr.
**
** If successful, LSM_OK is returned. Otherwise, an error code.
*/
static int mergeWorkerNextPage(
  MergeWorker *pMW,               /* Merge worker object to append page to */
  Pgno iFPtr                      /* Pointer value for footer of new page */
){
  int rc = LSM_OK;                /* Return code */
  Page *pNext = 0;                /* New page appended to run */
  lsm_db *pDb = pMW->pDb;         /* Database handle */
  Segment *pSeg;                  /* Run to append to */

  pSeg = &pMW->pLevel->lhs;
................................................................................
    lsmFsPageRelease(pMW->pPage);
    pMW->pPage = pNext;
    pMW->pLevel->pMerge->iOutputOff = 0;

    aData = fsPageData(pNext, &nData);
    lsmPutU16(&aData[SEGMENT_NRECORD_OFFSET(nData)], 0);
    lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], 0);
    lsmPutU64(&aData[SEGMENT_POINTER_OFFSET(nData)], iFPtr);

    pMW->nWork++;
  }

  return rc;
}

................................................................................
    aCell = pageGetCell(aData, nData, i);
    eType = *aCell++;
    assert( (flags & SEGMENT_BTREE_FLAG) || eType!=0 );
    aCell += lsmVarintGet32(aCell, &iPgPtr);

    if( eType==0 ){
      Pgno iRef;                  /* Page number of referenced page */
      aCell += lsmVarintGet64(aCell, &iRef);
      lsmFsDbPageGet(pDb->pFS, iRef, &pRef);
      aKey = pageGetKey(pRef, 0, &iTopic, &nKey, &blob);
    }else{
      aCell += lsmVarintGet32(aCell, &nKey);
      if( rtIsWrite(eType) ) aCell += lsmVarintGet32(aCell, &nVal);
      sortedReadData(pPg, (aCell-aData), nKey+nVal, (void **)&aKey, &blob);
      aVal = &aKey[nKey];
................................................................................
  aCell = pageGetCell(aData, nData, iCell);
  eType = *aCell++;
  aCell += lsmVarintGet32(aCell, &iPgPtr);

  if( eType==0 ){
    int dummy;
    Pgno iRef;                  /* Page number of referenced page */
    aCell += lsmVarintGet64(aCell, &iRef);
    lsmFsDbPageGet(pDb->pFS, iRef, &pRef);
    pageGetKeyCopy(pDb->pEnv, pRef, 0, &dummy, pBlob);
    aKey = (u8 *)pBlob->pData;
    nKey = pBlob->nData;
    lsmFsPageRelease(pRef);
  }else{
    aCell += lsmVarintGet32(aCell, &nKey);

Changes to tool/lsmperf.tcl.

187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
  append script $data4

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

do_write_test x.png 600 50000 50000 20 {
  lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0 write_buffer=2097152"
  LevelDB leveldb
}

# lsm-mt    "mmap=1 multi_proc=0 threads=2 autowork=0 autocheckpoint=8192000"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-st     "mmap=1 multi_proc=0 safety=1 threads=1 autowork=1"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"







|
|







187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
  append script $data4

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

do_write_test x.png 600 50000 50000 20 {
 lsm-st     "mmap=1 multi_proc=0 safety=1 threads=1 autowork=1"
 lsm-st2    "page_size=1024 mmap=1 multi_proc=0 safety=1 threads=1 autowork=1"
}

# lsm-mt    "mmap=1 multi_proc=0 threads=2 autowork=0 autocheckpoint=8192000"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-st     "mmap=1 multi_proc=0 safety=1 threads=1 autowork=1"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"