Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a mmap-mode bug.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | rework-flow-control
Files: files | file ages | folders
SHA1: be1e51309003f6cc71148f13c8e3f569e966c4ad
User & Date: dan 2012-09-25 14:50:53.695
Context
2012-09-25
17:25
Fix a race condition causing LSM to read inconsistent in-memory and on-disk databases. check-in: 1743941409 user: dan tags: rework-flow-control
14:50
Fix a mmap-mode bug. check-in: be1e513090 user: dan tags: rework-flow-control
2012-09-24
17:18
Fix a problem preventing log file space from being reclaimed. check-in: b9f122f4e3 user: dan tags: rework-flow-control
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsm_file.c.
696
697
698
699
700
701
702
703
704
705

706
707
708
709
710
711
712
  int *pRc
){
  if( *pRc==LSM_OK && iSz>pFS->nMap ){
    Page *pFix;
    int rc;
    u8 *aOld = pFS->pMap;
    rc = lsmEnvRemap(pFS->pEnv, pFS->fdDb, iSz, &pFS->pMap, &pFS->nMap);
    if( rc==LSM_OK ){
      u8 *aData = (u8 *)pFS->pMap;
      for(pFix=pFS->pLruFirst; pFix; pFix=pFix->pLruNext){

        pFix->aData = &aData[pFS->nPagesize * (i64)(pFix->iPg-1)];
      }
      lsmSortedRemap(pFS->pDb);
    }
    *pRc = rc;
  }
}







|


>







696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
  int *pRc
){
  if( *pRc==LSM_OK && iSz>pFS->nMap ){
    Page *pFix;
    int rc;
    u8 *aOld = pFS->pMap;
    rc = lsmEnvRemap(pFS->pEnv, pFS->fdDb, iSz, &pFS->pMap, &pFS->nMap);
    if( rc==LSM_OK && pFS->pMap!=aOld ){
      u8 *aData = (u8 *)pFS->pMap;
      for(pFix=pFS->pLruFirst; pFix; pFix=pFix->pLruNext){
        assert( &aOld[pFS->nPagesize * (i64)(pFix->iPg-1)]==pFix->aData );
        pFix->aData = &aData[pFS->nPagesize * (i64)(pFix->iPg-1)];
      }
      lsmSortedRemap(pFS->pDb);
    }
    *pRc = rc;
  }
}
Changes to src/lsm_sorted.c.
322
323
324
325
326
327
328



329

330
331
332
333
334
335
336
  Page *pPage;                    /* Current output page */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */
};

/*
** Forward declarations that are only compiled when LSM_DEBUG_EXPENSIVE
** is defined.
*/
#ifdef LSM_DEBUG_EXPENSIVE
static int assertPointersOk(lsm_db *, Segment *, Segment *, int);
static int assertBtreeOk(lsm_db *, Segment *);



#endif


/*
** Two-field structure that fsPageData() uses to read a Page through a
** pointer cast.
** NOTE(review): this presumably mirrors the leading fields of the real
** Page structure; confirm that the layouts agree.
*/
struct FilePage { u8 *aData; int nData; };
/*
** Store the nData value of page pPg in *pnData and return its aData
** pointer. The page is accessed by casting pPg to (struct FilePage *).
*/
static u8 *fsPageData(Page *pPg, int *pnData){
  *pnData = ((struct FilePage *)(pPg))->nData;
  return ((struct FilePage *)(pPg))->aData;
}
/*UNUSED static u8 *fsPageDataPtr(Page *pPg){







>
>
>

>







322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
  Page *pPage;                    /* Current output page */
  int nWork;                      /* Number of calls to mergeWorkerNextPage() */
};

/*
** Forward declarations that are only compiled when LSM_DEBUG_EXPENSIVE
** is defined. When it is not defined, assertRunInOrder() is replaced by
** a macro that expands to nothing, so calls to it need no #ifdef of
** their own.
*/
#ifdef LSM_DEBUG_EXPENSIVE
static int assertPointersOk(lsm_db *, Segment *, Segment *, int);
static int assertBtreeOk(lsm_db *, Segment *);
static void assertRunInOrder(lsm_db *pDb, Segment *pSeg);
#else
#define assertRunInOrder(x,y)
#endif


