/* ** 2023 December 20 ** ** The author disclaims copyright to this source code. In place of ** a legal notice, here is a blessing: ** ** May you do good and not evil. ** May you find forgiveness for yourself and forgive others. ** May you share freely, never taking more than you give. ** ************************************************************************* */ #include <tcl.h> #include <assert.h> #include <sqlite3.h> #include <sqlite3hct.h> #include <string.h> #include <pthread.h> #include <arpa/inet.h> #include <poll.h> #include <fcntl.h> #include <unistd.h> #include <stdlib.h> #ifndef MIN # define MIN(A,B) ((A)<(B)?(A):(B)) #endif #ifndef MAX # define MAX(A,B) ((A)>(B)?(A):(B)) #endif typedef sqlite3_int64 i64; typedef unsigned char u8; /* ** Default port for leader to listen for connections on. This can be ** overridden with -port. */ #define TESTSERVER_DEFAULT_PORT 21212 /* ** Maximum jobs and followers for a single leader process. */ #define TESTSERVER_MAX_JOB 64 /* Max number of jobs per process */ #define TESTSERVER_MAX_FOLLOWER 16 /* Max number of follower nodes */ /* ** The protocol used to communicate between follower and leader nodes is ** very simple. The follower connects to the leader and sends either a ** SYNC or SUB message as described below. The leader replies with a ** SYNCREPLY, SUBREPLY or ERROR message, and then closes the socket ** connection. ** ** Message Formats (follower to leader) ** ------------------------------------ ** ** Each message begins with a 32-bit message type - one ** of the TESTSERVER_MESSAGE_* values defined below. The remainder of ** the message depends on the message type: ** ** TESTSERVER_MESSAGE_SYNC: ** * A 64-bit CID value, and ** * A SQLITE_HCT_JOURNAL_HASHSIZE byte xor/hash value. ** ** TESTSERVER_MESSAGE_SUB: ** * A 32-bit job number (jobs are numbered from 0 to (nJob-1)). 
** ** Message Formats (leader to follower) ** ------------------------------------ ** ** TESTSERVER_MESSAGE_SYNCREPLY: ** * A 32-bit integer containing the number of jobs. ** * Zero or more journal entries, each formatted as: ** * A 32-bit size field - the size of the following 4 fields combined. ** * A 64-bit "cid" value, ** * A 64-bit "schemacid" value, ** * A nul-terminated utf-8 "schema" string, ** * A blob of "data" for the entry. ** * A 32-bit 0 - "0x00 0x00 0x00 0x00". ** ** TESTSERVER_MESSAGE_SUBREPLY: ** * Zero or more journal entries, formatted as for SYNCREPLY. ** ** TESTSERVER_MESSAGE_ERROR: ** * A 32-bit size field (nSz), ** * Error string (nSz bytes, including nul-term). ** ** Protocol ** -------- ** ** The leader node listens on a well-known tcp/ip port for connections. ** To connect a follower to a leader: ** ** * The follower connects and sends a TESTSERVER_MESSAGE_SYNC message. ** ** * The leader replies with either a TESTSERVER_MESSAGE_ERROR, or ** a TESTSERVER_MESSAGE_SYNCREPLY. If the latter, it includes all ** transactions it currently has that the follower node does not. ** Either way, the leader then closes the socket connection. ** ** * The follower makes N connections to the leader, where N is the ** number of jobs the leader has running. It sends a ** TESTSERVER_MESSAGE_SUB message on each connection. ** ** * The follower then makes another connection to the leader, sending ** a TESTSERVER_MESSAGE_SYNC message. ** ** * The follower applies all journal entries received. */ #define TESTSERVER_MESSAGE_SYNC 1 #define TESTSERVER_MESSAGE_SUB 2 #define TESTSERVER_MESSAGE_ERROR 3 #define TESTSERVER_MESSAGE_SYNCREPLY 4 #define TESTSERVER_MESSAGE_SUBREPLY 5 /* ** Atomic read/write operators for the various integer types. 
*/
#define TestAtomicStore(PTR,VAL) __atomic_store_n((PTR),(VAL), __ATOMIC_SEQ_CST)
#define TestAtomicLoad(PTR)      __atomic_load_n((PTR), __ATOMIC_SEQ_CST)

/*
** Parameter zCmd should be the name of a Tcl database handle command
** created by the [sqlite3] command. This function attempts to extract the
** database handle and return it via output variable (*ppDb). If successful,
** TCL_OK is returned. Otherwise, (*ppDb) is set to NULL, an error message
** left in interp, and TCL_ERROR returned.
*/
int getDbPointer(Tcl_Interp *interp, const char *zCmd, sqlite3 **ppDb);

/*
** From tclsqlite.c and test_hct.c - these add the SQLite + hctree interfaces
** to the Tcl interpreter passed as the only argument.
*/
int Sqlite3_Init(Tcl_Interp *);
int SqliteHctTest_Init(Tcl_Interp *);

typedef struct TestServerJob TestServerJob;
typedef struct TestServer TestServer;

/*
** General buffer type. Used by testBufferAppend(), recvDataBuf() and others.
*/
typedef struct TestBuffer TestBuffer;
struct TestBuffer {
  u8 *aBuf;                       /* Allocation holding the buffered data */
  int nBuf;                       /* Number of bytes of aBuf[] in use */
  int nAlloc;                     /* Allocated size of aBuf[] in bytes */
};

/*
** Wrapper around a socket fd that buffers writes.
*/
typedef struct TestSocket TestSocket;
struct TestSocket {
  int fd;                         /* Socket file-descriptor */
  TestBuffer buf;                 /* Data appended but not yet sent on fd */
};

/*
** Each Tcl job (configured with the [testserver job] method) is represented
** by an instance of the following structure. Stored in the
** TestServer.aJob[] array.
*/
struct TestServerJob {
  TestServer *pServer;            /* Server object */
  Tcl_Obj *pScript;               /* Tcl script to run */
  pthread_t tid;                  /* Thread-id of job thread */
  int bDone;                      /* Set to true once job is finished */
  char *zErr;                     /* ckalloc()'d error message (if any) */

  pthread_mutex_t mutex;          /* Held by hctServerAccept() while it
                                  ** appends an entry to aFollower[] */
  int nFollower;                  /* Number of entries in use in aFollower[] */
  TestSocket aFollower[TESTSERVER_MAX_FOLLOWER];  /* Subscribed followers */
};

