SQLite4
Check-in [b84772a1aa]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Remove dead code from lsm_sorted.c.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: b84772a1aa7f526e2e56b8119573728eaaa58fe3
User & Date: dan 2012-10-02 12:05:48
Context
2012-10-02
18:06
Simplify the way new cursors are created. check-in: 63d8eea506 user: dan tags: trunk
12:05
Remove dead code from lsm_sorted.c. check-in: b84772a1aa user: dan tags: trunk
05:19
Remove a layer of abstraction from the cursor object in lsm_sorted.c. check-in: ff71b6f778 user: dan tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/lsm_sorted.c.

122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
...
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
...
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
....
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
....
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
....
1271
1272
1273
1274
1275
1276
1277








1278
1279
1280
1281
1282
1283
1284
1285
....
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
....
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
....
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
....
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
....
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
....
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
....
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
....
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
....
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
....
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
....
4716
4717
4718
4719
4720
4721
4722
4723
4724
4725
4726
4727
4728
4729
4730
4731
4732
4733
4734
4735
4736
4737
4738
4739
4740
4741
4742
4743
4744
4745
4746
4747
4748
4749
4750

#define SEGMENT_EOF(pgsz, nEntry) SEGMENT_CELLPTR_OFFSET(pgsz, nEntry)

#define SEGMENT_BTREE_FLAG     0x0001
#define PGFTR_SKIP_NEXT_FLAG   0x0002
#define PGFTR_SKIP_THIS_FLAG   0x0004

typedef struct LevelCursor LevelCursor;
typedef struct SegmentPtr SegmentPtr;
typedef struct Blob Blob;

struct Blob {
  lsm_env *pEnv;
  void *pData;
  int nData;
................................................................................
  void *pVal; int nVal;         /* Current record value (eType==WRITE only) */

  /* Blobs used to allocate buffers for pKey and pVal as required */
  Blob blob1;
  Blob blob2;
};

/*
** A cursor used to search or iterate within a single sorted file. 
** LevelCursor structures are only used within this file. The rest of the
** system uses MultiCursor objects to search and iterate within the merge
** of a TreeCursor and zero or more LevelCursor objects.
**
** Instances of this structure are manipulated using the following static
** functions. Similar to the 
**
**   levelCursorInit()
**   segmentCursorSeek()
**   segmentCursorAdvance()
**   segmentCursorEnd()
**
**   segmentCursorKey()
**   segmentCursorValue()
**   segmentCursorValid()
*/
struct LevelCursor {
  FileSystem *pFS;                /* File system to read from */
  int (*xCmp)(void *, int, void *, int);         /* Compare function */
  int bIgnoreSeparators;          /* True to ignore SORTED_SEPARATOR records */
  int bIgnoreSystem;              /* True to ignore records for topic!=0 */
  int iCurrentPtr;                /* Current entry in aPtr[] */
  int nPtr;                       /* Size of aPtr[] array */
  SegmentPtr *aPtr;               /* Array of segment pointers */
  Level *pLevel;                  /* Pointer to Level object (if nPtr>1) */
};

/*
** Used to iterate through the keys stored in a b-tree hierarchy from start
** to finish. Only First() and Next() operations are required.
**
**   btreeCursorNew()
**   btreeCursorFirst()
**   btreeCursorNext()
................................................................................

  int eType;                      /* Cache of current key type */
  Blob key;                       /* Cache of current key (or NULL) */
  Blob val;                       /* Cache of current value */

  /* All the component cursors: */
  TreeCursor *apTreeCsr[2];       /* Up to two tree cursors */
#if 0
  int nSegCsr;                    /* Size of aSegCsr[] array */
  LevelCursor *aSegCsr;           /* Array of cursors open on sorted files */
#else
  SegmentPtr *aPtr;               /* Array of segment pointers */
  int nPtr;                       /* Size of array aPtr[] */
#endif
  BtreeCursor *pBtCsr;            /* b-tree cursor (db writes only) */

  /* Comparison results */
  int nTree;                      /* Size of aTree[] array */
  int *aTree;                     /* Array of comparison results */

  /* Used by cursors flushing the in-memory tree only */
................................................................................
    pLevel->nSplitKey = blob.nData;
    lsmFsPageRelease(pPg);
  }

  *pRc = rc;
}

