SQLite

Check-in [2774710df8]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add the SQLITE_MAX_WORKER_THREADS compile time option. And the SQLITE_CONFIG_WORKER_THREADS sqlite3_config() switch.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | threads
Files: files | file ages | folders
SHA1: 2774710df8cd2bfaca49888c69f1b01c0ddadf9a
User & Date: dan 2014-03-31 19:57:34.075
Context
2014-04-01
10:19
Fix a problem with OOM handling in the sorter code. (check-in: 59cd5229e2 user: dan tags: threads)
2014-03-31
19:57
Add the SQLITE_MAX_WORKER_THREADS compile time option. And the SQLITE_CONFIG_WORKER_THREADS sqlite3_config() switch. (check-in: 2774710df8 user: dan tags: threads)
2014-03-29
10:01
Fix a broken assert() in vdbesort.c. (check-in: 18d1b402f2 user: dan tags: threads)
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/global.c.
163
164
165
166
167
168
169

170
171
172
173
174
175
176
   0,                         /* szScratch */
   0,                         /* nScratch */
   (void*)0,                  /* pPage */
   0,                         /* szPage */
   0,                         /* nPage */
   0,                         /* mxParserStack */
   0,                         /* sharedCacheEnabled */

   /* All the rest should always be initialized to zero */
   0,                         /* isInit */
   0,                         /* inProgress */
   0,                         /* isMutexInit */
   0,                         /* isMallocInit */
   0,                         /* isPCacheInit */
   0,                         /* pInitMutex */







>







163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
   0,                         /* szScratch */
   0,                         /* nScratch */
   (void*)0,                  /* pPage */
   0,                         /* szPage */
   0,                         /* nPage */
   0,                         /* mxParserStack */
   0,                         /* sharedCacheEnabled */
   SQLITE_MAX_WORKER_THREADS, /* nWorker */
   /* All the rest should always be initialized to zero */
   0,                         /* isInit */
   0,                         /* inProgress */
   0,                         /* isMutexInit */
   0,                         /* isMallocInit */
   0,                         /* isPCacheInit */
   0,                         /* pInitMutex */
Changes to src/main.c.
510
511
512
513
514
515
516







517
518
519
520
521
522
523

#if SQLITE_OS_WIN && defined(SQLITE_WIN32_MALLOC)
    case SQLITE_CONFIG_WIN32_HEAPSIZE: {
      sqlite3GlobalConfig.nHeap = va_arg(ap, int);
      break;
    }
#endif








    default: {
      rc = SQLITE_ERROR;
      break;
    }
  }
  va_end(ap);







>
>
>
>
>
>
>







510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530

#if SQLITE_OS_WIN && defined(SQLITE_WIN32_MALLOC)
    case SQLITE_CONFIG_WIN32_HEAPSIZE: {
      sqlite3GlobalConfig.nHeap = va_arg(ap, int);
      break;
    }
#endif

    case SQLITE_CONFIG_WORKER_THREADS: {
      int n = va_arg(ap, int);
      if( n>SQLITE_MAX_WORKER_THREADS ) n = SQLITE_MAX_WORKER_THREADS;
      if( n>=0 ) sqlite3GlobalConfig.nWorker = n;
      break;
    }

    default: {
      rc = SQLITE_ERROR;
      break;
    }
  }
  va_end(ap);
Changes to src/sqlite.h.in.
1711
1712
1713
1714
1715
1716
1717










1718
1719
1720
1721
1722
1723
1724
** [[SQLITE_CONFIG_WIN32_HEAPSIZE]]
** <dt>SQLITE_CONFIG_WIN32_HEAPSIZE
** <dd>^This option is only available if SQLite is compiled for Windows
** with the [SQLITE_WIN32_MALLOC] pre-processor macro defined.
** SQLITE_CONFIG_WIN32_HEAPSIZE takes a 32-bit unsigned integer value
** that specifies the maximum size of the created heap.
** </dl>










*/
#define SQLITE_CONFIG_SINGLETHREAD  1  /* nil */
#define SQLITE_CONFIG_MULTITHREAD   2  /* nil */
#define SQLITE_CONFIG_SERIALIZED    3  /* nil */
#define SQLITE_CONFIG_MALLOC        4  /* sqlite3_mem_methods* */
#define SQLITE_CONFIG_GETMALLOC     5  /* sqlite3_mem_methods* */
#define SQLITE_CONFIG_SCRATCH       6  /* void*, int sz, int N */







>
>
>
>
>
>
>
>
>
>







