SQLite

sqlite3-rsync.c at 2fe2f374584b025676684ebe4ef29304883a3b0b125b62abc1dbf74815eecdfb
Login

File tool/sqlite3-rsync.c artifact 7c78ba15 on branch 2fe2f374584b025676684ebe4ef29304883a3b0b125b62abc1dbf74815eecdfb


/*
** 2024-09-10
**
** The author disclaims copyright to this source code.  In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** This is a utility program that makes a copy of a live SQLite database
** using a bandwidth-efficient protocol, similar to "rsync".
*/
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <string.h>
#include <stdarg.h>
#include "sqlite3.h"

static const char zUsage[] = 
  "sqlite3-rsync ORIGIN REPLICA ?OPTIONS?\n"
  "\n"
  "One of ORIGIN or REPLICA is a pathname to a database on the local\n"
  "machine and the other is of the form \"USER@HOST:PATH\" describing\n"
  "a database on a remote machine.  This utility makes REPLICA into a\n"
  "copy of ORIGIN\n"
  "\n"
  "OPTIONS:\n"
  "\n"
  "   --exe PATH    Name of the sqlite3-rsync program on the remote side\n"
  "   --help        Show this help screen\n"
  "   --ssh PATH    Name of the SSH program used to reach the remote side\n"
  "   -v            Verbose.  Multiple v's for increasing output\n"
  "   --version     Show detailed version information\n"
;

typedef unsigned char u8;

/* Context for the run */
typedef struct SQLiteRsync SQLiteRsync;
struct SQLiteRsync {
  const char *zOrigin;     /* Name of the origin */
  const char *zReplica;    /* Name of the replica */
  const char *zErrFile;    /* Append error messages to this file */
  FILE *pOut;              /* Transmit to the other side */
  FILE *pIn;               /* Receive from the other side */
  FILE *pLog;              /* Duplicate output here if not NULL */
  sqlite3 *db;             /* Database connection */
  int nErr;                /* Number of errors encountered */
  int nWrErr;              /* Number of failed attempts to write on the pipe */
  u8 eVerbose;             /* Bigger for more output.  0 means none. */
  u8 bCommCheck;           /* True to debug the communication protocol */
  u8 isRemote;             /* On the remote side of a connection */
  u8 isReplica;            /* True if running on the replica side */
  u8 iProtocol;            /* Protocol version number */
  sqlite3_uint64 nOut;     /* Bytes transmitted */
  sqlite3_uint64 nIn;      /* Bytes received */
  unsigned int nPage;      /* Total number of pages in the database */
  unsigned int szPage;     /* Database page size */
  unsigned int nHashSent;  /* Hashes sent (replica to origin) */
  unsigned int nPageSent;  /* Page contents sent (origin to replica) */
};

/* The version number of the protocol.  Sent in the *_BEGIN message
** to verify that both sides speak the same dialect.
*/
#define PROTOCOL_VERSION  1


/* Magic numbers to identify particular messages sent over the wire.
*/
#define ORIGIN_BEGIN    0x41     /* Initial message */
#define ORIGIN_END      0x42     /* Time to quit */
#define ORIGIN_ERROR    0x43     /* Error message from the remote */
#define ORIGIN_PAGE     0x44     /* New page data */
#define ORIGIN_TXN      0x45     /* Transaction commit */
#define ORIGIN_MSG      0x46     /* Informational message */

#define REPLICA_BEGIN   0x61     /* Welcome message */
#define REPLICA_ERROR   0x62     /* Error.  Report and quit. */
#define REPLICA_END     0x63     /* Replica wants to stop */
#define REPLICA_HASH    0x64     /* One or more pages hashes to report */
#define REPLICA_READY   0x65     /* Read to receive page content */
#define REPLICA_MSG     0x66     /* Informational message */


/****************************************************************************
** Beginning of the popen2() implementation copied from Fossil  *************
****************************************************************************/
#ifdef _WIN32
#include <windows.h>
#include <fcntl.h>
/*
** Print a fatal error and quit.
*/
static void win32_fatal_error(const char *zMsg){
  fprintf(stderr, "%s", zMsg);
  exit(1);
}
extern int _open_osfhandle(intptr_t,int);
#else
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
#endif

/*
** The following macros are used to cast pointers to integers and
** integers to pointers.  The way you do this varies from one compiler
** to the next, so we have developed the following set of #if statements
** to generate appropriate macros for a wide range of compilers.
**
** The correct "ANSI" way to do this is to use the intptr_t type.
** Unfortunately, that typedef is not available on all compilers, or
** if it is available, it requires an #include of specific headers
** that vary from one machine to the next.
**
** This code is copied out of SQLite.
*/
#if defined(__PTRDIFF_TYPE__)  /* This case should work for GCC */
# define INT_TO_PTR(X)  ((void*)(__PTRDIFF_TYPE__)(X))
# define PTR_TO_INT(X)  ((int)(__PTRDIFF_TYPE__)(X))
#elif !defined(__GNUC__)       /* Works for compilers other than LLVM */
# define INT_TO_PTR(X)  ((void*)&((char*)0)[X])
# define PTR_TO_INT(X)  ((int)(((char*)X)-(char*)0))
#elif defined(HAVE_STDINT_H)   /* Use this case if we have ANSI headers */
# define INT_TO_PTR(X)  ((void*)(intptr_t)(X))
# define PTR_TO_INT(X)  ((int)(intptr_t)(X))
#else                          /* Generates a warning - but it always works */
# define INT_TO_PTR(X)  ((void*)(X))
# define PTR_TO_INT(X)  ((int)(X))
#endif

/* Register SQL functions provided by ext/misc/sha1.c */
extern int sqlite3_sha_init(
  sqlite3 *db,
  char **pzErrMsg,
  const sqlite3_api_routines *pApi
);

#ifdef _WIN32
/*
** On windows, create a child process and specify the stdin, stdout,
** and stderr channels for that process to use.
**
** Return the number of errors.
*/
static int win32_create_child_process(
  wchar_t *zCmd,       /* The command that the child process will run */
  HANDLE hIn,          /* Standard input */
  HANDLE hOut,         /* Standard output */
  HANDLE hErr,         /* Standard error */
  DWORD *pChildPid     /* OUT: Child process handle */
){
  STARTUPINFOW si;
  PROCESS_INFORMATION pi;
  BOOL rc;

  memset(&si, 0, sizeof(si));
  si.cb = sizeof(si);
  si.dwFlags = STARTF_USESTDHANDLES;
  SetHandleInformation(hIn, HANDLE_FLAG_INHERIT, TRUE);
  si.hStdInput  = hIn;
  SetHandleInformation(hOut, HANDLE_FLAG_INHERIT, TRUE);
  si.hStdOutput = hOut;
  SetHandleInformation(hErr, HANDLE_FLAG_INHERIT, TRUE);
  si.hStdError  = hErr;
  rc = CreateProcessW(
     NULL,  /* Application Name */
     zCmd,  /* Command-line */
     NULL,  /* Process attributes */
     NULL,  /* Thread attributes */
     TRUE,  /* Inherit Handles */
     0,     /* Create flags  */
     NULL,  /* Environment */
     NULL,  /* Current directory */
     &si,   /* Startup Info */
     &pi    /* Process Info */
  );
  if( rc ){
    CloseHandle( pi.hProcess );
    CloseHandle( pi.hThread );
    *pChildPid = pi.dwProcessId;
  }else{
    win32_fatal_error("cannot create child process");
  }
  return rc!=0;
}
void *win32_utf8_to_unicode(const char *zUtf8){
  int nByte = MultiByteToWideChar(CP_UTF8, 0, zUtf8, -1, 0, 0);
  wchar_t *zUnicode = malloc( nByte*2 );
  MultiByteToWideChar(CP_UTF8, 0, zUtf8, -1, zUnicode, nByte);
  return zUnicode;
}
#endif