/*
** Initialize a LevelCursor structure. The caller is responsible for
** allocating the memory that the structure itself occupies.
*/
static int levelCursorInit(
  lsm_db *pDb,
  Level *pLevel, 
  int (*xCmp)(void *, int, void *, int),
  LevelCursor *pCsr              /* Cursor structure to initialize */
){
  int rc = LSM_OK;
  int nPtr = 1 + pLevel->nRight;

  memset(pCsr, 0, sizeof(LevelCursor));
  pCsr->pFS = pDb->pFS;
  pCsr->bIgnoreSeparators = 1;
  pCsr->xCmp = xCmp;
  pCsr->pLevel = pLevel;
  pCsr->aPtr = (SegmentPtr*)lsmMallocZeroRc(pDb->pEnv, 
      sizeof(SegmentPtr)*nPtr, &rc
  );

  if( rc==LSM_OK ){
    int i;
    pCsr->aPtr[0].pSeg = &pLevel->lhs;
    pCsr->nPtr = nPtr;

    for(i=0; i<pLevel->nRight; i++){
      pCsr->aPtr[i+1].pSeg = &pLevel->aRhs[i];
    }
  }

  if( nPtr>1 && pLevel->pSplitKey==0 ){
    lsmSortedSplitkey(pDb, pLevel, &rc);
  }

  return rc;
}

static int levelCursorInitRun(
  lsm_db *pDb,
  Segment *pSeg, 
  int (*xCmp)(void *, int, void *, int),
  LevelCursor *pCsr              /* Cursor structure to initialize */
){
  int rc = LSM_OK;

  memset(pCsr, 0, sizeof(LevelCursor));
  pCsr->pFS = pDb->pFS;
  pCsr->bIgnoreSeparators = 1;
  pCsr->xCmp = xCmp;
  pCsr->nPtr = 1;
  pCsr->aPtr = (SegmentPtr*)lsmMallocZeroRc(pDb->pEnv, 
      sizeof(SegmentPtr)*pCsr->nPtr, &rc
  );

  if( rc==LSM_OK ){
    pCsr->aPtr[0].pSeg = pSeg;
  }

  return rc;
}

static void segmentPtrReset(SegmentPtr *pPtr){
  lsmFsPageRelease(pPtr->pPg);
  pPtr->pPg = 0;
  pPtr->nCell = 0;
  pPtr->pKey = 0;
  pPtr->nKey = 0;
  pPtr->pVal = 0;
................................................................................
  pPtr->nVal = 0;
  pPtr->eType = 0;
  pPtr->iCell = 0;
  sortedBlobFree(&pPtr->blob1);
  sortedBlobFree(&pPtr->blob2);
}

static void segmentCursorReset(LevelCursor *pCsr){
  int i;
  for(i=0; i<pCsr->nPtr; i++){
    segmentPtrReset(&pCsr->aPtr[i]);
  }
}

/*
** Close the cursor. Release all associated resources. The memory occupied
** by the LevelCursor structure itself is not released. It is assumed to
** be managed by the caller.
*/
static void segmentCursorClose(lsm_env *pEnv, LevelCursor *pCsr){
  segmentCursorReset(pCsr);
  lsmFree(pEnv, pCsr->aPtr);
  memset(pCsr, 0, sizeof(LevelCursor));
}

static int segmentPtrIgnoreSeparators(MultiCursor *pCsr, SegmentPtr *pPtr){
  return (pCsr->flags & CURSOR_READ_SEPARATORS)==0
      || (pPtr!=&pCsr->aPtr[pCsr->nPtr-1]);
}

static int segmentPtrAdvance(
  MultiCursor *pCsr, 
................................................................................
    Pgno iPg = (bLast ? pPtr->pSeg->iLast : pPtr->pSeg->iFirst);
    *pRc = lsmFsDbPageGet(pFS, iPg, &pNew);
    segmentPtrSetPage(pPtr, pNew);
  }
}










static int segmentPtrEnd2(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){
  int rc = LSM_OK;
  FileSystem *pFS = pCsr->pDb->pFS;
  int bIgnore;

  segmentPtrEndPage(pFS, pPtr, bLast, &rc);
  while( rc==LSM_OK && pPtr->pPg 
      && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
................................................................................
  if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){
    rc = segmentPtrAdvance(pCsr, pPtr, bLast);
  }

  return rc;
}

/*
** Try to move the segment pointer passed as the second argument so that it
** points at either the first (bLast==0) or last (bLast==1) cell in the valid
** region of the segment defined by pPtr->iFirst and pPtr->iLast.
**
** Return LSM_OK if successful or an lsm error code if something goes
** wrong (IO error, OOM etc.).
*/
static void segmentPtrEnd(
  LevelCursor *pCsr,              /* Cursor that owns this segment-pointer */
  SegmentPtr *pPtr,               /* Segment pointer to reposition */
  int bLast,                      /* True for last, false for first */
  int *pRc                        /* IN/OUT error code */
){
#if 0
  if( *pRc==LSM_OK ){
    int rc = LSM_OK;

    segmentPtrEndPage(pCsr->pFS, pPtr, bLast, &rc);
    while( rc==LSM_OK 
        && pPtr->pPg 
        && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
    ){
      rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1));
    }
    if( rc==LSM_OK && pPtr->pPg ){
      rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0);
    }

    if( rc==LSM_OK && pPtr->pPg && (
          (pCsr->bIgnoreSeparators && rtIsSeparator(pPtr->eType))
       || (pCsr->bIgnoreSystem && rtTopic(pPtr->eType)!=0)
    )){
      rc = segmentPtrAdvance(pCsr, pPtr, bLast);
    }
    *pRc = rc;
  }
