Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Change the way worker clients keep snapshots consistent so as to match the description in lsm.wiki.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: 0427b07c145adb655fc0450d8835984d57e275d3
User & Date: dan 2012-09-12 16:19:03.155
Context
2012-09-12
17:23
Fix a problem in the lsm_info(LOG_STRUCTURE) command causing errors in tcl tests. check-in: 6e5e429ea7 user: dan tags: trunk
16:19
Change the way worker clients keep snapshots consistent so as to match the description in lsm.wiki. check-in: 0427b07c14 user: dan tags: trunk
14:47
Fix an assert() failure that can follow an OOM error. check-in: d65a5112b6 user: dan tags: trunk
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsm.h.
103
104
105
106
107
108
109

110
111
112
113
114
115
116
#define LSM_ERROR      1
#define LSM_BUSY       5
#define LSM_NOMEM      7
#define LSM_IOERR     10
#define LSM_CORRUPT   11
#define LSM_FULL      13
#define LSM_CANTOPEN  14

#define LSM_MISUSE    21

/* 
** Open and close a connection to a named database.
*/
int lsm_new(lsm_env*, lsm_db **ppDb);
int lsm_open(lsm_db *pDb, const char *zFilename);







>







103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#define LSM_ERROR      1
#define LSM_BUSY       5
#define LSM_NOMEM      7
#define LSM_IOERR     10
#define LSM_CORRUPT   11
#define LSM_FULL      13
#define LSM_CANTOPEN  14
#define LSM_PROTOCOL  15
#define LSM_MISUSE    21

/* 
** Open and close a connection to a named database.
*/
int lsm_new(lsm_env*, lsm_db **ppDb);
int lsm_open(lsm_db *pDb, const char *zFilename);
Changes to src/lsmInt.h.
146
147
148
149
150
151
152


153
154
155
156
157
158
159
/*
** Hard limit on the number of free-list entries that may be stored in 
** a checkpoint (the remainder are stored as a system record in the LSM).
** See also LSM_CONFIG_MAX_FREELIST.
*/
#define LSM_MAX_FREELIST_ENTRIES 100



/*
** A string that can grow by appending.
*/
struct LsmString {
  lsm_env *pEnv;              /* Run-time environment */
  int n;                      /* Size of string.  -1 indicates error */
  int nAlloc;                 /* Space allocated for z[] */







>
>







146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/*
** Hard limit on the number of free-list entries that may be stored in 
** a checkpoint (the remainder are stored as a system record in the LSM).
** See also LSM_CONFIG_MAX_FREELIST.
*/
#define LSM_MAX_FREELIST_ENTRIES 100

#define LSM_ATTEMPTS_BEFORE_PROTOCOL 10000

/*
** A string that can grow by appending.
*/
struct LsmString {
  lsm_env *pEnv;              /* Run-time environment */
  int n;                      /* Size of string.  -1 indicates error */
  int nAlloc;                 /* Space allocated for z[] */
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
**   contains the most recently written checkpoint (either 1 or 2).
**
** hdr1, hdr2:
**   The two copies of the in-memory tree header. Two copies are required
**   in case a writer fails while updating one of them.
*/
struct ShmHeader {
  /* Serialized snapshot copied to pDb->aSnapshot[] by lsmCheckpointLoad().
  ** Sized to hold LSM_META_PAGE_SIZE bytes of 32-bit words. */
  u32 aClient[LSM_META_PAGE_SIZE / 4];
  /* Serialized snapshot deserialized by lsmCheckpointLoadWorker(). */
  u32 aWorker[LSM_META_PAGE_SIZE / 4];
  u32 bWriter;                    /* NOTE(review): use not visible here */
  u32 iMetaPage;                  /* Meta page the checkpoint was loaded from */
  TreeHeader hdr1;                /* First copy of in-memory tree header */
  TreeHeader hdr2;                /* Second copy of in-memory tree header */
  ShmReader aReader[LSM_LOCK_NREADER];    /* LSM_LOCK_NREADER reader slots */
};








|
|







406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
**   contains the most recently written checkpoint (either 1 or 2).
**
** hdr1, hdr2:
**   The two copies of the in-memory tree header. Two copies are required
**   in case a writer fails while updating one of them.
*/
struct ShmHeader {
  /* Two copies of the serialized snapshot, each sized to hold
  ** LSM_META_PAGE_SIZE bytes of 32-bit words. lsmCheckpointLoad() tries
  ** aSnap1[] first and falls back to aSnap2[] if the first copy does not
  ** pass its checksum test. */
  u32 aSnap1[LSM_META_PAGE_SIZE / 4];
  u32 aSnap2[LSM_META_PAGE_SIZE / 4];
  u32 bWriter;                    /* NOTE(review): use not visible here */
  u32 iMetaPage;                  /* Meta page the checkpoint was loaded from */
  TreeHeader hdr1;                /* First copy of in-memory tree header */
  TreeHeader hdr2;                /* Second copy of in-memory tree header */
  ShmReader aReader[LSM_LOCK_NREADER];    /* LSM_LOCK_NREADER reader slots */
};

478
479
480
481
482
483
484
485
486
487



488
489
490
491
492
493
494
int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *);
int lsmCheckpointOverflowRequired(lsm_db *pDb);
int lsmCheckpointOverflowLoad(lsm_db *pDb, Freelist *);

int lsmCheckpointRecover(lsm_db *);
int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **);

int lsmCheckpointLoad(lsm_db *pDb);
int lsmCheckpointLoadWorker(lsm_db *pDb);
int lsmCheckpointStore(lsm_db *pDb, int);




i64 lsmCheckpointId(u32 *, int);
i64 lsmCheckpointLogOffset(u32 *);
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);







<


>
>
>







480
481
482
483
484
485
486

