From a55c1c53dff8a96f061b06762d01926e5c98129b Mon Sep 17 00:00:00 2001 From: Matt Taylor Date: Tue, 29 Oct 2024 00:38:17 +0000 Subject: [PATCH] Store txn in AsyncLocalStorage context --- store.js | 178 ++++++++++++++++++++++++++++++++----------------------- 1 file changed, 104 insertions(+), 74 deletions(-) diff --git a/store.js b/store.js index 8dfcbe336..65de60d5b 100644 --- a/store.js +++ b/store.js @@ -1,6 +1,7 @@ -const { pack, unpack } = require('msgpackr') -const { mkdirSync } = require('fs') -const { Cursor, Env } = require('node-gyp-build')(__dirname) +const { pack, unpack } = require('msgpackr') +const { mkdirSync } = require('fs') +const { Cursor, Env } = require('node-gyp-build')(__dirname) +const { AsyncLocalStorage } = require('async_hooks') function asBinary(buffer) { return { @@ -69,10 +70,10 @@ class Iterator { } /** - * This class enforces a very specific usage of lmdb. All keys are utf8 encoded - * buffers and all values are encoded with msgpack. Transaction handling is also - * intentionally simplified to just one current transaction, no nesting, with - * the assumption that there is only one active client. This is the pattern that + * This class enforces a very specific usage of lmdb. All keys are + * utf8 encoded buffers and all values are encoded with + * msgpack. Transaction handling is also intentionally simplified to + * just one current transaction, no nesting. This is the pattern that * adset-consumer uses in its current form. */ class Store { @@ -80,127 +81,156 @@ class Store { // of the Store instance - Engines will need this // TODO: maybe refactor to split env/dbi params // TODO: support additional dbis + + /** + * @param {object} options + * @param {boolean} options.create + * @param {number} options.mapSize + * @param {string} options.name + * @param {boolean} options.noReadAhead + * @param {string} options.path + * @param {AsyncLocalStorage} context + */ constructor({ create = false, mapSize, name, noReadAhead = false, path - }) { + }, context) { this.env = new Env() this.env.open({ path, mapSize, noReadAhead }) this.dbi = this.env.openDbi({ name, create, keyIsBuffer: true }) - this.txn = null + + if (context) { + this.context = context + } else { + this.context = new AsyncLocalStorage() + } } + /** + * + * @param {string} key + * @param {object} value + */ put(key, value) { - this.transact(() => { - try { - const keyBuffer = Buffer.from(key, 'utf8') - let valueBuffer - if (value && value['\x10binary-data\x02']) - valueBuffer = value['\x10binary-data\x02'] - else - valueBuffer = pack(value) - this.txn.putBinary(this.dbi, keyBuffer, valueBuffer) - } catch (error) { - console.error('Error storing value:', error) - } + this.transact((txn) => { + const keyBuffer = Buffer.from(key, 'utf8') + let valueBuffer + if (value && value['\x10binary-data\x02']) + valueBuffer = value['\x10binary-data\x02'] + else + valueBuffer = pack(value) + txn.putBinary(this.dbi, keyBuffer, valueBuffer) }) } + /** + * + * @param {object} key + * @returns + */ get(key) { - return this.transact(() => { - try { - const keyBuffer = Buffer.from(key, 'utf8') - const value = this.txn.getBinary(this.dbi, keyBuffer) - return value == null ? null : unpack(value) - } catch (error) { - console.error('Error retrieving value:', error) - return null - } + return this.transact((txn) => { + const keyBuffer = Buffer.from(key, 'utf8') + const value = txn.getBinary(this.dbi, keyBuffer) + return value == null ? null : unpack(value) }, true) } + /** + * + * @param {object} key + * @returns + */ del(key) { - return this.transact(() => { - try { - const keyBuffer = Buffer.from(key, 'utf8') - if (this.txn.getBinary(this.dbi, keyBuffer) != null) { - this.txn.del(this.dbi, keyBuffer) - return true - } else { - return false - } - } catch (error) { - console.error('Error deleting value:', error) + return this.transact((txn) => { + const keyBuffer = Buffer.from(key, 'utf8') + if (txn.getBinary(this.dbi, keyBuffer) != null) { + txn.del(this.dbi, keyBuffer) + return true + } else { + return false } }) } /** - * Wrapper function for a transaction. This is only sensible in adset-consumer - * as there is only ever one thread of control working on the Store instance - * at a time. - * @param {*} f + * Wrapper function for a transaction. + * @param {*} f Function to execute within a txn + * @param {boolean} [readOnly=false] Set to true if txn is readOnly */ - transact(f, readonly = false) { + transact(f, readOnly = false) { let ownTxn = false - if (!this.txn) { - this.txn = this.env.beginTxn() + let store = this.context.getStore() + if (!store) { + store = {} + this.context.enterWith(store) + } + let txn = store.txn + if (!txn) { + txn = this.env.beginTxn({ readOnly }) + store.txn = txn ownTxn = true } try { - const result = f() + const result = f(txn) if (ownTxn) { - if (readonly) { - this.txn.abort() - } - else { - this.txn.commit() - } + if (readOnly) + txn.abort() + else + txn.commit() } return result } catch (error) { - console.error('Transaction aborted due to an error:', error.message) - this.txn.abort() + if (ownTxn) { + console.error('transaction aborted:', error.message) + txn.abort() + } throw error } finally { if (ownTxn) - this.txn = null + store.txn = null } } - async transactAsync(f, readonly = false) { + async transactAsync(f, readOnly = false) { let ownTxn = false - if (!this.txn) { - this.txn = this.env.beginTxn() + let store = this.context.getStore() + if (!store) { + store = {} + this.context.enterWith(store) + } + let txn = store.txn + if (!txn) { + txn = this.env.beginTxn({ readOnly }) + store.txn = txn ownTxn = true } try { - const result = await f() + const result = await f(txn) if (ownTxn) { - if (readonly) { - this.txn.abort() - } - else { - this.txn.commit() - } + if (readOnly) + txn.abort() + else + txn.commit() } return result } catch (error) { - console.error('Transaction aborted due to an error:', error.message) - this.txn.abort() + if (ownTxn) { + console.error('transaction aborted:', error.message) + txn.abort() + } throw error } finally { if (ownTxn) - this.txn = null + store.txn = null } } - iterate() { return new Iterator(this.env, this.dbi) } @@ -239,8 +269,8 @@ class Store { } getCount() { - return this.transact(() => { - return this.dbi.stat(this.txn)?.entryCount + return this.transact((txn) => { + return this.dbi.stat(txn)?.entryCount }) }