/ Check-in [1db198cc]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Avoid a linear scan of the entire table when ota updates or deletes a row from a table with an external primary key index.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | ota-update
Files: files | file ages | folders
SHA1: 1db198ccca1e5c5a922cefe3daeff8d2e5d3a7f7
User & Date: dan 2015-02-04 16:32:47
Context
2015-02-04
19:20
Fix the error messages returned by ota if an update violates a unique constraint. check-in: c4845a3b user: dan tags: ota-update
16:32
Avoid a linear scan of the entire table when ota updates or deletes a row from a table with an external primary key index. check-in: 1db198cc user: dan tags: ota-update
11:08
Fix a memory leak that could follow an OOM error in ota. check-in: 0d5415f2 user: dan tags: ota-update
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to ext/ota/sqlite3ota.c.

   114    114     unsigned char *abTblPk;         /* Array of flags, set on target PK columns */
   115    115     int eType;                      /* Table type - an OTA_PK_XXX value */
   116    116   
   117    117     /* Output variables. zTbl==0 implies EOF. */
   118    118     int bCleanup;                   /* True in "cleanup" state */
   119    119     const char *zTbl;               /* Name of target db table */
   120    120     const char *zIdx;               /* Name of target db index (or null) */
   121         -  int tnum;                       /* Root page of index (not table) */
          121  +  int iTnum;                      /* Root page of current object */
          122  +  int iPkTnum;                    /* If eType==EXTERNAL, root of PK index */
   122    123     int bUnique;                    /* Current index is unique */
   123    124     int iVisit;                     /* Number of points visited, incl. current */
   124    125   
   125    126     /* Statements created by otaObjIterPrepareAll() */
   126    127     int nCol;                       /* Number of columns in current object */
   127    128     sqlite3_stmt *pSelect;          /* Source data */
   128    129     sqlite3_stmt *pInsert;          /* Statement for INSERT operations */
................................................................................
   316    317           pIter->bCleanup = 0;
   317    318           rc = sqlite3_step(pIter->pTblIter);
   318    319           if( rc!=SQLITE_ROW ){
   319    320             rc = sqlite3_reset(pIter->pTblIter);
   320    321             pIter->zTbl = 0;
   321    322           }else{
   322    323             pIter->zTbl = (const char*)sqlite3_column_text(pIter->pTblIter, 0);
   323         -          pIter->tnum = sqlite3_column_int(pIter->pTblIter, 1);
          324  +          pIter->iTnum = sqlite3_column_int(pIter->pTblIter, 1);
   324    325             rc = SQLITE_OK;
   325    326           }
   326    327         }else{
   327    328           if( pIter->zIdx==0 ){
   328    329             sqlite3_bind_text(pIter->pIdxIter, 1, pIter->zTbl, -1, SQLITE_STATIC);
   329    330           }
   330    331           rc = sqlite3_step(pIter->pIdxIter);
   331    332           if( rc!=SQLITE_ROW ){
   332    333             rc = sqlite3_reset(pIter->pIdxIter);
   333    334             pIter->bCleanup = 1;
   334    335             pIter->zIdx = 0;
   335    336           }else{
   336    337             pIter->zIdx = (const char*)sqlite3_column_text(pIter->pIdxIter, 0);
   337         -          pIter->tnum = sqlite3_column_int(pIter->pIdxIter, 1);
          338  +          pIter->iTnum = sqlite3_column_int(pIter->pIdxIter, 1);
   338    339             pIter->bUnique = sqlite3_column_int(pIter->pIdxIter, 2);
   339    340             rc = SQLITE_OK;
   340    341           }
   341    342         }
   342    343       }
   343    344     }
   344    345   
