Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment: In mmap-mode, ensure a checkpointer has the entire database file mapped before calling msync().
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: a9f8129cce4a6adbdf70f80efb6d6f5dab752209
User & Date: dan 2012-11-29 09:48:22.756
References
2012-11-29
10:41 Wiki page "LsmPerformance" artifact: f77843e55e user: dan
Context
2012-11-29
18:25
Allow freelist-only age=1 segments to be written even if there are already NMERGE age=1 segments. check-in: 88205b2bc6 user: dan tags: trunk
09:48
In mmap-mode, ensure a checkpointer has the entire database file mapped before calling msync(). check-in: a9f8129cce user: dan tags: trunk
2012-11-28
19:39
Further updates to multi-threaded tests. check-in: f43bee2c1b user: dan tags: trunk
Changes
Unified Diff Ignore Whitespace Patch
Changes to lsm-test/lsmtest_tdb3.c.
11
12
13
14
15
16
17



18
19
20
21
22
23
24

#include <sys/time.h>

typedef struct LsmDb LsmDb;
typedef struct LsmWorker LsmWorker;
typedef struct LsmFile LsmFile;




#ifdef LSM_MUTEX_PTHREADS
#include <pthread.h>

#define LSMTEST_THREAD_CKPT      1
#define LSMTEST_THREAD_WORKER    2
#define LSMTEST_THREAD_WORKER_AC 3








>
>
>







11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

#include <sys/time.h>

typedef struct LsmDb LsmDb;
typedef struct LsmWorker LsmWorker;
typedef struct LsmFile LsmFile;

/* Default values for the mt_max_ckpt and mt_min_ckpt parameters. These are
** the values assigned to LsmDb.nMtMaxCkpt and LsmDb.nMtMinCkpt. */
#define LSMTEST_DFLT_MT_MAX_CKPT (8*1024*1024)
#define LSMTEST_DFLT_MT_MIN_CKPT (2*1024*1024)

#ifdef LSM_MUTEX_PTHREADS
#include <pthread.h>

#define LSMTEST_THREAD_CKPT      1
#define LSMTEST_THREAD_WORKER    2
#define LSMTEST_THREAD_WORKER_AC 3

505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
    rc = lsm_info(db, LSM_INFO_CHECKPOINT_SIZE, &nByte);
    if( rc!=LSM_OK || nByte<pDb->nMtMaxCkpt ) break;
    usleep(5000);
    nSleep += 5;
  }while( 1 );

#if 0
    if( nSleep ) printf("waitOnCheckpointer(): nSleep=%d\n", nSleep);
#endif

  return rc;
}

static int waitOnWorker(LsmDb *pDb){
  int rc;







|







508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
    rc = lsm_info(db, LSM_INFO_CHECKPOINT_SIZE, &nByte);
    if( rc!=LSM_OK || nByte<pDb->nMtMaxCkpt ) break;
    usleep(5000);
    nSleep += 5;
  }while( 1 );

#if 0
    if( nSleep ) printf("# waitOnCheckpointer(): nSleep=%d\n", nSleep);
#endif

  return rc;
}

static int waitOnWorker(LsmDb *pDb){
  int rc;
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
    if( rc!=LSM_OK ) return rc;
    if( bOld==0 || nNew<nLimit ) break;
    usleep(5000);
    nSleep += 5;
  }while( 1 );

#if 0
  if( nSleep ) printf("waitOnWorker(): nSleep=%d\n", nSleep);
#endif

  return rc;
}

static int test_lsm_write(
  TestDb *pTestDb, 







|







530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
    if( rc!=LSM_OK ) return rc;
    if( bOld==0 || nNew<nLimit ) break;
    usleep(5000);
    nSleep += 5;
  }while( 1 );

#if 0
  if( nSleep ) printf("# waitOnWorker(): nSleep=%d\n", nSleep);
#endif

  return rc;
}

