SQLite

Check-in [fc17f73170]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a problem whereby concurrent transactions would not consider pages read by the transaction before the first write statement.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | begin-concurrent
Files: files | file ages | folders
SHA1: fc17f73170a27c2fe511ed6b6d488535c4e35bae
User & Date: dan 2015-08-27 17:42:38.260
Context
2015-08-27
19:22
Add test cases for concurrent transactions and long-lived SELECT statements. (check-in: fd4798cb7a user: dan tags: begin-concurrent)
17:42
Fix a problem whereby concurrent transactions would not consider pages read by the transaction before the first write statement. (check-in: fc17f73170 user: dan tags: begin-concurrent)
2015-08-26
18:54
Fix an assert() in pager.c that could fail in a concurrent transaction. (check-in: 69394ddaa2 user: dan tags: begin-concurrent)
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/btree.c.
591
592
593
594
595
596
597

598
599
600
601
602
603
604
605
606

607
608
609
610
611
612
613
** This function is called after an CONCURRENT transaction is opened on the
** database. It allocates the BtreePtrmap structure used to track pointers
** to allocated pages and zeroes the nFree/iTrunk fields in the database 
** header on page 1.
*/
static int btreePtrmapAllocate(BtShared *pBt){
  int rc = SQLITE_OK;

  BtreePtrmap *pMap = sqlite3_malloc(sizeof(BtreePtrmap));
  assert( pBt->pMap==0 && sqlite3PagerIsConcurrent(pBt->pPager) );
  if( pMap==0 ){
    rc = SQLITE_NOMEM;
  }else{
    memset(&pBt->pPage1->aData[32], 0, sizeof(u32)*2);
    memset(pMap, 0, sizeof(BtreePtrmap));
    pMap->iFirst = pBt->nPage + 1;
    pBt->pMap = pMap;

  }
  return rc;
}

/*
** Free any BtreePtrmap structure allocated by an earlier call to
** btreePtrmapAllocate().







>
|
<
|
|
|
|
|
|
|
>







591
592
593
594
595
596
597
598
599

600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
** This function is called after an CONCURRENT transaction is opened on the
** database. It allocates the BtreePtrmap structure used to track pointers
** to allocated pages and zeroes the nFree/iTrunk fields in the database 
** header on page 1.
*/
static int btreePtrmapAllocate(BtShared *pBt){
  int rc = SQLITE_OK;
  if( pBt->pMap==0 ){
    BtreePtrmap *pMap = sqlite3_malloc(sizeof(BtreePtrmap));

    if( pMap==0 ){
      rc = SQLITE_NOMEM;
    }else{
      memset(&pBt->pPage1->aData[32], 0, sizeof(u32)*2);
      memset(pMap, 0, sizeof(BtreePtrmap));
      pMap->iFirst = pBt->nPage + 1;
      pBt->pMap = pMap;
    }
  }
  return rc;
}

