diff --git a/build.sbt b/build.sbt index bc018c265..9389384fd 100644 --- a/build.sbt +++ b/build.sbt @@ -58,7 +58,12 @@ lazy val flintCore = (project in file("flint-core")) "org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion exclude ("org.apache.logging.log4j", "log4j-api"), "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" - exclude ("com.fasterxml.jackson.core", "jackson-databind")), + exclude ("com.fasterxml.jackson.core", "jackson-databind"), + "org.scalactic" %% "scalactic" % "3.2.15" % "test", + "org.scalatest" %% "scalatest" % "3.2.15" % "test", + "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", + "org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test", + "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test"), publish / skip := true) lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) diff --git a/docs/index.md b/docs/index.md index 1f51c2d82..8afdc1fbc 100644 --- a/docs/index.md +++ b/docs/index.md @@ -32,20 +32,17 @@ Currently, Flint metadata is only static configuration without version control a ```json { - "version": "0.1", - "indexConfig": { - "kind": "skipping", - "properties": { - "indexedColumns": [{ - "kind": "...", - "columnName": "...", - "columnType": "..." - }] - } - }, - "source": "alb_logs", - "state": "active", - "enabled": true + "version": "0.1.0", + "name": "...", + "kind": "skipping", + "source": "...", + "indexedColumns": [{ + "kind": "...", + "columnName": "...", + "columnType": "..." + }], + "options": { }, + "properties": { } } ``` diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintJsonHelper.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintJsonHelper.scala new file mode 100644 index 000000000..4c1991edc --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintJsonHelper.scala @@ -0,0 +1,132 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata + +import java.nio.charset.StandardCharsets.UTF_8 + +import org.opensearch.common.bytes.BytesReference +import org.opensearch.common.xcontent._ +import org.opensearch.common.xcontent.json.JsonXContent + +/** + * JSON parsing and building helper. + */ +object FlintJsonHelper { + + /** + * Build JSON by creating JSON builder and pass it to the given function. + * + * @param block + * building logic with JSON builder + * @return + * JSON string + */ + def buildJson(block: XContentBuilder => Unit): String = { + val builder: XContentBuilder = XContentFactory.jsonBuilder + builder.startObject + block(builder) + builder.endObject() + BytesReference.bytes(builder).utf8ToString + } + + /** + * Add an object field of the name to the JSON builder and continue building it with the given + * function. + * + * @param builder + * JSON builder + * @param name + * field name + * @param block + * building logic on the JSON field + */ + def objectField(builder: XContentBuilder, name: String)(block: => Unit): Unit = { + builder.startObject(name) + block + builder.endObject() + } + + /** + * Add an optional object field of the name to the JSON builder. Add an empty object field if + * the value is null. 
+ * + * @param builder + * JSON builder + * @param name + * field name + * @param value + * field value + */ + def optionalObjectField(builder: XContentBuilder, name: String, value: AnyRef): Unit = { + if (value == null) { + builder.startObject(name).endObject() + } else { + builder.field(name, value) + } + } + + /** + * Create a XContent JSON parser on the given JSON string. + * + * @param json + * JSON string + * @return + * JSON parser + */ + def createJsonParser(json: String): XContentParser = { + JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.IGNORE_DEPRECATIONS, + json.getBytes(UTF_8)) + } + + /** + * Parse the given JSON string by creating JSON parser and pass it to the parsing function. + * + * @param json + * JSON string + * @param block + * parsing logic with the parser + */ + def parseJson(json: String)(block: (XContentParser, String) => Unit): Unit = { + val parser = createJsonParser(json) + + // Read first root object token and start parsing + parser.nextToken() + parseObjectField(parser)(block) + } + + /** + * Parse each inner field in the object field with the given parsing function. + * + * @param parser + * JSON parser + * @param block + * parsing logic on each inner field + */ + def parseObjectField(parser: XContentParser)(block: (XContentParser, String) => Unit): Unit = { + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName: String = parser.currentName() + parser.nextToken() // Move to the field value + + block(parser, fieldName) + } + } + + /** + * Parse each inner field in the array field. + * + * @param parser + * JSON parser + * @param block + * parsing logic on each inner field + */ + def parseArrayField(parser: XContentParser)(block: => Unit): Unit = { + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + block + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java deleted file mode 100644 index 6773c3897..000000000 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core.metadata; - -/** - * Flint metadata follows Flint index specification and defines metadata - * for a Flint index regardless of query engine integration and storage. 
- */ -public class FlintMetadata { - - // TODO: define metadata format and create strong-typed class - private final String content; - - // TODO: piggyback optional index settings and will refactor as above - private String indexSettings; - - public FlintMetadata(String content) { - this.content = content; - } - - public FlintMetadata(String content, String indexSettings) { - this.content = content; - this.indexSettings = indexSettings; - } - - public String getContent() { - return content; - } - - public String getIndexSettings() { - return indexSettings; - } - - public void setIndexSettings(String indexSettings) { - this.indexSettings = indexSettings; - } -} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala new file mode 100644 index 000000000..ea0fb0f98 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -0,0 +1,232 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata + +import java.util + +import org.opensearch.flint.core.FlintVersion +import org.opensearch.flint.core.FlintVersion.current +import org.opensearch.flint.core.metadata.FlintJsonHelper._ + +/** + * Flint metadata follows Flint index specification and defines metadata for a Flint index + * regardless of query engine integration and storage. + */ +case class FlintMetadata( + /** Flint spec version */ + version: FlintVersion, + /** Flint index name */ + name: String, + /** Flint index kind */ + kind: String, + /** Flint index source that index data derived from */ + source: String, + /** Flint indexed column list */ + indexedColumns: Array[util.Map[String, AnyRef]] = Array(), + /** Flint indexed options. TODO: move to properties? */ + options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], + /** Flint index properties for any custom fields */ + properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], + /** Flint index schema */ + schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], + /** Optional Flint index settings. TODO: move elsewhere? */ + indexSettings: Option[String]) { + + require(version != null, "version is required") + require(name != null, "name is required") + require(kind != null, "kind is required") + require(source != null, "source is required") + + /** + * Generate JSON content as index metadata. + * + * @return + * JSON content + */ + def getContent: String = { + try { + buildJson(builder => { + // Add _meta field + objectField(builder, "_meta") { + builder + .field("version", version.version) + .field("name", name) + .field("kind", kind) + .field("source", source) + .field("indexedColumns", indexedColumns) + + optionalObjectField(builder, "options", options) + optionalObjectField(builder, "properties", properties) + } + + // Add properties (schema) field + builder.field("properties", schema) + }) + } catch { + case e: Exception => + throw new IllegalStateException("Failed to jsonify Flint metadata", e) + } + } +} + +object FlintMetadata { + + /** + * Construct Flint metadata with JSON content and index settings. 
+ * + * @param content + * JSON content + * @param settings + * index settings + * @return + * Flint metadata + */ + def apply(content: String, settings: String): FlintMetadata = { + val metadata = FlintMetadata(content) + metadata.copy(indexSettings = Option(settings)) + } + + /** + * Parse the given JSON content and construct Flint metadata class. + * + * @param content + * JSON content + * @return + * Flint metadata + */ + def apply(content: String): FlintMetadata = { + try { + val builder = new FlintMetadata.Builder() + parseJson(content) { (parser, fieldName) => + { + fieldName match { + case "_meta" => + parseObjectField(parser) { (parser, innerFieldName) => + { + innerFieldName match { + case "version" => builder.version(FlintVersion.apply(parser.text())) + case "name" => builder.name(parser.text()) + case "kind" => builder.kind(parser.text()) + case "source" => builder.source(parser.text()) + case "indexedColumns" => + parseArrayField(parser) { + builder.addIndexedColumn(parser.map()) + } + case "options" => builder.options(parser.map()) + case "properties" => builder.properties(parser.map()) + case _ => // Handle other fields as needed + } + } + } + case "properties" => + builder.schema(parser.map()) + } + } + } + builder.build() + } catch { + case e: Exception => + throw new IllegalStateException("Failed to parse metadata JSON", e) + } + } + + def builder(): FlintMetadata.Builder = new Builder + + /** + * Flint index metadata builder that can be extended by subclass to provide more custom build + * method. + */ + class Builder { + private var version: FlintVersion = FlintVersion.current() + private var name: String = "" + private var kind: String = "" + private var source: String = "" + private var options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() + private var indexedColumns: Array[util.Map[String, AnyRef]] = Array() + private var properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() + private var schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() + private var indexSettings: Option[String] = None + + def version(version: FlintVersion): this.type = { + this.version = version + this + } + + def name(name: String): this.type = { + this.name = name + this + } + + def kind(kind: String): this.type = { + this.kind = kind + this + } + + def source(source: String): this.type = { + this.source = source + this + } + + def options(options: util.Map[String, AnyRef]): this.type = { + this.options = options + this + } + + def indexedColumns(indexedColumns: Array[util.Map[String, AnyRef]]): this.type = { + this.indexedColumns = indexedColumns + this + } + + def addIndexedColumn(indexCol: util.Map[String, AnyRef]): this.type = { + indexedColumns = indexedColumns :+ indexCol + this + } + + def properties(properties: util.Map[String, AnyRef]): this.type = { + this.properties = properties + this + } + + def addProperty(key: String, value: AnyRef): this.type = { + properties.put(key, value) + this + } + + def schema(schema: util.Map[String, AnyRef]): this.type = { + this.schema = schema + this + } + + def schema(schema: String): this.type = { + parseJson(schema) { (parser, fieldName) => + fieldName match { + case "properties" => this.schema = parser.map() + case _ => // do nothing + } + } + this + } + + def indexSettings(indexSettings: String): this.type = { + this.indexSettings = Option(indexSettings) + this + } + + // Build method to create the FlintMetadata instance + def build(): FlintMetadata = { + FlintMetadata( + if (version == 
null) current() else version, + name, + kind, + source, + indexedColumns, + options, + properties, + schema, + indexSettings) + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index b973385d8..4badfe8f4 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -47,6 +47,7 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.search.SearchModule; import org.opensearch.search.builder.SearchSourceBuilder; +import scala.Option; /** * Flint client implementation for OpenSearch storage. @@ -73,8 +74,9 @@ public FlintOpenSearchClient(FlintOptions options) { CreateIndexRequest request = new CreateIndexRequest(osIndexName); request.mapping(metadata.getContent(), XContentType.JSON); - if (metadata.getIndexSettings() != null) { - request.settings(metadata.getIndexSettings(), XContentType.JSON); + Option settings = metadata.indexSettings(); + if (settings.isDefined()) { + request.settings(settings.get(), XContentType.JSON); } client.indices().create(request, RequestOptions.DEFAULT); } catch (Exception e) { @@ -98,7 +100,7 @@ public FlintOpenSearchClient(FlintOptions options) { GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); return Arrays.stream(response.getIndices()) - .map(index -> new FlintMetadata( + .map(index -> FlintMetadata.apply( response.getMappings().get(index).source().toString(), response.getSettings().get(index).toString())) .collect(Collectors.toList()); @@ -115,7 +117,7 @@ public FlintOpenSearchClient(FlintOptions options) { MappingMetadata mapping = response.getMappings().get(osIndexName); Settings settings = response.getSettings().get(osIndexName); - return new FlintMetadata(mapping.source().string(), settings.toString()); + return FlintMetadata.apply(mapping.source().string(), settings.toString()); } catch (Exception e) { throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala new file mode 100644 index 000000000..dc2f5fe6a --- /dev/null +++ b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.opensearch.flint.core.FlintVersion.current +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class FlintMetadataSuite extends AnyFlatSpec with Matchers { + + /** Test Flint index meta JSON string */ + val testMetadataJson: String = s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "test_index", + | "kind": "test_kind", + | "source": "test_source_table", + | "indexedColumns": [ + | { + | "test_field": "spark_type" + | }], + | "options": {}, + | "properties": {} + | }, + | "properties": { + | "test_field": { + | "type": "os_type" + | } + | } + | } + |""".stripMargin + + val testIndexSettingsJson: String = + """ + | { "number_of_shards": 3 } + 
|""".stripMargin + + "constructor" should "deserialize the given JSON and assign parsed value to field" in { + val metadata = FlintMetadata(testMetadataJson, testIndexSettingsJson) + + metadata.version shouldBe current() + metadata.name shouldBe "test_index" + metadata.kind shouldBe "test_kind" + metadata.source shouldBe "test_source_table" + metadata.indexedColumns shouldBe Array(Map("test_field" -> "spark_type").asJava) + metadata.schema shouldBe Map("test_field" -> Map("type" -> "os_type").asJava).asJava + } + + "getContent" should "serialize all fields to JSON" in { + val builder = new FlintMetadata.Builder + builder.name("test_index") + builder.kind("test_kind") + builder.source("test_source_table") + builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava); + builder.schema("""{"properties": {"test_field": {"type": "os_type"}}}""") + + val metadata = builder.build() + metadata.getContent should matchJson(testMetadataJson) + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 4a4885ecb..ee4775b6a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -7,9 +7,7 @@ package org.opensearch.flint.spark import scala.collection.JavaConverters._ -import org.json4s.{Formats, JArray, NoTypeHints} -import org.json4s.JsonAST.{JField, JObject} -import org.json4s.native.JsonMethods.parse +import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.FlintMetadata @@ -87,7 +85,6 @@ class FlintSpark(val spark: SparkSession) { } } else { val metadata = index.metadata() - index.options.indexSettings().foreach(metadata.setIndexSettings) flintClient.createIndex(indexName, metadata) } } @@ -105,7 +102,7 @@ class FlintSpark(val spark: SparkSession) { def refreshIndex(indexName: String, mode: RefreshMode): Option[String] = { val index = describeIndex(indexName) .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) - val tableName = getSourceTableName(index) + val tableName = index.metadata().source // Write Flint index data to Flint data source (shared by both refresh modes for now) def writeFlintIndex(df: DataFrame): Unit = { @@ -224,39 +221,16 @@ class FlintSpark(val spark: SparkSession) { } } - // TODO: Remove all parsing logic below once Flint spec finalized and FlintMetadata strong typed - private def getSourceTableName(index: FlintSparkIndex): String = { - val json = parse(index.metadata().getContent) - (json \ "_meta" \ "source").extract[String] - } - - /* - * For now, deserialize skipping strategies out of Flint metadata json - * ex. 
extract Seq(Partition("year", "int"), ValueList("name")) from - * { "_meta": { "indexedColumns": [ {...partition...}, {...value list...} ] } } - * - */ private def deserialize(metadata: FlintMetadata): FlintSparkIndex = { - val meta = parse(metadata.getContent) \ "_meta" - val indexName = (meta \ "name").extract[String] - val tableName = (meta \ "source").extract[String] - val indexType = (meta \ "kind").extract[String] - val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray] val indexOptions = FlintSparkIndexOptions( - (meta \ "options") - .asInstanceOf[JObject] - .obj - .map { case JField(key, value) => - key -> value.values.toString - } - .toMap) + metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap) - indexType match { + metadata.kind match { case SKIPPING_INDEX_TYPE => - val strategies = indexedColumns.arr.map { colInfo => - val skippingKind = SkippingKind.withName((colInfo \ "kind").extract[String]) - val columnName = (colInfo \ "columnName").extract[String] - val columnType = (colInfo \ "columnType").extract[String] + val strategies = metadata.indexedColumns.map { colInfo => + val skippingKind = SkippingKind.withName(getString(colInfo, "kind")) + val columnName = getString(colInfo, "columnName") + val columnType = getString(colInfo, "columnType") skippingKind match { case PARTITION => @@ -269,17 +243,21 @@ class FlintSpark(val spark: SparkSession) { throw new IllegalStateException(s"Unknown skipping strategy: $other") } } - new FlintSparkSkippingIndex(tableName, strategies, indexOptions) + new FlintSparkSkippingIndex(metadata.source, strategies, indexOptions) case COVERING_INDEX_TYPE => new FlintSparkCoveringIndex( - indexName, - tableName, - indexedColumns.arr.map { obj => - ((obj \ "columnName").extract[String], (obj \ "columnType").extract[String]) + metadata.name, + metadata.source, + metadata.indexedColumns.map { colInfo => + getString(colInfo, "columnName") -> getString(colInfo, "columnType") }.toMap, indexOptions) } } + + private def getString(map: java.util.Map[String, AnyRef], key: String): String = { + map.get(key).asInstanceOf[String] + } } object FlintSpark { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index a19e603dc..a130821bd 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -5,9 +5,13 @@ package org.opensearch.flint.spark +import scala.collection.JavaConverters.mapAsJavaMapConverter + import org.opensearch.flint.core.metadata.FlintMetadata import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.flint.datatype.FlintDataType +import org.apache.spark.sql.types.StructType /** * Flint index interface in Spark. @@ -81,4 +85,42 @@ object FlintSparkIndex { .map(value => key -> value)) .toMap } + + /** + * Create Flint metadata builder with common fields. 
+ * + * @param index + * Flint index + * @return + * Flint metadata builder + */ + def metadataBuilder(index: FlintSparkIndex): FlintMetadata.Builder = { + val builder = new FlintMetadata.Builder() + // Common fields + builder.kind(index.kind) + builder.options(index.options.optionsWithDefault.mapValues(_.asInstanceOf[AnyRef]).asJava) + + // Index properties + val envs = populateEnvToMetadata + if (envs.nonEmpty) { + builder.addProperty("env", envs.asJava) + } + + // Optional index settings + val settings = index.options.indexSettings() + if (settings.isDefined) { + builder.indexSettings(settings.get) + } + builder + } + + def generateSchemaJSON(allFieldTypes: Map[String, String]): String = { + val catalogDDL = + allFieldTypes + .map { case (colName, colType) => s"$colName $colType not null" } + .mkString(",") + + val structType = StructType.fromDDL(catalogDDL) + FlintDataType.serialize(structType) + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index f7e20b339..b97c3fea3 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -5,19 +5,15 @@ package org.opensearch.flint.spark.covering -import org.json4s.{Formats, NoTypeHints} -import org.json4s.JsonAST.{JArray, JObject, JString} -import org.json4s.native.JsonMethods.{compact, parse, render} -import org.json4s.native.Serialization +import scala.collection.JavaConverters.mapAsJavaMapConverter + import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} -import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata} +import org.opensearch.flint.spark._ +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE} import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.flint.datatype.FlintDataType -import org.apache.spark.sql.types.StructType /** * Flint covering index in Spark. 
@@ -38,62 +34,30 @@ case class FlintSparkCoveringIndex( require(indexedColumns.nonEmpty, "indexed columns must not be empty") - /** Required by json4s write function */ - implicit val formats: Formats = Serialization.formats(NoTypeHints) - override val kind: String = COVERING_INDEX_TYPE override def name(): String = getFlintIndexName(indexName, tableName) override def metadata(): FlintMetadata = { - new FlintMetadata(s"""{ - | "_meta": { - | "name": "$indexName", - | "kind": "$kind", - | "indexedColumns": $getMetaInfo, - | "source": "$tableName", - | "options": $getIndexOptions, - | "properties": $getIndexProperties - | }, - | "properties": $getSchema - | } - |""".stripMargin) + val indexColumnMaps = { + indexedColumns.map { case (colName, colType) => + Map[String, AnyRef]("columnName" -> colName, "columnType" -> colType).asJava + }.toArray + } + val schemaJson = generateSchemaJSON(indexedColumns) + + metadataBuilder(this) + .name(indexName) + .source(tableName) + .indexedColumns(indexColumnMaps) + .schema(schemaJson) + .build() } override def build(df: DataFrame): DataFrame = { val colNames = indexedColumns.keys.toSeq df.select(colNames.head, colNames.tail: _*) } - - // TODO: refactor all these once Flint metadata spec finalized - private def getMetaInfo: String = { - val objects = indexedColumns.map { case (colName, colType) => - JObject("columnName" -> JString(colName), "columnType" -> JString(colType)) - }.toList - Serialization.write(JArray(objects)) - } - - private def getIndexOptions: String = { - Serialization.write(options.optionsWithDefault) - } - - private def getIndexProperties: String = { - val envMap = populateEnvToMetadata - if (envMap.isEmpty) { - "{}" - } else { - s"""{ "env": ${Serialization.write(envMap)} }""" - } - } - - private def getSchema: String = { - val catalogDDL = - indexedColumns - .map { case (colName, colType) => s"$colName $colType not null" } - .mkString(",") - val properties = FlintDataType.serialize(StructType.fromDDL(catalogDDL)) - compact(render(parse(properties) \ "properties")) - } } object FlintSparkCoveringIndex { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 0749d13f5..ec213a3cd 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -5,25 +5,20 @@ package org.opensearch.flint.spark.skipping -import org.json4s._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization -import org.opensearch.flint.core.FlintVersion +import scala.collection.JavaConverters.mapAsJavaMapConverter + import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} -import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata, ID_COLUMN} +import org.opensearch.flint.spark._ +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, ID_COLUMN} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} -import 
org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression -import org.apache.spark.sql.flint.datatype.FlintDataType import org.apache.spark.sql.functions.{col, input_file_name, sha1} -import org.apache.spark.sql.types.StructType /** * Flint skipping index in Spark. @@ -41,9 +36,6 @@ class FlintSparkSkippingIndex( require(indexedColumns.nonEmpty, "indexed columns must not be empty") - /** Required by json4s write function */ - implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer - /** Skipping index type */ override val kind: String = SKIPPING_INDEX_TYPE @@ -52,19 +44,27 @@ class FlintSparkSkippingIndex( } override def metadata(): FlintMetadata = { - new FlintMetadata(s"""{ - | "_meta": { - | "name": "${name()}", - | "version": "${FlintVersion.current()}", - | "kind": "$SKIPPING_INDEX_TYPE", - | "indexedColumns": $getMetaInfo, - | "source": "$tableName", - | "options": $getIndexOptions, - | "properties": $getIndexProperties - | }, - | "properties": $getSchema - | } - |""".stripMargin) + val indexColumnMaps = + indexedColumns + .map(col => + Map[String, AnyRef]( + "kind" -> col.kind.toString, + "columnName" -> col.columnName, + "columnType" -> col.columnType).asJava) + .toArray + + val fieldTypes = + indexedColumns + .flatMap(_.outputSchema()) + .toMap + (FILE_PATH_COLUMN -> "string") + val schemaJson = generateSchemaJSON(fieldTypes) + + metadataBuilder(this) + .name(name()) + .source(tableName) + .indexedColumns(indexColumnMaps) + .schema(schemaJson) + .build() } override def build(df: DataFrame): DataFrame = { @@ -81,36 +81,6 @@ class FlintSparkSkippingIndex( .agg(namedAggFuncs.head, namedAggFuncs.tail: _*) .withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN))) } - - private def getMetaInfo: String = { - Serialization.write(indexedColumns) - } - - private def getIndexOptions: String = { - Serialization.write(options.optionsWithDefault) - } - - private def getIndexProperties: String = { - val envMap = populateEnvToMetadata - if (envMap.isEmpty) { - "{}" - } else { - s"""{ "env": ${Serialization.write(envMap)} }""" - } - } - - private def getSchema: String = { - val allFieldTypes = - indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string") - val catalogDDL = - allFieldTypes - .map { case (colName, colType) => s"$colName $colType not null" } - .mkString(",") - val allFieldSparkTypes = StructType.fromDDL(catalogDDL) - // Convert StructType to {"properties": ...} and only need the properties value - val properties = FlintDataType.serialize(allFieldSparkTypes) - compact(render(parse(properties) \ "properties")) - } } object FlintSparkSkippingIndex { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index b31e18480..a3961bb51 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -5,11 +5,14 @@ package 
org.opensearch.flint.spark.skipping +import scala.collection.JavaConverters.mapAsJavaMapConverter + import org.json4s.native.JsonMethods.parse import org.mockito.Mockito.when import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN -import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind import org.scalatest.matchers.must.Matchers.contain import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.scalatestplus.mockito.MockitoSugar.mock @@ -27,6 +30,25 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { index.name() shouldBe "flint_spark_catalog_default_test_skipping_index" } + test("get index metadata") { + val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) + when(indexCol.columnName).thenReturn("test_field") + when(indexCol.columnType).thenReturn("integer") + when(indexCol.outputSchema()).thenReturn(Map("test_field" -> "integer")) + val index = new FlintSparkSkippingIndex(testTable, Seq(indexCol)) + + val metadata = index.metadata() + metadata.kind shouldBe SKIPPING_INDEX_TYPE + metadata.name shouldBe index.name() + metadata.source shouldBe testTable + metadata.indexedColumns shouldBe Array( + Map( + "kind" -> SkippingKind.PARTITION.toString, + "columnName" -> "test_field", + "columnType" -> "integer").asJava) + } + test("can build index building job with unique ID column") { val indexCol = mock[FlintSparkSkippingStrategy] when(indexCol.outputSchema()).thenReturn(Map("name" -> "string")) @@ -40,6 +62,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for boolean column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("boolean_col" -> "boolean")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("boolean_col").expr))) @@ -59,6 +82,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for string column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("string_col" -> "string")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("string_col").expr))) @@ -80,6 +104,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for varchar column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("varchar_col" -> "varchar(20)")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("varchar_col").expr))) @@ -99,6 +124,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for char column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("char_col" -> "char(20)")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("char_col").expr))) @@ -118,6 +144,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for long column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) 
when(indexCol.outputSchema()).thenReturn(Map("long_col" -> "bigint")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("long_col").expr))) @@ -137,6 +164,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for int column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("int_col" -> "int")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("int_col").expr))) @@ -156,6 +184,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for short column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("short_col" -> "smallint")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("short_col").expr))) @@ -175,6 +204,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for byte column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("byte_col" -> "tinyint")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("byte_col").expr))) @@ -194,6 +224,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for double column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("double_col" -> "double")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("double_col").expr))) @@ -213,6 +244,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for float column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("float_col" -> "float")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("float_col").expr))) @@ -232,6 +264,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for timestamp column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("timestamp_col" -> "timestamp")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("timestamp_col").expr))) @@ -252,6 +285,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for date column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("date_col" -> "date")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("date_col").expr))) @@ -272,6 +306,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for struct column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()) .thenReturn(Map("struct_col" -> "struct")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("struct_col").expr))) diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 9d34b6f2a..5c799128c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ 
b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -7,10 +7,10 @@ package org.opensearch.flint.core import scala.collection.JavaConverters._ -import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization +import org.mockito.Mockito.when import org.opensearch.client.json.jackson.JacksonJsonpMapper import org.opensearch.client.opensearch.OpenSearchClient import org.opensearch.client.transport.rest_client.RestClientTransport @@ -19,6 +19,7 @@ import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY @@ -34,7 +35,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val content = """ { | "_meta": { - | "kind": "SkippingIndex" + | "kind": "test_kind" | }, | "properties": { | "age": { @@ -43,41 +44,51 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M | } | } |""".stripMargin - flintClient.createIndex(indexName, new FlintMetadata(content)) + + val metadata = mock[FlintMetadata] + when(metadata.getContent).thenReturn(content) + when(metadata.indexSettings).thenReturn(None) + flintClient.createIndex(indexName, metadata) flintClient.exists(indexName) shouldBe true - flintClient.getIndexMetadata(indexName).getContent should matchJson(content) + flintClient.getIndexMetadata(indexName).kind shouldBe "test_kind" } it should "create index with settings" in { val indexName = "flint_test_with_settings" val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" - flintClient.createIndex(indexName, new FlintMetadata("{}", indexSettings)) + val metadata = mock[FlintMetadata] + when(metadata.getContent).thenReturn("{}") + when(metadata.indexSettings).thenReturn(Some(indexSettings)) + flintClient.createIndex(indexName, metadata) flintClient.exists(indexName) shouldBe true // OS uses full setting name ("index" prefix) and store as string implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(indexName).getIndexSettings) + val settings = parse(flintClient.getIndexMetadata(indexName).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "3" (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } it should "get all index metadata with the given index name pattern" in { - flintClient.createIndex("flint_test_1_index", new FlintMetadata("{}")) - flintClient.createIndex("flint_test_2_index", new FlintMetadata("{}")) + val metadata = mock[FlintMetadata] + when(metadata.getContent).thenReturn("{}") + when(metadata.indexSettings).thenReturn(None) + flintClient.createIndex("flint_test_1_index", metadata) + flintClient.createIndex("flint_test_2_index", metadata) val allMetadata = flintClient.getAllIndexMetadata("flint_*_index") allMetadata should have size 2 - allMetadata.forEach(metadata => metadata.getContent shouldBe "{}") - allMetadata.forEach(metadata => metadata.getIndexSettings should not be empty) + allMetadata.forEach(metadata => metadata.getContent should not be empty) + allMetadata.forEach(metadata => metadata.indexSettings should not be empty) } it should "convert index name to all lowercase" in { 
val indexName = "flint_ELB_logs_index" flintClient.createIndex( indexName, - new FlintMetadata("""{"properties": {"test": { "type": "integer" } } }""")) + FlintMetadata("""{"properties": {"test": { "type": "integer" } } }""")) flintClient.exists(indexName) shouldBe true flintClient.getIndexMetadata(indexName) should not be null diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index b0938966f..a4b0069dd 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -6,6 +6,7 @@ package org.opensearch.flint.spark import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.scalatest.matchers.must.Matchers.defined @@ -45,6 +46,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { index shouldBe defined index.get.metadata().getContent should matchJson(s"""{ | "_meta": { + | "version": "${current()}", | "name": "name_and_age", | "kind": "covering", | "indexedColumns": [ diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 0e4c8bd67..627e11f52 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -89,7 +89,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(testFlintIndex).getIndexSettings) + val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 55893732e..bfbeba9c3 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -94,7 +94,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(testIndex).getIndexSettings) + val settings = parse(flintClient.getIndexMetadata(testIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "3" (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala index d12d03565..0a853dc92 
100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala @@ -294,7 +294,7 @@ object FlintJob extends Logging { mapping: String): (Boolean, String) = { try { logInfo(s"create $resultIndex") - flintClient.createIndex(resultIndex, new FlintMetadata(mapping)) + flintClient.createIndex(resultIndex, FlintMetadata.apply(mapping)) logInfo(s"create $resultIndex successfully") (true, "") } catch {
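
For orientation, a minimal sketch (not part of the patch) of how the `buildJson`/`parseJson` pair in the new `FlintJsonHelper` is meant to compose — the same streaming pattern `FlintMetadata.getContent` and `FlintMetadata.apply` use above. The `_meta`/`kind` payload here is made up purely for illustration:

```scala
import org.opensearch.flint.core.metadata.FlintJsonHelper._

// Write path: produces {"_meta":{"kind":"skipping"}}
val json = buildJson(builder => {
  objectField(builder, "_meta") {
    builder.field("kind", "skipping")
  }
})

// Read path: walk the same structure back with the streaming parser
var kind = ""
parseJson(json) { (parser, fieldName) =>
  fieldName match {
    case "_meta" =>
      parseObjectField(parser) { (parser, innerFieldName) =>
        if (innerFieldName == "kind") kind = parser.text()
      }
    case _ => // skip fields we don't care about
  }
}
// kind == "skipping"
```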
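
And a hedged end-to-end sketch of the metadata round trip this change enables: the builder on the write path (as `FlintSparkIndex.metadataBuilder` does) and the two-argument `FlintMetadata.apply` on the read path (mirroring what `FlintOpenSearchClient.getIndexMetadata` now returns). The index name, source table, schema, and settings values below are illustrative only:

```scala
import scala.collection.JavaConverters.mapAsJavaMapConverter
import org.opensearch.flint.core.metadata.FlintMetadata

// Write path: assemble metadata with the new builder
val built = FlintMetadata.builder()
  .name("name_and_age")
  .kind("covering")
  .source("spark_catalog.default.test_table")
  .addIndexedColumn(
    Map[String, AnyRef]("columnName" -> "name", "columnType" -> "string").asJava)
  .schema("""{"properties": {"name": {"type": "keyword"}}}""")
  .indexSettings("""{"number_of_shards": 3}""")
  .build()

// getContent emits the index mapping JSON, including the _meta section
val mappingJson = built.getContent

// Read path: reconstruct metadata from mapping source plus index settings
val restored = FlintMetadata(mappingJson, """{"number_of_shards": 3}""")
assert(restored.kind == "covering")
assert(restored.indexSettings.contains("""{"number_of_shards": 3}"""))
```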