Skip to content

Commit

Permalink
fix: adding missing tracing MDC for views (#2182)
Browse files Browse the repository at this point in the history
  • Loading branch information
franciscolopezsancho authored Aug 19, 2024
1 parent cd3e433 commit 8f9a86f
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,11 @@ public interface TraceContext {
* Context section 3</a>
*/
Optional<String> traceState();

/**
* Allows retrieving the trace id of the trace parent if any.
*
* @return the traceId of the traceParent if any
*/
Optional<String> traceId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kalix.javasdk.impl

import com.google.protobuf.ByteString
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import kalix.javasdk.CloudEvent
import kalix.javasdk.JwtClaims
Expand Down Expand Up @@ -226,6 +227,14 @@ private[kalix] class MetadataImpl private (val entries: Seq[MetadataEntry]) exte
.getInstance()
.extract(OtelContext.current(), asMetadata(), otelGetter)

override def traceId(): Optional[String] = {
Span.fromContext(asOpenTelemetryContext()).getSpanContext.getTraceId match {
case "00000000000000000000000000000000" =>
Optional.empty() // when no traceId returns io.opentelemetry.api.trace.TraceId.INVALID
case traceId => Some(traceId).asJava
}
}

override def traceParent(): Optional[String] = getScala(TraceInstrumentation.TRACE_PARENT_KEY).asJava

override def traceState(): Optional[String] = getScala(TraceInstrumentation.TRACE_STATE_KEY).asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@

package kalix.javasdk.impl.telemetry

import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId }
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.api.trace.{ Span, SpanKind, Tracer }
import io.opentelemetry.context.propagation.{ ContextPropagators, TextMapGetter, TextMapSetter }
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.context.propagation.TextMapGetter
import io.opentelemetry.context.propagation.TextMapSetter
import io.opentelemetry.context.{ Context => OtelContext }
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.sdk.OpenTelemetrySdk
Expand All @@ -18,12 +25,15 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor
import io.opentelemetry.semconv.ServiceAttributes
import kalix.javasdk.Metadata
import kalix.javasdk.impl.{ MetadataImpl, ProxyInfoHolder, Service }
import kalix.javasdk.impl.MetadataImpl
import kalix.javasdk.impl.ProxyInfoHolder
import kalix.javasdk.impl.Service
import kalix.protocol.action.ActionCommand
import kalix.protocol.component.MetadataEntry
import kalix.protocol.component.MetadataEntry.Value.StringValue
import kalix.protocol.entity.Command
import org.slf4j.{ Logger, LoggerFactory }
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -103,7 +113,6 @@ private[kalix] object TraceInstrumentation {

val TRACE_PARENT_KEY = "traceparent"
val TRACE_STATE_KEY = "tracestate"

val TRACING_ENDPOINT = "kalix.telemetry.tracing.collector-endpoint"

private val logger: Logger = LoggerFactory.getLogger(getClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,26 @@

package kalix.javasdk.impl.view

import java.util.Optional
import scala.compat.java8.OptionConverters._
import scala.util.control.NonFatal
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import kalix.javasdk.impl.{ Service, ViewFactory }
import com.google.protobuf.Descriptors
import com.google.protobuf.any.{ Any => ScalaPbAny }
import kalix.javasdk.Metadata
import kalix.javasdk.impl.Service
import kalix.javasdk.impl.ViewFactory
import kalix.javasdk.impl._
import kalix.javasdk.view.{ UpdateContext, ViewContext, ViewCreationContext, ViewOptions }
import kalix.javasdk.impl.telemetry.Telemetry
import kalix.javasdk.view.UpdateContext
import kalix.javasdk.view.ViewContext
import kalix.javasdk.view.ViewCreationContext
import kalix.javasdk.view.ViewOptions
import kalix.protocol.{ view => pv }
import com.google.protobuf.Descriptors
import com.google.protobuf.any.{ Any => ScalaPbAny }
import org.slf4j.LoggerFactory
import org.slf4j.MDC

import java.util.Optional
import scala.jdk.OptionConverters._
import scala.util.control.NonFatal

/** INTERNAL API */
final class ViewService(
Expand All @@ -38,7 +45,7 @@ final class ViewService(
this(factory, descriptor, additionalDescriptors, messageCodec, viewId, Some(viewOptions))

override def resolvedMethods: Option[Map[String, ResolvedServiceMethod[_, _]]] =
factory.asScala.collect { case resolved: ResolvedEntityFactory =>
factory.toScala.collect { case resolved: ResolvedEntityFactory =>
resolved.resolvedMethods
}

Expand Down Expand Up @@ -93,6 +100,13 @@ final class ViewsImpl(system: ActorSystem, _services: Map[String, ViewService])
val commandName = receiveEvent.commandName
val msg = service.messageCodec.decodeMessage(receiveEvent.payload.get)
val metadata = MetadataImpl.of(receiveEvent.metadata.map(_.entries.toVector).getOrElse(Nil))
val addedToMDC = metadata.traceContext.traceId().toScala match {
case Some(traceId) =>
MDC.put(Telemetry.TRACE_ID, traceId)
true
case None => false
}

val context = new UpdateContextImpl(service.viewId, commandName, metadata)

val effect =
Expand All @@ -102,6 +116,8 @@ final class ViewsImpl(system: ActorSystem, _services: Map[String, ViewService])
case e: ViewException => throw e
case NonFatal(error) =>
throw ViewException(context, s"View unexpected failure: ${error.getMessage}", Some(error))
} finally {
if (addedToMDC) MDC.remove(Telemetry.TRACE_ID)
}

effect match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,22 @@ class MetadataImplSpec extends AnyWordSpec with Matchers with OptionValues {
ce.specversion() shouldBe "1.0"
ce.`type`() shouldBe "foo"
}

"be able to find the traceId in a traceParent" in {
val metadata = MetadataImpl.of(
Seq(
MetadataEntry(
"traceparent",
MetadataEntry.Value.StringValue("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"))))
metadata.traceContext.traceId() shouldBe Optional.of("4bf92f3577b34da6a3ce929d0e0e4736")
}

"return '00000000000000000000000000000000' if no traceId is found" in {
Metadata.EMPTY
.traceContext()
.traceId() shouldBe Optional.empty()

}
}

private def metadata(entries: (String, String)*): Metadata = {
Expand Down

0 comments on commit 8f9a86f

Please sign in to comment.