Skip to content

Commit

Permalink
Support allow_duplicate_keys for json processor
Browse files: browse the repository at this point in the history
  • Loading branch information
MunifTanjim committed Nov 2, 2023
1 parent 8673fa9 commit aa08236
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public MediaType contentType() {
return mediaType;
}

/**
 * Always rejects the request: this implementation throws regardless of the
 * value passed in.
 *
 * @param allowDuplicateKeys ignored; the call fails for both {@code true} and {@code false}
 * @throws UnsupportedOperationException on every invocation
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
    final String reason = "Allowing duplicate keys is not possible for maps";
    throw new UnsupportedOperationException(reason);
}

@Override
public Token nextToken() throws IOException {
if (iterator == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ enum NumberType {

MediaType contentType();

void allowDuplicateKeys(boolean allowDuplicateKeys);

Token nextToken() throws IOException;

void skipChildren() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public MediaType contentType() {
return parser.contentType();
}

/**
 * Forwards the duplicate-key setting unchanged to the underlying {@code parser}.
 *
 * @param allowDuplicateKeys value handed to {@code parser.allowDuplicateKeys}
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
parser.allowDuplicateKeys(allowDuplicateKeys);
}

@Override
public Token nextToken() throws IOException {
if (level > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ public CborXContentParser(NamedXContentRegistry xContentRegistry, DeprecationHan
public XContentType contentType() {
return XContentType.CBOR;
}

/**
 * Always rejects the request: this CBOR parser throws regardless of the
 * value passed in.
 *
 * @param allowDuplicateKeys ignored; the call fails for both {@code true} and {@code false}
 * @throws UnsupportedOperationException on every invocation
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
    final String reason = "Allowing duplicate keys after the parser has been created is not possible for CBOR";
    throw new UnsupportedOperationException(reason);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public XContentType contentType() {
return XContentType.JSON;
}

/**
 * Switches Jackson's {@code STRICT_DUPLICATE_DETECTION} feature on the
 * underlying parser. The feature is enabled when duplicates are NOT allowed
 * and disabled when they are.
 *
 * @param allowDuplicateKeys {@code true} to turn strict duplicate detection off,
 *                           {@code false} to turn it on
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
    // The Jackson flag is the logical inverse of this method's argument.
    final boolean strictDuplicateDetection = allowDuplicateKeys == false;
    parser.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, strictDuplicateDetection);
}

@Override
public Token nextToken() throws IOException {
return convertToken(parser.nextToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ public SmileXContentParser(NamedXContentRegistry xContentRegistry, DeprecationHa
public XContentType contentType() {
return XContentType.SMILE;
}

/**
 * Always rejects the request: this Smile parser throws regardless of the
 * value passed in.
 *
 * @param allowDuplicateKeys ignored; the call fails for both {@code true} and {@code false}
 * @throws UnsupportedOperationException on every invocation
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
    final String reason = "Allowing duplicate keys after the parser has been created is not possible for Smile";
    throw new UnsupportedOperationException(reason);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ public final class JsonProcessor extends AbstractProcessor {
private final String field;
private final String targetField;
private final boolean addToRoot;
private final boolean allowDuplicateKeys;

JsonProcessor(String tag, String description, String field, String targetField, boolean addToRoot) {
JsonProcessor(String tag, String description, String field, String targetField, boolean addToRoot, boolean allowDuplicateKeys) {
super(tag, description);
this.field = field;
this.targetField = targetField;
this.addToRoot = addToRoot;
this.allowDuplicateKeys = allowDuplicateKeys;
}

public String getField() {
Expand All @@ -80,7 +82,7 @@ boolean isAddToRoot() {
return addToRoot;
}

public static Object apply(Object fieldValue) {
public static Object apply(Object fieldValue, boolean allowDuplicateKeys) {
BytesReference bytesRef = fieldValue == null ? new BytesArray("null") : new BytesArray(fieldValue.toString());
try (
InputStream stream = bytesRef.streamInput();
Expand All @@ -90,6 +92,7 @@ public static Object apply(Object fieldValue) {
stream
)
) {
parser.allowDuplicateKeys(allowDuplicateKeys);
XContentParser.Token token = parser.nextToken();
Object value = null;
if (token == XContentParser.Token.VALUE_NULL) {
Expand All @@ -113,8 +116,8 @@ public static Object apply(Object fieldValue) {
}
}

public static void apply(Map<String, Object> ctx, String fieldName) {
Object value = apply(ctx.get(fieldName));
public static void apply(Map<String, Object> ctx, String fieldName, boolean allowDuplicateKeys) {
Object value = apply(ctx.get(fieldName), allowDuplicateKeys);
if (value instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) value;
Expand All @@ -127,9 +130,9 @@ public static void apply(Map<String, Object> ctx, String fieldName) {
@Override
public IngestDocument execute(IngestDocument document) throws Exception {
if (addToRoot) {
apply(document.getSourceAndMetadata(), field);
apply(document.getSourceAndMetadata(), field, allowDuplicateKeys);
} else {
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class)));
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class), allowDuplicateKeys));
}
return document;
}
Expand All @@ -150,6 +153,7 @@ public JsonProcessor create(
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
boolean addToRoot = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "add_to_root", false);
boolean allowDuplicateKeys = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicate_keys", false);

if (addToRoot && targetField != null) {
throw newConfigurationException(
Expand All @@ -164,7 +168,7 @@ public JsonProcessor create(
targetField = field;
}

return new JsonProcessor(processorTag, description, field, targetField, addToRoot);
return new JsonProcessor(processorTag, description, field, targetField, addToRoot, allowDuplicateKeys);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ public static String uppercase(String value) {
}

public static Object json(Object fieldValue) {
return JsonProcessor.apply(fieldValue);
return JsonProcessor.apply(fieldValue, false);
}

public static void json(Map<String, Object> ctx, String field) {
JsonProcessor.apply(ctx, field);
JsonProcessor.apply(ctx, field, false);
}

public static String urlDecode(String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testExecute() throws Exception {
String processorTag = randomAlphaOfLength(3);
String randomField = randomAlphaOfLength(3);
String randomTargetField = randomAlphaOfLength(2);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, randomField, randomTargetField, false);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, randomField, randomTargetField, false, false);
Map<String, Object> document = new HashMap<>();

Map<String, Object> randomJsonMap = RandomDocumentPicks.randomSource(random());
Expand All @@ -71,7 +71,7 @@ public void testExecute() throws Exception {
}

public void testInvalidValue() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
document.put("field", "blah blah");
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand All @@ -86,7 +86,7 @@ public void testInvalidValue() {
}

public void testByteArray() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
document.put("field", new byte[] { 0, 1 });
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand All @@ -99,7 +99,7 @@ public void testByteArray() {
}

public void testNull() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
document.put("field", null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand All @@ -108,7 +108,7 @@ public void testNull() throws Exception {
}

public void testBoolean() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
boolean value = true;
document.put("field", value);
Expand All @@ -118,7 +118,7 @@ public void testBoolean() throws Exception {
}

public void testInteger() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
int value = 3;
document.put("field", value);
Expand All @@ -128,7 +128,7 @@ public void testInteger() throws Exception {
}

public void testDouble() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
double value = 3.0;
document.put("field", value);
Expand All @@ -138,7 +138,7 @@ public void testDouble() throws Exception {
}

public void testString() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
String value = "hello world";
document.put("field", "\"" + value + "\"");
Expand All @@ -148,7 +148,7 @@ public void testString() throws Exception {
}

public void testArray() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
List<Boolean> value = Arrays.asList(true, true, false);
document.put("field", value.toString());
Expand All @@ -158,7 +158,7 @@ public void testArray() throws Exception {
}

public void testFieldMissing() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);

Expand All @@ -169,7 +169,7 @@ public void testFieldMissing() {
public void testAddToRoot() throws Exception {
String processorTag = randomAlphaOfLength(3);
String randomTargetField = randomAlphaOfLength(2);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, "a", randomTargetField, true);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, "a", randomTargetField, true, false);
Map<String, Object> document = new HashMap<>();

String json = "{\"a\": 1, \"b\": 2}";
Expand All @@ -185,8 +185,30 @@ public void testAddToRoot() throws Exception {
assertEquals("see", sourceAndMetadata.get("c"));
}

/**
 * Runs the same duplicated-key JSON payload through two add-to-root
 * processors: one created with duplicate keys allowed, which must keep the
 * last value of {@code "a"}, and one created without, which must fail with
 * an {@link IllegalArgumentException} naming the duplicated field.
 */
