Skip to content

Commit

Permalink
Add more IT for create statement
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 18, 2023
1 parent 06653b0 commit 35e95ab
Showing 1 changed file with 58 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ package org.opensearch.flint.spark

import java.sql.Timestamp

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row

Expand Down Expand Up @@ -71,5 +78,55 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
}
}

test("create materialized view with streaming job options") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| auto_refresh = true,
| refresh_interval = '5 Seconds',
| checkpoint_location = '${checkpointDir.getAbsolutePath}',
| index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
| )
|""".stripMargin)

val index = flint.describeIndex(testFlintIndex)
index shouldBe defined
index.get.options.autoRefresh() shouldBe true
index.get.options.refreshInterval() shouldBe Some("5 Seconds")
index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
}
}

test("create materialized view with index settings") {
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
| )
|""".stripMargin)

// Check if the index setting option is set to OS index setting
val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))

implicit val formats: Formats = Serialization.formats(NoTypeHints)
val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get)
(settings \ "index.number_of_shards").extract[String] shouldBe "3"
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

test("create materialized view if not exists") {
sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
flint.describeIndex(testFlintIndex) shouldBe defined

// Expect error without IF NOT EXISTS, otherwise success
the[IllegalStateException] thrownBy
sql(s"CREATE MATERIALIZED VIEW $testMvName AS $testQuery")

sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
}

0 comments on commit 35e95ab

Please sign in to comment.