From d50fc64b2048f7ce2849e24adc77b63109860359 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 17 Sep 2024 17:09:09 -0400 Subject: [PATCH] [SPARK-49413][CONNECT][SQL] Create a shared RuntimeConfig interface ### What changes were proposed in this pull request? This PR introduces a shared RuntimeConfig interface. ### Why are the changes needed? We are creating a shared Scala Spark SQL interface for Classic and Connect. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47980 from hvanhovell/SPARK-49413. Authored-by: Herman van Hovell Signed-off-by: Herman van Hovell --- .../org/apache/spark/sql/SparkSession.scala | 14 +-- .../ConnectRuntimeConfig.scala} | 70 ++---------- .../kafka010/KafkaMicroBatchSourceSuite.scala | 4 +- project/MimaExcludes.scala | 4 + .../org/apache/spark/sql/RuntimeConfig.scala | 105 ++++++++++++++++++ .../apache/spark/sql/api/SparkSession.scala | 13 ++- .../execution/ExecuteGrpcResponseSender.scala | 8 +- .../sql/connect/service/SessionHolder.scala | 4 +- .../spark/sql/connect/utils/ErrorUtils.scala | 9 +- .../SparkConnectSessionHolderSuite.scala | 2 +- .../scala/org/apache/spark/sql/Dataset.scala | 4 +- .../org/apache/spark/sql/SparkSession.scala | 19 +--- .../spark/sql/artifact/ArtifactManager.scala | 2 +- .../spark/sql/execution/CacheManager.scala | 2 +- .../spark/sql/execution/SQLExecution.scala | 2 +- .../spark/sql/execution/command/ddl.scala | 2 +- .../spark/sql/execution/command/views.scala | 2 +- .../execution/datasources/DataSource.scala | 3 +- .../binaryfile/BinaryFileFormat.scala | 2 +- .../spark/sql/execution/datasources/ddl.scala | 2 +- .../v2/state/StateDataSource.scala | 8 +- .../execution/streaming/AsyncLogPurge.scala | 3 +- .../streaming/MicroBatchExecution.scala | 2 +- .../sql/execution/streaming/OffsetSeq.scala | 23 ++-- .../execution/streaming/StreamExecution.scala | 2 +- .../streaming/WatermarkTracker.scala | 3 +- .../RuntimeConfigImpl.scala} | 101 ++--------------- .../sql/streaming/StreamingQueryManager.scala | 2 +- .../spark/sql/FileBasedDataSourceSuite.scala | 2 +- .../org/apache/spark/sql/GenTPCDSData.scala | 2 +- .../org/apache/spark/sql/JoinSuite.scala | 4 +- .../apache/spark/sql/RuntimeConfigSuite.scala | 3 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 7 +- .../spark/sql/SparkSessionBuilderSuite.scala | 20 ++-- .../sql/SparkSessionExtensionSuite.scala | 6 +- .../sql/StatisticsCollectionTestBase.scala | 4 +- .../sql/connector/DataSourceV2SQLSuite.scala | 8 +- .../CoalesceShufflePartitionsSuite.scala | 4 +- .../columnar/InMemoryColumnarQuerySuite.scala | 2 +- .../columnar/PartitionBatchPruningSuite.scala | 4 +- .../datasources/ReadSchemaSuite.scala | 10 +- .../binaryfile/BinaryFileFormatSuite.scala | 2 +- .../execution/datasources/orc/OrcTest.scala | 2 +- .../datasources/parquet/ParquetIOSuite.scala | 4 +- .../StateDataSourceChangeDataReadSuite.scala | 2 +- .../v2/state/StateDataSourceReadSuite.scala | 4 +- .../RocksDBStateStoreIntegrationSuite.scala | 2 +- .../streaming/state/RocksDBSuite.scala | 11 +- .../sql/expressions/ExpressionInfoSuite.scala | 2 +- .../spark/sql/internal/SQLConfSuite.scala | 88 +++++++-------- .../spark/sql/sources/BucketedReadSuite.scala | 2 +- .../sql/streaming/FileStreamSinkSuite.scala | 2 +- .../sql/streaming/FileStreamSourceSuite.scala | 4 +- .../FlatMapGroupsWithStateSuite.scala | 2 +- .../spark/sql/streaming/StreamTest.scala | 5 +- .../streaming/TriggerAvailableNowSuite.scala | 2 +- .../spark/sql/test/SharedSparkSession.scala | 2 + .../spark/sql/hive/HiveSharedStateSuite.scala | 2 +- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +- .../execution/HiveSerDeReadWriteSuite.scala | 2 +- .../sql/hive/execution/SQLQuerySuite.scala | 4 +- 61 files changed, 317 insertions(+), 322 deletions(-) rename connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/{RuntimeConfig.scala => internal/ConnectRuntimeConfig.scala} (68%) create mode 100644 sql/api/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala rename sql/core/src/main/scala/org/apache/spark/sql/{RuntimeConfig.scala => internal/RuntimeConfigImpl.scala} (51%) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 209ec88618c43..989a7e0c174c5 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.connect.client.{ClassFinder, CloseableIterator, Spar import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration import org.apache.spark.sql.connect.client.arrow.ArrowSerializer import org.apache.spark.sql.functions.lit -import org.apache.spark.sql.internal.{CatalogImpl, SessionCleaner, SqlApiConf} +import org.apache.spark.sql.internal.{CatalogImpl, ConnectRuntimeConfig, SessionCleaner, SqlApiConf} import org.apache.spark.sql.internal.ColumnNodeToProtoConverter.{toExpr, toTypedExpr} import org.apache.spark.sql.streaming.DataStreamReader import org.apache.spark.sql.streaming.StreamingQueryManager @@ -88,16 +88,8 @@ class SparkSession private[sql] ( client.hijackServerSideSessionIdForTesting(suffix) } - /** - * Runtime configuration interface for Spark. - * - * This is the interface through which the user can get and set all Spark configurations that - * are relevant to Spark SQL. When getting the value of a config, his defaults to the value set - * in server, if any. - * - * @since 3.4.0 - */ - val conf: RuntimeConfig = new RuntimeConfig(client) + /** @inheritdoc */ + val conf: RuntimeConfig = new ConnectRuntimeConfig(client) /** @inheritdoc */ @transient diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/ConnectRuntimeConfig.scala similarity index 68% rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala rename to connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/ConnectRuntimeConfig.scala index f77dd512ef257..7578e2424fb42 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/ConnectRuntimeConfig.scala @@ -14,10 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.sql +package org.apache.spark.sql.internal import org.apache.spark.connect.proto.{ConfigRequest, ConfigResponse, KeyValue} import org.apache.spark.internal.Logging +import org.apache.spark.sql.RuntimeConfig import org.apache.spark.sql.connect.client.SparkConnectClient /** @@ -25,61 +26,31 @@ import org.apache.spark.sql.connect.client.SparkConnectClient * * @since 3.4.0 */ -class RuntimeConfig private[sql] (client: SparkConnectClient) extends Logging { +class ConnectRuntimeConfig private[sql] (client: SparkConnectClient) + extends RuntimeConfig + with Logging { - /** - * Sets the given Spark runtime configuration property. - * - * @since 3.4.0 - */ + /** @inheritdoc */ def set(key: String, value: String): Unit = { executeConfigRequest { builder => builder.getSetBuilder.addPairsBuilder().setKey(key).setValue(value) } } - /** - * Sets the given Spark runtime configuration property. - * - * @since 3.4.0 - */ - def set(key: String, value: Boolean): Unit = set(key, String.valueOf(value)) - - /** - * Sets the given Spark runtime configuration property. - * - * @since 3.4.0 - */ - def set(key: String, value: Long): Unit = set(key, String.valueOf(value)) - - /** - * Returns the value of Spark runtime configuration property for the given key. - * - * @throws java.util.NoSuchElementException - * if the key is not set and does not have a default value - * @since 3.4.0 - */ + /** @inheritdoc */ @throws[NoSuchElementException]("if the key is not set") def get(key: String): String = getOption(key).getOrElse { throw new NoSuchElementException(key) } - /** - * Returns the value of Spark runtime configuration property for the given key. - * - * @since 3.4.0 - */ + /** @inheritdoc */ def get(key: String, default: String): String = { executeConfigRequestSingleValue { builder => builder.getGetWithDefaultBuilder.addPairsBuilder().setKey(key).setValue(default) } } - /** - * Returns all properties set in this conf. - * - * @since 3.4.0 - */ + /** @inheritdoc */ def getAll: Map[String, String] = { val response = executeConfigRequest { builder => builder.getGetAllBuilder @@ -92,11 +63,7 @@ class RuntimeConfig private[sql] (client: SparkConnectClient) extends Logging { builder.result() } - /** - * Returns the value of Spark runtime configuration property for the given key. - * - * @since 3.4.0 - */ + /** @inheritdoc */ def getOption(key: String): Option[String] = { val pair = executeConfigRequestSinglePair { builder => builder.getGetOptionBuilder.addKeys(key) @@ -108,27 +75,14 @@ class RuntimeConfig private[sql] (client: SparkConnectClient) extends Logging { } } - /** - * Resets the configuration property for the given key. - * - * @since 3.4.0 - */ + /** @inheritdoc */ def unset(key: String): Unit = { executeConfigRequest { builder => builder.getUnsetBuilder.addKeys(key) } } - /** - * Indicates whether the configuration property with the given key is modifiable in the current - * session. - * - * @return - * `true` if the configuration property is modifiable. For static SQL, Spark Core, invalid - * (not existing) and other non-modifiable configuration properties, the returned value is - * `false`. - * @since 3.4.0 - */ + /** @inheritdoc */ def isModifiable(key: String): Boolean = { val modifiable = executeConfigRequestSingleValue { builder => builder.getIsModifiableBuilder.addKeys(key) diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 9ae6a9290f80a..1d119de43970f 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1156,7 +1156,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with test("allow group.id prefix") { // Group ID prefix is only supported by consumer based offset reader - if (spark.conf.get(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) { + if (sqlConf.getConf(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) { testGroupId("groupIdPrefix", (expected, actual) => { assert(actual.exists(_.startsWith(expected)) && !actual.exists(_ === expected), "Valid consumer groups don't contain the expected group id - " + @@ -1167,7 +1167,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with test("allow group.id override") { // Group ID override is only supported by consumer based offset reader - if (spark.conf.get(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) { + if (sqlConf.getConf(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) { testGroupId("kafka.group.id", (expected, actual) => { assert(actual.exists(_ === expected), "Valid consumer groups don't " + s"contain the expected group id - Valid consumer groups: $actual / " + diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 6eee1e759e5ea..68433b501bcc4 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -160,6 +160,10 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameWriterV2"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WriteConfigMethods"), + // SPARK-49413: Create a shared RuntimeConfig interface. + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RuntimeConfig"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RuntimeConfig$"), + // SPARK-49287: Shared Streaming interfaces ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.SparkListenerEvent"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.ForeachWriter"), diff --git a/sql/api/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/api/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala new file mode 100644 index 0000000000000..23a2774ebc3a5 --- /dev/null +++ b/sql/api/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql + +import org.apache.spark.annotation.Stable + +/** + * Runtime configuration interface for Spark. To access this, use `SparkSession.conf`. + * + * Options set here are automatically propagated to the Hadoop configuration during I/O. + * + * @since 2.0.0 + */ +@Stable +abstract class RuntimeConfig { + + /** + * Sets the given Spark runtime configuration property. + * + * @since 2.0.0 + */ + def set(key: String, value: String): Unit + + /** + * Sets the given Spark runtime configuration property. + * + * @since 2.0.0 + */ + def set(key: String, value: Boolean): Unit = { + set(key, value.toString) + } + + /** + * Sets the given Spark runtime configuration property. + * + * @since 2.0.0 + */ + def set(key: String, value: Long): Unit = { + set(key, value.toString) + } + + /** + * Returns the value of Spark runtime configuration property for the given key. + * + * @throws java.util.NoSuchElementException + * if the key is not set and does not have a default value + * @since 2.0.0 + */ + @throws[NoSuchElementException]("if the key is not set") + def get(key: String): String + + /** + * Returns the value of Spark runtime configuration property for the given key. + * + * @since 2.0.0 + */ + def get(key: String, default: String): String + + /** + * Returns all properties set in this conf. + * + * @since 2.0.0 + */ + def getAll: Map[String, String] + + /** + * Returns the value of Spark runtime configuration property for the given key. + * + * @since 2.0.0 + */ + def getOption(key: String): Option[String] + + /** + * Resets the configuration property for the given key. + * + * @since 2.0.0 + */ + def unset(key: String): Unit + + /** + * Indicates whether the configuration property with the given key is modifiable in the current + * session. + * + * @return + * `true` if the configuration property is modifiable. For static SQL, Spark Core, invalid + * (not existing) and other non-modifiable configuration properties, the returned value is + * `false`. + * @since 2.4.0 + */ + def isModifiable(key: String): Boolean +} diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala index cf502c746d24e..0580931620aaa 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala @@ -26,7 +26,7 @@ import _root_.java.net.URI import _root_.java.util import org.apache.spark.annotation.{DeveloperApi, Experimental} -import org.apache.spark.sql.{Encoder, Row} +import org.apache.spark.sql.{Encoder, Row, RuntimeConfig} import org.apache.spark.sql.types.StructType /** @@ -58,6 +58,17 @@ abstract class SparkSession[DS[U] <: Dataset[U, DS]] extends Serializable with C */ def version: String + /** + * Runtime configuration interface for Spark. + * + * This is the interface through which the user can get and set all Spark and Hadoop + * configurations that are relevant to Spark SQL. When getting the value of a config, this + * defaults to the value set in the underlying `SparkContext`, if any. + * + * @since 2.0.0 + */ + def conf: RuntimeConfig + /** * A collection of methods for registering user-defined functions (UDF). * diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala index 3e360372d5600..051093fcad277 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala @@ -142,7 +142,9 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( * client, but rather enqueued to in the response observer. */ private def enqueueProgressMessage(force: Boolean = false): Unit = { - if (executeHolder.sessionHolder.session.conf.get(CONNECT_PROGRESS_REPORT_INTERVAL) > 0) { + val progressReportInterval = executeHolder.sessionHolder.session.sessionState.conf + .getConf(CONNECT_PROGRESS_REPORT_INTERVAL) + if (progressReportInterval > 0) { SparkConnectService.executionListener.foreach { listener => // It is possible, that the tracker is no longer available and in this // case we simply ignore it and do not send any progress message. This avoids @@ -240,8 +242,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( // monitor, and will notify upon state change. if (response.isEmpty) { // Wake up more frequently to send the progress updates. - val progressTimeout = - executeHolder.sessionHolder.session.conf.get(CONNECT_PROGRESS_REPORT_INTERVAL) + val progressTimeout = executeHolder.sessionHolder.session.sessionState.conf + .getConf(CONNECT_PROGRESS_REPORT_INTERVAL) // If the progress feature is disabled, wait for the deadline. val timeout = if (progressTimeout > 0) { progressTimeout diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 0cb820b39e875..e56d66da3050d 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -444,8 +444,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio */ private[connect] def usePlanCache(rel: proto.Relation, cachePlan: Boolean)( transform: proto.Relation => LogicalPlan): LogicalPlan = { - val planCacheEnabled = - Option(session).forall(_.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true)) + val planCacheEnabled = Option(session) + .forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true)) // We only cache plans that have a plan ID. val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala index 355048cf30363..f1636ed1ef092 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala @@ -205,7 +205,9 @@ private[connect] object ErrorUtils extends Logging { case _ => } - if (sessionHolderOpt.exists(_.session.conf.get(Connect.CONNECT_ENRICH_ERROR_ENABLED))) { + val enrichErrorEnabled = sessionHolderOpt.exists( + _.session.sessionState.conf.getConf(Connect.CONNECT_ENRICH_ERROR_ENABLED)) + if (enrichErrorEnabled) { // Generate a new unique key for this exception. val errorId = UUID.randomUUID().toString @@ -216,9 +218,10 @@ private[connect] object ErrorUtils extends Logging { } lazy val stackTrace = Option(ExceptionUtils.getStackTrace(st)) + val stackTraceEnabled = sessionHolderOpt.exists( + _.session.sessionState.conf.getConf(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED)) val withStackTrace = - if (sessionHolderOpt.exists( - _.session.conf.get(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED) && stackTrace.nonEmpty)) { + if (stackTraceEnabled && stackTrace.nonEmpty) { val maxSize = Math.min( SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE), maxMetadataSize) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala index beebe5d2e2dc1..ed2f60afb0096 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala @@ -399,7 +399,7 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession { test("Test session plan cache - disabled") { val sessionHolder = SparkConnectTestUtils.createDummySessionHolder(spark) // Disable plan cache of the session - sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, false) + sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED.key, false) val planner = new SparkConnectPlanner(sessionHolder) val query = buildRelation("select 1") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 0fab60a948423..6e5dcc24e29dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -243,7 +243,7 @@ class Dataset[T] private[sql]( @transient private[sql] val logicalPlan: LogicalPlan = { val plan = queryExecution.commandExecuted - if (sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) { + if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) { val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new HashSet[Long]) dsIds.add(id) plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds) @@ -772,7 +772,7 @@ class Dataset[T] private[sql]( private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = { val newExpr = expr transform { case a: AttributeReference - if sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) => + if sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) => val metadata = new MetadataBuilder() .withMetadata(a.metadata) .putLong(Dataset.DATASET_ID_KEY, id) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index a7fb71d95d147..5746b942341fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -173,16 +173,8 @@ class SparkSession private( @transient val sqlContext: SQLContext = new SQLContext(this) - /** - * Runtime configuration interface for Spark. - * - * This is the interface through which the user can get and set all Spark and Hadoop - * configurations that are relevant to Spark SQL. When getting the value of a config, - * this defaults to the value set in the underlying `SparkContext`, if any. - * - * @since 2.0.0 - */ - @transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf) + /** @inheritdoc */ + @transient lazy val conf: RuntimeConfig = new RuntimeConfigImpl(sessionState.conf) /** * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s @@ -745,7 +737,8 @@ class SparkSession private( } private[sql] def leafNodeDefaultParallelism: Int = { - conf.get(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM).getOrElse(sparkContext.defaultParallelism) + sessionState.conf.getConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM) + .getOrElse(sparkContext.defaultParallelism) } private[sql] object Converter extends ColumnNodeToExpressionConverter with Serializable { @@ -1110,13 +1103,13 @@ object SparkSession extends Logging { private[sql] def getOrCloneSessionWithConfigsOff( session: SparkSession, configurations: Seq[ConfigEntry[Boolean]]): SparkSession = { - val configsEnabled = configurations.filter(session.conf.get[Boolean]) + val configsEnabled = configurations.filter(session.sessionState.conf.getConf[Boolean]) if (configsEnabled.isEmpty) { session } else { val newSession = session.cloneSession() configsEnabled.foreach(conf => { - newSession.conf.set(conf, false) + newSession.sessionState.conf.setConf(conf, false) }) newSession } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala index 4eb7d4fa17eea..1ee960622fc2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala @@ -324,7 +324,7 @@ class ArtifactManager(session: SparkSession) extends Logging { val fs = destFSPath.getFileSystem(hadoopConf) if (fs.isInstanceOf[LocalFileSystem]) { val allowDestLocalConf = - session.conf.get(SQLConf.ARTIFACT_COPY_FROM_LOCAL_TO_FS_ALLOW_DEST_LOCAL) + session.sessionState.conf.getConf(SQLConf.ARTIFACT_COPY_FROM_LOCAL_TO_FS_ALLOW_DEST_LOCAL) .getOrElse( session.conf.get("spark.connect.copyFromLocalToFs.allowDestLocal").contains("true")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index aae424afcb8ac..1bf6f4e4d7d9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -474,7 +474,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { // Bucketed scan only has one time overhead but can have multi-times benefits in cache, // so we always do bucketed scan in a cached plan. var disableConfigs = Seq(SQLConf.AUTO_BUCKETED_SCAN_ENABLED) - if (!session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) { + if (!session.sessionState.conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) { // Allowing changing cached plan output partitioning might lead to regression as it introduces // extra shuffle disableConfigs = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 58fff2d4a1a29..12ff649b621e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -87,7 +87,7 @@ object SQLExecution extends Logging { executionIdToQueryExecution.put(executionId, queryExecution) val originalInterruptOnCancel = sc.getLocalProperty(SPARK_JOB_INTERRUPT_ON_CANCEL) if (originalInterruptOnCancel == null) { - val interruptOnCancel = sparkSession.conf.get(SQLConf.INTERRUPT_ON_CANCEL) + val interruptOnCancel = sparkSession.sessionState.conf.getConf(SQLConf.INTERRUPT_ON_CANCEL) sc.setInterruptOnCancel(interruptOnCancel) } try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 3f221bfa53051..814e56b204f9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -861,7 +861,7 @@ case class RepairTableCommand( // Hive metastore may not have enough memory to handle millions of partitions in single RPC, // we should split them into smaller batches. Since Hive client is not thread safe, we cannot // do this in parallel. - val batchSize = spark.conf.get(SQLConf.ADD_PARTITION_BATCH_SIZE) + val batchSize = spark.sessionState.conf.getConf(SQLConf.ADD_PARTITION_BATCH_SIZE) partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch => val now = MILLISECONDS.toSeconds(System.currentTimeMillis()) val parts = batch.map { case (spec, location) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index e1061a46db7b0..071e3826b20a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -135,7 +135,7 @@ case class CreateViewCommand( referredTempFunctions) catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace) } else if (viewType == GlobalTempView) { - val db = sparkSession.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE) + val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) val viewIdent = TableIdentifier(name.table, Option(db)) val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) val tableDefinition = createTemporaryViewRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index d88b5ee8877d7..968c204841e46 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -267,7 +267,8 @@ case class DataSource( checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false) createInMemoryFileIndex(globbedPaths) }) - val forceNullable = sparkSession.conf.get(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE) + val forceNullable = sparkSession.sessionState.conf + .getConf(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE) val sourceDataSchema = if (forceNullable) dataSchema.asNullable else dataSchema SourceInfo( s"FileSource[$path]", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala index cbff526592f92..54c100282e2db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala @@ -98,7 +98,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) val filterFuncs = filters.flatMap(filter => createFilterFunction(filter)) - val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) + val maxLength = sparkSession.sessionState.conf.getConf(SOURCES_BINARY_FILE_MAX_LENGTH) file: PartitionedFile => { val path = file.toPath diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index fc6cba786c4ed..d9367d92d462e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -115,7 +115,7 @@ case class CreateTempViewUsing( }.logicalPlan if (global) { - val db = sparkSession.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE) + val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) val viewIdent = TableIdentifier(tableIdent.table, Option(db)) val viewDefinition = createTemporaryViewRelation( viewIdent, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala index 83399e2cac01b..50b90641d309b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging -import org.apache.spark.sql.{RuntimeConfig, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.DataSourceOptions import org.apache.spark.sql.connector.catalog.{Table, TableProvider} import org.apache.spark.sql.connector.expressions.Transform @@ -119,9 +119,9 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging throw StateDataSourceErrors.offsetMetadataLogUnavailable(batchId, checkpointLocation) ) - val clonedRuntimeConf = new RuntimeConfig(session.sessionState.conf.clone()) - OffsetSeqMetadata.setSessionConf(metadata, clonedRuntimeConf) - StateStoreConf(clonedRuntimeConf.sqlConf) + val clonedSqlConf = session.sessionState.conf.clone() + OffsetSeqMetadata.setSessionConf(metadata, clonedSqlConf) + StateStoreConf(clonedSqlConf) case _ => throw StateDataSourceErrors.offsetLogUnavailable(batchId, checkpointLocation) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala index 06fdc6c53bc4e..cb7e71bda84dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala @@ -49,7 +49,8 @@ trait AsyncLogPurge extends Logging { // which are written per run. protected def purgeStatefulMetadata(plan: SparkPlan): Unit - protected lazy val useAsyncPurge: Boolean = sparkSession.conf.get(SQLConf.ASYNC_LOG_PURGE) + protected lazy val useAsyncPurge: Boolean = sparkSession.sessionState.conf + .getConf(SQLConf.ASYNC_LOG_PURGE) protected def purgeAsync(batchId: Long): Unit = { if (purgeRunning.compareAndSet(false, true)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 285494543533c..053aef6ced3a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -477,7 +477,7 @@ class MicroBatchExecution( // update offset metadata nextOffsets.metadata.foreach { metadata => - OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.conf) + OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.sessionState.conf) execCtx.offsetSeqMetadata = OffsetSeqMetadata( metadata.batchWatermarkMs, metadata.batchTimestampMs, sparkSessionToRunBatches.conf) watermarkTracker = WatermarkTracker(sparkSessionToRunBatches.conf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index f0be33ad9a9d8..d5facc245e72f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -26,6 +26,7 @@ import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.RuntimeConfig import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkDataStream} import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager, SymmetricHashJoinStateManager} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf._ @@ -135,20 +136,21 @@ object OffsetSeqMetadata extends Logging { } /** Set the SparkSession configuration with the values in the metadata */ - def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: RuntimeConfig): Unit = { + def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: SQLConf): Unit = { + val configs = sessionConf.getAllConfs OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey => metadata.conf.get(confKey) match { case Some(valueInMetadata) => // Config value exists in the metadata, update the session config with this value - val optionalValueInSession = sessionConf.getOption(confKey) - if (optionalValueInSession.isDefined && optionalValueInSession.get != valueInMetadata) { + val optionalValueInSession = sessionConf.getConfString(confKey, null) + if (optionalValueInSession != null && optionalValueInSession != valueInMetadata) { logWarning(log"Updating the value of conf '${MDC(CONFIG, confKey)}' in current " + - log"session from '${MDC(OLD_VALUE, optionalValueInSession.get)}' " + + log"session from '${MDC(OLD_VALUE, optionalValueInSession)}' " + log"to '${MDC(NEW_VALUE, valueInMetadata)}'.") } - sessionConf.set(confKey, valueInMetadata) + sessionConf.setConfString(confKey, valueInMetadata) case None => // For backward compatibility, if a config was not recorded in the offset log, @@ -157,14 +159,17 @@ object OffsetSeqMetadata extends Logging { relevantSQLConfDefaultValues.get(confKey) match { case Some(defaultValue) => - sessionConf.set(confKey, defaultValue) + sessionConf.setConfString(confKey, defaultValue) logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in the offset log, " + log"using default value '${MDC(DEFAULT_VALUE, defaultValue)}'") case None => - val valueStr = sessionConf.getOption(confKey).map { v => - s" Using existing session conf value '$v'." - }.getOrElse { " No value set in session conf." } + val value = sessionConf.getConfString(confKey, null) + val valueStr = if (value != null) { + s" Using existing session conf value '$value'." + } else { + " No value set in session conf." + } logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in the offset log. " + log"${MDC(TIP, valueStr)}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 4b1b9e02a242a..8f030884ad33b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -483,7 +483,7 @@ abstract class StreamExecution( @throws[TimeoutException] protected def interruptAndAwaitExecutionThreadTermination(): Unit = { val timeout = math.max( - sparkSession.conf.get(SQLConf.STREAMING_STOP_TIMEOUT), 0) + sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_TIMEOUT), 0) queryExecutionThread.interrupt() queryExecutionThread.join(timeout) if (queryExecutionThread.isAlive) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala index 54c47ec4e6ed8..3e6f122f463d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala @@ -135,7 +135,8 @@ object WatermarkTracker { // saved in the checkpoint (e.g., old checkpoints), then the default `min` policy is enforced // through defaults specified in OffsetSeqMetadata.setSessionConf(). val policyName = conf.get( - SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY, MultipleWatermarkPolicy.DEFAULT_POLICY_NAME) + SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key, + MultipleWatermarkPolicy.DEFAULT_POLICY_NAME) new WatermarkTracker(MultipleWatermarkPolicy(policyName)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala similarity index 51% rename from sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala rename to sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala index ed8cf4f121f03..ca439cdb89958 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala @@ -15,15 +15,15 @@ * limitations under the License. */ -package org.apache.spark.sql +package org.apache.spark.sql.internal import scala.jdk.CollectionConverters._ import org.apache.spark.SPARK_DOC_ROOT import org.apache.spark.annotation.Stable -import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry} +import org.apache.spark.internal.config.ConfigEntry +import org.apache.spark.sql.RuntimeConfig import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.internal.SQLConf /** * Runtime configuration interface for Spark. To access this, use `SparkSession.conf`. @@ -33,89 +33,26 @@ import org.apache.spark.sql.internal.SQLConf * @since 2.0.0 */ @Stable -class RuntimeConfig private[sql](val sqlConf: SQLConf = new SQLConf) { +class RuntimeConfigImpl private[sql](val sqlConf: SQLConf = new SQLConf) extends RuntimeConfig { - /** - * Sets the given Spark runtime configuration property. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def set(key: String, value: String): Unit = { requireNonStaticConf(key) sqlConf.setConfString(key, value) } - /** - * Sets the given Spark runtime configuration property. - * - * @since 2.0.0 - */ - def set(key: String, value: Boolean): Unit = { - set(key, value.toString) - } - - /** - * Sets the given Spark runtime configuration property. - * - * @since 2.0.0 - */ - def set(key: String, value: Long): Unit = { - set(key, value.toString) - } - - /** - * Sets the given Spark runtime configuration property. - */ - private[sql] def set[T](entry: ConfigEntry[T], value: T): Unit = { - requireNonStaticConf(entry.key) - sqlConf.setConf(entry, value) - } - - /** - * Returns the value of Spark runtime configuration property for the given key. - * - * @throws java.util.NoSuchElementException if the key is not set and does not have a default - * value - * @since 2.0.0 - */ + /** @inheritdoc */ @throws[NoSuchElementException]("if the key is not set") def get(key: String): String = { sqlConf.getConfString(key) } - /** - * Returns the value of Spark runtime configuration property for the given key. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def get(key: String, default: String): String = { sqlConf.getConfString(key, default) } - /** - * Returns the value of Spark runtime configuration property for the given key. - */ - @throws[NoSuchElementException]("if the key is not set") - private[sql] def get[T](entry: ConfigEntry[T]): T = { - sqlConf.getConf(entry) - } - - private[sql] def get[T](entry: OptionalConfigEntry[T]): Option[T] = { - sqlConf.getConf(entry) - } - - /** - * Returns the value of Spark runtime configuration property for the given key. - */ - private[sql] def get[T](entry: ConfigEntry[T], default: T): T = { - sqlConf.getConf(entry, default) - } - - /** - * Returns all properties set in this conf. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def getAll: Map[String, String] = { sqlConf.getAllConfs } @@ -124,36 +61,20 @@ class RuntimeConfig private[sql](val sqlConf: SQLConf = new SQLConf) { getAll.asJava } - /** - * Returns the value of Spark runtime configuration property for the given key. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def getOption(key: String): Option[String] = { try Option(get(key)) catch { case _: NoSuchElementException => None } } - /** - * Resets the configuration property for the given key. - * - * @since 2.0.0 - */ + /** @inheritdoc */ def unset(key: String): Unit = { requireNonStaticConf(key) sqlConf.unsetConf(key) } - /** - * Indicates whether the configuration property with the given key - * is modifiable in the current session. - * - * @return `true` if the configuration property is modifiable. For static SQL, Spark Core, - * invalid (not existing) and other non-modifiable configuration properties, - * the returned value is `false`. - * @since 2.4.0 - */ + /** @inheritdoc */ def isModifiable(key: String): Boolean = sqlConf.isModifiable(key) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 55d2e639a56b1..3ab6d02f6b515 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -364,7 +364,7 @@ class StreamingQueryManager private[sql] ( .orElse(activeQueries.get(query.id)) // shouldn't be needed but paranoia ... val shouldStopActiveRun = - sparkSession.conf.get(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART) + sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART) if (activeOption.isDefined) { if (shouldStopActiveRun) { val oldQuery = activeOption.get diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 2fe6a83427bca..e44bd5de4f4c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -52,7 +52,7 @@ class FileBasedDataSourceSuite extends QueryTest override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native") + spark.conf.set(SQLConf.ORC_IMPLEMENTATION.key, "native") } override def afterAll(): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala index 48a16f01d5749..6cd8ade41da14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala @@ -225,7 +225,7 @@ class TPCDSTables(spark: SparkSession, dsdgenDir: String, scaleFactor: Int) // datagen speed files will be truncated to maxRecordsPerFile value, so the final // result will be the same. val numRows = data.count() - val maxRecordPerFile = spark.conf.get(SQLConf.MAX_RECORDS_PER_FILE) + val maxRecordPerFile = spark.sessionState.conf.getConf(SQLConf.MAX_RECORDS_PER_FILE) if (maxRecordPerFile > 0 && numRows > maxRecordPerFile) { val numFiles = (numRows.toDouble/maxRecordPerFile).ceil.toInt diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index fcb937d82ba42..0f5582def82da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -597,10 +597,10 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan SQLConf.CROSS_JOINS_ENABLED.key -> "true") { assert(statisticSizeInByte(spark.table("testData2")) > - spark.conf.get[Long](SQLConf.AUTO_BROADCASTJOIN_THRESHOLD)) + sqlConf.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD)) assert(statisticSizeInByte(spark.table("testData")) < - spark.conf.get[Long](SQLConf.AUTO_BROADCASTJOIN_THRESHOLD)) + sqlConf.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD)) Seq( ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala index 4052130720811..352197f96acb6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala @@ -19,12 +19,13 @@ package org.apache.spark.sql import org.apache.spark.SparkFunSuite import org.apache.spark.internal.config +import org.apache.spark.sql.internal.RuntimeConfigImpl import org.apache.spark.sql.internal.SQLConf.CHECKPOINT_LOCATION import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE class RuntimeConfigSuite extends SparkFunSuite { - private def newConf(): RuntimeConfig = new RuntimeConfig + private def newConf(): RuntimeConfig = new RuntimeConfigImpl() test("set and get") { val conf = newConf() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 9beceda263797..ce88f7dc475d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2568,20 +2568,21 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath) val newSession = spark.newSession() + val newSqlConf = newSession.sessionState.conf val originalValue = newSession.sessionState.conf.runSQLonFile try { - newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, false) + newSqlConf.setConf(SQLConf.RUN_SQL_ON_FILES, false) intercept[AnalysisException] { newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`") } - newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, true) + newSqlConf.setConf(SQLConf.RUN_SQL_ON_FILES, true) checkAnswer( newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`"), Row(1, "a")) } finally { - newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, originalValue) + newSqlConf.setConf(SQLConf.RUN_SQL_ON_FILES, originalValue) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 4ac05373e5a34..d3117ec411feb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -201,10 +201,10 @@ class SparkSessionBuilderSuite extends SparkFunSuite with Eventually { .getOrCreate() assert(session.conf.get("spark.app.name") === "test-app-SPARK-31234") - assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234") + assert(session.conf.get(GLOBAL_TEMP_DATABASE.key) === "globalTempDB-SPARK-31234") session.sql("RESET") assert(session.conf.get("spark.app.name") === "test-app-SPARK-31234") - assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234") + assert(session.conf.get(GLOBAL_TEMP_DATABASE.key) === "globalTempDB-SPARK-31234") } test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") { @@ -244,8 +244,8 @@ class SparkSessionBuilderSuite extends SparkFunSuite with Eventually { .builder() .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-1") .getOrCreate() - assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532") - assert(session1.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532") + assert(session.conf.get(GLOBAL_TEMP_DATABASE.key) === "globalTempDB-SPARK-31532") + assert(session1.conf.get(GLOBAL_TEMP_DATABASE.key) === "globalTempDB-SPARK-31532") // do not propagate static sql configs to the existing default session SparkSession.clearActiveSession() @@ -255,9 +255,9 @@ class SparkSessionBuilderSuite extends SparkFunSuite with Eventually { .config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532-2") .getOrCreate() - assert(!session.conf.get(WAREHOUSE_PATH).contains("SPARK-31532-db")) - assert(session.conf.get(WAREHOUSE_PATH) === session2.conf.get(WAREHOUSE_PATH)) - assert(session2.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532") + assert(!session.conf.get(WAREHOUSE_PATH.key).contains("SPARK-31532-db")) + assert(session.conf.get(WAREHOUSE_PATH.key) === session2.conf.get(WAREHOUSE_PATH.key)) + assert(session2.conf.get(GLOBAL_TEMP_DATABASE.key) === "globalTempDB-SPARK-31532") } test("SPARK-31532: propagate static sql configs if no existing SparkSession") { @@ -275,8 +275,8 @@ class SparkSessionBuilderSuite extends SparkFunSuite with Eventually { .config(WAREHOUSE_PATH.key, "SPARK-31532-db-2") .getOrCreate() assert(session.conf.get("spark.app.name") === "test-app-SPARK-31532-2") - assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532-2") - assert(session.conf.get(WAREHOUSE_PATH) contains "SPARK-31532-db-2") + assert(session.conf.get(GLOBAL_TEMP_DATABASE.key) === "globalTempDB-SPARK-31532-2") + assert(session.conf.get(WAREHOUSE_PATH.key) contains "SPARK-31532-db-2") } test("SPARK-32062: reset listenerRegistered in SparkSession") { @@ -461,7 +461,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with Eventually { val expected = path.getFileSystem(hadoopConf).makeQualified(path).toString // session related configs assert(hadoopConf.get("hive.metastore.warehouse.dir") === expected) - assert(session.conf.get(WAREHOUSE_PATH) === expected) + assert(session.conf.get(WAREHOUSE_PATH.key) === expected) assert(session.sessionState.conf.warehousePath === expected) // shared configs diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 322210bf5b59f..ba87028a71477 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -178,7 +178,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper with Adapt MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule())) } withSession(extensions) { session => - session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true) + session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, true) assert(session.sessionState.adaptiveRulesHolder.queryStagePrepRules .contains(MyQueryStagePrepRule())) assert(session.sessionState.columnarRules.contains( @@ -221,7 +221,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper with Adapt MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) } withSession(extensions) { session => - session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true) + session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, true) session.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") assert(session.sessionState.columnarRules.contains( MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))) @@ -280,7 +280,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper with Adapt MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) } withSession(extensions) { session => - session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE) + session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, enableAQE) assert(session.sessionState.columnarRules.contains( MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))) import session.implicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala index ef8b66566f246..7fa29dd38fd96 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala @@ -366,7 +366,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.stats assert(stats.sizeInBytes > 0, "non-empty partitioned table should not report zero size.") - if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") { + if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION.key) == "hive") { sql("CREATE TABLE hive_tbl(i int) PARTITIONED BY (j int)") sql("INSERT INTO hive_tbl PARTITION(j=1) SELECT 1") val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.stats @@ -381,7 +381,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils // Test data source table checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true) // Test hive serde table - if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") { + if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION.key) == "hive") { checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 5df7b62cfb285..7aaec6d500ba0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2481,7 +2481,7 @@ class DataSourceV2SQLSuiteV1Filter } test("global temp view should not be masked by v2 catalog") { - val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE) + val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE.key) registerCatalog(globalTempDB, classOf[InMemoryTableCatalog]) try { @@ -2495,7 +2495,7 @@ class DataSourceV2SQLSuiteV1Filter } test("SPARK-30104: global temp db is used as a table name under v2 catalog") { - val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE) + val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE.key) val t = s"testcat.$globalTempDB" withTable(t) { sql(s"CREATE TABLE $t (id bigint, data string) USING foo") @@ -2506,7 +2506,7 @@ class DataSourceV2SQLSuiteV1Filter } test("SPARK-30104: v2 catalog named global_temp will be masked") { - val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE) + val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE.key) registerCatalog(globalTempDB, classOf[InMemoryTableCatalog]) checkError( exception = intercept[AnalysisException] { @@ -2712,7 +2712,7 @@ class DataSourceV2SQLSuiteV1Filter parameters = Map("relationName" -> "`testcat`.`abc`"), context = ExpectedContext(fragment = "testcat.abc", start = 17, stop = 27)) - val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE) + val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE.key) registerCatalog(globalTempDB, classOf[InMemoryTableCatalog]) withTempView("v") { sql("create global temp view v as select 1") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala index dc72b4a092aef..9ed4f1a006b2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala @@ -317,7 +317,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with SQLConfHelper { import spark.implicits._ spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1KB") spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "10KB") - spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0) + spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR.key, "2.0") val df00 = spark.range(0, 1000, 2) .selectExpr("id as key", "id as value") .union(Seq.fill(100000)((600, 600)).toDF("key", "value")) @@ -345,7 +345,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with SQLConfHelper { import spark.implicits._ spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "100B") - spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0) + spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR.key, "2.0") val df00 = spark.range(0, 10, 2) .selectExpr("id as key", "id as value") .union(Seq.fill(1000)((600, 600)).toDF("key", "value")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index ad755bf22ab09..0ba55382cd9a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -150,7 +150,7 @@ class InMemoryColumnarQuerySuite extends QueryTest spark.catalog.cacheTable("sizeTst") assert( spark.table("sizeTst").queryExecution.analyzed.stats.sizeInBytes > - spark.conf.get(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD)) + sqlConf.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala index 885286843a143..88ff51d0ff4cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala @@ -27,9 +27,9 @@ class PartitionBatchPruningSuite extends SharedSparkSession with AdaptiveSparkPl import testImplicits._ - private lazy val originalColumnBatchSize = spark.conf.get(SQLConf.COLUMN_BATCH_SIZE) + private lazy val originalColumnBatchSize = spark.conf.get(SQLConf.COLUMN_BATCH_SIZE.key) private lazy val originalInMemoryPartitionPruning = - spark.conf.get(SQLConf.IN_MEMORY_PARTITION_PRUNING) + spark.conf.get(SQLConf.IN_MEMORY_PARTITION_PRUNING.key) private val testArrayData = (1 to 100).map { key => Tuple1(Array.fill(key)(key)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala index fefb16a351fdb..c798196c4f0ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala @@ -101,7 +101,7 @@ class OrcReadSchemaSuite override def beforeAll(): Unit = { super.beforeAll() - originalConf = spark.conf.get(SQLConf.ORC_VECTORIZED_READER_ENABLED) + originalConf = sqlConf.getConf(SQLConf.ORC_VECTORIZED_READER_ENABLED) spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "false") } @@ -126,7 +126,7 @@ class VectorizedOrcReadSchemaSuite override def beforeAll(): Unit = { super.beforeAll() - originalConf = spark.conf.get(SQLConf.ORC_VECTORIZED_READER_ENABLED) + originalConf = sqlConf.getConf(SQLConf.ORC_VECTORIZED_READER_ENABLED) spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "true") } @@ -169,7 +169,7 @@ class ParquetReadSchemaSuite override def beforeAll(): Unit = { super.beforeAll() - originalConf = spark.conf.get(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) + originalConf = sqlConf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "false") } @@ -193,7 +193,7 @@ class VectorizedParquetReadSchemaSuite override def beforeAll(): Unit = { super.beforeAll() - originalConf = spark.conf.get(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) + originalConf = sqlConf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true") } @@ -217,7 +217,7 @@ class MergedParquetReadSchemaSuite override def beforeAll(): Unit = { super.beforeAll() - originalConf = spark.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED) + originalConf = sqlConf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED) spark.conf.set(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key, "true") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala index 3dec1b9ff5cf2..deb62eb3ac234 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala @@ -346,7 +346,7 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession { } test("fail fast and do not attempt to read if a file is too big") { - assert(spark.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) === Int.MaxValue) + assert(sqlConf.getConf(SOURCES_BINARY_FILE_MAX_LENGTH) === Int.MaxValue) withTempPath { file => val path = file.getPath val content = "123".getBytes diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala index 48b4f8d4bc015..b8669ee4d1ef1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala @@ -63,7 +63,7 @@ trait OrcTest extends QueryTest with FileBasedDataSourceTest with BeforeAndAfter protected override def beforeAll(): Unit = { super.beforeAll() - originalConfORCImplementation = spark.conf.get(ORC_IMPLEMENTATION) + originalConfORCImplementation = spark.sessionState.conf.getConf(ORC_IMPLEMENTATION) spark.conf.set(ORC_IMPLEMENTATION.key, orcImp) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 6c2f5a2d134db..0afa545595c77 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -846,7 +846,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession def checkCompressionCodec(codec: ParquetCompressionCodec): Unit = { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) { withParquetFile(data) { path => - assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase(Locale.ROOT)) { + assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION.key).toUpperCase(Locale.ROOT)) { compressionCodecFor(path, codec.name()) } } @@ -855,7 +855,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession // Checks default compression codec checkCompressionCodec( - ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION))) + ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION.key))) ParquetCompressionCodec.availableCodecs.asScala.foreach(checkCompressionCodec(_)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala index 2858d356d4c9a..4833b8630134c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala @@ -58,7 +58,7 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED, false) + spark.conf.set(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key, false) spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key, newStateStoreProvider().getClass.getName) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala index 97c88037a7171..af07707569500 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala @@ -942,7 +942,7 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass // skip version and operator ID to test out functionalities .load() - val numShufflePartitions = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS) + val numShufflePartitions = sqlConf.getConf(SQLConf.SHUFFLE_PARTITIONS) val resultDf = stateReadDf .selectExpr("key.value AS key_value", "value.count AS value_count", "partition_id") @@ -966,7 +966,7 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass } test("partition_id column with stream-stream join") { - val numShufflePartitions = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS) + val numShufflePartitions = sqlConf.getConf(SQLConf.SHUFFLE_PARTITIONS) withTempDir { tempDir => runStreamStreamJoinQueryWithOneThousandInputs(tempDir.getAbsolutePath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala index 8fcd6edf1abb7..d20cfb04f8e81 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala @@ -119,7 +119,7 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest private def getFormatVersion(query: StreamingQuery): Int = { query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.sparkSession - .conf.get(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION) + .sessionState.conf.getConf(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION) } testWithColumnFamilies("SPARK-36519: store RocksDB format version in the checkpoint", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 7ac574db98d45..691f18451af22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -34,7 +34,7 @@ import org.rocksdb.CompressionType import org.scalactic.source.Position import org.scalatest.Tag -import org.apache.spark.SparkException +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming.{CreateAtomicTestManager, FileSystemBasedCheckpointFileManager} import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream} @@ -167,7 +167,10 @@ trait AlsoTestWithChangelogCheckpointingEnabled @SlowSQLTest class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with SharedSparkSession { - sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName) + override protected def sparkConf: SparkConf = { + super.sparkConf + .set(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName) + } testWithColumnFamilies( "RocksDB: check changelog and snapshot version", @@ -2157,9 +2160,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } } - private def sqlConf = SQLConf.get.clone() - - private def dbConf = RocksDBConf(StateStoreConf(sqlConf)) + private def dbConf = RocksDBConf(StateStoreConf(SQLConf.get.clone())) def withDB[T]( remoteDir: String, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala index 898aeec22ad17..6eff610433c9c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala @@ -243,7 +243,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession { // Examples can change settings. We clone the session to prevent tests clashing. val clonedSpark = spark.cloneSession() // Coalescing partitions can change result order, so disable it. - clonedSpark.conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED, false) + clonedSpark.conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED.key, false) val info = clonedSpark.sessionState.catalog.lookupFunctionInfo(funcId) val className = info.getClassName if (!ignoreSet.contains(className)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index d0d4dc6b344fc..82795e551b6bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -47,7 +47,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { // Set a conf first. spark.conf.set(testKey, testVal) // Clear the conf. - spark.sessionState.conf.clear() + sqlConf.clear() // After clear, only overrideConfs used by unit test should be in the SQLConf. assert(spark.conf.getAll === TestSQLContext.overrideConfs) @@ -62,11 +62,11 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { assert(spark.conf.get(testKey, testVal + "_") === testVal) assert(spark.conf.getAll.contains(testKey)) - spark.sessionState.conf.clear() + sqlConf.clear() } test("parse SQL set commands") { - spark.sessionState.conf.clear() + sqlConf.clear() sql(s"set $testKey=$testVal") assert(spark.conf.get(testKey, testVal + "_") === testVal) assert(spark.conf.get(testKey, testVal + "_") === testVal) @@ -84,11 +84,11 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { sql(s"set $key=") assert(spark.conf.get(key, "0") === "") - spark.sessionState.conf.clear() + sqlConf.clear() } test("set command for display") { - spark.sessionState.conf.clear() + sqlConf.clear() checkAnswer( sql("SET").where("key = 'spark.sql.groupByOrdinal'").select("key", "value"), Nil) @@ -109,11 +109,11 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { } test("deprecated property") { - spark.sessionState.conf.clear() - val original = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS) + sqlConf.clear() + val original = sqlConf.getConf(SQLConf.SHUFFLE_PARTITIONS) try { sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10") - assert(spark.conf.get(SQLConf.SHUFFLE_PARTITIONS) === 10) + assert(sqlConf.getConf(SQLConf.SHUFFLE_PARTITIONS) === 10) } finally { sql(s"set ${SQLConf.SHUFFLE_PARTITIONS.key}=$original") } @@ -146,18 +146,18 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { } test("reset - public conf") { - spark.sessionState.conf.clear() - val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL) + sqlConf.clear() + val original = sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL) try { - assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL)) + assert(sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL)) sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=false") - assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === false) + assert(sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL) === false) assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 1) - assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty) + assert(sqlConf.getConf(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty) sql(s"reset") - assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL)) + assert(sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL)) assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 0) - assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES) === + assert(sqlConf.getConf(SQLConf.OPTIMIZER_EXCLUDED_RULES) === Some("org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")) } finally { sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=$original") @@ -165,15 +165,15 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { } test("reset - internal conf") { - spark.sessionState.conf.clear() - val original = spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) + sqlConf.clear() + val original = sqlConf.getConf(SQLConf.OPTIMIZER_MAX_ITERATIONS) try { - assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100) + assert(sqlConf.getConf(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100) sql(s"set ${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}=10") - assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 10) + assert(sqlConf.getConf(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 10) assert(sql(s"set").where(s"key = '${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}'").count() == 1) sql(s"reset") - assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100) + assert(sqlConf.getConf(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100) assert(sql(s"set").where(s"key = '${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}'").count() == 0) } finally { sql(s"set ${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}=$original") @@ -181,7 +181,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { } test("reset - user-defined conf") { - spark.sessionState.conf.clear() + sqlConf.clear() val userDefinedConf = "x.y.z.reset" try { assert(spark.conf.getOption(userDefinedConf).isEmpty) @@ -196,7 +196,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { } test("SPARK-32406: reset - single configuration") { - spark.sessionState.conf.clear() + sqlConf.clear() // spark core conf w/o entry registered val appId = spark.sparkContext.getConf.getAppId sql("RESET spark.app.id") @@ -216,19 +216,19 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { sql("RESET spark.abc") // ignore nonexistent keys // runtime sql configs - val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL) + val original = sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL) sql(s"SET ${SQLConf.GROUP_BY_ORDINAL.key}=false") sql(s"RESET ${SQLConf.GROUP_BY_ORDINAL.key}") - assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === original) + assert(sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL) === original) // runtime sql configs with optional defaults - assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty) + assert(sqlConf.getConf(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty) sql(s"RESET ${SQLConf.OPTIMIZER_EXCLUDED_RULES.key}") - assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES) === + assert(sqlConf.getConf(SQLConf.OPTIMIZER_EXCLUDED_RULES) === Some("org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")) sql(s"SET ${SQLConf.PLAN_CHANGE_LOG_RULES.key}=abc") sql(s"RESET ${SQLConf.PLAN_CHANGE_LOG_RULES.key}") - assert(spark.conf.get(SQLConf.PLAN_CHANGE_LOG_RULES).isEmpty) + assert(sqlConf.getConf(SQLConf.PLAN_CHANGE_LOG_RULES).isEmpty) // static sql configs checkError( @@ -247,19 +247,19 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { } test("Test ADVISORY_PARTITION_SIZE_IN_BYTES's method") { - spark.sessionState.conf.clear() + sqlConf.clear() spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "100") - assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 100) + assert(sqlConf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 100) spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1k") - assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1024) + assert(sqlConf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1024) spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1M") - assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1048576) + assert(sqlConf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1048576) spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1g") - assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1073741824) + assert(sqlConf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1073741824) // test negative value intercept[IllegalArgumentException] { @@ -277,7 +277,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-90000000000g") } - spark.sessionState.conf.clear() + sqlConf.clear() } test("SparkSession can access configs set in SparkConf") { @@ -305,7 +305,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { try { sparkContext.conf.set(GLOBAL_TEMP_DATABASE, "a") val newSession = new SparkSession(sparkContext) - assert(newSession.conf.get(GLOBAL_TEMP_DATABASE) == "a") + assert(newSession.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) == "a") checkAnswer( newSession.sql(s"SET ${GLOBAL_TEMP_DATABASE.key}"), Row(GLOBAL_TEMP_DATABASE.key, "a")) @@ -338,16 +338,16 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { } test("SPARK-10365: PARQUET_OUTPUT_TIMESTAMP_TYPE") { - spark.sessionState.conf.clear() + sqlConf.clear() // check default value assert(spark.sessionState.conf.parquetOutputTimestampType == SQLConf.ParquetOutputTimestampType.INT96) - spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros") + sqlConf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros") assert(spark.sessionState.conf.parquetOutputTimestampType == SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS) - spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96") + sqlConf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96") assert(spark.sessionState.conf.parquetOutputTimestampType == SQLConf.ParquetOutputTimestampType.INT96) @@ -356,7 +356,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "invalid") } - spark.sessionState.conf.clear() + sqlConf.clear() } test("SPARK-22779: correctly compute default value for fallback configs") { @@ -373,10 +373,10 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { .get assert(displayValue === fallback.defaultValueString) - spark.conf.set(SQLConf.PARQUET_COMPRESSION, GZIP.lowerCaseName()) + sqlConf.setConf(SQLConf.PARQUET_COMPRESSION, GZIP.lowerCaseName()) assert(spark.conf.get(fallback.key) === GZIP.lowerCaseName()) - spark.conf.set(fallback, LZO.lowerCaseName()) + sqlConf.setConf(fallback, LZO.lowerCaseName()) assert(spark.conf.get(fallback.key) === LZO.lowerCaseName()) val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs @@ -459,10 +459,10 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { test("set time zone") { TimeZone.getAvailableIDs().foreach { zid => sql(s"set time zone '$zid'") - assert(spark.conf.get(SQLConf.SESSION_LOCAL_TIMEZONE) === zid) + assert(sqlConf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE) === zid) } sql("set time zone local") - assert(spark.conf.get(SQLConf.SESSION_LOCAL_TIMEZONE) === TimeZone.getDefault.getID) + assert(sqlConf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE) === TimeZone.getDefault.getID) val tz = "Invalid TZ" checkError( @@ -476,7 +476,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { (-18 to 18).map(v => (v, s"interval '$v' hours")).foreach { case (i, interval) => sql(s"set time zone $interval") - val zone = spark.conf.get(SQLConf.SESSION_LOCAL_TIMEZONE) + val zone = sqlConf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE) if (i == 0) { assert(zone === "Z") } else { @@ -504,7 +504,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { test("SPARK-47765: set collation") { Seq("UNICODE", "UNICODE_CI", "utf8_lcase", "utf8_binary").foreach { collation => sql(s"set collation $collation") - assert(spark.conf.get(SQLConf.DEFAULT_COLLATION) === collation.toUpperCase(Locale.ROOT)) + assert(sqlConf.getConf(SQLConf.DEFAULT_COLLATION) === collation.toUpperCase(Locale.ROOT)) } checkError( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 8b11e0c69fa70..24732223c6698 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -54,7 +54,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti protected override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true) + spark.conf.set(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING.key, true) } protected override def afterAll(): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 168b6b8629926..e27ec32e287e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -50,7 +50,7 @@ abstract class FileStreamSinkSuite extends StreamTest { override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native") + spark.conf.set(SQLConf.ORC_IMPLEMENTATION.key, "native") } override def afterAll(): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 56c4aecb23770..773be0cc08e3f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -262,7 +262,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native") + spark.conf.set(SQLConf.ORC_IMPLEMENTATION.key, "native") } override def afterAll(): Unit = { @@ -1504,7 +1504,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // This is to avoid running a spark job to list of files in parallel // by the InMemoryFileIndex. - spark.conf.set(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2) + spark.conf.set(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key, numFiles * 2) withTempDirs { case (root, tmp) => val src = new File(root, "a=1") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index f3ef73c6af5fa..f7ff39622ed40 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -1163,7 +1163,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { func: (Int, Iterator[Int], GroupState[Int]) => Iterator[Int], timeoutType: GroupStateTimeout = GroupStateTimeout.NoTimeout, batchTimestampMs: Long = NO_TIMESTAMP): FlatMapGroupsWithStateExec = { - val stateFormatVersion = spark.conf.get(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION) + val stateFormatVersion = sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION) val emptyRdd = spark.sparkContext.emptyRDD[InternalRow] MemoryStream[Int] .toDS() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 7ab45e25799bc..68436c4e355b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -542,10 +542,7 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with val metadataRoot = Option(checkpointLocation).getOrElse(defaultCheckpointLocation) additionalConfs.foreach(pair => { - val value = - if (sparkSession.conf.contains(pair._1)) { - Some(sparkSession.conf.get(pair._1)) - } else None + val value = sparkSession.conf.getOption(pair._1) resetConfValues(pair._1) = value sparkSession.conf.set(pair._1, pair._2) }) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala index defd5fd110de6..a47c2f839692c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala @@ -265,7 +265,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest { private def assertQueryUsingRightBatchExecutor( testSource: TestDataFrameProvider, query: StreamingQuery): Unit = { - val useWrapper = query.sparkSession.conf.get( + val useWrapper = query.sparkSession.sessionState.conf.getConf( SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED) if (useWrapper) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala index ff1473fea369b..4d4cc44eb3e72 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala @@ -103,6 +103,8 @@ trait SharedSparkSessionBase new TestSparkSession(sparkConf) } + protected def sqlConf: SQLConf = _spark.sessionState.conf + /** * Initialize the [[TestSparkSession]]. Generally, this is just called from * beforeAll; however, in test using styles other than FunSuite, there is diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala index d84b9f7960231..8c6113fb5569d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala @@ -86,7 +86,7 @@ class HiveSharedStateSuite extends SparkFunSuite { assert(ss2.sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir") !== invalidPath, "warehouse conf in session options can't affect application wide hadoop conf") assert(ss.conf.get("spark.foo") === "bar2222", "session level conf should be passed to catalog") - assert(!ss.conf.get(WAREHOUSE_PATH).contains(invalidPath), + assert(!ss.conf.get(WAREHOUSE_PATH.key).contains(invalidPath), "session level conf should be passed to catalog") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 69abb1d1673ed..865ce81e151c2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -828,7 +828,7 @@ object SPARK_18360 { .enableHiveSupport().getOrCreate() val defaultDbLocation = spark.catalog.getDatabase("default").locationUri - assert(new Path(defaultDbLocation) == new Path(spark.conf.get(WAREHOUSE_PATH))) + assert(new Path(defaultDbLocation) == new Path(spark.conf.get(WAREHOUSE_PATH.key))) val hiveClient = spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala index aafc4764d2465..1922144a92efa 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala @@ -44,7 +44,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS super.beforeAll() originalConvertMetastoreParquet = spark.conf.get(CONVERT_METASTORE_PARQUET.key) originalConvertMetastoreORC = spark.conf.get(CONVERT_METASTORE_ORC.key) - originalORCImplementation = spark.conf.get(ORC_IMPLEMENTATION) + originalORCImplementation = spark.conf.get(ORC_IMPLEMENTATION.key) spark.conf.set(CONVERT_METASTORE_PARQUET.key, "false") spark.conf.set(CONVERT_METASTORE_ORC.key, "false") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 3deb355e0e4a9..594c097de2c7d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -79,13 +79,13 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi test("query global temp view") { val df = Seq(1).toDF("i1") df.createGlobalTempView("tbl1") - val global_temp_db = spark.conf.get(GLOBAL_TEMP_DATABASE) + val global_temp_db = spark.conf.get(GLOBAL_TEMP_DATABASE.key) checkAnswer(spark.sql(s"select * from ${global_temp_db}.tbl1"), Row(1)) spark.sql(s"drop view ${global_temp_db}.tbl1") } test("non-existent global temp view") { - val global_temp_db = spark.conf.get(GLOBAL_TEMP_DATABASE) + val global_temp_db = spark.conf.get(GLOBAL_TEMP_DATABASE.key) val e = intercept[AnalysisException] { spark.sql(s"select * from ${global_temp_db}.nonexistentview") }