SQLite

Check-in [427e2c9d]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Extend begin_concurrent_report() so that it reports on accesses to non-intkey tables as well.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | begin-concurrent-report
Files: files | file ages | folders
SHA3-256: 427e2c9d95a92308406079b238aed63e32ddb5073e016e6f147ddec0918a6c86
User & Date: dan 2020-03-03 21:01:04
Context
2020-03-04
19:17
Fix cases to do with index scans. (check-in: 6ec5a899 user: dan tags: begin-concurrent-report)
2020-03-03
21:01
Extend begin_concurrent_report() so that it reports on accesses to non-intkey tables as well. (check-in: 427e2c9d user: dan tags: begin-concurrent-report)
2020-03-02
21:00
Fix reporting for some range scan cases. (check-in: ad961aee user: dan tags: begin-concurrent-report)
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/btree.c.

4923
4924
4925
4926
4927
4928
4929



4930
4931
4932
4933
4934
4935
4936
  memset(p, 0, offsetof(BtCursor, BTCURSOR_FIRST_UNINIT));
}

void sqlite3BtreeScanDeref(CursorScan *pScan){
  if( pScan ){
    pScan->nRef--;
    if( pScan->nRef==0 ){



      sqlite3_free(pScan);
    }
  }
}

/*
** Close a cursor.  The read lock on the database file is released







>
>
>







4923
4924
4925
4926
4927
4928
4929
4930
4931
4932
4933
4934
4935
4936
4937
4938
4939
  memset(p, 0, offsetof(BtCursor, BTCURSOR_FIRST_UNINIT));
}

void sqlite3BtreeScanDeref(CursorScan *pScan){
  if( pScan ){
    pScan->nRef--;
    if( pScan->nRef==0 ){
      sqlite3_free(pScan->aMax);
      sqlite3_free(pScan->aMin);
      sqlite3_free(pScan->aLimit);
      sqlite3_free(pScan);
    }
  }
}

/*
** Close a cursor.  The read lock on the database file is released
5828
5829
5830
5831
5832
5833
5834


5835
5836
5837
5838
5839
5840
5841
  }else if( rc==SQLITE_EMPTY ){
    assert( pCur->pgnoRoot==0 || pCur->pPage->nCell==0 );
    *pRes = 1;
    rc = SQLITE_OK;
  }
  return rc;
}



const char *sqlite3_begin_concurrent_report(sqlite3 *db){
  sqlite3DbFree(db, db->zBCReport);
  db->zBCReport = 0;
  if( db->pCScanList ){
    CursorScan *p;
    StrAccum accum;







>
>







5831
5832
5833
5834
5835
5836
5837
5838
5839
5840
5841
5842
5843
5844
5845
5846
  }else if( rc==SQLITE_EMPTY ){
    assert( pCur->pgnoRoot==0 || pCur->pPage->nCell==0 );
    *pRes = 1;
    rc = SQLITE_OK;
  }
  return rc;
}

#include "vdbeInt.h"

const char *sqlite3_begin_concurrent_report(sqlite3 *db){
  sqlite3DbFree(db, db->zBCReport);
  db->zBCReport = 0;
  if( db->pCScanList ){
    CursorScan *p;
    StrAccum accum;
5878
5879
5880
5881
5882
5883
5884







5885

5886















5887


5888






5889




























5890








5891







5892
5893


5894

5895
5896
5897
5898
5899
5900
5901
5902
5903
5904
5905
5906
5907
5908
5909
5910
5911
5912
5913
5914
5915
5916
5917
5918
5919
5920
5921
5922
5923
5924
5925
5926
5927
5928
5929
      );
    }
    db->zBCReport = sqlite3StrAccumFinish(&accum);
  }
  return db->zBCReport;
}








void sqlite3BtreeScanDerefList(CursorScan *pList){

  CursorScan *p;















  CursorScan *pNext;


  for(p=pList; p; p=pNext){






    pNext = p->pNext;




























    sqlite3BtreeScanDeref(p);








  }







}



static void btreeScanNext(BtCursor *pCsr, int bPrev){

  if( pCsr->pCScan ){
    if( bPrev ){
      if( sqlite3BtreeEof(pCsr) ){
        pCsr->pCScan->flags &= ~(CURSORSCAN_MINVALID|CURSORSCAN_MININCL);
      }else{
        pCsr->pCScan->iMin = sqlite3BtreeIntegerKey(pCsr);
        pCsr->pCScan->flags |= CURSORSCAN_MINVALID|CURSORSCAN_MININCL;
      }
    }else{
      if( sqlite3BtreeEof(pCsr) ){
        pCsr->pCScan->flags &= ~(CURSORSCAN_MAXVALID|CURSORSCAN_MAXINCL);
      }else{
        pCsr->pCScan->iMax = sqlite3BtreeIntegerKey(pCsr);
        pCsr->pCScan->flags |= CURSORSCAN_MAXVALID|CURSORSCAN_MAXINCL;
      }
    }
  }
}

int sqlite3BtreeScanLimit(
  BtCursor *pCsr, 
  UnpackedRecord *pKey, 
  i64 iKey, 
  int opcode
){
  CursorScan *pScan = pCsr->pCScan;
  if( pScan ){
    pScan->iLimit = iKey;
    pScan->flags |= CURSORSCAN_LIMITVALID;
    switch( opcode ){
      case OP_Lt:
        pScan->flags |= CURSORSCAN_LIMITINCL;
      case OP_Le:
        break;








>
>
>
>
>
>
>
|
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
|
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
|
|
>
>

>
|


|

|
|



|

|
|













|







5883
5884
5885
5886
5887
5888
5889
5890
5891
5892
5893
5894
5895
5896
5897
5898
5899
5900
5901
5902
5903
5904
5905
5906
5907
5908
5909
5910
5911
5912
5913
5914
5915
5916
5917
5918
5919
5920
5921
5922
5923
5924
5925
5926
5927
5928
5929
5930
5931
5932
5933
5934
5935
5936
5937
5938
5939
5940
5941
5942
5943
5944
5945
5946
5947
5948
5949
5950
5951
5952
5953
5954
5955
5956
5957
5958
5959
5960
5961
5962
5963
5964
5965
5966
5967
5968
5969
5970
5971
5972
5973
5974
5975
5976
5977
5978
5979
5980
5981
5982
5983
5984
5985
5986
5987
5988
5989
5990
5991
5992
5993
5994
5995
5996
5997
5998
5999
6000
6001
6002
6003
6004
6005
6006
6007
6008
6009
6010
6011
      );
    }
    db->zBCReport = sqlite3StrAccumFinish(&accum);
  }
  return db->zBCReport;
}

static u32 btreeScanSerialType(Mem *pMem){
  if( pMem->flags & MEM_Int ) return 6;
  if( pMem->flags & MEM_Real ) return 7;
  if( pMem->flags & MEM_Str ) return (pMem->n * 2)+12;
  if( pMem->flags & MEM_Blob ) return (pMem->n * 2)+13;
  return 0;
}

static void btreeScanSet(
  CursorScan *pNew, 
  UnpackedRecord *pKey, 
  i64 iKey,
  i64 *piKey,
  u8 **paKey
){
  if( pKey==0 ){
    assert( *paKey==0 );
    *piKey = iKey;
  }else{
    /* Figure out how much space is required */
    int ii;
    u8 *aRec = 0;
    int nByte = 0;
    int nHdr = 0;
    int nSize = 0;

    sqlite3_free(*paKey);
    *paKey = 0;

    for(ii=0; ii<pKey->nField; ii++){
      Mem *pMem = &pKey->aMem[ii];
      u32 serial_type = btreeScanSerialType(pMem);
      nByte += sqlite3VdbeSerialTypeLen(serial_type);
      nHdr += sqlite3VarintLen(serial_type);
    }

    nSize = sqlite3VarintLen(nHdr);
    if( sqlite3VarintLen(nSize+nHdr)>nSize ) nSize++;

    aRec = (u8*)sqlite3_malloc(nSize+nHdr+nByte);
    if( aRec==0 ){
      pNew->flags |= CURSORSCAN_OOM;
    }else{
      int iOff = 0;
      iOff += sqlite3PutVarint(&aRec[iOff], nSize);
      for(ii=0; ii<pKey->nField; ii++){
        u32 serial_type = btreeScanSerialType(&pKey->aMem[ii]);
        iOff += sqlite3PutVarint(&aRec[iOff], serial_type);
      }
      for(ii=0; ii<pKey->nField; ii++){
        Mem *pMem = &pKey->aMem[ii];
        u32 serial_type = btreeScanSerialType(pMem);
        iOff += sqlite3VdbeSerialPut(&aRec[iOff], pMem, serial_type);
      }
      assert( iOff==(nSize+nHdr+nByte) );
      *paKey = aRec;
      *piKey = iOff;
    }

  }
}

