Skip to content

Commit

Permalink
[SPARK-49413][CONNECT][SQL] Create a shared RuntimeConfig interface
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR introduces a shared RuntimeConfig interface.

### Why are the changes needed?
We are creating a shared Scala Spark SQL interface for Classic and Connect.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#47980 from hvanhovell/SPARK-49413.

Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
  • Loading branch information
hvanhovell authored and attilapiros committed Oct 4, 2024
1 parent d27bbce commit d50fc64
Show file tree
Hide file tree
Showing 61 changed files with 317 additions and 322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.spark.sql.connect.client.{ClassFinder, CloseableIterator, Spar
import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration
import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.internal.{CatalogImpl, SessionCleaner, SqlApiConf}
import org.apache.spark.sql.internal.{CatalogImpl, ConnectRuntimeConfig, SessionCleaner, SqlApiConf}
import org.apache.spark.sql.internal.ColumnNodeToProtoConverter.{toExpr, toTypedExpr}
import org.apache.spark.sql.streaming.DataStreamReader
import org.apache.spark.sql.streaming.StreamingQueryManager
Expand Down Expand Up @@ -88,16 +88,8 @@ class SparkSession private[sql] (
client.hijackServerSideSessionIdForTesting(suffix)
}

/**
* Runtime configuration interface for Spark.
*
* This is the interface through which the user can get and set all Spark configurations that
* are relevant to Spark SQL. When getting the value of a config, this defaults to the value set
* in server, if any.
*
* @since 3.4.0
*/
val conf: RuntimeConfig = new RuntimeConfig(client)
/** @inheritdoc */
val conf: RuntimeConfig = new ConnectRuntimeConfig(client)

/** @inheritdoc */
@transient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,72 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
package org.apache.spark.sql.internal

import org.apache.spark.connect.proto.{ConfigRequest, ConfigResponse, KeyValue}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.connect.client.SparkConnectClient

/**
* Runtime configuration interface for Spark. To access this, use `SparkSession.conf`.
*
* @since 3.4.0
*/
class RuntimeConfig private[sql] (client: SparkConnectClient) extends Logging {
class ConnectRuntimeConfig private[sql] (client: SparkConnectClient)
extends RuntimeConfig
with Logging {

/**
* Sets the given Spark runtime configuration property.
*
* @since 3.4.0
*/
/** @inheritdoc */
def set(key: String, value: String): Unit = {
executeConfigRequest { builder =>
builder.getSetBuilder.addPairsBuilder().setKey(key).setValue(value)
}
}

/**
* Sets the given Spark runtime configuration property.
*
* @since 3.4.0
*/
def set(key: String, value: Boolean): Unit = set(key, String.valueOf(value))

/**
* Sets the given Spark runtime configuration property.
*
* @since 3.4.0
*/
def set(key: String, value: Long): Unit = set(key, String.valueOf(value))

/**
* Returns the value of Spark runtime configuration property for the given key.
*
* @throws java.util.NoSuchElementException
* if the key is not set and does not have a default value
* @since 3.4.0
*/
/** @inheritdoc */
@throws[NoSuchElementException]("if the key is not set")
def get(key: String): String = getOption(key).getOrElse {
throw new NoSuchElementException(key)
}

/**
* Returns the value of Spark runtime configuration property for the given key.
*
* @since 3.4.0
*/
/** @inheritdoc */
def get(key: String, default: String): String = {
executeConfigRequestSingleValue { builder =>
builder.getGetWithDefaultBuilder.addPairsBuilder().setKey(key).setValue(default)
}
}

/**
* Returns all properties set in this conf.
*
* @since 3.4.0
*/
/** @inheritdoc */
def getAll: Map[String, String] = {
val response = executeConfigRequest { builder =>
builder.getGetAllBuilder
Expand All @@ -92,11 +63,7 @@ class RuntimeConfig private[sql] (client: SparkConnectClient) extends Logging {
builder.result()
}

/**
* Returns the value of Spark runtime configuration property for the given key.
*
* @since 3.4.0
*/
/** @inheritdoc */
def getOption(key: String): Option[String] = {
val pair = executeConfigRequestSinglePair { builder =>
builder.getGetOptionBuilder.addKeys(key)
Expand All @@ -108,27 +75,14 @@ class RuntimeConfig private[sql] (client: SparkConnectClient) extends Logging {
}
}

/**
* Resets the configuration property for the given key.
*
* @since 3.4.0
*/
/** @inheritdoc */
def unset(key: String): Unit = {
executeConfigRequest { builder =>
builder.getUnsetBuilder.addKeys(key)
}
}

/**
* Indicates whether the configuration property with the given key is modifiable in the current
* session.
*
* @return
* `true` if the configuration property is modifiable. For static SQL, Spark Core, invalid
* (not existing) and other non-modifiable configuration properties, the returned value is
* `false`.
* @since 3.4.0
*/
/** @inheritdoc */
def isModifiable(key: String): Boolean = {
val modifiable = executeConfigRequestSingleValue { builder =>
builder.getIsModifiableBuilder.addKeys(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with

test("allow group.id prefix") {
// Group ID prefix is only supported by consumer based offset reader
if (spark.conf.get(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) {
if (sqlConf.getConf(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) {
testGroupId("groupIdPrefix", (expected, actual) => {
assert(actual.exists(_.startsWith(expected)) && !actual.exists(_ === expected),
"Valid consumer groups don't contain the expected group id - " +
Expand All @@ -1167,7 +1167,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with

test("allow group.id override") {
// Group ID override is only supported by consumer based offset reader
if (spark.conf.get(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) {
if (sqlConf.getConf(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) {
testGroupId("kafka.group.id", (expected, actual) => {
assert(actual.exists(_ === expected), "Valid consumer groups don't " +
s"contain the expected group id - Valid consumer groups: $actual / " +
Expand Down
4 changes: 4 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameWriterV2"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WriteConfigMethods"),

// SPARK-49413: Create a shared RuntimeConfig interface.
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RuntimeConfig"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RuntimeConfig$"),

// SPARK-49287: Shared Streaming interfaces
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.SparkListenerEvent"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.ForeachWriter"),
Expand Down
105 changes: 105 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.spark.annotation.Stable

/**
* Runtime configuration interface for Spark. To access this, use `SparkSession.conf`.
*
* Options set here are automatically propagated to the Hadoop configuration during I/O.
*
* @since 2.0.0
*/
@Stable
abstract class RuntimeConfig {

  /**
   * Assigns `value` to the Spark runtime configuration property identified by `key`.
   *
   * @param key
   *   name of the configuration property
   * @param value
   *   value to store for the property
   * @since 2.0.0
   */
  def set(key: String, value: String): Unit

  /**
   * Assigns a boolean `value` to the Spark runtime configuration property identified by `key`.
   * The value is rendered as a string and forwarded to the string-based `set` overload.
   *
   * @param key
   *   name of the configuration property
   * @param value
   *   boolean value to store for the property
   * @since 2.0.0
   */
  def set(key: String, value: Boolean): Unit = set(key, String.valueOf(value))

  /**
   * Assigns a long `value` to the Spark runtime configuration property identified by `key`. The
   * value is rendered as a string and forwarded to the string-based `set` overload.
   *
   * @param key
   *   name of the configuration property
   * @param value
   *   long value to store for the property
   * @since 2.0.0
   */
  def set(key: String, value: Long): Unit = set(key, String.valueOf(value))

  /**
   * Looks up the Spark runtime configuration property identified by `key`.
   *
   * @param key
   *   name of the configuration property
   * @return
   *   the value of the property
   * @throws java.util.NoSuchElementException
   *   if the key is not set and does not have a default value
   * @since 2.0.0
   */
  @throws[NoSuchElementException]("if the key is not set")
  def get(key: String): String

  /**
   * Looks up the Spark runtime configuration property identified by `key`, with a caller
   * supplied `default` to use as the fallback value.
   *
   * @param key
   *   name of the configuration property
   * @param default
   *   fallback value supplied by the caller
   * @since 2.0.0
   */
  def get(key: String, default: String): String

  /**
   * Lists every property set in this conf as a map from property name to value.
   *
   * @since 2.0.0
   */
  def getAll: Map[String, String]

  /**
   * Looks up the Spark runtime configuration property identified by `key`, returning the result
   * as an `Option` rather than throwing.
   *
   * @param key
   *   name of the configuration property
   * @since 2.0.0
   */
  def getOption(key: String): Option[String]

  /**
   * Resets the configuration property identified by `key`.
   *
   * @param key
   *   name of the configuration property
   * @since 2.0.0
   */
  def unset(key: String): Unit

  /**
   * Tells whether the configuration property identified by `key` can be modified in the current
   * session.
   *
   * @param key
   *   name of the configuration property
   * @return
   *   `true` if the configuration property is modifiable. For static SQL, Spark Core, invalid
   *   (not existing) and other non-modifiable configuration properties, the returned value is
   *   `false`.
   * @since 2.4.0
   */
  def isModifiable(key: String): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import _root_.java.net.URI
import _root_.java.util

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.{Encoder, Row, RuntimeConfig}
import org.apache.spark.sql.types.StructType

/**
Expand Down Expand Up @@ -58,6 +58,17 @@ abstract class SparkSession[DS[U] <: Dataset[U, DS]] extends Serializable with C
*/
def version: String

/**
* Runtime configuration interface for Spark.
*
* This is the interface through which the user can get and set all Spark and Hadoop
* configurations that are relevant to Spark SQL. When getting the value of a config, this
* defaults to the value set in the underlying `SparkContext`, if any.
*
* @since 2.0.0
*/
def conf: RuntimeConfig

/**
* A collection of methods for registering user-defined functions (UDF).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
* client, but rather enqueued in the response observer.
*/
private def enqueueProgressMessage(force: Boolean = false): Unit = {
if (executeHolder.sessionHolder.session.conf.get(CONNECT_PROGRESS_REPORT_INTERVAL) > 0) {
val progressReportInterval = executeHolder.sessionHolder.session.sessionState.conf
.getConf(CONNECT_PROGRESS_REPORT_INTERVAL)
if (progressReportInterval > 0) {
SparkConnectService.executionListener.foreach { listener =>
// It is possible, that the tracker is no longer available and in this
// case we simply ignore it and do not send any progress message. This avoids
Expand Down Expand Up @@ -240,8 +242,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
// monitor, and will notify upon state change.
if (response.isEmpty) {
// Wake up more frequently to send the progress updates.
val progressTimeout =
executeHolder.sessionHolder.session.conf.get(CONNECT_PROGRESS_REPORT_INTERVAL)
val progressTimeout = executeHolder.sessionHolder.session.sessionState.conf
.getConf(CONNECT_PROGRESS_REPORT_INTERVAL)
// If the progress feature is disabled, wait for the deadline.
val timeout = if (progressTimeout > 0) {
progressTimeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
*/
private[connect] def usePlanCache(rel: proto.Relation, cachePlan: Boolean)(
transform: proto.Relation => LogicalPlan): LogicalPlan = {
val planCacheEnabled =
Option(session).forall(_.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true))
val planCacheEnabled = Option(session)
.forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true))
// We only cache plans that have a plan ID.
val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ private[connect] object ErrorUtils extends Logging {
case _ =>
}

if (sessionHolderOpt.exists(_.session.conf.get(Connect.CONNECT_ENRICH_ERROR_ENABLED))) {
val enrichErrorEnabled = sessionHolderOpt.exists(
_.session.sessionState.conf.getConf(Connect.CONNECT_ENRICH_ERROR_ENABLED))
if (enrichErrorEnabled) {
// Generate a new unique key for this exception.
val errorId = UUID.randomUUID().toString

Expand All @@ -216,9 +218,10 @@ private[connect] object ErrorUtils extends Logging {
}

lazy val stackTrace = Option(ExceptionUtils.getStackTrace(st))
val stackTraceEnabled = sessionHolderOpt.exists(
_.session.sessionState.conf.getConf(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED))
val withStackTrace =
if (sessionHolderOpt.exists(
_.session.conf.get(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED) && stackTrace.nonEmpty)) {
if (stackTraceEnabled && stackTrace.nonEmpty) {
val maxSize = Math.min(
SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE),
maxMetadataSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
test("Test session plan cache - disabled") {
val sessionHolder = SparkConnectTestUtils.createDummySessionHolder(spark)
// Disable plan cache of the session
sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, false)
sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED.key, false)
val planner = new SparkConnectPlanner(sessionHolder)

val query = buildRelation("select 1")
Expand Down
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ class Dataset[T] private[sql](

@transient private[sql] val logicalPlan: LogicalPlan = {
val plan = queryExecution.commandExecuted
if (sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new HashSet[Long])
dsIds.add(id)
plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
Expand Down Expand Up @@ -772,7 +772,7 @@ class Dataset[T] private[sql](
private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = {
val newExpr = expr transform {
case a: AttributeReference
if sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) =>
if sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) =>
val metadata = new MetadataBuilder()
.withMetadata(a.metadata)
.putLong(Dataset.DATASET_ID_KEY, id)
Expand Down
Loading

0 comments on commit d50fc64

Please sign in to comment.