Skip to content

Commit

Permalink
Support allow_duplicate_keys for json processor
Browse files Browse the repository at this point in the history
Signed-off-by: Munif Tanjim <[email protected]>
  • Loading branch information
MunifTanjim committed Nov 2, 2023
1 parent 8673fa9 commit 7498602
Show file tree
Hide file tree
Showing 14 changed files with 141 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public MediaType contentType() {
return mediaType;
}

@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
throw new UnsupportedOperationException("Allowing duplicate keys is not possible for maps");

Check warning on line 113 in libs/core/src/main/java/org/opensearch/core/xcontent/MapXContentParser.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/xcontent/MapXContentParser.java#L113

Added line #L113 was not covered by tests
}

@Override
public Token nextToken() throws IOException {
if (iterator == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ enum NumberType {

MediaType contentType();

void allowDuplicateKeys(boolean allowDuplicateKeys);

Token nextToken() throws IOException;

void skipChildren() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public MediaType contentType() {
return parser.contentType();
}

@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
parser.allowDuplicateKeys(allowDuplicateKeys);
}

Check warning on line 73 in libs/core/src/main/java/org/opensearch/core/xcontent/XContentSubParser.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/xcontent/XContentSubParser.java#L72-L73

Added lines #L72 - L73 were not covered by tests

@Override
public Token nextToken() throws IOException {
if (level > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ public CborXContentParser(NamedXContentRegistry xContentRegistry, DeprecationHan
public XContentType contentType() {
return XContentType.CBOR;
}

@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
throw new UnsupportedOperationException("Allowing duplicate keys after the parser has been created is not possible for CBOR");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public XContentType contentType() {
return XContentType.JSON;
}

@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
parser.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, allowDuplicateKeys == false);
}

@Override
public Token nextToken() throws IOException {
return convertToken(parser.nextToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ public SmileXContentParser(NamedXContentRegistry xContentRegistry, DeprecationHa
public XContentType contentType() {
return XContentType.SMILE;
}

@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
throw new UnsupportedOperationException("Allowing duplicate keys after the parser has been created is not possible for Smile");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ public final class JsonProcessor extends AbstractProcessor {
private final String field;
private final String targetField;
private final boolean addToRoot;
private final boolean allowDuplicateKeys;

JsonProcessor(String tag, String description, String field, String targetField, boolean addToRoot) {
JsonProcessor(String tag, String description, String field, String targetField, boolean addToRoot, boolean allowDuplicateKeys) {
super(tag, description);
this.field = field;
this.targetField = targetField;
this.addToRoot = addToRoot;
this.allowDuplicateKeys = allowDuplicateKeys;
}

public String getField() {
Expand All @@ -80,7 +82,7 @@ boolean isAddToRoot() {
return addToRoot;
}

public static Object apply(Object fieldValue) {
public static Object apply(Object fieldValue, boolean allowDuplicateKeys) {
BytesReference bytesRef = fieldValue == null ? new BytesArray("null") : new BytesArray(fieldValue.toString());
try (
InputStream stream = bytesRef.streamInput();
Expand All @@ -90,6 +92,7 @@ public static Object apply(Object fieldValue) {
stream
)
) {
parser.allowDuplicateKeys(allowDuplicateKeys);
XContentParser.Token token = parser.nextToken();
Object value = null;
if (token == XContentParser.Token.VALUE_NULL) {
Expand All @@ -113,8 +116,8 @@ public static Object apply(Object fieldValue) {
}
}

public static void apply(Map<String, Object> ctx, String fieldName) {
Object value = apply(ctx.get(fieldName));
public static void apply(Map<String, Object> ctx, String fieldName, boolean allowDuplicateKeys) {
Object value = apply(ctx.get(fieldName), allowDuplicateKeys);
if (value instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) value;
Expand All @@ -127,9 +130,9 @@ public static void apply(Map<String, Object> ctx, String fieldName) {
@Override
public IngestDocument execute(IngestDocument document) throws Exception {
if (addToRoot) {
apply(document.getSourceAndMetadata(), field);
apply(document.getSourceAndMetadata(), field, allowDuplicateKeys);
} else {
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class)));
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class), allowDuplicateKeys));
}
return document;
}
Expand All @@ -150,6 +153,7 @@ public JsonProcessor create(
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
boolean addToRoot = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "add_to_root", false);
boolean allowDuplicateKeys = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicate_keys", false);

if (addToRoot && targetField != null) {
throw newConfigurationException(
Expand All @@ -164,7 +168,7 @@ public JsonProcessor create(
targetField = field;
}

return new JsonProcessor(processorTag, description, field, targetField, addToRoot);
return new JsonProcessor(processorTag, description, field, targetField, addToRoot, allowDuplicateKeys);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ public static String uppercase(String value) {
}

public static Object json(Object fieldValue) {
return JsonProcessor.apply(fieldValue);
return JsonProcessor.apply(fieldValue, false);

Check warning on line 52 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/Processors.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/Processors.java#L52

Added line #L52 was not covered by tests
}

public static void json(Map<String, Object> ctx, String field) {
JsonProcessor.apply(ctx, field);
JsonProcessor.apply(ctx, field, false);

Check warning on line 56 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/Processors.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/Processors.java#L56

Added line #L56 was not covered by tests
}

public static String urlDecode(String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testExecute() throws Exception {
String processorTag = randomAlphaOfLength(3);
String randomField = randomAlphaOfLength(3);
String randomTargetField = randomAlphaOfLength(2);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, randomField, randomTargetField, false);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, randomField, randomTargetField, false, false);
Map<String, Object> document = new HashMap<>();

Map<String, Object> randomJsonMap = RandomDocumentPicks.randomSource(random());
Expand All @@ -71,7 +71,7 @@ public void testExecute() throws Exception {
}

public void testInvalidValue() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
document.put("field", "blah blah");
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand All @@ -86,7 +86,7 @@ public void testInvalidValue() {
}

public void testByteArray() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
document.put("field", new byte[] { 0, 1 });
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand All @@ -99,7 +99,7 @@ public void testByteArray() {
}

public void testNull() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
document.put("field", null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand All @@ -108,7 +108,7 @@ public void testNull() throws Exception {
}

public void testBoolean() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
boolean value = true;
document.put("field", value);
Expand All @@ -118,7 +118,7 @@ public void testBoolean() throws Exception {
}

public void testInteger() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
int value = 3;
document.put("field", value);
Expand All @@ -128,7 +128,7 @@ public void testInteger() throws Exception {
}

public void testDouble() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
double value = 3.0;
document.put("field", value);
Expand All @@ -138,7 +138,7 @@ public void testDouble() throws Exception {
}

public void testString() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
String value = "hello world";
document.put("field", "\"" + value + "\"");
Expand All @@ -148,7 +148,7 @@ public void testString() throws Exception {
}

public void testArray() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
List<Boolean> value = Arrays.asList(true, true, false);
document.put("field", value.toString());
Expand All @@ -158,7 +158,7 @@ public void testArray() throws Exception {
}

public void testFieldMissing() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);

Expand All @@ -169,7 +169,7 @@ public void testFieldMissing() {
public void testAddToRoot() throws Exception {
String processorTag = randomAlphaOfLength(3);
String randomTargetField = randomAlphaOfLength(2);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, "a", randomTargetField, true);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, "a", randomTargetField, true, false);
Map<String, Object> document = new HashMap<>();

String json = "{\"a\": 1, \"b\": 2}";
Expand All @@ -185,8 +185,32 @@ public void testAddToRoot() throws Exception {
assertEquals("see", sourceAndMetadata.get("c"));
}

public void testDuplicateKeys() throws Exception {
String processorTag = randomAlphaOfLength(3);
JsonProcessor lenientJsonProcessor = new JsonProcessor(processorTag, null, "a", null, true, true);

Map<String, Object> document = new HashMap<>();
String json = "{\"a\": 1, \"a\": 2}";
document.put("a", json);
document.put("c", "see");

IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
lenientJsonProcessor.execute(ingestDocument);

Map<String, Object> sourceAndMetadata = ingestDocument.getSourceAndMetadata();
assertEquals(2, sourceAndMetadata.get("a"));
assertEquals("see", sourceAndMetadata.get("c"));

JsonProcessor strictJsonProcessor = new JsonProcessor(processorTag, null, "a", null, true, false);
Exception exception = expectThrows(
IllegalArgumentException.class,
() -> strictJsonProcessor.execute(RandomDocumentPicks.randomIngestDocument(random(), document))
);
assertThat(exception.getMessage(), containsString("Duplicate field 'a'"));
}

public void testAddBoolToRoot() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", true);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", true, false);
Map<String, Object> document = new HashMap<>();
document.put("field", true);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ teardown:
ingest.delete_pipeline:
id: "1"
ignore: 404
- do:
ingest.delete_pipeline:
id: "2"
ignore: 404

---
"Test JSON Processor":
Expand Down Expand Up @@ -71,3 +75,36 @@ teardown:
- match: { _source.foo_number: 3 }
- is_true: _source.foo_boolean
- is_false: _source.foo_null

---
"Test JSON Processor duplicate keys":
- do:
ingest.put_pipeline:
id: "2"
body: {
"processors": [
{
"json" : {
"field" : "json",
"add_to_root": true,
"allow_duplicate_keys": true
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 2
pipeline: "2"
body: {
json: "{\"dupe\": 1, \"dupe\": 2}",
}

- do:
get:
index: test
id: 2
- match: { _source.dupe: 2 }
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public MediaType contentType() {
return MediaTypeRegistry.JSON;
}

@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
throw new UnsupportedOperationException("Allowing duplicate keys is not possible for JsonToString");

Check warning on line 144 in server/src/main/java/org/opensearch/common/xcontent/JsonToStringXContentParser.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/xcontent/JsonToStringXContentParser.java#L144

Added line #L144 was not covered by tests
}

@Override
public Token nextToken() throws IOException {
return this.parser.nextToken();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public abstract class BaseXContentTestCase extends OpenSearchTestCase {

protected abstract XContentType xcontentType();

private XContentBuilder builder() throws IOException {
protected XContentBuilder builder() throws IOException {
return XContentBuilder.builder(xcontentType().xContent());
}

Expand Down Expand Up @@ -1106,6 +1106,14 @@ public void testChecksForDuplicates() throws Exception {
}
}

public void testAllowsDuplicates() throws Exception {
XContentBuilder builder = builder().startObject().field("key", 1).field("key", 2).endObject();
try (XContentParser xParser = createParser(builder)) {
xParser.allowDuplicateKeys(true);
assertThat(xParser.map(), equalTo(Map.of("key", 2)));
}
}

public void testNamedObject() throws IOException {
Object test1 = new Object();
Object test2 = new Object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import org.opensearch.common.xcontent.BaseXContentTestCase;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.XContentParser;

import java.io.ByteArrayOutputStream;

Expand All @@ -52,4 +53,10 @@ public void testBigInteger() throws Exception {
JsonGenerator generator = new CBORFactory().createGenerator(os);
doTestBigInteger(generator, os);
}

public void testAllowsDuplicates() throws Exception {
try (XContentParser xParser = createParser(builder().startObject().endObject())) {
expectThrows(UnsupportedOperationException.class, () -> xParser.allowDuplicateKeys(true));
}
}
}
Loading

0 comments on commit 7498602

Please sign in to comment.