/*
** Two-field structure that fsPageData() uses to read a Page through a
** pointer cast.
** NOTE(review): this presumably mirrors the leading fields of the real
** Page structure; confirm that the layouts agree.
*/
struct FilePage { u8 *aData; int nData; };
/*
** Store the nData value of page pPg in *pnData and return its aData
** pointer. The page is accessed by casting pPg to (struct FilePage *).
*/
static u8 *fsPageData(Page *pPg, int *pnData){
  *pnData = ((struct FilePage *)(pPg))->nData;
  return ((struct FilePage *)(pPg))->aData;
}
/*UNUSED static u8 *fsPageDataPtr(Page *pPg){
603
604
605
606
607
608
609
610
611
612
613



614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
/*
** Load the key for the current position of b-tree cursor pCsr into
** pCsr->pKey, pCsr->nKey and pCsr->eType.
**
** If pCsr->iPg is negative the three fields are zeroed and LSM_OK is
** returned. Otherwise the key is read with pageGetBtreeKey() and
** SORTED_SEPARATOR is ORed into pCsr->eType. If the loop finishes without
** finding a non-negative cell index, LSM_CORRUPT_BKPT is returned.
**
** NOTE(review): the loop counts iPg down from pCsr->iPg to 0, but the
** body indexes aPg[] with pCsr->iPg rather than with iPg, so every
** iteration examines the same aPg[] entry.
*/
static int btreeCursorLoadKey(BtreeCursor *pCsr){
  int rc = LSM_OK;
  if( pCsr->iPg<0 ){
    /* No current page: clear the key fields. */
    pCsr->pKey = 0;
    pCsr->nKey = 0;
    pCsr->eType = 0;
  }else{
    int iPg;
    for(iPg=pCsr->iPg; iPg>=0; iPg--){
      int iCell = pCsr->aPg[pCsr->iPg].iCell;
      if( iCell>=0 ){



        int dummy;                /* Receives an output that is not used */
        rc = pageGetBtreeKey(
            pCsr->aPg[pCsr->iPg].pPage, pCsr->aPg[pCsr->iPg].iCell,
            &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
        );
        pCsr->eType |= SORTED_SEPARATOR;
        break;
      }
    }

    /* The loop ran to completion without loading a key. */
    if( iPg<0 ) rc = LSM_CORRUPT_BKPT;
  }

  return rc;
}

