SQLite4
Check-in [3b2a50c089]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Instead of locking the CHECKPOINTER byte, have read-only connections take a SHARED lock on the ROTRANS byte when reading from a non-live db. Read-write connections may not recycle space within either the database or log files while such a lock is held, but may perform checkpoint operations.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | read-only-clients
Files: files | file ages | folders
SHA1: 3b2a50c089542e624f100670a9962c5ea4797087
User & Date: dan 2013-02-20 16:03:02
Context
2013-02-20
17:54
Add a test to verify that an lsm_close() that disconnects the last connection to a database flushes the in-memory tree regardless of the multi-process or use-log settings. Leaf check-in: 723d5f2f52 user: dan tags: read-only-clients
16:03
Instead of locking the CHECKPOINTER byte, have read-only connections take a SHARED lock on the ROTRANS byte when reading from a non-live db. Read-write connections may not recycle space within either the database or log files while such a lock is held, but may perform checkpoint operations. check-in: 3b2a50c089 user: dan tags: read-only-clients
2013-02-19
20:16
Add a test case for a read-only transaction outlasting an entire read-write session. And a fix. check-in: 3f53258219 user: dan tags: read-only-clients
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/lsmInt.h.

   139    139   /* Lock definitions. */
   140    140   #define LSM_LOCK_DMS1         1   /* Serialize connect/disconnect ops */
   141    141   #define LSM_LOCK_DMS2         2   /* Read-write connections */
   142    142   #define LSM_LOCK_DMS3         3   /* Read-only connections */
   143    143   #define LSM_LOCK_WRITER       4
   144    144   #define LSM_LOCK_WORKER       5
   145    145   #define LSM_LOCK_CHECKPOINTER 6
   146         -#define LSM_LOCK_READER(i)    ((i) + LSM_LOCK_CHECKPOINTER + 1)
          146  +#define LSM_LOCK_ROTRANS      7
          147  +#define LSM_LOCK_READER(i)    ((i) + LSM_LOCK_ROTRANS + 1)
   147    148   #define LSM_LOCK_RWCLIENT(i)  ((i) + LSM_LOCK_READER(LSM_LOCK_NREADER))
   148    149   
   149    150   /*
   150    151   ** Hard limit on the number of free-list entries that may be stored in 
   151    152   ** a checkpoint (the remainder are stored as a system record in the LSM).
   152    153   ** See also LSM_CONFIG_MAX_FREELIST.
   153    154   */
................................................................................
   553    554     u32 nWrite;                     /* Total number of pages written to disk */
   554    555   };
   555    556   #define LSM_INITIAL_SNAPSHOT_ID 11
   556    557   
   557    558   /*
   558    559   ** Functions from file "lsm_ckpt.c".
   559    560   */
   560         -int lsmCheckpointWrite(lsm_db *, int, int, u32 *);
          561  +int lsmCheckpointWrite(lsm_db *, int, u32 *);
   561    562   int lsmCheckpointLevels(lsm_db *, int, void **, int *);
   562    563   int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);
   563    564   
   564    565   int lsmCheckpointRecover(lsm_db *);
   565    566   int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **);
   566    567   
   567    568   int lsmCheckpointLoadWorker(lsm_db *pDb);
................................................................................
   845    846   int lsmDbDatabaseConnect(lsm_db*, const char *);
   846    847   void lsmDbDatabaseRelease(lsm_db *);
   847    848   
   848    849   int lsmBeginReadTrans(lsm_db *);
   849    850   int lsmBeginWriteTrans(lsm_db *);
   850    851   int lsmBeginFlush(lsm_db *);
   851    852   
          853  +int lsmDetectRoTrans(lsm_db *db, int *);
          854  +
   852    855   int lsmBeginWork(lsm_db *);
   853    856   void lsmFinishWork(lsm_db *, int, int *);
   854    857   
   855    858   int lsmFinishRecovery(lsm_db *);
   856    859   void lsmFinishReadTrans(lsm_db *);
   857    860   int lsmFinishWriteTrans(lsm_db *, int);
   858    861   int lsmFinishFlush(lsm_db *, int);