/*
** Create a child process running shell command "zCmd".  *ppOut is
** a FILE that becomes the standard input of the child process.
** (The caller writes to *ppOut in order to send text to the child.)
** *ppIn is stdout from the child process.  (The caller
** reads from *ppIn in order to receive input from the child.)
** Note that *ppIn is an unbuffered file descriptor, not a FILE.
** The process ID of the child is written into *pChildPid.
**
** Return the number of errors.
*/
static int popen2(
  const char *zCmd,      /* Command to run in the child process */
  FILE **ppIn,           /* Read from child using this file descriptor */
  FILE **ppOut,          /* Write to child using this file descriptor */
  int *pChildPid,        /* PID of the child process */
  int bDirect            /* 0: run zCmd as a shell cmd.  1: run directly */
){
#ifdef _WIN32
  HANDLE hStdinRd, hStdinWr, hStdoutRd, hStdoutWr, hStderr;
  SECURITY_ATTRIBUTES saAttr;
  DWORD childPid = 0;
  int fd;

  saAttr.nLength = sizeof(saAttr);
  saAttr.bInheritHandle = TRUE;
  saAttr.lpSecurityDescriptor = NULL;
  hStderr = GetStdHandle(STD_ERROR_HANDLE);
  if( !CreatePipe(&hStdoutRd, &hStdoutWr, &saAttr, 4096) ){
    win32_fatal_error("cannot create pipe for stdout");
  }
  SetHandleInformation( hStdoutRd, HANDLE_FLAG_INHERIT, FALSE);

  if( !CreatePipe(&hStdinRd, &hStdinWr, &saAttr, 4096) ){
    win32_fatal_error("cannot create pipe for stdin");
  }
  SetHandleInformation( hStdinWr, HANDLE_FLAG_INHERIT, FALSE);

  win32_create_child_process(win32_utf8_to_unicode(zCmd),
                             hStdinRd, hStdoutWr, hStderr,&childPid);
  *pChildPid = childPid;
  fd = _open_osfhandle(PTR_TO_INT(hStdoutRd), 0);
  *ppIn = fdopen(fd, "r");
  fd = _open_osfhandle(PTR_TO_INT(hStdinWr), 0);
  *ppOut = _fdopen(fd, "w");
  CloseHandle(hStdinRd);
  CloseHandle(hStdoutWr);
  return 0;
#else
  int pin[2], pout[2];
  *ppIn = 0;
  *ppOut = 0;
  *pChildPid = 0;

  if( pipe(pin)<0 ){
    return 1;
  }
  if( pipe(pout)<0 ){
    close(pin[0]);
    close(pin[1]);
    return 1;
  }
  *pChildPid = fork();
  if( *pChildPid<0 ){
    close(pin[0]);
    close(pin[1]);
    close(pout[0]);
    close(pout[1]);
    *pChildPid = 0;
    return 1;
  }
  signal(SIGPIPE,SIG_IGN);
  if( *pChildPid==0 ){
    int fd;
    /* This is the child process */
    close(0);
    fd = dup(pout[0]);
    if( fd!=0 ) {
      fprintf(stderr,"popen2() failed to open file descriptor 0");
      exit(1);
    }
    close(pout[0]);
    close(pout[1]);
    close(1);
    fd = dup(pin[1]);
    if( fd!=1 ){
      fprintf(stderr,"popen() failed to open file descriptor 1");
      exit(1);
    }
    close(pin[0]);
    close(pin[1]);
    if( bDirect ){
      execl(zCmd, zCmd, (char*)0);
    }else{
      execl("/bin/sh", "/bin/sh", "-c", zCmd, (char*)0);
    }
    return 1;
  }else{
    /* This is the parent process */
    close(pin[1]);
    *ppIn = fdopen(pin[0], "r");
    close(pout[0]);
    *ppOut = fdopen(pout[1], "w");
    return 0;
  }
#endif
}

/*
** Close the connection to a child process previously created using
** popen2().
*/
static void pclose2(FILE *pIn, FILE *pOut, int childPid){
#ifdef _WIN32
  /* Not implemented, yet */
  fclose(pIn);
  fclose(pOut);
#else
  fclose(pIn);
  fclose(pOut);
  while( waitpid(0, 0, WNOHANG)>0 ) {}
#endif
}
/*****************************************************************************
** End of the popen2() implementation copied from Fossil *********************
*****************************************************************************/

/*****************************************************************************
** Beginning of the append_escaped_arg() routine, adapted from the Fossil   **
** subroutine nameed blob_append_escaped_arg()                              **
*****************************************************************************/
/*
** ASCII (for reference):
**    x0  x1  x2  x3  x4  x5  x6  x7  x8  x9  xa  xb  xc  xd  xe  xf
** 0x ^`  ^a  ^b  ^c  ^d  ^e  ^f  ^g  \b  \t  \n  ()  \f  \r  ^n  ^o
** 1x ^p  ^q  ^r  ^s  ^t  ^u  ^v  ^w  ^x  ^y  ^z  ^{  ^|  ^}  ^~  ^
** 2x ()  !   "   #   $   %   &   '   (   )   *   +   ,   -   .   /
** 3x 0   1   2   3   4   5   6   7   8   9   :   ;   <   =   >   ?
** 4x @   A   B   C   D   E   F   G   H   I   J   K   L   M   N   O
** 5x P   Q   R   S   T   U   V   W   X   Y   Z   [   \   ]   ^   _
** 6x `   a   b   c   d   e   f   g   h   i   j   k   l   m   n   o
** 7x p   q   r   s   t   u   v   w   x   y   z   {   |   }   ~   ^_
*/

/*
** Meanings for bytes in a filename:
**
**    0      Ordinary character.  No encoding required
**    1      Needs to be escaped
**    2      Illegal character.  Do not allow in a filename
**    3      First byte of a 2-byte UTF-8
**    4      First byte of a 3-byte UTF-8
**    5      First byte of a 4-byte UTF-8
*/
static const char aSafeChar[256] = {
#ifdef _WIN32
/* Windows
** Prohibit:  all control characters, including tab, \r and \n.
** Escape:    (space) " # $ % & ' ( ) * ; < > ? [ ] ^ ` { | }
*/
/*  x0  x1  x2  x3  x4  x5  x6  x7  x8  x9  xa  xb  xc  xd  xe  xf  */
     2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2, /* 0x */
     2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2, /* 1x */
     1,  0,  1,  1,  1,  1,  1,  1,  1,  1,  1,  0,  0,  0,  0,  0, /* 2x */
     0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  1,  0,  1,  1, /* 3x */
     1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0, /* 4x */
     0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  0,  1,  1,  0, /* 5x */
     1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0, /* 6x */
     0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  1,  1,  0,  1, /* 7x */
#else
/* Unix
** Prohibit:  all control characters, including tab, \r and \n
** Escape:    (space) ! " # $ % & ' ( ) * ; < > ? [ \ ] ^ ` { | }
*/
/*  x0  x1  x2  x3  x4  x5  x6  x7  x8  x9  xa  xb  xc  xd  xe  xf  */
     2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2, /* 0x */
     2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2, /* 1x */
     1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  0,  0,  0,  0,  0, /* 2x */
     0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  1,  0,  1,  1, /* 3x */
     1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0, /* 4x */
     0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  1,  1,  1,  0, /* 5x */
     1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0, /* 6x */
     0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  1,  1,  0,  1, /* 7x */
#endif
    /* all bytes 0x80 through 0xbf are unescaped, being secondary
    ** bytes to UTF8 characters.  Bytes 0xc0 through 0xff are the
    ** first byte of a UTF8 character and do get escaped */
     2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2, /* 8x */
     2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2, /* 9x */
     2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2, /* ax */
     2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2, /* bx */
     3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3, /* cx */
     3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3, /* dx */
     4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4, /* ex */
     5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5  /* fx */
};

