Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Fix a problem causing read-locks to fail with LSM_BUSY. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | rework-flow-control |
Files: | files | file ages | folders |
SHA1: |
7eee90a0aaddc7b8d1503e5904aa1b69 |
User & Date: | dan 2012-09-25 18:27:39.612 |
Context
2012-09-25
| ||
19:13 | Fix bug in recycling of shared memory space. check-in: 156b93d03b user: dan tags: rework-flow-control | |
18:27 | Fix a problem causing read-locks to fail with LSM_BUSY. check-in: 7eee90a0aa user: dan tags: rework-flow-control | |
17:25 | Fix a race condition causing LSM to read inconsistent in-memory and on-disk databases. check-in: 1743941409 user: dan tags: rework-flow-control | |
Changes
Changes to src/lsm_shared.c.
︙ | ︙ | |||
657 658 659 660 661 662 663 | } /* ** Begin a read transaction. This function is a no-op if the connection ** passed as the only argument already has an open read transaction. */ int lsmBeginReadTrans(lsm_db *pDb){ | | | 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 | } /* ** Begin a read transaction. This function is a no-op if the connection ** passed as the only argument already has an open read transaction. */ int lsmBeginReadTrans(lsm_db *pDb){ const int MAX_READLOCK_ATTEMPTS = 10; int rc = LSM_OK; /* Return code */ int iAttempt = 0; assert( pDb->pWorker==0 ); assert( (pDb->pClient!=0)==(pDb->iReader>=0) ); while( rc==LSM_OK && pDb->pClient==0 && (iAttempt++)<MAX_READLOCK_ATTEMPTS ){ |
︙ | ︙ | |||
684 685 686 687 688 689 690 | /* Take a read-lock on the tree and snapshot just loaded. Then check ** that the shared-memory still contains the same values. If so, proceed. ** Otherwise, relinquish the read-lock and retry the whole procedure ** (starting with loading the in-memory tree header). */ if( rc==LSM_OK ){ ShmHeader *pShm = pDb->pShmhdr; u32 iShmMax = pDb->treehdr.iUsedShmid; | | | > > | 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 | /* Take a read-lock on the tree and snapshot just loaded. Then check ** that the shared-memory still contains the same values. If so, proceed. ** Otherwise, relinquish the read-lock and retry the whole procedure ** (starting with loading the in-memory tree header). */ if( rc==LSM_OK ){ ShmHeader *pShm = pDb->pShmhdr; u32 iShmMax = pDb->treehdr.iUsedShmid; u32 iShmMin = pDb->treehdr.iNextShmid+1-(1<<10); rc = lsmReadlock( pDb, lsmCheckpointId(pDb->aSnapshot, 0), iShmMin, iShmMax ); if( rc==LSM_OK ){ if( lsmTreeLoadHeaderOk(pDb, iTreehdr) && lsmCheckpointLoadOk(pDb, iSnap) ){ /* Read lock has been successfully obtained. Deserialize the ** checkpoint just loaded. TODO: This will be removed after ** lsm_sorted.c is changed to work directly from the serialized ** version of the snapshot. */ rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient); assert( (rc==LSM_OK)==(pDb->pClient!=0) ); assert( pDb->iReader>=0 ); }else{ rc = lsmReleaseReadlock(pDb); } } if( rc==LSM_BUSY ){ rc = LSM_OK; } } #if 0 if( rc==LSM_OK && pDb->pClient ){ printf("reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n", (void *)pDb, (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid, (int)pDb->treehdr.root.iTransId, |
︙ | ︙ | |||
841 842 843 844 845 846 847 848 849 | /* ** Obtain a read-lock on database version identified by the combination ** of snapshot iLsm and tree iTree. Return LSM_OK if successful, or ** an LSM error code otherwise. */ int lsmReadlock(lsm_db *db, i64 iLsm, u32 iShmMin, u32 iShmMax){ ShmHeader *pShm = db->pShmhdr; int i; | > < | 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 | /* ** Obtain a read-lock on database version identified by the combination ** of snapshot iLsm and tree iTree. Return LSM_OK if successful, or ** an LSM error code otherwise. */ int lsmReadlock(lsm_db *db, i64 iLsm, u32 iShmMin, u32 iShmMax){ int rc = LSM_OK; ShmHeader *pShm = db->pShmhdr; int i; assert( db->iReader<0 ); assert( shm_sequence_ge(iShmMax, iShmMin) ); /* Search for an exact match. */ for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){ ShmReader *p = &pShm->aReader[i]; |
︙ | ︙ | |||
872 873 874 875 876 877 878 879 880 881 882 883 884 885 | if( rc==LSM_BUSY ){ rc = LSM_OK; }else{ ShmReader *p = &pShm->aReader[i]; p->iLsmId = iLsm; p->iTreeId = iShmMax; rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0); if( rc==LSM_OK ) db->iReader = i; } } /* Search for any usable slot */ for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){ ShmReader *p = &pShm->aReader[i]; | > | | | 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 | if( rc==LSM_BUSY ){ rc = LSM_OK; }else{ ShmReader *p = &pShm->aReader[i]; p->iLsmId = iLsm; p->iTreeId = iShmMax; rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0); assert( rc!=LSM_BUSY ); if( rc==LSM_OK ) db->iReader = i; } } /* Search for any usable slot */ for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){ ShmReader *p = &pShm->aReader[i]; if( slotIsUsable(p, iLsm, iShmMin, iShmMax) ){ rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0); if( rc==LSM_OK && slotIsUsable(p, iLsm, iShmMin, iShmMax) ){ db->iReader = i; }else if( rc==LSM_BUSY ){ rc = LSM_OK; } } } |
︙ | ︙ |