487
488
489
490
491
492
493
494
495
496
497
498
int lsmCheckpointOverflow(lsm_db *pDb, void **, int *, int *);
int lsmCheckpointOverflowRequired(lsm_db *pDb);
int lsmCheckpointOverflowLoad(lsm_db *pDb, Freelist *);

int lsmCheckpointRecover(lsm_db *);
int lsmCheckpointDeserialize(lsm_db *, int, u32 *, Snapshot **);


int lsmCheckpointLoadWorker(lsm_db *pDb);
int lsmCheckpointStore(lsm_db *pDb, int);

int lsmCheckpointLoad(lsm_db *pDb, int *);
int lsmCheckpointLoadOk(lsm_db *pDb, int);

i64 lsmCheckpointId(u32 *, int);
i64 lsmCheckpointLogOffset(u32 *);
int lsmCheckpointPgsz(u32 *);
int lsmCheckpointBlksz(u32 *);
void lsmCheckpointLogoffset(u32 *aCkpt, DbLog *pLog);
void lsmCheckpointZeroLogoffset(lsm_db *);
506
507
508
509
510
511
512
513

514
515
516
517
518
519
520
void lsmTreeClear(lsm_db *);
int lsmTreeInit(lsm_db *);
int lsmTreeRepair(lsm_db *);

int lsmTreeSize(lsm_db *);
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit);
int lsmTreeBeginTransaction(lsm_db *pDb);
int lsmTreeLoadHeader(lsm_db *pDb);


int lsmTreeInsert(lsm_db *pDb, void *pKey, int nKey, void *pVal, int nVal);
void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark);
void lsmTreeMark(lsm_db *pDb, TreeMark *pMark);

int lsmTreeCursorNew(lsm_db *pDb, TreeCursor **);
void lsmTreeCursorDestroy(TreeCursor *);







|
>







510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
void lsmTreeClear(lsm_db *);
int lsmTreeInit(lsm_db *);
int lsmTreeRepair(lsm_db *);

int lsmTreeSize(lsm_db *);
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit);
int lsmTreeBeginTransaction(lsm_db *pDb);
int lsmTreeLoadHeader(lsm_db *pDb, int *);
int lsmTreeLoadHeaderOk(lsm_db *, int);

int lsmTreeInsert(lsm_db *pDb, void *pKey, int nKey, void *pVal, int nVal);
void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark);
void lsmTreeMark(lsm_db *pDb, TreeMark *pMark);

int lsmTreeCursorNew(lsm_db *pDb, TreeCursor **);
void lsmTreeCursorDestroy(TreeCursor *);
642
643
644
645
646
647
648


649
650
651
652
653
654
655
int lsmEnvOpen(lsm_env *, const char *, lsm_file **);
int lsmEnvClose(lsm_env *pEnv, lsm_file *pFile);
int lsmEnvLock(lsm_env *pEnv, lsm_file *pFile, int iLock, int eLock);

int lsmEnvShmMap(lsm_env *, lsm_file *, int, int, void **); 
void lsmEnvShmBarrier(lsm_env *);
void lsmEnvShmUnmap(lsm_env *, lsm_file *, int);



/*
** End of functions from "lsm_file.c".
**************************************************************************/

/* 
** Functions from file "lsm_sorted.c".







>
>







647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
int lsmEnvOpen(lsm_env *, const char *, lsm_file **);
int lsmEnvClose(lsm_env *pEnv, lsm_file *pFile);
int lsmEnvLock(lsm_env *pEnv, lsm_file *pFile, int iLock, int eLock);

int lsmEnvShmMap(lsm_env *, lsm_file *, int, int, void **); 
void lsmEnvShmBarrier(lsm_env *);
void lsmEnvShmUnmap(lsm_env *, lsm_file *, int);

void lsmEnvSleep(lsm_env *, int);

/*
** End of functions from "lsm_file.c".
**************************************************************************/

/* 
** Functions from file "lsm_sorted.c".
Changes to src/lsm_ckpt.c.
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
    i64 iOff = pLog->aRegion[2].iEnd;
    ckptSetValue(p, iOut++, (iOff >> 32) & 0xFFFFFFFF, pRc);
    ckptSetValue(p, iOut++, (iOff & 0xFFFFFFFF), pRc);
    ckptSetValue(p, iOut++, pLog->cksum0, pRc);
    ckptSetValue(p, iOut++, pLog->cksum1, pRc);
  }else{
    for(; iOut<=CKPT_HDR_LO_CKSUM2; iOut++){
      ckptSetValue(p, iOut, pDb->pShmhdr->aWorker[iOut], pRc);
    }
  }

  *piOut = iOut;
}

static void ckptExportAppendlist(







|







331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
    i64 iOff = pLog->aRegion[2].iEnd;
    ckptSetValue(p, iOut++, (iOff >> 32) & 0xFFFFFFFF, pRc);
    ckptSetValue(p, iOut++, (iOff & 0xFFFFFFFF), pRc);
    ckptSetValue(p, iOut++, pLog->cksum0, pRc);
    ckptSetValue(p, iOut++, pLog->cksum1, pRc);
  }else{
    for(; iOut<=CKPT_HDR_LO_CKSUM2; iOut++){
      ckptSetValue(p, iOut, pDb->pShmhdr->aSnap2[iOut], pRc);
    }
  }

  *piOut = iOut;
}

static void ckptExportAppendlist(
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
  CkptBuffer ckpt;
  int nFree;
 
  nFree = pSnap->freelist.nEntry;
  if( nOvfl>=0 ){
    nFree -=  nOvfl;
  }else{
    nOvfl = pDb->pShmhdr->aWorker[CKPT_HDR_OVFL];
  }

  /* Initialize the output buffer */
  memset(&ckpt, 0, sizeof(CkptBuffer));
  ckpt.pEnv = pDb->pEnv;
  iOut = CKPT_HDR_SIZE;








|