/*
** pStr is a shell command under construction.  This routine safely
** appends filename argument zIn.  It returns 0 on success or non-zero
** on any error.
**
** The argument is escaped if it contains white space or other characters
** that need to be escaped for the shell.  If zIn contains characters
** that cannot be safely escaped, then throw a fatal error.
**
** If the isFilename argument is true, then the argument is expected
** to be a filename.  As shell commands commonly have command-line
** options that begin with "-" and since we do not want an attacker
** to be able to invoke these switches using filenames that begin
** with "-", if zIn begins with "-", prepend an additional "./"
** (or ".\\" on Windows).
*/
int append_escaped_arg(sqlite3_str *pStr, const char *zIn, int isFilename){
  int i;
  unsigned char c;
  int needEscape = 0;
  int n = sqlite3_str_length(pStr);
  char *z = sqlite3_str_value(pStr);

  /* Look for illegal byte-sequences and byte-sequences that require
  ** escaping.  No control-characters are allowed.  All spaces and
  ** non-ASCII unicode characters and some punctuation characters require
  ** escaping. */
  for(i=0; (c = (unsigned char)zIn[i])!=0; i++){
    if( aSafeChar[c] ){
      unsigned char x = aSafeChar[c];
      needEscape = 1;
      if( x==2 ){
        /* Bad ASCII character */
        return 1;
      }else if( x>2 ){
        if( (zIn[i+1]&0xc0)!=0x80
         || (x>=4 && (zIn[i+2]&0xc0)!=0x80)
         || (x==5 && (zIn[i+3]&0xc0)!=0x80)
        ){
          /* Bad UTF8 character */
          return 1;
        }
        i += x-2;
      }
    }
  }

  /* Separate from the previous argument by a space */
  if( n>0 && !isspace(z[n-1]) ){
    sqlite3_str_appendchar(pStr, 1, ' ');
  }

  /* Check for characters that need quoting */
  if( !needEscape ){
    if( isFilename && zIn[0]=='-' ){
      sqlite3_str_appendchar(pStr, 1, '.');
#if defined(_WIN32)
      sqlite3_str_appendchar(pStr, 1, '\\');
#else
      sqlite3_str_appendchar(pStr, 1, '/');
#endif
    }
    sqlite3_str_appendall(pStr, zIn);
  }else{
#if defined(_WIN32)
    /* Quoting strategy for windows:
    ** Put the entire name inside of "...".  Any " characters within
    ** the name get doubled.
    */
    sqlite3_str_appendchar(pStr, 1, '"');
    if( isFilename && zIn[0]=='-' ){
      sqlite3_str_appendchar(pStr, 1, '.');
      sqlite3_str_appendchar(pStr, 1, '\\');
    }else if( zIn[0]=='/' ){
      sqlite3_str_appendchar(pStr, 1, '.');
    }
    for(i=0; (c = (unsigned char)zIn[i])!=0; i++){
      sqlite3_str_appendchar(pStr, 1, (char)c);
      if( c=='"' ) sqlite3_str_appendchar(pStr, 1, '"');
      if( c=='\\' ) sqlite3_str_appendchar(pStr, 1, '\\');
      if( c=='%' && isFilename ) sqlite3_str_append(pStr, "%cd:~,%", 7);
    }
    sqlite3_str_appendchar(pStr, 1, '"');
#else
    /* Quoting strategy for unix:
    ** If the name does not contain ', then surround the whole thing
    ** with '...'.   If there is one or more ' characters within the
    ** name, then put \ before each special character.
    */
    if( strchr(zIn,'\'') ){
      if( isFilename && zIn[0]=='-' ){
        sqlite3_str_appendchar(pStr, 1, '.');
        sqlite3_str_appendchar(pStr, 1, '/');
      }
      for(i=0; (c = (unsigned char)zIn[i])!=0; i++){
        if( aSafeChar[c] && aSafeChar[c]!=2 ){
          sqlite3_str_appendchar(pStr, 1, '\\');
        }
        sqlite3_str_appendchar(pStr, 1, (char)c);
      }
    }else{
      sqlite3_str_appendchar(pStr, 1, '\'');
      if( isFilename && zIn[0]=='-' ){
        sqlite3_str_appendchar(pStr, 1, '.');
        sqlite3_str_appendchar(pStr, 1, '/');
      }
      sqlite3_str_appendall(pStr, zIn);
      sqlite3_str_appendchar(pStr, 1, '\'');
    }
#endif
  }
  return 0;
}
/*****************************************************************************
** End of the append_escaped_arg() routine, adapted from the Fossil         **
*****************************************************************************/

/*****************************************************************************
** The Hash Engine
**
** This is basically SHA3, though with a 160-bit hash, and reducing the
** number of rounds in the KeccakF1600 step function from 24 to 6.
*/
/*
** Macros to determine whether the machine is big or little endian,
** and whether or not that determination is run-time or compile-time.
**
** For best performance, an attempt is made to guess at the byte-order
** using C-preprocessor macros.  If that is unsuccessful, or if
** -DHash_BYTEORDER=0 is set, then byte-order is determined
** at run-time.
*/
#ifndef Hash_BYTEORDER
# if defined(i386)     || defined(__i386__)   || defined(_M_IX86) ||    \
     defined(__x86_64) || defined(__x86_64__) || defined(_M_X64)  ||    \
     defined(_M_AMD64) || defined(_M_ARM)     || defined(__x86)   ||    \
     defined(__arm__)
#   define Hash_BYTEORDER    1234
# elif defined(sparc)    || defined(__ppc__)
#   define Hash_BYTEORDER    4321
# else
#   define Hash_BYTEORDER 0
# endif
#endif

typedef sqlite3_uint64 u64;

/*
** State structure for a Hash hash in progress
*/
typedef struct HashContext HashContext;
struct HashContext {
  union {
    u64 s[25];                /* Keccak state. 5x5 lines of 64 bits each */
    unsigned char x[1600];    /* ... or 1600 bytes */
  } u;
  unsigned nRate;        /* Bytes of input accepted per Keccak iteration */
  unsigned nLoaded;      /* Input bytes loaded into u.x[] so far this cycle */
  unsigned ixMask;       /* Insert next input into u.x[nLoaded^ixMask]. */
  unsigned iSize;        /* 224, 256, 358, or 512 */
};

/*
** A single step of the Keccak mixing function for a 1600-bit state
*/
static void KeccakF1600Step(HashContext *p){
  int i;
  u64 b0, b1, b2, b3, b4;
  u64 c0, c1, c2, c3, c4;
  u64 d0, d1, d2, d3, d4;
  static const u64 RC[] = {
    0x0000000000000001ULL,  0x0000000000008082ULL,
    0x800000000000808aULL,  0x8000000080008000ULL,
    0x000000000000808bULL,  0x0000000080000001ULL,
    0x8000000080008081ULL,  0x8000000000008009ULL,
    0x000000000000008aULL,  0x0000000000000088ULL,
    0x0000000080008009ULL,  0x000000008000000aULL,
    0x000000008000808bULL,  0x800000000000008bULL,
    0x8000000000008089ULL,  0x8000000000008003ULL,
    0x8000000000008002ULL,  0x8000000000000080ULL,
    0x000000000000800aULL,  0x800000008000000aULL,
    0x8000000080008081ULL,  0x8000000000008080ULL,
    0x0000000080000001ULL,  0x8000000080008008ULL
  };
# define a00 (p->u.s[0])
# define a01 (p->u.s[1])
# define a02 (p->u.s[2])
# define a03 (p->u.s[3])
# define a04 (p->u.s[4])
# define a10 (p->u.s[5])
# define a11 (p->u.s[6])
# define a12 (p->u.s[7])
# define a13 (p->u.s[8])
# define a14 (p->u.s[9])
# define a20 (p->u.s[10])
# define a21 (p->u.s[11])
# define a22 (p->u.s[12])
# define a23 (p->u.s[13])
# define a24 (p->u.s[14])
# define a30 (p->u.s[15])
# define a31 (p->u.s[16])
# define a32 (p->u.s[17])
# define a33 (p->u.s[18])
# define a34 (p->u.s[19])
# define a40 (p->u.s[20])
# define a41 (p->u.s[21])
# define a42 (p->u.s[22])
# define a43 (p->u.s[23])
# define a44 (p->u.s[24])
# define ROL64(a,x) ((a<<x)|(a>>(64-x)))

  /*         v---- Number of rounds.  SHA3 has 24 here. */
  for(i=0; i<6; i++){
    c0 = a00^a10^a20^a30^a40;
    c1 = a01^a11^a21^a31^a41;
    c2 = a02^a12^a22^a32^a42;
    c3 = a03^a13^a23^a33^a43;
    c4 = a04^a14^a24^a34^a44;
    d0 = c4^ROL64(c1, 1);
    d1 = c0^ROL64(c2, 1);
    d2 = c1^ROL64(c3, 1);
    d3 = c2^ROL64(c4, 1);
    d4 = c3^ROL64(c0, 1);

    b0 = (a00^d0);
    b1 = ROL64((a11^d1), 44);
    b2 = ROL64((a22^d2), 43);
    b3 = ROL64((a33^d3), 21);
    b4 = ROL64((a44^d4), 14);
    a00 =   b0 ^((~b1)&  b2 );
    a00 ^= RC[i];
    a11 =   b1 ^((~b2)&  b3 );
    a22 =   b2 ^((~b3)&  b4 );
    a33 =   b3 ^((~b4)&  b0 );
    a44 =   b4 ^((~b0)&  b1 );

    b2 = ROL64((a20^d0), 3);
    b3 = ROL64((a31^d1), 45);
    b4 = ROL64((a42^d2), 61);
    b0 = ROL64((a03^d3), 28);
    b1 = ROL64((a14^d4), 20);
    a20 =   b0 ^((~b1)&  b2 );
    a31 =   b1 ^((~b2)&  b3 );
    a42 =   b2 ^((~b3)&  b4 );
    a03 =   b3 ^((~b4)&  b0 );
    a14 =   b4 ^((~b0)&  b1 );

    b4 = ROL64((a40^d0), 18);
    b0 = ROL64((a01^d1), 1);
    b1 = ROL64((a12^d2), 6);
    b2 = ROL64((a23^d3), 25);
    b3 = ROL64((a34^d4), 8);
    a40 =   b0 ^((~b1)&  b2 );
    a01 =   b1 ^((~b2)&  b3 );
    a12 =   b2 ^((~b3)&  b4 );
    a23 =   b3 ^((~b4)&  b0 );
    a34 =   b4 ^((~b0)&  b1 );

    b1 = ROL64((a10^d0), 36);
    b2 = ROL64((a21^d1), 10);
    b3 = ROL64((a32^d2), 15);
    b4 = ROL64((a43^d3), 56);
    b0 = ROL64((a04^d4), 27);
    a10 =   b0 ^((~b1)&  b2 );
    a21 =   b1 ^((~b2)&  b3 );
    a32 =   b2 ^((~b3)&  b4 );
    a43 =   b3 ^((~b4)&  b0 );
    a04 =   b4 ^((~b0)&  b1 );

    b3 = ROL64((a30^d0), 41);
    b4 = ROL64((a41^d1), 2);
    b0 = ROL64((a02^d2), 62);
    b1 = ROL64((a13^d3), 55);
    b2 = ROL64((a24^d4), 39);
    a30 =   b0 ^((~b1)&  b2 );
    a41 =   b1 ^((~b2)&  b3 );
    a02 =   b2 ^((~b3)&  b4 );
    a13 =   b3 ^((~b4)&  b0 );
    a24 =   b4 ^((~b0)&  b1 );
  }
}

