SQLite4
Check-in [1ff4639070]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fixes for range-delete and seek operations.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | range-delete
Files: files | file ages | folders
SHA1: 1ff4639070c68e643b86a89409f3294e79149089
User & Date: dan 2012-10-10 18:10:31
Context
2012-10-11
19:36
Fix cases involving iteration through split levels where the first part of a range-delete has been merged or annihilated but the second has not. check-in: 45d5b7570e user: dan tags: range-delete
2012-10-10
18:10
Fixes for range-delete and seek operations. check-in: 1ff4639070 user: dan tags: range-delete
2012-10-09
19:55
Fix further bugs in in-memory tree. Progress on writing range-deletes into the database file. check-in: 9081b1c92c user: dan tags: range-delete
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest.h.

118
119
120
121
122
123
124

125
126
127
128
129
130
131
void test_failed(void);

char *testMallocPrintf(const char *zFormat, ...);
char *testMallocVPrintf(const char *zFormat, va_list ap);
int testGlobMatch(const char *zPattern, const char *zStr);

void testScanCompare(TestDb *, TestDb *, int, void *, int, void *, int, int *);


void *testMalloc(int);
void *testMallocCopy(void *pCopy, int nByte);
void *testRealloc(void *, int);
void testFree(void *);

/* testio.c */







>







118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
void test_failed(void);

char *testMallocPrintf(const char *zFormat, ...);
char *testMallocVPrintf(const char *zFormat, va_list ap);
int testGlobMatch(const char *zPattern, const char *zStr);

void testScanCompare(TestDb *, TestDb *, int, void *, int, void *, int, int *);
void testFetchCompare(TestDb *, TestDb *, void *, int, int *);

void *testMalloc(int);
void *testMallocCopy(void *pCopy, int nByte);
void *testRealloc(void *, int);
void testFree(void *);

/* testio.c */

Changes to lsm-test/lsmtest1.c.

351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372














373
374
375
376
377
378
379
380








381
382
383
384
385
386
387
...
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
  Datasource *pData,
  int nData,
  int iSeed,
  TestDb *pControl,
  TestDb *pDb,
  int *pRc
){
  int iKey1;
  int iKey2;
  void *pKey1; int nKey1;       /* Start key */
  void *pKey2; int nKey2;       /* Final key */

  iKey1 = testPrngValue(iSeed) % nData;
  iKey2 = testPrngValue(iSeed+1) % nData;
  testDatasourceEntry(pData, iKey1, &pKey2, &nKey1, 0, 0);
  pKey1 = testMalloc(nKey1+1);
  memcpy(pKey1, pKey2, nKey1+1);
  testDatasourceEntry(pData, iKey2, &pKey2, &nKey2, 0, 0);

  testScanCompare(pControl, pDb, 0, 0, 0,         0, 0,         pRc);
  testScanCompare(pControl, pDb, 1, 0, 0,         0, 0,         pRc);















  testScanCompare(pControl, pDb, 0, 0, 0,         pKey2, nKey2, pRc);
  testScanCompare(pControl, pDb, 0, pKey1, nKey1, 0, 0,         pRc);
  testScanCompare(pControl, pDb, 0, pKey1, nKey1, pKey2, nKey2, pRc);
  testScanCompare(pControl, pDb, 1, 0, 0,         pKey2, nKey2, pRc);
  testScanCompare(pControl, pDb, 1, pKey1, nKey1, 0, 0,         pRc);
  testScanCompare(pControl, pDb, 1, pKey1, nKey1, pKey2, nKey2, pRc);

  testFree(pKey1);








}

static void doDataTest2(
  const char *zSystem,            /* Database system to test */
  Datatest2 *p,                   /* Structure containing test parameters */
  int *pRc                        /* OUT: Error code */
){
................................................................................
    pKey1 = testMallocCopy(pKey1, nKey1);
    testDatasourceEntry(pData, i+2000000, &pKey2, &nKey2, 0, 0);

    testDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2, &rc);
    testDeleteRange(pControl, pKey1, nKey1, pKey2, nKey2, &rc);
    testFree(pKey1);

if( 0 && i==4 ){
  extern int test_scan_debug;
  test_scan_debug = 1;
}
    testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc);
    testReopen(&pDb, &rc);
    testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc);

    /* Update the progress dots... */
    testCaseProgress(i, p->nIter, testCaseNDot(), &iDot);
  }







|
<
<
<
<
<
<
<
<
<
<




>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
|
|
|
|
<
|
>
>
>
>
>
>
>
>







 







<
<
<
<







351
352
353
354
355
356
357
358










359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382

383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
...
424
425
426
427
428
429
430




431
432
433
434
435
436
437
  Datasource *pData,
  int nData,
  int iSeed,
  TestDb *pControl,
  TestDb *pDb,
  int *pRc
){
  int i;











  testScanCompare(pControl, pDb, 0, 0, 0,         0, 0,         pRc);
  testScanCompare(pControl, pDb, 1, 0, 0,         0, 0,         pRc);

#if 0
  if( *pRc==0 ){
    int iKey1;
    int iKey2;
    void *pKey1; int nKey1;       /* Start key */
    void *pKey2; int nKey2;       /* Final key */

    iKey1 = testPrngValue(iSeed) % nData;
    iKey2 = testPrngValue(iSeed+1) % nData;
    testDatasourceEntry(pData, iKey1, &pKey2, &nKey1, 0, 0);
    pKey1 = testMalloc(nKey1+1);
    memcpy(pKey1, pKey2, nKey1+1);
    testDatasourceEntry(pData, iKey2, &pKey2, &nKey2, 0, 0);

    testScanCompare(pControl, pDb, 0, 0, 0,         pKey2, nKey2, pRc);
    testScanCompare(pControl, pDb, 0, pKey1, nKey1, 0, 0,         pRc);
    testScanCompare(pControl, pDb, 0, pKey1, nKey1, pKey2, nKey2, pRc);
    testScanCompare(pControl, pDb, 1, 0, 0,         pKey2, nKey2, pRc);
    testScanCompare(pControl, pDb, 1, pKey1, nKey1, 0, 0,         pRc);
    testScanCompare(pControl, pDb, 1, pKey1, nKey1, pKey2, nKey2, pRc);

    testFree(pKey1);
  }
#endif

  for(i=0; i<nData && *pRc==0; i++){
    void *pKey; int nKey;
    testDatasourceEntry(pData, i, &pKey, &nKey, 0, 0);
    testFetchCompare(pControl, pDb, pKey, nKey, pRc);
  }
}