#endif
}

static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){
  assert( pPtr->pPg );
  *ppKey = pPtr->pKey;
  *pnKey = pPtr->nKey;
}

static void segmentCursorKey(LevelCursor *pCsr, void **ppKey, int *pnKey){
  segmentPtrKey(&pCsr->aPtr[pCsr->iCurrentPtr], ppKey, pnKey);
}

static void segmentCursorValue(LevelCursor *pCsr, void **ppVal, int *pnVal){
  SegmentPtr *pPtr = &pCsr->aPtr[pCsr->iCurrentPtr];
  assert( pPtr->pPg );
  *ppVal = pPtr->pVal;
  *pnVal = pPtr->nVal;
}

static void segmentCursorType(LevelCursor *pCsr, int *peType){
  SegmentPtr *pPtr = &pCsr->aPtr[pCsr->iCurrentPtr];
  assert( pPtr->pPg );
  *peType = pPtr->eType;
}

static int segmentCursorValid(LevelCursor *pCsr){
  SegmentPtr *pPtr = &pCsr->aPtr[pCsr->iCurrentPtr];
  return (pPtr->pPg!=0);
}

#ifdef LSM_DEBUG

static char *keyToString(lsm_env *pEnv, void *pKey, int nKey){
  int i;
  u8 *aKey = (u8 *)pKey;
  char *zRet = (char *)lsmMalloc(pEnv, nKey+1);

................................................................................

/*
** Check that the page that pPtr currently has loaded is the correct page
** to search for key (pKey/nKey). If it is, return 1. Otherwise, an assert
** fails and this function does not return.
*/
static int assertKeyLocation(
  LevelCursor *pCsr, 
  SegmentPtr *pPtr, 
  void *pKey, int nKey
){
  lsm_env *pEnv = lsmFsEnv(pCsr->pFS);
  Blob blob = {0, 0, 0};
  int eDir;
  int iTopic = 0;                 /* TODO: Fix me */

  for(eDir=-1; eDir<=1; eDir+=2){
    Page *pTest = pPtr->pPg;

................................................................................
          u8 *pPgKey;
          int res;
          int iCell;

          iCell = ((eDir < 0) ? (nCell-1) : 0);
          pPgKey = pageGetKey(pTest, iCell, &iPgTopic, &nPgKey, &blob);
          res = iTopic - iPgTopic;
          if( res==0 ) res = pCsr->xCmp(pKey, nKey, pPgKey, nPgKey);
          if( (eDir==1 && res>0) || (eDir==-1 && res<0) ){
            /* Taking this branch means something has gone wrong. */
            char *zMsg = lsmMallocPrintf(pEnv, "Key \"%s\" is not on page %d", 
                keyToString(pEnv, pKey, nKey), lsmFsPageNumber(pPtr->pPg)
            );
            fprintf(stderr, "%s\n", zMsg);
            assert( !"assertKeyLocation() failed" );
................................................................................
  }

  assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
  *piPtr = iPtrOut;
  return rc;
}

/*
** Helper function for segmentCursorSetCurrent(). See its header comment
** for details.
*/
static int segmentCursorPtrCmp(
  LevelCursor *pCsr,            /* Cursor context */
  int bLargest,                   /* True to identify the larger key */
  SegmentPtr *pLeft,
  SegmentPtr *pRight              
){
  int iRet;
  if( pLeft->pPg==0 ){
    iRet = 1;
  }else if( pRight->pPg==0 ){
    iRet = 0;
  }else{
    int res = rtTopic(pLeft->eType) - rtTopic(pRight->eType);
    if( res==0 ){
      res = pCsr->xCmp(pLeft->pKey, pLeft->nKey, pRight->pKey, pRight->nKey);
    }
    if( res==0 || (res<0 && bLargest==0) || (res>0 && bLargest) ){
      iRet = 0;
    }else{
      iRet = 1;
    }
  }
  return iRet;
}

static void segmentCursorSetCurrent(LevelCursor *pCsr, int bLargest){
  int iBest;
  int i;

  iBest = 0;
  for(i=1; i<pCsr->nPtr; i++){
    int res = segmentCursorPtrCmp(
        pCsr, bLargest, &pCsr->aPtr[iBest], &pCsr->aPtr[i]
    );
    if( res ) iBest = i;
  }

  pCsr->iCurrentPtr = iBest;
}

static int seekInBtree(
  MultiCursor *pCsr,              /* Multi-cursor object */
  Segment *pSeg,                  /* Seek within this segment */
  void *pKey, int nKey,           /* Key to seek to */
  Pgno *aPg,                      /* OUT: Page numbers */
  Page **ppPg                     /* OUT: Leaf (sorted-run) page reference */
){
................................................................................
    for(i=1; rc==LSM_OK && i<=nRhs; i++){
      iOut = 0;
      rc = seekInSegment(pCsr, &aPtr[i], pKey, nKey, iPtr, eSeek, &iOut);
      iPtr = iOut;
    }

    if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){
      rc = segmentPtrEnd2(pCsr, &aPtr[0], 1);
    }
  }

  *piPgno = iOut;
  return rc;
}

