SQLite

Check-in [98bf0307b1]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Avoid having the sorter merge too many PMAs at a time when incrementally merging data following a SorterRewind().
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | threads-experimental
Files: files | file ages | folders
SHA1: 98bf0307b121b0776a7170108cc8d3f948a7ebfe
User & Date: dan 2014-04-11 19:43:07.755
Context
2014-04-12
19:34
Fix many issues with new code. (check-in: 62c406a042 user: dan tags: threads-experimental)
2014-04-11
19:43
Avoid having the sorter merge too many PMAs at a time when incrementally merging data following a SorterRewind(). (check-in: 98bf0307b1 user: dan tags: threads-experimental)
2014-04-09
20:04
Experimental multi-threaded sorting changes to allow the sorter to begin returning items to the VDBE before all data is sorted. (check-in: f9d5e09afa user: dan tags: threads-experimental)
Changes
Side-by-Side Diff Ignore Whitespace Patch
Changes to src/shell.c.
3531
3532
3533
3534
3535
3536
3537
3538

3539
3540
3541
3542
3543
3544
3545
3531
3532
3533
3534
3535
3536
3537

3538
3539
3540
3541
3542
3543
3544
3545







-
+







  memcpy(data->separator,"|", 2);
  data->showHeader = 0;
  sqlite3_config(SQLITE_CONFIG_URI, 1);
  sqlite3_config(SQLITE_CONFIG_LOG, shellLog, data);
  sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> ");
  sqlite3_snprintf(sizeof(continuePrompt), continuePrompt,"   ...> ");
  sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
  sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 3);
  sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 4);
}

/*
** Output text to the console in a font that attracts extra attention.
*/
#ifdef _WIN32
static void printBold(const char *zText){
Changes to src/vdbesort.c.
160
161
162
163
164
165
166
167


168
169
170
171
172
173
174
160
161
162
163
164
165
166

167
168
169
170
171
172
173
174
175







-
+
+







  u8 eWork;                       /* One of the SORT_SUBTASK_* constants */
  int nConsolidate;               /* For SORT_SUBTASK_CONS, max final PMAs */
  SorterRecord *pList;            /* List of records for pTask to sort */
  int nInMemory;                  /* Expected size of PMA based on pList */
  u8 *aListMemory;                /* Records memory (or NULL) */

  int nPMA;                       /* Number of PMAs currently in file */
  SorterFile file;
  SorterFile file;                /* Temp file for level-0 PMAs */
  SorterFile file2;               /* Space for other PMAs */
};


/*
** The MergeEngine object is used to combine two or more smaller PMAs into
** one big PMA using a merge operation.  Separate PMAs all need to be
** combined into one big PMA in order to be able to step through the sorted
236
237
238
239
240
241
242





243
244
245
246
247
248
249
250
251

252
253
254
255
256
257
258
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265







+
+
+
+
+









+







  int *aTree;                /* Current state of incremental merge */
  PmaReader *aIter;          /* Array of iterators to merge data from */
};

/*
** Main sorter structure. A single instance of this is allocated for each 
** sorter cursor created by the VDBE.
**
** mxKeysize:
**   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
**   this variable is updated so as to be set to the size on disk of the
**   largest record in the sorter.
*/
struct VdbeSorter {
  int nInMemory;                  /* Current size of pRecord list as PMA */
  int mnPmaSize;                  /* Minimum PMA size, in bytes */
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  int bUsePMA;                    /* True if one or more PMAs created */
  int bUseThreads;                /* True if one or more PMAs created */
  SorterRecord *pRecord;          /* Head of in-memory record list */
  PmaReader *pReader;             /* Read data from here after Rewind() */
  int mxKeysize;                  /* Largest serialized key seen so far */
  UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
  u8 *aMemory;                    /* Block of memory to alloc records from */
  int iMemory;                    /* Offset of first free byte in aMemory */
  int nMemory;                    /* Size of aMemory allocation in bytes */
  int iPrev;                      /* Previous thread used to flush PMA */
  int nTask;                      /* Size of aTask[] array */
  SortSubtask aTask[1];           /* One or more subtasks */
273
274
275
276
277
278
279






280
281
282




283

284
285
286
287
288
289
290
291
292
293
280
281
282
283
284
285
286
287
288
289
290
291
292
293

294
295
296
297
298
299
300
301


302
303
304
305
306
307
308







+
+
+
+
+
+

-

+
+
+
+

+

-
-