/*
** Initialize a new hash.  iSize determines the size of the hash
** in bits and should be one of 224, 256, 384, or 512.  Or iSize
** can be zero to use the default hash size of 256 bits.
*/
static void HashInit(HashContext *p, int iSize){
  memset(p, 0, sizeof(*p));
  p->iSize = iSize;
  if( iSize>=128 && iSize<=512 ){
    p->nRate = (1600 - ((iSize + 31)&~31)*2)/8;
  }else{
    p->nRate = (1600 - 2*256)/8;
  }
#if Hash_BYTEORDER==1234
  /* Known to be little-endian at compile-time. No-op */
#elif Hash_BYTEORDER==4321
  p->ixMask = 7;  /* Big-endian */
#else
  {
    static unsigned int one = 1;
    if( 1==*(unsigned char*)&one ){
      /* Little endian.  No byte swapping. */
      p->ixMask = 0;
    }else{
      /* Big endian.  Byte swap. */
      p->ixMask = 7;
    }
  }
#endif
}

/*
** Make consecutive calls to the HashUpdate function to add new content
** to the hash
*/
static void HashUpdate(
  HashContext *p,
  const unsigned char *aData,
  unsigned int nData
){
  unsigned int i = 0;
  if( aData==0 ) return;
#if Hash_BYTEORDER==1234
  if( (p->nLoaded % 8)==0 && ((aData - (const unsigned char*)0)&7)==0 ){
    for(; i+7<nData; i+=8){
      p->u.s[p->nLoaded/8] ^= *(u64*)&aData[i];
      p->nLoaded += 8;
      if( p->nLoaded>=p->nRate ){
        KeccakF1600Step(p);
        p->nLoaded = 0;
      }
    }
  }
#endif
  for(; i<nData; i++){
#if Hash_BYTEORDER==1234
    p->u.x[p->nLoaded] ^= aData[i];
#elif Hash_BYTEORDER==4321
    p->u.x[p->nLoaded^0x07] ^= aData[i];
#else
    p->u.x[p->nLoaded^p->ixMask] ^= aData[i];
#endif
    p->nLoaded++;
    if( p->nLoaded==p->nRate ){
      KeccakF1600Step(p);
      p->nLoaded = 0;
    }
  }
}

/*
** After all content has been added, invoke HashFinal() to compute
** the final hash.  The function returns a pointer to the binary
** hash value.
*/
static unsigned char *HashFinal(HashContext *p){
  unsigned int i;
  if( p->nLoaded==p->nRate-1 ){
    const unsigned char c1 = 0x86;
    HashUpdate(p, &c1, 1);
  }else{
    const unsigned char c2 = 0x06;
    const unsigned char c3 = 0x80;
    HashUpdate(p, &c2, 1);
    p->nLoaded = p->nRate - 1;
    HashUpdate(p, &c3, 1);
  }
  for(i=0; i<p->nRate; i++){
    p->u.x[i+p->nRate] = p->u.x[i^p->ixMask];
  }
  return &p->u.x[p->nRate];
}

/*
** Implementation of the hash(X) function.
**
** Return a 160-bit BLOB which is the hash of X.
*/
static void hashFunc(
  sqlite3_context *context,
  int argc,
  sqlite3_value **argv
){
  HashContext cx;
  int eType = sqlite3_value_type(argv[0]);
  int nByte = sqlite3_value_bytes(argv[0]);
  if( eType==SQLITE_NULL ) return;
  HashInit(&cx, 160);
  if( eType==SQLITE_BLOB ){
    HashUpdate(&cx, sqlite3_value_blob(argv[0]), nByte);
  }else{
    HashUpdate(&cx, sqlite3_value_text(argv[0]), nByte);
  }
  sqlite3_result_blob(context, HashFinal(&cx), 160/8, SQLITE_TRANSIENT);
}

/* Register the hash function */
static int hashRegister(sqlite3 *db){
   return sqlite3_create_function(db, "hash", 1,
                SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC,
                0, hashFunc, 0, 0);
}

/* End of the hashing logic
*****************************************************************************/

/*
** Return the tail of a file pathname.  The tail is the last component
** of the path.  For example, the tail of "/a/b/c.d" is "c.d".
*/
const char *file_tail(const char *z){
  const char *zTail = z;
  if( !zTail ) return 0;
  while( z[0] ){
    if( z[0]=='/' ) zTail = &z[1];
    z++;
  }
  return zTail;
}

/*
** Append error message text to the error file, if an error file is
** specified.  In any case, increment the error count.
*/
static void logError(SQLiteRsync *p, const char *zFormat, ...){
  if( p->zErrFile ){
    FILE *pErr = fopen(p->zErrFile, "a");
    if( pErr ){
      va_list ap;
      va_start(ap, zFormat);
      vfprintf(pErr, zFormat, ap);
      va_end(ap);
      fclose(pErr);
    }
  }
  p->nErr++;
}


/* Read a single big-endian 32-bit unsigned integer from the input
** stream.  Return 0 on success and 1 if there are any errors.
*/
static int readUint32(SQLiteRsync *p, unsigned int *pU){
  unsigned char buf[4];
  if( fread(buf, sizeof(buf), 1, p->pIn)==1 ){
    *pU = (buf[0]<<24) | (buf[1]<<16) | (buf[2]<<8) | buf[3];
    p->nIn += 4;
    return 0;
  }else{
    logError(p, "failed to read a 32-bit integer\n");
    return 1;
  }
}

/* Write a single big-endian 32-bit unsigned integer to the output stream.
** Return 0 on success and 1 if there are any errors.
*/
static int writeUint32(SQLiteRsync *p, unsigned int x){
  unsigned char buf[4];
  buf[3] = x & 0xff;
  x >>= 8;
  buf[2] = x & 0xff;
  x >>= 8;
  buf[1] = x & 0xff;
  x >>= 8;
  buf[0] = x;
  if( p->pLog ) fwrite(buf, sizeof(buf), 1, p->pLog);
  if( fwrite(buf, sizeof(buf), 1, p->pOut)!=1 ){
    logError(p, "failed to write 32-bit integer 0x%x\n", x);
    p->nWrErr++;
    return 1;
  }
  p->nOut += 4;
  return 0;
}

/* Read a single byte from the wire.
*/
int readByte(SQLiteRsync *p){
  int c = fgetc(p->pIn);
  if( c!=EOF ) p->nIn++;
  return c;
}

/* Write a single byte into the wire.
*/
void writeByte(SQLiteRsync *p, int c){
  if( p->pLog ) fputc(c, p->pLog);
  fputc(c, p->pOut);
  p->nOut++;
}

