Index: src/lsmInt.h ================================================================== --- src/lsmInt.h +++ src/lsmInt.h @@ -141,11 +141,12 @@ #define LSM_LOCK_DMS2 2 /* Read-write connections */ #define LSM_LOCK_DMS3 3 /* Read-only connections */ #define LSM_LOCK_WRITER 4 #define LSM_LOCK_WORKER 5 #define LSM_LOCK_CHECKPOINTER 6 -#define LSM_LOCK_READER(i) ((i) + LSM_LOCK_CHECKPOINTER + 1) +#define LSM_LOCK_ROTRANS 7 +#define LSM_LOCK_READER(i) ((i) + LSM_LOCK_ROTRANS + 1) #define LSM_LOCK_RWCLIENT(i) ((i) + LSM_LOCK_READER(LSM_LOCK_NREADER)) /* ** Hard limit on the number of free-list entries that may be stored in ** a checkpoint (the remainder are stored as a system record in the LSM). @@ -555,11 +556,11 @@ #define LSM_INITIAL_SNAPSHOT_ID 11 /* ** Functions from file "lsm_ckpt.c". */ -int lsmCheckpointWrite(lsm_db *, int, int, u32 *); +int lsmCheckpointWrite(lsm_db *, int, u32 *); int lsmCheckpointLevels(lsm_db *, int, void **, int *); int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal); int lsmCheckpointRecover(lsm_db *); int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **); @@ -847,10 +848,12 @@ int lsmBeginReadTrans(lsm_db *); int lsmBeginWriteTrans(lsm_db *); int lsmBeginFlush(lsm_db *); +int lsmDetectRoTrans(lsm_db *db, int *); + int lsmBeginWork(lsm_db *); void lsmFinishWork(lsm_db *, int, int *); int lsmFinishRecovery(lsm_db *); void lsmFinishReadTrans(lsm_db *); @@ -893,10 +896,11 @@ #define LSM_LOCK_SHARED 1 #define LSM_LOCK_EXCL 2 int lsmShmCacheChunks(lsm_db *db, int nChunk); int lsmShmLock(lsm_db *db, int iLock, int eOp, int bBlock); +int lsmShmTestLock(lsm_db *db, int iLock, int nLock, int eOp); void lsmShmBarrier(lsm_db *db); #ifdef LSM_DEBUG void lsmShmHasLock(lsm_db *db, int iLock, int eOp); #else Index: src/lsm_log.c ================================================================== --- src/lsm_log.c +++ src/lsm_log.c @@ -302,12 +302,18 @@ ** If possible, reclaim log file space. 
Log file space is reclaimed after ** a snapshot that points to the same data in the database file is synced ** into the db header. */ static int logReclaimSpace(lsm_db *pDb){ - int rc = LSM_OK; + int rc; int iMeta; + int bRotrans; /* True if there exists some ro-trans */ + + /* Test if there exists some other connection with a read-only transaction + ** open. If there is, then log file space may not be reclaimed. */ + rc = lsmDetectRoTrans(pDb, &bRotrans); + if( rc!=LSM_OK || bRotrans ) return rc; iMeta = (int)pDb->pShmhdr->iMetaPage; if( iMeta==1 || iMeta==2 ){ DbLog *pLog = &pDb->treehdr.log; i64 iSyncedId; @@ -1056,11 +1062,11 @@ assert( pLog->aRegion[1].iStart==0 ); pLog->aRegion[1].iEnd = reader.iOff; }else{ assert( pLog->aRegion[0].iStart==0 ); pLog->aRegion[0].iStart = pLog->aRegion[2].iStart; - pLog->aRegion[0].iEnd = reader.iOff - reader.buf.n+reader.iBuf; + pLog->aRegion[0].iEnd = reader.iOff-reader.buf.n+reader.iBuf; } pLog->aRegion[2].iStart = iOff; }else{ if( (nJump++)==2 ){ bEof = 1; Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -286,18 +286,29 @@ } } /* Write a checkpoint to disk. */ if( rc==LSM_OK ){ - rc = lsmCheckpointWrite(pDb, (bReadonly==0), 1, 0); + rc = lsmCheckpointWrite(pDb, (bReadonly==0), 0); } /* If the checkpoint was written successfully, delete the log file ** and, if possible, truncate the database file. */ if( rc==LSM_OK ){ + int bRotrans = 0; Database *p = pDb->pDatabase; - if( bReadonly==0 ){ + + /* The log file may only be deleted if there are no read-only + ** clients running rotrans transactions. */ + rc = lsmDetectRoTrans(pDb, &bRotrans); + if( rc==LSM_OK && bRotrans==0 ){ + lsmFsCloseAndDeleteLog(pDb->pFS); + } + + /* The database may only be truncated if there exist no read-only + ** clients - either connected or running rotrans transactions. 
*/ + if( bReadonly==0 && bRotrans==0 ){ dbTruncateFile(pDb); if( p->pFile && p->bMultiProc ){ lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1); } } @@ -792,12 +803,27 @@ iInUse, iSynced, (pDb->iReader>=0 ? pDb->pClient->iId : 0) ); } #endif - /* Query the free block list for a suitable block */ - if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet); + + /* Unless there exists a read-only transaction (which prevents us from + ** recycling any blocks regardless), query the free block list for a + ** suitable block to reuse. + ** + ** It might seem more natural to check for a read-only transaction at + ** the start of this function. However, it is better to wait until after + ** the call to lsmCheckpointSynced() to do so. + */ + if( rc==LSM_OK ){ + int bRotrans; + rc = lsmDetectRoTrans(pDb, &bRotrans); + + if( rc==LSM_OK && bRotrans==0 ){ + rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet); + } + } if( iBefore>0 && (iRet<=0 || iRet>=iBefore) ){ iRet = 0; }else if( rc==LSM_OK ){ @@ -872,11 +898,11 @@ ** The WORKER lock must not be held when this is called. This is because ** this function may indirectly call fsync(). And the WORKER lock should ** not be held that long (in case it is required by a client flushing an ** in-memory tree to disk). 
*/ -int lsmCheckpointWrite(lsm_db *pDb, int bTruncate, int bDellog, u32 *pnWrite){ +int lsmCheckpointWrite(lsm_db *pDb, int bTruncate, u32 *pnWrite){ int rc; /* Return Code */ u32 nWrite = 0; assert( pDb->pWorker==0 ); assert( 1 || pDb->pClient==0 ); @@ -931,13 +957,10 @@ } if( rc==LSM_OK && bTruncate ){ rc = lsmFsTruncateDb(pDb->pFS, (i64)nBlock*lsmFsBlockSize(pDb->pFS)); } - if( rc==LSM_OK && bDellog ){ - lsmFsCloseAndDeleteLog(pDb->pFS); - } } lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0); if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite; return rc; @@ -1190,10 +1213,38 @@ dbReleaseReadlock(pDb); } if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY; return rc; } + +/* +** This function is used by a read-write connection to determine if there +** are currently one or more read-only transactions open on the database +** (in this context a read-only transaction is one opened by a read-only +** connection on a non-live database). +** +** If no error occurs, LSM_OK is returned and *pbExist is set to true if +** some other connection has a read-only transaction open, or false +** otherwise. If an error occurs an LSM error code is returned and the final +** value of *pbExist is undefined. +*/ +int lsmDetectRoTrans(lsm_db *db, int *pbExist){ + int rc; + + /* Only a read-write connection may use this function. */ + assert( db->bReadonly==0 ); + + rc = lsmShmTestLock(db, LSM_LOCK_ROTRANS, 1, LSM_LOCK_EXCL); + if( rc==LSM_BUSY ){ + *pbExist = 1; + rc = LSM_OK; + }else{ + *pbExist = 0; + } + + return rc; +} /* ** db is a read-only database handle in the disconnected state. This function ** attempts to open a read-transaction on the database. This may involve ** connecting to the database system (opening shared memory etc.). @@ -1212,12 +1263,15 @@ rc = lsmShmTestLock( db, LSM_LOCK_RWCLIENT(0), LSM_LOCK_NREADER, LSM_LOCK_SHARED ); if( rc==LSM_OK ){ - /* System is not live */ - rc = lsmShmLock(db, LSM_LOCK_CHECKPOINTER, LSM_LOCK_SHARED, 0); + /* System is not live. 
Take a SHARED lock on the ROTRANS byte and + ** release DMS1. Locking ROTRANS tells all read-write clients that they + ** may not recycle any disk space from within the database or log files, + ** as a read-only client may be using it. */ + rc = lsmShmLock(db, LSM_LOCK_ROTRANS, LSM_LOCK_SHARED, 0); lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0); if( rc==LSM_OK ){ db->bRoTrans = 1; rc = lsmShmCacheChunks(db, 1); @@ -1270,11 +1324,11 @@ lsmFree(pDb->pEnv, pDb->apShm); pDb->apShm = 0; pDb->nShm = 0; pDb->pShmhdr = 0; - lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0); + lsmShmLock(pDb, LSM_LOCK_ROTRANS, LSM_LOCK_UNLOCK, 0); } dbReleaseReadlock(pDb); } /* @@ -1885,11 +1939,11 @@ int rc; /* Return code */ u32 nWrite = 0; /* Number of pages checkpointed */ /* Attempt the checkpoint. If successful, nWrite is set to the number of ** pages written between this and the previous checkpoint. */ - rc = lsmCheckpointWrite(pDb, 0, 0, &nWrite); + rc = lsmCheckpointWrite(pDb, 0, &nWrite); /* If required, calculate the output variable (KB of data checkpointed). ** Set it to zero if an error occured. 
*/ if( pnKB ){ int nKB = 0; Index: src/lsm_tree.c ================================================================== --- src/lsm_tree.c +++ src/lsm_tree.c @@ -1081,10 +1081,11 @@ */ int lsmTreeInit(lsm_db *pDb){ ShmChunk *pOne; int rc = LSM_OK; + memset(&pDb->treehdr, 0, sizeof(TreeHeader)); pDb->treehdr.root.iTransId = 1; pDb->treehdr.iFirst = 1; pDb->treehdr.nChunk = 2; pDb->treehdr.iWrite = LSM_SHM_CHUNK_SIZE + LSM_SHM_CHUNK_HDR; pDb->treehdr.iNextShmid = 2; Index: test/lsm4.test ================================================================== --- test/lsm4.test +++ test/lsm4.test @@ -115,8 +115,18 @@ do_test 2.7 { db config {set_compression rle} list [db_fetch db 3] [db_fetch db 4] } {three four} + +#------------------------------------------------------------------------- +# +catch {db close} +forcedelete test.db + +do_test 3.1 { + lsm_open db test.db + db_fetch db abc +} {} finish_test Index: test/lsm5.test ================================================================== --- test/lsm5.test +++ test/lsm5.test @@ -27,11 +27,11 @@ # Create a new database with file name $file. # proc create_abc_db {file} { forcedelete $file - lsm_open db $file + lsm_open db $file {block_size 256} db write a alpha db write b bravo db write c charlie db close } @@ -191,12 +191,22 @@ db csr_open T list [db_fetch db a] [db_fetch db b] [db_fetch db c] } {alpha bravo charlie} do_test 4.3 { - lsm_open db_rw test.db + lsm_open db_rw test.db {block_size 64} + db_rw write b BRAVO db_rw close list [file size test.db] [file size test.db-log] -} {0 56} +} {65536 74} + +do_test 4.4 { + list [db_fetch db a] [db_fetch db b] [db_fetch db c] +} {alpha bravo charlie} + +do_test 4.5 { + T close + list [db_fetch db a] [db_fetch db b] [db_fetch db c] +} {alpha BRAVO charlie} finish_test