  u8 *aKey;                       /* Pointer to current key */
  u8 *aBuffer;                    /* Current read buffer */
  int nBuffer;                    /* Size of read buffer in bytes */
  u8 *aMap;                       /* Pointer to mapping of entire file */
  IncrMerger *pIncr;              /* Incremental merger */
};

/*
** Normally, a PmaReader object iterates through an existing PMA stored 
** within a temp file. However, if the PmaReader.pIncr variable points to
** an object of the following type, it may be used to iterate/merge through
** multiple PMAs simultaneously.
*/
struct IncrMerger {
  int mxSz;                       /* Maximum size of files */
  SortSubtask *pTask;             /* Task that owns this merger */
  SQLiteThread *pThread;          /* Thread currently populating aFile[1] */
  MergeEngine *pMerger;           /* Merge engine thread reads data from */
  i64 iStartOff;                  /* Offset to start writing file at */
  int mxSz;                       /* Maximum bytes of data to store */
  int bEof;                       /* Set to true when merge is finished */
  int bUseThread;                 /* True to use a bg thread for this object */
  SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
  MergeEngine *pMerger;           /* Merge engine thread reads data from */
  SQLiteThread *pThread;          /* Thread currently populating aFile[1] */
};

/*
** An instance of this object is used for writing a PMA.
**
** The PMA is written one record at a time.  Each record is of an arbitrary
** size.  But I/O is more efficient if it occurs in page-sized blocks where
502
503
504
505
506
507
508
509

510
511
512
513
514
515
516
517
518


















519
520
521
522
523
524
525
517
518
519
520
521
522
523

524
525
526
527
528
529




530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554







-
+





-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+








  assert( pIncr->bEof==0 );

  if( pIter->aMap ){
    sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
    pIter->aMap = 0;
  }
  pIter->iReadOff = 0;
  pIter->iReadOff = pIncr->iStartOff;
  pIter->iEof = pIncr->aFile[0].iEof;
  pIter->pFile = pIncr->aFile[0].pFd;

  rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap);
  if( rc==SQLITE_OK ){
    if( pIter->aMap==0 && pIter->aBuffer==0 ){
      pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
      if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
      pIter->nBuffer = pTask->pgsz;
    if( pIter->aMap==0 ){
      /* TODO: Combine this code with similar code in vdbePmaReaderInit() */
      int iBuf = pIter->iReadOff % pTask->pgsz;
      if( pIter->aBuffer==0 ){
        pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
        if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
        pIter->nBuffer = pTask->pgsz;
      }
      if( iBuf ){
        int nRead = pTask->pgsz - iBuf;
        if( (pIter->iReadOff + nRead) > pIter->iEof ){
          nRead = (int)(pIter->iEof - pIter->iReadOff);
        }
        rc = sqlite3OsRead(
            pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
        );
        assert( rc!=SQLITE_IOERR_SHORT_READ );
      }
    }
  }

  return rc;
}


573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592

593
594
595
596
597
598
599

600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617















618
619
620
621
622
623
624
625
602
603
604
605
606
607
608

609
610
611
612
613
614
615
616
617
618
619

620


621
622
623
624

625


















626
627
628
629
630
631
632
633
634
635
636
637
638
639
640

641
642
643
644
645
646
647







-











-
+
-
-




