SQLite4
Check-in [e97d02f068]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add a missing unlock(WRITER) call to lsmCheckpointWrite().
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | multi-process
Files: files | file ages | folders
SHA1: e97d02f06850b13c5f41a5fa6d73d9d81776e0ea
User & Date: dan 2012-09-03 17:57:56
Context
2012-09-03
19:26
Fix a bug occuring when the last connection to disconnect from a database is not also the last to write to it. check-in: 75de95787f user: dan tags: multi-process
17:57
Add a missing unlock(WRITER) call to lsmCheckpointWrite(). check-in: e97d02f068 user: dan tags: multi-process
14:50
Fix a problem with growing the database file in mmap mode. check-in: 1a86aa1c6b user: dan tags: multi-process
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to src/lsm_ckpt.c.

366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
...
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
...
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
  int bCksum,                     /* If true, include checksums */
  void **ppCkpt,                  /* OUT: Buffer containing checkpoint */
  int *pnCkpt                     /* OUT: Size of checkpoint in bytes */
){
  int rc = LSM_OK;                /* Return Code */
  FileSystem *pFS = pDb->pFS;     /* File system object */
  Snapshot *pSnap = pDb->pWorker; /* Worker snapshot */
  int nAll = 0;                   /* Number of levels in db */
  int nHdrLevel = 0;              /* Number of levels in checkpoint */
  int iLevel;                     /* Used to count out nHdrLevel levels */
  int iOut = 0;                   /* Current offset in aCkpt[] */
  Level *pLevel;                  /* Level iterator */
  int i;                          /* Iterator used while serializing freelist */
  CkptBuffer ckpt;
  int nFree;
 
  nFree = pSnap->freelist.nEntry;
................................................................................
  /* Write the log offset into the checkpoint. */
  ckptExportLog(pDb, bLog, &ckpt, &iOut, &rc);

  /* Write the append-point list */
  ckptExportAppendlist(pDb, &ckpt, &iOut, &rc);

  /* Figure out how many levels will be written to the checkpoint. */
  for(pLevel=lsmDbSnapshotLevel(pSnap); pLevel; pLevel=pLevel->pNext) nAll++;
  nHdrLevel = nAll;
  assert( nHdrLevel>0 );

  /* Serialize nHdrLevel levels. */
  iLevel = 0;
  for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nHdrLevel; pLevel=pLevel->pNext){
    ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
    iLevel++;
  }

  /* Write the freelist */
  if( rc==LSM_OK ){
    ckptSetValue(&ckpt, iOut++, nFree, &rc);
................................................................................
  /* Write the checkpoint header */
  assert( iId>=0 );
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nHdrLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, nOvfl, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);







<
|
|







 







|
<
<

|

|







 







|







366
367
368
369
370
371
372

373
374
375
376
377
378
379
380
381
...
393
394
395
396
397
398
399
400


