SQLite4
Check-in [96badcb933]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a problem in the range-delete code.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | block-redirects
Files: files | file ages | folders
SHA1: 96badcb933a2b9433f93e2cd1a1892dda5f74182
User & Date: dan 2013-01-25 20:46:02
Context
2013-01-26
16:44
Fix a problem to do with an lsm_seek(LSM_SEEK_LE) on a level that is currently undergoing a merge. Add more complex assert statements to catch any similar problems. check-in: ca4dc40190 user: dan tags: block-redirects
2013-01-25
20:46
Fix a problem in the range-delete code. check-in: 96badcb933 user: dan tags: block-redirects
2013-01-24
16:45
Improve the integrity-check assert that ensures all blocks in the file are accounted for. Roll any in-memory free-list records into the end of the main segment when executing lsm_work(nmerge=1). check-in: e5edba9caa user: dan tags: block-redirects
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_tdb3.c.

984
985
986
987
988
989
990

991
992
993
994
995
996
997
998
999
1000
}

int test_lsm_lomem_open(
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){

  const char *zCfg = 
    "page_size=256 block_size=65536 autoflush=16384 "
    "max_freelist=4 autocheckpoint=32768 "
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

int test_lsm_zip_open(
  const char *zFilename, 







>


|







984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
}

int test_lsm_lomem_open(
  const char *zFilename, 
  int bClear, 
  TestDb **ppDb
){
    /* "max_freelist=4 autocheckpoint=32768 " */
  const char *zCfg = 
    "page_size=256 block_size=65536 autoflush=16384 "
    "autocheckpoint=32768 "
    "mmap=0 "
  ;
  return testLsmOpen(zCfg, zFilename, bClear, ppDb);
}

int test_lsm_zip_open(
  const char *zFilename, 

Changes to src/lsm_file.c.

2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674

      assert( 0==fsSegmentRedirects(pFS, pSeg) );
      iBlk = iFirstBlk = fsPageToBlock(pFS, pSeg->iFirst);
      iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg);

      bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==pSeg->iLastPg);
      assert( iBlk>0 );


      do {
        /* iBlk is a part of this sorted run. */
        aUsed[iBlk-1] |= INTEGRITY_CHECK_USED;

        /* If the first page of this block is also part of the segment,
        ** set the flag to indicate that the first page of iBlk is in use.  







<







2660
2661
2662
2663
2664
2665
2666

2667
2668
2669
2670
2671
2672
2673

      assert( 0==fsSegmentRedirects(pFS, pSeg) );
      iBlk = iFirstBlk = fsPageToBlock(pFS, pSeg->iFirst);
      iLastBlk = fsPageToBlock(pFS, pSeg->iLastPg);

      bLastIsLastOnBlock = (fsLastPageOnBlock(pFS, iLastBlk)==pSeg->iLastPg);
      assert( iBlk>0 );


      do {
        /* iBlk is a part of this sorted run. */
        aUsed[iBlk-1] |= INTEGRITY_CHECK_USED;

        /* If the first page of this block is also part of the segment,
        ** set the flag to indicate that the first page of iBlk is in use.  

Changes to src/lsm_sorted.c.

1202
1203
1204
1205
1206
1207
1208

1209
1210
1211
1212

1213
1214
1215
1216
1217
1218
1219
1220
....
1262
1263
1264
1265
1266
1267
1268







1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287

1288
1289
1290
1291
1292
1293
1294
....
1903
1904
1905
1906
1907
1908
1909

1910
1911
1912
1913
1914

1915
1916
1917
1918
1919
1920
1921
1922

1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
....
2128
2129
2130
2131
2132
2133
2134
2135
2136

2137
2138

2139








2140





















2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152

2153
2154
2155
2156
2157
2158
2159
....
2569
2570
2571
2572
2573
2574
2575






























2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588

2589



2590
2591
2592



2593
2594
2595
2596














2597



























2598
2599
2600
2601
2602
2603
2604
....
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626


2627


2628
2629


2630
2631
2632
2633
2634
2635
2636


2637
2638

2639
2640
2641

2642
2643
2644
2645
2646
2647
2648

2649


2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661


2662






2663




2664
2665
2666
2667




2668
2669




2670
2671

2672
2673

2674
2675
2676
2677

2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
....
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
....
2973
2974
2975
2976
2977
2978
2979


2980
2981
2982
2983
2984
2985
2986
....
3011
3012
3013
3014
3015
3016
3017

3018
3019
3020
3021
3022
3023
3024
....
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
....
4199
4200
4201
4202
4203
4204
4205
4206
4207
4208
4209
4210
4211
4212
4213
....
4341
4342
4343
4344
4345
4346
4347

4348
4349
4350
4351
4352
4353
4354
....
4614
4615
4616
4617
4618
4619
4620
4621
4622


4623
4624
4625
4626
4627































































4628
4629
4630
4631
4632
4633
4634
....
4672
4673
4674
4675
4676
4677
4678





4679
4680
4681
4682
4683
4684


4685















4686

4687
4688
4689
4690

4691
4692
4693
4694
4695
4696
4697
....
4778
4779
4780
4781
4782
4783
4784
4785
4786
4787
4788
4789
4790
4791
4792
....
4842
4843
4844
4845
4846
4847
4848
4849
4850
4851
4852
4853
4854
4855
4856
....
5563
5564
5565
5566
5567
5568
5569


5570
5571
5572
5573
5574
5575
5576
5577
5578
5579
5580
5581


5582
5583
5584
5585
5586
5587
5588
....
5645
5646
5647
5648
5649
5650
5651




5652
5653
5654
5655
5656
5657
5658
    if( svFlags && pPtr->pPg ){
      int res = sortedKeyCompare(pCsr->pDb->xCmp,
          rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
          pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
      );
      if( res<0 ) segmentPtrReset(pPtr);
    }

    if( pPtr->pPg==0 && (svFlags & LSM_END_DELETE) ){
      Segment *pSeg = pPtr->pSeg;
      rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, pSeg->iFirst, &pPtr->pPg);
      if( rc!=LSM_OK ) return rc;

      pPtr->eType = LSM_START_DELETE | (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0);
      pPtr->pKey = pLvl->pSplitKey;
      pPtr->nKey = pLvl->nSplitKey;
    }

  }while( pCsr 
       && pPtr->pPg 
       && segmentPtrIgnoreSeparators(pCsr, pPtr)
................................................................................
      && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
  ){
    rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1));
  }

  if( rc==LSM_OK && pPtr->pPg ){
    rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0);







  }
  
  bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr);
  if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){
    rc = segmentPtrAdvance(pCsr, pPtr, bLast);
  }


  if( bLast && rc==LSM_OK && pPtr->pPg
   && pPtr->pSeg==&pLvl->lhs 
   && pLvl->nRight && (pPtr->eType & LSM_START_DELETE)
  ){
    pPtr->iCell++;
    pPtr->eType = LSM_END_DELETE | (pLvl->iSplitTopic);
    pPtr->pKey = pLvl->pSplitKey;
    pPtr->nKey = pLvl->nSplitKey;
    pPtr->pVal = 0;
    pPtr->nVal = 0;
  }


  return rc;
}

static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){
  assert( pPtr->pPg );
  *ppKey = pPtr->pKey;
................................................................................
    int iPtr = 0;
    if( nRhs==0 ) iPtr = *piPgno;

    rc = seekInSegment(
        pCsr, &aPtr[0], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
    );
    if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){

      res = 0;
    }
  }
  
  if( res>=0 ){

    int iPtr = *piPgno;
    int i;
    for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
      iOut = 0;
      rc = seekInSegment(
          pCsr, &aPtr[i], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
      );
      iPtr = iOut;

    }

    if( rc==LSM_OK && eSeek==LSM_SEEK_LE ){
      rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
    }
  }

  assert( eSeek==LSM_SEEK_EQ || bStop==0 );
  *piPgno = iOut;
  *pbStop = bStop;
................................................................................
  MultiCursor *pCsr, 
  int iPtr,
  int bReverse
){
  int rc;
  SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
  Level *pLvl = pPtr->pLevel;
  int bComposite;


  rc = segmentPtrAdvance(pCsr, pPtr, bReverse);
  if( rc!=LSM_OK ) return rc;

  bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight);






























  if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
   && bReverse==0                                /* csr advanced forwards */
   && pPtr->pPg==0                               /* segment at EOF */
  ){
    int i;
    for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
      rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
    }
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, 0);
    }
  }


  return rc;
}