-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-







  SorterFile *pFile,              /* Sorter file to read from */
  i64 iStart,                     /* Start offset in pFile */
  PmaReader *pIter,               /* Iterator to populate */
  i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
){
  int rc = SQLITE_OK;
  int nBuf = pTask->pgsz;
  void *pMap = 0;                 /* Mapping of temp file */

  assert( pFile->iEof>iStart );
  assert( pIter->aAlloc==0 );
  assert( pIter->aBuffer==0 );
  pIter->pFile = pFile->pFd;
  pIter->iReadOff = iStart;
  pIter->nAlloc = 128;
  pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
  if( pIter->aAlloc ){
    /* Try to xFetch() a mapping of the entire temp file. If this is possible,
    ** the PMA will be read via the mapping. Otherwise, use xRead().  */
    if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
    rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);
      rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iEof, &pMap);
    }
  }else{
    rc = SQLITE_NOMEM;
  }

  if( rc==SQLITE_OK ){
  if( rc==SQLITE_OK && pIter->aMap==0 ){
    if( pMap ){
      pIter->aMap = (u8*)pMap;
    }else{
      pIter->nBuffer = nBuf;
      pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
      if( !pIter->aBuffer ){
        rc = SQLITE_NOMEM;
      }else{
        int iBuf = iStart % nBuf;
        if( iBuf ){
          int nRead = nBuf - iBuf;
          if( (iStart + nRead) > pFile->iEof ){
            nRead = (int)(pFile->iEof - iStart);
          }
          rc = sqlite3OsRead(
              pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
          );
          assert( rc!=SQLITE_IOERR_SHORT_READ );
    pIter->nBuffer = nBuf;
    pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
    if( !pIter->aBuffer ){
      rc = SQLITE_NOMEM;
    }else{
      int iBuf = iStart % nBuf;
      if( iBuf ){
        int nRead = nBuf - iBuf;
        if( (iStart + nRead) > pFile->iEof ){
          nRead = (int)(pFile->iEof - iStart);
        }
        rc = sqlite3OsRead(
            pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
        );
        assert( rc!=SQLITE_IOERR_SHORT_READ );
        }
      }
    }
  }

  if( rc==SQLITE_OK ){
    u64 nByte;                    /* Size of PMA in bytes */
    pIter->iEof = pFile->iEof;
801
802
803
804
805
806
807





808
809
810
811
812
813
814
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841







+
+
+
+
+







  }
  pTask->pList = 0;
  if( pTask->file.pFd ){
    sqlite3OsCloseFree(pTask->file.pFd);
    pTask->file.pFd = 0;
    pTask->file.iEof = 0;
  }
  if( pTask->file2.pFd ){
    sqlite3OsCloseFree(pTask->file2.pFd);
    pTask->file2.pFd = 0;
    pTask->file2.iEof = 0;
  }
}

/*
** Join all threads.  
*/
#if SQLITE_MAX_WORKER_THREADS>0
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
835
836
837
838
839
840
841
842

843
844
845
846
847
848
849
862
863
864
865
866
867
868

869
870
871
872
873
874
875
876







-
+







** Allocate a new MergeEngine object with space for nIter iterators.
*/
static MergeEngine *vdbeMergeEngineNew(int nIter){
  int N = 2;                      /* Smallest power of two >= nIter */
  int nByte;                      /* Total bytes of space to allocate */
  MergeEngine *pNew;              /* Pointer to allocated object to return */

  /* assert( nIter<=SORTER_MAX_MERGE_COUNT ); */
  assert( nIter<=SORTER_MAX_MERGE_COUNT );

  while( N<nIter ) N += N;
  nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));

  pNew = (MergeEngine*)sqlite3MallocZero(nByte);
  if( pNew ){
    pNew->nTree = N;
884
885
886
887
888
889
890

891
892
893
894
895
896
897
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925







+







  if( pSorter->aMemory==0 ){
    vdbeSorterRecordFree(0, pSorter->pRecord);
  }
  pSorter->pRecord = 0;
  pSorter->nInMemory = 0;
  pSorter->bUsePMA = 0;
  pSorter->iMemory = 0;
  pSorter->mxKeysize = 0;
  sqlite3DbFree(db, pSorter->pUnpacked);
  pSorter->pUnpacked = 0;
}

/*
** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
*/
1255
1256
1257
1258
1259
1260
1261









1262
1263
1264

1265














1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284

1285
1286
1287
1288
1289
1290

1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333



1334






1335




1336
1337
1338
1339
1340
1341
1342







+
+
+
+
+
+
+
+
+



+

+
+
+
+
+
+
+
+
+
+
+
+
+
+
















-
-
-
+
-
-
-
-
-
-
+
-
-
-
-







  fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
}
static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
  i64 t;
  sqlite3OsCurrentTimeInt64(db->pVfs, &t);
  fprintf(stderr, "%lld:X %s\n", t, zEvent);
}
static void vdbeSorterPopulateDebug(
  SortSubtask *pTask,
  const char *zEvent
){
  i64 t;
  int iTask = (pTask - pTask->pSorter->aTask);
  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
  fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
}
#else
# define vdbeSorterWorkDebug(x,y)
# define vdbeSorterRewindDebug(x,y)
# define vdbeSorterPopulateDebug(x,y)
#endif

static int vdbeSortAllocUnpacked(SortSubtask *pTask){
  if( pTask->pUnpacked==0 ){
    char *pFree;
    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
        pTask->pKeyInfo, 0, 0, &pFree
    );
    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
    if( pFree==0 ) return SQLITE_NOMEM;
    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
    pTask->pUnpacked->errCode = 0;
  }
  return SQLITE_OK;
}

