Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Fix a race condition causing LSM to read inconsistent in-memory and on-disk databases.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | rework-flow-control
Files: files | file ages | folders
SHA1: 1743941409535a3520de9361051a9c20674a94aa
User & Date: dan 2012-09-25 17:25:48.429
Context
2012-09-25
18:27
Fix a problem causing read-locks to fail with LSM_BUSY. check-in: 7eee90a0aa user: dan tags: rework-flow-control
17:25
Fix a race condition causing LSM to read inconsistent in-memory and on-disk databases. check-in: 1743941409 user: dan tags: rework-flow-control
14:50
Fix a mmap-mode bug. check-in: be1e513090 user: dan tags: rework-flow-control
Changes
Unified Diff Ignore Whitespace Patch
Changes to lsm-test/lsmtest_tdb3.c.
1077
1078
1079
1080
1081
1082
1083

1084

1085

1086

1087
1088
1089
1090
1091
1092
1093
  p->lsm_work_flags = flags;
  p->lsm_work_npage = nPage;
  p->bCkpt = bCkpt;
  p->pDb = pDb;

  /* Open the worker connection */
  if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker);

  test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0);

  if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);

lsm_config_log(p->pWorker, xLog, (void *)"worker");


  /* Configure the work-hook */
  if( rc==0 ){
    lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
  }

  /* Kick off the worker thread. */







>
|
>

>

>







1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
  p->lsm_work_flags = flags;
  p->lsm_work_npage = nPage;
  p->bCkpt = bCkpt;
  p->pDb = pDb;

  /* Open the worker connection */
  if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker);
  if( zCfg ){
    test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0);
  }
  if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);
#if 0
lsm_config_log(p->pWorker, xLog, (void *)"worker");
#endif

  /* Configure the work-hook */
  if( rc==0 ){
    lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
  }

  /* Kick off the worker thread. */
Changes to src/lsmInt.h.
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
int lsmBeginFlush(lsm_db *);

int lsmBeginWork(lsm_db *);
void lsmFinishWork(lsm_db *, int, int, int *);

int lsmFinishRecovery(lsm_db *);
void lsmFinishReadTrans(lsm_db *);
int lsmFinishWriteTrans(lsm_db *, int);
int lsmFinishFlush(lsm_db *, int);

int lsmSnapshotSetFreelist(lsm_db *, int *, int);

Snapshot *lsmDbSnapshotClient(lsm_db *);
Snapshot *lsmDbSnapshotWorker(lsm_db *);








|







759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
int lsmBeginFlush(lsm_db *);

int lsmBeginWork(lsm_db *);
void lsmFinishWork(lsm_db *, int, int, int *);

int lsmFinishRecovery(lsm_db *);
void lsmFinishReadTrans(lsm_db *);
int lsmFinishWriteTrans(lsm_db *, int, int);
int lsmFinishFlush(lsm_db *, int);

int lsmSnapshotSetFreelist(lsm_db *, int *, int);

Snapshot *lsmDbSnapshotClient(lsm_db *);
Snapshot *lsmDbSnapshotWorker(lsm_db *);

Changes to src/lsm_ckpt.c.
1123
1124
1125
1126
1127
1128
1129






1130
1131
1132
1133
1134
1135
1136
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){
  Snapshot *pSnap = pDb->pWorker;
  ShmHeader *pShm = pDb->pShmhdr;
  void *p = 0;
  int n = 0;
  int rc;







  rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aSnap2, p, n);
  lsmShmBarrier(pDb);







>
>
>
>
>
>







1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush, int nOvfl){
  Snapshot *pSnap = pDb->pWorker;
  ShmHeader *pShm = pDb->pShmhdr;
  void *p = 0;
  int n = 0;
  int rc;

#if 0
if( bFlush ){
  printf("pushing %p tree to %d\n", (void *)pDb, pSnap->iId+1);
  fflush(stdout);
}
#endif
  rc = ckptExportSnapshot(pDb, nOvfl, bFlush, pSnap->iId+1, 1, &p, &n);
  if( rc!=LSM_OK ) return rc;
  assert( ckptChecksumOk((u32 *)p) );

  assert( n<=LSM_META_PAGE_SIZE );
  memcpy(pShm->aSnap2, p, n);
  lsmShmBarrier(pDb);
Changes to src/lsm_main.c.
701
702
703
704
705
706
707

708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
  assert_db_state( pDb );

  /* A value less than zero means close the innermost nested transaction. */
  if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);

  if( iLevel<pDb->nTransOpen ){
    if( iLevel==0 ){


      /* Commit the transaction to disk. */
      if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
      if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
        rc = lsmFsSyncLog(pDb->pFS);
      }
      if( rc==LSM_OK && lsmTreeSize(pDb)>pDb->nTreeLimit ){
        lsmLogEnd(pDb, 1);
        lsmTreeMakeOld(pDb);
        rc = lsmSortedAutoWork(pDb, 1);
      }
      lsmFinishWriteTrans(pDb, (rc==LSM_OK));
    }
    pDb->nTransOpen = iLevel;

  }
  dbReleaseClientSnapshot(pDb);
  return rc;
}

