/ Check-in [98bf0307]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Avoid having the sorter merge too many PMAs at a time when incrementally merging data following a SorterRewind().
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | threads-experimental
Files: files | file ages | folders
SHA1: 98bf0307b121b0776a7170108cc8d3f948a7ebfe
User & Date: dan 2014-04-11 19:43:07
Context
2014-04-12
19:34
Fix many issues with new code. check-in: 62c406a0 user: dan tags: threads-experimental
2014-04-11
19:43
Avoid having the sorter merge too many PMAs at a time when incrementally merging data following a SorterRewind(). check-in: 98bf0307 user: dan tags: threads-experimental
2014-04-09
20:04
Experimental multi-threaded sorting changes to allow the sorter to begin returning items to the VDBE before all data is sorted. check-in: f9d5e09a user: dan tags: threads-experimental
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/shell.c.

  3531   3531     memcpy(data->separator,"|", 2);
  3532   3532     data->showHeader = 0;
  3533   3533     sqlite3_config(SQLITE_CONFIG_URI, 1);
  3534   3534     sqlite3_config(SQLITE_CONFIG_LOG, shellLog, data);
  3535   3535     sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> ");
  3536   3536     sqlite3_snprintf(sizeof(continuePrompt), continuePrompt,"   ...> ");
  3537   3537     sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
  3538         -  sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 3);
         3538  +  sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 4);
  3539   3539   }
  3540   3540   
  3541   3541   /*
  3542   3542   ** Output text to the console in a font that attracts extra attention.
  3543   3543   */
  3544   3544   #ifdef _WIN32
  3545   3545   static void printBold(const char *zText){

Changes to src/vdbesort.c.

   160    160     u8 eWork;                       /* One of the SORT_SUBTASK_* constants */
   161    161     int nConsolidate;               /* For SORT_SUBTASK_CONS, max final PMAs */
   162    162     SorterRecord *pList;            /* List of records for pTask to sort */
   163    163     int nInMemory;                  /* Expected size of PMA based on pList */
   164    164     u8 *aListMemory;                /* Records memory (or NULL) */
   165    165   
   166    166     int nPMA;                       /* Number of PMAs currently in file */
   167         -  SorterFile file;
          167  +  SorterFile file;                /* Temp file for level-0 PMAs */
          168  +  SorterFile file2;               /* Space for other PMAs */
   168    169   };
   169    170   
   170    171   
   171    172   /*
   172    173   ** The MergeEngine object is used to combine two or more smaller PMAs into
   173    174   ** one big PMA using a merge operation.  Separate PMAs all need to be
   174    175   ** combined into one big PMA in order to be able to step through the sorted
................................................................................
   236    237     int *aTree;                /* Current state of incremental merge */
   237    238     PmaReader *aIter;          /* Array of iterators to merge data from */
   238    239   };
   239    240   
   240    241   /*
   241    242   ** Main sorter structure. A single instance of this is allocated for each 
   242    243   ** sorter cursor created by the VDBE.
          244  +**
          245  +** mxKeysize:
          246  +**   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
          247  +**   this variable is updated so as to be set to the size on disk of the
          248  +**   largest record in the sorter.
   243    249   */
   244    250   struct VdbeSorter {
   245    251     int nInMemory;                  /* Current size of pRecord list as PMA */
   246    252     int mnPmaSize;                  /* Minimum PMA size, in bytes */
   247    253     int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
   248    254     int bUsePMA;                    /* True if one or more PMAs created */
   249    255     int bUseThreads;                /* True to use background threads */
   250    256     SorterRecord *pRecord;          /* Head of in-memory record list */
   251    257     PmaReader *pReader;             /* Read data from here after Rewind() */
          258  +  int mxKeysize;                  /* Largest serialized key seen so far */
   252    259     UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
   253    260     u8 *aMemory;                    /* Block of memory to alloc records from */
   254    261     int iMemory;                    /* Offset of first free byte in aMemory */
   255    262     int nMemory;                    /* Size of aMemory allocation in bytes */
   256    263     int iPrev;                      /* Previous thread used to flush PMA */
   257    264     int nTask;                      /* Size of aTask[] array */
   258    265     SortSubtask aTask[1];           /* One or more subtasks */
................................................................................
   273    280     u8 *aKey;                       /* Pointer to current key */
   274    281     u8 *aBuffer;                    /* Current read buffer */
   275    282     int nBuffer;                    /* Size of read buffer in bytes */
   276    283     u8 *aMap;                       /* Pointer to mapping of entire file */
   277    284     IncrMerger *pIncr;              /* Incremental merger */
   278    285   };
   279    286   
          287  +/*
          288  +** Normally, a PmaReader object iterates through an existing PMA stored 
          289  +** within a temp file. However, if the PmaReader.pIncr variable points to
          290  +** an object of the following type, it may be used to iterate/merge through
          291  +** multiple PMAs simultaneously.
          292  +*/
   280    293   struct IncrMerger {
   281         -  int mxSz;                       /* Maximum size of files */
   282    294     SortSubtask *pTask;             /* Task that owns this merger */
   283         -  int bEof;                       /* Set to true when merge is finished */
   284         -  SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
   285         -  MergeEngine *pMerger;           /* Merge engine thread reads data from */
   286    295     SQLiteThread *pThread;          /* Thread currently populating aFile[1] */
          296  +  MergeEngine *pMerger;           /* Merge engine thread reads data from */
          297  +  i64 iStartOff;                  /* Offset to start writing file at */
          298  +  int mxSz;                       /* Maximum bytes of data to store */
          299  +  int bEof;                       /* Set to true when merge is finished */
          300  +  int bUseThread;                 /* True to use a bg thread for this object */
          301  +  SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
   287    302   };
   288    303   
   289    304   /*
   290    305   ** An instance of this object is used for writing a PMA.
   291    306   **
   292    307   ** The PMA is written one record at a time.  Each record is of an arbitrary
   293    308   ** size.  But I/O is more efficient if it occurs in page-sized blocks where
................................................................................
   502    517   
   503    518     assert( pIncr->bEof==0 );
   504    519   
   505    520     if( pIter->aMap ){
   506    521       sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
   507    522       pIter->aMap = 0;
   508    523     }
   509         -  pIter->iReadOff = 0;
          524  +  pIter->iReadOff = pIncr->iStartOff;
   510    525     pIter->iEof = pIncr->aFile[0].iEof;
   511    526     pIter->pFile = pIncr->aFile[0].pFd;
   512    527   
   513    528     rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap);
   514    529     if( rc==SQLITE_OK ){
   515         -    if( pIter->aMap==0 && pIter->aBuffer==0 ){
   516         -      pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
   517         -      if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
   518         -      pIter->nBuffer = pTask->pgsz;
          530  +    if( pIter->aMap==0 ){
          531  +      /* TODO: Combine this code with similar code in vdbePmaReaderInit() */
          532  +      int iBuf = pIter->iReadOff % pTask->pgsz;
          533  +      if( pIter->aBuffer==0 ){
          534  +        pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
          535  +        if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
          536  +        pIter->nBuffer = pTask->pgsz;
          537  +      }
          538  +      if( iBuf ){
          539  +        int nRead = pTask->pgsz - iBuf;
          540  +        if( (pIter->iReadOff + nRead) > pIter->iEof ){
          541  +          nRead = (int)(pIter->iEof - pIter->iReadOff);
          542  +        }
          543  +        rc = sqlite3OsRead(
          544  +            pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
          545  +        );
          546  +        assert( rc!=SQLITE_IOERR_SHORT_READ );
          547  +      }
   519    548       }
   520    549     }
   521    550   
   522    551     return rc;
   523    552   }
   524    553   
   525    554   