................................................................................
  }

  pCsr->aTree[iOut] = iRes;
}

static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){
  int rc;
  rc = segmentPtrEnd2(pCsr, pPtr, 0);
  while( pPtr->pPg && rc==LSM_OK ){
    int res = sortedKeyCompare(pCsr->pDb->xCmp,
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey,
        rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
    );
    if( res<=0 ) break;
    rc = segmentPtrAdvance(pCsr, pPtr, 0);
................................................................................
/*
** This function advances segment pointer iPtr belonging to multi-cursor
** pCsr forward (bReverse==0) or backward (bReverse!=0).
**
** If the segment pointer points to a segment that is part of a composite
** level, then the following special cases are handled.
**
** TODO: First point is not currently done.
**
**   * If iPtr is the lhs of a composite level, and the cursor is being
**     advanced forwards, and segment iPtr is at EOF, move all pointers
**     that correspond to rhs segments of the same level to the first
**     key in their respective data.
**
**   * If iPtr is on the rhs of a composite level, and the cursor is being
**     reversed, and the new key is smaller than the split-key, set the
................................................................................
  if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
   && bReverse==0                                /* csr advanced forwards */
   && pPtr->pPg==0                               /* segment at EOF */
  ){
    int i;

    for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
      SegmentPtr *pPtr = &pCsr->aPtr[iPtr+1+i];
      rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
    }

    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, 0);
    }
  }
................................................................................
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
    );
    if( res<0 ){
      segmentPtrReset(pPtr);
    }
  }

#if 0
  if( segmentCursorValid(pCsr)==0 
   && bReverse==0 
   && iCurrent==0 
   && pCsr->nPtr>1
   && pCsr->aPtr[1].pPg==0 
  ){
    Level *p = pCsr->pLevel;
    int i;
    for(i=1; i<pCsr->nPtr; i++){
      SegmentPtr *pPtr = &pCsr->aPtr[i];
      segmentPtrEnd(pCsr, pPtr, 0, &rc);

      /* If the segment-pointer now points to a key that is smaller than
      ** the split-key, advance it until it does not. Again, the danger
      ** is that if the current key is smaller than the split-key then 
      ** there may have existed a delete key on a page that has already 
      ** been gobbled. 
      **
      ** TODO: This might end up taking a long time. Is there some other
      ** way? Apart from searching the entire tree for pSplitkey to set 
      ** this levels segment-pointers?
      */
      while( pPtr->pPg ){
        int res = sortedKeyCompare(pCsr->xCmp,
            rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
            p->iSplitTopic, p->pSplitKey, p->nSplitKey
        );
        if( res>=0 ) break;
        segmentPtrAdvance(pCsr, pPtr, 0);
      }

    }
    segmentCursorSetCurrent(pCsr, bReverse);
  }
#endif

  return rc;
}

/*
** Move the cursor to point to either the first element (if bLast==0), or
** the last element (if bLast!=0) in the sorted file.
*/
static int segmentCursorEnd(LevelCursor *pCsr, int bLast){
  int rc = LSM_OK;

  segmentPtrEnd(pCsr, &pCsr->aPtr[0], bLast, &rc);
  if( bLast || pCsr->aPtr[0].pPg==0 ){
    int i;
    for(i=1; i<pCsr->nPtr; i++){
      SegmentPtr *pPtr = &pCsr->aPtr[i];
      segmentPtrEnd(pCsr, pPtr, bLast, &rc);
      if( rc==LSM_OK && bLast && pPtr->pPg ){
        Level *p = pCsr->pLevel;
        int res = sortedKeyCompare(pCsr->xCmp,
            rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
            p->iSplitTopic, p->pSplitKey, p->nSplitKey
        );
        if( res<0 ) segmentPtrReset(pPtr);
      }
    }
  }

  segmentCursorSetCurrent(pCsr, bLast);
  return rc;
}

