Skip to content

Commit

Permalink
[SPARK-50387][SS] Update condition for timer expiry and relevant test
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Update condition for timer expiry and relevant test

### Why are the changes needed?
To ensure that the expiry and removal conditions are consistent. Also, we don't have to wait for an extra microbatch to expire timers in certain cases.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Added unit tests

```
[info] Run completed in 4 seconds, 638 milliseconds.
[info] Total number of tests run: 12
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48927 from anishshri-db/task/SPARK-50387.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
anishshri-db authored and HeartSaVioR committed Nov 26, 2024
1 parent 331d0bf commit 69d433b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,11 @@ def check_results(batch_df, batch_id):
Row(id="a-expired", timestamp="0"),
}
else:
# watermark has not progressed, so timer registered in batch 1(watermark = 10)
# has not yet expired
assert set(batch_df.sort("id").collect()) == {Row(id="a", timestamp="15")}
# verify that rows and expired timer produce the expected result
assert set(batch_df.sort("id").collect()) == {
Row(id="a", timestamp="15"),
Row(id="a-expired", timestamp="10000"),
}

self._test_transform_with_state_in_pandas_event_time(
EventTimeStatefulProcessor(), check_results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class TimerStateImpl(
val rowPair = iter.next()
val keyRow = rowPair.key
val result = getTimerRowFromSecIndex(keyRow)
if (result._2 < expiryTimestampMs) {
if (result._2 <= expiryTimestampMs) {
result
} else {
finished = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ class TimerSuite extends StateVariableSuiteBase {
assert(timerState1.listTimers().toSet === Set(15000L, 1000L))
assert(timerState1.getExpiredTimers(Long.MaxValue).toSeq ===
Seq(("test_key", 1000L), ("test_key", 15000L)))
// if timestamp equals to expiryTimestampsMs, will not considered expired
assert(timerState1.getExpiredTimers(15000L).toSeq === Seq(("test_key", 1000L)))
// if timestamp equals to expiryTimestampsMs, it will be considered expired
assert(timerState1.getExpiredTimers(15000L).toSeq ===
Seq(("test_key", 1000L), ("test_key", 15000L)))
assert(timerState1.listTimers().toSet === Set(15000L, 1000L))

timerState1.registerTimer(20L * 1000)
Expand Down Expand Up @@ -128,7 +129,7 @@ class TimerSuite extends StateVariableSuiteBase {
timerTimerstamps.foreach(timerState.registerTimer)
assert(timerState.getExpiredTimers(Long.MaxValue).toSeq.map(_._2) === timerTimerstamps.sorted)
assert(timerState.getExpiredTimers(4200L).toSeq.map(_._2) ===
timerTimerstamps.sorted.takeWhile(_ < 4200L))
timerTimerstamps.sorted.takeWhile(_ <= 4200L))
assert(timerState.getExpiredTimers(Long.MinValue).toSeq === Seq.empty)
ImplicitGroupingKeyTracker.removeImplicitKey()
}
Expand Down Expand Up @@ -162,7 +163,7 @@ class TimerSuite extends StateVariableSuiteBase {
(timerTimestamps1 ++ timerTimestamps2 ++ timerTimerStamps3).sorted)
assert(timerState1.getExpiredTimers(Long.MinValue).toSeq === Seq.empty)
assert(timerState1.getExpiredTimers(8000L).toSeq.map(_._2) ===
(timerTimestamps1 ++ timerTimestamps2 ++ timerTimerStamps3).sorted.takeWhile(_ < 8000L))
(timerTimestamps1 ++ timerTimestamps2 ++ timerTimerStamps3).sorted.takeWhile(_ <= 8000L))
}
}

Expand Down

0 comments on commit 69d433b

Please sign in to comment.