static int btreeCursorPtr(u8 *aData, int nData, int iCell){
  int nCell;







|
|
|
|
>
>
>
|
|
|
|
|
|
<
<
<
<
<







607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626





627
628
629
630
631
632
633
/*
** Load the key for the current position of b-tree cursor pCsr into
** pCsr->pKey, pCsr->nKey and pCsr->eType.
**
** If pCsr->iPg is negative the three fields are zeroed and LSM_OK is
** returned. Otherwise the search starts at aPg[pCsr->iPg]. While the
** candidate cell index is negative, it moves to the next lower aPg[]
** index and uses that entry's iCell minus one. If no entry yields a
** non-negative cell index, LSM_CORRUPT_BKPT is returned. Otherwise the
** key is read with pageGetBtreeKey() from the entry that was found,
** SORTED_SEPARATOR is ORed into pCsr->eType, and the result of
** pageGetBtreeKey() is returned.
**
** NOTE(review): lower aPg[] indexes are presumably closer to the root of
** the b-tree; confirm against the cursor code.
*/
static int btreeCursorLoadKey(BtreeCursor *pCsr){
  int rc = LSM_OK;
  if( pCsr->iPg<0 ){
    /* No current page: clear the key fields. */
    pCsr->pKey = 0;
    pCsr->nKey = 0;
    pCsr->eType = 0;
  }else{
    int dummy;                    /* Receives an output that is not used */
    int iPg = pCsr->iPg;
    int iCell = pCsr->aPg[iPg].iCell;
    /* Step to lower aPg[] indexes until a usable cell index is found. */
    while( iCell<0 && (--iPg)>=0 ){
      iCell = pCsr->aPg[iPg].iCell-1;
    }
    if( iPg<0 || iCell<0 ) return LSM_CORRUPT_BKPT;

    rc = pageGetBtreeKey(
        pCsr->aPg[iPg].pPage, iCell,
        &dummy, &pCsr->eType, &pCsr->pKey, &pCsr->nKey, &pCsr->blob
    );
    pCsr->eType |= SORTED_SEPARATOR;





  }

  return rc;
}

static int btreeCursorPtr(u8 *aData, int nData, int iCell){
  int nCell;
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
  int nData;

  assert( pCsr->iPg>=0 );
  assert( pCsr->iPg==pCsr->nDepth-1 );

  aData = fsPageData(pPg->pPage, &nData);
  nCell = pageGetNRec(aData, nData);

  assert( pPg->iCell<=nCell );

  pPg->iCell++;
  if( pPg->iCell==nCell ){
    Pgno iLoad;

    /* Up to parent. */
    lsmFsPageRelease(pPg->pPage);
    pPg->pPage = 0;







<

<







648
649
650
651
652
653
654

655

656
657
658
659
660
661
662
  int nData;

  assert( pCsr->iPg>=0 );
  assert( pCsr->iPg==pCsr->nDepth-1 );

  aData = fsPageData(pPg->pPage, &nData);
  nCell = pageGetNRec(aData, nData);

  assert( pPg->iCell<=nCell );

  pPg->iCell++;
  if( pPg->iCell==nCell ){
    Pgno iLoad;

    /* Up to parent. */
    lsmFsPageRelease(pPg->pPage);
    pPg->pPage = 0;
3992
3993
3994
3995
3996
3997
3998


3999
4000
4001
4002
4003
4004
4005
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
      ** zero. The caller has an in-memory tree to flush to disk.  */
      if( bFlush && sortedDbIsFull(pDb)==0 ) break;



#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif
    }
  }

  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );







>
>







3992
3993
3994
3995
3996
3997
3998
3999
4000
4001
4002
4003
4004
4005
4006
4007
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
      ** zero. The caller has an in-memory tree to flush to disk.  */
      if( bFlush && sortedDbIsFull(pDb)==0 ) break;

      assertRunInOrder(pDb, &pLevel->lhs);

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif
    }
  }

  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
4729
4730
4731
4732
4733
4734
4735














































4736
4737
4738
4739
4740
4741
4742
/*
** Call lsmTreeCursorSave() on both tree cursors (apTreeCsr[0] and
** apTreeCsr[1]) of every MultiCursor in the list that starts at pDb->pCsr
** and is linked through pNext.
*/
void lsmSortedSaveTreeCursors(lsm_db *pDb){
  MultiCursor *pCsr;
  for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){
    lsmTreeCursorSave(pCsr->apTreeCsr[0]);
    lsmTreeCursorSave(pCsr->apTreeCsr[1]);
  }
}















































#ifdef LSM_DEBUG_EXPENSIVE
/*
** This function is only included in the build if LSM_DEBUG_EXPENSIVE is 
** defined. Its only purpose is to evaluate various assert() statements to 
** verify that the database is well formed in certain respects.
**







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







4731
4732
4733
4734
4735
4736
4737
4738
4739
4740
4741
4742
4743
4744
4745
4746
4747
4748
4749
4750
4751
4752
4753
4754
4755
4756
4757
4758
4759
4760
4761
4762
4763
4764
4765
4766
4767
4768
4769
4770
4771
4772
4773
4774
4775
4776
4777
4778
4779
4780
4781
4782
4783
4784
4785
4786
4787
4788
4789
4790
/*
** Call lsmTreeCursorSave() on both tree cursors (apTreeCsr[0] and
** apTreeCsr[1]) of every MultiCursor in the list that starts at pDb->pCsr
** and is linked through pNext.
*/
void lsmSortedSaveTreeCursors(lsm_db *pDb){
  MultiCursor *pCsr;
  for(pCsr=pDb->pCsr; pCsr; pCsr=pCsr->pNext){
    lsmTreeCursorSave(pCsr->apTreeCsr[0]);
    lsmTreeCursorSave(pCsr->apTreeCsr[1]);
  }
}

