SQLite4
Check-in [c025a26642]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a problem with lsm_mt2 in lsmtest_tdb3.c.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | range-delete
Files: files | file ages | folders
SHA1: c025a2664282ff96c492721d4104df19e34811bf
User & Date: dan 2012-10-15 14:26:39
Context
2012-10-15
15:51
Fix a crash that can follow an OOM error. And an error in lsm_csr_seek(LEFAST). check-in: 3cb77a8512 user: dan tags: range-delete
14:26
Fix a problem with lsm_mt2 in lsmtest_tdb3.c. check-in: c025a26642 user: dan tags: range-delete
2012-10-14
09:41
Range-delete related SEEK_GE and SEEK_LE fixes. check-in: d30de7f821 user: dan tags: range-delete
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest1.c.

68
69
70
71
72
73
74

75
76
77
78
79
80
81
...
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
...
410
411
412
413
414
415
416


417
418
419





420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
...
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463



464
465
466
467
468
469
470
** tested and once using a control system (sqlite3, kc etc. - something that 
** works). In order to verify that the contents of the db being tested are
** correct, the test runs a bunch of scans and lookups on both the test and
** control databases. If the results are the same, the test passes.
*/
/* Parameters describing one data-test-2 case. */
struct Datatest2 {
  DatasourceDefn defn;            /* Passed to testDatasourceNew() for test data */

  int nWrite;                     /* Number of writes per iteration */
  int nIter;                      /* Total number of iterations to run */
};

/*
** Generate a unique name for the test case pTest with database system
** zSystem.
................................................................................
  Datatest2 *p,                   /* Structure containing test parameters */
  int *pRc                        /* OUT: Error code */
){
  TestDb *pDb;
  TestDb *pControl;
  Datasource *pData;
  int i;
  int rc;
  int iDot = 0;

  /* Start the test case, open a database and allocate the datasource. */
  pDb = testOpen(zSystem, 1, &rc);
  pData = testDatasourceNew(&p->defn);
  rc = testControlDb(&pControl);

................................................................................
    int nBuf = 32 * 1024 * 1024;
    lsm_config(tdb_lsm(pDb), LSM_CONFIG_WRITE_BUFFER, &nBuf);
  }

  for(i=0; rc==0 && i<p->nIter; i++){
    void *pKey1; int nKey1;
    void *pKey2; int nKey2;



    testWriteDatasourceRange(pDb, pData, i*p->nWrite, p->nWrite, &rc);
    testWriteDatasourceRange(pControl, pData, i*p->nWrite, p->nWrite, &rc);






    testDatasourceEntry(pData, i+1000000, &pKey1, &nKey1, 0, 0);
    pKey1 = testMallocCopy(pKey1, nKey1);
    testDatasourceEntry(pData, i+2000000, &pKey2, &nKey2, 0, 0);

    testDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2, &rc);
    testDeleteRange(pControl, pKey1, nKey1, pKey2, nKey2, &rc);
    testFree(pKey1);

    testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc);
    testReopen(&pDb, &rc);
    testCompareDb(pData, (p->nIter*p->nWrite), i, pControl, pDb, &rc);

    /* Update the progress dots... */
    testCaseProgress(i, p->nIter, testCaseNDot(), &iDot);
  }

  testClose(&pDb);
  testClose(&pControl);
................................................................................
  *pRc = rc;
}

/*
** Build the name of data-test-2 case pTest when it is run against database
** system zSystem. The name has the form:
**
**   data2.<zSystem>.<datasource-name>.<nWrite>.<nIter>
**
** The returned string comes from testMallocPrintf() and is not freed here.
*/
static char *getName2(const char *zSystem, Datatest2 *pTest){
  char *zSource = testDatasourceName(&pTest->defn);
  char *zName = testMallocPrintf("data2.%s.%s.%d.%d",
      zSystem, zSource, pTest->nWrite, pTest->nIter
  );

  /* The datasource name is only needed while formatting the result. */
  testFree(zSource);
  return zName;
}