378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
  CkptBuffer ckpt;
  int nFree;
 
  nFree = pSnap->freelist.nEntry;
  if( nOvfl>=0 ){
    nFree -=  nOvfl;
  }else{
    nOvfl = pDb->pShmhdr->aSnap2[CKPT_HDR_OVFL];
  }

  /* Initialize the output buffer */
  memset(&ckpt, 0, sizeof(CkptBuffer));
  ckpt.pEnv = pDb->pEnv;
  iOut = CKPT_HDR_SIZE;

822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
** Attempt to load a checkpoint from meta page iMeta.
**
** This function is a no-op if *pRc is set to any value other than LSM_OK
** when it is called. If an error occurs, *pRc is set to an LSM error code
** before returning.
**
** If no error occurs and the checkpoint is successfully loaded, copy it to
** ShmHeader.aClient[] and ShmHeader.aWorker[], and set ShmHeader.iMetaPage 
** to indicate its origin. In this case return 1. Or, if the checkpoint 
** cannot be loaded (because the checksum does not compute), return 0.
*/
static int ckptTryLoad(lsm_db *pDb, MetaPage *pPg, u32 iMeta, int *pRc){
  int bLoaded = 0;                /* Return value */
  if( *pRc==LSM_OK ){
    int rc = LSM_OK;              /* Error code */







|







822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
** Attempt to load a checkpoint from meta page iMeta.
**
** This function is a no-op if *pRc is set to any value other than LSM_OK
** when it is called. If an error occurs, *pRc is set to an LSM error code
** before returning.
**
** If no error occurs and the checkpoint is successfully loaded, copy it to
** ShmHeader.aSnap1[] and ShmHeader.aSnap2[], and set ShmHeader.iMetaPage 
** to indicate its origin. In this case return 1. Or, if the checkpoint 
** cannot be loaded (because the checksum does not compute), return 0.
*/
static int ckptTryLoad(lsm_db *pDb, MetaPage *pPg, u32 iMeta, int *pRc){
  int bLoaded = 0;                /* Return value */
  if( *pRc==LSM_OK ){
    int rc = LSM_OK;              /* Error code */
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
      aCkpt = (u32 *)lsmMallocRc(pDb->pEnv, nCkpt*sizeof(u32), &rc);
    }
    if( aCkpt ){
      memcpy(aCkpt, aData, nCkpt*sizeof(u32));
      ckptChangeEndianness(aCkpt, nCkpt);
      if( ckptChecksumOk(aCkpt) ){
        ShmHeader *pShm = pDb->pShmhdr;
        memcpy(pShm->aClient, aCkpt, nCkpt*sizeof(u32));
        memcpy(pShm->aWorker, aCkpt, nCkpt*sizeof(u32));
        memcpy(pDb->aSnapshot, aCkpt, nCkpt*sizeof(u32));
        pShm->iMetaPage = iMeta;
        bLoaded = 1;
      }
    }

    lsmFree(pDb->pEnv, aCkpt);







|
|







845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
      aCkpt = (u32 *)lsmMallocRc(pDb->pEnv, nCkpt*sizeof(u32), &rc);
    }
    if( aCkpt ){
      memcpy(aCkpt, aData, nCkpt*sizeof(u32));
      ckptChangeEndianness(aCkpt, nCkpt);
      if( ckptChecksumOk(aCkpt) ){
        ShmHeader *pShm = pDb->pShmhdr;
        memcpy(pShm->aSnap1, aCkpt, nCkpt*sizeof(u32));
        memcpy(pShm->aSnap2, aCkpt, nCkpt*sizeof(u32));
        memcpy(pDb->aSnapshot, aCkpt, nCkpt*sizeof(u32));
        pShm->iMetaPage = iMeta;
        bLoaded = 1;
      }
    }

    lsmFree(pDb->pEnv, aCkpt);
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
  ShmHeader *pShm = pDb->pShmhdr;

  aCkpt[CKPT_HDR_NCKPT] = nCkpt;
  aCkpt[CKPT_HDR_BLKSZ] = pDb->nDfltBlksz;
  aCkpt[CKPT_HDR_PGSZ] = pDb->nDfltPgsz;
  ckptChecksum(aCkpt, array_size(aCkpt), &aCkpt[nCkpt-2], &aCkpt[nCkpt-1]);

  memcpy(pShm->aClient, aCkpt, nCkpt*sizeof(u32));
  memcpy(pShm->aWorker, aCkpt, nCkpt*sizeof(u32));
  memcpy(pDb->aSnapshot, aCkpt, nCkpt*sizeof(u32));
}

/*
** This function is called as part of database recovery to initialize the
** ShmHeader.aClient[] and ShmHeader.aWorker[] snapshots.
*/
int lsmCheckpointRecover(lsm_db *pDb){
  int rc = LSM_OK;                /* Return Code */
  i64 iId1;                       /* Id of checkpoint on meta-page 1 */
  i64 iId2;                       /* Id of checkpoint on meta-page 2 */
  int bLoaded = 0;                /* True once checkpoint has been loaded */
  int cmp;                        /* True if (iId2>iId1) */







|
|





|







886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
  ShmHeader *pShm = pDb->pShmhdr;

  aCkpt[CKPT_HDR_NCKPT] = nCkpt;
  aCkpt[CKPT_HDR_BLKSZ] = pDb->nDfltBlksz;
  aCkpt[CKPT_HDR_PGSZ] = pDb->nDfltPgsz;
  ckptChecksum(aCkpt, array_size(aCkpt), &aCkpt[nCkpt-2], &aCkpt[nCkpt-1]);

  memcpy(pShm->aSnap1, aCkpt, nCkpt*sizeof(u32));
  memcpy(pShm->aSnap2, aCkpt, nCkpt*sizeof(u32));
  memcpy(pDb->aSnapshot, aCkpt, nCkpt*sizeof(u32));
}

/*
** This function is called as part of database recovery to initialize the
** ShmHeader.aSnap1[] and ShmHeader.aSnap2[] snapshots.
*/
int lsmCheckpointRecover(lsm_db *pDb){
  int rc = LSM_OK;                /* Return Code */
  i64 iId1;                       /* Id of checkpoint on meta-page 1 */
  i64 iId2;                       /* Id of checkpoint on meta-page 2 */
  int bLoaded = 0;                /* True once checkpoint has been loaded */
  int cmp;                        /* True if (iId2>iId1) */
953
954
955
956
957
958
959
960


961
962
963
964
965
966

967
968


969

970

971
972
973
974

975
976
977

978
979
980
981
982
983
984
985

986
987
988
989





990
991
992
993
994


995
996
997
998




999
1000
1001
1002



1003
1004

1005
1006
1007
1008
1009
1010
1011
1012
      
  return rc;
}

/*
** Copy the current client snapshot from shared-memory to pDb->aSnapshot[].
**
** The snapshot in ShmHeader.aClient[] is copied without holding any lock
** and accepted if it passes ckptChecksumOk(). Otherwise an attempt is made
** to take the WORKER lock exclusively:
**
**   * If the lock is busy, sleep via usleep(50) and start again.
**
**   * If the lock is obtained and aClient[] still fails its checksum,
**     aClient[] is overwritten with aWorker[]. aClient[] is then copied
**     to pDb->aSnapshot[] and the lock released.
**
** Return LSM_OK if a snapshot with a valid checksum was loaded, 
** LSM_CORRUPT_BKPT if the copy made under the lock still fails its
** checksum, or the error code returned by lsmShmLock() otherwise.
*/
int lsmCheckpointLoad(lsm_db *pDb){


  while( 1 ){
    int rc;
    int nInt;
    ShmHeader *pShm = pDb->pShmhdr;

    /* Number of 32-bit words in the shared client snapshot */
    nInt = pShm->aClient[CKPT_HDR_NCKPT];

    /* Optimistic, lock-free read. */
    memcpy(pDb->aSnapshot, pShm->aClient, nInt*sizeof(u32));
    if( ckptChecksumOk(pDb->aSnapshot) ) return LSM_OK;




    rc = lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL, 0);

    if( rc==LSM_BUSY ){
      usleep(50);
    }else{
      if( rc==LSM_OK ){

        /* Repair aClient[] from aWorker[] if it is still invalid. */
        if( ckptChecksumOk(pShm->aClient)==0 ){
          nInt = pShm->aWorker[CKPT_HDR_NCKPT];
          memcpy(pShm->aClient, pShm->aWorker, nInt*sizeof(u32));

        }
        nInt = pShm->aClient[CKPT_HDR_NCKPT];
        memcpy(pDb->aSnapshot, &pShm->aClient, nInt*sizeof(u32));
        lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK, 0);

        if( ckptChecksumOk(pDb->aSnapshot)==0 ){
          rc = LSM_CORRUPT_BKPT;
        }

      }
      return rc;
    }
  }





}

