
[SPARK-44839][SS][CONNECT] Better Error Logging when user tries to serialize spark session #42594

Closed · 7 commits

Conversation

@WweiL (Contributor) commented Aug 21, 2023:

What changes were proposed in this pull request?

Add a new error with a detailed message for when a user tries to access the Spark session, or a DataFrame created with the local Spark session, inside foreach, foreachBatch, or a StreamingQueryListener in streaming Spark Connect.

Update: per the reviewers' request, added a new error class PySparkPicklingError, and moved UDTF_SERIALIZATION_ERROR to the new class.

Why are the changes needed?

Better error logging for the breaking change introduced in streaming Spark Connect.

Does this PR introduce any user-facing change?

Yes. Previously, users only saw this uninformative error when they accessed a local Spark session in their streaming Connect-related functions:

Traceback (most recent call last):
  File "/home/wei.liu/oss-spark/python/pyspark/serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/home/wei.liu/oss-spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/wei.liu/oss-spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread._local' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/wei.liu/oss-spark/python/pyspark/sql/connect/streaming/readwriter.py", line 508, in foreachBatch
    self._write_proto.foreach_batch.python_function.command = CloudPickleSerializer().dumps(
  File "/home/wei.liu/oss-spark/python/pyspark/serializers.py", line 469, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread._local' object

Now it is replaced with:

pyspark.errors.exceptions.base.PySparkPicklingError: [STREAMING_CONNECT_SERIALIZATION_ERROR] Cannot serialize the function `foreachBatch`. If you accessed the spark session, or a dataframe defined outside of the function, please be aware that they are not allowed in Spark Connect. For foreachBatch, please access the spark session using `df.sparkSession`, where `df` is the first parameter in your foreachBatch function. For StreamingQueryListener, please access the spark session using `self.spark`
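
For illustration, a minimal sketch of the anti-pattern that triggers this error and the fix the message recommends. The function bodies, the rate source, and the Connect endpoint are hypothetical; only the df.sparkSession access pattern comes from the error message itself.

from pyspark.sql import DataFrame, SparkSession

# Assumed Connect endpoint for the sketch.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

# Anti-pattern: `spark` is captured from the enclosing scope, so it gets
# pickled together with the function, and a Connect session cannot be
# serialized.
def bad_batch_fn(df: DataFrame, batch_id: int) -> None:
    spark.range(1).collect()

# Recommended pattern: take the session from the micro-batch DataFrame.
def good_batch_fn(df: DataFrame, batch_id: int) -> None:
    session = df.sparkSession
    session.range(1).collect()

query = (
    spark.readStream.format("rate").load()
    .writeStream.foreachBatch(good_batch_fn)
    .start()
)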

How was this patch tested?

Added unit tests.

Was this patch authored or co-authored using generative AI tooling?

No

@WweiL (Contributor, Author) commented Aug 21, 2023:

cc @rangadi @bogao007

Inline review thread on this code:

try:
    expr.command = CloudPickleSerializer().dumps(listener)
except pickle.PicklingError:
    raise PySparkRuntimeError(
Member:

@itholic do we need a dedicated error class for PicklingError? e.g., PySparkPicklingError?

Contributor:

Yes, I believe we need a new error class for this new type of user-facing error. Could you add a new PySpark error class representing pickle.PicklingError? See https://github.com/apache/spark/pull/40938/files as an example. I think we could also do it as a follow-up.
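
For context, a minimal sketch of what such an error class could look like, assuming it follows the existing convention of subclassing both PySparkException and the built-in exception it wraps (the actual definition in the PR may differ):

from pickle import PicklingError

from pyspark.errors.exceptions.base import PySparkException


class PySparkPicklingError(PySparkException, PicklingError):
    """
    Wrapper class for pickle.PicklingError to support error classes.
    """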

Contributor (Author):

Yes, sure, I could do that. Just to confirm: the ask is to define a new PySparkPicklingError and replace this PySparkRuntimeError with it, right?

Contributor:

> define a new PySparkPicklingError and replace this PySparkRuntimeError with it, right?

Correct :-)
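
Applied to the block under review, the change would look roughly like this sketch; the error class name matches the message shown in the PR description, while the wrapper function and the message parameters are assumptions:

import pickle

from pyspark.errors import PySparkPicklingError  # introduced in this PR
from pyspark.serializers import CloudPickleSerializer


def set_listener_command(expr, listener):  # hypothetical wrapper for illustration
    try:
        expr.command = CloudPickleSerializer().dumps(listener)
    except pickle.PicklingError:
        raise PySparkPicklingError(
            error_class="STREAMING_CONNECT_SERIALIZATION_ERROR",
            message_parameters={"name": "addListener"},  # assumed parameters
        )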

@itholic (Contributor) left a comment:

Otherwise looks good

python/pyspark/errors/error_classes.py (outdated review comment, resolved)
@WweiL (Contributor, Author) commented Aug 22, 2023:

CC @ueshin, I also changed the serialization error thrown by UDTF, ptal!
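Presumably the UDTF path changes in the same way, swapping the exception type while keeping the UDTF_SERIALIZATION_ERROR error class mentioned in the description; a hedged sketch, where the surrounding function and the message parameters are hypothetical:

import pickle

from pyspark.errors import PySparkPicklingError
from pyspark.serializers import CloudPickleSerializer


def serialize_udtf(udtf_obj, name):  # hypothetical wrapper for illustration
    try:
        return CloudPickleSerializer().dumps(udtf_obj)
    except pickle.PicklingError:
        raise PySparkPicklingError(
            error_class="UDTF_SERIALIZATION_ERROR",
            message_parameters={"name": name},  # assumed parameters
        )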

@ueshin (Member) commented Aug 22, 2023:

cc @allisonwang-db

@allisonwang-db (Contributor) left a comment:

The UDTF change looks good to me!

@ueshin (Member) left a comment:

LGTM, pending tests.

@WweiL (Contributor, Author) commented Aug 23, 2023:

@ueshin Can we have this merged? Thank you!

@HyukjinKwon (Member):

@WweiL seems like your PR conflicted with this PR :-). Should probably update.

@HyukjinKwon (Member):

Merged to master.
