SQLite

Check-in [98bf0307b1]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Avoid having the sorter merge too many PMAs at a time when incrementally merging data following a SorterRewind().
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | threads-experimental
Files: files | file ages | folders
SHA1: 98bf0307b121b0776a7170108cc8d3f948a7ebfe
User & Date: dan 2014-04-11 19:43:07.755
Context
2014-04-12
19:34
Fix many issues with new code. (check-in: 62c406a042 user: dan tags: threads-experimental)
2014-04-11
19:43
Avoid having the sorter merge too many PMAs at a time when incrementally merging data following a SorterRewind(). (check-in: 98bf0307b1 user: dan tags: threads-experimental)
2014-04-09
20:04
Experimental multi-threaded sorting changes to allow the sorter to begin returning items to the VDBE before all data is sorted. (check-in: f9d5e09afa user: dan tags: threads-experimental)
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/shell.c.
3531
3532
3533
3534
3535
3536
3537
3538
3539
3540
3541
3542
3543
3544
3545
  memcpy(data->separator,"|", 2);
  data->showHeader = 0;
  sqlite3_config(SQLITE_CONFIG_URI, 1);
  sqlite3_config(SQLITE_CONFIG_LOG, shellLog, data);
  sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> ");
  sqlite3_snprintf(sizeof(continuePrompt), continuePrompt,"   ...> ");
  sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
  sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 3);
}

/*
** Output text to the console in a font that attracts extra attention.
*/
#ifdef _WIN32
static void printBold(const char *zText){







|







3531
3532
3533
3534
3535
3536
3537
3538
3539
3540
3541
3542
3543
3544
3545
  memcpy(data->separator,"|", 2);
  data->showHeader = 0;
  sqlite3_config(SQLITE_CONFIG_URI, 1);
  sqlite3_config(SQLITE_CONFIG_LOG, shellLog, data);
  sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> ");
  sqlite3_snprintf(sizeof(continuePrompt), continuePrompt,"   ...> ");
  sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
  sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 4);
}

/*
** Output text to the console in a font that attracts extra attention.
*/
#ifdef _WIN32
static void printBold(const char *zText){
Changes to src/vdbesort.c.
160
161
162
163
164
165
166
167

168
169
170
171
172
173
174
  u8 eWork;                       /* One of the SORT_SUBTASK_* constants */
  int nConsolidate;               /* For SORT_SUBTASK_CONS, max final PMAs */
  SorterRecord *pList;            /* List of records for pTask to sort */
  int nInMemory;                  /* Expected size of PMA based on pList */
  u8 *aListMemory;                /* Records memory (or NULL) */

  int nPMA;                       /* Number of PMAs currently in file */
  SorterFile file;

};


/*
** The MergeEngine object is used to combine two or more smaller PMAs into
** one big PMA using a merge operation.  Separate PMAs all need to be
** combined into one big PMA in order to be able to step through the sorted







|
>







160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
  u8 eWork;                       /* One of the SORT_SUBTASK_* constants */
  int nConsolidate;               /* For SORT_SUBTASK_CONS, max final PMAs */
  SorterRecord *pList;            /* List of records for pTask to sort */
  int nInMemory;                  /* Expected size of PMA based on pList */
  u8 *aListMemory;                /* Records memory (or NULL) */

  int nPMA;                       /* Number of PMAs currently in file */
  SorterFile file;                /* Temp file for level-0 PMAs */
  SorterFile file2;               /* Space for other PMAs */
};


/*
** The MergeEngine object is used to combine two or more smaller PMAs into
** one big PMA using a merge operation.  Separate PMAs all need to be
** combined into one big PMA in order to be able to step through the sorted
236
237
238
239
240
241
242





243
244
245
246
247
248
249
250
251

252
253
254
255
256
257
258
  int *aTree;                /* Current state of incremental merge */
  PmaReader *aIter;          /* Array of iterators to merge data from */
};

/*
** Main sorter structure. A single instance of this is allocated for each 
** sorter cursor created by the VDBE.





*/
struct VdbeSorter {
  int nInMemory;                  /* Current size of pRecord list as PMA */
  int mnPmaSize;                  /* Minimum PMA size, in bytes */
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  int bUsePMA;                    /* True if one or more PMAs created */
  int bUseThreads;                /* True to use background threads */
  SorterRecord *pRecord;          /* Head of in-memory record list */
  PmaReader *pReader;             /* Read data from here after Rewind() */

  UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
  u8 *aMemory;                    /* Block of memory to alloc records from */
  int iMemory;                    /* Offset of first free byte in aMemory */
  int nMemory;                    /* Size of aMemory allocation in bytes */
  int iPrev;                      /* Previous thread used to flush PMA */
  int nTask;                      /* Size of aTask[] array */
  SortSubtask aTask[1];           /* One or more subtasks */







>
>
>
>
>









>







237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
  int *aTree;                /* Current state of incremental merge */
  PmaReader *aIter;          /* Array of iterators to merge data from */
};

/*
** Main sorter structure. A single instance of this is allocated for each 
** sorter cursor created by the VDBE.
**
** mxKeysize:
**   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
**   this variable is updated so as to be set to the size on disk of the
**   largest record in the sorter.
*/
struct VdbeSorter {
  int nInMemory;                  /* Current size of pRecord list as PMA */
  int mnPmaSize;                  /* Minimum PMA size, in bytes */
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  int bUsePMA;                    /* True if one or more PMAs created */
  int bUseThreads;                /* True to use background threads */
  SorterRecord *pRecord;          /* Head of in-memory record list */
  PmaReader *pReader;             /* Read data from here after Rewind() */
  int mxKeysize;                  /* Largest serialized key seen so far */
  UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
  u8 *aMemory;                    /* Block of memory to alloc records from */
  int iMemory;                    /* Offset of first free byte in aMemory */
  int nMemory;                    /* Size of aMemory allocation in bytes */
  int iPrev;                      /* Previous thread used to flush PMA */
  int nTask;                      /* Size of aTask[] array */
  SortSubtask aTask[1];           /* One or more subtasks */
273
274
275
276
277
278
279






280
281
282




283

284
285
286
287
288
289
290
291
292
293
  u8 *aKey;                       /* Pointer to current key */
  u8 *aBuffer;                    /* Current read buffer */
  int nBuffer;                    /* Size of read buffer in bytes */
  u8 *aMap;                       /* Pointer to mapping of entire file */
  IncrMerger *pIncr;              /* Incremental merger */
};







/*
** State for an incremental merge.  vdbeIncrPopulate() pulls keys from
** pMerger and writes them to aFile[1]; vdbeIncrSwap() then exchanges
** aFile[0] and aFile[1] and starts the next populate step.
*/
struct IncrMerger {
  int mxSz;                       /* Populate stops before aFile[1] exceeds this */
  SortSubtask *pTask;             /* Task that owns this merger */




  int bEof;                       /* Set by vdbeIncrSwap() once a swap yields an empty file */

  SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
  MergeEngine *pMerger;           /* Merge engine vdbeIncrPopulate() reads keys from */
  SQLiteThread *pThread;          /* Thread currently populating aFile[1], or 0 */
};

