Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix log file wrapping so that it works as described in lsm.wiki. This eliminates some BUSY errors that were coming up in multi-thread tests.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: f8ce14403fc5c00d9ab88e8fcec1b06301641d5b
User & Date: dan 2012-09-11 18:48:07.430
Context
2012-09-11
18:57
Fix a problem preventing shared-memory space from being reused. check-in: 6f9c692a0e user: dan tags: trunk
18:48
Fix log file wrapping so that it works as described in lsm.wiki. This eliminates some BUSY errors that were coming up in multi-thread tests. check-in: f8ce14403f user: dan tags: trunk
17:44
Add lsm_env.xSleep() method. Fix shared-memory locks so that they work as described in lsm.wiki. check-in: 19f6896763 user: dan tags: trunk
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsmInt.h.
223
224
225
226
227
228
229

230
231
232
233
234
235
236
  i64 iStart;                     /* Start of region in log file */
  i64 iEnd;                       /* End of region in log file */
};

/*
** Log file state. The log code accesses it as pDb->treehdr.log.
**
** When lsmCheckpointLogoffset() loads this structure from a checkpoint,
** aRegion[2].iStart is set to the log offset recorded in the checkpoint and
** cksum0/cksum1 are set to the log checksum values stored alongside it.
*/
struct DbLog {
  u32 cksum0;                     /* Log checksum 0 at the checkpoint offset */
  u32 cksum1;                     /* Log checksum 1 at the checkpoint offset */

  LogRegion aRegion[3];           /* Log file regions (see docs in lsm_log.c) */
};

/*
** Tree header structure. 
*/
struct TreeHeader {







>







223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
  i64 iStart;                     /* Start of region in log file */
  i64 iEnd;                       /* End of region in log file */
};

/*
** Log file state. The log code accesses it as pDb->treehdr.log.
**
** When lsmCheckpointLogoffset() loads this structure from a checkpoint,
** aRegion[2].iStart is set to the log offset recorded in the checkpoint,
** cksum0/cksum1 to the log checksum values stored alongside it, and
** iSnapshotId to the checkpoint's snapshot id.
*/
struct DbLog {
  u32 cksum0;                     /* Log checksum 0 at the checkpoint offset */
  u32 cksum1;                     /* Log checksum 1 at the checkpoint offset */
  i64 iSnapshotId;                /* Id of the snapshot that log space has
                                  ** been reclaimed up to */
  LogRegion aRegion[3];           /* Log file regions (see docs in lsm_log.c) */
};

/*
** Tree header structure. 
*/
struct TreeHeader {
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
/* Copy the log offset and log checksums from checkpoint aCkpt into *pLog. */
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);

int lsmCheckpointSaveWorker(lsm_db *pDb, int, int);
int lsmDatabaseFull(lsm_db *pDb);
/* Store the snapshot id of the checkpoint synced into the database file
** in *piId. */
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId);


/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);







|







491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
/* Copy the log offset, log checksums and snapshot id from checkpoint aCkpt
** into *pLog. */
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);

int lsmCheckpointSaveWorker(lsm_db *pDb, int, int);
int lsmDatabaseFull(lsm_db *pDb);
/* Store the snapshot id of the checkpoint synced into the database file in
** *piId. If piLog is not NULL and the checkpoint checksum verifies, also
** store that checkpoint's log offset in *piLog. */
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog);


/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
int lsmLogWrite(lsm_db *, void *, int, void *, int);
int lsmLogCommit(lsm_db *);
void lsmLogEnd(lsm_db *pDb, int bCommit);
void lsmLogTell(lsm_db *, LogMark *);
void lsmLogSeek(lsm_db *, LogMark *);

int lsmLogRecover(lsm_db *);
/* Called after a checkpoint is synced: log file space that lies logically
** before the given offset may now be reused. */
void lsmLogCheckpoint(lsm_db *, lsm_i64);
int lsmLogStructure(lsm_db *pDb, char **pzVal);


/**************************************************************************
** Functions from file "lsm_shared.c".
*/








<







718
719
720
721
722
723
724

725
726
727
728
729
730
731
int lsmLogWrite(lsm_db *, void *, int, void *, int);
int lsmLogCommit(lsm_db *);
void lsmLogEnd(lsm_db *pDb, int bCommit);
void lsmLogTell(lsm_db *, LogMark *);
void lsmLogSeek(lsm_db *, LogMark *);

int lsmLogRecover(lsm_db *);
/* Log space reclamation is internal to lsm_log.c (see logReclaimSpace()). */
int lsmLogStructure(lsm_db *pDb, char **pzVal);


/**************************************************************************
** Functions from file "lsm_shared.c".
*/

Changes to src/lsm_ckpt.c.
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
**
** If successful, this function loads the snapshot from the meta-page, 
** verifies its checksum and sets *piId to the snapshot-id before returning
** LSM_OK. Or, if checksum verification fails, *piId is set to zero and
** LSM_OK returned. If an error occurs, an LSM error code is returned and
** the final value of *piId is undefined.
*/
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId){
  int rc = LSM_OK;
  MetaPage *pPg;
  u32 iMeta;

  iMeta = pDb->pShmhdr->iMetaPage;
  rc = lsmFsMetaPageGet(pDb->pFS, 0, iMeta, &pPg);
  if( rc==LSM_OK ){







|







1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
**
** If successful, this function loads the snapshot from the meta-page, 
** verifies its checksum and sets *piId to the snapshot-id before returning
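** LSM_OK. Or, if checksum verification fails, *piId is set to zero and
** LSM_OK returned. If an error occurs, an LSM error code is returned and
** the final value of *piId is undefined. If piLog is not NULL, then
** whenever the checksum verifies, *piLog is also set to the log offset
** stored in the snapshot.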
*/
int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog){
  int rc = LSM_OK;
  MetaPage *pPg;
  u32 iMeta;

  iMeta = pDb->pShmhdr->iMetaPage;
  rc = lsmFsMetaPageGet(pDb->pFS, 0, iMeta, &pPg);
  if( rc==LSM_OK ){
1147
1148
1149
1150
1151
1152
1153

1154
1155
1156
1157
1158
1159
1160
    if( nCkpt<(LSM_META_PAGE_SIZE/sizeof(u32)) ){
      u32 *aCopy = lsmMallocRc(pDb->pEnv, sizeof(u32) * nCkpt, &rc);
      if( aCopy ){
        memcpy(aCopy, aData, nCkpt*sizeof(u32));
        ckptChangeEndianness(aCopy, nCkpt);
        if( ckptChecksumOk(aCopy) ){
          *piId = lsmCheckpointId(aCopy, 0);

        }
        lsmFree(pDb->pEnv, aCopy);
      }
    }
    lsmFsMetaPageRelease(pPg);
  }








>







1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
    if( nCkpt<(LSM_META_PAGE_SIZE/sizeof(u32)) ){
      u32 *aCopy = lsmMallocRc(pDb->pEnv, sizeof(u32) * nCkpt, &rc);
      if( aCopy ){
        memcpy(aCopy, aData, nCkpt*sizeof(u32));
        ckptChangeEndianness(aCopy, nCkpt);
        if( ckptChecksumOk(aCopy) ){
          *piId = lsmCheckpointId(aCopy, 0);
          if( piLog ) *piLog = lsmCheckpointLogOffset(aCopy);
        }
        lsmFree(pDb->pEnv, aCopy);
      }
    }
    lsmFsMetaPageRelease(pPg);
  }

1195
1196
1197
1198
1199
1200
1201

1202
1203
1204
1205
1206
1207
1208
  DbLog *pLog
){ 
  u32 iOffMSB = aCkpt[CKPT_HDR_LO_MSW];
  u32 iOffLSB = aCkpt[CKPT_HDR_LO_LSW];
  pLog->aRegion[2].iStart = (((i64)iOffMSB) << 32) + ((i64)iOffLSB);
  pLog->cksum0 = aCkpt[CKPT_HDR_LO_CKSUM1];
  pLog->cksum1 = aCkpt[CKPT_HDR_LO_CKSUM2];

}

void lsmCheckpointZeroLogoffset(lsm_db *pDb){
  u32 nCkpt;

  nCkpt = pDb->aSnapshot[CKPT_HDR_NCKPT];
  assert( nCkpt>CKPT_HDR_NCKPT );







>







1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
  DbLog *pLog
){ 
  u32 iOffMSB = aCkpt[CKPT_HDR_LO_MSW];
  u32 iOffLSB = aCkpt[CKPT_HDR_LO_LSW];
  pLog->aRegion[2].iStart = (((i64)iOffMSB) << 32) + ((i64)iOffLSB);
  pLog->cksum0 = aCkpt[CKPT_HDR_LO_CKSUM1];
  pLog->cksum1 = aCkpt[CKPT_HDR_LO_CKSUM2];
  pLog->iSnapshotId = lsmCheckpointId(aCkpt, 0);
}

void lsmCheckpointZeroLogoffset(lsm_db *pDb){
  u32 nCkpt;

  nCkpt = pDb->aSnapshot[CKPT_HDR_NCKPT];
  assert( nCkpt>CKPT_HDR_NCKPT );
Changes to src/lsm_log.c.
290
291
292
293
294
295
296




























297
298
299
300
301
302
303

/*
** Return the offset of the first byte of the pLog->szSector sized sector
** that contains offset iOff (i.e. iOff rounded down to a sector boundary).
*/
static i64 firstByteOnSector(LogWriter *pLog, i64 iOff){
  /* (a/b)*b == a - a%b for C integer division, so this is the same value. */
  return iOff - (iOff % pLog->szSector);
}

/*
** Return the offset of the last byte of the sector that contains iOff.
*/
static i64 lastByteOnSector(LogWriter *pLog, i64 iOff){
  i64 iFirst = firstByteOnSector(pLog, iOff);
  return iFirst + pLog->szSector - 1;
}





























/*
** This function is called when a write-transaction is first opened. It
** is assumed that the caller is holding the client-mutex when it is 
** called.
**
** Before returning, this function allocates the LogWriter object that







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331

/*
** Return the offset of the first byte of the pLog->szSector sized sector
** that contains offset iOff (i.e. iOff rounded down to a sector boundary).
*/
static i64 firstByteOnSector(LogWriter *pLog, i64 iOff){
  /* (a/b)*b == a - a%b for C integer division, so this is the same value. */
  return iOff - (iOff % pLog->szSector);
}

/*
** Return the offset of the last byte of the sector that contains iOff.
*/
static i64 lastByteOnSector(LogWriter *pLog, i64 iOff){
  i64 iFirst = firstByteOnSector(pLog, iOff);
  return iFirst + pLog->szSector - 1;
}

/*
** If possible, reclaim log file space. Log file space is reclaimed after
** a snapshot that points to the same data in the database file is synced
** into the db header.
**
** lsmCheckpointSynced() reports the snapshot id and log offset of the
** checkpoint currently synced into the database file. If that snapshot is
** newer than pDb->treehdr.log.iSnapshotId, the log offset marks the new
** logical start of the log:
**
**   * every region that precedes the region containing the offset is
**     cleared (iStart = iEnd = 0), and
**   * the containing region is truncated so that it begins at the offset.
**
** The containing region is located before anything is modified. If the
** offset lies in none of the three regions, the log structure is left
** unchanged. An assert() still fires in debug builds, but with NDEBUG
** the function can no longer wipe every region or write past the end of
** aRegion[].
**
** Returns LSM_OK, or the error code returned by lsmCheckpointSynced().
*/
static int logReclaimSpace(lsm_db *pDb){
  int rc = LSM_OK;
  if( pDb->pShmhdr->iMetaPage ){
    DbLog *pLog = &pDb->treehdr.log;
    i64 iSnapshotId = 0;
    i64 iOff = 0;
    rc = lsmCheckpointSynced(pDb, &iSnapshotId, &iOff);
    if( rc==LSM_OK && pLog->iSnapshotId<iSnapshotId ){
      int iRegion;

      /* Find the region that contains the new log start offset. */
      for(iRegion=0; iRegion<3; iRegion++){
        LogRegion *p = &pLog->aRegion[iRegion];
        if( iOff>=p->iStart && iOff<=p->iEnd ) break;
      }
      assert( iRegion<3 );

      if( iRegion<3 ){
        int i;
        /* Regions logically before the containing one are now free. */
        for(i=0; i<iRegion; i++){
          pLog->aRegion[i].iStart = 0;
          pLog->aRegion[i].iEnd = 0;
        }
        pLog->aRegion[iRegion].iStart = iOff;
        pLog->iSnapshotId = iSnapshotId;
      }
    }
  }
  return rc;
}

/*
** This function is called when a write-transaction is first opened. It
** is assumed that the caller is holding the client-mutex when it is 
** called.
**
** Before returning, this function allocates the LogWriter object that
312
313
314
315
316
317
318














319
320
321
322
323
324
325
  if( pDb->bUseLog==0 ) return LSM_OK;
  rc = lsmFsOpenLog(pDb->pFS);
  pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc);
  if( pNew ){
    lsmStringInit(&pNew->buf, pDb->pEnv);
    rc = lsmStringExtend(&pNew->buf, 2);
  }














  if( rc!=LSM_OK ){
    assert( pNew==0 || pNew->buf.z==0 );
    lsmFree(pDb->pEnv, pNew);
    return rc;
  }

  /* Set the effective sector-size for this transaction. Sectors are assumed







>
>
>
>
>
>
>
>
>
>
>
>
>
>







340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
  if( pDb->bUseLog==0 ) return LSM_OK;
  rc = lsmFsOpenLog(pDb->pFS);
  pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc);
  if( pNew ){
    lsmStringInit(&pNew->buf, pDb->pEnv);
    rc = lsmStringExtend(&pNew->buf, 2);
  }
  if( rc==LSM_OK ){
    /* The following call detects whether or not a new snapshot has been 
    ** synced into the database file. If so, it updates the contents of
    ** the pDb->treehdr.log structure to reclaim any space in the log
    ** file that is no longer required. 
    **
    ** TODO: Calling this every transaction is overkill. And since the 
    ** call has to read and checksum a snapshot from the database file,
    ** it is expensive. It would be better to figure out a way so that
    ** this is only called occasionally - say for every 32KB written to 
    ** the log file.
    */
    rc = logReclaimSpace(pDb);
  }
  if( rc!=LSM_OK ){
    assert( pNew==0 || pNew->buf.z==0 );
    lsmFree(pDb->pEnv, pNew);
    return rc;
  }

  /* Set the effective sector-size for this transaction. Sectors are assumed
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
    }
  }
  lsmStringClear(&p->buf);
  lsmFree(pDb->pEnv, p);
  pDb->pLogWriter = 0;
}

/*
** This function is called after a checkpoint is synced into the database
** file. The checkpoint specifies that the log starts at offset iOff.
** The shared state in *pLog is updated to reflect the fact that space
** in the log file that occurs logically before offset iOff may now
** be reused.
**
** The region containing iOff is located before anything is modified:
**
**   * every region before it is cleared (iStart = iEnd = 0), and
**   * the containing region is truncated to begin at iOff.
**
** If iOff lies in none of the three regions, *pLog is left unchanged. An
** assert() still fires in debug builds, but with NDEBUG the function can
** no longer write past the end of aRegion[].
*/ 
void lsmLogCheckpoint(lsm_db *pDb, lsm_i64 iOff){
  DbLog *pLog = &pDb->treehdr.log;
  int iRegion;

  /* Find the region that contains the new log start offset. */
  for(iRegion=0; iRegion<3; iRegion++){
    LogRegion *p = &pLog->aRegion[iRegion];
    if( iOff>=p->iStart && iOff<=p->iEnd ) break;
  }
  assert( iRegion<3 );

  if( iRegion<3 ){
    int i;
    /* Regions logically before the containing one are now free. */
    for(i=0; i<iRegion; i++){
      pLog->aRegion[i].iStart = 0;
      pLog->aRegion[i].iEnd = 0;
    }
    pLog->aRegion[iRegion].iStart = iOff;
  }
}

static int jumpIfRequired(
  lsm_db *pDb,
  LogWriter *pLog,
  int nReq,
  int *pbJump
){
  /* Determine if it is necessary to add an LSM_LOG_JUMP to jump over the







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







467
468
469
470
471
472
473






















474
475
476
477
478
479
480
    }
  }
  lsmStringClear(&p->buf);
  lsmFree(pDb->pEnv, p);
  pDb->pLogWriter = 0;
}























static int jumpIfRequired(
  lsm_db *pDb,
  LogWriter *pLog,
  int nReq,
  int *pbJump
){
  /* Determine if it is necessary to add an LSM_LOG_JUMP to jump over the
Changes to src/lsm_shared.c.
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463

    /* The "is in use" bit */
    rc = lsmLsmInUse(pDb, iFree, &bInUse);

    /* The "has been checkpointed" bit */
    if( rc==LSM_OK && bInUse==0 ){
      i64 iId = 0;
      rc = lsmCheckpointSynced(pDb, &iId);
      if( rc!=LSM_OK || iId<iFree ) bInUse = 1;
    }

    if( rc==LSM_OK && bInUse==0 ){
      iRet = pFree->aEntry[0].iBlk;
      flRemoveEntry0(pFree);
      assert( iRet!=0 );







|







449
450
451
452
453
454
455
456
457
458
459
460
461
462
463

    /* The "is in use" bit */
    rc = lsmLsmInUse(pDb, iFree, &bInUse);

    /* The "has been checkpointed" bit */
    if( rc==LSM_OK && bInUse==0 ){
      i64 iId = 0;
      rc = lsmCheckpointSynced(pDb, &iId, 0);
      if( rc!=LSM_OK || iId<iFree ) bInUse = 1;
    }

    if( rc==LSM_OK && bInUse==0 ){
      iRet = pFree->aEntry[0].iBlk;
      flRemoveEntry0(pFree);
      assert( iRet!=0 );
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
      rc = lsmFsSyncDb(pDb->pFS);
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK ) rc = lsmFsSyncDb(pDb->pFS);
      if( rc==LSM_OK ) pShm->iMetaPage = iMeta;
    }
  }

  /* If no error has occurred, then the snapshot currently in pDb->aSnapshot
  ** has been synced to disk. This means it may be possible to wrap the
  ** log file. Obtain the WRITER lock and update the relevant tree-header
  ** fields to reflect this. 
  */
  if( rc==LSM_OK ){
    u64 iLogoff = lsmCheckpointLogOffset(pDb->aSnapshot);
    if( pDb->nTransOpen==0 ){
      rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
    }
    if( rc==LSM_OK ){
      rc = lsmTreeLoadHeader(pDb);
      if( rc==LSM_OK ) lsmLogCheckpoint(pDb, iLogoff);
      if( rc==LSM_OK ) lsmTreeEndTransaction(pDb, 1);
      if( rc==LSM_BUSY ) rc = LSM_OK;
      if( pDb->nTransOpen==0 ){
        rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
      }
    }
    if( rc==LSM_BUSY ) rc = LSM_OK;
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  return rc;
}

int lsmBeginWork(lsm_db *pDb){
  int rc;








<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







560
561
562
563
564
565
566






















567
568
569
570
571
572
573
      rc = lsmFsSyncDb(pDb->pFS);
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK ) rc = lsmFsSyncDb(pDb->pFS);
      if( rc==LSM_OK ) pShm->iMetaPage = iMeta;
    }
  }























  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  return rc;
}

int lsmBeginWork(lsm_db *pDb){
  int rc;