/ Check-in [d03f5b86]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix further code and documentation issues in vdbesort.c.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | threads
Files: files | file ages | folders
SHA1: d03f5b8622d304f029f73c7cd0bee3182a81d081
User & Date: dan 2014-04-15 19:52:34
Context
2014-04-15
20:52
Fix some problems to do with OOM conditions in vdbesort.c. Some problems remain. check-in: 2f94f9ce user: dan tags: threads
19:52
Fix further code and documentation issues in vdbesort.c. check-in: d03f5b86 user: dan tags: threads
2014-04-14
19:23
Allow the sorter to begin returning data to the VDBE as soon as it is available, instead of waiting until all keys have been sorted. check-in: cb0ab20c user: dan tags: threads
Changes
Hide Diffs Unified Diffs Show Whitespace Changes Patch

Changes to src/vdbesort.c.

6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
..
53
54
55
56
57
58
59
60
61
62





63
64
65
66
67
68
69
70
71

72
73
74
75
76
77
78
79
80
81


82
83
84
85
86



















































87
88
89
90
91
92
93
..
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
...
264
265
266
267
268
269
270







































271
272
273
274
275
276
277
278
279
280
281
282


283
284
285


286
287
288
289
290
291
292
...
314
315
316
317
318
319
320


























321
322
323
324
325
326
327
328
329
330
331
...
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
...
527
528
529
530
531
532
533









534
535
536
537
538
539
540
541




542
543
544




545
546
547

548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592

593
594
595
596
597


598
599
600
601
602
603
604
...
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673

674
675
676
677
678
679
680
681
682
683
...
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
...
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798

799
800
801
802
803
804
805
806
807
808
809
810
...
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
...
985
986
987
988
989
990
991


















992
993
994
995
996
997
998
....
1047
1048
1049
1050
1051
1052
1053





1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
....
1276
1277
1278
1279
1280
1281
1282

1283
1284
1285
1286
1287
1288
1289
....
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
....
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
....
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
....
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
....
1593
1594
1595
1596
1597
1598
1599

1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
....
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629





1630
1631
1632
1633
1634
1635
1636
1637



1638
1639
1640
1641
1642
1643
1644

















1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
....
1671
1672
1673
1674
1675
1676
1677
1678

1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691

1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703



1704
1705
1706
1707
1708
1709
1710
....
1738
1739
1740
1741
1742
1743
1744

1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
....
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
....
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
** This file contains code for the VdbeSorter object, used in concert with
** a VdbeCursor to sort large numbers of keys for CREATE TABLE statements
** or by SELECT statements with ORDER BY clauses that cannot be satisfied
** using indexes and without LIMIT clauses.
**
** The VdbeSorter object implements a multi-threaded external merge sort
** algorithm that is efficient even if the number of element being sorted
** exceeds the available memory.
**
................................................................................
**
**    sqlite3VdbeSorterReset()      Refurbish the VdbeSorter for reuse.  This
**                                  is like Close() followed by Init() only
**                                  much faster.
**
** The interfaces above must be called in a particular order.  Write() can 
** only occur in between Init()/Reset() and Rewind().  Next(), Rowkey(), and
** Compare() can only occur in between Rewind() and Close()/Reset().
**
** Algorithm:





**
** Records to be sorted are initially held in memory, in the order in
** which they arrive from Write().  When the amount of memory needed exceeds
** a threshold, all in-memory records are sorted and then appended to
** a temporary file as a "Packed-Memory-Array" or "PMA" and the memory is
** reset.  There is a single temporary file used for all PMAs.  The PMAs
** are packed one after another in the file.  The VdbeSorter object keeps
** track of the number of PMAs written.
**

** When the Rewind() is seen, any records still held in memory are sorted.
** If no PMAs have been written (if all records are still held in memory)
** then subsequent Rowkey(), Next(), and Compare() operations work directly
** from memory.  But if PMAs have been written things get a little more
** complicated.
**
** When Rewind() is seen after PMAs have been written, any records still
** in memory are sorted and written as a final PMA.  Then all the PMAs
** are merged together into a single massive PMA that Next(), Rowkey(),
** and Compare() walk to extract the records in sorted order.


**
** If SQLITE_MAX_WORKER_THREADS is non-zero, various steps of the above
** algorithm might be performed in parallel by separate threads.  Threads
** are only used when one or more PMA spill to disk.  If the sort is small
** enough to fit entirely in memory, everything happens on the main thread.



















































*/
#include "sqliteInt.h"
#include "vdbeInt.h"

/* 
** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
** messages to stderr that may be helpful in understanding the performance
................................................................................
#endif

/*
** Private objects used by the sorter
*/
typedef struct MergeEngine MergeEngine;     /* Merge PMAs together */
typedef struct PmaReader PmaReader;         /* Incrementally read one PMA */
typedef struct PmaWriter PmaWriter;         /* Incrementally write on PMA */
typedef struct SorterRecord SorterRecord;   /* A record being sorted */
typedef struct SortSubtask SortSubtask;     /* A sub-task in the sort process */
typedef struct SorterFile SorterFile;
typedef struct SorterThread SorterThread;
typedef struct SorterList SorterList;
typedef struct IncrMerger IncrMerger;

/*
** A container for a temp file handle and the current amount of data 
** stored in the file.
*/
struct SorterFile {
  sqlite3_file *pFd;              /* File handle */
  i64 iEof;                       /* Bytes of data stored in pFd */
};

/*
** An object of this type is used to store the thread handle for each 
** background thread launched by the sorter. Before the thread is launched,
** variable bDone is set to 0. Then, right before it exits, the thread 
** itself sets bDone to 1.
**
** This is then used for two purposes:
**
**   1. When flushing the contents of memory to a level-0 PMA on disk, to
**      attempt to select a SortSubtask for which there is not already an
**      active background thread (since doing so causes the main thread
**      to block until it finishes).
**
**   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
**      to sqlite3ThreadJoin() is likely to block.
**
** In both cases, the effects of the main thread seeing (bDone==0) even
** after the thread has finished are not dire. So we don't worry about
** memory barriers and such here.
*/
struct SorterThread {
  SQLiteThread *pThread;
  int bDone;
};

struct SorterList {
  SorterRecord *pList;            /* Linked list of records */
  u8 *aMemory;                    /* If non-NULL, blob of memory for pList */
  int szPMA;                      /* Size of pList as PMA in bytes */
};

/*
** Sorting is divided up into smaller subtasks.  Each subtask is controlled
** by an instance of this object. A Subtask might run in either the main thread
** or in a background thread.
**
** Exactly VdbeSorter.nTask instances of this object are allocated
** as part of each VdbeSorter object. Instances are never allocated any other
** way. VdbeSorter.nTask is set to the number of worker threads allowed
** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
**
** When a background thread is launched to perform work, SortSubtask.bDone
** is set to 0 and the SortSubtask.pTask variable set to point to the
** thread handle. SortSubtask.bDone is set to 1 (to indicate to the main
** thread that joining SortSubtask.pTask will not block) before the thread
** exits. SortSubtask.pTask and bDone are always cleared after the 
** background thread has been joined.
**
** One object (specifically, VdbeSorter.aTask[VdbeSorter.nTask-1])
** is reserved for the foreground thread.
**
** The nature of the work performed is determined by SortSubtask.eWork,
** as follows:
**
**   SORT_SUBTASK_SORT:
**     Sort the linked list of records at SortSubtask.pList.
**
**   SORT_SUBTASK_TO_PMA:
**     Sort the linked list of records at SortSubtask.pList, and write
**     the results to a new PMA in temp file SortSubtask.pTemp1. Open
**     the temp file if it is not already open.
**
**   SORT_SUBTASK_CONS:
**     Merge existing PMAs until SortSubtask.nConsolidate or fewer
**     remain in temp file SortSubtask.pTemp1.
*/
struct SortSubtask {
  SorterThread thread;
  sqlite3 *db;                    /* Database connection */
  VdbeSorter *pSorter;            /* Sorter */
  KeyInfo *pKeyInfo;              /* How to compare records */
  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
  int pgsz;                       /* Main database page size */
  SorterList list;                /* List for thread to write to a PMA */
  int nPMA;                       /* Number of PMAs currently in file */
  SorterFile file;                /* Temp file for level-0 PMAs */
  SorterFile file2;               /* Space for other PMAs */
};