static void btreeScanSetKey(BtCursor *pCsr, i64 *piKey, u8 **paKey){
  if( pCsr->curIntKey ){
    *piKey = sqlite3BtreeIntegerKey(pCsr);
  }else{
    int rc;
    u32 nKey = sqlite3BtreePayloadSize(pCsr);
    u8 *aKey = sqlite3_malloc(nKey);
    if( aKey==0 ){
      rc = SQLITE_NOMEM;
    }else{
      rc = sqlite3BtreePayload(pCsr, 0, nKey, aKey);
    }
    if( rc ){
      sqlite3_free(aKey);
      pCsr->pCScan->flags |= CURSORSCAN_OOM;
    }else{
      sqlite3_free(*paKey);
      *piKey = nKey;
      *paKey = aKey;
    }
  }
}

static void btreeScanNext(BtCursor *pCsr, int bPrev){
  CursorScan *pCScan = pCsr->pCScan;
  if( pCScan ){
    if( bPrev ){
      if( sqlite3BtreeEof(pCsr) ){
        pCScan->flags &= ~(CURSORSCAN_MINVALID|CURSORSCAN_MININCL);
      }else{
        pCScan->iMin = sqlite3BtreeIntegerKey(pCsr);
        btreeScanSetKey(pCsr, &pCScan->iMin, &pCScan->aMin);
      }
    }else{
      if( sqlite3BtreeEof(pCsr) ){
        pCScan->flags &= ~(CURSORSCAN_MAXVALID|CURSORSCAN_MAXINCL);
      }else{
        btreeScanSetKey(pCsr, &pCScan->iMax, &pCScan->aMax);
        pCScan->flags |= CURSORSCAN_MAXVALID|CURSORSCAN_MAXINCL;
      }
    }
  }
}

