From f4416b175b005f67afae51a8b1bfdab3c25ea756 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 30 Apr 2024 10:18:13 -0700 Subject: [PATCH] Fix Iceberg IT Signed-off-by: Chen Dai --- .../ApplyFlintSparkCoveringIndex.scala | 21 ++--- .../source/FlintSparkSourceRelation.scala | 12 --- .../FlintSparkSourceRelationProvider.scala | 77 +++++++++++++++++++ .../source/file/FileSourceRelation.scala | 15 +--- .../file/FileSourceRelationProvider.scala | 30 ++++++++ .../iceberg/IcebergSourceRelation.scala | 25 +----- .../IcebergSourceRelationProvider.scala | 40 ++++++++++ ...lintSparkIcebergCoveringIndexITSuite.scala | 5 +- 8 files changed, 156 insertions(+), 69 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/FlintSparkSourceRelationProvider.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/file/FileSourceRelationProvider.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/iceberg/IcebergSourceRelationProvider.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala index bbc1ada5a..3b27f0bc3 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala @@ -11,13 +11,11 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.D import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.opensearch.flint.spark.source.{FlintSparkSourceRelation, FlintSparkSourceRelationProvider} -import org.opensearch.flint.spark.source.file.FileSourceRelationProvider 
-import org.opensearch.flint.spark.source.iceberg.IcebergSourceRelationProvider +import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.flint.{qualifyTableName, FlintDataSourceV2} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -31,24 +29,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap */ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan] { - private val supportedSourceRelations: Seq[FlintSparkSourceRelationProvider] = { - var relations = Seq[FlintSparkSourceRelationProvider]() - relations = relations :+ new FileSourceRelationProvider - - if (flint.spark.conf - .getOption("spark.sql.catalog.spark_catalog") - .contains("org.apache.iceberg.spark.SparkSessionCatalog")) { - relations = relations :+ new IcebergSourceRelationProvider - } - relations - } + /** All supported source relation providers */ + private val relationProviders = FlintSparkSourceRelationProvider.getProviders(flint.spark) override def apply(plan: LogicalPlan): LogicalPlan = { if (plan.isInstanceOf[V2WriteCommand]) { plan } else { plan transform { case subPlan => - supportedSourceRelations + relationProviders .collectFirst { case relationProvider if relationProvider.isSupported(subPlan) => val relation = relationProvider.getRelation(subPlan) @@ -79,7 +68,7 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan] val relationColsById = relation.output.map(attr => (attr.exprId, attr)).toMap plan .collect { - case r: LogicalRelation if r.eq(relation.plan) => Set.empty + case r: MultiInstanceRelation if r.eq(relation.plan) => Set.empty case other => 
other.expressions .flatMap(_.references) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/FlintSparkSourceRelation.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/FlintSparkSourceRelation.scala index 3e208569b..f5d063ba7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/FlintSparkSourceRelation.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/FlintSparkSourceRelation.scala @@ -5,13 +5,8 @@ package org.opensearch.flint.spark.source -import org.opensearch.flint.spark.source.file.FileSourceRelation -import org.opensearch.flint.spark.source.iceberg.IcebergSourceRelation - import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation /** * This source relation abstraction allows Flint to interact uniformly with different kinds of @@ -38,10 +33,3 @@ trait FlintSparkSourceRelation { */ def output: Seq[AttributeReference] } - -trait FlintSparkSourceRelationProvider { - - def isSupported(plan: LogicalPlan): Boolean - - def getRelation(plan: LogicalPlan): FlintSparkSourceRelation -} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/FlintSparkSourceRelationProvider.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/FlintSparkSourceRelationProvider.scala new file mode 100644 index 000000000..f193e6568 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/FlintSparkSourceRelationProvider.scala @@ -0,0 +1,77 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.source + +import org.opensearch.flint.spark.source.file.FileSourceRelationProvider +import 
org.opensearch.flint.spark.source.iceberg.IcebergSourceRelationProvider + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +/** + * A provider defines what kind of logical plan can be supported by Flint Spark integration. It + * serves a similar purpose to a Scala extractor, which has to be used in a match-case statement. + * However, the problem here is that we want to avoid a hard dependency on data source code such as + * Iceberg. Therefore, we maintain a list of providers and run each one only if the 3rd-party + * library is available in the current Spark session. + */ +trait FlintSparkSourceRelationProvider { + + /** + * @return + * the name of the source relation provider + */ + def name(): String + + /** + * Determines whether the given logical plan is supported by this provider. + * + * @param plan + * the logical plan to evaluate + * @return + * true if the plan is supported, false otherwise + */ + def isSupported(plan: LogicalPlan): Boolean + + /** + * Creates a source relation based on the provided logical plan. + * + * @param plan + * the logical plan to wrap in a source relation + * @return + * an instance of the source relation + */ + def getRelation(plan: LogicalPlan): FlintSparkSourceRelation +} + +/** + * Companion object that provides utility methods. + */ +object FlintSparkSourceRelationProvider { + + /** + * Retrieves all supported source relation providers for the given Spark session.
+ * + * @param spark + * the Spark session + * @return + * a sequence of source relation providers + */ + def getProviders(spark: SparkSession): Seq[FlintSparkSourceRelationProvider] = { + var relations = Seq[FlintSparkSourceRelationProvider]() + + // File source is supported out of the box + relations = relations :+ new FileSourceRelationProvider + + // Add Iceberg provider if it's enabled in Spark conf + if (spark.conf + .getOption("spark.sql.catalog.spark_catalog") + .contains("org.apache.iceberg.spark.SparkSessionCatalog")) { + relations = relations :+ new IcebergSourceRelationProvider + } + relations + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/file/FileSourceRelation.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/file/FileSourceRelation.scala index 2d8412de9..302e7bb3b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/file/FileSourceRelation.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/file/FileSourceRelation.scala @@ -21,21 +21,8 @@ case class FileSourceRelation(override val plan: LogicalRelation) extends FlintSparkSourceRelation { override def tableName: String = - plan.catalogTable - .getOrElse(throw new IllegalArgumentException("No table found in the source relation plan")) + plan.catalogTable.get // catalogTable must be present, as pre-checked by the source relation provider .qualifiedName override def output: Seq[AttributeReference] = plan.output } - -class FileSourceRelationProvider extends FlintSparkSourceRelationProvider { - - override def isSupported(plan: LogicalPlan): Boolean = plan match { - case LogicalRelation(_, _, Some(_), false) => true - case _ => false - } - - override def getRelation(plan: LogicalPlan): FlintSparkSourceRelation = { - FileSourceRelation(plan.asInstanceOf[LogicalRelation]) - } -} diff --git
a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/file/FileSourceRelationProvider.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/file/FileSourceRelationProvider.scala new file mode 100644 index 000000000..d35309dcc --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/file/FileSourceRelationProvider.scala @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.source.file + +import org.opensearch.flint.spark.source.{FlintSparkSourceRelation, FlintSparkSourceRelationProvider} + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation + +/** + * Source relation provider for Spark built-in file-based source. + * + * @param name + * the name of the file source provider + */ +class FileSourceRelationProvider(override val name: String = "file") + extends FlintSparkSourceRelationProvider { + + override def isSupported(plan: LogicalPlan): Boolean = plan match { + case LogicalRelation(_, _, Some(_), false) => true + case _ => false + } + + override def getRelation(plan: LogicalPlan): FlintSparkSourceRelation = { + FileSourceRelation(plan.asInstanceOf[LogicalRelation]) + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/iceberg/IcebergSourceRelation.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/iceberg/IcebergSourceRelation.scala index 6a8679f83..e00291100 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/iceberg/IcebergSourceRelation.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/iceberg/IcebergSourceRelation.scala @@ -5,11 +5,9 @@ package org.opensearch.flint.spark.source.iceberg -import org.apache.iceberg.spark.source.SparkTable -import 
org.opensearch.flint.spark.source.{FlintSparkSourceRelation, FlintSparkSourceRelationProvider} +import org.opensearch.flint.spark.source.FlintSparkSourceRelation import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation /** @@ -23,29 +21,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation case class IcebergSourceRelation(override val plan: DataSourceV2Relation) extends FlintSparkSourceRelation { - /** - * Retrieves the fully qualified name of the table from the Iceberg table metadata. If the - * Iceberg table is not correctly referenced or the metadata is missing, an exception is thrown. - */ override def tableName: String = plan.table.name() // TODO: confirm - /** - * Provides the output attributes of the logical plan. These attributes represent the schema of - * the Iceberg table as it appears in Spark's logical plan and are used to define the structure - * of the data returned by scans of the Iceberg table. 
- */ override def output: Seq[AttributeReference] = plan.output } - -class IcebergSourceRelationProvider extends FlintSparkSourceRelationProvider { - - override def isSupported(plan: LogicalPlan): Boolean = plan match { - case DataSourceV2Relation(_: SparkTable, _, _, _, _) => true - case _ => false - } - - override def getRelation(plan: LogicalPlan): FlintSparkSourceRelation = { - IcebergSourceRelation(plan.asInstanceOf[DataSourceV2Relation]) - } -} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/iceberg/IcebergSourceRelationProvider.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/iceberg/IcebergSourceRelationProvider.scala new file mode 100644 index 000000000..9cfb4c88c --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/source/iceberg/IcebergSourceRelationProvider.scala @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.source.iceberg + +import org.apache.iceberg.spark.source.SparkTable +import org.opensearch.flint.spark.source.{FlintSparkSourceRelation, FlintSparkSourceRelationProvider} + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} + +/** + * Source relation provider for Apache Iceberg-based source. 
+ * + * @param name + * the name of the Iceberg source provider + */ +class IcebergSourceRelationProvider(override val name: String = "iceberg") + extends FlintSparkSourceRelationProvider { + + override def isSupported(plan: LogicalPlan): Boolean = plan match { + case DataSourceV2Relation(_: SparkTable, _, _, _, _) => true + case DataSourceV2ScanRelation(DataSourceV2Relation(_: SparkTable, _, _, _, _), _, _, _) => + true + case _ => false + } + + override def getRelation(plan: LogicalPlan): FlintSparkSourceRelation = plan match { + case relation @ DataSourceV2Relation(_: SparkTable, _, _, _, _) => + IcebergSourceRelation(relation) + case DataSourceV2ScanRelation( + relation @ DataSourceV2Relation(_: SparkTable, _, _, _, _), + _, + _, + _) => + IcebergSourceRelation(relation) + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergCoveringIndexITSuite.scala index a10be970b..2675ef0cd 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergCoveringIndexITSuite.scala @@ -5,9 +5,8 @@ package org.opensearch.flint.spark.iceberg -// FIXME: support Iceberg table in covering index rewrite rule -/* +import org.opensearch.flint.spark.FlintSparkCoveringIndexSqlITSuite + class FlintSparkIcebergCoveringIndexITSuite extends FlintSparkCoveringIndexSqlITSuite with FlintSparkIcebergSuite {} - */