diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala index 3c24cba6a..a8d4a56e4 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala @@ -81,6 +81,16 @@ object FlintSparkIndexRefresh extends Logging { } } + /** + * Validate if source table(s) of the given Flint index are not Hive table. + * + * @param spark + * Spark session + * @param index + * Flint index + * @return + * true if all non Hive, otherwise false + */ def isSourceTableNonHive(spark: SparkSession, index: FlintSparkIndex): Boolean = { // Extract source table name (possibly more than 1 for MV source query) val tableNames = index match { @@ -102,6 +112,17 @@ object FlintSparkIndexRefresh extends Logging { } } + /** + * Validate if checkpoint location is accessible (the folder exists and current Spark session + * has permission to access). + * + * @param spark + * Spark session + * @param checkpointLocation + * checkpoint location + * @return + * true if accessible, otherwise false + */ def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = { val checkpointPath = new Path(checkpointLocation) val checkpointManager = diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index d995b9825..5c8054253 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -27,7 +27,9 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) override def validate(spark: SparkSession): Unit = { // Non-Hive table is required for incremental refresh - require(!isSourceTableNonHive(spark, index), "Flint index incremental refresh doesn't support Hive table") + require( + !isSourceTableNonHive(spark, index), + "Flint index incremental refresh doesn't support Hive table") // Checkpoint location is required regardless of mandatory option val options = index.options