................................................................................
   466    467       int rc2;                      /* sqlite3_finalize() return value */
   467    468       int bOtaRowid = 0;            /* If input table has column "ota_rowid" */
   468    469       int iOrder = 0;
   469    470   
   470    471       /* Figure out the type of table this step will deal with. */
   471    472       assert( pIter->eType==0 );
   472    473       sqlite3_test_control(
   473         -        SQLITE_TESTCTRL_TBLTYPE, p->db, "main", pIter->zTbl, &pIter->eType
          474  +        SQLITE_TESTCTRL_TBLTYPE, p->db, "main", pIter->zTbl, &pIter->eType,
          475  +        &pIter->iPkTnum
   474    476       );
   475    477       assert( pIter->eType==OTA_PK_NONE || pIter->eType==OTA_PK_IPK 
   476    478            || pIter->eType==OTA_PK_EXTERNAL || pIter->eType==OTA_PK_WITHOUT_ROWID
   477    479            || pIter->eType==OTA_PK_VTAB
   478    480       );
   479    481   
   480    482       /* Populate the azTblCol[] and nTblCol variables based on the columns
................................................................................
   757    759   static char *otaObjIterGetWhere(
   758    760     sqlite3ota *p, 
   759    761     OtaObjIter *pIter
   760    762   ){
   761    763     char *zList = 0;
   762    764     if( pIter->eType==OTA_PK_VTAB || pIter->eType==OTA_PK_NONE ){
   763    765       zList = otaMPrintf(p, "_rowid_ = ?%d", pIter->nTblCol+1);
          766  +  }else if( pIter->eType==OTA_PK_EXTERNAL ){
          767  +    const char *zSep = "";
          768  +    int i;
          769  +    for(i=0; i<pIter->nTblCol; i++){
          770  +      if( pIter->abTblPk[i] ){
          771  +        zList = otaMPrintf(p, "%z%sc%d=?%d", zList, zSep, i, i+1);
          772  +        zSep = " AND ";
          773  +      }
          774  +    }
          775  +    zList = otaMPrintf(p, 
          776  +        "_rowid_ = (SELECT id FROM ota_imposter2 WHERE %z)", zList
          777  +    );
          778  +
   764    779     }else{
   765    780       const char *zSep = "";
   766    781       int i;
   767    782       for(i=0; i<pIter->nTblCol; i++){
   768    783         if( pIter->abTblPk[i] ){
   769    784           const char *zCol = pIter->azTblCol[i];
   770    785           zList = otaMPrintf(p, "%z%s\"%w\"=?%d", zList, zSep, zCol, i+1);
................................................................................
   874    889       z = otaMPrintf(p, "%z)", z);
   875    890   
   876    891       rc = sqlite3_finalize(pXInfo);
   877    892       if( p->rc==SQLITE_OK ) p->rc = rc;
   878    893     }
   879    894     return z;
   880    895   }
          896  +
          897  +static void otaCreateImposterTable2(sqlite3ota *p, OtaObjIter *pIter){
          898  +  if( p->rc==SQLITE_OK && pIter->eType==OTA_PK_EXTERNAL ){
          899  +    int tnum = pIter->iPkTnum;    /* Root page of PK index */
          900  +    sqlite3_stmt *pQuery = 0;     /* SELECT name ... WHERE rootpage = $tnum */
          901  +    const char *zIdx = 0;         /* Name of PK index */
          902  +    sqlite3_stmt *pXInfo = 0;     /* PRAGMA main.index_xinfo = $zIdx */
          903  +    int rc;
          904  +
          905  +    const char *zComma = "";
          906  +
          907  +    char *zCols = 0;              /* Used to build up list of table cols */
          908  +    char *zPk = 0;                /* Used to build up table PK declaration */
          909  +    char *zSql = 0;               /* CREATE TABLE statement */
          910  +
          911  +    /* Figure out the name of the primary key index for the current table.
          912  +    ** This is needed for the argument to "PRAGMA index_xinfo". Set
          913  +    ** zIdx to point to a nul-terminated string containing this name. */
          914  +    p->rc = prepareAndCollectError(p->db, &pQuery, &p->zErrmsg, 
          915  +        "SELECT name FROM sqlite_master WHERE rootpage = ?"
          916  +    );
          917  +    if( p->rc==SQLITE_OK ){
          918  +      sqlite3_bind_int(pQuery, 1, tnum);
          919  +      if( SQLITE_ROW==sqlite3_step(pQuery) ){
          920  +        zIdx = (const char*)sqlite3_column_text(pQuery, 0);
          921  +      }
          922  +      if( zIdx==0 ){
          923  +        p->rc = SQLITE_CORRUPT;
          924  +      }
          925  +    }
          926  +    assert( (zIdx==0)==(p->rc!=SQLITE_OK) );
          927  +
          928  +    if( p->rc==SQLITE_OK ){
          929  +      p->rc = prepareFreeAndCollectError(p->db, &pXInfo, &p->zErrmsg,
          930  +          sqlite3_mprintf("PRAGMA main.index_xinfo = %Q", zIdx)
          931  +      );
          932  +    }
          933  +    sqlite3_finalize(pQuery);
          934  +
          935  +    while( p->rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pXInfo) ){
          936  +      int bKey = sqlite3_column_int(pXInfo, 5);
          937  +      if( bKey ){
          938  +        int iCid = sqlite3_column_int(pXInfo, 1);
          939  +        int bDesc = sqlite3_column_int(pXInfo, 3);
          940  +        const char *zCollate = (const char*)sqlite3_column_text(pXInfo, 4);
          941  +        zCols = otaMPrintf(p, "%z%sc%d %s COLLATE %s", zCols, zComma, 
          942  +            iCid, pIter->azTblType[iCid], zCollate
          943  +        );
          944  +        zPk = otaMPrintf(p, "%z%sc%d%s", zPk, zComma, iCid, bDesc?" DESC":"");
          945  +        zComma = ", ";
          946  +      }
          947  +    }
          948  +    zCols = otaMPrintf(p, "%z, id INTEGER", zCols);
          949  +    rc = sqlite3_finalize(pXInfo);
          950  +    if( p->rc==SQLITE_OK ) p->rc = rc;
          951  +
          952  +    zSql = otaMPrintf(p, 
          953  +        "CREATE TABLE ota_imposter2(%z, PRIMARY KEY(%z)) WITHOUT ROWID", 
          954  +        zCols, zPk
          955  +    );
          956  +    assert( (zSql==0)==(p->rc!=SQLITE_OK) );
          957  +    if( zSql ){
          958  +      sqlite3_test_control(SQLITE_TESTCTRL_IMPOSTER, p->db, "main", 1, tnum);
          959  +      p->rc = sqlite3_exec(p->db, zSql, 0, 0, &p->zErrmsg);
          960  +      sqlite3_test_control(SQLITE_TESTCTRL_IMPOSTER, p->db, "main", 0, 0);
          961  +    }
          962  +    sqlite3_free(zSql);
          963  +  }
          964  +}
   881    965   
   882    966   /*
   883    967   ** If an error has already occurred when this function is called, it 
   884    968   ** immediately returns zero (without doing any work). Or, if an error
   885    969   ** occurs during the execution of this function, it sets the error code
   886    970   ** in the sqlite3ota object indicated by the first argument and returns
   887    971   ** zero.
................................................................................
   901    985   **     precisely, the "same schema" means the same columns, types, collation
   902    986   **     sequences and primary key declaration.
   903    987   **
   904    988   **   OTA_PK_VTAB:
   905    989   **     No imposters required. 
   906    990   **
   907    991   **   OTA_PK_EXTERNAL:
   908         -**     Two imposters are required (TODO!!)
          992  +**     Two imposters are required. The first has the same schema as the
          993  +**     target database table, with no PRIMARY KEY or UNIQUE clauses. The
          994  +**     second is used to access the PK b-tree index on disk.
   909    995   */
   910    996   static void otaCreateImposterTable(sqlite3ota *p, OtaObjIter *pIter){
   911    997     if( p->rc==SQLITE_OK && pIter->eType!=OTA_PK_VTAB ){
   912         -    int tnum = pIter->tnum;
          998  +    int tnum = pIter->iTnum;
   913    999       const char *zComma = "";
   914   1000       char *zSql = 0;
   915   1001       int iCol;
   916   1002       sqlite3_test_control(SQLITE_TESTCTRL_IMPOSTER, p->db, "main", 0, 1);
   917   1003   
   918   1004       for(iCol=0; p->rc==SQLITE_OK && iCol<pIter->nTblCol; iCol++){
   919   1005         const char *zPk = "";
................................................................................
   962   1048   static int otaObjIterPrepareAll(
   963   1049     sqlite3ota *p, 
   964   1050     OtaObjIter *pIter,
   965   1051     int nOffset                     /* Add "LIMIT -1 OFFSET $nOffset" to SELECT */
   966   1052   ){
   967   1053     assert( pIter->bCleanup==0 );
   968   1054     if( pIter->pSelect==0 && otaObjIterCacheTableInfo(p, pIter)==SQLITE_OK ){
   969         -    const int tnum = pIter->tnum;
         1055  +    const int tnum = pIter->iTnum;
   970   1056       char *zCollist = 0;           /* List of indexed columns */
   971   1057       char **pz = &p->zErrmsg;
   972   1058       const char *zIdx = pIter->zIdx;
   973   1059       char *zLimit = 0;
   974   1060   
   975   1061       if( nOffset ){
   976   1062         zLimit = sqlite3_mprintf(" LIMIT -1 OFFSET %d", nOffset);
................................................................................
  1063   1149                 zCollist, (bOtaRowid ? ", ota_rowid" : ""), zTbl, zLimit
  1064   1150               )
  1065   1151           );
  1066   1152         }
  1067   1153   
  1068   1154         /* Create the imposter table or tables (if required). */
  1069   1155         otaCreateImposterTable(p, pIter);
         1156  +      otaCreateImposterTable2(p, pIter);
  1070   1157         zWrite = (pIter->eType==OTA_PK_VTAB ? zTbl : "ota_imposter");
  1071   1158   
  1072   1159         /* Create the INSERT statement to write to the target PK b-tree */
  1073   1160         if( p->rc==SQLITE_OK ){
  1074   1161           p->rc = prepareFreeAndCollectError(p->db, &pIter->pInsert, pz,
  1075   1162               sqlite3_mprintf(
  1076   1163                 "INSERT INTO main.%Q(%s%s) VALUES(%s)", 
................................................................................
  1937   2024   
  1938   2025       case 2: /* create_ota_delta */ {
  1939   2026         sqlite3 *db = sqlite3ota_db(pOta);
  1940   2027         int rc = sqlite3_create_function(
  1941   2028             db, "ota_delta", -1, SQLITE_UTF8, (void*)interp, test_ota_delta, 0, 0
  1942   2029         );
  1943   2030         Tcl_SetObjResult(interp, Tcl_NewStringObj(sqlite3ErrName(rc), -1));
  1944         -      sqlite3_exec(db, "PRAGMA vdbe_trace = 1", 0, 0, 0);
  1945   2031         ret = (rc==SQLITE_OK ? TCL_OK : TCL_ERROR);
  1946   2032         break;
  1947   2033       }
  1948   2034   
  1949   2035       default: /* seems unlikely */
  1950   2036         assert( !"cannot happen" );
  1951   2037         break;

Changes to src/main.c.

  3651   3651         if( db->init.busy==0 && db->init.newTnum>0 ){
  3652   3652           sqlite3ResetAllSchemasOfConnection(db);
  3653   3653         }
  3654   3654         sqlite3_mutex_leave(db->mutex);
  3655   3655         break;
  3656   3656       }
  3657   3657   
  3658         -    /* sqlite3_test_control(SQLITE_TESTCTRL_TBLTYPE, db, dbName, zTbl, peType)
         3658  +    /* sqlite3_test_control(TESTCTRL_TBLTYPE, db, dbName, zTbl, peType, piPk)
  3659   3659       **
  3660   3660       **   peType is of type (int*), a pointer to an output parameter of type
  3661   3661       **   (int). This call sets the output parameter as follows, depending
  3662   3662       **   on the type of the table specified by parameters dbName and zTbl.
  3663   3663       **
  3664   3664       **     0: No such table.
  3665   3665       **     1: Table has an implicit rowid.
  3666   3666       **     2: Table has an explicit IPK column.
  3667   3667       **     3: Table has an external PK index.
  3668   3668       **     4: Table is WITHOUT ROWID.
  3669   3669       **     5: Table is a virtual table.
         3670  +    **
         3671  +    **   Argument *piPk is also of type (int*), and also points to an output
         3672  +    **   parameter. Unless the table has an external primary key index 
         3673  +    **   (i.e. unless *peType is set to 3), then *piPk is set to zero. Or,
         3674  +    **   if the table does have an external primary key index, then *piPk
         3675  +    **   is set to the root page number of the primary key index before
         3676  +    **   returning.
  3670   3677       */
  3671   3678       case SQLITE_TESTCTRL_TBLTYPE: {
  3672   3679         sqlite3 *db = va_arg(ap, sqlite3*);
  3673   3680         const char *zDb = va_arg(ap, const char*);
  3674   3681         const char *zTab = va_arg(ap, const char*);
  3675   3682         int *peType = va_arg(ap, int*);
         3683  +      int *piPk = va_arg(ap, int*);
  3676   3684         Table *pTab;
         3685  +      *piPk = 0;
  3677   3686         sqlite3_mutex_enter(db->mutex);
  3678   3687         sqlite3BtreeEnterAll(db);
  3679   3688         pTab = sqlite3FindTable(db, zTab, zDb);
  3680   3689         if( pTab==0 ){
  3681   3690           *peType = 0;
  3682   3691         }else if( IsVirtual(pTab) ){
  3683   3692           *peType = 5;
  3684   3693         }else if( HasRowid(pTab)==0 ){
  3685   3694           *peType = 4;
  3686   3695         }else if( pTab->iPKey>=0 ){
  3687   3696           *peType = 2;
  3688   3697         }else{
  3689   3698           Index *pPk = sqlite3PrimaryKeyIndex(pTab);
  3690         -        *peType = (pPk ? 3 : 1);
         3699  +        if( pPk ){
         3700  +          *peType = 3;
         3701  +          *piPk = pPk->tnum;
         3702  +        }else{
         3703  +          *peType = 1;
         3704  +        }
  3691   3705         }
  3692   3706         sqlite3BtreeLeaveAll(db);
  3693   3707         sqlite3_mutex_leave(db->mutex);
  3694   3708         break;
  3695   3709       }
  3696   3710     }
  3697   3711     va_end(ap);