void test_data_2(
  const char *zSystem,            /* Database system name */
  const char *zPattern,           /* Run test cases that match this pattern */
  int *pRc                        /* IN/OUT: Error code */
){
  Datatest2 aTest[] = {
      /* defn,                                  nWrite, nIter */
    { {DATA_RANDOM,     20,25,     100,200},        10, 50 },
    { {DATA_RANDOM,     20,25,     100,200},       200, 50 },



#if 0
    { {DATA_RANDOM,     20,25,     100,200},       200, 50 }
#endif
  };

  int i;








>







 







|







 







>
>

<
<
>
>
>
>
>









|

|







 







|
|











|
|
|
>
>
>







68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
...
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
...
411
412
413
414
415
416
417
418
419
420


421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
...
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
** tested and once using a control system (sqlite3, kc etc. - something that 
** works). In order to verify that the contents of the db being tested are
** correct, the test runs a bunch of scans and lookups on both the test and
** control databases. If the results are the same, the test passes.
*/
/* Parameters describing one data-test-2 case. */
struct Datatest2 {
  DatasourceDefn defn;            /* Passed to testDatasourceNew() for test data */
  int nRange;                     /* Written key indexes are taken modulo this */
  int nWrite;                     /* Number of writes per iteration */
  int nIter;                      /* Total number of iterations to run */
};

/*
** Generate a unique name for the test case pTest with database system
** zSystem.
................................................................................
  Datatest2 *p,                   /* Structure containing test parameters */
  int *pRc                        /* OUT: Error code */
){
  TestDb *pDb;
  TestDb *pControl;
  Datasource *pData;
  int i;
  int rc = LSM_OK;
  int iDot = 0;

  /* Start the test case, open a database and allocate the datasource. */
  pDb = testOpen(zSystem, 1, &rc);
  pData = testDatasourceNew(&p->defn);
  rc = testControlDb(&pControl);

................................................................................
    int nBuf = 32 * 1024 * 1024;
    lsm_config(tdb_lsm(pDb), LSM_CONFIG_WRITE_BUFFER, &nBuf);
  }

  for(i=0; rc==0 && i<p->nIter; i++){
    void *pKey1; int nKey1;
    void *pKey2; int nKey2;
    int ii;
    int nRange = MIN(p->nIter*p->nWrite, p->nRange);



    for(ii=0; rc==0 && ii<p->nWrite; ii++){
      int iKey = (i*p->nWrite + ii) % p->nRange;
      testWriteDatasource(pControl, pData, iKey, &rc);
      testWriteDatasource(pDb, pData, iKey, &rc);
    }

    testDatasourceEntry(pData, i+1000000, &pKey1, &nKey1, 0, 0);
    pKey1 = testMallocCopy(pKey1, nKey1);
    testDatasourceEntry(pData, i+2000000, &pKey2, &nKey2, 0, 0);

    testDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2, &rc);
    testDeleteRange(pControl, pKey1, nKey1, pKey2, nKey2, &rc);
    testFree(pKey1);

    testCompareDb(pData, nRange, i, pControl, pDb, &rc);
    testReopen(&pDb, &rc);
    testCompareDb(pData, nRange, i, pControl, pDb, &rc);

    /* Update the progress dots... */
    testCaseProgress(i, p->nIter, testCaseNDot(), &iDot);
  }

  testClose(&pDb);
  testClose(&pControl);
................................................................................
  *pRc = rc;
}

/*
** Build the name of data-test-2 case pTest when it is run against database
** system zSystem. The name has the form:
**
**   data2.<zSystem>.<datasource-name>.<nRange>.<nWrite>.<nIter>
**
** The returned string comes from testMallocPrintf() and is not freed here.
*/
static char *getName2(const char *zSystem, Datatest2 *pTest){
  char *zSource = testDatasourceName(&pTest->defn);
  char *zName = testMallocPrintf("data2.%s.%s.%d.%d.%d",
      zSystem, zSource, pTest->nRange, pTest->nWrite, pTest->nIter
  );

  /* The datasource name is only needed while formatting the result. */
  testFree(zSource);
  return zName;
}

void test_data_2(
  const char *zSystem,            /* Database system name */
  const char *zPattern,           /* Run test cases that match this pattern */
  int *pRc                        /* IN/OUT: Error code */
){
  Datatest2 aTest[] = {
      /* defn,                                 nRange, nWrite, nIter */
    { {DATA_RANDOM,     20,25,     100,200},   10000,  10,     50   },
    { {DATA_RANDOM,     20,25,     100,200},   10000,  200,    50   },
    { {DATA_RANDOM,     20,25,     100,200},   100,    10,     1000 },
    { {DATA_RANDOM,     20,25,     100,200},   100,    200,    50   },

#if 0
    { {DATA_RANDOM,     20,25,     100,200},       200, 50 }
#endif
  };

  int i;

Changes to lsm-test/lsmtest_tdb3.c.

1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171

1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
    }
  }

  return rc;
}


