Index: src/lsmInt.h ================================================================== --- src/lsmInt.h +++ src/lsmInt.h @@ -503,10 +503,11 @@ */ int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree); void lsmTreeRelease(lsm_env *, Tree *); void lsmTreeClear(lsm_db *); int lsmTreeInit(lsm_db *); +int lsmTreeRepair(lsm_db *); int lsmTreeSize(lsm_db *); int lsmTreeEndTransaction(lsm_db *pDb, int bCommit); int lsmTreeBeginTransaction(lsm_db *pDb); int lsmTreeLoadHeader(lsm_db *pDb); @@ -525,10 +526,11 @@ void lsmTreeCursorReset(TreeCursor *pCsr); int lsmTreeCursorKey(TreeCursor *pCsr, void **ppKey, int *pnKey); int lsmTreeCursorValue(TreeCursor *pCsr, void **ppVal, int *pnVal); int lsmTreeCursorValid(TreeCursor *pCsr); int lsmTreeCursorSave(TreeCursor *pCsr); + /* ** Functions from file "mem.c". */ int lsmPoolNew(lsm_env *pEnv, Mempool **ppPool); Index: src/lsm_shared.c ================================================================== --- src/lsm_shared.c +++ src/lsm_shared.c @@ -661,10 +661,11 @@ ** checkpoint just loaded. TODO: This will be removed after ** lsm_sorted.c is changed to work directly from the serialized ** version of the snapshot. */ rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient); assert( (rc==LSM_OK)==(pDb->pClient!=0) ); + assert( pDb->iReader>=0 ); }else{ rc = lsmReleaseReadlock(pDb); } } if( rc==LSM_BUSY ) rc = LSM_OK; @@ -713,13 +714,12 @@ rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0); } /* If the previous writer failed mid-transaction, run emergency rollback. */ if( rc==LSM_OK && pShm->bWriter ){ - /* TODO: This! */ - assert( 0 ); - rc = LSM_CORRUPT_BKPT; + rc = lsmTreeRepair(pDb); + if( rc==LSM_OK ) pShm->bWriter = 0; } /* Check that this connection is currently reading from the most recent ** version of the database. If not, return LSM_BUSY. 
*/ if( rc==LSM_OK && memcmp(&pShm->hdr1, &pDb->treehdr, sizeof(TreeHeader)) ){ @@ -897,10 +897,11 @@ ** Release the read-lock currently held by connection db. */ int lsmReleaseReadlock(lsm_db *db){ int rc = LSM_OK; if( db->iReader>=0 ){ + assert( db->pClient==0 ); rc = lsmShmLock(db, LSM_LOCK_READER(db->iReader), LSM_LOCK_UNLOCK, 0); db->iReader = -1; } return rc; } Index: src/lsm_tree.c ================================================================== --- src/lsm_tree.c +++ src/lsm_tree.c @@ -992,10 +992,269 @@ pOne->iNext = 0; pOne->iShmid = 1; } return rc; } + +static void treeHeaderChecksum( + TreeHeader *pHdr, + u32 *aCksum +){ + u32 cksum1 = 0x12345678; + u32 cksum2 = 0x9ABCDEF0; + u32 *a = (u32 *)pHdr; + int i; + + assert( (offsetof(TreeHeader, aCksum) + sizeof(u32)*2)==sizeof(TreeHeader) ); + assert( (sizeof(TreeHeader) % (sizeof(u32)*2))==0 ); + + for(i=0; i<(offsetof(TreeHeader, aCksum) / sizeof(u32)); i+=2){ + cksum1 += a[i]; + cksum2 += (cksum1 + a[i+1]); + } + aCksum[0] = cksum1; + aCksum[1] = cksum2; +} + +/* +** Return true if the checksum stored in TreeHeader object *pHdr is +** consistent with the contents of its other fields. +*/ +static int treeHeaderChecksumOk(TreeHeader *pHdr){ + u32 aCksum[2]; + treeHeaderChecksum(pHdr, aCksum); + return (0==memcmp(aCksum, pHdr->aCksum, sizeof(aCksum))); +} + +/* +** This type is used by functions lsmTreeRepair() and treeSortByShmid() to +** make relinking the linked list of shared-memory chunks easier. +*/ +typedef struct ShmChunkLoc ShmChunkLoc; +struct ShmChunkLoc { + ShmChunk *pShm; + u32 iLoc; +}; + +/* +** The array aShm[] is of size (nSz*2) elements. The first and second nSz +** elements are both sorted in order of iShmid. This function merges the two +** arrays and writes the sorted results over the top of aShm[]. +** +** Argument aSpace[] points to an array of at least (nSz*2) elements that +** can be used as temporary storage space while sorting. 
+*/ +static void treeSortByShmid(ShmChunkLoc *aShm1, int nSz, ShmChunkLoc *aSpace){ + ShmChunkLoc *aShm2 = &aShm1[nSz]; + int i1 = 0; + int i2 = 0; + int iOut = 0; + + while( i1<nSz || i2<nSz ){ + if( i1==nSz || (i2!=nSz && aShm1[i1].pShm==0) ){ + aSpace[iOut] = aShm2[i2++]; + }else if( i2==nSz || aShm2[i2].pShm==0 ){ + aSpace[iOut] = aShm1[i1++]; + }else{ + assert( aShm1[i1].pShm && aShm2[i2].pShm ); + if( shm_sequence_ge(aShm1[i1].pShm->iShmid, aShm2[i2].pShm->iShmid) ){ + aSpace[iOut] = aShm2[i2++]; + }else{ + aSpace[iOut] = aShm1[i1++]; + } + } + iOut++; + } + + memcpy(aShm1, aSpace, sizeof(ShmChunk *) * nSz*2); +} + +/* +** This function checks that the linked list of shared memory chunks +** that starts at chunk db->treehdr.iFirst: +** +** 1) Includes all chunks in the shared-memory region, and +** 2) Links them together in order of ascending shm-id. +** +** If no error occurs and the conditions above are met, LSM_OK is returned. +** +** If either of the conditions are untrue, LSM_CORRUPT is returned. Or, if +** an error is encountered before the checks are completed, another LSM error +** code (i.e. LSM_IOERR or LSM_NOMEM) may be returned. +*/ +static int treeCheckLinkedList(lsm_db *db){ + int rc = LSM_OK; + int nVisit = 0; + u32 iShmid; + ShmChunk *p; + + p = treeShmChunkRc(db, db->treehdr.iFirst, &rc); + iShmid = p->iShmid; + while( rc==LSM_OK && p ){ + if( p->iNext ){ + ShmChunk *pNext = treeShmChunkRc(db, p->iNext, &rc); + if( rc==LSM_OK ){ + if( pNext->iShmid!=p->iShmid+1 ){ + rc = LSM_CORRUPT_BKPT; + } + p = pNext; + } + }else{ + p = 0; + } + nVisit++; + } + + if( rc==LSM_OK && nVisit!=db->treehdr.nChunk-1 ){ + rc = LSM_CORRUPT_BKPT; + } + return rc; +} + +/* +** Iterate through the current in-memory tree. If there are any v2-pointers +** with transaction ids larger than db->treehdr.iTransId, zero them. +*/ +static int treeRepairPtrs(lsm_db *db){ + int rc = LSM_OK; + + if( db->treehdr.nHeight>1 ){ + TreeCursor csr; /* Cursor used to iterate through tree */ + u32 iTransId = db->treehdr.iTransId; + + /* Initialize the cursor structure. Also decrement the nHeight variable + ** in the tree-header. This will prevent the cursor from visiting any + ** leaf nodes. 
*/ + db->treehdr.nHeight--; + treeCursorInit(db, &csr); + + rc = lsmTreeCursorEnd(&csr, 0); + while( rc==LSM_OK && lsmTreeCursorValid(&csr) ){ + TreeNode *pNode = csr.apTreeNode[csr.iNode]; + if( pNode->iV2>iTransId ){ + pNode->iV2Child = 0; + pNode->iV2Ptr = 0; + pNode->iV2 = 0; + } + rc = lsmTreeCursorNext(&csr); + } + + db->treehdr.nHeight++; + } + + return rc; +} + +static int treeRepairList(lsm_db *db){ + int rc = LSM_OK; + int i; + ShmChunk *p; + ShmChunk *pMin = 0; + u32 iMin = 0; + + /* Iterate through all shm chunks. Find the smallest shm-id present in + ** the shared-memory region. */ + for(i=1; rc==LSM_OK && i<db->treehdr.nChunk; i++){ + p = treeShmChunkRc(db, i, &rc); + if( p && (pMin==0 || shm_sequence_ge(pMin->iShmid, p->iShmid)) ){ + pMin = p; + iMin = i; + } + } + + /* Fix the shm-id values on any chunks with a shm-id greater than or + ** equal to treehdr.iNextShmid. Then do a merge-sort of all chunks to + ** fix the ShmChunk.iNext pointers. + */ + if( rc==LSM_OK ){ + int nSort; + int nByte; + ShmChunkLoc *aSort; + + /* Allocate space for a merge sort. */ + nSort = 1; + while( nSort < (db->treehdr.nChunk-1) ) nSort = nSort * 2; + nByte = sizeof(ShmChunkLoc) * nSort * 2; + aSort = lsmMallocZeroRc(db->pEnv, nByte, &rc); + + /* Fix all shm-ids, if required. 
*/ + if( rc==LSM_OK && iMin!=db->treehdr.iFirst ){ + u32 iPrevShmid = pMin->iShmid-1; + for(i=1; i<db->treehdr.nChunk; i++){ + p = treeShmChunk(db, i); + aSort[i-1].pShm = p; + aSort[i-1].iLoc = i; + if( i!=db->treehdr.iFirst ){ + if( shm_sequence_ge(p->iShmid, db->treehdr.iNextShmid) ){ + p->iShmid = iPrevShmid--; + } + } + } + p = treeShmChunk(db, db->treehdr.iFirst); + p->iShmid = iPrevShmid; + } + + if( rc==LSM_OK ){ + ShmChunkLoc *aSpace = &aSort[nSort]; + for(i=2; i<=nSort; i+=2){ + int nSz; + for(nSz=1; (i & (1 << (nSz-1)))==0; nSz++){ + treeSortByShmid(&aSort[i - nSz*2], nSz, aSpace); + } + } + + for(i=0; i<nSort-1; i++){ + if( aSort[i].pShm ){ + aSort[i].pShm->iNext = aSort[i+1].iLoc; + } + } + assert( aSort[nSort-1].iLoc==0 ); + + rc = treeCheckLinkedList(db); + } + } + + return rc; +} + +/* +** This function is called as part of opening a write-transaction if the +** writer-flag is already set - indicating that the previous writer +** failed before ending its transaction. +*/ +int lsmTreeRepair(lsm_db *db){ + int rc = LSM_OK; + TreeHeader hdr; + ShmHeader *pHdr = db->pShmhdr; + + /* Ensure that the two tree-headers are consistent. Copy one over the other + ** if necessary. Prefer the data from a tree-header for which the checksum + ** computes. Or, if they both compute, prefer tree-header-1. */ + if( memcmp(&pHdr->hdr1, &pHdr->hdr2, sizeof(TreeHeader)) ){ + if( treeHeaderChecksumOk(&pHdr->hdr1) ){ + memcpy(&pHdr->hdr2, &pHdr->hdr1, sizeof(TreeHeader)); + }else{ + memcpy(&pHdr->hdr1, &pHdr->hdr2, sizeof(TreeHeader)); + } + } + + /* Save the connections current copy of the tree-header. It will be + ** restored before returning. */ + memcpy(&hdr, &db->treehdr, sizeof(TreeHeader)); + + /* Walk the tree. Zero any v2 pointers with a transaction-id greater than + ** the transaction-id currently in the tree-headers. */ + rc = treeRepairPtrs(db); + + /* Repair the linked list of shared-memory chunks. 
*/ + if( rc==LSM_OK ){ + rc = treeRepairList(db); + } + + memcpy(&db->treehdr, &hdr, sizeof(TreeHeader)); + return rc; +} /* ** Insert a new entry into the in-memory tree. ** ** If the value of the 5th parameter, nVal, is negative, then a delete-marker @@ -1531,40 +1790,10 @@ pDb->treehdr.iWrite = pMark->iWrite; pDb->treehdr.nChunk = pMark->nChunk; pDb->treehdr.iNextShmid = pMark->iNextShmid; } -static void treeHeaderChecksum( - TreeHeader *pHdr, - u32 *aCksum -){ - u32 cksum1 = 0x12345678; - u32 cksum2 = 0x9ABCDEF0; - u32 *a = (u32 *)pHdr; - int i; - - assert( (offsetof(TreeHeader, aCksum) + sizeof(u32)*2)==sizeof(TreeHeader) ); - assert( (sizeof(TreeHeader) % (sizeof(u32)*2))==0 ); - - for(i=0; i<(offsetof(TreeHeader, aCksum) / sizeof(u32)); i+=2){ - cksum1 += a[i]; - cksum2 += (cksum1 + a[i+1]); - } - aCksum[0] = cksum1; - aCksum[1] = cksum2; -} - -/* -** Return true if the checksum stored in TreeHeader object *pHdr is -** consistent with the contents of its other fields. -*/ -static int treeHeaderChecksumOk(TreeHeader *pHdr){ - u32 aCksum[2]; - treeHeaderChecksum(pHdr, aCksum); - return (0==memcmp(aCksum, pHdr->aCksum, sizeof(aCksum))); -} - /* ** Load the in-memory tree header from shared-memory into pDb->treehdr. ** If the header cannot be loaded, return LSM_BUSY. */ int lsmTreeLoadHeader(lsm_db *pDb){ Index: www/lsm.wiki ================================================================== --- www/lsm.wiki +++ www/lsm.wiki @@ -364,10 +364,12 @@ of the two meta-pages contains the most recent database snapshot.
  • READER lock values.

    Log file

    + +lsm_log.c.

    3. Database Recovery and Shutdown

    Exclusive locks on locking region DMS1 are used to serialize all connect and