[SPARK-49429][CONNECT][SQL] Add Shared DataStreamWriter interface
### What changes were proposed in this pull request?
This PR adds a shared `DataStreamWriter` interface to `sql` and makes the Connect `DataStreamWriter` implement it.

### Why are the changes needed?
We are creating a unified Scala interface for sql.
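
The mechanism this refactor relies on is a shared parent whose builder methods return `this.type`, so each concrete client keeps its fluent, covariant return types while the documentation lives in one place (hence the `/** @inheritdoc */` markers in the diff below). The following is a minimal sketch of that pattern; the class and member names are illustrative assumptions, not the actual `org.apache.spark.sql.api` definitions.

```scala
// Sketch of the shared "parent interface + this.type" pattern; the names
// below are illustrative, not the real org.apache.spark.sql.api classes.
abstract class AbstractStreamWriter[T] {
  // Bound by each concrete client to its own Dataset type.
  type DS[U]

  // Builder primitives return this.type, so chained calls keep the
  // concrete subclass type instead of widening to the parent.
  def format(source: String): this.type
  def option(key: String, value: String): this.type

  // Convenience overloads live once here and delegate to the primitive.
  def option(key: String, value: Boolean): this.type = option(key, value.toString)
  def option(key: String, value: Long): this.type = option(key, value.toString)
}

// A concrete client implements only the primitives and inherits the docs;
// it may also override the overloads covariantly, as the diff below does.
final class ExampleStreamWriter[T] extends AbstractStreamWriter[T] {
  override type DS[U] = Seq[U] // stand-in for the client's Dataset type

  private val opts = scala.collection.mutable.Map.empty[String, String]
  private var fmt: Option[String] = None

  override def format(source: String): this.type = { fmt = Some(source); this }

  override def option(key: String, value: String): this.type = {
    opts.put(key, value)
    this
  }
}

// Usage (e.g. in a REPL): the chain stays typed as ExampleStreamWriter[T].
//   new ExampleStreamWriter[String]()
//     .format("parquet")
//     .option("checkpointLocation", "/tmp/example-checkpoint")
```

With this shape, overloads such as `option(key, value: Boolean)` are implemented once in the parent, and a subclass only overrides them to narrow the return type, which is what the "Covariant Overrides" section of the diff does.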

### Does this PR introduce _any_ user-facing change?
No.
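
For illustration, typical user code keeps compiling and behaving the same against the fluent API. The built-in `rate` source and `console` sink are real formats; the application name and checkpoint path below are example values.

```scala
// Illustrative end-user code that is unaffected by this refactor.
import org.apache.spark.sql.SparkSession

object StreamingWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("StreamingWriteExample").getOrCreate()

    val query = spark.readStream
      .format("rate")          // built-in test source emitting rows at a fixed rate
      .load()
      .writeStream
      .format("console")       // print each micro-batch to stdout
      .outputMode("append")
      .option("checkpointLocation", "/tmp/example-checkpoint")
      .start()

    query.awaitTermination()
  }
}
```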

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48212 from hvanhovell/SPARK-49429.

Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
hvanhovell committed Sep 24, 2024
1 parent 94d288e commit 742265e
Showing 7 changed files with 340 additions and 476 deletions.
@@ -1035,12 +1035,7 @@ class Dataset[T] private[sql] (
new MergeIntoWriterImpl[T](table, this, condition)
}

/**
* Interface for saving the content of the streaming Dataset out into external storage.
*
* @group basic
* @since 3.5.0
*/
/** @inheritdoc */
def writeStream: DataStreamWriter[T] = {
new DataStreamWriter[T](this)
}
@@ -29,9 +29,8 @@ import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.Command
import org.apache.spark.connect.proto.WriteStreamOperationStart
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, ForeachWriter}
import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ForeachWriterPacket, UdfUtils}
import org.apache.spark.sql.{api, Dataset, ForeachWriter}
import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ForeachWriterPacket}
import org.apache.spark.sql.execution.streaming.AvailableNowTrigger
import org.apache.spark.sql.execution.streaming.ContinuousTrigger
import org.apache.spark.sql.execution.streaming.OneTimeTrigger
@@ -47,63 +46,23 @@ import org.apache.spark.util.SparkSerDeUtils
* @since 3.5.0
*/
@Evolving
final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends api.DataStreamWriter[T] {
override type DS[U] = Dataset[U]

/**
* Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. <ul> <li>
* `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be written
* to the sink.</li> <li> `OutputMode.Complete()`: all the rows in the streaming
* DataFrame/Dataset will be written to the sink every time there are some updates.</li> <li>
* `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset
* will be written to the sink every time there are some updates. If the query doesn't contain
* aggregations, it will be equivalent to `OutputMode.Append()` mode.</li> </ul>
*
* @since 3.5.0
*/
def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
/** @inheritdoc */
def outputMode(outputMode: OutputMode): this.type = {
sinkBuilder.setOutputMode(outputMode.toString.toLowerCase(Locale.ROOT))
this
}

/**
* Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. <ul> <li>
* `append`: only the new rows in the streaming DataFrame/Dataset will be written to the
* sink.</li> <li> `complete`: all the rows in the streaming DataFrame/Dataset will be written
* to the sink every time there are some updates.</li> <li> `update`: only the rows that were
* updated in the streaming DataFrame/Dataset will be written to the sink every time there are
* some updates. If the query doesn't contain aggregations, it will be equivalent to `append`
* mode.</li> </ul>
*
* @since 3.5.0
*/
def outputMode(outputMode: String): DataStreamWriter[T] = {
/** @inheritdoc */
def outputMode(outputMode: String): this.type = {
sinkBuilder.setOutputMode(outputMode)
this
}

/**
* Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will
* run the query as fast as possible.
*
* Scala Example:
* {{{
* df.writeStream.trigger(ProcessingTime("10 seconds"))
*
* import scala.concurrent.duration._
* df.writeStream.trigger(ProcessingTime(10.seconds))
* }}}
*
* Java Example:
* {{{
* df.writeStream().trigger(ProcessingTime.create("10 seconds"))
*
* import java.util.concurrent.TimeUnit
* df.writeStream().trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
*
* @since 3.5.0
*/
def trigger(trigger: Trigger): DataStreamWriter[T] = {
/** @inheritdoc */
def trigger(trigger: Trigger): this.type = {
trigger match {
case ProcessingTimeTrigger(intervalMs) =>
sinkBuilder.setProcessingTimeInterval(s"$intervalMs milliseconds")
@@ -117,123 +76,54 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
this
}

/**
* Specifies the name of the [[StreamingQuery]] that can be started with `start()`. This name
* must be unique among all the currently active queries in the associated SQLContext.
*
* @since 3.5.0
*/
def queryName(queryName: String): DataStreamWriter[T] = {
/** @inheritdoc */
def queryName(queryName: String): this.type = {
sinkBuilder.setQueryName(queryName)
this
}

/**
* Specifies the underlying output data source.
*
* @since 3.5.0
*/
def format(source: String): DataStreamWriter[T] = {
/** @inheritdoc */
def format(source: String): this.type = {
sinkBuilder.setFormat(source)
this
}

/**
* Partitions the output by the given columns on the file system. If specified, the output is
* laid out on the file system similar to Hive's partitioning scheme. As an example, when we
* partition a dataset by year and then month, the directory layout would look like:
*
* <ul> <li> year=2016/month=01/</li> <li> year=2016/month=02/</li> </ul>
*
* Partitioning is one of the most widely used techniques to optimize physical data layout. It
* provides a coarse-grained index for skipping unnecessary data reads when queries have
* predicates on the partitioned columns. In order for partitioning to work well, the number of
* distinct values in each column should typically be less than tens of thousands.
*
* @since 3.5.0
*/
/** @inheritdoc */
@scala.annotation.varargs
def partitionBy(colNames: String*): DataStreamWriter[T] = {
def partitionBy(colNames: String*): this.type = {
sinkBuilder.clearPartitioningColumnNames()
sinkBuilder.addAllPartitioningColumnNames(colNames.asJava)
this
}

/**
* Clusters the output by the given columns. If specified, the output is laid out such that
* records with similar values on the clustering column are grouped together in the same file.
*
* Clustering improves query efficiency by allowing queries with predicates on the clustering
* columns to skip unnecessary data. Unlike partitioning, clustering can be used on very high
* cardinality columns.
*
* @since 4.0.0
*/
/** @inheritdoc */
@scala.annotation.varargs
def clusterBy(colNames: String*): DataStreamWriter[T] = {
def clusterBy(colNames: String*): this.type = {
sinkBuilder.clearClusteringColumnNames()
sinkBuilder.addAllClusteringColumnNames(colNames.asJava)
this
}

/**
* Adds an output option for the underlying data source.
*
* @since 3.5.0
*/
def option(key: String, value: String): DataStreamWriter[T] = {
/** @inheritdoc */
def option(key: String, value: String): this.type = {
sinkBuilder.putOptions(key, value)
this
}

/**
* Adds an output option for the underlying data source.
*
* @since 3.5.0
*/
def option(key: String, value: Boolean): DataStreamWriter[T] = option(key, value.toString)

/**
* Adds an output option for the underlying data source.
*
* @since 3.5.0
*/
def option(key: String, value: Long): DataStreamWriter[T] = option(key, value.toString)

/**
* Adds an output option for the underlying data source.
*
* @since 3.5.0
*/
def option(key: String, value: Double): DataStreamWriter[T] = option(key, value.toString)

/**
* (Scala-specific) Adds output options for the underlying data source.
*
* @since 3.5.0
*/
def options(options: scala.collection.Map[String, String]): DataStreamWriter[T] = {
/** @inheritdoc */
def options(options: scala.collection.Map[String, String]): this.type = {
this.options(options.asJava)
this
}

/**
* Adds output options for the underlying data source.
*
* @since 3.5.0
*/
def options(options: java.util.Map[String, String]): DataStreamWriter[T] = {
/** @inheritdoc */
def options(options: java.util.Map[String, String]): this.type = {
sinkBuilder.putAllOptions(options)
this
}

/**
* Sets the output of the streaming query to be processed using the provided writer object.
* See [[org.apache.spark.sql.ForeachWriter]] for more details on the lifecycle and
* semantics.
* @since 3.5.0
*/
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = {
/** @inheritdoc */
def foreach(writer: ForeachWriter[T]): this.type = {
val serialized = SparkSerDeUtils.serialize(ForeachWriterPacket(writer, ds.agnosticEncoder))
val scalaWriterBuilder = proto.ScalarScalaUDF
.newBuilder()
@@ -242,21 +132,9 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
this
}

/**
* :: Experimental ::
*
* (Scala-specific) Sets the output of the streaming query to be processed using the provided
* function. This is supported only in the micro-batch execution modes (that is, when the
* trigger is not continuous). In every micro-batch, the provided function will be called in
* every micro-batch with (i) the output rows as a Dataset and (ii) the batch identifier. The
* batchId can be used to deduplicate and transactionally write the output (that is, the
* provided Dataset) to external systems. The output Dataset is guaranteed to be exactly the
* same for the same batchId (assuming all operations are deterministic in the query).
*
* @since 3.5.0
*/
/** @inheritdoc */
@Evolving
def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T] = {
def foreachBatch(function: (Dataset[T], Long) => Unit): this.type = {
val serializedFn = SparkSerDeUtils.serialize(function)
sinkBuilder.getForeachBatchBuilder.getScalaFunctionBuilder
.setPayload(ByteString.copyFrom(serializedFn))
@@ -265,48 +143,13 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
this
}

/**
* :: Experimental ::
*
* (Java-specific) Sets the output of the streaming query to be processed using the provided
* function. This is supported only in the micro-batch execution modes (that is, when the
* trigger is not continuous). In every micro-batch, the provided function will be called
* with (i) the output rows as a Dataset and (ii) the batch identifier. The
* batchId can be used to deduplicate and transactionally write the output (that is, the
* provided Dataset) to external systems. The output Dataset is guaranteed to be exactly the
* same for the same batchId (assuming all operations are deterministic in the query).
*
* @since 3.5.0
*/
@Evolving
def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T] = {
foreachBatch(UdfUtils.foreachBatchFuncToScalaFunc(function))
}

/**
* Starts the execution of the streaming query, which will continually output results to the
* given path as new data arrives. The returned [[StreamingQuery]] object can be used to
* interact with the stream.
*
* @since 3.5.0
*/
/** @inheritdoc */
def start(path: String): StreamingQuery = {
sinkBuilder.setPath(path)
start()
}

/**
* Starts the execution of the streaming query, which will continually output results to the
* given path as new data arrives. The returned [[StreamingQuery]] object can be used to
* interact with the stream. Throws a `TimeoutException` if the following conditions are met:
* - Another run of the same streaming query, that is a streaming query sharing the same
* checkpoint location, is already active on the same Spark Driver
* - The SQL configuration `spark.sql.streaming.stopActiveRunOnRestart` is enabled
* - The active run cannot be stopped within the timeout controlled by the SQL configuration
* `spark.sql.streaming.stopTimeout`
*
* @since 3.5.0
*/
/** @inheritdoc */
@throws[TimeoutException]
def start(): StreamingQuery = {
val startCmd = Command
@@ -323,29 +166,32 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
RemoteStreamingQuery.fromStartCommandResponse(ds.sparkSession, resp)
}

/**
* Starts the execution of the streaming query, which will continually output results to the
* given table as new data arrives. The returned [[StreamingQuery]] object can be used to
* interact with the stream.
*
* For v1 table, partitioning columns provided by `partitionBy` will be respected whether the
* table exists or not. A new table will be created if the table does not exist.
*
* For v2 table, `partitionBy` will be ignored if the table already exists. `partitionBy` will
* be respected only if the v2 table does not exist. Besides, the v2 table created by this API
* lacks some functionalities (e.g., customized properties, options, and serde info). If you
* need them, please create the v2 table manually before the execution to avoid creating a table
* with incomplete information.
*
* @since 3.5.0
*/
/** @inheritdoc */
@Evolving
@throws[TimeoutException]
def toTable(tableName: String): StreamingQuery = {
sinkBuilder.setTableName(tableName)
start()
}

///////////////////////////////////////////////////////////////////////////////////////
// Covariant Overrides
///////////////////////////////////////////////////////////////////////////////////////

/** @inheritdoc */
override def option(key: String, value: Boolean): this.type = super.option(key, value)

/** @inheritdoc */
override def option(key: String, value: Long): this.type = super.option(key, value)

/** @inheritdoc */
override def option(key: String, value: Double): this.type = super.option(key, value)

/** @inheritdoc */
@Evolving
override def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): this.type =
super.foreachBatch(function)

private val sinkBuilder = WriteStreamOperationStart
.newBuilder()
.setInput(ds.plan.getRoot)