/ Check-in [643c86a0]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Even if compile time option SQLITE_MAX_WORKER_THREADS is set to one or greater, set the default number of worker threads to zero. Distribute data more evenly between threads in sqlite3VdbeSorterWrite() to improve performance when sorting large amounts of data. Add new test file sort2.test.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | threads
Files: files | file ages | folders
SHA1: 643c86a056168e39fcb7f39b8a72731f1eb246db
User & Date: dan 2014-04-01 15:38:44
Context
2014-04-01
18:41
When sorting data for a CREATE INDEX statement in single-threaded mode, assume that keys are delivered to the sorter in primary key order. Also fix various comments that had fallen out of date. check-in: 821d1ac4 user: dan tags: threads
15:38
Even if compile time option SQLITE_MAX_WORKER_THREADS is set to one or greater, set the default number of worker threads to zero. Distribute data more evenly between threads in sqlite3VdbeSorterWrite() to improve performance when sorting large amounts of data. Add new test file sort2.test. check-in: 643c86a0 user: dan tags: threads
10:19
Fix a problem with OOM handling in the sorter code. check-in: 59cd5229 user: dan tags: threads
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/global.c.

163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
   0,                         /* szScratch */
   0,                         /* nScratch */
   (void*)0,                  /* pPage */
   0,                         /* szPage */
   0,                         /* nPage */
   0,                         /* mxParserStack */
   0,                         /* sharedCacheEnabled */
   SQLITE_MAX_WORKER_THREADS, /* nWorker */
   /* All the rest should always be initialized to zero */
   0,                         /* isInit */
   0,                         /* inProgress */
   0,                         /* isMutexInit */
   0,                         /* isMallocInit */
   0,                         /* isPCacheInit */
   0,                         /* pInitMutex */







|







163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
   0,                         /* szScratch */
   0,                         /* nScratch */
   (void*)0,                  /* pPage */
   0,                         /* szPage */
   0,                         /* nPage */
   0,                         /* mxParserStack */
   0,                         /* sharedCacheEnabled */
   0,                         /* nWorker */
   /* All the rest should always be initialized to zero */
   0,                         /* isInit */
   0,                         /* inProgress */
   0,                         /* isMutexInit */
   0,                         /* isMallocInit */
   0,                         /* isPCacheInit */
   0,                         /* pInitMutex */

Changes to src/shell.c.

3528
3529
3530
3531
3532
3533
3534


3535
3536
3537
3538
3539
3540
3541
  data->mode = MODE_List;
  memcpy(data->separator,"|", 2);
  data->showHeader = 0;
  sqlite3_config(SQLITE_CONFIG_URI, 1);
  sqlite3_config(SQLITE_CONFIG_LOG, shellLog, data);
  sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> ");
  sqlite3_snprintf(sizeof(continuePrompt), continuePrompt,"   ...> ");


}

/*
** Output text to the console in a font that attracts extra attention.
*/
#ifdef _WIN32
static void printBold(const char *zText){







>
>







3528
3529
3530
3531
3532
3533
3534
3535
3536
3537
3538
3539
3540
3541
3542
3543
  data->mode = MODE_List;
  memcpy(data->separator,"|", 2);
  data->showHeader = 0;
  sqlite3_config(SQLITE_CONFIG_URI, 1);
  sqlite3_config(SQLITE_CONFIG_LOG, shellLog, data);
  sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> ");
  sqlite3_snprintf(sizeof(continuePrompt), continuePrompt,"   ...> ");
  sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
  sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 3);
}