/*
** The MergeEngine object is used to combine two or more smaller PMAs into
** one big PMA using a merge operation.  Separate PMAs all need to be
** combined into one big PMA in order to be able to step through the sorted
** records in order.
**
................................................................................
*/
struct MergeEngine {
  int nTree;                 /* Used size of aTree/aIter (power of 2) */
  int *aTree;                /* Current state of incremental merge */
  PmaReader *aIter;          /* Array of iterators to merge data from */
};








































/*
** Main sorter structure. A single instance of this is allocated for each 
** sorter cursor created by the VDBE.
**
** mxKeysize:
**   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
**   this variable is updated so as to be set to the size on disk of the
**   largest record in the sorter.
*/
struct VdbeSorter {
  int mnPmaSize;                  /* Minimum PMA size, in bytes */
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */


  PmaReader *pReader;             /* Read data from here after Rewind() */
  MergeEngine *pMerger;           /* Or here, if bUseThreads==0 */
  int mxKeysize;                  /* Largest serialized key seen so far */


  UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
  SorterList list;                /* List of in-memory records */
  int iMemory;                    /* Offset of free space in list.aMemory */
  int nMemory;                    /* Size of list.aMemory allocation in bytes */
  u8 bUsePMA;                     /* True if one or more PMAs created */
  u8 bUseThreads;                 /* True to use background threads */
  u8 iPrev;                       /* Previous thread used to flush PMA */
................................................................................
};

/*
** Normally, a PmaReader object iterates through an existing PMA stored 
** within a temp file. However, if the PmaReader.pIncr variable points to
** an object of the following type, it may be used to iterate/merge through
** multiple PMAs simultaneously.


























*/
struct IncrMerger {
  SortSubtask *pTask;             /* Task that owns this merger */
  SorterThread thread;            /* Thread for populating aFile[1] */
  MergeEngine *pMerger;           /* Merge engine thread reads data from */
  i64 iStartOff;                  /* Offset to start writing file at */
  int mxSz;                       /* Maximum bytes of data to store */
  int bEof;                       /* Set to true when merge is finished */
  int bUseThread;                 /* True to use a bg thread for this object */
  SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
};
................................................................................

/*
** This object is the header on a single record while that record is being
** held in memory and prior to being written out as part of a PMA.
**
** How the linked list is connected depends on how memory is being managed
** by this module. If using a separate allocation for each in-memory record
** (VdbeSorter.aMemory==0), then the list is always connected using the
** SorterRecord.u.pNext pointers.
**
** Or, if using the single large allocation method (VdbeSorter.aMemory!=0),
** then while records are being accumulated the list is linked using the
** SorterRecord.u.iNext offset. This is because the aMemory[] array may
** be sqlite3Realloc()ed while records are being accumulated. Once the VM
** has finished passing records to the sorter, or when the in-memory buffer
** is full, the list is sorted. As part of the sorting process, it is
** converted to use the SorterRecord.u.pNext pointers. See function
** vdbeSorterSort() for details.
................................................................................
      sqlite3GetVarint(aVarint, pnOut);
    }
  }

  return SQLITE_OK;
}










static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
  int rc = SQLITE_OK;
  if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
    rc = sqlite3OsFetch(pFile->pFd, 0, pFile->iEof, (void**)pp);
  }
  return rc;
}





static int vdbePmaReaderReinit(PmaReader *pIter){
  IncrMerger *pIncr = pIter->pIncr;
  SortSubtask *pTask = pIncr->pTask;




  int rc = SQLITE_OK;

  assert( pIncr->bEof==0 );


  if( pIter->aMap ){
    sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
    pIter->aMap = 0;
  }
  pIter->iReadOff = pIncr->iStartOff;
  pIter->iEof = pIncr->aFile[0].iEof;
  pIter->pFile = pIncr->aFile[0].pFd;

  rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap);
  if( rc==SQLITE_OK ){
    if( pIter->aMap==0 ){
      /* TODO: Combine this code with similar code in vdbePmaReaderInit() */
      int iBuf = pIter->iReadOff % pTask->pgsz;
      if( pIter->aBuffer==0 ){
        pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
        if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
        pIter->nBuffer = pTask->pgsz;
      }
      if( iBuf ){
        int nRead = pTask->pgsz - iBuf;
        if( (pIter->iReadOff + nRead) > pIter->iEof ){
          nRead = (int)(pIter->iEof - pIter->iReadOff);
        }
        rc = sqlite3OsRead(
            pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
        );
        assert( rc!=SQLITE_IOERR_SHORT_READ );
      }
    }
  }

  return rc;
}


