SQLite

Check-in [b979d02f]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Improved --commcheck. Add the infoMsg() function which is useful for debugging.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | sqlite3-rsync
Files: files | file ages | folders
SHA3-256: b979d02ffd1370d8840328bce06c76c224f0fc1fb54b47d6c904547580a820a1
User & Date: drh 2024-09-12 16:54:34
Context
2024-09-12
17:06
Replica must be in writable_schema mode. (check-in: e3855257 user: drh tags: sqlite3-rsync)
16:54
Improved --commcheck. Add the infoMsg() function which is useful for debugging. (check-in: b979d02f user: drh tags: sqlite3-rsync)
15:51
Pass the names of both the origin and the replica databases to the remote side, so that if the remote is the replica, it will have access to the origin database name in case the replica name is really a directory. (check-in: 435c3017 user: drh tags: sqlite3-rsync)
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to tool/sqlite3-rsync.c.

46
47
48
49
50
51
52

53
54
55
56
57
58
59
  FILE *pOut;              /* Transmit to the other side */
  FILE *pIn;               /* Receive from the other side */
  sqlite3 *db;             /* Database connection */
  int nErr;                /* Number of errors encountered */
  u8 eVerbose;             /* Bigger for more output.  0 means none. */
  u8 bCommCheck;           /* True to debug the communication protocol */
  u8 isRemote;             /* On the remote side of a connection */

  u8 iProtocol;            /* Protocol version number */
  sqlite3_uint64 nOut;     /* Bytes transmitted */
  sqlite3_uint64 nIn;      /* Bytes received */
  unsigned int nPage;      /* Total number of pages in the database */
  unsigned int szPage;     /* Database page size */
  unsigned int nHashSent;  /* Hashes sent (replica to origin) */
  unsigned int nPageSent;  /* Page contents sent (origin to replica) */







>







46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
  FILE *pOut;              /* Transmit to the other side */
  FILE *pIn;               /* Receive from the other side */
  sqlite3 *db;             /* Database connection */
  int nErr;                /* Number of errors encountered */
  u8 eVerbose;             /* Bigger for more output.  0 means none. */
  u8 bCommCheck;           /* True to debug the communication protocol */
  u8 isRemote;             /* On the remote side of a connection */
  u8 isReplica;            /* True if running on the replica side */
  u8 iProtocol;            /* Protocol version number */
  sqlite3_uint64 nOut;     /* Bytes transmitted */
  sqlite3_uint64 nIn;      /* Bytes received */
  unsigned int nPage;      /* Total number of pages in the database */
  unsigned int szPage;     /* Database page size */
  unsigned int nHashSent;  /* Hashes sent (replica to origin) */
  unsigned int nPageSent;  /* Page contents sent (origin to replica) */
68
69
70
71
72
73
74

75
76
77
78
79
80

81
82
83
84
85
86
87
/* Magic numbers to identify particular messages sent over the wire.
*/
#define ORIGIN_BEGIN    0x41     /* Initial message */
#define ORIGIN_END      0x42     /* Time to quit */
#define ORIGIN_ERROR    0x43     /* Error message from the remote */
#define ORIGIN_PAGE     0x44     /* New page data */
#define ORIGIN_TXN      0x45     /* Transaction commit */


#define REPLICA_BEGIN   0x61     /* Welcome message */
#define REPLICA_ERROR   0x62     /* Error.  Report and quit. */
#define REPLICA_END     0x63     /* Replica wants to stop */
#define REPLICA_HASH    0x64     /* One or more pages hashes to report */
#define REPLICA_READY   0x65     /* Read to receive page content */



/****************************************************************************
** Beginning of the popen2() implementation copied from Fossil  *************
****************************************************************************/
#ifdef _WIN32
#include <windows.h>







>






>