/*
** Open database zFilename with test_lsm_open() and start nWorker worker
** threads for it using mt_start_worker().
**
** On the client connection auto-work is switched off and
** mt_client_work_hook() is installed as the work hook. Worker 0 is always
** started with LSM_WORK_FLUSH. When it is the only worker it is also passed
** 512; when there are two workers, worker 0 is passed 0 and worker 1 is
** passed 512 instead.
**
** Returns 0 on success, or the first non-zero code returned by
** test_lsm_open() or mt_start_worker().
*/
static int test_lsm_mt(
  const char *zFilename,          /* File to open */
  int nWorker,                    /* Either 1 or 2, for worker threads */
  int bClear,                     /* True to delete any existing db */
  TestDb **ppDb                   /* OUT: TestDb database handle */
){
  int bAutowork = 0;              /* Value handed to LSM_CONFIG_AUTOWORK */
  size_t nByte;                   /* Size of the aWorker[] array in bytes */
  LsmDb *pDb;
  int rc;

  rc = test_lsm_open(zFilename, bClear, ppDb);
  pDb = (LsmDb *)*ppDb;

  assert( nWorker==1 || nWorker==2 );
  if( rc!=0 ) return rc;

  /* Turn off auto-work and configure a work-hook on the client connection. */
  lsm_config(pDb->db, LSM_CONFIG_AUTOWORK, &bAutowork);
  lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);

  /* Allocate a zeroed array with one slot per worker thread. */
  nByte = sizeof(LsmWorker) * nWorker;
  pDb->aWorker = (LsmWorker *)testMalloc(nByte);
  memset(pDb->aWorker, 0, nByte);
  pDb->nWorker = nWorker;

  rc = mt_start_worker(pDb, 0, zFilename, 0, LSM_WORK_FLUSH,
      (nWorker==1 ? 512 : 0), 1
  );
  if( rc==0 && nWorker==2 ){
    rc = mt_start_worker(pDb, 1, zFilename, 0, 0, 512, 0);
  }

  return rc;
}

/* Open zFilename through test_lsm_mt() with a single worker thread. */
int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb){
  const int nWorker = 1;
  return test_lsm_mt(zFilename, nWorker, bClear, ppDb);
}

/* Open zFilename through test_lsm_mt() with two worker threads. */
int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb){
  const int nWorker = 2;
  return test_lsm_mt(zFilename, nWorker, bClear, ppDb);
}


#else
/* Stand-in for mt_shutdown() in the #else (no threads) branch: does nothing
** except mark pDb as unused. */
static void mt_shutdown(LsmDb *pDb) { 
  unused_parameter(pDb); 
}
/*
** Stub for the #else (no threads) branch. The arguments are only passed to
** unused_parameter(); the function prints an error message and returns 1.
**
** NOTE(review): this stub is non-static and takes three arguments, whereas
** the threaded branch defines a static four-argument test_lsm_mt() plus the
** public test_lsm_mt2()/test_lsm_mt3(). No stubs for those two are visible
** here - confirm that builds without LSM_MUTEX_PTHREADS still link.
*/
int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){
  unused_parameter(zFilename);
  unused_parameter(bClear);
  unused_parameter(ppDb);
  testPrintError("threads unavailable - recompile with LSM_MUTEX_PTHREADS\n");
  return 1;
}
#endif







|
|
<
<
<
<
<
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<



|
>

<













1121
1122
1123
1124
1125
1126
1127
1128
1129







1130






























1131
1132
1133
1134
1135
1136

1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
    }
  }

  return rc;
}


/* Open zFilename through testLsmOpen() using the "threads=2" configuration
** string. */
int test_lsm_mt2(const char *zFilename, int bClear, TestDb **ppDb){
  return testLsmOpen("threads=2", zFilename, bClear, ppDb);
}

/* Open zFilename through testLsmOpen() using the "threads=3" configuration
** string. */
int test_lsm_mt3(const char *zFilename, int bClear, TestDb **ppDb){
  return testLsmOpen("threads=3", zFilename, bClear, ppDb);
}


#else
/* Stand-in for mt_shutdown() in the #else (no threads) branch: does nothing
** except mark pDb as unused. */
static void mt_shutdown(LsmDb *pDb) { 
  unused_parameter(pDb); 
}
/*
** Stub for the #else (no threads) branch. The arguments are only passed to
** unused_parameter(); the function prints an error message and returns 1.
**
** NOTE(review): the threaded branch visible here defines test_lsm_mt2() and
** test_lsm_mt3() but no test_lsm_mt(), and no stubs for those two appear in
** this branch - confirm that builds without LSM_MUTEX_PTHREADS still link
** and that this stub is still the one callers need.
*/
int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){
  unused_parameter(zFilename);
  unused_parameter(bClear);
  unused_parameter(ppDb);
  testPrintError("threads unavailable - recompile with LSM_MUTEX_PTHREADS\n");
  return 1;
}
#endif

Changes to src/lsm_sorted.c.

1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
....
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
....
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357
3358
3359
}

