diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index da362d19a..d50f83073 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -6,7 +6,9 @@ package org.opensearch.flint.spark.skipping import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.json4s.native.JsonMethods.parse import org.mockito.Mockito.when +import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN import org.scalatest.matchers.must.Matchers.contain @@ -41,22 +43,16 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("boolean_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "boolean_col": { - | "type": "boolean" - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "boolean_col": { + | "type": "boolean" + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } @@ -66,22 +62,16 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("string_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "string_col": { - | "type": "keyword" - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "string_col": { + | "type": "keyword" + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } @@ -93,22 +83,16 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("varchar_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "varchar_col": { - | "type": "keyword" - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "varchar_col": { + | "type": "keyword" + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } @@ -118,22 +102,16 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("char_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "char_col": { - | "type": "keyword" - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "char_col": { + | "type": "keyword" + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } @@ -143,22 +121,16 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("long_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "long_col": { - | "type": "long" - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "long_col": { + | "type": "long" + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } @@ -168,22 +140,16 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("int_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "int_col": { - | "type": "integer" - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "int_col": { + | "type": "integer" + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } @@ -193,22 +159,16 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("short_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "short_col": { - | "type": "short" - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "short_col": { + | "type": "short" + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } @@ -218,22 +178,16 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("byte_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "byte_col": { - | "type": "byte" - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "byte_col": { + | "type": "byte" + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } @@ -243,22 +197,16 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("double_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "double_col": { - | "type": "double" - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "double_col": { + | "type": "double" + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } @@ -268,22 +216,16 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("float_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "float_col": { - | "type": "float" - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "float_col": { + | "type": "float" + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } @@ -293,23 +235,17 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("timestamp_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "timestamp_col": { - | "type": "date", - | "format": "strict_date_optional_time_nanos" - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "timestamp_col": { + | "type": "date", + | "format": "strict_date_optional_time_nanos" + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } @@ -319,56 +255,44 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("date_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "date_col": { - | "type": "date", - | "format": "strict_date" - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "date_col": { + | "type": "date", + | "format": "strict_date" + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } test("can build index for struct column") { val indexCol = mock[FlintSparkSkippingStrategy] - when(indexCol.outputSchema()).thenReturn( - Map("struct_col" -> "struct")) + when(indexCol.outputSchema()) + .thenReturn(Map("struct_col" -> "struct")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("struct_col").expr))) val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) - index.metadata().getContent should matchJson( + schemaShouldMatch( + index.metadata(), s"""{ - | "_meta": { - | "kind": "skipping", - | "indexedColumns": [{}], - | "source": "default.test" - | }, - | "properties": { - | "struct_col": { - | "properties": { - | "subfield1": { - | "type": "keyword" - | }, - | "subfield2": { - | "type": "integer" - | } - | } - | }, - | "file_path": { - | "type": "keyword" - | } - | } - | } + | "struct_col": { + | "properties": { + | "subfield1": { + | "type": "keyword" + | }, + | "subfield2": { + | "type": "integer" + | } + | } + | }, + | "file_path": { + | "type": "keyword" + | } + |} |""".stripMargin) } @@ -383,4 +307,9 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { new FlintSparkSkippingIndex("default.test", Seq.empty) } } + + private def schemaShouldMatch(metadata: FlintMetadata, expected: String): Unit = { + val actual = parse(metadata.getContent) \ "properties" + assert(actual == parse(expected)) + } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 3f510c9d5..e399b43c1 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -7,6 +7,7 @@ package org.opensearch.flint.spark import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.opensearch.flint.OpenSearchSuite +import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.skipping.FlintSparkSkippingFileIndex @@ -96,7 +97,7 @@ class FlintSparkSkippingIndexITSuite index shouldBe defined index.get.metadata().getContent should matchJson(s"""{ | "_meta": { - | "version": "0.1.0", + | "version": "${current()}", | "kind": "skipping", | "indexedColumns": [ | { @@ -456,6 +457,7 @@ class FlintSparkSkippingIndexITSuite index.get.metadata().getContent should matchJson( s"""{ | "_meta": { + | "version": "${current()}", | "kind": "skipping", | "indexedColumns": [ | {