Skip to content

Commit

Permalink
update PR to only include
Browse files Browse the repository at this point in the history
json_delete
json_append

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Dec 11, 2024
1 parent 3fe3adb commit 88bcaea
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 170 deletions.
35 changes: 0 additions & 35 deletions docs/ppl-lang/functions/ppl-json.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,41 +286,6 @@ Example:
|{"root": {"a":["valueA", "valueB"]}} |
+-----------------------------------------------+



### `JSON_EXTEND`

**Description**

`json_extend(json, [path_value_pairs list])` appends multiple (array) values to existing JSON array elements. Returns the updated object after extending.

**Argument type:** JSON, List<[(STRING, List<STRING>)]>

**Return type:** JSON

A JSON object format.

**Note**
Extending arrays with individual values is what separates the `json_extend` functionality from `json_append` — a similar function that appends the `<value>` as a single element.

Example:

os> source=people | eval extend = json_extend(`{"a":["valueA", "valueB"]}`,['a', ["valueC","valueD"]])
fetched rows / total rows = 1/1
+-------------------------------------------------+
| extend |
+-------------------------------------------------+
| {"a":["valueA", "valueB", "valueC", "valueD"]} |
+-------------------------------------------------+

os> source=people | eval extend = json_extend(`{"a":["valueA", "valueB"]}`,['a',[{"b":["valueC","valueD"]}]])
fetched rows / total rows = 1/1
+-------------------------------------------------------------+
| extend |
+-------------------------------------------------------------+
| {"a":["valueA", "valueB", {"b":"valueC"}, {"b":"valueD"}]} |
+-------------------------------------------------------------+

### `JSON_KEYS`

**Description**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class FlintSparkPPLJsonFunctionITSuite
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}"
private val validJson8 =
"{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}"
private val validJson9 = "{\"a\":[\"valueA\", \"valueB\"]}"
private val invalidJson1 = "[1,2"
private val invalidJson2 = "[invalid json]"
private val invalidJson3 = "{\"invalid\": \"json\""
Expand Down Expand Up @@ -667,5 +668,4 @@ class FlintSparkPPLJsonFunctionITSuite
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,10 @@ JSON_ARRAY_LENGTH: 'JSON_ARRAY_LENGTH';
TO_JSON_STRING: 'TO_JSON_STRING';
JSON_EXTRACT: 'JSON_EXTRACT';
JSON_DELETE : 'JSON_DELETE';
JSON_EXTEND : 'JSON_EXTEND';
JSON_KEYS: 'JSON_KEYS';
JSON_VALID: 'JSON_VALID';
JSON_APPEND: 'JSON_APPEND';
//JSON_EXTEND : 'JSON_EXTEND';
//JSON_SET: 'JSON_SET';
//JSON_ARRAY_ALL_MATCH: 'JSON_ARRAY_ALL_MATCH';
//JSON_ARRAY_ANY_MATCH: 'JSON_ARRAY_ANY_MATCH';
Expand Down
3 changes: 0 additions & 3 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -876,12 +876,9 @@ jsonFunctionName
| TO_JSON_STRING
| JSON_EXTRACT
| JSON_DELETE
| JSON_EXTEND
| JSON_APPEND
| JSON_KEYS
| JSON_VALID
// | JSON_APPEND
// | JSON_DELETE
// | JSON_EXTEND
// | JSON_SET
// | JSON_ARRAY_ALL_MATCH
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.expression.function;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Shared helpers for manipulating JSON documents that have been parsed into
 * Jackson's generic object model (Map / List / primitive values). Used by the
 * PPL JSON UDFs (json_append, json_delete) to traverse dotted key paths.
 */
public interface JsonUtils {
    // Single shared mapper; an ObjectMapper is safe to share once configured.
    ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Parses a string as JSON, falling back to the raw string when it is not
     * valid JSON.
     *
     * @param value text to interpret
     * @return the parsed JSON value (Map/List/Number/Boolean/String), or the
     *         original string when parsing fails
     */
    static Object parseValue(String value) {
        // Try parsing the value as JSON, fallback to primitive if parsing fails
        try {
            return objectMapper.readValue(value, Object.class);
        } catch (Exception e) {
            // Not valid JSON - treat it as a primitive string value.
            return value;
        }
    }

