SQLite4
Check-in [e97d02f068]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add a missing unlock(WRITER) call to lsmCheckpointWrite().
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | multi-process
Files: files | file ages | folders
SHA1: e97d02f06850b13c5f41a5fa6d73d9d81776e0ea
User & Date: dan 2012-09-03 17:57:56
Context
2012-09-03
19:26
Fix a bug occuring when the last connection to disconnect from a database is not also the last to write to it. check-in: 75de95787f user: dan tags: multi-process
17:57
Add a missing unlock(WRITER) call to lsmCheckpointWrite(). check-in: e97d02f068 user: dan tags: multi-process
14:50
Fix a problem with growing the database file in mmap mode. check-in: 1a86aa1c6b user: dan tags: multi-process
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/lsm_ckpt.c.

   366    366     int bCksum,                     /* If true, include checksums */
   367    367     void **ppCkpt,                  /* OUT: Buffer containing checkpoint */
   368    368     int *pnCkpt                     /* OUT: Size of checkpoint in bytes */
   369    369   ){
   370    370     int rc = LSM_OK;                /* Return Code */
   371    371     FileSystem *pFS = pDb->pFS;     /* File system object */
   372    372     Snapshot *pSnap = pDb->pWorker; /* Worker snapshot */
   373         -  int nAll = 0;                   /* Number of levels in db */
   374         -  int nHdrLevel = 0;              /* Number of levels in checkpoint */
   375         -  int iLevel;                     /* Used to count out nHdrLevel levels */
          373  +  int nLevel = 0;                 /* Number of levels in checkpoint */
          374  +  int iLevel;                     /* Used to count out nLevel levels */
   376    375     int iOut = 0;                   /* Current offset in aCkpt[] */
   377    376     Level *pLevel;                  /* Level iterator */
   378    377     int i;                          /* Iterator used while serializing freelist */
   379    378     CkptBuffer ckpt;
   380    379     int nFree;
   381    380    
   382    381     nFree = pSnap->freelist.nEntry;
................................................................................
   394    393     /* Write the log offset into the checkpoint. */
   395    394     ckptExportLog(pDb, bLog, &ckpt, &iOut, &rc);
   396    395   
   397    396     /* Write the append-point list */
   398    397     ckptExportAppendlist(pDb, &ckpt, &iOut, &rc);
   399    398   
   400    399     /* Figure out how many levels will be written to the checkpoint. */
   401         -  for(pLevel=lsmDbSnapshotLevel(pSnap); pLevel; pLevel=pLevel->pNext) nAll++;
   402         -  nHdrLevel = nAll;
   403         -  assert( nHdrLevel>0 );
          400  +  for(pLevel=lsmDbSnapshotLevel(pSnap); pLevel; pLevel=pLevel->pNext) nLevel++;
   404    401   
   405         -  /* Serialize nHdrLevel levels. */
          402  +  /* Serialize nLevel levels. */
   406    403     iLevel = 0;
   407         -  for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nHdrLevel; pLevel=pLevel->pNext){
          404  +  for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nLevel; pLevel=pLevel->pNext){
   408    405       ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
   409    406       iLevel++;
   410    407     }
   411    408   
   412    409     /* Write the freelist */
   413    410     if( rc==LSM_OK ){
   414    411       ckptSetValue(&ckpt, iOut++, nFree, &rc);
................................................................................
   423    420     /* Write the checkpoint header */
   424    421     assert( iId>=0 );
   425    422     ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
   426    423     ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
   427    424     ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
   428    425     ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
   429    426     ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
   430         -  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nHdrLevel, &rc);
          427  +  ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
   431    428     ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
   432    429     ckptSetValue(&ckpt, CKPT_HDR_OVFL, nOvfl, &rc);
   433    430   
   434    431     if( bCksum ){
   435    432       ckptAddChecksum(&ckpt, iOut, &rc);
   436    433     }else{
   437    434       ckptSetValue(&ckpt, iOut, 0, &rc);

Changes to src/lsm_shared.c.

   510    510         if( rc==LSM_OK ) pShm->iMetaPage = iMeta;
   511    511       }
   512    512     }
   513    513   
   514    514     /* If no error has occured, then the snapshot currently in pDb->aSnapshot
   515    515     ** has been synced to disk. This means it may be possible to wrap the
   516    516     ** log file. Obtain the WRITER lock and update the relevent tree-header
   517         -  ** fields to reflect this.  */
          517  +  ** fields to reflect this. 
          518  +  */
   518    519     if( rc==LSM_OK ){
   519    520       u64 iLogoff = lsmCheckpointLogOffset(pDb->aSnapshot);
   520         -    rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
   521         -    if( rc==LSM_OK ) rc = lsmTreeLoadHeader(pDb);
   522         -    if( rc==LSM_OK ) lsmLogCheckpoint(pDb, iLogoff);
   523         -    if( rc==LSM_OK ) lsmTreeEndTransaction(pDb, 1);
   524         -    if( rc==LSM_BUSY ) rc = LSM_OK;
          521  +    if( pDb->nTransOpen==0 ){
          522  +      rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
          523  +    }
          524  +    if( rc==LSM_OK ){
          525  +      rc = lsmTreeLoadHeader(pDb);
          526  +      if( rc==LSM_OK ) lsmLogCheckpoint(pDb, iLogoff);
          527  +      if( rc==LSM_OK ) lsmTreeEndTransaction(pDb, 1);
          528  +      if( rc==LSM_BUSY ) rc = LSM_OK;
          529  +      if( pDb->nTransOpen==0 ){
          530  +        rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_UNLOCK, 0);
          531  +      }
          532  +    }
   525    533     }
   526    534   
   527    535     lsmShmLock(pDb, LSM_LOCK_CHECKPOINTER, LSM_LOCK_UNLOCK, 0);
   528    536     return rc;
   529    537   }
   530    538   
   531    539   int lsmBeginWork(lsm_db *pDb){
................................................................................
   667    675   
   668    676     /* If there is no read-transaction open, open one now. */
   669    677     rc = lsmBeginReadTrans(pDb);
   670    678   
   671    679     /* Attempt to take the WRITER lock */
   672    680     if( rc==LSM_OK ){
   673    681       rc = lsmShmLock(pDb, LSM_LOCK_WRITER, LSM_LOCK_EXCL, 0);
          682  +  assert( rc!=LSM_BUSY );
   674    683     }
   675    684   
   676    685     /* If the previous writer failed mid-transaction, run emergency rollback. */
   677    686     if( rc==LSM_OK && pShm->bWriter ){
   678    687       /* TODO: This! */
   679    688       assert( 0 );
   680    689       rc = LSM_CORRUPT_BKPT;
................................................................................
   942    951     }
   943    952     lsmMutexLeave(db->pEnv, p->pClientMutex);
   944    953   
   945    954     return rc;
   946    955   }
   947    956   
   948    957   #ifdef LSM_DEBUG
          958  +
          959  +int shmLockType(lsm_db *db, int iLock){
          960  +  const u32 me = (1 << (iLock-1));
          961  +  const u32 ms = (1 << (iLock+16-1));
          962  +
          963  +  if( db->mLock & me ) return LSM_LOCK_EXCL;
          964  +  if( db->mLock & ms ) return LSM_LOCK_SHARED;
          965  +  return LSM_LOCK_UNLOCK;
          966  +}
          967  +
   949    968   /*
   950    969   ** The arguments passed to this function are similar to those passed to
   951    970   ** the lsmShmLock() function. However, instead of obtaining a new lock 
   952    971   ** this function returns true if the specified connection already holds 
   953    972   ** (or does not hold) such a lock, depending on the value of eOp. As
   954    973   ** follows:
   955    974   **
................................................................................
   957    976   **   (eOp==LSM_LOCK_SHARED) -> true if db has at least a SHARED lock on iLock.
   958    977   **   (eOp==LSM_LOCK_EXCL)   -> true if db has an EXCLUSIVE lock on iLock.
   959    978   */
   960    979   int lsmShmAssertLock(lsm_db *db, int iLock, int eOp){
   961    980     const u32 me = (1 << (iLock-1));
   962    981     const u32 ms = (1 << (iLock+16-1));
   963    982     int ret;
          983  +  int eHave;
   964    984   
   965    985     assert( iLock>=1 && iLock<=LSM_LOCK_READER(LSM_LOCK_NREADER-1) );
   966    986     assert( iLock<=16 );
   967    987     assert( eOp==LSM_LOCK_UNLOCK || eOp==LSM_LOCK_SHARED || eOp==LSM_LOCK_EXCL );
   968    988   
          989  +  eHave = shmLockType(db, iLock);
          990  +
   969    991     switch( eOp ){
   970    992       case LSM_LOCK_UNLOCK:
   971         -      ret = (db->mLock & (me|ms))==0;
          993  +      ret = (eHave==LSM_LOCK_UNLOCK);
   972    994         break;
   973    995       case LSM_LOCK_SHARED:
   974         -      ret = db->mLock & (me|ms);
          996  +      ret = (eHave!=LSM_LOCK_UNLOCK);
   975    997         break;
   976    998       case LSM_LOCK_EXCL:
   977         -      ret = (db->mLock & me);
          999  +      ret = (eHave==LSM_LOCK_EXCL);
   978   1000         break;
   979   1001       default:
   980   1002         assert( !"bad eOp value passed to lsmShmAssertLock()" );
   981   1003         break;
   982   1004     }
   983   1005   
   984   1006     return ret;
   985   1007   }
   986   1008   
   987   1009   int lsmShmAssertWorker(lsm_db *db){
   988   1010     return lsmShmAssertLock(db, LSM_LOCK_WORKER, LSM_LOCK_EXCL) && db->pWorker;
   989   1011   }
         1012  +
         1013  +/*
         1014  +** This function does not contribute to library functionality, and is not
         1015  +** included in release builds. It is intended to be called from within
         1016  +** an interactive debugger.
         1017  +**
         1018  +** When called, this function prints a single line of human readable output
         1019  +** to stdout describing the locks currently held by the connection. For 
         1020  +** example:
         1021  +**
         1022  +**     (gdb) call print_db_locks(pDb)
         1023  +**     (shared on dms2) (exclusive on writer) 
         1024  +*/
         1025  +void print_db_locks(lsm_db *db){
         1026  +  int iLock;
         1027  +  for(iLock=0; iLock<16; iLock++){
         1028  +    int bOne = 0;
         1029  +    const char *azLock[] = {0, "shared", "exclusive"};
         1030  +    const char *azName[] = {
         1031  +      0, "dms1", "dms2", "writer", "worker", "checkpointer",
         1032  +      "reader0", "reader1", "reader2", "reader3", "reader4", "reader5"
         1033  +    };
         1034  +    int eHave = shmLockType(db, iLock);
         1035  +    if( azLock[eHave] ){
         1036  +      printf("%s(%s on %s)", (bOne?" ":""), azLock[eHave], azName[iLock]);
         1037  +      bOne = 1;
         1038  +    }
         1039  +  }
         1040  +  printf("\n");
         1041  +}
   990   1042   #endif
   991   1043   
   992   1044   void lsmShmBarrier(lsm_db *db){
   993   1045     /* TODO */
   994   1046   }
   995   1047   
   996   1048   
   997   1049   
   998   1050