Skip to content

Commit

Permalink
Refactor Flint index metadata (opensearch-project#70)
Browse files Browse the repository at this point in the history
* Refactor Flint metadata content to strong type

Signed-off-by: Chen Dai <[email protected]>

* Refactor Flint metadata to Scala case class

Signed-off-by: Chen Dai <[email protected]>

* Make indexed columns extendable in Flint Spark

Signed-off-by: Chen Dai <[email protected]>

* Fix IT and UT

Signed-off-by: Chen Dai <[email protected]>

* Refactor getContent with XContent helper

Signed-off-by: Chen Dai <[email protected]>

* Refactor fromJson with XContent helper

Signed-off-by: Chen Dai <[email protected]>

* Refactor method arg for readability

Signed-off-by: Chen Dai <[email protected]>

* Refactor method arg for readability

Signed-off-by: Chen Dai <[email protected]>

* Fix complex schema issue

Signed-off-by: Chen Dai <[email protected]>

* Remove custom metadata builder

Signed-off-by: Chen Dai <[email protected]>

* Change indexSettings to Option

Signed-off-by: Chen Dai <[email protected]>

* Fix code style

Signed-off-by: Chen Dai <[email protected]>

* Fix index name issue

Signed-off-by: Chen Dai <[email protected]>

* Add more javadoc and fix FlintJob

Signed-off-by: Chen Dai <[email protected]>

* Update user manual

Signed-off-by: Chen Dai <[email protected]>

* Use fluent API of XContent builder

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Oct 17, 2023
1 parent 2c184ca commit b4f0a81
Show file tree
Hide file tree
Showing 17 changed files with 618 additions and 221 deletions.
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ lazy val flintCore = (project in file("flint-core"))
"org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion
exclude ("org.apache.logging.log4j", "log4j-api"),
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test"),
publish / skip := true)

lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
Expand Down
25 changes: 11 additions & 14 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,17 @@ Currently, Flint metadata is only static configuration without version control a

```json
{
"version": "0.1",
"indexConfig": {
"kind": "skipping",
"properties": {
"indexedColumns": [{
"kind": "...",
"columnName": "...",
"columnType": "..."
}]
}
},
"source": "alb_logs",
"state": "active",
"enabled": true
"version": "0.1.0",
"name": "...",
"kind": "skipping",
"source": "...",
"indexedColumns": [{
"kind": "...",
"columnName": "...",
"columnType": "..."
}],
"options": { },
"properties": { }
}
```

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata

import java.nio.charset.StandardCharsets.UTF_8

import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent._
import org.opensearch.common.xcontent.json.JsonXContent

/**
* JSON parsing and building helper.
*/
object FlintJsonHelper {

/**
* Build JSON by creating JSON builder and pass it to the given function.
*
* @param block
* building logic with JSON builder
* @return
* JSON string
*/
def buildJson(block: XContentBuilder => Unit): String = {
val builder: XContentBuilder = XContentFactory.jsonBuilder
builder.startObject
block(builder)
builder.endObject()
BytesReference.bytes(builder).utf8ToString
}

/**
* Add an object field of the name to the JSON builder and continue building it with the given
* function.
*
* @param builder
* JSON builder
* @param name
* field name
* @param block
* building logic on the JSON field
*/
def objectField(builder: XContentBuilder, name: String)(block: => Unit): Unit = {
builder.startObject(name)
block
builder.endObject()
}

/**
* Add an optional object field of the name to the JSON builder. Add an empty object field if
* the value is null.
*
* @param builder
* JSON builder
* @param name
* field name
* @param value
* field value
*/
def optionalObjectField(builder: XContentBuilder, name: String, value: AnyRef): Unit = {
if (value == null) {
builder.startObject(name).endObject()
} else {
builder.field(name, value)
}
}

/**
* Create a XContent JSON parser on the given JSON string.
*
* @param json
* JSON string
* @return
* JSON parser
*/
def createJsonParser(json: String): XContentParser = {
JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.IGNORE_DEPRECATIONS,
json.getBytes(UTF_8))
}

/**
* Parse the given JSON string by creating JSON parser and pass it to the parsing function.
*
* @param json
* JSON string
* @param block
* parsing logic with the parser
*/
def parseJson(json: String)(block: (XContentParser, String) => Unit): Unit = {
val parser = createJsonParser(json)

// Read first root object token and start parsing
parser.nextToken()
parseObjectField(parser)(block)
}

/**
* Parse each inner field in the object field with the given parsing function.
*
* @param parser
* JSON parser
* @param block
* parsing logic on each inner field
*/
def parseObjectField(parser: XContentParser)(block: (XContentParser, String) => Unit): Unit = {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName: String = parser.currentName()
parser.nextToken() // Move to the field value

block(parser, fieldName)
}
}

/**
* Parse each inner field in the array field.
*
* @param parser
* JSON parser
* @param block
* parsing logic on each inner field
*/
def parseArrayField(parser: XContentParser)(block: => Unit): Unit = {
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
block
}
}
}

This file was deleted.

Loading

0 comments on commit b4f0a81

Please sign in to comment.