SQLite4
Check-in [f10991c423]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix an issue with using a bt cursor after sqlite4BtCsrDelete() has been called on it.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: f10991c423496070e1b49ad805a7afe152362f41
User & Date: dan 2013-11-13 15:21:46
Context
2013-11-13
16:34
Fix a problem in the VM code generated for a CREATE INDEX statement. check-in: e42f8bb1c2 user: dan tags: trunk
15:21
Fix an issue with using a bt cursor after sqlite4BtCsrDelete() has been called on it. check-in: f10991c423 user: dan tags: trunk
2013-11-11
20:27
Add the BT_CONTROL_MULTIPROC option. check-in: 7343be21c9 user: dan tags: trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/bt_main.c.

33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53




54
55
56
57
58
59
60
...
186
187
188
189
190
191
192

193
194
195
196
197
198
199
...
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223











224
225
226
227
228
229
230
...
476
477
478
479
480
481
482




483
484
485
486
487
488
489
490
491
492



493
494
495
496
497
498
499
...
538
539
540
541
542
543
544














545
546
547
548
549

550
551

552
553
554
555
556
557
558

559

560
561
562
563
564
565
566
...
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
...
633
634
635
636
637
638
639






















640
641
642
643
644
645
646
647
648
649







650
651
652
653
654
655
656
...
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
...
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887





888
889
890

891
892
893
894
895
896
897
...
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
....
1036
1037
1038
1039
1040
1041
1042
1043
1044

1045
1046
1047
1048
1049
1050
1051
1052
....
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
....
2332
2333
2334
2335
2336
2337
2338





















2339
2340


2341

2342
2343
2344
2345
2346
2347


2348
2349
2350
2351
2352
2353
2354
  bt_cursor *pAllCsr;             /* List of all open cursors */
};

/*
** Cache used by btOverflowBuffer() to assemble a key and/or value whose
** bytes are not all available as one contiguous run inside a leaf cell.
** The key bytes (if any) come first in pBuf, followed by the value bytes.
*/
typedef struct BtOvfl BtOvfl;
struct BtOvfl {
  int nKey;                       /* Bytes of key data at the start of pBuf */
  int nVal;                       /* Bytes of value data following the key */
  u8 *pBuf;                       /* Buffer managed by btGrowBuffer() */
  int nBuf;                       /* Allocated size of pBuf in bytes */
};

/*
** Database cursor handle.
**
** apPage[0..nPg-1] hold the page references owned by the cursor and
** aiCell[i] is the cell index selected within apPage[i]. btCsrReset()
** releases every page and sets nPg back to zero.
*/
struct bt_cursor {
  bt_db *pDb;                     /* Database that owns this cursor */
  int nPg;                        /* Number of valid entries in apPage[] */
  int aiCell[BT_MAX_DEPTH];       /* Current cell of each apPage[] entry */
  BtPage *apPage[BT_MAX_DEPTH];   /* All pages from root to current leaf */
  BtOvfl ovfl;                    /* Overflow cache (see above) */
  bt_cursor *pNextCsr;            /* Next cursor opened by same db handle */




};

#ifndef btErrorBkpt
int btErrorBkpt(int rc){
  static int error_cnt = 0;
  error_cnt++;
  return rc;
................................................................................

/*
** Return the transaction level reported by the pager that belongs to
** database handle db.
*/
int sqlite4BtTransactionLevel(bt_db *db){
  return sqlite4BtPagerTransactionLevel(db->pPager);
}

/*
** Initialize the cursor structure pCsr: zero every field, then record db
** as the owning database handle. The cursor is not linked into the
** db->pAllCsr list by this function.
*/
static void btCsrSetup(bt_db *db, bt_cursor *pCsr){
  memset(pCsr, 0, sizeof(bt_cursor));

  pCsr->pDb = db;
}

int sqlite4BtCsrOpen(bt_db *db, int nExtra, bt_cursor **ppCsr){
  int rc = SQLITE4_OK;            /* Return Code */
  int nByte;                      /* Total bytes of space to allocate */
  bt_cursor *pCsr;                /* New cursor object */
................................................................................
    db->pAllCsr = pCsr;
  }

  btCheckPageRefs(db);
  return rc;
}

/*
** Release every page reference held by cursor pCsr and mark the cursor as
** holding no pages (nPg==0). If bFreeBuffer is true, the overflow cache
** buffer is freed as well.
*/
static void btCsrReset(bt_cursor *pCsr, int bFreeBuffer){
  int i;
  for(i=0; i<pCsr->nPg; i++){
    sqlite4BtPageRelease(pCsr->apPage[i]);
  }
  pCsr->nPg = 0;
  if( bFreeBuffer ){
    sqlite4_free(pCsr->pDb->pEnv, pCsr->ovfl.pBuf);
    /* Forget the freed allocation so that a later btGrowBuffer() call
    ** neither trusts the stale size nor frees the pointer a second time. */
    pCsr->ovfl.pBuf = 0;
    pCsr->ovfl.nBuf = 0;
  }
}












int sqlite4BtCsrClose(bt_cursor *pCsr){
  if( pCsr ){
    bt_db *pDb = pCsr->pDb;
    bt_cursor **pp;
    btCheckPageRefs(pDb);
    btCsrReset(pCsr, 1);
    for(pp=&pDb->pAllCsr; *pp!=pCsr; pp=&(*pp)->pNextCsr);
................................................................................
  }

  btCsrAscend(pCsr, nAscend);
  *piRes = res;
  return rc;
}





