diff --git a/README.md b/README.md
index f9568838e..eabcd7fd3 100644
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@ Version compatibility:
| 0.2.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |
| 0.3.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
| 0.4.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
-| 0.5.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
+| 0.5.0 | 11+ | 3.5.1 | 2.12.14 | 2.13+ |
## Flint Extension Usage
diff --git a/build.sbt b/build.sbt
index 6f8237aac..120fad5ac 100644
--- a/build.sbt
+++ b/build.sbt
@@ -5,10 +5,10 @@
import Dependencies._
lazy val scala212 = "2.12.14"
-lazy val sparkVersion = "3.3.2"
-// Spark jackson version. Spark jackson-module-scala strictly check the jackson-databind version hould compatbile
+lazy val sparkVersion = "3.5.1"
+// Spark jackson version. Spark's jackson-module-scala strictly checks that the jackson-databind version is compatible.
// https://github.com/FasterXML/jackson-module-scala/blob/2.18/src/main/scala/com/fasterxml/jackson/module/scala/JacksonModule.scala#L59
-lazy val jacksonVersion = "2.13.4"
+lazy val jacksonVersion = "2.15.2"
// The transitive opensearch jackson-databind dependency version should align with Spark jackson databind dependency version.
// Issue: https://github.com/opensearch-project/opensearch-spark/issues/442
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java
index 6e3e90916..c3e00a067 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java
@@ -10,9 +10,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.commons.lang.StringUtils;
import com.amazonaws.services.cloudwatch.model.Dimension;
+import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkEnv;
/**
@@ -124,4 +124,4 @@ private static Dimension getEnvironmentVariableDimension(String envVarName, Stri
private static Dimension getDefaultDimension(String dimensionName) {
return getEnvironmentVariableDimension(dimensionName, dimensionName);
}
-}
\ No newline at end of file
+}
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java
index f4d456899..a5ea190c5 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java
@@ -46,7 +46,6 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
-import org.apache.commons.lang.StringUtils;
import org.apache.spark.metrics.sink.CloudWatchSink.DimensionNameGroups;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala
index 201e7c748..9b9f70be0 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala
@@ -5,7 +5,8 @@
package org.apache.spark.sql.flint
-import org.apache.spark.internal.Logging
+import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain
+
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan}
import org.apache.spark.sql.flint.config.FlintSparkConf
@@ -17,8 +18,7 @@ case class FlintScan(
options: FlintSparkConf,
pushedPredicates: Array[Predicate])
extends Scan
- with Batch
- with Logging {
+ with Batch {
override def readSchema(): StructType = schema
@@ -44,10 +44,13 @@ case class FlintScan(
* Print pushedPredicates when explain(mode="extended"). Learn from SPARK JDBCScan.
*/
override def description(): String = {
- super.description() + ", PushedPredicates: " + seqToString(pushedPredicates)
+ super.description() + ", PushedPredicates: " + pushedPredicates
+ .map {
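+        // Print only the function name for bloom filter predicates; their full string form is likely too verbose for explain output.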
+ case p if p.name().equalsIgnoreCase(BloomFilterMightContain.NAME) => p.name()
+ case p => p.toString()
+ }
+ .mkString("[", ", ", "]")
}
-
- private def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
}
/**
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala
index 0c6f7d700..82a570b2f 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala
@@ -5,6 +5,8 @@
package org.apache.spark.sql.flint
+import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownV2Filters}
@@ -34,4 +36,5 @@ case class FlintScanBuilder(
}
override def pushedPredicates(): Array[Predicate] = pushedPredicate
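+    // Hide the bloom filter function predicate from the predicates reported back to Spark; it is still passed to FlintScan.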
+ .filterNot(_.name().equalsIgnoreCase(BloomFilterMightContain.NAME))
}
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala
index 25b4db940..8691de3d0 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala
@@ -48,4 +48,6 @@ case class FlintWrite(
override def toBatch: BatchWrite = this
override def toStreaming: StreamingWrite = this
+
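+  // Tell Spark not to use its output commit coordinator (which ensures at most one task attempt per partition commits) for this write.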
+ override def useCommitCoordinator(): Boolean = false
}
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala
index 31db1909f..a9e9122fb 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala
@@ -118,7 +118,8 @@ class FlintJacksonParser(
array.toArray[InternalRow](schema)
}
case START_ARRAY =>
- throw QueryExecutionErrors.cannotParseJsonArraysAsStructsError()
+ throw QueryExecutionErrors.cannotParseJsonArraysAsStructsError(
+ parser.currentToken().asString())
}
}
@@ -420,17 +421,17 @@ class FlintJacksonParser(
case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString =>
dataType match {
case FloatType | DoubleType | TimestampType | DateType =>
- throw QueryExecutionErrors.failToParseEmptyStringForDataTypeError(dataType)
+ throw QueryExecutionErrors.emptyJsonFieldValueError(dataType)
case _ => null
}
case VALUE_STRING if parser.getTextLength < 1 =>
- throw QueryExecutionErrors.failToParseEmptyStringForDataTypeError(dataType)
+ throw QueryExecutionErrors.emptyJsonFieldValueError(dataType)
case token =>
// We cannot parse this token based on the given data type. So, we throw a
// RuntimeException and this exception will be caught by `parse` method.
- throw QueryExecutionErrors.failToParseValueForDataTypeError(parser, token, dataType)
+ throw QueryExecutionErrors.cannotParseJSONFieldError(parser, token, dataType)
}
/**
@@ -537,7 +538,7 @@ class FlintJacksonParser(
// JSON parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
- throw BadRecordException(() => recordLiteral(record), () => None, e)
+ throw BadRecordException(() => recordLiteral(record), cause = e)
case e: CharConversionException if options.encoding.isEmpty =>
val msg =
"""JSON parser cannot handle a character in its input.
@@ -545,11 +546,11 @@ class FlintJacksonParser(
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
- throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+ throw BadRecordException(() => recordLiteral(record), cause = wrappedCharException)
case PartialResultException(row, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
- partialResult = () => Some(row),
+ partialResults = () => Array(row),
cause)
}
}
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala
index 06c92882b..d71bc5d12 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala
@@ -9,6 +9,7 @@ import java.util.concurrent.ScheduledExecutorService
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.internal.SQLConf.DEFAULT_CATALOG
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.{ShutdownHookManager, ThreadUtils}
@@ -72,14 +73,8 @@ package object flint {
def qualifyTableName(spark: SparkSession, tableName: String): String = {
val (catalog, ident) = parseTableName(spark, tableName)
- // Tricky that our Flint delegate catalog's name has to be spark_catalog
- // so we have to find its actual name in CatalogManager
- val catalogMgr = spark.sessionState.catalogManager
- val catalogName =
- catalogMgr
- .listCatalogs(Some("*"))
- .find(catalogMgr.catalog(_) == catalog)
- .getOrElse(catalog.name())
+    // More reading at https://github.com/opensearch-project/opensearch-spark/issues/319.
+ val catalogName = resolveCatalogName(spark, catalog)
s"$catalogName.${ident.namespace.mkString(".")}.${ident.name}"
}
@@ -134,4 +129,41 @@ package object flint {
def findField(rootField: StructType, fieldName: String): Option[StructField] = {
rootField.findNestedField(fieldName.split('.')).map(_._2)
}
+
+ /**
+   * Resolve the catalog name. The spark.sql.defaultCatalog name is returned if the given catalog
+   * is the session catalog (spark_catalog); otherwise, catalog.name() is returned.
+ * @see
+ * issue319
+ *
+ * @param spark
+ * Spark Session
+ * @param catalog
+ * Spark Catalog
+ * @return
+ * catalog name.
+ */
+ def resolveCatalogName(spark: SparkSession, catalog: CatalogPlugin): String = {
+
+ /**
+ * Check if the provided catalog is a session catalog.
+ */
+ if (CatalogV2Util.isSessionCatalog(catalog)) {
+ val defaultCatalog = spark.conf.get(DEFAULT_CATALOG)
+ if (spark.sessionState.catalogManager.isCatalogRegistered(defaultCatalog)) {
+ defaultCatalog
+ } else {
+
+ /**
+         * This may happen when spark.sql.defaultCatalog is configured but no catalog
+         * implementation is registered for it. For instance, spark.sql.defaultCatalog = "unknown"
+ */
+ throw new IllegalStateException(s"Unknown catalog name: $defaultCatalog")
+ }
+ } else {
+ // Return the name for non-session catalogs
+ catalog.name()
+ }
+ }
}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala
index bd7abcfb3..dc7875ccf 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala
@@ -10,7 +10,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COL
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusWithMetadata, PartitionDirectory}
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.functions.isnull
import org.apache.spark.sql.types.StructType
@@ -96,7 +96,7 @@ case class FlintSparkSkippingFileIndex(
.toSet
}
- private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatus) = {
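+  // Since Spark 3.5, FileIndex partition listings expose FileStatusWithMetadata instead of FileStatus.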
+ private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatusWithMetadata) = {
selectedFiles.contains(f.getPath.toUri.toString)
}
}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala
index de2ea772d..fa9b23951 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala
@@ -11,7 +11,10 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKi
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GetStructField}
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
+import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.StringType
/**
* Skipping index strategy that defines skipping data structure building and reading logic.
@@ -115,6 +118,17 @@ object FlintSparkSkippingStrategy {
Seq(attr.name)
case GetStructField(child, _, Some(name)) =>
extractColumnName(child) :+ name
+ /**
+       * Since Spark 3.4 added read-side padding, char_col = "sample char" becomes
+       * (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils,
+       * StringType, readSidePadding, char_col#47, 20, true, false, true) = sample char )
+       *
+       * When creating the skipping index, Spark already applied write-side padding, so the
+       * read-side padding can be ignored during push down. More reading:
+       * https://issues.apache.org/jira/browse/SPARK-40697
+ */
+ case StaticInvoke(staticObject, StringType, "readSidePadding", arguments, _, _, _, _)
+ if classOf[CharVarcharCodegenUtils].isAssignableFrom(staticObject) =>
+ extractColumnName(arguments.head)
case _ => Seq.empty
}
}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterMightContain.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterMightContain.scala
index 653abbd7d..c7ba66c9c 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterMightContain.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterMightContain.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.skipping.bloomfilter
import java.io.ByteArrayInputStream
import org.opensearch.flint.core.field.bloomfilter.classic.ClassicBloomFilter
+import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain.NAME
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.InternalRow
@@ -40,7 +41,7 @@ case class BloomFilterMightContain(bloomFilterExpression: Expression, valueExpre
override def dataType: DataType = BooleanType
- override def symbol: String = "BLOOM_FILTER_MIGHT_CONTAIN"
+ override def symbol: String = NAME
override def checkInputDataTypes(): TypeCheckResult = {
(left.dataType, right.dataType) match {
@@ -109,6 +110,8 @@ case class BloomFilterMightContain(bloomFilterExpression: Expression, valueExpre
object BloomFilterMightContain {
+ val NAME = "BLOOM_FILTER_MIGHT_CONTAIN"
+
/**
* Generate bloom filter might contain function given the bloom filter column and value.
*
diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintCatalogSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintCatalogSuite.scala
new file mode 100644
index 000000000..3c75cf541
--- /dev/null
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintCatalogSuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.spark.sql
+
+import org.mockito.Mockito.when
+import org.scalatestplus.mockito.MockitoSugar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin}
+import org.apache.spark.sql.flint.resolveCatalogName
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.SQLConf.DEFAULT_CATALOG
+
+class FlintCatalogSuite extends SparkFunSuite with MockitoSugar {
+
+ test("resolveCatalogName returns default catalog name for session catalog") {
+ assertCatalog()
+ .withCatalogName("spark_catalog")
+ .withDefaultCatalog("glue")
+ .registerCatalog("glue")
+ .shouldResolveCatalogName("glue")
+ }
+
+ test("resolveCatalogName returns default catalog name for spark_catalog") {
+ assertCatalog()
+ .withCatalogName("spark_catalog")
+ .withDefaultCatalog("spark_catalog")
+ .registerCatalog("spark_catalog")
+ .shouldResolveCatalogName("spark_catalog")
+ }
+
+ test("resolveCatalogName should return catalog name for non-session catalogs") {
+ assertCatalog()
+ .withCatalogName("custom_catalog")
+ .withDefaultCatalog("custom_catalog")
+ .registerCatalog("custom_catalog")
+ .shouldResolveCatalogName("custom_catalog")
+ }
+
+ test(
+ "resolveCatalogName should throw RuntimeException when default catalog is not registered") {
+ assertCatalog()
+ .withCatalogName("spark_catalog")
+ .withDefaultCatalog("glue")
+ .registerCatalog("unknown")
+ .shouldThrowException()
+ }
+
+ private def assertCatalog(): AssertionHelper = {
+ new AssertionHelper
+ }
+
+ private class AssertionHelper {
+ private val spark = mock[SparkSession]
+ private val catalog = mock[CatalogPlugin]
+ private val sessionState = mock[SessionState]
+ private val catalogManager = mock[CatalogManager]
+
+ def withCatalogName(catalogName: String): AssertionHelper = {
+ when(catalog.name()).thenReturn(catalogName)
+ this
+ }
+
+ def withDefaultCatalog(catalogName: String): AssertionHelper = {
+ val conf = new SQLConf
+ conf.setConf(DEFAULT_CATALOG, catalogName)
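+      // Wrap the SQLConf in a RuntimeConfig so that spark.conf.get(DEFAULT_CATALOG) resolves the configured value.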
+ when(spark.conf).thenReturn(new RuntimeConfig(conf))
+ this
+ }
+
+ def registerCatalog(catalogName: String): AssertionHelper = {
+ when(spark.sessionState).thenReturn(sessionState)
+ when(sessionState.catalogManager).thenReturn(catalogManager)
+ when(catalogManager.isCatalogRegistered(catalogName)).thenReturn(true)
+ this
+ }
+
+ def shouldResolveCatalogName(expectedCatalogName: String): Unit = {
+ assert(resolveCatalogName(spark, catalog) == expectedCatalogName)
+ }
+
+ def shouldThrowException(): Unit = {
+ assertThrows[IllegalStateException] {
+ resolveCatalogName(spark, catalog)
+ }
+ }
+ }
+}
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala
index a590eccb1..3a6802704 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala
@@ -284,8 +284,8 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
Matcher { (plan: LogicalPlan) =>
val result = plan.exists {
case LogicalRelation(_, _, Some(table), _) =>
- // Table name in logical relation doesn't have catalog name
- table.qualifiedName == expectedTableName.split('.').drop(1).mkString(".")
+        // Since Spark 3.4, the table name in a logical relation includes the catalog name
+ table.qualifiedName == expectedTableName
case _ => false
}
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala
index c099a1a86..f03116de9 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala
@@ -7,112 +7,105 @@ package org.opensearch.flint.spark.skipping
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING}
-import org.opensearch.flint.spark.FlintSpark
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
+import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions}
+import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndexOptions}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.SkippingKind
import org.scalatest.matchers.{Matcher, MatchResult}
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar.mock
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.FlintSuite
import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, ExprId, Literal, Or}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
-class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
+class ApplyFlintSparkSkippingIndexSuite extends FlintSuite with Matchers {
/** Test table and index */
private val testTable = "spark_catalog.default.apply_skipping_index_test"
- private val testIndex = getSkippingIndexName(testTable)
- private val testSchema = StructType(
- Seq(
- StructField("name", StringType, nullable = false),
- StructField("age", IntegerType, nullable = false),
- StructField("address", StringType, nullable = false)))
-
- /** Resolved column reference used in filtering condition */
- private val nameCol =
- AttributeReference("name", StringType, nullable = false)(exprId = ExprId(1))
- private val ageCol =
- AttributeReference("age", IntegerType, nullable = false)(exprId = ExprId(2))
- private val addressCol =
- AttributeReference("address", StringType, nullable = false)(exprId = ExprId(3))
+
+  // Mock FlintClient to avoid connecting to a real OpenSearch cluster
+ private val clientBuilder = mockStatic(classOf[FlintClientBuilder])
+ private val client = mock[FlintClient](RETURNS_DEEP_STUBS)
+
+ /** Mock FlintSpark which is required by the rule */
+ private val flint = mock[FlintSpark]
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ sql(s"CREATE TABLE $testTable (name STRING, age INT, address STRING) USING JSON")
+
+    // Mock the static build method in FlintClientBuilder used by the Flint data source
+ clientBuilder
+ .when(() => FlintClientBuilder.build(any(classOf[FlintOptions])))
+ .thenReturn(client)
+ when(flint.spark).thenReturn(spark)
+ }
+
+ override protected def afterAll(): Unit = {
+ sql(s"DROP TABLE $testTable")
+ clientBuilder.close()
+ super.afterAll()
+ }
test("should not rewrite query if no skipping index") {
assertFlintQueryRewriter()
- .withSourceTable(testTable, testSchema)
- .withFilter(EqualTo(nameCol, Literal("hello")))
+ .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello'")
.withNoSkippingIndex()
.shouldNotRewrite()
}
test("should not rewrite query if filter condition is disjunction") {
assertFlintQueryRewriter()
- .withSourceTable(testTable, testSchema)
- .withFilter(Or(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))))
- .withSkippingIndex(testIndex, REFRESHING, "name", "age")
+ .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello' or age = 30")
+ .withSkippingIndex(REFRESHING, "name", "age")
.shouldNotRewrite()
}
test("should not rewrite query if filter condition contains disjunction") {
assertFlintQueryRewriter()
- .withSourceTable(testTable, testSchema)
- .withFilter(
- And(
- Or(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))),
- EqualTo(ageCol, Literal(30))))
- .withSkippingIndex(testIndex, REFRESHING, "name", "age")
+ .withQuery(
+ s"SELECT * FROM $testTable WHERE (name = 'hello' or age = 30) and address = 'Seattle'")
+ .withSkippingIndex(REFRESHING, "name", "age")
.shouldNotRewrite()
}
test("should rewrite query with skipping index") {
assertFlintQueryRewriter()
- .withSourceTable(testTable, testSchema)
- .withFilter(EqualTo(nameCol, Literal("hello")))
- .withSkippingIndex(testIndex, REFRESHING, "name")
+ .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello'")
+ .withSkippingIndex(REFRESHING, "name")
.shouldPushDownAfterRewrite(col("name") === "hello")
}
test("should not rewrite query with deleted skipping index") {
assertFlintQueryRewriter()
- .withSourceTable(testTable, testSchema)
- .withFilter(EqualTo(nameCol, Literal("hello")))
- .withSkippingIndex(testIndex, DELETED, "name")
+ .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello'")
+ .withSkippingIndex(DELETED, "name")
.shouldNotRewrite()
}
test("should only push down filter condition with indexed column") {
assertFlintQueryRewriter()
- .withSourceTable(testTable, testSchema)
- .withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))))
- .withSkippingIndex(testIndex, REFRESHING, "name")
+ .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello' and age = 30")
+ .withSkippingIndex(REFRESHING, "name")
.shouldPushDownAfterRewrite(col("name") === "hello")
}
test("should push down all filter conditions with indexed column") {
assertFlintQueryRewriter()
- .withSourceTable(testTable, testSchema)
- .withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))))
- .withSkippingIndex(testIndex, REFRESHING, "name", "age")
+ .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello' and age = 30")
+ .withSkippingIndex(REFRESHING, "name", "age")
.shouldPushDownAfterRewrite(col("name") === "hello" && col("age") === 30)
assertFlintQueryRewriter()
- .withSourceTable(testTable, testSchema)
- .withFilter(
- And(
- EqualTo(nameCol, Literal("hello")),
- And(EqualTo(ageCol, Literal(30)), EqualTo(addressCol, Literal("Seattle")))))
- .withSkippingIndex(testIndex, REFRESHING, "name", "age", "address")
+ .withQuery(
+ s"SELECT * FROM $testTable WHERE name = 'hello' and (age = 30 and address = 'Seattle')")
+ .withSkippingIndex(REFRESHING, "name", "age", "address")
.shouldPushDownAfterRewrite(
col("name") === "hello" && col("age") === 30 && col("address") === "Seattle")
}
@@ -122,46 +115,27 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
}
private class AssertionHelper {
- private val flint = {
- val mockFlint = mock[FlintSpark](RETURNS_DEEP_STUBS)
- when(mockFlint.spark.sessionState.catalogManager.currentCatalog.name())
- .thenReturn("spark_catalog")
- mockFlint
- }
private val rule = new ApplyFlintSparkSkippingIndex(flint)
- private var relation: LogicalRelation = _
private var plan: LogicalPlan = _
- def withSourceTable(fullname: String, schema: StructType): AssertionHelper = {
- val table = CatalogTable(
- identifier = TableIdentifier(fullname.split('.')(1), Some(fullname.split('.')(0))),
- tableType = CatalogTableType.EXTERNAL,
- storage = CatalogStorageFormat.empty,
- schema = null)
- relation = LogicalRelation(mockBaseRelation(schema), table)
- this
- }
-
- def withFilter(condition: Expression): AssertionHelper = {
- val filter = Filter(condition, relation)
- val project = Project(Seq(), filter)
- plan = SubqueryAlias("alb_logs", project)
+ def withQuery(query: String): AssertionHelper = {
+ this.plan = sql(query).queryExecution.optimizedPlan
this
}
- def withSkippingIndex(
- indexName: String,
- indexState: IndexState,
- indexCols: String*): AssertionHelper = {
- val skippingIndex = mock[FlintSparkSkippingIndex]
- when(skippingIndex.kind).thenReturn(SKIPPING_INDEX_TYPE)
- when(skippingIndex.name()).thenReturn(indexName)
- when(skippingIndex.indexedColumns).thenReturn(indexCols.map(FakeSkippingStrategy))
-
- // Mock index log entry with the given state
- val logEntry = mock[FlintMetadataLogEntry]
- when(logEntry.state).thenReturn(indexState)
- when(skippingIndex.latestLogEntry).thenReturn(Some(logEntry))
+ def withSkippingIndex(indexState: IndexState, indexCols: String*): AssertionHelper = {
+ val skippingIndex = new FlintSparkSkippingIndex(
+ tableName = testTable,
+ indexedColumns = indexCols.map(FakeSkippingStrategy),
+ options = FlintSparkIndexOptions.empty,
+ latestLogEntry = Some(
+ new FlintMetadataLogEntry(
+ "id",
+ 0L,
+ indexState,
+ Map.empty[String, Any],
+ "",
+ Map.empty[String, Any])))
when(flint.describeIndex(any())).thenReturn(Some(skippingIndex))
this
@@ -181,23 +155,6 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
}
}
- private def mockBaseRelation(schema: StructType): BaseRelation = {
- val fileIndex = mock[FileIndex]
- val baseRelation: HadoopFsRelation = mock[HadoopFsRelation]
- when(baseRelation.location).thenReturn(fileIndex)
- when(baseRelation.schema).thenReturn(schema)
-
- // Mock baseRelation.copy(location = FlintFileIndex)
- doAnswer((invocation: InvocationOnMock) => {
- val location = invocation.getArgument[FileIndex](0)
- val relationCopy: HadoopFsRelation = mock[HadoopFsRelation]
- when(relationCopy.location).thenReturn(location)
- relationCopy
- }).when(baseRelation).copy(any(), any(), any(), any(), any(), any())(any())
-
- baseRelation
- }
-
private def pushDownFilterToIndexScan(expect: Column): Matcher[LogicalPlan] = {
Matcher { (plan: LogicalPlan) =>
val useFlintSparkSkippingFileIndex = plan.exists {
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala
index d2ef72158..4b707841c 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala
@@ -16,7 +16,7 @@ import org.apache.spark.FlintSuite
import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate}
-import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusWithMetadata, PartitionDirectory}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
@@ -118,7 +118,8 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers {
private def mockPartitions(partitions: Map[String, Seq[String]]): Seq[PartitionDirectory] = {
partitions.map { case (partitionName, filePaths) =>
- val files = filePaths.map(path => new FileStatus(0, false, 0, 0, 0, new Path(path)))
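+      // PartitionDirectory takes FileStatusWithMetadata in Spark 3.5, so wrap the plain FileStatus.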
+ val files = filePaths.map(path =>
+ FileStatusWithMetadata(new FileStatus(0, false, 0, 0, 0, new Path(path))))
PartitionDirectory(InternalRow(Literal(partitionName)), files)
}.toSeq
}
diff --git a/integ-test/src/integration/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala
index 0bccf787b..fd5c5bf8f 100644
--- a/integ-test/src/integration/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala
+++ b/integ-test/src/integration/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala
@@ -104,7 +104,9 @@ class FlintDataSourceV2ITSuite
val df2 = df.filter($"aText".contains("second"))
checkFiltersRemoved(df2)
- checkPushedInfo(df2, "PushedPredicates: [aText IS NOT NULL, aText LIKE '%second%']")
+ checkPushedInfo(
+ df2,
+ "PushedPredicates: [aText IS NOT NULL, aText LIKE '%second%' ESCAPE '\\']")
checkAnswer(df2, Row(2, "b", "i am second"))
val df3 =
@@ -117,7 +119,7 @@ class FlintDataSourceV2ITSuite
checkFiltersRemoved(df4)
checkPushedInfo(
df4,
- "PushedPredicates: [aInt IS NOT NULL, aText IS NOT NULL, aInt > 1, aText LIKE '%second%']")
+ "PushedPredicates: [aInt IS NOT NULL, aText IS NOT NULL, aInt > 1, aText LIKE '%second%' ESCAPE '\\']")
checkAnswer(df4, Row(2, "b", "i am second"))
}
}
diff --git a/integ-test/src/integration/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala
index ea5988577..69900677c 100644
--- a/integ-test/src/integration/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala
+++ b/integ-test/src/integration/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala
@@ -22,6 +22,7 @@ class OpenSearchCatalogITSuite extends OpenSearchCatalogSuite {
}
}
+ // FIXME https://github.com/opensearch-project/opensearch-spark/issues/529
test("Describe single index as table") {
val indexName = "t0001"
withIndexName(indexName) {
@@ -29,16 +30,13 @@ class OpenSearchCatalogITSuite extends OpenSearchCatalogSuite {
val df = spark.sql(s"""
DESC ${catalogName}.default.$indexName""")
- assert(df.count() == 6)
+ assert(df.count() == 3)
checkAnswer(
df,
Seq(
- Row("# Partitioning", "", ""),
- Row("", "", ""),
- Row("Not partitioned", "", ""),
- Row("accountId", "string", ""),
- Row("eventName", "string", ""),
- Row("eventSource", "string", "")))
+ Row("accountId", "string", null),
+ Row("eventName", "string", null),
+ Row("eventSource", "string", null)))
}
}
diff --git a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala
index 21323cca4..832642088 100644
--- a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala
+++ b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala
@@ -8,7 +8,7 @@ package org.apache.spark.opensearch.table
import org.opensearch.flint.spark.FlintSparkSuite
trait OpenSearchCatalogSuite extends FlintSparkSuite {
- val catalogName = "dev"
+ override lazy val catalogName = "dev"
override def beforeAll(): Unit = {
super.beforeAll()
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
index fc4cdbeac..439930486 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
/** Test table, MV, index name and query */
- private val testTable = "spark_catalog.default.mv_test"
- private val testMvName = "spark_catalog.default.mv_test_metrics"
+ private val testTable = s"$catalogName.default.mv_test"
+ private val testMvName = s"$catalogName.default.mv_test_metrics"
private val testFlintIndex = getFlintIndexName(testMvName)
private val testQuery =
s"""
@@ -218,7 +218,11 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
}
test("issue 112, https://github.com/opensearch-project/opensearch-spark/issues/112") {
- val tableName = "spark_catalog.default.issue112"
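+    // Cancel this test when running against Iceberg tables.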
+ if (tableType.equalsIgnoreCase("iceberg")) {
+ cancel
+ }
+
+ val tableName = s"$catalogName.default.issue112"
createTableIssue112(tableName)
sql(s"""
|CREATE MATERIALIZED VIEW $testMvName AS
@@ -261,14 +265,14 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
test("create materialized view with quoted name and column name") {
val testQuotedQuery =
- """ SELECT
+ s""" SELECT
| window.start AS `start.time`,
| COUNT(*) AS `count`
- | FROM `spark_catalog`.`default`.`mv_test`
+ | FROM `$catalogName`.`default`.`mv_test`
| GROUP BY TUMBLE(`time`, '10 Minutes')""".stripMargin.trim
sql(s"""
- | CREATE MATERIALIZED VIEW `spark_catalog`.`default`.`mv_test_metrics`
+ | CREATE MATERIALIZED VIEW `$catalogName`.`default`.`mv_test_metrics`
| AS $testQuotedQuery
|""".stripMargin)
@@ -303,34 +307,34 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
test("show all materialized views in catalog and database") {
// Show in catalog
- flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
- checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), Seq(Row("mv1")))
+ flint.materializedView().name(s"$catalogName.default.mv1").query(testQuery).create()
+ checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN $catalogName"), Seq(Row("mv1")))
// Show in catalog.database
- flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create()
+ flint.materializedView().name(s"$catalogName.default.mv2").query(testQuery).create()
checkAnswer(
- sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"),
+ sql(s"SHOW MATERIALIZED VIEW IN $catalogName.default"),
Seq(Row("mv1"), Row("mv2")))
- checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)
+ checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN $catalogName.other"), Seq.empty)
deleteTestIndex(
- getFlintIndexName("spark_catalog.default.mv1"),
- getFlintIndexName("spark_catalog.default.mv2"))
+ getFlintIndexName(s"$catalogName.default.mv1"),
+ getFlintIndexName(s"$catalogName.default.mv2"))
}
test("show materialized view in database with the same prefix") {
- flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
- flint.materializedView().name("spark_catalog.default_test.mv2").query(testQuery).create()
- checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"), Seq(Row("mv1")))
+ flint.materializedView().name(s"$catalogName.default.mv1").query(testQuery).create()
+ flint.materializedView().name(s"$catalogName.default_test.mv2").query(testQuery).create()
+ checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN $catalogName.default"), Seq(Row("mv1")))
deleteTestIndex(
- getFlintIndexName("spark_catalog.default.mv1"),
- getFlintIndexName("spark_catalog.default_test.mv2"))
+ getFlintIndexName(s"$catalogName.default.mv1"),
+ getFlintIndexName(s"$catalogName.default_test.mv2"))
}
test("should return emtpy when show materialized views in empty database") {
- checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)
+ checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN $catalogName.other"), Seq.empty)
}
test("describe materialized view") {
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index af497eb2b..751a0a7b6 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY
class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuiteHelper {
/** Test table and index name */
- private val testTable = "spark_catalog.default.skipping_sql_test"
- private val testIndex = getSkippingIndexName(testTable)
+ protected val testTable = s"$catalogName.default.skipping_sql_test"
+ protected val testIndex = getSkippingIndexName(testTable)
override def beforeEach(): Unit = {
super.beforeAll()
@@ -201,7 +201,9 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
"`struct_col`.`field1`.`subfield` VALUE_SET, `struct_col`.`field2` MIN_MAX").foreach {
columnSkipTypes =>
test(s"build skipping index for nested field $columnSkipTypes") {
- val testTable = "spark_catalog.default.nested_field_table"
+ assume(tableType != "iceberg", "ignore iceberg skipping index query rewrite test")
+
+ val testTable = s"$catalogName.default.nested_field_table"
val testIndex = getSkippingIndexName(testTable)
withTable(testTable) {
createStructTable(testTable)
@@ -339,7 +341,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
test("create skipping index with quoted table and column name") {
sql(s"""
- | CREATE SKIPPING INDEX ON `spark_catalog`.`default`.`skipping_sql_test`
+ | CREATE SKIPPING INDEX ON `$catalogName`.`default`.`skipping_sql_test`
| (
| `year` PARTITION,
| `name` VALUE_SET,
@@ -385,17 +387,26 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
sql("USE sample")
// Create index without database name specified
- sql("CREATE TABLE test1 (name STRING) USING CSV")
+ sql(s"CREATE TABLE test1 (name STRING) USING $tableType")
sql("CREATE SKIPPING INDEX ON test1 (name VALUE_SET)")
// Create index with database name specified
- sql("CREATE TABLE test2 (name STRING) USING CSV")
+ sql(s"CREATE TABLE test2 (name STRING) USING $tableType")
sql("CREATE SKIPPING INDEX ON sample.test2 (name VALUE_SET)")
try {
- flint.describeIndex("flint_spark_catalog_sample_test1_skipping_index") shouldBe defined
- flint.describeIndex("flint_spark_catalog_sample_test2_skipping_index") shouldBe defined
+ flint.describeIndex(s"flint_${catalogName}_sample_test1_skipping_index") shouldBe defined
+ flint.describeIndex(s"flint_${catalogName}_sample_test2_skipping_index") shouldBe defined
} finally {
+
+ /**
+         * TODO: Remove the DROP TABLE statements once Iceberg supports CASCADE. More reading at
+ * https://github.com/apache/iceberg/pull/7275.
+ */
+ if (tableType.equalsIgnoreCase("iceberg")) {
+ sql("DROP TABLE test1")
+ sql("DROP TABLE test2")
+ }
sql("DROP DATABASE sample CASCADE")
}
}
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index 751647149..ea9945fce 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -35,6 +35,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
lazy protected val flint: FlintSpark = new FlintSpark(spark)
lazy protected val tableType: String = "CSV"
lazy protected val tableOptions: String = "OPTIONS (header 'false', delimiter '\t')"
+ lazy protected val catalogName: String = "spark_catalog"
override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala
index 4cba9099c..79e70655b 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala
@@ -7,11 +7,12 @@ package org.opensearch.flint.spark
import java.sql.Timestamp
+import org.scalatest.Ignore
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import org.apache.spark.FlintSuite
import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
class FlintSparkWindowingFunctionITSuite extends QueryTest with FlintSuite {
@@ -26,8 +27,21 @@ class FlintSparkWindowingFunctionITSuite extends QueryTest with FlintSuite {
val resultDF = inputDF.selectExpr("TUMBLE(timestamp, '10 minutes')")
- resultDF.schema shouldBe StructType.fromDDL(
- "window struct NOT NULL")
+    // Since Spark 3.4 (https://issues.apache.org/jira/browse/SPARK-40821), the TUMBLE window column
+    // carries spark.timeWindow metadata.
+    val expected =
+      StructType(StructType.fromDDL("window struct<start:timestamp, end:timestamp> NOT NULL").map {
+ case StructField(name, dataType: StructType, nullable, _) if name == "window" =>
+ StructField(
+ name,
+ dataType,
+ nullable,
+ metadata = new MetadataBuilder()
+ .putBoolean("spark.timeWindow", true)
+ .build())
+ case other => other
+ })
+
+ resultDF.schema shouldBe expected
checkAnswer(
resultDF,
Seq(
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala
index 2ae0d157a..e7ce5316b 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala
@@ -22,13 +22,20 @@ trait FlintSparkIcebergSuite extends FlintSparkSuite {
// You can also override tableOptions if Iceberg requires different options
override lazy protected val tableOptions: String = ""
+ override lazy protected val catalogName: String = "local"
+
// Override the sparkConf method to include Iceberg-specific configurations
override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
// Set Iceberg-specific Spark configurations
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
- .set("spark.sql.catalog.spark_catalog.type", "hadoop")
- .set("spark.sql.catalog.spark_catalog.warehouse", s"spark-warehouse/${suiteName}")
+ .set("spark.sql.catalog.spark_catalog.type", "hive")
+ .set(s"spark.sql.catalog.$catalogName", "org.apache.iceberg.spark.SparkCatalog")
+ .set(s"spark.sql.catalog.$catalogName.type", "hadoop")
+      // Required by the IT "create skipping index on table without database name"
+ .set(s"spark.sql.catalog.$catalogName.default-namespace", "default")
+ .set(s"spark.sql.catalog.$catalogName.warehouse", s"spark-warehouse/${suiteName}")
+ .set(s"spark.sql.defaultCatalog", s"$catalogName")
.set(
"spark.sql.extensions",
List(
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala
index 407c2cb3b..384f14484 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala
@@ -171,14 +171,14 @@ class FlintSparkPPLEvalITSuite
val ex = intercept[AnalysisException](sql(s"""
| source = $testTable | eval age = 40 | eval name = upper(name) | sort name | fields name, age, state
| """.stripMargin))
- assert(ex.getMessage().contains("Reference 'name' is ambiguous"))
+ assert(ex.getMessage().contains("Reference `name` is ambiguous"))
}
test("test overriding existing fields: throw exception when specify the new field in where") {
val ex = intercept[AnalysisException](sql(s"""
| source = $testTable | eval age = abs(age) | where age < 50
| """.stripMargin))
- assert(ex.getMessage().contains("Reference 'age' is ambiguous"))
+ assert(ex.getMessage().contains("Reference `age` is ambiguous"))
}
test(
@@ -186,7 +186,7 @@ class FlintSparkPPLEvalITSuite
val ex = intercept[AnalysisException](sql(s"""
| source = $testTable | eval age = abs(age) | stats avg(age)
| """.stripMargin))
- assert(ex.getMessage().contains("Reference 'age' is ambiguous"))
+ assert(ex.getMessage().contains("Reference `age` is ambiguous"))
}
test(
@@ -194,7 +194,7 @@ class FlintSparkPPLEvalITSuite
val ex = intercept[AnalysisException](sql(s"""
| source = $testTable | eval country = upper(country) | stats avg(age) by country
| """.stripMargin))
- assert(ex.getMessage().contains("Reference 'country' is ambiguous"))
+ assert(ex.getMessage().contains("Reference `country` is ambiguous"))
}
test("test override existing fields: the eval field doesn't appear in fields command") {
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSessionCatalogSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSessionCatalogSuite.scala
new file mode 100644
index 000000000..fb3d4bbda
--- /dev/null
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSessionCatalogSuite.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.sessioncatalog
+
+import org.opensearch.flint.spark.FlintSparkSuite
+
+import org.apache.spark.SparkConf
+
+/**
+ * Test with FlintDelegatingSessionCatalog.
+ */
+trait FlintSessionCatalogSuite extends FlintSparkSuite {
+ // Override catalog name
+ override lazy protected val catalogName: String = "mycatalog"
+
+ override protected def sparkConf: SparkConf = {
+ val conf = super.sparkConf
+ .set("spark.sql.catalog.mycatalog", "org.opensearch.sql.FlintDelegatingSessionCatalog")
+ .set("spark.sql.defaultCatalog", catalogName)
+ conf
+ }
+}
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogMaterializedViewITSuite.scala
new file mode 100644
index 000000000..25567ba4e
--- /dev/null
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogMaterializedViewITSuite.scala
@@ -0,0 +1,15 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.sessioncatalog
+
+import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite
+
+/**
+ * Test MaterializedView with FlintDelegatingSessionCatalog.
+ */
+class FlintSparkSessionCatalogMaterializedViewITSuite
+ extends FlintSparkMaterializedViewSqlITSuite
+ with FlintSessionCatalogSuite {}
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogSkippingIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogSkippingIndexITSuite.scala
new file mode 100644
index 000000000..7b29d5883
--- /dev/null
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogSkippingIndexITSuite.scala
@@ -0,0 +1,15 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.sessioncatalog
+
+import org.opensearch.flint.spark.FlintSparkSkippingIndexSqlITSuite
+
+/**
+ * Test Skipping Index with FlintDelegatingSessionCatalog.
+ */
+class FlintSparkSessionCatalogSkippingIndexITSuite
+ extends FlintSparkSkippingIndexSqlITSuite
+ with FlintSessionCatalogSuite {}
diff --git a/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala b/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala
index f6be0b1c3..fc8cb4f4a 100644
--- a/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala
+++ b/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala
@@ -45,7 +45,10 @@ class FlintDelegatingSessionCatalogTest extends QueryTest with SharedSparkSessio
test("query without catalog name") {
sql("use mycatalog")
- assert(sql("SHOW CATALOGS").collect === Array(Row("mycatalog")))
+    // Since Spark 3.4.0 (https://issues.apache.org/jira/browse/SPARK-40055), listCatalogs should
+    // also return spark_catalog even when the spark_catalog implementation is the default session catalog
+ assert(
+ sql("SHOW CATALOGS").collect.toSet === Array(Row("mycatalog"), Row("spark_catalog")).toSet)
checkAnswer(sql(s"SELECT name, age FROM $testTableWithoutCatalog"), Seq(Row("Hello", 30)))
}