static void mcursorFreeComponents(MultiCursor *pCsr){
  int i;
  lsm_env *pEnv = pCsr->pDb->pEnv;

................................................................................
    pCsr->flags |= CURSOR_AT_FREELIST;
  }

  for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
    SegmentPtr *pPtr = &pCsr->aPtr[i];
    Level *pLvl = pPtr->pLevel;

    rc = segmentPtrEnd2(pCsr, pPtr, bLast);
    if( rc==LSM_OK && bLast==0 && pLvl->nRight && pPtr->pSeg==&pLvl->lhs ){
      int iRhs;
      for(iRhs=1+i; rc==LSM_OK && iRhs<1+i+pLvl->nRight; iRhs++){
        SegmentPtr *pRhs = &pCsr->aPtr[iRhs];
        if( pPtr->pPg==0 ){
          rc = sortedRhsFirst(pCsr, pLvl, pRhs);
        }else{
................................................................................
    }
  }
  return LSM_OK;
}

int lsmInfoPageDump(lsm_db *pDb, Pgno iPg, int bHex, char **pzOut){
  int rc = LSM_OK;                /* Return code */
  Snapshot *pWorker;              /* Worker snapshot */
  Snapshot *pRelease = 0;         /* Snapshot to release */
  Page *pPg = 0;                  /* Handle for page iPg */
  int i, j;                       /* Loop counters */
  const int perLine = 16;         /* Bytes per line in the raw hex dump */
  int bEndWork = 0;

  *pzOut = 0;
  if( iPg==0 ) return LSM_ERROR;

  /* Obtain the worker snapshot */
#if 0
  pWorker = pDb->pWorker;
  if( !pWorker ){
    rc = lsmBeginWork(pDb);
    if( rc!=LSM_OK ) return rc;
    pWorker = pDb->pWorker;
    bEndWork = 1;
  }
#endif

  rc = lsmFsDbPageGet(pDb->pFS, iPg, &pPg);
  if( rc==LSM_OK ){
    Blob blob = {0, 0, 0, 0};
    int nKeyWidth = 0;
    LsmString str;
    int nRec;
    int iPtr;







<







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







<
<
<
<


<







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







>
>
>
>
>
>
>
>
|







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<






<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







|



|







 







|







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







|







 







|







 







<
<







 







<







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







|







 







<
<



<




<
<
<
<
<
<
<
<
<
<
<







122
123
124
125
126
127
128

129
130
131
132
133
134
135
...
162
163
164
165
166
167
168





























169
170
171
172
173
174
175
...
222
223
224
225
226
227
228




229
230

231
232
233
234
235
236
237
....
1086
1087
1088
1089
1090
1091
1092































































1093
1094
1095
1096
1097
1098
1099
....
1100
1101
1102
1103
1104
1105
1106


















1107
1108
1109
1110
1111
1112
1113
....
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
....
1187
1188
1189
1190
1191
1192
1193








































1194
1195
1196
1197
1198
1199






















1200
1201
1202
1203
1204
1205
1206
....
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
....
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
....
1447
1448
1449
1450
1451
1452
1453












































1454
1455
1456
1457
1458
1459
1460
....
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
....
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
....
1743
1744
1745
1746
1747
1748
1749


1750
1751
1752
1753
1754
1755
1756
....
1773
1774
1775
1776
1777
1778
1779

1780
1781
1782
1783
1784
1785
1786
....
1791
1792
1793
1794
1795
1796
1797

































































1798
1799
1800
1801
1802
1803
1804
....
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
....
4434
4435
4436
4437
4438
4439
4440


4441
4442
4443

4444
4445
4446
4447











4448
4449
4450
4451
4452
4453
4454

#define SEGMENT_EOF(pgsz, nEntry) SEGMENT_CELLPTR_OFFSET(pgsz, nEntry)

#define SEGMENT_BTREE_FLAG     0x0001
#define PGFTR_SKIP_NEXT_FLAG   0x0002
#define PGFTR_SKIP_THIS_FLAG   0x0004


typedef struct SegmentPtr SegmentPtr;
typedef struct Blob Blob;

struct Blob {
  lsm_env *pEnv;
  void *pData;
  int nData;
................................................................................
  void *pVal; int nVal;         /* Current record value (eType==WRITE only) */

  /* Blobs used to allocate buffers for pKey and pVal as required */
  Blob blob1;
  Blob blob2;
};






























/*
** Used to iterate through the keys stored in a b-tree hierarchy from start
** to finish. Only First() and Next() operations are required.
**
**   btreeCursorNew()
**   btreeCursorFirst()
**   btreeCursorNext()
................................................................................

  int eType;                      /* Cache of current key type */
  Blob key;                       /* Cache of current key (or NULL) */
  Blob val;                       /* Cache of current value */

  /* All the component cursors: */
  TreeCursor *apTreeCsr[2];       /* Up to two tree cursors */




  SegmentPtr *aPtr;               /* Array of segment pointers */
  int nPtr;                       /* Size of array aPtr[] */

  BtreeCursor *pBtCsr;            /* b-tree cursor (db writes only) */

  /* Comparison results */
  int nTree;                      /* Size of aTree[] array */
  int *aTree;                     /* Array of comparison results */

  /* Used by cursors flushing the in-memory tree only */
................................................................................
    pLevel->nSplitKey = blob.nData;
    lsmFsPageRelease(pPg);
  }

  *pRc = rc;
}
































































