/* ** 2007 September 9 ** ** The author disclaims copyright to this source code. In place of ** a legal notice, here is a blessing: ** ** May you do good and not evil. ** May you find forgiveness for yourself and forgive others. ** May you share freely, never taking more than you give. ** ************************************************************************* ** ** This file contains the implementation of some Tcl commands used to ** test that sqlite4 database handles may be concurrently accessed by ** multiple threads. Right now this only works on unix. */ #include "sqliteInt.h" #include #if SQLITE4_THREADSAFE #include #if !defined(_MSC_VER) #include #endif /* ** One of these is allocated for each thread created by [sqlthread spawn]. */ typedef struct SqlThread SqlThread; struct SqlThread { Tcl_ThreadId parent; /* Thread id of parent thread */ Tcl_Interp *interp; /* Parent interpreter */ char *zScript; /* The script to execute. */ char *zVarname; /* Varname in parent script */ }; /* ** A custom Tcl_Event type used by this module. When the event is ** handled, script zScript is evaluated in interpreter interp. If ** the evaluation throws an exception (returns TCL_ERROR), then the ** error is handled by Tcl_BackgroundError(). If no error occurs, ** the result is simply discarded. */ typedef struct EvalEvent EvalEvent; struct EvalEvent { Tcl_Event base; /* Base class of type Tcl_Event */ char *zScript; /* The script to execute. */ Tcl_Interp *interp; /* The interpreter to execute it in. */ }; static Tcl_ObjCmdProc sqlthread_proc; static Tcl_ObjCmdProc clock_seconds_proc; #if SQLITE4_OS_UNIX && defined(SQLITE4_ENABLE_UNLOCK_NOTIFY) static Tcl_ObjCmdProc blocking_step_proc; static Tcl_ObjCmdProc blocking_prepare_proc; #endif int Sqlitetest1_Init(Tcl_Interp *); int Sqlite3_Init(Tcl_Interp *); /* Functions from test1.c */ void *sqlite4TestTextToPtr(const char *); const char *sqlite4TestErrorName(int); int getDbPointer(Tcl_Interp *, const char *, sqlite4 **); int sqlite4TestMakePointerStr(Tcl_Interp *, char *, void *); int sqlite4TestErrCode(Tcl_Interp *, sqlite4 *, int); /* ** Handler for events of type EvalEvent. */ static int tclScriptEvent(Tcl_Event *evPtr, int flags){ int rc; EvalEvent *p = (EvalEvent *)evPtr; rc = Tcl_Eval(p->interp, p->zScript); if( rc!=TCL_OK ){ Tcl_BackgroundError(p->interp); } UNUSED_PARAMETER(flags); return 1; } /* ** Register an EvalEvent to evaluate the script pScript in the ** parent interpreter/thread of SqlThread p. */ static void postToParent(SqlThread *p, Tcl_Obj *pScript){ EvalEvent *pEvent; char *zMsg; int nMsg; zMsg = Tcl_GetStringFromObj(pScript, &nMsg); pEvent = (EvalEvent *)ckalloc(sizeof(EvalEvent)+nMsg+1); pEvent->base.nextPtr = 0; pEvent->base.proc = tclScriptEvent; pEvent->zScript = (char *)&pEvent[1]; memcpy(pEvent->zScript, zMsg, nMsg+1); pEvent->interp = p->interp; Tcl_ThreadQueueEvent(p->parent, (Tcl_Event *)pEvent, TCL_QUEUE_TAIL); Tcl_ThreadAlert(p->parent); } /* ** The main function for threads created with [sqlthread spawn]. */ static Tcl_ThreadCreateType tclScriptThread(ClientData pSqlThread){ Tcl_Interp *interp; Tcl_Obj *pRes; Tcl_Obj *pList; int rc; SqlThread *p = (SqlThread *)pSqlThread; extern int Sqlitetest_mutex_Init(Tcl_Interp*); interp = Tcl_CreateInterp(); Tcl_CreateObjCommand(interp, "clock_seconds", clock_seconds_proc, 0, 0); Tcl_CreateObjCommand(interp, "sqlthread", sqlthread_proc, pSqlThread, 0); #if SQLITE4_OS_UNIX && defined(SQLITE4_ENABLE_UNLOCK_NOTIFY) Tcl_CreateObjCommand(interp, "sqlite4_blocking_step", blocking_step_proc,0,0); Tcl_CreateObjCommand(interp, "sqlite4_blocking_prepare", blocking_prepare_proc, (void *)1, 0); Tcl_CreateObjCommand(interp, "sqlite4_nonblocking_prepare", blocking_prepare_proc, 0, 0); #endif Sqlitetest1_Init(interp); Sqlitetest_mutex_Init(interp); Sqlite3_Init(interp); rc = Tcl_Eval(interp, p->zScript); pRes = Tcl_GetObjResult(interp); pList = Tcl_NewObj(); Tcl_IncrRefCount(pList); Tcl_IncrRefCount(pRes); if( rc!=TCL_OK ){ Tcl_ListObjAppendElement(interp, pList, Tcl_NewStringObj("error", -1)); Tcl_ListObjAppendElement(interp, pList, pRes); postToParent(p, pList); Tcl_DecrRefCount(pList); pList = Tcl_NewObj(); } Tcl_ListObjAppendElement(interp, pList, Tcl_NewStringObj("set", -1)); Tcl_ListObjAppendElement(interp, pList, Tcl_NewStringObj(p->zVarname, -1)); Tcl_ListObjAppendElement(interp, pList, pRes); postToParent(p, pList); ckfree((void *)p); Tcl_DecrRefCount(pList); Tcl_DecrRefCount(pRes); Tcl_DeleteInterp(interp); while( Tcl_DoOneEvent(TCL_ALL_EVENTS|TCL_DONT_WAIT) ); Tcl_ExitThread(0); TCL_THREAD_CREATE_RETURN; } /* ** sqlthread spawn VARNAME SCRIPT ** ** Spawn a new thread with its own Tcl interpreter and run the ** specified SCRIPT(s) in it. The thread terminates after running ** the script. The result of the script is stored in the variable ** VARNAME. ** ** The caller can wait for the script to terminate using [vwait VARNAME]. */ static int sqlthread_spawn( ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ Tcl_ThreadId x; SqlThread *pNew; int rc; int nVarname; char *zVarname; int nScript; char *zScript; /* Parameters for thread creation */ const int nStack = TCL_THREAD_STACK_DEFAULT; const int flags = TCL_THREAD_NOFLAGS; assert(objc==4); UNUSED_PARAMETER(clientData); UNUSED_PARAMETER(objc); zVarname = Tcl_GetStringFromObj(objv[2], &nVarname); zScript = Tcl_GetStringFromObj(objv[3], &nScript); pNew = (SqlThread *)ckalloc(sizeof(SqlThread)+nVarname+nScript+2); pNew->zVarname = (char *)&pNew[1]; pNew->zScript = (char *)&pNew->zVarname[nVarname+1]; memcpy(pNew->zVarname, zVarname, nVarname+1); memcpy(pNew->zScript, zScript, nScript+1); pNew->parent = Tcl_GetCurrentThread(); pNew->interp = interp; rc = Tcl_CreateThread(&x, tclScriptThread, (void *)pNew, nStack, flags); if( rc!=TCL_OK ){ Tcl_AppendResult(interp, "Error in Tcl_CreateThread()", 0); ckfree((char *)pNew); return TCL_ERROR; } return TCL_OK; } /* ** sqlthread parent SCRIPT ** ** This can be called by spawned threads only. It sends the specified ** script back to the parent thread for execution. The result of ** evaluating the SCRIPT is returned. The parent thread must enter ** the event loop for this to work - otherwise the caller will ** block indefinitely. ** ** NOTE: At the moment, this doesn't work. FIXME. */ static int sqlthread_parent( ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ EvalEvent *pEvent; char *zMsg; int nMsg; SqlThread *p = (SqlThread *)clientData; assert(objc==3); UNUSED_PARAMETER(objc); if( p==0 ){ Tcl_AppendResult(interp, "no parent thread", 0); return TCL_ERROR; } zMsg = Tcl_GetStringFromObj(objv[2], &nMsg); pEvent = (EvalEvent *)ckalloc(sizeof(EvalEvent)+nMsg+1); pEvent->base.nextPtr = 0; pEvent->base.proc = tclScriptEvent; pEvent->zScript = (char *)&pEvent[1]; memcpy(pEvent->zScript, zMsg, nMsg+1); pEvent->interp = p->interp; Tcl_ThreadQueueEvent(p->parent, (Tcl_Event *)pEvent, TCL_QUEUE_TAIL); Tcl_ThreadAlert(p->parent); return TCL_OK; } /* ** sqlthread open ** ** Open a database handle and return the string representation of ** the pointer value. */ static int sqlthread_open( ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ int sqlite4TestMakePointerStr(Tcl_Interp *interp, char *zPtr, void *p); const char *zFilename; sqlite4 *db; int rc; char zBuf[100]; extern void Md5_Register(sqlite4*); UNUSED_PARAMETER(clientData); UNUSED_PARAMETER(objc); zFilename = Tcl_GetString(objv[2]); rc = sqlite4_open(0, zFilename, &db, 0); Md5_Register(db); if( sqlite4TestMakePointerStr(interp, zBuf, db) ) return TCL_ERROR; Tcl_AppendResult(interp, zBuf, 0); return TCL_OK; } /* ** sqlthread open ** ** Return the current thread-id (Tcl_GetCurrentThread()) cast to ** an integer. */ static int sqlthread_id( ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ Tcl_ThreadId id = Tcl_GetCurrentThread(); Tcl_SetObjResult(interp, Tcl_NewIntObj(SQLITE4_PTR_TO_INT(id))); UNUSED_PARAMETER(clientData); UNUSED_PARAMETER(objc); UNUSED_PARAMETER(objv); return TCL_OK; } /* ** Dispatch routine for the sub-commands of [sqlthread]. */ static int sqlthread_proc( ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ struct SubCommand { char *zName; Tcl_ObjCmdProc *xProc; int nArg; char *zUsage; } aSub[] = { {"parent", sqlthread_parent, 1, "SCRIPT"}, {"spawn", sqlthread_spawn, 2, "VARNAME SCRIPT"}, {"open", sqlthread_open, 1, "DBNAME"}, {"id", sqlthread_id, 0, ""}, {0, 0, 0} }; struct SubCommand *pSub; int rc; int iIndex; if( objc<2 ){ Tcl_WrongNumArgs(interp, 1, objv, "SUB-COMMAND"); return TCL_ERROR; } rc = Tcl_GetIndexFromObjStruct( interp, objv[1], aSub, sizeof(aSub[0]), "sub-command", 0, &iIndex ); if( rc!=TCL_OK ) return rc; pSub = &aSub[iIndex]; if( objc<(pSub->nArg+2) ){ Tcl_WrongNumArgs(interp, 2, objv, pSub->zUsage); return TCL_ERROR; } return pSub->xProc(clientData, interp, objc, objv); } /* ** The [clock_seconds] command. This is more or less the same as the ** regular tcl [clock seconds], except that it is available in testfixture ** when linked against both Tcl 8.4 and 8.5. Because [clock seconds] is ** implemented as a script in Tcl 8.5, it is not usually available to ** testfixture. */ static int clock_seconds_proc( ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ Tcl_Time now; Tcl_GetTime(&now); Tcl_SetObjResult(interp, Tcl_NewIntObj(now.sec)); UNUSED_PARAMETER(clientData); UNUSED_PARAMETER(objc); UNUSED_PARAMETER(objv); return TCL_OK; } /************************************************************************* ** This block contains the implementation of the [sqlite4_blocking_step] ** command available to threads created by [sqlthread spawn] commands. It ** is only available on UNIX for now. This is because pthread condition ** variables are used. ** ** The source code for the C functions sqlite4_blocking_step(), ** blocking_step_notify() and the structure UnlockNotification is ** automatically extracted from this file and used as part of the ** documentation for the sqlite4_unlock_notify() API function. This ** should be considered if these functions are to be extended (i.e. to ** support windows) in the future. */ #if SQLITE4_OS_UNIX && defined(SQLITE4_ENABLE_UNLOCK_NOTIFY) /* BEGIN_SQLITE4_BLOCKING_STEP */ /* This example uses the pthreads API */ #include /* ** A pointer to an instance of this structure is passed as the user-context ** pointer when registering for an unlock-notify callback. */ typedef struct UnlockNotification UnlockNotification; struct UnlockNotification { int fired; /* True after unlock event has occurred */ pthread_cond_t cond; /* Condition variable to wait on */ pthread_mutex_t mutex; /* Mutex to protect structure */ }; /* ** This function is an unlock-notify callback registered with SQLite. */ static void unlock_notify_cb(void **apArg, int nArg){ int i; for(i=0; imutex); p->fired = 1; pthread_cond_signal(&p->cond); pthread_mutex_unlock(&p->mutex); } } /* ** This function assumes that an SQLite API call (either sqlite4_prepare() ** or sqlite4_step()) has just returned SQLITE4_LOCKED. The argument is the ** associated database connection. ** ** This function calls sqlite4_unlock_notify() to register for an ** unlock-notify callback, then blocks until that callback is delivered ** and returns SQLITE4_OK. The caller should then retry the failed operation. ** ** Or, if sqlite4_unlock_notify() indicates that to block would deadlock ** the system, then this function returns SQLITE4_LOCKED immediately. In ** this case the caller should not retry the operation and should roll ** back the current transaction (if any). */ static int wait_for_unlock_notify(sqlite4 *db){ int rc; UnlockNotification un; /* Initialize the UnlockNotification structure. */ un.fired = 0; pthread_mutex_init(&un.mutex, 0); pthread_cond_init(&un.cond, 0); /* Register for an unlock-notify callback. */ rc = sqlite4_unlock_notify(db, unlock_notify_cb, (void *)&un); assert( rc==SQLITE4_LOCKED || rc==SQLITE4_OK ); /* The call to sqlite4_unlock_notify() always returns either SQLITE4_LOCKED ** or SQLITE4_OK. ** ** If SQLITE4_LOCKED was returned, then the system is deadlocked. In this ** case this function needs to return SQLITE4_LOCKED to the caller so ** that the current transaction can be rolled back. Otherwise, block ** until the unlock-notify callback is invoked, then return SQLITE4_OK. */ if( rc==SQLITE4_OK ){ pthread_mutex_lock(&un.mutex); if( !un.fired ){ pthread_cond_wait(&un.cond, &un.mutex); } pthread_mutex_unlock(&un.mutex); } /* Destroy the mutex and condition variables. */ pthread_cond_destroy(&un.cond); pthread_mutex_destroy(&un.mutex); return rc; } /* ** This function is a wrapper around the SQLite function sqlite4_step(). */ int sqlite4_blocking_step(sqlite4_stmt *pStmt){ int rc; while( SQLITE4_LOCKED==(rc = sqlite4_step(pStmt)) ){ rc = wait_for_unlock_notify(sqlite4_db_handle(pStmt)); if( rc!=SQLITE4_OK ) break; sqlite4_reset(pStmt); } return rc; } /* ** This function is a wrapper around the SQLite function sqlite4_prepare(). ** It functions in the same way as prepare_v2(), except that if a required ** shared-cache lock cannot be obtained, this function may block waiting for ** the lock to become available. In this scenario the normal API prepare_v2() ** function always returns SQLITE4_LOCKED. ** ** If this function returns SQLITE4_LOCKED, the caller should rollback ** the current transaction (if any) and try again later. Otherwise, the ** system may become deadlocked. */ int sqlite4_blocking_prepare( sqlite4 *db, /* Database handle. */ const char *zSql, /* UTF-8 encoded SQL statement. */ int nSql, /* Length of zSql in bytes. */ sqlite4_stmt **ppStmt, /* OUT: A pointer to the prepared statement */ int *pn /* OUT: Bytes read from zSql */ ){ int rc; while( SQLITE4_LOCKED==(rc = sqlite4_prepare(db, zSql, nSql, ppStmt, pn)) ){ rc = wait_for_unlock_notify(db); if( rc!=SQLITE4_OK ) break; } return rc; } /* END_SQLITE4_BLOCKING_STEP */ /* ** Usage: sqlite4_blocking_step STMT ** ** Advance the statement to the next row. */ static int blocking_step_proc( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ sqlite4_stmt *pStmt; int rc; if( objc!=2 ){ Tcl_WrongNumArgs(interp, 1, objv, "STMT"); return TCL_ERROR; } pStmt = (sqlite4_stmt*)sqlite4TestTextToPtr(Tcl_GetString(objv[1])); rc = sqlite4_blocking_step(pStmt); Tcl_SetResult(interp, (char *)sqlite4TestErrorName(rc), 0); return TCL_OK; } /* ** Usage: sqlite4_blocking_prepare DB sql bytes ?tailvar? ** Usage: sqlite4_nonblocking_prepare DB sql bytes ?tailvar? */ static int blocking_prepare_proc( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ sqlite4 *db; const char *zSql; int bytes; int nUsed; sqlite4_stmt *pStmt = 0; char zBuf[50]; int rc; int isBlocking = !(clientData==0); if( objc!=5 && objc!=4 ){ Tcl_AppendResult(interp, "wrong # args: should be \"", Tcl_GetString(objv[0]), " DB sql bytes tailvar", 0); return TCL_ERROR; } if( getDbPointer(interp, Tcl_GetString(objv[1]), &db) ) return TCL_ERROR; zSql = Tcl_GetString(objv[2]); if( Tcl_GetIntFromObj(interp, objv[3], &bytes) ) return TCL_ERROR; if( isBlocking ){ rc = sqlite4_blocking_prepare(db, zSql, bytes, &pStmt, &nUsed); }else{ rc = sqlite4_prepare(db, zSql, bytes, &pStmt, &nUsed); } assert(rc==SQLITE4_OK || pStmt==0); if( objc>=5 ){ const char *zTail = zSql[nUsed]; int nTail = -1; if( bytes>=0 ){ nTail = bytes - nUsed; } Tcl_ObjSetVar2(interp, objv[4], 0, Tcl_NewStringObj(zTail, nTail), 0); } if( rc!=SQLITE4_OK ){ assert( pStmt==0 ); sprintf(zBuf, "%s ", (char *)sqlite4TestErrorName(rc)); Tcl_AppendResult(interp, zBuf, sqlite4_errmsg(db), 0); return TCL_ERROR; } if( pStmt ){ if( sqlite4TestMakePointerStr(interp, zBuf, pStmt) ) return TCL_ERROR; Tcl_AppendResult(interp, zBuf, 0); } return TCL_OK; } #endif /* SQLITE4_OS_UNIX && SQLITE4_ENABLE_UNLOCK_NOTIFY */ /* ** End of implementation of [sqlite4_blocking_step]. ************************************************************************/ /* ** Register commands with the TCL interpreter. */ int SqlitetestThread_Init(Tcl_Interp *interp){ Tcl_CreateObjCommand(interp, "sqlthread", sqlthread_proc, 0, 0); Tcl_CreateObjCommand(interp, "clock_seconds", clock_seconds_proc, 0, 0); #if SQLITE4_OS_UNIX && defined(SQLITE4_ENABLE_UNLOCK_NOTIFY) Tcl_CreateObjCommand(interp, "sqlite4_blocking_step", blocking_step_proc,0,0); Tcl_CreateObjCommand(interp, "sqlite4_blocking_prepare", blocking_prepare_proc, (void *)1, 0); Tcl_CreateObjCommand(interp, "sqlite4_nonblocking_prepare", blocking_prepare_proc, 0, 0); #endif return TCL_OK; } #else int SqlitetestThread_Init(Tcl_Interp *interp){ return TCL_OK; } #endif