SQLite4
Check-in [5f4708d2e9]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add DMS "lock".
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | multi-process
Files: files | file ages | folders
SHA1: 5f4708d2e97c18eadb8b8f1d57d827f2172bc6e6
User & Date: dan 2012-08-31 18:43:30
Context
2012-08-31
20:26
Avoid reusing a block before it is guaranteed that it is not required even if a crash occurs. check-in: 5f9bb542f7 user: dan tags: multi-process
18:43
Add DMS "lock". check-in: 5f4708d2e9 user: dan tags: multi-process
2012-08-30
20:01
Remove dead code. Run "lomem" tests with max-freelist set to 4. check-in: d6c6889249 user: dan tags: multi-process
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/lsmInt.h.

132
133
134
135
136
137
138


139
140
141
142
143
144
145
146
147
148
...
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
...
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
...
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
...
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
...
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
/* The number of bytes reserved at the start of each shm chunk for MM. */
#define LSM_SHM_CHUNK_HDR  (3 * 4)

/* The number of available read locks. */
#define LSM_LOCK_NREADER   6

/* 
** Lock definitions. Each value identifies one lock and is passed as the
** iLock (second) argument to lsmShmLock():
**
**   LSM_LOCK_WRITER        Taken exclusively when a write transaction is opened.
**   LSM_LOCK_WORKER        Taken exclusively by lsmBeginWork().
**   LSM_LOCK_CHECKPOINTER  Taken exclusively by lsmCheckpointWrite().
**   LSM_LOCK_READER(i)     Lock for read-lock slot i, where i is between 0
**                          and (LSM_LOCK_NREADER-1), inclusive.
*/
#define LSM_LOCK_WRITER       1
#define LSM_LOCK_WORKER       2
#define LSM_LOCK_CHECKPOINTER 3
#define LSM_LOCK_READER(i)    ((i) + LSM_LOCK_CHECKPOINTER + 1)

/*
** Hard limit on the number of free-list entries that may be stored in 
** a checkpoint (the remainder are stored as a system record in the LSM).
** See also LSM_CONFIG_MAX_FREELIST.
*/
................................................................................
  i64 iLsmId;
};

/*
** An instance of this structure is stored in the first shared-memory
** page. The shared-memory header.
**
** bInit:
**   This value is set to non-zero once the contents of the ShmHeader are
**   initialized. In other words, once recovery has finished.
**
** bWriter:
**   Immediately after opening a write transaction taking the WRITER lock, 
**   each writer client sets this flag. It is cleared right before the 
**   WRITER lock is relinquished. If a subsequent writer finds that this
**   flag is already set when a write transaction is opened, this indicates
**   that a previous writer failed mid-transaction.
**
................................................................................
** hdr1, hdr2:
**   The two copies of the in-memory tree header. Two copies are required
**   in case a writer fails while updating one of them.
*/
struct ShmHeader {
  u32 aClient[LSM_META_PAGE_SIZE / 4];  /* Serialized client snapshot */
  u32 aWorker[LSM_META_PAGE_SIZE / 4];  /* Serialized worker snapshot; copied
                                        ** over aClient if aClient fails its
                                        ** checksum */
  u32 bInit;                            /* Non-zero once recovery has finished */
  u32 bWriter;                          /* Set while a write transaction is
                                        ** open (see above) */
  u32 iMetaPage;                        /* NOTE(review): presumably identifies
                                        ** the current meta page - confirm */
  TreeHeader hdr1;                      /* First copy of the tree header */
  TreeHeader hdr2;                      /* Second copy of the tree header */
  ShmReader aReader[LSM_LOCK_NREADER];  /* One slot per available read lock */
};

................................................................................
int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast);
void lsmTreeCursorReset(TreeCursor *pCsr);
int lsmTreeCursorKey(TreeCursor *pCsr, void **ppKey, int *pnKey);
int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal);
int lsmTreeCursorValid(TreeCursor *pCsr);
int lsmTreeCursorSave(TreeCursor *pCsr);

TreeVersion *lsmTreeReadVersion(Tree *);
int lsmTreeWriteVersion(lsm_env *pEnv, Tree *, TreeVersion **);
TreeVersion *lsmTreeRecoverVersion(Tree *);
int lsmTreeIsWriteVersion(TreeVersion *);
int lsmTreeReleaseWriteVersion(lsm_env *, TreeVersion *, int, TreeVersion **);
void lsmTreeReleaseReadVersion(lsm_env *, TreeVersion *);

/* 
** Functions from file "mem.c".
*/
int lsmPoolNew(lsm_env *pEnv, Mempool **ppPool);
void lsmPoolDestroy(lsm_env *pEnv, Mempool *pPool);
void *lsmPoolMalloc(lsm_env *pEnv, Mempool *pPool, int nByte);
void *lsmPoolMallocZero(lsm_env *pEnv, Mempool *pPool, int nByte);
................................................................................
int lsmLogStructure(lsm_db *pDb, char **pzVal);


/**************************************************************************
** Functions from file "lsm_shared.c".
*/

int lsmDbDatabaseFind(lsm_db*, const char *);
void lsmDbDatabaseRelease(lsm_db *);

int lsmBeginReadTrans(lsm_db *);
int lsmBeginWriteTrans(lsm_db *);
int lsmBeginFlush(lsm_db *);

int lsmBeginWork(lsm_db *);
................................................................................

/* Candidate values for the 3rd argument to lsmShmLock() */
#define LSM_LOCK_UNLOCK 0
#define LSM_LOCK_SHARED 1
#define LSM_LOCK_EXCL   2

/* Set *ppData to point to shared-memory chunk iChunk. Chunk 0 is the one
** that is used as the ShmHeader. Returns an LSM error code. */
int lsmShmChunk(lsm_db *db, int iChunk, void **ppData);

