[SPARK-49048][SS] Add support for reading relevant operator metadata at given batch id

### What changes were proposed in this pull request?
Add support for reading relevant operator metadata at given batch id

### Why are the changes needed?
Needed to support reading state for operators that allow for schema changes across batch ids

This change also introduces the locations for the v2 state schema format. In this version, the operator metadata and the state schema are written to the following locations:
- for the operator metadata, under the `<checkpoint_loc>/state/<operator_id>/_metadata` directory
- for the state schema, under the `<checkpoint_loc>/state/<operator_id>/_stateSchema` directory

In the older versions of the operator/state schema formats, these were stored in the following locations (see the layout sketch below):
- for the operator metadata, under the `<checkpoint_loc>/state/<operator_id>/_metadata` directory
- for the state schema, under the `<checkpoint_loc>/state/<operator_id>/0/<storeName>/_metadata/schema` directory
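
For illustration, a rough sketch of the two layouts side by side, using the same placeholders as in the lists above (`<checkpoint_loc>`, `<operator_id>`, `<storeName>`):

```
v2 format:
  <checkpoint_loc>/state/<operator_id>/_metadata       <- operator metadata
  <checkpoint_loc>/state/<operator_id>/_stateSchema    <- state schema

older formats:
  <checkpoint_loc>/state/<operator_id>/_metadata                       <- operator metadata
  <checkpoint_loc>/state/<operator_id>/0/<storeName>/_metadata/schema  <- state schema
```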

### Does this PR introduce _any_ user-facing change?
Yes

Allows the user to specify a `batchId` while querying the operator metadata. This is a no-op for operators using metadata version 1 and provides the correct metadata for version 2 onwards.

```
spark
   .read
   .format("state-metadata")
   .option("batchId", <batchId>)
   .load(<path>)
```
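
If `batchId` is omitted, the source falls back to the last committed batch, or batch 0 if no batch has been committed yet (see the reader changes below). A minimal sketch of that default usage:

```
spark
   .read
   .format("state-metadata")
   .load(<path>)
```

Requesting a `batchId` beyond the last available offset fails with the new `STDS_FAILED_TO_READ_OPERATOR_METADATA` error.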

### How was this patch tested?
Added new unit tests

```
===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.OperatorStateMetadataSuite, threads: Idle Worker Monitor for python3 (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), ForkJoinPool.commonPool-worker-2 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true) =====
[info] Run completed in 30 seconds, 859 milliseconds.
[info] Total number of tests run: 11
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47528 from anishshri-db/task/SPARK-49048.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
anishshri-db authored and HeartSaVioR committed Aug 6, 2024
1 parent b477753 commit 2a75210
Showing 10 changed files with 179 additions and 40 deletions.
8 changes: 8 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -4002,6 +4002,14 @@
],
"sqlState" : "42613"
},
"STDS_FAILED_TO_READ_OPERATOR_METADATA" : {
"message" : [
"Failed to read the operator metadata for checkpointLocation=<checkpointLocation> and batchId=<batchId>.",
"Either the file does not exist, or the file is corrupted.",
"Rerun the streaming query to construct the operator metadata, and report to the corresponding communities or vendors if the error persists."
],
"sqlState" : "42K03"
},
"STDS_FAILED_TO_READ_STATE_SCHEMA" : {
"message" : [
"Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: <sourceOptions>.",
@@ -61,7 +61,8 @@ class StateDataSource extends TableProvider with DataSourceRegister {
// Read the operator metadata once to see if we can find the information for prefix scan
// encoder used in session window aggregation queries.
val allStateStoreMetadata = new StateMetadataPartitionReader(
sourceOptions.stateCheckpointLocation.getParent.toString, serializedHadoopConf)
sourceOptions.stateCheckpointLocation.getParent.toString, serializedHadoopConf,
sourceOptions.batchId)
.stateMetadata.toArray
val stateStoreMetadata = allStateStoreMetadata.filter { entry =>
entry.operatorId == sourceOptions.operatorId &&
@@ -63,6 +63,12 @@ object StateDataSourceErrors {
new StateDataSourceReadStateSchemaFailure(sourceOptions, cause)
}

def failedToReadOperatorMetadata(
checkpointLocation: String,
batchId: Long): StateDataSourceException = {
new StateDataSourceReadOperatorMetadataFailure(checkpointLocation, batchId)
}

def conflictOptions(options: Seq[String]): StateDataSourceException = {
new StateDataSourceConflictOptions(options)
}
@@ -158,3 +164,11 @@ class StateDataSourceNoPartitionDiscoveredInStateStore(sourceOptions: StateSourc
"STDS_NO_PARTITION_DISCOVERED_IN_STATE_STORE",
Map("sourceOptions" -> sourceOptions.toString),
cause = null)

class StateDataSourceReadOperatorMetadataFailure(
checkpointLocation: String,
batchId: Long)
extends StateDataSourceException(
"STDS_FAILED_TO_READ_OPERATOR_METADATA",
Map("checkpointLocation" -> checkpointLocation, "batchId" -> batchId.toString),
cause = null)
@@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionRead
import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceErrors
import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.PATH
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.state.{OperatorInfoV1, OperatorStateMetadata, OperatorStateMetadataReader, OperatorStateMetadataV1, OperatorStateMetadataV2, StateStoreMetadataV1}
import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, OperatorStateMetadataReader, OperatorStateMetadataUtils, OperatorStateMetadataV1, OperatorStateMetadataV2}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -104,7 +104,17 @@ class StateMetadataTable extends Table with SupportsRead with SupportsMetadataCo
if (!options.containsKey("path")) {
throw StateDataSourceErrors.requiredOptionUnspecified(PATH)
}
new StateMetadataScan(options.get("path"))

val checkpointLocation = options.get("path")

// TODO: SPARK-49115 - add docs for new options for state metadata source
val batchIdOpt = Option(options.get("batchId")).map(_.toLong)
// if a batchId is provided, use it. Otherwise, use the last committed batch. If there is no
// committed batch, use batchId 0.
val batchId = batchIdOpt.getOrElse(OperatorStateMetadataUtils
.getLastCommittedBatch(SparkSession.active, checkpointLocation).getOrElse(0L))

new StateMetadataScan(options.get("path"), batchId)
}
}

@@ -119,7 +129,7 @@ class StateMetadataTable extends Table with SupportsRead with SupportsMetadataCo

case class StateMetadataInputPartition(checkpointLocation: String) extends InputPartition

class StateMetadataScan(checkpointLocation: String) extends Scan {
class StateMetadataScan(checkpointLocation: String, batchId: Long) extends Scan {
override def readSchema: StructType = StateMetadataTableEntry.schema

override def toBatch: Batch = {
@@ -131,24 +141,26 @@ class StateMetadataScan(checkpointLocation: String) extends Scan {
override def createReaderFactory(): PartitionReaderFactory = {
// Don't need to broadcast the hadoop conf because this source only has one partition.
val conf = new SerializableConfiguration(SparkSession.active.sessionState.newHadoopConf())
StateMetadataPartitionReaderFactory(conf)
StateMetadataPartitionReaderFactory(conf, batchId)
}
}
}
}

case class StateMetadataPartitionReaderFactory(hadoopConf: SerializableConfiguration)
extends PartitionReaderFactory {
case class StateMetadataPartitionReaderFactory(
hadoopConf: SerializableConfiguration,
batchId: Long) extends PartitionReaderFactory {

override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
new StateMetadataPartitionReader(
partition.asInstanceOf[StateMetadataInputPartition].checkpointLocation, hadoopConf)
partition.asInstanceOf[StateMetadataInputPartition].checkpointLocation, hadoopConf, batchId)
}
}

class StateMetadataPartitionReader(
checkpointLocation: String,
serializedHadoopConf: SerializableConfiguration) extends PartitionReader[InternalRow] {
serializedHadoopConf: SerializableConfiguration,
batchId: Long) extends PartitionReader[InternalRow] {

override def next(): Boolean = {
stateMetadata.hasNext
@@ -207,11 +219,12 @@ class StateMetadataPartitionReader(
} else {
1
}

OperatorStateMetadataReader.createReader(
operatorIdPath, hadoopConf, operatorStateMetadataVersion).read() match {
operatorIdPath, hadoopConf, operatorStateMetadataVersion, batchId).read() match {
case Some(metadata) => metadata
case None => OperatorStateMetadataV1(OperatorInfoV1(opId, null),
Array(StateStoreMetadataV1(null, -1, -1)))
case None => throw StateDataSourceErrors.failedToReadOperatorMetadata(checkpointLocation,
batchId)
}
}
}
@@ -221,7 +221,7 @@ class IncrementalExecution(
val oldMetadata = try {
OperatorStateMetadataReader.createReader(
new Path(checkpointLocation, ssw.getStateInfo.operatorId.toString),
hadoopConf, ssw.operatorStateMetadataVersion).read()
hadoopConf, ssw.operatorStateMetadataVersion, currentBatchId - 1).read()
} catch {
case e: Exception =>
logWarning(log"Error reading metadata path for stateful operator. This " +
@@ -462,7 +462,9 @@ class IncrementalExecution(
rulesToCompose.reduceLeft { (ruleA, ruleB) => ruleA orElse ruleB }
}

private def checkOperatorValidWithMetadata(planWithStateOpId: SparkPlan): Unit = {
private def checkOperatorValidWithMetadata(
planWithStateOpId: SparkPlan,
batchId: Long): Unit = {
// get stateful operators for current batch
val opMapInPhysicalPlan: Map[Long, String] = planWithStateOpId.collect {
case stateStoreWriter: StateStoreWriter =>
@@ -479,7 +481,7 @@
try {
val reader = new StateMetadataPartitionReader(
new Path(checkpointLocation).getParent.toString,
new SerializableConfiguration(hadoopConf))
new SerializableConfiguration(hadoopConf), batchId)
val opMetadataList = reader.allOperatorStateMetadata
ret = opMetadataList.map {
case OperatorStateMetadataV1(operatorInfo, _) =>
@@ -517,7 +519,7 @@
// Need to check before write to metadata because we need to detect add operator
// Only check when streaming is restarting and is first batch
if (isFirstBatch && currentBatchId != 0) {
checkOperatorValidWithMetadata(planWithStateOpId)
checkOperatorValidWithMetadata(planWithStateOpId, currentBatchId - 1)
}

// The rule below doesn't change the plan but can cause the side effect that
@@ -401,8 +401,16 @@ case class TransformWithStateExec(
new Path(stateSchemaDir, s"${batchId}_${UUID.randomUUID().toString}")
val metadataPath = new Path(getStateInfo.checkpointLocation, s"${getStateInfo.operatorId}")
val metadataReader = OperatorStateMetadataReader.createReader(
metadataPath, hadoopConf, operatorStateMetadataVersion)
val operatorStateMetadata = metadataReader.read()
metadataPath, hadoopConf, operatorStateMetadataVersion, batchId)
val operatorStateMetadata = try {
metadataReader.read()
} catch {
// If this is the first time we are running the query, there will be no metadata
// and this error is expected. In this case, we return None.
case ex: Exception if batchId == 0 =>
None
}

val oldStateSchemaFilePath: Option[Path] = operatorStateMetadata match {
case Some(metadata) =>
metadata match {
@@ -444,8 +452,9 @@ case class TransformWithStateExec(
new Path(getStateInfo.checkpointLocation,
s"${getStateInfo.operatorId.toString}")

val storeNamePath = new Path(stateCheckpointPath, storeName)
new Path(new Path(storeNamePath, "_metadata"), "schema")
val stateSchemaPath = new Path(stateCheckpointPath, "_stateSchema")
val storeNamePath = new Path(stateSchemaPath, storeName)
storeNamePath
}

override def validateNewMetadata(
@@ -28,8 +28,11 @@ import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MetadataVersionUtil}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceErrors
import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, CommitLog, MetadataVersionUtil, OffsetSeqLog}
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS}
import org.apache.spark.sql.execution.streaming.state.OperatorStateMetadataUtils.{OperatorStateMetadataReader, OperatorStateMetadataWriter}

/**
@@ -166,18 +169,31 @@ object OperatorStateMetadataUtils extends Logging {
s"version=${operatorStateMetadata.version}")
}
}

def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): Long = {
val offsetLog = new OffsetSeqLog(session,
new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
offsetLog.getLatest().map(_._1).getOrElse(throw
StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation))
}

def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Option[Long] = {
val commitLog = new CommitLog(session, new Path(checkpointLocation, DIR_NAME_COMMITS).toString)
commitLog.getLatest().map(_._1)
}
}

object OperatorStateMetadataReader {
def createReader(
stateCheckpointPath: Path,
hadoopConf: Configuration,
version: Int): OperatorStateMetadataReader = {
version: Int,
batchId: Long): OperatorStateMetadataReader = {
version match {
case 1 =>
new OperatorStateMetadataV1Reader(stateCheckpointPath, hadoopConf)
case 2 =>
new OperatorStateMetadataV2Reader(stateCheckpointPath, hadoopConf)
new OperatorStateMetadataV2Reader(stateCheckpointPath, hadoopConf, batchId)
case _ =>
throw new IllegalArgumentException(s"Failed to create reader for operator metadata " +
s"with version=$version")
@@ -291,30 +307,54 @@ class OperatorStateMetadataV2Writer(

class OperatorStateMetadataV2Reader(
stateCheckpointPath: Path,
hadoopConf: Configuration) extends OperatorStateMetadataReader {
hadoopConf: Configuration,
batchId: Long) extends OperatorStateMetadataReader {

// Check that the requested batchId is available in the checkpoint directory
val baseCheckpointDir = stateCheckpointPath.getParent.getParent
val lastAvailOffset = listOffsets(baseCheckpointDir).lastOption.getOrElse(-1L)
if (batchId > lastAvailOffset) {
throw StateDataSourceErrors.failedToReadOperatorMetadata(baseCheckpointDir.toString, batchId)
}

private val metadataDirPath = OperatorStateMetadataV2.metadataDirPath(stateCheckpointPath)
private lazy val fm = CheckpointFileManager.create(metadataDirPath, hadoopConf)

fm.mkdirs(metadataDirPath.getParent)

override def version: Int = 2

private def listBatches(): Array[Long] = {
// List the available offsets in the offset directory
private def listOffsets(baseCheckpointDir: Path): Array[Long] = {
val offsetLog = new Path(baseCheckpointDir, DIR_NAME_OFFSETS)
val fm = CheckpointFileManager.create(offsetLog, hadoopConf)
if (!fm.exists(offsetLog)) {
return Array.empty
}
fm.list(offsetLog)
.filter(f => !f.getPath.getName.startsWith(".")) // ignore hidden files
.map(_.getPath.getName.toLong).sorted
}

// List the available batches in the operator metadata directory
private def listOperatorMetadataBatches(): Array[Long] = {
if (!fm.exists(metadataDirPath)) {
return Array.empty
}
fm.list(metadataDirPath).map(_.getPath.getName.toLong).sorted
}

override def read(): Option[OperatorStateMetadata] = {
val batches = listBatches()
if (batches.isEmpty) {
return None
val batches = listOperatorMetadataBatches()
val lastBatchId = batches.filter(_ <= batchId).lastOption
if (lastBatchId.isEmpty) {
throw StateDataSourceErrors.failedToReadOperatorMetadata(stateCheckpointPath.toString,
batchId)
} else {
val metadataFilePath = OperatorStateMetadataV2.metadataFilePath(
stateCheckpointPath, lastBatchId.get)
val inputStream = fm.open(metadataFilePath)
OperatorStateMetadataUtils.readMetadata(inputStream, version)
}
val lastBatchId = batches.last
val metadataFilePath = OperatorStateMetadataV2.metadataFilePath(
stateCheckpointPath, lastBatchId)
val inputStream = fm.open(metadataFilePath)
OperatorStateMetadataUtils.readMetadata(inputStream, version)
}
}