69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/* Magic numbers to identify particular messages sent over the wire.
*/
#define ORIGIN_BEGIN    0x41     /* Initial message */
#define ORIGIN_END      0x42     /* Time to quit */
#define ORIGIN_ERROR    0x43     /* Error message from the remote */
#define ORIGIN_PAGE     0x44     /* New page data */
#define ORIGIN_TXN      0x45     /* Transaction commit */
#define ORIGIN_MSG      0x46     /* Informational message */

#define REPLICA_BEGIN   0x61     /* Welcome message */
#define REPLICA_ERROR   0x62     /* Error.  Report and quit. */
#define REPLICA_END     0x63     /* Replica wants to stop */
#define REPLICA_HASH    0x64     /* One or more pages hashes to report */
#define REPLICA_READY   0x65     /* Read to receive page content */
#define REPLICA_MSG     0x66     /* Informational message */


/****************************************************************************
** Beginning of the popen2() implementation copied from Fossil  *************
****************************************************************************/
#ifdef _WIN32
#include <windows.h>
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
  }
  return 0;
}
/*****************************************************************************
** End of the append_escaped_arg() routine, adapted from the Fossil         **
*****************************************************************************/


/* For Debugging, specifically for --commcheck:
**
** Read a single line of text from p->pIn.  Write this to standard
** output if and only if p->eVerbose>0.
*/
static void echoOneLine(SQLiteRsync *p){
  char zLine[1000];
  if( fgets(zLine, sizeof(zLine), p->pIn) ){
    if( p->eVerbose ) printf("GOT: %s", zLine);
  }
}

/*
** Return the tail of a file pathname.  The tail is the last component
** of the path.  For example, the tail of "/a/b/c.d" is "c.d".
*/
const char *file_tail(const char *z){
  const char *zTail = z;
  if( !zTail ) return 0;







<
<
<
<
<
<
<
<
<
<
<
<
<







502
503
504
505
506
507
508













509
510
511
512
513
514
515
  }
  return 0;
}
/*****************************************************************************
** End of the append_escaped_arg() routine, adapted from the Fossil         **
*****************************************************************************/














/*
** Return the tail of a file pathname.  The tail is the last component
** of the path.  For example, the tail of "/a/b/c.d" is "c.d".
*/
const char *file_tail(const char *z){
  const char *zTail = z;
  if( !zTail ) return 0;
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654




























655
656
657
658
659







660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
  }else{
    p->nErr++;
  }
}

/* Report an error.
**
** If this happens on the remote side, we send back a REMOTE_ERROR
** message.  On the local side, the error message goes to stderr.
*/
static void reportError(SQLiteRsync *p, const char *zFormat, ...){
  va_list ap;
  char *zMsg;
  unsigned int nMsg;
  va_start(ap, zFormat);
  zMsg = sqlite3_vmprintf(zFormat, ap);
  va_end(ap);
  nMsg = zMsg ? (unsigned int)strlen(zMsg) : 0;
  if( p->isRemote ){
    if( p->zReplica ){
      putc(REPLICA_ERROR, p->pOut);
    }else{
      putc(ORIGIN_ERROR, p->pOut);
    }
    writeUint32(p, nMsg);
    writeBytes(p, nMsg, zMsg);
    fflush(p->pOut);
  }else{
    fprintf(stderr, "%s\n", zMsg);
  }
  sqlite3_free(zMsg);
  p->nErr++;
}





























/* Receive and report an error message coming from the other side.
*/
static void readAndDisplayError(SQLiteRsync *p){
  unsigned int n = 0;
  char *zMsg;







  (void)readUint32(p, &n);
  if( n==0 ){
    fprintf(stderr,"ERROR: unknown (possibly out-of-memory)\n");
  }else{
    zMsg = sqlite3_malloc64( n+1 );
    if( zMsg==0 ){
      fprintf(stderr, "ERROR: out-of-memory\n");
      return;
    }
    memset(zMsg, 0, n+1);
    readBytes(p, n, zMsg);
    fprintf(stderr,"ERROR: %s\n", zMsg);
    sqlite3_free(zMsg);
  }
  p->nErr++;
}

