diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala
index 0858616fa..7697e2d1f 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala
@@ -86,6 +86,8 @@ object FlintMetadata {
             builder.schema(parser.map())
           }
         }
+
+      builder.indexSettings(settings)
       builder.build()
     } catch {
       case e: Exception =>
@@ -114,6 +116,12 @@ object FlintMetadata {
     }
   }
 
+  def builder(): FlintMetadata.Builder = new Builder
+
+  /**
+   * Flint index metadata builder that can be extended by subclasses to provide a more customized
+   * build method.
+   */
   class Builder {
     private var version: FlintVersion = FlintVersion.current()
     private var name: String = ""
@@ -125,68 +133,67 @@ object FlintMetadata {
     private var schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]()
     private var indexSettings: String = null
 
-    // Setters for each field
-    def version(version: FlintVersion): Builder = {
+    def version(version: FlintVersion): this.type = {
      this.version = version
      this
    }
 
-    def name(name: String): Builder = {
+    def name(name: String): this.type = {
      this.name = name
      this
    }
 
-    def kind(kind: String): Builder = {
+    def kind(kind: String): this.type = {
      this.kind = kind
      this
    }
 
-    def source(source: String): Builder = {
+    def source(source: String): this.type = {
      this.source = source
      this
    }
 
-    def options(options: util.Map[String, AnyRef]): Builder = {
+    def options(options: util.Map[String, AnyRef]): this.type = {
      this.options = options
      this
    }
 
-    def addOption(key: String, value: AnyRef): Builder = {
+    def addOption(key: String, value: AnyRef): this.type = {
      this.options.put(key, value)
      this
    }
 
-    def indexedColumns(indexedColumns: Array[util.Map[String, AnyRef]]): Builder = {
+    def indexedColumns(indexedColumns: Array[util.Map[String, AnyRef]]): this.type = {
      this.indexedColumns = indexedColumns
      this
    }
 
-    def addIndexedColumn(indexCol: util.Map[String, AnyRef]): Builder = {
+    def addIndexedColumn(indexCol: util.Map[String, AnyRef]): this.type = {
      indexedColumns = indexedColumns :+ indexCol
      this
    }
 
-    def properties(properties: util.Map[String, AnyRef]): Builder = {
+    def properties(properties: util.Map[String, AnyRef]): this.type = {
      this.properties = properties
      this
    }
 
-    def addProperty(key: String, value: AnyRef): Builder = {
+    def addProperty(key: String, value: AnyRef): this.type = {
      properties.put(key, value)
      this
    }
 
-    def schema(schema: util.Map[String, AnyRef]): Builder = {
+    def schema(schema: util.Map[String, AnyRef]): this.type = {
      this.schema = schema
      this
    }
 
-    def addSchemaField(key: String, value: AnyRef): Builder = {
+    def addSchemaField(key: String, value: AnyRef): this.type = {
      schema.put(key, value)
      this
    }
 
-    def indexSettings(indexSettings: String): Builder = {
+    def indexSettings(indexSettings: String): this.type = {
      this.indexSettings = indexSettings
      this
    }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 4a4885ecb..ee4775b6a 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -7,9 +7,7 @@ package org.opensearch.flint.spark
 
 import scala.collection.JavaConverters._
 
-import org.json4s.{Formats, JArray, NoTypeHints}
-import org.json4s.JsonAST.{JField, JObject}
-import org.json4s.native.JsonMethods.parse
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.FlintMetadata
@@ -87,7 +85,6 @@ class FlintSpark(val spark: SparkSession) {
       }
     } else {
       val metadata = index.metadata()
-      index.options.indexSettings().foreach(metadata.setIndexSettings)
       flintClient.createIndex(indexName, metadata)
     }
   }
@@ -105,7 +102,7 @@ class FlintSpark(val spark: SparkSession) {
   def refreshIndex(indexName: String, mode: RefreshMode): Option[String] = {
     val index = describeIndex(indexName)
       .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
-    val tableName = getSourceTableName(index)
+    val tableName = index.metadata().source
 
     // Write Flint index data to Flint data source (shared by both refresh modes for now)
     def writeFlintIndex(df: DataFrame): Unit = {
@@ -224,39 +221,16 @@ class FlintSpark(val spark: SparkSession) {
     }
   }
 
-  // TODO: Remove all parsing logic below once Flint spec finalized and FlintMetadata strong typed
-  private def getSourceTableName(index: FlintSparkIndex): String = {
-    val json = parse(index.metadata().getContent)
-    (json \ "_meta" \ "source").extract[String]
-  }
-
-  /*
-   * For now, deserialize skipping strategies out of Flint metadata json
-   * ex. extract Seq(Partition("year", "int"), ValueList("name")) from
-   * { "_meta": { "indexedColumns": [ {...partition...}, {...value list...} ] } }
-   *
-   */
   private def deserialize(metadata: FlintMetadata): FlintSparkIndex = {
-    val meta = parse(metadata.getContent) \ "_meta"
-    val indexName = (meta \ "name").extract[String]
-    val tableName = (meta \ "source").extract[String]
-    val indexType = (meta \ "kind").extract[String]
-    val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray]
     val indexOptions = FlintSparkIndexOptions(
-      (meta \ "options")
-        .asInstanceOf[JObject]
-        .obj
-        .map { case JField(key, value) =>
-          key -> value.values.toString
-        }
-        .toMap)
+      metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
 
-    indexType match {
+    metadata.kind match {
      case SKIPPING_INDEX_TYPE =>
-        val strategies = indexedColumns.arr.map { colInfo =>
-          val skippingKind = SkippingKind.withName((colInfo \ "kind").extract[String])
-          val columnName = (colInfo \ "columnName").extract[String]
-          val columnType = (colInfo \ "columnType").extract[String]
+        val strategies = metadata.indexedColumns.map { colInfo =>
+          val skippingKind = SkippingKind.withName(getString(colInfo, "kind"))
+          val columnName = getString(colInfo, "columnName")
+          val columnType = getString(colInfo, "columnType")
 
          skippingKind match {
            case PARTITION =>
@@ -269,17 +243,21 @@ class FlintSpark(val spark: SparkSession) {
            throw new IllegalStateException(s"Unknown skipping strategy: $other")
          }
        }
-        new FlintSparkSkippingIndex(tableName, strategies, indexOptions)
+        new FlintSparkSkippingIndex(metadata.source, strategies, indexOptions)
      case COVERING_INDEX_TYPE =>
        new FlintSparkCoveringIndex(
-          indexName,
-          tableName,
-          indexedColumns.arr.map { obj =>
-            ((obj \ "columnName").extract[String], (obj \ "columnType").extract[String])
+          metadata.name,
+          metadata.source,
+          metadata.indexedColumns.map { colInfo =>
+            getString(colInfo, "columnName") -> getString(colInfo, "columnType")
          }.toMap,
          indexOptions)
    }
  }
+
+  private def getString(map: java.util.Map[String, AnyRef], key: String): String = {
+    map.get(key).asInstanceOf[String]
+  }
 }
 
 object FlintSpark {
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala
new file mode 100644
index 000000000..5b51520cb
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala
@@ -0,0 +1,69 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.json4s.JObject
+import org.opensearch.flint.core.metadata.FlintMetadata
+import org.opensearch.flint.spark.FlintSparkIndex.populateEnvToMetadata
+
+import org.apache.spark.sql.flint.datatype.FlintDataType
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Flint Spark metadata builder with common build logic.
+ */
+class FlintSparkIndexMetadataBuilder(index: FlintSparkIndex) extends FlintMetadata.Builder {
+
+  def schema(allFieldTypes: Map[String, String]): FlintSparkIndexMetadataBuilder = {
+    val catalogDDL =
+      allFieldTypes
+        .map { case (colName, colType) => s"$colName $colType not null" }
+        .mkString(",")
+    val structType = StructType.fromDDL(catalogDDL)
+
+    // Assume each value is a JSON object
+    structType.fields.foreach(field => {
+      val (fieldName, fieldType) = FlintDataType.serializeField(field)
+      val fieldTypeMap =
+        fieldType
+          .asInstanceOf[JObject]
+          .values
+          .mapValues {
+            case v: Map[_, _] => v.asJava
+            case other => other
+          }
+          .asJava
+      addSchemaField(fieldName, fieldTypeMap)
+    })
+    this
+  }
+
+  override def build(): FlintMetadata = {
+    // Common fields in all Flint Spark indexes
+    kind(index.kind)
+    name(index.name())
+    options(index.options.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
+
+    val envs = populateEnvToMetadata
+    if (envs.nonEmpty) {
+      addProperty("env", envs.asJava)
+    }
+
+    val settings = index.options.indexSettings()
+    if (settings.isDefined) {
+      indexSettings(settings.get)
+    }
+    super.build()
+  }
+}
+
+object FlintSparkIndexMetadataBuilder {
+
+  def builder(index: FlintSparkIndex): FlintSparkIndexMetadataBuilder =
+    new FlintSparkIndexMetadataBuilder(index)
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index 3db325c3e..ef6203a8b 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -5,19 +5,15 @@
 
 package org.opensearch.flint.spark.covering
 
-import org.json4s.{Formats, NoTypeHints}
-import org.json4s.JsonAST.{JArray, JObject, JString}
-import org.json4s.native.JsonMethods.{compact, parse, render}
-import org.json4s.native.Serialization
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
 import org.opensearch.flint.core.metadata.FlintMetadata
-import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
-import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata}
+import org.opensearch.flint.spark._
+import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix
 import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}
 
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.flint.datatype.FlintDataType
-import org.apache.spark.sql.types.StructType
 
 /**
  * Flint covering index in Spark.
@@ -38,62 +34,27 @@ case class FlintSparkCoveringIndex(
 
   require(indexedColumns.nonEmpty, "indexed columns must not be empty")
 
-  /** Required by json4s write function */
-  implicit val formats: Formats = Serialization.formats(NoTypeHints)
-
   override val kind: String = COVERING_INDEX_TYPE
 
   override def name(): String = getFlintIndexName(indexName, tableName)
 
   override def metadata(): FlintMetadata = {
-    new FlintMetadata(s"""{
-        |   "_meta": {
-        |     "name": "$indexName",
-        |     "kind": "$kind",
-        |     "indexedColumns": $getMetaInfo,
-        |     "source": "$tableName",
-        |     "options": $getIndexOptions,
-        |     "properties": $getIndexProperties
-        |   },
-        |   "properties": $getSchema
-        | }
-        |""".stripMargin)
+    val builder = FlintSparkIndexMetadataBuilder.builder(this)
+
+    indexedColumns.map { case (colName, colType) =>
+      builder.addIndexedColumn(
+        Map[String, AnyRef]("columnName" -> colName, "columnType" -> colType).asJava)
+    }
+    builder
+      .source(tableName)
+      .schema(indexedColumns)
+      .build()
   }
 
   override def build(df: DataFrame): DataFrame = {
     val colNames = indexedColumns.keys.toSeq
     df.select(colNames.head, colNames.tail: _*)
   }
-
-  // TODO: refactor all these once Flint metadata spec finalized
-  private def getMetaInfo: String = {
-    val objects = indexedColumns.map { case (colName, colType) =>
-      JObject("columnName" -> JString(colName), "columnType" -> JString(colType))
-    }.toList
-    Serialization.write(JArray(objects))
-  }
-
-  private def getIndexOptions: String = {
-    Serialization.write(options.options)
-  }
-
-  private def getIndexProperties: String = {
-    val envMap = populateEnvToMetadata
-    if (envMap.isEmpty) {
-      "{}"
-    } else {
-      s"""{ "env": ${Serialization.write(envMap)} }"""
-    }
-  }
-
-  private def getSchema: String = {
-    val catalogDDL =
-      indexedColumns
-        .map { case (colName, colType) => s"$colName $colType not null" }
-        .mkString(",")
-    val properties = FlintDataType.serialize(StructType.fromDDL(catalogDDL))
-    compact(render(parse(properties) \ "properties"))
-  }
 }
 
 object FlintSparkCoveringIndex {
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index dd9cb6bdf..ef6636212 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -5,25 +5,20 @@
 
 package org.opensearch.flint.spark.skipping
 
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization
-import org.opensearch.flint.core.FlintVersion
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
 import org.opensearch.flint.core.metadata.FlintMetadata
-import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
-import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata, ID_COLUMN}
+import org.opensearch.flint.spark._
+import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN}
 import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
 import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
 import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
 import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
 
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression
-import org.apache.spark.sql.flint.datatype.FlintDataType
 import org.apache.spark.sql.functions.{col, input_file_name, sha1}
-import org.apache.spark.sql.types.StructType
 
 /**
  * Flint skipping index in Spark.
@@ -41,9 +36,6 @@ class FlintSparkSkippingIndex(
 
   require(indexedColumns.nonEmpty, "indexed columns must not be empty")
 
-  /** Required by json4s write function */
-  implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer
-
   /** Skipping index type */
   override val kind: String = SKIPPING_INDEX_TYPE
 
@@ -52,19 +44,22 @@ class FlintSparkSkippingIndex(
   }
 
   override def metadata(): FlintMetadata = {
-    new FlintMetadata(s"""{
-        |   "_meta": {
-        |     "name": "${name()}",
-        |     "version": "${FlintVersion.current()}",
-        |     "kind": "$SKIPPING_INDEX_TYPE",
-        |     "indexedColumns": $getMetaInfo,
-        |     "source": "$tableName",
-        |     "options": $getIndexOptions,
-        |     "properties": $getIndexProperties
-        |   },
-        |   "properties": $getSchema
-        | }
-        |""".stripMargin)
+    val builder = FlintSparkIndexMetadataBuilder.builder(this)
+
+    indexedColumns.map(col =>
+      builder.addIndexedColumn(
+        Map[String, AnyRef](
+          "kind" -> col.kind.toString,
+          "columnName" -> col.columnName,
+          "columnType" -> col.columnType).asJava))
+
+    val allFieldTypes =
+      indexedColumns
+        .flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string")
+    builder
+      .source(tableName)
+      .schema(allFieldTypes)
+      .build()
   }
 
   override def build(df: DataFrame): DataFrame = {
@@ -81,36 +76,6 @@ class FlintSparkSkippingIndex(
       .agg(namedAggFuncs.head, namedAggFuncs.tail: _*)
       .withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN)))
   }
-
-  private def getMetaInfo: String = {
-    Serialization.write(indexedColumns)
-  }
-
-  private def getIndexOptions: String = {
-    Serialization.write(options.options)
-  }
-
-  private def getIndexProperties: String = {
-    val envMap = populateEnvToMetadata
-    if (envMap.isEmpty) {
-      "{}"
-    } else {
-      s"""{ "env": ${Serialization.write(envMap)} }"""
-    }
-  }
-
-  private def getSchema: String = {
-    val allFieldTypes =
-      indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string")
-    val catalogDDL =
-      allFieldTypes
-        .map { case (colName, colType) => s"$colName $colType not null" }
-        .mkString(",")
-    val allFieldSparkTypes = StructType.fromDDL(catalogDDL)
-    // Convert StructType to {"properties": ...} and only need the properties value
-    val properties = FlintDataType.serialize(allFieldSparkTypes)
-    compact(render(parse(properties) \ "properties"))
-  }
 }
 
 object FlintSparkSkippingIndex {
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
index b31e18480..a3961bb51 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
@@ -5,11 +5,14 @@
 
 package org.opensearch.flint.spark.skipping
 
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
 import org.json4s.native.JsonMethods.parse
 import org.mockito.Mockito.when
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
 import org.scalatest.matchers.must.Matchers.contain
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 import org.scalatestplus.mockito.MockitoSugar.mock
@@ -27,6 +30,25 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.name() shouldBe "flint_spark_catalog_default_test_skipping_index"
   }
 
+  test("get index metadata") {
+    val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
+    when(indexCol.columnName).thenReturn("test_field")
+    when(indexCol.columnType).thenReturn("integer")
+    when(indexCol.outputSchema()).thenReturn(Map("test_field" -> "integer"))
+    val index = new FlintSparkSkippingIndex(testTable, Seq(indexCol))
+
+    val metadata = index.metadata()
+    metadata.kind shouldBe SKIPPING_INDEX_TYPE
+    metadata.name shouldBe index.name()
+    metadata.source shouldBe testTable
+    metadata.indexedColumns shouldBe Array(
+      Map(
+        "kind" -> SkippingKind.PARTITION.toString,
+        "columnName" -> "test_field",
+        "columnType" -> "integer").asJava)
+  }
+
   test("can build index building job with unique ID column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
     when(indexCol.outputSchema()).thenReturn(Map("name" -> "string"))
@@ -40,6 +62,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for boolean column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema()).thenReturn(Map("boolean_col" -> "boolean"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("boolean_col").expr)))
 
@@ -59,6 +82,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for string column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema()).thenReturn(Map("string_col" -> "string"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("string_col").expr)))
 
@@ -80,6 +104,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for varchar column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema()).thenReturn(Map("varchar_col" -> "varchar(20)"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("varchar_col").expr)))
 
@@ -99,6 +124,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for char column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema()).thenReturn(Map("char_col" -> "char(20)"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("char_col").expr)))
 
@@ -118,6 +144,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for long column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema()).thenReturn(Map("long_col" -> "bigint"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("long_col").expr)))
 
@@ -137,6 +164,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for int column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema()).thenReturn(Map("int_col" -> "int"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("int_col").expr)))
 
@@ -156,6 +184,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for short column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema()).thenReturn(Map("short_col" -> "smallint"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("short_col").expr)))
 
@@ -175,6 +204,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for byte column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema()).thenReturn(Map("byte_col" -> "tinyint"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("byte_col").expr)))
 
@@ -194,6 +224,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for double column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema()).thenReturn(Map("double_col" -> "double"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("double_col").expr)))
 
@@ -213,6 +244,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for float column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema()).thenReturn(Map("float_col" -> "float"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("float_col").expr)))
 
@@ -232,6 +264,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for timestamp column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema()).thenReturn(Map("timestamp_col" -> "timestamp"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("timestamp_col").expr)))
 
@@ -252,6 +285,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for date column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema()).thenReturn(Map("date_col" -> "date"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("date_col").expr)))
 
@@ -272,6 +306,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
   test("can build index for struct column") {
     val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
     when(indexCol.outputSchema())
       .thenReturn(Map("struct_col" -> "struct"))
     when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("struct_col").expr)))
diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
index 9d34b6f2a..d33343d81 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
@@ -11,6 +11,7 @@ import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods.parse
 import org.json4s.native.Serialization
+import org.mockito.Mockito.when
 import org.opensearch.client.json.jackson.JacksonJsonpMapper
 import org.opensearch.client.opensearch.OpenSearchClient
 import org.opensearch.client.transport.rest_client.RestClientTransport
@@ -19,6 +20,7 @@ import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.core.storage.FlintOpenSearchClient
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
+import org.scalatestplus.mockito.MockitoSugar.mock
 
 import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
 
@@ -34,7 +36,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
     val content =
       """ {
         |   "_meta": {
-        |     "kind": "SkippingIndex"
+        |     "kind": "test_kind"
         |   },
         |   "properties": {
         |     "age": {
@@ -43,41 +45,49 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
         |   }
         | }
         |""".stripMargin
-    flintClient.createIndex(indexName, new FlintMetadata(content))
+
+    val metadata = mock[FlintMetadata]
+    when(metadata.getContent).thenReturn(content)
+    flintClient.createIndex(indexName, metadata)
 
     flintClient.exists(indexName) shouldBe true
-    flintClient.getIndexMetadata(indexName).getContent should matchJson(content)
+    flintClient.getIndexMetadata(indexName).kind shouldBe "test_kind"
   }
 
   it should "create index with settings" in {
     val indexName = "flint_test_with_settings"
     val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"
-    flintClient.createIndex(indexName, new FlintMetadata("{}", indexSettings))
+    val metadata = mock[FlintMetadata]
+    when(metadata.getContent).thenReturn("{}")
+    when(metadata.indexSettings).thenReturn(indexSettings)
+    flintClient.createIndex(indexName, metadata)
 
     flintClient.exists(indexName) shouldBe true
 
     // OS uses full setting name ("index" prefix) and store as string
     implicit val formats: Formats = Serialization.formats(NoTypeHints)
-    val settings = parse(flintClient.getIndexMetadata(indexName).getIndexSettings)
+    val settings = parse(flintClient.getIndexMetadata(indexName).indexSettings)
     (settings \ "index.number_of_shards").extract[String] shouldBe "3"
     (settings \ "index.number_of_replicas").extract[String] shouldBe "2"
   }
 
   it should "get all index metadata with the given index name pattern" in {
-    flintClient.createIndex("flint_test_1_index", new FlintMetadata("{}"))
-    flintClient.createIndex("flint_test_2_index", new FlintMetadata("{}"))
+    val metadata = mock[FlintMetadata]
+    when(metadata.getContent).thenReturn("{}")
+    flintClient.createIndex("flint_test_1_index", metadata)
+    flintClient.createIndex("flint_test_2_index", metadata)
 
     val allMetadata = flintClient.getAllIndexMetadata("flint_*_index")
     allMetadata should have size 2
-    allMetadata.forEach(metadata => metadata.getContent shouldBe "{}")
-    allMetadata.forEach(metadata => metadata.getIndexSettings should not be empty)
+    allMetadata.forEach(metadata => metadata.getContent should not be empty)
+    allMetadata.forEach(metadata => metadata.indexSettings should not be empty)
   }
 
   it should "convert index name to all lowercase" in {
     val indexName = "flint_ELB_logs_index"
     flintClient.createIndex(
       indexName,
-      new FlintMetadata("""{"properties": {"test": { "type": "integer" } } }"""))
+      FlintMetadata.fromJson("""{"properties": {"test": { "type": "integer" } } }""", null))
 
     flintClient.exists(indexName) shouldBe true
     flintClient.getIndexMetadata(indexName) should not be null
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
index 892a8faa4..e197c0f53 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
@@ -89,7 +89,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
 
     val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))
     implicit val formats: Formats = Serialization.formats(NoTypeHints)
-    val settings = parse(flintClient.getIndexMetadata(testFlintIndex).getIndexSettings)
+    val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings)
     (settings \ "index.number_of_shards").extract[String] shouldBe "2"
     (settings \ "index.number_of_replicas").extract[String] shouldBe "3"
   }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index a688b1370..45af12047 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -94,7 +94,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
 
     val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))
     implicit val formats: Formats = Serialization.formats(NoTypeHints)
-    val settings = parse(flintClient.getIndexMetadata(testIndex).getIndexSettings)
+    val settings = parse(flintClient.getIndexMetadata(testIndex).indexSettings)
     (settings \ "index.number_of_shards").extract[String] shouldBe "3"
     (settings \ "index.number_of_replicas").extract[String] shouldBe "2"
   }
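Example usage (illustrative only, not part of the patch): a minimal sketch of how metadata can be composed with the FlintMetadata.Builder introduced in FlintMetadata.scala above, instead of assembling raw JSON strings. The index name, kind, source table, column, and settings values below are hypothetical.

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata

// Build metadata field by field; all literal values are made up for demonstration.
val metadata = FlintMetadata
  .builder()
  .name("flint_test_skipping_index")
  .kind("skipping")
  .source("spark_catalog.default.test_table")
  .addIndexedColumn(
    Map[String, AnyRef]("columnName" -> "age", "columnType" -> "int").asJava)
  .indexSettings("""{"number_of_shards": 3}""")
  .build()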