static void doDataTest2(
  const char *zSystem,            /* Database system to test */
  Datatest2 *p,                   /* Structure containing test parameters */
  int *pRc                        /* OUT: Error code */
){
................................................................................
    pKey1 = testMallocCopy(pKey1, nKey1);
    testDatasourceEntry(pData, i+2000000, &pKey2, &nKey2, 0, 0);

    testDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2, &rc);
    testDeleteRange(pControl, pKey1, nKey1, pKey2, nKey2, &rc);
    testFree(pKey1);





    testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc);
    testReopen(&pDb, &rc);
    testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc);

    /* Update the progress dots... */
    testCaseProgress(i, p->nIter, testCaseNDot(), &iDot);
  }

Changes to lsm-test/lsmtest_main.c.

132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149



150
151
152
153
154
155
156
  const char *zVal,               /* Value to write */
  int *pRc                        /* IN/OUT: Error code */
){
  int nVal = (zVal ? strlen(zVal) : 0);
  testFetch(pDb, (void *)zKey, strlen(zKey), (void *)zVal, nVal, pRc);
}

static void testFetchCompare(
  TestDb *pDb1, 
  TestDb *pDb2, 
  void *pKey, int nKey, 
  int *pRc
){
  int rc;
  void *pDbVal1;
  void *pDbVal2;
  int nDbVal1;
  int nDbVal2;




  rc = tdb_fetch(pDb1, pKey, nKey, &pDbVal1, &nDbVal1);
  testSetError(rc);

  rc = tdb_fetch(pDb2, pKey, nKey, &pDbVal2, &nDbVal2);
  testSetError(rc);








|










>
>
>







132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
  const char *zVal,               /* Value to write */
  int *pRc                        /* IN/OUT: Error code */
){
  int nVal = (zVal ? strlen(zVal) : 0);
  testFetch(pDb, (void *)zKey, strlen(zKey), (void *)zVal, nVal, pRc);
}

void testFetchCompare(
  TestDb *pDb1, 
  TestDb *pDb2, 
  void *pKey, int nKey, 
  int *pRc
){
  int rc;
  void *pDbVal1;
  void *pDbVal2;
  int nDbVal1;
  int nDbVal2;

  static int nCall = 0;
  nCall++;

  rc = tdb_fetch(pDb1, pKey, nKey, &pDbVal1, &nDbVal1);
  testSetError(rc);

  rc = tdb_fetch(pDb2, pKey, nKey, &pDbVal2, &nDbVal2);
  testSetError(rc);

Changes to src/lsm_sorted.c.

240
241
242
243
244
245
246





247
248
249
250
251
252
253
254

255
256
257
258
259
260
261
....
1272
1273
1274
1275
1276
1277
1278
1279

1280
1281
1282
1283
1284
1285
1286
....
1399
1400
1401
1402
1403
1404
1405
1406
















1407
1408

1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424

1425
1426
1427
1428
1429
1430
1431
....
1514
1515
1516
1517
1518
1519
1520
1521

1522
1523
1524
1525
1526
1527
1528
....
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547











1548
1549
1550
1551
1552
1553
1554
1555

1556

1557
1558
1559
1560

1561
1562
1563
1564
1565
1566
1567
....
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597

1598

1599
1600
1601
1602
1603
1604
1605
....
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
....
2288
2289
2290
2291
2292
2293
2294
2295

2296
2297
2298

2299

2300
2301
2302
2303
2304
2305
2306













2307

2308

2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321

2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343

2344
2345

2346
2347

2348
2349
2350
2351

2352
2353

2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380

2381
2382
2383
2384
2385


2386
2387
2388
2389
2390
2391
2392
2393
....
2493
2494
2495
2496
2497
2498
2499




2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523

2524
2525
2526
2527
2528
2529
2530
2531





2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544

2545
2546
2547
2548
2549
2550
2551
**
** CURSOR_PREV_OK
**   Set if it is Ok to call lsm_csr_prev().
**
** CURSOR_READ_SEPARATORS
**   Set if this cursor should visit the separator keys in segment 
**   aPtr[nPtr-1].





*/
#define CURSOR_IGNORE_DELETE    0x00000001
#define CURSOR_NEW_SYSTEM       0x00000002
#define CURSOR_AT_FREELIST      0x00000004
#define CURSOR_IGNORE_SYSTEM    0x00000010
#define CURSOR_NEXT_OK          0x00000020
#define CURSOR_PREV_OK          0x00000040
#define CURSOR_READ_SEPARATORS  0x00000080


typedef struct MergeWorker MergeWorker;
typedef struct Hierarchy Hierarchy;

struct Hierarchy {
  Page **apHier;
  int nHier;
................................................................................
#endif

int segmentPtrSeek(
  MultiCursor *pCsr,              /* Cursor context */
  SegmentPtr *pPtr,               /* Pointer to seek */
  void *pKey, int nKey,           /* Key to seek to */
  int eSeek,                      /* Search bias - see above */
  int *piPtr                      /* OUT: FC pointer */

){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res;                        /* Result of comparison operation */
  int rc = LSM_OK;
  int iMin;
  int iMax;
  int iPtrOut = 0;
................................................................................
      assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) );
      if( res ){
        rc = segmentPtrLoadCell(pPtr, iMin);
      }

      if( rc==LSM_OK ){
        switch( eSeek ){
          case LSM_SEEK_EQ:
















            if( res!=0 || rtIsSeparator(pPtr->eType) ) segmentPtrReset(pPtr);
            break;

          case LSM_SEEK_LE:
            if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1);
            break;
          case LSM_SEEK_GE:
            if( res<0 ) rc = segmentPtrAdvance(pCsr, pPtr, 0);
            break;
        }
      }
    }

    /* If the cursor seek has found a separator key, and this cursor is
    ** supposed to ignore separators keys, advance to the next entry.  */
    if( rc==LSM_OK && pPtr->pPg 
     && segmentPtrIgnoreSeparators(pCsr, pPtr) 
     && rtIsSeparator(pPtr->eType)
    ){

      rc = segmentPtrAdvance(pCsr, pPtr, eSeek==LSM_SEEK_LE);
    }
  }

  assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
  *piPtr = iPtrOut;
  return rc;
