diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 9b3327c38..efc0e0f34 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -7,9 +7,16 @@ package org.opensearch.flint.spark import java.sql.Timestamp +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.json4s.{Formats, NoTypeHints} +import org.json4s.native.JsonMethods.parse +import org.json4s.native.Serialization +import org.opensearch.flint.core.FlintOptions +import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName import org.scalatest.matchers.must.Matchers.defined -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper +import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row @@ -71,5 +78,55 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { } } + test("create materialized view with streaming job options") { + withTempDir { checkpointDir => + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $testQuery + | WITH ( + | auto_refresh = true, + | refresh_interval = '5 Seconds', + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}' + | ) + |""".stripMargin) + + val index = flint.describeIndex(testFlintIndex) + index shouldBe defined + index.get.options.autoRefresh() shouldBe true + index.get.options.refreshInterval() shouldBe Some("5 Seconds") + index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + } + } + + test("create materialized view with index settings") { + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $testQuery + | WITH ( + | index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}' + | ) + |""".stripMargin) + + // Check if the index setting option is set to OS index setting + val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + + implicit val formats: Formats = Serialization.formats(NoTypeHints) + val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get) + (settings \ "index.number_of_shards").extract[String] shouldBe "3" + (settings \ "index.number_of_replicas").extract[String] shouldBe "2" + } + + test("create materialized view if not exists") { + sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery") + flint.describeIndex(testFlintIndex) shouldBe defined + + // Expect error without IF NOT EXISTS, otherwise success + the[IllegalStateException] thrownBy + sql(s"CREATE MATERIALIZED VIEW $testMvName AS $testQuery") + + sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery") + } + private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts) }