From b5ab7bd3ed135f1827df2cb0ca17422beb776985 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 19 Apr 2024 09:54:13 -0700 Subject: [PATCH] Allow non existing checkpoint location and change IT (#313) Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSparkValidationHelper.scala | 4 ++++ .../opensearch/flint/spark/refresh/AutoIndexRefresh.scala | 2 +- .../flint/spark/refresh/IncrementalIndexRefresh.scala | 2 +- .../flint/spark/FlintSparkIndexValidationITSuite.scala | 6 +++--- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala index f689d9aee..987e4366d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala @@ -76,7 +76,11 @@ trait FlintSparkValidationHelper extends Logging { new Path(checkpointLocation), spark.sessionState.newHadoopConf()) + // The primary intent here is to catch any exceptions during the accessibility check. + // The actual result is ignored, as Spark can create any necessary sub-folders + // when the streaming job starts. checkpointManager.exists(new Path(checkpointLocation)) + true } catch { case e: IOException => logWarning(s"Failed to check if checkpoint location $checkpointLocation exists", e) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala index 35902e184..dd565fd18 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala @@ -56,7 +56,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) if (checkpointLocation.isDefined) { require( isCheckpointLocationAccessible(spark, checkpointLocation.get), - s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access") + s"No permission to access the checkpoint location ${checkpointLocation.get}") } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index 8eb8d6f1f..b52f1ae79 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -39,7 +39,7 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) "Checkpoint location is required by incremental refresh") require( isCheckpointLocationAccessible(spark, checkpointLocation.get), - s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access") + s"No permission to access the checkpoint location ${checkpointLocation.get}") } override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala index ee7420d94..cc0573d6c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala @@ -116,8 +116,8 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup withTable(testTable) { sql(s"CREATE TABLE $testTable (name STRING) USING JSON") - // Generate UUID as folder name to ensure the path not exist - val checkpointDir = s"/test/${UUID.randomUUID()}" + // Use unknown scheme URL to simulate location without access permission + val checkpointDir = "unknown://invalid_permission_path" the[IllegalArgumentException] thrownBy { sql(s""" | $statement @@ -127,7 +127,7 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup | ) |""".stripMargin) } should have message - s"requirement failed: Checkpoint location $checkpointDir doesn't exist or no permission to access" + s"requirement failed: No permission to access the checkpoint location $checkpointDir" } } }