SQLite4
Check-in [d52cc59da3]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add code for emergency rollback of the shared-memory tree.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: d52cc59da3d564d50ecc2e76df7cf106c09e3c4c
User & Date: dan 2012-09-12 14:19:21
Context
2012-09-12
14:47
Fix an assert() failure that can follow an OOM error. check-in: d65a5112b6 user: dan tags: trunk
14:19
Add code for emergency rollback of the shared-memory tree. check-in: d52cc59da3 user: dan tags: trunk
2012-09-11
18:57
Fix a problem preventing shared-memory space from being reused. check-in: 6f9c692a0e user: dan tags: trunk
Changes
Hide Diffs Unified Diffs Show Whitespace Changes Patch

Changes to src/lsmInt.h.

501
502
503
504
505
506
507

508
509
510
511
512
513
514
...
523
524
525
526
527
528
529

530
531
532
533
534
535
536
/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);
void lsmTreeClear(lsm_db *);
int lsmTreeInit(lsm_db *);


int lsmTreeSize(lsm_db *);
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit);
int lsmTreeBeginTransaction(lsm_db *pDb);
int lsmTreeLoadHeader(lsm_db *pDb);

int lsmTreeInsert(lsm_db *pDb, void *pKey, int nKey, void *pVal, int nVal);
................................................................................
int lsmTreeCursorPrev(TreeCursor *pCsr);
int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast);
void lsmTreeCursorReset(TreeCursor *pCsr);
int lsmTreeCursorKey(TreeCursor *pCsr, void **ppKey, int *pnKey);
int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal);
int lsmTreeCursorValid(TreeCursor *pCsr);
int lsmTreeCursorSave(TreeCursor *pCsr);


/* 
** Functions from file "mem.c".
*/
int lsmPoolNew(lsm_env *pEnv, Mempool **ppPool);
void lsmPoolDestroy(lsm_env *pEnv, Mempool *pPool);
void *lsmPoolMalloc(lsm_env *pEnv, Mempool *pPool, int nByte);







>







 







>







501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
...
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
/* 
** Functions from file "lsm_tree.c".
*/
int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
void lsmTreeRelease(lsm_env *, Tree *);
void lsmTreeClear(lsm_db *);
int lsmTreeInit(lsm_db *);
int lsmTreeRepair(lsm_db *);

int lsmTreeSize(lsm_db *);
int lsmTreeEndTransaction(lsm_db *pDb, int bCommit);
int lsmTreeBeginTransaction(lsm_db *pDb);
int lsmTreeLoadHeader(lsm_db *pDb);

int lsmTreeInsert(lsm_db *pDb, void *pKey, int nKey, void *pVal, int nVal);
................................................................................
int lsmTreeCursorPrev(TreeCursor *pCsr);
int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast);
void lsmTreeCursorReset(TreeCursor *pCsr);
int lsmTreeCursorKey(TreeCursor *pCsr, void **ppKey, int *pnKey);
int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal);
int lsmTreeCursorValid(TreeCursor *pCsr);
int lsmTreeCursorSave(TreeCursor *pCsr);


/* 
** Functions from file "mem.c".
*/
int lsmPoolNew(lsm_env *pEnv, Mempool **ppPool);
void lsmPoolDestroy(lsm_env *pEnv, Mempool *pPool);
void *lsmPoolMalloc(lsm_env *pEnv, Mempool *pPool, int nByte);

Changes to src/lsm_shared.c.

659
660
661
662
663
664
665

666
667
668
669
670
671
672
...
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
...
895
896
897
898
899
900
901

902
903
904
905
906
907
908
        ){
          /* Read lock has been successfully obtained. Deserialize the 
          ** checkpoint just loaded. TODO: This will be removed after 
          ** lsm_sorted.c is changed to work directly from the serialized
          ** version of the snapshot.  */
          rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );

        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }
      if( rc==LSM_BUSY ) rc = LSM_OK;
    }
  }
................................................................................
  /* Attempt to take the WRITER lock */
  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
  }

  /* If the previous writer failed mid-transaction, run emergency rollback. */
  if( rc==LSM_OK && pShm->bWriter ){
    /* TODO: This! */
    assert( 0 );
    rc = LSM_CORRUPT_BKPT;
  }

  /* Check that this connection is currently reading from the most recent
  ** version of the database. If not, return LSM_BUSY.  */
  if( rc==LSM_OK && memcmp(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader)) ){
    rc = LSM_BUSY;
  }