static int btCsrSeek(
  bt_cursor *pCsr, 
  const void *pK,                 /* Key to seek for */
  int nK,                         /* Size of key pK in bytes */
  int eSeek,                      /* Seek mode (a BT_SEEK_XXX constant) */
  int bUpdate
){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  u32 pgno;                       /* Page number for next page to load */
  int rc = SQLITE4_OK;            /* Return Code */




  /* Reset the cursor */
  btCsrReset(pCsr, 0);

  /* Figure out the root page number */
  assert( pCsr->nPg==0 );
  pgno = sqlite4BtPagerRootpgno(pCsr->pDb->pPager);
................................................................................
      pCsr->aiCell[pCsr->nPg-1] = iHi;
      if( bLeaf==0 ){
        pgno = btChildPgno(aData, pgsz, iHi);
      }else{
        pgno = 0;

        if( res!=0 ){














          assert( BT_SEEK_LEFAST<0 && BT_SEEK_LE<0 );
          if( eSeek<0 ){
            rc = sqlite4BtCsrPrev(pCsr);
          }else if( eSeek==BT_SEEK_EQ ){
            rc = SQLITE4_NOTFOUND;

          }else if( iHi==nCell ){
            if( bUpdate ){

              rc = SQLITE4_NOTFOUND;
            }else{
              rc = sqlite4BtCsrNext(pCsr);
            }
          }else{
            rc = SQLITE4_INEXACT;
          }

          if( rc==SQLITE4_OK ) rc = SQLITE4_INEXACT;

        }
      }
    }
  }

  return rc;
}
................................................................................
  bt_cursor *pCsr, 
  const void *pK,                 /* Key to seek for */
  int nK,                         /* Size of key pK in bytes */
  int eSeek                       /* Seek mode (a BT_SEEK_XXX constant) */
){
  int rc;
  btCheckPageRefs(pCsr->pDb);
  rc = btCsrSeek(pCsr, pK, nK, eSeek, 0);
  btCheckPageRefs(pCsr->pDb);
  return rc;
}

/*
** This function seeks the cursor as required for either sqlite4BtCsrFirst()
** (if parameter bLast is false) or sqlite4BtCsrLast() (if bLast is true).
................................................................................
/*
** Position cursor pCsr to point to the largest key in the database.
** This is btCsrEnd() with its bLast argument set; the return code is
** passed through unchanged.
*/
int sqlite4BtCsrLast(bt_cursor *pCsr){
  return btCsrEnd(pCsr, 1);
}
























/*
** This function does the work of both sqlite4BtCsrNext() (if parameter
** bNext is true) and sqlite4BtCsrPrev() (if bNext is false).
*/
static int btCsrStep(bt_cursor *pCsr, int bNext){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int rc = SQLITE4_OK;
  int bRequireDescent = 0;








  while( rc==SQLITE4_OK ){
    int iPg = pCsr->nPg-1;
    int iCell = pCsr->aiCell[iPg];

    if( bNext ){
      u8 *aData = (u8*)sqlite4BtPageData(pCsr->apPage[iPg]);
      int nCell = btCellCount(aData, pgsz);
................................................................................
/*
** Retreat to the previous entry in the tree. This is btCsrStep() with
** bNext set to zero; the return code is passed through unchanged.
*/
int sqlite4BtCsrPrev(bt_cursor *pCsr){
  return btCsrStep(pCsr, 0);
}

/*
** Ensure that the buffer described by (*ppVal, *pnVal) can hold at least
** nReq bytes. When the current allocation is too small it is replaced by
** a fresh allocation of twice nReq bytes; the previous contents are NOT
** copied across. Returns SQLITE4_OK, or SQLITE4_NOMEM (leaving the old
** buffer untouched) if the allocation fails.
*/
static int btGrowBuffer(bt_db *db, int nReq, u8 **ppVal, int *pnVal){
  int nAlloc;                     /* Size of replacement allocation */
  u8 *pFresh;                     /* Replacement allocation */

  /* Nothing to do if the existing buffer is already large enough. */
  if( nReq<=*pnVal ) return SQLITE4_OK;

  nAlloc = nReq*2;
  pFresh = sqlite4_malloc(db->pEnv, nAlloc);
  if( pFresh==0 ) return btErrorBkpt(SQLITE4_NOMEM);

  sqlite4_free(db->pEnv, *ppVal);
  *ppVal = pFresh;
  *pnVal = nAlloc;
  return SQLITE4_OK;
}

static int btOverflowArrayRead(
  bt_db *db,
  u8 *pOvfl,
  u8 *aOut,
  int nOut
){
  const int pgsz = sqlite4BtPagerPagesize(db->pPager);
................................................................................
    }
  }

  return rc;
}

/*
** Assemble the overflowing part of the current leaf cell in pCsr->ovfl.
**
** If the current cell is a type (c) leaf cell (its first varint is zero),
** the entire key followed by the entire value is loaded into the buffer,
** with ovfl.nKey and ovfl.nVal set to their respective sizes. Otherwise
** the cell is treated as a type (b) leaf cell: only the value is loaded
** and ovfl.nKey is set to zero.
**
** Parameter bVal is not referenced by the body; the value is loaded
** regardless of it.
**
** Returns SQLITE4_OK on success, or the error code from btGrowBuffer()
** or btOverflowArrayRead().
*/
static int btOverflowBuffer(bt_cursor *pCsr, int bVal){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int rc = SQLITE4_OK;
  u8 *aData;                      /* Data of the current leaf page */
  u8 *pCell;                      /* Read position within the current cell */
  int iCell = pCsr->aiCell[pCsr->nPg-1];

  int nK;                         /* First varint of the cell */
  int nReq;                       /* Total bytes of buffer space required */
  u8 *pLocal;                     /* Pointer to local data within leaf page */
  u8 *aOut;                       /* Output buffer for overflow data */
  int nLocal = 0;                 /* Bytes of data stored locally in the cell */
  int nOvfl1 = 0;                 /* First overflow size read from the cell */
  int nOvfl2 = 0;                 /* Second overflow size (type (c) only) */

  aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
  pCell = btCellFind(aData, pgsz, iCell);
  pCell += sqlite4BtVarintGet32(pCell, &nK);
  if( nK==0 ){
    /* type (c) leaf cell */
    pCell += sqlite4BtVarintGet32(pCell, &nLocal);
    pLocal = pCell;
    pCell += nLocal;
    pCell += sqlite4BtVarintGet32(pCell, &nOvfl1);
    pCell += sqlite4BtVarintGet32(pCell, &nOvfl2);

    pCsr->ovfl.nKey = nLocal + nOvfl1;
    pCsr->ovfl.nVal = nOvfl2;
  }else{
    /* type (b) leaf cell */
    pCell += nK;
    assert( pCell[0]==0x00 );
    pCell++;

    pCell += sqlite4BtVarintGet32(pCell, &nLocal);
    pLocal = pCell;
    pCell += nLocal;
    pCell += sqlite4BtVarintGet32(pCell, &nOvfl1);

    pCsr->ovfl.nKey = 0;
    pCsr->ovfl.nVal = nLocal + nOvfl1;
  }

  nReq = nLocal + nOvfl1 + nOvfl2;
  rc = btGrowBuffer(pCsr->pDb, nReq, &pCsr->ovfl.pBuf, &pCsr->ovfl.nBuf);
  if( rc!=SQLITE4_OK ) return rc;

  /* Copy in local data */
  memcpy(pCsr->ovfl.pBuf, pLocal, nLocal);

  /* Load in overflow data. pCell now points just past the size fields. */
  aOut = &pCsr->ovfl.pBuf[nLocal];
  rc = btOverflowArrayRead(pCsr->pDb, pCell, aOut, nOvfl1 + nOvfl2);





  return rc;
}


/*
** Helper function for btOverflowDelete(). 
**
** TODO: This uses recursion. Which is almost certainly not a problem 
** here, but makes some people nervous, so should probably be changed.
*/
................................................................................
  aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
  assert( btCellCount(aData, pgsz)>iCell );
  pCell = btCellFind(aData, pgsz, iCell);
  pCell += sqlite4BtVarintGet32(pCell, &nK);

  if( nK==0 ){
    /* type (c) leaf cell */
    rc = btOverflowBuffer(pCsr, 0);
    if( rc==SQLITE4_OK ){
      *ppK = pCsr->ovfl.pBuf;
      *pnK = pCsr->ovfl.nKey;
    }
  }else{
    *ppK = pCell;
    *pnK = nK;
  }

................................................................................
  pCell += sqlite4BtVarintGet32(pCell, &nK);
  if( nK>0 ){
    pCell += nK;
    pCell += sqlite4BtVarintGet32(pCell, &nV);
  }

  if( nV==0 ){
    rc = btOverflowBuffer(pCsr, 1);
    if( rc==SQLITE4_OK ){

      *ppV = &pCsr->ovfl.pBuf[pCsr->ovfl.nKey];
      *pnV = pCsr->ovfl.nVal;
    }
  }else{
    *ppV = pCell;
    *pnV = (nV-1);
  }

................................................................................
  int rc = SQLITE4_OK;
  bt_cursor csr;

  sqlite4BtDebugKV((BtLock*)db->pPager, "replace", (u8*)pK, nK, (u8*)pV, nV);

  btCheckPageRefs(db);
  btCsrSetup(db, &csr);
  rc = btCsrSeek(&csr, pK, nK, BT_SEEK_GE, 1);
  if( rc==SQLITE4_OK ){
    /* The cursor currently points to an entry with key pK/nK. This call
    ** should therefore replace that entry. So delete it and then re-seek
    ** the cursor.  */
    rc = sqlite4BtDelete(&csr);

    if( rc==SQLITE4_OK && nV>=0 ){
      rc = btCsrSeek(&csr, pK, nK, BT_SEEK_GE, 1);
      if( rc==SQLITE4_OK ) rc = btErrorBkpt(SQLITE4_CORRUPT);
    }
  }

  if( nV>=0 && (rc==SQLITE4_NOTFOUND || rc==SQLITE4_INEXACT) ){
    /* Insert the new KV pair into the current leaf. */
    KeyValue kv;
................................................................................
  }
  btCsrReset(&csr, 1);

  btCheckPageRefs(db);
  return rc;
}






















