SQLite4
Check-in [1cd7d6ca93]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment: Fix problems introduced in the previous checkin.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | rework-flow-control
Files: files | file ages | folders
SHA1: 1cd7d6ca93080b8b0fc8d363360115e1a265f29b
User & Date: dan 2012-09-24 10:55:39
Context
2012-09-24
16:04
Fix further bugs. check-in: 99b59dacbd user: dan tags: rework-flow-control
10:55
Fix problems introduced in the previous checkin. check-in: 1cd7d6ca93 user: dan tags: rework-flow-control
2012-09-22
19:38
Rework flow control some (flow control = slowing down clients when worker threads or processes cannot keep up). check-in: 50f8b55823 user: dan tags: rework-flow-control
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_tdb3.c.

595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
*/
static void xLog(void *pCtx, int rc, const char *z){
  const char *zLabel = (const char *)pCtx;

  /* rc is deliberately unused; only the message text is written. */
  unused_parameter(rc);

  /* When a context string was supplied, emit it as a "label: " prefix. */
  if( zLabel!=0 ){
    fprintf(stderr, "%s: ", zLabel);
  }
  fprintf(stderr, "%s\n", z);
  fflush(stderr);
}

/*
** Work-hook trampoline. pArg is interpreted as an LsmDb handle; if that
** handle has an xWork callback set, it is invoked with the database
** handle and the pWorkCtx value stored alongside it.
*/
static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *pWrapper = (LsmDb *)pArg;

  if( pWrapper->xWork==0 ) return;
  pWrapper->xWork(db, pWrapper->pWorkCtx);
}








<







595
596
597
598
599
600
601

602
603
604
605
606
607
608
*/
static void xLog(void *pCtx, int rc, const char *z){
  const char *zLabel = (const char *)pCtx;

  /* rc is deliberately unused; only the message text is written. */
  unused_parameter(rc);

  /* When a context string was supplied, emit it as a "label: " prefix. */
  if( zLabel!=0 ){
    fprintf(stderr, "%s: ", zLabel);
  }
  fprintf(stderr, "%s\n", z);
  fflush(stderr);
}

/*
** Work-hook trampoline. pArg is interpreted as an LsmDb handle; if that
** handle has an xWork callback set, it is invoked with the database
** handle and the pWorkCtx value stored alongside it.
*/
static void xWorkHook(lsm_db *db, void *pArg){
  LsmDb *pWrapper = (LsmDb *)pArg;

  if( pWrapper->xWork==0 ) return;
  pWrapper->xWork(db, pWrapper->pWorkCtx);
}

Changes to src/lsmInt.h.

670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
** End of functions from "lsm_file.c".
**************************************************************************/

/* 
** Functions from file "lsm_sorted.c".
*/
int lsmInfoPageDump(lsm_db *, Pgno, int, char **);
int lsmSortedFlushTree(lsm_db *, int *);
void lsmSortedCleanup(lsm_db *);
int lsmSortedAutoWork(lsm_db *, int nUnit);

int lsmFlushTreeToDisk(lsm_db *pDb);

void lsmSortedRemap(lsm_db *pDb);








<







670
671
672
673
674
675
676

677
678
679
680
681
682
683
** End of functions from "lsm_file.c".
**************************************************************************/

/* 
** Functions from file "lsm_sorted.c".
*/
int lsmInfoPageDump(lsm_db *, Pgno, int, char **);

void lsmSortedCleanup(lsm_db *);
int lsmSortedAutoWork(lsm_db *, int nUnit);

int lsmFlushTreeToDisk(lsm_db *pDb);

void lsmSortedRemap(lsm_db *pDb);

Changes to src/lsm_ckpt.c.

711
712
713
714
715
716
717

