From 84a65bd1cf95775c4210c6dac8026551fd9d150f Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Sat, 26 Aug 2023 21:51:29 -0700 Subject: [PATCH] [SPARK-42944][PYTHON][FOLLOW-UP] Rename tests from foreachBatch to foreach_batch ### What changes were proposed in this pull request? This PR proposes to rename tests from foreachBatch to foreach_batch. ### Why are the changes needed? Non-API should follow snake_naming rule per PEP 8. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? CI in this PR should test it out. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42675 from HyukjinKwon/pyspark-connect. Authored-by: Hyukjin Kwon Signed-off-by: Dongjoon Hyun --- .../planner/StreamingForeachBatchHelper.scala | 2 +- dev/sparktestsupport/modules.py | 4 +-- ...atch_worker.py => foreach_batch_worker.py} | 0 ...hBatch.py => test_parity_foreach_batch.py} | 12 ++++---- ...tch.py => test_streaming_foreach_batch.py} | 28 +++++++++---------- 5 files changed, 23 insertions(+), 23 deletions(-) rename python/pyspark/sql/connect/streaming/worker/{foreachBatch_worker.py => foreach_batch_worker.py} (100%) rename python/pyspark/sql/tests/connect/streaming/{test_parity_foreachBatch.py => test_parity_foreach_batch.py} (87%) rename python/pyspark/sql/tests/streaming/{test_streaming_foreachBatch.py => test_streaming_foreach_batch.py} (90%) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala index ef7195439f9cd..c30e08bc39dd8 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala @@ -108,7 +108,7 @@ object StreamingForeachBatchHelper extends Logging { pythonFn, connectUrl, sessionHolder.sessionId, - "pyspark.sql.connect.streaming.worker.foreachBatch_worker") + "pyspark.sql.connect.streaming.worker.foreach_batch_worker") val (dataOut, dataIn) = runner.init() val foreachBatchRunnerFn: FnArgsWithId => Unit = (args: FnArgsWithId) => { diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 3c018ac7c832f..741b89466beb9 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -497,7 +497,7 @@ def __hash__(self): "pyspark.sql.tests.test_session", "pyspark.sql.tests.streaming.test_streaming", "pyspark.sql.tests.streaming.test_streaming_foreach", - "pyspark.sql.tests.streaming.test_streaming_foreachBatch", + "pyspark.sql.tests.streaming.test_streaming_foreach_batch", "pyspark.sql.tests.streaming.test_streaming_listener", "pyspark.sql.tests.test_types", "pyspark.sql.tests.test_udf", @@ -866,7 +866,7 @@ def __hash__(self): "pyspark.sql.tests.connect.streaming.test_parity_streaming", "pyspark.sql.tests.connect.streaming.test_parity_listener", "pyspark.sql.tests.connect.streaming.test_parity_foreach", - "pyspark.sql.tests.connect.streaming.test_parity_foreachBatch", + "pyspark.sql.tests.connect.streaming.test_parity_foreach_batch", "pyspark.sql.tests.connect.test_parity_pandas_grouped_map_with_state", "pyspark.sql.tests.connect.test_parity_pandas_udf_scalar", "pyspark.sql.tests.connect.test_parity_pandas_udf_grouped_agg", diff --git a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py b/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py similarity index 100% rename from python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py rename to python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py diff --git a/python/pyspark/sql/tests/connect/streaming/test_parity_foreachBatch.py b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py similarity index 87% rename from python/pyspark/sql/tests/connect/streaming/test_parity_foreachBatch.py rename to python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py index 0718c6a88b0da..e4577173687d3 100644 --- a/python/pyspark/sql/tests/connect/streaming/test_parity_foreachBatch.py +++ b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py @@ -17,19 +17,19 @@ import unittest -from pyspark.sql.tests.streaming.test_streaming_foreachBatch import StreamingTestsForeachBatchMixin +from pyspark.sql.tests.streaming.test_streaming_foreach_batch import StreamingTestsForeachBatchMixin from pyspark.testing.connectutils import ReusedConnectTestCase from pyspark.errors import PySparkPicklingError class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, ReusedConnectTestCase): @unittest.skip("SPARK-44463: Error handling needs improvement in connect foreachBatch") - def test_streaming_foreachBatch_propagates_python_errors(self): - super().test_streaming_foreachBatch_propagates_python_errors + def test_streaming_foreach_batch_propagates_python_errors(self): + super().test_streaming_foreach_batch_propagates_python_errors() @unittest.skip("This seems specific to py4j and pinned threads. The intention is unclear") - def test_streaming_foreachBatch_graceful_stop(self): - super().test_streaming_foreachBatch_graceful_stop() + def test_streaming_foreach_batch_graceful_stop(self): + super().test_streaming_foreach_batch_graceful_stop() # class StreamingForeachBatchParityTests(ReusedConnectTestCase): def test_accessing_spark_session(self): @@ -63,7 +63,7 @@ def func(df, _): if __name__ == "__main__": import unittest - from pyspark.sql.tests.connect.streaming.test_parity_foreachBatch import * # noqa: F401,E501 + from pyspark.sql.tests.connect.streaming.test_parity_foreach_batch import * # noqa: F401,E501 try: import xmlrunner # type: ignore[import] diff --git a/python/pyspark/sql/tests/streaming/test_streaming_foreachBatch.py b/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py similarity index 90% rename from python/pyspark/sql/tests/streaming/test_streaming_foreachBatch.py rename to python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py index 65a0f6279fb08..af2831ef193a7 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming_foreachBatch.py +++ b/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py @@ -25,7 +25,7 @@ def my_test_function_1(): class StreamingTestsForeachBatchMixin: - def test_streaming_foreachBatch(self): + def test_streaming_foreach_batch(self): q = None def collectBatch(batch_df, batch_id): @@ -41,7 +41,7 @@ def collectBatch(batch_df, batch_id): if q: q.stop() - def test_streaming_foreachBatch_tempview(self): + def test_streaming_foreach_batch_tempview(self): q = None def collectBatch(batch_df, batch_id): @@ -63,7 +63,7 @@ def collectBatch(batch_df, batch_id): if q: q.stop() - def test_streaming_foreachBatch_propagates_python_errors(self): + def test_streaming_foreach_batch_propagates_python_errors(self): from pyspark.errors import StreamingQueryException q = None @@ -82,7 +82,7 @@ def collectBatch(df, id): if q: q.stop() - def test_streaming_foreachBatch_graceful_stop(self): + def test_streaming_foreach_batch_graceful_stop(self): # SPARK-39218: Make foreachBatch streaming query stop gracefully def func(batch_df, _): batch_df.sparkSession._jvm.java.lang.Thread.sleep(10000) @@ -92,8 +92,8 @@ def func(batch_df, _): q.stop() self.assertIsNone(q.exception(), "No exception has to be propagated.") - def test_streaming_foreachBatch_spark_session(self): - table_name = "testTable_foreachBatch" + def test_streaming_foreach_batch_spark_session(self): + table_name = "testTable_foreach_batch" def func(df: DataFrame, batch_id: int): if batch_id > 0: # only process once @@ -115,8 +115,8 @@ def func(df: DataFrame, batch_id: int): ) self.assertEqual(sorted(df.collect()), sorted(actual.collect())) - def test_streaming_foreachBatch_path_access(self): - table_name = "testTable_foreachBatch_path" + def test_streaming_foreach_batch_path_access(self): + table_name = "testTable_foreach_batch_path" def func(df: DataFrame, batch_id: int): if batch_id > 0: # only process once @@ -141,11 +141,11 @@ def func(df: DataFrame, batch_id: int): def my_test_function_2(): return 2 - def test_streaming_foreachBatch_fuction_calling(self): + def test_streaming_foreach_batch_fuction_calling(self): def my_test_function_3(): return 3 - table_name = "testTable_foreachBatch_function" + table_name = "testTable_foreach_batch_function" def func(df: DataFrame, batch_id: int): if batch_id > 0: # only process once @@ -175,10 +175,10 @@ def func(df: DataFrame, batch_id: int): ) self.assertEqual(sorted(df.collect()), sorted(actual.collect())) - def test_streaming_foreachBatch_import(self): - import time # not imported in foreachBatch_worker + def test_streaming_foreach_batch_import(self): + import time # not imported in foreach_batch_worker - table_name = "testTable_foreachBatch_import" + table_name = "testTable_foreach_batch_import" def func(df: DataFrame, batch_id: int): if batch_id > 0: # only process once @@ -204,7 +204,7 @@ class StreamingTestsForeachBatch(StreamingTestsForeachBatchMixin, ReusedSQLTestC if __name__ == "__main__": import unittest - from pyspark.sql.tests.streaming.test_streaming_foreachBatch import * # noqa: F401 + from pyspark.sql.tests.streaming.test_streaming_foreach_batch import * # noqa: F401 try: import xmlrunner