/*
** The main routine for sorter-thread operations.
*/
static void *vdbeSortSubtaskMain(void *pCtx){
  int rc = SQLITE_OK;
  SortSubtask *pTask = (SortSubtask*)pCtx;

  assert( pTask->eWork==SORT_SUBTASK_SORT
       || pTask->eWork==SORT_SUBTASK_TO_PMA
       || pTask->eWork==SORT_SUBTASK_CONS
  );
  assert( pTask->bDone==0 );

  vdbeSorterWorkDebug(pTask, "enter");

  if( pTask->pUnpacked==0 ){
    char *pFree;
    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
  rc = vdbeSortAllocUnpacked(pTask);
        pTask->pKeyInfo, 0, 0, &pFree
    );
    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
    if( pFree==0 ){
      rc = SQLITE_NOMEM;
      goto thread_out;
  if( rc!=SQLITE_OK ) goto thread_out;
    }
    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
    pTask->pUnpacked->errCode = 0;
  }

  if( pTask->eWork==SORT_SUBTASK_CONS ){
    assert( pTask->pList==0 );
    while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){
      int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT);
      sqlite3_file *pTemp2 = 0;     /* Second temp file to use */
      MergeEngine *pMerger;         /* Object for reading/merging PMA data */
1529
1530
1531
1532
1533
1534
1535



1536
1537
1538
1539
1540
1541
1542
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586







+
+
+







      pSorter->nInMemory = 0;
      pSorter->iMemory = 0;
      assert( rc!=SQLITE_OK || pSorter->pRecord==0 );
    }
  }

  pSorter->nInMemory += nPMA;
  if( nPMA>pSorter->mxKeysize ){
    pSorter->mxKeysize = nPMA;
  }

  if( pSorter->aMemory ){
    int nMin = pSorter->iMemory + nReq;

    if( nMin>pSorter->nMemory ){
      u8 *aNew;
      int nNew = pSorter->nMemory * 2;
1587
1588
1589
1590
1591
1592
1593

1594
1595
1596
1597
1598


1599

1600
1601
1602
1603
1604
1605
1606
1607
1608
1609

1610
1611
1612
1613
1614
1615
1616
1617
1618

1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630

1631


1632

1633
1634
1635

1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647











1648
1649
1650
1651
1652
1653
1654
1655
1656
1657


















1658
1659
1660
1661
1662
1663
1664


1665
1666
1667
1668
1669
1670
1671
1672













































































































































































1673
1674
1675
1676
1677

1678
1679
1680


1681













1682

1683
1684
1685







1686
1687
1688
1689
1690
1691





1692
1693

1694
1695

1696
1697
1698
1699
1700
1701




1702
1703
1704
1705



1706
1707
1708
1709
1710
1711



















1712
1713
1714
1715
1716




1717
1718
1719
1720
1721









1722
1723
1724
1725
1726













1727
1728
1729
1730


1731
1732
1733
1734
1735


1736
1737
1738

1739


1740
1741
1742
1743
1744
1745
1746
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645

1646
1647
1648
1649
1650
1651
1652
1653
1654
1655

1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677

1678
1679
1680
1681

1682
1683
1684
1685
1686
1687
1688
1689
1690
1691







1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703









1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730








1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907

1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927

1928



1929
1930
1931
1932
1933
1934
1935
1936





1937
1938
1939
1940
1941


1942


1943






1944
1945
1946
1947




1948
1949
1950






1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969





1970
1971
1972
1973





1974
1975
1976
1977
1978
1979
1980
1981
1982
1983




1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996




1997
1998





1999
2000

2001

2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012







+





+
+
-
+









-
+









+











-
+

+
+
-
+



+





-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+

-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







+
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+




-
+



+
+

+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
-
-
+
+
+
+
+
+
+

-
-
-
-
-
+
+
+
+
+
-
-
+
-
-
+
-
-
-
-
-
-
+
+
+
+
-
-
-
-
+
+
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
+
+
+
+
-
-
-
-
-
+
+
+
+
+
+
+
+
+

-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
+
+
-
-
-
-
-
+
+
-

-
+

+
+







** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
** of the data stored in aFile[1] is the same as that used by regular PMAs,
** except that the number-of-bytes varint is omitted from the start.
*/
static int vdbeIncrPopulate(IncrMerger *pIncr){
  int rc = SQLITE_OK;
  int rc2;
  i64 iStart = pIncr->iStartOff;
  SorterFile *pOut = &pIncr->aFile[1];
  MergeEngine *pMerger = pIncr->pMerger;
  PmaWriter writer;
  assert( pIncr->bEof==0 );

  vdbeSorterPopulateDebug(pIncr->pTask, "enter");

  vdbePmaWriterInit(pIncr->aFile[1].pFd, &writer, pIncr->pTask->pgsz, 0);
  vdbePmaWriterInit(pOut->pFd, &writer, pIncr->pTask->pgsz, iStart);
  while( rc==SQLITE_OK ){
    int dummy;
    PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ];
    int nKey = pReader->nKey;
    i64 iEof = writer.iWriteOff + writer.iBufEnd;

    /* Check if the output file is full or if the input has been exhausted.
    ** In either case exit the loop. */
    if( pReader->pFile==0 ) break;
    if( iEof && (iEof + nKey)>pIncr->mxSz ) break;
    if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;

    /* Write the next key to the output. */
    vdbePmaWriteVarint(&writer, nKey);
    vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
    rc = vdbeSorterNext(pIncr->pTask, pIncr->pMerger, &dummy);
  }

  rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
  if( rc==SQLITE_OK ) rc = rc2;
  vdbeSorterPopulateDebug(pIncr->pTask, "exit");
  return rc;
}

static void *vdbeIncrPopulateThreadMain(void *pCtx){
  IncrMerger *pIncr = (IncrMerger*)pCtx;
  return SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
}

static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  int rc;
  assert( pIncr->pThread==0 );
  if( pIncr->pTask->pSorter->bUseThreads==0 ){
  if( pIncr->bUseThread==0 ){
    rc = vdbeIncrPopulate(pIncr);
  }
#if SQLITE_MAX_WORKER_THREADS>0
  }else{
  else{
    void *pCtx = (void*)pIncr;
    rc = sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx);
  }
#endif
  return rc;
}

