Skip to content

Commit

Permalink
Store txn in AsyncLocalStorage context
Browse files Browse the repository at this point in the history
  • Loading branch information
matttylr committed Oct 29, 2024
1 parent 8673df9 commit a55c1c5
Showing 1 changed file with 104 additions and 74 deletions.
178 changes: 104 additions & 74 deletions store.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const { pack, unpack } = require('msgpackr')
const { mkdirSync } = require('fs')
const { Cursor, Env } = require('node-gyp-build')(__dirname)
const { pack, unpack } = require('msgpackr')
const { mkdirSync } = require('fs')
const { Cursor, Env } = require('node-gyp-build')(__dirname)
const { AsyncLocalStorage } = require('async_hooks')

function asBinary(buffer) {
return {
Expand Down Expand Up @@ -69,138 +70,167 @@ class Iterator {
}

/**
* This class enforces a very specific usage of lmdb. All keys are utf8 encoded
* buffers and all values are encoded with msgpack. Transaction handling is also
* intentionally simplified to just one current transaction, no nesting, with
* the assumption that there is only one active client. This is the pattern that
* This class enforces a very specific usage of lmdb. All keys are
* utf8 encoded buffers and all values are encoded with
* msgpack. Transaction handling is also intentionally simplified to
* just one current transaction, no nesting. This is the pattern that
* adset-consumer uses in its current form.
*/
class Store {
// TODO: add in support for a persistent readonly transaction for the lifetime
// of the Store instance - Engines will need this
// TODO: maybe refactor to split env/dbi params
// TODO: support additional dbis

/**
* @param {object} options
* @param {boolean} options.create
* @param {number} options.mapSize
* @param {string} options.name
* @param {boolean} options.noReadAhead
* @param {string} options.path
* @param {AsyncLocalStorage} context
*/
constructor({
create = false,
mapSize,
name,
noReadAhead = false,
path
}) {
}, context) {
this.env = new Env()
this.env.open({ path, mapSize, noReadAhead })
this.dbi = this.env.openDbi({ name, create, keyIsBuffer: true })
this.txn = null

if (context) {
this.context = context
} else {
this.context = new AsyncLocalStorage()
}
}

/**
*
* @param {string} key
* @param {object} value
*/
put(key, value) {
this.transact(() => {
try {
const keyBuffer = Buffer.from(key, 'utf8')
let valueBuffer
if (value && value['\x10binary-data\x02'])
valueBuffer = value['\x10binary-data\x02']
else
valueBuffer = pack(value)
this.txn.putBinary(this.dbi, keyBuffer, valueBuffer)
} catch (error) {
console.error('Error storing value:', error)
}
this.transact((txn) => {
const keyBuffer = Buffer.from(key, 'utf8')
let valueBuffer
if (value && value['\x10binary-data\x02'])
valueBuffer = value['\x10binary-data\x02']
else
valueBuffer = pack(value)
txn.putBinary(this.dbi, keyBuffer, valueBuffer)
})
}

/**
*
* @param {object} key
* @returns
*/
get(key) {
return this.transact(() => {
try {
const keyBuffer = Buffer.from(key, 'utf8')
const value = this.txn.getBinary(this.dbi, keyBuffer)
return value == null ? null : unpack(value)
} catch (error) {
console.error('Error retrieving value:', error)
return null
}
return this.transact((txn) => {
const keyBuffer = Buffer.from(key, 'utf8')
const value = txn.getBinary(this.dbi, keyBuffer)
return value == null ? null : unpack(value)
}, true)
}

/**
*
* @param {object} key
* @returns
*/
del(key) {
return this.transact(() => {
try {
const keyBuffer = Buffer.from(key, 'utf8')
if (this.txn.getBinary(this.dbi, keyBuffer) != null) {
this.txn.del(this.dbi, keyBuffer)
return true
} else {
return false
}
} catch (error) {
console.error('Error deleting value:', error)
return this.transact((txn) => {
const keyBuffer = Buffer.from(key, 'utf8')
if (txn.getBinary(this.dbi, keyBuffer) != null) {
txn.del(this.dbi, keyBuffer)
return true
} else {
return false
}
})
}

/**
* Wrapper function for a transaction. This is only sensible in adset-consumer
* as there is only ever one thread of control working on the Store instance
* at a time.
* @param {*} f
* Wrapper function for a transaction.
* @param {*} f Function to execute within a txn
* @param {boolean} [readOnly=false] Set to true if txn is readOnly
*/
transact(f, readonly = false) {
transact(f, readOnly = false) {
let ownTxn = false
if (!this.txn) {
this.txn = this.env.beginTxn()
let store = this.context.getStore()
if (!store) {
store = {}
this.context.enterWith(store)
}
let txn = store.txn
if (!txn) {
txn = this.env.beginTxn({ readOnly })
store.txn = txn
ownTxn = true
}

try {
const result = f()
const result = f(txn)
if (ownTxn) {
if (readonly) {
this.txn.abort()
}
else {
this.txn.commit()
}
if (readOnly)
txn.abort()
else
txn.commit()
}
return result
} catch (error) {
console.error('Transaction aborted due to an error:', error.message)
this.txn.abort()
if (ownTxn) {
console.error('transaction aborted:', error.message)
txn.abort()
}
throw error
} finally {
if (ownTxn)
this.txn = null
store.txn = null
}
}

async transactAsync(f, readonly = false) {
async transactAsync(f, readOnly = false) {
let ownTxn = false
if (!this.txn) {
this.txn = this.env.beginTxn()
let store = this.context.getStore()
if (!store) {
store = {}
this.context.enterWith(store)
}
let txn = store.txn
if (!txn) {
txn = this.env.beginTxn({ readOnly })
store.txn = txn
ownTxn = true
}

try {
const result = await f()
const result = await f(txn)
if (ownTxn) {
if (readonly) {
this.txn.abort()
}
else {
this.txn.commit()
}
if (readOnly)
txn.abort()
else
txn.commit()
}
return result
} catch (error) {
console.error('Transaction aborted due to an error:', error.message)
this.txn.abort()
if (ownTxn) {
console.error('transaction aborted:', error.message)
txn.abort()
}
throw error
} finally {
if (ownTxn)
this.txn = null
store.txn = null
}
}


iterate() {
return new Iterator(this.env, this.dbi)
}
Expand Down Expand Up @@ -239,8 +269,8 @@ class Store {
}

getCount() {
return this.transact(() => {
return this.dbi.stat(this.txn)?.entryCount
return this.transact((txn) => {
return this.dbi.stat(txn)?.entryCount
})
}

Expand Down

0 comments on commit a55c1c5

Please sign in to comment.