Skip to content

Commit

Permalink
Fix Iceberg IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Apr 30, 2024
1 parent 5240d53 commit f4416b1
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.D
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.source.{FlintSparkSourceRelation, FlintSparkSourceRelationProvider}
import org.opensearch.flint.spark.source.file.FileSourceRelationProvider
import org.opensearch.flint.spark.source.iceberg.IcebergSourceRelationProvider

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.flint.{qualifyTableName, FlintDataSourceV2}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
Expand All @@ -31,24 +29,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
*/
class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan] {

private val supportedSourceRelations: Seq[FlintSparkSourceRelationProvider] = {
var relations = Seq[FlintSparkSourceRelationProvider]()
relations = relations :+ new FileSourceRelationProvider

if (flint.spark.conf
.getOption("spark.sql.catalog.spark_catalog")
.contains("org.apache.iceberg.spark.SparkSessionCatalog")) {
relations = relations :+ new IcebergSourceRelationProvider
}
relations
}
/** All supported source relation providers */
private val relationProviders = FlintSparkSourceRelationProvider.getProviders(flint.spark)

override def apply(plan: LogicalPlan): LogicalPlan = {
if (plan.isInstanceOf[V2WriteCommand]) {
plan
} else {
plan transform { case subPlan =>
supportedSourceRelations
relationProviders
.collectFirst {
case relationProvider if relationProvider.isSupported(subPlan) =>
val relation = relationProvider.getRelation(subPlan)
Expand Down Expand Up @@ -79,7 +68,7 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
val relationColsById = relation.output.map(attr => (attr.exprId, attr)).toMap
plan
.collect {
case r: LogicalRelation if r.eq(relation.plan) => Set.empty
case r: MultiInstanceRelation if r.eq(relation.plan) => Set.empty
case other =>
other.expressions
.flatMap(_.references)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,8 @@

package org.opensearch.flint.spark.source

import org.opensearch.flint.spark.source.file.FileSourceRelation
import org.opensearch.flint.spark.source.iceberg.IcebergSourceRelation

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

/**
* This source relation abstraction allows Flint to interact uniformly with different kinds of
Expand All @@ -38,10 +33,3 @@ trait FlintSparkSourceRelation {
*/
def output: Seq[AttributeReference]
}

trait FlintSparkSourceRelationProvider {

def isSupported(plan: LogicalPlan): Boolean

def getRelation(plan: LogicalPlan): FlintSparkSourceRelation
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.source

import org.opensearch.flint.spark.source.file.FileSourceRelationProvider
import org.opensearch.flint.spark.source.iceberg.IcebergSourceRelationProvider

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
* A provider defines what kind of logical plan can be supported by Flint Spark integration. It
* serves similar purpose to Scala extractor which has to be used in match case statement.
* However, the problem here is we want to avoid hard dependency on some data source code, such as
* Iceberg. In this case, we have to maintain a list of provider and run it only if the 3rd party
* library is available in current Spark session.
*/
trait FlintSparkSourceRelationProvider {

/**
* @return
* the name of the source relation provider
*/
def name(): String

/**
* Determines whether the given logical plan is supported by this provider.
*
* @param plan
* the logical plan to evaluate
* @return
* true if the plan is supported, false otherwise
*/
def isSupported(plan: LogicalPlan): Boolean

/**
* Creates a source relation based on the provided logical plan.
*
* @param plan
* the logical plan to wrap in source relation
* @return
* an instance of source relation
*/
def getRelation(plan: LogicalPlan): FlintSparkSourceRelation
}

/**
* Companion object provides utility methods.
*/
object FlintSparkSourceRelationProvider {

/**
* Retrieve all supported source relation provider for the given Spark session.
*
* @param spark
* the Spark session
* @return
* a sequence of source relation provider
*/
def getProviders(spark: SparkSession): Seq[FlintSparkSourceRelationProvider] = {
var relations = Seq[FlintSparkSourceRelationProvider]()

// File source is built-in supported
relations = relations :+ new FileSourceRelationProvider

// Add Iceberg provider if it's enabled in Spark conf
if (spark.conf
.getOption("spark.sql.catalog.spark_catalog")
.contains("org.apache.iceberg.spark.SparkSessionCatalog")) {
relations = relations :+ new IcebergSourceRelationProvider
}
relations
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,8 @@ case class FileSourceRelation(override val plan: LogicalRelation)
extends FlintSparkSourceRelation {

override def tableName: String =
plan.catalogTable
.getOrElse(throw new IllegalArgumentException("No table found in the source relation plan"))
plan.catalogTable.get // catalogTable must be present as pre-checked in source relation provider's
.qualifiedName

override def output: Seq[AttributeReference] = plan.output
}

class FileSourceRelationProvider extends FlintSparkSourceRelationProvider {

override def isSupported(plan: LogicalPlan): Boolean = plan match {
case LogicalRelation(_, _, Some(_), false) => true
case _ => false
}

override def getRelation(plan: LogicalPlan): FlintSparkSourceRelation = {
FileSourceRelation(plan.asInstanceOf[LogicalRelation])
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.source.file

import org.opensearch.flint.spark.source.{FlintSparkSourceRelation, FlintSparkSourceRelationProvider}

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation

/**
* Source relation provider for Spark built-in file-based source.
*
* @param name
* the name of the file source provider
*/
class FileSourceRelationProvider(override val name: String = "file")
extends FlintSparkSourceRelationProvider {

override def isSupported(plan: LogicalPlan): Boolean = plan match {
case LogicalRelation(_, _, Some(_), false) => true
case _ => false
}

override def getRelation(plan: LogicalPlan): FlintSparkSourceRelation = {
FileSourceRelation(plan.asInstanceOf[LogicalRelation])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@

package org.opensearch.flint.spark.source.iceberg

import org.apache.iceberg.spark.source.SparkTable
import org.opensearch.flint.spark.source.{FlintSparkSourceRelation, FlintSparkSourceRelationProvider}
import org.opensearch.flint.spark.source.FlintSparkSourceRelation

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

/**
Expand All @@ -23,29 +21,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
case class IcebergSourceRelation(override val plan: DataSourceV2Relation)
extends FlintSparkSourceRelation {

/**
* Retrieves the fully qualified name of the table from the Iceberg table metadata. If the
* Iceberg table is not correctly referenced or the metadata is missing, an exception is thrown.
*/
override def tableName: String =
plan.table.name() // TODO: confirm

/**
* Provides the output attributes of the logical plan. These attributes represent the schema of
* the Iceberg table as it appears in Spark's logical plan and are used to define the structure
* of the data returned by scans of the Iceberg table.
*/
override def output: Seq[AttributeReference] = plan.output
}

class IcebergSourceRelationProvider extends FlintSparkSourceRelationProvider {

override def isSupported(plan: LogicalPlan): Boolean = plan match {
case DataSourceV2Relation(_: SparkTable, _, _, _, _) => true
case _ => false
}

override def getRelation(plan: LogicalPlan): FlintSparkSourceRelation = {
IcebergSourceRelation(plan.asInstanceOf[DataSourceV2Relation])
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.source.iceberg

import org.apache.iceberg.spark.source.SparkTable
import org.opensearch.flint.spark.source.{FlintSparkSourceRelation, FlintSparkSourceRelationProvider}

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}

/**
* Source relation provider for Apache Iceberg-based source.
*
* @param name
* the name of the Iceberg source provider
*/
class IcebergSourceRelationProvider(override val name: String = "iceberg")
extends FlintSparkSourceRelationProvider {

override def isSupported(plan: LogicalPlan): Boolean = plan match {
case DataSourceV2Relation(_: SparkTable, _, _, _, _) => true
case DataSourceV2ScanRelation(DataSourceV2Relation(_: SparkTable, _, _, _, _), _, _, _) =>
true
case _ => false
}

override def getRelation(plan: LogicalPlan): FlintSparkSourceRelation = plan match {
case relation @ DataSourceV2Relation(_: SparkTable, _, _, _, _) =>
IcebergSourceRelation(relation)
case DataSourceV2ScanRelation(
relation @ DataSourceV2Relation(_: SparkTable, _, _, _, _),
_,
_,
_) =>
IcebergSourceRelation(relation)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@

package org.opensearch.flint.spark.iceberg

// FIXME: support Iceberg table in covering index rewrite rule
/*
import org.opensearch.flint.spark.FlintSparkCoveringIndexSqlITSuite

class FlintSparkIcebergCoveringIndexITSuite
extends FlintSparkCoveringIndexSqlITSuite
with FlintSparkIcebergSuite {}
*/

0 comments on commit f4416b1

Please sign in to comment.