/ Check-in [8299bdb7]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment: Update test program "tserver" to use a native pthreads mutex/condition variable to efficiently manage wal file checkpoints without the wal file growing indefinitely.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | server-process-edition
Files: files | file ages | folders
SHA3-256: 8299bdb7cbede30c665dda131bdcbd1d260b3ae9bd16d9b414d8c3776b08f1b3
User & Date: dan 2017-07-29 17:01:06
Context
2017-07-29
17:10
Merge latest trunk changes with this branch. check-in: b42c8779 user: dan tags: server-process-edition
17:01
Update test program "tserver" to use a native pthreads mutex/condition variable to efficiently manage wal file checkpoints without the wal file growing indefinitely. check-in: 8299bdb7 user: dan tags: server-process-edition
2017-07-28
21:02
Reduce the number of mallocs required of writers in server mode. check-in: 60953997 user: dan tags: server-process-edition
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/pager.c.

  7743   7743   int sqlite3PagerWalFramesize(Pager *pPager){
  7744   7744     assert( pPager->eState>=PAGER_READER );
  7745   7745     return sqlite3WalFramesize(pPager->pWal);
  7746   7746   }
  7747   7747   #endif
  7748   7748   
  7749   7749   #ifdef SQLITE_SERVER_EDITION
  7750         -int sqlite3PagerIsServer(Pager *pPager){
  7751         -  return pagerIsServer(pPager);
  7752         -}
  7753   7750   int sqlite3PagerPagelock(Pager *pPager, Pgno pgno, int bWrite){
  7754   7751     if( pagerIsServer(pPager)==0 ) return SQLITE_OK;
  7755   7752     return sqlite3ServerLock(pPager->pServer, pgno, bWrite, 0);
  7756   7753   }
  7757   7754   #endif
  7758   7755   
  7759   7756   #endif /* SQLITE_OMIT_DISKIO */

Changes to src/pager.h.

   234    234   #else
   235    235   # define disable_simulated_io_errors()
   236    236   # define enable_simulated_io_errors()
   237    237   #endif
   238    238   
   239    239   #ifdef SQLITE_SERVER_EDITION
   240    240     int sqlite3PagerRollbackJournal(Pager*, sqlite3_file*);
   241         -  int sqlite3PagerIsServer(Pager *pPager);
   242    241     int sqlite3PagerPagelock(Pager *pPager, Pgno, int);
   243    242     void sqlite3PagerServerJournal(Pager*, sqlite3_file*, const char*);
   244    243   #endif
   245    244   
   246    245   #endif /* SQLITE_PAGER_H */

