Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1638551 Start Open Telemetry Span before Process Actions #153

Merged
merged 9 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 54 additions & 57 deletions src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,39 +59,34 @@ object OpenTelemetry extends Logging {
funcName: String,
execName: String,
execHandler: String,
execFilePath: String)(func: => T): T = {
execFilePath: String)(thunk: => T): T = {
val stacks = Thread.currentThread().getStackTrace
val (fileName, lineNumber) = findLineNumber(stacks)
val newSpan =
UdfInfo(className, funcName, fileName, lineNumber, execName, execHandler, execFilePath)
emitSpan(newSpan, className, funcName, func)
emitSpan(newSpan, thunk)
}
// wrapper of all action functions
def action[T](className: String, funcName: String, methodChain: String)(func: => T): T = {
def action[T](className: String, funcName: String, methodChain: String)(thunk: => T): T = {
val stacks = Thread.currentThread().getStackTrace
val (fileName, lineNumber) = findLineNumber(stacks)
val newInfo =
ActionInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName")
emitSpan(newInfo, className, funcName, func)
emitSpan(newInfo, thunk)
}

private def emitSpan[T](span: SpanInfo, className: String, funcName: String, thunk: => T): T = {
private def emitSpan[T](span: SpanInfo, thunk: => T): T = {
try {
spanInfo.value match {
case None =>
spanInfo.withValue(Some(span)) {
val result: T = thunk
// only emit one time, in the top level action
OpenTelemetry.emit(spanInfo.value.get)
result
span.emitSpan(thunk)
}
case _ =>
thunk
}
} catch {
case error: Throwable =>
OpenTelemetry.reportError(className, funcName, error)
throw error
case error: Throwable => throw span.reportError(error)
sfc-gh-enie marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -118,58 +113,48 @@ object OpenTelemetry extends Logging {
}
}
}
}
trait SpanInfo {
val className: String
val funcName: String
val fileName: String
val lineNumber: Int

def emit(info: SpanInfo): Unit =
emit(info.className, info.funcName) { span =>
{
span.setAttribute("code.filepath", info.fileName)
span.setAttribute("code.lineno", info.lineNumber)
info match {
case ActionInfo(_, _, _, _, methodChain) =>
span.setAttribute("method.chain", methodChain)
case UdfInfo(_, _, _, _, execName, execHandler, execFilePath) =>
span.setAttribute("snow.executable.name", execName)
span.setAttribute("snow.executable.handler", execHandler)
span.setAttribute("snow.executable.filepath", execFilePath)
}
}
}

def reportError(className: String, funcName: String, error: Throwable): Unit =
emit(className, funcName) { span =>
{
span.setStatus(StatusCode.ERROR, error.getMessage)
span.recordException(error)
}
}
lazy private val span =
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't init until emit function call.

GlobalOpenTelemetry
.getTracer(s"snow.snowpark.$className")
.spanBuilder(funcName)
.startSpan()

private def emit(className: String, funcName: String)(report: Span => Unit): Unit = {
val name = s"snow.snowpark.$className"
val tracer = GlobalOpenTelemetry.getTracer(name)
val span = tracer.spanBuilder(funcName).startSpan()
private def emit[T](thunk: => T): T = {
sfc-gh-enie marked this conversation as resolved.
Show resolved Hide resolved
val scope = span.makeCurrent()
// Using Manager is not available in Scala 2.12 yet
try {
val scope = span.makeCurrent()
// Using Manager is not available in Scala 2.12 yet
try {
report(span)
} catch {
case e: Exception =>
logWarning(s"Error when acquiring span attributes. ${e.getMessage}")
} finally {
scope.close()
}
thunk
} catch {
case e: Exception =>
OpenTelemetry.logWarning(s"Error when acquiring span attributes. ${e.getMessage}")
throw e
} finally {
scope.close()
span.end()
}
}

}
protected def addAdditionalInfo(span: Span): Unit

trait SpanInfo {
val className: String
val funcName: String
val fileName: String
val lineNumber: Int
def emitSpan[T](thunk: => T): T = emit {
span.setAttribute("code.filepath", fileName)
span.setAttribute("code.lineno", lineNumber)
addAdditionalInfo(span)
Copy link
Collaborator Author

@sfc-gh-bli sfc-gh-bli Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the child classes only need to implement addAdditionalInfo method.

thunk
}

def reportError(error: Throwable): Throwable = emit {
span.setStatus(StatusCode.ERROR, error.getMessage)
span.recordException(error)
error
}
}

case class ActionInfo(
Expand All @@ -178,7 +163,12 @@ case class ActionInfo(
override val fileName: String,
override val lineNumber: Int,
methodChain: String)
extends SpanInfo
extends SpanInfo {

override protected def addAdditionalInfo(span: Span): Unit = {
span.setAttribute("method.chain", methodChain)
}
}

case class UdfInfo(
override val className: String,
Expand All @@ -188,4 +178,11 @@ case class UdfInfo(
execName: String,
execHandler: String,
execFilePath: String)
extends SpanInfo
extends SpanInfo {

override protected def addAdditionalInfo(span: Span): Unit = {
span.setAttribute("snow.executable.name", execName)
span.setAttribute("snow.executable.handler", execHandler)
span.setAttribute("snow.executable.filepath", execFilePath)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,14 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
}

test("OpenTelemetry.emit") {
OpenTelemetry.emit(ActionInfo("ClassA", "functionB", "fileC", 123, "chainD"))
ActionInfo("ClassA", "functionB", "fileC", 123, "chainD").emitSpan(1)
checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD")
}

test("report error") {
val error = new Exception("test")
OpenTelemetry.reportError("ClassA1", "functionB1", error)
val span = ActionInfo("ClassA1", "functionB1", "", 0, "")
span.reportError(error)
checkSpanError("snow.snowpark.ClassA1", "functionB1", error)
}

Expand All @@ -446,6 +447,16 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
assert(l.size() == 1)
}

test("actions should be processed in the span time period") {
val result = session.sql("select current_timestamp()").collect().head.getTimestamp(0)
sfc-gh-enie marked this conversation as resolved.
Show resolved Hide resolved
val l = testSpanExporter.getFinishedSpanItems
val spanStart = l.get(0).getStartEpochNanos / 1000000
val time = result.getTime
val spanEnd = l.get(0).getEndEpochNanos / 1000000
assert(spanStart < time)
assert(time < spanEnd)
}

override def beforeAll: Unit = {
super.beforeAll
createStage(stageName1)
Expand Down
Loading