Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add a missing unlock(WRITER) call to lsmCheckpointWrite().
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | multi-process
Files: files | file ages | folders
SHA1: e97d02f06850b13c5f41a5fa6d73d9d81776e0ea
User & Date: dan 2012-09-03 17:57:56.347
Context
2012-09-03
19:26
Fix a bug occuring when the last connection to disconnect from a database is not also the last to write to it. check-in: 75de95787f user: dan tags: multi-process
17:57
Add a missing unlock(WRITER) call to lsmCheckpointWrite(). check-in: e97d02f068 user: dan tags: multi-process
14:50
Fix a problem with growing the database file in mmap mode. check-in: 1a86aa1c6b user: dan tags: multi-process
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsm_ckpt.c.
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
  int bCksum,                     /* If true, include checksums */
  void **ppCkpt,                  /* OUT: Buffer containing checkpoint */
  int *pnCkpt                     /* OUT: Size of checkpoint in bytes */
){
  int rc = LSM_OK;                /* Return Code */
  FileSystem *pFS = pDb->pFS;     /* File system object */
  Snapshot *pSnap = pDb->pWorker; /* Worker snapshot */
  int nAll = 0;                   /* Number of levels in db */
  int nHdrLevel = 0;              /* Number of levels in checkpoint */
  int iLevel;                     /* Used to count out nHdrLevel levels */
  int iOut = 0;                   /* Current offset in aCkpt[] */
  Level *pLevel;                  /* Level iterator */
  int i;                          /* Iterator used while serializing freelist */
  CkptBuffer ckpt;
  int nFree;
 
  nFree = pSnap->freelist.nEntry;







<
|
|







366
367
368
369
370
371
372

373
374
375
376
377
378
379
380
381
  int bCksum,                     /* If true, include checksums */
  void **ppCkpt,                  /* OUT: Buffer containing checkpoint */
  int *pnCkpt                     /* OUT: Size of checkpoint in bytes */
){
  int rc = LSM_OK;                /* Return Code */
  FileSystem *pFS = pDb->pFS;     /* File system object */
  Snapshot *pSnap = pDb->pWorker; /* Worker snapshot */

  int nLevel = 0;                 /* Number of levels in checkpoint */
  int iLevel;                     /* Used to count out nLevel levels */
  int iOut = 0;                   /* Current offset in aCkpt[] */
  Level *pLevel;                  /* Level iterator */
  int i;                          /* Iterator used while serializing freelist */
  CkptBuffer ckpt;
  int nFree;
 
  nFree = pSnap->freelist.nEntry;
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
  /* Write the log offset into the checkpoint. */
  ckptExportLog(pDb, bLog, &ckpt, &iOut, &rc);

  /* Write the append-point list */
  ckptExportAppendlist(pDb, &ckpt, &iOut, &rc);

  /* Figure out how many levels will be written to the checkpoint. */
  for(pLevel=lsmDbSnapshotLevel(pSnap); pLevel; pLevel=pLevel->pNext) nAll++;
  nHdrLevel = nAll;
  assert( nHdrLevel>0 );

  /* Serialize nHdrLevel levels. */
  iLevel = 0;
  for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nHdrLevel; pLevel=pLevel->pNext){
    ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
    iLevel++;
  }

  /* Write the freelist */
  if( rc==LSM_OK ){
    ckptSetValue(&ckpt, iOut++, nFree, &rc);







|
<
<

|

|







393
394
395
396
397
398
399
400


401
402
403
404
405
406
407
408
409
410
411
  /* Write the log offset into the checkpoint. */
  ckptExportLog(pDb, bLog, &ckpt, &iOut, &rc);

  /* Write the append-point list */
  ckptExportAppendlist(pDb, &ckpt, &iOut, &rc);

  /* Figure out how many levels will be written to the checkpoint. */
  for(pLevel=lsmDbSnapshotLevel(pSnap); pLevel; pLevel=pLevel->pNext) nLevel++;



  /* Serialize nLevel levels. */
  iLevel = 0;
  for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nLevel; pLevel=pLevel->pNext){
    ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
    iLevel++;
  }

  /* Write the freelist */
  if( rc==LSM_OK ){
    ckptSetValue(&ckpt, iOut++, nFree, &rc);
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
  /* Write the checkpoint header */
  assert( iId>=0 );
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nHdrLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, nOvfl, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);







