Index: src/vdbesort.c ================================================================== --- src/vdbesort.c +++ src/vdbesort.c @@ -89,15 +89,15 @@ #include "vdbeInt.h" /* ** Private objects used by the sorter */ -typedef struct VdbeSorterIter VdbeSorterIter; -typedef struct SortSubtask SortSubtask; -typedef struct SorterRecord SorterRecord; -typedef struct SorterMerger SorterMerger; -typedef struct FileWriter FileWriter; +typedef struct MergeEngine MergeEngine; /* Merge PMAs together */ +typedef struct PmaReader PmaReader; /* Incrementally read one PMA */ +typedef struct PmaWriter PmaWriter; /* Incrementally write one PMA */ +typedef struct SorterRecord SorterRecord; /* A record being sorted */ +typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ /* ** Candidate values for SortSubtask.eWork */ @@ -105,26 +105,26 @@ #define SORT_SUBTASK_TO_PMA 2 /* Xfer pList to Packed-Memory-Array pTemp1 */ #define SORT_SUBTASK_CONS 3 /* Consolidate multiple PMAs */ /* ** Sorting is divided up into smaller subtasks. Each subtask is controlled -** by an instance of this object. Subtask might run in either the main thread +** by an instance of this object. A Subtask might run in either the main thread ** or in a background thread. ** -** Exactly VdbeSorter.nThread instances of this object are allocated +** Exactly VdbeSorter.nTask instances of this object are allocated ** as part of each VdbeSorter object. Instances are never allocated any other -** way. VdbeSorter.nThread is set to the number of worker threads allowed +** way. VdbeSorter.nTask is set to the number of worker threads allowed ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). ** ** When a background thread is launched to perform work, SortSubtask.bDone -** is set to 0 and the SortSubtask.pThread variable set to point to the +** is set to 0 and the SortSubtask.pThread variable set to point to the ** thread handle. 
SortSubtask.bDone is set to 1 (to indicate to the main -** thread that joining SortSubtask.pThread will not block) before the thread -** exits. SortSubtask.pThread and bDone are always cleared after the +** thread that joining SortSubtask.pThread will not block) before the thread +** exits. SortSubtask.pThread and bDone are always cleared after the ** background thread has been joined. ** -** One object (specifically, VdbeSorter.aThread[VdbeSorter.nThread-1]) +** One object (specifically, VdbeSorter.aTask[VdbeSorter.nTask-1]) ** is reserved for the foreground thread. ** ** The nature of the work performed is determined by SortSubtask.eWork, ** as follows: ** @@ -140,20 +140,20 @@ ** Merge existing PMAs until SortSubtask.nConsolidate or fewer ** remain in temp file SortSubtask.pTemp1. */ struct SortSubtask { SQLiteThread *pThread; /* Thread handle, or NULL */ - int bDone; /* Set to true by pThread when finished */ + int bDone; /* Set to true by pThread when finished */ sqlite3_vfs *pVfs; /* VFS used to open temporary files */ KeyInfo *pKeyInfo; /* How to compare records */ UnpackedRecord *pUnpacked; /* Space to unpack a record */ int pgsz; /* Main database page size */ u8 eWork; /* One of the SORT_SUBTASK_* constants */ int nConsolidate; /* For SORT_SUBTASK_CONS, max final PMAs */ - SorterRecord *pList; /* List of records for pThread to sort */ + SorterRecord *pList; /* List of records for pThread to sort */ int nInMemory; /* Expected size of PMA based on pList */ u8 *aListMemory; /* Records memory (or NULL) */ int nPMA; /* Number of PMAs currently in pTemp1 */ i64 iTemp1Off; /* Offset to write to in pTemp1 */ @@ -160,33 +160,27 @@ sqlite3_file *pTemp1; /* File to write PMAs to, or NULL */ }; /* -** NOTES ON DATA STRUCTURE USED FOR N-WAY MERGES: -** -** As keys are added to the sorter, they are written to disk in a series -** of sorted packed-memory-arrays (PMAs). The size of each PMA is roughly -** the same as the cache-size allowed for temporary databases. 
In order -** to allow the caller to extract keys from the sorter in sorted order, -** all PMAs currently stored on disk must be merged together. This comment -** describes the data structure used to do so. The structure supports -** merging any number of arrays in a single pass with no redundant comparison -** operations. -** -** The aIter[] array contains an iterator for each of the PMAs being merged. -** An aIter[] iterator either points to a valid key or else is at EOF. For -** the purposes of the paragraphs below, we assume that the array is actually -** N elements in size, where N is the smallest power of 2 greater to or equal -** to the number of iterators being merged. The extra aIter[] elements are -** treated as if they are empty (always at EOF). +** The MergeEngine object is used to combine two or more smaller PMAs into +** one big PMA using a merge operation. Separate PMAs all need to be +** combined into one big PMA in order to be able to step through the sorted +** records in order. +** +** The aIter[] array contains a PmaReader object for each of the PMAs being +** merged. An aIter[] object either points to a valid key or else is at EOF. +** For the purposes of the paragraphs below, we assume that the array is +** actually N elements in size, where N is the smallest power of 2 greater +** than or equal to the number of PMAs being merged. The extra aIter[] elements +** are treated as if they are empty (always at EOF). ** ** The aTree[] array is also N elements in size. The value of N is stored in -** the VdbeSorter.nTree variable. +** the MergeEngine.nTree variable. ** ** The final (N/2) elements of aTree[] contain the results of comparing -** pairs of iterator keys together. Element i contains the result of +** pairs of PMA keys together. Element i contains the result of ** comparing aIter[2*i-N] and aIter[2*i-N+1]. Whichever key is smaller, the ** aTree element is set to the index of it. 
** ** For the purposes of this comparison, EOF is considered greater than any ** other key value. If the keys are equal (only possible with two EOF @@ -228,14 +222,14 @@ ** ** In other words, each time we advance to the next sorter element, log2(N) ** key comparison operations are required, where N is the number of segments ** being merged (rounded up to the next power of 2). */ -struct SorterMerger { - int nTree; /* Used size of aTree/aIter (power of 2) */ - int *aTree; /* Current state of incremental merge */ - VdbeSorterIter *aIter; /* Array of iterators to merge data from */ +struct MergeEngine { + int nTree; /* Used size of aTree/aIter (power of 2) */ + int *aTree; /* Current state of incremental merge */ + PmaReader *aIter; /* Array of iterators to merge data from */ }; /* ** Main sorter structure. A single instance of this is allocated for each ** sorter cursor created by the VDBE. @@ -244,24 +238,25 @@ int nInMemory; /* Current size of pRecord list as PMA */ int mnPmaSize; /* Minimum PMA size, in bytes */ int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ int bUsePMA; /* True if one or more PMAs created */ SorterRecord *pRecord; /* Head of in-memory record list */ - SorterMerger *pMerger; /* For final merge of PMAs (by caller) */ + MergeEngine *pMerger; /* For final merge of PMAs (by caller) */ u8 *aMemory; /* Block of memory to alloc records from */ int iMemory; /* Offset of first free byte in aMemory */ int nMemory; /* Size of aMemory allocation in bytes */ int iPrev; /* Previous thread used to flush PMA */ - int nThread; /* Size of aThread[] array */ - SortSubtask aThread[1]; + int nTask; /* Size of aTask[] array */ + SortSubtask aTask[1]; /* One or more subtasks */ }; /* -** The following type is an iterator for a PMA. It caches the current key in -** variables nKey/aKey. If the iterator is at EOF, pFile==0. +** An instance of the following object is used to read records out of a +** PMA, in sorted order. 
The next key to be read is cached in nKey/aKey. +** pFile==0 at EOF. */ -struct VdbeSorterIter { +struct PmaReader { i64 iReadOff; /* Current read offset */ i64 iEof; /* 1 byte past EOF for this iterator */ int nAlloc; /* Bytes of space at aAlloc */ int nKey; /* Number of bytes in key */ sqlite3_file *pFile; /* File iterator is reading from */ @@ -271,16 +266,18 @@ int nBuffer; /* Size of read buffer in bytes */ u8 *aMap; /* Pointer to mapping of entire file */ }; /* -** An instance of this structure is used to organize the stream of records -** being written to files by the merge-sort code into aligned, page-sized -** blocks. Doing all I/O in aligned page-sized blocks helps I/O to go -** faster on many operating systems. +** An instance of this object is used for writing a PMA. +** +** The PMA is written one record at a time. Each record is of an arbitrary +** size. But I/O is more efficient if it occurs in page-sized blocks where +** each block is aligned on a page boundary. This object caches writes to +** the PMA so that aligned, page-size blocks are written. */ -struct FileWriter { +struct PmaWriter { int eFWErr; /* Non-zero if in an error state */ u8 *aBuffer; /* Pointer to write buffer */ int nBuffer; /* Size of write buffer in bytes */ int iBufStart; /* First byte of buffer to write */ int iBufEnd; /* Last byte of buffer to write */ @@ -287,12 +284,12 @@ i64 iWriteOff; /* Offset of start of buffer in file */ sqlite3_file *pFile; /* File to write to */ }; /* -** A structure to store a single record. All in-memory records are connected -** together into a linked list headed at VdbeSorter.pRecord. +** This object is the header on a single record while that record is being +** held in memory and prior to being written out as part of a PMA. ** ** How the linked list is connected depends on how memory is being managed ** by this module. 
If using a separate allocation for each in-memory record ** (VdbeSorter.aMemory==0), then the list is always connected using the ** SorterRecord.u.pNext pointers. @@ -305,15 +302,16 @@ ** is full, the list is sorted. As part of the sorting process, it is ** converted to use the SorterRecord.u.pNext pointers. See function ** vdbeSorterSort() for details. */ struct SorterRecord { - int nVal; + int nVal; /* Size of the record in bytes */ union { SorterRecord *pNext; /* Pointer to next record in list */ int iNext; /* Offset within aMemory of next record */ } u; + /* The data for the record immediately follows this header */ }; /* Return a pointer to the buffer containing the record data for SorterRecord ** object p. Should be used as if: ** @@ -323,22 +321,22 @@ /* The minimum PMA size is set to this value multiplied by the database ** page size in bytes. */ #define SORTER_MIN_WORKING 10 -/* Maximum number of segments to merge in a single pass. */ +/* Maximum number of PMAs that a single MergeEngine can merge */ #define SORTER_MAX_MERGE_COUNT 16 /* -** Free all memory belonging to the VdbeSorterIter object passed as the second +** Free all memory belonging to the PmaReader object passed as the second ** argument. All structure fields are set to zero before returning. */ -static void vdbeSorterIterZero(VdbeSorterIter *pIter){ +static void vdbePmaReaderClear(PmaReader *pIter){ sqlite3_free(pIter->aAlloc); sqlite3_free(pIter->aBuffer); if( pIter->aMap ) sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap); - memset(pIter, 0, sizeof(VdbeSorterIter)); + memset(pIter, 0, sizeof(PmaReader)); } /* ** Read nByte bytes of data from the stream of data iterated by object p. ** If successful, set *ppOut to point to a buffer containing the data @@ -346,12 +344,12 @@ ** error code. ** ** The buffer indicated by *ppOut may only be considered valid until the ** next call to this function. 
*/ -static int vdbeSorterIterRead( - VdbeSorterIter *p, /* Iterator */ +static int vdbePmaReadBlob( + PmaReader *p, /* Iterator */ int nByte, /* Bytes of data to read */ u8 **ppOut /* OUT: Pointer to buffer containing data */ ){ int iBuf; /* Offset within buffer to read from */ int nAvail; /* Bytes of data available in buffer */ @@ -417,17 +415,17 @@ nRem = nByte - nAvail; /* The following loop copies up to p->nBuffer bytes per iteration into ** the p->aAlloc[] buffer. */ while( nRem>0 ){ - int rc; /* vdbeSorterIterRead() return code */ + int rc; /* vdbePmaReadBlob() return code */ int nCopy; /* Number of bytes to copy */ u8 *aNext; /* Pointer to buffer to copy data from */ nCopy = nRem; if( nRem>p->nBuffer ) nCopy = p->nBuffer; - rc = vdbeSorterIterRead(p, nCopy, &aNext); + rc = vdbePmaReadBlob(p, nCopy, &aNext); if( rc!=SQLITE_OK ) return rc; assert( aNext!=p->aAlloc ); memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy); nRem -= nCopy; } @@ -440,11 +438,11 @@ /* ** Read a varint from the stream of data accessed by p. Set *pnOut to ** the value read. */ -static int vdbeSorterIterVarint(VdbeSorterIter *p, u64 *pnOut){ +static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){ int iBuf; if( p->aMap ){ p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut); }else{ @@ -453,11 +451,11 @@ p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut); }else{ u8 aVarint[16], *a; int i = 0, rc; do{ - rc = vdbeSorterIterRead(p, 1, &a); + rc = vdbePmaReadBlob(p, 1, &a); if( rc ) return rc; aVarint[(i++)&0xf] = a[0]; }while( (a[0]&0x80)!=0 ); sqlite3GetVarint(aVarint, pnOut); } @@ -469,24 +467,24 @@ /* ** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if ** no error occurs, or an SQLite error code if one does. 
*/ -static int vdbeSorterIterNext(VdbeSorterIter *pIter){ +static int vdbePmaReaderNext(PmaReader *pIter){ int rc; /* Return Code */ u64 nRec = 0; /* Size of record in bytes */ if( pIter->iReadOff>=pIter->iEof ){ /* This is an EOF condition */ - vdbeSorterIterZero(pIter); + vdbePmaReaderClear(pIter); return SQLITE_OK; } - rc = vdbeSorterIterVarint(pIter, &nRec); + rc = vdbePmaReadVarint(pIter, &nRec); if( rc==SQLITE_OK ){ pIter->nKey = (int)nRec; - rc = vdbeSorterIterRead(pIter, (int)nRec, &pIter->aKey); + rc = vdbePmaReadBlob(pIter, (int)nRec, &pIter->aKey); } return rc; } @@ -494,31 +492,31 @@ ** Initialize iterator pIter to scan through the PMA stored in file pFile ** starting at offset iStart and ending at offset iEof-1. This function ** leaves the iterator pointing to the first key in the PMA (or EOF if the ** PMA is empty). */ -static int vdbeSorterIterInit( - SortSubtask *pThread, /* Thread context */ - i64 iStart, /* Start offset in pThread->pTemp1 */ - VdbeSorterIter *pIter, /* Iterator to populate */ +static int vdbePmaReaderInit( + SortSubtask *pTask, /* Thread context */ + i64 iStart, /* Start offset in pTask->pTemp1 */ + PmaReader *pIter, /* Iterator to populate */ i64 *pnByte /* IN/OUT: Increment this value by PMA size */ ){ int rc = SQLITE_OK; - int nBuf = pThread->pgsz; + int nBuf = pTask->pgsz; void *pMap = 0; /* Mapping of temp file */ - assert( pThread->iTemp1Off>iStart ); + assert( pTask->iTemp1Off>iStart ); assert( pIter->aAlloc==0 ); assert( pIter->aBuffer==0 ); - pIter->pFile = pThread->pTemp1; + pIter->pFile = pTask->pTemp1; pIter->iReadOff = iStart; pIter->nAlloc = 128; pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc); if( pIter->aAlloc ){ /* Try to xFetch() a mapping of the entire temp file. If this is possible, ** the PMA will be read via the mapping. Otherwise, use xRead(). 
*/ - rc = sqlite3OsFetch(pIter->pFile, 0, pThread->iTemp1Off, &pMap); + rc = sqlite3OsFetch(pIter->pFile, 0, pTask->iTemp1Off, &pMap); }else{ rc = SQLITE_NOMEM; } if( rc==SQLITE_OK ){ @@ -531,58 +529,58 @@ rc = SQLITE_NOMEM; }else{ int iBuf = iStart % nBuf; if( iBuf ){ int nRead = nBuf - iBuf; - if( (iStart + nRead) > pThread->iTemp1Off ){ - nRead = (int)(pThread->iTemp1Off - iStart); + if( (iStart + nRead) > pTask->iTemp1Off ){ + nRead = (int)(pTask->iTemp1Off - iStart); } rc = sqlite3OsRead( - pThread->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart + pTask->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart ); assert( rc!=SQLITE_IOERR_SHORT_READ ); } } } } if( rc==SQLITE_OK ){ u64 nByte; /* Size of PMA in bytes */ - pIter->iEof = pThread->iTemp1Off; - rc = vdbeSorterIterVarint(pIter, &nByte); + pIter->iEof = pTask->iTemp1Off; + rc = vdbePmaReadVarint(pIter, &nByte); pIter->iEof = pIter->iReadOff + nByte; *pnByte += nByte; } if( rc==SQLITE_OK ){ - rc = vdbeSorterIterNext(pIter); + rc = vdbePmaReaderNext(pIter); } return rc; } /* ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, -** size nKey2 bytes). Use (pThread->pKeyInfo) for the collation sequences +** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences ** used by the comparison. Return the result of the comparison. ** -** Before returning, object (pThread->pUnpacked) is populated with the +** Before returning, object (pTask->pUnpacked) is populated with the ** unpacked version of key2. Or, if pKey2 is passed a NULL pointer, then it -** is assumed that the (pThread->pUnpacked) structure already contains the +** is assumed that the (pTask->pUnpacked) structure already contains the ** unpacked key to use as key2. ** -** If an OOM error is encountered, (pThread->pUnpacked->error_rc) is set +** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set ** to SQLITE_NOMEM. 
*/ static int vdbeSorterCompare( - SortSubtask *pThread, /* Thread context (for pKeyInfo) */ + SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ const void *pKey1, int nKey1, /* Left side of comparison */ const void *pKey2, int nKey2 /* Right side of comparison */ ){ - UnpackedRecord *r2 = pThread->pUnpacked; + UnpackedRecord *r2 = pTask->pUnpacked; if( pKey2 ){ - sqlite3VdbeRecordUnpack(pThread->pKeyInfo, nKey2, pKey2, r2); + sqlite3VdbeRecordUnpack(pTask->pKeyInfo, nKey2, pKey2, r2); } return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0); } /* @@ -589,19 +587,19 @@ ** This function is called to compare two iterator keys when merging ** multiple b-tree segments. Parameter iOut is the index of the aTree[] ** value to recalculate. */ static int vdbeSorterDoCompare( - SortSubtask *pThread, - SorterMerger *pMerger, + SortSubtask *pTask, + MergeEngine *pMerger, int iOut ){ int i1; int i2; int iRes; - VdbeSorterIter *p1; - VdbeSorterIter *p2; + PmaReader *p1; + PmaReader *p2; assert( iOutnTree && iOut>0 ); if( iOut>=(pMerger->nTree/2) ){ i1 = (iOut - pMerger->nTree/2) * 2; @@ -618,13 +616,13 @@ iRes = i2; }else if( p2->pFile==0 ){ iRes = i1; }else{ int res; - assert( pThread->pUnpacked!=0 ); /* allocated in vdbeSortSubtaskMain() */ + assert( pTask->pUnpacked!=0 ); /* allocated in vdbeSortSubtaskMain() */ res = vdbeSorterCompare( - pThread, p1->aKey, p1->nKey, p2->aKey, p2->nKey + pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey ); if( res<=0 ){ iRes = i1; }else{ iRes = i2; @@ -636,13 +634,17 @@ } /* ** Initialize the temporary index cursor just opened as a sorter cursor. 
*/ -int sqlite3VdbeSorterInit(sqlite3 *db, int nField, VdbeCursor *pCsr){ +int sqlite3VdbeSorterInit( + sqlite3 *db, /* Database connection (for malloc()) */ + int nField, /* Number of key fields in each record */ + VdbeCursor *pCsr /* Cursor that holds the new sorter */ +){ int pgsz; /* Page size of main database */ - int i; /* Used to iterate through aThread[] */ + int i; /* Used to iterate through aTask[] */ int mxCache; /* Cache size */ VdbeSorter *pSorter; /* The new sorter */ KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */ int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */ int sz; /* Size of pSorter in bytes */ @@ -662,16 +664,16 @@ memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo); pKeyInfo->db = 0; if( nField && nWorker==0 ) pKeyInfo->nField = nField; pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); - pSorter->nThread = nWorker + 1; - for(i=0; inThread; i++){ - SortSubtask *pThread = &pSorter->aThread[i]; - pThread->pKeyInfo = pKeyInfo; - pThread->pVfs = db->pVfs; - pThread->pgsz = pgsz; + pSorter->nTask = nWorker + 1; + for(i=0; inTask; i++){ + SortSubtask *pTask = &pSorter->aTask[i]; + pTask->pKeyInfo = pKeyInfo; + pTask->pVfs = db->pVfs; + pTask->pgsz = pgsz; } if( !sqlite3TempInMemory(db) ){ pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz; mxCache = db->aDb[0].pSchema->cache_size; @@ -704,26 +706,26 @@ sqlite3DbFree(db, p); } } /* -** Free all resources owned by the object indicated by argument pThread. All -** fields of *pThread are zeroed before returning. +** Free all resources owned by the object indicated by argument pTask. All +** fields of *pTask are zeroed before returning. 
*/ -static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pThread){ - sqlite3DbFree(db, pThread->pUnpacked); - pThread->pUnpacked = 0; - if( pThread->aListMemory==0 ){ - vdbeSorterRecordFree(0, pThread->pList); +static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){ + sqlite3DbFree(db, pTask->pUnpacked); + pTask->pUnpacked = 0; + if( pTask->aListMemory==0 ){ + vdbeSorterRecordFree(0, pTask->pList); }else{ - sqlite3_free(pThread->aListMemory); - pThread->aListMemory = 0; + sqlite3_free(pTask->aListMemory); + pTask->aListMemory = 0; } - pThread->pList = 0; - if( pThread->pTemp1 ){ - sqlite3OsCloseFree(pThread->pTemp1); - pThread->pTemp1 = 0; + pTask->pList = 0; + if( pTask->pTemp1 ){ + sqlite3OsCloseFree(pTask->pTemp1); + pTask->pTemp1 = 0; } } /* ** Join all threads. @@ -730,17 +732,17 @@ */ #if SQLITE_MAX_WORKER_THREADS>0 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ int rc = rcin; int i; - for(i=0; inThread; i++){ - SortSubtask *pThread = &pSorter->aThread[i]; - if( pThread->pThread ){ + for(i=0; inTask; i++){ + SortSubtask *pTask = &pSorter->aTask[i]; + if( pTask->pTask ){ void *pRet; - int rc2 = sqlite3ThreadJoin(pThread->pThread, &pRet); - pThread->pThread = 0; - pThread->bDone = 0; + int rc2 = sqlite3ThreadJoin(pTask->pTask, &pRet); + pTask->pTask = 0; + pTask->bDone = 0; if( rc==SQLITE_OK ) rc = rc2; if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); } } return rc; @@ -748,38 +750,38 @@ #else # define vdbeSorterJoinAll(x,rcin) (rcin) #endif /* -** Allocate a new SorterMerger object with space for nIter iterators. +** Allocate a new MergeEngine object with space for nIter iterators. 
*/ -static SorterMerger *vdbeSorterMergerNew(int nIter){ +static MergeEngine *vdbeMergeEngineNew(int nIter){ int N = 2; /* Smallest power of two >= nIter */ int nByte; /* Total bytes of space to allocate */ - SorterMerger *pNew; /* Pointer to allocated object to return */ + MergeEngine *pNew; /* Pointer to allocated object to return */ assert( nIter<=SORTER_MAX_MERGE_COUNT ); while( NnTree = N; - pNew->aIter = (VdbeSorterIter*)&pNew[1]; + pNew->aIter = (PmaReader*)&pNew[1]; pNew->aTree = (int*)&pNew->aIter[N]; } return pNew; } /* -** Free the SorterMerger object passed as the only argument. +** Free the MergeEngine object passed as the only argument. */ -static void vdbeSorterMergerFree(SorterMerger *pMerger){ +static void vdbeMergeEngineFree(MergeEngine *pMerger){ int i; if( pMerger ){ for(i=0; inTree; i++){ - vdbeSorterIterZero(&pMerger->aIter[i]); + vdbePmaReaderClear(&pMerger->aIter[i]); } } sqlite3_free(pMerger); } @@ -786,16 +788,16 @@ /* ** Reset a sorting cursor back to its original empty state. 
*/ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ int i; - vdbeSorterJoinAll(pSorter, SQLITE_OK); - vdbeSorterMergerFree(pSorter->pMerger); + (void)vdbeSorterJoinAll(pSorter, SQLITE_OK); + vdbeMergeEngineFree(pSorter->pMerger); pSorter->pMerger = 0; - for(i=0; inThread; i++){ - SortSubtask *pThread = &pSorter->aThread[i]; - vdbeSortSubtaskCleanup(db, pThread); + for(i=0; inTask; i++){ + SortSubtask *pTask = &pSorter->aTask[i]; + vdbeSortSubtaskCleanup(db, pTask); } if( pSorter->aMemory==0 ){ vdbeSorterRecordFree(0, pSorter->pRecord); } pSorter->pRecord = 0; @@ -809,11 +811,11 @@ */ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ VdbeSorter *pSorter = pCsr->pSorter; if( pSorter ){ sqlite3VdbeSorterReset(db, pSorter); - vdbeSorterMergerFree(pSorter->pMerger); + vdbeMergeEngineFree(pSorter->pMerger); sqlite3_free(pSorter->aMemory); sqlite3DbFree(db, pSorter); pCsr->pSorter = 0; } } @@ -840,11 +842,11 @@ /* ** Merge the two sorted lists p1 and p2 into a single list. ** Set *ppOut to the head of the new list. */ static void vdbeSorterMerge( - SortSubtask *pThread, /* Calling thread context */ + SortSubtask *pTask, /* Calling thread context */ SorterRecord *p1, /* First list to merge */ SorterRecord *p2, /* Second list to merge */ SorterRecord **ppOut /* OUT: Head of merged list */ ){ SorterRecord *pFinal = 0; @@ -851,11 +853,11 @@ SorterRecord **pp = &pFinal; void *pVal2 = p2 ? SRVAL(p2) : 0; while( p1 && p2 ){ int res; - res = vdbeSorterCompare(pThread, SRVAL(p1), p1->nVal, pVal2, p2->nVal); + res = vdbeSorterCompare(pTask, SRVAL(p1), p1->nVal, pVal2, p2->nVal); if( res<=0 ){ *pp = p1; pp = &p1->u.pNext; p1 = p1->u.pNext; pVal2 = 0; @@ -870,67 +872,67 @@ *pp = p1 ? p1 : p2; *ppOut = pFinal; } /* -** Sort the linked list of records headed at pThread->pList. Return +** Sort the linked list of records headed at pTask->pList. Return ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if ** an error occurs. 
*/ -static int vdbeSorterSort(SortSubtask *pThread){ +static int vdbeSorterSort(SortSubtask *pTask){ int i; SorterRecord **aSlot; SorterRecord *p; aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *)); if( !aSlot ){ return SQLITE_NOMEM; } - p = pThread->pList; + p = pTask->pList; while( p ){ SorterRecord *pNext; - if( pThread->aListMemory ){ - if( (u8*)p==pThread->aListMemory ){ + if( pTask->aListMemory ){ + if( (u8*)p==pTask->aListMemory ){ pNext = 0; }else{ - assert( p->u.iNextaListMemory) ); - pNext = (SorterRecord*)&pThread->aListMemory[p->u.iNext]; + assert( p->u.iNextaListMemory) ); + pNext = (SorterRecord*)&pTask->aListMemory[p->u.iNext]; } }else{ pNext = p->u.pNext; } p->u.pNext = 0; for(i=0; aSlot[i]; i++){ - vdbeSorterMerge(pThread, p, aSlot[i], &p); + vdbeSorterMerge(pTask, p, aSlot[i], &p); aSlot[i] = 0; } aSlot[i] = p; p = pNext; } p = 0; for(i=0; i<64; i++){ - vdbeSorterMerge(pThread, p, aSlot[i], &p); + vdbeSorterMerge(pTask, p, aSlot[i], &p); } - pThread->pList = p; + pTask->pList = p; sqlite3_free(aSlot); return SQLITE_OK; } /* -** Initialize a file-writer object. +** Initialize a PMA-writer object. */ -static void fileWriterInit( +static void vdbePmaWriterInit( sqlite3_file *pFile, /* File to write to */ - FileWriter *p, /* Object to populate */ + PmaWriter *p, /* Object to populate */ int nBuf, /* Buffer size */ i64 iStart /* Offset of pFile to begin writing at */ ){ - memset(p, 0, sizeof(FileWriter)); + memset(p, 0, sizeof(PmaWriter)); p->aBuffer = (u8*)sqlite3Malloc(nBuf); if( !p->aBuffer ){ p->eFWErr = SQLITE_NOMEM; }else{ p->iBufEnd = p->iBufStart = (iStart % nBuf); @@ -939,14 +941,14 @@ p->pFile = pFile; } } /* -** Write nData bytes of data to the file-write object. Return SQLITE_OK +** Write nData bytes of data to the PMA. Return SQLITE_OK ** if successful, or an SQLite error code if an error occurs. 
*/ -static void fileWriterWrite(FileWriter *p, u8 *pData, int nData){ +static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){ int nRem = nData; while( nRem>0 && p->eFWErr==0 ){ int nCopy = nRem; if( nCopy>(p->nBuffer - p->iBufEnd) ){ nCopy = p->nBuffer - p->iBufEnd; @@ -967,19 +969,19 @@ nRem -= nCopy; } } /* -** Flush any buffered data to disk and clean up the file-writer object. -** The results of using the file-writer after this call are undefined. +** Flush any buffered data to disk and clean up the PMA-writer object. +** The results of using the PMA-writer after this call are undefined. ** Return SQLITE_OK if flushing the buffered data succeeds or is not ** required. Otherwise, return an SQLite error code. ** ** Before returning, set *piEof to the offset immediately following the ** last byte written to the file. */ -static int fileWriterFinish(FileWriter *p, i64 *piEof){ +static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){ int rc; if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){ p->eFWErr = sqlite3OsWrite(p->pFile, &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, p->iWriteOff + p->iBufStart @@ -986,23 +988,23 @@ ); } *piEof = (p->iWriteOff + p->iBufEnd); sqlite3_free(p->aBuffer); rc = p->eFWErr; - memset(p, 0, sizeof(FileWriter)); + memset(p, 0, sizeof(PmaWriter)); return rc; } /* -** Write value iVal encoded as a varint to the file-write object. Return +** Write value iVal encoded as a varint to the PMA. Return ** SQLITE_OK if successful, or an SQLite error code if an error occurs. */ -static void fileWriterWriteVarint(FileWriter *p, u64 iVal){ +static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){ int nByte; u8 aByte[10]; nByte = sqlite3PutVarint(aByte, iVal); - fileWriterWrite(p, aByte, nByte); + vdbePmaWriteBlob(p, aByte, nByte); } #if SQLITE_MAX_MMAP_SIZE>0 /* ** The first argument is a file-handle open on a temporary file. 
The file @@ -1038,76 +1040,77 @@ ** ** * One or more records packed end-to-end in order of ascending keys. ** Each record consists of a varint followed by a blob of data (the ** key). The varint is the number of bytes in the blob of data. */ -static int vdbeSorterListToPMA(SortSubtask *pThread){ +static int vdbeSorterListToPMA(SortSubtask *pTask){ int rc = SQLITE_OK; /* Return code */ - FileWriter writer; /* Object used to write to the file */ + PmaWriter writer; /* Object used to write to the file */ - memset(&writer, 0, sizeof(FileWriter)); - assert( pThread->nInMemory>0 ); + memset(&writer, 0, sizeof(PmaWriter)); + assert( pTask->nInMemory>0 ); /* If the first temporary PMA file has not been opened, open it now. */ - if( pThread->pTemp1==0 ){ - rc = vdbeSorterOpenTempFile(pThread->pVfs, &pThread->pTemp1); - assert( rc!=SQLITE_OK || pThread->pTemp1 ); - assert( pThread->iTemp1Off==0 ); - assert( pThread->nPMA==0 ); + if( pTask->pTemp1==0 ){ + rc = vdbeSorterOpenTempFile(pTask->pVfs, &pTask->pTemp1); + assert( rc!=SQLITE_OK || pTask->pTemp1 ); + assert( pTask->iTemp1Off==0 ); + assert( pTask->nPMA==0 ); } /* Try to get the file to memory map */ if( rc==SQLITE_OK ){ rc = vdbeSorterExtendFile( - pThread->pTemp1, pThread->iTemp1Off + pThread->nInMemory + 9 + pTask->pTemp1, pTask->iTemp1Off + pTask->nInMemory + 9 ); } if( rc==SQLITE_OK ){ SorterRecord *p; SorterRecord *pNext = 0; - fileWriterInit(pThread->pTemp1, &writer, pThread->pgsz, pThread->iTemp1Off); - pThread->nPMA++; - fileWriterWriteVarint(&writer, pThread->nInMemory); - for(p=pThread->pList; p; p=pNext){ + vdbePmaWriterInit(pTask->pTemp1, &writer, pTask->pgsz, + pTask->iTemp1Off); + pTask->nPMA++; + vdbePmaWriteVarint(&writer, pTask->nInMemory); + for(p=pTask->pList; p; p=pNext){ pNext = p->u.pNext; - fileWriterWriteVarint(&writer, p->nVal); - fileWriterWrite(&writer, SRVAL(p), p->nVal); - if( pThread->aListMemory==0 ) sqlite3_free(p); - } - pThread->pList = p; - rc = fileWriterFinish(&writer, 
&pThread->iTemp1Off); - } - - assert( pThread->pList==0 || rc!=SQLITE_OK ); + vdbePmaWriteVarint(&writer, p->nVal); + vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal); + if( pTask->aListMemory==0 ) sqlite3_free(p); + } + pTask->pList = p; + rc = vdbePmaWriterFinish(&writer, &pTask->iTemp1Off); + } + + assert( pTask->pList==0 || rc!=SQLITE_OK ); return rc; } /* -** Advance the SorterMerger iterator passed as the second argument to +** Advance the MergeEngine iterator passed as the second argument to ** the next entry. Set *pbEof to true if this means the iterator has ** reached EOF. ** ** Return SQLITE_OK if successful or an error code if an error occurs. */ static int vdbeSorterNext( - SortSubtask *pThread, - SorterMerger *pMerger, + SortSubtask *pTask, + MergeEngine *pMerger, int *pbEof ){ int rc; int iPrev = pMerger->aTree[1];/* Index of iterator to advance */ /* Advance the current iterator */ - rc = vdbeSorterIterNext(&pMerger->aIter[iPrev]); + rc = vdbePmaReaderNext(&pMerger->aIter[iPrev]); /* Update contents of aTree[] */ if( rc==SQLITE_OK ){ int i; /* Index of aTree[] to recalculate */ - VdbeSorterIter *pIter1; /* First iterator to compare */ - VdbeSorterIter *pIter2; /* Second iterator to compare */ + PmaReader *pIter1; /* First iterator to compare */ + PmaReader *pIter2; /* Second iterator to compare */ u8 *pKey2; /* To pIter2->aKey, or 0 if record cached */ /* Find the first two iterators to compare. The one that was just ** advanced (iPrev) and the one next to it in the array. */ pIter1 = &pMerger->aIter[(iPrev & 0xFFFE)]; @@ -1120,23 +1123,23 @@ if( pIter1->pFile==0 ){ iRes = +1; }else if( pIter2->pFile==0 ){ iRes = -1; }else{ - iRes = vdbeSorterCompare(pThread, + iRes = vdbeSorterCompare(pTask, pIter1->aKey, pIter1->nKey, pKey2, pIter2->nKey ); } /* If pIter1 contained the smaller value, set aTree[i] to its index. ** Then set pIter2 to the next iterator to compare to pIter1. 
In this - ** case there is no cache of pIter2 in pThread->pUnpacked, so set + ** case there is no cache of pIter2 in pTask->pUnpacked, so set ** pKey2 to point to the record belonging to pIter2. ** ** Alternatively, if pIter2 contains the smaller of the two values, ** set aTree[i] to its index and update pIter1. If vdbeSorterCompare() - ** was actually called above, then pThread->pUnpacked now contains + ** was actually called above, then pTask->pUnpacked now contains ** a value equivalent to pIter2. So set pKey2 to NULL to prevent ** vdbeSorterCompare() from decoding pIter2 again. ** ** If the two values were equal, then the value from the oldest ** PMA should be considered smaller. The VdbeSorter.aIter[] array @@ -1161,133 +1164,133 @@ /* ** The main routine for sorter-thread operations. */ static void *vdbeSortSubtaskMain(void *pCtx){ int rc = SQLITE_OK; - SortSubtask *pThread = (SortSubtask*)pCtx; - - assert( pThread->eWork==SORT_SUBTASK_SORT - || pThread->eWork==SORT_SUBTASK_TO_PMA - || pThread->eWork==SORT_SUBTASK_CONS - ); - assert( pThread->bDone==0 ); - - if( pThread->pUnpacked==0 ){ - char *pFree; - pThread->pUnpacked = sqlite3VdbeAllocUnpackedRecord( - pThread->pKeyInfo, 0, 0, &pFree - ); - assert( pThread->pUnpacked==(UnpackedRecord*)pFree ); + SortSubtask *pTask = (SortSubtask*)pCtx; + + assert( pTask->eWork==SORT_SUBTASK_SORT + || pTask->eWork==SORT_SUBTASK_TO_PMA + || pTask->eWork==SORT_SUBTASK_CONS + ); + assert( pTask->bDone==0 ); + + if( pTask->pUnpacked==0 ){ + char *pFree; + pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( + pTask->pKeyInfo, 0, 0, &pFree + ); + assert( pTask->pUnpacked==(UnpackedRecord*)pFree ); if( pFree==0 ){ rc = SQLITE_NOMEM; goto thread_out; } - pThread->pUnpacked->nField = pThread->pKeyInfo->nField; - pThread->pUnpacked->errCode = 0; + pTask->pUnpacked->nField = pTask->pKeyInfo->nField; + pTask->pUnpacked->errCode = 0; } - if( pThread->eWork==SORT_SUBTASK_CONS ){ - assert( pThread->pList==0 ); - while( 
pThread->nPMA>pThread->nConsolidate && rc==SQLITE_OK ){ - int nIter = MIN(pThread->nPMA, SORTER_MAX_MERGE_COUNT); + if( pTask->eWork==SORT_SUBTASK_CONS ){ + assert( pTask->pList==0 ); + while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){ + int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT); sqlite3_file *pTemp2 = 0; /* Second temp file to use */ - SorterMerger *pMerger; /* Object for reading/merging PMA data */ + MergeEngine *pMerger; /* Object for reading/merging PMA data */ i64 iReadOff = 0; /* Offset in pTemp1 to read from */ i64 iWriteOff = 0; /* Offset in pTemp2 to write to */ int i; /* Allocate a merger object to merge PMAs together. */ - pMerger = vdbeSorterMergerNew(nIter); + pMerger = vdbeMergeEngineNew(nIter); if( pMerger==0 ){ rc = SQLITE_NOMEM; break; } /* Open a second temp file to write merged data to */ - rc = vdbeSorterOpenTempFile(pThread->pVfs, &pTemp2); + rc = vdbeSorterOpenTempFile(pTask->pVfs, &pTemp2); if( rc==SQLITE_OK ){ - rc = vdbeSorterExtendFile(pTemp2, pThread->iTemp1Off); + rc = vdbeSorterExtendFile(pTemp2, pTask->iTemp1Off); } if( rc!=SQLITE_OK ){ - vdbeSorterMergerFree(pMerger); + vdbeMergeEngineFree(pMerger); break; } /* This loop runs once for each output PMA. Each output PMA is made ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */ - for(i=0; i<pThread->nPMA; i+=SORTER_MAX_MERGE_COUNT){ - FileWriter writer; /* Object for writing data to pTemp2 */ + for(i=0; i<pTask->nPMA; i+=SORTER_MAX_MERGE_COUNT){ + PmaWriter writer; /* Object for writing data to pTemp2 */ i64 nOut = 0; /* Bytes of data in output PMA */ int bEof = 0; int rc2; /* Configure the merger object to read and merge data from the next ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs, ** if that is fewer).
*/ int iIter; for(iIter=0; iIter<nIter; iIter++){ - VdbeSorterIter *pIter = &pMerger->aIter[iIter]; - rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nOut); + PmaReader *pIter = &pMerger->aIter[iIter]; + rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nOut); iReadOff = pIter->iEof; - if( iReadOff>=pThread->iTemp1Off || rc!=SQLITE_OK ) break; + if( iReadOff>=pTask->iTemp1Off || rc!=SQLITE_OK ) break; } for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){ - rc = vdbeSorterDoCompare(pThread, pMerger, iIter); + rc = vdbeSorterDoCompare(pTask, pMerger, iIter); } - fileWriterInit(pTemp2, &writer, pThread->pgsz, iWriteOff); - fileWriterWriteVarint(&writer, nOut); + vdbePmaWriterInit(pTemp2, &writer, pTask->pgsz, iWriteOff); + vdbePmaWriteVarint(&writer, nOut); while( rc==SQLITE_OK && bEof==0 ){ - VdbeSorterIter *pIter = &pMerger->aIter[ pMerger->aTree[1] ]; + PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ]; assert( pIter->pFile!=0 ); /* pIter is not at EOF */ - fileWriterWriteVarint(&writer, pIter->nKey); - fileWriterWrite(&writer, pIter->aKey, pIter->nKey); - rc = vdbeSorterNext(pThread, pMerger, &bEof); + vdbePmaWriteVarint(&writer, pIter->nKey); + vdbePmaWriteBlob(&writer, pIter->aKey, pIter->nKey); + rc = vdbeSorterNext(pTask, pMerger, &bEof); } - rc2 = fileWriterFinish(&writer, &iWriteOff); + rc2 = vdbePmaWriterFinish(&writer, &iWriteOff); if( rc==SQLITE_OK ) rc = rc2; } - vdbeSorterMergerFree(pMerger); - sqlite3OsCloseFree(pThread->pTemp1); - pThread->pTemp1 = pTemp2; - pThread->nPMA = (i / SORTER_MAX_MERGE_COUNT); - pThread->iTemp1Off = iWriteOff; + vdbeMergeEngineFree(pMerger); + sqlite3OsCloseFree(pTask->pTemp1); + pTask->pTemp1 = pTemp2; + pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT); + pTask->iTemp1Off = iWriteOff; } }else{ - /* Sort the pThread->pList list */ - rc = vdbeSorterSort(pThread); + /* Sort the pTask->pList list */ + rc = vdbeSorterSort(pTask); /* If required, write the list out to a PMA.
*/ - if( rc==SQLITE_OK && pThread->eWork==SORT_SUBTASK_TO_PMA ){ + if( rc==SQLITE_OK && pTask->eWork==SORT_SUBTASK_TO_PMA ){ #ifdef SQLITE_DEBUG - i64 nExpect = pThread->nInMemory - + sqlite3VarintLen(pThread->nInMemory) - + pThread->iTemp1Off; + i64 nExpect = pTask->nInMemory + + sqlite3VarintLen(pTask->nInMemory) + + pTask->iTemp1Off; #endif - rc = vdbeSorterListToPMA(pThread); - assert( rc!=SQLITE_OK || (nExpect==pThread->iTemp1Off) ); + rc = vdbeSorterListToPMA(pTask); + assert( rc!=SQLITE_OK || (nExpect==pTask->iTemp1Off) ); } } thread_out: - pThread->bDone = 1; - if( rc==SQLITE_OK && pThread->pUnpacked->errCode ){ - assert( pThread->pUnpacked->errCode==SQLITE_NOMEM ); + pTask->bDone = 1; + if( rc==SQLITE_OK && pTask->pUnpacked->errCode ){ + assert( pTask->pUnpacked->errCode==SQLITE_NOMEM ); rc = SQLITE_NOMEM; } return SQLITE_INT_TO_PTR(rc); } /* ** Run the activity scheduled by the object passed as the only argument ** in the current thread. */ -static int vdbeSorterRunThread(SortSubtask *pThread){ - int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pThread) ); - assert( pThread->bDone ); - pThread->bDone = 0; +static int vdbeSorterRunTask(SortSubtask *pTask){ + int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pTask) ); + assert( pTask->bDone ); + pTask->bDone = 0; return rc; } /* ** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly @@ -1297,75 +1300,75 @@ */ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ VdbeSorter *pSorter = pCsr->pSorter; int rc = SQLITE_OK; int i; - SortSubtask *pThread = 0; /* Thread context used to create new PMA */ - int nWorker = (pSorter->nThread-1); + SortSubtask *pTask = 0; /* Thread context used to create new PMA */ + int nWorker = (pSorter->nTask-1); pSorter->bUsePMA = 1; for(i=0; i<nWorker; i++){ int iTest = (pSorter->iPrev + i + 1) % nWorker; - pThread = &pSorter->aThread[iTest]; + pTask = &pSorter->aTask[iTest]; #if SQLITE_MAX_WORKER_THREADS>0 - if( pThread->bDone ){ + if( pTask->bDone ){ void *pRet; -
assert( pThread->pThread ); - rc = sqlite3ThreadJoin(pThread->pThread, &pRet); - pThread->pThread = 0; - pThread->bDone = 0; + assert( pTask->pTask ); + rc = sqlite3ThreadJoin(pTask->pTask, &pRet); + pTask->pTask = 0; + pTask->bDone = 0; if( rc==SQLITE_OK ){ rc = SQLITE_PTR_TO_INT(pRet); } } #endif - if( pThread->pThread==0 ) break; - pThread = 0; + if( pTask->pThread==0 ) break; + pTask = 0; } - if( pThread==0 ){ - pThread = &pSorter->aThread[nWorker]; + if( pTask==0 ){ + pTask = &pSorter->aTask[nWorker]; } - pSorter->iPrev = (pThread - pSorter->aThread); + pSorter->iPrev = (pTask - pSorter->aTask); if( rc==SQLITE_OK ){ - assert( pThread->pThread==0 && pThread->bDone==0 ); - pThread->eWork = SORT_SUBTASK_TO_PMA; - pThread->pList = pSorter->pRecord; - pThread->nInMemory = pSorter->nInMemory; + assert( pTask->pThread==0 && pTask->bDone==0 ); + pTask->eWork = SORT_SUBTASK_TO_PMA; + pTask->pList = pSorter->pRecord; + pTask->nInMemory = pSorter->nInMemory; pSorter->nInMemory = 0; pSorter->pRecord = 0; if( pSorter->aMemory ){ - u8 *aMem = pThread->aListMemory; - pThread->aListMemory = pSorter->aMemory; + u8 *aMem = pTask->aListMemory; + pTask->aListMemory = pSorter->aMemory; pSorter->aMemory = aMem; } #if SQLITE_MAX_WORKER_THREADS>0 - if( !bFg && pThread!=&pSorter->aThread[nWorker] ){ + if( !bFg && pTask!=&pSorter->aTask[nWorker] ){ /* Launch a background thread for this operation */ - void *pCtx = (void*)pThread; - assert( pSorter->aMemory==0 || pThread->aListMemory!=0 ); - if( pThread->aListMemory ){ + void *pCtx = (void*)pTask; + assert( pSorter->aMemory==0 || pTask->aListMemory!=0 ); + if( pTask->aListMemory ){ if( pSorter->aMemory==0 ){ pSorter->aMemory = sqlite3Malloc(pSorter->nMemory); if( pSorter->aMemory==0 ) return SQLITE_NOMEM; }else{ pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory); } } - rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSortSubtaskMain, pCtx); + rc = sqlite3ThreadCreate(&pTask->pTask, vdbeSortSubtaskMain, pCtx); }else #endif { /* Use 
the foreground thread for this operation */ - rc = vdbeSorterRunThread(pThread); + rc = vdbeSorterRunTask(pTask); if( rc==SQLITE_OK ){ - u8 *aMem = pThread->aListMemory; - pThread->aListMemory = pSorter->aMemory; + u8 *aMem = pTask->aListMemory; + pTask->aListMemory = pSorter->aMemory; pSorter->aMemory = aMem; - assert( pThread->pList==0 ); + assert( pTask->pList==0 ); } } } return rc; @@ -1467,19 +1470,20 @@ ** Return the total number of PMAs in all temporary files. */ static int vdbeSorterCountPMA(VdbeSorter *pSorter){ int nPMA = 0; int i; - for(i=0; i<pSorter->nThread; i++){ - nPMA += pSorter->aThread[i].nPMA; + for(i=0; i<pSorter->nTask; i++){ + nPMA += pSorter->aTask[i].nPMA; } return nPMA; } /* -** Once the sorter has been populated, this function is called to prepare -** for iterating through its contents in sorted order. +** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite, +** this function is called to prepare for iterating through the records +** in sorted order. */ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ VdbeSorter *pSorter = pCsr->pSorter; int rc = SQLITE_OK; /* Return code */ @@ -1488,20 +1492,20 @@ /* If no data has been written to disk, then do not do so now. Instead, ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly ** from the in-memory list.
*/ if( pSorter->bUsePMA==0 ){ if( pSorter->pRecord ){ - SortSubtask *pThread = &pSorter->aThread[0]; + SortSubtask *pTask = &pSorter->aTask[0]; *pbEof = 0; - pThread->pList = pSorter->pRecord; - pThread->eWork = SORT_SUBTASK_SORT; - assert( pThread->aListMemory==0 ); - pThread->aListMemory = pSorter->aMemory; - rc = vdbeSorterRunThread(pThread); - pThread->aListMemory = 0; - pSorter->pRecord = pThread->pList; - pThread->pList = 0; + pTask->pList = pSorter->pRecord; + pTask->eWork = SORT_SUBTASK_SORT; + assert( pTask->aListMemory==0 ); + pTask->aListMemory = pSorter->aMemory; + rc = vdbeSorterRunTask(pTask); + pTask->aListMemory = 0; + pSorter->pRecord = pTask->pList; + pTask->pList = 0; }else{ *pbEof = 1; } return rc; } @@ -1516,24 +1520,24 @@ /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge ** some of them together so that this is no longer the case. */ if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){ int i; - for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){ - SortSubtask *pThread = &pSorter->aThread[i]; - if( pThread->pTemp1 ){ - pThread->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nThread; - pThread->eWork = SORT_SUBTASK_CONS; + for(i=0; rc==SQLITE_OK && i<pSorter->nTask; i++){ + SortSubtask *pTask = &pSorter->aTask[i]; + if( pTask->pTemp1 ){ + pTask->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nTask; + pTask->eWork = SORT_SUBTASK_CONS; #if SQLITE_MAX_WORKER_THREADS>0 - if( i<(pSorter->nThread-1) ){ - void *pCtx = (void*)pThread; - rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSortSubtaskMain,pCtx); + if( i<(pSorter->nTask-1) ){ + void *pCtx = (void*)pTask; + rc = sqlite3ThreadCreate(&pTask->pTask,vdbeSortSubtaskMain,pCtx); }else #endif { - rc = vdbeSorterRunThread(pThread); + rc = vdbeSorterRunTask(pTask); } } } } @@ -1544,35 +1548,35 @@ ** and merge all remaining PMAs.
*/ assert( pSorter->pMerger==0 ); if( rc==SQLITE_OK ){ int nIter = 0; /* Number of iterators used */ int i; - SorterMerger *pMerger; - for(i=0; i<pSorter->nThread; i++){ - nIter += pSorter->aThread[i].nPMA; + MergeEngine *pMerger; + for(i=0; i<pSorter->nTask; i++){ + nIter += pSorter->aTask[i].nPMA; } - pSorter->pMerger = pMerger = vdbeSorterMergerNew(nIter); + pSorter->pMerger = pMerger = vdbeMergeEngineNew(nIter); if( pMerger==0 ){ rc = SQLITE_NOMEM; }else{ int iIter = 0; int iThread = 0; - for(iThread=0; iThread<pSorter->nThread; iThread++){ + for(iThread=0; iThread<pSorter->nTask; iThread++){ int iPMA; i64 iReadOff = 0; - SortSubtask *pThread = &pSorter->aThread[iThread]; - for(iPMA=0; iPMA<pThread->nPMA && rc==SQLITE_OK; iPMA++){ + SortSubtask *pTask = &pSorter->aTask[iThread]; + for(iPMA=0; iPMA<pTask->nPMA && rc==SQLITE_OK; iPMA++){ i64 nDummy = 0; - VdbeSorterIter *pIter = &pMerger->aIter[iIter++]; - rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nDummy); + PmaReader *pIter = &pMerger->aIter[iIter++]; + rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nDummy); iReadOff = pIter->iEof; } } for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){ - rc = vdbeSorterDoCompare(&pSorter->aThread[0], pMerger, i); + rc = vdbeSorterDoCompare(&pSorter->aTask[0], pMerger, i); } } } if( rc==SQLITE_OK ){ @@ -1587,11 +1591,11 @@ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ VdbeSorter *pSorter = pCsr->pSorter; int rc; /* Return code */ if( pSorter->pMerger ){ - rc = vdbeSorterNext(&pSorter->aThread[0], pSorter->pMerger, pbEof); + rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof); }else{ SorterRecord *pFree = pSorter->pRecord; pSorter->pRecord = pFree->u.pNext; pFree->u.pNext = 0; if( pSorter->aMemory==0 ) vdbeSorterRecordFree(db, pFree); @@ -1609,11 +1613,11 @@ const VdbeSorter *pSorter, /* Sorter object */ int *pnKey /* OUT: Size of current key in bytes */ ){ void *pKey; if( pSorter->pMerger ){ - VdbeSorterIter *pIter; + PmaReader *pIter; pIter = &pSorter->pMerger->aIter[
pSorter->pMerger->aTree[1] ]; *pnKey = pIter->nKey; pKey = pIter->aKey; }else{ *pnKey = pSorter->pRecord->nVal; @@ -1661,11 +1665,11 @@ Mem *pVal, /* Value to compare to current sorter key */ int nIgnore, /* Ignore this many fields at the end */ int *pRes /* OUT: Result of comparison */ ){ VdbeSorter *pSorter = pCsr->pSorter; - UnpackedRecord *r2 = pSorter->aThread[0].pUnpacked; + UnpackedRecord *r2 = pSorter->aTask[0].pUnpacked; KeyInfo *pKeyInfo = pCsr->pKeyInfo; int i; void *pKey; int nKey; /* Sorter key to compare pVal with */ assert( r2->nField>=pKeyInfo->nField-nIgnore );