/*
** This function advances segment pointer iPtr belonging to multi-cursor
** pCsr forward (bReverse==0) or backward (bReverse!=0).
**
** If the segment pointer points to a segment that is part of a composite
** level, then the following special cases are handled.
**
**   * If iPtr is the lhs of a composite level, and the cursor is being
**     advanced forwards, and segment iPtr is at EOF, move all pointers
**     that correspond to rhs segments of the same level to the first
**     key in their respective data.
**
**   * If iPtr is on the rhs of a composite level, and the cursor is being
**     reversed, and the new key is smaller than the split-key, set the
**     segment pointer to point to EOF.
*/
static int segmentCursorAdvance(
  MultiCursor *pCsr, 
  int iPtr,
  int bReverse
){
  int rc;
................................................................................
  bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight);

  if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
   && bReverse==0                                /* csr advanced forwards */
   && pPtr->pPg==0                               /* segment at EOF */
  ){
    int i;

    for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
      rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
    }

    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, 0);
    }
  }

#if 0
  else if( pPtr->pPg && bComposite && bReverse && pPtr->pSeg!=&pLvl->lhs ){
    int res = sortedKeyCompare(pCsr->pDb->xCmp,
        rtTopic(pPtr->eType), pPtr->pKey, pPtr->nKey,
        pLvl->iSplitTopic, pLvl->pSplitKey, pLvl->nSplitKey
    );
    if( res<0 ){
      segmentPtrReset(pPtr);
    }
  }
#endif

  return rc;
}

static void mcursorFreeComponents(MultiCursor *pCsr){
  int i;
  lsm_env *pEnv = pCsr->pDb->pEnv;

................................................................................
    nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
    if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

    /* If the entire header will not fit on page pPg, or if page pPg is 
     ** marked read-only, advance to the next page of the output run. */
    iOff = pMerge->iOutputOff;
    if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){
#if 0
      iFPtr = iFPtr + (nRec ? pageGetRecordPtr(aData, nData, nRec-1) : 0);
      iRPtr = iPtr - iFPtr;
#endif
      iFPtr = *pCsr->pPrevMergePtr;
      iRPtr = iPtr - iFPtr;

      iOff = 0;
      nRec = 0;
      rc = mergeWorkerNextPage(pMW, iFPtr);
      pPg = pMW->pPage;







|





<
<
<
<







 







<



<





<
<
<
<
<
<
<
<
<
<
<
<







 







<
<
<
<







1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989




1990
1991
1992
1993
1994
1995
1996
....
2003
2004
2005
2006
2007
2008
2009

2010
2011
2012

2013
2014
2015
2016
2017












2018
2019
2020
2021
2022
2023
2024
....
3324
3325
3326
3327
3328
3329
3330




3331
3332
3333
3334
3335
3336
3337
}

/*
** This function advances segment pointer iPtr belonging to multi-cursor
** pCsr forward (bReverse==0) or backward (bReverse!=0).
**
** If the segment pointer points to a segment that is part of a composite
** level, then the following special case is handled.
**
**   * If iPtr is the lhs of a composite level, and the cursor is being
**     advanced forwards, and segment iPtr is at EOF, move all pointers
**     that correspond to rhs segments of the same level to the first
**     key in their respective data.




*/
static int segmentCursorAdvance(
  MultiCursor *pCsr, 
  int iPtr,
  int bReverse
){
  int rc;
................................................................................
  bComposite = (pLvl->nRight>0 && pCsr->nPtr>pLvl->nRight);

  if( bComposite && pPtr->pSeg==&pLvl->lhs       /* lhs of composite level */
   && bReverse==0                                /* csr advanced forwards */
   && pPtr->pPg==0                               /* segment at EOF */
  ){
    int i;

    for(i=0; rc==LSM_OK && i<pLvl->nRight; i++){
      rc = sortedRhsFirst(pCsr, pLvl, &pCsr->aPtr[iPtr+1+i]);
    }

    for(i=pCsr->nTree-1; i>0; i--){
      multiCursorDoCompare(pCsr, i, 0);
    }
  }













  return rc;
}

static void mcursorFreeComponents(MultiCursor *pCsr){
  int i;
  lsm_env *pEnv = pCsr->pDb->pEnv;

................................................................................
    nHdr = 1 + lsmVarintLen32(iRPtr) + lsmVarintLen32(nKey);
    if( rtIsWrite(eType) ) nHdr += lsmVarintLen32(nVal);

    /* If the entire header will not fit on page pPg, or if page pPg is 
     ** marked read-only, advance to the next page of the output run. */
    iOff = pMerge->iOutputOff;
    if( iOff<0 || iOff+nHdr > SEGMENT_EOF(nData, nRec+1) ){




      iFPtr = *pCsr->pPrevMergePtr;
      iRPtr = iPtr - iFPtr;

      iOff = 0;
      nRec = 0;
      rc = mergeWorkerNextPage(pMW, iFPtr);
      pPg = pMW->pPage;