Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Change the way worker clients keep snapshots consistent so as to match the description in lsm.wiki. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA1: |
0427b07c145adb655fc0450d8835984d |
User & Date: | dan 2012-09-12 16:19:03.155 |
Context
2012-09-12
| ||
17:23 | Fix a problem in the lsm_info(LOG_STRUCTURE) command causing errors in tcl tests. check-in: 6e5e429ea7 user: dan tags: trunk | |
16:19 | Change the way worker clients keep snapshots consistent so as to match the description in lsm.wiki. check-in: 0427b07c14 user: dan tags: trunk | |
14:47 | Fix an assert() failure that can follow an OOM error. check-in: d65a5112b6 user: dan tags: trunk | |
Changes
Changes to src/lsm.h.
︙ | ︙ | |||
103 104 105 106 107 108 109 110 111 112 113 114 115 116 | #define LSM_ERROR 1 #define LSM_BUSY 5 #define LSM_NOMEM 7 #define LSM_IOERR 10 #define LSM_CORRUPT 11 #define LSM_FULL 13 #define LSM_CANTOPEN 14 #define LSM_MISUSE 21 /* ** Open and close a connection to a named database. */ int lsm_new(lsm_env*, lsm_db **ppDb); int lsm_open(lsm_db *pDb, const char *zFilename); | > | 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 | #define LSM_ERROR 1 #define LSM_BUSY 5 #define LSM_NOMEM 7 #define LSM_IOERR 10 #define LSM_CORRUPT 11 #define LSM_FULL 13 #define LSM_CANTOPEN 14 #define LSM_PROTOCOL 15 #define LSM_MISUSE 21 /* ** Open and close a connection to a named database. */ int lsm_new(lsm_env*, lsm_db **ppDb); int lsm_open(lsm_db *pDb, const char *zFilename); |
︙ | ︙ |
Changes to src/lsmInt.h.
︙ | ︙ | |||
146 147 148 149 150 151 152 153 154 155 156 157 158 159 | /* ** Hard limit on the number of free-list entries that may be stored in ** a checkpoint (the remainder are stored as a system record in the LSM). ** See also LSM_CONFIG_MAX_FREELIST. */ #define LSM_MAX_FREELIST_ENTRIES 100 /* ** A string that can grow by appending. */ struct LsmString { lsm_env *pEnv; /* Run-time environment */ int n; /* Size of string. -1 indicates error */ int nAlloc; /* Space allocated for z[] */ | > > | 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 | /* ** Hard limit on the number of free-list entries that may be stored in ** a checkpoint (the remainder are stored as a system record in the LSM). ** See also LSM_CONFIG_MAX_FREELIST. */ #define LSM_MAX_FREELIST_ENTRIES 100 #define LSM_ATTEMPTS_BEFORE_PROTOCOL 10000 /* ** A string that can grow by appending. */ struct LsmString { lsm_env *pEnv; /* Run-time environment */ int n; /* Size of string. -1 indicates error */ int nAlloc; /* Space allocated for z[] */ |
︙ | ︙ | |||
404 405 406 407 408 409 410 | ** contains the most recently written checkpoint (either 1 or 2). ** ** hdr1, hdr2: ** The two copies of the in-memory tree header. Two copies are required ** in case a writer fails while updating one of them. */ struct ShmHeader { | | | | 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 | ** contains the most recently written checkpoint (either 1 or 2). ** ** hdr1, hdr2: ** The two copies of the in-memory tree header. Two copies are required ** in case a writer fails while updating one of them. */ struct ShmHeader { u32 aSnap1[LSM_META_PAGE_SIZE / 4]; u32 aSnap2[LSM_META_PAGE_SIZE / 4]; u32 bWriter; u32 iMetaPage; TreeHeader hdr1; TreeHeader hdr2; ShmReader aReader[LSM_LOCK_NREADER]; }; |
︙ | ︙ | |||
478 479 480 481 482 483 484 | int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *); int lsmCheckpointOverflowRequired(lsm_db *pDb); int lsmCheckpointOverflowLoad(lsm_db *pDb, Freelist *); int lsmCheckpointRecover(lsm_db *); int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **); | < > > > | 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 | int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *); int lsmCheckpointOverflowRequired(lsm_db *pDb); int lsmCheckpointOverflowLoad(lsm_db *pDb, Freelist *); int lsmCheckpointRecover(lsm_db *); int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **); int lsmCheckpointLoadWorker(lsm_db *pDb); int lsmCheckpointStore(lsm_db *pDb, int); int lsmCheckpointLoad(lsm_db *pDb, int *); int lsmCheckpointLoadOk(lsm_db *pDb, int); i64 lsmCheckpointId(u32 *, int); i64 lsmCheckpointLogOffset(u32 *); int lsmCheckpointPgsz(u32 *); int lsmCheckpointBlksz(u32 *); void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog); void lsmCheckpointZeroLogoffset(lsm_db *); |
︙ | ︙ | |||
506 507 508 509 510 511 512 | void lsmTreeClear(lsm_db *); int lsmTreeInit(lsm_db *); int lsmTreeRepair(lsm_db *); int lsmTreeSize(lsm_db *); int lsmTreeEndTransaction(lsm_db *pDb, int bCommit); int lsmTreeBeginTransaction(lsm_db *pDb); | | > | 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 | void lsmTreeClear(lsm_db *); int lsmTreeInit(lsm_db *); int lsmTreeRepair(lsm_db *); int lsmTreeSize(lsm_db *); int lsmTreeEndTransaction(lsm_db *pDb, int bCommit); int lsmTreeBeginTransaction(lsm_db *pDb); int lsmTreeLoadHeader(lsm_db *pDb, int *); int lsmTreeLoadHeaderOk(lsm_db *, int); int lsmTreeInsert(lsm_db *pDb, void *pKey, int nKey, void *pVal, int nVal); void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark); void lsmTreeMark(lsm_db *pDb, TreeMark *pMark); int lsmTreeCursorNew(lsm_db *pDb, TreeCursor **); void lsmTreeCursorDestroy(TreeCursor *); |
︙ | ︙ | |||
642 643 644 645 646 647 648 649 650 651 652 653 654 655 | int lsmEnvOpen(lsm_env *, const char *, lsm_file **); int lsmEnvClose(lsm_env *pEnv, lsm_file *pFile); int lsmEnvLock(lsm_env *pEnv, lsm_file *pFile, int iLock, int eLock); int lsmEnvShmMap(lsm_env *, lsm_file *, int, int, void **); void lsmEnvShmBarrier(lsm_env *); void lsmEnvShmUnmap(lsm_env *, lsm_file *, int); /* ** End of functions from "lsm_file.c". **************************************************************************/ /* ** Functions from file "lsm_sorted.c". | > > | 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 | int lsmEnvOpen(lsm_env *, const char *, lsm_file **); int lsmEnvClose(lsm_env *pEnv, lsm_file *pFile); int lsmEnvLock(lsm_env *pEnv, lsm_file *pFile, int iLock, int eLock); int lsmEnvShmMap(lsm_env *, lsm_file *, int, int, void **); void lsmEnvShmBarrier(lsm_env *); void lsmEnvShmUnmap(lsm_env *, lsm_file *, int); void lsmEnvSleep(lsm_env *, int); /* ** End of functions from "lsm_file.c". **************************************************************************/ /* ** Functions from file "lsm_sorted.c". |
︙ | ︙ |
Changes to src/lsm_ckpt.c.
︙ | ︙ | |||
331 332 333 334 335 336 337 | i64 iOff = pLog->aRegion[2].iEnd; ckptSetValue(p, iOut++, (iOff >> 32) & 0xFFFFFFFF, pRc); ckptSetValue(p, iOut++, (iOff & 0xFFFFFFFF), pRc); ckptSetValue(p, iOut++, pLog->cksum0, pRc); ckptSetValue(p, iOut++, pLog->cksum1, pRc); }else{ for(; iOut<=CKPT_HDR_LO_CKSUM2; iOut++){ | | | 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 | i64 iOff = pLog->aRegion[2].iEnd; ckptSetValue(p, iOut++, (iOff >> 32) & 0xFFFFFFFF, pRc); ckptSetValue(p, iOut++, (iOff & 0xFFFFFFFF), pRc); ckptSetValue(p, iOut++, pLog->cksum0, pRc); ckptSetValue(p, iOut++, pLog->cksum1, pRc); }else{ for(; iOut<=CKPT_HDR_LO_CKSUM2; iOut++){ ckptSetValue(p, iOut, pDb->pShmhdr->aSnap2[iOut], pRc); } } *piOut = iOut; } static void ckptExportAppendlist( |
︙ | ︙ | |||
378 379 380 381 382 383 384 | CkptBuffer ckpt; int nFree; nFree = pSnap->freelist.nEntry; if( nOvfl>=0 ){ nFree -= nOvfl; }else{ | | | 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 | CkptBuffer ckpt; int nFree; nFree = pSnap->freelist.nEntry; if( nOvfl>=0 ){ nFree -= nOvfl; }else{ nOvfl = pDb->pShmhdr->aSnap2[CKPT_HDR_OVFL]; } /* Initialize the output buffer */ memset(&ckpt, 0, sizeof(CkptBuffer)); ckpt.pEnv = pDb->pEnv; iOut = CKPT_HDR_SIZE; |
︙ | ︙ | |||
822 823 824 825 826 827 828 | ** Attempt to load a checkpoint from meta page iMeta. ** ** This function is a no-op if *pRc is set to any value other than LSM_OK ** when it is called. If an error occurs, *pRc is set to an LSM error code ** before returning. ** ** If no error occurs and the checkpoint is successfully loaded, copy it to | | | 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 | ** Attempt to load a checkpoint from meta page iMeta. ** ** This function is a no-op if *pRc is set to any value other than LSM_OK ** when it is called. If an error occurs, *pRc is set to an LSM error code ** before returning. ** ** If no error occurs and the checkpoint is successfully loaded, copy it to ** ShmHeader.aSnap1[] and ShmHeader.aSnap2[], and set ShmHeader.iMetaPage ** to indicate its origin. In this case return 1. Or, if the checkpoint ** cannot be loaded (because the checksum does not compute), return 0. */ static int ckptTryLoad(lsm_db *pDb, MetaPage *pPg, u32 iMeta, int *pRc){ int bLoaded = 0; /* Return value */ if( *pRc==LSM_OK ){ int rc = LSM_OK; /* Error code */ |
︙ | ︙ | |||
845 846 847 848 849 850 851 | aCkpt = (u32 *)lsmMallocRc(pDb->pEnv, nCkpt*sizeof(u32), &rc); } if( aCkpt ){ memcpy(aCkpt, aData, nCkpt*sizeof(u32)); ckptChangeEndianness(aCkpt, nCkpt); if( ckptChecksumOk(aCkpt) ){ ShmHeader *pShm = pDb->pShmhdr; | | | | 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 | aCkpt = (u32 *)lsmMallocRc(pDb->pEnv, nCkpt*sizeof(u32), &rc); } if( aCkpt ){ memcpy(aCkpt, aData, nCkpt*sizeof(u32)); ckptChangeEndianness(aCkpt, nCkpt); if( ckptChecksumOk(aCkpt) ){ ShmHeader *pShm = pDb->pShmhdr; memcpy(pShm->aSnap1, aCkpt, nCkpt*sizeof(u32)); memcpy(pShm->aSnap2, aCkpt, nCkpt*sizeof(u32)); memcpy(pDb->aSnapshot, aCkpt, nCkpt*sizeof(u32)); pShm->iMetaPage = iMeta; bLoaded = 1; } } lsmFree(pDb->pEnv, aCkpt); |
︙ | ︙ | |||
886 887 888 889 890 891 892 | ShmHeader *pShm = pDb->pShmhdr; aCkpt[CKPT_HDR_NCKPT] = nCkpt; aCkpt[CKPT_HDR_BLKSZ] = pDb->nDfltBlksz; aCkpt[CKPT_HDR_PGSZ] = pDb->nDfltPgsz; ckptChecksum(aCkpt, array_size(aCkpt), &aCkpt[nCkpt-2], &aCkpt[nCkpt-1]); | | | | | 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 | ShmHeader *pShm = pDb->pShmhdr; aCkpt[CKPT_HDR_NCKPT] = nCkpt; aCkpt[CKPT_HDR_BLKSZ] = pDb->nDfltBlksz; aCkpt[CKPT_HDR_PGSZ] = pDb->nDfltPgsz; ckptChecksum(aCkpt, array_size(aCkpt), &aCkpt[nCkpt-2], &aCkpt[nCkpt-1]); memcpy(pShm->aSnap1, aCkpt, nCkpt*sizeof(u32)); memcpy(pShm->aSnap2, aCkpt, nCkpt*sizeof(u32)); memcpy(pDb->aSnapshot, aCkpt, nCkpt*sizeof(u32)); } /* ** This function is called as part of database recovery to initialize the ** ShmHeader.aSnap1[] and ShmHeader.aSnap2[] snapshots. */ int lsmCheckpointRecover(lsm_db *pDb){ int rc = LSM_OK; /* Return Code */ i64 iId1; /* Id of checkpoint on meta-page 1 */ i64 iId2; /* Id of checkpoint on meta-page 2 */ int bLoaded = 0; /* True once checkpoint has been loaded */ int cmp; /* True if (iId2>iId1) */ |
︙ | ︙ | |||
953 954 955 956 957 958 959 | return rc; } /* ** Copy the current client snapshot from shared-memory to pDb->aSnapshot[]. */ | | > > | < < | > | | > > | > | > | < < < > | | < > | < < < | < < | > | | | | > > > > > > > > > > > | < | | > > > | | > | | 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 | return rc; } /* ** Copy the current client snapshot from shared-memory to pDb->aSnapshot[]. */ int lsmCheckpointLoad(lsm_db *pDb, int *piRead){ int nRem = LSM_ATTEMPTS_BEFORE_PROTOCOL; ShmHeader *pShm = pDb->pShmhdr; while( (nRem--)>0 ){ int nInt; nInt = pShm->aSnap1[CKPT_HDR_NCKPT]; if( nInt<=(LSM_META_PAGE_SIZE / sizeof(u32)) ){ memcpy(pDb->aSnapshot, pShm->aSnap1, nInt*sizeof(u32)); if( ckptChecksumOk(pDb->aSnapshot) ){ if( piRead ) *piRead = 1; return LSM_OK; } } nInt = pShm->aSnap2[CKPT_HDR_NCKPT]; if( nInt<=(LSM_META_PAGE_SIZE / sizeof(u32)) ){ memcpy(pDb->aSnapshot, pShm->aSnap2, nInt*sizeof(u32)); if( ckptChecksumOk(pDb->aSnapshot) ){ if( piRead ) *piRead = 2; return LSM_OK; } } lsmShmBarrier(pDb); } return LSM_PROTOCOL; } int lsmCheckpointLoadOk(lsm_db *pDb, int iSnap){ u32 *aShm; assert( iSnap==1 || iSnap==2 ); aShm = (iSnap==1) ? pDb->pShmhdr->aSnap1 : pDb->pShmhdr->aSnap2; return (lsmCheckpointId(pDb->aSnapshot, 0)==lsmCheckpointId(aShm, 0) ); } int lsmCheckpointLoadWorker(lsm_db *pDb){ int rc; ShmHeader *pShm = pDb->pShmhdr; int nInt1; int nInt2; /* Must be holding the WORKER lock to do this */ assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) ); /* Check that the two snapshots match. If not, repair them. 
*/ nInt1 = pShm->aSnap1[CKPT_HDR_NCKPT]; nInt2 = pShm->aSnap2[CKPT_HDR_NCKPT]; if( nInt1!=nInt2 || memcmp(pShm->aSnap1, pShm->aSnap2, nInt2*sizeof(u32)) ){ if( ckptChecksumOk(pShm->aSnap1) ){ memcpy(pShm->aSnap2, pShm->aSnap1, sizeof(u32)*nInt1); }else if( ckptChecksumOk(pShm->aSnap2) ){ memcpy(pShm->aSnap1, pShm->aSnap2, sizeof(u32)*nInt2); }else{ return LSM_PROTOCOL; } } rc = lsmCheckpointDeserialize(pDb, 1, pShm->aSnap1, &pDb->pWorker); assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) ); return rc; } int lsmCheckpointDeserialize( lsm_db *pDb, int bInclFreelist, /* If true, deserialize free-list */ |
︙ | ︙ | |||
1106 1107 1108 1109 1110 1111 1112 | int rc; rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n); if( rc!=LSM_OK ) return rc; assert( ckptChecksumOk((u32 *)p) ); assert( n<=LSM_META_PAGE_SIZE ); | | | | 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 | int rc; rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n); if( rc!=LSM_OK ) return rc; assert( ckptChecksumOk((u32 *)p) ); assert( n<=LSM_META_PAGE_SIZE ); memcpy(pShm->aSnap2, p, n); lsmShmBarrier(pDb); memcpy(pShm->aSnap1, p, n); lsmFree(pDb->pEnv, p); return LSM_OK; } /* ** This function is used to determine the snapshot-id of the most recently |
︙ | ︙ | |||
1204 1205 1206 1207 1208 1209 1210 | } void lsmCheckpointZeroLogoffset(lsm_db *pDb){ u32 nCkpt; nCkpt = pDb->aSnapshot[CKPT_HDR_NCKPT]; assert( nCkpt>CKPT_HDR_NCKPT ); | | | | | | | 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 | } void lsmCheckpointZeroLogoffset(lsm_db *pDb){ u32 nCkpt; nCkpt = pDb->aSnapshot[CKPT_HDR_NCKPT]; assert( nCkpt>CKPT_HDR_NCKPT ); assert( nCkpt==pDb->pShmhdr->aSnap1[CKPT_HDR_NCKPT] ); assert( 0==memcmp(pDb->aSnapshot, pDb->pShmhdr->aSnap1, nCkpt*sizeof(u32)) ); assert( 0==memcmp(pDb->aSnapshot, pDb->pShmhdr->aSnap2, nCkpt*sizeof(u32)) ); pDb->aSnapshot[CKPT_HDR_LO_MSW] = 0; pDb->aSnapshot[CKPT_HDR_LO_LSW] = 0; ckptChecksum(pDb->aSnapshot, nCkpt, &pDb->aSnapshot[nCkpt-2], &pDb->aSnapshot[nCkpt-1] ); memcpy(pDb->pShmhdr->aSnap1, pDb->aSnapshot, nCkpt*sizeof(u32)); memcpy(pDb->pShmhdr->aSnap2, pDb->aSnapshot, nCkpt*sizeof(u32)); } |
Changes to src/lsm_log.c.
︙ | ︙ | |||
918 919 920 921 922 923 924 | rc = lsmFsOpenLog(pDb->pFS); if( rc!=LSM_OK ) return rc; rc = lsmTreeInit(pDb); if( rc!=LSM_OK ) return rc; pLog = &pDb->treehdr.log; | | | 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 | rc = lsmFsOpenLog(pDb->pFS); if( rc!=LSM_OK ) return rc; rc = lsmTreeInit(pDb); if( rc!=LSM_OK ) return rc; pLog = &pDb->treehdr.log; lsmCheckpointLogoffset(pDb->pShmhdr->aSnap2, pLog); logReaderInit(pDb, pLog, 1, &reader); lsmStringInit(&buf1, pDb->pEnv); lsmStringInit(&buf2, pDb->pEnv); /* The outer for() loop runs at most twice. The first iteration is to ** count the number of committed transactions in the log. The second |
︙ | ︙ |
Changes to src/lsm_main.c.
︙ | ︙ | |||
178 179 180 181 182 183 184 | rc = lsmDbDatabaseConnect(pDb, zFull); } /* Configure the file-system connection with the page-size and block-size ** of this database. Even if the database file is zero bytes in size ** on disk, these values have been set in shared-memory by now, and so are ** guaranteed not to change during the lifetime of this connection. */ | | | 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 | rc = lsmDbDatabaseConnect(pDb, zFull); } /* Configure the file-system connection with the page-size and block-size ** of this database. Even if the database file is zero bytes in size ** on disk, these values have been set in shared-memory by now, and so are ** guaranteed not to change during the lifetime of this connection. */ if( rc==LSM_OK && LSM_OK==(rc = lsmCheckpointLoad(pDb, 0)) ){ lsmFsSetPageSize(pDb->pFS, lsmCheckpointPgsz(pDb->aSnapshot)); lsmFsSetBlockSize(pDb->pFS, lsmCheckpointBlksz(pDb->aSnapshot)); } lsmFree(pDb->pEnv, zFull); } |
︙ | ︙ | |||
226 227 228 229 230 231 232 | lsmFinishWork(pDb, 1, nOvfl, &rc); /* Restore the position of any open cursors */ if( rc==LSM_OK && pDb->pCsr ){ lsmFreeSnapshot(pDb->pEnv, pDb->pClient); pDb->pClient = 0; | | | 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 | lsmFinishWork(pDb, 1, nOvfl, &rc); /* Restore the position of any open cursors */ if( rc==LSM_OK && pDb->pCsr ){ lsmFreeSnapshot(pDb->pEnv, pDb->pClient); pDb->pClient = 0; rc = lsmCheckpointLoad(pDb, 0); if( rc==LSM_OK ){ rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient); } if( rc==LSM_OK ){ rc = lsmRestoreCursors(pDb); } } |
︙ | ︙ |
Changes to src/lsm_shared.c.
︙ | ︙ | |||
170 171 172 173 174 175 176 | ** in-memory tree to disk and write a checkpoint. */ rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL, 0); if( rc==LSM_OK ){ /* Flush the in-memory tree, if required. If there is data to flush, ** this will create a new client snapshot in Database.pClient. The ** checkpoint (serialization) of this snapshot may be written to disk ** by the following block. */ | | | 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | ** in-memory tree to disk and write a checkpoint. */ rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL, 0); if( rc==LSM_OK ){ /* Flush the in-memory tree, if required. If there is data to flush, ** this will create a new client snapshot in Database.pClient. The ** checkpoint (serialization) of this snapshot may be written to disk ** by the following block. */ rc = lsmTreeLoadHeader(pDb, 0); if( rc==LSM_OK && lsmTreeSize(pDb)>0 ){ rc = lsmFlushToDisk(pDb); } /* Write a checkpoint to disk. */ if( rc==LSM_OK ){ rc = lsmCheckpointWrite(pDb); |
︙ | ︙ | |||
528 529 530 531 532 533 534 | assert( pDb->pWorker==0 ); assert( 1 || pDb->pClient==0 ); assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) ); rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0); if( rc!=LSM_OK ) return rc; | | | 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 | assert( pDb->pWorker==0 ); assert( 1 || pDb->pClient==0 ); assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) ); rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0); if( rc!=LSM_OK ) return rc; rc = lsmCheckpointLoad(pDb, 0); if( rc==LSM_OK ){ ShmHeader *pShm = pDb->pShmhdr; int bDone = 0; /* True if checkpoint is already stored */ /* Check if this checkpoint has already been written to the database ** file. If so, set variable bDone to true. */ if( pShm->iMetaPage ){ |
︙ | ︙ | |||
629 630 631 632 633 634 635 636 637 638 | int rc = LSM_OK; /* Return code */ int iAttempt = 0; assert( pDb->pWorker==0 ); assert( (pDb->pClient!=0)==(pDb->iReader>=0) ); while( rc==LSM_OK && pDb->pClient==0 && (iAttempt++)<MAX_READLOCK_ATTEMPTS ){ assert( pDb->pCsr==0 && pDb->nTransOpen==0 ); /* Load the in-memory tree header. */ | > > | | < < > | | 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 | int rc = LSM_OK; /* Return code */ int iAttempt = 0; assert( pDb->pWorker==0 ); assert( (pDb->pClient!=0)==(pDb->iReader>=0) ); while( rc==LSM_OK && pDb->pClient==0 && (iAttempt++)<MAX_READLOCK_ATTEMPTS ){ int iTreehdr = 0; int iSnap = 0; assert( pDb->pCsr==0 && pDb->nTransOpen==0 ); /* Load the in-memory tree header. */ rc = lsmTreeLoadHeader(pDb, &iTreehdr); /* Load the database snapshot */ if( rc==LSM_OK ){ rc = lsmCheckpointLoad(pDb, &iSnap); } /* Take a read-lock on the tree and snapshot just loaded. Then check ** that the shared-memory still contains the same values. If so, proceed. ** Otherwise, relinquish the read-lock and retry the whole procedure ** (starting with loading the in-memory tree header). */ if( rc==LSM_OK ){ ShmHeader *pShm = pDb->pShmhdr; u32 iShmMax = pDb->treehdr.iUsedShmid; u32 iShmMin = pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk; rc = lsmReadlock(pDb, iSnap, iShmMin, iShmMax); if( rc==LSM_OK ){ if( lsmTreeLoadHeaderOk(pDb, iTreehdr) && lsmCheckpointLoadOk(pDb, iSnap) ){ /* Read lock has been successfully obtained. Deserialize the ** checkpoint just loaded. TODO: This will be removed after ** lsm_sorted.c is changed to work directly from the serialized ** version of the snapshot. */ rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient); assert( (rc==LSM_OK)==(pDb->pClient!=0) ); |
︙ | ︙ |
Changes to src/lsm_tree.c.
︙ | ︙ | |||
1790 1791 1792 1793 1794 1795 1796 | pDb->treehdr.iWrite = pMark->iWrite; pDb->treehdr.nChunk = pMark->nChunk; pDb->treehdr.iNextShmid = pMark->iNextShmid; } /* ** Load the in-memory tree header from shared-memory into pDb->treehdr. | | > > > > | > | | | < < < < | < < | | < < | > | | | > > | | | > > > > | 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 | pDb->treehdr.iWrite = pMark->iWrite; pDb->treehdr.nChunk = pMark->nChunk; pDb->treehdr.iNextShmid = pMark->iNextShmid; } /* ** Load the in-memory tree header from shared-memory into pDb->treehdr. ** If the header cannot be loaded, return LSM_PROTOCOL. ** ** If the header is successfully loaded and parameter piRead is not NULL, ** it is set to 1 if the header was loaded from ShmHeader.hdr1, or 2 if ** the header was loaded from ShmHeader.hdr2. */ int lsmTreeLoadHeader(lsm_db *pDb, int *piRead){ int nRem = LSM_ATTEMPTS_BEFORE_PROTOCOL; while( (nRem--)>0 ){ int rc; ShmHeader *pShm = pDb->pShmhdr; memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader)); if( treeHeaderChecksumOk(&pDb->treehdr) ){ if( piRead ) *piRead = 1; return LSM_OK; } memcpy(&pDb->treehdr, &pShm->hdr2, sizeof(TreeHeader)); if( treeHeaderChecksumOk(&pDb->treehdr) ){ if( piRead ) *piRead = 2; return LSM_OK; } lsmShmBarrier(pDb); } return LSM_PROTOCOL; } int lsmTreeLoadHeaderOk(lsm_db *pDb, int iRead){ TreeHeader *p = (iRead==1) ? &pDb->pShmhdr->hdr1 : &pDb->pShmhdr->hdr2; assert( iRead==1 || iRead==2 ); return (0==memcmp(pDb->treehdr.aCksum, p->aCksum, sizeof(u32)*2)); } /* ** This function is called to conclude a transaction. If argument bCommit ** is true, the transaction is committed. Otherwise it is rolled back. */ int lsmTreeEndTransaction(lsm_db *pDb, int bCommit){ |
︙ | ︙ |