/* Apply operation eOp (one of the LSM_LOCK_UNLOCK, SHARED or EXCL values
** above) to lock iLock. Callers treat a return value of LSM_BUSY as "the
** lock could not be obtained". */
int lsmShmLock(lsm_db *db, int iLock, int eOp);

/* NOTE(review): presumably a memory barrier for shared-memory accesses;
** it is called before the ShmHeader.bInit flag is set - confirm. */
void lsmShmBarrier(lsm_db *db);

/* Debugging aid. Unless LSM_DEBUG is defined, calls to lsmShmHasLock()
** expand to nothing. */
#ifdef LSM_DEBUG
void lsmShmHasLock(lsm_db *db, int iLock, int eOp);
#else
# define lsmShmHasLock(x,y,z)
#endif







>
>
|
|
|







 







<
<
<
<







 







<







 







<
<
<
<
<
<
<







 







|







 







|







132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
...
378
379
380
381
382
383
384




385
386
387
388
389
390
391
...
397
398
399
400
401
402
403

404
405
406
407
408
409
410
...
513
514
515
516
517
518
519







520
521
522
523
524
525
526
...
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
...
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
/* The number of bytes reserved at the start of each shm chunk for MM. */
#define LSM_SHM_CHUNK_HDR  (3 * 4)

/* The number of available read locks. */
#define LSM_LOCK_NREADER   6

/* 
** Lock definitions. Each value identifies one lock and is passed as the
** iLock (second) argument to lsmShmLock():
**
**   LSM_LOCK_DMS1          Held exclusively while a connection is connecting
**                          to or disconnecting from the database. This
**                          serializes doDbConnect() and doDbDisconnect().
**   LSM_LOCK_DMS2          Held in shared mode by each connected connection.
**                          An exclusive lock can therefore only be obtained
**                          by the sole connection to the database.
**   LSM_LOCK_WRITER        Taken exclusively when a write transaction is opened.
**   LSM_LOCK_WORKER        Taken exclusively by lsmBeginWork().
**   LSM_LOCK_CHECKPOINTER  Taken exclusively by lsmCheckpointWrite().
**   LSM_LOCK_READER(i)     Lock for read-lock slot i, where i is between 0
**                          and (LSM_LOCK_NREADER-1), inclusive.
*/
#define LSM_LOCK_DMS1         1
#define LSM_LOCK_DMS2         2
#define LSM_LOCK_WRITER       3
#define LSM_LOCK_WORKER       4
#define LSM_LOCK_CHECKPOINTER 5
#define LSM_LOCK_READER(i)    ((i) + LSM_LOCK_CHECKPOINTER + 1)

/*
** Hard limit on the number of free-list entries that may be stored in 
** a checkpoint (the remainder are stored as a system record in the LSM).
** See also LSM_CONFIG_MAX_FREELIST.
*/
................................................................................
  i64 iLsmId;
};

/*
** An instance of this structure is stored in the first shared-memory
** page. The shared-memory header.
**




** bWriter:
**   Immediately after opening a write transaction taking the WRITER lock, 
**   each writer client sets this flag. It is cleared right before the 
**   WRITER lock is relinquished. If a subsequent writer finds that this
**   flag is already set when a write transaction is opened, this indicates
**   that a previous writer failed mid-transaction.
**
................................................................................
** hdr1, hdr2:
**   The two copies of the in-memory tree header. Two copies are required
**   in case a writer fails while updating one of them.
*/
struct ShmHeader {
  u32 aClient[LSM_META_PAGE_SIZE / 4];  /* Serialized client snapshot */
  u32 aWorker[LSM_META_PAGE_SIZE / 4];  /* Serialized worker snapshot; copied
                                        ** over aClient if aClient fails its
                                        ** checksum */

  u32 bWriter;                          /* Set while a write transaction is
                                        ** open (see above) */
  u32 iMetaPage;                        /* NOTE(review): presumably identifies
                                        ** the current meta page - confirm */
  TreeHeader hdr1;                      /* First copy of the tree header */
  TreeHeader hdr2;                      /* Second copy of the tree header */
  ShmReader aReader[LSM_LOCK_NREADER];  /* One slot per available read lock */
};

................................................................................
int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast);
void lsmTreeCursorReset(TreeCursor *pCsr);
int lsmTreeCursorKey(TreeCursor *pCsr, void **ppKey, int *pnKey);
int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal);
int lsmTreeCursorValid(TreeCursor *pCsr);
int lsmTreeCursorSave(TreeCursor *pCsr);








/* 
** Functions from file "mem.c".
*/
int lsmPoolNew(lsm_env *pEnv, Mempool **ppPool);
void lsmPoolDestroy(lsm_env *pEnv, Mempool *pPool);
void *lsmPoolMalloc(lsm_env *pEnv, Mempool *pPool, int nByte);
void *lsmPoolMallocZero(lsm_env *pEnv, Mempool *pPool, int nByte);
................................................................................
int lsmLogStructure(lsm_db *pDb, char **pzVal);


/**************************************************************************
** Functions from file "lsm_shared.c".
*/

int lsmDbDatabaseConnect(lsm_db*, const char *);
void lsmDbDatabaseRelease(lsm_db *);

int lsmBeginReadTrans(lsm_db *);
int lsmBeginWriteTrans(lsm_db *);
int lsmBeginFlush(lsm_db *);

int lsmBeginWork(lsm_db *);
................................................................................

/* Candidate values for the 3rd argument to lsmShmLock() */
#define LSM_LOCK_UNLOCK 0
#define LSM_LOCK_SHARED 1
#define LSM_LOCK_EXCL   2

/* Set *ppData to point to shared-memory chunk iChunk. Chunk 0 is the one
** that is used as the ShmHeader. Returns an LSM error code. */
int lsmShmChunk(lsm_db *db, int iChunk, void **ppData);