/* Read a power of two encoded as a single byte.
*/
int readPow2(SQLiteRsync *p){
  int x = readByte(p);
  if( x<0 || x>=32 ){
    logError(p, "read invalid page size %d\n", x);
    return 0;
  }
  return 1<<x;
}

/* Write a power-of-two value onto the wire as a single byte.
*/
void writePow2(SQLiteRsync *p, int c){
  int n;
  if( c<0 || (c&(c-1))!=0 ){
    logError(p, "trying to read invalid page size %d\n", c);
  }
  for(n=0; c>1; n++){ c /= 2; }
  writeByte(p, n);
}

/* Read an array of bytes from the wire.
*/
void readBytes(SQLiteRsync *p, int nByte, void *pData){
  if( fread(pData, 1, nByte, p->pIn)==nByte ){
    p->nIn += nByte;
  }else{
    logError(p, "failed to read %d bytes\n", nByte);
  }
}

/* Write an array of bytes onto the wire.
*/
void writeBytes(SQLiteRsync *p, int nByte, const void *pData){
  if( p->pLog ) fwrite(pData, 1, nByte, p->pLog);
  if( fwrite(pData, 1, nByte, p->pOut)==nByte ){
    p->nOut += nByte;
  }else{
    logError(p, "failed to write %d bytes\n", nByte);
    p->nWrErr++;
  }
}

/* Report an error.
**
** If this happens on the remote side, we send back a *_ERROR
** message.  On the local side, the error message goes to stderr.
*/
static void reportError(SQLiteRsync *p, const char *zFormat, ...){
  va_list ap;
  char *zMsg;
  unsigned int nMsg;
  va_start(ap, zFormat);
  zMsg = sqlite3_vmprintf(zFormat, ap);
  va_end(ap);
  nMsg = zMsg ? (unsigned int)strlen(zMsg) : 0;
  if( p->isRemote ){
    if( p->isReplica ){
      putc(REPLICA_ERROR, p->pOut);
    }else{
      putc(ORIGIN_ERROR, p->pOut);
    }
    writeUint32(p, nMsg);
    writeBytes(p, nMsg, zMsg);
    fflush(p->pOut);
  }else{
    fprintf(stderr, "%s\n", zMsg);
  }
  logError(p, "%s\n", zMsg);
  sqlite3_free(zMsg);
}

/* Send an informational message.
**
** If this happens on the remote side, we send back a *_MSG 
** message.  On the local side, the message goes to stdout.
*/
static void infoMsg(SQLiteRsync *p, const char *zFormat, ...){
  va_list ap;
  char *zMsg;
  unsigned int nMsg;
  va_start(ap, zFormat);
  zMsg = sqlite3_vmprintf(zFormat, ap);
  va_end(ap);
  nMsg = zMsg ? (unsigned int)strlen(zMsg) : 0;
  if( p->isRemote ){
    if( p->isReplica ){
      putc(REPLICA_MSG, p->pOut);
    }else{
      putc(ORIGIN_MSG, p->pOut);
    }
    writeUint32(p, nMsg);
    writeBytes(p, nMsg, zMsg);
    fflush(p->pOut);
  }else{
    printf("%s\n", zMsg);
  }
  sqlite3_free(zMsg);
}

/* Receive and report an error message coming from the other side.
*/
static void readAndDisplayMessage(SQLiteRsync *p, int c){
  unsigned int n = 0;
  char *zMsg;
  const char *zPrefix;
  if( c==ORIGIN_ERROR || c==REPLICA_ERROR ){
    zPrefix = "ERROR: ";
  }else{
    zPrefix = "";
  }
  readUint32(p, &n);
  if( n==0 ){
    fprintf(stderr,"ERROR: unknown (possibly out-of-memory)\n");
  }else{
    zMsg = sqlite3_malloc64( n+1 );
    if( zMsg==0 ){
      fprintf(stderr, "ERROR: out-of-memory\n");
      return;
    }
    memset(zMsg, 0, n+1);
    readBytes(p, n, zMsg);
    fprintf(stderr,"%s%s\n", zPrefix, zMsg);
    if( zPrefix[0] ) logError(p, "%s%s\n", zPrefix, zMsg);
    sqlite3_free(zMsg);
  }
}

/* Construct a new prepared statement.  Report an error and return NULL
** if anything goes wrong.
*/
static sqlite3_stmt *prepareStmtVA(
  SQLiteRsync *p,
  char *zFormat,
  va_list ap
){
  sqlite3_stmt *pStmt = 0;
  char *zSql;
  char *zToFree = 0;
  int rc;

  if( strchr(zFormat,'%') ){
    zSql = sqlite3_vmprintf(zFormat, ap);
    if( zSql==0 ){
      reportError(p, "out-of-memory");
      return 0;
    }else{
      zToFree = zSql;
    }
  }else{
    zSql = zFormat;
  }
  rc = sqlite3_prepare_v2(p->db, zSql, -1, &pStmt, 0);
  if( rc || pStmt==0 ){
    reportError(p, "unable to prepare SQL [%s]: %s", zSql,
                sqlite3_errmsg(p->db));
    sqlite3_finalize(pStmt);
    pStmt = 0;
  }
  if( zToFree ) sqlite3_free(zToFree);
  return pStmt;
}
static sqlite3_stmt *prepareStmt(
  SQLiteRsync *p,
  char *zFormat,
  ...
){
  sqlite3_stmt *pStmt;
  va_list ap;
  va_start(ap, zFormat);
  pStmt = prepareStmtVA(p, zFormat, ap);
  va_end(ap);
  return pStmt;
}

/* Run a single SQL statement
*/
static void runSql(SQLiteRsync *p, char *zSql, ...){
  sqlite3_stmt *pStmt;
  va_list ap;

  va_start(ap, zSql);
  pStmt = prepareStmtVA(p, zSql, ap);
  va_end(ap);
  if( pStmt ){
    int rc = sqlite3_step(pStmt);
    if( rc==SQLITE_ROW ) rc = sqlite3_step(pStmt);
    if( rc!=SQLITE_OK && rc!=SQLITE_DONE ){
      reportError(p, "SQL statement [%s] failed: %s", zSql,
                  sqlite3_errmsg(p->db));
    }
    sqlite3_finalize(pStmt);
  }
}

/* Run an SQL statement that returns a single unsigned 32-bit integer result
*/
static int runSqlReturnUInt(
  SQLiteRsync *p,
  unsigned int *pRes,
  char *zSql,
  ...
){
  sqlite3_stmt *pStmt;
  int res = 0;
  va_list ap;

  va_start(ap, zSql);
  pStmt = prepareStmtVA(p, zSql, ap);
  va_end(ap);
  if( pStmt==0 ){
    res = 1;
  }else{
    int rc = sqlite3_step(pStmt);
    if( rc==SQLITE_ROW ){
      *pRes = (unsigned int)(sqlite3_column_int64(pStmt, 0)&0xffffffff);
    }else{
      reportError(p, "SQL statement [%s] failed: %s", zSql,
                  sqlite3_errmsg(p->db));
      res = 1;
    }
    sqlite3_finalize(pStmt);
  }
  return res;
}

/* Run an SQL statement that returns a single TEXT value that is no more
** than 99 bytes in length.
*/
static int runSqlReturnText(
  SQLiteRsync *p,
  char *pRes,
  char *zSql,
  ...
){
  sqlite3_stmt *pStmt;
  int res = 0;
  va_list ap;

  va_start(ap, zSql);
  pStmt = prepareStmtVA(p, zSql, ap);
  va_end(ap);
  pRes[0] = 0;
  if( pStmt==0 ){
    res = 1;
  }else{
    int rc = sqlite3_step(pStmt);
    if( rc==SQLITE_ROW ){
      const unsigned char *a = sqlite3_column_text(pStmt, 0);
      int n;
      if( a==0 ){
        pRes[0] = 0;
      }else{
        n = sqlite3_column_bytes(pStmt, 0);
        if( n>99 ) n = 99;
        memcpy(pRes, a, n);
        pRes[n] = 0;
      }
    }else{
      reportError(p, "SQL statement [%s] failed: %s", zSql,
                  sqlite3_errmsg(p->db));
      res = 1;
    }
    sqlite3_finalize(pStmt);
  }
  return res;
}

/* Close the database connection associated with p
*/
static void closeDb(SQLiteRsync *p){
  if( p->db ){
    sqlite3_close(p->db);
    p->db = 0;
  }
}