#ifdef LSM_DEBUG_EXPENSIVE
/*
** This function is only included in the build if LSM_DEBUG_EXPENSIVE is
** defined. It walks the pages of segment pSeg, starting at pSeg->iFirst
** and advancing with lsmFsDbPageNext(). On every page that does not have
** SEGMENT_BTREE_FLAG set, it uses assert() to check that:
**
**   * each key compares less than the key that follows it on the same
**     page, and
**   * the first key on the page compares greater than the key most
**     recently saved in blob2 while scanning earlier pages (if any).
**
** Keys are compared with sortedKeyCompare() using pDb->xCmp. Any values
** returned by the page and key accessors are ignored.
*/
static void assertRunInOrder(lsm_db *pDb, Segment *pSeg){
  Page *pPg = 0;
  Blob blob1 = {0, 0, 0, 0};
  Blob blob2 = {0, 0, 0, 0};

  /* The topic values describe the keys held in blob1 and blob2, so they
  ** share the blobs' scope. iTopic2 in particular must survive from the
  ** last key saved on one page to the first key of the next page. When it
  ** was declared inside the loop body its value was indeterminate at the
  ** point where the cross-page comparison read it. */
  int iTopic1 = 0;
  int iTopic2 = 0;

  lsmFsDbPageGet(pDb->pFS, pSeg->iFirst, &pPg);
  while( pPg ){
    u8 *aData; int nData;
    Page *pNext = 0;    /* Ends the walk if lsmFsDbPageNext() leaves it unset */

    aData = lsmFsPageData(pPg, &nData);
    if( 0==(pageGetFlags(aData, nData) & SEGMENT_BTREE_FLAG) ){
      int i;
      int nRec = pageGetNRec(aData, nData);
      for(i=0; i<nRec; i++){
        pageGetKeyCopy(pDb->pEnv, pPg, i, &iTopic1, &blob1);

        /* First key on this page: compare against the key carried over
        ** in blob2 from an earlier page, if there is one. */
        if( i==0 && blob2.nData ){
          assert( sortedKeyCompare(
                pDb->xCmp, iTopic2, blob2.pData, blob2.nData,
                iTopic1, blob1.pData, blob1.nData
          )<0 );
        }

        /* Compare key i against key i+1 on the same page. */
        if( i<(nRec-1) ){
          pageGetKeyCopy(pDb->pEnv, pPg, i+1, &iTopic2, &blob2);
          assert( sortedKeyCompare(
                pDb->xCmp, iTopic1, blob1.pData, blob1.nData,
                iTopic2, blob2.pData, blob2.nData
          )<0 );
        }
      }
    }

    lsmFsDbPageNext(pSeg, pPg, 1, &pNext);
    lsmFsPageRelease(pPg);
    pPg = pNext;
  }

  sortedBlobFree(&blob1);
  sortedBlobFree(&blob2);
}
#endif

#ifdef LSM_DEBUG_EXPENSIVE
/*
** This function is only included in the build if LSM_DEBUG_EXPENSIVE is 
** defined. Its only purpose is to evaluate various assert() statements to 
** verify that the database is well formed in certain respects.
**
Changes to tool/lsmperf.tcl.
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
  append script $data3

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

# Invoke do_write_test with the output name x.png, four numeric arguments
# and a braced list of systems to test.
# NOTE(review): the list is presumably label/configuration pairs and the
# meaning of the numeric arguments is not visible here; confirm against
# the definition of do_write_test.
do_write_test x.png 60 25000 0 40 {
  lsm-st "mmap=1 multi_proc=0 safety=1"
  LevelDB leveldb
}
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-st     "mmap=1 multi_proc=0 safety=1 threads=1 autowork=1"
# LevelDB leveldb
# SQLite sqlite3








|







155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
  append script $data3

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

# Invoke do_write_test with the output name x.png, four numeric arguments
# and a braced list of systems to test.
# NOTE(review): the list is presumably label/configuration pairs and the
# meaning of the numeric arguments is not visible here; confirm against
# the definition of do_write_test.
do_write_test x.png 60 25000 0 40 {
  lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
  LevelDB leveldb
}
# lsm-mt     "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0"
# lsm-st     "mmap=1 multi_proc=0 safety=1 threads=1 autowork=1"
# LevelDB leveldb
# SQLite sqlite3