................................................................................

/*
** Drop the READER lock slot recorded in db->iReader, if any.
**
** If db->iReader is negative nothing is unlocked and LSM_OK is returned.
** Otherwise the slot is unlocked via lsmShmLock() and the value returned
** by that call is passed back to the caller. db->iReader is reset to -1
** regardless of whether or not the unlock succeeded.
*/
int lsmReleaseReadlock(lsm_db *db){
  int rc;

  /* No read-lock slot recorded: nothing to do. */
  if( db->iReader<0 ) return LSM_OK;

  rc = lsmShmLock(db, LSM_LOCK_READER(db->iReader), LSM_LOCK_UNLOCK, 0);
  db->iReader = -1;
  return rc;
}

/*







>







 







|
|
<







 







>







659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
...
712
713
714
715
716
717
718
719
720

721
722
723
724
725
726
727
...
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
        ){
          /* Read lock has been successfully obtained. Deserialize the 
          ** checkpoint just loaded. TODO: This will be removed after 
          ** lsm_sorted.c is changed to work directly from the serialized
          ** version of the snapshot.  */
          rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
          assert( (rc==LSM_OK)==(pDb->pClient!=0) );
          assert( pDb->iReader>=0 );
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }
      if( rc==LSM_BUSY ) rc = LSM_OK;
    }
  }
................................................................................
  /* Attempt to take the WRITER lock */
  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
  }

  /* If the previous writer failed mid-transaction, run emergency rollback. */
  if( rc==LSM_OK && pShm->bWriter ){
    rc = lsmTreeRepair(pDb);
    if( rc==LSM_OK ) pShm->bWriter = 0;

  }

  /* Check that this connection is currently reading from the most recent
  ** version of the database. If not, return LSM_BUSY.  */
  if( rc==LSM_OK && memcmp(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader)) ){
    rc = LSM_BUSY;
  }
................................................................................

/*
** Drop the READER lock slot recorded in db->iReader, if any.
**
** If db->iReader is negative nothing is unlocked and LSM_OK is returned.
** Otherwise the slot is unlocked via lsmShmLock() and the value returned
** by that call is passed back to the caller. db->iReader is reset to -1
** regardless of whether or not the unlock succeeded.
**
** When a slot is held, the connection must not have a client snapshot
** (db->pClient) at the time of the call; this is checked with an assert().
*/
int lsmReleaseReadlock(lsm_db *db){
  int rc;

  /* No read-lock slot recorded: nothing to do. */
  if( db->iReader<0 ) return LSM_OK;

  assert( db->pClient==0 );
  rc = lsmShmLock(db, LSM_LOCK_READER(db->iReader), LSM_LOCK_UNLOCK, 0);
  db->iReader = -1;
  return rc;
}

/*

Changes to src/lsm_tree.c.

990
991
992
993
994
995
996



































































































































































































































































997
998
999
1000
1001
1002
1003
....
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
  pOne = treeShmChunkRc(pDb, 1, &rc);
  if( pOne ){
    pOne->iNext = 0;
    pOne->iShmid = 1;
  }
  return rc;
}




































































































































































































































































/*
** Insert a new entry into the in-memory tree.
**
** If the value of the 5th parameter, nVal, is negative, then a delete-marker
** is inserted into the tree. In this case the value pointer, pVal, must be
** NULL.
................................................................................
  pDb->treehdr.iRoot = pMark->iRoot;
  pDb->treehdr.nHeight = pMark->nHeight;
  pDb->treehdr.iWrite = pMark->iWrite;
  pDb->treehdr.nChunk = pMark->nChunk;
  pDb->treehdr.iNextShmid = pMark->iNextShmid;
}

/*
** Compute the two-word checksum of tree-header *pHdr and store it in
** aCksum[0] and aCksum[1]. 
**
** All 32-bit words of the header that precede the aCksum field are
** consumed in pairs. The asserts check that aCksum occupies the final two
** words of the structure and that the structure is a whole number of word
** pairs, so the pairwise walk never reads into the stored checksum.
*/
static void treeHeaderChecksum(
  TreeHeader *pHdr, 
  u32 *aCksum
){
  const u32 *pWord = (const u32 *)pHdr;
  const u32 *pEnd = &pWord[offsetof(TreeHeader, aCksum) / sizeof(u32)];
  u32 s1 = 0x12345678;            /* First running sum */
  u32 s2 = 0x9ABCDEF0;            /* Second running sum */

  assert( (offsetof(TreeHeader, aCksum) + sizeof(u32)*2)==sizeof(TreeHeader) );
  assert( (sizeof(TreeHeader) % (sizeof(u32)*2))==0 );

  while( pWord<pEnd ){
    s1 += pWord[0];
    s2 += (s1 + pWord[1]);
    pWord += 2;
  }

  aCksum[0] = s1;
  aCksum[1] = s2;
}

