/ Check-in [83a105c8]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Changes to make the multi-threaded sorter sort stably.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | threads-closed
Files: files | file ages | folders
SHA1: 83a105c864247a02b723c84069055765bf373703
User & Date: dan 2014-03-29 19:48:15
Context
2014-03-29
19:48
Changes to make the multi-threaded sorter sort stably. Closed-Leaf check-in: 83a105c8 user: dan tags: threads-closed
10:01
Fix a broken assert() in vdbesort.c. check-in: 18d1b402 user: dan tags: threads
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/vdbesort.c.

    87     87     int pgsz;                       /* Main database page size */
    88     88   
    89     89     u8 eWork;                       /* One of the SORTER_THREAD_* constants */
    90     90     int nConsolidate;               /* For THREAD_CONS, max final PMAs */
    91     91     SorterRecord *pList;            /* List of records for pThread to sort */
    92     92     int nInMemory;                  /* Expected size of PMA based on pList */
    93     93     u8 *aListMemory;                /* Records memory (or NULL) */
           94  +  u32 iSeq;                       /* Sequence number for PMA */
    94     95   
    95     96     int nPMA;                       /* Number of PMAs currently in pTemp1 */
           97  +  int bEmbeddedSeq;               /* True if pTemp1 contains embedded seq. */
    96     98     i64 iTemp1Off;                  /* Offset to write to in pTemp1 */
    97     99     sqlite3_file *pTemp1;           /* File to write PMAs to, or NULL */
    98    100   };
    99    101   
   100    102   
   101    103   /*
   102    104   ** NOTES ON DATA STRUCTURE USED FOR N-WAY MERGES:
................................................................................
   183    185     int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
   184    186     int bUsePMA;                    /* True if one or more PMAs created */
   185    187     SorterRecord *pRecord;          /* Head of in-memory record list */
   186    188     SorterMerger *pMerger;          /* For final merge of PMAs (by caller) */ 
   187    189     u8 *aMemory;                    /* Block of memory to alloc records from */
   188    190     int iMemory;                    /* Offset of first free byte in aMemory */
   189    191     int nMemory;                    /* Size of aMemory allocation in bytes */
          192  +  u32 iNextSeq;                   /* Sequence number for next PMA */
   190    193     SorterThread aThread[SQLITE_MAX_SORTER_THREAD];
   191    194   };
   192    195   
   193    196   /*
   194    197   ** The following type is an iterator for a PMA. It caches the current key in 
   195    198   ** variables nKey/aKey. If the iterator is at EOF, pFile==0.
   196    199   */
   197    200   struct VdbeSorterIter {
   198    201     i64 iReadOff;                   /* Current read offset */
   199    202     i64 iEof;                       /* 1 byte past EOF for this iterator */
          203  +  int bEmbeddedSeq;               /* True if records have sequence values */
          204  +  int iSeq;                       /* Current sequence value */
   200    205     int nAlloc;                     /* Bytes of space at aAlloc */
   201    206     int nKey;                       /* Number of bytes in key */
   202    207     sqlite3_file *pFile;            /* File iterator is reading from */
   203    208     u8 *aAlloc;                     /* Allocated space */
   204    209     u8 *aKey;                       /* Pointer to current key */
   205    210     u8 *aBuffer;                    /* Current read buffer */
   206    211     int nBuffer;                    /* Size of read buffer in bytes */
................................................................................
   413    418     if( pIter->iReadOff>=pIter->iEof ){
   414    419       /* This is an EOF condition */
   415    420       vdbeSorterIterZero(pIter);
   416    421       return SQLITE_OK;
   417    422     }
   418    423   
   419    424     rc = vdbeSorterIterVarint(pIter, &nRec);
          425  +  if( rc==SQLITE_OK && pIter->bEmbeddedSeq ){
          426  +    u64 iSeq;
          427  +    rc = vdbeSorterIterVarint(pIter, &iSeq);
          428  +    pIter->iSeq = (int)iSeq;
          429  +  }
   420    430     if( rc==SQLITE_OK ){
   421    431       pIter->nKey = (int)nRec;
   422    432       rc = vdbeSorterIterRead(pIter, (int)nRec, &pIter->aKey);
   423    433     }
   424    434   
   425    435     return rc;
   426    436   }