/*
** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if
** no error occurs, or an SQLite error code if one does.
*/
static int vdbePmaReaderNext(PmaReader *pIter){
  int rc = SQLITE_OK;             /* Return Code */
  u64 nRec = 0;                   /* Size of record in bytes */

  if( pIter->iReadOff>=pIter->iEof ){

    int bEof = 1;
    if( pIter->pIncr ){
      rc = vdbeIncrSwap(pIter->pIncr);
      if( rc==SQLITE_OK && pIter->pIncr->bEof==0 ){
        rc = vdbePmaReaderReinit(pIter);


        bEof = 0;
      }
    }

    if( bEof ){
      /* This is an EOF condition */
      vdbePmaReaderClear(pIter);
................................................................................
static int vdbePmaReaderInit(
  SortSubtask *pTask,             /* Task context */
  SorterFile *pFile,              /* Sorter file to read from */
  i64 iStart,                     /* Start offset in pFile */
  PmaReader *pIter,               /* Iterator to populate */
  i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
){
  int rc = SQLITE_OK;
  int nBuf = pTask->pgsz;

  assert( pFile->iEof>iStart );
  assert( pIter->aAlloc==0 );
  assert( pIter->aBuffer==0 );
  pIter->pFile = pFile->pFd;
  pIter->iReadOff = iStart;
  pIter->nAlloc = 128;
  pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
  if( pIter->aAlloc ){
    /* Try to xFetch() a mapping of the entire temp file. If this is possible,
    ** the PMA will be read via the mapping. Otherwise, use xRead().  */
    rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);
  }else{
    rc = SQLITE_NOMEM;
  }

  if( rc==SQLITE_OK && pIter->aMap==0 ){
    pIter->nBuffer = nBuf;
    pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
    if( !pIter->aBuffer ){
      rc = SQLITE_NOMEM;
    }else{
      int iBuf = iStart % nBuf;
      if( iBuf ){
        int nRead = nBuf - iBuf;
        if( (iStart + nRead) > pFile->iEof ){
          nRead = (int)(pFile->iEof - iStart);
        }
        rc = sqlite3OsRead(
            pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
        );
        assert( rc!=SQLITE_IOERR_SHORT_READ );
      }
    }
  }


  if( rc==SQLITE_OK ){
    u64 nByte;                    /* Size of PMA in bytes */
    pIter->iEof = pFile->iEof;
    rc = vdbePmaReadVarint(pIter, &nByte);
    pIter->iEof = pIter->iReadOff + nByte;
    *pnByte += nByte;
  }

  if( rc==SQLITE_OK ){
    rc = vdbePmaReaderNext(pIter);
................................................................................
static int vdbeSorterCompare(
  SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
  const void *pKey1, int nKey1,   /* Left side of comparison */
  const void *pKey2, int nKey2    /* Right side of comparison */
){
  UnpackedRecord *r2 = pTask->pUnpacked;
  if( pKey2 ){
    sqlite3VdbeRecordUnpack(pTask->pKeyInfo, nKey2, pKey2, r2);
  }
  return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);
}

/*
** This function is called to compare two iterator keys when merging 
** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
................................................................................
  sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);

  pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
  pCsr->pSorter = pSorter;
  if( pSorter==0 ){
    rc = SQLITE_NOMEM;
  }else{
    pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
    memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
    pKeyInfo->db = 0;
    if( nField && nWorker==0 ) pKeyInfo->nField = nField;
    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);

    pSorter->nTask = nWorker + 1;
    pSorter->bUseThreads = (pSorter->nTask>1);

    for(i=0; i<pSorter->nTask; i++){
      SortSubtask *pTask = &pSorter->aTask[i];
      pTask->pKeyInfo = pKeyInfo;
      pTask->pgsz = pgsz;
      pTask->db = db;
      pTask->pSorter = pSorter;
    }

    if( !sqlite3TempInMemory(db) ){
      pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
      mxCache = db->aDb[0].pSchema->cache_size;
      if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
................................................................................
# define vdbeSorterRewindDebug(x,y)
# define vdbeSorterPopulateDebug(x,y)
# define vdbeSorterBlockDebug(x,y,z)
#endif

#if SQLITE_MAX_WORKER_THREADS>0
/*
** Join thread p.
*/
static int vdbeSorterJoinThread(SortSubtask *pTask, SorterThread *p){
  int rc = SQLITE_OK;
  if( p->pThread ){
#ifdef SQLITE_DEBUG_SORTER_THREADS
    int bDone = p->bDone;
#endif
    void *pRet;
    vdbeSorterBlockDebug(pTask, !bDone, "enter");
    rc = sqlite3ThreadJoin(p->pThread, &pRet);
    vdbeSorterBlockDebug(pTask, !bDone, "exit");
    if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
    assert( p->bDone==1 );
    p->bDone = 0;
    p->pThread = 0;
  }
  return rc;
}

/*
** Launch a background thread to run xTask(pIn).
*/
static int vdbeSorterCreateThread(
  SorterThread *p,                /* Thread object to populate */
  void *(*xTask)(void*),          /* Routine to run in a separate thread */
  void *pIn                       /* Argument passed into xTask() */
){
  assert( p->pThread==0 && p->bDone==0 );
  return sqlite3ThreadCreate(&p->pThread, xTask, pIn);
}

/*
** Join all outstanding threads launched by SorterWrite() to create 
** level-0 PMAs.
*/
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
  int rc = rcin;
  int i;
  for(i=0; i<pSorter->nTask; i++){
    SortSubtask *pTask = &pSorter->aTask[i];
    int rc2 = vdbeSorterJoinThread(pTask, &pTask->thread);
    if( rc==SQLITE_OK ) rc = rc2;
  }
  return rc;
}
#else
# define vdbeSorterJoinAll(x,rcin) (rcin)
# define vdbeSorterJoinThread(pTask,p) SQLITE_OK
#endif

/*
** Allocate a new MergeEngine object with space for nIter iterators.
*/
static MergeEngine *vdbeMergeEngineNew(int nIter){
  int N = 2;                      /* Smallest power of two >= nIter */
................................................................................
  if( pMerger ){
    for(i=0; i<pMerger->nTree; i++){
      vdbePmaReaderClear(&pMerger->aIter[i]);
    }
  }
  sqlite3_free(pMerger);
}



















/*
** Reset a sorting cursor back to its original empty state.
*/
void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
  int i;
  (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
................................................................................
  if( rc==SQLITE_OK ){
    i64 max = SQLITE_MAX_MMAP_SIZE;
    sqlite3OsFileControlHint( *ppFile, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
  }
  return rc;
}






static int vdbeSortAllocUnpacked(SortSubtask *pTask){
  if( pTask->pUnpacked==0 ){
    char *pFree;
    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
        pTask->pKeyInfo, 0, 0, &pFree
    );
    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
    if( pFree==0 ) return SQLITE_NOMEM;
    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
    pTask->pUnpacked->errCode = 0;
  }
  return SQLITE_OK;
}


/*
................................................................................
**       in the PMA (not including the varint itself).
**
**     * One or more records packed end-to-end in order of ascending keys. 
**       Each record consists of a varint followed by a blob of data (the 
**       key). The varint is the number of bytes in the blob of data.
*/
static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){

  int rc = SQLITE_OK;             /* Return code */
  PmaWriter writer;               /* Object used to write to the file */

#ifdef SQLITE_DEBUG
  /* Set iSz to the expected size of file pTask->file after writing the PMA. 
  ** This is used by an assert() statement at the end of this function.  */
  i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
................................................................................

  vdbeSorterWorkDebug(pTask, "enter");
  memset(&writer, 0, sizeof(PmaWriter));
  assert( pList->szPMA>0 );

  /* If the first temporary PMA file has not been opened, open it now. */
  if( pTask->file.pFd==0 ){
    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file.pFd);
    assert( rc!=SQLITE_OK || pTask->file.pFd );
    assert( pTask->file.iEof==0 );
    assert( pTask->nPMA==0 );
  }

  /* Try to get the file to memory map */
  if( rc==SQLITE_OK ){
    vdbeSorterExtendFile(pTask->db, 
        pTask->file.pFd, pTask->file.iEof + pList->szPMA + 9
    );
  }

  /* Sort the list */
  if( rc==SQLITE_OK ){
    rc = vdbeSorterSort(pTask, pList);
  }

  if( rc==SQLITE_OK ){
    SorterRecord *p;
    SorterRecord *pNext = 0;

    vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz,
                      pTask->file.iEof);
    pTask->nPMA++;
    vdbePmaWriteVarint(&writer, pList->szPMA);
    for(p=pList->pList; p; p=pNext){
      pNext = p->u.pNext;
      vdbePmaWriteVarint(&writer, p->nVal);
      vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
................................................................................
    *pbEof = (pMerger->aIter[pMerger->aTree[1]].pFile==0);
  }

  return rc;
}

/*
** The main routine for sorter-thread operations.
*/
static void *vdbeSorterFlushThread(void *pCtx){
  SortSubtask *pTask = (SortSubtask*)pCtx;
  int rc;                         /* Return code */
  assert( pTask->thread.bDone==0 );
  rc = vdbeSorterListToPMA(pTask, &pTask->list);
  pTask->thread.bDone = 1;
  return SQLITE_INT_TO_PTR(rc);
}

/*
** Flush the current contents of VdbeSorter.list to a new PMA, possibly
** using a background thread.
*/
................................................................................
  ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
  ** fall back to using the final sub-task. The first (pSorter->nTask-1)
  ** sub-tasks are prefered as they use background threads - the final 
  ** sub-task uses the main thread. */
  for(i=0; i<nWorker; i++){
    int iTest = (pSorter->iPrev + i + 1) % nWorker;
    pTask = &pSorter->aTask[iTest];
    if( pTask->thread.bDone ){
      rc = vdbeSorterJoinThread(pTask, &pTask->thread);
    }
    if( pTask->thread.pThread==0 || rc!=SQLITE_OK ) break;
  }

  if( rc==SQLITE_OK ){
    if( i==nWorker ){
      /* Use the foreground thread for this operation */
      rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
    }else{
      /* Launch a background thread for this operation */
      u8 *aMem = pTask->list.aMemory;
      void *pCtx = (void*)pTask;

      assert( pTask->thread.pThread==0 && pTask->thread.bDone==0 );
      assert( pTask->list.pList==0 );
      assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );

      pSorter->iPrev = (pTask - pSorter->aTask);
      pTask->list = pSorter->list;
      pSorter->list.pList = 0;
      pSorter->list.szPMA = 0;
................................................................................
        pSorter->list.aMemory = aMem;
        pSorter->nMemory = sqlite3MallocSize(aMem);
      }else{
        pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
        if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
      }

      rc = vdbeSorterCreateThread(&pTask->thread, vdbeSorterFlushThread, pCtx);
    }
  }

  return rc;