/* Apply operation eOp (one of the LSM_LOCK_UNLOCK, SHARED or EXCL values
** above) to lock iLock. Callers pass a non-zero bBlock when they want to
** block until the lock is available, and zero to merely try for it, in
** which case LSM_BUSY is treated as "the lock could not be obtained". */
int lsmShmLock(lsm_db *db, int iLock, int eOp, int bBlock);

/* NOTE(review): presumably a memory barrier for shared-memory accesses -
** confirm against the implementation. */
void lsmShmBarrier(lsm_db *db);

/* Debugging aid. Unless LSM_DEBUG is defined, calls to lsmShmHasLock()
** expand to nothing. */
#ifdef LSM_DEBUG
void lsmShmHasLock(lsm_db *db, int iLock, int eOp);
#else
# define lsmShmHasLock(x,y,z)
#endif

Changes to src/lsm_ckpt.c.

928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
    int nInt;
    ShmHeader *pShm = pDb->pShmhdr;

    nInt = pShm->aClient[CKPT_HDR_NCKPT];
    memcpy(pDb->aSnapshot, pShm->aClient, nInt*sizeof(u32));
    if( ckptChecksumOk(pDb->aSnapshot) ) return LSM_OK;

    rc = lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL);
    if( rc==LSM_BUSY ){
      usleep(50);
    }else{
      if( rc==LSM_OK ){
        if( ckptChecksumOk(pShm->aClient)==0 ){
          nInt = pShm->aWorker[CKPT_HDR_NCKPT];
          memcpy(pShm->aClient, pShm->aWorker, nInt*sizeof(u32));
        }
        nInt = pShm->aClient[CKPT_HDR_NCKPT];
        memcpy(pDb->aSnapshot, &pShm->aClient, nInt*sizeof(u32));
        lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK);

        if( ckptChecksumOk(pDb->aSnapshot)==0 ){
          rc = LSM_CORRUPT_BKPT;
        }
      }
      return rc;
    }







|










|







928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
    int nInt;
    ShmHeader *pShm = pDb->pShmhdr;

    nInt = pShm->aClient[CKPT_HDR_NCKPT];
    memcpy(pDb->aSnapshot, pShm->aClient, nInt*sizeof(u32));
    if( ckptChecksumOk(pDb->aSnapshot) ) return LSM_OK;

    rc = lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL, 0);
    if( rc==LSM_BUSY ){
      usleep(50);
    }else{
      if( rc==LSM_OK ){
        if( ckptChecksumOk(pShm->aClient)==0 ){
          nInt = pShm->aWorker[CKPT_HDR_NCKPT];
          memcpy(pShm->aClient, pShm->aWorker, nInt*sizeof(u32));
        }
        nInt = pShm->aClient[CKPT_HDR_NCKPT];
        memcpy(pDb->aSnapshot, &pShm->aClient, nInt*sizeof(u32));
        lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);

        if( ckptChecksumOk(pDb->aSnapshot)==0 ){
          rc = LSM_CORRUPT_BKPT;
        }
      }
      return rc;
    }

Changes to src/lsm_main.c.

121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
...
214
215
216
217
218
219
220
221





222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
  }else{
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcdummy);
  }
  return rc;
}

/*
** If required, run the recovery procedure to initialize the database.
** Return LSM_OK if successful or an error code otherwise.
**
** As in SQLite 3 WAL mode, "recovery" essentially means make sure the
** contents of the shared-memory region are consistent with the data
** stored in the file-system.
**
** Recovery is required if the ShmHeader.bInit flag is clear. It is run
** while holding an exclusive CHECKPOINTER lock. If that lock cannot be
** obtained because some other connection holds it (LSM_BUSY), this function
** sleeps briefly and then re-tests the bInit flag, as the other connection
** may have completed recovery in the meantime. Any other error is returned
** to the caller immediately.
*/
static int dbRecoverIfRequired(lsm_db *pDb){
  int rc = LSM_OK;
  ShmHeader *pShm = pDb->pShmhdr;

  assert( pDb->pWorker==0 && pDb->pClient==0 );

  while( rc==LSM_OK && pShm->bInit==0 ){
    /* The shared-memory "system initialized" flag is clear. Attempt to run
    ** the recovery procedure.  */

    rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL);
    if( rc==LSM_BUSY ){
      /* Could not obtain the lock. Sleep a while and try again. The return
      ** code must be reset here, otherwise the loop condition above would 
      ** terminate the loop and LSM_BUSY would be returned without ever
      ** retrying.  */
      usleep(50);
      rc = LSM_OK;
    }else if( rc==LSM_OK ){
      /* Re-test the flag now that the lock is held. Another connection may
      ** have finished recovery between the test in the loop condition and
      ** the lock being granted.  */
      if( pShm->bInit==0 ){
        memset(pShm, 0, sizeof(ShmHeader));
        rc = lsmCheckpointRecover(pDb);
        if( rc==LSM_OK ){
          rc = lsmLogRecover(pDb);
        }

        /* If successful, set the ShmHeader.bInit variable. */
        if( rc==LSM_OK ){
          lsmShmBarrier(pDb);
          pShm->bInit = 1;
        }
      }
      lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK);
    }
  }

  assert( pShm->bInit || rc!=LSM_OK );
  return rc;
}