/*
** Deserialize the worker snapshot in ShmHeader.aWorker[] into pDb->pWorker.
**
** If aWorker[] does not pass ckptChecksumOk(), it is first overwritten with
** the contents of aClient[]. If it still fails the checksum test after
** that, LSM_CORRUPT_BKPT is returned. Otherwise the return value is that of
** lsmCheckpointDeserialize().
**
** The caller must hold the WORKER lock exclusively.
*/
int lsmCheckpointLoadWorker(lsm_db *pDb){
  int rc;
  ShmHeader *pShm = pDb->pShmhdr;



  /* Must be holding the WORKER lock to do this */
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) );





  if( ckptChecksumOk(pShm->aWorker)==0 ){
    /* Repair the worker copy from the client copy. */
    int nInt = (int)pShm->aClient[CKPT_HDR_NCKPT];
    memcpy(pShm->aWorker, pShm->aClient, nInt*sizeof(u32));
    if( ckptChecksumOk(pShm->aWorker)==0 ) return LSM_CORRUPT_BKPT;



  }


  rc = lsmCheckpointDeserialize(pDb, 1, pShm->aWorker, &pDb->pWorker);
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  return rc;
}

int lsmCheckpointDeserialize(
  lsm_db *pDb, 
  int bInclFreelist,              /* If true, deserialize free-list */







|
>
>
|
<

<

|
>
|
|
>
>
|
>
|
>
|
<
<
<
>
|
|
<
>
|
<
<
<
|
<
<
|
>
|
|
|
|
>
>
>
>
>





>
>




>
>
>
>
|
<
|
|
>
>
>
|
|
>
|







953
954
955
956
957
958
959
960
961
962
963

964

965
966
967
968
969
970
971
972
973
974
975
976



977
978
979

980
981



982


983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009

1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
      
  return rc;
}

