From ae6288bdf8b08e43c0b20a823f23750636a985f4 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 2 May 2023 14:51:27 +0900 Subject: [PATCH] [SPARK-43328][SS] Add latest timestamp on no-execution trigger for Idle event in streaming query listener ### What changes were proposed in this pull request? This PR proposes to add the latest timestamp on no-execution trigger for QueryIdleEvent. ### Why are the changes needed? This adds a "human readable" timestamp which is useful for user side verification as well as be consistent with other events. ### Does this PR introduce _any_ user-facing change? Yes, but QueryIdleEvent is not released yet. ### How was this patch tested? Existing tests. Closes #41001 from HeartSaVioR/SPARK-43328. Authored-by: Jungtaek Lim Signed-off-by: Jungtaek Lim --- python/pyspark/sql/streaming/listener.py | 8 ++++++++ .../spark/sql/execution/streaming/ProgressReporter.scala | 5 +++-- .../spark/sql/streaming/StreamingQueryListener.scala | 7 ++++++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/streaming/listener.py b/python/pyspark/sql/streaming/listener.py index 83d866dcd492a..5fdcab3dfaf21 100644 --- a/python/pyspark/sql/streaming/listener.py +++ b/python/pyspark/sql/streaming/listener.py @@ -228,6 +228,7 @@ class QueryIdleEvent: def __init__(self, jevent: JavaObject) -> None: self._id: uuid.UUID = uuid.UUID(jevent.id().toString()) self._runId: uuid.UUID = uuid.UUID(jevent.runId().toString()) + self._timestamp: str = jevent.timestamp() @property def id(self) -> uuid.UUID: @@ -245,6 +246,13 @@ def runId(self) -> uuid.UUID: """ return self._runId + @property + def timestamp(self) -> str: + """ + The timestamp when the latest no-batch trigger happened. + """ + return self._timestamp + class QueryTerminatedEvent: """ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index aad4a29d85faa..6dbecd186dc64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -154,8 +154,9 @@ trait ProgressReporter extends Logging { } private def postIdleness(): Unit = { - postEvent(new QueryIdleEvent(id, runId)) - logInfo("Streaming query has been idle and waiting for new data.") + postEvent(new QueryIdleEvent(id, runId, formatTimestamp(currentTriggerStartTimestamp))) + logInfo(s"Streaming query has been idle and waiting for new data more than " + + s"${noDataProgressEventInterval} ms.") } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index 7ffeee60cc89b..aec5afe24c9e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -135,12 +135,17 @@ object StreamingQueryListener { /** * Event representing that query is idle and waiting for new data to process. + * + * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`. + * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. + * @param timestamp The timestamp when the latest no-batch trigger happened. * @since 3.5.0 */ @Evolving class QueryIdleEvent private[sql]( val id: UUID, - val runId: UUID) extends Event + val runId: UUID, + val timestamp: String) extends Event /** * Event representing that termination of a query.