................................................................................

static int seekInSegment(
  MultiCursor *pCsr, 
  SegmentPtr *pPtr,
  void *pKey, int nKey,
  int iPg,                        /* Page to search */
  int eSeek,                      /* Search bias - see above */
  int *piPtr                      /* OUT: FC pointer */

){
  int iPtr = iPg;
  int rc = LSM_OK;

  if( pPtr->pSeg->iRoot ){
    Page *pPg;
    assert( pPtr->pSeg->iRoot!=0 );
................................................................................
    }
    if( rc==LSM_OK ){
      rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iPtr);
    }
  }

  if( rc==LSM_OK ){
    rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, eSeek, piPtr);
  }
  return rc;
}

/*
** Seek each segment pointer in the array of (pLvl->nRight+1) at aPtr[].











*/
static int seekInLevel(
  MultiCursor *pCsr,              /* Sorted cursor object to seek */
  Level *pLvl,                    /* Level to seek within */
  SegmentPtr *aPtr,               /* Pointer to array of (nRight+1) SPs */
  int eSeek,                      /* Search bias - see above */
  void *pKey, int nKey,           /* Key to search for */
  Pgno *piPgno                    /* IN/OUT: fraction cascade pointer (or 0) */

){

  int rc = LSM_OK;                /* Return code */
  int iOut = 0;                   /* Pointer to return to caller */
  int res = -1;                   /* Result of xCmp(pKey, split) */
  int nRhs = pLvl->nRight;        /* Number of right-hand-side segments */


  /* If this is a composite level (one currently undergoing an incremental
  ** merge), figure out if the search key is larger or smaller than the
  ** levels split-key.  */
  if( nRhs ){
    res = sortedKeyCompare(pCsr->pDb->xCmp, 0, pKey, nKey, 
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
................................................................................
  /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
  ** is not a composite level and there is no split-key). Search the 
  ** left-hand-side of the level in this case.  */
  if( res<0 ){
    int iPtr = 0;
    if( nRhs==0 ) iPtr = *piPgno;

    rc = seekInSegment(pCsr, &aPtr[0], pKey, nKey, iPtr, eSeek, &iOut);
    if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
      res = 0;
    }
  }
  
  if( res>=0 ){
    int iPtr = *piPgno;
    int i;
    for(i=1; rc==LSM_OK && i<=nRhs; i++){
      iOut = 0;
      rc = seekInSegment(pCsr, &aPtr[i], pKey, nKey, iPtr, eSeek, &iOut);
      iPtr = iOut;
    }

    if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){
      rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
    }
  }


  *piPgno = iOut;

  return rc;
}

static void multiCursorGetKey(
  MultiCursor *pCsr, 
  int iKey,
  int *peType,                    /* OUT: Key type (SORTED_WRITE etc.) */
................................................................................

static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
  int eType = pCsr->eType;
  int iKey;
  int i;
  int rdmask = 0;
  
  /* assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) ); */
  if( pCsr->flags & CURSOR_NEXT_OK ){
    rdmask = LSM_END_DELETE;
  }else if( pCsr->flags & CURSOR_PREV_OK ){
    rdmask = LSM_START_DELETE;
  }

  if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){
    if( (eType & LSM_INSERT)==0 ) return 0;
  }
  if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
................................................................................
  lsmTreeCursorReset(pCsr->apTreeCsr[1]);
  for(i=0; i<pCsr->nPtr; i++){
    segmentPtrReset(&pCsr->aPtr[i]);
  }
  pCsr->key.nData = 0;
}

/*
** Position in-memory tree cursor pTreeCsr relative to key (pKey/nKey)
** according to search bias eSeek. A NULL pTreeCsr is permitted, in which
** case this function is a no-op.
**
** lsmTreeCursorSeek() reports a comparison result; this routine then
** adjusts the cursor:
**
**   LSM_SEEK_EQ:  reset the cursor unless the result is 0.
**   LSM_SEEK_GE:  step forward if the result is negative and the cursor
**                 is valid.
**   otherwise:    step backward if the result is positive.
**
** NOTE(review): the sign convention (negative presumably meaning "cursor
** entry is smaller than the key") is inferred from the adjustments made
** here - confirm against lsmTreeCursorSeek().
*/
static void treeCursorSeek(
  TreeCursor *pTreeCsr, 
  void *pKey, int nKey, 
  int eSeek
){
  int iCmp = 0;                   /* Comparison result from the tree seek */

  if( pTreeCsr==0 ) return;
  lsmTreeCursorSeek(pTreeCsr, pKey, nKey, &iCmp);

  if( eSeek==LSM_SEEK_EQ ){
    /* Anything other than an exact hit invalidates the cursor. */
    if( iCmp!=0 ) lsmTreeCursorReset(pTreeCsr);
  }else if( eSeek==LSM_SEEK_GE ){
    if( iCmp<0 && lsmTreeCursorValid(pTreeCsr) ){
      lsmTreeCursorNext(pTreeCsr);
    }
  }else if( iCmp>0 ){
    assert( lsmTreeCursorValid(pTreeCsr) );
    lsmTreeCursorPrev(pTreeCsr);
  }
}