1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
** [[SQLITE_CONFIG_WIN32_HEAPSIZE]]
** <dt>SQLITE_CONFIG_WIN32_HEAPSIZE
** <dd>^This option is only available if SQLite is compiled for Windows
** with the [SQLITE_WIN32_MALLOC] pre-processor macro defined.
** SQLITE_CONFIG_WIN32_HEAPSIZE takes a 32-bit unsigned integer value
** that specifies the maximum size of the created heap.
**
** [[SQLITE_CONFIG_WORKER_THREADS]]
** <dt>SQLITE_CONFIG_WORKER_THREADS
** <dd>^SQLITE_CONFIG_WORKER_THREADS takes a single argument of type int.
** It is used to set the number of background worker threads that may be
** launched when sorting large amounts of data. A value of 0 means launch
** no background threads at all. The maximum number of background threads
** allowed is configured at build-time by the SQLITE_MAX_WORKER_THREADS
** pre-processor option; a larger value is silently reduced to that maximum,
** and a negative value leaves the current setting unchanged.
** </dl>
*/
#define SQLITE_CONFIG_SINGLETHREAD  1  /* nil */
#define SQLITE_CONFIG_MULTITHREAD   2  /* nil */
#define SQLITE_CONFIG_SERIALIZED    3  /* nil */
#define SQLITE_CONFIG_MALLOC        4  /* sqlite3_mem_methods* */
#define SQLITE_CONFIG_GETMALLOC     5  /* sqlite3_mem_methods* */
#define SQLITE_CONFIG_SCRATCH       6  /* void*, int sz, int N */
1735
1736
1737
1738
1739
1740
1741

1742
1743
1744
1745
1746
1747
1748
#define SQLITE_CONFIG_URI          17  /* int */
#define SQLITE_CONFIG_PCACHE2      18  /* sqlite3_pcache_methods2* */
#define SQLITE_CONFIG_GETPCACHE2   19  /* sqlite3_pcache_methods2* */
#define SQLITE_CONFIG_COVERING_INDEX_SCAN 20  /* int */
#define SQLITE_CONFIG_SQLLOG       21  /* xSqllog, void* */
#define SQLITE_CONFIG_MMAP_SIZE    22  /* sqlite3_int64, sqlite3_int64 */
#define SQLITE_CONFIG_WIN32_HEAPSIZE      23  /* int nByte */


/*
** CAPI3REF: Database Connection Configuration Options
**
** These constants are the available integer configuration options that
** can be passed as the second argument to the [sqlite3_db_config()] interface.
**







>







1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
#define SQLITE_CONFIG_URI          17  /* int */
#define SQLITE_CONFIG_PCACHE2      18  /* sqlite3_pcache_methods2* */
#define SQLITE_CONFIG_GETPCACHE2   19  /* sqlite3_pcache_methods2* */
#define SQLITE_CONFIG_COVERING_INDEX_SCAN 20  /* int */
#define SQLITE_CONFIG_SQLLOG       21  /* xSqllog, void* */
#define SQLITE_CONFIG_MMAP_SIZE    22  /* sqlite3_int64, sqlite3_int64 */
#define SQLITE_CONFIG_WIN32_HEAPSIZE      23  /* int nByte */
#define SQLITE_CONFIG_WORKER_THREADS      24  /* int nWorker */

/*
** CAPI3REF: Database Connection Configuration Options
**
** These constants are the available integer configuration options that
** can be passed as the second argument to the [sqlite3_db_config()] interface.
**
Changes to src/sqliteInt.h.
417
418
419
420
421
422
423













424
425
426
427
428
429
430
** Provide a default value for SQLITE_TEMP_STORE in case it is not specified
** on the command-line
*/
#ifndef SQLITE_TEMP_STORE
# define SQLITE_TEMP_STORE 1
# define SQLITE_TEMP_STORE_xc 1  /* Exclude from ctime.c */
#endif














/*
** GCC does not define the offsetof() macro so we'll have to do it
** ourselves.
*/
#ifndef offsetof
#define offsetof(STRUCTURE,FIELD) ((int)((char*)&((STRUCTURE*)0)->FIELD))







>
>
>
>
>
>
>
>
>
>
>
>
>







417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
** Provide a default value for SQLITE_TEMP_STORE in case it is not specified
** on the command-line
*/
#ifndef SQLITE_TEMP_STORE
# define SQLITE_TEMP_STORE 1
# define SQLITE_TEMP_STORE_xc 1  /* Exclude from ctime.c */
#endif

/*
** If no value has been provided for SQLITE_MAX_WORKER_THREADS, or if
** SQLITE_TEMP_STORE is set to 3 (never use temporary files), set it 
** to zero.
*/
#if SQLITE_TEMP_STORE==3
# undef SQLITE_MAX_WORKER_THREADS
#endif
#ifndef SQLITE_MAX_WORKER_THREADS
# define SQLITE_MAX_WORKER_THREADS 0
#endif