static void mcursorFreeComponents(MultiCursor *pCsr){
  int i;
  lsm_env *pEnv = pCsr->pDb->pEnv;
................................................................................
  if( *pRc==LSM_OK ){
    void *pKey;
    int nKey;
    multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pKey, &nKey);
    *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pKey, nKey);
  }
}































static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
  int eType = pCsr->eType;
  int iKey;
  int i;
  int rdmask = 0;
  
  assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) );
  if( pCsr->flags & CURSOR_NEXT_OK ){
    rdmask = LSM_END_DELETE;
  }else{
    rdmask = LSM_START_DELETE;
  }





  if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){
    if( (eType & LSM_INSERT)==0 ) return 0;
  }



  if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
    return 0;
  }















  /* Check if this key has already been deleted by a range-delete */



























  iKey = pCsr->aTree[1];
  for(i=0; i<iKey; i++){
    int csrflags;
    multiCursorGetKey(pCsr, i, &csrflags, 0, 0);
    if( (rdmask & csrflags) ){
      const int SD_ED = (LSM_START_DELETE|LSM_END_DELETE);
      if( (csrflags & SD_ED)==SD_ED 
................................................................................
          continue;
        }
      }
      return 0;
    }
  }

#if 0
  if( (iKey>0 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[0]))) 
   || (iKey>1 && (rdmask & lsmTreeCursorFlags(pCsr->apTreeCsr[1]))) 
  ){
    return 0;
  }
  if( iKey>CURSOR_DATA_SYSTEM && (pCsr->flags & CURSOR_FLUSH_FREELIST) ){


    int eType;


    multiCursorGetKey(pCsr, CURSOR_DATA_SYSTEM, &eType, 0, 0);
    if( rdmask & eType ) return 0;


  }

  for(i=CURSOR_DATA_SEGMENT; i<iKey; i++){
    int iPtr = i-CURSOR_DATA_SEGMENT;
    if( pCsr->aPtr[iPtr].pPg && (pCsr->aPtr[iPtr].eType & rdmask) ){
      return 0;
    }


  }
#endif


  return 1;
}


static int multiCursorEnd(MultiCursor *pCsr, int bLast){
  int rc = LSM_OK;
  int i;

  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);
  pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK);




  if( pCsr->apTreeCsr[0] ){
    rc = lsmTreeCursorEnd(pCsr->apTreeCsr[0], bLast);
  }
  if( rc==LSM_OK && pCsr->apTreeCsr[1] ){
    rc = lsmTreeCursorEnd(pCsr->apTreeCsr[1], bLast);
  }

  pCsr->iFree = 0;

  for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
    SegmentPtr *pPtr = &pCsr->aPtr[i];
    Level *pLvl = pPtr->pLevel;









    rc = segmentPtrEnd(pCsr, pPtr, bLast);




    if( rc==LSM_OK && bLast==0 && pLvl->nRight && pPtr->pSeg==&pLvl->lhs ){
      int iRhs;
      for(iRhs=1+i; rc==LSM_OK && iRhs<1+i+pLvl->nRight; iRhs++){
        SegmentPtr *pRhs = &pCsr->aPtr[iRhs];




        if( pPtr->pPg==0 ){
          rc = sortedRhsFirst(pCsr, pLvl, pRhs);




        }else{
          segmentPtrReset(pRhs);

        }
      }

      i += pLvl->nRight;
    }
  }


  if( rc==LSM_OK && pCsr->pBtCsr ){
    assert( bLast==0 );
    rc = btreeCursorFirst(pCsr->pBtCsr);
  }

  if( rc==LSM_OK ){
    rc = multiCursorAllocTree(pCsr);
  }
  if( rc==LSM_OK ){
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, bLast);
    }
  }

  multiCursorCacheKey(pCsr, &rc);
  if( rc==LSM_OK && mcursorLocationOk(pCsr, 0)==0 ){
    if( bLast ){
      rc = lsmMCursorPrev(pCsr);
    }else{
      rc = lsmMCursorNext(pCsr);
    }
  }

  return rc;
}


