Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add lsm_tree_size() and lsm_ckpt_size().
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 5062ffb0177c5178ceb18f96edd5fc48484479fd
User & Date: dan 2012-09-20 19:33:15.576
Context
2012-09-26
15:38
Merge rework-flow-control branch with trunk. check-in: cf2ef747ad user: dan tags: trunk
2012-09-22
19:38
Rework flow control some (flow control = slowing down clients when worker threads or processes cannot keep up). check-in: 50f8b55823 user: dan tags: rework-flow-control
2012-09-20
19:33
Add lsm_tree_size() and lsm_ckpt_size(). check-in: 5062ffb017 user: dan tags: trunk
2012-09-18
19:39
Avoid malloc calls in lsm_file.c when running in mmap mode. Also avoid many mutex operations when accessing the in-memory tree. check-in: 1e661d0bad user: dan tags: trunk
Changes
Unified Diff Ignore Whitespace Patch
Changes to lsm-test/lsmtest_func.c.
74
75
76
77
78
79
80

81
82
83
84
85
86
87
88
89
90
91
92
93




94

95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110

111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131




  struct Option {
    const char *zName;
    int eOpt;
  } aOpt [] = { 
    { "array",      LSM_INFO_ARRAY_STRUCTURE },
    { "page-ascii", LSM_INFO_PAGE_ASCII_DUMP },
    { "page-hex",   LSM_INFO_PAGE_HEX_DUMP },

    { 0, 0 } 
  };

  char *z = 0;

  if( nArg!=1 && nArg!=3 ){
    testPrintUsage("DATABASE ?array|page-ascii|page-hex PGNO?");
    return -1;
  }
  if( nArg==3 ){
    rc = testArgSelect(aOpt, "option", azArg[1], &eOpt);
    if( rc!=0 ) return rc;
    eOpt = aOpt[eOpt].eOpt;




    iPg = atoi(azArg[2]);

  }
  zDb = azArg[0];

  rc = lsm_new(0, &pDb);
  if( rc!=LSM_OK ){
    testPrintError("lsm_new(): rc=%d\n", rc);
  }else{
    rc = lsm_open(pDb, zDb);
    if( rc!=LSM_OK ){
      testPrintError("lsm_open(): rc=%d\n", rc);
    }
  }

  if( rc==LSM_OK ){
    switch( eOpt ){
      case LSM_INFO_DB_STRUCTURE:

        rc = lsm_info(pDb, LSM_INFO_DB_STRUCTURE, &z);
        break;
      case LSM_INFO_ARRAY_STRUCTURE:
      case LSM_INFO_PAGE_ASCII_DUMP:
      case LSM_INFO_PAGE_HEX_DUMP:
        rc = lsm_info(pDb, eOpt, iPg, &z);
        break;
      default:
        assert( !"no chance" );
    }

    if( rc==LSM_OK ){
      printf("%s\n", z ? z : "");
      fflush(stdout);
    }
    lsm_free(lsm_get_env(pDb), z);
  }

  lsm_close(pDb);
  return rc;
}











>





|
<
<
|
|



>
>
>
>
|
>
















>
|



















|
>
>
>
>
74
75
76
77
78
79
80
81
82
83
84
85
86
87


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
  struct Option {
    const char *zName;
    int eOpt;
  } aOpt [] = { 
    { "array",      LSM_INFO_ARRAY_STRUCTURE },
    { "page-ascii", LSM_INFO_PAGE_ASCII_DUMP },
    { "page-hex",   LSM_INFO_PAGE_HEX_DUMP },
    { "freelist",   LSM_INFO_FREELIST },
    { 0, 0 } 
  };

  char *z = 0;

  if( nArg<1 || nArg>3 ) goto usage;



  if( nArg>1 ){
    rc = testArgSelect(aOpt, "option", azArg[1], &eOpt);
    if( rc!=0 ) return rc;
    eOpt = aOpt[eOpt].eOpt;
    if( eOpt==LSM_INFO_FREELIST ){
      if( nArg!=2 ) goto usage;
    }else{
      if( nArg!=3 ) goto usage;
      iPg = atoi(azArg[2]);
    }
  }
  zDb = azArg[0];

  rc = lsm_new(0, &pDb);
  if( rc!=LSM_OK ){
    testPrintError("lsm_new(): rc=%d\n", rc);
  }else{
    rc = lsm_open(pDb, zDb);
    if( rc!=LSM_OK ){
      testPrintError("lsm_open(): rc=%d\n", rc);
    }
  }

  if( rc==LSM_OK ){
    switch( eOpt ){
      case LSM_INFO_DB_STRUCTURE:
      case LSM_INFO_FREELIST:
        rc = lsm_info(pDb, eOpt, &z);
        break;
      case LSM_INFO_ARRAY_STRUCTURE:
      case LSM_INFO_PAGE_ASCII_DUMP:
      case LSM_INFO_PAGE_HEX_DUMP:
        rc = lsm_info(pDb, eOpt, iPg, &z);
        break;
      default:
        assert( !"no chance" );
    }

    if( rc==LSM_OK ){
      printf("%s\n", z ? z : "");
      fflush(stdout);
    }
    lsm_free(lsm_get_env(pDb), z);
  }

  lsm_close(pDb);
  return rc;

 usage:
  testPrintUsage("DATABASE ?array|page-ascii|page-hex PGNO?");
  return -1;
}
Changes to lsm-test/lsmtest_tdb3.c.
424
425
426
427
428
429
430


















