diff --git a/package.json b/package.json index 6c10459..d24b3de 100644 --- a/package.json +++ b/package.json @@ -89,6 +89,7 @@ "@opentelemetry/sdk-metrics": "^1.18.1", "@opentelemetry/sdk-node": "^0.50.0", "@opentelemetry/sdk-trace-node": "^1.18.1", + "@opentelemetry/semantic-conventions": "^1.27.0", "@sentry/node": "^7.74.1", "@sentry/profiling-node": "^1.2.1", "axios": "^0.27.2", diff --git a/src/controllers/async/asyncquery.ts b/src/controllers/async/asyncquery.ts index f8703cd..4bfdc26 100644 --- a/src/controllers/async/asyncquery.ts +++ b/src/controllers/async/asyncquery.ts @@ -1,4 +1,5 @@ import axios, { AxiosError, AxiosResponse } from "axios"; +import { context, propagation } from "@opentelemetry/api"; import { customAlphabet } from "nanoid"; import * as utils from "../../utils/common"; import { redisClient } from "@biothings-explorer/utils"; @@ -42,8 +43,14 @@ export async function asyncquery( } const url = `${req.protocol}://${req.header("host")}/v1/asyncquery_status/${jobId}`; + // add OTel trace context + const otelData: Partial<{ traceparent: string; tracestate: string }> = {}; + propagation.inject(context.active(), otelData); + const { traceparent, tracestate } = otelData; + queueData = { ...queueData, traceparent, tracestate }; + const job = await queryQueue.add( - { ...queueData, url: url.replace("status", "response") }, + { ...queueData, traceparent: traceparent, tracestate: tracestate, url: url.replace("status", "response") }, { jobId: jobId, timeout: parseInt(process.env.JOB_TIMEOUT ?? (1000 * 60 * 5).toString()), diff --git a/src/controllers/opentelemetry.ts b/src/controllers/opentelemetry.ts index 2acf1e9..2563881 100644 --- a/src/controllers/opentelemetry.ts +++ b/src/controllers/opentelemetry.ts @@ -2,18 +2,20 @@ import { NodeSDK } from "@opentelemetry/sdk-node"; import { getNodeAutoInstrumentations } from "@opentelemetry/auto-instrumentations-node"; import { Resource } from "@opentelemetry/resources"; import Debug from "debug"; +import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto'; const debug = Debug("bte:biothings-explorer:otel-init"); -import { JaegerExporter } from "@opentelemetry/exporter-jaeger"; +import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions'; + debug("Initializing Opentelemetry instrumentation..."); const sdk = new NodeSDK({ - traceExporter: new JaegerExporter({ - host: process.env.JAEGER_HOST ?? "jaeger-otel-agent.sri", - port: parseInt(process.env.JAEGER_PORT ?? "6832"), + // metrics, if needed, shall be exported on a different endpoint + traceExporter: new OTLPTraceExporter({ + url: `${process.env.JAEGER_HOST ?? 'jaeger-otel-collector'}:${process.env.JAEGER_PORT ?? 4318}/v1/traces` }), instrumentations: [getNodeAutoInstrumentations()], resource: new Resource({ - "service.name": "biothings-explorer", + [ATTR_SERVICE_NAME]: "biothings-explorer", }), }); sdk.start(); diff --git a/src/controllers/threading/taskHandler.ts b/src/controllers/threading/taskHandler.ts index 9fcc41b..e082243 100644 --- a/src/controllers/threading/taskHandler.ts +++ b/src/controllers/threading/taskHandler.ts @@ -14,7 +14,7 @@ import { tasks } from "../../routes/index"; import { getQueryQueue } from "../async/asyncquery_queue"; import * as Sentry from "@sentry/node"; import { ProfilingIntegration } from "@sentry/profiling-node"; -import OpenTelemetry, { Span } from "@opentelemetry/api"; +import { Span, trace, context, propagation, Context } from "@opentelemetry/api"; import { Telemetry } from "@biothings-explorer/utils"; import { InnerTaskData } from "@biothings-explorer/types"; @@ -90,13 +90,14 @@ async function runTask({ scope.setSpan(transaction); }); - span = OpenTelemetry.trace - .getTracer("biothings-explorer-thread") - .startSpan( - routeNames[route], - undefined, - OpenTelemetry.propagation.extract(OpenTelemetry.context.active(), { traceparent, tracestate }), - ); + let activeContext: Context = propagation.extract(context.active(), { traceparent, tracestate }); + let tracer = trace.getTracer("biothings-explorer-thread") + span = tracer.startSpan( + routeNames[route], + {kind: 1}, // specifies internal span + activeContext, + ); + span.setAttribute("bte.requestData", JSON.stringify(req.data.queryGraph)); Telemetry.setOtelSpan(span); } catch (error) { diff --git a/src/controllers/threading/threadHandler.ts b/src/controllers/threading/threadHandler.ts index 8bf711e..eb52333 100644 --- a/src/controllers/threading/threadHandler.ts +++ b/src/controllers/threading/threadHandler.ts @@ -104,11 +104,8 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri const abortController = new AbortController(); const { port1: toWorker, port2: fromWorker } = new MessageChannel(); - // get otel context - - const otelData: Partial<{ traceparent: string; tracestate: string }> = {}; - propagation.inject(context.active(), otelData); - const { traceparent, tracestate } = otelData; + const traceparent: string = taskInfo.data.traceparent; + const tracestate: string = taskInfo.data.tracestate; const taskData: InnerTaskData = { req: taskInfo, route, traceparent, tracestate, port: toWorker }; taskData.req.data.options = {...taskData.req.data.options, metakg: global.metakg?.ops, smartapi: global.smartapi} as QueryHandlerOptions; @@ -219,6 +216,11 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri export async function runTask(req: Request, res: Response, route: string, useBullSync = true): Promise { const queryQueue: Queue = global.queryQueue.bte_sync_query_queue; + + const otelData: Partial<{ traceparent: string; tracestate: string }> = {}; + propagation.inject(context.active(), otelData); + const { traceparent, tracestate } = otelData; + const taskInfo: TaskInfo = { data: { route, @@ -233,6 +235,8 @@ export async function runTask(req: Request, res: Response, route: string, useBul }, params: req.params, endpoint: req.originalUrl, + traceparent: traceparent, + tracestate: tracestate, }, };