Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Allow deserialized snapshots to persist between transactions. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA1: |
fc4601f91fd20b89d24a1d4a03bafe91 |
User & Date: | dan 2012-09-28 18:35:58.900 |
Context
2012-10-02
| ||
05:19 | Remove a layer of abstraction from the cursor object in lsm_sorted.c. check-in: ff71b6f778 user: dan tags: trunk | |
2012-09-28
| ||
18:35 | Allow deserialized snapshots to persist between transactions. check-in: fc4601f91f user: dan tags: trunk | |
14:57 | Improvements to lsmperf.tcl test. check-in: 371c6c984d user: dan tags: trunk | |
Changes
Changes to src/lsmInt.h.
︙ | ︙ | |||
325 326 327 328 329 330 331 | void **apShm; /* Shared memory chunks */ ShmHeader *pShmhdr; /* Live shared-memory header */ TreeHeader treehdr; /* Local copy of tree-header */ u32 aSnapshot[LSM_META_PAGE_SIZE / sizeof(u32)]; }; struct Segment { | | | < > > | 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 | void **apShm; /* Shared memory chunks */ ShmHeader *pShmhdr; /* Live shared-memory header */ TreeHeader treehdr; /* Local copy of tree-header */ u32 aSnapshot[LSM_META_PAGE_SIZE / sizeof(u32)]; }; struct Segment { Pgno iFirst; /* First page of this run */ Pgno iLast; /* Last page of this run */ Pgno iRoot; /* Root page number (if any) */ int nSize; /* Size of this run in pages */ }; /* ** iSplitTopic/pSplitKey/nSplitKey: ** If nRight>0, this buffer contains a copy of the largest key that has ** already been written to the left-hand-side of the level. */ struct Level { Segment lhs; /* Left-hand (main) segment */ int nRight; /* Size of apRight[] array */ Segment *aRhs; /* Old segments being merged into this */ int iSplitTopic; /* Split key topic (if nRight>0) */ void *pSplitKey; /* Pointer to split-key (if nRight>0) */ int nSplitKey; /* Number of bytes in split-key */ int iAge; /* Number of times data has been written */ Merge *pMerge; /* Merge operation currently underway */ Level *pNext; /* Next level in tree */ }; /* ** A structure describing an ongoing merge. There is an instance of this ** structure for every Level currently undergoing a merge in the worker |
︙ | ︙ | |||
493 494 495 496 497 498 499 500 501 502 503 504 505 506 | int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **); int lsmCheckpointLoadWorker(lsm_db *pDb); int lsmCheckpointStore(lsm_db *pDb, int); int lsmCheckpointLoad(lsm_db *pDb, int *); int lsmCheckpointLoadOk(lsm_db *pDb, int); i64 lsmCheckpointId(u32 *, int); u32 lsmCheckpointNWrite(u32 *, int); i64 lsmCheckpointLogOffset(u32 *); int lsmCheckpointPgsz(u32 *); int lsmCheckpointBlksz(u32 *); void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog); | > | 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 | int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **); int lsmCheckpointLoadWorker(lsm_db *pDb); int lsmCheckpointStore(lsm_db *pDb, int); int lsmCheckpointLoad(lsm_db *pDb, int *); int lsmCheckpointLoadOk(lsm_db *pDb, int); int lsmCheckpointClientCacheOk(lsm_db *); i64 lsmCheckpointId(u32 *, int); u32 lsmCheckpointNWrite(u32 *, int); i64 lsmCheckpointLogOffset(u32 *); int lsmCheckpointPgsz(u32 *); int lsmCheckpointBlksz(u32 *); void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog); |
︙ | ︙ |
Changes to src/lsm_ckpt.c.
︙ | ︙ | |||
84 85 86 87 88 89 90 | ** 2. Checksum value 2. ** ** In the above, a segment record is: ** ** 1. First page of array, ** 2. Last page of array, ** 3. Root page of array (or 0), | | | 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | ** 2. Checksum value 2. ** ** In the above, a segment record is: ** ** 1. First page of array, ** 2. Last page of array, ** 3. Root page of array (or 0), ** 4. Size of array in pages. */ /* ** LARGE NUMBERS OF LEVEL RECORDS: ** ** A limit on the number of rhs segments that may be present in the database ** file. Defining this limit ensures that all level records fit within |
︙ | ︙ | |||
996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 | int lsmCheckpointLoadOk(lsm_db *pDb, int iSnap){ u32 *aShm; assert( iSnap==1 || iSnap==2 ); aShm = (iSnap==1) ? pDb->pShmhdr->aSnap1 : pDb->pShmhdr->aSnap2; return (lsmCheckpointId(pDb->aSnapshot, 0)==lsmCheckpointId(aShm, 0) ); } int lsmCheckpointLoadWorker(lsm_db *pDb){ int rc; ShmHeader *pShm = pDb->pShmhdr; int nInt1; int nInt2; | > > > > > > > > | 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 | int lsmCheckpointLoadOk(lsm_db *pDb, int iSnap){ u32 *aShm; assert( iSnap==1 || iSnap==2 ); aShm = (iSnap==1) ? pDb->pShmhdr->aSnap1 : pDb->pShmhdr->aSnap2; return (lsmCheckpointId(pDb->aSnapshot, 0)==lsmCheckpointId(aShm, 0) ); } int lsmCheckpointClientCacheOk(lsm_db *pDb){ return ( pDb->pClient && pDb->pClient->iId==lsmCheckpointId(pDb->aSnapshot, 0) && pDb->pClient->iId==lsmCheckpointId(pDb->pShmhdr->aSnap1, 0) && pDb->pClient->iId==lsmCheckpointId(pDb->pShmhdr->aSnap2, 0) ); } int lsmCheckpointLoadWorker(lsm_db *pDb){ int rc; ShmHeader *pShm = pDb->pShmhdr; int nInt1; int nInt2; |
︙ | ︙ |
Changes to src/lsm_main.c.
︙ | ︙ | |||
35 36 37 38 39 40 41 | */ static void assert_db_state(lsm_db *pDb){ /* If there is at least one cursor or a write transaction open, the database ** handle must be holding a pointer to a client snapshot. And the reverse ** - if there are no open cursors and no write transactions then there must ** not be a client snapshot. */ | | | 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | */ static void assert_db_state(lsm_db *pDb){ /* If there is at least one cursor or a write transaction open, the database ** handle must be holding a pointer to a client snapshot. And the reverse ** - if there are no open cursors and no write transactions then there must ** not be a client snapshot. */ assert( (pDb->pCsr!=0 || pDb->nTransOpen>0)==(pDb->iReader>=0) ); assert( pDb->nTransOpen>=0 ); } #else # define assert_db_state(x) #endif |
︙ | ︙ | |||
176 177 178 179 180 181 182 183 184 185 186 187 188 189 | int lsm_close(lsm_db *pDb){ int rc = LSM_OK; if( pDb ){ assert_db_state(pDb); if( pDb->pCsr || pDb->nTransOpen ){ rc = LSM_MISUSE_BKPT; }else{ lsmDbDatabaseRelease(pDb); lsmFsClose(pDb->pFS); lsmFree(pDb->pEnv, pDb->aTrans); lsmFree(pDb->pEnv, pDb->apShm); lsmFree(pDb->pEnv, pDb); } } | > > | 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 | int lsm_close(lsm_db *pDb){ int rc = LSM_OK; if( pDb ){ assert_db_state(pDb); if( pDb->pCsr || pDb->nTransOpen ){ rc = LSM_MISUSE_BKPT; }else{ lsmFreeSnapshot(pDb->pEnv, pDb->pClient); pDb->pClient = 0; lsmDbDatabaseRelease(pDb); lsmFsClose(pDb->pFS); lsmFree(pDb->pEnv, pDb->aTrans); lsmFree(pDb->pEnv, pDb->apShm); lsmFree(pDb->pEnv, pDb); } } |
︙ | ︙ |
Changes to src/lsm_shared.c.
︙ | ︙ | |||
667 668 669 670 671 672 673 | */ int lsmBeginReadTrans(lsm_db *pDb){ const int MAX_READLOCK_ATTEMPTS = 10; int rc = LSM_OK; /* Return code */ int iAttempt = 0; assert( pDb->pWorker==0 ); | < | > > > | > > > | 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 | */ int lsmBeginReadTrans(lsm_db *pDb){ const int MAX_READLOCK_ATTEMPTS = 10; int rc = LSM_OK; /* Return code */ int iAttempt = 0; assert( pDb->pWorker==0 ); while( rc==LSM_OK && pDb->iReader<0 && (iAttempt++)<MAX_READLOCK_ATTEMPTS ){ int iTreehdr = 0; int iSnap = 0; assert( pDb->pCsr==0 && pDb->nTransOpen==0 ); /* Load the in-memory tree header. */ rc = lsmTreeLoadHeader(pDb, &iTreehdr); /* Load the database snapshot */ if( rc==LSM_OK ){ if( lsmCheckpointClientCacheOk(pDb)==0 ){ lsmFreeSnapshot(pDb->pEnv, pDb->pClient); pDb->pClient = 0; rc = lsmCheckpointLoad(pDb, &iSnap); }else{ iSnap = 1; } } /* Take a read-lock on the tree and snapshot just loaded. Then check ** that the shared-memory still contains the same values. If so, proceed. ** Otherwise, relinquish the read-lock and retry the whole procedure ** (starting with loading the in-memory tree header). */ if( rc==LSM_OK ){ |
︙ | ︙ | |||
701 702 703 704 705 706 707 | if( lsmTreeLoadHeaderOk(pDb, iTreehdr) && lsmCheckpointLoadOk(pDb, iSnap) ){ /* Read lock has been successfully obtained. Deserialize the ** checkpoint just loaded. TODO: This will be removed after ** lsm_sorted.c is changed to work directly from the serialized ** version of the snapshot. */ | > | > | 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 | if( lsmTreeLoadHeaderOk(pDb, iTreehdr) && lsmCheckpointLoadOk(pDb, iSnap) ){ /* Read lock has been successfully obtained. Deserialize the ** checkpoint just loaded. TODO: This will be removed after ** lsm_sorted.c is changed to work directly from the serialized ** version of the snapshot. */ if( pDb->pClient==0 ){ rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot,&pDb->pClient); } assert( (rc==LSM_OK)==(pDb->pClient!=0) ); assert( pDb->iReader>=0 ); }else{ rc = lsmReleaseReadlock(pDb); } } if( rc==LSM_BUSY ){ |
︙ | ︙ | |||
724 725 726 727 728 729 730 | (int)pDb->treehdr.root.iTransId, (int)pDb->treehdr.iOldShmid ); fflush(stdout); } #endif } | | > > | > > > < | 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 | (int)pDb->treehdr.root.iTransId, (int)pDb->treehdr.iOldShmid ); fflush(stdout); } #endif } if( rc!=LSM_OK ){ lsmReleaseReadlock(pDb); } if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY; return rc; } /* ** Close the currently open read transaction. */ void lsmFinishReadTrans(lsm_db *pDb){ Snapshot *pClient = pDb->pClient; /* Worker connections should not be closing read transactions. And ** read transactions should only be closed after all cursors and write ** transactions have been closed. Finally pClient should be non-NULL ** only iff pDb->iReader>=0. */ assert( pDb->pWorker==0 ); assert( pDb->pCsr==0 && pDb->nTransOpen==0 ); #if 0 if( pClient ){ lsmFreeSnapshot(pDb->pEnv, pDb->pClient); pDb->pClient = 0; } #endif if( pDb->iReader>=0 ) lsmReleaseReadlock(pDb); } /* ** Open a write transaction. */ int lsmBeginWriteTrans(lsm_db *pDb){ int rc; /* Return code */ |
︙ | ︙ | |||
974 975 976 977 978 979 980 | /* ** Release the read-lock currently held by connection db. */ int lsmReleaseReadlock(lsm_db *db){ int rc = LSM_OK; if( db->iReader>=0 ){ | < | 985 986 987 988 989 990 991 992 993 994 995 996 997 998 | /* ** Release the read-lock currently held by connection db. */ int lsmReleaseReadlock(lsm_db *db){ int rc = LSM_OK; if( db->iReader>=0 ){ rc = lsmShmLock(db, LSM_LOCK_READER(db->iReader), LSM_LOCK_UNLOCK, 0); db->iReader = -1; } return rc; } /* |
︙ | ︙ |
Changes to src/lsm_sorted.c.
︙ | ︙ | |||
250 251 252 253 254 255 256 257 258 259 260 261 262 263 | int flags; /* Mask of CURSOR_XXX flags */ int (*xCmp)(void *, int, void *, int); /* Compare function */ int eType; /* Cache of current key type */ Blob key; /* Cache of current key (or NULL) */ Blob val; /* Cache of current value */ TreeCursor *apTreeCsr[2]; /* One or two tree cursors */ int nSegCsr; /* Size of aSegCsr[] array */ LevelCursor *aSegCsr; /* Array of cursors open on sorted files */ int nTree; int *aTree; BtreeCursor *pBtCsr; | > | 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 | int flags; /* Mask of CURSOR_XXX flags */ int (*xCmp)(void *, int, void *, int); /* Compare function */ int eType; /* Cache of current key type */ Blob key; /* Cache of current key (or NULL) */ Blob val; /* Cache of current value */ /* All the component cursors: */ TreeCursor *apTreeCsr[2]; /* One or two tree cursors */ int nSegCsr; /* Size of aSegCsr[] array */ LevelCursor *aSegCsr; /* Array of cursors open on sorted files */ int nTree; int *aTree; BtreeCursor *pBtCsr; |
︙ | ︙ |