int mcursorSave(MultiCursor *pCsr){
  int rc = LSM_OK;
  if( pCsr->aTree ){
................................................................................

    /* If this cursor is configured to skip deleted keys, and the current
    ** cursor points to a SORTED_DELETE entry, then the cursor has not been 
    ** successfully advanced.  
    **
    ** Similarly, if the cursor is configured to skip system keys and the
    ** current cursor points to a system key, it has not yet been advanced.
     */
    if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0;
  }
  return 1;
}

static void flCsrAdvance(MultiCursor *pCsr){
  assert( pCsr->flags & CURSOR_FLUSH_FREELIST );
................................................................................
}

static int multiCursorAdvance(MultiCursor *pCsr, int bReverse){
  int rc = LSM_OK;                /* Return Code */
  if( lsmMCursorValid(pCsr) ){
    do {
      int iKey = pCsr->aTree[1];



      /* If this multi-cursor is advancing forwards, and the sub-cursor
      ** being advanced is the one that separator keys may be being read
      ** from, record the current absolute pointer value.  */
      if( pCsr->pPrevMergePtr ){
        if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
          assert( pCsr->pBtCsr );
................................................................................
        rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse);
      }
      if( rc==LSM_OK ){
        int i;
        for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){
          multiCursorDoCompare(pCsr, i, bReverse);
        }

      }
    }while( mcursorAdvanceOk(pCsr, bReverse, &rc)==0 );
  }
  return rc;
}

int lsmMCursorNext(MultiCursor *pCsr){
................................................................................
    }
  }

  return rc;
}


static int multiCursorSetupTree(MultiCursor *pCsr, int bRev){
  int rc;

  rc = multiCursorAllocTree(pCsr);
  if( rc==LSM_OK ){
    int i;
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, bRev);
    }
  }

  multiCursorCacheKey(pCsr, &rc);
  return rc;
}

/*
** Free all resources allocated by mergeWorkerInit().
*/
static void mergeWorkerShutdown(MergeWorker *pMW, int *pRc){
  int i;                          /* Iterator variable */
  int rc = *pRc;
  MultiCursor *pCsr = pMW->pCsr;
................................................................................
    }else if( pDel ){
      assert( pNew->pNext==pDel );
      pNew->pNext = pDel->pNext;
      lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 1, &pDel->lhs);
      sortedFreeLevel(pDb->pEnv, pDel);
    }

#if 1
    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
#endif

    if( freelist.nEntry ){
      Freelist *p = &pDb->pWorker->freelist;
      lsmFree(pDb->pEnv, p->aEntry);
      memcpy(p, &freelist, sizeof(freelist));
................................................................................
  **      separators attached to the LHS of the following level, or neither.
  **
  ** If the new level is the lowest (oldest) in the db, discard any
  ** delete keys. Key annihilation.
  */
  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){

    rc = multiCursorAddRhs(pCsr, pLevel);
  }
  if( rc==LSM_OK && pMerge->nInput > pLevel->nRight ){
    rc = btreeCursorNew(pDb, &pNext->lhs, &pCsr->pBtCsr);
  }else if( pNext ){
    multiCursorReadSeparators(pCsr);
  }else{
................................................................................
      rc = lsmBlockFree(pDb, iFrom);

      *pnWrite = lsmFsBlockSize(pDb->pFS) / lsmFsPageSize(pDb->pFS);
      pLvl->lhs.pRedirect = &p->redirect;
    }
  }

#if 1
  if( rc==LSM_OK ){


    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "move-block");
  }
#endif
  return rc;
}
































































static int sortedWork(
  lsm_db *pDb,                    /* Database handle. Must be worker. */
  int nWork,                      /* Number of pages of work to do */
  int nMerge,                     /* Try to merge this many levels at once */
  int bFlush,                     /* Set if call is to make room for a flush */
  int *pnWrite                    /* OUT: Actual number of pages written */
................................................................................
      while( rc==LSM_OK 
          && 0==mergeWorkerDone(&mergeworker) 
          && (mergeworker.nWork<nRemaining || pDb->bUseFreelist)
      ){
        int eType = rtTopic(mergeworker.pCsr->eType);
        rc = mergeWorkerStep(&mergeworker);






        if( rc==LSM_OK 
         && nMerge==1 
         && mergeworker.pLevel==pWorker->pLevel
         && eType==0
         && (rtTopic(mergeworker.pCsr->eType) || mergeWorkerDone(&mergeworker))
        ){


          assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );















          rc = multiCursorVisitFreelist(mergeworker.pCsr);

          if( rc==LSM_OK ){
            rc = multiCursorSetupTree(mergeworker.pCsr, 0);
            pDb->pFreelist = &freelist;
            pDb->bUseFreelist = 1;

          }
        }
      }
      nRemaining -= LSM_MAX(mergeworker.nWork, 1);

      if( rc==LSM_OK ){
        /* Check if the merge operation is completely finished. If not,
................................................................................
      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      pDb->bIncrMerge = 0;
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

#if 1
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif
      assertBtreeOk(pDb, &pLevel->lhs);
      assertRunInOrder(pDb, &pLevel->lhs);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
................................................................................

  pLvl = pDb->pWorker->pLevel;
  if( pLvl->pNext!=0 || pLvl->nRight==0 ){
    pLvl = 0;
  }

  rc = sortedNewToplevel(pDb, TREE_NONE, 0);
  if( rc==LSM_OK && pLvl ){
    Level *pNew = pDb->pWorker->pLevel;
    assert( pNew->pNext==pLvl );

    if( pLvl->pSplitKey==0 ){
      sortedSplitkey(pDb, pLvl, &rc);
    }
    if( rc==LSM_OK && pLvl->iSplitTopic==0 ){
................................................................................
  Snapshot *pSnap,                /* Snapshot to dump */
  int bKeys,                      /* Output the keys from each segment */
  int bVals,                      /* Output the values from each segment */
  const char *zWhy                /* Caption to print near top of dump */
){
  Snapshot *pDump = pSnap;
  Level *pTopLevel;



  assert( pSnap );
  pTopLevel = lsmDbSnapshotLevel(pDump);
  if( pDb->xLog && pTopLevel ){
    static int nCall = 0;
    Level *pLevel;
    int iLevel = 0;

    nCall++;
    lsmLogMessage(pDb, LSM_OK, "Database structure %d (%s)", nCall, zWhy);

    if( nCall==275 || nCall==276 ) bKeys = 1;



    for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
      char zLeft[1024];
      char zRight[1024];
      int i = 0;

      Segment *aLeft[24];  
................................................................................
        sortedDumpSegment(pDb, &pLevel->lhs, bVals);
        for(i=0; i<pLevel->nRight; i++){
          sortedDumpSegment(pDb, &pLevel->aRhs[i], bVals);
        }
      }
    }
  }





  assert( lsmFsIntegrityCheck(pDb) );
}