#endif
}

................................................................................
** except that the number-of-bytes varint is omitted from the start.
*/
static int vdbeIncrPopulate(IncrMerger *pIncr){
  int rc = SQLITE_OK;
  int rc2;
  i64 iStart = pIncr->iStartOff;
  SorterFile *pOut = &pIncr->aFile[1];

  MergeEngine *pMerger = pIncr->pMerger;
  PmaWriter writer;
  assert( pIncr->bEof==0 );

  vdbeSorterPopulateDebug(pIncr->pTask, "enter");

  vdbePmaWriterInit(pOut->pFd, &writer, pIncr->pTask->pgsz, iStart);
  while( rc==SQLITE_OK ){
    int dummy;
    PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ];
    int nKey = pReader->nKey;
    i64 iEof = writer.iWriteOff + writer.iBufEnd;

    /* Check if the output file is full or if the input has been exhausted.
................................................................................
    ** In either case exit the loop. */
    if( pReader->pFile==0 ) break;
    if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;

    /* Write the next key to the output. */
    vdbePmaWriteVarint(&writer, nKey);
    vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
    rc = vdbeSorterNext(pIncr->pTask, pIncr->pMerger, &dummy);
  }

  rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
  if( rc==SQLITE_OK ) rc = rc2;
  vdbeSorterPopulateDebug(pIncr->pTask, "exit");
  return rc;
}






static void *vdbeIncrPopulateThread(void *pCtx){
  IncrMerger *pIncr = (IncrMerger*)pCtx;
  void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
  pIncr->thread.bDone = 1;
  return pRet;
}

#if SQLITE_MAX_WORKER_THREADS>0



static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  void *pCtx = (void*)pIncr;
  assert( pIncr->bUseThread );
  return vdbeSorterCreateThread(&pIncr->thread, vdbeIncrPopulateThread, pCtx);
}
#endif


















static int vdbeIncrSwap(IncrMerger *pIncr){
  int rc = SQLITE_OK;

#if SQLITE_MAX_WORKER_THREADS>0
  if( pIncr->bUseThread ){
    rc = vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);

    if( rc==SQLITE_OK ){
      SorterFile f0 = pIncr->aFile[0];
      pIncr->aFile[0] = pIncr->aFile[1];
      pIncr->aFile[1] = f0;
    }

................................................................................
      pIncr->bEof = 1;
    }
  }

  return rc;
}

static void vdbeIncrFree(IncrMerger *pIncr){

  if( pIncr ){
#if SQLITE_MAX_WORKER_THREADS>0
    vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
    if( pIncr->bUseThread ){
      if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
      if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
    }
#endif
    vdbeMergeEngineFree(pIncr->pMerger);
    sqlite3_free(pIncr);
  }
}


static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
  IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
  if( pIncr ){
    memset(pIncr, 0, sizeof(IncrMerger));
    pIncr->pMerger = pMerger;
    pIncr->pTask = pTask;
    pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
    pTask->file2.iEof += pIncr->mxSz;
  }
  return pIncr;
}




static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
  if( bUseThread ){
    pIncr->bUseThread = 1;
    pIncr->pTask->file2.iEof -= pIncr->mxSz;
  }
}

................................................................................
}

