diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 90eb634689b23..3a1793f71794f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -27,6 +27,7 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
 import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, StatefulOperatorStateInfo}
 import org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader, SchemaWriter}
+import org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityChecker.SCHEMA_FORMAT_V3
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.{DataType, StructType}

@@ -95,7 +96,7 @@ class StateSchemaCompatibilityChecker(
       stateStoreColFamilySchema: List[StateStoreColFamilySchema],
       stateSchemaVersion: Int): Unit = {
     // Ensure that schema file path is passed explicitly for schema version 3
-    if (stateSchemaVersion == 3 && newSchemaFilePath.isEmpty) {
+    if (stateSchemaVersion == SCHEMA_FORMAT_V3 && newSchemaFilePath.isEmpty) {
       throw new IllegalStateException("Schema file path is required for schema version 3")
     }

@@ -186,8 +187,13 @@ class StateSchemaCompatibilityChecker(
           check(existingStateSchema, newSchema, ignoreValueSchema)
         }
       }
+      val colFamiliesAddedOrRemoved =
+        newStateSchemaList.map(_.colFamilyName) != existingStateSchemaList.map(_.colFamilyName)
+      if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) {
+        createSchemaFile(newStateSchemaList, stateSchemaVersion)
+      }
       // TODO: [SPARK-49535] Write Schema files after schema has changed for StateSchemaV3
-      false
+      colFamiliesAddedOrRemoved
     }
   }

@@ -196,6 +202,9 @@ class StateSchemaCompatibilityChecker(
 }

 object StateSchemaCompatibilityChecker {
+
+  val SCHEMA_FORMAT_V3: Int = 3
+
   private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
     if (!UnsafeRowUtils.isBinaryStable(schema)) {
       throw new SparkUnsupportedOperationException(
@@ -275,10 +284,31 @@ object StateSchemaCompatibilityChecker {
     if (storeConf.stateSchemaCheckEnabled && result.isDefined) {
       throw result.get
     }
-    val schemaFileLocation = newSchemaFilePath match {
-      case Some(path) => path.toString
-      case None => checker.schemaFileLocation.toString
+    val schemaFileLocation = if (evolvedSchema) {
+      // if we are using the state schema v3, and we have
+      // evolved schema, this newSchemaFilePath should be defined
+      // and we want to populate the metadata with this file
+      if (stateSchemaVersion == SCHEMA_FORMAT_V3) {
+        newSchemaFilePath.get.toString
+      } else {
+        // if we are using any version less than v3, we have written
+        // the schema to this static location, which we will return
+        checker.schemaFileLocation.toString
+      }
+    } else {
+      // if we have not evolved schema (there has been a previous schema)
+      // and we are using state schema v3, this file path would be defined
+      // so we would just populate the next run's metadata file with this
+      // file path
+      if (stateSchemaVersion == SCHEMA_FORMAT_V3) {
+        oldSchemaFilePath.get.toString
+      } else {
+        // if we are using any version less than v3, we have written
+        // the schema to this static location, which we will return
+        checker.schemaFileLocation.toString
+      }
     }
+
     StateSchemaValidationResult(evolvedSchema, schemaFileLocation)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index a17f3847323d5..d0e255bb30499 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -1448,6 +1448,10 @@ class TransformWithStateSuite extends StateStoreMetricsTest
         TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
       SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
       withTempDir { chkptDir =>
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+        val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
         // in this test case, we are changing the state spec back and forth
         // to trigger the writing of the schema and metadata files
         val inputData = MemoryStream[(String, String)]
@@ -1483,6 +1487,11 @@ class TransformWithStateSuite extends StateStoreMetricsTest
           },
           StopStream
         )
+        // assert that a metadata and schema file has been written for each run
+        // as state variables have been deleted
+        assert(getFiles(metadataPath).length == 2)
+        assert(getFiles(stateSchemaPath).length == 2)
+
         val result3 = inputData.toDS()
           .groupByKey(x => x._1)
           .transformWithState(new RunningCountMostRecentStatefulProcessor(),
@@ -1512,10 +1521,6 @@ class TransformWithStateSuite extends StateStoreMetricsTest
           },
           StopStream
         )
-        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
-        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
-
-        val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
         // by the end of the test, there have been 4 batches,
         // so the metadata and schema logs, and commitLog has been purged
         // for batches 0 and 1 so metadata and schema files exist for batches 0, 1, 2, 3
@@ -1527,6 +1532,116 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }

+  test("transformWithState - verify that schema file is kept after metadata is purged") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
+      withTempDir { chkptDir =>
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+        val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+        // in this test case, we are changing the state spec back and forth
+        // to trigger the writing of the schema and metadata files
+        val inputData = MemoryStream[(String, String)]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "2", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        val result2 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str2")),
+          CheckNewAnswer(("a", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        assert(getFiles(metadataPath).length == 3)
+        assert(getFiles(stateSchemaPath).length == 2)
+
+        val result3 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result3, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str3")),
+          CheckNewAnswer(("a", "1", "str2")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        // metadata files should be kept for batches 1, 2, 3
+        // schema files should be kept for batches 0, 2, 3
+        assert(getFiles(metadataPath).length == 3)
+        assert(getFiles(stateSchemaPath).length == 3)
+        // we want to ensure that we can read batch 1 even though the
+        // metadata file for batch 0 was removed
+        val batch1Df = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
+          .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+          .option(StateSourceOptions.BATCH_ID, 1)
+          .load()
+
+        val batch1AnsDf = batch1Df.selectExpr(
+          "key.value AS groupingKey",
+          "single_value.value AS valueId")
+
+        checkAnswer(batch1AnsDf, Seq(Row("a", 2L)))
+
+        val batch3Df = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
+          .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+          .option(StateSourceOptions.BATCH_ID, 3)
+          .load()
+
+        val batch3AnsDf = batch3Df.selectExpr(
+          "key.value AS groupingKey",
+          "single_value.value AS valueId")
+        checkAnswer(batch3AnsDf, Seq(Row("a", 1L)))
+      }
+    }
+  }
+
   test("state data source integration - value state supports time travel") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
@@ -1708,6 +1823,102 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       }
     }
   }
+
+  test("transformWithState - verify that no metadata and schema logs are purged after" +
+    " removing column family") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "3") {
+      withTempDir { chkptDir =>
+        val inputData = MemoryStream[(String, String)]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "2", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "1", "")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "2", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "3", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "4", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "5", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "6", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "7", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "8", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "9", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "10", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "11", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "12", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        val result2 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("b", "str2")),
+          CheckNewAnswer(("b", "str1")),
+          AddData(inputData, ("b", "str3")),
+          CheckNewAnswer(("b", "str2")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+        val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+
+        // Metadata files are written for batches 0, 2, and 14.
+        // Schema files are written for 0, 14
+        // At the beginning of the last query run, the thresholdBatchId is 11.
+        // However, we would need both schema files to be preserved, if we want to
+        // be able to read from batch 11 onwards.
+        assert(getFiles(metadataPath).length == 2)
+        assert(getFiles(stateSchemaPath).length == 2)
+      }
+    }
+  }
 }

 class TransformWithStateValidationSuite extends StateStoreMetricsTest {