Skip to content

Commit

Permalink
SNOW-1638551 Start Open Telemetry Span before Process Actions (#153)
Browse files Browse the repository at this point in the history
* fix span time

* add test

* rename function

* empty

* fix error

* fix test

* fix test

* fix test
  • Loading branch information
sfc-gh-bli authored Aug 29, 2024
1 parent e120e0b commit d1058da
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 70 deletions.
117 changes: 51 additions & 66 deletions src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,39 +59,30 @@ object OpenTelemetry extends Logging {
funcName: String,
execName: String,
execHandler: String,
execFilePath: String)(func: => T): T = {
execFilePath: String)(thunk: => T): T = {
val stacks = Thread.currentThread().getStackTrace
val (fileName, lineNumber) = findLineNumber(stacks)
val newSpan =
UdfInfo(className, funcName, fileName, lineNumber, execName, execHandler, execFilePath)
emitSpan(newSpan, className, funcName, func)
emitSpan(newSpan, thunk)
}
// wrapper of all action functions
def action[T](className: String, funcName: String, methodChain: String)(func: => T): T = {
def action[T](className: String, funcName: String, methodChain: String)(thunk: => T): T = {
val stacks = Thread.currentThread().getStackTrace
val (fileName, lineNumber) = findLineNumber(stacks)
val newInfo =
ActionInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName")
emitSpan(newInfo, className, funcName, func)
emitSpan(newInfo, thunk)
}

private def emitSpan[T](span: SpanInfo, className: String, funcName: String, thunk: => T): T = {
try {
spanInfo.value match {
case None =>
spanInfo.withValue(Some(span)) {
val result: T = thunk
// only emit one time, in the top level action
OpenTelemetry.emit(spanInfo.value.get)
result
}
case _ =>
thunk
}
} catch {
case error: Throwable =>
OpenTelemetry.reportError(className, funcName, error)
throw error
// Runs `thunk`, wrapping it in `span` only when no span is currently recorded in `spanInfo`.
//
// span:  the span description to install and emit if this is the outermost call
// thunk: by-name computation; evaluated exactly once on either branch
// Returns whatever `thunk` returns.
//
// NOTE(review): `spanInfo` is declared outside this view; it is used here through
// `.value` / `.withValue`, presumably a DynamicVariable[Option[SpanInfo]] - confirm.
private def emitSpan[T](span: SpanInfo, thunk: => T): T = {
spanInfo.value match {
// no span recorded yet: record `span` while the thunk runs and let the span emit around it
case None =>
spanInfo.withValue(Some(span)) {
span.emit(thunk)
}
// a span is already recorded: just run the thunk, without emitting a second span
case _ =>
thunk
}
}

Expand All @@ -118,58 +109,40 @@ object OpenTelemetry extends Logging {
}
}
}
}
trait SpanInfo {
val className: String
val funcName: String
val fileName: String
val lineNumber: Int

def emit(info: SpanInfo): Unit =
emit(info.className, info.funcName) { span =>
{
span.setAttribute("code.filepath", info.fileName)
span.setAttribute("code.lineno", info.lineNumber)
info match {
case ActionInfo(_, _, _, _, methodChain) =>
span.setAttribute("method.chain", methodChain)
case UdfInfo(_, _, _, _, execName, execHandler, execFilePath) =>
span.setAttribute("snow.executable.name", execName)
span.setAttribute("snow.executable.handler", execHandler)
span.setAttribute("snow.executable.filepath", execFilePath)
}
}
}
lazy private val span =
GlobalOpenTelemetry
.getTracer(s"snow.snowpark.$className")
.spanBuilder(funcName)
.startSpan()

def reportError(className: String, funcName: String, error: Throwable): Unit =
emit(className, funcName) { span =>
{
def emit[T](thunk: => T): T = {
val scope = span.makeCurrent()
// scala.util.Using.Manager is not available in Scala 2.12 yet, so resources are closed manually in `finally`
try {
span.setAttribute("code.filepath", fileName)
span.setAttribute("code.lineno", lineNumber)
addAdditionalInfo(span)
thunk
} catch {
case error: Exception =>
OpenTelemetry.logWarning(s"Error when acquiring span attributes. ${error.getMessage}")
span.setStatus(StatusCode.ERROR, error.getMessage)
span.recordException(error)
}
}

private def emit(className: String, funcName: String)(report: Span => Unit): Unit = {
val name = s"snow.snowpark.$className"
val tracer = GlobalOpenTelemetry.getTracer(name)
val span = tracer.spanBuilder(funcName).startSpan()
try {
val scope = span.makeCurrent()
// scala.util.Using.Manager is not available in Scala 2.12 yet, so resources are closed manually in `finally`
try {
report(span)
} catch {
case e: Exception =>
logWarning(s"Error when acquiring span attributes. ${e.getMessage}")
} finally {
scope.close()
}
throw error
} finally {
scope.close()
span.end()
}
}

}

trait SpanInfo {
val className: String
val funcName: String
val fileName: String
val lineNumber: Int
protected def addAdditionalInfo(span: Span): Unit
}

case class ActionInfo(
Expand All @@ -178,7 +151,12 @@ case class ActionInfo(
override val fileName: String,
override val lineNumber: Int,
methodChain: String)
extends SpanInfo
extends SpanInfo {

// Action-specific span attribute: stores `methodChain` under the "method.chain" key.
override protected def addAdditionalInfo(span: Span): Unit =
  span.setAttribute("method.chain", methodChain)
}

case class UdfInfo(
override val className: String,
Expand All @@ -188,4 +166,11 @@ case class UdfInfo(
execName: String,
execHandler: String,
execFilePath: String)
extends SpanInfo
extends SpanInfo {

// UDF-specific span attributes: stores the executable's name, handler and file path
// under the "snow.executable.*" keys, in that order.
override protected def addAdditionalInfo(span: Span): Unit = {
  val executableAttributes = Seq(
    "snow.executable.name" -> execName,
    "snow.executable.handler" -> execHandler,
    "snow.executable.filepath" -> execFilePath)
  executableAttributes.foreach { case (key, value) => span.setAttribute(key, value) }
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.snowflake.snowpark_test

import com.snowflake.snowpark.{MergeResult, OpenTelemetryEnabled, SaveMode, UpdateResult}
import com.snowflake.snowpark.internal.{OpenTelemetry, ActionInfo}
import com.snowflake.snowpark.{OpenTelemetryEnabled, SaveMode}
import com.snowflake.snowpark.internal.ActionInfo
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark.types.{DoubleType, IntegerType, StringType, StructField, StructType}

import java.time.Instant
import java.util

class OpenTelemetrySuite extends OpenTelemetryEnabled {
Expand Down Expand Up @@ -430,13 +431,14 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
}

test("OpenTelemetry.emit") {
OpenTelemetry.emit(ActionInfo("ClassA", "functionB", "fileC", 123, "chainD"))
ActionInfo("ClassA", "functionB", "fileC", 123, "chainD").emit(1)
checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD")
}

test("report error") {
val error = new Exception("test")
OpenTelemetry.reportError("ClassA1", "functionB1", error)
val span = ActionInfo("ClassA1", "functionB1", "", 0, "")
assertThrows[Exception](span.emit(throw error))
checkSpanError("snow.snowpark.ClassA1", "functionB1", error)
}

Expand All @@ -446,6 +448,23 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
assert(l.size() == 1)
}

// Checks that the thunk passed to `emit` runs after the span has started:
// a timestamp captured inside the thunk must be later than the exported span's start time.
test("actions should be processed in the span time period") {
val result = ActionInfo("ClassA", "functionB", "fileC", 123, "chainD").emit {
// sleep before and after taking the timestamp so it is separated from both span boundaries
Thread.sleep(1)
val time = System.currentTimeMillis()
Thread.sleep(1)
time
}
val l = testSpanExporter.getFinishedSpanItems
// convert epoch nanoseconds to milliseconds so the value is comparable with currentTimeMillis
val spanStart = l.get(0).getStartEpochNanos / 1000000
// val spanEnd = l.get(0).getEndEpochNanos / 1000000
assert(spanStart < result)
// The end-time check below is disabled: in the GitHub Actions environment
// the end time always appears to be start time + 100, which looks like a bug there.
// We can't reproduce it locally.
// assert(result < spanEnd)
}

override def beforeAll: Unit = {
super.beforeAll
createStage(stageName1)
Expand Down

0 comments on commit d1058da

Please sign in to comment.