Skip to content

Commit

Permalink
chore: wait for redis connection before returning the connection inst…
Browse files Browse the repository at this point in the history
…ance

fixes LATITUDE-LLM-APP-N
  • Loading branch information
geclos committed Sep 18, 2024
1 parent 48b7a81 commit 6b46f7f
Show file tree
Hide file tree
Showing 14 changed files with 38 additions and 28 deletions.
2 changes: 1 addition & 1 deletion apps/web/src/actions/evaluations/runBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export const runBatchEvaluationAction = withDataset
const evaluations = await evaluationsRepo
.filterById(input.evaluationIds)
.then((r) => r.unwrap())
const queues = setupJobs()
const queues = await setupJobs()
evaluations.forEach((evaluation) => {
const batchId = `evaluation:${evaluation.id}:${nanoid(5)}`
queues.defaultQueue.jobs.enqueueRunBatchEvaluationJob({
Expand Down
2 changes: 1 addition & 1 deletion apps/workers/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import http from 'http'
import { captureException, captureMessage } from './utils/sentry'
import startWorkers from './workers'

const workers = startWorkers()
const workers = await startWorkers()

console.log('Workers started')

Expand Down
5 changes: 3 additions & 2 deletions apps/workers/src/workers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ const WORKER_OPTS = {
}
const WORKERS = [defaultWorker]

export default function startWorkers() {
export default async function startWorkers() {
const connection = await queues()
return WORKERS.flatMap((w) =>
w.queues.map((q) => {
const worker = new Worker(q, w.processor, {
...WORKER_OPTS,
connection: queues(),
connection,
})

worker.on('error', (error: Error) => {
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { buildRedisConnection } from '../redis'

let connection: Redis

export const cache = () => {
export const cache = async () => {
if (connection) return connection

connection = buildRedisConnection({
connection = await buildRedisConnection({
host: env.CACHE_HOST,
port: env.CACHE_PORT,
})
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/events/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ export const publisher = {
handlers.map((handler) => handler({ data: event })),
)
},
publishLater: (event: LatitudeEvent) => {
const queues = setupJobs()
publishLater: async (event: LatitudeEvent) => {
const queues = await setupJobs()

queues.eventsQueue.jobs.enqueueCreateEventJob(event)
queues.eventsQueue.jobs.enqueuePublishEventJob(event)
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/queues/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { buildRedisConnection } from '../redis'

let connection: Redis

export const queues = () => {
export const queues = async () => {
if (connection) return connection

connection = buildRedisConnection({
connection = await buildRedisConnection({
host: env.QUEUE_HOST,
port: env.QUEUE_PORT,
password: env.QUEUE_PASSWORD,
Expand Down
8 changes: 7 additions & 1 deletion packages/core/src/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,11 @@ export function buildRedisConnection({
host,
...opts
}: Omit<RedisOptions, 'port' & 'host'> & { host: string; port: number }) {
return new Redis(port, host, opts)
return new Promise<Redis>((resolve) => {
const instance = new Redis(port, host, opts)

instance.on('connect', () => {
resolve(instance)
})
})
}
7 changes: 3 additions & 4 deletions packages/core/src/services/ai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,8 @@ export async function ai({
)
providerLogUuid = providerLog.uuid
} else {
await setupJobs().defaultQueue.jobs.enqueueCreateProviderLogJob(
payload.data,
)
const queues = await setupJobs()
queues.defaultQueue.jobs.enqueueCreateProviderLogJob(payload.data)
}

onFinish?.({ ...event, providerLogUuid })
Expand Down Expand Up @@ -175,7 +174,7 @@ const checkDefaultProviderUsage = async ({
workspace: Workspace
}) => {
if (provider.token === env.DEFAULT_PROVIDER_API_KEY) {
const c = cache()
const c = await cache()
const value = await c.incr(
`workspace:${workspace.id}:defaultProviderRunCount`,
)
Expand Down
7 changes: 4 additions & 3 deletions packages/jobs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ import { setupQueues } from './queues'

export { Worker } from 'bullmq'

let queues: ReturnType<typeof setupQueues>
export function setupJobs() {
let queues: Awaited<ReturnType<typeof setupQueues>>

export async function setupJobs() {
if (queues) return queues
queues = setupQueues()
queues = await setupQueues()

return queues
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ export const runBatchEvaluationJob = async (
)
})

const progressTracker = new ProgressTracker(queues(), batchId)
const progressTracker = new ProgressTracker(await queues(), batchId)
const firstAttempt = job.attemptsMade === 0

if (firstAttempt) {
await progressTracker.initializeProgress(parameters.length)
}

const progress = await progressTracker.getProgress()
const jobs = setupJobs()
const jobs = await setupJobs()

if (firstAttempt && parameters.length > 0) {
websockets.emit('evaluationStatus', {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const runDocumentJob = async (job: Job<RunDocumentJobData>) => {
batchId,
} = job.data

const progressTracker = new ProgressTracker(queues(), batchId)
const progressTracker = new ProgressTracker(await queues(), batchId)

try {
const workspace = await unsafelyFindWorkspace(workspaceId)
Expand All @@ -58,7 +58,7 @@ export const runDocumentJob = async (job: Job<RunDocumentJobData>) => {

await result.response

const queues = setupJobs()
const queues = await setupJobs()

// Enqueue the evaluation job
await queues.defaultQueue.jobs.enqueueRunEvaluationJob({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const runEvaluationJob = async (job: Job<RunEvaluationJobData>) => {
evaluationId,
} = job.data

const progressTracker = new ProgressTracker(queues(), batchId)
const progressTracker = new ProgressTracker(await queues(), batchId)
const documentLogsScope = new DocumentLogsRepository(workspaceId)
const evaluationsScope = new EvaluationsRepository(workspaceId)
try {
Expand Down
3 changes: 1 addition & 2 deletions packages/jobs/src/job-definitions/events/publishEventJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ export const publishEventJob = async (job: Job<LatitudeEvent>) => {
const handlers = EventHandlers[event.type]
if (!handlers?.length) return

const queues = await setupJobs()
handlers.forEach((handler) => {
const queues = setupJobs()

queues.eventsQueue.queue.add(handler.name, event)
})
}
10 changes: 7 additions & 3 deletions packages/jobs/src/queues/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { queues } from '@latitude-data/core/queues'
import { Job, JobsOptions, Queue, QueueEvents } from 'bullmq'
import Redis from 'ioredis'

import { Jobs, Queues, QUEUES } from '../constants'
import { JobDefinition } from '../job-definitions'
Expand Down Expand Up @@ -30,11 +31,12 @@ export const DEFAULT_JOB_OPTIONS: JobsOptions = {
function setupQueue({
name,
jobs,
connection,
}: {
name: Queues
jobs: readonly QueueJob[]
connection: Redis
}) {
const connection = queues()
const queue = new Queue(name, {
connection,
defaultJobOptions: DEFAULT_JOB_OPTIONS,
Expand All @@ -60,14 +62,16 @@ function setupQueue({

type QueueJob = (typeof QUEUES)[keyof typeof QUEUES]['jobs'][number]

export function setupQueues() {
export async function setupQueues() {
const connection = await queues()

return Object.entries(QUEUES).reduce<{
[K in keyof typeof QUEUES]: ReturnType<typeof setupQueue>
}>(
(acc, [name, { jobs }]) => {
return {
...acc,
[name]: setupQueue({ name: name as Queues, jobs }),
[name]: setupQueue({ name: name as Queues, jobs, connection }),
}
},
{} as { [K in keyof typeof QUEUES]: ReturnType<typeof setupQueue> },
Expand Down

0 comments on commit 6b46f7f

Please sign in to comment.