Index: src/vdbesort.c ================================================================== --- src/vdbesort.c +++ src/vdbesort.c @@ -1750,21 +1750,32 @@ return rc; } /* ** Allocate and return a new IncrMerger object to read data from pMerger. +** +** If an OOM condition is encountered, return NULL. In this case free the +** pMerger argument before returning. */ -static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){ - IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger)); +static int vdbeIncrNew( + SortSubtask *pTask, + MergeEngine *pMerger, + IncrMerger **ppOut +){ + int rc = SQLITE_OK; + IncrMerger *pIncr = *ppOut = (IncrMerger*)sqlite3_malloc(sizeof(IncrMerger)); if( pIncr ){ memset(pIncr, 0, sizeof(IncrMerger)); pIncr->pMerger = pMerger; pIncr->pTask = pTask; pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2); pTask->file2.iEof += pIncr->mxSz; + }else{ + vdbeMergeEngineFree(pMerger); + rc = SQLITE_NOMEM; } - return pIncr; + return rc; } /* ** Set the "use-threads" flag on object pIncr. */ @@ -1776,20 +1787,36 @@ } #define INCRINIT2_NORMAL 0 #define INCRINIT2_TASK 1 #define INCRINIT2_ROOT 2 - static int vdbeIncrInit2(PmaReader *pIter, int eMode); +/* +** Initialize the merger argument passed as the second argument. Once this +** function returns, the first key of merged data may be read from the merger +** object in the usual fashion. +** +** If argument eMode is INCRINIT2_ROOT, then it is assumed that any IncrMerge +** objects attached to the PmaReader objects that the merger reads from have +** already been populated, but that they have not yet populated aFile[0] and +** set the PmaReader objects up to read from it. In this case all that is +** required is to call vdbePmaReaderNext() on each iterator to point it at +** its first key. +** +** Otherwise, if eMode is any value other than INCRINIT2_ROOT, then use +** vdbeIncrInit2() to initialize each PmaReader that feeds data to pMerger. +** +** SQLITE_OK is returned if successful, or an SQLite error code otherwise. +*/ static int vdbeIncrInitMerger( SortSubtask *pTask, MergeEngine *pMerger, - int eMode + int eMode /* One of the INCRINIT2_XXX constants */ ){ - int i; - int rc = SQLITE_OK; + int rc = SQLITE_OK; /* Return code */ + int i; /* For iterating through PmaReader objects */ for(i=0; rc==SQLITE_OK && inTree; i++){ if( eMode==INCRINIT2_ROOT ){ rc = vdbePmaReaderNext(&pMerger->aIter[i]); }else{ @@ -1849,17 +1876,28 @@ } return rc; } #if SQLITE_MAX_WORKER_THREADS>0 +/* +** The main routine for vdbeIncrInit2() operations run in background threads. +*/ static void *vdbeIncrInit2Thread(void *pCtx){ PmaReader *pReader = (PmaReader*)pCtx; void *pRet = SQLITE_INT_TO_PTR( vdbeIncrInit2(pReader, INCRINIT2_TASK) ); pReader->pIncr->pTask->bDone = 1; return pRet; } +/* +** Use a background thread to invoke vdbeIncrInit2(INCRINIT2_TASK) on the +** the PmaReader object passed as the first argument. +** +** This call will initialize the various fields of the pIter->pIncr +** structure and, if it is a multi-threaded IncrMerger, launch a +** background thread to populate aFile[1]. +*/ static int vdbeIncrBgInit2(PmaReader *pIter){ void *pCtx = (void*)pIter; return vdbeSorterCreateThread(pIter->pIncr->pTask, vdbeIncrInit2Thread, pCtx); } #endif @@ -1903,129 +1941,165 @@ } *piOffset = iOff; return rc; } -typedef struct IncrBuilder IncrBuilder; -struct IncrBuilder { - int nPMA; /* Number of iterators used so far */ - MergeEngine *pMerger; /* Merge engine to populate. */ -}; - -static int vdbeAddToBuilder( - SortSubtask *pTask, - IncrBuilder *pBuilder, - MergeEngine *pMerger +/* +** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of +** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes. +** +** i.e. +** +** nPMA<=16 -> TreeDepth() == 0 +** nPMA<=256 -> TreeDepth() == 1 +** nPMA<=65536 -> TreeDepth() == 2 +*/ +static int vdbeSorterTreeDepth(int nPMA){ + int nDepth = 0; + i64 nDiv = SORTER_MAX_MERGE_COUNT; + while( nDiv < (i64)nPMA ){ + nDiv = nDiv * SORTER_MAX_MERGE_COUNT; + nDepth++; + } + return nDepth; +} + +/* +** pRoot is the root of an incremental merge-tree with depth nDepth (according +** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the +** tree, counting from zero. This function adds pLeaf to the tree. +** +** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error +** code is returned and pLeaf is freed. +*/ +static int vdbeSorterAddToTree( + SortSubtask *pTask, /* Task context */ + int nDepth, /* Depth of tree according to TreeDepth() */ + int iSeq, /* Sequence number of leaf within tree */ + MergeEngine *pRoot, /* Root of tree */ + MergeEngine *pLeaf /* Leaf to add to tree */ ){ int rc = SQLITE_OK; + int nDiv = 1; + int i; + MergeEngine *p = pRoot; IncrMerger *pIncr; - assert( pMerger ); - if( pBuilder->nPMA==SORTER_MAX_MERGE_COUNT ){ - rc = vdbeAddToBuilder(pTask, &pBuilder[1], pBuilder->pMerger); - pBuilder->pMerger = 0; - pBuilder->nPMA = 0; + rc = vdbeIncrNew(pTask, pLeaf, &pIncr); + + for(i=1; ipMerger==0 ){ - pBuilder->pMerger = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); - if( pBuilder->pMerger==0 ) rc = SQLITE_NOMEM; + for(i=1; iaIter[iIter]; + + if( pIter->pIncr==0 ){ + MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); + if( pNew==0 ){ + rc = SQLITE_NOMEM; + }else{ + rc = vdbeIncrNew(pTask, pNew, &pIter->pIncr); + } + } + + p = pIter->pIncr->pMerger; + nDiv = nDiv / SORTER_MAX_MERGE_COUNT; } if( rc==SQLITE_OK ){ - pIncr = vdbeIncrNew(pTask, pMerger); - if( pIncr==0 ) rc = SQLITE_NOMEM; - pBuilder->pMerger->aIter[pBuilder->nPMA++].pIncr = pIncr; + p->aIter[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr; + }else{ + vdbeIncrFree(pIncr); + } + return rc; +} + +/* +** This function is called as part of a SorterRewind() operation on a sorter +** that has already written two or more level-0 PMAs to one or more temp +** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that +** can be used to incrementally merge all PMAs on disk. +** +** If successful, SQLITE_OK is returned and *ppOut set to point to the +** MergeEngine object at the root of the tree before returning. Or, if an +** error occurs, an SQLite error code is returned and the final value +** of *ppOut is undefined. +*/ +static int vdbeSorterMergeTreeBuild(VdbeSorter *pSorter, MergeEngine **ppOut){ + MergeEngine *pMain = 0; + int rc = SQLITE_OK; + int iTask; + + /* If the sorter uses more than one task, then create the top-level + ** MergeEngine here. This MergeEngine will read data from exactly + ** one PmaReader per sub-task. */ + assert( pSorter->bUseThreads || pSorter->nTask==1 ); + if( pSorter->nTask>1 ){ + pMain = vdbeMergeEngineNew(pSorter->nTask); + if( pMain==0 ) rc = SQLITE_NOMEM; + } + + for(iTask=0; iTasknTask && rc==SQLITE_OK; iTask++){ + SortSubtask *pTask = &pSorter->aTask[iTask]; + if( pTask->nPMA ){ + MergeEngine *pRoot = 0; /* Root node of tree for this task */ + int nDepth = vdbeSorterTreeDepth(pTask->nPMA); + i64 iReadOff = 0; + + if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){ + rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot); + }else{ + int i; + int iSeq = 0; + pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); + if( pRoot==0 ) rc = SQLITE_NOMEM; + for(i=0; inPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){ + MergeEngine *pMerger = 0; /* New level-0 PMA merger */ + int nReader; /* Number of level-0 PMAs to merge */ + + nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT); + rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger); + if( rc==SQLITE_OK ){ + rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger); + } + } + } + + if( rc==SQLITE_OK ){ + if( pMain==0 ){ + pMain = pRoot; + }else{ + rc = vdbeIncrNew(pTask, pRoot, &pMain->aIter[iTask].pIncr); + } + }else{ + vdbeMergeEngineFree(pRoot); + } + } } if( rc!=SQLITE_OK ){ - vdbeMergeEngineFree(pMerger); + vdbeMergeEngineFree(pMain); + pMain = 0; } - + *ppOut = pMain; return rc; } /* ** Populate iterator *pIter so that it may be used to iterate through all ** keys stored in all PMAs created by this sorter. */ static int vdbePmaReaderIncrInit(VdbeSorter *pSorter){ + int rc; /* Return code */ SortSubtask *pTask0 = &pSorter->aTask[0]; MergeEngine *pMain = 0; sqlite3 *db = pTask0->pSorter->db; - int rc = SQLITE_OK; int iTask; - IncrBuilder *aMerge; - const int nMerge = 32; - aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge); - if( aMerge==0 ) return SQLITE_NOMEM; - - if( pSorter->nTask>1 ){ - pMain = vdbeMergeEngineNew(pSorter->nTask); - if( pMain==0 ) rc = SQLITE_NOMEM; - } - - for(iTask=0; iTasknTask && rc==SQLITE_OK; iTask++){ - MergeEngine *pRoot = 0; - int iPMA; - i64 iReadOff = 0; - SortSubtask *pTask = &pSorter->aTask[iTask]; - if( pTask->nPMA==0 ) continue; - for(iPMA=0; iPMAnPMA; iPMA += SORTER_MAX_MERGE_COUNT){ - MergeEngine *pMerger = 0; - int nReader = MIN(pTask->nPMA - iPMA, SORTER_MAX_MERGE_COUNT); - - rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger); - if( rc!=SQLITE_OK ) break; - - if( iPMA==0 ){ - pRoot = pMerger; - }else{ - if( pRoot ){ - rc = vdbeAddToBuilder(pTask, &aMerge[0], pRoot); - pRoot = 0; - if( rc!=SQLITE_OK ){ - vdbeMergeEngineFree(pMerger); - break; - } - } - rc = vdbeAddToBuilder(pTask, &aMerge[0], pMerger); - if( rc!=SQLITE_OK ) break; - } - } - - if( pRoot==0 ){ - int i; - for(i=0; rc==SQLITE_OK && iaIter[iTask].pIncr = pNew; - if( pNew==0 ) rc = SQLITE_NOMEM; - } - memset(aMerge, 0, nMerge*sizeof(aMerge[0])); - } - - if( rc!=SQLITE_OK ){ - vdbeMergeEngineFree(pRoot); - } - } - + rc = vdbeSorterMergeTreeBuild(pSorter, &pMain); if( rc==SQLITE_OK ){ #if SQLITE_MAX_WORKER_THREADS if( pSorter->bUseThreads ){ PmaReader *pIter; SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1]; @@ -2033,15 +2107,12 @@ if( rc==SQLITE_OK ){ pIter = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader)); pSorter->pReader = pIter; } if( rc==SQLITE_OK ){ - pIter->pIncr = vdbeIncrNew(pLast, pMain); - if( pIter->pIncr==0 ){ - rc = SQLITE_NOMEM; - } - else{ + rc = vdbeIncrNew(pLast, pMain, &pIter->pIncr); + if( rc==SQLITE_OK ){ vdbeIncrSetThreads(pIter->pIncr, pSorter->bUseThreads); for(iTask=0; iTask<(pSorter->nTask-1); iTask++){ IncrMerger *pIncr; if( (pIncr = pMain->aIter[iTask].pIncr) ){ vdbeIncrSetThreads(pIncr, pSorter->bUseThreads); @@ -2053,34 +2124,29 @@ PmaReader *p = &pMain->aIter[iTask]; assert( p->pIncr==0 || p->pIncr->pTask==&pSorter->aTask[iTask] ); if( p->pIncr ){ rc = vdbeIncrBgInit2(p); } } } - pMain = 0; } + pMain = 0; } if( rc==SQLITE_OK ){ int eMode = (pSorter->nTask>1 ? INCRINIT2_ROOT : INCRINIT2_NORMAL); rc = vdbeIncrInit2(pIter, eMode); } }else #endif { - pSorter->pMerger = pMain; rc = vdbeIncrInitMerger(pTask0, pMain, INCRINIT2_NORMAL); + pSorter->pMerger = pMain; pMain = 0; } } if( rc!=SQLITE_OK ){ - int i; - for(i=0; rc==SQLITE_OK && i