Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: improve streaming experience #245

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
type EventArgs,
} from '$/components/Providers/WebsocketsProvider/useSockets'

const DISAPERING_IN_MS = 1500
const DISAPERING_IN_MS = 5000
export function EvaluationStatusBanner({
evaluation,
}: {
Expand All @@ -35,11 +35,11 @@ export function EvaluationStatusBanner({
const newJobs = [...prevJobs]
newJobs[jobIndex] = args

if (args.status && args.status === 'finished') {
if (isDone(args)) {
setTimeout(() => {
setJobs((currentJobs) => {
return currentJobs.filter((job) => job.batchId !== args.batchId)
})
setJobs((currentJobs) =>
currentJobs.filter((job) => job.batchId !== args.batchId),
)
}, DISAPERING_IN_MS)
}

Expand All @@ -49,6 +49,7 @@ export function EvaluationStatusBanner({
},
[evaluation.id, document.documentUuid],
)

useEffect(() => {
return () => {
if (timeoutRef.current) {
Expand All @@ -62,13 +63,33 @@ export function EvaluationStatusBanner({
return (
<>
{jobs.map((job) => (
<ProgressIndicator
key={job.batchId}
state={job.status === 'finished' ? 'completed' : 'running'}
>
{`Generating batch evaluation (ID: ${job.batchId}) ${job.completed}/${job.initialTotal}`}
</ProgressIndicator>
<div key={job.batchId} className='flex flex-col gap-4'>
{!isDone(job) && (
<ProgressIndicator state='running'>
{`Running batch evaluation ${job.completed}/${job.total}`}
</ProgressIndicator>
)}
{job.errors > 0 && !isDone(job) && (
<ProgressIndicator state='error'>
Some evaluations failed to run. We won't retry them automatically
to avoid increasing provider costs. Total errors:{' '}
<strong>{job.errors}</strong>
</ProgressIndicator>
)}
{isDone(job) && (
<ProgressIndicator state='completed'>
Batch evaluation completed! Total evaluations:{' '}
<strong>{job.total}</strong> · Total errors:{' '}
<strong>{job.errors}</strong> · Total completed:{' '}
<strong>{job.completed}</strong>
</ProgressIndicator>
)}
</div>
))}
</>
)
}

function isDone(job: EventArgs<'evaluationStatus'>) {
return job.total === job.completed + job.errors
}
2 changes: 0 additions & 2 deletions packages/core/src/websockets/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ type EvaluationStatusArgs = {
batchId: string
evaluationId: number
documentUuid: string
status: 'started' | 'running' | 'finished'
initialTotal: number
total: number
completed: number
errors: number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ describe('runBatchEvaluationJob', () => {
// @ts-ignore
mockProgressTracker = {
initializeProgress: vi.fn(),
getProgress: vi.fn().mockResolvedValue({ enqueued: 0 }),
getProgress: vi
.fn()
.mockResolvedValue({ total: 3, completed: 0, enqueued: 0 }),
incrementEnqueued: vi.fn(),
}

Expand Down Expand Up @@ -113,10 +115,9 @@ describe('runBatchEvaluationJob', () => {
batchId: expect.any(String),
evaluationId: 1,
documentUuid: 'fake-document-uuid',
status: 'started',
enqueued: 0,
completed: 1,
total: 3,
completed: 0,
},
})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,30 +83,25 @@ export const runBatchEvaluationJob = async (
batchId,
evaluationId: evaluation.id,
documentUuid: document.documentUuid,
status: 'started',
...progress,
completed: 1, // Optimistic completion of first job
total: parameters.length,
},
})
}

// Enqueue runDocumentJob for each set of parameters, starting from the last
// enqueued job. This allows us to resume the batch if the job fails.
for (let i = progress.enqueued; i < parameters.length; i++) {
const isFirstEnqueued = progress.enqueued === 0
progressTracker.incrementEnqueued()

await jobs.defaultQueue.jobs.enqueueRunDocumentJob({
workspaceId: workspace.id,
documentUuid: document.documentUuid,
commitUuid: commit.uuid,
projectId: commit.projectId,
parameters: parameters[i]!,
parameters: parameters[i],
evaluationId: evaluation.id,
skipProgress: isFirstEnqueued,
batchId,
})

await progressTracker.incrementEnqueued()
}

return { batchId }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ describe('runDocumentJob', () => {
)

expect(ProgressTracker.prototype.incrementErrors).not.toHaveBeenCalled()
expect(ProgressTracker.prototype.decrementTotal).not.toHaveBeenCalled()
})

it('should handle errors and update progress tracker', async () => {
Expand All @@ -124,7 +123,6 @@ describe('runDocumentJob', () => {
).not.toHaveBeenCalled()

expect(ProgressTracker.prototype.incrementErrors).toHaveBeenCalled()
expect(ProgressTracker.prototype.decrementTotal).toHaveBeenCalled()
})

it('should log errors in non-production environment', async () => {
Expand All @@ -138,6 +136,5 @@ describe('runDocumentJob', () => {

expect(consoleSpy).toHaveBeenCalledWith(testError)
expect(ProgressTracker.prototype.incrementErrors).toHaveBeenCalled()
expect(ProgressTracker.prototype.decrementTotal).toHaveBeenCalled()
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ export const runDocumentJob = async (job: Job<RunDocumentJobData>) => {
evaluationId,
batchId,
} = job.data

const jobs = await setupJobs()
const progressTracker = new ProgressTracker(await queues(), batchId)

try {
const jobs = await setupJobs()
const workspace = await unsafelyFindWorkspace(workspaceId)
if (!workspace) throw new NotFoundError('Workspace not found')

Expand All @@ -59,7 +58,6 @@ export const runDocumentJob = async (job: Job<RunDocumentJobData>) => {
}).then((r) => r.unwrap())

await result.response

await jobs.defaultQueue.jobs.enqueueRunEvaluationJob(
{
workspaceId,
Expand All @@ -76,10 +74,8 @@ export const runDocumentJob = async (job: Job<RunDocumentJobData>) => {
}

await progressTracker.incrementErrors()
await progressTracker.decrementTotal()

const progress = await progressTracker.getProgress()
const finished = await progressTracker.isFinished()
const websockets = await WebsocketClient.getSocket()

websockets.emit('evaluationStatus', {
Expand All @@ -88,7 +84,6 @@ export const runDocumentJob = async (job: Job<RunDocumentJobData>) => {
batchId,
evaluationId,
documentUuid,
status: finished ? 'finished' : 'running',
...progress,
},
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
} from '@latitude-data/core/repositories'
import { runEvaluation } from '@latitude-data/core/services/evaluations/run'
import { WebsocketClient } from '@latitude-data/core/websockets/workers'
import { env } from '@latitude-data/env'
import { Job } from 'bullmq'

import { ProgressTracker } from '../../utils/progressTracker'
Expand All @@ -19,20 +20,14 @@ type RunEvaluationJobData = {
}

export const runEvaluationJob = async (job: Job<RunEvaluationJobData>) => {
const { workspaceId, batchId, documentUuid, documentLogUuid, evaluationId } =
job.data
const websockets = await WebsocketClient.getSocket()
const {
skipProgress,
workspaceId,
batchId,
documentUuid,
documentLogUuid,
evaluationId,
} = job.data

const progressTracker = new ProgressTracker(await queues(), batchId)
const documentLogsScope = new DocumentLogsRepository(workspaceId)
const evaluationsScope = new EvaluationsRepository(workspaceId)

try {
const documentLogsScope = new DocumentLogsRepository(workspaceId)
const evaluationsScope = new EvaluationsRepository(workspaceId)
const documentLog = await documentLogsScope
.findByUuid(documentLogUuid)
.then((r) => r.unwrap())
Expand All @@ -47,25 +42,22 @@ export const runEvaluationJob = async (job: Job<RunEvaluationJobData>) => {

await progressTracker.incrementCompleted()
} catch (error) {
if (env.NODE_ENV !== 'production') {
console.error(error)
}

await progressTracker.incrementErrors()
} finally {
await progressTracker.decrementTotal()
const progress = await progressTracker.getProgress()
const finished = await progressTracker.isFinished()

console.log('DEBUG: Emitting evaluationStatus before check')
if (!skipProgress) {
console.log('DEBUG: Emitting evaluationStatus', workspaceId)
websockets.emit('evaluationStatus', {
workspaceId,
data: {
batchId,
evaluationId,
documentUuid,
status: finished ? 'finished' : 'running',
...progress,
},
})
}
websockets.emit('evaluationStatus', {
workspaceId,
data: {
batchId,
evaluationId,
documentUuid,
...progress,
},
})
}
}
28 changes: 13 additions & 15 deletions packages/jobs/src/utils/progressTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ export class ProgressTracker {

async initializeProgress(total: number) {
const multi = this.redis.multi()
multi.set(this.getKey('initialTotal'), total)

multi.set(this.getKey('total'), total)
multi.set(this.getKey('completed'), 0)
multi.set(this.getKey('enqueued'), 0)
multi.set(this.getKey('errors'), 0)

await multi.exec()
}

Expand All @@ -31,30 +33,26 @@ export class ProgressTracker {
await this.redis.incr(this.getKey('enqueued'))
}

async decrementEnqueued() {
await this.redis.decr(this.getKey('enqueued'))
}

async getProgress() {
const [initialTotal, total, completed, errors, enqueued] =
await this.redis.mget([
this.getKey('initialTotal'),
this.getKey('total'),
this.getKey('completed'),
this.getKey('errors'),
this.getKey('enqueued'),
])
const [total, completed, errors, enqueued] = await this.redis.mget([
this.getKey('total'),
this.getKey('completed'),
this.getKey('errors'),
this.getKey('enqueued'),
])

return {
initialTotal: parseInt(initialTotal || '0', 10),
total: parseInt(total || '0', 10),
completed: parseInt(completed || '0', 10),
errors: parseInt(errors || '0', 10),
enqueued: parseInt(enqueued || '0', 10),
}
}

async isFinished() {
const { enqueued, completed, errors } = await this.getProgress()
return enqueued === completed + errors
}

private getKey(suffix: string) {
return `batch:${this.batchId}:${suffix}`
}
Expand Down
Loading