/*
** Delete the entry that cursor pCsr currently points to. The steps are
** run in order and the first failure aborts the sequence:
**
**   1. btOverflowDelete()
**   2. btDeleteFromPage()
**   3. btBalanceIfUnderfull()
**
** Returns SQLITE4_OK or the error code of the step that failed.
*/
int sqlite4BtDelete(bt_cursor *pCsr){
  int rc = btOverflowDelete(pCsr);
  if( rc!=SQLITE4_OK ) return rc;

  rc = btDeleteFromPage(pCsr, 1);
  if( rc!=SQLITE4_OK ) return rc;

  return btBalanceIfUnderfull(pCsr);
}

/*
** Pass cookie value iVal to the pager belonging to database handle db
** and return the pager's result code.
*/
int sqlite4BtSetCookie(bt_db *db, unsigned int iVal){
  return sqlite4BtPagerSetCookie(db->pPager, iVal);
}








|
<












>
>
>
>







 







>







 







|




<



>
>
>
>
>
>
>
>
>
>
>







 







>
>
>
>





|




>
>
>







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|
|
<
<
>
|
<
>
|
|
|
|
<
<
|
>
|
>







 







|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>










>
>
>
>
>
>
>







 







<
<
<
<
<
<
<
<
<
<
<







 







|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>
>
>
>
>



>







 







|

|







 







|

>
|







 







|







|







 







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


>
>
|
>






>
>







33
34
35
36
37
38
39
40

41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
...
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
...
212
213
214
215
216
217
218
219
220
221
222
223

224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
...
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
...
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582


583
584

585
586
587
588
589


590
591
592
593
594
595
596
597
598
599
600
...
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
...
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
...
772
773
774
775
776
777
778











779
780
781
782
783
784
785
...
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
....
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
....
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
....
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
....
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
  bt_cursor *pAllCsr;             /* List of all open cursors */
};

