Index: src/server.c ================================================================== --- src/server.c +++ src/server.c @@ -63,10 +63,11 @@ */ #define slotGetSlowReaders(v) (((v) & HMA_SLOT_TR_MASK) >> HMA_SLOT_RLWL_BITS) #define slotReaderMask(v) ((v) & HMA_SLOT_RL_MASK) +#define fdOpen(pFd) ((pFd)->pMethods!=0) /* ** Atomic CAS primitive used in multi-process mode. Equivalent to: ** ** int serverCompareAndSwap(u32 *ptr, u32 oldval, u32 newval){ @@ -82,15 +83,10 @@ typedef struct ServerDb ServerDb; typedef struct ServerJournal ServerJournal; -struct ServerGlobal { - ServerDb *pDb; /* Linked list of all ServerDb objects */ -}; -static struct ServerGlobal g_server; - struct ServerJournal { char *zJournal; sqlite3_file *jfd; }; @@ -97,23 +93,29 @@ /* ** There is one instance of the following structure for each distinct ** database file opened in server mode by this process. */ struct ServerDb { - sqlite3_mutex *mutex; /* Non-recursive mutex */ + i64 aFileId[2]; /* Opaque VFS file-id */ + ServerDb *pNext; /* Next db in this process */ int nClient; /* Current number of clients */ + sqlite3_mutex *mutex; /* Non-recursive mutex */ + + /* Variables above this point are protected by the global mutex - + ** serverEnterMutex()/LeaveMutex(). Those below this point are + ** protected by the ServerDb.mutex mutex. 
*/ + int bInit; /* True once initialized */ u32 transmask; /* Bitmask of taken transaction ids */ u32 *aSlot; /* Array of page locking slots */ - i64 aFileId[2]; /* Opaque VFS file-id */ - ServerDb *pNext; /* Next db in this process */ sqlite3_vfs *pVfs; ServerJournal aJrnl[HMA_MAX_TRANSACTIONID]; u8 *aJrnlFdSpace; - void *pServerShm; + void *pServerShm; /* SHMOPEN handle (multi-process only) */ + u32 *aClient; /* Client "transaction active" flags */ int iNextCommit; /* Commit id for next pre-commit call */ Server *pCommit; /* List of connections currently commiting */ Server *pReader; /* Connections in slower-reader transaction */ ServerPage *pPgFirst; /* First (oldest) in list of pages */ @@ -135,10 +137,16 @@ int nAlloc; /* Allocated size of aLock[] array */ int nLock; /* Number of entries in aLock[] */ u32 *aLock; /* Array of held locks */ Server *pNext; /* Next in pCommit or pReader list */ }; + +struct ServerGlobal { + ServerDb *pDb; /* Linked list of all ServerDb objects */ +}; +static struct ServerGlobal g_server; + struct ServerFcntlArg { void *h; /* Handle from SHMOPEN */ void *p; /* Mapping */ int i1; /* Integer value 1 */ @@ -164,13 +172,15 @@ sqlite3_mutex_enter(sqlite3MutexAlloc(SQLITE_MUTEX_STATIC_APP1)); } static void serverLeaveMutex(void){ sqlite3_mutex_leave(sqlite3MutexAlloc(SQLITE_MUTEX_STATIC_APP1)); } +#if 0 static void serverAssertMutexHeld(void){ assert( sqlite3_mutex_held(sqlite3MutexAlloc(SQLITE_MUTEX_STATIC_APP1)) ); } +#endif /* ** Locate the ServerDb object shared by all connections to the db identified ** by aFileId[2], increment its ref count and set pNew->pDb to point to it. 
** In this context "locate" may mean to find an existing object or to @@ -218,11 +228,11 @@ ServerDb *pDb = p->pDb; ServerJournal *pJ = &pDb->aJrnl[iClient]; int bExist = 1; int rc = SQLITE_OK; - if( pJ->jfd->pMethods==0 ){ + if( fdOpen(pJ->jfd)==0 ){ bExist = 0; rc = sqlite3OsAccess(pDb->pVfs, pJ->zJournal, SQLITE_ACCESS_EXISTS,&bExist); if( bExist && rc==SQLITE_OK ){ int flags = SQLITE_OPEN_READWRITE|SQLITE_OPEN_MAIN_JOURNAL; rc = sqlite3OsOpen(pDb->pVfs, pJ->zJournal, pJ->jfd, flags, &flags); @@ -246,22 +256,24 @@ int bDelete ){ ServerDb *pDb = p->pDb; int i; + assert( pDb->pServerShm || bDelete ); for(i=0; i<HMA_MAX_TRANSACTIONID; i++){ ServerJournal *pJ = &pDb->aJrnl[i]; - if( pDb->pServerShm && bDelete ){ + if( bDelete && (pDb->pServerShm || fdOpen(pJ->jfd)) ){ int rc = serverClientRollback(p, i); if( rc!=SQLITE_OK ) bDelete = 0; } - if( pJ->jfd ){ + if( fdOpen(pJ->jfd) ){ sqlite3OsClose(pJ->jfd); if( bDelete ) sqlite3OsDelete(pDb->pVfs, pJ->zJournal, 0); } + sqlite3_free(pJ->zJournal); } memset(pDb->aJrnl, 0, sizeof(ServerJournal)*HMA_MAX_TRANSACTIONID); if( pDb->aJrnlFdSpace ){ @@ -279,10 +291,18 @@ } pDb->aSlot = 0; pDb->bInit = 0; } +/* +** Clear all page locks held by client iClient. The handle passed as the +** first argument may or may not correspond to client iClient. +** +** This function is called in multi-process mode as part of restoring the +** system state after it has been detected that client iClient may have +** failed mid transaction. It is never called for a single process system. 
+*/ static void serverClientUnlock(Server *p, int iClient){ ServerDb *pDb = p->pDb; int i; assert( pDb->pServerShm ); @@ -324,16 +344,17 @@ }else{ if( eServer==2 ){ ServerFcntlArg arg; arg.h = 0; arg.p = 0; - arg.i1 = sizeof(u32)*HMA_PAGELOCK_SLOTS; + arg.i1 = sizeof(u32)*(HMA_PAGELOCK_SLOTS + HMA_MAX_TRANSACTIONID); arg.i2 = 0; rc = sqlite3OsFileControl(dbfd, SQLITE_FCNTL_SERVER_SHMOPEN, (void*)&arg); if( rc==SQLITE_OK ){ pDb->aSlot = (u32*)arg.p; + pDb->aClient = &pDb->aSlot[HMA_PAGELOCK_SLOTS]; pDb->pServerShm = arg.h; bRollback = arg.i2; } }else{ pDb->aSlot = (u32*)sqlite3MallocZero(sizeof(u32)*HMA_PAGELOCK_SLOTS); @@ -497,11 +518,11 @@ /* If this is a multi-process database, it may be that the previous ** user of client-id pNew->iTransId crashed mid transaction. Roll ** back any hot journal file in the file-system and release ** page locks held by any crashed process. TODO: The call to ** serverClientUnlock() is expensive. */ - if( rc==SQLITE_OK && pDb->pServerShm ){ + if( rc==SQLITE_OK && pDb->pServerShm && pDb->aClient[pNew->iTransId] ){ serverClientUnlock(pNew, pNew->iTransId); rc = serverClientRollback(pNew, pNew->iTransId); } } }else{ @@ -529,10 +550,11 @@ u32 t; assert( p->pNext==0 ); if( pDb->pServerShm ){ p->eTrans = SERVER_TRANS_READWRITE; + pDb->aClient[p->iTransId] = 1; }else{ assert( p->iTransId<0 ); sqlite3_mutex_enter(pDb->mutex); if( bReadonly ){ Server *pIter; @@ -676,11 +698,10 @@ } sqlite3_mutex_leave(pDb->mutex); p->pNext = 0; - p->eTrans = SERVER_TRANS_NONE; p->iTransId = -1; p->iCommitId = 0; } /* @@ -688,13 +709,15 @@ */ int sqlite3ServerEnd(Server *p){ if( p->eTrans!=SERVER_TRANS_NONE ){ if( p->pDb->pServerShm ){ serverReleaseLocks(p); + p->pDb->aClient[p->iTransId] = 0; }else{ serverEndSingle(p); } + p->eTrans = SERVER_TRANS_NONE; } return SQLITE_OK; } int sqlite3ServerPreCommit(Server *p, ServerPage *pPg){