Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
Comment: | Add the SQLITE_MAX_WORKER_THREADS compile-time option and the SQLITE_CONFIG_WORKER_THREADS sqlite3_config() switch. |
---|---|
Downloads: | Tarball | ZIP archive |
Timelines: | family | ancestors | descendants | both | threads |
Files: | files | file ages | folders |
SHA1: |
2774710df8cd2bfaca49888c69f1b01c |
User & Date: | dan 2014-03-31 19:57:34.075 |
Context
2014-04-01
| ||
10:19 | Fix a problem with OOM handling in the sorter code. (check-in: 59cd5229e2 user: dan tags: threads) | |
2014-03-31
| ||
19:57 | Add the SQLITE_MAX_WORKER_THREADS compile-time option and the SQLITE_CONFIG_WORKER_THREADS sqlite3_config() switch. (check-in: 2774710df8 user: dan tags: threads) | |
2014-03-29
| ||
10:01 | Fix a broken assert() in vdbesort.c. (check-in: 18d1b402f2 user: dan tags: threads) | |
Changes
Changes to src/global.c.
︙ | ︙ | |||
163 164 165 166 167 168 169 170 171 172 173 174 175 176 | 0, /* szScratch */ 0, /* nScratch */ (void*)0, /* pPage */ 0, /* szPage */ 0, /* nPage */ 0, /* mxParserStack */ 0, /* sharedCacheEnabled */ /* All the rest should always be initialized to zero */ 0, /* isInit */ 0, /* inProgress */ 0, /* isMutexInit */ 0, /* isMallocInit */ 0, /* isPCacheInit */ 0, /* pInitMutex */ | > | 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 | 0, /* szScratch */ 0, /* nScratch */ (void*)0, /* pPage */ 0, /* szPage */ 0, /* nPage */ 0, /* mxParserStack */ 0, /* sharedCacheEnabled */ SQLITE_MAX_WORKER_THREADS, /* nWorker */ /* All the rest should always be initialized to zero */ 0, /* isInit */ 0, /* inProgress */ 0, /* isMutexInit */ 0, /* isMallocInit */ 0, /* isPCacheInit */ 0, /* pInitMutex */ |
︙ | ︙ |
Changes to src/main.c.
︙ | ︙ | |||
510 511 512 513 514 515 516 517 518 519 520 521 522 523 | #if SQLITE_OS_WIN && defined(SQLITE_WIN32_MALLOC) case SQLITE_CONFIG_WIN32_HEAPSIZE: { sqlite3GlobalConfig.nHeap = va_arg(ap, int); break; } #endif default: { rc = SQLITE_ERROR; break; } } va_end(ap); | > > > > > > > | 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 | #if SQLITE_OS_WIN && defined(SQLITE_WIN32_MALLOC) case SQLITE_CONFIG_WIN32_HEAPSIZE: { sqlite3GlobalConfig.nHeap = va_arg(ap, int); break; } #endif case SQLITE_CONFIG_WORKER_THREADS: { int n = va_arg(ap, int); if( n>SQLITE_MAX_WORKER_THREADS ) n = SQLITE_MAX_WORKER_THREADS; if( n>=0 ) sqlite3GlobalConfig.nWorker = n; break; } default: { rc = SQLITE_ERROR; break; } } va_end(ap); |
︙ | ︙ |
Changes to src/sqlite.h.in.
︙ | ︙ | |||
1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 | ** [[SQLITE_CONFIG_WIN32_HEAPSIZE]] ** <dt>SQLITE_CONFIG_WIN32_HEAPSIZE ** <dd>^This option is only available if SQLite is compiled for Windows ** with the [SQLITE_WIN32_MALLOC] pre-processor macro defined. ** SQLITE_CONFIG_WIN32_HEAPSIZE takes a 32-bit unsigned integer value ** that specifies the maximum size of the created heap. ** </dl> */ #define SQLITE_CONFIG_SINGLETHREAD 1 /* nil */ #define SQLITE_CONFIG_MULTITHREAD 2 /* nil */ #define SQLITE_CONFIG_SERIALIZED 3 /* nil */ #define SQLITE_CONFIG_MALLOC 4 /* sqlite3_mem_methods* */ #define SQLITE_CONFIG_GETMALLOC 5 /* sqlite3_mem_methods* */ #define SQLITE_CONFIG_SCRATCH 6 /* void*, int sz, int N */ | > > > > > > > > > > | 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 | ** [[SQLITE_CONFIG_WIN32_HEAPSIZE]] ** <dt>SQLITE_CONFIG_WIN32_HEAPSIZE ** <dd>^This option is only available if SQLite is compiled for Windows ** with the [SQLITE_WIN32_MALLOC] pre-processor macro defined. ** SQLITE_CONFIG_WIN32_HEAPSIZE takes a 32-bit unsigned integer value ** that specifies the maximum size of the created heap. ** </dl> ** ** [[SQLITE_CONFIG_WORKER_THREADS]] ** <dt>SQLITE_CONFIG_WORKER_THREADS ** <dd>^SQLITE_CONFIG_WORKER_THREADS takes a single argument of type int. ** It is used to set the number of background worker threads that may be ** launched when sorting large amounts of data. A value of 0 means launch ** no background threads at all. The maximum number of background threads ** allowed is configured at build-time by the SQLITE_MAX_WORKER_THREADS ** pre-processor option. 
** </dl> */ #define SQLITE_CONFIG_SINGLETHREAD 1 /* nil */ #define SQLITE_CONFIG_MULTITHREAD 2 /* nil */ #define SQLITE_CONFIG_SERIALIZED 3 /* nil */ #define SQLITE_CONFIG_MALLOC 4 /* sqlite3_mem_methods* */ #define SQLITE_CONFIG_GETMALLOC 5 /* sqlite3_mem_methods* */ #define SQLITE_CONFIG_SCRATCH 6 /* void*, int sz, int N */ |
︙ | ︙ | |||
1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 | #define SQLITE_CONFIG_URI 17 /* int */ #define SQLITE_CONFIG_PCACHE2 18 /* sqlite3_pcache_methods2* */ #define SQLITE_CONFIG_GETPCACHE2 19 /* sqlite3_pcache_methods2* */ #define SQLITE_CONFIG_COVERING_INDEX_SCAN 20 /* int */ #define SQLITE_CONFIG_SQLLOG 21 /* xSqllog, void* */ #define SQLITE_CONFIG_MMAP_SIZE 22 /* sqlite3_int64, sqlite3_int64 */ #define SQLITE_CONFIG_WIN32_HEAPSIZE 23 /* int nByte */ /* ** CAPI3REF: Database Connection Configuration Options ** ** These constants are the available integer configuration options that ** can be passed as the second argument to the [sqlite3_db_config()] interface. ** | > | 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 | #define SQLITE_CONFIG_URI 17 /* int */ #define SQLITE_CONFIG_PCACHE2 18 /* sqlite3_pcache_methods2* */ #define SQLITE_CONFIG_GETPCACHE2 19 /* sqlite3_pcache_methods2* */ #define SQLITE_CONFIG_COVERING_INDEX_SCAN 20 /* int */ #define SQLITE_CONFIG_SQLLOG 21 /* xSqllog, void* */ #define SQLITE_CONFIG_MMAP_SIZE 22 /* sqlite3_int64, sqlite3_int64 */ #define SQLITE_CONFIG_WIN32_HEAPSIZE 23 /* int nByte */ #define SQLITE_CONFIG_WORKER_THREADS 24 /* int nWorker */ /* ** CAPI3REF: Database Connection Configuration Options ** ** These constants are the available integer configuration options that ** can be passed as the second argument to the [sqlite3_db_config()] interface. ** |
︙ | ︙ |
Changes to src/sqliteInt.h.
︙ | ︙ | |||
417 418 419 420 421 422 423 424 425 426 427 428 429 430 | ** Provide a default value for SQLITE_TEMP_STORE in case it is not specified ** on the command-line */ #ifndef SQLITE_TEMP_STORE # define SQLITE_TEMP_STORE 1 # define SQLITE_TEMP_STORE_xc 1 /* Exclude from ctime.c */ #endif /* ** GCC does not define the offsetof() macro so we'll have to do it ** ourselves. */ #ifndef offsetof #define offsetof(STRUCTURE,FIELD) ((int)((char*)&((STRUCTURE*)0)->FIELD)) | > > > > > > > > > > > > > | 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 | ** Provide a default value for SQLITE_TEMP_STORE in case it is not specified ** on the command-line */ #ifndef SQLITE_TEMP_STORE # define SQLITE_TEMP_STORE 1 # define SQLITE_TEMP_STORE_xc 1 /* Exclude from ctime.c */ #endif /* ** If no value has been provided for SQLITE_MAX_WORKER_THREADS, or if ** SQLITE_TEMP_STORE is set to 3 (never use temporary files), set it ** to zero. */ #if SQLITE_TEMP_STORE==3 # undef SQLITE_MAX_WORKER_THREADS #endif #ifndef SQLITE_MAX_WORKER_THREADS # define SQLITE_MAX_WORKER_THREADS 0 #endif /* ** GCC does not define the offsetof() macro so we'll have to do it ** ourselves. */ #ifndef offsetof #define offsetof(STRUCTURE,FIELD) ((int)((char*)&((STRUCTURE*)0)->FIELD)) |
︙ | ︙ | |||
2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 | int szScratch; /* Size of each scratch buffer */ int nScratch; /* Number of scratch buffers */ void *pPage; /* Page cache memory */ int szPage; /* Size of each page in pPage[] */ int nPage; /* Number of pages in pPage[] */ int mxParserStack; /* maximum depth of the parser stack */ int sharedCacheEnabled; /* true if shared-cache mode enabled */ /* The above might be initialized to non-zero. The following need to always ** initially be zero, however. */ int isInit; /* True after initialization has finished */ int inProgress; /* True while initialization in progress */ int isMutexInit; /* True after mutexes are initialized */ int isMallocInit; /* True after malloc is initialized */ int isPCacheInit; /* True after malloc is initialized */ | > | 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 | int szScratch; /* Size of each scratch buffer */ int nScratch; /* Number of scratch buffers */ void *pPage; /* Page cache memory */ int szPage; /* Size of each page in pPage[] */ int nPage; /* Number of pages in pPage[] */ int mxParserStack; /* maximum depth of the parser stack */ int sharedCacheEnabled; /* true if shared-cache mode enabled */ int nWorker; /* Number of worker threads to use */ /* The above might be initialized to non-zero. The following need to always ** initially be zero, however. */ int isInit; /* True after initialization has finished */ int inProgress; /* True while initialization in progress */ int isMutexInit; /* True after mutexes are initialized */ int isMallocInit; /* True after malloc is initialized */ int isPCacheInit; /* True after malloc is initialized */ |
︙ | ︙ |
Changes to src/threads.c.
︙ | ︙ | |||
22 23 24 25 26 27 28 29 30 31 32 33 34 35 | ** or sqlite3ThreadJoin() call. This is, in fact, what happens in ** single threaded systems. Nothing in SQLite requires multiple threads. ** This interface exists so that applications that want to take advantage ** of multiple cores can do so, while also allowing applications to stay ** single-threaded if desired. */ #include "sqliteInt.h" /********************************* Unix Pthreads ****************************/ #if SQLITE_OS_UNIX && defined(SQLITE_MUTEX_PTHREADS) && SQLITE_THREADSAFE>0 #define SQLITE_THREADS_IMPLEMENTED 1 /* Prevent the single-thread code below */ #include <pthread.h> | > > | 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | ** or sqlite3ThreadJoin() call. This is, in fact, what happens in ** single threaded systems. Nothing in SQLite requires multiple threads. ** This interface exists so that applications that want to take advantage ** of multiple cores can do so, while also allowing applications to stay ** single-threaded if desired. */ #include "sqliteInt.h" #if SQLITE_MAX_WORKER_THREADS>0 /********************************* Unix Pthreads ****************************/ #if SQLITE_OS_UNIX && defined(SQLITE_MUTEX_PTHREADS) && SQLITE_THREADSAFE>0 #define SQLITE_THREADS_IMPLEMENTED 1 /* Prevent the single-thread code below */ #include <pthread.h> |
︙ | ︙ | |||
211 212 213 214 215 216 217 | } sqlite3_free(p); return SQLITE_OK; } #endif /* !defined(SQLITE_THREADS_IMPLEMENTED) */ /****************************** End Single-Threaded *************************/ | > | 213 214 215 216 217 218 219 220 | } sqlite3_free(p); return SQLITE_OK; } #endif /* !defined(SQLITE_THREADS_IMPLEMENTED) */ /****************************** End Single-Threaded *************************/ #endif /* SQLITE_MAX_WORKER_THREADS>0 */ |
Changes to src/vdbesort.c.
︙ | ︙ | |||
22 23 24 25 26 27 28 | typedef struct VdbeSorterIter VdbeSorterIter; typedef struct SorterThread SorterThread; typedef struct SorterRecord SorterRecord; typedef struct SorterMerger SorterMerger; typedef struct FileWriter FileWriter; | < < < < < < < < | | > | | 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | typedef struct VdbeSorterIter VdbeSorterIter; typedef struct SorterThread SorterThread; typedef struct SorterRecord SorterRecord; typedef struct SorterMerger SorterMerger; typedef struct FileWriter FileWriter; /* ** Candidate values for SorterThread.eWork */ #define SORTER_THREAD_SORT 1 #define SORTER_THREAD_TO_PMA 2 #define SORTER_THREAD_CONS 3 /* ** Much of the work performed in this module to sort the list of records is ** broken down into smaller units that may be performed in parallel. In order ** to perform such a unit of work, an instance of the following structure ** is configured and passed to vdbeSorterThreadMain() - either directly by ** the main thread or via a background thread. ** ** Exactly VdbeSorter.nThread instances of this structure are allocated ** as part of each VdbeSorter object. Instances are never allocated any other ** way. VdbeSorter.nThread is set to the number of worker threads allowed ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). ** ** When a background thread is launched to perform work, SorterThread.bDone ** is set to 0 and the SorterThread.pThread variable set to point to the ** thread handle. SorterThread.bDone is set to 1 (to indicate to the main ** thread that joining SorterThread.pThread will not block) before the thread ** exits. SorterThread.pThread and bDone are always cleared after the ** background thread has been joined. ** ** One object (specifically, VdbeSorter.aThread[VdbeSorter.nThread-1]) ** is reserved for the foreground thread. 
** ** The nature of the work performed is determined by SorterThread.eWork, ** as follows: ** ** SORTER_THREAD_SORT: ** Sort the linked list of records at SorterThread.pList. |
︙ | ︙ | |||
183 184 185 186 187 188 189 | int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ int bUsePMA; /* True if one or more PMAs created */ SorterRecord *pRecord; /* Head of in-memory record list */ SorterMerger *pMerger; /* For final merge of PMAs (by caller) */ u8 *aMemory; /* Block of memory to alloc records from */ int iMemory; /* Offset of first free byte in aMemory */ int nMemory; /* Size of aMemory allocation in bytes */ | > | | 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 | int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ int bUsePMA; /* True if one or more PMAs created */ SorterRecord *pRecord; /* Head of in-memory record list */ SorterMerger *pMerger; /* For final merge of PMAs (by caller) */ u8 *aMemory; /* Block of memory to alloc records from */ int iMemory; /* Offset of first free byte in aMemory */ int nMemory; /* Size of aMemory allocation in bytes */ int nThread; /* Size of aThread[] array */ SorterThread aThread[1]; }; /* ** The following type is an iterator for a PMA. It caches the current key in ** variables nKey/aKey. If the iterator is at EOF, pFile==0. */ struct VdbeSorterIter { |
︙ | ︙ | |||
568 569 570 571 572 573 574 | if( p1->pFile==0 ){ iRes = i2; }else if( p2->pFile==0 ){ iRes = i1; }else{ int res; | | | 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 | if( p1->pFile==0 ){ iRes = i2; }else if( p2->pFile==0 ){ iRes = i1; }else{ int res; assert( pThread->pUnpacked!=0 ); /* allocated in vdbeSorterThreadMain() */ vdbeSorterCompare( pThread, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res ); if( res<=0 ){ iRes = i1; }else{ iRes = i2; |
︙ | ︙ | |||
593 594 595 596 597 598 599 600 601 602 603 | int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){ int pgsz; /* Page size of main database */ int i; /* Used to iterate through aThread[] */ int mxCache; /* Cache size */ VdbeSorter *pSorter; /* The new sorter */ KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */ int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */ int rc = SQLITE_OK; assert( pCsr->pKeyInfo && pCsr->pBt==0 ); szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*); | > > > > | | > | | 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 | int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){ int pgsz; /* Page size of main database */ int i; /* Used to iterate through aThread[] */ int mxCache; /* Cache size */ VdbeSorter *pSorter; /* The new sorter */ KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */ int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */ int sz; /* Size of pSorter in bytes */ int rc = SQLITE_OK; int nWorker = (sqlite3GlobalConfig.bCoreMutex?sqlite3GlobalConfig.nWorker:0); assert( pCsr->pKeyInfo && pCsr->pBt==0 ); szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*); sz = sizeof(VdbeSorter) + nWorker * sizeof(SorterThread); pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo); pCsr->pSorter = pSorter; if( pSorter==0 ){ rc = SQLITE_NOMEM; }else{ pKeyInfo = (KeyInfo*)((u8*)pSorter + sz); memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo); pKeyInfo->db = 0; pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); pSorter->nThread = nWorker + 1; for(i=0; i<pSorter->nThread; i++){ SorterThread *pThread = &pSorter->aThread[i]; pThread->pKeyInfo = pKeyInfo; pThread->pVfs = db->pVfs; pThread->pgsz = pgsz; } if( !sqlite3TempInMemory(db) ){ |
︙ | ︙ | |||
670 671 672 673 674 675 676 677 678 679 | pThread->pTemp1 = 0; } } /* ** Join all threads. */ static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ int rc = rcin; int i; | > | > > > | 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 | pThread->pTemp1 = 0; } } /* ** Join all threads. */ #if SQLITE_MAX_WORKER_THREADS>0 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ int rc = rcin; int i; for(i=0; i<pSorter->nThread; i++){ SorterThread *pThread = &pSorter->aThread[i]; if( pThread->pThread ){ void *pRet; int rc2 = sqlite3ThreadJoin(pThread->pThread, &pRet); pThread->pThread = 0; pThread->bDone = 0; if( rc==SQLITE_OK ) rc = rc2; if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); } } return rc; } #else # define vdbeSorterJoinAll(x,rcin) (rcin) #endif /* ** Allocate a new SorterMerger object with space for nIter iterators. */ static SorterMerger *vdbeSorterMergerNew(int nIter){ int N = 2; /* Smallest power of two >= nIter */ int nByte; /* Total bytes of space to allocate */ |
︙ | ︙ | |||
735 736 737 738 739 740 741 | /* ** Reset a sorting cursor back to its original empty state. */ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ int i; vdbeSorterJoinAll(pSorter, SQLITE_OK); | | | 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 | /* ** Reset a sorting cursor back to its original empty state. */ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ int i; vdbeSorterJoinAll(pSorter, SQLITE_OK); for(i=0; i<pSorter->nThread; i++){ SorterThread *pThread = &pSorter->aThread[i]; vdbeSorterThreadCleanup(db, pThread); } if( pSorter->aMemory==0 ){ vdbeSorterRecordFree(0, pSorter->pRecord); } vdbeSorterMergerReset(pSorter->pMerger); |
︙ | ︙ | |||
1242 1243 1244 1245 1246 1247 1248 | static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ VdbeSorter *pSorter = pCsr->pSorter; int rc = SQLITE_OK; int i; SorterThread *pThread; /* Thread context used to create new PMA */ pSorter->bUsePMA = 1; | | > > | > | > > | 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 | static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ VdbeSorter *pSorter = pCsr->pSorter; int rc = SQLITE_OK; int i; SorterThread *pThread; /* Thread context used to create new PMA */ pSorter->bUsePMA = 1; for(i=0; ALWAYS( i<pSorter->nThread ); i++){ pThread = &pSorter->aThread[i]; #if SQLITE_MAX_WORKER_THREADS>0 if( pThread->bDone ){ void *pRet; assert( pThread->pThread ); rc = sqlite3ThreadJoin(pThread->pThread, &pRet); pThread->pThread = 0; pThread->bDone = 0; if( rc==SQLITE_OK ){ rc = SQLITE_PTR_TO_INT(pRet); } } #endif if( pThread->pThread==0 ) break; } if( rc==SQLITE_OK ){ int bUseFg = (bFg || i==(pSorter->nThread-1)); assert( pThread->pThread==0 && pThread->bDone==0 ); pThread->eWork = SORTER_THREAD_TO_PMA; pThread->pList = pSorter->pRecord; pThread->nInMemory = pSorter->nInMemory; pSorter->nInMemory = 0; pSorter->pRecord = 0; if( pSorter->aMemory ){ u8 *aMem = pThread->aListMemory; pThread->aListMemory = pSorter->aMemory; pSorter->aMemory = aMem; } #if SQLITE_MAX_WORKER_THREADS>0 if( bUseFg==0 ){ /* Launch a background thread for this operation */ void *pCtx = (void*)pThread; assert( pSorter->aMemory==0 || pThread->aListMemory!=0 ); if( pThread->aListMemory ){ if( pSorter->aMemory==0 ){ pSorter->aMemory = sqlite3Malloc(pSorter->nMemory); if( pSorter->aMemory==0 ) return SQLITE_NOMEM; }else{ pSorter->nMemory = 
sqlite3MallocSize(pSorter->aMemory); } } rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSorterThreadMain, pCtx); }else #endif { /* Use the foreground thread for this operation */ u8 *aMem; rc = vdbeSorterRunThread(pThread); aMem = pThread->aListMemory; pThread->aListMemory = pSorter->aMemory; pSorter->aMemory = aMem; } |
︙ | ︙ | |||
1366 1367 1368 1369 1370 1371 1372 | int nNew = pSorter->nMemory * 2; while( nNew < nMin ) nNew = nNew*2; if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize; if( nNew < nMin ) nNew = nMin; aNew = sqlite3Realloc(pSorter->aMemory, nNew); if( !aNew ) return SQLITE_NOMEM; | > | > | 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 | int nNew = pSorter->nMemory * 2; while( nNew < nMin ) nNew = nNew*2; if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize; if( nNew < nMin ) nNew = nMin; aNew = sqlite3Realloc(pSorter->aMemory, nNew); if( !aNew ) return SQLITE_NOMEM; pSorter->pRecord = (SorterRecord*)( aNew + ((u8*)pSorter->pRecord - pSorter->aMemory) ); pSorter->aMemory = aNew; pSorter->nMemory = nNew; } pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory]; pSorter->iMemory += ROUND8(nReq); pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory; |
︙ | ︙ | |||
1395 1396 1397 1398 1399 1400 1401 | /* ** Return the total number of PMAs in all temporary files. */ static int vdbeSorterCountPMA(VdbeSorter *pSorter){ int nPMA = 0; int i; | | | 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 | /* ** Return the total number of PMAs in all temporary files. */ static int vdbeSorterCountPMA(VdbeSorter *pSorter){ int nPMA = 0; int i; for(i=0; i<pSorter->nThread; i++){ nPMA += pSorter->aThread[i].nPMA; } return nPMA; } /* ** Once the sorter has been populated, this function is called to prepare |
︙ | ︙ | |||
1442 1443 1444 1445 1446 1447 1448 | } /* Join all threads */ rc = vdbeSorterJoinAll(pSorter, rc); /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge ** some of them together so that this is no longer the case. */ | < | | | > | > > | | | 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 | } /* Join all threads */ rc = vdbeSorterJoinAll(pSorter, rc); /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge ** some of them together so that this is no longer the case. */ if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){ int i; for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){ SorterThread *pThread = &pSorter->aThread[i]; if( pThread->pTemp1 ){ pThread->nConsolidate = SORTER_MAX_MERGE_COUNT/pSorter->nThread; pThread->eWork = SORTER_THREAD_CONS; #if SQLITE_MAX_WORKER_THREADS>0 if( i<(pSorter->nThread-1) ){ void *pCtx = (void*)pThread; rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx); }else #endif { rc = vdbeSorterRunThread(pThread); } } } } /* Join all threads */ rc = vdbeSorterJoinAll(pSorter, rc); /* Assuming no errors have occurred, set up a merger structure to read ** and merge all remaining PMAs. 
*/ assert( pSorter->pMerger==0 ); if( rc==SQLITE_OK ){ int nIter = 0; /* Number of iterators used */ int i; SorterMerger *pMerger; for(i=0; i<pSorter->nThread; i++){ nIter += pSorter->aThread[i].nPMA; } pSorter->pMerger = pMerger = vdbeSorterMergerNew(nIter); if( pMerger==0 ){ rc = SQLITE_NOMEM; }else{ int iIter = 0; int iThread = 0; for(iThread=0; iThread<pSorter->nThread; iThread++){ int iPMA; i64 iReadOff = 0; SorterThread *pThread = &pSorter->aThread[iThread]; for(iPMA=0; iPMA<pThread->nPMA && rc==SQLITE_OK; iPMA++){ i64 nDummy = 0; VdbeSorterIter *pIter = &pMerger->aIter[iIter++]; rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nDummy); |
︙ | ︙ |