|







420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
  /* Write the checkpoint header */
  assert( iId>=0 );
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, nOvfl, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);
Changes to src/lsm_shared.c.
510
511
512
513
514
515
516
517

518
519

520

521

522
523
524




525
526
527
528
529
530
531
      if( rc==LSM_OK ) pShm->iMetaPage = iMeta;
    }
  }

  /* If no error has occured, then the snapshot currently in pDb->aSnapshot
  ** has been synced to disk. This means it may be possible to wrap the
  ** log file. Obtain the WRITER lock and update the relevent tree-header
  ** fields to reflect this.  */

  if( rc==LSM_OK ){
    u64 iLogoff = lsmCheckpointLogOffset(pDb->aSnapshot);

    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);

    if( rc==LSM_OK ) rc = lsmTreeLoadHeader(pDb);

    if( rc==LSM_OK ) lsmLogCheckpoint(pDb, iLogoff);
    if( rc==LSM_OK ) lsmTreeEndTransaction(pDb, 1);
    if( rc==LSM_BUSY ) rc = LSM_OK;




  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  return rc;
}

int lsmBeginWork(lsm_db *pDb){







|
>


>
|
>
|
>
|
|
|
>
>
>
>







510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
      if( rc==LSM_OK ) pShm->iMetaPage = iMeta;
    }
  }

  /* If no error has occured, then the snapshot currently in pDb->aSnapshot
  ** has been synced to disk. This means it may be possible to wrap the
  ** log file. Obtain the WRITER lock and update the relevent tree-header
  ** fields to reflect this. 
  */
  if( rc==LSM_OK ){
    u64 iLogoff = lsmCheckpointLogOffset(pDb->aSnapshot);
    if( pDb->nTransOpen==0 ){
      rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
    }
    if( rc==LSM_OK ){
      rc = lsmTreeLoadHeader(pDb);
      if( rc==LSM_OK ) lsmLogCheckpoint(pDb, iLogoff);
      if( rc==LSM_OK ) lsmTreeEndTransaction(pDb, 1);
      if( rc==LSM_BUSY ) rc = LSM_OK;
      if( pDb->nTransOpen==0 ){
        rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
      }
    }
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  return rc;
}

int lsmBeginWork(lsm_db *pDb){
667
668
669
670
671
672
673

674
675
676
677
678
679
680

  /* If there is no read-transaction open, open one now. */
  rc = lsmBeginReadTrans(pDb);

  /* Attempt to take the WRITER lock */
  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);

  }

  /* If the previous writer failed mid-transaction, run emergency rollback. */
  if( rc==LSM_OK && pShm->bWriter ){
    /* TODO: This! */
    assert( 0 );
    rc = LSM_CORRUPT_BKPT;







>







675
676
677
678
679
680
681
682
683
684
685
686
687
688
689

  /* If there is no read-transaction open, open one now. */
  rc = lsmBeginReadTrans(pDb);

  /* Attempt to take the WRITER lock */
  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
  assert( rc!=LSM_BUSY );
  }

  /* If the previous writer failed mid-transaction, run emergency rollback. */
  if( rc==LSM_OK && pShm->bWriter ){
    /* TODO: This! */
    assert( 0 );
    rc = LSM_CORRUPT_BKPT;
942
943
944
945
946
947
948










949
950
951
952
953
954
955
956
957
958
959
960
961
962
963

964
965
966
967
968


969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989






























990
991
992
993
994
995
996
997
998
  }
  lsmMutexLeave(db->pEnv, p->pClientMutex);

  return rc;
}

#ifdef LSM_DEBUG










