Skip to content

Commit

Permalink
[SPARK-49687][SQL] Delay sorting in validateAndMaybeEvolveStateSchema
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
In `validateAndMaybeEvolveStateSchema`, existing schema and new schema are sorted by column family name.
The sorting can be delayed until `createSchemaFile` is called.
When computing `colFamiliesAddedOrRemoved`, we can use `toSet` to compare column families.

### Why are the changes needed?
This would make `validateAndMaybeEvolveStateSchema` faster.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48116 from tedyu/ty-comp-chk.

Authored-by: Zhihong Yu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
tedyu authored and attilapiros committed Oct 4, 2024
1 parent 9abbf7d commit d27bbce
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ class StateSchemaCompatibilityChecker(
newStateSchema: List[StateStoreColFamilySchema],
ignoreValueSchema: Boolean,
stateSchemaVersion: Int): Boolean = {
val existingStateSchemaList = getExistingKeyAndValueSchema().sortBy(_.colFamilyName)
val newStateSchemaList = newStateSchema.sortBy(_.colFamilyName)
val existingStateSchemaList = getExistingKeyAndValueSchema()
val newStateSchemaList = newStateSchema

if (existingStateSchemaList.isEmpty) {
// write the schema file if it doesn't exist
createSchemaFile(newStateSchemaList, stateSchemaVersion)
createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion)
true
} else {
// validate if the new schema is compatible with the existing schema
Expand All @@ -188,9 +188,9 @@ class StateSchemaCompatibilityChecker(
}
}
val colFamiliesAddedOrRemoved =
newStateSchemaList.map(_.colFamilyName) != existingStateSchemaList.map(_.colFamilyName)
(newStateSchemaList.map(_.colFamilyName).toSet != existingSchemaMap.keySet)
if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) {
createSchemaFile(newStateSchemaList, stateSchemaVersion)
createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion)
}
// TODO: [SPARK-49535] Write Schema files after schema has changed for StateSchemaV3
colFamiliesAddedOrRemoved
Expand Down

0 comments on commit d27bbce

Please sign in to comment.