/*
** Cache filled by btCsrBuffer() with the key and value of a cursor's
** current entry. The key bytes come first in buf, followed by the value
** bytes.
*/
typedef struct BtOvfl BtOvfl;
struct BtOvfl {
  int nKey;                       /* Bytes of key data at the start of buf */
  int nVal;                       /* Bytes of value data following the key */
  sqlite4_buffer buf;             /* Buffer holding the key, then the value */

};

/*
** Database cursor handle.
**
** apPage[0..nPg-1] hold the page references owned by the cursor and
** aiCell[i] is the cell index selected within apPage[i].
*/
struct bt_cursor {
  bt_db *pDb;                     /* Database that owns this cursor */
  int nPg;                        /* Number of valid entries in apPage[] */
  int aiCell[BT_MAX_DEPTH];       /* Current cell of each apPage[] entry */
  BtPage *apPage[BT_MAX_DEPTH];   /* All pages from root to current leaf */
  BtOvfl ovfl;                    /* Overflow cache (see above) */
  bt_cursor *pNextCsr;            /* Next cursor opened by same db handle */

  /* Set by btSaveAllCursor() once the current key has been copied into
  ** ovfl. btCsrReseek() then seeks back to that key before the cursor is
  ** stepped. */
  int bRequireReseek;
  /* Set by a re-seek that did not find the saved key and left the cursor
  ** on the entry after it: the next "next" step returns without moving. */
  int bSkipNext;
  /* As bSkipNext, but the cursor was left on the entry before the missing
  ** key: the next "prev" step returns without moving. */
  int bSkipPrev;
};

#ifndef btErrorBkpt
int btErrorBkpt(int rc){
  static int error_cnt = 0;
  error_cnt++;
  return rc;
................................................................................

/*
** Return the transaction level reported by the pager that belongs to
** database handle db.
*/
int sqlite4BtTransactionLevel(bt_db *db){
  return sqlite4BtPagerTransactionLevel(db->pPager);
}

/*
** Initialize the cursor structure pCsr: zero every field, ask the
** environment db->pEnv (SQLITE4_ENVCONFIG_GETMM) to fill in the pMM
** member of the cursor's overflow buffer, then record db as the owning
** database handle. The return value of sqlite4_env_config() is not
** checked. The cursor is not linked into the db->pAllCsr list here.
*/
static void btCsrSetup(bt_db *db, bt_cursor *pCsr){
  memset(pCsr, 0, sizeof(bt_cursor));
  sqlite4_env_config(db->pEnv, SQLITE4_ENVCONFIG_GETMM, &pCsr->ovfl.buf.pMM);
  pCsr->pDb = db;
}

int sqlite4BtCsrOpen(bt_db *db, int nExtra, bt_cursor **ppCsr){
  int rc = SQLITE4_OK;            /* Return Code */
  int nByte;                      /* Total bytes of space to allocate */
  bt_cursor *pCsr;                /* New cursor object */
................................................................................
    db->pAllCsr = pCsr;
  }

  btCheckPageRefs(db);
  return rc;
}

/*
** Release, from the root downwards, every page reference held by cursor
** pCsr and then mark the cursor as holding no pages (nPg==0). No other
** cursor state is changed.
*/
static void btCsrReleaseAll(bt_cursor *pCsr){
  BtPage **ppPg = pCsr->apPage;                /* Next reference to drop */
  BtPage **ppEnd = &pCsr->apPage[pCsr->nPg];   /* One past last valid entry */

  while( ppPg<ppEnd ){
    sqlite4BtPageRelease(*ppPg);
    ppPg++;
  }
  pCsr->nPg = 0;
}


/*
** Return cursor pCsr to the "not pointing at anything" state: drop all
** page references and clear the reseek/skip flags. The overflow buffer is
** cleared as well, but only when bFreeBuffer is non-zero.
*/
static void btCsrReset(bt_cursor *pCsr, int bFreeBuffer){
  btCsrReleaseAll(pCsr);
  if( bFreeBuffer ) sqlite4_buffer_clear(&pCsr->ovfl.buf);

  pCsr->bRequireReseek = 0;
  pCsr->bSkipNext = pCsr->bSkipPrev = 0;
}

int sqlite4BtCsrClose(bt_cursor *pCsr){
  if( pCsr ){
    bt_db *pDb = pCsr->pDb;
    bt_cursor **pp;
    btCheckPageRefs(pDb);
    btCsrReset(pCsr, 1);
    for(pp=&pDb->pAllCsr; *pp!=pCsr; pp=&(*pp)->pNextCsr);
................................................................................
  }

  btCsrAscend(pCsr, nAscend);
  *piRes = res;
  return rc;
}

/*
** Values for the eCsrseek parameter of btCsrSeek():
**
**   BT_CSRSEEK_SEEK:   Ordinary seek, as issued by sqlite4BtCsrSeek().
**
**   BT_CSRSEEK_UPDATE: Only used together with BT_SEEK_GE. If the key is
**                      larger than every cell on the leaf that was reached,
**                      SQLITE4_NOTFOUND is returned instead of advancing
**                      the cursor with sqlite4BtCsrNext().
**
**   BT_CSRSEEK_RESEEK: Only used together with BT_SEEK_EQ, by
**                      btCsrReseek(). If the key is not present, SQLITE4_OK
**                      is still returned; the cursor is left on an adjacent
**                      entry and bSkipNext or bSkipPrev is set.
*/
#define BT_CSRSEEK_SEEK   0
#define BT_CSRSEEK_UPDATE 1
#define BT_CSRSEEK_RESEEK 2

