/ Check-in [e34eafd4]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add more tests for LSM log file recovery. Fix a problem in recovering log files that contain range deletes.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: e34eafd4c5b2bbf2735e136ad69b67bb4288ad4d01a0128d8e107ac46209a182
User & Date: dan 2017-07-03 09:00:18
Context
2017-07-03
17:37
Attempt to improve documentation on sqlite3_column_ and sqlite3_value_ interfaces. check-in: 9111ac69 user: drh tags: trunk
09:00
Add more tests for LSM log file recovery. Fix a problem in recovering log files that contain range deletes. check-in: e34eafd4 user: dan tags: trunk
2017-07-01
20:59
Fix a memory management problem in lsm log recovery code. check-in: dd55af30 user: dan tags: trunk
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to ext/lsm1/lsm-test/lsmtest1.c.

    77     77     int nIter;                      /* Total number of iterations to run */
    78     78   };
    79     79   
    80     80   /*
    81     81   ** Generate a unique name for the test case pTest with database system
    82     82   ** zSystem.
    83     83   */
    84         -static char *getName(const char *zSystem, Datatest1 *pTest){
           84  +static char *getName(const char *zSystem, int bRecover, Datatest1 *pTest){
    85     85     char *zRet;
    86     86     char *zData;
    87     87     zData = testDatasourceName(&pTest->defn);
    88         -  zRet = testMallocPrintf("data.%s.%s.%d.%d", 
    89         -      zSystem, zData, pTest->nRow, pTest->nVerify
           88  +  zRet = testMallocPrintf("data.%s.%s.rec=%d.%d.%d", 
           89  +      zSystem, zData, bRecover, pTest->nRow, pTest->nVerify
    90     90     );
    91     91     testFree(zData);
    92     92     return zRet;
    93     93   }
    94     94   
    95     95   int testControlDb(TestDb **ppDb){
    96     96   #ifdef HAVE_KYOTOCABINET
................................................................................
   246    246   static void printScanCb(
   247    247       void *pCtx, void *pKey, int nKey, void *pVal, int nVal
   248    248   ){
   249    249     printf("%s\n", (char *)pKey);
   250    250     fflush(stdout);
   251    251   }
   252    252   #endif
          253  +
          254  +void testReopenRecover(TestDb **ppDb, int *pRc){
          255  +  if( *pRc==0 ){
          256  +    const char *zLib = tdb_library_name(*ppDb);
          257  +    const char *zDflt = tdb_default_db(zLib);
          258  +    testCopyLsmdb(zDflt, "bak.db");
          259  +    testClose(ppDb);
          260  +    testCopyLsmdb("bak.db", zDflt);
          261  +    *pRc = tdb_open(zLib, 0, 0, ppDb);
          262  +  }
          263  +}
          264  +
   253    265   
   254    266   static void doDataTest1(
   255    267     const char *zSystem,            /* Database system to test */
          268  +  int bRecover,
   256    269     Datatest1 *p,                   /* Structure containing test parameters */
   257    270     int *pRc                        /* OUT: Error code */
   258    271   ){
   259    272     int i;
   260    273     int iDot;
   261    274     int rc = LSM_OK;
   262    275     Datasource *pData;
................................................................................
   273    286       /* Insert some data */
   274    287       testWriteDatasourceRange(pDb, pData, i, p->nVerify, &rc);
   275    288       i += p->nVerify;
   276    289   
   277    290       /* Check that the db content is correct. */
   278    291       testDbContents(pDb, pData, p->nRow, 0, i-1, p->nTest, p->bTestScan, &rc);
   279    292   
   280         -    /* Close and reopen the database. */
   281         -    testReopen(&pDb, &rc);
          293  +    if( bRecover ){
          294  +      testReopenRecover(&pDb, &rc);
          295  +    }else{
          296  +      testReopen(&pDb, &rc);
          297  +    }
   282    298   
   283    299       /* Check that the db content is still correct. */
   284    300       testDbContents(pDb, pData, p->nRow, 0, i-1, p->nTest, p->bTestScan, &rc);
   285    301   
   286    302       /* Update the progress dots... */
   287    303       testCaseProgress(i, p->nRow, testCaseNDot()/2, &iDot);
   288    304     }
................................................................................
   295    311       testDeleteDatasourceRange(pDb, pData, i, p->nVerify, &rc);
   296    312       i += p->nVerify;
   297    313   
   298    314       /* Check that the db content is correct. */
   299    315       testDbContents(pDb, pData, p->nRow, i, p->nRow-1,p->nTest,p->bTestScan,&rc);
   300    316   
   301    317       /* Close and reopen the database. */
   302         -    testReopen(&pDb, &rc);
          318  +    if( bRecover ){
          319  +      testReopenRecover(&pDb, &rc);
          320  +    }else{
          321  +      testReopen(&pDb, &rc);
          322  +    }
   303    323   
   304    324       /* Check that the db content is still correct. */
   305    325       testDbContents(pDb, pData, p->nRow, i, p->nRow-1,p->nTest,p->bTestScan,&rc);
   306    326   
   307    327       /* Update the progress dots... */
   308    328       testCaseProgress(i, p->nRow, testCaseNDot()/2, &iDot);
   309    329     }
................................................................................
   335    355       { {DATA_SEQUENTIAL, 5,10,      1000,2000},     1000,  250, 1000, 1},
   336    356       { {DATA_SEQUENTIAL, 5,100,     10000,20000},    100,   25,  100, 1},
   337    357       { {DATA_RANDOM,     10,10,     100,100},     100000, 1000,  100, 0},
   338    358       { {DATA_SEQUENTIAL, 10,10,     100,100},     100000, 1000,  100, 0},
   339    359     };
   340    360   
   341    361     int i;
          362  +  int bRecover;
   342    363   
   343         -  for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
   344         -    char *zName = getName(zSystem, &aTest[i]);
   345         -    if( testCaseBegin(pRc, zPattern, "%s", zName) ){
   346         -      doDataTest1(zSystem, &aTest[i], pRc);
          364  +  for(bRecover=0; bRecover<2; bRecover++){
          365  +    if( bRecover==1 && memcmp(zSystem, "lsm", 3) ) break;
          366  +    for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
          367  +      char *zName = getName(zSystem, bRecover, &aTest[i]);
          368  +      if( testCaseBegin(pRc, zPattern, "%s", zName) ){
          369  +        doDataTest1(zSystem, bRecover, &aTest[i], pRc);
          370  +      }
          371  +      testFree(zName);
   347    372       }
   348         -    testFree(zName);
   349    373     }
   350    374   }
   351    375   
   352    376   void testCompareDb(
   353    377     Datasource *pData,
   354    378     int nData,
   355    379     int iSeed,
................................................................................
   392    416       testDatasourceEntry(pData, i, &pKey, &nKey, 0, 0);
   393    417       testFetchCompare(pControl, pDb, pKey, nKey, pRc);
   394    418     }
   395    419   }
   396    420   
   397    421   static void doDataTest2(
   398    422     const char *zSystem,            /* Database system to test */
          423  +  int bRecover,
   399    424     Datatest2 *p,                   /* Structure containing test parameters */
   400    425     int *pRc                        /* OUT: Error code */
   401    426   ){
   402    427     TestDb *pDb;
   403    428     TestDb *pControl;
   404    429     Datasource *pData;
   405    430     int i;
................................................................................
   433    458       testDatasourceEntry(pData, i+2000000, &pKey2, &nKey2, 0, 0);
   434    459   
   435    460       testDeleteRange(pDb, pKey1, nKey1, pKey2, nKey2, &rc);
   436    461       testDeleteRange(pControl, pKey1, nKey1, pKey2, nKey2, &rc);
   437    462       testFree(pKey1);
   438    463   
   439    464       testCompareDb(pData, nRange, i, pControl, pDb, &rc);
   440         -    testReopen(&pDb, &rc);
          465  +    if( bRecover ){
          466  +      testReopenRecover(&pDb, &rc);
          467  +    }else{
          468  +      testReopen(&pDb, &rc);
          469  +    }
   441    470       testCompareDb(pData, nRange, i, pControl, pDb, &rc);
   442    471   
   443    472       /* Update the progress dots... */
   444    473       testCaseProgress(i, p->nIter, testCaseNDot(), &iDot);
   445    474     }
   446    475   
   447    476     testClose(&pDb);
   448    477     testClose(&pControl);
   449    478     testDatasourceFree(pData);
   450    479     testCaseFinish(rc);
   451    480     *pRc = rc;
   452    481   }
   453    482   
   454         -static char *getName2(const char *zSystem, Datatest2 *pTest){
          483  +static char *getName2(const char *zSystem, int bRecover, Datatest2 *pTest){
   455    484     char *zRet;
   456    485     char *zData;
   457    486     zData = testDatasourceName(&pTest->defn);
   458         -  zRet = testMallocPrintf("data2.%s.%s.%d.%d.%d", 
   459         -      zSystem, zData, pTest->nRange, pTest->nWrite, pTest->nIter
          487  +  zRet = testMallocPrintf("data2.%s.%s.rec=%d.%d.%d.%d", 
          488  +      zSystem, zData, bRecover, pTest->nRange, pTest->nWrite, pTest->nIter
   460    489     );
   461    490     testFree(zData);
   462    491     return zRet;
   463    492   }
   464    493   
   465    494   void test_data_2(
   466    495     const char *zSystem,            /* Database system name */
................................................................................
   472    501       { {DATA_RANDOM,     20,25,     100,200},   10000,  10,     50   },
   473    502       { {DATA_RANDOM,     20,25,     100,200},   10000,  200,    50   },
   474    503       { {DATA_RANDOM,     20,25,     100,200},   100,    10,     1000 },
   475    504       { {DATA_RANDOM,     20,25,     100,200},   100,    200,    50   },
   476    505     };
   477    506   
   478    507     int i;
          508  +  int bRecover;
   479    509   
   480         -  for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
   481         -    char *zName = getName2(zSystem, &aTest[i]);
   482         -    if( testCaseBegin(pRc, zPattern, "%s", zName) ){
   483         -      doDataTest2(zSystem, &aTest[i], pRc);
          510  +  for(bRecover=0; bRecover<2; bRecover++){
          511  +    if( bRecover==1 && memcmp(zSystem, "lsm", 3) ) break;
          512  +    for(i=0; *pRc==LSM_OK && i<ArraySize(aTest); i++){
          513  +      char *zName = getName2(zSystem, bRecover, &aTest[i]);
          514  +      if( testCaseBegin(pRc, zPattern, "%s", zName) ){
          515  +        doDataTest2(zSystem, bRecover, &aTest[i], pRc);
          516  +      }
          517  +      testFree(zName);
   484    518       }
   485         -    testFree(zName);
   486    519     }
   487    520   }
   488    521   
   489    522   /*************************************************************************
   490    523   ** Test case data3.*
   491    524   */
   492    525   

Changes to ext/lsm1/lsm-test/lsmtest_main.c.

    22     22     return; 
    23     23   }
    24     24   
    25     25   #define testSetError(rc) testSetErrorFunc(rc, pRc, __FILE__, __LINE__)
    26     26   static void testSetErrorFunc(int rc, int *pRc, const char *zFile, int iLine){
    27     27     if( rc ){
    28     28       *pRc = rc;
    29         -    printf("FAILED (%s:%d) rc=%d ", zFile, iLine, rc);
           29  +    fprintf(stderr, "FAILED (%s:%d) rc=%d ", zFile, iLine, rc);
    30     30       test_failed();
    31     31     }
    32     32   }
    33     33   
    34     34   static int lsm_memcmp(u8 *a, u8 *b, int c){
    35     35     int i;
    36     36     for(i=0; i<c; i++){

Changes to ext/lsm1/lsm-test/lsmtest_tdb.c.

   740    740   #endif
   741    741   };
   742    742   
   743    743   const char *tdb_system_name(int i){
   744    744     if( i<0 || i>=ArraySize(aLib) ) return 0;
   745    745     return aLib[i].zName;
   746    746   }
          747  +
          748  +const char *tdb_default_db(const char *zSys){
          749  +  int i;
          750  +  for(i=0; i<ArraySize(aLib); i++){
          751  +    if( strcmp(aLib[i].zName, zSys)==0 ) return aLib[i].zDefaultDb;
          752  +  }
          753  +  return 0;
          754  +}
   747    755   
   748    756   int tdb_open(const char *zLib, const char *zDb, int bClear, TestDb **ppDb){
   749    757     int i;
   750    758     int rc = 1;
   751    759     const char *zSpec = 0;
   752    760   
   753    761     int nLib = 0;

Changes to ext/lsm1/lsm-test/lsmtest_tdb.h.

   121    121     int bReverse,                   /* True to scan in reverse order */
   122    122     void *pKey1, int nKey1,         /* Start of search */
   123    123     void *pKey2, int nKey2,         /* End of search */
   124    124     void (*xCallback)(void *pCtx, void *pKey, int nKey, void *pVal, int nVal)
   125    125   );
   126    126   
   127    127   const char *tdb_system_name(int i);
          128  +const char *tdb_default_db(const char *zSys);
   128    129   
   129    130   int tdb_lsm_open(const char *zCfg, const char *zDb, int bClear, TestDb **ppDb);
   130    131   
   131    132   /*
   132    133   ** If the TestDb handle passed as an argument is a wrapper around an LSM
   133    134   ** database, return the LSM handle. Otherwise, if the argument is some other
   134    135   ** database system, return NULL.

Changes to ext/lsm1/lsmInt.h.

   860    860   void lsmLogMessage(lsm_db *, int, const char *, ...);
   861    861   int lsmInfoFreelist(lsm_db *pDb, char **pzOut);
   862    862   
   863    863   /*
   864    864   ** Functions from file "lsm_log.c".
   865    865   */
   866    866   int lsmLogBegin(lsm_db *pDb);
   867         -int lsmLogWrite(lsm_db *, void *, int, void *, int);
          867  +int lsmLogWrite(lsm_db *, int, void *, int, void *, int);
   868    868   int lsmLogCommit(lsm_db *);
   869    869   void lsmLogEnd(lsm_db *pDb, int bCommit);
   870    870   void lsmLogTell(lsm_db *, LogMark *);
   871    871   void lsmLogSeek(lsm_db *, LogMark *);
   872    872   void lsmLogClose(lsm_db *);
   873    873   
   874    874   int lsmLogRecover(lsm_db *);
   875    875   int lsmInfoLogStructure(lsm_db *pDb, char **pzVal);
   876    876   
          877  +/* Valid values for the second argument to lsmLogWrite(). */
          878  +#define LSM_WRITE        0x06
          879  +#define LSM_DELETE       0x08
          880  +#define LSM_DRANGE       0x0A
   877    881   
   878    882   /**************************************************************************
   879    883   ** Functions from file "lsm_shared.c".
   880    884   */
   881    885   
   882    886   int lsmDbDatabaseConnect(lsm_db*, const char *);
   883    887   void lsmDbDatabaseRelease(lsm_db *);

Changes to ext/lsm1/lsm_log.c.

   195    195   #define LSM_LOG_PAD1         0x01
   196    196   #define LSM_LOG_PAD2         0x02
   197    197   #define LSM_LOG_COMMIT       0x03
   198    198   #define LSM_LOG_JUMP         0x04
   199    199   
   200    200   #define LSM_LOG_WRITE        0x06
   201    201   #define LSM_LOG_WRITE_CKSUM  0x07
          202  +
   202    203   #define LSM_LOG_DELETE       0x08
   203    204   #define LSM_LOG_DELETE_CKSUM 0x09
          205  +
          206  +#define LSM_LOG_DRANGE       0x0A
          207  +#define LSM_LOG_DRANGE_CKSUM 0x0B
   204    208   
   205    209   /* Require a checksum every 32KB. */
   206    210   #define LSM_CKSUM_MAXDATA (32*1024)
   207    211   
   208    212   /* Do not wrap a log file smaller than this in bytes. */
   209    213   #define LSM_MIN_LOGWRAP      (128*1024)
   210    214   
................................................................................
   649    653   
   650    654   /*
   651    655   ** Append an LSM_LOG_WRITE (if nVal>=0) or LSM_LOG_DELETE (if nVal<0) 
   652    656   ** record to the database log.
   653    657   */
   654    658   int lsmLogWrite(
   655    659     lsm_db *pDb,                    /* Database handle */
          660  +  int eType,
   656    661     void *pKey, int nKey,           /* Database key to write to log */
   657    662     void *pVal, int nVal            /* Database value (or nVal<0) to write */
   658    663   ){
   659    664     int rc = LSM_OK;
   660    665     LogWriter *pLog;                /* Log object to write to */
   661    666     int nReq;                       /* Bytes of space required in log */
   662    667     int bCksum = 0;                 /* True to embed a checksum in this record */
   663    668   
          669  +  assert( eType==LSM_WRITE || eType==LSM_DELETE || eType==LSM_DRANGE );
          670  +  assert( LSM_LOG_WRITE==LSM_WRITE );
          671  +  assert( LSM_LOG_DELETE==LSM_DELETE );
          672  +  assert( LSM_LOG_DRANGE==LSM_DRANGE );
          673  +  assert( (eType==LSM_LOG_DELETE)==(nVal<0) );
          674  +
   664    675     if( pDb->bUseLog==0 ) return LSM_OK;
   665    676     pLog = pDb->pLogWriter;
   666    677   
   667    678     /* Determine how many bytes of space are required, assuming that a checksum
   668    679     ** will be embedded in this record (even though it may not be).  */
   669    680     nReq = 1 + lsmVarintLen32(nKey) + 8 + nKey;
   670         -  if( nVal>=0 ) nReq += lsmVarintLen32(nVal) + nVal;
          681  +  if( eType!=LSM_LOG_DELETE ) nReq += lsmVarintLen32(nVal) + nVal;
   671    682   
   672    683     /* Jump over the jump region if required. Set bCksum to true to tell the
   673    684     ** code below to include a checksum in the record if either (a) writing
   674    685     ** this record would mean that more than LSM_CKSUM_MAXDATA bytes of data
   675    686     ** have been written to the log since the last checksum, or (b) the jump
   676    687     ** is taken.  */
   677    688     rc = jumpIfRequired(pDb, pLog, nReq, &bCksum);
................................................................................
   683    694     if( rc==LSM_OK ){
   684    695       u8 *a = (u8 *)&pLog->buf.z[pLog->buf.n];
   685    696       
   686    697       /* Write the record header - the type byte followed by either 1 (for
   687    698       ** DELETE) or 2 (for WRITE) varints.  */
   688    699       assert( LSM_LOG_WRITE_CKSUM == (LSM_LOG_WRITE | 0x0001) );
   689    700       assert( LSM_LOG_DELETE_CKSUM == (LSM_LOG_DELETE | 0x0001) );
   690         -    *(a++) = (nVal>=0 ? LSM_LOG_WRITE : LSM_LOG_DELETE) | (u8)bCksum;
          701  +    assert( LSM_LOG_DRANGE_CKSUM == (LSM_LOG_DRANGE | 0x0001) );
          702  +    *(a++) = (u8)eType | (u8)bCksum;
   691    703       a += lsmVarintPut32(a, nKey);
   692         -    if( nVal>=0 ) a += lsmVarintPut32(a, nVal);
          704  +    if( eType!=LSM_LOG_DELETE ) a += lsmVarintPut32(a, nVal);
   693    705   
   694    706       if( bCksum ){
   695    707         pLog->buf.n = (a - (u8 *)pLog->buf.z);
   696    708         rc = logCksumAndFlush(pDb);
   697    709         a = (u8 *)&pLog->buf.z[pLog->buf.n];
   698    710       }
   699    711   
   700    712       memcpy(a, pKey, nKey);
   701    713       a += nKey;
   702         -    if( nVal>=0 ){
          714  +    if( eType!=LSM_LOG_DELETE ){
   703    715         memcpy(a, pVal, nVal);
   704    716         a += nVal;
   705    717       }
   706    718       pLog->buf.n = a - (u8 *)pLog->buf.z;
   707    719       assert( pLog->buf.n<=pLog->buf.nAlloc );
   708    720     }
   709    721   
................................................................................
  1001   1013             case LSM_LOG_PAD2: {
  1002   1014               int nPad;
  1003   1015               logReaderVarint(&reader, &buf1, &nPad, &rc);
  1004   1016               logReaderBlob(&reader, &buf1, nPad, 0, &rc);
  1005   1017               break;
  1006   1018             }
  1007   1019   
         1020  +          case LSM_LOG_DRANGE:
         1021  +          case LSM_LOG_DRANGE_CKSUM:
  1008   1022             case LSM_LOG_WRITE:
  1009   1023             case LSM_LOG_WRITE_CKSUM: {
  1010   1024               int nKey;
  1011   1025               int nVal;
  1012   1026               u8 *aVal;
  1013   1027               logReaderVarint(&reader, &buf1, &nKey, &rc);
  1014   1028               logReaderVarint(&reader, &buf2, &nVal, &rc);
  1015   1029   
  1016         -            if( eType==LSM_LOG_WRITE_CKSUM ){
         1030  +            if( eType==LSM_LOG_WRITE_CKSUM || eType==LSM_LOG_DRANGE_CKSUM ){
  1017   1031                 logReaderCksum(&reader, &buf1, &bEof, &rc);
  1018   1032               }else{
  1019   1033                 bEof = logRequireCksum(&reader, nKey+nVal);
  1020   1034               }
  1021   1035               if( bEof ) break;
  1022   1036   
  1023   1037               logReaderBlob(&reader, &buf1, nKey, 0, &rc);
  1024   1038               logReaderBlob(&reader, &buf2, nVal, &aVal, &rc);
  1025   1039               if( iPass==1 && rc==LSM_OK ){ 
  1026         -              rc = lsmTreeInsert(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
         1040  +              if( eType==LSM_LOG_WRITE || eType==LSM_LOG_WRITE_CKSUM ){
         1041  +                rc = lsmTreeInsert(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
         1042  +              }else{
         1043  +                rc = lsmTreeDelete(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
         1044  +              }
  1027   1045               }
  1028   1046               break;
  1029   1047             }
  1030   1048   
  1031   1049             case LSM_LOG_DELETE:
  1032   1050             case LSM_LOG_DELETE_CKSUM: {
  1033   1051               int nKey; u8 *aKey;

Changes to ext/lsm1/lsm_main.c.

   665    665   
   666    666     if( pDb->nTransOpen==0 ){
   667    667       bCommit = 1;
   668    668       rc = lsm_begin(pDb, 1);
   669    669     }
   670    670   
   671    671     if( rc==LSM_OK ){
   672         -    if( bDeleteRange==0 ){
   673         -      rc = lsmLogWrite(pDb, (void *)pKey, nKey, (void *)pVal, nVal);
   674         -    }else{
   675         -      /* TODO */
   676         -    }
          672  +    int eType = (bDeleteRange ? LSM_DRANGE : (nVal>=0?LSM_WRITE:LSM_DELETE));
          673  +    rc = lsmLogWrite(pDb, eType, (void *)pKey, nKey, (void *)pVal, nVal);
   677    674     }
   678    675   
   679    676     lsmSortedSaveTreeCursors(pDb);
   680    677   
   681    678     if( rc==LSM_OK ){
   682    679       int pgsz = lsmFsPageSize(pDb->pFS);
   683    680       int nQuant = LSM_AUTOWORK_QUANT * pgsz;