SQLite4
Check-in [dc4fa92596]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Truncate the database file when the number of connections drops from one to zero.
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1: dc4fa9259613d17dbb6dc4c87dfc3c5400b2c868
User & Date: dan 2013-01-18 10:46:03
Context
2013-01-18
15:58
Display the 'age' of each level in lsmview. check-in: 33cc91541c user: dan tags: trunk
10:46
Truncate the database file when the number of connections drops from one to zero. check-in: dc4fa92596 user: dan tags: trunk
2013-01-17
19:13
Fix an lsm bug causing it to choose the wrong block to reuse. check-in: 2ff461b422 user: dan tags: trunk
Changes
Hide Diffs Side-by-Side Diffs Ignore Whitespace Patch

Changes to src/lsmInt.h.

   636    636   
   637    637   int lsmFsFileid(lsm_db *pDb, void **ppId, int *pnId);
   638    638   
   639    639   /* Creating, populating, gobbling and deleting sorted runs. */
   640    640   void lsmFsGobble(lsm_db *, Segment *, Pgno *, int);
   641    641   int lsmFsSortedDelete(FileSystem *, Snapshot *, int, Segment *);
   642    642   int lsmFsSortedFinish(FileSystem *, Segment *);
   643         -int lsmFsSortedAppend(FileSystem *, Snapshot *, Segment *, int, Page **);
          643  +int lsmFsSortedAppend(FileSystem *, Snapshot *, Level *, int, Page **);
   644    644   int lsmFsSortedPadding(FileSystem *, Snapshot *, Segment *);
   645    645   
   646    646   /* Functions to retrieve the lsm_env pointer from a FileSystem or Page object */
   647    647   lsm_env *lsmFsEnv(FileSystem *);
   648    648   lsm_env *lsmPageEnv(Page *);
   649    649   FileSystem *lsmPageFS(Page *);
   650    650   

