diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 3a1793f71794f..721d72b6a0991 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -168,12 +168,12 @@ class StateSchemaCompatibilityChecker(
       newStateSchema: List[StateStoreColFamilySchema],
       ignoreValueSchema: Boolean,
       stateSchemaVersion: Int): Boolean = {
-    val existingStateSchemaList = getExistingKeyAndValueSchema().sortBy(_.colFamilyName)
-    val newStateSchemaList = newStateSchema.sortBy(_.colFamilyName)
+    val existingStateSchemaList = getExistingKeyAndValueSchema()
+    val newStateSchemaList = newStateSchema
 
     if (existingStateSchemaList.isEmpty) {
       // write the schema file if it doesn't exist
-      createSchemaFile(newStateSchemaList, stateSchemaVersion)
+      createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion)
       true
     } else {
       // validate if the new schema is compatible with the existing schema
@@ -188,9 +188,9 @@ class StateSchemaCompatibilityChecker(
       }
     }
     val colFamiliesAddedOrRemoved =
-      newStateSchemaList.map(_.colFamilyName) != existingStateSchemaList.map(_.colFamilyName)
+      (newStateSchemaList.map(_.colFamilyName).toSet != existingSchemaMap.keySet)
     if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) {
-      createSchemaFile(newStateSchemaList, stateSchemaVersion)
+      createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion)
     }
     // TODO: [SPARK-49535] Write Schema files after schema has changed for StateSchemaV3
     colFamiliesAddedOrRemoved