From 8023504e69fdd037dea002e961b960fd9fa662ba Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 12 Sep 2024 12:01:08 +0900 Subject: [PATCH] [SPARK-49594][SS] Adding check on whether columnFamilies were added or removed to write StateSchemaV3 file ### What changes were proposed in this pull request? Up until this [PR](https://github.com/apache/spark/pull/47880) that enabled deleteIfExists, we changed the condition on which we throw an error. However, in doing so, we are not writing schema files whenever we add or remove column families, which is functionally incorrect. Additionally, we were initially always writing the newSchemaFilePath to the OperatorStateMetadata upon every new query run, when we should only do this if the schema changes. ### Why are the changes needed? These changes are needed because we want to write a schema file out every time we add or remove column families. Also, we want to make sure that we point to the old schema file for the current metadata file if the schema has not changed between this run and the last one, as opposed to populating the metadata with a new schema file path every time, even if this file is not created. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Amended unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #48067 from ericm-db/add-remove-cf. Authored-by: Eric Marnadi Signed-off-by: Jungtaek Lim --- .../StateSchemaCompatibilityChecker.scala | 40 +++- .../streaming/TransformWithStateSuite.scala | 219 +++++++++++++++++- 2 files changed, 250 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala index 90eb634689b23..3a1793f71794f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala @@ -27,6 +27,7 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.sql.catalyst.util.UnsafeRowUtils import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, StatefulOperatorStateInfo} import org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader, SchemaWriter} +import org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityChecker.SCHEMA_FORMAT_V3 import org.apache.spark.sql.internal.SessionState import org.apache.spark.sql.types.{DataType, StructType} @@ -95,7 +96,7 @@ class StateSchemaCompatibilityChecker( stateStoreColFamilySchema: List[StateStoreColFamilySchema], stateSchemaVersion: Int): Unit = { // Ensure that schema file path is passed explicitly for schema version 3 - if (stateSchemaVersion == 3 && newSchemaFilePath.isEmpty) { + if (stateSchemaVersion == SCHEMA_FORMAT_V3 && newSchemaFilePath.isEmpty) { throw new IllegalStateException("Schema file path is required for schema version 3") } @@ -186,8 +187,13 @@ class StateSchemaCompatibilityChecker( check(existingStateSchema, newSchema, ignoreValueSchema) } } + val colFamiliesAddedOrRemoved = + newStateSchemaList.map(_.colFamilyName) != existingStateSchemaList.map(_.colFamilyName) + if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) { + createSchemaFile(newStateSchemaList, stateSchemaVersion) + } // TODO: [SPARK-49535] Write Schema files after schema has changed for StateSchemaV3 - false + colFamiliesAddedOrRemoved } } @@ -196,6 +202,9 @@ class StateSchemaCompatibilityChecker( } object StateSchemaCompatibilityChecker { + + val SCHEMA_FORMAT_V3: Int = 3 + private def disallowBinaryInequalityColumn(schema: StructType): Unit = { if (!UnsafeRowUtils.isBinaryStable(schema)) { throw new SparkUnsupportedOperationException( @@ -275,10 +284,31 @@ object StateSchemaCompatibilityChecker { if (storeConf.stateSchemaCheckEnabled && result.isDefined) { throw result.get } - val schemaFileLocation = newSchemaFilePath match { - case Some(path) => path.toString - case None => checker.schemaFileLocation.toString + val schemaFileLocation = if (evolvedSchema) { + // if we are using the state schema v3, and we have + // evolved schema, this newSchemaFilePath should be defined + // and we want to populate the metadata with this file + if (stateSchemaVersion == SCHEMA_FORMAT_V3) { + newSchemaFilePath.get.toString + } else { + // if we are using any version less than v3, we have written + // the schema to this static location, which we will return + checker.schemaFileLocation.toString + } + } else { + // if we have not evolved schema (there has been a previous schema) + // and we are using state schema v3, this file path would be defined + // so we would just populate the next run's metadata file with this + // file path + if (stateSchemaVersion == SCHEMA_FORMAT_V3) { + oldSchemaFilePath.get.toString + } else { + // if we are using any version less than v3, we have written + // the schema to this static location, which we will return + checker.schemaFileLocation.toString + } } + StateSchemaValidationResult(evolvedSchema, schemaFileLocation) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index a17f3847323d5..d0e255bb30499 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -1448,6 +1448,10 @@ class TransformWithStateSuite extends StateStoreMetricsTest TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString, SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") { withTempDir { chkptDir => + val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0") + val stateSchemaPath = getStateSchemaPath(stateOpIdPath) + + val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath) // in this test case, we are changing the state spec back and forth // to trigger the writing of the schema and metadata files val inputData = MemoryStream[(String, String)] @@ -1483,6 +1487,11 @@ class TransformWithStateSuite extends StateStoreMetricsTest }, StopStream ) + // assert that a metadata and schema file has been written for each run + // as state variables have been deleted + assert(getFiles(metadataPath).length == 2) + assert(getFiles(stateSchemaPath).length == 2) + val result3 = inputData.toDS() .groupByKey(x => x._1) .transformWithState(new RunningCountMostRecentStatefulProcessor(), @@ -1512,10 +1521,6 @@ class TransformWithStateSuite extends StateStoreMetricsTest }, StopStream ) - val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0") - val stateSchemaPath = getStateSchemaPath(stateOpIdPath) - - val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath) // by the end of the test, there have been 4 batches, // so the metadata and schema logs, and commitLog has been purged // for batches 0 and 1 so metadata and schema files exist for batches 0, 1, 2, 3 @@ -1527,6 +1532,116 @@ class TransformWithStateSuite extends StateStoreMetricsTest } } + test("transformWithState - verify that schema file is kept after metadata is purged") { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName, + SQLConf.SHUFFLE_PARTITIONS.key -> + TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString, + SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") { + withTempDir { chkptDir => + val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0") + val stateSchemaPath = getStateSchemaPath(stateOpIdPath) + + val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath) + // in this test case, we are changing the state spec back and forth + // to trigger the writing of the schema and metadata files + val inputData = MemoryStream[(String, String)] + val result1 = inputData.toDS() + .groupByKey(x => x._1) + .transformWithState(new RunningCountMostRecentStatefulProcessor(), + TimeMode.None(), + OutputMode.Update()) + testStream(result1, OutputMode.Update())( + StartStream(checkpointLocation = chkptDir.getCanonicalPath), + AddData(inputData, ("a", "str1")), + CheckNewAnswer(("a", "1", "")), + Execute { q => + eventually(timeout(Span(5, Seconds))) { + q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false) + } + }, + StopStream + ) + testStream(result1, OutputMode.Update())( + StartStream(checkpointLocation = chkptDir.getCanonicalPath), + AddData(inputData, ("a", "str1")), + CheckNewAnswer(("a", "2", "str1")), + Execute { q => + eventually(timeout(Span(5, Seconds))) { + q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false) + } + }, + StopStream + ) + val result2 = inputData.toDS() + .groupByKey(x => x._1) + .transformWithState(new MostRecentStatefulProcessorWithDeletion(), + TimeMode.None(), + OutputMode.Update()) + testStream(result2, OutputMode.Update())( + StartStream(checkpointLocation = chkptDir.getCanonicalPath), + AddData(inputData, ("a", "str2")), + CheckNewAnswer(("a", "str1")), + Execute { q => + eventually(timeout(Span(5, Seconds))) { + q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false) + } + }, + StopStream + ) + assert(getFiles(metadataPath).length == 3) + assert(getFiles(stateSchemaPath).length == 2) + + val result3 = inputData.toDS() + .groupByKey(x => x._1) + .transformWithState(new RunningCountMostRecentStatefulProcessor(), + TimeMode.None(), + OutputMode.Update()) + testStream(result3, OutputMode.Update())( + StartStream(checkpointLocation = chkptDir.getCanonicalPath), + AddData(inputData, ("a", "str3")), + CheckNewAnswer(("a", "1", "str2")), + Execute { q => + eventually(timeout(Span(5, Seconds))) { + q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false) + } + }, + StopStream + ) + // metadata files should be kept for batches 1, 2, 3 + // schema files should be kept for batches 0, 2, 3 + assert(getFiles(metadataPath).length == 3) + assert(getFiles(stateSchemaPath).length == 3) + // we want to ensure that we can read batch 1 even though the + // metadata file for batch 0 was removed + val batch1Df = spark.read + .format("statestore") + .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath) + .option(StateSourceOptions.STATE_VAR_NAME, "countState") + .option(StateSourceOptions.BATCH_ID, 1) + .load() + + val batch1AnsDf = batch1Df.selectExpr( + "key.value AS groupingKey", + "single_value.value AS valueId") + + checkAnswer(batch1AnsDf, Seq(Row("a", 2L))) + + val batch3Df = spark.read + .format("statestore") + .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath) + .option(StateSourceOptions.STATE_VAR_NAME, "countState") + .option(StateSourceOptions.BATCH_ID, 3) + .load() + + val batch3AnsDf = batch3Df.selectExpr( + "key.value AS groupingKey", + "single_value.value AS valueId") + checkAnswer(batch3AnsDf, Seq(Row("a", 1L))) + } + } + } + test("state data source integration - value state supports time travel") { withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName, @@ -1708,6 +1823,102 @@ class TransformWithStateSuite extends StateStoreMetricsTest } } } + + test("transformWithState - verify that no metadata and schema logs are purged after" + + " removing column family") { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName, + SQLConf.SHUFFLE_PARTITIONS.key -> + TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString, + SQLConf.MIN_BATCHES_TO_RETAIN.key -> "3") { + withTempDir { chkptDir => + val inputData = MemoryStream[(String, String)] + val result1 = inputData.toDS() + .groupByKey(x => x._1) + .transformWithState(new RunningCountMostRecentStatefulProcessor(), + TimeMode.None(), + OutputMode.Update()) + testStream(result1, OutputMode.Update())( + StartStream(checkpointLocation = chkptDir.getCanonicalPath), + AddData(inputData, ("a", "str1")), + CheckNewAnswer(("a", "1", "")), + AddData(inputData, ("a", "str1")), + CheckNewAnswer(("a", "2", "str1")), + Execute { q => + eventually(timeout(Span(5, Seconds))) { + q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false) + } + }, + StopStream + ) + testStream(result1, OutputMode.Update())( + StartStream(checkpointLocation = chkptDir.getCanonicalPath), + AddData(inputData, ("b", "str1")), + CheckNewAnswer(("b", "1", "")), + AddData(inputData, ("b", "str1")), + CheckNewAnswer(("b", "2", "str1")), + AddData(inputData, ("b", "str1")), + CheckNewAnswer(("b", "3", "str1")), + AddData(inputData, ("b", "str1")), + CheckNewAnswer(("b", "4", "str1")), + AddData(inputData, ("b", "str1")), + CheckNewAnswer(("b", "5", "str1")), + AddData(inputData, ("b", "str1")), + CheckNewAnswer(("b", "6", "str1")), + AddData(inputData, ("b", "str1")), + CheckNewAnswer(("b", "7", "str1")), + AddData(inputData, ("b", "str1")), + CheckNewAnswer(("b", "8", "str1")), + AddData(inputData, ("b", "str1")), + CheckNewAnswer(("b", "9", "str1")), + AddData(inputData, ("b", "str1")), + CheckNewAnswer(("b", "10", "str1")), + AddData(inputData, ("b", "str1")), + CheckNewAnswer(("b", "11", "str1")), + AddData(inputData, ("b", "str1")), + CheckNewAnswer(("b", "12", "str1")), + Execute { q => + eventually(timeout(Span(5, Seconds))) { + q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false) + } + }, + StopStream + ) + val result2 = inputData.toDS() + .groupByKey(x => x._1) + .transformWithState(new MostRecentStatefulProcessorWithDeletion(), + TimeMode.None(), + OutputMode.Update()) + + testStream(result2, OutputMode.Update())( + StartStream(checkpointLocation = chkptDir.getCanonicalPath), + AddData(inputData, ("b", "str2")), + CheckNewAnswer(("b", "str1")), + AddData(inputData, ("b", "str3")), + CheckNewAnswer(("b", "str2")), + Execute { q => + eventually(timeout(Span(5, Seconds))) { + q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false) + } + }, + StopStream + ) + + val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0") + val stateSchemaPath = getStateSchemaPath(stateOpIdPath) + + val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath) + + // Metadata files are written for batches 0, 2, and 14. + // Schema files are written for 0, 14 + // At the beginning of the last query run, the thresholdBatchId is 11. + // However, we would need both schema files to be preserved, if we want to + // be able to read from batch 11 onwards. + assert(getFiles(metadataPath).length == 2) + assert(getFiles(stateSchemaPath).length == 2) + } + } + } } class TransformWithStateValidationSuite extends StateStoreMetricsTest {