/*
** Output text to the console in a font that attracts extra attention.
*/
#ifdef _WIN32
static void printBold(const char *zText){

Changes to src/test_config.c.

94
95
96
97
98
99
100






101
102
103
104
105
106
107
#endif

#if SQLITE_MAX_MMAP_SIZE>0
  Tcl_SetVar2(interp, "sqlite_options", "mmap", "1", TCL_GLOBAL_ONLY);
#else
  Tcl_SetVar2(interp, "sqlite_options", "mmap", "0", TCL_GLOBAL_ONLY);
#endif







#if 1 /* def SQLITE_MEMDEBUG */
  Tcl_SetVar2(interp, "sqlite_options", "memdebug", "1", TCL_GLOBAL_ONLY);
#else
  Tcl_SetVar2(interp, "sqlite_options", "memdebug", "0", TCL_GLOBAL_ONLY);
#endif








>
>
>
>
>
>







94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#endif

#if SQLITE_MAX_MMAP_SIZE>0
  Tcl_SetVar2(interp, "sqlite_options", "mmap", "1", TCL_GLOBAL_ONLY);
#else
  Tcl_SetVar2(interp, "sqlite_options", "mmap", "0", TCL_GLOBAL_ONLY);
#endif

#if SQLITE_MAX_WORKER_THREADS>0
  Tcl_SetVar2(interp, "sqlite_options", "worker_threads", "1", TCL_GLOBAL_ONLY);
#else
  Tcl_SetVar2(interp, "sqlite_options", "worker_threads", "0", TCL_GLOBAL_ONLY);
#endif

#if 1 /* def SQLITE_MEMDEBUG */
  Tcl_SetVar2(interp, "sqlite_options", "memdebug", "1", TCL_GLOBAL_ONLY);
#else
  Tcl_SetVar2(interp, "sqlite_options", "memdebug", "0", TCL_GLOBAL_ONLY);
#endif

Changes to src/test_malloc.c.

1248
1249
1250
1251
1252
1253
1254


























1255
1256
1257
1258
1259
1260
1261
....
1502
1503
1504
1505
1506
1507
1508

1509
1510
1511
1512
1513
1514
1515
  }

  rc = sqlite3_config(SQLITE_CONFIG_COVERING_INDEX_SCAN, bUseCis);
  Tcl_SetResult(interp, (char *)sqlite3ErrName(rc), TCL_VOLATILE);

  return TCL_OK;
}



























/*
** Usage:    sqlite3_dump_memsys3  FILENAME
**           sqlite3_dump_memsys5  FILENAME
**
** Write a summary of unfreed memsys3 allocations to FILENAME.
*/
................................................................................
     { "sqlite3_config_heap",        test_config_heap              ,0 },
     { "sqlite3_config_heap_size",   test_config_heap_size         ,0 },
     { "sqlite3_config_memstatus",   test_config_memstatus         ,0 },
     { "sqlite3_config_lookaside",   test_config_lookaside         ,0 },
     { "sqlite3_config_error",       test_config_error             ,0 },
     { "sqlite3_config_uri",         test_config_uri               ,0 },
     { "sqlite3_config_cis",         test_config_cis               ,0 },

     { "sqlite3_db_config_lookaside",test_db_config_lookaside      ,0 },
     { "sqlite3_dump_memsys3",       test_dump_memsys3             ,3 },
     { "sqlite3_dump_memsys5",       test_dump_memsys3             ,5 },
     { "sqlite3_install_memsys3",    test_install_memsys3          ,0 },
     { "sqlite3_memdebug_vfs_oom_test", test_vfs_oom_test          ,0 },
  };
  int i;







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







>







1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
....
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
  }

  rc = sqlite3_config(SQLITE_CONFIG_COVERING_INDEX_SCAN, bUseCis);
  Tcl_SetResult(interp, (char *)sqlite3ErrName(rc), TCL_VOLATILE);

  return TCL_OK;
}

/*
** Usage:    sqlite3_config_worker_threads N
*/
static int test_config_worker_threads(
  void * clientData, 
  Tcl_Interp *interp,
  int objc,
  Tcl_Obj *CONST objv[]
){
  int rc;
  int nThread;

  if( objc!=2 ){
    Tcl_WrongNumArgs(interp, 1, objv, "N");
    return TCL_ERROR;
  }
  if( Tcl_GetIntFromObj(interp, objv[1], &nThread) ){
    return TCL_ERROR;
  }

  rc = sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, nThread);
  Tcl_SetResult(interp, (char *)sqlite3ErrName(rc), TCL_VOLATILE);

  return TCL_OK;
}

/*
** Usage:    sqlite3_dump_memsys3  FILENAME
**           sqlite3_dump_memsys5  FILENAME
**
** Write a summary of unfreed memsys3 allocations to FILENAME.
*/
................................................................................
     { "sqlite3_config_heap",        test_config_heap              ,0 },
     { "sqlite3_config_heap_size",   test_config_heap_size         ,0 },
     { "sqlite3_config_memstatus",   test_config_memstatus         ,0 },
     { "sqlite3_config_lookaside",   test_config_lookaside         ,0 },
     { "sqlite3_config_error",       test_config_error             ,0 },
     { "sqlite3_config_uri",         test_config_uri               ,0 },
     { "sqlite3_config_cis",         test_config_cis               ,0 },
     { "sqlite3_config_worker_threads", test_config_worker_threads ,0 },
     { "sqlite3_db_config_lookaside",test_db_config_lookaside      ,0 },
     { "sqlite3_dump_memsys3",       test_dump_memsys3             ,3 },
     { "sqlite3_dump_memsys5",       test_dump_memsys3             ,5 },
     { "sqlite3_install_memsys3",    test_install_memsys3          ,0 },
     { "sqlite3_memdebug_vfs_oom_test", test_vfs_oom_test          ,0 },
  };
  int i;

