Skip to content

Commit

Permalink
Add concurrency controller code to plugin server
Browse files Browse the repository at this point in the history
  • Loading branch information
robbie-c committed Dec 19, 2024
1 parent f111ceb commit 9fbf2dc
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 5 deletions.
11 changes: 7 additions & 4 deletions frontend/src/lib/utils/concurrencyController.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import FastPriorityQueue from 'fastpriorityqueue'
import { promiseResolveReject } from 'lib/utils'

// Note that this file also exists in the plugin-server, please keep them in sync as the tests only exist for this version

class ConcurrencyControllerItem<T> {
_debugTag?: string
_runFn: () => Promise<void>
Expand All @@ -8,7 +11,7 @@ class ConcurrencyControllerItem<T> {
constructor(
concurrencyController: ConcurrencyController,
userFn: () => Promise<T>,
abortController: AbortController,
abortController: AbortController | undefined,
priority: number = Infinity,
debugTag: string | undefined
) {
Expand All @@ -17,7 +20,7 @@ class ConcurrencyControllerItem<T> {
const { promise, resolve, reject } = promiseResolveReject<T>()
this._promise = promise
this._runFn = async () => {
if (abortController.signal.aborted) {
if (abortController?.signal.aborted) {
reject(new FakeAbortError(abortController.signal.reason || 'AbortError'))
return
}
Expand All @@ -32,7 +35,7 @@ class ConcurrencyControllerItem<T> {
reject(error)
}
}
abortController.signal.addEventListener('abort', () => {
abortController?.signal.addEventListener('abort', () => {
reject(new FakeAbortError(abortController.signal.reason || 'AbortError'))
})
promise
Expand Down Expand Up @@ -76,7 +79,7 @@ export class ConcurrencyController {
}: {
fn: () => Promise<T>
priority?: number
abortController: AbortController
abortController?: AbortController
debugTag?: string
}): Promise<T> => {
const item = new ConcurrencyControllerItem(this, fn, abortController, priority, debugTag)
Expand Down
3 changes: 2 additions & 1 deletion plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,17 @@
"express": "^4.18.2",
"faker": "^5.5.3",
"fast-deep-equal": "^3.1.3",
"fastpriorityqueue": "^0.7.5",
"fernet-nodejs": "^1.0.6",
"generic-pool": "^3.7.1",
"graphile-worker": "0.13.0",
"ioredis": "^4.27.6",
"ipaddr.js": "^2.1.0",
"kafkajs": "^2.2.0",
"lz4-kafkajs": "1.0.0",
"kafkajs-snappy": "^1.1.0",
"lru-cache": "^6.0.0",
"luxon": "^3.4.4",
"lz4-kafkajs": "1.0.0",
"node-fetch": "^2.6.1",
"node-rdkafka": "^2.17.0",
"node-schedule": "^2.1.0",
Expand Down
7 changes: 7 additions & 0 deletions plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

133 changes: 133 additions & 0 deletions plugin-server/src/utils/concurrencyController.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import FastPriorityQueue from 'fastpriorityqueue'

export function promiseResolveReject<T>(): {
resolve: (value: T) => void
reject: (reason?: any) => void
promise: Promise<T>
} {
let resolve: (value: T) => void
let reject: (reason?: any) => void
const promise = new Promise<T>((innerResolve, innerReject) => {
resolve = innerResolve
reject = innerReject
})
return { resolve: resolve!, reject: reject!, promise }
}

// Note that this file also exists in the frontend code, please keep them in sync as the tests only exist in the other version
class ConcurrencyControllerItem<T> {
_debugTag?: string
_runFn: () => Promise<void>
_priority: number = Infinity
_promise: Promise<T>
constructor(
concurrencyController: ConcurrencyController,
userFn: () => Promise<T>,
abortController: AbortController | undefined,
priority: number = Infinity,
debugTag: string | undefined
) {
this._debugTag = debugTag
this._priority = priority
const { promise, resolve, reject } = promiseResolveReject<T>()
this._promise = promise
this._runFn = async () => {
if (abortController?.signal.aborted) {
reject(new FakeAbortError(abortController.signal.reason || 'AbortError'))
return
}
if (concurrencyController._current.length >= concurrencyController._concurrencyLimit) {
throw new Error('Developer Error: ConcurrencyControllerItem: _runFn called while already running')
}
try {
concurrencyController._current.push(this)
const result = await userFn()
resolve(result)
} catch (error) {
reject(error)
}
}
abortController?.signal.addEventListener('abort', () => {
reject(new FakeAbortError(abortController.signal.reason || 'AbortError'))
})
promise
.catch(() => {
// ignore
})
.finally(() => {
if (concurrencyController._current.includes(this)) {
concurrencyController._current = concurrencyController._current.filter((item) => item !== this)
concurrencyController._runNext()
}
})
}
}

export class ConcurrencyController {
_concurrencyLimit: number

_current: ConcurrencyControllerItem<any>[] = []
private _queue: FastPriorityQueue<ConcurrencyControllerItem<any>> = new FastPriorityQueue(
(a, b) => a._priority < b._priority
)

constructor(concurrencyLimit: number) {
this._concurrencyLimit = concurrencyLimit
}

/**
* Run a function with a mutex. If the mutex is already running, the function will be queued and run when the mutex
* is available.
* @param fn The function to run
* @param priority The priority of the function. Lower numbers will be run first. Defaults to Infinity.
* @param abortController An AbortController that, if aborted, will reject the promise and immediately start the next item in the queue.
* @param debugTag
*/
run = <T>({
fn,
priority,
abortController,
debugTag,
}: {
fn: () => Promise<T>
priority?: number
abortController?: AbortController
debugTag?: string
}): Promise<T> => {
const item = new ConcurrencyControllerItem(this, fn, abortController, priority, debugTag)

this._queue.add(item)

this._tryRunNext()

return item._promise
}

_runNext(): void {
const next = this._queue.poll()
if (next) {
next._runFn()
.catch(() => {
// ignore
})
.finally(() => {
this._tryRunNext()
})
}
}

_tryRunNext(): void {
if (this._current.length < this._concurrencyLimit) {
this._runNext()
}
}

setConcurrencyLimit = (limit: number): void => {
this._concurrencyLimit = limit
}
}

// Create a fake AbortError that allows us to use e.name === 'AbortError' to check if an error is an AbortError
class FakeAbortError extends Error {
name = 'AbortError'
}

0 comments on commit 9fbf2dc

Please sign in to comment.