Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Apr 16, 2024
1 parent 2c47c16 commit 7731309
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
case (true, false) => AUTO
case (false, false) => FULL
case (false, true) => INCREMENTAL
case (true, true) =>
throw new IllegalArgumentException(
"auto_refresh and incremental_refresh options cannot both be true")
}

// validate allowed options depending on refresh mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName}
trait FlintSparkValidationHelper extends Logging {

/**
* Determines whether the source table(s) for a given Flint index are Hive tables.
* Determines whether the source table(s) for a given Flint index are supported.
*
* @param spark
* Spark session
Expand All @@ -34,7 +34,7 @@ trait FlintSparkValidationHelper extends Logging {
* @return
 * true if any source table's provider is Hive (i.e. unsupported), otherwise false
*/
def isSourceTableHive(spark: SparkSession, index: FlintSparkIndex): Boolean = {
def isTableProviderSupported(spark: SparkSession, index: FlintSparkIndex): Boolean = {
// Extract source table name (possibly more than one for MV query)
val tableNames = index match {
case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName)
Expand All @@ -47,10 +47,12 @@ trait FlintSparkValidationHelper extends Logging {
}
}

// Validate if any source table is Hive
// Validate if any source table is not supported (currently Hive only)
tableNames.exists { tableName =>
val (catalog, ident) = parseTableName(spark, tableName)
val table = loadTable(catalog, ident).get

// TODO: add allowed table provider list
DDLUtils.isHiveTable(Option(table.properties().get("provider")))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
"Incremental refresh cannot be enabled if auto refresh is enabled")

// Hive table doesn't support auto refresh
require(!isSourceTableHive(spark, index), "Index auto refresh doesn't support Hive table")
require(
!isTableProviderSupported(spark, index),
"Index auto refresh doesn't support Hive table")

// Checkpoint location is required if mandatory option set
val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String])
val checkpointLocation = index.options.checkpointLocation()
val checkpointLocation = options.checkpointLocation()
if (flintSparkConf.isCheckpointMandatory) {
require(
checkpointLocation.isDefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex)
override def validate(spark: SparkSession): Unit = {
// Non-Hive table is required for incremental refresh
require(
!isSourceTableHive(spark, index),
!isTableProviderSupported(spark, index),
"Index incremental refresh doesn't support Hive table")

// Checkpoint location is required regardless of mandatory option
Expand Down

0 comments on commit 7731309

Please sign in to comment.