718
719
720
721
722
723
724
725
726
**
** True is returned if there are currently too many free-list entries
** in-memory to store in a checkpoint. Before calling lsmCheckpointSaveWorker()
** to save the current worker snapshot, a new top-level LSM segment must
** be created so that some of them can be written to the LSM. 
*/
int lsmCheckpointOverflowRequired(lsm_db *pDb){
  int bOverflow;                  /* True if the free-list is over the limit */

  assert( lsmShmAssertWorker(pDb) );

  /* Compare the worker snapshot's in-memory free-list entry count with
  ** the connection's nMaxFreelist limit. */
  bOverflow = (pDb->pWorker->freelist.nEntry > pDb->nMaxFreelist);
  return bOverflow;
}

/*
** Connection pDb must be the worker to call this function.
**
** Load the FREELIST record from the database. Decode it and append the
** results to list pFreelist.







>

|







711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
**
** True is returned if there are currently too many free-list entries
** in-memory to store in a checkpoint. Before calling lsmCheckpointSaveWorker()
** to save the current worker snapshot, a new top-level LSM segment must
** be created so that some of them can be written to the LSM. 
*/
int lsmCheckpointOverflowRequired(lsm_db *pDb){
  Snapshot *pWorker = pDb->pWorker;

  assert( lsmShmAssertWorker(pDb) );

  /* Overflow is required if the in-memory free-list holds more entries
  ** than nMaxFreelist allows ... */
  if( pWorker->freelist.nEntry > pDb->nMaxFreelist ) return 1;

  /* ... or if the snapshot's nFreelistOvfl counter is already positive. */
  return (pWorker->nFreelistOvfl>0);
}

/*
** Connection pDb must be the worker to call this function.
**
** Load the FREELIST record from the database. Decode it and append the
** results to list pFreelist.

Changes to src/lsm_log.c.

443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
** A call to this function deletes the LogWriter object allocated by
** lsmLogBegin(). If the transaction is being committed, the shared state
** in *pLog is updated before returning.
*/
void lsmLogEnd(lsm_db *pDb, int bCommit){
  DbLog *pLog;
  LogWriter *p;

  if( pDb->bUseLog==0 ) return;
  p = pDb->pLogWriter;
  pLog = &pDb->treehdr.log;

  if( bCommit ){
    pLog->aRegion[2].iEnd = p->iOff;
    pLog->cksum0 = p->cksum0;
    pLog->cksum1 = p->cksum1;
    if( p->iRegion1End ){







|
|
|







443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
** A call to this function deletes the LogWriter object allocated by
** lsmLogBegin(). If the transaction is being committed, the shared state
** in *pLog is updated before returning.
*/
void lsmLogEnd(lsm_db *pDb, int bCommit){
  DbLog *pLog;
  LogWriter *p;
  p = pDb->pLogWriter;

  if( p==0 ) return;
  pLog = &pDb->treehdr.log;

  if( bCommit ){
    pLog->aRegion[2].iEnd = p->iOff;
    pLog->cksum0 = p->cksum0;
    pLog->cksum1 = p->cksum1;
    if( p->iRegion1End ){

Changes to src/lsm_main.c.

731
732
733
734
735
736
737

738
739
740
741
742
743
744

      /* Commit the transaction to disk. */
      if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
      if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
        rc = lsmFsSyncLog(pDb->pFS);
      }
      if( rc==LSM_OK && lsmTreeSize(pDb)>pDb->nTreeLimit ){

        lsmTreeMakeOld(pDb);
        rc = lsmSortedAutoWork(pDb, 1);
      }
      lsmFinishWriteTrans(pDb, (rc==LSM_OK));
    }
    pDb->nTransOpen = iLevel;








>







731
732
733
734
735
736
737
738
739
740
741
742
743
744
745

      /* Commit the transaction to disk. */
      if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
      if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
        rc = lsmFsSyncLog(pDb->pFS);
      }
      if( rc==LSM_OK && lsmTreeSize(pDb)>pDb->nTreeLimit ){
        lsmLogEnd(pDb, 1);
        lsmTreeMakeOld(pDb);
        rc = lsmSortedAutoWork(pDb, 1);
      }
      lsmFinishWriteTrans(pDb, (rc==LSM_OK));
    }
    pDb->nTransOpen = iLevel;

Changes to src/lsm_shared.c.

584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ){
        pShm->iMetaPage = iMeta;
        nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite;
      }
#if 0
  lsmLogMessage(pDb, 0, "finish checkpoint %d", 
      (int)lsmCheckpointId(pDb->aSnapshot, 0)
  );
#endif
    }
  }








