/ Check-in [d8bc3fdb]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix rebasing of UPDATE changes against a set of remote changesets that feature both OMIT and REPLACE conflict resolution on different fields of the same row.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | sessions-rebase
Files: files | file ages | folders
SHA3-256:d8bc3fdb6ba165ca8d7cab857ede8e7e6e2fac24ad59580c5e1db1a4942d295c
User & Date: dan 2018-03-21 17:29:53
Context
2018-03-21
19:46
Fix some documentation and other issues with the code on this branch. check-in: a9ec6862 user: dan tags: sessions-rebase
17:29
Fix rebasing of UPDATE changes against a set of remote changesets that feature both OMIT and REPLACE conflict resolution on different fields of the same row. check-in: d8bc3fdb user: dan tags: sessions-rebase
2018-03-20
20:27
Add further tests and documentation for the sessions rebase feature. check-in: 7475a363 user: dan tags: sessions-rebase
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to ext/session/sessionrebase.test.

116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
        lappend rebase [sqlite3changeset_apply_v2 db $c xConflict]
      }
    }
    #if {$tn=="2.1.4"} { puts [changeset_to_list $rebase] ; breakpoint }
    #puts [changeset_to_list [lindex $rebase 0]] ; breakpoint
    #puts [llength $rebase]
  
if {$i==2 && $tn=="3.3.1"} breakpoint
    sqlite3rebaser_create R
    foreach r $rebase {
puts [changeset_to_list $r]
      R configure $r
    }
    set c1r [R rebase $c1]
    R delete
    #if {$tn=="2.1.4"} { puts [changeset_to_list $c1r] }
  
    sqlite3changeset_apply_v2 db2 $c1r xConflictAbort







<


<







116
117
118
119
120
121
122

123
124

125
126
127
128
129
130
131
        lappend rebase [sqlite3changeset_apply_v2 db $c xConflict]
      }
    }
    #if {$tn=="2.1.4"} { puts [changeset_to_list $rebase] ; breakpoint }
    #puts [changeset_to_list [lindex $rebase 0]] ; breakpoint
    #puts [llength $rebase]
  

    sqlite3rebaser_create R
    foreach r $rebase {

      R configure $r
    }
    set c1r [R rebase $c1]
    R delete
    #if {$tn=="2.1.4"} { puts [changeset_to_list $c1r] }
  
    sqlite3changeset_apply_v2 db2 $c1r xConflictAbort

Changes to ext/session/sqlite3session.c.

512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
....
4523
4524
4525
4526
4527
4528
4529

4530
4531
4532
4533
4534
4535
4536
4537
4538
4539
4540


4541


















4542
4543
4544
4545
4546
4547
4548





































4549
4550
4551
4552
4553
4554
4555
....
4635
4636
4637
4638
4639
4640
4641
4642
4643
4644
4645
4646
4647
4648
4649
....
4995
4996
4997
4998
4999
5000
5001
5002
5003
5004
5005
5006
5007
5008
5009
....
5013
5014
5015
5016
5017
5018
5019
5020
5021
5022
5023
5024
5025
5026
5027
5028
5029
5030
5031
....
5036
5037
5038
5039
5040
5041
5042




5043
5044
5045
5046
5047
5048
5049
5050
5051
5052
5053
5054
5055
5056
5057
5058
5059
5060
5061
5062
5063
5064
5065
5066
5067
5068
5069
5070
5071
5072
5073
5074
....
5135
5136
5137
5138
5139
5140
5141
5142
5143
5144
5145
5146
5147
5148
5149
5150
5151
5152
5153
5154
5155
5156
5157
5158
5159
5160
5161
5162
** The buffer that the argument points to contains a serialized SQL value.
** Return the number of bytes of space occupied by the value (including
** the type byte).
*/
static int sessionSerialLen(u8 *a){
  int e = *a;
  int n;
  if( e==0 ) return 1;
  if( e==SQLITE_NULL ) return 1;
  if( e==SQLITE_INTEGER || e==SQLITE_FLOAT ) return 9;
  return sessionVarintGet(&a[1], &n) + 1 + n;
}