/*
** An instance of this object is used for writing a PMA.
**
** The PMA is written one record at a time.  Each record is of an arbitrary
** size.  But I/O is more efficient if it occurs in page-sized blocks where







>
>
>
>
>
>

<

>
>
>
>

>

<
<







280
281
282
283
284
285
286
287
288
289
290
291
292
293

294
295
296
297
298
299
300
301


302
303
304
305
306
307
308
  u8 *aKey;                       /* Pointer to current key */
  u8 *aBuffer;                    /* Current read buffer */
  int nBuffer;                    /* Size of read buffer in bytes */
  u8 *aMap;                       /* Pointer to mapping of entire file */
  IncrMerger *pIncr;              /* Incremental merger */
};

/*
** Normally, a PmaReader object iterates through an existing PMA stored
** within a temp file. However, if the PmaReader.pIncr variable points to
** an object of the following type, it may be used to iterate/merge through
** multiple PMAs simultaneously.
**
** vdbeIncrPopulate() drains keys from pMerger into aFile[1], starting at
** byte offset iStartOff and stopping before the output would extend past
** (iStartOff + mxSz).  When bUseThread is zero, vdbeIncrBgPopulate() runs
** that step synchronously; otherwise it is handed to a new thread whose
** handle is kept in pThread.
*/
struct IncrMerger {

  SortSubtask *pTask;             /* Task that owns this merger */
  SQLiteThread *pThread;          /* Thread populating aFile[1], or 0 if none */
  MergeEngine *pMerger;           /* Merge engine vdbeIncrPopulate() reads from */
  i64 iStartOff;                  /* Offset to start writing (and reading) file at */
  int mxSz;                       /* Maximum bytes of data to store per populate */
  int bEof;                       /* Set to true when merge is finished */
  int bUseThread;                 /* True to use a bg thread for this object */
  SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */


};

/*
** An instance of this object is used for writing a PMA.
**
** The PMA is written one record at a time.  Each record is of an arbitrary
** size.  But I/O is more efficient if it occurs in page-sized blocks where
502
503
504
505
506
507
508
509
510
511
512
513
514
515



516
517
518











519
520
521
522
523
524
525

  assert( pIncr->bEof==0 );

  if( pIter->aMap ){
    sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
    pIter->aMap = 0;
  }
  pIter->iReadOff = 0;
  pIter->iEof = pIncr->aFile[0].iEof;
  pIter->pFile = pIncr->aFile[0].pFd;

  rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap);
  if( rc==SQLITE_OK ){
    if( pIter->aMap==0 && pIter->aBuffer==0 ){



      pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
      if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
      pIter->nBuffer = pTask->pgsz;











    }
  }

  return rc;
}









|





|
>
>
>
|
|
|
>
>
>
>
>
>
>
>
>
>
>







517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554

  assert( pIncr->bEof==0 );

  if( pIter->aMap ){
    sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
    pIter->aMap = 0;
  }
  pIter->iReadOff = pIncr->iStartOff;
  pIter->iEof = pIncr->aFile[0].iEof;
  pIter->pFile = pIncr->aFile[0].pFd;

  rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap);
  if( rc==SQLITE_OK ){
    if( pIter->aMap==0 ){
      /* TODO: Combine this code with similar code in vdbePmaReaderInit() */
      int iBuf = pIter->iReadOff % pTask->pgsz;
      if( pIter->aBuffer==0 ){
        pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
        if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
        pIter->nBuffer = pTask->pgsz;
      }
      if( iBuf ){
        int nRead = pTask->pgsz - iBuf;
        if( (pIter->iReadOff + nRead) > pIter->iEof ){
          nRead = (int)(pIter->iEof - pIter->iReadOff);
        }
        rc = sqlite3OsRead(
            pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
        );
        assert( rc!=SQLITE_IOERR_SHORT_READ );
      }
    }
  }

  return rc;
}


573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
  SorterFile *pFile,              /* Sorter file to read from */
  i64 iStart,                     /* Start offset in pFile */
  PmaReader *pIter,               /* Iterator to populate */
  i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
){
  int rc = SQLITE_OK;
  int nBuf = pTask->pgsz;
  void *pMap = 0;                 /* Mapping of temp file */

  assert( pFile->iEof>iStart );
  assert( pIter->aAlloc==0 );
  assert( pIter->aBuffer==0 );
  pIter->pFile = pFile->pFd;
  pIter->iReadOff = iStart;
  pIter->nAlloc = 128;
  pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
  if( pIter->aAlloc ){
    /* Try to xFetch() a mapping of the entire temp file. If this is possible,
    ** the PMA will be read via the mapping. Otherwise, use xRead().  */
    if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
      rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iEof, &pMap);
    }
  }else{
    rc = SQLITE_NOMEM;
  }

  if( rc==SQLITE_OK ){
    if( pMap ){
      pIter->aMap = (u8*)pMap;
    }else{
      pIter->nBuffer = nBuf;
      pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
      if( !pIter->aBuffer ){
        rc = SQLITE_NOMEM;
      }else{
        int iBuf = iStart % nBuf;
        if( iBuf ){
          int nRead = nBuf - iBuf;
          if( (iStart + nRead) > pFile->iEof ){
            nRead = (int)(pFile->iEof - iStart);
          }
          rc = sqlite3OsRead(
              pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
          );
          assert( rc!=SQLITE_IOERR_SHORT_READ );
        }
      }
    }
  }

  if( rc==SQLITE_OK ){
    u64 nByte;                    /* Size of PMA in bytes */
    pIter->iEof = pFile->iEof;







<











|
<
<




|
<
<
<
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
<







602
603
604
605
606
607
608

609
610
611
612
613
614
615
616
617
618
619
620


621
622
623
624
625



626
627
628
629
630
631
632
633
634
635
636
637
638
639
640

641
642
643
644
645
646
647
  SorterFile *pFile,              /* Sorter file to read from */
  i64 iStart,                     /* Start offset in pFile */
  PmaReader *pIter,               /* Iterator to populate */
  i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
){
  int rc = SQLITE_OK;
  int nBuf = pTask->pgsz;


  assert( pFile->iEof>iStart );
  assert( pIter->aAlloc==0 );
  assert( pIter->aBuffer==0 );
  pIter->pFile = pFile->pFd;
  pIter->iReadOff = iStart;
  pIter->nAlloc = 128;
  pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
  if( pIter->aAlloc ){
    /* Try to xFetch() a mapping of the entire temp file. If this is possible,
    ** the PMA will be read via the mapping. Otherwise, use xRead().  */
    rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);


  }else{
    rc = SQLITE_NOMEM;
  }

  if( rc==SQLITE_OK && pIter->aMap==0 ){



    pIter->nBuffer = nBuf;
    pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
    if( !pIter->aBuffer ){
      rc = SQLITE_NOMEM;
    }else{
      int iBuf = iStart % nBuf;
      if( iBuf ){
        int nRead = nBuf - iBuf;
        if( (iStart + nRead) > pFile->iEof ){
          nRead = (int)(pFile->iEof - iStart);
        }
        rc = sqlite3OsRead(
            pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
        );
        assert( rc!=SQLITE_IOERR_SHORT_READ );

      }
    }
  }

  if( rc==SQLITE_OK ){
    u64 nByte;                    /* Size of PMA in bytes */
    pIter->iEof = pFile->iEof;
801
802
803
804
805
806
807





808
809
810
811
812
813
814
  }
  pTask->pList = 0;
  if( pTask->file.pFd ){
    sqlite3OsCloseFree(pTask->file.pFd);
    pTask->file.pFd = 0;
    pTask->file.iEof = 0;
  }





}