................................................................................
   891    894   /* Candidate values for the 3rd argument to lsmShmLock() */
   892    895   #define LSM_LOCK_UNLOCK 0
   893    896   #define LSM_LOCK_SHARED 1
   894    897   #define LSM_LOCK_EXCL   2
   895    898   
   896    899   int lsmShmCacheChunks(lsm_db *db, int nChunk);
   897    900   int lsmShmLock(lsm_db *db, int iLock, int eOp, int bBlock);
          901  +int lsmShmTestLock(lsm_db *db, int iLock, int nLock, int eOp);
   898    902   void lsmShmBarrier(lsm_db *db);
   899    903   
   900    904   #ifdef LSM_DEBUG
   901    905   void lsmShmHasLock(lsm_db *db, int iLock, int eOp);
   902    906   #else
   903    907   # define lsmShmHasLock(x,y,z)
   904    908   #endif

Changes to src/lsm_log.c.

   300    300   
   301    301   /*
   302    302   ** If possible, reclaim log file space. Log file space is reclaimed after
   303    303   ** a snapshot that points to the same data in the database file is synced
   304    304   ** into the db header.
   305    305   */
   306    306   static int logReclaimSpace(lsm_db *pDb){
   307         -  int rc = LSM_OK;
          307  +  int rc;
   308    308     int iMeta;
          309  +  int bRotrans;                   /* True if there exists some ro-trans */
          310  +
          311  +  /* Test if there exists some other connection with a read-only transaction
          312  +  ** open. If there does, then log file space may not be reclaimed.  */
          313  +  rc = lsmDetectRoTrans(pDb, &bRotrans);
          314  +  if( rc!=LSM_OK || bRotrans ) return rc;
   309    315   
   310    316     iMeta = (int)pDb->pShmhdr->iMetaPage;
   311    317     if( iMeta==1 || iMeta==2 ){
   312    318       DbLog *pLog = &pDb->treehdr.log;
   313    319       i64 iSyncedId;
   314    320   
   315    321       /* Read the snapshot-id of the snapshot stored on meta-page iMeta. Note
................................................................................
  1054   1060                 if( iPass==1 ){
  1055   1061                   if( pLog->aRegion[2].iStart==0 ){
  1056   1062                     assert( pLog->aRegion[1].iStart==0 );
  1057   1063                     pLog->aRegion[1].iEnd = reader.iOff;
  1058   1064                   }else{
  1059   1065                     assert( pLog->aRegion[0].iStart==0 );
  1060   1066                     pLog->aRegion[0].iStart = pLog->aRegion[2].iStart;
  1061         -                  pLog->aRegion[0].iEnd = reader.iOff - reader.buf.n+reader.iBuf;
         1067  +                  pLog->aRegion[0].iEnd = reader.iOff-reader.buf.n+reader.iBuf;
  1062   1068                   }
  1063   1069                   pLog->aRegion[2].iStart = iOff;
  1064   1070                 }else{
  1065   1071                   if( (nJump++)==2 ){
  1066   1072                     bEof = 1;
  1067   1073                   }
  1068   1074                 }

Changes to src/lsm_shared.c.

   284    284               bReadonly = 1;
   285    285               rc = LSM_OK;
   286    286             }
   287    287           }
   288    288   
   289    289           /* Write a checkpoint to disk. */
   290    290           if( rc==LSM_OK ){
   291         -          rc = lsmCheckpointWrite(pDb, (bReadonly==0), 1, 0);
          291  +          rc = lsmCheckpointWrite(pDb, (bReadonly==0), 0);
   292    292           }
   293    293   
   294    294           /* If the checkpoint was written successfully, delete the log file
   295    295           ** and, if possible, truncate the database file.  */
   296    296           if( rc==LSM_OK ){
          297  +          int bRotrans = 0;
   297    298             Database *p = pDb->pDatabase;
   298         -          if( bReadonly==0 ){
          299  +
          300  +          /* The log file may only be deleted if there are no read-only
          301  +          ** clients running rotrans transactions.  */
          302  +          rc = lsmDetectRoTrans(pDb, &bRotrans);
          303  +          if( rc==LSM_OK && bRotrans==0 ){
          304  +            lsmFsCloseAndDeleteLog(pDb->pFS);
          305  +          }
          306  +
          307  +          /* The database may only be truncated if there exist no read-only
          308  +          ** clients - either connected or running rotrans transactions. */
          309  +          if( bReadonly==0 && bRotrans==0 ){
   299    310               dbTruncateFile(pDb);
   300    311               if( p->pFile && p->bMultiProc ){
   301    312                 lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
   302    313               }
   303    314             }
   304    315           }
   305    316         }
................................................................................
   790    801       lsmLogMessage(pDb, 0, "lsmBlockAllocate(): "
   791    802           "snapshot-in-use: %lld (iSynced=%lld) (client-id=%lld)", 
   792    803           iInUse, iSynced, (pDb->iReader>=0 ? pDb->pClient->iId : 0)
   793    804       );
   794    805     }
   795    806   #endif
   796    807   
   797         -  /* Query the free block list for a suitable block */
   798         -  if( rc==LSM_OK ) rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet);
          808  +
          809  +  /* Unless there exists a read-only transaction (which prevents us from
          810  +  ** recycling any blocks regardless), query the free block list for a
          811  +  ** suitable block to reuse. 
          812  +  **
          813  +  ** It might seem more natural to check for a read-only transaction at
          814  +  ** the start of this function. However, it is better to wait until after
          815  +  ** the call to lsmCheckpointSynced() to do so.
          816  +  */
          817  +  if( rc==LSM_OK ){
          818  +    int bRotrans;
          819  +    rc = lsmDetectRoTrans(pDb, &bRotrans);
          820  +
          821  +    if( rc==LSM_OK && bRotrans==0 ){
          822  +      rc = findFreeblock(pDb, iInUse, (iBefore>0), &iRet);
          823  +    }
          824  +  }
   799    825   
   800    826     if( iBefore>0 && (iRet<=0 || iRet>=iBefore) ){
   801    827       iRet = 0;
   802    828   
   803    829     }else if( rc==LSM_OK ){
   804    830       /* If a block was found in the free block list, use it and remove it from 
   805    831       ** the list. Otherwise, if no suitable block was found, allocate one from
................................................................................
   870    896   ** database itself.
   871    897   **
   872    898   ** The WORKER lock must not be held when this is called. This is because
   873    899   ** this function may indirectly call fsync(). And the WORKER lock should
   874    900   ** not be held that long (in case it is required by a client flushing an
   875    901   ** in-memory tree to disk).
   876    902   */
   877         -int lsmCheckpointWrite(lsm_db *pDb, int bTruncate, int bDellog, u32 *pnWrite){
          903  +int lsmCheckpointWrite(lsm_db *pDb, int bTruncate, u32 *pnWrite){
   878    904     int rc;                         /* Return Code */
   879    905     u32 nWrite = 0;
   880    906   
   881    907     assert( pDb->pWorker==0 );
   882    908     assert( 1 || pDb->pClient==0 );
   883    909     assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );
   884    910   
................................................................................
   929    955         );
   930    956   #endif
   931    957       }
   932    958   
   933    959       if( rc==LSM_OK && bTruncate ){
   934    960         rc = lsmFsTruncateDb(pDb->pFS, (i64)nBlock*lsmFsBlockSize(pDb->pFS));
   935    961       }
   936         -    if( rc==LSM_OK && bDellog ){
   937         -      lsmFsCloseAndDeleteLog(pDb->pFS);
   938         -    }
   939    962     }
   940    963   
   941    964     lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
   942    965     if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite;
   943    966     return rc;
   944    967   }
   945    968   