431
432
433
434
435
436
437
  TestDb *pTestDb, 
  void *pKey, 
  int nKey, 
  void *pVal, 
  int nVal
){
  LsmDb *pDb = (LsmDb *)pTestDb;


















  return lsm_write(pDb->db, pKey, nKey, pVal, nVal);
}

/*
** TestDb "delete" method for the LSM wrapper. The generic handle is 
** reinterpreted as an LsmDb and the request is forwarded, unchanged, to
** lsm_delete() on the wrapped connection. The value returned by 
** lsm_delete() is passed straight back to the caller.
*/
static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){
  return lsm_delete(((LsmDb *)pTestDb)->db, pKey, nKey);
}







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
  TestDb *pTestDb, 
  void *pKey, 
  int nKey, 
  void *pVal, 
  int nVal
){
  LsmDb *pDb = (LsmDb *)pTestDb;

  if( pDb->aWorker ){
    int nLimit = -1;
    int nSleep = 0;
    lsm_config(pDb->db, LSM_CONFIG_WRITE_BUFFER, &nLimit);
    do {
      int bOld, nNew, rc;
      rc = lsm_tree_size(pDb->db, &bOld, &nNew);
      if( rc!=LSM_OK ) return rc;
      if( bOld==0 || nNew<nLimit ) break;
      usleep(1000);
      nSleep += 1;
    }while( 1 );
#if 0
    if( nSleep ) printf("nSleep=%d\n", nSleep);
#endif
  }

  return lsm_write(pDb->db, pKey, nKey, pVal, nVal);
}

/*
** TestDb "delete" method for the LSM wrapper. The generic handle is 
** reinterpreted as an LsmDb and the request is forwarded, unchanged, to
** lsm_delete() on the wrapped connection. The value returned by 
** lsm_delete() is passed straight back to the caller.
*/
static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){
  return lsm_delete(((LsmDb *)pTestDb)->db, pKey, nKey);
}
914
915
916
917
918
919
920














