SQLite4
Check-in [1743941409]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a race condition causing LSM to read inconsistent in-memory and on-disk databases.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | rework-flow-control
Files: files | file ages | folders
SHA1: 1743941409535a3520de9361051a9c20674a94aa
User & Date: dan 2012-09-25 17:25:48
Context
2012-09-25
18:27
Fix a problem causing read-locks to fail with LSM_BUSY. check-in: 7eee90a0aa user: dan tags: rework-flow-control
17:25
Fix a race condition causing LSM to read inconsistent in-memory and on-disk databases. check-in: 1743941409 user: dan tags: rework-flow-control
14:50
Fix a mmap-mode bug. check-in: be1e513090 user: dan tags: rework-flow-control
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to lsm-test/lsmtest_tdb3.c.

1077
1078
1079
1080
1081
1082
1083

1084

1085

1086

1087
1088
1089
1090
1091
1092
1093
  p->lsm_work_flags = flags;
  p->lsm_work_npage = nPage;
  p->bCkpt = bCkpt;
  p->pDb = pDb;

  /* Open the worker connection */
  if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker);

  test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0);

  if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);

lsm_config_log(p->pWorker, xLog, (void *)"worker");


  /* Configure the work-hook */
  if( rc==0 ){
    lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
  }

  /* Kick off the worker thread. */







>
|
>

>

>







1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
  p->lsm_work_flags = flags;
  p->lsm_work_npage = nPage;
  p->bCkpt = bCkpt;
  p->pDb = pDb;

  /* Open the worker connection */
  if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker);
  if( zCfg ){
    test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0);
  }
  if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);
#if 0
lsm_config_log(p->pWorker, xLog, (void *)"worker");
#endif

  /* Configure the work-hook */
  if( rc==0 ){
    lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
  }

  /* Kick off the worker thread. */

Changes to src/lsmInt.h.

759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
int lsmBeginFlush(lsm_db *);

int lsmBeginWork(lsm_db *);
void lsmFinishWork(lsm_db *, int, int, int *);

int lsmFinishRecovery(lsm_db *);
void lsmFinishReadTrans(lsm_db *);
/* Arguments, in order: (pDb, bCommit). */
int lsmFinishWriteTrans(lsm_db *, int);
int lsmFinishFlush(lsm_db *, int);

int lsmSnapshotSetFreelist(lsm_db *, int *, int);

Snapshot *lsmDbSnapshotClient(lsm_db *);
Snapshot *lsmDbSnapshotWorker(lsm_db *);








|







759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
int lsmBeginFlush(lsm_db *);

int lsmBeginWork(lsm_db *);
void lsmFinishWork(lsm_db *, int, int, int *);

int lsmFinishRecovery(lsm_db *);
void lsmFinishReadTrans(lsm_db *);
/* Arguments, in order: (pDb, bCommit, nAutowork). */
int lsmFinishWriteTrans(lsm_db *, int, int);
int lsmFinishFlush(lsm_db *, int);

int lsmSnapshotSetFreelist(lsm_db *, int *, int);

Snapshot *lsmDbSnapshotClient(lsm_db *);
Snapshot *lsmDbSnapshotWorker(lsm_db *);

Changes to src/lsm_ckpt.c.

1123
1124
1125
1126
1127
1128
1129






1130
1131
1132
1133
1134
1135
1136
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){
  Snapshot *pSnap = pDb->pWorker;
  ShmHeader *pShm = pDb->pShmhdr;
  void *p = 0;
  int n = 0;
  int rc;







  rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aSnap2, p, n);
  lsmShmBarrier(pDb);







>
>
>
>
>
>







1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){
  Snapshot *pSnap = pDb->pWorker;
  ShmHeader *pShm = pDb->pShmhdr;
  void *p = 0;
  int n = 0;
  int rc;

#if 0
if( bFlush ){
  printf("pushing %p tree to %d\n", (void *)pDb, pSnap->iId+1);
  fflush(stdout);
}
#endif
  rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aSnap2, p, n);
  lsmShmBarrier(pDb);

Changes to src/lsm_main.c.

701
702
703
704
705
706
707

708
709
710
711
712
713
714
715
716
717

718
719
720
721
722
723
724
725
726
727
728
729
...
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
  assert_db_state( pDb );

  /* A value less than zero means close the innermost nested transaction. */
  if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);

  if( iLevel<pDb->nTransOpen ){
    if( iLevel==0 ){


      /* Commit the transaction to disk. */
      if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
      if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
        rc = lsmFsSyncLog(pDb->pFS);
      }
      if( rc==LSM_OK && lsmTreeSize(pDb)>pDb->nTreeLimit ){
        lsmLogEnd(pDb, 1);
        lsmTreeMakeOld(pDb);
        rc = lsmSortedAutoWork(pDb, 1);

      }
      lsmFinishWriteTrans(pDb, (rc==LSM_OK));
    }
    pDb->nTransOpen = iLevel;

  }
  dbReleaseClientSnapshot(pDb);
  return rc;
}

