Many hyperlinks are disabled. Use anonymous login to enable hyperlinks.
Overview
Comment: | Fix log file wrapping so that it works as described in lsm.wiki. This eliminates some BUSY errors that were coming up in multi-thread tests. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA1: | f8ce14403fc5c00d9ab88e8fcec1b063 |
User & Date: | dan 2012-09-11 18:48:07.430 |
Context
2012-09-11
| ||
18:57 | Fix a problem preventing shared-memory space from being reused. check-in: 6f9c692a0e user: dan tags: trunk | |
18:48 | Fix log file wrapping so that it works as described in lsm.wiki. This eliminates some BUSY errors that were coming up in multi-thread tests. check-in: f8ce14403f user: dan tags: trunk | |
17:44 | Add lsm_env.xSleep() method. Fix shared-memory locks so that they work as described in lsm.wiki. check-in: 19f6896763 user: dan tags: trunk | |
Changes
Changes to src/lsmInt.h.
︙ | ︙ | |||
223 224 225 226 227 228 229 230 231 232 233 234 235 236 | i64 iStart; /* Start of region in log file */ i64 iEnd; /* End of region in log file */ }; struct DbLog { u32 cksum0; /* Checksum 0 at offset iOff */ u32 cksum1; /* Checksum 1 at offset iOff */ LogRegion aRegion[3]; /* Log file regions (see docs in lsm_log.c) */ }; /* ** Tree header structure. */ struct TreeHeader { | > | 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 | i64 iStart; /* Start of region in log file */ i64 iEnd; /* End of region in log file */ }; struct DbLog { u32 cksum0; /* Checksum 0 at offset iOff */ u32 cksum1; /* Checksum 1 at offset iOff */ i64 iSnapshotId; /* Log space has been reclaimed to this ss */ LogRegion aRegion[3]; /* Log file regions (see docs in lsm_log.c) */ }; /* ** Tree header structure. */ struct TreeHeader { |
︙ | ︙ | |||
490 491 492 493 494 495 496 | int lsmCheckpointPgsz(u32 *); int lsmCheckpointBlksz(u32 *); void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog); void lsmCheckpointZeroLogoffset(lsm_db *); int lsmCheckpointSaveWorker(lsm_db *pDb, int, int); int lsmDatabaseFull(lsm_db *pDb); | | | 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 | int lsmCheckpointPgsz(u32 *); int lsmCheckpointBlksz(u32 *); void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog); void lsmCheckpointZeroLogoffset(lsm_db *); int lsmCheckpointSaveWorker(lsm_db *pDb, int, int); int lsmDatabaseFull(lsm_db *pDb); int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog); /* ** Functions from file "lsm_tree.c". */ int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree); void lsmTreeRelease(lsm_env *, Tree *); |
︙ | ︙ | |||
717 718 719 720 721 722 723 | int lsmLogWrite(lsm_db *, void *, int, void *, int); int lsmLogCommit(lsm_db *); void lsmLogEnd(lsm_db *pDb, int bCommit); void lsmLogTell(lsm_db *, LogMark *); void lsmLogSeek(lsm_db *, LogMark *); int lsmLogRecover(lsm_db *); | < | 718 719 720 721 722 723 724 725 726 727 728 729 730 731 | int lsmLogWrite(lsm_db *, void *, int, void *, int); int lsmLogCommit(lsm_db *); void lsmLogEnd(lsm_db *pDb, int bCommit); void lsmLogTell(lsm_db *, LogMark *); void lsmLogSeek(lsm_db *, LogMark *); int lsmLogRecover(lsm_db *); int lsmLogStructure(lsm_db *pDb, char **pzVal); /************************************************************************** ** Functions from file "lsm_shared.c". */ |
︙ | ︙ |
Changes to src/lsm_ckpt.c.
︙ | ︙ | |||
1125 1126 1127 1128 1129 1130 1131 | ** ** If successful, this function loads the snapshot from the meta-page, ** verifies its checksum and sets *piId to the snapshot-id before returning ** LSM_OK. Or, if the checksum attempt fails, *piId is set to zero and ** LSM_OK returned. If an error occurs, an LSM error code is returned and ** the final value of *piId is undefined. */ | | | 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 | ** ** If successful, this function loads the snapshot from the meta-page, ** verifies its checksum and sets *piId to the snapshot-id before returning ** LSM_OK. Or, if the checksum attempt fails, *piId is set to zero and ** LSM_OK returned. If an error occurs, an LSM error code is returned and ** the final value of *piId is undefined. */ int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog){ int rc = LSM_OK; MetaPage *pPg; u32 iMeta; iMeta = pDb->pShmhdr->iMetaPage; rc = lsmFsMetaPageGet(pDb->pFS, 0, iMeta, &pPg); if( rc==LSM_OK ){ |
︙ | ︙ | |||
1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 | if( nCkpt<(LSM_META_PAGE_SIZE/sizeof(u32)) ){ u32 *aCopy = lsmMallocRc(pDb->pEnv, sizeof(u32) * nCkpt, &rc); if( aCopy ){ memcpy(aCopy, aData, nCkpt*sizeof(u32)); ckptChangeEndianness(aCopy, nCkpt); if( ckptChecksumOk(aCopy) ){ *piId = lsmCheckpointId(aCopy, 0); } lsmFree(pDb->pEnv, aCopy); } } lsmFsMetaPageRelease(pPg); } | > | 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 | if( nCkpt<(LSM_META_PAGE_SIZE/sizeof(u32)) ){ u32 *aCopy = lsmMallocRc(pDb->pEnv, sizeof(u32) * nCkpt, &rc); if( aCopy ){ memcpy(aCopy, aData, nCkpt*sizeof(u32)); ckptChangeEndianness(aCopy, nCkpt); if( ckptChecksumOk(aCopy) ){ *piId = lsmCheckpointId(aCopy, 0); if( piLog ) *piLog = lsmCheckpointLogOffset(aCopy); } lsmFree(pDb->pEnv, aCopy); } } lsmFsMetaPageRelease(pPg); } |
︙ | ︙ | |||
1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 | DbLog *pLog ){ u32 iOffMSB = aCkpt[CKPT_HDR_LO_MSW]; u32 iOffLSB = aCkpt[CKPT_HDR_LO_LSW]; pLog->aRegion[2].iStart = (((i64)iOffMSB) << 32) + ((i64)iOffLSB); pLog->cksum0 = aCkpt[CKPT_HDR_LO_CKSUM1]; pLog->cksum1 = aCkpt[CKPT_HDR_LO_CKSUM2]; } void lsmCheckpointZeroLogoffset(lsm_db *pDb){ u32 nCkpt; nCkpt = pDb->aSnapshot[CKPT_HDR_NCKPT]; assert( nCkpt>CKPT_HDR_NCKPT ); | > | 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 | DbLog *pLog ){ u32 iOffMSB = aCkpt[CKPT_HDR_LO_MSW]; u32 iOffLSB = aCkpt[CKPT_HDR_LO_LSW]; pLog->aRegion[2].iStart = (((i64)iOffMSB) << 32) + ((i64)iOffLSB); pLog->cksum0 = aCkpt[CKPT_HDR_LO_CKSUM1]; pLog->cksum1 = aCkpt[CKPT_HDR_LO_CKSUM2]; pLog->iSnapshotId = lsmCheckpointId(aCkpt, 0); } void lsmCheckpointZeroLogoffset(lsm_db *pDb){ u32 nCkpt; nCkpt = pDb->aSnapshot[CKPT_HDR_NCKPT]; assert( nCkpt>CKPT_HDR_NCKPT ); |
︙ | ︙ |
Changes to src/lsm_log.c.
︙ | ︙ | |||
290 291 292 293 294 295 296 297 298 299 300 301 302 303 | static i64 firstByteOnSector(LogWriter *pLog, i64 iOff){ return (iOff / pLog->szSector) * pLog->szSector; } static i64 lastByteOnSector(LogWriter *pLog, i64 iOff){ return firstByteOnSector(pLog, iOff) + pLog->szSector - 1; } /* ** This function is called when a write-transaction is first opened. It ** is assumed that the caller is holding the client-mutex when it is ** called. ** ** Before returning, this function allocates the LogWriter object that | > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 | static i64 firstByteOnSector(LogWriter *pLog, i64 iOff){ return (iOff / pLog->szSector) * pLog->szSector; } static i64 lastByteOnSector(LogWriter *pLog, i64 iOff){ return firstByteOnSector(pLog, iOff) + pLog->szSector - 1; } /* ** If possible, reclaim log file space. Log file space is reclaimed after ** a snapshot that points to the same data in the database file is synced ** into the db header. */ static int logReclaimSpace(lsm_db *pDb){ int rc = LSM_OK; if( pDb->pShmhdr->iMetaPage ){ DbLog *pLog = &pDb->treehdr.log; i64 iSnapshotId = 0; i64 iOff = 0; rc = lsmCheckpointSynced(pDb, &iSnapshotId, &iOff); if( rc==LSM_OK && pLog->iSnapshotId<iSnapshotId ){ int iRegion; for(iRegion=0; iRegion<3; iRegion++){ LogRegion *p = &pLog->aRegion[iRegion]; if( iOff>=p->iStart && iOff<=p->iEnd ) break; p->iStart = 0; p->iEnd = 0; } assert( iRegion<3 ); pLog->aRegion[iRegion].iStart = iOff; pLog->iSnapshotId = iSnapshotId; } } return rc; } /* ** This function is called when a write-transaction is first opened. It ** is assumed that the caller is holding the client-mutex when it is ** called. ** ** Before returning, this function allocates the LogWriter object that |
︙ | ︙ | |||
312 313 314 315 316 317 318 319 320 321 322 323 324 325 | if( pDb->bUseLog==0 ) return LSM_OK; rc = lsmFsOpenLog(pDb->pFS); pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc); if( pNew ){ lsmStringInit(&pNew->buf, pDb->pEnv); rc = lsmStringExtend(&pNew->buf, 2); } if( rc!=LSM_OK ){ assert( pNew==0 || pNew->buf.z==0 ); lsmFree(pDb->pEnv, pNew); return rc; } /* Set the effective sector-size for this transaction. Sectors are assumed | > > > > > > > > > > > > > > | 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 | if( pDb->bUseLog==0 ) return LSM_OK; rc = lsmFsOpenLog(pDb->pFS); pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc); if( pNew ){ lsmStringInit(&pNew->buf, pDb->pEnv); rc = lsmStringExtend(&pNew->buf, 2); } if( rc==LSM_OK ){ /* The following call detects whether or not a new snapshot has been ** synced into the database file. If so, it updates the contents of ** the pDb->treehdr.log structure to reclaim any space in the log ** file that is no longer required. ** ** TODO: Calling this every transaction is overkill. And since the ** call has to read and checksum a snapshot from the database file, ** it is expensive. It would be better to figure out a way so that ** this is only called occasionally - say for every 32KB written to ** the log file. */ rc = logReclaimSpace(pDb); } if( rc!=LSM_OK ){ assert( pNew==0 || pNew->buf.z==0 ); lsmFree(pDb->pEnv, pNew); return rc; } /* Set the effective sector-size for this transaction. Sectors are assumed |
︙ | ︙ | |||
425 426 427 428 429 430 431 | } } lsmStringClear(&p->buf); lsmFree(pDb->pEnv, p); pDb->pLogWriter = 0; } | < < < < < < < < < < < < < < < < < < < < < < | 467 468 469 470 471 472 473 474 475 476 477 478 479 480 | } } lsmStringClear(&p->buf); lsmFree(pDb->pEnv, p); pDb->pLogWriter = 0; } static int jumpIfRequired( lsm_db *pDb, LogWriter *pLog, int nReq, int *pbJump ){ /* Determine if it is necessary to add an LSM_LOG_JUMP to jump over the |
︙ | ︙ |
Changes to src/lsm_shared.c.
︙ | ︙ | |||
449 450 451 452 453 454 455 | /* The "is in use" bit */ rc = lsmLsmInUse(pDb, iFree, &bInUse); /* The "has been checkpointed" bit */ if( rc==LSM_OK && bInUse==0 ){ i64 iId = 0; | | | 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 | /* The "is in use" bit */ rc = lsmLsmInUse(pDb, iFree, &bInUse); /* The "has been checkpointed" bit */ if( rc==LSM_OK && bInUse==0 ){ i64 iId = 0; rc = lsmCheckpointSynced(pDb, &iId, 0); if( rc!=LSM_OK || iId<iFree ) bInUse = 1; } if( rc==LSM_OK && bInUse==0 ){ iRet = pFree->aEntry[0].iBlk; flRemoveEntry0(pFree); assert( iRet!=0 ); |
︙ | ︙ | |||
560 561 562 563 564 565 566 | rc = lsmFsSyncDb(pDb->pFS); if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta); if( rc==LSM_OK ) rc = lsmFsSyncDb(pDb->pFS); if( rc==LSM_OK ) pShm->iMetaPage = iMeta; } } | < < < < < < < < < < < < < < < < < < < < < < | 560 561 562 563 564 565 566 567 568 569 570 571 572 573 | rc = lsmFsSyncDb(pDb->pFS); if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta); if( rc==LSM_OK ) rc = lsmFsSyncDb(pDb->pFS); if( rc==LSM_OK ) pShm->iMetaPage = iMeta; } } lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0); return rc; } int lsmBeginWork(lsm_db *pDb){ int rc; |
︙ | ︙ |