diff --git a/apps/gateway/src/jobs/index.ts b/apps/gateway/src/jobs/index.ts deleted file mode 100644 index 1136f0b8e..000000000 --- a/apps/gateway/src/jobs/index.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { setupJobs } from '@latitude-data/jobs' -import env from '$/common/env' - -export const { queues } = setupJobs({ - connectionParams: { - host: env.REDIS_HOST, - port: env.REDIS_PORT, - password: env.REDIS_PASSWORD, - }, -}) diff --git a/apps/gateway/src/routes/api/v1/projects/:projectId/commits/:commitUuid/documents/handlers/run.test.ts b/apps/gateway/src/routes/api/v1/projects/:projectId/commits/:commitUuid/documents/handlers/run.test.ts index 826bd3535..14afcc6b6 100644 --- a/apps/gateway/src/routes/api/v1/projects/:projectId/commits/:commitUuid/documents/handlers/run.test.ts +++ b/apps/gateway/src/routes/api/v1/projects/:projectId/commits/:commitUuid/documents/handlers/run.test.ts @@ -24,10 +24,12 @@ const mocks = vi.hoisted(() => ({ runDocumentAtCommit: vi.fn(), queues: { defaultQueue: { - jobs: { - enqueueCreateProviderLogJob: vi.fn(), - enqueueCreateDocumentLogJob: vi.fn(), - }, + addCreateProviderLogJob: vi.fn(), + addCreateDocumentLogJob: vi.fn(), + }, + eventsQueue: { + addCreateEventJob: vi.fn(), + addPublishEventJob: vi.fn(), }, }, })) @@ -44,7 +46,7 @@ vi.mock( }, ) -vi.mock('$/jobs', () => ({ +vi.mock('@latitude-data/jobs', () => ({ queues: mocks.queues, })) @@ -212,7 +214,7 @@ describe('POST /run', () => { await testConsumeStream(res.body as ReadableStream) expect( - mocks.queues.defaultQueue.jobs.enqueueCreateDocumentLogJob, + mocks.queues.defaultQueue.addCreateDocumentLogJob, ).toHaveBeenCalledWith({ commit, data: { diff --git a/apps/gateway/src/routes/api/v1/projects/:projectId/commits/:commitUuid/documents/handlers/run.ts b/apps/gateway/src/routes/api/v1/projects/:projectId/commits/:commitUuid/documents/handlers/run.ts index 7ef0b349d..0f4674706 100644 --- a/apps/gateway/src/routes/api/v1/projects/:projectId/commits/:commitUuid/documents/handlers/run.ts +++ b/apps/gateway/src/routes/api/v1/projects/:projectId/commits/:commitUuid/documents/handlers/run.ts @@ -1,8 +1,8 @@ import { zValidator } from '@hono/zod-validator' import { LogSources } from '@latitude-data/core/browser' import { runDocumentAtCommit } from '@latitude-data/core/services/commits/runDocumentAtCommit' +import { queues } from '@latitude-data/jobs' import { pipeToStream } from '$/common/pipeToStream' -import { queues } from '$/jobs' import { Factory } from 'hono/factory' import { streamSSE } from 'hono/streaming' import { z } from 'zod' @@ -45,7 +45,8 @@ export const runHandler = factory.createHandlers( await pipeToStream(stream, result.stream) - queues.defaultQueue.jobs.enqueueCreateDocumentLogJob({ + // TODO: move to events + await queues.defaultQueue.addCreateDocumentLogJob({ commit, data: { uuid: result.documentLogUuid, diff --git a/apps/web/next.config.mjs b/apps/web/next.config.mjs index 23bae3631..f208eddb2 100644 --- a/apps/web/next.config.mjs +++ b/apps/web/next.config.mjs @@ -13,12 +13,6 @@ const nextConfig = { output: 'standalone', transpilePackages: INTERNAL_PACKAGES, experimental: { - // Dear developer, - // - // Unfortunately, our jobs packages uses some meta programming that relies - // on the name of job handler functions for things to work properly. As you - // can imagine, minification would break this. So we have to disable it. - serverMinification: false, // TODO: Review this decision. It would be more performant to use // direct uploads. 
To implement it we need to generate a signed URL // that's sent directly to S3 and the client uploads the file to Amazon directly diff --git a/apps/web/package.json b/apps/web/package.json index 656fff93f..666d5745b 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -34,6 +34,7 @@ "lodash-es": "^4.17.21", "lucia": "^3.2.0", "monaco-editor": "^0.50.0", + "nanoid": "^5.0.7", "next": "^14.3.0-canary.87", "next-themes": "^0.3.0", "nextjs-toploader": "^1.6.12", diff --git a/apps/web/src/actions/datasets/create.ts b/apps/web/src/actions/datasets/create.ts index 3fc89ab75..fbf225b7e 100644 --- a/apps/web/src/actions/datasets/create.ts +++ b/apps/web/src/actions/datasets/create.ts @@ -1,8 +1,8 @@ 'use server' +import { disk } from '@latitude-data/core/lib/disk' import { DatasetsRepository } from '@latitude-data/core/repositories' import { createDataset } from '@latitude-data/core/services/datasets/create' -import disk from '$/lib/disk' import { z } from 'zod' import { authProcedure } from '../procedures' diff --git a/apps/web/src/actions/datasets/destroy.ts b/apps/web/src/actions/datasets/destroy.ts index 039d190ba..b14538e8c 100644 --- a/apps/web/src/actions/datasets/destroy.ts +++ b/apps/web/src/actions/datasets/destroy.ts @@ -1,8 +1,8 @@ 'use server' +import { disk } from '@latitude-data/core/lib/disk' import { DatasetsRepository } from '@latitude-data/core/repositories' import { destroyDataset } from '@latitude-data/core/services/datasets/destroy' -import disk from '$/lib/disk' import { z } from 'zod' import { authProcedure } from '../procedures' @@ -19,5 +19,5 @@ export const destroyDatasetAction = authProcedure const repo = new DatasetsRepository(ctx.workspace.id) const dataset = await repo.find(id).then((r) => r.unwrap()) - return await destroyDataset({ dataset, disk }).then((r) => r.unwrap()) + return await destroyDataset({ dataset, disk: disk }).then((r) => r.unwrap()) }) diff --git a/apps/web/src/actions/datasets/preview.ts b/apps/web/src/actions/datasets/preview.ts index f0cba9e97..37f96d2f2 100644 --- a/apps/web/src/actions/datasets/preview.ts +++ b/apps/web/src/actions/datasets/preview.ts @@ -1,8 +1,8 @@ 'use server' +import { disk } from '@latitude-data/core/lib/disk' import { DatasetsRepository } from '@latitude-data/core/repositories' import { previewDataset } from '@latitude-data/core/services/datasets/preview' -import disk from '$/lib/disk' import { z } from 'zod' import { authProcedure } from '../procedures' @@ -17,5 +17,5 @@ export const previewDatasetAction = authProcedure .handler(async ({ ctx, input }) => { const repo = new DatasetsRepository(ctx.workspace.id) const dataset = await repo.find(input.id).then((r) => r.unwrap()) - return await previewDataset({ dataset, disk }).then((r) => r.unwrap()) + return await previewDataset({ dataset, disk: disk }).then((r) => r.unwrap()) }) diff --git a/apps/web/src/actions/evaluations/runBatch.test.ts b/apps/web/src/actions/evaluations/runBatch.test.ts new file mode 100644 index 000000000..2e1527bda --- /dev/null +++ b/apps/web/src/actions/evaluations/runBatch.test.ts @@ -0,0 +1,171 @@ +import { + Commit, + Dataset, + DocumentVersion, + EvaluationDto, + Project, + ProviderApiKey, + Providers, + User, + Workspace, +} from '@latitude-data/core/browser' +import * as factories from '@latitude-data/core/factories' +import { queues } from '@latitude-data/jobs' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +import { runBatchAction } from './runBatch' + +const mocks = vi.hoisted(() => ({ + getSession: vi.fn(),
+})) + +vi.mock('$/services/auth/getSession', () => ({ + getSession: mocks.getSession, +})) + +vi.mock('@latitude-data/jobs', () => ({ + queues: { + defaultQueue: { + addRunBatchEvaluationJob: vi.fn(), + }, + eventsQueue: { + addCreateEventJob: vi.fn(), + addPublishEventJob: vi.fn(), + }, + }, +})) + +describe('runBatchAction', () => { + describe('unauthorized', () => { + it('errors when the user is not authenticated', async () => { + const [_, error] = await runBatchAction({ + datasetId: 1, + projectId: 1, + documentUuid: 'doc-uuid', + commitUuid: 'commit-uuid', + runCount: 5, + evaluationId: 1, + }) + + expect(error!.name).toEqual('UnauthorizedError') + }) + }) + + describe('authorized', () => { + let workspace: Workspace, + user: User, + project: Project, + document: DocumentVersion, + commit: Commit, + dataset: Dataset, + evaluation: EvaluationDto, + provider: ProviderApiKey + + beforeEach(async () => { + const setup = await factories.createProject({ + documents: { 'test-doc': 'Test content' }, + }) + workspace = setup.workspace + user = setup.user + project = setup.project + document = setup.documents[0]! + commit = setup.commit + + provider = await factories.createProviderApiKey({ + workspace, + type: Providers.OpenAI, + name: 'Test Provider', + user, + }) + + dataset = await factories + .createDataset({ + name: 'Test Dataset', + workspace, + author: user, + }) + .then((result) => result.dataset) + + evaluation = await factories.createEvaluation({ + provider, + name: 'Test Evaluation', + }) + + mocks.getSession.mockReturnValue({ + user, + workspace: { id: workspace.id, name: workspace.name }, + }) + }) + + it('successfully enqueues a batch evaluation job', async () => { + const [result, error] = await runBatchAction({ + datasetId: dataset.id, + projectId: project.id, + documentUuid: document.documentUuid, + commitUuid: commit.uuid, + runCount: 5, + evaluationId: evaluation.id, + }) + + expect(error).toBeNull() + expect(result).toEqual({ + success: true, + batchId: expect.any(String), + }) + + expect(queues.defaultQueue.addRunBatchEvaluationJob).toHaveBeenCalledWith( + expect.objectContaining({ + evaluation: expect.objectContaining({ id: evaluation.id }), + dataset: expect.objectContaining({ id: dataset.id }), + document: expect.objectContaining({ + documentUuid: document.documentUuid, + }), + runCount: 5, + offset: 0, + parametersMap: undefined, + batchId: expect.any(String), + }), + ) + }) + + it('handles optional parameters', async () => { + const [result, error] = await runBatchAction({ + datasetId: dataset.id, + projectId: project.id, + documentUuid: document.documentUuid, + commitUuid: commit.uuid, + runCount: 5, + evaluationId: evaluation.id, + offset: 10, + parameters: { 1: 100, 2: 200 }, + }) + + expect(error).toBeNull() + expect(result).toEqual({ + success: true, + batchId: expect.any(String), + }) + + expect(queues.defaultQueue.addRunBatchEvaluationJob).toHaveBeenCalledWith( + expect.objectContaining({ + offset: 10, + parametersMap: { 1: 100, 2: 200 }, + }), + ) + }) + + it('handles errors when resources are not found', async () => { + const [_, error] = await runBatchAction({ + datasetId: 999999, + projectId: project.id, + documentUuid: document.documentUuid, + commitUuid: commit.uuid, + runCount: 5, + evaluationId: evaluation.id, + }) + + expect(error).not.toBeNull() + expect(error!.message).toContain('not found') + }) + }) +}) diff --git a/apps/web/src/actions/evaluations/runBatch.ts b/apps/web/src/actions/evaluations/runBatch.ts new file mode 100644 index 
000000000..c6e07a8d0 --- /dev/null +++ b/apps/web/src/actions/evaluations/runBatch.ts @@ -0,0 +1,58 @@ +import { + DatasetsRepository, + DocumentVersionsRepository, + EvaluationsRepository, +} from '@latitude-data/core/repositories' +import { queues } from '@latitude-data/jobs' +import { nanoid } from 'nanoid' +import { z } from 'zod' + +import { authProcedure } from '../procedures' + +export const runBatchAction = authProcedure + .createServerAction() + .input( + z.object({ + datasetId: z.number(), + projectId: z.number(), + documentUuid: z.string(), + commitUuid: z.string(), + runCount: z.number(), + offset: z.number().optional().default(0), + parameters: z.record(z.number()).optional(), + evaluationId: z.number(), + }), + ) + .handler(async ({ input, ctx }) => { + const evaluationsRepo = new EvaluationsRepository(ctx.workspace.id) + const evaluation = await evaluationsRepo + .find(input.evaluationId) + .then((r) => r.unwrap()) + + const datasetsRepo = new DatasetsRepository(ctx.workspace.id) + const dataset = await datasetsRepo + .find(input.datasetId) + .then((r) => r.unwrap()) + + const docsRepo = new DocumentVersionsRepository(ctx.workspace.id) + const document = await docsRepo + .getDocumentAtCommit({ + projectId: input.projectId, + commitUuid: input.commitUuid, + documentUuid: input.documentUuid, + }) + .then((r) => r.unwrap()) + + const batchId = nanoid() + await queues.defaultQueue.addRunBatchEvaluationJob({ + evaluation, + dataset, + document, + runCount: input.runCount, + offset: input.offset, + parametersMap: input.parameters, + batchId, + }) + + return { success: true, batchId } + }) diff --git a/apps/web/src/jobs/index.ts b/apps/web/src/jobs/index.ts deleted file mode 100644 index 859b12676..000000000 --- a/apps/web/src/jobs/index.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { env } from '@latitude-data/env' -import { setupJobs } from '@latitude-data/jobs' - -export const { queues } = setupJobs({ - connectionParams: { - host: env.REDIS_HOST, - port: env.REDIS_PORT, - password: env.REDIS_PASSWORD, - }, -}) diff --git a/apps/web/src/lib/disk.ts b/apps/web/src/lib/disk.ts deleted file mode 100644 index c8fbbce72..000000000 --- a/apps/web/src/lib/disk.ts +++ /dev/null @@ -1,14 +0,0 @@ -import path from 'path' -import { fileURLToPath } from 'url' - -import { DiskWrapper } from '@latitude-data/core/lib/disk' - -const PUBLIC_PATH = 'uploads' -const DIRNAME_PATH = path.dirname(fileURLToPath(import.meta.url)) - -export default new DiskWrapper({ - local: { - publicPath: PUBLIC_PATH, - location: path.join(DIRNAME_PATH, `../../public/${PUBLIC_PATH}`), - }, -}) diff --git a/apps/workers/src/server.ts b/apps/workers/src/server.ts index 7e2b9648a..675e3af7d 100644 --- a/apps/workers/src/server.ts +++ b/apps/workers/src/server.ts @@ -1,13 +1,6 @@ -import { setupWorkers } from '@latitude-data/jobs' -import env from '$/env' +import startWorkers from '@latitude-data/jobs/workers' -const workers = setupWorkers({ - connectionParams: { - host: env.REDIS_HOST, - port: env.REDIS_PORT, - password: env.REDIS_PASSWORD, - }, -}) +const workers = startWorkers() console.log('Workers started') diff --git a/packages/core/src/events/publisher.ts b/packages/core/src/events/publisher.ts index 4e6f20e6a..d3c20d181 100644 --- a/packages/core/src/events/publisher.ts +++ b/packages/core/src/events/publisher.ts @@ -1,10 +1,7 @@ -import { env } from '@latitude-data/env' -import { setupJobs } from '@latitude-data/jobs' +import { queues } from '@latitude-data/jobs' import { EventHandler, EventHandlers, LatitudeEvent } 
from './handlers' -let jobs: ReturnType<typeof setupJobs> - export const publisher = { publish: async (event: LatitudeEvent) => { const handlers = EventHandlers[event.type] as EventHandler[] @@ -15,17 +12,7 @@ export const publisher = { ) }, publishLater: (event: LatitudeEvent) => { - if (!jobs) { - jobs = setupJobs({ - connectionParams: { - host: env.REDIS_HOST, - port: env.REDIS_PORT, - password: env.REDIS_PASSWORD, - }, - }) - } - - jobs.queues.eventsQueue.jobs.enqueueCreateEventJob(event) - jobs.queues.eventsQueue.jobs.enqueuePublishEventJob(event) + queues.eventsQueue.addCreateEventJob(event) + queues.eventsQueue.addPublishEventJob(event) }, } diff --git a/packages/core/src/lib/disk.ts b/packages/core/src/lib/disk.ts index f4140686d..475e875f5 100644 --- a/packages/core/src/lib/disk.ts +++ b/packages/core/src/lib/disk.ts @@ -1,4 +1,6 @@ +import path from 'path' import { Readable } from 'stream' +import { fileURLToPath } from 'url' import { Result } from '@latitude-data/core/lib/Result' import { env } from '@latitude-data/env' @@ -46,6 +48,7 @@ async function getReadableStreamFromFile(file: File) { } type BuildArgs = { local: { publicPath: string; location: string } } + export class DiskWrapper { private disk: Disk @@ -117,7 +120,9 @@ export class DiskWrapper { } const awsConfig = getAwsConfig() + return new S3Driver({ + // @ts-ignore credentials: awsConfig.credentials, region: awsConfig.region, bucket: awsConfig.bucket, @@ -126,3 +131,15 @@ export class DiskWrapper { }) } } + +const PUBLIC_PATH = 'uploads' + +export const disk = new DiskWrapper({ + local: { + publicPath: PUBLIC_PATH, + location: path.join( + path.dirname(fileURLToPath(import.meta.url)), + `../../public/${PUBLIC_PATH}`, + ), + }, +}) diff --git a/packages/core/src/lib/index.ts b/packages/core/src/lib/index.ts index 7e2a326a4..79f8940a4 100644 --- a/packages/core/src/lib/index.ts +++ b/packages/core/src/lib/index.ts @@ -3,3 +3,4 @@ export { default as Transaction, type PromisedResult } from './Transaction' export * from './Result' export * from './errors' export * from './commonTypes' +export * from './disk' diff --git a/packages/core/src/lib/readCsv.ts b/packages/core/src/lib/readCsv.ts index 9f7426c3e..7582b38cf 100644 --- a/packages/core/src/lib/readCsv.ts +++ b/packages/core/src/lib/readCsv.ts @@ -12,7 +12,8 @@ function getData(file: File | string) { type ParseCsvOptions = { delimiter: string // https://csv.js.org/parse/options/to_line/ - limit?: number + toLine?: number + fromLine?: number } type ParseResult = { record: Record<string, string> @@ -25,7 +26,7 @@ export type CsvParsedData = { } export async function syncReadCsv( file: File | string, - { delimiter, limit = -1 }: ParseCsvOptions, + { delimiter, toLine, fromLine }: ParseCsvOptions, ) { try { const data = await getData(file) @@ -39,8 +40,12 @@ export async function syncReadCsv( info: true, } - if (limit > 0) { - opts = { ...opts, to_line: limit } + if (toLine) { + opts = { ...opts, toLine } + } + + if (fromLine) { + opts = { ...opts, fromLine } } const records = parse(data, opts) as ParseResult[] diff --git a/packages/core/src/redis/index.ts b/packages/core/src/redis/index.ts index cbdd24829..ba87d3860 100644 --- a/packages/core/src/redis/index.ts +++ b/packages/core/src/redis/index.ts @@ -1,3 +1,4 @@ +import { env } from '@latitude-data/env' import Redis, { RedisOptions } from 'ioredis' export function buildRedisConnection({ @@ -7,3 +8,9 @@ }: Omit<RedisOptions, 'host' | 'port'> & { host: string; port: number }) { return new Redis(port, host, opts) } + +export const connection =
buildRedisConnection({ + host: env.REDIS_HOST, + port: env.REDIS_PORT, + password: env.REDIS_PASSWORD, +}) diff --git a/packages/core/src/services/datasets/preview.ts b/packages/core/src/services/datasets/preview.ts index 7002fd01d..142abb54f 100644 --- a/packages/core/src/services/datasets/preview.ts +++ b/packages/core/src/services/datasets/preview.ts @@ -9,17 +9,20 @@ import { syncReadCsv } from '../../lib/readCsv' export async function previewDataset({ dataset, disk, - limit = 100, + fromLine = 0, + toLine = 100, }: { dataset: Dataset disk: DiskWrapper - limit?: number + fromLine?: number + toLine?: number }) { const diskFile = disk.file(dataset.fileKey) const bytes = await diskFile.getBytes() const file = new TextDecoder().decode(bytes) const readResult = await syncReadCsv(file, { - limit, + fromLine, + toLine, delimiter: dataset.csvDelimiter, }) if (readResult.error) return readResult diff --git a/packages/core/src/tests/factories/datasets.ts b/packages/core/src/tests/factories/datasets.ts new file mode 100644 index 000000000..0cdadd242 --- /dev/null +++ b/packages/core/src/tests/factories/datasets.ts @@ -0,0 +1,80 @@ +import { faker } from '@faker-js/faker' + +import { SafeUser, Workspace } from '../../browser' +import { DiskWrapper } from '../../lib/disk' +import { createDataset as createDatasetFn } from '../../services/datasets/create' +import { createWorkspace, ICreateWorkspace } from './workspaces' + +export type ICreateDataset = { + name?: string + workspace?: Workspace | ICreateWorkspace + author?: SafeUser + csvDelimiter?: string + fileContent?: string +} + +export async function createDataset(datasetData: Partial<ICreateDataset> = {}) { + let workspaceData = datasetData.workspace ?? {} + let user: SafeUser + let workspace: Workspace + + if ('id' in workspaceData) { + workspace = workspaceData as Workspace + user = datasetData.author! + } else { + const newWorkspace = await createWorkspace(workspaceData) + workspace = newWorkspace.workspace + user = newWorkspace.userData + } + + const randomName = faker.commerce.productName() + const { name = randomName } = datasetData + + const csvDelimiter = datasetData.csvDelimiter ?? ',' + const fileContent = + datasetData.fileContent ??
generateCsvContent(csvDelimiter) + + const file = new File([fileContent], `${name}.csv`, { type: 'text/csv' }) + + const mockDisk: DiskWrapper = { + putFile: async () => ({ error: null }), + file: () => ({ + toSnapshot: async () => ({ + name: `${name}.csv`, + size: file.size, + type: file.type, + lastModified: new Date(), + }), + }), + } as any + + const result = await createDatasetFn({ + author: user, + workspace, + disk: mockDisk, + data: { + name, + file, + csvDelimiter, + }, + }) + + const dataset = result.unwrap() + + return { dataset, user, workspace } +} + +function generateCsvContent(delimiter: string): string { + const headers = ['id', 'name', 'email', 'age'] + const rows = Array.from({ length: 10 }, (_, i) => [ + i + 1, + faker.person.fullName(), + faker.internet.email(), + faker.number.int({ min: 18, max: 80 }), + ]) + + return [ + headers.join(delimiter), + ...rows.map((row) => row.join(delimiter)), + ].join('\n') +} diff --git a/packages/core/src/tests/factories/index.ts b/packages/core/src/tests/factories/index.ts index 1dd800081..80edcff3f 100644 --- a/packages/core/src/tests/factories/index.ts +++ b/packages/core/src/tests/factories/index.ts @@ -9,3 +9,4 @@ export * from './evaluations' export * from './evaluationResults' export * from './helpers' export * from './evaluationTemplates' +export * from './datasets' diff --git a/packages/jobs/package.json b/packages/jobs/package.json index 5835bc26d..6a93fec00 100644 --- a/packages/jobs/package.json +++ b/packages/jobs/package.json @@ -2,22 +2,33 @@ "name": "@latitude-data/jobs", "version": "0.0.1", "description": "Latitude jobs from Latitude llm", - "main": "./src/index.ts", + "exports": { + ".": { + "import": "./src/index.ts" + }, + "./workers": { + "import": "./src/workers/index.ts" + } + }, "type": "module", "scripts": { "lint": "eslint src", "tc": "tsc --noEmit", - "prettier": "prettier --write src/**/*.ts" + "prettier": "prettier --write src/**/*.ts", + "test": "vitest run --pool=forks", + "test:watch": "vitest" }, "dependencies": { "@latitude-data/env": "workspace:*" }, "devDependencies": { + "@latitude-data/core": "workspace:^", "@latitude-data/eslint-config": "workspace:*", "@latitude-data/typescript-config": "workspace:*", "@types/node": "*", "bullmq": "^5.8.7", "ioredis": "^5.4.1", + "vitest": "^2.0.5", "zod": "^3.23.8" }, "peerDependencies": { diff --git a/packages/jobs/src/index.ts b/packages/jobs/src/index.ts index ca77d4dd8..f42635d18 100644 --- a/packages/jobs/src/index.ts +++ b/packages/jobs/src/index.ts @@ -1,25 +1,6 @@ -import { buildConnection, ConnectionParams } from './connection' import { setupQueues } from './queues' -import startWorkers from './workers' +import { connection } from './utils/connection' export { Worker } from 'bullmq' -export function setupJobs({ - connectionParams, -}: { - connectionParams: ConnectionParams -}) { - const connection = buildConnection(connectionParams) - const queues = setupQueues({ connection }) - - return { queues } -} - -export function setupWorkers({ - connectionParams, -}: { - connectionParams: ConnectionParams -}) { - const connection = buildConnection(connectionParams) - return startWorkers({ connection }) -} +export const queues = setupQueues({ connection }) diff --git a/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.test.ts b/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.test.ts new file mode 100644 index 000000000..5e46f9594 --- /dev/null +++
b/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.test.ts @@ -0,0 +1,180 @@ +import { randomUUID } from 'crypto' + +import { Workspace } from '@latitude-data/core/browser' +import { findWorkspaceFromDocument } from '@latitude-data/core/data-access' +import { NotFoundError } from '@latitude-data/core/lib/errors' +import { previewDataset } from '@latitude-data/core/services/datasets/preview' +import { Job } from 'bullmq' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +import { queues } from '../../' +import { ProgressTracker } from '../../utils/progressTracker' +import { runBatchEvaluationJob } from './runBatchEvaluationJob' + +// Mock dependencies +vi.mock('@latitude-data/core/data-access', () => ({ + findWorkspaceFromDocument: vi.fn(), +})) + +vi.mock('@latitude-data/core/repositories', () => ({ + CommitsRepository: vi.fn().mockImplementation(() => ({ + find: vi.fn().mockResolvedValue({ + unwrap: () => ({ id: 'commit-1' }), + }), + })), +})) + +vi.mock('@latitude-data/core/services/datasets/preview', () => ({ + previewDataset: vi.fn(), +})) + +vi.mock('../../', () => ({ + queues: { + defaultQueue: { + addRunDocumentJob: vi.fn(), + }, + }, +})) + +vi.mock('../../utils/progressTracker', () => ({ + ProgressTracker: vi.fn().mockImplementation(() => ({ + initializeProgress: vi.fn(), + getProgress: vi.fn().mockResolvedValue({ enqueued: 0 }), + incrementEnqueued: vi.fn(), + })), +})) + +describe('runBatchEvaluationJob', () => { + let mockJob: Job + let mockProgressTracker: ProgressTracker + + beforeEach(() => { + vi.clearAllMocks() + + mockJob = { + data: { + evaluation: { id: 1 }, + dataset: { fileMetadata: { rowCount: 3 } }, + document: { commitId: 'commit-1' }, + parametersMap: { param1: 0, param2: 1 }, + }, + attemptsMade: 0, + } as unknown as Job + + // @ts-ignore + mockProgressTracker = { + initializeProgress: vi.fn(), + getProgress: vi.fn().mockResolvedValue({ enqueued: 0 }), + incrementEnqueued: vi.fn(), + } + + vi.mocked(ProgressTracker).mockImplementation( + () => mockProgressTracker as any, + ) + + vi.mocked(findWorkspaceFromDocument).mockResolvedValue({ + id: 'workspace-1', + } as unknown as Workspace) + vi.mocked(previewDataset).mockResolvedValue({ + // @ts-ignore + unwrap: () => ({ + rows: [ + ['value1', 'value2'], + ['value3', 'value4'], + ['value5', 'value6'], + ], + }), + }) + }) + + it('should process all rows and enqueue jobs', async () => { + await runBatchEvaluationJob.handler(mockJob) + + expect( + vi.mocked(queues.defaultQueue.addRunDocumentJob), + ).toHaveBeenCalledTimes(3) + expect( + vi.mocked(queues.defaultQueue.addRunDocumentJob), + ).toHaveBeenCalledWith( + expect.objectContaining({ + workspaceId: 'workspace-1', + parameters: { param1: 'value1', param2: 'value2' }, + evaluationId: 1, + batchId: expect.any(String), + }), + ) + }) + + it('should use provided offset and runCount', async () => { + mockJob.data.offset = 1 + mockJob.data.runCount = 2 + + await runBatchEvaluationJob.handler(mockJob) + + expect(vi.mocked(previewDataset)).toHaveBeenCalledWith( + expect.objectContaining({ + fromLine: 1, + toLine: 3, + }), + ) + expect( + vi.mocked(queues.defaultQueue.addRunDocumentJob), + ).toHaveBeenCalledTimes(3) + }) + + it('should use provided batchId', async () => { + const batchId = randomUUID() + mockJob.data.batchId = batchId + + await runBatchEvaluationJob.handler(mockJob) + + expect( + vi.mocked(queues.defaultQueue.addRunDocumentJob), + ).toHaveBeenCalledWith( + expect.objectContaining({ + batchId, + }), + ) + }) + + 
it('should throw NotFoundError if workspace is not found', async () => { + // @ts-ignore + vi.mocked(findWorkspaceFromDocument).mockResolvedValue(null) + + await expect(runBatchEvaluationJob.handler(mockJob)).rejects.toThrow( + NotFoundError, + ) + }) + + it('should resume from last enqueued job on retry', async () => { + mockJob.attemptsMade = 1 + // @ts-ignore + mockProgressTracker.getProgress.mockResolvedValue({ enqueued: 2 }) + + await runBatchEvaluationJob.handler(mockJob) + + expect( + vi.mocked(queues.defaultQueue.addRunDocumentJob), + ).toHaveBeenCalledTimes(1) + expect( + vi.mocked(queues.defaultQueue.addRunDocumentJob), + ).toHaveBeenCalledWith( + expect.objectContaining({ + parameters: { param1: 'value5', param2: 'value6' }, + }), + ) + }) + + it('should initialize progress on first attempt', async () => { + await runBatchEvaluationJob.handler(mockJob) + + expect(mockProgressTracker.initializeProgress).toHaveBeenCalledWith(3) + }) + + it('should not initialize progress on retry attempts', async () => { + mockJob.attemptsMade = 1 + await runBatchEvaluationJob.handler(mockJob) + + expect(mockProgressTracker.initializeProgress).not.toHaveBeenCalled() + }) +}) diff --git a/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.ts b/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.ts new file mode 100644 index 000000000..3d62958b8 --- /dev/null +++ b/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.ts @@ -0,0 +1,96 @@ +import { randomUUID } from 'crypto' + +import { + Dataset, + DocumentVersion, + EvaluationDto, +} from '@latitude-data/core/browser' +import { findWorkspaceFromDocument } from '@latitude-data/core/data-access' +import { disk } from '@latitude-data/core/lib/disk' +import { NotFoundError } from '@latitude-data/core/lib/errors' +import { connection } from '@latitude-data/core/redis' +import { CommitsRepository } from '@latitude-data/core/repositories' +import { previewDataset } from '@latitude-data/core/services/datasets/preview' +import { Job } from 'bullmq' + +import { queues } from '../..' 
+import { Queues } from '../../constants' +import { ProgressTracker } from '../../utils/progressTracker' + +type RunBatchEvaluationJobParams = { + evaluation: EvaluationDto + dataset: Dataset + document: DocumentVersion + runCount?: number + offset?: number + parametersMap?: Record<string, number> + batchId?: string +} + +export const runBatchEvaluationJob = { + name: 'runBatchEvaluationJob', + queue: Queues.defaultQueue, + handler: async (job: Job<RunBatchEvaluationJobParams>) => { + const { + evaluation, + dataset, + document, + offset = 0, + parametersMap, + runCount: jobRunCount, + batchId = randomUUID(), + } = job.data + const workspace = await findWorkspaceFromDocument(document) + if (!workspace) throw new NotFoundError('Workspace not found') + + const commit = await new CommitsRepository(workspace.id) + .find(document.commitId) + .then((r) => r.unwrap()) + const fileMetadata = dataset.fileMetadata + const runCount = jobRunCount || fileMetadata.rowCount + // TODO: use streaming instead of this service in order to avoid loading the + // whole dataset in memory + const result = await previewDataset({ + dataset, + disk, + fromLine: offset, + toLine: offset + runCount, + }).then((r) => r.unwrap()) + + const { rows } = result + + const parameters = rows.map((row) => { + return Object.fromEntries( + Object.entries(parametersMap!).map(([key, index]) => [ + key, + row[index]!, + ]), + ) + }) + + const progressTracker = new ProgressTracker(connection, batchId) + + if (job.attemptsMade === 0) { + await progressTracker.initializeProgress(parameters.length) + } + + const { enqueued } = await progressTracker.getProgress() + + // Enqueue runDocumentJob for each set of parameters, starting from the last + // enqueued job. This allows us to resume the batch if the job fails. + for (let i = enqueued; i < parameters.length; i++) { + await queues.defaultQueue.addRunDocumentJob({ + workspaceId: workspace.id, + document, + commit, + parameters: parameters[i]!, + evaluationId: evaluation.id, + batchId, + }) + + await progressTracker.incrementEnqueued() + } + + return { batchId } + }, +} diff --git a/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.test.ts b/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.test.ts new file mode 100644 index 000000000..b02e8ad61 --- /dev/null +++ b/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.test.ts @@ -0,0 +1,89 @@ +import { LogSources } from '@latitude-data/core/browser' +import { Result } from '@latitude-data/core/lib/Result' +import { runDocumentAtCommit } from '@latitude-data/core/services/commits/runDocumentAtCommit' +import { env } from '@latitude-data/env' +import { Job } from 'bullmq' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +import { queues } from '../..'
+import { ProgressTracker } from '../../utils/progressTracker' +import { runDocumentJob } from './runDocumentJob' + +// Mock dependencies +vi.mock('../../') +vi.mock('@latitude-data/core/redis') +vi.mock('@latitude-data/core/services/commits/runDocumentAtCommit') +vi.mock('@latitude-data/env') +vi.mock('../../utils/progressTracker') + +describe('runDocumentJob', () => { + const mockJob = { + data: { + workspaceId: 1, + document: { id: 'doc1' }, + commit: { id: 'commit1' }, + parameters: { param1: 'value1' }, + evaluationId: 123, + batchId: 'batch1', + }, + } as Job + + beforeEach(() => { + vi.resetAllMocks() + }) + + it('should run document and enqueue evaluation job on success', async () => { + const mockResult = { + response: Promise.resolve(), + documentLogUuid: 'log1', + } + // @ts-ignore + vi.mocked(runDocumentAtCommit).mockResolvedValue(Result.ok(mockResult)) + + await runDocumentJob.handler(mockJob) + + expect(runDocumentAtCommit).toHaveBeenCalledWith({ + workspaceId: 1, + document: { id: 'doc1' }, + commit: { id: 'commit1' }, + parameters: { param1: 'value1' }, + source: LogSources.Evaluation, + }) + + expect(queues.defaultQueue.addRunEvaluationJob).toHaveBeenCalledWith({ + documentLogUuid: 'log1', + evaluationId: 123, + batchId: 'batch1', + }) + + expect(ProgressTracker.prototype.incrementErrors).not.toHaveBeenCalled() + expect(ProgressTracker.prototype.decrementTotal).not.toHaveBeenCalled() + }) + + it('should handle errors and update progress tracker', async () => { + vi.mocked(runDocumentAtCommit).mockRejectedValue(new Error('Test error')) + vi.mocked(env).NODE_ENV = 'production' + + await runDocumentJob.handler(mockJob) + + expect(runDocumentAtCommit).toHaveBeenCalled() + expect(queues.defaultQueue.addRunEvaluationJob).not.toHaveBeenCalled() + + expect(ProgressTracker.prototype.incrementErrors).toHaveBeenCalled() + expect(ProgressTracker.prototype.decrementTotal).toHaveBeenCalled() + }) + + it('should log errors in non-production environment', async () => { + const testError = new Error('Test error') + vi.mocked(runDocumentAtCommit).mockRejectedValue(testError) + vi.mocked(env).NODE_ENV = 'development' + + const consoleSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + + await runDocumentJob.handler(mockJob) + + expect(consoleSpy).toHaveBeenCalledWith(testError) + expect(ProgressTracker.prototype.incrementErrors).toHaveBeenCalled() + expect(ProgressTracker.prototype.decrementTotal).toHaveBeenCalled() + }) +}) diff --git a/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.ts b/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.ts new file mode 100644 index 000000000..a3a37fe21 --- /dev/null +++ b/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.ts @@ -0,0 +1,59 @@ +import { + Commit, + DocumentVersion, + LogSources, +} from '@latitude-data/core/browser' +import { connection } from '@latitude-data/core/redis' +import { runDocumentAtCommit } from '@latitude-data/core/services/commits/runDocumentAtCommit' +import { env } from '@latitude-data/env' +import { Job } from 'bullmq' + +import { queues } from '../..' 
+import { Queues } from '../../constants' +import { ProgressTracker } from '../../utils/progressTracker' + +type RunDocumentJobData = { + workspaceId: number + document: DocumentVersion + commit: Commit + parameters: Record<string, unknown> + evaluationId: number + batchId: string +} + +export const runDocumentJob = { + name: 'runDocumentJob', + queue: Queues.defaultQueue, + handler: async (job: Job<RunDocumentJobData>) => { + const { workspaceId, document, commit, parameters, evaluationId, batchId } = + job.data + + const progressTracker = new ProgressTracker(connection, batchId) + + try { + const result = await runDocumentAtCommit({ + workspaceId, + document, + commit, + parameters, + source: LogSources.Evaluation, + }).then((r) => r.unwrap()) + + await result.response + + // Enqueue the evaluation job + await queues.defaultQueue.addRunEvaluationJob({ + documentLogUuid: result.documentLogUuid, + evaluationId, + batchId, + }) + } catch (error) { + if (env.NODE_ENV !== 'production') { + console.error(error) + } + + await progressTracker.incrementErrors() + await progressTracker.decrementTotal() + } + }, +} diff --git a/packages/jobs/src/job-definitions/batchEvaluations/runEvaluationJob.ts b/packages/jobs/src/job-definitions/batchEvaluations/runEvaluationJob.ts new file mode 100644 index 000000000..690d07c7e --- /dev/null +++ b/packages/jobs/src/job-definitions/batchEvaluations/runEvaluationJob.ts @@ -0,0 +1,45 @@ +import { connection } from '@latitude-data/core/redis' +import { env } from '@latitude-data/env' +import { Job } from 'bullmq' + +import { Queues } from '../../constants' +import { ProgressTracker } from '../../utils/progressTracker' + +type RunEvaluationJobData = { + documentLogUuid: string + evaluationId: number + batchId: string +} + +export const runEvaluationJob = { + name: 'runEvaluationJob', + queue: Queues.defaultQueue, + handler: async (job: Job<RunEvaluationJobData>) => { + const { batchId } = job.data + + const progressTracker = new ProgressTracker(connection, batchId) + + try { + // Mock implementation of evaluation logic + const mockEvaluationLogic = async () => { + // Simulate some processing time + await new Promise((resolve) => setTimeout(resolve, 2000)) + + // Randomly throw an error 5% of the time + if (Math.random() < 0.05) { + throw new Error('Random evaluation error') + } + } + + await mockEvaluationLogic() + await progressTracker.incrementCompleted() + } catch (error) { + if (env.NODE_ENV !== 'production') { + console.error('Error in runEvaluationJob:', error) + } + + await progressTracker.incrementErrors() + await progressTracker.decrementTotal() + } + }, +} diff --git a/packages/jobs/src/job-definitions/documentLogs/createJob.ts b/packages/jobs/src/job-definitions/documentLogs/createJob.ts index fa09166cc..6a8790095 100644 --- a/packages/jobs/src/job-definitions/documentLogs/createJob.ts +++ b/packages/jobs/src/job-definitions/documentLogs/createJob.ts @@ -4,10 +4,14 @@ import { } from '@latitude-data/core/services/documentLogs/create' import { Job } from 'bullmq' +import { Queues } from '../../constants' + export type CreateDocumentLogJobData = CreateDocumentLogProps -export const createDocumentLogJob = async ( - job: Job<CreateDocumentLogJobData>, -) => { - await createDocumentLog(job.data).then((r) => r.unwrap()) +export const createDocumentLogJob = { + name: 'createDocumentLogJob', + queue: Queues.defaultQueue, + handler: async (job: Job<CreateDocumentLogJobData>) => { + await createDocumentLog(job.data).then((r) => r.unwrap()) + }, } diff --git a/packages/jobs/src/job-definitions/events/createEventJob.ts
b/packages/jobs/src/job-definitions/events/createEventJob.ts index 0ee0d4966..f5b238b0f 100644 --- a/packages/jobs/src/job-definitions/events/createEventJob.ts +++ b/packages/jobs/src/job-definitions/events/createEventJob.ts @@ -2,8 +2,14 @@ import { LatitudeEvent } from '@latitude-data/core/events/handlers/index' import { createEvent } from '@latitude-data/core/services/events/create' import { Job } from 'bullmq' -export const createEventJob = async (job: Job<LatitudeEvent>) => { - const event = job.data +import { Queues } from '../../constants' - await createEvent(event) +export const createEventJob = { + name: 'createEventJob', + queue: Queues.eventsQueue, + handler: async (job: Job<LatitudeEvent>) => { + const event = job.data + + await createEvent(event) + }, } diff --git a/packages/jobs/src/job-definitions/events/publishEventJob.ts b/packages/jobs/src/job-definitions/events/publishEventJob.ts index 20dc5a5ed..92bea2bfc 100644 --- a/packages/jobs/src/job-definitions/events/publishEventJob.ts +++ b/packages/jobs/src/job-definitions/events/publishEventJob.ts @@ -6,26 +6,31 @@ import { env } from '@latitude-data/env' import { Job } from 'bullmq' import { buildConnection } from '../../connection' +import { Queues } from '../../constants' import { setupQueues } from '../../queues' let queues: ReturnType<typeof setupQueues> -export const publishEventJob = async (job: Job<LatitudeEvent>) => { - const event = job.data - const handlers = EventHandlers[event.type] - if (!handlers?.length) return +export const publishEventJob = { + name: 'publishEventJob', + queue: Queues.eventsQueue, + handler: async (job: Job<LatitudeEvent>) => { + const event = job.data + const handlers = EventHandlers[event.type] + if (!handlers?.length) return - handlers.forEach((handler) => { - if (!queues) { - const connection = buildConnection({ - host: env.REDIS_HOST, - port: env.REDIS_PORT, - password: env.REDIS_PASSWORD, - }) + handlers.forEach((handler) => { + if (!queues) { + const connection = buildConnection({ + host: env.REDIS_HOST, + port: env.REDIS_PORT, + password: env.REDIS_PASSWORD, + }) - queues = setupQueues({ connection }) - } + queues = setupQueues({ connection }) + } - queues.eventsQueue.queue.add(handler.name, event) - }) + queues.eventsQueue.queue.add(handler.name, event) + }) + }, } diff --git a/packages/jobs/src/job-definitions/index.ts b/packages/jobs/src/job-definitions/index.ts index b19cc900a..c8ca1e18e 100644 --- a/packages/jobs/src/job-definitions/index.ts +++ b/packages/jobs/src/job-definitions/index.ts @@ -1,26 +1,6 @@ -import { LatitudeEvent } from '@latitude-data/core/events/handlers/index' - -import { Jobs, Queues } from '../constants' -import { CreateDocumentLogJobData } from './documentLogs/createJob' - -export type JobDataMap = { - [Jobs.createDocumentLogJob]: CreateDocumentLogJobData - [Jobs.publishEventJob]: LatitudeEvent -} - -type JobData<J extends Jobs> = J extends keyof JobDataMap - ?
JobDataMap[J] - : never - -type JobSpec<J extends Jobs = Jobs> = { - name: J - data: JobData<J> -} - -export type JobDefinition = { - [K in Queues]: { - [K in Jobs]: JobSpec<K> - } -} - +export * from './batchEvaluations/runBatchEvaluationJob' +export * from './batchEvaluations/runDocumentJob' +export * from './batchEvaluations/runEvaluationJob' export * from './documentLogs/createJob' +export * from './events/createEventJob' +export * from './events/publishEventJob' diff --git a/packages/jobs/src/queues/index.ts b/packages/jobs/src/queues/index.ts index b3823adfb..598b02e79 100644 --- a/packages/jobs/src/queues/index.ts +++ b/packages/jobs/src/queues/index.ts @@ -1,26 +1,15 @@ -import { EventHandlers } from '@latitude-data/core/events/handlers/index' -import { Job, JobsOptions, Queue, QueueEvents } from 'bullmq' +import { JobsOptions, Queue } from 'bullmq' import { Redis } from 'ioredis' -import { Jobs, Queues } from '../constants' -import { createDocumentLogJob, JobDefinition } from '../job-definitions' +import { Queues } from '../constants' +import { createDocumentLogJob } from '../job-definitions' +import { runBatchEvaluationJob } from '../job-definitions/batchEvaluations/runBatchEvaluationJob' +import { runDocumentJob } from '../job-definitions/batchEvaluations/runDocumentJob' +import { runEvaluationJob } from '../job-definitions/batchEvaluations/runEvaluationJob' import { createEventJob } from '../job-definitions/events/createEventJob' import { publishEventJob } from '../job-definitions/events/publishEventJob' -export function capitalize(string: string) { - return string.charAt(0).toUpperCase() + string.slice(1) -} - -type EnqueueFunctionName<T extends string> = `enqueue${Capitalize<T>}` - -type JobEnqueueFn = { - [P in EnqueueFunctionName<Jobs>]: ( - params: JobDefinition[Queues][Jobs]['data'], - options?: JobsOptions, - ) => Promise<Job> -} - -const attempts = process.env.NODE_ENV === 'production' ? 100 : 3 +const attempts = process.env.NODE_ENV === 'production' ?
10 : 3 export const DEFAULT_JOB_OPTIONS: JobsOptions = { attempts, @@ -30,65 +19,55 @@ } -function setupQueue({ - name, - connection, - jobs, -}: { - name: Queues - connection: Redis - jobs: readonly QueueJob[] -}) { - const queue = new Queue(name, { +export function setupQueues({ connection }: { connection: Redis }) { + const defaultQueue = new Queue(Queues.defaultQueue, { + connection, + defaultJobOptions: DEFAULT_JOB_OPTIONS, + }) + const eventsQueue = new Queue(Queues.eventsQueue, { + connection, + defaultJobOptions: DEFAULT_JOB_OPTIONS, + }) + const eventHandlersQueue = new Queue(Queues.eventHandlersQueue, { connection, defaultJobOptions: DEFAULT_JOB_OPTIONS, }) - const jobz = jobs.reduce((acc, job) => { - const key = `enqueue${capitalize(job.name)}` as EnqueueFunctionName< - typeof job.name - > - const enqueueFn = ( - params: JobDefinition[typeof name][Jobs]['data'], - options: JobsOptions, - ) => queue.add(job.name, params, options) - - return { ...acc, [key]: enqueueFn } - }, {} as JobEnqueueFn) return { - queue, - events: new QueueEvents(name, { connection }), - jobs: jobz, - } -} - -export const QUEUES = { - [Queues.defaultQueue]: { - name: Queues.defaultQueue, - jobs: [createDocumentLogJob], - }, - [Queues.eventsQueue]: { - name: Queues.eventsQueue, - jobs: [publishEventJob, createEventJob], - }, - [Queues.eventHandlersQueue]: { - name: Queues.eventHandlersQueue, - jobs: Object.values(EventHandlers).flat(), - }, -} as const - -type QueueJob = (typeof QUEUES)[keyof typeof QUEUES]['jobs'][number] - -export function setupQueues({ connection }: { connection: Redis }) { - return Object.entries(QUEUES).reduce<{ - [K in keyof typeof QUEUES]: ReturnType<typeof setupQueue> - }>( - (acc, [name, { jobs }]) => { - return { - ...acc, - [name]: setupQueue({ name: name as Queues, connection, jobs }), - } + [Queues.defaultQueue]: { + queue: defaultQueue, + addCreateDocumentLogJob: ( + data: Parameters<typeof createDocumentLogJob.handler>[0]['data'], + options?: JobsOptions, + ) => defaultQueue.add(createDocumentLogJob.name, data, options), + addRunDocumentJob: ( + data: Parameters<typeof runDocumentJob.handler>[0]['data'], + options?: JobsOptions, + ) => defaultQueue.add(runDocumentJob.name, data, options), + addRunEvaluationJob: ( + data: Parameters<typeof runEvaluationJob.handler>[0]['data'], + options?: JobsOptions, + ) => defaultQueue.add(runEvaluationJob.name, data, options), + addRunBatchEvaluationJob: ( + data: Parameters<typeof runBatchEvaluationJob.handler>[0]['data'], + options?: JobsOptions, + ) => defaultQueue.add(runBatchEvaluationJob.name, data, options), }, - {} as { [K in keyof typeof QUEUES]: ReturnType<typeof setupQueue> }, - ) + [Queues.eventsQueue]: { + queue: eventsQueue, + addPublishEventJob: ( + data: Parameters<typeof publishEventJob.handler>[0]['data'], + options?: JobsOptions, + ) => eventsQueue.add(publishEventJob.name, data, options), + addCreateEventJob: ( + data: Parameters<typeof createEventJob.handler>[0]['data'], + options?: JobsOptions, + ) => eventsQueue.add(createEventJob.name, data, options), + }, + [Queues.eventHandlersQueue]: { + queue: eventHandlersQueue, + addJob: (name: string, data: any, options?: JobsOptions) => + eventHandlersQueue.add(name, data, options), + }, + } } diff --git a/packages/jobs/src/utils/connection.ts b/packages/jobs/src/utils/connection.ts new file mode 100644 index 000000000..71e69ef8b --- /dev/null +++ b/packages/jobs/src/utils/connection.ts @@ -0,0 +1,9 @@ +import { env } from '@latitude-data/env' + +import { buildConnection } from '../connection' + +export const connection = buildConnection({ + host: env.REDIS_HOST, + port: env.REDIS_PORT, + password: env.REDIS_PASSWORD, +}) diff --git
a/packages/jobs/src/utils/progressTracker.ts b/packages/jobs/src/utils/progressTracker.ts new file mode 100644 index 000000000..d87961049 --- /dev/null +++ b/packages/jobs/src/utils/progressTracker.ts @@ -0,0 +1,51 @@ +import { Redis } from 'ioredis' + +export class ProgressTracker { + constructor( + private redis: Redis, + private batchId: string, + ) {} + + private getKey(suffix: string) { + return `batch:${this.batchId}:${suffix}` + } + + async initializeProgress(total: number) { + const multi = this.redis.multi() + multi.set(this.getKey('total'), total) + multi.set(this.getKey('completed'), 0) + multi.set(this.getKey('errors'), 0) + await multi.exec() + } + + async incrementCompleted() { + await this.redis.incr(this.getKey('completed')) + } + + async incrementErrors() { + await this.redis.incr(this.getKey('errors')) + } + + async decrementTotal() { + await this.redis.decr(this.getKey('total')) + } + + async incrementEnqueued() { + await this.redis.incr(this.getKey('enqueued')) + } + + async getProgress() { + const [total, completed, errors, enqueued] = await this.redis.mget([ + this.getKey('total'), + this.getKey('completed'), + this.getKey('errors'), + this.getKey('enqueued'), + ]) + return { + total: parseInt(total || '0', 10), + completed: parseInt(completed || '0', 10), + errors: parseInt(errors || '0', 10), + enqueued: parseInt(enqueued || '0', 10), + } + } +} diff --git a/packages/jobs/src/workers/_shared.ts b/packages/jobs/src/workers/_shared.ts index 7811bcf4a..87bf55cd1 100644 --- a/packages/jobs/src/workers/_shared.ts +++ b/packages/jobs/src/workers/_shared.ts @@ -1,20 +1,14 @@ -import { Processor } from 'bullmq' +import { Job, Processor } from 'bullmq' import { Queues } from '../constants' -import { QUEUES } from '../queues' +import * as jobDefinitions from '../job-definitions' export const buildProcessor = - (queues: Queues[]): Processor => - async (job) => { - await Promise.all( - queues.map(async (q) => { - await Promise.all( - QUEUES[q].jobs.map(async (j) => { - if (j.name === job.name) { - await j(job) - } - }), - ) - }), - ) + (subscribedQueues: Queues[]): Processor => + async (job: Job) => { + const jobHandler = (jobDefinitions as Record<string, any>)[job.name] + if (!jobHandler) return + if (!subscribedQueues.includes(jobHandler.queue)) return + + await jobHandler.handler(job) } diff --git a/packages/jobs/src/workers/index.ts b/packages/jobs/src/workers/index.ts index 666801f0e..ee7b70537 100644 --- a/packages/jobs/src/workers/index.ts +++ b/packages/jobs/src/workers/index.ts @@ -1,6 +1,6 @@ import { Worker } from 'bullmq' -import { Redis } from 'ioredis' +import { connection } from '../utils/connection' import { defaultWorker } from './worker-definitions/defaultWorker' const WORKER_OPTS = { @@ -13,13 +13,13 @@ } const WORKERS = [defaultWorker] -export default function startWorkers({ connection }: { connection: Redis }) { +export default function startWorkers() { return WORKERS.flatMap((w) => w.queues.map( (q) => new Worker(q, w.processor, { ...WORKER_OPTS, - connection, + connection: connection, }), ), ) diff --git a/packages/sdks/typescript/rollup.config.mjs b/packages/sdks/typescript/rollup.config.mjs index 046e58465..a1e678f13 100644 --- a/packages/sdks/typescript/rollup.config.mjs +++ b/packages/sdks/typescript/rollup.config.mjs @@ -13,9 +13,13 @@ const aliasEntries = { find: '@latitude-data/core/browser', replacement: path.resolve(__dirname, '../../core/src/browser'), }, + { + find: '@latitude-data/core/lib/Result', + replacement:
path.resolve(__dirname, '../../core/src/lib/Result'), + }, ], } -const EXTERNALS = ['@t3-oss/env-core', 'zod'] +const EXTERNALS = ['@t3-oss/env-core', 'zod', 'flydrive/types', 'stream'] const config = [ { input: 'src/index.ts', diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 36dcaee71..85c710026 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -311,7 +311,7 @@ importers: version: 4.6.0(monaco-editor@0.50.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522) '@sentry/nextjs': specifier: ^8 - version: 8.29.0(@opentelemetry/api@1.9.0)(@opentelemetry/core@1.26.0(@opentelemetry/api@1.9.0))(@opentelemetry/instrumentation@0.53.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@1.26.0(@opentelemetry/api@1.9.0))(encoding@0.1.13)(next@14.3.0-canary.87(@babel/core@7.25.2)(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522)(webpack@5.94.0(esbuild@0.19.12)) + version: 8.29.0(@opentelemetry/api@1.9.0)(@opentelemetry/core@1.26.0(@opentelemetry/api@1.9.0))(@opentelemetry/instrumentation@0.53.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@1.26.0(@opentelemetry/api@1.9.0))(encoding@0.1.13)(next@14.3.0-canary.87(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522)(webpack@5.94.0) '@t3-oss/env-nextjs': specifier: ^0.10.1 version: 0.10.1(typescript@5.5.4)(zod@3.23.8) @@ -339,15 +339,18 @@ importers: monaco-editor: specifier: ^0.50.0 version: 0.50.0 + nanoid: + specifier: ^5.0.7 + version: 5.0.7 next: specifier: ^14.3.0-canary.87 - version: 14.3.0-canary.87(@babel/core@7.25.2)(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522) + version: 14.3.0-canary.87(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522) next-themes: specifier: ^0.3.0 version: 0.3.0(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522) nextjs-toploader: specifier: ^1.6.12 - version: 1.6.12(next@14.3.0-canary.87(@babel/core@7.25.2)(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522))(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522) + version: 1.6.12(next@14.3.0-canary.87(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522))(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522) nprogress: specifier: ^0.2.0 version: 0.2.0 @@ -647,13 +650,13 @@ importers: packages/jobs: dependencies: - '@latitude-data/core': - specifier: workspace:^ - version: link:../core '@latitude-data/env': specifier: workspace:* version: link:../env devDependencies: + '@latitude-data/core': + specifier: workspace:^ + version: link:../core '@latitude-data/eslint-config': specifier: workspace:* version: link:../../tools/eslint @@ -669,6 +672,9 @@ importers: ioredis: specifier: ^5.4.1 version: 5.4.1 + vitest: + specifier: ^2.0.5 + version: 2.0.5(@types/node@22.5.4)(jsdom@24.1.3)(terser@5.32.0) zod: specifier: 
^3.23.8 version: 3.23.8 @@ -2277,6 +2283,9 @@ packages: peerDependencies: drizzle-orm: '>= 0.29 <1' lucia: 3.x + peerDependenciesMeta: + drizzle-orm: + optional: true '@lukeed/ms@2.0.2': resolution: {integrity: sha512-9I2Zn6+NJLfaGoz9jN3lpwDgAYvfGeNYdbAIjJOqzs4Tpc+VU3Jqq4IofSUBKajiDS8k9fZIg18/z13mpk1bsA==} @@ -10461,8 +10470,9 @@ snapshots: '@lucia-auth/adapter-drizzle@1.1.0(drizzle-orm@0.33.0(@opentelemetry/api@1.9.0)(@types/pg@8.11.8)(@types/react@18.3.0)(pg@8.12.0)(react@19.0.0-rc-f994737d14-20240522))(lucia@3.2.0)': dependencies: - drizzle-orm: 0.33.0(@opentelemetry/api@1.9.0)(@types/pg@8.11.8)(@types/react@18.3.0)(pg@8.12.0)(react@19.0.0-rc-f994737d14-20240522) lucia: 3.2.0 + optionalDependencies: + drizzle-orm: 0.33.0(@opentelemetry/api@1.9.0)(@types/pg@8.11.8)(@types/react@18.3.0)(pg@8.12.0)(react@19.0.0-rc-f994737d14-20240522) '@lukeed/ms@2.0.2': {} @@ -12374,7 +12384,7 @@ snapshots: '@sentry/types': 8.29.0 '@sentry/utils': 8.29.0 - '@sentry/nextjs@8.29.0(@opentelemetry/api@1.9.0)(@opentelemetry/core@1.26.0(@opentelemetry/api@1.9.0))(@opentelemetry/instrumentation@0.53.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@1.26.0(@opentelemetry/api@1.9.0))(encoding@0.1.13)(next@14.3.0-canary.87(@babel/core@7.25.2)(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522)(webpack@5.94.0(esbuild@0.19.12))': + '@sentry/nextjs@8.29.0(@opentelemetry/api@1.9.0)(@opentelemetry/core@1.26.0(@opentelemetry/api@1.9.0))(@opentelemetry/instrumentation@0.53.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@1.26.0(@opentelemetry/api@1.9.0))(encoding@0.1.13)(next@14.3.0-canary.87(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522)(webpack@5.94.0)': dependencies: '@opentelemetry/instrumentation-http': 0.53.0(@opentelemetry/api@1.9.0) '@opentelemetry/semantic-conventions': 1.27.0 @@ -12386,14 +12396,14 @@ snapshots: '@sentry/types': 8.29.0 '@sentry/utils': 8.29.0 '@sentry/vercel-edge': 8.29.0 - '@sentry/webpack-plugin': 2.22.3(encoding@0.1.13)(webpack@5.94.0(esbuild@0.19.12)) + '@sentry/webpack-plugin': 2.22.3(encoding@0.1.13)(webpack@5.94.0) chalk: 3.0.0 - next: 14.3.0-canary.87(@babel/core@7.25.2)(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522) + next: 14.3.0-canary.87(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522) resolve: 1.22.8 rollup: 3.29.4 stacktrace-parser: 0.1.10 optionalDependencies: - webpack: 5.94.0(esbuild@0.19.12) + webpack: 5.94.0 transitivePeerDependencies: - '@opentelemetry/api' - '@opentelemetry/core' @@ -12472,12 +12482,12 @@ snapshots: '@sentry/types': 8.29.0 '@sentry/utils': 8.29.0 - '@sentry/webpack-plugin@2.22.3(encoding@0.1.13)(webpack@5.94.0(esbuild@0.19.12))': + '@sentry/webpack-plugin@2.22.3(encoding@0.1.13)(webpack@5.94.0)': dependencies: '@sentry/bundler-plugin-core': 2.22.3(encoding@0.1.13) unplugin: 1.0.1 uuid: 9.0.1 - webpack: 5.94.0(esbuild@0.19.12) + webpack: 5.94.0 transitivePeerDependencies: - encoding - supports-color @@ -16136,7 +16146,7 @@ snapshots: - '@babel/core' - babel-plugin-macros - 
next@14.3.0-canary.87(@babel/core@7.25.2)(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522): + next@14.3.0-canary.87(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522): dependencies: '@next/env': 14.3.0-canary.87 '@swc/helpers': 0.5.11 @@ -16146,7 +16156,7 @@ snapshots: postcss: 8.4.31 react: 19.0.0-rc-f994737d14-20240522 react-dom: 19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522) - styled-jsx: 5.1.6(@babel/core@7.25.2)(react@19.0.0-rc-f994737d14-20240522) + styled-jsx: 5.1.6(react@19.0.0-rc-f994737d14-20240522) optionalDependencies: '@next/swc-darwin-arm64': 14.3.0-canary.87 '@next/swc-darwin-x64': 14.3.0-canary.87 @@ -16163,9 +16173,9 @@ snapshots: - '@babel/core' - babel-plugin-macros - nextjs-toploader@1.6.12(next@14.3.0-canary.87(@babel/core@7.25.2)(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522))(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522): + nextjs-toploader@1.6.12(next@14.3.0-canary.87(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522))(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522): dependencies: - next: 14.3.0-canary.87(@babel/core@7.25.2)(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522) + next: 14.3.0-canary.87(@opentelemetry/api@1.9.0)(react-dom@19.0.0-rc-f994737d14-20240522(react@19.0.0-rc-f994737d14-20240522))(react@19.0.0-rc-f994737d14-20240522) nprogress: 0.2.0 prop-types: 15.8.1 react: 19.0.0-rc-f994737d14-20240522 @@ -17540,12 +17550,10 @@ snapshots: optionalDependencies: '@babel/core': 7.24.5 - styled-jsx@5.1.6(@babel/core@7.25.2)(react@19.0.0-rc-f994737d14-20240522): + styled-jsx@5.1.6(react@19.0.0-rc-f994737d14-20240522): dependencies: client-only: 0.0.1 react: 19.0.0-rc-f994737d14-20240522 - optionalDependencies: - '@babel/core': 7.25.2 sucrase@3.35.0: dependencies: @@ -17705,16 +17713,14 @@ snapshots: mkdirp: 1.0.4 yallist: 4.0.0 - terser-webpack-plugin@5.3.10(esbuild@0.19.12)(webpack@5.94.0(esbuild@0.19.12)): + terser-webpack-plugin@5.3.10(webpack@5.94.0): dependencies: '@jridgewell/trace-mapping': 0.3.25 jest-worker: 27.5.1 schema-utils: 3.3.0 serialize-javascript: 6.0.2 terser: 5.32.0 - webpack: 5.94.0(esbuild@0.19.12) - optionalDependencies: - esbuild: 0.19.12 + webpack: 5.94.0 terser@5.32.0: dependencies: @@ -18369,7 +18375,7 @@ snapshots: webpack-virtual-modules@0.5.0: {} - webpack@5.94.0(esbuild@0.19.12): + webpack@5.94.0: dependencies: '@types/estree': 1.0.5 '@webassemblyjs/ast': 1.12.1 @@ -18391,7 +18397,7 @@ snapshots: neo-async: 2.6.2 schema-utils: 3.3.0 tapable: 2.2.1 - terser-webpack-plugin: 5.3.10(esbuild@0.19.12)(webpack@5.94.0(esbuild@0.19.12)) + terser-webpack-plugin: 5.3.10(webpack@5.94.0) watchpack: 2.4.2 webpack-sources: 3.2.3 transitivePeerDependencies:
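Review notes (sketches, not part of the diff):

1. Every job definition is now a plain object with literal name, queue, and handler fields, and buildProcessor in workers/_shared.ts routes by job.name through the namespace import of job-definitions. Dispatch no longer depends on runtime function names, so dropping the serverMinification: false workaround from apps/web/next.config.mjs is safe. A minimal sketch of the shape a new definition has to follow (exampleJob and its payload are hypothetical):

import { Job } from 'bullmq'

import { Queues } from '../constants'

// Names must stay unique across everything exported from job-definitions/index.ts,
// because the worker looks handlers up by job.name alone.
export const exampleJob = {
  name: 'exampleJob',
  queue: Queues.defaultQueue,
  handler: async (job: Job<{ documentId: number }>) => {
    console.log('processing document', job.data.documentId)
  },
}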
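2. ProgressTracker keeps four counters per batch under batch:<batchId>:{total,completed,errors,enqueued}. Note that initializeProgress seeds only total, completed, and errors; the enqueued counter still works because Redis INCR treats a missing key as 0, and getProgress parses a null mget result as '0'. A usage sketch, assuming a local Redis on the default port:

import { Redis } from 'ioredis'

import { ProgressTracker } from './utils/progressTracker'

const redis = new Redis() // assumes localhost:6379
const tracker = new ProgressTracker(redis, 'batch-123')

await tracker.initializeProgress(10) // batch:batch-123:total = 10, completed = 0, errors = 0
await tracker.incrementEnqueued() // INCR on the missing enqueued key yields 1
await tracker.incrementCompleted()

console.log(await tracker.getProgress()) // { total: 10, completed: 1, errors: 0, enqueued: 1 }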
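3. In runBatchEvaluationJob, offset and runCount translate directly into the previewDataset window (fromLine: offset, toLine: offset + runCount), and a retry resumes from the persisted enqueued counter instead of re-enqueueing from row 0. Worked numbers (hypothetical):

// offset = 10, runCount = 5: the job previews lines 10..15 of the CSV.
const offset = 10
const runCount = 5
const window = { fromLine: offset, toLine: offset + runCount } // { fromLine: 10, toLine: 15 }

// If the first attempt crashed after enqueueing 3 rows, attempt #2 reads
// enqueued = 3 from the ProgressTracker, so rows 0..2 are not enqueued twice.
const enqueued = 3
for (let i = enqueued; i < runCount; i++) {
  // addRunDocumentJob(parameters[i]) would run here for i = 3 and i = 4
}
console.log(window)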