/*
** Join all threads.  
*/
#if SQLITE_MAX_WORKER_THREADS>0
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){







>
>
>
>
>







823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
  }
  pTask->pList = 0;
  if( pTask->file.pFd ){
    sqlite3OsCloseFree(pTask->file.pFd);
    pTask->file.pFd = 0;
    pTask->file.iEof = 0;
  }
  if( pTask->file2.pFd ){
    sqlite3OsCloseFree(pTask->file2.pFd);
    pTask->file2.pFd = 0;
    pTask->file2.iEof = 0;
  }
}

/*
** Join all threads.  
*/
#if SQLITE_MAX_WORKER_THREADS>0
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
** Allocate a new MergeEngine object with space for nIter iterators.
*/
static MergeEngine *vdbeMergeEngineNew(int nIter){
  int N = 2;                      /* Smallest power of two >= nIter */
  int nByte;                      /* Total bytes of space to allocate */
  MergeEngine *pNew;              /* Pointer to allocated object to return */

  /* assert( nIter<=SORTER_MAX_MERGE_COUNT ); */

  while( N<nIter ) N += N;
  nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));

  pNew = (MergeEngine*)sqlite3MallocZero(nByte);
  if( pNew ){
    pNew->nTree = N;







|







862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
** Allocate a new MergeEngine object with space for nIter iterators.
*/
static MergeEngine *vdbeMergeEngineNew(int nIter){
  int N = 2;                      /* Smallest power of two >= nIter */
  int nByte;                      /* Total bytes of space to allocate */
  MergeEngine *pNew;              /* Pointer to allocated object to return */

  assert( nIter<=SORTER_MAX_MERGE_COUNT );

  while( N<nIter ) N += N;
  nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));

  pNew = (MergeEngine*)sqlite3MallocZero(nByte);
  if( pNew ){
    pNew->nTree = N;
884
885
886
887
888
889
890

891
892
893
894
895
896
897
  if( pSorter->aMemory==0 ){
    vdbeSorterRecordFree(0, pSorter->pRecord);
  }
  pSorter->pRecord = 0;
  pSorter->nInMemory = 0;
  pSorter->bUsePMA = 0;
  pSorter->iMemory = 0;

  sqlite3DbFree(db, pSorter->pUnpacked);
  pSorter->pUnpacked = 0;
}

/*
** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
*/







>







911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
  if( pSorter->aMemory==0 ){
    vdbeSorterRecordFree(0, pSorter->pRecord);
  }
  pSorter->pRecord = 0;
  pSorter->nInMemory = 0;
  pSorter->bUsePMA = 0;
  pSorter->iMemory = 0;
  pSorter->mxKeysize = 0;
  sqlite3DbFree(db, pSorter->pUnpacked);
  pSorter->pUnpacked = 0;
}

/*
** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
*/
1255
1256
1257
1258
1259
1260
1261









1262
1263
1264

1265














1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
  fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
}
/*
** Debug trace helper: print the current VFS time, the tag "X" and the
** text zEvent to stderr on a single line.
*/
static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
  i64 iNow;                       /* Timestamp reported by the VFS */

  sqlite3OsCurrentTimeInt64(db->pVfs, &iNow);
  fprintf(stderr, "%lld:X %s\n", iNow, zEvent);
}









#else
# define vdbeSorterWorkDebug(x,y)
# define vdbeSorterRewindDebug(x,y)

#endif















/*
** The main routine for sorter-thread operations.
*/
static void *vdbeSortSubtaskMain(void *pCtx){
  int rc = SQLITE_OK;
  SortSubtask *pTask = (SortSubtask*)pCtx;

  assert( pTask->eWork==SORT_SUBTASK_SORT
       || pTask->eWork==SORT_SUBTASK_TO_PMA
       || pTask->eWork==SORT_SUBTASK_CONS
  );
  assert( pTask->bDone==0 );

  vdbeSorterWorkDebug(pTask, "enter");

  if( pTask->pUnpacked==0 ){
    char *pFree;
    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
        pTask->pKeyInfo, 0, 0, &pFree
    );
    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
    if( pFree==0 ){
      rc = SQLITE_NOMEM;
      goto thread_out;
    }
    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
    pTask->pUnpacked->errCode = 0;
  }

  if( pTask->eWork==SORT_SUBTASK_CONS ){
    assert( pTask->pList==0 );
    while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){
      int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT);
      sqlite3_file *pTemp2 = 0;     /* Second temp file to use */
      MergeEngine *pMerger;         /* Object for reading/merging PMA data */







>
>
>
>
>
>
>
>
>



>

>
>
>
>
>
>
>
>
>
>
>
>
>
>
















<
<
|
<
<
<
<
<
|
<
<
<
<







1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333


1334





1335




1336
1337
1338
1339
1340
1341
1342
  fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
}
/*
** Debug trace helper: print the current VFS time, the tag "X" and the
** text zEvent to stderr on a single line.
*/
static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
  i64 iNow;                       /* Timestamp reported by the VFS */

  sqlite3OsCurrentTimeInt64(db->pVfs, &iNow);
  fprintf(stderr, "%lld:X %s\n", iNow, zEvent);
}
/*
** Debug trace helper for populate steps: print the current VFS time, the
** tag "bg<N>" (N is the index of pTask within its sorter's aTask[] array)
** and the text zEvent to stderr on a single line.
*/
static void vdbeSorterPopulateDebug(
  SortSubtask *pTask,
  const char *zEvent
){
  int iSlot = (pTask - pTask->pSorter->aTask);  /* Index of pTask in aTask[] */
  i64 iNow;                                     /* Timestamp reported by the VFS */

  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &iNow);
  fprintf(stderr, "%lld:bg%d %s\n", iNow, iSlot, zEvent);
}
#else
# define vdbeSorterWorkDebug(x,y)
# define vdbeSorterRewindDebug(x,y)
# define vdbeSorterPopulateDebug(x,y)
#endif

/*
** Ensure pTask->pUnpacked points to an UnpackedRecord.  If one is already
** present this is a no-op.  Otherwise a record is allocated for
** pTask->pKeyInfo, its nField is set to the key-info field count and its
** errCode is cleared.
**
** Returns SQLITE_OK on success, or SQLITE_NOMEM if the allocation fails.
*/
static int vdbeSortAllocUnpacked(SortSubtask *pTask){
  char *pSpace;                   /* Allocation backing the new record */

  if( pTask->pUnpacked!=0 ){
    return SQLITE_OK;
  }

  pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
      pTask->pKeyInfo, 0, 0, &pSpace
  );
  assert( pTask->pUnpacked==(UnpackedRecord*)pSpace );
  if( pSpace==0 ){
    return SQLITE_NOMEM;
  }

  pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
  pTask->pUnpacked->errCode = 0;
  return SQLITE_OK;
}

