Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Mark blocks as free more aggressively.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: eb4ae342c5c851aa8b450100549fa26e1f957e52
User & Date: dan 2012-09-27 20:04:32.151
Context
2012-09-28
14:57
Improvements to lsmperf.tcl test. check-in: 371c6c984d user: dan tags: trunk
2012-09-27
20:04
Mark blocks as free more aggressively. check-in: eb4ae342c5 user: dan tags: trunk
16:09
Fix to ensure that the log file is always deleted following a successful system shutdown. check-in: 3d1dacff87 user: dan tags: trunk
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsmInt.h.
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611

int lsmFsPageSize(FileSystem *);
void lsmFsSetPageSize(FileSystem *, int);

int lsmFsFileid(lsm_db *pDb, void **ppId, int *pnId);

/* Creating, populating, gobbling and deleting sorted runs. */
void lsmFsGobble(Snapshot *, Segment *, Page *);
int lsmFsSortedDelete(FileSystem *, Snapshot *, int, Segment *);
int lsmFsSortedFinish(FileSystem *, Segment *);
int lsmFsSortedAppend(FileSystem *, Snapshot *, Segment *, Page **);
int lsmFsPhantomMaterialize(FileSystem *, Snapshot *, Segment *);

/* Functions to retrieve the lsm_env pointer from a FileSystem or Page object */
lsm_env *lsmFsEnv(FileSystem *);







|







597
598
599
600
601
602
603
604
605
606
607
608
609
610
611

int lsmFsPageSize(FileSystem *);
void lsmFsSetPageSize(FileSystem *, int);

int lsmFsFileid(lsm_db *pDb, void **ppId, int *pnId);

/* Creating, populating, gobbling and deleting sorted runs. */
void lsmFsGobble(lsm_db *, Segment *, Pgno *, int);
int lsmFsSortedDelete(FileSystem *, Snapshot *, int, Segment *);
int lsmFsSortedFinish(FileSystem *, Segment *);
int lsmFsSortedAppend(FileSystem *, Snapshot *, Segment *, Page **);
int lsmFsPhantomMaterialize(FileSystem *, Snapshot *, Segment *);

/* Functions to retrieve the lsm_env pointer from a FileSystem or Page object */
lsm_env *lsmFsEnv(FileSystem *);
Changes to src/lsm_file.c.
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
}

/*
** Invoke the xShmBarrier() method of environment pEnv. The method is
** called with no arguments.
**
** NOTE(review): "return <expr>;" in a function declared void is a
** constraint violation in ISO C - consider dropping the "return".
*/
void lsmEnvShmBarrier(lsm_env *pEnv){
  return pEnv->xShmBarrier();
}

/*
** Invoke the xShmUnmap() method of environment pEnv, passing pFile and
** bDel through unchanged.
**
** NOTE(review): "return <expr>;" in a function declared void is a
** constraint violation in ISO C - consider dropping the "return".
*/
void lsmEnvShmUnmap(lsm_env *pEnv, lsm_file *pFile, int bDel){
  return pEnv->xShmUnmap(pFile, bDel);
}

/*
** Invoke the xSleep() method of environment pEnv, passing pEnv itself and
** nUs through unchanged.
**
** NOTE(review): "return <expr>;" in a function declared void is a
** constraint violation in ISO C - consider dropping the "return".
*/
void lsmEnvSleep(lsm_env *pEnv, int nUs){
  return pEnv->xSleep(pEnv, nUs);
}


/*
** Write the contents of string buffer pStr into the log file, starting at
** offset iOff.
*/







|



|







260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
}

/*
** Invoke the xShmBarrier() method of environment pEnv. The method is
** called with no arguments and this wrapper returns nothing.
*/
void lsmEnvShmBarrier(lsm_env *pEnv){
  /* No "return" keyword here: a return statement with an expression is not
  ** permitted in a function whose return type is void. */
  pEnv->xShmBarrier();
}

/*
** Invoke the xShmUnmap() method of environment pEnv, passing pFile and
** bDel through unchanged. Nothing is returned to the caller.
*/
void lsmEnvShmUnmap(lsm_env *pEnv, lsm_file *pFile, int bDel){
  pEnv->xShmUnmap(pFile, bDel);
}

/*
** Invoke the xSleep() method of environment pEnv, passing pEnv itself and
** nUs through unchanged. Nothing is returned to the caller.
** (nUs is presumably a duration in microseconds - TODO confirm.)
*/
void lsmEnvSleep(lsm_env *pEnv, int nUs){
  pEnv->xSleep(pEnv, nUs);
}


/*
** Write the contents of string buffer pStr into the log file, starting at
** offset iOff.
*/
928
929
930
931
932
933
934
935
936
937
938





939

940

941

942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968









