/*
** nSyncThread:
**   The number of threads that work on synchronization in follower
**   nodes, including the main thread. So a value of 1 here means no
**   extra threads are used for synchronization - just the main thread
**   that also calls recv() to read from the socket.
** ** Set using the -syncthreads configuration option. */ struct TestServer { Tcl_Interp *interp; /* If error, leave error message here */ sqlite3 *db; char *zPath; /* Copy of DBFILE argument */ int fdListen; /* Listening socket */ i64 iTimeout; int nJob; TestServerJob aJob[TESTSERVER_MAX_JOB]; /* Fields set by the [configure] method */ int iPort; /* Tcp port to listen for connections on */ char *zHost; /* Tcp host to connect to */ int nSyncThread; /* Number of threads helping with sync */ int nSecond; /* Number of seconds to run for */ int bFollower; /* True for follower, false for leader */ int nSyncBytes; /* Bytes of data to quit syncing on */ }; /* ** Like strdup(). Result must be freed by ckfree(). */ static char *test_strdup(const char *zIn){ char *zRet = 0; if( zIn ){ int nByte = strlen(zIn) + 1; zRet = (char*)ckalloc(nByte); memcpy(zRet, zIn, nByte); } return zRet; } /* ** Write 32 and 64 bit integer values to the supplied buffer, respectively */ static void putInt32(u8 *aBuf, int val){ memcpy(aBuf, &val, 4); } static void putInt64(u8 *aBuf, i64 val){ memcpy(aBuf, &val, 8); } /* ** Read 32 and 64 bit integer values from the supplied buffer, respectively */ static int getInt32(u8 *aBuf){ int val; memcpy(&val, aBuf, 4); return val; } static i64 getInt64(const u8 *aBuf){ i64 val; memcpy(&val, aBuf, 8); return val; } /* ** Free the allocation managed by buffer pBuf. The structure pointed to by ** pBuf itself is not freed. */ static void testBufferFree(TestBuffer *pBuf){ ckfree(pBuf->aBuf); memset(pBuf, 0, sizeof(TestBuffer)); } /* ** Append nData bytes of data from buffer aData[] to buffer pBuf. */ static void testBufferAppend(TestBuffer *pBuf, const u8 *aData, int nData){ if( (pBuf->nBuf+nData)>pBuf->nAlloc ){ int nNew = (pBuf->nBuf + nData) * 2; pBuf->aBuf = (u8*)ckrealloc(pBuf->aBuf, nNew); pBuf->nAlloc = nNew; } memcpy(&pBuf->aBuf[pBuf->nBuf], aData, nData); pBuf->nBuf += nData; } /* ** If (*pRc) is other than TCL_OK when this is called, it is a no-op. 
If ** it is TCL_OK, then an attempt to send nData bytes of data from buffer ** aData[] on socket file-descriptor fd. If an error occurs, (*pRc) ** is set to TCL_ERROR before returning. */ static void sendData(int *pRc, int fd, const u8 *aData, int nData){ int nTotal = 0; while( *pRc==TCL_OK && nTotal<nData ){ ssize_t res = 0; res = send(fd, &aData[nTotal], nData-nTotal, 0); if( res<0 ){ *pRc = TCL_ERROR; }else{ nTotal += res; } } } /* ** TestSocket API: ** ** socketSendData() ** socketSendInt32() ** socketSendInt64() ** socketFlush() ** socketFlushAndFree() */ static void socketFlush(int *pRc, TestSocket *p){ if( *pRc==TCL_OK ){ sendData(pRc, p->fd, p->buf.aBuf, p->buf.nBuf); } p->buf.nBuf = 0; } static void socketSendData(int *pRc, TestSocket *p, const u8 *aData, int nData){ if( p->buf.nBuf+nData>p->buf.nAlloc ){ if( p->buf.nBuf==0 ){ p->buf.nAlloc = 1<<20; p->buf.aBuf = ckalloc(p->buf.nAlloc); }else{ socketFlush(pRc, p); } } testBufferAppend(&p->buf, aData, nData); } static void socketSendInt32(int *pRc, TestSocket *p, int val){ u8 aBuf[4]; putInt32(aBuf, val); socketSendData(pRc, p, aBuf, sizeof(aBuf)); } static void socketSendInt64(int *pRc, TestSocket *p, i64 val){ u8 aBuf[8]; putInt64(aBuf, val); socketSendData(pRc, p, aBuf, sizeof(aBuf)); } static void socketFlushAndFree(int *pRc, TestSocket *p){ socketFlush(pRc, p); testBufferFree(&p->buf); } /* ** Return the current time in ms since julian day 0.0. */ static i64 test_gettime(){ i64 ret = 0; sqlite3_vfs *pVfs = sqlite3_vfs_find(0); pVfs->xCurrentTimeInt64(pVfs, &ret); return ret; } /* ** Debugging output functions. 
*/ static i64 hct_test_time_zero = 0; static void hct_test_debug_start(){ hct_test_time_zero = test_gettime(); } static void hct_test_debug(const char *zFmt, ...){ i64 tm = test_gettime() - hct_test_time_zero; va_list ap; char *z; va_start(ap, zFmt); z = sqlite3_vmprintf(zFmt, ap); va_end(ap); printf("hct_testserver: tm=%lldms %s\n", tm, z); sqlite3_free(z); } static void sendInt32(int *pRc, int fd, int val){ u8 aBuf[4]; putInt32(aBuf, val); sendData(pRc, fd, aBuf, sizeof(aBuf)); } static void sendInt64(int *pRc, int fd, i64 val){ u8 aBuf[8]; putInt64(aBuf, val); sendData(pRc, fd, aBuf, sizeof(aBuf)); } /* ** sqlite3_hct_journal_hook() callback for connections used by job threads. */ static int hctServerJournalHook( void *pCtx, i64 iCid, const char *zSchema, const void *pData, int nData, i64 iSchemaCid ){ int rc = SQLITE_OK; TestServerJob *pJob = (TestServerJob*)pCtx; int ii; int nSchema = strlen(zSchema) + 1; for(ii=0; ii<pJob->nFollower; ii++){ TestSocket *p = &pJob->aFollower[ii]; socketSendInt32(&rc, p, sizeof(i64)*2 + nSchema + nData); socketSendInt64(&rc, p, iCid); socketSendInt64(&rc, p, iSchemaCid); socketSendData(&rc, p, (const u8*)zSchema, nSchema); socketSendData(&rc, p, (const u8*)pData, nData); socketFlush(&rc, p); } return rc; } /* ** tclcmd: hct_testserver_timeout */ static int testTimeoutCmd( ClientData clientData, /* Unused */ Tcl_Interp *interp, /* The TCL interpreter */ int objc, /* Number of arguments */ Tcl_Obj *CONST objv[] /* Command arguments */ ){ TestServer *p = (TestServer*)clientData; int bRet = 0; i64 iTimeout = TestAtomicLoad(&p->iTimeout); if( objc!=1 ){ Tcl_WrongNumArgs(interp, 1, objv, ""); return TCL_ERROR; } if( iTimeout>0 && test_gettime()>=iTimeout ) bRet = 1; Tcl_SetObjResult(interp, Tcl_NewIntObj(bRet)); return TCL_OK; } /* ** The main() routine for a Tcl job thread. Within either a leader or ** follower node. 
*/
/*
** Argument pCtx points to the TestServerJob for the job. The thread
** creates its own Tcl interpreter, opens database TestServer.zPath as
** Tcl command [db], registers the journal hook and evaluates the job
** script. If anything fails, a copy of the interpreter result is left in
** TestServerJob.zErr. TestServerJob.bDone is set just before returning.
** The return value is always NULL.
*/
static void *hctServerJobThread(void *pCtx){
  TestServerJob *pJob = (TestServerJob*)pCtx;
  TestServer *p = pJob->pServer;
  Tcl_Interp *interp = 0;
  int rc = TCL_OK;
  Tcl_Obj *pOpenDb = 0;
  sqlite3 *db = 0;

  /* Create Tcl interpreter for this job */
  interp = Tcl_CreateInterp();
  Sqlite3_Init(interp);
  SqliteHctTest_Init(interp);

  /* Add the [hct_testserver_timeout] command. Returns true after the timer
  ** configured by the -seconds option expires.  */
  Tcl_CreateObjCommand(
      interp, "hct_testserver_timeout", testTimeoutCmd, (void*)p, 0
  );

  /* Open the database handle - evaluate [sqlite3 db $zPath] */
  pOpenDb = Tcl_NewObj();
  Tcl_IncrRefCount(pOpenDb);
  Tcl_ListObjAppendElement(interp, pOpenDb, Tcl_NewStringObj("sqlite3", -1));
  Tcl_ListObjAppendElement(interp, pOpenDb, Tcl_NewStringObj("db", -1));
  Tcl_ListObjAppendElement(interp, pOpenDb, Tcl_NewStringObj(p->zPath, -1));
  rc = Tcl_EvalObjEx(interp, pOpenDb, 0);
  Tcl_DecrRefCount(pOpenDb);

  /* Register the journal hook with the database handle just created. Do
  ** this for both leader and follower nodes, even though the callback will
  ** never be invoked for the read-only connections used on follower nodes.
  **
  ** If getDbPointer() fails it leaves an error message in interp and the
  ** handle is NULL. Treat that as a job error - do not register the hook
  ** on a NULL handle. */
  if( rc==TCL_OK ){
    rc = getDbPointer(interp, "db", &db);
    if( rc==TCL_OK ){
      sqlite3_hct_journal_hook(db, pCtx, hctServerJournalHook);
    }
  }

  /* Evaluate the job script and delete the interpreter */
  if( rc==TCL_OK ){
    rc = Tcl_EvalObjEx(interp, pJob->pScript, 0);
  }
  if( rc!=TCL_OK ){
    pJob->zErr = test_strdup(Tcl_GetStringResult(interp));
  }
  Tcl_DeleteInterp(interp);

  /* Set "job done" flag before returning. */
  TestAtomicStore(&pJob->bDone, 1);
  return 0;
}

/*
** Open a listening socket on port p->iPort, interface p->zHost. If
** successful, leave the file-descriptor in p->fdListen and return TCL_OK.
** Otherwise, if an error occurs, leave an error message in p->interp
** and return TCL_ERROR.
*/
static int hctServerListen(TestServer *p){
  int fd = -1;
  struct sockaddr_in addr;
  int rc = TCL_OK;
  int reuse = 1;

  fd = socket(AF_INET, SOCK_STREAM, 0);
  if( fd<0 ){
    Tcl_AppendResult(p->interp, "socket() failed", (char*)0);
    rc = TCL_ERROR;
  }

  if( rc==TCL_OK ){
    /* Only touch the socket if socket() succeeded. SO_REUSEADDR allows
    ** the port to be rebound promptly after a restart. Failing to set it
    ** is not treated as an error. */
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons((uint16_t)p->iPort);
    addr.sin_addr.s_addr = inet_addr(p->zHost);
  }

  if( rc==TCL_OK && bind(fd, (struct sockaddr*)&addr, sizeof(addr))<0 ){
    Tcl_AppendResult(p->interp, "bind() failed", (char*)0);
    rc = TCL_ERROR;
  }

  if( rc==TCL_OK && listen(fd, 128)<0 ){
    Tcl_AppendResult(p->interp, "listen() failed", (char*)0);
    rc = TCL_ERROR;
  }

  /* The listening socket is non-blocking. Callers use poll() to wait for
  ** connections. */
  if( rc==TCL_OK && fcntl(fd, F_SETFL, O_NONBLOCK)<0 ){
    Tcl_AppendResult(p->interp, "fcntl() failed", (char*)0);
    rc = TCL_ERROR;
  }

  if( rc==TCL_OK ){
    p->fdListen = fd;
  }else{
    if( fd>=0 ) close(fd);
  }

  return rc;
}

/*
** This function is a no-op if (*pRc) is not TCL_OK when it is called.
** Otherwise, an attempt is made to read nBuf bytes of data from
** socket file-descriptor fd into buffer aBuf. If an error occurs,
** (*pRc) is set to TCL_ERROR before returning.
**
** The return value is 1 if the read could not be completed because recv()
** returned 0 (the peer closed the connection), or 0 in all other cases.
** (*pRc) is set to TCL_ERROR whenever 1 is returned.
*/
static int recvData(int *pRc, int fd, u8 *aBuf, int nBuf){
  if( *pRc==TCL_OK ){
    int nRead = 0;
    while( nRead<nBuf ){
      int n = recv(fd, &aBuf[nRead], nBuf-nRead, 0);
      if( n<=0 ){
        *pRc = TCL_ERROR;
        return (n==0);
      }else{
        nRead += n;
      }
    }
  }
  return 0;
}

