/ Check-in [69026ec7]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Rework the way trees of MergeEngine objects are built in vdbesort.c to make it easier to follow. Fix memory leaks that could follow an OOM or IO error. Add various comments to explain functions in vdbesort.c.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | threads
Files: files | file ages | folders
SHA1: 69026ec7dc3bd3e33bbe17c221a53cf1dd0f8945
User & Date: dan 2014-04-16 16:43:05
Context
2014-04-16
17:41
Change the name of vdbeIncrInit2 to vdbePmaReaderIncrInit. Add a header comment to the same function. check-in: 6622d876 user: dan tags: threads
16:43
Rework the way trees of MergeEngine objects are built in vdbesort.c to make it easier to follow. Fix memory leaks that could follow an OOM or IO error. Add various comments to explain functions in vdbesort.c. check-in: 69026ec7 user: dan tags: threads
2014-04-15
20:52
Fix some problems to do with OOM conditions in vdbesort.c. Some problems remain. check-in: 2f94f9ce user: dan tags: threads
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/vdbesort.c.

  1748   1748     }
  1749   1749   
  1750   1750     return rc;
  1751   1751   }
  1752   1752   
  1753   1753   /*
  1754   1754   ** Allocate and return a new IncrMerger object to read data from pMerger.
         1755  +**
         1756  +** If an OOM condition is encountered, return NULL. In this case free the
         1757  +** pMerger argument before returning.
  1755   1758   */
  1756         -static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
  1757         -  IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
         1759  +static int vdbeIncrNew(
         1760  +  SortSubtask *pTask, 
         1761  +  MergeEngine *pMerger,
         1762  +  IncrMerger **ppOut
         1763  +){
         1764  +  int rc = SQLITE_OK;
         1765  +  IncrMerger *pIncr = *ppOut = (IncrMerger*)sqlite3_malloc(sizeof(IncrMerger));
  1758   1766     if( pIncr ){
  1759   1767       memset(pIncr, 0, sizeof(IncrMerger));
  1760   1768       pIncr->pMerger = pMerger;
  1761   1769       pIncr->pTask = pTask;
  1762   1770       pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
  1763   1771       pTask->file2.iEof += pIncr->mxSz;
         1772  +  }else{
         1773  +    vdbeMergeEngineFree(pMerger);
         1774  +    rc = SQLITE_NOMEM;
  1764   1775     }
  1765         -  return pIncr;
         1776  +  return rc;
  1766   1777   }
  1767   1778   
  1768   1779   /*
  1769   1780   ** Set the "use-threads" flag on object pIncr.
  1770   1781   */
  1771   1782   static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
  1772   1783     if( bUseThread ){
................................................................................
  1774   1785       pIncr->pTask->file2.iEof -= pIncr->mxSz;
  1775   1786     }
  1776   1787   }
  1777   1788   
  1778   1789   #define INCRINIT2_NORMAL 0
  1779   1790   #define INCRINIT2_TASK   1
  1780   1791   #define INCRINIT2_ROOT   2
  1781         -
  1782   1792   static int vdbeIncrInit2(PmaReader *pIter, int eMode);
  1783   1793   
         1794  +/*
         1795  +** Initialize the merger argument passed as the second argument. Once this
         1796  +** function returns, the first key of merged data may be read from the merger
         1797  +** object in the usual fashion.
         1798  +**
         1799  +** If argument eMode is INCRINIT2_ROOT, then it is assumed that any IncrMerge
         1800  +** objects attached to the PmaReader objects that the merger reads from have
         1801  +** already been populated, but that they have not yet populated aFile[0] and
         1802  +** set the PmaReader objects up to read from it. In this case all that is
         1803  +** required is to call vdbePmaReaderNext() on each iterator to point it at
         1804  +** its first key.
         1805  +**
         1806  +** Otherwise, if eMode is any value other than INCRINIT2_ROOT, then use 
         1807  +** vdbeIncrInit2() to initialize each PmaReader that feeds data to pMerger.
         1808  +**
         1809  +** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
         1810  +*/
  1784   1811   static int vdbeIncrInitMerger(
  1785   1812     SortSubtask *pTask, 
  1786   1813     MergeEngine *pMerger, 
  1787         -  int eMode
         1814  +  int eMode                       /* One of the INCRINIT2_XXX constants */
  1788   1815   ){
  1789         -  int i;
  1790         -  int rc = SQLITE_OK;
         1816  +  int rc = SQLITE_OK;             /* Return code */
         1817  +  int i;                          /* For iterating through PmaReader objects */
  1791   1818   
  1792   1819     for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){
  1793   1820       if( eMode==INCRINIT2_ROOT ){
  1794   1821         rc = vdbePmaReaderNext(&pMerger->aIter[i]);
  1795   1822       }else{
  1796   1823         rc = vdbeIncrInit2(&pMerger->aIter[i], INCRINIT2_NORMAL);
  1797   1824       }
................................................................................
  1847   1874         rc = vdbePmaReaderNext(pIter);
  1848   1875       }
  1849   1876     }
  1850   1877     return rc;
  1851   1878   }
  1852   1879   
  1853   1880   #if SQLITE_MAX_WORKER_THREADS>0
         1881  +/*
         1882  +** The main routine for vdbeIncrInit2() operations run in background threads.
         1883  +*/
  1854   1884   static void *vdbeIncrInit2Thread(void *pCtx){
  1855   1885     PmaReader *pReader = (PmaReader*)pCtx;
  1856   1886     void *pRet = SQLITE_INT_TO_PTR( vdbeIncrInit2(pReader, INCRINIT2_TASK) );
  1857   1887     pReader->pIncr->pTask->bDone = 1;
  1858   1888     return pRet;
  1859   1889   }
  1860   1890   
         1891  +/*
         1892  +** Use a background thread to invoke vdbeIncrInit2(INCRINIT2_TASK) on the
         1893  +** the PmaReader object passed as the first argument.
         1894  +**
         1895  +** This call will initialize the various fields of the pIter->pIncr 
         1896  +** structure and, if it is a multi-threaded IncrMerger, launch a 
         1897  +** background thread to populate aFile[1].
         1898  +*/
  1861   1899   static int vdbeIncrBgInit2(PmaReader *pIter){
  1862   1900     void *pCtx = (void*)pIter;
  1863   1901     return vdbeSorterCreateThread(pIter->pIncr->pTask, vdbeIncrInit2Thread, pCtx);
  1864   1902   }
  1865   1903   #endif
  1866   1904   
  1867   1905   /*
................................................................................
  1901   1939       vdbeMergeEngineFree(pNew);
  1902   1940       *ppOut = 0;
  1903   1941     }
  1904   1942     *piOffset = iOff;
  1905   1943     return rc;
  1906   1944   }
  1907   1945   
  1908         -typedef struct IncrBuilder IncrBuilder;
  1909         -struct IncrBuilder {
  1910         -  int nPMA;                     /* Number of iterators used so far */
  1911         -  MergeEngine *pMerger;         /* Merge engine to populate. */
  1912         -};
         1946  +/*
         1947  +** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of
         1948  +** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes.
         1949  +**
         1950  +** i.e.
         1951  +**
         1952  +**   nPMA<=16    -> TreeDepth() == 0
         1953  +**   nPMA<=256   -> TreeDepth() == 1
         1954  +**   nPMA<=65536 -> TreeDepth() == 2
         1955  +*/
         1956  +static int vdbeSorterTreeDepth(int nPMA){
         1957  +  int nDepth = 0;
         1958  +  i64 nDiv = SORTER_MAX_MERGE_COUNT;
         1959  +  while( nDiv < (i64)nPMA ){
         1960  +    nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
         1961  +    nDepth++;
         1962  +  }
         1963  +  return nDepth;
         1964  +}
  1913   1965   
  1914         -static int vdbeAddToBuilder(
  1915         -  SortSubtask *pTask,
  1916         -  IncrBuilder *pBuilder, 
  1917         -  MergeEngine *pMerger
         1966  +/*
         1967  +** pRoot is the root of an incremental merge-tree with depth nDepth (according
         1968  +** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the
         1969  +** tree, counting from zero. This function adds pLeaf to the tree.
         1970  +**
         1971  +** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error
         1972  +** code is returned and pLeaf is freed.
         1973  +*/
         1974  +static int vdbeSorterAddToTree(
         1975  +  SortSubtask *pTask,             /* Task context */
         1976  +  int nDepth,                     /* Depth of tree according to TreeDepth() */
         1977  +  int iSeq,                       /* Sequence number of leaf within tree */
         1978  +  MergeEngine *pRoot,             /* Root of tree */
         1979  +  MergeEngine *pLeaf              /* Leaf to add to tree */
  1918   1980   ){
  1919   1981     int rc = SQLITE_OK;
         1982  +  int nDiv = 1;
         1983  +  int i;
         1984  +  MergeEngine *p = pRoot;
  1920   1985     IncrMerger *pIncr;
  1921   1986   
  1922         -  assert( pMerger );
  1923         -  if( pBuilder->nPMA==SORTER_MAX_MERGE_COUNT ){
  1924         -    rc = vdbeAddToBuilder(pTask, &pBuilder[1], pBuilder->pMerger);
  1925         -    pBuilder->pMerger = 0;
  1926         -    pBuilder->nPMA = 0;
         1987  +  rc = vdbeIncrNew(pTask, pLeaf, &pIncr);
         1988  +
         1989  +  for(i=1; i<nDepth; i++){
         1990  +    nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
  1927   1991     }
  1928   1992   
  1929         -  if( rc==SQLITE_OK && pBuilder->pMerger==0 ){
  1930         -    pBuilder->pMerger = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
  1931         -    if( pBuilder->pMerger==0 ) rc = SQLITE_NOMEM;
         1993  +  for(i=1; i<nDepth && rc==SQLITE_OK; i++){
         1994  +    int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT;
         1995  +    PmaReader *pIter = &p->aIter[iIter];
         1996  +
         1997  +    if( pIter->pIncr==0 ){
         1998  +      MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
         1999  +      if( pNew==0 ){
         2000  +        rc = SQLITE_NOMEM;
         2001  +      }else{
         2002  +        rc = vdbeIncrNew(pTask, pNew, &pIter->pIncr);
         2003  +      }
         2004  +    }
         2005  +
         2006  +    p = pIter->pIncr->pMerger;
         2007  +    nDiv = nDiv / SORTER_MAX_MERGE_COUNT;
  1932   2008     }
  1933   2009   
  1934   2010     if( rc==SQLITE_OK ){
  1935         -    pIncr = vdbeIncrNew(pTask, pMerger);
  1936         -    if( pIncr==0 ) rc = SQLITE_NOMEM;
  1937         -    pBuilder->pMerger->aIter[pBuilder->nPMA++].pIncr = pIncr;
         2011  +    p->aIter[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr;
         2012  +  }else{
         2013  +    vdbeIncrFree(pIncr);
         2014  +  }
         2015  +  return rc;
         2016  +}
         2017  +
         2018  +/*
         2019  +** This function is called as part of a SorterRewind() operation on a sorter
         2020  +** that has already written two or more level-0 PMAs to one or more temp
         2021  +** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that 
         2022  +** can be used to incrementally merge all PMAs on disk.
         2023  +**
         2024  +** If successful, SQLITE_OK is returned and *ppOut set to point to the
         2025  +** MergeEngine object at the root of the tree before returning. Or, if an
         2026  +** error occurs, an SQLite error code is returned and the final value 
         2027  +** of *ppOut is undefined.
         2028  +*/
         2029  +static int vdbeSorterMergeTreeBuild(VdbeSorter *pSorter, MergeEngine **ppOut){
         2030  +  MergeEngine *pMain = 0;
         2031  +  int rc = SQLITE_OK;
         2032  +  int iTask;
         2033  +
         2034  +  /* If the sorter uses more than one task, then create the top-level 
         2035  +  ** MergeEngine here. This MergeEngine will read data from exactly 
         2036  +  ** one PmaReader per sub-task.  */
         2037  +  assert( pSorter->bUseThreads || pSorter->nTask==1 );
         2038  +  if( pSorter->nTask>1 ){
         2039  +    pMain = vdbeMergeEngineNew(pSorter->nTask);
         2040  +    if( pMain==0 ) rc = SQLITE_NOMEM;
         2041  +  }
         2042  +
         2043  +  for(iTask=0; iTask<pSorter->nTask && rc==SQLITE_OK; iTask++){
         2044  +    SortSubtask *pTask = &pSorter->aTask[iTask];
         2045  +    if( pTask->nPMA ){
         2046  +      MergeEngine *pRoot = 0;     /* Root node of tree for this task */
         2047  +      int nDepth = vdbeSorterTreeDepth(pTask->nPMA);
         2048  +      i64 iReadOff = 0;
         2049  +
         2050  +      if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){
         2051  +        rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
         2052  +      }else{
         2053  +        int i;
         2054  +        int iSeq = 0;
         2055  +        pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
         2056  +        if( pRoot==0 ) rc = SQLITE_NOMEM;
         2057  +        for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){
         2058  +          MergeEngine *pMerger = 0; /* New level-0 PMA merger */
         2059  +          int nReader;              /* Number of level-0 PMAs to merge */
         2060  +
         2061  +          nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT);
         2062  +          rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
         2063  +          if( rc==SQLITE_OK ){
         2064  +            rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger);
         2065  +          }
         2066  +        }
         2067  +      }
         2068  +
         2069  +      if( rc==SQLITE_OK ){
         2070  +        if( pMain==0 ){
         2071  +          pMain = pRoot;
         2072  +        }else{
         2073  +          rc = vdbeIncrNew(pTask, pRoot, &pMain->aIter[iTask].pIncr);
         2074  +        }
         2075  +      }else{
         2076  +        vdbeMergeEngineFree(pRoot);
         2077  +      }
         2078  +    }
  1938   2079     }
  1939   2080   
  1940   2081     if( rc!=SQLITE_OK ){
  1941         -    vdbeMergeEngineFree(pMerger);
         2082  +    vdbeMergeEngineFree(pMain);
         2083  +    pMain = 0;
  1942   2084     }
  1943         -
         2085  +  *ppOut = pMain;
  1944   2086     return rc;
  1945   2087   }
  1946   2088   
  1947   2089   /*
  1948   2090   ** Populate iterator *pIter so that it may be used to iterate through all 
  1949   2091   ** keys stored in all PMAs created by this sorter.
  1950   2092   */
  1951   2093   static int vdbePmaReaderIncrInit(VdbeSorter *pSorter){
         2094  +  int rc;                         /* Return code */
  1952   2095     SortSubtask *pTask0 = &pSorter->aTask[0];
  1953   2096     MergeEngine *pMain = 0;
  1954   2097     sqlite3 *db = pTask0->pSorter->db;
  1955         -  int rc = SQLITE_OK;
  1956   2098     int iTask;
  1957   2099   
  1958         -  IncrBuilder *aMerge;
  1959         -  const int nMerge = 32;
  1960         -  aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge);
  1961         -  if( aMerge==0 ) return SQLITE_NOMEM;
  1962         -
  1963         -  if( pSorter->nTask>1 ){
  1964         -    pMain = vdbeMergeEngineNew(pSorter->nTask);
  1965         -    if( pMain==0 ) rc = SQLITE_NOMEM;
  1966         -  }
  1967         -
  1968         -  for(iTask=0; iTask<pSorter->nTask && rc==SQLITE_OK; iTask++){
  1969         -    MergeEngine *pRoot = 0;
  1970         -    int iPMA;
  1971         -    i64 iReadOff = 0;
  1972         -    SortSubtask *pTask = &pSorter->aTask[iTask];
  1973         -    if( pTask->nPMA==0 ) continue;
  1974         -    for(iPMA=0; iPMA<pTask->nPMA; iPMA += SORTER_MAX_MERGE_COUNT){
  1975         -      MergeEngine *pMerger = 0;
  1976         -      int nReader = MIN(pTask->nPMA - iPMA, SORTER_MAX_MERGE_COUNT);
  1977         -
  1978         -      rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
  1979         -      if( rc!=SQLITE_OK ) break;
  1980         -
  1981         -      if( iPMA==0 ){
  1982         -        pRoot = pMerger;
  1983         -      }else{
  1984         -        if( pRoot ){
  1985         -          rc = vdbeAddToBuilder(pTask, &aMerge[0], pRoot);
  1986         -          pRoot = 0;
  1987         -          if( rc!=SQLITE_OK ){
  1988         -            vdbeMergeEngineFree(pMerger);
  1989         -            break;
  1990         -          }
  1991         -        }
  1992         -        rc = vdbeAddToBuilder(pTask, &aMerge[0], pMerger);
  1993         -        if( rc!=SQLITE_OK ) break;
  1994         -      }
  1995         -    }
  1996         -
  1997         -    if( pRoot==0 ){
  1998         -      int i;
  1999         -      for(i=0; rc==SQLITE_OK && i<nMerge; i++){
  2000         -        if( aMerge[i].pMerger ){
  2001         -          if( pRoot ){
  2002         -            rc = vdbeAddToBuilder(pTask, &aMerge[i], pRoot);
  2003         -            if( rc!=SQLITE_OK ) break;
  2004         -          }
  2005         -          pRoot = aMerge[i].pMerger;
  2006         -          aMerge[i].pMerger = 0;
  2007         -        }
  2008         -      }
  2009         -    }
  2010         -
  2011         -    if( rc==SQLITE_OK ){
  2012         -      if( pMain==0 ){
  2013         -        pMain = pRoot;
  2014         -      }else{
  2015         -        IncrMerger *pNew = vdbeIncrNew(pTask, pRoot);
  2016         -        pMain->aIter[iTask].pIncr = pNew;
  2017         -        if( pNew==0 ) rc = SQLITE_NOMEM;
  2018         -      }
  2019         -      memset(aMerge, 0, nMerge*sizeof(aMerge[0]));
  2020         -    }
  2021         -
  2022         -    if( rc!=SQLITE_OK ){
  2023         -      vdbeMergeEngineFree(pRoot);
  2024         -    }
  2025         -  }
  2026         -
         2100  +  rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
  2027   2101     if( rc==SQLITE_OK ){
  2028   2102   #if SQLITE_MAX_WORKER_THREADS
  2029   2103       if( pSorter->bUseThreads ){
  2030   2104         PmaReader *pIter;
  2031   2105         SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
  2032   2106         rc = vdbeSortAllocUnpacked(pLast);
  2033   2107         if( rc==SQLITE_OK ){
  2034   2108           pIter = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
  2035   2109           pSorter->pReader = pIter;
  2036   2110         }
  2037   2111         if( rc==SQLITE_OK ){
  2038         -        pIter->pIncr = vdbeIncrNew(pLast, pMain);
  2039         -        if( pIter->pIncr==0 ){
  2040         -          rc = SQLITE_NOMEM;
  2041         -        }
  2042         -        else{
         2112  +        rc = vdbeIncrNew(pLast, pMain, &pIter->pIncr);
         2113  +        if( rc==SQLITE_OK ){
  2043   2114             vdbeIncrSetThreads(pIter->pIncr, pSorter->bUseThreads);
  2044   2115             for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
  2045   2116               IncrMerger *pIncr;
  2046   2117               if( (pIncr = pMain->aIter[iTask].pIncr) ){
  2047   2118                 vdbeIncrSetThreads(pIncr, pSorter->bUseThreads);
  2048   2119                 assert( pIncr->pTask!=pLast );
  2049   2120               }
................................................................................
  2051   2122             if( pSorter->nTask>1 ){
  2052   2123               for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
  2053   2124                 PmaReader *p = &pMain->aIter[iTask];
  2054   2125                 assert( p->pIncr==0 || p->pIncr->pTask==&pSorter->aTask[iTask] );
  2055   2126                 if( p->pIncr ){ rc = vdbeIncrBgInit2(p); }
  2056   2127               }
  2057   2128             }
  2058         -          pMain = 0;
  2059   2129           }
         2130  +        pMain = 0;
  2060   2131         }
  2061   2132         if( rc==SQLITE_OK ){
  2062   2133           int eMode = (pSorter->nTask>1 ? INCRINIT2_ROOT : INCRINIT2_NORMAL);
  2063   2134           rc = vdbeIncrInit2(pIter, eMode);
  2064   2135         }
  2065   2136       }else
  2066   2137   #endif
  2067   2138       {
  2068         -      pSorter->pMerger = pMain;
  2069   2139         rc = vdbeIncrInitMerger(pTask0, pMain, INCRINIT2_NORMAL);
         2140  +      pSorter->pMerger = pMain;
  2070   2141         pMain = 0;
  2071   2142       }
  2072   2143     }
  2073   2144   
  2074   2145     if( rc!=SQLITE_OK ){
  2075         -    int i;
  2076         -    for(i=0; rc==SQLITE_OK && i<nMerge; i++){
  2077         -      vdbeMergeEngineFree(aMerge[i].pMerger);
  2078         -    }
  2079   2146       vdbeMergeEngineFree(pMain);
  2080   2147     }
  2081         -  sqlite3_free(aMerge);
  2082   2148     return rc;
  2083   2149   }
  2084   2150   
  2085   2151   
  2086   2152   /*
  2087   2153   ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
  2088   2154   ** this function is called to prepare for iterating through the records