diff --git a/frontend/__snapshots__/scenes-app-insights--funnel-top-to-bottom-breakdown-edit--light.png b/frontend/__snapshots__/scenes-app-insights--funnel-top-to-bottom-breakdown-edit--light.png index 00916265cd38b..6724b2a2d5179 100644 Binary files a/frontend/__snapshots__/scenes-app-insights--funnel-top-to-bottom-breakdown-edit--light.png and b/frontend/__snapshots__/scenes-app-insights--funnel-top-to-bottom-breakdown-edit--light.png differ diff --git a/frontend/src/lib/utils/concurrencyController.ts b/frontend/src/lib/utils/concurrencyController.ts index 941af92f33b74..7326165b623a5 100644 --- a/frontend/src/lib/utils/concurrencyController.ts +++ b/frontend/src/lib/utils/concurrencyController.ts @@ -1,5 +1,8 @@ import FastPriorityQueue from 'fastpriorityqueue' import { promiseResolveReject } from 'lib/utils' + +// Note that this file also exists in the plugin-server, please keep them in sync as the tests only exist for this version + class ConcurrencyControllerItem { _debugTag?: string _runFn: () => Promise @@ -8,7 +11,7 @@ class ConcurrencyControllerItem { constructor( concurrencyController: ConcurrencyController, userFn: () => Promise, - abortController: AbortController, + abortController: AbortController | undefined, priority: number = Infinity, debugTag: string | undefined ) { @@ -17,7 +20,7 @@ class ConcurrencyControllerItem { const { promise, resolve, reject } = promiseResolveReject() this._promise = promise this._runFn = async () => { - if (abortController.signal.aborted) { + if (abortController?.signal.aborted) { reject(new FakeAbortError(abortController.signal.reason || 'AbortError')) return } @@ -32,7 +35,7 @@ class ConcurrencyControllerItem { reject(error) } } - abortController.signal.addEventListener('abort', () => { + abortController?.signal.addEventListener('abort', () => { reject(new FakeAbortError(abortController.signal.reason || 'AbortError')) }) promise @@ -76,7 +79,7 @@ export class ConcurrencyController { }: { fn: () => Promise priority?: number - abortController: AbortController + abortController?: AbortController debugTag?: string }): Promise => { const item = new ConcurrencyControllerItem(this, fn, abortController, priority, debugTag) diff --git a/plugin-server/package.json b/plugin-server/package.json index 11df155e0757c..9014d19be548b 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -69,16 +69,17 @@ "express": "^4.18.2", "faker": "^5.5.3", "fast-deep-equal": "^3.1.3", + "fastpriorityqueue": "^0.7.5", "fernet-nodejs": "^1.0.6", "generic-pool": "^3.7.1", "graphile-worker": "0.13.0", "ioredis": "^4.27.6", "ipaddr.js": "^2.1.0", "kafkajs": "^2.2.0", - "lz4-kafkajs": "1.0.0", "kafkajs-snappy": "^1.1.0", "lru-cache": "^6.0.0", "luxon": "^3.4.4", + "lz4-kafkajs": "1.0.0", "node-fetch": "^2.6.1", "node-rdkafka": "^2.17.0", "node-schedule": "^2.1.0", diff --git a/plugin-server/pnpm-lock.yaml b/plugin-server/pnpm-lock.yaml index c297462845d8e..f187191553102 100644 --- a/plugin-server/pnpm-lock.yaml +++ b/plugin-server/pnpm-lock.yaml @@ -91,6 +91,9 @@ dependencies: fast-deep-equal: specifier: ^3.1.3 version: 3.1.3 + fastpriorityqueue: + specifier: ^0.7.5 + version: 0.7.5 fernet-nodejs: specifier: ^1.0.6 version: 1.0.6 @@ -6276,6 +6279,10 @@ packages: strnum: 1.0.5 dev: false + /fastpriorityqueue@0.7.5: + resolution: {integrity: sha512-3Pa0n9gwy8yIbEsT3m2j/E9DXgWvvjfiZjjqcJ+AdNKTAlVMIuFYrYG5Y3RHEM8O6cwv9hOpOWY/NaMfywoQVA==} + dev: false + /fastq@1.15.0: resolution: {integrity: sha512-wBrocU2LCXXa+lWBt8RoIRD89Fi8OdABODa/kEnyeyjS5aZO5/GNvI5sEINADqP/h8M29UHTHUb53sUu5Ihqdw==} dependencies: diff --git a/plugin-server/src/utils/concurrencyController.ts b/plugin-server/src/utils/concurrencyController.ts new file mode 100644 index 0000000000000..ac84d439fd507 --- /dev/null +++ b/plugin-server/src/utils/concurrencyController.ts @@ -0,0 +1,133 @@ +import FastPriorityQueue from 'fastpriorityqueue' + +export function promiseResolveReject(): { + resolve: (value: T) => void + reject: (reason?: any) => void + promise: Promise +} { + let resolve: (value: T) => void + let reject: (reason?: any) => void + const promise = new Promise((innerResolve, innerReject) => { + resolve = innerResolve + reject = innerReject + }) + return { resolve: resolve!, reject: reject!, promise } +} + +// Note that this file also exists in the frontend code, please keep them in sync as the tests only exist in the other version +class ConcurrencyControllerItem { + _debugTag?: string + _runFn: () => Promise + _priority: number = Infinity + _promise: Promise + constructor( + concurrencyController: ConcurrencyController, + userFn: () => Promise, + abortController: AbortController | undefined, + priority: number = Infinity, + debugTag: string | undefined + ) { + this._debugTag = debugTag + this._priority = priority + const { promise, resolve, reject } = promiseResolveReject() + this._promise = promise + this._runFn = async () => { + if (abortController?.signal.aborted) { + reject(new FakeAbortError(abortController.signal.reason || 'AbortError')) + return + } + if (concurrencyController._current.length >= concurrencyController._concurrencyLimit) { + throw new Error('Developer Error: ConcurrencyControllerItem: _runFn called while already running') + } + try { + concurrencyController._current.push(this) + const result = await userFn() + resolve(result) + } catch (error) { + reject(error) + } + } + abortController?.signal.addEventListener('abort', () => { + reject(new FakeAbortError(abortController.signal.reason || 'AbortError')) + }) + promise + .catch(() => { + // ignore + }) + .finally(() => { + if (concurrencyController._current.includes(this)) { + concurrencyController._current = concurrencyController._current.filter((item) => item !== this) + concurrencyController._runNext() + } + }) + } +} + +export class ConcurrencyController { + _concurrencyLimit: number + + _current: ConcurrencyControllerItem[] = [] + private _queue: FastPriorityQueue> = new FastPriorityQueue( + (a, b) => a._priority < b._priority + ) + + constructor(concurrencyLimit: number) { + this._concurrencyLimit = concurrencyLimit + } + + /** + * Run a function with a mutex. If the mutex is already running, the function will be queued and run when the mutex + * is available. + * @param fn The function to run + * @param priority The priority of the function. Lower numbers will be run first. Defaults to Infinity. + * @param abortController An AbortController that, if aborted, will reject the promise and immediately start the next item in the queue. + * @param debugTag + */ + run = ({ + fn, + priority, + abortController, + debugTag, + }: { + fn: () => Promise + priority?: number + abortController?: AbortController + debugTag?: string + }): Promise => { + const item = new ConcurrencyControllerItem(this, fn, abortController, priority, debugTag) + + this._queue.add(item) + + this._tryRunNext() + + return item._promise + } + + _runNext(): void { + const next = this._queue.poll() + if (next) { + next._runFn() + .catch(() => { + // ignore + }) + .finally(() => { + this._tryRunNext() + }) + } + } + + _tryRunNext(): void { + if (this._current.length < this._concurrencyLimit) { + this._runNext() + } + } + + setConcurrencyLimit = (limit: number): void => { + this._concurrencyLimit = limit + } +} + +// Create a fake AbortError that allows us to use e.name === 'AbortError' to check if an error is an AbortError +class FakeAbortError extends Error { + name = 'AbortError' +}