/*
** Based on the primary key values stored in change aRecord, calculate a
................................................................................
  int op2,                        /* Second change operation */
  int bIndirect,                  /* True if second change is indirect */
  u8 *aRec,                       /* Second change record */
  int nRec,                       /* Number of bytes in aRec */
  SessionChange **ppNew           /* OUT: Merged change */
){
  SessionChange *pNew = 0;


  if( !pExist ){
    pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange) + nRec);
    if( !pNew ){
      return SQLITE_NOMEM;
    }
    memset(pNew, 0, sizeof(SessionChange));
    pNew->op = op2;
    pNew->bIndirect = bIndirect;
    pNew->nRecord = nRec;
    pNew->aRecord = (u8*)&pNew[1];


    memcpy(pNew->aRecord, aRec, nRec);


















  }else if( bRebase){
    /* 
    **   op1=INSERT/R, op2=INSERT/R  ->
    **   op1=INSERT/R, op2=INSERT/O  ->
    **   op1=INSERT/O, op2=INSERT/R  ->
    **   op1=INSERT/O, op2=INSERT/O  ->
    */   





































  }else{
    int op1 = pExist->op;

    /* 
    **   op1=INSERT, op2=INSERT      ->      Unsupported. Discard op2.
    **   op1=INSERT, op2=UPDATE      ->      INSERT.
    **   op1=INSERT, op2=DELETE      ->      (none)
................................................................................
        pNew->nRecord = (int)(aCsr - pNew->aRecord);
      }
      sqlite3_free(pExist);
    }
  }

  *ppNew = pNew;
  return SQLITE_OK;
}

/*
** Add all changes in the changeset traversed by the iterator passed as
** the first argument to the changegroup hash tables.
*/
static int sessionChangesetToHash(
................................................................................
          memcpy(pOut, a2, nn2);
          pOut += nn2;
        }else{
          memcpy(pOut, a1, nn1);
          pOut += nn1;
        }
      }else{
        if( *a1==0 ){
          memcpy(pOut, a2, nn2);
          pOut += nn2;
        }else{
          memcpy(pOut, a1, nn1);
          pOut += nn1;
        }
      }
................................................................................

    pBuf->nBuf = pOut-pBuf->aBuf;
    assert( pBuf->nBuf<=pBuf->nAlloc );
  }
}

static void sessionAppendPartialUpdate(
  SessionBuffer *pBuf,
  sqlite3_changeset_iter *pIter,
  u8 *aRec, int nRec,
  u8 *aChange, int nChange,
  int *pRc
){
  sessionBufferGrow(pBuf, 2+nRec+nChange, pRc);
  if( *pRc==SQLITE_OK ){
    int bData = 0;
    u8 *pOut = &pBuf->aBuf[pBuf->nBuf];
    int i;
    u8 *a1 = aRec;
................................................................................
    for(i=0; i<pIter->nCol; i++){
      int n1 = sessionSerialLen(a1);
      int n2 = sessionSerialLen(a2);
      if( pIter->abPK[i] || a2[0]==0 ){
        if( !pIter->abPK[i] ) bData = 1;
        memcpy(pOut, a1, n1);
        pOut += n1;




      }else{
        *pOut++ = '\0';
      }
      a1 += n1;
      a2 += n2;
    }
    if( bData ){
      a2 = aChange;
      for(i=0; i<pIter->nCol; i++){
        int n1 = sessionSerialLen(a1);
        int n2 = sessionSerialLen(a2);
        if( pIter->abPK[i] || a2[0]==0 ){
          memcpy(pOut, a1, n1);
          pOut += n1;
        }else{
          *pOut++ = '\0';
        }
        a1 += n1;
        a2 += n2;
      }
      pBuf->nBuf = (pOut - pBuf->aBuf);
    }
  }
}


