From 1328586d7d4dd9fe8670d5eced6ab38087b7ac6f Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 26 Mar 2024 16:47:35 -0700 Subject: [PATCH 01/10] Add index refresh validation Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 5 ++ .../flint/spark/FlintSparkIndex.scala | 15 +++++ .../spark/refresh/AutoIndexRefresh.scala | 32 +++++++++++ .../refresh/FlintSparkIndexRefresh.scala | 55 ++++++++++++++++++- .../refresh/IncrementalIndexRefresh.scala | 21 +++++-- 5 files changed, 122 insertions(+), 6 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 89e8dc423..238266eea 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -104,6 +104,11 @@ class FlintSpark(val spark: SparkSession) extends Logging { } else { val metadata = index.metadata() try { + // Validate index beforehand to avoid leaving behind an orphaned OS index + // when streaming job fails to start due to invalidity later + index.validate(spark) + + // Start transaction only if index validation passed flintClient .startTransaction(indexName, dataSourceName, true) .initialLog(latest => latest.state == EMPTY || latest.state == DELETED) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 702b1475e..77439dd09 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -9,8 +9,13 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.flint.datatype.FlintDataType import org.apache.spark.sql.types.StructType @@ -46,6 +51,16 @@ trait FlintSparkIndex { */ def metadata(): FlintMetadata + /** + * Validates the index to ensure its validity. By default, this method validates index options + * by delegating to specific index refresh (index options are mostly serving index refresh). + * Subclasses can extend this method to include additional validation logic. + */ + def validate(spark: SparkSession): Unit = { + val refresh = FlintSparkIndexRefresh.create(name(), this) + refresh.validate(spark) // TODO: why indexName arg necessary? + } + /** * Build a data frame to represent index data computation logic. Upper level code decides how to * use this, ex. batch or streaming, fully or incremental refresh. 
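The validate() hook added above delegates to the refresh-specific validation introduced in the next file (AutoIndexRefresh); that validation is built on Scala's require(), which fails fast with an IllegalArgumentException carrying the given message. A minimal, self-contained sketch of the same pattern follows, with illustrative flag names that are not part of the patch:

object OptionValidationSketch extends App {
  // Mirrors the style used by the refresh validation in this patch: each rule is a
  // require(predicate, message) that throws IllegalArgumentException when violated.
  def validate(
      autoRefresh: Boolean,
      incrementalRefresh: Boolean,
      checkpointLocation: Option[String]): Unit = {
    require(
      !(autoRefresh && incrementalRefresh),
      "Incremental refresh cannot be enabled if auto refresh is enabled")
    require(
      !incrementalRefresh || checkpointLocation.isDefined,
      "Checkpoint location is required by incremental refresh")
  }

  validate(autoRefresh = true, incrementalRefresh = false, checkpointLocation = None) // passes
  validate(autoRefresh = false, incrementalRefresh = true, checkpointLocation = Some("s3://bucket/cp")) // passes
  // validate(autoRefresh = true, incrementalRefresh = true, None) would throw IllegalArgumentException
}
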
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala index 09428f80d..1b162fa7f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala @@ -5,8 +5,11 @@ package org.opensearch.flint.spark.refresh +import java.util.Collections + import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions} import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh} +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.{isCheckpointLocationAccessible, isSourceTableNonHive} import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode} import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -27,6 +30,35 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) extends FlintS override def refreshMode: RefreshMode = AUTO + override def validate(spark: SparkSession): Unit = { + // Incremental refresh cannot enabled at the same time + val options = index.options + require( + !options.incrementalRefresh(), + "Incremental refresh cannot be enabled if auto refresh is enabled") + + // Non-Hive table is required for auto refresh + require( + isSourceTableNonHive(spark, index), + "Flint index auto refresh doesn't support Hive table") + + // Checkpoint location is required if mandatory option set + val flintSparkConf = new FlintSparkConf(Collections.emptyMap) + val checkpointLocation = index.options.checkpointLocation() + if (flintSparkConf.isCheckpointMandatory) { + require( + checkpointLocation.isDefined, + s"Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled") + } + + // Given checkpoint location is accessible + if (checkpointLocation.isDefined) { + require( + isCheckpointLocationAccessible(spark, checkpointLocation.get), + s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access") + } + } + override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = { val options = index.options val tableName = index.metadata().source diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala index 3c929d8e3..3c24cba6a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala @@ -5,11 +5,21 @@ package org.opensearch.flint.spark.refresh +import java.io.IOException + +import org.apache.hadoop.fs.Path import org.opensearch.flint.spark.FlintSparkIndex +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.RefreshMode +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import 
org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName} import org.apache.spark.sql.flint.config.FlintSparkConf /** @@ -24,6 +34,11 @@ trait FlintSparkIndexRefresh extends Logging { */ def refreshMode: RefreshMode + /** + * Validate the current index refresh beforehand. + */ + def validate(spark: SparkSession): Unit = {} + /** * Start refreshing the index. * @@ -37,7 +52,7 @@ trait FlintSparkIndexRefresh extends Logging { def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] } -object FlintSparkIndexRefresh { +object FlintSparkIndexRefresh extends Logging { /** Index refresh mode */ object RefreshMode extends Enumeration { @@ -65,4 +80,42 @@ object FlintSparkIndexRefresh { new FullIndexRefresh(indexName, index) } } + + def isSourceTableNonHive(spark: SparkSession, index: FlintSparkIndex): Boolean = { + // Extract source table name (possibly more than 1 for MV source query) + val tableNames = index match { + case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName) + case covering: FlintSparkCoveringIndex => Seq(covering.tableName) + case mv: FlintSparkMaterializedView => + spark.sessionState.sqlParser + .parsePlan(mv.query) + .collect { case LogicalRelation(_, _, Some(table), _) => + qualifyTableName(spark, table.identifier.table) + } + } + + // Validate each source table is Hive + tableNames.forall { tableName => + val (catalog, ident) = parseTableName(spark, tableName) + val table = loadTable(catalog, ident).get + !DDLUtils.isHiveTable(Option(table.properties().get("provider"))) + } + } + + def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = { + val checkpointPath = new Path(checkpointLocation) + val checkpointManager = + CheckpointFileManager.create(checkpointPath, spark.sessionState.newHadoopConf()) + try { + // require( + checkpointManager.exists(checkpointPath) + // s"Checkpoint location $checkpointLocation doesn't exist") + } catch { + case e: IOException => + logWarning(s"Failed to check if checkpoint location $checkpointLocation exists", e) + // throw new IllegalArgumentException( + // s"No permission to access checkpoint location $checkpointLocation") + false + } + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index 418ada902..d995b9825 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -6,6 +6,7 @@ package org.opensearch.flint.spark.refresh import org.opensearch.flint.spark.FlintSparkIndex +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.{isCheckpointLocationAccessible, isSourceTableNonHive} import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{INCREMENTAL, RefreshMode} import org.apache.spark.sql.SparkSession @@ -24,14 +25,24 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) override def refreshMode: RefreshMode = INCREMENTAL + override def validate(spark: SparkSession): Unit = { + // Non-Hive table is required for incremental refresh + require(!isSourceTableNonHive(spark, index), "Flint index incremental refresh doesn't support Hive table") + + // Checkpoint location is required regardless of mandatory option + val options = index.options + val checkpointLocation = 
options.checkpointLocation() + require( + options.checkpointLocation().nonEmpty, + "Checkpoint location is required by incremental refresh") + require( + isCheckpointLocationAccessible(spark, checkpointLocation.get), + s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access") + } + override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = { logInfo(s"Start refreshing index $indexName in incremental mode") - // TODO: move this to validation method together in future - if (index.options.checkpointLocation().isEmpty) { - throw new IllegalStateException("Checkpoint location is required by incremental refresh") - } - // Reuse auto refresh which uses AvailableNow trigger and will stop once complete val jobId = new AutoIndexRefresh(indexName, index) From 5b1a49ae075a9a8601556fbb7e720087f8e7d10f Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 26 Mar 2024 16:50:41 -0700 Subject: [PATCH 02/10] Add Java doc Signed-off-by: Chen Dai --- .../refresh/FlintSparkIndexRefresh.scala | 21 +++++++++++++++++++ .../refresh/IncrementalIndexRefresh.scala | 4 +++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala index 3c24cba6a..a8d4a56e4 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala @@ -81,6 +81,16 @@ object FlintSparkIndexRefresh extends Logging { } } + /** + * Validate if source table(s) of the given Flint index are not Hive table. + * + * @param spark + * Spark session + * @param index + * Flint index + * @return + * true if all non Hive, otherwise false + */ def isSourceTableNonHive(spark: SparkSession, index: FlintSparkIndex): Boolean = { // Extract source table name (possibly more than 1 for MV source query) val tableNames = index match { @@ -102,6 +112,17 @@ object FlintSparkIndexRefresh extends Logging { } } + /** + * Validate if checkpoint location is accessible (the folder exists and current Spark session + * has permission to access). 
+ * + * @param spark + * Spark session + * @param checkpointLocation + * checkpoint location + * @return + * true if accessible, otherwise false + */ def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = { val checkpointPath = new Path(checkpointLocation) val checkpointManager = diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index d995b9825..5c8054253 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -27,7 +27,9 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) override def validate(spark: SparkSession): Unit = { // Non-Hive table is required for incremental refresh - require(!isSourceTableNonHive(spark, index), "Flint index incremental refresh doesn't support Hive table") + require( + !isSourceTableNonHive(spark, index), + "Flint index incremental refresh doesn't support Hive table") // Checkpoint location is required regardless of mandatory option val options = index.options From 338040157f6340c423590f79891383edd5ad1b97 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 29 Mar 2024 14:00:30 -0700 Subject: [PATCH 03/10] Implement index refresh options validation Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 3 + .../flint/spark/FlintSparkException.scala | 32 +++++++++ .../flint/spark/FlintSparkIndex.scala | 10 ++- .../spark/refresh/AutoIndexRefresh.scala | 14 ++-- .../refresh/FlintSparkIndexRefresh.scala | 66 ++++++++++--------- .../refresh/IncrementalIndexRefresh.scala | 12 ++-- .../scala/org/apache/spark/FlintSuite.scala | 11 +++- .../spark/sql/FlintTestSparkSession.scala | 18 +++++ .../FlintSparkSkippingIndexSqlITSuite.scala | 20 +++++- project/Dependencies.scala | 1 + 10 files changed, 133 insertions(+), 54 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkException.scala create mode 100644 flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintTestSparkSession.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 238266eea..30ae8ef66 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -123,6 +123,9 @@ class FlintSpark(val spark: SparkSession) extends Logging { }) logInfo("Create index complete") } catch { + case e: FlintSparkException => + logError("Failed to create Flint index", e) + throw new IllegalStateException("Failed to create Flint index: " + e.getMessage) case e: Exception => logError("Failed to create Flint index", e) throw new IllegalStateException("Failed to create Flint index") diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkException.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkException.scala new file mode 100644 index 000000000..19950bee6 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkException.scala @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * 
SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +/** + * Flint Spark exception. + */ +abstract class FlintSparkException(message: String, cause: Option[Throwable]) + extends Throwable(message) {} + +object FlintSparkException { + + def requireValidation(requirement: Boolean, message: => Any): Unit = { + if (!requirement) { + throw new FlintSparkValidationException(message.toString) + } + } +} + +/** + * Flint Spark validation exception. + * + * @param message + * error message + * @param cause + * exception causing the error + */ +class FlintSparkValidationException(message: String, cause: Option[Throwable] = None) + extends FlintSparkException(message, cause) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 77439dd09..5941c69c1 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -9,13 +9,9 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex -import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh -import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.flint.datatype.FlintDataType import org.apache.spark.sql.types.StructType @@ -57,8 +53,10 @@ trait FlintSparkIndex { * Subclasses can extend this method to include additional validation logic. */ def validate(spark: SparkSession): Unit = { - val refresh = FlintSparkIndexRefresh.create(name(), this) - refresh.validate(spark) // TODO: why indexName arg necessary? + // Validate if index option valid for refresh + FlintSparkIndexRefresh + .create(name(), this) + .validate(spark) // TODO: why indexName arg necessary? 
} /** diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala index 1b162fa7f..60ba6f300 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala @@ -8,8 +8,8 @@ package org.opensearch.flint.spark.refresh import java.util.Collections import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions} +import org.opensearch.flint.spark.FlintSparkException.requireValidation import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh} -import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.{isCheckpointLocationAccessible, isSourceTableNonHive} import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode} import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -33,27 +33,27 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) extends FlintS override def validate(spark: SparkSession): Unit = { // Incremental refresh cannot enabled at the same time val options = index.options - require( + requireValidation( !options.incrementalRefresh(), "Incremental refresh cannot be enabled if auto refresh is enabled") // Non-Hive table is required for auto refresh - require( + requireValidation( isSourceTableNonHive(spark, index), - "Flint index auto refresh doesn't support Hive table") + "Index auto refresh doesn't support Hive table") // Checkpoint location is required if mandatory option set - val flintSparkConf = new FlintSparkConf(Collections.emptyMap) + val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String]) val checkpointLocation = index.options.checkpointLocation() if (flintSparkConf.isCheckpointMandatory) { - require( + requireValidation( checkpointLocation.isDefined, s"Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled") } // Given checkpoint location is accessible if (checkpointLocation.isDefined) { - require( + requireValidation( isCheckpointLocationAccessible(spark, checkpointLocation.get), s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access") } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala index a8d4a56e4..62a1b4044 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala @@ -50,36 +50,6 @@ trait FlintSparkIndexRefresh extends Logging { * optional Spark job ID */ def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] -} - -object FlintSparkIndexRefresh extends Logging { - - /** Index refresh mode */ - object RefreshMode extends Enumeration { - type RefreshMode = Value - val AUTO, FULL, INCREMENTAL = Value - } - - /** - * Create concrete index refresh implementation for the given index. 
- * - * @param indexName - * Flint index name - * @param index - * Flint index - * @return - * index refresh - */ - def create(indexName: String, index: FlintSparkIndex): FlintSparkIndexRefresh = { - val options = index.options - if (options.autoRefresh()) { - new AutoIndexRefresh(indexName, index) - } else if (options.incrementalRefresh()) { - new IncrementalIndexRefresh(indexName, index) - } else { - new FullIndexRefresh(indexName, index) - } - } /** * Validate if source table(s) of the given Flint index are not Hive table. @@ -91,7 +61,7 @@ object FlintSparkIndexRefresh extends Logging { * @return * true if all non Hive, otherwise false */ - def isSourceTableNonHive(spark: SparkSession, index: FlintSparkIndex): Boolean = { + protected def isSourceTableNonHive(spark: SparkSession, index: FlintSparkIndex): Boolean = { // Extract source table name (possibly more than 1 for MV source query) val tableNames = index match { case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName) @@ -123,7 +93,9 @@ object FlintSparkIndexRefresh extends Logging { * @return * true if accessible, otherwise false */ - def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = { + protected def isCheckpointLocationAccessible( + spark: SparkSession, + checkpointLocation: String): Boolean = { val checkpointPath = new Path(checkpointLocation) val checkpointManager = CheckpointFileManager.create(checkpointPath, spark.sessionState.newHadoopConf()) @@ -140,3 +112,33 @@ object FlintSparkIndexRefresh extends Logging { } } } + +object FlintSparkIndexRefresh { + + /** Index refresh mode */ + object RefreshMode extends Enumeration { + type RefreshMode = Value + val AUTO, FULL, INCREMENTAL = Value + } + + /** + * Create concrete index refresh implementation for the given index. 
+ * + * @param indexName + * Flint index name + * @param index + * Flint index + * @return + * index refresh + */ + def create(indexName: String, index: FlintSparkIndex): FlintSparkIndexRefresh = { + val options = index.options + if (options.autoRefresh()) { + new AutoIndexRefresh(indexName, index) + } else if (options.incrementalRefresh()) { + new IncrementalIndexRefresh(indexName, index) + } else { + new FullIndexRefresh(indexName, index) + } + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index 5c8054253..ed576d786 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -5,8 +5,8 @@ package org.opensearch.flint.spark.refresh +import org.opensearch.flint.spark.FlintSparkException.requireValidation import org.opensearch.flint.spark.FlintSparkIndex -import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.{isCheckpointLocationAccessible, isSourceTableNonHive} import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{INCREMENTAL, RefreshMode} import org.apache.spark.sql.SparkSession @@ -27,17 +27,17 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) override def validate(spark: SparkSession): Unit = { // Non-Hive table is required for incremental refresh - require( - !isSourceTableNonHive(spark, index), - "Flint index incremental refresh doesn't support Hive table") + requireValidation( + isSourceTableNonHive(spark, index), + "Index incremental refresh doesn't support Hive table") // Checkpoint location is required regardless of mandatory option val options = index.options val checkpointLocation = options.checkpointLocation() - require( + requireValidation( options.checkpointLocation().nonEmpty, "Checkpoint location is required by incremental refresh") - require( + requireValidation( isCheckpointLocationAccessible(spark, checkpointLocation.get), s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access") } diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala index ee8a52d96..87bb6cc9a 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala @@ -7,12 +7,13 @@ package org.apache.spark import org.opensearch.flint.spark.FlintSparkExtensions +import org.apache.spark.sql.{FlintTestSparkSession, SparkSession} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation import org.apache.spark.sql.flint.config.FlintConfigEntry import org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession} trait FlintSuite extends SharedSparkSession { override protected def sparkConf = { @@ -26,9 +27,15 @@ trait FlintSuite extends SharedSparkSession { // ConstantPropagation etc. 
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) .set("spark.sql.extensions", classOf[FlintSparkExtensions].getName) + .set(StaticSQLConf.CATALOG_IMPLEMENTATION.key, "hive") conf } + override protected def createSparkSession: TestSparkSession = { + SparkSession.cleanupAnyExistingSession() + new FlintTestSparkSession(sparkConf) + } + /** * Set Flint Spark configuration. (Generic "value: T" has problem with FlintConfigEntry[Any]) */ diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintTestSparkSession.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintTestSparkSession.scala new file mode 100644 index 000000000..9372c4f9e --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintTestSparkSession.scala @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.sql + +import org.apache.spark.SparkConf +import org.apache.spark.sql.hive.HiveSessionStateBuilder +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.sql.test.TestSparkSession + +class FlintTestSparkSession(sparkConf: SparkConf) extends TestSparkSession(sparkConf) { + + override lazy val sessionState: SessionState = { + new HiveSessionStateBuilder(this, None).build() + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index b08945953..77c140215 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -15,7 +15,7 @@ import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.scalatest.matchers.must.Matchers.defined +import org.scalatest.matchers.must.Matchers.{defined, have} import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row @@ -237,6 +237,24 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { } } + test("should fail if create auto refresh skipping index on Hive table") { + val hiveTableName = "spark_catalog.default.hive_table" + withTable(hiveTableName) { + sql(s""" + | CREATE TABLE $hiveTableName + | ( name STRING ) + |""".stripMargin) + + the[IllegalStateException] thrownBy { + sql(s""" + | CREATE SKIPPING INDEX ON $hiveTableName + | ( name VALUE_SET ) + | WITH (auto_refresh = true) + | """.stripMargin) + } should have message "Failed to create Flint index: Index auto refresh doesn't support Hive table" + } + } + test("should fail if refresh an auto refresh skipping index") { sql(s""" | CREATE SKIPPING INDEX ON $testTable diff --git a/project/Dependencies.scala b/project/Dependencies.scala index db92cf78f..047afb64c 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -12,6 +12,7 @@ object Dependencies { "org.apache.spark" %% "spark-core" % sparkVersion % "provided" withSources (), "org.apache.spark" %% "spark-sql" % sparkVersion % "provided" withSources (), "org.json4s" %% "json4s-native" % "3.7.0-M5" % "test", + "org.apache.spark" %% "spark-hive" % sparkVersion % "test", "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests", 
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests", "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests") From 5ae67c53ead0410055da90b45d51443f9c12b776 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 2 Apr 2024 10:54:11 -0700 Subject: [PATCH 04/10] Move validate to index builder and add separate suite for Hive test Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 7 --- .../flint/spark/FlintSparkException.scala | 32 ------------- .../flint/spark/FlintSparkIndex.scala | 13 ------ .../flint/spark/FlintSparkIndexBuilder.scala | 22 ++++++++- .../spark/refresh/AutoIndexRefresh.scala | 11 ++--- .../refresh/IncrementalIndexRefresh.scala | 7 ++- .../scala/org/apache/spark/FlintSuite.scala | 11 +---- .../spark/sql/FlintTestSparkSession.scala | 18 -------- .../spark/sql/FlintSparkHiveSuite.scala | 43 +++++++++++++++++ .../FlintSparkHiveValidationITSuite.scala | 46 +++++++++++++++++++ .../FlintSparkSkippingIndexSqlITSuite.scala | 20 +------- 11 files changed, 119 insertions(+), 111 deletions(-) delete mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkException.scala delete mode 100644 flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintTestSparkSession.scala create mode 100644 integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkHiveValidationITSuite.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 9da103cd5..5ee37d4d5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -105,10 +105,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { } else { val metadata = index.metadata() try { - // Validate index beforehand to avoid leaving behind an orphaned OS index - // when streaming job fails to start due to invalidity later - index.validate(spark) - // Start transaction only if index validation passed flintClient .startTransaction(indexName, dataSourceName, true) @@ -124,9 +120,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { }) logInfo("Create index complete") } catch { - case e: FlintSparkException => - logError("Failed to create Flint index", e) - throw new IllegalStateException("Failed to create Flint index: " + e.getMessage) case e: Exception => logError("Failed to create Flint index", e) throw new IllegalStateException("Failed to create Flint index") diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkException.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkException.scala deleted file mode 100644 index 19950bee6..000000000 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkException.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.spark - -/** - * Flint Spark exception. 
- */ -abstract class FlintSparkException(message: String, cause: Option[Throwable]) - extends Throwable(message) {} - -object FlintSparkException { - - def requireValidation(requirement: Boolean, message: => Any): Unit = { - if (!requirement) { - throw new FlintSparkValidationException(message.toString) - } - } -} - -/** - * Flint Spark validation exception. - * - * @param message - * error message - * @param cause - * exception causing the error - */ -class FlintSparkValidationException(message: String, cause: Option[Throwable] = None) - extends FlintSparkException(message, cause) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 5941c69c1..702b1475e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -9,7 +9,6 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.flint.datatype.FlintDataType @@ -47,18 +46,6 @@ trait FlintSparkIndex { */ def metadata(): FlintMetadata - /** - * Validates the index to ensure its validity. By default, this method validates index options - * by delegating to specific index refresh (index options are mostly serving index refresh). - * Subclasses can extend this method to include additional validation logic. - */ - def validate(spark: SparkSession): Unit = { - // Validate if index option valid for refresh - FlintSparkIndexRefresh - .create(name(), this) - .validate(spark) // TODO: why indexName arg necessary? - } - /** * Build a data frame to represent index data computation logic. Upper level code decides how to * use this, ex. batch or streaming, fully or incremental refresh. diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala index adcb4c45f..5f5bf764b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.spark.FlintSparkIndexOptions.empty +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh import org.apache.spark.sql.catalog.Column import org.apache.spark.sql.catalyst.util.CharVarcharUtils @@ -59,7 +60,7 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { * ignore existing index */ def create(ignoreIfExists: Boolean = false): Unit = - flint.createIndex(buildIndex(), ignoreIfExists) + flint.createIndex(validateIndex(buildIndex()), ignoreIfExists) /** * Copy Flint index with updated options. 
@@ -80,7 +81,24 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { val updatedMetadata = index .metadata() .copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava) - FlintSparkIndexFactory.create(updatedMetadata).get + validateIndex(FlintSparkIndexFactory.create(updatedMetadata).get) + } + + /** + * Pre-validate index to ensure its validity. By default, this method validates index options by + * delegating to specific index refresh (index options are mostly serving index refresh). + * Subclasses can extend this method to include additional validation logic. + * + * @param index + * Flint index to be validated + * @return + * the index or exception occurred if validation failed + */ + protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = { + FlintSparkIndexRefresh + .create(index.name(), index) + .validate(flint.spark) // TODO: why indexName arg necessary? + index } /** diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala index 60ba6f300..ab5b3714b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala @@ -8,7 +8,6 @@ package org.opensearch.flint.spark.refresh import java.util.Collections import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions} -import org.opensearch.flint.spark.FlintSparkException.requireValidation import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh} import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode} @@ -33,27 +32,25 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) extends FlintS override def validate(spark: SparkSession): Unit = { // Incremental refresh cannot enabled at the same time val options = index.options - requireValidation( + require( !options.incrementalRefresh(), "Incremental refresh cannot be enabled if auto refresh is enabled") // Non-Hive table is required for auto refresh - requireValidation( - isSourceTableNonHive(spark, index), - "Index auto refresh doesn't support Hive table") + require(isSourceTableNonHive(spark, index), "Index auto refresh doesn't support Hive table") // Checkpoint location is required if mandatory option set val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String]) val checkpointLocation = index.options.checkpointLocation() if (flintSparkConf.isCheckpointMandatory) { - requireValidation( + require( checkpointLocation.isDefined, s"Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled") } // Given checkpoint location is accessible if (checkpointLocation.isDefined) { - requireValidation( + require( isCheckpointLocationAccessible(spark, checkpointLocation.get), s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access") } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index ed576d786..d60d491a3 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala 
@@ -5,7 +5,6 @@ package org.opensearch.flint.spark.refresh -import org.opensearch.flint.spark.FlintSparkException.requireValidation import org.opensearch.flint.spark.FlintSparkIndex import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{INCREMENTAL, RefreshMode} @@ -27,17 +26,17 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) override def validate(spark: SparkSession): Unit = { // Non-Hive table is required for incremental refresh - requireValidation( + require( isSourceTableNonHive(spark, index), "Index incremental refresh doesn't support Hive table") // Checkpoint location is required regardless of mandatory option val options = index.options val checkpointLocation = options.checkpointLocation() - requireValidation( + require( options.checkpointLocation().nonEmpty, "Checkpoint location is required by incremental refresh") - requireValidation( + require( isCheckpointLocationAccessible(spark, checkpointLocation.get), s"Checkpoint location ${checkpointLocation.get} doesn't exist or no permission to access") } diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala index 87bb6cc9a..ee8a52d96 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala @@ -7,13 +7,12 @@ package org.apache.spark import org.opensearch.flint.spark.FlintSparkExtensions -import org.apache.spark.sql.{FlintTestSparkSession, SparkSession} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation import org.apache.spark.sql.flint.config.FlintConfigEntry import org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} -import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession trait FlintSuite extends SharedSparkSession { override protected def sparkConf = { @@ -27,15 +26,9 @@ trait FlintSuite extends SharedSparkSession { // ConstantPropagation etc. .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) .set("spark.sql.extensions", classOf[FlintSparkExtensions].getName) - .set(StaticSQLConf.CATALOG_IMPLEMENTATION.key, "hive") conf } - override protected def createSparkSession: TestSparkSession = { - SparkSession.cleanupAnyExistingSession() - new FlintTestSparkSession(sparkConf) - } - /** * Set Flint Spark configuration. 
(Generic "value: T" has problem with FlintConfigEntry[Any]) */ diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintTestSparkSession.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintTestSparkSession.scala deleted file mode 100644 index 9372c4f9e..000000000 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintTestSparkSession.scala +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.apache.spark.sql - -import org.apache.spark.SparkConf -import org.apache.spark.sql.hive.HiveSessionStateBuilder -import org.apache.spark.sql.internal.SessionState -import org.apache.spark.sql.test.TestSparkSession - -class FlintTestSparkSession(sparkConf: SparkConf) extends TestSparkSession(sparkConf) { - - override lazy val sessionState: SessionState = { - new HiveSessionStateBuilder(this, None).build() - } -} diff --git a/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala new file mode 100644 index 000000000..936a92ac6 --- /dev/null +++ b/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.sql + +import org.opensearch.flint.spark.FlintSparkSuite + +import org.apache.spark.SparkConf +import org.apache.spark.sql.hive.HiveSessionStateBuilder +import org.apache.spark.sql.internal.{SessionState, StaticSQLConf} +import org.apache.spark.sql.test.TestSparkSession + +/** + * Flint Spark base suite with Hive support enabled. Because enabling Hive support in Spark + * configuration alone is not adequate, as [[TestSparkSession]] disregards it and consistently + * creates its own instance of [[org.apache.spark.sql.test.TestSQLSessionStateBuilder]]. We need + * to override its session state with that of Hive in the meanwhile. 
+ */ +trait FlintSparkHiveSuite extends FlintSparkSuite { + + override protected def sparkConf: SparkConf = { + // Enable Hive support + val conf = + super.sparkConf + .set(StaticSQLConf.CATALOG_IMPLEMENTATION.key, "hive") + conf + } + + override protected def createSparkSession: TestSparkSession = { + SparkSession.cleanupAnyExistingSession() + new FlintTestSparkSession(sparkConf) + } + + class FlintTestSparkSession(sparkConf: SparkConf) extends TestSparkSession(sparkConf) { self => + + override lazy val sessionState: SessionState = { + // Override to replace [[TestSQLSessionStateBuilder]] with Hive session state + new HiveSessionStateBuilder(spark, None).build() + } + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkHiveValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkHiveValidationITSuite.scala new file mode 100644 index 000000000..0ef0f8f0c --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkHiveValidationITSuite.scala @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import org.scalatest.matchers.must.Matchers.have +import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} + +import org.apache.spark.sql.FlintSparkHiveSuite + +class FlintSparkHiveValidationITSuite extends FlintSparkHiveSuite { + + private val hiveTableName = "spark_catalog.default.hive_table" + + override def beforeAll(): Unit = { + super.beforeAll() + sql(s"CREATE TABLE $hiveTableName (name STRING)") + } + + override def afterAll(): Unit = { + sql(s"DROP TABLE $hiveTableName") + super.afterAll() + } + + test("should fail if create auto refresh skipping index on Hive table") { + the[IllegalArgumentException] thrownBy { + sql(s""" + | CREATE SKIPPING INDEX ON $hiveTableName + | ( name VALUE_SET ) + | WITH (auto_refresh = true) + | """.stripMargin) + } should have message "requirement failed: Index auto refresh doesn't support Hive table" + } + + test("should fail if create incremental refresh skipping index on Hive table") { + the[IllegalArgumentException] thrownBy { + sql(s""" + | CREATE SKIPPING INDEX ON $hiveTableName + | ( name VALUE_SET ) + | WITH (incremental_refresh = true) + | """.stripMargin) + } should have message "requirement failed: Index incremental refresh doesn't support Hive table" + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 754835284..53d08bda7 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -15,7 +15,7 @@ import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.scalatest.matchers.must.Matchers.{defined, have} +import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row @@ -237,24 +237,6 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { } } - test("should fail if create auto refresh skipping index on Hive table") { - val hiveTableName = 
"spark_catalog.default.hive_table" - withTable(hiveTableName) { - sql(s""" - | CREATE TABLE $hiveTableName - | ( name STRING ) - |""".stripMargin) - - the[IllegalStateException] thrownBy { - sql(s""" - | CREATE SKIPPING INDEX ON $hiveTableName - | ( name VALUE_SET ) - | WITH (auto_refresh = true) - | """.stripMargin) - } should have message "Failed to create Flint index: Index auto refresh doesn't support Hive table" - } - } - test("should fail if refresh an auto refresh skipping index") { sql(s""" | CREATE SKIPPING INDEX ON $testTable From 3d6e3064bd8c427f28e87424a1f90fe594057665 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 2 Apr 2024 14:13:11 -0700 Subject: [PATCH 05/10] Use in-memory Derby as Hive metastore Signed-off-by: Chen Dai --- .../refresh/FlintSparkIndexRefresh.scala | 6 +- .../spark/sql/FlintSparkHiveSuite.scala | 12 +-- .../FlintSparkHiveValidationITSuite.scala | 80 +++++++++++++++---- 3 files changed, 73 insertions(+), 25 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala index 62a1b4044..9c8397513 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala @@ -16,8 +16,8 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.streaming.CheckpointFileManager import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName} import org.apache.spark.sql.flint.config.FlintSparkConf @@ -69,8 +69,8 @@ trait FlintSparkIndexRefresh extends Logging { case mv: FlintSparkMaterializedView => spark.sessionState.sqlParser .parsePlan(mv.query) - .collect { case LogicalRelation(_, _, Some(table), _) => - qualifyTableName(spark, table.identifier.table) + .collect { case relation: UnresolvedRelation => + qualifyTableName(spark, relation.tableName) } } diff --git a/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala index 936a92ac6..a0fda1285 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala @@ -21,11 +21,13 @@ import org.apache.spark.sql.test.TestSparkSession trait FlintSparkHiveSuite extends FlintSparkSuite { override protected def sparkConf: SparkConf = { - // Enable Hive support - val conf = - super.sparkConf - .set(StaticSQLConf.CATALOG_IMPLEMENTATION.key, "hive") - conf + super.sparkConf + // Enable Hive support + .set(StaticSQLConf.CATALOG_IMPLEMENTATION.key, "hive") + // Use in-memory Derby as Hive metastore so no need to clean up metastore_db folder after test + .set("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:metastore_db;create=true") + .set("hive.metastore.uris", "") + .set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver") } override protected def createSparkSession: TestSparkSession = { diff --git 
a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkHiveValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkHiveValidationITSuite.scala index 0ef0f8f0c..af5808045 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkHiveValidationITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkHiveValidationITSuite.scala @@ -5,6 +5,9 @@ package org.opensearch.flint.spark +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.scalatest.matchers.must.Matchers.have import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} @@ -12,11 +15,28 @@ import org.apache.spark.sql.FlintSparkHiveSuite class FlintSparkHiveValidationITSuite extends FlintSparkHiveSuite { + // Test Hive table name private val hiveTableName = "spark_catalog.default.hive_table" + // Test create Flint index name and DDL statement + private val skippingIndexName = FlintSparkSkippingIndex.getSkippingIndexName(hiveTableName) + private val createSkippingIndexStatement = + s"CREATE SKIPPING INDEX ON $hiveTableName (name VALUE_SET)" + + private val coveringIndexName = + FlintSparkCoveringIndex.getFlintIndexName("ci_test", hiveTableName) + private val createCoveringIndexStatement = + s"CREATE INDEX ci_test ON $hiveTableName (name)" + + private val materializedViewName = + FlintSparkMaterializedView.getFlintIndexName("spark_catalog.default.mv_test") + private val createMaterializedViewStatement = + s"CREATE MATERIALIZED VIEW spark_catalog.default.mv_test AS SELECT * FROM $hiveTableName" + override def beforeAll(): Unit = { super.beforeAll() sql(s"CREATE TABLE $hiveTableName (name STRING)") + sql(s"INSERT INTO $hiveTableName VALUES ('test')") } override def afterAll(): Unit = { @@ -24,23 +44,49 @@ class FlintSparkHiveValidationITSuite extends FlintSparkHiveSuite { super.afterAll() } - test("should fail if create auto refresh skipping index on Hive table") { - the[IllegalArgumentException] thrownBy { - sql(s""" - | CREATE SKIPPING INDEX ON $hiveTableName - | ( name VALUE_SET ) - | WITH (auto_refresh = true) - | """.stripMargin) - } should have message "requirement failed: Index auto refresh doesn't support Hive table" - } + Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement) + .foreach { statement => + test(s"should fail to create auto refresh Flint index on Hive table: $statement") { + the[IllegalArgumentException] thrownBy { + withTempDir { checkpointDir => + sql(s""" + | $statement + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin) + } + } should have message "requirement failed: Index auto refresh doesn't support Hive table" + } + } + + Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement) + .foreach { statement => + test(s"should fail to create incremental refresh Flint index on Hive table: $statement") { + the[IllegalArgumentException] thrownBy { + withTempDir { checkpointDir => + sql(s""" + | $statement + | WITH ( + | incremental_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin) + } + } should have message "requirement failed: Index incremental refresh doesn't support Hive table" + } + } - test("should fail if create incremental refresh skipping index on Hive 
table") { - the[IllegalArgumentException] thrownBy { - sql(s""" - | CREATE SKIPPING INDEX ON $hiveTableName - | ( name VALUE_SET ) - | WITH (incremental_refresh = true) - | """.stripMargin) - } should have message "requirement failed: Index incremental refresh doesn't support Hive table" + Seq( + (skippingIndexName, createSkippingIndexStatement), + (coveringIndexName, createCoveringIndexStatement), + (materializedViewName, createMaterializedViewStatement)).foreach { + case (flintIndexName, statement) => + test(s"should succeed to create full refresh Flint index on Hive table: $flintIndexName") { + sql(statement) + flint.refreshIndex(flintIndexName) + flint.queryIndex(flintIndexName).count() shouldBe 1 + } } } From 2874174330bf9b97f5df0990697401d9473fb6a5 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 3 Apr 2024 13:38:44 -0700 Subject: [PATCH 06/10] Fix broken IT Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 1 - .../flint/spark/FlintSparkIndexBuilder.scala | 4 +- .../spark/FlintSparkValidationHelper.scala | 82 ++++++++++++++ .../spark/refresh/AutoIndexRefresh.scala | 12 ++- .../refresh/FlintSparkIndexRefresh.scala | 73 +------------ .../spark/refresh/FullIndexRefresh.scala | 4 + .../refresh/IncrementalIndexRefresh.scala | 7 +- .../spark/sql/FlintSparkHiveSuite.scala | 1 - .../FlintSparkCoveringIndexSqlITSuite.scala | 2 +- .../FlintSparkMaterializedViewITSuite.scala | 100 +++++++++--------- .../FlintSparkSkippingIndexITSuite.scala | 77 +++++++------- .../FlintSparkSkippingIndexSqlITSuite.scala | 2 +- .../spark/FlintSparkUpdateIndexITSuite.scala | 8 +- 13 files changed, 197 insertions(+), 176 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 5ee37d4d5..7b875f63b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -105,7 +105,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { } else { val metadata = index.metadata() try { - // Start transaction only if index validation passed flintClient .startTransaction(indexName, dataSourceName, true) .initialLog(latest => latest.state == EMPTY || latest.state == DELETED) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala index 5f5bf764b..106df276d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala @@ -96,8 +96,8 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { */ protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = { FlintSparkIndexRefresh - .create(index.name(), index) - .validate(flint.spark) // TODO: why indexName arg necessary? + .create(index.name(), index) // TODO: remove first argument? 
+ .validate(flint.spark) index } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala new file mode 100644 index 000000000..8b3e9c4e8 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName} + +/** + * Flint Spark validation helper. + */ +trait FlintSparkValidationHelper { + self: Logging => + + /** + * Validate if source table(s) of the given Flint index are not Hive table. + * + * @param spark + * Spark session + * @param index + * Flint index + * @return + * true if all non Hive, otherwise false + */ + def isSourceTableHive(spark: SparkSession, index: FlintSparkIndex): Boolean = { + // Extract source table name (possibly more than 1 for MV source query) + val tableNames = index match { + case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName) + case covering: FlintSparkCoveringIndex => Seq(covering.tableName) + case mv: FlintSparkMaterializedView => + spark.sessionState.sqlParser + .parsePlan(mv.query) + .collect { case relation: UnresolvedRelation => + qualifyTableName(spark, relation.tableName) + } + } + + // Validate if any source table is Hive + tableNames.exists { tableName => + val (catalog, ident) = parseTableName(spark, tableName) + val table = loadTable(catalog, ident).get + DDLUtils.isHiveTable(Option(table.properties().get("provider"))) + } + } + + /** + * Validate if checkpoint location is accessible (the folder exists and current Spark session + * has permission to access). 
+ * + * @param spark + * Spark session + * @param checkpointLocation + * checkpoint location + * @return + * true if accessible, otherwise false + */ + def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = { + val checkpointPath = new Path(checkpointLocation) + val checkpointManager = + CheckpointFileManager.create(checkpointPath, spark.sessionState.newHadoopConf()) + try { + checkpointManager.exists(checkpointPath) + } catch { + case e: IOException => + logWarning(s"Failed to check if checkpoint location $checkpointLocation exists", e) + false + } + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala index ab5b3714b..f99f55790 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala @@ -7,7 +7,7 @@ package org.opensearch.flint.spark.refresh import java.util.Collections -import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions} +import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper} import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh} import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode} @@ -25,7 +25,9 @@ import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger} * @param index * Flint index */ -class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) extends FlintSparkIndexRefresh { +class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) + extends FlintSparkIndexRefresh + with FlintSparkValidationHelper { override def refreshMode: RefreshMode = AUTO @@ -36,8 +38,8 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) extends FlintS !options.incrementalRefresh(), "Incremental refresh cannot be enabled if auto refresh is enabled") - // Non-Hive table is required for auto refresh - require(isSourceTableNonHive(spark, index), "Index auto refresh doesn't support Hive table") + // Hive table doesn't support auto refresh + require(!isSourceTableHive(spark, index), "Index auto refresh doesn't support Hive table") // Checkpoint location is required if mandatory option set val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String]) @@ -48,7 +50,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) extends FlintS s"Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled") } - // Given checkpoint location is accessible + // Checkpoint location must be accessible if (checkpointLocation.isDefined) { require( isCheckpointLocationAccessible(spark, checkpointLocation.get), diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala index 9c8397513..608907891 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala @@ -5,21 +5,11 @@ package org.opensearch.flint.spark.refresh -import java.io.IOException - -import org.apache.hadoop.fs.Path import org.opensearch.flint.spark.FlintSparkIndex -import 
org.opensearch.flint.spark.covering.FlintSparkCoveringIndex -import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.RefreshMode -import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.streaming.CheckpointFileManager -import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName} import org.apache.spark.sql.flint.config.FlintSparkConf /** @@ -37,7 +27,7 @@ trait FlintSparkIndexRefresh extends Logging { /** * Validate the current index refresh beforehand. */ - def validate(spark: SparkSession): Unit = {} + def validate(spark: SparkSession): Unit /** * Start refreshing the index. @@ -50,67 +40,6 @@ trait FlintSparkIndexRefresh extends Logging { * optional Spark job ID */ def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] - - /** - * Validate if source table(s) of the given Flint index are not Hive table. - * - * @param spark - * Spark session - * @param index - * Flint index - * @return - * true if all non Hive, otherwise false - */ - protected def isSourceTableNonHive(spark: SparkSession, index: FlintSparkIndex): Boolean = { - // Extract source table name (possibly more than 1 for MV source query) - val tableNames = index match { - case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName) - case covering: FlintSparkCoveringIndex => Seq(covering.tableName) - case mv: FlintSparkMaterializedView => - spark.sessionState.sqlParser - .parsePlan(mv.query) - .collect { case relation: UnresolvedRelation => - qualifyTableName(spark, relation.tableName) - } - } - - // Validate each source table is Hive - tableNames.forall { tableName => - val (catalog, ident) = parseTableName(spark, tableName) - val table = loadTable(catalog, ident).get - !DDLUtils.isHiveTable(Option(table.properties().get("provider"))) - } - } - - /** - * Validate if checkpoint location is accessible (the folder exists and current Spark session - * has permission to access). 
- * - * @param spark - * Spark session - * @param checkpointLocation - * checkpoint location - * @return - * true if accessible, otherwise false - */ - protected def isCheckpointLocationAccessible( - spark: SparkSession, - checkpointLocation: String): Boolean = { - val checkpointPath = new Path(checkpointLocation) - val checkpointManager = - CheckpointFileManager.create(checkpointPath, spark.sessionState.newHadoopConf()) - try { - // require( - checkpointManager.exists(checkpointPath) - // s"Checkpoint location $checkpointLocation doesn't exist") - } catch { - case e: IOException => - logWarning(s"Failed to check if checkpoint location $checkpointLocation exists", e) - // throw new IllegalArgumentException( - // s"No permission to access checkpoint location $checkpointLocation") - false - } - } } object FlintSparkIndexRefresh { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala index be09c2c36..c66361d09 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala @@ -31,6 +31,10 @@ class FullIndexRefresh( override def refreshMode: RefreshMode = FULL + override def validate(spark: SparkSession): Unit = { + // Nothing to validate for full refresh for now + } + override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = { logInfo(s"Start refreshing index $indexName in full mode") index diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index d60d491a3..80aa91687 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -5,7 +5,7 @@ package org.opensearch.flint.spark.refresh -import org.opensearch.flint.spark.FlintSparkIndex +import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkValidationHelper} import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{INCREMENTAL, RefreshMode} import org.apache.spark.sql.SparkSession @@ -20,14 +20,15 @@ import org.apache.spark.sql.flint.config.FlintSparkConf * Flint index */ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) - extends FlintSparkIndexRefresh { + extends FlintSparkIndexRefresh + with FlintSparkValidationHelper { override def refreshMode: RefreshMode = INCREMENTAL override def validate(spark: SparkSession): Unit = { // Non-Hive table is required for incremental refresh require( - isSourceTableNonHive(spark, index), + !isSourceTableHive(spark, index), "Index incremental refresh doesn't support Hive table") // Checkpoint location is required regardless of mandatory option diff --git a/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala index a0fda1285..555700151 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala @@ -27,7 +27,6 @@ trait FlintSparkHiveSuite extends FlintSparkSuite { // Use in-memory Derby as Hive 
metastore so no need to clean up metastore_db folder after test .set("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:metastore_db;create=true") .set("hive.metastore.uris", "") - .set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver") } override protected def createSparkSession: TestSparkSession = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 6991e60d8..eb58de567 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -125,7 +125,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { test("create skipping index with auto refresh should fail if mandatory checkpoint enabled") { setFlintSparkConf(CHECKPOINT_MANDATORY, "true") try { - the[IllegalStateException] thrownBy { + the[IllegalArgumentException] thrownBy { sql(s""" | CREATE INDEX $testIndex ON $testTable | (name, age) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 16d2b0b07..83fe1546c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -43,56 +43,58 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { } test("create materialized view with metadata successfully") { - val indexOptions = - FlintSparkIndexOptions( - Map( - "auto_refresh" -> "true", - "checkpoint_location" -> "s3://test/", - "watermark_delay" -> "30 Seconds")) - flint - .materializedView() - .name(testMvName) - .query(testQuery) - .options(indexOptions) - .create() + withTempDir { checkpointDir => + val indexOptions = + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath, + "watermark_delay" -> "30 Seconds")) + flint + .materializedView() + .name(testMvName) + .query(testQuery) + .options(indexOptions) + .create() - val index = flint.describeIndex(testFlintIndex) - index shouldBe defined - index.get.metadata().getContent should matchJson(s""" - | { - | "_meta": { - | "version": "${current()}", - | "name": "spark_catalog.default.mv_test_metrics", - | "kind": "mv", - | "source": "$testQuery", - | "indexedColumns": [ - | { - | "columnName": "startTime", - | "columnType": "timestamp" - | },{ - | "columnName": "count", - | "columnType": "bigint" - | }], - | "options": { - | "auto_refresh": "true", - | "incremental_refresh": "false", - | "checkpoint_location": "s3://test/", - | "watermark_delay": "30 Seconds" - | }, - | "latestId": "$testLatestId", - | "properties": {} - | }, - | "properties": { - | "startTime": { - | "type": "date", - | "format": "strict_date_optional_time_nanos" - | }, - | "count": { - | "type": "long" - | } - | } - | } - |""".stripMargin) + val index = flint.describeIndex(testFlintIndex) + index shouldBe defined + index.get.metadata().getContent should matchJson(s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "spark_catalog.default.mv_test_metrics", + | "kind": "mv", + | "source": "$testQuery", + | "indexedColumns": [ + | { + | "columnName": "startTime", + | "columnType": "timestamp" + | },{ + 
| "columnName": "count", + | "columnType": "bigint" + | }], + | "options": { + | "auto_refresh": "true", + | "incremental_refresh": "false", + | "checkpoint_location": "${checkpointDir.getAbsolutePath}", + | "watermark_delay": "30 Seconds" + | }, + | "latestId": "$testLatestId", + | "properties": {} + | }, + | "properties": { + | "startTime": { + | "type": "date", + | "format": "strict_date_optional_time_nanos" + | }, + | "count": { + | "type": "long" + | } + | } + | } + |""".stripMargin) + } } test("full refresh materialized view") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 8b724fde7..8c7c5be82 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -141,36 +141,39 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } test("create skipping index with index options successfully") { - flint - .skippingIndex() - .onTable(testTable) - .addValueSet("address") - .options(FlintSparkIndexOptions(Map( - "auto_refresh" -> "true", - "refresh_interval" -> "1 Minute", - "checkpoint_location" -> "s3a://test/", - "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"))) - .create() + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addValueSet("address") + .options(FlintSparkIndexOptions(Map( + "auto_refresh" -> "true", + "refresh_interval" -> "1 Minute", + "checkpoint_location" -> checkpointDir.getAbsolutePath, + "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"))) + .create() - val index = flint.describeIndex(testIndex) - index shouldBe defined - val optionJson = compact(render(parse(index.get.metadata().getContent) \ "_meta" \ "options")) - optionJson should matchJson(""" - | { - | "auto_refresh": "true", - | "incremental_refresh": "false", - | "refresh_interval": "1 Minute", - | "checkpoint_location": "s3a://test/", - | "index_settings": "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" - | } - |""".stripMargin) + val index = flint.describeIndex(testIndex) + index shouldBe defined + val optionJson = + compact(render(parse(index.get.metadata().getContent) \ "_meta" \ "options")) + optionJson should matchJson(s""" + | { + | "auto_refresh": "true", + | "incremental_refresh": "false", + | "refresh_interval": "1 Minute", + | "checkpoint_location": "${checkpointDir.getAbsolutePath}", + | "index_settings": "{\\"number_of_shards\\": 3,\\"number_of_replicas\\": 2}" + | } + |""".stripMargin) - // Load index options from index mapping (verify OS index setting in SQL IT) - index.get.options.autoRefresh() shouldBe true - index.get.options.refreshInterval() shouldBe Some("1 Minute") - index.get.options.checkpointLocation() shouldBe Some("s3a://test/") - index.get.options.indexSettings() shouldBe - Some("{\"number_of_shards\": 3,\"number_of_replicas\": 2}") + // Load index options from index mapping (verify OS index setting in SQL IT) + index.get.options.autoRefresh() shouldBe true + index.get.options.refreshInterval() shouldBe Some("1 Minute") + index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + index.get.options.indexSettings() shouldBe + Some("{\"number_of_shards\": 3,\"number_of_replicas\": 2}") + } } test("should not have ID column in index data") { @@ -233,16 +236,14 @@ class 
FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } test("should fail if incremental refresh without checkpoint location") { - flint - .skippingIndex() - .onTable(testTable) - .addPartitions("year", "month") - .options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true"))) - .create() - - assertThrows[IllegalStateException] { - flint.refreshIndex(testIndex) - } + the[IllegalArgumentException] thrownBy { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true"))) + .create() + } should have message "requirement failed: Checkpoint location is required by incremental refresh" } test("auto refresh skipping index successfully") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 53d08bda7..cddda3122 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -178,7 +178,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { test("create skipping index with auto refresh should fail if mandatory checkpoint enabled") { setFlintSparkConf(CHECKPOINT_MANDATORY, "true") try { - the[IllegalStateException] thrownBy { + the[IllegalArgumentException] thrownBy { sql(s""" | CREATE SKIPPING INDEX ON $testTable | ( year PARTITION ) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index c5ac0ab95..c72b06fbf 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -199,12 +199,13 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { Map( "auto_refresh" -> "false", "incremental_refresh" -> "true", - "refresh_interval" -> "1 Minute"), + "refresh_interval" -> "1 Minute", + "checkpoint_location" -> "s3a://test/"), Map( "auto_refresh" -> false, "incremental_refresh" -> true, "refresh_interval" -> Some("1 Minute"), - "checkpoint_location" -> None, + "checkpoint_location" -> Some("s3a://test/"), "watermark_delay" -> None)), ( Map("auto_refresh" -> "true"), @@ -223,12 +224,13 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { Map( "auto_refresh" -> "false", "incremental_refresh" -> "true", + "checkpoint_location" -> "s3a://test/", "watermark_delay" -> "1 Minute"), Map( "auto_refresh" -> false, "incremental_refresh" -> true, "refresh_interval" -> None, - "checkpoint_location" -> None, + "checkpoint_location" -> Some("s3a://test/"), "watermark_delay" -> Some("1 Minute"))))), ( "convert to auto refresh with allowed options", From 83202146a01c2df0b0a66cae1e2b014785b3d174 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 3 Apr 2024 15:00:32 -0700 Subject: [PATCH 07/10] Add more IT Signed-off-by: Chen Dai --- .../FlintSparkHiveValidationITSuite.scala | 92 --------- .../FlintSparkIndexValidationITSuite.scala | 186 ++++++++++++++++++ 2 files changed, 186 insertions(+), 92 deletions(-) delete mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkHiveValidationITSuite.scala create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala 
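The new FlintSparkIndexValidationITSuite consolidates the Hive-only checks into a single suite that generates its test cases by looping over the three index DDL statements (skipping index, covering index, materialized view) and both refresh modes (auto and incremental). A minimal, hypothetical ScalaTest sketch of that parameterization pattern follows; the class name, table name, and DDL strings are placeholders for illustration and are not the suite's actual code:

    import org.scalatest.funsuite.AnyFunSuite

    // Sketch of the Seq(...).foreach pattern that yields one test case per
    // (refresh option, DDL statement) combination. Placeholder DDL strings only.
    class ParameterizedValidationSketch extends AnyFunSuite {

      private val statements = Seq(
        "CREATE SKIPPING INDEX ON t (name VALUE_SET)",
        "CREATE INDEX ci ON t (name)",
        "CREATE MATERIALIZED VIEW mv AS SELECT * FROM t")

      Seq("auto_refresh", "incremental_refresh").foreach { option =>
        statements.foreach { statement =>
          test(s"validation applies to $option: $statement") {
            // The real suite would run the statement WITH ($option = true) and
            // assert on the IllegalArgumentException message; here we only check
            // the generated parameters to keep the sketch self-contained.
            assert(statement.nonEmpty && option.nonEmpty)
          }
        }
      }
    }
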
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkHiveValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkHiveValidationITSuite.scala deleted file mode 100644 index af5808045..000000000 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkHiveValidationITSuite.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.spark - -import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex -import org.opensearch.flint.spark.mv.FlintSparkMaterializedView -import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex -import org.scalatest.matchers.must.Matchers.have -import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} - -import org.apache.spark.sql.FlintSparkHiveSuite - -class FlintSparkHiveValidationITSuite extends FlintSparkHiveSuite { - - // Test Hive table name - private val hiveTableName = "spark_catalog.default.hive_table" - - // Test create Flint index name and DDL statement - private val skippingIndexName = FlintSparkSkippingIndex.getSkippingIndexName(hiveTableName) - private val createSkippingIndexStatement = - s"CREATE SKIPPING INDEX ON $hiveTableName (name VALUE_SET)" - - private val coveringIndexName = - FlintSparkCoveringIndex.getFlintIndexName("ci_test", hiveTableName) - private val createCoveringIndexStatement = - s"CREATE INDEX ci_test ON $hiveTableName (name)" - - private val materializedViewName = - FlintSparkMaterializedView.getFlintIndexName("spark_catalog.default.mv_test") - private val createMaterializedViewStatement = - s"CREATE MATERIALIZED VIEW spark_catalog.default.mv_test AS SELECT * FROM $hiveTableName" - - override def beforeAll(): Unit = { - super.beforeAll() - sql(s"CREATE TABLE $hiveTableName (name STRING)") - sql(s"INSERT INTO $hiveTableName VALUES ('test')") - } - - override def afterAll(): Unit = { - sql(s"DROP TABLE $hiveTableName") - super.afterAll() - } - - Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement) - .foreach { statement => - test(s"should fail to create auto refresh Flint index on Hive table: $statement") { - the[IllegalArgumentException] thrownBy { - withTempDir { checkpointDir => - sql(s""" - | $statement - | WITH ( - | auto_refresh = true, - | checkpoint_location = '${checkpointDir.getAbsolutePath}' - | ) - |""".stripMargin) - } - } should have message "requirement failed: Index auto refresh doesn't support Hive table" - } - } - - Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement) - .foreach { statement => - test(s"should fail to create incremental refresh Flint index on Hive table: $statement") { - the[IllegalArgumentException] thrownBy { - withTempDir { checkpointDir => - sql(s""" - | $statement - | WITH ( - | incremental_refresh = true, - | checkpoint_location = '${checkpointDir.getAbsolutePath}' - | ) - |""".stripMargin) - } - } should have message "requirement failed: Index incremental refresh doesn't support Hive table" - } - } - - Seq( - (skippingIndexName, createSkippingIndexStatement), - (coveringIndexName, createCoveringIndexStatement), - (materializedViewName, createMaterializedViewStatement)).foreach { - case (flintIndexName, statement) => - test(s"should succeed to create full refresh Flint index on Hive table: $flintIndexName") { - sql(statement) - flint.refreshIndex(flintIndexName) - flint.queryIndex(flintIndexName).count() 
shouldBe 1 - } - } -} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala new file mode 100644 index 000000000..bcabbd3a8 --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import java.util.{Locale, UUID} + +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, INCREMENTAL, RefreshMode} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex +import org.scalatest.matchers.must.Matchers.have +import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} + +import org.apache.spark.sql.FlintSparkHiveSuite +import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY + +class FlintSparkIndexValidationITSuite extends FlintSparkHiveSuite { + + // Test Hive table name + private val testTable = "spark_catalog.default.index_validation_test" + + // Test create Flint index name and DDL statement + private val skippingIndexName = FlintSparkSkippingIndex.getSkippingIndexName(testTable) + private val createSkippingIndexStatement = + s"CREATE SKIPPING INDEX ON $testTable (name VALUE_SET)" + + private val coveringIndexName = + FlintSparkCoveringIndex.getFlintIndexName("ci_test", testTable) + private val createCoveringIndexStatement = + s"CREATE INDEX ci_test ON $testTable (name)" + + private val materializedViewName = + FlintSparkMaterializedView.getFlintIndexName("spark_catalog.default.mv_test") + private val createMaterializedViewStatement = + s"CREATE MATERIALIZED VIEW spark_catalog.default.mv_test AS SELECT * FROM $testTable" + + Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement) + .foreach { statement => + test( + s"should fail to create auto refresh Flint index if incremental refresh enabled: $statement") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING) USING JSON") + + the[IllegalArgumentException] thrownBy { + sql(s""" + | $statement + | WITH ( + | auto_refresh = true, + | incremental_refresh = true + | ) + |""".stripMargin) + } should have message + "requirement failed: Incremental refresh cannot be enabled if auto refresh is enabled" + } + } + } + + Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement) + .foreach { statement => + test( + s"should fail to create auto refresh Flint index if checkpoint location mandatory: $statement") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING) USING JSON") + + the[IllegalArgumentException] thrownBy { + try { + setFlintSparkConf(CHECKPOINT_MANDATORY, "true") + sql(s""" + | $statement + | WITH ( + | auto_refresh = true + | ) + |""".stripMargin) + } finally { + setFlintSparkConf(CHECKPOINT_MANDATORY, "false") + } + } should have message + s"requirement failed: Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled" + } + } + } + + Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement) + .foreach { statement => + test( + s"should fail to create incremental refresh Flint index without checkpoint location: 
$statement") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING) USING JSON") + + the[IllegalArgumentException] thrownBy { + sql(s""" + | $statement + | WITH ( + | incremental_refresh = true + | ) + |""".stripMargin) + } should have message + "requirement failed: Checkpoint location is required by incremental refresh" + } + } + } + + Seq( + (AUTO, createSkippingIndexStatement), + (AUTO, createCoveringIndexStatement), + (AUTO, createMaterializedViewStatement), + (INCREMENTAL, createSkippingIndexStatement), + (INCREMENTAL, createCoveringIndexStatement), + (INCREMENTAL, createMaterializedViewStatement)) + .foreach { case (refreshMode, statement) => + test( + s"should fail to create $refreshMode refresh Flint index if checkpoint location is inaccessible: $statement") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING) USING JSON") + + // Generate UUID as folder name to ensure the path not exist + val checkpointDir = s"/test/${UUID.randomUUID()}" + the[IllegalArgumentException] thrownBy { + sql(s""" + | $statement + | WITH ( + | ${optionName(refreshMode)} = true, + | checkpoint_location = "$checkpointDir" + | ) + |""".stripMargin) + } should have message + s"requirement failed: Checkpoint location $checkpointDir doesn't exist or no permission to access" + } + } + } + + Seq( + (AUTO, createSkippingIndexStatement), + (AUTO, createCoveringIndexStatement), + (AUTO, createMaterializedViewStatement), + (INCREMENTAL, createSkippingIndexStatement), + (INCREMENTAL, createCoveringIndexStatement), + (INCREMENTAL, createMaterializedViewStatement)) + .foreach { case (refreshMode, statement) => + test(s"should fail to create $refreshMode refresh Flint index on Hive table: $statement") { + withTempDir { checkpointDir => + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING)") + + the[IllegalArgumentException] thrownBy { + sql(s""" + | $statement + | WITH ( + | ${optionName(refreshMode)} = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin) + } should have message + s"requirement failed: Index ${lowercase(refreshMode)} refresh doesn't support Hive table" + } + } + } + } + + Seq( + (skippingIndexName, createSkippingIndexStatement), + (coveringIndexName, createCoveringIndexStatement), + (materializedViewName, createMaterializedViewStatement)).foreach { + case (flintIndexName, statement) => + test(s"should succeed to create full refresh Flint index on Hive table: $flintIndexName") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING)") + sql(s"INSERT INTO $testTable VALUES ('test')") + + sql(statement) + flint.refreshIndex(flintIndexName) + flint.queryIndex(flintIndexName).count() shouldBe 1 + } + } + } + + private def lowercase(mode: RefreshMode): String = mode.toString.toLowerCase(Locale.ROOT) + + private def optionName(mode: RefreshMode): String = mode match { + case AUTO => "auto_refresh" + case INCREMENTAL => "incremental_refresh" + } +} From 2c2e1296f21d5fa666d195ab39be164f86353aea Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 4 Apr 2024 14:16:25 -0700 Subject: [PATCH 08/10] Rename Hive test base suite Signed-off-by: Chen Dai --- ...ntSparkHiveSuite.scala => SparkHiveSupportSuite.scala} | 8 ++++---- .../flint/spark/FlintSparkIndexValidationITSuite.scala | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) rename integ-test/src/test/scala/org/apache/spark/sql/{FlintSparkHiveSuite.scala => SparkHiveSupportSuite.scala} (87%) diff --git 
a/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/SparkHiveSupportSuite.scala similarity index 87% rename from integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala rename to integ-test/src/test/scala/org/apache/spark/sql/SparkHiveSupportSuite.scala index 555700151..36a0b526d 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/FlintSparkHiveSuite.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/SparkHiveSupportSuite.scala @@ -5,20 +5,20 @@ package org.apache.spark.sql -import org.opensearch.flint.spark.FlintSparkSuite - import org.apache.spark.SparkConf import org.apache.spark.sql.hive.HiveSessionStateBuilder import org.apache.spark.sql.internal.{SessionState, StaticSQLConf} -import org.apache.spark.sql.test.TestSparkSession +import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession} /** * Flint Spark base suite with Hive support enabled. Because enabling Hive support in Spark * configuration alone is not adequate, as [[TestSparkSession]] disregards it and consistently * creates its own instance of [[org.apache.spark.sql.test.TestSQLSessionStateBuilder]]. We need * to override its session state with that of Hive in the meanwhile. + * + * Note that we need to extend [[SharedSparkSession]] to call super.sparkConf() method. */ -trait FlintSparkHiveSuite extends FlintSparkSuite { +trait SparkHiveSupportSuite extends SharedSparkSession { override protected def sparkConf: SparkConf = { super.sparkConf diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala index bcabbd3a8..ee7420d94 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala @@ -14,10 +14,10 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.scalatest.matchers.must.Matchers.have import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} -import org.apache.spark.sql.FlintSparkHiveSuite +import org.apache.spark.sql.SparkHiveSupportSuite import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY -class FlintSparkIndexValidationITSuite extends FlintSparkHiveSuite { +class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSupportSuite { // Test Hive table name private val testTable = "spark_catalog.default.index_validation_test" From 2c47c16432a72926cde2f4e1e770612405dc5e27 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 5 Apr 2024 11:43:21 -0700 Subject: [PATCH 09/10] Polish Javadoc and comments Signed-off-by: Chen Dai --- .../spark/FlintSparkValidationHelper.scala | 22 ++++++++++--------- .../refresh/FlintSparkIndexRefresh.scala | 11 +++++++++- .../spark/refresh/FullIndexRefresh.scala | 3 ++- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala index 8b3e9c4e8..16d5a1905 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala @@ -22,11 +22,10 @@ import 
org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName} /** * Flint Spark validation helper. */ -trait FlintSparkValidationHelper { - self: Logging => +trait FlintSparkValidationHelper extends Logging { /** - * Validate if source table(s) of the given Flint index are not Hive table. + * Determines whether the source table(s) for a given Flint index are Hive tables. * * @param spark * Spark session @@ -36,7 +35,7 @@ trait FlintSparkValidationHelper { * true if all non Hive, otherwise false */ def isSourceTableHive(spark: SparkSession, index: FlintSparkIndex): Boolean = { - // Extract source table name (possibly more than 1 for MV source query) + // Extract source table name (possibly more than one for MV query) val tableNames = index match { case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName) case covering: FlintSparkCoveringIndex => Seq(covering.tableName) @@ -57,8 +56,9 @@ trait FlintSparkValidationHelper { } /** - * Validate if checkpoint location is accessible (the folder exists and current Spark session - * has permission to access). + * Checks whether a specified checkpoint location is accessible. Accessibility, in this context, + * means that the folder exists and the current Spark session has the necessary permissions to + * access it. * * @param spark * Spark session @@ -68,11 +68,13 @@ trait FlintSparkValidationHelper { * true if accessible, otherwise false */ def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = { - val checkpointPath = new Path(checkpointLocation) - val checkpointManager = - CheckpointFileManager.create(checkpointPath, spark.sessionState.newHadoopConf()) try { - checkpointManager.exists(checkpointPath) + val checkpointManager = + CheckpointFileManager.create( + new Path(checkpointLocation), + spark.sessionState.newHadoopConf()) + + checkpointManager.exists(new Path(checkpointLocation)) } catch { case e: IOException => logWarning(s"Failed to check if checkpoint location $checkpointLocation exists", e) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala index 608907891..0c6adb0bd 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala @@ -25,7 +25,16 @@ trait FlintSparkIndexRefresh extends Logging { def refreshMode: RefreshMode /** - * Validate the current index refresh beforehand. + * Validates the current index refresh settings before the actual execution begins. This method + * checks for the integrity of the index refresh configurations and ensures that all options set + * for the current refresh mode are valid. This preemptive validation helps in identifying + * configuration issues before the refresh operation is initiated, minimizing runtime errors and + * potential inconsistencies. 
+ * + * @param spark + * Spark session + * @throws IllegalArgumentException + * if any invalid or inapplicable config identified */ def validate(spark: SparkSession): Unit diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala index c66361d09..b2ce2ad34 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresh.scala @@ -32,7 +32,8 @@ class FullIndexRefresh( override def refreshMode: RefreshMode = FULL override def validate(spark: SparkSession): Unit = { - // Nothing to validate for full refresh for now + // Full refresh validates nothing for now, including Hive table validation. + // This allows users to continue using their existing Hive table with full refresh only. } override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = { From 7731309a371ffe2e272ede26a9778005f4cc1a15 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 16 Apr 2024 16:54:36 -0700 Subject: [PATCH 10/10] Address PR comments Signed-off-by: Chen Dai --- .../scala/org/opensearch/flint/spark/FlintSpark.scala | 3 --- .../flint/spark/FlintSparkValidationHelper.scala | 8 +++++--- .../opensearch/flint/spark/refresh/AutoIndexRefresh.scala | 6 ++++-- .../flint/spark/refresh/IncrementalIndexRefresh.scala | 2 +- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 7b875f63b..9cd5f60a7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -407,9 +407,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { case (true, false) => AUTO case (false, false) => FULL case (false, true) => INCREMENTAL - case (true, true) => - throw new IllegalArgumentException( - "auto_refresh and incremental_refresh options cannot both be true") } // validate allowed options depending on refresh mode diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala index 16d5a1905..f689d9aee 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName} trait FlintSparkValidationHelper extends Logging { /** - * Determines whether the source table(s) for a given Flint index are Hive tables. + * Determines whether the source table(s) for a given Flint index are supported. 
* * @param spark * Spark session @@ -34,7 +34,7 @@ trait FlintSparkValidationHelper extends Logging { * @return * true if all non Hive, otherwise false */ - def isSourceTableHive(spark: SparkSession, index: FlintSparkIndex): Boolean = { + def isTableProviderSupported(spark: SparkSession, index: FlintSparkIndex): Boolean = { // Extract source table name (possibly more than one for MV query) val tableNames = index match { case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName) @@ -47,10 +47,12 @@ trait FlintSparkValidationHelper extends Logging { } } - // Validate if any source table is Hive + // Validate if any source table is not supported (currently Hive only) tableNames.exists { tableName => val (catalog, ident) = parseTableName(spark, tableName) val table = loadTable(catalog, ident).get + + // TODO: add allowed table provider list DDLUtils.isHiveTable(Option(table.properties().get("provider"))) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala index f99f55790..35902e184 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala @@ -39,11 +39,13 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) "Incremental refresh cannot be enabled if auto refresh is enabled") // Hive table doesn't support auto refresh - require(!isSourceTableHive(spark, index), "Index auto refresh doesn't support Hive table") + require( + !isTableProviderSupported(spark, index), + "Index auto refresh doesn't support Hive table") // Checkpoint location is required if mandatory option set val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String]) - val checkpointLocation = index.options.checkpointLocation() + val checkpointLocation = options.checkpointLocation() if (flintSparkConf.isCheckpointMandatory) { require( checkpointLocation.isDefined, diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index 80aa91687..8eb8d6f1f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -28,7 +28,7 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) override def validate(spark: SparkSession): Unit = { // Non-Hive table is required for incremental refresh require( - !isSourceTableHive(spark, index), + !isTableProviderSupported(spark, index), "Index incremental refresh doesn't support Hive table") // Checkpoint location is required regardless of mandatory option