static int vdbeIncrSwap(IncrMerger *pIncr){
  int rc = SQLITE_OK;
  
  if( pIncr->pThread ){
    void *pRet;
    rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
    if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
    pIncr->pThread = 0;
  }

  if( pIncr->bUseThread ){
#if SQLITE_MAX_WORKER_THREADS>0
    if( pIncr->pThread ){
      void *pRet;
      assert( pIncr->bUseThread );
      rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
      pIncr->pThread = 0;
    }
#endif

  if( rc==SQLITE_OK ){
    SorterFile f0 = pIncr->aFile[0];
    pIncr->aFile[0] = pIncr->aFile[1];
    pIncr->aFile[1] = f0;

    if( pIncr->aFile[0].iEof==0 ){
      pIncr->bEof = 1;
    }else{
      rc = vdbeIncrBgPopulate(pIncr);
    if( rc==SQLITE_OK ){
      SorterFile f0 = pIncr->aFile[0];
      pIncr->aFile[0] = pIncr->aFile[1];
      pIncr->aFile[1] = f0;
    }

    if( rc==SQLITE_OK ){
      if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
        pIncr->bEof = 1;
      }else{
        rc = vdbeIncrBgPopulate(pIncr);
      }
    }
  }else{
    rc = vdbeIncrPopulate(pIncr);
    pIncr->aFile[0] = pIncr->aFile[1];
    if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
      pIncr->bEof = 1;
    }
  }

  return rc;
}

static void vdbeIncrFree(IncrMerger *pIncr){
  if( pIncr ){
#if SQLITE_MAX_WORKER_THREADS>0
  if( pIncr->pThread ){
    void *pRet;
    sqlite3ThreadJoin(pIncr->pThread, &pRet);
  }
  if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
  if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
  vdbeMergeEngineFree(pIncr->pMerger);
  sqlite3_free(pIncr);
    if( pIncr->pThread ){
      void *pRet;
      sqlite3ThreadJoin(pIncr->pThread, &pRet);
    }
    if( pIncr->bUseThread ){
      if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
      if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
    }
#endif
    vdbeMergeEngineFree(pIncr->pMerger);
    sqlite3_free(pIncr);
  }
}

static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
  IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
  if( pIncr ){
    memset(pIncr, 0, sizeof(IncrMerger));
    pIncr->pMerger = pMerger;
    pIncr->pTask = pTask;
    pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
    pTask->file2.iEof += pIncr->mxSz;

#if 0
    /* Open the two temp files. */
    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
    if( rc==SQLITE_OK ){
      rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
    }
    if( rc!=SQLITE_OK ){
      vdbeIncrFree(pIncr);
      pIncr = 0;
    }
#endif
  }
  return pIncr;
}

