Skip to content

Commit

Permalink
Allow non existing checkpoint location and change IT (opensearch-proj…
Browse files Browse the repository at this point in the history
…ect#313)

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Apr 19, 2024
1 parent fe0c7c4 commit b5ab7bd
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ trait FlintSparkValidationHelper extends Logging {
new Path(checkpointLocation),
spark.sessionState.newHadoopConf())

// The primary intent here is to catch any exceptions during the accessibility check.
// The actual result is ignored, as Spark can create any necessary sub-folders
// when the streaming job starts.
checkpointManager.exists(new Path(checkpointLocation))
true
} catch {
case e: IOException =>
logWarning(s"Failed to check if checkpoint location $checkpointLocation exists", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
if (checkpointLocation.isDefined) {
require(
isCheckpointLocationAccessible(spark, checkpointLocation.get),
s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access")
s"No permission to access the checkpoint location ${checkpointLocation.get}")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex)
"Checkpoint location is required by incremental refresh")
require(
isCheckpointLocationAccessible(spark, checkpointLocation.get),
s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access")
s"No permission to access the checkpoint location ${checkpointLocation.get}")
}

override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup
withTable(testTable) {
sql(s"CREATE TABLE $testTable (name STRING) USING JSON")

// Generate UUID as folder name to ensure the path not exist
val checkpointDir = s"/test/${UUID.randomUUID()}"
// Use unknown scheme URL to simulate location without access permission
val checkpointDir = "unknown://invalid_permission_path"
the[IllegalArgumentException] thrownBy {
sql(s"""
| $statement
Expand All @@ -127,7 +127,7 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup
| )
|""".stripMargin)
} should have message
s"requirement failed: Checkpoint location $checkpointDir doesn't exist or no permission to access"
s"requirement failed: No permission to access the checkpoint location $checkpointDir"
}
}
}
Expand Down

0 comments on commit b5ab7bd

Please sign in to comment.