    /**
     * Appends a value to the array found at the dotted path inside the parsed
     * JSON tree. Missing intermediate objects and the target array are created
     * on demand. If the target exists but is not a list, the value is silently
     * dropped (append is a no-op on scalars).
     *
     * @param currentObj    node being visited (Map, List, or null)
     * @param pathParts     path split on '.'
     * @param depth         index of the path element handled at this level
     * @param valueToAppend value to add to the target array
     */
    @SuppressWarnings("unchecked")
    static void appendNestedValue(Object currentObj, String[] pathParts, int depth, Object valueToAppend) {
        if (currentObj == null || depth >= pathParts.length) {
            return;
        }

        if (currentObj instanceof Map) {
            Map<String, Object> currentMap = (Map<String, Object>) currentObj;
            String currentKey = pathParts[depth];

            if (depth == pathParts.length - 1) {
                // Last path element: append to the array, creating it if absent.
                // computeIfAbsent returns the mapped value, so no second lookup.
                Object existingValue = currentMap.computeIfAbsent(currentKey, k -> new ArrayList<>());
                if (existingValue instanceof List) {
                    ((List<Object>) existingValue).add(valueToAppend);
                }
            } else {
                // Intermediate element: descend, creating the child object if absent.
                Object child = currentMap.computeIfAbsent(currentKey, k -> new LinkedHashMap<>());
                appendNestedValue(child, pathParts, depth + 1, valueToAppend);
            }
        } else if (currentObj instanceof List) {
            // Arrays are transparent to paths: apply the same path element to
            // every object inside the list (depth intentionally not incremented).
            for (Object item : (List<Object>) currentObj) {
                if (item instanceof Map) {
                    appendNestedValue(item, pathParts, depth, valueToAppend);
                }
            }
        }
    }