................................................................................
   444    454     assert( pThread->iTemp1Off>iStart );
   445    455     assert( pIter->aAlloc==0 );
   446    456     assert( pIter->aBuffer==0 );
   447    457     pIter->pFile = pThread->pTemp1;
   448    458     pIter->iReadOff = iStart;
   449    459     pIter->nAlloc = 128;
   450    460     pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
          461  +  pIter->bEmbeddedSeq = pThread->bEmbeddedSeq;
   451    462   
   452    463     /* Try to xFetch() a mapping of the entire temp file. If this is possible,
   453    464     ** the PMA will be read via the mapping. Otherwise, use xRead().  */
   454    465     rc = sqlite3OsFetch(pIter->pFile, 0, pThread->iTemp1Off, &pMap);
   455    466   
   456    467     if( rc==SQLITE_OK ){
   457    468       if( pMap ){
................................................................................
   466    477           if( iBuf ){
   467    478             int nRead = nBuf - iBuf;
   468    479             if( (iStart + nRead) > pThread->iTemp1Off ){
   469    480               nRead = (int)(pThread->iTemp1Off - iStart);
   470    481             }
   471    482             rc = sqlite3OsRead(
   472    483                 pThread->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
   473         -              );
          484  +          );
   474    485             assert( rc!=SQLITE_IOERR_SHORT_READ );
   475    486           }
   476    487         }
   477    488       }
   478    489     }
   479    490   
   480    491     if( rc==SQLITE_OK ){
   481    492       u64 nByte;                    /* Size of PMA in bytes */
   482    493       pIter->iEof = pThread->iTemp1Off;
   483         -    rc = vdbeSorterIterVarint(pIter, &nByte);
   484         -    pIter->iEof = pIter->iReadOff + nByte;
   485         -    *pnByte += nByte;
          494  +    if( pIter->bEmbeddedSeq==0 ){
          495  +      u64 iSeq, nElem;
          496  +      rc = vdbeSorterIterVarint(pIter, &iSeq);
          497  +      pIter->iSeq = (int)iSeq;
          498  +      if( rc==SQLITE_OK ){
          499  +        rc = vdbeSorterIterVarint(pIter, &nElem);
          500  +        *pnByte += (nElem * sqlite3VarintLen(iSeq));
          501  +      }
          502  +    }
          503  +    if( rc==SQLITE_OK ){
          504  +      rc = vdbeSorterIterVarint(pIter, &nByte);
          505  +      pIter->iEof = pIter->iReadOff + nByte;
          506  +      *pnByte += nByte;
          507  +    }
   486    508     }
   487    509   
   488    510     if( rc==SQLITE_OK ){
   489    511       rc = vdbeSorterIterNext(pIter);
   490    512     }
   491    513     return rc;
   492    514   }
................................................................................
   747    769       vdbeSorterRecordFree(0, pSorter->pRecord);
   748    770     }
   749    771     vdbeSorterMergerReset(pSorter->pMerger);
   750    772     pSorter->pRecord = 0;
   751    773     pSorter->nInMemory = 0;
   752    774     pSorter->bUsePMA = 0;
   753    775     pSorter->iMemory = 0;
          776  +  pSorter->iNextSeq = 0;
   754    777   }
   755    778   
   756    779   /*
   757    780   ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
   758    781   */
   759    782   void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
   760    783     VdbeSorter *pSorter = pCsr->pSorter;
................................................................................
   820    843     *ppOut = pFinal;
   821    844   }
   822    845   
   823    846   /*
   824    847   ** Sort the linked list of records headed at pThread->pList. Return 
   825    848   ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 
   826    849   ** an error occurs.
          850  +**
           851  +  ** If no error occurs, *pnElem is set to the total number of elements in
           852  +  ** the list. The pnElem argument is written unconditionally; must not be NULL.
   827    853   */
   828         -static int vdbeSorterSort(SorterThread *pThread){
          854  +static int vdbeSorterSort(SorterThread *pThread, i64 *pnElem){
   829    855     int i;
   830    856     SorterRecord **aSlot;
   831    857     SorterRecord *p;
          858  +  i64 nElem = 0;
   832    859   
   833    860     aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
   834    861     if( !aSlot ){
   835    862       return SQLITE_NOMEM;
   836    863     }
   837    864   
   838    865     p = pThread->pList;
................................................................................
   852    879       p->u.pNext = 0;
   853    880       for(i=0; aSlot[i]; i++){
   854    881         vdbeSorterMerge(pThread, p, aSlot[i], &p);
   855    882         aSlot[i] = 0;
   856    883       }
   857    884       aSlot[i] = p;
   858    885       p = pNext;
          886  +    nElem++;
   859    887     }
   860    888   
   861    889     p = 0;
   862    890     for(i=0; i<64; i++){
   863    891       vdbeSorterMerge(pThread, p, aSlot[i], &p);
   864    892     }
   865    893     pThread->pList = p;
   866    894   
          895  +  *pnElem = nElem;
   867    896     sqlite3_free(aSlot);
   868    897     return SQLITE_OK;
   869    898   }
   870    899   
   871    900   /*
   872    901   ** Initialize a file-writer object.
   873    902   */