/*
** Run the origin-side protocol.
**
** Begin by sending the ORIGIN_BEGIN message with two arguments,
** nPage, and szPage.  Then enter a loop responding to message from
** the replica:
**
**    REPLICA_ERROR  size  text
**
**         Report an error from the replica and quit
**
**    REPLICA_END
**
**         The replica is terminating.  Stop processing now.
**
**    REPLICA_HASH  hash
**
**         The argument is the 20-byte SHA1 hash for the next page
**         page hashes appear in sequential order with no gaps.
**
**    REPLICA_READY
**
**         The replica has sent all the hashes that it intends to send.
**         This side (the origin) can now start responding with page
**         content for pages that do not have a matching hash.
*/
static void originSide(SQLiteRsync *p){
  int rc = 0;
  int c = 0;
  unsigned int nPage = 0;
  unsigned int iPage = 0;
  unsigned int lockBytePage = 0;
  unsigned int szPg = 0;
  sqlite3_stmt *pCkHash = 0;
  char buf[200];

  p->isReplica = 0;
  if( p->bCommCheck ){
    infoMsg(p, "origin  zOrigin=%Q zReplica=%Q isRemote=%d protocol=%d",
               p->zOrigin, p->zReplica, p->isRemote, PROTOCOL_VERSION);
    writeByte(p, ORIGIN_END);
    fflush(p->pOut);
  }else{
    /* Open the ORIGIN database. */
    rc = sqlite3_open_v2(p->zOrigin, &p->db, SQLITE_OPEN_READWRITE, 0);
    if( rc ){
      reportError(p, "cannot open origin \"%s\": %s",
                  p->zOrigin, sqlite3_errmsg(p->db));
      closeDb(p);
      return;
    }
    hashRegister(p->db);
    runSql(p, "BEGIN");
    runSqlReturnText(p, buf, "PRAGMA journal_mode");
    if( sqlite3_stricmp(buf,"wal")!=0 ){
      reportError(p, "Origin database is not in WAL mode");
    }
    runSqlReturnUInt(p, &nPage, "PRAGMA page_count");
    runSqlReturnUInt(p, &szPg, "PRAGMA page_size");
  
    if( p->nErr==0 ){
      /* Send the ORIGIN_BEGIN message */
      writeByte(p, ORIGIN_BEGIN);
      writeByte(p, PROTOCOL_VERSION);
      writePow2(p, szPg);
      writeUint32(p, nPage);
      fflush(p->pOut);
      p->nPage = nPage;
      p->szPage = szPg;
      p->iProtocol = PROTOCOL_VERSION;
      lockBytePage = (1<<30)/szPg + 1;
    }
  }
  
  /* Respond to message from the replica */
  while( p->nErr<=p->nWrErr && (c = readByte(p))!=EOF && c!=REPLICA_END ){
    switch( c ){
      case REPLICA_BEGIN: {
        /* This message is only sent if the replica received an origin-protocol
        ** that is larger than what it knows about.  The replica sends back
        ** a counter-proposal of an earlier protocol which the origin can
        ** accept by resending a new ORIGIN_BEGIN. */
        p->iProtocol = readByte(p);
        writeByte(p, ORIGIN_BEGIN);
        writeByte(p, p->iProtocol);
        writePow2(p, p->szPage);
        writeUint32(p, p->nPage);
        break;
      }
      case REPLICA_MSG:
      case REPLICA_ERROR: {
        readAndDisplayMessage(p, c);
        break;
      }
      case REPLICA_HASH: {
        if( pCkHash==0 ){
          runSql(p, "CREATE TEMP TABLE badHash(pgno INTEGER PRIMARY KEY)");
          pCkHash = prepareStmt(p,
            "INSERT INTO badHash SELECT pgno FROM sqlite_dbpage('main')"
            " WHERE pgno=?1 AND hash(data)!=?2"
          );
          if( pCkHash==0 ) break;
        }
        p->nHashSent++;
        iPage++;
        sqlite3_bind_int64(pCkHash, 1, iPage);
        readBytes(p, 20, buf);
        sqlite3_bind_blob(pCkHash, 2, buf, 20, SQLITE_STATIC);
        rc = sqlite3_step(pCkHash);
        if( rc!=SQLITE_DONE ){
          reportError(p, "SQL statement [%s] failed: %s",
                      sqlite3_sql(pCkHash), sqlite3_errmsg(p->db));
        }
        sqlite3_reset(pCkHash);
        break;
      }
      case REPLICA_READY: {
        sqlite3_stmt *pStmt;
        sqlite3_finalize(pCkHash);
        pCkHash = 0;
        if( iPage+1<p->nPage ){
          runSql(p, "WITH RECURSIVE c(n) AS"
                    " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
                    " INSERT INTO badHash SELECT n FROM c",
                    iPage+1, p->nPage);
        }
        runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage);
        pStmt = prepareStmt(p,
               "SELECT pgno, data"
               "  FROM badHash JOIN sqlite_dbpage('main') USING(pgno)");
        if( pStmt==0 ) break;
        while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
          unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0);
          const void *pContent = sqlite3_column_blob(pStmt, 1);
          writeByte(p, ORIGIN_PAGE);
          writeUint32(p, pgno);
          writeBytes(p, szPg, pContent);
          p->nPageSent++;
        }
        sqlite3_finalize(pStmt);
        writeByte(p, ORIGIN_TXN);
        writeUint32(p, nPage);
        writeByte(p, ORIGIN_END);
        fflush(p->pOut);
        break;
      }
      default: {
        reportError(p, "Unknown message 0x%02x %lld bytes into conversation",
                       c, p->nIn);
        break;
      }
    }
  }

  if( pCkHash ) sqlite3_finalize(pCkHash);
  closeDb(p);
}