public void testDuplicateKeys() throws Exception {
    final String tag = randomAlphaOfLength(3);

    final Map<String, Object> source = new HashMap<>();
    final String duplicatedJson = "{\"a\": 1, \"a\": 2}";
    source.put("a", duplicatedJson);
    source.put("c", "see");

    // Lenient processor: parsing succeeds and the later "a" overrides the earlier one.
    final JsonProcessor lenient = new JsonProcessor(tag, null, "a", null, true, true);
    final IngestDocument lenientDocument = RandomDocumentPicks.randomIngestDocument(random(), source);
    lenient.execute(lenientDocument);

    final Map<String, Object> merged = lenientDocument.getSourceAndMetadata();
    assertEquals(2, merged.get("a"));
    assertEquals("see", merged.get("c"));

    // Strict processor: the same payload is rejected.
    final JsonProcessor strict = new JsonProcessor(tag, null, "a", null, true, false);
    final IllegalArgumentException failure = expectThrows(
        IllegalArgumentException.class,
        () -> strict.execute(RandomDocumentPicks.randomIngestDocument(random(), source))
    );
    assertThat(failure.getMessage(), containsString("Duplicate field 'a'"));
}

public void testAddBoolToRoot() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", true);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", true, false);
Map<String, Object> document = new HashMap<>();
document.put("field", true);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ teardown:
ingest.delete_pipeline:
id: "1"
ignore: 404
- do:
ingest.delete_pipeline:
id: "2"
ignore: 404