/*
** These functions are no-ops if (*pRc) is not TCL_OK when it is called.
** Otherwise, an attempt is made to read a 32 or 64-bit integer from socket
** file-descriptor fd into output variable (*piVal). If an error occurs,
** (*pRc) is set to TCL_ERROR before returning.
*/ static int recvInt32(int *pRc, int fd, int *piVal){ u8 aBuf[4] = {0, 0, 0, 0}; assert( sizeof(*piVal)==sizeof(aBuf) ); if( recvData(pRc, fd, aBuf, sizeof(aBuf)) ) return 1; *piVal = getInt32(aBuf); return 0; } static void recvInt64(int *pRc, int fd, i64 *piVal){ u8 aBuf[8] = {0, 0, 0, 0, 0, 0, 0, 0}; assert( sizeof(*piVal)==sizeof(aBuf) ); recvData(pRc, fd, aBuf, sizeof(aBuf)); *piVal = getInt64(aBuf); } /* ** This function is a no-op if (*pRc) is not TCL_OK when it is called. ** Otherwise, an attempt is made to read nBuf bytes of data from ** socket file-descriptor fd into resizable buffer pBuf. If an error ** occurs, (*pRc) is set to TCL_ERROR before returning. */ static void recvDataBuf(int *pRc, int fd, int nData, TestBuffer *pBuf){ if( pBuf->nAlloc<nData ){ pBuf->aBuf = (u8*)ckrealloc(pBuf->aBuf, nData*2); pBuf->nAlloc = nData*2; } recvData(pRc, fd, pBuf->aBuf, nData); pBuf->nBuf = nData; } /* ** Finalize the statement handle passed as the second argument. If (*pRc) ** is set to SQLITE_OK, then set (*pRc) to the return value of ** sqlite3_finalize() before returning. */ static void testServerFinalize(int *pRc, sqlite3_stmt *pStmt){ int rc = sqlite3_finalize(pStmt); if( *pRc==SQLITE_OK ) *pRc = rc; } /* ** This function is a no-op if (*pRc) is not set to TCL_OK when this function ** is called. Assuming that it is, this function attempts to prepare SQL ** statement zSql against database handle TestServer.db. If successful, ** the statement handle is returned. Otherwise, (*pRc) is set to TCL_ERROR, ** an error message left in TestServer.interp and NULL returned. 
*/
static sqlite3_stmt *testServerPrepare(
  int *pRc,
  TestServer *p,
  const char *zSql
){
  sqlite3_stmt *pStmt = 0;

  /* An earlier error has already occurred. Do nothing. */
  if( *pRc!=TCL_OK ) return pStmt;

  if( sqlite3_prepare_v2(p->db, zSql, -1, &pStmt, 0)!=SQLITE_OK ){
    Tcl_AppendResult(
        p->interp, "sqlite3_prepare_v2(): ", sqlite3_errmsg(p->db), (char*)0
    );
    *pRc = TCL_ERROR;
  }
  return pStmt;
}

/*
** A new follower has connected and requested a sync.
**
** This function first verifies that the follower is compatible with
** the leader database (using the iCid and aXor values). If not, then
** it sends a TESTSERVER_MESSAGE_ERROR message to the follower. If
** the follower is compatible with the leader database, it sends a
** TESTSERVER_MESSAGE_SYNCREPLY message.
**
** Return SQLITE_OK if successful, or an SQLite error code if an error
** occurs. It is not an error if the follower db is incompatible with
** the local leader db. If an error code is returned, then an English
** language error message may be left in TestServer.interp.
*/
/*
** Socket newfd is always closed by this function before it returns.
** The caller must not close it again.
*/
static int hctServerDoSync(
  TestServer *p,                  /* Server (leader) object */
  int newfd,                      /* File descriptor of new follower */
  i64 iCid,                       /* Last CID in follower db */
  const u8 *aXor                  /* XOR of all entries in follower db */
){
  const char *z1 = "SELECT hash FROM sqlite_hct_baseline";
  const char *z2 = "SELECT hash FROM sqlite_hct_journal WHERE cid<=?";
  const char *z3 =
    "SELECT cid, schema, data, schemacid FROM sqlite_hct_journal WHERE cid>?";

  sqlite3_stmt *pStmt = 0;
  int rc = SQLITE_OK;
  u8 aHash[SQLITE_HCT_JOURNAL_HASHSIZE];
  TestSocket sock;

  memset(&sock, 0, sizeof(sock));
  sock.fd = newfd;

  /* Compute the hash of the local db up to and including entry iCid.
  ** Start with the baseline hash, then fold in each journal entry. */
  memset(aHash, 0, sizeof(aHash));
  pStmt = testServerPrepare(&rc, p, z1);
  if( rc==SQLITE_OK && sqlite3_step(pStmt)==SQLITE_ROW ){
    /* sqlite3_column_blob() returns NULL for a NULL or zero-length value.
    ** Leave the hash zeroed in that case. */
    const u8 *a = sqlite3_column_blob(pStmt, 0);
    if( a ) memcpy(aHash, a, SQLITE_HCT_JOURNAL_HASHSIZE);
  }
  testServerFinalize(&rc, pStmt);

  pStmt = testServerPrepare(&rc, p, z2);
  if( rc==SQLITE_OK ) sqlite3_bind_int64(pStmt, 1, iCid);
  while( rc==SQLITE_OK && sqlite3_step(pStmt)==SQLITE_ROW ){
    const u8 *a = sqlite3_column_blob(pStmt, 0);
    sqlite3_hct_journal_hash(aHash, a);
  }
  testServerFinalize(&rc, pStmt);

  if( memcmp(aHash, aXor, SQLITE_HCT_JOURNAL_HASHSIZE) ){
    /* Hashes do not match. Send an ERROR message. */
    const char *zErr = "incompatible database version";
    int nErr = strlen(zErr);
    socketSendInt32(&rc, &sock, TESTSERVER_MESSAGE_ERROR);
    socketSendInt32(&rc, &sock, nErr+1);
    socketSendData(&rc, &sock, (const u8*)zErr, nErr+1);
  }else{
    /* Hashes match. Send a SYNCREPLY containing all journal entries with
    ** CID values greater than iCid. */
    i64 iThisCid = 0;
    int bLogged = 0;
    pStmt = testServerPrepare(&rc, p, z3);
    if( rc==SQLITE_OK ) sqlite3_bind_int64(pStmt, 1, iCid);
    socketSendInt32(&rc, &sock, TESTSERVER_MESSAGE_SYNCREPLY);
    socketSendInt32(&rc, &sock, p->nJob);
    while( rc==SQLITE_OK && sqlite3_step(pStmt)==SQLITE_ROW ){
      const u8 *zSchema = sqlite3_column_text(pStmt, 1);
      int nSchema = sqlite3_column_bytes(pStmt, 1);
      int nData = sqlite3_column_bytes(pStmt, 2);
      int nSz;

      /* A NULL schema is sent as an empty string. The wire format
      ** requires the nul-terminator to be present. */
      if( zSchema==0 ){
        zSchema = (const u8*)"";
        nSchema = 0;
      }
      nSz = sizeof(i64) + sizeof(i64) + nSchema+1 + nData;

      iThisCid = sqlite3_column_int64(pStmt, 0);
      if( bLogged==0 ){
        hct_test_debug("SYNC: first CID = %lld", iThisCid);
        bLogged = 1;
      }

      socketSendInt32(&rc, &sock, nSz);
      socketSendInt64(&rc, &sock, iThisCid);
      socketSendInt64(&rc, &sock, sqlite3_column_int64(pStmt, 3));
      socketSendData(&rc, &sock, zSchema, nSchema+1);
      socketSendData(&rc, &sock, sqlite3_column_blob(pStmt, 2), nData);
    }
    hct_test_debug("SYNC: last CID = %lld (rc=%d)", iThisCid, rc);

    /* A 32-bit zero marks the end of the list of entries */
    socketSendInt32(&rc, &sock, 0);
    testServerFinalize(&rc, pStmt);
  }

  socketFlushAndFree(&rc, &sock);
  close(newfd);
  return rc;
}

