/ Check-in [1db198cc]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Avoid a linear scan of the entire table when ota updates or deletes a row from a table with an external primary key index.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | ota-update
Files: files | file ages | folders
SHA1: 1db198ccca1e5c5a922cefe3daeff8d2e5d3a7f7
User & Date: dan 2015-02-04 16:32:47
Context
2015-02-04
19:20
Fix the error messages returned by ota if an update violates a unique constraint. check-in: c4845a3b user: dan tags: ota-update
16:32
Avoid a linear scan of the entire table when ota updates or deletes a row from a table with an external primary key index. check-in: 1db198cc user: dan tags: ota-update
11:08
Fix a memory leak that could follow an OOM error in ota. check-in: 0d5415f2 user: dan tags: ota-update
Changes
Hide Diffs Unified Diffs Show Whitespace Changes Patch

Changes to ext/ota/sqlite3ota.c.

114
115
116
117
118
119
120
121

122
123
124
125
126
127
128
...
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
...
466
467
468
469
470
471
472
473

474
475
476
477
478
479
480
...
757
758
759
760
761
762
763













764
765
766
767
768
769
770
...
874
875
876
877
878
879
880





































































881
882
883
884
885
886
887
...
901
902
903
904
905
906
907
908


909
910
911
912
913
914
915
916
917
918
919
...
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
....
1063
1064
1065
1066
1067
1068
1069

1070
1071
1072
1073
1074
1075
1076
....
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
  unsigned char *abTblPk;         /* Array of flags, set on target PK columns */
  int eType;                      /* Table type - an OTA_PK_XXX value */

  /* Output variables. zTbl==0 implies EOF. */
  int bCleanup;                   /* True in "cleanup" state */
  const char *zTbl;               /* Name of target db table */
  const char *zIdx;               /* Name of target db index (or null) */
  int tnum;                       /* Root page of index (not table) */

  int bUnique;                    /* Current index is unique */
  int iVisit;                     /* Number of points visited, incl. current */

  /* Statements created by otaObjIterPrepareAll() */
  int nCol;                       /* Number of columns in current object */
  sqlite3_stmt *pSelect;          /* Source data */
  sqlite3_stmt *pInsert;          /* Statement for INSERT operations */
................................................................................
        pIter->bCleanup = 0;
        rc = sqlite3_step(pIter->pTblIter);
        if( rc!=SQLITE_ROW ){
          rc = sqlite3_reset(pIter->pTblIter);
          pIter->zTbl = 0;
        }else{
          pIter->zTbl = (const char*)sqlite3_column_text(pIter->pTblIter, 0);
          pIter->tnum = sqlite3_column_int(pIter->pTblIter, 1);
          rc = SQLITE_OK;
        }
      }else{
        if( pIter->zIdx==0 ){
          sqlite3_bind_text(pIter->pIdxIter, 1, pIter->zTbl, -1, SQLITE_STATIC);
        }
        rc = sqlite3_step(pIter->pIdxIter);
        if( rc!=SQLITE_ROW ){
          rc = sqlite3_reset(pIter->pIdxIter);
          pIter->bCleanup = 1;
          pIter->zIdx = 0;
        }else{
          pIter->zIdx = (const char*)sqlite3_column_text(pIter->pIdxIter, 0);
          pIter->tnum = sqlite3_column_int(pIter->pIdxIter, 1);
          pIter->bUnique = sqlite3_column_int(pIter->pIdxIter, 2);
          rc = SQLITE_OK;
        }
      }
    }
  }