---
"Test JSON Processor":
Expand Down Expand Up @@ -71,3 +75,36 @@ teardown:
- match: { _source.foo_number: 3 }
- is_true: _source.foo_boolean
- is_false: _source.foo_null

---
"Test JSON Processor duplicate keys":
# Step 1: register pipeline "2" with a json processor that reads the "json"
# field, merges the parsed object into the document root, and has
# allow_duplicate_keys enabled.
- do:
ingest.put_pipeline:
id: "2"
body: {
"processors": [
{
"json" : {
"field" : "json",
"add_to_root": true,
"allow_duplicate_keys": true
}
}
]
}
- match: { acknowledged: true }

# Step 2: index a document through pipeline "2" whose "json" string repeats
# the key "dupe" with the values 1 and 2.
- do:
index:
index: test
id: 2
pipeline: "2"
body: {
json: "{\"dupe\": 1, \"dupe\": 2}",
}

# Step 3: fetch the document back and check that the value of the last
# "dupe" occurrence (2) is what ended up in _source.
- do:
get:
index: test
id: 2
- match: { _source.dupe: 2 }
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public abstract class BaseXContentTestCase extends OpenSearchTestCase {

protected abstract XContentType xcontentType();

private XContentBuilder builder() throws IOException {
protected XContentBuilder builder() throws IOException {
return XContentBuilder.builder(xcontentType().xContent());
}

Expand Down Expand Up @@ -1106,6 +1106,18 @@ public void testChecksForDuplicates() throws Exception {
}
}

/**
 * Builds an object that writes the field {@code "key"} twice (values 1 then 2),
 * parses it with duplicate keys allowed, and expects the resulting map to
 * contain only the last value.
 */
public void testAllowsDuplicates() throws Exception {
    final XContentBuilder duplicated = builder();
    duplicated.startObject();
    duplicated.field("key", 1);
    duplicated.field("key", 2);
    duplicated.endObject();

    try (XContentParser parser = createParser(duplicated)) {
        // Must be set before the first token is consumed by map().
        parser.allowDuplicateKeys(true);
        final Map<String, Object> parsed = parser.map();
        assertThat(parsed, equalTo(Map.of("key", 2)));
    }
}

public void testNamedObject() throws IOException {
Object test1 = new Object();
Object test2 = new Object();
Expand Down

0 comments on commit aa08236

Please sign in to comment.