/* Construct a new prepared statement.  Report an error and return NULL
** if anything goes wrong.
*/
static sqlite3_stmt *prepareStmtVA(
  SQLiteRsync *p,







|











|














>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


|


>
>
>
>
>
>
>
|










|


<







611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698

699
700
701
702
703
704
705
  }else{
    p->nErr++;
  }
}

/* Report an error.
**
** If this happens on the remote side, we send back a *_ERROR
** message.  On the local side, the error message goes to stderr.
*/
static void reportError(SQLiteRsync *p, const char *zFormat, ...){
  va_list ap;
  char *zMsg;
  unsigned int nMsg;
  va_start(ap, zFormat);
  zMsg = sqlite3_vmprintf(zFormat, ap);
  va_end(ap);
  nMsg = zMsg ? (unsigned int)strlen(zMsg) : 0;
  if( p->isRemote ){
    if( p->isReplica ){
      putc(REPLICA_ERROR, p->pOut);
    }else{
      putc(ORIGIN_ERROR, p->pOut);
    }
    writeUint32(p, nMsg);
    writeBytes(p, nMsg, zMsg);
    fflush(p->pOut);
  }else{
    fprintf(stderr, "%s\n", zMsg);
  }
  sqlite3_free(zMsg);
  p->nErr++;
}

/* Send an informational message.
**
** If this happens on the remote side, we send back a *_MSG 
** message.  On the local side, the message goes to stdout.
*/
static void infoMsg(SQLiteRsync *p, const char *zFormat, ...){
  va_list ap;
  char *zMsg;
  unsigned int nMsg;
  va_start(ap, zFormat);
  zMsg = sqlite3_vmprintf(zFormat, ap);
  va_end(ap);
  nMsg = zMsg ? (unsigned int)strlen(zMsg) : 0;
  if( p->isRemote ){
    if( p->isReplica ){
      putc(REPLICA_MSG, p->pOut);
    }else{
      putc(ORIGIN_MSG, p->pOut);
    }
    writeUint32(p, nMsg);
    writeBytes(p, nMsg, zMsg);
    fflush(p->pOut);
  }else{
    printf("%s\n", zMsg);
  }
  sqlite3_free(zMsg);
}

/* Receive and report an error message coming from the other side.
*/
static void readAndDisplayMessage(SQLiteRsync *p, int c){
  unsigned int n = 0;
  char *zMsg;
  const char *zPrefix;
  if( c==ORIGIN_ERROR || c==REPLICA_ERROR ){
    zPrefix = "ERROR: ";
    p->nErr++;
  }else{
    zPrefix = "";
  }
  readUint32(p, &n);
  if( n==0 ){
    fprintf(stderr,"ERROR: unknown (possibly out-of-memory)\n");
  }else{
    zMsg = sqlite3_malloc64( n+1 );
    if( zMsg==0 ){
      fprintf(stderr, "ERROR: out-of-memory\n");
      return;
    }
    memset(zMsg, 0, n+1);
    readBytes(p, n, zMsg);
    fprintf(stderr,"%s%s\n", zPrefix, zMsg);
    sqlite3_free(zMsg);
  }

}

