SQLite4
Check-in [ef0cf1ade3]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add code for taking and querying read-locks. Untested.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: ef0cf1ade397e34127a927c805e058fe4fde1cf0
User & Date: dan 2013-10-28 19:48:35
Context
2013-10-29
17:46
Add code to prevent database writers from overwriting portions of the log that might be required by present or future database readers or recoverers. check-in: 407a82adbf user: dan tags: trunk
2013-10-28
19:48
Add code for taking and querying read-locks. Untested. check-in: ef0cf1ade3 user: dan tags: trunk
10:56
Fix a problem causing builds without LSM_NO_SYNC to fail. check-in: f8ce90c4a6 user: dan tags: trunk
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/btInt.h.

   193    193   int sqlite4BtLogSnapshotOpen(BtLog*);
   194    194   int sqlite4BtLogSnapshotClose(BtLog*);
   195    195   int sqlite4BtLogSnapshotWritable(BtLog*);
   196    196   
   197    197   int sqlite4BtLogSize(BtLog*);
   198    198   int sqlite4BtLogCheckpoint(BtLog*);
   199    199   
          200  +int sqlite4BtLogFrameToIdx(u32 *aLog, u32 iFrame);
          201  +
   200    202   #ifndef NDEBUG
   201    203   void sqlite4BtDebugReadPage(u32 pgno, u8 *aData, int pgsz);
   202    204   #else
   203    205   # define sqlite4BtDebugReadPage(a,b,c)
   204    206   #endif
   205    207   
   206    208   /*
................................................................................
   208    210   *************************************************************************/
   209    211   
   210    212   /*************************************************************************
   211    213   ** Interface to bt_lock.c functionality.
   212    214   */
   213    215   typedef struct BtShared BtShared;
   214    216   typedef struct BtLock BtLock;
          217  +typedef struct BtReadSlot BtReadSlot;
   215    218   struct BtLock {
   216    219     /* These three are set by the bt_pager module and thereafter used by 
   217    220     ** the bt_lock, bt_pager and bt_log modules. */
   218    221     sqlite4_env *pEnv;              /* SQLite environment */
   219    222     bt_env *pVfs;                   /* Bt environment */
   220    223     bt_file *pFd;                   /* Database file descriptor */
   221    224   
   222    225     /* These are used only by the bt_lock module. */
   223    226     BtShared *pShared;              /* Shared by all handles on this file */
   224    227     BtLock *pNext;                  /* Next connection using pShared */
   225    228     u32 mExclLock;                  /* Mask of exclusive locks held */
   226    229     u32 mSharedLock;                /* Mask of shared locks held */
   227    230   };
          231  +
          232  +struct BtReadSlot {
          233  +  u32 iFirst;
          234  +  u32 iLast;
          235  +};
   228    236   
   229    237   /* Connect and disconnect procedures */
   230    238   int sqlite4BtLockConnect(BtLock*, int (*xRecover)(BtLock*));
   231    239   int sqlite4BtLockDisconnect(BtLock*, int(*xCkpt)(BtLock*), int(*xDel)(BtLock*));
   232    240   
   233    241   /* Obtain and release the WRITER lock */
   234    242   int sqlite4BtLockWriter(BtLock*);
   235    243   int sqlite4BtLockWriterUnlock(BtLock*);
   236    244   
   237         -/* Obtain, release and query READER locks.  */
   238         -int sqlite4BtLockReader(BtLock*, u32 *aLog, u32 *aLock);
   239         -int sqlite4BtLockReaderUnlock(BtLock*);
   240         -int sqlite4BtLockReaderMin(BtLock*, u32 *aLog, u32 *aLock, u32 *piMinFrame);
   241         -
   242    245   /* Obtain and release CHECKPOINTER lock */
   243    246   int sqlite4BtLockCkpt(BtLock*);
   244    247   int sqlite4BtLockCkptUnlock(BtLock*);
   245    248   
          249  +/* Obtain and release READER locks.  */
          250  +int sqlite4BtLockReader(BtLock*, u32 *aLog, BtReadSlot *aLock);
          251  +int sqlite4BtLockReaderUnlock(BtLock*);
          252  +
          253  +/* Query READER locks.  */
          254  +int sqlite4BtLockReaderQuery(BtLock*, u32*, BtReadSlot*, u32*, int*);
          255  +
   246    256   /* Obtain pointers to shared-memory chunks */
   247    257   int sqlite4BtLockShmMap(BtLock*, int iChunk, int nByte, u8 **ppOut);
   248    258   
   249    259   /*
   250    260   ** End of bt_lock.c interface.
   251    261   *************************************************************************/
   252    262   