static int vdbeIncrInit2(PmaReader *pIter, int eMode){
  int rc = SQLITE_OK;
  IncrMerger *pIncr = pIter->pIncr;
  if( pIncr ){
    SortSubtask *pTask = pIncr->pTask;


    rc = vdbeIncrInitMerger(pTask, pIncr->pMerger, eMode);

    /* Set up the required files for pIncr */
    if( rc==SQLITE_OK ){
      if( pIncr->bUseThread==0 ){
        if( pTask->file2.pFd==0 ){
          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd);
          assert( pTask->file2.iEof>0 );
          if( rc==SQLITE_OK ){
            vdbeSorterExtendFile(pTask->db,pTask->file2.pFd,pTask->file2.iEof);
            pTask->file2.iEof = 0;
          }
        }
        if( rc==SQLITE_OK ){
          pIncr->aFile[1].pFd = pTask->file2.pFd;
          pIncr->iStartOff = pTask->file2.iEof;
          pTask->file2.iEof += pIncr->mxSz;
        }
      }else{
        rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
        if( rc==SQLITE_OK ){
          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
        }
      }
    }

    if( rc==SQLITE_OK && pIncr->bUseThread ){
      /* Use the current thread */
      assert( eMode==INCRINIT2_ROOT || eMode==INCRINIT2_TASK );
................................................................................
  return rc;
}

#if SQLITE_MAX_WORKER_THREADS>0
static void *vdbeIncrInit2Thread(void *pCtx){
  PmaReader *pReader = (PmaReader*)pCtx;
  void *pRet = SQLITE_INT_TO_PTR( vdbeIncrInit2(pReader, INCRINIT2_TASK) );
  pReader->pIncr->thread.bDone = 1;
  return pRet;
}

static int vdbeIncrBgInit2(PmaReader *pIter){
  void *pCtx = (void*)pIter;
  return vdbeSorterCreateThread(
      &pIter->pIncr->thread, vdbeIncrInit2Thread, pCtx
  );
}
#endif

/*
** Allocate a new MergeEngine object to merge the contents of nPMA level-0
** PMAs from pTask->file. If no error occurs, set *ppOut to point to
** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
................................................................................
/*
** Populate iterator *pIter so that it may be used to iterate through all 
** keys stored in all PMAs created by this sorter.
*/
static int vdbePmaReaderIncrInit(VdbeSorter *pSorter){
  SortSubtask *pTask0 = &pSorter->aTask[0];
  MergeEngine *pMain = 0;
  sqlite3 *db = pTask0->db;
  int rc = SQLITE_OK;
  int iTask;

  IncrBuilder *aMerge;
  const int nMerge = 32;
  aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge);
  if( aMerge==0 ) return SQLITE_NOMEM;







|







 







|

|
>
>
>
>
>

<
<
<
<
<
|
<

>
|
|
|
|
|

|
|
|
|
>
>

<
<
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|


|
<
|












|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<

<
<
<
<
<





<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>












>
>


<
>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>



<







 







|


|







 







>
>
>
>
>
>
>
>
>


|





>
>
>
>
|
<
|
>
>
>
>


<
>





|
|
|

|
|
|
<
|
|
|
|
|
|
|
|









<



<










>

|
|
|
|
>
>







 







|
<


|

<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
<
<
<
<
<
<
<
<
|
<
<
<
<
<
<
<
<
>


<







 







|







 







|



|
<


>


<
<
<







 







|

|

|

|



|


|
|
|








|



|
|











|






|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>
>
>
>




|



|







 







>







 







|







<
|
<











|







 







|




|

|







 







|
|

|











|







 







|







 







>




|

|







 







|




|



>
>
>
>
>



|



<
>
>
>

|

|



>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>





|







 







<
>
|
<
<
<
<
<
<
<
<
<
<
<
<
>












>
>
>







 







>







|


|









|

|







 







|





|
<
<







 







|







6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
..
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68





69

70
71
72
73
74
75
76
77
78
79
80
81
82
83
84


85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
...
149
150
151
152
153
154
155
156
157
158
159

160
161
162
163
164
165
166
167
168
169
170
171
172
173

















174





175
176
177
178
179

















































180
181
182
183
184
185
186
...
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304

305
306
307
308
309
310
311
312
313
...
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370

371
372
373
374
375
376
377
...
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
...
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601

602
603
604
605
606
607
608

609
610
611
612
613
614
615
616
617
618
619
620
621

622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638

639
640
641

642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
...
691
692
693
694
695
696
697
698

699
700
701
702












703










704








705
706
707

708
709
710
711
712
713
714
...
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
...
815
816
817
818
819
820
821
822
823
824
825
826

827
828
829
830
831



832
833
834
835
836
837
838
...
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
....
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
....
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
....
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
....
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357

1358

1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
....
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
....
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
....
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
....
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
....
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692

1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
....
1746
1747
1748
1749
1750
1751
1752

1753
1754












1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
....
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
....
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863


1864
1865
1866
1867
1868
1869
1870
....
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
** This file contains code for the VdbeSorter object, used in concert with
** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
** or by SELECT statements with ORDER BY clauses that cannot be satisfied
** using indexes and without LIMIT clauses.
**
** The VdbeSorter object implements a multi-threaded external merge sort
** algorithm that is efficient even if the number of element being sorted
** exceeds the available memory.
**
................................................................................
**
**    sqlite3VdbeSorterReset()      Refurbish the VdbeSorter for reuse.  This
**                                  is like Close() followed by Init() only
**                                  much faster.
**
** The interfaces above must be called in a particular order.  Write() can 
** only occur in between Init()/Reset() and Rewind().  Next(), Rowkey(), and
** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
**
**   Init()
**   for each record: Write()
**   Rewind()
**     Rowkey()/Compare()
**   Next() 
**   Close()
**





** Algorithm:

**
** Records passed to the sorter via calls to Write() are initially held 
** unsorted in main memory. Assuming the amount of memory used never exceeds
** a threshold, when Rewind() is called the set of records is sorted using
** an in-memory merge sort. In this case, no temporary files are required
** and subsequent calls to Rowkey(), Next() and Compare() read records 
** directly from main memory.
**
** If the amount of space used to store records in main memory exceeds the
** threshold, then the set of records currently in memory are sorted and
** written to a temporary file in "Packed Memory Array" (PMA) format.
** A PMA created at this point is known as a "level-0 PMA". Higher levels
** of PMAs may be created by merging existing PMAs together - for example
** merging two or more level-0 PMAs together creates a level-1 PMA.
**


** The threshold for the amount of main memory to use before flushing 
** records to a PMA is roughly the same as the limit configured for the
** page-cache of the main database. Specifically, the threshold is set to 
** the value returned multiplied by "PRAGMA main.page_size" multipled by 
** that returned by "PRAGMA main.cache_size", in bytes.
**
** If the sorter is running in single-threaded mode, then all PMAs generated
** are appended to a single temporary file. Or, if the sorter is running in
** multi-threaded mode then up to (N+1) temporary files may be opened, where
** N is the configured number of worker threads. In this case, instead of
** sorting the records and writing the PMA to a temporary file itself, the
** calling thread usually launches a worker thread to do so. Except, if
** there are already N worker threads running, the main thread does the work
** itself.
**
** The sorter is running in multi-threaded mode if (a) the library was built
** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
** than zero, and (b) worker threads have been enabled at runtime by calling
** sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, ...).
**
** When Rewind() is called, any data remaining in memory is flushed to a 
** final PMA. So at this point the data is stored in some number of sorted
** PMAs within temporary files on disk. Within a single file sorter is 
** running in single threaded mode, or distributed between one or more files
** for multi-threaded sorters.
**
** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
** sorter is running in single-threaded mode, then these PMAs are merged
** incrementally as keys are retreived from the sorter by the VDBE. See
** comments above object MergeEngine below for details.
**
** Or, if running in multi-threaded mode, then a background thread is
** launched to merge the existing PMAs. Once the background thread has
** merged T bytes of data into a single sorted PMA, the main thread 
** begins reading keys from that PMA while the background thread proceeds
** with merging the next T bytes of data. And so on.
**
** Parameter T is set to half the value of the memory threshold used 
** by Write() above to determine when to create a new PMA.
**
** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when 
** Rewind() is called, then a hierarchy of incremental-merges is used. 
** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on 
** disk are merged together. Then T bytes of data from the second set, and
** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
** PMAs at a time. This done is to improve locality.
**
** If running in multi-threaded mode and there are more than
** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
** than one background thread may be created. Specifically, there may be
** one background thread for each temporary file on disk, and one background
** thread to merge the output of each of the others to a single PMA for
** the main thread to read from.
*/
#include "sqliteInt.h"
#include "vdbeInt.h"

/* 
** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
** messages to stderr that may be helpful in understanding the performance
................................................................................
#endif

/*
** Private objects used by the sorter
*/
typedef struct MergeEngine MergeEngine;     /* Merge PMAs together */
typedef struct PmaReader PmaReader;         /* Incrementally read one PMA */
typedef struct PmaWriter PmaWriter;         /* Incrementally write one PMA */
typedef struct SorterRecord SorterRecord;   /* A record being sorted */
typedef struct SortSubtask SortSubtask;     /* A sub-task in the sort process */
typedef struct SorterFile SorterFile;       /* Temporary file object wrapper */

typedef struct SorterList SorterList;       /* In-memory list of records */
typedef struct IncrMerger IncrMerger;

/*
** A container for a temp file handle and the current amount of data 
** stored in the file.
*/
struct SorterFile {
  sqlite3_file *pFd;              /* File handle */
  i64 iEof;                       /* Bytes of data stored in pFd */
};

/*
** In memory linked list of records.

















*/





struct SorterList {
  SorterRecord *pList;            /* Linked list of records */
  u8 *aMemory;                    /* If non-NULL, blob of memory for pList */
  int szPMA;                      /* Size of pList as PMA in bytes */
};


















































/*
** The MergeEngine object is used to combine two or more smaller PMAs into
** one big PMA using a merge operation.  Separate PMAs all need to be
** combined into one big PMA in order to be able to step through the sorted
** records in order.
**
................................................................................
*/
struct MergeEngine {
  int nTree;                 /* Used size of aTree/aIter (power of 2) */
  int *aTree;                /* Current state of incremental merge */
  PmaReader *aIter;          /* Array of iterators to merge data from */
};

/*
** Exactly VdbeSorter.nTask instances of this object are allocated
** as part of each VdbeSorter object. Instances are never allocated any
** other way. VdbeSorter.nTask is set to the number of worker threads allowed
** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
**
** Essentially, this structure contains all those fields of the VdbeSorter
** structure for which each thread requires a separate instance. For example,
** each thread requries its own UnpackedRecord object to unpack records in
** as part of comparison operations.
**
** Before a background thread is launched, variable bDone is set to 0. Then, 
** right before it exits, the thread itself sets bDone to 1. This is used for 
** two purposes:
**
**   1. When flushing the contents of memory to a level-0 PMA on disk, to
**      attempt to select a SortSubtask for which there is not already an
**      active background thread (since doing so causes the main thread
**      to block until it finishes).
**
**   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
**      to sqlite3ThreadJoin() is likely to block. Cases that are likely to
**      block provoke debugging output.
**
** In both cases, the effects of the main thread seeing (bDone==0) even
** after the thread has finished are not dire. So we don't worry about
** memory barriers and such here.
*/
struct SortSubtask {
  SQLiteThread *pThread;          /* Background thread, if any */
  int bDone;                      /* Set if thread is finished but not joined */
  VdbeSorter *pSorter;            /* Sorter that owns this sub-task */
  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
  SorterList list;                /* List for thread to write to a PMA */
  int nPMA;                       /* Number of PMAs currently in file */
  SorterFile file;                /* Temp file for level-0 PMAs */
  SorterFile file2;               /* Space for other PMAs */
};