static int test_lsm_write(
  TestDb *pTestDb, 
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
  ** Todo: There should be an OS method to obtain this value - just as
  ** there is in SQLite. For now, LSM assumes that it is smaller than
  ** the page size (default 4KB).
  */
  pDb->szSector = 256;

  /* Default values for the mt_min_ckpt and mt_max_ckpt parameters. */
  pDb->nMtMinCkpt =  2 * (1<<20);
  pDb->nMtMaxCkpt = 16 * (1<<20);

  memcpy(&pDb->env, tdb_lsm_env(), sizeof(lsm_env));
  pDb->env.pVfsCtx = (void *)pDb;
  pDb->env.xFullpath = testEnvFullpath;
  pDb->env.xOpen = testEnvOpen;
  pDb->env.xRead = testEnvRead;
  pDb->env.xWrite = testEnvWrite;







|
|







918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
  ** Todo: There should be an OS method to obtain this value - just as
  ** there is in SQLite. For now, LSM assumes that it is smaller than
  ** the page size (default 4KB).
  */
  pDb->szSector = 256;

  /* Default values for the mt_min_ckpt and mt_max_ckpt parameters. */
  pDb->nMtMinCkpt = LSMTEST_DFLT_MT_MIN_CKPT;
  pDb->nMtMaxCkpt = LSMTEST_DFLT_MT_MAX_CKPT;

  memcpy(&pDb->env, tdb_lsm_env(), sizeof(lsm_env));
  pDb->env.pVfsCtx = (void *)pDb;
  pDb->env.xFullpath = testEnvFullpath;
  pDb->env.xOpen = testEnvOpen;
  pDb->env.xRead = testEnvRead;
  pDb->env.xWrite = testEnvWrite;
Changes to src/lsmInt.h.
530
531
532
533
534
535
536

537
538
539
540
541
542
543
int lsmCheckpointLoadWorker(lsm_db *pDb);
int lsmCheckpointStore(lsm_db *pDb, int);

int lsmCheckpointLoad(lsm_db *pDb, int *);
int lsmCheckpointLoadOk(lsm_db *pDb, int);
int lsmCheckpointClientCacheOk(lsm_db *);


i64 lsmCheckpointId(u32 *, int);
u32 lsmCheckpointNWrite(u32 *, int);
i64 lsmCheckpointLogOffset(u32 *);
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);







>







530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
int lsmCheckpointLoadWorker(lsm_db *pDb);
int lsmCheckpointStore(lsm_db *pDb, int);

int lsmCheckpointLoad(lsm_db *pDb, int *);
int lsmCheckpointLoadOk(lsm_db *pDb, int);
int lsmCheckpointClientCacheOk(lsm_db *);

u32 lsmCheckpointNBlock(u32 *);
i64 lsmCheckpointId(u32 *, int);
u32 lsmCheckpointNWrite(u32 *, int);
i64 lsmCheckpointLogOffset(u32 *);
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
int lsmFsReadLog(FileSystem *pFS, i64 iOff, int nRead, LsmString *pStr);
int lsmFsTruncateLog(FileSystem *pFS, i64 nByte);
int lsmFsCloseAndDeleteLog(FileSystem *pFS);

void lsmFsDeferClose(FileSystem *pFS, LsmFile **pp);

/* And to sync the db file */
int lsmFsSyncDb(FileSystem *);

void lsmFsFlushWaiting(FileSystem *, int *);

/* Used by lsm_info(ARRAY_STRUCTURE) and lsm_config(MMAP) */
int lsmInfoArrayStructure(lsm_db *pDb, Pgno iFirst, char **pzOut);
int lsmInfoArrayPages(lsm_db *pDb, Pgno iFirst, char **pzOut);
int lsmConfigMmap(lsm_db *pDb, int *piParam);







|







686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
int lsmFsReadLog(FileSystem *pFS, i64 iOff, int nRead, LsmString *pStr);
int lsmFsTruncateLog(FileSystem *pFS, i64 nByte);
int lsmFsCloseAndDeleteLog(FileSystem *pFS);

void lsmFsDeferClose(FileSystem *pFS, LsmFile **pp);

/* And to sync the db file. The second argument is a block count: if it is
** non-zero and the file-system is using mmap, the implementation calls
** fsGrowMapping() for (block-count * block-size) bytes before syncing.
** Pass 0 to sync without touching the mapping. */
int lsmFsSyncDb(FileSystem *, int);

void lsmFsFlushWaiting(FileSystem *, int *);

/* Used by lsm_info(ARRAY_STRUCTURE) and lsm_config(MMAP) */
int lsmInfoArrayStructure(lsm_db *pDb, Pgno iFirst, char **pzOut);
int lsmInfoArrayPages(lsm_db *pDb, Pgno iFirst, char **pzOut);
int lsmConfigMmap(lsm_db *pDb, int *piParam);
Changes to src/lsm_ckpt.c.
1087
1088
1089
1090
1091
1092
1093




