Skip to content

Commit

Permalink
[SPARK-42944][PYTHON][FOLLOW-UP] Rename tests from foreachBatch to foreach_batch
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

This PR proposes to rename tests from foreachBatch to foreach_batch.

### Why are the changes needed?

Non-API names (such as test modules and test methods) should follow the snake_case naming convention per PEP 8.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

The CI run for this PR should exercise the renamed test modules and worker.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #42675 from HyukjinKwon/pyspark-connect.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
HyukjinKwon authored and dongjoon-hyun committed Aug 27, 2023
1 parent 9326615 commit 84a65bd
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ object StreamingForeachBatchHelper extends Logging {
pythonFn,
connectUrl,
sessionHolder.sessionId,
"pyspark.sql.connect.streaming.worker.foreachBatch_worker")
"pyspark.sql.connect.streaming.worker.foreach_batch_worker")
val (dataOut, dataIn) = runner.init()

val foreachBatchRunnerFn: FnArgsWithId => Unit = (args: FnArgsWithId) => {
Expand Down
4 changes: 2 additions & 2 deletions dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ def __hash__(self):
"pyspark.sql.tests.test_session",
"pyspark.sql.tests.streaming.test_streaming",
"pyspark.sql.tests.streaming.test_streaming_foreach",
"pyspark.sql.tests.streaming.test_streaming_foreachBatch",
"pyspark.sql.tests.streaming.test_streaming_foreach_batch",
"pyspark.sql.tests.streaming.test_streaming_listener",
"pyspark.sql.tests.test_types",
"pyspark.sql.tests.test_udf",
Expand Down Expand Up @@ -866,7 +866,7 @@ def __hash__(self):
"pyspark.sql.tests.connect.streaming.test_parity_streaming",
"pyspark.sql.tests.connect.streaming.test_parity_listener",
"pyspark.sql.tests.connect.streaming.test_parity_foreach",
"pyspark.sql.tests.connect.streaming.test_parity_foreachBatch",
"pyspark.sql.tests.connect.streaming.test_parity_foreach_batch",
"pyspark.sql.tests.connect.test_parity_pandas_grouped_map_with_state",
"pyspark.sql.tests.connect.test_parity_pandas_udf_scalar",
"pyspark.sql.tests.connect.test_parity_pandas_udf_grouped_agg",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@

import unittest

from pyspark.sql.tests.streaming.test_streaming_foreachBatch import StreamingTestsForeachBatchMixin
from pyspark.sql.tests.streaming.test_streaming_foreach_batch import StreamingTestsForeachBatchMixin
from pyspark.testing.connectutils import ReusedConnectTestCase
from pyspark.errors import PySparkPicklingError


class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, ReusedConnectTestCase):
@unittest.skip("SPARK-44463: Error handling needs improvement in connect foreachBatch")
def test_streaming_foreachBatch_propagates_python_errors(self):
super().test_streaming_foreachBatch_propagates_python_errors
def test_streaming_foreach_batch_propagates_python_errors(self):
super().test_streaming_foreach_batch_propagates_python_errors()

@unittest.skip("This seems specific to py4j and pinned threads. The intention is unclear")
def test_streaming_foreachBatch_graceful_stop(self):
super().test_streaming_foreachBatch_graceful_stop()
def test_streaming_foreach_batch_graceful_stop(self):
super().test_streaming_foreach_batch_graceful_stop()

# class StreamingForeachBatchParityTests(ReusedConnectTestCase):
def test_accessing_spark_session(self):
Expand Down Expand Up @@ -63,7 +63,7 @@ def func(df, _):

if __name__ == "__main__":
import unittest
from pyspark.sql.tests.connect.streaming.test_parity_foreachBatch import * # noqa: F401,E501
from pyspark.sql.tests.connect.streaming.test_parity_foreach_batch import * # noqa: F401,E501

try:
import xmlrunner # type: ignore[import]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def my_test_function_1():


class StreamingTestsForeachBatchMixin:
def test_streaming_foreachBatch(self):
def test_streaming_foreach_batch(self):
q = None

def collectBatch(batch_df, batch_id):
Expand All @@ -41,7 +41,7 @@ def collectBatch(batch_df, batch_id):
if q:
q.stop()

def test_streaming_foreachBatch_tempview(self):
def test_streaming_foreach_batch_tempview(self):
q = None

def collectBatch(batch_df, batch_id):
Expand All @@ -63,7 +63,7 @@ def collectBatch(batch_df, batch_id):
if q:
q.stop()

def test_streaming_foreachBatch_propagates_python_errors(self):
def test_streaming_foreach_batch_propagates_python_errors(self):
from pyspark.errors import StreamingQueryException

q = None
Expand All @@ -82,7 +82,7 @@ def collectBatch(df, id):
if q:
q.stop()

def test_streaming_foreachBatch_graceful_stop(self):
def test_streaming_foreach_batch_graceful_stop(self):
# SPARK-39218: Make foreachBatch streaming query stop gracefully
def func(batch_df, _):
batch_df.sparkSession._jvm.java.lang.Thread.sleep(10000)
Expand All @@ -92,8 +92,8 @@ def func(batch_df, _):
q.stop()
self.assertIsNone(q.exception(), "No exception has to be propagated.")

def test_streaming_foreachBatch_spark_session(self):
table_name = "testTable_foreachBatch"
def test_streaming_foreach_batch_spark_session(self):
table_name = "testTable_foreach_batch"

def func(df: DataFrame, batch_id: int):
if batch_id > 0: # only process once
Expand All @@ -115,8 +115,8 @@ def func(df: DataFrame, batch_id: int):
)
self.assertEqual(sorted(df.collect()), sorted(actual.collect()))

def test_streaming_foreachBatch_path_access(self):
table_name = "testTable_foreachBatch_path"
def test_streaming_foreach_batch_path_access(self):
table_name = "testTable_foreach_batch_path"

def func(df: DataFrame, batch_id: int):
if batch_id > 0: # only process once
Expand All @@ -141,11 +141,11 @@ def func(df: DataFrame, batch_id: int):
def my_test_function_2():
return 2

def test_streaming_foreachBatch_fuction_calling(self):
def test_streaming_foreach_batch_fuction_calling(self):
def my_test_function_3():
return 3

table_name = "testTable_foreachBatch_function"
table_name = "testTable_foreach_batch_function"

def func(df: DataFrame, batch_id: int):
if batch_id > 0: # only process once
Expand Down Expand Up @@ -175,10 +175,10 @@ def func(df: DataFrame, batch_id: int):
)
self.assertEqual(sorted(df.collect()), sorted(actual.collect()))

def test_streaming_foreachBatch_import(self):
import time # not imported in foreachBatch_worker
def test_streaming_foreach_batch_import(self):
import time # not imported in foreach_batch_worker

table_name = "testTable_foreachBatch_import"
table_name = "testTable_foreach_batch_import"

def func(df: DataFrame, batch_id: int):
if batch_id > 0: # only process once
Expand All @@ -204,7 +204,7 @@ class StreamingTestsForeachBatch(StreamingTestsForeachBatchMixin, ReusedSQLTestC

if __name__ == "__main__":
import unittest
from pyspark.sql.tests.streaming.test_streaming_foreachBatch import * # noqa: F401
from pyspark.sql.tests.streaming.test_streaming_foreach_batch import * # noqa: F401

try:
import xmlrunner
Expand Down

0 comments on commit 84a65bd

Please sign in to comment.