static int btCsrSeek(
  bt_cursor *pCsr, 
  const void *pK,                 /* Key to seek for */
  int nK,                         /* Size of key pK in bytes */
  int eSeek,                      /* Seek mode (a BT_SEEK_XXX constant) */
  int eCsrseek
){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  u32 pgno;                       /* Page number for next page to load */
  int rc = SQLITE4_OK;            /* Return Code */

  assert( eSeek==BT_SEEK_EQ || eCsrseek!=BT_CSRSEEK_RESEEK );
  assert( eSeek==BT_SEEK_GE || eCsrseek!=BT_CSRSEEK_UPDATE );

  /* Reset the cursor */
  btCsrReset(pCsr, 0);

  /* Figure out the root page number */
  assert( pCsr->nPg==0 );
  pgno = sqlite4BtPagerRootpgno(pCsr->pDb->pPager);
................................................................................
      pCsr->aiCell[pCsr->nPg-1] = iHi;
      if( bLeaf==0 ){
        pgno = btChildPgno(aData, pgsz, iHi);
      }else{
        pgno = 0;

        if( res!=0 ){
          if( eSeek==BT_SEEK_EQ ){
            if( eCsrseek==BT_CSRSEEK_RESEEK ){
              rc = SQLITE4_OK;
              if( iHi==nCell ){
                assert( pCsr->aiCell[pCsr->nPg-1]>0 );
                pCsr->aiCell[pCsr->nPg-1]--;
                pCsr->bSkipPrev = 1;
              }else{
                pCsr->bSkipNext = 1;
              }
            }else{
              rc = SQLITE4_NOTFOUND;
            }
          }else{
            assert( BT_SEEK_LEFAST<0 && BT_SEEK_LE<0 );
            if( eSeek<0 ){
              rc = sqlite4BtCsrPrev(pCsr);


            }else{
              if( iHi==nCell ){

                if( eCsrseek==BT_CSRSEEK_UPDATE ){
                  rc = SQLITE4_NOTFOUND;
                }else{
                  rc = sqlite4BtCsrNext(pCsr);
                }


              }
            }
            if( rc==SQLITE4_OK ) rc = SQLITE4_INEXACT;
          }
        }
      }
    }
  }

  return rc;
}
................................................................................
  bt_cursor *pCsr, 
  const void *pK,                 /* Key to seek for */
  int nK,                         /* Size of key pK in bytes */
  int eSeek                       /* Seek mode (a BT_SEEK_XXX constant) */
){
  int rc;
  btCheckPageRefs(pCsr->pDb);
  rc = btCsrSeek(pCsr, pK, nK, eSeek, BT_CSRSEEK_SEEK);
  btCheckPageRefs(pCsr->pDb);
  return rc;
}

/*
** This function seeks the cursor as required for either sqlite4BtCsrFirst()
** (if parameter bLast is false) or sqlite4BtCsrLast() (if bLast is true).
................................................................................
/*
** Position cursor pCsr to point to the largest key in the database.
** This is btCsrEnd() with its bLast argument set; the return code is
** passed through unchanged.
*/
int sqlite4BtCsrLast(bt_cursor *pCsr){
  return btCsrEnd(pCsr, 1);
}

/*
** If cursor pCsr has been saved by btSaveAllCursor() (bRequireReseek is
** set), seek it back to the key stored in pCsr->ovfl. Otherwise this is a
** no-op that returns SQLITE4_OK.
**
** The seek uses BT_SEEK_EQ with BT_CSRSEEK_RESEEK, so if the saved key is
** no longer present the cursor is left on a neighbouring entry with
** bSkipNext or bSkipPrev set, and SQLITE4_OK is still returned.
*/
static int btCsrReseek(bt_cursor *pCsr){
  int rc = SQLITE4_OK;
  if( pCsr->bRequireReseek ){
    BtOvfl ovfl;                  /* Detached copy of the saved key buffer */
    memcpy(&ovfl, &pCsr->ovfl, sizeof(BtOvfl));

    /* Detach the saved buffer from the cursor for the duration of the
    ** seek - presumably so that any buffering done while seeking cannot
    ** overwrite the key being searched for. */
    pCsr->ovfl.buf.n = 0;
    pCsr->ovfl.buf.p = 0;
    pCsr->bSkipNext = 0;
    pCsr->bRequireReseek = 0;

    rc = btCsrSeek(pCsr, ovfl.buf.p, ovfl.nKey, BT_SEEK_EQ, BT_CSRSEEK_RESEEK);
    assert( rc!=SQLITE4_INEXACT );
    if( pCsr->ovfl.buf.p==0 ){
      /* The seek did not allocate a new buffer. Hand the old one back,
      ** restoring its size along with the pointer so that the buffer
      ** descriptor stays self-consistent. */
      pCsr->ovfl.buf.p = ovfl.buf.p;
      pCsr->ovfl.buf.n = ovfl.buf.n;
    }else{
      sqlite4_buffer_clear(&ovfl.buf);
    }
  }
  return rc;
}