/*
** Seek the cursor.
**
** Seek multi-cursor pCsr to key (pKey/nKey). Parameter eSeek is one of
** LSM_SEEK_EQ, LSM_SEEK_LE, LSM_SEEK_GE or LSM_SEEK_LEFAST. LEFAST is
** treated as LE for the purposes of positioning the component cursors,
** but the original eSeek value is still consulted further down.
**
** The two in-memory tree cursors are positioned first, then one
** seekInLevel() call is made per level of the client snapshot. If the
** resulting position is rejected by mcursorLocationOk(), the cursor is
** reset (EQ) or stepped onward (GE: next, otherwise: prev).
**
** Returns LSM_OK, or the first error code returned by a helper.
*/
int lsmMCursorSeek(MultiCursor *pCsr, void *pKey, int nKey, int eSeek){
  int eESeek = eSeek;             /* Effective eSeek parameter */
  int rc = LSM_OK;                /* Return code */
  int iPtr = 0;                   /* Index of current level's first aPtr[] */
  Pgno iPgno = 0;                 /* IN/OUT page number for seekInLevel() */
  Level *pLvl;                    /* Used to iterate through the levels */

  if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;
  assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );

  assert( (pCsr->flags & CURSOR_NEW_SYSTEM)==0 );
  assert( (pCsr->flags & CURSOR_AT_FREELIST)==0 );
  assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel );

  /* The direction flags are re-established below once the seek succeeds. */
  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);
  treeCursorSeek(pCsr->apTreeCsr[0], pKey, nKey, eESeek);

  treeCursorSeek(pCsr->apTreeCsr[1], pKey, nKey, eESeek);


  /* Seek all segment pointers. Each level owns (1+nRight) consecutive
  ** entries of aPtr[]; iPgno is passed from one level to the next. */
  for(pLvl=pCsr->pDb->pClient->pLevel; rc==LSM_OK && pLvl; pLvl=pLvl->pNext){

    SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
    iPtr += (1+pLvl->nRight);
    assert( pPtr->pSeg==&pLvl->lhs );
    rc = seekInLevel(pCsr, pLvl, pPtr, eESeek, pKey, nKey, &iPgno);

  }


  if( rc==LSM_OK ){
    rc = multiCursorAllocTree(pCsr);
  }
  if( rc==LSM_OK ){
    int i;
    /* Populate aTree[] from entry (nTree-1) down to 1. The third argument
    ** is true for LE seeks - presumably a "reverse" flag; confirm against
    ** multiCursorDoCompare(). */
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, eESeek==LSM_SEEK_LE);
    }
    /* NOTE(review): these tests use eSeek, not eESeek, so a
    ** LSM_SEEK_LEFAST seek sets neither flag. */
    if( eSeek==LSM_SEEK_GE ) pCsr->flags |= CURSOR_NEXT_OK;
    if( eSeek==LSM_SEEK_LE ) pCsr->flags |= CURSOR_PREV_OK;
  }

  /* Called even if rc is already an error; rc is passed by reference. */
  multiCursorCacheKey(pCsr, &rc);
  if( rc==LSM_OK && 0==mcursorLocationOk(pCsr, eSeek==LSM_SEEK_LEFAST) ){
    switch( eESeek ){
      case LSM_SEEK_EQ:
        lsmMCursorReset(pCsr);
        break;
      case LSM_SEEK_GE:
        rc = lsmMCursorNext(pCsr);
        break;
      default:
        rc = lsmMCursorPrev(pCsr);
        break;
    }
  }


  return rc;
}

int lsmMCursorValid(MultiCursor *pCsr){
  int res = 0;


  if( pCsr->aTree ){
    int iKey = pCsr->aTree[1];
    if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
      res = lsmTreeCursorValid(pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]);
    }else{
      void *pKey; 
      multiCursorGetKey(pCsr, iKey, 0, &pKey, 0);
      res = pKey!=0;
................................................................................

/*
** Step multi-cursor pCsr via multiCursorAdvance(pCsr, 1). This is only
** permitted while the CURSOR_PREV_OK flag is set; otherwise
** LSM_MISUSE_BKPT is returned and the cursor is left untouched.
*/
int lsmMCursorPrev(MultiCursor *pCsr){
  int rc;
  if( pCsr->flags & CURSOR_PREV_OK ){
    rc = multiCursorAdvance(pCsr, 1);
  }else{
    rc = LSM_MISUSE_BKPT;
  }
  return rc;
}

/*
** Return the key of the current cursor entry via (*ppKey, *pnKey).
**
** If the winning entry (aTree[1]) is one of the two in-memory tree
** cursors, the key is read straight from that tree cursor. Otherwise
** the copy cached in pCsr->key is returned, with *ppKey set to NULL
** when the cached key has zero length. Always returns LSM_OK.
*/
int lsmMCursorKey(MultiCursor *pCsr, void **ppKey, int *pnKey){
  int iSrc = pCsr->aTree[1];      /* Component supplying the current key */
  int nCached;                    /* Size of cached key in bytes */

  if( iSrc==CURSOR_DATA_TREE0 || iSrc==CURSOR_DATA_TREE1 ){
    lsmTreeCursorKey(pCsr->apTreeCsr[iSrc-CURSOR_DATA_TREE0], 0, ppKey, pnKey);
    return LSM_OK;
  }

#ifndef NDEBUG
  {
    /* Check that the cached key matches the component's current key. */
    void *pChk;
    int nChk;
    int eChk;
    multiCursorGetKey(pCsr, iSrc, &eChk, &pChk, &nChk);
    assert( eChk==pCsr->eType );
    assert( nChk==pCsr->key.nData );
    assert( memcmp(pChk, pCsr->key.pData, nChk)==0 );
  }
#endif

  nCached = pCsr->key.nData;
  *ppKey = (nCached==0) ? 0 : pCsr->key.pData;
  *pnKey = nCached;
  return LSM_OK;
}

/*
** Return the value of the current cursor entry via (*ppVal, *pnVal).
**
** The value is fetched from component aTree[1] and, if non-NULL, copied
** into pCsr->val; the pointer returned to the caller then refers to that
** copy. On error both outputs are zeroed and the error code is returned.
*/
int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){
  void *pData;                    /* Value as reported by the component */
  int nData;                      /* Size of value in bytes */
  int rc;                         /* Return code */

  assert( pCsr->aTree );
  assert( mcursorLocationOk(pCsr, (pCsr->flags & CURSOR_IGNORE_DELETE)) );

  rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pData, &nData);
  if( pData && rc==LSM_OK ){
    /* Take a private copy of the value in pCsr->val. */
    rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pData, nData);
    pData = pCsr->val.pData;
  }

  if( rc==LSM_OK ){
    *ppVal = pData;
    *pnVal = nData;
  }else{
    *ppVal = 0;
    *pnVal = 0;
  }
  return rc;
}

