
Commit

Support Spark 3.4.1
Signed-off-by: Peng Huo <[email protected]>
penghuo committed May 14, 2024
1 parent 422dae7 commit 44539f4
Showing 11 changed files with 33 additions and 16 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -21,6 +21,7 @@ Version compatibility:
| 0.2.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |
| 0.3.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
| 0.4.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
| 0.5.0 | 11+ | 3.4.1 | 2.12.14 | 2.13+ |

## Flint Extension Usage

2 changes: 1 addition & 1 deletion build.sbt
@@ -5,7 +5,7 @@
import Dependencies._

lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.3.2"
lazy val sparkVersion = "3.4.1"
lazy val opensearchVersion = "2.6.0"
lazy val icebergVersion = "1.5.0"

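For context, a minimal sketch in sbt's Scala DSL of how a bumped sparkVersion typically flows into the provided Spark dependencies; the module list below is illustrative, not copied from this repository's build.sbt:

// Sketch only: module names are assumptions for illustration.
lazy val sparkVersion = "3.4.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"      % sparkVersion % "provided",
  "org.apache.spark" %% "spark-catalyst" % sparkVersion % "provided"
)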
@@ -10,9 +10,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.commons.lang.StringUtils;

import com.amazonaws.services.cloudwatch.model.Dimension;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkEnv;

/**
@@ -124,4 +124,4 @@ private static Dimension getEnvironmentVariableDimension(String envVarName, Stri
private static Dimension getDefaultDimension(String dimensionName) {
return getEnvironmentVariableDimension(dimensionName, dimensionName);
}
}
}
@@ -46,7 +46,6 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.metrics.sink.CloudWatchSink.DimensionNameGroups;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
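The two Java files above drop org.apache.commons.lang.StringUtils (commons-lang 2.x), the first switching to org.apache.commons.lang3.StringUtils. A likely reason, though the commit does not state it, is that Spark 3.4 no longer provides commons-lang 2.x on the classpath; the common null/blank checks behave the same in lang3. A small Scala sketch with a hypothetical helper:

// Hypothetical helper: fall back to a default when an environment variable is blank.
import org.apache.commons.lang3.StringUtils

def dimensionValue(envVar: String, default: String): String = {
  val value = sys.env.getOrElse(envVar, "")
  if (StringUtils.isNotBlank(value)) value else default
}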
@@ -5,6 +5,8 @@

package org.apache.spark.sql.flint

import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain

import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan}
import org.apache.spark.sql.flint.config.FlintSparkConf
@@ -34,10 +36,13 @@ case class FlintScan(
* Print pushedPredicates when explain(mode="extended"). Learn from SPARK JDBCScan.
*/
override def description(): String = {
super.description() + ", PushedPredicates: " + seqToString(pushedPredicates)
super.description() + ", PushedPredicates: " + pushedPredicates
.map {
case p if p.name().equalsIgnoreCase(BloomFilterMightContain.NAME) => p.name()
case p => p.toString()
}
.mkString("[", ", ", "]")
}

private def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
}

// todo. add partition support.
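A self-contained sketch of the new description() formatting above: bloom-filter predicates are printed by name only, everything else by its full string form. FakePredicate is a stand-in invented for this sketch, not Spark's connector Predicate type:

// Stand-in for org.apache.spark.sql.connector.expressions.filter.Predicate (illustration only).
final case class FakePredicate(name: String, rendered: String)

def describePushed(pushedPredicates: Seq[FakePredicate]): String =
  "PushedPredicates: " + pushedPredicates
    .map {
      case p if p.name.equalsIgnoreCase("BLOOM_FILTER_MIGHT_CONTAIN") => p.name
      case p => p.rendered
    }
    .mkString("[", ", ", "]")

// describePushed(Seq(FakePredicate("BLOOM_FILTER_MIGHT_CONTAIN", "bloom_filter_might_contain(...)"),
//                    FakePredicate("=", "year = 2023")))
// => "PushedPredicates: [BLOOM_FILTER_MIGHT_CONTAIN, year = 2023]"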
@@ -5,6 +5,8 @@

package org.apache.spark.sql.flint

import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain

import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownV2Filters}
@@ -31,4 +33,5 @@ case class FlintScanBuilder(tableName: String, schema: StructType, options: Flin
}

override def pushedPredicates(): Array[Predicate] = pushedPredicate
.filterNot(_.name().equalsIgnoreCase(BloomFilterMightContain.NAME))
}
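Mechanically, the new filterNot excludes the bloom-filter predicate from what pushedPredicates() reports; the assumed intent is that BLOOM_FILTER_MIGHT_CONTAIN is applied through the skipping index rather than reported as a predicate the scan evaluates itself. A tiny sketch of just that filtering step, with illustrative values:

// Names and values are illustrative; only the filterNot-by-name behavior mirrors the change above.
val bloomFilterName = "BLOOM_FILTER_MIGHT_CONTAIN"
val accepted = Seq("BLOOM_FILTER_MIGHT_CONTAIN", "year = 2023", "month = 4")
val reported = accepted.filterNot(_.equalsIgnoreCase(bloomFilterName))
// reported == Seq("year = 2023", "month = 4")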
@@ -420,17 +420,17 @@ class FlintJacksonParser(
case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString =>
dataType match {
case FloatType | DoubleType | TimestampType | DateType =>
throw QueryExecutionErrors.failToParseEmptyStringForDataTypeError(dataType)
throw QueryExecutionErrors.emptyJsonFieldValueError(dataType)
case _ => null
}

case VALUE_STRING if parser.getTextLength < 1 =>
throw QueryExecutionErrors.failToParseEmptyStringForDataTypeError(dataType)
throw QueryExecutionErrors.emptyJsonFieldValueError(dataType)

case token =>
// We cannot parse this token based on the given data type. So, we throw a
// RuntimeException and this exception will be caught by `parse` method.
throw QueryExecutionErrors.failToParseValueForDataTypeError(parser, token, dataType)
throw QueryExecutionErrors.cannotParseJSONFieldError(parser, token, dataType)
}

/**
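Only the error helper names change here, to the QueryExecutionErrors methods Spark 3.4 provides (emptyJsonFieldValueError and cannotParseJSONFieldError). As a rough usage sketch, assuming a local SparkSession, an empty string for a non-string field is the case that reaches the first of those errors:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("json-parse-demo").getOrCreate()
import spark.implicits._

// An empty string value for a TIMESTAMP field cannot be parsed; FAILFAST surfaces the error
// instead of silently producing a null or corrupt record.
val df = spark.read
  .schema("ts TIMESTAMP")
  .option("mode", "FAILFAST")
  .json(Seq("""{"ts": ""}""").toDS())
// df.show() is expected to fail with a JSON parsing error here.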
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.skipping.bloomfilter
import java.io.ByteArrayInputStream

import org.opensearch.flint.core.field.bloomfilter.classic.ClassicBloomFilter
import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain.NAME

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.InternalRow
@@ -40,7 +41,7 @@ case class BloomFilterMightContain(bloomFilterExpression: Expression, valueExpre

override def dataType: DataType = BooleanType

override def symbol: String = "BLOOM_FILTER_MIGHT_CONTAIN"
override def symbol: String = NAME

override def checkInputDataTypes(): TypeCheckResult = {
(left.dataType, right.dataType) match {
@@ -109,6 +110,8 @@ case class BloomFilterMightContain(bloomFilterExpression: Expression, valueExpre

object BloomFilterMightContain {

val NAME = "BLOOM_FILTER_MIGHT_CONTAIN"

/**
* Generate bloom filter might contain function given the bloom filter column and value.
*
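Extracting NAME into the companion object lets the call sites above (FlintScan and FlintScanBuilder) match on the function name without repeating the string literal. A minimal sketch of that pattern, using a hypothetical mirror object rather than the real companion:

// Hypothetical mirror of the companion object, for illustration only.
object BloomFilterFunction {
  val NAME = "BLOOM_FILTER_MIGHT_CONTAIN"
}

def isBloomFilterPredicate(predicateName: String): Boolean =
  predicateName.equalsIgnoreCase(BloomFilterFunction.NAME)

// isBloomFilterPredicate("bloom_filter_might_contain") == true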
@@ -207,8 +207,8 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
Matcher { (plan: LogicalPlan) =>
val result = plan.exists {
case LogicalRelation(_, _, Some(table), _) =>
// Table name in logical relation doesn't have catalog name
table.qualifiedName == expectedTableName.split('.').drop(1).mkString(".")
// Since Spark 3.4, the table name in a logical relation includes the catalog name
table.qualifiedName == expectedTableName
case _ => false
}

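An illustration of what the matcher compares, using a made-up three-part table name: before Spark 3.4 the catalog prefix had to be stripped from the expectation, while since 3.4 the qualified name already carries it.

val expectedTableName = "spark_catalog.default.test_city"   // hypothetical example value

// Pre-3.4 expectation: strip the catalog, leaving "default.test_city".
val withoutCatalog = expectedTableName.split('.').drop(1).mkString(".")

// Since Spark 3.4, table.qualifiedName already includes the catalog,
// so the matcher compares against expectedTableName directly.
val withCatalog = expectedTableName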
@@ -797,10 +797,13 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
// CharType column is padded to a fixed length with whitespace
val paddedChar = "sample char".padTo(20, ' ')
checkAnswer(query, Row("sample varchar", paddedChar))
/*
* TODO: Spark 3.4 adds read-side char padding, so the SkippingIndex rule cannot push down
* the char_col predicate for now.
* https://issues.apache.org/jira/browse/SPARK-40697
*/
query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter((isnull(col("varchar_col")) || col("varchar_col") === "sample varchar") &&
(isnull(col("char_col")) || col("char_col") === paddedChar)))
hasIndexFilter(isnull(col("varchar_col")) || col("varchar_col") === "sample varchar"))

deleteTestIndex(testIndex)
}
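The test now expects only the varchar_col predicate in the index filter; per the TODO above, Spark 3.4's read-side CHAR padding (SPARK-40697) keeps the char_col comparison from being pushed down for now. A quick check of the padding the remaining assertion still relies on:

// CHAR(20) semantics: the stored value compares equal to the whitespace-padded literal.
val paddedChar = "sample char".padTo(20, ' ')
assert(paddedChar == "sample char" + " " * 9)   // "sample char" is 11 chars, padded to 20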
@@ -45,7 +45,10 @@ class FlintDelegatingSessionCatalogTest extends QueryTest with SharedSparkSessio

test("query without catalog name") {
sql("use mycatalog")
assert(sql("SHOW CATALOGS").collect === Array(Row("mycatalog")))
// Since Spark 3.4.0 (https://issues.apache.org/jira/browse/SPARK-40055), listCatalogs should
// also return spark_catalog even when the spark_catalog implementation is defaultSessionCatalog
assert(
sql("SHOW CATALOGS").collect.toSet === Array(Row("mycatalog"), Row("spark_catalog")).toSet)

checkAnswer(sql(s"SELECT name, age FROM $testTableWithoutCatalog"), Seq(Row("Hello", 30)))
}
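A sketch of the adjusted assertion pattern, assuming a SparkSession in scope with a catalog registered as mycatalog: comparing sets keeps the check independent of row order and of spark_catalog now appearing in the listing.

// Collect catalog names into a Set so ordering and the extra spark_catalog entry don't matter.
val shownCatalogs = spark.sql("SHOW CATALOGS").collect().map(_.getString(0)).toSet
assert(shownCatalogs == Set("mycatalog", "spark_catalog"))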
