Skip to content

Commit

Permalink
Feature/sentry in gateway and workers (#207)
Browse files Browse the repository at this point in the history
* chore: add sentry to workers

* chore: added sentry to gateway
  • Loading branch information
geclos authored Sep 18, 2024
1 parent edfd8f6 commit c1eb4c6
Show file tree
Hide file tree
Showing 23 changed files with 164 additions and 134 deletions.
1 change: 1 addition & 0 deletions apps/gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"@latitude-data/env": "workspace:^",
"@latitude-data/jobs": "workspace:^",
"@latitude-data/mailers": "workspace:^",
"@sentry/node": "^8.30.0",
"@t3-oss/env-core": "^0.10.1",
"drizzle-orm": "^0.33.0",
"hono": "^4.5.3",
Expand Down
24 changes: 24 additions & 0 deletions apps/gateway/src/common/sentry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { env } from '@latitude-data/env'
import * as Sentry from '@sentry/node'

Sentry.init({
dsn: env.SENTRY_DSN,

tracesSampleRate: 1.0,
})

export const captureException = (error: Error) => {
if (env.NODE_ENV === 'production') {
Sentry.captureException(error)
} else {
console.error(error)
}
}

export const captureMessage = (message: string) => {
if (env.NODE_ENV === 'production') {
Sentry.captureMessage(message)
} else {
console.log(message)
}
}
3 changes: 2 additions & 1 deletion apps/gateway/src/middlewares/errorHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
LatitudeError,
UnprocessableEntityError,
} from '@latitude-data/core/lib/errors'
import { captureException } from '$/common/sentry'
import { createMiddleware } from 'hono/factory'

import HttpStatusCodes from '../common/httpStatusCodes'
Expand All @@ -12,7 +13,7 @@ const errorHandlerMiddleware = () =>
if (!err) return next()

if (process.env.NODE_ENV !== 'test') {
console.error(err.message)
captureException(err)
}

if (err instanceof UnprocessableEntityError) {
Expand Down
3 changes: 0 additions & 3 deletions apps/gateway/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@ serve(
},
)

// Add graceful shutdown handler
function gracefulShutdown() {
console.log('Received termination signal. Shutting down gracefully...')
// Perform any cleanup operations here
process.exit(0)
}

// Register signal handlers
process.on('SIGTERM', gracefulShutdown)
process.on('SIGINT', gracefulShutdown)
1 change: 1 addition & 0 deletions apps/workers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"@latitude-data/env": "workspace:^",
"@latitude-data/jobs": "workspace:^",
"@latitude-data/mailers": "workspace:^",
"@sentry/node": "^8.30.0",
"@t3-oss/env-core": "*",
"zod": "^3.23.8"
},
Expand Down
21 changes: 0 additions & 21 deletions apps/workers/src/env.ts

This file was deleted.

11 changes: 6 additions & 5 deletions apps/workers/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import http from 'http'

import { setupWorkers } from '@latitude-data/jobs'
import { captureException, captureMessage } from './utils/sentry'
import startWorkers from './workers'

const workers = setupWorkers()
const workers = startWorkers()

console.log('Workers started')

Expand Down Expand Up @@ -33,13 +34,13 @@ process.on('SIGINT', () => gracefulShutdown('SIGINT'))
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'))

process.on('uncaughtException', function (err) {
// TODO: Sentry.captureException(err)
captureException(err)

console.error(err, 'Uncaught exception')
})

