SQLite4
Check-in [d52cc59da3]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add code for emergency rollback of the shared-memory tree.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: d52cc59da3d564d50ecc2e76df7cf106c09e3c4c
User & Date: dan 2012-09-12 14:19:21
Context
2012-09-12
14:47
Fix an assert() failure that can follow an OOM error. check-in: d65a5112b6 user: dan tags: trunk
14:19
Add code for emergency rollback of the shared-memory tree. check-in: d52cc59da3 user: dan tags: trunk
2012-09-11
18:57
Fix a problem preventing shared-memory space from being reused. check-in: 6f9c692a0e user: dan tags: trunk
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/lsmInt.h.

   501    501   /* 
   502    502   ** Functions from file "lsm_tree.c".
   503    503   */
   504    504   int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree);
   505    505   void lsmTreeRelease(lsm_env *, Tree *);
   506    506   void lsmTreeClear(lsm_db *);
   507    507   int lsmTreeInit(lsm_db *);
          508  +int lsmTreeRepair(lsm_db *);
   508    509   
   509    510   int lsmTreeSize(lsm_db *);
   510    511   int lsmTreeEndTransaction(lsm_db *pDb, int bCommit);
   511    512   int lsmTreeBeginTransaction(lsm_db *pDb);
   512    513   int lsmTreeLoadHeader(lsm_db *pDb);
   513    514   
   514    515   int lsmTreeInsert(lsm_db *pDb, void *pKey, int nKey, void *pVal, int nVal);
................................................................................
   523    524   int lsmTreeCursorPrev(TreeCursor *pCsr);
   524    525   int lsmTreeCursorEnd(TreeCursor *pCsr, int bLast);
   525    526   void lsmTreeCursorReset(TreeCursor *pCsr);
   526    527   int lsmTreeCursorKey(TreeCursor *pCsr, void **ppKey, int *pnKey);
   527    528   int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal);
   528    529   int lsmTreeCursorValid(TreeCursor *pCsr);
   529    530   int lsmTreeCursorSave(TreeCursor *pCsr);
          531  +
   530    532   
   531    533   /* 
   532    534   ** Functions from file "mem.c".
   533    535   */
   534    536   int lsmPoolNew(lsm_env *pEnv, Mempool **ppPool);
   535    537   void lsmPoolDestroy(lsm_env *pEnv, Mempool *pPool);
   536    538   void *lsmPoolMalloc(lsm_env *pEnv, Mempool *pPool, int nByte);

Changes to src/lsm_shared.c.

   659    659           ){
   660    660             /* Read lock has been successfully obtained. Deserialize the 
   661    661             ** checkpoint just loaded. TODO: This will be removed after 
   662    662             ** lsm_sorted.c is changed to work directly from the serialized
   663    663             ** version of the snapshot.  */
   664    664             rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient);
   665    665             assert( (rc==LSM_OK)==(pDb->pClient!=0) );
          666  +          assert( pDb->iReader>=0 );
   666    667           }else{
   667    668             rc = lsmReleaseReadlock(pDb);
   668    669           }
   669    670         }
   670    671         if( rc==LSM_BUSY ) rc = LSM_OK;
   671    672       }
   672    673     }
................................................................................
   711    712     /* Attempt to take the WRITER lock */
   712    713     if( rc==LSM_OK ){
   713    714       rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
   714    715     }
   715    716   
   716    717     /* If the previous writer failed mid-transaction, run emergency rollback. */
   717    718     if( rc==LSM_OK && pShm->bWriter ){
   718         -    /* TODO: This! */
   719         -    assert( 0 );
   720         -    rc = LSM_CORRUPT_BKPT;
          719  +    rc = lsmTreeRepair(pDb);
          720  +    if( rc==LSM_OK ) pShm->bWriter = 0;
   721    721     }
   722    722   
   723    723     /* Check that this connection is currently reading from the most recent
   724    724     ** version of the database. If not, return LSM_BUSY.  */
   725    725     if( rc==LSM_OK && memcmp(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader)) ){
   726    726       rc = LSM_BUSY;
   727    727     }
