From 4fba588766519f92cdf38451598534abe3e4cc14 Mon Sep 17 00:00:00 2001 From: Gerard Clos Date: Fri, 20 Sep 2024 15:39:18 +0200 Subject: [PATCH] feature: improve streaming experience --- .../EvaluationStatusBanner/index.tsx | 43 ++++++++++++----- packages/core/src/websockets/constants.ts | 2 - .../runBatchEvaluationJob.test.ts | 7 +-- .../batchEvaluations/runBatchEvaluationJob.ts | 11 ++--- .../batchEvaluations/runDocumentJob.test.ts | 3 -- .../batchEvaluations/runDocumentJob.ts | 7 +-- .../batchEvaluations/runEvaluationJob.ts | 46 ++++++++----------- packages/jobs/src/utils/progressTracker.ts | 28 ++++++----- 8 files changed, 72 insertions(+), 75 deletions(-) diff --git a/apps/web/src/app/(private)/projects/[projectId]/versions/[commitUuid]/documents/[documentUuid]/evaluations/[evaluationId]/_components/EvaluationResults/EvaluationStatusBanner/index.tsx b/apps/web/src/app/(private)/projects/[projectId]/versions/[commitUuid]/documents/[documentUuid]/evaluations/[evaluationId]/_components/EvaluationResults/EvaluationStatusBanner/index.tsx index 354161912..40889e016 100644 --- a/apps/web/src/app/(private)/projects/[projectId]/versions/[commitUuid]/documents/[documentUuid]/evaluations/[evaluationId]/_components/EvaluationResults/EvaluationStatusBanner/index.tsx +++ b/apps/web/src/app/(private)/projects/[projectId]/versions/[commitUuid]/documents/[documentUuid]/evaluations/[evaluationId]/_components/EvaluationResults/EvaluationStatusBanner/index.tsx @@ -9,7 +9,7 @@ import { type EventArgs, } from '$/components/Providers/WebsocketsProvider/useSockets' -const DISAPERING_IN_MS = 1500 +const DISAPERING_IN_MS = 5000 export function EvaluationStatusBanner({ evaluation, }: { @@ -35,11 +35,11 @@ export function EvaluationStatusBanner({ const newJobs = [...prevJobs] newJobs[jobIndex] = args - if (args.status && args.status === 'finished') { + if (isDone(args)) { setTimeout(() => { - setJobs((currentJobs) => { - return currentJobs.filter((job) => job.batchId !== args.batchId) - }) + setJobs((currentJobs) => + currentJobs.filter((job) => job.batchId !== args.batchId), + ) }, DISAPERING_IN_MS) } @@ -49,6 +49,7 @@ export function EvaluationStatusBanner({ }, [evaluation.id, document.documentUuid], ) + useEffect(() => { return () => { if (timeoutRef.current) { @@ -62,13 +63,33 @@ export function EvaluationStatusBanner({ return ( <> {jobs.map((job) => ( - - {`Generating batch evaluation (ID: ${job.batchId}) ${job.completed}/${job.initialTotal}`} - +
+ {!isDone(job) && ( + + {`Running batch evaluation ${job.completed}/${job.total}`} + + )} + {job.errors > 0 && !isDone(job) && ( + + Some evaluations failed to run. We won't retry them automatically + to avoid increasing provider costs. Total errors:{' '} + {job.errors} + + )} + {isDone(job) && ( + + Batch evaluation completed! Total evaluations:{' '} + {job.total} · Total errors:{' '} + {job.errors} · Total completed:{' '} + {job.completed} + + )} +
))} ) } + +function isDone(job: EventArgs<'evaluationStatus'>) { + return job.total === job.completed + job.errors +} diff --git a/packages/core/src/websockets/constants.ts b/packages/core/src/websockets/constants.ts index 9b35f83b0..c4d69e32b 100644 --- a/packages/core/src/websockets/constants.ts +++ b/packages/core/src/websockets/constants.ts @@ -30,8 +30,6 @@ type EvaluationStatusArgs = { batchId: string evaluationId: number documentUuid: string - status: 'started' | 'running' | 'finished' - initialTotal: number total: number completed: number errors: number diff --git a/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.test.ts b/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.test.ts index 6fae14859..0410e7d0d 100644 --- a/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.test.ts +++ b/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.test.ts @@ -81,7 +81,9 @@ describe('runBatchEvaluationJob', () => { // @ts-ignore mockProgressTracker = { initializeProgress: vi.fn(), - getProgress: vi.fn().mockResolvedValue({ enqueued: 0 }), + getProgress: vi + .fn() + .mockResolvedValue({ total: 3, completed: 0, enqueued: 0 }), incrementEnqueued: vi.fn(), } @@ -113,10 +115,9 @@ describe('runBatchEvaluationJob', () => { batchId: expect.any(String), evaluationId: 1, documentUuid: 'fake-document-uuid', - status: 'started', enqueued: 0, - completed: 1, total: 3, + completed: 0, }, }) }) diff --git a/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.ts b/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.ts index 8ac37fca8..29ec84498 100644 --- a/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.ts +++ b/packages/jobs/src/job-definitions/batchEvaluations/runBatchEvaluationJob.ts @@ -83,10 +83,7 @@ export const runBatchEvaluationJob = async ( batchId, evaluationId: evaluation.id, documentUuid: document.documentUuid, - status: 'started', ...progress, - completed: 1, // Optimistic completion of first job - total: parameters.length, }, }) } @@ -94,19 +91,17 @@ export const runBatchEvaluationJob = async ( // Enqueue runDocumentJob for each set of parameters, starting from the last // enqueued job. This allows us to resume the batch if the job fails. for (let i = progress.enqueued; i < parameters.length; i++) { - const isFirstEnqueued = progress.enqueued === 0 + progressTracker.incrementEnqueued() + await jobs.defaultQueue.jobs.enqueueRunDocumentJob({ workspaceId: workspace.id, documentUuid: document.documentUuid, commitUuid: commit.uuid, projectId: commit.projectId, - parameters: parameters[i]!, + parameters: parameters[i], evaluationId: evaluation.id, - skipProgress: isFirstEnqueued, batchId, }) - - await progressTracker.incrementEnqueued() } return { batchId } diff --git a/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.test.ts b/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.test.ts index d24ce70ea..a37610275 100644 --- a/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.test.ts +++ b/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.test.ts @@ -109,7 +109,6 @@ describe('runDocumentJob', () => { ) expect(ProgressTracker.prototype.incrementErrors).not.toHaveBeenCalled() - expect(ProgressTracker.prototype.decrementTotal).not.toHaveBeenCalled() }) it('should handle errors and update progress tracker', async () => { @@ -124,7 +123,6 @@ describe('runDocumentJob', () => { ).not.toHaveBeenCalled() expect(ProgressTracker.prototype.incrementErrors).toHaveBeenCalled() - expect(ProgressTracker.prototype.decrementTotal).toHaveBeenCalled() }) it('should log errors in non-production environment', async () => { @@ -138,6 +136,5 @@ describe('runDocumentJob', () => { expect(consoleSpy).toHaveBeenCalledWith(testError) expect(ProgressTracker.prototype.incrementErrors).toHaveBeenCalled() - expect(ProgressTracker.prototype.decrementTotal).toHaveBeenCalled() }) }) diff --git a/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.ts b/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.ts index 0d9105e85..ebda56f5c 100644 --- a/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.ts +++ b/packages/jobs/src/job-definitions/batchEvaluations/runDocumentJob.ts @@ -34,11 +34,10 @@ export const runDocumentJob = async (job: Job) => { evaluationId, batchId, } = job.data - - const jobs = await setupJobs() const progressTracker = new ProgressTracker(await queues(), batchId) try { + const jobs = await setupJobs() const workspace = await unsafelyFindWorkspace(workspaceId) if (!workspace) throw new NotFoundError('Workspace not found') @@ -59,7 +58,6 @@ export const runDocumentJob = async (job: Job) => { }).then((r) => r.unwrap()) await result.response - await jobs.defaultQueue.jobs.enqueueRunEvaluationJob( { workspaceId, @@ -76,10 +74,8 @@ export const runDocumentJob = async (job: Job) => { } await progressTracker.incrementErrors() - await progressTracker.decrementTotal() const progress = await progressTracker.getProgress() - const finished = await progressTracker.isFinished() const websockets = await WebsocketClient.getSocket() websockets.emit('evaluationStatus', { @@ -88,7 +84,6 @@ export const runDocumentJob = async (job: Job) => { batchId, evaluationId, documentUuid, - status: finished ? 'finished' : 'running', ...progress, }, }) diff --git a/packages/jobs/src/job-definitions/batchEvaluations/runEvaluationJob.ts b/packages/jobs/src/job-definitions/batchEvaluations/runEvaluationJob.ts index ef1e9ff7d..807297a0a 100644 --- a/packages/jobs/src/job-definitions/batchEvaluations/runEvaluationJob.ts +++ b/packages/jobs/src/job-definitions/batchEvaluations/runEvaluationJob.ts @@ -5,6 +5,7 @@ import { } from '@latitude-data/core/repositories' import { runEvaluation } from '@latitude-data/core/services/evaluations/run' import { WebsocketClient } from '@latitude-data/core/websockets/workers' +import { env } from '@latitude-data/env' import { Job } from 'bullmq' import { ProgressTracker } from '../../utils/progressTracker' @@ -19,20 +20,14 @@ type RunEvaluationJobData = { } export const runEvaluationJob = async (job: Job) => { + const { workspaceId, batchId, documentUuid, documentLogUuid, evaluationId } = + job.data const websockets = await WebsocketClient.getSocket() - const { - skipProgress, - workspaceId, - batchId, - documentUuid, - documentLogUuid, - evaluationId, - } = job.data - const progressTracker = new ProgressTracker(await queues(), batchId) - const documentLogsScope = new DocumentLogsRepository(workspaceId) - const evaluationsScope = new EvaluationsRepository(workspaceId) + try { + const documentLogsScope = new DocumentLogsRepository(workspaceId) + const evaluationsScope = new EvaluationsRepository(workspaceId) const documentLog = await documentLogsScope .findByUuid(documentLogUuid) .then((r) => r.unwrap()) @@ -47,25 +42,22 @@ export const runEvaluationJob = async (job: Job) => { await progressTracker.incrementCompleted() } catch (error) { + if (env.NODE_ENV !== 'production') { + console.error(error) + } + await progressTracker.incrementErrors() } finally { - await progressTracker.decrementTotal() const progress = await progressTracker.getProgress() - const finished = await progressTracker.isFinished() - console.log('DEBUG: Emitting evaluationStatus before check') - if (!skipProgress) { - console.log('DEBUG: Emitting evaluationStatus', workspaceId) - websockets.emit('evaluationStatus', { - workspaceId, - data: { - batchId, - evaluationId, - documentUuid, - status: finished ? 'finished' : 'running', - ...progress, - }, - }) - } + websockets.emit('evaluationStatus', { + workspaceId, + data: { + batchId, + evaluationId, + documentUuid, + ...progress, + }, + }) } } diff --git a/packages/jobs/src/utils/progressTracker.ts b/packages/jobs/src/utils/progressTracker.ts index 3d290d3f1..ee0b193b4 100644 --- a/packages/jobs/src/utils/progressTracker.ts +++ b/packages/jobs/src/utils/progressTracker.ts @@ -8,10 +8,12 @@ export class ProgressTracker { async initializeProgress(total: number) { const multi = this.redis.multi() - multi.set(this.getKey('initialTotal'), total) + multi.set(this.getKey('total'), total) multi.set(this.getKey('completed'), 0) + multi.set(this.getKey('enqueued'), 0) multi.set(this.getKey('errors'), 0) + await multi.exec() } @@ -31,18 +33,19 @@ export class ProgressTracker { await this.redis.incr(this.getKey('enqueued')) } + async decrementEnqueued() { + await this.redis.decr(this.getKey('enqueued')) + } + async getProgress() { - const [initialTotal, total, completed, errors, enqueued] = - await this.redis.mget([ - this.getKey('initialTotal'), - this.getKey('total'), - this.getKey('completed'), - this.getKey('errors'), - this.getKey('enqueued'), - ]) + const [total, completed, errors, enqueued] = await this.redis.mget([ + this.getKey('total'), + this.getKey('completed'), + this.getKey('errors'), + this.getKey('enqueued'), + ]) return { - initialTotal: parseInt(initialTotal || '0', 10), total: parseInt(total || '0', 10), completed: parseInt(completed || '0', 10), errors: parseInt(errors || '0', 10), @@ -50,11 +53,6 @@ export class ProgressTracker { } } - async isFinished() { - const { enqueued, completed, errors } = await this.getProgress() - return enqueued === completed + errors - } - private getKey(suffix: string) { return `batch:${this.batchId}:${suffix}` }