/*
** The main routine for sorter-thread operations.
*/
static void *vdbeSortSubtaskMain(void *pCtx){
  int rc = SQLITE_OK;
  SortSubtask *pTask = (SortSubtask*)pCtx;

  assert( pTask->eWork==SORT_SUBTASK_SORT
       || pTask->eWork==SORT_SUBTASK_TO_PMA
       || pTask->eWork==SORT_SUBTASK_CONS
  );
  assert( pTask->bDone==0 );

  vdbeSorterWorkDebug(pTask, "enter");



  rc = vdbeSortAllocUnpacked(pTask);





  if( rc!=SQLITE_OK ) goto thread_out;





  if( pTask->eWork==SORT_SUBTASK_CONS ){
    assert( pTask->pList==0 );
    while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){
      int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT);
      sqlite3_file *pTemp2 = 0;     /* Second temp file to use */
      MergeEngine *pMerger;         /* Object for reading/merging PMA data */
1529
1530
1531
1532
1533
1534
1535



1536
1537
1538
1539
1540
1541
1542
      pSorter->nInMemory = 0;
      pSorter->iMemory = 0;
      assert( rc!=SQLITE_OK || pSorter->pRecord==0 );
    }
  }

  pSorter->nInMemory += nPMA;




  if( pSorter->aMemory ){
    int nMin = pSorter->iMemory + nReq;

    if( nMin>pSorter->nMemory ){
      u8 *aNew;
      int nNew = pSorter->nMemory * 2;







>
>
>







1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
      pSorter->nInMemory = 0;
      pSorter->iMemory = 0;
      assert( rc!=SQLITE_OK || pSorter->pRecord==0 );
    }
  }

  pSorter->nInMemory += nPMA;
  if( nPMA>pSorter->mxKeysize ){
    pSorter->mxKeysize = nPMA;
  }

  if( pSorter->aMemory ){
    int nMin = pSorter->iMemory + nReq;

    if( nMin>pSorter->nMemory ){
      u8 *aNew;
      int nNew = pSorter->nMemory * 2;
1587
1588
1589
1590
1591
1592
1593

1594
1595
1596
1597
1598


1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618

1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631


1632
1633
1634
1635

1636
1637
1638
1639
1640
1641


1642
1643

1644
1645
1646
1647

1648
1649
1650
1651
1652
1653


1654
1655
1656
1657







1658
1659
1660
1661
1662
1663
1664


1665
1666
1667
1668

1669
1670


1671













































































1672





















































































1673
1674
1675
1676
1677
1678
1679
1680


1681













1682
1683
1684
1685





1686
1687
1688

1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700

1701
1702
1703
1704
1705
1706
1707
1708
1709








1710






1711
1712
1713
1714
1715
1716
1717
1718
1719
1720

1721




1722
1723

1724









1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739


1740
1741
1742
1743
1744
1745
1746
** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
** of the data stored in aFile[1] is the same as that used by regular PMAs,
** except that the number-of-bytes varint is omitted from the start.
*/
static int vdbeIncrPopulate(IncrMerger *pIncr){
  MergeEngine *pEngine = pIncr->pMerger;  /* Source of merged keys */
  SorterFile *pDest = &pIncr->aFile[1];   /* File being filled */
  PmaWriter w;                            /* Buffered writer on pDest */
  int rc = SQLITE_OK;                     /* Result of stepping the merge */
  int rcFinish;                           /* Result of flushing the writer */

  assert( pIncr->bEof==0 );
  vdbePmaWriterInit(pDest->pFd, &w, pIncr->pTask->pgsz, 0);

  while( rc==SQLITE_OK ){
    PmaReader *pNext = &pEngine->aIter[ pEngine->aTree[1] ];
    i64 nUsed = w.iWriteOff + w.iBufEnd;  /* Bytes written so far */
    int nKey = pNext->nKey;
    int bUnused;

    /* Stop when the input is exhausted, or when the output already holds
    ** data and adding this key would push it past mxSz. */
    if( pNext->pFile==0 ) break;
    if( nUsed!=0 && (nUsed + nKey)>pIncr->mxSz ) break;

    /* Append the key (length varint, then payload) and step the merge. */
    vdbePmaWriteVarint(&w, nKey);
    vdbePmaWriteBlob(&w, pNext->aKey, nKey);
    rc = vdbeSorterNext(pIncr->pTask, pEngine, &bUnused);
  }

  /* Always flush; an earlier error takes precedence over the flush result. */
  rcFinish = vdbePmaWriterFinish(&w, &pDest->iEof);
  return (rc==SQLITE_OK) ? rcFinish : rc;
}

/*
** Thread entry point: run vdbeIncrPopulate() on the IncrMerger passed as
** pCtx and return its result code encoded as a pointer.
*/
static void *vdbeIncrPopulateThreadMain(void *pCtx){
  int rc = vdbeIncrPopulate((IncrMerger*)pCtx);
  return SQLITE_INT_TO_PTR( rc );
}

/*
** Start filling pIncr->aFile[1].  If the owning sorter has bUseThreads
** set, the work is handed to a new thread stored in pIncr->pThread;
** otherwise vdbeIncrPopulate() runs to completion before this returns.
** No thread may already be attached to pIncr.
*/
static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  assert( pIncr->pThread==0 );
  if( pIncr->pTask->pSorter->bUseThreads ){
    return sqlite3ThreadCreate(
        &pIncr->pThread, vdbeIncrPopulateThreadMain, (void*)pIncr
    );
  }
  return vdbeIncrPopulate(pIncr);
}

/*
** Wait for any populate thread attached to pIncr, then exchange aFile[0]
** and aFile[1].  If the file swapped into aFile[0] is empty the merge is
** flagged as finished (bEof); otherwise the next populate step is started.
** Returns an SQLite result code.
*/
static int vdbeIncrSwap(IncrMerger *pIncr){
  SorterFile fSwap;               /* Scratch copy used for the exchange */
  int rc = SQLITE_OK;

  if( pIncr->pThread ){
    void *pResult;
    rc = sqlite3ThreadJoin(pIncr->pThread, &pResult);
    if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pResult);
    pIncr->pThread = 0;
  }
  if( rc!=SQLITE_OK ){
    return rc;
  }

  fSwap = pIncr->aFile[0];
  pIncr->aFile[0] = pIncr->aFile[1];
  pIncr->aFile[1] = fSwap;

  if( pIncr->aFile[0].iEof!=0 ){
    return vdbeIncrBgPopulate(pIncr);
  }
  pIncr->bEof = 1;
  return SQLITE_OK;
}

/*
** Release every resource owned by incremental merger pIncr and then free
** pIncr itself.  Passing a NULL pointer is a harmless no-op.
*/
static void vdbeIncrFree(IncrMerger *pIncr){
  if( pIncr==0 ) return;

  /* Join the populate thread first: it writes aFile[1] and reads pMerger,
  ** so neither may be torn down while it is still running.  The thread's
  ** result code is deliberately discarded during teardown. */
  if( pIncr->pThread ){
    void *pRet;
    sqlite3ThreadJoin(pIncr->pThread, &pRet);
  }

  if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
  if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);

  vdbeMergeEngineFree(pIncr->pMerger);
  sqlite3_free(pIncr);
}

