From e5c3b3b8b747ad5648afda8d50d16e6b4e85459b Mon Sep 17 00:00:00 2001
From: Bing Li
Date: Wed, 10 Jul 2024 16:09:30 -0700
Subject: [PATCH 1/9] copyableDataFrame

---
 .../snowpark/CopyableDataFrame.scala          |  20 +-
 .../snowpark_test/OpenTelemetrySuite.scala    | 217 +++++++-----
 2 files changed, 97 insertions(+), 140 deletions(-)

diff --git a/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala b/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala
index b3cde262..39382c5f 100644
--- a/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala
+++ b/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala
@@ -40,8 +40,9 @@ class CopyableDataFrame private[snowpark] (
    * @param tableName Name of the table where the data should be saved.
    * @since 0.9.0
    */
-  def copyInto(tableName: String): Unit =
+  def copyInto(tableName: String): Unit = action("copyInto") {
     getCopyDataFrame(tableName, Seq.empty, Seq.empty, Map.empty).collect()
+  }
 
   // scalastyle:off line.size.limit
   /**
@@ -82,8 +83,9 @@ class CopyableDataFrame private[snowpark] (
    * @since 0.9.0
    */
   // scalastyle:on line.size.limit
-  def copyInto(tableName: String, transformations: Seq[Column]): Unit =
+  def copyInto(tableName: String, transformations: Seq[Column]): Unit = action("copyInto") {
     getCopyDataFrame(tableName, Seq.empty, transformations, Map.empty).collect()
+  }
 
   // scalastyle:off line.size.limit
   /**
@@ -136,7 +138,9 @@ class CopyableDataFrame private[snowpark] (
    */
   // scalastyle:on line.size.limit
   def copyInto(tableName: String, transformations: Seq[Column], options: Map[String, Any]): Unit =
-    getCopyDataFrame(tableName, Seq.empty, transformations, options).collect()
+    action("copyInto") {
+      getCopyDataFrame(tableName, Seq.empty, transformations, options).collect()
+    }
 
   // scalastyle:off line.size.limit
   /**
@@ -198,8 +202,9 @@ class CopyableDataFrame private[snowpark] (
       tableName: String,
       targetColumnNames: Seq[String],
       transformations: Seq[Column],
-      options: Map[String, Any]): Unit =
+      options: Map[String, Any]): Unit = action("copyInto") {
     getCopyDataFrame(tableName, targetColumnNames, transformations, options).collect()
+  }
 
   // Internal function to create plan for COPY
   private[snowpark] def getCopyDataFrame(
@@ -232,8 +237,9 @@ class CopyableDataFrame private[snowpark] (
    * @since 0.10.0
    * @group basic
    */
-  override def clone: CopyableDataFrame =
+  override def clone: CopyableDataFrame = action("clone") {
     new CopyableDataFrame(session, plan, stagedFileReader)
+  }
 
   /**
    * Returns a [[CopyableDataFrameAsyncActor]] object that can be used to execute
@@ -253,6 +259,10 @@ class CopyableDataFrame private[snowpark] (
    */
   override def async: CopyableDataFrameAsyncActor = new CopyableDataFrameAsyncActor(this)
 
+  @inline override protected def action[T](funcName: String)(func: => T): T = {
+    val isScala: Boolean = this.session.conn.isScalaAPI
+    OpenTelemetry.action("CopyableDataFrame", funcName, isScala)(func)
+  }
 }
 
 /**
diff --git a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
index 9e2ead33..6ba90a61 100644
--- a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
+++ b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
@@ -3,55 +3,55 @@ package com.snowflake.snowpark_test
 import com.snowflake.snowpark.OpenTelemetryEnabled
 import com.snowflake.snowpark.internal.OpenTelemetry
 import com.snowflake.snowpark.functions._
+import com.snowflake.snowpark.types.{DoubleType, IntegerType, StringType, StructField, StructType}
+
 import java.util
 
 class OpenTelemetrySuite extends OpenTelemetryEnabled {
-  // do not add test before line number tests
-  // it verifies code line numbers
   test("line number - collect") {
     session.sql("select 1").collect()
-    checkSpan("snow.snowpark.DataFrame", "collect", "OpenTelemetrySuite.scala", 12, "")
+    checkSpan("snow.snowpark.DataFrame", "collect", "")
   }
 
   test("line number - randomSplit") {
     session.sql("select * from values(1),(2),(3) as t(num)").randomSplit(Array(0.5, 0.5))
-    checkSpan("snow.snowpark.DataFrame", "randomSplit", "OpenTelemetrySuite.scala", 17, "")
+    checkSpan("snow.snowpark.DataFrame", "randomSplit", "")
   }
 
   test("line number - first") {
     val df = session.sql("select * from values(1),(2),(3) as t(num)")
     df.first()
-    checkSpan("snow.snowpark.DataFrame", "first", "OpenTelemetrySuite.scala", 23, "")
+    checkSpan("snow.snowpark.DataFrame", "first", "")
     df.first(2)
-    checkSpan("snow.snowpark.DataFrame", "first", "OpenTelemetrySuite.scala", 25, "")
+    checkSpan("snow.snowpark.DataFrame", "first", "")
   }
 
   test("line number - cacheResult") {
     val df = session.sql("select * from values(1),(2),(3) as t(num)")
     df.cacheResult()
-    checkSpan("snow.snowpark.DataFrame", "cacheResult", "OpenTelemetrySuite.scala", 31, "")
+    checkSpan("snow.snowpark.DataFrame", "cacheResult", "")
  }
 
   test("line number - toLocalIterator") {
     val df = session.sql("select * from values(1),(2),(3) as t(num)")
     df.toLocalIterator
-    checkSpan("snow.snowpark.DataFrame", "toLocalIterator", "OpenTelemetrySuite.scala", 37, "")
+    checkSpan("snow.snowpark.DataFrame", "toLocalIterator", "")
   }
 
   test("line number - count") {
     val df = session.sql("select * from values(1),(2),(3) as t(num)")
     df.count()
-    checkSpan("snow.snowpark.DataFrame", "count", "OpenTelemetrySuite.scala", 43, "")
+    checkSpan("snow.snowpark.DataFrame", "count", "")
   }
 
   test("line number - show") {
     val df = session.sql("select * from values(1),(2),(3) as t(num)")
     df.show()
-    checkSpan("snow.snowpark.DataFrame", "show", "OpenTelemetrySuite.scala", 49, "")
+    checkSpan("snow.snowpark.DataFrame", "show", "")
     df.show(1)
-    checkSpan("snow.snowpark.DataFrame", "show", "OpenTelemetrySuite.scala", 51, "")
+    checkSpan("snow.snowpark.DataFrame", "show", "")
     df.show(1, 10)
-    checkSpan("snow.snowpark.DataFrame", "show", "OpenTelemetrySuite.scala", 53, "")
+    checkSpan("snow.snowpark.DataFrame", "show", "")
   }
 
   test("line number - createOrReplaceView") {
@@ -59,23 +59,13 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
     val name = randomName()
     try {
       df.createOrReplaceView(name)
-      checkSpan(
-        "snow.snowpark.DataFrame",
-        "createOrReplaceView",
-        "OpenTelemetrySuite.scala",
-        61,
-        "")
+      checkSpan("snow.snowpark.DataFrame", "createOrReplaceView", "")
     } finally {
       dropView(name)
     }
     try {
       df.createOrReplaceView(Seq(name))
-      checkSpan(
-        "snow.snowpark.DataFrame",
-        "createOrReplaceView",
-        "OpenTelemetrySuite.scala",
-        72,
-        "")
+      checkSpan("snow.snowpark.DataFrame", "createOrReplaceView", "")
     } finally {
       dropView(name)
     }
@@ -84,12 +74,7 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
       val list: java.util.List[String] = new util.ArrayList[String](1)
       list.add(name)
       df.createOrReplaceView(list)
-      checkSpan(
-        "snow.snowpark.DataFrame",
-        "createOrReplaceView",
-        "OpenTelemetrySuite.scala",
-        86,
-        "")
+      checkSpan("snow.snowpark.DataFrame", "createOrReplaceView", "")
     } finally {
       dropView(name)
     }
@@ -100,23 +85,13 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
     val name = randomName()
     try {
       df.createOrReplaceTempView(name)
-      checkSpan(
-        "snow.snowpark.DataFrame",
-        "createOrReplaceTempView",
-        "OpenTelemetrySuite.scala",
-        102,
-        "")
+      checkSpan("snow.snowpark.DataFrame", "createOrReplaceTempView", "")
     } finally {
       dropView(name)
     }
     try {
       df.createOrReplaceTempView(Seq(name))
-      checkSpan(
-        "snow.snowpark.DataFrame",
-        "createOrReplaceTempView",
-        "OpenTelemetrySuite.scala",
-        113,
-        "")
+      checkSpan("snow.snowpark.DataFrame", "createOrReplaceTempView", "")
     } finally {
       dropView(name)
     }
@@ -125,12 +100,7 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
       val list: java.util.List[String] = new util.ArrayList[String](1)
       list.add(name)
       df.createOrReplaceTempView(list)
-      checkSpan(
-        "snow.snowpark.DataFrame",
-        "createOrReplaceTempView",
-        "OpenTelemetrySuite.scala",
-        127,
-        "")
+      checkSpan("snow.snowpark.DataFrame", "createOrReplaceTempView", "")
     } finally {
       dropView(name)
     }
@@ -139,80 +109,60 @@
   test("line number - HasCachedResult") {
     val df = session.sql("select * from values(1),(2),(3) as t(num)")
     val cached = df.cacheResult()
-    checkSpan("snow.snowpark.DataFrame", "cacheResult", "OpenTelemetrySuite.scala", 141, "")
+    checkSpan("snow.snowpark.DataFrame", "cacheResult", "")
     cached.cacheResult()
-    checkSpan("snow.snowpark.DataFrame", "cacheResult", "OpenTelemetrySuite.scala", 143, "")
+    checkSpan("snow.snowpark.DataFrame", "cacheResult", "")
   }
 
   test("line number - DataFrameAsyncActor") {
     val df = session.sql("select * from values(1),(2),(3) as t(num)")
     df.async.count()
-    checkSpan("snow.snowpark.DataFrameAsyncActor", "count", "OpenTelemetrySuite.scala", 149, "")
+    checkSpan("snow.snowpark.DataFrameAsyncActor", "count", "")
     df.async.collect()
-    checkSpan("snow.snowpark.DataFrameAsyncActor", "collect", "OpenTelemetrySuite.scala", 151, "")
+    checkSpan("snow.snowpark.DataFrameAsyncActor", "collect", "")
     df.async.toLocalIterator()
-    checkSpan(
-      "snow.snowpark.DataFrameAsyncActor",
-      "toLocalIterator",
-      "OpenTelemetrySuite.scala",
-      153,
-      "")
+    checkSpan("snow.snowpark.DataFrameAsyncActor", "toLocalIterator", "")
   }
 
   test("line number - DataFrameStatFunctions - corr") {
     import session.implicits._
     val df = Seq((0.1, 0.5), (0.2, 0.6), (0.3, 0.7)).toDF("a", "b")
     df.stat.corr("a", "b")
-    checkSpan("snow.snowpark.DataFrameStatFunctions", "corr", "OpenTelemetrySuite.scala", 165, "")
+    checkSpan("snow.snowpark.DataFrameStatFunctions", "corr", "")
   }
 
   test("line number - DataFrameStatFunctions - cov") {
     import session.implicits._
     val df = Seq((0.1, 0.5), (0.2, 0.6), (0.3, 0.7)).toDF("a", "b")
     df.stat.cov("a", "b")
-    checkSpan("snow.snowpark.DataFrameStatFunctions", "cov", "OpenTelemetrySuite.scala", 172, "")
+    checkSpan("snow.snowpark.DataFrameStatFunctions", "cov", "")
   }
 
   test("line number - DataFrameStatFunctions - approxQuantile") {
     import session.implicits._
     val df = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 0).toDF("a")
     df.stat.approxQuantile("a", Array(0, 0.1, 0.4, 0.6, 1))
-    checkSpan(
-      "snow.snowpark.DataFrameStatFunctions",
-      "approxQuantile",
-      "OpenTelemetrySuite.scala",
-      179,
-      "")
+    checkSpan("snow.snowpark.DataFrameStatFunctions", "approxQuantile", "")
   }
 
   test("line number - DataFrameStatFunctions - approxQuantile 2") {
     import session.implicits._
     val df = Seq((0.1, 0.5), (0.2, 0.6), (0.3, 0.7)).toDF("a", "b")
     df.stat.approxQuantile(Array("a", "b"), Array(0, 0.1, 0.6))
-    checkSpan(
-      "snow.snowpark.DataFrameStatFunctions",
-      "approxQuantile",
-      "OpenTelemetrySuite.scala",
-      191,
-      "")
+    checkSpan("snow.snowpark.DataFrameStatFunctions", "approxQuantile", "")
   }
 
   test("line number - DataFrameStatFunctions - crosstab") {
     import session.implicits._
     val df = Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2), (3, 3)).toDF("key", "value")
     df.stat.crosstab("key", "value")
-    checkSpan(
-      "snow.snowpark.DataFrameStatFunctions",
-      "crosstab",
-      "OpenTelemetrySuite.scala",
-      203,
-      "")
+    checkSpan("snow.snowpark.DataFrameStatFunctions", "crosstab", "")
   }
 
   test("line number - DataFrameWriter - csv") {
     val df = session.sql("select * from values(1),(2),(3) as t(num)")
     df.write.csv(s"@$stageName1/csv1")
-    checkSpan("snow.snowpark.DataFrameWriter", "csv", "OpenTelemetrySuite.scala", 214, "")
+    checkSpan("snow.snowpark.DataFrameWriter", "csv", "")
   }
 
   test("line number - DataFrameWriter - json") {
@@ -220,13 +170,13 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
     val df = Seq((1, 1.1, "a"), (2, 2.2, "b")).toDF("a", "b", "c")
     val df2 = df.select(array_construct(df.schema.names.map(df(_)): _*))
     df2.write.option("compression", "none").json(s"@$stageName1/json1")
-    checkSpan("snow.snowpark.DataFrameWriter", "json", "OpenTelemetrySuite.scala", 222, "")
+    checkSpan("snow.snowpark.DataFrameWriter", "json", "")
   }
 
   test("line number - DataFrameWriter - parquet") {
     val df = session.sql("select * from values(1),(2),(3) as t(num)")
     df.write.parquet(s"@$stageName1/parquet1")
-    checkSpan("snow.snowpark.DataFrameWriter", "parquet", "OpenTelemetrySuite.scala", 228, "")
+    checkSpan("snow.snowpark.DataFrameWriter", "parquet", "")
   }
 
   test("line number - DataFrameWriter - saveAsTable") {
@@ -234,23 +184,13 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
     val tableName = randomName()
     try {
       df.write.saveAsTable(tableName)
-      checkSpan(
-        "snow.snowpark.DataFrameWriter",
-        "saveAsTable",
-        "OpenTelemetrySuite.scala",
-        236,
-        "")
+      checkSpan("snow.snowpark.DataFrameWriter", "saveAsTable", "")
     } finally {
       dropTable(tableName)
     }
     try {
       df.write.saveAsTable(Seq(tableName))
-      checkSpan(
-        "snow.snowpark.DataFrameWriter",
-        "saveAsTable",
-        "OpenTelemetrySuite.scala",
-        247,
-        "")
+      checkSpan("snow.snowpark.DataFrameWriter", "saveAsTable", "")
     } finally {
       dropTable(tableName)
     }
@@ -258,12 +198,7 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
       val list = new util.ArrayList[String](1)
       list.add(tableName)
       df.write.saveAsTable(tableName)
-      checkSpan(
-        "snow.snowpark.DataFrameWriter",
-        "saveAsTable",
-        "OpenTelemetrySuite.scala",
-        260,
-        "")
+      checkSpan("snow.snowpark.DataFrameWriter", "saveAsTable", "")
     } finally {
       dropTable(tableName)
     }
 
   test("line number - DataFrameWriterAsyncActor - saveAsTable") {
@@ -274,23 +209,13 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
     val tableName = randomName()
     try {
       df.write.async.saveAsTable(tableName).getResult()
-      checkSpan(
-        "snow.snowpark.DataFrameWriterAsyncActor",
-        "saveAsTable",
-        "OpenTelemetrySuite.scala",
-        276,
-        "")
+      checkSpan("snow.snowpark.DataFrameWriterAsyncActor", "saveAsTable", "")
     } finally {
       dropTable(tableName)
     }
     try {
       df.write.async.saveAsTable(Seq(tableName)).getResult()
-      checkSpan(
-        "snow.snowpark.DataFrameWriterAsyncActor",
-        "saveAsTable",
-        "OpenTelemetrySuite.scala",
-        287,
-        "")
+      checkSpan("snow.snowpark.DataFrameWriterAsyncActor", "saveAsTable", "")
     } finally {
       dropTable(tableName)
     }
@@ -298,12 +223,7 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
       val list = new util.ArrayList[String](1)
       list.add(tableName)
       df.write.async.saveAsTable(tableName).getResult()
-      checkSpan(
-        "snow.snowpark.DataFrameWriterAsyncActor",
-        "saveAsTable",
-        "OpenTelemetrySuite.scala",
-        300,
-        "")
+      checkSpan("snow.snowpark.DataFrameWriterAsyncActor", "saveAsTable", "")
     } finally {
       dropTable(tableName)
     }
   }
 
   test("line number - DataFrameWriterAsyncActor - csv") {
     val df = session.sql("select * from values(1),(2),(3) as t(num)")
     df.write.async.csv(s"@$stageName1/csv2").getResult()
-    checkSpan(
-      "snow.snowpark.DataFrameWriterAsyncActor",
-      "csv",
-      "OpenTelemetrySuite.scala",
-      314,
-      "")
+    checkSpan("snow.snowpark.DataFrameWriterAsyncActor", "csv", "")
   }
 
   test("line number - DataFrameWriterAsyncActor - json") {
@@ -325,23 +240,44 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
     val df = Seq((1, 1.1, "a"), (2, 2.2, "b")).toDF("a", "b", "c")
     val df2 = df.select(array_construct(df.schema.names.map(df(_)): _*))
     df2.write.option("compression", "none").async.json(s"@$stageName1/json2")
-    checkSpan(
-      "snow.snowpark.DataFrameWriterAsyncActor",
-      "json",
-      "OpenTelemetrySuite.scala",
-      327,
-      "")
+    checkSpan("snow.snowpark.DataFrameWriterAsyncActor", "json", "")
   }
 
   test("line number - DataFrameWriterAsyncActor - parquet") {
     val df = session.sql("select * from values(1),(2),(3) as t(num)")
     df.write.async.parquet(s"@$stageName1/parquet2")
-    checkSpan(
-      "snow.snowpark.DataFrameWriterAsyncActor",
-      "parquet",
-      "OpenTelemetrySuite.scala",
-      338,
-      "")
+    checkSpan("snow.snowpark.DataFrameWriterAsyncActor", "parquet", "")
+  }
+
+  test("line number - CopyableDataFrame") {
+    val stageName = randomName()
+    val tableName = randomName()
+    val userSchema: StructType = StructType(
+      Seq(
+        StructField("a", IntegerType),
+        StructField("b", StringType),
+        StructField("c", DoubleType)))
+    try {
+      createStage(stageName)
+      uploadFileToStage(stageName, testFileCsv, compress = false)
+      createTable(tableName, "a Int, b String, c Double")
+      val testFileOnStage = s"@$stageName/$testFileCsv"
+      testSpanExporter.reset()
+      val df = session.read.schema(userSchema).csv(testFileOnStage)
+      df.copyInto(tableName)
+      checkSpan("snow.snowpark.CopyableDataFrame", "copyInto", "")
+      df.copyInto(tableName, Seq(col("$1"), col("$2"), col("$3")))
+      checkSpan("snow.snowpark.CopyableDataFrame", "copyInto", "")
+      df.copyInto(tableName, Seq(col("$1"), col("$2"), col("$3")), Map("FORCE" -> "TRUE"))
+      checkSpan("snow.snowpark.CopyableDataFrame", "copyInto", "")
+      df.copyInto(tableName, Seq("a", "b", "c"), Seq(col("$1"), col("$2"), col("$3")), Map.empty)
+      checkSpan("snow.snowpark.CopyableDataFrame", "copyInto", "")
+      df.clone()
+      checkSpan("snow.snowpark.CopyableDataFrame", "clone", "")
+    } finally {
+      dropStage(stageName)
+      dropTable(tableName)
+    }
   }
 
   test("OpenTelemetry.emit") {
@@ -366,4 +302,15 @@
   }
 
   private val stageName1 = randomName()
+
+  def checkSpan(className: String, funcName: String, methodChain: String): Unit = {
+    val stack = Thread.currentThread().getStackTrace
+    val file = stack(2) // this file
+    checkSpan(
+      className,
+      funcName,
+      "OpenTelemetrySuite.scala",
+      file.getLineNumber - 1,
+      methodChain)
+  }
 }
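[Editor's note, not part of the patch series] The test-side theme of patch 1 is the new three-argument `checkSpan` overload added at the bottom of `OpenTelemetrySuite.scala`: instead of hard-coding each expected line number, it reads the caller's location off the current stack (`stack(2)`, minus 1, because the assertion sits on the line right after the action under test). A minimal, self-contained sketch of that stack-walking idea; the object and method names here are illustrative, not Snowpark API:

    object CallerLocation {
      // Frame 0 is Thread.getStackTrace itself, frame 1 is callerLine,
      // frame 2 is whoever called us -- the same indexing assumption the
      // suite's checkSpan helper makes.
      def callerLine(): Int =
        Thread.currentThread().getStackTrace()(2).getLineNumber

      def main(args: Array[String]): Unit = {
        println(s"called from line ${callerLine()}") // prints this line's number
      }
    }

This is why the bulk of the diff above is deletions: once the helper derives the location itself, every hard-coded `"OpenTelemetrySuite.scala", NN` pair can be dropped, and unrelated edits no longer shift the expectations.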
From 798e35b9eea629623e6106f9bb7239c0ae48f6e5 Mon Sep 17 00:00:00 2001
From: Bing Li
Date: Wed, 10 Jul 2024 16:16:59 -0700
Subject: [PATCH 2/9] CopyableDataFrameAsyncActor

---
 .../snowpark/CopyableDataFrame.scala          | 19 +++++++-----
 .../snowpark_test/OpenTelemetrySuite.scala    | 30 +++++++++++++++++++
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala b/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala
index 39382c5f..6e8eda5f 100644
--- a/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala
+++ b/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala
@@ -281,7 +281,7 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
    * and get the results.
    * @since 0.11.0
    */
-  def copyInto(tableName: String): TypedAsyncJob[Unit] = {
+  def copyInto(tableName: String): TypedAsyncJob[Unit] = action("copyInto") {
     val df = cdf.getCopyDataFrame(tableName)
     cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
   }
@@ -298,10 +298,11 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
    * @since 0.11.0
    */
   // scalastyle:on line.size.limit
-  def copyInto(tableName: String, transformations: Seq[Column]): TypedAsyncJob[Unit] = {
-    val df = cdf.getCopyDataFrame(tableName, Seq.empty, transformations, Map.empty)
-    cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
-  }
+  def copyInto(tableName: String, transformations: Seq[Column]): TypedAsyncJob[Unit] =
+    action("copyInto") {
+      val df = cdf.getCopyDataFrame(tableName, Seq.empty, transformations, Map.empty)
+      cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
+    }
 
   // scalastyle:off line.size.limit
   /**
@@ -322,7 +323,7 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
   def copyInto(
       tableName: String,
       transformations: Seq[Column],
-      options: Map[String, Any]): TypedAsyncJob[Unit] = {
+      options: Map[String, Any]): TypedAsyncJob[Unit] = action("copyInto") {
     val df = cdf.getCopyDataFrame(tableName, Seq.empty, transformations, options)
     cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
   }
@@ -348,9 +349,13 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
       tableName: String,
       targetColumnNames: Seq[String],
       transformations: Seq[Column],
-      options: Map[String, Any]): TypedAsyncJob[Unit] = {
+      options: Map[String, Any]): TypedAsyncJob[Unit] = action("copyInto") {
     val df = cdf.getCopyDataFrame(tableName, targetColumnNames, transformations, options)
     cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
   }
 
+  @inline override protected def action[T](funcName: String)(func: => T): T = {
+    val isScala: Boolean = cdf.session.conn.isScalaAPI
+    OpenTelemetry.action("CopyableDataFrameAsyncActor", funcName, isScala)(func)
+  }
 }
diff --git a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
index 6ba90a61..06597841 100644
--- a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
+++ b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
@@ -280,6 +280,36 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
     }
   }
 
+  test("line number - CopyableDataFrameAsyncActor") {
+    val stageName = randomName()
+    val tableName = randomName()
+    val userSchema: StructType = StructType(
+      Seq(
+        StructField("a", IntegerType),
+        StructField("b", StringType),
+        StructField("c", DoubleType)))
+    try {
+      createStage(stageName)
+      uploadFileToStage(stageName, testFileCsv, compress = false)
+      createTable(tableName, "a Int, b String, c Double")
+      val testFileOnStage = s"@$stageName/$testFileCsv"
+      testSpanExporter.reset()
+      val df = session.read.schema(userSchema).csv(testFileOnStage)
+      df.async.copyInto(tableName).getResult()
+      checkSpan("snow.snowpark.CopyableDataFrameAsyncActor", "copyInto", "")
+      df.async.copyInto(tableName, Seq(col("$1"), col("$2"), col("$3"))).getResult()
+      checkSpan("snow.snowpark.CopyableDataFrameAsyncActor", "copyInto", "")
+      val seq1 = Seq(col("$1"), col("$2"), col("$3"))
+      df.async.copyInto(tableName, seq1, Map("FORCE" -> "TRUE")).getResult()
+      checkSpan("snow.snowpark.CopyableDataFrameAsyncActor", "copyInto", "")
+      df.async.copyInto(tableName, Seq("a", "b", "c"), seq1, Map.empty).getResult()
+      checkSpan("snow.snowpark.CopyableDataFrameAsyncActor", "copyInto", "")
+    } finally {
+      dropStage(stageName)
+      dropTable(tableName)
+    }
+  }
+
   test("OpenTelemetry.emit") {
     OpenTelemetry.emit("ClassA", "functionB", "fileC", 123, "chainD")
     checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD")

From 18489d1a3683e876dff3525fa06535db93371bb6 Mon Sep 17 00:00:00 2001
From: Bing Li
Date: Wed, 10 Jul 2024 16:33:51 -0700
Subject: [PATCH 3/9] update

---
 .../com/snowflake/snowpark/Updatable.scala    | 20 ++++++++-----
 .../snowpark/OpenTelemetryEnabled.scala       |  2 +-
 .../snowpark_test/OpenTelemetrySuite.scala    | 29 ++++++++++++++++++-
 3 files changed, 42 insertions(+), 9 deletions(-)

diff --git a/src/main/scala/com/snowflake/snowpark/Updatable.scala b/src/main/scala/com/snowflake/snowpark/Updatable.scala
index 2ab3cfc9..d6866fd6 100644
--- a/src/main/scala/com/snowflake/snowpark/Updatable.scala
+++ b/src/main/scala/com/snowflake/snowpark/Updatable.scala
@@ -1,6 +1,6 @@
 package com.snowflake.snowpark
 
-import com.snowflake.snowpark.internal.Logging
+import com.snowflake.snowpark.internal.{Logging, OpenTelemetry}
 import com.snowflake.snowpark.internal.analyzer._
 
 import scala.reflect.ClassTag
@@ -76,7 +76,7 @@ class Updatable private[snowpark] (
    * @since 0.7.0
    * @return [[UpdateResult]]
   */
-  def update(assignments: Map[Column, Column]): UpdateResult = {
+  def update(assignments: Map[Column, Column]): UpdateResult = action("update") {
     val newDf = getUpdateDataFrameWithColumn(assignments, None, None)
     Updatable.getUpdateResult(newDf.collect())
   }
@@ -102,7 +102,7 @@ class Updatable private[snowpark] (
    * @since 0.7.0
    * @return [[UpdateResult]]
    */
-  def update[T: ClassTag](assignments: Map[String, Column]): UpdateResult = {
+  def update[T: ClassTag](assignments: Map[String, Column]): UpdateResult = action("update") {
     val newDf = getUpdateDataFrameWithString(assignments, None, None)
     Updatable.getUpdateResult(newDf.collect())
   }
@@ -123,7 +123,7 @@ class Updatable private[snowpark] (
    * @since 0.7.0
    * @return [[UpdateResult]]
    */
-  def update(assignments: Map[Column, Column], condition: Column): UpdateResult = {
+  def update(assignments: Map[Column, Column], condition: Column): UpdateResult = action("update") {
     val newDf = getUpdateDataFrameWithColumn(assignments, Some(condition), None)
     Updatable.getUpdateResult(newDf.collect())
   }
@@ -144,7 +144,8 @@ class Updatable private[snowpark] (
    * @since 0.7.0
    * @return [[UpdateResult]]
    */
-  def update[T: ClassTag](assignments: Map[String, Column], condition: Column): UpdateResult = {
+  def update[T: ClassTag](assignments: Map[String, Column], condition: Column): UpdateResult =
+    action("update") {
     val newDf = getUpdateDataFrameWithString(assignments, Some(condition), None)
     Updatable.getUpdateResult(newDf.collect())
   }
@@ -168,7 +169,7 @@ class Updatable private[snowpark] (
   def update(
       assignments: Map[Column, Column],
       condition: Column,
-      sourceData: DataFrame): UpdateResult = {
+      sourceData: DataFrame): UpdateResult = action("update") {
     val newDf = getUpdateDataFrameWithColumn(assignments, Some(condition), Some(sourceData))
     Updatable.getUpdateResult(newDf.collect())
   }
@@ -192,7 +193,7 @@ class Updatable private[snowpark] (
   def update[T: ClassTag](
       assignments: Map[String, Column],
       condition: Column,
-      sourceData: DataFrame): UpdateResult = {
+      sourceData: DataFrame): UpdateResult = action("update") {
     val newDf = getUpdateDataFrameWithString(assignments, Some(condition), Some(sourceData))
     Updatable.getUpdateResult(newDf.collect())
   }
@@ -347,6 +348,11 @@ class Updatable private[snowpark] (
    */
   override def async: UpdatableAsyncActor = new UpdatableAsyncActor(this)
 
+  @inline override protected def action[T](funcName: String)(func: => T): T = {
+    val isScala: Boolean = this.session.conn.isScalaAPI
+    OpenTelemetry.action("Updatable", funcName, isScala)(func)
+  }
+
 }
 
 /**
diff --git a/src/test/scala/com/snowflake/snowpark/OpenTelemetryEnabled.scala b/src/test/scala/com/snowflake/snowpark/OpenTelemetryEnabled.scala
index 3bc34165..949014ce 100644
--- a/src/test/scala/com/snowflake/snowpark/OpenTelemetryEnabled.scala
+++ b/src/test/scala/com/snowflake/snowpark/OpenTelemetryEnabled.scala
@@ -11,7 +11,7 @@ import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor
 import io.opentelemetry.sdk.trace.data.SpanData
 import io.opentelemetry.sdk.trace.internal.data.ExceptionEventData
 
-trait OpenTelemetryEnabled extends SNTestBase {
+trait OpenTelemetryEnabled extends TestData {
   lazy protected val testSpanExporter: InMemorySpanExporter = InMemorySpanExporter.create()
 
   override def beforeAll: Unit = {
diff --git a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
index 06597841..feac0a76 100644
--- a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
+++ b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
@@ -1,6 +1,6 @@
 package com.snowflake.snowpark_test
 
-import com.snowflake.snowpark.OpenTelemetryEnabled
+import com.snowflake.snowpark.{OpenTelemetryEnabled, SaveMode, UpdateResult}
 import com.snowflake.snowpark.internal.OpenTelemetry
 import com.snowflake.snowpark.functions._
 import com.snowflake.snowpark.types.{DoubleType, IntegerType, StringType, StructField, StructType}
@@ -310,6 +310,33 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
     }
   }
 
+  test("line number - updatable") {
+    val tableName = randomName()
+    val tableName2 = randomName()
+    try {
+      testData2.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
+      val updatable = session.table(tableName)
+      upperCaseData.write.mode(SaveMode.Overwrite).saveAsTable(tableName2)
+      val t2 = session.table(tableName2)
+      testSpanExporter.reset()
+      updatable.update(Map(col("a") -> lit(1), col("b") -> lit(0)))
+      checkSpan("snow.snowpark.Updatable", "update", "")
+      updatable.update(Map("b" -> (col("a") + col("b"))))
+      checkSpan("snow.snowpark.Updatable", "update", "")
+      updatable.update(Map(col("b") -> lit(0)), col("a") === 1)
+      checkSpan("snow.snowpark.Updatable", "update", "")
+      updatable.update(Map("b" -> lit(0)), col("a") === 1)
+      checkSpan("snow.snowpark.Updatable", "update", "")
+      t2.update(Map(col("n") -> lit(0)), updatable("a") === t2("n"), updatable)
+      checkSpan("snow.snowpark.Updatable", "update", "")
+      t2.update(Map("n" -> lit(0)), updatable("a") === t2("n"), updatable)
+      checkSpan("snow.snowpark.Updatable", "update", "")
+    } finally {
+      dropTable(tableName)
+      dropTable(tableName2)
+    }
+  }
+
   test("OpenTelemetry.emit") {
     OpenTelemetry.emit("ClassA", "functionB", "fileC", 123, "chainD")
     checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD")
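[Editor's note, not part of the patch series] Patches 1-3 apply the same mechanical recipe to every public action: the method body moves inside `action("<name>") { ... }`, whose `func: => T` is a by-name parameter, so the real work only runs once `OpenTelemetry.action` has decided how to record the span. A stripped-down sketch of the pattern under that reading; `MiniTelemetry` and `Table` are stand-ins, not the real Snowpark internals:

    object MiniTelemetry {
      // By-name parameter: `func` is not evaluated at the call site but
      // here, so the "span" brackets the actual work.
      def action[T](className: String, funcName: String)(func: => T): T = {
        println(s"open span $className.$funcName")
        try func
        finally println(s"close span $className.$funcName")
      }
    }

    class Table {
      def update(): Int = action("update") { 42 /* run the query */ }

      @inline protected def action[T](funcName: String)(func: => T): T =
        MiniTelemetry.action("Table", funcName)(func)
    }

    object Demo extends App {
      println(new Table().update()) // open span, close span, then 42
    }

Keeping the per-class `action` helper `protected` and `@inline` means each class reports its own name while the wrapper adds no extra call frame of its own.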
From 176d9847ae3883db06168f49fe5c04713a3c1b1a Mon Sep 17 00:00:00 2001
From: Bing Li
Date: Thu, 11 Jul 2024 14:07:24 -0700
Subject: [PATCH 4/9] UpdatableAsyncActor

---
 .../com/snowflake/snowpark/Updatable.scala    | 82 +++++++++++--------
 .../snowpark_test/OpenTelemetrySuite.scala    | 43 ++++++++++
 2 files changed, 90 insertions(+), 35 deletions(-)

diff --git a/src/main/scala/com/snowflake/snowpark/Updatable.scala b/src/main/scala/com/snowflake/snowpark/Updatable.scala
index d6866fd6..cdc47535 100644
--- a/src/main/scala/com/snowflake/snowpark/Updatable.scala
+++ b/src/main/scala/com/snowflake/snowpark/Updatable.scala
@@ -123,10 +123,11 @@ class Updatable private[snowpark] (
    * @since 0.7.0
    * @return [[UpdateResult]]
    */
-  def update(assignments: Map[Column, Column], condition: Column): UpdateResult = action("update") {
-    val newDf = getUpdateDataFrameWithColumn(assignments, Some(condition), None)
-    Updatable.getUpdateResult(newDf.collect())
-  }
+  def update(assignments: Map[Column, Column], condition: Column): UpdateResult =
+    action("update") {
+      val newDf = getUpdateDataFrameWithColumn(assignments, Some(condition), None)
+      Updatable.getUpdateResult(newDf.collect())
+    }
 
   /**
    * Updates all rows in the updatable that satisfy specified condition with specified assignments
@@ -146,9 +147,9 @@ class Updatable private[snowpark] (
    */
   def update[T: ClassTag](assignments: Map[String, Column], condition: Column): UpdateResult =
     action("update") {
-    val newDf = getUpdateDataFrameWithString(assignments, Some(condition), None)
-    Updatable.getUpdateResult(newDf.collect())
-  }
+      val newDf = getUpdateDataFrameWithString(assignments, Some(condition), None)
+      Updatable.getUpdateResult(newDf.collect())
+    }
 
   /**
    * Updates all rows in the updatable that satisfy specified condition where condition includes
@@ -235,7 +236,7 @@ class Updatable private[snowpark] (
    * @since 0.7.0
    * @return [[DeleteResult]]
    */
-  def delete(): DeleteResult = {
+  def delete(): DeleteResult = action("delete") {
     val newDf = getDeleteDataFrame(None, None)
     Updatable.getDeleteResult(newDf.collect())
   }
@@ -255,7 +256,7 @@ class Updatable private[snowpark] (
    * @since 0.7.0
    * @return [[DeleteResult]]
    */
-  def delete(condition: Column): DeleteResult = {
+  def delete(condition: Column): DeleteResult = action("delete") {
     val newDf = getDeleteDataFrame(Some(condition), None)
     Updatable.getDeleteResult(newDf.collect())
   }
@@ -276,7 +277,7 @@ class Updatable private[snowpark] (
    * @since 0.7.0
    * @return [[DeleteResult]]
    */
-  def delete(condition: Column, sourceData: DataFrame): DeleteResult = {
+  def delete(condition: Column, sourceData: DataFrame): DeleteResult = action("delete") {
     val newDf = getDeleteDataFrame(Some(condition), Some(sourceData))
     Updatable.getDeleteResult(newDf.collect())
   }
@@ -327,7 +328,9 @@ class Updatable private[snowpark] (
    * @since 0.10.0
    * @group basic
    */
-  override def clone: Updatable = new Updatable(tableName, session)
+  override def clone: Updatable = action("clone") {
+    new Updatable(tableName, session)
+  }
 
   /**
    * Returns an [[UpdatableAsyncActor]] object that can be used to execute
@@ -370,10 +373,11 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
    * and get the results.
    * @since 0.11.0
    */
-  def update(assignments: Map[Column, Column]): TypedAsyncJob[UpdateResult] = {
-    val newDf = updatable.getUpdateDataFrameWithColumn(assignments, None, None)
-    updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
-  }
+  def update(assignments: Map[Column, Column]): TypedAsyncJob[UpdateResult] =
+    action("update") {
+      val newDf = updatable.getUpdateDataFrameWithColumn(assignments, None, None)
+      updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
+    }
 
   /**
    * Executes `Updatable.update` asynchronously.
@@ -382,10 +386,11 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
    * and get the results.
    * @since 0.11.0
    */
-  def update[T: ClassTag](assignments: Map[String, Column]): TypedAsyncJob[UpdateResult] = {
-    val newDf = updatable.getUpdateDataFrameWithString(assignments, None, None)
-    updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
-  }
+  def update[T: ClassTag](assignments: Map[String, Column]): TypedAsyncJob[UpdateResult] =
+    action("update") {
+      val newDf = updatable.getUpdateDataFrameWithString(assignments, None, None)
+      updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
+    }
 
  /**
    * Executes `Updatable.update` asynchronously.
@@ -394,10 +399,11 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
    * and get the results.
    * @since 0.11.0
    */
-  def update(assignments: Map[Column, Column], condition: Column): TypedAsyncJob[UpdateResult] = {
-    val newDf = updatable.getUpdateDataFrameWithColumn(assignments, Some(condition), None)
-    updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
-  }
+  def update(assignments: Map[Column, Column], condition: Column): TypedAsyncJob[UpdateResult] =
+    action("update") {
+      val newDf = updatable.getUpdateDataFrameWithColumn(assignments, Some(condition), None)
+      updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
+    }
 
   /**
    * Executes `Updatable.update` asynchronously.
@@ -408,10 +414,11 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
    */
   def update[T: ClassTag](
       assignments: Map[String, Column],
-      condition: Column): TypedAsyncJob[UpdateResult] = {
-    val newDf = updatable.getUpdateDataFrameWithString(assignments, Some(condition), None)
-    updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
-  }
+      condition: Column): TypedAsyncJob[UpdateResult] =
+    action("update") {
+      val newDf = updatable.getUpdateDataFrameWithString(assignments, Some(condition), None)
+      updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
+    }
 
   /**
    * Executes `Updatable.update` asynchronously.
@@ -423,7 +430,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
   def update(
       assignments: Map[Column, Column],
       condition: Column,
-      sourceData: DataFrame): TypedAsyncJob[UpdateResult] = {
+      sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update") {
     val newDf = updatable.getUpdateDataFrameWithColumn(assignments, Some(condition), Some(sourceData))
     updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
@@ -439,7 +446,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
   def update[T: ClassTag](
       assignments: Map[String, Column],
       condition: Column,
-      sourceData: DataFrame): TypedAsyncJob[UpdateResult] = {
+      sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update") {
     val newDf = updatable.getUpdateDataFrameWithString(assignments, Some(condition), Some(sourceData))
     updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
@@ -452,7 +459,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
    * and get the results.
    * @since 0.11.0
    */
-  def delete(): TypedAsyncJob[DeleteResult] = {
+  def delete(): TypedAsyncJob[DeleteResult] = action("delete") {
     val newDf = updatable.getDeleteDataFrame(None, None)
     updatable.session.conn.executeAsync[DeleteResult](newDf.snowflakePlan)
   }
@@ -464,7 +471,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
    * and get the results.
    * @since 0.11.0
    */
-  def delete(condition: Column): TypedAsyncJob[DeleteResult] = {
+  def delete(condition: Column): TypedAsyncJob[DeleteResult] = action("delete") {
     val newDf = updatable.getDeleteDataFrame(Some(condition), None)
     updatable.session.conn.executeAsync[DeleteResult](newDf.snowflakePlan)
   }
@@ -476,9 +483,14 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
    * and get the results.
    * @since 0.11.0
    */
-  def delete(condition: Column, sourceData: DataFrame): TypedAsyncJob[DeleteResult] = {
-    val newDf = updatable.getDeleteDataFrame(Some(condition), Some(sourceData))
-    updatable.session.conn.executeAsync[DeleteResult](newDf.snowflakePlan)
-  }
+  def delete(condition: Column, sourceData: DataFrame): TypedAsyncJob[DeleteResult] =
+    action("delete") {
+      val newDf = updatable.getDeleteDataFrame(Some(condition), Some(sourceData))
+      updatable.session.conn.executeAsync[DeleteResult](newDf.snowflakePlan)
+    }
 
+  @inline override protected def action[T](funcName: String)(func: => T): T = {
+    val isScala: Boolean = updatable.session.conn.isScalaAPI
+    OpenTelemetry.action("UpdatableAsyncActor", funcName, isScala)(func)
+  }
 }
diff --git a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
index feac0a76..24d2c650 100644
--- a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
+++ b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
@@ -331,12 +331,55 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
       checkSpan("snow.snowpark.Updatable", "update", "")
       t2.update(Map("n" -> lit(0)), updatable("a") === t2("n"), updatable)
       checkSpan("snow.snowpark.Updatable", "update", "")
+      updatable.delete()
+      checkSpan("snow.snowpark.Updatable", "delete", "")
+      updatable.delete(col("a") === 1 && col("b") === 2)
+      checkSpan("snow.snowpark.Updatable", "delete", "")
+      t2.delete(updatable("a") === t2("n"), updatable)
+      checkSpan("snow.snowpark.Updatable", "delete", "")
+      updatable.clone
+      checkSpan("snow.snowpark.Updatable", "clone", "")
     } finally {
       dropTable(tableName)
       dropTable(tableName2)
     }
   }
 
+  test("line number - UpdatableAsyncActor") {
+    val tableName = randomName()
+    val tableName2 = randomName()
+    try {
+      testData2.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
+      val updatable = session.table(tableName)
+      upperCaseData.write.mode(SaveMode.Overwrite).saveAsTable(tableName2)
+      val t2 = session.table(tableName2)
+      testSpanExporter.reset()
+      updatable.async.update(Map(col("a") -> lit(1), col("b") -> lit(0))).getResult()
+      checkSpan("snow.snowpark.UpdatableAsyncActor", "update", "")
+      updatable.async.update(Map("b" -> (col("a") + col("b")))).getResult()
+      checkSpan("snow.snowpark.UpdatableAsyncActor", "update", "")
+      updatable.async.update(Map(col("b") -> lit(0)), col("a") === 1).getResult()
+      checkSpan("snow.snowpark.UpdatableAsyncActor", "update", "")
+      updatable.async.update(Map("b" -> lit(0)), col("a") === 1).getResult()
+      checkSpan("snow.snowpark.UpdatableAsyncActor", "update", "")
+      t2.async.update(Map(col("n") -> lit(0)), updatable("a") === t2("n"), updatable).getResult()
+      checkSpan("snow.snowpark.UpdatableAsyncActor", "update", "")
+      t2.async.update(Map("n" -> lit(0)), updatable("a") === t2("n"), updatable).getResult()
+      checkSpan("snow.snowpark.UpdatableAsyncActor", "update", "")
+      updatable.async.delete().getResult()
+      checkSpan("snow.snowpark.UpdatableAsyncActor", "delete", "")
+      updatable.async.delete(col("a") === 1 && col("b") === 2).getResult()
+      checkSpan("snow.snowpark.UpdatableAsyncActor", "delete", "")
+      t2.async.delete(updatable("a") === t2("n"), updatable).getResult()
+      checkSpan("snow.snowpark.UpdatableAsyncActor", "delete", "")
+    } finally {
+      dropTable(tableName)
+      dropTable(tableName2)
+    }
+  }
+
+  // mergeBuilder and async
+
   test("OpenTelemetry.emit") {
     OpenTelemetry.emit("ClassA", "functionB", "fileC", 123, "chainD")
     checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD")
From d8644d00ef73a909482b967fbdd60a6174218fd2 Mon Sep 17 00:00:00 2001
From: Bing Li
Date: Thu, 11 Jul 2024 14:21:13 -0700
Subject: [PATCH 5/9] builder

---
 .../com/snowflake/snowpark/MergeBuilder.scala | 16 +++++--
 .../snowpark_test/OpenTelemetrySuite.scala    | 42 ++++++++++++++++++-
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/src/main/scala/com/snowflake/snowpark/MergeBuilder.scala b/src/main/scala/com/snowflake/snowpark/MergeBuilder.scala
index ffb17484..08ca51d6 100644
--- a/src/main/scala/com/snowflake/snowpark/MergeBuilder.scala
+++ b/src/main/scala/com/snowflake/snowpark/MergeBuilder.scala
@@ -1,6 +1,6 @@
 package com.snowflake.snowpark
 
-import com.snowflake.snowpark.internal.ErrorMessage
+import com.snowflake.snowpark.internal.{ErrorMessage, OpenTelemetry}
 import com.snowflake.snowpark.internal.analyzer.{MergeExpression, TableMerge}
 
 /**
@@ -167,7 +167,7 @@ class MergeBuilder private[snowpark] (
    * @since 0.7.0
    * @return [[MergeResult]]
   */
-  def collect(): MergeResult = {
+  def collect(): MergeResult = action("collect") {
     val rows = getMergeDataFrame().collect()
     MergeBuilder.getMergeResult(rows, this)
   }
@@ -202,6 +202,11 @@ class MergeBuilder private[snowpark] (
    * @return A [[MergeBuilderAsyncActor]] object
    */
   def async: MergeBuilderAsyncActor = new MergeBuilderAsyncActor(this)
+
+  @inline protected def action[T](funcName: String)(func: => T): T = {
+    val isScala: Boolean = target.session.conn.isScalaAPI
+    OpenTelemetry.action("MergeBuilder", funcName, isScala)(func)
+  }
 }
 
 /**
@@ -218,9 +223,14 @@ class MergeBuilderAsyncActor private[snowpark] (mergeBuilder: MergeBuilder) {
    * and get the results.
    * @since 1.3.0
    */
-  def collect(): TypedAsyncJob[MergeResult] = {
+  def collect(): TypedAsyncJob[MergeResult] = action("collect") {
    val newDf = mergeBuilder.getMergeDataFrame()
     mergeBuilder.target.session.conn
       .executeAsync[MergeResult](newDf.snowflakePlan, Some(mergeBuilder))
   }
+
+  @inline protected def action[T](funcName: String)(func: => T): T = {
+    val isScala: Boolean = mergeBuilder.target.session.conn.isScalaAPI
+    OpenTelemetry.action("MergeBuilderAsyncActor", funcName, isScala)(func)
+  }
 }
diff --git a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
index 24d2c650..81c2161e 100644
--- a/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
+++ b/src/test/scala/com/snowflake/snowpark_test/OpenTelemetrySuite.scala
@@ -1,6 +1,6 @@
 package com.snowflake.snowpark_test
 
-import com.snowflake.snowpark.{OpenTelemetryEnabled, SaveMode, UpdateResult}
+import com.snowflake.snowpark.{MergeResult, OpenTelemetryEnabled, SaveMode, UpdateResult}
 import com.snowflake.snowpark.internal.OpenTelemetry
 import com.snowflake.snowpark.functions._
 import com.snowflake.snowpark.types.{DoubleType, IntegerType, StringType, StructField, StructType}
@@ -378,7 +378,45 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
     }
   }
 
+  test("line number - MergeBuilder") {
+    val tableName = randomName()
+    try {
+      import session.implicits._
+      val targetDF = Seq((10, "old"), (10, "too_old"), (11, "old")).toDF("id", "desc")
+      targetDF.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
+      val target = session.table(tableName)
+      val source = Seq((10, "new")).toDF("id", "desc")
+      testSpanExporter.reset()
+      val builder = target
+        .merge(source, target("id") === source("id"))
+        .whenMatched
+        .update(Map(target("desc") -> source("desc")))
+      builder.collect()
+      checkSpan("snow.snowpark.MergeBuilder", "collect", "")
+    } finally {
+      dropTable(tableName)
+    }
+  }
+
+  test("line number - MergeBuilderAsyncActor") {
+    val tableName = randomName()
+    try {
+      import session.implicits._
+      val targetDF = Seq((10, "old"), (10, "too_old"), (11, "old")).toDF("id", "desc")
+      targetDF.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
+      val target = session.table(tableName)
+      val source = Seq((10, "new")).toDF("id", "desc")
+      testSpanExporter.reset()
+      val builder = target
+        .merge(source, target("id") === source("id"))
+        .whenMatched
+        .update(Map(target("desc") -> source("desc")))
+      builder.async.collect().getResult()
+      checkSpan("snow.snowpark.MergeBuilderAsyncActor", "collect", "")
+    } finally {
+      dropTable(tableName)
+    }
+  }
 
   test("OpenTelemetry.emit") {
     OpenTelemetry.emit("ClassA", "functionB", "fileC", 123, "chainD")
     checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD")

From 1f3c38e9e2e9a011daaebc0bbb6a7f4d09facb02 Mon Sep 17 00:00:00 2001
From: Bing Li
Date: Thu, 11 Jul 2024 15:01:54 -0700
Subject: [PATCH 6/9] fix java

---
 .../snowpark_test/JavaOpenTelemetrySuite.java | 144 +++++------------
 1 file changed, 39 insertions(+), 105 deletions(-)

diff --git a/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java b/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java
index 0eaee6c1..50a1c723 100644
--- a/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java
+++ b/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java
@@ -9,40 +9,39 @@ public class JavaOpenTelemetrySuite extends JavaOpenTelemetryEnabled {
   public void cacheResult() {
     DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
     df.cacheResult();
-    checkSpan("snow.snowpark.DataFrame", "cacheResult", "JavaOpenTelemetrySuite.java", 11, null);
+    checkSpan("snow.snowpark.DataFrame", "cacheResult", null);
   }
 
   @Test
   public void count() {
     DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
     df.count();
-    checkSpan("snow.snowpark.DataFrame", "count", "JavaOpenTelemetrySuite.java", 18, null);
+    checkSpan("snow.snowpark.DataFrame", "count", null);
   }
 
   @Test
   public void collect() {
     DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
     df.collect();
-    checkSpan("snow.snowpark.DataFrame", "collect", "JavaOpenTelemetrySuite.java", 25, null);
+    checkSpan("snow.snowpark.DataFrame", "collect", null);
   }
 
   @Test
   public void toLocalIterator() {
     DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
     df.toLocalIterator();
-    checkSpan(
-        "snow.snowpark.DataFrame", "toLocalIterator", "JavaOpenTelemetrySuite.java", 32, null);
+    checkSpan("snow.snowpark.DataFrame", "toLocalIterator", null);
   }
 
   @Test
   public void show() {
     DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
     df.show();
-    checkSpan("snow.snowpark.DataFrame", "show", "JavaOpenTelemetrySuite.java", 40, null);
+    checkSpan("snow.snowpark.DataFrame", "show", null);
     df.show(1);
-    checkSpan("snow.snowpark.DataFrame", "show", "JavaOpenTelemetrySuite.java", 42, null);
+    checkSpan("snow.snowpark.DataFrame", "show", null);
     df.show(1, 100);
-    checkSpan("snow.snowpark.DataFrame", "show", "JavaOpenTelemetrySuite.java", 44, null);
+    checkSpan("snow.snowpark.DataFrame", "show", null);
   }
 
   @Test
@@ -51,20 +50,10 @@ public void createOrReplaceView() {
     String name = randomName();
     try {
       df.createOrReplaceView(name);
-      checkSpan(
-          "snow.snowpark.DataFrame",
-          "createOrReplaceView",
-          "JavaOpenTelemetrySuite.java",
-          53,
-          null);
+      checkSpan("snow.snowpark.DataFrame", "createOrReplaceView", null);
       String[] names = {name};
       df.createOrReplaceView(names);
-      checkSpan(
-          "snow.snowpark.DataFrame",
-          "createOrReplaceView",
-          "JavaOpenTelemetrySuite.java",
-          61,
-          null);
+      checkSpan("snow.snowpark.DataFrame", "createOrReplaceView", null);
     } finally {
       dropView(name);
     }
@@ -76,20 +65,10 @@ public void createOrReplaceTempView() {
     String name = randomName();
     try {
      df.createOrReplaceTempView(name);
-      checkSpan(
-          "snow.snowpark.DataFrame",
-          "createOrReplaceTempView",
-          "JavaOpenTelemetrySuite.java",
-          78,
-          null);
+      checkSpan("snow.snowpark.DataFrame", "createOrReplaceTempView", null);
       String[] names = {name};
       df.createOrReplaceTempView(names);
-      checkSpan(
-          "snow.snowpark.DataFrame",
-          "createOrReplaceTempView",
-          "JavaOpenTelemetrySuite.java",
-          86,
-          null);
+      checkSpan("snow.snowpark.DataFrame", "createOrReplaceTempView", null);
     } finally {
       dropView(name);
     }
@@ -99,9 +78,9 @@ public void first() {
     DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
     df.first();
-    checkSpan("snow.snowpark.DataFrame", "first", "JavaOpenTelemetrySuite.java", 101, null);
+    checkSpan("snow.snowpark.DataFrame", "first", null);
     df.first(1);
-    checkSpan("snow.snowpark.DataFrame", "first", "JavaOpenTelemetrySuite.java", 103, null);
+    checkSpan("snow.snowpark.DataFrame", "first", null);
   }
 
   @Test
@@ -109,41 +88,32 @@ public void randomSplit() {
     DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
     double[] weight = {0.5, 0.5};
     df.randomSplit(weight);
-    checkSpan("snow.snowpark.DataFrame", "randomSplit", "JavaOpenTelemetrySuite.java", 111, null);
+    checkSpan("snow.snowpark.DataFrame", "randomSplit", null);
   }
 
   @Test
   public void DataFrameAsyncActor() {
     DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
     df.async().collect();
-    checkSpan(
-        "snow.snowpark.DataFrameAsyncActor", "collect", "JavaOpenTelemetrySuite.java", 118, null);
+    checkSpan("snow.snowpark.DataFrameAsyncActor", "collect", null);
     df.async().toLocalIterator();
-    checkSpan(
-        "snow.snowpark.DataFrameAsyncActor",
-        "toLocalIterator",
-        "JavaOpenTelemetrySuite.java",
-        121,
-        null);
+    checkSpan("snow.snowpark.DataFrameAsyncActor", "toLocalIterator", null);
     df.async().count();
-    checkSpan(
-        "snow.snowpark.DataFrameAsyncActor", "count", "JavaOpenTelemetrySuite.java", 128, null);
+    checkSpan("snow.snowpark.DataFrameAsyncActor", "count", null);
   }
 
   @Test
   public void dataFrameStatFunctionsCorr() {
     DataFrame df = getSession().sql("select * from values(0.1, 0.5) as t(a, b)");
     df.stat().corr("a", "b");
-    checkSpan(
-        "snow.snowpark.DataFrameStatFunctions", "corr", "JavaOpenTelemetrySuite.java", 136, null);
+    checkSpan("snow.snowpark.DataFrameStatFunctions", "corr", null);
   }
 
   @Test
   public void dataFrameStatFunctionsCov() {
     DataFrame df = getSession().sql("select * from values(0.1, 0.5) as t(a, b)");
     df.stat().cov("a", "b");
-    checkSpan(
-        "snow.snowpark.DataFrameStatFunctions", "cov", "JavaOpenTelemetrySuite.java", 144, null);
+    checkSpan("snow.snowpark.DataFrameStatFunctions", "cov", null);
   }
 
   @Test
@@ -151,12 +121,7 @@ public void dataFrameStatFunctionsApproxQuantile() {
     DataFrame df = getSession().sql("select * from values(1), (2) as t(a)");
     double[] values = {0, 0.1, 0.4, 0.6, 1};
     df.stat().approxQuantile("a", values);
-    checkSpan(
-        "snow.snowpark.DataFrameStatFunctions",
-        "approxQuantile",
-        "JavaOpenTelemetrySuite.java",
-        153,
-        null);
+    checkSpan("snow.snowpark.DataFrameStatFunctions", "approxQuantile", null);
   }
 
   @Test
@@ -165,24 +130,14 @@ public void dataFrameStatFunctionsApproxQuantile2() {
     double[] values = {0, 0.1, 0.6};
     String[] cols = {"a", "b"};
     df.stat().approxQuantile(cols, values);
-    checkSpan(
-        "snow.snowpark.DataFrameStatFunctions",
-        "approxQuantile",
-        "JavaOpenTelemetrySuite.java",
-        167,
-        null);
+    checkSpan("snow.snowpark.DataFrameStatFunctions", "approxQuantile", null);
   }
 
   @Test
   public void dataFrameStatFunctionsCrosstab() {
     DataFrame df = getSession().sql("select * from values(0.1, 0.5) as t(a, b)");
     df.stat().crosstab("a", "b");
-    checkSpan(
-        "snow.snowpark.DataFrameStatFunctions",
-        "crosstab",
-        "JavaOpenTelemetrySuite.java",
-        179,
-        null);
+    checkSpan("snow.snowpark.DataFrameStatFunctions", "crosstab", null);
   }
 
   @Test
@@ -193,7 +148,7 @@ public void dataFrameWriterCsv() {
       testSpanExporter.reset();
       DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
       df.write().csv("@" + name + "/csv");
-      checkSpan("snow.snowpark.DataFrameWriter", "csv", "JavaOpenTelemetrySuite.java", 195, null);
+      checkSpan("snow.snowpark.DataFrameWriter", "csv", null);
     } finally {
       dropStage(name);
     }
@@ -210,7 +165,7 @@ public void dataFrameWriterJson() {
           df.select(
               com.snowflake.snowpark_java.Functions.array_construct(df.col("a"), df.col("b")));
       df2.write().json("@" + name + "/json");
-      checkSpan("snow.snowpark.DataFrameWriter", "json", "JavaOpenTelemetrySuite.java", 212, null);
+      checkSpan("snow.snowpark.DataFrameWriter", "json", null);
     } finally {
       dropStage(name);
     }
@@ -224,8 +179,7 @@ public void dataFrameWriterParquet() {
      testSpanExporter.reset();
       DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
       df.write().parquet("@" + name + "/parquet");
-      checkSpan(
-          "snow.snowpark.DataFrameWriter", "parquet", "JavaOpenTelemetrySuite.java", 226, null);
+      checkSpan("snow.snowpark.DataFrameWriter", "parquet", null);
     } finally {
       dropStage(name);
     }
@@ -237,8 +191,7 @@ public void dataFrameWriterSaveAsTable() {
     DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
     try {
       df.write().saveAsTable(name);
-      checkSpan(
-          "snow.snowpark.DataFrameWriter", "saveAsTable", "JavaOpenTelemetrySuite.java", 239, null);
+      checkSpan("snow.snowpark.DataFrameWriter", "saveAsTable", null);
     } finally {
       dropTable(name);
     }
@@ -246,8 +199,7 @@ public void dataFrameWriterSaveAsTable() {
       String[] names = {name};
       testSpanExporter.reset();
       df.write().saveAsTable(names);
-      checkSpan(
-          "snow.snowpark.DataFrameWriter", "saveAsTable", "JavaOpenTelemetrySuite.java", 248, null);
+      checkSpan("snow.snowpark.DataFrameWriter", "saveAsTable", null);
     } finally {
       dropTable(name);
     }
@@ -259,12 +211,7 @@ public void dataFrameWriterAsyncActorSaveAsTable() {
     DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
     try {
       df.write().async().saveAsTable(name).getResult();
-      checkSpan(
-          "snow.snowpark.DataFrameWriterAsyncActor",
-          "saveAsTable",
-          "JavaOpenTelemetrySuite.java",
-          261,
-          null);
+      checkSpan("snow.snowpark.DataFrameWriterAsyncActor", "saveAsTable", null);
     } finally {
       dropTable(name);
     }
@@ -272,12 +219,7 @@ public void dataFrameWriterAsyncActorSaveAsTable() {
       String[] names = {name};
       testSpanExporter.reset();
       df.write().async().saveAsTable(names).getResult();
-      checkSpan(
-          "snow.snowpark.DataFrameWriterAsyncActor",
-          "saveAsTable",
-          "JavaOpenTelemetrySuite.java",
-          274,
-          null);
+      checkSpan("snow.snowpark.DataFrameWriterAsyncActor", "saveAsTable", null);
     } finally {
       dropTable(name);
     }
@@ -291,12 +233,7 @@ public void dataFrameWriterAsyncActorCsv() {
       testSpanExporter.reset();
       DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
       df.write().async().csv("@" + name + "/csv").getResult();
-      checkSpan(
-          "snow.snowpark.DataFrameWriterAsyncActor",
-          "csv",
-          "JavaOpenTelemetrySuite.java",
-          293,
-          null);
+      checkSpan("snow.snowpark.DataFrameWriterAsyncActor", "csv", null);
     } finally {
       dropStage(name);
     }
@@ -313,12 +250,7 @@ public void dataFrameWriterAsyncActorJson() {
           df.select(
              com.snowflake.snowpark_java.Functions.array_construct(df.col("a"), df.col("b")));
       df2.write().async().json("@" + name + "/json").getResult();
-      checkSpan(
-          "snow.snowpark.DataFrameWriterAsyncActor",
-          "json",
-          "JavaOpenTelemetrySuite.java",
-          315,
-          null);
+      checkSpan("snow.snowpark.DataFrameWriterAsyncActor", "json", null);
     } finally {
       dropStage(name);
     }
@@ -332,14 +264,16 @@ public void dataFrameWriterAsyncActorParquet() {
      testSpanExporter.reset();
       DataFrame df = getSession().sql("select * from values(1),(2),(3) as t(num)");
       df.write().async().parquet("@" + name + "/parquet").getResult();
-      checkSpan(
-          "snow.snowpark.DataFrameWriterAsyncActor",
-          "parquet",
-          "JavaOpenTelemetrySuite.java",
-          334,
-          null);
+      checkSpan("snow.snowpark.DataFrameWriterAsyncActor", "parquet", null);
     } finally {
       dropStage(name);
     }
   }
+
+  private void checkSpan(String className, String funcName, String methodChain) {
+    StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+    StackTraceElement file = stack[2];
+    checkSpan(
+        className, funcName, "JavaOpenTelemetrySuite.java", file.getLineNumber() - 1, methodChain);
+  }
 }
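[Editor's note, not part of the patch series] The next patch threads a `javaOffSet` default parameter through `OpenTelemetry.action` because the Java API reaches each Scala action through extra delegation frames, so the user's code sits deeper in the stack (`val index = if (isScala) 4 else 5 + javaOffSet` in the hunk below). The effect is easy to reproduce in isolation; this is illustrative code, not the Snowpark call chain:

    object FrameDepth {
      def whoCalledMe(extraDepth: Int): String = {
        val stack = Thread.currentThread().getStackTrace
        // frame 0: getStackTrace, 1: whoCalledMe, 2: direct caller,
        // 2 + extraDepth: the caller as seen through `extraDepth` wrappers
        stack(2 + extraDepth).toString
      }

      def wrapper(): String = whoCalledMe(1) // one delegation frame in between

      def main(args: Array[String]): Unit = {
        println(whoCalledMe(0)) // reports main
        println(wrapper())      // also reports main, thanks to the offset
      }
    }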
Date: Thu, 11 Jul 2024 15:30:22 -0700 Subject: [PATCH 7/9] copyable java --- .../snowpark/CopyableDataFrame.scala | 6 +- .../snowpark/internal/OpenTelemetry.scala | 14 +-- .../snowpark_test/JavaOpenTelemetrySuite.java | 117 +++++++++++++++++- 3 files changed, 126 insertions(+), 11 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala b/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala index 6e8eda5f..f30c24e9 100644 --- a/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala +++ b/src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala @@ -237,7 +237,7 @@ class CopyableDataFrame private[snowpark] ( * @since 0.10.0 * @group basic */ - override def clone: CopyableDataFrame = action("clone") { + override def clone: CopyableDataFrame = action("clone", 2) { new CopyableDataFrame(session, plan, stagedFileReader) } @@ -263,6 +263,10 @@ class CopyableDataFrame private[snowpark] ( val isScala: Boolean = this.session.conn.isScalaAPI OpenTelemetry.action("CopyableDataFrame", funcName, isScala)(func) } + @inline protected def action[T](funcName: String, javaOffset: Int)(func: => T): T = { + val isScala: Boolean = this.session.conn.isScalaAPI + OpenTelemetry.action("CopyableDataFrame", funcName, isScala, javaOffset)(func) + } } /** diff --git a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala index 37bab409..34de4dd3 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala @@ -12,21 +12,17 @@ object OpenTelemetry extends Logging { val spanInfo = new DynamicVariable[Option[SpanInfo]](None) // wrapper of all action functions - def action[T](className: String, funcName: String, isScala: Boolean)(func: => T): T = { + def action[T](className: String, funcName: String, isScala: Boolean, javaOffSet: Int = 0)( + func: => T): T = { try { spanInfo.withValue[T](spanInfo.value match { // empty info means this is the entry of the recursion case None => val stacks = Thread.currentThread().getStackTrace val methodChain = "" - val (fileName, lineNumber): (String, Int) = - if (isScala) { - val file = stacks(4) - (file.getFileName, file.getLineNumber) - } else { - val file = stacks(5) - (file.getFileName, file.getLineNumber) - } + val index = if (isScala) 4 else 5 + javaOffSet + val fileName = stacks(index).getFileName + val lineNumber = stacks(index).getLineNumber Some(SpanInfo(className, funcName, fileName, lineNumber, methodChain)) // if value is not empty, this function call should be recursion. // do not issue new SpanInfo, use the info inherited from previous. 
diff --git a/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java b/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java
index 50a1c723..51bb4580 100644
--- a/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java
+++ b/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java
@@ -1,6 +1,11 @@
 package com.snowflake.snowpark_test;
 
-import com.snowflake.snowpark_java.DataFrame;
+import com.snowflake.snowpark_java.*;
+import com.snowflake.snowpark_java.types.DataTypes;
+import com.snowflake.snowpark_java.types.StructField;
+import com.snowflake.snowpark_java.types.StructType;
+import java.util.HashMap;
+import java.util.Map;
 import org.junit.Test;
 
 public class JavaOpenTelemetrySuite extends JavaOpenTelemetryEnabled {
@@ -270,6 +275,116 @@ public void dataFrameWriterAsyncActorParquet() {
     }
   }
 
+  @Test
+  public void copyableDataFrame() {
+    String stageName = randomName();
+    String tableName = randomName();
+    StructType schema =
+        StructType.create(
+            new StructField("num", DataTypes.IntegerType),
+            new StructField("str", DataTypes.StringType),
+            new StructField("double", DataTypes.DoubleType));
+    try {
+      createTable(tableName, "a Int, b String, c Double", true);
+      createTempStage(stageName);
+      uploadFileToStage(stageName, TestFiles.testFileCsv, false);
+      testSpanExporter.reset();
+      String className = "snow.snowpark.CopyableDataFrame";
+      getSession()
+          .read()
+          .schema(schema)
+          .csv("@" + stageName + "/" + TestFiles.testFileCsv)
+          .copyInto(tableName);
+      checkSpan(className, "copyInto", null);
+      Column[] transformation = {Functions.col("$1"), Functions.col("$2"), Functions.col("$3")};
+      getSession()
+          .read()
+          .schema(schema)
+          .csv("@" + stageName + "/" + TestFiles.testFileCsv)
+          .copyInto(tableName, transformation);
+      checkSpan(className, "copyInto", null);
+      Map<String, Object> options = new HashMap<>();
+      options.put("skip_header", 1);
+      options.put("FORCE", "true");
+      getSession()
+          .read()
+          .schema(schema)
+          .csv("@" + stageName + "/" + TestFiles.testFileCsv)
+          .copyInto(tableName, transformation, options);
+      checkSpan(className, "copyInto", null);
+      String[] columns = {"a", "b", "c"};
+      getSession()
+          .read()
+          .schema(schema)
+          .csv("@" + stageName + "/" + TestFiles.testFileCsv)
+          .copyInto(tableName, columns, transformation, options);
+      checkSpan(className, "copyInto", null);
+      getSession().read().schema(schema).csv("@" + stageName + "/" + TestFiles.testFileCsv).clone();
+      checkSpan(className, "clone", null);
+    } finally {
+      dropTable(tableName);
+      dropStage(stageName);
+    }
+  }
+
+  @Test
+  public void copyableDataFrameAsyncActor() {
+    String stageName = randomName();
+    String tableName = randomName();
+    StructType schema =
+        StructType.create(
+            new StructField("num", DataTypes.IntegerType),
+            new StructField("str", DataTypes.StringType),
+            new StructField("double", DataTypes.DoubleType));
+    try {
+      createTable(tableName, "a Int, b String, c Double", true);
+      createTempStage(stageName);
+      uploadFileToStage(stageName, TestFiles.testFileCsv, false);
+      testSpanExporter.reset();
+      String className = "snow.snowpark.CopyableDataFrameAsyncActor";
+      CopyableDataFrameAsyncActor df1 =
+          getSession()
+              .read()
+              .schema(schema)
+              .csv("@" + stageName + "/" + TestFiles.testFileCsv)
+              .async();
+      df1.copyInto(tableName).getResult();
+      checkSpan(className, "copyInto", null);
+      Column[] transformation = {Functions.col("$1"), Functions.col("$2"), Functions.col("$3")};
+      CopyableDataFrameAsyncActor df2 =
+          getSession()
+              .read()
+              .schema(schema)
+              .csv("@" + stageName + "/" + TestFiles.testFileCsv)
+              .async();
+      df2.copyInto(tableName, transformation).getResult();
+      checkSpan(className, "copyInto", null);
+      Map<String, Object> options = new HashMap<>();
+      options.put("skip_header", 1);
+      options.put("FORCE", "true");
+      CopyableDataFrameAsyncActor df3 =
+          getSession()
+              .read()
+              .schema(schema)
+              .csv("@" + stageName + "/" + TestFiles.testFileCsv)
+              .async();
+      df3.copyInto(tableName, transformation, options).getResult();
+      checkSpan(className, "copyInto", null);
+      String[] columns = {"a", "b", "c"};
+      CopyableDataFrameAsyncActor df4 =
+          getSession()
+              .read()
+              .schema(schema)
+              .csv("@" + stageName + "/" + TestFiles.testFileCsv)
+              .async();
+      df4.copyInto(tableName, columns, transformation, options).getResult();
+      checkSpan(className, "copyInto", null);
+    } finally {
+      dropTable(tableName);
+      dropStage(stageName);
+    }
+  }
+
   private void checkSpan(String className, String funcName, String methodChain) {
     StackTraceElement[] stack = Thread.currentThread().getStackTrace();
     StackTraceElement file = stack[2];
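This patch's core change is the javaOffSet parameter threaded through OpenTelemetry.action: the user's frame normally sits at stack index 4 for the Scala API and 5 for the Java wrapper, and methods that reach the wrapper through additional frames (clone here passes 2) shift the index further. A sketch of the offset arithmetic in isolation, with hypothetical names:

    object CallSite {
      // Returns the frame `offset` levels above the direct caller of this helper.
      // stack(0) = getStackTrace, stack(1) = caller(), stack(2) = direct caller.
      def caller(offset: Int): StackTraceElement = {
        val stack = Thread.currentThread().getStackTrace
        stack(2 + offset)
      }
    }

CallSite.caller(0) names the immediate caller; each wrapper layer between the user's code and the traced action adds one to the offset.
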
+ stageName + "/" + TestFiles.testFileCsv) + .async(); + df2.copyInto(tableName, transformation).getResult(); + checkSpan(className, "copyInto", null); + Map options = new HashMap<>(); + options.put("skip_header", 1); + options.put("FORCE", "true"); + CopyableDataFrameAsyncActor df3 = + getSession() + .read() + .schema(schema) + .csv("@" + stageName + "/" + TestFiles.testFileCsv) + .async(); + df3.copyInto(tableName, transformation, options).getResult(); + checkSpan(className, "copyInto", null); + String[] columns = {"a", "b", "c"}; + CopyableDataFrameAsyncActor df4 = + getSession() + .read() + .schema(schema) + .csv("@" + stageName + "/" + TestFiles.testFileCsv) + .async(); + df4.copyInto(tableName, columns, transformation, options).getResult(); + checkSpan(className, "copyInto", null); + } finally { + dropTable(tableName); + dropStage(stageName); + } + } + private void checkSpan(String className, String funcName, String methodChain) { StackTraceElement[] stack = Thread.currentThread().getStackTrace(); StackTraceElement file = stack[2]; From 93a4c2ba89b9a1e2c92d99c5db40fb7594064d7b Mon Sep 17 00:00:00 2001 From: Bing Li Date: Thu, 11 Jul 2024 16:10:30 -0700 Subject: [PATCH 8/9] updatable --- .../com/snowflake/snowpark/Updatable.scala | 36 +++++--- .../snowpark_test/JavaOpenTelemetrySuite.java | 91 +++++++++++++++++++ 2 files changed, 114 insertions(+), 13 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/Updatable.scala b/src/main/scala/com/snowflake/snowpark/Updatable.scala index cdc47535..b89d8f2a 100644 --- a/src/main/scala/com/snowflake/snowpark/Updatable.scala +++ b/src/main/scala/com/snowflake/snowpark/Updatable.scala @@ -76,7 +76,7 @@ class Updatable private[snowpark] ( * @since 0.7.0 * @return [[UpdateResult]] */ - def update(assignments: Map[Column, Column]): UpdateResult = action("update") { + def update(assignments: Map[Column, Column]): UpdateResult = action("update", 2) { val newDf = getUpdateDataFrameWithColumn(assignments, None, None) Updatable.getUpdateResult(newDf.collect()) } @@ -102,7 +102,7 @@ class Updatable private[snowpark] ( * @since 0.7.0 * @return [[UpdateResult]] */ - def update[T: ClassTag](assignments: Map[String, Column]): UpdateResult = action("update") { + def update[T: ClassTag](assignments: Map[String, Column]): UpdateResult = action("update", 2) { val newDf = getUpdateDataFrameWithString(assignments, None, None) Updatable.getUpdateResult(newDf.collect()) } @@ -124,7 +124,7 @@ class Updatable private[snowpark] ( * @return [[UpdateResult]] */ def update(assignments: Map[Column, Column], condition: Column): UpdateResult = - action("update") { + action("update", 2) { val newDf = getUpdateDataFrameWithColumn(assignments, Some(condition), None) Updatable.getUpdateResult(newDf.collect()) } @@ -146,7 +146,7 @@ class Updatable private[snowpark] ( * @return [[UpdateResult]] */ def update[T: ClassTag](assignments: Map[String, Column], condition: Column): UpdateResult = - action("update") { + action("update", 2) { val newDf = getUpdateDataFrameWithString(assignments, Some(condition), None) Updatable.getUpdateResult(newDf.collect()) } @@ -170,7 +170,7 @@ class Updatable private[snowpark] ( def update( assignments: Map[Column, Column], condition: Column, - sourceData: DataFrame): UpdateResult = action("update") { + sourceData: DataFrame): UpdateResult = action("update", 2) { val newDf = getUpdateDataFrameWithColumn(assignments, Some(condition), Some(sourceData)) Updatable.getUpdateResult(newDf.collect()) } @@ -194,7 +194,7 @@ class Updatable 
   def update[T: ClassTag](
       assignments: Map[String, Column],
       condition: Column,
-      sourceData: DataFrame): UpdateResult = action("update") {
+      sourceData: DataFrame): UpdateResult = action("update", 2) {
     val newDf = getUpdateDataFrameWithString(assignments, Some(condition), Some(sourceData))
     Updatable.getUpdateResult(newDf.collect())
   }
@@ -328,7 +328,7 @@ class Updatable private[snowpark] (
    * @since 0.10.0
    * @group basic
    */
-  override def clone: Updatable = action("clone") {
+  override def clone: Updatable = action("clone", 2) {
     new Updatable(tableName, session)
   }
 
@@ -356,6 +356,11 @@ class Updatable private[snowpark] (
     OpenTelemetry.action("Updatable", funcName, isScala)(func)
   }
 
+  @inline protected def action[T](funcName: String, javaOffset: Int)(func: => T): T = {
+    val isScala: Boolean = this.session.conn.isScalaAPI
+    OpenTelemetry.action("Updatable", funcName, isScala, javaOffset)(func)
+  }
+
 }
 
 /**
@@ -374,7 +379,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
    * @since 0.11.0
    */
   def update(assignments: Map[Column, Column]): TypedAsyncJob[UpdateResult] =
-    action("update") {
+    action("update", 2) {
       val newDf = updatable.getUpdateDataFrameWithColumn(assignments, None, None)
       updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
     }
@@ -387,7 +392,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
    * @since 0.11.0
    */
   def update[T: ClassTag](assignments: Map[String, Column]): TypedAsyncJob[UpdateResult] =
-    action("update") {
+    action("update", 2) {
       val newDf = updatable.getUpdateDataFrameWithString(assignments, None, None)
       updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
     }
@@ -400,7 +405,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
    * @since 0.11.0
    */
   def update(assignments: Map[Column, Column], condition: Column): TypedAsyncJob[UpdateResult] =
-    action("update") {
+    action("update", 2) {
       val newDf = updatable.getUpdateDataFrameWithColumn(assignments, Some(condition), None)
       updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
     }
@@ -415,7 +420,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
   def update[T: ClassTag](
       assignments: Map[String, Column],
       condition: Column): TypedAsyncJob[UpdateResult] =
-    action("update") {
+    action("update", 2) {
      val newDf = updatable.getUpdateDataFrameWithString(assignments, Some(condition), None)
      updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
    }
@@ -430,7 +435,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
   def update(
       assignments: Map[Column, Column],
       condition: Column,
-      sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update") {
+      sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update", 2) {
     val newDf =
       updatable.getUpdateDataFrameWithColumn(assignments, Some(condition), Some(sourceData))
     updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
@@ -446,7 +451,7 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
   def update[T: ClassTag](
       assignments: Map[String, Column],
       condition: Column,
-      sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update") {
+      sourceData: DataFrame): TypedAsyncJob[UpdateResult] = action("update", 2) {
     val newDf =
       updatable.getUpdateDataFrameWithString(assignments, Some(condition), Some(sourceData))
     updatable.session.conn.executeAsync[UpdateResult](newDf.snowflakePlan)
@@ -493,4 +498,9 @@ class UpdatableAsyncActor private[snowpark] (updatable: Updatable)
     val isScala: Boolean = updatable.session.conn.isScalaAPI
     OpenTelemetry.action("UpdatableAsyncActor", funcName, isScala)(func)
   }
+
+  @inline protected def action[T](funcName: String, javaOffset: Int)(func: => T): T = {
+    val isScala: Boolean = updatable.session.conn.isScalaAPI
+    OpenTelemetry.action("UpdatableAsyncActor", funcName, isScala, javaOffset)(func)
+  }
 }
diff --git a/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java b/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java
index 51bb4580..77a9f2c8 100644
--- a/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java
+++ b/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java
@@ -385,6 +385,97 @@ public void copyableDataFrameAsyncActor() {
     }
   }
 
+  @Test
+  public void updatable() {
+    String tableName = randomName();
+    Row[] data = {Row.create(1, "a", true), Row.create(2, "b", false)};
+    StructType schema =
+        StructType.create(
+            new StructField("col1", DataTypes.IntegerType),
+            new StructField("col2", DataTypes.StringType),
+            new StructField("col3", DataTypes.BooleanType));
+    String className = "snow.snowpark.Updatable";
+    DataFrame df = getSession().sql("select * from values(1, 2), (1, 4) as t(a, b)");
+    try {
+      getSession().createDataFrame(data, schema).write().saveAsTable(tableName);
+      testSpanExporter.reset();
+      Map<Column, Column> map = new HashMap<>();
+      map.put(Functions.col("col1"), Functions.lit(3));
+      Map<String, Column> map1 = new HashMap<>();
+      map1.put("col1", Functions.lit(3));
+      getSession().table(tableName).update(map);
+      checkSpan(className, "update", null);
+      getSession().table(tableName).updateColumn(map1);
+      checkSpan(className, "update", null);
+      getSession()
+          .table(tableName)
+          .update(map, Functions.col("col3").equal_to(Functions.lit(true)));
+      checkSpan(className, "update", null);
+      getSession()
+          .table(tableName)
+          .updateColumn(map1, Functions.col("col3").equal_to(Functions.lit(true)));
+      checkSpan(className, "update", null);
+      getSession().table(tableName).update(map, Functions.col("col1").equal_to(df.col("a")), df);
+      checkSpan(className, "update", null);
+      getSession()
+          .table(tableName)
+          .updateColumn(map1, Functions.col("col1").equal_to(df.col("a")), df);
+      checkSpan(className, "update", null);
+      getSession().table(tableName).delete();
+      checkSpan(className, "delete", null);
+      getSession().table(tableName).delete(Functions.col("col1").equal_to(Functions.lit(1)));
+      checkSpan(className, "delete", null);
+      getSession().table(tableName).delete(Functions.col("col1").equal_to(df.col("a")), df);
+      checkSpan(className, "delete", null);
+      getSession().table(tableName).clone();
+      checkSpan(className, "clone", null);
+    } finally {
+      dropTable(tableName);
+    }
+  }
+
+  @Test
+  public void updatableAsyncActor() {
+    String tableName = randomName();
+    Row[] data = {Row.create(1, "a", true), Row.create(2, "b", false)};
+    StructType schema =
+        StructType.create(
+            new StructField("col1", DataTypes.IntegerType),
+            new StructField("col2", DataTypes.StringType),
+            new StructField("col3", DataTypes.BooleanType));
+    String className = "snow.snowpark.UpdatableAsyncActor";
+    DataFrame df = getSession().sql("select * from values(1, 2), (1, 4) as t(a, b)");
+    try {
+      getSession().createDataFrame(data, schema).write().saveAsTable(tableName);
+      testSpanExporter.reset();
+      Map<Column, Column> map = new HashMap<>();
+      map.put(Functions.col("col1"), Functions.lit(3));
+      Map<String, Column> map1 = new HashMap<>();
+      map1.put("col1", Functions.lit(3));
+      UpdatableAsyncActor df1 = getSession().table(tableName).async();
+      df1.update(map).getResult();
+      checkSpan(className, "update", null);
+      df1.updateColumn(map1).getResult();
+      checkSpan(className, "update", null);
+      df1.update(map, Functions.col("col3").equal_to(Functions.lit(true))).getResult();
+      checkSpan(className, "update", null);
+      df1.updateColumn(map1, Functions.col("col3").equal_to(Functions.lit(true))).getResult();
+      checkSpan(className, "update", null);
+      df1.update(map, Functions.col("col1").equal_to(df.col("a")), df);
+      checkSpan(className, "update", null);
+      df1.updateColumn(map1, Functions.col("col1").equal_to(df.col("a")), df);
+      checkSpan(className, "update", null);
+      df1.delete().getResult();
+      checkSpan(className, "delete", null);
+      df1.delete(Functions.col("col1").equal_to(Functions.lit(1))).getResult();
+      checkSpan(className, "delete", null);
+      df1.delete(Functions.col("col1").equal_to(df.col("a")), df).getResult();
+      checkSpan(className, "delete", null);
+    } finally {
+      dropTable(tableName);
+    }
+  }
+
   private void checkSpan(String className, String funcName, String methodChain) {
     StackTraceElement[] stack = Thread.currentThread().getStackTrace();
     StackTraceElement file = stack[2];
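Every update and delete above funnels through the same action wrapper, and, per the recursion notes in the OpenTelemetry.action context earlier in this series, only the outermost action opens a span: update() runs a collect() internally, yet each assertion expects exactly one span. A minimal sketch of that re-entrancy guard, assuming a simplified Span type:

    import scala.util.DynamicVariable

    object Tracing {
      final case class Span(name: String)
      private val current = new DynamicVariable[Option[Span]](None)

      // Nested traced calls inherit the outer span instead of opening a new
      // one, so an "update" that calls collect() still yields a single span.
      def traced[T](name: String)(body: => T): T =
        current.withValue(current.value.orElse(Some(Span(name))))(body)
    }
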
From af44cbc80f980a1ae959d71705d1721fb88b0279 Mon Sep 17 00:00:00 2001
From: Bing Li
Date: Thu, 11 Jul 2024 16:22:08 -0700
Subject: [PATCH 9/9] test

---
 .../snowpark_test/JavaOpenTelemetrySuite.java | 56 +++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java b/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java
index 77a9f2c8..997b194b 100644
--- a/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java
+++ b/src/test/java/com/snowflake/snowpark_test/JavaOpenTelemetrySuite.java
@@ -476,6 +476,62 @@ public void updatableAsyncActor() {
     }
   }
 
+  @Test
+  public void mergeBuilder() {
+    String tableName = randomName();
+    DataFrame df = getSession().sql("select * from values(1, 2), (3, 4) as t(a, b)");
+    Row[] data = {Row.create(1, "a", true), Row.create(2, "b", false)};
+    StructType schema =
+        StructType.create(
+            new StructField("col1", DataTypes.IntegerType),
+            new StructField("col2", DataTypes.StringType),
+            new StructField("col3", DataTypes.BooleanType));
+    try {
+      getSession().createDataFrame(data, schema).write().saveAsTable(tableName);
+      testSpanExporter.reset();
+      Map<Column, Column> assignments = new HashMap<>();
+      assignments.put(Functions.col("col1"), df.col("b"));
+      getSession()
+          .table(tableName)
+          .merge(df, Functions.col("col1").equal_to(df.col("a")))
+          .whenMatched()
+          .update(assignments)
+          .collect();
+      checkSpan("snow.snowpark.MergeBuilder", "collect", null);
+    } finally {
+      dropTable(tableName);
+    }
+  }
+
+  @Test
+  public void mergeBuilderAsyncActor() {
+    String tableName = randomName();
+    DataFrame df = getSession().sql("select * from values(1, 2), (3, 4) as t(a, b)");
+    Row[] data = {Row.create(1, "a", true), Row.create(2, "b", false)};
+    StructType schema =
+        StructType.create(
+            new StructField("col1", DataTypes.IntegerType),
+            new StructField("col2", DataTypes.StringType),
+            new StructField("col3", DataTypes.BooleanType));
+    try {
+      getSession().createDataFrame(data, schema).write().saveAsTable(tableName);
+      testSpanExporter.reset();
+      Map<Column, Column> assignments = new HashMap<>();
+      assignments.put(Functions.col("col1"), df.col("b"));
+      MergeBuilderAsyncActor builderAsyncActor =
+          getSession()
+              .table(tableName)
+              .merge(df, Functions.col("col1").equal_to(df.col("a")))
+              .whenMatched()
+              .update(assignments)
+              .async();
+      builderAsyncActor.collect().getResult();
+      checkSpan("snow.snowpark.MergeBuilderAsyncActor", "collect", null);
+    } finally {
+      dropTable(tableName);
+    }
+  }
+
   private void checkSpan(String className, String funcName, String methodChain) {
     StackTraceElement[] stack = Thread.currentThread().getStackTrace();
     StackTraceElement file = stack[2];
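The series closes with MergeBuilder, the last action class to adopt the wrapper. For reference, a hedged Scala sketch of the flow these Java tests exercise (table and source names are hypothetical):

    // Expected to export one span: className "snow.snowpark.MergeBuilder",
    // funcName "collect", pointing at the line of the collect() call below.
    val target = session.table("target_table")
    target
      .merge(source, target("id") === source("id"))
      .whenMatched
      .update(Map(target("value") -> source("value")))
      .collect()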