diff --git a/docs/ppl-lang/functions/ppl-json.md b/docs/ppl-lang/functions/ppl-json.md
index 627eb110c..75f8940f9 100644
--- a/docs/ppl-lang/functions/ppl-json.md
+++ b/docs/ppl-lang/functions/ppl-json.md
@@ -217,7 +217,7 @@ A JSON object format.
 Example:
- os> source=people | eval deleted = json_delete({"a":"valueA", "b":"valueB"}, ["a"])
+ os> source=people | eval deleted = json_delete({"a":"valueA", "b":"valueB"}, json_array("a"))
 fetched rows / total rows = 1/1
 +----------------------------------+
 | deleted |
@@ -225,7 +225,7 @@ Example:
 | {"a": "valueA" } |
 +----------------------------------+
- os> source=people | eval eval deleted = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, ["a.b"])
+ os> source=people | eval deleted = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, json_array("a.b"))
 fetched rows / total rows = 1/1
 +-----------------------------------------------------------+
 | deleted |
@@ -233,7 +233,7 @@ Example:
 | {"a":[{"c":3}] } |
 +-----------------------------------------------------------+
- os> source=people | eval `no_action` = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, ["b.b"])
+ os> source=people | eval `no_action` = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, json_array("b.b"))
 fetched rows / total rows = 1/1
 +-----------------------------------+
 | no_action |
 +-----------------------------------+
@@ -245,9 +245,9 @@ Example:
 **Description**
-`json_append(json, [path_value list])` appends values to end of an array within the json elements. Return the updated json object after appending .
+`json_append(json, [path_key, list of values to add])` appends values to the end of an array inside a JSON document. Returns the updated JSON object after appending.
-**Argument type:** JSON, List<[(STRING, STRING>)]>
+**Argument type:** JSON, List
 **Return type:** JSON
 A JSON object format.
 Append adds the value to the end of the existing array, with the following cases:
 - path is an object value - the append is ignored and the original value is returned
 - path is an existing non-empty array - the values are appended to the array's tail
+ - path not found - a new array with the given values is created at the given path
 - path is an existing empty array - the values are appended to it
 Example:
- os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, ["a","valueC"])
+ os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, json_array('a', 'valueC', 'valueD'))
 fetched rows / total rows = 1/1
 +-------------------------------------------------+
 | append |
 +-------------------------------------------------+
- | {"a":["valueA", "valueB", "valueC"]} |
+ | {"a":["valueA", "valueB", "valueC", "valueD"]} |
 +-------------------------------------------------+
- os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, ['a', {"a":["valueC"]}])
+ os> source=people | eval append = json_append(`{"a":[]}`, json_array('a', 'valueC'))
 fetched rows / total rows = 1/1
 +-----------------------------------------------+
 | append |
 +-----------------------------------------------+
- | {"a":["valueA", "valueB", ["valueC"]]} |
+ | {"a":["valueC"]} |
 +-----------------------------------------------+
- os> source=people | eval append = json_append(`{"root":{ "a":["valueA", "valueB"]}}`, {"root.a":"valueC"})
+ os> source=people | eval append = json_append(`{"root":{ "a":["valueA", "valueB"]}}`, json_array('root', 'valueC'))
 fetched rows / total rows = 1/1
 +-----------------------------------------------+
 | append |
 +-----------------------------------------------+
- |{"root": {"a":["valueA", "valueB", "valueC"]}} |
+ |{"root": {"a":["valueA", "valueB"]}} |
 +-----------------------------------------------+
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala
index c438f5d56..9e0e6a1b3 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala
@@ -31,6 +31,10 @@ class FlintSparkPPLJsonFunctionITSuite
   private val validJson5 =
     "{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}"
   private val validJson6 = "[1,2,3]"
+  private val validJson7 =
+    "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}"
+  private val validJson8 =
+    "{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}"
   private val invalidJson1 = "[1,2"
   private val invalidJson2 = "[invalid json]"
   private val invalidJson3 = "{\"invalid\": \"json\""
@@ -502,4 +506,166 @@ class FlintSparkPPLJsonFunctionITSuite
     comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
   }
+  test("test json_append() function: add single value") {
+    val frame = sql(s"""
+         | source = $testTable
+         | | eval result = json_append('$validJson7',json_array('teacher', 'Tom')) | head 1 | fields result
+         | """.stripMargin)
+    assertSameRows(
+      Seq(Row(
+        "{\"teacher\":[\"Alice\",\"Tom\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")),
+      frame)
+
+    val logicalPlan: LogicalPlan = frame.queryExecution.logical
+    val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
+    val keysExpression =
+      UnresolvedFunction("array", Seq(Literal("teacher"), Literal("Tom")), isDistinct = false)
+    val jsonObjExp =
+      Literal(
+        "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
+    val jsonFunc =
+      Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
+    val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
+    val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
+    val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
+    comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
+  }
+
+  test("test json_append() function: add single value key not found") {
+    val frame = sql(s"""
+         | source = $testTable
+         | | eval result = json_append('$validJson7',json_array('headmaster', 'Tom')) | head 1 | fields result
+         | """.stripMargin)
+    assertSameRows(
+      Seq(Row(
+        "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}],\"headmaster\":[\"Tom\"]}")),
+      frame)
+
+    val logicalPlan: LogicalPlan = frame.queryExecution.logical
+    val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
+    val keysExpression =
+      UnresolvedFunction("array", Seq(Literal("headmaster"), Literal("Tom")), isDistinct = false)
+    val jsonObjExp =
+      Literal(
+        "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
+    val jsonFunc =
+      Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
+    val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
+    val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
+    val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
+    comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
+  }
+
+  test("test json_append() function: add single Object key not found") {
+    val frame = sql(s"""
+         | source = $testTable
+         | | eval result = json_append('$validJson7',json_array('headmaster', '{"name":"Tomy","rank":1}')) | head 1 | fields result
+         | """.stripMargin)
+    assertSameRows(
+      Seq(Row(
+        "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}],\"headmaster\":[{\"name\":\"Tomy\",\"rank\":1}]}")),
+      frame)
+
+    val logicalPlan: LogicalPlan = frame.queryExecution.logical
+    val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
+    val keysExpression =
+      UnresolvedFunction(
+        "array",
+        Seq(Literal("headmaster"), Literal("""{"name":"Tomy","rank":1}""")),
+        isDistinct = false)
+    val jsonObjExp =
+      Literal(
+        "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
+    val jsonFunc =
+      Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
+    val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
+    val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
+    val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
+    comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
+  }
+
+  test("test json_append() function: add single Object value") {
+    val frame = sql(s"""
+         | source = $testTable
+         | | eval result = json_append('$validJson7',json_array('student', '{"name":"Tomy","rank":5}')) | head 1 | fields result
+         | """.stripMargin)
+    assertSameRows(
+      Seq(Row(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2},{\"name\":\"Tomy\",\"rank\":5}]}")), + frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction( + "array", + Seq(Literal("student"), Literal("""{"name":"Tomy","rank":5}""")), + isDistinct = false) + val jsonObjExp = + Literal( + "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}") + val jsonFunc = + Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test json_append() function: add multi value") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_append('$validJson7',json_array('teacher', 'Tom', 'Walt')) | head 1 | fields result + | """.stripMargin) + assertSameRows( + Seq(Row( + "{\"teacher\":[\"Alice\",\"Tom\",\"Walt\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")), + frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction( + "array", + Seq(Literal("teacher"), Literal("Tom"), Literal("Walt")), + isDistinct = false) + val jsonObjExp = + Literal( + "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}") + val jsonFunc = + Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test json_append() function: add nested value") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_append('$validJson8',json_array('school.teacher', 'Tom', 'Walt')) | head 1 | fields result + | """.stripMargin) + assertSameRows( + Seq(Row( + "{\"school\":{\"teacher\":[\"Alice\",\"Tom\",\"Walt\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}")), + frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction( + "array", + Seq(Literal("school.teacher"), Literal("Tom"), Literal("Walt")), + isDistinct = false) + val jsonObjExp = + Literal( + "{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}") + val jsonFunc = + Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java 
index 0b1c3b210..7b5e10ffe 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java
@@ -19,7 +19,9 @@ import scala.collection.mutable.WrappedArray;
 import scala.runtime.AbstractFunction2;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -97,43 +99,87 @@ private void removeNestedKey(Object currentObj, String[] keyParts, int depth) {
         }
     };
-    Function2<String, List<Map.Entry<String, String>>, String> jsonAppendFunction = new SerializableAbstractFunction2<>() {
-
+    Function2<String, WrappedArray<String>, String> jsonAppendFunction = new SerializableAbstractFunction2<>() {
         /**
-         * Append values to JSON arrays based on specified path-value pairs.
+         * Append values to JSON arrays based on specified path-values.
          *
-         * @param jsonStr The input JSON string.
-         * @param pathValuePairs A list of path-value pairs to append.
+         * @param jsonStr  The input JSON string.
+         * @param elements A list of path-values where the first item is the path and subsequent items are values to append.
         * @return The updated JSON string.
         */
-        public String apply(String jsonStr, List<Map.Entry<String, String>> pathValuePairs) {
+        public String apply(String jsonStr, WrappedArray<String> elements) {
            if (jsonStr == null) {
                return null;
            }
            try {
-                Map<String, Object> jsonMap = objectMapper.readValue(jsonStr, Map.class);
+                List<String> pathValues = JavaConverters.mutableSeqAsJavaList(elements);
+                if (pathValues.isEmpty()) {
+                    return jsonStr;
+                }
-                for (Map.Entry<String, String> pathValuePair : pathValuePairs) {
-                    String path = pathValuePair.getKey();
-                    String value = pathValuePair.getValue();
+                String path = pathValues.get(0);
+                String[] pathParts = path.split("\\.");
+                List<String> values = pathValues.subList(1, pathValues.size());
-                    if (jsonMap.containsKey(path) && jsonMap.get(path) instanceof List) {
-                        List existingList = (List) jsonMap.get(path);
-                        // Append value to the end of the existing Scala List
-                        existingList.add(value);
-                        jsonMap.put(path, existingList);
-                    } else if (jsonMap.containsKey(path)) {
-                        // Ignore appending if the path is not an array
-                    } else {
-                        jsonMap.put(path, List.of(value));
-                    }
+                // Parse the JSON string into a Map
+                Map<String, Object> jsonMap = objectMapper.readValue(jsonStr, Map.class);
+
+                // Append each value at the specified path
+                for (String value : values) {
+                    Object parsedValue = parseValue(value); // Parse the value
+                    appendNestedValue(jsonMap, pathParts, 0, parsedValue);
                }
+                // Convert the updated map back to JSON
                return objectMapper.writeValueAsString(jsonMap);
            } catch (Exception e) {
                return null;
            }
        }
+
+        private Object parseValue(String value) {
+            // Try parsing the value as JSON, fallback to primitive if parsing fails
+            try {
+                return objectMapper.readValue(value, Object.class);
+            } catch (Exception e) {
+                // Primitive value, return as is
+                return value;
+            }
+        }
+
+        private void appendNestedValue(Object currentObj, String[] pathParts, int depth, Object valueToAppend) {
+            if (currentObj == null || depth >= pathParts.length) {
+                return;
+            }
+
+            if (currentObj instanceof Map) {
+                Map<String, Object> currentMap = (Map<String, Object>) currentObj;
+                String currentKey = pathParts[depth];
+
+                if (depth == pathParts.length - 1) {
+                    // If it's the last key, append to the array
+                    currentMap.computeIfAbsent(currentKey, k -> new ArrayList<>()); // Create list if not present
+                    Object existingValue = currentMap.get(currentKey);
+
+                    if (existingValue instanceof List) {
+                        List<Object> existingList = (List<Object>) existingValue;
+                        existingList.add(valueToAppend);
+                    }
+                } else {
+                    // Continue traversing
+                    currentMap.computeIfAbsent(currentKey, k -> new LinkedHashMap<>()); // Create map if not present
+                    appendNestedValue(currentMap.get(currentKey), pathParts, depth + 1, valueToAppend);
+                }
+            } else if (currentObj instanceof List) {
+                // If the current object is a list, process each map in the list
+                List<Object> list = (List<Object>) currentObj;
+                for (Object item : list) {
+                    if (item instanceof Map) {
+                        appendNestedValue(item, pathParts, depth, valueToAppend);
+                    }
+                }
+            }
+        }
     };
     /**
@@ -171,7 +217,7 @@ public String apply(String jsonStr, List<Map.Entry<String, List<String>>> pathVa
        }
    }
    };
-
+
    Function2<String, String, Boolean> cidrFunction = new SerializableAbstractFunction2<>() {
        IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder()
diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java
index 3f54c4dc9..884d3d5a2 100644
--- a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java
+++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java
@@ -95,17 +95,30 @@ public void testJsonDeleteFunctionInvalidJson() {
 public void testJsonAppendFunctionAppendToExistingArray() {
     String jsonStr = "{\"arrayKey\":[\"value1\",\"value2\"]}";
     String expectedJson = "{\"arrayKey\":[\"value1\",\"value2\",\"value3\"]}";
-    Map.Entry<String, String> pair = Map.entry("arrayKey", "value3");
-    String result = jsonAppendFunction.apply(jsonStr, Collections.singletonList(pair));
+    String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"arrayKey", "value3"}));
+    assertEquals(expectedJson, result);
+}
+
+@Test
+public void testJsonAppendFunctionAppendObjectToExistingArray() {
+    String jsonStr = "{\"key1\":\"value1\",\"key2\":[{\"a\":\"valueA\",\"key3\":\"value3\"}]}";
+    String expectedJson = "{\"key1\":\"value1\",\"key2\":[{\"a\":\"valueA\",\"key3\":\"value3\"},{\"a\":\"valueA\",\"key4\":\"value4\"}]}";
+    String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"key2", "{\"a\":\"valueA\",\"key4\":\"value4\"}"}));
     assertEquals(expectedJson, result);
 }
 @Test
 public void testJsonAppendFunctionAddNewArray() {
-    String jsonStr = "{\"key1\":\"value1\"}";
+    String jsonStr = "{\"key1\":\"value1\",\"newArray\":[]}";
     String expectedJson = "{\"key1\":\"value1\",\"newArray\":[\"newValue\"]}";
-    Map.Entry<String, String> pair = Map.entry("newArray", "newValue");
-    String result = jsonAppendFunction.apply(jsonStr, Collections.singletonList(pair));
+    String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"newArray", "newValue"}));
+    assertEquals(expectedJson, result);
+}
+
+@Test
+public void testJsonAppendFunctionNoSuchKey() {
+    String jsonStr = "{\"key1\":\"value1\"}";
+    String expectedJson = "{\"key1\":\"value1\",\"newKey\":[\"newValue\"]}";
+    String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"newKey", "newValue"}));
     assertEquals(expectedJson, result);
 }
@@ -113,34 +126,36 @@ public void testJsonAppendFunctionAddNewArray() {
 public void testJsonAppendFunctionIgnoreNonArrayKey() {
     String jsonStr = "{\"key1\":\"value1\"}";
     String expectedJson = jsonStr;
-    Map.Entry<String, String> pair = Map.entry("key1", "newValue");
-    String result = jsonAppendFunction.apply(jsonStr, Collections.singletonList(pair));
+    String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"key1", "newValue"}));
     assertEquals(expectedJson, result);
 }
 @Test
-public void testJsonAppendFunctionMultipleAppends() {
-    String jsonStr = "{\"arrayKey\":[\"value1\"]}";
-    String expectedJson = "{\"arrayKey\":[\"value1\",\"value2\",\"value3\"],\"newKey\":[\"newValue\"]}";
-    List<Map.Entry<String, String>> pairs = Arrays.asList(
-        Map.entry("arrayKey", "value2"),
-        Map.entry("arrayKey", "value3"),
-        Map.entry("newKey", "newValue")
-    );
-    String result = jsonAppendFunction.apply(jsonStr, pairs);
+public void testJsonAppendFunctionWithNestedArrayKeys() {
+    String jsonStr = "{\"key2\":[{\"a\":[\"Value1\"],\"key3\":\"Value3\"},{\"a\":[\"Value1\"],\"key4\":\"Value4\"}]}";
+    String expectedJson = "{\"key2\":[{\"a\":[\"Value1\",\"Value2\"],\"key3\":\"Value3\"},{\"a\":[\"Value1\",\"Value2\"],\"key4\":\"Value4\"}]}";
+    String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"key2.a","Value2"}));
+    assertEquals(expectedJson, result);
+}
+
+@Test
+public void testJsonAppendFunctionWithObjectKey() {
+    String jsonStr = "{\"key2\":[{\"a\":[\"Value1\"],\"key3\":\"Value3\"},{\"a\":[\"Value1\"],\"key4\":\"Value4\"}]}";
+    String expectedJson = "{\"key2\":[{\"a\":[\"Value1\"],\"key3\":\"Value3\"},{\"a\":[\"Value1\"],\"key4\":\"Value4\"},\"Value2\"]}";
+    String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"key2","Value2"}));
     assertEquals(expectedJson, result);
 }
 @Test
 public void testJsonAppendFunctionNullJson() {
-    String result = jsonAppendFunction.apply(null, Collections.singletonList(Map.entry("key", "value")));
+    String result = jsonAppendFunction.apply(null, WrappedArray.make(new String[]{"key1", "newValue"}));
     assertNull(result);
 }
 @Test
 public void testJsonAppendFunctionInvalidJson() {
     String invalidJson = "invalid_json";
-    String result = jsonAppendFunction.apply(invalidJson, Collections.singletonList(Map.entry("key", "value")));
+    String result = jsonAppendFunction.apply(invalidJson, WrappedArray.make(new String[]{"key1", "newValue"}));
     assertNull(result);
 }
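For quick reference, here is a minimal sketch (not part of the diff) of how the new WrappedArray-based jsonAppendFunction can be exercised directly. The class name JsonAppendDemo and the static import are illustrative assumptions; the call convention (first array element is the dotted path, remaining elements are the values to append) and the expected outputs mirror the unit tests above.

// Hypothetical standalone demo of the jsonAppendFunction call convention.
import scala.collection.mutable.WrappedArray;

import static org.opensearch.sql.expression.function.SerializableUdf.jsonAppendFunction;

public class JsonAppendDemo {
    public static void main(String[] args) {
        // Append two values to the nested array at school.teacher.
        String appended = jsonAppendFunction.apply(
            "{\"school\":{\"teacher\":[\"Alice\"]}}",
            WrappedArray.make(new String[]{"school.teacher", "Tom", "Walt"}));
        // Expected: {"school":{"teacher":["Alice","Tom","Walt"]}}
        System.out.println(appended);

        // A path that resolves to a non-array value is left untouched.
        String ignored = jsonAppendFunction.apply(
            "{\"key1\":\"value1\"}",
            WrappedArray.make(new String[]{"key1", "newValue"}));
        // Expected: {"key1":"value1"}
        System.out.println(ignored);
    }
}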