/*
** Populate iterator *pIter so that it may be used to iterate through all
** keys stored in the subtasks of pSorter using the incremental merge
** method.
**
** A MergeEngine with one reader per PMA (over all subtasks) is built, an
** IncrMerger wrapping it is allocated, two temp files are opened, the
** first populate step is started and pIter is advanced to its first key.
**
** Returns SQLITE_OK on success or an SQLite error code.  pIter->pIncr is
** always assigned (NULL if the IncrMerger was never allocated).
*/
static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){
  SortSubtask *pTask0 = &pSorter->aTask[0];
  int rc = SQLITE_OK;
  MergeEngine *pMerger = 0;
  IncrMerger *pIncr = 0;
  int i;
  int nPMA = 0;

  /* Count the PMAs held by all subtasks; each needs one merge-engine slot. */
  for(i=0; i<pSorter->nTask; i++){
    nPMA += pSorter->aTask[i].nPMA;
  }

  pMerger = vdbeMergeEngineNew(nPMA);
  if( pMerger==0 ){
    rc = SQLITE_NOMEM;
  }else{
    int iIter = 0;
    int iPMA;

    /* Attach one reader to each PMA.  Both loops stop at the first error so
    ** that a later successful init cannot overwrite an earlier failure. */
    for(i=0; rc==SQLITE_OK && i<pSorter->nTask; i++){
      i64 iReadOff = 0;
      SortSubtask *pTask = &pSorter->aTask[i];
      for(iPMA=0; rc==SQLITE_OK && iPMA<pTask->nPMA; iPMA++){
        i64 nDummy = 0;
        PmaReader *pSub = &pMerger->aIter[iIter++];
        rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pSub, &nDummy);
        /* The next PMA in this file starts where the current one ends. */
        iReadOff = pSub->iEof;
      }
    }

    /* Prime the merge tree from the leaves upward. */
    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
      rc = vdbeSorterDoCompare(pTask0, pMerger, i);
    }
  }

  if( rc==SQLITE_OK ){
    pIncr = (IncrMerger*)sqlite3_malloc(sizeof(IncrMerger));
    if( pIncr==0 ){
      rc = SQLITE_NOMEM;
    }else{
      memset(pIncr, 0, sizeof(IncrMerger));
      pIncr->mxSz = (pSorter->mxPmaSize / 2);
      pIncr->pMerger = pMerger;
      pIncr->pTask = pTask0;
    }
  }
  /* NOTE(review): if rc is an error at this point while pMerger is non-NULL,
  ** pIncr was never allocated and nothing visible here releases pMerger.
  ** Confirm whether the merge-engine destructor can be called on a partly
  ** initialised engine and, if so, free pMerger on this path. */

  /* Open the two temp files. */
  if( rc==SQLITE_OK ){
    rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[0].pFd);
  }
  if( rc==SQLITE_OK ){
    rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[1].pFd);
  }

  /* Launch a background thread to populate aFile[1]. */
  if( rc==SQLITE_OK ){
    rc = vdbeIncrBgPopulate(pIncr);
  }

  pIter->pIncr = pIncr;
  if( rc==SQLITE_OK ){
    rc = vdbePmaReaderNext(pIter);
  }

  return rc;
}


/*
** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
** this function is called to prepare for iterating through the records







>





>
>
|









|









>











|

>
>
|



>





|
>
>
|
|
>
|
|
|
|
>

|
|
|
|
|
>
>
|
|
|
|
>
>
>
>
>
>
>







>
>
|
|
|
|
>
|
|
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>




|



>
>

>
>
>
>
>
>
>
>
>
>
>
>
>
|
<
|
|
>
>
>
>
>

<
|
>
|
|
|
<
|
<
|
<
<
<
|
|
>
|
<
|
|
|
<
|
|
|
>
>
>
>
>
>
>
>
|
>
>
>
>
>
>
|
<
|
|
|
|
<
|
|
|
>
|
>
>
>
>

<
>
|
>
>
>
>
>
>
>
>
>
|
|
<
<
|
|
<
<
<
|
|
<

|

>
>







1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928

1929
1930
1931
1932
1933
1934
1935
1936

1937
1938
1939
1940
1941

1942

1943



1944
1945
1946
1947

1948
1949
1950

1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969

1970
1971
1972
1973

1974
1975
1976
1977
1978
1979
1980
1981
1982
1983

1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996


1997
1998



1999
2000

2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
** of the data stored in aFile[1] is the same as that used by regular PMAs,
** except that the number-of-bytes varint is omitted from the start.
*/
static int vdbeIncrPopulate(IncrMerger *pIncr){
  SortSubtask *pTask = pIncr->pTask;        /* Task this merger belongs to */
  MergeEngine *pSrc = pIncr->pMerger;       /* Source of sorted keys */
  SorterFile *pDest = &pIncr->aFile[1];     /* File being populated */
  const i64 iFirst = pIncr->iStartOff;      /* Offset of first byte written */
  PmaWriter w;                              /* Buffered writer on pDest */
  int rc = SQLITE_OK;                       /* Result of reading keys */
  int rcFinish;                             /* Result of flushing the writer */

  assert( pIncr->bEof==0 );
  vdbeSorterPopulateDebug(pTask, "enter");
  vdbePmaWriterInit(pDest->pFd, &w, pTask->pgsz, iFirst);

  /* Copy keys, smallest first, until the input runs dry, the next key
  ** would overflow the mxSz byte budget, or an error occurs. */
  do{
    PmaReader *pCur = &pSrc->aIter[ pSrc->aTree[1] ];
    int nKey;                     /* Size of the current key in bytes */
    i64 iAfter;                   /* Write offset if this key were added */
    int bUnused;                  /* Output of vdbeSorterNext(); ignored */

    /* Input exhausted. */
    if( pCur->pFile==0 ) break;

    /* Output region full: varint length prefix plus key would not fit. */
    nKey = pCur->nKey;
    iAfter = w.iWriteOff + w.iBufEnd + nKey + sqlite3VarintLen(nKey);
    if( iAfter>(iFirst + pIncr->mxSz) ) break;

    /* Emit <varint nKey><key blob> and advance the merge engine. */
    vdbePmaWriteVarint(&w, nKey);
    vdbePmaWriteBlob(&w, pCur->aKey, nKey);
    rc = vdbeSorterNext(pTask, pSrc, &bUnused);
  }while( rc==SQLITE_OK );

  /* Always flush the writer; report the first error encountered. */
  rcFinish = vdbePmaWriterFinish(&w, &pDest->iEof);
  if( rc==SQLITE_OK ) rc = rcFinish;
  vdbeSorterPopulateDebug(pTask, "exit");
  return rc;
}