969
970
971
972
973
974
975
    }

    if( bZero ) memset(pDel, 0, sizeof(Segment));
  }
  return LSM_OK;
}

/*
** The pager reference passed as the only argument must refer to a sorted
** file page (not a log or meta page). This call indicates that the argument
** page is now the first page in its sorted file - all previous pages may





** be considered free.

*/

void lsmFsGobble(

  /* Snapshot handed to fsFreeBlock() for each block released */
  Snapshot *pSnapshot,
  /* Run whose first page (iFirst) and size (nSize) are updated */
  Segment *pRun, 
  /* Page that becomes the new first page of pRun */
  Page *pPg
){
  FileSystem *pFS = pPg->pFS;

  /* Nothing to do if pPg is already the first page of the run. */
  if( pPg->iPg!=pRun->iFirst ){
    /* rc only controls the loop below; this function returns void, so an
    ** error from fsBlockNext() or fsFreeBlock() is not reported. */
    int rc = LSM_OK;
    int iBlk = fsPageToBlock(pFS, pRun->iFirst);
    int iFirstBlk = fsPageToBlock(pFS, pPg->iPg);

    /* Add the pages that precede the old iFirst on its block to nSize, so
    ** that whole blocks can be subtracted inside the loop. The matching
    ** subtraction for the new iFirst is made after the loop. */
    pRun->nSize += (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
    pRun->iFirst = pPg->iPg;

    /* Walk from the old first block to the block holding pPg, freeing each
    ** block visited and subtracting its page count from nSize. */
    while( rc==LSM_OK && iBlk!=iFirstBlk ){
      int iNext = 0;
      rc = fsBlockNext(pFS, iBlk, &iNext);
      if( rc==LSM_OK ) rc = fsFreeBlock(pFS, pSnapshot, 0, iBlk);
      pRun->nSize -= (
          1 + fsLastPageOnBlock(pFS, iBlk) - fsFirstPageOnBlock(pFS, iBlk)
      );
      iBlk = iNext;
    }

    /* Remove the pages that precede the new iFirst on block iBlk. */
    pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
    assert( pRun->nSize>0 );
  }
}











































/*
** The first argument to this function is a valid reference to a database
** file page that is part of a sorted run. If parameter eDir is -1, this 
** function attempts to locate and load the previous page in the same run. 
** Or, if eDir is +1, it attempts to find the next page in the same run.







|
|
|
|
>
>
>
>
>
|
>
|
>
|
>


|

<
<
|


|


|






|







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953


954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
    }

    if( bZero ) memset(pDel, 0, sizeof(Segment));
  }
  return LSM_OK;
}

/*
** Scan the nPgno entries of aPgno[] for page numbers that fsPageToBlock()
** maps to block iBlk. Return the smallest such page number, or 0 if no
** entry of the array lies on the block.
*/
static Pgno firstOnBlock(FileSystem *pFS, int iBlk, Pgno *aPgno, int nPgno){
  Pgno iBest = 0;                 /* Smallest match so far; 0 means none */
  int iEntry = 0;                 /* Index of next aPgno[] entry to test */

  while( iEntry<nPgno ){
    Pgno iCand = aPgno[iEntry++];

    /* Ignore page numbers that belong to some other block. */
    if( fsPageToBlock(pFS, iCand)!=iBlk ) continue;

    if( iBest==0 || iCand<iBest ) iBest = iCand;
  }

  return iBest;
}

#if 0
/*
** This block is compiled out by the enclosing "#if 0".
**
** Single-page variant of the gobble operation: make page iPg the first
** page of run pRun. Each block from the one holding the old pRun->iFirst
** up to (but not including) the one holding iPg is passed to
** fsFreeBlock(), and pRun->nSize is reduced to match.
*/
void fsOldGobble(
  FileSystem *pFS,
  Snapshot *pSnapshot,
  Segment *pRun, 
  Pgno iPg
){


  /* Nothing to do if iPg is already the first page of the run. */
  if( iPg!=pRun->iFirst ){
    int rc = LSM_OK;
    int iBlk = fsPageToBlock(pFS, pRun->iFirst);
    int iFirstBlk = fsPageToBlock(pFS, iPg);

    /* Add the pages preceding the old iFirst on its block, so that whole
    ** blocks can be subtracted in the loop below. */
    pRun->nSize += (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
    pRun->iFirst = iPg;
    while( rc==LSM_OK && iBlk!=iFirstBlk ){
      int iNext = 0;
      rc = fsBlockNext(pFS, iBlk, &iNext);
      if( rc==LSM_OK ) rc = fsFreeBlock(pFS, pSnapshot, 0, iBlk);
      pRun->nSize -= (
          1 + fsLastPageOnBlock(pFS, iBlk) - fsFirstPageOnBlock(pFS, iBlk)
          );
      iBlk = iNext;
    }

    /* Remove the pages preceding the new iFirst on block iBlk. */
    pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
    assert( pRun->nSize>0 );
  }
}
#endif

/*
** Argument aPgno is an array of nPgno page numbers. All pages belong to
** the segment pRun. This function gobbles from the start of the run to the
** first page that appears in aPgno[] (i.e. so that the aPgno[] entry is
** the new first page of the run).
*/
void lsmFsGobble(
  /* Database handle; supplies pFS and the worker snapshot (pWorker) */
  lsm_db *pDb,
  /* Run whose first page (iFirst) and size (nSize) are updated */
  Segment *pRun, 
  /* Array of candidate page numbers */
  Pgno *aPgno,
  /* Number of entries in aPgno[] */
  int nPgno
){
  /* rc only controls the loop below; this function returns void, so an
  ** error from fsBlockNext() or fsFreeBlock() is not reported. */
  int rc = LSM_OK;
  FileSystem *pFS = pDb->pFS;
  Snapshot *pSnapshot = pDb->pWorker;
  int iBlk;

  assert( pRun->nSize>0 );
  iBlk = fsPageToBlock(pFS, pRun->iFirst);

  /* Add the pages that precede the current iFirst on its block to nSize, so
  ** that whole blocks can be subtracted inside the loop. The matching
  ** subtraction for the new iFirst is made after the loop. */
  pRun->nSize += (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));

  /* Visit the blocks of the run in order. Stop at the first block that
  ** holds an aPgno[] entry; the smallest such entry becomes the new first
  ** page. Every block visited before that is freed.
  **
  ** NOTE(review): the loop has no exit other than a match or an error, so
  ** it appears to rely on some aPgno[] entry lying on a block of the run -
  ** confirm that callers guarantee this (including nPgno>0). */
  while( rc==LSM_OK ){
    int iNext = 0;
    Pgno iFirst = firstOnBlock(pFS, iBlk, aPgno, nPgno);
    if( iFirst ){
      pRun->iFirst = iFirst;
      break;
    }
    rc = fsBlockNext(pFS, iBlk, &iNext);
    if( rc==LSM_OK ) rc = fsFreeBlock(pFS, pSnapshot, pRun, iBlk);
    /* NOTE(review): the two statements below also run when rc is an error
    ** code, in which case iBlk takes whatever value fsBlockNext() left in
    ** iNext (0 unless it wrote to it) - TODO confirm this is benign. */
    pRun->nSize -= (
        1 + fsLastPageOnBlock(pFS, iBlk) - fsFirstPageOnBlock(pFS, iBlk)
    );
    iBlk = iNext;
  }

  /* Remove the pages that precede the (possibly new) iFirst on block iBlk. */
  pRun->nSize -= (pRun->iFirst - fsFirstPageOnBlock(pFS, iBlk));
  assert( pRun->nSize>0 );
}


/*
** The first argument to this function is a valid reference to a database
** file page that is part of a sorted run. If parameter eDir is -1, this 
** function attempts to locate and load the previous page in the same run. 
** Or, if eDir is +1, it attempts to find the next page in the same run.
Changes to src/lsm_sorted.c.
317
318
319
320
321
322
323

324
325
326
327
328
329
330
  lsm_db *pDb;                    /* Database handle */
  Level *pLevel;                  /* Worker snapshot Level being merged */
  MultiCursor *pCsr;              /* Cursor to read new segment contents from */
  int bFlush;                     /* True if this is an in-memory tree flush */
  Hierarchy hier;                 /* B-tree hierarchy under construction */
  Page *pPage;                    /* Current output page */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */

};