static int getFullpathname(
  lsm_env *pEnv, 
  const char *zRel,
  char **pzAbs
){
  int nAlloc = 0;
  char *zAlloc = 0;
................................................................................
    ** than one purpose - to open both the database and log files, and 
    ** perhaps to unlink the log file during disconnection. An absolute
    ** path is required to ensure that the correct files are operated
    ** on even if the application changes the cwd.  */
    rc = getFullpathname(pDb->pEnv, zFilename, &zFull);
    assert( rc==LSM_OK || zFull==0 );

    /* Open the database file */





    if( rc==LSM_OK ){
      rc = lsmFsOpen(pDb, zFull);
    }

    /* Open the shared data handle. */
    if( rc==LSM_OK ){
      rc = lsmDbDatabaseFind(pDb, zFilename);
    }

    /* Obtain a pointer to the shared-memory header */
    if( rc==LSM_OK ){
      rc = lsmShmChunk(pDb, 0, (void **)&pDb->pShmhdr);
    }

    /* If required, run recovery */
    if( rc==LSM_OK ){
      rc = dbRecoverIfRequired(pDb);
    }

    /* Configure the file-system connection with the page-size and block-size
    ** of this database. Even if the database file is zero bytes in size
    ** on disk, these values have been set in shared-memory, and so are
    ** guaranteed not to change during the lifetime of this connection.  */
    if( rc==LSM_OK && LSM_OK==(rc = lsmCheckpointLoad(pDb)) ){
      lsmFsSetPageSize(pDb->pFS, lsmCheckpointPgsz(pDb->aSnapshot));
      lsmFsSetBlockSize(pDb->pFS, lsmCheckpointBlksz(pDb->aSnapshot));
    }

    lsmFree(pDb->pEnv, zFull);







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







|
>
>
>
>
>




|

|
<
<
<
<
<
<
<
<
<
<




|







121
122
123
124
125
126
127














































128
129
130
131
132
133
134
...
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187










188
189
190
191
192
193
194
195
196
197
198
199
  }else{
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcdummy);
  }
  return rc;
}















































static int getFullpathname(
  lsm_env *pEnv, 
  const char *zRel,
  char **pzAbs
){
  int nAlloc = 0;
  char *zAlloc = 0;
................................................................................
    ** than one purpose - to open both the database and log files, and 
    ** perhaps to unlink the log file during disconnection. An absolute
    ** path is required to ensure that the correct files are operated
    ** on even if the application changes the cwd.  */
    rc = getFullpathname(pDb->pEnv, zFilename, &zFull);
    assert( rc==LSM_OK || zFull==0 );

    /* Open the database and log files. 
    **
    ** TODO: Opening the log file before calling DbDatabaseConnect() is 
    ** incorrect. Some other connection could unlink() it. Should change
    ** the FileSystem object to open the log file lazily.
    */
    if( rc==LSM_OK ){
      rc = lsmFsOpen(pDb, zFull);
    }

    /* Connect to the database */
    if( rc==LSM_OK ){
      rc = lsmDbDatabaseConnect(pDb, zFilename);










    }

    /* Configure the file-system connection with the page-size and block-size
    ** of this database. Even if the database file is zero bytes in size
    ** on disk, these values have been set in shared-memory by now, and so are
    ** guaranteed not to change during the lifetime of this connection.  */
    if( rc==LSM_OK && LSM_OK==(rc = lsmCheckpointLoad(pDb)) ){
      lsmFsSetPageSize(pDb->pFS, lsmCheckpointPgsz(pDb->aSnapshot));
      lsmFsSetBlockSize(pDb->pFS, lsmCheckpointBlksz(pDb->aSnapshot));
    }

    lsmFree(pDb->pEnv, zFull);

Changes to src/lsm_shared.c.

144
145
146
147
148
149
150





















































































151
152
153
154
155
156
157
...
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
...
222
223
224
225
226
227
228





229
230
231
232
233
234
235
236
237
238
239
240




241
242
243
244
245
246
247
...
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
...
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
...
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
...
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
...
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
...
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
...
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
...
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
...
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
...
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
...
821
822
823
824
825
826
827
828

829
830
831
832
833
834
835
    /* Free the mutexes */
    lsmMutexDel(pEnv, p->pClientMutex);

    /* Free the memory allocated for the Database struct itself */
    lsmFree(pEnv, p);
  }
}






















































































/*
** Return a reference to the shared Database handle for the database 
** identified by canonical path zName. If this is the first connection to
** the named database, a new Database object is allocated. Otherwise, a
** pointer to an existing object is returned.
**
................................................................................
** If successful, *ppDatabase is set to point to the shared Database 
** structure and LSM_OK returned. Otherwise, *ppDatabase is set to NULL
** and an LSM error code returned.
**
** Each successful call to this function should be (eventually) matched
** by a call to lsmDbDatabaseRelease().
*/
int lsmDbDatabaseFind(
  lsm_db *pDb,                    /* Database handle */
  const char *zName               /* Path to db file */
){
  lsm_env *pEnv = pDb->pEnv;
  int rc;                         /* Return code */
  Database *p = 0;                /* Pointer returned via *ppDatabase */
  int nId = 0;
................................................................................
      p->pConn = pDb;
      lsmMutexLeave(pDb->pEnv, p->pClientMutex);
    }
  }

  lsmFree(pEnv, pId);
  pDb->pDatabase = p;





  return rc;
}

