Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Fix a problem in free-list management. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | rework-flow-control |
Files: | files | file ages | folders |
SHA1: |
57444405e3e3a0c2c5f204e0669bfbcc |
User & Date: | dan 2012-09-26 11:57:36.528 |
Context
2012-09-26
| ||
14:07 | Fix a broken assert() statement. check-in: 797f6c5578 user: dan tags: rework-flow-control | |
11:57 | Fix a problem in free-list management. check-in: 57444405e3 user: dan tags: rework-flow-control | |
2012-09-25
| ||
19:13 | Fix bug in recycling of shared memory space. check-in: 156b93d03b user: dan tags: rework-flow-control | |
Changes
Changes to lsm-test/lsmtest_tdb3.c.
︙ | ︙ | |||
989 990 991 992 993 994 995 | if( p->pWorker && p->bDoWork==0 ){ pthread_cond_wait(&p->worker_cond, &p->worker_mutex); } p->bDoWork = 0; } } pthread_mutex_unlock(&p->worker_mutex); | < | 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 | if( p->pWorker && p->bDoWork==0 ){ pthread_cond_wait(&p->worker_cond, &p->worker_mutex); } p->bDoWork = 0; } } pthread_mutex_unlock(&p->worker_mutex); return 0; } static void mt_stop_worker(LsmDb *pDb, int iWorker){ LsmWorker *p = &pDb->aWorker[iWorker]; |
︙ | ︙ |
Changes to src/lsm_ckpt.c.
︙ | ︙ | |||
382 383 384 385 386 387 388 389 390 391 392 393 394 395 | CkptBuffer ckpt; int nFree; nFree = pSnap->freelist.nEntry; if( nOvfl>=0 ){ nFree -= nOvfl; }else{ nOvfl = pDb->pShmhdr->aSnap2[CKPT_HDR_OVFL]; } /* Initialize the output buffer */ memset(&ckpt, 0, sizeof(CkptBuffer)); ckpt.pEnv = pDb->pEnv; iOut = CKPT_HDR_SIZE; | > | 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 | CkptBuffer ckpt; int nFree; nFree = pSnap->freelist.nEntry; if( nOvfl>=0 ){ nFree -= nOvfl; }else{ assert( 0 ); nOvfl = pDb->pShmhdr->aSnap2[CKPT_HDR_OVFL]; } /* Initialize the output buffer */ memset(&ckpt, 0, sizeof(CkptBuffer)); ckpt.pEnv = pDb->pEnv; iOut = CKPT_HDR_SIZE; |
︙ | ︙ | |||
438 439 440 441 442 443 444 | }else{ ckptSetValue(&ckpt, iOut, 0, &rc); ckptSetValue(&ckpt, iOut+1, 0, &rc); } iOut += 2; assert( iOut<=1024 ); | | | 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 | }else{ ckptSetValue(&ckpt, iOut, 0, &rc); ckptSetValue(&ckpt, iOut+1, 0, &rc); } iOut += 2; assert( iOut<=1024 ); #ifdef LSM_LOG_FREELIST lsmLogMessage(pDb, rc, "ckptExportSnapshot(): id=%d freelist: %d/%d", (int)iId, nFree, nOvfl ); #endif *ppCkpt = (void *)ckpt.aCkpt; if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut; |
︙ | ︙ | |||
1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 | #if 0 if( bFlush ){ printf("pushing %p tree to %d\n", (void *)pDb, pSnap->iId+1); fflush(stdout); } #endif rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n); if( rc!=LSM_OK ) return rc; assert( ckptChecksumOk((u32 *)p) ); assert( n<=LSM_META_PAGE_SIZE ); memcpy(pShm->aSnap2, p, n); lsmShmBarrier(pDb); | > | 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 | #if 0 if( bFlush ){ printf("pushing %p tree to %d\n", (void *)pDb, pSnap->iId+1); fflush(stdout); } #endif assert( lsmFsIntegrityCheck(pDb) ); rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n); if( rc!=LSM_OK ) return rc; assert( ckptChecksumOk((u32 *)p) ); assert( n<=LSM_META_PAGE_SIZE ); memcpy(pShm->aSnap2, p, n); lsmShmBarrier(pDb); |
︙ | ︙ |
Changes to src/lsm_shared.c.
︙ | ︙ | |||
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 | /* ** Append an entry to the free-list. */ int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){ /* Assert that this is not an attempt to insert a duplicate block number */ assertNotInFreelist(p, iBlk); /* Extend the space allocated for the freelist, if required */ assert( p->nAlloc>=p->nEntry ); if( p->nAlloc==p->nEntry ){ int nNew; FreelistEntry *aNew; | > > | 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | /* ** Append an entry to the free-list. */ int lsmFreelistAppend(lsm_env *pEnv, Freelist *p, int iBlk, i64 iId){ /* Assert that this is not an attempt to insert a duplicate block number */ #if 0 assertNotInFreelist(p, iBlk); #endif /* Extend the space allocated for the freelist, if required */ assert( p->nAlloc>=p->nEntry ); if( p->nAlloc==p->nEntry ){ int nNew; FreelistEntry *aNew; |
︙ | ︙ | |||
497 498 499 500 501 502 503 504 505 506 507 508 509 510 | ** LSM_NOMEM). */ int lsmBlockFree(lsm_db *pDb, int iBlk){ Snapshot *p = pDb->pWorker; assert( lsmShmAssertWorker(pDb) ); /* TODO: Should assert() that lsmCheckpointOverflow() has not been called */ return lsmFreelistAppend(pDb->pEnv, &p->freelist, iBlk, p->iId); } /* ** Refree a database block. The worker snapshot must be held in order to call ** this function. | > > > | 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 | ** LSM_NOMEM). */ int lsmBlockFree(lsm_db *pDb, int iBlk){ Snapshot *p = pDb->pWorker; assert( lsmShmAssertWorker(pDb) ); /* TODO: Should assert() that lsmCheckpointOverflow() has not been called */ #ifdef LSM_LOG_FREELIST lsmLogMessage(pDb, LSM_OK, "lsmBlockFree(): Free block %d", iBlk); #endif return lsmFreelistAppend(pDb->pEnv, &p->freelist, iBlk, p->iId); } /* ** Refree a database block. The worker snapshot must be held in order to call ** this function. |
︙ | ︙ | |||
810 811 812 813 814 815 816 817 | ** structure have already been restored. In either case, this function ** merely releases locks and other resources held by the write-transaction. ** ** LSM_OK is returned if successful, or an LSM error code otherwise. */ int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){ int rc = LSM_OK; lsmLogEnd(pDb, bCommit); | > > < > > > > | | < | 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 | ** structure have already been restored. In either case, this function ** merely releases locks and other resources held by the write-transaction. ** ** LSM_OK is returned if successful, or an LSM error code otherwise. */ int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){ int rc = LSM_OK; int bAutowork = 0; lsmLogEnd(pDb, bCommit); if( rc==LSM_OK && bCommit && lsmTreeSize(pDb)>pDb->nTreeLimit ){ bAutowork = pDb->bAutowork; lsmTreeMakeOld(pDb); } lsmTreeEndTransaction(pDb, bCommit); if( rc==LSM_OK && bAutowork ){ rc = lsmSortedAutoWork(pDb, 1); } lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0); return rc; } /* |
︙ | ︙ |
Changes to src/lsm_sorted.c.
︙ | ︙ | |||
3603 3604 3605 3606 3607 3608 3609 | sortedFreeLevel(pDb->pEnv, pNew); } if( rc==LSM_OK ){ sortedInvokeWorkHook(pDb); } | < | 3603 3604 3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 | sortedFreeLevel(pDb->pEnv, pNew); } if( rc==LSM_OK ){ sortedInvokeWorkHook(pDb); } if( pnWrite ) *pnWrite = nWrite; pDb->pWorker->nWrite += nWrite; #if 0 lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel"); #endif return rc; } |
︙ | ︙ | |||
3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660 3661 3662 3663 | Level **ppNew /* New, merged, level */ ){ int rc = LSM_OK; /* Return Code */ Level *pNew; /* New Level object */ int bUseNext = 0; /* True to link in next separators */ Merge *pMerge; /* New Merge object */ int nByte; /* Bytes of space allocated at pMerge */ /* Allocate the new Level object */ pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc); if( pNew ){ pNew->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, nMerge * sizeof(Segment), &rc); } /* Populate the new Level object */ if( rc==LSM_OK ){ Level *pNext = 0; /* Level following pNew */ int i; Level *pTopLevel; Level *p = pLevel; Level **pp; pNew->nRight = nMerge; pNew->iAge = pLevel->iAge+1; for(i=0; i<nMerge; i++){ pNext = p->pNext; pNew->aRhs[i] = p->lhs; sortedFreeLevel(pDb->pEnv, p); p = pNext; } /* Replace the old levels with the new. */ | > > > > > > > > > > > | 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672 3673 | Level **ppNew /* New, merged, level */ ){ int rc = LSM_OK; /* Return Code */ Level *pNew; /* New Level object */ int bUseNext = 0; /* True to link in next separators */ Merge *pMerge; /* New Merge object */ int nByte; /* Bytes of space allocated at pMerge */ #ifdef LSM_DEBUG int iLevel; Level *pX = pLevel; for(iLevel=0; iLevel<nMerge; iLevel++){ assert( pX->nRight==0 ); pX = pX->pNext; } #endif /* Allocate the new Level object */ pNew = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc); if( pNew ){ pNew->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, nMerge * sizeof(Segment), &rc); } /* Populate the new Level object */ if( rc==LSM_OK ){ Level *pNext = 0; /* Level following pNew */ int i; Level *pTopLevel; Level *p = pLevel; Level **pp; pNew->nRight = nMerge; pNew->iAge = pLevel->iAge+1; for(i=0; i<nMerge; i++){ assert( p->nRight==0 ); pNext = p->pNext; pNew->aRhs[i] = p->lhs; sortedFreeLevel(pDb->pEnv, p); p = pNext; } /* Replace the old levels with the new. */ |
︙ | ︙ | |||
3839 3840 3841 3842 3843 3844 3845 | nBest = nThis; } } if( pLevel->nRight ){ if( pLevel->nRight>nBest ){ nBest = pLevel->nRight; pBest = pLevel; | > | | < | 3849 3850 3851 3852 3853 3854 3855 3856 3857 3858 3859 3860 3861 3862 3863 3864 3865 | nBest = nThis; } } if( pLevel->nRight ){ if( pLevel->nRight>nBest ){ nBest = pLevel->nRight; pBest = pLevel; } nThis = 0; pThis = 0; }else{ pThis = pLevel; nThis = 1; } } } if( nThis>nBest ){ |
︙ | ︙ | |||
3872 3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883 3884 3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 3895 3896 3897 | } return rc; } static int sortedDbIsFull(lsm_db *pDb){ Level *pTop = lsmDbSnapshotLevel(pDb->pWorker); if( pTop && pTop->iAge==0 && (pTop->nRight || sortedCountLevels(pTop)>=pDb->nMerge) ){ return 1; } return 0; } static int sortedWork( lsm_db *pDb, /* Database handle. Must be worker. */ int nWork, /* Number of pages of work to do */ int bOptimize, /* True to merge less than nMerge levels */ int bFlush, /* Set if call is to make room for a flush */ int *pnWrite /* OUT: Actual number of pages written */ ){ int rc = LSM_OK; /* Return Code */ int nRemaining = nWork; /* Units of work to do before returning */ Snapshot *pWorker = pDb->pWorker; | > > < < | 3882 3883 3884 3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 3895 3896 3897 3898 3899 3900 3901 3902 3903 3904 3905 3906 3907 3908 3909 3910 3911 3912 3913 3914 3915 3916 3917 | } return rc; } static int sortedDbIsFull(lsm_db *pDb){ Level *pTop = lsmDbSnapshotLevel(pDb->pWorker); if( lsmDatabaseFull(pDb) ) return 1; if( pTop && pTop->iAge==0 && (pTop->nRight || sortedCountLevels(pTop)>=pDb->nMerge) ){ return 1; } return 0; } static int sortedWork( lsm_db *pDb, /* Database handle. Must be worker. */ int nWork, /* Number of pages of work to do */ int bOptimize, /* True to merge less than nMerge levels */ int bFlush, /* Set if call is to make room for a flush */ int *pnWrite /* OUT: Actual number of pages written */ ){ int rc = LSM_OK; /* Return Code */ int nRemaining = nWork; /* Units of work to do before returning */ Snapshot *pWorker = pDb->pWorker; assert( pWorker ); if( lsmDbSnapshotLevel(pWorker)==0 ) return LSM_OK; while( nRemaining>0 ){ Level *pLevel = 0; /* Find a level to work on. */ rc = sortedSelectLevel(pDb, bOptimize, &pLevel); |
︙ | ︙ | |||
3917 3918 3919 3920 3921 3922 3923 3924 3925 3926 3927 3928 3929 3930 | assert( mergeworker.nWork==0 ); while( rc==LSM_OK && 0==mergeWorkerDone(&mergeworker) && mergeworker.nWork<nRemaining ){ rc = mergeWorkerStep(&mergeworker); } nRemaining -= mergeworker.nWork; /* Check if the merge operation is completely finished. If so, the ** Merge object and the right-hand-side of the level can be deleted. ** ** Otherwise, gobble up (declare eligible for recycling) any pages ** from rhs segments for which the content has been completely merged | > | 3927 3928 3929 3930 3931 3932 3933 3934 3935 3936 3937 3938 3939 3940 3941 | assert( mergeworker.nWork==0 ); while( rc==LSM_OK && 0==mergeWorkerDone(&mergeworker) && mergeworker.nWork<nRemaining ){ rc = mergeWorkerStep(&mergeworker); } assert( mergeworker.nWork>0 ); nRemaining -= mergeworker.nWork; /* Check if the merge operation is completely finished. If so, the ** Merge object and the right-hand-side of the level can be deleted. ** ** Otherwise, gobble up (declare eligible for recycling) any pages ** from rhs segments for which the content has been completely merged |
︙ | ︙ | |||
4000 4001 4002 4003 4004 4005 4006 | #if 0 lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work"); #endif } } | < | 4011 4012 4013 4014 4015 4016 4017 4018 4019 4020 4021 4022 4023 4024 | #if 0 lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work"); #endif } } if( pnWrite ) *pnWrite = (nWork - nRemaining); pWorker->nWrite += (nWork - nRemaining); #ifdef LSM_LOG_WORK lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining)); #endif return rc; |
︙ | ︙ | |||
4146 4147 4148 4149 4150 4151 4152 | rc = sortedNewToplevel(pDb, 1, &nOvfl, &nPg); nRem -= nPg; if( rc==LSM_OK && pDb->nTransOpen>0 ){ lsmTreeDiscardOld(pDb); } bFlush = 1; bToplevel = 0; | < | > > > > > > > > | > | 4156 4157 4158 4159 4160 4161 4162 4163 4164 4165 4166 4167 4168 4169 4170 4171 4172 4173 4174 4175 4176 4177 4178 4179 4180 4181 4182 4183 4184 4185 4186 4187 4188 4189 4190 4191 4192 4193 | rc = sortedNewToplevel(pDb, 1, &nOvfl, &nPg); nRem -= nPg; if( rc==LSM_OK && pDb->nTransOpen>0 ){ lsmTreeDiscardOld(pDb); } bFlush = 1; bToplevel = 0; } } } /* If nPage is still greater than zero, do some merging. */ if( rc==LSM_OK && nRem>0 && bShutdown==0 ){ int nPg = 0; int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0); rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg); nRem -= nPg; if( nPg ){ bToplevel = 1; nOvfl = 0; } } if( rc==LSM_OK && bToplevel && lsmCheckpointOverflowRequired(pDb) ){ while( rc==LSM_OK && sortedDbIsFull(pDb) ){ int nPg = 0; rc = sortedWork(pDb, 16, 0, 1, &nPg); } if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){ rc = sortedNewToplevel(pDb, 0, &nOvfl, 0); } } if( rc==LSM_OK && (nRem!=nMax) ){ rc = lsmSortedFlushDb(pDb); lsmFinishWork(pDb, bFlush, nOvfl, &rc); }else{ int rcdummy = LSM_BUSY; |
︙ | ︙ |