................................................................................
   985   1014   **     * A varint. This varint contains the total number of bytes of content
   986   1015   **       in the PMA (not including the varint itself).
   987   1016   **
   988   1017   **     * One or more records packed end-to-end in order of ascending keys. 
   989   1018   **       Each record consists of a varint followed by a blob of data (the 
   990   1019   **       key). The varint is the number of bytes in the blob of data.
   991   1020   */
   992         -static int vdbeSorterListToPMA(SorterThread *pThread){
         1021  +static int vdbeSorterListToPMA(SorterThread *pThread, i64 nElem){
   993   1022     int rc = SQLITE_OK;             /* Return code */
   994   1023     FileWriter writer;              /* Object used to write to the file */
   995   1024   
   996   1025     memset(&writer, 0, sizeof(FileWriter));
   997   1026     assert( pThread->nInMemory>0 );
   998   1027   
   999   1028     /* If the first temporary PMA file has not been opened, open it now. */
  1000   1029     if( pThread->pTemp1==0 ){
  1001   1030       rc = vdbeSorterOpenTempFile(pThread->pVfs, &pThread->pTemp1);
  1002   1031       assert( rc!=SQLITE_OK || pThread->pTemp1 );
  1003   1032       assert( pThread->iTemp1Off==0 );
  1004   1033       assert( pThread->nPMA==0 );
         1034  +    assert( pThread->bEmbeddedSeq==0 );
  1005   1035     }
  1006   1036   
  1007   1037     /* Try to get the file to memory map */
  1008   1038     if( rc==SQLITE_OK ){
  1009   1039       rc = vdbeSorterExtendFile(
  1010         -        pThread->pTemp1, pThread->iTemp1Off + pThread->nInMemory + 9
         1040  +        pThread->pTemp1, pThread->iTemp1Off + 9 + 9 + 9 + pThread->nInMemory
  1011   1041       );
  1012   1042     }
  1013   1043   
  1014   1044     if( rc==SQLITE_OK ){
  1015   1045       SorterRecord *p;
  1016   1046       SorterRecord *pNext = 0;
  1017   1047   
  1018   1048       fileWriterInit(pThread->pTemp1, &writer, pThread->pgsz, pThread->iTemp1Off);
  1019   1049       pThread->nPMA++;
         1050  +    fileWriterWriteVarint(&writer, (u64)pThread->iSeq);
         1051  +    fileWriterWriteVarint(&writer, (u64)nElem);
  1020   1052       fileWriterWriteVarint(&writer, pThread->nInMemory);
  1021   1053       for(p=pThread->pList; p; p=pNext){
  1022   1054         pNext = p->u.pNext;
  1023   1055         fileWriterWriteVarint(&writer, p->nVal);
  1024   1056         fileWriterWrite(&writer, SRVAL(p), p->nVal);
  1025   1057         if( pThread->aListMemory==0 ) sqlite3_free(p);
  1026   1058       }
................................................................................
  1087   1119         ** a value equivalent to pIter2. So set pKey2 to NULL to prevent
  1088   1120         ** vdbeSorterCompare() from decoding pIter2 again.
  1089   1121         **
  1090   1122         ** If the two values were equal, then the value from the oldest
  1091   1123         ** PMA should be considered smaller. Each iterator carries the
  1092   1124         ** sequence value of its current key in iSeq, so pIter1 holds the
  1093   1125         ** older value iff (pIter1->iSeq < pIter2->iSeq).  */
  1094         -      if( iRes<0 || (iRes==0 && pIter1<pIter2) ){
         1126  +      if( iRes<0 || (iRes==0 && pIter1->iSeq < pIter2->iSeq) ){
  1095   1127           pMerger->aTree[i] = (int)(pIter1 - pMerger->aIter);
  1096   1128           pIter2 = &pMerger->aIter[ pMerger->aTree[i ^ 0x0001] ];
  1097   1129           pKey2 = pIter2->aKey;
  1098   1130         }else{
  1099   1131           if( pIter1->pFile ) pKey2 = 0;
  1100   1132           pMerger->aTree[i] = (int)(pIter2 - pMerger->aIter);
  1101   1133           pIter1 = &pMerger->aIter[ pMerger->aTree[i ^ 0x0001] ];
................................................................................
  1184   1216   
  1185   1217           fileWriterInit(pTemp2, &writer, pThread->pgsz, iWriteOff);
  1186   1218           fileWriterWriteVarint(&writer, nOut);
  1187   1219           while( rc==SQLITE_OK && bEof==0 ){
  1188   1220             VdbeSorterIter *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
  1189   1221             assert( pIter->pFile!=0 );        /* pIter is not at EOF */
  1190   1222             fileWriterWriteVarint(&writer, pIter->nKey);
         1223  +          fileWriterWriteVarint(&writer, (u64)pIter->iSeq);
  1191   1224             fileWriterWrite(&writer, pIter->aKey, pIter->nKey);
  1192   1225             rc = vdbeSorterNext(pThread, pMerger, &bEof);
  1193   1226           }
  1194   1227           rc2 = fileWriterFinish(&writer, &iWriteOff);
  1195   1228           if( rc==SQLITE_OK ) rc = rc2;
  1196   1229         }
  1197   1230   
  1198   1231         vdbeSorterMergerFree(pMerger);
  1199   1232         sqlite3OsCloseFree(pThread->pTemp1);
  1200   1233         pThread->pTemp1 = pTemp2;
  1201   1234         pThread->nPMA = (i / SORTER_MAX_MERGE_COUNT);
  1202   1235         pThread->iTemp1Off = iWriteOff;
         1236  +      pThread->bEmbeddedSeq = 1;
         1237  +      sqlite3OsUnfetch(pTemp2, 0, 0);
  1203   1238       }
  1204   1239     }else{
         1240  +    i64 nElem;
         1241  +
  1205   1242       /* Sort the pThread->pList list */
  1206         -    rc = vdbeSorterSort(pThread);
         1243  +    rc = vdbeSorterSort(pThread, &nElem);
  1207   1244   
  1208   1245       /* If required, write the list out to a PMA. */
  1209   1246       if( rc==SQLITE_OK && pThread->eWork==SORTER_THREAD_TO_PMA ){
  1210   1247   #ifdef SQLITE_DEBUG
  1211   1248         i64 nExpect = pThread->nInMemory
  1212   1249           + sqlite3VarintLen(pThread->nInMemory)
         1250  +        + sqlite3VarintLen(pThread->iSeq)
         1251  +        + sqlite3VarintLen(nElem)
  1213   1252           + pThread->iTemp1Off;
  1214   1253   #endif
  1215         -      rc = vdbeSorterListToPMA(pThread);
         1254  +      rc = vdbeSorterListToPMA(pThread, nElem);
  1216   1255         assert( rc!=SQLITE_OK || (nExpect==pThread->iTemp1Off) );
  1217   1256       }
  1218   1257     }
  1219   1258   
  1220   1259    thread_out:
  1221   1260     pThread->bDone = 1;
  1222   1261     return SQLITE_INT_TO_PTR(rc);
................................................................................
  1264   1303     if( rc==SQLITE_OK ){
  1265   1304       int bUseFg = (bFg || i==(SQLITE_MAX_SORTER_THREAD-1));
  1266   1305   
  1267   1306       assert( pThread->pThread==0 && pThread->bDone==0 );
  1268   1307       pThread->eWork = SORTER_THREAD_TO_PMA;
  1269   1308       pThread->pList = pSorter->pRecord;
  1270   1309       pThread->nInMemory = pSorter->nInMemory;
         1310  +    pThread->iSeq = pSorter->iNextSeq++;
  1271   1311       pSorter->nInMemory = 0;
  1272   1312       pSorter->pRecord = 0;
  1273   1313   
  1274   1314       if( pSorter->aMemory ){
  1275   1315         u8 *aMem = pThread->aListMemory;
  1276   1316         pThread->aListMemory = pSorter->aMemory;
  1277   1317         pSorter->aMemory = aMem;