int lsmMCursorType(MultiCursor *pCsr, int *peType){







>
>
>
>
>








>







 







|
>







 







|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|

>












|



>







 







|
>







 







|






>
>
>
>
>
>
>
>
>
>
>



<
|


|
>

>




>







 







|








|

|








>

>







 







|


|







 







|
>


|
>

>




|
<
|
>
>
>
>
>
>
>
>
>
>
>
>
>

>

>













>








|
|
|
|
|
|
|
|




|
|
>
|
|
>

<
>

<
|
|
>


>
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>





>
>
|







 







>
>
>
>
|

|
|
|
|
|


|
|
|
|
|
|


|
|
|
|
|
|
|
>








>
>
>
>
>

|
|

|
|
|
|
|

|
|
|
>







240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
....
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
....
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
....
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
....
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587

1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
....
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
....
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
....
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348

2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407

2408
2409

2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
....
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
**
** CURSOR_PREV_OK
**   Set if it is Ok to call lsm_csr_prev().
**
** CURSOR_READ_SEPARATORS
**   Set if this cursor should visit the separator keys in segment 
**   aPtr[nPtr-1].
**
** CURSOR_SEEK_EQ
**   Cursor has undergone a successful lsm_csr_seek(LSM_SEEK_EQ) operation.
**   The key and value are stored in MultiCursor.key and MultiCursor.val
**   respectively.
*/
#define CURSOR_IGNORE_DELETE    0x00000001
#define CURSOR_NEW_SYSTEM       0x00000002
#define CURSOR_AT_FREELIST      0x00000004
#define CURSOR_IGNORE_SYSTEM    0x00000010
#define CURSOR_NEXT_OK          0x00000020
#define CURSOR_PREV_OK          0x00000040
#define CURSOR_READ_SEPARATORS  0x00000080
#define CURSOR_SEEK_EQ          0x00000100

typedef struct MergeWorker MergeWorker;
typedef struct Hierarchy Hierarchy;

struct Hierarchy {
  Page **apHier;
  int nHier;
................................................................................
#endif

int segmentPtrSeek(
  MultiCursor *pCsr,              /* Cursor context */
  SegmentPtr *pPtr,               /* Pointer to seek */
  void *pKey, int nKey,           /* Key to seek to */
  int eSeek,                      /* Search bias - see above */
  int *piPtr,                     /* OUT: FC pointer */
  int *pbStop
){
  int (*xCmp)(void *, int, void *, int) = pCsr->pDb->xCmp;
  int res;                        /* Result of comparison operation */
  int rc = LSM_OK;
  int iMin;
  int iMax;
  int iPtrOut = 0;
................................................................................
      assert( res==0 || (iMin==iMax && iMin>=0 && iMin<pPtr->nCell) );
      if( res ){
        rc = segmentPtrLoadCell(pPtr, iMin);
      }

      if( rc==LSM_OK ){
        switch( eSeek ){
          case LSM_SEEK_EQ: {
            int eType = pPtr->eType;
            if( (res<0 && (eType & LSM_START_DELETE))
             || (res>0 && (eType & LSM_END_DELETE))
             || (res==0 && (eType & LSM_POINT_DELETE))
            ){
              *pbStop = 1;
            }else if( res==0 && (eType & LSM_INSERT) ){
              lsm_env *pEnv = pCsr->pDb->pEnv;
              *pbStop = 1;
              pCsr->eType = pPtr->eType;
              rc = sortedBlobSet(pEnv, &pCsr->key, pPtr->pKey, pPtr->nKey);
              if( rc==LSM_OK ){
                rc = sortedBlobSet(pEnv, &pCsr->val, pPtr->pVal, pPtr->nVal);
              }
              pCsr->flags |= CURSOR_SEEK_EQ;
            }
            segmentPtrReset(pPtr);
            break;
          }
          case LSM_SEEK_LE:
            if( res>0 ) rc = segmentPtrAdvance(pCsr, pPtr, 1);
            break;
          case LSM_SEEK_GE:
            if( res<0 ) rc = segmentPtrAdvance(pCsr, pPtr, 0);
            break;
        }
      }
    }

    /* If the cursor seek has found a separator key, and this cursor is
    ** supposed to ignore separators keys, advance to the next entry.  */
    if( rc==LSM_OK && pPtr->pPg
     && segmentPtrIgnoreSeparators(pCsr, pPtr) 
     && rtIsSeparator(pPtr->eType)
    ){
      assert( eSeek!=LSM_SEEK_EQ );
      rc = segmentPtrAdvance(pCsr, pPtr, eSeek==LSM_SEEK_LE);
    }
  }

  assert( rc!=LSM_OK || assertSeekResult(pCsr,pPtr,iTopic,pKey,nKey,eSeek) );
  *piPtr = iPtrOut;
  return rc;
................................................................................

static int seekInSegment(
  MultiCursor *pCsr, 
  SegmentPtr *pPtr,
  void *pKey, int nKey,
  int iPg,                        /* Page to search */
  int eSeek,                      /* Search bias - see above */
  int *piPtr,                     /* OUT: FC pointer */
  int *pbStop                     /* OUT: Stop search flag */
){
  int iPtr = iPg;
  int rc = LSM_OK;

  if( pPtr->pSeg->iRoot ){
    Page *pPg;
    assert( pPtr->pSeg->iRoot!=0 );
................................................................................
    }
    if( rc==LSM_OK ){
      rc = segmentPtrLoadPage(pCsr->pDb->pFS, pPtr, iPtr);
    }
  }

  if( rc==LSM_OK ){
    rc = segmentPtrSeek(pCsr, pPtr, pKey, nKey, eSeek, piPtr, pbStop);
  }
  return rc;
}