/*
** Run the replica-side protocol.  The protocol is passive in the sense
** that it only response to message from the origin side.
**
**    ORIGIN_BEGIN  idProtocol szPage nPage
**
**         The origin is reporting the protocol version number, the size of
**         each page in the origin database (sent as a single-byte power-of-2),
**         and the number of pages in the origin database.
**         This procedure checks compatibility, and if everything is ok,
**         it starts sending hashes of pages already present back to the origin.
**
**    ORIGIN_ERROR  size text
**
**         Report the received error and quit.
**
**    ORIGIN_PAGE  pgno content
**
**         Update the content of the given page.
**
**    ORIGIN_TXN   pgno
**
**         Close the update transaction.  The total database size is pgno
**         pages.
**
**    ORIGIN_END
**
**         Expect no more transmissions from the origin.
*/
static void replicaSide(SQLiteRsync *p){
  int c;
  sqlite3_stmt *pIns = 0;
  unsigned int szOPage = 0;
  char buf[65536];

  p->isReplica = 1;
  if( p->bCommCheck ){
    infoMsg(p, "replica zOrigin=%Q zReplica=%Q isRemote=%d protocol=%d",
               p->zOrigin, p->zReplica, p->isRemote, PROTOCOL_VERSION);
    writeByte(p, REPLICA_END);
    fflush(p->pOut);
  }

  /* Respond to message from the origin.  The origin will initiate the
  ** the conversation with an ORIGIN_BEGIN message.
  */
  while( p->nErr<=p->nWrErr && (c = readByte(p))!=EOF && c!=ORIGIN_END ){
    switch( c ){
      case ORIGIN_MSG:
      case ORIGIN_ERROR: {
        readAndDisplayMessage(p, c);
        break;
      }
      case ORIGIN_BEGIN: {
        unsigned int nOPage = 0;
        unsigned int nRPage = 0, szRPage = 0;
        int rc = 0;
        sqlite3_stmt *pStmt = 0;

        closeDb(p);
        p->iProtocol = readByte(p);
        szOPage = readPow2(p);
        readUint32(p, &nOPage);
        if( p->nErr ) break;
        if( p->iProtocol>PROTOCOL_VERSION ){
          /* If the protocol version on the origin side is larger, send back
          ** a REPLICA_BEGIN message with the protocol version number of the
          ** replica side.  This gives the origin an opportunity to resend
          ** a new ORIGIN_BEGIN with a reduced protocol version. */
          writeByte(p, REPLICA_BEGIN);
          writeByte(p, PROTOCOL_VERSION);
          break;
        }
        p->nPage = nOPage;
        p->szPage = szOPage;
        rc = sqlite3_open(":memory:", &p->db);
        if( rc ){
          reportError(p, "cannot open in-memory database: %s",
                      sqlite3_errmsg(p->db));
          closeDb(p);
          break;
        }
        runSql(p, "ATTACH %Q AS 'replica'", p->zReplica);
        if( p->nErr ){
          closeDb(p);
          break;
        }
        hashRegister(p->db);
        if( runSqlReturnUInt(p, &nRPage, "PRAGMA replica.page_count") ){
          break;
        }
        if( nRPage==0 ){
          runSql(p, "PRAGMA replica.page_size=%u", szOPage);
          runSql(p, "PRAGMA replica.journal_mode=WAL");
          runSql(p, "SELECT * FROM replica.sqlite_schema");
        }
        runSql(p, "BEGIN IMMEDIATE");
        runSqlReturnText(p, buf, "PRAGMA replica.journal_mode");
        if( strcmp(buf, "wal")!=0 ){
          reportError(p, "replica is not in WAL mode");
          break;
        }
        runSqlReturnUInt(p, &nRPage, "PRAGMA replica.page_count");
        runSqlReturnUInt(p, &szRPage, "PRAGMA replica.page_size");
        if( szRPage!=szOPage ){
          reportError(p, "page size mismatch; origin is %d bytes and "
                         "replica is %d bytes", szOPage, szRPage);
          break;
        }

        pStmt = prepareStmt(p,
                   "SELECT hash(data) FROM sqlite_dbpage('replica')"
                   " WHERE pgno<=min(%d,%d)"
                   " ORDER BY pgno", nRPage, nOPage);
        while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
          const unsigned char *a = sqlite3_column_blob(pStmt, 0);
          writeByte(p, REPLICA_HASH);
          writeBytes(p, 20, a);
          p->nHashSent++;
        }
        sqlite3_finalize(pStmt);
        writeByte(p, REPLICA_READY);
        fflush(p->pOut);
        runSql(p, "PRAGMA writable_schema=ON");
        break;
      }
      case ORIGIN_TXN: {
        unsigned int nOPage = 0;
        readUint32(p, &nOPage);
        if( pIns==0 ){
          /* Nothing has changed */
          runSql(p, "COMMIT");
        }else if( p->nErr ){
          runSql(p, "ROLLBACK");
        }else{
          if( nOPage<0xffffffff ){
            int rc;
            sqlite3_bind_int64(pIns, 1, nOPage+1);
            sqlite3_bind_null(pIns, 2);
            rc = sqlite3_step(pIns);
            if( rc!=SQLITE_DONE ){
              reportError(p,
                  "SQL statement [%s] failed (pgno=%u, data=NULL): %s",
                  sqlite3_sql(pIns), nOPage, sqlite3_errmsg(p->db));
            }
            sqlite3_reset(pIns);
          }
          p->nPage = nOPage;
          runSql(p, "COMMIT");
        }
        break;
      }
      case ORIGIN_PAGE: {
        unsigned int pgno = 0;
        int rc;
        readUint32(p, &pgno);
        if( p->nErr ) break;
        if( pIns==0 ){
          pIns = prepareStmt(p,
            "INSERT INTO sqlite_dbpage(pgno,data,schema)VALUES(?1,?2,'replica')"
          );
          if( pIns==0 ) break;
        }
        readBytes(p, szOPage, buf);
        if( p->nErr ) break;
        p->nPageSent++;
        sqlite3_bind_int64(pIns, 1, pgno);
        sqlite3_bind_blob(pIns, 2, buf, szOPage, SQLITE_STATIC);
        rc = sqlite3_step(pIns);
        if( rc!=SQLITE_DONE ){
          reportError(p, "SQL statement [%s] failed (pgno=%u): %s",
                 sqlite3_sql(pIns), pgno, sqlite3_errmsg(p->db));
        }
        sqlite3_reset(pIns);
        break;
      }
      default: {
        reportError(p, "Unknown message 0x%02x %lld bytes into conversation",
                       c, p->nIn);
        break;
      }
    }
  }

  if( pIns ) sqlite3_finalize(pIns);
  closeDb(p);
}

/*
** The argument might be -vvv...vv with any number of "v"s.  Return
** the number of "v"s.  Return 0 if the argument is not a -vvv...v.
*/
static int numVs(const char *z){
  int n = 0;
  if( z[0]!='-' ) return 0;
  z++;
  if( z[0]=='-' ) z++;
  while( z[0]=='v' ){ n++; z++; }
  if( z[0]==0 ) return n;
  return 0;
}

/*
** Get the argument to an --option.  Throw an error and die if no argument
** is available.
*/
static const char *cmdline_option_value(int argc, const char * const*argv,
                                        int i){
  if( i==argc ){
    fprintf(stderr,"%s: Error: missing argument to %s\n",
            argv[0], argv[argc-1]);
    exit(1);
  }
  return argv[i];
}

/*
** Return the current time in milliseconds since the Julian epoch.
*/
sqlite3_int64 currentTime(void){
  sqlite3_int64 now = 0;
  sqlite3_vfs *pVfs = sqlite3_vfs_find(0);
  if( pVfs && pVfs->iVersion>=2 && pVfs->xCurrentTimeInt64!=0 ){
    pVfs->xCurrentTimeInt64(pVfs, &now);
  }
  return now;
}

/*
** Input string zIn might be in any of these formats:
**
**    (1) PATH
**    (2) HOST:PATH
**    (3) USER@HOST:PATH
**
** For format 1, return NULL.  For formats 2 and 3, return
** a pointer to the ':' character that separates the hostname
** from the path.
*/
static char *hostSeparator(const char *zIn){
  char *zPath = strchr(zIn, ':');
  if( zPath==0 ) return 0;
#ifdef _WIN32
  if( isalpha(zIn[0]) && zIn[1]==':' && (zIn[2]=='/' || zIn[2]=='\\') ){
    return 0;
  }
#endif
  while( zIn<zPath ){
    if( zIn[0]=='/' ) return 0;
    if( zIn[0]=='\\' ) return 0;
    zIn++;
  }
  return zPath;

}

