Index: src/log.c ================================================================== --- src/log.c +++ src/log.c @@ -200,10 +200,18 @@ #define LOG_REGION_A 0x01 #define LOG_REGION_B 0x02 #define LOG_REGION_C 0x04 #define LOG_REGION_D 0x08 +/* +** Values for the third parameter to logLockRegion(). +*/ +#define LOG_UNLOCK 0 /* Unlock a range of bytes */ +#define LOG_RDLOCK 1 /* Put a SHARED lock on a range of bytes */ +#define LOG_WRLOCK 2 /* Put an EXCLUSIVE lock on a byte-range */ +#define LOG_WRLOCKW 3 /* Block on EXCLUSIVE lock on a byte-range */ + /* ** A single instance of this structure is allocated as part of each ** connection to a database log. All structures associated with the ** same log file are linked together into a list using LogLock.pNext ** starting at LogSummary.pLock. @@ -223,10 +231,11 @@ LogSummary *pSummary; /* Log file summary data */ sqlite3_vfs *pVfs; /* The VFS used to create pFd */ sqlite3_file *pFd; /* File handle for log file */ int isLocked; /* Non-zero if a snapshot is held open */ int isWriteLocked; /* True if this is the writer connection */ + u32 iCallback; /* Value to pass to log callback (or 0) */ LogSummaryHdr hdr; /* Log summary header for current snapshot */ LogLock lock; /* Lock held by this connection (if any) */ }; @@ -645,33 +654,33 @@ logSummaryWriteHdr(pSummary, &hdr); return rc; } /* -** Values for the third parameter to logLockRegion(). +** Place, modify or remove a lock on the log-summary file associated +** with pSummary. 
*/ -#define LOG_UNLOCK 0 -#define LOG_RDLOCK 1 -#define LOG_WRLOCK 2 -#define LOG_WRLOCKW 3 - -static int logLockFd(LogSummary *pSummary, int iStart, int nByte, int op){ +static int logLockFd( + LogSummary *pSummary, /* The log-summary object to lock */ + int iStart, /* First byte to lock */ + int nByte, /* Number of bytes to lock */ + int op /* LOG_UNLOCK, RDLOCK, WRLOCK or WRLOCKW */ +){ int aType[4] = { - F_UNLCK, /* LOG_UNLOCK */ - F_RDLCK, /* LOG_RDLOCK */ - F_WRLCK, /* LOG_WRLOCK */ - F_WRLCK /* LOG_WRLOCKW */ + F_UNLCK, /* LOG_UNLOCK */ + F_RDLCK, /* LOG_RDLOCK */ + F_WRLCK, /* LOG_WRLOCK */ + F_WRLCK /* LOG_WRLOCKW */ }; int aOp[4] = { - F_SETLK, /* LOG_UNLOCK */ - F_SETLK, /* LOG_RDLOCK */ - F_SETLK, /* LOG_WRLOCK */ - F_SETLKW /* LOG_WRLOCKW */ + F_SETLK, /* LOG_UNLOCK */ + F_SETLK, /* LOG_RDLOCK */ + F_SETLK, /* LOG_WRLOCK */ + F_SETLKW /* LOG_WRLOCKW */ }; - - struct flock f; /* Locking operation */ - int rc; /* Value returned by fcntl() */ + struct flock f; /* Locking operation */ + int rc; /* Value returned by fcntl() */ assert( ArraySize(aType)==ArraySize(aOp) ); assert( op>=0 && oplock.mLock = mNew; sqlite3_mutex_leave(pSummary->mutex); return SQLITE_OK; } +/* +** Lock the DMH region, either with an EXCLUSIVE or SHARED lock. This +** function is never called with LOG_UNLOCK - the only way the DMH region +** is ever completely unlocked is by closing the file descriptor. +*/ static int logLockDMH(LogSummary *pSummary, int eLock){ + assert( sqlite3_mutex_held(pSummary->mutex) ); assert( eLock==LOG_RDLOCK || eLock==LOG_WRLOCK ); return logLockFd(pSummary, LOG_LOCK_DMH, 1, eLock); } +/* +** Lock (or unlock) the MUTEX region. It is always locked using an +** EXCLUSIVE, blocking lock. 
+*/ static int logLockMutex(LogSummary *pSummary, int eLock){ + assert( sqlite3_mutex_held(pSummary->mutex) ); assert( eLock==LOG_WRLOCKW || eLock==LOG_UNLOCK ); logLockFd(pSummary, LOG_LOCK_MUTEX, 1, eLock); return SQLITE_OK; } - - /* ** This function intializes the connection to the log-summary identified ** by struct pSummary. */ @@ -878,11 +896,11 @@ if( rc==SQLITE_OK ){ memset(pSummary->aData, 0, (LOGSUMMARY_HDR_NFIELD+2)*sizeof(u32) ); } rc = logLockDMH(pSummary, LOG_RDLOCK); if( rc!=SQLITE_OK ){ - return SQLITE_IOERR; + rc = SQLITE_IOERR; } out: logLockMutex(pSummary, LOG_UNLOCK); return rc; @@ -1459,11 +1477,11 @@ /* ** Set *pPgno to the size of the database file (or zero, if unknown). */ -void sqlite3LogMaxpgno(Log *pLog, Pgno *pPgno){ +void sqlite3LogDbsize(Log *pLog, Pgno *pPgno){ assert( pLog->isLocked ); *pPgno = pLog->hdr.nPage; } /* @@ -1644,13 +1662,14 @@ /* If this is a commit, update the log-summary header too. */ if( isCommit && SQLITE_OK==(rc = logEnterMutex(pLog)) ){ logSummaryWriteHdr(pLog->pSummary, &pLog->hdr); logLeaveMutex(pLog); + pLog->iCallback = iFrame; } - return SQLITE_OK; + return rc; } /* ** Checkpoint the database: ** @@ -1695,6 +1714,15 @@ /* Release the locks. */ logLockRegion(pLog, LOG_REGION_A|LOG_REGION_B|LOG_REGION_C, LOG_UNLOCK); return rc; } + +int sqlite3LogCallback(Log *pLog){ + u32 ret = 0; + if( pLog ){ + ret = pLog->iCallback; + pLog->iCallback = 0; + } + return (int)ret; +} Index: src/log.h ================================================================== --- src/log.h +++ src/log.h @@ -30,11 +30,11 @@ int sqlite3LogOpenSnapshot(Log *pLog, int *); void sqlite3LogCloseSnapshot(Log *pLog); /* Read a page from the log, if it is present. */ int sqlite3LogRead(Log *pLog, Pgno pgno, int *pInLog, u8 *pOut); -void sqlite3LogMaxpgno(Log *pLog, Pgno *pPgno); +void sqlite3LogDbsize(Log *pLog, Pgno *pPgno); /* Obtain or release the WRITER lock. 
*/ int sqlite3LogWriteLock(Log *pLog, int op); /* Write a frame or frames to the log. */ @@ -47,7 +47,10 @@ int sync_flags, /* Flags to sync db file with (or 0) */ u8 *zBuf, /* Temporary buffer to use */ int (*xBusyHandler)(void *), /* Pointer to busy-handler function */ void *pBusyHandlerArg /* Argument to pass to xBusyHandler */ ); + +/* Return the value to pass to a log callback. Or 0 for no callback. */ +int sqlite3LogCallback(Log *pLog); #endif /* _LOG_H_ */ Index: src/main.c ================================================================== --- src/main.c +++ src/main.c @@ -1183,10 +1183,28 @@ db->xRollbackCallback = xCallback; db->pRollbackArg = pArg; sqlite3_mutex_leave(db->mutex); return pRet; } + +/* +** Register a callback to be invoked each time a transaction is written +** into the write-ahead-log by this database connection. +*/ +void *sqlite3_log_hook( + sqlite3 *db, /* Attach the hook to this db handle */ + int(*xCallback)(void *, sqlite3*, const char*, int), + void *pArg /* First argument passed to xCallback() */ +){ + void *pRet; + sqlite3_mutex_enter(db->mutex); + pRet = db->pLogArg; + db->xLogCallback = xCallback; + db->pLogArg = pArg; + sqlite3_mutex_leave(db->mutex); + return pRet; +} /* ** This function returns true if main-memory should be used instead of ** a temporary file for transient pager files and statement journals. 
** The value returned depends on the value of db->temp_store (runtime Index: src/pager.c ================================================================== --- src/pager.c +++ src/pager.c @@ -2684,11 +2684,11 @@ }else{ int rc; /* Error returned by OsFileSize() */ i64 n = 0; /* File size in bytes returned by OsFileSize() */ if( pagerUseLog(pPager) ){ - sqlite3LogMaxpgno(pPager->pLog, &nPage); + sqlite3LogDbsize(pPager->pLog, &nPage); } if( nPage==0 ){ assert( isOpen(pPager->fd) || pPager->tempFile ); if( isOpen(pPager->fd) ){ @@ -5706,7 +5706,11 @@ zBuf, pPager->xBusyHandler, pPager->pBusyHandlerArg ); } return rc; } + +int sqlite3PagerLogCallback(Pager *pPager){ + return sqlite3LogCallback(pPager->pLog); +} #endif /* SQLITE_OMIT_DISKIO */ Index: src/pager.h ================================================================== --- src/pager.h +++ src/pager.h @@ -131,11 +131,13 @@ int sqlite3PagerCommitPhaseTwo(Pager*); int sqlite3PagerRollback(Pager*); int sqlite3PagerOpenSavepoint(Pager *pPager, int n); int sqlite3PagerSavepoint(Pager *pPager, int op, int iSavepoint); int sqlite3PagerSharedLock(Pager *pPager); + int sqlite3PagerCheckpoint(Pager *pPager); +int sqlite3PagerLogCallback(Pager *pPager); /* Functions used to query pager state and configuration. */ u8 sqlite3PagerIsreadonly(Pager*); int sqlite3PagerRefcount(Pager*); int sqlite3PagerMemUsed(Pager*); Index: src/sqlite.h.in ================================================================== --- src/sqlite.h.in +++ src/sqlite.h.in @@ -5723,10 +5723,44 @@ ** a fixed-length buffer on the stack. If the log message is longer than ** a few hundred characters, it will be truncated to the length of the ** buffer. */ void sqlite3_log(int iErrCode, const char *zFormat, ...); + +/* +** Experimental WAL callback interface. +** +** The [sqlite3_log_hook()] function is used to register a callback that +** will be invoked each time a database connection commits data to a +** write-ahead-log (i.e. 
whenever a transaction is committed in +** journal_mode=WAL mode). +** +** The callback is invoked by SQLite after the commit has taken place and +** the associated write-lock on the database released, so the implementation +** may read, write or checkpoint the database as required. +** +** The first parameter passed to the callback function when it is invoked +** is a copy of the third parameter passed to sqlite3_log_hook() when +** registering the callback. The second is a copy of the database handle. +** The third parameter is the name of the database that was written to - +** either "main" or the name of an ATTACHed database. The fourth parameter +** is the number of pages currently in the log file, including those that +** were just committed. +** +** If an invocation of the callback function returns non-zero, then a +** checkpoint is automatically run on the database. If zero is returned, +** no special action is taken. +** +** A single database handle may have at most a single log callback +** registered at one time. Calling [sqlite3_log_hook()] replaces any +** previously registered log callback. +*/ +void *sqlite3_log_hook( + sqlite3*, + int(*)(void *,sqlite3*,const char*,int), + void* +); /* ** Undo the hack that converts floating point types to integer for ** builds on processors without floating point support. */ Index: src/sqliteInt.h ================================================================== --- src/sqliteInt.h +++ src/sqliteInt.h @@ -821,10 +821,12 @@ int (*xCommitCallback)(void*); /* Invoked at every commit. */ void *pRollbackArg; /* Argument to xRollbackCallback() */ void (*xRollbackCallback)(void*); /* Invoked at every commit. 
*/ void *pUpdateArg; void (*xUpdateCallback)(void*,int, const char*,const char*,sqlite_int64); + int (*xLogCallback)(void *, sqlite3 *, const char *, u32); + void *pLogArg; void(*xCollNeeded)(void*,sqlite3*,int eTextRep,const char*); void(*xCollNeeded16)(void*,sqlite3*,int eTextRep,const void*); void *pCollNeededArg; sqlite3_value *pErr; /* Most recent error message */ char *zErrMsg; /* Most recent error message (UTF-8 encoded) */ Index: src/tclsqlite.c ================================================================== --- src/tclsqlite.c +++ src/tclsqlite.c @@ -121,10 +121,11 @@ int disableAuth; /* Disable the authorizer if it exists */ char *zNull; /* Text to substitute for an SQL NULL value */ SqlFunc *pFunc; /* List of SQL functions */ Tcl_Obj *pUpdateHook; /* Update hook script (if any) */ Tcl_Obj *pRollbackHook; /* Rollback hook script (if any) */ + Tcl_Obj *pLogHook; /* WAL hook script (if any) */ Tcl_Obj *pUnlockNotify; /* Unlock notify script (if any) */ SqlCollate *pCollate; /* List of SQL collation functions */ int rc; /* Return code of most recent sqlite3_exec() */ Tcl_Obj *pCollateNeeded; /* Collation needed script */ SqlPreparedStmt *stmtList; /* List of prepared statements*/ @@ -483,10 +484,13 @@ Tcl_DecrRefCount(pDb->pUpdateHook); } if( pDb->pRollbackHook ){ Tcl_DecrRefCount(pDb->pRollbackHook); } + if( pDb->pLogHook ){ + Tcl_DecrRefCount(pDb->pLogHook); + } if( pDb->pCollateNeeded ){ Tcl_DecrRefCount(pDb->pCollateNeeded); } Tcl_Free((char*)pDb); } @@ -586,10 +590,36 @@ assert(pDb->pRollbackHook); if( TCL_OK!=Tcl_EvalObjEx(pDb->interp, pDb->pRollbackHook, 0) ){ Tcl_BackgroundError(pDb->interp); } } + +static int DbLogHandler( + void *clientData, + sqlite3 *db, + const char *zDb, + int nEntry +){ + int ret = 0; + Tcl_Obj *p; + SqliteDb *pDb = (SqliteDb*)clientData; + Tcl_Interp *interp = pDb->interp; + assert(pDb->pLogHook); + + p = Tcl_DuplicateObj(pDb->pLogHook); + Tcl_IncrRefCount(p); + Tcl_ListObjAppendElement(interp, p, Tcl_NewStringObj(zDb, 
-1)); + Tcl_ListObjAppendElement(interp, p, Tcl_NewIntObj(nEntry)); + if( TCL_OK!=Tcl_EvalObjEx(interp, p, 0) + || TCL_OK!=Tcl_GetIntFromObj(interp, Tcl_GetObjResult(interp), &ret) + ){ + Tcl_BackgroundError(interp); + } + Tcl_DecrRefCount(p); + + return ret; +} #if defined(SQLITE_TEST) && defined(SQLITE_ENABLE_UNLOCK_NOTIFY) static void setTestUnlockNotifyVars(Tcl_Interp *interp, int iArg, int nArg){ char zBuf[64]; sprintf(zBuf, "%d", iArg); @@ -1538,30 +1568,30 @@ "cache", "changes", "close", "collate", "collation_needed", "commit_hook", "complete", "copy", "enable_load_extension", "errorcode", "eval", "exists", "function", "incrblob", "interrupt", - "last_insert_rowid", "nullvalue", "onecolumn", - "profile", "progress", "rekey", - "restore", "rollback_hook", "status", - "timeout", "total_changes", "trace", - "transaction", "unlock_notify", "update_hook", - "version", 0 + "last_insert_rowid", "log_hook", "nullvalue", + "onecolumn", "profile", "progress", + "rekey", "restore", "rollback_hook", + "status", "timeout", "total_changes", + "trace", "transaction", "unlock_notify", + "update_hook", "version", 0 }; enum DB_enum { DB_AUTHORIZER, DB_BACKUP, DB_BUSY, DB_CACHE, DB_CHANGES, DB_CLOSE, DB_COLLATE, DB_COLLATION_NEEDED, DB_COMMIT_HOOK, DB_COMPLETE, DB_COPY, DB_ENABLE_LOAD_EXTENSION, DB_ERRORCODE, DB_EVAL, DB_EXISTS, DB_FUNCTION, DB_INCRBLOB, DB_INTERRUPT, - DB_LAST_INSERT_ROWID, DB_NULLVALUE, DB_ONECOLUMN, - DB_PROFILE, DB_PROGRESS, DB_REKEY, - DB_RESTORE, DB_ROLLBACK_HOOK, DB_STATUS, - DB_TIMEOUT, DB_TOTAL_CHANGES, DB_TRACE, - DB_TRANSACTION, DB_UNLOCK_NOTIFY, DB_UPDATE_HOOK, - DB_VERSION, + DB_LAST_INSERT_ROWID, DB_LOG_HOOK, DB_NULLVALUE, + DB_ONECOLUMN, DB_PROFILE, DB_PROGRESS, + DB_REKEY, DB_RESTORE, DB_ROLLBACK_HOOK, + DB_STATUS, DB_TIMEOUT, DB_TOTAL_CHANGES, + DB_TRACE, DB_TRANSACTION, DB_UNLOCK_NOTIFY, + DB_UPDATE_HOOK, DB_VERSION }; /* don't leave trailing commas on DB_enum, it confuses the AIX xlc compiler */ if( objc<2 ){ Tcl_WrongNumArgs(interp, 1, 
objv, "SUBCOMMAND ..."); @@ -2728,22 +2758,26 @@ #endif break; } /* + ** $db log_hook ?script? ** $db update_hook ?script? ** $db rollback_hook ?script? */ + case DB_LOG_HOOK: case DB_UPDATE_HOOK: case DB_ROLLBACK_HOOK: { /* set ppHook to point at pUpdateHook or pRollbackHook, depending on ** whether [$db update_hook] or [$db rollback_hook] was invoked. */ Tcl_Obj **ppHook; if( choice==DB_UPDATE_HOOK ){ ppHook = &pDb->pUpdateHook; + }else if( choice==DB_LOG_HOOK ){ + ppHook = &pDb->pLogHook; }else{ ppHook = &pDb->pRollbackHook; } if( objc!=2 && objc!=3 ){ @@ -2765,10 +2799,11 @@ } } sqlite3_update_hook(pDb->db, (pDb->pUpdateHook?DbUpdateHandler:0), pDb); sqlite3_rollback_hook(pDb->db,(pDb->pRollbackHook?DbRollbackHandler:0),pDb); + sqlite3_log_hook(pDb->db,(pDb->pLogHook?DbLogHandler:0),pDb); break; } /* $db version Index: src/vdbeapi.c ================================================================== --- src/vdbeapi.c +++ src/vdbeapi.c @@ -303,10 +303,27 @@ assert( sqlite3_mutex_held(pCtx->s.db->mutex) ); sqlite3VdbeMemSetNull(&pCtx->s); pCtx->isError = SQLITE_NOMEM; pCtx->s.db->mallocFailed = 1; } + +static int doLogCallbacks(sqlite3 *db){ + int i; + int rc = SQLITE_OK; + for(i=0; inDb; i++){ + Btree *pBt = db->aDb[i].pBt; + if( pBt ){ + int nEntry = sqlite3PagerLogCallback(sqlite3BtreePager(pBt)); + if( db->xLogCallback && nEntry>0 && rc==SQLITE_OK + && db->xLogCallback(db->pLogArg, db, db->aDb[i].zName, nEntry) + ){ + rc = sqlite3PagerCheckpoint(sqlite3BtreePager(pBt)); + } + } + } + return rc; +} /* ** Execute the statement pStmt, either until a row of data is ready, the ** statement is completely executed or an error occurs. 
** @@ -384,10 +401,18 @@ elapseTime = (u64)((rNow - (int)rNow)*3600.0*24.0*1000000000.0); elapseTime -= p->startTime; db->xProfile(db->pProfileArg, p->zSql, elapseTime); } #endif + + if( rc==SQLITE_DONE ){ + assert( p->rc==SQLITE_OK ); + p->rc = doLogCallbacks(db); + if( p->rc!=SQLITE_OK ){ + rc = SQLITE_ERROR; + } + } db->errCode = rc; if( SQLITE_NOMEM==sqlite3ApiExit(p->db, p->rc) ){ p->rc = SQLITE_NOMEM; } Index: src/vdbeaux.c ================================================================== --- src/vdbeaux.c +++ src/vdbeaux.c @@ -1649,11 +1649,11 @@ ** (b) how many database files have open write transactions, not ** including the temp database. (b) is important because if more than ** one database file has an open write transaction, a master journal ** file is required for an atomic commit. */ - for(i=0; inDb; i++){ + for(i=0; inDb; i++){ Btree *pBt = db->aDb[i].pBt; if( sqlite3BtreeIsInTrans(pBt) ){ needXcommit = 1; if( i!=1 ) nTrans++; } Index: test/tclsqlite.test ================================================================== --- test/tclsqlite.test +++ test/tclsqlite.test @@ -33,11 +33,11 @@ lappend v $msg } [list 1 "wrong # args: should be \"$r\""] do_test tcl-1.2 { set v [catch {db bogus} msg] lappend v $msg -} {1 {bad option "bogus": must be authorizer, backup, busy, cache, changes, close, collate, collation_needed, commit_hook, complete, copy, enable_load_extension, errorcode, eval, exists, function, incrblob, interrupt, last_insert_rowid, nullvalue, onecolumn, profile, progress, rekey, restore, rollback_hook, status, timeout, total_changes, trace, transaction, unlock_notify, update_hook, or version}} +} {1 {bad option "bogus": must be authorizer, backup, busy, cache, changes, close, collate, collation_needed, commit_hook, complete, copy, enable_load_extension, errorcode, eval, exists, function, incrblob, interrupt, last_insert_rowid, log_hook, nullvalue, onecolumn, profile, progress, rekey, restore, rollback_hook, status, timeout, total_changes, 
trace, transaction, unlock_notify, update_hook, or version}} do_test tcl-1.2.1 { set v [catch {db cache bogus} msg] lappend v $msg } {1 {bad option "bogus": must be flush or size}} do_test tcl-1.2.2 { ADDED test/walhook.test Index: test/walhook.test ================================================================== --- /dev/null +++ test/walhook.test @@ -0,0 +1,73 @@ +# 2010 April 19 +# +# The author disclaims copyright to this source code. In place of +# a legal notice, here is a blessing: +# +# May you do good and not evil. +# May you find forgiveness for yourself and forgive others. +# May you share freely, never taking more than you give. +# +#*********************************************************************** +# This file implements regression tests for SQLite library. The +# focus of this file is testing the operation of the library in +# "PRAGMA journal_mode=WAL" mode. +# + +set testdir [file dirname $argv0] +source $testdir/tester.tcl + +proc sqlite3_wal {args} { + eval sqlite3 $args + [lindex $args 0] eval { + PRAGMA journal_mode = wal; + PRAGMA synchronous = normal; + PRAGMA page_size = 1024; + } +} +sqlite3_wal db test.db +db log_hook log_hook + +set ::log_hook [list] +proc log_hook {zDb nEntry} { + lappend ::log_hook $zDb $nEntry + return 0 +} + +do_test walhook-1.1 { + execsql { CREATE TABLE t1(i PRIMARY KEY, j) } + set ::log_hook +} {main 3} +do_test walhook-1.2 { + set ::log_hook [list] + execsql { INSERT INTO t1 VALUES(1, 'one') } + set ::log_hook +} {main 5} +do_test walhook-1.3 { + proc log_hook {args} { return 1 } + execsql { INSERT INTO t1 VALUES(2, 'two') } + file size test.db +} [expr 3*1024] + +do_test walhook-1.4 { + proc log_hook {zDb nEntry} { + execsql { PRAGMA checkpoint } + return 0 + } + execsql { CREATE TABLE t2(a, b) } + file size test.db +} [expr 4*1024] + +do_test walhook-1.5 { + sqlite3_wal db2 test.db + proc log_hook {zDb nEntry} { + execsql { PRAGMA checkpoint } db2 + return 0 + } + execsql { CREATE TABLE t3(a PRIMARY KEY, 
b) } + file size test.db +} [expr 6*1024] + +catch { db2 close } +catch { db close } +finish_test +