#ifdef LSM_DEBUG_EXPENSIVE
static int assertPointersOk(lsm_db *, Segment *, Segment *, int);
static int assertBtreeOk(lsm_db *, Segment *);
static void assertRunInOrder(lsm_db *pDb, Segment *pSeg);
#else







>







317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
  lsm_db *pDb;                    /* Database handle */
  Level *pLevel;                  /* Worker snapshot Level being merged */
  MultiCursor *pCsr;              /* Cursor to read new segment contents from */
  int bFlush;                     /* True if this is an in-memory tree flush */
  Hierarchy hier;                 /* B-tree hierarchy under construction */
  Page *pPage;                    /* Current output page */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */
  Pgno *aGobble;                  /* Gobble point for each input segment */
};

#ifdef LSM_DEBUG_EXPENSIVE
static int assertPointersOk(lsm_db *, Segment *, Segment *, int);
static int assertBtreeOk(lsm_db *, Segment *);
static void assertRunInOrder(lsm_db *pDb, Segment *pSeg);
#else
557
558
559
560
561
562
563
















564
565
566
567
568
569
570
  assert( (void *)aKey!=pBlob->pData || nKey==pBlob->nData );
  if( (void *)aKey!=pBlob->pData ){
    rc = sortedBlobSet(pEnv, pBlob, aKey, nKey);
  }

  return rc;
}

