/*
** Thread entry point: populate the IncrMerger passed as pCtx and hand the
** resulting SQLite error code back to the joiner encoded as a pointer.
*/
static void *vdbeIncrPopulateThreadMain(void *pCtx){
  int rc = vdbeIncrPopulate((IncrMerger*)pCtx);
  return SQLITE_INT_TO_PTR(rc);
}

/*
** Start populating pIncr->aFile[1].
**
** If pIncr is configured to use a thread and worker threads are compiled
** in, launch a thread running vdbeIncrPopulateThreadMain() and return the
** result of sqlite3ThreadCreate(). Otherwise run vdbeIncrPopulate()
** synchronously and return its result.
**
** Previously, when pIncr->bUseThread was set in a build with
** SQLITE_MAX_WORKER_THREADS==0, neither branch ran and an uninitialised
** value was returned. Such a configuration now falls back to populating
** in the calling thread.
*/
static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  assert( pIncr->pThread==0 );
#if SQLITE_MAX_WORKER_THREADS>0
  if( pIncr->bUseThread ){
    void *pCtx = (void*)pIncr;
    return sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx);
  }
#endif
  return vdbeIncrPopulate(pIncr);
}

/*
** Make the next batch of merged keys available in pIncr->aFile[0].
**
** Single-threaded mode: populate aFile[1] now and copy its descriptor
** into aFile[0].
**
** Threaded mode: wait for any populate thread in flight, exchange
** aFile[0] and aFile[1], then (unless the new aFile[0] is empty) start
** refilling aFile[1].
**
** In both modes an empty aFile[0] (iEof equal to iStartOff) sets
** pIncr->bEof.
*/
static int vdbeIncrSwap(IncrMerger *pIncr){
  int rc = SQLITE_OK;

  if( pIncr->bUseThread==0 ){
    rc = vdbeIncrPopulate(pIncr);
    pIncr->aFile[0] = pIncr->aFile[1];
    if( pIncr->aFile[0].iEof==pIncr->iStartOff ) pIncr->bEof = 1;
    return rc;
  }

#if SQLITE_MAX_WORKER_THREADS>0
  if( pIncr->pThread ){
    void *pResult;
    assert( pIncr->bUseThread );
    rc = sqlite3ThreadJoin(pIncr->pThread, &pResult);
    if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pResult);
    pIncr->pThread = 0;
  }
#endif
  if( rc!=SQLITE_OK ) return rc;

  /* Exchange the freshly written file with the one just consumed. */
  {
    SorterFile tmp = pIncr->aFile[0];
    pIncr->aFile[0] = pIncr->aFile[1];
    pIncr->aFile[1] = tmp;
  }

  if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
    pIncr->bEof = 1;
  }else{
    rc = vdbeIncrBgPopulate(pIncr);
  }
  return rc;
}

/*
** Release pIncr and the merge engine it wraps. A NULL argument is a no-op.
** When worker threads are compiled in, any outstanding populate thread is
** joined first, and in threaded mode the two files in aFile[] are closed.
*/
static void vdbeIncrFree(IncrMerger *pIncr){
  if( pIncr==0 ) return;

#if SQLITE_MAX_WORKER_THREADS>0
  if( pIncr->pThread ){
    void *pUnused;
    sqlite3ThreadJoin(pIncr->pThread, &pUnused);
  }
  if( pIncr->bUseThread ){
    int iFile;
    for(iFile=0; iFile<2; iFile++){
      if( pIncr->aFile[iFile].pFd ){
        sqlite3OsCloseFree(pIncr->aFile[iFile].pFd);
      }
    }
  }
#endif

  vdbeMergeEngineFree(pIncr->pMerger);
  sqlite3_free(pIncr);
}

/*
** Allocate a zeroed IncrMerger that reads from pMerger on behalf of pTask.
**
** mxSz, the byte budget for each populate pass, is the larger of
** (mxKeysize+9) and half of mxPmaSize. That many bytes are also added to
** pTask->file2.iEof; vdbeIncrSetThreads() subtracts them again if the
** merger is later switched to threaded mode.
**
** Returns NULL if the allocation fails, in which case pMerger and pTask
** are left untouched. No files are opened here; that is done by
** vdbeIncrInit2().
*/
static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
  IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
  if( pIncr ){
    memset(pIncr, 0, sizeof(IncrMerger));
    pIncr->pMerger = pMerger;
    pIncr->pTask = pTask;
    pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
    pTask->file2.iEof += pIncr->mxSz;
  }
  return pIncr;
}

/*
** If bUseThread is non-zero, switch pIncr to threaded mode and subtract
** its mxSz bytes from pTask->file2.iEof. A zero bUseThread leaves pIncr
** unchanged.
*/
static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
  if( bUseThread==0 ) return;
  pIncr->pTask->file2.iEof -= pIncr->mxSz;
  pIncr->bUseThread = 1;
}

/*
** Prepare pIter for reading. If pIter has no IncrMerger attached this is
** a no-op that returns SQLITE_OK. Otherwise:
**
**   1. Recursively initialise every iterator feeding pIncr->pMerger.
**   2. Run vdbeSorterDoCompare() on tree slots nTree-1 down to 1.
**   3. Set up the output file(s): a region of pTask->file2 in
**      single-threaded mode, or two temp files of its own in threaded mode.
**   4. In threaded mode, start the first populate pass.
**   5. Advance pIter to its first key.
**
** The first error encountered stops processing and is returned.
*/
static int vdbeIncrInit2(PmaReader *pIter){
  IncrMerger *pIncr = pIter->pIncr;
  SortSubtask *pTask;
  MergeEngine *pMerger;
  int rc = SQLITE_OK;
  int i;

  if( pIncr==0 ) return SQLITE_OK;
  pTask = pIncr->pTask;
  pMerger = pIncr->pMerger;

  /* Initialise the sub-iterators, then the comparison tree above them. */
  for(i=0; i<pMerger->nTree; i++){
    rc = vdbeIncrInit2(&pMerger->aIter[i]);
    if( rc!=SQLITE_OK ) return rc;
  }
  for(i=pMerger->nTree-1; i>0; i--){
    rc = vdbeSorterDoCompare(pTask, pMerger, i);
    if( rc!=SQLITE_OK ) return rc;
  }

  /* Set up the required files for pIncr */
  if( pIncr->bUseThread ){
    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
    if( rc==SQLITE_OK ){
      rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
    }
    if( rc==SQLITE_OK ){
      rc = vdbeIncrBgPopulate(pIncr);
    }
  }else{
    if( pTask->file2.pFd==0 ){
      /* First user of file2: create it, size it to the total reserved so
      ** far, then restart iEof as the allocation cursor. */
      rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd);
      assert( pTask->file2.iEof>0 );
      if( rc==SQLITE_OK ){
        vdbeSorterExtendFile(pTask->db,pTask->file2.pFd,pTask->file2.iEof);
        pTask->file2.iEof = 0;
      }
    }
    if( rc==SQLITE_OK ){
      /* Claim the next mxSz bytes of file2 for this merger. */
      pIncr->aFile[1].pFd = pTask->file2.pFd;
      pIncr->iStartOff = pTask->file2.iEof;
      pTask->file2.iEof += pIncr->mxSz;
    }
  }

  if( rc==SQLITE_OK ){
    rc = vdbePmaReaderNext(pIter);
  }
  return rc;
}

