SQLite4
Check-in [a56a334333]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Several block-redirect related bugfixes.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | block-redirects
Files: files | file ages | folders
SHA1: a56a33433354d1ff7bbce64f9af2d1d419927bdf
User & Date: dan 2013-01-22 20:07:07
Context
2013-01-23
18:13
Keep the contents of Segment structures up to date at all times, so that none of the page numbers contained within are subject to redirection. check-in: a89abc2117 user: dan tags: block-redirects
2013-01-22
20:07
Several block-redirect related bugfixes. check-in: a56a334333 user: dan tags: block-redirects
2013-01-21
19:50
Add tests for block-redirects to lsmtest. check-in: eec16b0f2f user: dan tags: block-redirects
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest9.c.

60
61
62
63
64
65
66

67
68
69
70

71
72
73
74
75
76
77
...
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
  for(i=0; rc==0 && i<p->nRepeat; i++){

    testDeleteDatasourceRange(pControl, pData, iData, nRecOn3*2, &rc);
    testDeleteDatasourceRange(pDb,      pData, iData, nRecOn3*2, &rc);

    if( db ){
      int nDone;

      do {
        nDone = 0;
        rc = lsm_work(db, 1, 100000, &nDone);
      }while( rc==0 && nDone>0 );

    }

    iData += (nRecOn3*2);
    testWriteDatasourceRange(pControl, pData, iData+nRecOn3, nRecOn3*2, &rc);
    testWriteDatasourceRange(pDb,      pData, iData+nRecOn3, nRecOn3*2, &rc);

    testCompareDb(pData, nRecOn3*3, iData, pControl, pDb, &rc);
................................................................................
void test_data_4(
  const char *zSystem,            /* Database system name */
  const char *zPattern,           /* Run test cases that match this pattern */
  int *pRc                        /* IN/OUT: Error code */
){
  Datatest4 aTest[] = {
      /* defn,                                 nRec, nRepeat, bReopen */
    { {DATA_RANDOM,     20,25,     100,200}, 10000,      10,       0   },
    { {DATA_RANDOM,     20,25,     100,200}, 10000,      10,       1   },
  };

  int i;

  for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
    char *zName = getName4(zSystem, &aTest[i]);
    if( testCaseBegin(pRc, zPattern, "%s", zName) ){







>


|

>







 







|
|







60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
...
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
  for(i=0; rc==0 && i<p->nRepeat; i++){

    testDeleteDatasourceRange(pControl, pData, iData, nRecOn3*2, &rc);
    testDeleteDatasourceRange(pDb,      pData, iData, nRecOn3*2, &rc);

    if( db ){
      int nDone;
      fprintf(stderr, "lsm_work() start...\n");
      do {
        nDone = 0;
        rc = lsm_work(db, 1, (1<<30), &nDone);
      }while( rc==0 && nDone>0 );
      fprintf(stderr, "lsm_work() done...\n");
    }

    iData += (nRecOn3*2);
    testWriteDatasourceRange(pControl, pData, iData+nRecOn3, nRecOn3*2, &rc);
    testWriteDatasourceRange(pDb,      pData, iData+nRecOn3, nRecOn3*2, &rc);

    testCompareDb(pData, nRecOn3*3, iData, pControl, pDb, &rc);
................................................................................
void test_data_4(
  const char *zSystem,            /* Database system name */
  const char *zPattern,           /* Run test cases that match this pattern */
  int *pRc                        /* IN/OUT: Error code */
){
  Datatest4 aTest[] = {
      /* defn,                                 nRec, nRepeat, bReopen */
    { {DATA_RANDOM,     20,25,     500,600}, 10000,      10,       0   },
    { {DATA_RANDOM,     20,25,     500,600}, 10000,      10,       1   },
  };

  int i;

  for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
    char *zName = getName4(zSystem, &aTest[i]);
    if( testCaseBegin(pRc, zPattern, "%s", zName) ){

Changes to src/lsmInt.h.

317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336

337
338
339
340
341
342
343
  lsm_compress compress;          /* Compression callbacks */

  /* Sub-system handles */
  FileSystem *pFS;                /* On-disk portion of database */
  Database *pDatabase;            /* Database shared data */

  /* Client transaction context */
  Snapshot *pClient;              /* Client snapshot (non-NULL in read trans) */
  int iReader;                    /* Read lock held (-1 == unlocked) */
  MultiCursor *pCsr;              /* List of all open cursors */
  LogWriter *pLogWriter;          /* Context for writing to the log file */
  int nTransOpen;                 /* Number of opened write transactions */
  int nTransAlloc;                /* Allocated size of aTrans[] array */
  TransMark *aTrans;              /* Array of marks for transaction rollback */
  IntArray rollback;              /* List of tree-nodes to roll back */

  /* Worker context */
  Snapshot *pWorker;              /* Worker snapshot (or NULL) */
  Freelist *pFreelist;            /* See sortedNewToplevel() */
  int bUseFreelist;               /* True to use pFreelist */


  /* Debugging message callback */
  void (*xLog)(void *, int, const char *);
  void *pLogCtx;

  /* Work done notification callback */
  void (*xWork)(lsm_db *, void *);







|












>







317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
  lsm_compress compress;          /* Compression callbacks */

  /* Sub-system handles */
  FileSystem *pFS;                /* On-disk portion of database */
  Database *pDatabase;            /* Database shared data */

  /* Client transaction context */
  Snapshot *pClient;              /* Client snapshot */
  int iReader;                    /* Read lock held (-1 == unlocked) */
  MultiCursor *pCsr;              /* List of all open cursors */
  LogWriter *pLogWriter;          /* Context for writing to the log file */
  int nTransOpen;                 /* Number of opened write transactions */
  int nTransAlloc;                /* Allocated size of aTrans[] array */
  TransMark *aTrans;              /* Array of marks for transaction rollback */
  IntArray rollback;              /* List of tree-nodes to roll back */

  /* Worker context */
  Snapshot *pWorker;              /* Worker snapshot (or NULL) */
  Freelist *pFreelist;            /* See sortedNewToplevel() */
  int bUseFreelist;               /* True to use pFreelist */
  int bIncrMerge;                 /* True if currently doing a merge */

  /* Debugging message callback */
  void (*xLog)(void *, int, const char *);
  void *pLogCtx;

  /* Work done notification callback */
  void (*xWork)(lsm_db *, void *);

Changes to src/lsm_ckpt.c.

918
919
920
921
922
923
924

925

926
927
928
929
930
931
932
....
1044
1045
1046
1047
1048
1049
1050

1051
1052
1053
1054
1055
1056
1057
1058
      return LSM_PROTOCOL;
    }
  }

  rc = lsmCheckpointDeserialize(pDb, 1, pShm->aSnap1, &pDb->pWorker);
  if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase;


  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );

  return rc;
}

int lsmCheckpointDeserialize(
  lsm_db *pDb, 
  int bInclFreelist,              /* If true, deserialize free-list */
  u32 *aCkpt, 
................................................................................
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush){
  Snapshot *pSnap = pDb->pWorker;
  ShmHeader *pShm = pDb->pShmhdr;
  void *p = 0;
  int n = 0;
  int rc;


  rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aSnap2, p, n);
  lsmShmBarrier(pDb);
  memcpy(pShm->aSnap1, p, n);







>

>







 







>
|







918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
....
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
      return LSM_PROTOCOL;
    }
  }

  rc = lsmCheckpointDeserialize(pDb, 1, pShm->aSnap1, &pDb->pWorker);
  if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase;