static int sessionRebase(
  sqlite3_rebaser *p,             /* Rebaser hash table */
  sqlite3_changeset_iter *pIter,  /* Input data */
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut,                     /* Context for xOutput callback */
  int *pnOut,                     /* OUT: Number of bytes in output changeset */
................................................................................
              sessionAppendByte(&sOut, pIter->bIndirect, &rc);
              sessionAppendRecordMerge(&sOut, pIter->nCol, 1,
                  pCsr, nRec-(pCsr-aRec), 
                  pChange->aRecord, pChange->nRecord, &rc
              );
            }
          }else{
            if( pChange->bIndirect==0 ){
              u8 *pCsr = aRec;
              sessionAppendByte(&sOut, SQLITE_UPDATE, &rc);
              sessionAppendByte(&sOut, pIter->bIndirect, &rc);
              sessionAppendRecordMerge(&sOut, pIter->nCol, 0,
                  aRec, nRec, pChange->aRecord, pChange->nRecord, &rc
              );
              sessionSkipRecord(&pCsr, pIter->nCol);
              sessionAppendBlob(&sOut, pCsr, nRec - (pCsr-aRec), &rc);
            }else{
              sessionAppendPartialUpdate(&sOut, pIter,
                 aRec, nRec, pChange->aRecord, pChange->nRecord, &rc
              );
            }
          }
          break;

        default:
          assert( pIter->op==SQLITE_DELETE );
          bDone = 1;
          if( pChange->op==SQLITE_INSERT ){







|







 







>









<

>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
<
<
<
<
<
<
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|







 







|







 







|
|
|
|
|







 







>
>
>
>











|












<







 







<
<
<
<
<
<
<
<
<
<
|
|
|
<







512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
....
4523
4524
4525
4526
4527
4528
4529
4530
4531
4532
4533
4534
4535
4536
4537
4538
4539

4540
4541
4542
4543
4544
4545
4546
4547
4548
4549
4550
4551
4552
4553
4554
4555
4556
4557
4558
4559
4560
4561
4562






4563
4564
4565
4566
4567
4568
4569
4570
4571
4572
4573
4574
4575
4576
4577
4578
4579
4580
4581
4582
4583
4584
4585
4586
4587
4588
4589
4590
4591
4592
4593
4594
4595
4596
4597
4598
4599
4600
4601
4602
4603
4604
4605
4606
....
4686
4687
4688
4689
4690
4691
4692
4693
4694
4695
4696
4697
4698
4699
4700
....
5046
5047
5048
5049
5050
5051
5052
5053
5054
5055
5056
5057
5058
5059
5060
....
5064
5065
5066
5067
5068
5069
5070
5071
5072
5073
5074
5075
5076
5077
5078
5079
5080
5081
5082
....
5087
5088
5089
5090
5091
5092
5093
5094
5095
5096
5097
5098
5099
5100
5101
5102
5103
5104
5105
5106
5107
5108
5109
5110
5111
5112
5113
5114
5115
5116
5117
5118
5119
5120
5121

5122
5123
5124
5125
5126
5127
5128
....
5189
5190
5191
5192
5193
5194
5195










5196
5197
5198

5199
5200
5201
5202
5203
5204
5205
** The buffer that the argument points to contains a serialized SQL value.
** Return the number of bytes of space occupied by the value (including
** the type byte).
*/
static int sessionSerialLen(u8 *a){
  int e = *a;
  int n;
  if( e==0 || e==0xFF ) return 1;
  if( e==SQLITE_NULL ) return 1;
  if( e==SQLITE_INTEGER || e==SQLITE_FLOAT ) return 9;
  return sessionVarintGet(&a[1], &n) + 1 + n;
}

/*
** Based on the primary key values stored in change aRecord, calculate a
................................................................................
  int op2,                        /* Second change operation */
  int bIndirect,                  /* True if second change is indirect */
  u8 *aRec,                       /* Second change record */
  int nRec,                       /* Number of bytes in aRec */
  SessionChange **ppNew           /* OUT: Merged change */
){
  SessionChange *pNew = 0;
  int rc = SQLITE_OK;

  if( !pExist ){
    pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange) + nRec);
    if( !pNew ){
      return SQLITE_NOMEM;
    }
    memset(pNew, 0, sizeof(SessionChange));
    pNew->op = op2;
    pNew->bIndirect = bIndirect;

    pNew->aRecord = (u8*)&pNew[1];
    if( bIndirect==0 || bRebase==0 ){
      pNew->nRecord = nRec;
      memcpy(pNew->aRecord, aRec, nRec);
    }else{
      int i;
      u8 *pIn = aRec;
      u8 *pOut = pNew->aRecord;
      for(i=0; i<pTab->nCol; i++){
        int nIn = sessionSerialLen(pIn);
        if( *pIn==0 ){
          *pOut++ = 0;
        }else if( pTab->abPK[i]==0 ){
          *pOut++ = 0xFF;
        }else{
          memcpy(pOut, pIn, nIn);
          pOut += nIn;
        }
        pIn += nIn;
      }
      pNew->nRecord = pOut - pNew->aRecord;
    }
  }else if( bRebase ){






    if( pExist->op==SQLITE_DELETE && pExist->bIndirect ){
      *ppNew = pExist;
    }else{
      int nByte = nRec + pExist->nRecord + sizeof(SessionChange);
      pNew = (SessionChange*)sqlite3_malloc(nByte);
      if( pNew==0 ){
        rc = SQLITE_NOMEM;
      }else{
        int i;
        u8 *a1 = pExist->aRecord;
        u8 *a2 = aRec;
        u8 *pOut;

        memset(pNew, 0, nByte);
        pNew->bIndirect = bIndirect || pExist->bIndirect;
        pNew->op = op2;
        pOut = pNew->aRecord = (u8*)&pNew[1];

        for(i=0; i<pTab->nCol; i++){
          int n1 = sessionSerialLen(a1);
          int n2 = sessionSerialLen(a2);
          if( *a1==0xFF || *a2==0xFF ){
            *pOut++ = 0xFF;
          }else if( *a2==0 ){
            memcpy(pOut, a1, n1);
            pOut += n1;
          }else{
            memcpy(pOut, a2, n2);
            pOut += n2;
          }
          a1 += n1;
          a2 += n2;
        }
        pNew->nRecord = pOut - pNew->aRecord;
      }
      sqlite3_free(pExist);
    }
  }else{
    int op1 = pExist->op;

    /* 
    **   op1=INSERT, op2=INSERT      ->      Unsupported. Discard op2.
    **   op1=INSERT, op2=UPDATE      ->      INSERT.
    **   op1=INSERT, op2=DELETE      ->      (none)
................................................................................
        pNew->nRecord = (int)(aCsr - pNew->aRecord);
      }
      sqlite3_free(pExist);
    }
  }

  *ppNew = pNew;
  return rc;
}

/*
** Add all changes in the changeset traversed by the iterator passed as
** the first argument to the changegroup hash tables.
*/
static int sessionChangesetToHash(
................................................................................
          memcpy(pOut, a2, nn2);
          pOut += nn2;
        }else{
          memcpy(pOut, a1, nn1);
          pOut += nn1;
        }
      }else{
        if( *a1==0 || *a1==0xFF ){
          memcpy(pOut, a2, nn2);
          pOut += nn2;
        }else{
          memcpy(pOut, a1, nn1);
          pOut += nn1;
        }
      }