921
922
923
924
925
926
927
  pthread_mutex_lock(&p->worker_mutex);
  while( (pWorker = p->pWorker) ){
    int nWrite = 0;
    int rc;

    /* Do some work. If an error occurs, exit. */
    pthread_mutex_unlock(&p->worker_mutex);














    rc = lsm_work(pWorker, p->lsm_work_flags, p->lsm_work_npage, &nWrite);
/*    printf("# worked %d units\n", nWrite); */
    pthread_mutex_lock(&p->worker_mutex);
    if( rc!=LSM_OK && rc!=LSM_BUSY ){
      p->worker_rc = rc;
      break;
    }







>
>
>
>
>
>
>
>
>
>
>
>
>
>







932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
  pthread_mutex_lock(&p->worker_mutex);
  while( (pWorker = p->pWorker) ){
    int nWrite = 0;
    int rc;

    /* Do some work. If an error occurs, exit. */
    pthread_mutex_unlock(&p->worker_mutex);
    if( (p->lsm_work_flags & LSM_WORK_CHECKPOINT)==0 ){
      int nSleep = 0;
      while( 1 ){
        int nByte = 0;
        lsm_ckpt_size(pWorker, &nByte);
        if( nByte<(32*1024*1024) ) break;
        mt_signal_worker(p->pDb, 1);
        usleep(1000);
        nSleep++;
      }
#if 0
      if( nSleep ) printf("nSleep=%d (worker)\n", nSleep);
#endif
    }
    rc = lsm_work(pWorker, p->lsm_work_flags, p->lsm_work_npage, &nWrite);
/*    printf("# worked %d units\n", nWrite); */
    pthread_mutex_lock(&p->worker_mutex);
    if( rc!=LSM_OK && rc!=LSM_BUSY ){
      p->worker_rc = rc;
      break;
    }
Changes to src/lsm.h.
379
380
381
382
383
384
385





































386
387
388
389
390
391
392
** Delete all database entries with keys that are greater than or equal to
** (pKey1/nKey1) and smaller than or equal to (pKey2/nKey2).
*/
int lsm_delete_range(lsm_db *, 
    const void *pKey1, int nKey1, const void *pKey2, int nKey2
);







































/*
** This function is called by a thread to work on the database structure.
** The actual operations performed by this function depend on the value 
** passed as the "flags" parameter:
**
** LSM_WORK_FLUSH:







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
** Delete all database entries with keys that are greater than or equal to
** (pKey1/nKey1) and smaller than or equal to (pKey2/nKey2).
*/
int lsm_delete_range(lsm_db *, 
    const void *pKey1, int nKey1, const void *pKey2, int nKey2
);

/*
** The lsm_tree_size() function reports on the current state of the 
** in-memory tree data structure. 
**
** At any time, there are either one or two tree structures held in shared
** memory that new database clients will access (there may also be additional 
** tree structures being used by older clients - this API does not provide
** information on them). One tree structure - the current tree - is used to
** accumulate new data written to the database. The other tree structure - the 
** old tree - is a read-only tree holding older data and may be flushed to disk
** at any time.
**
** If successful, this function sets *pnNew to the number of bytes of shared
** memory space used by the current tree. *pbOld is set to true if the old 
** tree exists, or false if it does not. 
**
** NOTE(review): the implementation only reports the old tree as existing 
** when the log offset recorded for it differs from the log offset stored in
** the shared-memory snapshot. Presumably an old tree whose offset matches
** has already been flushed to disk - confirm.
**
** If no error occurs, LSM_OK is returned. Otherwise an LSM error code.
**
** RACE CONDITION:
**   NOTE(review): the implementation reads the shared-memory tree header
**   and snapshot without taking any lock of its own. The values returned
**   are therefore presumably only a point-in-time sample and may already be
**   out of date when the caller acts on them. Confirm and describe the
**   exact race here.
*/
int lsm_tree_size(lsm_db *, int *pbOld, int *pnNew);

/*
** This function is used to query the amount of data that has been written
** to the database file but not checkpointed (synced). If successful, *pnByte
** is set to the number of bytes before returning.
**
** LSM_OK is returned if successful. Or if an error occurs, an LSM error
** code is returned.
**
** RACE CONDITION:
**   NOTE(review): the implementation obtains the "synced" page count from
**   the database meta-page and the "written" page count from the snapshot
**   in shared memory as two separate reads. If the current meta-page changes
**   while it is being read, the synced count is reported as zero, which
**   would make *pnByte larger than the true value. Confirm, then either
**   describe the race precisely here or remove it somehow.
*/
int lsm_ckpt_size(lsm_db *, int *pnByte);


/*
** This function is called by a thread to work on the database structure.
** The actual operations performed by this function depend on the value 
** passed as the "flags" parameter:
**
** LSM_WORK_FLUSH:
Changes to src/lsmInt.h.
473
474
475
476
477
478
479

480
481
482
483
484
485
486
  i64 iLogOff;                    /* Log file offset */

  /* Used by worker snapshots only */
  int nBlock;                     /* Number of blocks in database file */
  u32 aiAppend[LSM_APPLIST_SZ];   /* Append point list */
  Freelist freelist;              /* Free block list */
  int nFreelistOvfl;              /* Number of extra free-list entries in LSM */

};
#define LSM_INITIAL_SNAPSHOT_ID 11

/*
** Functions from file "lsm_ckpt.c".
*/
int lsmCheckpointWrite(lsm_db *);







>







473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
  i64 iLogOff;                    /* Log file offset */

  /* Used by worker snapshots only */
  int nBlock;                     /* Number of blocks in database file */
  u32 aiAppend[LSM_APPLIST_SZ];   /* Append point list */
  Freelist freelist;              /* Free block list */
  int nFreelistOvfl;              /* Number of extra free-list entries in LSM */
  u32 nWrite;                     /* Total number of pages written to disk */
};
#define LSM_INITIAL_SNAPSHOT_ID 11

/*
** Functions from file "lsm_ckpt.c".
*/
int lsmCheckpointWrite(lsm_db *);
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);

int lsmCheckpointSaveWorker(lsm_db *pDb, int, int);
int lsmDatabaseFull(lsm_db *pDb);
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog);


/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);







|







506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);

int lsmCheckpointSaveWorker(lsm_db *pDb, int, int);
int lsmDatabaseFull(lsm_db *pDb);
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog, u32 *pnWrite);


/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);
Changes to src/lsm_ckpt.c.
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175

176
177
178
179
180
181
182
183
184
185
186
187
 + (((x)&0x00FF0000)>>8)  + (((x)&0xFF000000)>>24) \
)

static const int one = 1;
#define LSM_LITTLE_ENDIAN (*(u8 *)(&one))

/* Sizes, in integers, of various parts of the checkpoint. */
#define CKPT_HDR_SIZE         8
#define CKPT_LOGPTR_SIZE      4
#define CKPT_SEGMENT_SIZE     4
#define CKPT_CKSUM_SIZE       2
#define CKPT_APPENDLIST_SIZE  LSM_APPLIST_SZ

/* A #define to describe each integer in the checkpoint header. */
#define CKPT_HDR_ID_MSW   0
#define CKPT_HDR_ID_LSW   1
#define CKPT_HDR_NCKPT    2
#define CKPT_HDR_NBLOCK   3
#define CKPT_HDR_BLKSZ    4
#define CKPT_HDR_NLEVEL   5
#define CKPT_HDR_PGSZ     6
#define CKPT_HDR_OVFL     7


#define CKPT_HDR_LO_MSW     8
#define CKPT_HDR_LO_LSW     9
#define CKPT_HDR_LO_CKSUM1 10
#define CKPT_HDR_LO_CKSUM2 11

