From 7731309a371ffe2e272ede26a9778005f4cc1a15 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 16 Apr 2024 16:54:36 -0700 Subject: [PATCH] Address PR comments Signed-off-by: Chen Dai --- .../scala/org/opensearch/flint/spark/FlintSpark.scala | 3 --- .../flint/spark/FlintSparkValidationHelper.scala | 8 +++++--- .../opensearch/flint/spark/refresh/AutoIndexRefresh.scala | 6 ++++-- .../flint/spark/refresh/IncrementalIndexRefresh.scala | 2 +- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 7b875f63b..9cd5f60a7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -407,9 +407,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { case (true, false) => AUTO case (false, false) => FULL case (false, true) => INCREMENTAL - case (true, true) => - throw new IllegalArgumentException( - "auto_refresh and incremental_refresh options cannot both be true") } // validate allowed options depending on refresh mode diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala index 16d5a1905..f689d9aee 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName} trait FlintSparkValidationHelper extends Logging { /** - * Determines whether the source table(s) for a given Flint index are Hive tables. 
+ * Checks whether any source table of a given Flint index is unsupported (currently Hive only). * * @param spark * Spark session @@ -34,7 +34,7 @@ trait FlintSparkValidationHelper extends Logging { * @return * true if all non Hive, otherwise false */ - def isSourceTableHive(spark: SparkSession, index: FlintSparkIndex): Boolean = { + def isTableProviderSupported(spark: SparkSession, index: FlintSparkIndex): Boolean = { // Extract source table name (possibly more than one for MV query) val tableNames = index match { case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName) @@ -47,10 +47,12 @@ trait FlintSparkValidationHelper extends Logging { } } - // Validate if any source table is Hive + // Validate if any source table is not supported (currently Hive only) tableNames.exists { tableName => val (catalog, ident) = parseTableName(spark, tableName) val table = loadTable(catalog, ident).get + + // TODO: add allowed table provider list DDLUtils.isHiveTable(Option(table.properties().get("provider"))) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala index f99f55790..35902e184 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala @@ -39,11 +39,13 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) "Incremental refresh cannot be enabled if auto refresh is enabled") // Hive table doesn't support auto refresh - require(!isSourceTableHive(spark, index), "Index auto refresh doesn't support Hive table") + require( + !isTableProviderSupported(spark, index), + "Index auto refresh doesn't support Hive table") // Checkpoint location is required if mandatory option set val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String]) - val 
checkpointLocation = index.options.checkpointLocation() + val checkpointLocation = options.checkpointLocation() if (flintSparkConf.isCheckpointMandatory) { require( checkpointLocation.isDefined, diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index 80aa91687..8eb8d6f1f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -28,7 +28,7 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) override def validate(spark: SparkSession): Unit = { // Non-Hive table is required for incremental refresh require( - !isSourceTableHive(spark, index), + !isTableProviderSupported(spark, index), "Index incremental refresh doesn't support Hive table") // Checkpoint location is required regardless of mandatory option