/*
** Build a MergeEngine that merges nPMA consecutive level-0 PMAs stored in
** pTask->file. On success *ppOut points to the new engine and SQLITE_OK
** is returned. On failure *ppOut is set to NULL and an SQLite error code
** is returned.
**
** On entry *piOffset is the offset of the first PMA to read. On success
** it is left at the offset immediately following the last byte of the
** last PMA read. After an error its final value is undefined.
*/
static int vdbeMergeEngineLevel0(
  SortSubtask *pTask,             /* Sorter task to read from */
  int nPMA,                       /* Number of PMAs to read */
  i64 *piOffset,                  /* IN/OUT: Read offset in pTask->file */
  MergeEngine **ppOut             /* OUT: New merge-engine */
){
  MergeEngine *pEngine = vdbeMergeEngineNew(nPMA);
  i64 iPos = *piOffset;           /* Offset of the next PMA to open */
  int rc = (pEngine ? SQLITE_OK : SQLITE_NOMEM);
  int iReader;

  /* Attach one reader per PMA; each PMA begins where the last one ended. */
  for(iReader=0; rc==SQLITE_OK && iReader<nPMA; iReader++){
    PmaReader *pReader = &pEngine->aIter[iReader];
    i64 nUnused;
    rc = vdbePmaReaderInit(pTask, &pTask->file, iPos, pReader, &nUnused);
    iPos = pReader->iEof;
  }

  if( rc==SQLITE_OK ){
    *ppOut = pEngine;
  }else{
    vdbeMergeEngineFree(pEngine);
    *ppOut = 0;
  }
  *piOffset = iPos;
  return rc;
}

/*
** One level of the merge tree assembled by vdbeAddToBuilder(). Builders
** are used as an array: when the engine at one element already holds
** SORTER_MAX_MERGE_COUNT inputs it is passed on to the next element and
** this element starts over with an empty engine.
*/
typedef struct IncrBuilder IncrBuilder;
struct IncrBuilder {
  int nPMA;                     /* Number of iterators used so far */
  MergeEngine *pMerger;         /* Merge engine to populate. */
};

/*
** Add pMerger as a new input of the merge engine under construction in
** *pBuilder, wrapping it in an IncrMerger owned by pTask.
**
** If the engine in *pBuilder is already full (SORTER_MAX_MERGE_COUNT
** inputs) it is first pushed, recursively, into pBuilder[1] and a fresh
** engine is started here.
**
** Returns SQLITE_OK on success. On any failure pMerger is freed before
** the error code is returned.
*/
static int vdbeAddToBuilder(
  SortSubtask *pTask,
  IncrBuilder *pBuilder, 
  MergeEngine *pMerger
){
  int rc = SQLITE_OK;

  assert( pMerger );

  /* This level is full: detach its engine and promote it one level up. */
  if( pBuilder->nPMA==SORTER_MAX_MERGE_COUNT ){
    MergeEngine *pFull = pBuilder->pMerger;
    pBuilder->pMerger = 0;
    pBuilder->nPMA = 0;
    rc = vdbeAddToBuilder(pTask, &pBuilder[1], pFull);
  }

  /* Lazily create the engine for this level. */
  if( rc==SQLITE_OK && pBuilder->pMerger==0 ){
    pBuilder->pMerger = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
    if( pBuilder->pMerger==0 ) rc = SQLITE_NOMEM;
  }

  /* Wrap pMerger and store it in the next free iterator slot. The slot
  ** is consumed even if the wrapper allocation fails. */
  if( rc==SQLITE_OK ){
    IncrMerger *pWrap = vdbeIncrNew(pTask, pMerger);
    PmaReader *pSlot = &pBuilder->pMerger->aIter[pBuilder->nPMA++];
    pSlot->pIncr = pWrap;
    if( pWrap==0 ) rc = SQLITE_NOMEM;
  }

  if( rc!=SQLITE_OK ){
    vdbeMergeEngineFree(pMerger);
  }
  return rc;
}

/*
** Populate iterator *pIter so that it may be used to iterate through all
** keys stored in all PMAs created by this sorter.
**
** For each task that has PMAs, the level-0 PMAs are grouped into merge
** engines of at most SORTER_MAX_MERGE_COUNT inputs. If a task needs more
** than one such engine, they are combined into a tree via
** vdbeAddToBuilder(), so that no engine merges more than
** SORTER_MAX_MERGE_COUNT inputs. With more than one task, the per-task
** roots become inputs of a top-level engine (pMain). The final root is
** wrapped in an IncrMerger attached to pIter, and vdbeIncrInit2() is run
** on it.
**
** Returns SQLITE_OK on success, or an SQLite error code.
**
** NOTE(review): on error paths, engines still referenced only from
** aMerge[], pRoot or pMain are not released by this function - confirm
** whether that needs a dedicated cleanup pass.
*/
static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){
  SortSubtask *pTask0 = &pSorter->aTask[0];
  MergeEngine *pMain = 0;
  sqlite3 *db = pTask0->db;
  int rc = SQLITE_OK;
  int iTask;

  IncrBuilder *aMerge;            /* One builder per level of the tree */
  const int nMerge = 32;          /* Number of entries in aMerge[] */
  aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge);
  if( aMerge==0 ) return SQLITE_NOMEM;

  /* With several tasks, a top-level engine merges one input per task. */
  if( pSorter->nTask>1 ){
    pMain = vdbeMergeEngineNew(pSorter->nTask);
    if( pMain==0 ) rc = SQLITE_NOMEM;
  }

  for(iTask=0; iTask<pSorter->nTask && rc==SQLITE_OK; iTask++){
    MergeEngine *pRoot = 0;       /* Root engine for this task */
    int iPMA;
    i64 iReadOff = 0;             /* Offset of next PMA in pTask->file */
    SortSubtask *pTask = &pSorter->aTask[iTask];
    if( pTask->nPMA==0 ) continue;

    /* Testing rc in the loop condition ensures that an error returned by
    ** vdbeAddToBuilder() at the bottom of the body terminates the loop
    ** instead of being overwritten by the next iteration. */
    for(iPMA=0;
        rc==SQLITE_OK && iPMA<pTask->nPMA;
        iPMA += SORTER_MAX_MERGE_COUNT
    ){
      MergeEngine *pMerger = 0;
      int nReader = MIN(pTask->nPMA - iPMA, SORTER_MAX_MERGE_COUNT);

      rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
      if( rc!=SQLITE_OK ) break;

      if( iPMA==0 ){
        /* First group: if it turns out to be the only one, it is used
        ** directly as the root for this task. */
        pRoot = pMerger;
      }else{
        if( pRoot ){
          /* A second group exists, so the first group has to go through
          ** the builder as well. */
          rc = vdbeAddToBuilder(pTask, &aMerge[0], pRoot);
          pRoot = 0;
          if( rc!=SQLITE_OK ){
            vdbeMergeEngineFree(pMerger);
            break;
          }
        }
        rc = vdbeAddToBuilder(pTask, &aMerge[0], pMerger);
      }
    }

    /* If the builder was used, collapse its levels bottom-up into a single
    ** root: each partial engine found becomes an input of the next
    ** populated level above it. */
    if( pRoot==0 ){
      int i;
      for(i=0; rc==SQLITE_OK && i<nMerge; i++){
        if( aMerge[i].pMerger ){
          if( pRoot ){
            rc = vdbeAddToBuilder(pTask, &aMerge[i], pRoot);
            if( rc!=SQLITE_OK ) break;
          }
          pRoot = aMerge[i].pMerger;
          aMerge[i].pMerger = 0;
        }
      }
    }

    if( rc==SQLITE_OK ){
      if( pMain==0 ){
        pMain = pRoot;
      }else{
        IncrMerger *pNew = vdbeIncrNew(pTask, pRoot);
        pMain->aIter[iTask].pIncr = pNew;
        if( pNew==0 ) rc = SQLITE_NOMEM;
      }
      /* Reset the builder for the next task. */
      memset(aMerge, 0, nMerge*sizeof(aMerge[0]));
    }
  }

  if( rc==SQLITE_OK ){
    SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];

    rc = vdbeSortAllocUnpacked(pLast);
    if( rc==SQLITE_OK ){
      pIter->pIncr = vdbeIncrNew(pLast, pMain);
      if( pIter->pIncr==0 ){
        rc = SQLITE_NOMEM;
      }else{
        /* Apply the sorter's threading setting to the top-level merger
        ** and to the per-task mergers of every task except the last. */
        vdbeIncrSetThreads(pIter->pIncr, pSorter->bUseThreads);
        for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
          IncrMerger *pIncr;
          if( (pIncr = pMain->aIter[iTask].pIncr) ){
            vdbeIncrSetThreads(pIncr, pSorter->bUseThreads);
            assert( pIncr->pTask!=pLast );
          }
        }
      }
    }
  }

  if( rc==SQLITE_OK ){
    rc = vdbeIncrInit2(pIter);
  }

  /* aMerge came from sqlite3DbMallocZero(db, ...), so it is released
  ** through the matching connection-aware free rather than
  ** sqlite3_free(). */
  sqlite3DbFree(db, aMerge);
  return rc;
}


