From 58fc5769704e55658de62f8bf5b79687bd7db2b9 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Wed, 21 Feb 2024 11:18:47 -0600 Subject: [PATCH] Catch processor exceptions instead of shutting down (#4155) (#4162) Signed-off-by: Taylor Gray --- .../plugins/processor/csv/CsvProcessor.java | 14 ++-- .../processor/dissect/DissectProcessor.java | 13 ++-- .../processor/keyvalue/KeyValueProcessor.java | 60 +++++++++------- .../mutateevent/AddEntryProcessor.java | 70 ++++++++++--------- .../ConvertEntryTypeProcessor.java | 34 +++++---- .../mutateevent/CopyValueProcessor.java | 10 +-- .../mutateevent/DeleteEntryProcessor.java | 20 ++++-- .../mutateevent/ListToMapProcessor.java | 62 ++++++++-------- .../mutateevent/MapToListProcessor.java | 60 +++++++++------- .../mutateevent/RenameKeyProcessor.java | 33 ++++++--- .../mutatestring/AbstractStringProcessor.java | 6 +- .../obfuscation/ObfuscationProcessor.java | 43 +++++++----- .../processor/truncate/TruncateProcessor.java | 52 ++++++++------ 13 files changed, 276 insertions(+), 201 deletions(-) diff --git a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessor.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessor.java index 55bd7301cb..a0bb90b860 100644 --- a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessor.java +++ b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessor.java @@ -58,16 +58,16 @@ public Collection> doExecute(final Collection> recor final Event event = record.getData(); - final String message = event.get(config.getSource(), String.class); + try { + final String message = event.get(config.getSource(), String.class); - if (Objects.isNull(message)) { - continue; - } + if (Objects.isNull(message)) { + continue; + } - final boolean userDidSpecifyHeaderEventKey = Objects.nonNull(config.getColumnNamesSourceKey()); - final boolean thisEventHasHeaderSource = userDidSpecifyHeaderEventKey && event.containsKey(config.getColumnNamesSourceKey()); + final boolean userDidSpecifyHeaderEventKey = Objects.nonNull(config.getColumnNamesSourceKey()); + final boolean thisEventHasHeaderSource = userDidSpecifyHeaderEventKey && event.containsKey(config.getColumnNamesSourceKey()); - try { final MappingIterator> messageIterator = mapper.readerFor(List.class).with(schema).readValues(message); // otherwise the message is empty diff --git a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java index 5ff7f4ad56..0977d998a3 100644 --- a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java +++ b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java @@ -55,18 +55,17 @@ public DissectProcessor(PluginMetrics pluginMetrics, final DissectProcessorConfi public Collection> doExecute(Collection> records) { for (final Record record : records) { Event event = record.getData(); - String dissectWhen = dissectConfig.getDissectWhen(); - if (Objects.nonNull(dissectWhen) && !expressionEvaluator.evaluateConditional(dissectWhen, event)) { - continue; - } try{ - for(String field: dissectorMap.keySet()){ + String dissectWhen = dissectConfig.getDissectWhen(); + if (Objects.nonNull(dissectWhen) && !expressionEvaluator.evaluateConditional(dissectWhen, event)) { + continue; + } + for (String field: dissectorMap.keySet()){ if(event.containsKey(field)){ dissectField(event, field); } } - } - catch (Exception ex){ + } catch (Exception ex){ LOG.error(EVENT, "Error dissecting the event [{}] ", record.getData(), ex); } } diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java index e7121456ce..0ccfa90baa 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java @@ -35,6 +35,8 @@ import java.util.Stack; import java.util.ArrayList; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; + @DataPrepperPlugin(name = "key_value", pluginType = Processor.class, pluginConfigurationType = KeyValueProcessorConfig.class) public class KeyValueProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(KeyValueProcessor.class); @@ -235,38 +237,44 @@ public Collection> doExecute(final Collection> recor for (final Record record : records) { final Map outputMap = new HashMap<>(); final Event recordEvent = record.getData(); - final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class); - if (groupsRaw == null) { - continue; - } - final String[] groups = fieldDelimiterPattern.split(groupsRaw, 0); - if (keyValueProcessorConfig.getRecursive()) { - try { - JsonNode recursedTree = recurse(groupsRaw, mapper); - outputMap.putAll(createRecursedMap(recursedTree, mapper)); - } catch (Exception e) { - LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive", e); - recordEvent.getMetadata().addTags(tagsOnFailure); + try { + final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class); + if (groupsRaw == null) { + continue; } - } else { - try { - outputMap.putAll(createNonRecursedMap(groups)); - } catch (Exception e) { - LOG.error("Non-recursive parsing ran into an unexpected error", e); - recordEvent.getMetadata().addTags(tagsOnFailure); + final String[] groups = fieldDelimiterPattern.split(groupsRaw, 0); + + if (keyValueProcessorConfig.getRecursive()) { + try { + JsonNode recursedTree = recurse(groupsRaw, mapper); + outputMap.putAll(createRecursedMap(recursedTree, mapper)); + } catch (Exception e) { + LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive", e); + recordEvent.getMetadata().addTags(tagsOnFailure); + } + } else { + try { + outputMap.putAll(createNonRecursedMap(groups)); + } catch (Exception e) { + LOG.error("Non-recursive parsing ran into an unexpected error", e); + recordEvent.getMetadata().addTags(tagsOnFailure); + } } - } - final Map processedMap = executeConfigs(outputMap); + final Map processedMap = executeConfigs(outputMap); - if (Objects.isNull(keyValueProcessorConfig.getDestination())) { - writeToRoot(recordEvent, processedMap); - } else { - if (keyValueProcessorConfig.getOverwriteIfDestinationExists() || - !recordEvent.containsKey(keyValueProcessorConfig.getDestination())) { - recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap); + if (Objects.isNull(keyValueProcessorConfig.getDestination())) { + writeToRoot(recordEvent, processedMap); + } else { + if (keyValueProcessorConfig.getOverwriteIfDestinationExists() || + !recordEvent.containsKey(keyValueProcessorConfig.getDestination())) { + recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap); + } } + } catch (final Exception e) { + LOG.error(EVENT, "There was an exception while processing on Event [{}]: ", recordEvent, e); + recordEvent.getMetadata().addTags(tagsOnFailure); } } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java index 4096a2f58a..e3e414f96d 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java @@ -46,45 +46,49 @@ public Collection> doExecute(final Collection> recor for(final Record record : records) { final Event recordEvent = record.getData(); - for(AddEntryProcessorConfig.Entry entry : entries) { + try { + for (AddEntryProcessorConfig.Entry entry : entries) { - if (Objects.nonNull(entry.getAddWhen()) && !expressionEvaluator.evaluateConditional(entry.getAddWhen(), recordEvent)) { - continue; - } - - try { - final String key = entry.getKey(); - final String metadataKey = entry.getMetadataKey(); - Object value; - if (!Objects.isNull(entry.getValueExpression())) { - value = expressionEvaluator.evaluate(entry.getValueExpression(), recordEvent); - } else if (!Objects.isNull(entry.getFormat())) { - try { - value = recordEvent.formatString(entry.getFormat()); - } catch (final EventKeyNotFoundException e) { - value = null; - } - } else { - value = entry.getValue(); + if (Objects.nonNull(entry.getAddWhen()) && !expressionEvaluator.evaluateConditional(entry.getAddWhen(), recordEvent)) { + continue; } - if (!Objects.isNull(key)) { - if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) { - recordEvent.put(key, value); - } else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) { - mergeValueToEvent(recordEvent, key, value); + + try { + final String key = entry.getKey(); + final String metadataKey = entry.getMetadataKey(); + Object value; + if (!Objects.isNull(entry.getValueExpression())) { + value = expressionEvaluator.evaluate(entry.getValueExpression(), recordEvent); + } else if (!Objects.isNull(entry.getFormat())) { + try { + value = recordEvent.formatString(entry.getFormat()); + } catch (final EventKeyNotFoundException e) { + value = null; + } + } else { + value = entry.getValue(); } - } else { - Map attributes = recordEvent.getMetadata().getAttributes(); - if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) { - recordEvent.getMetadata().setAttribute(metadataKey, value); - } else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) { - mergeValueToEventMetadata(recordEvent, metadataKey, value); + if (!Objects.isNull(key)) { + if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) { + recordEvent.put(key, value); + } else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) { + mergeValueToEvent(recordEvent, key, value); + } + } else { + Map attributes = recordEvent.getMetadata().getAttributes(); + if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) { + recordEvent.getMetadata().setAttribute(metadataKey, value); + } else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) { + mergeValueToEventMetadata(recordEvent, metadataKey, value); + } } + } catch (Exception e) { + LOG.error(EVENT, "Error adding entry to record [{}] with key [{}], metadataKey [{}], value_expression [{}] format [{}], value [{}]", + recordEvent, entry.getKey(), entry.getMetadataKey(), entry.getValueExpression(), entry.getFormat(), entry.getValue(), e); } - } catch (Exception e) { - LOG.error(EVENT, "Error adding entry to record [{}] with key [{}], metadataKey [{}], value_expression [{}] format [{}], value [{}]", - recordEvent, entry.getKey(), entry.getMetadataKey(), entry.getValueExpression(), entry.getFormat(), entry.getValue(), e); } + } catch(final Exception e){ + LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e); } } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessor.java index 8ba0400734..95cbd9f714 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessor.java @@ -56,24 +56,30 @@ public Collection> doExecute(final Collection> recor for(final Record record : records) { final Event recordEvent = record.getData(); - if (Objects.nonNull(convertWhen) && !expressionEvaluator.evaluateConditional(convertWhen, recordEvent)) { - continue; - } + try { + + if (Objects.nonNull(convertWhen) && !expressionEvaluator.evaluateConditional(convertWhen, recordEvent)) { + continue; + } - for(final String key : convertEntryKeys) { - Object keyVal = recordEvent.get(key, Object.class); - if (keyVal != null) { - if (!nullValues.contains(keyVal.toString())) { - try { - recordEvent.put(key, converter.convert(keyVal)); - } catch (final RuntimeException e) { - LOG.error(EVENT, "Unable to convert key: {} with value: {} to {}", key, keyVal, type, e); - recordEvent.getMetadata().addTags(tagsOnFailure); + for (final String key : convertEntryKeys) { + Object keyVal = recordEvent.get(key, Object.class); + if (keyVal != null) { + if (!nullValues.contains(keyVal.toString())) { + try { + recordEvent.put(key, converter.convert(keyVal)); + } catch (final RuntimeException e) { + LOG.error(EVENT, "Unable to convert key: {} with value: {} to {}", key, keyVal, type, e); + recordEvent.getMetadata().addTags(tagsOnFailure); + } + } else { + recordEvent.delete(key); } - } else { - recordEvent.delete(key); } } + } catch (final Exception e) { + LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e); + recordEvent.getMetadata().addTags(tagsOnFailure); } } return records; diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/CopyValueProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/CopyValueProcessor.java index 9883f71c29..845ae40e38 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/CopyValueProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/CopyValueProcessor.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.Objects; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; + @DataPrepperPlugin(name = "copy_values", pluginType = Processor.class, pluginConfigurationType = CopyValueProcessorConfig.class) public class CopyValueProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(CopyValueProcessor.class); @@ -41,8 +43,9 @@ public CopyValueProcessor(final PluginMetrics pluginMetrics, final CopyValueProc @Override public Collection> doExecute(final Collection> records) { for(final Record record : records) { + final Event recordEvent = record.getData(); + try { - final Event recordEvent = record.getData(); if (config.getFromList() != null || config.getToList() != null) { // Copying entries between lists if (recordEvent.containsKey(config.getToList()) && !config.getOverwriteIfToListExists()) { @@ -80,9 +83,8 @@ public Collection> doExecute(final Collection> recor } } } - } catch (Exception e) { - LOG.error("Fail to perform copy values operation", e); - //TODO: add tagging on failure + } catch (final Exception e) { + LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e); } } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java index 5bb01fa7a7..d7c902a32c 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java @@ -13,12 +13,18 @@ import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Objects; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; + @DataPrepperPlugin(name = "delete_entries", pluginType = Processor.class, pluginConfigurationType = DeleteEntryProcessorConfig.class) public class DeleteEntryProcessor extends AbstractProcessor, Record> { + + private static final Logger LOG = LoggerFactory.getLogger(DeleteEntryProcessor.class); private final String[] entries; private final String deleteWhen; @@ -37,13 +43,17 @@ public Collection> doExecute(final Collection> recor for(final Record record : records) { final Event recordEvent = record.getData(); - if (Objects.nonNull(deleteWhen) && !expressionEvaluator.evaluateConditional(deleteWhen, recordEvent)) { - continue; - } + try { + if (Objects.nonNull(deleteWhen) && !expressionEvaluator.evaluateConditional(deleteWhen, recordEvent)) { + continue; + } - for(String entry : entries) { - recordEvent.delete(entry); + for (String entry : entries) { + recordEvent.delete(entry); + } + } catch (final Exception e) { + LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e); } } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessor.java index de6757bace..d042f8fa28 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessor.java @@ -45,38 +45,44 @@ public Collection> doExecute(final Collection> recor for (final Record record : records) { final Event recordEvent = record.getData(); - if (Objects.nonNull(config.getListToMapWhen()) && !expressionEvaluator.evaluateConditional(config.getListToMapWhen(), recordEvent)) { - continue; - } - - final List> sourceList; try { - sourceList = recordEvent.get(config.getSource(), List.class); - } catch (final Exception e) { - LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]", - config.getSource(), recordEvent, e); - recordEvent.getMetadata().addTags(config.getTagsOnFailure()); - continue; - } - final Map targetMap; - try { - targetMap = constructTargetMap(sourceList); - } catch (final IllegalArgumentException e) { - LOG.warn(EVENT, "Cannot find a list at the given source path [{}} on record [{}]", - config.getSource(), recordEvent, e); - recordEvent.getMetadata().addTags(config.getTagsOnFailure()); - continue; - } catch (final Exception e) { - LOG.error(EVENT, "Error converting source list to map on record [{}]", recordEvent, e); - recordEvent.getMetadata().addTags(config.getTagsOnFailure()); - continue; - } + if (Objects.nonNull(config.getListToMapWhen()) && !expressionEvaluator.evaluateConditional(config.getListToMapWhen(), recordEvent)) { + continue; + } - try { - updateEvent(recordEvent, targetMap); + final List> sourceList; + try { + sourceList = recordEvent.get(config.getSource(), List.class); + } catch (final Exception e) { + LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]", + config.getSource(), recordEvent, e); + recordEvent.getMetadata().addTags(config.getTagsOnFailure()); + continue; + } + + final Map targetMap; + try { + targetMap = constructTargetMap(sourceList); + } catch (final IllegalArgumentException e) { + LOG.warn(EVENT, "Cannot find a list at the given source path [{}} on record [{}]", + config.getSource(), recordEvent, e); + recordEvent.getMetadata().addTags(config.getTagsOnFailure()); + continue; + } catch (final Exception e) { + LOG.error(EVENT, "Error converting source list to map on record [{}]", recordEvent, e); + recordEvent.getMetadata().addTags(config.getTagsOnFailure()); + continue; + } + + try { + updateEvent(recordEvent, targetMap); + } catch (final Exception e) { + LOG.error(EVENT, "Error updating record [{}] after converting source list to map", recordEvent, e); + recordEvent.getMetadata().addTags(config.getTagsOnFailure()); + } } catch (final Exception e) { - LOG.error(EVENT, "Error updating record [{}] after converting source list to map", recordEvent, e); + LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e); recordEvent.getMetadata().addTags(config.getTagsOnFailure()); } } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java index 7b088f9976..d911cd6194 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java @@ -27,6 +27,8 @@ import java.util.Objects; import java.util.Set; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; + @DataPrepperPlugin(name = "map_to_list", pluginType = Processor.class, pluginConfigurationType = MapToListProcessorConfig.class) public class MapToListProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(MapToListProcessor.class); @@ -48,39 +50,45 @@ public Collection> doExecute(final Collection> recor for (final Record record : records) { final Event recordEvent = record.getData(); - if (config.getMapToListWhen() != null && !expressionEvaluator.evaluateConditional(config.getMapToListWhen(), recordEvent)) { - continue; - } - try { - final Map sourceMap = getSourceMap(recordEvent); - if (config.getConvertFieldToList()) { - final List> targetNestedList = new ArrayList<>(); + if (config.getMapToListWhen() != null && !expressionEvaluator.evaluateConditional(config.getMapToListWhen(), recordEvent)) { + continue; + } + + try { + final Map sourceMap = getSourceMap(recordEvent); - for (final Map.Entry entry : sourceMap.entrySet()) { - if (!excludeKeySet.contains(entry.getKey())) { - targetNestedList.add(List.of(entry.getKey(), entry.getValue())); - } + if (config.getConvertFieldToList()) { + final List> targetNestedList = new ArrayList<>(); - } - removeProcessedFields(sourceMap, recordEvent); - recordEvent.put(config.getTarget(), targetNestedList); - } else { - final List> targetList = new ArrayList<>(); - for (final Map.Entry entry : sourceMap.entrySet()) { - if (!excludeKeySet.contains(entry.getKey())) { - targetList.add(Map.of( - config.getKeyName(), entry.getKey(), - config.getValueName(), entry.getValue() - )); + for (final Map.Entry entry : sourceMap.entrySet()) { + if (!excludeKeySet.contains(entry.getKey())) { + targetNestedList.add(List.of(entry.getKey(), entry.getValue())); + } + + } + removeProcessedFields(sourceMap, recordEvent); + recordEvent.put(config.getTarget(), targetNestedList); + } else { + final List> targetList = new ArrayList<>(); + for (final Map.Entry entry : sourceMap.entrySet()) { + if (!excludeKeySet.contains(entry.getKey())) { + targetList.add(Map.of( + config.getKeyName(), entry.getKey(), + config.getValueName(), entry.getValue() + )); + } } + removeProcessedFields(sourceMap, recordEvent); + recordEvent.put(config.getTarget(), targetList); } - removeProcessedFields(sourceMap, recordEvent); - recordEvent.put(config.getTarget(), targetList); + } catch (Exception e) { + LOG.error("Fail to perform Map to List operation", e); + recordEvent.getMetadata().addTags(config.getTagsOnFailure()); } - } catch (Exception e) { - LOG.error("Fail to perform Map to List operation", e); + } catch (final Exception e) { + LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e); recordEvent.getMetadata().addTags(config.getTagsOnFailure()); } } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java index eff59f7751..05c1ad7530 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java @@ -13,13 +13,19 @@ import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.List; import java.util.Objects; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; + @DataPrepperPlugin(name = "rename_keys", pluginType = Processor.class, pluginConfigurationType = RenameKeyProcessorConfig.class) public class RenameKeyProcessor extends AbstractProcessor, Record> { + + private static final Logger LOG = LoggerFactory.getLogger(RenameKeyProcessor.class); private final List entries; private final ExpressionEvaluator expressionEvaluator; @@ -36,20 +42,25 @@ public Collection> doExecute(final Collection> recor for(final Record record : records) { final Event recordEvent = record.getData(); - for(RenameKeyProcessorConfig.Entry entry : entries) { - if (Objects.nonNull(entry.getRenameWhen()) && !expressionEvaluator.evaluateConditional(entry.getRenameWhen(), recordEvent)) { - continue; - } + try { - if(entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) { - continue; - } + for (RenameKeyProcessorConfig.Entry entry : entries) { + if (Objects.nonNull(entry.getRenameWhen()) && !expressionEvaluator.evaluateConditional(entry.getRenameWhen(), recordEvent)) { + continue; + } + + if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) { + continue; + } - if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) { - final Object source = recordEvent.get(entry.getFromKey(), Object.class); - recordEvent.put(entry.getToKey(), source); - recordEvent.delete(entry.getFromKey()); + if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) { + final Object source = recordEvent.get(entry.getFromKey(), Object.class); + recordEvent.put(entry.getToKey(), source); + recordEvent.delete(entry.getFromKey()); + } } + } catch (final Exception e) { + LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e); } } diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/AbstractStringProcessor.java b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/AbstractStringProcessor.java index bcd0f21aa8..19d11daf62 100644 --- a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/AbstractStringProcessor.java +++ b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/AbstractStringProcessor.java @@ -33,7 +33,11 @@ public AbstractStringProcessor(final PluginMetrics pluginMetrics, final StringPr public Collection> doExecute(final Collection> records) { for(final Record record : records) { final Event recordEvent = record.getData(); - performStringAction(recordEvent); + try { + performStringAction(recordEvent); + } catch (final Exception e) { + LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e); + } } return records; diff --git a/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessor.java b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessor.java index 0ffc309797..21167bc747 100644 --- a/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessor.java +++ b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessor.java @@ -29,6 +29,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; + @DataPrepperPlugin(name = "obfuscate", pluginType = Processor.class, pluginConfigurationType = ObfuscationProcessorConfig.class) public class ObfuscationProcessor extends AbstractProcessor, Record> { @@ -106,29 +108,34 @@ public Collection> doExecute(Collection> records) { for (final Record record : records) { final Event recordEvent = record.getData(); - if (obfuscationProcessorConfig.getObfuscateWhen() != null && !expressionEvaluator.evaluateConditional(obfuscationProcessorConfig.getObfuscateWhen(), recordEvent)) { - continue; - } + try { - if (!recordEvent.containsKey(source)) { - continue; - } + if (obfuscationProcessorConfig.getObfuscateWhen() != null && !expressionEvaluator.evaluateConditional(obfuscationProcessorConfig.getObfuscateWhen(), recordEvent)) { + continue; + } - String rawValue = recordEvent.get(source, String.class); + if (!recordEvent.containsKey(source)) { + continue; + } - // Call obfuscation action - String newValue = this.action.obfuscate(rawValue, patterns); + String rawValue = recordEvent.get(source, String.class); - // No changes means it does not match any patterns - if (rawValue.equals(newValue)) { - recordEvent.getMetadata().addTags(obfuscationProcessorConfig.getTagsOnMatchFailure()); - } + // Call obfuscation action + String newValue = this.action.obfuscate(rawValue, patterns); + + // No changes means it does not match any patterns + if (rawValue.equals(newValue)) { + recordEvent.getMetadata().addTags(obfuscationProcessorConfig.getTagsOnMatchFailure()); + } - // Update the event record. - if (target == null || target.isEmpty()) { - recordEvent.put(source, newValue); - } else { - recordEvent.put(target, newValue); + // Update the event record. + if (target == null || target.isEmpty()) { + recordEvent.put(source, newValue); + } else { + recordEvent.put(target, newValue); + } + } catch (final Exception e) { + LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e); } } return records; diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java index 8449675791..c9b71173ce 100644 --- a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java @@ -13,11 +13,15 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.ArrayList; import java.util.List; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; + /** * This processor takes in a key and truncates its value to a string with * characters from the front or at the end or at both removed. @@ -25,6 +29,7 @@ */ @DataPrepperPlugin(name = "truncate", pluginType = Processor.class, pluginConfigurationType = TruncateProcessorConfig.class) public class TruncateProcessor extends AbstractProcessor, Record>{ + private static final Logger LOG = LoggerFactory.getLogger(TruncateProcessor.class); private final ExpressionEvaluator expressionEvaluator; private final List entries; @@ -48,34 +53,39 @@ private String getTruncatedValue(final String value, final int startIndex, final public Collection> doExecute(final Collection> records) { for(final Record record : records) { final Event recordEvent = record.getData(); - for (TruncateProcessorConfig.Entry entry: entries) { - final List sourceKeys = entry.getSourceKeys(); - final String truncateWhen = entry.getTruncateWhen(); - final int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt(); - final Integer length = entry.getLength(); - if (truncateWhen != null && !expressionEvaluator.evaluateConditional(truncateWhen, recordEvent)) { - continue; - } - for (String sourceKey: sourceKeys) { - if (!recordEvent.containsKey(sourceKey)) { + + try { + for (TruncateProcessorConfig.Entry entry : entries) { + final List sourceKeys = entry.getSourceKeys(); + final String truncateWhen = entry.getTruncateWhen(); + final int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt(); + final Integer length = entry.getLength(); + if (truncateWhen != null && !expressionEvaluator.evaluateConditional(truncateWhen, recordEvent)) { continue; } + for (String sourceKey : sourceKeys) { + if (!recordEvent.containsKey(sourceKey)) { + continue; + } - final Object value = recordEvent.get(sourceKey, Object.class); - if (value instanceof String) { - recordEvent.put(sourceKey, getTruncatedValue((String)value, startIndex, length)); - } else if (value instanceof List) { - List result = new ArrayList<>(); - for (Object listItem: (List)value) { - if (listItem instanceof String) { - result.add(getTruncatedValue((String)listItem, startIndex, length)); - } else { - result.add(listItem); + final Object value = recordEvent.get(sourceKey, Object.class); + if (value instanceof String) { + recordEvent.put(sourceKey, getTruncatedValue((String) value, startIndex, length)); + } else if (value instanceof List) { + List result = new ArrayList<>(); + for (Object listItem : (List) value) { + if (listItem instanceof String) { + result.add(getTruncatedValue((String) listItem, startIndex, length)); + } else { + result.add(listItem); + } } + recordEvent.put(sourceKey, result); } - recordEvent.put(sourceKey, result); } } + } catch (final Exception e) { + LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e); } }