Skip to content

Commit

Permalink
Merge branch 'main' into snow-1529469
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-bli committed Jul 25, 2024
2 parents 8e00c92 + d95af71 commit 8d8cbd3
Show file tree
Hide file tree
Showing 15 changed files with 70 additions and 117 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/snowflake/snowpark_java/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -4286,6 +4286,6 @@ private static Session getActiveSession() {

private static UserDefinedFunction userDefinedFunction(
String funcName, Supplier<UserDefinedFunction> func) {
return javaUDF("Functions", funcName, "", "", 0, func);
return javaUDF("Functions", funcName, "", "", func);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2679,14 +2679,14 @@ public Object runLocally(JavaSProc sproc, Object... args) {

private StoredProcedure sproc(
String funcName, String execName, String execFilePath, Supplier<StoredProcedure> func) {
return javaSProc("SProcRegistration", funcName, execName, execFilePath, 0, func);
return javaSProc("SProcRegistration", funcName, execName, execFilePath, func);
}

private StoredProcedure sproc(String funcName, String execName, Supplier<StoredProcedure> func) {
return javaSProc("SProcRegistration", funcName, execName, "", 0, func);
return javaSProc("SProcRegistration", funcName, execName, "", func);
}

private StoredProcedure sproc(String funcName, Supplier<StoredProcedure> func) {
return javaSProc("SProcRegistration", funcName, "", "", 0, func);
return javaSProc("SProcRegistration", funcName, "", "", func);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2120,15 +2120,15 @@ static com.snowflake.snowpark.types.DataType[] convertDataType(DataType[] javaTy

private UserDefinedFunction udf(
String funcName, String execName, String execFilePath, Supplier<UserDefinedFunction> func) {
return javaUDF("UDFRegistration", funcName, execName, execFilePath, 0, func);
return javaUDF("UDFRegistration", funcName, execName, execFilePath, func);
}

private UserDefinedFunction udf(
String funcName, String execName, Supplier<UserDefinedFunction> func) {
return javaUDF("UDFRegistration", funcName, execName, "", 0, func);
return javaUDF("UDFRegistration", funcName, execName, "", func);
}

private UserDefinedFunction udf(String funcName, Supplier<UserDefinedFunction> func) {
return javaUDF("UDFRegistration", funcName, "", "", 0, func);
return javaUDF("UDFRegistration", funcName, "", "", func);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,6 @@ public TableFunction registerPermanent(String funcName, JavaUDTF udtf, String st

private TableFunction tableFunction(
String funcName, String execName, String execFilePath, Supplier<TableFunction> func) {
return javaUDTF("UDTFRegistration", funcName, execName, execFilePath, 0, func);
return javaUDTF("UDTFRegistration", funcName, execName, execFilePath, func);
}
}
14 changes: 3 additions & 11 deletions src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class CopyableDataFrame private[snowpark] (
* @since 0.10.0
* @group basic
*/
override def clone: CopyableDataFrame = action("clone", 2) {
override def clone: CopyableDataFrame = action("clone") {
new CopyableDataFrame(session, plan, Seq(), stagedFileReader)
}

Expand All @@ -261,13 +261,7 @@ class CopyableDataFrame private[snowpark] (
override def async: CopyableDataFrameAsyncActor = new CopyableDataFrameAsyncActor(this)

@inline override protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = this.session.conn.isScalaAPI
OpenTelemetry.action("CopyableDataFrame", funcName, methodChainString, isScala)(func)
}
@inline protected def action[T](funcName: String, javaOffset: Int)(func: => T): T = {
val isScala: Boolean = this.session.conn.isScalaAPI
OpenTelemetry.action("CopyableDataFrame", funcName, methodChainString, isScala, javaOffset)(
func)
OpenTelemetry.action("CopyableDataFrame", funcName, methodChainString)(func)
}
}

Expand Down Expand Up @@ -361,11 +355,9 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
}

@inline override protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = cdf.session.conn.isScalaAPI
OpenTelemetry.action(
"CopyableDataFrameAsyncActor",
funcName,
cdf.methodChainString + ".async",
isScala)(func)
cdf.methodChainString + ".async")(func)
}
}
10 changes: 2 additions & 8 deletions src/main/scala/com/snowflake/snowpark/DataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3029,8 +3029,7 @@ class DataFrame private[snowpark] (
@inline protected def withPlan(plan: LogicalPlan): DataFrame = DataFrame(session, plan)

@inline protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = this.session.conn.isScalaAPI
OpenTelemetry.action("DataFrame", funcName, methodChainString, isScala)(func)
OpenTelemetry.action("DataFrame", funcName, methodChainString)(func)
}

@inline protected def transformation(funcName: String)(func: => DataFrame): DataFrame =
Expand Down Expand Up @@ -3108,11 +3107,6 @@ class DataFrameAsyncActor private[snowpark] (df: DataFrame) {
}

@inline protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = df.session.conn.isScalaAPI
OpenTelemetry.action(
"DataFrameAsyncActor",
funcName,
df.methodChainString + ".async",
isScala)(func)
OpenTelemetry.action("DataFrameAsyncActor", funcName, df.methodChainString + ".async")(func)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,7 @@ final class DataFrameStatFunctions private[snowpark] (df: DataFrame) extends Log
}

@inline protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = df.session.conn.isScalaAPI
OpenTelemetry.action(
"DataFrameStatFunctions",
funcName,
df.methodChainString + ".stat",
isScala)(func)
OpenTelemetry.action("DataFrameStatFunctions", funcName, df.methodChainString + ".stat")(func)
}
@inline protected def transformation(funcName: String)(func: => DataFrame): DataFrame =
DataFrame.buildMethodChain(this.df.methodChain :+ "stat", funcName)(func)
Expand Down
7 changes: 2 additions & 5 deletions src/main/scala/com/snowflake/snowpark/DataFrameWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,7 @@ class DataFrameWriter(private[snowpark] val dataFrame: DataFrame) {
OpenTelemetry.action(
"DataFrameWriter",
funcName,
this.dataFrame.methodChainString + ".writer",
isScala)(func)
this.dataFrame.methodChainString + ".writer")(func)
}

}
Expand Down Expand Up @@ -490,12 +489,10 @@ class DataFrameWriterAsyncActor private[snowpark] (writer: DataFrameWriter) {
}

