From f92bbe0b92243dea0d0990ffbbf9c486651a0c95 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Fri, 29 Dec 2023 20:21:53 +0200 Subject: [PATCH] sdk-exporter: add `OtlpHttpSpanExporter` --- .github/workflows/ci.yml | 6 +- build.sbt | 111 ++++++- flake.nix | 1 + project/DockerComposeEnvPlugin.scala | 60 ++++ project/plugins.sbt | 6 + .../otel4s/sdk/exporter/RetryPolicy.scala | 188 ++++++++++++ .../exporter/otlp/HttpPayloadEncoding.scala | 24 ++ .../sdk/exporter/otlp/OtlpHttpClient.scala | 200 +++++++++++++ .../sdk/exporter/otlp/ProtoEncoder.scala | 110 +++++++ .../sdk/exporter/RetryPolicySuite.scala | 67 +++++ .../otel4s/sdk/exporter/otlp/JsonCodecs.scala | 107 +++++++ .../sdk/exporter/otlp/ProtoEncoderSuite.scala | 197 ++++++++++++ .../otlp/metrics/MetricsProtoEncoder.scala | 242 +++++++++++++++ .../otlp/metrics/OtlpHttpMetricExporter.scala | 206 +++++++++++++ .../docker/config/otel-collector-config.yaml | 22 ++ sdk-exporter/trace/docker/docker-compose.yml | 23 ++ .../otlp/trace/ExporterSuitePlatform.scala | 28 ++ .../otlp/trace/ExporterSuitePlatform.scala | 21 ++ .../otlp/trace/ExporterSuitePlatform.scala | 25 ++ .../otlp/trace/OtlpHttpSpanExporter.scala | 198 ++++++++++++ .../otlp/trace/SpansProtoEncoder.scala | 169 +++++++++++ .../trace/OtlpHttpSpanExporterSuite.scala | 281 ++++++++++++++++++ .../exporter/otlp/trace/SpansJsonCodecs.scala | 143 +++++++++ .../otlp/trace/SpansProtoEncoderSuite.scala | 239 +++++++++++++++ .../otel4s/sdk/trace/scalacheck/Gens.scala | 21 +- 25 files changed, 2684 insertions(+), 11 deletions(-) create mode 100644 project/DockerComposeEnvPlugin.scala create mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicy.scala create mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/HttpPayloadEncoding.scala create mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpClient.scala create mode 100644 sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/ProtoEncoder.scala create mode 100644 sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicySuite.scala create mode 100644 sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/JsonCodecs.scala create mode 100644 sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ProtoEncoderSuite.scala create mode 100644 sdk-exporter/metrics/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/MetricsProtoEncoder.scala create mode 100644 sdk-exporter/metrics/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpHttpMetricExporter.scala create mode 100644 sdk-exporter/trace/docker/config/otel-collector-config.yaml create mode 100644 sdk-exporter/trace/docker/docker-compose.yml create mode 100644 sdk-exporter/trace/js/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/ExporterSuitePlatform.scala create mode 100644 sdk-exporter/trace/jvm/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/ExporterSuitePlatform.scala create mode 100644 sdk-exporter/trace/native/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/ExporterSuitePlatform.scala create mode 100644 sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporter.scala create mode 100644 sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/SpansProtoEncoder.scala create mode 100644 sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporterSuite.scala create mode 100644 sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/SpansJsonCodecs.scala create mode 100644 sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/SpansProtoEncoderSuite.scala diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e37a73804..7be5a8a59 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,6 +52,10 @@ jobs: if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false' run: sbt +update + - name: Install brew formulae (ubuntu) + if: startsWith(matrix.os, 'ubuntu') + run: /home/linuxbrew/.linuxbrew/bin/brew install s2n utf8proc + - name: Check that workflows are up to date run: sbt githubWorkflowCheck @@ -239,7 +243,7 @@ jobs: - name: Submit Dependencies uses: scalacenter/sbt-dependency-submission@v2 with: - modules-ignore: otel4s-sdk-common_native0.4_2.13 otel4s-sdk-common_native0.4_3 otel4s-benchmarks_2.13 otel4s-benchmarks_3 otel4s-examples_2.13 otel4s-examples_3 otel4s-sdk-common_sjs1_2.13 otel4s-sdk-common_sjs1_3 otel4s-sdk-trace_sjs1_2.13 otel4s-sdk-trace_sjs1_3 otel4s_2.13 otel4s_3 docs_2.13 docs_3 scalafix-output_2.13 scalafix-input_2.13 otel4s-sdk_native0.4_2.13 otel4s-sdk_native0.4_3 otel4s-sdk-common_2.13 otel4s-sdk-common_3 otel4s_2.13 otel4s_3 otel4s_2.13 otel4s_3 otel4s-sdk-trace_native0.4_2.13 otel4s-sdk-trace_native0.4_3 scalafix-tests_2.13 otel4s-sdk_sjs1_2.13 otel4s-sdk_sjs1_3 otel4s-sdk_2.13 otel4s-sdk_3 otel4s-sdk-trace_2.13 otel4s-sdk-trace_3 + modules-ignore: otel4s-sdk-exporter-common_sjs1_2.13 otel4s-sdk-exporter-common_sjs1_3 otel4s-sdk-common_native0.4_2.13 otel4s-sdk-common_native0.4_3 otel4s-benchmarks_2.13 otel4s-benchmarks_3 otel4s-examples_2.13 otel4s-examples_3 otel4s-sdk-common_sjs1_2.13 otel4s-sdk-common_sjs1_3 otel4s-sdk-exporter_2.13 otel4s-sdk-exporter_3 otel4s-sdk-trace_sjs1_2.13 otel4s-sdk-trace_sjs1_3 otel4s-sdk-exporter-common_native0.4_2.13 otel4s-sdk-exporter-common_native0.4_3 otel4s-sdk-exporter-trace_2.13 otel4s-sdk-exporter-trace_3 otel4s_2.13 otel4s_3 docs_2.13 docs_3 scalafix-output_2.13 scalafix-input_2.13 otel4s-sdk-exporter-proto_2.13 otel4s-sdk-exporter-proto_3 otel4s-sdk-exporter-proto_sjs1_2.13 otel4s-sdk-exporter-proto_sjs1_3 otel4s-sdk_native0.4_2.13 otel4s-sdk_native0.4_3 otel4s-sdk-exporter-trace_native0.4_2.13 otel4s-sdk-exporter-trace_native0.4_3 otel4s-sdk-common_2.13 otel4s-sdk-common_3 otel4s_2.13 otel4s_3 otel4s_2.13 otel4s_3 otel4s-sdk-trace_native0.4_2.13 otel4s-sdk-trace_native0.4_3 otel4s-sdk-exporter-proto_native0.4_2.13 otel4s-sdk-exporter-proto_native0.4_3 otel4s-sdk-exporter-common_2.13 otel4s-sdk-exporter-common_3 otel4s-sdk-exporter-trace_sjs1_2.13 otel4s-sdk-exporter-trace_sjs1_3 scalafix-tests_2.13 otel4s-sdk-exporter_sjs1_2.13 otel4s-sdk-exporter_sjs1_3 otel4s-sdk_sjs1_2.13 otel4s-sdk_sjs1_3 otel4s-sdk_2.13 otel4s-sdk_3 otel4s-sdk-exporter_native0.4_2.13 otel4s-sdk-exporter_native0.4_3 otel4s-sdk-trace_2.13 otel4s-sdk-trace_3 configs-ignore: test scala-tool scala-doc-tool test-internal validate-steward: diff --git a/build.sbt b/build.sbt index 2eb275fdb..10164d8f2 100644 --- a/build.sbt +++ b/build.sbt @@ -1,5 +1,3 @@ -import com.typesafe.tools.mima.core._ - ThisBuild / tlBaseVersion := "0.5" ThisBuild / organization := "org.typelevel" @@ -27,6 +25,8 @@ val Scala213 = "2.13.12" ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.1") ThisBuild / scalaVersion := Scala213 // the default Scala +ThisBuild / githubWorkflowBuildPreamble ++= nativeBrewInstallWorkflowSteps.value + val CatsVersion = "2.10.0" val CatsEffectVersion = "3.5.3" val CatsMtlVersion = "1.4.0" @@ -38,11 +38,16 @@ val MUnitScalaCheckEffectVersion = "2.0.0-M2" val OpenTelemetryVersion = "1.34.1" val OpenTelemetryInstrumentationVersion = "2.0.0" val OpenTelemetrySemConvVersion = "1.23.1-alpha" +val OpenTelemetryProtoVersion = "1.0.0-alpha" val PekkoStreamVersion = "1.0.2" val PekkoHttpVersion = "1.0.0" val PlatformVersion = "1.0.2" val ScodecVersion = "1.1.38" val VaultVersion = "3.5.0" +val Http4sVersion = "0.23.25" +val CirceVersion = "0.14.6" +val EpollcatVersion = "0.1.6" +val ScalaPBCirceVersion = "0.15.1" lazy val scalaReflectDependency = Def.settings( libraryDependencies ++= { @@ -77,6 +82,10 @@ lazy val root = tlCrossRootProject `sdk-common`, `sdk-trace`, sdk, + `sdk-exporter-common`, + `sdk-exporter-proto`, + `sdk-exporter-trace`, + `sdk-exporter`, `testkit-common`, `testkit-metrics`, testkit, @@ -210,6 +219,101 @@ lazy val sdk = crossProject(JVMPlatform, JSPlatform, NativePlatform) ) .settings(scalafixSettings) +// +// SDK exporter +// + +lazy val `sdk-exporter-proto` = + crossProject(JVMPlatform, JSPlatform, NativePlatform) + .crossType(CrossType.Pure) + .enablePlugins(NoPublishPlugin) + .in(file("sdk-exporter/proto")) + .settings( + name := "otel4s-sdk-exporter-proto", + Compile / PB.protoSources += baseDirectory.value.getParentFile / "src" / "main" / "protobuf", + Compile / PB.targets ++= Seq( + scalapb.gen(grpc = false) -> (Compile / sourceManaged).value / "scalapb" + ), + scalacOptions := { + val opts = scalacOptions.value + if (tlIsScala3.value) opts.filterNot(_ == "-Wvalue-discard") else opts + }, + // We use open-telemetry protobuf spec to generate models + // See https://scalapb.github.io/docs/third-party-protos/#there-is-a-library-on-maven-with-the-protos-and-possibly-generated-java-code + libraryDependencies ++= Seq( + "io.opentelemetry.proto" % "opentelemetry-proto" % OpenTelemetryProtoVersion % "protobuf-src" intransitive () + ) + ) + +lazy val `sdk-exporter-common` = + crossProject(JVMPlatform, JSPlatform, NativePlatform) + .crossType(CrossType.Pure) + .in(file("sdk-exporter/common")) + .enablePlugins(NoPublishPlugin) + .dependsOn( + `sdk-common` % "compile->compile;test->test", + `sdk-exporter-proto` + ) + .settings( + name := "otel4s-sdk-exporter-common", + startYear := Some(2023), + libraryDependencies ++= Seq( + "org.http4s" %%% "http4s-ember-client" % Http4sVersion, + "org.http4s" %%% "http4s-circe" % Http4sVersion, + "io.github.scalapb-json" %%% "scalapb-circe" % ScalaPBCirceVersion, + "org.typelevel" %%% "cats-laws" % CatsVersion % Test, + "org.typelevel" %%% "discipline-munit" % MUnitDisciplineVersion % Test, + "io.circe" %%% "circe-generic" % CirceVersion % Test + ) + ) + .settings(munitDependencies) + .settings(scalafixSettings) + +lazy val `sdk-exporter-trace` = + crossProject(JVMPlatform, JSPlatform, NativePlatform) + .crossType(CrossType.Full) + .in(file("sdk-exporter/trace")) + .enablePlugins(NoPublishPlugin, DockerComposeEnvPlugin) + .dependsOn( + `sdk-exporter-common` % "compile->compile;test->test", + `sdk-trace` % "compile->compile;test->test" + ) + .settings( + name := "otel4s-sdk-exporter-trace", + startYear := Some(2023), + dockerComposeEnvFile := crossProjectBaseDirectory.value / "docker" / "docker-compose.yml" + ) + .jsSettings( + scalaJSLinkerConfig ~= (_.withESFeatures( + _.withESVersion(org.scalajs.linker.interface.ESVersion.ES2018) + )), + Test / scalaJSLinkerConfig ~= (_.withModuleKind( + ModuleKind.CommonJSModule + )) + ) + .nativeEnablePlugins(ScalaNativeBrewedConfigPlugin) + .nativeSettings( + libraryDependencies += "com.armanbilge" %%% "epollcat" % EpollcatVersion % Test, + Test / nativeBrewFormulas ++= Set("s2n", "utf8proc"), + Test / envVars ++= Map("S2N_DONT_MLOCK" -> "1") + ) + .settings(munitDependencies) + .settings(scalafixSettings) + +lazy val `sdk-exporter` = crossProject(JVMPlatform, JSPlatform, NativePlatform) + .crossType(CrossType.Pure) + .in(file("sdk-exporter/all")) + .enablePlugins(NoPublishPlugin) + .dependsOn(`sdk-exporter-common`, `sdk-exporter-trace`) + .settings( + name := "otel4s-sdk-exporter" + ) + .settings(scalafixSettings) + +// +// Testkit +// + lazy val `testkit-common` = crossProject(JVMPlatform) .crossType(CrossType.Full) .in(file("testkit/common")) @@ -428,6 +532,9 @@ lazy val unidocs = project `sdk-common`.jvm, `sdk-trace`.jvm, sdk.jvm, + `sdk-exporter-common`.jvm, + `sdk-exporter-trace`.jvm, + `sdk-exporter`.jvm, `testkit-common`.jvm, `testkit-metrics`.jvm, testkit.jvm, diff --git a/flake.nix b/flake.nix index 9dbb4d808..fd691edf5 100644 --- a/flake.nix +++ b/flake.nix @@ -21,6 +21,7 @@ jdk.package = pkgs.jdk8; nodejs.enable = true; native.enable = true; + native.libraries = [ pkgs.zlib pkgs.s2n ]; }; }; } diff --git a/project/DockerComposeEnvPlugin.scala b/project/DockerComposeEnvPlugin.scala new file mode 100644 index 000000000..c164a3a30 --- /dev/null +++ b/project/DockerComposeEnvPlugin.scala @@ -0,0 +1,60 @@ +import sbt._ +import sbt.Keys._ + +import scala.sys.process._ + +object DockerComposeEnvPlugin extends AutoPlugin { + + override def trigger = noTrigger + + object autoImport { + lazy val dockerComposeEnvStart = + taskKey[Unit]("Start local docker compose environment") + lazy val dockerComposeEnvStop = + taskKey[Unit]("Stop local docker compose environment") + lazy val dockerComposeEnvTestOpts = + taskKey[Seq[TestOption]]("Setup and cleanup options") + lazy val dockerComposeEnvFile = + settingKey[File]("The docker compose file to use") + } + + import autoImport._ + + override def projectSettings: Seq[Setting[_]] = + Seq( + dockerComposeEnvStart := { + val projectName = name.value + val file = dockerComposeEnvFile.value + val log = streams.value.log + + start(projectName, file, log) + }, + dockerComposeEnvStop := { + val projectName = name.value + val file = dockerComposeEnvFile.value + val log = streams.value.log + + stop(projectName, file, log) + }, + Test / testOptions := { + val projectName = name.value + val file = dockerComposeEnvFile.value + val log = streams.value.log + + val setup = Tests.Setup(() => start(projectName, file, log)) + + Seq(setup) + } + ) + + private def start(projectName: String, file: File, log: Logger): Unit = { + log.info(s"Project [$projectName]. Creating integration environment") + s"docker compose -f $file -p $projectName up -d".! + } + + private def stop(projectName: String, file: File, log: Logger): Unit = { + log.info(s"Project [$projectName]. Terminating integration environment") + s"docker compose -f $file -p $projectName down".! + } + +} diff --git a/project/plugins.sbt b/project/plugins.sbt index e3aea264b..7e103090d 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -7,3 +7,9 @@ addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.3.2") addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0") addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.7") addSbtPlugin("com.github.sbt" % "sbt-javaagent" % "0.1.8") +addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6") +addSbtPlugin( + "com.armanbilge" % "sbt-scala-native-config-brew-github-actions" % "0.2.0-RC1" +) + +libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.14" diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicy.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicy.scala new file mode 100644 index 000000000..165f87989 --- /dev/null +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicy.scala @@ -0,0 +1,188 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter + +import cats.Show +import cats.kernel.Hash +import cats.syntax.show._ + +import scala.concurrent.duration._ + +/** The exponential retry policy. Used by the exporters. + */ +sealed trait RetryPolicy { + + /** The max number of attempts, including the original request. + */ + def maxAttempts: Int + + /** The initial backoff duration. + */ + def initialBackoff: FiniteDuration + + /** The max backoff duration. + */ + def maxBackoff: FiniteDuration + + /** The backoff multiplier. + */ + def backoffMultiplier: Double + + override final def hashCode(): Int = + Hash[RetryPolicy].hash(this) + + override final def equals(obj: Any): Boolean = + obj match { + case other: RetryPolicy => Hash[RetryPolicy].eqv(this, other) + case _ => false + } + + override final def toString: String = + Show[RetryPolicy].show(this) +} + +object RetryPolicy { + + private object Defaults { + val MaxAttempts: Int = 5 + val InitialBackoff: FiniteDuration = 1.second + val MaxBackoff: FiniteDuration = 5.seconds + val BackoffMultiplier: Double = 1.5 + } + + private val Default = Impl( + Defaults.MaxAttempts, + Defaults.InitialBackoff, + Defaults.MaxBackoff, + Defaults.BackoffMultiplier + ) + + /** A builder of [[RetryPolicy]]. + */ + sealed trait Builder { + + /** Sets the number of maximum attempts. + * + * The default value is `5`. + * + * @note + * must be greater than `1` and less than `6`. + * + * @param max + * the number of maximum attempts to use + */ + def withMaxAttempts(max: Int): Builder + + /** Sets the initial backoff duration. + * + * The default value is `1 second`. + * + * @note + * must be greater than `0`. + * + * @param duration + * the initial backoff duration to use + */ + def withInitialBackoff(duration: FiniteDuration): Builder + + /** Sets the max backoff duration. + * + * The default value is `5 seconds`. + * + * @note + * must be greater than `0`. + * + * @param duration + * the max backoff duration to use + */ + def withMaxBackoff(duration: FiniteDuration): Builder + + /** Sets the backoff multiplier. + * + * The default value is `1.5`. + * + * @note + * must be greater than `0.0`. + * + * @param multiplier + * the backoff multiplier to use + */ + def withBackoffMultiplier(multiplier: Double): Builder + + /** Creates a [[RetryPolicy]] using the configuration of this builder. + */ + def build: RetryPolicy + } + + /** A [[RetryPolicy]] with the default configuration. + */ + def default: RetryPolicy = Default + + /** A [[Builder]] of [[RetryPolicy]] with the default configuration. + */ + def builder: Builder = Default + + implicit val retryPolicyShow: Show[RetryPolicy] = + Show.show { policy => + "RetryPolicy{" + + show"maxAttempts=${policy.maxAttempts}, " + + show"initialBackoff=${policy.initialBackoff}, " + + show"maxBackoff=${policy.maxBackoff}, " + + show"backoffMultiplier=${policy.backoffMultiplier}}" + } + + implicit val retryPolicyHash: Hash[RetryPolicy] = + Hash.by { p => + (p.maxAttempts, p.initialBackoff, p.maxBackoff, p.backoffMultiplier) + } + + private final case class Impl( + maxAttempts: Int, + initialBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + backoffMultiplier: Double + ) extends RetryPolicy + with Builder { + + def withMaxAttempts(max: Int): Builder = { + require( + max > 1 && max < 6, + "maxAttempts must be greater than 1 and less than 6" + ) + + copy(maxAttempts = max) + } + + def withInitialBackoff(duration: FiniteDuration): Builder = { + require(duration > Duration.Zero, "initialBackoff must be greater than 0") + copy(initialBackoff = duration) + } + + def withMaxBackoff(duration: FiniteDuration): Builder = { + require(duration > Duration.Zero, "maxBackoff must be greater than 0") + copy(maxBackoff = duration) + } + + def withBackoffMultiplier(multiplier: Double): Builder = { + require(multiplier > 0, "backoffMultiplier must be greater than 0") + copy(backoffMultiplier = multiplier) + } + + def build: RetryPolicy = this + } + +} diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/HttpPayloadEncoding.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/HttpPayloadEncoding.scala new file mode 100644 index 000000000..87d7344bf --- /dev/null +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/HttpPayloadEncoding.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp + +sealed trait HttpPayloadEncoding + +object HttpPayloadEncoding { + case object Json extends HttpPayloadEncoding + case object Protobuf extends HttpPayloadEncoding +} diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpClient.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpClient.scala new file mode 100644 index 000000000..1f58cb348 --- /dev/null +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/OtlpHttpClient.scala @@ -0,0 +1,200 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp + +import cats.Foldable +import cats.effect.Async +import cats.effect.Resource +import cats.effect.Temporal +import cats.effect.std.Console +import cats.effect.syntax.temporal._ +import cats.syntax.applicative._ +import cats.syntax.applicativeError._ +import cats.syntax.flatMap._ +import cats.syntax.foldable._ +import cats.syntax.functor._ +import fs2.Chunk +import fs2.compression.Compression +import fs2.io.net.Network +import fs2.io.net.tls.TLSContext +import org.http4s.EntityEncoder +import org.http4s.Header +import org.http4s.Headers +import org.http4s.HttpVersion +import org.http4s.Method +import org.http4s.Request +import org.http4s.Response +import org.http4s.Status +import org.http4s.Uri +import org.http4s.client.Client +import org.http4s.client.middleware.{RetryPolicy => HttpRetryPolicy} +import org.http4s.client.middleware.GZip +import org.http4s.client.middleware.Retry +import org.http4s.ember.client.EmberClientBuilder +import org.typelevel.ci._ +import org.typelevel.otel4s.sdk.exporter.RetryPolicy +import scalapb_circe.Printer + +import scala.concurrent.TimeoutException +import scala.concurrent.duration.FiniteDuration + +/** Exports spans via HTTP. Support `json` and `protobuf` encoding. + * + * @see + * [[https://opentelemetry.io/docs/specs/otel/protocol/exporter/]] + * + * @see + * [[https://opentelemetry.io/docs/concepts/sdk-configuration/otlp-exporter-configuration/]] + */ +private final class OtlpHttpClient[F[_]: Temporal: Console, A] private ( + client: Client[F], + config: OtlpHttpClient.Config, +)(implicit encoder: ProtoEncoder.Message[List[A]], printer: Printer) { + + private implicit val entityEncoder: EntityEncoder[F, List[A]] = + config.encoding match { + case HttpPayloadEncoding.Json => + import io.circe.Printer + import org.http4s.circe._ + jsonEncoderWithPrinter(Printer.noSpaces).contramap[List[A]] { spans => + ProtoEncoder.toJson(spans) + } + + case HttpPayloadEncoding.Protobuf => + val content = Header.Raw(ci"Content-Type", "application/x-protobuf") + EntityEncoder.simple(content) { spans => + Chunk.array(ProtoEncoder.toByteArray(spans)) + } + } + + def doExport[G[_]: Foldable](records: G[A]): F[Unit] = { + val request = + Request[F](Method.POST, config.endpoint, HttpVersion.`HTTP/1.1`) + .withEntity(records.toList) + .putHeaders(config.headers) + + client + .run(request) + .use(response => logBody(response).unlessA(response.status.isSuccess)) + .timeoutTo( + config.timeout, + Temporal[F].unit >> Temporal[F].raiseError( + new TimeoutException( + s"The export to [${config.endpoint}] has timed out after [${config.timeout}]" + ) + ) + ) + .handleErrorWith { e => + Console[F].errorln( + s"OtlpHttpClient: cannot export: ${e.getMessage}\n${e.getStackTrace.mkString("\n")}\n" + ) + } + } + + private def logBody(response: Response[F]): F[Unit] = + for { + body <- response.bodyText.compile.string + _ <- Console[F].println( + s"[OtlpHttpClient/${config.encoding}] the request failed with [${response.status}]. Body: $body" + ) + } yield () + + override def toString: String = { + val headers = config.headers.mkString( + "headers={", + ",", + "}", + Headers.SensitiveHeaders + ) + + "OtlpHttpClient{" + + s"encoding=${config.encoding}, " + + s"endpoint=${config.endpoint}, " + + s"timeout=${config.timeout}, " + + s"gzipCompression=${config.gzipCompression}, " + + headers + + "}" + } +} + +private[otlp] object OtlpHttpClient { + + private final case class Config( + encoding: HttpPayloadEncoding, + endpoint: Uri, + timeout: FiniteDuration, + headers: Headers, + gzipCompression: Boolean + ) + + def create[F[_]: Async: Network: Compression: Console, A]( + encoding: HttpPayloadEncoding, + endpoint: Uri, + timeout: FiniteDuration, + headers: Headers, + gzipCompression: Boolean, + retryPolicy: RetryPolicy, + tlsContext: Option[TLSContext[F]] + )(implicit + encoder: ProtoEncoder.Message[List[A]], + printer: Printer + ): Resource[F, OtlpHttpClient[F, A]] = { + val config = Config(encoding, endpoint, timeout, headers, gzipCompression) + + val builder = EmberClientBuilder + .default[F] + .withTimeout(config.timeout) + + val gzip: Client[F] => Client[F] = + if (gzipCompression) GZip[F]() else identity + + def backoff(attempt: Int): Option[FiniteDuration] = + Option.when(attempt < retryPolicy.maxAttempts) { + val next = + retryPolicy.initialBackoff * attempt.toLong * retryPolicy.backoffMultiplier + + val delay = + next.min(retryPolicy.maxBackoff) + + delay match { + case f: FiniteDuration => f + case _ => retryPolicy.maxBackoff + } + } + + // see https://opentelemetry.io/docs/specs/otlp/#failures-1 + val retryableCodes = Set( + Status.TooManyRequests, + Status.BadGateway, + Status.ServiceUnavailable, + Status.GatewayTimeout + ) + + def shouldRetry(result: Either[Throwable, Response[F]]): Boolean = + result match { + case Left(_) => true + case Right(response) => retryableCodes.contains(response.status) + } + + val policy = HttpRetryPolicy[F](backoff, (_, res) => shouldRetry(res)) + + for { + client <- tlsContext.foldLeft(builder)(_.withTLSContext(_)).build + } yield new OtlpHttpClient[F, A](Retry(policy)(gzip(client)), config) + } + +} diff --git a/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/ProtoEncoder.scala b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/ProtoEncoder.scala new file mode 100644 index 000000000..0a1d9ef6b --- /dev/null +++ b/sdk-exporter/common/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/ProtoEncoder.scala @@ -0,0 +1,110 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s +package sdk +package exporter.otlp + +import io.circe.Json +import io.opentelemetry.proto.common.v1.common.{ + InstrumentationScope => ScopeProto +} +import io.opentelemetry.proto.common.v1.common.AnyValue +import io.opentelemetry.proto.common.v1.common.ArrayValue +import io.opentelemetry.proto.common.v1.common.KeyValue +import io.opentelemetry.proto.resource.v1.resource.{Resource => ResourceProto} +import org.typelevel.otel4s.sdk.common.InstrumentationScope +import scalapb.GeneratedMessage +import scalapb_circe.Printer + +/** @see + * [[https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/common/v1/common.proto]] + */ +private[otlp] trait ProtoEncoder[-A, +P] { + def encode(a: A): P +} + +private[otlp] object ProtoEncoder { + + type Message[A] = ProtoEncoder[A, GeneratedMessage] + + def encode[A, P](a: A)(implicit ev: ProtoEncoder[A, P]): P = + ev.encode(a) + + def toByteArray[A, P <: GeneratedMessage](a: A)(implicit + ev: ProtoEncoder[A, P] + ): Array[Byte] = + ev.encode(a).toByteArray + + def toJson[A, P <: GeneratedMessage](a: A)(implicit + ev: ProtoEncoder[A, P], + printer: Printer + ): Json = + printer.toJson(ev.encode(a)) + + // a preconfigured printer, different implementations may override some internal methods + // see SpansProtoEncoder + class JsonPrinter + extends Printer( + includingDefaultValueFields = false, + formattingLongAsNumber = false, + formattingEnumsAsNumber = true + ) + + implicit val attributeEncoder: ProtoEncoder[Attribute[_], KeyValue] = { att => + import AnyValue.Value + + def primitive[A](lift: A => Value): Value = + lift(att.value.asInstanceOf[A]) + + def list[A](lift: A => Value): Value.ArrayValue = { + val list = att.value.asInstanceOf[List[A]] + Value.ArrayValue(ArrayValue(list.map(value => AnyValue(lift(value))))) + } + + val value = att.key.`type` match { + case AttributeType.Boolean => primitive[Boolean](Value.BoolValue(_)) + case AttributeType.Double => primitive[Double](Value.DoubleValue(_)) + case AttributeType.String => primitive[String](Value.StringValue(_)) + case AttributeType.Long => primitive[Long](Value.IntValue(_)) + case AttributeType.BooleanList => list[Boolean](Value.BoolValue(_)) + case AttributeType.DoubleList => list[Double](Value.DoubleValue(_)) + case AttributeType.StringList => list[String](Value.StringValue(_)) + case AttributeType.LongList => list[Long](Value.IntValue(_)) + } + + KeyValue(att.key.name, Some(AnyValue(value))) + } + + implicit val attributesEncoder: ProtoEncoder[Attributes, Seq[KeyValue]] = { + attr => attr.toSeq.map(a => encode[Attribute[_], KeyValue](a)) + } + + implicit val telemetryResourceEncoder + : ProtoEncoder[TelemetryResource, ResourceProto] = { resource => + ResourceProto(attributes = encode(resource.attributes)) + } + + implicit val instrumentationScopeEncoder + : ProtoEncoder[InstrumentationScope, ScopeProto] = { scope => + ScopeProto( + name = scope.name, + version = scope.version.getOrElse(""), + attributes = encode(scope.attributes) + ) + } + +} diff --git a/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicySuite.scala b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicySuite.scala new file mode 100644 index 000000000..6e512f724 --- /dev/null +++ b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/RetryPolicySuite.scala @@ -0,0 +1,67 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter + +import cats.Show +import cats.kernel.laws.discipline.HashTests +import cats.syntax.show._ +import munit._ +import org.scalacheck.Arbitrary +import org.scalacheck.Cogen +import org.scalacheck.Gen +import org.scalacheck.Prop + +import scala.concurrent.duration._ + +class RetryPolicySuite extends DisciplineSuite { + + private val retryPolicyGen: Gen[RetryPolicy] = + for { + maxAttempts <- Gen.chooseNum(2, 5) + initialBackoff <- Gen.chooseNum(1, 100) + maxBackoff <- Gen.chooseNum(1, 100) + multiplier <- Gen.chooseNum(0.1, 100.1) + } yield RetryPolicy.builder + .withMaxAttempts(maxAttempts) + .withInitialBackoff(initialBackoff.seconds) + .withMaxBackoff(maxBackoff.seconds) + .withBackoffMultiplier(multiplier) + .build + + private implicit val retryPolicyArbitrary: Arbitrary[RetryPolicy] = + Arbitrary(retryPolicyGen) + + private implicit val retryPolicyCogen: Cogen[RetryPolicy] = + Cogen[(Int, FiniteDuration, FiniteDuration, Double)].contramap { p => + (p.maxAttempts, p.initialBackoff, p.maxBackoff, p.backoffMultiplier) + } + + checkAll("RetryPolicy.HashLaws", HashTests[RetryPolicy].hash) + + property("Show[RetryPolicy]") { + Prop.forAll(retryPolicyGen) { policy => + val expected = "RetryPolicy{" + + show"maxAttempts=${policy.maxAttempts}, " + + show"initialBackoff=${policy.initialBackoff}, " + + show"maxBackoff=${policy.maxBackoff}, " + + show"backoffMultiplier=${policy.backoffMultiplier}}" + + assertEquals(Show[RetryPolicy].show(policy), expected) + } + } + +} diff --git a/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/JsonCodecs.scala b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/JsonCodecs.scala new file mode 100644 index 000000000..02486c58d --- /dev/null +++ b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/JsonCodecs.scala @@ -0,0 +1,107 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s +package sdk +package exporter.otlp + +import io.circe.Encoder +import io.circe.Json +import io.circe.syntax._ +import org.typelevel.otel4s.sdk.common.InstrumentationScope + +trait JsonCodecs { + + implicit val attributeEncoder: Encoder[Attribute[_]] = + Encoder.instance { attribute => + Json.obj( + "key" := attribute.key.name, + "value" := Json.obj( + attributeTypeName(attribute.key.`type`) := attributeValue( + attribute.key.`type`, + attribute.value + ) + ) + ) + } + + implicit val attributesEncoder: Encoder[Attributes] = + Encoder[List[Attribute[_]]].contramap(_.toList) + + implicit val resourceEncoder: Encoder[TelemetryResource] = + Encoder.instance { resource => + Json + .obj( + "attributes" := resource.attributes + ) + .dropEmptyValues + } + + implicit val instrumentationScopeEncoder: Encoder[InstrumentationScope] = + Encoder.instance { scope => + Json + .obj( + "name" := scope.name, + "version" := scope.version, + "attributes" := scope.attributes + ) + .dropNullValues + .dropEmptyValues + } + + private def attributeValue( + attributeType: AttributeType[_], + value: Any + ): Json = { + def primitive[A: Encoder]: Json = + Encoder[A].apply(value.asInstanceOf[A]) + + def list[A: Encoder](attributeType: AttributeType[A]): Json = { + val typeName = attributeTypeName(attributeType) + val list = value.asInstanceOf[List[A]] + Json.obj("values" := list.map(value => Json.obj(typeName := value))) + } + + implicit val longEncoder: Encoder[Long] = + Encoder[String].contramap(_.toString) + + attributeType match { + case AttributeType.Boolean => primitive[Boolean] + case AttributeType.Double => primitive[Double] + case AttributeType.String => primitive[String] + case AttributeType.Long => primitive[Long] + case AttributeType.BooleanList => list[Boolean](AttributeType.Boolean) + case AttributeType.DoubleList => list[Double](AttributeType.Double) + case AttributeType.StringList => list[String](AttributeType.String) + case AttributeType.LongList => list[Long](AttributeType.Long) + } + } + + private def attributeTypeName(attributeType: AttributeType[_]): String = + attributeType match { + case AttributeType.Boolean => "boolValue" + case AttributeType.Double => "doubleValue" + case AttributeType.String => "stringValue" + case AttributeType.Long => "intValue" + case AttributeType.BooleanList => "arrayValue" + case AttributeType.DoubleList => "arrayValue" + case AttributeType.StringList => "arrayValue" + case AttributeType.LongList => "arrayValue" + } + +} + +object JsonCodecs extends JsonCodecs diff --git a/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ProtoEncoderSuite.scala b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ProtoEncoderSuite.scala new file mode 100644 index 000000000..0ad4a12e1 --- /dev/null +++ b/sdk-exporter/common/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/ProtoEncoderSuite.scala @@ -0,0 +1,197 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp + +import io.circe.Encoder +import io.circe.Json +import io.circe.syntax._ +import io.opentelemetry.proto.common.v1.common.KeyValue +import munit.ScalaCheckSuite +import org.scalacheck.Arbitrary +import org.scalacheck.Prop +import org.scalacheck.Test +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeType +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.sdk.TelemetryResource +import org.typelevel.otel4s.sdk.common.InstrumentationScope +import org.typelevel.otel4s.sdk.scalacheck.Arbitraries._ +import scalapb_circe.Printer + +class ProtoEncoderSuite extends ScalaCheckSuite { + import JsonCodecs._ + + private implicit val jsonPrinter: Printer = + new ProtoEncoder.JsonPrinter + + private implicit val keyValueEncoder: ProtoEncoder[KeyValue, KeyValue] = + kv => kv + + test("encode Attribute[_]") { + Prop.forAll(Arbitrary.arbitrary[Attribute[_]]) { attribute => + val value = attribute.value + + def list[A: Encoder](typeName: String): Json = { + val list = value.asInstanceOf[List[A]] + Json.obj("values" := list.map(value => Json.obj(typeName := value))) + } + + implicit val longEncoder: Encoder[Long] = + Encoder[String].contramap[Long](_.toString) + + val expected = { + val v = attribute.key.`type` match { + case AttributeType.Boolean => + Json.obj("boolValue" := value.asInstanceOf[Boolean]) + case AttributeType.Double => + Json.obj("doubleValue" := value.asInstanceOf[Double]) + case AttributeType.String => + Json.obj("stringValue" := value.asInstanceOf[String]) + case AttributeType.Long => + Json.obj("intValue" := value.asInstanceOf[Long]) + case AttributeType.BooleanList => + Json.obj("arrayValue" := list[Boolean]("boolValue")) + case AttributeType.DoubleList => + Json.obj("arrayValue" := list[Double]("doubleValue")) + case AttributeType.StringList => + Json.obj("arrayValue" := list[String]("stringValue")) + case AttributeType.LongList => + Json.obj("arrayValue" := list[Long]("intValue")) + } + + Json.obj("key" := attribute.key.name, "value" := v) + } + + assertEquals( + ProtoEncoder.toJson( + ProtoEncoder.attributeEncoder.encode(attribute) + ), + expected + ) + } + } + + test("encode Attribute[_] (noSpaces)") { + assertEquals( + ProtoEncoder.toJson(Attribute("string", "value")).noSpaces, + """{"key":"string","value":{"stringValue":"value"}}""" + ) + + assertEquals( + ProtoEncoder.toJson(Attribute("string_list", List("a", "b"))).noSpaces, + """{"key":"string_list","value":{"arrayValue":{"values":[{"stringValue":"a"},{"stringValue":"b"}]}}}""" + ) + + assertEquals( + ProtoEncoder.toJson(Attribute("boolean", true)).noSpaces, + """{"key":"boolean","value":{"boolValue":true}}""" + ) + + assertEquals( + ProtoEncoder + .toJson(Attribute("boolean_list", List(true, false))) + .noSpaces, + """{"key":"boolean_list","value":{"arrayValue":{"values":[{"boolValue":true},{"boolValue":false}]}}}""" + ) + + assertEquals( + ProtoEncoder.toJson(Attribute("int+", 1L)).noSpaces, + """{"key":"int+","value":{"intValue":"1"}}""" + ) + + assertEquals( + ProtoEncoder.toJson(Attribute("int-", -1L)).noSpaces, + """{"key":"int-","value":{"intValue":"-1"}}""" + ) + + assertEquals( + ProtoEncoder.toJson(Attribute("int_list", List(1L, -1L))).noSpaces, + """{"key":"int_list","value":{"arrayValue":{"values":[{"intValue":"1"},{"intValue":"-1"}]}}}""" + ) + + assertEquals( + ProtoEncoder.toJson(Attribute("double+", 1.1)).noSpaces, + """{"key":"double+","value":{"doubleValue":1.1}}""" + ) + + assertEquals( + ProtoEncoder.toJson(Attribute("double-", -1.1)).noSpaces, + """{"key":"double-","value":{"doubleValue":-1.1}}""" + ) + + assertEquals( + ProtoEncoder.toJson(Attribute("double_list", List(1.1, -1.1))).noSpaces, + """{"key":"double_list","value":{"arrayValue":{"values":[{"doubleValue":1.1},{"doubleValue":-1.1}]}}}""" + ) + } + + test("encode TelemetryResource") { + Prop.forAll(Arbitrary.arbitrary[TelemetryResource]) { resource => + val expected = Json + .obj( + "attributes" := Encoder[Attributes].apply(resource.attributes) + ) + .dropEmptyValues + + assertEquals(ProtoEncoder.toJson(resource), expected) + } + } + + test("encode InstrumentationScope") { + Prop.forAll(Arbitrary.arbitrary[InstrumentationScope]) { scope => + val expected = Json + .obj( + "name" := scope.name, + "version" := scope.version, + "attributes" := scope.attributes + ) + .dropNullValues + .dropEmptyValues + + assertEquals(ProtoEncoder.toJson(scope), expected) + } + } + + test("encode InstrumentationScope (noSpaces)") { + val attrs = Attributes(Attribute("key", "value")) + + val scope1 = InstrumentationScope("name", None, None, Attributes.empty) + + val scope2 = InstrumentationScope( + "name", + Some("version"), + Some("schema"), + attrs + ) + + assertEquals( + ProtoEncoder.toJson(scope1).noSpaces, + """{"name":"name"}""" + ) + + assertEquals( + ProtoEncoder.toJson(scope2).noSpaces, + """{"name":"name","version":"version","attributes":[{"key":"key","value":{"stringValue":"value"}}]}""" + ) + } + + override protected def scalaCheckTestParameters: Test.Parameters = + super.scalaCheckTestParameters + .withMinSuccessfulTests(100) + .withMaxSize(100) + +} diff --git a/sdk-exporter/metrics/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/MetricsProtoEncoder.scala b/sdk-exporter/metrics/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/MetricsProtoEncoder.scala new file mode 100644 index 000000000..081862f08 --- /dev/null +++ b/sdk-exporter/metrics/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/MetricsProtoEncoder.scala @@ -0,0 +1,242 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s +package sdk +package exporter.otlp.metrics + +import io.opentelemetry.proto.collector.metrics.v1.metrics_service.ExportMetricsServiceRequest +import io.opentelemetry.proto.metrics.v1.{metrics => Proto} +import io.opentelemetry.proto.metrics.v1.metrics.ResourceMetrics +import io.opentelemetry.proto.metrics.v1.metrics.ScopeMetrics +import org.typelevel.otel4s.sdk.exporter.otlp.ProtoEncoder +import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality +import org.typelevel.otel4s.sdk.metrics.data.Data +import org.typelevel.otel4s.sdk.metrics.data.ExemplarData +import org.typelevel.otel4s.sdk.metrics.data.MetricData +import scalapb_circe.Printer + +/** @see + * [[https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/common/v1/common.proto]] + */ +private object MetricsProtoEncoder { + implicit val jsonPrinter: Printer = new ProtoEncoder.JsonPrinter + + implicit val aggregationTemporalityEncoder: ProtoEncoder[ + AggregationTemporality, + Proto.AggregationTemporality + ] = { + case AggregationTemporality.Delta => + Proto.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA + case AggregationTemporality.Cumulative => + Proto.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE + } + + implicit val exemplarEncoder: ProtoEncoder[ExemplarData, Proto.Exemplar] = { + exemplar => + Proto.Exemplar( + ProtoEncoder.encode(exemplar.filteredAttributes), + exemplar.timestamp.toNanos, + exemplar match { + case ExemplarData.LongExemplar(_, _, value) => + Proto.Exemplar.Value.AsInt(value) + case ExemplarData.DoubleExemplar(_, _, value) => + Proto.Exemplar.Value.AsDouble(value) + } + ) + } + + implicit val dataEncoder: ProtoEncoder[Data, Proto.Metric.Data] = { + case Data.DoubleSum(points, isMonotonic, aggregationTemporality) => + Proto.Metric.Data.Sum( + Proto.Sum( + points.map(p => + Proto.NumberDataPoint( + ProtoEncoder.encode(p.attributes), + p.startTimestamp.toNanos, + p.timestamp.toNanos, + value = Proto.NumberDataPoint.Value.AsDouble(p.value), + exemplars = p.exemplars.map(ProtoEncoder.encode(_)), + ) + ), + ProtoEncoder.encode(aggregationTemporality), + isMonotonic + ) + ) + + case Data.LongSum(points, isMonotonic, aggregationTemporality) => + Proto.Metric.Data.Sum( + Proto.Sum( + points.map(p => + Proto.NumberDataPoint( + ProtoEncoder.encode(p.attributes), + p.startTimestamp.toNanos, + p.timestamp.toNanos, + value = Proto.NumberDataPoint.Value.AsInt(p.value), + exemplars = p.exemplars.map(ProtoEncoder.encode(_)), + ) + ), + ProtoEncoder.encode(aggregationTemporality), + isMonotonic + ) + ) + + case Data.DoubleGauge(points) => + Proto.Metric.Data.Gauge( + Proto.Gauge( + points.map(p => + Proto.NumberDataPoint( + ProtoEncoder.encode(p.attributes), + p.startTimestamp.toNanos, + p.timestamp.toNanos, + value = Proto.NumberDataPoint.Value.AsDouble(p.value), + exemplars = p.exemplars.map(ProtoEncoder.encode(_)), + ) + ) + ) + ) + case Data.LongGauge(points) => + Proto.Metric.Data.Gauge( + Proto.Gauge( + points.map(p => + Proto.NumberDataPoint( + ProtoEncoder.encode(p.attributes), + p.startTimestamp.toNanos, + p.timestamp.toNanos, + value = Proto.NumberDataPoint.Value.AsInt(p.value), + exemplars = p.exemplars.map(ProtoEncoder.encode(_)), + ) + ) + ) + ) + + case Data.Summary(points) => + Proto.Metric.Data.Summary( + Proto.Summary( + points.map(p => + Proto.SummaryDataPoint( + ProtoEncoder.encode(p.attributes), + p.startTimestamp.toNanos, + p.timestamp.toNanos, + p.count, + p.sum, + p.percentileValues.map { q => + Proto.SummaryDataPoint + .ValueAtQuantile(q.quantile, q.value) + } + ) + ) + ) + ) + + case Data.Histogram(points, aggregationTemporality) => + Proto.Metric.Data.Histogram( + Proto.Histogram( + points.map(p => + Proto.HistogramDataPoint( + attributes = ProtoEncoder.encode(p.attributes), + startTimeUnixNano = p.startTimestamp.toNanos, + timeUnixNano = p.timestamp.toNanos, + count = p.count, + sum = p.sum, + bucketCounts = p.counts, + explicitBounds = p.boundaries, + exemplars = p.exemplars.map(ProtoEncoder.encode(_)), + min = p.min, + max = p.max + ) + ), + ProtoEncoder.encode(aggregationTemporality) + ) + ) + + case Data.ExponentialHistogram(points, aggregationTemporality) => + Proto.Metric.Data.ExponentialHistogram( + Proto.ExponentialHistogram( + points.map(p => + Proto + .ExponentialHistogramDataPoint( + attributes = ProtoEncoder.encode(p.attributes), + startTimeUnixNano = p.startTimestamp.toNanos, + timeUnixNano = p.timestamp.toNanos, + count = p.count, + sum = Some(p.sum), + scale = 0, // todo scale + zeroCount = p.zeroCount, + positive = Some( + Proto.ExponentialHistogramDataPoint.Buckets( + p.positiveBuckets.offset, + p.positiveBuckets.bucketCounts + ) + ), + negative = Some( + Proto.ExponentialHistogramDataPoint.Buckets( + p.negativeBuckets.offset, + p.negativeBuckets.bucketCounts + ) + ), + exemplars = p.exemplars.map(ProtoEncoder.encode(_)), + min = Some(p.min), + max = Some(p.max), + // zeroThreshold = , // todo? + ) + ), + ProtoEncoder.encode(aggregationTemporality) + ) + ) + } + + implicit val metricDataEncoder: ProtoEncoder[MetricData, Proto.Metric] = { + metric => + Proto.Metric( + metric.name, + metric.description.getOrElse(""), + unit = metric.unit.getOrElse(""), + data = ProtoEncoder.encode(metric.data) + ) + } + + implicit val exportMetricsRequest + : ProtoEncoder[List[MetricData], ExportMetricsServiceRequest] = { + metrics => + val resourceSpans = + metrics + .groupBy(_.resource) + .map { case (resource, resourceSpans) => + val scopeSpans: List[ScopeMetrics] = + resourceSpans + .groupBy(_.instrumentationScope) + .map { case (scope, spans) => + ScopeMetrics( + scope = Some(ProtoEncoder.encode(scope)), + metrics = spans.map(metric => ProtoEncoder.encode(metric)), + schemaUrl = scope.schemaUrl.getOrElse("") + ) + } + .toList + + ResourceMetrics( + Some(ProtoEncoder.encode(resource)), + scopeSpans, + resource.schemaUrl.getOrElse("") + ) + } + .toList + + ExportMetricsServiceRequest(resourceSpans) + } + +} diff --git a/sdk-exporter/metrics/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpHttpMetricExporter.scala b/sdk-exporter/metrics/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpHttpMetricExporter.scala new file mode 100644 index 000000000..90f37418a --- /dev/null +++ b/sdk-exporter/metrics/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/metrics/OtlpHttpMetricExporter.scala @@ -0,0 +1,206 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter +package otlp +package metrics + +import cats.Applicative +import cats.Foldable +import cats.effect.Async +import cats.effect.Resource +import cats.effect.std.Console +import fs2.compression.Compression +import fs2.io.net.Network +import fs2.io.net.tls.TLSContext +import org.http4s.Headers +import org.http4s.ProductId +import org.http4s.Uri +import org.http4s.headers.`User-Agent` +import org.http4s.syntax.literals._ +import org.typelevel.otel4s.sdk.BuildInfo +import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality +import org.typelevel.otel4s.sdk.metrics.data.MetricData +import org.typelevel.otel4s.sdk.metrics.exporter.MetricExporter +import org.typelevel.otel4s.sdk.metrics.internal.InstrumentType + +import scala.concurrent.duration._ + +/** Exports spans via HTTP. Support `json` and `protobuf` encoding. + * + * @see + * [[https://opentelemetry.io/docs/specs/otel/protocol/exporter/]] + * + * @see + * [[https://opentelemetry.io/docs/concepts/sdk-configuration/otlp-exporter-configuration/]] + */ +private final class OtlpHttpMetricExporter[F[_]: Applicative] private ( + client: OtlpHttpExporter[F, MetricData], +) extends MetricExporter[F] { + + val name: String = s"OtlpHttpMetricExporter{client=$client}" + + def aggregationTemporality( + instrumentType: InstrumentType + ): AggregationTemporality = ??? + + def exportMetrics[G[_]: Foldable](metrics: G[MetricData]): F[Unit] = + client.doExport(metrics) + + def flush: F[Unit] = Applicative[F].unit + +} + +object OtlpHttpMetricExporter { + + private object Defaults { + val Endpoint: Uri = uri"http://localhost:4318/v1/metrics" + val Timeout: FiniteDuration = 10.seconds + val GzipCompression: Boolean = false + val UserAgentName: String = "OTel-OTLP-Exporter-Scala-Otel4s" + } + + /** A builder of [[OtlpHttpMetricExporter]] */ + sealed trait Builder[F[_]] { + + /** Sets the OTLP endpoint to connect to. + * + * The endpoint must start with either `http://` or `https://`, and include + * the full HTTP path. + * + * Default value is `http://localhost:4318/v1/metrics`. + */ + def withEndpoint(endpoint: Uri): Builder[F] + + /** Sets the maximum time to wait for the collector to process an exported + * batch of spans. + * + * Default value is `10 seconds`. + */ + def withTimeout(timeout: FiniteDuration): Builder[F] + + /** Enables Gzip compression. + * + * The compression is disabled by default. + */ + def withGzip: Builder[F] + + /** Disables Gzip compression. */ + def withoutGzip: Builder[F] + + /** Adds headers to requests. */ + def addHeaders(headers: Headers): Builder[F] + + /** Sets the explicit TLS context the HTTP client should use. + */ + def withTLSContext(context: TLSContext[F]): Builder[F] + + /** Sets the retry policy to use. + * + * Default retry policy is [[RetryPolicy.default]]. + */ + def withRetryPolicy(policy: RetryPolicy): Builder[F] + + /** Configures the exporter to use the given encoding. + * + * Default encoding is `Protobuf`. + */ + def withEncoding(encoding: HttpPayloadEncoding): Builder[F] + + /** Creates a [[OtlpHttpMetricExporter]] using the configuration of this + * builder. + */ + def build: Resource[F, MetricExporter[F]] + } + + /** Creates a [[Builder]] of [[OtlpHttpMetricExporter]] with the default + * configuration: + * - encoding: `Protobuf` + * - endpoint: `http://localhost:4318/v1/metrics` + * - timeout: `10 seconds` + * - retry policy: 5 exponential attempts, initial backoff is `1 second`, + * max backoff is `5 seconds` + */ + def builder[F[_]: Async: Network: Compression: Console]: Builder[F] = + BuilderImpl( + encoding = HttpPayloadEncoding.Protobuf, + endpoint = Defaults.Endpoint, + gzipCompression = Defaults.GzipCompression, + timeout = Defaults.Timeout, + headers = Headers( + `User-Agent`( + ProductId(Defaults.UserAgentName, version = Some(BuildInfo.version)) + ) + ), + retryPolicy = RetryPolicy.default, + tlsContext = None + ) + + private final case class BuilderImpl[ + F[_]: Async: Network: Compression: Console + ]( + encoding: HttpPayloadEncoding, + endpoint: Uri, + gzipCompression: Boolean, + timeout: FiniteDuration, + headers: Headers, + retryPolicy: RetryPolicy, + tlsContext: Option[TLSContext[F]] + ) extends Builder[F] { + + def withTimeout(timeout: FiniteDuration): Builder[F] = + copy(timeout = timeout) + + def withEndpoint(endpoint: Uri): Builder[F] = + copy(endpoint = endpoint) + + def addHeaders(headers: Headers): Builder[F] = + copy(headers = this.headers ++ headers) + + def withGzip: Builder[F] = + copy(gzipCompression = true) + + def withoutGzip: Builder[F] = + copy(gzipCompression = false) + + def withTLSContext(context: TLSContext[F]): Builder[F] = + copy(tlsContext = Some(context)) + + def withRetryPolicy(policy: RetryPolicy): Builder[F] = + copy(retryPolicy = policy) + + def withEncoding(encoding: HttpPayloadEncoding): Builder[F] = + copy(encoding = encoding) + + def build: Resource[F, MetricExporter[F]] = { + import MetricsProtoEncoder.exportMetricsRequest + import MetricsProtoEncoder.jsonPrinter + + for { + client <- OtlpHttpExporter.create[F, MetricData]( + encoding, + endpoint, + timeout, + headers, + gzipCompression, + retryPolicy, + tlsContext + ) + } yield new OtlpHttpMetricExporter[F](client) + } + } + +} diff --git a/sdk-exporter/trace/docker/config/otel-collector-config.yaml b/sdk-exporter/trace/docker/config/otel-collector-config.yaml new file mode 100644 index 000000000..aa03de5d1 --- /dev/null +++ b/sdk-exporter/trace/docker/config/otel-collector-config.yaml @@ -0,0 +1,22 @@ +receivers: + otlp: + protocols: # enable OpenTelemetry Protocol receiver, both gRPC and HTTP + grpc: + http: + +exporters: + otlp/jaeger: # export received traces to Jaeger + endpoint: jaeger:4317 # grpc endpoint + tls: + insecure: true + +processors: + batch: + timeout: 0 # send data immediately + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [otlp/jaeger] \ No newline at end of file diff --git a/sdk-exporter/trace/docker/docker-compose.yml b/sdk-exporter/trace/docker/docker-compose.yml new file mode 100644 index 000000000..a03347a50 --- /dev/null +++ b/sdk-exporter/trace/docker/docker-compose.yml @@ -0,0 +1,23 @@ +version: '3.7' +services: + otel-collector: # receives application metrics and traces via gRPC or HTTP protocol + image: otel/opentelemetry-collector-contrib:0.91.0 + command: [ --config=/etc/otel-collector-config.yaml ] + volumes: + - "./config/otel-collector-config.yaml:/etc/otel-collector-config.yaml" + ports: + - "4317:4317" # OTLP gRPC receiver + - "4318:4318" # OTLP http receiver + depends_on: + jaeger: + condition: service_healthy + + jaeger: # stores traces received from the OpenTelemetry Collector + image: jaegertracing/all-in-one:1.52 + ports: + - "16686:16686" # UI + healthcheck: + test: [ "CMD", "wget", "--spider", "-S", "http://localhost:14269/health" ] + interval: 10s + timeout: 5s + retries: 3 diff --git a/sdk-exporter/trace/js/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/ExporterSuitePlatform.scala b/sdk-exporter/trace/js/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/ExporterSuitePlatform.scala new file mode 100644 index 000000000..55fb27704 --- /dev/null +++ b/sdk-exporter/trace/js/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/ExporterSuitePlatform.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp.trace + +import cats.effect.IO +import fs2.compression.Compression +import munit.CatsEffectSuite + +trait ExporterSuitePlatform { self: CatsEffectSuite => + + implicit val compression: Compression[IO] = + fs2.io.compression.fs2ioCompressionForIO + +} diff --git a/sdk-exporter/trace/jvm/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/ExporterSuitePlatform.scala b/sdk-exporter/trace/jvm/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/ExporterSuitePlatform.scala new file mode 100644 index 000000000..fc5a118d6 --- /dev/null +++ b/sdk-exporter/trace/jvm/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/ExporterSuitePlatform.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp.trace + +import munit.CatsEffectSuite + +trait ExporterSuitePlatform { self: CatsEffectSuite => } diff --git a/sdk-exporter/trace/native/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/ExporterSuitePlatform.scala b/sdk-exporter/trace/native/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/ExporterSuitePlatform.scala new file mode 100644 index 000000000..25a5711de --- /dev/null +++ b/sdk-exporter/trace/native/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/ExporterSuitePlatform.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter.otlp.trace + +import cats.effect.unsafe.IORuntime +import epollcat.unsafe.EpollRuntime +import munit.CatsEffectSuite + +trait ExporterSuitePlatform { self: CatsEffectSuite => + override def munitIORuntime: IORuntime = EpollRuntime.global +} diff --git a/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporter.scala b/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporter.scala new file mode 100644 index 000000000..c428f30a2 --- /dev/null +++ b/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporter.scala @@ -0,0 +1,198 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.exporter +package otlp +package trace + +import cats.Applicative +import cats.Foldable +import cats.effect.Async +import cats.effect.Resource +import cats.effect.std.Console +import fs2.compression.Compression +import fs2.io.net.Network +import fs2.io.net.tls.TLSContext +import org.http4s.Headers +import org.http4s.ProductId +import org.http4s.Uri +import org.http4s.headers.`User-Agent` +import org.http4s.syntax.literals._ +import org.typelevel.otel4s.sdk.BuildInfo +import org.typelevel.otel4s.sdk.trace.data.SpanData +import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter + +import scala.concurrent.duration._ + +/** Exports spans via HTTP. Support `json` and `protobuf` encoding. + * + * @see + * [[https://opentelemetry.io/docs/specs/otel/protocol/exporter/]] + * + * @see + * [[https://opentelemetry.io/docs/concepts/sdk-configuration/otlp-exporter-configuration/]] + */ +private final class OtlpHttpSpanExporter[F[_]: Applicative] private ( + client: OtlpHttpClient[F, SpanData] +) extends SpanExporter[F] { + val name: String = s"OtlpHttpSpanExporter{client=$client}" + + def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] = + client.doExport(spans) + + def flush: F[Unit] = Applicative[F].unit +} + +object OtlpHttpSpanExporter { + + private object Defaults { + val Endpoint: Uri = uri"http://localhost:4318/v1/traces" + val Timeout: FiniteDuration = 10.seconds + val GzipCompression: Boolean = false + val UserAgentName: String = "OTel-OTLP-Exporter-Scala-Otel4s" + } + + /** A builder of [[OtlpHttpSpanExporter]] */ + sealed trait Builder[F[_]] { + + /** Sets the OTLP endpoint to connect to. + * + * The endpoint must start with either `http://` or `https://`, and include + * the full HTTP path. + * + * Default value is `http://localhost:4318/v1/traces`. + */ + def withEndpoint(endpoint: Uri): Builder[F] + + /** Sets the maximum time to wait for the collector to process an exported + * batch of spans. + * + * Default value is `10 seconds`. + */ + def withTimeout(timeout: FiniteDuration): Builder[F] + + /** Enables Gzip compression. + * + * The compression is disabled by default. + */ + def withGzip: Builder[F] + + /** Disables Gzip compression. */ + def withoutGzip: Builder[F] + + /** Adds headers to requests. */ + def addHeaders(headers: Headers): Builder[F] + + /** Sets the explicit TLS context the HTTP client should use. + */ + def withTLSContext(context: TLSContext[F]): Builder[F] + + /** Sets the retry policy to use. + * + * Default retry policy is [[RetryPolicy.default]]. + */ + def withRetryPolicy(policy: RetryPolicy): Builder[F] + + /** Configures the exporter to use the given encoding. + * + * Default encoding is `Protobuf`. + */ + def withEncoding(encoding: HttpPayloadEncoding): Builder[F] + + /** Creates a [[OtlpHttpSpanExporter]] using the configuration of this + * builder. + */ + def build: Resource[F, SpanExporter[F]] + } + + /** Creates a [[Builder]] of [[OtlpHttpSpanExporter]] with the default + * configuration: + * - encoding: `Protobuf` + * - endpoint: `http://localhost:4318/v1/traces` + * - timeout: `10 seconds` + * - retry policy: 5 exponential attempts, initial backoff is `1 second`, + * max backoff is `5 seconds` + */ + def builder[F[_]: Async: Network: Compression: Console]: Builder[F] = + BuilderImpl( + encoding = HttpPayloadEncoding.Protobuf, + endpoint = Defaults.Endpoint, + gzipCompression = Defaults.GzipCompression, + timeout = Defaults.Timeout, + headers = Headers( + `User-Agent`( + ProductId(Defaults.UserAgentName, version = Some(BuildInfo.version)) + ) + ), + retryPolicy = RetryPolicy.default, + tlsContext = None + ) + + private final case class BuilderImpl[ + F[_]: Async: Network: Compression: Console + ]( + encoding: HttpPayloadEncoding, + endpoint: Uri, + gzipCompression: Boolean, + timeout: FiniteDuration, + headers: Headers, + retryPolicy: RetryPolicy, + tlsContext: Option[TLSContext[F]] + ) extends Builder[F] { + + def withTimeout(timeout: FiniteDuration): Builder[F] = + copy(timeout = timeout) + + def withEndpoint(endpoint: Uri): Builder[F] = + copy(endpoint = endpoint) + + def addHeaders(headers: Headers): Builder[F] = + copy(headers = this.headers ++ headers) + + def withGzip: Builder[F] = + copy(gzipCompression = true) + + def withoutGzip: Builder[F] = + copy(gzipCompression = false) + + def withTLSContext(context: TLSContext[F]): Builder[F] = + copy(tlsContext = Some(context)) + + def withRetryPolicy(policy: RetryPolicy): Builder[F] = + copy(retryPolicy = policy) + + def withEncoding(encoding: HttpPayloadEncoding): Builder[F] = + copy(encoding = encoding) + + def build: Resource[F, SpanExporter[F]] = { + import SpansProtoEncoder.spanDataToRequest + import SpansProtoEncoder.jsonPrinter + + for { + client <- OtlpHttpClient.create[F, SpanData]( + encoding, + endpoint, + timeout, + headers, + gzipCompression, + retryPolicy, + tlsContext + ) + } yield new OtlpHttpSpanExporter[F](client) + } + } + +} diff --git a/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/SpansProtoEncoder.scala b/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/SpansProtoEncoder.scala new file mode 100644 index 000000000..2c649cd95 --- /dev/null +++ b/sdk-exporter/trace/shared/src/main/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/SpansProtoEncoder.scala @@ -0,0 +1,169 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s +package sdk +package exporter.otlp +package trace + +import com.google.protobuf.ByteString +import io.circe.Json +import io.opentelemetry.proto.collector.trace.v1.trace_service.ExportTraceServiceRequest +import io.opentelemetry.proto.trace.v1.trace.{Span => SpanProto} +import io.opentelemetry.proto.trace.v1.trace.{Status => StatusProto} +import io.opentelemetry.proto.trace.v1.trace.ResourceSpans +import io.opentelemetry.proto.trace.v1.trace.ScopeSpans +import org.typelevel.otel4s.sdk.trace.data.EventData +import org.typelevel.otel4s.sdk.trace.data.LinkData +import org.typelevel.otel4s.sdk.trace.data.SpanData +import org.typelevel.otel4s.sdk.trace.data.StatusData +import org.typelevel.otel4s.trace.SpanKind +import org.typelevel.otel4s.trace.Status +import scalapb.descriptors.FieldDescriptor +import scalapb.descriptors.PByteString +import scalapb.descriptors.PValue +import scalapb_circe.Printer +import scodec.bits.ByteVector + +/** @see + * [[https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto]] + */ +private object SpansProtoEncoder { + + implicit val jsonPrinter: Printer = new ProtoEncoder.JsonPrinter { + private val EncodeAsHex = Set("trace_id", "span_id", "parent_span_id") + + /** The `traceId` and `spanId` byte arrays are represented as + * case-insensitive hex-encoded strings; they are not base64-encoded as is + * defined in the standard Protobuf JSON Mapping. Hex encoding is used for + * traceId and spanId fields in all OTLP Protobuf messages, e.g., the Span, + * Link, LogRecord, etc. messages. + * + * @see + * [[https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/docs/specification.md#json-protobuf-encoding]] + */ + override def serializeSingleValue( + fd: FieldDescriptor, + value: PValue, + formattingLongAsNumber: Boolean + ): Json = { + value match { + case PByteString(bs) if EncodeAsHex.contains(fd.name) => + Json.fromString(ByteVector(bs.toByteArray()).toHex) + case _ => + super.serializeSingleValue(fd, value, formattingLongAsNumber) + } + } + } + + implicit val statusEncoder: ProtoEncoder[Status, StatusProto.StatusCode] = { + case Status.Unset => StatusProto.StatusCode.STATUS_CODE_UNSET + case Status.Ok => StatusProto.StatusCode.STATUS_CODE_OK + case Status.Error => StatusProto.StatusCode.STATUS_CODE_ERROR + } + + implicit val spanKindEncoder: ProtoEncoder[SpanKind, SpanProto.SpanKind] = { + case SpanKind.Internal => SpanProto.SpanKind.SPAN_KIND_INTERNAL + case SpanKind.Server => SpanProto.SpanKind.SPAN_KIND_SERVER + case SpanKind.Client => SpanProto.SpanKind.SPAN_KIND_CLIENT + case SpanKind.Producer => SpanProto.SpanKind.SPAN_KIND_PRODUCER + case SpanKind.Consumer => SpanProto.SpanKind.SPAN_KIND_CONSUMER + } + + implicit val statusDataEncoder: ProtoEncoder[StatusData, StatusProto] = { + data => + StatusProto( + message = data.description.getOrElse(""), + code = ProtoEncoder.encode(data.status) + ) + } + + implicit val eventDataEncoder: ProtoEncoder[EventData, SpanProto.Event] = { + data => + SpanProto.Event( + timeUnixNano = data.timestamp.toNanos, + name = data.name, + attributes = ProtoEncoder.encode(data.attributes) + ) + } + + implicit val linkDataEncoder: ProtoEncoder[LinkData, SpanProto.Link] = { + data => + val traceState = data.spanContext.traceState.asMap + .map { case (key, value) => s"$key=$value" } + .mkString(",") + + SpanProto.Link( + traceId = ByteString.copyFrom(data.spanContext.traceId.toArray), + spanId = ByteString.copyFrom(data.spanContext.spanId.toArray), + traceState = traceState, + attributes = ProtoEncoder.encode(data.attributes) + ) + } + + implicit val spanDataEncoder: ProtoEncoder[SpanData, SpanProto] = { span => + val traceState = span.spanContext.traceState.asMap + .map { case (key, value) => s"$key=$value" } + .mkString(",") + + SpanProto( + traceId = ByteString.copyFrom(span.spanContext.traceId.toArray), + spanId = ByteString.copyFrom(span.spanContext.spanId.toArray), + traceState = traceState, + parentSpanId = span.parentSpanContext + .map(s => ByteString.copyFrom(s.spanId.toArray)) + .getOrElse(ByteString.EMPTY), + name = span.name, + kind = ProtoEncoder.encode(span.kind), + startTimeUnixNano = span.startTimestamp.toNanos, + endTimeUnixNano = span.endTimestamp.map(_.toNanos).getOrElse(0L), + attributes = ProtoEncoder.encode(span.attributes), + events = span.events.map(event => ProtoEncoder.encode(event)), + links = span.links.map(link => ProtoEncoder.encode(link)), + status = Some(ProtoEncoder.encode(span.status)) + ) + } + + implicit val spanDataToRequest + : ProtoEncoder[List[SpanData], ExportTraceServiceRequest] = { spans => + val resourceSpans = + spans + .groupBy(_.resource) + .map { case (resource, resourceSpans) => + val scopeSpans: List[ScopeSpans] = + resourceSpans + .groupBy(_.instrumentationScope) + .map { case (scope, spans) => + ScopeSpans( + scope = Some(ProtoEncoder.encode(scope)), + spans = spans.map(span => ProtoEncoder.encode(span)), + schemaUrl = scope.schemaUrl.getOrElse("") + ) + } + .toList + + ResourceSpans( + Some(ProtoEncoder.encode(resource)), + scopeSpans, + resource.schemaUrl.getOrElse("") + ) + } + .toList + + ExportTraceServiceRequest(resourceSpans) + } + +} diff --git a/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporterSuite.scala b/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporterSuite.scala new file mode 100644 index 000000000..c0b5c07b3 --- /dev/null +++ b/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/OtlpHttpSpanExporterSuite.scala @@ -0,0 +1,281 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk +package exporter.otlp.trace + +import cats.effect.IO +import com.comcast.ip4s.IpAddress +import io.circe.Encoder +import io.circe.Json +import munit._ +import org.http4s.Headers +import org.http4s.ember.client.EmberClientBuilder +import org.http4s.headers._ +import org.http4s.syntax.literals._ +import org.scalacheck.Arbitrary +import org.scalacheck.Gen +import org.scalacheck.Test +import org.scalacheck.effect.PropF +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeType +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.sdk.exporter.RetryPolicy +import org.typelevel.otel4s.sdk.exporter.otlp.HttpPayloadEncoding +import org.typelevel.otel4s.sdk.trace.data.EventData +import org.typelevel.otel4s.sdk.trace.data.SpanData +import org.typelevel.otel4s.sdk.trace.scalacheck.Arbitraries._ +import org.typelevel.otel4s.trace.Status + +import java.util.Locale +import scala.concurrent.duration._ + +class OtlpHttpSpanExporterSuite + extends CatsEffectSuite + with ScalaCheckEffectSuite + with ExporterSuitePlatform { + + import OtlpHttpSpanExporterSuite._ + + private implicit val encodingArbitrary: Arbitrary[HttpPayloadEncoding] = + Arbitrary(Gen.oneOf(HttpPayloadEncoding.Protobuf, HttpPayloadEncoding.Json)) + + test("represent builder parameters in the name") { + PropF.forAllF { (encoding: HttpPayloadEncoding) => + val enc = encoding match { + case HttpPayloadEncoding.Json => "Json" + case HttpPayloadEncoding.Protobuf => "Protobuf" + } + + val expected = + s"OtlpHttpSpanExporter{client=OtlpHttpClient{encoding=$enc, " + + "endpoint=https://localhost:4318/api/v1/traces, " + + "timeout=5 seconds, " + + "gzipCompression=true, " + + s"headers={User-Agent: OTel-OTLP-Exporter-Scala-Otel4s/${BuildInfo.version},X-Forwarded-For: 127.0.0.1}}}" + + OtlpHttpSpanExporter + .builder[IO] + .addHeaders( + Headers(`X-Forwarded-For`(IpAddress.fromString("127.0.0.1"))) + ) + .withEndpoint(uri"https://localhost:4318/api/v1/traces") + .withTimeout(5.seconds) + .withGzip + .withEncoding(encoding) + .build + .use { exporter => + IO(assertEquals(exporter.name, expected)) + } + } + } + + test("export spans") { + PropF.forAllF { (sd: SpanData, encoding: HttpPayloadEncoding) => + IO.realTime.flatMap { now => + // we need to tweak end timestamps and attributes, so we recreate the span data + val span = SpanData( + name = sd.name, + spanContext = sd.spanContext, + parentSpanContext = sd.parentSpanContext, + kind = sd.kind, + startTimestamp = now, + endTimestamp = Some(now.plus(5.seconds)), + status = sd.status, + attributes = adaptAttributes(sd.attributes), + events = sd.events.map { event => + EventData( + event.name, + now.plus(2.seconds), + adaptAttributes(event.attributes) + ) + }, + links = sd.links, + instrumentationScope = sd.instrumentationScope, + resource = TelemetryResource.default + ) + + val expected = { + val references = { + val childOf = span.parentSpanContext.map { parent => + JaegerRef( + "CHILD_OF", + span.spanContext.traceIdHex, + parent.spanIdHex + ) + } + + val links = span.links.map { d => + JaegerRef( + "FOLLOWS_FROM", + d.spanContext.traceIdHex, + d.spanContext.spanIdHex + ) + } + + childOf.toList ::: links.toList + } + + val duration = + span.endTimestamp.getOrElse(Duration.Zero) - span.startTimestamp + + val tags = { + val extra = List( + List( + Attribute( + "span.kind", + span.kind.toString.toLowerCase(Locale.ROOT) + ) + ), + Option.when(span.status.status != Status.Unset)( + Attribute( + "otel.status_code", + span.status.status.toString.toUpperCase(Locale.ROOT) + ) + ), + Option.when(span.status.status == Status.Error)( + Attribute("error", true) + ), + span.status.description.filter(_.nonEmpty).map { description => + Attribute("otel.status_description", description) + }, + List(Attribute("internal.span.format", "otlp")) + ).flatten + + span.attributes.map(a => toJaegerTag(a)).toList ++ + extra.map(a => toJaegerTag(a)) + } + + val events = + span.events.map(d => JaegerLog(d.timestamp.toMicros)).toList + + val jaegerSpan = JaegerSpan( + span.spanContext.traceIdHex, + span.spanContext.spanIdHex, + span.name, + references, + span.startTimestamp.toMicros, + duration.toMicros, + tags, + events, + "p1" + ) + + val jaegerTrace = JaegerTrace( + span.spanContext.traceIdHex, + List(jaegerSpan) + ) + + JaegerResponse(List(jaegerTrace)) + } + + OtlpHttpSpanExporter + .builder[IO] + .withEncoding(encoding) + .withTimeout(20.seconds) + .withRetryPolicy( + RetryPolicy.builder + .withInitialBackoff(2.second) + .withMaxBackoff(20.seconds) + .build + ) + .build + .use(exporter => exporter.exportSpans(List(span))) + .flatMap { _ => + assertIO(findTrace(span.spanContext.traceIdHex), expected) + } + } + } + } + + // it's hard to deal with big numeric values due to various encoding pitfalls + // so we simplify the numbers + private def adaptAttributes(attributes: Attributes): Attributes = { + val adapted = attributes.map { attribute => + val name = attribute.key.name + attribute.key.`type` match { + case AttributeType.Double => Attribute(name, 1.1) + case AttributeType.DoubleList => Attribute(name, List(1.1)) + case AttributeType.Long => Attribute(name, 1L) + case AttributeType.LongList => Attribute(name, List(1L)) + case _ => attribute + } + } + + Attributes.fromSpecific(adapted) + } + + private def toJaegerTag(a: Attribute[_]): JaegerTag = { + import io.circe.syntax._ + + def primitive[A: Encoder](tpe: String): JaegerTag = + JaegerTag(a.key.name, tpe, a.value.asInstanceOf[A].asJson) + + def list[A: Encoder]: JaegerTag = { + val json = a.value.asInstanceOf[List[A]].map(_.asJson).asJson + JaegerTag(a.key.name, "string", json.noSpaces.asJson) + } + + a.key.`type` match { + case AttributeType.Boolean => primitive[Boolean]("bool") + case AttributeType.String => primitive[String]("string") + case AttributeType.Double => primitive[Double]("float64") + case AttributeType.Long => primitive[Long]("int64") + case AttributeType.BooleanList => list[Boolean] + case AttributeType.StringList => list[String] + case AttributeType.DoubleList => list[Double] + case AttributeType.LongList => list[Long] + } + } + + private def findTrace(traceIdHex: String): IO[JaegerResponse] = + EmberClientBuilder.default[IO].build.use { client => + import org.http4s.syntax.literals._ + import org.http4s.circe.CirceEntityCodec._ + import io.circe.generic.auto._ + + val url = uri"http://localhost:16686" / "api" / "traces" / traceIdHex + client.expect[JaegerResponse](url) + } + + override protected def scalaCheckTestParameters: Test.Parameters = + super.scalaCheckTestParameters + .withMinSuccessfulTests(10) + .withMaxSize(10) + +} + +object OtlpHttpSpanExporterSuite { + case class JaegerRef(refType: String, traceID: String, spanID: String) + case class JaegerTag(key: String, `type`: String, value: Json) + case class JaegerLog(timestamp: Long) + + case class JaegerSpan( + traceID: String, + spanID: String, + operationName: String, + references: List[JaegerRef], + startTime: Long, + duration: Long, + tags: List[JaegerTag], + logs: List[JaegerLog], + processID: String + ) + + case class JaegerTrace(traceID: String, spans: List[JaegerSpan]) + case class JaegerResponse(data: List[JaegerTrace]) + +} diff --git a/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/SpansJsonCodecs.scala b/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/SpansJsonCodecs.scala new file mode 100644 index 000000000..28a128dac --- /dev/null +++ b/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/SpansJsonCodecs.scala @@ -0,0 +1,143 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s +package sdk +package exporter.otlp +package trace + +import io.circe.Encoder +import io.circe.Json +import io.circe.syntax._ +import org.typelevel.otel4s.sdk.trace.data.EventData +import org.typelevel.otel4s.sdk.trace.data.LinkData +import org.typelevel.otel4s.sdk.trace.data.SpanData +import org.typelevel.otel4s.sdk.trace.data.StatusData +import org.typelevel.otel4s.trace.SpanKind +import org.typelevel.otel4s.trace.Status +import org.typelevel.otel4s.trace.TraceState + +// the instances mimic Protobuf encoding +private object SpansJsonCodecs extends JsonCodecs { + + implicit val spanKindJsonEncoder: Encoder[SpanKind] = + Encoder[Int].contramap { + case SpanKind.Internal => 1 + case SpanKind.Server => 2 + case SpanKind.Client => 3 + case SpanKind.Producer => 4 + case SpanKind.Consumer => 5 + } + + implicit val traceStateJsonEncoder: Encoder[TraceState] = + Encoder.instance { state => + if (state.isEmpty) + Json.Null + else + state.asMap + .map { case (key, value) => s"$key=$value" } + .mkString(",") + .asJson + } + + implicit val statusDataJsonEncoder: Encoder[StatusData] = + Encoder.instance { statusData => + val message = + statusData.description.filter(_.trim.nonEmpty).fold(Json.Null)(_.asJson) + + val code = statusData.status match { + case Status.Unset => Json.Null + case Status.Ok => 1.asJson + case Status.Error => 2.asJson + } + + Json + .obj( + "message" := message, + "code" := code + ) + .dropNullValues + } + + implicit val eventDataJsonEncoder: Encoder[EventData] = + Encoder.instance { eventData => + Json + .obj( + "timeUnixNano" := eventData.timestamp.toNanos.toString, + "name" := eventData.name, + "attributes" := eventData.attributes + ) + .dropEmptyValues + } + + implicit val linkDataJsonEncoder: Encoder[LinkData] = + Encoder.instance { link => + Json + .obj( + "traceId" := link.spanContext.traceIdHex, + "spanId" := link.spanContext.spanIdHex, + "traceState" := link.spanContext.traceState, + "attributes" := link.attributes + ) + .dropNullValues + .dropEmptyValues + } + + implicit val spanDataJsonEncoder: Encoder[SpanData] = + Encoder.instance { span => + Json + .obj( + "traceId" := span.spanContext.traceIdHex, + "spanId" := span.spanContext.spanIdHex, + "traceState" := span.spanContext.traceState, + "parentSpanId" := span.parentSpanContext.map(_.spanIdHex), + "name" := span.name, + "kind" := span.kind, + "startTimeUnixNano" := span.startTimestamp.toNanos.toString, + "endTimeUnixNano" := span.endTimestamp.map(_.toNanos.toString), + "attributes" := span.attributes, + "events" := span.events, + "links" := span.links + ) + .dropNullValues + .dropEmptyValues + .deepMerge(Json.obj("status" := span.status)) + } + + implicit val spanDataListJsonEncoder: Encoder[List[SpanData]] = + Encoder.instance { spans => + val resourceSpans = + spans.groupBy(_.resource).map { case (resource, resourceSpans) => + val scopeSpans: Iterable[Json] = + resourceSpans + .groupBy(_.instrumentationScope) + .map { case (scope, spans) => + Json.obj( + "scope" := scope, + "spans" := spans.map(Encoder[SpanData].apply) + ) + } + + Json.obj( + "resource" := resource, + "scopeSpans" := scopeSpans + ) + } + + Json.obj("resourceSpans" := resourceSpans).deepDropNullValues + } + +} diff --git a/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/SpansProtoEncoderSuite.scala b/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/SpansProtoEncoderSuite.scala new file mode 100644 index 000000000..88fb43879 --- /dev/null +++ b/sdk-exporter/trace/shared/src/test/scala/org/typelevel/otel4s/sdk/exporter/otlp/trace/SpansProtoEncoderSuite.scala @@ -0,0 +1,239 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s +package sdk +package exporter.otlp +package trace + +import io.circe.Encoder +import io.circe.Json +import io.circe.syntax._ +import munit._ +import org.scalacheck.Arbitrary +import org.scalacheck.Prop +import org.scalacheck.Test +import org.typelevel.otel4s.sdk.trace.data.EventData +import org.typelevel.otel4s.sdk.trace.data.LinkData +import org.typelevel.otel4s.sdk.trace.data.SpanData +import org.typelevel.otel4s.sdk.trace.data.StatusData +import org.typelevel.otel4s.sdk.trace.scalacheck.Arbitraries._ +import org.typelevel.otel4s.trace.SpanContext +import org.typelevel.otel4s.trace.Status +import org.typelevel.otel4s.trace.TraceFlags +import org.typelevel.otel4s.trace.TraceState +import scodec.bits.ByteVector + +import scala.concurrent.duration._ + +class SpansProtoEncoderSuite extends ScalaCheckSuite { + import SpansJsonCodecs._ + import SpansProtoEncoder._ + + test("encode StatusData") { + Prop.forAll(Arbitrary.arbitrary[StatusData]) { statusData => + val message = + statusData.description.filter(_.trim.nonEmpty).fold(Json.Null)(_.asJson) + + val code = statusData.status match { + case Status.Unset => Json.Null + case Status.Ok => 1.asJson + case Status.Error => 2.asJson + } + + val expected = Json + .obj( + "message" := message, + "code" := code + ) + .dropNullValues + + assertEquals(ProtoEncoder.toJson(statusData), expected) + } + } + + test("encode StatusData (noSpaces)") { + assertEquals( + ProtoEncoder.toJson(StatusData(Status.Unset)).noSpaces, + """{}""" + ) + + assertEquals( + ProtoEncoder.toJson(StatusData(Status.Ok)).noSpaces, + """{"code":1}""" + ) + + assertEquals( + ProtoEncoder.toJson(StatusData(Status.Error)).noSpaces, + """{"code":2}""" + ) + + assertEquals( + ProtoEncoder.toJson(StatusData(Status.Error, "description")).noSpaces, + """{"message":"description","code":2}""" + ) + } + + test("encode EventData") { + Prop.forAll(Arbitrary.arbitrary[EventData]) { eventData => + val expected = Json + .obj( + "timeUnixNano" := eventData.timestamp.toNanos.toString, + "name" := eventData.name, + "attributes" := eventData.attributes + ) + .dropNullValues + .dropEmptyValues + + assertEquals(ProtoEncoder.toJson(eventData), expected) + } + } + + test("encode EventData (noSpaces)") { + val attrs = Attributes(Attribute("key", "value")) + + assertEquals( + ProtoEncoder + .toJson(EventData("name", 1.nanos, Attributes.empty)) + .noSpaces, + """{"timeUnixNano":"1","name":"name"}""" + ) + + assertEquals( + ProtoEncoder.toJson(EventData("name", 1.nanos, attrs)).noSpaces, + """{"timeUnixNano":"1","name":"name","attributes":[{"key":"key","value":{"stringValue":"value"}}]}""" + ) + } + + test("encode LinkData") { + Prop.forAll(Arbitrary.arbitrary[LinkData]) { link => + val expected = Json + .obj( + "traceId" := link.spanContext.traceIdHex, + "spanId" := link.spanContext.spanIdHex, + "traceState" := link.spanContext.traceState, + "attributes" := link.attributes, + ) + .dropNullValues + .dropEmptyValues + + assertEquals(ProtoEncoder.toJson(link), expected) + } + } + + test("encode LinkData (noSpaces)") { + val attrs = Attributes(Attribute("key", "value")) + + val ctx = SpanContext( + ByteVector.fromValidHex("aae6750d58ff8148fa33894599afaaf2"), + ByteVector.fromValidHex("f676d76b0b3d4324"), + TraceFlags.fromByte(1), + TraceState.empty.updated("k", "v").updated("k2", "v2"), + false + ) + + val ctx2 = SpanContext( + ByteVector.fromValidHex("aae6750d58ff8148fa33894599afaaf2"), + ByteVector.fromValidHex("f676d76b0b3d4324"), + TraceFlags.Default, + TraceState.empty, + false + ) + + assertEquals( + ProtoEncoder.toJson(LinkData(ctx)).noSpaces, + """{"traceId":"aae6750d58ff8148fa33894599afaaf2","spanId":"f676d76b0b3d4324","traceState":"k2=v2,k=v"}""" + ) + + assertEquals( + ProtoEncoder.toJson(LinkData(ctx, attrs)).noSpaces, + """{"traceId":"aae6750d58ff8148fa33894599afaaf2","spanId":"f676d76b0b3d4324","traceState":"k2=v2,k=v","attributes":[{"key":"key","value":{"stringValue":"value"}}]}""" + ) + + assertEquals( + ProtoEncoder.toJson(LinkData(ctx2)).noSpaces, + """{"traceId":"aae6750d58ff8148fa33894599afaaf2","spanId":"f676d76b0b3d4324"}""" + ) + } + + test("encode SpanData") { + Prop.forAll(Arbitrary.arbitrary[SpanData]) { span => + val name = + if (span.name.trim.nonEmpty) span.name.asJson else Json.Null + + val expected = Json + .obj( + "traceId" := span.spanContext.traceIdHex, + "spanId" := span.spanContext.spanIdHex, + "traceState" := span.spanContext.traceState, + "parentSpanId" := span.parentSpanContext.map(_.spanIdHex), + "name" := name, + "kind" := span.kind, + "startTimeUnixNano" := span.startTimestamp.toNanos.toString, + "endTimeUnixNano" := span.endTimestamp.map(_.toNanos.toString), + "attributes" := span.attributes, + "events" := span.events, + "links" := span.links + ) + .dropNullValues + .dropEmptyValues + .deepMerge(Json.obj("status" := span.status)) + + assertEquals(ProtoEncoder.toJson(span), expected) + } + } + + test("encode List[SpanData]") { + Prop.forAll(Arbitrary.arbitrary[List[SpanData]]) { spans => + val resourceSpans = + spans.groupBy(_.resource).map { case (resource, resourceSpans) => + val scopeSpans: Iterable[Json] = + resourceSpans + .groupBy(_.instrumentationScope) + .map { case (scope, spans) => + Json + .obj( + "scope" := scope, + "spans" := spans.map(Encoder[SpanData].apply), + "schemaUrl" := scope.schemaUrl + ) + .dropNullValues + } + + Json + .obj( + "resource" := resource, + "scopeSpans" := scopeSpans, + "schemaUrl" := resource.schemaUrl + ) + .dropNullValues + } + + val expected = Json.obj("resourceSpans" := resourceSpans).dropEmptyValues + + assertEquals( + ProtoEncoder.toJson(spans).noSpacesSortKeys, + expected.noSpacesSortKeys + ) + } + } + + override protected def scalaCheckTestParameters: Test.Parameters = + super.scalaCheckTestParameters + .withMinSuccessfulTests(5) + .withMaxSize(5) + +} diff --git a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/scalacheck/Gens.scala b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/scalacheck/Gens.scala index 98adcbecf..08d6f9039 100644 --- a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/scalacheck/Gens.scala +++ b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/scalacheck/Gens.scala @@ -24,10 +24,15 @@ import org.typelevel.otel4s.sdk.trace.data.StatusData import org.typelevel.otel4s.sdk.trace.samplers.SamplingDecision import org.typelevel.otel4s.sdk.trace.samplers.SamplingResult +import scala.concurrent.duration._ + trait Gens extends org.typelevel.otel4s.sdk.scalacheck.Gens with org.typelevel.otel4s.trace.scalacheck.Gens { + val timestamp: Gen[FiniteDuration] = + Gen.chooseNum(1L, Long.MaxValue).map(_.nanos) + val samplingDecision: Gen[SamplingDecision] = Gen.oneOf( SamplingDecision.Drop, @@ -43,8 +48,8 @@ trait Gens val eventData: Gen[EventData] = for { - name <- Gen.alphaNumStr - epoch <- Gen.finiteDuration + name <- Gens.nonEmptyString + epoch <- Gens.timestamp attributes <- Gens.attributes } yield EventData(name, epoch, attributes) @@ -56,7 +61,7 @@ trait Gens val statusData: Gen[StatusData] = for { - description <- Gen.option(Gen.alphaNumStr) + description <- Gen.option(Gens.nonEmptyString) data <- Gen.oneOf( StatusData.Ok, StatusData.Unset, @@ -66,12 +71,12 @@ trait Gens val spanData: Gen[SpanData] = for { - name <- Gen.alphaStr + name <- Gens.nonEmptyString spanContext <- Gens.spanContext parentSpanContext <- Gen.option(Gens.spanContext) kind <- Gens.spanKind - startEpochNanos <- Gen.finiteDuration - endEpochNanos <- Gen.option(Gen.finiteDuration) + startTimestamp <- Gens.timestamp + endTimestamp <- Gen.option(Gens.timestamp) status <- Gens.statusData attributes <- Gens.attributes events <- Gen.listOf(Gens.eventData) @@ -83,8 +88,8 @@ trait Gens spanContext, parentSpanContext, kind, - startEpochNanos, - endEpochNanos, + startTimestamp, + endTimestamp, status, attributes, events.toVector,