/*
** Release a reference to a Database object obtained from lsmDbDatabaseFind().
** There should be exactly one call to this function for each successful
** call to Find().
*/
void lsmDbDatabaseRelease(lsm_db *pDb){
  Database *p = pDb->pDatabase;
  if( p ){
    lsm_db **ppDb;





    lsmMutexEnter(pDb->pEnv, p->pClientMutex);
    for(ppDb=&p->pConn; *ppDb!=pDb; ppDb=&((*ppDb)->pNext));
    *ppDb = pDb->pNext;
    lsmMutexLeave(pDb->pEnv, p->pClientMutex);

    enterGlobalMutex(pDb->pEnv);
................................................................................
      int i;
      Database **pp;

      /* Remove the Database structure from the linked list. */
      for(pp=&gShared.pDatabase; *pp!=p; pp=&((*pp)->pDbNext));
      *pp = p->pDbNext;

      if( pDb->pShmhdr && pDb->pShmhdr->bInit ){
        /* Flush the in-memory tree, if required. If there is data to flush,
        ** this will create a new client snapshot in Database.pClient. The
        ** checkpoint (serialization) of this snapshot may be written to disk
        ** by the following block.  */
        if( 0==lsmTreeIsEmpty(pDb) ){
          rc = lsmFlushToDisk(pDb);
        }

        /* Write a checkpoint, also if required */
        if( rc==LSM_OK ){
          rc = lsmCheckpointWrite(pDb);
        }

        /* If the checkpoint was written successfully, delete the log file */
        if( rc==LSM_OK && pDb->pFS ){
          lsmFsCloseAndDeleteLog(pDb->pFS);
        }
      }

      for(i=0; i<p->nShmChunk; i++){
        lsmFree(pDb->pEnv, p->apShmChunk[i]);
      }
      lsmFree(pDb->pEnv, p->apShmChunk);
      
      /* Free the Database object */
      freeDatabase(pDb->pEnv, p);
    }
    leaveGlobalMutex(pDb->pEnv);
  }
}

Level *lsmDbSnapshotLevel(Snapshot *pSnapshot){
................................................................................
int lsmCheckpointWrite(lsm_db *pDb){
  int rc;                         /* Return Code */

  assert( pDb->pWorker==0 );
  assert( 1 || pDb->pClient==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );

  rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL);
  if( rc!=LSM_OK ) return rc;

  rc = lsmCheckpointLoad(pDb);
  if( rc==LSM_OK ){
    ShmHeader *pShm = pDb->pShmhdr;
    int bDone = 0;                /* True if checkpoint is already stored */

................................................................................
  /* If no error has occurred, then the snapshot currently in pDb->aSnapshot
  ** has been synced to disk. This means it may be possible to wrap the
  ** log file. Obtain the WRITER lock and update the relevant tree-header
  ** fields to reflect this.  */
  if( rc==LSM_OK ){
    int rc2;
    u64 iLogoff = lsmCheckpointLogOffset(pDb->aSnapshot);
    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL);
    if( rc==LSM_OK ) rc = lsmTreeLoadHeader(pDb);
    if( rc==LSM_OK ) lsmLogCheckpoint(pDb, iLogoff);
    if( rc==LSM_OK ) lsmTreeEndTransaction(pDb, 1);
    if( rc==LSM_BUSY ) rc = LSM_OK;
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK);
  return rc;
}

int lsmBeginWork(lsm_db *pDb){
  int rc;

  /* Attempt to take the WORKER lock */
  rc = lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL);

  /* Deserialize the current worker snapshot */
  if( rc==LSM_OK ){
    rc = lsmCheckpointLoadWorker(pDb);
    if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase;
  }
  return rc;
................................................................................
  }

  if( pDb->pWorker ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK);
}


/*
** Called when recovery is finished.
*/
int lsmFinishRecovery(lsm_db *pDb){
................................................................................
  assert( pDb->nTransOpen==0 );

  /* If there is no read-transaction open, open one now. */
  rc = lsmBeginReadTrans(pDb);

  /* Attempt to take the WRITER lock */
  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL);
  }

  /* If the previous writer failed mid-transaction, run emergency rollback. */
  if( rc==LSM_OK && pShm->bWriter ){
    /* TODO: This! */
    assert( 0 );
    rc = LSM_CORRUPT_BKPT;
................................................................................
  /* If everything was successful, set the "transaction-in-progress" flag
  ** and return LSM_OK. Otherwise, if some error occurred, relinquish the 
  ** WRITER lock and return an error code.  */
  if( rc==LSM_OK ){
    pShm->bWriter = 1;
    pDb->treehdr.iTransId++;
  }else{
    lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK);
    if( pDb->pCsr==0 ) lsmFinishReadTrans(pDb);
  }
  return rc;
}

/*
** End the current write transaction. The connection is left with an open
................................................................................
** merely releases locks and other resources held by the write-transaction.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){
  const int rc = LSM_OK;          /* This function always reports success */

  /* Finish the log and in-memory tree parts of the transaction first. Both
  ** are passed the caller's commit flag unchanged.  */
  lsmLogEnd(pDb, bCommit);
  lsmTreeEndTransaction(pDb, bCommit);

  /* Only then relinquish the WRITER lock. The result of the unlock
  ** operation is deliberately discarded.  */
  (void)lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK);

  return rc;
}