|







584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
      if( rc==LSM_OK && pDb->eSafety!=LSM_SAFETY_OFF){
        rc = lsmFsSyncDb(pDb->pFS);
      }
      if( rc==LSM_OK ){
        pShm->iMetaPage = iMeta;
        nWrite = lsmCheckpointNWrite(pDb->aSnapshot, 0) - nWrite;
      }
#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, 0, "finish checkpoint %d", 
      (int)lsmCheckpointId(pDb->aSnapshot, 0)
  );
#endif
    }
  }

Changes to src/lsm_sorted.c.

3606
3607
3608
3609
3610
3611
3612
3613
3614
3615
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628
3629
3630
3631
3632
3633
3634
3635
3636
3637
3638
3639
3640
3641
3642
3643
3644
3645
3646
3647
3648
3649
3650
3651
3652
3653
3654
3655
3656
3657
3658
....
3906
3907
3908
3909
3910
3911
3912










3913
3914
3915
3916
3917
3918
3919
....
4014
4015
4016
4017
4018
4019
4020
4021
4022
4023
4024
4025
4026
4027
4028
4029
4030
4031
4032
4033
4034




4035
4036
4037
4038
4039
4040
4041
....
4101
4102
4103
4104
4105
4106
4107
4108
4109
4110
4111
4112
4113
4114
4115
4116
4117
4118
4119
4120
4121
4122
4123
4124
4125
....
4127
4128
4129
4130
4131
4132
4133

4134
4135
4136
4137
4138
4139
4140
....
4165
4166
4167
4168
4169
4170
4171

4172
4173
4174
4175
4176
4177
4178
4179
4180
4181

4182
4183
4184
4185
4186
4187
4188
4189
4190
4191



4192
4193
4194
4195
4196
4197
4198
4199
4200
4201
....
4688
4689
4690
4691
4692
4693
4694
4695


4696
4697
4698
4699
4700
4701
4702
  if( rc==LSM_OK ){
    sortedInvokeWorkHook(pDb);
  }

  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  if( pnWrite ) *pnWrite = nWrite;
  pDb->pWorker->nWrite += nWrite;
  return rc;
}

/*
** Write the contents of the in-memory tree out as a new segment on disk.
** At present there are two situations in which this is done:
**
**   1. Connection pDb has just committed a transaction and the in-memory
**      tree has grown past the size threshold, or
**
**   2. The last connection to the database (pDb) is being closed while
**      the in-memory tree is not empty.
**
** In both cases the connection holds a worker snapshot reference. In the
** first, it also holds the in-memory tree write-version. In the second,
** it holds no in-memory tree version reference at all.
*/
int lsmSortedFlushTree(
  lsm_db *pDb,                    /* Connection handle */
  int *pnOvfl                     /* OUT: Number of free-list entries written */
){
  int rc = LSM_OK;                /* Return code */
  int bWork;                      /* True if a new top-level is needed */

  assert( pDb->pWorker );

  /* Work is needed if there is an old tree to flush, or if the free-list
  ** must be overflowed. The second test runs only when the first fails. */
  bWork = (lsmTreeHasOld(pDb) || lsmCheckpointOverflowRequired(pDb));

  if( bWork ){
    rc = sortedNewToplevel(pDb, 1, pnOvfl, 0);
    assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );

#if 0
    lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "tree flush");