#if 0
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
#endif
  return rc;
}

int lsmCheckpointDeserialize(
  lsm_db *pDb, 
  int bInclFreelist,              /* If true, deserialize free-list */
  u32 *aCkpt, 
................................................................................
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush){
  Snapshot *pSnap = pDb->pWorker;
  ShmHeader *pShm = pDb->pShmhdr;
  void *p = 0;
  int n = 0;
  int rc;

  pSnap->iId++;
  rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aSnap2, p, n);
  lsmShmBarrier(pDb);
  memcpy(pShm->aSnap1, p, n);

Changes to src/lsm_main.c.

36
37
38
39
40
41
42


43
44
45
46
47
48
49
static void assert_db_state(lsm_db *pDb){

  /* If there is at least one cursor or a write transaction open, the database
  ** handle must be holding a pointer to a client snapshot. And the reverse 
  ** - if there are no open cursors and no write transactions then there must 
  ** not be a client snapshot.  */
  assert( (pDb->pCsr!=0 || pDb->nTransOpen>0)==(pDb->iReader>=0) );



  assert( pDb->nTransOpen>=0 );
}
#else
# define assert_db_state(x) 
#endif








>
>







36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static void assert_db_state(lsm_db *pDb){

  /* If there is at least one cursor or a write transaction open, the database
  ** handle must be holding a pointer to a client snapshot. And the reverse 
  ** - if there are no open cursors and no write transactions then there must 
  ** not be a client snapshot.  */
  assert( (pDb->pCsr!=0 || pDb->nTransOpen>0)==(pDb->iReader>=0) );

  assert( pDb->iReader<0 || pDb->pClient!=0 );

  assert( pDb->nTransOpen>=0 );
}
#else
# define assert_db_state(x) 
#endif

