SQLite4
Check-in [08cc3604cf]

Overview
Comment: Changes to allow read-only clients to safely work with live databases.
SHA1: 08cc3604cf95dcedd6276057019594214b2ff9c6
User & Date: dan 2013-02-19 19:35:33
Context
2013-02-19
20:16
Add a test case for a read-only transaction outlasting an entire read-write session. And a fix. check-in: 3f53258219 user: dan tags: read-only-clients
19:35
Changes to allow read-only clients to safely work with live databases. check-in: 08cc3604cf user: dan tags: read-only-clients
2013-02-18
19:46
Add support for read-only clients reading from dormant databases using the checkpointer lock. check-in: 45e4472618 user: dan tags: read-only-clients
Changes

Changes to lsm-test/lsmtest_tdb3.c.

   lsm_env *pRealEnv = tdb_lsm_env();
 
   if( iLock==2 && eType==LSM_LOCK_EXCL && p->pDb->bNoRecovery ){
     return LSM_BUSY;
   }
   return pRealEnv->xLock(p->pReal, iLock, eType);
 }
 
+static int testEnvTestLock(lsm_file *pFile, int iLock, int nLock, int eType){
+  LsmFile *p = (LsmFile *)pFile;
+  lsm_env *pRealEnv = tdb_lsm_env();
+
+  if( iLock==2 && eType==LSM_LOCK_EXCL && p->pDb->bNoRecovery ){
+    return LSM_BUSY;
+  }
+  return pRealEnv->xTestLock(p->pReal, iLock, nLock, eType);
+}
+
 static int testEnvShmMap(lsm_file *pFile, int iRegion, int sz, void **pp){
   LsmFile *p = (LsmFile *)pFile;
   lsm_env *pRealEnv = tdb_lsm_env();
   return pRealEnv->xShmMap(p->pReal, iRegion, sz, pp);
 }
 
................................................................................
   pDb->env.xSync = testEnvSync;
   pDb->env.xSectorSize = testEnvSectorSize;
   pDb->env.xRemap = testEnvRemap;
   pDb->env.xFileid = testEnvFileid;
   pDb->env.xClose = testEnvClose;
   pDb->env.xUnlink = testEnvUnlink;
   pDb->env.xLock = testEnvLock;
+  pDb->env.xTestLock = testEnvTestLock;
   pDb->env.xShmBarrier = testEnvShmBarrier;
   pDb->env.xShmMap = testEnvShmMap;
   pDb->env.xShmUnmap = testEnvShmUnmap;
   pDb->env.xSleep = testEnvSleep;
 
   rc = lsm_new(&pDb->env, &pDb->db);
   if( rc==LSM_OK ){
 
Changes to src/lsm.h.

   int (*xSync)(lsm_file *);
   int (*xSectorSize)(lsm_file *);
   int (*xRemap)(lsm_file *, lsm_i64, void **, lsm_i64*);
   int (*xFileid)(lsm_file *, void *pBuf, int *pnBuf);
   int (*xClose)(lsm_file *);
   int (*xUnlink)(lsm_env*, const char *);
   int (*xLock)(lsm_file*, int, int);
+  int (*xTestLock)(lsm_file*, int, int, int);
   int (*xShmMap)(lsm_file*, int, int, void **);
   void (*xShmBarrier)(void);
   int (*xShmUnmap)(lsm_file*, int);
   /****** memory allocation ****************************************/
   void *pMemCtx;
   void *(*xMalloc)(lsm_env*, int);            /* malloc(3) function */
   void *(*xRealloc)(lsm_env*, void *, int);   /* realloc(3) function */
 
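The new xTestLock method is the only change this check-in makes to the public lsm_env interface. For reference, a minimal sketch of a conforming implementation for a hypothetical single-process environment is shown below; it is illustrative only and not part of the check-in. The core consults xTestLock (via lsmEnvTestLock()) only when the database is shared between processes; locks held by other connections within the same process are checked separately, inside lsmShmTestLock().

/* Sketch: xTestLock for an lsm_env that never shares a database between
** processes (hypothetical). With no external processes there is no
** cross-process lock that could conflict, so the probe can always report
** "no conflict". */
static int envTestLock(lsm_file *pFile, int iLock, int nLock, int eType){
  (void)pFile; (void)iLock; (void)nLock; (void)eType;
  return LSM_OK;
}
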
Changes to src/lsmInt.h.

 /* The number of available read locks. */
 #define LSM_LOCK_NREADER   6
 
 /* The number of available read-write client locks. */
 #define LSM_LOCK_NRWCLIENT   16
 
 /* Lock definitions. */
-#define LSM_LOCK_DMS1         1
-#define LSM_LOCK_DMS2         2
-#define LSM_LOCK_WRITER       3
-#define LSM_LOCK_WORKER       4
-#define LSM_LOCK_CHECKPOINTER 5
+#define LSM_LOCK_DMS1         1   /* Serialize connect/disconnect ops */
+#define LSM_LOCK_DMS2         2   /* Read-write connections */
+#define LSM_LOCK_DMS3         3   /* Read-only connections */
+#define LSM_LOCK_WRITER       4
+#define LSM_LOCK_WORKER       5
+#define LSM_LOCK_CHECKPOINTER 6
 #define LSM_LOCK_READER(i)    ((i) + LSM_LOCK_CHECKPOINTER + 1)
 #define LSM_LOCK_RWCLIENT(i)  ((i) + LSM_LOCK_READER(LSM_LOCK_NREADER))
 
 /*
 ** Hard limit on the number of free-list entries that may be stored in 
 ** a checkpoint (the remainder are stored as a system record in the LSM).
 ** See also LSM_CONFIG_MAX_FREELIST.
................................................................................
   u32 nWrite;                     /* Total number of pages written to disk */
 };
 #define LSM_INITIAL_SNAPSHOT_ID 11
 
 /*
 ** Functions from file "lsm_ckpt.c".
 */
-int lsmCheckpointWrite(lsm_db *, int, u32 *);
+int lsmCheckpointWrite(lsm_db *, int, int, u32 *);
 int lsmCheckpointLevels(lsm_db *, int, void **, int *);
 int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal);
 
 int lsmCheckpointRecover(lsm_db *);
 int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **);
 
 int lsmCheckpointLoadWorker(lsm_db *pDb);
................................................................................
 int lsmInfoArrayStructure(lsm_db *pDb, int bBlock, Pgno iFirst, char **pzOut);
 int lsmInfoArrayPages(lsm_db *pDb, Pgno iFirst, char **pzOut);
 int lsmConfigMmap(lsm_db *pDb, int *piParam);
 
 int lsmEnvOpen(lsm_env *, const char *, int, lsm_file **);
 int lsmEnvClose(lsm_env *pEnv, lsm_file *pFile);
 int lsmEnvLock(lsm_env *pEnv, lsm_file *pFile, int iLock, int eLock);
+int lsmEnvTestLock(lsm_env *pEnv, lsm_file *pFile, int iLock, int nLock, int);
 
 int lsmEnvShmMap(lsm_env *, lsm_file *, int, int, void **); 
 void lsmEnvShmBarrier(lsm_env *);
 void lsmEnvShmUnmap(lsm_env *, lsm_file *, int);
 
 void lsmEnvSleep(lsm_env *, int);
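
Worked out numerically, the renumbered layout is: DMS1=1, DMS2=2, DMS3=3, WRITER=4, WORKER=5, CHECKPOINTER=6, with the READER and RWCLIENT ranges following on. The snippet below (illustrative, not part of the check-in) expands the macros above to confirm the slot assignments.

#include <assert.h>

#define LSM_LOCK_NREADER      6
#define LSM_LOCK_CHECKPOINTER 6
#define LSM_LOCK_READER(i)    ((i) + LSM_LOCK_CHECKPOINTER + 1)
#define LSM_LOCK_RWCLIENT(i)  ((i) + LSM_LOCK_READER(LSM_LOCK_NREADER))

int main(void){
  assert( LSM_LOCK_READER(0)==7 );     /* READER slots occupy 7..12 */
  assert( LSM_LOCK_READER(5)==12 );
  assert( LSM_LOCK_RWCLIENT(0)==13 );  /* RWCLIENT slots occupy 13..28 */
  assert( LSM_LOCK_RWCLIENT(15)==28 );
  return 0;
}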

Changes to src/lsm_ckpt.c.

   ShmHeader *pShm = pDb->pShmhdr;
   int nInt1;
   int nInt2;
 
   /* Must be holding the WORKER lock to do this. Or DMS2. */
   assert( 
       lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) 
-   || lsmShmAssertLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL) 
+   || lsmShmAssertLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL) 
   );
 
   /* Check that the two snapshots match. If not, repair them. */
   nInt1 = pShm->aSnap1[CKPT_HDR_NCKPT];
   nInt2 = pShm->aSnap2[CKPT_HDR_NCKPT];
   if( nInt1!=nInt2 || memcmp(pShm->aSnap1, pShm->aSnap2, nInt2*sizeof(u32)) ){
     if( ckptChecksumOk(pShm->aSnap1) ){

Changes to src/lsm_file.c.

   return pEnv->xRemap(pFile, szMin, ppMap, pszMap);
 }
 
 int lsmEnvLock(lsm_env *pEnv, lsm_file *pFile, int iLock, int eLock){
   if( pFile==0 ) return LSM_OK;
   return pEnv->xLock(pFile, iLock, eLock);
 }
 
+int lsmEnvTestLock(
+  lsm_env *pEnv, 
+  lsm_file *pFile, 
+  int iLock, 
+  int nLock, 
+  int eLock
+){
+  return pEnv->xTestLock(pFile, iLock, nLock, eLock);
+}
+
 int lsmEnvShmMap(
   lsm_env *pEnv, 
   lsm_file *pFile, 
   int iChunk, 
   int sz, 
   void **ppOut
................................................................................
 */
 int lsmFsTruncateLog(FileSystem *pFS, i64 nByte){
   if( pFS->fdLog==0 ) return LSM_OK;
   return lsmEnvTruncate(pFS->pEnv, pFS->fdLog, nByte);
 }
 
 /*
-** Truncate the log file to nByte bytes in size.
+** Truncate the db file to nByte bytes in size.
 */
 int lsmFsTruncateDb(FileSystem *pFS, i64 nByte){
   if( pFS->fdDb==0 ) return LSM_OK;
   return lsmEnvTruncate(pFS->pEnv, pFS->fdDb, nByte);
 }
 
 /*

Changes to src/lsm_shared.c.

 ** to as small a size as possible without truncating away any blocks that
 ** contain data.
 */
 static int dbTruncateFile(lsm_db *pDb){
   int rc;
 
   assert( pDb->pWorker==0 );
-  assert( lsmShmAssertLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL) );
+  assert( lsmShmAssertLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL) );
   rc = lsmCheckpointLoadWorker(pDb);
 
   if( rc==LSM_OK ){
     DbTruncateCtx ctx;
 
     /* Walk the database free-block-list in reverse order. Set ctx.nBlock
     ** to the block number of the last block in the database that actually
................................................................................
   pDb->pWorker = 0;
   return rc;
 }
 
 static void doDbDisconnect(lsm_db *pDb){
   int rc;
 
-  /* Block for an exclusive lock on DMS1. This lock serializes all calls
-  ** to doDbConnect() and doDbDisconnect() across all processes.  */
-  rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
-  if( rc==LSM_OK ){
-
-    /* Try an exclusive lock on DMS2. If successful, this is the last
-    ** connection to the database. In this case flush the contents of the
-    ** in-memory tree to disk and write a checkpoint.  */
-    rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL, 0);
-    if( rc==LSM_OK ){
-      /* Flush the in-memory tree, if required. If there is data to flush,
-      ** this will create a new client snapshot in Database.pClient. The
-      ** checkpoint (serialization) of this snapshot may be written to disk
-      ** by the following block.  
-      **
-      ** There is no need to mess around with WRITER locks or anything at
-      ** this point. The lock on DMS2 guarantees that pDb has exclusive
-      ** access to the db at this point.
-      */
-      rc = lsmTreeLoadHeader(pDb, 0);
-      if( rc==LSM_OK && (lsmTreeHasOld(pDb) || lsmTreeSize(pDb)>0) ){
-        rc = lsmFlushTreeToDisk(pDb);
-      }
-
-      /* Write a checkpoint to disk. */
-      if( rc==LSM_OK ){
-        rc = lsmCheckpointWrite(pDb, 1, 0);
-      }
-
-      /* If the checkpoint was written successfully, delete the log file
-      ** and, if possible, truncate the database file.  */
-      if( rc==LSM_OK ){
-        Database *p = pDb->pDatabase;
-
-        dbTruncateFile(pDb);
-        lsmFsCloseAndDeleteLog(pDb->pFS);
-
-        if( p->pFile && p->bMultiProc ) lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
-      }
-    }
-  }
-
-  if( pDb->iRwclient>=0 ){
-    lsmShmLock(pDb, LSM_LOCK_RWCLIENT(pDb->iRwclient), LSM_LOCK_UNLOCK, 0);
-  }
-
-  lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0);
-  lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
-
+  if( pDb->bReadonly ){
+    lsmShmLock(pDb, LSM_LOCK_DMS3, LSM_LOCK_UNLOCK, 0);
+  }else{
+    /* Block for an exclusive lock on DMS1. This lock serializes all calls
+    ** to doDbConnect() and doDbDisconnect() across all processes.  */
+    rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
+    if( rc==LSM_OK ){
+
+      /* Try an exclusive lock on DMS2. If successful, this is the last
+      ** connection to the database. In this case flush the contents of the
+      ** in-memory tree to disk and write a checkpoint.  */
+      rc = lsmShmTestLock(pDb, LSM_LOCK_DMS2, 1, LSM_LOCK_EXCL);
+      if( rc==LSM_OK ){
+        int bReadonly = 0;        /* True if there exist read-only conns. */
+
+        /* Flush the in-memory tree, if required. If there is data to flush,
+        ** this will create a new client snapshot in Database.pClient. The
+        ** checkpoint (serialization) of this snapshot may be written to disk
+        ** by the following block.  
+        **
+        ** There is no need to take a WRITER lock here. That there are no 
+        ** other locks on DMS2 guarantees that there are no other read-write
+        ** connections at this time (and the lock on DMS1 guarantees that
+        ** no new ones may appear).
+        */
+        rc = lsmTreeLoadHeader(pDb, 0);
+        if( rc==LSM_OK && (lsmTreeHasOld(pDb) || lsmTreeSize(pDb)>0) ){
+          rc = lsmFlushTreeToDisk(pDb);
+        }
+
+        /* Now check if there are any read-only connections. If there are,
+        ** then do not truncate the db file or unlink the shared-memory 
+        ** region.  */
+        if( rc==LSM_OK ){
+          rc = lsmShmTestLock(pDb, LSM_LOCK_DMS3, 1, LSM_LOCK_EXCL);
+          if( rc==LSM_BUSY ){
+            bReadonly = 1;
+            rc = LSM_OK;
+          }
+        }
+
+        /* Write a checkpoint to disk. */
+        if( rc==LSM_OK ){
+          rc = lsmCheckpointWrite(pDb, (bReadonly==0), 1, 0);
+        }
+
+        /* If the checkpoint was written successfully, delete the log file
+        ** and, if possible, truncate the database file.  */
+        if( rc==LSM_OK ){
+          Database *p = pDb->pDatabase;
+          if( bReadonly==0 ){
+            dbTruncateFile(pDb);
+
+            if( p->pFile && p->bMultiProc ){
+              lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
+            }
+          }
+        }
+      }
+    }
+
+    if( pDb->iRwclient>=0 ){
+      lsmShmLock(pDb, LSM_LOCK_RWCLIENT(pDb->iRwclient), LSM_LOCK_UNLOCK, 0);
+    }
+
+    lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0);
+    lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
+  }
   pDb->pShmhdr = 0;
 }
 
 static int doDbConnect(lsm_db *pDb){
   const int nUsMax = 100000;      /* Max value for nUs */
   int nUs = 1000;                 /* us to wait between DMS1 attempts */
   int rc;
 
   /* Obtain a pointer to the shared-memory header */
   assert( pDb->pShmhdr==0 );
-
+  assert( pDb->bReadonly==0 );
   rc = lsmShmCacheChunks(pDb, 1);
   if( rc!=LSM_OK ) return rc;
   pDb->pShmhdr = (ShmHeader *)pDb->apShm[0];
 
   /* Block for an exclusive lock on DMS1. This lock serializes all calls
   ** to doDbConnect() and doDbDisconnect() across all processes.  */
   while( 1 ){
................................................................................
     if( nUs>nUsMax ) nUs = nUsMax;
   }
   if( rc!=LSM_OK ){
     pDb->pShmhdr = 0;
     return rc;
   }
 
-  /* Try an exclusive lock on DMS2. If successful, this is the first and 
-  ** only connection to the database. In this case initialize the 
+  /* Try an exclusive lock on DMS2/DMS3. If successful, this is the first 
+  ** and only connection to the database. In this case initialize the 
   ** shared-memory and run log file recovery.  */
-  rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL, 0);
+  assert( LSM_LOCK_DMS3==1+LSM_LOCK_DMS2 );
+  rc = lsmShmTestLock(pDb, LSM_LOCK_DMS2, 2, LSM_LOCK_EXCL);
   if( rc==LSM_OK ){
     memset(pDb->pShmhdr, 0, sizeof(ShmHeader));
     rc = lsmCheckpointRecover(pDb);
     if( rc==LSM_OK ){
       rc = lsmLogRecover(pDb);
     }
+    if( rc==LSM_OK ){
+      ShmHeader *pShm = pDb->pShmhdr;
+      pShm->aReader[0].iLsmId = lsmCheckpointId(pShm->aSnap1, 0);
+      pShm->aReader[0].iTreeId = pDb->treehdr.iUsedShmid;
+    }
   }else if( rc==LSM_BUSY ){
     rc = LSM_OK;
   }
 
   /* Take a shared lock on DMS2. In multi-process mode this lock "cannot" 
   ** fail, as connections may only hold an exclusive lock on DMS2 if they 
   ** first hold an exclusive lock on DMS1. And this connection is currently 
................................................................................
   if( rc==LSM_OK ){
     rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_SHARED, 0);
   }
 
   /* If anything went wrong, unlock DMS2. Otherwise, try to take an exclusive
   ** lock on one of the LSM_LOCK_RWCLIENT() locks. Unlock DMS1 in any case. */
   if( rc!=LSM_OK ){
-    lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0);
     pDb->pShmhdr = 0;
   }else{
     int i;
     for(i=0; i<LSM_LOCK_NRWCLIENT; i++){
       int rc2 = lsmShmLock(pDb, LSM_LOCK_RWCLIENT(i), LSM_LOCK_EXCL, 0);
       if( rc2==LSM_OK ) pDb->iRwclient = i;
       if( rc2!=LSM_BUSY ){
................................................................................
 ** database itself.
 **
 ** The WORKER lock must not be held when this is called. This is because
 ** this function may indirectly call fsync(). And the WORKER lock should
 ** not be held that long (in case it is required by a client flushing an
 ** in-memory tree to disk).
 */
-int lsmCheckpointWrite(lsm_db *pDb, int bTruncate, u32 *pnWrite){
+int lsmCheckpointWrite(lsm_db *pDb, int bTruncate, int bDellog, u32 *pnWrite){
   int rc;                         /* Return Code */
   u32 nWrite = 0;
 
   assert( pDb->pWorker==0 );
   assert( 1 || pDb->pClient==0 );
   assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );
 
................................................................................
       );
 #endif
     }
 
     if( rc==LSM_OK && bTruncate ){
       rc = lsmFsTruncateDb(pDb->pFS, (i64)nBlock*lsmFsBlockSize(pDb->pFS));
     }
+    if( rc==LSM_OK && bDellog ){
+      lsmFsCloseAndDeleteLog(pDb->pFS);
+    }
   }
 
   lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
   if( pnWrite && rc==LSM_OK ) *pnWrite = nWrite;
   return rc;
 }
 
................................................................................
 int lsmBeginRoTrans(lsm_db *db){
   int rc = LSM_OK;
 
   assert( db->bReadonly && db->pShmhdr==0 );
   assert( db->iReader<0 );
 
   if( db->bRoTrans==0 ){
-    if( 1 ){
-      rc = lsmShmLock(db, LSM_LOCK_CHECKPOINTER, LSM_LOCK_SHARED, 0);
+
+    /* Attempt a shared-lock on DMS1. */
+    rc = lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_SHARED, 0);
+    if( rc!=LSM_OK ) return rc;
+
+    rc = lsmShmTestLock(
+        db, LSM_LOCK_RWCLIENT(0), LSM_LOCK_NREADER, LSM_LOCK_SHARED
+    );
+    if( rc==LSM_OK ){
+      /* System is not live */
+      rc = lsmShmLock(db, LSM_LOCK_CHECKPOINTER, LSM_LOCK_SHARED, 0);
+      lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
+
       if( rc==LSM_OK ){
         db->bRoTrans = 1;
         rc = lsmShmCacheChunks(db, 1);
         if( rc==LSM_OK ){
           db->pShmhdr = (ShmHeader *)db->apShm[0];
           memset(db->pShmhdr, 0, sizeof(ShmHeader));
           rc = lsmCheckpointRecover(db);
           if( rc==LSM_OK ){
             rc = lsmLogRecover(db);
           }
         }
       }
-    }else{
-      /* lock(DMS2, SHARED) etc. */
+    }else if( rc==LSM_BUSY ){
+      /* System is live! */
+      rc = lsmShmLock(db, LSM_LOCK_DMS3, LSM_LOCK_SHARED, 0);
+      lsmShmLock(db, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
+      if( rc==LSM_OK ){
+        rc = lsmShmCacheChunks(db, 1);
+        if( rc==LSM_OK ){
+          db->pShmhdr = (ShmHeader *)db->apShm[0];
+        }
+      }
     }
 
     if( rc==LSM_OK ){
       rc = lsmBeginReadTrans(db);
     }
   }
 
................................................................................
 static int lockSharedFile(lsm_env *pEnv, Database *p, int iLock, int eOp){
   int rc = LSM_OK;
   if( p->bMultiProc ){
     rc = lsmEnvLock(pEnv, p->pFile, iLock, eOp);
   }
   return rc;
 }
 
+/*
+** Test if it would be possible for connection db to obtain a lock of type
+** eType on the nLock locks starting at iLock. If so, return LSM_OK. If it
+** would not be possible to obtain the lock due to a lock held by another
+** connection, return LSM_BUSY. If an IO or other error occurs (i.e. in the 
+** lsm_env.xTestLock function), return some other LSM error code.
+**
+** Note that this function never actually locks the database - it merely
+** queries the system to see if there exists a lock that would prevent
+** it from doing so.
+*/
+int lsmShmTestLock(
+  lsm_db *db,
+  int iLock,
+  int nLock,
+  int eOp
+){
+  int rc = LSM_OK;
+  lsm_db *pIter;
+  Database *p = db->pDatabase;
+  int i;
+  u64 mask = 0;
+
+  for(i=iLock; i<(iLock+nLock); i++){
+    mask |= ((u64)1 << (i-1));
+    if( eOp==LSM_LOCK_EXCL ) mask |= ((u64)1 << (i+32-1));
+  }
+
+  lsmMutexEnter(db->pEnv, p->pClientMutex);
+  for(pIter=p->pConn; pIter; pIter=pIter->pNext){
+    if( pIter!=db && (pIter->mLock & mask) ) break;
+  }
+
+  if( pIter ){
+    rc = LSM_BUSY;
+  }else if( p->bMultiProc ){
+    rc = lsmEnvTestLock(db->pEnv, p->pFile, iLock, nLock, eOp);
+  }
+
+  lsmMutexLeave(db->pEnv, p->pClientMutex);
+  return rc;
+}
+
 /*
 ** Attempt to obtain the lock identified by the iLock and bExcl parameters.
 ** If successful, return LSM_OK. If the lock cannot be obtained because 
 ** there exists some other conflicting lock, return LSM_BUSY. If some other
 ** error occurs, return an LSM error code.
 **
................................................................................
 
 int lsm_checkpoint(lsm_db *pDb, int *pnKB){
   int rc;                         /* Return code */
   u32 nWrite = 0;                 /* Number of pages checkpointed */
 
   /* Attempt the checkpoint. If successful, nWrite is set to the number of
   ** pages written between this and the previous checkpoint.  */
-  rc = lsmCheckpointWrite(pDb, 0, &nWrite);
+  rc = lsmCheckpointWrite(pDb, 0, 0, &nWrite);
 
   /* If required, calculate the output variable (KB of data checkpointed). 
   ** Set it to zero if an error occurred.  */
   if( pnKB ){
     int nKB = 0;
     if( rc==LSM_OK && nWrite ){
       nKB = (((i64)nWrite * lsmFsPageSize(pDb->pFS)) + 1023) / 1024;

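The in-process half of lsmShmTestLock() works on a 64-bit mask of held locks per connection. Judging from the probe masks it builds, the encoding is: a shared hold on slot i publishes bit (i+32-1), while an exclusive hold publishes bit (i-1) as well. The self-contained demo below is illustrative only (the encoding is inferred from the probe logic, not quoted from the check-in); it shows why an exclusive probe conflicts with a shared holder while a shared probe does not.

#include <stdio.h>

typedef unsigned long long u64;

#define LSM_LOCK_SHARED 1
#define LSM_LOCK_EXCL   2

/* Build the conflict mask that lsmShmTestLock() tests against other
** connections, for a probe of nLock slots starting at iLock. */
static u64 probeMask(int iLock, int nLock, int eOp){
  u64 mask = 0;
  int i;
  for(i=iLock; i<(iLock+nLock); i++){
    mask |= ((u64)1 << (i-1));                 /* collide with excl holders */
    if( eOp==LSM_LOCK_EXCL ){
      mask |= ((u64)1 << (i+32-1));            /* ...and with shared holders */
    }
  }
  return mask;
}

int main(void){
  /* Assume another connection holds lock slot 3 (DMS3) in SHARED mode. */
  u64 mOther = ((u64)1 << (3+32-1));

  /* An EXCL probe of slots 2..3 conflicts; a SHARED probe of slot 3 does
  ** not, since shared locks are mutually compatible. */
  printf("excl  2..3: %s\n",
         (probeMask(2, 2, LSM_LOCK_EXCL) & mOther) ? "busy" : "ok");
  printf("shared   3: %s\n",
         (probeMask(3, 1, LSM_LOCK_SHARED) & mOther) ? "busy" : "ok");
  return 0;
}
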
Changes to src/lsm_unix.c.

     }else{
       rc = LSM_IOERR_BKPT;
     }
   }
 
   return rc;
 }
 
+int lsmPosixOsTestLock(lsm_file *pFile, int iLock, int nLock, int eType){
+  int rc = LSM_OK;
+  PosixFile *p = (PosixFile *)pFile;
+  static const short aType[3] = { 0, F_RDLCK, F_WRLCK };
+  struct flock lock;
+
+  assert( eType==LSM_LOCK_SHARED || eType==LSM_LOCK_EXCL );
+  assert( aType[LSM_LOCK_SHARED]==F_RDLCK );
+  assert( aType[LSM_LOCK_EXCL]==F_WRLCK );
+  assert( eType>=0 && eType<array_size(aType) );
+  assert( iLock>0 && iLock<=32 );
+
+  memset(&lock, 0, sizeof(lock));
+  lock.l_whence = SEEK_SET;
+  lock.l_len = nLock;
+  lock.l_type = aType[eType];
+  lock.l_start = (4096-iLock);
+
+  if( fcntl(p->fd, F_GETLK, &lock) ){
+    rc = LSM_IOERR_BKPT;
+  }else if( lock.l_type!=F_UNLCK ){
+    rc = LSM_BUSY;
+  }
+
+  return rc;
+}
+
 int lsmPosixOsShmMap(lsm_file *pFile, int iChunk, int sz, void **ppShm){
   PosixFile *p = (PosixFile *)pFile;
 
   *ppShm = 0;
   assert( sz==LSM_SHM_CHUNK_SIZE );
   if( iChunk>=p->nShm ){
................................................................................
     lsmPosixOsSync,          /* xSync */
     lsmPosixOsSectorSize,    /* xSectorSize */
     lsmPosixOsRemap,         /* xRemap */
     lsmPosixOsFileid,        /* xFileid */
     lsmPosixOsClose,         /* xClose */
     lsmPosixOsUnlink,        /* xUnlink */
     lsmPosixOsLock,          /* xLock */
+    lsmPosixOsTestLock,      /* xTestLock */
     lsmPosixOsShmMap,        /* xShmMap */
     lsmPosixOsShmBarrier,    /* xShmBarrier */
     lsmPosixOsShmUnmap,      /* xShmUnmap */
     /***** memory allocation *********/
     0,                       /* pMemCtx */
     lsmPosixOsMalloc,        /* xMalloc */
     lsmPosixOsRealloc,       /* xRealloc */

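lsmPosixOsTestLock() leans on a useful property of POSIX advisory locks: F_GETLK never acquires anything. It either rewrites l_type to F_UNLCK, meaning the described lock could be granted, or fills the structure in with details of a conflicting lock held by another process. A standalone sketch of that probe pattern (illustrative, not from the check-in):

#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/* Return 1 if some other process holds a lock that would block a lock of
** type eType (F_RDLCK or F_WRLCK) on the given byte range of fd, 0 if the
** lock could be granted, or -1 on error. The file is never actually
** locked by this call. */
static int wouldConflict(int fd, off_t iByte, off_t nByte, short eType){
  struct flock l;
  memset(&l, 0, sizeof(l));
  l.l_type = eType;
  l.l_whence = SEEK_SET;
  l.l_start = iByte;
  l.l_len = nByte;
  if( fcntl(fd, F_GETLK, &l) ) return -1;
  return l.l_type!=F_UNLCK;
}
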
Changes to test/lsm5.test.

   forcedelete $file
   lsm_open db $file
   db write a alpha
   db write b bravo
   db write c charlie
   db close
 }
 
+proc create_abc_log {file} {
+  forcedelete $file ${file}-2
+  lsm_open db ${file}-2
+  db write a alpha
+  db write b bravo
+  db write c charlie
+  file copy ${file}-2 $file
+  file copy ${file}-2-log $file-log
+  db close
+}
+
 #-------------------------------------------------------------------------
 # When the database system is shut down (i.e. when the last connection
 # disconnects), an attempt is made to truncate the database file to the
 # minimum number of blocks required.
 # 
 # This test case checks that this process does not actually cause the
................................................................................
 } {}
 do_test 1.3 {
   expr [file size test.db] < (64*1024)
 } 1
 
 #-------------------------------------------------------------------------
 # Test that if an attempt is made to open a read-write connection to a 
-# database that the client does not have permission to write to is attempted
-# an error is reported. In order to open a read-write connection to a
-# database, the client requires:
+# non-live database that the client does not have permission to write to is
+# attempted an error is reported. In order to open a read-write connection 
+# to a database, the client requires:
 #
 #   * read-write access to the db file,
 #   * read-write access to the log file,
 #   * for multi-process mode, read-write access to the shm file.
 #
 # In the above, "read-write access" includes the ability to create the db,
 # log or shm file if it does not exist.
 #
 # These tests verify that the lsm_open() command returns LSM_IOERR. At some
 # point in the future this will be improved. Likely when sqlite4 level tests 
 # for opening read-only databases are added.
 # 
-
 foreach {tn filename setup} {
 
   1 test.dir/test.db {
     # Create a directory "test.dir".
     forcedelete test.dir
     file mkdir test.dir
 
................................................................................
     create_abc_db test.dir/test.db
   
     # Now make test.dir read-only.
     file attr test.dir -perm r-xr-xr-x
   }
 
 } {
-
   do_test 2.$tn.1 {
     eval $setup
     set rc [catch {lsm_open db $filename} msg]
     list $rc $msg
   } {1 {error in lsm_open() - 10}}
 
   do_test 2.$tn.2 {
     eval $setup
     lsm_open db $filename {readonly 1}
     set res [list [db_fetch db a] [db_fetch db b] [db_fetch db c]]
     db close
     set res
   } {alpha bravo charlie}
-
 }
 
+#-------------------------------------------------------------------------
+# Try having a read-only connection connect to a non-live system where the
+# log file contains content. In this scenario the read-only client must 
+# read the contents from the log file at the start of each read-transaction. 
+#
+do_test 3.1 {
+  create_abc_log test.db
+  list [file size test.db] [file size test.db-log]
+} {0 56}
+do_test 3.2 {
+  lsm_open db $filename {readonly 1}
+  set res [list [db_fetch db a] [db_fetch db b] [db_fetch db c]]
+  db close
+  set res
+} {alpha bravo charlie}
+do_test 3.3 {
+  list [file size test.db] [file size test.db-log]
+} {0 56}
+
+# Now make the same db live and check the read-only connection can still
+# read it.
+do_test 3.4 { file exists test.db-shm } 0
+do_test 3.5 { 
+  lsm_open db_rw test.db
+  file exists test.db-shm
+} 1
+do_test 3.6 {
+  lsm_open db test.db {readonly 1}
+  list [db_fetch db a] [db_fetch db b] [db_fetch db c]
+} {alpha bravo charlie}
+
+# Close the read-write connection. This should cause a checkpoint and delete
+# the log file, even though the system remains live.
+do_test 3.7 { 
+  db_rw close
+  list [file exists test.db-log] [file exists test.db-shm]
+} {0 1}
+
+# Now close the read-only connection. The system is now non-live, but the 
+# *-shm remains in the file-system (the readonly connection cannot unlink it).
+do_test 3.8 { 
+  db close
+  list [file exists test.db-log] [file exists test.db-shm]
+} {0 1}
+
 finish_test
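
At the C level, the new tests correspond to opening a connection in read-only mode before lsm_open() and then reading inside an implicit read transaction. The sketch below is illustrative only: the lsm_config() option name LSM_CONFIG_READONLY is an assumption here (it is what the Tcl harness's "readonly 1" flag maps to in later lsm.h revisions), and error handling is abbreviated.

#include "lsm.h"

/* Open zDb read-only and fetch the value stored under key "a". */
static int fetch_a(const char *zDb){
  lsm_db *db = 0;
  lsm_cursor *csr = 0;
  int iRo = 1;                    /* 1 == read-only connection (assumed) */
  int rc;

  rc = lsm_new(0, &db);
  if( rc==LSM_OK ) rc = lsm_config(db, LSM_CONFIG_READONLY, &iRo);
  if( rc==LSM_OK ) rc = lsm_open(db, zDb);
  if( rc==LSM_OK ) rc = lsm_csr_open(db, &csr);
  if( rc==LSM_OK ){
    rc = lsm_csr_seek(csr, "a", 1, LSM_SEEK_EQ);
    if( rc==LSM_OK && lsm_csr_valid(csr) ){
      const void *pVal; int nVal;
      rc = lsm_csr_value(csr, &pVal, &nVal);    /* expect "alpha" */
    }
    lsm_csr_close(csr);
  }
  if( db ) lsm_close(db);
  return rc;
}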