1094
1095
1096
1097
1098
1099
1100
    iId = (((i64)lsmGetU32(&aData[CKPT_HDR_ID_MSW*4])) << 32);
    iId += ((i64)lsmGetU32(&aData[CKPT_HDR_ID_LSW*4]));
  }else{
    iId = ((i64)aCkpt[CKPT_HDR_ID_MSW] << 32) + (i64)aCkpt[CKPT_HDR_ID_LSW];
  }
  return iId;
}





u32 lsmCheckpointNWrite(u32 *aCkpt, int bDisk){
  if( bDisk ){
    return lsmGetU32((u8 *)&aCkpt[CKPT_HDR_NWRITE]);
  }else{
    return aCkpt[CKPT_HDR_NWRITE];
  }







>
>
>
>







1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
    iId = (((i64)lsmGetU32(&aData[CKPT_HDR_ID_MSW*4])) << 32);
    iId += ((i64)lsmGetU32(&aData[CKPT_HDR_ID_LSW*4]));
  }else{
    iId = ((i64)aCkpt[CKPT_HDR_ID_MSW] << 32) + (i64)aCkpt[CKPT_HDR_ID_LSW];
  }
  return iId;
}

/*
** Return the CKPT_HDR_NBLOCK field of checkpoint array aCkpt[]. The value
** is read directly as a native u32; no byte-order conversion is applied.
*/
u32 lsmCheckpointNBlock(u32 *aCkpt){
  u32 nBlock = aCkpt[CKPT_HDR_NBLOCK];
  return nBlock;
}

u32 lsmCheckpointNWrite(u32 *aCkpt, int bDisk){
  if( bDisk ){
    return lsmGetU32((u8 *)&aCkpt[CKPT_HDR_NWRITE]);
  }else{
    return aCkpt[CKPT_HDR_NWRITE];
  }
Changes to src/lsm_file.c.
852
853
854
855
856
857
858














859
860
861
862
863
864
865
      }
      lsmSortedRemap(pFS->pDb);
    }
    *pRc = rc;
  }
}















static int fsPageGet(FileSystem *, Pgno, int, Page **, int *);

/*
** Parameter iBlock is a database file block. This function reads the value 
** stored in the blocks "next block" pointer and stores it in *piNext.
** LSM_OK is returned if everything is successful, or an LSM error code
** otherwise.







>
>
>
>
>
>
>
>
>
>
>
>
>
>







852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
      }
      lsmSortedRemap(pFS->pDb);
    }
    *pRc = rc;
  }
}


/*
** Sync the database file to disk.
**
** If nBlock is non-zero and the file-system object is using mmap, first
** call fsGrowMapping() requesting a mapping of (nBlock * block-size)
** bytes. If that step reports an error, return the error code without
** syncing. Otherwise, return the result of lsmEnvSync() on the database
** file-handle.
*/
int lsmFsSyncDb(FileSystem *pFS, int nBlock){
  int rc = LSM_OK;

  if( pFS->bUseMmap && nBlock!=0 ){
    /* 64-bit arithmetic: the byte count may exceed the range of an int */
    i64 nByte = (i64)pFS->nBlocksize * (i64)nBlock;
    fsGrowMapping(pFS, nByte, &rc);
  }

  if( rc==LSM_OK ){
    rc = lsmEnvSync(pFS->pEnv, pFS->fdDb);
  }
  return rc;
}

static int fsPageGet(FileSystem *, Pgno, int, Page **, int *);

/*
** Parameter iBlock is a database file block. This function reads the value 
** stored in the blocks "next block" pointer and stores it in *piNext.
** LSM_OK is returned if everything is successful, or an LSM error code
** otherwise.
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
  nByte += (aBuf[2] & 0x7F);
  *pbFree = !(aBuf[1] & 0x80);
  return nByte;
}

static int fsSubtractOffset(FileSystem *pFS, i64 iOff, int iSub, i64 *piRes){
  i64 iStart;
  int iBlk;
  int rc;

  assert( pFS->pCompress );

  iStart = fsFirstPageOnBlock(pFS, fsPageToBlock(pFS, iOff));
  if( (iOff-iSub)>=iStart ){
    *piRes = (iOff-iSub);







|







990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
  nByte += (aBuf[2] & 0x7F);
  *pbFree = !(aBuf[1] & 0x80);
  return nByte;
}

static int fsSubtractOffset(FileSystem *pFS, i64 iOff, int iSub, i64 *piRes){
  i64 iStart;
  int iBlk = 0;
  int rc;

  assert( pFS->pCompress );

  iStart = fsFirstPageOnBlock(pFS, fsPageToBlock(pFS, iOff));
  if( (iOff-iSub)>=iStart ){
    *piRes = (iOff-iSub);
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
int lsmFsNRead(FileSystem *pFS){
  /* Report the nRead counter stored in the file-system object. */
  int nRead = pFS->nRead;
  return nRead;
}