typedef struct CkptBuffer CkptBuffer;

/*
** Dynamic buffer used to accumulate data for a checkpoint.
*/
struct CkptBuffer {







|














>

|
|
|
|







154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
 + (((x)&0x00FF0000)>>8)  + (((x)&0xFF000000)>>24) \
)

static const int one = 1;
#define LSM_LITTLE_ENDIAN (*(u8 *)(&one))

/* Sizes, in integers, of various parts of the checkpoint. */
#define CKPT_HDR_SIZE         9
#define CKPT_LOGPTR_SIZE      4
#define CKPT_SEGMENT_SIZE     4
#define CKPT_CKSUM_SIZE       2
#define CKPT_APPENDLIST_SIZE  LSM_APPLIST_SZ

/* A #define to describe each integer in the checkpoint header. */
#define CKPT_HDR_ID_MSW   0
#define CKPT_HDR_ID_LSW   1
#define CKPT_HDR_NCKPT    2
#define CKPT_HDR_NBLOCK   3
#define CKPT_HDR_BLKSZ    4
#define CKPT_HDR_NLEVEL   5
#define CKPT_HDR_PGSZ     6
#define CKPT_HDR_OVFL     7
#define CKPT_HDR_NWRITE   8

#define CKPT_HDR_LO_MSW     9
#define CKPT_HDR_LO_LSW    10
#define CKPT_HDR_LO_CKSUM1 11
#define CKPT_HDR_LO_CKSUM2 12

typedef struct CkptBuffer CkptBuffer;

/*
** Dynamic buffer used to accumulate data for a checkpoint.
*/
struct CkptBuffer {
426
427
428
429
430
431
432

433
434
435
436
437
438
439
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, nOvfl, &rc);


  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);
  }







>







427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, nOvfl, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
    ckptSetValue(&ckpt, iOut+1, 0, &rc);
  }
876
877
878
879
880
881
882

883
884
885
886
887
888
889
    10,                 /* CKPT_HDR_ID_LSW */
    0,                  /* CKPT_HDR_NCKPT */
    0,                  /* CKPT_HDR_NBLOCK */
    0,                  /* CKPT_HDR_BLKSZ */
    0,                  /* CKPT_HDR_NLEVEL */
    0,                  /* CKPT_HDR_PGSZ */
    0,                  /* CKPT_HDR_OVFL */

    0, 0, 1234, 5678,   /* The log pointer and initial checksum */
    0, 0, 0, 0,         /* The append list */
    0,                  /* The free block list */
    0, 0                /* Space for checksum values */
  };
  u32 nCkpt = array_size(aCkpt);
  ShmHeader *pShm = pDb->pShmhdr;







>







878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
    10,                 /* CKPT_HDR_ID_LSW */
    0,                  /* CKPT_HDR_NCKPT */
    0,                  /* CKPT_HDR_NBLOCK */
    0,                  /* CKPT_HDR_BLKSZ */
    0,                  /* CKPT_HDR_NLEVEL */
    0,                  /* CKPT_HDR_PGSZ */
    0,                  /* CKPT_HDR_OVFL */
    0,                  /* CKPT_HDR_NWRITE */
    0, 0, 1234, 5678,   /* The log pointer and initial checksum */
    0, 0, 0, 0,         /* The append list */
    0,                  /* The free block list */
    0, 0                /* Space for checksum values */
  };
  u32 nCkpt = array_size(aCkpt);
  ShmHeader *pShm = pDb->pShmhdr;
1037
1038
1039
1040
1041
1042
1043

1044
1045
1046
1047
1048
1049
1050
    int nFree;
    int nCopy;
    int nLevel = (int)aCkpt[CKPT_HDR_NLEVEL];
    int iIn = CKPT_HDR_SIZE + CKPT_APPENDLIST_SIZE + CKPT_LOGPTR_SIZE;

    pNew->iId = lsmCheckpointId(aCkpt, 0);
    pNew->nBlock = aCkpt[CKPT_HDR_NBLOCK];

    rc = ckptLoadLevels(pDb, aCkpt, &iIn, nLevel, &pNew->pLevel);
    pNew->iLogOff = lsmCheckpointLogOffset(aCkpt);

    /* Make a copy of the append-list */
    nCopy = sizeof(u32) * LSM_APPLIST_SZ;
    memcpy(pNew->aiAppend, &aCkpt[CKPT_HDR_SIZE+CKPT_LOGPTR_SIZE], nCopy);








>







1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
    int nFree;
    int nCopy;
    int nLevel = (int)aCkpt[CKPT_HDR_NLEVEL];
    int iIn = CKPT_HDR_SIZE + CKPT_APPENDLIST_SIZE + CKPT_LOGPTR_SIZE;

    pNew->iId = lsmCheckpointId(aCkpt, 0);
    pNew->nBlock = aCkpt[CKPT_HDR_NBLOCK];
    pNew->nWrite = aCkpt[CKPT_HDR_NWRITE];
    rc = ckptLoadLevels(pDb, aCkpt, &iIn, nLevel, &pNew->pLevel);
    pNew->iLogOff = lsmCheckpointLogOffset(aCkpt);

    /* Make a copy of the append-list */
    nCopy = sizeof(u32) * LSM_APPLIST_SZ;
    memcpy(pNew->aiAppend, &aCkpt[CKPT_HDR_SIZE+CKPT_LOGPTR_SIZE], nCopy);