static void segmentPtrReset(SegmentPtr *pPtr){
  lsmFsPageRelease(pPtr->pPg);
  pPtr->pPg = 0;
  pPtr->nCell = 0;
  pPtr->pKey = 0;
  pPtr->nKey = 0;
  pPtr->pVal = 0;
................................................................................
  pPtr->nVal = 0;
  pPtr->eType = 0;
  pPtr->iCell = 0;
  sortedBlobFree(&pPtr->blob1);
  sortedBlobFree(&pPtr->blob2);
}



















static int segmentPtrIgnoreSeparators(MultiCursor *pCsr, SegmentPtr *pPtr){
  return (pCsr->flags & CURSOR_READ_SEPARATORS)==0
      || (pPtr!=&pCsr->aPtr[pCsr->nPtr-1]);
}

static int segmentPtrAdvance(
  MultiCursor *pCsr, 
................................................................................
    Pgno iPg = (bLast ? pPtr->pSeg->iLast : pPtr->pSeg->iFirst);
    *pRc = lsmFsDbPageGet(pFS, iPg, &pNew);
    segmentPtrSetPage(pPtr, pNew);
  }
}


/*
** Try to move the segment pointer passed as the second argument so that it
** points at either the first (bLast==0) or last (bLast==1) cell in the valid
** region of the segment defined by pPtr->iFirst and pPtr->iLast.
**
** Return LSM_OK if successful or an lsm error code if something goes
** wrong (IO error, OOM etc.).
*/
static int segmentPtrEnd(MultiCursor *pCsr, SegmentPtr *pPtr, int bLast){
  int rc = LSM_OK;
  FileSystem *pFS = pCsr->pDb->pFS;
  int bIgnore;

  segmentPtrEndPage(pFS, pPtr, bLast, &rc);
  while( rc==LSM_OK && pPtr->pPg 
      && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
................................................................................
  if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){
    rc = segmentPtrAdvance(pCsr, pPtr, bLast);
  }

  return rc;
}









































static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){
  assert( pPtr->pPg );
  *ppKey = pPtr->pKey;
  *pnKey = pPtr->nKey;
}























#ifdef LSM_DEBUG

static char *keyToString(lsm_env *pEnv, void *pKey, int nKey){
  int i;
  u8 *aKey = (u8 *)pKey;
  char *zRet = (char *)lsmMalloc(pEnv, nKey+1);

................................................................................

/*
** Check that the page that pPtr currently has loaded is the correct page
** to search for key (pKey/nKey). If it is, return 1. Otherwise, an assert
** fails and this function does not return.
*/
static int assertKeyLocation(
  MultiCursor *pCsr, 
  SegmentPtr *pPtr, 
  void *pKey, int nKey
){
  lsm_env *pEnv = lsmFsEnv(pCsr->pDb->pFS);
  Blob blob = {0, 0, 0};
  int eDir;
  int iTopic = 0;                 /* TODO: Fix me */

  for(eDir=-1; eDir<=1; eDir+=2){
    Page *pTest = pPtr->pPg;

................................................................................
          u8 *pPgKey;
          int res;
          int iCell;

          iCell = ((eDir < 0) ? (nCell-1) : 0);
          pPgKey = pageGetKey(pTest, iCell, &iPgTopic, &nPgKey, &blob);
          res = iTopic - iPgTopic;
          if( res==0 ) res = pCsr->pDb->xCmp(pKey, nKey, pPgKey, nPgKey);
          if( (eDir==1 && res>0) || (eDir==-1 && res<0) ){
            /* Taking this branch means something has gone wrong. */
            char *zMsg = lsmMallocPrintf(pEnv, "Key \"%s\" is not on page %d", 
                keyToString(pEnv, pKey, nKey), lsmFsPageNumber(pPtr->pPg)
            );
            fprintf(stderr, "%s\n", zMsg);
            assert( !"assertKeyLocation() failed" );
................................................................................
  }

  assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
  *piPtr = iPtrOut;
  return rc;
}













































static int seekInBtree(
  MultiCursor *pCsr,              /* Multi-cursor object */
  Segment *pSeg,                  /* Seek within this segment */
  void *pKey, int nKey,           /* Key to seek to */
  Pgno *aPg,                      /* OUT: Page numbers */
  Page **ppPg                     /* OUT: Leaf (sorted-run) page reference */
){
................................................................................
    for(i=1; rc==LSM_OK && i<=nRhs; i++){
      iOut = 0;
      rc = seekInSegment(pCsr, &aPtr[i], pKey, nKey, iPtr, eSeek, &iOut);
      iPtr = iOut;
    }

    if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){
      rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
    }
  }

  *piPgno = iOut;
  return rc;
}