#endif
#if 0
    lsmLogMessage(pDb, rc, "flushed tree to disk");
#endif
  }else{
    /* Nothing to do: report zero free-list entries written. */
    *pnOvfl = 0;
  }

  return rc;
}

/*
** The nMerge levels in the LSM beginning with pLevel consist of a
** left-hand-side segment only. Replace these levels with a single new
................................................................................
    }else{
      *ppOut = pBest;
    }
  }

  return rc;
}











static int sortedWork(
  lsm_db *pDb,                    /* Database handle. Must be worker. */
  int nWork,                      /* Number of pages of work to do */
  int bOptimize,                  /* True to merge less than nMerge levels */
  int bFlush,                     /* Set if call is to make room for a flush */
  int *pnWrite                    /* OUT: Actual number of pages written */
................................................................................

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

      /* If bFlush is true and the code above was working on an age=1 level
      ** break out of the loop now, even if nRemaining is still greater than
      ** zero. The caller has an in-memory tree to flush to disk.  */
      if( bFlush && pLevel->iAge==1 ) break;

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "work");
#endif
    }
  }

  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  if( pnWrite ) *pnWrite = (nWork - nRemaining);
  pWorker->nWrite += (nWork - nRemaining);




  return rc;
}

typedef struct Metric Metric;
struct Metric {
  double fAvgHeight;
  int nTotalSz;
................................................................................
  ){
    *pbOut = 1;
  }else{
    *pbOut = 0;
  }
  return rc;
}

/*
** Return 1 if the level returned by lsmDbSnapshotLevel() for the worker
** snapshot exists, has iAge==0, and either has nRight set or
** sortedCountLevels() reports at least pDb->nMerge for it. Return 0
** otherwise.
*/
static int sortedDbIsFull(lsm_db *pDb){
  Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);

  if( pTopLevel==0 ) return 0;
  if( pTopLevel->iAge!=0 ) return 0;
  if( pTopLevel->nRight ) return 1;
  return (sortedCountLevels(pTopLevel)>=pDb->nMerge);
}


static int doLsmSingleWork(
  lsm_db *pDb, 
  int bShutdown,
  int flags, 
  int nPage,                      /* Number of pages to write to disk */
  int *pnWrite,                   /* OUT: Pages actually written to disk */
................................................................................
){
  int rc = LSM_OK;                /* Return code */
  int nOvfl = 0;
  int bFlush = 0;
  int nMax = nPage;               /* Maximum pages to write to disk */
  int nRem = nPage;
  int bCkpt = 0;


  /* Open the worker 'transaction'. It will be closed before this function
  ** returns.  */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);
  if( rc!=LSM_OK ) return rc;

................................................................................
    rc = sortedTreeHasOld(pDb, &bOld);
    if( bOld ){
      if( sortedDbIsFull(pDb) ){
        int nPg = 0;
        rc = sortedWork(pDb, nRem, 0, 1, &nPg);
        nRem -= nPg;
        assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );

      }

      if( rc==LSM_OK && nRem>0 ){
        int nPg = 0;
        rc = sortedNewToplevel(pDb, 1, &nOvfl, &nPg);
        nRem -= nPg;
        if( rc==LSM_OK && pDb->nTransOpen>0 ){
          lsmTreeDiscardOld(pDb);
        }
        bFlush = 1;

      }
    }
  }

  /* If nPage is still greater than zero, do some merging. */
  if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
    int nPg = 0;
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg);
    nRem -= nPg;



    if( rc==LSM_OK && nPg && lsmCheckpointOverflowRequired(pDb) ){
      rc = sortedNewToplevel(pDb, 0, &nOvfl, 0);
    }
  }

  if( rc==LSM_OK && (nRem!=nMax) ){
    rc = lsmSortedFlushDb(pDb);
    lsmFinishWork(pDb, bFlush, nOvfl, &rc);
  }else{
    int rcdummy = LSM_BUSY;
................................................................................
          fileToString(pDb->pEnv, zLeft, sizeof(zLeft), 28, aLeft[i]); 
        }
        if( i<nRight ){ 
          fileToString(pDb->pEnv, zRight, sizeof(zRight), 28, aRight[i]); 
        }

        if( i==0 ){
          sqlite4_snprintf(zLevel, sizeof(zLevel), "L%d:", iLevel);


        }else{
          zLevel[0] = '\0';
        }

        if( nRight==0 ){
          iPad = 28 - (strlen(zLeft)/2) ;
        }







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<

