
[SPARK-50387][SS] Update condition for timer expiry and relevant test #48927

Open · wants to merge 4 commits into master
Conversation

anishshri-db
Contributor

What changes were proposed in this pull request?

Update condition for timer expiry and relevant test

Why are the changes needed?

To ensure that the expiry and removal conditions are consistent. This also avoids having to wait an extra microbatch to expire timers in certain cases.
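
The kind of inconsistency this PR addresses can be illustrated with a minimal standalone sketch (the function and variable names here are hypothetical, not the actual Spark internals): if timer expiry uses a strict comparison while a timer registered exactly at the boundary timestamp is otherwise eligible, that timer only fires in the following microbatch.

```python
# Hypothetical sketch of a strict-vs-inclusive timer expiry condition.
# Names are illustrative only; the real Spark code paths differ.

def expired_timers_strict(timers, batch_ts):
    # Strict comparison: a timer equal to batch_ts is NOT considered expired,
    # so it survives until the next microbatch.
    return [t for t in timers if t < batch_ts]

def expired_timers_inclusive(timers, batch_ts):
    # Inclusive comparison: a timer equal to batch_ts expires in this batch,
    # matching the removal condition.
    return [t for t in timers if t <= batch_ts]

timers = [100, 200, 300]
print(expired_timers_strict(timers, 200))     # [100]
print(expired_timers_inclusive(timers, 200))  # [100, 200]
```

With the strict condition, the timer registered at 200 needs one extra microbatch to fire; the inclusive condition fires it immediately, keeping expiry and removal aligned.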

Does this PR introduce any user-facing change?

Yes

How was this patch tested?

Added unit tests

[info] Run completed in 4 seconds, 638 milliseconds.
[info] Total number of tests run: 12
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

Was this patch authored or co-authored using generative AI tooling?

No

@anishshri-db anishshri-db changed the title [SPARK-50387] Update condition for timer expiry and relevant test [SPARK-50387][SS] Update condition for timer expiry and relevant test Nov 22, 2024
@anishshri-db
Contributor Author

cc - @HeartSaVioR - PTAL, thx !

Contributor

@HeartSaVioR HeartSaVioR left a comment


+1 pending CI. Nice finding.

@HeartSaVioR
Contributor

https://github.com/anishshri-db/spark/actions/runs/11964792826/job/33357779164
I guess we have PySpark tests being impacted by this?

@anishshri-db
Contributor Author

@HeartSaVioR - tried to run this locally and it fails both with and without my change. Let me merge back from master once more.

@anishshri-db
Contributor Author

@HeartSaVioR - verified on master and I see the same assert locally as well, so I don't think it's related to this change.

  File "/Users/anish.shrigondekar/spark/spark/python/pyspark/sql/utils.py", line 154, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/Users/anish.shrigondekar/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 549, in check_results
    assert set(batch_df.sort("id").collect()) == {Row(id="a", timestamp="20")}
AssertionError
 SQLSTATE: 39000

@jingz-db @bogao007 - is this a known issue? Do we have a PR to fix this already?

@bogao007
Contributor

bogao007 commented Nov 24, 2024

> @HeartSaVioR - verified on master and I see the same assert locally as well. So don't think its related to this change
>
>       File "/Users/anish.shrigondekar/spark/spark/python/pyspark/sql/utils.py", line 154, in call
>         self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
>       File "/Users/anish.shrigondekar/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 549, in check_results
>         assert set(batch_df.sort("id").collect()) == {Row(id="a", timestamp="20")}
>     AssertionError
>      SQLSTATE: 39000
>
> @jingz-db @bogao007 - is this a known issue ? do we have a PR to fix this already ?

I previously made a change to fix this issue: https://github.com/apache/spark/pull/48805/files. Maybe 2s is not enough. @jingz-db, could you help take a look?

@anishshri-db There might be an issue when running locally if it uses the local time zone, since the test uses UTC time. I needed to set the option spark.conf.set("spark.sql.session.timeZone", "UTC") to make it work locally. That's not an issue in CI, though.
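
The local-vs-UTC pitfall mentioned above can be reproduced without Spark: formatting the same epoch instant in different zones yields different strings, which is exactly why the test's string comparison is only stable once the session time zone is pinned. A small standalone sketch (the epoch value is illustrative):

```python
from datetime import datetime, timezone

# The same epoch instant renders differently depending on the zone used
# for formatting. A test that compares formatted timestamp strings will
# therefore pass or fail depending on the machine's local time zone
# unless the zone is pinned (in PySpark, via spark.sql.session.timeZone).
epoch_seconds = 20
utc_str = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime("%H:%M:%S")
local_str = datetime.fromtimestamp(epoch_seconds).strftime("%H:%M:%S")
print(utc_str)    # 00:00:20
print(local_str)  # differs from utc_str on any machine not set to UTC
```

Pinning the session time zone to UTC makes the formatted output deterministic across local runs and CI.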