1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154

1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171

1172
1173
1174
1175
1176
1177
1178

1179
1180


1181
1182
1183
1184
1185
1186
1187
**
** If successful, this function loads the snapshot from the meta-page, 
** verifies its checksum and sets *piId to the snapshot-id before returning
** LSM_OK. Or, if the checksum attempt fails, *piId is set to zero and
** LSM_OK returned. If an error occurs, an LSM error code is returned and
** the final value of *piId is undefined.
*/
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog){
  int rc = LSM_OK;
  int bValid = 0;                 /* True once a verified checkpoint is read */
  MetaPage *pPg;
  u32 iMeta;

  /* Record which meta-page is current before reading it. The value is
  ** compared against the shared-memory header again below to detect the
  ** case where the current meta-page changed while it was being read. */
  iMeta = pDb->pShmhdr->iMetaPage;

  rc = lsmFsMetaPageGet(pDb->pFS, 0, iMeta, &pPg);
  if( rc==LSM_OK ){
    int nCkpt;
    int nData;
    u8 *aData; 

    aData = lsmFsMetaPageData(pPg, &nData);
    assert( nData==LSM_META_PAGE_SIZE );
    nCkpt = lsmGetU32(&aData[CKPT_HDR_NCKPT*sizeof(u32)]);

    /* Only consider the checkpoint if its declared size fits in a 
    ** meta-page. Otherwise it is handled like a checksum failure. */
    if( nCkpt<(LSM_META_PAGE_SIZE/sizeof(u32)) ){
      u32 *aCopy = lsmMallocRc(pDb->pEnv, sizeof(u32) * nCkpt, &rc);
      if( aCopy ){
        /* Work on a private copy converted from the on-disk byte order */
        memcpy(aCopy, aData, nCkpt*sizeof(u32));
        ckptChangeEndianness(aCopy, nCkpt);
        if( ckptChecksumOk(aCopy) ){
          *piId = lsmCheckpointId(aCopy, 0);
          if( piLog ) *piLog = lsmCheckpointLogOffset(aCopy);
          bValid = 1;
        }
        lsmFree(pDb->pEnv, aCopy);
      }
    }
    lsmFsMetaPageRelease(pPg);
  }

  /* Report a snapshot-id of zero unless a checkpoint with a good checksum
  ** was read and the current meta-page did not change in the meantime. 
  ** This makes the checksum-failure case behave as documented instead of
  ** leaving *piId holding whatever the caller passed in. */
  if( bValid==0 || rc!=LSM_OK || pDb->pShmhdr->iMetaPage!=iMeta ){
    *piId = 0;
  }
  return rc;
}

/*
** Return the checkpoint-id of the checkpoint array passed as the first
** argument to this function. If the second argument is true, then assume







|





>
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
>
|
|
|
|
|
|
|
>
|
|
>
>







1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
**
** If successful, this function loads the snapshot from the meta-page, 
** verifies its checksum and sets *piId to the snapshot-id before returning
** LSM_OK. Or, if the checksum attempt fails, *piId is set to zero and
** LSM_OK returned. If an error occurs, an LSM error code is returned and
** the final value of *piId is undefined.
*/
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog, u32 *pnWrite){
  int rc = LSM_OK;
  int bValid = 0;                 /* True once a verified checkpoint is read */
  MetaPage *pPg;
  u32 iMeta;

  /* Record which meta-page is current before reading it. The value is
  ** compared against the shared-memory header again below to detect the
  ** case where the current meta-page changed while it was being read. 
  ** Values other than 1 and 2 are not read from disk at all. */
  iMeta = pDb->pShmhdr->iMetaPage;
  if( iMeta==1 || iMeta==2 ){
    rc = lsmFsMetaPageGet(pDb->pFS, 0, iMeta, &pPg);
    if( rc==LSM_OK ){
      int nCkpt;
      int nData;
      u8 *aData; 

      aData = lsmFsMetaPageData(pPg, &nData);
      assert( nData==LSM_META_PAGE_SIZE );
      nCkpt = lsmGetU32(&aData[CKPT_HDR_NCKPT*sizeof(u32)]);

      /* Only consider the checkpoint if its declared size fits in a 
      ** meta-page. Otherwise it is handled like a checksum failure. */
      if( nCkpt<(LSM_META_PAGE_SIZE/sizeof(u32)) ){
        u32 *aCopy = lsmMallocRc(pDb->pEnv, sizeof(u32) * nCkpt, &rc);
        if( aCopy ){
          /* Work on a private copy converted from the on-disk byte order */
          memcpy(aCopy, aData, nCkpt*sizeof(u32));
          ckptChangeEndianness(aCopy, nCkpt);
          if( ckptChecksumOk(aCopy) ){
            if( piId ) *piId = lsmCheckpointId(aCopy, 0);
            if( piLog ) *piLog = lsmCheckpointLogOffset(aCopy);
            if( pnWrite ) *pnWrite = aCopy[CKPT_HDR_NWRITE];
            bValid = 1;
          }
          lsmFree(pDb->pEnv, aCopy);
        }
      }
      lsmFsMetaPageRelease(pPg);
    }
  }

  /* Zero every requested output unless a checkpoint with a good checksum 
  ** was read and the current meta-page did not change in the meantime.
  ** Without the bValid test a checksum failure would return LSM_OK while
  ** leaving the outputs unwritten, so a caller that passes an uninitialised
  ** variable would go on to read an indeterminate value. */
  if( bValid==0 || rc!=LSM_OK || pDb->pShmhdr->iMetaPage!=iMeta ){
    if( piId ) *piId = 0;
    if( piLog ) *piLog = 0;
    if( pnWrite ) *pnWrite = 0;
  }
  return rc;
}

