Skip to content

Commit

Permalink
fix span time
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-bli committed Aug 26, 2024
1 parent 3ab8a52 commit 1f04897
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 58 deletions.
110 changes: 54 additions & 56 deletions src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,39 +59,34 @@ object OpenTelemetry extends Logging {
funcName: String,
execName: String,
execHandler: String,
execFilePath: String)(func: => T): T = {
execFilePath: String)(thunk: => T): T = {
val stacks = Thread.currentThread().getStackTrace
val (fileName, lineNumber) = findLineNumber(stacks)
val newSpan =
UdfInfo(className, funcName, fileName, lineNumber, execName, execHandler, execFilePath)
emitSpan(newSpan, className, funcName, func)
emitSpan(newSpan, thunk)
}
// wrapper of all action functions
def action[T](className: String, funcName: String, methodChain: String)(func: => T): T = {
def action[T](className: String, funcName: String, methodChain: String)(thunk: => T): T = {
val stacks = Thread.currentThread().getStackTrace
val (fileName, lineNumber) = findLineNumber(stacks)
val newInfo =
ActionInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName")
emitSpan(newInfo, className, funcName, func)
emitSpan(newInfo, thunk)
}

private def emitSpan[T](span: SpanInfo, className: String, funcName: String, thunk: => T): T = {
private def emitSpan[T](span: SpanInfo, thunk: => T): T = {
try {
spanInfo.value match {
case None =>
spanInfo.withValue(Some(span)) {
val result: T = thunk
// only emit one time, in the top level action
OpenTelemetry.emit(spanInfo.value.get)
result
span.emitSpan(thunk)
}
case _ =>
thunk
}
} catch {
case error: Throwable =>
OpenTelemetry.reportError(className, funcName, error)
throw error
case error: Throwable => throw span.reportError(error)
}
}

Expand All @@ -118,58 +113,49 @@ object OpenTelemetry extends Logging {
}
}
}
}

def emit(info: SpanInfo): Unit =
emit(info.className, info.funcName) { span =>
{
span.setAttribute("code.filepath", info.fileName)
span.setAttribute("code.lineno", info.lineNumber)
info match {
case ActionInfo(_, _, _, _, methodChain) =>
span.setAttribute("method.chain", methodChain)
case UdfInfo(_, _, _, _, execName, execHandler, execFilePath) =>
span.setAttribute("snow.executable.name", execName)
span.setAttribute("snow.executable.handler", execHandler)
span.setAttribute("snow.executable.filepath", execFilePath)
}
}
}
trait SpanInfo {
val className: String
val funcName: String
val fileName: String
val lineNumber: Int

def reportError(className: String, funcName: String, error: Throwable): Unit =
emit(className, funcName) { span =>
{
span.setStatus(StatusCode.ERROR, error.getMessage)
span.recordException(error)
}
}
lazy private val span =
GlobalOpenTelemetry
.getTracer(s"snow.snowpark.$className")
.spanBuilder(funcName)
.startSpan()

private def emit(className: String, funcName: String)(report: Span => Unit): Unit = {
val name = s"snow.snowpark.$className"
val tracer = GlobalOpenTelemetry.getTracer(name)
val span = tracer.spanBuilder(funcName).startSpan()
private def emit[T](thunk: => T): T = {
val scope = span.makeCurrent()
// Using Manager is not available in Scala 2.12 yet
try {
val scope = span.makeCurrent()
// Using Manager is not available in Scala 2.12 yet
try {
report(span)
} catch {
case e: Exception =>
logWarning(s"Error when acquiring span attributes. ${e.getMessage}")
} finally {
scope.close()
}
thunk
} catch {
case e: Exception =>
OpenTelemetry.logWarning(s"Error when acquiring span attributes. ${e.getMessage}")
throw e
} finally {
scope.close()
span.end()
}
}

}
protected def withAdditionalInfo(span: Span): Unit

trait SpanInfo {
val className: String
val funcName: String
val fileName: String
val lineNumber: Int
def emitSpan[T](thunk: => T): T = emit {
span.setAttribute("code.filepath", fileName)
span.setAttribute("code.lineno", lineNumber)
withAdditionalInfo(span)
thunk
}

def reportError(error: Throwable): Throwable = emit {
span.setStatus(StatusCode.ERROR, error.getMessage)
span.recordException(error)
error
}
}

case class ActionInfo(
Expand All @@ -178,7 +164,12 @@ case class ActionInfo(
override val fileName: String,
override val lineNumber: Int,
methodChain: String)
extends SpanInfo
extends SpanInfo {

override protected def withAdditionalInfo(span: Span): Unit = {
span.setAttribute("method.chain", methodChain)
}
}

case class UdfInfo(
override val className: String,
Expand All @@ -188,4 +179,11 @@ case class UdfInfo(
execName: String,
execHandler: String,
execFilePath: String)
extends SpanInfo
extends SpanInfo {

override protected def withAdditionalInfo(span: Span): Unit = {
span.setAttribute("snow.executable.name", execName)
span.setAttribute("snow.executable.handler", execHandler)
span.setAttribute("snow.executable.filepath", execFilePath)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,14 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
}

test("OpenTelemetry.emit") {
OpenTelemetry.emit(ActionInfo("ClassA", "functionB", "fileC", 123, "chainD"))
ActionInfo("ClassA", "functionB", "fileC", 123, "chainD").emitSpan(1)
checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD")
}

test("report error") {
val error = new Exception("test")
OpenTelemetry.reportError("ClassA1", "functionB1", error)
val span = ActionInfo("ClassA1", "functionB1", "", 0, "")
span.reportError(error)
checkSpanError("snow.snowpark.ClassA1", "functionB1", error)
}

Expand Down

0 comments on commit 1f04897

Please sign in to comment.