/*
** Copy the current client snapshot from shared-memory to pDb->aSnapshot[].
**
** No lock is taken. Each attempt tries ShmHeader.aSnap1[] and then
** ShmHeader.aSnap2[]. A copy is accepted only if its CKPT_HDR_NCKPT size
** field does not exceed the number of u32 values in a meta page and the
** data copied into pDb->aSnapshot[] passes ckptChecksumOk(). If neither
** copy is accepted, lsmShmBarrier() is invoked and the attempt repeated,
** up to LSM_ATTEMPTS_BEFORE_PROTOCOL times in total.
**
** Return LSM_OK if a snapshot is loaded. In this case, if piRead is not
** NULL, *piRead is set to 1 if the snapshot came from aSnap1[] or 2 if it
** came from aSnap2[]. If all attempts fail, return LSM_PROTOCOL and leave
** *piRead unmodified.
*/
int lsmCheckpointLoad(lsm_db *pDb, int *piRead){
  int nRem = LSM_ATTEMPTS_BEFORE_PROTOCOL;
  ShmHeader *pShm = pDb->pShmhdr;
  while( (nRem--)>0 ){

    int nInt;                     /* Size of shared snapshot in u32 values */


    /* Try the first copy. The size is range-checked before it is used as
    ** a memcpy() length since it is read from shared memory. */
    nInt = pShm->aSnap1[CKPT_HDR_NCKPT];
    if( nInt<=(LSM_META_PAGE_SIZE / sizeof(u32)) ){
      memcpy(pDb->aSnapshot, pShm->aSnap1, nInt*sizeof(u32));
      if( ckptChecksumOk(pDb->aSnapshot) ){
        if( piRead ) *piRead = 1;
        return LSM_OK;
      }
    }

    /* The first copy was unusable. Try the second. */
    nInt = pShm->aSnap2[CKPT_HDR_NCKPT];
    if( nInt<=(LSM_META_PAGE_SIZE / sizeof(u32)) ){



      memcpy(pDb->aSnapshot, pShm->aSnap2, nInt*sizeof(u32));
      if( ckptChecksumOk(pDb->aSnapshot) ){
        if( piRead ) *piRead = 2;

        return LSM_OK;
      }



    }



    lsmShmBarrier(pDb);
  }
  return LSM_PROTOCOL;
}

/*
** Argument iSnap must be 1 or 2, selecting ShmHeader.aSnap1[] or
** ShmHeader.aSnap2[] respectively. Return non-zero if the checkpoint id
** of the selected shared-memory snapshot is the same as that of the
** snapshot currently held in pDb->aSnapshot[], or zero otherwise.
*/
int lsmCheckpointLoadOk(lsm_db *pDb, int iSnap){
  u32 *aShared = pDb->pShmhdr->aSnap2;
  assert( iSnap==1 || iSnap==2 );
  if( iSnap==1 ) aShared = pDb->pShmhdr->aSnap1;
  if( lsmCheckpointId(aShared, 0)!=lsmCheckpointId(pDb->aSnapshot, 0) ){
    return 0;
  }
  return 1;
}

/*
** Deserialize the snapshot in ShmHeader.aSnap1[] into pDb->pWorker.
**
** Before doing so, aSnap1[] and aSnap2[] are compared. If they differ,
** whichever passes ckptChecksumOk() (aSnap1[] is tested first) is copied
** over the other. If neither passes, LSM_PROTOCOL is returned. Otherwise
** the return value is that of lsmCheckpointDeserialize().
**
** The caller must hold the WORKER lock exclusively.
**
** NOTE(review): unlike lsmCheckpointLoad(), the size fields read from
** shared memory are not range-checked here before being used as
** memcmp()/memcpy() lengths - confirm this is safe under the WORKER lock.
*/
int lsmCheckpointLoadWorker(lsm_db *pDb){
  int rc;
  ShmHeader *pShm = pDb->pShmhdr;
  int nInt1;                      /* Size of aSnap1[] in u32 values */
  int nInt2;                      /* Size of aSnap2[] in u32 values */

  /* Must be holding the WORKER lock to do this */
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) );

  /* Check that the two snapshots match. If not, repair them. */
  nInt1 = pShm->aSnap1[CKPT_HDR_NCKPT];
  nInt2 = pShm->aSnap2[CKPT_HDR_NCKPT];
  if( nInt1!=nInt2 || memcmp(pShm->aSnap1, pShm->aSnap2, nInt2*sizeof(u32)) ){
    if( ckptChecksumOk(pShm->aSnap1) ){

      memcpy(pShm->aSnap2, pShm->aSnap1, sizeof(u32)*nInt1);
    }else if( ckptChecksumOk(pShm->aSnap2) ){
      memcpy(pShm->aSnap1, pShm->aSnap2, sizeof(u32)*nInt2);
    }else{
      return LSM_PROTOCOL;
    }
  }

  rc = lsmCheckpointDeserialize(pDb, 1, pShm->aSnap1, &pDb->pWorker);
  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  return rc;
}

int lsmCheckpointDeserialize(
  lsm_db *pDb, 
  int bInclFreelist,              /* If true, deserialize free-list */
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
  int rc;

  rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aWorker, p, n);
  lsmShmBarrier(pDb);
  memcpy(pShm->aClient, p, n);
  lsmFree(pDb->pEnv, p);

  return LSM_OK;
}

/*
** This function is used to determine the snapshot-id of the most recently







|

|







1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
  int rc;

  rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aSnap2, p, n);
  lsmShmBarrier(pDb);
  memcpy(pShm->aSnap1, p, n);
  lsmFree(pDb->pEnv, p);

  return LSM_OK;
}

/*
** This function is used to determine the snapshot-id of the most recently
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
}

/*
** Set the CKPT_HDR_LO_MSW and CKPT_HDR_LO_LSW fields of the snapshot in
** pDb->aSnapshot[] to zero, recompute the checksum stored in its final two
** words, and copy the result to both ShmHeader.aClient[] and
** ShmHeader.aWorker[].
**
** On entry pDb->aSnapshot[] is expected to be identical to both shared
** snapshots (this is checked by assert() only).
*/
void lsmCheckpointZeroLogoffset(lsm_db *pDb){
  u32 nCkpt;                      /* Size of snapshot in u32 values */

  nCkpt = pDb->aSnapshot[CKPT_HDR_NCKPT];
  assert( nCkpt>CKPT_HDR_NCKPT );
  assert( nCkpt==pDb->pShmhdr->aClient[CKPT_HDR_NCKPT] );
  assert( 0==memcmp(pDb->aSnapshot, pDb->pShmhdr->aClient, nCkpt*sizeof(u32)) );
  assert( 0==memcmp(pDb->aSnapshot, pDb->pShmhdr->aWorker, nCkpt*sizeof(u32)) );

  pDb->aSnapshot[CKPT_HDR_LO_MSW] = 0;
  pDb->aSnapshot[CKPT_HDR_LO_LSW] = 0;
  /* The checksum occupies the last two words of the snapshot. */
  ckptChecksum(pDb->aSnapshot, nCkpt, 
      &pDb->aSnapshot[nCkpt-2], &pDb->aSnapshot[nCkpt-1]
  );

  memcpy(pDb->pShmhdr->aClient, pDb->aSnapshot, nCkpt*sizeof(u32));
  memcpy(pDb->pShmhdr->aWorker, pDb->aSnapshot, nCkpt*sizeof(u32));
}