/*
** The arguments passed to this function are similar to those passed to
** the lsmShmLock() function. However, instead of obtaining a new lock 
** this function returns true if the specified connection already holds 
** (or does not hold) such a lock, depending on the value of eOp. As
** follows:
**
**   (eOp==LSM_LOCK_UNLOCK) -> true if db has no lock on iLock
**   (eOp==LSM_LOCK_SHARED) -> true if db has at least a SHARED lock on iLock.
**   (eOp==LSM_LOCK_EXCL)   -> true if db has an EXCLUSIVE lock on iLock.
*/
int lsmShmAssertLock(lsm_db *db, int iLock, int eOp){
  const u32 me = (1 << (iLock-1));
  const u32 ms = (1 << (iLock+16-1));
  int ret;


  assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
  assert( iLock<=16 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );



  switch( eOp ){
    case LSM_LOCK_UNLOCK:
      ret = (db->mLock & (me|ms))==0;
      break;
    case LSM_LOCK_SHARED:
      ret = db->mLock & (me|ms);
      break;
    case LSM_LOCK_EXCL:
      ret = (db->mLock & me);
      break;
    default:
      assert( !"bad eOp value passed to lsmShmAssertLock()" );
      break;
  }

  return ret;
}

int lsmShmAssertWorker(lsm_db *db){
  return lsmShmAssertLock(db, LSM_LOCK_WORKER, LSM_LOCK_EXCL) && db->pWorker;
}






























#endif

void lsmShmBarrier(lsm_db *db){
  /* TODO */
}











>
>
>
>
>
>
>
>
>
>















>





>
>


|


|


|












>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>









951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
  }
  lsmMutexLeave(db->pEnv, p->pClientMutex);

  return rc;
}

#ifdef LSM_DEBUG

int shmLockType(lsm_db *db, int iLock){
  const u32 me = (1 << (iLock-1));
  const u32 ms = (1 << (iLock+16-1));

  if( db->mLock & me ) return LSM_LOCK_EXCL;
  if( db->mLock & ms ) return LSM_LOCK_SHARED;
  return LSM_LOCK_UNLOCK;
}

/*
** The arguments passed to this function are similar to those passed to
** the lsmShmLock() function. However, instead of obtaining a new lock 
** this function returns true if the specified connection already holds 
** (or does not hold) such a lock, depending on the value of eOp. As
** follows:
**
**   (eOp==LSM_LOCK_UNLOCK) -> true if db has no lock on iLock
**   (eOp==LSM_LOCK_SHARED) -> true if db has at least a SHARED lock on iLock.
**   (eOp==LSM_LOCK_EXCL)   -> true if db has an EXCLUSIVE lock on iLock.
*/
int lsmShmAssertLock(lsm_db *db, int iLock, int eOp){
  const u32 me = (1 << (iLock-1));
  const u32 ms = (1 << (iLock+16-1));
  int ret;
  int eHave;

  assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
  assert( iLock<=16 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );

  eHave = shmLockType(db, iLock);

  switch( eOp ){
    case LSM_LOCK_UNLOCK:
      ret = (eHave==LSM_LOCK_UNLOCK);
      break;
    case LSM_LOCK_SHARED:
      ret = (eHave!=LSM_LOCK_UNLOCK);
      break;
    case LSM_LOCK_EXCL:
      ret = (eHave==LSM_LOCK_EXCL);
      break;
    default:
      assert( !"bad eOp value passed to lsmShmAssertLock()" );
      break;
  }

  return ret;
}

int lsmShmAssertWorker(lsm_db *db){
  return lsmShmAssertLock(db, LSM_LOCK_WORKER, LSM_LOCK_EXCL) && db->pWorker;
}

/*
** This function does not contribute to library functionality, and is not
** included in release builds. It is intended to be called from within
** an interactive debugger.
**
** When called, this function prints a single line of human readable output
** to stdout describing the locks currently held by the connection. For 
** example:
**
**     (gdb) call print_db_locks(pDb)
**     (shared on dms2) (exclusive on writer) 
*/
void print_db_locks(lsm_db *db){
  int iLock;
  for(iLock=0; iLock<16; iLock++){
    int bOne = 0;
    const char *azLock[] = {0, "shared", "exclusive"};
    const char *azName[] = {
      0, "dms1", "dms2", "writer", "worker", "checkpointer",
      "reader0", "reader1", "reader2", "reader3", "reader4", "reader5"
    };
    int eHave = shmLockType(db, iLock);
    if( azLock[eHave] ){
      printf("%s(%s on %s)", (bOne?" ":""), azLock[eHave], azName[iLock]);
      bOne = 1;
    }
  }
  printf("\n");
}
#endif

void lsmShmBarrier(lsm_db *db){
  /* TODO */
}