................................................................................
  }

  pCsr->aTree[iOut] = iRes;
}

static int sortedRhsFirst(MultiCursor *pCsr, Level *pLvl, SegmentPtr *pPtr){
  int rc;
  rc = segmentPtrEnd(pCsr, pPtr, 0);
  while( pPtr->pPg && rc==LSM_OK ){
    int res = sortedKeyCompare(pCsr->pDb->xCmp,
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey,
        rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey
    );
    if( res<=0 ) break;
    rc = segmentPtrAdvance(pCsr, pPtr, 0);
................................................................................
/*
** This function advances segment pointer iPtr belonging to multi-cursor
** pCsr forward (bReverse==0) or backward (bReverse!=0).
**
** If the segment pointer points to a segment that is part of a composite
** level, then the following special cases are handled.
**


**   * If iPtr is the lhs of a composite level, and the cursor is being
**     advanced forwards, and segment iPtr is at EOF, move all pointers
**     that correspond to rhs segments of the same level to the first
**     key in their respective data.
**
**   * If iPtr is on the rhs of a composite level, and the cursor is being
**     reversed, and the new key is smaller than the split-key, set the
................................................................................
  if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
   && bReverse==0                                /* csr advanced forwards */
   && pPtr->pPg==0                               /* segment at EOF */
  ){
    int i;

    for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){

      rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
    }

    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, 0);
    }
  }
................................................................................
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
    );
    if( res<0 ){
      segmentPtrReset(pPtr);
    }
  }


































































  return rc;
}

static void mcursorFreeComponents(MultiCursor *pCsr){
  int i;
  lsm_env *pEnv = pCsr->pDb->pEnv;

................................................................................
    pCsr->flags |= CURSOR_AT_FREELIST;
  }

  for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
    SegmentPtr *pPtr = &pCsr->aPtr[i];
    Level *pLvl = pPtr->pLevel;

    rc = segmentPtrEnd(pCsr, pPtr, bLast);
    if( rc==LSM_OK && bLast==0 && pLvl->nRight && pPtr->pSeg==&pLvl->lhs ){
      int iRhs;
      for(iRhs=1+i; rc==LSM_OK && iRhs<1+i+pLvl->nRight; iRhs++){
        SegmentPtr *pRhs = &pCsr->aPtr[iRhs];
        if( pPtr->pPg==0 ){
          rc = sortedRhsFirst(pCsr, pLvl, pRhs);
        }else{
................................................................................
    }
  }
  return LSM_OK;
}

