Skip to content

Commit

Permalink
update IT tests for json_append
Browse files Browse the repository at this point in the history
Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Dec 10, 2024
1 parent 9dbc89e commit a46494c
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 50 deletions.
23 changes: 12 additions & 11 deletions docs/ppl-lang/functions/ppl-json.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,23 +217,23 @@ A JSON object format.

Example:

os> source=people | eval deleted = json_delete({"a":"valueA", "b":"valueB"}, ["a"])
os> source=people | eval deleted = json_delete({"a":"valueA", "b":"valueB"}, json_array("a"))
fetched rows / total rows = 1/1
+----------------------------------+
| deleted |
+----------------------------------+
| {"b": "valueB" } |
+----------------------------------+

os> source=people | eval eval deleted = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, ["a.b"])
os> source=people | eval deleted = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, json_array("a.b"))
fetched rows / total rows = 1/1
+-----------------------------------------------------------+
| deleted |
+-----------------------------------------------------------+
| {"a":[{"c":3}] } |
+-----------------------------------------------------------+

os> source=people | eval `no_action` = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, ["b.b"])
os> source=people | eval `no_action` = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, json_array("b.b"))
fetched rows / total rows = 1/1
+-----------------------------------+
| no_action |
Expand All @@ -245,9 +245,9 @@ Example:

**Description**

`json_append(json, [path_value list])` appends values to end of an array within the json elements. Return the updated json object after appending .
`json_append(json, [path_key, list of values to add])` appends values to the end of an array within the JSON document. Returns the updated JSON object after appending.

**Argument type:** JSON, List<[(STRING, STRING>)]>
**Argument type:** JSON, List<STRING>

**Return type:** JSON

Expand All @@ -257,32 +257,33 @@ A JSON object format.
Append adds the value to the end of the existing array with the following cases:
- path is an object value - append is ignored and the value is returned
- path is an existing non-empty array - the values are added to the array's tail
- path is not found - the values are added to the root of the JSON tree
- path is an existing empty array - a new array is created with the given values

Example:

os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, ["a","valueC"])
os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, json_array('a', 'valueC', 'valueD'))
fetched rows / total rows = 1/1
+-------------------------------------------------+
| append |
+-------------------------------------------------+
| {"a":["valueA", "valueB", "valueC"]} |
| {"a":["valueA", "valueB", "valueC", "valueD"]} |
+-------------------------------------------------+

os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, ['a', {"a":["valueC"]}])
os> source=people | eval append = json_append(`{"a":[]}`, json_array('a', 'valueC'))
fetched rows / total rows = 1/1
+-----------------------------------------------+
| append |
+-----------------------------------------------+
| {"a":["valueA", "valueB", ["valueC"]]} |
| {"a":["valueC"]} |
+-----------------------------------------------+

os> source=people | eval append = json_append(`{"root":{ "a":["valueA", "valueB"]}}`, {"root.a":"valueC"})
os> source=people | eval append = json_append(`{"root":{ "a":["valueA", "valueB"]}}`, json_array('root', 'valueC'))
fetched rows / total rows = 1/1
+-----------------------------------------------+
| append |
+-----------------------------------------------+
|{"root": {"a":["valueA", "valueB", "valueC"]}} |
|{"root": {"a":["valueA", "valueB"]}} |
+-----------------------------------------------+


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class FlintSparkPPLJsonFunctionITSuite
// JSON object: "teacher" is a plain string, "student" is an array of {name, rank} objects.
private val validJson5 =
"{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}"
// Top-level JSON array of numbers.
private val validJson6 = "[1,2,3]"
// Same content as validJson5 except "teacher" is a one-element array; used by the json_append tests.
private val validJson7 =
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}"
// validJson7 wrapped under a "school" key; used by the nested-path json_append test.
private val validJson8 =
"{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}"
// Array that is never closed.
private val invalidJson1 = "[1,2"
// Bracketed text whose content is not a JSON value.
private val invalidJson2 = "[invalid json]"
// Object that is never closed.
private val invalidJson3 = "{\"invalid\": \"json\""
Expand Down Expand Up @@ -502,4 +506,166 @@ class FlintSparkPPLJsonFunctionITSuite
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test json_append() function: add single value") {
  // Appends one string to the existing top-level "teacher" array.
  val query =
    s"""
       | source = $testTable
       | | eval result = json_append('$validJson7',json_array('teacher', 'Tom')) | head 1 | fields result
       | """.stripMargin
  val frame = sql(query)

  val expectedJson =
    "{\"teacher\":[\"Alice\",\"Tom\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}"
  assertSameRows(Seq(Row(expectedJson)), frame)

  // Expected unresolved plan: relation -> eval projection -> head 1 -> fields result.
  val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
  val pathAndValues =
    UnresolvedFunction("array", Seq(Literal("teacher"), Literal("Tom")), isDistinct = false)
  val sourceJson =
    Literal(
      "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
  val resultColumn =
    Alias(visit("json_append", util.List.of(sourceJson, pathAndValues)), "result")()
  val evalProjection = Project(Seq(UnresolvedStar(None), resultColumn), relation)
  val firstRowOnly = GlobalLimit(Literal(1), LocalLimit(Literal(1), evalProjection))
  comparePlans(
    frame.queryExecution.logical,
    Project(Seq(UnresolvedAttribute("result")), firstRowOnly),
    checkAnalysis = false)
}