/*
** Return the checkpoint-id of the checkpoint array passed as the first
** argument to this function. If the second argument is true, then assume
1234
1235
1236
1237
1238
1239
1240
1241















  ckptChecksum(pDb->aSnapshot, nCkpt, 
      &pDb->aSnapshot[nCkpt-2], &pDb->aSnapshot[nCkpt-1]
  );

  memcpy(pDb->pShmhdr->aSnap1, pDb->aSnapshot, nCkpt*sizeof(u32));
  memcpy(pDb->pShmhdr->aSnap2, pDb->aSnapshot, nCkpt*sizeof(u32));
}
























>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
  ckptChecksum(pDb->aSnapshot, nCkpt, 
      &pDb->aSnapshot[nCkpt-2], &pDb->aSnapshot[nCkpt-1]
  );

  memcpy(pDb->pShmhdr->aSnap1, pDb->aSnapshot, nCkpt*sizeof(u32));
  memcpy(pDb->pShmhdr->aSnap2, pDb->aSnapshot, nCkpt*sizeof(u32));
}

/*
** Set *pnByte to the amount of data, in bytes, written to the database
** file since the most recently synced checkpoint.
**
** The figure is computed as the difference between the CKPT_HDR_NWRITE 
** field of the snapshot in shared memory (aSnap1) and the same field as 
** reported for the checkpoint stored in the meta-page, multiplied by the 
** page size recorded in aSnap1.
**
** LSM_OK is returned and *pnByte written if successful. Otherwise an LSM
** error code is returned and *pnByte is left unmodified.
*/
int lsm_ckpt_size(lsm_db *db, int *pnByte){
  ShmHeader *pShm = db->pShmhdr;
  int rc;

  /* Initialised so that a well-defined value is used even if 
  ** lsmCheckpointSynced() returns LSM_OK without writing its output. */
  u32 nSynced = 0;

  rc = lsmCheckpointSynced(db, 0, 0, &nSynced);
  if( rc==LSM_OK ){
    u32 nPgsz = pShm->aSnap1[CKPT_HDR_PGSZ];
    u32 nWrite = pShm->aSnap1[CKPT_HDR_NWRITE];

    /* Unsigned subtraction: the counters are u32 values that may wrap */
    *pnByte = (int)((nWrite - nSynced) * nPgsz);
  }

  return rc;
}

