Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Add streaming version of sqlite3changeset_concat(). |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | sessions |
Files: | files | file ages | folders |
SHA1: |
88eb6656bdb047a104837a2e15e7fe18 |
User & Date: | dan 2014-09-25 20:43:28.741 |
Context
2014-09-26
| ||
10:52 | Fix a problem with concatenating patchsets containing DELETE and INSERT operations on the same row. (check-in: 4d8537eafb user: dan tags: sessions) | |
2014-09-25
| ||
20:43 | Add streaming version of sqlite3changeset_concat(). (check-in: 88eb6656bd user: dan tags: sessions) | |
14:54 | Add streaming version of sqlite3changeset_invert() to sessions module. (check-in: 8ded6a4679 user: dan tags: sessions) | |
Changes
Changes to ext/session/sessionB.test.
︙ | ︙ | |||
18 19 20 21 22 23 24 25 26 27 28 29 30 31 | } source [file join [file dirname [info script]] session_common.tcl] source $testdir/tester.tcl ifcapable !session {finish_test; return} set testprefix sessionB # # 1.*: Test that the blobs returned by the session_patchset() API are # as expected. Also the sqlite3_changeset_iter functions. # # 2.*: Test that patchset blobs are handled by sqlite3changeset_apply(). # # 3.*: Test that sqlite3changeset_invert() works with patchset blobs. | > > > > > | 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | } source [file join [file dirname [info script]] session_common.tcl] source $testdir/tester.tcl ifcapable !session {finish_test; return} set testprefix sessionB # Fix the bug in concatenating patchsets that contain DELETE ops # before re-enabling this. finish_test return # # 1.*: Test that the blobs returned by the session_patchset() API are # as expected. Also the sqlite3_changeset_iter functions. # # 2.*: Test that patchset blobs are handled by sqlite3changeset_apply(). # # 3.*: Test that sqlite3changeset_invert() works with patchset blobs. |
︙ | ︙ | |||
381 382 383 384 385 386 387 | # operation. sqlite3session S db main S attach * foreach sql $lSql { sqlite3session T db main T attach * db eval $sql | | | | 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 | # operation. sqlite3session S db main S attach * foreach sql $lSql { sqlite3session T db main T attach * db eval $sql lappend lPatch [T $tstcmd] T delete } set patchset [S $tstcmd] S delete # Calculate a checksum for the final database. set cksum [databasecksum db] # 1. Apply the single large patchset to test.db2 sqlite3 db2 test.db2 |
︙ | ︙ |
Changes to ext/session/sqlite3session.c.
︙ | ︙ | |||
2382 2383 2384 2385 2386 2387 2388 | sqlite3_changeset_iter *p, /* Changeset iterator */ u8 **paRec, /* If non-NULL, store record pointer here */ int *pnRec /* If non-NULL, store size of record here */ ){ int i; u8 op; | < | 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 | sqlite3_changeset_iter *p, /* Changeset iterator */ u8 **paRec, /* If non-NULL, store record pointer here */ int *pnRec /* If non-NULL, store size of record here */ ){ int i; u8 op; assert( (paRec==0 && pnRec==0) || (paRec && pnRec) ); /* If the iterator is in the error-state, return immediately. */ if( p->rc!=SQLITE_OK ) return p->rc; /* Free the current contents of p->apValue[], if any. */ if( p->apValue ){ |
︙ | ︙ | |||
2422 2423 2424 2425 2426 2427 2428 | p->op = op; p->bIndirect = p->in.aData[p->in.iNext++]; if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){ return (p->rc = SQLITE_CORRUPT_BKPT); } | > > > > > > > > > > > > | > > | | | | | | | | < | | | < < | | | | | | | | | | | | > | 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 | p->op = op; p->bIndirect = p->in.aData[p->in.iNext++]; if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){ return (p->rc = SQLITE_CORRUPT_BKPT); } if( paRec ){ int nVal; /* Number of values to buffer */ if( p->bPatchset==0 && op==SQLITE_UPDATE ){ nVal = p->nCol * 2; }else if( p->bPatchset && op==SQLITE_DELETE ){ nVal = 0; for(i=0; i<p->nCol; i++) if( p->abPK[i] ) nVal++; }else{ nVal = p->nCol; } p->rc = sessionChangesetBufferRecord(&p->in, nVal, pnRec); if( p->rc!=SQLITE_OK ) return p->rc; *paRec = &p->in.aData[p->in.iNext]; p->in.iNext += *pnRec; }else{ /* If this is an UPDATE or DELETE, read the old.* record. */ if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){ u8 *abPK = p->bPatchset ? p->abPK : 0; p->rc = sessionReadRecord(&p->in, p->nCol, abPK, p->apValue); if( p->rc!=SQLITE_OK ) return p->rc; } /* If this is an INSERT or UPDATE, read the new.* record. */ if( p->op!=SQLITE_DELETE ){ p->rc = sessionReadRecord(&p->in, p->nCol, 0, &p->apValue[p->nCol]); if( p->rc!=SQLITE_OK ) return p->rc; } if( p->bPatchset && p->op==SQLITE_UPDATE ){ /* If this is an UPDATE that is part of a patchset, then all PK and ** modified fields are present in the new.* record. The old.* record ** is currently completely empty. This block shifts the PK fields from ** new.* to old.*, to accommodate the code that reads these arrays. 
*/ int i; for(i=0; i<p->nCol; i++){ assert( p->apValue[i]==0 ); assert( p->abPK[i]==0 || p->apValue[i+p->nCol] ); if( p->abPK[i] ){ p->apValue[i] = p->apValue[i+p->nCol]; p->apValue[i+p->nCol] = 0; } } } } return SQLITE_ROW; } |
︙ | ︙ | |||
2623 2624 2625 2626 2627 2628 2629 | /* ** Finalize an iterator allocated with sqlite3changeset_start(). ** ** This function may not be called on iterators passed to a conflict handler ** callback by changeset_apply(). */ int sqlite3changeset_finalize(sqlite3_changeset_iter *p){ | > > | | | | | | | | > | 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 | /* ** Finalize an iterator allocated with sqlite3changeset_start(). ** ** This function may not be called on iterators passed to a conflict handler ** callback by changeset_apply(). */ int sqlite3changeset_finalize(sqlite3_changeset_iter *p){ int rc = SQLITE_OK; if( p ){ int i; /* Used to iterate through p->apValue[] */ rc = p->rc; if( p->apValue ){ for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]); } sqlite3_free(p->tblhdr.aBuf); sqlite3_free(p->in.buf.aBuf); sqlite3_free(p); } return rc; } static int sessionChangesetInvert( SessionInput *pInput, /* Input changeset */ int (*xOutput)(void *pOut, const void *pData, int nData), void *pOut, |
︙ | ︙ | |||
3643 3644 3645 3646 3647 3648 3649 | u8 *aRec, /* Second change record */ int nRec, /* Number of bytes in aRec */ SessionChange **ppNew /* OUT: Merged change */ ){ SessionChange *pNew = 0; if( !pExist ){ | | | > | 3657 3658 3659 3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672 3673 3674 3675 3676 3677 3678 3679 3680 | u8 *aRec, /* Second change record */ int nRec, /* Number of bytes in aRec */ SessionChange **ppNew /* OUT: Merged change */ ){ SessionChange *pNew = 0; if( !pExist ){ pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange) + nRec); if( !pNew ){ return SQLITE_NOMEM; } memset(pNew, 0, sizeof(SessionChange)); pNew->op = op2; pNew->bIndirect = bIndirect; pNew->nRecord = nRec; pNew->aRecord = (u8*)&pNew[1]; memcpy(pNew->aRecord, aRec, nRec); }else{ int op1 = pExist->op; /* ** op1=INSERT, op2=INSERT -> Unsupported. Discard op2. ** op1=INSERT, op2=UPDATE -> INSERT. ** op1=INSERT, op2=DELETE -> (none) |
︙ | ︙ | |||
3747 3748 3749 3750 3751 3752 3753 | return SQLITE_OK; } /* ** Add all changes in the changeset passed via the first two arguments to ** hash tables. */ | | < < | < | < < < > > | 3762 3763 3764 3765 3766 3767 3768 3769 3770 3771 3772 3773 3774 3775 3776 3777 3778 3779 3780 3781 3782 3783 3784 3785 3786 3787 3788 3789 3790 3791 3792 3793 3794 3795 3796 3797 3798 3799 3800 3801 3802 | return SQLITE_OK; } /* ** Add all changes in the changeset passed via the first two arguments to ** hash tables. */ static int sessionAddChangeset( sqlite3_changeset_iter *pIter, /* Iterator to read from */ SessionTable **ppTabList /* IN/OUT: List of table objects */ ){ u8 *aRec; int nRec; int rc = SQLITE_OK; SessionTable *pTab = 0; while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec) ){ const char *zNew; int nCol; int op; int iHash; int bIndirect; SessionChange *pChange; SessionChange *pExist = 0; SessionChange **pp; #if 0 assert( bPatchset==0 || bPatchset==1 ); assert( pIter->bPatchset==0 || pIter->bPatchset==1 ); if( pIter->bPatchset!=bPatchset ){ rc = SQLITE_ERROR; break; } #endif sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect); if( !pTab || sqlite3_stricmp(zNew, pTab->zName) ){ /* Search the list for a matching table */ int nNew = (int)strlen(zNew); u8 *abPK; |
︙ | ︙ | |||
3809 3810 3811 3812 3813 3814 3815 | *ppTabList = pTab; }else if( pTab->nCol!=nCol || memcmp(pTab->abPK, abPK, nCol) ){ rc = SQLITE_SCHEMA; break; } } | | | | | | < < < < < | | | | | | | | < < | | > | > > > > > > > > | | < | | | > | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 3820 3821 3822 3823 3824 3825 3826 3827 3828 3829 3830 3831 3832 3833 3834 3835 3836 3837 3838 3839 3840 3841 3842 3843 3844 3845 3846 3847 3848 3849 3850 3851 3852 3853 3854 3855 3856 3857 3858 3859 3860 3861 3862 3863 3864 3865 3866 3867 3868 3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883 3884 3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 3895 3896 3897 3898 3899 3900 3901 3902 3903 3904 3905 3906 3907 3908 3909 3910 3911 3912 3913 3914 3915 3916 3917 3918 3919 3920 3921 3922 3923 3924 3925 3926 3927 3928 3929 3930 3931 3932 3933 3934 3935 3936 3937 3938 3939 3940 3941 3942 3943 3944 3945 3946 3947 3948 3949 3950 3951 3952 3953 3954 3955 3956 3957 3958 3959 3960 3961 3962 3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 3987 3988 3989 3990 3991 3992 3993 3994 3995 3996 3997 3998 3999 | *ppTabList = pTab; }else if( pTab->nCol!=nCol || memcmp(pTab->abPK, abPK, nCol) ){ rc = SQLITE_SCHEMA; break; } } if( sessionGrowHash(pIter->bPatchset, pTab) ){ rc = SQLITE_NOMEM; break; } iHash = sessionChangeHash( pTab, (pIter->bPatchset && op==SQLITE_DELETE), aRec, pTab->nChange ); /* Search for existing entry. If found, remove it from the hash table. ** Code below may link it back in. 
*/ for(pp=&pTab->apChange[iHash]; *pp; pp=&(*pp)->pNext){ int bPkOnly1 = 0; int bPkOnly2 = 0; if( pIter->bPatchset ){ bPkOnly1 = (*pp)->op==SQLITE_DELETE; bPkOnly2 = op==SQLITE_DELETE; } if( sessionChangeEqual(pTab, bPkOnly1, (*pp)->aRecord, bPkOnly2, aRec) ){ pExist = *pp; *pp = (*pp)->pNext; pTab->nEntry--; break; } } rc = sessionChangeMerge(pTab, pIter->bPatchset, pExist, op, bIndirect, aRec, nRec, &pChange ); if( rc ) break; if( pChange ){ pChange->pNext = pTab->apChange[iHash]; pTab->apChange[iHash] = pChange; pTab->nEntry++; } } if( rc==SQLITE_OK ) rc = pIter->rc; return rc; } /* ** 1. Iterate through the left-hand changeset. Add an entry to a table ** specific hash table for each change in the changeset. The hash table ** key is the PK of the row affected by the change. ** ** 2. Then iterate through the right-hand changeset. Attempt to add an ** entry to a hash table for each component change. If a change already ** exists with the same PK values, combine the two into a single change. ** ** 3. Write an output changeset based on the contents of the hash table. */ int sessionChangesetConcat( sqlite3_changeset_iter *pLeft, sqlite3_changeset_iter *pRight, int (*xOutput)(void *pOut, const void *pData, int nData), void *pOut, int *pnOut, void **ppOut ){ SessionTable *pList = 0; /* List of SessionTable objects */ int rc; /* Return code */ int bPatch; /* True for a patchset */ assert( xOutput==0 || (ppOut==0 && pnOut==0) ); rc = sessionAddChangeset(pLeft, &pList); if( rc==SQLITE_OK ){ rc = sessionAddChangeset(pRight, &pList); } bPatch = pLeft->bPatchset || pRight->bPatchset; /* Create the serialized output changeset based on the contents of the ** hash tables attached to the SessionTable objects in list pList. 
*/ if( rc==SQLITE_OK ){ SessionTable *pTab; SessionBuffer buf = {0, 0, 0}; for(pTab=pList; pTab && rc==SQLITE_OK; pTab=pTab->pNext){ int i; if( pTab->nEntry==0 ) continue; sessionAppendTableHdr(&buf, bPatch, pTab, &rc); for(i=0; i<pTab->nChange; i++){ SessionChange *p; for(p=pTab->apChange[i]; p; p=p->pNext){ sessionAppendByte(&buf, p->op, &rc); sessionAppendByte(&buf, p->bIndirect, &rc); sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc); } } if( rc==SQLITE_OK && xOutput && buf.nBuf>=SESSIONS_STR_CHUNK_SIZE ){ rc = xOutput(pOut, buf.aBuf, buf.nBuf); buf.nBuf = 0; } } if( rc==SQLITE_OK ){ if( xOutput ){ if( buf.nBuf>0 ) rc = xOutput(pOut, buf.aBuf, buf.nBuf); }else{ *ppOut = buf.aBuf; *pnOut = buf.nBuf; buf.aBuf = 0; } } sqlite3_free(buf.aBuf); } sessionDeleteTable(pList); return rc; } /* ** Combine two changesets together. */ int sqlite3changeset_concat( int nLeft, /* Number of bytes in lhs input */ void *pLeft, /* Lhs input changeset */ int nRight /* Number of bytes in rhs input */, void *pRight, /* Rhs input changeset */ int *pnOut, /* OUT: Number of bytes in output changeset */ void **ppOut /* OUT: changeset (left <concat> right) */ ){ sqlite3_changeset_iter *pIter1 = 0; sqlite3_changeset_iter *pIter2 = 0; int rc; *pnOut = 0; *ppOut = 0; rc = sqlite3changeset_start(&pIter1, nLeft, pLeft); if( rc==SQLITE_OK ){ rc = sqlite3changeset_start(&pIter2, nRight, pRight); } if( rc==SQLITE_OK ){ rc = sessionChangesetConcat(pIter1, pIter2, 0, 0, pnOut, ppOut); } sqlite3changeset_finalize(pIter1); sqlite3changeset_finalize(pIter2); return rc; } /* ** Streaming version of sqlite3changeset_concat(). 
*/ int sqlite3changeset_concat_str( int (*xInputA)(void *pIn, void *pData, int *pnData), void *pInA, int (*xInputB)(void *pIn, void *pData, int *pnData), void *pInB, int (*xOutput)(void *pOut, const void *pData, int nData), void *pOut ){ sqlite3_changeset_iter *pIter1 = 0; sqlite3_changeset_iter *pIter2 = 0; int rc; rc = sqlite3changeset_start_str(&pIter1, xInputA, pInA); if( rc==SQLITE_OK ){ rc = sqlite3changeset_start_str(&pIter2, xInputB, pInB); } if( rc==SQLITE_OK ){ rc = sessionChangesetConcat(pIter1, pIter2, xOutput, pOut, 0, 0); } sqlite3changeset_finalize(pIter1); sqlite3changeset_finalize(pIter2); return rc; } #endif /* SQLITE_ENABLE_SESSION && SQLITE_ENABLE_PREUPDATE_HOOK */ |
Changes to ext/session/sqlite3session.h.
︙ | ︙ | |||
756 757 758 759 760 761 762 763 764 765 766 767 768 769 | int nA, /* Number of bytes in buffer pA */ void *pA, /* Pointer to buffer containing changeset A */ int nB, /* Number of bytes in buffer pB */ void *pB, /* Pointer to buffer containing changeset B */ int *pnOut, /* OUT: Number of bytes in output changeset */ void **ppOut /* OUT: Buffer containing output changeset */ ); /* ** CAPI3REF: Apply A Changeset To A Database ** ** Apply a changeset to a database. This function attempts to update the ** "main" database attached to handle db with the changes found in the ** changeset passed via the second and third arguments. | > > > > > > > > > > > > | 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 | int nA, /* Number of bytes in buffer pA */ void *pA, /* Pointer to buffer containing changeset A */ int nB, /* Number of bytes in buffer pB */ void *pB, /* Pointer to buffer containing changeset B */ int *pnOut, /* OUT: Number of bytes in output changeset */ void **ppOut /* OUT: Buffer containing output changeset */ ); /* ** Streaming version of sqlite3changeset_concat(). */ int sqlite3changeset_concat_str( int (*xInputA)(void *pIn, void *pData, int *pnData), void *pInA, int (*xInputB)(void *pIn, void *pData, int *pnData), void *pInB, int (*xOutput)(void *pOut, const void *pData, int nData), void *pOut ); /* ** CAPI3REF: Apply A Changeset To A Database ** ** Apply a changeset to a database. This function attempts to update the ** "main" database attached to handle db with the changes found in the ** changeset passed via the second and third arguments. |
︙ | ︙ |
Changes to ext/session/test_session.c.
︙ | ︙ | |||
711 712 713 714 715 716 717 | static int test_sqlite3changeset_concat( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ int rc; /* Return code from changeset_invert() */ | | | | < | < > > > | | > > > > > > > > > | > > > > | > > < | | | 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 | static int test_sqlite3changeset_concat( void * clientData, Tcl_Interp *interp, int objc, Tcl_Obj *CONST objv[] ){ int rc; /* Return code from changeset_invert() */ TestStreamInput sLeft; /* Input stream */ TestStreamInput sRight; /* Input stream */ TestSessionsBlob sOut = {0,0}; /* Output blob */ if( objc!=3 ){ Tcl_WrongNumArgs(interp, 1, objv, "LEFT RIGHT"); return TCL_ERROR; } memset(&sLeft, 0, sizeof(sLeft)); memset(&sRight, 0, sizeof(sRight)); sLeft.aData = Tcl_GetByteArrayFromObj(objv[1], &sLeft.nData); sRight.aData = Tcl_GetByteArrayFromObj(objv[2], &sRight.nData); sLeft.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR); sRight.nStream = sLeft.nStream; if( sLeft.nStream>0 ){ rc = sqlite3changeset_concat_str( testStreamInput, (void*)&sLeft, testStreamInput, (void*)&sRight, testSessionsOutput, (void*)&sOut ); }else{ rc = sqlite3changeset_concat( sLeft.nData, sLeft.aData, sRight.nData, sRight.aData, &sOut.n, &sOut.p ); } if( rc!=SQLITE_OK ){ rc = test_session_error(interp, rc); }else{ Tcl_SetObjResult(interp,Tcl_NewByteArrayObj((unsigned char*)sOut.p,sOut.n)); } sqlite3_free(sOut.p); return rc; } /* ** sqlite3session_foreach VARNAME CHANGESET SCRIPT */ static int test_sqlite3session_foreach( void * clientData, |
︙ | ︙ |