Changes to src/lsm_shared.c.

667
668
669
670
671
672
673

674

675
676
677
678
679
680
681
682
683
684
...
685
686
687
688
689
690
691


692

693
694
695
696
697
698
699
700
701
702
703
704
705
706
  i64 iInUse = 0;                 /* Snapshot id still in use */
  i64 iSynced = 0;                /* Snapshot id synced to disk */

  assert( p );

#ifdef LSM_LOG_FREELIST
  {

    char *zFree = 0;

    rc = lsmInfoFreelist(pDb, &zFree);
    if( rc!=LSM_OK ) return rc;
    lsmLogMessage(pDb, 0, "lsmBlockAllocate(): freelist: %s", zFree);
    lsmFree(pDb->pEnv, zFree);
  }
#endif

  /* Set iInUse to the smallest snapshot id that is either:
  **
  **   * Currently in use by a database client,
................................................................................
  **   * May be used by a database client in the future, or
  **   * Is the most recently checkpointed snapshot (i.e. the one that will
  **     be used following recovery if a failure occurs at this point).
  */
  rc = lsmCheckpointSynced(pDb, &iSynced, 0, 0);
  if( rc==LSM_OK && iSynced==0 ) iSynced = p->iId;
  iInUse = iSynced;


  if( rc==LSM_OK && pDb->pClient ) iInUse = LSM_MIN(iInUse, pDb->pClient->iId);

  if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);

#ifdef LSM_LOG_FREELIST
  {
    lsmLogMessage(pDb, 0, "lsmBlockAllocate(): "
        "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)", 
        iInUse, iSynced, (pDb->pClient ? pDb->pClient->iId : 0)
    );
  }
#endif

  /* Query the free block list for a suitable block */
  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet);








>

>


|







 







>
>
|
>






|







667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
...
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
  i64 iInUse = 0;                 /* Snapshot id still in use */
  i64 iSynced = 0;                /* Snapshot id synced to disk */

  assert( p );

#ifdef LSM_LOG_FREELIST
  {
    static int nCall = 0;
    char *zFree = 0;
    nCall++;
    rc = lsmInfoFreelist(pDb, &zFree);
    if( rc!=LSM_OK ) return rc;
    lsmLogMessage(pDb, 0, "lsmBlockAllocate(): %d freelist: %s", nCall, zFree);
    lsmFree(pDb->pEnv, zFree);
  }
#endif

  /* Set iInUse to the smallest snapshot id that is either:
  **
  **   * Currently in use by a database client,
................................................................................
  **   * May be used by a database client in the future, or
  **   * Is the most recently checkpointed snapshot (i.e. the one that will
  **     be used following recovery if a failure occurs at this point).
  */
  rc = lsmCheckpointSynced(pDb, &iSynced, 0, 0);
  if( rc==LSM_OK && iSynced==0 ) iSynced = p->iId;
  iInUse = iSynced;
  if( rc==LSM_OK && pDb->iReader>=0 ){
    assert( pDb->pClient );
    iInUse = LSM_MIN(iInUse, pDb->pClient->iId);
  }
  if( rc==LSM_OK ) rc = firstSnapshotInUse(pDb, &iInUse);