................................................................................
   895    895   
   896    896   /*
   897    897   ** Release the read-lock currently held by connection db.
   898    898   */
   899    899   int lsmReleaseReadlock(lsm_db *db){
   900    900     int rc = LSM_OK;
   901    901     if( db->iReader>=0 ){
          902  +    assert( db->pClient==0 );
   902    903       rc = lsmShmLock(db, LSM_LOCK_READER(db->iReader), LSM_LOCK_UNLOCK, 0);
   903    904       db->iReader = -1;
   904    905     }
   905    906     return rc;
   906    907   }
   907    908   
   908    909   /*

Changes to src/lsm_tree.c.

   990    990     pOne = treeShmChunkRc(pDb, 1, &rc);
   991    991     if( pOne ){
   992    992       pOne->iNext = 0;
   993    993       pOne->iShmid = 1;
   994    994     }
   995    995     return rc;
   996    996   }
          997  +
          998  +static void treeHeaderChecksum(
          999  +  TreeHeader *pHdr, 
         1000  +  u32 *aCksum
         1001  +){
         1002  +  u32 cksum1 = 0x12345678;
         1003  +  u32 cksum2 = 0x9ABCDEF0;
         1004  +  u32 *a = (u32 *)pHdr;
         1005  +  int i;
         1006  +
         1007  +  assert( (offsetof(TreeHeader, aCksum) + sizeof(u32)*2)==sizeof(TreeHeader) );
         1008  +  assert( (sizeof(TreeHeader) % (sizeof(u32)*2))==0 );
         1009  +
         1010  +  for(i=0; i<(offsetof(TreeHeader, aCksum) / sizeof(u32)); i+=2){
         1011  +    cksum1 += a[i];
         1012  +    cksum2 += (cksum1 + a[i+1]);
         1013  +  }
         1014  +  aCksum[0] = cksum1;
         1015  +  aCksum[1] = cksum2;
         1016  +}
         1017  +
         1018  +/*
         1019  +** Return true if the checksum stored in TreeHeader object *pHdr is 
         1020  +** consistent with the contents of its other fields.
         1021  +*/
         1022  +static int treeHeaderChecksumOk(TreeHeader *pHdr){
         1023  +  u32 aCksum[2];
         1024  +  treeHeaderChecksum(pHdr, aCksum);
         1025  +  return (0==memcmp(aCksum, pHdr->aCksum, sizeof(aCksum)));
         1026  +}
         1027  +
         1028  +/*
         1029  +** This type is used by functions lsmTreeRepair() and treeSortByShmid() to
         1030  +** make relinking the linked list of shared-memory chunks easier.
         1031  +*/
         1032  +typedef struct ShmChunkLoc ShmChunkLoc;
         1033  +struct ShmChunkLoc {
         1034  +  ShmChunk *pShm;
         1035  +  u32 iLoc;
         1036  +};
         1037  +
         1038  +/*
         1039  +** The array aShm[] is of size (nSz*2) elements. The first and second nSz 
         1040  +** elements are both sorted in order of iShmid. This function merges the two
         1041  +** arrays and writes the sorted results over the top of aShm[].
         1042  +**
         1043  +** Argument aSpace[] points to an array of at least (nSz*2) elements that
         1044  +** can be used as temporary storage space while sorting.
         1045  +*/
         1046  +static void treeSortByShmid(ShmChunkLoc *aShm1, int nSz, ShmChunkLoc *aSpace){
         1047  +  ShmChunkLoc *aShm2 = &aShm1[nSz];
         1048  +  int i1 = 0;
         1049  +  int i2 = 0;
         1050  +  int iOut = 0;
         1051  +
         1052  +  while( i1<nSz || i2<nSz ){
         1053  +    if( i1==nSz || (i2!=nSz && aShm1[i1].pShm==0) ){
         1054  +      aSpace[iOut] = aShm2[i2++];
         1055  +    }else if( i2==nSz || aShm2[i2].pShm==0 ){
         1056  +      aSpace[iOut] = aShm1[i1++];
         1057  +    }else{
         1058  +      assert( aShm1[i1].pShm && aShm2[i2].pShm );
         1059  +      if( shm_sequence_ge(aShm1[i1].pShm->iShmid, aShm2[i2].pShm->iShmid) ){
         1060  +        aSpace[iOut] = aShm2[i2++];
         1061  +      }else{
         1062  +        aSpace[iOut] = aShm1[i1++];
         1063  +      }
         1064  +    }
         1065  +    iOut++;
         1066  +  }
         1067  +
         1068  +  memcpy(aShm1, aSpace, sizeof(ShmChunk *) * nSz*2);
         1069  +}
         1070  +
         1071  +/*
         1072  +** This function checks that the linked list of shared memory chunks 
         1073  +** that starts at chunk db->treehdr.iFirst:
         1074  +**
         1075  +**   1) Includes all chunks in the shared-memory region, and
         1076  +**   2) Links them together in order of ascending shm-id.
         1077  +**
         1078  +** If no error occurs and the conditions above are met, LSM_OK is returned.
         1079  +**
         1080  +** If either of the conditions are untrue, LSM_CORRUPT is returned. Or, if
         1081  +** an error is encountered before the checks are completed, another LSM error
         1082  +** code (i.e. LSM_IOERR or LSM_NOMEM) may be returned.
         1083  +*/
         1084  +static int treeCheckLinkedList(lsm_db *db){
         1085  +  int rc = LSM_OK;
         1086  +  int nVisit = 0;
         1087  +  u32 iShmid;
         1088  +  ShmChunk *p;
         1089  +
         1090  +  p = treeShmChunkRc(db, db->treehdr.iFirst, &rc);
         1091  +  iShmid = p->iShmid;
         1092  +  while( rc==LSM_OK && p ){
         1093  +    if( p->iNext ){
         1094  +      ShmChunk *pNext = treeShmChunkRc(db, p->iNext, &rc);
         1095  +      if( rc==LSM_OK ){
         1096  +        if( pNext->iShmid!=p->iShmid+1 ){
         1097  +          rc = LSM_CORRUPT_BKPT;
         1098  +        }
         1099  +        p = pNext;
         1100  +      }
         1101  +    }else{
         1102  +      p = 0;
         1103  +    }
         1104  +    nVisit++;
         1105  +  }
         1106  +
         1107  +  if( rc==LSM_OK && nVisit!=db->treehdr.nChunk-1 ){
         1108  +    rc = LSM_CORRUPT_BKPT;
         1109  +  }
         1110  +  return rc;
         1111  +}
         1112  +
         1113  +/*
         1114  +** Iterate through the current in-memory tree. If there are any v2-pointers
         1115  +** with transaction ids larger than db->treehdr.iTransId, zero them.
         1116  +*/
         1117  +static int treeRepairPtrs(lsm_db *db){
         1118  +  int rc = LSM_OK;
         1119  +
         1120  +  if( db->treehdr.nHeight>1 ){
         1121  +    TreeCursor csr;               /* Cursor used to iterate through tree */
         1122  +    u32 iTransId = db->treehdr.iTransId;
         1123  +
         1124  +    /* Initialize the cursor structure. Also decrement the nHeight variable
         1125  +    ** in the tree-header. This will prevent the cursor from visiting any
         1126  +    ** leaf nodes.  */
         1127  +    db->treehdr.nHeight--;
         1128  +    treeCursorInit(db, &csr);
         1129  +
         1130  +    rc = lsmTreeCursorEnd(&csr, 0);
         1131  +    while( rc==LSM_OK && lsmTreeCursorValid(&csr) ){
         1132  +      TreeNode *pNode = csr.apTreeNode[csr.iNode];
         1133  +      if( pNode->iV2>iTransId ){
         1134  +        pNode->iV2Child = 0;
         1135  +        pNode->iV2Ptr = 0;
         1136  +        pNode->iV2 = 0;
         1137  +      }
         1138  +      rc = lsmTreeCursorNext(&csr);
         1139  +    }
         1140  +
         1141  +    db->treehdr.nHeight++;
         1142  +  }
         1143  +
         1144  +  return rc;
         1145  +}
         1146  +
         1147  +static int treeRepairList(lsm_db *db){
         1148  +  int rc = LSM_OK;
         1149  +  int i;
         1150  +  ShmChunk *p;
         1151  +  ShmChunk *pMin = 0;
         1152  +  u32 iMin = 0;
         1153  +
         1154  +  /* Iterate through all shm chunks. Find the smallest shm-id present in
         1155  +  ** the shared-memory region. */
         1156  +  for(i=1; rc==LSM_OK && i<db->treehdr.nChunk; i++){
         1157  +    p = treeShmChunkRc(db, i, &rc);
         1158  +    if( p && (pMin==0 || shm_sequence_ge(pMin->iShmid, p->iShmid)) ){
         1159  +      pMin = p;
         1160  +      iMin = i;
         1161  +    }
         1162  +  }
         1163  +
         1164  +  /* Fix the shm-id values on any chunks with a shm-id greater than or 
         1165  +  ** equal to treehdr.iNextShmid. Then do a merge-sort of all chunks to 
         1166  +  ** fix the ShmChunk.iNext pointers.
         1167  +  */
         1168  +  if( rc==LSM_OK ){
         1169  +    int nSort;
         1170  +    int nByte;
         1171  +    ShmChunkLoc *aSort;
         1172  +
         1173  +    /* Allocate space for a merge sort. */
         1174  +    nSort = 1;
         1175  +    while( nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2;
         1176  +    nByte = sizeof(ShmChunkLoc) * nSort * 2;
         1177  +    aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc);
         1178  +
         1179  +    /* Fix all shm-ids, if required. */
         1180  +    if( rc==LSM_OK && iMin!=db->treehdr.iFirst ){
         1181  +      u32 iPrevShmid = pMin->iShmid-1;
         1182  +      for(i=1; i<db->treehdr.nChunk; i++){
         1183  +        p = treeShmChunk(db, i);
         1184  +        aSort[i-1].pShm = p;
         1185  +        aSort[i-1].iLoc = i;
         1186  +        if( i!=db->treehdr.iFirst ){
         1187  +          if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){
         1188  +            p->iShmid = iPrevShmid--;
         1189  +          }
         1190  +        }
         1191  +      }
         1192  +      p = treeShmChunk(db, db->treehdr.iFirst);
         1193  +      p->iShmid = iPrevShmid;
         1194  +    }
         1195  +
         1196  +    if( rc==LSM_OK ){
         1197  +      ShmChunkLoc *aSpace = &aSort[nSort];
         1198  +      for(i=2; i<=nSort; i+=2){
         1199  +        int nSz;
         1200  +        for(nSz=1; (i & (1 << (nSz-1)))==0; nSz++){
         1201  +          treeSortByShmid(&aSort[i - nSz*2], nSz, aSpace);
         1202  +        }
         1203  +      }
         1204  +
         1205  +      for(i=0; i<nSort-1; i++){
         1206  +        if( aSort[i].pShm ){
         1207  +          aSort[i].pShm->iNext = aSort[i+1].iLoc;
         1208  +        }
         1209  +      }
         1210  +      assert( aSort[nSort-1].iLoc==0 );
         1211  +
         1212  +      rc = treeCheckLinkedList(db);
         1213  +    }
         1214  +  }
         1215  +
         1216  +  return rc;
         1217  +}
         1218  +
         1219  +/*
         1220  +** This function is called as part of opening a write-transaction if the
         1221  +** writer-flag is already set - indicating that the previous writer 
         1222  +** failed before ending its transaction.
         1223  +*/
         1224  +int lsmTreeRepair(lsm_db *db){
         1225  +  int rc = LSM_OK;
         1226  +  TreeHeader hdr;
         1227  +  ShmHeader *pHdr = db->pShmhdr;
         1228  +
         1229  +  /* Ensure that the two tree-headers are consistent. Copy one over the other
         1230  +  ** if necessary. Prefer the data from a tree-header for which the checksum
         1231  +  ** computes. Or, if they both compute, prefer tree-header-1.  */
         1232  +  if( memcmp(&pHdr->hdr1, &pHdr->hdr2, sizeof(TreeHeader)) ){
         1233  +    if( treeHeaderChecksumOk(&pHdr->hdr1) ){
         1234  +      memcpy(&pHdr->hdr2, &pHdr->hdr1, sizeof(TreeHeader));
         1235  +    }else{
         1236  +      memcpy(&pHdr->hdr1, &pHdr->hdr2, sizeof(TreeHeader));
         1237  +    }
         1238  +  }
         1239  +
         1240  +  /* Save the connections current copy of the tree-header. It will be 
         1241  +  ** restored before returning.  */
         1242  +  memcpy(&hdr, &db->treehdr, sizeof(TreeHeader));
         1243  +
         1244  +  /* Walk the tree. Zero any v2 pointers with a transaction-id greater than
         1245  +  ** the transaction-id currently in the tree-headers.  */
         1246  +  rc = treeRepairPtrs(db);
         1247  +
         1248  +  /* Repair the linked list of shared-memory chunks. */
         1249  +  if( rc==LSM_OK ){
         1250  +    rc = treeRepairList(db);
         1251  +  }
         1252  +
         1253  +  memcpy(&db->treehdr, &hdr, sizeof(TreeHeader));
         1254  +  return rc;
         1255  +}
   997   1256   
   998   1257   /*
   999   1258   ** Insert a new entry into the in-memory tree.
  1000   1259   **
  1001   1260   ** If the value of the 5th parameter, nVal, is negative, then a delete-marker
  1002   1261   ** is inserted into the tree. In this case the value pointer, pVal, must be
  1003   1262   ** NULL.
................................................................................
  1529   1788     pDb->treehdr.iRoot = pMark->iRoot;
  1530   1789     pDb->treehdr.nHeight = pMark->nHeight;
  1531   1790     pDb->treehdr.iWrite = pMark->iWrite;
  1532   1791     pDb->treehdr.nChunk = pMark->nChunk;
  1533   1792     pDb->treehdr.iNextShmid = pMark->iNextShmid;
  1534   1793   }
  1535   1794   
  1536         -static void treeHeaderChecksum(
  1537         -  TreeHeader *pHdr, 
  1538         -  u32 *aCksum
  1539         -){
  1540         -  u32 cksum1 = 0x12345678;
  1541         -  u32 cksum2 = 0x9ABCDEF0;
  1542         -  u32 *a = (u32 *)pHdr;
  1543         -  int i;
  1544         -
  1545         -  assert( (offsetof(TreeHeader, aCksum) + sizeof(u32)*2)==sizeof(TreeHeader) );
  1546         -  assert( (sizeof(TreeHeader) % (sizeof(u32)*2))==0 );
  1547         -
  1548         -  for(i=0; i<(offsetof(TreeHeader, aCksum) / sizeof(u32)); i+=2){
  1549         -    cksum1 += a[i];
  1550         -    cksum2 += (cksum1 + a[i+1]);
  1551         -  }
  1552         -  aCksum[0] = cksum1;
  1553         -  aCksum[1] = cksum2;
  1554         -}
  1555         -
  1556         -/*
  1557         -** Return true if the checksum stored in TreeHeader object *pHdr is 
  1558         -** consistent with the contents of its other fields.
  1559         -*/
  1560         -static int treeHeaderChecksumOk(TreeHeader *pHdr){
  1561         -  u32 aCksum[2];
  1562         -  treeHeaderChecksum(pHdr, aCksum);
  1563         -  return (0==memcmp(aCksum, pHdr->aCksum, sizeof(aCksum)));
  1564         -}
  1565         -
  1566   1795   /*
  1567   1796   ** Load the in-memory tree header from shared-memory into pDb->treehdr.
  1568   1797   ** If the header cannot be loaded, return LSM_BUSY.
  1569   1798   */
  1570   1799   int lsmTreeLoadHeader(lsm_db *pDb){
  1571   1800     while( 1 ){
  1572   1801       int rc;

Changes to www/lsm.wiki.

   362    362     <li> Snapshot 2.
   363    363     <li> The meta-page pointer. This value is either 1 or 2. It indicates which
   364    364          of the two meta-pages contains the most recent database snapshot.
   365    365     <li> READER lock values.
   366    366   </ul>
   367    367   
   368    368   <h2>Log file</h2>
          369  +
          370  +<a href=../src/lsm_log.c>lsm_log.c</a>.
   369    371   
   370    372   <h1>3. Database Recovery and Shutdown</h1>
   371    373   
   372    374   <p>
   373    375   Exclusive locks on locking region DMS1 are used to serialize all connect and
   374    376   disconnect operations. 
   375    377