Skip to content

Commit

Permalink
feat: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
andresgutgon committed Sep 15, 2024
1 parent 46fe7d9 commit 10a8d99
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { findWorkspaceFromDocument } from '@latitude-data/core/data-access'
import { NotFoundError } from '@latitude-data/core/lib/errors'
import { CommitsRepository } from '@latitude-data/core/repositories'
import { previewDataset } from '@latitude-data/core/services/datasets/preview'
import { WebsocketClient } from '@latitude-data/core/websockets/workers'
import { Job } from 'bullmq'

import { setupJobs } from '../..'
Expand Down Expand Up @@ -37,6 +38,7 @@ export const runBatchEvaluationJob = async (
parametersMap,
batchId = randomUUID(),
} = job.data
const websockets = await WebsocketClient.getSocket()
const workspace = await findWorkspaceFromDocument(document)
if (!workspace) throw new NotFoundError('Workspace not found')

Expand All @@ -62,16 +64,31 @@ export const runBatchEvaluationJob = async (

const progressTracker = new ProgressTracker(connection, batchId)

if (job.attemptsMade === 0) {
const firstAttempt = job.attemptsMade === 0

if (firstAttempt) {
await progressTracker.initializeProgress(parameters.length)
}

const { enqueued } = await progressTracker.getProgress()
const progress = await progressTracker.getProgress()
const queues = setupJobs()

if (firstAttempt) {
websockets.emit('evaluationStatus', {
workspaceId: workspace.id,
data: {
evaluationId: evaluation.id,
documentUuid: document.documentUuid,
status: 'started',
...progress,
total: parameters.length,
},
})
}

// Enqueue runDocumentJob for each set of parameters, starting from the last
// enqueued job. This allows us to resume the batch if the job fails.
for (let i = enqueued; i < parameters.length; i++) {
for (let i = progress.enqueued; i < parameters.length; i++) {
await queues.defaultQueue.jobs.enqueueRunDocumentJob({
workspaceId: workspace.id,
documentUuid: document.documentUuid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,11 @@ type RunEvaluationJobData = {
batchId: string
}

function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
export const runEvaluationJob = async (job: Job<RunEvaluationJobData>) => {
const websockets = await WebsocketClient.getSocket()
const { workspaceId, batchId, documentUuid, documentLogUuid, evaluationId } =
job.data

await sleep(1000)
const progressTracker = new ProgressTracker(connection, batchId)
const documentLogsScope = new DocumentLogsRepository(workspaceId)
const evaluationsScope = new EvaluationsRepository(workspaceId)
Expand All @@ -52,8 +48,6 @@ export const runEvaluationJob = async (job: Job<RunEvaluationJobData>) => {
const progress = await progressTracker.getProgress()
const finished = await progressTracker.isFinished()

console.log('Prgress:', progress)

websockets.emit('evaluationStatus', {
workspaceId,
data: {
Expand Down

0 comments on commit 10a8d99

Please sign in to comment.