................................................................................
  1188   1211     }
  1189   1212     if( rc!=LSM_OK ){
  1190   1213       dbReleaseReadlock(pDb);
  1191   1214     }
  1192   1215     if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;
  1193   1216     return rc;
  1194   1217   }
         1218  +
         1219  +/*
         1220  +** This function is used by a read-write connection to determine if there
         1221  +** are currently one or more read-only transactions open on the database
         1222  +** (in this context a read-only transaction is one opened by a read-only
         1223  +** connection on a non-live database).
         1224  +**
         1225  +** If no error occurs, LSM_OK is returned and *pbExist is set to true if
         1226  +** some other connection has a read-only transaction open, or false 
         1227  +** otherwise. If an error occurs an LSM error code is returned and the final
         1228  +** value of *pbExist is undefined.
         1229  +*/
         1230  +int lsmDetectRoTrans(lsm_db *db, int *pbExist){
         1231  +  int rc;
         1232  +
         1233  +  /* Only a read-write connection may use this function. */
         1234  +  assert( db->bReadonly==0 );
         1235  +
         1236  +  rc = lsmShmTestLock(db, LSM_LOCK_ROTRANS, 1, LSM_LOCK_EXCL);
         1237  +  if( rc==LSM_BUSY ){
         1238  +    *pbExist = 1;
         1239  +    rc = LSM_OK;
         1240  +  }else{
         1241  +    *pbExist = 0;
         1242  +  }
         1243  +
         1244  +  return rc;
         1245  +}
  1195   1246   
  1196   1247   /*
  1197   1248   ** db is a read-only database handle in the disconnected state. This function
  1198   1249   ** attempts to open a read-transaction on the database. This may involve
  1199   1250   ** connecting to the database system (opening shared memory etc.).
  1200   1251   */
  1201   1252   int lsmBeginRoTrans(lsm_db *db){
................................................................................
  1210   1261       rc = lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_SHARED, 0);
  1211   1262       if( rc!=LSM_OK ) return rc;
  1212   1263   
  1213   1264       rc = lsmShmTestLock(
  1214   1265           db, LSM_LOCK_RWCLIENT(0), LSM_LOCK_NREADER, LSM_LOCK_SHARED
  1215   1266       );
  1216   1267       if( rc==LSM_OK ){
  1217         -      /* System is not live */
  1218         -      rc = lsmShmLock(db, LSM_LOCK_CHECKPOINTER, LSM_LOCK_SHARED, 0);
         1268  +      /* System is not live. Take a SHARED lock on the ROTRANS byte and
         1269  +      ** release DMS1. Locking ROTRANS tells all read-write clients that they
         1270  +      ** may not recycle any disk space from within the database or log files,
         1271  +      ** as a read-only client may be using it.  */
         1272  +      rc = lsmShmLock(db, LSM_LOCK_ROTRANS, LSM_LOCK_SHARED, 0);
  1219   1273         lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
  1220   1274   
  1221   1275         if( rc==LSM_OK ){
  1222   1276           db->bRoTrans = 1;
  1223   1277           rc = lsmShmCacheChunks(db, 1);
  1224   1278           if( rc==LSM_OK ){
  1225   1279             db->pShmhdr = (ShmHeader *)db->apShm[0];
................................................................................
  1268   1322         lsmFree(pDb->pEnv, pDb->apShm[i]);
  1269   1323       }
  1270   1324       lsmFree(pDb->pEnv, pDb->apShm);
  1271   1325       pDb->apShm = 0;
  1272   1326       pDb->nShm = 0;
  1273   1327       pDb->pShmhdr = 0;
  1274   1328   
  1275         -    lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
         1329  +    lsmShmLock(pDb, LSM_LOCK_ROTRANS, LSM_LOCK_UNLOCK, 0);
  1276   1330     }
  1277   1331     dbReleaseReadlock(pDb);
  1278   1332   }
  1279   1333   
  1280   1334   /*
  1281   1335   ** Open a write transaction.
  1282   1336   */