|
|
|







|
|


1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
}

/*
** Set the CKPT_HDR_LO_MSW and CKPT_HDR_LO_LSW fields of the snapshot in
** pDb->aSnapshot[] to zero, recompute the checksum stored in its final two
** words, and copy the result to both ShmHeader.aSnap1[] and
** ShmHeader.aSnap2[].
**
** On entry pDb->aSnapshot[] is expected to be identical to both shared
** snapshots (this is checked by assert() only).
*/
void lsmCheckpointZeroLogoffset(lsm_db *pDb){
  u32 nCkpt;                      /* Size of snapshot in u32 values */

  nCkpt = pDb->aSnapshot[CKPT_HDR_NCKPT];
  assert( nCkpt>CKPT_HDR_NCKPT );
  assert( nCkpt==pDb->pShmhdr->aSnap1[CKPT_HDR_NCKPT] );
  assert( 0==memcmp(pDb->aSnapshot, pDb->pShmhdr->aSnap1, nCkpt*sizeof(u32)) );
  assert( 0==memcmp(pDb->aSnapshot, pDb->pShmhdr->aSnap2, nCkpt*sizeof(u32)) );

  pDb->aSnapshot[CKPT_HDR_LO_MSW] = 0;
  pDb->aSnapshot[CKPT_HDR_LO_LSW] = 0;
  /* The checksum occupies the last two words of the snapshot. */
  ckptChecksum(pDb->aSnapshot, nCkpt, 
      &pDb->aSnapshot[nCkpt-2], &pDb->aSnapshot[nCkpt-1]
  );

  memcpy(pDb->pShmhdr->aSnap1, pDb->aSnapshot, nCkpt*sizeof(u32));
  memcpy(pDb->pShmhdr->aSnap2, pDb->aSnapshot, nCkpt*sizeof(u32));
}

Changes to src/lsm_log.c.
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
  rc = lsmFsOpenLog(pDb->pFS);
  if( rc!=LSM_OK ) return rc;

  rc = lsmTreeInit(pDb);
  if( rc!=LSM_OK ) return rc;

  pLog = &pDb->treehdr.log;
  lsmCheckpointLogoffset(pDb->pShmhdr->aWorker, pLog);

  logReaderInit(pDb, pLog, 1, &reader);
  lsmStringInit(&buf1, pDb->pEnv);
  lsmStringInit(&buf2, pDb->pEnv);

  /* The outer for() loop runs at most twice. The first iteration is to 
  ** count the number of committed transactions in the log. The second 







|







918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
  rc = lsmFsOpenLog(pDb->pFS);
  if( rc!=LSM_OK ) return rc;

  rc = lsmTreeInit(pDb);
  if( rc!=LSM_OK ) return rc;

  pLog = &pDb->treehdr.log;
  lsmCheckpointLogoffset(pDb->pShmhdr->aSnap2, pLog);

  logReaderInit(pDb, pLog, 1, &reader);
  lsmStringInit(&buf1, pDb->pEnv);
  lsmStringInit(&buf2, pDb->pEnv);

  /* The outer for() loop runs at most twice. The first iteration is to 
  ** count the number of committed transactions in the log. The second 
Changes to src/lsm_main.c.
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
      rc = lsmDbDatabaseConnect(pDb, zFull);
    }

    /* Configure the file-system connection with the page-size and block-size
    ** of this database. Even if the database file is zero bytes in size
    ** on disk, these values have been set in shared-memory by now, and so are
    ** guaranteed not to change during the lifetime of this connection.  */
    if( rc==LSM_OK && LSM_OK==(rc = lsmCheckpointLoad(pDb)) ){
      lsmFsSetPageSize(pDb->pFS, lsmCheckpointPgsz(pDb->aSnapshot));
      lsmFsSetBlockSize(pDb->pFS, lsmCheckpointBlksz(pDb->aSnapshot));
    }

    lsmFree(pDb->pEnv, zFull);
  }








|







178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
      rc = lsmDbDatabaseConnect(pDb, zFull);
    }

    /* Configure the file-system connection with the page-size and block-size
    ** of this database. Even if the database file is zero bytes in size
    ** on disk, these values have been set in shared-memory by now, and so are
    ** guaranteed not to change during the lifetime of this connection.  */
    if( rc==LSM_OK && LSM_OK==(rc = lsmCheckpointLoad(pDb, 0)) ){
      lsmFsSetPageSize(pDb->pFS, lsmCheckpointPgsz(pDb->aSnapshot));
      lsmFsSetBlockSize(pDb->pFS, lsmCheckpointBlksz(pDb->aSnapshot));
    }

    lsmFree(pDb->pEnv, zFull);
  }

226
227
228
229
230
231
232
233
234
235
236
237
238
239
240

  lsmFinishWork(pDb, 1, nOvfl, &rc);

  /* Restore the position of any open cursors */
  if( rc==LSM_OK && pDb->pCsr ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
    pDb->pClient = 0;
    rc = lsmCheckpointLoad(pDb);
    if( rc==LSM_OK ){
      rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
    }
    if( rc==LSM_OK ){
      rc = lsmRestoreCursors(pDb);
    }
  }







|







