Skip to content

Commit

Permalink
feature: Run batch evaluation job
Browse files Browse the repository at this point in the history
This commit implements the flow to execute a batch evaluation up until
the point of actually running the evaluation.
  • Loading branch information
geclos committed Sep 10, 2024
1 parent d2d7c28 commit c9b184f
Show file tree
Hide file tree
Showing 39 changed files with 1,059 additions and 273 deletions.
10 changes: 0 additions & 10 deletions apps/gateway/src/jobs/index.ts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ const mocks = vi.hoisted(() => ({
runDocumentAtCommit: vi.fn(),
queues: {
defaultQueue: {
jobs: {
enqueueCreateProviderLogJob: vi.fn(),
enqueueCreateDocumentLogJob: vi.fn(),
},
addCreateProviderLogJob: vi.fn(),
addCreateDocumentLogJob: vi.fn(),
},
eventsQueue: {
addCreateEventJob: vi.fn(),
addPublishEventJob: vi.fn(),
},
},
}))
Expand All @@ -44,7 +46,7 @@ vi.mock(
},
)

vi.mock('$/jobs', () => ({
vi.mock('@latitude-data/jobs', () => ({
queues: mocks.queues,
}))

Expand Down Expand Up @@ -212,7 +214,7 @@ describe('POST /run', () => {
await testConsumeStream(res.body as ReadableStream)

expect(
mocks.queues.defaultQueue.jobs.enqueueCreateDocumentLogJob,
mocks.queues.defaultQueue.addCreateDocumentLogJob,
).toHaveBeenCalledWith({
commit,
data: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { zValidator } from '@hono/zod-validator'
import { LogSources } from '@latitude-data/core/browser'
import { runDocumentAtCommit } from '@latitude-data/core/services/commits/runDocumentAtCommit'
import { queues } from '@latitude-data/jobs'
import { pipeToStream } from '$/common/pipeToStream'
import { queues } from '$/jobs'
import { Factory } from 'hono/factory'
import { streamSSE } from 'hono/streaming'
import { z } from 'zod'
Expand Down Expand Up @@ -45,7 +45,8 @@ export const runHandler = factory.createHandlers(

await pipeToStream(stream, result.stream)

queues.defaultQueue.jobs.enqueueCreateDocumentLogJob({
// TODO: move to events
await queues.defaultQueue.addCreateDocumentLogJob({
commit,
data: {
uuid: result.documentLogUuid,
Expand Down
6 changes: 0 additions & 6 deletions apps/web/next.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ const nextConfig = {
output: 'standalone',
transpilePackages: INTERNAL_PACKAGES,
experimental: {
// Dear developer,
//
// Unfortunately, our jobs packages uses some meta programming that relies
// on the name of job handler functions for things to work properly. As you
// can imagine, minification would break this. So we have to disable it.
serverMinification: false,
// TODO: Review this decision. It would be more performant to use
// direct uploads. To implement it we need to generate a signed URL
// that's send directly to S3 and the clint upload the file to Amazon directly
Expand Down
1 change: 1 addition & 0 deletions apps/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"lodash-es": "^4.17.21",
"lucia": "^3.2.0",
"monaco-editor": "^0.50.0",
"nanoid": "^5.0.7",
"next": "^14.3.0-canary.87",
"next-themes": "^0.3.0",
"nextjs-toploader": "^1.6.12",
Expand Down
2 changes: 1 addition & 1 deletion apps/web/src/actions/datasets/create.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
'use server'

import { disk } from '@latitude-data/core/lib/disk'
import { DatasetsRepository } from '@latitude-data/core/repositories'
import { createDataset } from '@latitude-data/core/services/datasets/create'
import disk from '$/lib/disk'
import { z } from 'zod'

import { authProcedure } from '../procedures'
Expand Down
4 changes: 2 additions & 2 deletions apps/web/src/actions/datasets/destroy.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
'use server'

import { disk } from '@latitude-data/core/lib/disk'
import { DatasetsRepository } from '@latitude-data/core/repositories'
import { destroyDataset } from '@latitude-data/core/services/datasets/destroy'
import disk from '$/lib/disk'
import { z } from 'zod'

import { authProcedure } from '../procedures'
Expand All @@ -19,5 +19,5 @@ export const destroyDatasetAction = authProcedure
const repo = new DatasetsRepository(ctx.workspace.id)
const dataset = await repo.find(id).then((r) => r.unwrap())

return await destroyDataset({ dataset, disk }).then((r) => r.unwrap())
return await destroyDataset({ dataset, disk: disk }).then((r) => r.unwrap())
})
4 changes: 2 additions & 2 deletions apps/web/src/actions/datasets/preview.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
'use server'

import { disk } from '@latitude-data/core/lib/disk'
import { DatasetsRepository } from '@latitude-data/core/repositories'
import { previewDataset } from '@latitude-data/core/services/datasets/preview'
import disk from '$/lib/disk'
import { z } from 'zod'

import { authProcedure } from '../procedures'
Expand All @@ -17,5 +17,5 @@ export const previewDatasetAction = authProcedure
.handler(async ({ ctx, input }) => {
const repo = new DatasetsRepository(ctx.workspace.id)
const dataset = await repo.find(input.id).then((r) => r.unwrap())
return await previewDataset({ dataset, disk }).then((r) => r.unwrap())
return await previewDataset({ dataset, disk: disk }).then((r) => r.unwrap())
})
171 changes: 171 additions & 0 deletions apps/web/src/actions/evaluations/runBatch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import {
Commit,
Dataset,
DocumentVersion,
EvaluationDto,
Project,
ProviderApiKey,
Providers,
User,
Workspace,
} from '@latitude-data/core/browser'
import * as factories from '@latitude-data/core/factories'
import { queues } from '@latitude-data/jobs'
import { beforeEach, describe, expect, it, vi } from 'vitest'

import { runBatchAction } from './runBatch'

const mocks = vi.hoisted(() => ({
getSession: vi.fn(),
}))

vi.mock('$/services/auth/getSession', () => ({
getSession: mocks.getSession,
}))

vi.mock('@latitude-data/jobs', () => ({
queues: {
defaultQueue: {
addRunBatchEvaluationJob: vi.fn(),
},
eventsQueue: {
addCreateEventJob: vi.fn(),
addPublishEventJob: vi.fn(),
},
},
}))

describe('runBatchAction', () => {
describe('unauthorized', () => {
it('errors when the user is not authenticated', async () => {
const [_, error] = await runBatchAction({
datasetId: 1,
projectId: 1,
documentUuid: 'doc-uuid',
commitUuid: 'commit-uuid',
runCount: 5,
evaluationId: 1,
})

expect(error!.name).toEqual('UnauthorizedError')
})
})

describe('authorized', () => {
let workspace: Workspace,
user: User,
project: Project,
document: DocumentVersion,
commit: Commit,
dataset: Dataset,
evaluation: EvaluationDto,
provider: ProviderApiKey

beforeEach(async () => {
const setup = await factories.createProject({
documents: { 'test-doc': 'Test content' },
})
workspace = setup.workspace
user = setup.user
project = setup.project
document = setup.documents[0]!
commit = setup.commit

provider = await factories.createProviderApiKey({
workspace,
type: Providers.OpenAI,
name: 'Test Provider',
user,
})

dataset = await factories
.createDataset({
name: 'Test Dataset',
workspace,
author: user,
})
.then((result) => result.dataset)

evaluation = await factories.createEvaluation({
provider,
name: 'Test Evaluation',
})

mocks.getSession.mockReturnValue({
user,
workspace: { id: workspace.id, name: workspace.name },
})
})

it('successfully enqueues a batch evaluation job', async () => {
const [result, error] = await runBatchAction({
datasetId: dataset.id,
projectId: project.id,
documentUuid: document.documentUuid,
commitUuid: commit.uuid,
runCount: 5,
evaluationId: evaluation.id,
})

expect(error).toBeNull()
expect(result).toEqual({
success: true,
batchId: expect.any(String),
})

expect(queues.defaultQueue.addRunBatchEvaluationJob).toHaveBeenCalledWith(
expect.objectContaining({
evaluation: expect.objectContaining({ id: evaluation.id }),
dataset: expect.objectContaining({ id: dataset.id }),
document: expect.objectContaining({
documentUuid: document.documentUuid,
}),
runCount: 5,
offset: 0,
parametersMap: undefined,
batchId: expect.any(String),
}),
)
})

it('handles optional parameters', async () => {
const [result, error] = await runBatchAction({
datasetId: dataset.id,
projectId: project.id,
documentUuid: document.documentUuid,
commitUuid: commit.uuid,
runCount: 5,
evaluationId: evaluation.id,
offset: 10,
parameters: { 1: 100, 2: 200 },
})

expect(error).toBeNull()
expect(result).toEqual({
success: true,
batchId: expect.any(String),
})

expect(queues.defaultQueue.addRunBatchEvaluationJob).toHaveBeenCalledWith(
expect.objectContaining({
offset: 10,
parametersMap: { 1: 100, 2: 200 },
}),
)
})

it('handles errors when resources are not found', async () => {
const [_, error] = await runBatchAction({
datasetId: 999999,
projectId: project.id,
documentUuid: document.documentUuid,
commitUuid: commit.uuid,
runCount: 5,
evaluationId: evaluation.id,
})

expect(error).not.toBeNull()
expect(error!.message).toContain('not found')
})
})
})
58 changes: 58 additions & 0 deletions apps/web/src/actions/evaluations/runBatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import {
DatasetsRepository,
DocumentVersionsRepository,
EvaluationsRepository,
} from '@latitude-data/core/repositories'
import { queues } from '@latitude-data/jobs'
import { nanoid } from 'nanoid'
import { z } from 'zod'

import { authProcedure } from '../procedures'

export const runBatchAction = authProcedure
.createServerAction()
.input(
z.object({
datasetId: z.number(),
projectId: z.number(),
documentUuid: z.string(),
commitUuid: z.string(),
runCount: z.number(),
offset: z.number().optional().default(0),
parameters: z.record(z.number()).optional(),
evaluationId: z.number(),
}),
)
.handler(async ({ input, ctx }) => {
const evaluationsRepo = new EvaluationsRepository(ctx.workspace.id)
const evaluation = await evaluationsRepo
.find(input.evaluationId)
.then((r) => r.unwrap())

const datasetsRepo = new DatasetsRepository(ctx.workspace.id)
const dataset = await datasetsRepo
.find(input.datasetId)
.then((r) => r.unwrap())

const docsRepo = new DocumentVersionsRepository(ctx.workspace.id)
const document = await docsRepo
.getDocumentAtCommit({
projectId: input.projectId,
commitUuid: input.commitUuid,
documentUuid: input.documentUuid,
})
.then((r) => r.unwrap())

const batchId = nanoid()
await queues.defaultQueue.addRunBatchEvaluationJob({
evaluation,
dataset,
document,
runCount: input.runCount,
offset: input.offset,
parametersMap: input.parameters,
batchId,
})

return { success: true, batchId }
})
10 changes: 0 additions & 10 deletions apps/web/src/jobs/index.ts

This file was deleted.

14 changes: 0 additions & 14 deletions apps/web/src/lib/disk.ts

This file was deleted.

Loading

0 comments on commit c9b184f

Please sign in to comment.