#ifdef LSM_LOG_FREELIST
  {
    lsmLogMessage(pDb, 0, "lsmBlockAllocate(): "
        "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)", 
        iInUse, iSynced, (pDb->iReader>=0 ? pDb->pClient->iId : 0)
    );
  }
#endif

  /* Query the free block list for a suitable block */
  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet);

Changes to src/lsm_sorted.c.

2447
2448
2449
2450
2451
2452
2453

2454
2455



2456
2457
2458
2459
2460
2461
2462
....
2483
2484
2485
2486
2487
2488
2489

2490

2491
2492
2493
2494
2495
2496
2497
....
4289
4290
4291
4292
4293
4294
4295
4296


4297
4298
4299
4300
4301
4302
4303
....
4611
4612
4613
4614
4615
4616
4617





4618
4619
4620
4621
4622
4623
4624
....
4648
4649
4650
4651
4652
4653
4654


4655
4656
4657
4658
4659
4660
4661
4662
4663

4664
4665
4666
4667
4668
4669
4670
....
4823
4824
4825
4826
4827
4828
4829
4830
4831
4832
4833
4834
4835
4836
4837
....
5517
5518
5519
5520
5521
5522
5523

5524
5525
5526

5527


5528
5529
5530
5531
5532
5533
5534
  void *pCtx                      /* First argument to pass to callback */
){
  MultiCursor *pCsr;              /* Cursor used to read db */
  int rc = LSM_OK;                /* Return Code */
  Snapshot *pSnap = 0;

  assert( pDb->pWorker );

  rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap);
  if( rc!=LSM_OK ) return rc;




  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    rc = multiCursorAddAll(pCsr, pSnap);
    pCsr->flags |= CURSOR_IGNORE_DELETE;
  }
  
................................................................................
        if( x(pCtx, iBlk, iSnap) ) break;
        rc = multiCursorAdvance(pCsr, !bReverse);
      }
    }
  }

  lsmMCursorClose(pCsr);

  lsmFreeSnapshot(pDb->pEnv, pSnap);


  return rc;
}

int lsmSortedLoadFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  void **ppVal,                   /* OUT: Blob containing LSM free-list */
................................................................................
    pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
    pNew->pNext = p;
    for(pp=&pTopLevel; *pp!=pLevel; pp=&((*pp)->pNext));
    *pp = pNew;
    lsmDbSnapshotSetLevel(pDb->pWorker, pTopLevel);

    /* Determine whether or not the next separators will be linked in */
    if( pNext && pNext->pMerge==0 && pNext->lhs.iRoot ){


      bUseNext = 1;
    }
  }

  /* Allocate the merge object */
  nByte = sizeof(Merge) + sizeof(MergeInput) * (nMerge + bUseNext);
  pMerge = (Merge *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
................................................................................
      rc = lsmBlockFree(pDb, iFrom);

      *pnWrite = lsmFsBlockSize(pDb->pFS) / lsmFsPageSize(pDb->pFS);
      pLvl->lhs.pRedirect = &p->redirect;
    }
  }






  return rc;
}