int lsm_rollback(lsm_db *pDb, int iLevel){
  int rc = LSM_OK;







>







<

|

|


<







701
702
703
704
705
706
707
708
709
710
711
712
713
714
715

716
717
718
719
720
721

722
723
724
725
726
727
728
  assert_db_state( pDb );

  /* A value less than zero means close the innermost nested transaction. */
  if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1);

  if( iLevel<pDb->nTransOpen ){
    if( iLevel==0 ){
      int bAutowork = 0;

      /* Commit the transaction to disk. */
      if( rc==LSM_OK ) rc = lsmLogCommit(pDb);
      if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){
        rc = lsmFsSyncLog(pDb->pFS);
      }
      if( rc==LSM_OK && lsmTreeSize(pDb)>pDb->nTreeLimit ){

        lsmTreeMakeOld(pDb);
        bAutowork = pDb->bAutowork;
      }
      lsmFinishWriteTrans(pDb, (rc==LSM_OK), bAutowork);
    }
    pDb->nTransOpen = iLevel;

  }
  dbReleaseClientSnapshot(pDb);
  return rc;
}

int lsm_rollback(lsm_db *pDb, int iLevel){
  int rc = LSM_OK;
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
      TransMark *pMark = &pDb->aTrans[(iLevel==0 ? 0 : iLevel-1)];
      lsmTreeRollback(pDb, &pMark->tree);
      if( iLevel ) lsmLogSeek(pDb, &pMark->log);
      pDb->nTransOpen = iLevel;
    }

    if( pDb->nTransOpen==0 ){
      lsmFinishWriteTrans(pDb, 0);
    }
    dbReleaseClientSnapshot(pDb);
  }

  return rc;
}








|







736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
      TransMark *pMark = &pDb->aTrans[(iLevel==0 ? 0 : iLevel-1)];
      lsmTreeRollback(pDb, &pMark->tree);
      if( iLevel ) lsmLogSeek(pDb, &pMark->log);
      pDb->nTransOpen = iLevel;
    }

    if( pDb->nTransOpen==0 ){
      lsmFinishWriteTrans(pDb, 0, 0);
    }
    dbReleaseClientSnapshot(pDb);
  }

  return rc;
}

Changes to src/lsm_shared.c.
705
706
707
708
709
710
711











712
713
714
715
716
717
718
          assert( pDb->iReader>=0 );
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }
      if( rc==LSM_BUSY ) rc = LSM_OK;
    }











  }
  if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;

  return rc;
}

/*







>
>
>
>
>
>
>
>
>
>
>







705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
          assert( pDb->iReader>=0 );
        }else{
          rc = lsmReleaseReadlock(pDb);
        }
      }
      if( rc==LSM_BUSY ) rc = LSM_OK;
    }
#if 0
if( rc==LSM_OK && pDb->pClient ){
  printf("reading %p: snapshot:%d used-shmid:%d trans-id:%d iOldShmid=%d\n",
      (void *)pDb,
      (int)pDb->pClient->iId, (int)pDb->treehdr.iUsedShmid, 
      (int)pDb->treehdr.root.iTransId,
      (int)pDb->treehdr.iOldShmid
  );
  fflush(stdout);
}
#endif
  }
  if( pDb->pClient==0 && rc==LSM_OK ) rc = LSM_BUSY;

  return rc;
}

/*
795
796
797
798
799
800
801
802

803
804



805
806
807
808
809
810
811
812
813
** written into the log file when this function is called. Or, if the
** transaction was rolled back, both the log file and in-memory tree 
** structure have already been restored. In either case, this function 
** merely releases locks and other resources held by the write-transaction.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
int lsmFinishWriteTrans(lsm_db *pDb, int bCommit){

  lsmLogEnd(pDb, bCommit);
  lsmTreeEndTransaction(pDb, bCommit);



  lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
  return LSM_OK;
}


/*
** Return non-zero if the caller is holding the client mutex.
*/
#ifdef LSM_DEBUG







|
>


>
>
>

|







806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
** written into the log file when this function is called. Or, if the
** transaction was rolled back, both the log file and in-memory tree 
** structure have already been restored. In either case, this function 
** merely releases locks and other resources held by the write-transaction.
**
** LSM_OK is returned if successful, or an LSM error code otherwise.
*/
int lsmFinishWriteTrans(lsm_db *pDb, int bCommit, int nAutowork){
  int rc = LSM_OK;
  lsmLogEnd(pDb, bCommit);
  lsmTreeEndTransaction(pDb, bCommit);
  if( nAutowork ){
    rc = lsmSortedAutoWork(pDb, nAutowork);
  }
  lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
  return rc;
}


/*
** Return non-zero if the caller is holding the client mutex.
*/
#ifdef LSM_DEBUG
874
875
876
877
878
879
880



881
882
883
884
885
886
887
        db->iReader = i;
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }




  return rc;
}

/*
** This is used to check if there exists a read-lock locking a particular
** version of either the in-memory tree or database file. 
**







>
>
>







889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
        db->iReader = i;
      }else if( rc==LSM_BUSY ){
        rc = LSM_OK;
      }
    }
  }

  if( rc==LSM_OK && db->iReader<0 ){
    rc = LSM_BUSY;
  }
  return rc;
}

/*
** This is used to check if there exists a read-lock locking a particular
** version of either the in-memory tree or database file. 
**