/*
** Return non-zero if the caller is holding the client mutex.
*/
................................................................................

  assert( db->iReader<0 );

  /* Search for an exact match. */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId==iLsm && p->iTreeId==iTree ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED);
      if( rc==LSM_OK && p->iLsmId==iLsm && p->iTreeId==iTree ){
        db->iReader = i;
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  /* Try to obtain a write-lock on each slot, in order. If successful, set
  ** the slot values to iLsm/iTree.  */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL);
    if( rc==LSM_BUSY ){
      rc = LSM_OK;
    }else{
      ShmReader *p = &pShm->aReader[i];
      p->iLsmId = iLsm;
      p->iTreeId = iTree;
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED);
      if( rc==LSM_OK ) db->iReader = i;
    }
  }

  /* Search for any usable slot */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId && p->iTreeId && p->iLsmId<=iLsm && p->iTreeId<=iTree ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED);
      if( rc==LSM_OK ){
        if( p->iLsmId && p->iTreeId && p->iLsmId<=iLsm && p->iTreeId<=iTree ){
          db->iReader = i;
        }
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
................................................................................
  ShmHeader *pShm = db->pShmhdr;
  int i;
  int rc = LSM_OK;

  for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId && p->iTreeId && (p->iTreeId<=iTree || p->iLsmId<=iLsm) ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL);
      if( rc==LSM_OK ){
        p->iTreeId = p->iLsmId = 0;
        lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK);
      }
    }
  }

  if( rc==LSM_BUSY ){
    *pbInUse = 1;
    return LSM_OK;
................................................................................

/*
** If connection db currently holds a read-lock (db->iReader is zero or
** greater), unlock the corresponding READER lock and mark the connection
** as holding no read-lock by setting db->iReader to -1. In this case the
** value returned is that returned by the unlock operation. 
**
** If the connection holds no read-lock, this is a no-op that returns LSM_OK.
*/
int lsmReleaseReadlock(lsm_db *db){
  int rcUnlock;                   /* Result of the unlock operation */
  int iSlot = db->iReader;        /* Read-lock slot currently held, or -ve */

  if( iSlot<0 ) return LSM_OK;

  rcUnlock = lsmShmLock(db, LSM_LOCK_READER(iSlot), LSM_LOCK_UNLOCK);
  db->iReader = -1;
  return rcUnlock;
}



................................................................................
**
** Parameter iLock must be one of LSM_LOCK_WRITER, WORKER or CHECKPOINTER,
** or else a value returned by the LSM_LOCK_READER macro.
*/
int lsmShmLock(
  lsm_db *db, 
  int iLock,
  int eOp                         /* One of LSM_LOCK_UNLOCK, SHARED or EXCL */

){
  int rc = LSM_OK;
  Database *p = db->pDatabase;

  assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
  assert( iLock<=16 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|







 







>
>
>
>
>




|
|
|





>
>
>
>







 







|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<




<
<







 







|







 







|






|







|







 







|







 







|







 







|







 







|







 







|











|






|








|







 







|


|







 







|







 







|
>







144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
...
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
...
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
...
345
346
347
348
349
350
351
352



















353
354
355
356


357
358
359
360
361
362
363
...
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
...
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
...
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
...
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
...
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
...
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
...
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
...
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
...
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
...
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
    /* Free the mutexes */
    lsmMutexDel(pEnv, p->pClientMutex);

    /* Free the memory allocated for the Database struct itself */
    lsmFree(pEnv, p);
  }
}

/*
** Disconnect connection pDb from the database, releasing the DMS2 lock
** and clearing pDb->pShmhdr. If this turns out to be the only connection
** to the database, the in-memory tree is first flushed to disk (if it is
** not empty), a checkpoint written and, if both of those succeed, the log
** file closed and deleted.
**
** Return LSM_OK if successful, or an LSM error code if an error occurs.
** Discovering that other connections are still using the database is not
** an error. The locks are released and pDb->pShmhdr cleared regardless of
** the value returned.
*/
static int doDbDisconnect(lsm_db *pDb){
  int rc;

  /* Block for an exclusive lock on DMS1. This lock serializes all calls
  ** to doDbConnect() and doDbDisconnect() across all processes.  */
  rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
  if( rc==LSM_OK ){

    /* Try an exclusive lock on DMS2. If successful, no other connection
    ** holds a lock on DMS2, so this is the only remaining connection to 
    ** the database. In this case flush the in-memory tree and write a
    ** checkpoint so that the log file can be deleted.  */
    rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL, 0);
    if( rc==LSM_OK ){
      /* Flush the in-memory tree, if required. If there is data to flush,
      ** this will create a new client snapshot in Database.pClient. The
      ** checkpoint (serialization) of this snapshot may be written to disk
      ** by the following block.  */
      if( 0==lsmTreeIsEmpty(pDb) ){
        rc = lsmFlushToDisk(pDb);
      }

      /* Write a checkpoint to disk. */
      if( rc==LSM_OK ){
        rc = lsmCheckpointWrite(pDb);
      }

      /* If the checkpoint was written successfully, delete the log file */
      if( rc==LSM_OK && pDb->pFS ){
        lsmFsCloseAndDeleteLog(pDb->pFS);
      }
    }else if( rc==LSM_BUSY ){
      /* Some other connection is still using the database. There is
      ** nothing to clean up, and this is not an error.  */
      rc = LSM_OK;
    }
  }

  /* Release the lock on DMS2 held by this connection, and the exclusive
  ** lock on DMS1. These are attempted even if an error has occurred.  */
  lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0);
  lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
  pDb->pShmhdr = 0;

  return rc;
}

/*
** Connect connection pDb to the database: map the shared-memory header
** into pDb->pShmhdr and take a shared lock on DMS2. If this is the only 
** connection to the database, the shared-memory header is zeroed and
** recovery run (lsmCheckpointRecover() followed by lsmLogRecover()) before
** the shared lock is taken.
**
** Return LSM_OK if successful. Otherwise, return an LSM error code. In
** this case no lock on DMS2 is retained and pDb->pShmhdr is set to NULL,
** except that if it is the lsmShmChunk() call that fails pDb->pShmhdr is 
** left as that call set it.
*/
static int doDbConnect(lsm_db *pDb){
  int rc;

  /* Obtain a pointer to the shared-memory header */
  assert( pDb->pShmhdr==0 );
  rc = lsmShmChunk(pDb, 0, (void **)&pDb->pShmhdr);
  if( rc!=LSM_OK ) return rc;

  /* Block for an exclusive lock on DMS1. This lock serializes all calls
  ** to doDbConnect() and doDbDisconnect() across all processes.  */
  rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
  if( rc==LSM_OK ){

    /* Try an exclusive lock on DMS2. LSM_BUSY means that there are other
    ** connections, which is fine. LSM_OK means that this is the first and
    ** only connection to the database, in which case the shared-memory
    ** is initialized and log file recovery run.  */
    rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL, 0);
    if( rc==LSM_BUSY ){
      rc = LSM_OK;
    }else if( rc==LSM_OK ){
      memset(pDb->pShmhdr, 0, sizeof(ShmHeader));
      rc = lsmCheckpointRecover(pDb);
      if( rc==LSM_OK ) rc = lsmLogRecover(pDb);
    }

    /* Take a shared lock on DMS2. This lock "cannot" fail, as connections 
    ** may only hold an exclusive lock on DMS2 if they first hold an
    ** exclusive lock on DMS1. And this connection is currently holding the
    ** exclusive lock on DMS1.  */
    if( rc==LSM_OK ){
      rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_SHARED, 0);
    }

    /* If anything went wrong, unlock DMS2. Unlock DMS1 in any case. */
    if( rc!=LSM_OK ){
      lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0);
      pDb->pShmhdr = 0;
    }
    lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_UNLOCK, 0);
  }else{
    /* Failed to lock DMS1. Discard the shared-memory header pointer. */
    pDb->pShmhdr = 0;
  }

  return rc;
}

