Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow non-existent checkpoint location path in index validation #313

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ trait FlintSparkValidationHelper extends Logging {
new Path(checkpointLocation),
spark.sessionState.newHadoopConf())

// The primary intent here is to catch any exceptions during the accessibility check.
// The actual result is ignored, as Spark can create any necessary sub-folders
// when the streaming job starts.
checkpointManager.exists(new Path(checkpointLocation))
true
} catch {
case e: IOException =>
logWarning(s"Failed to check if checkpoint location $checkpointLocation exists", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
if (checkpointLocation.isDefined) {
require(
isCheckpointLocationAccessible(spark, checkpointLocation.get),
s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access")
s"No permission to access the checkpoint location ${checkpointLocation.get}")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex)
"Checkpoint location is required by incremental refresh")
require(
isCheckpointLocationAccessible(spark, checkpointLocation.get),
s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access")
s"No permission to access the checkpoint location ${checkpointLocation.get}")
}

override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup
withTable(testTable) {
sql(s"CREATE TABLE $testTable (name STRING) USING JSON")

// Generate UUID as folder name to ensure the path not exist
val checkpointDir = s"/test/${UUID.randomUUID()}"
// Use unknown scheme URL to simulate location without access permission
val checkpointDir = "unknown://invalid_permission_path"
the[IllegalArgumentException] thrownBy {
sql(s"""
| $statement
Expand All @@ -127,7 +127,7 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup
| )
|""".stripMargin)
} should have message
s"requirement failed: Checkpoint location $checkpointDir doesn't exist or no permission to access"
s"requirement failed: No permission to access the checkpoint location $checkpointDir"
}
}
}
Expand Down
Loading