................................................................................

    pBuf->nBuf = pOut-pBuf->aBuf;
    assert( pBuf->nBuf<=pBuf->nAlloc );
  }
}

static void sessionAppendPartialUpdate(
  SessionBuffer *pBuf,            /* Append record here */
  sqlite3_changeset_iter *pIter,  /* Iterator pointed at local change */
  u8 *aRec, int nRec,             /* Local change */
  u8 *aChange, int nChange,       /* Record to rebase against */
  int *pRc                        /* IN/OUT: Return Code */
){
  sessionBufferGrow(pBuf, 2+nRec+nChange, pRc);
  if( *pRc==SQLITE_OK ){
    int bData = 0;
    u8 *pOut = &pBuf->aBuf[pBuf->nBuf];
    int i;
    u8 *a1 = aRec;
................................................................................
    for(i=0; i<pIter->nCol; i++){
      int n1 = sessionSerialLen(a1);
      int n2 = sessionSerialLen(a2);
      if( pIter->abPK[i] || a2[0]==0 ){
        if( !pIter->abPK[i] ) bData = 1;
        memcpy(pOut, a1, n1);
        pOut += n1;
      }else if( a2[0]!=0xFF ){
        bData = 1;
        memcpy(pOut, a2, n2);
        pOut += n2;
      }else{
        *pOut++ = '\0';
      }
      a1 += n1;
      a2 += n2;
    }
    if( bData ){
      a2 = aChange;
      for(i=0; i<pIter->nCol; i++){
        int n1 = sessionSerialLen(a1);
        int n2 = sessionSerialLen(a2);
        if( pIter->abPK[i] || a2[0]!=0xFF ){
          memcpy(pOut, a1, n1);
          pOut += n1;
        }else{
          *pOut++ = '\0';
        }
        a1 += n1;
        a2 += n2;
      }
      pBuf->nBuf = (pOut - pBuf->aBuf);
    }
  }
}