@inline protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = writer.dataFrame.session.conn.isScalaAPI
OpenTelemetry.action(
"DataFrameWriterAsyncActor",
funcName,
writer.dataFrame.methodChainString + ".writer.async",
isScala)(func)
writer.dataFrame.methodChainString + ".writer.async")(func)
}
}

Expand Down
8 changes: 2 additions & 6 deletions src/main/scala/com/snowflake/snowpark/MergeBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,7 @@ class MergeBuilder private[snowpark] (
def async: MergeBuilderAsyncActor = new MergeBuilderAsyncActor(this)

@inline protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = target.session.conn.isScalaAPI
OpenTelemetry.action("MergeBuilder", funcName, target.methodChainString + ".merge", isScala)(
func)
OpenTelemetry.action("MergeBuilder", funcName, target.methodChainString + ".merge")(func)
}
}

Expand All @@ -231,11 +229,9 @@ class MergeBuilderAsyncActor private[snowpark] (mergeBuilder: MergeBuilder) {
}

@inline protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = mergeBuilder.target.session.conn.isScalaAPI
OpenTelemetry.action(
"MergeBuilderAsyncActor",
funcName,
mergeBuilder.target.methodChainString + ".merge.async",
isScala)(func)
mergeBuilder.target.methodChainString + ".merge.async")(func)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3180,7 +3180,6 @@ class SProcRegistration(session: Session) {
funcName,
execName,
s"${UDXRegistrationHandler.className}.${UDXRegistrationHandler.methodName}",
execFilePath,
0)(func)
execFilePath)(func)
}
}
3 changes: 1 addition & 2 deletions src/main/scala/com/snowflake/snowpark/UDFRegistration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2447,7 +2447,6 @@ class UDFRegistration(session: Session) extends Logging {
funcName,
execName,
s"${UDXRegistrationHandler.className}.${UDXRegistrationHandler.methodName}",
execFilePath,
0)(func)
execFilePath)(func)
}
}
3 changes: 1 addition & 2 deletions src/main/scala/com/snowflake/snowpark/UDTFRegistration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ class UDTFRegistration(session: Session) extends Logging {
funcName,
execName,
UDXRegistrationHandler.udtfClassName,
execFilePath,
0)(func)
execFilePath)(func)
}
}
53 changes: 16 additions & 37 deletions src/main/scala/com/snowflake/snowpark/Updatable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class Updatable private[snowpark] (
* @since 0.7.0
* @return [[UpdateResult]]
*/
def update(assignments: Map[Column, Column]): UpdateResult = action("update", 2) {
def update(assignments: Map[Column, Column]): UpdateResult = action("update") {
val newDf = getUpdateDataFrameWithColumn(assignments, None, None)
Updatable.getUpdateResult(newDf.collect())
}
Expand All @@ -106,7 +106,7 @@ class Updatable private[snowpark] (
* @since 0.7.0
* @return [[UpdateResult]]
*/
def update[T: ClassTag](assignments: Map[String, Column]): UpdateResult = action("update", 2) {
def update[T: ClassTag](assignments: Map[String, Column]): UpdateResult = action("update") {
val newDf = getUpdateDataFrameWithString(assignments, None, None)
Updatable.getUpdateResult(newDf.collect())
}
Expand All @@ -128,7 +128,7 @@ class Updatable private[snowpark] (
* @return [[UpdateResult]]
*/
def update(assignments: Map[Column, Column], condition: Column): UpdateResult =
action("update", 2) {
action("update") {
val newDf = getUpdateDataFrameWithColumn(assignments, Some(condition), None)
Updatable.getUpdateResult(newDf.collect())
}
Expand All @@ -150,7 +150,7 @@ class Updatable private[snowpark] (
* @return [[UpdateResult]]
*/
def update[T: ClassTag](assignments: Map[String, Column], condition: Column): UpdateResult =
action("update", 2) {
action("update") {
val newDf = getUpdateDataFrameWithString(assignments, Some(condition), None)
Updatable.getUpdateResult(newDf.collect())
}
Expand All @@ -174,7 +174,7 @@ class Updatable private[snowpark] (
def update(
assignments: Map[Column, Column],
condition: Column,
sourceData: DataFrame): UpdateResult = action("update", 2) {
sourceData: DataFrame): UpdateResult = action("update") {
val newDf = getUpdateDataFrameWithColumn(assignments, Some(condition), Some(sourceData))
Updatable.getUpdateResult(newDf.collect())
}
Expand All @@ -198,7 +198,7 @@ class Updatable private[snowpark] (
def update[T: ClassTag](
assignments: Map[String, Column],
condition: Column,
sourceData: DataFrame): UpdateResult = action("update", 2) {
sourceData: DataFrame): UpdateResult = action("update") {
val newDf = getUpdateDataFrameWithString(assignments, Some(condition), Some(sourceData))
Updatable.getUpdateResult(newDf.collect())
}
Expand Down Expand Up @@ -332,7 +332,7 @@ class Updatable private[snowpark] (
* @since 0.10.0
* @group basic
*/
override def clone: Updatable = action("clone", 2) {
override def clone: Updatable = action("clone") {
new Updatable(tableName, session, Seq())
}

Expand All @@ -356,15 +356,8 @@ class Updatable private[snowpark] (
override def async: UpdatableAsyncActor = new UpdatableAsyncActor(this)

@inline override protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = this.session.conn.isScalaAPI
OpenTelemetry.action("Updatable", funcName, methodChainString, isScala)(func)
OpenTelemetry.action("Updatable", funcName, methodChainString)(func)
}

@inline protected def action[T](funcName: String, javaOffset: Int)(func: => T): T = {
val isScala: Boolean = this.session.conn.isScalaAPI
OpenTelemetry.action("Updatable", funcName, methodChainString, isScala, javaOffset)(func)
}

}

/**
Expand All @@ -383,7 +376,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
* @since 0.11.0
*/
def update(assignments: Map[Column, Column]): TypedAsyncJob[UpdateResult] =
action("update", 2) {
action("update") {
val newDf = updatable.getUpdateDataFrameWithColumn(assignments, None, None)
updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
}
Expand All @@ -396,7 +389,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
* @since 0.11.0
*/
def update[T: ClassTag](assignments: Map[String, Column]): TypedAsyncJob[UpdateResult] =
action("update", 2) {
action("update") {
val newDf = updatable.getUpdateDataFrameWithString(assignments, None, None)
updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
}
Expand All @@ -409,7 +402,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
* @since 0.11.0
*/
def update(assignments: Map[Column, Column], condition: Column): TypedAsyncJob[UpdateResult] =
action("update", 2) {
action("update") {
val newDf = updatable.getUpdateDataFrameWithColumn(assignments, Some(condition), None)
updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
}
Expand All @@ -424,7 +417,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
def update[T: ClassTag](
assignments: Map[String, Column],
condition: Column): TypedAsyncJob[UpdateResult] =
action("update", 2) {
action("update") {
val newDf = updatable.getUpdateDataFrameWithString(assignments, Some(condition), None)
updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
}
Expand All @@ -439,7 +432,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
def update(
assignments: Map[Column, Column],
condition: Column,
sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update", 2) {
sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update") {
val newDf =
updatable.getUpdateDataFrameWithColumn(assignments, Some(condition), Some(sourceData))
updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
Expand All @@ -455,7 +448,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
def update[T: ClassTag](
assignments: Map[String, Column],
condition: Column,
sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update", 2) {
sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update") {
val newDf =
updatable.getUpdateDataFrameWithString(assignments, Some(condition), Some(sourceData))
updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
Expand Down Expand Up @@ -499,21 +492,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
}

@inline override protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = updatable.session.conn.isScalaAPI
OpenTelemetry.action(
"UpdatableAsyncActor",
funcName,
updatable.methodChainString + ".async",
isScala)(func)
}

@inline protected def action[T](funcName: String, javaOffset: Int)(func: => T): T = {
val isScala: Boolean = updatable.session.conn.isScalaAPI
OpenTelemetry.action(
"UpdatableAsyncActor",
funcName,
updatable.methodChainString + ".async",
isScala,
javaOffset)(func)
OpenTelemetry.action("UpdatableAsyncActor", funcName, updatable.methodChainString + ".async")(
func)
}
}
3 changes: 1 addition & 2 deletions src/main/scala/com/snowflake/snowpark/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3864,8 +3864,7 @@ object functions {
funcName,
"",
s"${UDXRegistrationHandler.className}.${UDXRegistrationHandler.methodName}",
"",
0)(func)
"")(func)
}

}
Loading

0 comments on commit 8d8cbd3

Please sign in to comment.