static int pageGetBtreeKey(
  Page *pPg,
  int iKey, 
  int *piPtr, 
  int *piTopic, 
  void **ppKey,







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
  assert( (void *)aKey!=pBlob->pData || nKey==pBlob->nData );
  if( (void *)aKey!=pBlob->pData ){
    rc = sortedBlobSet(pEnv, pBlob, aKey, nKey);
  }

  return rc;
}

/*
** Return the page-number reference stored in cell iKey of page pPg.
**
** The cell is expected to start with a 0x00 byte (asserted) followed by two
** varints. The first varint is decoded only to find out how many bytes it
** occupies; the value of the second is returned and asserted to be
** greater than zero.
*/
static Pgno pageGetBtreeRef(Page *pPg, int iKey){
  int nPage;                      /* Size of page data in bytes */
  u8 *aPage = fsPageData(pPg, &nPage);
  u8 *pCell = pageGetCell(aPage, nPage, iKey);
  Pgno iResult;                   /* Value decoded from the cell */

  assert( *pCell==0 );
  pCell = &pCell[1];

  /* Step over the first varint; its value is overwritten just below. */
  pCell += lsmVarintGet32(pCell, &iResult);

  lsmVarintGet32(pCell, &iResult);
  assert( iResult>0 );
  return iResult;
}

static int pageGetBtreeKey(
  Page *pPg,
  int iKey, 
  int *piPtr, 
  int *piTopic, 
  void **ppKey,
1601
1602
1603
1604
1605
1606
1607

1608
1609

1610
1611
1612
1613
1614
1615
1616
1617






1618
1619
1620
1621
1622
1623
1624
  pCsr->iCurrentPtr = iBest;
}

static int seekInBtree(
  LevelCursor *pCsr,
  Segment *pSeg,
  void *pKey, int nKey,           /* Key to seek to */

  Page **ppPg                     /* OUT: Leaf (sorted-run) page reference */
){

  int rc;
  int iPg;
  Page *pPg = 0;
  Blob blob = {0, 0, 0};
  int iTopic = 0;                 /* TODO: Fix me */

  iPg = pSeg->iRoot;
  do {






    rc = lsmFsDbPageGet(pCsr->pFS, iPg, &pPg);
    assert( rc==LSM_OK || pPg==0 );
    if( rc==LSM_OK ){
      u8 *aData;                  /* Buffer containing page data */
      int nData;                  /* Size of aData[] in bytes */
      int iMin;
      int iMax;







>


>








>
>
>
>
>
>







1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
  pCsr->iCurrentPtr = iBest;
}

static int seekInBtree(
  LevelCursor *pCsr,
  Segment *pSeg,
  void *pKey, int nKey,           /* Key to seek to */
  Pgno *aPg,                      /* OUT: Page numbers */
  Page **ppPg                     /* OUT: Leaf (sorted-run) page reference */
){
  int i = 0;
  int rc;
  int iPg;
  Page *pPg = 0;
  Blob blob = {0, 0, 0};
  int iTopic = 0;                 /* TODO: Fix me */

  iPg = pSeg->iRoot;
  do {
    Pgno *piFirst = 0;
    if( aPg ){
      aPg[i++] = iPg;
      piFirst = &aPg[i];
    }

    rc = lsmFsDbPageGet(pCsr->pFS, iPg, &pPg);
    assert( rc==LSM_OK || pPg==0 );
    if( rc==LSM_OK ){
      u8 *aData;                  /* Buffer containing page data */
      int nData;                  /* Size of aData[] in bytes */
      int iMin;
      int iMax;
1639
1640
1641
1642
1643
1644
1645





1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663

1664



1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
        void *pKeyT; int nKeyT;       /* Key for cell iTry */
        int iTopicT;                  /* Topic for key pKeyT/nKeyT */
        int iPtr;                     /* Pointer associated with cell iTry */
        int res;                      /* (pKey - pKeyT) */

        rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob);
        if( rc!=LSM_OK ) break;






        res = iTopic - iTopicT;
        if( res==0 ) res = pCsr->xCmp(pKey, nKey, pKeyT, nKeyT);

        if( res<0 ){
          iPg = iPtr;
          iMax = iTry-1;
        }else{
          iMin = iTry+1;
        }
      }
      lsmFsPageRelease(pPg);
      pPg = 0;
    }
  }while( rc==LSM_OK );

  sortedBlobFree(&blob);
  assert( (rc==LSM_OK)==(pPg!=0) );

  *ppPg = pPg;



  return rc;
}