Changes to src/lsm_log.c.
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
*/
static int logReclaimSpace(lsm_db *pDb){
  int rc = LSM_OK;
  if( pDb->pShmhdr->iMetaPage ){
    DbLog *pLog = &pDb->treehdr.log;
    i64 iSnapshotId = 0;
    i64 iOff = 0;
    rc = lsmCheckpointSynced(pDb, &iSnapshotId, &iOff);
    if( rc==LSM_OK && pLog->iSnapshotId<iSnapshotId ){
      int iRegion;
      for(iRegion=0; iRegion<3; iRegion++){
        LogRegion *p = &pLog->aRegion[iRegion];
        if( iOff>=p->iStart && iOff<=p->iEnd ) break;
        p->iStart = 0;
        p->iEnd = 0;







|







302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
*/
static int logReclaimSpace(lsm_db *pDb){
  int rc = LSM_OK;
  if( pDb->pShmhdr->iMetaPage ){
    DbLog *pLog = &pDb->treehdr.log;
    i64 iSnapshotId = 0;
    i64 iOff = 0;
    rc = lsmCheckpointSynced(pDb, &iSnapshotId, &iOff, 0);
    if( rc==LSM_OK && pLog->iSnapshotId<iSnapshotId ){
      int iRegion;
      for(iRegion=0; iRegion<3; iRegion++){
        LogRegion *p = &pLog->aRegion[iRegion];
        if( iOff>=p->iStart && iOff<=p->iEnd ) break;
        p->iStart = 0;
        p->iEnd = 0;
Changes to src/lsm_main.c.
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
      lsmFinishWriteTrans(pDb, (rc==LSM_OK));
    }
    pDb->nTransOpen = iLevel;

  }
  dbReleaseClientSnapshot(pDb);

  /* If nFlush==0, then do not flush any data from the in-memory tree to 
  ** disk. If nFlush==1, then there exists an "old" tree that should be 
  ** flushed to disk if auto-work is enabled. Or if nFlush==2, then both
  ** the old and current trees are large enough to flush to disk. In this
  ** case do so regardless of the auto-work setting.  
  **
  ** If auto-work is enabled and data was written to disk, also sync the 
  ** db and checkpoint the latest snapshot.
  **
  ** Ignore any LSM_BUSY errors that occur during these operations. If
  ** LSM_BUSY does occur, it means some other connection is already working
  ** on flushing the in-memory tree or checkpointing the database. 
  */
  assert( rc!=LSM_BUSY);
  if( rc==LSM_OK ){
    if( nFlush>1 || (nFlush && pDb->bAutowork) ){
      rc = lsmFlushToDisk(pDb);
      if( rc==LSM_OK && pDb->bAutowork ){
        rc = lsmCheckpointWrite(pDb);
      }
    }else if( nFlush && pDb->xWork ){
      pDb->xWork(pDb, pDb->pWorkCtx);
    }







<
<
|
<
<










|







784
785
786
787
788
789
790


791


792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
      lsmFinishWriteTrans(pDb, (rc==LSM_OK));
    }
    pDb->nTransOpen = iLevel;

  }
  dbReleaseClientSnapshot(pDb);



  /* If nFlush is not zero and auto-work is enabled, flush the tree to disk.


  **
  ** If auto-work is enabled and data was written to disk, also sync the 
  ** db and checkpoint the latest snapshot.
  **
  ** Ignore any LSM_BUSY errors that occur during these operations. If
  ** LSM_BUSY does occur, it means some other connection is already working
  ** on flushing the in-memory tree or checkpointing the database. 
  */
  assert( rc!=LSM_BUSY);
  if( rc==LSM_OK ){
    if( nFlush && pDb->bAutowork ){
      rc = lsmFlushToDisk(pDb);
      if( rc==LSM_OK && pDb->bAutowork ){
        rc = lsmCheckpointWrite(pDb);
      }
    }else if( nFlush && pDb->xWork ){
      pDb->xWork(pDb, pDb->pWorkCtx);
    }
836
837
838
839
840
841
842

















      lsmFinishWriteTrans(pDb, 0);
    }
    dbReleaseClientSnapshot(pDb);
  }

  return rc;
}
























>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
      lsmFinishWriteTrans(pDb, 0);
    }
    dbReleaseClientSnapshot(pDb);
  }

  return rc;
}

/*
** Report on the in-memory tree structures in shared memory.
**
** *pnNew is set to the nByte field of the tree header stored in shared 
** memory. *pbOld is a boolean: it is set to 1 if the tree header records
** an old tree (iOldShmid is non-zero) and the log offset recorded for that
** tree differs from the log offset in the shared-memory snapshot aSnap1,
** or to 0 otherwise. 
**
** NOTE(review): presumably an old tree whose log offset matches the 
** snapshot has already been flushed to disk, which is why it is not 
** reported - confirm.
**
** This function takes no lock of its own and always returns LSM_OK.
*/
int lsm_tree_size(lsm_db *db, int *pbOld, int *pnNew){
  ShmHeader *pShm = db->pShmhdr;

  *pnNew = (int)pShm->hdr1.nByte;
  *pbOld = 0;
  if( pShm->hdr1.iOldShmid ){
    i64 iOff = pShm->hdr1.iOldLog;
    if( iOff!=lsmCheckpointLogOffset(pShm->aSnap1) ){
      *pbOld = 1;
    }
  }

  return LSM_OK;
}


Changes to src/lsm_shared.c.
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465

    /* The "is in use" bit */
    rc = lsmLsmInUse(pDb, iFree, &bInUse);

    /* The "has been checkpointed" bit */
    if( rc==LSM_OK && bInUse==0 ){
      i64 iId = 0;
      rc = lsmCheckpointSynced(pDb, &iId, 0);
      if( rc!=LSM_OK || iId<iFree ) bInUse = 2;
    }

    if( rc==LSM_OK && bInUse==0 ){
      iRet = pFree->aEntry[0].iBlk;
      flRemoveEntry0(pFree);
      assert( iRet!=0 );







|







451
452
453
454
455
456
457
458
459
460
461
462
463
464
465

    /* The "is in use" bit */
    rc = lsmLsmInUse(pDb, iFree, &bInUse);

    /* The "has been checkpointed" bit */
    if( rc==LSM_OK && bInUse==0 ){
      i64 iId = 0;
      rc = lsmCheckpointSynced(pDb, &iId, 0, 0);
      if( rc!=LSM_OK || iId<iFree ) bInUse = 2;
    }

    if( rc==LSM_OK && bInUse==0 ){
      iRet = pFree->aEntry[0].iBlk;
      flRemoveEntry0(pFree);
      assert( iRet!=0 );
Changes to src/lsm_sorted.c.
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
    }
    pCsr->flags |= CURSOR_NEXT_OK;
  }

  return rc;
}