/*
** Free any BtreePtrmap structure allocated by an earlier call to
** btreePtrmapAllocate().
3269
3270
3271
3272
3273
3274
3275

3276
3277
3278
3279
3280
3281
3282
** when A already has a read lock, we encourage A to give up and let B
** proceed.
*/
int sqlite3BtreeBeginTrans(Btree *p, int wrflag){
  sqlite3 *pBlock = 0;
  BtShared *pBt = p->pBt;
  int rc = SQLITE_OK;


  sqlite3BtreeEnter(p);
  btreeIntegrity(p);

  /* If the btree is already in a write-transaction, or it
  ** is already in a read-transaction and a read-transaction
  ** is requested, this is a no-op.







>







3270
3271
3272
3273
3274
3275
3276
3277
3278
3279
3280
3281
3282
3283
3284
** when A already has a read lock, we encourage A to give up and let B
** proceed.
*/
int sqlite3BtreeBeginTrans(Btree *p, int wrflag){
  sqlite3 *pBlock = 0;
  BtShared *pBt = p->pBt;
  int rc = SQLITE_OK;
  int bConcurrent = (p->db->bConcurrent && !ISAUTOVACUUM);

  sqlite3BtreeEnter(p);
  btreeIntegrity(p);

  /* If the btree is already in a write-transaction, or it
  ** is already in a read-transaction and a read-transaction
  ** is requested, this is a no-op.
3335
3336
3337
3338
3339
3340
3341
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357
3358
3359
3360
    */
    while( pBt->pPage1==0 && SQLITE_OK==(rc = lockBtree(pBt)) );

    if( rc==SQLITE_OK && wrflag ){
      if( (pBt->btsFlags & BTS_READ_ONLY)!=0 ){
        rc = SQLITE_READONLY;
      }else{
        int exFlag = (p->db->bConcurrent && !ISAUTOVACUUM) ? -1 : (wrflag>1);
        int bSubjInMem = sqlite3TempInMemory(p->db);
        assert( p->db->bConcurrent==0 || wrflag==1 );
        rc = sqlite3PagerBegin(pBt->pPager, exFlag, bSubjInMem);
        if( rc==SQLITE_OK ){
          rc = newDatabase(pBt);
        }
#ifdef SQLITE_ENABLE_CONCURRENT
        if( rc==SQLITE_OK && sqlite3PagerIsConcurrent(pBt->pPager) ){
          rc = btreePtrmapAllocate(pBt);
        }
#endif
      }
    }
  
    if( rc!=SQLITE_OK ){
      unlockBtreeIfUnused(pBt);
    }
  }while( (rc&0xFF)==SQLITE_BUSY && pBt->inTransaction==TRANS_NONE &&







|
<
<
|



<
<
<
<
<







3337
3338
3339
3340
3341
3342
3343
3344


3345
3346
3347
3348





3349
3350
3351
3352
3353
3354
3355
    */
    while( pBt->pPage1==0 && SQLITE_OK==(rc = lockBtree(pBt)) );

    if( rc==SQLITE_OK && wrflag ){
      if( (pBt->btsFlags & BTS_READ_ONLY)!=0 ){
        rc = SQLITE_READONLY;
      }else{
        int exFlag = bConcurrent ? -1 : (wrflag>1);


        rc = sqlite3PagerBegin(pBt->pPager, exFlag, sqlite3TempInMemory(p->db));
        if( rc==SQLITE_OK ){
          rc = newDatabase(pBt);
        }





      }
    }
  
    if( rc!=SQLITE_OK ){
      unlockBtreeIfUnused(pBt);
    }
  }while( (rc&0xFF)==SQLITE_BUSY && pBt->inTransaction==TRANS_NONE &&
3398
3399
3400
3401
3402
3403
3404









3405
3406
3407
3408
3409
3410
3411
        }
      }
    }
  }


trans_begun:









  if( rc==SQLITE_OK && wrflag ){
    /* This call makes sure that the pager has the correct number of
    ** open savepoints. If the second parameter is greater than 0 and
    ** the sub-journal is not already open, then it will be opened here.
    */
    int nSavepoint = p->db->nSavepoint;
    rc = sqlite3PagerOpenSavepoint(pBt->pPager, nSavepoint);







>
>
>
>
>
>
>
>
>







3393
3394
3395
3396
3397
3398
3399
3400
3401
3402
3403
3404
3405
3406
3407
3408
3409
3410
3411
3412
3413
3414
3415
        }
      }
    }
  }


trans_begun:
#ifdef SQLITE_ENABLE_CONCURRENT
  if( bConcurrent && rc==SQLITE_OK && sqlite3PagerIsWal(pBt->pPager) ){
    rc = sqlite3PagerBeginConcurrent(pBt->pPager);
    if( rc==SQLITE_OK && wrflag ){
      rc = btreePtrmapAllocate(pBt);
    }
  }