/*
** GCC does not define the offsetof() macro so we'll have to do it
** ourselves.
*/
#ifndef offsetof
#define offsetof(STRUCTURE,FIELD) ((int)((char*)&((STRUCTURE*)0)->FIELD))
2681
2682
2683
2684
2685
2686
2687

2688
2689
2690
2691
2692
2693
2694
  int szScratch;                    /* Size of each scratch buffer */
  int nScratch;                     /* Number of scratch buffers */
  void *pPage;                      /* Page cache memory */
  int szPage;                       /* Size of each page in pPage[] */
  int nPage;                        /* Number of pages in pPage[] */
  int mxParserStack;                /* maximum depth of the parser stack */
  int sharedCacheEnabled;           /* true if shared-cache mode enabled */

  /* The above might be initialized to non-zero.  The following need to always
  ** initially be zero, however. */
  int isInit;                       /* True after initialization has finished */
  int inProgress;                   /* True while initialization in progress */
  int isMutexInit;                  /* True after mutexes are initialized */
  int isMallocInit;                 /* True after malloc is initialized */
  int isPCacheInit;                 /* True after malloc is initialized */







>







2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
  int szScratch;                    /* Size of each scratch buffer */
  int nScratch;                     /* Number of scratch buffers */
  void *pPage;                      /* Page cache memory */
  int szPage;                       /* Size of each page in pPage[] */
  int nPage;                        /* Number of pages in pPage[] */
  int mxParserStack;                /* maximum depth of the parser stack */
  int sharedCacheEnabled;           /* true if shared-cache mode enabled */
  int nWorker;                      /* Number of worker threads to use */
  /* The above might be initialized to non-zero.  The following need to always
  ** initially be zero, however. */
  int isInit;                       /* True after initialization has finished */
  int inProgress;                   /* True while initialization in progress */
  int isMutexInit;                  /* True after mutexes are initialized */
  int isMallocInit;                 /* True after malloc is initialized */
  int isPCacheInit;                 /* True after malloc is initialized */
Changes to src/threads.c.
22
23
24
25
26
27
28


29
30
31
32
33
34
35
** or sqlite3ThreadJoin() call.  This is, in fact, what happens in
** single threaded systems.  Nothing in SQLite requires multiple threads.
** This interface exists so that applications that want to take advantage
** of multiple cores can do so, while also allowing applications to stay
** single-threaded if desired.
*/
#include "sqliteInt.h"



/********************************* Unix Pthreads ****************************/
#if SQLITE_OS_UNIX && defined(SQLITE_MUTEX_PTHREADS) && SQLITE_THREADSAFE>0

#define SQLITE_THREADS_IMPLEMENTED 1  /* Prevent the single-thread code below */
#include <pthread.h>








>
>







22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
** or sqlite3ThreadJoin() call.  This is, in fact, what happens in
** single threaded systems.  Nothing in SQLite requires multiple threads.
** This interface exists so that applications that want to take advantage
** of multiple cores can do so, while also allowing applications to stay
** single-threaded if desired.
*/
#include "sqliteInt.h"

#if SQLITE_MAX_WORKER_THREADS>0

/********************************* Unix Pthreads ****************************/
#if SQLITE_OS_UNIX && defined(SQLITE_MUTEX_PTHREADS) && SQLITE_THREADSAFE>0

#define SQLITE_THREADS_IMPLEMENTED 1  /* Prevent the single-thread code below */
#include <pthread.h>

211
212
213
214
215
216
217

  }
  sqlite3_free(p);
  return SQLITE_OK;
}

#endif /* !defined(SQLITE_THREADS_IMPLEMENTED) */
/****************************** End Single-Threaded *************************/








>
213
214
215
216
217
218
219
220
  }
  sqlite3_free(p);
  return SQLITE_OK;
}

#endif /* !defined(SQLITE_THREADS_IMPLEMENTED) */
/****************************** End Single-Threaded *************************/
#endif /* SQLITE_MAX_WORKER_THREADS>0 */
Changes to src/vdbesort.c.
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
typedef struct VdbeSorterIter VdbeSorterIter;
typedef struct SorterThread SorterThread;
typedef struct SorterRecord SorterRecord;
typedef struct SorterMerger SorterMerger;
typedef struct FileWriter FileWriter;


/*
** Maximum number of threads to use. Setting this value to 1 forces all
** operations to be single-threaded.
*/
#ifndef SQLITE_MAX_SORTER_THREAD
# define SQLITE_MAX_SORTER_THREAD 4
#endif

/*
** Candidate values for SorterThread.eWork
*/
#define SORTER_THREAD_SORT   1
#define SORTER_THREAD_TO_PMA 2
#define SORTER_THREAD_CONS   3