/*
** This function does the work of both sqlite4BtCsrNext() (if parameter
** bNext is true) and sqlite4BtCsrPrev() (if bNext is false).
*/
static int btCsrStep(bt_cursor *pCsr, int bNext){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int rc = SQLITE4_OK;
  int bRequireDescent = 0;

  rc = btCsrReseek(pCsr);
  if( (pCsr->bSkipNext && bNext) || (pCsr->bSkipPrev && bNext==0) ){
    pCsr->bSkipPrev = pCsr->bSkipNext = 0;
    return rc;
  }
  pCsr->bSkipPrev = pCsr->bSkipNext = 0;

  while( rc==SQLITE4_OK ){
    int iPg = pCsr->nPg-1;
    int iCell = pCsr->aiCell[iPg];

    if( bNext ){
      u8 *aData = (u8*)sqlite4BtPageData(pCsr->apPage[iPg]);
      int nCell = btCellCount(aData, pgsz);
................................................................................
/*
** Retreat to the previous entry in the tree. This is btCsrStep() with
** bNext set to zero; the return code is passed through unchanged.
*/
int sqlite4BtCsrPrev(bt_cursor *pCsr){
  return btCsrStep(pCsr, 0);
}












static int btOverflowArrayRead(
  bt_db *db,
  u8 *pOvfl,
  u8 *aOut,
  int nOut
){
  const int pgsz = sqlite4BtPagerPagesize(db->pPager);
................................................................................
    }
  }

  return rc;
}

/*
** Buffer the key and value belonging to the current cursor position
** in pCsr->ovfl.
**
** On success pCsr->ovfl.buf holds the ovfl.nKey bytes of the key
** immediately followed by the ovfl.nVal bytes of the value, whichever of
** the three leaf cell layouts is in use:
**
**   Type (a): key and value both stored locally. The value-size varint
**             holds the value size plus one.
**   Type (b): key stored locally; a zero value-size varint is followed by
**             the local part of the value and its overflow size.
**   Type (c): a zero key-size varint is followed by the local part of the
**             key and the overflow sizes of the key and the value.
**
** Parameter bVal is currently ignored: the value is always buffered.
**
** Returns SQLITE4_OK on success, or the error code from
** sqlite4_buffer_resize() or btOverflowArrayRead().
*/
static int btCsrBuffer(bt_cursor *pCsr, int bVal){
  const int pgsz = sqlite4BtPagerPagesize(pCsr->pDb->pPager);
  int rc = SQLITE4_OK;            /* Return code */
  u8 *aData;                      /* Page data */
  u8 *pCell;                      /* Pointer to cell within aData[] */
  int nReq;                       /* Total required space */
  u8 *aOut;                       /* Output buffer */
  u8 *pKLocal = 0;                /* Pointer to local part of key */
  u8 *pVLocal = 0;                /* Pointer to local part of value (if any) */
  int nKLocal = 0;                /* Bytes of key on page */
  int nVLocal = 0;                /* Bytes of value on page */
  int nKOvfl = 0;                 /* Bytes of key on overflow pages */
  int nVOvfl = 0;                 /* Bytes of value on overflow pages */

  aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
  pCell = btCellFind(aData, pgsz, pCsr->aiCell[pCsr->nPg-1]);
  pCell += sqlite4BtVarintGet32(pCell, &nKLocal);
  if( nKLocal==0 ){
    /* Type (c) leaf cell. */
    pCell += sqlite4BtVarintGet32(pCell, &nKLocal);
    pKLocal = pCell;
    pCell += nKLocal;
    pCell += sqlite4BtVarintGet32(pCell, &nKOvfl);
    pCell += sqlite4BtVarintGet32(pCell, &nVOvfl);

  }else{
    pKLocal = pCell;
    pCell += nKLocal;
    pCell += sqlite4BtVarintGet32(pCell, &nVLocal);
    if( nVLocal==0 ){
      /* Type (b) */
      pCell += sqlite4BtVarintGet32(pCell, &nVLocal);
      pVLocal = pCell;
      pCell += nVLocal;
      pCell += sqlite4BtVarintGet32(pCell, &nVOvfl);
    }else{
      /* Type (a). The varint just read is the value size plus one (zero
      ** being reserved to flag an overflowing value), so convert it back
      ** to the real number of value bytes. */
      pVLocal = pCell;
      nVLocal--;
    }
  }

  pCsr->ovfl.nKey = nKLocal + nKOvfl;
  pCsr->ovfl.nVal = nVLocal + nVOvfl;

  nReq = pCsr->ovfl.nKey + pCsr->ovfl.nVal;
  rc = sqlite4_buffer_resize(&pCsr->ovfl.buf, nReq);
  if( rc!=SQLITE4_OK ) return rc;

  /* Copy in local data. pVLocal is NULL for type (c) cells, so only call
  ** memcpy() for the value when there are local value bytes to copy. */
  aOut = (u8*)pCsr->ovfl.buf.p;
  memcpy(aOut, pKLocal, nKLocal);
  if( nVLocal>0 ){
    memcpy(&aOut[nKLocal], pVLocal, nVLocal);
  }

  /* Load in overflow data */
  if( nKOvfl || nVOvfl ){
    rc = btOverflowArrayRead(
        pCsr->pDb, pCell, &aOut[nKLocal + nVLocal], nKOvfl + nVOvfl
    );
  }

  return rc;
}


/*
** Helper function for btOverflowDelete(). 
**
** TODO: This uses recursion. Which is almost certainly not a problem 
** here, but makes some people nervous, so should probably be changed.
*/
................................................................................
  aData = (u8*)sqlite4BtPageData(pCsr->apPage[pCsr->nPg-1]);
  assert( btCellCount(aData, pgsz)>iCell );
  pCell = btCellFind(aData, pgsz, iCell);
  pCell += sqlite4BtVarintGet32(pCell, &nK);

  if( nK==0 ){
    /* type (c) leaf cell */
    rc = btCsrBuffer(pCsr, 0);
    if( rc==SQLITE4_OK ){
      *ppK = pCsr->ovfl.buf.p;
      *pnK = pCsr->ovfl.nKey;
    }
  }else{
    *ppK = pCell;
    *pnK = nK;
  }

................................................................................
  pCell += sqlite4BtVarintGet32(pCell, &nK);
  if( nK>0 ){
    pCell += nK;
    pCell += sqlite4BtVarintGet32(pCell, &nV);
  }

  if( nV==0 ){
    rc = btCsrBuffer(pCsr, 1);
    if( rc==SQLITE4_OK ){
      u8 *aBuf = (u8*)pCsr->ovfl.buf.p;
      *ppV = &aBuf[pCsr->ovfl.nKey];
      *pnV = pCsr->ovfl.nVal;
    }
  }else{
    *ppV = pCell;
    *pnV = (nV-1);
  }

................................................................................
  int rc = SQLITE4_OK;
  bt_cursor csr;

  sqlite4BtDebugKV((BtLock*)db->pPager, "replace", (u8*)pK, nK, (u8*)pV, nV);

  btCheckPageRefs(db);
  btCsrSetup(db, &csr);
  rc = btCsrSeek(&csr, pK, nK, BT_SEEK_GE, BT_CSRSEEK_UPDATE);
  if( rc==SQLITE4_OK ){
    /* The cursor currently points to an entry with key pK/nK. This call
    ** should therefore replace that entry. So delete it and then re-seek
    ** the cursor.  */
    rc = sqlite4BtDelete(&csr);

    if( rc==SQLITE4_OK && nV>=0 ){
      rc = btCsrSeek(&csr, pK, nK, BT_SEEK_GE, BT_CSRSEEK_UPDATE);
      if( rc==SQLITE4_OK ) rc = btErrorBkpt(SQLITE4_CORRUPT);
    }
  }

  if( nV>=0 && (rc==SQLITE4_NOTFOUND || rc==SQLITE4_INEXACT) ){
    /* Insert the new KV pair into the current leaf. */
    KeyValue kv;
................................................................................
  }
  btCsrReset(&csr, 1);

  btCheckPageRefs(db);
  return rc;
}