/*
** Accept a new connection on the listening socket TestServer.fdListen and
** handle the message sent on it:
**
**   SYNC:  Read the CID and hash, then reply using hctServerDoSync(),
**          which also closes the socket.
**
**   SUB:   Read the job number. If it is valid and the job has a free
**          follower slot, send a SUBREPLY and add the socket to the job's
**          aFollower[] array. The socket then remains open.
**
** If the message cannot be read from the socket, the connection is
** dropped. In all cases other than a successful SUB the socket is closed
** exactly once. This function always returns 0.
*/
static int hctServerAccept(TestServer *p){
  int rc = TCL_OK;
  struct sockaddr_in address;
  socklen_t addrlen = sizeof(address);
  int newfd = accept(p->fdListen, (struct sockaddr*)&address, &addrlen);

  if( newfd>=0 ){
    int eType = 0;

    /* The new socket may have inherited O_NONBLOCK from the listening
    ** socket. Make sure it is a blocking socket. */
    int flags = fcntl(newfd, F_GETFL, 0);
    fcntl(newfd, F_SETFL, flags & ~O_NONBLOCK);

    /* Read message type */
    recvInt32(&rc, newfd, &eType);
    if( rc!=TCL_OK ){
      /* Could not read the message type. Drop the connection. */
    }else if( eType==TESTSERVER_MESSAGE_SYNC ){
      /* SYNC message. Read the CID and xor value. */
      i64 iCid = 0;
      u8 aXor[SQLITE_HCT_JOURNAL_HASHSIZE];
      memset(aXor, 0, sizeof(aXor));
      recvInt64(&rc, newfd, &iCid);
      recvData(&rc, newfd, aXor, sizeof(aXor));
      if( rc==TCL_OK ){
        hctServerDoSync(p, newfd, iCid, aXor);
        /* hctServerDoSync() has already closed the socket. Closing it
        ** again could close an unrelated descriptor opened in the
        ** meantime by another thread. */
        newfd = -1;
      }
    }
    else if( eType==TESTSERVER_MESSAGE_SUB ){
      /* SUB message */
      int iJob = 0;
      recvInt32(&rc, newfd, &iJob);
      if( rc!=TCL_OK ){
        /* Could not read the job number. Drop the connection. */
      }else if( iJob>=0 && iJob<p->nJob ){
        TestServerJob *pJob = &p->aJob[iJob];
        pthread_mutex_lock(&pJob->mutex);
        if( pJob->nFollower<TESTSERVER_MAX_FOLLOWER ){
          sendInt32(&rc, newfd, TESTSERVER_MESSAGE_SUBREPLY);
          if( rc==TCL_OK ){
            /* Only register the follower if the reply was sent. A dead
            ** socket in aFollower[] would cause the journal hook to fail
            ** each time it was invoked. */
            pJob->aFollower[pJob->nFollower].fd = newfd;
            pJob->nFollower++;
            newfd = -1;
          }
        }
        pthread_mutex_unlock(&pJob->mutex);
      }else{
        /* Job number out of range */
        assert( 0 );
      }
    }else{
      /* An error - do nothing. */
      assert( 0 );
    }

    /* Unless ownership of the socket was passed on, close it */
    if( newfd>=0 ) close(newfd);
  }else{
    assert( 0 );
  }

  return 0;
}

/*
** Implementation of testserver method [run] for leader nodes.
*/
/*
** Switch the database to LEADER mode, open the listening socket and
** start one thread for each configured job. Then service incoming
** follower connections until every job has set its bDone flag, and
** finally join the job threads.
**
** Returns TCL_OK if successful. Otherwise, an error message is left
** in TestServer.interp and TCL_ERROR returned.
*/
static int hctLeaderRun(TestServer *p){
  struct pollfd aPoll[1];
  int iJob;
  int rc;

  rc = sqlite3_hct_journal_setmode(p->db, SQLITE_HCT_JOURNAL_MODE_LEADER);
  if( rc!=SQLITE_OK ){
    const char *zErr = sqlite3_errmsg(p->db);
    Tcl_AppendResult(
        p->interp, "sqlite3_hct_journal_setmode():", zErr, (char*)0
    );
    return TCL_ERROR;
  }

  /* Listen for connections on the configured host and port. */
  rc = hctServerListen(p);
  if( rc!=SQLITE_OK ) return rc;

  memset(aPoll, 0, sizeof(aPoll));
  aPoll[0].events = POLLIN;
  aPoll[0].fd = p->fdListen;

  /* Start the tcl jobs */
  for(iJob=0; iJob<p->nJob; iJob++){
    TestServerJob *pJob = &p->aJob[iJob];
    pthread_mutex_init(&pJob->mutex, 0);
    pthread_create(&pJob->tid, NULL, hctServerJobThread, (void*)pJob);
  }

  /* Service connections. Each time poll() times out (100ms) with no
  ** connection pending, check whether the jobs have all finished. */
  for(;;){
    int nReady = poll(aPoll, 1, 100);
    if( nReady<0 ){
      Tcl_AppendResult(p->interp, "poll() failed", (char*)0);
      return TCL_ERROR;
    }
    assert( nReady==0 || nReady==1 );

    if( nReady ){
      rc = hctServerAccept(p);
      continue;
    }

    /* Skip past all finished jobs. If that is all of them, stop. */
    iJob = 0;
    while( iJob<p->nJob && TestAtomicLoad(&p->aJob[iJob].bDone)!=0 ){
      iJob++;
    }
    if( iJob==p->nJob ) break;
  }

  /* Wait on the user jobs, reporting any errors. */
  for(iJob=0; iJob<p->nJob; iJob++){
    TestServerJob *pJob = &p->aJob[iJob];
    void *pUnused = 0;
    pthread_join(pJob->tid, &pUnused);
    if( pJob->zErr ){
      hct_test_debug("Error in job %d: %s\n", iJob, pJob->zErr);
    }
  }

  return TCL_OK;
}

/*
** Set the result of interpreter TestServer.interp to a formatted string.
** Argument zFmt is a printf() style format, processed together with
** any trailing arguments by sqlite3_vmprintf(). Any existing result is
** discarded. e.g.
**
**     testServerResult(p, "error in function: %d", rc);
*/
static void testServerResult(TestServer *p, const char *zFmt, ...){
  char *zMsg;
  va_list args;

  va_start(args, zFmt);
  zMsg = sqlite3_vmprintf(zFmt, args);
  va_end(args);

  Tcl_ResetResult(p->interp);
  Tcl_SetObjResult(p->interp, Tcl_NewStringObj(zMsg, -1));
  sqlite3_free(zMsg);
}

/*
** This function is called in follower nodes to establish a socket
** connection to the leader node - port TestServer.iPort of host
** TestServer.zHost.
** ** This function is a no-op if (*pRc) is other than TCL_OK when it is ** called. Otherwise, an attempt is made to establish the connection. ** If successful, the file descriptor is returned. Otherwise, (*pRc) ** is set to TCL_ERROR, and error message is left in TestServer.interp ** and a negative value returned. */ static int testServerConnect(int *pRc, TestServer *p){ int rc = *pRc; int fd = -1; if( rc==TCL_OK ){ struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(p->iPort); if( inet_pton(AF_INET, p->zHost, &addr.sin_addr)<=0 ){ testServerResult(p, "error in inet_pton()"); rc = TCL_ERROR; } if( rc==TCL_OK ){ fd = socket(AF_INET, SOCK_STREAM, 0); if( fd<0 ){ testServerResult(p, "error in socket()"); rc = TCL_ERROR; } } if( rc==SQLITE_OK && connect(fd, (struct sockaddr*)&addr, sizeof(addr))<0 ){ testServerResult(p, "error in connect()"); rc = TCL_ERROR; } if( rc!=TCL_OK && fd>=0 ){ close(fd); fd = -1; } } *pRc = rc; assert( rc!=TCL_OK || fd>=0 ); return fd; } /* ** Read the body of a TESTSERVER_MESSAGE_ERROR message from file-descriptor ** fd. The body of such a message consists of: ** ** * A 32-bit integer, N, and ** * N bytes of data containing a nul-terminated error message string. */ static char *recvErrorMsg(int *pRc, int fd){ char *aRet = 0; int nByte = 0; recvInt32(pRc, fd, &nByte); if( *pRc==TCL_OK ){ aRet = (char*)ckalloc(nByte); recvData(pRc, fd, (u8*)aRet, nByte); if( *pRc!=TCL_OK ){ ckfree(aRet); aRet = 0; } } return aRet; } /* ** This function is a no-op if (*pRc) is not TCL_OK when it is called. If ** it is, open a database connection to TestServer.zPath. If successful, ** return the new database handle. Otherwise, (*pRc) is set to TCL_ERROR, ** an error message is left in TestServer.interp and NULL returned. 
*/ static sqlite3 *testServerOpenDb(int *pRc, TestServer *p){ int rc = *pRc; sqlite3 *db = 0; if( rc==SQLITE_OK ){ rc = sqlite3_open_v2(p->zPath, &db, SQLITE_OPEN_READWRITE, 0); if( rc!=SQLITE_OK ){ Tcl_AppendResult(p->interp, sqlite3_errmsg(db), (char*)0); sqlite3_close(db); db = 0; rc = TCL_ERROR; } *pRc = rc; } return db; } /* ** This is a wrapper around sqlite3_hct_journal_write(). It retries any ** attempted _write() that fails with SQLITE_BUSY. */ static int hctWriteWithRetry( sqlite3 *db, /* Database handle */ i64 iCid, /* CID of journal entry */ const char *zSchema, /* Schema modifications */ const u8 *aData, int nData, /* Data changes */ i64 iSchemaCid /* CID of required schema */ ){ int rc = SQLITE_OK; int nBusy = 0; while( 1 ){ rc = sqlite3_hct_journal_write(db, iCid, zSchema, aData, nData, iSchemaCid); if( rc!=SQLITE_BUSY ) break; nBusy++; if( nBusy>=10 ){ int nDelay = (nBusy-9)*(nBusy-9)*39; usleep( MIN(nDelay, 50000) ); } if( (nBusy % 200)==0 ){ hct_test_debug("warning - CID %lld failed %d times (%s)\n", iCid, nBusy, sqlite3_errmsg(db) ); } } assert( rc==SQLITE_OK ); return rc; } /* ** The following data structures are used while syncing. */ typedef struct TestFollowerChunk TestFollowerChunk; typedef struct TestFollower TestFollower; typedef struct TestFollowerThread TestFollowerThread; #define SYNC_BYTES_PER_CHUNK ((1<<20) - 64) /* ** When a SYNCREPLY message is being received, the data is read into a ** linked list of the following buffers. */ struct TestFollowerChunk { TestFollowerChunk *pNext; /* Next chunk in SYNCREPLY message */ int nRef; /* Current number of users of this link */ int nChunk; /* Valid size of aChunk[] in bytes */ u8 aChunk[SYNC_BYTES_PER_CHUNK];/* Payload */ }; /* ** Each follower thread - one that exists only to process the SUBREPLY ** data of a single connection to the leader - is represented by an ** instance of this structure. 
*/
struct TestFollowerThread {
  TestFollower *pFollower;        /* Pointer to the TestFollower object -
                                  ** presumably the one with this entry in
                                  ** its aThread[] array (TODO confirm) */
  pthread_t tid;                  /* Thread id of this thread */
  sqlite3 *db;                    /* Database handle for journal_write() */
  int fd;                         /* Socket connection to leader node */
};