/*
** Much of the work performed in this module to sort the list of records is 
** broken down into smaller units that may be performed in parallel. In order
** to perform such a unit of work, an instance of the following structure
** is configured and passed to vdbeSorterThreadMain() - either directly by 
** the main thread or via a background thread.
**
** Exactly SQLITE_MAX_SORTER_THREAD instances of this structure are allocated
** as part of each VdbeSorter object. Instances are never allocated any other
** way.

**
** When a background thread is launched to perform work, SorterThread.bDone
** is set to 0 and the SorterThread.pThread variable set to point to the
** thread handle. SorterThread.bDone is set to 1 (to indicate to the main
** thread that joining SorterThread.pThread will not block) before the thread
** exits. SorterThread.pThread and bDone are always cleared after the 
** background thread has been joined.
**
** One object (specifically, VdbeSorter.aThread[SQLITE_MAX_SORTER_THREAD-1])
** is reserved for the foreground thread.
**
** The nature of the work performed is determined by SorterThread.eWork,
** as follows:
**
**   SORTER_THREAD_SORT:
**     Sort the linked list of records at SorterThread.pList.







<
<
<
<
<
<
<
<














|

|
>








|







22
23
24
25
26
27
28








29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
typedef struct VdbeSorterIter VdbeSorterIter;
typedef struct SorterThread SorterThread;
typedef struct SorterRecord SorterRecord;
typedef struct SorterMerger SorterMerger;
typedef struct FileWriter FileWriter;










/*
** Candidate values for SorterThread.eWork
*/
#define SORTER_THREAD_SORT   1
#define SORTER_THREAD_TO_PMA 2
#define SORTER_THREAD_CONS   3

/*
** Much of the work performed in this module to sort the list of records is 
** broken down into smaller units that may be performed in parallel. In order
** to perform such a unit of work, an instance of the following structure
** is configured and passed to vdbeSorterThreadMain() - either directly by 
** the main thread or via a background thread.
**
** Exactly VdbeSorter.nThread instances of this structure are allocated
** as part of each VdbeSorter object. Instances are never allocated any other
** way. VdbeSorter.nThread is set to the number of worker threads allowed
** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
**
** When a background thread is launched to perform work, SorterThread.bDone
** is set to 0 and the SorterThread.pThread variable set to point to the
** thread handle. SorterThread.bDone is set to 1 (to indicate to the main
** thread that joining SorterThread.pThread will not block) before the thread
** exits. SorterThread.pThread and bDone are always cleared after the 
** background thread has been joined.
**
** One object (specifically, VdbeSorter.aThread[VdbeSorter.nThread-1])
** is reserved for the foreground thread.
**
** The nature of the work performed is determined by SorterThread.eWork,
** as follows:
**
**   SORTER_THREAD_SORT:
**     Sort the linked list of records at SorterThread.pList.
183
184
185
186
187
188
189

190
191
192
193
194
195
196
197
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  int bUsePMA;                    /* True if one or more PMAs created */
  SorterRecord *pRecord;          /* Head of in-memory record list */
  SorterMerger *pMerger;          /* For final merge of PMAs (by caller) */ 
  u8 *aMemory;                    /* Block of memory to alloc records from */
  int iMemory;                    /* Offset of first free byte in aMemory */
  int nMemory;                    /* Size of aMemory allocation in bytes */

  SorterThread aThread[SQLITE_MAX_SORTER_THREAD];
};

/*
** The following type is an iterator for a PMA. It caches the current key in 
** variables nKey/aKey. If the iterator is at EOF, pFile==0.
*/
struct VdbeSorterIter {







>
|







176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  int bUsePMA;                    /* True if one or more PMAs created */
  SorterRecord *pRecord;          /* Head of in-memory record list */
  SorterMerger *pMerger;          /* For final merge of PMAs (by caller) */ 
  u8 *aMemory;                    /* Block of memory to alloc records from */
  int iMemory;                    /* Offset of first free byte in aMemory */
  int nMemory;                    /* Size of aMemory allocation in bytes */
  int nThread;                    /* Size of aThread[] array */
  SorterThread aThread[1];
};

/*
** The following type is an iterator for a PMA. It caches the current key in 
** variables nKey/aKey. If the iterator is at EOF, pFile==0.
*/
struct VdbeSorterIter {
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582

  if( p1->pFile==0 ){
    iRes = i2;
  }else if( p2->pFile==0 ){
    iRes = i1;
  }else{
    int res;
    assert( pThread->pUnpacked!=0 );  /* allocated in vdbeSorterMerge() */
    vdbeSorterCompare(
        pThread, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
    );
    if( res<=0 ){
      iRes = i1;
    }else{
      iRes = i2;







|







562
563
564
565
566
567
568
569
570
571
572
573
574
575
576

  if( p1->pFile==0 ){
    iRes = i2;
  }else if( p2->pFile==0 ){
    iRes = i1;
  }else{
    int res;
    assert( pThread->pUnpacked!=0 );  /* allocated in vdbeSorterThreadMain() */
    vdbeSorterCompare(
        pThread, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
    );
    if( res<=0 ){
      iRes = i1;
    }else{
      iRes = i2;
593
594
595
596
597
598
599

600

601
602
603


604
605
606
607
608
609
610
611
612
613

614
615
616
617
618
619
620
621
int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){
  int pgsz;                       /* Page size of main database */
  int i;                          /* Used to iterate through aThread[] */
  int mxCache;                    /* Cache size */
  VdbeSorter *pSorter;            /* The new sorter */
  KeyInfo *pKeyInfo;              /* Copy of pCsr->pKeyInfo with db==0 */
  int szKeyInfo;                  /* Size of pCsr->pKeyInfo in bytes */

  int rc = SQLITE_OK;


  assert( pCsr->pKeyInfo && pCsr->pBt==0 );
  szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);


  pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sizeof(VdbeSorter)+szKeyInfo);
  pCsr->pSorter = pSorter;
  if( pSorter==0 ){
    rc = SQLITE_NOMEM;
  }else{
    pKeyInfo = (KeyInfo*)&pSorter[1];
    memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
    pKeyInfo->db = 0;
    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);


    for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
      SorterThread *pThread = &pSorter->aThread[i];
      pThread->pKeyInfo = pKeyInfo;
      pThread->pVfs = db->pVfs;
      pThread->pgsz = pgsz;
    }

    if( !sqlite3TempInMemory(db) ){







>

>



>
>
|




|




>
|







587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){
  int pgsz;                       /* Page size of main database */
  int i;                          /* Used to iterate through aThread[] */
  int mxCache;                    /* Cache size */
  VdbeSorter *pSorter;            /* The new sorter */
  KeyInfo *pKeyInfo;              /* Copy of pCsr->pKeyInfo with db==0 */
  int szKeyInfo;                  /* Size of pCsr->pKeyInfo in bytes */
  int sz;                         /* Size of pSorter in bytes */
  int rc = SQLITE_OK;
  int nWorker = (sqlite3GlobalConfig.bCoreMutex?sqlite3GlobalConfig.nWorker:0);

  assert( pCsr->pKeyInfo && pCsr->pBt==0 );
  szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
  sz = sizeof(VdbeSorter) + nWorker * sizeof(SorterThread);

  pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
  pCsr->pSorter = pSorter;
  if( pSorter==0 ){
    rc = SQLITE_NOMEM;
  }else{
    pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
    memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
    pKeyInfo->db = 0;
    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);

    pSorter->nThread = nWorker + 1;
    for(i=0; i<pSorter->nThread; i++){
      SorterThread *pThread = &pSorter->aThread[i];
      pThread->pKeyInfo = pKeyInfo;
      pThread->pVfs = db->pVfs;
      pThread->pgsz = pgsz;
    }

    if( !sqlite3TempInMemory(db) ){
670
671
672
673
674
675
676

677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692



693
694
695
696
697
698
699
    pThread->pTemp1 = 0;
  }
}

/*
** Join all threads.
**
** Every one of the SQLITE_MAX_SORTER_THREAD entries of pSorter->aThread[]
** is visited. For each entry with a non-zero pThread handle,
** sqlite3ThreadJoin() is called and the entry's pThread and bDone fields
** are then reset to zero, whether or not the join succeeded.
**
** The return value is rcin if rcin is not SQLITE_OK. Otherwise it is the
** first non-SQLITE_OK value encountered, checking for each joined entry
** first the return code of sqlite3ThreadJoin() and then the value it
** stored in pRet (converted with SQLITE_PTR_TO_INT). SQLITE_OK is returned
** if no such value is found. An earlier error never stops the loop from
** joining the remaining entries.
*/

static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
  int rc = rcin;
  int i;
  for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
    SorterThread *pThread = &pSorter->aThread[i];
    if( pThread->pThread ){
      void *pRet;
      int rc2 = sqlite3ThreadJoin(pThread->pThread, &pRet);
      pThread->pThread = 0;
      pThread->bDone = 0;
      /* Only the first error is kept; later results never overwrite it. */
      if( rc==SQLITE_OK ) rc = rc2;
      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
    }
  }
  return rc;
}




/*
** Allocate a new SorterMerger object with space for nIter iterators.
*/
static SorterMerger *vdbeSorterMergerNew(int nIter){
  int N = 2;                      /* Smallest power of two >= nIter */
  int nByte;                      /* Total bytes of space to allocate */







>



|












>
>
>







669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
    pThread->pTemp1 = 0;
  }
}

/*
** Join all threads.
**
** Each of the pSorter->nThread entries of pSorter->aThread[] is visited.
** For each entry with a non-zero pThread handle, sqlite3ThreadJoin() is
** called and the entry's pThread and bDone fields are then reset to zero,
** whether or not the join succeeded.
**
** The return value is rcin if rcin is not SQLITE_OK. Otherwise it is the
** first non-SQLITE_OK value encountered, checking for each joined entry
** first the return code of sqlite3ThreadJoin() and then the value it
** stored in pRet (converted with SQLITE_PTR_TO_INT). SQLITE_OK is returned
** if no such value is found. An earlier error never stops the loop from
** joining the remaining entries.
**
** When SQLITE_MAX_WORKER_THREADS is not greater than zero the function is
** replaced by a macro that simply evaluates to its second argument.
*/
#if SQLITE_MAX_WORKER_THREADS>0
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
  int rc = rcin;
  int i;
  for(i=0; i<pSorter->nThread; i++){
    SorterThread *pThread = &pSorter->aThread[i];
    if( pThread->pThread ){
      void *pRet;
      int rc2 = sqlite3ThreadJoin(pThread->pThread, &pRet);
      pThread->pThread = 0;
      pThread->bDone = 0;
      /* Only the first error is kept; later results never overwrite it. */
      if( rc==SQLITE_OK ) rc = rc2;
      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
    }
  }
  return rc;
}
#else
/* No-op stub: the sorter argument is ignored and rcin passes through. */
# define vdbeSorterJoinAll(x,rcin) (rcin)
#endif

/*
** Allocate a new SorterMerger object with space for nIter iterators.
*/
static SorterMerger *vdbeSorterMergerNew(int nIter){
  int N = 2;                      /* Smallest power of two >= nIter */
  int nByte;                      /* Total bytes of space to allocate */
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749

/*
** Reset a sorting cursor back to its original empty state.
*/
void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
  int i;
  vdbeSorterJoinAll(pSorter, SQLITE_OK);
  for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
    SorterThread *pThread = &pSorter->aThread[i];
    vdbeSorterThreadCleanup(db, pThread);
  }
  if( pSorter->aMemory==0 ){
    vdbeSorterRecordFree(0, pSorter->pRecord);
  }
  vdbeSorterMergerReset(pSorter->pMerger);







|







738
739
740
741
742
743
744
745
746
747
748
749
750
751
752

/*
** Reset a sorting cursor back to its original empty state.
*/
void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
  int i;
  vdbeSorterJoinAll(pSorter, SQLITE_OK);
  for(i=0; i<pSorter->nThread; i++){
    SorterThread *pThread = &pSorter->aThread[i];
    vdbeSorterThreadCleanup(db, pThread);
  }
  if( pSorter->aMemory==0 ){
    vdbeSorterRecordFree(0, pSorter->pRecord);
  }
  vdbeSorterMergerReset(pSorter->pMerger);
1242
1243
1244
1245
1246
1247
1248
1249
1250

1251
1252
1253
1254
1255
1256
1257
1258
1259
1260

1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279

1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293


1294
1295
1296
1297
1298
1299
1300
static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
  VdbeSorter *pSorter = pCsr->pSorter;
  int rc = SQLITE_OK;
  int i;
  SorterThread *pThread;        /* Thread context used to create new PMA */

  pSorter->bUsePMA = 1;
  for(i=0; ALWAYS( i<SQLITE_MAX_SORTER_THREAD ); i++){
    pThread = &pSorter->aThread[i];

    if( pThread->bDone ){
      void *pRet;
      assert( pThread->pThread );
      rc = sqlite3ThreadJoin(pThread->pThread, &pRet);
      pThread->pThread = 0;
      pThread->bDone = 0;
      if( rc==SQLITE_OK ){
        rc = SQLITE_PTR_TO_INT(pRet);
      }
    }

    if( pThread->pThread==0 ) break;
  }

  if( rc==SQLITE_OK ){
    int bUseFg = (bFg || i==(SQLITE_MAX_SORTER_THREAD-1));

    assert( pThread->pThread==0 && pThread->bDone==0 );
    pThread->eWork = SORTER_THREAD_TO_PMA;
    pThread->pList = pSorter->pRecord;
    pThread->nInMemory = pSorter->nInMemory;
    pSorter->nInMemory = 0;
    pSorter->pRecord = 0;

    if( pSorter->aMemory ){
      u8 *aMem = pThread->aListMemory;
      pThread->aListMemory = pSorter->aMemory;
      pSorter->aMemory = aMem;
    }


    if( bUseFg==0 ){
      /* Launch a background thread for this operation */
      void *pCtx = (void*)pThread;
      assert( pSorter->aMemory==0 || pThread->aListMemory!=0 );
      if( pThread->aListMemory ){
        if( pSorter->aMemory==0 ){
          pSorter->aMemory = sqlite3Malloc(pSorter->nMemory);
          if( pSorter->aMemory==0 ) return SQLITE_NOMEM;
        }else{
          pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory);
        }
      }
      rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSorterThreadMain, pCtx);
    }else{


      /* Use the foreground thread for this operation */
      u8 *aMem;
      rc = vdbeSorterRunThread(pThread);
      aMem = pThread->aListMemory;
      pThread->aListMemory = pSorter->aMemory;
      pSorter->aMemory = aMem;
    }







|

>










>




|














>













|
>
>







1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
  VdbeSorter *pSorter = pCsr->pSorter;
  int rc = SQLITE_OK;
  int i;
  SorterThread *pThread;        /* Thread context used to create new PMA */

  pSorter->bUsePMA = 1;
  for(i=0; ALWAYS( i<pSorter->nThread ); i++){
    pThread = &pSorter->aThread[i];
#if SQLITE_MAX_WORKER_THREADS>0
    if( pThread->bDone ){
      void *pRet;
      assert( pThread->pThread );
      rc = sqlite3ThreadJoin(pThread->pThread, &pRet);
      pThread->pThread = 0;
      pThread->bDone = 0;
      if( rc==SQLITE_OK ){
        rc = SQLITE_PTR_TO_INT(pRet);
      }
    }
#endif
    if( pThread->pThread==0 ) break;
  }

  if( rc==SQLITE_OK ){
    int bUseFg = (bFg || i==(pSorter->nThread-1));

    assert( pThread->pThread==0 && pThread->bDone==0 );
    pThread->eWork = SORTER_THREAD_TO_PMA;
    pThread->pList = pSorter->pRecord;
    pThread->nInMemory = pSorter->nInMemory;
    pSorter->nInMemory = 0;
    pSorter->pRecord = 0;

    if( pSorter->aMemory ){
      u8 *aMem = pThread->aListMemory;
      pThread->aListMemory = pSorter->aMemory;
      pSorter->aMemory = aMem;
    }

#if SQLITE_MAX_WORKER_THREADS>0
    if( bUseFg==0 ){
      /* Launch a background thread for this operation */
      void *pCtx = (void*)pThread;
      assert( pSorter->aMemory==0 || pThread->aListMemory!=0 );
      if( pThread->aListMemory ){
        if( pSorter->aMemory==0 ){
          pSorter->aMemory = sqlite3Malloc(pSorter->nMemory);
          if( pSorter->aMemory==0 ) return SQLITE_NOMEM;
        }else{
          pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory);
        }
      }
      rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSorterThreadMain, pCtx);
    }else
#endif
    {
      /* Use the foreground thread for this operation */
      u8 *aMem;
      rc = vdbeSorterRunThread(pThread);
      aMem = pThread->aListMemory;
      pThread->aListMemory = pSorter->aMemory;
      pSorter->aMemory = aMem;
    }
1366
1367
1368
1369
1370
1371
1372

1373

1374
1375
1376
1377
1378
1379
1380
      int nNew = pSorter->nMemory * 2;
      while( nNew < nMin ) nNew = nNew*2;
      if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
      if( nNew < nMin ) nNew = nMin;

      aNew = sqlite3Realloc(pSorter->aMemory, nNew);
      if( !aNew ) return SQLITE_NOMEM;

      pSorter->pRecord = aNew + ((u8*)pSorter->pRecord - pSorter->aMemory);

      pSorter->aMemory = aNew;
      pSorter->nMemory = nNew;
    }

    pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory];
    pSorter->iMemory += ROUND8(nReq);
    pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory;







>
|
>







1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
      int nNew = pSorter->nMemory * 2;
      while( nNew < nMin ) nNew = nNew*2;
      if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
      if( nNew < nMin ) nNew = nMin;

      aNew = sqlite3Realloc(pSorter->aMemory, nNew);
      if( !aNew ) return SQLITE_NOMEM;
      pSorter->pRecord = (SorterRecord*)(
          aNew + ((u8*)pSorter->pRecord - pSorter->aMemory)
      );
      pSorter->aMemory = aNew;
      pSorter->nMemory = nNew;
    }

    pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory];
    pSorter->iMemory += ROUND8(nReq);
    pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory;
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409

/*
** Return the total number of PMAs in all temporary files.
**
** The total is the sum of the nPMA field over all
** SQLITE_MAX_SORTER_THREAD entries of pSorter->aThread[]. Nothing in
** pSorter is modified.
*/
static int vdbeSorterCountPMA(VdbeSorter *pSorter){
  int nPMA = 0;
  int i;
  for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
    nPMA += pSorter->aThread[i].nPMA;
  }
  return nPMA;
}

/*
** Once the sorter has been populated, this function is called to prepare







|







1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419

/*
** Return the total number of PMAs in all temporary files.
**
** The total is the sum of the nPMA field over the first pSorter->nThread
** entries of pSorter->aThread[]. Nothing in pSorter is modified.
*/
static int vdbeSorterCountPMA(VdbeSorter *pSorter){
  int nPMA = 0;
  int i;
  for(i=0; i<pSorter->nThread; i++){
    nPMA += pSorter->aThread[i].nPMA;
  }
  return nPMA;
}

/*
** Once the sorter has been populated, this function is called to prepare
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458

1459
1460
1461


1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
  }

  /* Join all threads */
  rc = vdbeSorterJoinAll(pSorter, rc);

  /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
  ** some of them together so that this is no longer the case. */
  assert( SORTER_MAX_MERGE_COUNT>=SQLITE_MAX_SORTER_THREAD );
  if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
    int i;
    for(i=0; rc==SQLITE_OK && i<SQLITE_MAX_SORTER_THREAD; i++){
      SorterThread *pThread = &pSorter->aThread[i];
      if( pThread->pTemp1 ){
        pThread->nConsolidate = SORTER_MAX_MERGE_COUNT/SQLITE_MAX_SORTER_THREAD;
        pThread->eWork = SORTER_THREAD_CONS;

        if( i<(SQLITE_MAX_SORTER_THREAD-1) ){

          void *pCtx = (void*)pThread;
          rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx);
        }else{


          rc = vdbeSorterRunThread(pThread);
        }
      }
    }
  }

  /* Join all threads */
  rc = vdbeSorterJoinAll(pSorter, rc);

  /* Assuming no errors have occurred, set up a merger structure to read
  ** and merge all remaining PMAs.  */
  assert( pSorter->pMerger==0 );
  if( rc==SQLITE_OK ){
    int nIter = 0;                /* Number of iterators used */
    int i;
    SorterMerger *pMerger;
    for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
      nIter += pSorter->aThread[i].nPMA;
    }

    pSorter->pMerger = pMerger = vdbeSorterMergerNew(nIter);
    if( pMerger==0 ){
      rc = SQLITE_NOMEM;
    }else{
      int iIter = 0;
      int iThread = 0;
      for(iThread=0; iThread<SQLITE_MAX_SORTER_THREAD; iThread++){
        int iPMA;
        i64 iReadOff = 0;
        SorterThread *pThread = &pSorter->aThread[iThread];
        for(iPMA=0; iPMA<pThread->nPMA && rc==SQLITE_OK; iPMA++){
          i64 nDummy = 0;
          VdbeSorterIter *pIter = &pMerger->aIter[iIter++];
          rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nDummy);







<


|


|


|
>


|
>
>
















|









|







1452
1453
1454
1455
1456
1457
1458

1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
  }

  /* Join all threads */
  rc = vdbeSorterJoinAll(pSorter, rc);

  /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
  ** some of them together so that this is no longer the case. */

  if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
    int i;
    for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){
      SorterThread *pThread = &pSorter->aThread[i];
      if( pThread->pTemp1 ){
        pThread->nConsolidate = SORTER_MAX_MERGE_COUNT/pSorter->nThread;
        pThread->eWork = SORTER_THREAD_CONS;

#if SQLITE_MAX_WORKER_THREADS>0
        if( i<(pSorter->nThread-1) ){
          void *pCtx = (void*)pThread;
          rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx);
        }else
#endif
        {
          rc = vdbeSorterRunThread(pThread);
        }
      }
    }
  }

  /* Join all threads */
  rc = vdbeSorterJoinAll(pSorter, rc);

  /* Assuming no errors have occurred, set up a merger structure to read
  ** and merge all remaining PMAs.  */
  assert( pSorter->pMerger==0 );
  if( rc==SQLITE_OK ){
    int nIter = 0;                /* Number of iterators used */
    int i;
    SorterMerger *pMerger;
    for(i=0; i<pSorter->nThread; i++){
      nIter += pSorter->aThread[i].nPMA;
    }

    pSorter->pMerger = pMerger = vdbeSorterMergerNew(nIter);
    if( pMerger==0 ){
      rc = SQLITE_NOMEM;
    }else{
      int iIter = 0;
      int iThread = 0;
      for(iThread=0; iThread<pSorter->nThread; iThread++){
        int iPMA;
        i64 iReadOff = 0;
        SorterThread *pThread = &pSorter->aThread[iThread];
        for(iPMA=0; iPMA<pThread->nPMA && rc==SQLITE_OK; iPMA++){
          i64 nDummy = 0;
          VdbeSorterIter *pIter = &pMerger->aIter[iIter++];
          rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nDummy);