void lsmSortedFreeLevel(lsm_env *pEnv, Level *pLevel){
  Level *pNext;
  Level *p;







>




>
|







 







>
>
>
>
>
>
>







|











>







 







>





>








>


|







 







|

>


>

>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>












>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>





|


|
<
<
<
|
>

>
>
>



>
>
>




>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|
<
<
<
|
|
<
>
>
|
>
>
|
|
>
>
|
|
<
<
<
<
|
>
>
|
|
>
|
|

>







>

>
>
|
|
|
<
<

<
<




>
>

>
>
>
>
>
>
|
>
>
>
>
|
<
<
<
>
>
>
>
|
<
>
>
>
>

<
>


>
|
|
|
<
>






|

<
<
<
|
<
<
<
<
<
<
<
<
<
<
<







 







|







 







>
>







 







>







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







|







 







>







 







|

>
>
|




>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>
>
>
>
|
<
<
<


>
>

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
|
|
|
|
>







 







|







 







|







 







>
>











|
>
>







 







>
>
>
>







1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
....
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
....
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
....
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
....
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659



2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
....
2733
2734
2735
2736
2737
2738
2739
2740



2741
2742

2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753




2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777


2778


2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797



2798
2799
2800
2801
2802

2803
2804
2805
2806
2807

2808
2809
2810
2811
2812
2813
2814

2815
2816
2817
2818
2819
2820
2821
2822
2823



2824











2825
2826
2827
2828
2829
2830
2831
....
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
....
3097
3098
3099
3100
3101
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
....
3137
3138
3139
3140
3141
3142
3143
3144
3145
3146
3147
3148
3149
3150
3151
....
3959
3960
3961
3962
3963
3964
3965















3966
3967
3968
3969
3970
3971
3972
....
4311
4312
4313
4314
4315
4316
4317
4318
4319
4320
4321
4322
4323
4324
4325
....
4453
4454
4455
4456
4457
4458
4459
4460
4461
4462
4463
4464
4465
4466
4467
....
4727
4728
4729
4730
4731
4732
4733
4734
4735
4736
4737
4738
4739
4740
4741
4742
4743
4744
4745
4746
4747
4748
4749
4750
4751
4752
4753
4754
4755
4756
4757
4758
4759
4760
4761
4762
4763
4764
4765
4766
4767
4768
4769
4770
4771
4772
4773
4774
4775
4776
4777
4778
4779
4780
4781
4782
4783
4784
4785
4786
4787
4788
4789
4790
4791
4792
4793
4794
4795
4796
4797
4798
4799
4800
4801
4802
4803
4804
4805
4806
4807
4808
4809
4810
4811
4812
....
4850
4851
4852
4853
4854
4855
4856
4857
4858
4859
4860
4861
4862



4863
4864
4865
4866
4867
4868
4869
4870
4871
4872
4873
4874
4875
4876
4877
4878
4879
4880
4881
4882
4883
4884
4885
4886
4887
4888
4889
4890
4891
4892
4893
4894
4895
4896
....
4977
4978
4979
4980
4981
4982
4983
4984
4985
4986
4987
4988
4989
4990
4991
....
5041
5042
5043
5044
5045
5046
5047
5048
5049
5050
5051
5052
5053
5054
5055
....
5762
5763
5764
5765
5766
5767
5768
5769
5770
5771
5772
5773
5774
5775
5776
5777
5778
5779
5780
5781
5782
5783
5784
5785
5786
5787
5788
5789
5790
5791
....
5848
5849
5850
5851
5852
5853
5854
5855
5856
5857
5858
5859
5860
5861
5862
5863
5864
5865
    if( svFlags && pPtr->pPg ){
      int res = sortedKeyCompare(pCsr->pDb->xCmp,
          rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
          pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
      );
      if( res<0 ) segmentPtrReset(pPtr);
    }

    if( pPtr->pPg==0 && (svFlags & LSM_END_DELETE) ){
      Segment *pSeg = pPtr->pSeg;
      rc = lsmFsDbPageGet(pCsr->pDb->pFS, pSeg, pSeg->iFirst, &pPtr->pPg);
      if( rc!=LSM_OK ) return rc;
      pPtr->eType = LSM_START_DELETE | LSM_POINT_DELETE;
      pPtr->eType |= (pLvl->iSplitTopic ? LSM_SYSTEMKEY : 0);
      pPtr->pKey = pLvl->pSplitKey;
      pPtr->nKey = pLvl->nSplitKey;
    }

  }while( pCsr 
       && pPtr->pPg 
       && segmentPtrIgnoreSeparators(pCsr, pPtr)
................................................................................
      && (pPtr->nCell==0 || (pPtr->flags & SEGMENT_BTREE_FLAG))
  ){
    rc = segmentPtrNextPage(pPtr, (bLast ? -1 : 1));
  }

  if( rc==LSM_OK && pPtr->pPg ){
    rc = segmentPtrLoadCell(pPtr, bLast ? (pPtr->nCell-1) : 0);
    if( rc==LSM_OK && bLast && pPtr->pSeg!=&pLvl->lhs ){
      int res = sortedKeyCompare(pCsr->pDb->xCmp,
          rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
          pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
      );
      if( res<0 ) segmentPtrReset(pPtr);
    }
  }
  
  bIgnore = segmentPtrIgnoreSeparators(pCsr, pPtr);
  if( rc==LSM_OK && pPtr->pPg && bIgnore && rtIsSeparator(pPtr->eType) ){
    rc = segmentPtrAdvance(pCsr, pPtr, bLast);
  }