/*
** Object allocated on the stack of the main follower thread.
**
** pFirst, pLast:
**   Linked list of chunks of SYNCREPLY data. pFirst is the first (oldest),
**   and pLast points to the last (newest). Protected by TestFollower.mutex.
**
** nChunkTotal:
**   Total data stored on all chunks in pFirst/pLast linked list. In bytes.
**
**   This variable is only accessed by the main follower thread, so it
**   is not protected by any mutex.
**
** iNext:
**   Offset of the next serialized journal entry within the linked list
**   of SYNCREPLY data that no thread has yet claimed to work on. The
**   offset is a byte offset from the start of the data (i.e. offset 0
**   is the first byte on chunk TestFollower.pFirst).
**
** iEof:
**   This variable is set only when the last of the SYNCREPLY data for
**   the last synchronization request has been added to the pFirst/pLast
**   list. At that point it is set to the offset of the first non-contiguous
**   entry in the received data (i.e. the entry following the first hole).
**   All threads but the main follower thread (the same one that calls recv()
**   to receive the data) stop working on the synchronization data once
**   this point is reached. Such threads either go on to handle SUBREPLY data
**   on a socket of their own, or else exit altogether.
*/ struct TestFollower { TestServer *pServer; /* Pointer back to testserver object */ pthread_mutex_t mutex; pthread_cond_t cond; TestFollowerChunk *pFirst; TestFollowerChunk *pLast; i64 nChunkTotal; i64 iNext; i64 iEof; TestFollowerThread aThread[TESTSERVER_MAX_JOB]; }; /* ** Equivalent to an atomic version of: ** ** if( *pPtr==iOld ){ ** *pPtr = iNew; ** return 1; ** } ** return 0; */ static int testBoolCAS64(i64 *pPtr, i64 iOld, i64 iNew){ return (int)__sync_bool_compare_and_swap(pPtr, iOld, iNew); } /* ** Buffer aChange[], size nChange bytes, contains a journal entry serialized ** in the manner of a SYNCREPLY or SUBREPLY message. i.e. ** ** * A 64-bit "cid" value, ** * A 64-bit "schemacid" value, ** * A nul-terminated utf-8 "schema" string, ** * A blob of "data" for the entry. ** ** This function decodes the journal entry and attempts to _write() it to ** database handle db. SQLITE_OK is returned if successful or an SQLite ** error code if not. */ static int hctWriteSerialWithRetry(sqlite3 *db, const u8 *aChange, int nChange){ i64 iCid = getInt64(&aChange[0]); i64 iSchemaCid = getInt64(&aChange[8]); const char *zSchema = (const char*)&aChange[16]; const u8 *aData = (const u8*)&zSchema[strlen(zSchema)+1]; int nData = &aChange[nChange] - aData; return hctWriteWithRetry(db, iCid, zSchema, aData, nData, iSchemaCid); } /* ** Signal the condition variable to wake up any sleeping follower threads. */ static void hctFollowerSignal(TestFollower *pFollower){ pthread_mutex_lock(&pFollower->mutex); pthread_cond_broadcast(&pFollower->cond); pthread_mutex_unlock(&pFollower->mutex); } /* ** Called by a thread to help with applying journal entries received ** as part of a SYNCREPLY message. ** ** If bReturn is true, then this function returns as soon as there is ** no more SYNCREPLY data to process. Or, if bReturn is false, the thread ** blocks, waiting for more data. In this case such threads are woken ** up by signalling condition variable TestFollower.cond. 
*/
/*
** Each thread that calls this function walks the entire linked list of
** chunks, decoding the size field of every serialized entry in order
** to find the next. A thread applies an entry to the database only if
** it is the one that advances TestFollower.iNext past it using
** testBoolCAS64().
**
** Returns TCL_OK if successful, or the non-zero value returned by
** hctWriteSerialWithRetry() if a change cannot be applied.
*/
static int hctFollowerHelpWithSync(
  TestFollower *pFollower,        /* Follower context */
  sqlite3 *db,                    /* Db handle to apply changes to */
  int bReturn                     /* True to return (not block) when no data */
){
  int rc = TCL_OK;
  i64 iOff = 0;                   /* Offset of next entry to consider */
  i64 iStart = 0;                 /* Offset of first byte of chunk pChunk */
  TestFollowerChunk *pChunk = 0;  /* Current chunk (holds a reference) */
  TestBuffer buf = {0, 0, 0};     /* Change that spans multiple chunks */

  while( rc==TCL_OK ){
    TestFollowerChunk *pNext = 0;
    i64 iEof = 0;

    /* Grab the next TestFollowerChunk to work on. */
    pthread_mutex_lock(&pFollower->mutex);
    while( 1 ){
      iEof = pFollower->iEof;
      /* A blocking (bReturn==0) thread stops once it reaches iEof */
      if( bReturn==0 && iEof>=0 && iOff>=pFollower->iEof ) break;
      pNext = (pChunk ? pChunk->pNext : pFollower->pFirst);
      if( pNext || bReturn ) break;

      /* Need to wait for the next chunk. Block on the condition variable. */
      pthread_cond_wait(&pFollower->cond, &pFollower->mutex);
    }
    /* Take a reference to the new chunk before releasing the old one,
    ** all under the mutex. */
    if( pNext ) pNext->nRef++;
    if( pChunk ) pChunk->nRef--;
    pthread_mutex_unlock(&pFollower->mutex);
    pChunk = pNext;

    assert( iOff>=iStart || (buf.nBuf==0 && pChunk==0) );
    assert( rc==TCL_OK );

    /* If buf is not empty, it contains the first part of a change claimed
    ** by this thread that did not fit on the previous chunk. Append the
    ** part of it that is on this chunk. If that completes the change,
    ** apply it.
    **
    ** NOTE(review): pChunk is dereferenced by the MIN() below before the
    ** assert( pChunk ) that follows it. This looks like it relies on more
    ** data always arriving after a partial change - confirm. */
    if( buf.nBuf>0 ){
      int nCopy = MIN(pChunk->nChunk, iOff - iStart);
      assert( pChunk );
      assert( iOff>iStart );
      testBufferAppend(&buf, pChunk->aChunk, nCopy);
      if( iStart+nCopy==iOff ){
        rc = hctWriteSerialWithRetry(db, buf.aBuf, buf.nBuf);
        buf.nBuf = 0;
      }
    }

    /* If there is no next chunk, exit the outer loop here.
    **
    ** NOTE(review): if this is taken because rc!=TCL_OK while pChunk is
    ** not NULL, the reference to pChunk taken above does not appear to
    ** be released - confirm whether that matters to whoever frees the
    ** chunks. */
    if( pChunk==0 || rc!=TCL_OK ) break;

    /* Loop through the entries that begin on this chunk */
    while( rc==TCL_OK && iOff<(iStart+pChunk->nChunk) ){
      /* Read the next nByte value. The 12 bytes that make up this and
      ** the CID field are guaranteed to be stored contiguously within
      ** a single chunk.  */
      int nByte = getInt32(&pChunk->aChunk[iOff - iStart]);

      /* See if this thread can apply the current change */
      if( iOff==pFollower->iNext
       && testBoolCAS64(&pFollower->iNext, iOff, iOff+4+nByte)
      ){
        /* This thread is responsible for applying the current change.
        ** There are two possibilities - either the entire change fits
        ** on this chunk, or it does not. If it does, apply the change
        ** right now. Otherwise, copy the part of the change on this
        ** chunk into a buffer and apply the change on the next iteration
        ** of the outer loop - after grabbing the next chunk.  */
        const u8 *aChange = &pChunk->aChunk[iOff - iStart + 4];
        if( (iOff+4+nByte)<=(iStart+pChunk->nChunk) ){
          /* Case 1 - entire change is right here. */
          rc = hctWriteSerialWithRetry(db, aChange, nByte);
        }else{
          /* Case 2 - have to buffer the start of this one. */
          int nHave = &pChunk->aChunk[pChunk->nChunk] - aChange;
          assert( buf.nBuf==0 );
          testBufferAppend(&buf, aChange, nHave);
        }

        /* If this change was the last before EOF, kick the condition variable
        ** to start any mirror threads that were not part of the sync */
        if( (iOff + 4 + nByte)==iEof ){
          hctFollowerSignal(pFollower);
        }
      }

      /* Advance to the next entry, whether or not this thread applied
      ** the current one. */
      iOff += (4 + nByte);
      if( bReturn==0 && iEof>=0 && iOff>=iEof ) break;
    }

    iStart += pChunk->nChunk;
    assert( iOff>=iStart || (bReturn==0 && iEof>=0 && iOff>=iEof) );
  }

  /* Wake any threads blocked on the condition variable before returning */
  hctFollowerSignal(pFollower);
  testBufferFree(&buf);
  return rc;
}

