Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix lsm_checkpoint() and some lsm_info() calls to match documented behaviour.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 89e314d98b0f7e06ce3b5e779bed67cd32d91761
User & Date: dan 2013-02-05 18:32:16.505
Context
2013-02-06
11:58
Fix bug to do with block redirection. check-in: 7cc153f523 user: dan tags: trunk
2013-02-05
18:32
Fix lsm_checkpoint() and some lsm_info() calls to match documented behaviour. check-in: 89e314d98b user: dan tags: trunk
18:06
Fixes to lsm_work() to bring it into line with documentation: (a) third argument is a number of KB, not a number of db pages, and (b) passing -1 as the third argument means "do as much work as possible". check-in: 2f8966cd9c user: dan tags: trunk
Changes
Unified Diff Ignore Whitespace Patch
Changes to lsm-test/lsmtest_tdb3.c.
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

#include <sys/time.h>

typedef struct LsmDb LsmDb;
typedef struct LsmWorker LsmWorker;
typedef struct LsmFile LsmFile;

/* Default upper and lower checkpoint thresholds.
** NOTE(review): presumably the defaults for LsmDb.nMtMaxCkpt and
** LsmDb.nMtMinCkpt, expressed in bytes (8MB and 2MB) - confirm against
** the code that reads these macros. */
#define LSMTEST_DFLT_MT_MAX_CKPT (8*1024*1024)
#define LSMTEST_DFLT_MT_MIN_CKPT (2*1024*1024)

#ifdef LSM_MUTEX_PTHREADS
#include <pthread.h>

#define LSMTEST_THREAD_CKPT      1
#define LSMTEST_THREAD_WORKER    2
#define LSMTEST_THREAD_WORKER_AC 3







|
|







11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

#include <sys/time.h>

typedef struct LsmDb LsmDb;
typedef struct LsmWorker LsmWorker;
typedef struct LsmFile LsmFile;

/* Default upper and lower checkpoint thresholds.
** NOTE(review): presumably the defaults for LsmDb.nMtMaxCkpt and
** LsmDb.nMtMinCkpt, expressed in KB (8MB and 2MB) to match the units
** reported by LSM_INFO_CHECKPOINT_SIZE - confirm against the code that
** reads these macros. */
#define LSMTEST_DFLT_MT_MAX_CKPT (8*1024)
#define LSMTEST_DFLT_MT_MIN_CKPT (2*1024)

#ifdef LSM_MUTEX_PTHREADS
#include <pthread.h>

#define LSMTEST_THREAD_CKPT      1
#define LSMTEST_THREAD_WORKER    2
#define LSMTEST_THREAD_WORKER_AC 3
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
  testFree((char *)pDb->pBuf);
  testFree((char *)pDb);
  return rc;
}

/*
** Poll connection db until the value reported by
** lsm_info(LSM_INFO_CHECKPOINT_SIZE) is smaller than pDb->nMtMaxCkpt,
** sleeping for 5ms between polls.
**
** Return LSM_OK once the reported size is below the threshold. If an
** lsm_info() call fails, stop polling and return its error code.
*/
static int waitOnCheckpointer(LsmDb *pDb, lsm_db *db){
  int nSleep = 0;                 /* Total ms slept (debug output only) */
  int rc;                         /* Return code of most recent lsm_info() */

  for(;;){
    int nByte = 0;                /* Size reported by this poll */
    rc = lsm_info(db, LSM_INFO_CHECKPOINT_SIZE, &nByte);
    if( rc!=LSM_OK ) break;
    if( nByte<pDb->nMtMaxCkpt ) break;

    /* Still at or above the limit. Back off for 5ms and try again. */
    usleep(5000);
    nSleep += 5;
  }

#if 0
    if( nSleep ) printf("# waitOnCheckpointer(): nSleep=%d\n", nSleep);
#endif

  return rc;
}

static int waitOnWorker(LsmDb *pDb){
  int rc;
  int nLimit = -1;
  int nSleep = 0;

  rc = lsm_config(pDb->db, LSM_CONFIG_AUTOFLUSH, &nLimit);
  do {
    int bOld, nNew, rc;
    rc = lsm_info(pDb->db, LSM_INFO_TREE_SIZE, &bOld, &nNew);
    if( rc!=LSM_OK ) return rc;
    if( bOld==0 || nNew<(nLimit/2) ) break;
    usleep(5000);
    nSleep += 5;
  }while( 1 );

#if 0
  if( nSleep ) printf("# waitOnWorker(): nSleep=%d\n", nSleep);
#endif







|



|
|
|


















|
|

|







496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
  testFree((char *)pDb->pBuf);
  testFree((char *)pDb);
  return rc;
}