401
402
403
404
405
406
407
408
409
410
411
...
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
  int bCksum,                     /* If true, include checksums */
  void **ppCkpt,                  /* OUT: Buffer containing checkpoint */
  int *pnCkpt                     /* OUT: Size of checkpoint in bytes */
){
  int rc = LSM_OK;                /* Return Code */
  FileSystem *pFS = pDb->pFS;     /* File system object */
  Snapshot *pSnap = pDb->pWorker; /* Worker snapshot */

  int nLevel = 0;                 /* Number of levels in checkpoint */
  int iLevel;                     /* Used to count out nLevel levels */
  int iOut = 0;                   /* Current offset in aCkpt[] */
  Level *pLevel;                  /* Level iterator */
  int i;                          /* Iterator used while serializing freelist */
  CkptBuffer ckpt;
  int nFree;
 
  nFree = pSnap->freelist.nEntry;
................................................................................
  /* Write the log offset into the checkpoint. */
  ckptExportLog(pDb, bLog, &ckpt, &iOut, &rc);

  /* Write the append-point list */
  ckptExportAppendlist(pDb, &ckpt, &iOut, &rc);

  /* Figure out how many levels will be written to the checkpoint. */
  for(pLevel=lsmDbSnapshotLevel(pSnap); pLevel; pLevel=pLevel->pNext) nLevel++;



  /* Serialize nLevel levels. */
  iLevel = 0;
  for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nLevel; pLevel=pLevel->pNext){
    ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
    iLevel++;
  }

  /* Write the freelist */
  if( rc==LSM_OK ){
    ckptSetValue(&ckpt, iOut++, nFree, &rc);
................................................................................
  /* Write the checkpoint header */
  assert( iId>=0 );
  ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
  ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
  ckptSetValue(&ckpt, CKPT_HDR_OVFL, nOvfl, &rc);

  if( bCksum ){
    ckptAddChecksum(&ckpt, iOut, &rc);
  }else{
    ckptSetValue(&ckpt, iOut, 0, &rc);

Changes to src/lsm_shared.c.

510
511
512
513
514
515
516
517

518
519

520


521
522
523
524




525
526
527
528
529
530
531
...
667
668
669
670
671
672
673

674
675
676
677
678
679
680
...
942
943
944
945
946
947
948










949
950
951
952
953
954
955
...
957
958
959
960
961
962
963

964
965
966
967
968


969
970
971

972
973
974

975
976
977

978
979
980
981
982
983
984
985
986
987
988
989






























990
991
992
993
994
995
996
997
998
      if( rc==LSM_OK ) pShm->iMetaPage = iMeta;
    }
  }

  /* If no error has occured, then the snapshot currently in pDb->aSnapshot
  ** has been synced to disk. This means it may be possible to wrap the
  ** log file. Obtain the WRITER lock and update the relevent tree-header
  ** fields to reflect this.  */

  if( rc==LSM_OK ){
    u64 iLogoff = lsmCheckpointLogOffset(pDb->aSnapshot);

    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);


    if( rc==LSM_OK ) rc = lsmTreeLoadHeader(pDb);
    if( rc==LSM_OK ) lsmLogCheckpoint(pDb, iLogoff);
    if( rc==LSM_OK ) lsmTreeEndTransaction(pDb, 1);
    if( rc==LSM_BUSY ) rc = LSM_OK;




  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  return rc;
}

int lsmBeginWork(lsm_db *pDb){
................................................................................

  /* If there is no read-transaction open, open one now. */
  rc = lsmBeginReadTrans(pDb);

  /* Attempt to take the WRITER lock */
  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);

  }

  /* If the previous writer failed mid-transaction, run emergency rollback. */
  if( rc==LSM_OK && pShm->bWriter ){
    /* TODO: This! */
    assert( 0 );
    rc = LSM_CORRUPT_BKPT;
................................................................................
  }
  lsmMutexLeave(db->pEnv, p->pClientMutex);

  return rc;
}

#ifdef LSM_DEBUG










/*
** The arguments passed to this function are similar to those passed to
** the lsmShmLock() function. However, instead of obtaining a new lock 
** this function returns true if the specified connection already holds 
** (or does not hold) such a lock, depending on the value of eOp. As
** follows:
**
................................................................................
**   (eOp==LSM_LOCK_SHARED) -> true if db has at least a SHARED lock on iLock.
**   (eOp==LSM_LOCK_EXCL)   -> true if db has an EXCLUSIVE lock on iLock.
*/
int lsmShmAssertLock(lsm_db *db, int iLock, int eOp){
  const u32 me = (1 << (iLock-1));
  const u32 ms = (1 << (iLock+16-1));
  int ret;


  assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
  assert( iLock<=16 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );



  switch( eOp ){
    case LSM_LOCK_UNLOCK:
      ret = (db->mLock & (me|ms))==0;

      break;
    case LSM_LOCK_SHARED:
      ret = db->mLock & (me|ms);

      break;
    case LSM_LOCK_EXCL:
      ret = (db->mLock & me);

      break;
    default:
      assert( !"bad eOp value passed to lsmShmAssertLock()" );
      break;
  }

  return ret;
}

int lsmShmAssertWorker(lsm_db *db){
  return lsmShmAssertLock(db, LSM_LOCK_WORKER, LSM_LOCK_EXCL) && db->pWorker;
}






























#endif

void lsmShmBarrier(lsm_db *db){
  /* TODO */
}











|
>


>
|
>
>
|
|
|
|
>
>
>
>







 







>







 







>
>
>
>
>
>
>
>
>
>







 







>





>
>


<
>


<
>


<
>












>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>









510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
...
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
...
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
...
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992

993
994
995

996
997
998