process.on('unhandledRejection', (reason, promise) => {
// TODO: Sentry.captureException(reason)
process.on('unhandledRejection', (reason: string, promise) => {
captureMessage(reason)

console.error({ promise, reason }, 'Unhandled Rejection at: Promise')
})
24 changes: 24 additions & 0 deletions apps/workers/src/utils/sentry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { env } from '@latitude-data/env'
import * as Sentry from '@sentry/node'

Sentry.init({
dsn: env.SENTRY_DSN,

tracesSampleRate: 1.0,
})

export const captureException = (error: Error) => {
if (env.NODE_ENV === 'production') {
Sentry.captureException(error)
} else {
console.error(error)
}
}

export const captureMessage = (message: string) => {
if (env.NODE_ENV === 'production') {
Sentry.captureMessage(message)
} else {
console.log(message)
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Queues, QUEUES } from '@latitude-data/jobs/constants'
import { captureException } from '$/utils/sentry'
import { Processor } from 'bullmq'

import { Queues } from '../constants'
import { QUEUES } from '../queues'

export const buildProcessor =
(queues: Queues[]): Processor =>
async (job) => {
Expand All @@ -13,10 +12,10 @@ export const buildProcessor =
if (j.name === job.name) {
try {
await j(job)
} catch (err) {
console.error(err)
} catch (error) {
captureException(error as Error)

throw err
throw error
}
}
}),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { queues } from '@latitude-data/core/queues'
import { captureException } from '$/utils/sentry'
import { Worker } from 'bullmq'

import { connection } from '../utils/connection'
import { defaultWorker } from './worker-definitions/defaultWorker'

const WORKER_OPTS = {
Expand All @@ -18,11 +19,11 @@ export default function startWorkers() {
w.queues.map((q) => {
const worker = new Worker(q, w.processor, {
...WORKER_OPTS,
connection,
connection: queues(),
})

worker.on('error', (error: Error) => {
console.error(error)
captureException(error)
})

return worker
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Queues } from '@latitude-data/jobs/constants'

import { buildProcessor } from '../_shared'
import { Queues } from '../../constants'

const defaultWorkerQueues = [
Queues.defaultQueue,
Queues.eventsQueue,
Queues.eventHandlersQueue,
Queues.eventsQueue,
]

export const defaultWorker = {
Expand Down
10 changes: 6 additions & 4 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
"type": "module",
"exports": {
"./browser": "./src/browser.ts",
"./data-access": "./src/data-access/index.ts",
"./cache": "./src/cache/index.ts",
"./client": "./src/client/index.ts",
"./data-access": "./src/data-access/index.ts",
"./events/*": "./src/events/*.ts",
"./lib/*": "./src/lib/*.ts",
"./queues": "./src/queues/index.ts",
"./redis": "./src/redis/index.ts",
"./websockets/*": "./src/websockets/*.ts",
"./repositories": "./src/repositories/index.ts",
"./repositories/*": "./src/repositories/*.ts",
"./schema": "./src/schema/index.ts",
"./schema/*": "./src/schema/*.ts",
"./services/*": "./src/services/*.ts",
"./repositories": "./src/repositories/index.ts",
"./repositories/*": "./src/repositories/*.ts",
"./websockets/*": "./src/websockets/*.ts",
"./factories": "./src/tests/factories/index.ts",
"./test": "./src/tests/useTestDatabase.ts"
},
Expand Down
20 changes: 20 additions & 0 deletions packages/core/src/queues/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { env } from '@latitude-data/env'
import Redis from 'ioredis'

import { buildRedisConnection } from '../redis'

let connection: Redis

export const queues = () => {
if (connection) return connection

connection = buildRedisConnection({
host: env.QUEUE_HOST,
port: env.QUEUE_PORT,
password: env.QUEUE_PASSWORD,
enableOfflineQueue: false,
maxRetriesPerRequest: null,
})

return connection
}
5 changes: 4 additions & 1 deletion packages/jobs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
"name": "@latitude-data/jobs",
"version": "0.0.1",
"description": "Latitude jobs from Latitude llm",
"main": "./src/index.ts",
"exports": {
".": "./src/index.ts",
"./constants": "./src/constants.ts"
},
"type": "module",
"scripts": {
"lint": "eslint src",
Expand Down
21 changes: 0 additions & 21 deletions packages/jobs/src/connection/index.ts

This file was deleted.

29 changes: 29 additions & 0 deletions packages/jobs/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
import { createProviderLogJob } from '@latitude-data/core/events/handlers/createProviderLogJob'
import { EventHandlers } from '@latitude-data/core/events/handlers/index'

import { runBatchEvaluationJob } from './job-definitions/batchEvaluations/runBatchEvaluationJob'
import { runDocumentJob } from './job-definitions/batchEvaluations/runDocumentJob'
import { runEvaluationJob } from './job-definitions/batchEvaluations/runEvaluationJob'
import { createEventJob } from './job-definitions/events/createEventJob'
import { publishEventJob } from './job-definitions/events/publishEventJob'

// TODO: Review if we can remove this declarations
export enum Queues {
defaultQueue = 'defaultQueue',
Expand All @@ -12,3 +21,23 @@ export enum Jobs {
createEventJob = 'createEventJob',
publishEventJob = 'publishEventJob',
}

export const QUEUES = {
[Queues.defaultQueue]: {
name: Queues.defaultQueue,
jobs: [
runBatchEvaluationJob,
runDocumentJob,
runEvaluationJob,
createProviderLogJob,
],
},
[Queues.eventsQueue]: {
name: Queues.eventsQueue,
jobs: [publishEventJob, createEventJob],
},
[Queues.eventHandlersQueue]: {
name: Queues.eventHandlersQueue,
jobs: Object.values(EventHandlers).flat(),
},
} as const
5 changes: 0 additions & 5 deletions packages/jobs/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { setupQueues } from './queues'
import startWorkers from './workers'

export { Worker } from 'bullmq'

Expand All @@ -10,7 +9,3 @@ export function setupJobs() {

return queues
}

export function setupWorkers() {
return startWorkers()
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import {
} from '@latitude-data/core/browser'
import { findWorkspaceFromDocument } from '@latitude-data/core/data-access'
import { NotFoundError } from '@latitude-data/core/lib/errors'
import { queues } from '@latitude-data/core/queues'
import { CommitsRepository } from '@latitude-data/core/repositories'
import { previewDataset } from '@latitude-data/core/services/datasets/preview'
import { WebsocketClient } from '@latitude-data/core/websockets/workers'
import { Job } from 'bullmq'

import { setupJobs } from '../..'
import { connection } from '../../utils/connection'
import { ProgressTracker } from '../../utils/progressTracker'

type RunBatchEvaluationJobParams = {
Expand Down Expand Up @@ -66,15 +66,15 @@ export const runBatchEvaluationJob = async (
)
})

const progressTracker = new ProgressTracker(connection, batchId)
const progressTracker = new ProgressTracker(queues(), batchId)
const firstAttempt = job.attemptsMade === 0

if (firstAttempt) {
await progressTracker.initializeProgress(parameters.length)
}

const progress = await progressTracker.getProgress()
const queues = setupJobs()
const jobs = setupJobs()

if (firstAttempt && parameters.length > 0) {
websockets.emit('evaluationStatus', {
Expand All @@ -95,7 +95,7 @@ export const runBatchEvaluationJob = async (
// enqueued job. This allows us to resume the batch if the job fails.
for (let i = progress.enqueued; i < parameters.length; i++) {
const isFirstEnqueued = progress.enqueued === 0
await queues.defaultQueue.jobs.enqueueRunDocumentJob({
await jobs.defaultQueue.jobs.enqueueRunDocumentJob({
workspaceId: workspace.id,
documentUuid: document.documentUuid,
commitUuid: commit.uuid,
Expand Down
Loading

0 comments on commit c1eb4c6

Please sign in to comment.