|
<
<
<







 







>
>
>
>
>
>
>
>
>
>







 







|
|

|


|







>
>
>
>







 







<
<
<
<
<
<
<
<
<
<
<







 







>







 







>










>










>
>
>
|
|
<







 







|
>
>







3606
3607
3608
3609
3610
3611
3612


































3613
3614



3615
3616
3617
3618
3619
3620
3621
....
3869
3870
3871
3872
3873
3874
3875
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889
3890
3891
3892
....
3987
3988
3989
3990
3991
3992
3993
3994
3995
3996
3997
3998
3999
4000
4001
4002
4003
4004
4005
4006
4007
4008
4009
4010
4011
4012
4013
4014
4015
4016
4017
4018
....
4078
4079
4080
4081
4082
4083
4084











4085
4086
4087
4088
4089
4090
4091
....
4093
4094
4095
4096
4097
4098
4099
4100
4101
4102
4103
4104
4105
4106
4107
....
4132
4133
4134
4135
4136
4137
4138
4139
4140
4141
4142
4143
4144
4145
4146
4147
4148
4149
4150
4151
4152
4153
4154
4155
4156
4157
4158
4159
4160
4161
4162
4163
4164
4165

4166
4167
4168
4169
4170
4171
4172
....
4659
4660
4661
4662
4663
4664
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674
4675
  if( rc==LSM_OK ){
    sortedInvokeWorkHook(pDb);
  }

  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  if( pnWrite ) *pnWrite = nWrite;
  pDb->pWorker->nWrite += nWrite;


































#if 0
  lsmSortedDumpStructure(pDb, pDb->pWorker, 1, 0, "new-toplevel");



#endif
  return rc;
}

/*
** The nMerge levels in the LSM beginning with pLevel consist of a
** left-hand-side segment only. Replace these levels with a single new
................................................................................
    }else{
      *ppOut = pBest;
    }
  }

  return rc;
}

/*
** Return 1 if the level returned by lsmDbSnapshotLevel() for the worker
** snapshot exists, has iAge==0, and either has nRight set or
** sortedCountLevels() reports at least pDb->nMerge for it. Return 0
** otherwise.
*/
static int sortedDbIsFull(lsm_db *pDb){
  Level *pTopLevel = lsmDbSnapshotLevel(pDb->pWorker);

  if( pTopLevel==0 ) return 0;
  if( pTopLevel->iAge!=0 ) return 0;
  if( pTopLevel->nRight ) return 1;
  return (sortedCountLevels(pTopLevel)>=pDb->nMerge);
}

static int sortedWork(
  lsm_db *pDb,                    /* Database handle. Must be worker. */
  int nWork,                      /* Number of pages of work to do */
  int bOptimize,                  /* True to merge less than nMerge levels */
  int bFlush,                     /* Set if call is to make room for a flush */
  int *pnWrite                    /* OUT: Actual number of pages written */
................................................................................

      /* Clean up the MergeWorker object initialized above. If no error
      ** has occurred, invoke the work-hook to inform the application that
      ** the database structure has changed. */
      mergeWorkerShutdown(&mergeworker, &rc);
      if( rc==LSM_OK ) sortedInvokeWorkHook(pDb);

      /* If bFlush is true and the database is no longer considered "full",
      ** break out of the loop even if nRemaining is still greater than
      ** zero. The caller has an in-memory tree to flush to disk.  */
      if( bFlush && sortedDbIsFull(pDb)==0 ) break;

#if 0
      lsmSortedDumpStructure(pDb, pDb->pWorker, 0, 0, "work");
#endif
    }
  }

  assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
  if( pnWrite ) *pnWrite = (nWork - nRemaining);
  pWorker->nWrite += (nWork - nRemaining);

