From d6e71fa1b14566622d4aa9c5744017cc7500033f Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Thu, 8 Aug 2024 13:38:22 -0700 Subject: [PATCH] Upgrade Spark 3.5.1 (#525) --------- Signed-off-by: Peng Huo Signed-off-by: Chen Dai Co-authored-by: Chen Dai --- README.md | 2 +- build.sbt | 6 +- .../core/metrics/reporter/DimensionUtils.java | 4 +- .../DimensionedCloudWatchReporter.java | 1 - .../apache/spark/sql/flint/FlintScan.scala | 15 +- .../spark/sql/flint/FlintScanBuilder.scala | 3 + .../apache/spark/sql/flint/FlintWrite.scala | 2 + .../sql/flint/json/FlintJacksonParser.scala | 15 +- .../org/apache/spark/sql/flint/package.scala | 48 ++++- .../FlintSparkSkippingFileIndex.scala | 4 +- .../skipping/FlintSparkSkippingStrategy.scala | 14 ++ .../bloomfilter/BloomFilterMightContain.scala | 5 +- .../apache/spark/sql/FlintCatalogSuite.scala | 91 ++++++++++ .../ApplyFlintSparkCoveringIndexSuite.scala | 4 +- .../ApplyFlintSparkSkippingIndexSuite.scala | 169 +++++++----------- .../FlintSparkSkippingFileIndexSuite.scala | 5 +- .../spark/FlintDataSourceV2ITSuite.scala | 6 +- .../catalog/OpenSearchCatalogITSuite.scala | 12 +- .../table/OpenSearchCatalogSuite.scala | 2 +- ...FlintSparkMaterializedViewSqlITSuite.scala | 42 +++-- .../FlintSparkSkippingIndexSqlITSuite.scala | 27 ++- .../flint/spark/FlintSparkSuite.scala | 1 + .../FlintSparkWindowingFunctionITSuite.scala | 20 ++- .../iceberg/FlintSparkIcebergSuite.scala | 11 +- .../spark/ppl/FlintSparkPPLEvalITSuite.scala | 8 +- .../FlintSessionCatalogSuite.scala | 25 +++ ...essionCatalogMaterializedViewITSuite.scala | 15 ++ ...rkSessionCatalogSkippingIndexITSuite.scala | 15 ++ .../FlintDelegatingSessionCatalogTest.scala | 5 +- 29 files changed, 389 insertions(+), 188 deletions(-) create mode 100644 flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintCatalogSuite.scala create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSessionCatalogSuite.scala create mode 100644 
integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogMaterializedViewITSuite.scala create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogSkippingIndexITSuite.scala diff --git a/README.md b/README.md index f9568838e..eabcd7fd3 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ Version compatibility: | 0.2.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ | | 0.3.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ | | 0.4.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ | -| 0.5.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ | +| 0.5.0 | 11+ | 3.5.1 | 2.12.14 | 2.13+ | ## Flint Extension Usage diff --git a/build.sbt b/build.sbt index 6f8237aac..120fad5ac 100644 --- a/build.sbt +++ b/build.sbt @@ -5,10 +5,10 @@ import Dependencies._ lazy val scala212 = "2.12.14" -lazy val sparkVersion = "3.3.2" -// Spark jackson version. Spark jackson-module-scala strictly check the jackson-databind version hould compatbile +lazy val sparkVersion = "3.5.1" +// Spark jackson version. Spark jackson-module-scala strictly check the jackson-databind version should compatible // https://github.com/FasterXML/jackson-module-scala/blob/2.18/src/main/scala/com/fasterxml/jackson/module/scala/JacksonModule.scala#L59 -lazy val jacksonVersion = "2.13.4" +lazy val jacksonVersion = "2.15.2" // The transitive opensearch jackson-databind dependency version should align with Spark jackson databind dependency version. 
// Issue: https://github.com/opensearch-project/opensearch-spark/issues/442 diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java index 6e3e90916..c3e00a067 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java @@ -10,9 +10,9 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.commons.lang.StringUtils; import com.amazonaws.services.cloudwatch.model.Dimension; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkEnv; /** @@ -124,4 +124,4 @@ private static Dimension getEnvironmentVariableDimension(String envVarName, Stri private static Dimension getDefaultDimension(String dimensionName) { return getEnvironmentVariableDimension(dimensionName, dimensionName); } -} \ No newline at end of file +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java index f4d456899..a5ea190c5 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java @@ -46,7 +46,6 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.Stream; -import org.apache.commons.lang.StringUtils; import org.apache.spark.metrics.sink.CloudWatchSink.DimensionNameGroups; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala index 
201e7c748..9b9f70be0 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala @@ -5,7 +5,8 @@ package org.apache.spark.sql.flint -import org.apache.spark.internal.Logging +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain + import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan} import org.apache.spark.sql.flint.config.FlintSparkConf @@ -17,8 +18,7 @@ case class FlintScan( options: FlintSparkConf, pushedPredicates: Array[Predicate]) extends Scan - with Batch - with Logging { + with Batch { override def readSchema(): StructType = schema @@ -44,10 +44,13 @@ case class FlintScan( * Print pushedPredicates when explain(mode="extended"). Learn from SPARK JDBCScan. */ override def description(): String = { - super.description() + ", PushedPredicates: " + seqToString(pushedPredicates) + super.description() + ", PushedPredicates: " + pushedPredicates + .map { + case p if p.name().equalsIgnoreCase(BloomFilterMightContain.NAME) => p.name() + case p => p.toString() + } + .mkString("[", ", ", "]") } - - private def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]") } /** diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala index 0c6f7d700..82a570b2f 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala @@ -5,6 +5,8 @@ package org.apache.spark.sql.flint +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain + import org.apache.spark.internal.Logging import 
org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownV2Filters} @@ -34,4 +36,5 @@ case class FlintScanBuilder( } override def pushedPredicates(): Array[Predicate] = pushedPredicate + .filterNot(_.name().equalsIgnoreCase(BloomFilterMightContain.NAME)) } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala index 25b4db940..8691de3d0 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala @@ -48,4 +48,6 @@ case class FlintWrite( override def toBatch: BatchWrite = this override def toStreaming: StreamingWrite = this + + override def useCommitCoordinator(): Boolean = false } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala index 31db1909f..a9e9122fb 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala @@ -118,7 +118,8 @@ class FlintJacksonParser( array.toArray[InternalRow](schema) } case START_ARRAY => - throw QueryExecutionErrors.cannotParseJsonArraysAsStructsError() + throw QueryExecutionErrors.cannotParseJsonArraysAsStructsError( + parser.currentToken().asString()) } } @@ -420,17 +421,17 @@ class FlintJacksonParser( case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString => dataType match { case FloatType | DoubleType | TimestampType | DateType => - throw QueryExecutionErrors.failToParseEmptyStringForDataTypeError(dataType) + throw QueryExecutionErrors.emptyJsonFieldValueError(dataType) case _ => null } case 
VALUE_STRING if parser.getTextLength < 1 => - throw QueryExecutionErrors.failToParseEmptyStringForDataTypeError(dataType) + throw QueryExecutionErrors.emptyJsonFieldValueError(dataType) case token => // We cannot parse this token based on the given data type. So, we throw a // RuntimeException and this exception will be caught by `parse` method. - throw QueryExecutionErrors.failToParseValueForDataTypeError(parser, token, dataType) + throw QueryExecutionErrors.cannotParseJSONFieldError(parser, token, dataType) } /** @@ -537,7 +538,7 @@ class FlintJacksonParser( // JSON parser currently doesn't support partial results for corrupted records. // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. - throw BadRecordException(() => recordLiteral(record), () => None, e) + throw BadRecordException(() => recordLiteral(record), cause = e) case e: CharConversionException if options.encoding.isEmpty => val msg = """JSON parser cannot handle a character in its input. 
@@ -545,11 +546,11 @@ class FlintJacksonParser( |""".stripMargin + e.getMessage val wrappedCharException = new CharConversionException(msg) wrappedCharException.initCause(e) - throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException) + throw BadRecordException(() => recordLiteral(record), cause = wrappedCharException) case PartialResultException(row, cause) => throw BadRecordException( record = () => recordLiteral(record), - partialResult = () => Some(row), + partialResults = () => Array(row), cause) } } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala index 06c92882b..d71bc5d12 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala @@ -9,6 +9,7 @@ import java.util.concurrent.ScheduledExecutorService import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.sql.internal.SQLConf.DEFAULT_CATALOG import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.util.{ShutdownHookManager, ThreadUtils} @@ -72,14 +73,8 @@ package object flint { def qualifyTableName(spark: SparkSession, tableName: String): String = { val (catalog, ident) = parseTableName(spark, tableName) - // Tricky that our Flint delegate catalog's name has to be spark_catalog - // so we have to find its actual name in CatalogManager - val catalogMgr = spark.sessionState.catalogManager - val catalogName = - catalogMgr - .listCatalogs(Some("*")) - .find(catalogMgr.catalog(_) == catalog) - .getOrElse(catalog.name()) + // more reading at https://github.com/opensearch-project/opensearch-spark/issues/319. 
+ val catalogName = resolveCatalogName(spark, catalog) s"$catalogName.${ident.namespace.mkString(".")}.${ident.name}" } @@ -134,4 +129,41 @@ package object flint { def findField(rootField: StructType, fieldName: String): Option[StructField] = { rootField.findNestedField(fieldName.split('.')).map(_._2) } + + /** + * Resolve catalog name. spark.sql.defaultCatalog name is returned if catalog.name is + * spark_catalog otherwise, catalog.name is returned. + * @see + * issue319 + * + * @param spark + * Spark Session + * @param catalog + * Spark Catalog + * @return + * catalog name. + */ + def resolveCatalogName(spark: SparkSession, catalog: CatalogPlugin): String = { + + /** + * Check if the provided catalog is a session catalog. + */ + if (CatalogV2Util.isSessionCatalog(catalog)) { + val defaultCatalog = spark.conf.get(DEFAULT_CATALOG) + if (spark.sessionState.catalogManager.isCatalogRegistered(defaultCatalog)) { + defaultCatalog + } else { + + /** + * It may happen when spark.sql.defaultCatalog is configured, but there's no + * implementation. 
For instance, spark.sql.defaultCatalog = "unknown" + */ + throw new IllegalStateException(s"Unknown catalog name: $defaultCatalog") + } + } else { + // Return the name for non-session catalogs + catalog.name() + } + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala index bd7abcfb3..dc7875ccf 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala @@ -10,7 +10,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COL import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} +import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusWithMetadata, PartitionDirectory} import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.functions.isnull import org.apache.spark.sql.types.StructType @@ -96,7 +96,7 @@ case class FlintSparkSkippingFileIndex( .toSet } - private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatus) = { + private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatusWithMetadata) = { selectedFiles.contains(f.getPath.toUri.toString) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala index de2ea772d..fa9b23951 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala +++ 
b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala @@ -11,7 +11,10 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKi import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GetStructField} +import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke +import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.StringType /** * Skipping index strategy that defines skipping data structure building and reading logic. @@ -115,6 +118,17 @@ object FlintSparkSkippingStrategy { Seq(attr.name) case GetStructField(child, _, Some(name)) => extractColumnName(child) :+ name + /** + * Since Spark 3.4 add read-side padding, char_col = "sample char" became + * (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, + * StringType, readSidePadding, char_col#47, 20, true, false, true) = sample char ) + * + * When create skipping index, Spark did write-side padding. So read-side push down can be + * ignored. 
More reading, https://issues.apache.org/jira/browse/SPARK-40697 + */ + case StaticInvoke(staticObject, StringType, "readSidePadding", arguments, _, _, _, _) + if classOf[CharVarcharCodegenUtils].isAssignableFrom(staticObject) => + extractColumnName(arguments.head) case _ => Seq.empty } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterMightContain.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterMightContain.scala index 653abbd7d..c7ba66c9c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterMightContain.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterMightContain.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.skipping.bloomfilter import java.io.ByteArrayInputStream import org.opensearch.flint.core.field.bloomfilter.classic.ClassicBloomFilter +import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain.NAME import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.InternalRow @@ -40,7 +41,7 @@ case class BloomFilterMightContain(bloomFilterExpression: Expression, valueExpre override def dataType: DataType = BooleanType - override def symbol: String = "BLOOM_FILTER_MIGHT_CONTAIN" + override def symbol: String = NAME override def checkInputDataTypes(): TypeCheckResult = { (left.dataType, right.dataType) match { @@ -109,6 +110,8 @@ case class BloomFilterMightContain(bloomFilterExpression: Expression, valueExpre object BloomFilterMightContain { + val NAME = "BLOOM_FILTER_MIGHT_CONTAIN" + /** * Generate bloom filter might contain function given the bloom filter column and value. 
* diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintCatalogSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintCatalogSuite.scala new file mode 100644 index 000000000..3c75cf541 --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintCatalogSuite.scala @@ -0,0 +1,91 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.sql + +import org.mockito.Mockito.when +import org.scalatestplus.mockito.MockitoSugar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin} +import org.apache.spark.sql.flint.resolveCatalogName +import org.apache.spark.sql.internal.{SessionState, SQLConf} +import org.apache.spark.sql.internal.SQLConf.DEFAULT_CATALOG + +class FlintCatalogSuite extends SparkFunSuite with MockitoSugar { + + test("resolveCatalogName returns default catalog name for session catalog") { + assertCatalog() + .withCatalogName("spark_catalog") + .withDefaultCatalog("glue") + .registerCatalog("glue") + .shouldResolveCatalogName("glue") + } + + test("resolveCatalogName returns default catalog name for spark_catalog") { + assertCatalog() + .withCatalogName("spark_catalog") + .withDefaultCatalog("spark_catalog") + .registerCatalog("spark_catalog") + .shouldResolveCatalogName("spark_catalog") + } + + test("resolveCatalogName should return catalog name for non-session catalogs") { + assertCatalog() + .withCatalogName("custom_catalog") + .withDefaultCatalog("custom_catalog") + .registerCatalog("custom_catalog") + .shouldResolveCatalogName("custom_catalog") + } + + test( + "resolveCatalogName should throw RuntimeException when default catalog is not registered") { + assertCatalog() + .withCatalogName("spark_catalog") + .withDefaultCatalog("glue") + .registerCatalog("unknown") + .shouldThrowException() + } + + private def assertCatalog(): AssertionHelper = { + new 
AssertionHelper + } + + private class AssertionHelper { + private val spark = mock[SparkSession] + private val catalog = mock[CatalogPlugin] + private val sessionState = mock[SessionState] + private val catalogManager = mock[CatalogManager] + + def withCatalogName(catalogName: String): AssertionHelper = { + when(catalog.name()).thenReturn(catalogName) + this + } + + def withDefaultCatalog(catalogName: String): AssertionHelper = { + val conf = new SQLConf + conf.setConf(DEFAULT_CATALOG, catalogName) + when(spark.conf).thenReturn(new RuntimeConfig(conf)) + this + } + + def registerCatalog(catalogName: String): AssertionHelper = { + when(spark.sessionState).thenReturn(sessionState) + when(sessionState.catalogManager).thenReturn(catalogManager) + when(catalogManager.isCatalogRegistered(catalogName)).thenReturn(true) + this + } + + def shouldResolveCatalogName(expectedCatalogName: String): Unit = { + assert(resolveCatalogName(spark, catalog) == expectedCatalogName) + } + + def shouldThrowException(): Unit = { + assertThrows[IllegalStateException] { + resolveCatalogName(spark, catalog) + } + } + } +} diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index a590eccb1..3a6802704 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -284,8 +284,8 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { Matcher { (plan: LogicalPlan) => val result = plan.exists { case LogicalRelation(_, _, Some(table), _) => - // Table name in logical relation doesn't have catalog name - table.qualifiedName == expectedTableName.split('.').drop(1).mkString(".") + // Since Spark 3.4, Table 
name in logical relation have catalog name + table.qualifiedName == expectedTableName case _ => false } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala index c099a1a86..f03116de9 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala @@ -7,112 +7,105 @@ package org.opensearch.flint.spark.skipping import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ -import org.mockito.invocation.InvocationOnMock import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING} -import org.opensearch.flint.spark.FlintSpark -import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE} +import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions} +import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndexOptions} import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.SkippingKind import org.scalatest.matchers.{Matcher, MatchResult} import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar.mock -import org.apache.spark.SparkFunSuite +import org.apache.spark.FlintSuite import org.apache.spark.sql.Column -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, ExprId, Literal, Or} +import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, SubqueryAlias} -import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.functions.col -import org.apache.spark.sql.sources.BaseRelation -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} -class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { +class ApplyFlintSparkSkippingIndexSuite extends FlintSuite with Matchers { /** Test table and index */ private val testTable = "spark_catalog.default.apply_skipping_index_test" - private val testIndex = getSkippingIndexName(testTable) - private val testSchema = StructType( - Seq( - StructField("name", StringType, nullable = false), - StructField("age", IntegerType, nullable = false), - StructField("address", StringType, nullable = false))) - - /** Resolved column reference used in filtering condition */ - private val nameCol = - AttributeReference("name", StringType, nullable = false)(exprId = ExprId(1)) - private val ageCol = - AttributeReference("age", IntegerType, nullable = false)(exprId = ExprId(2)) - private val addressCol = - AttributeReference("address", StringType, nullable = false)(exprId = ExprId(3)) + + // Mock FlintClient to avoid looking for real OpenSearch cluster + private val clientBuilder = mockStatic(classOf[FlintClientBuilder]) + private val client = mock[FlintClient](RETURNS_DEEP_STUBS) + + /** Mock FlintSpark which is required by the rule */ + private val flint = mock[FlintSpark] + + override protected def beforeAll(): Unit = { + super.beforeAll() + sql(s"CREATE TABLE $testTable 
(name STRING, age INT, address STRING) USING JSON") + + // Mock static create method in FlintClientBuilder used by Flint data source + clientBuilder + .when(() => FlintClientBuilder.build(any(classOf[FlintOptions]))) + .thenReturn(client) + when(flint.spark).thenReturn(spark) + } + + override protected def afterAll(): Unit = { + sql(s"DROP TABLE $testTable") + clientBuilder.close() + super.afterAll() + } test("should not rewrite query if no skipping index") { assertFlintQueryRewriter() - .withSourceTable(testTable, testSchema) - .withFilter(EqualTo(nameCol, Literal("hello"))) + .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello'") .withNoSkippingIndex() .shouldNotRewrite() } test("should not rewrite query if filter condition is disjunction") { assertFlintQueryRewriter() - .withSourceTable(testTable, testSchema) - .withFilter(Or(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30)))) - .withSkippingIndex(testIndex, REFRESHING, "name", "age") + .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello' or age = 30") + .withSkippingIndex(REFRESHING, "name", "age") .shouldNotRewrite() } test("should not rewrite query if filter condition contains disjunction") { assertFlintQueryRewriter() - .withSourceTable(testTable, testSchema) - .withFilter( - And( - Or(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))), - EqualTo(ageCol, Literal(30)))) - .withSkippingIndex(testIndex, REFRESHING, "name", "age") + .withQuery( + s"SELECT * FROM $testTable WHERE (name = 'hello' or age = 30) and address = 'Seattle'") + .withSkippingIndex(REFRESHING, "name", "age") .shouldNotRewrite() } test("should rewrite query with skipping index") { assertFlintQueryRewriter() - .withSourceTable(testTable, testSchema) - .withFilter(EqualTo(nameCol, Literal("hello"))) - .withSkippingIndex(testIndex, REFRESHING, "name") + .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello'") + .withSkippingIndex(REFRESHING, "name") .shouldPushDownAfterRewrite(col("name") === 
"hello") } test("should not rewrite query with deleted skipping index") { assertFlintQueryRewriter() - .withSourceTable(testTable, testSchema) - .withFilter(EqualTo(nameCol, Literal("hello"))) - .withSkippingIndex(testIndex, DELETED, "name") + .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello'") + .withSkippingIndex(DELETED, "name") .shouldNotRewrite() } test("should only push down filter condition with indexed column") { assertFlintQueryRewriter() - .withSourceTable(testTable, testSchema) - .withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30)))) - .withSkippingIndex(testIndex, REFRESHING, "name") + .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello' and age = 30") + .withSkippingIndex(REFRESHING, "name") .shouldPushDownAfterRewrite(col("name") === "hello") } test("should push down all filter conditions with indexed column") { assertFlintQueryRewriter() - .withSourceTable(testTable, testSchema) - .withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30)))) - .withSkippingIndex(testIndex, REFRESHING, "name", "age") + .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello' and age = 30") + .withSkippingIndex(REFRESHING, "name", "age") .shouldPushDownAfterRewrite(col("name") === "hello" && col("age") === 30) assertFlintQueryRewriter() - .withSourceTable(testTable, testSchema) - .withFilter( - And( - EqualTo(nameCol, Literal("hello")), - And(EqualTo(ageCol, Literal(30)), EqualTo(addressCol, Literal("Seattle"))))) - .withSkippingIndex(testIndex, REFRESHING, "name", "age", "address") + .withQuery( + s"SELECT * FROM $testTable WHERE name = 'hello' and (age = 30 and address = 'Seattle')") + .withSkippingIndex(REFRESHING, "name", "age", "address") .shouldPushDownAfterRewrite( col("name") === "hello" && col("age") === 30 && col("address") === "Seattle") } @@ -122,46 +115,27 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { } private class AssertionHelper { - private val flint = 
{ - val mockFlint = mock[FlintSpark](RETURNS_DEEP_STUBS) - when(mockFlint.spark.sessionState.catalogManager.currentCatalog.name()) - .thenReturn("spark_catalog") - mockFlint - } private val rule = new ApplyFlintSparkSkippingIndex(flint) - private var relation: LogicalRelation = _ private var plan: LogicalPlan = _ - def withSourceTable(fullname: String, schema: StructType): AssertionHelper = { - val table = CatalogTable( - identifier = TableIdentifier(fullname.split('.')(1), Some(fullname.split('.')(0))), - tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat.empty, - schema = null) - relation = LogicalRelation(mockBaseRelation(schema), table) - this - } - - def withFilter(condition: Expression): AssertionHelper = { - val filter = Filter(condition, relation) - val project = Project(Seq(), filter) - plan = SubqueryAlias("alb_logs", project) + def withQuery(query: String): AssertionHelper = { + this.plan = sql(query).queryExecution.optimizedPlan this } - def withSkippingIndex( - indexName: String, - indexState: IndexState, - indexCols: String*): AssertionHelper = { - val skippingIndex = mock[FlintSparkSkippingIndex] - when(skippingIndex.kind).thenReturn(SKIPPING_INDEX_TYPE) - when(skippingIndex.name()).thenReturn(indexName) - when(skippingIndex.indexedColumns).thenReturn(indexCols.map(FakeSkippingStrategy)) - - // Mock index log entry with the given state - val logEntry = mock[FlintMetadataLogEntry] - when(logEntry.state).thenReturn(indexState) - when(skippingIndex.latestLogEntry).thenReturn(Some(logEntry)) + def withSkippingIndex(indexState: IndexState, indexCols: String*): AssertionHelper = { + val skippingIndex = new FlintSparkSkippingIndex( + tableName = testTable, + indexedColumns = indexCols.map(FakeSkippingStrategy), + options = FlintSparkIndexOptions.empty, + latestLogEntry = Some( + new FlintMetadataLogEntry( + "id", + 0L, + indexState, + Map.empty[String, Any], + "", + Map.empty[String, Any]))) 
when(flint.describeIndex(any())).thenReturn(Some(skippingIndex)) this @@ -181,23 +155,6 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { } } - private def mockBaseRelation(schema: StructType): BaseRelation = { - val fileIndex = mock[FileIndex] - val baseRelation: HadoopFsRelation = mock[HadoopFsRelation] - when(baseRelation.location).thenReturn(fileIndex) - when(baseRelation.schema).thenReturn(schema) - - // Mock baseRelation.copy(location = FlintFileIndex) - doAnswer((invocation: InvocationOnMock) => { - val location = invocation.getArgument[FileIndex](0) - val relationCopy: HadoopFsRelation = mock[HadoopFsRelation] - when(relationCopy.location).thenReturn(location) - relationCopy - }).when(baseRelation).copy(any(), any(), any(), any(), any(), any())(any()) - - baseRelation - } - private def pushDownFilterToIndexScan(expect: Column): Matcher[LogicalPlan] = { Matcher { (plan: LogicalPlan) => val useFlintSparkSkippingFileIndex = plan.exists { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala index d2ef72158..4b707841c 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala @@ -16,7 +16,7 @@ import org.apache.spark.FlintSuite import org.apache.spark.sql.{Column, DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate} -import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} +import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusWithMetadata, PartitionDirectory} import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ 
@@ -118,7 +118,8 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers { private def mockPartitions(partitions: Map[String, Seq[String]]): Seq[PartitionDirectory] = { partitions.map { case (partitionName, filePaths) => - val files = filePaths.map(path => new FileStatus(0, false, 0, 0, 0, new Path(path))) + val files = filePaths.map(path => + FileStatusWithMetadata(new FileStatus(0, false, 0, 0, 0, new Path(path)))) PartitionDirectory(InternalRow(Literal(partitionName)), files) }.toSeq } diff --git a/integ-test/src/integration/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala index 0bccf787b..fd5c5bf8f 100644 --- a/integ-test/src/integration/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala +++ b/integ-test/src/integration/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala @@ -104,7 +104,9 @@ class FlintDataSourceV2ITSuite val df2 = df.filter($"aText".contains("second")) checkFiltersRemoved(df2) - checkPushedInfo(df2, "PushedPredicates: [aText IS NOT NULL, aText LIKE '%second%']") + checkPushedInfo( + df2, + "PushedPredicates: [aText IS NOT NULL, aText LIKE '%second%' ESCAPE '\\']") checkAnswer(df2, Row(2, "b", "i am second")) val df3 = @@ -117,7 +119,7 @@ class FlintDataSourceV2ITSuite checkFiltersRemoved(df4) checkPushedInfo( df4, - "PushedPredicates: [aInt IS NOT NULL, aText IS NOT NULL, aInt > 1, aText LIKE '%second%']") + "PushedPredicates: [aInt IS NOT NULL, aText IS NOT NULL, aInt > 1, aText LIKE '%second%' ESCAPE '\\']") checkAnswer(df4, Row(2, "b", "i am second")) } } diff --git a/integ-test/src/integration/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala index ea5988577..69900677c 100644 --- a/integ-test/src/integration/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala +++ 
b/integ-test/src/integration/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala @@ -22,6 +22,7 @@ class OpenSearchCatalogITSuite extends OpenSearchCatalogSuite { } } + // FIXME https://github.com/opensearch-project/opensearch-spark/issues/529 test("Describe single index as table") { val indexName = "t0001" withIndexName(indexName) { @@ -29,16 +30,13 @@ class OpenSearchCatalogITSuite extends OpenSearchCatalogSuite { val df = spark.sql(s""" DESC ${catalogName}.default.$indexName""") - assert(df.count() == 6) + assert(df.count() == 3) checkAnswer( df, Seq( - Row("# Partitioning", "", ""), - Row("", "", ""), - Row("Not partitioned", "", ""), - Row("accountId", "string", ""), - Row("eventName", "string", ""), - Row("eventSource", "string", ""))) + Row("accountId", "string", null), + Row("eventName", "string", null), + Row("eventSource", "string", null))) } } diff --git a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala index 21323cca4..832642088 100644 --- a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala +++ b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala @@ -8,7 +8,7 @@ package org.apache.spark.opensearch.table import org.opensearch.flint.spark.FlintSparkSuite trait OpenSearchCatalogSuite extends FlintSparkSuite { - val catalogName = "dev" + override lazy val catalogName = "dev" override def beforeAll(): Unit = { super.beforeAll() diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index fc4cdbeac..439930486 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ 
b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { /** Test table, MV, index name and query */ - private val testTable = "spark_catalog.default.mv_test" - private val testMvName = "spark_catalog.default.mv_test_metrics" + private val testTable = s"$catalogName.default.mv_test" + private val testMvName = s"$catalogName.default.mv_test_metrics" private val testFlintIndex = getFlintIndexName(testMvName) private val testQuery = s""" @@ -218,7 +218,11 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { } test("issue 112, https://github.com/opensearch-project/opensearch-spark/issues/112") { - val tableName = "spark_catalog.default.issue112" + if (tableType.equalsIgnoreCase("iceberg")) { + cancel + } + + val tableName = s"$catalogName.default.issue112" createTableIssue112(tableName) sql(s""" |CREATE MATERIALIZED VIEW $testMvName AS @@ -261,14 +265,14 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { test("create materialized view with quoted name and column name") { val testQuotedQuery = - """ SELECT + s""" SELECT | window.start AS `start.time`, | COUNT(*) AS `count` - | FROM `spark_catalog`.`default`.`mv_test` + | FROM `$catalogName`.`default`.`mv_test` | GROUP BY TUMBLE(`time`, '10 Minutes')""".stripMargin.trim sql(s""" - | CREATE MATERIALIZED VIEW `spark_catalog`.`default`.`mv_test_metrics` + | CREATE MATERIALIZED VIEW `$catalogName`.`default`.`mv_test_metrics` | AS $testQuotedQuery |""".stripMargin) @@ -303,34 +307,34 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { test("show all materialized views in catalog and database") { // Show in catalog - flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create() - checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), 
Seq(Row("mv1"))) + flint.materializedView().name(s"$catalogName.default.mv1").query(testQuery).create() + checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN $catalogName"), Seq(Row("mv1"))) // Show in catalog.database - flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create() + flint.materializedView().name(s"$catalogName.default.mv2").query(testQuery).create() checkAnswer( - sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"), + sql(s"SHOW MATERIALIZED VIEW IN $catalogName.default"), Seq(Row("mv1"), Row("mv2"))) - checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty) + checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN $catalogName.other"), Seq.empty) deleteTestIndex( - getFlintIndexName("spark_catalog.default.mv1"), - getFlintIndexName("spark_catalog.default.mv2")) + getFlintIndexName(s"$catalogName.default.mv1"), + getFlintIndexName(s"$catalogName.default.mv2")) } test("show materialized view in database with the same prefix") { - flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create() - flint.materializedView().name("spark_catalog.default_test.mv2").query(testQuery).create() - checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"), Seq(Row("mv1"))) + flint.materializedView().name(s"$catalogName.default.mv1").query(testQuery).create() + flint.materializedView().name(s"$catalogName.default_test.mv2").query(testQuery).create() + checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN $catalogName.default"), Seq(Row("mv1"))) deleteTestIndex( - getFlintIndexName("spark_catalog.default.mv1"), - getFlintIndexName("spark_catalog.default_test.mv2")) + getFlintIndexName(s"$catalogName.default.mv1"), + getFlintIndexName(s"$catalogName.default_test.mv2")) } test("should return emtpy when show materialized views in empty database") { - checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty) + checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN $catalogName.other"), Seq.empty) } 
test("describe materialized view") { diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index af497eb2b..751a0a7b6 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuiteHelper { /** Test table and index name */ - private val testTable = "spark_catalog.default.skipping_sql_test" - private val testIndex = getSkippingIndexName(testTable) + protected val testTable = s"$catalogName.default.skipping_sql_test" + protected val testIndex = getSkippingIndexName(testTable) override def beforeEach(): Unit = { super.beforeAll() @@ -201,7 +201,9 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit "`struct_col`.`field1`.`subfield` VALUE_SET, `struct_col`.`field2` MIN_MAX").foreach { columnSkipTypes => test(s"build skipping index for nested field $columnSkipTypes") { - val testTable = "spark_catalog.default.nested_field_table" + assume(tableType != "iceberg", "ignore iceberg skipping index query rewrite test") + + val testTable = s"$catalogName.default.nested_field_table" val testIndex = getSkippingIndexName(testTable) withTable(testTable) { createStructTable(testTable) @@ -339,7 +341,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit test("create skipping index with quoted table and column name") { sql(s""" - | CREATE SKIPPING INDEX ON `spark_catalog`.`default`.`skipping_sql_test` + | CREATE SKIPPING INDEX ON `$catalogName`.`default`.`skipping_sql_test` | ( | `year` PARTITION, | `name` VALUE_SET, @@ -385,17 +387,26 @@ 
class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit sql("USE sample") // Create index without database name specified - sql("CREATE TABLE test1 (name STRING) USING CSV") + sql(s"CREATE TABLE test1 (name STRING) USING $tableType") sql("CREATE SKIPPING INDEX ON test1 (name VALUE_SET)") // Create index with database name specified - sql("CREATE TABLE test2 (name STRING) USING CSV") + sql(s"CREATE TABLE test2 (name STRING) USING $tableType") sql("CREATE SKIPPING INDEX ON sample.test2 (name VALUE_SET)") try { - flint.describeIndex("flint_spark_catalog_sample_test1_skipping_index") shouldBe defined - flint.describeIndex("flint_spark_catalog_sample_test2_skipping_index") shouldBe defined + flint.describeIndex(s"flint_${catalogName}_sample_test1_skipping_index") shouldBe defined + flint.describeIndex(s"flint_${catalogName}_sample_test2_skipping_index") shouldBe defined } finally { + + /** + * TODO: REMOVE DROP TABLE when Iceberg supports CASCADE. More reading at + * https://github.com/apache/iceberg/pull/7275.
+ */ + if (tableType.equalsIgnoreCase("iceberg")) { + sql("DROP TABLE test1") + sql("DROP TABLE test2") + } sql("DROP DATABASE sample CASCADE") } } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 751647149..ea9945fce 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -35,6 +35,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit lazy protected val flint: FlintSpark = new FlintSpark(spark) lazy protected val tableType: String = "CSV" lazy protected val tableOptions: String = "OPTIONS (header 'false', delimiter '\t')" + lazy protected val catalogName: String = "spark_catalog" override protected def sparkConf: SparkConf = { val conf = super.sparkConf diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala index 4cba9099c..79e70655b 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala @@ -7,11 +7,12 @@ package org.opensearch.flint.spark import java.sql.Timestamp +import org.scalatest.Ignore import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.FlintSuite import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} class FlintSparkWindowingFunctionITSuite extends QueryTest with FlintSuite { @@ -26,8 +27,21 @@ class FlintSparkWindowingFunctionITSuite extends QueryTest with FlintSuite { val resultDF = 
inputDF.selectExpr("TUMBLE(timestamp, '10 minutes')") - resultDF.schema shouldBe StructType.fromDDL( - "window struct NOT NULL") + // Since Spark 3.4. https://issues.apache.org/jira/browse/SPARK-40821 + val expected = + StructType(StructType.fromDDL("window struct NOT NULL").map { + case StructField(name, dataType: StructType, nullable, _) if name == "window" => + StructField( + name, + dataType, + nullable, + metadata = new MetadataBuilder() + .putBoolean("spark.timeWindow", true) + .build()) + case other => other + }) + + resultDF.schema shouldBe expected checkAnswer( resultDF, Seq( diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala index 2ae0d157a..e7ce5316b 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala @@ -22,13 +22,20 @@ trait FlintSparkIcebergSuite extends FlintSparkSuite { // You can also override tableOptions if Iceberg requires different options override lazy protected val tableOptions: String = "" + override lazy protected val catalogName: String = "local" + // Override the sparkConf method to include Iceberg-specific configurations override protected def sparkConf: SparkConf = { val conf = super.sparkConf // Set Iceberg-specific Spark configurations .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") - .set("spark.sql.catalog.spark_catalog.type", "hadoop") - .set("spark.sql.catalog.spark_catalog.warehouse", s"spark-warehouse/${suiteName}") + .set("spark.sql.catalog.spark_catalog.type", "hive") + .set(s"spark.sql.catalog.$catalogName", "org.apache.iceberg.spark.SparkCatalog") + .set(s"spark.sql.catalog.$catalogName.type", "hadoop") + // Required by IT(create skipping index on table without database 
name) + .set(s"spark.sql.catalog.$catalogName.default-namespace", "default") + .set(s"spark.sql.catalog.$catalogName.warehouse", s"spark-warehouse/${suiteName}") + .set(s"spark.sql.defaultCatalog", s"$catalogName") .set( "spark.sql.extensions", List( diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala index 407c2cb3b..384f14484 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala @@ -171,14 +171,14 @@ class FlintSparkPPLEvalITSuite val ex = intercept[AnalysisException](sql(s""" | source = $testTable | eval age = 40 | eval name = upper(name) | sort name | fields name, age, state | """.stripMargin)) - assert(ex.getMessage().contains("Reference 'name' is ambiguous")) + assert(ex.getMessage().contains("Reference `name` is ambiguous")) } test("test overriding existing fields: throw exception when specify the new field in where") { val ex = intercept[AnalysisException](sql(s""" | source = $testTable | eval age = abs(age) | where age < 50 | """.stripMargin)) - assert(ex.getMessage().contains("Reference 'age' is ambiguous")) + assert(ex.getMessage().contains("Reference `age` is ambiguous")) } test( @@ -186,7 +186,7 @@ class FlintSparkPPLEvalITSuite val ex = intercept[AnalysisException](sql(s""" | source = $testTable | eval age = abs(age) | stats avg(age) | """.stripMargin)) - assert(ex.getMessage().contains("Reference 'age' is ambiguous")) + assert(ex.getMessage().contains("Reference `age` is ambiguous")) } test( @@ -194,7 +194,7 @@ class FlintSparkPPLEvalITSuite val ex = intercept[AnalysisException](sql(s""" | source = $testTable | eval country = upper(country) | stats avg(age) by country | """.stripMargin)) - assert(ex.getMessage().contains("Reference 'country' is 
ambiguous")) + assert(ex.getMessage().contains("Reference `country` is ambiguous")) } test("test override existing fields: the eval field doesn't appear in fields command") { diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSessionCatalogSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSessionCatalogSuite.scala new file mode 100644 index 000000000..fb3d4bbda --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSessionCatalogSuite.scala @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.sessioncatalog + +import org.opensearch.flint.spark.FlintSparkSuite + +import org.apache.spark.SparkConf + +/** + * Test with FlintDelegatingSessionCatalog. + */ +trait FlintSessionCatalogSuite extends FlintSparkSuite { + // Override catalog name + override lazy protected val catalogName: String = "mycatalog" + + override protected def sparkConf: SparkConf = { + val conf = super.sparkConf + .set("spark.sql.catalog.mycatalog", "org.opensearch.sql.FlintDelegatingSessionCatalog") + .set("spark.sql.defaultCatalog", catalogName) + conf + } +} diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogMaterializedViewITSuite.scala new file mode 100644 index 000000000..25567ba4e --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogMaterializedViewITSuite.scala @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.sessioncatalog + +import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite + +/** + * Test MaterializedView 
with FlintDelegatingSessionCatalog. + */ +class FlintSparkSessionCatalogMaterializedViewITSuite + extends FlintSparkMaterializedViewSqlITSuite + with FlintSessionCatalogSuite {} diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogSkippingIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogSkippingIndexITSuite.scala new file mode 100644 index 000000000..7b29d5883 --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogSkippingIndexITSuite.scala @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.sessioncatalog + +import org.opensearch.flint.spark.FlintSparkSkippingIndexSqlITSuite + +/** + * Test Skipping Index with FlintDelegatingSessionCatalog. + */ +class FlintSparkSessionCatalogSkippingIndexITSuite + extends FlintSparkSkippingIndexSqlITSuite + with FlintSessionCatalogSuite {} diff --git a/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala b/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala index f6be0b1c3..fc8cb4f4a 100644 --- a/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala +++ b/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala @@ -45,7 +45,10 @@ class FlintDelegatingSessionCatalogTest extends QueryTest with SharedSparkSessio test("query without catalog name") { sql("use mycatalog") - assert(sql("SHOW CATALOGS").collect === Array(Row("mycatalog"))) + // Since Spark 3.4.0. 
https://issues.apache.org/jira/browse/SPARK-40055, listCatalogs should + // also return spark_catalog even when the spark_catalog implementation is defaultSessionCatalog + assert( + sql("SHOW CATALOGS").collect.toSet === Array(Row("mycatalog"), Row("spark_catalog")).toSet) checkAnswer(sql(s"SELECT name, age FROM $testTableWithoutCatalog"), Seq(Row("Hello", 30))) }