From 0a29568cf4a87c37b31c1ff4933fba67d7fcbba0 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Fri, 29 Dec 2023 20:21:53 +0200 Subject: [PATCH] sdk-exporter: add `OtlpHttpSpanExporter` --- .github/workflows/ci.yml | 6 +- build.sbt | 112 +++++- flake.nix | 1 + project/DockerComposeEnvPlugin.scala | 60 +++ project/plugins.sbt | 6 + .../otel4s/sdk/exporter/RetryPolicy.scala | 188 ++++++++++ .../sdk/exporter/RetryPolicySuite.scala | 67 ++++ .../docker/config/otel-collector-config.yaml | 22 ++ sdk-exporter/trace/docker/docker-compose.yml | 17 + .../exporter/otlp/ExporterSuitePlatform.scala | 29 ++ .../exporter/otlp/ExporterSuitePlatform.scala | 21 ++ .../exporter/otlp/ExporterSuitePlatform.scala | 25 ++ .../otel4s/sdk/exporter/otlp/JsonCodecs.scala | 211 +++++++++++ .../exporter/otlp/OtlpHttpSpanExporter.scala | 324 +++++++++++++++++ .../sdk/exporter/otlp/ProtoCodecs.scala | 191 ++++++++++ .../sdk/exporter/otlp/JsonCodecsSuite.scala | 341 ++++++++++++++++++ .../otlp/OtlpHttpSpanExporterSuite.scala | 289 +++++++++++++++ .../org/typelevel/otel4s/sdk/trace/Gens.scala | 2 +- 18 files changed, 1907 insertions(+), 5 deletions(-) create mode 100644 project/DockerComposeEnvPlugin.scala create mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicy.scala create mode 100644 sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicySuite.scala create mode 100644 sdk-exporter/trace/docker/config/otel-collector-config.yaml create mode 100644 sdk-exporter/trace/docker/docker-compose.yml create mode 100644 sdk-exporter/trace/js/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ExporterSuitePlatform.scala create mode 100644 sdk-exporter/trace/jvm/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ExporterSuitePlatform.scala create mode 100644 sdk-exporter/trace/native/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ExporterSuitePlatform.scala create mode 100644 sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/JsonCodecs.scala create mode 100644 sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpSpanExporter.scala create mode 100644 sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/ProtoCodecs.scala create mode 100644 sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/JsonCodecsSuite.scala create mode 100644 sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpSpanExporterSuite.scala diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 35257f8d8..44f0e1b91 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,6 +52,10 @@ jobs: if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false' run: sbt +update + - name: Install brew formulae (ubuntu) + if: startsWith(matrix.os, 'ubuntu') + run: /home/linuxbrew/.linuxbrew/bin/brew install s2n utf8proc + - name: Check that workflows are up to date run: sbt githubWorkflowCheck @@ -239,7 +243,7 @@ jobs: - name: Submit Dependencies uses: scalacenter/sbt-dependency-submission@v2 with: - modules-ignore: otel4s-sdk-common_native0.4_2.13 otel4s-sdk-common_native0.4_3 otel4s-benchmarks_2.13 otel4s-benchmarks_3 otel4s-examples_2.13 otel4s-examples_3 otel4s-sdk-common_sjs1_2.13 otel4s-sdk-common_sjs1_3 otel4s-sdk-trace_sjs1_2.13 otel4s-sdk-trace_sjs1_3 otel4s_2.13 otel4s_3 docs_2.13 docs_3 otel4s-sdk_native0.4_2.13 otel4s-sdk_native0.4_3 otel4s-sdk-common_2.13 otel4s-sdk-common_3 otel4s_2.13 otel4s_3 otel4s_2.13 otel4s_3 otel4s-sdk-trace_native0.4_2.13 otel4s-sdk-trace_native0.4_3 otel4s-sdk_sjs1_2.13 otel4s-sdk_sjs1_3 otel4s-sdk_2.13 otel4s-sdk_3 otel4s-sdk-trace_2.13 otel4s-sdk-trace_3 + modules-ignore: otel4s-sdk-exporter-common_sjs1_2.13 otel4s-sdk-exporter-common_sjs1_3 otel4s-sdk-common_native0.4_2.13 otel4s-sdk-common_native0.4_3 otel4s-benchmarks_2.13 otel4s-benchmarks_3 otel4s-examples_2.13 otel4s-examples_3 otel4s-sdk-common_sjs1_2.13 otel4s-sdk-common_sjs1_3 otel4s-sdk-exporter_2.13 otel4s-sdk-exporter_3 otel4s-sdk-trace_sjs1_2.13 otel4s-sdk-trace_sjs1_3 otel4s-sdk-exporter-common_native0.4_2.13 otel4s-sdk-exporter-common_native0.4_3 otel4s-sdk-exporter-trace_2.13 otel4s-sdk-exporter-trace_3 otel4s_2.13 otel4s_3 docs_2.13 docs_3 otel4s-sdk-exporter-proto_2.13 otel4s-sdk-exporter-proto_3 otel4s-sdk-exporter-proto_sjs1_2.13 otel4s-sdk-exporter-proto_sjs1_3 otel4s-sdk_native0.4_2.13 otel4s-sdk_native0.4_3 otel4s-sdk-exporter-trace_native0.4_2.13 otel4s-sdk-exporter-trace_native0.4_3 otel4s-sdk-common_2.13 otel4s-sdk-common_3 otel4s_2.13 otel4s_3 otel4s_2.13 otel4s_3 otel4s-sdk-trace_native0.4_2.13 otel4s-sdk-trace_native0.4_3 otel4s-sdk-exporter-proto_native0.4_2.13 otel4s-sdk-exporter-proto_native0.4_3 otel4s-sdk-exporter-common_2.13 otel4s-sdk-exporter-common_3 otel4s-sdk-exporter-trace_sjs1_2.13 otel4s-sdk-exporter-trace_sjs1_3 otel4s-sdk-exporter_sjs1_2.13 otel4s-sdk-exporter_sjs1_3 otel4s-sdk_sjs1_2.13 otel4s-sdk_sjs1_3 otel4s-sdk_2.13 otel4s-sdk_3 otel4s-sdk-exporter_native0.4_2.13 otel4s-sdk-exporter_native0.4_3 otel4s-sdk-trace_2.13 otel4s-sdk-trace_3 configs-ignore: test scala-tool scala-doc-tool test-internal validate-steward: diff --git a/build.sbt b/build.sbt index 3ffc927f4..23840e2dd 100644 --- a/build.sbt +++ b/build.sbt @@ -1,5 +1,3 @@ -import com.typesafe.tools.mima.core._ - ThisBuild / tlBaseVersion := "0.5" ThisBuild / organization := "org.typelevel" @@ -27,6 +25,8 @@ val Scala213 = "2.13.12" ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.1") ThisBuild / scalaVersion := Scala213 // the default Scala +ThisBuild / githubWorkflowBuildPreamble ++= nativeBrewInstallWorkflowSteps.value + val CatsVersion = "2.10.0" val CatsEffectVersion = "3.5.2" val CatsMtlVersion = "1.4.0" @@ -38,11 +38,15 @@ val MUnitScalaCheckEffectVersion = "2.0.0-M2" val OpenTelemetryVersion = "1.33.0" val OpenTelemetryInstrumentationVersion = "1.32.0" val OpenTelemetrySemConvVersion = "1.23.1-alpha" +val OpenTelemetryProtoVersion = "1.0.0-alpha" val PekkoStreamVersion = "1.0.2" val PekkoHttpVersion = "1.0.0" val PlatformVersion = "1.0.2" val ScodecVersion = "1.1.38" val VaultVersion = "3.5.0" +val Http4sVersion = "0.23.25" +val CirceVersion = "0.14.6" +val EpollcatVersion = "0.1.6" lazy val scalaReflectDependency = Def.settings( libraryDependencies ++= { @@ -77,6 +81,10 @@ lazy val root = tlCrossRootProject `sdk-common`, `sdk-trace`, sdk, + `sdk-exporter-common`, + `sdk-exporter-proto`, + `sdk-exporter-trace`, + `sdk-exporter`, `testkit-common`, `testkit-metrics`, testkit, @@ -198,12 +206,107 @@ lazy val sdk = crossProject(JVMPlatform, JSPlatform, NativePlatform) .crossType(CrossType.Pure) .enablePlugins(NoPublishPlugin) .in(file("sdk/all")) - .dependsOn(`sdk-common`, `sdk-trace`) + .dependsOn(core, `sdk-common`, `sdk-trace`) .settings( name := "otel4s-sdk" ) .settings(scalafixSettings) +// +// SDK exporter +// + +lazy val `sdk-exporter-common` = + crossProject(JVMPlatform, JSPlatform, NativePlatform) + .crossType(CrossType.Pure) + .in(file("sdk-exporter/common")) + .enablePlugins(NoPublishPlugin) + .dependsOn(`sdk-common`) + .settings( + name := "otel4s-sdk-exporter-common", + startYear := Some(2023), + libraryDependencies ++= Seq( + "org.typelevel" %%% "cats-laws" % CatsVersion % Test, + "org.typelevel" %%% "discipline-munit" % MUnitDisciplineVersion % Test + ) + ) + .settings(munitDependencies) + .settings(scalafixSettings) + +lazy val `sdk-exporter-proto` = + crossProject(JVMPlatform, JSPlatform, NativePlatform) + .crossType(CrossType.Pure) + .enablePlugins(NoPublishPlugin) + .in(file("sdk-exporter/proto")) + .settings( + name := "otel4s-sdk-exporter-proto", + Compile / PB.protoSources += baseDirectory.value.getParentFile / "src" / "main" / "protobuf", + Compile / PB.targets ++= Seq( + scalapb.gen(grpc = false) -> (Compile / sourceManaged).value / "scalapb" + ), + scalacOptions := { + val opts = scalacOptions.value + if (tlIsScala3.value) opts.filterNot(_ == "-Wvalue-discard") else opts + }, + // We use open-telemetry protobuf spec to generate models + // See https://scalapb.github.io/docs/third-party-protos/#there-is-a-library-on-maven-with-the-protos-and-possibly-generated-java-code + libraryDependencies ++= Seq( + "io.opentelemetry.proto" % "opentelemetry-proto" % OpenTelemetryProtoVersion % "protobuf-src" intransitive () + ) + ) + +lazy val `sdk-exporter-trace` = + crossProject(JVMPlatform, JSPlatform, NativePlatform) + .crossType(CrossType.Full) + .in(file("sdk-exporter/trace")) + .enablePlugins(NoPublishPlugin, DockerComposeEnvPlugin) + .dependsOn( + `sdk-exporter-common`, + `sdk-exporter-proto`, + `sdk-trace` % "compile->compile;test->test" + ) + .settings( + name := "otel4s-sdk-exporter-trace", + startYear := Some(2023), + libraryDependencies ++= Seq( + "org.http4s" %%% "http4s-ember-client" % Http4sVersion, + "org.http4s" %%% "http4s-circe" % Http4sVersion, + "org.scalameta" %%% "munit-scalacheck" % MUnitVersion % Test, + "io.circe" %%% "circe-generic" % CirceVersion % Test + ), + dockerComposeEnvFile := crossProjectBaseDirectory.value / "docker" / "docker-compose.yml" + ) + .jsSettings( + scalaJSLinkerConfig ~= (_.withESFeatures( + _.withESVersion(org.scalajs.linker.interface.ESVersion.ES2018) + )), + Test / scalaJSLinkerConfig ~= (_.withModuleKind( + ModuleKind.CommonJSModule + )) + ) + .nativeEnablePlugins(ScalaNativeBrewedConfigPlugin) + .nativeSettings( + libraryDependencies += "com.armanbilge" %%% "epollcat" % EpollcatVersion % Test, + Test / nativeBrewFormulas ++= Set("s2n", "utf8proc"), + Test / envVars ++= Map("S2N_DONT_MLOCK" -> "1") + ) + .settings(munitDependencies) + .settings(scalafixSettings) + +lazy val `sdk-exporter` = crossProject(JVMPlatform, JSPlatform, NativePlatform) + .crossType(CrossType.Pure) + .in(file("sdk-exporter/all")) + .enablePlugins(NoPublishPlugin) + .dependsOn(`sdk-exporter-common`, `sdk-exporter-trace`) + .settings( + name := "otel4s-sdk-exporter" + ) + .settings(scalafixSettings) + +// +// Testkit +// + lazy val `testkit-common` = crossProject(JVMPlatform) .crossType(CrossType.Full) .in(file("testkit/common")) @@ -399,6 +502,9 @@ lazy val unidocs = project `sdk-common`.jvm, `sdk-trace`.jvm, sdk.jvm, + `sdk-exporter-common`.jvm, + `sdk-exporter-trace`.jvm, + `sdk-exporter`.jvm, `testkit-common`.jvm, `testkit-metrics`.jvm, testkit.jvm, diff --git a/flake.nix b/flake.nix index 9dbb4d808..fd691edf5 100644 --- a/flake.nix +++ b/flake.nix @@ -21,6 +21,7 @@ jdk.package = pkgs.jdk8; nodejs.enable = true; native.enable = true; + native.libraries = [ pkgs.zlib pkgs.s2n ]; }; }; } diff --git a/project/DockerComposeEnvPlugin.scala b/project/DockerComposeEnvPlugin.scala new file mode 100644 index 000000000..c164a3a30 --- /dev/null +++ b/project/DockerComposeEnvPlugin.scala @@ -0,0 +1,60 @@ +import sbt._ +import sbt.Keys._ + +import scala.sys.process._ + +object DockerComposeEnvPlugin extends AutoPlugin { + + override def trigger = noTrigger + + object autoImport { + lazy val dockerComposeEnvStart = + taskKey[Unit]("Start local docker compose environment") + lazy val dockerComposeEnvStop = + taskKey[Unit]("Stop local docker compose environment") + lazy val dockerComposeEnvTestOpts = + taskKey[Seq[TestOption]]("Setup and cleanup options") + lazy val dockerComposeEnvFile = + settingKey[File]("The docker compose file to use") + } + + import autoImport._ + + override def projectSettings: Seq[Setting[_]] = + Seq( + dockerComposeEnvStart := { + val projectName = name.value + val file = dockerComposeEnvFile.value + val log = streams.value.log + + start(projectName, file, log) + }, + dockerComposeEnvStop := { + val projectName = name.value + val file = dockerComposeEnvFile.value + val log = streams.value.log + + stop(projectName, file, log) + }, + Test / testOptions := { + val projectName = name.value + val file = dockerComposeEnvFile.value + val log = streams.value.log + + val setup = Tests.Setup(() => start(projectName, file, log)) + + Seq(setup) + } + ) + + private def start(projectName: String, file: File, log: Logger): Unit = { + log.info(s"Project [$projectName]. Creating integration environment") + s"docker compose -f $file -p $projectName up -d".! + } + + private def stop(projectName: String, file: File, log: Logger): Unit = { + log.info(s"Project [$projectName]. Terminating integration environment") + s"docker compose -f $file -p $projectName down".! + } + +} diff --git a/project/plugins.sbt b/project/plugins.sbt index e3a04c5cf..bb9137cd3 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -7,3 +7,9 @@ addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.3.2") addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0") addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.7") addSbtPlugin("com.github.sbt" % "sbt-javaagent" % "0.1.8") +addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6") +addSbtPlugin( + "com.armanbilge" % "sbt-scala-native-config-brew-github-actions" % "0.2.0-RC1" +) + +libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.14" diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicy.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicy.scala new file mode 100644 index 000000000..165f87989 --- /dev/null +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicy.scala @@ -0,0 +1,188 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter + +import cats.Show +import cats.kernel.Hash +import cats.syntax.show._ + +import scala.concurrent.duration._ + +/** The exponential retry policy. Used by the exporters. + */ +sealed trait RetryPolicy { + + /** The max number of attempts, including the original request. + */ + def maxAttempts: Int + + /** The initial backoff duration. + */ + def initialBackoff: FiniteDuration + + /** The max backoff duration. + */ + def maxBackoff: FiniteDuration + + /** The backoff multiplier. + */ + def backoffMultiplier: Double + + override final def hashCode(): Int = + Hash[RetryPolicy].hash(this) + + override final def equals(obj: Any): Boolean = + obj match { + case other: RetryPolicy => Hash[RetryPolicy].eqv(this, other) + case _ => false + } + + override final def toString: String = + Show[RetryPolicy].show(this) +} + +object RetryPolicy { + + private object Defaults { + val MaxAttempts: Int = 5 + val InitialBackoff: FiniteDuration = 1.second + val MaxBackoff: FiniteDuration = 5.seconds + val BackoffMultiplier: Double = 1.5 + } + + private val Default = Impl( + Defaults.MaxAttempts, + Defaults.InitialBackoff, + Defaults.MaxBackoff, + Defaults.BackoffMultiplier + ) + + /** A builder of [[RetryPolicy]]. + */ + sealed trait Builder { + + /** Sets the number of maximum attempts. + * + * The default value is `5`. + * + * @note + * must be greater than `1` and less than `6`. + * + * @param max + * the number of maximum attempts to use + */ + def withMaxAttempts(max: Int): Builder + + /** Sets the initial backoff duration. + * + * The default value is `1 second`. + * + * @note + * must be greater than `0`. + * + * @param duration + * the initial backoff duration to use + */ + def withInitialBackoff(duration: FiniteDuration): Builder + + /** Sets the max backoff duration. + * + * The default value is `5 seconds`. + * + * @note + * must be greater than `0`. + * + * @param duration + * the max backoff duration to use + */ + def withMaxBackoff(duration: FiniteDuration): Builder + + /** Sets the backoff multiplier. + * + * The default value is `1.5`. + * + * @note + * must be greater than `0.0`. + * + * @param multiplier + * the backoff multiplier to use + */ + def withBackoffMultiplier(multiplier: Double): Builder + + /** Creates a [[RetryPolicy]] using the configuration of this builder. + */ + def build: RetryPolicy + } + + /** A [[RetryPolicy]] with the default configuration. + */ + def default: RetryPolicy = Default + + /** A [[Builder]] of [[RetryPolicy]] with the default configuration. + */ + def builder: Builder = Default + + implicit val retryPolicyShow: Show[RetryPolicy] = + Show.show { policy => + "RetryPolicy{" + + show"maxAttempts=${policy.maxAttempts}, " + + show"initialBackoff=${policy.initialBackoff}, " + + show"maxBackoff=${policy.maxBackoff}, " + + show"backoffMultiplier=${policy.backoffMultiplier}}" + } + + implicit val retryPolicyHash: Hash[RetryPolicy] = + Hash.by { p => + (p.maxAttempts, p.initialBackoff, p.maxBackoff, p.backoffMultiplier) + } + + private final case class Impl( + maxAttempts: Int, + initialBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + backoffMultiplier: Double + ) extends RetryPolicy + with Builder { + + def withMaxAttempts(max: Int): Builder = { + require( + max > 1 && max < 6, + "maxAttempts must be greater than 1 and less than 6" + ) + + copy(maxAttempts = max) + } + + def withInitialBackoff(duration: FiniteDuration): Builder = { + require(duration > Duration.Zero, "initialBackoff must be greater than 0") + copy(initialBackoff = duration) + } + + def withMaxBackoff(duration: FiniteDuration): Builder = { + require(duration > Duration.Zero, "maxBackoff must be greater than 0") + copy(maxBackoff = duration) + } + + def withBackoffMultiplier(multiplier: Double): Builder = { + require(multiplier > 0, "backoffMultiplier must be greater than 0") + copy(backoffMultiplier = multiplier) + } + + def build: RetryPolicy = this + } + +} diff --git a/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicySuite.scala b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicySuite.scala new file mode 100644 index 000000000..6e512f724 --- /dev/null +++ b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicySuite.scala @@ -0,0 +1,67 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter + +import cats.Show +import cats.kernel.laws.discipline.HashTests +import cats.syntax.show._ +import munit._ +import org.scalacheck.Arbitrary +import org.scalacheck.Cogen +import org.scalacheck.Gen +import org.scalacheck.Prop + +import scala.concurrent.duration._ + +class RetryPolicySuite extends DisciplineSuite { + + private val retryPolicyGen: Gen[RetryPolicy] = + for { + maxAttempts <- Gen.chooseNum(2, 5) + initialBackoff <- Gen.chooseNum(1, 100) + maxBackoff <- Gen.chooseNum(1, 100) + multiplier <- Gen.chooseNum(0.1, 100.1) + } yield RetryPolicy.builder + .withMaxAttempts(maxAttempts) + .withInitialBackoff(initialBackoff.seconds) + .withMaxBackoff(maxBackoff.seconds) + .withBackoffMultiplier(multiplier) + .build + + private implicit val retryPolicyArbitrary: Arbitrary[RetryPolicy] = + Arbitrary(retryPolicyGen) + + private implicit val retryPolicyCogen: Cogen[RetryPolicy] = + Cogen[(Int, FiniteDuration, FiniteDuration, Double)].contramap { p => + (p.maxAttempts, p.initialBackoff, p.maxBackoff, p.backoffMultiplier) + } + + checkAll("RetryPolicy.HashLaws", HashTests[RetryPolicy].hash) + + property("Show[RetryPolicy]") { + Prop.forAll(retryPolicyGen) { policy => + val expected = "RetryPolicy{" + + show"maxAttempts=${policy.maxAttempts}, " + + show"initialBackoff=${policy.initialBackoff}, " + + show"maxBackoff=${policy.maxBackoff}, " + + show"backoffMultiplier=${policy.backoffMultiplier}}" + + assertEquals(Show[RetryPolicy].show(policy), expected) + } + } + +} diff --git a/sdk-exporter/trace/docker/config/otel-collector-config.yaml b/sdk-exporter/trace/docker/config/otel-collector-config.yaml new file mode 100644 index 000000000..31c481478 --- /dev/null +++ b/sdk-exporter/trace/docker/config/otel-collector-config.yaml @@ -0,0 +1,22 @@ +receivers: + otlp: + protocols: # enable OpenTelemetry Protocol receiver, both gRPC and HTTP + grpc: + http: + +exporters: + jaeger: # export received traces to Jaeger + endpoint: jaeger:14250 + tls: + insecure: true + +processors: + batch: + timeout: 0 # send data immediately + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [jaeger] \ No newline at end of file diff --git a/sdk-exporter/trace/docker/docker-compose.yml b/sdk-exporter/trace/docker/docker-compose.yml new file mode 100644 index 000000000..26f1265f5 --- /dev/null +++ b/sdk-exporter/trace/docker/docker-compose.yml @@ -0,0 +1,17 @@ +version: '3.7' +services: + otel-collector: # receives application metrics and traces via gRPC or HTTP protocol + image: otel/opentelemetry-collector-contrib:0.84.0 + command: [--config=/etc/otel-collector-config.yaml] + volumes: + - "./config/otel-collector-config.yaml:/etc/otel-collector-config.yaml" + ports: + - "4317:4317" # OTLP gRPC receiver + - "4318:4318" # OTLP http receiver + + jaeger: # stores traces received from the OpenTelemetry Collector + image: jaegertracing/all-in-one:1.52 + ports: + - "14250:14250" + - "16685:16685" # GRPC + - "16686:16686" # UI diff --git a/sdk-exporter/trace/js/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ExporterSuitePlatform.scala b/sdk-exporter/trace/js/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ExporterSuitePlatform.scala new file mode 100644 index 000000000..9b99f6d30 --- /dev/null +++ b/sdk-exporter/trace/js/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ExporterSuitePlatform.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp + +import cats.effect.IO +import fs2.compression.Compression +import munit.CatsEffectSuite + +trait ExporterSuitePlatform { self: CatsEffectSuite => + + // Scala.js does not compile with the explicit value + implicit val compression: Compression[IO] = + fs2.io.compression.fs2ioCompressionForIO + +} diff --git a/sdk-exporter/trace/jvm/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ExporterSuitePlatform.scala b/sdk-exporter/trace/jvm/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ExporterSuitePlatform.scala new file mode 100644 index 000000000..9cdc7ada4 --- /dev/null +++ b/sdk-exporter/trace/jvm/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ExporterSuitePlatform.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp + +import munit.CatsEffectSuite + +trait ExporterSuitePlatform { self: CatsEffectSuite => } diff --git a/sdk-exporter/trace/native/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ExporterSuitePlatform.scala b/sdk-exporter/trace/native/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ExporterSuitePlatform.scala new file mode 100644 index 000000000..eeedf874d --- /dev/null +++ b/sdk-exporter/trace/native/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ExporterSuitePlatform.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp + +import cats.effect.unsafe.IORuntime +import epollcat.unsafe.EpollRuntime +import munit.CatsEffectSuite + +trait ExporterSuitePlatform { self: CatsEffectSuite => + override def munitIORuntime: IORuntime = EpollRuntime.global +} diff --git a/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/JsonCodecs.scala b/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/JsonCodecs.scala new file mode 100644 index 000000000..293ee00b7 --- /dev/null +++ b/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/JsonCodecs.scala @@ -0,0 +1,211 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s +package sdk +package exporter.otlp + +import io.circe.Encoder +import io.circe.Json +import io.circe.syntax._ +import org.typelevel.otel4s.sdk.{Resource => InstrumentationResource} +import org.typelevel.otel4s.sdk.common.InstrumentationScope +import org.typelevel.otel4s.sdk.trace.data.EventData +import org.typelevel.otel4s.sdk.trace.data.LinkData +import org.typelevel.otel4s.sdk.trace.data.SpanData +import org.typelevel.otel4s.sdk.trace.data.StatusData +import org.typelevel.otel4s.trace.SpanKind +import org.typelevel.otel4s.trace.Status +import org.typelevel.otel4s.trace.TraceState + +/** @see + * [[https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto]] + */ +private object JsonCodecs { + + implicit val attributeEncoder: Encoder[Attribute[_]] = + Encoder.instance { attribute => + Json.obj( + "key" := attribute.key.name, + "value" := Json.obj( + attributeTypeName(attribute.key.`type`) := attributeValue( + attribute.key.`type`, + attribute.value + ) + ) + ) + } + + implicit val attributesEncoder: Encoder[Attributes] = + Encoder[List[Attribute[_]]].contramap(_.toList) + + implicit val resourceEncoder: Encoder[InstrumentationResource] = + Encoder.instance { resource => + Json.obj( + "attributes" := resource.attributes + ) + } + + implicit val instrumentationScopeEncoder: Encoder[InstrumentationScope] = + Encoder.instance { scope => + Json + .obj( + "name" := scope.name, + "version" := scope.version, + "attributes" := scope.attributes + ) + .dropNullValues + } + + implicit val statusEncoder: Encoder[Status] = + Encoder[Int].contramap { + case Status.Unset => 0 + case Status.Ok => 1 + case Status.Error => 2 + } + + implicit val spanKindEncoder: Encoder[SpanKind] = + Encoder[Int].contramap { + case SpanKind.Internal => 1 + case SpanKind.Server => 2 + case SpanKind.Client => 3 + case SpanKind.Producer => 4 + case SpanKind.Consumer => 5 + } + + implicit val traceStateEncoder: Encoder[TraceState] = + Encoder.instance { state => + if (state.isEmpty) + Json.Null + else + state.asMap + .map { case (key, value) => s"$key=$value" } + .mkString(",") + .asJson + } + + implicit val statusDataEncoder: Encoder[StatusData] = + Encoder.instance { statusData => + Json + .obj( + "message" := statusData.description, + "code" := statusData.status + ) + .dropNullValues + } + + implicit val eventDataEncoder: Encoder[EventData] = + Encoder.instance { eventData => + Json.obj( + "timeUnixNano" := eventData.timestamp.toNanos.toString, + "name" := eventData.name, + "attributes" := eventData.attributes + ) + } + + implicit val linkDataEncoder: Encoder[LinkData] = + Encoder.instance { link => + Json + .obj( + "traceId" := link.spanContext.traceIdHex, + "spanId" := link.spanContext.spanIdHex, + "traceState" := link.spanContext.traceState, + "attributes" := link.attributes + ) + .dropNullValues + } + + implicit val spanDataEncoder: Encoder[SpanData] = + Encoder.instance { span => + Json + .obj( + "traceId" := span.spanContext.traceIdHex, + "spanId" := span.spanContext.spanIdHex, + "traceState" := span.spanContext.traceState, + "parentSpanId" := span.parentSpanContext.map(_.spanIdHex), + "name" := span.name, + "kind" := span.kind, + "startTimeUnixNano" := span.startTimestamp.toNanos.toString, + "endTimeUnixNano" := span.endTimestamp.map(_.toNanos.toString), + "attributes" := span.attributes, + "events" := span.events, + "links" := span.links, + "status" := span.status + ) + .dropNullValues + } + + implicit val spanDataListEncoder: Encoder[List[SpanData]] = + Encoder.instance { spans => + val resourceSpans = + spans.groupBy(_.resource).map { case (resource, resourceSpans) => + val scopeSpans: Iterable[Json] = + resourceSpans + .groupBy(_.instrumentationScope) + .map { case (scope, spans) => + Json.obj( + "scope" := scope, + "spans" := spans.map(Encoder[SpanData].apply) + ) + } + + Json.obj( + "resource" := resource, + "scopeSpans" := scopeSpans + ) + } + + Json.obj("resourceSpans" := resourceSpans).deepDropNullValues + } + + private def attributeValue( + attributeType: AttributeType[_], + value: Any + ): Json = { + def primitive[A: Encoder]: Json = + Encoder[A].apply(value.asInstanceOf[A]) + + def list[A: Encoder](attributeType: AttributeType[A]): Json = { + val typeName = attributeTypeName(attributeType) + val list = value.asInstanceOf[List[A]] + Json.obj("values" := list.map(value => Json.obj(typeName := value))) + } + + attributeType match { + case AttributeType.Boolean => primitive[Boolean] + case AttributeType.Double => primitive[Double] + case AttributeType.String => primitive[String] + case AttributeType.Long => primitive[Long] + case AttributeType.BooleanList => list[Boolean](AttributeType.Boolean) + case AttributeType.DoubleList => list[Double](AttributeType.Double) + case AttributeType.StringList => list[String](AttributeType.String) + case AttributeType.LongList => list[Long](AttributeType.Long) + } + } + + private def attributeTypeName(attributeType: AttributeType[_]): String = + attributeType match { + case AttributeType.Boolean => "boolValue" + case AttributeType.Double => "doubleValue" + case AttributeType.String => "stringValue" + case AttributeType.Long => "intValue" + case AttributeType.BooleanList => "arrayValue" + case AttributeType.DoubleList => "arrayValue" + case AttributeType.StringList => "arrayValue" + case AttributeType.LongList => "arrayValue" + } + +} diff --git a/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpSpanExporter.scala b/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpSpanExporter.scala new file mode 100644 index 000000000..436e1a121 --- /dev/null +++ b/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpSpanExporter.scala @@ -0,0 +1,324 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter +package otlp + +import cats.Foldable +import cats.effect.Async +import cats.effect.Resource +import cats.effect.Temporal +import cats.effect.std.Console +import cats.effect.syntax.temporal._ +import cats.syntax.applicative._ +import cats.syntax.applicativeError._ +import cats.syntax.flatMap._ +import cats.syntax.foldable._ +import cats.syntax.functor._ +import fs2.Chunk +import fs2.compression.Compression +import fs2.io.net.Network +import fs2.io.net.tls.TLSContext +import io.opentelemetry.proto.collector.trace.v1.trace_service.ExportTraceServiceRequest +import org.http4s.EntityEncoder +import org.http4s.Header +import org.http4s.Headers +import org.http4s.HttpVersion +import org.http4s.Method +import org.http4s.ProductId +import org.http4s.Request +import org.http4s.Response +import org.http4s.Status +import org.http4s.Uri +import org.http4s.client.Client +import org.http4s.client.middleware.{RetryPolicy => HttpRetryPolicy} +import org.http4s.client.middleware.GZip +import org.http4s.client.middleware.Retry +import org.http4s.ember.client.EmberClientBuilder +import org.http4s.headers.`User-Agent` +import org.http4s.syntax.literals._ +import org.typelevel.ci._ +import org.typelevel.otel4s.sdk.BuildInfo +import org.typelevel.otel4s.sdk.trace.data.SpanData +import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter + +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ + +/** Exports spans via HTTP. Support `json` and `protobuf` encoding. + * + * @see + * [[https://opentelemetry.io/docs/specs/otel/protocol/exporter/]] + * + * @see + * [[https://opentelemetry.io/docs/concepts/sdk-configuration/otlp-exporter-configuration/]] + */ +private final class OtlpHttpSpanExporter[F[_]: Temporal: Console] private ( + client: Client[F], + config: OtlpHttpSpanExporter.Config +) extends SpanExporter[F] { + import JsonCodecs._ + import OtlpHttpSpanExporter.Encoding + + val name: String = { + val headers = config.headers.mkString( + "headers={", + ",", + "}", + Headers.SensitiveHeaders + ) + + "OtlpHttpSpanExporter{" + + s"encoding=${config.encoding}, " + + s"endpoint=${config.endpoint}, " + + s"timeout=${config.timeout}, " + + s"gzipCompression=${config.gzipCompression}, " + + headers + + "}" + } + + private implicit val spansEncoder: EntityEncoder[F, List[SpanData]] = + config.encoding match { + case Encoding.Json => + import org.http4s.circe._ + jsonEncoderOf[F, List[SpanData]] + + case Encoding.Protobuf => + val content = Header.Raw(ci"Content-Type", "application/x-protobuf") + EntityEncoder.simple(content) { spans => + val request: ExportTraceServiceRequest = ProtoCodecs.toProto(spans) + Chunk.array(request.toByteArray) + } + } + + def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] = { + val request = + Request[F](Method.POST, config.endpoint, HttpVersion.`HTTP/1.1`) + .withEntity(spans.toList) + .putHeaders(config.headers) + + client + .run(request) + .use(response => logBody(response).unlessA(response.status.isSuccess)) + .timeoutTo( + config.timeout, + Temporal[F].raiseError( + new TimeoutException( + s"The export to [${config.endpoint}] has timed out after [${config.timeout}]" + ) + ) + ) + .handleErrorWith { e => + Console[F].errorln( + s"OtlpHttpSpanExporter: cannot export spans: ${e.getMessage}\n${e.getStackTrace.mkString("\n")}\n" + ) + } + } + + def flush: F[Unit] = Temporal[F].unit + + private def logBody(response: Response[F]): F[Unit] = + for { + body <- response.bodyText.compile.string + _ <- Console[F].println( + s"[OtlpHttpSpanExporter/${config.encoding}] the request failed with [${response.status}]. Body: $body" + ) + } yield () + +} + +object OtlpHttpSpanExporter { + + private object Defaults { + val Endpoint: Uri = uri"http://localhost:4318/v1/traces" + val Timeout: FiniteDuration = 10.seconds + val GzipCompression: Boolean = false + val UserAgentName: String = "OTel-OTLP-Exporter-Scala-Otel4s" + } + + private final case class Config( + encoding: Encoding, + endpoint: Uri, + timeout: FiniteDuration, + headers: Headers, + gzipCompression: Boolean + ) + + sealed trait Encoding + object Encoding { + case object Json extends Encoding + case object Protobuf extends Encoding + } + + /** A builder of [[OtlpHttpSpanExporter]] */ + sealed trait Builder[F[_]] { + + /** Sets the OTLP endpoint to connect to. + * + * The endpoint must start with either `http://` or `https://`, and include + * the full HTTP path. + * + * Default value is `http://localhost:4318/v1/traces`. + */ + def withEndpoint(endpoint: Uri): Builder[F] + + /** Sets the maximum time to wait for the collector to process an exported + * batch of spans. + * + * Default value is `10 seconds`. + */ + def withTimeout(timeout: FiniteDuration): Builder[F] + + /** Enables Gzip compression. + * + * The compression is disabled by default. + */ + def withGzip: Builder[F] + + /** Disables Gzip compression. */ + def withoutGzip: Builder[F] + + /** Adds headers to requests. */ + def addHeaders(headers: Headers): Builder[F] + + /** Sets the explicit TLS context the HTTP client should use. + */ + def withTLSContext(context: TLSContext[F]): Builder[F] + + /** Sets the retry policy to use. + * + * Default retry policy is [[RetryPolicy.default]]. + */ + def withRetryPolicy(policy: RetryPolicy): Builder[F] + + /** Configures the exporter to use the given encoding. + * + * Default encoding is `Protobuf`. + */ + def withEncoding(encoding: Encoding): Builder[F] + + /** Creates a [[OtlpHttpSpanExporter]] using the configuration of this + * builder. + */ + def build: Resource[F, SpanExporter[F]] + } + + /** Creates a [[Builder]] of [[OtlpHttpSpanExporter]] with the default + * configuration: + * - encoding: `Protobuf` + * - endpoint: `http://localhost:4318/v1/traces` + * - timeout: `10 seconds` + * - retry policy: 5 exponential attempts, initial backoff is `1 second`, + * max backoff is `5 seconds` + */ + def builder[F[_]: Async: Network: Compression: Console]: Builder[F] = + BuilderImpl( + encoding = Encoding.Protobuf, + endpoint = Defaults.Endpoint, + gzipCompression = Defaults.GzipCompression, + timeout = Defaults.Timeout, + headers = Headers( + `User-Agent`( + ProductId(Defaults.UserAgentName, version = Some(BuildInfo.version)) + ) + ), + retryPolicy = RetryPolicy.default, + tlsContext = None + ) + + private final case class BuilderImpl[ + F[_]: Async: Network: Compression: Console + ]( + encoding: Encoding, + endpoint: Uri, + gzipCompression: Boolean, + timeout: FiniteDuration, + headers: Headers, + retryPolicy: RetryPolicy, + tlsContext: Option[TLSContext[F]] + ) extends Builder[F] { + + def withTimeout(timeout: FiniteDuration): Builder[F] = + copy(timeout = timeout) + + def withEndpoint(endpoint: Uri): Builder[F] = + copy(endpoint = endpoint) + + def addHeaders(headers: Headers): Builder[F] = + copy(headers = this.headers ++ headers) + + def withGzip: Builder[F] = + copy(gzipCompression = true) + + def withoutGzip: Builder[F] = + copy(gzipCompression = false) + + def withTLSContext(context: TLSContext[F]): Builder[F] = + copy(tlsContext = Some(context)) + + def withRetryPolicy(policy: RetryPolicy): Builder[F] = + copy(retryPolicy = policy) + + def withEncoding(encoding: Encoding): Builder[F] = + copy(encoding = encoding) + + def build: Resource[F, SpanExporter[F]] = { + val config = Config(encoding, endpoint, timeout, headers, gzipCompression) + + val builder = EmberClientBuilder + .default[F] + .withTimeout(config.timeout) + + val gzip: Client[F] => Client[F] = + if (gzipCompression) GZip[F]() else identity + + def backoff(attempt: Int): Option[FiniteDuration] = + Option.when(attempt < retryPolicy.maxAttempts) { + val next = + retryPolicy.initialBackoff * attempt.toLong * retryPolicy.backoffMultiplier + + val delay = + next.min(retryPolicy.maxBackoff) + + delay match { + case f: FiniteDuration => f + case _ => retryPolicy.maxBackoff + } + } + + // see https://opentelemetry.io/docs/specs/otlp/#failures-1 + val retryableCodes = Set( + Status.TooManyRequests, + Status.BadGateway, + Status.ServiceUnavailable, + Status.GatewayTimeout + ) + + def shouldRetry(result: Either[Throwable, Response[F]]): Boolean = + result match { + case Left(_) => true + case Right(response) => retryableCodes.contains(response.status) + } + + val policy = HttpRetryPolicy[F](backoff, (_, res) => shouldRetry(res)) + + for { + client <- tlsContext.foldLeft(builder)(_.withTLSContext(_)).build + } yield new OtlpHttpSpanExporter[F](Retry(policy)(gzip(client)), config) + } + } + +} diff --git a/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/ProtoCodecs.scala b/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/ProtoCodecs.scala new file mode 100644 index 000000000..895389996 --- /dev/null +++ b/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/ProtoCodecs.scala @@ -0,0 +1,191 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s +package sdk +package exporter.otlp + +import com.google.protobuf.ByteString +import io.opentelemetry.proto.collector.trace.v1.trace_service.ExportTraceServiceRequest +import io.opentelemetry.proto.common.v1.common.{ + InstrumentationScope => ScopeProto +} +import io.opentelemetry.proto.common.v1.common.AnyValue +import io.opentelemetry.proto.common.v1.common.ArrayValue +import io.opentelemetry.proto.common.v1.common.KeyValue +import io.opentelemetry.proto.resource.v1.resource.{Resource => ResourceProto} +import io.opentelemetry.proto.trace.v1.trace.{Span => SpanProto} +import io.opentelemetry.proto.trace.v1.trace.{Status => StatusProto} +import io.opentelemetry.proto.trace.v1.trace.ResourceSpans +import io.opentelemetry.proto.trace.v1.trace.ScopeSpans +import org.typelevel.otel4s.sdk.common.InstrumentationScope +import org.typelevel.otel4s.sdk.trace.data.EventData +import org.typelevel.otel4s.sdk.trace.data.LinkData +import org.typelevel.otel4s.sdk.trace.data.SpanData +import org.typelevel.otel4s.sdk.trace.data.StatusData +import org.typelevel.otel4s.trace.SpanKind +import org.typelevel.otel4s.trace.Status + +/** @see + * [[https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto]] + */ +private object ProtoCodecs { + trait ToProto[A, P] { + def encode(a: A): P + } + + def toProto[A, P](a: A)(implicit ev: ToProto[A, P]): P = + ev.encode(a) + + implicit val attributeToProto: ToProto[Attribute[_], KeyValue] = { att => + import AnyValue.Value + + def primitive[A](lift: A => Value): Value = + lift(att.value.asInstanceOf[A]) + + def list[A](lift: A => Value): Value.ArrayValue = { + val list = att.value.asInstanceOf[List[A]] + Value.ArrayValue(ArrayValue(list.map(value => AnyValue(lift(value))))) + } + + val value = att.key.`type` match { + case AttributeType.Boolean => primitive[Boolean](Value.BoolValue(_)) + case AttributeType.Double => primitive[Double](Value.DoubleValue(_)) + case AttributeType.String => primitive[String](Value.StringValue(_)) + case AttributeType.Long => primitive[Long](Value.IntValue(_)) + case AttributeType.BooleanList => list[Boolean](Value.BoolValue(_)) + case AttributeType.DoubleList => list[Double](Value.DoubleValue(_)) + case AttributeType.StringList => list[String](Value.StringValue(_)) + case AttributeType.LongList => list[Long](Value.IntValue(_)) + } + + KeyValue(att.key.name, Some(AnyValue(value))) + } + + implicit val attributesToProto: ToProto[Attributes, Seq[KeyValue]] = { attr => + attr.toList.map(attribute => toProto[Attribute[_], KeyValue](attribute)) + } + + implicit val resourceToProto: ToProto[Resource, ResourceProto] = { resource => + ResourceProto( + attributes = toProto(resource.attributes) + ) + } + + implicit val scopeInfoToProto: ToProto[InstrumentationScope, ScopeProto] = { + scope => + ScopeProto( + name = scope.name, + version = scope.version.getOrElse(""), + attributes = toProto(scope.attributes) + ) + } + + implicit val statusToProto: ToProto[Status, StatusProto.StatusCode] = { + case Status.Unset => StatusProto.StatusCode.STATUS_CODE_UNSET + case Status.Ok => StatusProto.StatusCode.STATUS_CODE_OK + case Status.Error => StatusProto.StatusCode.STATUS_CODE_ERROR + } + + implicit val spanKindToProto: ToProto[SpanKind, SpanProto.SpanKind] = { + case SpanKind.Internal => SpanProto.SpanKind.SPAN_KIND_INTERNAL + case SpanKind.Server => SpanProto.SpanKind.SPAN_KIND_SERVER + case SpanKind.Client => SpanProto.SpanKind.SPAN_KIND_CLIENT + case SpanKind.Producer => SpanProto.SpanKind.SPAN_KIND_PRODUCER + case SpanKind.Consumer => SpanProto.SpanKind.SPAN_KIND_CONSUMER + } + + implicit val statusDataToProto: ToProto[StatusData, StatusProto] = { data => + StatusProto( + message = data.description.getOrElse(""), + code = toProto(data.status) + ) + } + + implicit val eventDataToProto: ToProto[EventData, SpanProto.Event] = { data => + SpanProto.Event( + timeUnixNano = data.timestamp.toNanos, + name = data.name, + attributes = toProto(data.attributes) + ) + } + + implicit val linkDataToProto: ToProto[LinkData, SpanProto.Link] = { data => + val traceState = data.spanContext.traceState.asMap + .map { case (key, value) => s"$key=$value" } + .mkString(",") + + SpanProto.Link( + traceId = ByteString.copyFrom(data.spanContext.traceId.toArray), + spanId = ByteString.copyFrom(data.spanContext.spanId.toArray), + traceState = traceState, + attributes = toProto(data.attributes) + ) + } + + implicit val spanDataToProto: ToProto[SpanData, SpanProto] = { span => + val traceState = span.spanContext.traceState.asMap + .map { case (key, value) => s"$key=$value" } + .mkString(",") + + SpanProto( + ByteString.copyFrom(span.spanContext.traceId.toArray), + ByteString.copyFrom(span.spanContext.spanId.toArray), + traceState = traceState, + parentSpanId = span.parentSpanContext + .map(s => ByteString.copyFrom(s.spanId.toArray)) + .getOrElse(ByteString.EMPTY), + name = span.name, + kind = toProto(span.kind), + startTimeUnixNano = span.startTimestamp.toNanos, + endTimeUnixNano = span.endTimestamp.map(_.toNanos).getOrElse(0L), + attributes = toProto(span.attributes), + events = span.events.map(event => toProto(event)), + links = span.links.map(link => toProto(link)), + status = Some(toProto(span.status)) + ) + } + + implicit val spanDataToRequest + : ToProto[List[SpanData], ExportTraceServiceRequest] = { spans => + val resourceSpans = + spans + .groupBy(_.resource) + .map { case (resource, resourceSpans) => + val scopeSpans: List[ScopeSpans] = + resourceSpans + .groupBy(_.instrumentationScope) + .map { case (scope, spans) => + ScopeSpans( + scope = Some(toProto(scope)), + spans = spans.map(span => toProto(span)), + schemaUrl = scope.schemaUrl.getOrElse("") + ) + } + .toList + + ResourceSpans( + Some(toProto(resource)), + scopeSpans, + resource.schemaUrl.getOrElse("") + ) + } + .toList + + ExportTraceServiceRequest(resourceSpans) + } + +} diff --git a/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/JsonCodecsSuite.scala b/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/JsonCodecsSuite.scala new file mode 100644 index 000000000..27e404cc0 --- /dev/null +++ b/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/JsonCodecsSuite.scala @@ -0,0 +1,341 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s +package sdk +package exporter.otlp + +import io.circe.Encoder +import io.circe.Json +import io.circe.syntax._ +import munit._ +import org.scalacheck.Arbitrary +import org.scalacheck.Prop +import org.scalacheck.Test +import org.typelevel.otel4s.sdk.{Resource => InstrumentationResource} +import org.typelevel.otel4s.sdk.common.InstrumentationScope +import org.typelevel.otel4s.sdk.trace.Gens +import org.typelevel.otel4s.sdk.trace.data.EventData +import org.typelevel.otel4s.sdk.trace.data.LinkData +import org.typelevel.otel4s.sdk.trace.data.SpanData +import org.typelevel.otel4s.sdk.trace.data.StatusData +import org.typelevel.otel4s.trace.SpanContext +import org.typelevel.otel4s.trace.SpanKind +import org.typelevel.otel4s.trace.Status +import org.typelevel.otel4s.trace.TraceFlags +import org.typelevel.otel4s.trace.TraceState +import scodec.bits.ByteVector + +import scala.concurrent.duration._ + +class JsonCodecsSuite extends ScalaCheckSuite { + import JsonCodecs._ + + private implicit val resourceArbitrary: Arbitrary[InstrumentationResource] = + Arbitrary(Gens.resource) + + private implicit val attributeArbitrary: Arbitrary[Attribute[_]] = + Arbitrary(Gens.attribute) + + private implicit val scopeArbitrary: Arbitrary[InstrumentationScope] = + Arbitrary(Gens.instrumentationScope) + + private implicit val statusArbitrary: Arbitrary[Status] = + Arbitrary(Gens.status) + + private implicit val statusDataArbitrary: Arbitrary[StatusData] = + Arbitrary(Gens.statusData) + + private implicit val spanKindArbitrary: Arbitrary[SpanKind] = + Arbitrary(Gens.spanKind) + + private implicit val eventDataArbitrary: Arbitrary[EventData] = + Arbitrary(Gens.eventData) + + private implicit val linkDataArbitrary: Arbitrary[LinkData] = + Arbitrary(Gens.linkData) + + private implicit val spanDataArbitrary: Arbitrary[SpanData] = + Arbitrary(Gens.spanData) + + test("encode Resource") { + Prop.forAll(Arbitrary.arbitrary[InstrumentationResource]) { resource => + val expected = Json.obj( + "attributes" := Encoder[Attributes].apply(resource.attributes) + ) + + assertEquals(Encoder[InstrumentationResource].apply(resource), expected) + } + } + + test("encode Attribute[_]") { + Prop.forAll(Arbitrary.arbitrary[Attribute[_]]) { attribute => + val value = attribute.value + + def list[A: Encoder](typeName: String): Json = { + val list = value.asInstanceOf[List[A]] + Json.obj("values" := list.map(value => Json.obj(typeName := value))) + } + + val expected = { + val v = attribute.key.`type` match { + case AttributeType.Boolean => + Json.obj("boolValue" := value.asInstanceOf[Boolean]) + case AttributeType.Double => + Json.obj("doubleValue" := value.asInstanceOf[Double]) + case AttributeType.String => + Json.obj("stringValue" := value.asInstanceOf[String]) + case AttributeType.Long => + Json.obj("intValue" := value.asInstanceOf[Long]) + case AttributeType.BooleanList => + Json.obj("arrayValue" := list[Boolean]("boolValue")) + case AttributeType.DoubleList => + Json.obj("arrayValue" := list[Double]("doubleValue")) + case AttributeType.StringList => + Json.obj("arrayValue" := list[String]("stringValue")) + case AttributeType.LongList => + Json.obj("arrayValue" := list[Long]("intValue")) + } + + Json.obj("key" := attribute.key.name, "value" := v) + } + + assertEquals(Encoder[Attribute[_]].apply(attribute), expected) + } + } + + test("encode InstrumentationScope") { + Prop.forAll(Arbitrary.arbitrary[InstrumentationScope]) { scope => + val expected = Json + .obj( + "name" := scope.name, + "version" := scope.version, + "attributes" := scope.attributes + ) + .dropNullValues + + assertEquals(Encoder[InstrumentationScope].apply(scope), expected) + } + } + + test("encode InstrumentationScope (noSpaces)") { + val attrs = Attributes(Attribute("key", "value")) + + assertEquals( + InstrumentationScope( + "name", + None, + None, + Attributes.empty + ).asJson.noSpaces, + """{"name":"name","attributes":[]}""" + ) + + assertEquals( + InstrumentationScope( + "name", + Some("version"), + Some("schema"), + attrs + ).asJson.noSpaces, + """{"name":"name","version":"version","attributes":[{"key":"key","value":{"stringValue":"value"}}]}""" + ) + } + + test("encode Status") { + Prop.forAll(Arbitrary.arbitrary[Status]) { status => + val expected = status match { + case Status.Unset => 0.asJson + case Status.Ok => 1.asJson + case Status.Error => 2.asJson + } + + assertEquals(Encoder[Status].apply(status), expected) + } + } + + test("encode StatusData") { + Prop.forAll(Arbitrary.arbitrary[StatusData]) { statusData => + val expected = Json + .obj( + "message" := statusData.description, + "code" := statusData.status + ) + .dropNullValues + + assertEquals(Encoder[StatusData].apply(statusData), expected) + } + } + + test("encode StatusData (noSpaces)") { + assertEquals( + StatusData(Status.Error, "").asJson.noSpaces, + """{"code":2}""" + ) + + assertEquals( + StatusData(Status.Error, "description").asJson.noSpaces, + """{"message":"description","code":2}""" + ) + } + + test("encode SpanKind") { + Prop.forAll(Arbitrary.arbitrary[SpanKind]) { spanKind => + val expected = spanKind match { + case SpanKind.Internal => 1.asJson + case SpanKind.Server => 2.asJson + case SpanKind.Client => 3.asJson + case SpanKind.Producer => 4.asJson + case SpanKind.Consumer => 5.asJson + } + + assertEquals(Encoder[SpanKind].apply(spanKind), expected) + } + } + + test("encode EventData") { + Prop.forAll(Arbitrary.arbitrary[EventData]) { eventData => + val expected = Json + .obj( + "timeUnixNano" := eventData.timestamp.toNanos.toString, + "name" := eventData.name, + "attributes" := eventData.attributes + ) + .dropNullValues + + assertEquals(Encoder[EventData].apply(eventData), expected) + } + } + + test("encode EventData (noSpaces)") { + val attrs = Attributes(Attribute("key", "value")) + + assertEquals( + EventData("name", 1.nanos, Attributes.empty).asJson.noSpaces, + """{"timeUnixNano":"1","name":"name","attributes":[]}""" + ) + + assertEquals( + EventData("name", 1.nanos, attrs).asJson.noSpaces, + """{"timeUnixNano":"1","name":"name","attributes":[{"key":"key","value":{"stringValue":"value"}}]}""" + ) + } + + test("encode LinkData") { + Prop.forAll(Arbitrary.arbitrary[LinkData]) { link => + val expected = Json + .obj( + "traceId" := link.spanContext.traceIdHex, + "spanId" := link.spanContext.spanIdHex, + "traceState" := link.spanContext.traceState, + "attributes" := link.attributes, + ) + .dropNullValues + + assertEquals(Encoder[LinkData].apply(link), expected) + } + } + + test("encode LinkData (noSpaces)") { + val attrs = Attributes(Attribute("key", "value")) + + val ctx = SpanContext( + ByteVector.fromValidHex("aae6750d58ff8148fa33894599afaaf2"), + ByteVector.fromValidHex("f676d76b0b3d4324"), + TraceFlags.fromByte(1), + TraceState.empty.updated("k", "v").updated("k2", "v2"), + false + ) + + val ctx2 = SpanContext( + ByteVector.fromValidHex("aae6750d58ff8148fa33894599afaaf2"), + ByteVector.fromValidHex("f676d76b0b3d4324"), + TraceFlags.Default, + TraceState.empty, + false + ) + + assertEquals( + LinkData(ctx).asJson.noSpaces, + """{"traceId":"aae6750d58ff8148fa33894599afaaf2","spanId":"f676d76b0b3d4324","traceState":"k2=v2,k=v","attributes":[]}""" + ) + + assertEquals( + LinkData(ctx, attrs).asJson.noSpaces, + """{"traceId":"aae6750d58ff8148fa33894599afaaf2","spanId":"f676d76b0b3d4324","traceState":"k2=v2,k=v","attributes":[{"key":"key","value":{"stringValue":"value"}}]}""" + ) + + assertEquals( + LinkData(ctx2).asJson.noSpaces, + """{"traceId":"aae6750d58ff8148fa33894599afaaf2","spanId":"f676d76b0b3d4324","attributes":[]}""" + ) + } + + test("encode SpanData") { + Prop.forAll(Arbitrary.arbitrary[SpanData]) { span => + val expected = Json + .obj( + "traceId" := span.spanContext.traceIdHex, + "spanId" := span.spanContext.spanIdHex, + "traceState" := span.spanContext.traceState, + "parentSpanId" := span.parentSpanContext.map(_.spanIdHex), + "name" := span.name, + "kind" := span.kind, + "startTimeUnixNano" := span.startTimestamp.toNanos.toString, + "endTimeUnixNano" := span.endTimestamp.map(_.toNanos.toString), + "attributes" := span.attributes, + "events" := span.events, + "links" := span.links, + "status" := span.status + ) + .dropNullValues + + assertEquals(Encoder[SpanData].apply(span), expected) + } + } + + test("encode List[SpanData]") { + Prop.forAll(Arbitrary.arbitrary[List[SpanData]]) { spans => + val resourceSpans = + spans.groupBy(_.resource).map { case (resource, resourceSpans) => + val scopeSpans: Iterable[Json] = + resourceSpans + .groupBy(_.instrumentationScope) + .map { case (scope, spans) => + Json.obj( + "scope" := scope, + "spans" := spans.map(Encoder[SpanData].apply) + ) + } + + Json.obj( + "resource" := resource, + "scopeSpans" := scopeSpans + ) + } + + val expected = Json.obj("resourceSpans" := resourceSpans) + + assertEquals(Encoder[List[SpanData]].apply(spans), expected) + } + } + + override protected def scalaCheckTestParameters: Test.Parameters = + super.scalaCheckTestParameters + .withMinSuccessfulTests(10) + .withMaxSize(10) + +} diff --git a/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpSpanExporterSuite.scala b/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpSpanExporterSuite.scala new file mode 100644 index 000000000..93e9f22b0 --- /dev/null +++ b/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpSpanExporterSuite.scala @@ -0,0 +1,289 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp + +import cats.effect.IO +import com.comcast.ip4s.IpAddress +import io.circe.Encoder +import io.circe.Json +import munit._ +import org.http4s.Headers +import org.http4s.ember.client.EmberClientBuilder +import org.http4s.headers._ +import org.http4s.syntax.literals._ +import org.scalacheck.Arbitrary +import org.scalacheck.Gen +import org.scalacheck.Test +import org.scalacheck.effect.PropF +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeType +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.sdk.BuildInfo +import org.typelevel.otel4s.sdk.trace.Gens +import org.typelevel.otel4s.sdk.trace.data.EventData +import org.typelevel.otel4s.sdk.trace.data.SpanData +import org.typelevel.otel4s.trace.Status + +import java.util.Locale +import scala.concurrent.duration._ + +class OtlpHttpSpanExporterSuite + extends CatsEffectSuite + with ScalaCheckEffectSuite + with ExporterSuitePlatform { + + import OtlpHttpSpanExporterSuite._ + import OtlpHttpSpanExporter.Encoding + + private implicit val spanDataArbitrary: Arbitrary[SpanData] = + Arbitrary(Gens.spanData) + + private implicit val encodingArbitrary: Arbitrary[Encoding] = + Arbitrary(Gen.oneOf(Encoding.Protobuf, Encoding.Json)) + + test("represent builder parameters in the name") { + PropF.forAllF { (encoding: Encoding) => + val enc = encoding match { + case Encoding.Json => "Json" + case Encoding.Protobuf => "Protobuf" + } + + val expected = + s"OtlpHttpSpanExporter{encoding=$enc, " + + "endpoint=https://localhost:4318/api/v1/traces, " + + "timeout=5 seconds, " + + "gzipCompression=true, " + + s"headers={User-Agent: OTel-OTLP-Exporter-Scala-Otel4s/${BuildInfo.version},X-Forwarded-For: 127.0.0.1}}" + + OtlpHttpSpanExporter + .builder[IO] + .addHeaders( + Headers(`X-Forwarded-For`(IpAddress.fromString("127.0.0.1"))) + ) + .withEndpoint(uri"https://localhost:4318/api/v1/traces") + .withTimeout(5.seconds) + .withGzip + .withEncoding(encoding) + .build + .use { exporter => + IO(assertEquals(exporter.name, expected)) + } + } + } + + test("export spans") { + PropF.forAllF { (sd: SpanData, encoding: Encoding) => + IO.realTime.flatMap { now => + // we need to tweak end timestamps and attributes, so we recreate the span data + val span = SpanData( + name = sd.name, + spanContext = sd.spanContext, + parentSpanContext = sd.parentSpanContext, + kind = sd.kind, + startTimestamp = now, + endTimestamp = Some(now.plus(5.seconds)), + status = sd.status, + attributes = adaptAttributes(sd.attributes), + events = sd.events.map { event => + EventData( + event.name, + now.plus(2.seconds), + adaptAttributes(event.attributes) + ) + }, + links = sd.links, + instrumentationScope = sd.instrumentationScope, + resource = org.typelevel.otel4s.sdk.Resource.default + ) + + val expected = { + val references = { + val childOf = span.parentSpanContext.map { parent => + JaegerRef( + "CHILD_OF", + span.spanContext.traceIdHex, + parent.spanIdHex + ) + } + + val links = span.links.map { d => + JaegerRef( + "FOLLOWS_FROM", + d.spanContext.traceIdHex, + d.spanContext.spanIdHex + ) + } + + childOf.toList ::: links.toList + } + + val duration = + span.endTimestamp.getOrElse(Duration.Zero) - span.startTimestamp + + val tags = { + // the prefix attribute is required with otlp-collector past 0.84.0 + val prefix: List[Attribute[String]] = Nil /*List( + List( + Attribute("otel.library.name", span.instrumentationScope.name) + ), + span.instrumentationScope.version.map { version => + Attribute("otel.library.version", version) + } + ).flatten + */ + + val suffix = List( + List( + Attribute( + "span.kind", + span.kind.toString.toLowerCase(Locale.ROOT) + ) + ), + Option.when(span.status.status != Status.Unset)( + Attribute( + "otel.status_code", + span.status.status.toString.toUpperCase(Locale.ROOT) + ) + ), + Option.when(span.status.status == Status.Error)( + Attribute("error", true) + ), + span.status.description.filter(_.nonEmpty).map { description => + Attribute("otel.status_description", description) + }, + List(Attribute("internal.span.format", "proto")) + ).flatten + + prefix.map(a => toJaegerTag(a)) ++ + span.attributes.map(a => toJaegerTag(a)).toList ++ + suffix.map(a => toJaegerTag(a)) + } + + val events = + span.events.map(d => JaegerLog(d.timestamp.toMicros)).toList + + val jaegerSpan = JaegerSpan( + span.spanContext.traceIdHex, + span.spanContext.spanIdHex, + span.name, + references, + span.startTimestamp.toMicros, + duration.toMicros, + tags, + events, + "p1" + ) + + val jaegerTrace = JaegerTrace( + span.spanContext.traceIdHex, + List(jaegerSpan) + ) + + JaegerResponse(List(jaegerTrace)) + } + + OtlpHttpSpanExporter + .builder[IO] + .withEncoding(encoding) + .withTimeout(20.seconds) + .build + .use(exporter => exporter.exportSpans(List(span))) + .flatMap { _ => + assertIO(findTrace(span.spanContext.traceIdHex), expected) + } + } + } + } + + // it's hard to deal with big numeric values due to various encoding pitfalls + // so we simplify the numbers + private def adaptAttributes(attributes: Attributes): Attributes = { + val adapted = attributes.map { attribute => + val name = attribute.key.name + attribute.key.`type` match { + case AttributeType.Double => Attribute(name, 1.1) + case AttributeType.DoubleList => Attribute(name, List(1.1)) + case AttributeType.Long => Attribute(name, 1L) + case AttributeType.LongList => Attribute(name, List(1L)) + case _ => attribute + } + } + + Attributes.fromSpecific(adapted) + } + + private def toJaegerTag(a: Attribute[_]): JaegerTag = { + import io.circe.syntax._ + + def primitive[A: Encoder](tpe: String): JaegerTag = + JaegerTag(a.key.name, tpe, a.value.asInstanceOf[A].asJson) + + def list[A: Encoder]: JaegerTag = { + val json = a.value.asInstanceOf[List[A]].map(_.asJson).asJson + JaegerTag(a.key.name, "string", json.noSpaces.asJson) + } + + a.key.`type` match { + case AttributeType.Boolean => primitive[Boolean]("bool") + case AttributeType.String => primitive[String]("string") + case AttributeType.Double => primitive[Double]("float64") + case AttributeType.Long => primitive[Long]("int64") + case AttributeType.BooleanList => list[Boolean] + case AttributeType.StringList => list[String] + case AttributeType.DoubleList => list[Double] + case AttributeType.LongList => list[Long] + } + } + + private def findTrace(traceIdHex: String): IO[JaegerResponse] = + EmberClientBuilder.default[IO].build.use { client => + import org.http4s.syntax.literals._ + import org.http4s.circe.CirceEntityCodec._ + import io.circe.generic.auto._ + + val url = uri"http://localhost:16686" / "api" / "traces" / traceIdHex + client.expect[JaegerResponse](url) + } + + override protected def scalaCheckTestParameters: Test.Parameters = + super.scalaCheckTestParameters + .withMinSuccessfulTests(10) + .withMaxSize(10) + +} + +object OtlpHttpSpanExporterSuite { + case class JaegerRef(refType: String, traceID: String, spanID: String) + case class JaegerTag(key: String, `type`: String, value: Json) + case class JaegerLog(timestamp: Long) + + case class JaegerSpan( + traceID: String, + spanID: String, + operationName: String, + references: List[JaegerRef], + startTime: Long, + duration: Long, + tags: List[JaegerTag], + logs: List[JaegerLog], + processID: String + ) + + case class JaegerTrace(traceID: String, spans: List[JaegerSpan]) + case class JaegerResponse(data: List[JaegerTrace]) + +} diff --git a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/Gens.scala b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/Gens.scala index 1804ff02a..830bcb059 100644 --- a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/Gens.scala +++ b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/Gens.scala @@ -37,7 +37,7 @@ import scodec.bits.ByteVector object Gens { private val nonEmptyString: Gen[String] = - Arbitrary.arbitrary[String].suchThat(_.nonEmpty) + Gen.alphaNumStr.suchThat(_.nonEmpty) val attribute: Gen[Attribute[_]] = { implicit val stringArb: Arbitrary[String] =