/ Check-in [7221f6e3]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Have .recover store all orphaned rows in a single table, with extra columns to indicate the orphaned page and sub-tree they were discovered within.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | dbdata
Files: files | file ages | folders
SHA3-256: 7221f6e33ed6a5a96ec61e25f2a1f70b84aae66e503d897eb7b7ff1aec42355d
User & Date: dan 2019-04-26 21:11:37
Context
2019-04-27
15:35
Fix a problem in the .recover command with recovering WITHOUT ROWID tables where the PK columns are not the leftmost in the CREATE TABLE statement. check-in: 91df4b8e user: dan tags: dbdata
2019-04-26
21:11
Have .recover store all orphaned rows in a single table, with extra columns to indicate the orphaned page and sub-tree they were discovered within. check-in: 7221f6e3 user: dan tags: dbdata
15:40
Fix another problem with database freelist handling in the ".recover" command. check-in: bee2652a user: dan tags: dbdata
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/shell.c.in.

  6222   6222   */
  6223   6223   static void recoverFreeTable(RecoverTable *pTab){
  6224   6224     if( pTab ){
  6225   6225       sqlite3_free(pTab->zName);
  6226   6226       sqlite3_free(pTab->zQuoted);
  6227   6227       if( pTab->azlCol ){
  6228   6228         int i;
  6229         -      for(i=0; i<pTab->nCol; i++){
         6229  +      for(i=0; i<=pTab->nCol; i++){
  6230   6230           sqlite3_free(pTab->azlCol[i]);
  6231   6231         }
  6232   6232         sqlite3_free(pTab->azlCol);
  6233   6233       }
  6234   6234       sqlite3_free(pTab);
  6235   6235     }
  6236   6236   }
  6237   6237   
  6238         -static void recoverOldTable(
         6238  +static RecoverTable *recoverOldTable(
  6239   6239     int *pRc,                       /* IN/OUT: Error code */
  6240         -  RecoverTable *pTab,
  6241   6240     const char *zName,              /* Name of table */
  6242   6241     const char *zSql,               /* CREATE TABLE statement */
  6243   6242     int bIntkey, 
  6244   6243     int nCol
  6245   6244   ){
  6246   6245     sqlite3 *dbtmp = 0;             /* sqlite3 handle for testing CREATE TABLE */
  6247   6246     int rc = *pRc;
         6247  +  RecoverTable *pTab = 0;
  6248   6248   
         6249  +  pTab = (RecoverTable*)shellMalloc(&rc, sizeof(RecoverTable));
  6249   6250     if( rc==SQLITE_OK ){
  6250   6251       int nSqlCol = 0;
  6251   6252       int bSqlIntkey = 0;
  6252   6253       sqlite3_stmt *pStmt = 0;
  6253         -
         6254  +    
  6254   6255       rc = sqlite3_open("", &dbtmp);
  6255   6256       if( rc==SQLITE_OK ){
  6256   6257         rc = sqlite3_exec(dbtmp, "PRAGMA writable_schema = on", 0, 0, 0);
  6257   6258       }
  6258   6259       if( rc==SQLITE_OK ){
  6259   6260         rc = sqlite3_exec(dbtmp, zSql, 0, 0, 0);
  6260   6261         if( rc==SQLITE_ERROR ){
................................................................................
  6281   6282       );
  6282   6283       if( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pStmt) ){
  6283   6284         bSqlIntkey = sqlite3_column_int(pStmt, 0);
  6284   6285       }
  6285   6286       shellFinalize(&rc, pStmt);
  6286   6287   
  6287   6288       if( bIntkey==bSqlIntkey ){
         6289  +      int i;
  6288   6290         const char *zPk = "_rowid_";
  6289   6291         sqlite3_stmt *pPkFinder = 0;
  6290   6292   
  6291         -      shellPreparePrintf(dbtmp, &rc, &pPkFinder, 
         6293  +      pTab->iPk = -2;
         6294  +      if( bIntkey ){
         6295  +        shellPreparePrintf(dbtmp, &rc, &pPkFinder, 
  6292   6296             "SELECT cid, name FROM pragma_table_info(%Q) "
  6293   6297             "  WHERE pk=1 AND type='integer' COLLATE nocase"
  6294         -          "  AND NOT EXISTS (SELECT cid FROM pragma_table_info(%Q) WHERE pk=2)",
  6295         -          zName, zName
  6296         -      );
  6297         -      if( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pPkFinder) ){
  6298         -        pTab->iPk = sqlite3_column_int(pPkFinder, 0);
  6299         -        zPk = (const char*)sqlite3_column_text(pPkFinder, 1);
         6298  +          "  AND NOT EXISTS (SELECT cid FROM pragma_table_info(%Q) WHERE pk=2)"
         6299  +          , zName, zName
         6300  +        );
         6301  +        if( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pPkFinder) ){
         6302  +          pTab->iPk = sqlite3_column_int(pPkFinder, 0);
         6303  +          zPk = (const char*)sqlite3_column_text(pPkFinder, 1);
         6304  +        }
  6300   6305         }
  6301   6306   
  6302   6307         pTab->zName = shellMPrintf(&rc, "%s", zName);
  6303   6308         pTab->zQuoted = shellMPrintf(&rc, "%Q", pTab->zName);
  6304         -      pTab->azlCol = (char**)shellMalloc(&rc, sizeof(char*) * nSqlCol);
         6309  +      pTab->azlCol = (char**)shellMalloc(&rc, sizeof(char*) * (nSqlCol+1));
  6305   6310         pTab->nCol = nSqlCol;
  6306   6311   
  6307         -      if( nSqlCol==1 && pTab->iPk==0 ){
         6312  +      if( bIntkey ){
  6308   6313           pTab->azlCol[0] = shellMPrintf(&rc, "%Q", zPk);
  6309   6314         }else{
  6310         -        shellPreparePrintf(dbtmp, &rc, &pStmt, 
  6311         -          "SELECT -1+row_number()          OVER (ORDER BY cid),"
  6312         -          "    %Q||%Q||group_concat(name, ', ') FILTER (WHERE cid!=%d) "
  6313         -          "           OVER (ORDER BY cid) "
         6315  +        pTab->azlCol[0] = shellMPrintf(&rc, "");
         6316  +      }
         6317  +      i = 1;
         6318  +      shellPreparePrintf(dbtmp, &rc, &pStmt, 
         6319  +          "SELECT %Q || group_concat(name, ', ') "
         6320  +          "  FILTER (WHERE cid!=%d) OVER (ORDER BY cid) "
  6314   6321             "FROM pragma_table_info(%Q)", 
  6315         -          (bIntkey ? zPk : ""), (bIntkey ? ", " : ""), 
  6316         -          pTab->iPk, zName
  6317         -        );
  6318         -        while( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pStmt) ){
  6319         -          int idx = sqlite3_column_int(pStmt, 0);
  6320         -          const char *zText = (const char*)sqlite3_column_text(pStmt, 1);
  6321         -          pTab->azlCol[idx] = shellMPrintf(&rc, "%s", zText);
  6322         -        }
  6323         -        shellFinalize(&rc, pStmt);
         6322  +          bIntkey ? ", " : "", pTab->iPk, zName
         6323  +      );
         6324  +      while( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pStmt) ){
         6325  +        const char *zText = (const char*)sqlite3_column_text(pStmt, 0);
         6326  +        pTab->azlCol[i] = shellMPrintf(&rc, "%s%s", pTab->azlCol[0], zText);
         6327  +        i++;
  6324   6328         }
         6329  +      shellFinalize(&rc, pStmt);
         6330  +
  6325   6331         shellFinalize(&rc, pPkFinder);
  6326   6332       }
  6327   6333     }
  6328   6334   
  6329   6335    finished:
  6330   6336     sqlite3_close(dbtmp);
  6331   6337     *pRc = rc;
         6338  +  if( rc!=SQLITE_OK ){
         6339  +    recoverFreeTable(pTab);
         6340  +    pTab = 0;
         6341  +  }
         6342  +  return pTab;
  6332   6343   }
  6333   6344   
  6334   6345   static RecoverTable *recoverNewTable(
  6335   6346     ShellState *pState, 
  6336   6347     int *pRc,
  6337   6348     int iRoot,
  6338   6349     int bIntkey,
  6339         -  int nCol
         6350  +  int nCol,
         6351  +  int *pbNoop
  6340   6352   ){
  6341   6353     sqlite3_stmt *pStmt = 0;
  6342   6354     RecoverTable *pRet = 0;
  6343   6355     int bNoop = 0;
  6344   6356     const char *zSql = 0;
  6345   6357     const char *zName = 0;
  6346   6358   
  6347         -  pRet = (RecoverTable*)shellMalloc(pRc, sizeof(RecoverTable));
  6348         -  if( pRet ) pRet->iPk = -2;
  6349   6359   
  6350   6360     /* Search the recovered schema for an object with root page iRoot. */
  6351   6361     shellPreparePrintf(pState->db, pRc, &pStmt,
  6352   6362         "SELECT type, name, sql FROM recovery.schema WHERE rootpage=%d", iRoot
  6353   6363     );
  6354   6364     while( *pRc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pStmt) ){
  6355   6365       const char *zType = (const char*)sqlite3_column_text(pStmt, 0);
................................................................................
  6356   6366       if( bIntkey==0 && sqlite3_stricmp(zType, "index")==0 ){
  6357   6367         bNoop = 1;
  6358   6368         break;
  6359   6369       }
  6360   6370       if( sqlite3_stricmp(zType, "table")==0 ){
  6361   6371         zName = (const char*)sqlite3_column_text(pStmt, 1);
  6362   6372         zSql = (const char*)sqlite3_column_text(pStmt, 2);
  6363         -      recoverOldTable(pRc, pRet, zName, zSql, bIntkey, nCol);
         6373  +      pRet = recoverOldTable(pRc, zName, zSql, bIntkey, nCol);
  6364   6374         break;
  6365   6375       }
  6366   6376     }
         6377  +
  6367   6378     shellFinalize(pRc, pStmt);
  6368         -  if( bNoop ){
  6369         -    sqlite3_free(pRet);
  6370         -    return 0;
  6371         -  }
         6379  +  *pbNoop = bNoop;
         6380  +  return pRet;
         6381  +}
  6372   6382   
  6373         -  if( pRet && pRet->zName==0 ){
  6374         -    sqlite3_stmt *pStmt = 0;
  6375         -
  6376         -    pRet->zName = shellMPrintf(pRc, "orphan_%d_%d", nCol, iRoot);
  6377         -    pRet->zQuoted = shellMPrintf(pRc, "%Q", pRet->zName);
  6378         -    pRet->azlCol = (char**)shellMalloc(pRc, sizeof(char*) * nCol);
  6379         -    pRet->nCol = nCol;
  6380         -
  6381         -    shellPreparePrintf(pState->db, pRc, &pStmt, 
  6382         -      "WITH s(i) AS ("
  6383         -      "  SELECT 1 UNION ALL SELECT i+1 FROM s WHERE i<%d"
  6384         -      ")"
  6385         -      "SELECT i-1, %Q || group_concat('c' || i, ', ') OVER (ORDER BY i) FROM s",
  6386         -      nCol, (bIntkey ? "id, " : "")
         6383  +static RecoverTable *recoverOrphanTable(
         6384  +  ShellState *pState, 
         6385  +  int *pRc, 
         6386  +  int nCol
         6387  +){
         6388  +  RecoverTable *pTab = 0;
         6389  +  if( nCol>=0 && *pRc==SQLITE_OK ){
         6390  +    int i;
         6391  +    raw_printf(pState->out, 
         6392  +        "CREATE TABLE recover_orphan(rootpgno INTEGER, "
         6393  +        "pgno INTEGER, nfield INTEGER, id INTEGER"
  6387   6394       );
  6388         -    while( *pRc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pStmt) ){
  6389         -      int idx = sqlite3_column_int(pStmt, 0);
  6390         -      const char *zText = (const char*)sqlite3_column_text(pStmt, 1);
  6391         -      pRet->azlCol[idx] = shellMPrintf(pRc, "%s", zText);
  6392         -    }
  6393         -    shellFinalize(pRc, pStmt);
  6394         -
  6395         -    if( *pRc==SQLITE_OK ){
  6396         -      char *zCreate = shellMPrintf(pRc, "CREATE TABLE %Q (%s)", 
  6397         -        pRet->zName, pRet->azlCol[nCol-1]
  6398         -      );
  6399         -      if( zCreate ){
  6400         -        raw_printf(pState->out, "%s;\n", zCreate);
  6401         -        sqlite3_free(zCreate);
  6402         -      }
  6403         -    }
  6404         -  }
  6405         -
  6406         -  if( *pRc!=SQLITE_OK ){
  6407         -    recoverFreeTable(pRet);
  6408         -    pRet = 0;
  6409         -  }
  6410         -
  6411         -  return pRet;
         6395  +    for(i=0; i<nCol; i++){
         6396  +      raw_printf(pState->out, ", c%d", i);
         6397  +    }
         6398  +    raw_printf(pState->out, ");\n");
         6399  +
         6400  +    pTab = (RecoverTable*)shellMalloc(pRc, sizeof(RecoverTable));
         6401  +    if( pTab ){
         6402  +      pTab->zName = shellMPrintf(pRc, "%s", "recover_orphan");
         6403  +      pTab->zQuoted = shellMPrintf(pRc, "%Q", pTab->zName);
         6404  +      pTab->nCol = nCol;
         6405  +      pTab->iPk = -2;
         6406  +      if( nCol>0 ){
         6407  +        pTab->azlCol = (char**)shellMalloc(pRc, sizeof(char*) * (nCol+1));
         6408  +        if( pTab->azlCol ){
         6409  +          pTab->azlCol[nCol] = shellMPrintf(pRc, "");
         6410  +          for(i=nCol-1; i>=0; i--){
         6411  +            pTab->azlCol[i] = shellMPrintf(pRc, "%s, NULL", pTab->azlCol[i+1]);
         6412  +          }
         6413  +        }
         6414  +      }
         6415  +    }
         6416  +
         6417  +    if( *pRc!=SQLITE_OK ){
         6418  +      recoverFreeTable(pTab);
         6419  +      pTab = 0;
         6420  +    }
         6421  +  }
         6422  +  return pTab;
  6412   6423   }
  6413   6424   
  6414   6425   /*
  6415   6426   ** This function is called to recover data from the database. A script
  6416   6427   ** to construct a new database containing all recovered data is output
  6417   6428   ** on stream pState->out.
  6418   6429   */