static int seekInSegment(
  LevelCursor *pCsr, 
  SegmentPtr *pPtr,
  void *pKey, int nKey,
  int iPg,                        /* Page to search */
  int eSeek,                      /* Search bias - see above */
  int *piPtr                      /* OUT: FC pointer */
){
  int iPtr = iPg;
  int rc = LSM_OK;

  if( pPtr->pSeg->iRoot ){
    Page *pPg;
    assert( pPtr->pSeg->iRoot!=0 );
    rc = seekInBtree(pCsr, pPtr->pSeg, pKey, nKey, &pPg);
    if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pPg);
  }else{
    if( iPtr==0 ){
      iPtr = pPtr->pSeg->iFirst;
    }
    if( rc==LSM_OK ){
      rc = segmentPtrLoadPage(pCsr->pFS, pPtr, iPtr);







>
>
>
>
>


















>
|
>
>
>

















|







1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
        void *pKeyT; int nKeyT;       /* Key for cell iTry */
        int iTopicT;                  /* Topic for key pKeyT/nKeyT */
        int iPtr;                     /* Pointer associated with cell iTry */
        int res;                      /* (pKey - pKeyT) */

        rc = pageGetBtreeKey(pPg, iTry, &iPtr, &iTopicT, &pKeyT, &nKeyT, &blob);
        if( rc!=LSM_OK ) break;
        if( piFirst && pKeyT==blob.pData ){
          *piFirst = pageGetBtreeRef(pPg, iTry);
          piFirst = 0;
          i++;
        }

        res = iTopic - iTopicT;
        if( res==0 ) res = pCsr->xCmp(pKey, nKey, pKeyT, nKeyT);

        if( res<0 ){
          iPg = iPtr;
          iMax = iTry-1;
        }else{
          iMin = iTry+1;
        }
      }
      lsmFsPageRelease(pPg);
      pPg = 0;
    }
  }while( rc==LSM_OK );

  sortedBlobFree(&blob);
  assert( (rc==LSM_OK)==(pPg!=0) );
  if( ppPg ){
    *ppPg = pPg;
  }else{
    lsmFsPageRelease(pPg);
  }
  return rc;
}

