Skip to content

Commit

Permalink
Merge branch 'main' into ppl-spark-join-command
Browse files Browse the repository at this point in the history
  • Loading branch information
YANG-DB committed Oct 19, 2023
2 parents e1735b2 + 3fcf926 commit bb16e7a
Show file tree
Hide file tree
Showing 29 changed files with 1,706 additions and 330 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,8 @@ jobs:
- name: Integ Test
run: sbt integtest/test

- name: Unit Test
run: sbt test

- name: Style check
run: sbt scalafmtCheckAll
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ lazy val flintCore = (project in file("flint-core"))
"org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion
exclude ("org.apache.logging.log4j", "log4j-api"),
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test"),
publish / skip := true)

lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
Expand Down
27 changes: 13 additions & 14 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,17 @@ Currently, Flint metadata is only static configuration without version control a

```json
{
"version": "0.1",
"indexConfig": {
"kind": "skipping",
"properties": {
"indexedColumns": [{
"kind": "...",
"columnName": "...",
"columnType": "..."
}]
}
},
"source": "alb_logs",
"state": "active",
"enabled": true
"version": "0.1.0",
"name": "...",
"kind": "skipping",
"source": "...",
"indexedColumns": [{
"kind": "...",
"columnName": "...",
"columnType": "..."
}],
"options": { },
"properties": { }
}
```

Expand Down Expand Up @@ -199,6 +196,8 @@ User can provide the following options in `WITH` clause of create statement:
+ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and is only applicable when auto refresh is enabled. If unspecified, a temporary checkpoint directory will be used and may result in checkpoint data loss upon restart.
+ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.

Note that the index option name is case-sensitive.

```sql
WITH (
auto_refresh = [true|false],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.flint.core;

import java.util.List;

import org.opensearch.client.RestHighLevelClient;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.storage.FlintReader;
import org.opensearch.flint.core.storage.FlintWriter;
Expand Down Expand Up @@ -71,4 +73,10 @@ public interface FlintClient {
* @return {@link FlintWriter}
*/
FlintWriter createWriter(String indexName);

/**
 * Create a {@link RestHighLevelClient} for direct access to the underlying OpenSearch cluster.
 *
 * @return a {@link RestHighLevelClient} instance
 */
RestHighLevelClient createClient(); // interface members are implicitly public; modifier omitted
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata

import java.nio.charset.StandardCharsets.UTF_8

import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent._
import org.opensearch.common.xcontent.json.JsonXContent

/**
* JSON parsing and building helper.
*/
object FlintJsonHelper {

  /**
   * Build a JSON string by creating a JSON builder and passing it to the given function. The
   * builder is wrapped in a single root object, so `block` only needs to emit the fields.
   *
   * @param block
   *   building logic with JSON builder
   * @return
   *   JSON string
   */
  def buildJson(block: XContentBuilder => Unit): String = {
    // Side-effecting zero-arity calls keep (): startObject()/endObject() mutate the builder.
    val builder: XContentBuilder = XContentFactory.jsonBuilder()
    builder.startObject()
    block(builder)
    builder.endObject()
    BytesReference.bytes(builder).utf8ToString
  }

  /**
   * Add an object field of the name to the JSON builder and continue building it with the given
   * function. The closing endObject() is always emitted after `block` returns.
   *
   * @param builder
   *   JSON builder
   * @param name
   *   field name
   * @param block
   *   building logic on the JSON field
   */
  def objectField(builder: XContentBuilder, name: String)(block: => Unit): Unit = {
    builder.startObject(name)
    block
    builder.endObject()
  }

  /**
   * Add an optional object field of the name to the JSON builder. Add an empty object field if
   * the value is null (keeps the key present in the output instead of omitting it).
   *
   * @param builder
   *   JSON builder
   * @param name
   *   field name
   * @param value
   *   field value
   */
  def optionalObjectField(builder: XContentBuilder, name: String, value: AnyRef): Unit = {
    if (value == null) {
      builder.startObject(name).endObject()
    } else {
      builder.field(name, value)
    }
  }

  /**
   * Create a XContent JSON parser on the given JSON string.
   *
   * @param json
   *   JSON string
   * @return
   *   JSON parser
   */
  def createJsonParser(json: String): XContentParser = {
    JsonXContent.jsonXContent.createParser(
      NamedXContentRegistry.EMPTY,
      DeprecationHandler.IGNORE_DEPRECATIONS,
      json.getBytes(UTF_8))
  }

  /**
   * Parse the given JSON string by creating JSON parser and pass it to the parsing function.
   * Expects the JSON root to be an object.
   *
   * @param json
   *   JSON string
   * @param block
   *   parsing logic with the parser, invoked once per top-level field
   */
  def parseJson(json: String)(block: (XContentParser, String) => Unit): Unit = {
    val parser = createJsonParser(json)

    // Read first root object token (START_OBJECT) and start parsing its fields
    parser.nextToken()
    parseObjectField(parser)(block)
  }

  /**
   * Parse each inner field in the object field with the given parsing function.
   *
   * NOTE: `block` is invoked with the parser positioned on the field value and must fully
   * consume that value (including nested objects/arrays); otherwise the loop's END_OBJECT
   * detection and currentName() on subsequent iterations will desynchronize.
   *
   * @param parser
   *   JSON parser
   * @param block
   *   parsing logic on each inner field
   */
  def parseObjectField(parser: XContentParser)(block: (XContentParser, String) => Unit): Unit = {
    while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
      val fieldName: String = parser.currentName()
      parser.nextToken() // Move to the field value

      block(parser, fieldName)
    }
  }

  /**
   * Parse each inner field in the array field.
   *
   * NOTE: as with [[parseObjectField]], `block` must consume each array element completely.
   *
   * @param parser
   *   JSON parser
   * @param block
   *   parsing logic on each inner field
   */
  def parseArrayField(parser: XContentParser)(block: => Unit): Unit = {
    while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
      block
    }
  }
}

This file was deleted.

Loading

0 comments on commit bb16e7a

Please sign in to comment.