From 6048236dfd1d51d14971d9a6b41cdeb6913030cc Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Mon, 25 Sep 2023 16:15:47 -0700
Subject: [PATCH] Add IT on Flint API and SQL

Signed-off-by: Chen Dai
---
 .../FlintSparkSkippingIndexITSuite.scala    |  5 +++-
 .../FlintSparkSkippingIndexSqlITSuite.scala | 24 +++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 7d5598cd5..1625dc2e4 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -115,7 +115,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
       .options(FlintSparkIndexOptions(Map(
         "auto_refresh" -> "true",
         "refresh_interval" -> "1 Minute",
-        "checkpoint_location" -> "s3a://test/"
+        "checkpoint_location" -> "s3a://test/",
+        "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"
       )))
       .create()
 
@@ -124,6 +125,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     index.get.options.autoRefresh() shouldBe true
     index.get.options.refreshInterval() shouldBe Some("1 Minute")
     index.get.options.checkpointLocation() shouldBe Some("s3a://test/")
+    index.get.options.indexSettings() shouldBe
+      "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"
   }
 
   test("should not have ID column in index data") {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index 5846cab23..38bc20b20 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -6,7 +6,13 @@
 package org.opensearch.flint.spark
 
 import scala.Option.empty
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
+import org.json4s.{Formats, NoTypeHints}
+import org.json4s.native.JsonMethods.parse
+import org.json4s.native.Serialization
+import org.opensearch.flint.core.FlintOptions
+import org.opensearch.flint.core.storage.FlintOpenSearchClient
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
 import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
@@ -75,6 +81,24 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
     }
   }
 
+  test("create skipping index with index settings") {
+    sql(s"""
+           | CREATE SKIPPING INDEX ON $testTable
+           | ( year PARTITION )
+           | WITH (
+           |   index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
+           | )
+           |""".stripMargin)
+
+    // Check if the index settings is set to OS index
+    val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))
+
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+    val settings = parse(flintClient.getIndexMetadata(testIndex).getIndexSettings)
+    (settings \ "index.number_of_shards").extract[String] shouldBe "3"
+    (settings \ "index.number_of_replicas").extract[String] shouldBe "2"
+  }
+
   test("create skipping index with manual refresh") {
     sql(s"""
            | CREATE SKIPPING INDEX ON $testTable