Skip to content

Commit

Permalink
Merge branch 'master' into adam/search-result-button-updated
Browse files Browse the repository at this point in the history
  • Loading branch information
adamleithp authored Dec 19, 2024
2 parents e51011f + 0a63875 commit a464df1
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 17 deletions.
Binary file modified frontend/__snapshots__/components-command-bar--search--dark.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified frontend/__snapshots__/lemon-ui-lemon-input--search--dark.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified frontend/__snapshots__/lemon-ui-lemon-input--search--light.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
11 changes: 7 additions & 4 deletions frontend/src/lib/utils/concurrencyController.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import FastPriorityQueue from 'fastpriorityqueue'
import { promiseResolveReject } from 'lib/utils'

// Note that this file also exists in the plugin-server, please keep them in sync as the tests only exist for this version

class ConcurrencyControllerItem<T> {
_debugTag?: string
_runFn: () => Promise<void>
Expand All @@ -8,7 +11,7 @@ class ConcurrencyControllerItem<T> {
constructor(
concurrencyController: ConcurrencyController,
userFn: () => Promise<T>,
abortController: AbortController,
abortController: AbortController | undefined,
priority: number = Infinity,
debugTag: string | undefined
) {
Expand All @@ -17,7 +20,7 @@ class ConcurrencyControllerItem<T> {
const { promise, resolve, reject } = promiseResolveReject<T>()
this._promise = promise
this._runFn = async () => {
if (abortController.signal.aborted) {
if (abortController?.signal.aborted) {
reject(new FakeAbortError(abortController.signal.reason || 'AbortError'))
return
}
Expand All @@ -32,7 +35,7 @@ class ConcurrencyControllerItem<T> {
reject(error)
}
}
abortController.signal.addEventListener('abort', () => {
abortController?.signal.addEventListener('abort', () => {
reject(new FakeAbortError(abortController.signal.reason || 'AbortError'))
})
promise
Expand Down Expand Up @@ -76,7 +79,7 @@ export class ConcurrencyController {
}: {
fn: () => Promise<T>
priority?: number
abortController: AbortController
abortController?: AbortController
debugTag?: string
}): Promise<T> => {
const item = new ConcurrencyControllerItem(this, fn, abortController, priority, debugTag)
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@
"pmtiles": "^2.11.0",
"postcss": "^8.4.31",
"postcss-preset-env": "^9.3.0",
"posthog-js": "1.202.2",
"posthog-js": "1.202.4",
"posthog-js-lite": "3.0.0",
"prettier": "^2.8.8",
"prop-types": "^15.7.2",
Expand Down
3 changes: 2 additions & 1 deletion plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,17 @@
"express": "^4.18.2",
"faker": "^5.5.3",
"fast-deep-equal": "^3.1.3",
"fastpriorityqueue": "^0.7.5",
"fernet-nodejs": "^1.0.6",
"generic-pool": "^3.7.1",
"graphile-worker": "0.13.0",
"ioredis": "^4.27.6",
"ipaddr.js": "^2.1.0",
"kafkajs": "^2.2.0",
"lz4-kafkajs": "1.0.0",
"kafkajs-snappy": "^1.1.0",
"lru-cache": "^6.0.0",
"luxon": "^3.4.4",
"lz4-kafkajs": "1.0.0",
"node-fetch": "^2.6.1",
"node-rdkafka": "^2.17.0",
"node-schedule": "^2.1.0",
Expand Down
7 changes: 7 additions & 0 deletions plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

133 changes: 133 additions & 0 deletions plugin-server/src/utils/concurrencyController.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import FastPriorityQueue from 'fastpriorityqueue'

export function promiseResolveReject<T>(): {
resolve: (value: T) => void
reject: (reason?: any) => void
promise: Promise<T>
} {
let resolve: (value: T) => void
let reject: (reason?: any) => void
const promise = new Promise<T>((innerResolve, innerReject) => {
resolve = innerResolve
reject = innerReject
})
return { resolve: resolve!, reject: reject!, promise }
}

// Note that this file also exists in the frontend code, please keep them in sync as the tests only exist in the other version
class ConcurrencyControllerItem<T> {
_debugTag?: string
_runFn: () => Promise<void>
_priority: number = Infinity
_promise: Promise<T>
constructor(
concurrencyController: ConcurrencyController,
userFn: () => Promise<T>,
abortController: AbortController | undefined,
priority: number = Infinity,
debugTag: string | undefined
) {
this._debugTag = debugTag
this._priority = priority
const { promise, resolve, reject } = promiseResolveReject<T>()
this._promise = promise
this._runFn = async () => {
if (abortController?.signal.aborted) {
reject(new FakeAbortError(abortController.signal.reason || 'AbortError'))
return
}
if (concurrencyController._current.length >= concurrencyController._concurrencyLimit) {
throw new Error('Developer Error: ConcurrencyControllerItem: _runFn called while already running')
}
try {
concurrencyController._current.push(this)
const result = await userFn()
resolve(result)
} catch (error) {
reject(error)
}
}
abortController?.signal.addEventListener('abort', () => {
reject(new FakeAbortError(abortController.signal.reason || 'AbortError'))
})
promise
.catch(() => {
// ignore
})
.finally(() => {
if (concurrencyController._current.includes(this)) {
concurrencyController._current = concurrencyController._current.filter((item) => item !== this)
concurrencyController._runNext()
}
})
}
}

export class ConcurrencyController {
_concurrencyLimit: number

_current: ConcurrencyControllerItem<any>[] = []
private _queue: FastPriorityQueue<ConcurrencyControllerItem<any>> = new FastPriorityQueue(
(a, b) => a._priority < b._priority
)

constructor(concurrencyLimit: number) {
this._concurrencyLimit = concurrencyLimit
}

/**
* Run a function with a mutex. If the mutex is already running, the function will be queued and run when the mutex
* is available.
* @param fn The function to run
* @param priority The priority of the function. Lower numbers will be run first. Defaults to Infinity.
* @param abortController An AbortController that, if aborted, will reject the promise and immediately start the next item in the queue.
* @param debugTag
*/
run = <T>({
fn,
priority,
abortController,
debugTag,
}: {
fn: () => Promise<T>
priority?: number
abortController?: AbortController
debugTag?: string
}): Promise<T> => {
const item = new ConcurrencyControllerItem(this, fn, abortController, priority, debugTag)

this._queue.add(item)

this._tryRunNext()

return item._promise
}

_runNext(): void {
const next = this._queue.poll()
if (next) {
next._runFn()
.catch(() => {
// ignore
})
.finally(() => {
this._tryRunNext()
})
}
}

_tryRunNext(): void {
if (this._current.length < this._concurrencyLimit) {
this._runNext()
}
}

setConcurrencyLimit = (limit: number): void => {
this._concurrencyLimit = limit
}
}

// Create a fake AbortError that allows us to use e.name === 'AbortError' to check if an error is an AbortError
class FakeAbortError extends Error {
name = 'AbortError'
}
22 changes: 11 additions & 11 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a464df1

Please sign in to comment.