Commit
Move the decimal and map IT to the base suite instead of the iceberg suite

Signed-off-by: Chase Engelbrecht <[email protected]>
engechas committed Dec 1, 2024
1 parent c1d4c8e commit 3ce421b
Showing 3 changed files with 46 additions and 63 deletions.
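In effect, the commit defines the decimal/map test once in the shared FlintSparkMaterializedViewSqlITSuite and deletes the Iceberg-only copy, so every format-specific suite that extends the base suite inherits it. A minimal sketch of that inheritance pattern, assuming ScalaTest suites composed via mixins as the diffs below suggest (the names BaseFormatSuite and CsvFormatSuite are hypothetical):

import org.scalatest.funsuite.AnyFunSuite

// Base suite: the test is written once against a format hook.
abstract class BaseFormatSuite extends AnyFunSuite {
  protected def tableType: String

  test("decimal and map types round-trip") {
    // ... CREATE TABLE ... USING $tableType, refresh the MV, assert rows ...
  }
}

// A format suite inherits every base test just by filling in the hook;
// the Iceberg suite in this commit becomes exactly such an empty shell.
class CsvFormatSuite extends BaseFormatSuite {
  override protected def tableType: String = "CSV"
}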
@@ -523,5 +523,45 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     }
   }
 
+  test("create materialized view with decimal and map types") {
+    val decimalAndMapTable = s"$catalogName.default.mv_test_decimal_map"
+    val decimalAndMapMv = s"$catalogName.default.mv_test_decimal_map_ser"
+    withTable(decimalAndMapTable) {
+      createMapAndDecimalTimeSeriesTable(decimalAndMapTable)
+
+      withTempDir { checkpointDir =>
+        sql(s"""
+             | CREATE MATERIALIZED VIEW $decimalAndMapMv
+             | AS
+             | SELECT
+             |   base_score, mymap
+             | FROM $decimalAndMapTable
+             | WITH (
+             |   auto_refresh = true,
+             |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
+             | )
+             |""".stripMargin)
+
+        // Wait for the streaming job to complete the current micro batch
+        val flintIndex = getFlintIndexName(decimalAndMapMv)
+        val job = spark.streams.active.find(_.name == flintIndex)
+        job shouldBe defined
+        failAfter(streamingTimeout) {
+          job.get.processAllAvailable()
+        }
+
+        flint.describeIndex(flintIndex) shouldBe defined
+        checkAnswer(
+          flint.queryIndex(flintIndex).select("base_score", "mymap"),
+          Seq(
+            Row(3.1415926, Row(null, null, null, null, "mapvalue1")),
+            Row(4.1415926, Row("mapvalue2", null, null, null, null)),
+            Row(5.1415926, Row(null, null, "mapvalue3", null, null)),
+            Row(6.1415926, Row(null, null, null, "mapvalue4", null)),
+            Row(7.1415926, Row(null, "mapvalue5", null, null, null))))
+      }
+    }
+  }
+
   private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
 }
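The new test blocks until the auto-refresh streaming job has drained its current micro batch before asserting on the index. A standalone sketch of that wait pattern, using only Spark and ScalaTest APIs that appear above (the helper name awaitMicroBatch is hypothetical, and the fixed one-minute timeout stands in for the suite's streamingTimeout):

import org.apache.spark.sql.SparkSession
import org.scalatest.concurrent.{Signaler, ThreadSignaler}
import org.scalatest.concurrent.TimeLimits.failAfter
import org.scalatest.time.SpanSugar._

object StreamingTestUtil {
  // ThreadSignaler interrupts the blocked test thread if the timeout fires.
  private implicit val signaler: Signaler = ThreadSignaler

  // Find the active streaming query registered under the given name and
  // block until all input currently available at the source is processed.
  def awaitMicroBatch(spark: SparkSession, queryName: String): Unit = {
    val job = spark.streams.active.find(_.name == queryName)
    require(job.isDefined, s"no active streaming query named $queryName")
    failAfter(1.minute) {
      job.get.processAllAvailable()
    }
  }
}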
@@ -445,7 +445,10 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
     sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')")
   }
 
-  protected def createIcebergTimeSeriesTable(testTable: String): Unit = {
+  protected def createMapAndDecimalTimeSeriesTable(testTable: String): Unit = {
+    // CSV tables do not support MAP types, so we use JSON instead
+    val finalTableType = if (tableType == "CSV") "JSON" else tableType
+
     sql(s"""
          | CREATE TABLE $testTable
          | (
@@ -455,7 +458,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
          |   base_score DECIMAL(8, 7),
          |   mymap MAP<STRING, STRING>
          | )
-         | USING ICEBERG
+         | USING $finalTableType $tableOptions
          |""".stripMargin)
 
     sql(
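The only Iceberg-specific piece of the old helper was its USING ICEBERG clause; the renamed helper instead defers to whatever format the concrete suite declares, and falls back from CSV to JSON because Spark's CSV source cannot persist MAP columns. A small sketch of the two DDL shapes the guard produces (the table name t is illustrative; tableType and tableOptions come from the concrete suite, as the diff suggests):

object TableDdlSketch {
  // Mirrors the guard above: CSV cannot store MAP columns, so the helper
  // swaps in JSON; every other format string is used verbatim.
  def ddl(tableType: String, tableOptions: String): String = {
    val finalTableType = if (tableType == "CSV") "JSON" else tableType
    s"""CREATE TABLE t (mymap MAP<STRING, STRING>)
       |USING $finalTableType $tableOptions""".stripMargin
  }
}

// TableDdlSketch.ddl("CSV", "")     yields "... USING JSON"
// TableDdlSketch.ddl("ICEBERG", "") yields "... USING ICEBERG"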
@@ -6,67 +6,7 @@
 package org.opensearch.flint.spark.iceberg
 
 import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
-import org.scalatest.matchers.must.Matchers.defined
-import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.flint.config.FlintSparkConf
 
 class FlintSparkIcebergMaterializedViewITSuite
   extends FlintSparkMaterializedViewSqlITSuite
-  with FlintSparkIcebergSuite {
-
-  private val testTable = s"$catalogName.default.mv_test_iceberg"
-  private val testMvName = s"$catalogName.default.mv_test_iceberg_metrics"
-  private val testFlintIndex = getFlintIndexName(testMvName)
-  private val testQuery =
-    s"""
-       | SELECT
-       |   base_score, mymap
-       | FROM $testTable
-       |""".stripMargin
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    createIcebergTimeSeriesTable(testTable)
-  }
-
-  override def afterAll(): Unit = {
-    deleteTestIndex(testFlintIndex)
-    sql(s"DROP TABLE $testTable")
-    conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key)
-    conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key)
-    super.afterAll()
-  }
-
-  test("create materialized view with decimal and map types") {
-    withTempDir { checkpointDir =>
-      sql(s"""
-           | CREATE MATERIALIZED VIEW $testMvName
-           | AS $testQuery
-           | WITH (
-           |   auto_refresh = true,
-           |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
-           | )
-           |""".stripMargin)
-
-      // Wait for streaming job complete current micro batch
-      val job = spark.streams.active.find(_.name == testFlintIndex)
-      job shouldBe defined
-      failAfter(streamingTimeout) {
-        job.get.processAllAvailable()
-      }
-
-      flint.describeIndex(testFlintIndex) shouldBe defined
-      checkAnswer(
-        flint.queryIndex(testFlintIndex).select("base_score", "mymap"),
-        Seq(
-          Row(3.1415926, Row(null, null, null, null, "mapvalue1")),
-          Row(4.1415926, Row("mapvalue2", null, null, null, null)),
-          Row(5.1415926, Row(null, null, "mapvalue3", null, null)),
-          Row(6.1415926, Row(null, null, null, "mapvalue4", null)),
-          Row(7.1415926, Row(null, "mapvalue5", null, null, null))))
-    }
-  }
-}
+  with FlintSparkIcebergSuite {}
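After the change the Iceberg suite is an empty shell: it keeps its name and its FlintSparkIcebergSuite mixin (which presumably carries the Iceberg-specific configuration), and inherits the moved test together with every other test in FlintSparkMaterializedViewSqlITSuite. Any future test added to the base suite now runs against Iceberg automatically, which is the point of pushing shared coverage down.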