static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
  if( bUseThread ){
    pIncr->bUseThread = 1;
    pIncr->pTask->file2.iEof -= pIncr->mxSz;
  }
}

static int vdbeIncrInit2(PmaReader *pIter){
  int rc = SQLITE_OK;
  IncrMerger *pIncr = pIter->pIncr;
  if( pIncr ){
    SortSubtask *pTask = pIncr->pTask;
    int i;
    MergeEngine *pMerger = pIncr->pMerger;

    for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){
      rc = vdbeIncrInit2(&pMerger->aIter[i]);
    }
    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
      rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
    }

    /* Set up the required files for pIncr */
    if( rc==SQLITE_OK ){
      if( pIncr->bUseThread==0 ){
        if( pTask->file2.pFd==0 ){
          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd);
          assert( pTask->file2.iEof>0 );
          if( rc==SQLITE_OK ){
            vdbeSorterExtendFile(pTask->db,pTask->file2.pFd,pTask->file2.iEof);
            pTask->file2.iEof = 0;
          }
        }
        if( rc==SQLITE_OK ){
          pIncr->aFile[1].pFd = pTask->file2.pFd;
          pIncr->iStartOff = pTask->file2.iEof;
          pTask->file2.iEof += pIncr->mxSz;
        }
      }else{
        rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
        if( rc==SQLITE_OK ){
          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
        }
      }
    }

    if( rc==SQLITE_OK && pIncr->bUseThread ){
      rc = vdbeIncrBgPopulate(pIncr);
    }

    if( rc==SQLITE_OK ){
      rc = vdbePmaReaderNext(pIter);
    }
  }
  return rc;
}

/*
** Allocate a new MergeEngine object to merge the contents of nPMA level-0
** PMAs from pTask->file. If no error occurs, set *ppOut to point to
** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
** to NULL and return an SQLite error code.
**
** When this function is called, *piOffset is set to the offset of the
** first PMA to read from pTask->file. Assuming no error occurs, it is 
** set to the offset immediately following the last byte of the last
** PMA before returning. If an error does occur, then the final value of
** *piOffset is undefined.
*/
static int vdbeMergeEngineLevel0(
  SortSubtask *pTask,             /* Sorter task to read from */
  int nPMA,                       /* Number of PMAs to read */
  i64 *piOffset,                  /* IN/OUT: Read offset in pTask->file */
  MergeEngine **ppOut             /* OUT: New merge-engine */
){
  MergeEngine *pNew;              /* Merge engine to return */
  i64 iOff = *piOffset;
  int i;
  int rc = SQLITE_OK;

  *ppOut = pNew = vdbeMergeEngineNew(nPMA);
  if( pNew==0 ) rc = SQLITE_NOMEM;

  for(i=0; i<nPMA && rc==SQLITE_OK; i++){
    i64 nDummy;
    PmaReader *pIter = &pNew->aIter[i];
    rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pIter, &nDummy);
    iOff = pIter->iEof;
  }

  if( rc!=SQLITE_OK ){
    vdbeMergeEngineFree(pNew);
    *ppOut = 0;
  }
  *piOffset = iOff;
  return rc;
}

typedef struct IncrBuilder IncrBuilder;
struct IncrBuilder {
  int nPMA;                     /* Number of iterators used so far */
  MergeEngine *pMerger;         /* Merge engine to populate. */
};

static int vdbeAddToBuilder(
  SortSubtask *pTask,
  IncrBuilder *pBuilder, 
  MergeEngine *pMerger
){
  int rc = SQLITE_OK;
  IncrMerger *pIncr;

  assert( pMerger );
  if( pBuilder->nPMA==SORTER_MAX_MERGE_COUNT ){
    rc = vdbeAddToBuilder(pTask, &pBuilder[1], pBuilder->pMerger);
    pBuilder->pMerger = 0;
    pBuilder->nPMA = 0;
  }

  if( rc==SQLITE_OK && pBuilder->pMerger==0 ){
    pBuilder->pMerger = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
    if( pBuilder->pMerger==0 ) rc = SQLITE_NOMEM;
  }

  if( rc==SQLITE_OK ){
    pIncr = vdbeIncrNew(pTask, pMerger);
    if( pIncr==0 ) rc = SQLITE_NOMEM;
    pBuilder->pMerger->aIter[pBuilder->nPMA++].pIncr = pIncr;
  }

  if( rc!=SQLITE_OK ){
    vdbeMergeEngineFree(pMerger);
  }

  return rc;
}