/*
** Parse command-line arguments.  Dispatch subroutines to do the
** requested work.
**
** Input formats:
**
**  (1)    sqlite3-rsync  FILENAME1  USER@HOST:FILENAME2
**
**  (2)    sqlite3-rsync  USER@HOST:FILENAME1  FILENAME2
**
**  (3)    sqlite3-rsync --origin FILENAME1
**
**  (4)    sqlite3-rsync --replica FILENAME2
**
** The user types (1) or (2).  SSH launches (3) or (4).
**
** If (1) is seen then popen2 is used launch (4) on the remote and
** originSide() is called locally.
**
** If (2) is seen, then popen2() is used to launch (3) on the remote
** and replicaSide() is run locally.
**
** If (3) is seen, call originSide() on stdin and stdout.
**
q** If (4) is seen, call replicaSide() on stdin and stdout.
*/
int main(int argc, char const * const *argv){
  int isOrigin = 0;
  int isReplica = 0;
  int i;
  SQLiteRsync ctx;
  char *zDiv;
  FILE *pIn = 0;
  FILE *pOut = 0;
  int childPid = 0;
  const char *zSsh = "ssh";
  const char *zExe = "sqlite3-rsync";
  char *zCmd = 0;
  sqlite3_int64 tmStart;
  sqlite3_int64 tmEnd;
  sqlite3_int64 tmElapse;
  const char *zRemoteErrFile = 0;

#define cli_opt_val cmdline_option_value(argc, argv, ++i)
  memset(&ctx, 0, sizeof(ctx));
  for(i=1; i<argc; i++){
    const char *z = argv[i];
    if( strcmp(z,"--origin")==0 ){
      isOrigin = 1;
      continue;
    }
    if( strcmp(z,"--replica")==0 ){
      isReplica = 1;
      continue;
    }
    if( numVs(z) ){
      ctx.eVerbose += numVs(z);
      continue;
    }
    if( strcmp(z, "--ssh")==0 ){
      zSsh = cli_opt_val;
      continue;
    }
    if( strcmp(z, "--exe")==0 ){
      zExe = cli_opt_val;
      continue;
    }
    if( strcmp(z, "--logfile")==0 ){
      /* DEBUG OPTION:  --logfile FILENAME
      ** Cause all local output traffic to be duplicated in FILENAME */
      const char *zLog = cli_opt_val;
      if( ctx.pLog ) fclose(ctx.pLog);
      ctx.pLog = fopen(zLog, "wb");
      if( ctx.pLog==0 ){
        fprintf(stderr, "cannot open \"%s\" for writing\n", argv[i]);
        return 1;
      }
      continue;
    }
    if( strcmp(z, "--errorfile")==0 ){
      /* DEBUG OPTION:  --errorfile FILENAME
      ** Error messages on the local side are written into FILENAME */
      ctx.zErrFile = cli_opt_val;
      continue;
    }
    if( strcmp(z, "--remote-errorfile")==0 ){
      /* DEBUG OPTION:  --remote-errorfile FILENAME
      ** Error messages on the remote side are written into FILENAME on
      ** the remote side. */
      zRemoteErrFile = cli_opt_val;
      continue;
    }
    if( strcmp(z, "-help")==0 || strcmp(z, "--help")==0
     || strcmp(z, "-?")==0
    ){
      printf("%s", zUsage);
      return 0;
    }
    if( strcmp(z, "--version")==0 ){
      printf("%s\n", sqlite3_sourceid());
      return 0;
    }
    if( z[0]=='-' ){
      if( strcmp(z,"--commcheck")==0 ){  /* DEBUG ONLY */
        /* Run a communication check with the remote side.  Do not attempt
        ** to exchange any database connection */
        ctx.bCommCheck = 1;
        continue;
      }
      if( strcmp(z,"--arg-escape-check")==0 ){  /* DEBUG ONLY */
        /* Test the append_escaped_arg() routine by using it to render a
        ** copy of the input command-line, assuming all arguments except
        ** this one are filenames. */
        sqlite3_str *pStr = sqlite3_str_new(0);
        int k;
        for(k=0; k<argc; k++){
          append_escaped_arg(pStr, argv[k], i!=k);
        }
        printf("%s\n", sqlite3_str_value(pStr));
        return 0;
      }
      fprintf(stderr,
         "unknown option: \"%s\". Use --help for more detail.\n", z);
      return 1;
    }
    if( ctx.zOrigin==0 ){
      ctx.zOrigin = z;
    }else if( ctx.zReplica==0 ){
      ctx.zReplica = z;
    }else{
      fprintf(stderr, "Unknown argument: \"%s\"\n", z);
      return 1;
    }
  }
  if( ctx.zOrigin==0 ){
    fprintf(stderr, "missing ORIGIN database filename\n");
    return 1;
  }
  if( ctx.zReplica==0 ){
    fprintf(stderr, "missing REPLICA database filename\n");
    return 1;
  }
  if( isOrigin && isReplica ){
    fprintf(stderr, "bad option combination\n");
    return 1;
  }
  if( isOrigin ){
    ctx.pIn = stdin;
    ctx.pOut = stdout;
    ctx.isRemote = 1;
    originSide(&ctx);
    return 0;
  }
  if( isReplica ){
    ctx.pIn = stdin;
    ctx.pOut = stdout;
    ctx.isRemote = 1;
    replicaSide(&ctx);
    return 0;
  }
  if( ctx.zReplica==0 ){
    fprintf(stderr, "missing REPLICA database filename\n");
    return 1;
  }
  tmStart = currentTime();
  zDiv = hostSeparator(ctx.zOrigin);
  if( zDiv ){
    if( hostSeparator(ctx.zReplica)!=0 ){
      fprintf(stderr,
         "At least one of ORIGIN and REPLICA must be a local database\n"
         "You provided two remote databases.\n");
      return 1;
    }
    /* Remote ORIGIN and local REPLICA */
    sqlite3_str *pStr = sqlite3_str_new(0);
    append_escaped_arg(pStr, zSsh, 1);
    sqlite3_str_appendf(pStr, " -e none");
    *(zDiv++) = 0;
    append_escaped_arg(pStr, ctx.zOrigin, 0);
    append_escaped_arg(pStr, zExe, 1);
    append_escaped_arg(pStr, "--origin", 0);
    if( ctx.bCommCheck ){
      append_escaped_arg(pStr, "--commcheck", 0);
      if( ctx.eVerbose==0 ) ctx.eVerbose = 1;
    }
    if( zRemoteErrFile ){
      append_escaped_arg(pStr, "--errorfile", 0);
      append_escaped_arg(pStr, zRemoteErrFile, 1);
    }
    append_escaped_arg(pStr, zDiv, 1);
    append_escaped_arg(pStr, file_tail(ctx.zReplica), 1);
    zCmd = sqlite3_str_finish(pStr);
    if( ctx.eVerbose>=2 ) printf("%s\n", zCmd);
    if( popen2(zCmd, &ctx.pIn, &ctx.pOut, &childPid, 0) ){
      fprintf(stderr, "Could not start auxiliary process: %s\n", zCmd);
      return 1;
    }
    replicaSide(&ctx);
  }else if( (zDiv = hostSeparator(ctx.zReplica))!=0 ){
    /* Local ORIGIN and remote REPLICA */
    sqlite3_str *pStr = sqlite3_str_new(0);
    append_escaped_arg(pStr, zSsh, 1);
    sqlite3_str_appendf(pStr, " -e none");
    *(zDiv++) = 0;
    append_escaped_arg(pStr, ctx.zReplica, 0);
    append_escaped_arg(pStr, zExe, 1);
    append_escaped_arg(pStr, "--replica", 0);
    if( ctx.bCommCheck ){
      append_escaped_arg(pStr, "--commcheck", 0);
      if( ctx.eVerbose==0 ) ctx.eVerbose = 1;
    }
    if( zRemoteErrFile ){
      append_escaped_arg(pStr, "--errorfile", 0);
      append_escaped_arg(pStr, zRemoteErrFile, 1);
    }
    append_escaped_arg(pStr, file_tail(ctx.zOrigin), 1);
    append_escaped_arg(pStr, zDiv, 1);
    zCmd = sqlite3_str_finish(pStr);
    if( ctx.eVerbose>=2 ) printf("%s\n", zCmd);
    if( popen2(zCmd, &ctx.pIn, &ctx.pOut, &childPid, 0) ){
      fprintf(stderr, "Could not start auxiliary process: %s\n", zCmd);
      return 1;
    }
    originSide(&ctx);
  }else{
    /* Local ORIGIN and REPLICA */
    sqlite3_str *pStr = sqlite3_str_new(0);
    append_escaped_arg(pStr, argv[0], 1);
    append_escaped_arg(pStr, "--replica", 0);
    if( ctx.bCommCheck ){
      append_escaped_arg(pStr, "--commcheck", 0);
    }
    if( zRemoteErrFile ){
      append_escaped_arg(pStr, "--errorfile", 0);
      append_escaped_arg(pStr, zRemoteErrFile, 1);
    }
    append_escaped_arg(pStr, ctx.zOrigin, 1);
    append_escaped_arg(pStr, ctx.zReplica, 1);
    zCmd = sqlite3_str_finish(pStr);
    if( ctx.eVerbose>=2 ) printf("%s\n", zCmd);
    if( popen2(zCmd, &ctx.pIn, &ctx.pOut, &childPid, 0) ){
      fprintf(stderr, "Could not start auxiliary process: %s\n", zCmd);
      return 1;
    }
    originSide(&ctx);
  }
  pclose2(ctx.pIn, ctx.pOut, childPid);
  if( ctx.pLog ) fclose(ctx.pLog);
  tmEnd = currentTime();
  tmElapse = tmEnd - tmStart;  /* Elapse time in milliseconds */
  if( ctx.nErr ){
    printf("Databases were not synced due to errors\n");
  }
  if( ctx.eVerbose>=1 ){
    char *zMsg;
    sqlite3_int64 szTotal = (sqlite3_int64)ctx.nPage*(sqlite3_int64)ctx.szPage;
    sqlite3_int64 nIO = ctx.nOut +ctx.nIn;
    zMsg = sqlite3_mprintf("sent %,lld bytes, received %,lld bytes",
                           ctx.nOut, ctx.nIn);
    printf("%s", zMsg);
    sqlite3_free(zMsg);
    if( tmElapse>0 ){
      zMsg = sqlite3_mprintf(", %,.2f bytes/sec",
                             1000.0*(double)nIO/(double)tmElapse);
      printf("%s\n", zMsg);
      sqlite3_free(zMsg);
    }else{
      printf("\n");
    }
    if( ctx.nErr==0 ){
      if( nIO<=szTotal && nIO>0 ){
        zMsg = sqlite3_mprintf("total size %,lld  speedup is %.2f",
           szTotal, (double)szTotal/(double)nIO);
      }else{
        zMsg = sqlite3_mprintf("total size %,lld", szTotal);
      }
      printf("%s\n", zMsg);
      sqlite3_free(zMsg);
    }
  }
  sqlite3_free(zCmd);
  if( pIn!=0 && pOut!=0 ){
    pclose2(pIn, pOut, childPid);
  }
  return ctx.nErr;
}