Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Avoid reusing a block before it is guaranteed that it is not required even if a crash occurs.
Downloads: Tarball | ZIP archive
Timelines: family | ancestors | descendants | both | multi-process
Files: files | file ages | folders
SHA1: 5f9bb542f741d7c221b57bc7d5761b68b1ec6654
User & Date: dan 2012-08-31 20:26:40.872
Context
2012-09-01
09:11
Fix a bug in the test used to determine if a free block is ready for reuse. check-in: 3423d37092 user: dan tags: multi-process
2012-08-31
20:26
Avoid reusing a block before it is guaranteed that it is not required even if a crash occurs. check-in: 5f9bb542f7 user: dan tags: multi-process
18:43
Add DMS "lock". check-in: 5f4708d2e9 user: dan tags: multi-process
Changes
Unified Diff Ignore Whitespace Patch
Changes to src/lsm_ckpt.c.
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
** checkpoint.
*/
static int ckptChecksumOk(u32 *aCkpt){
  u32 nCkpt = aCkpt[CKPT_HDR_NCKPT];
  u32 cksum1;
  u32 cksum2;

  if( nCkpt<CKPT_HDR_NCKPT ) return 0;
  ckptChecksum(aCkpt, nCkpt, &cksum1, &cksum2);
  return (cksum1==aCkpt[nCkpt-2] && cksum2==aCkpt[nCkpt-1]);
}

/*
** Attempt to load a checkpoint from meta page iMeta.
**







|







774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
** checkpoint.
*/
static int ckptChecksumOk(u32 *aCkpt){
  u32 nCkpt = aCkpt[CKPT_HDR_NCKPT];
  u32 cksum1;
  u32 cksum2;

  if( nCkpt<CKPT_HDR_NCKPT || nCkpt>(LSM_META_PAGE_SIZE)/sizeof(u32) ) return 0;
  ckptChecksum(aCkpt, nCkpt, &cksum1, &cksum2);
  return (cksum1==aCkpt[nCkpt-2] && cksum2==aCkpt[nCkpt-1]);
}

/*
** Attempt to load a checkpoint from meta page iMeta.
**
1078
1079
1080
1081
1082
1083
1084






































1085
1086
1087
1088
1089
1090
1091
  memcpy(pShm->aWorker, p, n);
  lsmShmBarrier(pDb);
  memcpy(pShm->aClient, p, n);
  lsmFree(pDb->pEnv, p);

  return LSM_OK;
}







































/*
** Return the checkpoint-id of the checkpoint array passed as the first
** argument to this function. If the second argument is true, then assume
** that the checkpoint is made up of 32-bit big-endian integers. If it
** is false, assume that the integers are in machine byte order.
*/







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
  memcpy(pShm->aWorker, p, n);
  lsmShmBarrier(pDb);
  memcpy(pShm->aClient, p, n);
  lsmFree(pDb->pEnv, p);

  return LSM_OK;
}

int lsmCheckpointSynced(lsm_db *pDb, i64 *piId){
  int rc = LSM_OK;
  const int nAttempt = 3;
  int i;
  for(i=0; i<nAttempt; i++){
    MetaPage *pPg;
    u32 iMeta;

    iMeta = pDb->pShmhdr->iMetaPage;
    rc = lsmFsMetaPageGet(pDb->pFS, 0, iMeta, &pPg);
    if( rc==LSM_OK ){
      int nCkpt;
      int nData;
      u8 *aData; 

      aData = lsmFsMetaPageData(pPg, &nData);
      assert( nData==LSM_META_PAGE_SIZE );
      nCkpt = lsmGetU32(&aData[CKPT_HDR_NCKPT*sizeof(u32)]);

      if( nCkpt<(LSM_META_PAGE_SIZE/sizeof(u32)) ){
        u32 *aCopy = lsmMallocRc(pDb->pEnv, sizeof(u32) * nCkpt, &rc);
        if( aCopy ){
          memcpy(aCopy, aData, nCkpt*sizeof(u32));
          ckptChangeEndianness(aCopy, nCkpt);
          if( ckptChecksumOk(aCopy) ){
            *piId = lsmCheckpointId(aCopy, 0);
          }
          lsmFree(pDb->pEnv, aCopy);
        }
      }
      lsmFsMetaPageRelease(pPg);
    }
    if( rc!=LSM_OK || pDb->pShmhdr->iMetaPage==iMeta ) break;
  }

  return (rc==LSM_OK && i==3) ? LSM_BUSY : LSM_OK;
}

/*
** Return the checkpoint-id of the checkpoint array passed as the first
** argument to this function. If the second argument is true, then assume
** that the checkpoint is made up of 32-bit big-endian integers. If it
** is false, assume that the integers are in machine byte order.
*/
Changes to src/lsm_shared.c.
390
391
392
393
394
395
396



397





398
399
400
401
402
403
404
405
406
    /* The first block on the free list was freed as part of the work done
    ** to create the snapshot with id iFree. So, we can reuse this block if
    ** snapshot iFree or later has been checkpointed and all currently 
    ** active clients are reading from snapshot iFree or later.  */
    i64 iFree = pFree->aEntry[0].iId;
    int bInUse = 0;




    /* TODO: The "has been checkpointed" bit */






    rc = lsmLsmInUse(pDb, iFree, &bInUse);
    if( rc==LSM_OK && bInUse==0 ){
      iRet = pFree->aEntry[0].iBlk;
      flRemoveEntry0(pFree);
      assert( iRet!=0 );
    }
  }








>
>
>
|
>
>
>
>
>
|
|







390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
    /* The first block on the free list was freed as part of the work done
    ** to create the snapshot with id iFree. So, we can reuse this block if
    ** snapshot iFree or later has been checkpointed and all currently 
    ** active clients are reading from snapshot iFree or later.  */
    i64 iFree = pFree->aEntry[0].iId;
    int bInUse = 0;

    /* The "is in use" bit */
    rc = lsmLsmInUse(pDb, iFree, &bInUse);

    /* The "has been checkpointed" bit */
    if( rc==LSM_OK && bInUse==0 ){
      i64 iId = 0;
      rc = lsmCheckpointSynced(pDb, &iId);
      if( rc==LSM_OK && iId<iFree ) bInUse = 1;
      if( rc==LSM_BUSY ) rc = LSM_OK;
    }

    if( rc==LSM_OK && bInUse==0 ){
      iRet = pFree->aEntry[0].iBlk;
      flRemoveEntry0(pFree);
      assert( iRet!=0 );
    }
  }