From 1f04897aa760722e1538a8efd420fea2785197c6 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Mon, 26 Aug 2024 14:52:39 -0700 Subject: [PATCH] fix span time --- .../snowpark/internal/OpenTelemetry.scala | 110 +++++++++--------- .../snowpark_test/OpenTelemetrySuite.scala | 5 +- 2 files changed, 57 insertions(+), 58 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala index 8a219847..a8a31322 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala @@ -59,39 +59,34 @@ object OpenTelemetry extends Logging { funcName: String, execName: String, execHandler: String, - execFilePath: String)(func: => T): T = { + execFilePath: String)(thunk: => T): T = { val stacks = Thread.currentThread().getStackTrace val (fileName, lineNumber) = findLineNumber(stacks) val newSpan = UdfInfo(className, funcName, fileName, lineNumber, execName, execHandler, execFilePath) - emitSpan(newSpan, className, funcName, func) + emitSpan(newSpan, thunk) } // wrapper of all action functions - def action[T](className: String, funcName: String, methodChain: String)(func: => T): T = { + def action[T](className: String, funcName: String, methodChain: String)(thunk: => T): T = { val stacks = Thread.currentThread().getStackTrace val (fileName, lineNumber) = findLineNumber(stacks) val newInfo = ActionInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName") - emitSpan(newInfo, className, funcName, func) + emitSpan(newInfo, thunk) } - private def emitSpan[T](span: SpanInfo, className: String, funcName: String, thunk: => T): T = { + private def emitSpan[T](span: SpanInfo, thunk: => T): T = { try { spanInfo.value match { case None => spanInfo.withValue(Some(span)) { - val result: T = thunk - // only emit one time, in the top level action - OpenTelemetry.emit(spanInfo.value.get) - result + span.emitSpan(thunk) } case _ => thunk } } catch { - case error: Throwable => - OpenTelemetry.reportError(className, funcName, error) - throw error + case error: Throwable => throw span.reportError(error) } } @@ -118,58 +113,49 @@ object OpenTelemetry extends Logging { } } } +} - def emit(info: SpanInfo): Unit = - emit(info.className, info.funcName) { span => - { - span.setAttribute("code.filepath", info.fileName) - span.setAttribute("code.lineno", info.lineNumber) - info match { - case ActionInfo(_, _, _, _, methodChain) => - span.setAttribute("method.chain", methodChain) - case UdfInfo(_, _, _, _, execName, execHandler, execFilePath) => - span.setAttribute("snow.executable.name", execName) - span.setAttribute("snow.executable.handler", execHandler) - span.setAttribute("snow.executable.filepath", execFilePath) - } - } - } +trait SpanInfo { + val className: String + val funcName: String + val fileName: String + val lineNumber: Int - def reportError(className: String, funcName: String, error: Throwable): Unit = - emit(className, funcName) { span => - { - span.setStatus(StatusCode.ERROR, error.getMessage) - span.recordException(error) - } - } + lazy private val span = + GlobalOpenTelemetry + .getTracer(s"snow.snowpark.$className") + .spanBuilder(funcName) + .startSpan() - private def emit(className: String, funcName: String)(report: Span => Unit): Unit = { - val name = s"snow.snowpark.$className" - val tracer = GlobalOpenTelemetry.getTracer(name) - val span = tracer.spanBuilder(funcName).startSpan() + private def emit[T](thunk: => T): T = { + val scope = span.makeCurrent() + // Using Manager is not available in Scala 2.12 yet try { - val scope = span.makeCurrent() - // Using Manager is not available in Scala 2.12 yet - try { - report(span) - } catch { - case e: Exception => - logWarning(s"Error when acquiring span attributes. ${e.getMessage}") - } finally { - scope.close() - } + thunk + } catch { + case e: Exception => + OpenTelemetry.logWarning(s"Error when acquiring span attributes. ${e.getMessage}") + throw e } finally { + scope.close() span.end() } } -} + protected def withAdditionalInfo(span: Span): Unit -trait SpanInfo { - val className: String - val funcName: String - val fileName: String - val lineNumber: Int + def emitSpan[T](thunk: => T): T = emit { + span.setAttribute("code.filepath", fileName) + span.setAttribute("code.lineno", lineNumber) + withAdditionalInfo(span) + thunk + } + + def reportError(error: Throwable): Throwable = emit { + span.setStatus(StatusCode.ERROR, error.getMessage) + span.recordException(error) + error + } } case class ActionInfo( @@ -178,7 +164,12 @@ case class ActionInfo( override val fileName: String, override val lineNumber: Int, methodChain: String) - extends SpanInfo + extends SpanInfo { + + override protected def withAdditionalInfo(span: Span): Unit = { + span.setAttribute("method.chain", methodChain) + } +} case class UdfInfo( override val className: String, @@ -188,4 +179,11 @@ case class UdfInfo( execName: String, execHandler: String, execFilePath: String) - extends SpanInfo + extends SpanInfo { + + override protected def withAdditionalInfo(span: Span): Unit = { + span.setAttribute("snow.executable.name", execName) + span.setAttribute("snow.executable.handler", execHandler) + span.setAttribute("snow.executable.filepath", execFilePath) + } +} diff --git a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala index 15118fd3..83d7a9c5 100644 --- a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala @@ -430,13 +430,14 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled { } test("OpenTelemetry.emit") { - OpenTelemetry.emit(ActionInfo("ClassA", "functionB", "fileC", 123, "chainD")) + ActionInfo("ClassA", "functionB", "fileC", 123, "chainD").emitSpan(1) checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD") } test("report error") { val error = new Exception("test") - OpenTelemetry.reportError("ClassA1", "functionB1", error) + val span = ActionInfo("ClassA1", "functionB1", "", 0, "") + span.reportError(error) checkSpanError("snow.snowpark.ClassA1", "functionB1", error) }