#endif

  if( rc==SQLITE_OK && wrflag ){
    /* This call makes sure that the pager has the correct number of
    ** open savepoints. If the second parameter is greater than 0 and
    ** the sub-journal is not already open, then it will be opened here.
    */
    int nSavepoint = p->db->nSavepoint;
    rc = sqlite3PagerOpenSavepoint(pBt->pPager, nSavepoint);
3933
3934
3935
3936
3937
3938
3939
3940
3941
3942
3943
3944
3945
3946
3947
  ** or freed by this transaction. In this case no special handling is 
  ** required. Otherwise, if page 1 is dirty, proceed.  */
  BtreePtrmap *pMap = pBt->pMap;
  Pgno iTrunk = get4byte(&p1[32]);
  Pgno nPage = btreePagecount(pBt);
  u32 nFree = get4byte(&p1[36]);

  assert( sqlite3PagerIsConcurrent(pPager) );
  assert( pBt->pMap );
  rc = sqlite3PagerUpgradeSnapshot(pPager, pPage1->pDbPage);
  assert( p1==pPage1->aData );

  if( rc==SQLITE_OK ){
    Pgno nHPage = get4byte(&p1[28]);
    Pgno nFin = nHPage;         /* Size of db after transaction merge */







<







3937
3938
3939
3940
3941
3942
3943

3944
3945
3946
3947
3948
3949
3950
  ** or freed by this transaction. In this case no special handling is 
  ** required. Otherwise, if page 1 is dirty, proceed.  */
  BtreePtrmap *pMap = pBt->pMap;
  Pgno iTrunk = get4byte(&p1[32]);
  Pgno nPage = btreePagecount(pBt);
  u32 nFree = get4byte(&p1[36]);


  assert( pBt->pMap );
  rc = sqlite3PagerUpgradeSnapshot(pPager, pPage1->pDbPage);
  assert( p1==pPage1->aData );

  if( rc==SQLITE_OK ){
    Pgno nHPage = get4byte(&p1[28]);
    Pgno nFin = nHPage;         /* Size of db after transaction merge */
4097
4098
4099
4100
4101
4102
4103
4104


4105

4106
4107
4108
4109
4110
4111
4112

    /* Set the current transaction state to TRANS_NONE and unlock the 
    ** pager if this call closed the only read or write transaction.  */
    p->inTrans = TRANS_NONE;
    unlockBtreeIfUnused(pBt);
  }

  /* If this was an CONCURRENT transaction, delete the pBt->pMap object */


  btreePtrmapDelete(pBt);

  btreeIntegrity(p);
}

/*
** Commit the transaction currently in progress.
**
** This routine implements the second phase of a 2-phase commit.  The







|
>
>

>







4100
4101
4102
4103
4104
4105
4106
4107
4108
4109
4110
4111
4112
4113
4114
4115
4116
4117
4118

    /* Set the current transaction state to TRANS_NONE and unlock the 
    ** pager if this call closed the only read or write transaction.  */
    p->inTrans = TRANS_NONE;
    unlockBtreeIfUnused(pBt);
  }

  /* If this was an CONCURRENT transaction, delete the pBt->pMap object.
  ** Also call PagerEndConcurrent() to ensure that the pager has discarded
  ** the record of all pages read within the transaction.  */
  btreePtrmapDelete(pBt);
  sqlite3PagerEndConcurrent(pBt->pPager);
  btreeIntegrity(p);
}

/*
** Commit the transaction currently in progress.
**
** This routine implements the second phase of a 2-phase commit.  The
Changes to src/pager.c.
1737
1738
1739
1740
1741
1742
1743






















1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
      testcase( rc==SQLITE_NOMEM );
      assert( rc==SQLITE_OK || rc==SQLITE_NOMEM );
    }
  }
  return rc;
}























/*
** Free the Pager.pInJournal and Pager.pAllRead bitvec objects.
*/
static void pagerFreeBitvecs(Pager *pPager){
  sqlite3BitvecDestroy(pPager->pInJournal);
  pPager->pInJournal = 0;
#ifdef SQLITE_ENABLE_CONCURRENT
  sqlite3BitvecDestroy(pPager->pAllRead);
  pPager->pAllRead = 0;
#endif
}

/*
** This function is a no-op if the pager is in exclusive mode and not
** in the ERROR state. Otherwise, it switches the pager to PAGER_OPEN
** state.
**







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>






<
<
|
<







1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771


1772

1773
1774
1775
1776
1777
1778
1779
      testcase( rc==SQLITE_NOMEM );
      assert( rc==SQLITE_OK || rc==SQLITE_NOMEM );
    }
  }
  return rc;
}

#ifdef SQLITE_ENABLE_CONCURRENT
int sqlite3PagerBeginConcurrent(Pager *pPager){
  int rc = SQLITE_OK;
  if( pPager->pAllRead==0 ){
    pPager->pAllRead = sqlite3BitvecCreate(pPager->dbSize);
    if( pPager->pAllRead==0 ){
      rc = SQLITE_NOMEM;
    }
  }
  return rc;
}

void sqlite3PagerEndConcurrent(Pager *pPager){
  sqlite3BitvecDestroy(pPager->pAllRead);
  pPager->pAllRead = 0;
}

int sqlite3PagerIsWal(Pager *pPager){
  return pPager->pWal!=0;
}
#endif

/*
** Free the Pager.pInJournal and Pager.pAllRead bitvec objects.
*/
static void pagerFreeBitvecs(Pager *pPager){
  sqlite3BitvecDestroy(pPager->pInJournal);
  pPager->pInJournal = 0;


  sqlite3PagerEndConcurrent(pPager);

}

/*
** This function is a no-op if the pager is in exclusive mode and not
** in the ERROR state. Otherwise, it switches the pager to PAGER_OPEN
** state.
**
5608
5609
5610
5611
5612
5613
5614
5615
5616
5617
5618
5619
5620
5621
5622
5623
5624
5625
5626
5627
5628
5629
5630
5631
5632
5633
5634
5635
5636
5637
5638
5639
5640
5641
5642
5643
5644
5645
5646
5647
5648
5649
5650
5651

  if( pPager->errCode ) return pPager->errCode;
  assert( pPager->eState>=PAGER_READER && pPager->eState<PAGER_ERROR );
  pPager->subjInMemory = (u8)subjInMemory;

  if( ALWAYS(pPager->eState==PAGER_READER) ){
    assert( pPager->pInJournal==0 );
#ifdef SQLITE_ENABLE_CONCURRENT
    assert( pPager->pAllRead==0 );
#endif

    if( pagerUseWal(pPager) ){
      /* If the pager is configured to use locking_mode=exclusive, and an
      ** exclusive lock on the database is not already held, obtain it now.
      */
      if( pPager->exclusiveMode && sqlite3WalExclusiveMode(pPager->pWal, -1) ){
        rc = pagerLockDb(pPager, EXCLUSIVE_LOCK);
        if( rc!=SQLITE_OK ){
          return rc;
        }
        sqlite3WalExclusiveMode(pPager->pWal, 1);
      }

      /* Grab the write lock on the log file. If successful, upgrade to
      ** PAGER_RESERVED state. Otherwise, return an error code to the caller.
      ** The busy-handler is not invoked if another connection already
      ** holds the write-lock. If possible, the upper layer will call it.
      */
#ifdef SQLITE_ENABLE_CONCURRENT
      if( exFlag<0 ){
        pPager->pAllRead = sqlite3BitvecCreate(pPager->dbSize);
        if( pPager->pAllRead==0 ){
          rc = SQLITE_NOMEM;
        }
      }else
