From 8f9a86fa0f69122da55fe85bc9ff7d2c9543417e Mon Sep 17 00:00:00 2001 From: Francisco Lopez-Sancho Date: Mon, 19 Aug 2024 15:24:41 +0200 Subject: [PATCH] fix: adding missing tracing MDC for views (#2182) --- .../main/java/kalix/javasdk/TraceContext.java | 7 ++++ .../kalix/javasdk/impl/MetadataImpl.scala | 9 ++++++ .../javasdk/impl/telemetry/Telemetry.scala | 21 ++++++++---- .../kalix/javasdk/impl/view/ViewsImpl.scala | 32 ++++++++++++++----- .../kalix/javasdk/impl/MetadataImplSpec.scala | 16 ++++++++++ 5 files changed, 71 insertions(+), 14 deletions(-) diff --git a/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/TraceContext.java b/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/TraceContext.java index 08135ee7fd..e4f796df5c 100644 --- a/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/TraceContext.java +++ b/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/TraceContext.java @@ -38,4 +38,11 @@ public interface TraceContext { * Context section 3 */ Optional traceState(); + + /** + * Allows retrieving the trace id of the trace parent if any. + * + * @return the traceId of the traceParent if any + */ + Optional traceId(); } diff --git a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/MetadataImpl.scala b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/MetadataImpl.scala index 185631f493..4ceaaa12e1 100644 --- a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/MetadataImpl.scala +++ b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/MetadataImpl.scala @@ -5,6 +5,7 @@ package kalix.javasdk.impl import com.google.protobuf.ByteString +import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator import kalix.javasdk.CloudEvent import kalix.javasdk.JwtClaims @@ -226,6 +227,14 @@ private[kalix] class MetadataImpl private (val entries: Seq[MetadataEntry]) exte .getInstance() .extract(OtelContext.current(), asMetadata(), otelGetter) + override def traceId(): Optional[String] = { + Span.fromContext(asOpenTelemetryContext()).getSpanContext.getTraceId match { + case "00000000000000000000000000000000" => + Optional.empty() // when no traceId returns io.opentelemetry.api.trace.TraceId.INVALID + case traceId => Some(traceId).asJava + } + } + override def traceParent(): Optional[String] = getScala(TraceInstrumentation.TRACE_PARENT_KEY).asJava override def traceState(): Optional[String] = getScala(TraceInstrumentation.TRACE_STATE_KEY).asJava diff --git a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/telemetry/Telemetry.scala b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/telemetry/Telemetry.scala index 99669cfd20..7801dd915e 100644 --- a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/telemetry/Telemetry.scala +++ b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/telemetry/Telemetry.scala @@ -4,12 +4,19 @@ package kalix.javasdk.impl.telemetry -import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId } +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId import io.opentelemetry.api.OpenTelemetry import io.opentelemetry.api.common.Attributes +import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.SpanKind +import io.opentelemetry.api.trace.Tracer import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator -import io.opentelemetry.api.trace.{ Span, SpanKind, Tracer } -import io.opentelemetry.context.propagation.{ ContextPropagators, TextMapGetter, TextMapSetter } +import io.opentelemetry.context.propagation.ContextPropagators +import io.opentelemetry.context.propagation.TextMapGetter +import io.opentelemetry.context.propagation.TextMapSetter import io.opentelemetry.context.{ Context => OtelContext } import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter import io.opentelemetry.sdk.OpenTelemetrySdk @@ -18,12 +25,15 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor import io.opentelemetry.semconv.ServiceAttributes import kalix.javasdk.Metadata -import kalix.javasdk.impl.{ MetadataImpl, ProxyInfoHolder, Service } +import kalix.javasdk.impl.MetadataImpl +import kalix.javasdk.impl.ProxyInfoHolder +import kalix.javasdk.impl.Service import kalix.protocol.action.ActionCommand import kalix.protocol.component.MetadataEntry import kalix.protocol.component.MetadataEntry.Value.StringValue import kalix.protocol.entity.Command -import org.slf4j.{ Logger, LoggerFactory } +import org.slf4j.Logger +import org.slf4j.LoggerFactory import scala.collection.mutable import scala.concurrent.ExecutionContext @@ -103,7 +113,6 @@ private[kalix] object TraceInstrumentation { val TRACE_PARENT_KEY = "traceparent" val TRACE_STATE_KEY = "tracestate" - val TRACING_ENDPOINT = "kalix.telemetry.tracing.collector-endpoint" private val logger: Logger = LoggerFactory.getLogger(getClass) diff --git a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/view/ViewsImpl.scala b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/view/ViewsImpl.scala index 0566ad4b01..8bd1187145 100644 --- a/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/view/ViewsImpl.scala +++ b/sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/view/ViewsImpl.scala @@ -4,19 +4,26 @@ package kalix.javasdk.impl.view -import java.util.Optional -import scala.compat.java8.OptionConverters._ -import scala.util.control.NonFatal import akka.actor.ActorSystem import akka.stream.scaladsl.Source -import kalix.javasdk.impl.{ Service, ViewFactory } +import com.google.protobuf.Descriptors +import com.google.protobuf.any.{ Any => ScalaPbAny } import kalix.javasdk.Metadata +import kalix.javasdk.impl.Service +import kalix.javasdk.impl.ViewFactory import kalix.javasdk.impl._ -import kalix.javasdk.view.{ UpdateContext, ViewContext, ViewCreationContext, ViewOptions } +import kalix.javasdk.impl.telemetry.Telemetry +import kalix.javasdk.view.UpdateContext +import kalix.javasdk.view.ViewContext +import kalix.javasdk.view.ViewCreationContext +import kalix.javasdk.view.ViewOptions import kalix.protocol.{ view => pv } -import com.google.protobuf.Descriptors -import com.google.protobuf.any.{ Any => ScalaPbAny } import org.slf4j.LoggerFactory +import org.slf4j.MDC + +import java.util.Optional +import scala.jdk.OptionConverters._ +import scala.util.control.NonFatal /** INTERNAL API */ final class ViewService( @@ -38,7 +45,7 @@ final class ViewService( this(factory, descriptor, additionalDescriptors, messageCodec, viewId, Some(viewOptions)) override def resolvedMethods: Option[Map[String, ResolvedServiceMethod[_, _]]] = - factory.asScala.collect { case resolved: ResolvedEntityFactory => + factory.toScala.collect { case resolved: ResolvedEntityFactory => resolved.resolvedMethods } @@ -93,6 +100,13 @@ final class ViewsImpl(system: ActorSystem, _services: Map[String, ViewService]) val commandName = receiveEvent.commandName val msg = service.messageCodec.decodeMessage(receiveEvent.payload.get) val metadata = MetadataImpl.of(receiveEvent.metadata.map(_.entries.toVector).getOrElse(Nil)) + val addedToMDC = metadata.traceContext.traceId().toScala match { + case Some(traceId) => + MDC.put(Telemetry.TRACE_ID, traceId) + true + case None => false + } + val context = new UpdateContextImpl(service.viewId, commandName, metadata) val effect = @@ -102,6 +116,8 @@ final class ViewsImpl(system: ActorSystem, _services: Map[String, ViewService]) case e: ViewException => throw e case NonFatal(error) => throw ViewException(context, s"View unexpected failure: ${error.getMessage}", Some(error)) + } finally { + if (addedToMDC) MDC.remove(Telemetry.TRACE_ID) } effect match { diff --git a/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/MetadataImplSpec.scala b/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/MetadataImplSpec.scala index 194552c3a3..ae101da77a 100644 --- a/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/MetadataImplSpec.scala +++ b/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/impl/MetadataImplSpec.scala @@ -154,6 +154,22 @@ class MetadataImplSpec extends AnyWordSpec with Matchers with OptionValues { ce.specversion() shouldBe "1.0" ce.`type`() shouldBe "foo" } + + "be able to find the traceId in a traceParent" in { + val metadata = MetadataImpl.of( + Seq( + MetadataEntry( + "traceparent", + MetadataEntry.Value.StringValue("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")))) + metadata.traceContext.traceId() shouldBe Optional.of("4bf92f3577b34da6a3ce929d0e0e4736") + } + + "return '00000000000000000000000000000000' if no traceId is found" in { + Metadata.EMPTY + .traceContext() + .traceId() shouldBe Optional.empty() + + } } private def metadata(entries: (String, String)*): Metadata = {