Many hyperlinks are disabled. Use anonymous login to enable hyperlinks.
Overview
Comment: | Instead of locking the CHECKPOINTER byte, have read-only connections take a SHARED lock on the ROTRANS byte when reading from a non-live db. Read-write connections may not recycle space within either the database or log files while such a lock is held, but may perform checkpoint operations. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | read-only-clients |
Files: | files | file ages | folders |
SHA1: | 3b2a50c089542e624f100670a9962c5e |
User & Date: | dan 2013-02-20 16:03:02.059 |
Context
2013-02-20
| ||
17:54 | Add a test to verify that an lsm_close() that disconnects the last connection to a database flushes the in-memory tree regardless of the multi-process or use-log settings. Leaf check-in: 723d5f2f52 user: dan tags: read-only-clients | |
16:03 | Instead of locking the CHECKPOINTER byte, have read-only connections take a SHARED lock on the ROTRANS byte when reading from a non-live db. Read-write connections may not recycle space within either the database or log files while such a lock is held, but may perform checkpoint operations. check-in: 3b2a50c089 user: dan tags: read-only-clients | |
2013-02-19
| ||
20:16 | Add a test case for a read-only transaction outlasting an entire read-write session. And a fix. check-in: 3f53258219 user: dan tags: read-only-clients | |
Changes
Changes to src/lsmInt.h.
︙ | ︙ | |||
139 140 141 142 143 144 145 | /* Lock definitions. */ #define LSM_LOCK_DMS1 1 /* Serialize connect/disconnect ops */ #define LSM_LOCK_DMS2 2 /* Read-write connections */ #define LSM_LOCK_DMS3 3 /* Read-only connections */ #define LSM_LOCK_WRITER 4 #define LSM_LOCK_WORKER 5 #define LSM_LOCK_CHECKPOINTER 6 | > | | 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 | /* Lock definitions. */ #define LSM_LOCK_DMS1 1 /* Serialize connect/disconnect ops */ #define LSM_LOCK_DMS2 2 /* Read-write connections */ #define LSM_LOCK_DMS3 3 /* Read-only connections */ #define LSM_LOCK_WRITER 4 #define LSM_LOCK_WORKER 5 #define LSM_LOCK_CHECKPOINTER 6 #define LSM_LOCK_ROTRANS 7 #define LSM_LOCK_READER(i) ((i) + LSM_LOCK_ROTRANS + 1) #define LSM_LOCK_RWCLIENT(i) ((i) + LSM_LOCK_READER(LSM_LOCK_NREADER)) /* ** Hard limit on the number of free-list entries that may be stored in ** a checkpoint (the remainder are stored as a system record in the LSM). ** See also LSM_CONFIG_MAX_FREELIST. */ |
︙ | ︙ | |||
553 554 555 556 557 558 559 | u32 nWrite; /* Total number of pages written to disk */ }; #define LSM_INITIAL_SNAPSHOT_ID 11 /* ** Functions from file "lsm_ckpt.c". */ | | | 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 | u32 nWrite; /* Total number of pages written to disk */ }; #define LSM_INITIAL_SNAPSHOT_ID 11 /* ** Functions from file "lsm_ckpt.c". */ int lsmCheckpointWrite(lsm_db *, int, u32 *); int lsmCheckpointLevels(lsm_db *, int, void **, int *); int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal); int lsmCheckpointRecover(lsm_db *); int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **); int lsmCheckpointLoadWorker(lsm_db *pDb); |
︙ | ︙ | |||
845 846 847 848 849 850 851 852 853 854 855 856 857 858 | int lsmDbDatabaseConnect(lsm_db*, const char *); void lsmDbDatabaseRelease(lsm_db *); int lsmBeginReadTrans(lsm_db *); int lsmBeginWriteTrans(lsm_db *); int lsmBeginFlush(lsm_db *); int lsmBeginWork(lsm_db *); void lsmFinishWork(lsm_db *, int, int *); int lsmFinishRecovery(lsm_db *); void lsmFinishReadTrans(lsm_db *); int lsmFinishWriteTrans(lsm_db *, int); int lsmFinishFlush(lsm_db *, int); | > > | 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 | int lsmDbDatabaseConnect(lsm_db*, const char *); void lsmDbDatabaseRelease(lsm_db *); int lsmBeginReadTrans(lsm_db *); int lsmBeginWriteTrans(lsm_db *); int lsmBeginFlush(lsm_db *); int lsmDetectRoTrans(lsm_db *db, int *); int lsmBeginWork(lsm_db *); void lsmFinishWork(lsm_db *, int, int *); int lsmFinishRecovery(lsm_db *); void lsmFinishReadTrans(lsm_db *); int lsmFinishWriteTrans(lsm_db *, int); int lsmFinishFlush(lsm_db *, int); |
︙ | ︙ | |||
891 892 893 894 895 896 897 898 899 900 901 902 903 904 | /* Candidate values for the 3rd argument to lsmShmLock() */ #define LSM_LOCK_UNLOCK 0 #define LSM_LOCK_SHARED 1 #define LSM_LOCK_EXCL 2 int lsmShmCacheChunks(lsm_db *db, int nChunk); int lsmShmLock(lsm_db *db, int iLock, int eOp, int bBlock); void lsmShmBarrier(lsm_db *db); #ifdef LSM_DEBUG void lsmShmHasLock(lsm_db *db, int iLock, int eOp); #else # define lsmShmHasLock(x,y,z) #endif | > | 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 | /* Candidate values for the 3rd argument to lsmShmLock() */ #define LSM_LOCK_UNLOCK 0 #define LSM_LOCK_SHARED 1 #define LSM_LOCK_EXCL 2 int lsmShmCacheChunks(lsm_db *db, int nChunk); int lsmShmLock(lsm_db *db, int iLock, int eOp, int bBlock); int lsmShmTestLock(lsm_db *db, int iLock, int nLock, int eOp); void lsmShmBarrier(lsm_db *db); #ifdef LSM_DEBUG void lsmShmHasLock(lsm_db *db, int iLock, int eOp); #else # define lsmShmHasLock(x,y,z) #endif |
︙ | ︙ |
Changes to src/lsm_log.c.
︙ | ︙ | |||
300 301 302 303 304 305 306 | /* ** If possible, reclaim log file space. Log file space is reclaimed after ** a snapshot that points to the same data in the database file is synced ** into the db header. */ static int logReclaimSpace(lsm_db *pDb){ | | > > > > > > | 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 | /* ** If possible, reclaim log file space. Log file space is reclaimed after ** a snapshot that points to the same data in the database file is synced ** into the db header. */ static int logReclaimSpace(lsm_db *pDb){ int rc; int iMeta; int bRotrans; /* True if there exists some ro-trans */ /* Test if there exists some other connection with a read-only transaction ** open. If there does, then log file space may not be reclaimed. */ rc = lsmDetectRoTrans(pDb, &bRotrans); if( rc!=LSM_OK || bRotrans ) return rc; iMeta = (int)pDb->pShmhdr->iMetaPage; if( iMeta==1 || iMeta==2 ){ DbLog *pLog = &pDb->treehdr.log; i64 iSyncedId; /* Read the snapshot-id of the snapshot stored on meta-page iMeta. Note |
︙ | ︙ |
Changes to src/lsm_shared.c.
︙ | ︙ | |||
284 285 286 287 288 289 290 | bReadonly = 1; rc = LSM_OK; } } /* Write a checkpoint to disk. */ if( rc==LSM_OK ){ | | > > > > > > > > > > > | | 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 | bReadonly = 1; rc = LSM_OK; } } /* Write a checkpoint to disk. */ if( rc==LSM_OK ){ rc = lsmCheckpointWrite(pDb, (bReadonly==0), 0); } /* If the checkpoint was written successfully, delete the log file ** and, if possible, truncate the database file. */ if( rc==LSM_OK ){ int bRotrans = 0; Database *p = pDb->pDatabase; /* The log file may only be deleted if there are no clients ** read-only clients running rotrans transactions. */ rc = lsmDetectRoTrans(pDb, &bRotrans); if( rc==LSM_OK && bRotrans==0 ){ lsmFsCloseAndDeleteLog(pDb->pFS); } /* The database may only be truncated if there exist no read-only ** clients - either connected or running rotrans transactions. */ if( bReadonly==0 && bRotrans==0 ){ dbTruncateFile(pDb); if( p->pFile && p->bMultiProc ){ lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1); } } } } |
︙ | ︙ | |||
790 791 792 793 794 795 796 | lsmLogMessage(pDb, 0, "lsmBlockAllocate(): " "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)", iInUse, iSynced, (pDb->iReader>=0 ? pDb->pClient->iId : 0) ); } #endif | > > | > > > > > > > > > > > | > > | 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 | lsmLogMessage(pDb, 0, "lsmBlockAllocate(): " "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)", iInUse, iSynced, (pDb->iReader>=0 ? pDb->pClient->iId : 0) ); } #endif /* Unless there exists a read-only transaction (which prevents us from ** recycling any blocks regardless, query the free block list for a ** suitable block to reuse. ** ** It might seem more natural to check for a read-only transaction at ** the start of this function. However, it is better do wait until after ** the call to lsmCheckpointSynced() to do so. */ if( rc==LSM_OK ){ int bRotrans; rc = lsmDetectRoTrans(pDb, &bRotrans); if( rc==LSM_OK && bRotrans==0 ){ rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet); } } if( iBefore>0 && (iRet<=0 || iRet>=iBefore) ){ iRet = 0; }else if( rc==LSM_OK ){ /* If a block was found in the free block list, use it and remove it from ** the list. Otherwise, if no suitable block was found, allocate one from |
︙ | ︙ | |||
870 871 872 873 874 875 876 | ** database itself. ** ** The WORKER lock must not be held when this is called. This is because ** this function may indirectly call fsync(). And the WORKER lock should ** not be held that long (in case it is required by a client flushing an ** in-memory tree to disk). */ | | | 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 | ** database itself. ** ** The WORKER lock must not be held when this is called. This is because ** this function may indirectly call fsync(). And the WORKER lock should ** not be held that long (in case it is required by a client flushing an ** in-memory tree to disk). */ int lsmCheckpointWrite(lsm_db *pDb, int bTruncate, u32 *pnWrite){ int rc; /* Return Code */ u32 nWrite = 0; assert( pDb->pWorker==0 ); assert( 1 || pDb->pClient==0 ); assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) ); |
︙ | ︙ | |||
929 930 931 932 933 934 935 | ); #endif } if( rc==LSM_OK && bTruncate ){ rc = lsmFsTruncateDb(pDb->pFS, (i64)nBlock*lsmFsBlockSize(pDb->pFS)); } | < < < | 955 956 957 958 959 960 961 962 963 964 965 966 967 968 | ); #endif } if( rc==LSM_OK && bTruncate ){ rc = lsmFsTruncateDb(pDb->pFS, (i64)nBlock*lsmFsBlockSize(pDb->pFS)); } } lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0); if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite; return rc; } |
︙ | ︙ | |||
1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 | } if( rc!=LSM_OK ){ dbReleaseReadlock(pDb); } if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY; return rc; } /* ** db is a read-only database handle in the disconnected state. This function ** attempts to open a read-transaction on the database. This may involve ** connecting to the database system (opening shared memory etc.). */ int lsmBeginRoTrans(lsm_db *db){ | > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 | } if( rc!=LSM_OK ){ dbReleaseReadlock(pDb); } if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY; return rc; } /* ** This function is used by a read-write connection to determine if there ** are currently one or more read-only transactions open on the database ** (in this context a read-only transaction is one opened by a read-only ** connection on a non-live database). ** ** If no error occurs, LSM_OK is returned and *pbExists is set to true if ** some other connection has a read-only transaction open, or false ** otherwise. If an error occurs an LSM error code is returned and the final ** value of *pbExist is undefined. */ int lsmDetectRoTrans(lsm_db *db, int *pbExist){ int rc; /* Only a read-write connection may use this function. */ assert( db->bReadonly==0 ); rc = lsmShmTestLock(db, LSM_LOCK_ROTRANS, 1, LSM_LOCK_EXCL); if( rc==LSM_BUSY ){ *pbExist = 1; rc = LSM_OK; }else{ *pbExist = 0; } return rc; } /* ** db is a read-only database handle in the disconnected state. This function ** attempts to open a read-transaction on the database. This may involve ** connecting to the database system (opening shared memory etc.). */ int lsmBeginRoTrans(lsm_db *db){ |
︙ | ︙ | |||
1210 1211 1212 1213 1214 1215 1216 | rc = lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_SHARED, 0); if( rc!=LSM_OK ) return rc; rc = lsmShmTestLock( db, LSM_LOCK_RWCLIENT(0), LSM_LOCK_NREADER, LSM_LOCK_SHARED ); if( rc==LSM_OK ){ | | > > > | | 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 | rc = lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_SHARED, 0); if( rc!=LSM_OK ) return rc; rc = lsmShmTestLock( db, LSM_LOCK_RWCLIENT(0), LSM_LOCK_NREADER, LSM_LOCK_SHARED ); if( rc==LSM_OK ){ /* System is not live. Take a SHARED lock on the ROTRANS byte and ** release DMS1. Locking ROTRANS tells all read-write clients that they ** may not recycle any disk space from within the database or log files, ** as a read-only client may be using it. */ rc = lsmShmLock(db, LSM_LOCK_ROTRANS, LSM_LOCK_SHARED, 0); lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0); if( rc==LSM_OK ){ db->bRoTrans = 1; rc = lsmShmCacheChunks(db, 1); if( rc==LSM_OK ){ db->pShmhdr = (ShmHeader *)db->apShm[0]; |
︙ | ︙ | |||
1268 1269 1270 1271 1272 1273 1274 | lsmFree(pDb->pEnv, pDb->apShm[i]); } lsmFree(pDb->pEnv, pDb->apShm); pDb->apShm = 0; pDb->nShm = 0; pDb->pShmhdr = 0; | | | 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 | lsmFree(pDb->pEnv, pDb->apShm[i]); } lsmFree(pDb->pEnv, pDb->apShm); pDb->apShm = 0; pDb->nShm = 0; pDb->pShmhdr = 0; lsmShmLock(pDb, LSM_LOCK_ROTRANS, LSM_LOCK_UNLOCK, 0); } dbReleaseReadlock(pDb); } /* ** Open a write transaction. */ |
︙ | ︙ | |||
1883 1884 1885 1886 1887 1888 1889 | int lsm_checkpoint(lsm_db *pDb, int *pnKB){ int rc; /* Return code */ u32 nWrite = 0; /* Number of pages checkpointed */ /* Attempt the checkpoint. If successful, nWrite is set to the number of ** pages written between this and the previous checkpoint. */ | | | 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 | int lsm_checkpoint(lsm_db *pDb, int *pnKB){ int rc; /* Return code */ u32 nWrite = 0; /* Number of pages checkpointed */ /* Attempt the checkpoint. If successful, nWrite is set to the number of ** pages written between this and the previous checkpoint. */ rc = lsmCheckpointWrite(pDb, 0, &nWrite); /* If required, calculate the output variable (KB of data checkpointed). ** Set it to zero if an error occured. */ if( pnKB ){ int nKB = 0; if( rc==LSM_OK && nWrite ){ nKB = (((i64)nWrite * lsmFsPageSize(pDb->pFS)) + 1023) / 1024; |
︙ | ︙ |
Changes to src/lsm_tree.c.
︙ | ︙ | |||
1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 | ** is initialized here - it will be copied into shared memory if log file ** recovery is successful. */ int lsmTreeInit(lsm_db *pDb){ ShmChunk *pOne; int rc = LSM_OK; pDb->treehdr.root.iTransId = 1; pDb->treehdr.iFirst = 1; pDb->treehdr.nChunk = 2; pDb->treehdr.iWrite = LSM_SHM_CHUNK_SIZE + LSM_SHM_CHUNK_HDR; pDb->treehdr.iNextShmid = 2; pDb->treehdr.iUsedShmid = 1; | > | 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 | ** is initialized here - it will be copied into shared memory if log file ** recovery is successful. */ int lsmTreeInit(lsm_db *pDb){ ShmChunk *pOne; int rc = LSM_OK; memset(&pDb->treehdr, 0, sizeof(TreeHeader)); pDb->treehdr.root.iTransId = 1; pDb->treehdr.iFirst = 1; pDb->treehdr.nChunk = 2; pDb->treehdr.iWrite = LSM_SHM_CHUNK_SIZE + LSM_SHM_CHUNK_HDR; pDb->treehdr.iNextShmid = 2; pDb->treehdr.iUsedShmid = 1; |
︙ | ︙ |
Changes to test/lsm4.test.
︙ | ︙ | |||
113 114 115 116 117 118 119 120 121 122 | db info compression_id } $compression_id(rle) do_test 2.7 { db config {set_compression rle} list [db_fetch db 3] [db_fetch db 4] } {three four} finish_test | > > > > > > > > > > | 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 | db info compression_id } $compression_id(rle) do_test 2.7 { db config {set_compression rle} list [db_fetch db 3] [db_fetch db 4] } {three four} #------------------------------------------------------------------------- # catch {db close} forcedelete test.db do_test 3.1 { lsm_open db test.db db_fetch db abc } {} finish_test |
Changes to test/lsm5.test.
︙ | ︙ | |||
25 26 27 28 29 30 31 | set ret } # Create a new database with file name $file. # proc create_abc_db {file} { forcedelete $file | | | 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | set ret } # Create a new database with file name $file. # proc create_abc_db {file} { forcedelete $file lsm_open db $file {block_size 256} db write a alpha db write b bravo db write c charlie db close } proc create_abc_log {file} { |
︙ | ︙ | |||
189 190 191 192 193 194 195 | do_test 4.2 { lsm_open db test.db {readonly 1} db csr_open T list [db_fetch db a] [db_fetch db b] [db_fetch db c] } {alpha bravo charlie} do_test 4.3 { | | > > | > > > > > > > > | 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 | do_test 4.2 { lsm_open db test.db {readonly 1} db csr_open T list [db_fetch db a] [db_fetch db b] [db_fetch db c] } {alpha bravo charlie} do_test 4.3 { lsm_open db_rw test.db {block_size 64} db_rw write b BRAVO db_rw close list [file size test.db] [file size test.db-log] } {65536 74} do_test 4.4 { list [db_fetch db a] [db_fetch db b] [db_fetch db c] } {alpha bravo charlie} do_test 4.5 { T close list [db_fetch db a] [db_fetch db b] [db_fetch db c] } {alpha BRAVO charlie} finish_test |