SQLite4
Check-in [93d9ff7c12]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a bug preventing block recycling in multi-threaded tests.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 93d9ff7c1272b51ec1022e7924daa8bf3925db2b
User & Date: dan 2012-09-18 15:48:39
Context
2012-09-18
19:39
Avoid malloc calls in lsm_file.c when running in mmap mode. Also avoid many mutex operations when accessing the in-memory tree. check-in: 1e661d0bad user: dan tags: trunk
15:48
Fix a bug preventing block recycling in multi-threaded tests. check-in: 93d9ff7c12 user: dan tags: trunk
2012-09-17
20:41
Make it possible to flush part of the in-memory tree to disk without blocking writer clients. check-in: 6c686c6d1a user: dan tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_tdb3.c.

571
572
573
574
575
576
577
578
579
580

581
582
583
584
585
586
587
...
813
814
815
816
817
818
819


820
821
822
823
824
825
826
827
...
889
890
891
892
893
894
895











896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914




915
916
917
918
919
920
921
...
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
...
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
....
1015
1016
1017
1018
1019
1020
1021

1022
1023
1024
1025

1026
1027
1028
1029
1030
1031
1032
....
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061

1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
}

/*
** A log message callback registered with lsm connections. Prints all 
** messages to stderr.
*/
static void xLog(void *pCtx, int rc, const char *z){
  FILE *pErr = stderr;            /* Destination for every log message */

  /* Neither the context pointer nor the error code is reported. */
  unused_parameter(rc);
  unused_parameter(pCtx);

  /* One message per line, flushed at once so output is not lost. */
  fprintf(pErr, "%s\n", z);
  fflush(pErr);
}

static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *p = (LsmDb *)pArg;
................................................................................
  }
  return 0;
}

void tdb_lsm_enable_log(TestDb *pDb, int bEnable){
  lsm_db *pLsm = tdb_lsm(pDb);

  /* tdb_lsm() yielded no LSM handle: there is nothing to configure. */
  if( pLsm==0 ) return;

  /* Install xLog() when enabling logging, clear the callback otherwise.
  ** No context pointer is supplied in either case. */
  if( bEnable ){
    lsm_config_log(pLsm, xLog, 0);
  }else{
    lsm_config_log(pLsm, 0, 0);
  }
}

void tdb_lsm_application_crash(TestDb *pDb){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->bCrashed = 1;
................................................................................
}

int tdb_lsm_open(const char *zCfg, const char *zDb, int bClear, TestDb **ppDb){
  /* Thin public wrapper: all of the work is done by testLsmOpen(). */
  int rc = testLsmOpen(zCfg, zDb, bClear, ppDb);
  return rc;
}

#ifdef LSM_MUTEX_PTHREADS












static void *worker_main(void *pArg){
  LsmWorker *p = (LsmWorker *)pArg;
  lsm_db *pWorker;                /* Connection to access db through */

  pthread_mutex_lock(&p->worker_mutex);
  while( (pWorker = p->pWorker) ){
    int nWrite = 0;
    int rc;

    /* Do some work. If an error occurs, exit. */
    pthread_mutex_unlock(&p->worker_mutex);
    rc = lsm_work(pWorker, p->lsm_work_flags, p->lsm_work_npage, &nWrite);
    printf("# worked %d units\n", nWrite);
    pthread_mutex_lock(&p->worker_mutex);
    if( rc!=LSM_OK && rc!=LSM_BUSY ){
      p->worker_rc = rc;
      break;
    }





    /* If the call to lsm_work() indicates that there is nothing more
    ** to do at this point, wait on the condition variable. The thread will
    ** wake up when it is signaled either because the client thread has
    ** flushed an in-memory tree into the db file or when the connection
    ** is being closed.  */
    if( nWrite==0 ){
................................................................................
      if( p->pWorker && p->bDoWork==0 ){
        pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
      }
      p->bDoWork = 0;
    }
  }
  pthread_mutex_unlock(&p->worker_mutex);
  printf("# worker EXIT\n");
  
  return 0;
}


/*
** Signal worker thread iWorker that there may be work to do.
*/
static void mt_signal_worker(LsmDb *pDb, int iWorker){
  LsmWorker *pW = pDb->aWorker + iWorker;
  pthread_mutex_t *pMutex = &pW->worker_mutex;

  /* bDoWork is set while holding worker_mutex. worker_main() tests it under
  ** the same mutex before calling pthread_cond_wait(), so the wake-up
  ** cannot be lost. */
  pthread_mutex_lock(pMutex);
  pW->bDoWork = 1;
  pthread_cond_signal(&pW->worker_cond);
  pthread_mutex_unlock(pMutex);
}


static void mt_stop_worker(LsmDb *pDb, int iWorker){
  LsmWorker *p = &pDb->aWorker[iWorker];
  if( p->pWorker ){
    void *pDummy;
    lsm_db *pWorker;

................................................................................
static void mt_client_work_hook(lsm_db *db, void *pArg){
  LsmDb *pLsmDb = (LsmDb *)pArg;  /* Test wrapper that owns the workers */
  int iWorker = 0;

  /* The application-level work-hook, when one is registered, runs first. */
  if( pLsmDb->xWork!=0 ){
    pLsmDb->xWork(db, pLsmDb->pWorkCtx);
  }

  printf("# signalling worker threads\n");

  /* Then every background worker thread is woken. */
  while( iWorker<pLsmDb->nWorker ){
    mt_signal_worker(pLsmDb, iWorker);
    iWorker++;
  }
}

static void mt_worker_work_hook(lsm_db *db, void *pArg){
................................................................................
  LsmWorker *p;                   /* Object to initialize */

  assert( iWorker<pDb->nWorker );

  p = &pDb->aWorker[iWorker];
  p->lsm_work_flags = flags;
  p->lsm_work_npage = nPage;


  /* Open the worker connection */
  if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker);
  if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);


  /* Configure the work-hook */
  if( rc==0 ){
    lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
  }

  /* Kick off the worker thread. */
................................................................................
/*
** Install mt_client_work_hook() as the work-hook of the client connection.
** Then allocate nWorker (1 or 2) zeroed LsmWorker slots and start the
** worker threads on database file zFilename.
**
** Worker 0 runs lsm_work() with LSM_WORK_CHECKPOINT|LSM_WORK_FLUSH and a
** page budget of 512. If nWorker is 2, worker 1 runs lsm_work() with no
** flags and the same 512 page budget.
**
** zCfg is currently unused. The return value is that of the last
** mt_start_worker() call made (0 on success).
*/
static int testLsmStartWorkers(
  LsmDb *pDb, int nWorker, const char *zFilename, const char *zCfg
){
  int rc;
  (void)zCfg;
  assert( nWorker==1 || nWorker==2 );

  lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);

  pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker);
  memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker);
  pDb->nWorker = nWorker;

  rc = mt_start_worker(
      pDb, 0, zFilename, LSM_WORK_CHECKPOINT|LSM_WORK_FLUSH, 512
  );
  if( rc==0 && nWorker==2 ){
    rc = mt_start_worker(pDb, 1, zFilename, 0, 512);
  }

  return rc;
}


static int test_lsm_mt(







<


>







 







>
>
|







 







>
>
>
>
>
>
>
>
>
>
>













|





>
>
>
>







 







|




<
<
<
<
<
<
<
<
<
<
<
<







 







|







 







>




>







 







<
|
<
<






|
|
|
<
>
|
|
|
<
|
<
<







571
572
573
574
575
576
577

578
579
580
581
582
583
584
585
586
587
...
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
...
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
...
939
940
941
942
943
944
945
946
947
948
949
950












951
952
953
954
955
956
957
...
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
....
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
....
1048
1049
1050
1051
1052
1053
1054

1055


1056
1057
1058
1059
1060
1061
1062
1063
1064

1065
1066
1067
1068

1069


1070
1071
1072
1073
1074
1075
1076
}

/*
** A log message callback registered with lsm connections. Prints all 
** messages to stderr.
*/
static void xLog(void *pCtx, int rc, const char *z){
  const char *zTag = (const char *)pCtx;  /* Optional prefix, e.g. "client" */
  FILE *pErr = stderr;

  unused_parameter(rc);

  /* Prefix the message with the connection tag, when one was supplied. */
  if( zTag!=0 ){
    fprintf(pErr, "%s: ", zTag);
  }
  fprintf(pErr, "%s\n", z);
  fflush(pErr);
}

static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *p = (LsmDb *)pArg;
................................................................................
  }
  return 0;
}

/*
** Enable (bEnable!=0) or disable logging to stderr for the client
** connection of test database pDb. Messages are tagged "client" by xLog().
** Only the client connection is affected. Does nothing if tdb_lsm() returns
** no LSM handle for pDb.
*/
void tdb_lsm_enable_log(TestDb *pDb, int bEnable){
  lsm_db *db = tdb_lsm(pDb);
  if( db ){
    lsm_config_log(db, (bEnable ? xLog : 0), (void *)"client");
  }
}

void tdb_lsm_application_crash(TestDb *pDb){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->bCrashed = 1;
................................................................................
}

int tdb_lsm_open(const char *zCfg, const char *zDb, int bClear, TestDb **ppDb){
  /* Thin public wrapper: all of the work is done by testLsmOpen(). */
  int rc = testLsmOpen(zCfg, zDb, bClear, ppDb);
  return rc;
}

#ifdef LSM_MUTEX_PTHREADS

/*
** Signal worker thread iWorker that there may be work to do.
*/
static void mt_signal_worker(LsmDb *pDb, int iWorker){
  LsmWorker *pW = pDb->aWorker + iWorker;
  pthread_mutex_t *pMutex = &pW->worker_mutex;

  /* bDoWork is set while holding worker_mutex. worker_main() tests it under
  ** the same mutex before calling pthread_cond_wait(), so the wake-up
  ** cannot be lost. */
  pthread_mutex_lock(pMutex);
  pW->bDoWork = 1;
  pthread_cond_signal(&pW->worker_cond);
  pthread_mutex_unlock(pMutex);
}

static void *worker_main(void *pArg){
  LsmWorker *p = (LsmWorker *)pArg;
  lsm_db *pWorker;                /* Connection to access db through */

  pthread_mutex_lock(&p->worker_mutex);
  while( (pWorker = p->pWorker) ){
    int nWrite = 0;
    int rc;

    /* Do some work. If an error occurs, exit. */
    pthread_mutex_unlock(&p->worker_mutex);
    rc = lsm_work(pWorker, p->lsm_work_flags, p->lsm_work_npage, &nWrite);
/*    printf("# worked %d units\n", nWrite); */
    pthread_mutex_lock(&p->worker_mutex);
    if( rc!=LSM_OK && rc!=LSM_BUSY ){
      p->worker_rc = rc;
      break;
    }

    if( nWrite && (p->lsm_work_flags & LSM_WORK_CHECKPOINT)==0 ){
      mt_signal_worker(p->pDb, 1);
    }

    /* If the call to lsm_work() indicates that there is nothing more
    ** to do at this point, wait on the condition variable. The thread will
    ** wake up when it is signaled either because the client thread has
    ** flushed an in-memory tree into the db file or when the connection
    ** is being closed.  */
    if( nWrite==0 ){
................................................................................
      if( p->pWorker && p->bDoWork==0 ){
        pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
      }
      p->bDoWork = 0;
    }
  }
  pthread_mutex_unlock(&p->worker_mutex);
  /* printf("# worker EXIT\n"); */
  
  return 0;
}














static void mt_stop_worker(LsmDb *pDb, int iWorker){
  LsmWorker *p = &pDb->aWorker[iWorker];
  if( p->pWorker ){
    void *pDummy;
    lsm_db *pWorker;

................................................................................
static void mt_client_work_hook(lsm_db *db, void *pArg){
  LsmDb *pLsmDb = (LsmDb *)pArg;  /* Test wrapper that owns the workers */
  int iWorker = 0;

  /* The application-level work-hook, when one is registered, runs first. */
  if( pLsmDb->xWork!=0 ){
    pLsmDb->xWork(db, pLsmDb->pWorkCtx);
  }

  /* Then every background worker thread is woken. */
  while( iWorker<pLsmDb->nWorker ){
    mt_signal_worker(pLsmDb, iWorker);
    iWorker++;
  }
}

static void mt_worker_work_hook(lsm_db *db, void *pArg){
................................................................................
  LsmWorker *p;                   /* Object to initialize */

  assert( iWorker<pDb->nWorker );

  p = &pDb->aWorker[iWorker];
  p->lsm_work_flags = flags;
  p->lsm_work_npage = nPage;
  p->pDb = pDb;

  /* Open the worker connection */
  if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker);
  if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);
lsm_config_log(p->pWorker, xLog, (void *)"worker");

  /* Configure the work-hook */
  if( rc==0 ){
    lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
  }

  /* Kick off the worker thread. */
................................................................................
/*
** Install mt_client_work_hook() as the work-hook of the client connection.
** Then allocate nWorker (1 or 2) zeroed LsmWorker slots and start the
** worker threads on database file zFilename.
**
** With one worker, that worker both flushes and checkpoints, with an
** lsm_work() page budget of 2048. With two workers, worker 0 flushes (page
** budget 1024) and worker 1 only checkpoints (page budget 0).
**
** zCfg is currently unused. The return value is that of the last
** mt_start_worker() call made (LSM_OK on success).
*/
static int testLsmStartWorkers(
  LsmDb *pDb, int nWorker, const char *zFilename, const char *zCfg
){
  int rc;
  (void)zCfg;
  assert( nWorker==1 || nWorker==2 );

  /* Configure a work-hook for the client connection. */
  lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);

  pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker);
  memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker);
  pDb->nWorker = nWorker;

  if( nWorker==1 ){
    /* The lone worker must include LSM_WORK_CHECKPOINT. worker_main()
    ** signals aWorker[1] after productive non-checkpoint work, and no
    ** aWorker[1] exists in this configuration. */
    int flags = LSM_WORK_CHECKPOINT|LSM_WORK_FLUSH;
    rc = mt_start_worker(pDb, 0, zFilename, flags, 2048);
  }else{
    /* The checkpointer must be worker 1. That is the worker worker_main()
    ** signals whenever the non-checkpointing worker 0 writes something. */
    rc = mt_start_worker(pDb, 0, zFilename, LSM_WORK_FLUSH, 1024);
    if( rc==LSM_OK ){
      rc = mt_start_worker(pDb, 1, zFilename, LSM_WORK_CHECKPOINT, 0);
    }
  }

  return rc;
}


static int test_lsm_mt(

Changes to src/lsm.h.

305
306
307
308
309
310
311










312
313
314
315
316
317
318
319

320
321
322
323
324
325
326
**     The third argument should be of type (char **). The location pointed
**     to is populated with a pointer to a nul-terminated string containing
**     the string representation of a Tcl data-structure. The returned 
**     string should be eventually freed by the caller using lsm_free().
**
**     The Tcl structure returned is a list of six integers that describe
**     the current structure of the log file.










*/
/* Values accepted as the eParam argument of lsm_info(). */
#define LSM_INFO_NWRITE           1
#define LSM_INFO_NREAD            2
#define LSM_INFO_DB_STRUCTURE     3
#define LSM_INFO_LOG_STRUCTURE    4   /* Handled by lsmInfoLogStructure() */
#define LSM_INFO_ARRAY_STRUCTURE  5
#define LSM_INFO_PAGE_ASCII_DUMP  6
#define LSM_INFO_PAGE_HEX_DUMP    7



/* 
** Open and close transactions and nested transactions.
**
** lsm_begin():
**   Used to open transactions and sub-transactions. A successful call to 







>
>
>
>
>
>
>
>
>
>








>







305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
**     The third argument should be of type (char **). The location pointed
**     to is populated with a pointer to a nul-terminated string containing
**     the string representation of a Tcl data-structure. The returned 
**     string should be eventually freed by the caller using lsm_free().
**
**     The Tcl structure returned is a list of six integers that describe
**     the current structure of the log file.
**
**   LSM_INFO_FREELIST
**     The third argument should be of type (char **). The location pointed
**     to is populated with a pointer to a nul-terminated string containing
**     the string representation of a Tcl data-structure. The returned 
**     string should be eventually freed by the caller using lsm_free().
**
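**     The Tcl structure returned is a list. Its first element has the form
**     "N+M", where N is the number of entries in the worker snapshot's
**     free-list and M is the snapshot's nFreelistOvfl value. It is followed
**     by one element for each free-list entry. Each such element consists
**     of two integers: the block number and the id of the snapshot that
**     freed it.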
*/
/* Values accepted as the eParam argument of lsm_info(). */
#define LSM_INFO_NWRITE           1
#define LSM_INFO_NREAD            2
#define LSM_INFO_DB_STRUCTURE     3
#define LSM_INFO_LOG_STRUCTURE    4   /* Handled by lsmInfoLogStructure() */
#define LSM_INFO_ARRAY_STRUCTURE  5
#define LSM_INFO_PAGE_ASCII_DUMP  6
#define LSM_INFO_PAGE_HEX_DUMP    7
#define LSM_INFO_FREELIST         8   /* Handled by lsmInfoFreelist() */


/* 
** Open and close transactions and nested transactions.
**
** lsm_begin():
**   Used to open transactions and sub-transactions. A successful call to 

Changes to src/lsmInt.h.

741
742
743
744
745
746
747

748
749
750
751
752
753
754
int lsmLogCommit(lsm_db *);
void lsmLogEnd(lsm_db *pDb, int bCommit);
void lsmLogTell(lsm_db *, LogMark *);
void lsmLogSeek(lsm_db *, LogMark *);

int lsmLogRecover(lsm_db *);
/* Implements lsm_info(LSM_INFO_LOG_STRUCTURE). */
int lsmInfoLogStructure(lsm_db *pDb, char **pzVal);



/**************************************************************************
** Functions from file "lsm_shared.c".
*/

int lsmDbDatabaseConnect(lsm_db*, const char *);







>







741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
int lsmLogCommit(lsm_db *);
void lsmLogEnd(lsm_db *pDb, int bCommit);
void lsmLogTell(lsm_db *, LogMark *);
void lsmLogSeek(lsm_db *, LogMark *);

int lsmLogRecover(lsm_db *);
/* Implements lsm_info(LSM_INFO_LOG_STRUCTURE). */
int lsmInfoLogStructure(lsm_db *pDb, char **pzVal);
/* Implements lsm_info(LSM_INFO_FREELIST); defined in lsm_main.c. */
int lsmInfoFreelist(lsm_db *, char **pzVal);


/**************************************************************************
** Functions from file "lsm_shared.c".
*/

int lsmDbDatabaseConnect(lsm_db*, const char *);

Changes to src/lsm_main.c.

393
394
395
396
397
398
399




















400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
...
429
430
431
432
433
434
435







436


437
438
439














440
441
442
443
444
445
446
...
481
482
483
484
485
486
487






488
489
490
491
492
493
494
...
753
754
755
756
757
758
759

760
761
762
763



764
765
766
767
768
769
770
}

/*
** Append zPre to pStr, followed by "{iFirst iLast iRoot nSize}" for
** segment pSeg.
*/
void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){
  lsmStringAppendf(
      pStr, "%s{%d %d %d %d}",
      zPre, pSeg->iFirst, pSeg->iLast, pSeg->iRoot, pSeg->nSize
  );
}





















int lsmStructList(
  lsm_db *pDb,                    /* Database handle */
  char **pzOut                    /* OUT: Nul-terminated string (tcl list) */
){
  Level *pTopLevel = 0;           /* Top level of snapshot to report on */
  int rc = LSM_OK;
  Level *p;
  LsmString s;
  Snapshot *pWorker;              /* Worker snapshot */
  int bUnlock = 0;

  /* Obtain the worker snapshot */
  pWorker = pDb->pWorker;
  if( !pWorker ){
    rc = lsmBeginWork(pDb);
    if( rc!=LSM_OK ) return rc;
    pWorker = pDb->pWorker;
    bUnlock = 1;
  }

  /* Format the contents of the snapshot as text */
  pTopLevel = lsmDbSnapshotLevel(pWorker);
  lsmStringInit(&s, pDb->pEnv);
  for(p=pTopLevel; rc==LSM_OK && p; p=p->pNext){
    int i;
    lsmStringAppendf(&s, "%s{", (s.n ? " " : ""));
................................................................................
      lsmAppendSegmentList(&s, " ", &p->aRhs[i]);
    }
    lsmStringAppend(&s, "}", 1);
  }
  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;

  /* Release the snapshot and return */







  if( bUnlock ){


    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcdummy);
  }














  *pzOut = s.z;
  return rc;
}

int lsm_info(lsm_db *pDb, int eParam, ...){
  int rc = LSM_OK;
  va_list ap;
................................................................................
    }

    case LSM_INFO_LOG_STRUCTURE: {
      char **pzVal = va_arg(ap, char **);
      rc = lsmInfoLogStructure(pDb, pzVal);
      break;
    }







    default:
      rc = LSM_MISUSE;
      break;
  }

  va_end(ap);
................................................................................
  ** db and checkpoint the latest snapshot.
  **
  ** Ignore any LSM_BUSY errors that occur during these operations. If
  ** LSM_BUSY does occur, it means some other connection is already working
  ** on flushing the in-memory tree or checkpointing the database. 
  */
  assert( rc!=LSM_BUSY);

  if( rc==LSM_OK && (nFlush>1 || (nFlush && pDb->bAutowork)) ){
    rc = lsmFlushToDisk(pDb);
    if( rc==LSM_OK && pDb->bAutowork ){
      rc = lsmCheckpointWrite(pDb);



    }
  }
  if( rc==LSM_BUSY ) rc = LSM_OK;

  return rc;
}








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>













|
<
<
|
<
<
<







 







>
>
>
>
>
>
>
|
>
>
|
<
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>
>
>
>
>







 







>
|
|
|
|
>
>
>







393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433


434



435
436
437
438
439
440
441
...
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461

462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
...
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
...
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
}

/*
** Append zPre to pStr, followed by "{iFirst iLast iRoot nSize}" for
** segment pSeg.
*/
void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){
  lsmStringAppendf(
      pStr, "%s{%d %d %d %d}",
      zPre, pSeg->iFirst, pSeg->iLast, pSeg->iRoot, pSeg->nSize
  );
}

/*
** Point *pp at the worker snapshot of connection pDb.
**
** If the connection does not already hold a worker snapshot, one is
** obtained with lsmBeginWork() and *pbUnlock is set to 1. The caller must
** then pass that flag to infoFreeWorker() to release it. *pbUnlock must be
** 0 on entry.
**
** Returns LSM_OK on success. On failure the error code from lsmBeginWork()
** is returned and *pp is not written.
*/
static int infoGetWorker(lsm_db *pDb, Snapshot **pp, int *pbUnlock){
  assert( *pbUnlock==0 );

  if( pDb->pWorker==0 ){
    int rc = lsmBeginWork(pDb);
    if( rc!=LSM_OK ) return rc;
    *pbUnlock = 1;
  }

  *pp = pDb->pWorker;
  return LSM_OK;
}

/*
** Counterpart of infoGetWorker(). If bUnlock is non-zero, the worker
** snapshot was acquired by infoGetWorker() and is released here with
** lsmFinishWork(). Any error reported by lsmFinishWork() is discarded.
*/
static void infoFreeWorker(lsm_db *pDb, int bUnlock){
  int rcIgnored;

  if( !bUnlock ) return;

  /* NOTE(review): the rc is seeded with LSM_BUSY, presumably so that
  ** lsmFinishWork() releases the snapshot without committing it. Confirm
  ** against lsmFinishWork(). */
  rcIgnored = LSM_BUSY;
  lsmFinishWork(pDb, 0, 0, &rcIgnored);
}

int lsmStructList(
  lsm_db *pDb,                    /* Database handle */
  char **pzOut                    /* OUT: Nul-terminated string (tcl list) */
){
  Level *pTopLevel = 0;           /* Top level of snapshot to report on */
  int rc = LSM_OK;
  Level *p;
  LsmString s;
  Snapshot *pWorker;              /* Worker snapshot */
  int bUnlock = 0;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);


  if( rc!=LSM_OK ) return rc;




  /* Format the contents of the snapshot as text */
  pTopLevel = lsmDbSnapshotLevel(pWorker);
  lsmStringInit(&s, pDb->pEnv);
  for(p=pTopLevel; rc==LSM_OK && p; p=p->pNext){
    int i;
    lsmStringAppendf(&s, "%s{", (s.n ? " " : ""));
................................................................................
      lsmAppendSegmentList(&s, " ", &p->aRhs[i]);
    }
    lsmStringAppend(&s, "}", 1);
  }
  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;

  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);
  *pzOut = s.z;
  return rc;
}

/*
** Implementation of lsm_info(LSM_INFO_FREELIST).
**
** Sets *pzOut to a nul-terminated string allocated from pDb->pEnv. The
** string's first element has the form "N+M", where N is the number of
** entries in the worker snapshot's free-list and M is its nFreelistOvfl
** value. It is followed by one "{iBlk iId}" element per free-list entry.
**
** Returns LSM_OK, LSM_NOMEM if the string could not be built, or the error
** from obtaining the worker snapshot. On that last failure *pzOut is set
** to 0, so the caller never sees an uninitialized pointer.
*/
int lsmInfoFreelist(lsm_db *pDb, char **pzOut){
  Snapshot *pWorker;              /* Worker snapshot */
  int bUnlock = 0;                /* True if infoGetWorker() locked it */
  LsmString s;
  int i;
  int rc;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  if( rc!=LSM_OK ){
    *pzOut = 0;
    return rc;
  }

  lsmStringInit(&s, pDb->pEnv);
  lsmStringAppendf(&s, "%d+%d",pWorker->freelist.nEntry,pWorker->nFreelistOvfl);
  for(i=0; i<pWorker->freelist.nEntry; i++){
    FreelistEntry *p = &pWorker->freelist.aEntry[i];
    /* iId is 64-bit; it is narrowed to int for the %d conversion. */
    lsmStringAppendf(&s, " {%d %d}", p->iBlk, (int)p->iId);
  }
  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;

  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);
  *pzOut = s.z;
  return rc;
}

int lsm_info(lsm_db *pDb, int eParam, ...){
  int rc = LSM_OK;
  va_list ap;
................................................................................
    }

    case LSM_INFO_LOG_STRUCTURE: {
      char **pzVal = va_arg(ap, char **);
      rc = lsmInfoLogStructure(pDb, pzVal);
      break;
    }

    case LSM_INFO_FREELIST: {
      char **pzVal = va_arg(ap, char **);
      rc = lsmInfoFreelist(pDb, pzVal);
      break;
    }

    default:
      rc = LSM_MISUSE;
      break;
  }

  va_end(ap);
................................................................................
  ** db and checkpoint the latest snapshot.
  **
  ** Ignore any LSM_BUSY errors that occur during these operations. If
  ** LSM_BUSY does occur, it means some other connection is already working
  ** on flushing the in-memory tree or checkpointing the database. 
  */
  assert( rc!=LSM_BUSY);
  if( rc==LSM_OK ){
    if( nFlush>1 || (nFlush && pDb->bAutowork) ){
      rc = lsmFlushToDisk(pDb);
      if( rc==LSM_OK && pDb->bAutowork ){
        rc = lsmCheckpointWrite(pDb);
      }
    }else if( nFlush && pDb->xWork ){
      pDb->xWork(pDb, pDb->pWorkCtx);
    }
  }
  if( rc==LSM_BUSY ) rc = LSM_OK;

  return rc;
}

Changes to src/lsm_shared.c.

452
453
454
455
456
457
458
459
460
461
462
463
464
465
466







467
468
469
470
471
472
473
...
555
556
557
558
559
560
561



562
563
564
565
566
567
568
569





570
571
572
573
574
575
576
...
655
656
657
658
659
660
661
662


663
664
665
666
667
668
669
...
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
    /* The "is in use" bit */
    rc = lsmLsmInUse(pDb, iFree, &bInUse);

    /* The "has been checkpointed" bit */
    if( rc==LSM_OK && bInUse==0 ){
      i64 iId = 0;
      rc = lsmCheckpointSynced(pDb, &iId, 0);
      if( rc!=LSM_OK || iId<iFree ) bInUse = 1;
    }

    if( rc==LSM_OK && bInUse==0 ){
      iRet = pFree->aEntry[0].iBlk;
      flRemoveEntry0(pFree);
      assert( iRet!=0 );
    }







  }

  /* If no block was allocated from the free-list, allocate one at the
  ** end of the file. */
  if( rc==LSM_OK && iRet==0 ){
    iRet = ++pDb->pWorker->nBlock;
  }
................................................................................
        lsmFsMetaPageRelease(pPg);
      }
      bDone = (iDisk>=iCkpt);
    }

    if( rc==LSM_OK && bDone==0 ){
      int iMeta = (pShm->iMetaPage % 2) + 1;



      if( pDb->eSafety!=LSM_SAFETY_OFF ){
        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ) pShm->iMetaPage = iMeta;





    }
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  return rc;
}

................................................................................
    ** that the shared-memory still contains the same values. If so, proceed.
    ** Otherwise, relinquish the read-lock and retry the whole procedure
    ** (starting with loading the in-memory tree header).  */
    if( rc==LSM_OK ){
      ShmHeader *pShm = pDb->pShmhdr;
      u32 iShmMax = pDb->treehdr.iUsedShmid;
      u32 iShmMin = pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk;
      rc = lsmReadlock(pDb, iSnap, iShmMin, iShmMax);


      if( rc==LSM_OK ){
        if( lsmTreeLoadHeaderOk(pDb, iTreehdr)
         && lsmCheckpointLoadOk(pDb, iSnap)
        ){
          /* Read lock has been successfully obtained. Deserialize the 
          ** checkpoint just loaded. TODO: This will be removed after 
          ** lsm_sorted.c is changed to work directly from the serialized
................................................................................
  ShmHeader *pShm = db->pShmhdr;
  int i;
  int rc = LSM_OK;

  for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId ){
      if( (iLsmId!=0 && iLsmId>=p->iLsmId) 
       || (iLsmId==0 && shm_sequence_ge(p->iTreeId, iShmid))
      ){
        rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
        if( rc==LSM_OK ){
          p->iLsmId = 0;
          lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
        }







|







>
>
>
>
>
>
>







 







>
>
>








>
>
>
>
>







 







|
>
>







 







|







452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
...
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
...
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
...
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
    /* The "is in use" bit */
    rc = lsmLsmInUse(pDb, iFree, &bInUse);

    /* The "has been checkpointed" bit */
    if( rc==LSM_OK && bInUse==0 ){
      i64 iId = 0;
      rc = lsmCheckpointSynced(pDb, &iId, 0);
      if( rc!=LSM_OK || iId<iFree ) bInUse = 2;
    }

    if( rc==LSM_OK && bInUse==0 ){
      iRet = pFree->aEntry[0].iBlk;
      flRemoveEntry0(pFree);
      assert( iRet!=0 );
    }
#if 0
    lsmLogMessage(
        pDb, 0, "%d reusing block %d%s", (iRet==0 ? "not " : "")
        pFree->aEntry[0].iBlk, 
        bInUse==0 ? "" : bInUse==1 ? " (client)" : " (unsynced)"
    );
#endif
  }

  /* If no block was allocated from the free-list, allocate one at the
  ** end of the file. */
  if( rc==LSM_OK && iRet==0 ){
    iRet = ++pDb->pWorker->nBlock;
  }
................................................................................
        lsmFsMetaPageRelease(pPg);
      }
      bDone = (iDisk>=iCkpt);
    }

    if( rc==LSM_OK && bDone==0 ){
      int iMeta = (pShm->iMetaPage % 2) + 1;
#if 0
  lsmLogMessage(pDb, 0, "starting checkpoint");
#endif
      if( pDb->eSafety!=LSM_SAFETY_OFF ){
        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ) pShm->iMetaPage = iMeta;
#if 0
  lsmLogMessage(pDb, 0, "finish checkpoint %d", 
      (int)lsmCheckpointId(pDb->aSnapshot, 0)
  );
#endif
    }
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  return rc;
}

................................................................................
    ** that the shared-memory still contains the same values. If so, proceed.
    ** Otherwise, relinquish the read-lock and retry the whole procedure
    ** (starting with loading the in-memory tree header).  */
    if( rc==LSM_OK ){
      ShmHeader *pShm = pDb->pShmhdr;
      u32 iShmMax = pDb->treehdr.iUsedShmid;
      u32 iShmMin = pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk;
      rc = lsmReadlock(
          pDb, lsmCheckpointId(pDb->aSnapshot, 0), iShmMin, iShmMax
      );
      if( rc==LSM_OK ){
        if( lsmTreeLoadHeaderOk(pDb, iTreehdr)
         && lsmCheckpointLoadOk(pDb, iSnap)
        ){
          /* Read lock has been successfully obtained. Deserialize the 
          ** checkpoint just loaded. TODO: This will be removed after 
          ** lsm_sorted.c is changed to work directly from the serialized
................................................................................
  ShmHeader *pShm = db->pShmhdr;
  int i;
  int rc = LSM_OK;

  for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId ){
      if( (iLsmId!=0 && p->iLsmId!=0 && iLsmId>=p->iLsmId) 
       || (iLsmId==0 && shm_sequence_ge(p->iTreeId, iShmid))
      ){
        rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
        if( rc==LSM_OK ){
          p->iLsmId = 0;
          lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
        }

Changes to src/lsm_sorted.c.

3637
3638
3639
3640
3641
3642
3643



3644
3645
3646
3647
3648
3649
3650
....
4047
4048
4049
4050
4051
4052
4053

4054
4055
4056
4057
4058
4059
4060
....
4061
4062
4063
4064
4065
4066
4067
4068



4069
4070
4071
4072
4073
4074
4075
....
4100
4101
4102
4103
4104
4105
4106

4107


4108
4109
4110
4111
4112
4113
4114
4115
4116
4117









4118
4119
4120
4121
4122
4123
4124

  rc = sortedNewToplevel(pDb, 1, pnOvfl);
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );

#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "tree flush");
#endif



  return rc;
}

/*
** The nMerge levels in the LSM beginning with pLevel consist of a
** left-hand-side segment only. Replace these levels with a single new
** level consisting of a new empty segment on the left-hand-side and the
................................................................................
int lsmSortedAutoWork(
  lsm_db *pDb,                    /* Database handle */
  int nUnit                       /* Pages of data written to in-memory tree */
){
  int rc;                         /* Return code */
  int nRemaining;                 /* Units of work to do before returning */
  int nDepth;                     /* Current height of tree (longest path) */

  Level *pLevel;                  /* Used to iterate through levels */

  assert( lsmFsIntegrityCheck(pDb) );
  assert( pDb->pWorker );

  /* Determine how many units of work to do before returning. One unit of
  ** work is achieved by writing one page (~4KB) of merged data.  */
................................................................................
  nRemaining = nDepth = 0;
  for(pLevel=lsmDbSnapshotLevel(pDb->pWorker); pLevel; pLevel=pLevel->pNext){
    /* nDepth += LSM_MAX(1, pLevel->nRight); */
    nDepth += 1;
  }
  nRemaining = nUnit * nDepth;

  rc = sortedWork(pDb, nRemaining, 0, 0);



  return rc;
}

/*
** Perform work to merge database segments together.
*/
int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite){
................................................................................
    bFinishWork = 1;
  }

  /* If the FLUSH flag is set, try to flush the contents of the in-memory
  ** tree to disk.  */
  if( rc==LSM_OK && ((flags & LSM_WORK_FLUSH)) ){
    rc = lsmTreeLoadHeader(pDb, 0);

    if( rc==LSM_OK && pDb->treehdr.iOldShmid ){


      rc = lsmSortedFlushTree(pDb, &nOvfl);
      bFlush = 1;
    }
  }

  /* If nPage is greater than zero, do some merging. */
  if( rc==LSM_OK && nPage>0 ){
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    rc = sortedWork(pDb, nPage, bOptimize, &nWrite);
    if( rc==LSM_OK && nWrite ){









      rc = lsmSortedFlushDb(pDb);
      if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){
        nOvfl = -1;
        rc = sortedNewToplevel(pDb, 0, &nOvfl);
      }
    }
  }







>
>
>







 







>







 







|
>
>
>







 







>
|
>
>










>
>
>
>
>
>
>
>
>







3637
3638
3639
3640
3641
3642
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
3653
....
4050
4051
4052
4053
4054
4055
4056
4057
4058
4059
4060
4061
4062
4063
4064
....
4065
4066
4067
4068
4069
4070
4071
4072
4073
4074
4075
4076
4077
4078
4079
4080
4081
4082
....
4107
4108
4109
4110
4111
4112
4113
4114
4115
4116
4117
4118
4119
4120
4121
4122
4123
4124
4125
4126
4127
4128
4129
4130
4131
4132
4133
4134
4135
4136
4137
4138
4139
4140
4141
4142
4143

  rc = sortedNewToplevel(pDb, 1, pnOvfl);
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );

#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "tree flush");
#endif
#if 0
  lsmLogMessage(pDb, rc, "flushed tree to disk");
#endif
  return rc;
}

/*
** The nMerge levels in the LSM beginning with pLevel consist of a
** left-hand-side segment only. Replace these levels with a single new
** level consisting of a new empty segment on the left-hand-side and the
................................................................................
int lsmSortedAutoWork(
  lsm_db *pDb,                    /* Database handle */
  int nUnit                       /* Pages of data written to in-memory tree */
){
  int rc;                         /* Return code */
  int nRemaining;                 /* Units of work to do before returning */
  int nDepth;                     /* Current height of tree (longest path) */
  int nWrite;                     /* Pages written */
  Level *pLevel;                  /* Used to iterate through levels */

  assert( lsmFsIntegrityCheck(pDb) );
  assert( pDb->pWorker );

  /* Determine how many units of work to do before returning. One unit of
  ** work is achieved by writing one page (~4KB) of merged data.  */
................................................................................
  nRemaining = nDepth = 0;
  for(pLevel=lsmDbSnapshotLevel(pDb->pWorker); pLevel; pLevel=pLevel->pNext){
    /* nDepth += LSM_MAX(1, pLevel->nRight); */
    nDepth += 1;
  }
  nRemaining = nUnit * nDepth;

  rc = sortedWork(pDb, nRemaining, 0, &nWrite);
#if 0
  lsmLogMessage(pDb, 0, "auto-work: %d pages", nWrite);
#endif
  return rc;
}

/*
** Perform work to merge database segments together.
*/
int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite){
................................................................................
    bFinishWork = 1;
  }

  /* If the FLUSH flag is set, try to flush the contents of the in-memory
  ** tree to disk.  */
  if( rc==LSM_OK && ((flags & LSM_WORK_FLUSH)) ){
    rc = lsmTreeLoadHeader(pDb, 0);
    if( rc==LSM_OK 
     && pDb->treehdr.iOldShmid 
     && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff 
    ){
      rc = lsmSortedFlushTree(pDb, &nOvfl);
      bFlush = 1;
    }
  }

  /* If nPage is greater than zero, do some merging. */
  if( rc==LSM_OK && nPage>0 ){
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    rc = sortedWork(pDb, nPage, bOptimize, &nWrite);
    if( rc==LSM_OK && nWrite ){
#if 0
  {
    char *z = 0;
    lsmInfoFreelist(pDb, &z);
    lsmLogMessage(pDb, 0, "work: %d pages", nWrite);
    lsmLogMessage(pDb, 0, "freelist: %s", z);
    lsm_free(lsm_get_env(pDb), z);
  }
#endif
      rc = lsmSortedFlushDb(pDb);
      if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){
        nOvfl = -1;
        rc = sortedNewToplevel(pDb, 0, &nOvfl);
      }
    }
  }

Changes to tool/lsmperf.tcl.

3
4
5
6
7
8
9


10
11
12
13
14
15
16
..
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
...
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
exec tclsh "$0" "$@"


proc exec_lsmtest_speed {nSec spec} {
  set fd [open [list |lsmtest speed2 {*}$spec]]
  set res [list]



  set initial [clock seconds]

  while {![eof $fd] && ($nSec==0 || ([clock second]-$initial)<$nSec)} { 
    set line [gets $fd]
    puts $line
    if {[string range $line 0 0]=="#"} continue
    if {[llength $line]==0} continue
................................................................................
    incr nMs $msInsert
    append ret "$nIns [expr $nIns*1000.0/$nMs]\n"
  }
  append ret "end\n"
  set ret
}

# Convert per-batch timings in $res into a gnuplot inline data block.
#
# Each element of $res is a list: item 0 is the batch index, and item
# $iRes+1 is a timing for that batch (presumably milliseconds, since the
# rate is computed as ops*1000/timing -- TODO confirm against lsmtest output).
#
# For every row one line "<x> <ops/sec>" is emitted, where
#   x       = batch * nWrite + nShift   (horizontal position of the point)
#   ops/sec = int(nOp * 1000.0 / timing)
# The block is terminated with "end", as gnuplot expects for "-" data.
#
# nOp is optional and defaults to nWrite (the historical behaviour). Callers
# plotting a different operation (e.g. fetches) should pass the number of
# operations actually timed, otherwise the reported rate is wrong.
proc make_dataset {res iRes nWrite nShift {nOp ""}} {
  if {$nOp eq ""} { set nOp $nWrite }
  set ret ""
  foreach row $res {
    set i [lindex $row 0]
    set j [lindex $row [expr {$iRes + 1}]]
    # Braced expressions avoid double substitution and are byte-compiled.
    set x [expr {$i * $nWrite + $nShift}]
    append ret "$x [expr {int($nOp * 1000.0 / $j)}]\n"
  }
  append ret "end\n"
  return $ret
}

proc do_write_test {zPng nSec nWrite nFetch nRepeat lSys} {

................................................................................
  set data3 ""

  foreach {name sys} $lSys res $lRes col $cols {
    foreach {c1 c2} $col {}

    if {$plot1 != ""} { set plot1 ", $plot1" }
    set plot1 "\"-\" ti \"$name writes/sec\" with boxes fs solid lc rgb \"$c1\"$plot1"
    set data1 "[make_dataset $res 0 $nWrite $nShift] $data1"

    set plot3 ",\"-\" ti \"$name cumulative writes/sec\" with lines lc rgb \"$c2\" lw 2 $plot3"
    set data3 "[make_totalset $res $nWrite] $data3"

    if {$nFetch>0} {
      set new ", \"-\" ti \"$name fetches/sec\" axis x1y2 with points lw 3 lc rgb \"$c2\""
      set plot2 "$new $plot2"
      set data2 "[make_dataset $res 1 $nWrite $nWrite] $data2"
    }

    incr nShift [expr $nWrite/4]
  }
  append script "plot $plot1 $plot2 $plot3\n"
  append script $data1
  append script $data2
  append script $data3

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

do_write_test x.png 180 10000 10000 1000 {
  LSM   "mmap=1 multi_proc=0 safety=1 threads=2 autowork=0" 
}











>
>







 







|





|







 







|







|













|
|





3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
..
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
...
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
exec tclsh "$0" "$@"


proc exec_lsmtest_speed {nSec spec} {
  set fd [open [list |lsmtest speed2 {*}$spec]]
  set res [list]

  puts "lsmtest speed2 $spec"

  set initial [clock seconds]

  while {![eof $fd] && ($nSec==0 || ([clock second]-$initial)<$nSec)} { 
    set line [gets $fd]
    puts $line
    if {[string range $line 0 0]=="#"} continue
    if {[llength $line]==0} continue
................................................................................
    incr nMs $msInsert
    append ret "$nIns [expr $nIns*1000.0/$nMs]\n"
  }
  append ret "end\n"
  set ret
}

# Convert per-batch timings in $res into a gnuplot inline data block.
#
# Each element of $res is a list: item 0 is the batch index, and item
# $iRes+1 is a timing for that batch (presumably milliseconds, since the
# rate is computed as nOp*1000/timing -- TODO confirm against lsmtest output).
#
# For every row one line "<x> <ops/sec>" is emitted, where
#   x       = batch * nWrite + nShift   (horizontal position of the point)
#   ops/sec = int(nOp * 1000.0 / timing)
# nOp is the number of operations covered by the timing (writes or fetches).
# The block is terminated with "end", as gnuplot expects for "-" data.
proc make_dataset {res iRes nWrite nShift nOp} {
  set ret ""
  foreach row $res {
    set i [lindex $row 0]
    set j [lindex $row [expr {$iRes + 1}]]
    # Braced expressions avoid double substitution and are byte-compiled.
    set x [expr {$i * $nWrite + $nShift}]
    append ret "$x [expr {int($nOp * 1000.0 / $j)}]\n"
  }
  append ret "end\n"
  return $ret
}

proc do_write_test {zPng nSec nWrite nFetch nRepeat lSys} {

................................................................................
  set data3 ""

  foreach {name sys} $lSys res $lRes col $cols {
    foreach {c1 c2} $col {}

    if {$plot1 != ""} { set plot1 ", $plot1" }
    set plot1 "\"-\" ti \"$name writes/sec\" with boxes fs solid lc rgb \"$c1\"$plot1"
    set data1 "[make_dataset $res 0 $nWrite $nShift $nWrite] $data1"

    set plot3 ",\"-\" ti \"$name cumulative writes/sec\" with lines lc rgb \"$c2\" lw 2 $plot3"
    set data3 "[make_totalset $res $nWrite] $data3"

    if {$nFetch>0} {
      set new ", \"-\" ti \"$name fetches/sec\" axis x1y2 with points lw 3 lc rgb \"$c2\""
      set plot2 "$new $plot2"
      set data2 "[make_dataset $res 1 $nWrite $nWrite $nFetch] $data2"
    }

    incr nShift [expr $nWrite/4]
  }
  append script "plot $plot1 $plot2 $plot3\n"
  append script $data1
  append script $data2
  append script $data3

  append script "pause -1\n"
  exec_gnuplot_script $script $zPng
}

do_write_test x.png 60 10000 20000 1000 {
  LSM   "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0 worker_nmerge=2" 
}