Merge branch 'master' into SPARK-49618
ashahid committed Sep 14, 2024
2 parents 5c02cb3 + df0e34c commit 82b8cbe
Showing 48 changed files with 7,308 additions and 167 deletions.
10 changes: 8 additions & 2 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -1045,6 +1045,12 @@
],
"sqlState" : "42710"
},
"DATA_SOURCE_EXTERNAL_ERROR" : {
"message" : [
"Encountered error when saving to external data source."
],
"sqlState" : "KD00F"
},
"DATA_SOURCE_NOT_EXIST" : {
"message" : [
"Data source '<provider>' not found. Please make sure the data source is registered."
@@ -3118,12 +3124,12 @@
"subClass" : {
"NOT_ALLOWED_IN_SCOPE" : {
"message" : [
"Variable <varName> was declared on line <lineNumber>, which is not allowed in this scope."
"Declaration of the variable <varName> is not allowed in this scope."
]
},
"ONLY_AT_BEGINNING" : {
"message" : [
"Variable <varName> can only be declared at the beginning of the compound, but it was declared on line <lineNumber>."
"Variable <varName> can only be declared at the beginning of the compound."
]
}
},
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-states.json
@@ -7417,6 +7417,12 @@
"standard": "N",
"usedBy": ["Databricks"]
},
"KD00F": {
"description": "external data source failure",
"origin": "Databricks",
"standard": "N",
"usedBy": ["Databricks"]
},
"P0000": {
"description": "procedural logic error",
"origin": "PostgreSQL",
@@ -77,8 +77,19 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
s"""CREATE TABLE pattern_testing_table (
|pattern_testing_col LONGTEXT
|)
""".stripMargin
|""".stripMargin
).executeUpdate()
connection.prepareStatement(
"CREATE TABLE datetime (name VARCHAR(32), date1 DATE, time1 TIMESTAMP)")
.executeUpdate()
}

override def dataPreparation(connection: Connection): Unit = {
super.dataPreparation(connection)
connection.prepareStatement("INSERT INTO datetime VALUES " +
"('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate()
connection.prepareStatement("INSERT INTO datetime VALUES " +
"('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate()
}

override def testUpdateColumnType(tbl: String): Unit = {
@@ -157,6 +168,79 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
assert(sql(s"SELECT char_length(c1) from $tableName").head().get(0) === 65536)
}
}

override def testDatetime(tbl: String): Unit = {
val df1 = sql(s"SELECT name FROM $tbl WHERE " +
"dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ")
checkFilterPushed(df1)
val rows1 = df1.collect()
assert(rows1.length === 2)
assert(rows1(0).getString(0) === "amy")
assert(rows1(1).getString(0) === "alex")

val df2 = sql(s"SELECT name FROM $tbl WHERE year(date1) = 2022 AND quarter(date1) = 2")
checkFilterPushed(df2)
val rows2 = df2.collect()
assert(rows2.length === 2)
assert(rows2(0).getString(0) === "amy")
assert(rows2(1).getString(0) === "alex")

val df3 = sql(s"SELECT name FROM $tbl WHERE second(time1) = 0 AND month(date1) = 5")
checkFilterPushed(df3)
val rows3 = df3.collect()
assert(rows3.length === 2)
assert(rows3(0).getString(0) === "amy")
assert(rows3(1).getString(0) === "alex")

val df4 = sql(s"SELECT name FROM $tbl WHERE hour(time1) = 0 AND minute(time1) = 0")
checkFilterPushed(df4)
val rows4 = df4.collect()
assert(rows4.length === 2)
assert(rows4(0).getString(0) === "amy")
assert(rows4(1).getString(0) === "alex")

val df5 = sql(s"SELECT name FROM $tbl WHERE " +
"extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
checkFilterPushed(df5)
val rows5 = df5.collect()
assert(rows5.length === 2)
assert(rows5(0).getString(0) === "amy")
assert(rows5(1).getString(0) === "alex")

val df6 = sql(s"SELECT name FROM $tbl WHERE date_add(date1, 1) = date'2022-05-20' " +
"AND datediff(date1, '2022-05-10') > 0")
checkFilterPushed(df6)
val rows6 = df6.collect()
assert(rows6.length === 1)
assert(rows6(0).getString(0) === "amy")

val df7 = sql(s"SELECT name FROM $tbl WHERE weekday(date1) = 2")
checkFilterPushed(df7)
val rows7 = df7.collect()
assert(rows7.length === 1)
assert(rows7(0).getString(0) === "alex")

val df8 = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = 4")
checkFilterPushed(df8)
val rows8 = df8.collect()
assert(rows8.length === 1)
assert(rows8(0).getString(0) === "alex")

val df9 = sql(s"SELECT name FROM $tbl WHERE " +
"dayofyear(date1) > 100 order by dayofyear(date1) limit 1")
checkFilterPushed(df9)
val rows9 = df9.collect()
assert(rows9.length === 1)
assert(rows9(0).getString(0) === "alex")

// MySQL does not support trunc(date, 'week'), so this filter is not expected to be pushed down
val df10 = sql(s"SELECT name FROM $tbl WHERE trunc(date1, 'week') = date'2022-05-16'")
checkFilterPushed(df10, false)
val rows10 = df10.collect()
assert(rows10.length === 2)
assert(rows10(0).getString(0) === "amy")
assert(rows10(1).getString(0) === "alex")
}
}

/**
@@ -353,7 +353,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
}
}

private def checkFilterPushed(df: DataFrame, pushed: Boolean = true): Unit = {
protected def checkFilterPushed(df: DataFrame, pushed: Boolean = true): Unit = {
val filter = df.queryExecution.optimizedPlan.collect {
case f: Filter => f
}
@@ -980,4 +980,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
)
}
}

def testDatetime(tbl: String): Unit = {}

test("scan with filter push-down with date time functions") {
testDatetime(s"$catalogAndNamespace.${caseConvert("datetime")}")
}
}
@@ -27,7 +27,10 @@ trait SharedSparkContext extends BeforeAndAfterAll with BeforeAndAfterEach { sel

def sc: SparkContext = _sc

val conf = new SparkConf(false)
// SPARK-49647: use `SparkConf()` instead of `SparkConf(false)` because we want to
// load defaults from system properties and the classpath, including default test
// settings specified in the SBT and Maven build definitions.
val conf: SparkConf = new SparkConf()

/**
* Initialize the [[SparkContext]]. Generally, this is just called from beforeAll; however, in
2 changes: 1 addition & 1 deletion dev/create-release/spark-rm/Dockerfile
@@ -94,7 +94,7 @@ ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library
RUN add-apt-repository ppa:pypy/ppa
RUN mkdir -p /usr/local/pypy/pypy3.9 && \
curl -sqL https://downloads.python.org/pypy/pypy3.9-v7.3.16-linux64.tar.bz2 | tar xjf - -C /usr/local/pypy/pypy3.9 --strip-components=1 && \
ln -sf /usr/local/pypy/pypy3.9/bin/pypy /usr/local/bin/pypy3.8 && \
ln -sf /usr/local/pypy/pypy3.9/bin/pypy /usr/local/bin/pypy3.9 && \
ln -sf /usr/local/pypy/pypy3.9/bin/pypy /usr/local/bin/pypy3
RUN curl -sS https://bootstrap.pypa.io/get-pip.py | pypy3
RUN pypy3 -m pip install numpy 'six==1.16.0' 'pandas==2.2.2' scipy coverage matplotlib lxml
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -4,7 +4,7 @@ JTransforms/3.1//JTransforms-3.1.jar
RoaringBitmap/1.2.1//RoaringBitmap-1.2.1.jar
ST4/4.0.4//ST4-4.0.4.jar
activation/1.1.1//activation-1.1.1.jar
aircompressor/0.27//aircompressor-0.27.jar
aircompressor/2.0.2//aircompressor-2.0.2.jar
algebra_2.13/2.8.0//algebra_2.13-2.8.0.jar
aliyun-java-sdk-core/4.5.10//aliyun-java-sdk-core-4.5.10.jar
aliyun-java-sdk-kms/2.11.0//aliyun-java-sdk-kms-2.11.0.jar
2 changes: 1 addition & 1 deletion dev/infra/Dockerfile
@@ -88,7 +88,7 @@ ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library
RUN add-apt-repository ppa:pypy/ppa
RUN mkdir -p /usr/local/pypy/pypy3.9 && \
curl -sqL https://downloads.python.org/pypy/pypy3.9-v7.3.16-linux64.tar.bz2 | tar xjf - -C /usr/local/pypy/pypy3.9 --strip-components=1 && \
ln -sf /usr/local/pypy/pypy3.9/bin/pypy /usr/local/bin/pypy3.8 && \
ln -sf /usr/local/pypy/pypy3.9/bin/pypy /usr/local/bin/pypy3.9 && \
ln -sf /usr/local/pypy/pypy3.9/bin/pypy /usr/local/bin/pypy3
RUN curl -sS https://bootstrap.pypa.io/get-pip.py | pypy3
RUN pypy3 -m pip install 'numpy==1.26.4' 'six==1.16.0' 'pandas==2.2.2' scipy coverage matplotlib lxml
4 changes: 4 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -548,6 +548,8 @@ def __hash__(self):
"pyspark.sql.tests.test_udtf",
"pyspark.sql.tests.test_utils",
"pyspark.sql.tests.test_resources",
"pyspark.sql.tests.plot.test_frame_plot",
"pyspark.sql.tests.plot.test_frame_plot_plotly",
],
)

Expand Down Expand Up @@ -1051,6 +1053,8 @@ def __hash__(self):
"pyspark.sql.tests.connect.test_parity_arrow_cogrouped_map",
"pyspark.sql.tests.connect.test_parity_python_datasource",
"pyspark.sql.tests.connect.test_parity_python_streaming_datasource",
"pyspark.sql.tests.connect.test_parity_frame_plot",
"pyspark.sql.tests.connect.test_parity_frame_plot_plotly",
"pyspark.sql.tests.connect.test_utils",
"pyspark.sql.tests.connect.client.test_artifact",
"pyspark.sql.tests.connect.client.test_artifact_localcluster",
9 changes: 8 additions & 1 deletion docs/security.md
@@ -55,7 +55,8 @@ To enable authorization, Spark Master should have
`spark.master.rest.filters=org.apache.spark.ui.JWSFilter` and
`spark.org.apache.spark.ui.JWSFilter.param.secretKey=BASE64URL-ENCODED-KEY` configurations, and
client should provide HTTP `Authorization` header which contains JSON Web Token signed by
the shared secret key.
the shared secret key. Please note that this feature requires a Spark distribution built with
`jjwt` profile.

### YARN

Expand Down Expand Up @@ -813,6 +814,12 @@ They are generally private services, and should only be accessible within the ne
organization that deploys Spark. Access to the hosts and ports used by Spark services should
be limited to origin hosts that need to access the services.

However, like the REST Submission port, Spark also supports HTTP `Authorization` header
with a cryptographically signed JSON Web Token (JWT) for all UI ports.
To use it, a user needs the Spark distribution built with `jjwt` profile and to configure
`spark.ui.filters=org.apache.spark.ui.JWSFilter` and
`spark.org.apache.spark.ui.JWSFilter.param.secretKey=BASE64URL-ENCODED-KEY`.

Below are the primary ports that Spark uses for its communication and how to
configure those ports.

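
A hedged illustration of the JWSFilter notes above, not taken from the diff itself: assuming a Spark distribution built with the `jjwt` profile, the same pair of properties documented for the REST submission port can be applied to the UI ports, for example through a PySpark `SparkConf`. The property names come from the documentation change above; the surrounding code is an assumed sketch.

# Assumed sketch: enable JWS-based authorization for the Spark UI ports,
# using the property names from the security.md change above.
from pyspark import SparkConf

conf = (
    SparkConf()
    .set("spark.ui.filters", "org.apache.spark.ui.JWSFilter")
    .set("spark.org.apache.spark.ui.JWSFilter.param.secretKey", "<BASE64URL-ENCODED-KEY>")
)
# Clients must then send an HTTP `Authorization` header carrying a JSON Web Token
# signed with the same shared secret key.
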
1 change: 1 addition & 0 deletions docs/sql-migration-guide.md
@@ -60,6 +60,7 @@ license: |
- Since Spark 4.0, By default views tolerate column type changes in the query and compensate with casts. To restore the previous behavior, allowing up-casts only, set `spark.sql.legacy.viewSchemaCompensation` to `false`.
- Since Spark 4.0, Views allow control over how they react to underlying query changes. By default views tolerate column type changes in the query and compensate with casts. To disable this feature set `spark.sql.legacy.viewSchemaBindingMode` to `false`. This also removes the clause from `DESCRIBE EXTENDED` and `SHOW CREATE TABLE`.
- Since Spark 4.0, The Storage-Partitioned Join feature flag `spark.sql.sources.v2.bucketing.pushPartValues.enabled` is set to `true`. To restore the previous behavior, set `spark.sql.sources.v2.bucketing.pushPartValues.enabled` to `false`.
- Since Spark 4.0, the `sentences` function uses `Locale(language)` instead of `Locale.US` when `language` parameter is not `NULL` and `country` parameter is `NULL`.

## Upgrading from Spark SQL 3.5.1 to 3.5.2

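
A hedged sketch of the `sentences` migration note above, assuming an active SparkSession named `spark`; the input text and the expected tokenization are illustrative assumptions, and only the locale-resolution behavior is taken from the note.

# Assumed usage sketch: with a non-NULL language and a NULL country, Spark 4.0
# resolves the locale as Locale(language) instead of Locale.US.
from pyspark.sql import functions as F

df = spark.createDataFrame([("Hi there! Good morning.",)], ["text"])
df.select(F.sentences("text", F.lit("en"))).show(truncate=False)
# Expected shape of the result: [[Hi, there], [Good, morning]]
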
11 changes: 11 additions & 0 deletions licenses-binary/LICENSE-xz.txt
@@ -0,0 +1,11 @@
Permission to use, copy, modify, and/or distribute this
software for any purpose with or without fee is hereby granted.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
2 changes: 1 addition & 1 deletion pom.xml
@@ -2634,7 +2634,7 @@
<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
<version>0.27</version>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
5 changes: 5 additions & 0 deletions python/pyspark/errors/error-conditions.json
@@ -1088,6 +1088,11 @@
"Function `<func_name>` should use only POSITIONAL or POSITIONAL OR KEYWORD arguments."
]
},
"UNSUPPORTED_PLOT_BACKEND": {
"message": [
"`<backend>` is not supported, it should be one of the values from <supported_backends>"
]
},
"UNSUPPORTED_SIGNATURE": {
"message": [
"Unsupported signature: <signature>."
5 changes: 5 additions & 0 deletions python/pyspark/sql/classic/dataframe.py
@@ -58,6 +58,7 @@
from pyspark.sql.classic.column import _to_seq, _to_list, _to_java_column
from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2
from pyspark.sql.merge import MergeIntoWriter
from pyspark.sql.plot import PySparkPlotAccessor
from pyspark.sql.streaming import DataStreamWriter
from pyspark.sql.types import (
StructType,
@@ -1862,6 +1863,10 @@ def executionInfo(self) -> Optional["ExecutionInfo"]:
messageParameters={"member": "queryExecution"},
)

@property
def plot(self) -> PySparkPlotAccessor:
return PySparkPlotAccessor(self)


class DataFrameNaFunctions(ParentDataFrameNaFunctions):
def __init__(self, df: ParentDataFrame):
5 changes: 5 additions & 0 deletions python/pyspark/sql/connect/dataframe.py
@@ -83,6 +83,7 @@
UnresolvedStar,
)
from pyspark.sql.connect.functions import builtin as F
from pyspark.sql.plot import PySparkPlotAccessor
from pyspark.sql.pandas.types import from_arrow_schema, to_arrow_schema
from pyspark.sql.pandas.functions import _validate_pandas_udf # type: ignore[attr-defined]

@@ -2239,6 +2240,10 @@ def rdd(self) -> "RDD[Row]":
def executionInfo(self) -> Optional["ExecutionInfo"]:
return self._execution_info

@property
def plot(self) -> PySparkPlotAccessor:
return PySparkPlotAccessor(self)


class DataFrameNaFunctions(ParentDataFrameNaFunctions):
def __init__(self, df: ParentDataFrame):
27 changes: 27 additions & 0 deletions python/pyspark/sql/dataframe.py
@@ -39,6 +39,7 @@
from pyspark.sql.column import Column
from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2
from pyspark.sql.merge import MergeIntoWriter
from pyspark.sql.plot import PySparkPlotAccessor
from pyspark.sql.streaming import DataStreamWriter
from pyspark.sql.types import StructType, Row
from pyspark.sql.utils import dispatch_df_method
@@ -6394,6 +6395,32 @@ def executionInfo(self) -> Optional["ExecutionInfo"]:
"""
...

@property
def plot(self) -> PySparkPlotAccessor:
"""
Returns a :class:`PySparkPlotAccessor` for plotting functions.
.. versionadded:: 4.0.0
Returns
-------
:class:`PySparkPlotAccessor`
Notes
-----
This API is experimental.
Examples
--------
>>> data = [("A", 10, 1.5), ("B", 30, 2.5), ("C", 20, 3.5)]
>>> columns = ["category", "int_val", "float_val"]
>>> df = spark.createDataFrame(data, columns)
>>> type(df.plot)
<class 'pyspark.sql.plot.core.PySparkPlotAccessor'>
>>> df.plot.line(x="category", y=["int_val", "float_val"]) # doctest: +SKIP
"""
...


class DataFrameNaFunctions:
"""Functionality for working with missing data in :class:`DataFrame`.
21 changes: 21 additions & 0 deletions python/pyspark/sql/plot/__init__.py
@@ -0,0 +1,21 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This package includes the plotting APIs for PySpark DataFrame.
"""
from pyspark.sql.plot.core import * # noqa: F403, F401
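
Pulling the plot-related changes above together (the new `plot` property on both the classic and Connect DataFrame plus this `pyspark.sql.plot` package), a minimal usage sketch that mirrors the docstring example, assuming an active SparkSession named `spark` and a plotting backend such as plotly installed:

# Minimal sketch of the new DataFrame.plot accessor added in this commit.
data = [("A", 10, 1.5), ("B", 30, 2.5), ("C", 20, 3.5)]
df = spark.createDataFrame(data, ["category", "int_val", "float_val"])

print(type(df.plot))  # <class 'pyspark.sql.plot.core.PySparkPlotAccessor'>
fig = df.plot.line(x="category", y=["int_val", "float_val"])  # assumed to return a plotly figure
fig.show()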