int lsmInfoPageDump(lsm_db *pDb, Pgno iPg, int bHex, char **pzOut){
  int rc = LSM_OK;                /* Return code */


  Page *pPg = 0;                  /* Handle for page iPg */
  int i, j;                       /* Loop counters */
  const int perLine = 16;         /* Bytes per line in the raw hex dump */


  *pzOut = 0;
  if( iPg==0 ) return LSM_ERROR;












  rc = lsmFsDbPageGet(pDb->pFS, iPg, &pPg);
  if( rc==LSM_OK ){
    Blob blob = {0, 0, 0, 0};
    int nKeyWidth = 0;
    LsmString str;
    int nRec;
    int iPtr;

Changes to tool/lsmperf.tcl.

47
48
49
50
51
52
53
54
55
56
57
58
59
60

61



62
63
64
65
66
67
68
...
126
127
128
129
130
131
132
133
134
135
136
137
138
139

140
141
142

143
144
145
146


147
148
149
150


151
152
153
154
155

156
157





158
159
160
161
162
163
164
165

166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
  set script "
    set terminal pngcairo size 1000,400
    $script
  "
  exec gnuplot << $script > $png 2>/dev/null
}

proc make_totalset {res nWrite} {
  set ret ""
  set nMs 0
  set nIns 0
  foreach row $res {
    foreach {i msInsert msFetch} $row {}
    incr nIns $nWrite

    incr nMs $msInsert



    append ret "$nIns [expr $nIns*1000.0/$nMs]\n"
  }
  append ret "end\n"
  set ret
}

proc make_dataset {res iRes nWrite nShift nOp} {
................................................................................
    set xrange [0:$xmax]
    set key outside bottom
    $y2setup
    set label 1 "$labeltext" at screen 0.95,graph 1.0 right
  }]


  set cols [list {#B0C4DE #00008B} {#F08080 #8B0000}]
  set cols [lrange $cols 0 [expr ([llength $lSys]/2)-1]]

  set nShift [expr ($nWrite/2)]
  set plot1 ""
  set plot2 ""
  set plot3 ""

  set data1 ""
  set data2 ""
  set data3 ""


  foreach {name sys} $lSys res $lRes col $cols {
    foreach {c1 c2} $col {}



    if {$plot1 != ""} { set plot1 ", $plot1" }
    set plot1 "\"-\" ti \"$name writes/sec\" with boxes fs solid lc rgb \"$c1\"$plot1"
    set data1 "[make_dataset $res 0 $nWrite $nShift $nWrite] $data1"



    set plot3 ",\"-\" ti \"$name cumulative writes/sec\" with lines lc rgb \"$c2\" lw 2 $plot3"
    set data3 "[make_totalset $res $nWrite] $data3"

    if {$nFetch>0} {
      set new ", \"-\" ti \"$name fetches/sec\" axis x1y2 with points lw 3 lc rgb \"$c2\""

      set plot2 "$new $plot2"
      set data2 "[make_dataset $res 1 $nWrite $nWrite $nFetch] $data2"





    }

    incr nShift [expr $nWrite/4]
  }
  append script "plot $plot1 $plot2 $plot3\n"
  append script $data1
  append script $data2
  append script $data3


  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

do_write_test x.png 200 50000 0 20 {
  lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0 nmerge=4"
  leveldb leveldb
}

# lsm-mt    "mmap=1 multi_proc=0 threads=2 autowork=0 autocheckpoint=8192000"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-st     "mmap=1 multi_proc=0 safety=1 threads=1 autowork=1"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"







|





|
>
|
>
>
>







 







|






>



>


|

>
>




>
>

|


|
>


>
>
>
>
>




|



>





|
|
|







47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
...
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
  set script "
    set terminal pngcairo size 1000,400
    $script
  "
  exec gnuplot << $script > $png 2>/dev/null
}

proc make_totalset {res nOp bFetch} {
  set ret ""
  set nMs 0
  set nIns 0
  foreach row $res {
    foreach {i msInsert msFetch} $row {}
    incr nIns $nOp
    if {$bFetch==0} {
      incr nMs $msInsert
    } else {
      incr nMs $msFetch
    }
    append ret "$nIns [expr $nIns*1000.0/$nMs]\n"
  }
  append ret "end\n"
  set ret
}

proc make_dataset {res iRes nWrite nShift nOp} {
................................................................................
    set xrange [0:$xmax]
    set key outside bottom
    $y2setup
    set label 1 "$labeltext" at screen 0.95,graph 1.0 right
  }]


  set cols [list {#B0C4DE #00008B #000000} {#F08080 #8B0000 #FFA500}]
  set cols [lrange $cols 0 [expr ([llength $lSys]/2)-1]]

  set nShift [expr ($nWrite/2)]
  set plot1 ""
  set plot2 ""
  set plot3 ""
  set plot4 ""
  set data1 ""
  set data2 ""
  set data3 ""
  set data4 ""

  foreach {name sys} $lSys res $lRes col $cols {
    foreach {c1 c2 c3} $col {}

    # First plot. Writes per second (bar chart).
    #
    if {$plot1 != ""} { set plot1 ", $plot1" }
    set plot1 "\"-\" ti \"$name writes/sec\" with boxes fs solid lc rgb \"$c1\"$plot1"
    set data1 "[make_dataset $res 0 $nWrite $nShift $nWrite] $data1"

    # Third plot. Cumulative writes per second (line chart).
    #
    set plot3 ",\"-\" ti \"$name cumulative writes/sec\" with lines lc rgb \"$c2\" lw 2 $plot3"
    set data3 "[make_totalset $res $nWrite 0] $data3"

    if {$nFetch>0} {
      set    new ", \"-\" ti \"$name fetches/sec\" axis x1y2 "
      append new "with points lw 3 lc rgb \"$c2\""
      set plot2 "$new $plot2"
      set data2 "[make_dataset $res 1 $nWrite $nWrite $nFetch] $data2"

      set    new ",\"-\" ti \"$name cumulative fetches/sec\" "
      append new "with lines lc rgb \"$c3\" lw 2 "
      set plot4 "$new $plot4"
      set data4 "[make_totalset $res $nFetch 1] $data4"
    }

    incr nShift [expr $nWrite/4]
  }
  append script "plot $plot1 $plot2 $plot3 $plot4\n"
  append script $data1
  append script $data2
  append script $data3
  append script $data4

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

do_write_test x.png 600 50000 50000 200 {
  lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
  LevelDB leveldb
}

# lsm-mt    "mmap=1 multi_proc=0 threads=2 autowork=0 autocheckpoint=8192000"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-st     "mmap=1 multi_proc=0 safety=1 threads=1 autowork=1"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"