static int seekInSegment(
  LevelCursor *pCsr, 
  SegmentPtr *pPtr,
  void *pKey, int nKey,
  int iPg,                        /* Page to search */
  int eSeek,                      /* Search bias - see above */
  int *piPtr                      /* OUT: FC pointer */
){
  int iPtr = iPg;
  int rc = LSM_OK;

  if( pPtr->pSeg->iRoot ){
    Page *pPg;
    assert( pPtr->pSeg->iRoot!=0 );
    rc = seekInBtree(pCsr, pPtr->pSeg, pKey, nKey, 0, &pPg);
    if( rc==LSM_OK ) segmentPtrSetPage(pPtr, pPg);
  }else{
    if( iPtr==0 ){
      iPtr = pPtr->pSeg->iFirst;
    }
    if( rc==LSM_OK ){
      rc = segmentPtrLoadPage(pCsr->pFS, pPtr, iPtr);
3443
3444
3445
3446
3447
3448
3449










3450
3451
3452
3453
3454
3455
3456
    LevelCursor *pPtrs = &pCsr->aSegCsr[pCsr->nSegCsr-1];
    if( segmentCursorValid(pPtrs)
     && 0==pDb->xCmp(pPtrs->aPtr[0].pKey, pPtrs->aPtr[0].nKey, pKey, nKey)
    ){
      iPtr = pPtrs->aPtr[0].iPtr+pPtrs->aPtr[0].iPgPtr;
    }
  }











  /* If this is a separator key and we know that the output pointer has not
  ** changed, there is no point in writing an output record. Otherwise,
  ** proceed. */
  if( rtIsSeparator(eType)==0 || iPtr!=0 ){
    int iSPtr = 0;                /* Separators require a pointer here */








>
>
>
>
>
>
>
>
>
>







3477
3478
3479
3480
3481
3482
3483
3484
3485
3486
3487
3488
3489
3490
3491
3492
3493
3494
3495
3496
3497
3498
3499
3500
    LevelCursor *pPtrs = &pCsr->aSegCsr[pCsr->nSegCsr-1];
    if( segmentCursorValid(pPtrs)
     && 0==pDb->xCmp(pPtrs->aPtr[0].pKey, pPtrs->aPtr[0].nKey, pKey, nKey)
    ){
      iPtr = pPtrs->aPtr[0].iPtr+pPtrs->aPtr[0].iPgPtr;
    }
  }

  if( pMW->aGobble ){
    int iGobble = pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
    if( iGobble<pMW->pLevel->nRight ){
      SegmentPtr *pGobble = &pCsr->aSegCsr[iGobble].aPtr[0];
      if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
        pMW->aGobble[iGobble] = lsmFsPageNumber(pGobble->pPg);
      }
    }
  }

  /* If this is a separator key and we know that the output pointer has not
  ** changed, there is no point in writing an output record. Otherwise,
  ** proceed. */
  if( rtIsSeparator(eType)==0 || iPtr!=0 ){
    int iSPtr = 0;                /* Separators require a pointer here */

3731
3732
3733
3734
3735
3736
3737
3738
3739
3740
3741
3742
3743
3744
3745
3746
3747
3748

3749
3750
3751
3752
3753
3754
3755
3756
3757
3758
3759

3760

3761
3762
3763
3764
3765
3766
3767
}

static int mergeWorkerInit(
  lsm_db *pDb,                    /* Db connection to do merge work */
  Level *pLevel,                  /* Level to work on merging */
  MergeWorker *pMW                /* Object to initialize */
){
  int rc;                         /* Return code */
  Merge *pMerge = pLevel->pMerge; /* Persistent part of merge state */
  MultiCursor *pCsr = 0;          /* Cursor opened for pMW */

  assert( pDb->pWorker );
  assert( pLevel->pMerge );
  assert( pLevel->nRight>0 );

  memset(pMW, 0, sizeof(MergeWorker));
  pMW->pDb = pDb;
  pMW->pLevel = pLevel;


  /* Create a multi-cursor to read the data to write to the new
  ** segment. The new segment contains:
  **
  **   1. Records from LHS of each of the nMerge levels being merged.
  **   2. Separators from either the last level being merged, or the
  **      separators attached to the LHS of the following level, or neither.
  **
  ** If the new level is the lowest (oldest) in the db, discard any
  ** delete keys. Key annihilation.
  */

  rc = multiCursorNew(pDb, pDb->pWorker, TREE_NONE, 0, &pCsr);

  if( rc==LSM_OK ){
    rc = multiCursorAddLevel(pCsr, pLevel, MULTICURSOR_ADDLEVEL_RHS);
  }
  if( rc==LSM_OK && pLevel->pNext ){
    if( pMerge->nInput > pLevel->nRight ){
      Level *pNext = pLevel->pNext;
      rc = multiCursorAddLevel(pCsr, pNext, MULTICURSOR_ADDLEVEL_LHS_SEP);







|










>











>
|
>







3775
3776
3777
3778
3779
3780
3781
3782
3783
3784
3785
3786
3787
3788
3789
3790
3791
3792
3793
3794
3795
3796
3797
3798
3799
3800
3801
3802
3803
3804
3805
3806
3807
3808
3809
3810
3811
3812
3813
3814
}

static int mergeWorkerInit(
  lsm_db *pDb,                    /* Db connection to do merge work */
  Level *pLevel,                  /* Level to work on merging */
  MergeWorker *pMW                /* Object to initialize */
){
  int rc = LSM_OK;                /* Return code */
  Merge *pMerge = pLevel->pMerge; /* Persistent part of merge state */
  MultiCursor *pCsr = 0;          /* Cursor opened for pMW */

  assert( pDb->pWorker );
  assert( pLevel->pMerge );
  assert( pLevel->nRight>0 );

  memset(pMW, 0, sizeof(MergeWorker));
  pMW->pDb = pDb;
  pMW->pLevel = pLevel;
  pMW->aGobble = lsmMallocZeroRc(pDb->pEnv, sizeof(Pgno) * pLevel->nRight, &rc);

  /* Create a multi-cursor to read the data to write to the new
  ** segment. The new segment contains:
  **
  **   1. Records from LHS of each of the nMerge levels being merged.
  **   2. Separators from either the last level being merged, or the
  **      separators attached to the LHS of the following level, or neither.
  **
  ** If the new level is the lowest (oldest) in the db, discard any
  ** delete keys. Key annihilation.
  */
  if( rc==LSM_OK ){
    rc = multiCursorNew(pDb, pDb->pWorker, TREE_NONE, 0, &pCsr);
  }
  if( rc==LSM_OK ){
    rc = multiCursorAddLevel(pCsr, pLevel, MULTICURSOR_ADDLEVEL_RHS);
  }
  if( rc==LSM_OK && pLevel->pNext ){
    if( pMerge->nInput > pLevel->nRight ){
      Level *pNext = pLevel->pNext;
      rc = multiCursorAddLevel(pCsr, pNext, MULTICURSOR_ADDLEVEL_LHS_SEP);
3810
3811
3812
3813
3814
3815
3816































3817
3818
3819
3820
3821
3822
3823
      }
    }
    pCsr->flags |= CURSOR_NEXT_OK;
  }

  return rc;
}
































/*
** Argument p points to a level of age N. Return the number of levels in
** the linked list starting at p that have age=N (always at least 1).
*/
static int sortedCountLevels(Level *p){
  int iAge = p->iAge;







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
3867
3868
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889
3890
3891
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
      }
    }
    pCsr->flags |= CURSOR_NEXT_OK;
  }

  return rc;
}

