Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Fix a race condition causing LSM to read inconsistent in-memory and on-disk databases. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | rework-flow-control |
Files: | files | file ages | folders |
SHA1: |
1743941409535a3520de9361051a9c20 |
User & Date: | dan 2012-09-25 17:25:48.429 |
Context
2012-09-25
| ||
18:27 | Fix a problem causing read-locks to fail with LSM_BUSY. check-in: 7eee90a0aa user: dan tags: rework-flow-control | |
17:25 | Fix a race condition causing LSM to read inconsistent in-memory and on-disk databases. check-in: 1743941409 user: dan tags: rework-flow-control | |
14:50 | Fix a mmap-mode bug. check-in: be1e513090 user: dan tags: rework-flow-control | |
Changes
Changes to lsm-test/lsmtest_tdb3.c.
︙ | ︙ | |||
1077 1078 1079 1080 1081 1082 1083 | p->lsm_work_flags = flags; p->lsm_work_npage = nPage; p->bCkpt = bCkpt; p->pDb = pDb; /* Open the worker connection */ if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker); | > | > > > | 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 | p->lsm_work_flags = flags; p->lsm_work_npage = nPage; p->bCkpt = bCkpt; p->pDb = pDb; /* Open the worker connection */ if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker); if( zCfg ){ test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0); } if( rc==0 ) rc = lsm_open(p->pWorker, zFilename); #if 0 lsm_config_log(p->pWorker, xLog, (void *)"worker"); #endif /* Configure the work-hook */ if( rc==0 ){ lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb); } /* Kick off the worker thread. */ |
︙ | ︙ |
Changes to src/lsmInt.h.
︙ | ︙ | |||
759 760 761 762 763 764 765 | int lsmBeginFlush(lsm_db *); int lsmBeginWork(lsm_db *); void lsmFinishWork(lsm_db *, int, int, int *); int lsmFinishRecovery(lsm_db *); void lsmFinishReadTrans(lsm_db *); | | | 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 | int lsmBeginFlush(lsm_db *); int lsmBeginWork(lsm_db *); void lsmFinishWork(lsm_db *, int, int, int *); int lsmFinishRecovery(lsm_db *); void lsmFinishReadTrans(lsm_db *); int lsmFinishWriteTrans(lsm_db *, int, int); int lsmFinishFlush(lsm_db *, int); int lsmSnapshotSetFreelist(lsm_db *, int *, int); Snapshot *lsmDbSnapshotClient(lsm_db *); Snapshot *lsmDbSnapshotWorker(lsm_db *); |
︙ | ︙ |
Changes to src/lsm_ckpt.c.
︙ | ︙ | |||
1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 | int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){ Snapshot *pSnap = pDb->pWorker; ShmHeader *pShm = pDb->pShmhdr; void *p = 0; int n = 0; int rc; rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n); if( rc!=LSM_OK ) return rc; assert( ckptChecksumOk((u32 *)p) ); assert( n<=LSM_META_PAGE_SIZE ); memcpy(pShm->aSnap2, p, n); lsmShmBarrier(pDb); | > > > > > > | 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 | int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){ Snapshot *pSnap = pDb->pWorker; ShmHeader *pShm = pDb->pShmhdr; void *p = 0; int n = 0; int rc; #if 0 if( bFlush ){ printf("pushing %p tree to %d\n", (void *)pDb, pSnap->iId+1); fflush(stdout); } #endif rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n); if( rc!=LSM_OK ) return rc; assert( ckptChecksumOk((u32 *)p) ); assert( n<=LSM_META_PAGE_SIZE ); memcpy(pShm->aSnap2, p, n); lsmShmBarrier(pDb); |
︙ | ︙ |
Changes to src/lsm_main.c.
︙ | ︙ | |||
701 702 703 704 705 706 707 708 709 710 711 712 713 714 | assert_db_state( pDb ); /* A value less than zero means close the innermost nested transaction. */ if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1); if( iLevel<pDb->nTransOpen ){ if( iLevel==0 ){ /* Commit the transaction to disk. */ if( rc==LSM_OK ) rc = lsmLogCommit(pDb); if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){ rc = lsmFsSyncLog(pDb->pFS); } if( rc==LSM_OK && lsmTreeSize(pDb)>pDb->nTreeLimit ){ | > < | | < | 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 | assert_db_state( pDb ); /* A value less than zero means close the innermost nested transaction. */ if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1); if( iLevel<pDb->nTransOpen ){ if( iLevel==0 ){ int bAutowork = 0; /* Commit the transaction to disk. */ if( rc==LSM_OK ) rc = lsmLogCommit(pDb); if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){ rc = lsmFsSyncLog(pDb->pFS); } if( rc==LSM_OK && lsmTreeSize(pDb)>pDb->nTreeLimit ){ lsmTreeMakeOld(pDb); bAutowork = pDb->bAutowork; } lsmFinishWriteTrans(pDb, (rc==LSM_OK), bAutowork); } pDb->nTransOpen = iLevel; } dbReleaseClientSnapshot(pDb); return rc; } int lsm_rollback(lsm_db *pDb, int iLevel){ int rc = LSM_OK; |
︙ | ︙ | |||
737 738 739 740 741 742 743 | TransMark *pMark = &pDb->aTrans[(iLevel==0 ? 0 : iLevel-1)]; lsmTreeRollback(pDb, &pMark->tree); if( iLevel ) lsmLogSeek(pDb, &pMark->log); pDb->nTransOpen = iLevel; } if( pDb->nTransOpen==0 ){ | | | 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 | TransMark *pMark = &pDb->aTrans[(iLevel==0 ? 0 : iLevel-1)]; lsmTreeRollback(pDb, &pMark->tree); if( iLevel ) lsmLogSeek(pDb, &pMark->log); pDb->nTransOpen = iLevel; } if( pDb->nTransOpen==0 ){ lsmFinishWriteTrans(pDb, 0, 0); } dbReleaseClientSnapshot(pDb); } return rc; } |
︙ | ︙ |
Changes to src/lsm_shared.c.
︙ | ︙ | |||
705 706 707 708 709 710 711 712 713 714 715 716 717 718 | assert( pDb->iReader>=0 ); }else{ rc = lsmReleaseReadlock(pDb); } } if( rc==LSM_BUSY ) rc = LSM_OK; } } if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY; return rc; } /* | > > > > > > > > > > > | 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 | assert( pDb->iReader>=0 ); }else{ rc = lsmReleaseReadlock(pDb); } } if( rc==LSM_BUSY ) rc = LSM_OK; } #if 0 if( rc==LSM_OK && pDb->pClient ){ printf("reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n", (void *)pDb, (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid, (int)pDb->treehdr.root.iTransId, (int)pDb->treehdr.iOldShmid ); fflush(stdout); } #endif } if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY; return rc; } /* |
︙ | ︙ | |||
795 796 797 798 799 800 801 | ** written into the log file when this function is called. Or, if the ** transaction was rolled back, both the log file and in-memory tree ** structure have already been restored. In either case, this function ** merely releases locks and other resources held by the write-transaction. ** ** LSM_OK is returned if successful, or an LSM error code otherwise. */ | | > > > > | | 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 | ** written into the log file when this function is called. Or, if the ** transaction was rolled back, both the log file and in-memory tree ** structure have already been restored. In either case, this function ** merely releases locks and other resources held by the write-transaction. ** ** LSM_OK is returned if successful, or an LSM error code otherwise. */ int lsmFinishWriteTrans(lsm_db *pDb, int bCommit, int nAutowork){ int rc = LSM_OK; lsmLogEnd(pDb, bCommit); lsmTreeEndTransaction(pDb, bCommit); if( nAutowork ){ rc = lsmSortedAutoWork(pDb, nAutowork); } lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0); return rc; } /* ** Return non-zero if the caller is holding the client mutex. */ #ifdef LSM_DEBUG |
︙ | ︙ | |||
874 875 876 877 878 879 880 881 882 883 884 885 886 887 | db->iReader = i; }else if( rc==LSM_BUSY ){ rc = LSM_OK; } } } return rc; } /* ** This is used to check if there exists a read-lock locking a particular ** version of either the in-memory tree or database file. ** | > > > | 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 | db->iReader = i; }else if( rc==LSM_BUSY ){ rc = LSM_OK; } } } if( rc==LSM_OK && db->iReader<0 ){ rc = LSM_BUSY; } return rc; } /* ** This is used to check if there exists a read-lock locking a particular ** version of either the in-memory tree or database file. ** |
︙ | ︙ |