Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1638551 Start Open Telemetry Span before Process Actions #153

Merged
merged 9 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 51 additions & 66 deletions src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,39 +59,30 @@ object OpenTelemetry extends Logging {
funcName: String,
execName: String,
execHandler: String,
execFilePath: String)(func: => T): T = {
execFilePath: String)(thunk: => T): T = {
val stacks = Thread.currentThread().getStackTrace
val (fileName, lineNumber) = findLineNumber(stacks)
val newSpan =
UdfInfo(className, funcName, fileName, lineNumber, execName, execHandler, execFilePath)
emitSpan(newSpan, className, funcName, func)
emitSpan(newSpan, thunk)
}
// wrapper of all action functions
def action[T](className: String, funcName: String, methodChain: String)(func: => T): T = {
def action[T](className: String, funcName: String, methodChain: String)(thunk: => T): T = {
val stacks = Thread.currentThread().getStackTrace
val (fileName, lineNumber) = findLineNumber(stacks)
val newInfo =
ActionInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName")
emitSpan(newInfo, className, funcName, func)
emitSpan(newInfo, thunk)
}

private def emitSpan[T](span: SpanInfo, className: String, funcName: String, thunk: => T): T = {
try {
spanInfo.value match {
case None =>
spanInfo.withValue(Some(span)) {
val result: T = thunk
// only emit one time, in the top level action
OpenTelemetry.emit(spanInfo.value.get)
result
}
case _ =>
thunk
}
} catch {
case error: Throwable =>
OpenTelemetry.reportError(className, funcName, error)
throw error
private def emitSpan[T](span: SpanInfo, thunk: => T): T = {
spanInfo.value match {
case None =>
spanInfo.withValue(Some(span)) {
span.emit(thunk)
}
case _ =>
thunk
}
}

Expand All @@ -118,58 +109,40 @@ object OpenTelemetry extends Logging {
}
}
}
}
trait SpanInfo {
val className: String
val funcName: String
val fileName: String
val lineNumber: Int

def emit(info: SpanInfo): Unit =
emit(info.className, info.funcName) { span =>
{
span.setAttribute("code.filepath", info.fileName)
span.setAttribute("code.lineno", info.lineNumber)
info match {
case ActionInfo(_, _, _, _, methodChain) =>
span.setAttribute("method.chain", methodChain)
case UdfInfo(_, _, _, _, execName, execHandler, execFilePath) =>
span.setAttribute("snow.executable.name", execName)
span.setAttribute("snow.executable.handler", execHandler)
span.setAttribute("snow.executable.filepath", execFilePath)
}
}
}
lazy private val span =
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't init until emit function call.

GlobalOpenTelemetry
.getTracer(s"snow.snowpark.$className")
.spanBuilder(funcName)
.startSpan()

def reportError(className: String, funcName: String, error: Throwable): Unit =
emit(className, funcName) { span =>
{
def emit[T](thunk: => T): T = {
val scope = span.makeCurrent()
// Using Manager is not available in Scala 2.12 yet
try {
span.setAttribute("code.filepath", fileName)
span.setAttribute("code.lineno", lineNumber)
addAdditionalInfo(span)
thunk
} catch {
case error: Exception =>
OpenTelemetry.logWarning(s"Error when acquiring span attributes. ${error.getMessage}")
span.setStatus(StatusCode.ERROR, error.getMessage)
span.recordException(error)
}
}

private def emit(className: String, funcName: String)(report: Span => Unit): Unit = {
val name = s"snow.snowpark.$className"
val tracer = GlobalOpenTelemetry.getTracer(name)
val span = tracer.spanBuilder(funcName).startSpan()
try {
val scope = span.makeCurrent()
// Using Manager is not available in Scala 2.12 yet
try {
report(span)
} catch {
case e: Exception =>
logWarning(s"Error when acquiring span attributes. ${e.getMessage}")
} finally {
scope.close()
}
throw error
} finally {
scope.close()
span.end()
}
}

}

trait SpanInfo {
val className: String
val funcName: String
val fileName: String
val lineNumber: Int
protected def addAdditionalInfo(span: Span): Unit
}

case class ActionInfo(
Expand All @@ -178,7 +151,12 @@ case class ActionInfo(
override val fileName: String,
override val lineNumber: Int,
methodChain: String)
extends SpanInfo
extends SpanInfo {

override protected def addAdditionalInfo(span: Span): Unit = {
span.setAttribute("method.chain", methodChain)
}
}

case class UdfInfo(
override val className: String,
Expand All @@ -188,4 +166,11 @@ case class UdfInfo(
execName: String,
execHandler: String,
execFilePath: String)
extends SpanInfo
extends SpanInfo {

override protected def addAdditionalInfo(span: Span): Unit = {
span.setAttribute("snow.executable.name", execName)
span.setAttribute("snow.executable.handler", execHandler)
span.setAttribute("snow.executable.filepath", execFilePath)
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.snowflake.snowpark_test

import com.snowflake.snowpark.{MergeResult, OpenTelemetryEnabled, SaveMode, UpdateResult}
import com.snowflake.snowpark.internal.{OpenTelemetry, ActionInfo}
import com.snowflake.snowpark.{OpenTelemetryEnabled, SaveMode}
import com.snowflake.snowpark.internal.ActionInfo
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark.types.{DoubleType, IntegerType, StringType, StructField, StructType}

import java.time.Instant
import java.util

class OpenTelemetrySuite extends OpenTelemetryEnabled {
Expand Down Expand Up @@ -430,13 +431,14 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
}

test("OpenTelemetry.emit") {
OpenTelemetry.emit(ActionInfo("ClassA", "functionB", "fileC", 123, "chainD"))
ActionInfo("ClassA", "functionB", "fileC", 123, "chainD").emit(1)
checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD")
}

test("report error") {
val error = new Exception("test")
OpenTelemetry.reportError("ClassA1", "functionB1", error)
val span = ActionInfo("ClassA1", "functionB1", "", 0, "")
assertThrows[Exception](span.emit(throw error))
checkSpanError("snow.snowpark.ClassA1", "functionB1", error)
}

Expand All @@ -446,6 +448,33 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
assert(l.size() == 1)
}

test("actions should be processed in the span time period") {
val time1 = System.currentTimeMillis()
val result = ActionInfo("ClassA", "functionB", "fileC", 123, "chainD").emit {
Thread.sleep(1)
val time = System.currentTimeMillis()
Thread.sleep(1)
time
}
val time2 = System.currentTimeMillis()
val l = testSpanExporter.getFinishedSpanItems
val spanStart = l.get(0).getStartEpochNanos / 1000000
val spanEnd = l.get(0).getEndEpochNanos / 1000000
// scalastyle:off
println(
s"""
|XXXXX
|time1: $time1
|time2: $time2
|result: $result
|spanStart: $spanStart
|spanEnd: $spanEnd
|""".stripMargin)
// scalastyle:on
assert(spanStart < result)
assert(result < spanEnd)
}

override def beforeAll: Unit = {
super.beforeAll
createStage(stageName1)
Expand Down
Loading