/*
** Review summary of the hunks visible in this part of the patch:
**
**   src/pager.c, src/pager.h  Remove sqlite3PagerIsServer() - both the
**                             definition and the prototype.
**   src/server.c              Parenthesize the assignment that is used as
**                             the while() condition. The loop still frees
**                             each entry of the pDb->pFree list in turn.
**   tool/tserver.c            Add ".mutex_commit" to the dot-command list
**                             in the header comment.
*/
Index: src/pager.c
==================================================================
--- src/pager.c
+++ src/pager.c
@@ -7745,13 +7745,10 @@
   return sqlite3WalFramesize(pPager->pWal);
 }
 #endif
 #ifdef SQLITE_SERVER_EDITION
-int sqlite3PagerIsServer(Pager *pPager){
-  return pagerIsServer(pPager);
-}
 int sqlite3PagerPagelock(Pager *pPager, Pgno pgno, int bWrite){
   if( pagerIsServer(pPager)==0 ) return SQLITE_OK;
   return sqlite3ServerLock(pPager->pServer, pgno, bWrite, 0);
 }
 #endif
Index: src/pager.h
==================================================================
--- src/pager.h
+++ src/pager.h
@@ -236,11 +236,10 @@
 # define enable_simulated_io_errors()
 #endif
 #ifdef SQLITE_SERVER_EDITION
   int sqlite3PagerRollbackJournal(Pager*, sqlite3_file*);
-  int sqlite3PagerIsServer(Pager *pPager);
   int sqlite3PagerPagelock(Pager *pPager, Pgno, int);
   void sqlite3PagerServerJournal(Pager*, sqlite3_file*, const char*);
 #endif
 #endif /* SQLITE_PAGER_H */
Index: src/server.c
==================================================================
--- src/server.c
+++ src/server.c
@@ -287,11 +287,11 @@
     ServerDb **pp;
     serverShutdownDatabase(pDb);
     for(pp=&g_server.pDb; *pp!=pDb; pp=&((*pp)->pNext));
     *pp = pDb->pNext;
     sqlite3_mutex_free(pDb->mutex);
-    while( pFree=pDb->pFree ){
+    while( (pFree = pDb->pFree) ){
       pDb->pFree = pFree->pNext;
       sqlite3_free(pFree);
     }
     sqlite3_free(pDb);
   }
Index: tool/tserver.c
==================================================================
--- tool/tserver.c
+++ tool/tserver.c
@@ -30,10 +30,12 @@
 **   .list         Display all SQL statements in the list.
 **   .quit         Disconnect.
 **   .run          Run all SQL statements in the list.
 **   .repeats N    Configure the number of repeats per ".run".
 **   .seconds N    Configure the number of seconds to ".run" for.
+**   .mutex_commit Add a "COMMIT" protected by a g.commit_mutex
+**                 to the current SQL.
**
 ** Example input:
 **
 **   BEGIN;
 **     INSERT INTO t1 VALUES(randomblob(10), randomblob(100));
@@ -58,22 +60,43 @@ /* NOTE(review): the two bare "#include" context lines below have lost their header names in this copy; the context comments for nPrepare/nAlloc still say apPrepare[] */
 #include
 #include
 #include "sqlite3.h"
