Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-bli committed Jul 19, 2024
1 parent 16dce45 commit 23b06ac
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 30 deletions.
38 changes: 10 additions & 28 deletions src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.snowflake.snowpark.internal

import com.snowflake.snowpark.DataFrame
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.{Span, StatusCode}

Expand All @@ -9,7 +8,7 @@ import scala.util.DynamicVariable
object OpenTelemetry extends Logging {

// only report the top function info in case of recursion.
val spanInfo = new DynamicVariable[Option[SpanInfo]](None)
private val actionInfo = new DynamicVariable[Option[ActionInfo]](None)

// wrapper of all action functions
def action[T](
Expand All @@ -19,20 +18,20 @@ object OpenTelemetry extends Logging {
isScala: Boolean,
javaOffSet: Int = 0)(func: => T): T = {
try {
spanInfo.withValue[T](spanInfo.value match {
actionInfo.withValue[T](actionInfo.value match {
// empty info means this is the entry of the recursion
case None =>
val stacks = Thread.currentThread().getStackTrace
val index = if (isScala) 4 else 5 + javaOffSet
val fileName = stacks(index).getFileName
val lineNumber = stacks(index).getLineNumber
Some(SpanInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName"))
Some(ActionInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName"))
// if value is not empty, this function call should be recursion.
// do not issue new SpanInfo, use the info inherited from previous.
case other => other
}) {
val result: T = func
OpenTelemetry.emit(spanInfo.value.get)
OpenTelemetry.emit(actionInfo.value.get)
result
}
} catch {
Expand All @@ -43,28 +42,15 @@ object OpenTelemetry extends Logging {
}
// class name format: snow.snowpark.<class name>
// method chain: Dataframe.filter.join.select.collect
def emit(
className: String,
funcName: String,
fileName: String,
lineNumber: Int,
methodChain: String): Unit =
emit(className, funcName) { span =>
def emit(spanInfo: ActionInfo): Unit =
emit(spanInfo.className, spanInfo.funcName) { span =>
{
span.setAttribute("code.filepath", fileName)
span.setAttribute("code.lineno", lineNumber)
span.setAttribute("method.chain", methodChain)
span.setAttribute("code.filepath", spanInfo.fileName)
span.setAttribute("code.lineno", spanInfo.lineNumber)
span.setAttribute("method.chain", spanInfo.methodChain)
}
}

def emit(spanInfo: SpanInfo): Unit =
emit(
spanInfo.className,
spanInfo.funcName,
spanInfo.fileName,
spanInfo.lineNumber,
spanInfo.methodChain)

def reportError(className: String, funcName: String, error: Throwable): Unit =
emit(className, funcName) { span =>
{
Expand Down Expand Up @@ -93,13 +79,9 @@ object OpenTelemetry extends Logging {
}
}

// todo: Snow-1480779
def buildMethodChain(funcName: String, df: DataFrame): String = {
""
}
}

case class SpanInfo(
case class ActionInfo(
className: String,
funcName: String,
fileName: String,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.snowflake.snowpark_test

import com.snowflake.snowpark.{MergeResult, OpenTelemetryEnabled, SaveMode, UpdateResult}
import com.snowflake.snowpark.internal.OpenTelemetry
import com.snowflake.snowpark.internal.{OpenTelemetry, ActionInfo}
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark.types.{DoubleType, IntegerType, StringType, StructField, StructType}

Expand Down Expand Up @@ -430,7 +430,7 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
}

test("OpenTelemetry.emit") {
OpenTelemetry.emit("ClassA", "functionB", "fileC", 123, "chainD")
OpenTelemetry.emit(ActionInfo("ClassA", "functionB", "fileC", 123, "chainD"))
checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD")
}

Expand Down

0 comments on commit 23b06ac

Please sign in to comment.