/*
** Return a reference to the shared Database handle for the database 
** identified by canonical path zName. If this is the first connection to
** the named database, a new Database object is allocated. Otherwise, a
** pointer to an existing object is returned.
**
................................................................................
** If successful, *ppDatabase is set to point to the shared Database 
** structure and LSM_OK returned. Otherwise, *ppDatabase is set to NULL
** and an LSM error code returned.
**
** Each successful call to this function should be (eventually) matched
** by a call to lsmDbDatabaseRelease().
*/
int lsmDbDatabaseConnect(
  lsm_db *pDb,                    /* Database handle */
  const char *zName               /* Path to db file */
){
  lsm_env *pEnv = pDb->pEnv;
  int rc;                         /* Return code */
  Database *p = 0;                /* Pointer returned via *ppDatabase */
  int nId = 0;
................................................................................
      p->pConn = pDb;
      lsmMutexLeave(pDb->pEnv, p->pClientMutex);
    }
  }

  lsmFree(pEnv, pId);
  pDb->pDatabase = p;

  if( rc==LSM_OK ){
    rc = doDbConnect(pDb);
  }

  return rc;
}

/*
** Release a reference to a Database object obtained from 
** lsmDbDatabaseConnect(). There should be exactly one call to this function 
** for each successful call to lsmDbDatabaseConnect().
*/
void lsmDbDatabaseRelease(lsm_db *pDb){
  Database *p = pDb->pDatabase;
  if( p ){
    lsm_db **ppDb;

    if( pDb->pShmhdr ){
      doDbDisconnect(pDb);
    }

    lsmMutexEnter(pDb->pEnv, p->pClientMutex);
    for(ppDb=&p->pConn; *ppDb!=pDb; ppDb=&((*ppDb)->pNext));
    *ppDb = pDb->pNext;
    lsmMutexLeave(pDb->pEnv, p->pClientMutex);

    enterGlobalMutex(pDb->pEnv);
................................................................................
      int i;
      Database **pp;

      /* Remove the Database structure from the linked list. */
      for(pp=&gShared.pDatabase; *pp!=p; pp=&((*pp)->pDbNext));
      *pp = p->pDbNext;

      /* Free the Database object and shared memory buffers. */



















      for(i=0; i<p->nShmChunk; i++){
        lsmFree(pDb->pEnv, p->apShmChunk[i]);
      }
      lsmFree(pDb->pEnv, p->apShmChunk);


      freeDatabase(pDb->pEnv, p);
    }
    leaveGlobalMutex(pDb->pEnv);
  }
}

Level *lsmDbSnapshotLevel(Snapshot *pSnapshot){
................................................................................
int lsmCheckpointWrite(lsm_db *pDb){
  int rc;                         /* Return Code */

  assert( pDb->pWorker==0 );
  assert( 1 || pDb->pClient==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );

  rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0);
  if( rc!=LSM_OK ) return rc;

  rc = lsmCheckpointLoad(pDb);
  if( rc==LSM_OK ){
    ShmHeader *pShm = pDb->pShmhdr;
    int bDone = 0;                /* True if checkpoint is already stored */

................................................................................
  /* If no error has occurred, then the snapshot currently in pDb->aSnapshot
  ** has been synced to disk. This means it may be possible to wrap the
  ** log file. Obtain the WRITER lock and update the relevant tree-header
  ** fields to reflect this.  */
  if( rc==LSM_OK ){
    int rc2;
    u64 iLogoff = lsmCheckpointLogOffset(pDb->aSnapshot);
    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
    if( rc==LSM_OK ) rc = lsmTreeLoadHeader(pDb);
    if( rc==LSM_OK ) lsmLogCheckpoint(pDb, iLogoff);
    if( rc==LSM_OK ) lsmTreeEndTransaction(pDb, 1);
    if( rc==LSM_BUSY ) rc = LSM_OK;
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  return rc;
}

int lsmBeginWork(lsm_db *pDb){
  int rc;

  /* Attempt to take the WORKER lock */
  rc = lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL, 0);

  /* Deserialize the current worker snapshot */
  if( rc==LSM_OK ){
    rc = lsmCheckpointLoadWorker(pDb);
    if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase;
  }
  return rc;
................................................................................
  }

  if( pDb->pWorker ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
    pDb->pWorker = 0;
  }

  lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);
}