999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
      if( rc==LSM_OK ) pShm->iMetaPage = iMeta;
    }
  }

  /* If no error has occured, then the snapshot currently in pDb->aSnapshot
  ** has been synced to disk. This means it may be possible to wrap the
  ** log file. Obtain the WRITER lock and update the relevent tree-header
  ** fields to reflect this. 
  */
  if( rc==LSM_OK ){
    u64 iLogoff = lsmCheckpointLogOffset(pDb->aSnapshot);
    if( pDb->nTransOpen==0 ){
      rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
    }
    if( rc==LSM_OK ){
      rc = lsmTreeLoadHeader(pDb);
      if( rc==LSM_OK ) lsmLogCheckpoint(pDb, iLogoff);
      if( rc==LSM_OK ) lsmTreeEndTransaction(pDb, 1);
      if( rc==LSM_BUSY ) rc = LSM_OK;
      if( pDb->nTransOpen==0 ){
        rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
      }
    }
  }

  lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
  return rc;
}

int lsmBeginWork(lsm_db *pDb){
................................................................................

  /* If there is no read-transaction open, open one now. */
  rc = lsmBeginReadTrans(pDb);

  /* Attempt to take the WRITER lock */
  if( rc==LSM_OK ){
    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
  assert( rc!=LSM_BUSY );
  }

  /* If the previous writer failed mid-transaction, run emergency rollback. */
  if( rc==LSM_OK && pShm->bWriter ){
    /* TODO: This! */
    assert( 0 );
    rc = LSM_CORRUPT_BKPT;
................................................................................
  }
  lsmMutexLeave(db->pEnv, p->pClientMutex);

  return rc;
}

#ifdef LSM_DEBUG

int shmLockType(lsm_db *db, int iLock){
  const u32 me = (1 << (iLock-1));
  const u32 ms = (1 << (iLock+16-1));

  if( db->mLock & me ) return LSM_LOCK_EXCL;
  if( db->mLock & ms ) return LSM_LOCK_SHARED;
  return LSM_LOCK_UNLOCK;
}

/*
** The arguments passed to this function are similar to those passed to
** the lsmShmLock() function. However, instead of obtaining a new lock 
** this function returns true if the specified connection already holds 
** (or does not hold) such a lock, depending on the value of eOp. As
** follows:
**
................................................................................
**   (eOp==LSM_LOCK_SHARED) -> true if db has at least a SHARED lock on iLock.
**   (eOp==LSM_LOCK_EXCL)   -> true if db has an EXCLUSIVE lock on iLock.
*/
int lsmShmAssertLock(lsm_db *db, int iLock, int eOp){
  const u32 me = (1 << (iLock-1));
  const u32 ms = (1 << (iLock+16-1));
  int ret;
  int eHave;

  assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
  assert( iLock<=16 );
  assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );

  eHave = shmLockType(db, iLock);

  switch( eOp ){
    case LSM_LOCK_UNLOCK:

      ret = (eHave==LSM_LOCK_UNLOCK);
      break;
    case LSM_LOCK_SHARED:

      ret = (eHave!=LSM_LOCK_UNLOCK);
      break;
    case LSM_LOCK_EXCL:

      ret = (eHave==LSM_LOCK_EXCL);
      break;
    default:
      assert( !"bad eOp value passed to lsmShmAssertLock()" );
      break;
  }

  return ret;
}

int lsmShmAssertWorker(lsm_db *db){
  return lsmShmAssertLock(db, LSM_LOCK_WORKER, LSM_LOCK_EXCL) && db->pWorker;
}

/*
** This function does not contribute to library functionality, and is not
** included in release builds. It is intended to be called from within
** an interactive debugger.
**
** When called, this function prints a single line of human readable output
** to stdout describing the locks currently held by the connection. For 
** example:
**
**     (gdb) call print_db_locks(pDb)
**     (shared on dms2) (exclusive on writer) 
*/
void print_db_locks(lsm_db *db){
  int iLock;
  for(iLock=0; iLock<16; iLock++){
    int bOne = 0;
    const char *azLock[] = {0, "shared", "exclusive"};
    const char *azName[] = {
      0, "dms1", "dms2", "writer", "worker", "checkpointer",
      "reader0", "reader1", "reader2", "reader3", "reader4", "reader5"
    };
    int eHave = shmLockType(db, iLock);
    if( azLock[eHave] ){
      printf("%s(%s on %s)", (bOne?" ":""), azLock[eHave], azName[iLock]);
      bOne = 1;
    }
  }
  printf("\n");
}
#endif

void lsmShmBarrier(lsm_db *db){
  /* TODO */
}