/*
** Gobble the leading part of the segment read by entry iGobble of
** pCsr->aSegCsr[]. The segment must have a b-tree root (asserted).
**
** The b-tree is searched with seekInBtree() for the multi-cursor's current
** key (pCsr->key), with the page numbers it reports collected in aPg[].
** Those page numbers are then handed to lsmFsGobble(), which advances the
** start of the segment to the first of them.
**
** Nothing is done unless rtTopic(pCsr->eType)==0.
**
** Returns LSM_OK on success, or the error code produced by the allocation
** or by seekInBtree(). On error the segment is left untouched.
*/
static int sortedBtreeGobble(
  lsm_db *pDb,                    /* Database handle */
  MultiCursor *pCsr,              /* Multi-cursor supplying the current key */
  int iGobble                     /* Index into pCsr->aSegCsr[] */
){
  int rc = LSM_OK;
  if( rtTopic(pCsr->eType)==0 ){
    const int nAlloc = 32;        /* Number of entries allocated for aPg[] */
    LevelCursor *pLvlcsr = &pCsr->aSegCsr[iGobble];
    Segment *pSeg = pLvlcsr->aPtr[0].pSeg;
    Blob *p = &pCsr->key;
    Pgno *aPg;                    /* Zero-terminated list of page numbers */

    assert( pLvlcsr->nPtr==1 );
    assert( pSeg->iRoot>0 );

    aPg = lsmMallocZeroRc(pDb->pEnv, sizeof(Pgno)*nAlloc, &rc);
    if( rc==LSM_OK ){
      rc = seekInBtree(pLvlcsr, pSeg, p->pData, p->nData, aPg, 0); 
    }

    /* Only touch aPg[] and the segment if everything above succeeded. After
    ** a failure aPg may be NULL or hold an incomplete list, and gobbling
    ** based on it could free blocks that are still in use. */
    if( rc==LSM_OK ){
      int nPg = 0;                /* Number of non-zero entries in aPg[] */

      /* The array was zeroed on allocation, so the first zero entry marks
      ** the end of the list. Never scan past the end of the allocation. */
      while( nPg<nAlloc && aPg[nPg] ) nPg++;
      lsmFsGobble(pDb, pSeg, aPg, nPg);
    }

    lsmFree(pDb->pEnv, aPg);
  }
  return rc;
}