Changes to src/vdbesort.c.

176
177
178
179
180
181
182

183
184
185
186
187
188
189
....
1247
1248
1249
1250
1251
1252
1253
1254

1255
1256
1257


1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271

1272




1273
1274
1275
1276
1277
1278
1279
....
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
....
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  int bUsePMA;                    /* True if one or more PMAs created */
  SorterRecord *pRecord;          /* Head of in-memory record list */
  SorterMerger *pMerger;          /* For final merge of PMAs (by caller) */ 
  u8 *aMemory;                    /* Block of memory to alloc records from */
  int iMemory;                    /* Offset of first free byte in aMemory */
  int nMemory;                    /* Size of aMemory allocation in bytes */

  int nThread;                    /* Size of aThread[] array */
  SorterThread aThread[1];
};

/*
** The following type is an iterator for a PMA. It caches the current key in 
** variables nKey/aKey. If the iterator is at EOF, pFile==0.
................................................................................
**
** If argument bFg is non-zero, the operation always uses the calling thread.
*/
static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
  VdbeSorter *pSorter = pCsr->pSorter;
  int rc = SQLITE_OK;
  int i;
  SorterThread *pThread;        /* Thread context used to create new PMA */


  pSorter->bUsePMA = 1;
  for(i=0; ALWAYS( i<pSorter->nThread ); i++){


    pThread = &pSorter->aThread[i];
#if SQLITE_MAX_WORKER_THREADS>0
    if( pThread->bDone ){
      void *pRet;
      assert( pThread->pThread );
      rc = sqlite3ThreadJoin(pThread->pThread, &pRet);
      pThread->pThread = 0;
      pThread->bDone = 0;
      if( rc==SQLITE_OK ){
        rc = SQLITE_PTR_TO_INT(pRet);
      }
    }
#endif
    if( pThread->pThread==0 ) break;

  }





  if( rc==SQLITE_OK ){
    assert( pThread->pThread==0 && pThread->bDone==0 );
    pThread->eWork = SORTER_THREAD_TO_PMA;
    pThread->pList = pSorter->pRecord;
    pThread->nInMemory = pSorter->nInMemory;
    pSorter->nInMemory = 0;
................................................................................
    if( pSorter->aMemory ){
      u8 *aMem = pThread->aListMemory;
      pThread->aListMemory = pSorter->aMemory;
      pSorter->aMemory = aMem;
    }

#if SQLITE_MAX_WORKER_THREADS>0
    if( bFg || i==(pSorter->nThread-1) ){
      /* Launch a background thread for this operation */
      void *pCtx = (void*)pThread;
      assert( pSorter->aMemory==0 || pThread->aListMemory!=0 );
      if( pThread->aListMemory ){
        if( pSorter->aMemory==0 ){
          pSorter->aMemory = sqlite3Malloc(pSorter->nMemory);
          if( pSorter->aMemory==0 ) return SQLITE_NOMEM;
................................................................................
  /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
  ** some of them together so that this is no longer the case. */
  if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
    int i;
    for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){
      SorterThread *pThread = &pSorter->aThread[i];
      if( pThread->pTemp1 ){
        pThread->nConsolidate = SORTER_MAX_MERGE_COUNT/pSorter->nThread;
        pThread->eWork = SORTER_THREAD_CONS;

#if SQLITE_MAX_WORKER_THREADS>0
        if( i<(pSorter->nThread-1) ){
          void *pCtx = (void*)pThread;
          rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx);
        }else







>







 







|
>


<
>
>
|













>

>
>
>
>







 







|







 







|







176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
....
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258

1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
....
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
....
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
  int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
  int bUsePMA;                    /* True if one or more PMAs created */
  SorterRecord *pRecord;          /* Head of in-memory record list */
  SorterMerger *pMerger;          /* For final merge of PMAs (by caller) */ 
  u8 *aMemory;                    /* Block of memory to alloc records from */
  int iMemory;                    /* Offset of first free byte in aMemory */
  int nMemory;                    /* Size of aMemory allocation in bytes */
  int iPrev;                      /* Previous thread used to flush PMA */
  int nThread;                    /* Size of aThread[] array */
  SorterThread aThread[1];
};

/*
** The following type is an iterator for a PMA. It caches the current key in 
** variables nKey/aKey. If the iterator is at EOF, pFile==0.
................................................................................
**
** If argument bFg is non-zero, the operation always uses the calling thread.
*/
static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
  VdbeSorter *pSorter = pCsr->pSorter;
  int rc = SQLITE_OK;
  int i;
  SorterThread *pThread = 0;    /* Thread context used to create new PMA */
  int nWorker = (pSorter->nThread-1);

  pSorter->bUsePMA = 1;

  for(i=0; i<nWorker; i++){
    int iTest = (pSorter->iPrev + i + 1) % nWorker;
    pThread = &pSorter->aThread[iTest];
#if SQLITE_MAX_WORKER_THREADS>0
    if( pThread->bDone ){
      void *pRet;
      assert( pThread->pThread );
      rc = sqlite3ThreadJoin(pThread->pThread, &pRet);
      pThread->pThread = 0;
      pThread->bDone = 0;
      if( rc==SQLITE_OK ){
        rc = SQLITE_PTR_TO_INT(pRet);
      }
    }
#endif
    if( pThread->pThread==0 ) break;
    pThread = 0;
  }
  if( pThread==0 ){
    pThread = &pSorter->aThread[nWorker];
  }
  pSorter->iPrev = (pThread - pSorter->aThread);

  if( rc==SQLITE_OK ){
    assert( pThread->pThread==0 && pThread->bDone==0 );
    pThread->eWork = SORTER_THREAD_TO_PMA;
    pThread->pList = pSorter->pRecord;
    pThread->nInMemory = pSorter->nInMemory;
    pSorter->nInMemory = 0;
................................................................................
    if( pSorter->aMemory ){
      u8 *aMem = pThread->aListMemory;
      pThread->aListMemory = pSorter->aMemory;
      pSorter->aMemory = aMem;
    }

#if SQLITE_MAX_WORKER_THREADS>0
    if( !bFg && pThread!=&pSorter->aThread[nWorker] ){
      /* Launch a background thread for this operation */
      void *pCtx = (void*)pThread;
      assert( pSorter->aMemory==0 || pThread->aListMemory!=0 );
      if( pThread->aListMemory ){
        if( pSorter->aMemory==0 ){
          pSorter->aMemory = sqlite3Malloc(pSorter->nMemory);
          if( pSorter->aMemory==0 ) return SQLITE_NOMEM;
................................................................................
  /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
  ** some of them together so that this is no longer the case. */
  if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
    int i;
    for(i=0; rc==SQLITE_OK && i<pSorter->nThread; i++){
      SorterThread *pThread = &pSorter->aThread[i];
      if( pThread->pTemp1 ){
        pThread->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nThread;
        pThread->eWork = SORTER_THREAD_CONS;

#if SQLITE_MAX_WORKER_THREADS>0
        if( i<(pSorter->nThread-1) ){
          void *pCtx = (void*)pThread;
          rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx);
        }else

Added test/sort2.test.



















































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 2014 March 25.
#
# The author disclaims copyright to this source code.  In place of
# a legal notice, here is a blessing:
#
#    May you do good and not evil.
#    May you find forgiveness for yourself and forgive others.
#    May you share freely, never taking more than you give.
#
#***********************************************************************
# This file implements regression tests for SQLite library. 
#

set testdir [file dirname $argv0]
source $testdir/tester.tcl
set testprefix sort2

db close
sqlite3_shutdown
sqlite3_config_worker_threads 7
reset_db

do_execsql_test 1 {
  PRAGMA cache_size = 5;
  WITH r(x,y) AS (
    SELECT 1, randomblob(100)
    UNION ALL
    SELECT x+1, randomblob(100) FROM r
    LIMIT 100000
  )
  SELECT count(x), length(y) FROM r GROUP BY (x%5)
} {
  20000 100 20000 100 20000 100 20000 100 20000 100
}

db close
sqlite3_shutdown
sqlite3_config_worker_threads 0
sqlite3_initialize
finish_test