address SPARK-40013
Signed-off-by: Peng Huo <[email protected]>
penghuo committed May 14, 2024
1 parent 03d5057 commit 27690f9
Showing 7 changed files with 54 additions and 11 deletions.
FlintScan.scala
@@ -34,10 +34,13 @@ case class FlintScan(
* Print pushedPredicates when explain(mode="extended"). Modeled after Spark's JDBCScan.
*/
override def description(): String = {
-super.description() + ", PushedPredicates: " + seqToString(pushedPredicates)
+super.description() + ", PushedPredicates: " + pushedPredicates
+  .map {
+    case p if p.name().equalsIgnoreCase("BLOOM_FILTER_MIGHT_CONTAIN") => p.name()
+    case p => p.toString()
+  }
+  .mkString("[", ", ", "]")
}

-private def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
}

// TODO: add partition support.
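
The special case exists presumably because BLOOM_FILTER_MIGHT_CONTAIN carries a serialized bloom filter as an argument, so its default toString would dump opaque bytes into EXPLAIN output. A minimal, self-contained sketch of the rendering logic (FakePredicate is a hypothetical stand-in for Spark's connector Predicate, and the argument text is illustrative):

case class FakePredicate(predName: String, repr: String) {
  def name(): String = predName
  override def toString: String = repr
}

object DescriptionDemo extends App {
  val pushedPredicates = Seq(
    FakePredicate("BLOOM_FILTER_MIGHT_CONTAIN", "BLOOM_FILTER_MIGHT_CONTAIN(client_ip, <binary>)"),
    FakePredicate(">", "age > 30"))

  val rendered = pushedPredicates
    .map {
      // Bloom filter predicates print only their name, hiding the binary argument
      case p if p.name().equalsIgnoreCase("BLOOM_FILTER_MIGHT_CONTAIN") => p.name()
      case p => p.toString()
    }
    .mkString("[", ", ", "]")

  // prints: PushedPredicates: [BLOOM_FILTER_MIGHT_CONTAIN, age > 30]
  println(s"PushedPredicates: $rendered")
}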
FlintScanBuilder.scala
@@ -30,5 +30,6 @@ case class FlintScanBuilder(tableName: String, schema: StructType, options: Flin
unSupported
}

-override def pushedPredicates(): Array[Predicate] = pushedPredicate
+override def pushedPredicates(): Array[Predicate] = pushedPredicate.filterNot(_.name()
+  .equalsIgnoreCase("BLOOM_FILTER_MIGHT_CONTAIN"))
}
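
With this change the bloom filter predicate is still accepted during pushdown (it is not returned in the unsupported set above) but is excluded from the predicates the builder reports upstream, which, as I understand Spark's DSv2 flow, mainly affects plan metadata and EXPLAIN. A sketch of the filter, with plain strings standing in for Spark Predicate objects (hypothetical values, for illustration only):

object PushedPredicatesDemo extends App {
  // Plain strings stand in for Spark Predicate objects.
  val pushedPredicate = Array("BLOOM_FILTER_MIGHT_CONTAIN", "=", ">")

  // Only standard predicates are reported; the bloom filter one stays internal.
  val reported = pushedPredicate.filterNot(_.equalsIgnoreCase("BLOOM_FILTER_MIGHT_CONTAIN"))

  println(reported.mkString("[", ", ", "]")) // prints: [=, >]
}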
FlintSparkSkippingIndexITSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {

/** Test table and index name */
-private val testTable = "spark_catalog.default.skipping_test"
+private val testTable = s"${catalogName}.default.skipping_test"
private val testIndex = getSkippingIndexName(testTable)
private val testLatestId = Base64.getEncoder.encodeToString(testIndex.getBytes)

@@ -62,7 +62,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
index shouldBe defined
index.get.metadata().getContent should matchJson(s"""{
| "_meta": {
| "name": "flint_spark_catalog_default_skipping_test_skipping_index",
| "name": "flint_${catalogName}_default_skipping_test_skipping_index",
| "version": "${current()}",
| "kind": "skipping",
| "indexedColumns": [
@@ -100,7 +100,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "columnName": "name",
| "columnType": "string"
| }],
| "source": "spark_catalog.default.skipping_test",
| "source": "${catalogName}.default.skipping_test",
| "options": {
| "auto_refresh": "false",
| "incremental_refresh": "false"
@@ -548,7 +548,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {

test("create skipping index for all supported data types successfully") {
// Prepare test table
val testTable = "spark_catalog.default.data_type_table"
val testTable = s"$catalogName.default.data_type_table"
val testIndex = getSkippingIndexName(testTable)
val testLatestId = Base64.getEncoder.encodeToString(testIndex.getBytes)
sql(s"""
@@ -612,7 +612,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
index shouldBe defined
index.get.metadata().getContent should matchJson(s"""{
| "_meta": {
| "name": "flint_spark_catalog_default_data_type_table_skipping_index",
| "name": "flint_${catalogName}_default_data_type_table_skipping_index",
| "version": "${current()}",
| "kind": "skipping",
| "indexedColumns": [
@@ -762,7 +762,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("can build skipping index for varchar and char and rewrite applicable query") {
val testTable = "spark_catalog.default.varchar_char_table"
val testTable = s"${catalogName}.default.varchar_char_table"
val testIndex = getSkippingIndexName(testTable)
sql(s"""
| CREATE TABLE $testTable
FlintSparkSuite.scala
@@ -38,6 +38,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
lazy protected val flint: FlintSpark = new FlintSpark(spark)
lazy protected val tableType: String = "CSV"
lazy protected val tableOptions: String = "OPTIONS (header 'false', delimiter '\t')"
+lazy protected val catalogName: String = "spark_catalog"

override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
FlintSessionCatalogSuit.scala (new file)
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sessioncatalog

import org.opensearch.flint.spark.FlintSparkSuite

import org.apache.spark.SparkConf

trait FlintSessionCatalogSuit extends FlintSparkSuite {
  // Override catalog name
  override lazy protected val catalogName: String = "mycatalog"

  override protected def sparkConf: SparkConf = {
    val conf = super.sparkConf
      // Register the Flint delegating session catalog and make it the default
      .set("spark.sql.catalog.mycatalog", "org.opensearch.sql.FlintDelegatingSessionCatalog")
      .set("spark.sql.defaultCatalog", s"${catalogName}")
    conf
  }
}
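
The design choice here is to parameterize the base suite with an overridable lazy val rather than duplicating test bodies: every inherited test that interpolates catalogName re-runs against the delegating catalog. A sketch of the pattern with illustrative names (not the real suites):

trait BaseSuite {
  lazy protected val catalogName: String = "spark_catalog"
  def testTable: String = s"$catalogName.default.skipping_test"
}

trait SessionCatalogMixin extends BaseSuite {
  // Overriding the lazy val changes every table name the tests build
  override lazy protected val catalogName: String = "mycatalog"
}

object OverrideDemo extends App {
  println((new SessionCatalogMixin {}).testTable) // prints: mycatalog.default.skipping_test
}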
FlintSparkSkippingIndexSessionCatalogITSuite.scala (new file)
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sessioncatalog

import org.opensearch.flint.spark.FlintSparkSkippingIndexITSuite

class FlintSparkSkippingIndexSessionCatalogITSuite
    extends FlintSparkSkippingIndexITSuite
    with FlintSessionCatalogSuit {}
FlintDelegatingSessionCatalogTest.scala
@@ -45,7 +45,10 @@ class FlintDelegatingSessionCatalogTest extends QueryTest with SharedSparkSessio

test("query without catalog name") {
sql("use mycatalog")
assert(sql("SHOW CATALOGS").collect === Array(Row("mycatalog")))
// Since Spark 3.4.0. https://issues.apache.org/jira/browse/SPARK-40055, listCatalogs should
// also return spark_catalog even spark_catalog implementation is defaultSessionCatalog
assert(
sql("SHOW CATALOGS").collect.toSet === Array(Row("mycatalog"), Row("spark_catalog")).toSet)

checkAnswer(sql(s"SELECT name, age FROM $testTableWithoutCatalog"), Seq(Row("Hello", 30)))
}
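
The set comparison makes the assertion order-independent: SHOW CATALOGS gives no ordering guarantee, and Spark 3.4.0+ lists spark_catalog alongside any registered catalogs. A small sketch of the comparison, assuming only spark-sql's Row on the classpath (the test uses ScalaTest's ===; plain == behaves the same on sets):

import org.apache.spark.sql.Row

object ShowCatalogsDemo extends App {
  val actual = Seq(Row("spark_catalog"), Row("mycatalog")) // assumed result rows
  val expected = Array(Row("mycatalog"), Row("spark_catalog"))

  // Order-insensitive: both sides collapse to the same Set of rows.
  assert(actual.toSet == expected.toSet)
}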