................................................................................
  6419   6430   static int recoverDatabaseCmd(ShellState *pState, int nArg, char **azArg){
  6420   6431     int rc = SQLITE_OK;
  6421   6432     sqlite3_stmt *pLoop = 0;        /* Loop through all root pages */
  6422   6433     sqlite3_stmt *pPages = 0;       /* Loop through all pages in a group */
  6423   6434     sqlite3_stmt *pCells = 0;       /* Loop through all cells in a page */
  6424   6435     const char *zRecoveryDb = "";   /* Name of "recovery" database */
  6425   6436     int i;
         6437  +  int nOrphan = -1;
         6438  +  RecoverTable *pOrphan = 0;
  6426   6439   
  6427   6440     int bFreelist = 1;              /* 0 if --freelist-corrupt is specified */
  6428   6441     for(i=1; i<nArg; i++){
  6429   6442       char *z = azArg[i];
  6430   6443       int n;
  6431   6444       if( z[0]=='-' && z[1]=='-' ) z++;
  6432   6445       n = strlen(z);
................................................................................
  6541   6554       "  max(CASE WHEN field=2 THEN value ELSE NULL END),"
  6542   6555       "  max(CASE WHEN field=3 THEN value ELSE NULL END),"
  6543   6556       "  max(CASE WHEN field=4 THEN value ELSE NULL END)"
  6544   6557       "FROM sqlite_dbdata WHERE pgno IN ("
  6545   6558       "  SELECT pgno FROM recovery.map WHERE root=1"
  6546   6559       ")"
  6547   6560       "GROUP BY pgno, cell;"
         6561  +    "CREATE INDEX recovery.schema_rootpage ON schema(rootpage);"
  6548   6562     );
  6549   6563   
  6550   6564     /* Open a transaction, then print out all non-virtual, non-"sqlite_%" 
  6551   6565     ** CREATE TABLE statements that extracted from the existing schema.  */
  6552   6566     if( rc==SQLITE_OK ){
  6553   6567       sqlite3_stmt *pStmt = 0;
  6554   6568       raw_printf(pState->out, "BEGIN;\n");
................................................................................
  6561   6575         const char *zCreateTable = (const char*)sqlite3_column_text(pStmt, 0);
  6562   6576         raw_printf(pState->out, "CREATE TABLE IF NOT EXISTS %s;\n", 
  6563   6577             &zCreateTable[12]
  6564   6578         );
  6565   6579       }
  6566   6580       shellFinalize(&rc, pStmt);
  6567   6581     }
         6582  +
         6583  +  /* Figure out if an orphan table will be required. And if so, how many
         6584  +  ** user columns it should contain */
         6585  +  shellPrepare(pState->db, &rc, 
         6586  +      "SELECT coalesce(max(maxlen), -2) FROM recovery.map" 
         6587  +      "  WHERE root>1 AND root NOT IN (SELECT rootpage FROM recovery.schema)"
         6588  +      , &pLoop
         6589  +  );
         6590  +  if( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pLoop) ){
         6591  +    nOrphan = sqlite3_column_int(pLoop, 0);
         6592  +  }
         6593  +  shellFinalize(&rc, pLoop);
         6594  +  pLoop = 0;
         6595  +  pOrphan = recoverOrphanTable(pState, &rc, nOrphan);
  6568   6596   
  6569   6597     shellPrepare(pState->db, &rc,
  6570   6598         "SELECT pgno FROM recovery.map WHERE root=?", &pPages
  6571   6599     );
  6572   6600     shellPrepare(pState->db, &rc,
  6573   6601         "SELECT max(field), group_concat(shell_escape_crnl(quote(value)), ', ')"
  6574   6602         "FROM sqlite_dbdata WHERE pgno = ? AND field != ?"
................................................................................
  6582   6610         "  SELECT rootpage FROM recovery.schema WHERE name='sqlite_sequence'"
  6583   6611         ")", &pLoop
  6584   6612     );
  6585   6613     while( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pLoop) ){
  6586   6614       int iRoot = sqlite3_column_int(pLoop, 0);
  6587   6615       int bIntkey = sqlite3_column_int(pLoop, 1);
  6588   6616       int nCol = sqlite3_column_int(pLoop, 2);
         6617  +    int bNoop = 0;
  6589   6618       RecoverTable *pTab;
  6590   6619   
  6591         -    pTab = recoverNewTable(pState, &rc, iRoot, bIntkey, nCol);
  6592         -    if( pTab ){
  6593         -      if( 0==sqlite3_stricmp(pTab->zName, "sqlite_sequence") ){
  6594         -        raw_printf(pState->out, "DELETE FROM sqlite_sequence;\n");
  6595         -      }
  6596         -      sqlite3_bind_int(pPages, 1, iRoot);
  6597         -      sqlite3_bind_int(pCells, 2, pTab->iPk);
  6598         -      while( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pPages) ){
  6599         -        sqlite3_bind_int(pCells, 1, sqlite3_column_int(pPages, 0));
  6600         -        while( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pCells) ){
  6601         -          int iMax = sqlite3_column_int(pCells, 0);
  6602         -          const char *zVal = (const char*)sqlite3_column_text(pCells, 1);
         6620  +    pTab = recoverNewTable(pState, &rc, iRoot, bIntkey, nCol, &bNoop);
         6621  +    if( bNoop || rc ) continue;
         6622  +    if( pTab==0 ) pTab = pOrphan;
         6623  +
         6624  +    if( 0==sqlite3_stricmp(pTab->zName, "sqlite_sequence") ){
         6625  +      raw_printf(pState->out, "DELETE FROM sqlite_sequence;\n");
         6626  +    }
         6627  +    sqlite3_bind_int(pPages, 1, iRoot);
         6628  +    sqlite3_bind_int(pCells, 2, pTab->iPk);
         6629  +
         6630  +    while( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pPages) ){
         6631  +      int iPgno = sqlite3_column_int(pPages, 0);
         6632  +      sqlite3_bind_int(pCells, 1, iPgno);
         6633  +      while( rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pCells) ){
         6634  +        int nField = sqlite3_column_int(pCells, 0);
         6635  +        const char *zVal = (const char*)sqlite3_column_text(pCells, 1);
         6636  +
         6637  +        nField = nField+1;
         6638  +        if( pTab==pOrphan ){
         6639  +          raw_printf(pState->out, 
         6640  +              "INSERT INTO %s VALUES(%d, %d, %d, %s%s%s);\n",
         6641  +              pTab->zQuoted, iRoot, iPgno, nField, 
         6642  +              bIntkey ? "" : "NULL, ", zVal, pTab->azlCol[nField]
         6643  +          );
         6644  +        }else{
  6603   6645             raw_printf(pState->out, "INSERT INTO %s(%s) VALUES( %s );\n", 
  6604         -              pTab->zQuoted, pTab->azlCol[iMax>0?iMax:0], zVal
         6646  +              pTab->zQuoted, pTab->azlCol[nField], zVal
  6605   6647             );
  6606   6648           }
  6607         -        shellReset(&rc, pCells);
  6608   6649         }
  6609         -      shellReset(&rc, pPages);
         6650  +      shellReset(&rc, pCells);
  6610   6651       }
  6611         -    recoverFreeTable(pTab);
         6652  +    shellReset(&rc, pPages);
         6653  +    if( pTab!=pOrphan ) recoverFreeTable(pTab);
  6612   6654     }
  6613   6655     shellFinalize(&rc, pLoop);
  6614   6656     shellFinalize(&rc, pPages);
  6615   6657     shellFinalize(&rc, pCells);
         6658  +  recoverFreeTable(pOrphan);
  6616   6659   
  6617   6660     /* The rest of the schema */
  6618   6661     if( rc==SQLITE_OK ){
  6619   6662       sqlite3_stmt *pStmt = 0;
  6620   6663       shellPrepare(pState->db, &rc, 
  6621   6664           "SELECT sql, name FROM recovery.schema "
  6622   6665           "WHERE sql NOT LIKE 'create table%'", &pStmt