int sortedWork(lsm_db *pDb, int nWork, int bOptimize, int *pnWrite){
  int rc = LSM_OK;                /* Return Code */
  int nRemaining = nWork;         /* Units of work to do before returning */
  Snapshot *pWorker = pDb->pWorker;

  assert( lsmFsIntegrityCheck(pDb) );
  assert( pWorker );








<
|







3828
3829
3830
3831
3832
3833
3834

3835
3836
3837
3838
3839
3840
3841
3842
    }
    pCsr->flags |= CURSOR_NEXT_OK;
  }

  return rc;
}


static int sortedWork(lsm_db *pDb, int nWork, int bOptimize, int *pnWrite){
  int rc = LSM_OK;                /* Return Code */
  int nRemaining = nWork;         /* Units of work to do before returning */
  Snapshot *pWorker = pDb->pWorker;

  assert( lsmFsIntegrityCheck(pDb) );
  assert( pWorker );

3987
3988
3989
3990
3991
3992
3993

3994
3995
3996
3997
3998
3999
4000

    }
  }

  if( pnWrite ){
    *pnWrite = (nWork - nRemaining);
  }


  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  return rc;
}

typedef struct Metric Metric;
struct Metric {







>







3986
3987
3988
3989
3990
3991
3992
3993
3994
3995
3996
3997
3998
3999
4000

    }
  }

  if( pnWrite ){
    *pnWrite = (nWork - nRemaining);
  }
  pWorker->nWrite += (nWork - nRemaining);

  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  return rc;
}

typedef struct Metric Metric;
struct Metric {
Changes to src/lsm_tree.c.
260
261
262
263
264
265
266
267
268
269


270

271
272
273
274
275
276
277

278
279
280
281
282
283
284
/*
** Convert *-shm file offset iPtr into a pointer into mapped memory.
**
** This is a no-op returning NULL if iPtr is zero or if *pRc already holds
** an error code when the function is called. If the chunk containing the
** offset cannot be obtained, the error code is stored in *pRc and NULL is
** returned.
*/
static void *treeShmptr(lsm_db *pDb, u32 iPtr, int *pRc){
  void *pBase;                    /* Start of the chunk containing iPtr */
  int rcChunk;                    /* Result of fetching the chunk */

  /* TODO: This will likely be way too slow. If it is, chunks should be
  ** cached as part of the db handle.  */
  if( iPtr==0 || *pRc!=0 ) return 0;

  rcChunk = lsmShmChunk(pDb, treeOffsetToChunk(iPtr), &pBase);
  if( rcChunk!=LSM_OK ){
    *pRc = rcChunk;
    return 0;
  }
  return &((u8 *)pBase)[iPtr & (LSM_SHM_CHUNK_SIZE-1)];
}

/*
** Return a pointer to the start of shared-memory chunk iChunk, obtained
** via treeShmptr(). Any error code produced by the lookup is discarded.
*/
static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){
  int rcIgnored = LSM_OK;         /* Error code is deliberately dropped */
  void *pStart = treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, &rcIgnored);
  return (ShmChunk *)pStart;
}








<
<
|
>
>

>
|
<
<
|
<
|
|
>







260
261
262
263
264
265
266


267
268
269
270
271
272


273

274
275
276
277
278
279
280
281
282
283
/*
** Return a pointer to the mapped memory location associated with *-shm 
** file offset iPtr.
**
** The chunk containing the offset is taken from the pDb->apShm[] array
** when it is already present there. Otherwise lsmShmChunk() is called to
** obtain it and the result code is stored in *pRc.
**
** NULL is returned if iPtr is zero, or if *pRc holds an error code either
** on entry or after the chunk lookup. An error code already present in
** *pRc when this function is called is never overwritten.
*/
static void *treeShmptr(lsm_db *pDb, u32 iPtr, int *pRc){
  void *pChunk = 0;
  int iChunk = (iPtr>>15);
  assert( LSM_SHM_CHUNK_SIZE==(1<<15) );

  /* Only call lsmShmChunk() if no error has occurred so far. Otherwise 
  ** its return value would replace the error already stored in *pRc, and
  ** a successful lookup would silently discard that error. */
  if( *pRc==0 
   && (pDb->nShm<=iChunk || 0==(pChunk = pDb->apShm[iChunk])) 
  ){
    *pRc = lsmShmChunk(pDb, iChunk, &pChunk);
  }

  if( iPtr==0 || *pRc ) return 0;
  return &((u8 *)pChunk)[iPtr & (LSM_SHM_CHUNK_SIZE-1)];
}

/*
** Return a pointer to the start of shared-memory chunk iChunk, obtained
** via treeShmptr(). Any error code produced by the lookup is discarded.
*/
static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){
  int rcIgnored = LSM_OK;         /* Error code is deliberately dropped */
  void *pStart = treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, &rcIgnored);
  return (ShmChunk *)pStart;
}