/*
** Seek each segment pointer in the array of (pLvl->nRight+1) at aPtr[].
**
** pbStop:
**   This parameter is only significant if parameter eSeek is set to
**   LSM_SEEK_EQ. In this case, it is set to true before returning if
**   the seek operation is finished. This can happen in two ways:
**   
**     a) A key matching (pKey/nKey) is found, or
**     b) A point-delete or range-delete deleting the key is found.
**
**   In case (a), the multi-cursor CURSOR_SEEK_EQ flag is set and the pCsr->key
**   and pCsr->val blobs populated before returning.
*/
static int seekInLevel(
  MultiCursor *pCsr,              /* Sorted cursor object to seek */

  SegmentPtr *aPtr,               /* Pointer to array of (nRhs+1) SPs */
  int eSeek,                      /* Search bias - see above */
  void *pKey, int nKey,           /* Key to search for */
  Pgno *piPgno,                   /* IN/OUT: fraction cascade pointer (or 0) */
  int *pbStop                     /* OUT: See above */
){
  Level *pLvl = aPtr[0].pLevel;   /* Level to seek within */
  int rc = LSM_OK;                /* Return code */
  int iOut = 0;                   /* Pointer to return to caller */
  int res = -1;                   /* Result of xCmp(pKey, split) */
  int nRhs = pLvl->nRight;        /* Number of right-hand-side segments */
  int bStop = 0;

  /* If this is a composite level (one currently undergoing an incremental
  ** merge), figure out if the search key is larger or smaller than the
  ** levels split-key.  */
  if( nRhs ){
    res = sortedKeyCompare(pCsr->pDb->xCmp, 0, pKey, nKey, 
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
................................................................................
  /* If (res<0), then key pKey/nKey is smaller than the split-key (or this
  ** is not a composite level and there is no split-key). Search the 
  ** left-hand-side of the level in this case.  */
  if( res<0 ){
    int iPtr = 0;
    if( nRhs==0 ) iPtr = *piPgno;

    rc = seekInSegment(pCsr, &aPtr[0], pKey, nKey, iPtr, eSeek, &iOut, &bStop);
    if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
      res = 0;
    }
  }
  
  if( res>=0 ){
    int iPtr = *piPgno;
    int i;
    for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
      iOut = 0;
      rc = seekInSegment(pCsr, &aPtr[i], pKey, nKey, iPtr, eSeek, &iOut,&bStop);
      iPtr = iOut;
    }

    if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){
      rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
    }
  }

  assert( eSeek==LSM_SEEK_EQ || bStop==0 );
  *piPgno = iOut;
  *pbStop = bStop;
  return rc;
}

static void multiCursorGetKey(
  MultiCursor *pCsr, 
  int iKey,
  int *peType,                    /* OUT: Key type (SORTED_WRITE etc.) */
................................................................................

static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
  int eType = pCsr->eType;
  int iKey;
  int i;
  int rdmask = 0;
  
  assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) );
  if( pCsr->flags & CURSOR_NEXT_OK ){
    rdmask = LSM_END_DELETE;
  }else{
    rdmask = LSM_START_DELETE;
  }

  if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){
    if( (eType & LSM_INSERT)==0 ) return 0;
  }
  if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
................................................................................
  lsmTreeCursorReset(pCsr->apTreeCsr[1]);
  for(i=0; i<pCsr->nPtr; i++){
    segmentPtrReset(&pCsr->aPtr[i]);
  }
  pCsr->key.nData = 0;
}

/*
** Position in-memory tree cursor pTreeCsr relative to key (pKey/nKey)
** according to search bias eSeek. A NULL pTreeCsr is permitted, in which
** case this function does nothing and returns LSM_OK.
**
** For LSM_SEEK_EQ, *pbStop is set to 1 if the search is finished, which
** happens in two ways:
**
**   a) The entry found is a point-delete for the key, or carries the
**      range-delete flag matching the comparison result (START_DELETE
**      for a negative result, END_DELETE for a positive one). Or,
**
**   b) An LSM_INSERT entry matching the key exactly is found. In this
**      case CURSOR_SEEK_EQ is set on pCsr and the entry's type, key and
**      value are copied into pCsr->eType, pCsr->key and pCsr->val.
**
** In either case, and also when nothing was found, the tree cursor is
** reset before returning. *pbStop is never cleared by this function, so
** the caller must initialize it.
**
** For LSM_SEEK_GE the cursor is stepped forward if the comparison result
** is negative and the cursor is valid. For any other eSeek value it is
** stepped backward if the comparison result is positive.
**
** Returns LSM_OK on success, or an error code if reading the entry or
** copying the key or value fails.
*/
static int treeCursorSeek(
  MultiCursor *pCsr,
  TreeCursor *pTreeCsr, 
  void *pKey, int nKey, 
  int eSeek,
  int *pbStop
){
  int rc = LSM_OK;
  if( pTreeCsr ){
    int res = 0;
    lsmTreeCursorSeek(pTreeCsr, pKey, nKey, &res);
    switch( eSeek ){
      case LSM_SEEK_EQ: {
        int eType = lsmTreeCursorFlags(pTreeCsr);
        if( (res<0 && (eType & LSM_START_DELETE))
         || (res>0 && (eType & LSM_END_DELETE))
         || (res==0 && (eType & LSM_POINT_DELETE))
        ){
          *pbStop = 1;
        }else if( res==0 && (eType & LSM_INSERT) ){
          lsm_env *pEnv = pCsr->pDb->pEnv;
          void *p; int n;         /* Key/value from tree-cursor */
          *pbStop = 1;
          pCsr->flags |= CURSOR_SEEK_EQ;
          rc = lsmTreeCursorKey(pTreeCsr, &pCsr->eType, &p, &n);
          /* The result of sortedBlobSet() must be kept: a failed copy
          ** would otherwise be reported as a successful seek. */
          if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->key, p, n);
          if( rc==LSM_OK ) rc = lsmTreeCursorValue(pTreeCsr, &p, &n);
          if( rc==LSM_OK ) rc = sortedBlobSet(pEnv, &pCsr->val, p, n);
        }
        lsmTreeCursorReset(pTreeCsr);
        break;
      }
      case LSM_SEEK_GE:
        if( res<0 && lsmTreeCursorValid(pTreeCsr) ){
          lsmTreeCursorNext(pTreeCsr);
        }
        break;
      default:
        if( res>0 ){
          assert( lsmTreeCursorValid(pTreeCsr) );
          lsmTreeCursorPrev(pTreeCsr);
        }
        break;
    }
  }
  return rc;
}


/*
** Seek the cursor to key (pKey/nKey).
**
** Parameter eSeek must be one of LSM_SEEK_EQ, LSM_SEEK_GE, LSM_SEEK_LE or
** LSM_SEEK_LEFAST. LSM_SEEK_LEFAST is carried out as an LSM_SEEK_LE seek,
** except that mcursorLocationOk() is invoked with a non-zero second
** argument and CURSOR_PREV_OK is not set on the cursor afterwards.
**
** The two in-memory tree cursors are searched first, followed by each
** level of the database in aPtr[] order. The search stops early if an
** error occurs or if one of the sub-seeks sets the "stop" flag.
**
** For LSM_SEEK_EQ nothing further is done; any result is left in the
** cursor by the sub-seek that found it. For the other seek modes the
** comparison tree is (re)built, the current key is cached and, if the
** cursor is not at an acceptable location according to
** mcursorLocationOk(), it is stepped forwards (GE) or backwards (LE).
**
** NOTE(review): for LSM_SEEK_LEFAST neither CURSOR_NEXT_OK nor
** CURSOR_PREV_OK is set below. If mcursorLocationOk() can return 0 in
** that mode, the lsmMCursorPrev() call returns LSM_MISUSE_BKPT - confirm
** that this cannot happen.
**
** Return LSM_OK if successful, or an error code otherwise.
*/
int lsmMCursorSeek(MultiCursor *pCsr, void *pKey, int nKey, int eSeek){
  int eESeek = eSeek;             /* Effective eSeek parameter */
  int bStop = 0;                  /* Set to true to halt search operation */
  int rc = LSM_OK;                /* Return code */
  int iPtr = 0;                   /* Used to iterate through pCsr->aPtr[] */
  Pgno iPgno = 0;                 /* FC pointer value */

  if( eESeek==LSM_SEEK_LEFAST ) eESeek = LSM_SEEK_LE;

  assert( eESeek==LSM_SEEK_EQ || eESeek==LSM_SEEK_LE || eESeek==LSM_SEEK_GE );
  assert( (pCsr->flags & CURSOR_NEW_SYSTEM)==0 );
  assert( (pCsr->flags & CURSOR_AT_FREELIST)==0 );
  assert( pCsr->nPtr==0 || pCsr->aPtr[0].pLevel );

  /* Seek the in-memory tree cursors. The second is skipped if the first
  ** reports an error or requests that the search be halted. */
  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK | CURSOR_SEEK_EQ);
  rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[0], pKey, nKey, eESeek, &bStop);
  if( rc==LSM_OK && bStop==0 ){
    rc = treeCursorSeek(pCsr, pCsr->apTreeCsr[1], pKey, nKey, eESeek, &bStop);
  }

  /* Seek all segment pointers. Each iteration handles one level, starting
  ** from the pointer for its lhs segment. The extra increment by nRight
  ** steps over the remaining aPtr[] entries belonging to the same level
  ** (presumably its right-hand-side segments - confirm against the code
  ** that populates aPtr[]). */
  for(iPtr=0; iPtr<pCsr->nPtr && rc==LSM_OK && bStop==0; iPtr++){
    SegmentPtr *pPtr = &pCsr->aPtr[iPtr];

    assert( pPtr->pSeg==&pPtr->pLevel->lhs );
    rc = seekInLevel(pCsr, pPtr, eESeek, pKey, nKey, &iPgno, &bStop);
    iPtr += pPtr->pLevel->nRight;
  }

  if( eSeek!=LSM_SEEK_EQ ){
    if( rc==LSM_OK ){
      rc = multiCursorAllocTree(pCsr);
    }
    if( rc==LSM_OK ){
      int i;
      for(i=pCsr->nTree-1; i>0; i--){
        multiCursorDoCompare(pCsr, i, eESeek==LSM_SEEK_LE);
      }
      if( eSeek==LSM_SEEK_GE ) pCsr->flags |= CURSOR_NEXT_OK;
      if( eSeek==LSM_SEEK_LE ) pCsr->flags |= CURSOR_PREV_OK;
    }

    multiCursorCacheKey(pCsr, &rc);
    if( rc==LSM_OK && 0==mcursorLocationOk(pCsr, eSeek==LSM_SEEK_LEFAST) ){
      /* eSeek is not LSM_SEEK_EQ inside this block, so eESeek can only be
      ** LSM_SEEK_GE or LSM_SEEK_LE here. Step the cursor in the direction
      ** of the seek to move off the unacceptable entry. */
      assert( eESeek==LSM_SEEK_GE || eESeek==LSM_SEEK_LE );
      if( eESeek==LSM_SEEK_GE ){
        rc = lsmMCursorNext(pCsr);
      }else{
        rc = lsmMCursorPrev(pCsr);
      }
    }
  }

  return rc;
}