/*
** Called when recovery is finished.
*/
int lsmFinishRecovery(lsm_db *pDb){
................................................................................
  assert( pDb->nTransOpen==0 );

  /* If there is no read-transaction open, open one now. */
  rc = lsmBeginReadTrans(pDb);

  /* Attempt to take the WRITER lock */
  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
  }

  /* If the previous writer failed mid-transaction, run emergency rollback. */
  if( rc==LSM_OK && pShm->bWriter ){
    /* TODO: This! */
    assert( 0 );
    rc = LSM_CORRUPT_BKPT;
................................................................................
  /* If everything was successful, set the "transaction-in-progress" flag
  ** and return LSM_OK. Otherwise, if some error occurred, relinquish the 
  ** WRITER lock and return an error code.  */
  if( rc==LSM_OK ){
    pShm->bWriter = 1;
    pDb->treehdr.iTransId++;
  }else{
    lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
    if( pDb->pCsr==0 ) lsmFinishReadTrans(pDb);
  }
  return rc;
}

/*
** End the current write transaction. The connection is left with an open
................................................................................
** merely releases locks and other resources held by the write-transaction.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){
  /* Finish the log and then the in-memory tree, handing each the caller's
  ** bCommit flag unchanged. Both calls are made before the WRITER lock is
  ** released below. */
  (void)lsmLogEnd(pDb, bCommit);
  (void)lsmTreeEndTransaction(pDb, bCommit);

  /* Drop the WRITER lock with a non-blocking request. The outcome of this
  ** call, like that of the two calls above, is not inspected. */
  (void)lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);

  /* This function reports LSM_OK unconditionally. */
  return LSM_OK;
}


/*
** Return non-zero if the caller is holding the client mutex.
*/
................................................................................

  assert( db->iReader<0 );

  /* Search for an exact match. */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId==iLsm && p->iTreeId==iTree ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      if( rc==LSM_OK && p->iLsmId==iLsm && p->iTreeId==iTree ){
        db->iReader = i;
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  /* Try to obtain a write-lock on each slot, in order. If successful, set
  ** the slot values to iLsm/iTree.  */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
    if( rc==LSM_BUSY ){
      rc = LSM_OK;
    }else{
      ShmReader *p = &pShm->aReader[i];
      p->iLsmId = iLsm;
      p->iTreeId = iTree;
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      if( rc==LSM_OK ) db->iReader = i;
    }
  }

  /* Search for any usable slot */
  for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId && p->iTreeId && p->iLsmId<=iLsm && p->iTreeId<=iTree ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0);
      if( rc==LSM_OK ){
        if( p->iLsmId && p->iTreeId && p->iLsmId<=iLsm && p->iTreeId<=iTree ){
          db->iReader = i;
        }
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
................................................................................
  ShmHeader *pShm = db->pShmhdr;
  int i;
  int rc = LSM_OK;

  for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId && p->iTreeId && (p->iTreeId<=iTree || p->iLsmId<=iLsm) ){
      rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
      if( rc==LSM_OK ){
        p->iTreeId = p->iLsmId = 0;
        lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
      }
    }
  }

  if( rc==LSM_BUSY ){
    *pbInUse = 1;
    return LSM_OK;
................................................................................

/*
** Give up the reader lock recorded in db->iReader, if there is one.
**
** If db->iReader is negative no lock is recorded; nothing is done and
** LSM_OK is returned. Otherwise an unlock request is issued for the
** corresponding LSM_LOCK_READER() slot, db->iReader is reset to -1
** whether or not that request succeeded, and the result of the unlock
** request is returned.
*/
int lsmReleaseReadlock(lsm_db *db){
  int iSlot = db->iReader;        /* Reader slot currently recorded, or <0 */
  int rcUnlock;                   /* Result of the unlock request */

  /* No slot recorded: there is nothing to release. */
  if( iSlot<0 ) return LSM_OK;

  rcUnlock = lsmShmLock(db, LSM_LOCK_READER(iSlot), LSM_LOCK_UNLOCK, 0);

  /* Mark the connection as holding no reader slot, even on error. */
  db->iReader = -1;
  return rcUnlock;
}



................................................................................
**
** Parameter iLock must be one of LSM_LOCK_DMS1, DMS2, WRITER, WORKER or
** CHECKPOINTER, or else a value returned by the LSM_LOCK_READER macro.
*/
int lsmShmLock(
  lsm_db *db, 
  int iLock,
  int eOp,                        /* One of LSM_LOCK_UNLOCK, SHARED or EXCL */
  int bBlock                      /* True for a blocking lock */
){
  int rc = LSM_OK;
  Database *p = db->pDatabase;

  assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
  assert( iLock<=16 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );

Changes to src/lsm_tree.c.

1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
  while( 1 ){
    int rc;
    ShmHeader *pShm = pDb->pShmhdr;

    memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
    if( treeHeaderChecksumOk(&pDb->treehdr) ) return LSM_OK;

    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL);
    if( rc==LSM_BUSY ){
      usleep(50);
    }else{
      if( rc==LSM_OK ){
        if( treeHeaderChecksumOk(&pShm->hdr1)==0 ){
          memcpy(&pShm->hdr1, &pShm->hdr2, sizeof(TreeHeader));
        }
        memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
        lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK);

        if( treeHeaderChecksumOk(&pDb->treehdr)==0 ){
          rc = LSM_CORRUPT_BKPT;
        }
      }
      return rc;
    }







|








|







1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
  while( 1 ){
    int rc;
    ShmHeader *pShm = pDb->pShmhdr;

    memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
    if( treeHeaderChecksumOk(&pDb->treehdr) ) return LSM_OK;

    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
    if( rc==LSM_BUSY ){
      usleep(50);
    }else{
      if( rc==LSM_OK ){
        if( treeHeaderChecksumOk(&pShm->hdr1)==0 ){
          memcpy(&pShm->hdr1, &pShm->hdr2, sizeof(TreeHeader));
        }
        memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
        lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);

        if( treeHeaderChecksumOk(&pDb->treehdr)==0 ){
          rc = LSM_CORRUPT_BKPT;
        }
      }
      return rc;
    }