Skip to content

Commit

Permalink
feature: Run batch evaluation job
Browse files Browse the repository at this point in the history
This commit implements the flow to execute a batch evaluation up until
the point of actually running the evaluation.
  • Loading branch information
geclos committed Sep 10, 2024
1 parent 2c684ed commit 0f5b6b4
Show file tree
Hide file tree
Showing 27 changed files with 681 additions and 97 deletions.
10 changes: 0 additions & 10 deletions apps/gateway/src/jobs/index.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { zValidator } from '@hono/zod-validator'
import { LogSources } from '@latitude-data/core/browser'
import { jobs } from '@latitude-data/core/jobs'
import { runDocumentAtCommit } from '@latitude-data/core/services/commits/runDocumentAtCommit'
import { pipeToStream } from '$/common/pipeToStream'
import { queues } from '$/jobs'
import { Factory } from 'hono/factory'
import { streamSSE } from 'hono/streaming'
import { z } from 'zod'
Expand Down Expand Up @@ -45,7 +45,8 @@ export const runHandler = factory.createHandlers(

await pipeToStream(stream, result.stream)

queues.defaultQueue.jobs.enqueueCreateDocumentLogJob({
// TODO: move to events
jobs.queues.defaultQueue.jobs.enqueueCreateDocumentLogJob({
commit,
data: {
uuid: result.documentLogUuid,
Expand Down
1 change: 1 addition & 0 deletions apps/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"lodash-es": "^4.17.21",
"lucia": "^3.2.0",
"monaco-editor": "^0.50.0",
"nanoid": "^5.0.7",
"next": "^14.3.0-canary.87",
"next-themes": "^0.3.0",
"nextjs-toploader": "^1.6.12",
Expand Down
2 changes: 1 addition & 1 deletion apps/web/src/actions/datasets/create.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
'use server'

import { disk } from '@latitude-data/core/lib/disk'
import { DatasetsRepository } from '@latitude-data/core/repositories'
import { createDataset } from '@latitude-data/core/services/datasets/create'
import disk from '$/lib/disk'
import { z } from 'zod'

import { authProcedure } from '../procedures'
Expand Down
4 changes: 2 additions & 2 deletions apps/web/src/actions/datasets/destroy.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
'use server'

import { disk } from '@latitude-data/core/lib/disk'
import { DatasetsRepository } from '@latitude-data/core/repositories'
import { destroyDataset } from '@latitude-data/core/services/datasets/destroy'
import disk from '$/lib/disk'
import { z } from 'zod'

import { authProcedure } from '../procedures'
Expand All @@ -19,5 +19,5 @@ export const destroyDatasetAction = authProcedure
const repo = new DatasetsRepository(ctx.workspace.id)
const dataset = await repo.find(id).then((r) => r.unwrap())

return await destroyDataset({ dataset, disk }).then((r) => r.unwrap())
return await destroyDataset({ dataset, disk: disk }).then((r) => r.unwrap())
})
4 changes: 2 additions & 2 deletions apps/web/src/actions/datasets/preview.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import { DatasetsRepository } from '@latitude-data/core/repositories'
import { previewDataset } from '@latitude-data/core/services/datasets/preview'
import disk from '$/lib/disk'
import { z } from 'zod'

import { authProcedure } from '../procedures'
import { disk } from '@latitude-data/core/lib/disk'

export const previewDatasetAction = authProcedure
.createServerAction()
Expand All @@ -17,5 +17,5 @@ export const previewDatasetAction = authProcedure
.handler(async ({ ctx, input }) => {
const repo = new DatasetsRepository(ctx.workspace.id)
const dataset = await repo.find(input.id).then((r) => r.unwrap())
return await previewDataset({ dataset, disk }).then((r) => r.unwrap())
return await previewDataset({ dataset, disk: disk }).then((r) => r.unwrap())
})
58 changes: 58 additions & 0 deletions apps/web/src/actions/evaluations/runBatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { jobs } from '@latitude-data/core/jobs'
import {
DatasetsRepository,
DocumentVersionsRepository,
EvaluationsRepository,
} from '@latitude-data/core/repositories'
import { nanoid } from 'nanoid'
import { z } from 'zod'

import { authProcedure } from '../procedures'

export const runBatchAction = authProcedure
.createServerAction()
.input(
z.object({
datasetId: z.number(),
projectId: z.number(),
documentUuid: z.string(),
commitUuid: z.string(),
runCount: z.number(),
offset: z.number().optional().default(0),
parameters: z.record(z.number()).optional(),
evaluationId: z.number(),
}),
)
.handler(async ({ input, ctx }) => {
const evaluationsRepo = new EvaluationsRepository(ctx.workspace.id)
const evaluation = await evaluationsRepo
.find(input.evaluationId)
.then((r) => r.unwrap())

const datasetsRepo = new DatasetsRepository(ctx.workspace.id)
const dataset = await datasetsRepo
.find(input.datasetId)
.then((r) => r.unwrap())

const docsRepo = new DocumentVersionsRepository(ctx.workspace.id)
const document = await docsRepo
.getDocumentAtCommit({
projectId: input.projectId,
commitUuid: input.commitUuid,
documentUuid: input.documentUuid,
})
.then((r) => r.unwrap())

const batchId = nanoid()
await jobs.queues.defaultQueue.jobs.enqueueRunBatchEvaluationJob({
evaluation,
dataset,
document,
runCount: input.runCount,
offset: input.offset,
parametersMap: input.parameters,
batchId,
})

return { success: true, batchId }
})
10 changes: 0 additions & 10 deletions apps/web/src/jobs/index.ts

This file was deleted.

14 changes: 0 additions & 14 deletions apps/web/src/lib/disk.ts

This file was deleted.

1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"./browser": "./src/browser.ts",
"./data-access": "./src/data-access/index.ts",
"./client": "./src/client/index.ts",
"./jobs": "./src/jobs/index.ts",
"./events/*": "./src/events/*.ts",
"./lib/*": "./src/lib/*.ts",
"./redis": "./src/redis/index.ts",
Expand Down
16 changes: 1 addition & 15 deletions packages/core/src/events/publisher.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import { env } from '@latitude-data/env'
import { setupJobs } from '@latitude-data/jobs'

import { jobs } from '../jobs'
import { EventHandler, EventHandlers, LatitudeEvent } from './handlers'

let jobs: ReturnType<typeof setupJobs>

export const publisher = {
publish: async (event: LatitudeEvent) => {
const handlers = EventHandlers[event.type] as EventHandler<typeof event>[]
Expand All @@ -15,16 +11,6 @@ export const publisher = {
)
},
publishLater: (event: LatitudeEvent) => {
if (!jobs) {
jobs = setupJobs({
connectionParams: {
host: env.REDIS_HOST,
port: env.REDIS_PORT,
password: env.REDIS_PASSWORD,
},
})
}

jobs.queues.eventsQueue.jobs.enqueueCreateEventJob(event)
jobs.queues.eventsQueue.jobs.enqueuePublishEventJob(event)
},
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { setupJobs } from '@latitude-data/jobs'

export const jobs = setupJobs()
16 changes: 16 additions & 0 deletions packages/core/src/lib/disk.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { debuglog } from 'node:util'
import path from 'path'
import { Readable } from 'stream'
import { fileURLToPath } from 'url'

import { HeadBucketCommand, S3Client } from '@aws-sdk/client-s3'
import { Result } from '@latitude-data/core/lib/Result'
Expand Down Expand Up @@ -50,6 +52,7 @@ async function getReadableStreamFromFile(file: File) {
}

type BuildArgs = { local: { publicPath: string; location: string } }

export class DiskWrapper {
private disk: Disk

Expand Down Expand Up @@ -139,6 +142,7 @@ export class DiskWrapper {
}

const awsConfig = getAwsConfig()

return new S3Driver({
credentials: awsConfig.credentials,
region: awsConfig.region,
Expand All @@ -147,3 +151,15 @@ export class DiskWrapper {
})
}
}

const PUBLIC_PATH = 'uploads'

export const disk = new DiskWrapper({
local: {
publicPath: PUBLIC_PATH,
location: path.join(
path.dirname(fileURLToPath(import.meta.url)),
`../../public/${PUBLIC_PATH}`,
),
},
})
1 change: 1 addition & 0 deletions packages/core/src/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export { default as Transaction, type PromisedResult } from './Transaction'
export * from './Result'
export * from './errors'
export * from './commonTypes'
export * from './disk'
13 changes: 9 additions & 4 deletions packages/core/src/lib/readCsv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ function getData(file: File | string) {
type ParseCsvOptions = {
delimiter: string
// https://csv.js.org/parse/options/to_line/
limit?: number
toLine?: number
fromLine?: number
}
type ParseResult = {
record: Record<string, string>
Expand All @@ -25,7 +26,7 @@ export type CsvParsedData = {
}
export async function syncReadCsv(
file: File | string,
{ delimiter, limit = -1 }: ParseCsvOptions,
{ delimiter, toLine, fromLine }: ParseCsvOptions,
) {
try {
const data = await getData(file)
Expand All @@ -39,8 +40,12 @@ export async function syncReadCsv(
info: true,
}

if (limit > 0) {
opts = { ...opts, to_line: limit }
if (toLine) {
opts = { ...opts, toLine }
}

if (fromLine) {
opts = { ...opts, fromLine }
}

const records = parse(data, opts) as ParseResult[]
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/redis/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { env } from '@latitude-data/env'
import Redis, { RedisOptions } from 'ioredis'

export function buildRedisConnection({
Expand All @@ -7,3 +8,9 @@ export function buildRedisConnection({
}: Omit<RedisOptions, 'port' & 'host'> & { host: string; port: number }) {
return new Redis(port, host, opts)
}

export const connection = buildRedisConnection({
host: env.REDIS_HOST,
port: env.REDIS_PORT,
password: env.REDIS_PASSWORD,
})
9 changes: 6 additions & 3 deletions packages/core/src/services/datasets/preview.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ import { syncReadCsv } from '../../lib/readCsv'
export async function previewDataset({
dataset,
disk,
limit = 100,
fromLine = 0,
toLine = 100,
}: {
dataset: Dataset
disk: DiskWrapper
limit?: number
fromLine?: number
toLine?: number
}) {
const diskFile = disk.file(dataset.fileKey)
const bytes = await diskFile.getBytes()
const file = new TextDecoder().decode(bytes)
const readResult = await syncReadCsv(file, {
limit,
fromLine,
toLine,
delimiter: dataset.csvDelimiter,
})
if (readResult.error) readResult
Expand Down
6 changes: 5 additions & 1 deletion packages/jobs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@
"scripts": {
"lint": "eslint src",
"tc": "tsc --noEmit",
"prettier": "prettier --write src/**/*.ts"
"prettier": "prettier --write src/**/*.ts",
"test": "vitest run --pool=forks",
"test:watch": "vitest"
},
"dependencies": {
"@latitude-data/env": "workspace:*"
},
"devDependencies": {
"@latitude-data/core": "workspace:^",
"@latitude-data/eslint-config": "workspace:*",
"@latitude-data/typescript-config": "workspace:*",
"@types/node": "*",
"bullmq": "^5.8.7",
"ioredis": "^5.4.1",
"vitest": "^2.0.5",
"zod": "^3.23.8"
},
"peerDependencies": {
Expand Down
18 changes: 12 additions & 6 deletions packages/jobs/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import { connection as coreConnection } from '@latitude-data/core/redis'

import { buildConnection, ConnectionParams } from './connection'
import { setupQueues } from './queues'
import startWorkers from './workers'

export { Worker } from 'bullmq'

function getConnection(connectionParams?: ConnectionParams) {
return connectionParams ? buildConnection(connectionParams) : coreConnection
}

export function setupJobs({
connectionParams,
}: {
connectionParams: ConnectionParams
}) {
const connection = buildConnection(connectionParams)
connectionParams?: ConnectionParams
} = {}) {
const connection = getConnection(connectionParams)
const queues = setupQueues({ connection })

return { queues }
Expand All @@ -18,8 +24,8 @@ export function setupJobs({
export function setupWorkers({
connectionParams,
}: {
connectionParams: ConnectionParams
}) {
const connection = buildConnection(connectionParams)
connectionParams?: ConnectionParams
} = {}) {
const connection = getConnection(connectionParams)
return startWorkers({ connection })
}
Loading

0 comments on commit 0f5b6b4

Please sign in to comment.