Changes to src/lsm_ckpt.c.

   882    882   
   883    883   int lsmCheckpointLoadWorker(lsm_db *pDb){
   884    884     int rc;
   885    885     ShmHeader *pShm = pDb->pShmhdr;
   886    886     int nInt1;
   887    887     int nInt2;
   888    888   
   889         -  /* Must be holding the WORKER lock to do this */
   890         -  assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) );
          889  +  /* Must be holding the WORKER lock to do this. Or DMS2. */
          890  +  assert( 
          891  +      lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) 
          892  +   || lsmShmAssertLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL) 
          893  +  );
   891    894   
   892    895     /* Check that the two snapshots match. If not, repair them. */
   893    896     nInt1 = pShm->aSnap1[CKPT_HDR_NCKPT];
   894    897     nInt2 = pShm->aSnap2[CKPT_HDR_NCKPT];
   895    898     if( nInt1!=nInt2 || memcmp(pShm->aSnap1, pShm->aSnap2, nInt2*sizeof(u32)) ){
   896    899       if( ckptChecksumOk(pShm->aSnap1) ){
   897    900         memcpy(pShm->aSnap2, pShm->aSnap1, sizeof(u32)*nInt1);
................................................................................
   899    902         memcpy(pShm->aSnap1, pShm->aSnap2, sizeof(u32)*nInt2);
   900    903       }else{
   901    904         return LSM_PROTOCOL;
   902    905       }
   903    906     }
   904    907   
   905    908     rc = lsmCheckpointDeserialize(pDb, 1, pShm->aSnap1, &pDb->pWorker);
          909  +  if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase;
          910  +
   906    911     assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
   907    912     return rc;
   908    913   }
   909    914   
   910    915   int lsmCheckpointDeserialize(
   911    916     lsm_db *pDb, 
   912    917     int bInclFreelist,              /* If true, deserialize free-list */

Changes to src/lsm_file.c.

  1547   1547       }
  1548   1548       rc = fsPageGet(pFS, iPg, 0, ppNext, 0);
  1549   1549     }
  1550   1550   
  1551   1551     return rc;
  1552   1552   }
  1553   1553   
  1554         -static Pgno findAppendPoint(FileSystem *pFS){
         1554  +static Pgno findAppendPoint(FileSystem *pFS, Level *pLvl){
  1555   1555     int i;
  1556   1556     Pgno *aiAppend = pFS->pDb->pWorker->aiAppend;
  1557   1557     u32 iRet = 0;
  1558   1558   
  1559   1559     for(i=LSM_APPLIST_SZ-1; iRet==0 && i>=0; i--){
  1560         -    if( (iRet = aiAppend[i]) ) aiAppend[i] = 0;
         1560  +    if( (iRet = aiAppend[i]) ){
         1561  +      if( pLvl ){
         1562  +        int iBlk = fsPageToBlock(pFS, iRet);
         1563  +        int j;
         1564  +        for(j=0; iRet && j<pLvl->nRight; j++){
         1565  +          if( fsPageToBlock(pFS, pLvl->aRhs[j].iLastPg)==iBlk ){
         1566  +            iRet = 0;
         1567  +          }
         1568  +        }
         1569  +      }
         1570  +      if( iRet ) aiAppend[i] = 0;
         1571  +    }
  1561   1572     }
  1562   1573     return iRet;
  1563   1574   }
  1564   1575   
  1565   1576   /*
  1566         -** Append a page to file iFile. Set the ref-count to 1 and return a pointer
  1567         -** to it. The page is writable until either lsmFsPagePersist() is called on 
  1568         -** it or the ref-count drops to zero.
         1577  +** Append a page to the left-hand-side of pLvl. Set the ref-count to 1 and
         1578  +** return a pointer to it. The page is writable until either 
         1579  +** lsmFsPagePersist() is called on it or the ref-count drops to zero.
  1569   1580   */
  1570   1581   int lsmFsSortedAppend(
  1571   1582     FileSystem *pFS, 
  1572   1583     Snapshot *pSnapshot,
  1573         -  Segment *p, 
         1584  +  Level *pLvl,
  1574   1585     int bDefer,
  1575   1586     Page **ppOut
  1576   1587   ){
  1577   1588     int rc = LSM_OK;
  1578   1589     Page *pPg = 0;
  1579   1590     *ppOut = 0;
  1580   1591     int iApp = 0;
  1581   1592     int iNext = 0;
         1593  +  Segment *p = &pLvl->lhs;
  1582   1594     int iPrev = p->iLastPg;
  1583   1595   
  1584   1596     if( pFS->pCompress || bDefer ){
  1585   1597       /* In compressed database mode the page is not assigned a page number
  1586   1598       ** or location in the database file at this point. This will be done
  1587   1599       ** by the lsmFsPagePersist() call.  */
  1588   1600       rc = fsPageBuffer(pFS, 1, &pPg);
................................................................................
  1596   1608         if( pFS->pCompress==0 ) pPg->nData -= 4;
  1597   1609   
  1598   1610         pPg->nRef = 1;
  1599   1611         pFS->nOut++;
  1600   1612       }
  1601   1613     }else{
  1602   1614       if( iPrev==0 ){
  1603         -      iApp = findAppendPoint(pFS);
         1615  +      iApp = findAppendPoint(pFS, pLvl);
  1604   1616       }else if( fsIsLast(pFS, iPrev) ){
  1605   1617         int iNext;
  1606   1618         rc = fsBlockNext(pFS, fsPageToBlock(pFS, iPrev), &iNext);
  1607   1619         if( rc!=LSM_OK ) return rc;
  1608   1620         iApp = fsFirstPageOnBlock(pFS, iNext);
  1609   1621       }else{
  1610   1622         iApp = iPrev + 1;
................................................................................
  1838   1850       int nWrite;
  1839   1851       Pgno iLastOnBlock;
  1840   1852       Pgno iApp = pSeg->iLastPg+1;
  1841   1853   
  1842   1854       /* If this is the first data written into the segment, find an append-point
  1843   1855       ** or allocate a new block.  */
  1844   1856       if( iApp==1 ){
  1845         -      pSeg->iFirst = iApp = findAppendPoint(pFS);
         1857  +      pSeg->iFirst = iApp = findAppendPoint(pFS, 0);
  1846   1858         if( iApp==0 ){
  1847   1859           int iBlk;
  1848   1860           rc = lsmBlockAllocate(pFS->pDb, &iBlk);
  1849   1861           pSeg->iFirst = iApp = fsFirstPageOnBlock(pFS, iBlk);
  1850   1862         }
  1851   1863       }
  1852   1864       iRet = iApp;

Changes to src/lsm_shared.c.

   146    146       /* Free the array of shm pointers */
   147    147       lsmFree(pEnv, p->apShmChunk);
   148    148   
   149    149       /* Free the memory allocated for the Database struct itself */
   150    150       lsmFree(pEnv, p);
   151    151     }
   152    152   }
          153  +
          154  +typedef struct DbTruncateCtx DbTruncateCtx;
          155  +struct DbTruncateCtx {
          156  +  int nBlock;
          157  +  i64 iInUse;
          158  +};
          159  +
          160  +static int dbTruncateCb(void *pCtx, int iBlk, i64 iSnapshot){
          161  +  DbTruncateCtx *p = (DbTruncateCtx *)pCtx;
          162  +  if( iBlk!=p->nBlock || (p->iInUse>=0 && iSnapshot>=p->iInUse) ) return 1;
          163  +  p->nBlock--;
          164  +  return 0;
          165  +}
          166  +
          167  +static int dbTruncate(lsm_db *pDb, i64 iInUse){
          168  +  int rc;
          169  +  int i;
          170  +  DbTruncateCtx ctx;
          171  +
          172  +  assert( pDb->pWorker );
          173  +  ctx.nBlock = pDb->pWorker->nBlock;
          174  +  ctx.iInUse = iInUse;
          175  +
          176  +  rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);
          177  +  for(i=ctx.nBlock+1; rc==LSM_OK && i<=pDb->pWorker->nBlock; i++){
          178  +    rc = freelistAppend(pDb, i, -1);
          179  +  }
          180  +
          181  +  if( rc==LSM_OK ){
          182  +#ifdef LSM_LOG_FREELIST
          183  +    if( ctx.nBlock!=pDb->pWorker->nBlock ){
          184  +      lsmLogMessage(pDb, 0, 
          185  +          "dbTruncate(): truncated db to %d blocks",ctx.nBlock
          186  +      );
          187  +    }
          188  +#endif
          189  +    pDb->pWorker->nBlock = ctx.nBlock;
          190  +  }
          191  +  return rc;
          192  +}
          193  +
          194  +
          195  +/*
          196  +** This function is called during database shutdown (when the number of
          197  +** connections drops from one to zero). It truncates the database file
          198  +** to as small a size as possible without truncating away any blocks that
          199  +** contain data.
          200  +*/
          201  +static int dbTruncateFile(lsm_db *pDb){
          202  +  int rc;
          203  +
          204  +  assert( pDb->pWorker==0 );
          205  +  assert( lsmShmAssertLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_EXCL) );
          206  +  rc = lsmCheckpointLoadWorker(pDb);
          207  +
          208  +  if( rc==LSM_OK ){
          209  +    DbTruncateCtx ctx;
          210  +
          211  +    /* Walk the database free-block-list in reverse order. Set ctx.nBlock
          212  +    ** to the block number of the last block in the database that actually
          213  +    ** contains data. */
          214  +    ctx.nBlock = pDb->pWorker->nBlock;
          215  +    ctx.iInUse = -1;
          216  +    rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);
          217  +
          218  +    /* If the last block that contains data is not already the last block in
          219  +    ** the database file, truncate the database file so that it is. */
          220  +    if( rc==LSM_OK && ctx.nBlock!=pDb->pWorker->nBlock ){
          221  +      rc = lsmFsTruncateDb(
          222  +          pDb->pFS, (i64)ctx.nBlock*lsmFsBlockSize(pDb->pFS)
          223  +      );
          224  +    }
          225  +  }
          226  +
          227  +  lsmFreeSnapshot(pDb->pEnv, pDb->pWorker);
          228  +  pDb->pWorker = 0;
          229  +  return rc;
          230  +}
   153    231   
   154    232   static void doDbDisconnect(lsm_db *pDb){
   155    233     int rc;
   156    234   
   157    235     /* Block for an exclusive lock on DMS1. This lock serializes all calls
   158    236     ** to doDbConnect() and doDbDisconnect() across all processes.  */
   159    237     rc = lsmShmLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL, 1);