static int sessionRebase(
  sqlite3_rebaser *p,             /* Rebaser hash table */
  sqlite3_changeset_iter *pIter,  /* Input data */
  int (*xOutput)(void *pOut, const void *pData, int nData),
  void *pOut,                     /* Context for xOutput callback */
  int *pnOut,                     /* OUT: Number of bytes in output changeset */
................................................................................
              sessionAppendByte(&sOut, pIter->bIndirect, &rc);
              sessionAppendRecordMerge(&sOut, pIter->nCol, 1,
                  pCsr, nRec-(pCsr-aRec), 
                  pChange->aRecord, pChange->nRecord, &rc
              );
            }
          }else{










            sessionAppendPartialUpdate(&sOut, pIter,
                aRec, nRec, pChange->aRecord, pChange->nRecord, &rc
            );

          }
          break;

        default:
          assert( pIter->op==SQLITE_DELETE );
          bDone = 1;
          if( pChange->op==SQLITE_INSERT ){

Changes to ext/session/sqlite3session.h.

1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279

















1280
1281
1282
1283
1284
1285
1286
**   changeset. Or, if the conflict resolution was REPLACE, add
**   nothing to the rebased changeset.
**
** <dt>Local DELETE<dd>
**   This may conflict with a remote UPDATE or DELETE. In both cases the
**   only possible resolution is OMIT. If the remote operation was a
**   DELETE, then add no change to the rebased changeset. If the remote
**   operation was an UPDATE, then the old.* fields of change are updated 
**   to reflect the new.* values in the UPDATE.
**
** <dt>Local UPDATE<dd>
**   This may conflict with a remote UPDATE or DELETE. If it conflicts
**   with a DELETE, and the conflict resolution was OMIT, then the update
**   is changed into an INSERT. Any undefined values in the new.* record
**   from the update change are filled in using the old.* values from 
**   the conflicting DELETE. Or, if the conflict resolution was REPLACE,
**   the UPDATE change is simply omitted from the rebased changeset.
**
**   If conflict is with a remote UPDATE and the resolution is OMIT, then
**   the old.* values are rebased using the new.* values in the remote
**   change. Or, if the resolution is REPLACE, then the change is copied
**   into the rebased changeset with updates to columns also updated by
**   the conflicting UPDATE removed. If this means no columns would be
**   updated, the change is omitted.
** </dl>
**
** A local change may be rebased against multiple remote changes 
** simultaneously.

















*/
typedef struct sqlite3_rebaser sqlite3_rebaser;

/* Create a new rebaser object */
int sqlite3rebaser_create(sqlite3_rebaser **ppNew);

/* Call this one or more times to configure a rebaser */







|






|







|
|



|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
**   changeset. Or, if the conflict resolution was REPLACE, add
**   nothing to the rebased changeset.
**
** <dt>Local DELETE<dd>
**   This may conflict with a remote UPDATE or DELETE. In both cases the
**   only possible resolution is OMIT. If the remote operation was a
**   DELETE, then add no change to the rebased changeset. If the remote
**   operation was an UPDATE, then the old.* fields of change are updated
**   to reflect the new.* values in the UPDATE.
**
** <dt>Local UPDATE<dd>
**   This may conflict with a remote UPDATE or DELETE. If it conflicts
**   with a DELETE, and the conflict resolution was OMIT, then the update
**   is changed into an INSERT. Any undefined values in the new.* record
**   from the update change are filled in using the old.* values from
**   the conflicting DELETE. Or, if the conflict resolution was REPLACE,
**   the UPDATE change is simply omitted from the rebased changeset.
**
**   If conflict is with a remote UPDATE and the resolution is OMIT, then
**   the old.* values are rebased using the new.* values in the remote
**   change. Or, if the resolution is REPLACE, then the change is copied
**   into the rebased changeset with updates to columns also updated by
**   the conflicting remote UPDATE removed. If this means no columns would 
**   be updated, the change is omitted.
** </dl>
**
** A local change may be rebased against multiple remote changes 
** simultaneously. If a single key is modified by multiple remote 
** changesets, they are combined as follows before the local changeset
** is rebased:
**
** <ul>
**    <li> If there has been one or more REPLACE resolutions on a
**         key, it is rebased according to a REPLACE.
**
**    <li> If there have been no REPLACE resolutions on a key, then
**         the local changeset is rebased according to the most recent
**         of the OMIT resolutions.
** </ul>
**
** Note that conflict resolutions from multiple remote changesets are 
** combined on a per-field basis, not per-row. This means that in the 
** case of multiple remote UPDATE operations, some fields of a single 
** local change may be rebased for REPLACE while others are rebased for 
** OMIT.
*/
typedef struct sqlite3_rebaser sqlite3_rebaser;

/* Create a new rebaser object */
int sqlite3rebaser_create(sqlite3_rebaser **ppNew);

/* Call this one or more times to configure a rebaser */