/*
** Called by the main follower thread to process the body of a SYNCREPLY
** message.
*/
/* Data is read from fd with recv() into newly allocated TestFollowerChunk
** objects, which are appended to the pFollower list under the mutex. Once
** the socket is drained (or, for a non-final sync, once the first
** non-contiguous CID has been seen) this thread calls
** hctFollowerHelpWithSync() in non-blocking mode to help apply the data.
** The function body is skipped, apart from debug output and the iEof
** update, if (*pRc) is not TCL_OK on entry. */
static void hctFollowerSyncReply(
  int *pRc,                       /* IN/OUT: error code */
  TestFollower *pFollower,        /* Follower context */
  int fd,                         /* Socket to read SYNCREPLY body from */
  int bFinalSync,                 /* True if this is final SYNCREPLY */
  i64 iSentCid                    /* CID sent in SYNC message */
){
  TestServer *p = pFollower->pServer;
  int rc = *pRc;
  int bRecvDone = 0;              /* True to break out of recv() loop */
  i64 iLocalOff = pFollower->nChunkTotal; /* Stream offset of next header */
  i64 iPrevCid = iSentCid;        /* CID of previous entry header parsed */
  i64 iFirstNonContiguous = -1;   /* Stream offset of first CID gap, or -1 */
  int nCarry = 0;                 /* Bytes of a split header to move forward */

  hct_test_debug("syncreply %d start", bFinalSync);

  while( rc==TCL_OK && bRecvDone==0 ){
    /* Allocate a new TestFollowerChunk link */
    TestFollowerChunk *pNew = (TestFollowerChunk*)ckalloc(sizeof(*pNew));
    memset(pNew, 0, sizeof(TestFollowerChunk));

    /* The previous chunk ended with fewer than 12 bytes of an entry
    ** header. Those bytes were excluded from its nChunk but are still
    ** stored after it, so copy them to the start of this chunk. */
    if( nCarry>0 ){
      TestFollowerChunk *pLast = pFollower->pLast;
      assert( pLast );
      memcpy(pNew->aChunk, &pLast->aChunk[pLast->nChunk], nCarry);
      pNew->nChunk = nCarry;
      nCarry = 0;
    }

    /* Fill the chunk. recv() returning 0 or less ends the outer loop
    ** as well; a negative return is reported as an error. */
    while( pNew->nChunk<SYNC_BYTES_PER_CHUNK ){
      int nMaxRead = SYNC_BYTES_PER_CHUNK - pNew->nChunk;
      ssize_t res = recv(fd, &pNew->aChunk[pNew->nChunk], nMaxRead, 0);
      if( res<=0 ){
        if( res<0 ){
          testServerResult(p, "error in recv()");
          rc = TCL_ERROR;
        }
        bRecvDone = 1;
        break;
      }
      pNew->nChunk += res;
    }

    if( rc==SQLITE_OK && pNew->nChunk>0 ){
      /* Check if this chunk contains the first non-contiguous CID value.
      ** iOff is the offset within this chunk of the next entry header. It
      ** may be past the start of the chunk if the previous entry spilled
      ** over from the preceding chunk. */
      i64 iOff = 0;
      assert( iLocalOff>=pFollower->nChunkTotal );
      iOff = iLocalOff - pFollower->nChunkTotal;
      while( iOff<pNew->nChunk ){
        if( (pNew->nChunk-iOff)<12 ){
          /* Incomplete size+CID header. Trim it from this chunk and
          ** carry it over to the next one. */
          nCarry = pNew->nChunk - iOff;
          pNew->nChunk = iOff;
        }else{
          int nThis = getInt32(&pNew->aChunk[iOff]);
          i64 iCid = getInt64(&pNew->aChunk[iOff+4]);
          if( iCid!=iPrevCid+1 && iFirstNonContiguous<0 ){
            iFirstNonContiguous = iOff + pFollower->nChunkTotal;
          }
          iOff += (4 + nThis);
          iPrevCid = iCid;
        }
      }
      iLocalOff = pFollower->nChunkTotal + iOff;
    }

    /* For a non-final sync, stop receiving at the first gap in the CID
    ** sequence and discard everything from that entry onwards. */
    if( iFirstNonContiguous>=0 && bFinalSync==0 ){
      bRecvDone = 1;
      pNew->nChunk = iFirstNonContiguous - pFollower->nChunkTotal;
    }

    /* If all went well and data was received, link the new chunk into the
    ** list and hit the condition variable to restart paused sync threads.
    ** Or, if an error occurred or there was no data left to receive, free
    ** the chunk allocated above. */
    if( rc==TCL_OK && pNew->nChunk>0 ){
      pthread_mutex_lock(&pFollower->mutex);
      pNew->nRef = 1;
      assert( (pFollower->pLast==0)==(pFollower->pFirst==0) );
      if( pFollower->pLast==0 ){
        pFollower->pFirst = pNew;
      }else{
        pFollower->pLast->pNext = pNew;
      }
      pFollower->pLast = pNew;
      pFollower->nChunkTotal += pNew->nChunk;
      if( bFinalSync && iFirstNonContiguous>=0 ){
        pFollower->iEof = iFirstNonContiguous;
      }
      pthread_cond_broadcast(&pFollower->cond);
      pthread_mutex_unlock(&pFollower->mutex);
    }else{
      ckfree(pNew);
    }
  }

  hct_test_debug(
      "syncreply %d received (%lld bytes total, %lld processed)",
      bFinalSync, pFollower->nChunkTotal, TestAtomicLoad(&pFollower->iNext)
  );

  /* On the final sync, if no CID gap was found then EOF is the offset just
  ** past the last entry header parsed. */
  pthread_mutex_lock(&pFollower->mutex);
  if( bFinalSync && pFollower->iEof<0 ){
    pFollower->iEof = iLocalOff;
  }
  pthread_mutex_unlock(&pFollower->mutex);

  /* Wake the helper threads, then help apply the data using the main
  ** database handle, returning once no more chunks are available. */
  if( rc==SQLITE_OK ){
    hctFollowerSignal(pFollower);
    rc = hctFollowerHelpWithSync(pFollower, p->db, 1);
  }
  hct_test_debug("syncreply %d applied", bFinalSync);

  *pRc = rc;
}

/*
** This function is used in follower nodes to determine the current state
** of the database.
Specifically, to discover:
**
**   * The last CID before the first hole in the journal, and
**   * The xor/hash value corresponding to that CID.
**
** The starting point is the single row of sqlite_hct_baseline, if any.
** Rows of sqlite_hct_journal are then visited in ascending CID order and
** folded into the hash with sqlite3_hct_journal_hash() until the first
** gap in the CID sequence.
**
** Buffer aHash must be at least SQLITE_HCT_JOURNAL_HASHSIZE bytes. It is
** always zeroed before anything else is done. (*piCid) is always written,
** with 0 if no CID was found. If a stored hash blob has the wrong size an
** error message is set with testServerResult() and (*pRc) is set to
** TCL_ERROR.
*/
static void hctFollowerGetVersion(
  int *pRc,                       /* IN/OUT: Error code */
  TestServer *p,                  /* Follower testserver object */
  i64 *piCid,                     /* OUT: Last contiguous CID value */
  u8 *aHash                       /* OUT: Xor/hash value for (*piCid) */
){
  const char *z1 = "SELECT cid, hash FROM sqlite_hct_baseline";
  const char *z2 = "SELECT cid, hash FROM sqlite_hct_journal ORDER BY cid ASC";
  int rc = *pRc;
  sqlite3_stmt *pStmt = 0;
  i64 iCid = 0;

  /* Determine the current CID and hash values for the follower database.
  ** This is easy, as sqlite3_hct_journal_rollback(MAXIMUM) has already
  ** been called on the database. */
  memset(aHash, 0, SQLITE_HCT_JOURNAL_HASHSIZE);
  pStmt = testServerPrepare(&rc, p, z1);
  if( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pStmt) ){
    int nBaseHash = sqlite3_column_bytes(pStmt, 1);
    if( nBaseHash!=SQLITE_HCT_JOURNAL_HASHSIZE ){
      testServerResult(p, "sqlite_hct_baseline.hash is wrong size");
      rc = TCL_ERROR;
    }else{
      const u8 *aBaseHash = sqlite3_column_blob(pStmt, 1);
      /* Copy the whole hash. The size was verified just above. */
      memcpy(aHash, aBaseHash, SQLITE_HCT_JOURNAL_HASHSIZE);
      iCid = sqlite3_column_int64(pStmt, 0);
    }
  }
  testServerFinalize(&rc, pStmt);

  pStmt = testServerPrepare(&rc, p, z2);
  while( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pStmt) ){
    int nJrnlHash = sqlite3_column_bytes(pStmt, 1);
    if( nJrnlHash!=SQLITE_HCT_JOURNAL_HASHSIZE ){
      testServerResult(p, "sqlite_hct_journal.hash is wrong size");
      rc = TCL_ERROR;
    }else{
      const u8 *aJrnlHash = sqlite3_column_blob(pStmt, 1);
      i64 iThis = sqlite3_column_int64(pStmt, 0);

      /* Stop at the first hole. While iCid is still 0 any first journal
      ** CID is accepted. */
      if( iThis!=iCid+1 && iCid!=0 ) break;
      sqlite3_hct_journal_hash(aHash, aJrnlHash);
      iCid = iThis;
    }
  }
  testServerFinalize(&rc, pStmt);

  *piCid = iCid;
  *pRc = rc;
}