................................................................................
   179    257         }
   180    258   
   181    259         /* Write a checkpoint to disk. */
   182    260         if( rc==LSM_OK ){
   183    261           rc = lsmCheckpointWrite(pDb, 1, 0);
   184    262         }
   185    263   
   186         -      /* If the checkpoint was written successfully, delete the log file */
          264  +      /* If the checkpoint was written successfully, delete the log file
          265  +      ** and, if possible, truncate the database file.  */
   187    266         if( rc==LSM_OK ){
   188    267           Database *p = pDb->pDatabase;
          268  +        dbTruncateFile(pDb);
   189    269           lsmFsCloseAndDeleteLog(pDb->pFS);
   190    270           if( p->pFile ) lsmEnvShmUnmap(pDb->pEnv, p->pFile, 1);
   191    271         }
   192    272       }
   193    273     }
   194    274   
   195    275     lsmShmLock(pDb, LSM_LOCK_DMS2, LSM_LOCK_UNLOCK, 0);
................................................................................
   538    618         }
   539    619       }
   540    620     }
   541    621   
   542    622     return rc;
   543    623   }
   544    624   
   545         -typedef struct DbTruncateCtx DbTruncateCtx;
   546         -struct DbTruncateCtx {
   547         -  int nBlock;
   548         -  i64 iInUse;
   549         -};
   550         -
   551         -static int dbTruncateCb(void *pCtx, int iBlk, i64 iSnapshot){
   552         -  DbTruncateCtx *p = (DbTruncateCtx *)pCtx;
   553         -  if( iBlk!=p->nBlock || iSnapshot>=p->iInUse ) return 1;
   554         -  p->nBlock--;
   555         -  return 0;
   556         -}
   557         -
   558         -static int dbTruncate(lsm_db *pDb, i64 iInUse){
   559         -  int rc;
   560         -  int i;
   561         -  DbTruncateCtx ctx;
   562         -
   563         -  assert( pDb->pWorker );
   564         -  ctx.nBlock = pDb->pWorker->nBlock;
   565         -  ctx.iInUse = iInUse;
   566         -
   567         -  rc = lsmWalkFreelist(pDb, 1, dbTruncateCb, (void *)&ctx);
   568         -  for(i=ctx.nBlock+1; rc==LSM_OK && i<=pDb->pWorker->nBlock; i++){
   569         -    rc = freelistAppend(pDb, i, -1);
   570         -  }
   571         -
   572         -  if( rc==LSM_OK ){
   573         -#ifdef LSM_LOG_FREELIST
   574         -    if( ctx.nBlock!=pDb->pWorker->nBlock ){
   575         -      lsmLogMessage(pDb, 0, 
   576         -          "dbTruncate(): truncated db to %d blocks",ctx.nBlock
   577         -      );
   578         -    }
   579         -#endif
   580         -    pDb->pWorker->nBlock = ctx.nBlock;
   581         -  }
   582         -  return rc;
   583         -}
   584         -
   585    625   
   586    626   typedef struct FindFreeblockCtx FindFreeblockCtx;
   587    627   struct FindFreeblockCtx {
   588    628     i64 iInUse;
   589    629     int iRet;
   590    630   };
   591    631   
