diff --git a/fips-pom.xml b/fips-pom.xml index b2a8a957..c766ad71 100644 --- a/fips-pom.xml +++ b/fips-pom.xml @@ -46,6 +46,18 @@ 2.13.4.2 + + + + io.opentelemetry + opentelemetry-bom + 1.39.0 + pom + import + + + + org.scala-lang @@ -83,6 +95,12 @@ 1.15 + + + io.opentelemetry + opentelemetry-api + + @@ -128,6 +146,17 @@ + + io.opentelemetry + opentelemetry-sdk + test + + + io.opentelemetry + opentelemetry-exporters-inmemory + 0.9.1 + test + junit junit diff --git a/pom.xml b/pom.xml index 7ba2f364..462b6137 100644 --- a/pom.xml +++ b/pom.xml @@ -47,6 +47,17 @@ 2.13.2 2.13.4.2 + + + + io.opentelemetry + opentelemetry-bom + 1.39.0 + pom + import + + + @@ -84,7 +95,11 @@ commons-codec 1.15 - + + + io.opentelemetry + opentelemetry-api + @@ -118,6 +133,17 @@ + + io.opentelemetry + opentelemetry-sdk + test + + + io.opentelemetry + opentelemetry-exporters-inmemory + 0.9.1 + test + junit junit diff --git a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala new file mode 100644 index 00000000..c6443e2a --- /dev/null +++ b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala @@ -0,0 +1,57 @@ +package com.snowflake.snowpark.internal + +import com.snowflake.snowpark.DataFrame +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.trace.{Span, StatusCode} + +object OpenTelemetry extends Logging { + // class name format: snow.snowpark. + // method chain: Dataframe.filter.join.select.collect + // todo: track line number in SNOW-1480775 + def emit( + className: String, + funcName: String, + fileName: String, + lineNumber: Int, + methodChain: String): Unit = + emit(className, funcName) { span => + { + span.setAttribute("code.filepath", fileName) + span.setAttribute("code.lineno", lineNumber) + span.setAttribute("method.chain", methodChain) + } + } + + def reportError(className: String, funcName: String, error: Throwable): Unit = + emit(className, funcName) { span => + { + span.setStatus(StatusCode.ERROR, error.getMessage) + span.recordException(error) + } + } + + private def emit(className: String, funcName: String)(report: Span => Unit): Unit = { + val name = s"snow.snowpark.$className" + val tracer = GlobalOpenTelemetry.getTracer(name) + val span = tracer.spanBuilder(funcName).startSpan() + try { + val scope = span.makeCurrent() + // Using Manager is not available in Scala 2.12 yet + try { + report(span) + } catch { + case e: Exception => + logWarning(s"Error when acquiring span attributes. ${e.getMessage}") + } finally { + scope.close() + } + } finally { + span.end() + } + } + + // todo: Snow-1480779 + def buildMethodChain(funcName: String, df: DataFrame): String = { + "" + } +} diff --git a/src/test/scala/com/snowflake/snowpark/OpenTelemetryEnabled.scala b/src/test/scala/com/snowflake/snowpark/OpenTelemetryEnabled.scala new file mode 100644 index 00000000..ea4d82a8 --- /dev/null +++ b/src/test/scala/com/snowflake/snowpark/OpenTelemetryEnabled.scala @@ -0,0 +1,74 @@ +package com.snowflake.snowpark + +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.trace.StatusCode +import io.opentelemetry.exporters.inmemory.InMemorySpanExporter +import io.opentelemetry.sdk.OpenTelemetrySdk +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.sdk.trace.SdkTracerProvider +import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor +import io.opentelemetry.sdk.trace.data.SpanData +import io.opentelemetry.sdk.trace.internal.data.ExceptionEventData + +trait OpenTelemetryEnabled extends SNTestBase { + lazy protected val testSpanExporter: InMemorySpanExporter = InMemorySpanExporter.create() + + override def beforeAll: Unit = { + super.beforeAll + val resource = Resource.getDefault.toBuilder + .put("service.name", "test-server") + .put("service.version", "0.1.0") + .build() + + val sdkTracerProvider = SdkTracerProvider + .builder() + .addSpanProcessor(SimpleSpanProcessor.create(testSpanExporter)) + .setResource(resource) + .build() + + OpenTelemetrySdk + .builder() + .setTracerProvider(sdkTracerProvider) + .buildAndRegisterGlobal() + } + + def checkSpan( + className: String, + funcName: String, + fileName: String, + lineNumber: Int, + methodChain: String): Unit = + checkSpan(className, funcName) { span => + { + assert(span.getTotalAttributeCount == 3) + assert(span.getAttributes.get(AttributeKey.stringKey("code.filepath")) == fileName) + assert(span.getAttributes.get(AttributeKey.longKey("code.lineno")) == lineNumber) + assert(span.getAttributes.get(AttributeKey.stringKey("method.chain")) == methodChain) + } + } + + def checkSpan(className: String, funcName: String)(func: SpanData => Unit): Unit = { + val span: SpanData = testSpanExporter.getFinishedSpanItems.get(0) + assert(span.getName == funcName) + assert(span.getInstrumentationScopeInfo.getName == className) + func(span) + testSpanExporter.reset() + } + + def checkSpanError(className: String, funcName: String, error: Throwable): Unit = + checkSpan(className, funcName) { span => + { + assert(span.getStatus.getStatusCode == StatusCode.ERROR) + assert(span.getStatus.getDescription == error.getMessage) + assert(span.getEvents.size() == 1) + assert(span.getEvents.get(0).isInstanceOf[ExceptionEventData]) + assert(span.getEvents.get(0).asInstanceOf[ExceptionEventData].getException == error) + } + } + + override def afterAll: Unit = { + GlobalOpenTelemetry.resetForTest() + super.afterAll + } +} diff --git a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala new file mode 100644 index 00000000..a3bc8abc --- /dev/null +++ b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala @@ -0,0 +1,19 @@ +package com.snowflake.snowpark_test + +import com.snowflake.snowpark.OpenTelemetryEnabled +import com.snowflake.snowpark.internal.OpenTelemetry + +class OpenTelemetrySuite extends OpenTelemetryEnabled { + + test("OpenTelemetry.emit") { + OpenTelemetry.emit("ClassA", "functionB", "fileC", 123, "chainD") + checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD") + } + + test("report error") { + val error = new Exception("test") + OpenTelemetry.reportError("ClassA1", "functionB1", error) + checkSpanError("snow.snowpark.ClassA1", "functionB1", error) + } + +}