Skip to content

Commit

Permalink
[SPARK-43328][SS] Add latest timestamp on no-execution trigger for Id…
Browse files Browse the repository at this point in the history
…le event in streaming query listener

### What changes were proposed in this pull request?

This PR proposes to add the latest timestamp on no-execution trigger for QueryIdleEvent.

### Why are the changes needed?

This adds a "human readable" timestamp which is useful for user side verification as well as be consistent with other events.

### Does this PR introduce _any_ user-facing change?

Yes, but QueryIdleEvent is not released yet.

### How was this patch tested?

Existing tests.

Closes apache#41001 from HeartSaVioR/SPARK-43328.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
HeartSaVioR committed May 2, 2023
1 parent 08a12b6 commit ae6288b
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
8 changes: 8 additions & 0 deletions python/pyspark/sql/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class QueryIdleEvent:
def __init__(self, jevent: JavaObject) -> None:
self._id: uuid.UUID = uuid.UUID(jevent.id().toString())
self._runId: uuid.UUID = uuid.UUID(jevent.runId().toString())
self._timestamp: str = jevent.timestamp()

@property
def id(self) -> uuid.UUID:
Expand All @@ -245,6 +246,13 @@ def runId(self) -> uuid.UUID:
"""
return self._runId

@property
def timestamp(self) -> str:
"""
The timestamp when the latest no-batch trigger happened.
"""
return self._timestamp


class QueryTerminatedEvent:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,9 @@ trait ProgressReporter extends Logging {
}

private def postIdleness(): Unit = {
postEvent(new QueryIdleEvent(id, runId))
logInfo("Streaming query has been idle and waiting for new data.")
postEvent(new QueryIdleEvent(id, runId, formatTimestamp(currentTriggerStartTimestamp)))
logInfo(s"Streaming query has been idle and waiting for new data more than " +
s"${noDataProgressEventInterval} ms.")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,17 @@ object StreamingQueryListener {

/**
* Event representing that query is idle and waiting for new data to process.
*
* @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
* @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
* @param timestamp The timestamp when the latest no-batch trigger happened.
* @since 3.5.0
*/
@Evolving
class QueryIdleEvent private[sql](
val id: UUID,
val runId: UUID) extends Event
val runId: UUID,
val timestamp: String) extends Event

/**
* Event representing that termination of a query.
Expand Down

0 comments on commit ae6288b

Please sign in to comment.