/*
** Main sorter structure. A single instance of this is allocated for each 
** sorter cursor created by the VDBE.
**
** mxKeysize:
**   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
**   this variable is updated so as to be set to the size on disk of the
**   largest record in the sorter.
*/
struct VdbeSorter {
  int mnPmaSize;                  /* Minimum PMA size, in bytes */
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  int mxKeysize;                  /* Largest serialized key seen so far */
  int pgsz;                       /* Main database page size */
  PmaReader *pReader;             /* Read data from here after Rewind() */
  MergeEngine *pMerger;           /* Or here, if bUseThreads==0 */

  sqlite3 *db;                    /* Database connection */
  KeyInfo *pKeyInfo;              /* How to compare records */
  UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
  SorterList list;                /* List of in-memory records */
  int iMemory;                    /* Offset of free space in list.aMemory */
  int nMemory;                    /* Size of list.aMemory allocation in bytes */
  u8 bUsePMA;                     /* True if one or more PMAs created */
  u8 bUseThreads;                 /* True to use background threads */
  u8 iPrev;                       /* Previous thread used to flush PMA */
................................................................................
};

/*
** Normally, a PmaReader object iterates through an existing PMA stored 
** within a temp file. However, if the PmaReader.pIncr variable points to
** an object of the following type, it may be used to iterate/merge through
** multiple PMAs simultaneously.
**
** There are two types of IncrMerger object - single (bUseThread==0) and 
** multi-threaded (bUseThread==1). 
**
** A multi-threaded IncrMerger object uses two temporary files - aFile[0] 
** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in 
** size. When the IncrMerger is initialized, it reads enough data from 
** pMerger to populate aFile[0]. It then sets variables within the 
** corresponding PmaReader object to read from that file and kicks off 
** a background thread to populate aFile[1] with the next mxSz bytes of 
** sorted record data from pMerger. 
**
** When the PmaReader reaches the end of aFile[0], it blocks until the
** background thread has finished populating aFile[1]. It then exchanges
** the contents of the aFile[0] and aFile[1] variables within this structure,
** sets the PmaReader fields to read from the new aFile[0] and kicks off
** another background thread to populate the new aFile[1]. And so on, until
** the contents of pMerger are exhausted.
**
** A single-threaded IncrMerger does not open any temporary files of its
** own. Instead, it has exclusive access to mxSz bytes of space beginning
** at offset iStartOff of file pTask->file2. And instead of using a 
** background thread to prepare data for the PmaReader, with a single
** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with
** keys from pMerger by the calling thread whenever the PmaReader runs out
** of data.
*/
struct IncrMerger {
  SortSubtask *pTask;             /* Task that owns this merger */

  MergeEngine *pMerger;           /* Merge engine thread reads data from */
  i64 iStartOff;                  /* Offset to start writing file at */
  int mxSz;                       /* Maximum bytes of data to store */
  int bEof;                       /* Set to true when merge is finished */
  int bUseThread;                 /* True to use a bg thread for this object */
  SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
};
................................................................................

/*
** This object is the header on a single record while that record is being
** held in memory and prior to being written out as part of a PMA.
**
** How the linked list is connected depends on how memory is being managed
** by this module. If using a separate allocation for each in-memory record
** (VdbeSorter.list.aMemory==0), then the list is always connected using the
** SorterRecord.u.pNext pointers.
**
** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
** then while records are being accumulated the list is linked using the
** SorterRecord.u.iNext offset. This is because the aMemory[] array may
** be sqlite3Realloc()ed while records are being accumulated. Once the VM
** has finished passing records to the sorter, or when the in-memory buffer
** is full, the list is sorted. As part of the sorting process, it is
** converted to use the SorterRecord.u.pNext pointers. See function
** vdbeSorterSort() for details.
................................................................................
      sqlite3GetVarint(aVarint, pnOut);
    }
  }

  return SQLITE_OK;
}

/*
** Attempt to memory map file pFile. If successful, set *pp to point to the
** new mapping and return SQLITE_OK. If the mapping is not attempted 
** (because the file is too large or the VFS layer is configured not to use
** mmap), return SQLITE_OK and set *pp to NULL.
**
** Or, if an error occurs, return an SQLite error code. The final value of
** *pp is undefined in this case.
*/
static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
  int rc = SQLITE_OK;
  if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
    rc = sqlite3OsFetch(pFile->pFd, 0, pFile->iEof, (void**)pp);
  }
  return rc;
}

/*
** Seek iterator pIter to offset iOff within file pFile. Return SQLITE_OK 
** if successful, or an SQLite error code if an error occurs.
*/
static int vdbePmaReaderSeek(

  SortSubtask *pTask,             /* Task context */
  PmaReader *pIter,               /* Iterate to populate */
  SorterFile *pFile,              /* Sorter file to read from */
  i64 iOff                        /* Offset in pFile */
){
  int rc = SQLITE_OK;


  assert( pIter->pIncr==0 || pIter->pIncr->bEof==0 );

  if( pIter->aMap ){
    sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
    pIter->aMap = 0;
  }
  pIter->iReadOff = iOff;
  pIter->iEof = pFile->iEof;
  pIter->pFile = pFile->pFd;

  rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);
  if( rc==SQLITE_OK && pIter->aMap==0 ){
    int pgsz = pTask->pSorter->pgsz;

    int iBuf = pIter->iReadOff % pgsz;
    if( pIter->aBuffer==0 ){
      pIter->aBuffer = (u8*)sqlite3Malloc(pgsz);
      if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
      pIter->nBuffer = pgsz;
    }
    if( iBuf ){
      int nRead = pgsz - iBuf;
      if( (pIter->iReadOff + nRead) > pIter->iEof ){
        nRead = (int)(pIter->iEof - pIter->iReadOff);
      }
      rc = sqlite3OsRead(
          pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
      );
      assert( rc!=SQLITE_IOERR_SHORT_READ );
    }
  }


  return rc;
}