/*
** Return the nWrite counter of the file-system object: the total number
** of pages written to the database file.
*/
int lsmFsNWrite(FileSystem *pFS){
  int nWrite = pFS->nWrite;
  return nWrite;
}

/*
** Sync the database file to disk by passing the database file-handle to
** lsmEnvSync(). The value returned by lsmEnvSync() is returned unchanged.
*/
int lsmFsSyncDb(FileSystem *pFS){
  int rc = lsmEnvSync(pFS->pEnv, pFS->fdDb);
  return rc;
}

/*
** Return the environment pointer (pEnv) held by the file-system object.
** Only the pointer is copied, not the environment structure itself.
*/
lsm_env *lsmFsEnv(FileSystem *pFS){
  lsm_env *pEnv = pFS->pEnv;
  return pEnv;
}








<
<
<
<
<
<
<







2175
2176
2177
2178
2179
2180
2181







2182
2183
2184
2185
2186
2187
2188
int lsmFsNRead(FileSystem *pFS){
  /* Report the nRead counter stored in the file-system object. */
  int nRead = pFS->nRead;
  return nRead;
}

/*
** Return the nWrite counter of the file-system object: the total number
** of pages written to the database file.
*/
int lsmFsNWrite(FileSystem *pFS){
  int nWrite = pFS->nWrite;
  return nWrite;
}








/*
** Return the environment pointer (pEnv) held by the file-system object.
** Only the pointer is copied, not the environment structure itself.
*/
lsm_env *lsmFsEnv(FileSystem *pFS){
  lsm_env *pEnv = pFS->pEnv;
  return pEnv;
}

Changes to src/lsm_shared.c.
686
687
688
689
690
691
692

693
694
695
696
697
698
699
700
701
702
703
704
      }
      bDone = (iDisk>=iCkpt);
    }

    if( rc==LSM_OK && bDone==0 ){
      int iMeta = (pShm->iMetaPage % 2) + 1;
      if( pDb->eSafety!=LSM_SAFETY_OFF ){

        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ){
        pShm->iMetaPage = iMeta;
        nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite;
      }
#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, 0, "finish checkpoint %d", 







>
|



|







686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
      }
      bDone = (iDisk>=iCkpt);
    }

    if( rc==LSM_OK && bDone==0 ){
      int iMeta = (pShm->iMetaPage % 2) + 1;
      if( pDb->eSafety!=LSM_SAFETY_OFF ){
        int nBlock = lsmCheckpointNBlock(pDb->aSnapshot);
        rc = lsmFsSyncDb(pDb->pFS, nBlock);
      }
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
        rc = lsmFsSyncDb(pDb->pFS, 0);
      }
      if( rc==LSM_OK ){
        pShm->iMetaPage = iMeta;
        nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite;
      }
#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, 0, "finish checkpoint %d", 
Changes to tool/lsmperf.tcl.
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
  append script $data3
  append script $data4

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

do_write_test x.png 60 50000 50000 200 {
  lsm-mt "mt_mode=4 multi_proc=0 autoflush=4M"
  leveldb leveldb
}


  #lsm "mmap=1 multi_proc=0 page_size=4096 block_size=2097152 autocheckpoint=4194000"
  #lsm-mt    "mmap=1 multi_proc=0 threads=2 autowork=0 autocheckpoint=4196000"

# lsm     "safety=1 multi_proc=0"







|
|
<







186
187
188
189
190
191
192
193
194

195
196
197
198
199
200
201
  append script $data3
  append script $data4

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

do_write_test x.png 400 100000 100000 100 {
  lsm-4M "mt_mode=4 multi_proc=0 autoflush=4M page_size=1024"

}


  #lsm "mmap=1 multi_proc=0 page_size=4096 block_size=2097152 autocheckpoint=4194000"
  #lsm-mt    "mmap=1 multi_proc=0 threads=2 autowork=0 autocheckpoint=4196000"

# lsm     "safety=1 multi_proc=0"