/*
** Recompute the checksum of tree-header *pHdr and compare it against the
** value stored in pHdr->aCksum. Return non-zero if the two match, or zero
** if they differ.
*/
static int treeHeaderChecksumOk(TreeHeader *pHdr){
  u32 aExpect[2];                 /* Freshly computed checksum */
  int bMatch;

  treeHeaderChecksum(pHdr, aExpect);
  bMatch = (memcmp(aExpect, pHdr->aCksum, sizeof(aExpect))==0);
  return bMatch;
}

/*
** Load the in-memory tree header from shared-memory into pDb->treehdr.
** If the header cannot be loaded, return LSM_BUSY.
*/
int lsmTreeLoadHeader(lsm_db *pDb){
  while( 1 ){
    int rc;







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
....
1788
1789
1790
1791
1792
1793
1794






























1795
1796
1797
1798
1799
1800
1801
  pOne = treeShmChunkRc(pDb, 1, &rc);
  if( pOne ){
    pOne->iNext = 0;
    pOne->iShmid = 1;
  }
  return rc;
}

/*
** Compute the two-word checksum of tree-header *pHdr and store it in
** aCksum[0] and aCksum[1]. 
**
** All 32-bit words of the header that precede the aCksum field are
** consumed in pairs. The asserts check that aCksum occupies the final two
** words of the structure and that the structure is a whole number of word
** pairs, so the pairwise walk never reads into the stored checksum.
*/
static void treeHeaderChecksum(
  TreeHeader *pHdr, 
  u32 *aCksum
){
  const u32 *pWord = (const u32 *)pHdr;
  const u32 *pEnd = &pWord[offsetof(TreeHeader, aCksum) / sizeof(u32)];
  u32 s1 = 0x12345678;            /* First running sum */
  u32 s2 = 0x9ABCDEF0;            /* Second running sum */

  assert( (offsetof(TreeHeader, aCksum) + sizeof(u32)*2)==sizeof(TreeHeader) );
  assert( (sizeof(TreeHeader) % (sizeof(u32)*2))==0 );

  while( pWord<pEnd ){
    s1 += pWord[0];
    s2 += (s1 + pWord[1]);
    pWord += 2;
  }

  aCksum[0] = s1;
  aCksum[1] = s2;
}

/*
** Recompute the checksum of tree-header *pHdr and compare it against the
** value stored in pHdr->aCksum. Return non-zero if the two match, or zero
** if they differ.
*/
static int treeHeaderChecksumOk(TreeHeader *pHdr){
  u32 aExpect[2];                 /* Freshly computed checksum */
  int bMatch;

  treeHeaderChecksum(pHdr, aExpect);
  bMatch = (memcmp(aExpect, pHdr->aCksum, sizeof(aExpect))==0);
  return bMatch;
}

/*
** This type is used by functions treeRepairList() (called from 
** lsmTreeRepair()) and treeSortByShmid() to make relinking the linked list 
** of shared-memory chunks easier. Each entry pairs a chunk pointer with the
** chunk number it was loaded from, so that after sorting by shm-id the 
** chunk numbers can be written into the ShmChunk.iNext fields.
**
** An entry with pShm==0 is an unused (padding) slot; treeSortByShmid()
** sorts such entries after all populated ones.
*/
typedef struct ShmChunkLoc ShmChunkLoc;
struct ShmChunkLoc {
  ShmChunk *pShm;                 /* Pointer to the chunk, or 0 if unused */
  u32 iLoc;                       /* Chunk number that pShm was loaded from */
};

/*
** The array aShm1[] is of size (nSz*2) elements. The first and second nSz 
** elements are both sorted in order of iShmid. This function merges the two
** halves and writes the sorted result over the top of aShm1[].
**
** Entries for which ShmChunkLoc.pShm is NULL are treated as padding. They
** are moved after all populated entries. If two populated entries compare 
** equal under shm_sequence_ge(), the one from the second half is output 
** first.
**
** Argument aSpace[] points to an array of at least (nSz*2) elements that
** can be used as temporary storage space while sorting.
*/
static void treeSortByShmid(ShmChunkLoc *aShm1, int nSz, ShmChunkLoc *aSpace){
  ShmChunkLoc *aShm2 = &aShm1[nSz];
  int i1 = 0;                     /* Next entry to read from first half */
  int i2 = 0;                     /* Next entry to read from second half */
  int iOut = 0;                   /* Next entry to write in aSpace[] */

  while( i1<nSz || i2<nSz ){
    if( i1==nSz || (i2!=nSz && aShm1[i1].pShm==0) ){
      /* First half exhausted, or only padding remains in it. */
      aSpace[iOut] = aShm2[i2++];
    }else if( i2==nSz || aShm2[i2].pShm==0 ){
      /* Second half exhausted, or only padding remains in it. */
      aSpace[iOut] = aShm1[i1++];
    }else{
      assert( aShm1[i1].pShm && aShm2[i2].pShm );
      if( shm_sequence_ge(aShm1[i1].pShm->iShmid, aShm2[i2].pShm->iShmid) ){
        aSpace[iOut] = aShm2[i2++];
      }else{
        aSpace[iOut] = aShm1[i1++];
      }
    }
    iOut++;
  }

  /* Copy the merged result back. The element type is ShmChunkLoc, not
  ** (ShmChunk *) - sizing the copy by the pointer type would copy back
  ** only part of the merged array.  */
  memcpy(aShm1, aSpace, sizeof(ShmChunkLoc) * nSz*2);
}

/*
** This function checks that the linked list of shared memory chunks 
** that starts at chunk db->treehdr.iFirst:
**
**   1) Includes all chunks in the shared-memory region, and
**   2) Links them together in order of ascending shm-id.
**
** Condition (1) is tested by counting the chunks visited and comparing the
** total against (db->treehdr.nChunk-1). Condition (2) is tested by checking
** that each chunk's shm-id is exactly one greater than its predecessor's.
** A link to a chunk number that is not less than db->treehdr.nChunk is
** reported as corruption without being followed.
**
** If no error occurs and the conditions above are met, LSM_OK is returned.
**
** If either of the conditions is untrue, LSM_CORRUPT is returned. Or, if
** an error is encountered before the checks are completed, another LSM error
** code (i.e. LSM_IOERR or LSM_NOMEM) may be returned.
*/
static int treeCheckLinkedList(lsm_db *db){
  int rc = LSM_OK;
  int nVisit = 0;                 /* Number of chunks visited so far */
  ShmChunk *p;

  /* If this call fails, rc is set to an error code and the loop below 
  ** does not run. p must not be dereferenced before rc is checked.  */
  p = treeShmChunkRc(db, db->treehdr.iFirst, &rc);
  while( rc==LSM_OK && p ){
    if( p->iNext ){
      if( p->iNext>=(u32)db->treehdr.nChunk ){
        /* Link points outside the range of chunks in the tree-header. */
        rc = LSM_CORRUPT_BKPT;
      }else{
        ShmChunk *pNext = treeShmChunkRc(db, p->iNext, &rc);
        if( rc==LSM_OK ){
          if( pNext->iShmid!=p->iShmid+1 ){
            rc = LSM_CORRUPT_BKPT;
          }
          p = pNext;
        }
      }
    }else{
      /* End of list */
      p = 0;
    }
    nVisit++;
  }

  if( rc==LSM_OK && nVisit!=db->treehdr.nChunk-1 ){
    rc = LSM_CORRUPT_BKPT;
  }
  return rc;
}

/*
** Walk the interior nodes of the current in-memory tree. For each node 
** visited whose v2 transaction id (TreeNode.iV2) is larger than 
** db->treehdr.iTransId, clear the iV2Child, iV2Ptr and iV2 fields.
**
** Nothing is done if the tree height is 1 or less. Otherwise, the value
** returned is LSM_OK, or the first error code returned by a cursor 
** operation.
*/
static int treeRepairPtrs(lsm_db *db){
  TreeCursor csr;                 /* Cursor used to iterate through tree */
  u32 iMaxTrans;                  /* Largest transaction id to leave alone */
  int rc;

  if( db->treehdr.nHeight<=1 ) return LSM_OK;
  iMaxTrans = db->treehdr.iTransId;

  /* Temporarily reduce the height recorded in the tree-header by one 
  ** before initializing the cursor. This stops the cursor from descending
  ** to the leaf nodes. The height is restored once the scan is finished. */
  db->treehdr.nHeight--;
  treeCursorInit(db, &csr);

  for(rc=lsmTreeCursorEnd(&csr, 0);
      rc==LSM_OK && lsmTreeCursorValid(&csr);
      rc=lsmTreeCursorNext(&csr)
  ){
    TreeNode *pNode = csr.apTreeNode[csr.iNode];
    if( pNode->iV2>iMaxTrans ){
      pNode->iV2Child = 0;
      pNode->iV2Ptr = 0;
      pNode->iV2 = 0;
    }
  }

  db->treehdr.nHeight++;
  return rc;
}

/*
** Repair the linked list of shared-memory chunks as part of emergency
** rollback. The steps are:
**
**   1) Load every chunk (numbers 1 to nChunk-1) and find the one with the
**      smallest shm-id.
**
**   2) If that chunk is not db->treehdr.iFirst, assign new shm-ids, smaller 
**      than the current minimum, to every chunk other than iFirst that has 
**      a shm-id greater than or equal to treehdr.iNextShmid. Chunk iFirst 
**      is then given the smallest shm-id of all.
**
**   3) Merge-sort the chunks by shm-id and rewrite the ShmChunk.iNext
**      fields so that the list follows the sorted order.
**
**   4) Verify the result using treeCheckLinkedList().
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
static int treeRepairList(lsm_db *db){
  int rc = LSM_OK;
  int i;
  ShmChunk *p;
  ShmChunk *pMin = 0;             /* Chunk with the smallest shm-id */
  u32 iMin = 0;                   /* Chunk number of pMin */

  /* Iterate through all shm chunks. Find the smallest shm-id present in
  ** the shared-memory region. */
  for(i=1; rc==LSM_OK && i<db->treehdr.nChunk; i++){
    p = treeShmChunkRc(db, i, &rc);
    if( p && (pMin==0 || shm_sequence_ge(pMin->iShmid, p->iShmid)) ){
      pMin = p;
      iMin = i;
    }
  }

  /* Fix the shm-id values on any chunks with a shm-id greater than or 
  ** equal to treehdr.iNextShmid. Then do a merge-sort of all chunks to 
  ** fix the ShmChunk.iNext pointers.
  */
  if( rc==LSM_OK ){
    int nSort;                    /* Number of sort slots (a power of two) */
    int nByte;                    /* Bytes of space to allocate */
    ShmChunkLoc *aSort;           /* Sort array followed by scratch space */

    /* Allocate space for a merge sort. The first nSort entries hold the
    ** data to sort, the second nSort are temporary space. Entries left
    ** zeroed serve as padding.  */
    nSort = 1;
    while( nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
    nByte = sizeof(ShmChunkLoc) * nSort * 2;
    aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);

    /* Fix all shm-ids, if required. 
    **
    ** NOTE(review): aSort[] is only populated inside this branch. When the
    ** smallest shm-id already belongs to chunk iFirst, the array stays 
    ** empty, no links are rewritten below and the existing list is merely
    ** verified. Confirm that this is the intended behavior.  */
    if( rc==LSM_OK && iMin!=db->treehdr.iFirst ){
      u32 iPrevShmid = pMin->iShmid-1;
      for(i=1; i<db->treehdr.nChunk; i++){
        p = treeShmChunk(db, i);
        aSort[i-1].pShm = p;
        aSort[i-1].iLoc = i;
        if( i!=db->treehdr.iFirst ){
          if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
            p->iShmid = iPrevShmid--;
          }
        }
      }
      p = treeShmChunk(db, db->treehdr.iFirst);
      p->iShmid = iPrevShmid;
    }

    if( rc==LSM_OK ){
      ShmChunkLoc *aSpace = &aSort[nSort];

      /* Bottom-up merge sort. After each pair of entries is added, merge
      ** every pair of adjacent sorted runs that ends at offset i. Two runs
      ** of nSz entries each end at i if and only if i is a multiple of
      ** (nSz*2). nSz is the run length, so it doubles on each pass.  */
      for(i=2; i<=nSort; i+=2){
        int nSz;
        for(nSz=1; (i & (nSz*2-1))==0; nSz*=2){
          treeSortByShmid(&aSort[i - nSz*2], nSz, aSpace);
        }
      }

      /* Relink the chunks in sorted order. Padding entries have iLoc==0,
      ** so the last populated entry that is followed by padding has its 
      ** iNext field set to zero by this loop.  */
      for(i=0; i<nSort-1; i++){
        if( aSort[i].pShm ){
          aSort[i].pShm->iNext = aSort[i+1].iLoc;
        }
      }

      /* If there is no padding at all (nChunk-1 is a power of two) the
      ** final entry is a real chunk and must terminate the list.  */
      if( aSort[nSort-1].pShm ){
        aSort[nSort-1].pShm->iNext = 0;
      }

      rc = treeCheckLinkedList(db);

      /* The allocation succeeded if this point is reached. Release it. */
      lsmFree(db->pEnv, aSort);
    }
  }

  return rc;
}