................................................................................
   573    602     SorterFile *pFile,              /* Sorter file to read from */
   574    603     i64 iStart,                     /* Start offset in pFile */
   575    604     PmaReader *pIter,               /* Iterator to populate */
   576    605     i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
   577    606   ){
   578    607     int rc = SQLITE_OK;
   579    608     int nBuf = pTask->pgsz;
   580         -  void *pMap = 0;                 /* Mapping of temp file */
   581    609   
   582    610     assert( pFile->iEof>iStart );
   583    611     assert( pIter->aAlloc==0 );
   584    612     assert( pIter->aBuffer==0 );
   585    613     pIter->pFile = pFile->pFd;
   586    614     pIter->iReadOff = iStart;
   587    615     pIter->nAlloc = 128;
   588    616     pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
   589    617     if( pIter->aAlloc ){
   590    618       /* Try to xFetch() a mapping of the entire temp file. If this is possible,
   591    619       ** the PMA will be read via the mapping. Otherwise, use xRead().  */
   592         -    if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
   593         -      rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iEof, &pMap);
   594         -    }
          620  +    rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);
   595    621     }else{
   596    622       rc = SQLITE_NOMEM;
   597    623     }
   598    624   
   599         -  if( rc==SQLITE_OK ){
   600         -    if( pMap ){
   601         -      pIter->aMap = (u8*)pMap;
          625  +  if( rc==SQLITE_OK && pIter->aMap==0 ){
          626  +    pIter->nBuffer = nBuf;
          627  +    pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
          628  +    if( !pIter->aBuffer ){
          629  +      rc = SQLITE_NOMEM;
   602    630       }else{
   603         -      pIter->nBuffer = nBuf;
   604         -      pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
   605         -      if( !pIter->aBuffer ){
   606         -        rc = SQLITE_NOMEM;
   607         -      }else{
   608         -        int iBuf = iStart % nBuf;
   609         -        if( iBuf ){
   610         -          int nRead = nBuf - iBuf;
   611         -          if( (iStart + nRead) > pFile->iEof ){
   612         -            nRead = (int)(pFile->iEof - iStart);
   613         -          }
   614         -          rc = sqlite3OsRead(
   615         -              pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
   616         -          );
   617         -          assert( rc!=SQLITE_IOERR_SHORT_READ );
          631  +      int iBuf = iStart % nBuf;
          632  +      if( iBuf ){
          633  +        int nRead = nBuf - iBuf;
          634  +        if( (iStart + nRead) > pFile->iEof ){
          635  +          nRead = (int)(pFile->iEof - iStart);
   618    636           }
          637  +        rc = sqlite3OsRead(
          638  +            pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
          639  +        );
          640  +        assert( rc!=SQLITE_IOERR_SHORT_READ );
   619    641         }
   620    642       }
   621    643     }
   622    644   
   623    645     if( rc==SQLITE_OK ){
   624    646       u64 nByte;                    /* Size of PMA in bytes */
   625    647       pIter->iEof = pFile->iEof;
................................................................................
   801    823     }
   802    824     pTask->pList = 0;
   803    825     if( pTask->file.pFd ){
   804    826       sqlite3OsCloseFree(pTask->file.pFd);
   805    827       pTask->file.pFd = 0;
   806    828       pTask->file.iEof = 0;
   807    829     }
          830  +  if( pTask->file2.pFd ){
          831  +    sqlite3OsCloseFree(pTask->file2.pFd);
          832  +    pTask->file2.pFd = 0;
          833  +    pTask->file2.iEof = 0;
          834  +  }
   808    835   }
   809    836   
   810    837   /*
   811    838   ** Join all threads.  
   812    839   */
   813    840   #if SQLITE_MAX_WORKER_THREADS>0
   814    841   static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
................................................................................
   835    862   ** Allocate a new MergeEngine object with space for nIter iterators.
   836    863   */
   837    864   static MergeEngine *vdbeMergeEngineNew(int nIter){
   838    865     int N = 2;                      /* Smallest power of two >= nIter */
   839    866     int nByte;                      /* Total bytes of space to allocate */
   840    867     MergeEngine *pNew;              /* Pointer to allocated object to return */
   841    868   
   842         -  /* assert( nIter<=SORTER_MAX_MERGE_COUNT ); */
          869  +  assert( nIter<=SORTER_MAX_MERGE_COUNT );
   843    870   
   844    871     while( N<nIter ) N += N;
   845    872     nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
   846    873   
   847    874     pNew = (MergeEngine*)sqlite3MallocZero(nByte);
   848    875     if( pNew ){
   849    876       pNew->nTree = N;
................................................................................
   884    911     if( pSorter->aMemory==0 ){
   885    912       vdbeSorterRecordFree(0, pSorter->pRecord);
   886    913     }
   887    914     pSorter->pRecord = 0;
   888    915     pSorter->nInMemory = 0;
   889    916     pSorter->bUsePMA = 0;
   890    917     pSorter->iMemory = 0;
          918  +  pSorter->mxKeysize = 0;
   891    919     sqlite3DbFree(db, pSorter->pUnpacked);
   892    920     pSorter->pUnpacked = 0;
   893    921   }
   894    922   
   895    923   /*
   896    924   ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
   897    925   */
................................................................................
  1255   1283     fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
  1256   1284   }
  1257   1285   static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
  1258   1286     i64 t;
  1259   1287     sqlite3OsCurrentTimeInt64(db->pVfs, &t);
  1260   1288     fprintf(stderr, "%lld:X %s\n", t, zEvent);
  1261   1289   }
         1290  +static void vdbeSorterPopulateDebug(
         1291  +  SortSubtask *pTask,
         1292  +  const char *zEvent
         1293  +){
         1294  +  i64 t;
         1295  +  int iTask = (pTask - pTask->pSorter->aTask);
         1296  +  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
         1297  +  fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
         1298  +}
  1262   1299   #else
  1263   1300   # define vdbeSorterWorkDebug(x,y)
  1264   1301   # define vdbeSorterRewindDebug(x,y)
         1302  +# define vdbeSorterPopulateDebug(x,y)
  1265   1303   #endif
         1304  +
         1305  +static int vdbeSortAllocUnpacked(SortSubtask *pTask){
         1306  +  if( pTask->pUnpacked==0 ){
         1307  +    char *pFree;
         1308  +    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
         1309  +        pTask->pKeyInfo, 0, 0, &pFree
         1310  +    );
         1311  +    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
         1312  +    if( pFree==0 ) return SQLITE_NOMEM;
         1313  +    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
         1314  +    pTask->pUnpacked->errCode = 0;
         1315  +  }
         1316  +  return SQLITE_OK;
         1317  +}
  1266   1318   
  1267   1319   /*
  1268   1320   ** The main routine for sorter-thread operations.
  1269   1321   */
  1270   1322   static void *vdbeSortSubtaskMain(void *pCtx){
  1271   1323     int rc = SQLITE_OK;
  1272   1324     SortSubtask *pTask = (SortSubtask*)pCtx;
................................................................................
  1275   1327          || pTask->eWork==SORT_SUBTASK_TO_PMA
  1276   1328          || pTask->eWork==SORT_SUBTASK_CONS
  1277   1329     );
  1278   1330     assert( pTask->bDone==0 );
  1279   1331   
  1280   1332     vdbeSorterWorkDebug(pTask, "enter");
  1281   1333   
  1282         -  if( pTask->pUnpacked==0 ){
  1283         -    char *pFree;
  1284         -    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
  1285         -        pTask->pKeyInfo, 0, 0, &pFree
  1286         -    );
  1287         -    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
  1288         -    if( pFree==0 ){
  1289         -      rc = SQLITE_NOMEM;
  1290         -      goto thread_out;
  1291         -    }
  1292         -    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
  1293         -    pTask->pUnpacked->errCode = 0;
  1294         -  }
         1334  +  rc = vdbeSortAllocUnpacked(pTask);
         1335  +  if( rc!=SQLITE_OK ) goto thread_out;
  1295   1336   
  1296   1337     if( pTask->eWork==SORT_SUBTASK_CONS ){
  1297   1338       assert( pTask->pList==0 );
  1298   1339       while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){
  1299   1340         int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT);
  1300   1341         sqlite3_file *pTemp2 = 0;     /* Second temp file to use */
  1301   1342         MergeEngine *pMerger;         /* Object for reading/merging PMA data */
