[SPARK-45794][SS] Introduce state metadata source to query the streaming state metadata information #43660
Conversation
@HeartSaVioR PTAL, thanks!
Only minor comments. Maybe the important part is how/where to document this, probably along with the state data source once it is merged. We could file a separate ticket for it.
```scala
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
  () => {
    assert(options.containsKey("path"), "Must specify checkpoint path to read state metadata")
```
We should throw an IllegalArgumentException or a proper error class. Let's do the former, and we can apply error classes to both the state data source and the state metadata data source together.
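A minimal sketch of the suggested validation, based only on the snippet above; the object and helper names are hypothetical and not part of the PR, and the option key and message are taken from the assert being replaced.

```scala
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hedged sketch: fail with IllegalArgumentException instead of assert when
// the required "path" option is missing. Helper/object names are illustrative.
object StateMetadataOptions {
  def requireCheckpointPath(options: CaseInsensitiveStringMap): String = {
    if (!options.containsKey("path")) {
      throw new IllegalArgumentException(
        "Must specify checkpoint path to read state metadata")
    }
    options.get("path")
  }
}
```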
Fixed.
.add("operatorName", StringType) | ||
.add("stateStoreName", StringType) | ||
.add("numPartitions", IntegerType) | ||
.add("numColsPrefixKey", IntegerType) |
Can we make this a metadata column? Probably add an underscore prefix as well: _numColsPrefixKey. This is purely an internal value and most users won't have the context for it; we never require users to know this.
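A hedged sketch of one way to expose _numColsPrefixKey through the DSv2 SupportsMetadataColumns mix-in; the trait and object names below are illustrative and are not the ones used in the PR, and the comment text is an assumption.

```scala
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
import org.apache.spark.sql.types.{DataType, IntegerType}

// Hedged sketch: a mix-in a table implementation could extend so that
// _numColsPrefixKey is reported as a metadata column rather than a regular one.
trait NumColsPrefixKeyMetadata extends SupportsMetadataColumns {
  private object NumColsPrefixKeyColumn extends MetadataColumn {
    override def name: String = "_numColsPrefixKey"
    override def dataType: DataType = IntegerType
    override def comment: String = "Number of prefix-key columns of the state store (internal)"
  }

  override def metadataColumns(): Array[MetadataColumn] = Array(NumColsPrefixKeyColumn)
}
```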
Makes sense, I made it a metadata column.
+1
Filed a JIRA ticket https://issues.apache.org/jira/browse/SPARK-45833 for addressing documentation on both the state data source and the state metadata source.
CI only failed in Run / Build modules: pyspark-mllib, pyspark-ml, pyspark-ml-connect, which is irrelevant.
Thanks! Merging to master.
What changes were proposed in this pull request?
Introduce a new data source so that users can query the metadata of each state store of a streaming query. The result schema includes columns such as operatorName, stateStoreName, and numPartitions (see the schema snippet in the review above); numColsPrefixKey is exposed as the _numColsPrefixKey metadata column per the review discussion.
To use this source, specify the source format and the checkpoint path, then load the DataFrame:
df = spark.read.format("state-metadata").load("/checkpointPath")
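A hedged Scala equivalent of the PySpark line above, as a minimal usage sketch. The checkpoint path is a placeholder, and the assumption that _numColsPrefixKey must be selected explicitly follows the metadata-column discussion in the review rather than anything stated in this description.

```scala
import org.apache.spark.sql.SparkSession

// Minimal usage sketch; "/checkpointPath" stands in for a real streaming
// query checkpoint location.
val spark = SparkSession.builder().appName("state-metadata-example").getOrCreate()

val stateMetadata = spark.read
  .format("state-metadata")
  .load("/checkpointPath")

// Columns from the schema snippet above; _numColsPrefixKey is a metadata
// column (per the review), so it is selected explicitly here.
stateMetadata
  .select("operatorName", "stateStoreName", "numPartitions", "_numColsPrefixKey")
  .show(false)
```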
Why are the changes needed?
To improve debuggability. It also facilitates querying the state store data source introduced in SPARK-45511 by exposing the operator id, batch id, and state store name; a sketch of that workflow follows.
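A hedged sketch of that intended workflow. The "statestore" format name and its operatorId/storeName options belong to the separate SPARK-45511 work and are assumptions here, as is the operatorId column name; the spark session is the one created in the previous snippet.

```scala
// Hedged sketch only: the "statestore" format and its options come from the
// separate SPARK-45511 work and may differ from what is finally merged.
// Reuses the `spark` session from the previous snippet.
val meta = spark.read.format("state-metadata").load("/checkpointPath")

// Pick one state store from the metadata and read its contents.
val firstStore = meta.select("operatorId", "stateStoreName").head()

val stateRows = spark.read
  .format("statestore")
  .option("operatorId", firstStore.get(0).toString)
  .option("storeName", firstStore.get(1).toString)
  .load("/checkpointPath")

stateRows.show(false)
```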
Does this PR introduce any user-facing change?
Yes, this is a new source exposed to users.
How was this patch tested?
Added tests to verify the output of the state metadata source.
Was this patch authored or co-authored using generative AI tooling?
No.