/*
** This function is called as part of opening a write-transaction if the
** writer-flag is already set - indicating that the previous writer 
** failed before ending its transaction.
**
** It makes the two tree-headers in shared-memory identical, clears stale
** v2 pointers in the tree and repairs the linked list of shared-memory
** chunks. The connection's own copy of the tree-header (db->treehdr) is 
** the same on return as it was on entry. LSM_OK is returned if successful,
** or an LSM error code otherwise.
*/
int lsmTreeRepair(lsm_db *db){
  ShmHeader *pShmHdr = db->pShmhdr;
  TreeHeader savedHdr;            /* Copy of db->treehdr taken on entry */
  int rc;

  /* If the two tree-headers differ, overwrite one with the other. Header 1
  ** is the source if its checksum is good. Otherwise header 2 is copied
  ** over header 1 (the checksum of header 2 is not tested).  */
  if( 0!=memcmp(&pShmHdr->hdr1, &pShmHdr->hdr2, sizeof(TreeHeader)) ){
    TreeHeader *pFrom;
    TreeHeader *pTo;
    if( treeHeaderChecksumOk(&pShmHdr->hdr1) ){
      pFrom = &pShmHdr->hdr1;
      pTo = &pShmHdr->hdr2;
    }else{
      pFrom = &pShmHdr->hdr2;
      pTo = &pShmHdr->hdr1;
    }
    memcpy(pTo, pFrom, sizeof(TreeHeader));
  }

  /* Save the connection's current copy of the tree-header so that it can
  ** be put back before returning.  */
  memcpy(&savedHdr, &db->treehdr, sizeof(TreeHeader));

  /* First zero any v2 pointers that have a transaction-id greater than the
  ** one in the tree-header. Then, if that worked, repair the linked list
  ** of shared-memory chunks.  */
  rc = treeRepairPtrs(db);
  if( rc==LSM_OK ) rc = treeRepairList(db);

  memcpy(&db->treehdr, &savedHdr, sizeof(TreeHeader));
  return rc;
}