Changes to src/bt_lock.c.

    15     15   #include "sqliteInt.h"
    16     16   #include "btInt.h"
    17     17   
    18     18   #include <string.h>
    19     19   #include <assert.h>
    20     20   #include <stdio.h>
    21     21   
    22         -#define BT_LOCK_DMS1       0      /* DMS1 */
    23         -#define BT_LOCK_DMS2_RW    1      /* DMS2/rw */
    24         -#define BT_LOCK_DMS2_RO    2      /* DMS2/ro */
    25         -#define BT_LOCK_WRITER     3      /* WRITER lock */
    26         -#define BT_LOCK_CKPTER     4      /* CHECKPOINTER lock */
    27         -#define BT_LOCK_READER0    5      /* Array of BT_NREADER locks */
           22  +#define BT_LOCK_DMS1          0   /* DMS1 */
           23  +#define BT_LOCK_DMS2_RW       1   /* DMS2/rw */
           24  +#define BT_LOCK_DMS2_RO       2   /* DMS2/ro */
           25  +#define BT_LOCK_WRITER        3   /* WRITER lock */
           26  +#define BT_LOCK_CKPTER        4   /* CHECKPOINTER lock */
           27  +#define BT_LOCK_READER_DBONLY 5   /* Reading the db file only */
           28  +#define BT_LOCK_READER0       6   /* Array of BT_NREADER locks */
    28     29   
    29     30   #define BT_LOCK_UNLOCK     0
    30     31   #define BT_LOCK_SHARED     1
    31     32   #define BT_LOCK_EXCL       2
    32     33   
    33     34   typedef struct BtFile BtFile;
    34     35   
................................................................................
   330    331       sqlite4_free(p->pEnv, pShared->apShmChunk);
   331    332       sqlite4_free(p->pEnv, pShared);
   332    333     }
   333    334     btLockMutexLeave();
   334    335     return rc;
   335    336   }
   336    337   
   337         -int sqlite4BtLockCheckpoint(BtLock *p, int (*xCkpt)(BtLock*)){
   338         -  return xCkpt(p);
   339         -}
   340         -
   341         -int sqlite4BtLockBegin(BtLock *p, int eLock){
   342         -  return SQLITE4_OK;
   343         -}
   344         -
   345         -int sqlite4BtLockEnd(BtLock *p, int eLock){
   346         -  return SQLITE4_OK;
   347         -}
   348         -
   349    338   /* 
   350    339   ** Obtain a READER lock. 
   351    340   **
   352    341   ** Argument aLog points to an array of 6 frame addresses. These are the 
   353    342   ** first and last frames in each of log regions A, B and C. Argument 
   354    343   ** aLock points to the array of read-lock slots in shared memory.
   355    344   */
   356         -int sqlite4BtLockReader(BtLock *pLock, u32 *aLog, u32 *aLock){
   357         -  /* todo... */
   358         -  return SQLITE4_OK;
          345  +int sqlite4BtLockReader(
          346  +  BtLock *pLock,                  /* Lock module handle */
          347  +  u32 *aLog,                      /* Current log file topology */
          348  +  BtReadSlot *aSlot               /* Array of read-lock slots (in shmem) */
          349  +){
          350  +  int rc = SQLITE4_BUSY;          /* Return code */
          351  +  int i;                          /* Loop counter */
          352  +
          353  +  /* Find the first and last log frames that this client will use. */
          354  +  u32 iLast = aLog[5];
          355  +  u32 iFirst = 0;
          356  +  for(i=0; iFirst==0 && i<3; i++){
          357  +    iFirst = aLog[i*2];
          358  +  }
          359  +
          360  +  if( iFirst==0 ){
          361  +    rc = btLockLockop(pLock, BT_LOCK_READER_DBONLY, BT_LOCK_SHARED, 0);
          362  +  }else{
          363  +    int nAttempt = 100;           /* Remaining lock attempts */
          364  +
          365  +    while( rc==SQLITE4_BUSY && (nAttempt--)>0 ){
          366  +
          367  +      /* Try to find a slot populated with the values required. */
          368  +      for(i=0; i<BT_NREADER; i++){
          369  +        if( aSlot[i].iFirst==iFirst && aSlot[i].iLast==iLast ) break;
          370  +      }
          371  +
          372  +      /* Or, if there is no slot with the required values - try to create one */
          373  +      if( i==BT_NREADER ){
          374  +        for(i=0; i<BT_NREADER; i++){
          375  +          rc = btLockLockop(pLock, BT_LOCK_READER0 + i, BT_LOCK_EXCL, 0);
          376  +          if( rc==SQLITE4_OK ){
          377  +            /* The EXCLUSIVE lock obtained by the successful call to
          378  +             ** btLockLockop() is released below by the call to obtain
          379  +             ** a SHARED lock on the same locking slot. */
          380  +            aSlot[i].iFirst = iFirst;
          381  +            aSlot[i].iLast = iLast;
          382  +            break;
          383  +          }else if( rc!=SQLITE4_BUSY ){
          384  +            return rc;
          385  +          }
          386  +        }
          387  +      }
          388  +
          389  +      if( i==BT_NREADER ){
          390  +        /* TODO: Try to find a usable slot */
          391  +      }
          392  +
          393  +      if( i<BT_NREADER ){
          394  +        rc = btLockLockop(pLock, BT_LOCK_READER0 + i, BT_LOCK_SHARED, 0);
          395  +        if( rc==SQLITE4_OK ){
          396  +          if( aSlot[i].iFirst!=iFirst || aSlot[i].iLast!=iLast ){
          397  +            btLockLockop(pLock, BT_LOCK_READER0 + i, BT_LOCK_UNLOCK, 0);
          398  +            rc = SQLITE4_BUSY;
          399  +          }
          400  +        }
          401  +      }
          402  +    }
          403  +  }
          404  +
          405  +  return rc;
   359    406   }
   360    407   
   361    408   /*
   362    409   ** Release the READER lock currently held by connection pLock.
   363    410   */
   364    411   int sqlite4BtLockReaderUnlock(BtLock *pLock){
          412  +  int i;
          413  +
          414  +  /* Release any locks held on reader slots. */
          415  +  assert( (BT_LOCK_READER_DBONLY+1)==BT_LOCK_READER0 );
          416  +  for(i=0; i<BT_NREADER+1; i++){
          417  +    btLockLockop(pLock, BT_LOCK_READER_DBONLY + i, BT_LOCK_UNLOCK, 0);
          418  +  }
          419  +
          420  +  return SQLITE4_OK;
          421  +}
          422  +
          423  +/*
          424  +** This function is used to determine which parts of the log and database
          425  +** files are currently in use by readers. It is called in two scenarios:
          426  +**
          427  +**   * by CHECKPOINTER clients, to determine how much of the log may
          428  +**     be safely copied into the database file. In this case parameter
          429  +**     piDblocked is non-NULL.
          430  +**
          431  +**   * by WRITER clients, to determine how much of the log is no longer
          432  +**     required by any present or future reader. This case can be identified
          433  +**     by (piDblocked==NULL).
          434  +*/
          435  +int sqlite4BtLockReaderQuery(
          436  +  BtLock *pLock,                  /* Lock handle */
          437  +  u32 *aLog,                      /* Current log topology */
          438  +  BtReadSlot *aSlot,              /* Array of BT_NREADER read slots */
          439  +  u32 *piOut,                     /* OUT: Query result */
          440  +  int *piDblocked                 /* OUT: True if READER_DB_ONLY is locked */
          441  +){
          442  +  u32 iOut = 0;
          443  +  u32 iIdxOut = 0;
          444  +  int bLast = (piDblocked!=0);
          445  +  int rc = SQLITE4_OK;
          446  +  int i;
          447  +
          448  +  if( piDblocked ){
          449  +    rc = btLockLockop(pLock, BT_LOCK_READER_DBONLY, BT_LOCK_EXCL, 0);
          450  +    if( rc==SQLITE4_OK ){
          451  +      *piDblocked = 0;
          452  +      btLockLockop(pLock, BT_LOCK_READER_DBONLY, BT_LOCK_UNLOCK, 0);
          453  +    }else if( rc==SQLITE4_BUSY ){
          454  +      *piDblocked = 1;
          455  +    }else{
          456  +      return rc;
          457  +    }
          458  +  }
          459  +
          460  +  for(i=0; i<3 && iOut==0; i++){
          461  +    int iSlot;
          462  +    for(iSlot=0; iSlot<BT_NREADER; iSlot++){
          463  +      u32 iVal = (bLast ? aSlot[iSlot].iLast : aSlot[iSlot].iFirst);
          464  +      if( iVal ){
          465  +        /* Try to zero the slot. */
          466  +        rc = btLockLockop(pLock, BT_LOCK_READER0 + iSlot, BT_LOCK_EXCL, 0);
          467  +        if( rc==SQLITE4_OK ){
          468  +          aSlot[iSlot].iFirst = 0;
          469  +          aSlot[iSlot].iLast = 0;
          470  +          btLockLockop(pLock, BT_LOCK_READER0 + iSlot, BT_LOCK_UNLOCK, 0);
          471  +        }else if( rc==SQLITE4_BUSY ){
          472  +          int iIdx = sqlite4BtLogFrameToIdx(aLog, iVal);
          473  +          if( iOut==0 || iIdx<iIdxOut ){
          474  +            iIdxOut = iIdx;
          475  +            iOut = iVal;
          476  +          }
          477  +        }else{
          478  +          return rc;
          479  +        }
          480  +      }
          481  +    }
          482  +  }
          483  +
          484  +  *piOut = iOut;
   365    485     return SQLITE4_OK;
   366    486   }
   367    487   
   368    488   int sqlite4BtLockShmMap(BtLock *pLock, int iChunk, int nByte, u8 **ppOut){
   369    489     int rc = SQLITE4_OK;
   370    490     BtShared *pShared = pLock->pShared;
   371    491     u8 *pOut = 0;
................................................................................
   401    521     }
   402    522     sqlite4_mutex_leave(pShared->pClientMutex);
   403    523     
   404    524     *ppOut = pOut;
   405    525     return rc;
   406    526   }
   407    527   
          528  +/*
          529  +** Attempt to obtain the CHECKPOINTER lock. If the attempt is successful,
          530  +** return SQLITE4_OK. If the CHECKPOINTER lock cannot be obtained because
          531  +** it is held by some other connection, return SQLITE4_BUSY. 
          532  +**
          533  +** If any other error occurs, return an SQLite4 error code.
          534  +*/
   408    535   int sqlite4BtLockCkpt(BtLock *pLock){
   409    536     return btLockLockop(pLock, BT_LOCK_CKPTER, BT_LOCK_EXCL, 0);
   410    537   }
   411         -
   412    538   int sqlite4BtLockCkptUnlock(BtLock *pLock){
   413    539     return btLockLockop(pLock, BT_LOCK_CKPTER, BT_LOCK_UNLOCK, 0);
   414    540   }
          541  +
          542  +/*
          543  +** Attempt to obtain the WRITER lock. If the attempt is successful,
          544  +** return SQLITE4_OK. If the WRITER lock cannot be obtained because
          545  +** it is held by some other connection, return SQLITE4_BUSY. 
          546  +**
          547  +** If any other error occurs, return an SQLite4 error code.
          548  +*/
          549  +int sqlite4BtLockWriter(BtLock *pLock){
          550  +  return btLockLockop(pLock, BT_LOCK_WRITER, BT_LOCK_EXCL, 0);
          551  +}
          552  +int sqlite4BtLockWriterUnlock(BtLock *pLock){
          553  +  return btLockLockop(pLock, BT_LOCK_WRITER, BT_LOCK_UNLOCK, 0);
          554  +}
   415    555   

Changes to src/bt_log.c.

   105    105     u32 iFirstRecover;              /* First recovery frame */
   106    106   };
   107    107   
   108    108   struct BtShm {
   109    109     BtShmHdr hdr1;
   110    110     BtShmHdr hdr2;
   111    111     BtCkptHdr ckpt;
   112         -  u32 aReadlock[BT_NREADER];
          112  +  BtReadSlot aReadlock[BT_NREADER];
   113    113   };
   114    114   
   115    115   /* 
   116    116   ** Log handle used by bt_pager.c to access functionality implemented by
   117    117   ** this module. 
   118    118   */
   119    119   struct BtLog {
................................................................................
   816    816       }
   817    817     }
   818    818   
   819    819     return rc;
   820    820   }
   821    821   
   822    822   /*
   823         -** Attempt to read data for page pgno from the log file. If successful,
   824         -** the data is written into buffer aData[] (which must be at least as
   825         -** large as a database page). In this case SQLITE4_OK is returned.
   826         -**
   827         -** If the log does not contain any version of page pgno, SQLITE4_NOTFOUND
   828         -** is returned and the contents of buffer aData[] are not modified.
   829         -**
   830         -** If any other error occurs, an SQLite4 error code is returned. The final
   831         -** state of buffer aData[] is undefined in this case.
          823  +** If parameter iSafe is non-zero, then this function is being called as
          824  +** part of a checkpoint operation. In this case, if there exists a version
          825  +** of page pgno within the log at some point past frame iSafe, return
          826  +** SQLITE4_NOTFOUND.
   832    827   */
   833         -int sqlite4BtLogRead(BtLog *pLog, u32 pgno, u8 *aData){
          828  +int btLogRead(BtLog *pLog, u32 pgno, u8 *aData, u32 iSafe){
   834    829     const int pgsz = sqlite4BtPagerPagesize((BtPager*)(pLog->pLock));
   835    830     int rc = SQLITE4_NOTFOUND;
   836    831     u32 iFrame = 0;
   837    832     int i;
          833  +
          834  +  int bSeen = (iSafe==0);
   838    835   
   839    836     /* Loop through regions (c), (b) and (a) of the log file. In that order. */
   840    837     for(i=2; i>=0 && rc==SQLITE4_NOTFOUND; i--){
   841    838       u32 iLo = pLog->snapshot.aLog[i*2+0];
   842    839       u32 iHi = pLog->snapshot.aLog[i*2+1];
   843    840       int iSide;
   844    841       int iHash;
................................................................................
   846    843   
   847    844       iHash = btLogFrameHash(pLog, iHi);
   848    845       iHashLast = btLogFrameHash(pLog, iLo);
   849    846       iSide = (pLog->snapshot.iHashSide + (i==0)) % 2;
   850    847   
   851    848       for( ; rc==SQLITE4_NOTFOUND && iHash>=iHashLast; iHash--){
   852    849         rc = btLogHashSearch(pLog, iSide, iHash, iHi, pgno, &iFrame);
   853         -      if( rc==SQLITE4_OK && (iFrame<iLo || iFrame>iHi) ){
   854         -        rc = SQLITE4_NOTFOUND;
          850  +      if( rc==SQLITE4_OK ){
          851  +        if( iFrame<iLo || iFrame>iHi ){
          852  +          rc = SQLITE4_NOTFOUND;
          853  +        }else{
          854  +          if( iSafe>=iLo && iSafe<=iHi ){
          855  +            if( iFrame>iSafe ) return SQLITE4_NOTFOUND;
          856  +          }else if( bSeen==0 ){
          857  +            return SQLITE4_NOTFOUND;
          858  +          }
          859  +        }
   855    860         }
          861  +    }
          862  +    if( (iSafe>=iLo && iSafe<=iHi) ){
          863  +      bSeen = 1;
   856    864       }
   857    865     }
   858    866   
   859    867     if( rc==SQLITE4_OK ){
   860    868       bt_env *pVfs = pLog->pLock->pVfs;
   861    869       i64 iOff;
   862    870       assert( rc==SQLITE4_OK );
................................................................................
   867    875       fprintf(stderr, "read page %d from offset %d\n", (int)pgno, (int)iOff);
   868    876       fflush(stderr);
   869    877   #endif
   870    878     }
   871    879   
   872    880     return rc;
   873    881   }
          882  +
          883  +/*
          884  +** Attempt to read data for page pgno from the log file. If successful,
          885  +** the data is written into buffer aData[] (which must be at least as
          886  +** large as a database page). In this case SQLITE4_OK is returned.
          887  +**
          888  +** If the log does not contain any version of page pgno, SQLITE4_NOTFOUND
          889  +** is returned and the contents of buffer aData[] are not modified.
          890  +**
          891  +** If any other error occurs, an SQLite4 error code is returned. The final
          892  +** state of buffer aData[] is undefined in this case.
          893  +*/
          894  +int sqlite4BtLogRead(BtLog *pLog, u32 pgno, u8 *aData){
          895  +  return btLogRead(pLog, pgno, aData, 0);
          896  +}
   874    897   
   875    898   /*
   876    899   ** Write a frame to the log file.
   877    900   */
   878    901   int sqlite4BtLogWrite(BtLog *pLog, u32 pgno, u8 *aData, int bCommit){
   879    902     const int pgsz = sqlite4BtPagerPagesize((BtPager*)(pLog->pLock));
   880    903     int rc = SQLITE4_OK;
................................................................................
  1031   1054   
  1032   1055     while( rc==SQLITE4_NOTFOUND ){
  1033   1056       /* Attempt to read a copy of the BtShmHdr from shared-memory. */
  1034   1057       rc = btLogSnapshot(pLog, &pLog->snapshot);
  1035   1058   
  1036   1059       /* Take a read lock on the database */
  1037   1060       if( rc==SQLITE4_OK ){
  1038         -      u32 *aReadlock = btLogShm(pLog)->aReadlock;
         1061  +      BtReadSlot *aReadlock = btLogShm(pLog)->aReadlock;
  1039   1062         rc = sqlite4BtLockReader(pLog->pLock, pLog->snapshot.aLog, aReadlock);
  1040   1063       }
  1041   1064   
  1042   1065       /* Check that the BtShmHdr in shared-memory has not changed. If it has,
  1043   1066       ** drop the read-lock and re-attempt the entire operation. */
  1044   1067       if( rc==SQLITE4_OK ){
  1045   1068         rc = btLogSnapshot(pLog, &shmhdr);
................................................................................
  1124   1147         int nLeft = MIN(nMerge, nPgno-iLeft);
  1125   1148         u32 *aRight = &aPgno[iLeft+nMerge];
  1126   1149         int nRight = MIN(nMerge, nPgno-iLeft-nLeft);
  1127   1150         btLogMergeInplace(aLeft, nLeft, aRight, nRight, aSpace, pnPgno);
  1128   1151       }
  1129   1152     }
  1130   1153   }
         1154  +
         1155  +int sqlite4BtLogFrameToIdx(u32 *aLog, u32 iFrame){
         1156  +  int i;
         1157  +  int iRet = 0;
         1158  +  for(i=0; i<3; i++){
         1159  +    u32 iFirst = aLog[i*2];
         1160  +    u32 iLast = aLog[i*2+1];
         1161  +    if( iFirst ){
         1162  +      if( iFrame>=iFirst && iFrame<=iLast ){
         1163  +        iRet += (iFrame - iFirst);
         1164  +        return iRet;
         1165  +      }else{
         1166  +        iRet += (iLast - iFirst);
         1167  +      }
         1168  +    }
         1169  +  }
         1170  +  if( i==3 ) return -1;
         1171  +  return iRet;
         1172  +}
  1131   1173   
  1132   1174   /*
  1133   1175   ** Parameters iFirst and iLast are frame numbers for frames that are part 
  1134   1176   ** of the current log. This function scans the wal-index from iFirst to
  1135   1177   ** iLast (inclusive) and records the set of page numbers that occur once.
  1136   1178   ** This set is sorted in ascending order and returned via the output 
  1137   1179   ** variables *paPgno and *pnPgno.
  1138   1180   */
  1139   1181   static int btLogGatherPgno(
  1140   1182     BtLog *pLog,                    /* Log module handle */
  1141   1183     u32 **paPgno,                   /* OUT: s4_malloc'd array of sorted pgno */
  1142   1184     int *pnPgno,                    /* OUT: Number of entries in *paPgno */
  1143         -  u32 *piLastFrame
         1185  +  u32 *piLastFrame                /* OUT: Last frame checkpointed */
  1144   1186   ){
         1187  +  BtShm *pShm = btLogShm(pLog);
         1188  +  BtLock *pLock = pLog->pLock;
  1145   1189     u32 *aLog = pLog->snapshot.aLog;/* Log file topology */
  1146   1190     u32 i;
  1147   1191     u32 *aPgno;                     /* Returned array */
  1148   1192     int nPgno;                      /* Elements in aPgno[] */
  1149   1193     u32 *aSpace;                    /* Temporary space used by merge-sort */
  1150   1194     int nMax;
  1151   1195     int rc = SQLITE4_OK;
  1152   1196     int iRegion;
         1197  +  int bLocked;
         1198  +  u32 iSafe;                      /* Last frame in log it is safe to gather */
         1199  +
         1200  +  *paPgno = 0;
         1201  +  *pnPgno = 0;
         1202  +  *piLastFrame = 0;
         1203  +
         1204  +  rc = sqlite4BtLockReaderQuery(pLock, aLog, pShm->aReadlock, &iSafe, &bLocked);
         1205  +  if( rc!=SQLITE4_OK || bLocked ) return rc;
  1153   1206   
  1154   1207     /* Determine an upper limit on the number of distinct page numbers. This
  1155   1208     ** limit is used to allocate space for the returned array.  */
  1156   1209     nMax = 0;
  1157   1210     for(iRegion=0; iRegion<3; iRegion++){
  1158   1211       if( aLog[iRegion*2] ){
  1159   1212         nMax += 1 + aLog[iRegion*2+1] - aLog[iRegion*2+0];
................................................................................
  1167   1220     nPgno = 0;
  1168   1221   
  1169   1222     /* Copy the required page numbers into the allocated array */
  1170   1223     for(iRegion=0; iRegion<3; iRegion++){
  1171   1224       u32 iFirst = aLog[iRegion*2];
  1172   1225       u32 iLast = aLog[iRegion*2+1];
  1173   1226       if( iFirst ){
         1227  +      /* If the last frame that it is safe to gather is part of this 
         1228  +      ** region, gather no frames that lie beyond it.  */
         1229  +      if( iSafe>=iFirst && iSafe<=iLast ){
         1230  +        iLast = iSafe;
         1231  +      }
         1232  +
  1174   1233         for(i=iFirst; rc==SQLITE4_OK && i<=iLast; i++){
  1175   1234           int iHash = btLogFrameHash(pLog, i);
  1176   1235           u32 *aPage;
  1177   1236           ht_slot *aHash;
  1178   1237           u32 iZero;
  1179   1238   
  1180   1239           /* It doesn't matter which 'side' of the hash table is requested here,
................................................................................
  1183   1242           ** the second argument to btLogFindHash().  */
  1184   1243           rc = btLogFindHash(pLog, 0, iHash, &aHash, &aPage, &iZero);
  1185   1244           if( rc==SQLITE4_OK ){
  1186   1245             aPgno[nPgno++] = aPage[i-iZero];
  1187   1246           }
  1188   1247         }
  1189   1248         *piLastFrame = iLast;
         1249  +
         1250  +      /* If the last frame of this region is the last frame that it is
         1251  +      ** safe to gather, break out of the loop.  */
         1252  +      if( iLast==iSafe ) break;
  1190   1253       }
  1191   1254     }
  1192   1255   
  1193   1256     /* Sort the contents of the array in ascending order. This step also 
  1194   1257     ** eliminates any  duplicate page numbers. */
  1195   1258     if( rc==SQLITE4_OK ){
  1196   1259       btLogMergeSort(aPgno, &nPgno, aSpace);
................................................................................
  1247   1310         rc = btLogReadData(pLog, iOff, (u8*)&fhdr, sizeof(BtFrameHdr));
  1248   1311         iFirstRead = (fhdr.ctrl & ~BT_FRAME_COMMIT);
  1249   1312       }
  1250   1313   
  1251   1314       /* Copy data from the log file to the database file. */
  1252   1315       for(i=0; rc==SQLITE4_OK && i<nPgno; i++){
  1253   1316         u32 pgno = aPgno[i];
  1254         -      rc = sqlite4BtLogRead(pLog, pgno, aBuf);
         1317  +      rc = btLogRead(pLog, pgno, aBuf, iLast);
  1255   1318         if( rc==SQLITE4_OK ){
  1256   1319           btDebugCkptPage(pgno, aBuf, pgsz);
  1257   1320           i64 iOff = (i64)pgsz * (pgno-1);
  1258   1321           rc = pVfs->xWrite(pFd, iOff, aBuf, pgsz);
         1322  +      }else if( rc==SQLITE4_NOTFOUND ){
         1323  +        rc = SQLITE4_OK;
  1259   1324         }
  1260   1325       }
  1261   1326   
  1262   1327       /* Update the first field of the checkpoint-header. This tells readers
  1263   1328       ** that they need not consider anything that in the log before this
  1264   1329       ** point (since the data has already been copied into the database
  1265   1330       ** file).  */
................................................................................
  1303   1368   
  1304   1369       /* Free the buffer and drop the checkpointer lock */
  1305   1370       sqlite4_free(pLock->pEnv, aBuf);
  1306   1371       sqlite4BtLockCkptUnlock(pLog->pLock);
  1307   1372     }
  1308   1373     return rc;
  1309   1374   }
  1310         -
  1311   1375