#endif
      {
        rc = sqlite3WalBeginWriteTransaction(pPager->pWal);
      }
    }else{
      /* Obtain a RESERVED lock on the database file. If the exFlag parameter
      ** is true, then immediately upgrade this to an EXCLUSIVE lock. The
      ** busy-handler callback can be used when upgrading to the EXCLUSIVE
      ** lock, but not when obtaining the RESERVED lock.







<
<
<
<















|
<
<
|
<
<
<
<
<
<
<







5627
5628
5629
5630
5631
5632
5633




5634
5635
5636
5637
5638
5639
5640
5641
5642
5643
5644
5645
5646
5647
5648
5649


5650







5651
5652
5653
5654
5655
5656
5657

  if( pPager->errCode ) return pPager->errCode;
  assert( pPager->eState>=PAGER_READER && pPager->eState<PAGER_ERROR );
  pPager->subjInMemory = (u8)subjInMemory;

  if( ALWAYS(pPager->eState==PAGER_READER) ){
    assert( pPager->pInJournal==0 );




    if( pagerUseWal(pPager) ){
      /* If the pager is configured to use locking_mode=exclusive, and an
      ** exclusive lock on the database is not already held, obtain it now.
      */
      if( pPager->exclusiveMode && sqlite3WalExclusiveMode(pPager->pWal, -1) ){
        rc = pagerLockDb(pPager, EXCLUSIVE_LOCK);
        if( rc!=SQLITE_OK ){
          return rc;
        }
        sqlite3WalExclusiveMode(pPager->pWal, 1);
      }

      /* Grab the write lock on the log file. If successful, upgrade to
      ** PAGER_RESERVED state. Otherwise, return an error code to the caller.
      ** The busy-handler is not invoked if another connection already
      ** holds the write-lock. If possible, the upper layer will call it.  */


      if( exFlag>=0 ){







        rc = sqlite3WalBeginWriteTransaction(pPager->pWal);
      }
    }else{
      /* Obtain a RESERVED lock on the database file. If the exFlag parameter
      ** is true, then immediately upgrade this to an EXCLUSIVE lock. The
      ** busy-handler callback can be used when upgrading to the EXCLUSIVE
      ** lock, but not when obtaining the RESERVED lock.
6180
6181
6182
6183
6184
6185
6186
6187
6188
6189
6190
6191
6192
6193
6194
6195
6196
6197
6198
6199
6200
/*
** Set the in-memory cache of the database file size to nSz pages.
*/
void sqlite3PagerSetDbsize(Pager *pPager, Pgno nSz){
  pPager->dbSize = nSz;
}

/*
** Return true if this pager is currently within an CONCURRENT transaction.
*/
int sqlite3PagerIsConcurrent(Pager *pPager){
  return pPager->pAllRead!=0;
}

/*
** If this is a WAL mode connection and the WRITER lock is currently held,
** relinquish it.
*/
void sqlite3PagerDropExclusiveLock(Pager *pPager){
  if( pagerUseWal(pPager) ){
    sqlite3WalEndWriteTransaction(pPager->pWal);







<
<
<
<
<
<
<







6186
6187
6188
6189
6190
6191
6192







6193
6194
6195
6196
6197
6198
6199
/*
** Set the in-memory cache of the database file size to nSz pages.
*/
void sqlite3PagerSetDbsize(Pager *pPager, Pgno nSz){
  pPager->dbSize = nSz;
}








/*
** If this is a WAL mode connection and the WRITER lock is currently held,
** relinquish it.
*/
void sqlite3PagerDropExclusiveLock(Pager *pPager){
  if( pagerUseWal(pPager) ){
    sqlite3WalEndWriteTransaction(pPager->pWal);
Changes to src/pager.h.
190
191
192
193
194
195
196

197
198
199
200
201





202
203
204
205
206
207
208
int sqlite3SectorSize(sqlite3_file *);

/* Functions used to truncate the database file. */
void sqlite3PagerTruncateImage(Pager*,Pgno);

void sqlite3PagerRekey(DbPage*, Pgno, u16);


void sqlite3PagerDropExclusiveLock(Pager*);
int sqlite3PagerIsConcurrent(Pager*);
int sqlite3PagerIswriteable(DbPage*);
int sqlite3PagerUpgradeSnapshot(Pager *pPager, DbPage*);
void sqlite3PagerSetDbsize(Pager *pPager, Pgno);






#if defined(SQLITE_HAS_CODEC) && !defined(SQLITE_OMIT_WAL)
void *sqlite3PagerCodec(DbPage *);
#endif

/* Functions to support testing and debugging. */
#if !defined(NDEBUG) || defined(SQLITE_TEST)







>
|
|
|


>
>
>
>
>







190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
int sqlite3SectorSize(sqlite3_file *);

/* Functions used to truncate the database file. */
void sqlite3PagerTruncateImage(Pager*,Pgno);

void sqlite3PagerRekey(DbPage*, Pgno, u16);

#ifdef SQLITE_ENABLE_CONCURRENT
void sqlite3PagerEndConcurrent(Pager*);
int sqlite3PagerBeginConcurrent(Pager*);
void sqlite3PagerDropExclusiveLock(Pager*);
int sqlite3PagerUpgradeSnapshot(Pager *pPager, DbPage*);
void sqlite3PagerSetDbsize(Pager *pPager, Pgno);
#else
# define sqlite3PagerEndConcurrent(x)
#endif

int sqlite3PagerIswriteable(DbPage*);

#if defined(SQLITE_HAS_CODEC) && !defined(SQLITE_OMIT_WAL)
void *sqlite3PagerCodec(DbPage *);
#endif

/* Functions to support testing and debugging. */
#if !defined(NDEBUG) || defined(SQLITE_TEST)
Changes to test/concurrent2.test.
422
423
424
425
426
427
428

























429
430
431
432
    }
  } {}
  do_test 9.$tn.4.2 {
    sql2 { DELETE FROM pp WHERE i=1 }
    list [catch { sql1 COMMIT } msg] $msg
  } {0 {}}
}



























finish_test








>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>




422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
    }
  } {}
  do_test 9.$tn.4.2 {
    sql2 { DELETE FROM pp WHERE i=1 }
    list [catch { sql1 COMMIT } msg] $msg
  } {0 {}}
}

do_multiclient_test tn {
  do_test 10.$tn.1 {
    sql1 {
      PRAGMA journal_mode = wal;
      CREATE TABLE t1(a);
      CREATE TABLE t2(b);
      INSERT INTO t1 VALUES(1), (2), (3);
      INSERT INTO t2 VALUES(1), (2), (3);
    }
  } {wal}

  do_test 10.$tn.2 {
    sql1 {
      BEGIN CONCURRENT;
        SELECT * FROM t1;
        INSERT INTO t2 VALUES(4);
    }
  } {1 2 3}

  do_test 10.$tn.3 {
    sql2 { INSERT INTO t1 VALUES(4) }
    list [catch {sql1 COMMIT} msg] $msg
  } {1 {database is locked}}
}


finish_test