Overview
Comment:     Fix a bug preventing block recycling in multi-threaded tests.
SHA1:        93d9ff7c1272b51ec1022e7924daa8bf
User & Date: dan 2012-09-18 15:48:39.882
Context
2012-09-18
  19:39  Avoid malloc calls in lsm_file.c when running in mmap mode. Also avoid
         many mutex operations when accessing the in-memory tree.
         (check-in: 1e661d0bad, user: dan, tags: trunk)
  15:48  Fix a bug preventing block recycling in multi-threaded tests.
         (check-in: 93d9ff7c12, user: dan, tags: trunk)
2012-09-17
  20:41  Make it possible to flush part of the in-memory tree to disk without
         blocking writer clients.
         (check-in: 6c686c6d1a, user: dan, tags: trunk)
Changes
Changes to lsm-test/lsmtest_tdb3.c.
New lines 571-587 (old lines 571-577):

}

/*
** A log message callback registered with lsm connections. Prints all
** messages to stderr.
*/
static void xLog(void *pCtx, int rc, const char *z){
  unused_parameter(rc);
  /* fprintf(stderr, "lsm: rc=%d \"%s\"\n", rc, z); */
  if( pCtx ) fprintf(stderr, "%s: ", (char *)pCtx);
  fprintf(stderr, "%s\n", z);
  fflush(stderr);
}

static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *p = (LsmDb *)pArg;

New lines 813-829 (old lines 813-819):

  }
  return 0;
}

void tdb_lsm_enable_log(TestDb *pDb, int bEnable){
  lsm_db *db = tdb_lsm(pDb);
  if( db ){
    LsmDb *p = (LsmDb *)pDb;
    int i;
    lsm_config_log(db, (bEnable ? xLog : 0), (void *)"client");
  }
}

void tdb_lsm_application_crash(TestDb *pDb){
  if( tdb_lsm(pDb) ){
    LsmDb *p = (LsmDb *)pDb;
    p->bCrashed = 1;

New lines 891-957 (old lines 889-908):

}

int tdb_lsm_open(const char *zCfg, const char *zDb, int bClear, TestDb **ppDb){
  return testLsmOpen(zCfg, zDb, bClear, ppDb);
}

#ifdef LSM_MUTEX_PTHREADS

/*
** Signal worker thread iWorker that there may be work to do.
*/
static void mt_signal_worker(LsmDb *pDb, int iWorker){
  LsmWorker *p = &pDb->aWorker[iWorker];
  pthread_mutex_lock(&p->worker_mutex);
  p->bDoWork = 1;
  pthread_cond_signal(&p->worker_cond);
  pthread_mutex_unlock(&p->worker_mutex);
}

static void *worker_main(void *pArg){
  LsmWorker *p = (LsmWorker *)pArg;
  lsm_db *pWorker;                /* Connection to access db through */

  pthread_mutex_lock(&p->worker_mutex);
  while( (pWorker = p->pWorker) ){
    int nWrite = 0;
    int rc;

    /* Do some work. If an error occurs, exit. */
    pthread_mutex_unlock(&p->worker_mutex);
    rc = lsm_work(pWorker, p->lsm_work_flags, p->lsm_work_npage, &nWrite);
    /* printf("# worked %d units\n", nWrite); */
    pthread_mutex_lock(&p->worker_mutex);
    if( rc!=LSM_OK && rc!=LSM_BUSY ){
      p->worker_rc = rc;
      break;
    }
    if( nWrite && (p->lsm_work_flags & LSM_WORK_CHECKPOINT)==0 ){
      mt_signal_worker(p->pDb, 1);
    }

    /* If the call to lsm_work() indicates that there is nothing more
    ** to do at this point, wait on the condition variable. The thread will
    ** wake up when it is signaled either because the client thread has
    ** flushed an in-memory tree into the db file or when the connection
    ** is being closed.  */
    if( nWrite==0 ){
      if( p->pWorker && p->bDoWork==0 ){
        pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
      }
      p->bDoWork = 0;
    }
  }
  pthread_mutex_unlock(&p->worker_mutex);

  /* printf("# worker EXIT\n"); */
  return 0;
}

static void mt_stop_worker(LsmDb *pDb, int iWorker){
  LsmWorker *p = &pDb->aWorker[iWorker];
  if( p->pWorker ){
    void *pDummy;
    lsm_db *pWorker;

New lines 988-1002 (old lines 983-989):

static void mt_client_work_hook(lsm_db *db, void *pArg){
  LsmDb *pDb = (LsmDb *)pArg;     /* LsmDb database handle */
  int i;                          /* Iterator variable */

  /* Invoke the user level work-hook, if any. */
  if( pDb->xWork ) pDb->xWork(db, pDb->pWorkCtx);

  /* printf("# signalling worker threads\n"); */

  /* Signal each worker thread */
  for(i=0; i<pDb->nWorker; i++){
    mt_signal_worker(pDb, i);
  }
}

static void mt_worker_work_hook(lsm_db *db, void *pArg){

New lines 1020-1039 (old lines 1015-1032):

  LsmWorker *p;                   /* Object to initialize */

  assert( iWorker<pDb->nWorker );

  p = &pDb->aWorker[iWorker];
  p->lsm_work_flags = flags;
  p->lsm_work_npage = nPage;
  p->pDb = pDb;

  /* Open the worker connection */
  if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker);
  if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);
  lsm_config_log(p->pWorker, xLog, (void *)"worker");

  /* Configure the work-hook */
  if( rc==0 ){
    lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
  }

  /* Kick off the worker thread. */

New lines 1048-1076 (old lines 1041-1047):

static int testLsmStartWorkers(
  LsmDb *pDb, int nWorker, const char *zFilename, const char *zCfg
){
  int rc;
  int bAutowork = 0;
  assert( nWorker==1 || nWorker==2 );

  /* Configure a work-hook for the client connection. */
  lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);

  pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * nWorker);
  memset(pDb->aWorker, 0, sizeof(LsmWorker) * nWorker);
  pDb->nWorker = nWorker;

  if( nWorker==1 ){
    int flags = LSM_WORK_CHECKPOINT|LSM_WORK_FLUSH;
    rc = mt_start_worker(pDb, 0, zFilename, flags, 2048);
  }else{
    rc = mt_start_worker(pDb, 0, zFilename, LSM_WORK_FLUSH, 1024);
    if( rc==LSM_OK ){
      rc = mt_start_worker(pDb, 1, zFilename, LSM_WORK_CHECKPOINT, 0);
    }
  }

  return rc;
}

static int test_lsm_mt(
Changes to src/lsm.h.
New lines 305-337 (old lines 305-326):

**   The third argument should be of type (char **). The location pointed
**   to is populated with a pointer to a nul-terminated string containing
**   the string representation of a Tcl data-structure. The returned
**   string should be eventually freed by the caller using lsm_free().
**
**   The Tcl structure returned is a list of six integers that describe
**   the current structure of the log file.
**
** LSM_INFO_FREELIST
**   The third argument should be of type (char **). The location pointed
**   to is populated with a pointer to a nul-terminated string containing
**   the string representation of a Tcl data-structure. The returned
**   string should be eventually freed by the caller using lsm_free().
**
**   The Tcl structure returned is a list containing one element for each
**   free block in the database. The element itself consists of two
**   integers - the block number and the id of the snapshot that freed it.
*/
#define LSM_INFO_NWRITE           1
#define LSM_INFO_NREAD            2
#define LSM_INFO_DB_STRUCTURE     3
#define LSM_INFO_LOG_STRUCTURE    4
#define LSM_INFO_ARRAY_STRUCTURE  5
#define LSM_INFO_PAGE_ASCII_DUMP  6
#define LSM_INFO_PAGE_HEX_DUMP    7
#define LSM_INFO_FREELIST         8

/*
** Open and close transactions and nested transactions.
**
** lsm_begin():
**   Used to open transactions and sub-transactions. A successful call to
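The new LSM_INFO_FREELIST parameter is queried through the public lsm_info()
interface like the other LSM_INFO_* values. The following is a minimal sketch
(not part of this check-in) of how a client might read it; the helper name
print_freelist is hypothetical and error handling is kept to a bare minimum:

#include <stdio.h>
#include "lsm.h"

/* Print the free-block list of an open database. As described in the header
** comment above, lsm_info() returns a Tcl-formatted string that the caller
** must release with lsm_free(). */
static void print_freelist(lsm_db *db){
  char *zFree = 0;
  int rc = lsm_info(db, LSM_INFO_FREELIST, &zFree);
  if( rc==LSM_OK && zFree ){
    printf("free-list: %s\n", zFree);
    lsm_free(lsm_get_env(db), zFree);
  }
}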
Changes to src/lsmInt.h.
New lines 741-755 (old lines 741-754):

int lsmLogCommit(lsm_db *);
void lsmLogEnd(lsm_db *pDb, int bCommit);
void lsmLogTell(lsm_db *, LogMark *);
void lsmLogSeek(lsm_db *, LogMark *);

int lsmLogRecover(lsm_db *);
int lsmInfoLogStructure(lsm_db *pDb, char **pzVal);
int lsmInfoFreelist(lsm_db *, char **pzVal);

/**************************************************************************
** Functions from file "lsm_shared.c".
*/
int lsmDbDatabaseConnect(lsm_db*, const char *);
Changes to src/lsm_main.c.
New lines 393-483 (old lines 393-412):

}

void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){
  lsmStringAppendf(pStr, "%s{%d %d %d %d}", zPre,
        pSeg->iFirst, pSeg->iLast, pSeg->iRoot, pSeg->nSize
  );
}

static int infoGetWorker(lsm_db *pDb, Snapshot **pp, int *pbUnlock){
  int rc = LSM_OK;

  assert( *pbUnlock==0 );
  if( !pDb->pWorker ){
    rc = lsmBeginWork(pDb);
    if( rc!=LSM_OK ) return rc;
    *pbUnlock = 1;
  }
  *pp = pDb->pWorker;
  return rc;
}

static void infoFreeWorker(lsm_db *pDb, int bUnlock){
  if( bUnlock ){
    int rcdummy = LSM_BUSY;
    lsmFinishWork(pDb, 0, 0, &rcdummy);
  }
}

int lsmStructList(
  lsm_db *pDb,                    /* Database handle */
  char **pzOut                    /* OUT: Nul-terminated string (tcl list) */
){
  Level *pTopLevel = 0;           /* Top level of snapshot to report on */
  int rc = LSM_OK;
  Level *p;
  LsmString s;
  Snapshot *pWorker;              /* Worker snapshot */
  int bUnlock = 0;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  /* Format the contents of the snapshot as text */
  pTopLevel = lsmDbSnapshotLevel(pWorker);
  lsmStringInit(&s, pDb->pEnv);
  for(p=pTopLevel; rc==LSM_OK && p; p=p->pNext){
    int i;
    lsmStringAppendf(&s, "%s{", (s.n ? " " : ""));
    lsmAppendSegmentList(&s, "", &p->lhs);
    for(i=0; rc==LSM_OK && i<p->nRight; i++){
      lsmAppendSegmentList(&s, " ", &p->aRhs[i]);
    }
    lsmStringAppend(&s, "}", 1);
  }
  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;

  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);
  *pzOut = s.z;
  return rc;
}

int lsmInfoFreelist(lsm_db *pDb, char **pzOut){
  Snapshot *pWorker;              /* Worker snapshot */
  int bUnlock = 0;
  LsmString s;
  int i;
  int rc;

  /* Obtain the worker snapshot */
  rc = infoGetWorker(pDb, &pWorker, &bUnlock);
  if( rc!=LSM_OK ) return rc;

  lsmStringInit(&s, pDb->pEnv);
  lsmStringAppendf(&s, "%d+%d",pWorker->freelist.nEntry,pWorker->nFreelistOvfl);
  for(i=0; i<pWorker->freelist.nEntry; i++){
    FreelistEntry *p = &pWorker->freelist.aEntry[i];
    lsmStringAppendf(&s, " {%d %d}", p->iBlk, (int)p->iId);
  }
  rc = s.n>=0 ? LSM_OK : LSM_NOMEM;

  /* Release the snapshot and return */
  infoFreeWorker(pDb, bUnlock);
  *pzOut = s.z;
  return rc;
}

int lsm_info(lsm_db *pDb, int eParam, ...){
  int rc = LSM_OK;
  va_list ap;

New lines 518-537 (old lines 481-494):

    }

    case LSM_INFO_LOG_STRUCTURE: {
      char **pzVal = va_arg(ap, char **);
      rc = lsmInfoLogStructure(pDb, pzVal);
      break;
    }

    case LSM_INFO_FREELIST: {
      char **pzVal = va_arg(ap, char **);
      rc = lsmInfoFreelist(pDb, pzVal);
      break;
    }

    default:
      rc = LSM_MISUSE;
      break;
  }

  va_end(ap);

New lines 796-817 (old lines 753-759):

  ** db and checkpoint the latest snapshot.
  **
  ** Ignore any LSM_BUSY errors that occur during these operations. If
  ** LSM_BUSY does occur, it means some other connection is already working
  ** on flushing the in-memory tree or checkpointing the database.  */
  assert( rc!=LSM_BUSY);
  if( rc==LSM_OK ){
    if( nFlush>1 || (nFlush && pDb->bAutowork) ){
      rc = lsmFlushToDisk(pDb);
      if( rc==LSM_OK && pDb->bAutowork ){
        rc = lsmCheckpointWrite(pDb);
      }
    }else if( nFlush && pDb->xWork ){
      pDb->xWork(pDb, pDb->pWorkCtx);
    }
  }
  if( rc==LSM_BUSY ) rc = LSM_OK;

  return rc;
}
Changes to src/lsm_shared.c.
New lines 452-480 (old lines 452-458):

    /* The "is in use" bit */
    rc = lsmLsmInUse(pDb, iFree, &bInUse);

    /* The "has been checkpointed" bit */
    if( rc==LSM_OK && bInUse==0 ){
      i64 iId = 0;
      rc = lsmCheckpointSynced(pDb, &iId, 0);
      if( rc!=LSM_OK || iId<iFree ) bInUse = 2;
    }

    if( rc==LSM_OK && bInUse==0 ){
      iRet = pFree->aEntry[0].iBlk;
      flRemoveEntry0(pFree);
      assert( iRet!=0 );
    }

#if 0
    lsmLogMessage(
        pDb, 0, "%d reusing block %d%s", (iRet==0 ? "not " : "")
        pFree->aEntry[0].iBlk,
        bInUse==0 ? "" : bInUse==1 ? " (client)" : " (unsynced)"
    );
#endif
  }

  /* If no block was allocated from the free-list, allocate one at the
  ** end of the file. */
  if( rc==LSM_OK && iRet==0 ){
    iRet = ++pDb->pWorker->nBlock;
  }

New lines 562-591 (old lines 555-576):

        lsmFsMetaPageRelease(pPg);
      }
      bDone = (iDisk>=iCkpt);
    }

    if( rc==LSM_OK && bDone==0 ){
      int iMeta = (pShm->iMetaPage % 2) + 1;
#if 0
      lsmLogMessage(pDb, 0, "starting checkpoint");
#endif
      if( pDb->eSafety!=LSM_SAFETY_OFF ){
        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ) rc = lsmCheckpointStore(pDb, iMeta);
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ) pShm->iMetaPage = iMeta;
#if 0
      lsmLogMessage(pDb, 0, "finish checkpoint %d",
          (int)lsmCheckpointId(pDb->aSnapshot, 0)
      );
#endif
    }
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  return rc;
}

New lines 670-686 (old lines 655-661):

  ** that the shared-memory still contains the same values. If so, proceed.
  ** Otherwise, relinquish the read-lock and retry the whole procedure
  ** (starting with loading the in-memory tree header).  */
  if( rc==LSM_OK ){
    ShmHeader *pShm = pDb->pShmhdr;
    u32 iShmMax = pDb->treehdr.iUsedShmid;
    u32 iShmMin = pDb->treehdr.iNextShmid+1-pDb->treehdr.nChunk;
    rc = lsmReadlock(
        pDb, lsmCheckpointId(pDb->aSnapshot, 0), iShmMin, iShmMax
    );

    if( rc==LSM_OK ){
      if( lsmTreeLoadHeaderOk(pDb, iTreehdr)
       && lsmCheckpointLoadOk(pDb, iSnap)
      ){
        /* Read lock has been successfully obtained. Deserialize the
        ** checkpoint just loaded. TODO: This will be removed after
        ** lsm_sorted.c is changed to work directly from the serialized

New lines 881-895 (old lines 864-870):

  ShmHeader *pShm = db->pShmhdr;
  int i;
  int rc = LSM_OK;

  for(i=0; rc==LSM_OK && i<LSM_LOCK_NREADER; i++){
    ShmReader *p = &pShm->aReader[i];
    if( p->iLsmId ){
      if( (iLsmId!=0 && p->iLsmId!=0 && iLsmId>=p->iLsmId)
       || (iLsmId==0 && shm_sequence_ge(p->iTreeId, iShmid))
      ){
        rc = lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_EXCL, 0);
        if( rc==LSM_OK ){
          p->iLsmId = 0;
          lsmShmLock(db, LSM_LOCK_READER(i), LSM_LOCK_UNLOCK, 0);
        }
Changes to src/lsm_sorted.c.
New lines 3637-3653 (old lines 3637-3650):

  rc = sortedNewToplevel(pDb, 1, pnOvfl);

  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "tree flush");
#endif
#if 0
  lsmLogMessage(pDb, rc, "flushed tree to disk");
#endif
  return rc;
}

/*
** The nMerge levels in the LSM beginning with pLevel consist of a
** left-hand-side segment only. Replace these levels with a single new
** level consisting of a new empty segment on the left-hand-side and the

New lines 4050-4082 (old lines 4047-4067):

int lsmSortedAutoWork(
  lsm_db *pDb,                    /* Database handle */
  int nUnit                       /* Pages of data written to in-memory tree */
){
  int rc;                         /* Return code */
  int nRemaining;                 /* Units of work to do before returning */
  int nDepth;                     /* Current height of tree (longest path) */
  int nWrite;                     /* Pages written */
  Level *pLevel;                  /* Used to iterate through levels */

  assert( lsmFsIntegrityCheck(pDb) );
  assert( pDb->pWorker );

  /* Determine how many units of work to do before returning. One unit of
  ** work is achieved by writing one page (~4KB) of merged data.  */
  nRemaining = nDepth = 0;
  for(pLevel=lsmDbSnapshotLevel(pDb->pWorker); pLevel; pLevel=pLevel->pNext){
    /* nDepth += LSM_MAX(1, pLevel->nRight); */
    nDepth += 1;
  }
  nRemaining = nUnit * nDepth;

  rc = sortedWork(pDb, nRemaining, 0, &nWrite);
#if 0
  lsmLogMessage(pDb, 0, "auto-work: %d pages", nWrite);
#endif
  return rc;
}

/*
** Perform work to merge database segments together.
*/
int lsm_work(lsm_db *pDb, int flags, int nPage, int *pnWrite){

New lines 4107-4143 (old lines 4100-4106):

    bFinishWork = 1;
  }

  /* If the FLUSH flag is set, try to flush the contents of the in-memory
  ** tree to disk. */
  if( rc==LSM_OK && ((flags & LSM_WORK_FLUSH)) ){
    rc = lsmTreeLoadHeader(pDb, 0);
    if( rc==LSM_OK && pDb->treehdr.iOldShmid
     && pDb->treehdr.iOldLog!=pDb->pWorker->iLogOff
    ){
      rc = lsmSortedFlushTree(pDb, &nOvfl);
      bFlush = 1;
    }
  }

  /* If nPage is greater than zero, do some merging. */
  if( rc==LSM_OK && nPage>0 ){
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    rc = sortedWork(pDb, nPage, bOptimize, &nWrite);
    if( rc==LSM_OK && nWrite ){
#if 0
      {
        char *z = 0;
        lsmInfoFreelist(pDb, &z);
        lsmLogMessage(pDb, 0, "work: %d pages", nWrite);
        lsmLogMessage(pDb, 0, "freelist: %s", z);
        lsm_free(lsm_get_env(pDb), z);
      }
#endif
      rc = lsmSortedFlushDb(pDb);
      if( rc==LSM_OK && lsmCheckpointOverflowRequired(pDb) ){
        nOvfl = -1;
        rc = sortedNewToplevel(pDb, 0, &nOvfl);
      }
    }
  }
Changes to tool/lsmperf.tcl.
New lines 1-18 (old lines 1-16):

#!/bin/sh
# \
exec tclsh "$0" "$@"

proc exec_lsmtest_speed {nSec spec} {
  set fd [open [list |lsmtest speed2 {*}$spec]]
  set res [list]

  puts "lsmtest speed2 $spec"
  set initial [clock seconds]
  while {![eof $fd] && ($nSec==0 || ([clock second]-$initial)<$nSec)} {
    set line [gets $fd]
    puts $line
    if {[string range $line 0 0]=="#"} continue
    if {[llength $line]==0} continue

New lines 51-71 (old lines 49-55):

    incr nMs $msInsert
    append ret "$nIns [expr $nIns*1000.0/$nMs]\n"
  }
  append ret "end\n"
  set ret
}

proc make_dataset {res iRes nWrite nShift nOp} {
  set ret ""
  foreach row $res {
    set i [lindex $row 0]
    set j [lindex $row [expr $iRes+1]]
    set x [expr $i*$nWrite + $nShift]
    append ret "$x [expr int($nOp * 1000.0 / $j)]\n"
  }
  append ret "end\n"
  set ret
}

proc do_write_test {zPng nSec nWrite nFetch nRepeat lSys} {

New lines 132-167 (old lines 130-136):

  set data3 ""
  foreach {name sys} $lSys res $lRes col $cols {
    foreach {c1 c2} $col {}

    if {$plot1 != ""} { set plot1 ", $plot1" }
    set plot1 "\"-\" ti \"$name writes/sec\" with boxes fs solid lc rgb \"$c1\"$plot1"
    set data1 "[make_dataset $res 0 $nWrite $nShift $nWrite] $data1"

    set plot3 ",\"-\" ti \"$name cumulative writes/sec\" with lines lc rgb \"$c2\" lw 2 $plot3"
    set data3 "[make_totalset $res $nWrite] $data3"

    if {$nFetch>0} {
      set new ", \"-\" ti \"$name fetches/sec\" axis x1y2 with points lw 3 lc rgb \"$c2\""
      set plot2 "$new $plot2"
      set data2 "[make_dataset $res 1 $nWrite $nWrite $nFetch] $data2"
    }

    incr nShift [expr $nWrite/4]
  }

  append script "plot $plot1 $plot2 $plot3\n"
  append script $data1
  append script $data2
  append script $data3
  append script "pause -1\n"

  exec_gnuplot_script $script $zPng
}

do_write_test x.png 60 10000 20000 1000 {
  LSM "mmap=1 multi_proc=0 safety=1 threads=3 autowork=0 worker_nmerge=2"
}