Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Change locks to use 32-bit shm-sequence-ids instead of 64-bit tree-ids. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA1: |
acf38e32c81b8d3fc643c53fe2fa5d84 |
User & Date: | dan 2012-09-10 20:07:54.155 |
Context
2012-09-11
| ||
11:47 | Fix a memory leak in lsm_unix.c. check-in: bf4758ab15 user: dan tags: trunk | |
2012-09-10
| ||
20:07 | Change locks to use 32-bit shm-sequence-ids instead of 64-bit tree-ids. check-in: acf38e32c8 user: dan tags: trunk | |
18:39 | Change the way version races between the in-memory tree and lsm are handled in lsm.wiki. check-in: e12aa10a7c user: dan tags: trunk | |
Changes
Changes to src/lsmInt.h.
︙ | ︙ | |||
126 127 128 129 130 131 132 | #define array_size(x) (sizeof(x)/sizeof(x[0])) /* The size of each shared-memory chunk */ #define LSM_SHM_CHUNK_SIZE (32*1024) /* The number of bytes reserved at the start of each shm chunk for MM. */ | | | 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 | #define array_size(x) (sizeof(x)/sizeof(x[0])) /* The size of each shared-memory chunk */ #define LSM_SHM_CHUNK_SIZE (32*1024) /* The number of bytes reserved at the start of each shm chunk for MM. */ #define LSM_SHM_CHUNK_HDR (sizeof(ShmChunk)) /* The number of available read locks. */ #define LSM_LOCK_NREADER 6 /* Lock definitions */ #define LSM_LOCK_DMS1 1 #define LSM_LOCK_DMS2 2 |
︙ | ︙ | |||
229 230 231 232 233 234 235 | LogRegion aRegion[3]; /* Log file regions (see docs in lsm_log.c) */ }; /* ** Tree header structure. */ struct TreeHeader { | > > | > < < | 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 | LogRegion aRegion[3]; /* Log file regions (see docs in lsm_log.c) */ }; /* ** Tree header structure. */ struct TreeHeader { u32 iUsedShmid; /* Id of first shm chunk used by this tree */ u32 iNextShmid; /* Shm-id of next chunk allocated */ u32 iFirst; /* Chunk number of smallest shm-id */ u32 iTransId; /* Current transaction id */ u32 nChunk; /* Number of chunks in shared-memory file */ u32 iRoot; /* Offset of root node in shm file */ u32 nHeight; /* Current height of tree structure */ u32 iWrite; /* Write offset in shm file */ u32 nByte; /* Size of current tree structure in bytes */ DbLog log; /* Current layout of log file */ u32 aCksum[2]; /* Checksums 1 and 2. */ }; /* ** Database handle structure. |
︙ | ︙ | |||
376 377 378 379 380 381 382 | */ #define segmentHasSeparators(pSegment) ((pSegment)->sep.iFirst>0) /* ** The values that accompany the lock held by a database reader. */ struct ShmReader { | | | 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 | */ #define segmentHasSeparators(pSegment) ((pSegment)->sep.iFirst>0) /* ** The values that accompany the lock held by a database reader. */ struct ShmReader { u32 iTreeId; i64 iLsmId; }; /* ** An instance of this structure is stored in the first shared-memory ** page. The shared-memory header. ** |
︙ | ︙ | |||
415 416 417 418 419 420 421 | }; /* ** An instance of this structure is stored at the start of each shared-memory ** chunk except the first (which is the header chunk - see above). */ struct ShmChunk { | < | > > > | 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 | }; /* ** An instance of this structure is stored at the start of each shared-memory ** chunk except the first (which is the header chunk - see above). */ struct ShmChunk { u32 iShmid; u32 iNext; }; /* Return true if shm-sequence "a" is larger than or equal to "b" */ #define shm_sequence_ge(a, b) (((u32)a-(u32)b) < (1<<30)) #define LSM_APPLIST_SZ 4 typedef struct Freelist Freelist; typedef struct FreelistEntry FreelistEntry; /* |
︙ | ︙ | |||
495 496 497 498 499 500 501 | /* ** Functions from file "lsm_tree.c". */ int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree); void lsmTreeRelease(lsm_env *, Tree *); void lsmTreeClear(lsm_db *); | | | 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 | /* ** Functions from file "lsm_tree.c". */ int lsmTreeNew(lsm_env *, int (*)(void *, int, void *, int), Tree **ppTree); void lsmTreeRelease(lsm_env *, Tree *); void lsmTreeClear(lsm_db *); int lsmTreeInit(lsm_db *); int lsmTreeSize(lsm_db *); int lsmTreeEndTransaction(lsm_db *pDb, int bCommit); int lsmTreeBeginTransaction(lsm_db *pDb); int lsmTreeLoadHeader(lsm_db *pDb); int lsmTreeInsert(lsm_db *pDb, void *pKey, int nKey, void *pVal, int nVal); |
︙ | ︙ | |||
782 783 784 785 786 787 788 | #ifdef LSM_DEBUG void lsmShmHasLock(lsm_db *db, int iLock, int eOp); #else # define lsmShmHasLock(x,y,z) #endif | | | 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 | #ifdef LSM_DEBUG void lsmShmHasLock(lsm_db *db, int iLock, int eOp); #else # define lsmShmHasLock(x,y,z) #endif int lsmReadlock(lsm_db *, i64 iLsm, u32 iTree); int lsmReleaseReadlock(lsm_db *); int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse); int lsmTreeInUse(lsm_db *db, u32 iLsmId, int *pbInUse); int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId); int lsmDbMultiProc(lsm_db *); |
︙ | ︙ |
Changes to src/lsm_log.c.
︙ | ︙ | |||
894 895 896 897 898 899 900 | int iPass; int nJump = 0; /* Number of LSM_LOG_JUMP records in pass 0 */ DbLog *pLog; rc = lsmFsOpenLog(pDb->pFS); if( rc!=LSM_OK ) return rc; | | > > | 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 | int iPass; int nJump = 0; /* Number of LSM_LOG_JUMP records in pass 0 */ DbLog *pLog; rc = lsmFsOpenLog(pDb->pFS); if( rc!=LSM_OK ) return rc; rc = lsmTreeInit(pDb); if( rc!=LSM_OK ) return rc; pLog = &pDb->treehdr.log; lsmCheckpointLogoffset(pDb->pShmhdr->aWorker, pLog); logReaderInit(pDb, pLog, 1, &reader); lsmStringInit(&buf1, pDb->pEnv); lsmStringInit(&buf2, pDb->pEnv); |
︙ | ︙ |
Changes to src/lsm_shared.c.
︙ | ︙ | |||
338 339 340 341 342 343 344 | } } LsmFile *lsmDbRecycleFd(lsm_db *db){ LsmFile *pRet; Database *p = db->pDatabase; lsmMutexEnter(db->pEnv, p->pClientMutex); | | | 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 | } } LsmFile *lsmDbRecycleFd(lsm_db *db){ LsmFile *pRet; Database *p = db->pDatabase; lsmMutexEnter(db->pEnv, p->pClientMutex); if( (pRet = p->pLsmFile)!=0 ){ p->pLsmFile = pRet->pNext; } lsmMutexLeave(db->pEnv, p->pClientMutex); return pRet; } /* |
︙ | ︙ | |||
657 658 659 660 661 662 663 | /* Take a read-lock on the tree and snapshot just loaded. Then check ** that the shared-memory still contains the same values. If so, proceed. ** Otherwise, relinquish the read-lock and retry the whole procedure ** (starting with loading the in-memory tree header). */ if( rc==LSM_OK ){ ShmHeader *pShm = pDb->pShmhdr; | | | < | | | 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 | /* Take a read-lock on the tree and snapshot just loaded. Then check ** that the shared-memory still contains the same values. If so, proceed. ** Otherwise, relinquish the read-lock and retry the whole procedure ** (starting with loading the in-memory tree header). */ if( rc==LSM_OK ){ ShmHeader *pShm = pDb->pShmhdr; u32 iShmchunk = pDb->treehdr.iUsedShmid; i64 iSnap = lsmCheckpointId(pDb->aSnapshot, 0); rc = lsmReadlock(pDb, iSnap, iShmchunk); if( rc==LSM_OK ){ if( 0==memcmp(pShm->hdr1.aCksum, pDb->treehdr.aCksum, sizeof(u32)*2) && iSnap==lsmCheckpointId(pShm->aClient, 0) ){ /* Read lock has been successfully obtained. Deserialize the ** checkpoint just loaded. TODO: This will be removed after ** lsm_sorted.c is changed to work directly from the serialized ** version of the snapshot. */ rc = lsmCheckpointDeserialize(pDb, 0, pDb->aSnapshot, &pDb->pClient); assert( (rc==LSM_OK)==(pDb->pClient!=0) ); |
︙ | ︙ | |||
786 787 788 789 790 791 792 | #endif /* ** Obtain a read-lock on database version identified by the combination ** of snapshot iLsm and tree iTree. Return LSM_OK if successful, or ** an LSM error code otherwise. */ | | | 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 | #endif /* ** Obtain a read-lock on database version identified by the combination ** of snapshot iLsm and tree iTree. Return LSM_OK if successful, or ** an LSM error code otherwise. */ int lsmReadlock(lsm_db *db, i64 iLsm, u32 iTree){ ShmHeader *pShm = db->pShmhdr; int i; int rc = LSM_OK; assert( db->iReader<0 ); /* Search for an exact match. */ |
︙ | ︙ | |||
824 825 826 827 828 829 830 | if( rc==LSM_OK ) db->iReader = i; } } /* Search for any usable slot */ for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){ ShmReader *p = &pShm->aReader[i]; | | | > > > > > > > > > > > | | > > > | | | | > | | | | 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 | if( rc==LSM_OK ) db->iReader = i; } } /* Search for any usable slot */ for(i=0; db->iReader<0 && rc==LSM_OK && i<LSM_LOCK_NREADER; i++){ ShmReader *p = &pShm->aReader[i]; if( p->iLsmId && p->iLsmId<=iLsm && shm_sequence_ge(iTree, p->iTreeId) ){ rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_SHARED, 0); if( rc==LSM_OK ){ if( p->iLsmId && p->iLsmId<=iLsm && shm_sequence_ge(iTree,p->iTreeId) ){ db->iReader = i; } }else if( rc==LSM_BUSY ){ rc = LSM_OK; } } } return rc; } /* ** This is used to check if there exists a read-lock locking a particular ** version of either the in-memory tree or database file. ** ** If iLsmId is non-zero, then it is a snapshot id. If there exists a ** read-lock using this snapshot or newer, set *pbInUse to true. Or, ** if there is no such read-lock, set it to false. ** ** Or, if iLsmId is zero, then iShmid is a shared-memory sequence id. ** Search for a read-lock using this sequence id or newer. etc. */ static int isInUse(lsm_db *db, i64 iLsmId, u32 iShmid, int *pbInUse){ ShmHeader *pShm = db->pShmhdr; int i; int rc = LSM_OK; for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){ ShmReader *p = &pShm->aReader[i]; if( p->iLsmId ){ if( (iLsmId!=0 && iLsmId>=p->iLsmId) || (iLsmId==0 && shm_sequence_ge(p->iTreeId, iShmid)) ){ rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0); if( rc==LSM_OK ){ p->iLsmId = 0; lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0); } } } } if( rc==LSM_BUSY ){ *pbInUse = 1; return LSM_OK; } *pbInUse = 0; return rc; } int lsmTreeInUse(lsm_db *db, u32 iShmid, int *pbInUse){ if( db->treehdr.iUsedShmid==iShmid ){ *pbInUse = 1; return LSM_OK; } return isInUse(db, 0, iShmid, pbInUse); } int lsmLsmInUse(lsm_db *db, i64 iLsmId, int *pbInUse){ if( db->pClient && db->pClient->iId<=iLsmId ){ *pbInUse = 1; return LSM_OK; } |
︙ | ︙ |
Changes to src/lsm_tree.c.
︙ | ︙ | |||
270 271 272 273 274 275 276 277 278 279 280 281 282 283 | return 0; } static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){ int rcdummy = LSM_OK; return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, &rcdummy); } /* Values for the third argument to treeShmkey(). */ #define TK_LOADKEY 1 #define TK_LOADVAL 2 static TreeKey *treeShmkey( lsm_db *pDb, /* Database handle */ | > > > > | 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 | return 0; } static ShmChunk * treeShmChunk(lsm_db *pDb, int iChunk){ int rcdummy = LSM_OK; return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, &rcdummy); } static ShmChunk * treeShmChunkRc(lsm_db *pDb, int iChunk, int *pRc){ return (ShmChunk *)treeShmptr(pDb, iChunk*LSM_SHM_CHUNK_SIZE, pRc); } /* Values for the third argument to treeShmkey(). */ #define TK_LOADKEY 1 #define TK_LOADVAL 2 static TreeKey *treeShmkey( lsm_db *pDb, /* Database handle */ |
︙ | ︙ | |||
529 530 531 532 533 534 535 536 | assert( iWrite ); iChunk = treeOffsetToChunk(iWrite-1); iEof = (iChunk+1) * CHUNK_SIZE; assert( iEof>=iWrite && (iEof-iWrite)<CHUNK_SIZE ); if( (iWrite+nByte)>iEof ){ ShmChunk *pHdr; /* Header of chunk just finished (iChunk) */ ShmChunk *pFirst; /* Header of chunk treehdr.iFirst */ int iNext = 0; /* Next chunk */ | > | > > > | < | < < < > > > > > > > > > > < | 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 | assert( iWrite ); iChunk = treeOffsetToChunk(iWrite-1); iEof = (iChunk+1) * CHUNK_SIZE; assert( iEof>=iWrite && (iEof-iWrite)<CHUNK_SIZE ); if( (iWrite+nByte)>iEof ){ ShmChunk *pHdr; /* Header of chunk just finished (iChunk) */ ShmChunk *pFirst; /* Header of chunk treehdr.iFirst */ ShmChunk *pNext; /* Header of new chunk */ int iNext = 0; /* Next chunk */ int rc = LSM_OK; pFirst = treeShmChunk(pDb, pDb->treehdr.iFirst); /* Check if the chunk at the start of the linked list is still in ** use. If not, reuse it. If so, allocate a new chunk by appending ** to the *-shm file. */ assert( shm_sequence_ge(pDb->treehdr.iUsedShmid, pFirst->iShmid) ); if( pDb->treehdr.iUsedShmid!=pFirst->iShmid ){ int bInUse; rc = lsmTreeInUse(pDb, pFirst->iShmid, &bInUse); if( rc!=LSM_OK ){ *pRc = rc; return 0; } if( bInUse==0 ){ iNext = pDb->treehdr.iFirst; pDb->treehdr.iFirst = pFirst->iNext; assert( pDb->treehdr.iFirst ); } } if( iNext==0 ) iNext = pDb->treehdr.nChunk++; /* Set the header values for the new chunk */ pNext = treeShmChunkRc(pDb, iNext, &rc); if( pNext ){ pNext->iNext = 0; pNext->iShmid = (pDb->treehdr.iNextShmid++); }else{ *pRc = rc; return 0; } /* Set the header values for the chunk just finished */ pHdr = (ShmChunk *)treeShmptr(pDb, iChunk*CHUNK_SIZE, pRc); pHdr->iNext = iNext; /* Advance to the next chunk */ iWrite = iNext * CHUNK_SIZE + CHUNK_HDR; } /* Allocate space at iWrite. */ |
︙ | ︙ | |||
945 946 947 948 949 950 951 | return rc; } /* ** Empty the contents of the in-memory tree. */ void lsmTreeClear(lsm_db *pDb){ | < | > > > | > > > > > > > > < < | 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 | return rc; } /* ** Empty the contents of the in-memory tree. */ void lsmTreeClear(lsm_db *pDb){ pDb->treehdr.iTransId = 1; pDb->treehdr.iRoot = 0; pDb->treehdr.nHeight = 0; pDb->treehdr.nByte = 0; } /* ** This function is called during recovery to initialize the ** tree header. Only the database connections private copy of the tree-header ** is initialized here - it will be copied into shared memory if log file ** recovery is successful. */ int lsmTreeInit(lsm_db *pDb){ ShmChunk *pOne; int rc = LSM_OK; pDb->treehdr.iTransId = 1; pDb->treehdr.iFirst = 1; pDb->treehdr.nChunk = 2; pDb->treehdr.iWrite = LSM_SHM_CHUNK_SIZE + LSM_SHM_CHUNK_HDR; pDb->treehdr.iNextShmid = 2; pDb->treehdr.iUsedShmid = 1; pOne = treeShmChunkRc(pDb, 1, &rc); if( pOne ){ pOne->iNext = 0; pOne->iShmid = 1; } return rc; } /* ** Insert a new entry into the in-memory tree. ** ** If the value of the 5th parameter, nVal, is negative, then a delete-marker ** is inserted into the tree. In this case the value pointer, pVal, must be ** NULL. */ int lsmTreeInsert( lsm_db *pDb, /* Database handle */ void *pKey, /* Pointer to key data */ int nKey, /* Size of key data in bytes */ void *pVal, /* Pointer to value data (or NULL) */ int nVal /* Bytes in value data (or -ve for delete) */ ){ int rc = LSM_OK; /* Return Code */ TreeKey *pTreeKey; /* New key-value being inserted */ u32 iTreeKey; TreeHeader *pHdr = &pDb->treehdr; assert( nVal>=0 || pVal==0 ); assert_tree_looks_ok(LSM_OK, pTree); #if 0 dump_tree_contents(pDb, "before"); #endif |
︙ | ︙ | |||
1445 1446 1447 1448 1449 1450 1451 | ** contents back to their current state. */ void lsmTreeMark(lsm_db *pDb, TreeMark *pMark){ pMark->iRoot = pDb->treehdr.iRoot; pMark->nHeight = pDb->treehdr.nHeight; pMark->iWrite = pDb->treehdr.iWrite; pMark->nChunk = pDb->treehdr.nChunk; | < > | | | | | < > > | | | > | | | > | 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 | ** contents back to their current state. */ void lsmTreeMark(lsm_db *pDb, TreeMark *pMark){ pMark->iRoot = pDb->treehdr.iRoot; pMark->nHeight = pDb->treehdr.nHeight; pMark->iWrite = pDb->treehdr.iWrite; pMark->nChunk = pDb->treehdr.nChunk; pMark->iRollback = intArraySize(&pDb->rollback); } /* ** Roll back to mark pMark. Structure *pMark should have been previously ** populated by a call to lsmTreeMark(). */ void lsmTreeRollback(lsm_db *pDb, TreeMark *pMark){ int rcdummy = LSM_OK; int iIdx; int nIdx; u32 iNext; ShmChunk *pChunk; u32 iChunk; u32 iShmid; /* Revert all required v2 pointers. */ nIdx = intArraySize(&pDb->rollback); for(iIdx = pMark->iRollback; iIdx<nIdx; iIdx++){ TreeNode *pNode; pNode = treeShmptr(pDb, intArrayEntry(&pDb->rollback, iIdx), &rcdummy); assert( pNode && rcdummy==LSM_OK ); pNode->iV2 = 0; pNode->iV2Child = 0; pNode->iV2Ptr = 0; } intArrayTruncate(&pDb->rollback, pMark->iRollback); /* Restore the free-chunk list. */ assert( pMark->iWrite!=0 ); iChunk = treeOffsetToChunk(pMark->iWrite-1); pChunk = treeShmChunk(pDb, iChunk); iNext = pChunk->iNext; pChunk->iNext = 0; pChunk = treeShmChunk(pDb, pDb->treehdr.iFirst); iShmid = pChunk->iShmid-1; while( iNext ){ u32 iFree = iNext; /* Current chunk being rollback-freed */ ShmChunk *pFree; /* Pointer to chunk iFree */ pFree = treeShmChunk(pDb, iFree); iNext = pFree->iNext; if( iFree<pMark->nChunk ){ pFree->iNext = pDb->treehdr.iFirst; pFree->iShmid = iShmid--; pDb->treehdr.iFirst = iFree; } } /* Restore the tree-header fields */ pDb->treehdr.iRoot = pMark->iRoot; pDb->treehdr.nHeight = pMark->nHeight; pDb->treehdr.iWrite = pMark->iWrite; |
︙ | ︙ |