static int sortedWork(
  lsm_db *pDb,                    /* Database handle. Must be worker. */
  int nWork,                      /* Number of pages of work to do */
  int nMerge,                     /* Try to merge this many levels at once */
................................................................................
      nRemaining -= nDone;

      /* Could not find any work to do. Finished. */
      if( nDone==0 ) break;
    }else{
      MergeWorker mergeworker;    /* State used to work on the level merge */



      rc = mergeWorkerInit(pDb, pLevel, &mergeworker);
      assert( mergeworker.nWork==0 );
      while( rc==LSM_OK 
          && 0==mergeWorkerDone(&mergeworker) 
          && mergeworker.nWork<nRemaining 
      ){
        rc = mergeWorkerStep(&mergeworker);
      }
      nRemaining -= LSM_MAX(mergeworker.nWork, 1);


      /* Check if the merge operation is completely finished. If so, the
      ** Merge object and the right-hand-side of the level can be deleted. 
      **
      ** Otherwise, gobble up (declare eligible for recycling) any pages
      ** from rhs segments for which the content has been completely merged
      ** into the lhs of the level.
................................................................................
      pMerge = (Merge *)lsmMallocZeroRc(pDb->pEnv, 
          sizeof(Merge) + sizeof(MergeInput)*(pLvl->pMerge->nInput+1), &rc
      );
      if( rc==LSM_OK ){
        memcpy(pMerge, pLvl->pMerge, sizeof(Merge));
        pMerge->aInput = (MergeInput *)&pMerge[1];
        memcpy(&pMerge->aInput[1], pLvl->pMerge->aInput, 
            sizeof(MergeInput)*(pLvl->pMerge->nInput+1)
        );
        pMerge->aInput[0].iPg = aRhs[0].iFirst;
        pMerge->aInput[0].iCell = 0;
        pMerge->nInput++;
        lsmFree(pDb->pEnv, pLvl->pMerge);
        pLvl->pMerge = pMerge;

................................................................................
){
  Snapshot *pDump = pSnap;
  Level *pTopLevel;

  assert( pSnap );
  pTopLevel = lsmDbSnapshotLevel(pDump);
  if( pDb->xLog && pTopLevel ){

    Level *pLevel;
    int iLevel = 0;


    lsmLogMessage(pDb, LSM_OK, "Database structure (%s)", zWhy);



    for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
      char zLeft[1024];
      char zRight[1024];
      int i = 0;

      Segment *aLeft[24];  







>
|
|
>
>
>







 







>
|
>







 







|
>
>







 







>
>
>
>
>







 







>
>









>







 







|







 







>



>
|
>
>







2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
....
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
....
4295
4296
4297
4298
4299
4300
4301
4302
4303
4304
4305
4306
4307
4308
4309
4310
4311
....
4619
4620
4621
4622
4623
4624
4625
4626
4627
4628
4629
4630
4631
4632
4633
4634
4635
4636
4637
....
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
4676
4677
4678
4679
4680
4681
4682
4683
4684
4685
4686
....
4839
4840
4841
4842
4843
4844
4845
4846
4847
4848
4849
4850
4851
4852
4853
....
5533
5534
5535
5536
5537
5538
5539
5540
5541
5542
5543
5544
5545
5546
5547
5548
5549
5550
5551
5552
5553
5554
  void *pCtx                      /* First argument to pass to callback */
){
  MultiCursor *pCsr;              /* Cursor used to read db */
  int rc = LSM_OK;                /* Return Code */
  Snapshot *pSnap = 0;

  assert( pDb->pWorker );
  if( pDb->bIncrMerge ){
    rc = lsmCheckpointDeserialize(pDb, 0, pDb->pShmhdr->aSnap1, &pSnap);
    if( rc!=LSM_OK ) return rc;
  }else{
    pSnap = pDb->pWorker;
  }

  pCsr = multiCursorNew(pDb, &rc);
  if( pCsr ){
    rc = multiCursorAddAll(pCsr, pSnap);
    pCsr->flags |= CURSOR_IGNORE_DELETE;
  }
  
................................................................................
        if( x(pCtx, iBlk, iSnap) ) break;
        rc = multiCursorAdvance(pCsr, !bReverse);
      }
    }
  }

  lsmMCursorClose(pCsr);
  if( pSnap!=pDb->pWorker ){
    lsmFreeSnapshot(pDb->pEnv, pSnap);
  }

  return rc;
}

int lsmSortedLoadFreelist(
  lsm_db *pDb,                    /* Database handle (must be worker) */
  void **ppVal,                   /* OUT: Blob containing LSM free-list */
................................................................................
    pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);
    pNew->pNext = p;
    for(pp=&pTopLevel; *pp!=pLevel; pp=&((*pp)->pNext));
    *pp = pNew;
    lsmDbSnapshotSetLevel(pDb->pWorker, pTopLevel);

    /* Determine whether or not the next separators will be linked in */
    if( pNext && pNext->pMerge==0 && pNext->lhs.iRoot && pNext 
     && (bFreeOnly==0 || (pNext->flags & LEVEL_FREELIST_ONLY))
    ){
      bUseNext = 1;
    }
  }

  /* Allocate the merge object */
  nByte = sizeof(Merge) + sizeof(MergeInput) * (nMerge + bUseNext);
  pMerge = (Merge *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
................................................................................
      rc = lsmBlockFree(pDb, iFrom);

      *pnWrite = lsmFsBlockSize(pDb->pFS) / lsmFsPageSize(pDb->pFS);
      pLvl->lhs.pRedirect = &p->redirect;
    }
  }