/*
** Populate iterator *pIter so that it may be used to iterate through all 
** keys stored in subtask pTask using the incremental merge method.
** keys stored in all PMAs created by this sorter.
*/
static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){
  SortSubtask *pTask0 = &pSorter->aTask[0];
  MergeEngine *pMain = 0;
  sqlite3 *db = pTask0->db;
  int rc = SQLITE_OK;
  int iTask;

  IncrBuilder *aMerge;
  const int nMerge = 32;
  aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge);
  if( aMerge==0 ) return SQLITE_NOMEM;

  if( pSorter->nTask>1 ){
    pMain = vdbeMergeEngineNew(pSorter->nTask);
    if( pMain==0 ) rc = SQLITE_NOMEM;
  }

  for(iTask=0; iTask<pSorter->nTask && rc==SQLITE_OK; iTask++){
  MergeEngine *pMerger = 0;
    MergeEngine *pRoot = 0;
  IncrMerger *pIncr = 0;
  int i;
  int nPMA = 0;
    int iPMA;
    i64 iReadOff = 0;
    SortSubtask *pTask = &pSorter->aTask[iTask];
    if( pTask->nPMA==0 ) continue;
    for(iPMA=0; iPMA<pTask->nPMA; iPMA += SORTER_MAX_MERGE_COUNT){
      MergeEngine *pMerger = 0;
      int nReader = MIN(pTask->nPMA - iPMA, SORTER_MAX_MERGE_COUNT);

  for(i=0; i<pSorter->nTask; i++){
    nPMA += pSorter->aTask[i].nPMA;
  }
  pMerger = vdbeMergeEngineNew(nPMA);
  if( pMerger==0 ){
      rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
      if( rc!=SQLITE_OK ) break;

      if( iPMA==0 ){
        pRoot = pMerger;
    rc = SQLITE_NOMEM;
  }else{
      }else{
    int iIter = 0;
    int iPMA;
        if( pRoot ){
    for(i=0; i<pSorter->nTask; i++){
      i64 iReadOff = 0;
      SortSubtask *pTask = &pSorter->aTask[i];
      for(iPMA=0; iPMA<pTask->nPMA; iPMA++){
        i64 nDummy = 0;
        PmaReader *pIter = &pMerger->aIter[iIter++];
          rc = vdbeAddToBuilder(pTask, &aMerge[0], pRoot);
          pRoot = 0;
          if( rc!=SQLITE_OK ){
            vdbeMergeEngineFree(pMerger);
        rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nDummy);
        iReadOff = pIter->iEof;
      }
    }
            break;
          }
        }
    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
      rc = vdbeSorterDoCompare(pTask0, pMerger, i);
    }
  }

  if( rc==SQLITE_OK ){
        rc = vdbeAddToBuilder(pTask, &aMerge[0], pMerger);
      }
    }

    if( pRoot==0 ){
      int i;
      for(i=0; rc==SQLITE_OK && i<nMerge; i++){
        if( aMerge[i].pMerger ){
          if( pRoot ){
            rc = vdbeAddToBuilder(pTask, &aMerge[i], pRoot);
            if( rc!=SQLITE_OK ) break;
          }
          pRoot = aMerge[i].pMerger;
          aMerge[i].pMerger = 0;
        }
      }
    }

    if( rc==SQLITE_OK ){
    pIncr = (IncrMerger*)sqlite3_malloc(sizeof(IncrMerger));
    if( pIncr==0 ){
      rc = SQLITE_NOMEM;
    }else{
      memset(pIncr, 0, sizeof(IncrMerger));
      if( pMain==0 ){
        pMain = pRoot;
      }else{
        IncrMerger *pNew = vdbeIncrNew(pTask, pRoot);
      pIncr->mxSz = (pSorter->mxPmaSize / 2);
      pIncr->pMerger = pMerger;
      pIncr->pTask = pTask0;
    }
  }
        pMain->aIter[iTask].pIncr = pNew;
        if( pNew==0 ) rc = SQLITE_NOMEM;
      }
      memset(aMerge, 0, nMerge*sizeof(aMerge[0]));
    }
  }

  if( rc==SQLITE_OK ){
    SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];

  /* Open the two temp files. */
  if( rc==SQLITE_OK ){
    rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[0].pFd);
  }
    rc = vdbeSortAllocUnpacked(pLast);
    if( rc==SQLITE_OK ){
      pIter->pIncr = vdbeIncrNew(pLast, pMain);
      if( pIter->pIncr==0 ){
        rc = SQLITE_NOMEM;
      }else{
        vdbeIncrSetThreads(pIter->pIncr, pSorter->bUseThreads);
        for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
          IncrMerger *pIncr;
          if( (pIncr = pMain->aIter[iTask].pIncr) ){
            vdbeIncrSetThreads(pIncr, pSorter->bUseThreads);
            assert( pIncr->pTask!=pLast );
          }
  if( rc==SQLITE_OK ){
    rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[1].pFd);
  }

        }
      }
  /* Launch a background thread to populate aFile[1]. */
  if( rc==SQLITE_OK ){
    rc = vdbeIncrBgPopulate(pIncr);
  }

    }
  }
  pIter->pIncr = pIncr;
  if( rc==SQLITE_OK ){
    rc = vdbePmaReaderNext(pIter);
    rc = vdbeIncrInit2(pIter);
  }

  sqlite3_free(aMerge);
  return rc;
}


