[SPARK-50339][SPARK-50360][SS] Enable changelog to store lineage information #48880
base: master
Generally LGTM. I left a test comment.
I am not a fan of the interface. You're essentially requiring the creator of the changelog writer to call `writeLineage` first, which feels like an abstraction leak and is error prone. Can the lineage be passed in to the writer instead of just `enableStateStoreCheckpointIds`? Or could an internal flag require that `writeLineage` was called before put/merge/delete is called?
I would still prefer the former over the latter, though.
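To make the first option concrete, here is a minimal sketch, not the PR's code. `LineageFirstWriter`, the line-based text encoding, and the `LineageItem` field names are all assumptions for illustration. The point is only that a constructor argument lets the writer emit the lineage itself before any caller can reach `put`.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for the PR's LineageItem; the field names are assumed.
final case class LineageItem(version: Long, checkpointUniqueId: String)

// Hypothetical writer: the lineage is a constructor argument, so the header is
// written exactly once, during construction, before put() can be called.
class LineageFirstWriter(out: ByteArrayOutputStream, lineage: Seq[LineageItem]) {
  private val stream = new DataOutputStream(out)

  // Runs as part of the constructor body, after `stream` is initialized.
  writeLine(lineage.map(i => s"${i.version}:${i.checkpointUniqueId}").mkString(","))

  private def writeLine(s: String): Unit = {
    stream.write(s.getBytes(UTF_8))
    stream.write('\n'.toInt)
  }

  // Simplified record encoding, one text line per operation (an assumption).
  def put(key: String, value: String): Unit = writeLine(s"put $key $value")

  def close(): Unit = stream.close()
}
```

With this shape there is no separate `writeLineage` call for the caller to forget. The ordering still depends on the header write sitting after the stream field in the same class, so it would need care if the stream were initialized in a parent class or trait.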
@@ -102,6 +115,13 @@ abstract class StateStoreChangelogWriter(
fm.createAtomic(file, overwriteIfPossible = true)
protected var compressedStream: DataOutputStream = compressStream(backingFileStream)

def writeLineage(lineage: Array[LineageItem]): Unit = {
Is there anything that guarantees that this is going to be called first so that it goes into the first line?
+1 to @brkyvz about the interface; I also prefer the former suggestion. I'd also like the changelog file to be self-describing. Introducing the lineage is a change to the storage format, and for a storage format change I'd say we need to bump the version. I understand the changelog file is only accessed after looking into the commit log, so it is clear whether the query has a UUID available, but we would still be drawing another axis of versioning for the changelog storage format. I'd like to see this flattened into a single axis (one timeline for the versioning). Version 2 is superior to version 1: it has more overhead but covers more functionality, and I don't think the overhead is significant. That said, why not come up with version 3 based on version 2 + lineage and let all operators use version 3 if a UUID is available? I'm even OK with version 3 for version 1 + lineage and version 4 for version 2 + lineage if we are not sure about the overhead.
This V3, V4 stuff is really error prone... Can we please just introduce V3? Then introduce a `writeHeader` to `StateStoreChangelogWriter`. V1 doesn't write a version, V2 writes a version, and V3 writes a version and lineage in this method.
Then you should only have one Reader class. It just reads the first line, and from there it knows whether it is V1 or something else. If it is a version it doesn't understand, it throws an error along the lines of "I don't know how to read this file". It can figure out the lineage as well if it is V3. Then it can read the data. The problem right now is that you've totally broken forward compatibility of changelogs: V1 and V2 are just going to fail with a FileNotFoundException if someone upgrades and then rolls back their Spark, rather than with a better error message such as "I can't read this stream".
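A rough sketch of that single-header idea, under heavy simplification: the file is modeled as a list of text lines, and `Header`, `HeaderSketch`, `writeHeader` and `readHeader` are hypothetical names, not Spark APIs. The real changelog is a compressed binary stream, so this only illustrates the dispatch logic.

```scala
// Hypothetical header model: one case per changelog version discussed above.
sealed trait Header
case object V1Header extends Header                        // no version line at all
case object V2Header extends Header                        // version line only
final case class V3Header(lineage: String) extends Header  // version line, then lineage line

object HeaderSketch {
  private val VersionLine = """v(\d+)""".r

  // What each version writes before any data records.
  def writeHeader(h: Header): String = h match {
    case V1Header          => ""
    case V2Header          => "v2\n"
    case V3Header(lineage) => s"v3\n$lineage\n"
  }

  // One reader entry point: inspect the first line, return the header and the
  // remaining data lines, and fail loudly on a version it does not understand.
  def readHeader(lines: List[String]): (Header, List[String]) = lines match {
    case VersionLine("2") :: rest            => (V2Header, rest)
    case VersionLine("3") :: lineage :: rest => (V3Header(lineage), rest)
    case VersionLine("3") :: Nil =>
      throw new IllegalStateException("Truncated v3 changelog: missing lineage line")
    case VersionLine(n) :: _ =>
      throw new IllegalStateException(s"I don't know how to read changelog version v$n")
    case _                                   => (V1Header, lines) // no version line: treat as V1
  }
}
```

In this toy encoding a V1 record that happens to look like a version line would be misclassified, which is one reason the real binary formats may need more than a first-line check.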
...core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
// Also write lineage information to the changelog, it should appear
// in the second line for v4 because the first line is the version
writeLineage(stateStoreCheckpointIdLineage)
this becomes tricky to guarantee based on class/trait/abstraction initialization...
This is sadly not possible, imho. v1 and v2 differ not only in whether the first line contains "v2"; their formats are different too. We likely still need both v3 and v4.
Similarly for this one, I think a way out is a helper class that reads the version from the actual changelog file, so that the reader version is decided by it. This is indeed tech debt; I filed a Spark ticket for it: https://github.com/WweiL/oss-spark/blob/8e439fcc43513f3048b51a8422c0ec136a0b624d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala#L211-L215 Do you think we can resolve it in a separate PR?
@brkyvz That said, if we just want to avoid the divergence of having multiple active versions, we probably need to cut the versions differently, e.g. along a timeline. v3 understands features A, B, and C; which features a given file uses is recorded in the header, and the reader parses the remainder differently based on the features used. v4 does not need a substantially different storage format; it only requires the Spark reader to understand the features introduced with v4 in order to read a v4 file. I guess this is a more proper versioning scheme for the reader vs. writer scenario. We didn't do this because we didn't know the changelog was going to be improved and would need to diverge this way. I'm still cautious about whether we need to improve this area to the point of having a protocol doc for versions, but we may have to diverge further because we are looking into using Avro for state rows, so it might be inevitable...
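One possible reading of the feature-based header, purely as an illustration: `FeatureHeader`, the `features=` prefix, and the feature names `merge` and `lineage` are invented for this sketch and do not exist in Spark. The reader accepts a file only if it understands every feature the header lists.

```scala
// Hypothetical feature-flag header; all names and the encoding are assumptions.
object FeatureHeader {
  // Features this (imaginary) reader build understands.
  val Known: Set[String] = Set("merge", "lineage")

  // Writer side: record which features the file actually uses.
  def encode(features: Set[String]): String =
    features.toSeq.sorted.mkString("features=", ",", "")

  // Reader side: parse the header and reject files that use unknown features.
  def decode(header: String): Set[String] = {
    require(header.startsWith("features="), s"Not a feature header: $header")
    val used = header.stripPrefix("features=").split(",").filter(_.nonEmpty).toSet
    val unknown = used -- Known
    if (unknown.nonEmpty) {
      throw new IllegalStateException(
        s"Changelog uses features this reader does not understand: ${unknown.mkString(", ")}")
    }
    used
  }
}
```

Under this scheme a newer reader only grows its `Known` set, and an older reader fails with an explicit message instead of misparsing the file.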
I don't like the fact that we didn't initially implement these files with a proper protocol version, but at least we can do an assertion on the version checks...
Left some recommendations and please add tests for all the failure cases too on the versioning
...core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
writeLineage(stateStoreCheckpointIdLineage)
}

class StateStoreChangelogWriterV4(
Scaladoc please. Why is this a new version? How is the encoding different?
There is a `merge` operation in the V2 reader / writer; also, the EOF token is different.
I ended up adding the helper class
I think you're leaking streams in StateStoreChangelogReaderFactory. Can I ask you for a few more tests:
- Read a new version (v12) and make sure the error gets thrown
- Accidentally read a v1 file with v3 configurations. See what happens
- In the afterEach of tests, make sure that there are no leaked input/output streams
Will a stream be able to upgrade from 1 -> 3 and 2 -> 4? It seems impossible because the file names are not going to match
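For the first and third requests, something along these lines might serve as a starting point. It is a sketch only: `VersionProbe`, its `Supported` set, and the assumption that the version is stored with `writeUTF` are illustrative and not taken from the PR. The probe stream is closed in a `finally`, so it is released even when the version is rejected.

```scala
import java.io.{DataInputStream, InputStream}

// Hypothetical version probe: opens a short-lived stream only to read the
// version string, and always closes it, whether or not the version is known.
object VersionProbe {
  // Assumed set of versions this reader understands.
  val Supported: Set[String] = Set("v2", "v3", "v4")

  def probe(open: () => InputStream): String = {
    val in = new DataInputStream(open())
    try {
      val version = in.readUTF() // assumes the writer used writeUTF for the version
      if (!Supported.contains(version)) {
        throw new IllegalStateException(s"Cannot read changelog version $version")
      }
      version
    } finally {
      in.close() // also closes the underlying stream
    }
  }
}
```

A test can pass in a stream subclass that records `close()` calls and assert that it was closed after probing a "v12" file.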
case f: FileNotFoundException =>
  throw QueryExecutionErrors.failedToReadStreamingStateFileError(fileToRead, f)
}
protected val input: DataInputStream = decompressStream(sourceStream)
you're leaking streams!
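One way the leak could be avoided, sketched with a hypothetical helper (`StreamSafety.wrapOrClose` is not a Spark API): if wrapping the source stream throws, for example while setting up decompression, the already-opened source is closed before the error propagates.

```scala
import java.io.Closeable

// Hypothetical helper: apply `wrap` to an opened resource, and if wrapping
// fails, close the resource before rethrowing so it does not leak.
object StreamSafety {
  def wrapOrClose[A <: Closeable, B](source: A)(wrap: A => B): B =
    try wrap(source)
    catch {
      case t: Throwable =>
        try source.close()
        catch { case e: Throwable => t.addSuppressed(e) } // keep the original failure
        throw t
    }
}
```

Applied to the snippet above, the call would look roughly like `StreamSafety.wrapOrClose(sourceStream)(decompressStream)`; on success the caller still owns the returned stream and has to close it.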
What changes were proposed in this pull request?
Break down #48355 into smaller PRs.
Changelog Reader / Writer
We propose saving the lineage in the first line of the changelog files.
For the changelog reader, an abstract function `readLineage` is created. In the `RocksDBCheckpointManager.getChangelogReader` function, `readLineage` is called right after the changelog reader is initialized, to advance the file pointer past the lineage. Subsequent `getNext` calls are not affected by this.
For the changelog writer, an abstract function `writeLineage` writes the lineage. This function is called in `RocksDB.load()` before any actual changelog data is written.
The lineage is stored as JSON.
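As an illustration of what a JSON lineage line could look like, here is a dependency-free sketch. The field names `version` and `checkpointUniqueId`, the `LineageJson` object, and the hand-rolled encoding are assumptions; the actual PR may use a JSON library and a different schema.

```scala
// Hypothetical stand-in for the PR's LineageItem; the field names are assumed.
final case class LineageItem(version: Long, checkpointUniqueId: String)

// Minimal hand-rolled JSON for this fixed shape only. IDs are assumed not to
// contain quotes or backslashes; a real implementation would use a JSON library.
object LineageJson {
  def toJson(items: Seq[LineageItem]): String =
    items
      .map(i => s"""{"version":${i.version},"checkpointUniqueId":"${i.checkpointUniqueId}"}""")
      .mkString("[", ",", "]")

  private val Item = """\{"version":(\d+),"checkpointUniqueId":"([^"]*)"\}""".r

  def fromJson(line: String): Seq[LineageItem] =
    Item.findAllMatchIn(line).map(m => LineageItem(m.group(1).toLong, m.group(2))).toList
}
```

A writer would emit `LineageJson.toJson(lineage)` as the lineage line, and a reader would parse that line with `fromJson` before handing the rest of the file to `getNext`.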
Why are the changes needed?
Continue development of SPARK-49374
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit test
Was this patch authored or co-authored using generative AI tooling?
No