................................................................................
    int rc2;                      /* sqlite3_finalize() return value */
    int bOtaRowid = 0;            /* If input table has column "ota_rowid" */
    int iOrder = 0;

    /* Figure out the type of table this step will deal with. */
    assert( pIter->eType==0 );
    sqlite3_test_control(
        SQLITE_TESTCTRL_TBLTYPE, p->db, "main", pIter->zTbl, &pIter->eType

    );
    assert( pIter->eType==OTA_PK_NONE || pIter->eType==OTA_PK_IPK 
         || pIter->eType==OTA_PK_EXTERNAL || pIter->eType==OTA_PK_WITHOUT_ROWID
         || pIter->eType==OTA_PK_VTAB
    );

    /* Populate the azTblCol[] and nTblCol variables based on the columns
................................................................................
static char *otaObjIterGetWhere(
  sqlite3ota *p, 
  OtaObjIter *pIter
){
  char *zList = 0;
  if( pIter->eType==OTA_PK_VTAB || pIter->eType==OTA_PK_NONE ){
    zList = otaMPrintf(p, "_rowid_ = ?%d", pIter->nTblCol+1);













  }else{
    const char *zSep = "";
    int i;
    for(i=0; i<pIter->nTblCol; i++){
      if( pIter->abTblPk[i] ){
        const char *zCol = pIter->azTblCol[i];
        zList = otaMPrintf(p, "%z%s\"%w\"=?%d", zList, zSep, zCol, i+1);
................................................................................
    z = otaMPrintf(p, "%z)", z);

    rc = sqlite3_finalize(pXInfo);
    if( p->rc==SQLITE_OK ) p->rc = rc;
  }
  return z;
}






































































/*
** If an error has already occurred when this function is called, it 
** immediately returns zero (without doing any work). Or, if an error
** occurs during the execution of this function, it sets the error code
** in the sqlite3ota object indicated by the first argument and returns
** zero.
................................................................................
**     precisely, the "same schema" means the same columns, types, collation
**     sequences and primary key declaration.
**
**   OTA_PK_VTAB:
**     No imposters required. 
**
**   OTA_PK_EXTERNAL:
**     Two imposters are required (TODO!!)


*/
static void otaCreateImposterTable(sqlite3ota *p, OtaObjIter *pIter){
  if( p->rc==SQLITE_OK && pIter->eType!=OTA_PK_VTAB ){
    int tnum = pIter->tnum;
    const char *zComma = "";
    char *zSql = 0;
    int iCol;
    sqlite3_test_control(SQLITE_TESTCTRL_IMPOSTER, p->db, "main", 0, 1);

    for(iCol=0; p->rc==SQLITE_OK && iCol<pIter->nTblCol; iCol++){
      const char *zPk = "";
................................................................................
static int otaObjIterPrepareAll(
  sqlite3ota *p, 
  OtaObjIter *pIter,
  int nOffset                     /* Add "LIMIT -1 OFFSET $nOffset" to SELECT */
){
  assert( pIter->bCleanup==0 );
  if( pIter->pSelect==0 && otaObjIterCacheTableInfo(p, pIter)==SQLITE_OK ){
    const int tnum = pIter->tnum;
    char *zCollist = 0;           /* List of indexed columns */
    char **pz = &p->zErrmsg;
    const char *zIdx = pIter->zIdx;
    char *zLimit = 0;

    if( nOffset ){
      zLimit = sqlite3_mprintf(" LIMIT -1 OFFSET %d", nOffset);
................................................................................
              zCollist, (bOtaRowid ? ", ota_rowid" : ""), zTbl, zLimit
            )
        );
      }

      /* Create the imposter table or tables (if required). */
      otaCreateImposterTable(p, pIter);

      zWrite = (pIter->eType==OTA_PK_VTAB ? zTbl : "ota_imposter");

      /* Create the INSERT statement to write to the target PK b-tree */
      if( p->rc==SQLITE_OK ){
        p->rc = prepareFreeAndCollectError(p->db, &pIter->pInsert, pz,
            sqlite3_mprintf(
              "INSERT INTO main.%Q(%s%s) VALUES(%s)", 
................................................................................

    case 2: /* create_ota_delta */ {
      sqlite3 *db = sqlite3ota_db(pOta);
      int rc = sqlite3_create_function(
          db, "ota_delta", -1, SQLITE_UTF8, (void*)interp, test_ota_delta, 0, 0
      );
      Tcl_SetObjResult(interp, Tcl_NewStringObj(sqlite3ErrName(rc), -1));
      sqlite3_exec(db, "PRAGMA vdbe_trace = 1", 0, 0, 0);
      ret = (rc==SQLITE_OK ? TCL_OK : TCL_ERROR);
      break;
    }

    default: /* seems unlikely */
      assert( !"cannot happen" );
      break;







|
>







 







|













|







 







|
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







 







|
>
>



|







 







|







 







>







 







<







114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
...
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
...
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
...
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
...
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
...
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
....
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
....
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
....
2024
2025
2026
2027
2028
2029
2030

2031
2032
2033
2034
2035
2036
2037
  unsigned char *abTblPk;         /* Array of flags, set on target PK columns */
  int eType;                      /* Table type - an OTA_PK_XXX value */

  /* Output variables. zTbl==0 implies EOF. */
  int bCleanup;                   /* True in "cleanup" state */
  const char *zTbl;               /* Name of target db table */
  const char *zIdx;               /* Name of target db index (or null) */
  int iTnum;                      /* Root page of current object */
  int iPkTnum;                    /* If eType==EXTERNAL, root of PK index */
  int bUnique;                    /* Current index is unique */
  int iVisit;                     /* Number of points visited, incl. current */

  /* Statements created by otaObjIterPrepareAll() */
  int nCol;                       /* Number of columns in current object */
  sqlite3_stmt *pSelect;          /* Source data */
  sqlite3_stmt *pInsert;          /* Statement for INSERT operations */
................................................................................
        pIter->bCleanup = 0;
        rc = sqlite3_step(pIter->pTblIter);
        if( rc!=SQLITE_ROW ){
          rc = sqlite3_reset(pIter->pTblIter);
          pIter->zTbl = 0;
        }else{
          pIter->zTbl = (const char*)sqlite3_column_text(pIter->pTblIter, 0);
          pIter->iTnum = sqlite3_column_int(pIter->pTblIter, 1);
          rc = SQLITE_OK;
        }
      }else{
        if( pIter->zIdx==0 ){
          sqlite3_bind_text(pIter->pIdxIter, 1, pIter->zTbl, -1, SQLITE_STATIC);
        }
        rc = sqlite3_step(pIter->pIdxIter);
        if( rc!=SQLITE_ROW ){
          rc = sqlite3_reset(pIter->pIdxIter);
          pIter->bCleanup = 1;
          pIter->zIdx = 0;
        }else{
          pIter->zIdx = (const char*)sqlite3_column_text(pIter->pIdxIter, 0);
          pIter->iTnum = sqlite3_column_int(pIter->pIdxIter, 1);
          pIter->bUnique = sqlite3_column_int(pIter->pIdxIter, 2);
          rc = SQLITE_OK;
        }
      }
    }
  }

................................................................................
    int rc2;                      /* sqlite3_finalize() return value */
    int bOtaRowid = 0;            /* If input table has column "ota_rowid" */
    int iOrder = 0;

    /* Figure out the type of table this step will deal with. */
    assert( pIter->eType==0 );
    sqlite3_test_control(
        SQLITE_TESTCTRL_TBLTYPE, p->db, "main", pIter->zTbl, &pIter->eType,
        &pIter->iPkTnum
    );
    assert( pIter->eType==OTA_PK_NONE || pIter->eType==OTA_PK_IPK 
         || pIter->eType==OTA_PK_EXTERNAL || pIter->eType==OTA_PK_WITHOUT_ROWID
         || pIter->eType==OTA_PK_VTAB
    );

    /* Populate the azTblCol[] and nTblCol variables based on the columns
................................................................................
static char *otaObjIterGetWhere(
  sqlite3ota *p, 
  OtaObjIter *pIter
){
  char *zList = 0;
  if( pIter->eType==OTA_PK_VTAB || pIter->eType==OTA_PK_NONE ){
    zList = otaMPrintf(p, "_rowid_ = ?%d", pIter->nTblCol+1);
  }else if( pIter->eType==OTA_PK_EXTERNAL ){
    const char *zSep = "";
    int i;
    for(i=0; i<pIter->nTblCol; i++){
      if( pIter->abTblPk[i] ){
        zList = otaMPrintf(p, "%z%sc%d=?%d", zList, zSep, i, i+1);
        zSep = " AND ";
      }
    }
    zList = otaMPrintf(p, 
        "_rowid_ = (SELECT id FROM ota_imposter2 WHERE %z)", zList
    );

  }else{
    const char *zSep = "";
    int i;
    for(i=0; i<pIter->nTblCol; i++){
      if( pIter->abTblPk[i] ){
        const char *zCol = pIter->azTblCol[i];
        zList = otaMPrintf(p, "%z%s\"%w\"=?%d", zList, zSep, zCol, i+1);
................................................................................
    z = otaMPrintf(p, "%z)", z);

    rc = sqlite3_finalize(pXInfo);
    if( p->rc==SQLITE_OK ) p->rc = rc;
  }
  return z;
}

static void otaCreateImposterTable2(sqlite3ota *p, OtaObjIter *pIter){
  if( p->rc==SQLITE_OK && pIter->eType==OTA_PK_EXTERNAL ){
    int tnum = pIter->iPkTnum;    /* Root page of PK index */
    sqlite3_stmt *pQuery = 0;     /* SELECT name ... WHERE rootpage = $tnum */
    const char *zIdx = 0;         /* Name of PK index */
    sqlite3_stmt *pXInfo = 0;     /* PRAGMA main.index_xinfo = $zIdx */
    int rc;

    const char *zComma = "";

    char *zCols = 0;              /* Used to build up list of table cols */
    char *zPk = 0;                /* Used to build up table PK declaration */
    char *zSql = 0;               /* CREATE TABLE statement */

    /* Figure out the name of the primary key index for the current table.
    ** This is needed for the argument to "PRAGMA index_xinfo". Set
    ** zIdx to point to a nul-terminated string containing this name. */
    p->rc = prepareAndCollectError(p->db, &pQuery, &p->zErrmsg, 
        "SELECT name FROM sqlite_master WHERE rootpage = ?"
    );
    if( p->rc==SQLITE_OK ){
      sqlite3_bind_int(pQuery, 1, tnum);
      if( SQLITE_ROW==sqlite3_step(pQuery) ){
        zIdx = (const char*)sqlite3_column_text(pQuery, 0);
      }
      if( zIdx==0 ){
        p->rc = SQLITE_CORRUPT;
      }
    }
    assert( (zIdx==0)==(p->rc!=SQLITE_OK) );

    if( p->rc==SQLITE_OK ){
      p->rc = prepareFreeAndCollectError(p->db, &pXInfo, &p->zErrmsg,
          sqlite3_mprintf("PRAGMA main.index_xinfo = %Q", zIdx)
      );
    }
    sqlite3_finalize(pQuery);

    while( p->rc==SQLITE_OK && SQLITE_ROW==sqlite3_step(pXInfo) ){
      int bKey = sqlite3_column_int(pXInfo, 5);
      if( bKey ){
        int iCid = sqlite3_column_int(pXInfo, 1);
        int bDesc = sqlite3_column_int(pXInfo, 3);
        const char *zCollate = (const char*)sqlite3_column_text(pXInfo, 4);
        zCols = otaMPrintf(p, "%z%sc%d %s COLLATE %s", zCols, zComma, 
            iCid, pIter->azTblType[iCid], zCollate
        );
        zPk = otaMPrintf(p, "%z%sc%d%s", zPk, zComma, iCid, bDesc?" DESC":"");
        zComma = ", ";
      }
    }
    zCols = otaMPrintf(p, "%z, id INTEGER", zCols);
    rc = sqlite3_finalize(pXInfo);
    if( p->rc==SQLITE_OK ) p->rc = rc;

    zSql = otaMPrintf(p, 
        "CREATE TABLE ota_imposter2(%z, PRIMARY KEY(%z)) WITHOUT ROWID", 
        zCols, zPk
    );
    assert( (zSql==0)==(p->rc!=SQLITE_OK) );
    if( zSql ){
      sqlite3_test_control(SQLITE_TESTCTRL_IMPOSTER, p->db, "main", 1, tnum);
      p->rc = sqlite3_exec(p->db, zSql, 0, 0, &p->zErrmsg);
      sqlite3_test_control(SQLITE_TESTCTRL_IMPOSTER, p->db, "main", 0, 0);
    }
    sqlite3_free(zSql);
  }
}

/*
** If an error has already occurred when this function is called, it 
** immediately returns zero (without doing any work). Or, if an error
** occurs during the execution of this function, it sets the error code
** in the sqlite3ota object indicated by the first argument and returns
** zero.
................................................................................
**     precisely, the "same schema" means the same columns, types, collation
**     sequences and primary key declaration.
**
**   OTA_PK_VTAB:
**     No imposters required. 
**
**   OTA_PK_EXTERNAL:
**     Two imposters are required. The first has the same schema as the
**     target database table, with no PRIMARY KEY or UNIQUE clauses. The
**     second is used to access the PK b-tree index on disk.
*/
static void otaCreateImposterTable(sqlite3ota *p, OtaObjIter *pIter){
  if( p->rc==SQLITE_OK && pIter->eType!=OTA_PK_VTAB ){
    int tnum = pIter->iTnum;
    const char *zComma = "";
    char *zSql = 0;
    int iCol;
    sqlite3_test_control(SQLITE_TESTCTRL_IMPOSTER, p->db, "main", 0, 1);

    for(iCol=0; p->rc==SQLITE_OK && iCol<pIter->nTblCol; iCol++){
      const char *zPk = "";
................................................................................
static int otaObjIterPrepareAll(
  sqlite3ota *p, 
  OtaObjIter *pIter,
  int nOffset                     /* Add "LIMIT -1 OFFSET $nOffset" to SELECT */
){
  assert( pIter->bCleanup==0 );
  if( pIter->pSelect==0 && otaObjIterCacheTableInfo(p, pIter)==SQLITE_OK ){
    const int tnum = pIter->iTnum;
    char *zCollist = 0;           /* List of indexed columns */
    char **pz = &p->zErrmsg;
    const char *zIdx = pIter->zIdx;
    char *zLimit = 0;

    if( nOffset ){
      zLimit = sqlite3_mprintf(" LIMIT -1 OFFSET %d", nOffset);
................................................................................
              zCollist, (bOtaRowid ? ", ota_rowid" : ""), zTbl, zLimit
            )
        );
      }

      /* Create the imposter table or tables (if required). */
      otaCreateImposterTable(p, pIter);
      otaCreateImposterTable2(p, pIter);
      zWrite = (pIter->eType==OTA_PK_VTAB ? zTbl : "ota_imposter");

      /* Create the INSERT statement to write to the target PK b-tree */
      if( p->rc==SQLITE_OK ){
        p->rc = prepareFreeAndCollectError(p->db, &pIter->pInsert, pz,
            sqlite3_mprintf(
              "INSERT INTO main.%Q(%s%s) VALUES(%s)", 
................................................................................

    case 2: /* create_ota_delta */ {
      sqlite3 *db = sqlite3ota_db(pOta);
      int rc = sqlite3_create_function(
          db, "ota_delta", -1, SQLITE_UTF8, (void*)interp, test_ota_delta, 0, 0
      );
      Tcl_SetObjResult(interp, Tcl_NewStringObj(sqlite3ErrName(rc), -1));

      ret = (rc==SQLITE_OK ? TCL_OK : TCL_ERROR);
      break;
    }

    default: /* seems unlikely */
      assert( !"cannot happen" );
      break;

Changes to src/main.c.

3651
3652
3653
3654
3655
3656
3657
3658
3659
3660
3661
3662
3663
3664
3665
3666
3667
3668
3669







3670
3671
3672
3673
3674
3675

3676

3677
3678
3679
3680
3681
3682
3683
3684
3685
3686
3687
3688
3689

3690




3691
3692
3693
3694
3695
3696
3697
      if( db->init.busy==0 && db->init.newTnum>0 ){
        sqlite3ResetAllSchemasOfConnection(db);
      }
      sqlite3_mutex_leave(db->mutex);
      break;
    }

    /* sqlite3_test_control(SQLITE_TESTCTRL_TBLTYPE, db, dbName, zTbl, peType)
    **
    **   peType is of type (int*), a pointer to an output parameter of type
    **   (int). This call sets the output parameter as follows, depending
    **   on the type of the table specified by parameters dbName and zTbl.
    **
    **     0: No such table.
    **     1: Table has an implicit rowid.
    **     2: Table has an explicit IPK column.
    **     3: Table has an external PK index.
    **     4: Table is WITHOUT ROWID.
    **     5: Table is a virtual table.







    */
    case SQLITE_TESTCTRL_TBLTYPE: {
      sqlite3 *db = va_arg(ap, sqlite3*);
      const char *zDb = va_arg(ap, const char*);
      const char *zTab = va_arg(ap, const char*);
      int *peType = va_arg(ap, int*);

      Table *pTab;

      sqlite3_mutex_enter(db->mutex);
      sqlite3BtreeEnterAll(db);
      pTab = sqlite3FindTable(db, zTab, zDb);
      if( pTab==0 ){
        *peType = 0;
      }else if( IsVirtual(pTab) ){
        *peType = 5;
      }else if( HasRowid(pTab)==0 ){
        *peType = 4;
      }else if( pTab->iPKey>=0 ){
        *peType = 2;
      }else{
        Index *pPk = sqlite3PrimaryKeyIndex(pTab);

        *peType = (pPk ? 3 : 1);




      }
      sqlite3BtreeLeaveAll(db);
      sqlite3_mutex_leave(db->mutex);
      break;
    }
  }
  va_end(ap);







|











>
>
>
>
>
>
>






>

>













>
|
>
>
>
>







3651
3652
3653
3654
3655
3656
3657
3658
3659
3660
3661
3662
3663
3664
3665
3666
3667
3668
3669
3670
3671
3672
3673
3674
3675
3676
3677
3678
3679
3680
3681
3682
3683
3684
3685
3686
3687
3688
3689
3690
3691
3692
3693
3694
3695
3696
3697
3698
3699
3700
3701
3702
3703
3704
3705
3706
3707
3708
3709
3710
3711
      if( db->init.busy==0 && db->init.newTnum>0 ){
        sqlite3ResetAllSchemasOfConnection(db);
      }
      sqlite3_mutex_leave(db->mutex);
      break;
    }

    /* sqlite3_test_control(TESTCTRL_TBLTYPE, db, dbName, zTbl, peType, piPk)
    **
    **   peType is of type (int*), a pointer to an output parameter of type
    **   (int). This call sets the output parameter as follows, depending
    **   on the type of the table specified by parameters dbName and zTbl.
    **
    **     0: No such table.
    **     1: Table has an implicit rowid.
    **     2: Table has an explicit IPK column.
    **     3: Table has an external PK index.
    **     4: Table is WITHOUT ROWID.
    **     5: Table is a virtual table.
    **
    **   Argument *piPk is also of type (int*), and also points to an output
    **   parameter. Unless the table has an external primary key index 
    **   (i.e. unless *peType is set to 3), then *piPk is set to zero. Or,
    **   if the table does have an external primary key index, then *piPk
    **   is set to the root page number of the primary key index before
    **   returning.
    */
    case SQLITE_TESTCTRL_TBLTYPE: {
      sqlite3 *db = va_arg(ap, sqlite3*);
      const char *zDb = va_arg(ap, const char*);
      const char *zTab = va_arg(ap, const char*);
      int *peType = va_arg(ap, int*);
      int *piPk = va_arg(ap, int*);
      Table *pTab;
      *piPk = 0;
      sqlite3_mutex_enter(db->mutex);
      sqlite3BtreeEnterAll(db);
      pTab = sqlite3FindTable(db, zTab, zDb);
      if( pTab==0 ){
        *peType = 0;
      }else if( IsVirtual(pTab) ){
        *peType = 5;
      }else if( HasRowid(pTab)==0 ){
        *peType = 4;
      }else if( pTab->iPKey>=0 ){
        *peType = 2;
      }else{
        Index *pPk = sqlite3PrimaryKeyIndex(pTab);
        if( pPk ){
          *peType = 3;
          *piPk = pPk->tnum;
        }else{
          *peType = 1;
        }
      }
      sqlite3BtreeLeaveAll(db);
      sqlite3_mutex_leave(db->mutex);
      break;
    }
  }
  va_end(ap);