/*
** Sync with the leader node. Return the number of jobs running on the
** leader. Specifically, this function:
**
**   1) Establishes a socket connection to the leader.
** 2) Sends a SYNC message, specifying the values iCid and aHash. ** 3) Processes the SYNCREPLY message. Waiting follower threads are ** signaled to help with this. ** 4) Closes the socket and returns. ** ** This function is a no-op if (*pRc) is other than TCL_OK when it is ** called. Or, if an error occurs within this function, an error message is ** left in pFollower->interp and (*pRc) set to TCL_ERROR. */ static int hctFollowerDoSync( int *pRc, TestFollower *pFollower, int bFinalSync, i64 iCid, const u8 *aHash ){ TestServer *p = pFollower->pServer; int rc = *pRc; int fd = -1; int eReply = -1; int nJob = 0; fd = testServerConnect(&rc, p); sendInt32(&rc, fd, TESTSERVER_MESSAGE_SYNC); sendInt64(&rc, fd, iCid); sendData(&rc, fd, aHash, SQLITE_HCT_JOURNAL_HASHSIZE); recvInt32(&rc, fd, &eReply); if( rc==TCL_OK ){ if( eReply==TESTSERVER_MESSAGE_SYNCREPLY ){ recvInt32(&rc, fd, &nJob); hctFollowerSyncReply(&rc, pFollower, fd, bFinalSync, iCid); }else if( eReply==TESTSERVER_MESSAGE_ERROR ){ char *zErr = recvErrorMsg(&rc, fd); testServerResult(p, "error from leader: %s", zErr); ckfree(zErr); rc = TCL_ERROR; }else{ testServerResult(p, "unexpected reply type to SYNC - %d (expected %d or %d)", eReply, TESTSERVER_MESSAGE_SYNCREPLY, TESTSERVER_MESSAGE_ERROR ); rc = TCL_ERROR; } } if( fd>=0 ) close(fd); *pRc = rc; return nJob; } /* ** This is the main() routine for follower threads. A follower thread ** may do two things: ** ** 1) help with synchronization until the last contiguous journal entry ** received from the leader has been applied, then ** 2) process entries that are part of a SUBREPLY received from a job ** thread on a dedicated socket connection. */ static void *hctFollowerThread(void *pCtx){ TestFollowerThread *pThread = (TestFollowerThread*)pCtx; TestFollower *pFollower = pThread->pFollower; int iThread = pThread - pFollower->aThread; /* If this thread should help with synchronization, invoke the ** hctFollowerHelpWithSync() function now. 
It will not return until ** it is time to start processing messages on the SUB sockets. */ if( iThread<(pFollower->pServer->nSyncThread-1) ){ int rc = hctFollowerHelpWithSync(pFollower, pThread->db, 0); assert( rc==SQLITE_OK ); } /* Wait until it is time to start processing messages on the SUB ** sockets. Really, this is only required for threads that did not ** participate in synchronization, but it doesn't hurt to check ** for all threads. */ pthread_mutex_lock(&pFollower->mutex); while( pFollower->iEof<0 || pFollower->iNext<pFollower->iEof ){ pthread_cond_wait(&pFollower->cond, &pFollower->mutex); } pthread_mutex_unlock(&pFollower->mutex); /* If a SUB socket has been assigned to this thread, start reading ** and applying changes from it now. Otherwise, the thread exits. */ if( pThread->fd>=0 ){ int rc = SQLITE_OK; int fd = pThread->fd; sqlite3 *db = pThread->db; TestBuffer buf = {0, 0, 0}; while( rc==SQLITE_OK ){ /* Read the next change from the socket. If the socket is at EOF, ** break out of the loop. */ int nThis = 0; if( recvInt32(&rc, fd, &nThis) ){ rc = SQLITE_OK; break; } recvDataBuf(&rc, fd, nThis, &buf); /* Assuming no error occurred, write the change into the db. */ if( rc==SQLITE_OK ){ i64 iCid = getInt64(&buf.aBuf[0]); i64 iSchemaCid = getInt64(&buf.aBuf[8]); const char *zSchema = (const char*)&buf.aBuf[16]; const u8 *aData = (const u8*)&zSchema[strlen(zSchema)+1]; int nData = nThis - (aData - buf.aBuf); rc = hctWriteWithRetry(db, iCid, zSchema, aData, nData, iSchemaCid); } } hct_test_debug("thread fd=%d finished, rc=%d\n", fd, rc); testBufferFree(&buf); } return 0; } /* ** Launch follower thread iThread. Threads are numbered starting from 0. 
*/
/* Slot p->aThread[iThread] is initialised with a database handle from
** testServerOpenDb() and no SUB socket (fd==-1). The thread is only created
** if the database handle was opened successfully. Returns SQLITE_OK if the
** thread was started, or a non-zero code otherwise. */
static int hctFollowerThreadInit(TestFollower *p, int iThread){
  int rc = SQLITE_OK;
  TestFollowerThread *pT = &p->aThread[iThread];
  pT->pFollower = p;
  pT->db = testServerOpenDb(&rc, p->pServer);
  pT->fd = -1;
  if( rc==SQLITE_OK ){
    if( pthread_create(&pT->tid, NULL, hctFollowerThread, (void*)pT)!=0 ){
      rc = SQLITE_ERROR;
    }
  }
  return rc;
}

/*
** Implementation of [run] command for follower nodes.
**
** The sequence is:
**
**   1) Launch (nSyncThread-1) helper threads.
**   2) Run non-final syncs with the leader until the amount of data
**      received stops shrinking, or drops below p->nSyncBytes.
**   3) Launch further threads so that there is one for each leader job,
**      and start the local tcl jobs.
**   4) Open one SUB connection to the leader for each leader job.
**   5) Run the final sync, then wait for all threads to finish.
**
** Returns TCL_OK if successful, or a non-zero code otherwise.
**
** NOTE(review): if an error occurs before the final sync has set fol.iEof,
** the follower threads appear to have no way to leave their wait loops, in
** which case the pthread_join() calls below would block - confirm.
*/
static int hctFollowerRun(TestServer *p){
  int rc = SQLITE_OK;
  i64 iCid = 0;
  u8 aHash[SQLITE_HCT_JOURNAL_HASHSIZE];
  int nJob = 0;                   /* Number of jobs reported by leader */
  int nThread = 0;                /* Number of follower threads launched */
  int ii;
  i64 nPrevSyncData = ((i64)1 << 60);

  /* Initialise the follower context. iEof is negative until the final
  ** SYNCREPLY has been received. */
  TestFollower fol;
  memset(&fol, 0, sizeof(fol));
  pthread_mutex_init(&fol.mutex, 0);
  pthread_cond_init(&fol.cond, 0);
  fol.pServer = p;
  fol.iEof = -1;

  /* If extra sync threads are configured, launch them now. They will wait
  ** on condition variable "fol.cond" until there is data to process */
  for(ii=0; ii<(p->nSyncThread-1); ii++){
    hctFollowerThreadInit(&fol, ii);
  }

  while( rc==TCL_OK ){
    i64 nTotal = fol.nChunkTotal;
    hct_test_debug("start GetVersion()");
    hctFollowerGetVersion(&rc, p, &iCid, aHash);
    hct_test_debug("end GetVersion()");
    nJob = hctFollowerDoSync(&rc, &fol, 0, iCid, aHash);
    nTotal = fol.nChunkTotal - nTotal;
    hct_test_debug("recv %lld bytes of SYNCREPLY data", nTotal);

    if( nTotal>nPrevSyncData || nTotal<p->nSyncBytes || p->nSyncBytes==0 ){
      break;
    }
    nPrevSyncData = nTotal;
  }

  /* If there are more jobs on the leader than there are sync threads,
  ** launch some more threads now. */
  for(/* no-op */; ii<nJob; ii++){
    hctFollowerThreadInit(&fol, ii);
  }
  nThread = ii;

  /* Start the tcl jobs */
  for(ii=0; ii<p->nJob; ii++){
    TestServerJob *pJob = &p->aJob[ii];
    pthread_mutex_init(&pJob->mutex, 0);
    pthread_create(&pJob->tid, NULL, hctServerJobThread, (void*)pJob);
  }

  /* For each TestFollowerThread that will handle mirroring a leader
  ** thread, establish a connection to the leader. */
  for(ii=0; ii<nJob && rc==TCL_OK; ii++){
    int fd = -1;
    int eReply = 0;

    fd = testServerConnect(&rc, p);
    sendInt32(&rc, fd, TESTSERVER_MESSAGE_SUB);
    sendInt32(&rc, fd, ii);
    recvInt32(&rc, fd, &eReply);

    /* Only interpret eReply if the exchange succeeded. Otherwise the
    ** error already recorded would be replaced by a misleading
    ** "unexpected reply" message. */
    if( rc==TCL_OK ){
      if( eReply==TESTSERVER_MESSAGE_SUBREPLY ) {
        /* Ownership of the socket passes to the follower thread. */
        fol.aThread[ii].fd = fd;
        fd = -1;
      }else if( eReply==TESTSERVER_MESSAGE_ERROR ){
        char *zErr = recvErrorMsg(&rc, fd);
        testServerResult(p, "error from leader: %s", zErr);
        ckfree(zErr);
        rc = TCL_ERROR;
      }else{
        testServerResult(p,
            "unexpected reply type to SUB - %d (expected %d or %d)",
            eReply, TESTSERVER_MESSAGE_SUBREPLY, TESTSERVER_MESSAGE_ERROR
        );
        rc = TCL_ERROR;
      }
    }

    if( fd>=0 ) close(fd);
  }

  /* Final sync. The job count returned here is not needed - the threads
  ** to wait on below are those that were actually launched above. */
  hctFollowerGetVersion(&rc, p, &iCid, aHash);
  (void)hctFollowerDoSync(&rc, &fol, 1, iCid, aHash);

  /* Wait on the threads handling sockets linked to jobs on the leader. */
  for(ii=0; ii<nThread; ii++){
    void *pDummy = 0;
    TestFollowerThread *pT = &fol.aThread[ii];
    pthread_join(pT->tid, &pDummy);
  }

  /* All users of the follower context have finished. Release the list
  ** of received chunks and the synchronization objects. */
  while( fol.pFirst ){
    TestFollowerChunk *pDel = fol.pFirst;
    fol.pFirst = pDel->pNext;
    ckfree(pDel);
  }
  fol.pLast = 0;
  pthread_cond_destroy(&fol.cond);
  pthread_mutex_destroy(&fol.mutex);

  /* If no timeout is set, set one now so that local jobs finish. */
  if( p->iTimeout==0 ){
    TestAtomicStore(&p->iTimeout, 1);
  }

  /* Wait on local job threads. */
  for(ii=0; ii<p->nJob; ii++){
    void *pVal = 0;
    TestServerJob *pJob = &p->aJob[ii];
    pthread_join(pJob->tid, &pVal);
  }

  return rc;
}

