Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: wait for redis connection before returning the connection instance #212

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions apps/gateway/docker/run-production.sh

This file was deleted.

2 changes: 1 addition & 1 deletion apps/web/src/actions/evaluations/runBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export const runBatchEvaluationAction = withDataset
const evaluations = await evaluationsRepo
.filterById(input.evaluationIds)
.then((r) => r.unwrap())
const queues = setupJobs()
const queues = await setupJobs()
evaluations.forEach((evaluation) => {
const batchId = `evaluation:${evaluation.id}:${nanoid(5)}`
queues.defaultQueue.jobs.enqueueRunBatchEvaluationJob({
Expand Down
13 changes: 0 additions & 13 deletions apps/workers/docker/run-production.sh

This file was deleted.

2 changes: 1 addition & 1 deletion apps/workers/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import http from 'http'
import { captureException, captureMessage } from './utils/sentry'
import startWorkers from './workers'

const workers = startWorkers()
const workers = await startWorkers()

console.log('Workers started')

Expand Down
5 changes: 3 additions & 2 deletions apps/workers/src/workers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ const WORKER_OPTS = {
}
const WORKERS = [defaultWorker]

export default function startWorkers() {
export default async function startWorkers() {
const connection = await queues()
return WORKERS.flatMap((w) =>
w.queues.map((q) => {
const worker = new Worker(q, w.processor, {
...WORKER_OPTS,
connection: queues(),
connection,
})

worker.on('error', (error: Error) => {
Expand Down
25 changes: 25 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,20 @@ services:
dockerfile: apps/gateway/docker/Dockerfile
profiles: [building]
environment:
- CACHE_HOST=redis
- DATABASE_URL=postgresql://latitude:secret@db:5432/latitude
- DEFAULT_PROJECT_ID=1
- DEFAULT_PROVIDER_API_KEY=test
- [email protected]
- LATITUDE_DOMAIN=web
- LATITUDE_URL=http://web:8080
- NODE_ENV=production
- QUEUE_HOST=redis
- WEBSOCKETS_SERVER=websockets
- WEBSOCKET_REFRESH_SECRET_TOKEN_KEY=test
- WEBSOCKET_SECRET_TOKEN_KEY=test
- WORKERS_WEBSOCKET_SECRET_TOKEN=test
- HOSTNAME=localhost
depends_on:
- db
- redis
Expand All @@ -51,7 +64,19 @@ services:
dockerfile: apps/workers/docker/Dockerfile
profiles: [building]
environment:
- CACHE_HOST=redis
- DATABASE_URL=postgresql://latitude:secret@db:5432/latitude
- DEFAULT_PROJECT_ID=1
- DEFAULT_PROVIDER_API_KEY=test
- [email protected]
- LATITUDE_DOMAIN=web
- LATITUDE_URL=http://web:8080
- NODE_ENV=production
- QUEUE_HOST=redis
- WEBSOCKETS_SERVER=websockets
- WEBSOCKET_REFRESH_SECRET_TOKEN_KEY=test
- WEBSOCKET_SECRET_TOKEN_KEY=test
- WORKERS_WEBSOCKET_SECRET_TOKEN=test
depends_on:
- db
- redis
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { buildRedisConnection } from '../redis'

let connection: Redis

export const cache = () => {
export const cache = async () => {
if (connection) return connection

connection = buildRedisConnection({
connection = await buildRedisConnection({
host: env.CACHE_HOST,
port: env.CACHE_PORT,
})
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/events/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ export const publisher = {
handlers.map((handler) => handler({ data: event })),
)
},
publishLater: (event: LatitudeEvent) => {
const queues = setupJobs()
publishLater: async (event: LatitudeEvent) => {
const queues = await setupJobs()

queues.eventsQueue.jobs.enqueueCreateEventJob(event)
queues.eventsQueue.jobs.enqueuePublishEventJob(event)
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/queues/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { buildRedisConnection } from '../redis'

let connection: Redis

export const queues = () => {
export const queues = async () => {
if (connection) return connection

connection = buildRedisConnection({
connection = await buildRedisConnection({
host: env.QUEUE_HOST,
port: env.QUEUE_PORT,
password: env.QUEUE_PASSWORD,
Expand Down
8 changes: 7 additions & 1 deletion packages/core/src/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,11 @@ export function buildRedisConnection({
host,
...opts
}: Omit<RedisOptions, 'port' & 'host'> & { host: string; port: number }) {
return new Redis(port, host, opts)
return new Promise<Redis>((resolve) => {
const instance = new Redis(port, host, opts)

instance.on('connect', () => {
resolve(instance)
})
})
}
7 changes: 3 additions & 4 deletions packages/core/src/services/ai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,8 @@ export async function ai({
)
providerLogUuid = providerLog.uuid
} else {
await setupJobs().defaultQueue.jobs.enqueueCreateProviderLogJob(
payload.data,
)
const queues = await setupJobs()
queues.defaultQueue.jobs.enqueueCreateProviderLogJob(payload.data)
}

onFinish?.({ ...event, providerLogUuid })
Expand Down Expand Up @@ -175,7 +174,7 @@ const checkDefaultProviderUsage = async ({
workspace: Workspace
}) => {
if (provider.token === env.DEFAULT_PROVIDER_API_KEY) {
const c = cache()
const c = await cache()
const value = await c.incr(
`workspace:${workspace.id}:defaultProviderRunCount`,
)
Expand Down
7 changes: 4 additions & 3 deletions packages/jobs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ import { setupQueues } from './queues'

export { Worker } from 'bullmq'

let queues: ReturnType<typeof setupQueues>
export function setupJobs() {
let queues: Awaited<ReturnType<typeof setupQueues>>

export async function setupJobs() {
if (queues) return queues
queues = setupQueues()
queues = await setupQueues()

return queues
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ export const runBatchEvaluationJob = async (
)
})

const progressTracker = new ProgressTracker(queues(), batchId)
const progressTracker = new ProgressTracker(await queues(), batchId)
const firstAttempt = job.attemptsMade === 0

if (firstAttempt) {
await progressTracker.initializeProgress(parameters.length)
}

const progress = await progressTracker.getProgress()
const jobs = setupJobs()
const jobs = await setupJobs()

if (firstAttempt && parameters.length > 0) {
websockets.emit('evaluationStatus', {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { LogSources } from '@latitude-data/core/browser'
import { LogSources, Providers } from '@latitude-data/core/browser'
import * as factories from '@latitude-data/core/factories'
import { Result } from '@latitude-data/core/lib/Result'
import { runDocumentAtCommit } from '@latitude-data/core/services/commits/runDocumentAtCommit'
Expand Down Expand Up @@ -48,7 +48,10 @@ describe('runDocumentJob', () => {

// Create necessary resources using factories
const setup = await factories.createProject({
documents: { 'test-doc': 'Test content' },
providers: [{ type: Providers.OpenAI, name: 'Latitude' }],
documents: {
'test-doc': factories.helpers.createPrompt({ provider: 'Latitude' }),
},
})
workspace = setup.workspace
project = setup.project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const runDocumentJob = async (job: Job<RunDocumentJobData>) => {
batchId,
} = job.data

const progressTracker = new ProgressTracker(queues(), batchId)
const progressTracker = new ProgressTracker(await queues(), batchId)

try {
const workspace = await unsafelyFindWorkspace(workspaceId)
Expand All @@ -58,7 +58,7 @@ export const runDocumentJob = async (job: Job<RunDocumentJobData>) => {

await result.response

const queues = setupJobs()
const queues = await setupJobs()

// Enqueue the evaluation job
await queues.defaultQueue.jobs.enqueueRunEvaluationJob({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const runEvaluationJob = async (job: Job<RunEvaluationJobData>) => {
evaluationId,
} = job.data

const progressTracker = new ProgressTracker(queues(), batchId)
const progressTracker = new ProgressTracker(await queues(), batchId)
const documentLogsScope = new DocumentLogsRepository(workspaceId)
const evaluationsScope = new EvaluationsRepository(workspaceId)
try {
Expand Down
3 changes: 1 addition & 2 deletions packages/jobs/src/job-definitions/events/publishEventJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ export const publishEventJob = async (job: Job<LatitudeEvent>) => {
const handlers = EventHandlers[event.type]
if (!handlers?.length) return

const queues = await setupJobs()
handlers.forEach((handler) => {
const queues = setupJobs()

queues.eventsQueue.queue.add(handler.name, event)
})
}
10 changes: 7 additions & 3 deletions packages/jobs/src/queues/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { queues } from '@latitude-data/core/queues'
import { Job, JobsOptions, Queue, QueueEvents } from 'bullmq'
import Redis from 'ioredis'

import { Jobs, Queues, QUEUES } from '../constants'
import { JobDefinition } from '../job-definitions'
Expand Down Expand Up @@ -30,11 +31,12 @@ export const DEFAULT_JOB_OPTIONS: JobsOptions = {
function setupQueue({
name,
jobs,
connection,
}: {
name: Queues
jobs: readonly QueueJob[]
connection: Redis
}) {
const connection = queues()
const queue = new Queue(name, {
connection,
defaultJobOptions: DEFAULT_JOB_OPTIONS,
Expand All @@ -60,14 +62,16 @@ function setupQueue({

type QueueJob = (typeof QUEUES)[keyof typeof QUEUES]['jobs'][number]

export function setupQueues() {
export async function setupQueues() {
const connection = await queues()

return Object.entries(QUEUES).reduce<{
[K in keyof typeof QUEUES]: ReturnType<typeof setupQueue>
}>(
(acc, [name, { jobs }]) => {
return {
...acc,
[name]: setupQueue({ name: name as Queues, jobs }),
[name]: setupQueue({ name: name as Queues, jobs, connection }),
}
},
{} as { [K in keyof typeof QUEUES]: ReturnType<typeof setupQueue> },
Expand Down
47 changes: 7 additions & 40 deletions packages/sdks/typescript/src/addMessage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from 'vitest'

import { LatitudeSdk } from './index'
import { parseSSE } from './utils/parseSSE'

const encoder = new TextEncoder()
let latitudeApiKey = 'fake-api-key'
Expand Down Expand Up @@ -60,7 +61,7 @@ describe('addMessage', () => {
const stream = new ReadableStream({
start(controller) {
CHUNKS.forEach((chunk, index) => {
controller.enqueue(encoder.encode(JSON.stringify(chunk)))
controller.enqueue(encoder.encode(chunk))
if (index === CHUNKS.length - 1) {
controller.close()
}
Expand All @@ -86,48 +87,14 @@ describe('addMessage', () => {
})

CHUNKS.forEach((chunk, index) => {
expect(onMessageMock).toHaveBeenNthCalledWith(index + 1, chunk)
expect(onMessageMock).toHaveBeenNthCalledWith(
index + 1,
parseSSE(chunk)!.data,
)
})
}),
)

it(
'send on Error callback when chunk is not a valid JSON',
server.boundary(async () => {
const onMessageMock = vi.fn()
const onErrorMock = vi.fn()
server.use(
http.post(
'http://localhost:8787/api/v1/chats/add-message',
async () => {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(encoder.encode('invalid json'))
controller.close()
},
})

return new HttpResponse(stream, {
headers: {
'Content-Type': 'text/plain',
},
})
},
),
)
await SDK.addMessages({
params: {
messages: [],
documentLogUuid: 'fake-document-log-uuid',
source: LogSources.Playground,
},
onMessage: onMessageMock,
onError: onErrorMock,
})
expect(onErrorMock).toHaveBeenCalledWith(expect.any(Error))
}),
)

it(
'sends all message onFinish callback and final response',
server.boundary(async () => {
Expand All @@ -139,7 +106,7 @@ describe('addMessage', () => {
const stream = new ReadableStream({
start(controller) {
CHUNKS.forEach((chunk, index) => {
controller.enqueue(encoder.encode(JSON.stringify(chunk)))
controller.enqueue(encoder.encode(chunk))
if (index === CHUNKS.length - 1) {
controller.close()
}
Expand Down
1 change: 1 addition & 0 deletions packages/sdks/typescript/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export class LatitudeSdk {
params: { projectId, commitUuid },
body: { documentPath, parameters, source },
})

return this.handleStreamChainResponse({
response,
onMessage,
Expand Down
Loading
Loading