Skip to content

Commit

Permalink
Avoid naming conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Nov 20, 2024
1 parent c12831e commit 264ab07
Showing 1 changed file with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,50 @@ class FlintSparkIcebergMaterializedViewITSuite
extends FlintSparkMaterializedViewSqlITSuite
with FlintSparkIcebergSuite {

private val testTable = s"$catalogName.default.mv_test_iceberg"
private val testMvName = s"$catalogName.default.mv_test_iceberg_metrics"
private val testFlintIndex = getFlintIndexName(testMvName)
private val testQuery =
private val icebergTestTable = s"$catalogName.default.mv_test_iceberg"
private val icebergTestMvName = s"$catalogName.default.mv_test_iceberg_metrics"
private val icebergTestFlintIndex = getFlintIndexName(icebergTestMvName)
private val icebergTestQuery =
s"""
| SELECT
| mymap
| FROM $testTable
| FROM $icebergTestTable
|""".stripMargin

override def beforeEach(): Unit = {
super.beforeAll()
createIcebergTimeSeriesTable(testTable)
createIcebergTimeSeriesTable(icebergTestTable)
}

override def afterEach(): Unit = {
super.afterEach()
deleteTestIndex(testFlintIndex)
sql(s"DROP TABLE $testTable")
deleteTestIndex(icebergTestFlintIndex)
sql(s"DROP TABLE $icebergTestTable")
conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key)
conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key)
}

test("create materialized view with map type") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| CREATE MATERIALIZED VIEW $icebergTestMvName
| AS $icebergTestQuery
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
|""".stripMargin)

// Wait for streaming job complete current micro batch
val job = spark.streams.active.find(_.name == testFlintIndex)
val job = spark.streams.active.find(_.name == icebergTestFlintIndex)
job shouldBe defined
failAfter(streamingTimeout) {
job.get.processAllAvailable()
}

flint.describeIndex(testFlintIndex) shouldBe defined
flint.describeIndex(icebergTestFlintIndex) shouldBe defined
checkAnswer(
flint.queryIndex(testFlintIndex).select("mymap"),
flint.queryIndex(icebergTestFlintIndex).select("mymap"),
Seq(
Row(Row("mapvalue2", null, null, null, null)),
Row(Row(null, "mapvalue5", null, null, null)),
Expand Down

0 comments on commit 264ab07

Please sign in to comment.