/* Construct a new prepared statement.  Report an error and return NULL
** if anything goes wrong.
*/
static sqlite3_stmt *prepareStmtVA(
  SQLiteRsync *p,
854
855
856
857
858
859
860

861

862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899

900
901
902
903
904
905
906
907
908
909
910
911
912
913
914

915
916
917
918
919
920
921
922
923
  int c = 0;
  unsigned int nPage = 0;
  unsigned int iPage = 0;
  unsigned int szPg = 0;
  sqlite3_stmt *pCkHash = 0;
  char buf[200];


  if( p->bCommCheck ){

    fprintf(p->pOut, "sqlite3-rsync origin-begin %s\n", p->zOrigin);
    fflush(p->pOut);
    echoOneLine(p);
    fprintf(p->pOut, "origin-end\n");
    fflush(p->pOut);
    echoOneLine(p);
    return;
  }

  /* Open the ORIGIN database. */
  rc = sqlite3_open_v2(p->zOrigin, &p->db, SQLITE_OPEN_READWRITE, 0);
  if( rc ){
    reportError(p, "unable to open origin database file \"%s\": %s",
                sqlite3_errmsg(p->db));
    closeDb(p);
    return;
  }
  sqlite3_sha_init(p->db, 0, 0);
  runSql(p, "BEGIN");
  runSqlReturnText(p, buf, "PRAGMA journal_mode");
  if( sqlite3_stricmp(buf,"wal")!=0 ){
    reportError(p, "Origin database is not in WAL mode");
  }
  runSqlReturnUInt(p, &nPage, "PRAGMA page_count");
  runSqlReturnUInt(p, &szPg, "PRAGMA page_size");

  if( p->nErr==0 ){
    /* Send the ORIGIN_BEGIN message */
    writeByte(p, ORIGIN_BEGIN);
    writeByte(p, PROTOCOL_VERSION);
    writePow2(p, szPg);
    writeUint32(p, nPage);
    fflush(p->pOut);
    p->nPage = nPage;
    p->szPage = szPg;
    p->iProtocol = PROTOCOL_VERSION;
  }


  /* Respond to message from the replica */
  while( p->nErr==0 && (c = readByte(p))!=EOF && c!=REPLICA_END ){
    switch( c ){
      case REPLICA_BEGIN: {
        /* This message is only sent if the replica received an origin-protocol
        ** that is larger than what it knows about.  The replica sends back
        ** a counter-proposal of an earlier protocol which the origin can
        ** accept by resending a new ORIGIN_BEGIN. */
        p->iProtocol = readByte(p);
        writeByte(p, ORIGIN_BEGIN);
        writeByte(p, p->iProtocol);
        writePow2(p, p->szPage);
        writeUint32(p, p->nPage);
        break;
      }

      case REPLICA_ERROR: {
        readAndDisplayError(p);
        break;
      }
      case REPLICA_HASH: {
        if( pCkHash==0 ){
          runSql(p, "CREATE TEMP TABLE badHash(pgno INTEGER PRIMARY KEY)");
          pCkHash = prepareStmt(p,
            "INSERT INTO badHash SELECT pgno FROM sqlite_dbpage('main')"







>

>
|
<
|
<

<
<
|
<
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>















>

|







878
879
880
881
882
883
884
885
886
887
888

889

890


891

892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
  int c = 0;
  unsigned int nPage = 0;
  unsigned int iPage = 0;
  unsigned int szPg = 0;
  sqlite3_stmt *pCkHash = 0;
  char buf[200];

  p->isReplica = 0;
  if( p->bCommCheck ){
    infoMsg(p, "origin  zOrigin=%Q zReplica=%Q isRemote=%d protocol=%d",
               p->zOrigin, p->zReplica, p->isRemote, PROTOCOL_VERSION);

    writeByte(p, ORIGIN_END);

    fflush(p->pOut);


  }else{

    /* Open the ORIGIN database. */
    rc = sqlite3_open_v2(p->zOrigin, &p->db, SQLITE_OPEN_READWRITE, 0);
    if( rc ){
      reportError(p, "unable to open origin database file \"%s\": %s",
                  sqlite3_errmsg(p->db));
      closeDb(p);
      return;
    }
    sqlite3_sha_init(p->db, 0, 0);
    runSql(p, "BEGIN");
    runSqlReturnText(p, buf, "PRAGMA journal_mode");
    if( sqlite3_stricmp(buf,"wal")!=0 ){
      reportError(p, "Origin database is not in WAL mode");
    }
    runSqlReturnUInt(p, &nPage, "PRAGMA page_count");
    runSqlReturnUInt(p, &szPg, "PRAGMA page_size");
  
    if( p->nErr==0 ){
      /* Send the ORIGIN_BEGIN message */
      writeByte(p, ORIGIN_BEGIN);
      writeByte(p, PROTOCOL_VERSION);
      writePow2(p, szPg);
      writeUint32(p, nPage);
      fflush(p->pOut);
      p->nPage = nPage;
      p->szPage = szPg;
      p->iProtocol = PROTOCOL_VERSION;
    }
  }
  
  /* Respond to message from the replica */
  while( p->nErr==0 && (c = readByte(p))!=EOF && c!=REPLICA_END ){
    switch( c ){
      case REPLICA_BEGIN: {
        /* This message is only sent if the replica received an origin-protocol
        ** that is larger than what it knows about.  The replica sends back
        ** a counter-proposal of an earlier protocol which the origin can
        ** accept by resending a new ORIGIN_BEGIN. */
        p->iProtocol = readByte(p);
        writeByte(p, ORIGIN_BEGIN);
        writeByte(p, p->iProtocol);
        writePow2(p, p->szPage);
        writeUint32(p, p->nPage);
        break;
      }
      case REPLICA_MSG:
      case REPLICA_ERROR: {
        readAndDisplayMessage(p, c);
        break;
      }
      case REPLICA_HASH: {
        if( pCkHash==0 ){
          runSql(p, "CREATE TEMP TABLE badHash(pgno INTEGER PRIMARY KEY)");
          pCkHash = prepareStmt(p,
            "INSERT INTO badHash SELECT pgno FROM sqlite_dbpage('main')"
1006
1007
1008
1009
1010
1011
1012


1013
1014

1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027

1028
1029
1030
1031
1032
1033
1034
1035
1036
**         Expect no more transmissions from the origin.
*/
static void replicaSide(SQLiteRsync *p){
  int c;
  sqlite3_stmt *pIns = 0;
  unsigned int szOPage = 0;
  char buf[65536];


  if( p->bCommCheck ){
    echoOneLine(p);

    fprintf(p->pOut, "replica-begin %s\n", p->zReplica);
    fflush(p->pOut);
    echoOneLine(p);
    fprintf(p->pOut, "replica-end\n");
    fflush(p->pOut);
    return;
  }

  /* Respond to message from the origin.  The origin will initiate the
  ** the conversation with an ORIGIN_BEGIN message.
  */
  while( p->nErr==0 && (c = readByte(p))!=EOF && c!=ORIGIN_END ){
    switch( c ){

      case ORIGIN_ERROR: {
        readAndDisplayError(p);
        break;
      }
      case ORIGIN_BEGIN: {
        unsigned int nOPage = 0;
        unsigned int nRPage = 0, szRPage = 0;
        int rc = 0;
        sqlite3_stmt *pStmt = 0;







>
>

<
>
|
<
|
<

<







>

|







1029
1030
1031
1032
1033
1034
1035
1036
1037
1038

1039
1040

1041

1042

1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
**         Expect no more transmissions from the origin.
*/
static void replicaSide(SQLiteRsync *p){
  int c;
  sqlite3_stmt *pIns = 0;
  unsigned int szOPage = 0;
  char buf[65536];

  p->isReplica = 1;
  if( p->bCommCheck ){

    infoMsg(p, "replica zOrigin=%Q zReplica=%Q isRemote=%d protocol=%d",
               p->zOrigin, p->zReplica, p->isRemote, PROTOCOL_VERSION);

    writeByte(p, REPLICA_END);

    fflush(p->pOut);

  }

  /* Respond to message from the origin.  The origin will initiate the
  ** the conversation with an ORIGIN_BEGIN message.
  */
  while( p->nErr==0 && (c = readByte(p))!=EOF && c!=ORIGIN_END ){
    switch( c ){
      case ORIGIN_MSG:
      case ORIGIN_ERROR: {
        readAndDisplayMessage(p, c);
        break;
      }
      case ORIGIN_BEGIN: {
        unsigned int nOPage = 0;
        unsigned int nRPage = 0, szRPage = 0;
        int rc = 0;
        sqlite3_stmt *pStmt = 0;