................................................................................
   819    859   
   820    860     /* Attempt to take the WORKER lock */
   821    861     rc = lsmShmLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL, 0);
   822    862   
   823    863     /* Deserialize the current worker snapshot */
   824    864     if( rc==LSM_OK ){
   825    865       rc = lsmCheckpointLoadWorker(pDb);
   826         -    if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase;
   827    866     }
   828    867     return rc;
   829    868   }
   830    869   
   831    870   void lsmFreeSnapshot(lsm_env *pEnv, Snapshot *p){
   832    871     if( p ){
   833    872       lsmSortedFreeLevel(pEnv, p->pLevel);

Changes to src/lsm_sorted.c.

  3104   3104   ** to the way the file.c module works some (the first and last in each block)
  3105   3105   ** are 4 bytes smaller than the others.
  3106   3106   */
  3107   3107   static int mergeWorkerMoveHierarchy(
  3108   3108     MergeWorker *pMW,               /* Merge worker */
  3109   3109     int bSep                        /* True for separators run */
  3110   3110   ){
  3111         -  Segment *pSeg;                  /* Segment being written */
  3112   3111     lsm_db *pDb = pMW->pDb;         /* Database handle */
  3113   3112     int rc = LSM_OK;                /* Return code */
  3114   3113     int i;
  3115   3114     Page **apHier = pMW->hier.apHier;
  3116   3115     int nHier = pMW->hier.nHier;
  3117   3116   
  3118         -  pSeg = &pMW->pLevel->lhs;
  3119         -
  3120   3117     for(i=0; rc==LSM_OK && i<nHier; i++){
  3121   3118       Page *pNew = 0;
  3122         -    rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, 1, &pNew);
         3119  +    rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pMW->pLevel, 1, &pNew);
  3123   3120       assert( rc==LSM_OK );
  3124   3121   
  3125   3122       if( rc==LSM_OK ){
  3126   3123         u8 *a1; int n1;
  3127   3124         u8 *a2; int n2;
  3128   3125   
  3129   3126         a1 = fsPageData(pNew, &n1);
................................................................................
  3293   3290     MergeWorker *pMW,
  3294   3291     u8 eType,
  3295   3292     Pgno iPtr,
  3296   3293     Pgno iKeyPg,
  3297   3294     void *pKey,
  3298   3295     int nKey
  3299   3296   ){
  3300         -  Segment *pSeg = &pMW->pLevel->lhs;
  3301   3297     Hierarchy *p = &pMW->hier;
  3302   3298     lsm_db *pDb = pMW->pDb;         /* Database handle */
  3303   3299     int rc = LSM_OK;                /* Return Code */
  3304   3300     int iLevel;                     /* Level of b-tree hierachy to write to */
  3305   3301     int nData;                      /* Size of aData[] in bytes */
  3306   3302     u8 *aData;                      /* Page data for level iLevel */
  3307   3303     int iOff;                       /* Offset on b-tree page to write record to */
................................................................................
  3361   3357         }
  3362   3358       }
  3363   3359   
  3364   3360       /* Allocate a new page for apHier[iLevel]. */
  3365   3361       p->apHier[iLevel] = 0;
  3366   3362       if( rc==LSM_OK ){
  3367   3363         rc = lsmFsSortedAppend(
  3368         -          pDb->pFS, pDb->pWorker, pSeg, 1, &p->apHier[iLevel]
         3364  +          pDb->pFS, pDb->pWorker, pMW->pLevel, 1, &p->apHier[iLevel]
  3369   3365         );
  3370   3366       }
  3371   3367       if( rc!=LSM_OK ) return rc;
  3372   3368   
  3373   3369       aData = fsPageData(p->apHier[iLevel], &nData);
  3374   3370       memset(aData, 0, nData);
  3375   3371       lsmPutU16(&aData[SEGMENT_FLAGS_OFFSET(nData)], SEGMENT_BTREE_FLAG);
................................................................................
  3561   3557     Pgno iFPtr                      /* Pointer value for footer of new page */
  3562   3558   ){
  3563   3559     int rc = LSM_OK;                /* Return code */
  3564   3560     Page *pNext = 0;                /* New page appended to run */
  3565   3561     lsm_db *pDb = pMW->pDb;         /* Database handle */
  3566   3562     Segment *pSeg;                  /* Run to append to */
  3567   3563   
  3568         -  pSeg = &pMW->pLevel->lhs;
  3569         -  rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pSeg, 0, &pNext);
  3570         -  assert( rc!=LSM_OK || pSeg->iFirst>0 || pMW->pDb->compress.xCompress );
         3564  +  rc = lsmFsSortedAppend(pDb->pFS, pDb->pWorker, pMW->pLevel, 0, &pNext);
         3565  +  assert( rc || pMW->pLevel->lhs.iFirst>0 || pMW->pDb->compress.xCompress );
  3571   3566   
  3572   3567     if( rc==LSM_OK ){
  3573   3568       u8 *aData;                    /* Data buffer belonging to page pNext */
  3574   3569       int nData;                    /* Size of aData[] in bytes */
  3575   3570   
  3576   3571       rc = mergeWorkerPersistAndRelease(pMW);
  3577   3572   
................................................................................
  4915   4910     while( rc==LSM_OK && sortedDbIsFull(pDb) ){
  4916   4911       rc = sortedWork(pDb, 256, pDb->nMerge, 1, 0);
  4917   4912     }
  4918   4913   
  4919   4914     if( rc==LSM_OK ){
  4920   4915       rc = sortedNewToplevel(pDb, TREE_BOTH, 0);
  4921   4916     }
         4917  +
  4922   4918     lsmFinishWork(pDb, 1, &rc);
  4923   4919     return rc;
  4924   4920   }
  4925   4921   
  4926   4922   /*
  4927   4923   ** Return a string representation of the segment passed as the only argument.
  4928   4924   ** Space for the returned string is allocated using lsmMalloc(), and should