#if 0
  if( rc==LSM_OK ){
    lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "move-block");
  }
#endif
  return rc;
}

static int sortedWork(
  lsm_db *pDb,                    /* Database handle. Must be worker. */
  int nWork,                      /* Number of pages of work to do */
  int nMerge,                     /* Try to merge this many levels at once */
................................................................................
      nRemaining -= nDone;

      /* Could not find any work to do. Finished. */
      if( nDone==0 ) break;
    }else{
      MergeWorker mergeworker;    /* State used to work on the level merge */

      assert( pDb->bIncrMerge==0 );
      pDb->bIncrMerge = 1;
      rc = mergeWorkerInit(pDb, pLevel, &mergeworker);
      assert( mergeworker.nWork==0 );
      while( rc==LSM_OK 
          && 0==mergeWorkerDone(&mergeworker) 
          && mergeworker.nWork<nRemaining 
      ){
        rc = mergeWorkerStep(&mergeworker);
      }
      nRemaining -= LSM_MAX(mergeworker.nWork, 1);
      pDb->bIncrMerge = 0;

      /* Check if the merge operation is completely finished. If so, the
      ** Merge object and the right-hand-side of the level can be deleted. 
      **
      ** Otherwise, gobble up (declare eligible for recycling) any pages
      ** from rhs segments for which the content has been completely merged
      ** into the lhs of the level.
................................................................................
      pMerge = (Merge *)lsmMallocZeroRc(pDb->pEnv, 
          sizeof(Merge) + sizeof(MergeInput)*(pLvl->pMerge->nInput+1), &rc
      );
      if( rc==LSM_OK ){
        memcpy(pMerge, pLvl->pMerge, sizeof(Merge));
        pMerge->aInput = (MergeInput *)&pMerge[1];
        memcpy(&pMerge->aInput[1], pLvl->pMerge->aInput, 
            sizeof(MergeInput)*(pLvl->pMerge->nInput)
        );
        pMerge->aInput[0].iPg = aRhs[0].iFirst;
        pMerge->aInput[0].iCell = 0;
        pMerge->nInput++;
        lsmFree(pDb->pEnv, pLvl->pMerge);
        pLvl->pMerge = pMerge;

................................................................................
){
  Snapshot *pDump = pSnap;
  Level *pTopLevel;

  assert( pSnap );
  pTopLevel = lsmDbSnapshotLevel(pDump);
  if( pDb->xLog && pTopLevel ){
    static int nCall = 0;
    Level *pLevel;
    int iLevel = 0;

    nCall++;
    lsmLogMessage(pDb, LSM_OK, "Database structure %d (%s)", nCall, zWhy);

    /* if( nCall>639 ) bKeys = 1; */

    for(pLevel=pTopLevel; pLevel; pLevel=pLevel->pNext){
      char zLeft[1024];
      char zRight[1024];
      int i = 0;

      Segment *aLeft[24];  

Changes to tool/lsmview.tcl.

39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
  }
  namespace export scrollable
}
namespace import ::autoscroll::*
#############################################################################

proc exec_lsmtest_show {args} {
  set fd [open [list |lsmtest show {*}$args]]
  set res ""
  while {![eof $fd]} { 
    set line [gets $fd]
    if {[regexp {^\#.*} $line]} continue
    if {[regexp {^Leaked*} $line]} continue
    append res $line
    append res "\n"







|







39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
  }
  namespace export scrollable
}
namespace import ::autoscroll::*
#############################################################################

proc exec_lsmtest_show {args} {
  set fd [open [list |lsmtest show {*}$args 2>/dev/null]]
  set res ""
  while {![eof $fd]} { 
    set line [gets $fd]
    if {[regexp {^\#.*} $line]} continue
    if {[regexp {^Leaked*} $line]} continue
    append res $line
    append res "\n"