diff --git a/src/main/java/com/snowflake/snowpark_java/Functions.java b/src/main/java/com/snowflake/snowpark_java/Functions.java index f3fb46eb..56d8d08b 100644 --- a/src/main/java/com/snowflake/snowpark_java/Functions.java +++ b/src/main/java/com/snowflake/snowpark_java/Functions.java @@ -4286,6 +4286,6 @@ private static Session getActiveSession() { private static UserDefinedFunction userDefinedFunction( String funcName, Supplier func) { - return javaUDF("Functions", funcName, "", "", 0, func); + return javaUDF("Functions", funcName, "", "", func); } } diff --git a/src/main/java/com/snowflake/snowpark_java/SProcRegistration.java b/src/main/java/com/snowflake/snowpark_java/SProcRegistration.java index acb6c9e4..d6beb75e 100644 --- a/src/main/java/com/snowflake/snowpark_java/SProcRegistration.java +++ b/src/main/java/com/snowflake/snowpark_java/SProcRegistration.java @@ -2679,14 +2679,14 @@ public Object runLocally(JavaSProc sproc, Object... args) { private StoredProcedure sproc( String funcName, String execName, String execFilePath, Supplier func) { - return javaSProc("SProcRegistration", funcName, execName, execFilePath, 0, func); + return javaSProc("SProcRegistration", funcName, execName, execFilePath, func); } private StoredProcedure sproc(String funcName, String execName, Supplier func) { - return javaSProc("SProcRegistration", funcName, execName, "", 0, func); + return javaSProc("SProcRegistration", funcName, execName, "", func); } private StoredProcedure sproc(String funcName, Supplier func) { - return javaSProc("SProcRegistration", funcName, "", "", 0, func); + return javaSProc("SProcRegistration", funcName, "", "", func); } } diff --git a/src/main/java/com/snowflake/snowpark_java/UDFRegistration.java b/src/main/java/com/snowflake/snowpark_java/UDFRegistration.java index b77581d7..cf339ead 100644 --- a/src/main/java/com/snowflake/snowpark_java/UDFRegistration.java +++ b/src/main/java/com/snowflake/snowpark_java/UDFRegistration.java @@ -2120,15 +2120,15 @@ static com.snowflake.snowpark.types.DataType[] convertDataType(DataType[] javaTy private UserDefinedFunction udf( String funcName, String execName, String execFilePath, Supplier func) { - return javaUDF("UDFRegistration", funcName, execName, execFilePath, 0, func); + return javaUDF("UDFRegistration", funcName, execName, execFilePath, func); } private UserDefinedFunction udf( String funcName, String execName, Supplier func) { - return javaUDF("UDFRegistration", funcName, execName, "", 0, func); + return javaUDF("UDFRegistration", funcName, execName, "", func); } private UserDefinedFunction udf(String funcName, Supplier func) { - return javaUDF("UDFRegistration", funcName, "", "", 0, func); + return javaUDF("UDFRegistration", funcName, "", "", func); } } diff --git a/src/main/java/com/snowflake/snowpark_java/UDTFRegistration.java b/src/main/java/com/snowflake/snowpark_java/UDTFRegistration.java index 13689a7b..5a471c8a 100644 --- a/src/main/java/com/snowflake/snowpark_java/UDTFRegistration.java +++ b/src/main/java/com/snowflake/snowpark_java/UDTFRegistration.java @@ -313,6 +313,6 @@ public TableFunction registerPermanent(String funcName, JavaUDTF udtf, String st private TableFunction tableFunction( String funcName, String execName, String execFilePath, Supplier func) { - return javaUDTF("UDTFRegistration", funcName, execName, execFilePath, 0, func); + return javaUDTF("UDTFRegistration", funcName, execName, execFilePath, func); } } diff --git a/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala b/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala index fa8b8e15..56c67ef3 100644 --- a/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala +++ b/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala @@ -238,7 +238,7 @@ class CopyableDataFrame private[snowpark] ( * @since 0.10.0 * @group basic */ - override def clone: CopyableDataFrame = action("clone", 2) { + override def clone: CopyableDataFrame = action("clone") { new CopyableDataFrame(session, plan, Seq(), stagedFileReader) } @@ -261,13 +261,7 @@ class CopyableDataFrame private[snowpark] ( override def async: CopyableDataFrameAsyncActor = new CopyableDataFrameAsyncActor(this) @inline override protected def action[T](funcName: String)(func: => T): T = { - val isScala: Boolean = this.session.conn.isScalaAPI - OpenTelemetry.action("CopyableDataFrame", funcName, methodChainString, isScala)(func) - } - @inline protected def action[T](funcName: String, javaOffset: Int)(func: => T): T = { - val isScala: Boolean = this.session.conn.isScalaAPI - OpenTelemetry.action("CopyableDataFrame", funcName, methodChainString, isScala, javaOffset)( - func) + OpenTelemetry.action("CopyableDataFrame", funcName, methodChainString)(func) } } @@ -361,11 +355,9 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame) } @inline override protected def action[T](funcName: String)(func: => T): T = { - val isScala: Boolean = cdf.session.conn.isScalaAPI OpenTelemetry.action( "CopyableDataFrameAsyncActor", funcName, - cdf.methodChainString + ".async", - isScala)(func) + cdf.methodChainString + ".async")(func) } } diff --git a/src/main/scala/com/snowflake/snowpark/DataFrame.scala b/src/main/scala/com/snowflake/snowpark/DataFrame.scala index 8f99693c..e063a736 100644 --- a/src/main/scala/com/snowflake/snowpark/DataFrame.scala +++ b/src/main/scala/com/snowflake/snowpark/DataFrame.scala @@ -3029,8 +3029,7 @@ class DataFrame private[snowpark] ( @inline protected def withPlan(plan: LogicalPlan): DataFrame = DataFrame(session, plan) @inline protected def action[T](funcName: String)(func: => T): T = { - val isScala: Boolean = this.session.conn.isScalaAPI - OpenTelemetry.action("DataFrame", funcName, methodChainString, isScala)(func) + OpenTelemetry.action("DataFrame", funcName, methodChainString)(func) } @inline protected def transformation(funcName: String)(func: => DataFrame): DataFrame = @@ -3108,11 +3107,6 @@ class DataFrameAsyncActor private[snowpark] (df: DataFrame) { } @inline protected def action[T](funcName: String)(func: => T): T = { - val isScala: Boolean = df.session.conn.isScalaAPI - OpenTelemetry.action( - "DataFrameAsyncActor", - funcName, - df.methodChainString + ".async", - isScala)(func) + OpenTelemetry.action("DataFrameAsyncActor", funcName, df.methodChainString + ".async")(func) } } diff --git a/src/main/scala/com/snowflake/snowpark/DataFrameStatFunctions.scala b/src/main/scala/com/snowflake/snowpark/DataFrameStatFunctions.scala index e42ada91..b99153d5 100644 --- a/src/main/scala/com/snowflake/snowpark/DataFrameStatFunctions.scala +++ b/src/main/scala/com/snowflake/snowpark/DataFrameStatFunctions.scala @@ -311,12 +311,7 @@ final class DataFrameStatFunctions private[snowpark] (df: DataFrame) extends Log } @inline protected def action[T](funcName: String)(func: => T): T = { - val isScala: Boolean = df.session.conn.isScalaAPI - OpenTelemetry.action( - "DataFrameStatFunctions", - funcName, - df.methodChainString + ".stat", - isScala)(func) + OpenTelemetry.action("DataFrameStatFunctions", funcName, df.methodChainString + ".stat")(func) } @inline protected def transformation(funcName: String)(func: => DataFrame): DataFrame = DataFrame.buildMethodChain(this.df.methodChain :+ "stat", funcName)(func) diff --git a/src/main/scala/com/snowflake/snowpark/DataFrameWriter.scala b/src/main/scala/com/snowflake/snowpark/DataFrameWriter.scala index a0643f1d..8a060430 100644 --- a/src/main/scala/com/snowflake/snowpark/DataFrameWriter.scala +++ b/src/main/scala/com/snowflake/snowpark/DataFrameWriter.scala @@ -394,8 +394,7 @@ class DataFrameWriter(private[snowpark] val dataFrame: DataFrame) { OpenTelemetry.action( "DataFrameWriter", funcName, - this.dataFrame.methodChainString + ".writer", - isScala)(func) + this.dataFrame.methodChainString + ".writer")(func) } } @@ -490,12 +489,10 @@ class DataFrameWriterAsyncActor private[snowpark] (writer: DataFrameWriter) { } @inline protected def action[T](funcName: String)(func: => T): T = { - val isScala: Boolean = writer.dataFrame.session.conn.isScalaAPI OpenTelemetry.action( "DataFrameWriterAsyncActor", funcName, - writer.dataFrame.methodChainString + ".writer.async", - isScala)(func) + writer.dataFrame.methodChainString + ".writer.async")(func) } } diff --git a/src/main/scala/com/snowflake/snowpark/MergeBuilder.scala b/src/main/scala/com/snowflake/snowpark/MergeBuilder.scala index 146e854a..7fa7e36b 100644 --- a/src/main/scala/com/snowflake/snowpark/MergeBuilder.scala +++ b/src/main/scala/com/snowflake/snowpark/MergeBuilder.scala @@ -204,9 +204,7 @@ class MergeBuilder private[snowpark] ( def async: MergeBuilderAsyncActor = new MergeBuilderAsyncActor(this) @inline protected def action[T](funcName: String)(func: => T): T = { - val isScala: Boolean = target.session.conn.isScalaAPI - OpenTelemetry.action("MergeBuilder", funcName, target.methodChainString + ".merge", isScala)( - func) + OpenTelemetry.action("MergeBuilder", funcName, target.methodChainString + ".merge")(func) } } @@ -231,11 +229,9 @@ class MergeBuilderAsyncActor private[snowpark] (mergeBuilder: MergeBuilder) { } @inline protected def action[T](funcName: String)(func: => T): T = { - val isScala: Boolean = mergeBuilder.target.session.conn.isScalaAPI OpenTelemetry.action( "MergeBuilderAsyncActor", funcName, - mergeBuilder.target.methodChainString + ".merge.async", - isScala)(func) + mergeBuilder.target.methodChainString + ".merge.async")(func) } } diff --git a/src/main/scala/com/snowflake/snowpark/SProcRegistration.scala b/src/main/scala/com/snowflake/snowpark/SProcRegistration.scala index 7520cffd..016ed339 100644 --- a/src/main/scala/com/snowflake/snowpark/SProcRegistration.scala +++ b/src/main/scala/com/snowflake/snowpark/SProcRegistration.scala @@ -3180,7 +3180,6 @@ class SProcRegistration(session: Session) { funcName, execName, s"${UDXRegistrationHandler.className}.${UDXRegistrationHandler.methodName}", - execFilePath, - 0)(func) + execFilePath)(func) } } diff --git a/src/main/scala/com/snowflake/snowpark/UDFRegistration.scala b/src/main/scala/com/snowflake/snowpark/UDFRegistration.scala index 564c2ed2..3fc52d0d 100644 --- a/src/main/scala/com/snowflake/snowpark/UDFRegistration.scala +++ b/src/main/scala/com/snowflake/snowpark/UDFRegistration.scala @@ -2447,7 +2447,6 @@ class UDFRegistration(session: Session) extends Logging { funcName, execName, s"${UDXRegistrationHandler.className}.${UDXRegistrationHandler.methodName}", - execFilePath, - 0)(func) + execFilePath)(func) } } diff --git a/src/main/scala/com/snowflake/snowpark/UDTFRegistration.scala b/src/main/scala/com/snowflake/snowpark/UDTFRegistration.scala index 2f553ee3..cb998529 100644 --- a/src/main/scala/com/snowflake/snowpark/UDTFRegistration.scala +++ b/src/main/scala/com/snowflake/snowpark/UDTFRegistration.scala @@ -230,7 +230,6 @@ class UDTFRegistration(session: Session) extends Logging { funcName, execName, UDXRegistrationHandler.udtfClassName, - execFilePath, - 0)(func) + execFilePath)(func) } } diff --git a/src/main/scala/com/snowflake/snowpark/Updatable.scala b/src/main/scala/com/snowflake/snowpark/Updatable.scala index ca992166..f7dbac81 100644 --- a/src/main/scala/com/snowflake/snowpark/Updatable.scala +++ b/src/main/scala/com/snowflake/snowpark/Updatable.scala @@ -80,7 +80,7 @@ class Updatable private[snowpark] ( * @since 0.7.0 * @return [[UpdateResult]] */ - def update(assignments: Map[Column, Column]): UpdateResult = action("update", 2) { + def update(assignments: Map[Column, Column]): UpdateResult = action("update") { val newDf = getUpdateDataFrameWithColumn(assignments, None, None) Updatable.getUpdateResult(newDf.collect()) } @@ -106,7 +106,7 @@ class Updatable private[snowpark] ( * @since 0.7.0 * @return [[UpdateResult]] */ - def update[T: ClassTag](assignments: Map[String, Column]): UpdateResult = action("update", 2) { + def update[T: ClassTag](assignments: Map[String, Column]): UpdateResult = action("update") { val newDf = getUpdateDataFrameWithString(assignments, None, None) Updatable.getUpdateResult(newDf.collect()) } @@ -128,7 +128,7 @@ class Updatable private[snowpark] ( * @return [[UpdateResult]] */ def update(assignments: Map[Column, Column], condition: Column): UpdateResult = - action("update", 2) { + action("update") { val newDf = getUpdateDataFrameWithColumn(assignments, Some(condition), None) Updatable.getUpdateResult(newDf.collect()) } @@ -150,7 +150,7 @@ class Updatable private[snowpark] ( * @return [[UpdateResult]] */ def update[T: ClassTag](assignments: Map[String, Column], condition: Column): UpdateResult = - action("update", 2) { + action("update") { val newDf = getUpdateDataFrameWithString(assignments, Some(condition), None) Updatable.getUpdateResult(newDf.collect()) } @@ -174,7 +174,7 @@ class Updatable private[snowpark] ( def update( assignments: Map[Column, Column], condition: Column, - sourceData: DataFrame): UpdateResult = action("update", 2) { + sourceData: DataFrame): UpdateResult = action("update") { val newDf = getUpdateDataFrameWithColumn(assignments, Some(condition), Some(sourceData)) Updatable.getUpdateResult(newDf.collect()) } @@ -198,7 +198,7 @@ class Updatable private[snowpark] ( def update[T: ClassTag]( assignments: Map[String, Column], condition: Column, - sourceData: DataFrame): UpdateResult = action("update", 2) { + sourceData: DataFrame): UpdateResult = action("update") { val newDf = getUpdateDataFrameWithString(assignments, Some(condition), Some(sourceData)) Updatable.getUpdateResult(newDf.collect()) } @@ -332,7 +332,7 @@ class Updatable private[snowpark] ( * @since 0.10.0 * @group basic */ - override def clone: Updatable = action("clone", 2) { + override def clone: Updatable = action("clone") { new Updatable(tableName, session, Seq()) } @@ -356,15 +356,8 @@ class Updatable private[snowpark] ( override def async: UpdatableAsyncActor = new UpdatableAsyncActor(this) @inline override protected def action[T](funcName: String)(func: => T): T = { - val isScala: Boolean = this.session.conn.isScalaAPI - OpenTelemetry.action("Updatable", funcName, methodChainString, isScala)(func) + OpenTelemetry.action("Updatable", funcName, methodChainString)(func) } - - @inline protected def action[T](funcName: String, javaOffset: Int)(func: => T): T = { - val isScala: Boolean = this.session.conn.isScalaAPI - OpenTelemetry.action("Updatable", funcName, methodChainString, isScala, javaOffset)(func) - } - } /** @@ -383,7 +376,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable) * @since 0.11.0 */ def update(assignments: Map[Column, Column]): TypedAsyncJob[UpdateResult] = - action("update", 2) { + action("update") { val newDf = updatable.getUpdateDataFrameWithColumn(assignments, None, None) updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan) } @@ -396,7 +389,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable) * @since 0.11.0 */ def update[T: ClassTag](assignments: Map[String, Column]): TypedAsyncJob[UpdateResult] = - action("update", 2) { + action("update") { val newDf = updatable.getUpdateDataFrameWithString(assignments, None, None) updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan) } @@ -409,7 +402,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable) * @since 0.11.0 */ def update(assignments: Map[Column, Column], condition: Column): TypedAsyncJob[UpdateResult] = - action("update", 2) { + action("update") { val newDf = updatable.getUpdateDataFrameWithColumn(assignments, Some(condition), None) updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan) } @@ -424,7 +417,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable) def update[T: ClassTag]( assignments: Map[String, Column], condition: Column): TypedAsyncJob[UpdateResult] = - action("update", 2) { + action("update") { val newDf = updatable.getUpdateDataFrameWithString(assignments, Some(condition), None) updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan) } @@ -439,7 +432,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable) def update( assignments: Map[Column, Column], condition: Column, - sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update", 2) { + sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update") { val newDf = updatable.getUpdateDataFrameWithColumn(assignments, Some(condition), Some(sourceData)) updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan) @@ -455,7 +448,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable) def update[T: ClassTag]( assignments: Map[String, Column], condition: Column, - sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update", 2) { + sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update") { val newDf = updatable.getUpdateDataFrameWithString(assignments, Some(condition), Some(sourceData)) updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan) @@ -499,21 +492,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable) } @inline override protected def action[T](funcName: String)(func: => T): T = { - val isScala: Boolean = updatable.session.conn.isScalaAPI - OpenTelemetry.action( - "UpdatableAsyncActor", - funcName, - updatable.methodChainString + ".async", - isScala)(func) - } - - @inline protected def action[T](funcName: String, javaOffset: Int)(func: => T): T = { - val isScala: Boolean = updatable.session.conn.isScalaAPI - OpenTelemetry.action( - "UpdatableAsyncActor", - funcName, - updatable.methodChainString + ".async", - isScala, - javaOffset)(func) + OpenTelemetry.action("UpdatableAsyncActor", funcName, updatable.methodChainString + ".async")( + func) } } diff --git a/src/main/scala/com/snowflake/snowpark/functions.scala b/src/main/scala/com/snowflake/snowpark/functions.scala index db5d76a5..a7fd9ff0 100644 --- a/src/main/scala/com/snowflake/snowpark/functions.scala +++ b/src/main/scala/com/snowflake/snowpark/functions.scala @@ -3864,8 +3864,7 @@ object functions { funcName, "", s"${UDXRegistrationHandler.className}.${UDXRegistrationHandler.methodName}", - "", - 0)(func) + "")(func) } } diff --git a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala index 975f79a4..82108d43 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala @@ -21,15 +21,13 @@ object OpenTelemetry extends Logging { funcName: String, execName: String, execFilePath: String, - stackOffset: Int, func: Supplier[JavaUDF]): JavaUDF = { udx( className, funcName, execName, s"${UDXRegistrationHandler.className}.${UDXRegistrationHandler.methodName}", - execFilePath, - stackOffset + 2)(func.get()) + execFilePath)(func.get()) } def javaUDTF( @@ -37,30 +35,22 @@ object OpenTelemetry extends Logging { funcName: String, execName: String, execFilePath: String, - stackOffset: Int, func: Supplier[JavaTableFunction]): JavaTableFunction = { - udx( - className, - funcName, - execName, - UDXRegistrationHandler.udtfClassName, - execFilePath, - stackOffset + 2)(func.get()) + udx(className, funcName, execName, UDXRegistrationHandler.udtfClassName, execFilePath)( + func.get()) } def javaSProc( className: String, funcName: String, execName: String, execFilePath: String, - stackOffset: Int, func: Supplier[JavaSProc]): JavaSProc = { udx( className, funcName, execName, s"${UDXRegistrationHandler.className}.${UDXRegistrationHandler.methodName}", - execFilePath, - stackOffset + 2)(func.get()) + execFilePath)(func.get()) } // Scala API @@ -69,16 +59,13 @@ object OpenTelemetry extends Logging { funcName: String, execName: String, execHandler: String, - execFilePath: String, - stackOffset: Int)(func: => T): T = { + execFilePath: String)(func: => T): T = { try { spanInfo.withValue[T](spanInfo.value match { // empty info means this is the entry of the recursion case None => val stacks = Thread.currentThread().getStackTrace - val index = 4 + stackOffset - val fileName = stacks(index).getFileName - val lineNumber = stacks(index).getLineNumber + val (fileName, lineNumber) = findLineNumber(stacks) Some( UdfInfo( className, @@ -103,20 +90,13 @@ object OpenTelemetry extends Logging { } } // wrapper of all action functions - def action[T]( - className: String, - funcName: String, - methodChain: String, - isScala: Boolean, - javaOffSet: Int = 0)(func: => T): T = { + def action[T](className: String, funcName: String, methodChain: String)(func: => T): T = { try { spanInfo.withValue[T](spanInfo.value match { // empty info means this is the entry of the recursion case None => val stacks = Thread.currentThread().getStackTrace - val index = if (isScala) 4 else 5 + javaOffSet - val fileName = stacks(index).getFileName - val lineNumber = stacks(index).getLineNumber + val (fileName, lineNumber) = findLineNumber(stacks) Some(ActionInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName")) // if value is not empty, this function call should be recursion. // do not issue new SpanInfo, use the info inherited from previous. @@ -133,6 +113,30 @@ object OpenTelemetry extends Logging { } } + private def findLineNumber(stacks: Array[StackTraceElement]): (String, Int) = { + var index: Int = 0 + // start with OpenTelemetry class + while (index < stacks.length && stacks(index).getFileName != "OpenTelemetry.scala") { + index += 1 + } + if (index == stacks.length) { + // if can't find open telemetry class, make it N/A + ("N/A", 0) + } else { + while (index < stacks.length && + (stacks(index).getClassName.startsWith("com.snowflake.snowpark.") || + stacks(index).getClassName.startsWith("com.snowflake.snowpark_java."))) { + index += 1 + } + if (index == stacks.length) { + // all class inside of snowpark/snowpark-java package, make it N/A + ("N/A", 0) + } else { + (stacks(index).getFileName, stacks(index).getLineNumber) + } + } + } + def emit(info: SpanInfo): Unit = emit(info.className, info.funcName) { span => {