/*
** Save the position of every cursor in the pAllCsr list of the database
** that owns pCsr, so that the tree can be modified underneath them.
**
** For each cursor that currently holds pages, the current key is copied
** into its ovfl buffer and bRequireReseek is set. The page references of
** every such cursor other than pCsr are then released; pCsr keeps its
** pages so that the caller can go on to modify the entry it points at.
**
** Cursors that hold no pages (nPg==0) are skipped. They are either not
** positioned on any entry or were already saved by an earlier call, and
** btCsrBuffer() must not be invoked on them because it reads
** apPage[nPg-1].
**
** Returns SQLITE4_OK, or the first error returned by btCsrBuffer().
*/
static int btSaveAllCursor(bt_cursor *pCsr){
  int rc = SQLITE4_OK;            /* Return code */
  bt_db *pDb = pCsr->pDb;         /* Database handle */
  bt_cursor *p;                   /* Used to iterate through cursors */

  for(p=pDb->pAllCsr; rc==SQLITE4_OK && p; p=p->pNextCsr){
    if( p->nPg==0 ) continue;     /* Nothing to save for this cursor */
    rc = btCsrBuffer(p, 0);
    if( rc==SQLITE4_OK ){
      assert( p->ovfl.buf.p );
      p->bRequireReseek = 1;
      if( p!=pCsr ) btCsrReleaseAll(p);
    }
  }

  return rc;
}


/*
** Delete the entry that the cursor currently points to.
**
** The open cursors are saved first (btSaveAllCursor()), then the entry's
** overflow pages and the cell itself are removed and the tree is
** rebalanced if required. The first step to fail aborts the sequence.
** Whatever the outcome, pCsr's page references are released before
** returning.
*/
int sqlite4BtDelete(bt_cursor *pCsr){
  int rc = btSaveAllCursor(pCsr);
  if( rc!=SQLITE4_OK ) goto delete_out;

  rc = btOverflowDelete(pCsr);
  if( rc!=SQLITE4_OK ) goto delete_out;

  rc = btDeleteFromPage(pCsr, 1);
  if( rc!=SQLITE4_OK ) goto delete_out;

  rc = btBalanceIfUnderfull(pCsr);

 delete_out:
  btCsrReleaseAll(pCsr);
  return rc;
}

/*
** Pass cookie value iVal to the pager belonging to database handle db
** and return the pager's result code.
*/
int sqlite4BtSetCookie(bt_db *db, unsigned int iVal){
  return sqlite4BtPagerSetCookie(db->pPager, iVal);
}

Changes to test/permutations.test.

121
122
123
124
125
126
127

128
129
130
131
132







133
134
135
136
137
138
139
# Start of tests
#

#-------------------------------------------------------------------------
# Define the generic test suites:
#
#   src4

#   veryquick
#   quick
#   full
#
lappend ::testsuitelist xxx








test_suite "src4" -prefix "" -description {
} -files {
  simple.test simple2.test
  lsm1.test lsm2.test lsm3.test lsm4.test lsm5.test
  csr1.test
  ckpt1.test







>





>
>
>
>
>
>
>







121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# Start of tests
#

#-------------------------------------------------------------------------
# Define the generic test suites:
#
#   src4
#   bt
#   veryquick
#   quick
#   full
#
lappend ::testsuitelist xxx

# The "bt" suite: runs select1.test and select2.test, with an -initialize
# script that invokes the kv_default command with argument bt.
test_suite "bt" -prefix "" -description {
} -files {
  select1.test select2.test
} -initialize {
  kv_default bt
}

test_suite "src4" -prefix "" -description {
} -files {
  simple.test simple2.test
  lsm1.test lsm2.test lsm3.test lsm4.test lsm5.test
  csr1.test
  ckpt1.test

Changes to test/test_kv2.c.

397
398
399
400
401
402
403





























404
405
406
407
408
409
410
411
412

413
414
  );
  if( rc==TCL_OK ){
    rc = aSub[iSub].xCmd(interp, objc, (Tcl_Obj **)objv); 
  }

  return rc;
}






























/*
** Register the TCL commands defined above with the TCL interpreter.
** Currently this is the single command "kvwrap". Always returns TCL_OK.
**
** This routine should be the only externally visible symbol in this
** source code file.
*/
int Sqliteteststorage2_Init(Tcl_Interp *interp){
  Tcl_CreateObjCommand(interp, "kvwrap", kvwrap_command, 0, 0);

  return TCL_OK;
}







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>









>


397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
  );
  if( rc==TCL_OK ){
    rc = aSub[iSub].xCmd(interp, objc, (Tcl_Obj **)objv); 
  }

  return rc;
}

/*
** TCLCMD:    kv_default DEFAULT
**
** Look up the key-value store factory registered under the name DEFAULT
** (SQLITE4_ENVCONFIG_KVSTORE_GET) and push it as the factory named
** "main" (SQLITE4_ENVCONFIG_KVSTORE_PUSH).
**
** Returns TCL_ERROR with a usage message if the argument count is wrong,
** or with "no such factory: DEFAULT" if the lookup yields no factory.
** Returns TCL_OK otherwise; the result of the push is not checked.
*/
static int kv_default_cmd(
  void * clientData,
  Tcl_Interp *interp,
  int objc,
  Tcl_Obj *CONST objv[]
){
  sqlite4_kvfactory factory = 0;
  const char *zDflt;

  if( objc!=2 ){
    Tcl_WrongNumArgs(interp, 1, objv, "DEFAULT");
    return TCL_ERROR;
  }

  zDflt = Tcl_GetString(objv[1]);

  sqlite4_env_config(0, SQLITE4_ENVCONFIG_KVSTORE_GET, zDflt, &factory);
  if( factory==0 ){
    Tcl_ResetResult(interp);
    /* The variadic argument list must end with a null *pointer*. A plain
    ** integer 0 is not guaranteed to be passed as one. */
    Tcl_AppendResult(interp, "no such factory: ", zDflt, (char*)0);
    return TCL_ERROR;
  }
  sqlite4_env_config(0, SQLITE4_ENVCONFIG_KVSTORE_PUSH, "main", factory);
  return TCL_OK;
}