/*
** Configure the TestServer object according to the objc arguments in
** the objv[] array. The array may be extracted from the arguments passed
** to either the testserver [configure] method, or from a [hct_testserver]
** constructor.
*/ static int testServerConfigure( TestServer *p, /* Test-server object */ int objc, /* Number of arguments */ Tcl_Obj *CONST objv[] /* Command arguments */ ){ struct CfgOpt { const char *zName; int iType; /* 0==text, 1==integer, 2==boolean */ } aCfg[] = { { "-port", 1 }, /* 0 */ { "-host", 0 }, /* 1 */ { "-syncthreads", 1 }, /* 2 */ { "-seconds", 1 }, /* 3 */ { "-follower", 2 }, /* 4 */ { "-syncbytes", 1 }, /* 5 */ { 0, 0 } }; int rc = TCL_OK; int ii; for(ii=0; ii<objc; ii+=2){ int iCfg = 0; int d = 0; rc = Tcl_GetIndexFromObjStruct( p->interp, objv[ii], aCfg, sizeof(aCfg[0]), "OPTION", 0, &iCfg ); if( rc!=TCL_OK ) return rc; switch( aCfg[iCfg].iType ){ case 1: if( Tcl_GetIntFromObj(p->interp, objv[ii+1], &d) ) return TCL_ERROR; break; case 2: if( Tcl_GetBooleanFromObj(p->interp, objv[ii+1], &d) ) return TCL_ERROR; break; default: assert( iCfg==0 ); break; } } if( (objc%2)!=0 ){ Tcl_AppendResult(p->interp, "option requires an argument: ", Tcl_GetString(objv[objc-1]), (char*)0 ); return TCL_ERROR; } for(ii=0; ii<objc; ii+=2){ int iCfg = 0; rc = Tcl_GetIndexFromObjStruct( p->interp, objv[ii], aCfg, sizeof(aCfg[0]), "OPTION", 0, &iCfg ); switch( iCfg ){ case 0: assert( 0==strcmp(aCfg[iCfg].zName, "-port") ); { Tcl_GetIntFromObj(p->interp, objv[ii+1], &p->iPort); break; } case 1: assert( 0==strcmp(aCfg[iCfg].zName, "-host") ); { ckfree(p->zHost); p->zHost = test_strdup(Tcl_GetString(objv[ii])); break; } case 2: assert( 0==strcmp(aCfg[iCfg].zName, "-syncthreads") ); { int nThread = 0; Tcl_GetIntFromObj(p->interp, objv[ii+1], &nThread); nThread = MAX(nThread, 1); nThread = MIN(nThread, TESTSERVER_MAX_FOLLOWER); p->nSyncThread = nThread; break; } case 3: assert( 0==strcmp(aCfg[iCfg].zName, "-seconds") ); { Tcl_GetIntFromObj(p->interp, objv[ii+1], &p->nSecond); break; } case 4: assert( 0==strcmp(aCfg[iCfg].zName, "-follower") ); { Tcl_GetBooleanFromObj(p->interp, objv[ii+1], &p->bFollower); break; } case 5: assert( 0==strcmp(aCfg[iCfg].zName, "-syncbytes") ); { 
Tcl_GetIntFromObj(p->interp, objv[ii+1], &p->nSyncBytes); break; } } } return TCL_OK; } /* ** tclcmd: TESTSERVERCMD SUBCMD ...ARGS... ** ** The implementation of the commands returned by [hct_testserver]. */ static int hctServerCmd( ClientData clientData, /* Unused */ Tcl_Interp *interp, /* The TCL interpreter */ int objc, /* Number of arguments */ Tcl_Obj *CONST objv[] /* Command arguments */ ){ const char *azSub[] = { "configure", /* 0 */ "job", /* 1 */ "run", /* 2 */ "delete", /* 3 */ 0, }; TestServer *p = (TestServer*)clientData; int rc = TCL_OK; int iSub = 0; if( objc<2 ){ Tcl_WrongNumArgs(interp, 1, objv, "SUBCMD ..."); return TCL_ERROR; } rc = Tcl_GetIndexFromObj(interp, objv[1], azSub, "SUBCMD", 0, &iSub); if( rc!=TCL_OK ) return rc; switch( iSub ){ case 0: assert( 0==strcmp(azSub[iSub], "configure") ); { rc = testServerConfigure(p, objc-2, &objv[2]); break; }; case 1: assert( 0==strcmp(azSub[iSub], "job") ); { TestServerJob *pJob = &p->aJob[p->nJob]; if( objc!=3 ){ Tcl_WrongNumArgs(interp, 2, objv, "SCRIPT"); return TCL_ERROR; } pJob->pScript = Tcl_DuplicateObj(objv[2]); pJob->pServer = p; p->nJob++; break; }; case 2: assert( 0==strcmp(azSub[iSub], "run") ); { hct_test_debug_start(); if( p->nSecond ){ TestAtomicStore(&p->iTimeout, test_gettime() + 1000*(p->nSecond)); } if( p->bFollower ){ rc = hctFollowerRun(p); }else{ rc = hctLeaderRun(p); } break; }; case 3: assert( 0==strcmp(azSub[iSub], "delete") ); { Tcl_DeleteCommand(interp, Tcl_GetStringFromObj(objv[0], 0)); break; }; default: assert( 0 ); break; } return rc; } /* ** Destructor for an object created by [hct_testserver]. */ static void hctServerDel(void *pCtx){ TestServer *p = (TestServer*)pCtx; if( p->fdListen>=0 ) close(p->fdListen); ckfree(p->zPath); ckfree(p->zHost); ckfree(p); } /* ** tclcmd: hct_testserver NAME DBFILE ?OPTIONS? ** ** Create a new testserver object. 
*/
/* NAME is the name of the Tcl command to create and DBFILE the path of the
** database. The database is opened, switched to follower journal mode and
** rolled back with SQLITE_HCT_ROLLBACK_MAXIMUM before OPTIONS are applied
** with testServerConfigure().
**
** Returns TCL_OK with NAME as the interpreter result, or TCL_ERROR if any
** step fails. SQLite error codes are never returned directly, as their
** values overlap with other Tcl completion codes (TCL_RETURN, TCL_BREAK
** etc.). */
static int test_hct_testserver(
  ClientData clientData,          /* Unused */
  Tcl_Interp *interp,             /* The TCL interpreter */
  int objc,                       /* Number of arguments */
  Tcl_Obj *CONST objv[]           /* Command arguments */
){
  const char *zCmd = 0;
  TestServer *p = 0;
  int rc = TCL_OK;

  if( objc<3 ){
    Tcl_WrongNumArgs(interp, 1, objv, "NAME DBFILE ?OPTIONS?");
    return TCL_ERROR;
  }
  zCmd = Tcl_GetString(objv[1]);

  p = (TestServer*)ckalloc(sizeof(TestServer));
  memset(p, 0, sizeof(TestServer));
  p->zPath = test_strdup(Tcl_GetString(objv[2]));
  p->iPort = TESTSERVER_DEFAULT_PORT;
  p->zHost = test_strdup("127.0.0.1");
  p->interp = interp;
  p->fdListen = -1;

  p->db = testServerOpenDb(&rc, p);
  if( rc==SQLITE_OK ){
    rc = sqlite3_hct_journal_setmode(p->db, SQLITE_HCT_JOURNAL_MODE_FOLLOWER);
    if( rc==SQLITE_OK ){
      rc = sqlite3_hct_journal_rollback(p->db, SQLITE_HCT_ROLLBACK_MAXIMUM);
    }
    if( rc!=SQLITE_OK ){
      /* These calls do not set the interpreter result themselves. */
      Tcl_AppendResult(interp,
          "failed to initialize hct journal: ", sqlite3_errmsg(p->db), (char*)0
      );
    }
  }
  if( rc==SQLITE_OK ){
    rc = testServerConfigure(p, objc-3, &objv[3]);
  }

  if( rc!=TCL_OK ){
    /* rc may hold an SQLite error code here. Map every failure to
    ** TCL_ERROR. */
    hctServerDel((void*)p);
    return TCL_ERROR;
  }

  /* The new command owns p. hctServerDel() runs when it is deleted. */
  Tcl_CreateObjCommand(interp, zCmd, hctServerCmd, (void*)p, hctServerDel);
  Tcl_SetObjResult(interp, objv[1]);
  return TCL_OK;
}

/*
** Register commands with the TCL interpreter. Always returns TCL_OK.
*/
int SqliteHctServerTest_Init(Tcl_Interp *interp){
  struct TestCmd {
    const char *zName;
    Tcl_ObjCmdProc *x;
  } aCmd[] = {
    { "hct_testserver", test_hct_testserver },
  };
  int ii = 0;

  for(ii=0; ii<(int)(sizeof(aCmd)/sizeof(aCmd[0])); ii++){
    Tcl_CreateObjCommand(interp, aCmd[ii].zName, aCmd[ii].x, 0, 0);
  }
  return TCL_OK;
}