diff --git a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala index e951081d..6d9d4248 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala @@ -1,6 +1,5 @@ package com.snowflake.snowpark.internal -import com.snowflake.snowpark.DataFrame import io.opentelemetry.api.GlobalOpenTelemetry import io.opentelemetry.api.trace.{Span, StatusCode} @@ -9,7 +8,7 @@ import scala.util.DynamicVariable object OpenTelemetry extends Logging { // only report the top function info in case of recursion. - val spanInfo = new DynamicVariable[Option[SpanInfo]](None) + private val actionInfo = new DynamicVariable[Option[ActionInfo]](None) // wrapper of all action functions def action[T]( @@ -19,20 +18,20 @@ object OpenTelemetry extends Logging { isScala: Boolean, javaOffSet: Int = 0)(func: => T): T = { try { - spanInfo.withValue[T](spanInfo.value match { + actionInfo.withValue[T](actionInfo.value match { // empty info means this is the entry of the recursion case None => val stacks = Thread.currentThread().getStackTrace val index = if (isScala) 4 else 5 + javaOffSet val fileName = stacks(index).getFileName val lineNumber = stacks(index).getLineNumber - Some(SpanInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName")) + Some(ActionInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName")) // if value is not empty, this function call should be recursion. // do not issue new SpanInfo, use the info inherited from previous. case other => other }) { val result: T = func - OpenTelemetry.emit(spanInfo.value.get) + OpenTelemetry.emit(actionInfo.value.get) result } } catch { @@ -43,28 +42,15 @@ object OpenTelemetry extends Logging { } // class name format: snow.snowpark. // method chain: Dataframe.filter.join.select.collect - def emit( - className: String, - funcName: String, - fileName: String, - lineNumber: Int, - methodChain: String): Unit = - emit(className, funcName) { span => + def emit(spanInfo: ActionInfo): Unit = + emit(spanInfo.className, spanInfo.funcName) { span => { - span.setAttribute("code.filepath", fileName) - span.setAttribute("code.lineno", lineNumber) - span.setAttribute("method.chain", methodChain) + span.setAttribute("code.filepath", spanInfo.fileName) + span.setAttribute("code.lineno", spanInfo.lineNumber) + span.setAttribute("method.chain", spanInfo.methodChain) } } - def emit(spanInfo: SpanInfo): Unit = - emit( - spanInfo.className, - spanInfo.funcName, - spanInfo.fileName, - spanInfo.lineNumber, - spanInfo.methodChain) - def reportError(className: String, funcName: String, error: Throwable): Unit = emit(className, funcName) { span => { @@ -93,13 +79,9 @@ object OpenTelemetry extends Logging { } } - // todo: Snow-1480779 - def buildMethodChain(funcName: String, df: DataFrame): String = { - "" - } } -case class SpanInfo( +case class ActionInfo( className: String, funcName: String, fileName: String, diff --git a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala index c58cfc35..4e8f6d58 100644 --- a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala @@ -1,7 +1,7 @@ package com.snowflake.snowpark_test import com.snowflake.snowpark.{MergeResult, OpenTelemetryEnabled, SaveMode, UpdateResult} -import com.snowflake.snowpark.internal.OpenTelemetry +import com.snowflake.snowpark.internal.{OpenTelemetry, ActionInfo} import com.snowflake.snowpark.functions._ import com.snowflake.snowpark.types.{DoubleType, IntegerType, StringType, StructField, StructType} @@ -430,7 +430,7 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled { } test("OpenTelemetry.emit") { - OpenTelemetry.emit("ClassA", "functionB", "fileC", 123, "chainD") + OpenTelemetry.emit(ActionInfo("ClassA", "functionB", "fileC", 123, "chainD")) checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD") }