test("test json_append() function: add single value key not found") {
  // "headmaster" is absent from the input, so a new array is expected at that key.
  val query =
    s"""
       | source = $testTable
       | | eval result = json_append('$validJson7',json_array('headmaster', 'Tom')) | head 1 | fields result
       | """.stripMargin
  val frame = sql(query)

  val expectedJson =
    "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}],\"headmaster\":[\"Tom\"]}"
  assertSameRows(Seq(Row(expectedJson)), frame)

  // Expected unresolved plan: relation -> eval projection -> head 1 -> fields result.
  val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
  val pathAndValues =
    UnresolvedFunction("array", Seq(Literal("headmaster"), Literal("Tom")), isDistinct = false)
  val sourceJson =
    Literal(
      "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
  val resultColumn =
    Alias(visit("json_append", util.List.of(sourceJson, pathAndValues)), "result")()
  val evalProjection = Project(Seq(UnresolvedStar(None), resultColumn), relation)
  val firstRowOnly = GlobalLimit(Literal(1), LocalLimit(Literal(1), evalProjection))
  comparePlans(
    frame.queryExecution.logical,
    Project(Seq(UnresolvedAttribute("result")), firstRowOnly),
    checkAnalysis = false)
}

test("test json_append() function: add single Object key not found") {
  // The appended value is a JSON object literal and the target key does not exist yet.
  val query =
    s"""
       | source = $testTable
       | | eval result = json_append('$validJson7',json_array('headmaster', '{"name":"Tomy","rank":1}')) | head 1 | fields result
       | """.stripMargin
  val frame = sql(query)

  val expectedJson =
    "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}],\"headmaster\":[{\"name\":\"Tomy\",\"rank\":1}]}"
  assertSameRows(Seq(Row(expectedJson)), frame)

  // Expected unresolved plan: relation -> eval projection -> head 1 -> fields result.
  val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
  val pathAndValues =
    UnresolvedFunction(
      "array",
      Seq(Literal("headmaster"), Literal("""{"name":"Tomy","rank":1}""")),
      isDistinct = false)
  val sourceJson =
    Literal(
      "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
  val resultColumn =
    Alias(visit("json_append", util.List.of(sourceJson, pathAndValues)), "result")()
  val evalProjection = Project(Seq(UnresolvedStar(None), resultColumn), relation)
  val firstRowOnly = GlobalLimit(Literal(1), LocalLimit(Literal(1), evalProjection))
  comparePlans(
    frame.queryExecution.logical,
    Project(Seq(UnresolvedAttribute("result")), firstRowOnly),
    checkAnalysis = false)
}

test("test json_append() function: add single Object value") {
  // Appends a JSON object to the existing "student" array of objects.
  val query =
    s"""
       | source = $testTable
       | | eval result = json_append('$validJson7',json_array('student', '{"name":"Tomy","rank":5}')) | head 1 | fields result
       | """.stripMargin
  val frame = sql(query)

  val expectedJson =
    "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2},{\"name\":\"Tomy\",\"rank\":5}]}"
  assertSameRows(Seq(Row(expectedJson)), frame)

  // Expected unresolved plan: relation -> eval projection -> head 1 -> fields result.
  val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
  val pathAndValues =
    UnresolvedFunction(
      "array",
      Seq(Literal("student"), Literal("""{"name":"Tomy","rank":5}""")),
      isDistinct = false)
  val sourceJson =
    Literal(
      "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
  val resultColumn =
    Alias(visit("json_append", util.List.of(sourceJson, pathAndValues)), "result")()
  val evalProjection = Project(Seq(UnresolvedStar(None), resultColumn), relation)
  val firstRowOnly = GlobalLimit(Literal(1), LocalLimit(Literal(1), evalProjection))
  comparePlans(
    frame.queryExecution.logical,
    Project(Seq(UnresolvedAttribute("result")), firstRowOnly),
    checkAnalysis = false)
}

test("test json_append() function: add multi value") {
  // Two values follow the path, so both are expected at the tail of "teacher".
  val query =
    s"""
       | source = $testTable
       | | eval result = json_append('$validJson7',json_array('teacher', 'Tom', 'Walt')) | head 1 | fields result
       | """.stripMargin
  val frame = sql(query)

  val expectedJson =
    "{\"teacher\":[\"Alice\",\"Tom\",\"Walt\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}"
  assertSameRows(Seq(Row(expectedJson)), frame)

  // Expected unresolved plan: relation -> eval projection -> head 1 -> fields result.
  val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
  val pathAndValues =
    UnresolvedFunction(
      "array",
      Seq(Literal("teacher"), Literal("Tom"), Literal("Walt")),
      isDistinct = false)
  val sourceJson =
    Literal(
      "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
  val resultColumn =
    Alias(visit("json_append", util.List.of(sourceJson, pathAndValues)), "result")()
  val evalProjection = Project(Seq(UnresolvedStar(None), resultColumn), relation)
  val firstRowOnly = GlobalLimit(Literal(1), LocalLimit(Literal(1), evalProjection))
  comparePlans(
    frame.queryExecution.logical,
    Project(Seq(UnresolvedAttribute("result")), firstRowOnly),
    checkAnalysis = false)
}

test("test json_append() function: add nested value") {
  // The dotted path 'school.teacher' targets an array nested one level below the root.
  val query =
    s"""
       | source = $testTable
       | | eval result = json_append('$validJson8',json_array('school.teacher', 'Tom', 'Walt')) | head 1 | fields result
       | """.stripMargin
  val frame = sql(query)

  val expectedJson =
    "{\"school\":{\"teacher\":[\"Alice\",\"Tom\",\"Walt\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}"
  assertSameRows(Seq(Row(expectedJson)), frame)

  // Expected unresolved plan: relation -> eval projection -> head 1 -> fields result.
  val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
  val pathAndValues =
    UnresolvedFunction(
      "array",
      Seq(Literal("school.teacher"), Literal("Tom"), Literal("Walt")),
      isDistinct = false)
  val sourceJson =
    Literal(
      "{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}")
  val resultColumn =
    Alias(visit("json_append", util.List.of(sourceJson, pathAndValues)), "result")()
  val evalProjection = Project(Seq(UnresolvedStar(None), resultColumn), relation)
  val firstRowOnly = GlobalLimit(Literal(1), LocalLimit(Literal(1), evalProjection))
  comparePlans(
    frame.queryExecution.logical,
    Project(Seq(UnresolvedAttribute("result")), firstRowOnly),
    checkAnalysis = false)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import scala.collection.mutable.WrappedArray;
import scala.runtime.AbstractFunction2;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -97,43 +99,87 @@ private void removeNestedKey(Object currentObj, String[] keyParts, int depth) {
}
};

Function2<String, List<Map.Entry<String, String>>, String> jsonAppendFunction = new SerializableAbstractFunction2<>() {

Function2<String, WrappedArray<String>, String> jsonAppendFunction = new SerializableAbstractFunction2<>() {
/**
* Append values to JSON arrays based on specified path-value pairs.
* Append values to JSON arrays based on specified path-values.
*
* @param jsonStr The input JSON string.
* @param pathValuePairs A list of path-value pairs to append.
* @param jsonStr The input JSON string.
* @param elements A list of path-values where the first item is the path and subsequent items are values to append.
* @return The updated JSON string.
*/
public String apply(String jsonStr, List<Map.Entry<String, String>> pathValuePairs) {
public String apply(String jsonStr, WrappedArray<String> elements) {
if (jsonStr == null) {
return null;
}
try {
Map<String, Object> jsonMap = objectMapper.readValue(jsonStr, Map.class);
List<String> pathValues = JavaConverters.mutableSeqAsJavaList(elements);
if (pathValues.isEmpty()) {
return jsonStr;
}

for (Map.Entry<String, String> pathValuePair : pathValuePairs) {
String path = pathValuePair.getKey();
String value = pathValuePair.getValue();
String path = pathValues.get(0);
String[] pathParts = path.split("\\.");
List<String> values = pathValues.subList(1, pathValues.size());

if (jsonMap.containsKey(path) && jsonMap.get(path) instanceof List) {
List<Object> existingList = (List<Object>) jsonMap.get(path);
// Append value to the end of the existing Scala List
existingList.add(value);
jsonMap.put(path, existingList);
} else if (jsonMap.containsKey(path)) {
// Ignore appending if the path is not an array
} else {
jsonMap.put(path, List.of(value));
}
// Parse the JSON string into a Map
Map<String, Object> jsonMap = objectMapper.readValue(jsonStr, Map.class);

// Append each value at the specified path
for (String value : values) {
Object parsedValue = parseValue(value); // Parse the value
appendNestedValue(jsonMap, pathParts, 0, parsedValue);
}

// Convert the updated map back to JSON
return objectMapper.writeValueAsString(jsonMap);
} catch (Exception e) {
return null;
}
}

/**
 * Interprets a raw argument as JSON when possible.
 *
 * @param value the raw string taken from the path-values argument.
 * @return the object produced by the JSON mapper, or {@code value} itself when the mapper throws.
 */
private Object parseValue(String value) {
    Object parsed;
    try {
        parsed = objectMapper.readValue(value, Object.class);
    } catch (Exception notJson) {
        // Anything the mapper rejects is kept verbatim as a plain string.
        parsed = value;
    }
    return parsed;
}

/**
 * Walks {@code pathParts} starting at {@code currentObj} and appends {@code valueToAppend}
 * to the list found (or created) at the final key.
 *
 * Maps are descended by key, creating a missing intermediate map or a missing final list.
 * Lists are fanned out: every map element is visited at the same depth, and other elements
 * are skipped. If the final key holds something that is not a list, nothing is changed.
 *
 * @param currentObj    node being visited (a Map, a List, or anything else, which is ignored).
 * @param pathParts     the path split into individual keys.
 * @param depth         index into {@code pathParts} of the key to resolve at this node.
 * @param valueToAppend value added to the target list.
 */
private void appendNestedValue(Object currentObj, String[] pathParts, int depth, Object valueToAppend) {
    boolean pathExhausted = depth >= pathParts.length;
    if (currentObj == null || pathExhausted) {
        return;
    }

    if (currentObj instanceof Map) {
        Map<String, Object> node = (Map<String, Object>) currentObj;
        String key = pathParts[depth];
        boolean isLastKey = depth == pathParts.length - 1;

        if (!isLastKey) {
            // Intermediate key: make sure there is something to descend into, then recurse.
            Object child = node.computeIfAbsent(key, k -> new LinkedHashMap<>());
            appendNestedValue(child, pathParts, depth + 1, valueToAppend);
            return;
        }

        // Final key: create the list when missing; a non-list value is left untouched.
        Object target = node.computeIfAbsent(key, k -> new ArrayList<>());
        if (target instanceof List) {
            ((List<Object>) target).add(valueToAppend);
        }
        return;
    }

    if (currentObj instanceof List) {
        // A list does not consume a path segment; each map element is visited at the same depth.
        for (Object element : (List<Object>) currentObj) {
            if (element instanceof Map) {
                appendNestedValue(element, pathParts, depth, valueToAppend);
            }
        }
    }
}
};

/**
Expand Down Expand Up @@ -171,7 +217,7 @@ public String apply(String jsonStr, List<Map.Entry<String, List<String>>> pathVa
}
}
};

Function2<String, String, Boolean> cidrFunction = new SerializableAbstractFunction2<>() {

IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder()
Expand Down
Loading

0 comments on commit a46494c

Please sign in to comment.