int sqlite3BtreeScanLimit(
  BtCursor *pCsr, 
  UnpackedRecord *pKey, 
  i64 iKey, 
  int opcode
){
  CursorScan *pScan = pCsr->pCScan;
  if( pScan ){
    btreeScanSet(pScan, pKey, iKey, &pScan->iLimit, &pScan->aLimit);
    pScan->flags |= CURSORSCAN_LIMITVALID;
    switch( opcode ){
      case OP_Lt:
        pScan->flags |= CURSORSCAN_LIMITINCL;
      case OP_Le:
        break;

5940
5941
5942
5943
5944
5945
5946
5947
5948
5949
5950
5951
5952
5953
5954
5955
5956
5957
5958
5959
5960
5961
5962
5963
5964
int sqlite3BtreeScanStart(
  BtCursor *pCsr, 
  UnpackedRecord *pKey, 
  i64 iKey,
  int opcode
){
  Btree *pBtree = pCsr->pBtree;
  if( pKey==0 
   && pBtree->db->bConcurrent 
   && sqlite3PagerIsWal(pBtree->pBt->pPager) 
  ){
    sqlite3 *db = pCsr->pBtree->db;
    CursorScan *pNew;
    pNew = (CursorScan*)sqlite3MallocZero(sizeof(CursorScan));
    if( pNew==0 ) return SQLITE_NOMEM;
    pNew->tnum = (int)pCsr->pgnoRoot;
    pNew->flags = CURSORSCAN_INTKEY;
    pNew->iMin = pNew->iMax = iKey;

    if( pCsr->pCScan ){
      sqlite3BtreeScanDeref(pCsr->pCScan);
    }
    pCsr->pCScan = pNew;
    pNew->pNext = db->pCScanList;
    db->pCScanList = pNew;







<
|







|
<







6022
6023
6024
6025
6026
6027
6028

6029
6030
6031
6032
6033
6034
6035
6036
6037

6038
6039
6040
6041
6042
6043
6044
int sqlite3BtreeScanStart(
  BtCursor *pCsr, 
  UnpackedRecord *pKey, 
  i64 iKey,
  int opcode
){
  Btree *pBtree = pCsr->pBtree;

  if( pBtree->db->bConcurrent 
   && sqlite3PagerIsWal(pBtree->pBt->pPager) 
  ){
    sqlite3 *db = pCsr->pBtree->db;
    CursorScan *pNew;
    pNew = (CursorScan*)sqlite3MallocZero(sizeof(CursorScan));
    if( pNew==0 ) return SQLITE_NOMEM;
    pNew->tnum = (int)pCsr->pgnoRoot;
    if( pKey==0 ) pNew->flags = CURSORSCAN_INTKEY;


    if( pCsr->pCScan ){
      sqlite3BtreeScanDeref(pCsr->pCScan);
    }
    pCsr->pCScan = pNew;
    pNew->pNext = db->pCScanList;
    db->pCScanList = pNew;
5972
5973
5974
5975
5976
5977
5978

5979
5980
5981
5982
5983
5984
5985

5986
5987
5988
5989
5990

5991
5992
5993
5994
5995
5996
5997










5998
5999
6000
6001
6002
6003
6004
        btreeScanNext(pCsr, 1);
        break;

      case OP_SeekLE:
        pNew->flags |= CURSORSCAN_MAXINCL;
      case OP_SeekLT:
        pNew->flags |= CURSORSCAN_MAXVALID;

        btreeScanNext(pCsr, 1);
        break;

      case OP_SeekGE:
        pNew->flags |= CURSORSCAN_MININCL;
      case OP_SeekGT:
        pNew->flags |= CURSORSCAN_MINVALID;

        btreeScanNext(pCsr, 0);
        break;

      case OP_SeekRowid:
      case OP_DeferredSeek:

        pNew->flags |= (CURSORSCAN_MININCL|CURSORSCAN_MAXINCL);
        pNew->flags |= CURSORSCAN_MINVALID|CURSORSCAN_MAXVALID;
        break;
    }
  }
  return SQLITE_OK;
}











/* Move the cursor so that it points to an entry near the key 
** specified by pIdxKey or intKey.   Return a success code.
**
** For INTKEY tables, the intKey parameter is used.  pIdxKey 
** must be NULL.  For index tables, pIdxKey is used and intKey
** is ignored.







>







>





>







>
>
>
>
>
>
>
>
>
>







6052
6053
6054
6055
6056
6057
6058
6059
6060
6061
6062
6063
6064
6065
6066
6067
6068
6069
6070
6071
6072
6073
6074
6075
6076
6077
6078
6079
6080
6081
6082
6083
6084
6085
6086
6087
6088
6089
6090
6091
6092
6093
6094
6095
6096
6097
        btreeScanNext(pCsr, 1);
        break;

      case OP_SeekLE:
        pNew->flags |= CURSORSCAN_MAXINCL;
      case OP_SeekLT:
        pNew->flags |= CURSORSCAN_MAXVALID;
        pNew->iMax = iKey;
        btreeScanNext(pCsr, 1);
        break;

      case OP_SeekGE:
        pNew->flags |= CURSORSCAN_MININCL;
      case OP_SeekGT:
        pNew->flags |= CURSORSCAN_MINVALID;
        btreeScanSet(pNew, pKey, iKey, &pNew->iMin, &pNew->aMin);
        btreeScanNext(pCsr, 0);
        break;

      case OP_SeekRowid:
      case OP_DeferredSeek:
        pNew->iMin = pNew->iMax = iKey;
        pNew->flags |= (CURSORSCAN_MININCL|CURSORSCAN_MAXINCL);
        pNew->flags |= CURSORSCAN_MINVALID|CURSORSCAN_MAXVALID;
        break;
    }
  }
  return SQLITE_OK;
}

void sqlite3BtreeScanDerefList(CursorScan *pList){
  CursorScan *p;
  CursorScan *pNext;
  for(p=pList; p; p=pNext){
    pNext = p->pNext;
    sqlite3BtreeScanDeref(p);
  }
}


/* Move the cursor so that it points to an entry near the key 
** specified by pIdxKey or intKey.   Return a success code.
**
** For INTKEY tables, the intKey parameter is used.  pIdxKey 
** must be NULL.  For index tables, pIdxKey is used and intKey
** is ignored.

Changes to src/sqliteInt.h.

1430
1431
1432
1433
1434
1435
1436


1437
1438
1439
1440
1441
1442
1443
1444



1445
1446
1447
1448
1449
1450
1451
#define CURSORSCAN_REVERSE    0x0008
#define CURSORSCAN_MININCL    0x0010
#define CURSORSCAN_MAXINCL    0x0020

#define CURSORSCAN_LIMITVALID 0x0040
#define CURSORSCAN_LIMITINCL  0x0080
#define CURSORSCAN_LIMITMAX   0x0100



typedef struct CursorScan CursorScan;
struct CursorScan {
  int tnum;                     /* Root page of scanned b-tree */
  int flags;
  i64 iMin;
  i64 iMax;
  i64 iLimit;



  int nRef;                     /* Number of pointers to this structure */
  CursorScan *pNext;            /* Next CursorScan object in list */
};


/*
** Each database connection is an instance of the following structure.







>
>








>
>
>







1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
#define CURSORSCAN_REVERSE    0x0008
#define CURSORSCAN_MININCL    0x0010
#define CURSORSCAN_MAXINCL    0x0020

#define CURSORSCAN_LIMITVALID 0x0040
#define CURSORSCAN_LIMITINCL  0x0080
#define CURSORSCAN_LIMITMAX   0x0100

#define CURSORSCAN_OOM        0x0200

typedef struct CursorScan CursorScan;
struct CursorScan {
  int tnum;                     /* Root page of scanned b-tree */
  int flags;
  i64 iMin;
  i64 iMax;
  i64 iLimit;
  u8 *aMin;
  u8 *aMax;
  u8 *aLimit;
  int nRef;                     /* Number of pointers to this structure */
  CursorScan *pNext;            /* Next CursorScan object in list */
};


/*
** Each database connection is an instance of the following structure.

Changes to src/vdbe.c.

4330
4331
4332
4333
4334
4335
4336

4337
4338
4339
4340
4341
4342
4343

    r.aMem = &aMem[pOp->p3];
#ifdef SQLITE_DEBUG
    { int i; for(i=0; i<r.nField; i++) assert( memIsValid(&r.aMem[i]) ); }
#endif
    r.eqSeen = 0;
    rc = sqlite3BtreeMovetoUnpacked(pC->uc.pCursor, &r, 0, 0, &res);

    if( rc!=SQLITE_OK ){
      goto abort_due_to_error;
    }
    if( eqOnly && r.eqSeen==0 ){
      assert( res!=0 );
      goto seek_not_found;
    }







>







4330
4331
4332
4333
4334
4335
4336
4337
4338
4339
4340
4341
4342
4343
4344

    r.aMem = &aMem[pOp->p3];
#ifdef SQLITE_DEBUG
    { int i; for(i=0; i<r.nField; i++) assert( memIsValid(&r.aMem[i]) ); }
#endif
    r.eqSeen = 0;
    rc = sqlite3BtreeMovetoUnpacked(pC->uc.pCursor, &r, 0, 0, &res);
    sqlite3BtreeScanStart(pC->uc.pCursor, &r, 0, pOp->opcode);
    if( rc!=SQLITE_OK ){
      goto abort_due_to_error;
    }
    if( eqOnly && r.eqSeen==0 ){
      assert( res!=0 );
      goto seek_not_found;
    }

Changes to test/concreport.test.

120
121
122
123
124
125
126
127
128
129








130

131





















132
133
134
135

# This case does not work. The result should be [101..200], but as there
# are no rows matching (k>=101) the system never sees the (k<=200) 
# constraint. Hence "..EOF".
do_concreport_test 1.13 {
  SELECT v FROM t1 WHERE k >= 101 AND k <= 200
} {} {
  2:[101..EOF]
}









do_execsql_test 2.0 {

  CREATE TABLE t2(k PRIMARY KEY, v);





















}

finish_test








|


>
>
>
>
>
>
>
>

>

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>




120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165

# This case does not work. The result should be [101..200], but as there
# are no rows matching (k>=101) the system never sees the (k<=200) 
# constraint. Hence "..EOF".
do_concreport_test 1.13 {
  SELECT v FROM t1 WHERE k >= 101 AND k <= 200
} {} {
  2:[101..EOF)
}

do_concreport_test 1.14 {
  SELECT v FROM t1 WHERE +v=5
} {5} {
  2:(EOF..EOF)
}

#--------------------------------------------------------------------------
reset_db
do_execsql_test 2.0 {
  PRAGMA journal_mode = wal;
  CREATE TABLE t2(k PRIMARY KEY, v);
  INSERT INTO t2 VALUES('a',  1);
  INSERT INTO t2 VALUES('b',  2);
  INSERT INTO t2 VALUES('c',  3);
  INSERT INTO t2 VALUES('d',  4);
  INSERT INTO t2 VALUES('e',  5);
  INSERT INTO t2 VALUES('f',  6);
  INSERT INTO t2 VALUES('g',  7);
  INSERT INTO t2 VALUES('h',  8);
  INSERT INTO t2 VALUES('i',  9);
  INSERT INTO t2 VALUES('j', 10);
  INSERT INTO t2 VALUES('k', 11);
}

explain_i {
  SELECT * FROM t2 WHERE k = 'e'
}
breakpoint
do_concreport_test 2.1 {
  SELECT * FROM t2 WHERE k = 'e'
} {e 5} {
  2:
}

finish_test