/*
** Poll connection db until the number of KB reported by
** lsm_info(LSM_INFO_CHECKPOINT_SIZE) is smaller than pDb->nMtMaxCkpt,
** sleeping for 5ms between polls.
**
** Return LSM_OK once the reported size is below the threshold. If an
** lsm_info() call fails, stop polling and return its error code.
*/
static int waitOnCheckpointer(LsmDb *pDb, lsm_db *db){
  int nSleep = 0;                 /* Total ms slept (debug output only) */
  int rc;                         /* Return code of most recent lsm_info() */

  for(;;){
    int nKB = 0;                  /* Size reported by this poll */
    rc = lsm_info(db, LSM_INFO_CHECKPOINT_SIZE, &nKB);
    if( rc!=LSM_OK ) break;
    if( nKB<pDb->nMtMaxCkpt ) break;

    /* Still at or above the limit. Back off for 5ms and try again. */
    usleep(5000);
    nSleep += 5;
  }

#if 0
    if( nSleep ) printf("# waitOnCheckpointer(): nSleep=%d\n", nSleep);
#endif

  return rc;
}

static int waitOnWorker(LsmDb *pDb){
  int rc;
  int nLimit = -1;
  int nSleep = 0;

  rc = lsm_config(pDb->db, LSM_CONFIG_AUTOFLUSH, &nLimit);
  do {
    int nOld, nNew, rc;
    rc = lsm_info(pDb->db, LSM_INFO_TREE_SIZE, &nOld, &nNew);
    if( rc!=LSM_OK ) return rc;
    if( nOld==0 || nNew<(nLimit/2) ) break;
    usleep(5000);
    nSleep += 5;
  }while( 1 );

#if 0
  if( nSleep ) printf("# waitOnWorker(): nSleep=%d\n", nSleep);
#endif
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
    int rc = LSM_OK;
    int nCkpt = -1;

    /* Do some work. If an error occurs, exit. */

    pthread_mutex_unlock(&p->worker_mutex);
    if( p->eType==LSMTEST_THREAD_CKPT ){
      int nByte = 0;
      rc = lsm_info(pWorker, LSM_INFO_CHECKPOINT_SIZE, &nByte);
      if( rc==LSM_OK && nByte>=p->pDb->nMtMinCkpt ){
        rc = lsm_checkpoint(pWorker, 0);
      }
    }else{
      int nWrite;
      do {

        if( p->eType==LSMTEST_THREAD_WORKER ){







|
|
|







1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
    int rc = LSM_OK;
    int nCkpt = -1;

    /* Do some work. If an error occurs, exit. */

    pthread_mutex_unlock(&p->worker_mutex);
    if( p->eType==LSMTEST_THREAD_CKPT ){
      int nKB = 0;
      rc = lsm_info(pWorker, LSM_INFO_CHECKPOINT_SIZE, &nKB);
      if( rc==LSM_OK && nKB>=p->pDb->nMtMinCkpt ){
        rc = lsm_checkpoint(pWorker, 0);
      }
    }else{
      int nWrite;
      do {

        if( p->eType==LSMTEST_THREAD_WORKER ){
Changes to src/lsm.h.
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
**
**   The Tcl structure returned is a list containing one element for each
**   free block in the database. The element itself consists of two 
**   integers - the block number and the id of the snapshot that freed it.
**
** LSM_INFO_CHECKPOINT_SIZE:
**   The third argument should be of type (int *). The location pointed to
**   by this argument is populated with the number of bytes written to the
**   database file since the most recent checkpoint.
**
** LSM_INFO_TREE_SIZE:
**   If this value is passed as the second argument to an lsm_info() call, it
**   should be followed by two arguments of type (int *) (for a total of four
**   arguments).
**
**   At any time, there are either one or two tree structures held in shared
**   memory that new database clients will access (there may also be additional
**   tree structures being used by older clients - this API does not provide
**   information on them). One tree structure - the current tree - is used to
**   accumulate new data written to the database. The other tree structure -
**   the old tree - is a read-only tree holding older data and may be flushed 
**   to disk at any time.
** 
**   Assuming no error occurs, the location pointed to by the first of the two
**   (int *) arguments is set to the size of the old in-memory tree in bytes.
**   The second is set to the size of the current, or live in-memory tree.
*/
#define LSM_INFO_NWRITE           1
#define LSM_INFO_NREAD            2
#define LSM_INFO_DB_STRUCTURE     3
#define LSM_INFO_LOG_STRUCTURE    4
#define LSM_INFO_ARRAY_STRUCTURE  5







|
















|







392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
**
**   The Tcl structure returned is a list containing one element for each
**   free block in the database. The element itself consists of two 
**   integers - the block number and the id of the snapshot that freed it.
**
** LSM_INFO_CHECKPOINT_SIZE:
**   The third argument should be of type (int *). The location pointed to
**   by this argument is populated with the number of KB written to the
**   database file since the most recent checkpoint.
**
** LSM_INFO_TREE_SIZE:
**   If this value is passed as the second argument to an lsm_info() call, it
**   should be followed by two arguments of type (int *) (for a total of four
**   arguments).
**
**   At any time, there are either one or two tree structures held in shared
**   memory that new database clients will access (there may also be additional
**   tree structures being used by older clients - this API does not provide
**   information on them). One tree structure - the current tree - is used to
**   accumulate new data written to the database. The other tree structure -
**   the old tree - is a read-only tree holding older data and may be flushed 
**   to disk at any time.
** 
**   Assuming no error occurs, the location pointed to by the first of the two
**   (int *) arguments is set to the size of the old in-memory tree in KB.
**   The second is set to the size of the current (live) in-memory tree,
**   also in KB.
*/
#define LSM_INFO_NWRITE           1
#define LSM_INFO_NREAD            2
#define LSM_INFO_DB_STRUCTURE     3
#define LSM_INFO_LOG_STRUCTURE    4
#define LSM_INFO_ARRAY_STRUCTURE  5
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
int lsm_flush(lsm_db *pDb);

/*
** Attempt to checkpoint the current database snapshot. Return an LSM
** error code if an error occurs or LSM_OK otherwise.
**
** If the current snapshot has already been checkpointed, calling this 
** function is a no-op. In this case if pnByte is not NULL, *pnByte is
** set to 0. Or, if the current snapshot is successfully checkpointed
** by this function and pnByte is not NULL, *pnByte is set to the number
** of bytes written to the database file since the previous checkpoint
** (the same measure as returned by the LSM_INFO_CHECKPOINT_SIZE query).
*/
int lsm_checkpoint(lsm_db *pDb, int *pnByte);

/*
** CAPI: Opening and Closing Database Cursors
**
** Open and close a database cursor.
*/
int lsm_csr_open(lsm_db *pDb, lsm_cursor **ppCsr);







|

|



|







496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
int lsm_flush(lsm_db *pDb);

/*
** Attempt to checkpoint the current database snapshot. Return an LSM
** error code if an error occurs or LSM_OK otherwise.
**
** If the current snapshot has already been checkpointed, calling this 
** function is a no-op. In this case if pnKB is not NULL, *pnKB is
** set to 0. Or, if the current snapshot is successfully checkpointed
** by this function and pnKB is not NULL, *pnKB is set to the number
** of KB written to the database file since the previous checkpoint
** (the same measure as returned by the LSM_INFO_CHECKPOINT_SIZE query).
*/
int lsm_checkpoint(lsm_db *pDb, int *pnKB);

/*
** CAPI: Opening and Closing Database Cursors
**
** Open and close a database cursor.
*/
int lsm_csr_open(lsm_db *pDb, lsm_cursor **ppCsr);
Changes to src/lsm_ckpt.c.
1182
1183
1184
1185
1186
1187
1188




1189
1190
1191
1192
1193


1194

1195
1196
1197
1198
1199
1200
1201
1202
1203
      &pDb->aSnapshot[nCkpt-2], &pDb->aSnapshot[nCkpt-1]
  );

  memcpy(pDb->pShmhdr->aSnap1, pDb->aSnapshot, nCkpt*sizeof(u32));
  memcpy(pDb->pShmhdr->aSnap2, pDb->aSnapshot, nCkpt*sizeof(u32));
}





/*
** Set *pnByte to the number of bytes of data written into the database
** file since the most recent checkpoint.
**
** Return LSM_OK if successful. If lsmCheckpointSynced() fails, return its
** error code and leave *pnByte unmodified.
**
** The byte count is computed using 64-bit arithmetic and clamped to the
** largest value that fits in a 32-bit signed integer, so that a very large
** backlog is never reported as a small or negative number.
*/
int lsmCheckpointSize(lsm_db *db, int *pnByte){
  int rc;                         /* Return code */
  u32 nSynced;                    /* Pages written as of last checkpoint */

  /* Set nSynced to the number of pages that had been written when the
  ** database was last checkpointed. */
  rc = lsmCheckpointSynced(db, 0, 0, &nSynced);

  if( rc==LSM_OK ){
    u32 nPgsz = db->pShmhdr->aSnap1[CKPT_HDR_PGSZ];
    u32 nWrite = db->pShmhdr->aSnap1[CKPT_HDR_NWRITE];
    i64 nTotal = (i64)(nWrite - nSynced) * nPgsz;
    *pnByte = (nTotal>0x7FFFFFFF) ? 0x7FFFFFFF : (int)nTotal;
  }

  return rc;
}








>
>
>
>
|




>
>

>



|





1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
      &pDb->aSnapshot[nCkpt-2], &pDb->aSnapshot[nCkpt-1]
  );

  memcpy(pDb->pShmhdr->aSnap1, pDb->aSnapshot, nCkpt*sizeof(u32));
  memcpy(pDb->pShmhdr->aSnap2, pDb->aSnapshot, nCkpt*sizeof(u32));
}

/*
** Set the output variable to the number of KB of data written into the
** database file since the most recent checkpoint. The value is rounded
** up to a whole number of KB.
**
** Return LSM_OK if successful. If lsmCheckpointSynced() fails, return its
** error code and leave *pnKB unmodified.
*/
int lsmCheckpointSize(lsm_db *db, int *pnKB){
  int rc;                         /* Return code */
  u32 nSynced;                    /* Pages written as of last checkpoint */

  /* Set nSynced to the number of pages that had been written when the 
  ** database was last checkpointed. */
  rc = lsmCheckpointSynced(db, 0, 0, &nSynced);

  if( rc==LSM_OK ){
    u32 nPgsz = db->pShmhdr->aSnap1[CKPT_HDR_PGSZ];
    u32 nWrite = db->pShmhdr->aSnap1[CKPT_HDR_NWRITE];

    /* Widen to 64 bits before multiplying so that the intermediate byte
    ** count cannot wrap, then round up to the next KB. */
    *pnKB = (int)(( ((i64)(nWrite - nSynced) * nPgsz) + 1023) / 1024);
  }

  return rc;
}

Changes to src/lsm_main.c.
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
  infoFreeWorker(pDb, bUnlock);
  return rc;
}

/*
** NOTE(review): this function has an empty body. It is declared to return
** int but contains no return statement, and it never writes to *pnFree or
** *pnWaiting. Using its return value is undefined behaviour in C. It looks
** like an unimplemented placeholder - confirm that it has no callers, or
** implement it.
*/
static int infoFreelistSize(lsm_db *pDb, int *pnFree, int *pnWaiting){
}

static int infoTreeSize(lsm_db *db, int *pnOld, int *pnNew){
  ShmHeader *pShm = db->pShmhdr;
  TreeHeader *p = &pShm->hdr1;

  /* The following code suffers from two race conditions, as it accesses and
  ** trusts the contents of shared memory without verifying checksums:
  **
  **   * The two values read - TreeHeader.root.nByte and oldroot.nByte - are 







|







451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
  infoFreeWorker(pDb, bUnlock);
  return rc;
}

/*
** NOTE(review): this function has an empty body. It is declared to return
** int but contains no return statement, and it never writes to *pnFree or
** *pnWaiting. Using its return value is undefined behaviour in C. It looks
** like an unimplemented placeholder - confirm that it has no callers, or
** implement it.
*/
static int infoFreelistSize(lsm_db *pDb, int *pnFree, int *pnWaiting){
}

static int infoTreeSize(lsm_db *db, int *pnOldKB, int *pnNewKB){
  ShmHeader *pShm = db->pShmhdr;
  TreeHeader *p = &pShm->hdr1;

  /* The following code suffers from two race conditions, as it accesses and
  ** trusts the contents of shared memory without verifying checksums:
  **
  **   * The two values read - TreeHeader.root.nByte and oldroot.nByte - are 
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
  **     for the size of the "old" tree may reflect the size of an "old"
  **     tree that was recently flushed to disk.
  **
  ** Given the context in which this function is called (as a result of an
  ** lsm_info(LSM_INFO_TREE_SIZE) request), neither of these are considered to
  ** be problems.
  */
  *pnNew = (int)p->root.nByte;
  if( p->iOldShmid ){
    if( p->iOldLog==lsmCheckpointLogOffset(pShm->aSnap1) ){
      *pnOld = 0;
    }else{
      *pnOld = (int)p->oldroot.nByte;
    }
  }else{
    *pnOld = 0;
  }

  return LSM_OK;
}

int lsm_info(lsm_db *pDb, int eParam, ...){
  int rc = LSM_OK;







|


|

|


|







474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
  **     for the size of the "old" tree may reflect the size of an "old"
  **     tree that was recently flushed to disk.
  **
  ** Given the context in which this function is called (as a result of an
  ** lsm_info(LSM_INFO_TREE_SIZE) request), neither of these are considered to
  ** be problems.
  */
  *pnNewKB = ((int)p->root.nByte + 1023) / 1024;
  if( p->iOldShmid ){
    if( p->iOldLog==lsmCheckpointLogOffset(pShm->aSnap1) ){
      *pnOldKB = 0;
    }else{
      *pnOldKB = ((int)p->oldroot.nByte + 1023) / 1024;
    }
  }else{
    *pnOldKB = 0;
  }

  return LSM_OK;
}

int lsm_info(lsm_db *pDb, int eParam, ...){
  int rc = LSM_OK;
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
    case LSM_INFO_FREELIST: {
      char **pzVal = va_arg(ap, char **);
      rc = lsmInfoFreelist(pDb, pzVal);
      break;
    }

    case LSM_INFO_CHECKPOINT_SIZE: {
      int *pnByte = va_arg(ap, int *);
      rc = lsmCheckpointSize(pDb, pnByte);
      break;
    }

    case LSM_INFO_TREE_SIZE: {
      int *pnOld = va_arg(ap, int *);
      int *pnNew = va_arg(ap, int *);
      rc = infoTreeSize(pDb, pnOld, pnNew);







|
|







553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
    case LSM_INFO_FREELIST: {
      char **pzVal = va_arg(ap, char **);
      rc = lsmInfoFreelist(pDb, pzVal);
      break;
    }

    case LSM_INFO_CHECKPOINT_SIZE: {
      int *pnKB = va_arg(ap, int *);
      rc = lsmCheckpointSize(pDb, pnKB);
      break;
    }

    case LSM_INFO_TREE_SIZE: {
      int *pnOld = va_arg(ap, int *);
      int *pnNew = va_arg(ap, int *);
      rc = infoTreeSize(pDb, pnOld, pnNew);
Changes to src/lsm_shared.c.
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
}
#endif

void lsmShmBarrier(lsm_db *db){
  lsmEnvShmBarrier(db->pEnv);
}

/*
** Attempt to checkpoint the current database snapshot. Return LSM_OK if
** successful, or the error code returned by lsmCheckpointWrite() otherwise.
**
** If pnByte is not NULL, *pnByte is always written: it is set to the number
** of bytes of data checkpointed, or to zero if an error occurred or if no
** pages were checkpointed. The byte count is clamped to the largest value
** that fits in a 32-bit signed integer.
*/
int lsm_checkpoint(lsm_db *pDb, int *pnByte){
  int rc;                         /* Return code */
  u32 nWrite = 0;                 /* Number of pages checkpointed */

  /* Attempt the checkpoint. If successful, nWrite is set to the number of
  ** pages written between this and the previous checkpoint.  */
  rc = lsmCheckpointWrite(pDb, 0, &nWrite);

  /* If required, calculate the output variable (bytes of data checkpointed). 
  ** Set it to zero if an error occurred.  */
  if( pnByte ){
    int nByte = 0;
    if( rc==LSM_OK && nWrite ){
      /* Multiply in 64 bits. A plain int multiplication overflows (which is
      ** undefined behaviour) once 2GB or more has been checkpointed. */
      i64 nTotal = (i64)nWrite * lsmFsPageSize(pDb->pFS);
      nByte = (nTotal>0x7FFFFFFF) ? 0x7FFFFFFF : (int)nTotal;
    }
    *pnByte = nByte;
  }

  return rc;
}








|







|

|
|

|

|





1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
}
#endif

void lsmShmBarrier(lsm_db *db){
  lsmEnvShmBarrier(db->pEnv);
}

/*
** Attempt to checkpoint the current database snapshot. Return LSM_OK if
** successful, or the error code returned by lsmCheckpointWrite() otherwise.
**
** If pnKB is not NULL, *pnKB is always written: it is set to the number of
** KB of data checkpointed (rounded up to a whole KB), or to zero if an
** error occurred or if no pages were checkpointed.
*/
int lsm_checkpoint(lsm_db *pDb, int *pnKB){
  u32 nPage = 0;                  /* Number of pages checkpointed */
  int rc;                         /* Return code */

  /* Attempt the checkpoint. If successful, nPage is set to the number of
  ** pages written between this and the previous checkpoint.  */
  rc = lsmCheckpointWrite(pDb, 0, &nPage);

  /* The caller did not ask for the size - nothing more to do. */
  if( pnKB==0 ) return rc;

  if( rc==LSM_OK && nPage ){
    /* Convert pages to bytes in 64 bits, then round up to whole KB. */
    *pnKB = (((i64)nPage * lsmFsPageSize(pDb->pFS)) + 1023) / 1024;
  }else{
    *pnKB = 0;
  }

  return rc;
}