-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-1490100 Create Implement OpenTelemetry API and Span Function #113
Changes from all commits
7f62163
6ac3efc
b390d45
3eb4e71
afd7893
ceb001a
c09929e
2bb99b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package com.snowflake.snowpark.internal | ||
|
||
import com.snowflake.snowpark.DataFrame | ||
import io.opentelemetry.api.GlobalOpenTelemetry | ||
import io.opentelemetry.api.trace.{Span, StatusCode} | ||
|
||
object OpenTelemetry extends Logging { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One question about how OpenTelemetry trace action function, in Python, our implementation wrap around the action function. I am asking this because we also want to record any exception happened in the span that is not caused by open telemetry, the example to record it is here: https://opentelemetry.io/docs/languages/java/instrumentation/#set-span-status There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added |
||
// class name format: snow.snowpark.<class name> | ||
// method chain: Dataframe.filter.join.select.collect | ||
// todo: track line number in SNOW-1480775 | ||
def emit( | ||
className: String, | ||
funcName: String, | ||
fileName: String, | ||
lineNumber: Int, | ||
methodChain: String): Unit = | ||
emit(className, funcName) { span => | ||
{ | ||
span.setAttribute("code.filepath", fileName) | ||
span.setAttribute("code.lineno", lineNumber) | ||
span.setAttribute("method.chain", methodChain) | ||
} | ||
} | ||
|
||
def reportError(className: String, funcName: String, error: Throwable): Unit = | ||
emit(className, funcName) { span => | ||
{ | ||
span.setStatus(StatusCode.ERROR, error.getMessage) | ||
span.recordException(error) | ||
} | ||
} | ||
|
||
private def emit(className: String, funcName: String)(report: Span => Unit): Unit = { | ||
val name = s"snow.snowpark.$className" | ||
val tracer = GlobalOpenTelemetry.getTracer(name) | ||
val span = tracer.spanBuilder(funcName).startSpan() | ||
try { | ||
val scope = span.makeCurrent() | ||
// Using Manager is not available in Scala 2.12 yet | ||
try { | ||
report(span) | ||
} catch { | ||
case e: Exception => | ||
logWarning(s"Error when acquiring span attributes. ${e.getMessage}") | ||
} finally { | ||
scope.close() | ||
} | ||
} finally { | ||
span.end() | ||
} | ||
} | ||
|
||
// todo: Snow-1480779 | ||
def buildMethodChain(funcName: String, df: DataFrame): String = { | ||
"" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package com.snowflake.snowpark | ||
|
||
import io.opentelemetry.api.GlobalOpenTelemetry | ||
import io.opentelemetry.api.common.AttributeKey | ||
import io.opentelemetry.api.trace.StatusCode | ||
import io.opentelemetry.exporters.inmemory.InMemorySpanExporter | ||
import io.opentelemetry.sdk.OpenTelemetrySdk | ||
import io.opentelemetry.sdk.resources.Resource | ||
import io.opentelemetry.sdk.trace.SdkTracerProvider | ||
import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor | ||
import io.opentelemetry.sdk.trace.data.SpanData | ||
import io.opentelemetry.sdk.trace.internal.data.ExceptionEventData | ||
|
||
trait OpenTelemetryEnabled extends SNTestBase { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Create a new trait for OpenTelemetry related tests. |
||
lazy protected val testSpanExporter: InMemorySpanExporter = InMemorySpanExporter.create() | ||
|
||
override def beforeAll: Unit = { | ||
super.beforeAll | ||
val resource = Resource.getDefault.toBuilder | ||
.put("service.name", "test-server") | ||
.put("service.version", "0.1.0") | ||
.build() | ||
|
||
val sdkTracerProvider = SdkTracerProvider | ||
.builder() | ||
.addSpanProcessor(SimpleSpanProcessor.create(testSpanExporter)) | ||
.setResource(resource) | ||
.build() | ||
|
||
OpenTelemetrySdk | ||
.builder() | ||
.setTracerProvider(sdkTracerProvider) | ||
.buildAndRegisterGlobal() | ||
} | ||
|
||
def checkSpan( | ||
className: String, | ||
funcName: String, | ||
fileName: String, | ||
lineNumber: Int, | ||
methodChain: String): Unit = | ||
checkSpan(className, funcName) { span => | ||
{ | ||
assert(span.getTotalAttributeCount == 3) | ||
assert(span.getAttributes.get(AttributeKey.stringKey("code.filepath")) == fileName) | ||
assert(span.getAttributes.get(AttributeKey.longKey("code.lineno")) == lineNumber) | ||
assert(span.getAttributes.get(AttributeKey.stringKey("method.chain")) == methodChain) | ||
} | ||
} | ||
|
||
def checkSpan(className: String, funcName: String)(func: SpanData => Unit): Unit = { | ||
val span: SpanData = testSpanExporter.getFinishedSpanItems.get(0) | ||
assert(span.getName == funcName) | ||
assert(span.getInstrumentationScopeInfo.getName == className) | ||
func(span) | ||
testSpanExporter.reset() | ||
} | ||
|
||
def checkSpanError(className: String, funcName: String, error: Throwable): Unit = | ||
checkSpan(className, funcName) { span => | ||
{ | ||
assert(span.getStatus.getStatusCode == StatusCode.ERROR) | ||
assert(span.getStatus.getDescription == error.getMessage) | ||
assert(span.getEvents.size() == 1) | ||
assert(span.getEvents.get(0).isInstanceOf[ExceptionEventData]) | ||
assert(span.getEvents.get(0).asInstanceOf[ExceptionEventData].getException == error) | ||
} | ||
} | ||
|
||
override def afterAll: Unit = { | ||
GlobalOpenTelemetry.resetForTest() | ||
super.afterAll | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package com.snowflake.snowpark_test | ||
|
||
import com.snowflake.snowpark.OpenTelemetryEnabled | ||
import com.snowflake.snowpark.internal.OpenTelemetry | ||
|
||
class OpenTelemetrySuite extends OpenTelemetryEnabled { | ||
|
||
test("OpenTelemetry.emit") { | ||
OpenTelemetry.emit("ClassA", "functionB", "fileC", 123, "chainD") | ||
checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD") | ||
} | ||
|
||
test("report error") { | ||
val error = new Exception("test") | ||
OpenTelemetry.reportError("ClassA1", "functionB1", error) | ||
checkSpanError("snow.snowpark.ClassA1", "functionB1", error) | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a question here: how is the version control of a specific library done in Java? Does this code mean that user have to use opentelemetry-bom==1.39.0? Is there a way to limit it like >1.0.0, <2.0.0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When compile, Maven while load opentelemetry 1.39.0 to compile the project. But during runtime, it actually works with any compatible dependencies.