/*
** Register the TCL commands defined above with the TCL interpreter:
** "kvwrap" and "kv_default". Always returns TCL_OK.
**
** This routine should be the only externally visible symbol in this
** source code file.
*/
int Sqliteteststorage2_Init(Tcl_Interp *interp){
  Tcl_CreateObjCommand(interp, "kvwrap", kvwrap_command, 0, 0);
  Tcl_CreateObjCommand(interp, "kv_default", kv_default_cmd, 0, 0);
  return TCL_OK;
}

Changes to test/tester.tcl.

866
867
868
869
870
871
872










































































873
874
875
876
877
878
879
  puts "----  ------------  ------  ------  ------  ---------------  --  -"
  $db eval "explain $sql" {} {
    puts [format {%-4d  %-12.12s  %-6d  %-6d  %-6d  % -17s %s  %s} \
      $addr $opcode $p1 $p2 $p3 $p4 $p5 $comment
    ]
  }
}











































































# Show the VDBE program for an SQL statement but omit the Trace
# opcode at the beginning.  This procedure can be used to prove
# that different SQL statements generate exactly the same VDBE code.
#
proc explain_no_trace {sql} {
  set tr [db eval "EXPLAIN $sql"]







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
  puts "----  ------------  ------  ------  ------  ---------------  --  -"
  $db eval "explain $sql" {} {
    puts [format {%-4d  %-12.12s  %-6d  %-6d  %-6d  % -17s %s  %s} \
      $addr $opcode $p1 $p2 $p3 $p4 $p5 $comment
    ]
  }
}

# Print the VDBE program for SQL statement $sql, run through database
# command $db, as an indented and colorized listing.
#
# The "explain" output is evaluated twice. The first pass works out, for
# each address, how far the opcode should be indented (opcodes inside a
# loop closed by Next/Prev/VNext/VPrev, or between a Yield and a backwards
# Goto to it, get two extra spaces per nesting level) and before which
# addresses a blank line is printed. The second pass prints the rows.
#
proc explain_i {sql {db db}} {
  puts ""
  puts "addr  opcode        p1      p2      p3      p4                p5  #"
  puts "----  ------------  ------  ------  ------  ----------------  --  -"


  # Set up colors for the different opcodes. Scheme is as follows:
  #
  #   Red:   Opcodes that write to a b-tree.
  #   Blue:  Opcodes that reposition or seek a cursor.
  #   Green: The ResultRow opcode.
  #
  set R "\033\[31;1m"        ;# Red fg
  set G "\033\[32;1m"        ;# Green fg
  set B "\033\[34;1m"        ;# Blue fg
  set D "\033\[39;0m"        ;# Default fg
  foreach opcode {
      Seek SeekGe SeekGt SeekLe SeekLt NotFound Last Rewind
      NoConflict Next Prev VNext VPrev VFilter
  } {
    set color($opcode) $B
  }
  foreach opcode {ResultRow} {
    set color($opcode) $G
  }
  foreach opcode {IdxInsert Insert Delete IdxDelete} {
    set color($opcode) $R
  }

  # First pass: compute the indent x(addr) of every opcode and record in
  # linebreak() the addresses that are preceded by a blank line.
  set bSeenGoto 0
  $db eval "explain $sql" {} {
    set x($addr) 0
    set op($addr) $opcode

    # Blank line before the target of the first Goto, and before the
    # target of any Goto that jumps forward by more than 10 addresses.
    if {$opcode == "Goto" && ($bSeenGoto==0 || ($p2 > $addr+10))} {
      set linebreak($p2) 1
      set bSeenGoto 1
    }

    # A loop-closing opcode: indent everything from its jump target p2 up
    # to (but not including) the opcode itself.
    if {$opcode=="Next"  || $opcode=="Prev" 
     || $opcode=="VNext" || $opcode=="VPrev"
    } {
      for {set i $p2} {$i<$addr} {incr i} {
        incr x($i) 2
      }
    }

    # A backwards Goto to a Yield: indent the opcodes between the two.
    if {$opcode == "Goto" && $p2<$addr && $op($p2)=="Yield"} {
      for {set i [expr $p2+1]} {$i<$addr} {incr i} {
        incr x($i) 2
      }
    }

    if {$opcode == "Halt" && $comment == "End of coroutine"} {
      set linebreak([expr $addr+1]) 1
    }
  }

  # Second pass: print each opcode with its indent and color.
  $db eval "explain $sql" {} {
    if {[info exists linebreak($addr)]} {
      puts ""
    }
    set I [string repeat " " $x($addr)]

    # Opcodes with no entry in color() are printed without a color prefix.
    set col ""
    catch { set col $color($opcode) }

    puts [format {%-4d  %s%s%-12.12s%s  %-6d  %-6d  %-6d  % -17s %s  %s} \
      $addr $I $col $opcode $D $p1 $p2 $p3 $p4 $p5 $comment
    ]
  }
  puts "----  ------------  ------  ------  ------  ----------------  --  -"
}

# Show the VDBE program for an SQL statement but omit the Trace
# opcode at the beginning.  This procedure can be used to prove
# that different SQL statements generate exactly the same VDBE code.
#
proc explain_no_trace {sql} {
  set tr [db eval "EXPLAIN $sql"]