/*
** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
** this function is called to prepare for iterating through the records
Changes to test/sort2.test.
11
12
13
14
15
16
17



18
19
20
21
22




23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55














56
57
58
59



60
61
# This file implements regression tests for SQLite library. 
#

# Load the shared test harness and set the prefix used in test names.
set testdir [file dirname $argv0]
source $testdir/tester.tcl
set testprefix sort2




# Close the default connection and shut the library down before changing
# the worker-thread setting to 7, then open a fresh database.
db close
sqlite3_shutdown
sqlite3_config_worker_threads 7
reset_db





# Test 1: group 100000 generated rows (100-byte random blobs) by x%5 with
# a 5-page cache. Expect 5 groups of 20000 rows, each with length(y)==100.
do_execsql_test 1 {
  PRAGMA cache_size = 5;
  WITH r(x,y) AS (
    SELECT 1, randomblob(100)
    UNION ALL
    SELECT x+1, randomblob(100) FROM r
    LIMIT 100000
  )
  SELECT count(x), length(y) FROM r GROUP BY (x%5)
} {
  20000 100 20000 100 20000 100 20000 100 20000 100
}

# Test 2.1: fill table t1 with 10000 generated rows.
do_execsql_test 2.1 {
  CREATE TABLE t1(a, b);
  WITH r(x,y) AS (
    SELECT 1, randomblob(100)
    UNION ALL
    SELECT x+1, randomblob(100) FROM r
    LIMIT 10000
  ) INSERT INTO t1 SELECT * FROM r;
}

# Tests 2.2 and 2.3: build two unique indexes on the populated table.
do_execsql_test 2.2 {
  CREATE UNIQUE INDEX i1 ON t1(b, a);
}

do_execsql_test 2.3 {
  CREATE UNIQUE INDEX i2 ON t1(a);
}

# Test 2.4: the database must pass the integrity check after the builds.
do_execsql_test 2.4 { PRAGMA integrity_check } {ok}















# Set the worker-thread setting back to 0 and re-initialize the library.
db close
sqlite3_shutdown
sqlite3_config_worker_threads 0
sqlite3_initialize



finish_test








>
>
>
|
|
|
|
|
>
>
>
>
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
|
|
>
>
>


11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# This file implements regression tests for SQLite library. 
#

# Load the shared test harness and set the prefix used in test names.
set testdir [file dirname $argv0]
source $testdir/tester.tcl
set testprefix sort2

# Run the same tests twice. Pass 1 uses the configuration in effect when
# the script starts. Pass 2 first shuts the library down and sets the
# worker-thread setting to 7. No comments may be placed inside the list
# below, because Tcl would treat them as list elements.
foreach {tn script} {
  1 { }
  2 {
    catch { db close }
    sqlite3_shutdown
    sqlite3_config_worker_threads 7
    reset_db
  }
} {

  eval $script

  # Group 100000 generated rows (100-byte random blobs) by x%5 with a
  # 5-page cache. Expect 5 groups of 20000 rows with length(y)==100.
  do_execsql_test $tn.1 {
    PRAGMA cache_size = 5;
    WITH r(x,y) AS (
      SELECT 1, randomblob(100)
      UNION ALL
      SELECT x+1, randomblob(100) FROM r
      LIMIT 100000
    )
    SELECT count(x), length(y) FROM r GROUP BY (x%5)
  } {
    20000 100 20000 100 20000 100 20000 100 20000 100
  }

  # Fill table t1 with 10000 generated rows.
  do_execsql_test $tn.2.1 {
    CREATE TABLE t1(a, b);
    WITH r(x,y) AS (
      SELECT 1, randomblob(100)
      UNION ALL
      SELECT x+1, randomblob(100) FROM r
      LIMIT 10000
    ) INSERT INTO t1 SELECT * FROM r;
  }

  # Build two unique indexes on the populated table.
  do_execsql_test $tn.2.2 {
    CREATE UNIQUE INDEX i1 ON t1(b, a);
  }

  do_execsql_test $tn.2.3 {
    CREATE UNIQUE INDEX i2 ON t1(a);
  }

  # The database must pass the integrity check after the index builds.
  do_execsql_test $tn.2.4 { PRAGMA integrity_check } {ok}

  # Same query as test $tn.1 but with ten times as many rows (1000000).
  # Expect 5 groups of 200000 rows.
  do_execsql_test $tn.3 {
    PRAGMA cache_size = 5;
    WITH r(x,y) AS (
      SELECT 1, randomblob(100)
      UNION ALL
      SELECT x+1, randomblob(100) FROM r
      LIMIT 1000000
    )
    SELECT count(x), length(y) FROM r GROUP BY (x%5)
  } {
    200000 100 200000 100 200000 100 200000 100 200000 100
  }

  # Set the worker-thread setting back to 0 and re-initialize the library
  # at the end of each pass.
  db close
  sqlite3_shutdown
  sqlite3_config_worker_threads 0
  sqlite3_initialize

}

finish_test