/*
** Insert a new entry into the in-memory tree.
**
** If the value of the 5th parameter, nVal, is negative, then a delete-marker
** is inserted into the tree. In this case the value pointer, pVal, must be
** NULL.
................................................................................
  pDb->treehdr.iRoot = pMark->iRoot;
  pDb->treehdr.nHeight = pMark->nHeight;
  pDb->treehdr.iWrite = pMark->iWrite;
  pDb->treehdr.nChunk = pMark->nChunk;
  pDb->treehdr.iNextShmid = pMark->iNextShmid;
}































/*
** Load the in-memory tree header from shared-memory into pDb->treehdr.
** If the header cannot be loaded, return LSM_BUSY.
*/
int lsmTreeLoadHeader(lsm_db *pDb){
  while( 1 ){
    int rc;

Changes to www/lsm.wiki.

362
363
364
365
366
367
368


369
370
371
372
373
374
375
  <li> Snapshot 2.
  <li> The meta-page pointer. This value is either 1 or 2. It indicates which
       of the two meta-pages contains the most recent database snapshot.
  <li> READER lock values.
</ul>

<h2>Log file</h2>



<h1>3. Database Recovery and Shutdown</h1>

<p>
Exclusive locks on locking region DMS1 are used to serialize all connect and
disconnect operations. 








>
>







362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
  <li> Snapshot 2.
  <li> The meta-page pointer. This value is either 1 or 2. It indicates which
       of the two meta-pages contains the most recent database snapshot.
  <li> READER lock values.
</ul>

<h2>Log file</h2>

<a href=../src/lsm_log.c>lsm_log.c</a>.

<h1>3. Database Recovery and Shutdown</h1>

<p>
Exclusive locks on locking region DMS1 are used to serialize all connect and
disconnect operations.