#if 0
  if( bLast && rc==LSM_OK && pPtr->pPg
   && pPtr->pSeg==&pLvl->lhs 
   && pLvl->nRight && (pPtr->eType & LSM_START_DELETE)
  ){
    pPtr->iCell++;
    pPtr->eType = LSM_END_DELETE | (pLvl->iSplitTopic);
    pPtr->pKey = pLvl->pSplitKey;
    pPtr->nKey = pLvl->nSplitKey;
    pPtr->pVal = 0;
    pPtr->nVal = 0;
  }
#endif

  return rc;
}

static void segmentPtrKey(SegmentPtr *pPtr, void **ppKey, int *pnKey){
  assert( pPtr->pPg );
  *ppKey = pPtr->pKey;
................................................................................
    int iPtr = 0;
    if( nRhs==0 ) iPtr = *piPgno;

    rc = seekInSegment(
        pCsr, &aPtr[0], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
    );
    if( rc==LSM_OK && nRhs>0 && eSeek==LSM_SEEK_GE && aPtr[0].pPg==0 ){
      assert( aPtr[0].pKey==0 );
      res = 0;
    }
  }
  
  if( res>=0 ){
    int bHit = 0;                 /* True if at least one rhs is not EOF */
    int iPtr = *piPgno;
    int i;
    for(i=1; rc==LSM_OK && i<=nRhs && bStop==0; i++){
      iOut = 0;
      rc = seekInSegment(
          pCsr, &aPtr[i], iTopic, pKey, nKey, iPtr, eSeek, &iOut, &bStop
      );
      iPtr = iOut;
      if( aPtr[i].pKey ) bHit = 1;
    }

    if( rc==LSM_OK && eSeek==LSM_SEEK_LE && bHit==0 ){
      rc = segmentPtrEnd(pCsr, &aPtr[0], 1);
    }
  }

  assert( eSeek==LSM_SEEK_EQ || bStop==0 );
  *piPgno = iOut;
  *pbStop = bStop;
................................................................................
  MultiCursor *pCsr, 
  int iPtr,
  int bReverse
){
  int rc;
  SegmentPtr *pPtr = &pCsr->aPtr[iPtr];
  Level *pLvl = pPtr->pLevel;
  int bComposite;                 /* True if pPtr is part of composite level */

  /* Advance the segment-pointer object. */
  rc = segmentPtrAdvance(pCsr, pPtr, bReverse);
  if( rc!=LSM_OK ) return rc;

  bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight);
  if( bComposite && pPtr->pPg==0 ){
    int bFix = 0;
    if( (bReverse==0)==(pPtr->pSeg==&pLvl->lhs) ){
      int i;
      if( bReverse ){
        SegmentPtr *pLhs = &pCsr->aPtr[iPtr - 1 - (pPtr->pSeg - pLvl->aRhs)];
        for(i=0; i<pLvl->nRight; i++){
          if( pLhs[i+1].pPg ) break;
        }
        if( i==pLvl->nRight ){
          bFix = 1;
          rc = segmentPtrEnd(pCsr, pLhs, 1);
        }
      }else{
        bFix = 1;
        for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
          rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
        }
      }
    }

    if( bFix ){
      int i;
      for(i=pCsr->nTree-1; i>0; i--){
        multiCursorDoCompare(pCsr, i, bReverse);
      }
    }
  }

#if 0
  if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
   && bReverse==0                                /* csr advanced forwards */
   && pPtr->pPg==0                               /* segment at EOF */
  ){
    int i;
    for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
      rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
    }
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, 0);
    }
  }
#endif

  return rc;
}

static void mcursorFreeComponents(MultiCursor *pCsr){
  int i;
  lsm_env *pEnv = pCsr->pDb->pEnv;
................................................................................
  if( *pRc==LSM_OK ){
    void *pKey;
    int nKey;
    multiCursorGetKey(pCsr, pCsr->aTree[1], &pCsr->eType, &pKey, &nKey);
    *pRc = sortedBlobSet(pCsr->pDb->pEnv, &pCsr->key, pKey, nKey);
  }
}

#ifndef NDEBUG
static void assertCursorTree(MultiCursor *pCsr){
  int bRev = !!(pCsr->flags & CURSOR_PREV_OK);
  int *aSave = pCsr->aTree;
  int nSave = pCsr->nTree;
  int rc;

  pCsr->aTree = 0;
  pCsr->nTree = 0;
  rc = multiCursorAllocTree(pCsr);
  if( rc==LSM_OK ){
    int i;
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, bRev);
    }

    assert( nSave==pCsr->nTree 
        && 0==memcmp(aSave, pCsr->aTree, sizeof(int)*nSave)
    );

    lsmFree(pCsr->pDb->pEnv, pCsr->aTree);
  }

  pCsr->aTree = aSave;
  pCsr->nTree = nSave;
}
#else
# define assertCursorTree(x)
#endif

static int mcursorLocationOk(MultiCursor *pCsr, int bDeleteOk){
  int eType = pCsr->eType;
  int iKey;
  int i;
  int rdmask;
  
  assert( pCsr->flags & (CURSOR_NEXT_OK|CURSOR_PREV_OK) );
  assertCursorTree(pCsr);




  rdmask = (pCsr->flags & CURSOR_NEXT_OK) ? LSM_END_DELETE : LSM_START_DELETE;

  /* If the cursor does not currently point to an actual database key (i.e.
  ** it points to a delete key, or the start or end of a range-delete), and
  ** the CURSOR_IGNORE_DELETE flag is set, skip past this entry.  */
  if( (pCsr->flags & CURSOR_IGNORE_DELETE) && bDeleteOk==0 ){
    if( (eType & LSM_INSERT)==0 ) return 0;
  }

  /* If the cursor points to a system key (free-list entry), and the
  ** CURSOR_IGNORE_SYSTEM flag is set, skip thie entry.  */
  if( (pCsr->flags & CURSOR_IGNORE_SYSTEM) && rtTopic(eType)!=0 ){
    return 0;
  }

#ifndef NDEBUG
  /* This block fires assert() statements to check one of the assumptions
  ** in the comment below - that if the lhs sub-cursor of a level undergoing
  ** a merge is valid, then all the rhs sub-cursors must be at EOF.  */
  for(i=0; i<pCsr->nPtr; i++){
    SegmentPtr *pPtr = &pCsr->aPtr[i];
    Level *pLvl = pPtr->pLevel;
    if( pLvl->nRight && pPtr->pSeg==&pLvl->lhs && pPtr->pPg ){
      int j;
      for(j=0; j<pLvl->nRight; j++) assert( pPtr[j+1].pPg==0 );
    }
  }
#endif

  /* Now check if this key has already been deleted by a range-delete. If 
  ** so, skip past it.
  **
  ** Assume, for the moment, that the tree contains no levels currently 
  ** undergoing incremental merge, and that this cursor is iterating forwards
  ** through the database keys. The cursor currently points to a key in
  ** level L. This key has already been deleted if any of the sub-cursors
  ** that point to levels newer than L (or to the in-memory tree) point to
  ** a key greater than the current key with the LSM_END_DELETE flag set.
  **
  ** Or, if the cursor is iterating backwards through data keys, if any
  ** such sub-cursor points to a key smaller than the current key with the
  ** LSM_START_DELETE flag set.
  **
  ** Why it works with levels undergoing a merge too:
  **
  ** When a cursor iterates forwards, the sub-cursors for the rhs of a 
  ** level are only activated once the lhs reaches EOF. So when iterating
  ** forwards, the keys visited are the same as if the level was completely
  ** merged.
  **
  ** If the cursor is iterating backwards, then the lhs sub-cursor is not 
  ** initialized until the last of the rhs sub-cursors has reached EOF.
  ** Additionally, if the START_DELETE flag is set on the last entry (in
  ** reverse order - so the entry with the smallest key) of a rhs sub-cursor,
  ** then a pseudo-key equal to the levels split-key with the END_DELETE
  ** flag set is visited by the sub-cursor.
  */ 
  iKey = pCsr->aTree[1];
  for(i=0; i<iKey; i++){
    int csrflags;
    multiCursorGetKey(pCsr, i, &csrflags, 0, 0);
    if( (rdmask & csrflags) ){
      const int SD_ED = (LSM_START_DELETE|LSM_END_DELETE);
      if( (csrflags & SD_ED)==SD_ED 
................................................................................
          continue;
        }
      }
      return 0;
    }
  }

  /* The current cursor position is one this cursor should visit. Return 1. */



  return 1;
}


static int multiCursorSetupTree(MultiCursor *pCsr, int bRev){
  int rc;

  rc = multiCursorAllocTree(pCsr);
  if( rc==LSM_OK ){
    int i;
    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, bRev);
    }
  }





  assertCursorTree(pCsr);
  multiCursorCacheKey(pCsr, &rc);

  if( rc==LSM_OK && mcursorLocationOk(pCsr, 0)==0 ){
    rc = multiCursorAdvance(pCsr, bRev);
  }
  return rc;
}


static int multiCursorEnd(MultiCursor *pCsr, int bLast){
  int rc = LSM_OK;
  int i;

  pCsr->flags &= ~(CURSOR_NEXT_OK | CURSOR_PREV_OK);
  pCsr->flags |= (bLast ? CURSOR_PREV_OK : CURSOR_NEXT_OK);
  pCsr->iFree = 0;

  /* Position the two in-memory tree cursors */
  for(i=0; rc==LSM_OK && i<2; i++){
    if( pCsr->apTreeCsr[i] ){
      rc = lsmTreeCursorEnd(pCsr->apTreeCsr[i], bLast);
    }


  }



  for(i=0; rc==LSM_OK && i<pCsr->nPtr; i++){
    SegmentPtr *pPtr = &pCsr->aPtr[i];
    Level *pLvl = pPtr->pLevel;
    int iRhs;
    int bHit = 0;

    if( bLast ){
      for(iRhs=0; iRhs<pLvl->nRight && rc==LSM_OK; iRhs++){
        rc = segmentPtrEnd(pCsr, &pPtr[iRhs+1], 1);
        if( pPtr[iRhs+1].pPg ) bHit = 1;
      }
      if( bHit==0 && rc==LSM_OK ){
        rc = segmentPtrEnd(pCsr, pPtr, 1);
      }else{
        segmentPtrReset(pPtr);
      }
    }else{
      int bLhs = (pPtr->pSeg==&pLvl->lhs);



      assert( pPtr->pSeg==&pLvl->lhs || pPtr->pSeg==&pLvl->aRhs[0] );

      if( bLhs ){
        rc = segmentPtrEnd(pCsr, pPtr, 0);
        if( pPtr->pKey ) bHit = 1;

      }
      for(iRhs=0; iRhs<pLvl->nRight && rc==LSM_OK; iRhs++){
        if( bHit ){
          segmentPtrReset(&pPtr[iRhs+1]);
        }else{

          rc = sortedRhsFirst(pCsr, pLvl, &pPtr[iRhs+bLhs]);
        }
      }
    }
    i += pLvl->nRight;
  }


  /* And the b-tree cursor, if applicable */
  if( rc==LSM_OK && pCsr->pBtCsr ){
    assert( bLast==0 );
    rc = btreeCursorFirst(pCsr->pBtCsr);
  }

  if( rc==LSM_OK ){
    rc = multiCursorSetupTree(pCsr, bLast);
  }



  











  return rc;
}


int mcursorSave(MultiCursor *pCsr){
  int rc = LSM_OK;
  if( pCsr->aTree ){
................................................................................

    /* If this cursor is configured to skip deleted keys, and the current
    ** cursor points to a SORTED_DELETE entry, then the cursor has not been 
    ** successfully advanced.  
    **
    ** Similarly, if the cursor is configured to skip system keys and the
    ** current cursor points to a system key, it has not yet been advanced.
    */
    if( *pRc==LSM_OK && 0==mcursorLocationOk(pCsr, 0) ) return 0;
  }
  return 1;
}

static void flCsrAdvance(MultiCursor *pCsr){
  assert( pCsr->flags & CURSOR_FLUSH_FREELIST );
................................................................................
}

static int multiCursorAdvance(MultiCursor *pCsr, int bReverse){
  int rc = LSM_OK;                /* Return Code */
  if( lsmMCursorValid(pCsr) ){
    do {
      int iKey = pCsr->aTree[1];

      assertCursorTree(pCsr);

      /* If this multi-cursor is advancing forwards, and the sub-cursor
      ** being advanced is the one that separator keys may be being read
      ** from, record the current absolute pointer value.  */
      if( pCsr->pPrevMergePtr ){
        if( iKey==(CURSOR_DATA_SEGMENT+pCsr->nPtr) ){
          assert( pCsr->pBtCsr );
................................................................................
        rc = segmentCursorAdvance(pCsr, iKey-CURSOR_DATA_SEGMENT, bReverse);
      }
      if( rc==LSM_OK ){
        int i;
        for(i=(iKey+pCsr->nTree)/2; i>0; i=i/2){
          multiCursorDoCompare(pCsr, i, bReverse);
        }
        assertCursorTree(pCsr);
      }
    }while( mcursorAdvanceOk(pCsr, bReverse, &rc)==0 );
  }
  return rc;
}

int lsmMCursorNext(MultiCursor *pCsr){
................................................................................
    }
  }

  return rc;
}

















/*
** Free all resources allocated by mergeWorkerInit().
*/
static void mergeWorkerShutdown(MergeWorker *pMW, int *pRc){
  int i;                          /* Iterator variable */
  int rc = *pRc;
  MultiCursor *pCsr = pMW->pCsr;
................................................................................
    }else if( pDel ){
      assert( pNew->pNext==pDel );
      pNew->pNext = pDel->pNext;
      lsmFsSortedDelete(pDb->pFS, pDb->pWorker, 1, &pDel->lhs);
      sortedFreeLevel(pDb->pEnv, pDel);
    }

#if 0
    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "new-toplevel");
#endif

    if( freelist.nEntry ){
      Freelist *p = &pDb->pWorker->freelist;
      lsmFree(pDb->pEnv, p->aEntry);
      memcpy(p, &freelist, sizeof(freelist));
................................................................................
  **      separators attached to the LHS of the following level, or neither.
  **
  ** If the new level is the lowest (oldest) in the db, discard any
  ** delete keys. Key annihilation.
  */
  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    pCsr->flags |= CURSOR_NEXT_OK;
    rc = multiCursorAddRhs(pCsr, pLevel);
  }
  if( rc==LSM_OK && pMerge->nInput > pLevel->nRight ){
    rc = btreeCursorNew(pDb, &pNext->lhs, &pCsr->pBtCsr);
  }else if( pNext ){
    multiCursorReadSeparators(pCsr);
  }else{
................................................................................
      rc = lsmBlockFree(pDb, iFrom);

      *pnWrite = lsmFsBlockSize(pDb->pFS) / lsmFsPageSize(pDb->pFS);
      pLvl->lhs.pRedirect = &p->redirect;
    }
  }

#if 0
  if( rc==LSM_OK ){
    char aBuf[64];
    sprintf(aBuf, "move-block %d/%d", p->redirect.n-1, LSM_MAX_BLOCK_REDIRECTS);
    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, aBuf);
  }
#endif
  return rc;
}

/*
*/
static int mergeInsertFreelistSegments(
  lsm_db *pDb, 
  int nFree,
  MergeWorker *pMW
){
  int rc = LSM_OK;
  if( nFree>0 ){
    MultiCursor *pCsr = pMW->pCsr;
    Level *pLvl = pMW->pLevel;
    SegmentPtr *aNew1;
    Segment *aNew2;

    Level *pIter;
    Level *pNext;
    int i = 0;

    aNew1 = (SegmentPtr *)lsmMallocZeroRc(
        pDb->pEnv, sizeof(SegmentPtr) * (pCsr->nPtr+nFree), &rc
    );
    if( rc ) return rc;
    memcpy(&aNew1[nFree], pCsr->aPtr, sizeof(SegmentPtr)*pCsr->nPtr);
    pCsr->nPtr += nFree;
    lsmFree(pDb->pEnv, pCsr->aTree);
    lsmFree(pDb->pEnv, pCsr->aPtr);
    pCsr->aTree = 0;
    pCsr->aPtr = aNew1;

    aNew2 = (Segment *)lsmMallocZeroRc(
        pDb->pEnv, sizeof(Segment) * (pLvl->nRight+nFree), &rc
    );
    if( rc ) return rc;
    memcpy(&aNew2[nFree], pLvl->aRhs, sizeof(Segment)*pLvl->nRight);
    pLvl->nRight += nFree;
    lsmFree(pDb->pEnv, pLvl->aRhs);
    pLvl->aRhs = aNew2;

    for(pIter=pDb->pWorker->pLevel; rc==LSM_OK && pIter!=pLvl; pIter=pNext){
      Segment *pSeg = &pLvl->aRhs[i];
      memcpy(pSeg, &pIter->lhs, sizeof(Segment));

      pCsr->aPtr[i].pSeg = pSeg;
      pCsr->aPtr[i].pLevel = pLvl;
      rc = segmentPtrEnd(pCsr, &pCsr->aPtr[i], 0);

      pDb->pWorker->pLevel = pNext = pIter->pNext;
      sortedFreeLevel(pDb->pEnv, pIter);
      i++;
    }
    assert( i==nFree );
    assert( rc!=LSM_OK || pDb->pWorker->pLevel==pLvl );

    for(i=nFree; i<pCsr->nPtr; i++){
      pCsr->aPtr[i].pSeg = &pLvl->aRhs[i];
    }

    lsmFree(pDb->pEnv, pMW->aGobble);
    pMW->aGobble = 0;
  }
  return rc;
}

static int sortedWork(
  lsm_db *pDb,                    /* Database handle. Must be worker. */
  int nWork,                      /* Number of pages of work to do */
  int nMerge,                     /* Try to merge this many levels at once */
  int bFlush,                     /* Set if call is to make room for a flush */
  int *pnWrite                    /* OUT: Actual number of pages written */
................................................................................
      while( rc==LSM_OK 
          && 0==mergeWorkerDone(&mergeworker) 
          && (mergeworker.nWork<nRemaining || pDb->bUseFreelist)
      ){
        int eType = rtTopic(mergeworker.pCsr->eType);
        rc = mergeWorkerStep(&mergeworker);

        /* If the cursor now points at the first entry past the end of the
        ** user data (i.e. either to EOF or to the first free-list entry
        ** that will be added to the run), then check if it is possible to
        ** merge in any free-list entries that are either in-memory or in
        ** free-list-only blocks.  */
        if( rc==LSM_OK && nMerge==1 && eType==0



         && (rtTopic(mergeworker.pCsr->eType) || mergeWorkerDone(&mergeworker))
        ){
          int nFree = 0;          /* Number of free-list-only levels to merge */
          Level *pLvl;
          assert( pDb->pFreelist==0 && pDb->bUseFreelist==0 );

          /* Now check if all levels containing data newer than this one
          ** are single-segment free-list only levels. If so, they will be
          ** merged in now.  */
          for(pLvl=pDb->pWorker->pLevel; 
              pLvl!=mergeworker.pLevel && (pLvl->flags & LEVEL_FREELIST_ONLY); 
              pLvl=pLvl->pNext
          ){
            assert( pLvl->nRight==0 );
            nFree++;
          }
          if( pLvl==mergeworker.pLevel ){

            rc = mergeInsertFreelistSegments(pDb, nFree, &mergeworker);
            if( rc==LSM_OK ){
              rc = multiCursorVisitFreelist(mergeworker.pCsr);
            }
            if( rc==LSM_OK ){
              rc = multiCursorSetupTree(mergeworker.pCsr, 0);
              pDb->pFreelist = &freelist;
              pDb->bUseFreelist = 1;
            }
          }
        }
      }
      nRemaining -= LSM_MAX(mergeworker.nWork, 1);

      if( rc==LSM_OK ){
        /* Check if the merge operation is completely finished. If not,
................................................................................
      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      pDb->bIncrMerge = 0;
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif
      assertBtreeOk(pDb, &pLevel->lhs);
      assertRunInOrder(pDb, &pLevel->lhs);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
................................................................................

  pLvl = pDb->pWorker->pLevel;
  if( pLvl->pNext!=0 || pLvl->nRight==0 ){
    pLvl = 0;
  }

  rc = sortedNewToplevel(pDb, TREE_NONE, 0);
  if( 0 && rc==LSM_OK && pLvl ){
    Level *pNew = pDb->pWorker->pLevel;
    assert( pNew->pNext==pLvl );

    if( pLvl->pSplitKey==0 ){
      sortedSplitkey(pDb, pLvl, &rc);
    }
    if( rc==LSM_OK && pLvl->iSplitTopic==0 ){
................................................................................
  Snapshot *pSnap,                /* Snapshot to dump */
  int bKeys,                      /* Output the keys from each segment */
  int bVals,                      /* Output the values from each segment */
  const char *zWhy                /* Caption to print near top of dump */
){
  Snapshot *pDump = pSnap;
  Level *pTopLevel;
  char *zFree = 0;


  assert( pSnap );
  pTopLevel = lsmDbSnapshotLevel(pDump);
  if( pDb->xLog && pTopLevel ){
    static int nCall = 0;
    Level *pLevel;
    int iLevel = 0;

    nCall++;
    lsmLogMessage(pDb, LSM_OK, "Database structure %d (%s)", nCall, zWhy);

#if 0
    if( nCall==1031 || nCall==1032 ) bKeys=1;
#endif

    for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
      char zLeft[1024];
      char zRight[1024];
      int i = 0;

      Segment *aLeft[24];  
................................................................................
        sortedDumpSegment(pDb, &pLevel->lhs, bVals);
        for(i=0; i<pLevel->nRight; i++){
          sortedDumpSegment(pDb, &pLevel->aRhs[i], bVals);
        }
      }
    }
  }

  lsmInfoFreelist(pDb, &zFree);
  lsmLogMessage(pDb, LSM_OK, "Freelist: %s", zFree);
  lsmFree(pDb->pEnv, zFree);

  assert( lsmFsIntegrityCheck(pDb) );
}

void lsmSortedFreeLevel(lsm_env *pEnv, Level *pLevel){
  Level *pNext;
  Level *p;