int lsmMCursorValid(MultiCursor *pCsr){
  int res = 0;
  if( pCsr->flags & CURSOR_SEEK_EQ ){
    res = 1;
  }else if( pCsr->aTree ){
    int iKey = pCsr->aTree[1];
    if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
      res = lsmTreeCursorValid(pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0]);
    }else{
      void *pKey; 
      multiCursorGetKey(pCsr, iKey, 0, &pKey, 0);
      res = pKey!=0;
................................................................................

/*
** Step the cursor using multiCursorAdvance(pCsr, 1) (presumably the
** "reverse" direction - confirm against multiCursorAdvance()).
**
** This is only permitted when CURSOR_PREV_OK is set in pCsr->flags. If it
** is not, LSM_MISUSE_BKPT is returned and the cursor is left untouched.
** Otherwise the value returned by multiCursorAdvance() is returned.
*/
int lsmMCursorPrev(MultiCursor *pCsr){
  int rc;
  if( pCsr->flags & CURSOR_PREV_OK ){
    rc = multiCursorAdvance(pCsr, 1);
  }else{
    rc = LSM_MISUSE_BKPT;
  }
  return rc;
}

/*
** Set *ppKey and *pnKey to point to the key of the entry the cursor
** currently points to, and its size in bytes.
**
** There are three cases:
**
**   * If CURSOR_SEEK_EQ is set, the key cached in pCsr->key is returned.
**
**   * If the current entry (pCsr->aTree[1]) comes from one of the two
**     in-memory tree cursors, the key is read directly from that tree
**     cursor.
**
**   * Otherwise the key cached in pCsr->key is returned, with *ppKey set
**     to NULL if the cached key is zero bytes in size. In debug builds the
**     cached key and type are checked against the values reported by
**     multiCursorGetKey().
**
** Return LSM_OK if successful. If reading the key from a tree cursor
** fails, the error code is returned and both outputs are zeroed.
*/
int lsmMCursorKey(MultiCursor *pCsr, void **ppKey, int *pnKey){
  int rc = LSM_OK;

  if( pCsr->flags & CURSOR_SEEK_EQ ){
    *pnKey = pCsr->key.nData;
    *ppKey = pCsr->key.pData;
  }else{
    int iKey = pCsr->aTree[1];

    if( iKey==CURSOR_DATA_TREE0 || iKey==CURSOR_DATA_TREE1 ){
      TreeCursor *pTreeCsr = pCsr->apTreeCsr[iKey-CURSOR_DATA_TREE0];

      /* lsmTreeCursorKey() returns a status code. Propagate it instead of
      ** unconditionally reporting success, and do not hand possibly
      ** unset output values back to the caller on failure. */
      rc = lsmTreeCursorKey(pTreeCsr, 0, ppKey, pnKey);
      if( rc!=LSM_OK ){
        *ppKey = 0;
        *pnKey = 0;
      }
    }else{
      int nKey;

#ifndef NDEBUG
      void *pKey;
      int eType;
      multiCursorGetKey(pCsr, iKey, &eType, &pKey, &nKey);
      assert( eType==pCsr->eType );
      assert( nKey==pCsr->key.nData );
      assert( memcmp(pKey, pCsr->key.pData, nKey)==0 );
#endif

      nKey = pCsr->key.nData;
      if( nKey==0 ){
        *ppKey = 0;
      }else{
        *ppKey = pCsr->key.pData;
      }
      *pnKey = nKey;
    }
  }
  return rc;
}

/*
** Set *ppVal and *pnVal to point to the value of the entry the cursor
** currently points to, and its size in bytes.
**
** If CURSOR_SEEK_EQ is set, the value already stored in pCsr->val is
** returned as is. Otherwise the value is fetched with multiCursorGetVal()
** and, if it is non-NULL, copied into pCsr->val so that the returned
** pointer refers to the cursor's own buffer.
**
** Return LSM_OK if successful. If an error occurs, an error code is
** returned and *ppVal and *pnVal are both set to zero.
*/
int lsmMCursorValue(MultiCursor *pCsr, void **ppVal, int *pnVal){
  int rc = LSM_OK;                /* Return code */
  void *pData;                    /* Value to return via *ppVal */
  int nData;                      /* Value to return via *pnVal */

  if( (pCsr->flags & CURSOR_SEEK_EQ)==0 ){
    assert( pCsr->aTree );
    assert( mcursorLocationOk(pCsr, (pCsr->flags & CURSOR_IGNORE_DELETE)) );

    rc = multiCursorGetVal(pCsr, pCsr->aTree[1], &pData, &nData);
    if( pData && rc==LSM_OK ){
      /* Take a private copy of the value in the cursor's blob. */
      rc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->val, pData, nData);
      pData = pCsr->val.pData;
    }

    if( rc!=LSM_OK ){
      /* Never expose a partial result alongside an error code. */
      pData = 0;
      nData = 0;
    }
  }else{
    /* An LSM_SEEK_EQ hit: the value was stored in pCsr->val by the seek. */
    nData = pCsr->val.nData;
    pData = pCsr->val.pData;
  }

  *ppVal = pData;
  *pnVal = nData;
  return rc;
}

int lsmMCursorType(MultiCursor *pCsr, int *peType){