Changes to src/server.c.

   285    285     if( pDb->nClient==0 ){
   286    286       ServerPage *pFree;
   287    287       ServerDb **pp;
   288    288       serverShutdownDatabase(pDb);
   289    289       for(pp=&g_server.pDb; *pp!=pDb; pp=&((*pp)->pNext));
   290    290       *pp = pDb->pNext;
   291    291       sqlite3_mutex_free(pDb->mutex);
   292         -    while( pFree=pDb->pFree ){
          292  +    while( (pFree = pDb->pFree) ){
   293    293         pDb->pFree = pFree->pNext;
   294    294         sqlite3_free(pFree);
   295    295       }
   296    296       sqlite3_free(pDb);
   297    297     }
   298    298     serverLeaveMutex();
   299    299   

Changes to tool/tserver.c.

    28     28   ** Dot-commands are:
    29     29   **
    30     30   **   .list                    Display all SQL statements in the list.
    31     31   **   .quit                    Disconnect.
    32     32   **   .run                     Run all SQL statements in the list.
    33     33   **   .repeats N               Configure the number of repeats per ".run".
    34     34   **   .seconds N               Configure the number of seconds to ".run" for.
           35  +**   .mutex_commit            Add a "COMMIT" protected by a g.commit_mutex
           36  +**                            to the current SQL.
    35     37   **
    36     38   ** Example input:
    37     39   **
    38     40   **   BEGIN;
    39     41   **     INSERT INTO t1 VALUES(randomblob(10), randomblob(100));
    40     42   **     INSERT INTO t1 VALUES(randomblob(10), randomblob(100));
    41     43   **     INSERT INTO t1 VALUES(randomblob(10), randomblob(100));
................................................................................
    56     58   #include <string.h>
    57     59   #include <sys/socket.h>
    58     60   #include <sys/time.h>
    59     61   #include <unistd.h>
    60     62   
    61     63   #include "sqlite3.h"
    62     64   
    63         -/* Database used by this server */
    64         -static char *zDatabaseName = 0;
           65  +#define TSERVER_DEFAULT_CHECKPOINT_THRESHOLD 3900
    65     66   
    66         -static char *zGlobalVfs = 0;
           67  +/* Global variables */
           68  +struct TserverGlobal {
           69  +  char *zDatabaseName;             /* Database used by this server */
           70  +  char *zVfs;
           71  +  sqlite3_mutex *commit_mutex;
           72  +
           73  +  /* The following use native pthreads instead of a portable interface. This
           74  +  ** is because a condition variable, as well as a mutex, is required.  */
           75  +  pthread_mutex_t ckpt_mutex;
           76  +  pthread_cond_t ckpt_cond;
           77  +  int nThreshold;                  /* Checkpoint when wal is this large */
           78  +  int bCkptRequired;               /* True if wal checkpoint is required */
           79  +  int nRun;                        /* Number of clients in ".run" */
           80  +  int nWait;                       /* Number of clients waiting on ckpt_cond */
           81  +};
           82  +
           83  +static struct TserverGlobal g = {0};
           84  +
           85  +typedef struct ClientSql ClientSql;
           86  +struct ClientSql {
           87  +  sqlite3_stmt *pStmt;
           88  +  int bMutex;
           89  +};
    67     90   
    68     91   typedef struct ClientCtx ClientCtx;
    69     92   struct ClientCtx {
    70     93     sqlite3 *db;                    /* Database handle for this client */
    71     94     int fd;                         /* Client fd */
    72     95     int nRepeat;                    /* Number of times to repeat SQL */
    73     96     int nSecond;                    /* Number of seconds to run for */
    74         -  sqlite3_stmt **apPrepare;       /* Array of prepared statements */
           97  +  ClientSql *aPrepare;            /* Array of prepared statements */
    75     98     int nPrepare;                   /* Valid size of apPrepare[] */
    76     99     int nAlloc;                     /* Allocated size of apPrepare[] */
    77    100   };
    78    101   
    79    102   static int is_eol(int i){
    80    103     return (i=='\n' || i=='\r');
    81    104   }
................................................................................
   133    156   static int handle_some_sql(ClientCtx *p, const char *zSql, int nSql){
   134    157     const char *zTail = zSql;
   135    158     int nTail = nSql;
   136    159     int rc = SQLITE_OK;
   137    160   
   138    161     while( rc==SQLITE_OK ){
   139    162       if( p->nPrepare>=p->nAlloc ){
   140         -      int nByte = (p->nPrepare+32) * sizeof(sqlite3_stmt*);
   141         -      sqlite3_stmt **apNew = sqlite3_realloc(p->apPrepare, nByte);
   142         -      if( apNew ){
   143         -        p->apPrepare = apNew;
          163  +      int nByte = (p->nPrepare+32) * sizeof(ClientSql);
          164  +      ClientSql *aNew = sqlite3_realloc(p->aPrepare, nByte);
          165  +      if( aNew ){
          166  +        memset(&aNew[p->nPrepare], 0, sizeof(ClientSql)*32);
          167  +        p->aPrepare = aNew;
   144    168           p->nAlloc = p->nPrepare+32;
   145    169         }else{
   146    170           rc = SQLITE_NOMEM;
   147    171           break;
   148    172         }
   149    173       }
   150    174       rc = sqlite3_prepare_v2(
   151         -        p->db, zTail, nTail, &p->apPrepare[p->nPrepare], &zTail
          175  +        p->db, zTail, nTail, &p->aPrepare[p->nPrepare].pStmt, &zTail
   152    176       );
   153    177       if( rc!=SQLITE_OK ){
   154    178         send_message(p, "error - %s\n", sqlite3_errmsg(p->db));
   155    179         rc = 1;
   156    180         break;
   157    181       }
   158         -    if( p->apPrepare[p->nPrepare]==0 ){
          182  +    if( p->aPrepare[p->nPrepare].pStmt==0 ){
   159    183         break;
   160    184       }
   161    185       p->nPrepare++;
   162    186       nTail = nSql - (zTail-zSql);
   163    187       rc = send_message(p, "ok (%d SQL statements)\n", p->nPrepare);
   164    188     }
   165    189   
................................................................................
   171    195     gettimeofday(&t, 0);
   172    196     return ((sqlite3_int64)t.tv_usec / 1000) + ((sqlite3_int64)t.tv_sec * 1000);
   173    197   }
   174    198   
   175    199   static void clear_sql(ClientCtx *p){
   176    200     int j;
   177    201     for(j=0; j<p->nPrepare; j++){
   178         -    sqlite3_finalize(p->apPrepare[j]);
          202  +    sqlite3_finalize(p->aPrepare[j].pStmt);
   179    203     }
   180    204     p->nPrepare = 0;
   181    205   }
          206  +
          207  +/*
          208  +** The sqlite3_wal_hook() callback used by all client database connections.
          209  +*/
          210  +static int clientWalHook(void *pArg, sqlite3 *db, const char *zDb, int nFrame){
          211  +  if( nFrame>=g.nThreshold ){
          212  +    g.bCkptRequired = 1;
          213  +  }
          214  +  return SQLITE_OK;
          215  +}
          216  +
          217  +static int handle_run_command(ClientCtx *p){
          218  +  int i, j;
          219  +  int nBusy = 0;
          220  +  sqlite3_int64 t0 = get_timer();
          221  +  sqlite3_int64 t1 = t0;
          222  +  int nT1 = 0;
          223  +  int nTBusy1 = 0;
          224  +  int rc = SQLITE_OK;
          225  +
          226  +  pthread_mutex_lock(&g.ckpt_mutex);
          227  +  g.nRun++;
          228  +  pthread_mutex_unlock(&g.ckpt_mutex);
          229  +
          230  +
          231  +  for(j=0; (p->nRepeat<=0 || j<p->nRepeat) && rc==SQLITE_OK; j++){
          232  +    sqlite3_int64 t2;
          233  +
          234  +    for(i=0; i<p->nPrepare && rc==SQLITE_OK; i++){
          235  +      sqlite3_stmt *pStmt = p->aPrepare[i].pStmt;
          236  +
          237  +      /* If the bMutex flag is set, grab g.commit_mutex before executing
          238  +      ** the SQL statement (which is always "COMMIT" in this case). */
          239  +      if( p->aPrepare[i].bMutex ){
          240  +        sqlite3_mutex_enter(g.commit_mutex);
          241  +      }
          242  +
          243  +      /* Execute the statement */
          244  +      while( sqlite3_step(pStmt)==SQLITE_ROW );
          245  +      rc = sqlite3_reset(pStmt);
          246  +
          247  +      /* Relinquish the g.commit_mutex mutex if required. */
          248  +      if( p->aPrepare[i].bMutex ){
          249  +        sqlite3_mutex_leave(g.commit_mutex);
          250  +      }
          251  +
          252  +      if( (rc & 0xFF)==SQLITE_BUSY ){
          253  +        if( sqlite3_get_autocommit(p->db)==0 ){
          254  +          sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0);
          255  +        }
          256  +        nBusy++;
          257  +        rc = SQLITE_OK;
          258  +        break;
          259  +      }
          260  +      else if( rc!=SQLITE_OK ){
          261  +        send_message(p, "error - %s\n", sqlite3_errmsg(p->db));
          262  +      }
          263  +    }
          264  +
          265  +    t2 = get_timer();
          266  +    if( t2>=(t1+1000) ){
          267  +      int nMs = (t2 - t1);
          268  +      int nDone = (j+1 - nBusy - nT1);
          269  +
          270  +      rc = send_message(
          271  +          p, "(%d done @ %d per second, %d busy)\n", 
          272  +          nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1
          273  +          );
          274  +      t1 = t2;
          275  +      nT1 = j+1 - nBusy;
          276  +      nTBusy1 = nBusy;
          277  +      if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break;
          278  +    }
          279  +
          280  +    /* Checkpoint handling. */
          281  +    pthread_mutex_lock(&g.ckpt_mutex);
          282  +    if( rc==SQLITE_OK && g.bCkptRequired ){
          283  +      if( g.nWait==g.nRun-1 ){
          284  +        /* All other clients are already waiting on the condition variable.
          285  +        ** Run the checkpoint, signal the condition and move on.  */
          286  +        rc = sqlite3_wal_checkpoint(p->db, "main");
          287  +        g.bCkptRequired = 0;
          288  +        pthread_cond_broadcast(&g.ckpt_cond);
          289  +      }else{
          290  +        assert( g.nWait<g.nRun-1 );
          291  +        g.nWait++;
          292  +        pthread_cond_wait(&g.ckpt_cond, &g.ckpt_mutex);
          293  +        g.nWait--;
          294  +      }
          295  +    }
          296  +    pthread_mutex_unlock(&g.ckpt_mutex);
          297  +  }
          298  +
          299  +  if( rc==SQLITE_OK ){
          300  +    send_message(p, "ok (%d/%d SQLITE_BUSY)\n", nBusy, j);
          301  +  }
          302  +  clear_sql(p);
          303  +
          304  +  pthread_mutex_lock(&g.ckpt_mutex);
          305  +  g.nRun--;
          306  +  pthread_mutex_unlock(&g.ckpt_mutex);
          307  +
          308  +  return rc;
          309  +}
   182    310   
   183    311   static int handle_dot_command(ClientCtx *p, const char *zCmd, int nCmd){
   184         -  assert( zCmd[0]=='.' );
   185    312     int n;
   186    313     int rc = 0;
   187    314     const char *z = &zCmd[1];
   188    315     const char *zArg;
   189    316     int nArg;
   190    317   
          318  +  assert( zCmd[0]=='.' );
   191    319     for(n=0; n<(nCmd-1); n++){
   192    320       if( is_whitespace(z[n]) ) break;
   193    321     }
   194    322   
   195    323     zArg = &z[n];
   196    324     nArg = nCmd-n;
   197    325     trim_string(&zArg, &nArg);
   198    326   
   199    327     if( n>=1 && n<=4 && 0==strncmp(z, "list", n) ){
   200    328       int i;
   201    329       for(i=0; rc==0 && i<p->nPrepare; i++){
   202         -      const char *zSql = sqlite3_sql(p->apPrepare[i]);
          330  +      const char *zSql = sqlite3_sql(p->aPrepare[i].pStmt);
   203    331         int nSql = strlen(zSql);
   204    332         trim_string(&zSql, &nSql);
   205    333         rc = send_message(p, "%d: %.*s\n", i, nSql, zSql);
   206    334       }
   207    335     }
   208    336   
   209    337     else if( n>=1 && n<=4 && 0==strncmp(z, "quit", n) ){
................................................................................
   215    343         p->nRepeat = strtol(zArg, 0, 0);
   216    344         if( p->nRepeat>0 ) p->nSecond = 0;
   217    345       }
   218    346       rc = send_message(p, "ok (repeat=%d)\n", p->nRepeat);
   219    347     }
   220    348   
   221    349     else if( n>=2 && n<=3 && 0==strncmp(z, "run", n) ){
   222         -    int i, j;
   223         -    int nBusy = 0;
   224         -    sqlite3_int64 t0 = get_timer();
   225         -    sqlite3_int64 t1 = t0;
   226         -    int nT1 = 0;
   227         -    int nTBusy1 = 0;
   228         -
   229         -    for(j=0; (p->nRepeat<=0 || j<p->nRepeat) && rc==SQLITE_OK; j++){
   230         -      sqlite3_int64 t2;
   231         -
   232         -      for(i=0; i<p->nPrepare && rc==SQLITE_OK; i++){
   233         -        sqlite3_stmt *pStmt = p->apPrepare[i];
   234         -
   235         -        /* Execute the statement */
   236         -        while( sqlite3_step(pStmt)==SQLITE_ROW );
   237         -        rc = sqlite3_reset(pStmt);
   238         -
   239         -        if( (rc & 0xFF)==SQLITE_BUSY ){
   240         -          if( sqlite3_get_autocommit(p->db)==0 ){
   241         -            sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0);
   242         -          }
   243         -          nBusy++;
   244         -          rc = SQLITE_OK;
   245         -          break;
   246         -        }
   247         -        else if( rc!=SQLITE_OK ){
   248         -          send_message(p, "error - %s\n", sqlite3_errmsg(p->db));
   249         -        }
   250         -      }
   251         -
   252         -      t2 = get_timer();
   253         -      if( t2>=(t1+1000) ){
   254         -        int nMs = (t2 - t1);
   255         -        int nDone = (j+1 - nBusy - nT1);
   256         -
   257         -        rc = send_message(
   258         -            p, "(%d done @ %d per second, %d busy)\n", 
   259         -            nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1
   260         -        );
   261         -        t1 = t2;
   262         -        nT1 = j+1 - nBusy;
   263         -        nTBusy1 = nBusy;
   264         -        if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break;
   265         -      }
   266         -    }
   267         -
   268         -    if( rc==SQLITE_OK ){
   269         -      send_message(p, "ok (%d/%d SQLITE_BUSY)\n", nBusy, j);
   270         -    }
   271         -    clear_sql(p);
          350  +    rc = handle_run_command(p);
   272    351     }
   273    352   
   274    353     else if( n>=1 && n<=7 && 0==strncmp(z, "seconds", n) ){
   275    354       if( nArg ){
   276    355         p->nSecond = strtol(zArg, 0, 0);
   277    356         if( p->nSecond>0 ) p->nRepeat = 0;
   278    357       }
   279    358       rc = send_message(p, "ok (repeat=%d)\n", p->nRepeat);
   280    359     }
          360  +
          361  +  else if( n>=1 && n<=12 && 0==strncmp(z, "mutex_commit", n) ){
          362  +    rc = handle_some_sql(p, "COMMIT;", 7);
          363  +    if( rc==SQLITE_OK ){
          364  +      p->aPrepare[p->nPrepare-1].bMutex = 1;
          365  +    }
          366  +  }
   281    367   
   282    368     else{
   283    369       send_message(p, 
   284    370           "unrecognized dot command: %.*s\n"
   285         -        "should be \"list\", \"run\", \"repeats\", or \"seconds\"\n", n, z
          371  +        "should be \"list\", \"run\", \"repeats\", \"mutex_commit\" "
          372  +        "or \"seconds\"\n", n, z
   286    373       );
   287    374       rc = 1;
   288    375     }
   289    376   
   290    377     return rc;
   291    378   }
   292    379   
