diff --git a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala index 38d77ed4..c6443e2a 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala @@ -2,6 +2,7 @@ package com.snowflake.snowpark.internal import com.snowflake.snowpark.DataFrame import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.trace.{Span, StatusCode} object OpenTelemetry extends Logging { // class name format: snow.snowpark. @@ -12,7 +13,24 @@ object OpenTelemetry extends Logging { funcName: String, fileName: String, lineNumber: Int, - methodChain: String): Unit = { + methodChain: String): Unit = + emit(className, funcName) { span => + { + span.setAttribute("code.filepath", fileName) + span.setAttribute("code.lineno", lineNumber) + span.setAttribute("method.chain", methodChain) + } + } + + def reportError(className: String, funcName: String, error: Throwable): Unit = + emit(className, funcName) { span => + { + span.setStatus(StatusCode.ERROR, error.getMessage) + span.recordException(error) + } + } + + private def emit(className: String, funcName: String)(report: Span => Unit): Unit = { val name = s"snow.snowpark.$className" val tracer = GlobalOpenTelemetry.getTracer(name) val span = tracer.spanBuilder(funcName).startSpan() @@ -20,9 +38,7 @@ object OpenTelemetry extends Logging { val scope = span.makeCurrent() // Using Manager is not available in Scala 2.12 yet try { - span.setAttribute("code.filepath", fileName) - span.setAttribute("code.lineno", lineNumber) - span.setAttribute("method.chain", methodChain) + report(span) } catch { case e: Exception => logWarning(s"Error when acquiring span attributes. ${e.getMessage}") diff --git a/src/test/scala/com/snowflake/snowpark/OpenTelemetryEnabled.scala b/src/test/scala/com/snowflake/snowpark/OpenTelemetryEnabled.scala index a9b31440..ea4d82a8 100644 --- a/src/test/scala/com/snowflake/snowpark/OpenTelemetryEnabled.scala +++ b/src/test/scala/com/snowflake/snowpark/OpenTelemetryEnabled.scala @@ -2,11 +2,14 @@ package com.snowflake.snowpark import io.opentelemetry.api.GlobalOpenTelemetry import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.trace.StatusCode import io.opentelemetry.exporters.inmemory.InMemorySpanExporter import io.opentelemetry.sdk.OpenTelemetrySdk import io.opentelemetry.sdk.resources.Resource import io.opentelemetry.sdk.trace.SdkTracerProvider import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor +import io.opentelemetry.sdk.trace.data.SpanData +import io.opentelemetry.sdk.trace.internal.data.ExceptionEventData trait OpenTelemetryEnabled extends SNTestBase { lazy protected val testSpanExporter: InMemorySpanExporter = InMemorySpanExporter.create() @@ -35,17 +38,35 @@ trait OpenTelemetryEnabled extends SNTestBase { funcName: String, fileName: String, lineNumber: Int, - methodChain: String): Unit = { - val span = testSpanExporter.getFinishedSpanItems.get(0) - assert(span.getTotalAttributeCount == 3) // code.filepath, code.lineno, method.chain + methodChain: String): Unit = + checkSpan(className, funcName) { span => + { + assert(span.getTotalAttributeCount == 3) + assert(span.getAttributes.get(AttributeKey.stringKey("code.filepath")) == fileName) + assert(span.getAttributes.get(AttributeKey.longKey("code.lineno")) == lineNumber) + assert(span.getAttributes.get(AttributeKey.stringKey("method.chain")) == methodChain) + } + } + + def checkSpan(className: String, funcName: String)(func: SpanData => Unit): Unit = { + val span: SpanData = testSpanExporter.getFinishedSpanItems.get(0) assert(span.getName == funcName) assert(span.getInstrumentationScopeInfo.getName == className) - assert(span.getAttributes.get(AttributeKey.stringKey("code.filepath")) == fileName) - assert(span.getAttributes.get(AttributeKey.longKey("code.lineno")) == lineNumber) - assert(span.getAttributes.get(AttributeKey.stringKey("method.chain")) == methodChain) + func(span) testSpanExporter.reset() } + def checkSpanError(className: String, funcName: String, error: Throwable): Unit = + checkSpan(className, funcName) { span => + { + assert(span.getStatus.getStatusCode == StatusCode.ERROR) + assert(span.getStatus.getDescription == error.getMessage) + assert(span.getEvents.size() == 1) + assert(span.getEvents.get(0).isInstanceOf[ExceptionEventData]) + assert(span.getEvents.get(0).asInstanceOf[ExceptionEventData].getException == error) + } + } + override def afterAll: Unit = { GlobalOpenTelemetry.resetForTest() super.afterAll diff --git a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala index 47b67a91..a3bc8abc 100644 --- a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala @@ -10,4 +10,10 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled { checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD") } + test("report error") { + val error = new Exception("test") + OpenTelemetry.reportError("ClassA1", "functionB1", error) + checkSpanError("snow.snowpark.ClassA1", "functionB1", error) + } + }