226
227
228
229
230
231
232
233
234
235
236
237
238
239
240

  lsmFinishWork(pDb, 1, nOvfl, &rc);

  /* Restore the position of any open cursors */
  if( rc==LSM_OK && pDb->pCsr ){
    lsmFreeSnapshot(pDb->pEnv, pDb->pClient);
    pDb->pClient = 0;
    rc = lsmCheckpointLoad(pDb, 0);
    if( rc==LSM_OK ){
      rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
    }
    if( rc==LSM_OK ){
      rc = lsmRestoreCursors(pDb);
    }
  }
Changes to src/lsm_shared.c.
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
    ** in-memory tree to disk and write a checkpoint.  */
    rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL, 0);
    if( rc==LSM_OK ){
      /* Flush the in-memory tree, if required. If there is data to flush,
      ** this will create a new client snapshot in Database.pClient. The
      ** checkpoint (serialization) of this snapshot may be written to disk
      ** by the following block.  */
      rc = lsmTreeLoadHeader(pDb);
      if( rc==LSM_OK && lsmTreeSize(pDb)>0 ){
        rc = lsmFlushToDisk(pDb);
      }

      /* Write a checkpoint to disk. */
      if( rc==LSM_OK ){
        rc = lsmCheckpointWrite(pDb);







|







170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
    ** in-memory tree to disk and write a checkpoint.  */
    rc = lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL, 0);
    if( rc==LSM_OK ){
      /* Flush the in-memory tree, if required. If there is data to flush,
      ** this will create a new client snapshot in Database.pClient. The
      ** checkpoint (serialization) of this snapshot may be written to disk
      ** by the following block.  */
      rc = lsmTreeLoadHeader(pDb, 0);
      if( rc==LSM_OK && lsmTreeSize(pDb)>0 ){
        rc = lsmFlushToDisk(pDb);
      }

      /* Write a checkpoint to disk. */
      if( rc==LSM_OK ){
        rc = lsmCheckpointWrite(pDb);
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
  assert( pDb->pWorker==0 );
  assert( 1 || pDb->pClient==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );

  rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0);
  if( rc!=LSM_OK ) return rc;

  rc = lsmCheckpointLoad(pDb);
  if( rc==LSM_OK ){
    ShmHeader *pShm = pDb->pShmhdr;
    int bDone = 0;                /* True if checkpoint is already stored */

    /* Check if this checkpoint has already been written to the database
    ** file. If so, set variable bDone to true.  */
    if( pShm->iMetaPage ){







|







528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
  assert( pDb->pWorker==0 );
  assert( 1 || pDb->pClient==0 );
  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_UNLOCK) );

  rc = lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_EXCL, 0);
  if( rc!=LSM_OK ) return rc;

  rc = lsmCheckpointLoad(pDb, 0);
  if( rc==LSM_OK ){
    ShmHeader *pShm = pDb->pShmhdr;
    int bDone = 0;                /* True if checkpoint is already stored */

    /* Check if this checkpoint has already been written to the database
    ** file. If so, set variable bDone to true.  */
    if( pShm->iMetaPage ){
629
630
631
632
633
634
635


636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657

658
659
660
661
662
663
664
665
  int rc = LSM_OK;                /* Return code */
  int iAttempt = 0;

  assert( pDb->pWorker==0 );
  assert( (pDb->pClient!=0)==(pDb->iReader>=0) );

  while( rc==LSM_OK && pDb->pClient==0 && (iAttempt++)<MAX_READLOCK_ATTEMPTS ){


    assert( pDb->pCsr==0 && pDb->nTransOpen==0 );

    /* Load the in-memory tree header. */
    rc = lsmTreeLoadHeader(pDb);

    /* Load the database snapshot */
    if( rc==LSM_OK ){
      rc = lsmCheckpointLoad(pDb);
    }

    /* Take a read-lock on the tree and snapshot just loaded. Then check
    ** that the shared-memory still contains the same values. If so, proceed.
    ** Otherwise, relinquish the read-lock and retry the whole procedure
    ** (starting with loading the in-memory tree header).  */
    if( rc==LSM_OK ){
      ShmHeader *pShm = pDb->pShmhdr;
      u32 iShmMax = pDb->treehdr.iUsedShmid;
      u32 iShmMin = pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk;
      i64 iSnap = lsmCheckpointId(pDb->aSnapshot, 0);
      rc = lsmReadlock(pDb, iSnap, iShmMin, iShmMax);
      if( rc==LSM_OK ){
        if( 0==memcmp(pShm->hdr1.aCksum, pDb->treehdr.aCksum, sizeof(u32)*2)

         && iSnap==lsmCheckpointId(pShm->aClient, 0)
        ){
          /* Read lock has been successfully obtained. Deserialize the 
          ** checkpoint just loaded. TODO: This will be removed after 
          ** lsm_sorted.c is changed to work directly from the serialized
          ** version of the snapshot.  */
          rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );







>
>



|



|










<


<
>
|







629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655

656
657

658
659
660
661
662
663
664
665
666
  int rc = LSM_OK;                /* Return code */
  int iAttempt = 0;

  assert( pDb->pWorker==0 );
  assert( (pDb->pClient!=0)==(pDb->iReader>=0) );

  while( rc==LSM_OK && pDb->pClient==0 && (iAttempt++)<MAX_READLOCK_ATTEMPTS ){
    int iTreehdr = 0;
    int iSnap = 0;
    assert( pDb->pCsr==0 && pDb->nTransOpen==0 );

    /* Load the in-memory tree header. */
    rc = lsmTreeLoadHeader(pDb, &iTreehdr);

    /* Load the database snapshot */
    if( rc==LSM_OK ){
      rc = lsmCheckpointLoad(pDb, &iSnap);
    }

    /* Take a read-lock on the tree and snapshot just loaded. Then check
    ** that the shared-memory still contains the same values. If so, proceed.
    ** Otherwise, relinquish the read-lock and retry the whole procedure
    ** (starting with loading the in-memory tree header).  */
    if( rc==LSM_OK ){
      ShmHeader *pShm = pDb->pShmhdr;
      u32 iShmMax = pDb->treehdr.iUsedShmid;
      u32 iShmMin = pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk;

      rc = lsmReadlock(pDb, iSnap, iShmMin, iShmMax);
      if( rc==LSM_OK ){

        if( lsmTreeLoadHeaderOk(pDb, iTreehdr)
         && lsmCheckpointLoadOk(pDb, iSnap)
        ){
          /* Read lock has been successfully obtained. Deserialize the 
          ** checkpoint just loaded. TODO: This will be removed after 
          ** lsm_sorted.c is changed to work directly from the serialized
          ** version of the snapshot.  */
          rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
Changes to src/lsm_tree.c.
1790
1791
1792
1793
1794
1795
1796
1797




1798
1799

1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818

1819
1820
1821


1822
1823
1824




1825
1826
1827
1828
1829
1830
1831
  pDb->treehdr.iWrite = pMark->iWrite;
  pDb->treehdr.nChunk = pMark->nChunk;
  pDb->treehdr.iNextShmid = pMark->iNextShmid;
}

/*
** Load the in-memory tree header from shared-memory into pDb->treehdr.
**
** ShmHeader.hdr1 is copied without holding any lock and accepted if it
** passes treeHeaderChecksumOk(). Otherwise an attempt is made to take the
** WRITER lock exclusively:
**
**   * If the lock is busy, sleep via usleep(50) and start again. LSM_BUSY
**     is therefore never returned to the caller.
**
**   * If the lock is obtained and hdr1 still fails its checksum, hdr1 is
**     overwritten with hdr2. hdr1 is then copied to pDb->treehdr and the
**     lock released.
**
** Return LSM_OK if a header with a valid checksum was loaded,
** LSM_CORRUPT_BKPT if the copy made under the lock still fails its
** checksum, or the error code returned by lsmShmLock() otherwise.
*/
int lsmTreeLoadHeader(lsm_db *pDb){

  while( 1 ){
    int rc;
    ShmHeader *pShm = pDb->pShmhdr;

    /* Optimistic, lock-free read. */
    memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
    if( treeHeaderChecksumOk(&pDb->treehdr) ) return LSM_OK;

    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
    if( rc==LSM_BUSY ){
      usleep(50);
    }else{
      if( rc==LSM_OK ){
        /* Repair hdr1 from hdr2 if it is still invalid. */
        if( treeHeaderChecksumOk(&pShm->hdr1)==0 ){
          memcpy(&pShm->hdr1, &pShm->hdr2, sizeof(TreeHeader));
        }
        memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
        lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);

        if( treeHeaderChecksumOk(&pDb->treehdr)==0 ){

          rc = LSM_CORRUPT_BKPT;
        }
      }


      return rc;
    }
  }




}

/*
** This function is called to conclude a transaction. If argument bCommit
** is true, the transaction is committed. Otherwise it is rolled back.
*/
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit){







|
>
>
>
>

|
>
|




|
|
<
<
<
<
|
<
<
|
|
<
<
|
>
|
|
|
>
>
|
|
|
>
>
>
>







1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811




1812


1813
1814


1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
  pDb->treehdr.iWrite = pMark->iWrite;
  pDb->treehdr.nChunk = pMark->nChunk;
  pDb->treehdr.iNextShmid = pMark->iNextShmid;
}

/*
** Load the in-memory tree header from shared-memory into pDb->treehdr.
** If the header cannot be loaded, return LSM_PROTOCOL.
**
** If the header is successfully loaded and parameter piRead is not NULL,
** it is set to 1 if the header was loaded from ShmHeader.hdr1, or 2 if
** the header was loaded from ShmHeader.hdr2.
**
** No lock is taken. Each attempt copies hdr1 and then, if that copy does
** not pass treeHeaderChecksumOk(), hdr2. If neither copy passes,
** lsmShmBarrier() is invoked and the attempt repeated, up to
** LSM_ATTEMPTS_BEFORE_PROTOCOL times in total.
*/
int lsmTreeLoadHeader(lsm_db *pDb, int *piRead){
  int nRem = LSM_ATTEMPTS_BEFORE_PROTOCOL;
  while( (nRem--)>0 ){
    ShmHeader *pShm = pDb->pShmhdr;

    /* Try the first copy of the header. */
    memcpy(&pDb->treehdr, &pShm->hdr1, sizeof(TreeHeader));
    if( treeHeaderChecksumOk(&pDb->treehdr) ){
      if( piRead ) *piRead = 1;
      return LSM_OK;
    }

    /* The first copy was unusable. Try the second. */
    memcpy(&pDb->treehdr, &pShm->hdr2, sizeof(TreeHeader));
    if( treeHeaderChecksumOk(&pDb->treehdr) ){
      if( piRead ) *piRead = 2;
      return LSM_OK;
    }

    lsmShmBarrier(pDb);
  }
  return LSM_PROTOCOL;
}

/*
** Argument iRead must be 1 or 2, selecting ShmHeader.hdr1 or
** ShmHeader.hdr2 respectively. Return non-zero if the two checksum words
** of the selected shared-memory tree header are identical to those of the
** header currently held in pDb->treehdr, or zero otherwise.
*/
int lsmTreeLoadHeaderOk(lsm_db *pDb, int iRead){
  TreeHeader *pShared;
  assert( iRead==1 || iRead==2 );
  if( iRead==1 ){
    pShared = &pDb->pShmhdr->hdr1;
  }else{
    pShared = &pDb->pShmhdr->hdr2;
  }
  if( memcmp(pDb->treehdr.aCksum, pShared->aCksum, sizeof(u32)*2)!=0 ){
    return 0;
  }
  return 1;
}

/*
** This function is called to conclude a transaction. If argument bCommit
** is true, the transaction is committed. Otherwise it is rolled back.
*/
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit){