/ Check-in [d03f5b86]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix further code and documentation issues in vdbesort.c.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | threads
Files: files | file ages | folders
SHA1: d03f5b8622d304f029f73c7cd0bee3182a81d081
User & Date: dan 2014-04-15 19:52:34
Context
2014-04-15
20:52
Fix some problems to do with OOM conditions in vdbesort.c. Some problems remain. check-in: 2f94f9ce user: dan tags: threads
19:52
Fix further code and documentation issues in vdbesort.c. check-in: d03f5b86 user: dan tags: threads
2014-04-14
19:23
Allow the sorter to begin returning data to the VDBE as soon as it is available, instead of waiting until all keys have been sorted. check-in: cb0ab20c user: dan tags: threads
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/vdbesort.c.

     6      6   **
     7      7   **    May you do good and not evil.
     8      8   **    May you find forgiveness for yourself and forgive others.
     9      9   **    May you share freely, never taking more than you give.
    10     10   **
    11     11   *************************************************************************
    12     12   ** This file contains code for the VdbeSorter object, used in concert with
    13         -** a VdbeCursor to sort large numbers of keys for CREATE TABLE statements
           13  +** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
    14     14   ** or by SELECT statements with ORDER BY clauses that cannot be satisfied
    15     15   ** using indexes and without LIMIT clauses.
    16     16   **
    17     17   ** The VdbeSorter object implements a multi-threaded external merge sort
    18     18   ** algorithm that is efficient even if the number of element being sorted
    19     19   ** exceeds the available memory.
    20     20   **
................................................................................
    53     53   **
    54     54   **    sqlite3VdbeSorterReset()      Refurbish the VdbeSorter for reuse.  This
    55     55   **                                  is like Close() followed by Init() only
    56     56   **                                  much faster.
    57     57   **
    58     58   ** The interfaces above must be called in a particular order.  Write() can 
    59     59   ** only occur in between Init()/Reset() and Rewind().  Next(), Rowkey(), and
    60         -** Compare() can only occur in between Rewind() and Close()/Reset().
           60  +** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
           61  +**
           62  +**   Init()
           63  +**   for each record: Write()
           64  +**   Rewind()
           65  +**     Rowkey()/Compare()
           66  +**   Next() 
           67  +**   Close()
    61     68   **
    62     69   ** Algorithm:
    63     70   **
    64         -** Records to be sorted are initially held in memory, in the order in
    65         -** which they arrive from Write().  When the amount of memory needed exceeds
    66         -** a threshold, all in-memory records are sorted and then appended to
    67         -** a temporary file as a "Packed-Memory-Array" or "PMA" and the memory is
    68         -** reset.  There is a single temporary file used for all PMAs.  The PMAs
    69         -** are packed one after another in the file.  The VdbeSorter object keeps
    70         -** track of the number of PMAs written.
    71         -**
    72         -** When the Rewind() is seen, any records still held in memory are sorted.
    73         -** If no PMAs have been written (if all records are still held in memory)
    74         -** then subsequent Rowkey(), Next(), and Compare() operations work directly
    75         -** from memory.  But if PMAs have been written things get a little more
    76         -** complicated.
    77         -**
    78         -** When Rewind() is seen after PMAs have been written, any records still
    79         -** in memory are sorted and written as a final PMA.  Then all the PMAs
    80         -** are merged together into a single massive PMA that Next(), Rowkey(),
    81         -** and Compare() walk to extract the records in sorted order.
    82         -**
    83         -** If SQLITE_MAX_WORKER_THREADS is non-zero, various steps of the above
    84         -** algorithm might be performed in parallel by separate threads.  Threads
    85         -** are only used when one or more PMA spill to disk.  If the sort is small
    86         -** enough to fit entirely in memory, everything happens on the main thread.
           71  +** Records passed to the sorter via calls to Write() are initially held 
           72  +** unsorted in main memory. Assuming the amount of memory used never exceeds
           73  +** a threshold, when Rewind() is called the set of records is sorted using
           74  +** an in-memory merge sort. In this case, no temporary files are required
           75  +** and subsequent calls to Rowkey(), Next() and Compare() read records 
           76  +** directly from main memory.
           77  +**
           78  +** If the amount of space used to store records in main memory exceeds the
           79  +** threshold, then the set of records currently in memory are sorted and
           80  +** written to a temporary file in "Packed Memory Array" (PMA) format.
           81  +** A PMA created at this point is known as a "level-0 PMA". Higher levels
           82  +** of PMAs may be created by merging existing PMAs together - for example
           83  +** merging two or more level-0 PMAs together creates a level-1 PMA.
           84  +**
           85  +** The threshold for the amount of main memory to use before flushing 
           86  +** records to a PMA is roughly the same as the limit configured for the
           87  +** page-cache of the main database. Specifically, the threshold is set to 
           88  +** the value returned multiplied by "PRAGMA main.page_size" multipled by 
           89  +** that returned by "PRAGMA main.cache_size", in bytes.
           90  +**
           91  +** If the sorter is running in single-threaded mode, then all PMAs generated
           92  +** are appended to a single temporary file. Or, if the sorter is running in
           93  +** multi-threaded mode then up to (N+1) temporary files may be opened, where
           94  +** N is the configured number of worker threads. In this case, instead of
           95  +** sorting the records and writing the PMA to a temporary file itself, the
           96  +** calling thread usually launches a worker thread to do so. Except, if
           97  +** there are already N worker threads running, the main thread does the work
           98  +** itself.
           99  +**
          100  +** The sorter is running in multi-threaded mode if (a) the library was built
          101  +** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
          102  +** than zero, and (b) worker threads have been enabled at runtime by calling
          103  +** sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, ...).
          104  +**
          105  +** When Rewind() is called, any data remaining in memory is flushed to a 
          106  +** final PMA. So at this point the data is stored in some number of sorted
          107  +** PMAs within temporary files on disk. Within a single file sorter is 
          108  +** running in single threaded mode, or distributed between one or more files
          109  +** for multi-threaded sorters.
          110  +**
          111  +** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
          112  +** sorter is running in single-threaded mode, then these PMAs are merged
          113  +** incrementally as keys are retreived from the sorter by the VDBE. See
          114  +** comments above object MergeEngine below for details.
          115  +**
          116  +** Or, if running in multi-threaded mode, then a background thread is
          117  +** launched to merge the existing PMAs. Once the background thread has
          118  +** merged T bytes of data into a single sorted PMA, the main thread 
          119  +** begins reading keys from that PMA while the background thread proceeds
          120  +** with merging the next T bytes of data. And so on.
          121  +**
          122  +** Parameter T is set to half the value of the memory threshold used 
          123  +** by Write() above to determine when to create a new PMA.
          124  +**
          125  +** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when 
          126  +** Rewind() is called, then a hierarchy of incremental-merges is used. 
          127  +** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on 
          128  +** disk are merged together. Then T bytes of data from the second set, and
          129  +** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
          130  +** PMAs at a time. This done is to improve locality.
          131  +**
          132  +** If running in multi-threaded mode and there are more than
          133  +** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
          134  +** than one background thread may be created. Specifically, there may be
          135  +** one background thread for each temporary file on disk, and one background
          136  +** thread to merge the output of each of the others to a single PMA for
          137  +** the main thread to read from.
    87    138   */
    88    139   #include "sqliteInt.h"
    89    140   #include "vdbeInt.h"
    90    141   
    91    142   /* 
    92    143   ** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
    93    144   ** messages to stderr that may be helpful in understanding the performance
................................................................................
    98    149   #endif
    99    150   
   100    151   /*
   101    152   ** Private objects used by the sorter
   102    153   */
   103    154   typedef struct MergeEngine MergeEngine;     /* Merge PMAs together */
   104    155   typedef struct PmaReader PmaReader;         /* Incrementally read one PMA */
   105         -typedef struct PmaWriter PmaWriter;         /* Incrementally write on PMA */
          156  +typedef struct PmaWriter PmaWriter;         /* Incrementally write one PMA */
   106    157   typedef struct SorterRecord SorterRecord;   /* A record being sorted */
   107    158   typedef struct SortSubtask SortSubtask;     /* A sub-task in the sort process */
   108         -typedef struct SorterFile SorterFile;
   109         -typedef struct SorterThread SorterThread;
   110         -typedef struct SorterList SorterList;
          159  +typedef struct SorterFile SorterFile;       /* Temporary file object wrapper */
          160  +typedef struct SorterList SorterList;       /* In-memory list of records */
   111    161   typedef struct IncrMerger IncrMerger;
   112    162   
   113    163   /*
   114    164   ** A container for a temp file handle and the current amount of data 
   115    165   ** stored in the file.
   116    166   */
   117    167   struct SorterFile {
   118    168     sqlite3_file *pFd;              /* File handle */
   119    169     i64 iEof;                       /* Bytes of data stored in pFd */
   120    170   };
   121    171   
   122    172   /*
   123         -** An object of this type is used to store the thread handle for each 
   124         -** background thread launched by the sorter. Before the thread is launched,
   125         -** variable bDone is set to 0. Then, right before it exits, the thread 
   126         -** itself sets bDone to 1.
   127         -**
   128         -** This is then used for two purposes:
   129         -**
   130         -**   1. When flushing the contents of memory to a level-0 PMA on disk, to
   131         -**      attempt to select a SortSubtask for which there is not already an
   132         -**      active background thread (since doing so causes the main thread
   133         -**      to block until it finishes).
   134         -**
   135         -**   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
   136         -**      to sqlite3ThreadJoin() is likely to block.
   137         -**
   138         -** In both cases, the effects of the main thread seeing (bDone==0) even
   139         -** after the thread has finished are not dire. So we don't worry about
   140         -** memory barriers and such here.
          173  +** In memory linked list of records.
   141    174   */
   142         -struct SorterThread {
   143         -  SQLiteThread *pThread;
   144         -  int bDone;
   145         -};
   146         -
   147    175   struct SorterList {
   148    176     SorterRecord *pList;            /* Linked list of records */
   149    177     u8 *aMemory;                    /* If non-NULL, blob of memory for pList */
   150    178     int szPMA;                      /* Size of pList as PMA in bytes */
   151    179   };
   152         -
   153         -/*
   154         -** Sorting is divided up into smaller subtasks.  Each subtask is controlled
   155         -** by an instance of this object. A Subtask might run in either the main thread
   156         -** or in a background thread.
   157         -**
   158         -** Exactly VdbeSorter.nTask instances of this object are allocated
   159         -** as part of each VdbeSorter object. Instances are never allocated any other
   160         -** way. VdbeSorter.nTask is set to the number of worker threads allowed
   161         -** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
   162         -**
   163         -** When a background thread is launched to perform work, SortSubtask.bDone
   164         -** is set to 0 and the SortSubtask.pTask variable set to point to the
   165         -** thread handle. SortSubtask.bDone is set to 1 (to indicate to the main
   166         -** thread that joining SortSubtask.pTask will not block) before the thread
   167         -** exits. SortSubtask.pTask and bDone are always cleared after the 
   168         -** background thread has been joined.
   169         -**
   170         -** One object (specifically, VdbeSorter.aTask[VdbeSorter.nTask-1])
   171         -** is reserved for the foreground thread.
   172         -**
   173         -** The nature of the work performed is determined by SortSubtask.eWork,
   174         -** as follows:
   175         -**
   176         -**   SORT_SUBTASK_SORT:
   177         -**     Sort the linked list of records at SortSubtask.pList.
   178         -**
   179         -**   SORT_SUBTASK_TO_PMA:
   180         -**     Sort the linked list of records at SortSubtask.pList, and write
   181         -**     the results to a new PMA in temp file SortSubtask.pTemp1. Open
   182         -**     the temp file if it is not already open.
   183         -**
   184         -**   SORT_SUBTASK_CONS:
   185         -**     Merge existing PMAs until SortSubtask.nConsolidate or fewer
   186         -**     remain in temp file SortSubtask.pTemp1.
   187         -*/
   188         -struct SortSubtask {
   189         -  SorterThread thread;
   190         -  sqlite3 *db;                    /* Database connection */
   191         -  VdbeSorter *pSorter;            /* Sorter */
   192         -  KeyInfo *pKeyInfo;              /* How to compare records */
   193         -  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
   194         -  int pgsz;                       /* Main database page size */
   195         -  SorterList list;                /* List for thread to write to a PMA */
   196         -  int nPMA;                       /* Number of PMAs currently in file */
   197         -  SorterFile file;                /* Temp file for level-0 PMAs */
   198         -  SorterFile file2;               /* Space for other PMAs */
   199         -};
   200         -
   201    180   
   202    181   /*
   203    182   ** The MergeEngine object is used to combine two or more smaller PMAs into
   204    183   ** one big PMA using a merge operation.  Separate PMAs all need to be
   205    184   ** combined into one big PMA in order to be able to step through the sorted
   206    185   ** records in order.
   207    186   **
................................................................................
   264    243   */
   265    244   struct MergeEngine {
   266    245     int nTree;                 /* Used size of aTree/aIter (power of 2) */
   267    246     int *aTree;                /* Current state of incremental merge */
   268    247     PmaReader *aIter;          /* Array of iterators to merge data from */
   269    248   };
   270    249   
          250  +/*
          251  +** Exactly VdbeSorter.nTask instances of this object are allocated
          252  +** as part of each VdbeSorter object. Instances are never allocated any
          253  +** other way. VdbeSorter.nTask is set to the number of worker threads allowed
          254  +** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
          255  +**
          256  +** Essentially, this structure contains all those fields of the VdbeSorter
          257  +** structure for which each thread requires a separate instance. For example,
          258  +** each thread requries its own UnpackedRecord object to unpack records in
          259  +** as part of comparison operations.
          260  +**
          261  +** Before a background thread is launched, variable bDone is set to 0. Then, 
          262  +** right before it exits, the thread itself sets bDone to 1. This is used for 
          263  +** two purposes:
          264  +**
          265  +**   1. When flushing the contents of memory to a level-0 PMA on disk, to
          266  +**      attempt to select a SortSubtask for which there is not already an
          267  +**      active background thread (since doing so causes the main thread
          268  +**      to block until it finishes).
          269  +**
          270  +**   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
          271  +**      to sqlite3ThreadJoin() is likely to block. Cases that are likely to
          272  +**      block provoke debugging output.
          273  +**
          274  +** In both cases, the effects of the main thread seeing (bDone==0) even
          275  +** after the thread has finished are not dire. So we don't worry about
          276  +** memory barriers and such here.
          277  +*/
          278  +struct SortSubtask {
          279  +  SQLiteThread *pThread;          /* Background thread, if any */
          280  +  int bDone;                      /* Set if thread is finished but not joined */
          281  +  VdbeSorter *pSorter;            /* Sorter that owns this sub-task */
          282  +  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
          283  +  SorterList list;                /* List for thread to write to a PMA */
          284  +  int nPMA;                       /* Number of PMAs currently in file */
          285  +  SorterFile file;                /* Temp file for level-0 PMAs */
          286  +  SorterFile file2;               /* Space for other PMAs */
          287  +};
          288  +
   271    289   /*
   272    290   ** Main sorter structure. A single instance of this is allocated for each 
   273    291   ** sorter cursor created by the VDBE.
   274    292   **
   275    293   ** mxKeysize:
   276    294   **   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
   277    295   **   this variable is updated so as to be set to the size on disk of the
   278    296   **   largest record in the sorter.
   279    297   */
   280    298   struct VdbeSorter {
   281    299     int mnPmaSize;                  /* Minimum PMA size, in bytes */
   282    300     int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
          301  +  int mxKeysize;                  /* Largest serialized key seen so far */
          302  +  int pgsz;                       /* Main database page size */
   283    303     PmaReader *pReader;             /* Read data from here after Rewind() */
   284    304     MergeEngine *pMerger;           /* Or here, if bUseThreads==0 */
   285         -  int mxKeysize;                  /* Largest serialized key seen so far */
          305  +  sqlite3 *db;                    /* Database connection */
          306  +  KeyInfo *pKeyInfo;              /* How to compare records */
   286    307     UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
   287    308     SorterList list;                /* List of in-memory records */
   288    309     int iMemory;                    /* Offset of free space in list.aMemory */
   289    310     int nMemory;                    /* Size of list.aMemory allocation in bytes */
   290    311     u8 bUsePMA;                     /* True if one or more PMAs created */
   291    312     u8 bUseThreads;                 /* True to use background threads */
   292    313     u8 iPrev;                       /* Previous thread used to flush PMA */
................................................................................
   314    335   };
   315    336   
   316    337   /*
   317    338   ** Normally, a PmaReader object iterates through an existing PMA stored 
   318    339   ** within a temp file. However, if the PmaReader.pIncr variable points to
   319    340   ** an object of the following type, it may be used to iterate/merge through
   320    341   ** multiple PMAs simultaneously.
          342  +**
          343  +** There are two types of IncrMerger object - single (bUseThread==0) and 
          344  +** multi-threaded (bUseThread==1). 
          345  +**
          346  +** A multi-threaded IncrMerger object uses two temporary files - aFile[0] 
          347  +** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in 
          348  +** size. When the IncrMerger is initialized, it reads enough data from 
          349  +** pMerger to populate aFile[0]. It then sets variables within the 
          350  +** corresponding PmaReader object to read from that file and kicks off 
          351  +** a background thread to populate aFile[1] with the next mxSz bytes of 
          352  +** sorted record data from pMerger. 
          353  +**
          354  +** When the PmaReader reaches the end of aFile[0], it blocks until the
          355  +** background thread has finished populating aFile[1]. It then exchanges
          356  +** the contents of the aFile[0] and aFile[1] variables within this structure,
          357  +** sets the PmaReader fields to read from the new aFile[0] and kicks off
          358  +** another background thread to populate the new aFile[1]. And so on, until
          359  +** the contents of pMerger are exhausted.
          360  +**
          361  +** A single-threaded IncrMerger does not open any temporary files of its
          362  +** own. Instead, it has exclusive access to mxSz bytes of space beginning
          363  +** at offset iStartOff of file pTask->file2. And instead of using a 
          364  +** background thread to prepare data for the PmaReader, with a single
          365  +** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with
          366  +** keys from pMerger by the calling thread whenever the PmaReader runs out
          367  +** of data.
   321    368   */
   322    369   struct IncrMerger {
   323    370     SortSubtask *pTask;             /* Task that owns this merger */
   324         -  SorterThread thread;            /* Thread for populating aFile[1] */
   325    371     MergeEngine *pMerger;           /* Merge engine thread reads data from */
   326    372     i64 iStartOff;                  /* Offset to start writing file at */
   327    373     int mxSz;                       /* Maximum bytes of data to store */
   328    374     int bEof;                       /* Set to true when merge is finished */
   329    375     int bUseThread;                 /* True to use a bg thread for this object */
   330    376     SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
   331    377   };
................................................................................
   350    396   
   351    397   /*
   352    398   ** This object is the header on a single record while that record is being
   353    399   ** held in memory and prior to being written out as part of a PMA.
   354    400   **
   355    401   ** How the linked list is connected depends on how memory is being managed
   356    402   ** by this module. If using a separate allocation for each in-memory record
   357         -** (VdbeSorter.aMemory==0), then the list is always connected using the
          403  +** (VdbeSorter.list.aMemory==0), then the list is always connected using the
   358    404   ** SorterRecord.u.pNext pointers.
   359    405   **
   360         -** Or, if using the single large allocation method (VdbeSorter.aMemory!=0),
          406  +** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
   361    407   ** then while records are being accumulated the list is linked using the
   362    408   ** SorterRecord.u.iNext offset. This is because the aMemory[] array may
   363    409   ** be sqlite3Realloc()ed while records are being accumulated. Once the VM
   364    410   ** has finished passing records to the sorter, or when the in-memory buffer
   365    411   ** is full, the list is sorted. As part of the sorting process, it is
   366    412   ** converted to use the SorterRecord.u.pNext pointers. See function
   367    413   ** vdbeSorterSort() for details.
................................................................................
   386    432   ** page size in bytes.  */
   387    433   #define SORTER_MIN_WORKING 10
   388    434   
   389    435   /* Maximum number of PMAs that a single MergeEngine can merge */
   390    436   #define SORTER_MAX_MERGE_COUNT 16
   391    437   
   392    438   static int vdbeIncrSwap(IncrMerger*);
   393         -static void vdbeIncrFree(IncrMerger*);
          439  +static void vdbeIncrFree(IncrMerger *);
   394    440   
   395    441   /*
   396    442   ** Free all memory belonging to the PmaReader object passed as the second
   397    443   ** argument. All structure fields are set to zero before returning.
   398    444   */
   399    445   static void vdbePmaReaderClear(PmaReader *pIter){
   400    446     sqlite3_free(pIter->aAlloc);
................................................................................
   527    573         sqlite3GetVarint(aVarint, pnOut);
   528    574       }
   529    575     }
   530    576   
   531    577     return SQLITE_OK;
   532    578   }
   533    579   
          580  +/*
          581  +** Attempt to memory map file pFile. If successful, set *pp to point to the
          582  +** new mapping and return SQLITE_OK. If the mapping is not attempted 
          583  +** (because the file is too large or the VFS layer is configured not to use
          584  +** mmap), return SQLITE_OK and set *pp to NULL.
          585  +**
          586  +** Or, if an error occurs, return an SQLite error code. The final value of
          587  +** *pp is undefined in this case.
          588  +*/
   534    589   static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
   535    590     int rc = SQLITE_OK;
   536         -  if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
          591  +  if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
   537    592       rc = sqlite3OsFetch(pFile->pFd, 0, pFile->iEof, (void**)pp);
   538    593     }
   539    594     return rc;
   540    595   }
   541    596   
   542         -static int vdbePmaReaderReinit(PmaReader *pIter){
   543         -  IncrMerger *pIncr = pIter->pIncr;
   544         -  SortSubtask *pTask = pIncr->pTask;
          597  +/*
          598  +** Seek iterator pIter to offset iOff within file pFile. Return SQLITE_OK 
          599  +** if successful, or an SQLite error code if an error occurs.
          600  +*/
          601  +static int vdbePmaReaderSeek(
          602  +  SortSubtask *pTask,             /* Task context */
          603  +  PmaReader *pIter,               /* Iterate to populate */
          604  +  SorterFile *pFile,              /* Sorter file to read from */
          605  +  i64 iOff                        /* Offset in pFile */
          606  +){
   545    607     int rc = SQLITE_OK;
   546    608   
   547         -  assert( pIncr->bEof==0 );
          609  +  assert( pIter->pIncr==0 || pIter->pIncr->bEof==0 );
   548    610   
   549    611     if( pIter->aMap ){
   550    612       sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
   551    613       pIter->aMap = 0;
   552    614     }
   553         -  pIter->iReadOff = pIncr->iStartOff;
   554         -  pIter->iEof = pIncr->aFile[0].iEof;
   555         -  pIter->pFile = pIncr->aFile[0].pFd;
   556         -
   557         -  rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap);
   558         -  if( rc==SQLITE_OK ){
   559         -    if( pIter->aMap==0 ){
   560         -      /* TODO: Combine this code with similar code in vdbePmaReaderInit() */
   561         -      int iBuf = pIter->iReadOff % pTask->pgsz;
   562         -      if( pIter->aBuffer==0 ){
   563         -        pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
   564         -        if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
   565         -        pIter->nBuffer = pTask->pgsz;
   566         -      }
   567         -      if( iBuf ){
   568         -        int nRead = pTask->pgsz - iBuf;
   569         -        if( (pIter->iReadOff + nRead) > pIter->iEof ){
   570         -          nRead = (int)(pIter->iEof - pIter->iReadOff);
   571         -        }
   572         -        rc = sqlite3OsRead(
   573         -            pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
   574         -        );
   575         -        assert( rc!=SQLITE_IOERR_SHORT_READ );
   576         -      }
   577         -    }
   578         -  }
   579         -
   580         -  return rc;
   581         -}
   582         -
          615  +  pIter->iReadOff = iOff;
          616  +  pIter->iEof = pFile->iEof;
          617  +  pIter->pFile = pFile->pFd;
          618  +
          619  +  rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);
          620  +  if( rc==SQLITE_OK && pIter->aMap==0 ){
          621  +    int pgsz = pTask->pSorter->pgsz;
          622  +    int iBuf = pIter->iReadOff % pgsz;
          623  +    if( pIter->aBuffer==0 ){
          624  +      pIter->aBuffer = (u8*)sqlite3Malloc(pgsz);
          625  +      if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
          626  +      pIter->nBuffer = pgsz;
          627  +    }
          628  +    if( iBuf ){
          629  +      int nRead = pgsz - iBuf;
          630  +      if( (pIter->iReadOff + nRead) > pIter->iEof ){
          631  +        nRead = (int)(pIter->iEof - pIter->iReadOff);
          632  +      }
          633  +      rc = sqlite3OsRead(
          634  +          pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
          635  +      );
          636  +      assert( rc!=SQLITE_IOERR_SHORT_READ );
          637  +    }
          638  +  }
          639  +
          640  +  return rc;
          641  +}
   583    642   
   584    643   /*
   585    644   ** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if
   586    645   ** no error occurs, or an SQLite error code if one does.
   587    646   */
   588    647   static int vdbePmaReaderNext(PmaReader *pIter){
   589    648     int rc = SQLITE_OK;             /* Return Code */
   590    649     u64 nRec = 0;                   /* Size of record in bytes */
   591    650   
   592    651     if( pIter->iReadOff>=pIter->iEof ){
          652  +    IncrMerger *pIncr = pIter->pIncr;
   593    653       int bEof = 1;
   594         -    if( pIter->pIncr ){
   595         -      rc = vdbeIncrSwap(pIter->pIncr);
   596         -      if( rc==SQLITE_OK && pIter->pIncr->bEof==0 ){
   597         -        rc = vdbePmaReaderReinit(pIter);
          654  +    if( pIncr ){
          655  +      rc = vdbeIncrSwap(pIncr);
          656  +      if( rc==SQLITE_OK && pIncr->bEof==0 ){
          657  +        rc = vdbePmaReaderSeek(
          658  +            pIncr->pTask, pIter, &pIncr->aFile[0], pIncr->iStartOff
          659  +        );
   598    660           bEof = 0;
   599    661         }
   600    662       }
   601    663   
   602    664       if( bEof ){
   603    665         /* This is an EOF condition */
   604    666         vdbePmaReaderClear(pIter);
................................................................................
   629    691   static int vdbePmaReaderInit(
   630    692     SortSubtask *pTask,             /* Task context */
   631    693     SorterFile *pFile,              /* Sorter file to read from */
   632    694     i64 iStart,                     /* Start offset in pFile */
   633    695     PmaReader *pIter,               /* Iterator to populate */
   634    696     i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
   635    697   ){
   636         -  int rc = SQLITE_OK;
   637         -  int nBuf = pTask->pgsz;
          698  +  int rc;
   638    699   
   639    700     assert( pFile->iEof>iStart );
   640         -  assert( pIter->aAlloc==0 );
          701  +  assert( pIter->aAlloc==0 && pIter->nAlloc==0 );
   641    702     assert( pIter->aBuffer==0 );
   642         -  pIter->pFile = pFile->pFd;
   643         -  pIter->iReadOff = iStart;
   644         -  pIter->nAlloc = 128;
   645         -  pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
   646         -  if( pIter->aAlloc ){
   647         -    /* Try to xFetch() a mapping of the entire temp file. If this is possible,
   648         -    ** the PMA will be read via the mapping. Otherwise, use xRead().  */
   649         -    rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);
   650         -  }else{
   651         -    rc = SQLITE_NOMEM;
   652         -  }
          703  +  assert( pIter->aMap==0 );
   653    704   
   654         -  if( rc==SQLITE_OK && pIter->aMap==0 ){
   655         -    pIter->nBuffer = nBuf;
   656         -    pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
   657         -    if( !pIter->aBuffer ){
   658         -      rc = SQLITE_NOMEM;
   659         -    }else{
   660         -      int iBuf = iStart % nBuf;
   661         -      if( iBuf ){
   662         -        int nRead = nBuf - iBuf;
   663         -        if( (iStart + nRead) > pFile->iEof ){
   664         -          nRead = (int)(pFile->iEof - iStart);
   665         -        }
   666         -        rc = sqlite3OsRead(
   667         -            pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
   668         -        );
   669         -        assert( rc!=SQLITE_IOERR_SHORT_READ );
   670         -      }
   671         -    }
   672         -  }
   673         -
          705  +  rc = vdbePmaReaderSeek(pTask, pIter, pFile, iStart);
   674    706     if( rc==SQLITE_OK ){
   675    707       u64 nByte;                    /* Size of PMA in bytes */
   676         -    pIter->iEof = pFile->iEof;
   677    708       rc = vdbePmaReadVarint(pIter, &nByte);
   678    709       pIter->iEof = pIter->iReadOff + nByte;
   679    710       *pnByte += nByte;
   680    711     }
   681    712   
   682    713     if( rc==SQLITE_OK ){
   683    714       rc = vdbePmaReaderNext(pIter);
................................................................................
   702    733   static int vdbeSorterCompare(
   703    734     SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
   704    735     const void *pKey1, int nKey1,   /* Left side of comparison */
   705    736     const void *pKey2, int nKey2    /* Right side of comparison */
   706    737   ){
   707    738     UnpackedRecord *r2 = pTask->pUnpacked;
   708    739     if( pKey2 ){
   709         -    sqlite3VdbeRecordUnpack(pTask->pKeyInfo, nKey2, pKey2, r2);
          740  +    sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
   710    741     }
   711    742     return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);
   712    743   }
   713    744   
   714    745   /*
   715    746   ** This function is called to compare two iterator keys when merging 
   716    747   ** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
................................................................................
   784    815     sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);
   785    816   
   786    817     pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
   787    818     pCsr->pSorter = pSorter;
   788    819     if( pSorter==0 ){
   789    820       rc = SQLITE_NOMEM;
   790    821     }else{
   791         -    pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
          822  +    pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
   792    823       memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
   793    824       pKeyInfo->db = 0;
   794    825       if( nField && nWorker==0 ) pKeyInfo->nField = nField;
   795         -    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
   796         -
          826  +    pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
   797    827       pSorter->nTask = nWorker + 1;
   798    828       pSorter->bUseThreads = (pSorter->nTask>1);
          829  +    pSorter->db = db;
   799    830       for(i=0; i<pSorter->nTask; i++){
   800    831         SortSubtask *pTask = &pSorter->aTask[i];
   801         -      pTask->pKeyInfo = pKeyInfo;
   802         -      pTask->pgsz = pgsz;
   803         -      pTask->db = db;
   804    832         pTask->pSorter = pSorter;
   805    833       }
   806    834   
   807    835       if( !sqlite3TempInMemory(db) ){
   808    836         pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
   809    837         mxCache = db->aDb[0].pSchema->cache_size;
   810    838         if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
................................................................................
   900    928   # define vdbeSorterRewindDebug(x,y)
   901    929   # define vdbeSorterPopulateDebug(x,y)
   902    930   # define vdbeSorterBlockDebug(x,y,z)
   903    931   #endif
   904    932   
   905    933   #if SQLITE_MAX_WORKER_THREADS>0
   906    934   /*
   907         -** Join thread p.
          935  +** Join thread pTask->thread.
   908    936   */
   909         -static int vdbeSorterJoinThread(SortSubtask *pTask, SorterThread *p){
          937  +static int vdbeSorterJoinThread(SortSubtask *pTask){
   910    938     int rc = SQLITE_OK;
   911         -  if( p->pThread ){
          939  +  if( pTask->pThread ){
   912    940   #ifdef SQLITE_DEBUG_SORTER_THREADS
   913         -    int bDone = p->bDone;
          941  +    int bDone = pTask->bDone;
   914    942   #endif
   915    943       void *pRet;
   916    944       vdbeSorterBlockDebug(pTask, !bDone, "enter");
   917         -    rc = sqlite3ThreadJoin(p->pThread, &pRet);
          945  +    rc = sqlite3ThreadJoin(pTask->pThread, &pRet);
   918    946       vdbeSorterBlockDebug(pTask, !bDone, "exit");
   919    947       if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
   920         -    assert( p->bDone==1 );
   921         -    p->bDone = 0;
   922         -    p->pThread = 0;
          948  +    assert( pTask->bDone==1 );
          949  +    pTask->bDone = 0;
          950  +    pTask->pThread = 0;
   923    951     }
   924    952     return rc;
   925    953   }
   926    954   
   927    955   /*
   928    956   ** Launch a background thread to run xTask(pIn).
   929    957   */
   930    958   static int vdbeSorterCreateThread(
   931         -  SorterThread *p,                /* Thread object to populate */
          959  +  SortSubtask *pTask,             /* Thread will use this task object */
   932    960     void *(*xTask)(void*),          /* Routine to run in a separate thread */
   933    961     void *pIn                       /* Argument passed into xTask() */
   934    962   ){
   935         -  assert( p->pThread==0 && p->bDone==0 );
   936         -  return sqlite3ThreadCreate(&p->pThread, xTask, pIn);
          963  +  assert( pTask->pThread==0 && pTask->bDone==0 );
          964  +  return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
   937    965   }
   938    966   
   939    967   /*
   940    968   ** Join all outstanding threads launched by SorterWrite() to create 
   941    969   ** level-0 PMAs.
   942    970   */
   943    971   static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
   944    972     int rc = rcin;
   945    973     int i;
   946    974     for(i=0; i<pSorter->nTask; i++){
   947    975       SortSubtask *pTask = &pSorter->aTask[i];
   948         -    int rc2 = vdbeSorterJoinThread(pTask, &pTask->thread);
          976  +    int rc2 = vdbeSorterJoinThread(pTask);
   949    977       if( rc==SQLITE_OK ) rc = rc2;
   950    978     }
   951    979     return rc;
   952    980   }
   953    981   #else
   954    982   # define vdbeSorterJoinAll(x,rcin) (rcin)
   955         -# define vdbeSorterJoinThread(pTask,p) SQLITE_OK
          983  +# define vdbeSorterJoinThread(pTask) SQLITE_OK
   956    984   #endif
   957    985   
   958    986   /*
   959    987   ** Allocate a new MergeEngine object with space for nIter iterators.
   960    988   */
   961    989   static MergeEngine *vdbeMergeEngineNew(int nIter){
   962    990     int N = 2;                      /* Smallest power of two >= nIter */
................................................................................
   985   1013     if( pMerger ){
   986   1014       for(i=0; i<pMerger->nTree; i++){
   987   1015         vdbePmaReaderClear(&pMerger->aIter[i]);
   988   1016       }
   989   1017     }
   990   1018     sqlite3_free(pMerger);
   991   1019   }
         1020  +
         1021  +/*
         1022  +** Free all resources associated with the IncrMerger object indicated by
         1023  +** the first argument.
         1024  +*/
         1025  +static void vdbeIncrFree(IncrMerger *pIncr){
         1026  +  if( pIncr ){
         1027  +#if SQLITE_MAX_WORKER_THREADS>0
         1028  +    if( pIncr->bUseThread ){
         1029  +      vdbeSorterJoinThread(pIncr->pTask);
         1030  +      if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
         1031  +      if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
         1032  +    }
         1033  +#endif
         1034  +    vdbeMergeEngineFree(pIncr->pMerger);
         1035  +    sqlite3_free(pIncr);
         1036  +  }
         1037  +}
   992   1038   
   993   1039   /*
   994   1040   ** Reset a sorting cursor back to its original empty state.
   995   1041   */
   996   1042   void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
   997   1043     int i;
   998   1044     (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
................................................................................
  1047   1093     if( rc==SQLITE_OK ){
  1048   1094       i64 max = SQLITE_MAX_MMAP_SIZE;
  1049   1095       sqlite3OsFileControlHint( *ppFile, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
  1050   1096     }
  1051   1097     return rc;
  1052   1098   }
  1053   1099   
         1100  +/*
         1101  +** If it has not already been allocated, allocate the UnpackedRecord 
         1102  +** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or 
         1103  +** if no allocation was required), or SQLITE_NOMEM otherwise.
         1104  +*/
  1054   1105   static int vdbeSortAllocUnpacked(SortSubtask *pTask){
  1055   1106     if( pTask->pUnpacked==0 ){
  1056   1107       char *pFree;
  1057   1108       pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
  1058         -        pTask->pKeyInfo, 0, 0, &pFree
         1109  +        pTask->pSorter->pKeyInfo, 0, 0, &pFree
  1059   1110       );
  1060   1111       assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
  1061   1112       if( pFree==0 ) return SQLITE_NOMEM;
  1062         -    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
         1113  +    pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField;
  1063   1114       pTask->pUnpacked->errCode = 0;
  1064   1115     }
  1065   1116     return SQLITE_OK;
  1066   1117   }
  1067   1118   
  1068   1119   
  1069   1120   /*
................................................................................
  1276   1327   **       in the PMA (not including the varint itself).
  1277   1328   **
  1278   1329   **     * One or more records packed end-to-end in order of ascending keys. 
  1279   1330   **       Each record consists of a varint followed by a blob of data (the 
  1280   1331   **       key). The varint is the number of bytes in the blob of data.
  1281   1332   */
  1282   1333   static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
         1334  +  sqlite3 *db = pTask->pSorter->db;
  1283   1335     int rc = SQLITE_OK;             /* Return code */
  1284   1336     PmaWriter writer;               /* Object used to write to the file */
  1285   1337   
  1286   1338   #ifdef SQLITE_DEBUG
  1287   1339     /* Set iSz to the expected size of file pTask->file after writing the PMA. 
  1288   1340     ** This is used by an assert() statement at the end of this function.  */
  1289   1341     i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
................................................................................
  1291   1343   
  1292   1344     vdbeSorterWorkDebug(pTask, "enter");
  1293   1345     memset(&writer, 0, sizeof(PmaWriter));
  1294   1346     assert( pList->szPMA>0 );
  1295   1347   
  1296   1348     /* If the first temporary PMA file has not been opened, open it now. */
  1297   1349     if( pTask->file.pFd==0 ){
  1298         -    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file.pFd);
         1350  +    rc = vdbeSorterOpenTempFile(db->pVfs, &pTask->file.pFd);
  1299   1351       assert( rc!=SQLITE_OK || pTask->file.pFd );
  1300   1352       assert( pTask->file.iEof==0 );
  1301   1353       assert( pTask->nPMA==0 );
  1302   1354     }
  1303   1355   
  1304   1356     /* Try to get the file to memory map */
  1305   1357     if( rc==SQLITE_OK ){
  1306         -    vdbeSorterExtendFile(pTask->db, 
  1307         -        pTask->file.pFd, pTask->file.iEof + pList->szPMA + 9
  1308         -    );
         1358  +    vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
  1309   1359     }
  1310   1360   
  1311   1361     /* Sort the list */
  1312   1362     if( rc==SQLITE_OK ){
  1313   1363       rc = vdbeSorterSort(pTask, pList);
  1314   1364     }
  1315   1365   
  1316   1366     if( rc==SQLITE_OK ){
  1317   1367       SorterRecord *p;
  1318   1368       SorterRecord *pNext = 0;
  1319   1369   
  1320         -    vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz,
         1370  +    vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
  1321   1371                         pTask->file.iEof);
  1322   1372       pTask->nPMA++;
  1323   1373       vdbePmaWriteVarint(&writer, pList->szPMA);
  1324   1374       for(p=pList->pList; p; p=pNext){
  1325   1375         pNext = p->u.pNext;
  1326   1376         vdbePmaWriteVarint(&writer, p->nVal);
  1327   1377         vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
................................................................................
  1409   1459       *pbEof = (pMerger->aIter[pMerger->aTree[1]].pFile==0);
  1410   1460     }
  1411   1461   
  1412   1462     return rc;
  1413   1463   }
  1414   1464   
  1415   1465   /*
  1416         -** The main routine for sorter-thread operations.
         1466  +** The main routine for background threads that write level-0 PMAs.
  1417   1467   */
  1418   1468   static void *vdbeSorterFlushThread(void *pCtx){
  1419   1469     SortSubtask *pTask = (SortSubtask*)pCtx;
  1420   1470     int rc;                         /* Return code */
  1421         -  assert( pTask->thread.bDone==0 );
         1471  +  assert( pTask->bDone==0 );
  1422   1472     rc = vdbeSorterListToPMA(pTask, &pTask->list);
  1423         -  pTask->thread.bDone = 1;
         1473  +  pTask->bDone = 1;
  1424   1474     return SQLITE_INT_TO_PTR(rc);
  1425   1475   }
  1426   1476   
  1427   1477   /*
  1428   1478   ** Flush the current contents of VdbeSorter.list to a new PMA, possibly
  1429   1479   ** using a background thread.
  1430   1480   */
................................................................................
  1449   1499     ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
  1450   1500     ** fall back to using the final sub-task. The first (pSorter->nTask-1)
  1451   1501     ** sub-tasks are prefered as they use background threads - the final 
  1452   1502     ** sub-task uses the main thread. */
  1453   1503     for(i=0; i<nWorker; i++){
  1454   1504       int iTest = (pSorter->iPrev + i + 1) % nWorker;
  1455   1505       pTask = &pSorter->aTask[iTest];
  1456         -    if( pTask->thread.bDone ){
  1457         -      rc = vdbeSorterJoinThread(pTask, &pTask->thread);
         1506  +    if( pTask->bDone ){
         1507  +      rc = vdbeSorterJoinThread(pTask);
  1458   1508       }
  1459         -    if( pTask->thread.pThread==0 || rc!=SQLITE_OK ) break;
         1509  +    if( pTask->pThread==0 || rc!=SQLITE_OK ) break;
  1460   1510     }
  1461   1511   
  1462   1512     if( rc==SQLITE_OK ){
  1463   1513       if( i==nWorker ){
  1464   1514         /* Use the foreground thread for this operation */
  1465   1515         rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
  1466   1516       }else{
  1467   1517         /* Launch a background thread for this operation */
  1468   1518         u8 *aMem = pTask->list.aMemory;
  1469   1519         void *pCtx = (void*)pTask;
  1470   1520   
  1471         -      assert( pTask->thread.pThread==0 && pTask->thread.bDone==0 );
         1521  +      assert( pTask->pThread==0 && pTask->bDone==0 );
  1472   1522         assert( pTask->list.pList==0 );
  1473   1523         assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
  1474   1524   
  1475   1525         pSorter->iPrev = (pTask - pSorter->aTask);
  1476   1526         pTask->list = pSorter->list;
  1477   1527         pSorter->list.pList = 0;
  1478   1528         pSorter->list.szPMA = 0;
................................................................................
  1480   1530           pSorter->list.aMemory = aMem;
  1481   1531           pSorter->nMemory = sqlite3MallocSize(aMem);
  1482   1532         }else{
  1483   1533           pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
  1484   1534           if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
  1485   1535         }
  1486   1536   
  1487         -      rc = vdbeSorterCreateThread(&pTask->thread, vdbeSorterFlushThread, pCtx);
         1537  +      rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
  1488   1538       }
  1489   1539     }
  1490   1540   
  1491   1541     return rc;
  1492   1542   #endif
  1493   1543   }
  1494   1544   
................................................................................
  1593   1643   ** except that the number-of-bytes varint is omitted from the start.
  1594   1644   */
  1595   1645   static int vdbeIncrPopulate(IncrMerger *pIncr){
  1596   1646     int rc = SQLITE_OK;
  1597   1647     int rc2;
  1598   1648     i64 iStart = pIncr->iStartOff;
  1599   1649     SorterFile *pOut = &pIncr->aFile[1];
         1650  +  SortSubtask *pTask = pIncr->pTask;
  1600   1651     MergeEngine *pMerger = pIncr->pMerger;
  1601   1652     PmaWriter writer;
  1602   1653     assert( pIncr->bEof==0 );
  1603   1654   
  1604         -  vdbeSorterPopulateDebug(pIncr->pTask, "enter");
         1655  +  vdbeSorterPopulateDebug(pTask, "enter");
  1605   1656   
  1606         -  vdbePmaWriterInit(pOut->pFd, &writer, pIncr->pTask->pgsz, iStart);
         1657  +  vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
  1607   1658     while( rc==SQLITE_OK ){
  1608   1659       int dummy;
  1609   1660       PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ];
  1610   1661       int nKey = pReader->nKey;
  1611   1662       i64 iEof = writer.iWriteOff + writer.iBufEnd;
  1612   1663   
  1613   1664       /* Check if the output file is full or if the input has been exhausted.
................................................................................
  1614   1665       ** In either case exit the loop. */
  1615   1666       if( pReader->pFile==0 ) break;
  1616   1667       if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
  1617   1668   
  1618   1669       /* Write the next key to the output. */
  1619   1670       vdbePmaWriteVarint(&writer, nKey);
  1620   1671       vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
  1621         -    rc = vdbeSorterNext(pIncr->pTask, pIncr->pMerger, &dummy);
         1672  +    rc = vdbeSorterNext(pTask, pIncr->pMerger, &dummy);
  1622   1673     }
  1623   1674   
  1624   1675     rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
  1625   1676     if( rc==SQLITE_OK ) rc = rc2;
  1626         -  vdbeSorterPopulateDebug(pIncr->pTask, "exit");
         1677  +  vdbeSorterPopulateDebug(pTask, "exit");
  1627   1678     return rc;
  1628   1679   }
  1629   1680   
         1681  +#if SQLITE_MAX_WORKER_THREADS>0
         1682  +/*
         1683  +** The main routine for background threads that populate aFile[1] of
         1684  +** multi-threaded IncrMerger objects.
         1685  +*/
  1630   1686   static void *vdbeIncrPopulateThread(void *pCtx){
  1631   1687     IncrMerger *pIncr = (IncrMerger*)pCtx;
  1632   1688     void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
  1633         -  pIncr->thread.bDone = 1;
         1689  +  pIncr->pTask->bDone = 1;
  1634   1690     return pRet;
  1635   1691   }
  1636   1692   
  1637         -#if SQLITE_MAX_WORKER_THREADS>0
         1693  +/*
         1694  +** Launch a background thread to populate aFile[1] of pIncr.
         1695  +*/
  1638   1696   static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  1639         -  void *pCtx = (void*)pIncr;
         1697  +  void *p = (void*)pIncr;
  1640   1698     assert( pIncr->bUseThread );
  1641         -  return vdbeSorterCreateThread(&pIncr->thread, vdbeIncrPopulateThread, pCtx);
         1699  +  return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p);
  1642   1700   }
  1643   1701   #endif
  1644   1702   
         1703  +/*
         1704  +** This function is called when the PmaReader corresponding to pIncr has
         1705  +** finished reading the contents of aFile[0]. Its purpose is to "refill"
         1706  +** aFile[0] such that the iterator should start rereading it from the
         1707  +** beginning.
         1708  +**
         1709  +** For single-threaded objects, this is accomplished by literally reading 
         1710  +** keys from pIncr->pMerger and repopulating aFile[0]. 
         1711  +**
         1712  +** For multi-threaded objects, all that is required is to wait until the 
         1713  +** background thread is finished (if it is not already) and then swap 
         1714  +** aFile[0] and aFile[1] in place. If the contents of pMerger have not
         1715  +** been exhausted, this function also launches a new background thread
         1716  +** to populate the new aFile[1].
         1717  +**
         1718  +** SQLITE_OK is returned on success, or an SQLite error code otherwise.
         1719  +*/
  1645   1720   static int vdbeIncrSwap(IncrMerger *pIncr){
  1646   1721     int rc = SQLITE_OK;
  1647   1722   
  1648   1723   #if SQLITE_MAX_WORKER_THREADS>0
  1649   1724     if( pIncr->bUseThread ){
  1650         -    rc = vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
         1725  +    rc = vdbeSorterJoinThread(pIncr->pTask);
  1651   1726   
  1652   1727       if( rc==SQLITE_OK ){
  1653   1728         SorterFile f0 = pIncr->aFile[0];
  1654   1729         pIncr->aFile[0] = pIncr->aFile[1];
  1655   1730         pIncr->aFile[1] = f0;
  1656   1731       }
  1657   1732   
................................................................................
  1671   1746         pIncr->bEof = 1;
  1672   1747       }
  1673   1748     }
  1674   1749   
  1675   1750     return rc;
  1676   1751   }
  1677   1752   
  1678         -static void vdbeIncrFree(IncrMerger *pIncr){
  1679         -  if( pIncr ){
  1680         -#if SQLITE_MAX_WORKER_THREADS>0
  1681         -    vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
  1682         -    if( pIncr->bUseThread ){
  1683         -      if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
  1684         -      if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
  1685         -    }
  1686         -#endif
  1687         -    vdbeMergeEngineFree(pIncr->pMerger);
  1688         -    sqlite3_free(pIncr);
  1689         -  }
  1690         -}
  1691         -
         1753  +/*
         1754  +** Allocate and return a new IncrMerger object to read data from pMerger.
         1755  +*/
  1692   1756   static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
  1693   1757     IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
  1694   1758     if( pIncr ){
  1695   1759       memset(pIncr, 0, sizeof(IncrMerger));
  1696   1760       pIncr->pMerger = pMerger;
  1697   1761       pIncr->pTask = pTask;
  1698   1762       pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
  1699   1763       pTask->file2.iEof += pIncr->mxSz;
  1700   1764     }
  1701   1765     return pIncr;
  1702   1766   }
  1703   1767   
         1768  +/*
         1769  +** Set the "use-threads" flag on object pIncr.
         1770  +*/
  1704   1771   static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
  1705   1772     if( bUseThread ){
  1706   1773       pIncr->bUseThread = 1;
  1707   1774       pIncr->pTask->file2.iEof -= pIncr->mxSz;
  1708   1775     }
  1709   1776   }
  1710   1777   
................................................................................
  1738   1805   }
  1739   1806   
  1740   1807   static int vdbeIncrInit2(PmaReader *pIter, int eMode){
  1741   1808     int rc = SQLITE_OK;
  1742   1809     IncrMerger *pIncr = pIter->pIncr;
  1743   1810     if( pIncr ){
  1744   1811       SortSubtask *pTask = pIncr->pTask;
         1812  +    sqlite3 *db = pTask->pSorter->db;
  1745   1813   
  1746   1814       rc = vdbeIncrInitMerger(pTask, pIncr->pMerger, eMode);
  1747   1815   
  1748   1816       /* Set up the required files for pIncr */
  1749   1817       if( rc==SQLITE_OK ){
  1750   1818         if( pIncr->bUseThread==0 ){
  1751   1819           if( pTask->file2.pFd==0 ){
  1752         -          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd);
         1820  +          rc = vdbeSorterOpenTempFile(db->pVfs, &pTask->file2.pFd);
  1753   1821             assert( pTask->file2.iEof>0 );
  1754   1822             if( rc==SQLITE_OK ){
  1755         -            vdbeSorterExtendFile(pTask->db,pTask->file2.pFd,pTask->file2.iEof);
         1823  +            vdbeSorterExtendFile(db, pTask->file2.pFd, pTask->file2.iEof);
  1756   1824               pTask->file2.iEof = 0;
  1757   1825             }
  1758   1826           }
  1759   1827           if( rc==SQLITE_OK ){
  1760   1828             pIncr->aFile[1].pFd = pTask->file2.pFd;
  1761   1829             pIncr->iStartOff = pTask->file2.iEof;
  1762   1830             pTask->file2.iEof += pIncr->mxSz;
  1763   1831           }
  1764   1832         }else{
  1765         -        rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
         1833  +        rc = vdbeSorterOpenTempFile(db->pVfs, &pIncr->aFile[0].pFd);
  1766   1834           if( rc==SQLITE_OK ){
  1767         -          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
         1835  +          rc = vdbeSorterOpenTempFile(db->pVfs, &pIncr->aFile[1].pFd);
  1768   1836           }
  1769   1837         }
  1770   1838       }
  1771   1839   
  1772   1840       if( rc==SQLITE_OK && pIncr->bUseThread ){
  1773   1841         /* Use the current thread */
  1774   1842         assert( eMode==INCRINIT2_ROOT || eMode==INCRINIT2_TASK );
................................................................................
  1782   1850     return rc;
  1783   1851   }
  1784   1852   
  1785   1853   #if SQLITE_MAX_WORKER_THREADS>0
  1786   1854   static void *vdbeIncrInit2Thread(void *pCtx){
  1787   1855     PmaReader *pReader = (PmaReader*)pCtx;
  1788   1856     void *pRet = SQLITE_INT_TO_PTR( vdbeIncrInit2(pReader, INCRINIT2_TASK) );
  1789         -  pReader->pIncr->thread.bDone = 1;
         1857  +  pReader->pIncr->pTask->bDone = 1;
  1790   1858     return pRet;
  1791   1859   }
  1792   1860   
  1793   1861   static int vdbeIncrBgInit2(PmaReader *pIter){
  1794   1862     void *pCtx = (void*)pIter;
  1795         -  return vdbeSorterCreateThread(
  1796         -      &pIter->pIncr->thread, vdbeIncrInit2Thread, pCtx
  1797         -  );
         1863  +  return vdbeSorterCreateThread(pIter->pIncr->pTask, vdbeIncrInit2Thread, pCtx);
  1798   1864   }
  1799   1865   #endif
  1800   1866   
  1801   1867   /*
  1802   1868   ** Allocate a new MergeEngine object to merge the contents of nPMA level-0
  1803   1869   ** PMAs from pTask->file. If no error occurs, set *ppOut to point to
  1804   1870   ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
................................................................................
  1881   1947   /*
  1882   1948   ** Populate iterator *pIter so that it may be used to iterate through all 
  1883   1949   ** keys stored in all PMAs created by this sorter.
  1884   1950   */
  1885   1951   static int vdbePmaReaderIncrInit(VdbeSorter *pSorter){
  1886   1952     SortSubtask *pTask0 = &pSorter->aTask[0];
  1887   1953     MergeEngine *pMain = 0;
  1888         -  sqlite3 *db = pTask0->db;
         1954  +  sqlite3 *db = pTask0->pSorter->db;
  1889   1955     int rc = SQLITE_OK;
  1890   1956     int iTask;
  1891   1957   
  1892   1958     IncrBuilder *aMerge;
  1893   1959     const int nMerge = 32;
  1894   1960     aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge);
  1895   1961     if( aMerge==0 ) return SQLITE_NOMEM;