-/* Database used by this server */
-static char *zDatabaseName = 0;
+#define TSERVER_DEFAULT_CHECKPOINT_THRESHOLD 3900  /* Value assigned to g.nThreshold at startup */
-static char *zGlobalVfs = 0;
+/* Global variables */
+struct TserverGlobal {
+  char *zDatabaseName;            /* Database used by this server */
+  char *zVfs;                     /* VFS name passed to sqlite3_open_v2() */
+  sqlite3_mutex *commit_mutex;    /* Entered around statements with bMutex set */
+
+  /* The following use native pthreads instead of a portable interface. This
+  ** is because a condition variable, as well as a mutex, is required. */
+  pthread_mutex_t ckpt_mutex;     /* Held for the g.nRun update and checkpoint handling in handle_run_command() */
+  pthread_cond_t ckpt_cond;       /* Broadcast after a checkpoint has been run */
+  int nThreshold;                 /* Checkpoint when wal is this large */
+  int bCkptRequired;              /* True if wal checkpoint is required */
+  int nRun;                       /* Number of clients in ".run" */
+  int nWait;                      /* Number of clients waiting on ckpt_cond */
+};
+
+static struct TserverGlobal g = {0};
+
+typedef struct ClientSql ClientSql;
+struct ClientSql {
+  sqlite3_stmt *pStmt;            /* Prepared statement */
+  int bMutex;                     /* True to run pStmt while holding g.commit_mutex */
+};
 typedef struct ClientCtx ClientCtx;
 struct ClientCtx {
   sqlite3 *db;                    /* Database handle for this client */
   int fd;                         /* Client fd */
   int nRepeat;                    /* Number of times to repeat SQL */
   int nSecond;                    /* Number of seconds to run for */
-  sqlite3_stmt **apPrepare;       /* Array of prepared statements */
+  ClientSql *aPrepare;            /* Array of prepared statements */
   int nPrepare;                   /* Valid size of apPrepare[] */
   int nAlloc;                     /* Allocated size of apPrepare[] */
 };
 static int is_eol(int i){
@@ -135,29 +158,30 @@
   int nTail = nSql;
   int rc = SQLITE_OK;
   while( rc==SQLITE_OK ){
     if( p->nPrepare>=p->nAlloc ){
-      int nByte = (p->nPrepare+32) * sizeof(sqlite3_stmt*);
-      sqlite3_stmt **apNew = sqlite3_realloc(p->apPrepare, nByte);
-      if( apNew ){
-        p->apPrepare = apNew;
+      int nByte = (p->nPrepare+32) * sizeof(ClientSql);
+      ClientSql *aNew = sqlite3_realloc(p->aPrepare, nByte);
+      if( aNew ){
+        memset(&aNew[p->nPrepare], 0, sizeof(ClientSql)*32);  /* zero the 32 new slots so bMutex starts out clear */
+        p->aPrepare = aNew;
         p->nAlloc =
p->nPrepare+32;
       }else{
         rc = SQLITE_NOMEM;
         break;
       }
     }
     rc = sqlite3_prepare_v2(
-        p->db, zTail, nTail, &p->apPrepare[p->nPrepare], &zTail
+        p->db, zTail, nTail, &p->aPrepare[p->nPrepare].pStmt, &zTail
     );
     if( rc!=SQLITE_OK ){
       send_message(p, "error - %s\n", sqlite3_errmsg(p->db));
       rc = 1;
       break;
     }
-    if( p->apPrepare[p->nPrepare]==0 ){
+    if( p->aPrepare[p->nPrepare].pStmt==0 ){
       break;
     }
     p->nPrepare++;
     nTail = nSql - (zTail-zSql);
     rc = send_message(p, "ok (%d SQL statements)\n", p->nPrepare);
@@ -173,23 +197,139 @@
 }
 static void clear_sql(ClientCtx *p){
   int j;
   for(j=0; j<p->nPrepare; j++){
-    sqlite3_finalize(p->apPrepare[j]);
+    sqlite3_finalize(p->aPrepare[j].pStmt);
+    p->aPrepare[j].bMutex = 0;  /* slot is reused by later SQL; do not let it inherit a stale flag */
   }
   p->nPrepare = 0;
 }
+
+/*
+** The sqlite3_wal_hook() callback used by all client database connections.
+** Sets g.bCkptRequired once nFrame has reached g.nThreshold and always
+** returns SQLITE_OK. Arguments pArg, db and zDb are not used.
+**
+** NOTE(review): g.bCkptRequired is written here without taking
+** g.ckpt_mutex - confirm that the unsynchronized store is intended.
+*/
+static int clientWalHook(void *pArg, sqlite3 *db, const char *zDb, int nFrame){
+  if( nFrame>=g.nThreshold ){
+    g.bCkptRequired = 1;
+  }
+  return SQLITE_OK;
+}
+
+/*
+** Execute the statements in p->aPrepare[] in order. The outer loop keeps
+** making passes over the whole list while rc==SQLITE_OK and either
+** p->nRepeat<=0 or fewer than p->nRepeat passes have been made. g.nRun is
+** incremented, under g.ckpt_mutex, before the first pass.
+*/
+static int handle_run_command(ClientCtx *p){
+  int i, j;
+  int nBusy = 0;
+  sqlite3_int64 t0 = get_timer();
+  sqlite3_int64 t1 = t0;
+  int nT1 = 0;
+  int nTBusy1 = 0;
+  int rc = SQLITE_OK;
+
+  pthread_mutex_lock(&g.ckpt_mutex);
+  g.nRun++;
+  pthread_mutex_unlock(&g.ckpt_mutex);
+
+
+  for(j=0; (p->nRepeat<=0 || j<p->nRepeat) && rc==SQLITE_OK; j++){
+    sqlite3_int64 t2;
+
+    for(i=0; i<p->nPrepare && rc==SQLITE_OK; i++){
+      sqlite3_stmt *pStmt = p->aPrepare[i].pStmt;
+
+      /* If the bMutex flag is set, grab g.commit_mutex before executing
+      ** the SQL statement (which is always "COMMIT" in this case). */
+      if( p->aPrepare[i].bMutex ){
+        sqlite3_mutex_enter(g.commit_mutex);
+      }
+
+      /* Execute the statement */
+      while( sqlite3_step(pStmt)==SQLITE_ROW );
+      rc = sqlite3_reset(pStmt);
+
+      /* Relinquish the g.commit_mutex mutex if required.
*/
+      if( p->aPrepare[i].bMutex ){
+        sqlite3_mutex_leave(g.commit_mutex);
+      }
+
+      if( (rc & 0xFF)==SQLITE_BUSY ){  /* compare only the low 8 bits of the result code */
+        if( sqlite3_get_autocommit(p->db)==0 ){
+          sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0);
+        }
+        nBusy++;
+        rc = SQLITE_OK;
+        break;  /* abandon the rest of this pass; it is counted in nBusy */
+      }
+      else if( rc!=SQLITE_OK ){
+        send_message(p, "error - %s\n", sqlite3_errmsg(p->db));
+      }
+    }
+
+    t2 = get_timer();
+    if( t2>=(t1+1000) ){  /* report once t2 is at least 1000 timer units past the previous report */
+      int nMs = (t2 - t1);
+      int nDone = (j+1 - nBusy - nT1);  /* non-busy passes since the previous report */
+
+      rc = send_message(
+          p, "(%d done @ %d per second, %d busy)\n",
+          nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1
+      );
+      t1 = t2;
+      nT1 = j+1 - nBusy;
+      nTBusy1 = nBusy;
+      if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break;
+    }
+
+    /* Checkpoint handling. */
+    pthread_mutex_lock(&g.ckpt_mutex);
+    if( rc==SQLITE_OK && g.bCkptRequired ){
+      if( g.nWait==g.nRun-1 ){
+        /* All other clients are already waiting on the condition variable.
+        ** Run the checkpoint, signal the condition and move on. */
+        rc = sqlite3_wal_checkpoint(p->db, "main");
+        g.bCkptRequired = 0;
+        pthread_cond_broadcast(&g.ckpt_cond);
+      }else{
+        assert( g.nWait=1 && n<=4 && 0==strncmp(z, "list", n) ){ /* NOTE(review): text is missing from this copy between "g.nWait" and "=1" - apparently the rest of this hunk and the header of the following hunk; restore it from the original patch before applying */
     int i;
     for(i=0; rc==0 && i<p->nPrepare; i++){
-      const char *zSql = sqlite3_sql(p->apPrepare[i]);
+      const char *zSql = sqlite3_sql(p->aPrepare[i].pStmt);
       int nSql = strlen(zSql);
       trim_string(&zSql, &nSql);
       rc = send_message(p, "%d: %.*s\n", i, nSql, zSql);
     }
   }
@@ -217,74 +345,33 @@
     }
     rc = send_message(p, "ok (repeat=%d)\n", p->nRepeat);
   }
   else if( n>=2 && n<=3 && 0==strncmp(z, "run", n) ){
-    int i, j;
-    int nBusy = 0;
-    sqlite3_int64 t0 = get_timer();
-    sqlite3_int64 t1 = t0;
-    int nT1 = 0;
-    int nTBusy1 = 0;
-
-    for(j=0; (p->nRepeat<=0 || j<p->nRepeat) && rc==SQLITE_OK; j++){
-      sqlite3_int64 t2;
-
-      for(i=0; i<p->nPrepare && rc==SQLITE_OK; i++){
-        sqlite3_stmt *pStmt = p->apPrepare[i];
-
-        /* Execute the statement */
-        while( sqlite3_step(pStmt)==SQLITE_ROW );
-        rc = sqlite3_reset(pStmt);
-
-        if( (rc & 0xFF)==SQLITE_BUSY ){
-          if( sqlite3_get_autocommit(p->db)==0 ){
-
sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0);
-          }
-          nBusy++;
-          rc = SQLITE_OK;
-          break;
-        }
-        else if( rc!=SQLITE_OK ){
-          send_message(p, "error - %s\n", sqlite3_errmsg(p->db));
-        }
-      }
-
-      t2 = get_timer();
-      if( t2>=(t1+1000) ){
-        int nMs = (t2 - t1);
-        int nDone = (j+1 - nBusy - nT1);
-
-        rc = send_message(
-            p, "(%d done @ %d per second, %d busy)\n",
-            nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1
-        );
-        t1 = t2;
-        nT1 = j+1 - nBusy;
-        nTBusy1 = nBusy;
-        if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break;
-      }
-    }
-
-    if( rc==SQLITE_OK ){
-      send_message(p, "ok (%d/%d SQLITE_BUSY)\n", nBusy, j);
-    }
-    clear_sql(p);
+    rc = handle_run_command(p);  /* the inline ".run" loop removed above is replaced by this call */
   }
   else if( n>=1 && n<=7 && 0==strncmp(z, "seconds", n) ){
     if( nArg ){
       p->nSecond = strtol(zArg, 0, 0);
       if( p->nSecond>0 ) p->nRepeat = 0;
     }
     rc = send_message(p, "ok (repeat=%d)\n", p->nRepeat);
   }
+
+  else if( n>=1 && n<=12 && 0==strncmp(z, "mutex_commit", n) ){
+    rc = handle_some_sql(p, "COMMIT;", 7);  /* 7 is the length of "COMMIT;" */
+    if( rc==SQLITE_OK ){
+      p->aPrepare[p->nPrepare-1].bMutex = 1;  /* flag the last entry of aPrepare[] */
+    }
+  }
   else{
     send_message(p,
         "unrecognized dot command: %.*s\n"
-        "should be \"list\", \"run\", \"repeats\", or \"seconds\"\n", n, z
+        "should be \"list\", \"run\", \"repeats\", \"mutex_commit\" "
+        "or \"seconds\"\n", n, z
     );
     rc = 1;
   }
   return rc;
@@ -299,21 +386,24 @@ /* client setup: open the connection using g.zDatabaseName and g.zVfs, then install the wal hook */
   ClientCtx ctx;
   memset(&ctx, 0, sizeof(ClientCtx));
   ctx.fd = (int)(intptr_t)pArg;
   ctx.nRepeat = 1;
-  rc = sqlite3_open_v2(zDatabaseName, &ctx.db,
-      SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, zGlobalVfs
+  rc = sqlite3_open_v2(g.zDatabaseName, &ctx.db,
+      SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs
   );
   if( rc!=SQLITE_OK ){
     fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(ctx.db));
     return 0;
   }
   sqlite3_create_function(
       ctx.db, "usleep", 1, SQLITE_UTF8, (void*)sqlite3_vfs_find(0),
       usleepFunc, 0, 0
   );
+
+  /* Register the wal-hook with the new client connection */
+  sqlite3_wal_hook(ctx.db, clientWalHook, (void*)&ctx);  /* clientWalHook() does not use this pArg */
   while( rc==SQLITE_OK ){
     int i;
     int iStart;
     int nConsume;
@@ -377,11 +467,11
@@ /* client disconnect path: the statement array is now ctx.aPrepare */
   }
   fprintf(stdout, "Client %d disconnects\n", ctx.fd);
   close(ctx.fd);
   clear_sql(&ctx);
-  sqlite3_free(ctx.apPrepare);
+  sqlite3_free(ctx.aPrepare);
   sqlite3_close(ctx.db);
   return 0;
 }
 static void usage(const char *zExec){
@@ -404,16 +494,25 @@ /* startup: fill in the fields of g before the first database handle is opened */
     usage(argv[0]);
   }
   if( argc==4 ){
     int n = strlen(argv[1]);
     if( n<2 || n>4 || memcmp("-vfs", argv[1], 4) ) usage(argv[0]);
-    zGlobalVfs = argv[2];
+    g.zVfs = argv[2];
   }
-  zDatabaseName = argv[argc-1];
+  g.zDatabaseName = argv[argc-1];
+  g.commit_mutex = sqlite3_mutex_alloc(SQLITE_MUTEX_FAST);
+  if( g.commit_mutex==0 ){  /* without the mutex, ".mutex_commit" would silently stop serializing */
+    fprintf(stderr, "sqlite3_mutex_alloc() failed\n");
+    return 1;
+  }
+
+  g.nThreshold = TSERVER_DEFAULT_CHECKPOINT_THRESHOLD;
+  pthread_mutex_init(&g.ckpt_mutex, 0);  /* 0: default mutex attributes */
+  pthread_cond_init(&g.ckpt_cond, 0);  /* 0: default condition-variable attributes */
-  rc = sqlite3_open_v2(zDatabaseName, &db,
-      SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, zGlobalVfs
+  rc = sqlite3_open_v2(g.zDatabaseName, &db,
+      SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs
   );
   if( rc!=SQLITE_OK ){
     fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(db));
     return 1;
   }