/*
** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
** this function is called to prepare for iterating through the records
Changes to test/sort2.test.
11
12
13
14
15
16
17



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34





















35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59









































60
61
11
12
13
14
15
16
17
18
19
20

















21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
























43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85







+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+


# This file implements regression tests for SQLite library. 
#

set testdir [file dirname $argv0]
source $testdir/tester.tcl
set testprefix sort2

foreach {tn script} {
  1 { }
  2 {
db close
sqlite3_shutdown
sqlite3_config_worker_threads 7
reset_db

do_execsql_test 1 {
  PRAGMA cache_size = 5;
  WITH r(x,y) AS (
    SELECT 1, randomblob(100)
    UNION ALL
    SELECT x+1, randomblob(100) FROM r
    LIMIT 100000
  )
  SELECT count(x), length(y) FROM r GROUP BY (x%5)
} {
  20000 100 20000 100 20000 100 20000 100 20000 100
}
    catch { db close }
    sqlite3_shutdown
    sqlite3_config_worker_threads 7
    reset_db
  }
} {

  eval $script

  do_execsql_test $tn.1 {
    PRAGMA cache_size = 5;
    WITH r(x,y) AS (
      SELECT 1, randomblob(100)
      UNION ALL
      SELECT x+1, randomblob(100) FROM r
      LIMIT 100000
    )
    SELECT count(x), length(y) FROM r GROUP BY (x%5)
  } {
    20000 100 20000 100 20000 100 20000 100 20000 100
  }

do_execsql_test 2.1 {
  CREATE TABLE t1(a, b);
  WITH r(x,y) AS (
    SELECT 1, randomblob(100)
    UNION ALL
    SELECT x+1, randomblob(100) FROM r
    LIMIT 10000
  ) INSERT INTO t1 SELECT * FROM r;
}

do_execsql_test 2.2 {
  CREATE UNIQUE INDEX i1 ON t1(b, a);
}

do_execsql_test 2.3 {
  CREATE UNIQUE INDEX i2 ON t1(a);
}

do_execsql_test 2.4 { PRAGMA integrity_check } {ok}

db close
sqlite3_shutdown
sqlite3_config_worker_threads 0
sqlite3_initialize
  do_execsql_test $tn.2.1 {
    CREATE TABLE t1(a, b);
    WITH r(x,y) AS (
      SELECT 1, randomblob(100)
      UNION ALL
      SELECT x+1, randomblob(100) FROM r
      LIMIT 10000
    ) INSERT INTO t1 SELECT * FROM r;
  }
  
  do_execsql_test $tn.2.2 {
    CREATE UNIQUE INDEX i1 ON t1(b, a);
  }
  
  do_execsql_test $tn.2.3 {
    CREATE UNIQUE INDEX i2 ON t1(a);
  }
  
  do_execsql_test $tn.2.4 { PRAGMA integrity_check } {ok}
  
  breakpoint
  do_execsql_test $tn.3 {
    PRAGMA cache_size = 5;
    WITH r(x,y) AS (
      SELECT 1, randomblob(100)
      UNION ALL
      SELECT x+1, randomblob(100) FROM r
      LIMIT 1000000
    )
    SELECT count(x), length(y) FROM r GROUP BY (x%5)
  } {
    200000 100 200000 100 200000 100 200000 100 200000 100
  }
  
  db close
  sqlite3_shutdown
  sqlite3_config_worker_threads 0
  sqlite3_initialize

}

finish_test