diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 5313369a2c987..1b41566ca1d1d 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -209,15 +209,7 @@ class SparkSession private[sql] (
/** @inheritdoc */
def read: DataFrameReader = new DataFrameReader(this)
- /**
- * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
- * {{{
- * sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
- * sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
- * }}}
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def readStream: DataStreamReader = new DataStreamReader(this)
lazy val streams: StreamingQueryManager = new StreamingQueryManager(this)
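For readers skimming this diff: the scaladoc removed above is now inherited from the shared
`api.SparkSession`, and the usage it documents is unchanged. A minimal sketch of that usage,
assuming an active `SparkSession` named `spark` (paths are placeholders):

{{{
import org.apache.spark.sql.types.{StringType, StructType}

// Supplying an explicit schema lets the JSON source skip schema inference.
val schema = new StructType().add("name", StringType).add("age", "int")

val parquetStream = spark.readStream.parquet("/path/to/directory/of/parquet/files")
val jsonStream = spark.readStream.schema(schema).json("/path/to/directory/of/json/files")
}}}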
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 789425c9daea1..2ff34a6343644 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -21,11 +21,9 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.annotation.Evolving
import org.apache.spark.connect.proto.Read.DataSource
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
+import org.apache.spark.sql.{api, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.connect.ConnectConversions._
+import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.sql.types.StructType
/**
@@ -35,101 +33,49 @@ import org.apache.spark.sql.types.StructType
* @since 3.5.0
*/
@Evolving
-final class DataStreamReader private[sql] (sparkSession: SparkSession) extends Logging {
+final class DataStreamReader private[sql] (sparkSession: SparkSession)
+ extends api.DataStreamReader {
- /**
- * Specifies the input data source format.
- *
- * @since 3.5.0
- */
- def format(source: String): DataStreamReader = {
+ private val sourceBuilder = DataSource.newBuilder()
+
+ /** @inheritdoc */
+ def format(source: String): this.type = {
sourceBuilder.setFormat(source)
this
}
- /**
- * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
- * automatically from data. By specifying the schema here, the underlying data source can skip
- * the schema inference step, and thus speed up data loading.
- *
- * @since 3.5.0
- */
- def schema(schema: StructType): DataStreamReader = {
+ /** @inheritdoc */
+ def schema(schema: StructType): this.type = {
if (schema != null) {
sourceBuilder.setSchema(schema.json) // Use json. DDL does not retail all the attributes.
}
this
}
- /**
- * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON)
- * can infer the input schema automatically from data. By specifying the schema here, the
- * underlying data source can skip the schema inference step, and thus speed up data loading.
- *
- * @since 3.5.0
- */
- def schema(schemaString: String): DataStreamReader = {
+ /** @inheritdoc */
+ override def schema(schemaString: String): this.type = {
sourceBuilder.setSchema(schemaString)
this
}
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 3.5.0
- */
- def option(key: String, value: String): DataStreamReader = {
+ /** @inheritdoc */
+ def option(key: String, value: String): this.type = {
sourceBuilder.putOptions(key, value)
this
}
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 3.5.0
- */
- def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
-
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 3.5.0
- */
- def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
-
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 3.5.0
- */
- def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
-
- /**
- * (Scala-specific) Adds input options for the underlying data source.
- *
- * @since 3.5.0
- */
- def options(options: scala.collection.Map[String, String]): DataStreamReader = {
+ /** @inheritdoc */
+ def options(options: scala.collection.Map[String, String]): this.type = {
this.options(options.asJava)
- this
}
- /**
- * (Java-specific) Adds input options for the underlying data source.
- *
- * @since 3.5.0
- */
- def options(options: java.util.Map[String, String]): DataStreamReader = {
+ /** @inheritdoc */
+ override def options(options: java.util.Map[String, String]): this.type = {
sourceBuilder.putAllOptions(options)
this
}
- /**
- * Loads input data stream in as a `DataFrame`, for data streams that don't require a path (e.g.
- * external key-value stores).
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def load(): DataFrame = {
sparkSession.newDataFrame { relationBuilder =>
relationBuilder.getReadBuilder
@@ -138,120 +84,14 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
}
}
- /**
- * Loads input in as a `DataFrame`, for data streams that read from some path.
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
def load(path: String): DataFrame = {
sourceBuilder.clearPaths()
sourceBuilder.addPaths(path)
load()
}
- /**
- * Loads a JSON file stream and returns the results as a `DataFrame`.
- *
- * JSON Lines (newline-delimited JSON) is supported by
- * default. For JSON (one record per file), set the `multiLine` option to true.
- *
- * This function goes through the input once to determine the input schema. If you know the
- * schema in advance, use the version that specifies the schema to avoid the extra scan.
- *
- * You can set the following option(s): - `maxFilesPerTrigger` (default: no max limit):
- * sets the maximum number of new files to be considered in every trigger.
- * - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
- * be considered in every trigger.
- *
- * You can find the JSON-specific options for reading JSON file stream in
- * Data Source Option in the version you use.
- *
- * @since 3.5.0
- */
- def json(path: String): DataFrame = {
- format("json").load(path)
- }
-
- /**
- * Loads a CSV file stream and returns the result as a `DataFrame`.
- *
- * This function will go through the input once to determine the input schema if `inferSchema`
- * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
- * specify the schema explicitly using `schema`.
- *
- * You can set the following option(s): - `maxFilesPerTrigger` (default: no max limit):
- * sets the maximum number of new files to be considered in every trigger.
- * - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
- * be considered in every trigger.
- *
- * You can find the CSV-specific options for reading CSV file stream in
- * Data Source Option in the version you use.
- *
- * @since 3.5.0
- */
- def csv(path: String): DataFrame = format("csv").load(path)
-
- /**
- * Loads a XML file stream and returns the result as a `DataFrame`.
- *
- * This function will go through the input once to determine the input schema if `inferSchema`
- * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
- * specify the schema explicitly using `schema`.
- *
- * You can set the following option(s): - `maxFilesPerTrigger` (default: no max limit):
- * sets the maximum number of new files to be considered in every trigger.
- * - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
- * be considered in every trigger.
- *
- * You can find the XML-specific options for reading XML file stream in
- * Data Source Option in the version you use.
- *
- * @since 4.0.0
- */
- def xml(path: String): DataFrame = format("xml").load(path)
-
- /**
- * Loads a ORC file stream, returning the result as a `DataFrame`.
- *
- * You can set the following option(s): - `maxFilesPerTrigger` (default: no max limit):
- * sets the maximum number of new files to be considered in every trigger.
- * - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
- * be considered in every trigger.
- *
- * ORC-specific option(s) for reading ORC file stream can be found in Data
- * Source Option in the version you use.
- *
- * @since 3.5.0
- */
- def orc(path: String): DataFrame = format("orc").load(path)
-
- /**
- * Loads a Parquet file stream, returning the result as a `DataFrame`.
- *
- * You can set the following option(s): - `maxFilesPerTrigger` (default: no max limit):
- * sets the maximum number of new files to be considered in every trigger.
- * - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
- * be considered in every trigger.
- *
- * Parquet-specific option(s) for reading Parquet file stream can be found in Data
- * Source Option in the version you use.
- *
- * @since 3.5.0
- */
- def parquet(path: String): DataFrame = format("parquet").load(path)
-
- /**
- * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
- * support streaming mode.
- * @param tableName
- * The name of the table
- * @since 3.5.0
- */
+ /** @inheritdoc */
def table(tableName: String): DataFrame = {
require(tableName != null, "The table name can't be null")
sparkSession.newDataFrame { builder =>
@@ -263,59 +103,44 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
}
}
- /**
- * Loads text files and returns a `DataFrame` whose schema starts with a string column named
- * "value", and followed by partitioned columns if there are any. The text files must be encoded
- * as UTF-8.
- *
- * By default, each line in the text files is a new row in the resulting DataFrame. For example:
- * {{{
- * // Scala:
- * spark.readStream.text("/path/to/directory/")
- *
- * // Java:
- * spark.readStream().text("/path/to/directory/")
- * }}}
- *
- * You can set the following option(s): - `maxFilesPerTrigger` (default: no max limit):
- * sets the maximum number of new files to be considered in every trigger.
- * - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
- * be considered in every trigger.
- *
- * You can find the text-specific options for reading text files in
- * Data Source Option in the version you use.
- *
- * @since 3.5.0
- */
- def text(path: String): DataFrame = format("text").load(path)
-
- /**
- * Loads text file(s) and returns a `Dataset` of String. The underlying schema of the Dataset
- * contains a single string column named "value". The text files must be encoded as UTF-8.
- *
- * If the directory structure of the text files contains partitioning information, those are
- * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
- *
- * By default, each line in the text file is a new element in the resulting Dataset. For
- * example:
- * {{{
- * // Scala:
- * spark.readStream.textFile("/path/to/spark/README.md")
- *
- * // Java:
- * spark.readStream().textFile("/path/to/spark/README.md")
- * }}}
- *
- * You can set the text-specific options as specified in `DataStreamReader.text`.
- *
- * @param path
- * input path
- * @since 3.5.0
- */
- def textFile(path: String): Dataset[String] = {
- text(path).select("value").as[String](StringEncoder)
+ override protected def assertNoSpecifiedSchema(operation: String): Unit = {
+ if (sourceBuilder.hasSchema) {
+ throw DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
+ }
}
- private val sourceBuilder = DataSource.newBuilder()
+ ///////////////////////////////////////////////////////////////////////////////////////
+ // Covariant overrides.
+ ///////////////////////////////////////////////////////////////////////////////////////
+
+ /** @inheritdoc */
+ override def option(key: String, value: Boolean): this.type = super.option(key, value)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Long): this.type = super.option(key, value)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Double): this.type = super.option(key, value)
+
+ /** @inheritdoc */
+ override def json(path: String): DataFrame = super.json(path)
+
+ /** @inheritdoc */
+ override def csv(path: String): DataFrame = super.csv(path)
+
+ /** @inheritdoc */
+ override def xml(path: String): DataFrame = super.xml(path)
+
+ /** @inheritdoc */
+ override def orc(path: String): DataFrame = super.orc(path)
+
+ /** @inheritdoc */
+ override def parquet(path: String): DataFrame = super.parquet(path)
+
+ /** @inheritdoc */
+ override def text(path: String): DataFrame = super.text(path)
+
+ /** @inheritdoc */
+ override def textFile(path: String): Dataset[String] = super.textFile(path)
+
}
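The "covariant overrides" block above exists because the shared parent declares its builder
methods with a `this.type` return, so each concrete reader keeps returning its own type when
calls are chained. A hypothetical, non-Spark sketch of that pattern (all names below are
illustrative, not Spark classes):

{{{
abstract class BaseReader {
  def option(key: String, value: String): this.type
  // Convenience overload implemented once in the parent.
  def option(key: String, value: Long): this.type = option(key, value.toString)
}

final class ConnectLikeReader extends BaseReader {
  private var opts = Map.empty[String, String]
  override def option(key: String, value: String): this.type = {
    opts += key -> value
    this
  }
  def collected: Map[String, String] = opts
}

// The chain stays typed as ConnectLikeReader, so subclass-only members remain reachable.
val opts = new ConnectLikeReader().option("a", "1").option("b", 2L).collected
}}}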
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 16f6983efb187..c8776af18a14a 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -304,7 +304,13 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.DataFrameReader.validateJsonSchema"),
ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.DataFrameReader.validateXmlSchema"))
+ "org.apache.spark.sql.DataFrameReader.validateXmlSchema"),
+
+ // Protected DataStreamReader methods...
+ ProblemFilters.exclude[DirectMissingMethodProblem](
+ "org.apache.spark.sql.streaming.DataStreamReader.validateJsonSchema"),
+ ProblemFilters.exclude[DirectMissingMethodProblem](
+ "org.apache.spark.sql.streaming.DataStreamReader.validateXmlSchema"))
checkMiMaCompatibility(clientJar, sqlJar, includedRules, excludeRules)
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9a89ebb4797c9..0bd0121e6e141 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -179,6 +179,7 @@ object MimaExcludes {
// SPARK-49282: Shared SparkSessionBuilder
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.SparkSession$Builder"),
) ++ loggingExcludes("org.apache.spark.sql.DataFrameReader") ++
+ loggingExcludes("org.apache.spark.sql.streaming.DataStreamReader") ++
loggingExcludes("org.apache.spark.sql.SparkSession#Builder")
// Default exclude rules
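A note for readers unfamiliar with MiMa: both changes above acknowledge, rather than fix,
binary-level differences that are expected once DataStreamReader gains the shared parent. The
added rules follow the same shape as the surrounding ones; the two below are illustrative
examples only, not the exact contents of `loggingExcludes`, whose body is defined elsewhere in
MimaExcludes.scala and is not shown in this diff.

{{{
import com.typesafe.tools.mima.core._

val illustrativeExcludes = Seq(
  // The type gained a new parent (the shared api base class).
  ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.streaming.DataStreamReader"),
  // A member is no longer declared directly on the type (it moved to the parent).
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "org.apache.spark.sql.streaming.DataStreamReader.textFile"))
}}}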
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/DataStreamReader.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/DataStreamReader.scala
new file mode 100644
index 0000000000000..219ecb77d4033
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/DataStreamReader.scala
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.api
+
+import scala.jdk.CollectionConverters._
+
+import _root_.java
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.{Encoders, Row}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
+ * key-value stores, etc). Use `SparkSession.readStream` to access this.
+ *
+ * @since 2.0.0
+ */
+@Evolving
+abstract class DataStreamReader {
+
+ /**
+ * Specifies the input data source format.
+ *
+ * @since 2.0.0
+ */
+ def format(source: String): this.type
+
+ /**
+ * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
+ * automatically from data. By specifying the schema here, the underlying data source can skip
+ * the schema inference step, and thus speed up data loading.
+ *
+ * @since 2.0.0
+ */
+ def schema(schema: StructType): this.type
+
+ /**
+ * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON)
+ * can infer the input schema automatically from data. By specifying the schema here, the
+ * underlying data source can skip the schema inference step, and thus speed up data loading.
+ *
+ * @since 2.3.0
+ */
+ def schema(schemaString: String): this.type = {
+ schema(StructType.fromDDL(schemaString))
+ }
+
+ /**
+ * Adds an input option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ def option(key: String, value: String): this.type
+
+ /**
+ * Adds an input option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ def option(key: String, value: Boolean): this.type = option(key, value.toString)
+
+ /**
+ * Adds an input option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ def option(key: String, value: Long): this.type = option(key, value.toString)
+
+ /**
+ * Adds an input option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ def option(key: String, value: Double): this.type = option(key, value.toString)
+
+ /**
+ * (Scala-specific) Adds input options for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ def options(options: scala.collection.Map[String, String]): this.type
+
+ /**
+ * (Java-specific) Adds input options for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ def options(options: java.util.Map[String, String]): this.type = {
+ this.options(options.asScala)
+ this
+ }
+
+ /**
+ * Loads input data stream in as a `DataFrame`, for data streams that don't require a path (e.g.
+ * external key-value stores).
+ *
+ * @since 2.0.0
+ */
+ def load(): Dataset[Row]
+
+ /**
+ * Loads input in as a `DataFrame`, for data streams that read from some path.
+ *
+ * @since 2.0.0
+ */
+ def load(path: String): Dataset[Row]
+
+ /**
+ * Loads a JSON file stream and returns the results as a `DataFrame`.
+ *
+ * JSON Lines (newline-delimited JSON) is supported by
+ * default. For JSON (one record per file), set the `multiLine` option to true.
+ *
+ * This function goes through the input once to determine the input schema. If you know the
+ * schema in advance, use the version that specifies the schema to avoid the extra scan.
+ *
+ * You can set the following option(s):
+ *   - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to
+ *     be considered in every trigger.
+ *   - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
+ *     to be considered in every trigger.
+ *
+ * You can find the JSON-specific options for reading JSON file stream in
+ * Data Source Option in the version you use.
+ *
+ * @since 2.0.0
+ */
+ def json(path: String): Dataset[Row] = {
+ validateJsonSchema()
+ format("json").load(path)
+ }
+
+ /**
+ * Loads a CSV file stream and returns the result as a `DataFrame`.
+ *
+ * This function will go through the input once to determine the input schema if `inferSchema`
+ * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
+ * specify the schema explicitly using `schema`.
+ *
+ * You can set the following option(s):
+ *   - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to
+ *     be considered in every trigger.
+ *   - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
+ *     to be considered in every trigger.
+ *
+ * You can find the CSV-specific options for reading CSV file stream in
+ * Data Source Option in the version you use.
+ *
+ * @since 2.0.0
+ */
+ def csv(path: String): Dataset[Row] = format("csv").load(path)
+
+ /**
+ * Loads an XML file stream and returns the result as a `DataFrame`.
+ *
+ * This function will go through the input once to determine the input schema if `inferSchema`
+ * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
+ * specify the schema explicitly using `schema`.
+ *
+ * You can set the following option(s):
+ *   - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to
+ *     be considered in every trigger.
+ *   - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
+ *     to be considered in every trigger.
+ *
+ * You can find the XML-specific options for reading XML file stream in
+ * Data Source Option in the version you use.
+ *
+ * @since 4.0.0
+ */
+ def xml(path: String): Dataset[Row] = {
+ validateXmlSchema()
+ format("xml").load(path)
+ }
+
+ /**
+ * Loads an ORC file stream, returning the result as a `DataFrame`.
+ *
+ * You can set the following option(s):
+ *   - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to
+ *     be considered in every trigger.
+ *   - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
+ *     to be considered in every trigger.
+ *
+ * ORC-specific option(s) for reading ORC file stream can be found in Data
+ * Source Option in the version you use.
+ *
+ * @since 2.3.0
+ */
+ def orc(path: String): Dataset[Row] = {
+ format("orc").load(path)
+ }
+
+ /**
+ * Loads a Parquet file stream, returning the result as a `DataFrame`.
+ *
+ * You can set the following option(s):
+ *   - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to
+ *     be considered in every trigger.
+ *   - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
+ *     to be considered in every trigger.
+ *
+ * Parquet-specific option(s) for reading Parquet file stream can be found in Data
+ * Source Option in the version you use.
+ *
+ * @since 2.0.0
+ */
+ def parquet(path: String): Dataset[Row] = {
+ format("parquet").load(path)
+ }
+
+ /**
+ * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
+ * support streaming mode.
+ * @param tableName
+ * The name of the table
+ * @since 3.1.0
+ */
+ def table(tableName: String): Dataset[Row]
+
+ /**
+ * Loads text files and returns a `DataFrame` whose schema starts with a string column named
+ * "value", and followed by partitioned columns if there are any. The text files must be encoded
+ * as UTF-8.
+ *
+ * By default, each line in the text files is a new row in the resulting DataFrame. For example:
+ * {{{
+ * // Scala:
+ * spark.readStream.text("/path/to/directory/")
+ *
+ * // Java:
+ * spark.readStream().text("/path/to/directory/")
+ * }}}
+ *
+ * You can set the following option(s):
+ *   - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to
+ *     be considered in every trigger.
+ *   - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
+ *     to be considered in every trigger.
+ *
+ * You can find the text-specific options for reading text files in
+ * Data Source Option in the version you use.
+ *
+ * @since 2.0.0
+ */
+ def text(path: String): Dataset[Row] = format("text").load(path)
+
+ /**
+ * Loads text file(s) and returns a `Dataset` of String. The underlying schema of the Dataset
+ * contains a single string column named "value". The text files must be encoded as UTF-8.
+ *
+ * If the directory structure of the text files contains partitioning information, those are
+ * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
+ *
+ * By default, each line in the text file is a new element in the resulting Dataset. For
+ * example:
+ * {{{
+ * // Scala:
+ * spark.readStream.textFile("/path/to/spark/README.md")
+ *
+ * // Java:
+ * spark.readStream().textFile("/path/to/spark/README.md")
+ * }}}
+ *
+ * You can set the text-specific options as specified in `DataStreamReader.text`.
+ *
+ * @param path
+ * input path
+ * @since 2.1.0
+ */
+ def textFile(path: String): Dataset[String] = {
+ assertNoSpecifiedSchema("textFile")
+ text(path).select("value").as(Encoders.STRING)
+ }
+
+ protected def assertNoSpecifiedSchema(operation: String): Unit
+
+ protected def validateJsonSchema(): Unit = ()
+
+ protected def validateXmlSchema(): Unit = ()
+
+}
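Taken together, the abstract class above now owns the user-facing documentation and the
convenience overloads (the DDL-string schema, the typed `option` variants, and the format
shortcuts), while concrete readers implement `format`, `schema`, `option`, `options`, `load`,
`table` and the protected `assertNoSpecifiedSchema` hook. A minimal usage sketch of this shared
surface, assuming an active `SparkSession` named `spark` (paths and option values are
placeholders):

{{{
// The DDL-string overload delegates to schema(StructType); the Long overload of option is
// forwarded as a string; json() and textFile() are the format shortcuts declared above.
val people = spark.readStream
  .schema("name STRING, age INT")
  .option("maxFilesPerTrigger", 10L)
  .json("/path/to/json/stream")

// textFile rejects a user-specified schema through assertNoSpecifiedSchema.
val lines = spark.readStream.textFile("/path/to/text/stream")
}}}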
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index 2295c153cd51c..0f73a94c3c4a4 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -506,6 +506,17 @@ abstract class SparkSession extends Serializable with Closeable {
*/
def read: DataFrameReader
+ /**
+ * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
+ * {{{
+ * sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
+ * sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
+ * }}}
+ *
+ * @since 2.0.0
+ */
+ def readStream: DataStreamReader
+
/**
* (Scala-specific) Implicit methods available in Scala for converting common Scala objects into
* `DataFrame`s.
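The example in the new scaladoc stops at building the streaming DataFrame; for context, a
sketch of how such a reader is typically consumed end to end (the writeStream side is not part
of this diff, and `spark` and `schema` are assumed as in the earlier sketches):

{{{
val query = spark.readStream
  .schema(schema)
  .json("/path/to/directory/of/json/files")
  .writeStream
  .format("console")
  .start()

query.awaitTermination()
}}}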
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index fe139d629eb24..983cc24718fd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -739,15 +739,7 @@ class SparkSession private(
/** @inheritdoc */
def read: DataFrameReader = new DataFrameReader(self)
- /**
- * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
- * {{{
- * sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
- * sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
- * }}}
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def readStream: DataStreamReader = new DataStreamReader(self)
// scalastyle:off
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 24d769fc8fc87..f42d8b667ab12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -22,12 +22,12 @@ import java.util.Locale
import scala.jdk.CollectionConverters._
import org.apache.spark.annotation.Evolving
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{api, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -49,25 +49,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* @since 2.0.0
*/
@Evolving
-final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
- /**
- * Specifies the input data source format.
- *
- * @since 2.0.0
- */
- def format(source: String): DataStreamReader = {
+final class DataStreamReader private[sql](sparkSession: SparkSession) extends api.DataStreamReader {
+ /** @inheritdoc */
+ def format(source: String): this.type = {
this.source = source
this
}
- /**
- * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
- * automatically from data. By specifying the schema here, the underlying data source can
- * skip the schema inference step, and thus speed up data loading.
- *
- * @since 2.0.0
- */
- def schema(schema: StructType): DataStreamReader = {
+ /** @inheritdoc */
+ def schema(schema: StructType): this.type = {
if (schema != null) {
val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
this.userSpecifiedSchema = Option(replaced)
@@ -75,75 +65,19 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
this
}
- /**
- * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON) can
- * infer the input schema automatically from data. By specifying the schema here, the underlying
- * data source can skip the schema inference step, and thus speed up data loading.
- *
- * @since 2.3.0
- */
- def schema(schemaString: String): DataStreamReader = {
- schema(StructType.fromDDL(schemaString))
- }
-
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 2.0.0
- */
- def option(key: String, value: String): DataStreamReader = {
+ /** @inheritdoc */
+ def option(key: String, value: String): this.type = {
this.extraOptions += (key -> value)
this
}
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 2.0.0
- */
- def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
-
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 2.0.0
- */
- def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
-
- /**
- * Adds an input option for the underlying data source.
- *
- * @since 2.0.0
- */
- def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
-
- /**
- * (Scala-specific) Adds input options for the underlying data source.
- *
- * @since 2.0.0
- */
- def options(options: scala.collection.Map[String, String]): DataStreamReader = {
+ /** @inheritdoc */
+ def options(options: scala.collection.Map[String, String]): this.type = {
this.extraOptions ++= options
this
}
- /**
- * (Java-specific) Adds input options for the underlying data source.
- *
- * @since 2.0.0
- */
- def options(options: java.util.Map[String, String]): DataStreamReader = {
- this.options(options.asScala)
- this
- }
-
-
- /**
- * Loads input data stream in as a `DataFrame`, for data streams that don't require a path
- * (e.g. external key-value stores).
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def load(): DataFrame = loadInternal(None)
private def loadInternal(path: Option[String]): DataFrame = {
@@ -205,11 +139,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
}
}
- /**
- * Loads input in as a `DataFrame`, for data streams that read from some path.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def load(path: String): DataFrame = {
if (!sparkSession.sessionState.conf.legacyPathOptionBehavior &&
extraOptions.contains("path")) {
@@ -218,133 +148,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
loadInternal(Some(path))
}
- /**
- * Loads a JSON file stream and returns the results as a `DataFrame`.
- *
- * JSON Lines (newline-delimited JSON) is supported by
- * default. For JSON (one record per file), set the `multiLine` option to true.
- *
- * This function goes through the input once to determine the input schema. If you know the
- * schema in advance, use the version that specifies the schema to avoid the extra scan.
- *
- * You can set the following option(s):
- *
- * - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.
- * - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
- * to be considered in every trigger.
- *
- *
- * You can find the JSON-specific options for reading JSON file stream in
- *
- * Data Source Option in the version you use.
- *
- * @since 2.0.0
- */
- def json(path: String): DataFrame = {
- userSpecifiedSchema.foreach(checkJsonSchema)
- format("json").load(path)
- }
-
- /**
- * Loads a CSV file stream and returns the result as a `DataFrame`.
- *
- * This function will go through the input once to determine the input schema if `inferSchema`
- * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
- * specify the schema explicitly using `schema`.
- *
- * You can set the following option(s):
- *
- * - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.
- * - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
- * to be considered in every trigger.
- *
- *
- * You can find the CSV-specific options for reading CSV file stream in
- *
- * Data Source Option in the version you use.
- *
- * @since 2.0.0
- */
- def csv(path: String): DataFrame = format("csv").load(path)
-
- /**
- * Loads a XML file stream and returns the result as a `DataFrame`.
- *
- * This function will go through the input once to determine the input schema if `inferSchema`
- * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
- * specify the schema explicitly using `schema`.
- *
- * You can set the following option(s):
- *
- * - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.
- * - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
- * to be considered in every trigger.
- *
- *
- * You can find the XML-specific options for reading XML file stream in
- *
- * Data Source Option in the version you use.
- *
- * @since 4.0.0
- */
- def xml(path: String): DataFrame = {
- userSpecifiedSchema.foreach(checkXmlSchema)
- format("xml").load(path)
- }
-
- /**
- * Loads a ORC file stream, returning the result as a `DataFrame`.
- *
- * You can set the following option(s):
- *
- * - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.
- * - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
- * to be considered in every trigger.
- *
- *
- * ORC-specific option(s) for reading ORC file stream can be found in
- *
- * Data Source Option in the version you use.
- *
- * @since 2.3.0
- */
- def orc(path: String): DataFrame = {
- format("orc").load(path)
- }
-
- /**
- * Loads a Parquet file stream, returning the result as a `DataFrame`.
- *
- * You can set the following option(s):
- *
- * - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.
- * - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
- * to be considered in every trigger.
- *
- *
- * Parquet-specific option(s) for reading Parquet file stream can be found in
- *
- * Data Source Option in the version you use.
- *
- * @since 2.0.0
- */
- def parquet(path: String): DataFrame = {
- format("parquet").load(path)
- }
-
- /**
- * Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
- * support streaming mode.
- * @param tableName The name of the table
- * @since 3.1.0
- */
+ /** @inheritdoc */
def table(tableName: String): DataFrame = {
require(tableName != null, "The table name can't be null")
val identifier = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
@@ -356,65 +160,56 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
isStreaming = true))
}
- /**
- * Loads text files and returns a `DataFrame` whose schema starts with a string column named
- * "value", and followed by partitioned columns if there are any.
- * The text files must be encoded as UTF-8.
- *
- * By default, each line in the text files is a new row in the resulting DataFrame. For example:
- * {{{
- * // Scala:
- * spark.readStream.text("/path/to/directory/")
- *
- * // Java:
- * spark.readStream().text("/path/to/directory/")
- * }}}
- *
- * You can set the following option(s):
- *
- * - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.
- * - `maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files
- * to be considered in every trigger.
- *
- *
- * You can find the text-specific options for reading text files in
- *
- * Data Source Option in the version you use.
- *
- * @since 2.0.0
- */
- def text(path: String): DataFrame = format("text").load(path)
-
- /**
- * Loads text file(s) and returns a `Dataset` of String. The underlying schema of the Dataset
- * contains a single string column named "value".
- * The text files must be encoded as UTF-8.
- *
- * If the directory structure of the text files contains partitioning information, those are
- * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
- *
- * By default, each line in the text file is a new element in the resulting Dataset. For example:
- * {{{
- * // Scala:
- * spark.readStream.textFile("/path/to/spark/README.md")
- *
- * // Java:
- * spark.readStream().textFile("/path/to/spark/README.md")
- * }}}
- *
- * You can set the text-specific options as specified in `DataStreamReader.text`.
- *
- * @param path input path
- * @since 2.1.0
- */
- def textFile(path: String): Dataset[String] = {
+ override protected def assertNoSpecifiedSchema(operation: String): Unit = {
if (userSpecifiedSchema.nonEmpty) {
- throw QueryCompilationErrors.userSpecifiedSchemaUnsupportedError("textFile")
+ throw QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation)
}
- text(path).select("value").as[String](sparkSession.implicits.newStringEncoder)
}
+ override protected def validateJsonSchema(): Unit = userSpecifiedSchema.foreach(checkJsonSchema)
+
+ override protected def validateXmlSchema(): Unit = userSpecifiedSchema.foreach(checkXmlSchema)
+
+ ///////////////////////////////////////////////////////////////////////////////////////
+ // Covariant overrides.
+ ///////////////////////////////////////////////////////////////////////////////////////
+
+ /** @inheritdoc */
+ override def schema(schemaString: String): this.type = super.schema(schemaString)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Boolean): this.type = super.option(key, value)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Long): this.type = super.option(key, value)
+
+ /** @inheritdoc */
+ override def option(key: String, value: Double): this.type = super.option(key, value)
+
+ /** @inheritdoc */
+ override def options(options: java.util.Map[String, String]): this.type = super.options(options)
+
+ /** @inheritdoc */
+ override def json(path: String): DataFrame = super.json(path)
+
+ /** @inheritdoc */
+ override def csv(path: String): DataFrame = super.csv(path)
+
+ /** @inheritdoc */
+ override def xml(path: String): DataFrame = super.xml(path)
+
+ /** @inheritdoc */
+ override def orc(path: String): DataFrame = super.orc(path)
+
+ /** @inheritdoc */
+ override def parquet(path: String): DataFrame = super.parquet(path)
+
+ /** @inheritdoc */
+ override def text(path: String): DataFrame = super.text(path)
+
+ /** @inheritdoc */
+ override def textFile(path: String): Dataset[String] = super.textFile(path)
+
///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////