/*
** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if
** no error occurs, or an SQLite error code if one does.
*/
static int vdbePmaReaderNext(PmaReader *pIter){
  int rc = SQLITE_OK;             /* Return Code */
  u64 nRec = 0;                   /* Size of record in bytes */

  if( pIter->iReadOff>=pIter->iEof ){
    IncrMerger *pIncr = pIter->pIncr;
    int bEof = 1;
    if( pIncr ){
      rc = vdbeIncrSwap(pIncr);
      if( rc==SQLITE_OK && pIncr->bEof==0 ){
        rc = vdbePmaReaderSeek(
            pIncr->pTask, pIter, &pIncr->aFile[0], pIncr->iStartOff
        );
        bEof = 0;
      }
    }

    if( bEof ){
      /* This is an EOF condition */
      vdbePmaReaderClear(pIter);
................................................................................
static int vdbePmaReaderInit(
  SortSubtask *pTask,             /* Task context */
  SorterFile *pFile,              /* Sorter file to read from */
  i64 iStart,                     /* Start offset in pFile */
  PmaReader *pIter,               /* Iterator to populate */
  i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
){
  int rc;


  assert( pFile->iEof>iStart );
  assert( pIter->aAlloc==0 && pIter->nAlloc==0 );
  assert( pIter->aBuffer==0 );












  assert( pIter->aMap==0 );



















  rc = vdbePmaReaderSeek(pTask, pIter, pFile, iStart);
  if( rc==SQLITE_OK ){
    u64 nByte;                    /* Size of PMA in bytes */

    rc = vdbePmaReadVarint(pIter, &nByte);
    pIter->iEof = pIter->iReadOff + nByte;
    *pnByte += nByte;
  }

  if( rc==SQLITE_OK ){
    rc = vdbePmaReaderNext(pIter);
................................................................................
static int vdbeSorterCompare(
  SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
  const void *pKey1, int nKey1,   /* Left side of comparison */
  const void *pKey2, int nKey2    /* Right side of comparison */
){
  UnpackedRecord *r2 = pTask->pUnpacked;
  if( pKey2 ){
    sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
  }
  return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);
}

/*
** This function is called to compare two iterator keys when merging 
** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
................................................................................
  sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);

  pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
  pCsr->pSorter = pSorter;
  if( pSorter==0 ){
    rc = SQLITE_NOMEM;
  }else{
    pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
    memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
    pKeyInfo->db = 0;
    if( nField && nWorker==0 ) pKeyInfo->nField = nField;
    pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);

    pSorter->nTask = nWorker + 1;
    pSorter->bUseThreads = (pSorter->nTask>1);
    pSorter->db = db;
    for(i=0; i<pSorter->nTask; i++){
      SortSubtask *pTask = &pSorter->aTask[i];



      pTask->pSorter = pSorter;
    }

    if( !sqlite3TempInMemory(db) ){
      pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
      mxCache = db->aDb[0].pSchema->cache_size;
      if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
................................................................................
# define vdbeSorterRewindDebug(x,y)
# define vdbeSorterPopulateDebug(x,y)
# define vdbeSorterBlockDebug(x,y,z)
#endif

#if SQLITE_MAX_WORKER_THREADS>0
/*
** Join thread pTask->thread.
*/
static int vdbeSorterJoinThread(SortSubtask *pTask){
  int rc = SQLITE_OK;
  if( pTask->pThread ){
#ifdef SQLITE_DEBUG_SORTER_THREADS
    int bDone = pTask->bDone;
#endif
    void *pRet;
    vdbeSorterBlockDebug(pTask, !bDone, "enter");
    rc = sqlite3ThreadJoin(pTask->pThread, &pRet);
    vdbeSorterBlockDebug(pTask, !bDone, "exit");
    if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
    assert( pTask->bDone==1 );
    pTask->bDone = 0;
    pTask->pThread = 0;
  }
  return rc;
}

/*
** Launch a background thread to run xTask(pIn).
*/
static int vdbeSorterCreateThread(
  SortSubtask *pTask,             /* Thread will use this task object */
  void *(*xTask)(void*),          /* Routine to run in a separate thread */
  void *pIn                       /* Argument passed into xTask() */
){
  assert( pTask->pThread==0 && pTask->bDone==0 );
  return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
}

/*
** Join all outstanding threads launched by SorterWrite() to create 
** level-0 PMAs.
*/
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
  int rc = rcin;
  int i;
  for(i=0; i<pSorter->nTask; i++){
    SortSubtask *pTask = &pSorter->aTask[i];
    int rc2 = vdbeSorterJoinThread(pTask);
    if( rc==SQLITE_OK ) rc = rc2;
  }
  return rc;
}
#else
# define vdbeSorterJoinAll(x,rcin) (rcin)
# define vdbeSorterJoinThread(pTask) SQLITE_OK
#endif

/*
** Allocate a new MergeEngine object with space for nIter iterators.
*/
static MergeEngine *vdbeMergeEngineNew(int nIter){
  int N = 2;                      /* Smallest power of two >= nIter */
................................................................................
  if( pMerger ){
    for(i=0; i<pMerger->nTree; i++){
      vdbePmaReaderClear(&pMerger->aIter[i]);
    }
  }
  sqlite3_free(pMerger);
}

/*
** Free all resources associated with the IncrMerger object indicated by
** the first argument.
*/
static void vdbeIncrFree(IncrMerger *pIncr){
  if( pIncr ){
#if SQLITE_MAX_WORKER_THREADS>0
    if( pIncr->bUseThread ){
      vdbeSorterJoinThread(pIncr->pTask);
      if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
      if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
    }
#endif
    vdbeMergeEngineFree(pIncr->pMerger);
    sqlite3_free(pIncr);
  }
}

/*
** Reset a sorting cursor back to its original empty state.
*/
void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
  int i;
  (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
................................................................................
  if( rc==SQLITE_OK ){
    i64 max = SQLITE_MAX_MMAP_SIZE;
    sqlite3OsFileControlHint( *ppFile, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
  }
  return rc;
}

/*
** If it has not already been allocated, allocate the UnpackedRecord 
** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or 
** if no allocation was required), or SQLITE_NOMEM otherwise.
*/
static int vdbeSortAllocUnpacked(SortSubtask *pTask){
  if( pTask->pUnpacked==0 ){
    char *pFree;
    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
        pTask->pSorter->pKeyInfo, 0, 0, &pFree
    );
    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
    if( pFree==0 ) return SQLITE_NOMEM;
    pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField;
    pTask->pUnpacked->errCode = 0;
  }
  return SQLITE_OK;
}


/*
................................................................................
**       in the PMA (not including the varint itself).
**
**     * One or more records packed end-to-end in order of ascending keys. 
**       Each record consists of a varint followed by a blob of data (the 
**       key). The varint is the number of bytes in the blob of data.
*/
static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
  sqlite3 *db = pTask->pSorter->db;
  int rc = SQLITE_OK;             /* Return code */
  PmaWriter writer;               /* Object used to write to the file */

#ifdef SQLITE_DEBUG
  /* Set iSz to the expected size of file pTask->file after writing the PMA. 
  ** This is used by an assert() statement at the end of this function.  */
  i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
................................................................................

  vdbeSorterWorkDebug(pTask, "enter");
  memset(&writer, 0, sizeof(PmaWriter));
  assert( pList->szPMA>0 );

  /* If the first temporary PMA file has not been opened, open it now. */
  if( pTask->file.pFd==0 ){
    rc = vdbeSorterOpenTempFile(db->pVfs, &pTask->file.pFd);
    assert( rc!=SQLITE_OK || pTask->file.pFd );
    assert( pTask->file.iEof==0 );
    assert( pTask->nPMA==0 );
  }

  /* Try to get the file to memory map */
  if( rc==SQLITE_OK ){

    vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);

  }

  /* Sort the list */
  if( rc==SQLITE_OK ){
    rc = vdbeSorterSort(pTask, pList);
  }

  if( rc==SQLITE_OK ){
    SorterRecord *p;
    SorterRecord *pNext = 0;

    vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
                      pTask->file.iEof);
    pTask->nPMA++;
    vdbePmaWriteVarint(&writer, pList->szPMA);
    for(p=pList->pList; p; p=pNext){
      pNext = p->u.pNext;
      vdbePmaWriteVarint(&writer, p->nVal);
      vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
................................................................................
    *pbEof = (pMerger->aIter[pMerger->aTree[1]].pFile==0);
  }

  return rc;
}

/*
** The main routine for background threads that write level-0 PMAs.
*/
static void *vdbeSorterFlushThread(void *pCtx){
  SortSubtask *pTask = (SortSubtask*)pCtx;
  int rc;                         /* Return code */
  assert( pTask->bDone==0 );
  rc = vdbeSorterListToPMA(pTask, &pTask->list);
  pTask->bDone = 1;
  return SQLITE_INT_TO_PTR(rc);
}

/*
** Flush the current contents of VdbeSorter.list to a new PMA, possibly
** using a background thread.
*/
................................................................................
  ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
  ** fall back to using the final sub-task. The first (pSorter->nTask-1)
  ** sub-tasks are prefered as they use background threads - the final 
  ** sub-task uses the main thread. */
  for(i=0; i<nWorker; i++){
    int iTest = (pSorter->iPrev + i + 1) % nWorker;
    pTask = &pSorter->aTask[iTest];
    if( pTask->bDone ){
      rc = vdbeSorterJoinThread(pTask);
    }
    if( pTask->pThread==0 || rc!=SQLITE_OK ) break;
  }

  if( rc==SQLITE_OK ){
    if( i==nWorker ){
      /* Use the foreground thread for this operation */
      rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
    }else{
      /* Launch a background thread for this operation */
      u8 *aMem = pTask->list.aMemory;
      void *pCtx = (void*)pTask;

      assert( pTask->pThread==0 && pTask->bDone==0 );
      assert( pTask->list.pList==0 );
      assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );

      pSorter->iPrev = (pTask - pSorter->aTask);
      pTask->list = pSorter->list;
      pSorter->list.pList = 0;
      pSorter->list.szPMA = 0;
................................................................................
        pSorter->list.aMemory = aMem;
        pSorter->nMemory = sqlite3MallocSize(aMem);
      }else{
        pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
        if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
      }

      rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
    }
  }

  return rc;
