Skip to content

Commit

Permalink
feature: run evaluations in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
geclos committed Sep 10, 2024
1 parent c3dcfb9 commit 68e20e4
Show file tree
Hide file tree
Showing 33 changed files with 985 additions and 137 deletions.
9 changes: 1 addition & 8 deletions apps/gateway/src/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
import { setupJobs } from '@latitude-data/jobs'
import env from '$/common/env'

export const { queues } = setupJobs({
connectionParams: {
host: env.REDIS_HOST,
port: env.REDIS_PORT,
password: env.REDIS_PASSWORD,
},
})
export const queues = setupJobs()
1 change: 1 addition & 0 deletions apps/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"lodash-es": "^4.17.21",
"lucia": "^3.2.0",
"monaco-editor": "^0.50.0",
"nanoid": "^5.0.7",
"next": "^14.3.0-canary.87",
"next-themes": "^0.3.0",
"nextjs-toploader": "^1.6.12",
Expand Down
2 changes: 0 additions & 2 deletions apps/web/src/actions/datasets/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import { DatasetsRepository } from '@latitude-data/core/repositories'
import { createDataset } from '@latitude-data/core/services/datasets/create'
import disk from '$/lib/disk'
import { z } from 'zod'

import { authProcedure } from '../procedures'
Expand Down Expand Up @@ -78,7 +77,6 @@ export const createDatasetAction = authProcedure
return createDataset({
workspace: ctx.workspace,
author: ctx.user,
disk: disk,
data: {
name: input.name,
file: input.dataset_file,
Expand Down
3 changes: 1 addition & 2 deletions apps/web/src/actions/datasets/destroy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import { DatasetsRepository } from '@latitude-data/core/repositories'
import { destroyDataset } from '@latitude-data/core/services/datasets/destroy'
import disk from '$/lib/disk'
import { z } from 'zod'

import { authProcedure } from '../procedures'
Expand All @@ -19,5 +18,5 @@ export const destroyDatasetAction = authProcedure
const repo = new DatasetsRepository(ctx.workspace.id)
const dataset = await repo.find(id).then((r) => r.unwrap())

return await destroyDataset({ dataset, disk }).then((r) => r.unwrap())
return await destroyDataset({ dataset }).then((r) => r.unwrap())
})
3 changes: 1 addition & 2 deletions apps/web/src/actions/datasets/preview.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import { DatasetsRepository } from '@latitude-data/core/repositories'
import { previewDataset } from '@latitude-data/core/services/datasets/preview'
import disk from '$/lib/disk'
import { z } from 'zod'

import { authProcedure } from '../procedures'
Expand All @@ -17,5 +16,5 @@ export const previewDatasetAction = authProcedure
.handler(async ({ ctx, input }) => {
const repo = new DatasetsRepository(ctx.workspace.id)
const dataset = await repo.find(input.id).then((r) => r.unwrap())
return await previewDataset({ dataset, disk }).then((r) => r.unwrap())
return await previewDataset({ dataset }).then((r) => r.unwrap())
})
181 changes: 181 additions & 0 deletions apps/web/src/actions/evaluations/runBatch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import {
Commit,
Dataset,
DocumentVersion,
EvaluationDto,
Project,
ProviderApiKey,
Providers,
User,
Workspace,
} from '@latitude-data/core/browser'
import * as factories from '@latitude-data/core/factories'
import { beforeEach, describe, expect, it, vi } from 'vitest'

import { runBatchAction } from './runBatch'

const mocks = vi.hoisted(() => ({
getSession: vi.fn(),
queues: {
defaultQueue: {
jobs: {
enqueueRunBatchEvaluationJob: vi.fn(),
},
},
eventsQueue: {
jobs: {
enqueueCreateEventJob: vi.fn(),
enqueuePublishEventJob: vi.fn(),
},
},
},
}))

vi.mock('$/services/auth/getSession', () => ({
getSession: mocks.getSession,
}))

vi.mock('@latitude-data/jobs', () => ({
setupJobs: vi.fn().mockImplementation(() => mocks.queues),
}))

describe('runBatchAction', () => {
describe('unauthorized', () => {
it('errors when the user is not authenticated', async () => {
const [_, error] = await runBatchAction({
datasetId: 1,
projectId: 1,
documentUuid: 'doc-uuid',
commitUuid: 'commit-uuid',
runCount: 5,
evaluationId: 1,
})

expect(error!.name).toEqual('UnauthorizedError')
})
})

describe('authorized', () => {
let workspace: Workspace,
user: User,
project: Project,
document: DocumentVersion,
commit: Commit,
dataset: Dataset,
evaluation: EvaluationDto,
provider: ProviderApiKey

beforeEach(async () => {
const setup = await factories.createProject({
documents: { 'test-doc': 'Test content' },
})
workspace = setup.workspace
user = setup.user
project = setup.project
document = setup.documents[0]!
commit = setup.commit

provider = await factories.createProviderApiKey({
workspace,
type: Providers.OpenAI,
name: 'Test Provider',
user,
})

dataset = await factories
.createDataset({
name: 'Test Dataset',
workspace,
author: user,
})
.then((result) => result.dataset)

evaluation = await factories.createEvaluation({
provider,
name: 'Test Evaluation',
})

mocks.getSession.mockReturnValue({
user,
workspace: { id: workspace.id, name: workspace.name },
})
})

it('successfully enqueues a batch evaluation job', async () => {
const [result, error] = await runBatchAction({
datasetId: dataset.id,
projectId: project.id,
documentUuid: document.documentUuid,
commitUuid: commit.uuid,
fromLine: 0,
toLine: 5,
evaluationId: evaluation.id,
})

expect(error).toBeNull()
expect(result).toEqual({
success: true,
batchId: expect.any(String),
})

expect(
mocks.queues.defaultQueue.jobs.enqueueRunBatchEvaluationJob,
).toHaveBeenCalledWith(
expect.objectContaining({
evaluation: expect.objectContaining({ id: evaluation.id }),
dataset: expect.objectContaining({ id: dataset.id }),
document: expect.objectContaining({
documentUuid: document.documentUuid,
}),
fromLine: 0,
toLine: 5,
parametersMap: undefined,
batchId: expect.any(String),
}),
)
})

it('handles optional parameters', async () => {
const [result, error] = await runBatchAction({
datasetId: dataset.id,
projectId: project.id,
documentUuid: document.documentUuid,
commitUuid: commit.uuid,
fromLine: 10,
toLine: 20,
evaluationId: evaluation.id,
parameters: { 1: 100, 2: 200 },
})

expect(error).toBeNull()
expect(result).toEqual({
success: true,
batchId: expect.any(String),
})

expect(
mocks.queues.defaultQueue.jobs.enqueueRunBatchEvaluationJob,
).toHaveBeenCalledWith(
expect.objectContaining({
fromLine: 10,
toLine: 20,
parametersMap: { 1: 100, 2: 200 },
}),
)
})

it('handles errors when resources are not found', async () => {
const [_, error] = await runBatchAction({
datasetId: 999999,
projectId: project.id,
documentUuid: document.documentUuid,
commitUuid: commit.uuid,
runCount: 5,
evaluationId: evaluation.id,
})

expect(error).not.toBeNull()
expect(error!.message).toContain('not found')
})
})
})
60 changes: 60 additions & 0 deletions apps/web/src/actions/evaluations/runBatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import {
DatasetsRepository,
DocumentVersionsRepository,
EvaluationsRepository,
} from '@latitude-data/core/repositories'
import { setupJobs } from '@latitude-data/jobs'
import { nanoid } from 'nanoid'
import { z } from 'zod'

import { authProcedure } from '../procedures'

export const runBatchAction = authProcedure
.createServerAction()
.input(
z.object({
datasetId: z.number(),
projectId: z.number(),
documentUuid: z.string(),
commitUuid: z.string(),
fromLine: z.number(),
toLine: z.number(),
parameters: z.record(z.number()).optional(),
evaluationId: z.number(),
}),
)
.handler(async ({ input, ctx }) => {
const evaluationsRepo = new EvaluationsRepository(ctx.workspace.id)
const evaluation = await evaluationsRepo
.find(input.evaluationId)
.then((r) => r.unwrap())

const datasetsRepo = new DatasetsRepository(ctx.workspace.id)
const dataset = await datasetsRepo
.find(input.datasetId)
.then((r) => r.unwrap())

const docsRepo = new DocumentVersionsRepository(ctx.workspace.id)
const document = await docsRepo
.getDocumentAtCommit({
projectId: input.projectId,
commitUuid: input.commitUuid,
documentUuid: input.documentUuid,
})
.then((r) => r.unwrap())

const batchId = nanoid()
const queues = setupJobs()

await queues.defaultQueue.jobs.enqueueRunBatchEvaluationJob({
evaluation,
dataset,
document,
fromLine: input.fromLine,
toLine: input.toLine,
parametersMap: input.parameters,
batchId,
})

return { success: true, batchId }
})
9 changes: 1 addition & 8 deletions apps/web/src/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
import { env } from '@latitude-data/env'
import { setupJobs } from '@latitude-data/jobs'

export const { queues } = setupJobs({
connectionParams: {
host: env.REDIS_HOST,
port: env.REDIS_PORT,
password: env.REDIS_PASSWORD,
},
})
export const queues = setupJobs()
14 changes: 0 additions & 14 deletions apps/web/src/lib/disk.ts

This file was deleted.

9 changes: 1 addition & 8 deletions apps/workers/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
import { setupWorkers } from '@latitude-data/jobs'
import env from '$/env'

const workers = setupWorkers({
connectionParams: {
host: env.REDIS_HOST,
port: env.REDIS_PORT,
password: env.REDIS_PASSWORD,
},
})
const workers = setupWorkers()

console.log('Workers started')

Expand Down
17 changes: 3 additions & 14 deletions packages/core/src/events/publisher.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { env } from '@latitude-data/env'
import { setupJobs } from '@latitude-data/jobs'

import { EventHandler, EventHandlers, LatitudeEvent } from './handlers'

let jobs: ReturnType<typeof setupJobs>

export const publisher = {
publish: async (event: LatitudeEvent) => {
const handlers = EventHandlers[event.type] as EventHandler<typeof event>[]
Expand All @@ -15,17 +12,9 @@ export const publisher = {
)
},
publishLater: (event: LatitudeEvent) => {
if (!jobs) {
jobs = setupJobs({
connectionParams: {
host: env.REDIS_HOST,
port: env.REDIS_PORT,
password: env.REDIS_PASSWORD,
},
})
}
const queues = setupJobs()

jobs.queues.eventsQueue.jobs.enqueueCreateEventJob(event)
jobs.queues.eventsQueue.jobs.enqueuePublishEventJob(event)
queues.eventsQueue.jobs.enqueueCreateEventJob(event)
queues.eventsQueue.jobs.enqueuePublishEventJob(event)
},
}
Loading

0 comments on commit 68e20e4

Please sign in to comment.