Index: ext/wasm/api/sqlite3-api-oo1.js ================================================================== --- ext/wasm/api/sqlite3-api-oo1.js +++ ext/wasm/api/sqlite3-api-oo1.js @@ -198,13 +198,12 @@ If passed an object, any additional properties it has are copied as-is into the new object. */ dbCtorHelper.normalizeArgs = function(filename=':memory:',flags = 'c',vfs = null){ const arg = {}; - if(1===arguments.length && 'object'===typeof arguments[0]){ - const x = arguments[0]; - Object.keys(x).forEach((k)=>arg[k] = x[k]); + if(1===arguments.length && arguments[0] && 'object'===typeof arguments[0]){ + Object.assign(arg, arguments[0]); if(undefined===arg.flags) arg.flags = 'c'; if(undefined===arg.vfs) arg.vfs = null; if(undefined===arg.filename) arg.filename = ':memory:'; }else{ arg.filename = filename; Index: ext/wasm/api/sqlite3-api-opfs.js ================================================================== --- ext/wasm/api/sqlite3-api-opfs.js +++ ext/wasm/api/sqlite3-api-opfs.js @@ -346,15 +346,35 @@ 'SQLITE_LOCKED', 'SQLITE_MISUSE', 'SQLITE_NOTFOUND', 'SQLITE_OPEN_CREATE', 'SQLITE_OPEN_DELETEONCLOSE', + 'SQLITE_OPEN_MAIN_DB', 'SQLITE_OPEN_READONLY' ].forEach((k)=>{ if(undefined === (state.sq3Codes[k] = capi[k])){ toss("Maintenance required: not found:",k); } + }); + state.opfsFlags = Object.assign(Object.create(null),{ + /** + Flag for use with xOpen(). "opfs-unlock-asap=1" enables + this. See defaultUnlockAsap, below. + */ + OPFS_UNLOCK_ASAP: 0x01, + /** + If true, any async routine which implicitly acquires a sync + access handle (i.e. an OPFS lock) will release that locks at + the end of the call which acquires it. If false, such + "autolocks" are not released until the VFS is idle for some + brief amount of time. + + The benefit of enabling this is much higher concurrency. The + down-side is much-reduced performance (as much as a 4x decrease + in speedtest1). + */ + defaultUnlockAsap: false }); /** Runs the given operation (by name) in the async worker counterpart, waits for its response, and returns the result @@ -842,21 +862,27 @@ return 0; }, //xSleep is optionally defined below xOpen: function f(pVfs, zName, pFile, flags, pOutFlags){ mTimeStart('xOpen'); + let opfsFlags = 0; if(0===zName){ zName = randomFilename(); }else if('number'===typeof zName){ + if(capi.sqlite3_uri_boolean(zName, "opfs-unlock-asap", 0)){ + /* -----------------------^^^^^ MUST pass the untranslated + C-string here. */ + opfsFlags |= state.opfsFlags.OPFS_UNLOCK_ASAP; + } zName = wasm.cstringToJs(zName); } const fh = Object.create(null); fh.fid = pFile; fh.filename = zName; fh.sab = new SharedArrayBuffer(state.fileBufferSize); fh.flags = flags; - const rc = opRun('xOpen', pFile, zName, flags); + const rc = opRun('xOpen', pFile, zName, flags, opfsFlags); if(!rc){ /* Recall that sqlite3_vfs::xClose() will be called, even on error, unless pFile->pMethods is NULL. */ if(fh.readOnly){ wasm.setMemValue(pOutFlags, capi.SQLITE_OPEN_READONLY, 'i32'); @@ -1143,10 +1169,13 @@ doDir(opt.directory, 0); }; //TODO to support fiddle and worker1 db upload: //opfsUtil.createFile = function(absName, content=undefined){...} + //We have sqlite3.wasm.sqlite3_wasm_vfs_create_file() for this + //purpose but its interface and name are still under + //consideration. if(sqlite3.oo1){ opfsUtil.OpfsDb = function(...args){ const opt = sqlite3.oo1.DB.dbCtorHelper.normalizeArgs(...args); opt.vfs = opfsVfs.$zName; Index: ext/wasm/api/sqlite3-opfs-async-proxy.js ================================================================== --- ext/wasm/api/sqlite3-opfs-async-proxy.js +++ ext/wasm/api/sqlite3-opfs-async-proxy.js @@ -103,11 +103,11 @@ synchronous part of this constellation. Each value is an object with a structure demonstrated in the xOpen() impl. */ const __openFiles = Object.create(null); /** - __autoLocks is a Set of sqlite3_file pointers (integers) which were + __implicitLocks is a Set of sqlite3_file pointers (integers) which were "auto-locked". i.e. those for which we obtained a sync access handle without an explicit xLock() call. Such locks will be released during db connection idle time, whereas a sync access handle obtained via xLock(), or subsequently xLock()'d after auto-acquisition, will not be released until xUnlock() is called. @@ -115,11 +115,11 @@ Maintenance reminder: if we relinquish auto-locks at the end of the operation which acquires them, we pay a massive performance penalty: speedtest1 benchmarks take up to 4x as long. By delaying the lock release until idle time, the hit is negligible. */ -const __autoLocks = new Set(); +const __implicitLocks = new Set(); /** Expects an OPFS file path. It gets resolved, such that ".." components are properly expanded, and returned. If the 2nd arg is true, the result is returned as an array of path elements, else an @@ -164,11 +164,11 @@ if(fh.syncHandle){ log("Closing sync handle for",fh.filenameAbs); const h = fh.syncHandle; delete fh.syncHandle; delete fh.xLock; - __autoLocks.delete(fh.fid); + __implicitLocks.delete(fh.fid); return h.close(); } }; /** @@ -188,20 +188,34 @@ warn("closeSyncHandleNoThrow() ignoring:",e,fh); } }; /* Release all auto-locks. */ -const closeAutoLocks = async ()=>{ - if(__autoLocks.size){ +const releaseImplicitLocks = async ()=>{ + if(__implicitLocks.size){ /* Release all auto-locks. */ - for(const fid of __autoLocks){ + for(const fid of __implicitLocks){ const fh = __openFiles[fid]; await closeSyncHandleNoThrow(fh); log("Auto-unlocked",fid,fh.filenameAbs); } } }; + +/** + An experiment in improving concurrency by freeing up implicit locks + sooner. This is known to impact performance dramatically but it has + also shown to improve concurrency considerably. + + If fh.releaseImplicitLocks is truthy and fh is in __implicitLocks, + this routine returns closeSyncHandleNoThrow(), else it is a no-op. +*/ +const releaseImplicitLock = async (fh)=>{ + if(fh.releaseImplicitLocks && __implicitLocks.has(fh.fid)){ + return closeSyncHandleNoThrow(fh); + } +}; /** An error class specifically for use with getSyncHandle(), the goal of which is to eventually be able to distinguish unambiguously between locking-related failures and other types, noting that we @@ -244,11 +258,11 @@ if an exception is thrown while acquiring the handle, this routine will wait briefly and try again, up to 3 times. If acquisition still fails at that point it will give up and propagate the exception. */ -const getSyncHandle = async (fh)=>{ +const getSyncHandle = async (fh,opName)=>{ if(!fh.syncHandle){ const t = performance.now(); log("Acquiring sync handle for",fh.filenameAbs); const maxTries = 6, msBase = 300; let i = 1, ms = msBase; @@ -260,24 +274,25 @@ fh.syncHandle = await fh.fileHandle.createSyncAccessHandle(); break; }catch(e){ if(i === maxTries){ throw new GetSyncHandleError( - e, "Error getting sync handle.",maxTries, + e, "Error getting sync handle for",opName+"().",maxTries, "attempts failed.",fh.filenameAbs ); } - warn("Error getting sync handle. Waiting",ms, + warn("Error getting sync handle for",opName+"(). Waiting",ms, "ms and trying again.",fh.filenameAbs,e); - await closeAutoLocks(); + //await releaseImplicitLocks(); Atomics.wait(state.sabOPView, state.opIds.retry, 0, ms); } } - log("Got sync handle for",fh.filenameAbs,'in',performance.now() - t,'ms'); + log("Got",opName+"() sync handle for",fh.filenameAbs, + 'in',performance.now() - t,'ms'); if(!fh.xLock){ - __autoLocks.add(fh.fid); - log("Auto-locked",fh.fid,fh.filenameAbs); + __implicitLocks.add(fh.fid); + log("Auto-locked for",opName+"()",fh.fid,fh.filenameAbs); } } return fh.syncHandle; }; @@ -407,11 +422,11 @@ mTimeEnd(); }, xClose: async function(fid/*sqlite3_file pointer*/){ const opName = 'xClose'; mTimeStart(opName); - __autoLocks.delete(fid); + __implicitLocks.delete(fid); const fh = __openFiles[fid]; let rc = 0; wTimeStart(opName); if(fh){ delete __openFiles[fid]; @@ -472,17 +487,18 @@ const fh = __openFiles[fid]; let rc; wTimeStart('xFileSize'); try{ affirmLocked('xFileSize',fh); - const sz = await (await getSyncHandle(fh)).getSize(); + const sz = await (await getSyncHandle(fh,'xFileSize')).getSize(); state.s11n.serialize(Number(sz)); rc = 0; }catch(e){ state.s11n.storeException(2,e); rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR); } + await releaseImplicitLock(fh); wTimeEnd(); storeAndNotify('xFileSize', rc); mTimeEnd(); }, xLock: async function(fid/*sqlite3_file pointer*/, @@ -493,12 +509,12 @@ const oldLockType = fh.xLock; fh.xLock = lockType; if( !fh.syncHandle ){ wTimeStart('xLock'); try { - await getSyncHandle(fh); - __autoLocks.delete(fid); + await getSyncHandle(fh,'xLock'); + __implicitLocks.delete(fid); }catch(e){ state.s11n.storeException(1,e); rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR_LOCK); fh.xLock = oldLockType; } @@ -506,14 +522,14 @@ } storeAndNotify('xLock',rc); mTimeEnd(); }, xOpen: async function(fid/*sqlite3_file pointer*/, filename, - flags/*SQLITE_OPEN_...*/){ + flags/*SQLITE_OPEN_...*/, + opfsFlags/*OPFS_...*/){ const opName = 'xOpen'; mTimeStart(opName); - const deleteOnClose = (state.sq3Codes.SQLITE_OPEN_DELETEONCLOSE & flags); const create = (state.sq3Codes.SQLITE_OPEN_CREATE & flags); wTimeStart('xOpen'); try{ let hDir, filenamePart; try { @@ -524,28 +540,40 @@ mTimeEnd(); wTimeEnd(); return; } const hFile = await hDir.getFileHandle(filenamePart, {create}); - /** - wa-sqlite, at this point, grabs a SyncAccessHandle and - assigns it to the syncHandle prop of the file state - object, but only for certain cases and it's unclear why it - places that limitation on it. - */ wTimeEnd(); - __openFiles[fid] = Object.assign(Object.create(null),{ + const fh = Object.assign(Object.create(null),{ fid: fid, filenameAbs: filename, filenamePart: filenamePart, dirHandle: hDir, fileHandle: hFile, sabView: state.sabFileBufView, readOnly: create ? false : (state.sq3Codes.SQLITE_OPEN_READONLY & flags), - deleteOnClose: deleteOnClose + deleteOnClose: !!(state.sq3Codes.SQLITE_OPEN_DELETEONCLOSE & flags) }); + fh.releaseImplicitLocks = + (opfsFlags & state.opfsFlags.OPFS_UNLOCK_ASAP) + || state.opfsFlags.defaultUnlockAsap; + if(0 /* this block is modelled after something wa-sqlite + does but it leads to immediate contention on journal files. */ + && (0===(flags & state.sq3Codes.SQLITE_OPEN_MAIN_DB))){ + /* sqlite does not lock these files, so go ahead and grab an OPFS + lock. + + https://www.sqlite.org/uri.html + */ + fh.xLock = "xOpen"/* Truthy value to keep entry from getting + flagged as auto-locked. String value so + that we can easily distinguish is later + if needed. */; + await getSyncHandle(fh,'xOpen'); + } + __openFiles[fid] = fh; storeAndNotify(opName, 0); }catch(e){ wTimeEnd(); error(opName,e); state.s11n.storeException(1,e); @@ -558,11 +586,11 @@ let rc = 0, nRead; const fh = __openFiles[fid]; try{ affirmLocked('xRead',fh); wTimeStart('xRead'); - nRead = (await getSyncHandle(fh)).read( + nRead = (await getSyncHandle(fh,'xRead')).read( fh.sabView.subarray(0, n), {at: Number(offset64)} ); wTimeEnd(); if(nRead < n){/* Zero-fill remaining bytes */ @@ -573,10 +601,11 @@ if(undefined===nRead) wTimeEnd(); error("xRead() failed",e,fh); state.s11n.storeException(1,e); rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR_READ); } + await releaseImplicitLock(fh); storeAndNotify('xRead',rc); mTimeEnd(); }, xSync: async function(fid/*sqlite3_file pointer*/,flags/*ignored*/){ mTimeStart('xSync'); @@ -601,16 +630,17 @@ const fh = __openFiles[fid]; wTimeStart('xTruncate'); try{ affirmLocked('xTruncate',fh); affirmNotRO('xTruncate', fh); - await (await getSyncHandle(fh)).truncate(size); + await (await getSyncHandle(fh,'xTruncate')).truncate(size); }catch(e){ error("xTruncate():",e,fh); state.s11n.storeException(2,e); rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR_TRUNCATE); } + await releaseImplicitLock(fh); wTimeEnd(); storeAndNotify('xTruncate',rc); mTimeEnd(); }, xUnlock: async function(fid/*sqlite3_file pointer*/, @@ -638,19 +668,20 @@ wTimeStart('xWrite'); try{ affirmLocked('xWrite',fh); affirmNotRO('xWrite', fh); rc = ( - n === (await getSyncHandle(fh)) + n === (await getSyncHandle(fh,'xWrite')) .write(fh.sabView.subarray(0, n), {at: Number(offset64)}) ) ? 0 : state.sq3Codes.SQLITE_IOERR_WRITE; }catch(e){ error("xWrite():",e,fh); state.s11n.storeException(1,e); rc = GetSyncHandleError.convertRc(e,state.sq3Codes.SQLITE_IOERR_WRITE); } + await releaseImplicitLock(fh); wTimeEnd(); storeAndNotify('xWrite',rc); mTimeEnd(); } }/*vfsAsyncImpls*/; @@ -775,17 +806,17 @@ waitTime is how long (ms) to wait for each Atomics.wait(). We need to wake up periodically to give the thread a chance to do other things. If this is too high (e.g. 500ms) then even two workers/tabs can easily run into locking errors. */ - const waitTime = 150; + const waitTime = 100; while(!flagAsyncShutdown){ try { if('timed-out'===Atomics.wait( state.sabOPView, state.opIds.whichOp, 0, waitTime )){ - await closeAutoLocks(); + await releaseImplicitLocks(); continue; } const opId = Atomics.load(state.sabOPView, state.opIds.whichOp); Atomics.store(state.sabOPView, state.opIds.whichOp, 0); const hnd = opHandlers[opId] ?? toss("No waitLoop handler for whichOp #",opId); @@ -822,10 +853,11 @@ state.sabIO = opt.sabIO; state.sabFileBufView = new Uint8Array(state.sabIO, 0, state.fileBufferSize); state.sabS11nView = new Uint8Array(state.sabIO, state.sabS11nOffset, state.sabS11nSize); state.opIds = opt.opIds; state.sq3Codes = opt.sq3Codes; + state.opfsFlags = opt.opfsFlags; Object.keys(vfsAsyncImpls).forEach((k)=>{ if(!Number.isFinite(state.opIds[k])){ toss("Maintenance required: missing state.opIds[",k,"]"); } }); Index: ext/wasm/tests/opfs/concurrency/index.html ================================================================== --- ext/wasm/tests/opfs/concurrency/index.html +++ ext/wasm/tests/opfs/concurrency/index.html @@ -22,14 +22,17 @@ and timing/concurrency mitigation in this environment is highly unpredictable!
URL flags: pass a number of workers using
- the workers=N
URL flag and the worker work interval
- as interval=N
(milliseconds). Enable OPFS VFS
- verbosity with verbose=1-3
(output goes to the
- dev console).
+ the workers=N
URL flag. Set the time between each
+ workload with interval=N
(milliseconds). Set the
+ number of worker iterations with iterations=N
.
+ Enable OPFS VFS verbosity with verbose=1-3
(output
+ goes to the dev console). Enable/disable "unlock ASAP" mode
+ (higher concurrency, lower speed)
+ with unlock-asap=0-1
.
Achtung: if it does not start to do anything within a couple of seconds, check the dev console: Chrome often fails with "cannot allocate WasmMemory" at startup. Closing and re-opening the tab usually resolves it. Index: ext/wasm/tests/opfs/concurrency/test.js ================================================================== --- ext/wasm/tests/opfs/concurrency/test.js +++ ext/wasm/tests/opfs/concurrency/test.js @@ -54,52 +54,74 @@ const urlArgsHtml = new URL(self.location.href).searchParams; const options = Object.create(null); options.sqlite3Dir = urlArgsJs.get('sqlite3.dir'); options.workerCount = ( urlArgsHtml.has('workers') ? +urlArgsHtml.get('workers') : 3 - ) || 3; + ) || 4; options.opfsVerbose = ( urlArgsHtml.has('verbose') ? +urlArgsHtml.get('verbose') : 1 ) || 1; options.interval = ( urlArgsHtml.has('interval') ? +urlArgsHtml.get('interval') : 750 - ) || 750; + ) || 1000; + options.iterations = ( + urlArgsHtml.has('iterations') ? +urlArgsHtml.get('iterations') : 10 + ) || 10; + options.unlockAsap = ( + urlArgsHtml.has('unlock-asap') ? +urlArgsHtml.get('unlock-asap') : 0 + ) || 0; const workers = []; workers.post = (type,...args)=>{ for(const w of workers) w.postMessage({type, payload:args}); }; - workers.loadedCount = 0; + workers.counts = {loaded: 0, passed: 0, failed: 0}; + const checkFinished = function(){ + if(workers.counts.passed + workers.counts.failed !== workers.length){ + return; + } + if(workers.counts.failed>0){ + logCss('tests-fail',"Finished with",workers.counts.failed,"failure(s)."); + }else{ + logCss('tests-pass',"All",workers.length,"workers finished."); + } + }; workers.onmessage = function(msg){ msg = msg.data; const prefix = 'Worker #'+msg.worker+':'; switch(msg.type){ case 'loaded': stdout(prefix,"loaded"); - if(++workers.loadedCount === workers.length){ - stdout("All workers loaded. Telling them to run..."); + if(++workers.counts.loaded === workers.length){ + stdout("All",workers.length,"workers loaded. Telling them to run..."); workers.post('run'); } break; case 'stdout': stdout(prefix,...msg.payload); break; case 'stderr': stderr(prefix,...msg.payload); break; case 'error': stderr(prefix,"ERROR:",...msg.payload); break; case 'finished': + ++workers.counts.passed; logCss('tests-pass',prefix,...msg.payload); + checkFinished(); break; case 'failed': + ++workers.counts.failed; logCss('tests-fail',prefix,"FAILED:",...msg.payload); + checkFinished(); break; default: logCss('error',"Unhandled message type:",msg); break; } }; - stdout("Launching",options.workerCount,"workers..."); + stdout("Launching",options.workerCount,"workers. Options:",options); workers.uri = ( 'worker.js?' + 'sqlite3.dir='+options.sqlite3Dir + '&interval='+options.interval + + '&iterations='+options.iterations + '&opfs-verbose='+options.opfsVerbose + + '&opfs-unlock-asap='+options.unlockAsap ); for(let i = 0; i < options.workerCount; ++i){ stdout("Launching worker..."); workers.push(new Worker( workers.uri+'&workerId='+(i+1)+(i ? '' : '&unlink-db') Index: ext/wasm/tests/opfs/concurrency/worker.js ================================================================== --- ext/wasm/tests/opfs/concurrency/worker.js +++ ext/wasm/tests/opfs/concurrency/worker.js @@ -1,13 +1,16 @@ importScripts( (new URL(self.location.href).searchParams).get('sqlite3.dir') + '/sqlite3.js' ); self.sqlite3InitModule().then(async function(sqlite3){ const urlArgs = new URL(self.location.href).searchParams; - const wName = urlArgs.get('workerId') || Math.round(Math.random()*10000); + const options = { + workerName: urlArgs.get('workerId') || Math.round(Math.random()*10000), + unlockAsap: urlArgs.get('opfs-unlock-asap') || 0 /*EXPERIMENTAL*/ + }; const wPost = (type,...payload)=>{ - postMessage({type, worker: wName, payload}); + postMessage({type, worker: options.workerName, payload}); }; const stdout = (...args)=>wPost('stdout',...args); const stderr = (...args)=>wPost('stderr',...args); if(!sqlite3.opfs){ stderr("OPFS support not detected. Aborting."); @@ -41,30 +44,34 @@ }else{ wPost('finished',"Ending work after",interval.count,"intervals."); } }; const run = async function(){ - db = new sqlite3.opfs.OpfsDb(dbName,'c'); + db = new sqlite3.opfs.OpfsDb({ + filename: 'file:'+dbName+'?opfs-unlock-asap='+options.unlockAsap, + flags: 'c' + }); sqlite3.capi.sqlite3_busy_timeout(db.pointer, 5000); db.transaction((db)=>{ db.exec([ "create table if not exists t1(w TEXT UNIQUE ON CONFLICT REPLACE,v);", "create table if not exists t2(w TEXT UNIQUE ON CONFLICT REPLACE,v);" ]); }); - const maxIterations = 10; + const maxIterations = + urlArgs.has('iterations') ? (+urlArgs.get('iterations') || 10) : 10; stdout("Starting interval-based db updates with delay of",interval.delay,"ms."); const doWork = async ()=>{ const tm = new Date().getTime(); ++interval.count; const prefix = "v(#"+interval.count+")"; stdout("Setting",prefix,"=",tm); try{ db.exec({ sql:"INSERT OR REPLACE INTO t1(w,v) VALUES(?,?)", - bind: [wName, new Date().getTime()] + bind: [options.workerName, new Date().getTime()] }); //stdout("Set",prefix); }catch(e){ interval.error = e; }