/*
** Argument p points to a level of age N. Return the number of levels in
** the linked list starting at p that have age=N (always at least 1).
*/
static int sortedCountLevels(Level *p){
  int iAge = p->iAge;
3942
3943
3944
3945
3946
3947
3948










3949
3950
3951


3952
3953
3954
3955
3956
3957

3958
3959
3960
3961
3962
3963
3964
      **
      ** Otherwise, gobble up (declare eligible for recycling) any pages
      ** from rhs segments for which the content has been completely merged
      ** into the lhs of the level.
      */
      if( rc==LSM_OK ){
        if( mergeWorkerDone(&mergeworker)==0 ){










          int iGobble = mergeworker.pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
          if( iGobble<pLevel->nRight ){
            SegmentPtr *pGobble = &mergeworker.pCsr->aSegCsr[iGobble].aPtr[0];


            if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 
             && pGobble->pSeg->iRoot==0
            ){
              lsmFsGobble(pWorker, pGobble->pSeg, pGobble->pPg);
            }
          }


        }else if( pLevel->lhs.iFirst==0 ){
          /* If the new level is completely empty, remove it from the 
          ** database snapshot. This can only happen if all input keys were
          ** annihilated. Since keys are only annihilated if the new level
          ** is the last in the linked list (contains the most ancient of
          ** database content), this guarantees that pLevel->pNext==0.  */ 







>
>
>
>
>
>
>
>
>
>



>
>
|
|
<
|


>







4020
4021
4022
4023
4024
4025
4026
4027
4028
4029
4030
4031
4032
4033
4034
4035
4036
4037
4038
4039
4040
4041
4042
4043

4044
4045
4046
4047
4048
4049
4050
4051
4052
4053
4054
      **
      ** Otherwise, gobble up (declare eligible for recycling) any pages
      ** from rhs segments for which the content has been completely merged
      ** into the lhs of the level.
      */
      if( rc==LSM_OK ){
        if( mergeWorkerDone(&mergeworker)==0 ){
          int i;
          for(i=0; i<pLevel->nRight; i++){
            SegmentPtr *pGobble = &mergeworker.pCsr->aSegCsr[i].aPtr[0];
            if( pGobble->pSeg->iRoot ){
              rc = sortedBtreeGobble(pDb, mergeworker.pCsr, i);
            }else if( mergeworker.aGobble[i] ){
              lsmFsGobble(pDb, pGobble->pSeg, &mergeworker.aGobble[i], 1);
            }
          }
#if 0
          int iGobble = mergeworker.pCsr->aTree[1] - CURSOR_DATA_SEGMENT;
          if( iGobble<pLevel->nRight ){
            SegmentPtr *pGobble = &mergeworker.pCsr->aSegCsr[iGobble].aPtr[0];
            if( pGobble->pSeg->iRoot ){
              rc = sortedBtreeGobble(pDb, mergeworker.pCsr, iGobble);
            }else if( (pGobble->flags & PGFTR_SKIP_THIS_FLAG)==0 ){
              Pgno iPg = lsmFsPageNumber(pGobble->pPg);

              lsmFsGobble(pDb, pGobble->pSeg, &iPg, 1);
            }
          }
#endif

        }else if( pLevel->lhs.iFirst==0 ){
          /* If the new level is completely empty, remove it from the 
          ** database snapshot. This can only happen if all input keys were
          ** annihilated. Since keys are only annihilated if the new level
          ** is the last in the linked list (contains the most ancient of
          ** database content), this guarantees that pLevel->pNext==0.  */ 
4257
4258
4259
4260
4261
4262
4263
4264
4265
4266
4267
4268
4269
4270
4271
int lsmSortedAutoWork(
  lsm_db *pDb,                    /* Database handle */
  int nUnit                       /* Pages of data written to in-memory tree */
){
  int rc;                         /* Return code */
  int nRemaining;                 /* Units of work to do before returning */
  int nDepth = 0;                 /* Current height of tree (longest path) */
  int nWrite;                     /* Pages written */
  Level *pLevel;                  /* Used to iterate through levels */
  int bRestore = 0;

  assert( pDb->pWorker==0 );
  assert( pDb->nTransOpen>0 );

  /* Determine how many units of work to do before returning. One unit of







<







4347
4348
4349
4350
4351
4352
4353

4354
4355
4356
4357
4358
4359
4360
int lsmSortedAutoWork(
  lsm_db *pDb,                    /* Database handle */
  int nUnit                       /* Pages of data written to in-memory tree */
){
  int rc;                         /* Return code */
  int nRemaining;                 /* Units of work to do before returning */
  int nDepth = 0;                 /* Current height of tree (longest path) */

  Level *pLevel;                  /* Used to iterate through levels */
  int bRestore = 0;

  assert( pDb->pWorker==0 );
  assert( pDb->nTransOpen>0 );

  /* Determine how many units of work to do before returning. One unit of
4298
4299
4300
4301
4302
4303
4304
4305
4306
4307
4308
4309
4310
4311
4312
4313
4314
4315
      rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
    }
    if( rc==LSM_OK ){
      rc = lsmRestoreCursors(pDb);
    }
  }

#if 0
  lsmLogMessage(pDb, 0, "auto-work: %d pages", nWrite);
#endif

  return rc;
}

/*
** This function is only called during system shutdown. The contents of
** any in-memory trees present (old or current) are written out to disk.
*/







<
<
<
<







4387
4388
4389
4390
4391
4392
4393




4394
4395
4396
4397
4398
4399
4400
      rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
    }
    if( rc==LSM_OK ){
      rc = lsmRestoreCursors(pDb);
    }
  }





  return rc;
}

/*
** This function is only called during system shutdown. The contents of
** any in-memory trees present (old or current) are written out to disk.
*/
Changes to tool/lsmperf.tcl.
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
  append script $data2
  append script $data3

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

do_write_test x.png 120 50000 0 30 {
  lsm-mt    "mmap=1 multi_proc=0 threads=3 autowork=0 autocheckpoint=0"
  leveldb   leveldb
}

#  lsm-mt    "mmap=1 multi_proc=0 threads=2 autowork=0 autocheckpoint=8192000"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-st     "mmap=1 multi_proc=0 safety=1 threads=1 autowork=1"







|







164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
  append script $data2
  append script $data3

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

do_write_test x.png 120 50000 50000 30 {
  lsm-mt    "mmap=1 multi_proc=0 threads=3 autowork=0 autocheckpoint=0"
  leveldb   leveldb
}

#  lsm-mt    "mmap=1 multi_proc=0 threads=2 autowork=0 autocheckpoint=8192000"
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-st     "mmap=1 multi_proc=0 safety=1 threads=1 autowork=1"