#endif
}

................................................................................
** except that the number-of-bytes varint is omitted from the start.
*/
static int vdbeIncrPopulate(IncrMerger *pIncr){
  int rc = SQLITE_OK;
  int rc2;
  i64 iStart = pIncr->iStartOff;
  SorterFile *pOut = &pIncr->aFile[1];
  SortSubtask *pTask = pIncr->pTask;
  MergeEngine *pMerger = pIncr->pMerger;
  PmaWriter writer;
  assert( pIncr->bEof==0 );

  vdbeSorterPopulateDebug(pTask, "enter");

  vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
  while( rc==SQLITE_OK ){
    int dummy;
    PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ];
    int nKey = pReader->nKey;
    i64 iEof = writer.iWriteOff + writer.iBufEnd;

    /* Check if the output file is full or if the input has been exhausted.
................................................................................
    ** In either case exit the loop. */
    if( pReader->pFile==0 ) break;
    if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;

    /* Write the next key to the output. */
    vdbePmaWriteVarint(&writer, nKey);
    vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
    rc = vdbeSorterNext(pTask, pIncr->pMerger, &dummy);
  }

  rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
  if( rc==SQLITE_OK ) rc = rc2;
  vdbeSorterPopulateDebug(pTask, "exit");
  return rc;
}

#if SQLITE_MAX_WORKER_THREADS>0
/*
** The main routine for background threads that populate aFile[1] of
** multi-threaded IncrMerger objects.
*/
static void *vdbeIncrPopulateThread(void *pCtx){
  IncrMerger *pIncr = (IncrMerger*)pCtx;
  void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
  pIncr->pTask->bDone = 1;
  return pRet;
}


/*
** Launch a background thread to populate aFile[1] of pIncr.
*/
static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  void *p = (void*)pIncr;
  assert( pIncr->bUseThread );
  return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p);
}
#endif

/*
** This function is called when the PmaReader corresponding to pIncr has
** finished reading the contents of aFile[0]. Its purpose is to "refill"
** aFile[0] such that the iterator should start rereading it from the
** beginning.
**
** For single-threaded objects, this is accomplished by literally reading 
** keys from pIncr->pMerger and repopulating aFile[0]. 
**
** For multi-threaded objects, all that is required is to wait until the 
** background thread is finished (if it is not already) and then swap 
** aFile[0] and aFile[1] in place. If the contents of pMerger have not
** been exhausted, this function also launches a new background thread
** to populate the new aFile[1].
**
** SQLITE_OK is returned on success, or an SQLite error code otherwise.
*/
static int vdbeIncrSwap(IncrMerger *pIncr){
  int rc = SQLITE_OK;

#if SQLITE_MAX_WORKER_THREADS>0
  if( pIncr->bUseThread ){
    rc = vdbeSorterJoinThread(pIncr->pTask);

    if( rc==SQLITE_OK ){
      SorterFile f0 = pIncr->aFile[0];
      pIncr->aFile[0] = pIncr->aFile[1];
      pIncr->aFile[1] = f0;
    }

................................................................................
      pIncr->bEof = 1;
    }
  }

  return rc;
}


/*
** Allocate and return a new IncrMerger object to read data from pMerger.












*/
static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
  IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
  if( pIncr ){
    memset(pIncr, 0, sizeof(IncrMerger));
    pIncr->pMerger = pMerger;
    pIncr->pTask = pTask;
    pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
    pTask->file2.iEof += pIncr->mxSz;
  }
  return pIncr;
}

/*
** Set the "use-threads" flag on object pIncr.
*/
static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
  if( bUseThread ){
    pIncr->bUseThread = 1;
    pIncr->pTask->file2.iEof -= pIncr->mxSz;
  }
}

................................................................................
}

static int vdbeIncrInit2(PmaReader *pIter, int eMode){
  int rc = SQLITE_OK;
  IncrMerger *pIncr = pIter->pIncr;
  if( pIncr ){
    SortSubtask *pTask = pIncr->pTask;
    sqlite3 *db = pTask->pSorter->db;

    rc = vdbeIncrInitMerger(pTask, pIncr->pMerger, eMode);

    /* Set up the required files for pIncr */
    if( rc==SQLITE_OK ){
      if( pIncr->bUseThread==0 ){
        if( pTask->file2.pFd==0 ){
          rc = vdbeSorterOpenTempFile(db->pVfs, &pTask->file2.pFd);
          assert( pTask->file2.iEof>0 );
          if( rc==SQLITE_OK ){
            vdbeSorterExtendFile(db, pTask->file2.pFd, pTask->file2.iEof);
            pTask->file2.iEof = 0;
          }
        }
        if( rc==SQLITE_OK ){
          pIncr->aFile[1].pFd = pTask->file2.pFd;
          pIncr->iStartOff = pTask->file2.iEof;
          pTask->file2.iEof += pIncr->mxSz;
        }
      }else{
        rc = vdbeSorterOpenTempFile(db->pVfs, &pIncr->aFile[0].pFd);
        if( rc==SQLITE_OK ){
          rc = vdbeSorterOpenTempFile(db->pVfs, &pIncr->aFile[1].pFd);
        }
      }
    }

    if( rc==SQLITE_OK && pIncr->bUseThread ){
      /* Use the current thread */
      assert( eMode==INCRINIT2_ROOT || eMode==INCRINIT2_TASK );
................................................................................
  return rc;
}

#if SQLITE_MAX_WORKER_THREADS>0
static void *vdbeIncrInit2Thread(void *pCtx){
  PmaReader *pReader = (PmaReader*)pCtx;
  void *pRet = SQLITE_INT_TO_PTR( vdbeIncrInit2(pReader, INCRINIT2_TASK) );
  pReader->pIncr->pTask->bDone = 1;
  return pRet;
}

static int vdbeIncrBgInit2(PmaReader *pIter){
  void *pCtx = (void*)pIter;
  return vdbeSorterCreateThread(pIter->pIncr->pTask, vdbeIncrInit2Thread, pCtx);


}
#endif

/*
** Allocate a new MergeEngine object to merge the contents of nPMA level-0
** PMAs from pTask->file. If no error occurs, set *ppOut to point to
** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
................................................................................
/*
** Populate iterator *pIter so that it may be used to iterate through all 
** keys stored in all PMAs created by this sorter.
*/
static int vdbePmaReaderIncrInit(VdbeSorter *pSorter){
  SortSubtask *pTask0 = &pSorter->aTask[0];
  MergeEngine *pMain = 0;
  sqlite3 *db = pTask0->pSorter->db;
  int rc = SQLITE_OK;
  int iTask;

  IncrBuilder *aMerge;
  const int nMerge = 32;
  aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge);
  if( aMerge==0 ) return SQLITE_NOMEM;