................................................................................
  1883   1937   
  1884   1938   int lsm_checkpoint(lsm_db *pDb, int *pnKB){
  1885   1939     int rc;                         /* Return code */
  1886   1940     u32 nWrite = 0;                 /* Number of pages checkpointed */
  1887   1941   
  1888   1942     /* Attempt the checkpoint. If successful, nWrite is set to the number of
  1889   1943     ** pages written between this and the previous checkpoint.  */
  1890         -  rc = lsmCheckpointWrite(pDb, 0, 0, &nWrite);
         1944  +  rc = lsmCheckpointWrite(pDb, 0, &nWrite);
  1891   1945   
  1892   1946     /* If required, calculate the output variable (KB of data checkpointed). 
  1893   1947     ** Set it to zero if an error occurred.  */
  1894   1948     if( pnKB ){
  1895   1949       int nKB = 0;
  1896   1950       if( rc==LSM_OK && nWrite ){
  1897   1951         nKB = (((i64)nWrite * lsmFsPageSize(pDb->pFS)) + 1023) / 1024;

Changes to src/lsm_tree.c.

  1079   1079   ** is initialized here - it will be copied into shared memory if log file
  1080   1080   ** recovery is successful.
  1081   1081   */
  1082   1082   int lsmTreeInit(lsm_db *pDb){
  1083   1083     ShmChunk *pOne;
  1084   1084     int rc = LSM_OK;
  1085   1085   
         1086  +  memset(&pDb->treehdr, 0, sizeof(TreeHeader));
  1086   1087     pDb->treehdr.root.iTransId = 1;
  1087   1088     pDb->treehdr.iFirst = 1;
  1088   1089     pDb->treehdr.nChunk = 2;
  1089   1090     pDb->treehdr.iWrite = LSM_SHM_CHUNK_SIZE + LSM_SHM_CHUNK_HDR;
  1090   1091     pDb->treehdr.iNextShmid = 2;
  1091   1092     pDb->treehdr.iUsedShmid = 1;
  1092   1093   

Changes to test/lsm4.test.

   113    113     db info compression_id
   114    114   } $compression_id(rle)
   115    115   
   116    116   do_test 2.7 {
   117    117     db config {set_compression rle}
   118    118     list [db_fetch db 3] [db_fetch db 4]
   119    119   } {three four}
          120  +
          121  +#-------------------------------------------------------------------------
          122  +#
          123  +catch {db close}
          124  +forcedelete test.db
          125  +
          126  +do_test 3.1 {
          127  +  lsm_open db test.db
          128  +  db_fetch db abc
          129  +} {}
   120    130   
   121    131   finish_test
   122    132   

Changes to test/lsm5.test.

    25     25     set ret
    26     26   }
    27     27   
    28     28   # Create a new database with file name $file.
    29     29   #
    30     30   proc create_abc_db {file} {
    31     31     forcedelete $file
    32         -  lsm_open db $file
           32  +  lsm_open db $file {block_size 256}
    33     33     db write a alpha
    34     34     db write b bravo
    35     35     db write c charlie
    36     36     db close
    37     37   }
    38     38   
    39     39   proc create_abc_log {file} {
................................................................................
   189    189   do_test 4.2 {
   190    190     lsm_open db test.db {readonly 1}
   191    191     db csr_open T
   192    192     list [db_fetch db a] [db_fetch db b] [db_fetch db c]
   193    193   } {alpha bravo charlie}
   194    194   
   195    195   do_test 4.3 { 
   196         -  lsm_open db_rw test.db
          196  +  lsm_open db_rw test.db {block_size 64}
          197  +  db_rw write b BRAVO
   197    198     db_rw close
   198    199     list [file size test.db] [file size test.db-log]
   199         -} {0 56}
          200  +} {65536 74}
          201  +
          202  +do_test 4.4 {
          203  +  list [db_fetch db a] [db_fetch db b] [db_fetch db c]
          204  +} {alpha bravo charlie}
          205  +
          206  +do_test 4.5 {
          207  +  T close
          208  +  list [db_fetch db a] [db_fetch db b] [db_fetch db c]
          209  +} {alpha BRAVO charlie}
   200    210   
   201    211   finish_test
   202    212