Index: src/lsmInt.h ================================================================== --- src/lsmInt.h +++ src/lsmInt.h @@ -130,17 +130,22 @@ #define LSM_SHM_CHUNK_HDR (sizeof(ShmChunk)) /* The number of available read locks. */ #define LSM_LOCK_NREADER 6 +/* The number of available read-write client locks. */ +#define LSM_LOCK_NRWCLIENT 16 + /* Lock definitions */ #define LSM_LOCK_DMS1 1 #define LSM_LOCK_DMS2 2 -#define LSM_LOCK_WRITER 3 -#define LSM_LOCK_WORKER 4 -#define LSM_LOCK_CHECKPOINTER 5 +#define LSM_LOCK_DMS3 3 +#define LSM_LOCK_WRITER 4 +#define LSM_LOCK_WORKER 5 +#define LSM_LOCK_CHECKPOINTER 6 #define LSM_LOCK_READER(i) ((i) + LSM_LOCK_CHECKPOINTER + 1) +#define LSM_LOCK_RWCLIENT(i) ((i) + LSM_LOCK_READER(LSM_LOCK_NREADER)) /* ** Hard limit on the number of free-list entries that may be stored in ** a checkpoint (the remainder are stored as a system record in the LSM). ** See also LSM_CONFIG_MAX_FREELIST. @@ -282,18 +287,18 @@ ** Database handle structure. ** ** mLock: ** A bitmask representing the locks currently held by the connection. ** An LSM database supports N distinct locks, where N is some number less -** than or equal to 16. Locks are numbered starting from 1 (see the +** than or equal to 32. Locks are numbered starting from 1 (see the ** definitions for LSM_LOCK_WRITER and co.). ** -** The least significant 16-bits in mLock represent EXCLUSIVE locks. The +** The least significant 32-bits in mLock represent EXCLUSIVE locks. The ** most significant are SHARED locks. So, if a connection holds a SHARED ** lock on lock region iLock, then the following is true: ** -** (mLock & ((iLock+16-1) << 1)) +** (mLock & ((iLock+32-1) << 1)) ** ** Or for an EXCLUSIVE lock: ** ** (mLock & ((iLock-1) << 1)) */ @@ -319,10 +324,11 @@ lsm_compress_factory factory; /* Compression callback factory */ /* Sub-system handles */ FileSystem *pFS; /* On-disk portion of database */ Database *pDatabase; /* Database shared data */ + int iRwclient; /* Read-write client lock held (-1 == none) */ /* Client transaction context */ Snapshot *pClient; /* Client snapshot */ int iReader; /* Read lock held (-1 == unlocked) */ MultiCursor *pCsr; /* List of all open cursors */ @@ -347,11 +353,11 @@ /* Work done notification callback */ void (*xWork)(lsm_db *, void *); void *pWorkCtx; - u32 mLock; /* Mask of current locks. See lsmShmLock(). */ + u64 mLock; /* Mask of current locks. See lsmShmLock(). */ lsm_db *pNext; /* Next connection to same database */ int nShm; /* Size of apShm[] array */ void **apShm; /* Shared memory chunks */ ShmHeader *pShmhdr; /* Live shared-memory header */ Index: src/lsm_main.c ================================================================== --- src/lsm_main.c +++ src/lsm_main.c @@ -91,10 +91,11 @@ pDb->nDfltBlksz = LSM_DFLT_BLOCK_SIZE; pDb->nMerge = LSM_DFLT_AUTOMERGE; pDb->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES; pDb->bUseLog = LSM_DFLT_USE_LOG; pDb->iReader = -1; + pDb->iRwclient = -1; pDb->bMultiProc = LSM_DFLT_MULTIPLE_PROCESSES; pDb->bMmap = LSM_DFLT_MMAP; pDb->xLog = xLog; pDb->compress.iId = LSM_COMPRESSION_NONE; return LSM_OK; @@ -190,13 +191,15 @@ if( pDb->pCsr || pDb->nTransOpen ){ rc = LSM_MISUSE_BKPT; }else{ lsmFreeSnapshot(pDb->pEnv, pDb->pClient); pDb->pClient = 0; + lsmDbDatabaseRelease(pDb); lsmLogClose(pDb); lsmFsClose(pDb->pFS); + assert( pDb->mLock==0 ); /* Invoke any destructors registered for the compression or ** compression factory callbacks. */ if( pDb->factory.xFree ) pDb->factory.xFree(pDb->factory.pCtx); if( pDb->compress.xFree ) pDb->compress.xFree(pDb->compress.pCtx); Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -278,10 +278,14 @@ lsmFsCloseAndDeleteLog(pDb->pFS); if( p->pFile && p->bMultiProc ) lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1); } } } + + if( pDb->iRwclient>=0 ){ + lsmShmLock(pDb, LSM_LOCK_RWCLIENT(pDb->iRwclient), LSM_LOCK_UNLOCK, 0); + } lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0); lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0); pDb->pShmhdr = 0; } @@ -336,14 +340,25 @@ */ if( rc==LSM_OK ){ rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_SHARED, 0); } - /* If anything went wrong, unlock DMS2. Unlock DMS1 in any case. */ + /* If anything went wrong, unlock DMS2. Otherwise, try to take an exclusive + ** lock on one of the LSM_LOCK_RWCLIENT() locks. Unlock DMS1 in any case. */ if( rc!=LSM_OK ){ lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0); pDb->pShmhdr = 0; + }else{ + int i; + for(i=0; iiRwclient = i; + if( rc2!=LSM_BUSY ){ + rc = rc2; + break; + } + } } lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0); return rc; } @@ -1522,17 +1537,17 @@ int iLock, int eOp, /* One of LSM_LOCK_UNLOCK, SHARED or EXCL */ int bBlock /* True for a blocking lock */ ){ lsm_db *pIter; - const u32 me = (1 << (iLock-1)); - const u32 ms = (1 << (iLock+16-1)); + const u64 me = ((u64)1 << (iLock-1)); + const u64 ms = ((u64)1 << (iLock+32-1)); int rc = LSM_OK; Database *p = db->pDatabase; - assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) ); - assert( iLock<=16 ); + assert( iLock>=1 && iLock<=LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT-1) ); + assert( LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT-1)<=32 ); assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL ); /* Check for a no-op. Proceed only if this is not one of those. */ if( (eOp==LSM_LOCK_UNLOCK && (db->mLock & (me|ms))!=0) || (eOp==LSM_LOCK_SHARED && (db->mLock & (me|ms))!=ms) @@ -1596,12 +1611,12 @@ } #ifdef LSM_DEBUG int shmLockType(lsm_db *db, int iLock){ - const u32 me = (1 << (iLock-1)); - const u32 ms = (1 << (iLock+16-1)); + const u64 me = ((u64)1 << (iLock-1)); + const u64 ms = ((u64)1 << (iLock+32-1)); if( db->mLock & me ) return LSM_LOCK_EXCL; if( db->mLock & ms ) return LSM_LOCK_SHARED; return LSM_LOCK_UNLOCK; } Index: src/lsm_unix.c ================================================================== --- src/lsm_unix.c +++ src/lsm_unix.c @@ -304,11 +304,11 @@ assert( aType[LSM_LOCK_UNLOCK]==F_UNLCK ); assert( aType[LSM_LOCK_SHARED]==F_RDLCK ); assert( aType[LSM_LOCK_EXCL]==F_WRLCK ); assert( eType>=0 && eType0 && iLock<=16 ); + assert( iLock>0 && iLock<=32 ); memset(&lock, 0, sizeof(lock)); lock.l_whence = SEEK_SET; lock.l_len = 1; lock.l_type = aType[eType];