From 70837466958e29db19aa7e7007c01c0beb4dda7d Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Fri, 16 Aug 2024 17:31:37 -0700 Subject: [PATCH] Revert "Upgrade Spark 3.5.1 (#525)" This reverts commit d6e71fa1b14566622d4aa9c5744017cc7500033f. --- README.md | 2 +- build.sbt | 6 +- .../core/metrics/reporter/DimensionUtils.java | 4 +- .../DimensionedCloudWatchReporter.java | 1 + .../apache/spark/sql/flint/FlintScan.scala | 15 +- .../spark/sql/flint/FlintScanBuilder.scala | 3 - .../apache/spark/sql/flint/FlintWrite.scala | 2 - .../sql/flint/json/FlintJacksonParser.scala | 15 +- .../org/apache/spark/sql/flint/package.scala | 48 +---- .../FlintSparkSkippingFileIndex.scala | 4 +- .../skipping/FlintSparkSkippingStrategy.scala | 14 -- .../bloomfilter/BloomFilterMightContain.scala | 5 +- .../apache/spark/sql/FlintCatalogSuite.scala | 91 ---------- .../ApplyFlintSparkCoveringIndexSuite.scala | 4 +- .../ApplyFlintSparkSkippingIndexSuite.scala | 169 +++++++++++------- .../FlintSparkSkippingFileIndexSuite.scala | 5 +- .../spark/FlintDataSourceV2ITSuite.scala | 6 +- .../catalog/OpenSearchCatalogITSuite.scala | 12 +- .../table/OpenSearchCatalogSuite.scala | 2 +- ...FlintSparkMaterializedViewSqlITSuite.scala | 42 ++--- .../FlintSparkSkippingIndexSqlITSuite.scala | 27 +-- .../flint/spark/FlintSparkSuite.scala | 1 - .../FlintSparkWindowingFunctionITSuite.scala | 20 +-- .../iceberg/FlintSparkIcebergSuite.scala | 11 +- .../spark/ppl/FlintSparkPPLEvalITSuite.scala | 8 +- .../FlintSessionCatalogSuite.scala | 25 --- ...essionCatalogMaterializedViewITSuite.scala | 15 -- ...rkSessionCatalogSkippingIndexITSuite.scala | 15 -- .../FlintDelegatingSessionCatalogTest.scala | 5 +- 29 files changed, 188 insertions(+), 389 deletions(-) delete mode 100644 flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintCatalogSuite.scala delete mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSessionCatalogSuite.scala delete mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogMaterializedViewITSuite.scala delete mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogSkippingIndexITSuite.scala diff --git a/README.md b/README.md index eabcd7fd3..f9568838e 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ Version compatibility: | 0.2.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ | | 0.3.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ | | 0.4.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ | -| 0.5.0 | 11+ | 3.5.1 | 2.12.14 | 2.13+ | +| 0.5.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ | ## Flint Extension Usage diff --git a/build.sbt b/build.sbt index d4d7352e3..1c12885fb 100644 --- a/build.sbt +++ b/build.sbt @@ -5,10 +5,10 @@ import Dependencies._ lazy val scala212 = "2.12.14" -lazy val sparkVersion = "3.5.1" -// Spark jackson version. Spark jackson-module-scala strictly check the jackson-databind version should compatible +lazy val sparkVersion = "3.3.2" +// Spark jackson version. Spark jackson-module-scala strictly check the jackson-databind version hould compatbile // https://github.com/FasterXML/jackson-module-scala/blob/2.18/src/main/scala/com/fasterxml/jackson/module/scala/JacksonModule.scala#L59 -lazy val jacksonVersion = "2.15.2" +lazy val jacksonVersion = "2.13.4" // The transitive opensearch jackson-databind dependency version should align with Spark jackson databind dependency version. 
// Issue: https://github.com/opensearch-project/opensearch-spark/issues/442 diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java index c3e00a067..6e3e90916 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionUtils.java @@ -10,9 +10,9 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.commons.lang.StringUtils; import com.amazonaws.services.cloudwatch.model.Dimension; -import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkEnv; /** @@ -124,4 +124,4 @@ private static Dimension getEnvironmentVariableDimension(String envVarName, Stri private static Dimension getDefaultDimension(String dimensionName) { return getEnvironmentVariableDimension(dimensionName, dimensionName); } -} +} \ No newline at end of file diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java index a5ea190c5..f4d456899 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java @@ -46,6 +46,7 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.Stream; +import org.apache.commons.lang.StringUtils; import org.apache.spark.metrics.sink.CloudWatchSink.DimensionNameGroups; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala index 9b9f70be0..201e7c748 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala @@ -5,8 +5,7 @@ package org.apache.spark.sql.flint -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain - +import org.apache.spark.internal.Logging import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan} import org.apache.spark.sql.flint.config.FlintSparkConf @@ -18,7 +17,8 @@ case class FlintScan( options: FlintSparkConf, pushedPredicates: Array[Predicate]) extends Scan - with Batch { + with Batch + with Logging { override def readSchema(): StructType = schema @@ -44,13 +44,10 @@ case class FlintScan( * Print pushedPredicates when explain(mode="extended"). Learn from SPARK JDBCScan. 
*/ override def description(): String = { - super.description() + ", PushedPredicates: " + pushedPredicates - .map { - case p if p.name().equalsIgnoreCase(BloomFilterMightContain.NAME) => p.name() - case p => p.toString() - } - .mkString("[", ", ", "]") + super.description() + ", PushedPredicates: " + seqToString(pushedPredicates) } + + private def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]") } /** diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala index 82a570b2f..0c6f7d700 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala @@ -5,8 +5,6 @@ package org.apache.spark.sql.flint -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain - import org.apache.spark.internal.Logging import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownV2Filters} @@ -36,5 +34,4 @@ case class FlintScanBuilder( } override def pushedPredicates(): Array[Predicate] = pushedPredicate - .filterNot(_.name().equalsIgnoreCase(BloomFilterMightContain.NAME)) } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala index 8691de3d0..25b4db940 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintWrite.scala @@ -48,6 +48,4 @@ case class FlintWrite( override def toBatch: BatchWrite = this override def toStreaming: StreamingWrite = this - - override def useCommitCoordinator(): Boolean = false } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala index a9e9122fb..31db1909f 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonParser.scala @@ -118,8 +118,7 @@ class FlintJacksonParser( array.toArray[InternalRow](schema) } case START_ARRAY => - throw QueryExecutionErrors.cannotParseJsonArraysAsStructsError( - parser.currentToken().asString()) + throw QueryExecutionErrors.cannotParseJsonArraysAsStructsError() } } @@ -421,17 +420,17 @@ class FlintJacksonParser( case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString => dataType match { case FloatType | DoubleType | TimestampType | DateType => - throw QueryExecutionErrors.emptyJsonFieldValueError(dataType) + throw QueryExecutionErrors.failToParseEmptyStringForDataTypeError(dataType) case _ => null } case VALUE_STRING if parser.getTextLength < 1 => - throw QueryExecutionErrors.emptyJsonFieldValueError(dataType) + throw QueryExecutionErrors.failToParseEmptyStringForDataTypeError(dataType) case token => // We cannot parse this token based on the given data type. So, we throw a // RuntimeException and this exception will be caught by `parse` method. 
- throw QueryExecutionErrors.cannotParseJSONFieldError(parser, token, dataType) + throw QueryExecutionErrors.failToParseValueForDataTypeError(parser, token, dataType) } /** @@ -538,7 +537,7 @@ class FlintJacksonParser( // JSON parser currently doesn't support partial results for corrupted records. // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. - throw BadRecordException(() => recordLiteral(record), cause = e) + throw BadRecordException(() => recordLiteral(record), () => None, e) case e: CharConversionException if options.encoding.isEmpty => val msg = """JSON parser cannot handle a character in its input. @@ -546,11 +545,11 @@ class FlintJacksonParser( |""".stripMargin + e.getMessage val wrappedCharException = new CharConversionException(msg) wrappedCharException.initCause(e) - throw BadRecordException(() => recordLiteral(record), cause = wrappedCharException) + throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException) case PartialResultException(row, cause) => throw BadRecordException( record = () => recordLiteral(record), - partialResults = () => Array(row), + partialResult = () => Some(row), cause) } } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala index d71bc5d12..06c92882b 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala @@ -9,7 +9,6 @@ import java.util.concurrent.ScheduledExecutorService import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog._ -import org.apache.spark.sql.internal.SQLConf.DEFAULT_CATALOG import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.util.{ShutdownHookManager, ThreadUtils} @@ -73,8 +72,14 @@ package object flint { def qualifyTableName(spark: SparkSession, tableName: String): String = { val (catalog, ident) = parseTableName(spark, tableName) - // more reading at https://github.com/opensearch-project/opensearch-spark/issues/319. - val catalogName = resolveCatalogName(spark, catalog) + // Tricky that our Flint delegate catalog's name has to be spark_catalog + // so we have to find its actual name in CatalogManager + val catalogMgr = spark.sessionState.catalogManager + val catalogName = + catalogMgr + .listCatalogs(Some("*")) + .find(catalogMgr.catalog(_) == catalog) + .getOrElse(catalog.name()) s"$catalogName.${ident.namespace.mkString(".")}.${ident.name}" } @@ -129,41 +134,4 @@ package object flint { def findField(rootField: StructType, fieldName: String): Option[StructField] = { rootField.findNestedField(fieldName.split('.')).map(_._2) } - - /** - * Resolve catalog name. spark.sql.defaultCatalog name is returned if catalog.name is - * spark_catalog otherwise, catalog.name is returned. - * @see - * issue319 - * - * @param spark - * Spark Session - * @param catalog - * Spark Catalog - * @return - * catalog name. - */ - def resolveCatalogName(spark: SparkSession, catalog: CatalogPlugin): String = { - - /** - * Check if the provided catalog is a session catalog. 
- */ - if (CatalogV2Util.isSessionCatalog(catalog)) { - val defaultCatalog = spark.conf.get(DEFAULT_CATALOG) - if (spark.sessionState.catalogManager.isCatalogRegistered(defaultCatalog)) { - defaultCatalog - } else { - - /** - * It may happen when spark.sql.defaultCatalog is configured, but there's no - * implementation. For instance, spark.sql.defaultCatalog = "unknown" - */ - throw new IllegalStateException(s"Unknown catalog name: $defaultCatalog") - } - } else { - // Return the name for non-session catalogs - catalog.name() - } - } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala index dc7875ccf..bd7abcfb3 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala @@ -10,7 +10,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COL import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusWithMetadata, PartitionDirectory} +import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.functions.isnull import org.apache.spark.sql.types.StructType @@ -96,7 +96,7 @@ case class FlintSparkSkippingFileIndex( .toSet } - private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatusWithMetadata) = { + private def isFileNotSkipped(selectedFiles: Set[String], f: FileStatus) = { selectedFiles.contains(f.getPath.toUri.toString) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala index fa9b23951..de2ea772d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala @@ -11,10 +11,7 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKi import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GetStructField} -import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke -import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils import org.apache.spark.sql.functions.col -import org.apache.spark.sql.types.StringType /** * Skipping index strategy that defines skipping data structure building and reading logic. @@ -118,17 +115,6 @@ object FlintSparkSkippingStrategy { Seq(attr.name) case GetStructField(child, _, Some(name)) => extractColumnName(child) :+ name - /** - * Since Spark 3.4 add read-side padding, char_col = "sample char" became - * (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, - * StringType, readSidePadding, char_col#47, 20, true, false, true) = sample char ) - * - * When create skipping index, Spark did write-side padding. So read-side push down can be - * ignored. 
More reading, https://issues.apache.org/jira/browse/SPARK-40697 - */ - case StaticInvoke(staticObject, StringType, "readSidePadding", arguments, _, _, _, _) - if classOf[CharVarcharCodegenUtils].isAssignableFrom(staticObject) => - extractColumnName(arguments.head) case _ => Seq.empty } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterMightContain.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterMightContain.scala index c7ba66c9c..653abbd7d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterMightContain.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/bloomfilter/BloomFilterMightContain.scala @@ -8,7 +8,6 @@ package org.opensearch.flint.spark.skipping.bloomfilter import java.io.ByteArrayInputStream import org.opensearch.flint.core.field.bloomfilter.classic.ClassicBloomFilter -import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain.NAME import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.InternalRow @@ -41,7 +40,7 @@ case class BloomFilterMightContain(bloomFilterExpression: Expression, valueExpre override def dataType: DataType = BooleanType - override def symbol: String = NAME + override def symbol: String = "BLOOM_FILTER_MIGHT_CONTAIN" override def checkInputDataTypes(): TypeCheckResult = { (left.dataType, right.dataType) match { @@ -110,8 +109,6 @@ case class BloomFilterMightContain(bloomFilterExpression: Expression, valueExpre object BloomFilterMightContain { - val NAME = "BLOOM_FILTER_MIGHT_CONTAIN" - /** * Generate bloom filter might contain function given the bloom filter column and value. 
* diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintCatalogSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintCatalogSuite.scala deleted file mode 100644 index 3c75cf541..000000000 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/FlintCatalogSuite.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.apache.spark.sql - -import org.mockito.Mockito.when -import org.scalatestplus.mockito.MockitoSugar - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin} -import org.apache.spark.sql.flint.resolveCatalogName -import org.apache.spark.sql.internal.{SessionState, SQLConf} -import org.apache.spark.sql.internal.SQLConf.DEFAULT_CATALOG - -class FlintCatalogSuite extends SparkFunSuite with MockitoSugar { - - test("resolveCatalogName returns default catalog name for session catalog") { - assertCatalog() - .withCatalogName("spark_catalog") - .withDefaultCatalog("glue") - .registerCatalog("glue") - .shouldResolveCatalogName("glue") - } - - test("resolveCatalogName returns default catalog name for spark_catalog") { - assertCatalog() - .withCatalogName("spark_catalog") - .withDefaultCatalog("spark_catalog") - .registerCatalog("spark_catalog") - .shouldResolveCatalogName("spark_catalog") - } - - test("resolveCatalogName should return catalog name for non-session catalogs") { - assertCatalog() - .withCatalogName("custom_catalog") - .withDefaultCatalog("custom_catalog") - .registerCatalog("custom_catalog") - .shouldResolveCatalogName("custom_catalog") - } - - test( - "resolveCatalogName should throw RuntimeException when default catalog is not registered") { - assertCatalog() - .withCatalogName("spark_catalog") - .withDefaultCatalog("glue") - .registerCatalog("unknown") - .shouldThrowException() - } - - private def assertCatalog(): AssertionHelper = { - new AssertionHelper - } - - private class AssertionHelper { - private val spark = mock[SparkSession] - private val catalog = mock[CatalogPlugin] - private val sessionState = mock[SessionState] - private val catalogManager = mock[CatalogManager] - - def withCatalogName(catalogName: String): AssertionHelper = { - when(catalog.name()).thenReturn(catalogName) - this - } - - def withDefaultCatalog(catalogName: String): AssertionHelper = { - val conf = new SQLConf - conf.setConf(DEFAULT_CATALOG, catalogName) - when(spark.conf).thenReturn(new RuntimeConfig(conf)) - this - } - - def registerCatalog(catalogName: String): AssertionHelper = { - when(spark.sessionState).thenReturn(sessionState) - when(sessionState.catalogManager).thenReturn(catalogManager) - when(catalogManager.isCatalogRegistered(catalogName)).thenReturn(true) - this - } - - def shouldResolveCatalogName(expectedCatalogName: String): Unit = { - assert(resolveCatalogName(spark, catalog) == expectedCatalogName) - } - - def shouldThrowException(): Unit = { - assertThrows[IllegalStateException] { - resolveCatalogName(spark, catalog) - } - } - } -} diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index 3a6802704..a590eccb1 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ 
b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -284,8 +284,8 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { Matcher { (plan: LogicalPlan) => val result = plan.exists { case LogicalRelation(_, _, Some(table), _) => - // Since Spark 3.4, Table name in logical relation have catalog name - table.qualifiedName == expectedTableName + // Table name in logical relation doesn't have catalog name + table.qualifiedName == expectedTableName.split('.').drop(1).mkString(".") case _ => false } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala index f03116de9..c099a1a86 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala @@ -7,105 +7,112 @@ package org.opensearch.flint.spark.skipping import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING} -import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions} -import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndexOptions} +import org.opensearch.flint.spark.FlintSpark +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE} import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.SkippingKind import org.scalatest.matchers.{Matcher, MatchResult} import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar.mock -import org.apache.spark.FlintSuite +import org.apache.spark.SparkFunSuite import org.apache.spark.sql.Column -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, ExprId, Literal, Or} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, SubqueryAlias} +import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.functions.col +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} -class ApplyFlintSparkSkippingIndexSuite extends FlintSuite with Matchers { +class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers { /** Test table and index */ private val testTable = "spark_catalog.default.apply_skipping_index_test" - - // Mock FlintClient to avoid looking for real OpenSearch cluster - private val clientBuilder = mockStatic(classOf[FlintClientBuilder]) - private val client = 
mock[FlintClient](RETURNS_DEEP_STUBS) - - /** Mock FlintSpark which is required by the rule */ - private val flint = mock[FlintSpark] - - override protected def beforeAll(): Unit = { - super.beforeAll() - sql(s"CREATE TABLE $testTable (name STRING, age INT, address STRING) USING JSON") - - // Mock static create method in FlintClientBuilder used by Flint data source - clientBuilder - .when(() => FlintClientBuilder.build(any(classOf[FlintOptions]))) - .thenReturn(client) - when(flint.spark).thenReturn(spark) - } - - override protected def afterAll(): Unit = { - sql(s"DROP TABLE $testTable") - clientBuilder.close() - super.afterAll() - } + private val testIndex = getSkippingIndexName(testTable) + private val testSchema = StructType( + Seq( + StructField("name", StringType, nullable = false), + StructField("age", IntegerType, nullable = false), + StructField("address", StringType, nullable = false))) + + /** Resolved column reference used in filtering condition */ + private val nameCol = + AttributeReference("name", StringType, nullable = false)(exprId = ExprId(1)) + private val ageCol = + AttributeReference("age", IntegerType, nullable = false)(exprId = ExprId(2)) + private val addressCol = + AttributeReference("address", StringType, nullable = false)(exprId = ExprId(3)) test("should not rewrite query if no skipping index") { assertFlintQueryRewriter() - .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello'") + .withSourceTable(testTable, testSchema) + .withFilter(EqualTo(nameCol, Literal("hello"))) .withNoSkippingIndex() .shouldNotRewrite() } test("should not rewrite query if filter condition is disjunction") { assertFlintQueryRewriter() - .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello' or age = 30") - .withSkippingIndex(REFRESHING, "name", "age") + .withSourceTable(testTable, testSchema) + .withFilter(Or(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30)))) + .withSkippingIndex(testIndex, REFRESHING, "name", "age") .shouldNotRewrite() } test("should not rewrite query if filter condition contains disjunction") { assertFlintQueryRewriter() - .withQuery( - s"SELECT * FROM $testTable WHERE (name = 'hello' or age = 30) and address = 'Seattle'") - .withSkippingIndex(REFRESHING, "name", "age") + .withSourceTable(testTable, testSchema) + .withFilter( + And( + Or(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))), + EqualTo(ageCol, Literal(30)))) + .withSkippingIndex(testIndex, REFRESHING, "name", "age") .shouldNotRewrite() } test("should rewrite query with skipping index") { assertFlintQueryRewriter() - .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello'") - .withSkippingIndex(REFRESHING, "name") + .withSourceTable(testTable, testSchema) + .withFilter(EqualTo(nameCol, Literal("hello"))) + .withSkippingIndex(testIndex, REFRESHING, "name") .shouldPushDownAfterRewrite(col("name") === "hello") } test("should not rewrite query with deleted skipping index") { assertFlintQueryRewriter() - .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello'") - .withSkippingIndex(DELETED, "name") + .withSourceTable(testTable, testSchema) + .withFilter(EqualTo(nameCol, Literal("hello"))) + .withSkippingIndex(testIndex, DELETED, "name") .shouldNotRewrite() } test("should only push down filter condition with indexed column") { assertFlintQueryRewriter() - .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello' and age = 30") - .withSkippingIndex(REFRESHING, "name") + .withSourceTable(testTable, testSchema) + .withFilter(And(EqualTo(nameCol, Literal("hello")), 
EqualTo(ageCol, Literal(30)))) + .withSkippingIndex(testIndex, REFRESHING, "name") .shouldPushDownAfterRewrite(col("name") === "hello") } test("should push down all filter conditions with indexed column") { assertFlintQueryRewriter() - .withQuery(s"SELECT * FROM $testTable WHERE name = 'hello' and age = 30") - .withSkippingIndex(REFRESHING, "name", "age") + .withSourceTable(testTable, testSchema) + .withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30)))) + .withSkippingIndex(testIndex, REFRESHING, "name", "age") .shouldPushDownAfterRewrite(col("name") === "hello" && col("age") === 30) assertFlintQueryRewriter() - .withQuery( - s"SELECT * FROM $testTable WHERE name = 'hello' and (age = 30 and address = 'Seattle')") - .withSkippingIndex(REFRESHING, "name", "age", "address") + .withSourceTable(testTable, testSchema) + .withFilter( + And( + EqualTo(nameCol, Literal("hello")), + And(EqualTo(ageCol, Literal(30)), EqualTo(addressCol, Literal("Seattle"))))) + .withSkippingIndex(testIndex, REFRESHING, "name", "age", "address") .shouldPushDownAfterRewrite( col("name") === "hello" && col("age") === 30 && col("address") === "Seattle") } @@ -115,27 +122,46 @@ class ApplyFlintSparkSkippingIndexSuite extends FlintSuite with Matchers { } private class AssertionHelper { + private val flint = { + val mockFlint = mock[FlintSpark](RETURNS_DEEP_STUBS) + when(mockFlint.spark.sessionState.catalogManager.currentCatalog.name()) + .thenReturn("spark_catalog") + mockFlint + } private val rule = new ApplyFlintSparkSkippingIndex(flint) + private var relation: LogicalRelation = _ private var plan: LogicalPlan = _ - def withQuery(query: String): AssertionHelper = { - this.plan = sql(query).queryExecution.optimizedPlan + def withSourceTable(fullname: String, schema: StructType): AssertionHelper = { + val table = CatalogTable( + identifier = TableIdentifier(fullname.split('.')(1), Some(fullname.split('.')(0))), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat.empty, + schema = null) + relation = LogicalRelation(mockBaseRelation(schema), table) + this + } + + def withFilter(condition: Expression): AssertionHelper = { + val filter = Filter(condition, relation) + val project = Project(Seq(), filter) + plan = SubqueryAlias("alb_logs", project) this } - def withSkippingIndex(indexState: IndexState, indexCols: String*): AssertionHelper = { - val skippingIndex = new FlintSparkSkippingIndex( - tableName = testTable, - indexedColumns = indexCols.map(FakeSkippingStrategy), - options = FlintSparkIndexOptions.empty, - latestLogEntry = Some( - new FlintMetadataLogEntry( - "id", - 0L, - indexState, - Map.empty[String, Any], - "", - Map.empty[String, Any]))) + def withSkippingIndex( + indexName: String, + indexState: IndexState, + indexCols: String*): AssertionHelper = { + val skippingIndex = mock[FlintSparkSkippingIndex] + when(skippingIndex.kind).thenReturn(SKIPPING_INDEX_TYPE) + when(skippingIndex.name()).thenReturn(indexName) + when(skippingIndex.indexedColumns).thenReturn(indexCols.map(FakeSkippingStrategy)) + + // Mock index log entry with the given state + val logEntry = mock[FlintMetadataLogEntry] + when(logEntry.state).thenReturn(indexState) + when(skippingIndex.latestLogEntry).thenReturn(Some(logEntry)) when(flint.describeIndex(any())).thenReturn(Some(skippingIndex)) this @@ -155,6 +181,23 @@ class ApplyFlintSparkSkippingIndexSuite extends FlintSuite with Matchers { } } + private def mockBaseRelation(schema: StructType): BaseRelation = { + val fileIndex = mock[FileIndex] + val 
baseRelation: HadoopFsRelation = mock[HadoopFsRelation] + when(baseRelation.location).thenReturn(fileIndex) + when(baseRelation.schema).thenReturn(schema) + + // Mock baseRelation.copy(location = FlintFileIndex) + doAnswer((invocation: InvocationOnMock) => { + val location = invocation.getArgument[FileIndex](0) + val relationCopy: HadoopFsRelation = mock[HadoopFsRelation] + when(relationCopy.location).thenReturn(location) + relationCopy + }).when(baseRelation).copy(any(), any(), any(), any(), any(), any())(any()) + + baseRelation + } + private def pushDownFilterToIndexScan(expect: Column): Matcher[LogicalPlan] = { Matcher { (plan: LogicalPlan) => val useFlintSparkSkippingFileIndex = plan.exists { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala index 4b707841c..d2ef72158 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndexSuite.scala @@ -16,7 +16,7 @@ import org.apache.spark.FlintSuite import org.apache.spark.sql.{Column, DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, Predicate} -import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusWithMetadata, PartitionDirectory} +import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ @@ -118,8 +118,7 @@ class FlintSparkSkippingFileIndexSuite extends FlintSuite with Matchers { private def mockPartitions(partitions: Map[String, Seq[String]]): Seq[PartitionDirectory] = { partitions.map { case (partitionName, filePaths) => - val files = filePaths.map(path => - FileStatusWithMetadata(new FileStatus(0, false, 0, 0, 0, new Path(path)))) + val files = filePaths.map(path => new FileStatus(0, false, 0, 0, 0, new Path(path))) PartitionDirectory(InternalRow(Literal(partitionName)), files) }.toSeq } diff --git a/integ-test/src/integration/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala index fd5c5bf8f..0bccf787b 100644 --- a/integ-test/src/integration/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala +++ b/integ-test/src/integration/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala @@ -104,9 +104,7 @@ class FlintDataSourceV2ITSuite val df2 = df.filter($"aText".contains("second")) checkFiltersRemoved(df2) - checkPushedInfo( - df2, - "PushedPredicates: [aText IS NOT NULL, aText LIKE '%second%' ESCAPE '\\']") + checkPushedInfo(df2, "PushedPredicates: [aText IS NOT NULL, aText LIKE '%second%']") checkAnswer(df2, Row(2, "b", "i am second")) val df3 = @@ -119,7 +117,7 @@ class FlintDataSourceV2ITSuite checkFiltersRemoved(df4) checkPushedInfo( df4, - "PushedPredicates: [aInt IS NOT NULL, aText IS NOT NULL, aInt > 1, aText LIKE '%second%' ESCAPE '\\']") + "PushedPredicates: [aInt IS NOT NULL, aText IS NOT NULL, aInt > 1, aText LIKE '%second%']") checkAnswer(df4, Row(2, "b", "i am second")) } } diff --git a/integ-test/src/integration/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala index 
69900677c..ea5988577 100644 --- a/integ-test/src/integration/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala +++ b/integ-test/src/integration/scala/org/apache/spark/opensearch/catalog/OpenSearchCatalogITSuite.scala @@ -22,7 +22,6 @@ class OpenSearchCatalogITSuite extends OpenSearchCatalogSuite { } } - // FIXME https://github.com/opensearch-project/opensearch-spark/issues/529 test("Describe single index as table") { val indexName = "t0001" withIndexName(indexName) { @@ -30,13 +29,16 @@ class OpenSearchCatalogITSuite extends OpenSearchCatalogSuite { val df = spark.sql(s""" DESC ${catalogName}.default.$indexName""") - assert(df.count() == 3) + assert(df.count() == 6) checkAnswer( df, Seq( - Row("accountId", "string", null), - Row("eventName", "string", null), - Row("eventSource", "string", null))) + Row("# Partitioning", "", ""), + Row("", "", ""), + Row("Not partitioned", "", ""), + Row("accountId", "string", ""), + Row("eventName", "string", ""), + Row("eventSource", "string", ""))) } } diff --git a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala index 832642088..21323cca4 100644 --- a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala +++ b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala @@ -8,7 +8,7 @@ package org.apache.spark.opensearch.table import org.opensearch.flint.spark.FlintSparkSuite trait OpenSearchCatalogSuite extends FlintSparkSuite { - override lazy val catalogName = "dev" + val catalogName = "dev" override def beforeAll(): Unit = { super.beforeAll() diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 439930486..fc4cdbeac 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { /** Test table, MV, index name and query */ - private val testTable = s"$catalogName.default.mv_test" - private val testMvName = s"$catalogName.default.mv_test_metrics" + private val testTable = "spark_catalog.default.mv_test" + private val testMvName = "spark_catalog.default.mv_test_metrics" private val testFlintIndex = getFlintIndexName(testMvName) private val testQuery = s""" @@ -218,11 +218,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { } test("issue 112, https://github.com/opensearch-project/opensearch-spark/issues/112") { - if (tableType.equalsIgnoreCase("iceberg")) { - cancel - } - - val tableName = s"$catalogName.default.issue112" + val tableName = "spark_catalog.default.issue112" createTableIssue112(tableName) sql(s""" |CREATE MATERIALIZED VIEW $testMvName AS @@ -265,14 +261,14 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { test("create materialized view with quoted name and column name") { val testQuotedQuery = - s""" SELECT + """ SELECT | window.start AS `start.time`, | COUNT(*) AS `count` - | FROM `$catalogName`.`default`.`mv_test` + | FROM `spark_catalog`.`default`.`mv_test` | GROUP BY 
TUMBLE(`time`, '10 Minutes')""".stripMargin.trim sql(s""" - | CREATE MATERIALIZED VIEW `$catalogName`.`default`.`mv_test_metrics` + | CREATE MATERIALIZED VIEW `spark_catalog`.`default`.`mv_test_metrics` | AS $testQuotedQuery |""".stripMargin) @@ -307,34 +303,34 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { test("show all materialized views in catalog and database") { // Show in catalog - flint.materializedView().name(s"$catalogName.default.mv1").query(testQuery).create() - checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN $catalogName"), Seq(Row("mv1"))) + flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create() + checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), Seq(Row("mv1"))) // Show in catalog.database - flint.materializedView().name(s"$catalogName.default.mv2").query(testQuery).create() + flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create() checkAnswer( - sql(s"SHOW MATERIALIZED VIEW IN $catalogName.default"), + sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"), Seq(Row("mv1"), Row("mv2"))) - checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN $catalogName.other"), Seq.empty) + checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty) deleteTestIndex( - getFlintIndexName(s"$catalogName.default.mv1"), - getFlintIndexName(s"$catalogName.default.mv2")) + getFlintIndexName("spark_catalog.default.mv1"), + getFlintIndexName("spark_catalog.default.mv2")) } test("show materialized view in database with the same prefix") { - flint.materializedView().name(s"$catalogName.default.mv1").query(testQuery).create() - flint.materializedView().name(s"$catalogName.default_test.mv2").query(testQuery).create() - checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN $catalogName.default"), Seq(Row("mv1"))) + flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create() + flint.materializedView().name("spark_catalog.default_test.mv2").query(testQuery).create() + checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"), Seq(Row("mv1"))) deleteTestIndex( - getFlintIndexName(s"$catalogName.default.mv1"), - getFlintIndexName(s"$catalogName.default_test.mv2")) + getFlintIndexName("spark_catalog.default.mv1"), + getFlintIndexName("spark_catalog.default_test.mv2")) } test("should return emtpy when show materialized views in empty database") { - checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN $catalogName.other"), Seq.empty) + checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty) } test("describe materialized view") { diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 751a0a7b6..af497eb2b 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuiteHelper { /** Test table and index name */ - protected val testTable = s"$catalogName.default.skipping_sql_test" - protected val testIndex = getSkippingIndexName(testTable) + private val testTable = "spark_catalog.default.skipping_sql_test" + private val testIndex = getSkippingIndexName(testTable) override def beforeEach(): 
Unit = { super.beforeAll() @@ -201,9 +201,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit "`struct_col`.`field1`.`subfield` VALUE_SET, `struct_col`.`field2` MIN_MAX").foreach { columnSkipTypes => test(s"build skipping index for nested field $columnSkipTypes") { - assume(tableType != "iceberg", "ignore iceberg skipping index query rewrite test") - - val testTable = s"$catalogName.default.nested_field_table" + val testTable = "spark_catalog.default.nested_field_table" val testIndex = getSkippingIndexName(testTable) withTable(testTable) { createStructTable(testTable) @@ -341,7 +339,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit test("create skipping index with quoted table and column name") { sql(s""" - | CREATE SKIPPING INDEX ON `$catalogName`.`default`.`skipping_sql_test` + | CREATE SKIPPING INDEX ON `spark_catalog`.`default`.`skipping_sql_test` | ( | `year` PARTITION, | `name` VALUE_SET, @@ -387,26 +385,17 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit sql("USE sample") // Create index without database name specified - sql(s"CREATE TABLE test1 (name STRING) USING $tableType") + sql("CREATE TABLE test1 (name STRING) USING CSV") sql("CREATE SKIPPING INDEX ON test1 (name VALUE_SET)") // Create index with database name specified - sql(s"CREATE TABLE test2 (name STRING) USING $tableType") + sql("CREATE TABLE test2 (name STRING) USING CSV") sql("CREATE SKIPPING INDEX ON sample.test2 (name VALUE_SET)") try { - flint.describeIndex(s"flint_${catalogName}_sample_test1_skipping_index") shouldBe defined - flint.describeIndex(s"flint_${catalogName}_sample_test2_skipping_index") shouldBe defined + flint.describeIndex("flint_spark_catalog_sample_test1_skipping_index") shouldBe defined + flint.describeIndex("flint_spark_catalog_sample_test2_skipping_index") shouldBe defined } finally { - - /** - * TODO: REMOVE DROP TABLE when iceberg support CASCADE. More reading at - * https://github.com/apache/iceberg/pull/7275. 
- */ - if (tableType.equalsIgnoreCase("iceberg")) { - sql("DROP TABLE test1") - sql("DROP TABLE test2") - } sql("DROP DATABASE sample CASCADE") } } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 8766a8cb0..5a477a822 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -35,7 +35,6 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit lazy protected val flint: FlintSpark = new FlintSpark(spark) lazy protected val tableType: String = "CSV" lazy protected val tableOptions: String = "OPTIONS (header 'false', delimiter '\t')" - lazy protected val catalogName: String = "spark_catalog" override protected def sparkConf: SparkConf = { val conf = super.sparkConf diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala index 79e70655b..4cba9099c 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkWindowingFunctionITSuite.scala @@ -7,12 +7,11 @@ package org.opensearch.flint.spark import java.sql.Timestamp -import org.scalatest.Ignore import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.FlintSuite import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} +import org.apache.spark.sql.types.StructType class FlintSparkWindowingFunctionITSuite extends QueryTest with FlintSuite { @@ -27,21 +26,8 @@ class FlintSparkWindowingFunctionITSuite extends QueryTest with FlintSuite { val resultDF = inputDF.selectExpr("TUMBLE(timestamp, '10 minutes')") - // Since Spark 3.4. 
https://issues.apache.org/jira/browse/SPARK-40821 - val expected = - StructType(StructType.fromDDL("window struct NOT NULL").map { - case StructField(name, dataType: StructType, nullable, _) if name == "window" => - StructField( - name, - dataType, - nullable, - metadata = new MetadataBuilder() - .putBoolean("spark.timeWindow", true) - .build()) - case other => other - }) - - resultDF.schema shouldBe expected + resultDF.schema shouldBe StructType.fromDDL( + "window struct NOT NULL") checkAnswer( resultDF, Seq( diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala index e7ce5316b..2ae0d157a 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala @@ -22,20 +22,13 @@ trait FlintSparkIcebergSuite extends FlintSparkSuite { // You can also override tableOptions if Iceberg requires different options override lazy protected val tableOptions: String = "" - override lazy protected val catalogName: String = "local" - // Override the sparkConf method to include Iceberg-specific configurations override protected def sparkConf: SparkConf = { val conf = super.sparkConf // Set Iceberg-specific Spark configurations .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") - .set("spark.sql.catalog.spark_catalog.type", "hive") - .set(s"spark.sql.catalog.$catalogName", "org.apache.iceberg.spark.SparkCatalog") - .set(s"spark.sql.catalog.$catalogName.type", "hadoop") - // Required by IT(create skipping index on table without database name) - .set(s"spark.sql.catalog.$catalogName.default-namespace", "default") - .set(s"spark.sql.catalog.$catalogName.warehouse", s"spark-warehouse/${suiteName}") - .set(s"spark.sql.defaultCatalog", s"$catalogName") + .set("spark.sql.catalog.spark_catalog.type", "hadoop") + .set("spark.sql.catalog.spark_catalog.warehouse", s"spark-warehouse/${suiteName}") .set( "spark.sql.extensions", List( diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala index ea77ff990..19295fbe8 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLEvalITSuite.scala @@ -171,14 +171,14 @@ class FlintSparkPPLEvalITSuite val ex = intercept[AnalysisException](sql(s""" | source = $testTable | eval age = 40 | eval name = upper(name) | sort name | fields name, age, state | """.stripMargin)) - assert(ex.getMessage().contains("Reference `name` is ambiguous")) + assert(ex.getMessage().contains("Reference 'name' is ambiguous")) } test("test overriding existing fields: throw exception when specify the new field in where") { val ex = intercept[AnalysisException](sql(s""" | source = $testTable | eval age = abs(age) | where age < 50 | """.stripMargin)) - assert(ex.getMessage().contains("Reference `age` is ambiguous")) + assert(ex.getMessage().contains("Reference 'age' is ambiguous")) } test( @@ -186,7 +186,7 @@ class FlintSparkPPLEvalITSuite val ex = intercept[AnalysisException](sql(s""" | source = $testTable | eval age = abs(age) | stats avg(age) | """.stripMargin)) - 
assert(ex.getMessage().contains("Reference `age` is ambiguous")) + assert(ex.getMessage().contains("Reference 'age' is ambiguous")) } test( @@ -194,7 +194,7 @@ class FlintSparkPPLEvalITSuite val ex = intercept[AnalysisException](sql(s""" | source = $testTable | eval country = upper(country) | stats avg(age) by country | """.stripMargin)) - assert(ex.getMessage().contains("Reference `country` is ambiguous")) + assert(ex.getMessage().contains("Reference 'country' is ambiguous")) } test("test override existing fields: the eval field doesn't appear in fields command") { diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSessionCatalogSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSessionCatalogSuite.scala deleted file mode 100644 index fb3d4bbda..000000000 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSessionCatalogSuite.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.spark.sessioncatalog - -import org.opensearch.flint.spark.FlintSparkSuite - -import org.apache.spark.SparkConf - -/** - * Test with FlintDelegatingSessionCatalog. - */ -trait FlintSessionCatalogSuite extends FlintSparkSuite { - // Override catalog name - override lazy protected val catalogName: String = "mycatalog" - - override protected def sparkConf: SparkConf = { - val conf = super.sparkConf - .set("spark.sql.catalog.mycatalog", "org.opensearch.sql.FlintDelegatingSessionCatalog") - .set("spark.sql.defaultCatalog", catalogName) - conf - } -} diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogMaterializedViewITSuite.scala deleted file mode 100644 index 25567ba4e..000000000 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogMaterializedViewITSuite.scala +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.spark.sessioncatalog - -import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite - -/** - * Test MaterializedView with FlintDelegatingSessionCatalog. - */ -class FlintSparkSessionCatalogMaterializedViewITSuite - extends FlintSparkMaterializedViewSqlITSuite - with FlintSessionCatalogSuite {} diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogSkippingIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogSkippingIndexITSuite.scala deleted file mode 100644 index 7b29d5883..000000000 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/sessioncatalog/FlintSparkSessionCatalogSkippingIndexITSuite.scala +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.spark.sessioncatalog - -import org.opensearch.flint.spark.FlintSparkSkippingIndexSqlITSuite - -/** - * Test Skipping Index with FlintDelegatingSessionCatalog. 
- */ -class FlintSparkSessionCatalogSkippingIndexITSuite - extends FlintSparkSkippingIndexSqlITSuite - with FlintSessionCatalogSuite {} diff --git a/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala b/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala index fc8cb4f4a..f6be0b1c3 100644 --- a/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala +++ b/spark-sql-application/src/test/scala/org/opensearch/sql/FlintDelegatingSessionCatalogTest.scala @@ -45,10 +45,7 @@ class FlintDelegatingSessionCatalogTest extends QueryTest with SharedSparkSessio test("query without catalog name") { sql("use mycatalog") - // Since Spark 3.4.0. https://issues.apache.org/jira/browse/SPARK-40055, listCatalogs should - // also return spark_catalog even spark_catalog implementation is defaultSessionCatalog - assert( - sql("SHOW CATALOGS").collect.toSet === Array(Row("mycatalog"), Row("spark_catalog")).toSet) + assert(sql("SHOW CATALOGS").collect === Array(Row("mycatalog"))) checkAnswer(sql(s"SELECT name, age FROM $testTableWithoutCatalog"), Seq(Row("Hello", 30))) }
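
Note (illustrative, not part of the patch): the most subtle piece this revert restores is the catalog-name lookup in flint-spark-integration's package.scala, which replaces the removed resolveCatalogName. The sketch below lifts that restored logic into a standalone helper so it can be read outside the diff. The object and method names are hypothetical; the lookup itself mirrors the restored qualifyTableName, and the package placement mirrors the original file so Spark's internal catalog plumbing stays accessible. Spark 3.3.x APIs are assumed.

/*
 * Sketch only: standalone rendering of the catalog-name lookup restored by this revert.
 * Object/method names are hypothetical; the body follows the restored qualifyTableName.
 * Declared under org.apache.spark.sql.flint, as the original file is, so the
 * session-internal catalog manager is reachable from Scala.
 */
package org.apache.spark.sql.flint

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.CatalogPlugin

object CatalogNameLookupSketch {

  /**
   * The Flint delegate catalog has to be registered under the name "spark_catalog",
   * so CatalogPlugin.name() alone cannot recover the user-facing catalog name.
   * The restored code scans CatalogManager for the registered name that resolves
   * back to this plugin instance, falling back to the plugin's own name.
   */
  def registeredCatalogName(spark: SparkSession, catalog: CatalogPlugin): String = {
    val catalogMgr = spark.sessionState.catalogManager
    catalogMgr
      .listCatalogs(Some("*"))                 // all registered catalog names
      .find(catalogMgr.catalog(_) == catalog)  // the name that maps to this plugin
      .getOrElse(catalog.name())               // fall back to the plugin's own name
  }
}

For comparison, the removed resolveCatalogName read spark.sql.defaultCatalog and threw IllegalStateException when that default was not registered; the restored lookup instead falls back silently to the plugin's reported name.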