Index: src/kvlsm.c ================================================================== --- src/kvlsm.c +++ src/kvlsm.c @@ -451,11 +451,12 @@ }else{ struct Config { const char *zParam; int eParam; } aConfig[] = { - { "lsm_block_size", LSM_CONFIG_BLOCK_SIZE } + { "lsm_block_size", LSM_CONFIG_BLOCK_SIZE }, + { "lsm_multiple_processes", LSM_CONFIG_MULTIPLE_PROCESSES } }; memset(pNew, 0, sizeof(KVLsm)); pNew->base.pStoreVfunc = &kvlsmMethods; pNew->base.pEnv = pEnv; Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -32,19 +32,26 @@ ** Database structure. There is one such structure for each distinct ** database accessed by this process. They are stored in the singly linked ** list starting at global variable gShared.pDatabase. Database objects are ** reference counted. Once the number of connections to the associated ** database drops to zero, they are removed from the linked list and deleted. +** +** pFile: +** In multi-process mode, this file descriptor is used to obtain locks +** and to access shared-memory. In single process mode, its only job is +** to hold the exclusive lock on the file. +** */ struct Database { /* Protected by the global mutex (enterGlobalMutex/leaveGlobalMutex): */ char *zName; /* Canonical path to database file */ int nName; /* strlen(zName) */ int nDbRef; /* Number of associated lsm_db handles */ Database *pDbNext; /* Next Database structure in global list */ /* Protected by the local mutex (pClientMutex) */ + int bMultiProc; /* True if running in multi-process mode */ lsm_file *pFile; /* Used for locks/shm in multi-proc mode */ LsmFile *pLsmFile; /* List of deferred closes */ lsm_mutex *pClientMutex; /* Protects the apShmChunk[] and pConn */ int nShmChunk; /* Number of entries in apShmChunk[] array */ void **apShmChunk; /* Array of "shared" memory regions */ @@ -267,11 +274,11 @@ ** and, if possible, truncate the database file. 
*/ if( rc==LSM_OK ){ Database *p = pDb->pDatabase; dbTruncateFile(pDb); lsmFsCloseAndDeleteLog(pDb->pFS); - if( p->pFile ) lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1); + if( p->pFile && p->bMultiProc ) lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1); } } } lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0); @@ -316,17 +323,21 @@ } }else if( rc==LSM_BUSY ){ rc = LSM_OK; } - /* Take a shared lock on DMS2. This lock "cannot" fail, as connections - ** may only hold an exclusive lock on DMS2 if they first hold an exclusive - ** lock on DMS1. And this connection is currently holding the exclusive - ** lock on DSM1. */ + /* Take a shared lock on DMS2. In multi-process mode this lock "cannot" + ** fail, as connections may only hold an exclusive lock on DMS2 if they + ** first hold an exclusive lock on DMS1. And this connection is currently + ** holding the exclusive lock on DMS1. + ** + ** However, if some other connection has the database open in single-process + ** mode, this operation will fail. In this case, return the error to the + ** caller - the attempt to connect to the db has failed. + */ if( rc==LSM_OK ){ rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_SHARED, 0); - assert( rc!=LSM_BUSY ); } /* If anything went wrong, unlock DMS2. Unlock DMS1 in any case. */ if( rc!=LSM_OK ){ lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0); @@ -361,11 +372,11 @@ assert( pDb->pDatabase==0 ); rc = enterGlobalMutex(pEnv); if( rc==LSM_OK ){ /* Search the global list for an existing object. TODO: Need something - ** better than the strcmp() below to figure out if a given Database + ** better than the memcmp() below to figure out if a given Database ** object represents the requested file. */ for(p=gShared.pDatabase; p; p=p->pDbNext){ if( nName==p->nName && 0==memcmp(zName, p->zName, nName) ) break; } @@ -374,21 +385,26 @@ p = (Database *)lsmMallocZeroRc(pEnv, sizeof(Database)+nName+1, &rc); /* If the allocation was successful, fill in other fields and ** allocate the client mutex. 
*/ if( rc==LSM_OK ){ + p->bMultiProc = pDb->bMultiProc; p->zName = (char *)&p[1]; p->nName = nName; memcpy((void *)p->zName, zName, nName+1); rc = lsmMutexNew(pEnv, &p->pClientMutex); } - /* If running in multi-process mode and nothing has gone wrong so far, - ** open the shared fd */ - if( rc==LSM_OK && pDb->bMultiProc ){ + /* If nothing has gone wrong so far, open the shared fd. And if that + ** succeeds and this connection requested single-process mode, + ** attempt to take the exclusive lock on DMS2. */ + if( rc==LSM_OK ){ rc = lsmEnvOpen(pDb->pEnv, p->zName, &p->pFile); } + if( rc==LSM_OK && p->bMultiProc==0 ){ + rc = lsmEnvLock(pDb->pEnv, p->pFile, LSM_LOCK_DMS2, LSM_LOCK_EXCL); + } if( rc==LSM_OK ){ p->pDbNext = gShared.pDatabase; gShared.pDatabase = p; }else{ @@ -458,38 +474,37 @@ } lsmMutexEnter(pDb->pEnv, p->pClientMutex); for(ppDb=&p->pConn; *ppDb!=pDb; ppDb=&((*ppDb)->pNext)); *ppDb = pDb->pNext; - if( lsmDbMultiProc(pDb) ){ - dbDeferClose(pDb); - } + dbDeferClose(pDb); lsmMutexLeave(pDb->pEnv, p->pClientMutex); enterGlobalMutex(pDb->pEnv); p->nDbRef--; if( p->nDbRef==0 ){ + LsmFile *pIter; + LsmFile *pNext; Database **pp; /* Remove the Database structure from the linked list. */ for(pp=&gShared.pDatabase; *pp!=p; pp=&((*pp)->pDbNext)); *pp = p->pDbNext; - /* Free the Database object and shared memory buffers. 
*/ - if( p->pFile==0 ){ + /* If they were allocated from the heap, free the shared memory chunks */ + if( p->bMultiProc==0 ){ int i; for(i=0; inShmChunk; i++){ lsmFree(pDb->pEnv, p->apShmChunk[i]); } - }else{ - LsmFile *pIter; - LsmFile *pNext; - for(pIter=p->pLsmFile; pIter; pIter=pNext){ - pNext = pIter->pNext; - lsmEnvClose(pDb->pEnv, pIter->pFile); - lsmFree(pDb->pEnv, pIter); - } + } + + /* Close any outstanding file descriptors */ + for(pIter=p->pLsmFile; pIter; pIter=pNext){ + pNext = pIter->pNext; + lsmEnvClose(pDb->pEnv, pIter->pFile); + lsmFree(pDb->pEnv, pIter); } freeDatabase(pDb->pEnv, p); } leaveGlobalMutex(pDb->pEnv); } @@ -1293,11 +1308,11 @@ ** This function may only be called after a successful call to ** lsmDbDatabaseConnect(). It returns true if the connection is in ** multi-process mode, or false otherwise. */ int lsmDbMultiProc(lsm_db *pDb){ - return pDb->pDatabase && (pDb->pDatabase->pFile!=0); + return pDb->pDatabase && pDb->pDatabase->bMultiProc; } /************************************************************************* ************************************************************************** @@ -1351,11 +1366,11 @@ } for(i=db->nShm; rc==LSM_OK && i=p->nShmChunk ){ void *pChunk = 0; - if( p->pFile==0 ){ + if( p->bMultiProc==0 ){ /* Single process mode */ pChunk = lsmMallocZeroRc(pEnv, LSM_SHM_CHUNK_SIZE, &rc); }else{ /* Multi-process mode */ rc = lsmEnvShmMap(pEnv, p->pFile, i, LSM_SHM_CHUNK_SIZE, &pChunk); @@ -1375,10 +1390,18 @@ lsmMutexLeave(pEnv, p->pClientMutex); } return rc; } + +static int lockSharedFile(lsm_env *pEnv, Database *p, int iLock, int eOp){ + int rc = LSM_OK; + if( p->bMultiProc ){ + rc = lsmEnvLock(pEnv, p->pFile, iLock, eOp); + } + return rc; +} /* ** Attempt to obtain the lock identified by the iLock and bExcl parameters. ** If successful, return LSM_OK. If the lock cannot be obtained because ** there exists some other conflicting lock, return LSM_BUSY. 
If some other @@ -1429,21 +1452,21 @@ assert( nExcl==0 || (db->mLock & (me|ms))==0 ); switch( eOp ){ case LSM_LOCK_UNLOCK: if( nShared==0 ){ - lsmEnvLock(db->pEnv, p->pFile, iLock, LSM_LOCK_UNLOCK); + lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_UNLOCK); } db->mLock &= ~(me|ms); break; case LSM_LOCK_SHARED: if( nExcl ){ rc = LSM_BUSY; }else{ if( nShared==0 ){ - rc = lsmEnvLock(db->pEnv, p->pFile, iLock, LSM_LOCK_SHARED); + rc = lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_SHARED); } db->mLock |= ms; db->mLock &= ~me; } break; @@ -1451,11 +1474,11 @@ default: assert( eOp==LSM_LOCK_EXCL ); if( nExcl || nShared ){ rc = LSM_BUSY; }else{ - rc = lsmEnvLock(db->pEnv, p->pFile, iLock, LSM_LOCK_EXCL); + rc = lockSharedFile(db->pEnv, p, iLock, LSM_LOCK_EXCL); db->mLock |= (me|ms); } break; } Index: www/lsmusr.wiki ================================================================== --- www/lsmusr.wiki +++ www/lsmusr.wiki @@ -804,11 +804,11 @@ Once sufficient data has been accumulated in an in-memory tree (by default "sufficient data" means 1MB, including data structure overhead), it is marked as "old" and a new "live" in-memory tree created. An old in-memory tree is immutable - new data is always inserted into the live tree. There may be at most one old tree - in memory at any time. + in memory at a time.
  • The contents of an old in-memory tree may be written into the database file at any point. Once its contents have been written (or "flushed") to the database file, the in-memory tree may be discarded. @@ -816,21 +816,21 @@ "segment". A database segment is an immutable b-tree structure stored within the database file. A single database file may contain up to 64 segments.

  • - At any point, two or more existing segments within the database may - be merged together into a single segment. Once their contents has + At any point, two or more existing segments within the database file + may be merged together into a single segment. Once their contents have been merged into the new segment, the original segments may be discarded.

  • After the set of segments in a database file has been modified (either by flushing an in-memory tree to disk or by merging existing segments together), the changes may be made persistent by "checkpointing" the - database. Checkpointing involves syncing the contents of the database - file to disk and updating the database file header. + database. Checkpointing involves updating the database file header and + (usually) syncing the contents of the database file to disk.

    Steps 3 and 4 above are known as "working" on the database. Step 5 is refered to as "checkpointing". By default, database connections perform work and checkpoint operations periodically from within calls to API functions @@ -996,12 +996,11 @@

    This option can only be set before lsm_open() is called on the database connection.

    If this option is set to false and there is already a connection to the database from another process when lsm_open() is called, the lsm_open() - call fails with error code LSM_BUSY. todo: It - doesn't actually do this yet. But it should... + call fails with error code LSM_BUSY.

    LSM_CONFIG_SAFETY

    The effect of this option on data durability is described above. @@ -1301,10 +1300,13 @@ example: rc = lsm_work(db, 1, -1, 0); + +

    todo: the -1 as the 3rd argument above is currently +not supported

    When optimizing the database as above, either the LSM_CONFIG_AUTOCHECKPOINT parameter should be set to a non-zero value or lsm_checkpoint() should be called periodically. Otherwise, no checkpoints will be performed, preventing the library from reusing any space occupied by old segments even after their