#ifdef LSM_LOG_WORK
  lsmLogMessage(pDb, rc, "sortedWork(): %d pages", (nWork-nRemaining));
#endif
  return rc;
}

typedef struct Metric Metric;
struct Metric {
  double fAvgHeight;
  int nTotalSz;
................................................................................
  ){
    *pbOut = 1;
  }else{
    *pbOut = 0;
  }
  return rc;
}












static int doLsmSingleWork(
  lsm_db *pDb, 
  int bShutdown,
  int flags, 
  int nPage,                      /* Number of pages to write to disk */
  int *pnWrite,                   /* OUT: Pages actually written to disk */
................................................................................
){
  int rc = LSM_OK;                /* Return code */
  int nOvfl = 0;
  int bFlush = 0;
  int nMax = nPage;               /* Maximum pages to write to disk */
  int nRem = nPage;
  int bCkpt = 0;
  int bToplevel = 0;

  /* Open the worker 'transaction'. It will be closed before this function
  ** returns.  */
  assert( pDb->pWorker==0 );
  rc = lsmBeginWork(pDb);
  if( rc!=LSM_OK ) return rc;

................................................................................
    rc = sortedTreeHasOld(pDb, &bOld);
    if( bOld ){
      if( sortedDbIsFull(pDb) ){
        int nPg = 0;
        rc = sortedWork(pDb, nRem, 0, 1, &nPg);
        nRem -= nPg;
        assert( rc!=LSM_OK || nRem<=0 || !sortedDbIsFull(pDb) );
        bToplevel = 1;
      }

      if( rc==LSM_OK && nRem>0 ){
        int nPg = 0;
        rc = sortedNewToplevel(pDb, 1, &nOvfl, &nPg);
        nRem -= nPg;
        if( rc==LSM_OK && pDb->nTransOpen>0 ){
          lsmTreeDiscardOld(pDb);
        }
        bFlush = 1;
        bToplevel = 0;
      }
    }
  }

  /* If nPage is still greater than zero, do some merging. */
  if( rc==LSM_OK && nRem>0 && bShutdown==0 ){
    int nPg = 0;
    int bOptimize = ((flags & LSM_WORK_OPTIMIZE) ? 1 : 0);
    rc = sortedWork(pDb, nRem, bOptimize, 0, &nPg);
    nRem -= nPg;
    if( nPg ) bToplevel = 1;
  }

  if( rc==LSM_OK && bToplevel && lsmCheckpointOverflowRequired(pDb) ){
    rc = sortedNewToplevel(pDb, 0, &nOvfl, 0);

  }

  if( rc==LSM_OK && (nRem!=nMax) ){
    rc = lsmSortedFlushDb(pDb);
    lsmFinishWork(pDb, bFlush, nOvfl, &rc);
  }else{
    int rcdummy = LSM_BUSY;
................................................................................
          fileToString(pDb->pEnv, zLeft, sizeof(zLeft), 28, aLeft[i]); 
        }
        if( i<nRight ){ 
          fileToString(pDb->pEnv, zRight, sizeof(zRight), 28, aRight[i]); 
        }

        if( i==0 ){
          sqlite4_snprintf(zLevel, sizeof(zLevel), "L%d: (age=%d)", 
              iLevel, pLevel->iAge
          );
        }else{
          zLevel[0] = '\0';
        }

        if( nRight==0 ){
          iPad = 28 - (strlen(zLeft)/2) ;
        }