................................................................................
  1529   1570         pSorter->nInMemory = 0;
  1530   1571         pSorter->iMemory = 0;
  1531   1572         assert( rc!=SQLITE_OK || pSorter->pRecord==0 );
  1532   1573       }
  1533   1574     }
  1534   1575   
  1535   1576     pSorter->nInMemory += nPMA;
         1577  +  if( nPMA>pSorter->mxKeysize ){
         1578  +    pSorter->mxKeysize = nPMA;
         1579  +  }
  1536   1580   
  1537   1581     if( pSorter->aMemory ){
  1538   1582       int nMin = pSorter->iMemory + nReq;
  1539   1583   
  1540   1584       if( nMin>pSorter->nMemory ){
  1541   1585         u8 *aNew;
  1542   1586         int nNew = pSorter->nMemory * 2;
................................................................................
  1587   1631   ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
  1588   1632   ** of the data stored in aFile[1] is the same as that used by regular PMAs,
  1589   1633   ** except that the number-of-bytes varint is omitted from the start.
  1590   1634   */
  1591   1635   static int vdbeIncrPopulate(IncrMerger *pIncr){
  1592   1636     int rc = SQLITE_OK;
  1593   1637     int rc2;
         1638  +  i64 iStart = pIncr->iStartOff;
  1594   1639     SorterFile *pOut = &pIncr->aFile[1];
  1595   1640     MergeEngine *pMerger = pIncr->pMerger;
  1596   1641     PmaWriter writer;
  1597   1642     assert( pIncr->bEof==0 );
  1598   1643   
  1599         -  vdbePmaWriterInit(pIncr->aFile[1].pFd, &writer, pIncr->pTask->pgsz, 0);
         1644  +  vdbeSorterPopulateDebug(pIncr->pTask, "enter");
         1645  +
         1646  +  vdbePmaWriterInit(pOut->pFd, &writer, pIncr->pTask->pgsz, iStart);
  1600   1647     while( rc==SQLITE_OK ){
  1601   1648       int dummy;
  1602   1649       PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ];
  1603   1650       int nKey = pReader->nKey;
  1604   1651       i64 iEof = writer.iWriteOff + writer.iBufEnd;
  1605   1652   
  1606   1653       /* Check if the output file is full or if the input has been exhausted.
  1607   1654       ** In either case exit the loop. */
  1608   1655       if( pReader->pFile==0 ) break;
  1609         -    if( iEof && (iEof + nKey)>pIncr->mxSz ) break;
         1656  +    if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
  1610   1657   
  1611   1658       /* Write the next key to the output. */
  1612   1659       vdbePmaWriteVarint(&writer, nKey);
  1613   1660       vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
  1614   1661       rc = vdbeSorterNext(pIncr->pTask, pIncr->pMerger, &dummy);
  1615   1662     }
  1616   1663   
  1617   1664     rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
  1618   1665     if( rc==SQLITE_OK ) rc = rc2;
         1666  +  vdbeSorterPopulateDebug(pIncr->pTask, "exit");
  1619   1667     return rc;
  1620   1668   }
  1621   1669   
  1622   1670   static void *vdbeIncrPopulateThreadMain(void *pCtx){
  1623   1671     IncrMerger *pIncr = (IncrMerger*)pCtx;
  1624   1672     return SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
  1625   1673   }
  1626   1674   
  1627   1675   static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  1628   1676     int rc;
  1629   1677     assert( pIncr->pThread==0 );
  1630         -  if( pIncr->pTask->pSorter->bUseThreads==0 ){
         1678  +  if( pIncr->bUseThread==0 ){
  1631   1679       rc = vdbeIncrPopulate(pIncr);
  1632         -  }else{
         1680  +  }
         1681  +#if SQLITE_MAX_WORKER_THREADS>0
         1682  +  else{
  1633   1683       void *pCtx = (void*)pIncr;
  1634   1684       rc = sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx);
  1635   1685     }
         1686  +#endif
  1636   1687     return rc;
  1637   1688   }
  1638   1689   
  1639   1690   static int vdbeIncrSwap(IncrMerger *pIncr){
  1640   1691     int rc = SQLITE_OK;
  1641         -  
  1642         -  if( pIncr->pThread ){
  1643         -    void *pRet;
  1644         -    rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
  1645         -    if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
  1646         -    pIncr->pThread = 0;
  1647         -  }
  1648   1692   
  1649         -  if( rc==SQLITE_OK ){
  1650         -    SorterFile f0 = pIncr->aFile[0];
         1693  +  if( pIncr->bUseThread ){
         1694  +#if SQLITE_MAX_WORKER_THREADS>0
         1695  +    if( pIncr->pThread ){
         1696  +      void *pRet;
         1697  +      assert( pIncr->bUseThread );
         1698  +      rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
         1699  +      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
         1700  +      pIncr->pThread = 0;
         1701  +    }
         1702  +#endif
         1703  +
         1704  +    if( rc==SQLITE_OK ){
         1705  +      SorterFile f0 = pIncr->aFile[0];
         1706  +      pIncr->aFile[0] = pIncr->aFile[1];
         1707  +      pIncr->aFile[1] = f0;
         1708  +    }
         1709  +
         1710  +    if( rc==SQLITE_OK ){
         1711  +      if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
         1712  +        pIncr->bEof = 1;
         1713  +      }else{
         1714  +        rc = vdbeIncrBgPopulate(pIncr);
         1715  +      }
         1716  +    }
         1717  +  }else{
         1718  +    rc = vdbeIncrPopulate(pIncr);
  1651   1719       pIncr->aFile[0] = pIncr->aFile[1];
  1652         -    pIncr->aFile[1] = f0;
  1653         -
  1654         -    if( pIncr->aFile[0].iEof==0 ){
         1720  +    if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
  1655   1721         pIncr->bEof = 1;
  1656         -    }else{
  1657         -      rc = vdbeIncrBgPopulate(pIncr);
  1658   1722       }
  1659   1723     }
  1660   1724   
  1661   1725     return rc;
  1662   1726   }
  1663   1727   
  1664   1728   static void vdbeIncrFree(IncrMerger *pIncr){
  1665         -  if( pIncr->pThread ){
  1666         -    void *pRet;
  1667         -    sqlite3ThreadJoin(pIncr->pThread, &pRet);
  1668         -  }
  1669         -  if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
  1670         -  if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
  1671         -  vdbeMergeEngineFree(pIncr->pMerger);
  1672         -  sqlite3_free(pIncr);
         1729  +  if( pIncr ){
         1730  +#if SQLITE_MAX_WORKER_THREADS>0
         1731  +    if( pIncr->pThread ){
         1732  +      void *pRet;
         1733  +      sqlite3ThreadJoin(pIncr->pThread, &pRet);
         1734  +    }
         1735  +    if( pIncr->bUseThread ){
         1736  +      if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
         1737  +      if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
         1738  +    }
         1739  +#endif
         1740  +    vdbeMergeEngineFree(pIncr->pMerger);
         1741  +    sqlite3_free(pIncr);
         1742  +  }
         1743  +}
         1744  +
         1745  +static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
         1746  +  IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
         1747  +  if( pIncr ){
         1748  +    memset(pIncr, 0, sizeof(IncrMerger));
         1749  +    pIncr->pMerger = pMerger;
         1750  +    pIncr->pTask = pTask;
         1751  +    pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
         1752  +    pTask->file2.iEof += pIncr->mxSz;
         1753  +
         1754  +#if 0
         1755  +    /* Open the two temp files. */
         1756  +    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
         1757  +    if( rc==SQLITE_OK ){
         1758  +      rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
         1759  +    }
         1760  +    if( rc!=SQLITE_OK ){
         1761  +      vdbeIncrFree(pIncr);
         1762  +      pIncr = 0;
         1763  +    }
         1764  +#endif
         1765  +  }
         1766  +  return pIncr;
         1767  +}
         1768  +
         1769  +static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
         1770  +  if( bUseThread ){
         1771  +    pIncr->bUseThread = 1;
         1772  +    pIncr->pTask->file2.iEof -= pIncr->mxSz;
         1773  +  }
         1774  +}
         1775  +
         1776  +static int vdbeIncrInit2(PmaReader *pIter){
         1777  +  int rc = SQLITE_OK;
         1778  +  IncrMerger *pIncr = pIter->pIncr;
         1779  +  if( pIncr ){
         1780  +    SortSubtask *pTask = pIncr->pTask;
         1781  +    int i;
         1782  +    MergeEngine *pMerger = pIncr->pMerger;
         1783  +
         1784  +    for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){
         1785  +      rc = vdbeIncrInit2(&pMerger->aIter[i]);
         1786  +    }
         1787  +    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
         1788  +      rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
         1789  +    }
         1790  +
         1791  +    /* Set up the required files for pIncr */
         1792  +    if( rc==SQLITE_OK ){
         1793  +      if( pIncr->bUseThread==0 ){
         1794  +        if( pTask->file2.pFd==0 ){
         1795  +          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd);
         1796  +          assert( pTask->file2.iEof>0 );
         1797  +          if( rc==SQLITE_OK ){
         1798  +            vdbeSorterExtendFile(pTask->db,pTask->file2.pFd,pTask->file2.iEof);
         1799  +            pTask->file2.iEof = 0;
         1800  +          }
         1801  +        }
         1802  +        if( rc==SQLITE_OK ){
         1803  +          pIncr->aFile[1].pFd = pTask->file2.pFd;
         1804  +          pIncr->iStartOff = pTask->file2.iEof;
         1805  +          pTask->file2.iEof += pIncr->mxSz;
         1806  +        }
         1807  +      }else{
         1808  +        rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
         1809  +        if( rc==SQLITE_OK ){
         1810  +          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
         1811  +        }
         1812  +      }
         1813  +    }
         1814  +
         1815  +    if( rc==SQLITE_OK && pIncr->bUseThread ){
         1816  +      rc = vdbeIncrBgPopulate(pIncr);
         1817  +    }
         1818  +
         1819  +    if( rc==SQLITE_OK ){
         1820  +      rc = vdbePmaReaderNext(pIter);
         1821  +    }
         1822  +  }
         1823  +  return rc;
         1824  +}
         1825  +
         1826  +/*
         1827  +** Allocate a new MergeEngine object to merge the contents of nPMA level-0
         1828  +** PMAs from pTask->file. If no error occurs, set *ppOut to point to
         1829  +** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
         1830  +** to NULL and return an SQLite error code.
         1831  +**
         1832  +** When this function is called, *piOffset is set to the offset of the
         1833  +** first PMA to read from pTask->file. Assuming no error occurs, it is 
         1834  +** set to the offset immediately following the last byte of the last
         1835  +** PMA before returning. If an error does occur, then the final value of
         1836  +** *piOffset is undefined.
         1837  +*/
         1838  +static int vdbeMergeEngineLevel0(
         1839  +  SortSubtask *pTask,             /* Sorter task to read from */
         1840  +  int nPMA,                       /* Number of PMAs to read */
         1841  +  i64 *piOffset,                  /* IN/OUT: Read offset in pTask->file */
         1842  +  MergeEngine **ppOut             /* OUT: New merge-engine */
         1843  +){
         1844  +  MergeEngine *pNew;              /* Merge engine to return */
         1845  +  i64 iOff = *piOffset;
         1846  +  int i;
         1847  +  int rc = SQLITE_OK;
         1848  +
         1849  +  *ppOut = pNew = vdbeMergeEngineNew(nPMA);
         1850  +  if( pNew==0 ) rc = SQLITE_NOMEM;
         1851  +
         1852  +  for(i=0; i<nPMA && rc==SQLITE_OK; i++){
         1853  +    i64 nDummy;
         1854  +    PmaReader *pIter = &pNew->aIter[i];
         1855  +    rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pIter, &nDummy);
         1856  +    iOff = pIter->iEof;
         1857  +  }
         1858  +
         1859  +  if( rc!=SQLITE_OK ){
         1860  +    vdbeMergeEngineFree(pNew);
         1861  +    *ppOut = 0;
         1862  +  }
         1863  +  *piOffset = iOff;
         1864  +  return rc;
         1865  +}
         1866  +
         1867  +typedef struct IncrBuilder IncrBuilder;
         1868  +struct IncrBuilder {
         1869  +  int nPMA;                     /* Number of iterators used so far */
         1870  +  MergeEngine *pMerger;         /* Merge engine to populate. */
         1871  +};
         1872  +
         1873  +static int vdbeAddToBuilder(
         1874  +  SortSubtask *pTask,
         1875  +  IncrBuilder *pBuilder, 
         1876  +  MergeEngine *pMerger
         1877  +){
         1878  +  int rc = SQLITE_OK;
         1879  +  IncrMerger *pIncr;
         1880  +
         1881  +  assert( pMerger );
         1882  +  if( pBuilder->nPMA==SORTER_MAX_MERGE_COUNT ){
         1883  +    rc = vdbeAddToBuilder(pTask, &pBuilder[1], pBuilder->pMerger);
         1884  +    pBuilder->pMerger = 0;
         1885  +    pBuilder->nPMA = 0;
         1886  +  }
         1887  +
         1888  +  if( rc==SQLITE_OK && pBuilder->pMerger==0 ){
         1889  +    pBuilder->pMerger = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
         1890  +    if( pBuilder->pMerger==0 ) rc = SQLITE_NOMEM;
         1891  +  }
         1892  +
         1893  +  if( rc==SQLITE_OK ){
         1894  +    pIncr = vdbeIncrNew(pTask, pMerger);
         1895  +    if( pIncr==0 ) rc = SQLITE_NOMEM;
         1896  +    pBuilder->pMerger->aIter[pBuilder->nPMA++].pIncr = pIncr;
         1897  +  }
         1898  +
         1899  +  if( rc!=SQLITE_OK ){
         1900  +    vdbeMergeEngineFree(pMerger);
         1901  +  }
         1902  +
         1903  +  return rc;
  1673   1904   }
  1674   1905   
  1675   1906   /*
  1676   1907   ** Populate iterator *pIter so that it may be used to iterate through all 
  1677         -** keys stored in subtask pTask using the incremental merge method.
         1908  +** keys stored in all PMAs created by this sorter.
  1678   1909   */
  1679   1910   static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){
  1680   1911     SortSubtask *pTask0 = &pSorter->aTask[0];
         1912  +  MergeEngine *pMain = 0;
         1913  +  sqlite3 *db = pTask0->db;
  1681   1914     int rc = SQLITE_OK;
  1682         -  MergeEngine *pMerger = 0;
  1683         -  IncrMerger *pIncr = 0;
  1684         -  int i;
  1685         -  int nPMA = 0;
         1915  +  int iTask;
  1686   1916   
  1687         -  for(i=0; i<pSorter->nTask; i++){
  1688         -    nPMA += pSorter->aTask[i].nPMA;
         1917  +  IncrBuilder *aMerge;
         1918  +  const int nMerge = 32;
         1919  +  aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge);
         1920  +  if( aMerge==0 ) return SQLITE_NOMEM;
         1921  +
         1922  +  if( pSorter->nTask>1 ){
         1923  +    pMain = vdbeMergeEngineNew(pSorter->nTask);
         1924  +    if( pMain==0 ) rc = SQLITE_NOMEM;
  1689   1925     }
  1690         -  pMerger = vdbeMergeEngineNew(nPMA);
  1691         -  if( pMerger==0 ){
  1692         -    rc = SQLITE_NOMEM;
  1693         -  }else{
  1694         -    int iIter = 0;
         1926  +
         1927  +  for(iTask=0; iTask<pSorter->nTask && rc==SQLITE_OK; iTask++){
         1928  +    MergeEngine *pRoot = 0;
  1695   1929       int iPMA;
  1696         -    for(i=0; i<pSorter->nTask; i++){
  1697         -      i64 iReadOff = 0;
  1698         -      SortSubtask *pTask = &pSorter->aTask[i];
  1699         -      for(iPMA=0; iPMA<pTask->nPMA; iPMA++){
  1700         -        i64 nDummy = 0;
  1701         -        PmaReader *pIter = &pMerger->aIter[iIter++];
  1702         -        rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nDummy);
  1703         -        iReadOff = pIter->iEof;
         1930  +    i64 iReadOff = 0;
         1931  +    SortSubtask *pTask = &pSorter->aTask[iTask];
         1932  +    if( pTask->nPMA==0 ) continue;
         1933  +    for(iPMA=0; iPMA<pTask->nPMA; iPMA += SORTER_MAX_MERGE_COUNT){
         1934  +      MergeEngine *pMerger = 0;
         1935  +      int nReader = MIN(pTask->nPMA - iPMA, SORTER_MAX_MERGE_COUNT);
         1936  +
         1937  +      rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
         1938  +      if( rc!=SQLITE_OK ) break;
         1939  +
         1940  +      if( iPMA==0 ){
         1941  +        pRoot = pMerger;
         1942  +      }else{
         1943  +        if( pRoot ){
         1944  +          rc = vdbeAddToBuilder(pTask, &aMerge[0], pRoot);
         1945  +          pRoot = 0;
         1946  +          if( rc!=SQLITE_OK ){
         1947  +            vdbeMergeEngineFree(pMerger);
         1948  +            break;
         1949  +          }
         1950  +        }
         1951  +        rc = vdbeAddToBuilder(pTask, &aMerge[0], pMerger);
         1952  +      }
         1953  +    }
         1954  +
         1955  +    if( pRoot==0 ){
         1956  +      int i;
         1957  +      for(i=0; rc==SQLITE_OK && i<nMerge; i++){
         1958  +        if( aMerge[i].pMerger ){
         1959  +          if( pRoot ){
         1960  +            rc = vdbeAddToBuilder(pTask, &aMerge[i], pRoot);
         1961  +            if( rc!=SQLITE_OK ) break;
         1962  +          }
         1963  +          pRoot = aMerge[i].pMerger;
         1964  +          aMerge[i].pMerger = 0;
         1965  +        }
  1704   1966         }
  1705   1967       }
  1706         -    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
  1707         -      rc = vdbeSorterDoCompare(pTask0, pMerger, i);
         1968  +
         1969  +    if( rc==SQLITE_OK ){
         1970  +      if( pMain==0 ){
         1971  +        pMain = pRoot;
         1972  +      }else{
         1973  +        IncrMerger *pNew = vdbeIncrNew(pTask, pRoot);
         1974  +        pMain->aIter[iTask].pIncr = pNew;
         1975  +        if( pNew==0 ) rc = SQLITE_NOMEM;
         1976  +      }
         1977  +      memset(aMerge, 0, nMerge*sizeof(aMerge[0]));
  1708   1978       }
  1709   1979     }
  1710   1980   
  1711   1981     if( rc==SQLITE_OK ){
  1712         -    pIncr = (IncrMerger*)sqlite3_malloc(sizeof(IncrMerger));
  1713         -    if( pIncr==0 ){
  1714         -      rc = SQLITE_NOMEM;
  1715         -    }else{
  1716         -      memset(pIncr, 0, sizeof(IncrMerger));
  1717         -      pIncr->mxSz = (pSorter->mxPmaSize / 2);
  1718         -      pIncr->pMerger = pMerger;
  1719         -      pIncr->pTask = pTask0;
         1982  +    SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
         1983  +
         1984  +    rc = vdbeSortAllocUnpacked(pLast);
         1985  +    if( rc==SQLITE_OK ){
         1986  +      pIter->pIncr = vdbeIncrNew(pLast, pMain);
         1987  +      if( pIter->pIncr==0 ){
         1988  +        rc = SQLITE_NOMEM;
         1989  +      }else{
         1990  +        vdbeIncrSetThreads(pIter->pIncr, pSorter->bUseThreads);
         1991  +        for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
         1992  +          IncrMerger *pIncr;
         1993  +          if( (pIncr = pMain->aIter[iTask].pIncr) ){
         1994  +            vdbeIncrSetThreads(pIncr, pSorter->bUseThreads);
         1995  +            assert( pIncr->pTask!=pLast );
         1996  +          }
         1997  +        }
         1998  +      }
  1720   1999       }
  1721   2000     }
  1722         -
  1723         -  /* Open the two temp files. */
  1724   2001     if( rc==SQLITE_OK ){
  1725         -    rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[0].pFd);
  1726         -  }
  1727         -  if( rc==SQLITE_OK ){
  1728         -    rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[1].pFd);
         2002  +    rc = vdbeIncrInit2(pIter);
  1729   2003     }
  1730   2004   
  1731         -  /* Launch a background thread to populate aFile[1]. */
  1732         -  if( rc==SQLITE_OK ){
  1733         -    rc = vdbeIncrBgPopulate(pIncr);
  1734         -  }
  1735         -
  1736         -  pIter->pIncr = pIncr;
  1737         -  if( rc==SQLITE_OK ){
  1738         -    rc = vdbePmaReaderNext(pIter);
  1739         -  }
         2005  +  sqlite3_free(aMerge);
  1740   2006     return rc;
  1741   2007   }
  1742   2008   
  1743   2009   
  1744   2010   /*
  1745   2011   ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
  1746   2012   ** this function is called to prepare for iterating through the records

Changes to test/sort2.test.

    11     11   # This file implements regression tests for SQLite library. 
    12     12   #
    13     13   
    14     14   set testdir [file dirname $argv0]
    15     15   source $testdir/tester.tcl
    16     16   set testprefix sort2
    17     17   
    18         -db close
    19         -sqlite3_shutdown
    20         -sqlite3_config_worker_threads 7
    21         -reset_db
    22         -
    23         -do_execsql_test 1 {
    24         -  PRAGMA cache_size = 5;
    25         -  WITH r(x,y) AS (
    26         -    SELECT 1, randomblob(100)
    27         -    UNION ALL
    28         -    SELECT x+1, randomblob(100) FROM r
    29         -    LIMIT 100000
    30         -  )
    31         -  SELECT count(x), length(y) FROM r GROUP BY (x%5)
           18  +foreach {tn script} {
           19  +  1 { }
           20  +  2 {
           21  +    catch { db close }
           22  +    sqlite3_shutdown
           23  +    sqlite3_config_worker_threads 7
           24  +    reset_db
           25  +  }
    32     26   } {
    33         -  20000 100 20000 100 20000 100 20000 100 20000 100
    34         -}
           27  +
           28  +  eval $script
           29  +
           30  +  do_execsql_test $tn.1 {
           31  +    PRAGMA cache_size = 5;
           32  +    WITH r(x,y) AS (
           33  +      SELECT 1, randomblob(100)
           34  +      UNION ALL
           35  +      SELECT x+1, randomblob(100) FROM r
           36  +      LIMIT 100000
           37  +    )
           38  +    SELECT count(x), length(y) FROM r GROUP BY (x%5)
           39  +  } {
           40  +    20000 100 20000 100 20000 100 20000 100 20000 100
           41  +  }
    35     42   
    36         -do_execsql_test 2.1 {
    37         -  CREATE TABLE t1(a, b);
    38         -  WITH r(x,y) AS (
    39         -    SELECT 1, randomblob(100)
    40         -    UNION ALL
    41         -    SELECT x+1, randomblob(100) FROM r
    42         -    LIMIT 10000
    43         -  ) INSERT INTO t1 SELECT * FROM r;
           43  +  do_execsql_test $tn.2.1 {
           44  +    CREATE TABLE t1(a, b);
           45  +    WITH r(x,y) AS (
           46  +      SELECT 1, randomblob(100)
           47  +      UNION ALL
           48  +      SELECT x+1, randomblob(100) FROM r
           49  +      LIMIT 10000
           50  +    ) INSERT INTO t1 SELECT * FROM r;
           51  +  }
           52  +  
           53  +  do_execsql_test $tn.2.2 {
           54  +    CREATE UNIQUE INDEX i1 ON t1(b, a);
           55  +  }
           56  +  
           57  +  do_execsql_test $tn.2.3 {
           58  +    CREATE UNIQUE INDEX i2 ON t1(a);
           59  +  }
           60  +  
           61  +  do_execsql_test $tn.2.4 { PRAGMA integrity_check } {ok}
           62  +  
           63  +  breakpoint
           64  +  do_execsql_test $tn.3 {
           65  +    PRAGMA cache_size = 5;
           66  +    WITH r(x,y) AS (
           67  +      SELECT 1, randomblob(100)
           68  +      UNION ALL
           69  +      SELECT x+1, randomblob(100) FROM r
           70  +      LIMIT 1000000
           71  +    )
           72  +    SELECT count(x), length(y) FROM r GROUP BY (x%5)
           73  +  } {
           74  +    200000 100 200000 100 200000 100 200000 100 200000 100
           75  +  }
           76  +  
           77  +  db close
           78  +  sqlite3_shutdown
           79  +  sqlite3_config_worker_threads 0
           80  +  sqlite3_initialize
           81  +
    44     82   }
    45     83   
    46         -do_execsql_test 2.2 {
    47         -  CREATE UNIQUE INDEX i1 ON t1(b, a);
    48         -}
    49         -
    50         -do_execsql_test 2.3 {
    51         -  CREATE UNIQUE INDEX i2 ON t1(a);
    52         -}
    53         -
    54         -do_execsql_test 2.4 { PRAGMA integrity_check } {ok}
    55         -
    56         -db close
    57         -sqlite3_shutdown
    58         -sqlite3_config_worker_threads 0
    59         -sqlite3_initialize
    60     84   finish_test
    61     85