    /**
     * Removes the entry addressed by the dotted key path from the parsed JSON
     * tree. Missing intermediate keys make the call a no-op; lists encountered
     * along the way are traversed element by element.
     *
     * @param currentObj node being visited (Map, List, or null)
     * @param keyParts   key path split on '.'
     * @param depth      index of the key element handled at this level
     */
    @SuppressWarnings("unchecked")
    static void removeNestedKey(Object currentObj, String[] keyParts, int depth) {
        if (currentObj == null || depth >= keyParts.length) {
            return;
        }

        if (currentObj instanceof Map) {
            Map<String, Object> currentMap = (Map<String, Object>) currentObj;
            String currentKey = keyParts[depth];

            if (depth == keyParts.length - 1) {
                // Last key: remove it from the map.
                currentMap.remove(currentKey);
            } else {
                // Single lookup instead of containsKey + get.
                Object nextObj = currentMap.get(currentKey);

                if (nextObj instanceof List) {
                    // Arrays are transparent: recurse into every element.
                    for (Object item : (List<Object>) nextObj) {
                        removeNestedKey(item, keyParts, depth + 1);
                    }
                } else if (nextObj != null) {
                    // Continue traversing if it's a map (non-map nodes end the walk).
                    removeNestedKey(nextObj, keyParts, depth + 1);
                }
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.sql.expression.function;

import com.fasterxml.jackson.databind.ObjectMapper;
import inet.ipaddr.AddressStringException;
import inet.ipaddr.IPAddressString;
import inet.ipaddr.IPAddressStringParameters;
Expand All @@ -19,18 +18,19 @@
import scala.collection.mutable.WrappedArray;
import scala.runtime.AbstractFunction2;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.sql.expression.function.JsonUtils.appendNestedValue;
import static org.opensearch.sql.expression.function.JsonUtils.objectMapper;
import static org.opensearch.sql.expression.function.JsonUtils.parseValue;
import static org.opensearch.sql.expression.function.JsonUtils.removeNestedKey;
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;


public interface SerializableUdf {

ObjectMapper objectMapper = new ObjectMapper();

abstract class SerializableAbstractFunction2<T1, T2, R> extends AbstractFunction2<T1, T2, R>
implements Serializable {
Expand Down Expand Up @@ -65,38 +65,6 @@ private void removeKeys(Map<String, Object> map, WrappedArray<String> keysToRemo
removeNestedKey(map, keyParts, 0);
}
}

/**
 * Recursively walks the parsed JSON tree and removes the entry addressed by
 * {@code keyParts} once the final path element is reached. Lists encountered
 * along the way are traversed element by element; non-map leaves end the walk.
 */
private void removeNestedKey(Object node, String[] keyParts, int depth) {
    if (node == null || depth >= keyParts.length) {
        return;
    }
    if (!(node instanceof Map)) {
        // Only map nodes can own keys; anything else terminates this branch.
        return;
    }

    Map<String, Object> map = (Map<String, Object>) node;
    String key = keyParts[depth];

    if (depth == keyParts.length - 1) {
        // Reached the target key: drop the entry.
        map.remove(key);
        return;
    }

    if (!map.containsKey(key)) {
        return;
    }

    Object child = map.get(key);
    if (child instanceof List) {
        // Fan out across every element of the array at this level.
        for (Object element : (List<Object>) child) {
            removeNestedKey(element, keyParts, depth + 1);
        }
    } else {
        // Keep descending through nested objects.
        removeNestedKey(child, keyParts, depth + 1);
    }
}
};

Function2<String, WrappedArray<String>, String> jsonAppendFunction = new SerializableAbstractFunction2<>() {
Expand Down Expand Up @@ -136,88 +104,8 @@ public String apply(String jsonStr, WrappedArray<String> elements) {
return null;
}
}

/**
 * Interprets {@code raw} as JSON when possible; otherwise hands back the
 * original string so callers can treat it as a plain primitive value.
 */
private Object parseValue(String raw) {
    try {
        return objectMapper.readValue(raw, Object.class);
    } catch (Exception ignored) {
        // Unparsable input is used verbatim as a primitive value.
        return raw;
    }
}

/**
 * Appends {@code valueToAppend} to the array located at the path described by
 * {@code pathParts}. Missing intermediate objects and the target array are
 * created on demand; lists along the path are traversed element by element.
 * A non-list value at the final key makes the append a silent no-op.
 */
private void appendNestedValue(Object node, String[] pathParts, int depth, Object valueToAppend) {
    if (node == null || depth >= pathParts.length) {
        return;
    }

    if (node instanceof List) {
        // Path elements apply to every object inside an array (same depth).
        for (Object element : (List<Object>) node) {
            if (element instanceof Map) {
                appendNestedValue(element, pathParts, depth, valueToAppend);
            }
        }
        return;
    }

    if (!(node instanceof Map)) {
        return;
    }

    Map<String, Object> map = (Map<String, Object>) node;
    String key = pathParts[depth];

    if (depth == pathParts.length - 1) {
        // Final path element: ensure an array exists, then append when it is one.
        map.computeIfAbsent(key, k -> new ArrayList<>());
        Object target = map.get(key);
        if (target instanceof List) {
            ((List<Object>) target).add(valueToAppend);
        }
    } else {
        // Intermediate element: create the child object if absent and descend.
        map.computeIfAbsent(key, k -> new LinkedHashMap<>());
        appendNestedValue(map.get(key), pathParts, depth + 1, valueToAppend);
    }
}
};

/**
 * Extend JSON arrays with new values based on specified path-value pairs.
 * For each pair, the values are appended to the existing top-level array at
 * the path; when the path is absent or does not hold an array, it is bound
 * to the supplied values instead.
 *
 * @param jsonStr The input JSON string.
 * @param pathValuePairs A list of path-value pairs to extend.
 * @return The updated JSON string, or {@code null} for null/unparsable input.
 */
Function2<String, List<Map.Entry<String, List<String>>>, String> jsonExtendFunction = new SerializableAbstractFunction2<>() {

    @Override
    public String apply(String jsonStr, List<Map.Entry<String, List<String>>> pathValuePairs) {
        if (jsonStr == null) {
            return null;
        }
        try {
            Map<String, Object> root = objectMapper.readValue(jsonStr, Map.class);

            for (Map.Entry<String, List<String>> pair : pathValuePairs) {
                String path = pair.getKey();
                List<String> additions = pair.getValue();
                Object current = root.get(path);

                if (current instanceof List) {
                    // Existing array: grow it in place.
                    ((List<Object>) current).addAll(additions);
                } else {
                    // Missing or non-array value: replace it with the new values.
                    root.put(path, additions);
                }
            }

            return objectMapper.writeValueAsString(root);
        } catch (Exception e) {
            // Any parse/serialize failure yields null, matching the sibling UDFs.
            return null;
        }
    }
};

Function2<String, String, Boolean> cidrFunction = new SerializableAbstractFunction2<>() {

IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder()
Expand Down Expand Up @@ -280,15 +168,6 @@ static ScalaUDF visit(String funcName, List<Expression> expressions) {
Option.apply("json_delete"),
false,
true);
case "json_extend":
return new ScalaUDF(jsonExtendFunction,
DataTypes.StringType,
seq(expressions),
seq(),
Option.empty(),
Option.apply("json_extend"),
false,
true);
case "json_append":
return new ScalaUDF(jsonAppendFunction,
DataTypes.StringType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,15 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite
planTransformer.visit(
plan(
pplParser,
"""source=t | eval result = json_append('{"a":[{"b":1},{"c":2}]}', array('a.b'))"""),
"""source=t | eval result = json_append('{"a":[{"b":1},{"c":2}]}', array('a.b','c','d'))"""),
context)

val table = UnresolvedRelation(Seq("t"))
val keysExpression =
UnresolvedFunction("array", Seq(Literal("a.b")), isDistinct = false)
UnresolvedFunction(
"array",
Seq(Literal("a.b"), Literal("c"), Literal("d")),
isDistinct = false)
val jsonObjExp =
Literal("""{"a":[{"b":1},{"c":2}]}""")
val jsonFunc =
Expand All @@ -239,12 +242,15 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite
planTransformer.visit(
plan(
pplParser,
"""source=t | eval result = json_extend('{"a":[{"b":1},{"c":2}]}', array('a.b'))"""),
"""source=t | eval result = json_extend('{"a":[{"b":1},{"c":2}]}', array('a.b', 'c','d'))"""),
context)

val table = UnresolvedRelation(Seq("t"))
val keysExpression =
UnresolvedFunction("array", Seq(Literal("a.b")), isDistinct = false)
UnresolvedFunction(
"array",
Seq(Literal("a.b"), Literal("c"), Literal("d")),
isDistinct = false)
val jsonObjExp =
Literal("""{"a":[{"b":1},{"c":2}]}""")
val jsonFunc =
Expand Down

0 comments on commit 88bcaea

Please sign in to comment.