From 88bcaea9c6f4d93a00ffdaa82b3286f520978b1f Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Wed, 11 Dec 2024 14:50:49 -0800
Subject: [PATCH] update PR to only include json_delete json_append

Signed-off-by: YANGDB
---
 docs/ppl-lang/functions/ppl-json.md           |  35 -----
 .../FlintSparkPPLJsonFunctionITSuite.scala    |   2 +-
 .../src/main/antlr4/OpenSearchPPLLexer.g4     |   2 +-
 .../src/main/antlr4/OpenSearchPPLParser.g4    |   3 -
 .../sql/expression/function/JsonUtils.java    | 106 ++++++++++++++
 .../expression/function/SerializableUdf.java  | 131 +-----------------
 ...PlanJsonFunctionsTranslatorTestSuite.scala |  14 +-
 7 files changed, 123 insertions(+), 170 deletions(-)
 create mode 100644 ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/JsonUtils.java

diff --git a/docs/ppl-lang/functions/ppl-json.md b/docs/ppl-lang/functions/ppl-json.md
index 16af1e31e..c3955941b 100644
--- a/docs/ppl-lang/functions/ppl-json.md
+++ b/docs/ppl-lang/functions/ppl-json.md
@@ -286,41 +286,6 @@ Example:
     |{"root": {"a":["valueA", "valueB"]}}           |
     +-----------------------------------------------+
-
-
-### `JSON_EXTEND`
-
-**Description**
-
-`json_extend(json, [path_value_pairs list])` extend appends multiple (array) values to an existing array json elements. Return the updated object after extending.
-
-**Argument type:** JSON, List<[(STRING, List)]>
-
-**Return type:** JSON
-
-A JSON object format.
-
-**Note**
-Extend arrays as individual values separates the `json_extend` functionality from the `json_append` - which is a similar function that appends the `<value>` as a single element.
-
-Example:
-
-    os> source=people | eval extend = json_extend(`{"a":["valueA", "valueB"]}`,['a', ["valueC","valueD"]])
-    fetched rows / total rows = 1/1
-    +-------------------------------------------------+
-    | extend                                          |
-    +-------------------------------------------------+
-    | {"a":["valueA", "valueB", "valueC", "valueD"]}  |
-    +-------------------------------------------------+
-
-    os> source=people | eval extend = json_extend(`{"a":["valueA", "valueB"]}`,['a',[{"b":["valueC","valueD"]}]])
-    fetched rows / total rows = 1/1
-    +-------------------------------------------------------------+
-    | extend                                                      |
-    +-------------------------------------------------------------+
-    | {"a":["valueA", "valueB", {"b":"valueC"}, {"b":"valueD"}]}  |
-    +-------------------------------------------------------------+
-
 ### `JSON_KEYS`
 
 **Description**
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala
index a1ac52de0..7a00d9a07 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala
@@ -35,6 +35,7 @@ class FlintSparkPPLJsonFunctionITSuite
     "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}"
   private val validJson8 =
     "{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}"
+  private val validJson9 = "{\"a\":[\"valueA\", \"valueB\"]}"
   private val invalidJson1 = "[1,2"
   private val invalidJson2 = "[invalid json]"
   private val invalidJson3 = "{\"invalid\": \"json\""
@@ -667,5 +668,4 @@ class FlintSparkPPLJsonFunctionITSuite
     val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
     comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
   }
-
 }
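A minimal Jackson-only sketch (not part of this patch) of the behavioral difference called out in the docs removed above: `json_append` adds its value as a single element, while the dropped `json_extend` flattened list values into individual elements. The class name and inputs are illustrative, not the UDF implementation.

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class AppendVsExtendSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Map<String, Object> appended = mapper.readValue("{\"a\":[\"valueA\",\"valueB\"]}", Map.class);
        Map<String, Object> extended = mapper.readValue("{\"a\":[\"valueA\",\"valueB\"]}", Map.class);
        List<String> newValues = Arrays.asList("valueC", "valueD");

        // json_append-style: the list goes in as ONE element.
        ((List<Object>) appended.get("a")).add(newValues);
        // json_extend-style: each item goes in individually.
        ((List<Object>) extended.get("a")).addAll(newValues);

        System.out.println(mapper.writeValueAsString(appended)); // {"a":["valueA","valueB",["valueC","valueD"]]}
        System.out.println(mapper.writeValueAsString(extended)); // {"a":["valueA","valueB","valueC","valueD"]}
    }
}
```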
diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
index a8efffe71..b7d615980 100644
--- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
+++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
@@ -386,10 +386,10 @@ JSON_ARRAY_LENGTH: 'JSON_ARRAY_LENGTH';
 TO_JSON_STRING: 'TO_JSON_STRING';
 JSON_EXTRACT: 'JSON_EXTRACT';
 JSON_DELETE : 'JSON_DELETE';
-JSON_EXTEND : 'JSON_EXTEND';
 JSON_KEYS: 'JSON_KEYS';
 JSON_VALID: 'JSON_VALID';
 JSON_APPEND: 'JSON_APPEND';
+//JSON_EXTEND : 'JSON_EXTEND';
 //JSON_SET: 'JSON_SET';
 //JSON_ARRAY_ALL_MATCH: 'JSON_ARRAY_ALL_MATCH';
 //JSON_ARRAY_ANY_MATCH: 'JSON_ARRAY_ANY_MATCH';
diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
index 54926e05e..b990fd549 100644
--- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
+++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -876,12 +876,9 @@ jsonFunctionName
     | TO_JSON_STRING
     | JSON_EXTRACT
     | JSON_DELETE
-    | JSON_EXTEND
     | JSON_APPEND
     | JSON_KEYS
     | JSON_VALID
-//    | JSON_APPEND
-//    | JSON_DELETE
 //    | JSON_EXTEND
 //    | JSON_SET
 //    | JSON_ARRAY_ALL_MATCH
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/JsonUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/JsonUtils.java
new file mode 100644
index 000000000..9ca6732c6
--- /dev/null
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/JsonUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.expression.function;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public interface JsonUtils {
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    static Object parseValue(String value) {
+        // Try parsing the value as JSON, fallback to primitive if parsing fails
+        try {
+            return objectMapper.readValue(value, Object.class);
+        } catch (Exception e) {
+            // Primitive value, return as is
+            return value;
+        }
+    }
+
+    /**
+     * Append a nested value to the json object.
+     * @param currentObj the JSON node currently being traversed
+     * @param pathParts the dot-separated path split into its parts
+     * @param depth the current index into pathParts
+     * @param valueToAppend the value appended to the array at the target path
+     */
+    static void appendNestedValue(Object currentObj, String[] pathParts, int depth, Object valueToAppend) {
+        if (currentObj == null || depth >= pathParts.length) {
+            return;
+        }
+
+        if (currentObj instanceof Map) {
+            Map<String, Object> currentMap = (Map<String, Object>) currentObj;
+            String currentKey = pathParts[depth];
+
+            if (depth == pathParts.length - 1) {
+                // If it's the last key, append to the array
+                currentMap.computeIfAbsent(currentKey, k -> new ArrayList<>()); // Create list if not present
+                Object existingValue = currentMap.get(currentKey);
+
+                if (existingValue instanceof List) {
+                    List<Object> existingList = (List<Object>) existingValue;
+                    existingList.add(valueToAppend);
+                }
+            } else {
+                // Continue traversing
+                currentMap.computeIfAbsent(currentKey, k -> new LinkedHashMap<>()); // Create map if not present
+                appendNestedValue(currentMap.get(currentKey), pathParts, depth + 1, valueToAppend);
+            }
+        } else if (currentObj instanceof List) {
+            // If the current object is a list, process each map in the list
+            List<Object> list = (List<Object>) currentObj;
+            for (Object item : list) {
+                if (item instanceof Map) {
+                    appendNestedValue(item, pathParts, depth, valueToAppend);
+                }
+            }
+        }
+    }
+
+    /**
+     * Remove a nested json object using its key parts.
+     * @param currentObj the JSON node currently being traversed
+     * @param keyParts the dot-separated key split into its parts
+     * @param depth the current index into keyParts
+     */
+    static void removeNestedKey(Object currentObj, String[] keyParts, int depth) {
+        if (currentObj == null || depth >= keyParts.length) {
+            return;
+        }
+
+        if (currentObj instanceof Map) {
+            Map<String, Object> currentMap = (Map<String, Object>) currentObj;
+            String currentKey = keyParts[depth];
+
+            if (depth == keyParts.length - 1) {
+                // If it's the last key, remove it from the map
+                currentMap.remove(currentKey);
+            } else {
+                // If not the last key, continue traversing
+                if (currentMap.containsKey(currentKey)) {
+                    Object nextObj = currentMap.get(currentKey);
+
+                    if (nextObj instanceof List) {
+                        // If the value is a list, process each item in the list
+                        List<Object> list = (List<Object>) nextObj;
+                        for (int i = 0; i < list.size(); i++) {
+                            removeNestedKey(list.get(i), keyParts, depth + 1);
+                        }
+                    } else {
+                        // Continue traversing if it's a map
+                        removeNestedKey(nextObj, keyParts, depth + 1);
+                    }
+                }
+            }
+        }
+    }
+}
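A hypothetical driver (not part of this patch) exercising the two helpers JsonUtils.java introduces above; the class name and inputs are made up for illustration.

```java
import java.util.Map;

import static org.opensearch.sql.expression.function.JsonUtils.appendNestedValue;
import static org.opensearch.sql.expression.function.JsonUtils.objectMapper;
import static org.opensearch.sql.expression.function.JsonUtils.parseValue;
import static org.opensearch.sql.expression.function.JsonUtils.removeNestedKey;

public class JsonUtilsSketch {
    public static void main(String[] args) throws Exception {
        Map<String, Object> json =
                objectMapper.readValue("{\"a\":{\"b\":[1]},\"c\":2}", Map.class);

        // Dotted paths are split into parts, as removeKeys does in SerializableUdf.
        appendNestedValue(json, "a.b".split("\\."), 0, parseValue("3"));
        // -> {"a":{"b":[1,3]},"c":2}

        removeNestedKey(json, new String[]{"c"}, 0);
        // -> {"a":{"b":[1,3]}}

        System.out.println(objectMapper.writeValueAsString(json));
    }
}
```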
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java
index 7b5e10ffe..e80a26bc4 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java
@@ -5,7 +5,6 @@
 
 package org.opensearch.sql.expression.function;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import inet.ipaddr.AddressStringException;
 import inet.ipaddr.IPAddressString;
 import inet.ipaddr.IPAddressStringParameters;
@@ -19,18 +18,19 @@
 import scala.collection.mutable.WrappedArray;
 import scala.runtime.AbstractFunction2;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.opensearch.sql.expression.function.JsonUtils.appendNestedValue;
+import static org.opensearch.sql.expression.function.JsonUtils.objectMapper;
+import static org.opensearch.sql.expression.function.JsonUtils.parseValue;
+import static org.opensearch.sql.expression.function.JsonUtils.removeNestedKey;
 import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
 
 public interface SerializableUdf {
-    ObjectMapper objectMapper = new ObjectMapper();
 
     abstract class SerializableAbstractFunction2<T1, T2, R> extends AbstractFunction2<T1, T2, R> implements Serializable {
@@ -65,38 +65,6 @@ private void removeKeys(Map<String, Object> map, WrappedArray<String> keysToRemove) {
                 removeNestedKey(map, keyParts, 0);
             }
         }
-
-        private void removeNestedKey(Object currentObj, String[] keyParts, int depth) {
-            if (currentObj == null || depth >= keyParts.length) {
-                return;
-            }
-
-            if (currentObj instanceof Map) {
-                Map<String, Object> currentMap = (Map<String, Object>) currentObj;
-                String currentKey = keyParts[depth];
-
-                if (depth == keyParts.length - 1) {
-                    // If it's the last key, remove it from the map
-                    currentMap.remove(currentKey);
-                } else {
-                    // If not the last key, continue traversing
-                    if (currentMap.containsKey(currentKey)) {
-                        Object nextObj = currentMap.get(currentKey);
-
-                        if (nextObj instanceof List) {
-                            // If the value is a list, process each item in the list
-                            List<Object> list = (List<Object>) nextObj;
-                            for (int i = 0; i < list.size(); i++) {
-                                removeNestedKey(list.get(i), keyParts, depth + 1);
-                            }
-                        } else {
-                            // Continue traversing if it's a map
-                            removeNestedKey(nextObj, keyParts, depth + 1);
-                        }
-                    }
-                }
-            }
-        }
     };
 
     Function2<String, WrappedArray<String>, String> jsonAppendFunction = new SerializableAbstractFunction2<>() {
@@ -136,88 +104,8 @@ public String apply(String jsonStr, WrappedArray<String> elements) {
                 return null;
             }
         }
-
-        private Object parseValue(String value) {
-            // Try parsing the value as JSON, fallback to primitive if parsing fails
-            try {
-                return objectMapper.readValue(value, Object.class);
-            } catch (Exception e) {
-                // Primitive value, return as is
-                return value;
-            }
-        }
-
-        private void appendNestedValue(Object currentObj, String[] pathParts, int depth, Object valueToAppend) {
-            if (currentObj == null || depth >= pathParts.length) {
-                return;
-            }
-
-            if (currentObj instanceof Map) {
-                Map<String, Object> currentMap = (Map<String, Object>) currentObj;
-                String currentKey = pathParts[depth];
-
-                if (depth == pathParts.length - 1) {
-                    // If it's the last key, append to the array
-                    currentMap.computeIfAbsent(currentKey, k -> new ArrayList<>()); // Create list if not present
-                    Object existingValue = currentMap.get(currentKey);
-
-                    if (existingValue instanceof List) {
-                        List<Object> existingList = (List<Object>) existingValue;
-                        existingList.add(valueToAppend);
-                    }
-                } else {
-                    // Continue traversing
-                    currentMap.computeIfAbsent(currentKey, k -> new LinkedHashMap<>()); // Create map if not present
-                    appendNestedValue(currentMap.get(currentKey), pathParts, depth + 1, valueToAppend);
-                }
-            } else if (currentObj instanceof List) {
-                // If the current object is a list, process each map in the list
-                List<Object> list = (List<Object>) currentObj;
-                for (Object item : list) {
-                    if (item instanceof Map) {
-                        appendNestedValue(item, pathParts, depth, valueToAppend);
-                    }
-                }
-            }
-        }
-    };
-
-    /**
-     * Extend JSON arrays with new values based on specified path-value pairs.
-     *
-     * @param jsonStr The input JSON string.
-     * @param pathValuePairs A list of path-value pairs to extend.
-     * @return The updated JSON string.
-     */
-    Function2<String, List<Map.Entry<String, List<String>>>, String> jsonExtendFunction = new SerializableAbstractFunction2<>() {
-
-        @Override
-        public String apply(String jsonStr, List<Map.Entry<String, List<String>>> pathValuePairs) {
-            if (jsonStr == null) {
-                return null;
-            }
-            try {
-                Map<String, Object> jsonMap = objectMapper.readValue(jsonStr, Map.class);
-
-                for (Map.Entry<String, List<String>> pathValuePair : pathValuePairs) {
-                    String path = pathValuePair.getKey();
-                    List<String> values = pathValuePair.getValue();
-
-                    if (jsonMap.containsKey(path) && jsonMap.get(path) instanceof List) {
-                        List<Object> existingList = (List<Object>) jsonMap.get(path);
-                        existingList.addAll(values);
-                    } else {
-                        jsonMap.put(path, values);
-                    }
-                }
-
-                return objectMapper.writeValueAsString(jsonMap);
-            } catch (Exception e) {
-                return null;
-            }
-        }
     };
-
+
     Function2<String, String, Boolean> cidrFunction = new SerializableAbstractFunction2<>() {
 
         IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder()
@@ -280,15 +168,6 @@ static ScalaUDF visit(String funcName, List<Expression> expressions) {
                         Option.apply("json_delete"),
                         false,
                         true);
-            case "json_extend":
-                return new ScalaUDF(jsonExtendFunction,
-                        DataTypes.StringType,
-                        seq(expressions),
-                        seq(),
-                        Option.empty(),
-                        Option.apply("json_extend"),
-                        false,
-                        true);
             case "json_append":
                 return new ScalaUDF(jsonAppendFunction,
                         DataTypes.StringType,
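A hedged reviewer sketch (not part of this patch) of the json_delete path the hunks above refactor. It assumes `jsonDeleteFunction`'s apply parses the document and delegates to `removeKeys`/`removeNestedKey`, as the surrounding context lines suggest; the field name is inferred from the `json_delete` case in visit().

```java
import org.opensearch.sql.expression.function.SerializableUdf;

import scala.collection.mutable.WrappedArray;

public class JsonDeleteSketch {
    public static void main(String[] args) {
        String json = "{\"a\":[{\"b\":1},{\"c\":2}]}";
        WrappedArray<String> keys = WrappedArray.make(new String[]{"a.b"});

        // removeNestedKey recurses into the list under "a" and drops "b"
        // from each element; expected: {"a":[{},{"c":2}]}
        System.out.println(SerializableUdf.jsonDeleteFunction.apply(json, keys));
    }
}
```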
diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala
index 70a633024..23f6ba5d2 100644
--- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala
+++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala
@@ -218,12 +218,15 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite
       planTransformer.visit(
         plan(
           pplParser,
-          """source=t | eval result = json_append('{"a":[{"b":1},{"c":2}]}', array('a.b'))"""),
+          """source=t | eval result = json_append('{"a":[{"b":1},{"c":2}]}', array('a.b','c','d'))"""),
         context)
 
     val table = UnresolvedRelation(Seq("t"))
     val keysExpression =
-      UnresolvedFunction("array", Seq(Literal("a.b")), isDistinct = false)
+      UnresolvedFunction(
+        "array",
+        Seq(Literal("a.b"), Literal("c"), Literal("d")),
+        isDistinct = false)
     val jsonObjExp =
       Literal("""{"a":[{"b":1},{"c":2}]}""")
     val jsonFunc =
@@ -239,12 +242,15 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite
       planTransformer.visit(
         plan(
          pplParser,
-          """source=t | eval result = json_extend('{"a":[{"b":1},{"c":2}]}', array('a.b'))"""),
+          """source=t | eval result = json_extend('{"a":[{"b":1},{"c":2}]}', array('a.b', 'c','d'))"""),
         context)
 
     val table = UnresolvedRelation(Seq("t"))
     val keysExpression =
-      UnresolvedFunction("array", Seq(Literal("a.b")), isDistinct = false)
+      UnresolvedFunction(
+        "array",
+        Seq(Literal("a.b"), Literal("c"), Literal("d")),
+        isDistinct = false)
     val jsonObjExp =
       Literal("""{"a":[{"b":1},{"c":2}]}""")
     val jsonFunc =
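To close the loop, a sketch (not part of this patch) of the dispatch the translator tests above rely on: `visit` maps the function name to the matching UDF, as the `case "json_append"` arm retained in SerializableUdf.java shows. The plain string literal stands in for the `array('a.b','c','d')` expression the tests assert; building that `UnresolvedFunction` from Java is omitted for brevity.

```java
import java.util.Arrays;

import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.catalyst.expressions.ScalaUDF;
import org.apache.spark.sql.types.DataTypes;
import org.opensearch.sql.expression.function.SerializableUdf;

public class VisitDispatchSketch {
    public static void main(String[] args) {
        ScalaUDF udf = SerializableUdf.visit(
                "json_append",
                Arrays.<Expression>asList(
                        Literal.create("{\"a\":[{\"b\":1},{\"c\":2}]}", DataTypes.StringType),
                        Literal.create("a.b", DataTypes.StringType)));

        // The name is carried into the plan for EXPLAIN output.
        System.out.println(udf.udfName()); // Some(json_append)
    }
}
```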