................................................................................
   297    384     int rc = SQLITE_OK;
   298    385   
   299    386     ClientCtx ctx;
   300    387     memset(&ctx, 0, sizeof(ClientCtx));
   301    388   
   302    389     ctx.fd = (int)(intptr_t)pArg;
   303    390     ctx.nRepeat = 1;
   304         -  rc = sqlite3_open_v2(zDatabaseName, &ctx.db,
   305         -      SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, zGlobalVfs
          391  +  rc = sqlite3_open_v2(g.zDatabaseName, &ctx.db,
          392  +      SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs
   306    393     );
   307    394     if( rc!=SQLITE_OK ){
   308    395       fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(ctx.db));
   309    396       return 0;
   310    397     }
   311    398     sqlite3_create_function(
   312    399         ctx.db, "usleep", 1, SQLITE_UTF8, (void*)sqlite3_vfs_find(0), 
   313    400         usleepFunc, 0, 0
   314    401     );
          402  +
          403  +  /* Register the wal-hook with the new client connection */
          404  +  sqlite3_wal_hook(ctx.db, clientWalHook, (void*)&ctx);
   315    405   
   316    406     while( rc==SQLITE_OK ){
   317    407       int i;
   318    408       int iStart;
   319    409       int nConsume;
   320    410       res = read(ctx.fd, &zCmd[nCmd], sizeof(zCmd)-nCmd-1);
   321    411       if( res<=0 ) break;
................................................................................
   375    465         }
   376    466       }while( rc==SQLITE_OK && nConsume>0 );
   377    467     }
   378    468   
   379    469     fprintf(stdout, "Client %d disconnects\n", ctx.fd);
   380    470     close(ctx.fd);
   381    471     clear_sql(&ctx);
   382         -  sqlite3_free(ctx.apPrepare);
          472  +  sqlite3_free(ctx.aPrepare);
   383    473     sqlite3_close(ctx.db);
   384    474     return 0;
   385    475   } 
   386    476   
   387    477   static void usage(const char *zExec){
   388    478     fprintf(stderr, "Usage: %s ?-vfs VFS? DATABASE\n", zExec);
   389    479     exit(1);
................................................................................
   402    492   
   403    493     if( argc!=2 && argc!=4 ){
   404    494       usage(argv[0]);
   405    495     }
   406    496     if( argc==4 ){
   407    497       int n = strlen(argv[1]);
   408    498       if( n<2 || n>4 || memcmp("-vfs", argv[1], 4) ) usage(argv[0]);
   409         -    zGlobalVfs = argv[2];
          499  +    g.zVfs = argv[2];
   410    500     }
   411         -  zDatabaseName = argv[argc-1];
          501  +  g.zDatabaseName = argv[argc-1];
          502  +  g.commit_mutex = sqlite3_mutex_alloc(SQLITE_MUTEX_FAST);
          503  +
          504  +  g.nThreshold = TSERVER_DEFAULT_CHECKPOINT_THRESHOLD;
          505  +  pthread_mutex_init(&g.ckpt_mutex, 0);
          506  +  pthread_cond_init(&g.ckpt_cond, 0);
   412    507   
   413         -  rc = sqlite3_open_v2(zDatabaseName, &db,
   414         -      SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, zGlobalVfs
          508  +  rc = sqlite3_open_v2(g.zDatabaseName, &db,
          509  +      SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs
   415    510     );
   416    511     if( rc!=SQLITE_OK ){
   417    512       fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(db));
   418    513       return 1;
   419    514     }
   420    515   
   421    516     rc = sqlite3_exec(db, "SELECT * FROM sqlite_master", 0, 0, 0);