Skip to content

Commit

Permalink
SNOW-1480775 Collect Span Info from Copyable and Updatable Dataframes (
Browse files Browse the repository at this point in the history
…#119)

* copyableDataFrame

* CopyableDataFrameAsyncActor

* update

* UpdatableAsyncActor

* builder

* fix java

* copyable java

* updatable

* test
  • Loading branch information
sfc-gh-bli authored Jul 12, 2024
1 parent 9a0a7a6 commit ef78860
Show file tree
Hide file tree
Showing 7 changed files with 641 additions and 307 deletions.
43 changes: 31 additions & 12 deletions src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ class CopyableDataFrame private[snowpark] (
* @param tableName Name of the table where the data should be saved.
* @since 0.9.0
*/
def copyInto(tableName: String): Unit =
def copyInto(tableName: String): Unit = action("copyInto") {
getCopyDataFrame(tableName, Seq.empty, Seq.empty, Map.empty).collect()
}

// scalastyle:off line.size.limit
/**
Expand Down Expand Up @@ -82,8 +83,9 @@ class CopyableDataFrame private[snowpark] (
* @since 0.9.0
*/
// scalastyle:on line.size.limit
def copyInto(tableName: String, transformations: Seq[Column]): Unit =
def copyInto(tableName: String, transformations: Seq[Column]): Unit = action("copyInto") {
getCopyDataFrame(tableName, Seq.empty, transformations, Map.empty).collect()
}

// scalastyle:off line.size.limit
/**
Expand Down Expand Up @@ -136,7 +138,9 @@ class CopyableDataFrame private[snowpark] (
*/
// scalastyle:on line.size.limit
def copyInto(tableName: String, transformations: Seq[Column], options: Map[String, Any]): Unit =
getCopyDataFrame(tableName, Seq.empty, transformations, options).collect()
action("copyInto") {
getCopyDataFrame(tableName, Seq.empty, transformations, options).collect()
}

// scalastyle:off line.size.limit
/**
Expand Down Expand Up @@ -198,8 +202,9 @@ class CopyableDataFrame private[snowpark] (
tableName: String,
targetColumnNames: Seq[String],
transformations: Seq[Column],
options: Map[String, Any]): Unit =
options: Map[String, Any]): Unit = action("copyInto") {
getCopyDataFrame(tableName, targetColumnNames, transformations, options).collect()
}

// Internal function to create plan for COPY
private[snowpark] def getCopyDataFrame(
Expand Down Expand Up @@ -232,8 +237,9 @@ class CopyableDataFrame private[snowpark] (
* @since 0.10.0
* @group basic
*/
override def clone: CopyableDataFrame =
override def clone: CopyableDataFrame = action("clone", 2) {
new CopyableDataFrame(session, plan, stagedFileReader)
}

/**
* Returns a [[CopyableDataFrameAsyncActor]] object that can be used to execute
Expand All @@ -253,6 +259,14 @@ class CopyableDataFrame private[snowpark] (
*/
override def async: CopyableDataFrameAsyncActor = {
  // Hand this DataFrame to a fresh actor instance.
  new CopyableDataFrameAsyncActor(this)
}

/**
 * Evaluates `func` through `OpenTelemetry.action`, labelled with the class name
 * "CopyableDataFrame" and the supplied function name.
 *
 * @param funcName name of the calling method, forwarded as-is
 * @param func by-name body handed to `OpenTelemetry.action`
 * @tparam T result type of `func`
 * @return the value produced by `OpenTelemetry.action`
 */
@inline override protected def action[T](funcName: String)(func: => T): T = {
  // Read the API flavour from the session connection before delegating.
  val scalaApi = this.session.conn.isScalaAPI
  OpenTelemetry.action("CopyableDataFrame", funcName, scalaApi)(func)
}
/**
 * Variant of `action` that additionally forwards `javaOffset` to
 * `OpenTelemetry.action`, labelled with the class name "CopyableDataFrame".
 *
 * @param funcName name of the calling method, forwarded as-is
 * @param javaOffset offset value passed through unchanged to `OpenTelemetry.action`
 * @param func by-name body handed to `OpenTelemetry.action`
 * @tparam T result type of `func`
 * @return the value produced by `OpenTelemetry.action`
 */
@inline protected def action[T](funcName: String, javaOffset: Int)(func: => T): T = {
  // Read the API flavour from the session connection before delegating.
  val scalaApi = this.session.conn.isScalaAPI
  OpenTelemetry.action("CopyableDataFrame", funcName, scalaApi, javaOffset)(func)
}
}

/**
Expand All @@ -271,7 +285,7 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
* and get the results.
* @since 0.11.0
*/
def copyInto(tableName: String): TypedAsyncJob[Unit] = {
def copyInto(tableName: String): TypedAsyncJob[Unit] = action("copyInto") {
val df = cdf.getCopyDataFrame(tableName)
cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
}
Expand All @@ -288,10 +302,11 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
* @since 0.11.0
*/
// scalastyle:on line.size.limit
def copyInto(tableName: String, transformations: Seq[Column]): TypedAsyncJob[Unit] = {
val df = cdf.getCopyDataFrame(tableName, Seq.empty, transformations, Map.empty)
cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
}
def copyInto(tableName: String, transformations: Seq[Column]): TypedAsyncJob[Unit] =
action("copyInto") {
val df = cdf.getCopyDataFrame(tableName, Seq.empty, transformations, Map.empty)
cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
}

// scalastyle:off line.size.limit
/**
Expand All @@ -312,7 +327,7 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
def copyInto(
tableName: String,
transformations: Seq[Column],
options: Map[String, Any]): TypedAsyncJob[Unit] = {
options: Map[String, Any]): TypedAsyncJob[Unit] = action("copyInto") {
val df = cdf.getCopyDataFrame(tableName, Seq.empty, transformations, options)
cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
}
Expand All @@ -338,9 +353,13 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
tableName: String,
targetColumnNames: Seq[String],
transformations: Seq[Column],
options: Map[String, Any]): TypedAsyncJob[Unit] = {
options: Map[String, Any]): TypedAsyncJob[Unit] = action("copyInto") {
val df = cdf.getCopyDataFrame(tableName, targetColumnNames, transformations, options)
cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
}

/**
 * Evaluates `func` through `OpenTelemetry.action`, labelled with the class name
 * "CopyableDataFrameAsyncActor" and the supplied function name.
 *
 * @param funcName name of the calling method, forwarded as-is
 * @param func by-name body handed to `OpenTelemetry.action`
 * @tparam T result type of `func`
 * @return the value produced by `OpenTelemetry.action`
 */
@inline override protected def action[T](funcName: String)(func: => T): T = {
  // The API flavour comes from the wrapped CopyableDataFrame's session connection.
  val scalaApi = cdf.session.conn.isScalaAPI
  OpenTelemetry.action("CopyableDataFrameAsyncActor", funcName, scalaApi)(func)
}
}
16 changes: 13 additions & 3 deletions src/main/scala/com/snowflake/snowpark/MergeBuilder.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.snowflake.snowpark

import com.snowflake.snowpark.internal.ErrorMessage
import com.snowflake.snowpark.internal.{ErrorMessage, OpenTelemetry}
import com.snowflake.snowpark.internal.analyzer.{MergeExpression, TableMerge}

/**
Expand Down Expand Up @@ -167,7 +167,7 @@ class MergeBuilder private[snowpark] (
* @since 0.7.0
* @return [[MergeResult]]
*/
def collect(): MergeResult = {
def collect(): MergeResult = action("collect") {
val rows = getMergeDataFrame().collect()
MergeBuilder.getMergeResult(rows, this)
}
Expand Down Expand Up @@ -202,6 +202,11 @@ class MergeBuilder private[snowpark] (
* @return A [[MergeBuilderAsyncActor]] object
*/
def async: MergeBuilderAsyncActor = {
  // Hand this builder to a fresh actor instance.
  new MergeBuilderAsyncActor(this)
}

/**
 * Evaluates `func` through `OpenTelemetry.action`, labelled with the class name
 * "MergeBuilder" and the supplied function name.
 *
 * @param funcName name of the calling method, forwarded as-is
 * @param func by-name body handed to `OpenTelemetry.action`
 * @tparam T result type of `func`
 * @return the value produced by `OpenTelemetry.action`
 */
@inline protected def action[T](funcName: String)(func: => T): T = {
  // The API flavour comes from the merge target's session connection.
  val scalaApi = target.session.conn.isScalaAPI
  OpenTelemetry.action("MergeBuilder", funcName, scalaApi)(func)
}
}

/**
Expand All @@ -218,9 +223,14 @@ class MergeBuilderAsyncActor private[snowpark] (mergeBuilder: MergeBuilder) {
* and get the results.
* @since 1.3.0
*/
def collect(): TypedAsyncJob[MergeResult] = {
def collect(): TypedAsyncJob[MergeResult] = action("collect") {
val newDf = mergeBuilder.getMergeDataFrame()
mergeBuilder.target.session.conn
.executeAsync[MergeResult](newDf.snowflakePlan, Some(mergeBuilder))
}

/**
 * Evaluates `func` through `OpenTelemetry.action`, labelled with the class name
 * "MergeBuilderAsyncActor" and the supplied function name.
 *
 * @param funcName name of the calling method, forwarded as-is
 * @param func by-name body handed to `OpenTelemetry.action`
 * @tparam T result type of `func`
 * @return the value produced by `OpenTelemetry.action`
 */
@inline protected def action[T](funcName: String)(func: => T): T = {
  // The API flavour comes from the wrapped builder's target session connection.
  val scalaApi = mergeBuilder.target.session.conn.isScalaAPI
  OpenTelemetry.action("MergeBuilderAsyncActor", funcName, scalaApi)(func)
}
}
Loading

0 comments on commit ef78860

Please sign in to comment.