Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1480775 Collect Span Info from Copyable and Updatable Dataframes #119

Merged
merged 9 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions src/main/scala/com/snowflake/snowpark/CopyableDataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ class CopyableDataFrame private[snowpark] (
* @param tableName Name of the table where the data should be saved.
* @since 0.9.0
*/
def copyInto(tableName: String): Unit =
def copyInto(tableName: String): Unit = action("copyInto") {
getCopyDataFrame(tableName, Seq.empty, Seq.empty, Map.empty).collect()
}

// scalastyle:off line.size.limit
/**
Expand Down Expand Up @@ -82,8 +83,9 @@ class CopyableDataFrame private[snowpark] (
* @since 0.9.0
*/
// scalastyle:on line.size.limit
def copyInto(tableName: String, transformations: Seq[Column]): Unit =
def copyInto(tableName: String, transformations: Seq[Column]): Unit = action("copyInto") {
getCopyDataFrame(tableName, Seq.empty, transformations, Map.empty).collect()
}

// scalastyle:off line.size.limit
/**
Expand Down Expand Up @@ -136,7 +138,9 @@ class CopyableDataFrame private[snowpark] (
*/
// scalastyle:on line.size.limit
def copyInto(tableName: String, transformations: Seq[Column], options: Map[String, Any]): Unit =
getCopyDataFrame(tableName, Seq.empty, transformations, options).collect()
action("copyInto") {
getCopyDataFrame(tableName, Seq.empty, transformations, options).collect()
}

// scalastyle:off line.size.limit
/**
Expand Down Expand Up @@ -198,8 +202,9 @@ class CopyableDataFrame private[snowpark] (
tableName: String,
targetColumnNames: Seq[String],
transformations: Seq[Column],
options: Map[String, Any]): Unit =
options: Map[String, Any]): Unit = action("copyInto") {
getCopyDataFrame(tableName, targetColumnNames, transformations, options).collect()
}

// Internal function to create plan for COPY
private[snowpark] def getCopyDataFrame(
Expand Down Expand Up @@ -232,8 +237,9 @@ class CopyableDataFrame private[snowpark] (
* @since 0.10.0
* @group basic
*/
override def clone: CopyableDataFrame =
override def clone: CopyableDataFrame = action("clone", 2) {
new CopyableDataFrame(session, plan, stagedFileReader)
}

/**
* Returns a [[CopyableDataFrameAsyncActor]] object that can be used to execute
Expand All @@ -253,6 +259,14 @@ class CopyableDataFrame private[snowpark] (
*/
override def async: CopyableDataFrameAsyncActor = new CopyableDataFrameAsyncActor(this)

@inline override protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = this.session.conn.isScalaAPI
OpenTelemetry.action("CopyableDataFrame", funcName, isScala)(func)
}
@inline protected def action[T](funcName: String, javaOffset: Int)(func: => T): T = {
val isScala: Boolean = this.session.conn.isScalaAPI
OpenTelemetry.action("CopyableDataFrame", funcName, isScala, javaOffset)(func)
}
}

/**
Expand All @@ -271,7 +285,7 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
* and get the results.
* @since 0.11.0
*/
def copyInto(tableName: String): TypedAsyncJob[Unit] = {
def copyInto(tableName: String): TypedAsyncJob[Unit] = action("copyInto") {
val df = cdf.getCopyDataFrame(tableName)
cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
}
Expand All @@ -288,10 +302,11 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
* @since 0.11.0
*/
// scalastyle:on line.size.limit
def copyInto(tableName: String, transformations: Seq[Column]): TypedAsyncJob[Unit] = {
val df = cdf.getCopyDataFrame(tableName, Seq.empty, transformations, Map.empty)
cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
}
def copyInto(tableName: String, transformations: Seq[Column]): TypedAsyncJob[Unit] =
action("copyInto") {
val df = cdf.getCopyDataFrame(tableName, Seq.empty, transformations, Map.empty)
cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
}

// scalastyle:off line.size.limit
/**
Expand All @@ -312,7 +327,7 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
def copyInto(
tableName: String,
transformations: Seq[Column],
options: Map[String, Any]): TypedAsyncJob[Unit] = {
options: Map[String, Any]): TypedAsyncJob[Unit] = action("copyInto") {
val df = cdf.getCopyDataFrame(tableName, Seq.empty, transformations, options)
cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
}
Expand All @@ -338,9 +353,13 @@ class CopyableDataFrameAsyncActor private[snowpark] (cdf: CopyableDataFrame)
tableName: String,
targetColumnNames: Seq[String],
transformations: Seq[Column],
options: Map[String, Any]): TypedAsyncJob[Unit] = {
options: Map[String, Any]): TypedAsyncJob[Unit] = action("copyInto") {
val df = cdf.getCopyDataFrame(tableName, targetColumnNames, transformations, options)
cdf.session.conn.executeAsync[Unit](df.snowflakePlan)
}

@inline override protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = cdf.session.conn.isScalaAPI
OpenTelemetry.action("CopyableDataFrameAsyncActor", funcName, isScala)(func)
}
}
16 changes: 13 additions & 3 deletions src/main/scala/com/snowflake/snowpark/MergeBuilder.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.snowflake.snowpark

import com.snowflake.snowpark.internal.ErrorMessage
import com.snowflake.snowpark.internal.{ErrorMessage, OpenTelemetry}
import com.snowflake.snowpark.internal.analyzer.{MergeExpression, TableMerge}

/**
Expand Down Expand Up @@ -167,7 +167,7 @@ class MergeBuilder private[snowpark] (
* @since 0.7.0
* @return [[MergeResult]]
*/
def collect(): MergeResult = {
def collect(): MergeResult = action("collect") {
val rows = getMergeDataFrame().collect()
MergeBuilder.getMergeResult(rows, this)
}
Expand Down Expand Up @@ -202,6 +202,11 @@ class MergeBuilder private[snowpark] (
* @return A [[MergeBuilderAsyncActor]] object
*/
def async: MergeBuilderAsyncActor = new MergeBuilderAsyncActor(this)

@inline protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = target.session.conn.isScalaAPI
OpenTelemetry.action("MergeBuilder", funcName, isScala)(func)
}
}

/**
Expand All @@ -218,9 +223,14 @@ class MergeBuilderAsyncActor private[snowpark] (mergeBuilder: MergeBuilder) {
* and get the results.
* @since 1.3.0
*/
def collect(): TypedAsyncJob[MergeResult] = {
def collect(): TypedAsyncJob[MergeResult] = action("collect") {
val newDf = mergeBuilder.getMergeDataFrame()
mergeBuilder.target.session.conn
.executeAsync[MergeResult](newDf.snowflakePlan, Some(mergeBuilder))
}

@inline protected def action[T](funcName: String)(func: => T): T = {
val isScala: Boolean = mergeBuilder.target.session.conn.isScalaAPI
OpenTelemetry.action("MergeBuilderAsyncActor", funcName, isScala)(func)
}
}
Loading
Loading