From d1c0532d284f2126d813d13b5bfcce9061430cbc Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 10 Oct 2023 13:41:48 -0700 Subject: [PATCH 01/16] Refactor Flint metadata content to strong type Signed-off-by: Chen Dai --- build.sbt | 7 +- .../flint/core/metadata/FlintMetadata.java | 233 +++++++++++++++++- .../core/metadata/FlintMetadataSuite.scala | 63 +++++ 3 files changed, 300 insertions(+), 3 deletions(-) create mode 100644 flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala diff --git a/build.sbt b/build.sbt index bc018c265..9389384fd 100644 --- a/build.sbt +++ b/build.sbt @@ -58,7 +58,12 @@ lazy val flintCore = (project in file("flint-core")) "org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion exclude ("org.apache.logging.log4j", "log4j-api"), "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" - exclude ("com.fasterxml.jackson.core", "jackson-databind")), + exclude ("com.fasterxml.jackson.core", "jackson-databind"), + "org.scalactic" %% "scalactic" % "3.2.15" % "test", + "org.scalatest" %% "scalatest" % "3.2.15" % "test", + "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", + "org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test", + "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test"), publish / skip := true) lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java index 6773c3897..d0165b986 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java @@ -5,20 +5,150 @@ package org.opensearch.flint.core.metadata; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.opensearch.flint.core.FlintVersion.current; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.xcontent.DeprecationHandler; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.flint.core.FlintVersion; + /** * Flint metadata follows Flint index specification and defines metadata * for a Flint index regardless of query engine integration and storage. */ public class FlintMetadata { + /** + * Flint specification version. + */ + private FlintVersion version; + + /** + * Flint index name. + */ + private String name; + + /** + * Flint index type. + */ + private String kind; + + /** + * Source that the Flint index is derived from. + * // TODO: extend this as a complex object to store source file list in future? + */ + private String source; + + /** + * Flint index options. + * // TODO: maybe move this into properties as extended field? + */ + private Map options = new HashMap<>(); + + /** + * Indexed columns of the Flint index. Use LinkedHashMap to preserve ordering. + */ + private Map indexedColumns = new LinkedHashMap<>(); + + /** + * Other extended fields. Value is object for complex structure. + */ + private Map properties = new HashMap<>(); + + /** + * Flint index field schema. + */ + private Map schema = new HashMap<>(); + // TODO: define metadata format and create strong-typed class - private final String content; + private String content; // TODO: piggyback optional index settings and will refactor as above private String indexSettings; + public FlintMetadata() { + } + public FlintMetadata(String content) { this.content = content; + + try (XContentParser parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.IGNORE_DEPRECATIONS, + content.getBytes(UTF_8))) { + parser.nextToken(); // Start parsing + + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); // Move to the field value + + if ("_meta".equals(fieldName)) { + parseMetaField(parser); + } else if ("properties".equals(fieldName)) { + schema = parser.map(); + } else if ("indexSettings".equals(fieldName)) { + indexSettings = parser.text(); + } + } + } catch (IOException e) { + throw new IllegalStateException("Failed to parse metadata JSON", e); + } + } + + private void parseMetaField(XContentParser parser) throws IOException { + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String metaFieldName = parser.currentName(); + parser.nextToken(); + + switch (metaFieldName) { + case "version": + version = FlintVersion.apply(parser.text()); + break; + case "name": + name = parser.text(); + break; + case "kind": + kind = parser.text(); + break; + case "source": + source = parser.text(); + break; + case "indexedColumns": + indexedColumns = parseIndexedColumns(parser); + break; + case "options": + options = parser.map(); + break; + case "properties": + properties = parser.map(); + break; + default: + break; + } + } + } + + private Map parseIndexedColumns(XContentParser parser) throws IOException { + Map columns = new LinkedHashMap<>(); + + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String columnName = parser.currentName(); + parser.nextToken(); + String columnValue = parser.text(); + columns.put(columnName, columnValue); + } + } + return columns; } public FlintMetadata(String content, String indexSettings) { @@ -27,7 +157,42 @@ public FlintMetadata(String content, String indexSettings) { } public String getContent() { - return content; + try { + XContentBuilder builder = XContentFactory.jsonBuilder(); + // Start main object + builder.startObject(); + + // Create the "_meta" object + builder.startObject("_meta"); + builder.field("version", (version == null) ? current().version() : version.version()); + builder.field("name", name); + builder.field("kind", kind); + builder.field("source", source); + builder.array("indexedColumns", indexedColumns); + + // Optional "options" JSON object + if (options == null) { + builder.startObject("options").endObject(); + } else { + builder.field("options", options); + } + + // Optional "properties" JSON object + if (properties == null) { + builder.startObject("properties").endObject(); + } else { + builder.field("properties", properties); + } + builder.endObject(); // End "_meta" object + + // Create "properties" (schema) object + builder.field("properties", schema); + builder.endObject(); // End the main object + + return BytesReference.bytes(builder).utf8ToString(); + } catch (IOException e) { + throw new IllegalStateException("Failed to jsonify Flint metadata", e); + } } public String getIndexSettings() { @@ -37,4 +202,68 @@ public String getIndexSettings() { public void setIndexSettings(String indexSettings) { this.indexSettings = indexSettings; } + + public FlintVersion getVersion() { + return version; + } + + public void setVersion(FlintVersion version) { + this.version = version; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getKind() { + return kind; + } + + public void setKind(String kind) { + this.kind = kind; + } + + public String getSource() { + return source; + } + + public void setSource(String source) { + this.source = source; + } + + public Map getOptions() { + return options; + } + + public void setOptions(Map options) { + this.options = options; + } + + public Map getIndexedColumns() { + return indexedColumns; + } + + public void setIndexedColumns(Map indexedColumns) { + this.indexedColumns = indexedColumns; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + public Map getSchema() { + return schema; + } + + public void setSchema(Map schema) { + this.schema = schema; + } } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala new file mode 100644 index 000000000..47794ecee --- /dev/null +++ b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.opensearch.flint.core.FlintVersion.current +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class FlintMetadataSuite extends AnyFlatSpec with Matchers { + + /** Test metadata JSON string */ + val testMetadataJson: String = s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "test_index", + | "kind": "test_kind", + | "source": "test_source_table", + | "indexedColumns": [ + | { + | "test_field": "spark_type" + | }], + | "options": {}, + | "properties": {} + | }, + | "properties": { + | "test_field": { + | "type": "os_type" + | } + | } + | } + |""".stripMargin + + "constructor" should "deserialize the given JSON and assign parsed value to field" in { + val metadata = new FlintMetadata(testMetadataJson) + + metadata.getVersion shouldBe current() + metadata.getName shouldBe "test_index" + metadata.getKind shouldBe "test_kind" + metadata.getSource shouldBe "test_source_table" + metadata.getIndexedColumns shouldBe Map("test_field" -> "spark_type").asJava + metadata.getSchema shouldBe Map("test_field" -> Map("type" -> "os_type").asJava).asJava + } + + "getContent" should "serialize all fields to JSON" in { + val metadata = new FlintMetadata() + metadata.setName("test_index") + metadata.setKind("test_kind") + metadata.setSource("test_source_table") + metadata.getIndexedColumns.put("test_field", "spark_type"); + metadata.getSchema.put("test_field", Map("type" -> "os_type").asJava) + // metadata.setIndexedColumns("""[{"test_field": "spark_type"}]""") + // metadata.setSchema("""{"test_field": {"type": "os_type"}}""") + + metadata.getContent should matchJson(testMetadataJson) + } +} From 6e0d3cd72f90cfa2caa93ae20ddc0a8769a82fed Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 10 Oct 2023 15:35:10 -0700 Subject: [PATCH 02/16] Refactor Flint metadata to Scala case class Signed-off-by: Chen Dai --- .../flint/core/metadata/FlintMetadata.java | 269 ------------------ .../flint/core/metadata/FlintMetadata.scala | 209 ++++++++++++++ .../core/storage/FlintOpenSearchClient.java | 8 +- .../core/metadata/FlintMetadataSuite.scala | 41 +-- 4 files changed, 235 insertions(+), 292 deletions(-) delete mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java create mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java deleted file mode 100644 index d0165b986..000000000 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core.metadata; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.opensearch.flint.core.FlintVersion.current; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.xcontent.DeprecationHandler; -import org.opensearch.common.xcontent.NamedXContentRegistry; -import org.opensearch.common.xcontent.XContentBuilder; -import org.opensearch.common.xcontent.XContentFactory; -import org.opensearch.common.xcontent.XContentParser; -import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.flint.core.FlintVersion; - -/** - * Flint metadata follows Flint index specification and defines metadata - * for a Flint index regardless of query engine integration and storage. - */ -public class FlintMetadata { - - /** - * Flint specification version. - */ - private FlintVersion version; - - /** - * Flint index name. - */ - private String name; - - /** - * Flint index type. - */ - private String kind; - - /** - * Source that the Flint index is derived from. - * // TODO: extend this as a complex object to store source file list in future? - */ - private String source; - - /** - * Flint index options. - * // TODO: maybe move this into properties as extended field? - */ - private Map options = new HashMap<>(); - - /** - * Indexed columns of the Flint index. Use LinkedHashMap to preserve ordering. - */ - private Map indexedColumns = new LinkedHashMap<>(); - - /** - * Other extended fields. Value is object for complex structure. - */ - private Map properties = new HashMap<>(); - - /** - * Flint index field schema. - */ - private Map schema = new HashMap<>(); - - // TODO: define metadata format and create strong-typed class - private String content; - - // TODO: piggyback optional index settings and will refactor as above - private String indexSettings; - - public FlintMetadata() { - } - - public FlintMetadata(String content) { - this.content = content; - - try (XContentParser parser = JsonXContent.jsonXContent.createParser( - NamedXContentRegistry.EMPTY, - DeprecationHandler.IGNORE_DEPRECATIONS, - content.getBytes(UTF_8))) { - parser.nextToken(); // Start parsing - - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String fieldName = parser.currentName(); - parser.nextToken(); // Move to the field value - - if ("_meta".equals(fieldName)) { - parseMetaField(parser); - } else if ("properties".equals(fieldName)) { - schema = parser.map(); - } else if ("indexSettings".equals(fieldName)) { - indexSettings = parser.text(); - } - } - } catch (IOException e) { - throw new IllegalStateException("Failed to parse metadata JSON", e); - } - } - - private void parseMetaField(XContentParser parser) throws IOException { - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String metaFieldName = parser.currentName(); - parser.nextToken(); - - switch (metaFieldName) { - case "version": - version = FlintVersion.apply(parser.text()); - break; - case "name": - name = parser.text(); - break; - case "kind": - kind = parser.text(); - break; - case "source": - source = parser.text(); - break; - case "indexedColumns": - indexedColumns = parseIndexedColumns(parser); - break; - case "options": - options = parser.map(); - break; - case "properties": - properties = parser.map(); - break; - default: - break; - } - } - } - - private Map parseIndexedColumns(XContentParser parser) throws IOException { - Map columns = new LinkedHashMap<>(); - - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String columnName = parser.currentName(); - parser.nextToken(); - String columnValue = parser.text(); - columns.put(columnName, columnValue); - } - } - return columns; - } - - public FlintMetadata(String content, String indexSettings) { - this.content = content; - this.indexSettings = indexSettings; - } - - public String getContent() { - try { - XContentBuilder builder = XContentFactory.jsonBuilder(); - // Start main object - builder.startObject(); - - // Create the "_meta" object - builder.startObject("_meta"); - builder.field("version", (version == null) ? current().version() : version.version()); - builder.field("name", name); - builder.field("kind", kind); - builder.field("source", source); - builder.array("indexedColumns", indexedColumns); - - // Optional "options" JSON object - if (options == null) { - builder.startObject("options").endObject(); - } else { - builder.field("options", options); - } - - // Optional "properties" JSON object - if (properties == null) { - builder.startObject("properties").endObject(); - } else { - builder.field("properties", properties); - } - builder.endObject(); // End "_meta" object - - // Create "properties" (schema) object - builder.field("properties", schema); - builder.endObject(); // End the main object - - return BytesReference.bytes(builder).utf8ToString(); - } catch (IOException e) { - throw new IllegalStateException("Failed to jsonify Flint metadata", e); - } - } - - public String getIndexSettings() { - return indexSettings; - } - - public void setIndexSettings(String indexSettings) { - this.indexSettings = indexSettings; - } - - public FlintVersion getVersion() { - return version; - } - - public void setVersion(FlintVersion version) { - this.version = version; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getKind() { - return kind; - } - - public void setKind(String kind) { - this.kind = kind; - } - - public String getSource() { - return source; - } - - public void setSource(String source) { - this.source = source; - } - - public Map getOptions() { - return options; - } - - public void setOptions(Map options) { - this.options = options; - } - - public Map getIndexedColumns() { - return indexedColumns; - } - - public void setIndexedColumns(Map indexedColumns) { - this.indexedColumns = indexedColumns; - } - - public Map getProperties() { - return properties; - } - - public void setProperties(Map properties) { - this.properties = properties; - } - - public Map getSchema() { - return schema; - } - - public void setSchema(Map schema) { - this.schema = schema; - } -} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala new file mode 100644 index 000000000..898e7771e --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -0,0 +1,209 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata + +import java.nio.charset.StandardCharsets.UTF_8 +import java.util + +import org.opensearch.common.bytes.BytesReference +import org.opensearch.common.xcontent._ +import org.opensearch.common.xcontent.json.JsonXContent +import org.opensearch.flint.core.FlintVersion + +/** + * Flint metadata follows Flint index specification and defines metadata for a Flint index + * regardless of query engine integration and storage. + */ +case class FlintMetadata( + version: FlintVersion = FlintVersion.current(), + name: String, + kind: String, + source: String, + indexedColumns: util.Map[String, AnyRef] = new util.LinkedHashMap[String, AnyRef], + options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], + properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], + schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], + indexSettings: String = null) { + + def getContent: String = { + try { + val builder: XContentBuilder = XContentFactory.jsonBuilder + builder.startObject + + builder.startObject("_meta") + builder.field( + "version", + if (version == null) FlintVersion.current().version else version.version) + builder.field("name", name) + builder.field("kind", kind) + builder.field("source", source) + builder.array("indexedColumns", indexedColumns) + + if (options == null) { + builder.startObject("options").endObject() + } else { + builder.field("options", options) + } + + if (properties == null) { + builder.startObject("properties").endObject() + } else { + builder.field("properties", properties) + } + + builder.endObject + builder.field("properties", schema) + builder.endObject + BytesReference.bytes(builder).utf8ToString + } catch { + case e: Exception => + throw new IllegalStateException("Failed to jsonify Flint metadata", e) + } + } +} + +object FlintMetadata { + + def fromJson(content: String, settings: String): FlintMetadata = { + try { + val parser: XContentParser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.IGNORE_DEPRECATIONS, + content.getBytes(UTF_8)) + parser.nextToken() // Start parsing + + val builder = new FlintMetadata.Builder() + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName: String = parser.currentName() + parser.nextToken() // Move to the field value + + if ("_meta".equals(fieldName)) { + parseMetaField(parser, builder) + } else if ("properties".equals(fieldName)) { + builder.schema(parser.map()) + } + } + builder.build() + } catch { + case e: Exception => + throw new IllegalStateException("Failed to parse metadata JSON", e) + } + } + + private def parseMetaField(parser: XContentParser, builder: Builder): Unit = { + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + val metaFieldName: String = parser.currentName() + parser.nextToken() + + metaFieldName match { + case "version" => builder.version(FlintVersion.apply(parser.text())) + case "name" => builder.name(parser.text()) + case "kind" => builder.kind(parser.text()) + case "source" => builder.source(parser.text()) + case "indexedColumns" => parseIndexedColumns(parser, builder) + case "options" => builder.options(parser.map()) + case "properties" => builder.properties(parser.map()) + case _ => // Handle other fields as needed + } + } + } + + private def parseIndexedColumns(parser: XContentParser, builder: Builder): Unit = { + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + val colName: String = parser.currentName() + parser.nextToken() + + val colValue: String = parser.text() + builder.addIndexedColumn(colName, colValue) + } + } + } + + class Builder { + private var version: FlintVersion = FlintVersion.current() + private var name: String = "" + private var kind: String = "" + private var source: String = "" + private var options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() + private var indexedColumns: util.Map[String, AnyRef] = + new util.LinkedHashMap[String, AnyRef]() + private var properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() + private var schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() + private var indexSettings: String = null + + // Setters for each field + def version(version: FlintVersion): Builder = { + this.version = version + this + } + + def name(name: String): Builder = { + this.name = name + this + } + + def kind(kind: String): Builder = { + this.kind = kind + this + } + + def source(source: String): Builder = { + this.source = source + this + } + + def options(options: util.Map[String, AnyRef]): Builder = { + this.options = options + this + } + + def addOption(key: String, value: AnyRef): Builder = { + this.options.put(key, value) + this + } + + def indexedColumns(indexedColumns: util.Map[String, AnyRef]): Builder = { + this.indexedColumns = indexedColumns + this + } + + def addIndexedColumn(key: String, value: AnyRef): Builder = { + indexedColumns.put(key, value) + this + } + + def properties(properties: util.Map[String, AnyRef]): Builder = { + this.properties = properties + this + } + + def addProperty(key: String, value: AnyRef): Builder = { + properties.put(key, value) + this + } + + def schema(schema: util.Map[String, AnyRef]): Builder = { + this.schema = schema + this + } + + def addSchemaField(key: String, value: AnyRef): Builder = { + schema.put(key, value) + this + } + + def indexSettings(indexSettings: String): Builder = { + this.indexSettings = indexSettings + this + } + + // Build method to create the FlintMetadata instance + def build(): FlintMetadata = { + FlintMetadata(version, name, kind, source, indexedColumns, options, properties, schema, indexSettings) + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index b973385d8..38283de80 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -73,8 +73,8 @@ public FlintOpenSearchClient(FlintOptions options) { CreateIndexRequest request = new CreateIndexRequest(osIndexName); request.mapping(metadata.getContent(), XContentType.JSON); - if (metadata.getIndexSettings() != null) { - request.settings(metadata.getIndexSettings(), XContentType.JSON); + if (metadata.indexSettings() != null) { + request.settings(metadata.indexSettings(), XContentType.JSON); } client.indices().create(request, RequestOptions.DEFAULT); } catch (Exception e) { @@ -98,7 +98,7 @@ public FlintOpenSearchClient(FlintOptions options) { GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); return Arrays.stream(response.getIndices()) - .map(index -> new FlintMetadata( + .map(index -> FlintMetadata.fromJson( response.getMappings().get(index).source().toString(), response.getSettings().get(index).toString())) .collect(Collectors.toList()); @@ -115,7 +115,7 @@ public FlintOpenSearchClient(FlintOptions options) { MappingMetadata mapping = response.getMappings().get(osIndexName); Settings settings = response.getSettings().get(osIndexName); - return new FlintMetadata(mapping.source().string(), settings.toString()); + return FlintMetadata.fromJson(mapping.source().string(), settings.toString()); } catch (Exception e) { throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala index 47794ecee..eeef98c6f 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala @@ -14,7 +14,7 @@ import org.scalatest.matchers.should.Matchers class FlintMetadataSuite extends AnyFlatSpec with Matchers { - /** Test metadata JSON string */ + /** Test Flint index meta JSON string */ val testMetadataJson: String = s""" | { | "_meta": { @@ -37,27 +37,30 @@ class FlintMetadataSuite extends AnyFlatSpec with Matchers { | } |""".stripMargin + val testIndexSettingsJson: String = + """ + | { "number_of_shards": 3 } + |""".stripMargin + "constructor" should "deserialize the given JSON and assign parsed value to field" in { - val metadata = new FlintMetadata(testMetadataJson) - - metadata.getVersion shouldBe current() - metadata.getName shouldBe "test_index" - metadata.getKind shouldBe "test_kind" - metadata.getSource shouldBe "test_source_table" - metadata.getIndexedColumns shouldBe Map("test_field" -> "spark_type").asJava - metadata.getSchema shouldBe Map("test_field" -> Map("type" -> "os_type").asJava).asJava + val metadata = FlintMetadata.fromJson(testMetadataJson, testIndexSettingsJson) + + metadata.version shouldBe current() + metadata.name shouldBe "test_index" + metadata.kind shouldBe "test_kind" + metadata.source shouldBe "test_source_table" + metadata.indexedColumns shouldBe Map("test_field" -> "spark_type").asJava + metadata.schema shouldBe Map("test_field" -> Map("type" -> "os_type").asJava).asJava } "getContent" should "serialize all fields to JSON" in { - val metadata = new FlintMetadata() - metadata.setName("test_index") - metadata.setKind("test_kind") - metadata.setSource("test_source_table") - metadata.getIndexedColumns.put("test_field", "spark_type"); - metadata.getSchema.put("test_field", Map("type" -> "os_type").asJava) - // metadata.setIndexedColumns("""[{"test_field": "spark_type"}]""") - // metadata.setSchema("""{"test_field": {"type": "os_type"}}""") - - metadata.getContent should matchJson(testMetadataJson) + val metadata = new FlintMetadata.Builder + metadata.name("test_index") + metadata.kind("test_kind") + metadata.source("test_source_table") + metadata.addIndexedColumn("test_field", "spark_type"); + metadata.addSchemaField("test_field", Map("type" -> "os_type").asJava) + + metadata.build().getContent should matchJson(testMetadataJson) } } From c7f599de3d090b8b360a642c0732572cff321688 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 10 Oct 2023 15:59:40 -0700 Subject: [PATCH 03/16] Make indexed columns extendable in Flint Spark Signed-off-by: Chen Dai --- .../flint/core/metadata/FlintMetadata.scala | 41 +++++++++---------- .../core/metadata/FlintMetadataSuite.scala | 17 ++++---- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index 898e7771e..0858616fa 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -22,7 +22,7 @@ case class FlintMetadata( name: String, kind: String, source: String, - indexedColumns: util.Map[String, AnyRef] = new util.LinkedHashMap[String, AnyRef], + indexedColumns: Array[util.Map[String, AnyRef]] = Array(), options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], @@ -40,7 +40,7 @@ case class FlintMetadata( builder.field("name", name) builder.field("kind", kind) builder.field("source", source) - builder.array("indexedColumns", indexedColumns) + builder.field("indexedColumns", indexedColumns) if (options == null) { builder.startObject("options").endObject() @@ -103,7 +103,10 @@ object FlintMetadata { case "name" => builder.name(parser.text()) case "kind" => builder.kind(parser.text()) case "source" => builder.source(parser.text()) - case "indexedColumns" => parseIndexedColumns(parser, builder) + case "indexedColumns" => + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + builder.addIndexedColumn(parser.map()) + } case "options" => builder.options(parser.map()) case "properties" => builder.properties(parser.map()) case _ => // Handle other fields as needed @@ -111,26 +114,13 @@ object FlintMetadata { } } - private def parseIndexedColumns(parser: XContentParser, builder: Builder): Unit = { - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - val colName: String = parser.currentName() - parser.nextToken() - - val colValue: String = parser.text() - builder.addIndexedColumn(colName, colValue) - } - } - } - class Builder { private var version: FlintVersion = FlintVersion.current() private var name: String = "" private var kind: String = "" private var source: String = "" private var options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() - private var indexedColumns: util.Map[String, AnyRef] = - new util.LinkedHashMap[String, AnyRef]() + private var indexedColumns: Array[util.Map[String, AnyRef]] = Array() private var properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() private var schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() private var indexSettings: String = null @@ -166,13 +156,13 @@ object FlintMetadata { this } - def indexedColumns(indexedColumns: util.Map[String, AnyRef]): Builder = { + def indexedColumns(indexedColumns: Array[util.Map[String, AnyRef]]): Builder = { this.indexedColumns = indexedColumns this } - def addIndexedColumn(key: String, value: AnyRef): Builder = { - indexedColumns.put(key, value) + def addIndexedColumn(indexCol: util.Map[String, AnyRef]): Builder = { + indexedColumns = indexedColumns :+ indexCol this } @@ -203,7 +193,16 @@ object FlintMetadata { // Build method to create the FlintMetadata instance def build(): FlintMetadata = { - FlintMetadata(version, name, kind, source, indexedColumns, options, properties, schema, indexSettings) + FlintMetadata( + version, + name, + kind, + source, + indexedColumns, + options, + properties, + schema, + indexSettings) } } } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala index eeef98c6f..f2790f5b4 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala @@ -49,18 +49,19 @@ class FlintMetadataSuite extends AnyFlatSpec with Matchers { metadata.name shouldBe "test_index" metadata.kind shouldBe "test_kind" metadata.source shouldBe "test_source_table" - metadata.indexedColumns shouldBe Map("test_field" -> "spark_type").asJava + metadata.indexedColumns shouldBe Array(Map("test_field" -> "spark_type").asJava) metadata.schema shouldBe Map("test_field" -> Map("type" -> "os_type").asJava).asJava } "getContent" should "serialize all fields to JSON" in { - val metadata = new FlintMetadata.Builder - metadata.name("test_index") - metadata.kind("test_kind") - metadata.source("test_source_table") - metadata.addIndexedColumn("test_field", "spark_type"); - metadata.addSchemaField("test_field", Map("type" -> "os_type").asJava) + val builder = new FlintMetadata.Builder + builder.name("test_index") + builder.kind("test_kind") + builder.source("test_source_table") + builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava); + builder.addSchemaField("test_field", Map("type" -> "os_type").asJava) - metadata.build().getContent should matchJson(testMetadataJson) + val metadata = builder.build() + metadata.getContent should matchJson(testMetadataJson) } } From b9100f38306896ba885be3a545441fff3f7d1f80 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 11 Oct 2023 13:46:58 -0700 Subject: [PATCH 04/16] Fix IT and UT Signed-off-by: Chen Dai --- .../flint/core/metadata/FlintMetadata.scala | 35 +++++---- .../opensearch/flint/spark/FlintSpark.scala | 56 +++++--------- .../FlintSparkIndexMetadataBuilder.scala | 69 +++++++++++++++++ .../covering/FlintSparkCoveringIndex.scala | 67 ++++------------- .../skipping/FlintSparkSkippingIndex.scala | 75 +++++-------------- .../FlintSparkSkippingIndexSuite.scala | 37 ++++++++- .../core/FlintOpenSearchClientSuite.scala | 30 +++++--- .../FlintSparkCoveringIndexSqlITSuite.scala | 2 +- .../FlintSparkSkippingIndexSqlITSuite.scala | 2 +- 9 files changed, 199 insertions(+), 174 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index 0858616fa..7697e2d1f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -86,6 +86,8 @@ object FlintMetadata { builder.schema(parser.map()) } } + + builder.indexSettings(settings) builder.build() } catch { case e: Exception => @@ -114,6 +116,12 @@ object FlintMetadata { } } + def builder(): FlintMetadata.Builder = new Builder + + /** + * Flint index metadata builder that can be extended by subclass to provide more custom build + * method. + */ class Builder { private var version: FlintVersion = FlintVersion.current() private var name: String = "" @@ -125,68 +133,67 @@ object FlintMetadata { private var schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() private var indexSettings: String = null - // Setters for each field - def version(version: FlintVersion): Builder = { + def version(version: FlintVersion): this.type = { this.version = version this } - def name(name: String): Builder = { + def name(name: String): this.type = { this.name = name this } - def kind(kind: String): Builder = { + def kind(kind: String): this.type = { this.kind = kind this } - def source(source: String): Builder = { + def source(source: String): this.type = { this.source = source this } - def options(options: util.Map[String, AnyRef]): Builder = { + def options(options: util.Map[String, AnyRef]): this.type = { this.options = options this } - def addOption(key: String, value: AnyRef): Builder = { + def addOption(key: String, value: AnyRef): this.type = { this.options.put(key, value) this } - def indexedColumns(indexedColumns: Array[util.Map[String, AnyRef]]): Builder = { + def indexedColumns(indexedColumns: Array[util.Map[String, AnyRef]]): this.type = { this.indexedColumns = indexedColumns this } - def addIndexedColumn(indexCol: util.Map[String, AnyRef]): Builder = { + def addIndexedColumn(indexCol: util.Map[String, AnyRef]): this.type = { indexedColumns = indexedColumns :+ indexCol this } - def properties(properties: util.Map[String, AnyRef]): Builder = { + def properties(properties: util.Map[String, AnyRef]): this.type = { this.properties = properties this } - def addProperty(key: String, value: AnyRef): Builder = { + def addProperty(key: String, value: AnyRef): this.type = { properties.put(key, value) this } - def schema(schema: util.Map[String, AnyRef]): Builder = { + def schema(schema: util.Map[String, AnyRef]): this.type = { this.schema = schema this } - def addSchemaField(key: String, value: AnyRef): Builder = { + def addSchemaField(key: String, value: AnyRef): this.type = { schema.put(key, value) this } - def indexSettings(indexSettings: String): Builder = { + def indexSettings(indexSettings: String): this.type = { this.indexSettings = indexSettings this } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 4a4885ecb..ee4775b6a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -7,9 +7,7 @@ package org.opensearch.flint.spark import scala.collection.JavaConverters._ -import org.json4s.{Formats, JArray, NoTypeHints} -import org.json4s.JsonAST.{JField, JObject} -import org.json4s.native.JsonMethods.parse +import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.FlintMetadata @@ -87,7 +85,6 @@ class FlintSpark(val spark: SparkSession) { } } else { val metadata = index.metadata() - index.options.indexSettings().foreach(metadata.setIndexSettings) flintClient.createIndex(indexName, metadata) } } @@ -105,7 +102,7 @@ class FlintSpark(val spark: SparkSession) { def refreshIndex(indexName: String, mode: RefreshMode): Option[String] = { val index = describeIndex(indexName) .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) - val tableName = getSourceTableName(index) + val tableName = index.metadata().source // Write Flint index data to Flint data source (shared by both refresh modes for now) def writeFlintIndex(df: DataFrame): Unit = { @@ -224,39 +221,16 @@ class FlintSpark(val spark: SparkSession) { } } - // TODO: Remove all parsing logic below once Flint spec finalized and FlintMetadata strong typed - private def getSourceTableName(index: FlintSparkIndex): String = { - val json = parse(index.metadata().getContent) - (json \ "_meta" \ "source").extract[String] - } - - /* - * For now, deserialize skipping strategies out of Flint metadata json - * ex. extract Seq(Partition("year", "int"), ValueList("name")) from - * { "_meta": { "indexedColumns": [ {...partition...}, {...value list...} ] } } - * - */ private def deserialize(metadata: FlintMetadata): FlintSparkIndex = { - val meta = parse(metadata.getContent) \ "_meta" - val indexName = (meta \ "name").extract[String] - val tableName = (meta \ "source").extract[String] - val indexType = (meta \ "kind").extract[String] - val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray] val indexOptions = FlintSparkIndexOptions( - (meta \ "options") - .asInstanceOf[JObject] - .obj - .map { case JField(key, value) => - key -> value.values.toString - } - .toMap) + metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap) - indexType match { + metadata.kind match { case SKIPPING_INDEX_TYPE => - val strategies = indexedColumns.arr.map { colInfo => - val skippingKind = SkippingKind.withName((colInfo \ "kind").extract[String]) - val columnName = (colInfo \ "columnName").extract[String] - val columnType = (colInfo \ "columnType").extract[String] + val strategies = metadata.indexedColumns.map { colInfo => + val skippingKind = SkippingKind.withName(getString(colInfo, "kind")) + val columnName = getString(colInfo, "columnName") + val columnType = getString(colInfo, "columnType") skippingKind match { case PARTITION => @@ -269,17 +243,21 @@ class FlintSpark(val spark: SparkSession) { throw new IllegalStateException(s"Unknown skipping strategy: $other") } } - new FlintSparkSkippingIndex(tableName, strategies, indexOptions) + new FlintSparkSkippingIndex(metadata.source, strategies, indexOptions) case COVERING_INDEX_TYPE => new FlintSparkCoveringIndex( - indexName, - tableName, - indexedColumns.arr.map { obj => - ((obj \ "columnName").extract[String], (obj \ "columnType").extract[String]) + metadata.name, + metadata.source, + metadata.indexedColumns.map { colInfo => + getString(colInfo, "columnName") -> getString(colInfo, "columnType") }.toMap, indexOptions) } } + + private def getString(map: java.util.Map[String, AnyRef], key: String): String = { + map.get(key).asInstanceOf[String] + } } object FlintSpark { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala new file mode 100644 index 000000000..5b51520cb --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.json4s.JObject +import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.spark.FlintSparkIndex.populateEnvToMetadata + +import org.apache.spark.sql.flint.datatype.FlintDataType +import org.apache.spark.sql.types.StructType + +/** + * Flint Spark metadata builder with common build logic. + */ +class FlintSparkIndexMetadataBuilder(index: FlintSparkIndex) extends FlintMetadata.Builder { + + def schema(allFieldTypes: Map[String, String]): FlintSparkIndexMetadataBuilder = { + val catalogDDL = + allFieldTypes + .map { case (colName, colType) => s"$colName $colType not null" } + .mkString(",") + val struckType = StructType.fromDDL(catalogDDL) + + // Assume each value is an JSON Object + struckType.fields.foreach(field => { + val (fieldName, fieldType) = FlintDataType.serializeField(field) + val fieldTypeMap = + fieldType + .asInstanceOf[JObject] + .values + .mapValues { + case v: Map[_, _] => v.asJava + case other => other + } + .asJava + addSchemaField(fieldName, fieldTypeMap) + }) + this + } + + override def build(): FlintMetadata = { + // Common fields in all Flint Spark index + kind(index.kind) + name(index.name()) + options(index.options.options.mapValues(_.asInstanceOf[AnyRef]).asJava) + + val envs = populateEnvToMetadata + if (envs.nonEmpty) { + addProperty("env", envs.asJava) + } + + val settings = index.options.indexSettings() + if (settings.isDefined) { + indexSettings(settings.get) + } + super.build() + } +} + +object FlintSparkIndexMetadataBuilder { + + def builder(index: FlintSparkIndex): FlintSparkIndexMetadataBuilder = + new FlintSparkIndexMetadataBuilder(index) +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 3db325c3e..ef6203a8b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -5,19 +5,15 @@ package org.opensearch.flint.spark.covering -import org.json4s.{Formats, NoTypeHints} -import org.json4s.JsonAST.{JArray, JObject, JString} -import org.json4s.native.JsonMethods.{compact, parse, render} -import org.json4s.native.Serialization +import scala.collection.JavaConverters.mapAsJavaMapConverter + import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} -import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata} +import org.opensearch.flint.spark._ +import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE} import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.flint.datatype.FlintDataType -import org.apache.spark.sql.types.StructType /** * Flint covering index in Spark. @@ -38,62 +34,27 @@ case class FlintSparkCoveringIndex( require(indexedColumns.nonEmpty, "indexed columns must not be empty") - /** Required by json4s write function */ - implicit val formats: Formats = Serialization.formats(NoTypeHints) - override val kind: String = COVERING_INDEX_TYPE override def name(): String = getFlintIndexName(indexName, tableName) override def metadata(): FlintMetadata = { - new FlintMetadata(s"""{ - | "_meta": { - | "name": "$indexName", - | "kind": "$kind", - | "indexedColumns": $getMetaInfo, - | "source": "$tableName", - | "options": $getIndexOptions, - | "properties": $getIndexProperties - | }, - | "properties": $getSchema - | } - |""".stripMargin) + val builder = FlintSparkIndexMetadataBuilder.builder(this) + + indexedColumns.map { case (colName, colType) => + builder.addIndexedColumn( + Map[String, AnyRef]("columnName" -> colName, "columnType" -> colType).asJava) + } + builder + .source(tableName) + .schema(indexedColumns) + .build() } override def build(df: DataFrame): DataFrame = { val colNames = indexedColumns.keys.toSeq df.select(colNames.head, colNames.tail: _*) } - - // TODO: refactor all these once Flint metadata spec finalized - private def getMetaInfo: String = { - val objects = indexedColumns.map { case (colName, colType) => - JObject("columnName" -> JString(colName), "columnType" -> JString(colType)) - }.toList - Serialization.write(JArray(objects)) - } - - private def getIndexOptions: String = { - Serialization.write(options.options) - } - - private def getIndexProperties: String = { - val envMap = populateEnvToMetadata - if (envMap.isEmpty) { - "{}" - } else { - s"""{ "env": ${Serialization.write(envMap)} }""" - } - } - - private def getSchema: String = { - val catalogDDL = - indexedColumns - .map { case (colName, colType) => s"$colName $colType not null" } - .mkString(",") - val properties = FlintDataType.serialize(StructType.fromDDL(catalogDDL)) - compact(render(parse(properties) \ "properties")) - } } object FlintSparkCoveringIndex { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index dd9cb6bdf..ef6636212 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -5,25 +5,20 @@ package org.opensearch.flint.spark.skipping -import org.json4s._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization -import org.opensearch.flint.core.FlintVersion +import scala.collection.JavaConverters.mapAsJavaMapConverter + import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} -import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata, ID_COLUMN} +import org.opensearch.flint.spark._ +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} -import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression -import org.apache.spark.sql.flint.datatype.FlintDataType import org.apache.spark.sql.functions.{col, input_file_name, sha1} -import org.apache.spark.sql.types.StructType /** * Flint skipping index in Spark. @@ -41,9 +36,6 @@ class FlintSparkSkippingIndex( require(indexedColumns.nonEmpty, "indexed columns must not be empty") - /** Required by json4s write function */ - implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer - /** Skipping index type */ override val kind: String = SKIPPING_INDEX_TYPE @@ -52,19 +44,22 @@ class FlintSparkSkippingIndex( } override def metadata(): FlintMetadata = { - new FlintMetadata(s"""{ - | "_meta": { - | "name": "${name()}", - | "version": "${FlintVersion.current()}", - | "kind": "$SKIPPING_INDEX_TYPE", - | "indexedColumns": $getMetaInfo, - | "source": "$tableName", - | "options": $getIndexOptions, - | "properties": $getIndexProperties - | }, - | "properties": $getSchema - | } - |""".stripMargin) + val builder = FlintSparkIndexMetadataBuilder.builder(this) + + indexedColumns.map(col => + builder.addIndexedColumn( + Map[String, AnyRef]( + "kind" -> col.kind.toString, + "columnName" -> col.columnName, + "columnType" -> col.columnType).asJava)) + + val allFieldTypes = + indexedColumns + .flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string") + builder + .source(tableName) + .schema(allFieldTypes) + .build() } override def build(df: DataFrame): DataFrame = { @@ -81,36 +76,6 @@ class FlintSparkSkippingIndex( .agg(namedAggFuncs.head, namedAggFuncs.tail: _*) .withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN))) } - - private def getMetaInfo: String = { - Serialization.write(indexedColumns) - } - - private def getIndexOptions: String = { - Serialization.write(options.options) - } - - private def getIndexProperties: String = { - val envMap = populateEnvToMetadata - if (envMap.isEmpty) { - "{}" - } else { - s"""{ "env": ${Serialization.write(envMap)} }""" - } - } - - private def getSchema: String = { - val allFieldTypes = - indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string") - val catalogDDL = - allFieldTypes - .map { case (colName, colType) => s"$colName $colType not null" } - .mkString(",") - val allFieldSparkTypes = StructType.fromDDL(catalogDDL) - // Convert StructType to {"properties": ...} and only need the properties value - val properties = FlintDataType.serialize(allFieldSparkTypes) - compact(render(parse(properties) \ "properties")) - } } object FlintSparkSkippingIndex { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index b31e18480..a3961bb51 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -5,11 +5,14 @@ package org.opensearch.flint.spark.skipping +import scala.collection.JavaConverters.mapAsJavaMapConverter + import org.json4s.native.JsonMethods.parse import org.mockito.Mockito.when import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN -import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind import org.scalatest.matchers.must.Matchers.contain import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.scalatestplus.mockito.MockitoSugar.mock @@ -27,6 +30,25 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { index.name() shouldBe "flint_spark_catalog_default_test_skipping_index" } + test("get index metadata") { + val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) + when(indexCol.columnName).thenReturn("test_field") + when(indexCol.columnType).thenReturn("integer") + when(indexCol.outputSchema()).thenReturn(Map("test_field" -> "integer")) + val index = new FlintSparkSkippingIndex(testTable, Seq(indexCol)) + + val metadata = index.metadata() + metadata.kind shouldBe SKIPPING_INDEX_TYPE + metadata.name shouldBe index.name() + metadata.source shouldBe testTable + metadata.indexedColumns shouldBe Array( + Map( + "kind" -> SkippingKind.PARTITION.toString, + "columnName" -> "test_field", + "columnType" -> "integer").asJava) + } + test("can build index building job with unique ID column") { val indexCol = mock[FlintSparkSkippingStrategy] when(indexCol.outputSchema()).thenReturn(Map("name" -> "string")) @@ -40,6 +62,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for boolean column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("boolean_col" -> "boolean")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("boolean_col").expr))) @@ -59,6 +82,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for string column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("string_col" -> "string")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("string_col").expr))) @@ -80,6 +104,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for varchar column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("varchar_col" -> "varchar(20)")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("varchar_col").expr))) @@ -99,6 +124,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for char column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("char_col" -> "char(20)")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("char_col").expr))) @@ -118,6 +144,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for long column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("long_col" -> "bigint")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("long_col").expr))) @@ -137,6 +164,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for int column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("int_col" -> "int")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("int_col").expr))) @@ -156,6 +184,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for short column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("short_col" -> "smallint")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("short_col").expr))) @@ -175,6 +204,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for byte column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("byte_col" -> "tinyint")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("byte_col").expr))) @@ -194,6 +224,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for double column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("double_col" -> "double")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("double_col").expr))) @@ -213,6 +244,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for float column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("float_col" -> "float")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("float_col").expr))) @@ -232,6 +264,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for timestamp column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("timestamp_col" -> "timestamp")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("timestamp_col").expr))) @@ -252,6 +285,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for date column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()).thenReturn(Map("date_col" -> "date")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("date_col").expr))) @@ -272,6 +306,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("can build index for struct column") { val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.kind).thenReturn(SkippingKind.PARTITION) when(indexCol.outputSchema()) .thenReturn(Map("struct_col" -> "struct")) when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("struct_col").expr))) diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 9d34b6f2a..d33343d81 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -11,6 +11,7 @@ import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization +import org.mockito.Mockito.when import org.opensearch.client.json.jackson.JacksonJsonpMapper import org.opensearch.client.opensearch.OpenSearchClient import org.opensearch.client.transport.rest_client.RestClientTransport @@ -19,6 +20,7 @@ import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY @@ -34,7 +36,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val content = """ { | "_meta": { - | "kind": "SkippingIndex" + | "kind": "test_kind" | }, | "properties": { | "age": { @@ -43,41 +45,49 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M | } | } |""".stripMargin - flintClient.createIndex(indexName, new FlintMetadata(content)) + + val metadata = mock[FlintMetadata] + when(metadata.getContent).thenReturn(content) + flintClient.createIndex(indexName, metadata) flintClient.exists(indexName) shouldBe true - flintClient.getIndexMetadata(indexName).getContent should matchJson(content) + flintClient.getIndexMetadata(indexName).kind shouldBe "test_kind" } it should "create index with settings" in { val indexName = "flint_test_with_settings" val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" - flintClient.createIndex(indexName, new FlintMetadata("{}", indexSettings)) + val metadata = mock[FlintMetadata] + when(metadata.getContent).thenReturn("{}") + when(metadata.indexSettings).thenReturn(indexSettings) + flintClient.createIndex(indexName, metadata) flintClient.exists(indexName) shouldBe true // OS uses full setting name ("index" prefix) and store as string implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(indexName).getIndexSettings) + val settings = parse(flintClient.getIndexMetadata(indexName).indexSettings) (settings \ "index.number_of_shards").extract[String] shouldBe "3" (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } it should "get all index metadata with the given index name pattern" in { - flintClient.createIndex("flint_test_1_index", new FlintMetadata("{}")) - flintClient.createIndex("flint_test_2_index", new FlintMetadata("{}")) + val metadata = mock[FlintMetadata] + when(metadata.getContent).thenReturn("{}") + flintClient.createIndex("flint_test_1_index", metadata) + flintClient.createIndex("flint_test_2_index", metadata) val allMetadata = flintClient.getAllIndexMetadata("flint_*_index") allMetadata should have size 2 - allMetadata.forEach(metadata => metadata.getContent shouldBe "{}") - allMetadata.forEach(metadata => metadata.getIndexSettings should not be empty) + allMetadata.forEach(metadata => metadata.getContent should not be empty) + allMetadata.forEach(metadata => metadata.indexSettings should not be empty) } it should "convert index name to all lowercase" in { val indexName = "flint_ELB_logs_index" flintClient.createIndex( indexName, - new FlintMetadata("""{"properties": {"test": { "type": "integer" } } }""")) + FlintMetadata.fromJson("""{"properties": {"test": { "type": "integer" } } }""", null)) flintClient.exists(indexName) shouldBe true flintClient.getIndexMetadata(indexName) should not be null diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 892a8faa4..e197c0f53 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -89,7 +89,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(testFlintIndex).getIndexSettings) + val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings) (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index a688b1370..45af12047 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -94,7 +94,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(testIndex).getIndexSettings) + val settings = parse(flintClient.getIndexMetadata(testIndex).indexSettings) (settings \ "index.number_of_shards").extract[String] shouldBe "3" (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } From 02b438f5531da98e06db587559684839f85b4e03 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 11 Oct 2023 20:28:34 -0700 Subject: [PATCH 05/16] Refactor getContent with XContent helper Signed-off-by: Chen Dai --- .../flint/core/metadata/FlintMetadata.scala | 56 +++++++++---------- .../core/metadata/XContentBuilderHelper.scala | 34 +++++++++++ 2 files changed, 61 insertions(+), 29 deletions(-) create mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index 7697e2d1f..c042e3784 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -8,10 +8,10 @@ package org.opensearch.flint.core.metadata import java.nio.charset.StandardCharsets.UTF_8 import java.util -import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent._ import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.flint.core.FlintVersion +import org.opensearch.flint.core.metadata.XContentBuilderHelper.{buildJson, objectField, optionalObjectField} /** * Flint metadata follows Flint index specification and defines metadata for a Flint index @@ -30,39 +30,37 @@ case class FlintMetadata( def getContent: String = { try { - val builder: XContentBuilder = XContentFactory.jsonBuilder - builder.startObject - - builder.startObject("_meta") - builder.field( - "version", - if (version == null) FlintVersion.current().version else version.version) - builder.field("name", name) - builder.field("kind", kind) - builder.field("source", source) - builder.field("indexedColumns", indexedColumns) - - if (options == null) { - builder.startObject("options").endObject() - } else { - builder.field("options", options) - } - - if (properties == null) { - builder.startObject("properties").endObject() - } else { - builder.field("properties", properties) - } - - builder.endObject - builder.field("properties", schema) - builder.endObject - BytesReference.bytes(builder).utf8ToString + buildJson(builder => { + // Add _meta field + objectField( + builder, + "_meta", { + builder.field("version", versionOrDefault()) + builder.field("name", name) + builder.field("kind", kind) + builder.field("source", source) + builder.field("indexedColumns", indexedColumns) + + optionalObjectField(builder, "options", options) + optionalObjectField(builder, "properties", properties) + }) + + // Add properties (schema) field + builder.field("properties", schema) + }) } catch { case e: Exception => throw new IllegalStateException("Failed to jsonify Flint metadata", e) } } + + private def versionOrDefault(): String = { + if (version == null) { + FlintVersion.current().version + } else { + version.version + } + } } object FlintMetadata { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala new file mode 100644 index 000000000..d0ab58c95 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata + +import org.opensearch.common.bytes.BytesReference +import org.opensearch.common.xcontent.{XContentBuilder, XContentFactory} + +object XContentBuilderHelper { + + def buildJson(block: XContentBuilder => Unit): String = { + val builder: XContentBuilder = XContentFactory.jsonBuilder + builder.startObject + block(builder) + builder.endObject() + BytesReference.bytes(builder).utf8ToString + } + + def objectField(builder: XContentBuilder, name: String, block: => Unit): Unit = { + builder.startObject(name) + block + builder.endObject() + } + + def optionalObjectField(builder: XContentBuilder, name: String, value: AnyRef): Unit = { + if (value == null) { + builder.startObject(name).endObject() + } else { + builder.field(name, value) + } + } +} From a9ea6804b74effafa680944830118267b8df2bcc Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 11 Oct 2023 20:47:39 -0700 Subject: [PATCH 06/16] Refactor fromJson with XContent helper Signed-off-by: Chen Dai --- .../flint/core/metadata/FlintMetadata.scala | 73 ++++++++----------- .../core/metadata/XContentBuilderHelper.scala | 31 +++++++- 2 files changed, 59 insertions(+), 45 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index c042e3784..21ef83e1c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -5,13 +5,10 @@ package org.opensearch.flint.core.metadata -import java.nio.charset.StandardCharsets.UTF_8 import java.util -import org.opensearch.common.xcontent._ -import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.flint.core.FlintVersion -import org.opensearch.flint.core.metadata.XContentBuilderHelper.{buildJson, objectField, optionalObjectField} +import org.opensearch.flint.core.metadata.XContentBuilderHelper._ /** * Flint metadata follows Flint index specification and defines metadata for a Flint index @@ -66,52 +63,40 @@ case class FlintMetadata( object FlintMetadata { def fromJson(content: String, settings: String): FlintMetadata = { + val builder = new FlintMetadata.Builder() try { - val parser: XContentParser = JsonXContent.jsonXContent.createParser( - NamedXContentRegistry.EMPTY, - DeprecationHandler.IGNORE_DEPRECATIONS, - content.getBytes(UTF_8)) - parser.nextToken() // Start parsing - - val builder = new FlintMetadata.Builder() - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - val fieldName: String = parser.currentName() - parser.nextToken() // Move to the field value - - if ("_meta".equals(fieldName)) { - parseMetaField(parser, builder) - } else if ("properties".equals(fieldName)) { - builder.schema(parser.map()) - } - } - - builder.indexSettings(settings) - builder.build() + parseJson( + content, + (parser, fieldName) => + fieldName match { + case "_meta" => + parseObjectField( + parser, + (parser, innerFieldName) => + innerFieldName match { + case "version" => builder.version(FlintVersion.apply(parser.text())) + case "name" => builder.name(parser.text()) + case "kind" => builder.kind(parser.text()) + case "source" => builder.source(parser.text()) + case "indexedColumns" => + parseArrayField( + parser, { + builder.addIndexedColumn(parser.map()) + }) + case "options" => builder.options(parser.map()) + case "properties" => builder.properties(parser.map()) + case _ => // Handle other fields as needed + }) + case "properties" => + builder.schema(parser.map()) + }) } catch { case e: Exception => throw new IllegalStateException("Failed to parse metadata JSON", e) } - } - private def parseMetaField(parser: XContentParser, builder: Builder): Unit = { - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - val metaFieldName: String = parser.currentName() - parser.nextToken() - - metaFieldName match { - case "version" => builder.version(FlintVersion.apply(parser.text())) - case "name" => builder.name(parser.text()) - case "kind" => builder.kind(parser.text()) - case "source" => builder.source(parser.text()) - case "indexedColumns" => - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - builder.addIndexedColumn(parser.map()) - } - case "options" => builder.options(parser.map()) - case "properties" => builder.properties(parser.map()) - case _ => // Handle other fields as needed - } - } + builder.indexSettings(settings) + builder.build() } def builder(): FlintMetadata.Builder = new Builder diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala index d0ab58c95..959c60eae 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala @@ -5,8 +5,11 @@ package org.opensearch.flint.core.metadata +import java.nio.charset.StandardCharsets.UTF_8 + import org.opensearch.common.bytes.BytesReference -import org.opensearch.common.xcontent.{XContentBuilder, XContentFactory} +import org.opensearch.common.xcontent._ +import org.opensearch.common.xcontent.json.JsonXContent object XContentBuilderHelper { @@ -31,4 +34,30 @@ object XContentBuilderHelper { builder.field(name, value) } } + + def parseJson(json: String, block: (XContentParser, String) => Unit): Unit = { + val parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.IGNORE_DEPRECATIONS, + json.getBytes(UTF_8)) + + // Start parsing + parser.nextToken() + parseObjectField(parser, block) + } + + def parseObjectField(parser: XContentParser, block: (XContentParser, String) => Unit): Unit = { + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName: String = parser.currentName() + parser.nextToken() // Move to the field value + + block(parser, fieldName) + } + } + + def parseArrayField(parser: XContentParser, block: => Unit): Unit = { + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + block + } + } } From c39f30bbe1cf40c115d396aa7c050e382b05921f Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 11 Oct 2023 20:58:45 -0700 Subject: [PATCH 07/16] Refactor method arg for readability Signed-off-by: Chen Dai --- .../flint/core/metadata/FlintMetadata.scala | 35 ++++++++++--------- .../core/metadata/XContentBuilderHelper.scala | 6 ++-- .../core/storage/FlintOpenSearchClient.java | 4 +-- .../core/metadata/FlintMetadataSuite.scala | 2 +- .../core/FlintOpenSearchClientSuite.scala | 2 +- 5 files changed, 25 insertions(+), 24 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index 21ef83e1c..684f892fb 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -29,18 +29,16 @@ case class FlintMetadata( try { buildJson(builder => { // Add _meta field - objectField( - builder, - "_meta", { - builder.field("version", versionOrDefault()) - builder.field("name", name) - builder.field("kind", kind) - builder.field("source", source) - builder.field("indexedColumns", indexedColumns) - - optionalObjectField(builder, "options", options) - optionalObjectField(builder, "properties", properties) - }) + objectField(builder, "_meta") { + builder.field("version", versionOrDefault()) + builder.field("name", name) + builder.field("kind", kind) + builder.field("source", source) + builder.field("indexedColumns", indexedColumns) + + optionalObjectField(builder, "options", options) + optionalObjectField(builder, "properties", properties) + } // Add properties (schema) field builder.field("properties", schema) @@ -62,7 +60,11 @@ case class FlintMetadata( object FlintMetadata { - def fromJson(content: String, settings: String): FlintMetadata = { + def apply(content: String): FlintMetadata = { + FlintMetadata(content, null) + } + + def apply(content: String, settings: String): FlintMetadata = { val builder = new FlintMetadata.Builder() try { parseJson( @@ -79,10 +81,9 @@ object FlintMetadata { case "kind" => builder.kind(parser.text()) case "source" => builder.source(parser.text()) case "indexedColumns" => - parseArrayField( - parser, { - builder.addIndexedColumn(parser.map()) - }) + parseArrayField(parser) { + builder.addIndexedColumn(parser.map()) + } case "options" => builder.options(parser.map()) case "properties" => builder.properties(parser.map()) case _ => // Handle other fields as needed diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala index 959c60eae..f9e315f20 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala @@ -21,7 +21,7 @@ object XContentBuilderHelper { BytesReference.bytes(builder).utf8ToString } - def objectField(builder: XContentBuilder, name: String, block: => Unit): Unit = { + def objectField(builder: XContentBuilder, name: String)(block: => Unit): Unit = { builder.startObject(name) block builder.endObject() @@ -41,7 +41,7 @@ object XContentBuilderHelper { DeprecationHandler.IGNORE_DEPRECATIONS, json.getBytes(UTF_8)) - // Start parsing + // Read first root object token and start parsing parser.nextToken() parseObjectField(parser, block) } @@ -55,7 +55,7 @@ object XContentBuilderHelper { } } - def parseArrayField(parser: XContentParser, block: => Unit): Unit = { + def parseArrayField(parser: XContentParser)(block: => Unit): Unit = { while (parser.nextToken() != XContentParser.Token.END_ARRAY) { block } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 38283de80..6aaa3081c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -98,7 +98,7 @@ public FlintOpenSearchClient(FlintOptions options) { GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); return Arrays.stream(response.getIndices()) - .map(index -> FlintMetadata.fromJson( + .map(index -> FlintMetadata.apply( response.getMappings().get(index).source().toString(), response.getSettings().get(index).toString())) .collect(Collectors.toList()); @@ -115,7 +115,7 @@ public FlintOpenSearchClient(FlintOptions options) { MappingMetadata mapping = response.getMappings().get(osIndexName); Settings settings = response.getSettings().get(osIndexName); - return FlintMetadata.fromJson(mapping.source().string(), settings.toString()); + return FlintMetadata.apply(mapping.source().string(), settings.toString()); } catch (Exception e) { throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala index f2790f5b4..28814b38b 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala @@ -43,7 +43,7 @@ class FlintMetadataSuite extends AnyFlatSpec with Matchers { |""".stripMargin "constructor" should "deserialize the given JSON and assign parsed value to field" in { - val metadata = FlintMetadata.fromJson(testMetadataJson, testIndexSettingsJson) + val metadata = FlintMetadata(testMetadataJson, testIndexSettingsJson) metadata.version shouldBe current() metadata.name shouldBe "test_index" diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index d33343d81..8358ef007 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -87,7 +87,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val indexName = "flint_ELB_logs_index" flintClient.createIndex( indexName, - FlintMetadata.fromJson("""{"properties": {"test": { "type": "integer" } } }""", null)) + FlintMetadata("""{"properties": {"test": { "type": "integer" } } }""")) flintClient.exists(indexName) shouldBe true flintClient.getIndexMetadata(indexName) should not be null From cdab8261be92611e311e63980cffa632a5d19d27 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 12 Oct 2023 08:17:40 -0700 Subject: [PATCH 08/16] Refactor method arg for readability Signed-off-by: Chen Dai --- .../flint/core/metadata/FlintMetadata.scala | 32 ++++++++++--------- .../core/metadata/XContentBuilderHelper.scala | 6 ++-- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index 684f892fb..c58afeb17 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -60,21 +60,21 @@ case class FlintMetadata( object FlintMetadata { - def apply(content: String): FlintMetadata = { - FlintMetadata(content, null) + def apply(content: String, settings: String): FlintMetadata = { + val metadata = FlintMetadata(content) + metadata.copy(indexSettings = settings) + metadata } - def apply(content: String, settings: String): FlintMetadata = { - val builder = new FlintMetadata.Builder() + def apply(content: String): FlintMetadata = { try { - parseJson( - content, - (parser, fieldName) => + val builder = new FlintMetadata.Builder() + parseJson(content) { (parser, fieldName) => + { fieldName match { case "_meta" => - parseObjectField( - parser, - (parser, innerFieldName) => + parseObjectField(parser) { (parser, innerFieldName) => + { innerFieldName match { case "version" => builder.version(FlintVersion.apply(parser.text())) case "name" => builder.name(parser.text()) @@ -87,17 +87,19 @@ object FlintMetadata { case "options" => builder.options(parser.map()) case "properties" => builder.properties(parser.map()) case _ => // Handle other fields as needed - }) + } + } + } case "properties" => builder.schema(parser.map()) - }) + } + } + } + builder.build() } catch { case e: Exception => throw new IllegalStateException("Failed to parse metadata JSON", e) } - - builder.indexSettings(settings) - builder.build() } def builder(): FlintMetadata.Builder = new Builder diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala index f9e315f20..4e6b09da1 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala @@ -35,7 +35,7 @@ object XContentBuilderHelper { } } - def parseJson(json: String, block: (XContentParser, String) => Unit): Unit = { + def parseJson(json: String)(block: (XContentParser, String) => Unit): Unit = { val parser = JsonXContent.jsonXContent.createParser( NamedXContentRegistry.EMPTY, DeprecationHandler.IGNORE_DEPRECATIONS, @@ -43,10 +43,10 @@ object XContentBuilderHelper { // Read first root object token and start parsing parser.nextToken() - parseObjectField(parser, block) + parseObjectField(parser)(block) } - def parseObjectField(parser: XContentParser, block: (XContentParser, String) => Unit): Unit = { + def parseObjectField(parser: XContentParser)(block: (XContentParser, String) => Unit): Unit = { while (parser.nextToken() != XContentParser.Token.END_OBJECT) { val fieldName: String = parser.currentName() parser.nextToken() // Move to the field value From 977aa3f34d71ded72bc304f5f36cce54eeec3886 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 12 Oct 2023 08:47:39 -0700 Subject: [PATCH 09/16] Fix complex schema issue Signed-off-by: Chen Dai --- .../flint/core/metadata/FlintMetadata.scala | 28 ++++++++++++----- .../core/metadata/XContentBuilderHelper.scala | 8 +++-- .../core/metadata/FlintMetadataSuite.scala | 2 +- .../flint/spark/FlintSparkIndex.scala | 31 +++++++++++++++++++ .../FlintSparkIndexMetadataBuilder.scala | 19 ++---------- 5 files changed, 62 insertions(+), 26 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index c58afeb17..2795fb258 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -25,6 +25,12 @@ case class FlintMetadata( schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], indexSettings: String = null) { + /** + * Generate JSON content as index metadata. + * + * @return + * JSON content + */ def getContent: String = { try { buildJson(builder => { @@ -66,6 +72,14 @@ object FlintMetadata { metadata } + /** + * Parse the given JSON content and construct Flint metadata class. + * + * @param content + * JSON content + * @return + * Flint metadata + */ def apply(content: String): FlintMetadata = { try { val builder = new FlintMetadata.Builder() @@ -144,11 +158,6 @@ object FlintMetadata { this } - def addOption(key: String, value: AnyRef): this.type = { - this.options.put(key, value) - this - } - def indexedColumns(indexedColumns: Array[util.Map[String, AnyRef]]): this.type = { this.indexedColumns = indexedColumns this @@ -174,8 +183,13 @@ object FlintMetadata { this } - def addSchemaField(key: String, value: AnyRef): this.type = { - schema.put(key, value) + def schema(schema: String): this.type = { + parseJson(schema) { (parser, fieldName) => + fieldName match { + case "properties" => this.schema = parser.map() + case _ => // do nothing + } + } this } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala index 4e6b09da1..56b6cc232 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala @@ -35,11 +35,15 @@ object XContentBuilderHelper { } } - def parseJson(json: String)(block: (XContentParser, String) => Unit): Unit = { - val parser = JsonXContent.jsonXContent.createParser( + def createJsonParser(json: String): XContentParser = { + JsonXContent.jsonXContent.createParser( NamedXContentRegistry.EMPTY, DeprecationHandler.IGNORE_DEPRECATIONS, json.getBytes(UTF_8)) + } + + def parseJson(json: String)(block: (XContentParser, String) => Unit): Unit = { + val parser = createJsonParser(json) // Read first root object token and start parsing parser.nextToken() diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala index 28814b38b..dc2f5fe6a 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/metadata/FlintMetadataSuite.scala @@ -59,7 +59,7 @@ class FlintMetadataSuite extends AnyFlatSpec with Matchers { builder.kind("test_kind") builder.source("test_source_table") builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava); - builder.addSchemaField("test_field", Map("type" -> "os_type").asJava) + builder.schema("""{"properties": {"test_field": {"type": "os_type"}}}""") val metadata = builder.build() metadata.getContent should matchJson(testMetadataJson) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index a19e603dc..09102dc46 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.spark +import scala.collection.JavaConverters.mapAsJavaMapConverter + import org.opensearch.flint.core.metadata.FlintMetadata import org.apache.spark.sql.DataFrame @@ -81,4 +83,33 @@ object FlintSparkIndex { .map(value => key -> value)) .toMap } + + /** + * Create Flint metadata builder with common fields. + * + * @param index + * Flint index + * @return + * Flint metadata builder + */ + def metadataBuilder(index: FlintSparkIndex): FlintMetadata.Builder = { + val builder = new FlintMetadata.Builder() + // Common fields + builder.kind(index.kind) + builder.name(index.name()) + builder.options(index.options.options.mapValues(_.asInstanceOf[AnyRef]).asJava) + + // Index properties + val envs = populateEnvToMetadata + if (envs.nonEmpty) { + builder.addProperty("env", envs.asJava) + } + + // Optional index settings + val settings = index.options.indexSettings() + if (settings.isDefined) { + builder.indexSettings(settings.get) + } + builder + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala index 5b51520cb..d9fe7b475 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala @@ -24,22 +24,9 @@ class FlintSparkIndexMetadataBuilder(index: FlintSparkIndex) extends FlintMetada allFieldTypes .map { case (colName, colType) => s"$colName $colType not null" } .mkString(",") - val struckType = StructType.fromDDL(catalogDDL) - - // Assume each value is an JSON Object - struckType.fields.foreach(field => { - val (fieldName, fieldType) = FlintDataType.serializeField(field) - val fieldTypeMap = - fieldType - .asInstanceOf[JObject] - .values - .mapValues { - case v: Map[_, _] => v.asJava - case other => other - } - .asJava - addSchemaField(fieldName, fieldTypeMap) - }) + val structType = StructType.fromDDL(catalogDDL) + val properties = FlintDataType.serialize(structType) + schema(properties) this } From ded6831061ef6ab799922122dca979ba34438409 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 12 Oct 2023 09:09:38 -0700 Subject: [PATCH 10/16] Remove custom metadata builder Signed-off-by: Chen Dai --- .../flint/core/metadata/FlintMetadata.scala | 1 - .../flint/spark/FlintSparkIndex.scala | 14 ++++- .../FlintSparkIndexMetadataBuilder.scala | 56 ------------------- .../covering/FlintSparkCoveringIndex.scala | 18 +++--- .../skipping/FlintSparkSkippingIndex.scala | 32 ++++++----- 5 files changed, 40 insertions(+), 81 deletions(-) delete mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index 2795fb258..a8f165ceb 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -69,7 +69,6 @@ object FlintMetadata { def apply(content: String, settings: String): FlintMetadata = { val metadata = FlintMetadata(content) metadata.copy(indexSettings = settings) - metadata } /** diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 09102dc46..a9d16ce6f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -6,10 +6,10 @@ package org.opensearch.flint.spark import scala.collection.JavaConverters.mapAsJavaMapConverter - import org.opensearch.flint.core.metadata.FlintMetadata - import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.flint.datatype.FlintDataType +import org.apache.spark.sql.types.StructType /** * Flint index interface in Spark. @@ -112,4 +112,14 @@ object FlintSparkIndex { } builder } + + def generateSchemaJSON(allFieldTypes: Map[String, String]): String = { + val catalogDDL = + allFieldTypes + .map { case (colName, colType) => s"$colName $colType not null" } + .mkString(",") + + val structType = StructType.fromDDL(catalogDDL) + FlintDataType.serialize(structType) + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala deleted file mode 100644 index d9fe7b475..000000000 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMetadataBuilder.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.spark - -import scala.collection.JavaConverters.mapAsJavaMapConverter - -import org.json4s.JObject -import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.spark.FlintSparkIndex.populateEnvToMetadata - -import org.apache.spark.sql.flint.datatype.FlintDataType -import org.apache.spark.sql.types.StructType - -/** - * Flint Spark metadata builder with common build logic. - */ -class FlintSparkIndexMetadataBuilder(index: FlintSparkIndex) extends FlintMetadata.Builder { - - def schema(allFieldTypes: Map[String, String]): FlintSparkIndexMetadataBuilder = { - val catalogDDL = - allFieldTypes - .map { case (colName, colType) => s"$colName $colType not null" } - .mkString(",") - val structType = StructType.fromDDL(catalogDDL) - val properties = FlintDataType.serialize(structType) - schema(properties) - this - } - - override def build(): FlintMetadata = { - // Common fields in all Flint Spark index - kind(index.kind) - name(index.name()) - options(index.options.options.mapValues(_.asInstanceOf[AnyRef]).asJava) - - val envs = populateEnvToMetadata - if (envs.nonEmpty) { - addProperty("env", envs.asJava) - } - - val settings = index.options.indexSettings() - if (settings.isDefined) { - indexSettings(settings.get) - } - super.build() - } -} - -object FlintSparkIndexMetadataBuilder { - - def builder(index: FlintSparkIndex): FlintSparkIndexMetadataBuilder = - new FlintSparkIndexMetadataBuilder(index) -} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index ef6203a8b..d89b2a844 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -9,7 +9,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark._ -import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE} @@ -39,15 +39,17 @@ case class FlintSparkCoveringIndex( override def name(): String = getFlintIndexName(indexName, tableName) override def metadata(): FlintMetadata = { - val builder = FlintSparkIndexMetadataBuilder.builder(this) - - indexedColumns.map { case (colName, colType) => - builder.addIndexedColumn( - Map[String, AnyRef]("columnName" -> colName, "columnType" -> colType).asJava) + val indexColumnMaps = { + indexedColumns.map { case (colName, colType) => + Map[String, AnyRef]("columnName" -> colName, "columnType" -> colType).asJava + }.toArray } - builder + val schemaJson = generateSchemaJSON(indexedColumns) + + metadataBuilder(this) .source(tableName) - .schema(indexedColumns) + .indexedColumns(indexColumnMaps) + .schema(schemaJson) .build() } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index ef6636212..020d98cda 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -9,7 +9,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark._ -import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN} +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, ID_COLUMN} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy @@ -44,21 +44,25 @@ class FlintSparkSkippingIndex( } override def metadata(): FlintMetadata = { - val builder = FlintSparkIndexMetadataBuilder.builder(this) - - indexedColumns.map(col => - builder.addIndexedColumn( - Map[String, AnyRef]( - "kind" -> col.kind.toString, - "columnName" -> col.columnName, - "columnType" -> col.columnType).asJava)) - - val allFieldTypes = + val indexColumnMaps = indexedColumns - .flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string") - builder + .map(col => + Map[String, AnyRef]( + "kind" -> col.kind.toString, + "columnName" -> col.columnName, + "columnType" -> col.columnType).asJava) + .toArray + + val fieldTypes = + indexedColumns + .flatMap(_.outputSchema()) + .toMap + (FILE_PATH_COLUMN -> "string") + val schemaJson = generateSchemaJSON(fieldTypes) + + metadataBuilder(this) .source(tableName) - .schema(allFieldTypes) + .indexedColumns(indexColumnMaps) + .schema(schemaJson) .build() } From e15a23fd80032fd3a05ac32efcb12d9b94a15478 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 12 Oct 2023 09:34:00 -0700 Subject: [PATCH 11/16] Change indexSettings to Option Signed-off-by: Chen Dai --- .../flint/core/metadata/FlintMetadata.scala | 35 +++++++++++-------- .../core/storage/FlintOpenSearchClient.java | 6 ++-- .../core/FlintOpenSearchClientSuite.scala | 7 ++-- .../FlintSparkCoveringIndexSqlITSuite.scala | 2 +- .../FlintSparkSkippingIndexSqlITSuite.scala | 2 +- 5 files changed, 31 insertions(+), 21 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index a8f165ceb..cdf0cd0cb 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.core.metadata import java.util import org.opensearch.flint.core.FlintVersion +import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.core.metadata.XContentBuilderHelper._ /** @@ -15,15 +16,29 @@ import org.opensearch.flint.core.metadata.XContentBuilderHelper._ * regardless of query engine integration and storage. */ case class FlintMetadata( - version: FlintVersion = FlintVersion.current(), + /** Flint spec version */ + version: FlintVersion, + /** Flint index name */ name: String, + /** Flint index kind */ kind: String, + /** Flint index source that index data derived from */ source: String, + /** Flint indexed column list */ indexedColumns: Array[util.Map[String, AnyRef]] = Array(), + /** Flint indexed options. TODO: move to properties? */ options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], + /** Flint index properties for any custom fields */ properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], + /** Flint index schema */ schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], - indexSettings: String = null) { + /** Optional Flint index settings. TODO: move elsewhere? */ + indexSettings: Option[String]) { + + require(version != null, "version is required") + require(name != null, "name is required") + require(kind != null, "kind is required") + require(source != null, "source is required") /** * Generate JSON content as index metadata. @@ -36,7 +51,7 @@ case class FlintMetadata( buildJson(builder => { // Add _meta field objectField(builder, "_meta") { - builder.field("version", versionOrDefault()) + builder.field("version", version.version) builder.field("name", name) builder.field("kind", kind) builder.field("source", source) @@ -54,21 +69,13 @@ case class FlintMetadata( throw new IllegalStateException("Failed to jsonify Flint metadata", e) } } - - private def versionOrDefault(): String = { - if (version == null) { - FlintVersion.current().version - } else { - version.version - } - } } object FlintMetadata { def apply(content: String, settings: String): FlintMetadata = { val metadata = FlintMetadata(content) - metadata.copy(indexSettings = settings) + metadata.copy(indexSettings = Option(settings)) } /** @@ -200,7 +207,7 @@ object FlintMetadata { // Build method to create the FlintMetadata instance def build(): FlintMetadata = { FlintMetadata( - version, + if (version == null) current() else version, name, kind, source, @@ -208,7 +215,7 @@ object FlintMetadata { options, properties, schema, - indexSettings) + Option(indexSettings)) } } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 6aaa3081c..4badfe8f4 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -47,6 +47,7 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.search.SearchModule; import org.opensearch.search.builder.SearchSourceBuilder; +import scala.Option; /** * Flint client implementation for OpenSearch storage. @@ -73,8 +74,9 @@ public FlintOpenSearchClient(FlintOptions options) { CreateIndexRequest request = new CreateIndexRequest(osIndexName); request.mapping(metadata.getContent(), XContentType.JSON); - if (metadata.indexSettings() != null) { - request.settings(metadata.indexSettings(), XContentType.JSON); + Option settings = metadata.indexSettings(); + if (settings.isDefined()) { + request.settings(settings.get(), XContentType.JSON); } client.indices().create(request, RequestOptions.DEFAULT); } catch (Exception e) { diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 8358ef007..5c799128c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -7,7 +7,6 @@ package org.opensearch.flint.core import scala.collection.JavaConverters._ -import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization @@ -48,6 +47,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val metadata = mock[FlintMetadata] when(metadata.getContent).thenReturn(content) + when(metadata.indexSettings).thenReturn(None) flintClient.createIndex(indexName, metadata) flintClient.exists(indexName) shouldBe true @@ -59,14 +59,14 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" val metadata = mock[FlintMetadata] when(metadata.getContent).thenReturn("{}") - when(metadata.indexSettings).thenReturn(indexSettings) + when(metadata.indexSettings).thenReturn(Some(indexSettings)) flintClient.createIndex(indexName, metadata) flintClient.exists(indexName) shouldBe true // OS uses full setting name ("index" prefix) and store as string implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(indexName).indexSettings) + val settings = parse(flintClient.getIndexMetadata(indexName).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "3" (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } @@ -74,6 +74,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M it should "get all index metadata with the given index name pattern" in { val metadata = mock[FlintMetadata] when(metadata.getContent).thenReturn("{}") + when(metadata.indexSettings).thenReturn(None) flintClient.createIndex("flint_test_1_index", metadata) flintClient.createIndex("flint_test_2_index", metadata) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index e197c0f53..714911a99 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -89,7 +89,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings) + val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 45af12047..d418af30f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -94,7 +94,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(testIndex).indexSettings) + val settings = parse(flintClient.getIndexMetadata(testIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "3" (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } From 5a4d0c91071b3fa989f31e572db9682f1010722f Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 12 Oct 2023 09:42:08 -0700 Subject: [PATCH 12/16] Fix code style Signed-off-by: Chen Dai --- .../main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index a9d16ce6f..d523f565b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -6,7 +6,9 @@ package org.opensearch.flint.spark import scala.collection.JavaConverters.mapAsJavaMapConverter + import org.opensearch.flint.core.metadata.FlintMetadata + import org.apache.spark.sql.DataFrame import org.apache.spark.sql.flint.datatype.FlintDataType import org.apache.spark.sql.types.StructType From ca444490b9262a7a241bf62233628dc1e0b2dbe5 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 12 Oct 2023 10:18:26 -0700 Subject: [PATCH 13/16] Fix index name issue Signed-off-by: Chen Dai --- .../main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala | 1 - .../flint/spark/covering/FlintSparkCoveringIndex.scala | 1 + .../flint/spark/skipping/FlintSparkSkippingIndex.scala | 1 + .../opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala | 2 ++ 4 files changed, 4 insertions(+), 1 deletion(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index d523f565b..b38010bf2 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -98,7 +98,6 @@ object FlintSparkIndex { val builder = new FlintMetadata.Builder() // Common fields builder.kind(index.kind) - builder.name(index.name()) builder.options(index.options.options.mapValues(_.asInstanceOf[AnyRef]).asJava) // Index properties diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index d89b2a844..b97c3fea3 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -47,6 +47,7 @@ case class FlintSparkCoveringIndex( val schemaJson = generateSchemaJSON(indexedColumns) metadataBuilder(this) + .name(indexName) .source(tableName) .indexedColumns(indexColumnMaps) .schema(schemaJson) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 020d98cda..ec213a3cd 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -60,6 +60,7 @@ class FlintSparkSkippingIndex( val schemaJson = generateSchemaJSON(fieldTypes) metadataBuilder(this) + .name(name()) .source(tableName) .indexedColumns(indexColumnMaps) .schema(schemaJson) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index ac0b33746..b30b46273 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -6,6 +6,7 @@ package org.opensearch.flint.spark import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.scalatest.matchers.must.Matchers.defined @@ -45,6 +46,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { index shouldBe defined index.get.metadata().getContent should matchJson(s"""{ | "_meta": { + | "version": "${current()}", | "name": "name_and_age", | "kind": "covering", | "indexedColumns": [ From 95c80911a907611245cb46fa8d57a1885eecfc3f Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 12 Oct 2023 11:14:33 -0700 Subject: [PATCH 14/16] Add more javadoc and fix FlintJob Signed-off-by: Chen Dai --- ...lderHelper.scala => FlintJsonHelper.scala} | 67 ++++++++++++++++++- .../flint/core/metadata/FlintMetadata.scala | 18 +++-- .../scala/org/apache/spark/sql/FlintJob.scala | 2 +- 3 files changed, 81 insertions(+), 6 deletions(-) rename flint-core/src/main/scala/org/opensearch/flint/core/metadata/{XContentBuilderHelper.scala => FlintJsonHelper.scala} (55%) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintJsonHelper.scala similarity index 55% rename from flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala rename to flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintJsonHelper.scala index 56b6cc232..4c1991edc 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/XContentBuilderHelper.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintJsonHelper.scala @@ -11,8 +11,19 @@ import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent._ import org.opensearch.common.xcontent.json.JsonXContent -object XContentBuilderHelper { +/** + * JSON parsing and building helper. + */ +object FlintJsonHelper { + /** + * Build JSON by creating JSON builder and pass it to the given function. + * + * @param block + * building logic with JSON builder + * @return + * JSON string + */ def buildJson(block: XContentBuilder => Unit): String = { val builder: XContentBuilder = XContentFactory.jsonBuilder builder.startObject @@ -21,12 +32,34 @@ object XContentBuilderHelper { BytesReference.bytes(builder).utf8ToString } + /** + * Add an object field of the name to the JSON builder and continue building it with the given + * function. + * + * @param builder + * JSON builder + * @param name + * field name + * @param block + * building logic on the JSON field + */ def objectField(builder: XContentBuilder, name: String)(block: => Unit): Unit = { builder.startObject(name) block builder.endObject() } + /** + * Add an optional object field of the name to the JSON builder. Add an empty object field if + * the value is null. + * + * @param builder + * JSON builder + * @param name + * field name + * @param value + * field value + */ def optionalObjectField(builder: XContentBuilder, name: String, value: AnyRef): Unit = { if (value == null) { builder.startObject(name).endObject() @@ -35,6 +68,14 @@ object XContentBuilderHelper { } } + /** + * Create a XContent JSON parser on the given JSON string. + * + * @param json + * JSON string + * @return + * JSON parser + */ def createJsonParser(json: String): XContentParser = { JsonXContent.jsonXContent.createParser( NamedXContentRegistry.EMPTY, @@ -42,6 +83,14 @@ object XContentBuilderHelper { json.getBytes(UTF_8)) } + /** + * Parse the given JSON string by creating JSON parser and pass it to the parsing function. + * + * @param json + * JSON string + * @param block + * parsing logic with the parser + */ def parseJson(json: String)(block: (XContentParser, String) => Unit): Unit = { val parser = createJsonParser(json) @@ -50,6 +99,14 @@ object XContentBuilderHelper { parseObjectField(parser)(block) } + /** + * Parse each inner field in the object field with the given parsing function. + * + * @param parser + * JSON parser + * @param block + * parsing logic on each inner field + */ def parseObjectField(parser: XContentParser)(block: (XContentParser, String) => Unit): Unit = { while (parser.nextToken() != XContentParser.Token.END_OBJECT) { val fieldName: String = parser.currentName() @@ -59,6 +116,14 @@ object XContentBuilderHelper { } } + /** + * Parse each inner field in the array field. + * + * @param parser + * JSON parser + * @param block + * parsing logic on each inner field + */ def parseArrayField(parser: XContentParser)(block: => Unit): Unit = { while (parser.nextToken() != XContentParser.Token.END_ARRAY) { block diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index cdf0cd0cb..d61fc83ea 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -9,7 +9,7 @@ import java.util import org.opensearch.flint.core.FlintVersion import org.opensearch.flint.core.FlintVersion.current -import org.opensearch.flint.core.metadata.XContentBuilderHelper._ +import org.opensearch.flint.core.metadata.FlintJsonHelper._ /** * Flint metadata follows Flint index specification and defines metadata for a Flint index @@ -73,6 +73,16 @@ case class FlintMetadata( object FlintMetadata { + /** + * Construct Flint metadata with JSON content and index settings. + * + * @param content + * JSON content + * @param settings + * index settings + * @return + * Flint metadata + */ def apply(content: String, settings: String): FlintMetadata = { val metadata = FlintMetadata(content) metadata.copy(indexSettings = Option(settings)) @@ -137,7 +147,7 @@ object FlintMetadata { private var indexedColumns: Array[util.Map[String, AnyRef]] = Array() private var properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() private var schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() - private var indexSettings: String = null + private var indexSettings: Option[String] = None def version(version: FlintVersion): this.type = { this.version = version @@ -200,7 +210,7 @@ object FlintMetadata { } def indexSettings(indexSettings: String): this.type = { - this.indexSettings = indexSettings + this.indexSettings = Option(indexSettings) this } @@ -215,7 +225,7 @@ object FlintMetadata { options, properties, schema, - Option(indexSettings)) + indexSettings) } } } diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala index d12d03565..0a853dc92 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala @@ -294,7 +294,7 @@ object FlintJob extends Logging { mapping: String): (Boolean, String) = { try { logInfo(s"create $resultIndex") - flintClient.createIndex(resultIndex, new FlintMetadata(mapping)) + flintClient.createIndex(resultIndex, FlintMetadata.apply(mapping)) logInfo(s"create $resultIndex successfully") (true, "") } catch { From bbe7ef84d61e76f3b00a7e7d0c95f4941eb5d5ce Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 12 Oct 2023 11:18:12 -0700 Subject: [PATCH 15/16] Update user manual Signed-off-by: Chen Dai --- docs/index.md | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/docs/index.md b/docs/index.md index 44b0052b0..f801e7d52 100644 --- a/docs/index.md +++ b/docs/index.md @@ -32,20 +32,17 @@ Currently, Flint metadata is only static configuration without version control a ```json { - "version": "0.1", - "indexConfig": { - "kind": "skipping", - "properties": { - "indexedColumns": [{ - "kind": "...", - "columnName": "...", - "columnType": "..." - }] - } - }, - "source": "alb_logs", - "state": "active", - "enabled": true + "version": "0.1.0", + "name": "...", + "kind": "skipping", + "source": "...", + "indexedColumns": [{ + "kind": "...", + "columnName": "...", + "columnType": "..." + }], + "options": { }, + "properties": { } } ``` From eb5865847f9d059717e2f12b925066fc785cc654 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 13 Oct 2023 11:12:52 -0700 Subject: [PATCH 16/16] Use fluent API of XContent builder Signed-off-by: Chen Dai --- .../flint/core/metadata/FlintMetadata.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index d61fc83ea..ea0fb0f98 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -51,11 +51,12 @@ case class FlintMetadata( buildJson(builder => { // Add _meta field objectField(builder, "_meta") { - builder.field("version", version.version) - builder.field("name", name) - builder.field("kind", kind) - builder.field("source", source) - builder.field("indexedColumns", indexedColumns) + builder + .field("version", version.version) + .field("name", name) + .field("kind", kind) + .field("source", source) + .field("indexedColumns", indexedColumns) optionalObjectField(builder, "options", options) optionalObjectField(builder, "properties", properties)