int lsm_rollback(lsm_db *pDb, int iLevel){
  int rc = LSM_OK;
................................................................................
      TransMark *pMark = &pDb->aTrans[(iLevel==0 ? 0 : iLevel-1)];
      lsmTreeRollback(pDb, &pMark->tree);
      if( iLevel ) lsmLogSeek(pDb, &pMark->log);
      pDb->nTransOpen = iLevel;
    }

    if( pDb->nTransOpen==0 ){
      lsmFinishWriteTrans(pDb, 0);
    }
    dbReleaseClientSnapshot(pDb);
  }

  return rc;
}








>







<

<
>

|


<







 







|







701
702
703
704
705
706
707
708
709
710
711
712
713
714
715

716

717
718
719
720
721

722
723
724
725
726
727
728
...
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
  assert_db_state( pDb );

  /* A value less than zero means close the innermost nested transaction. */
  if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);

  if( iLevel<pDb->nTransOpen ){
    if( iLevel==0 ){
      int bAutowork = 0;

      /* Commit the transaction to disk. */
      if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
      if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
        rc = lsmFsSyncLog(pDb->pFS);
      }
      if( rc==LSM_OK && lsmTreeSize(pDb)>pDb->nTreeLimit ){

        lsmTreeMakeOld(pDb);

        bAutowork = pDb->bAutowork;
      }
      lsmFinishWriteTrans(pDb, (rc==LSM_OK), bAutowork);
    }
    pDb->nTransOpen = iLevel;

  }
  dbReleaseClientSnapshot(pDb);
  return rc;
}

int lsm_rollback(lsm_db *pDb, int iLevel){
  int rc = LSM_OK;
................................................................................
      TransMark *pMark = &pDb->aTrans[(iLevel==0 ? 0 : iLevel-1)];
      lsmTreeRollback(pDb, &pMark->tree);
      if( iLevel ) lsmLogSeek(pDb, &pMark->log);
      pDb->nTransOpen = iLevel;
    }

    if( pDb->nTransOpen==0 ){
      lsmFinishWriteTrans(pDb, 0, 0);
    }
    dbReleaseClientSnapshot(pDb);
  }

  return rc;
}

Changes to src/lsm_shared.c.

705
706
707
708
709
710
711











712
713
714
715
716
717
718
...
795
796
797
798
799
800
801
802

803
804



805
806
807
808
809
810
811
812
813
...
874
875
876
877
878
879
880



881
882
883
884
885
886
887
          assert( pDb->iReader>=0 );
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }
      if( rc==LSM_BUSY ) rc = LSM_OK;
    }











  }
  if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;

  return rc;
}

/*
................................................................................
** written into the log file when this function is called. Or, if the
** transaction was rolled back, both the log file and in-memory tree 
** structure have already been restored. In either case, this function 
** merely releases locks and other resources held by the write-transaction.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){
  /* End the log and the in-memory tree transaction; bCommit is forwarded
  ** unchanged to both calls. */
  lsmLogEnd(pDb, bCommit);
  lsmTreeEndTransaction(pDb, bCommit);

  /* Release the WRITER lock. The value returned by lsmShmLock() is not
  ** inspected, so this function always reports LSM_OK. */
  lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
  return LSM_OK;
}


/*
** Return non-zero if the caller is holding the client mutex.
*/
#ifdef LSM_DEBUG
................................................................................
        db->iReader = i;
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }




  return rc;
}

/*
** This is used to check if there exists a read-lock locking a particular
** version of either the in-memory tree or database file. 
**







>
>
>
>
>
>
>
>
>
>
>







 







|
>


>
>
>

|







 







>
>
>







705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
...
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
...
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
          assert( pDb->iReader>=0 );
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }
      if( rc==LSM_BUSY ) rc = LSM_OK;
    }
#if 0
if( rc==LSM_OK && pDb->pClient ){
  printf("reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n",
      (void *)pDb,
      (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid, 
      (int)pDb->treehdr.root.iTransId,
      (int)pDb->treehdr.iOldShmid
  );
  fflush(stdout);
}
#endif
  }
  if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;

  return rc;
}

/*
................................................................................
** written into the log file when this function is called. Or, if the
** transaction was rolled back, both the log file and in-memory tree 
** structure have already been restored. In either case, this function 
** merely releases locks and other resources held by the write-transaction.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
int lsmFinishWriteTrans(lsm_db *pDb, int bCommit, int nAutowork){
  int rcWork;                     /* Result of the optional auto-work step */

  /* Close out the log and the in-memory tree for this transaction. Both
  ** calls receive the commit/rollback flag unchanged. */
  lsmLogEnd(pDb, bCommit);
  lsmTreeEndTransaction(pDb, bCommit);

  /* A non-zero nAutowork requests a call to lsmSortedAutoWork(), which is
  ** made before the WRITER lock is dropped below. Its result becomes the
  ** return value of this function; otherwise LSM_OK is returned. */
  rcWork = nAutowork ? lsmSortedAutoWork(pDb, nAutowork) : LSM_OK;

  /* The WRITER lock is released whether or not auto-work succeeded. */
  lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
  return rcWork;
}


/*
** Return non-zero if the caller is holding the client mutex.
*/
#ifdef LSM_DEBUG
................................................................................
        db->iReader = i;
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  if( rc==LSM_OK && db->iReader<0 ){
    rc = LSM_BUSY;
  }
  return rc;
}

/*
** This is used to check if there exists a read-lock locking a particular
** version of either the in-memory tree or database file. 
**