Skip to content

Commit

Permalink
Catch processor exceptions instead of shutting down (opensearch-proje…
Browse files Browse the repository at this point in the history
…ct#4155) (opensearch-project#4162)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Feb 21, 2024
1 parent bb494de commit 58fc576
Show file tree
Hide file tree
Showing 13 changed files with 276 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor

final Event event = record.getData();

final String message = event.get(config.getSource(), String.class);
try {
final String message = event.get(config.getSource(), String.class);

if (Objects.isNull(message)) {
continue;
}
if (Objects.isNull(message)) {
continue;
}

final boolean userDidSpecifyHeaderEventKey = Objects.nonNull(config.getColumnNamesSourceKey());
final boolean thisEventHasHeaderSource = userDidSpecifyHeaderEventKey && event.containsKey(config.getColumnNamesSourceKey());
final boolean userDidSpecifyHeaderEventKey = Objects.nonNull(config.getColumnNamesSourceKey());
final boolean thisEventHasHeaderSource = userDidSpecifyHeaderEventKey && event.containsKey(config.getColumnNamesSourceKey());

try {
final MappingIterator<List<String>> messageIterator = mapper.readerFor(List.class).with(schema).readValues(message);

// otherwise the message is empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,17 @@ public DissectProcessor(PluginMetrics pluginMetrics, final DissectProcessorConfi
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
Event event = record.getData();
String dissectWhen = dissectConfig.getDissectWhen();
if (Objects.nonNull(dissectWhen) && !expressionEvaluator.evaluateConditional(dissectWhen, event)) {
continue;
}
try{
for(String field: dissectorMap.keySet()){
String dissectWhen = dissectConfig.getDissectWhen();
if (Objects.nonNull(dissectWhen) && !expressionEvaluator.evaluateConditional(dissectWhen, event)) {
continue;
}
for (String field: dissectorMap.keySet()){
if(event.containsKey(field)){
dissectField(event, field);
}
}
}
catch (Exception ex){
} catch (Exception ex){
LOG.error(EVENT, "Error dissecting the event [{}] ", record.getData(), ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.Stack;
import java.util.ArrayList;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

@DataPrepperPlugin(name = "key_value", pluginType = Processor.class, pluginConfigurationType = KeyValueProcessorConfig.class)
public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(KeyValueProcessor.class);
Expand Down Expand Up @@ -235,38 +237,44 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
for (final Record<Event> record : records) {
final Map<String, Object> outputMap = new HashMap<>();
final Event recordEvent = record.getData();
final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class);
if (groupsRaw == null) {
continue;
}
final String[] groups = fieldDelimiterPattern.split(groupsRaw, 0);

if (keyValueProcessorConfig.getRecursive()) {
try {
JsonNode recursedTree = recurse(groupsRaw, mapper);
outputMap.putAll(createRecursedMap(recursedTree, mapper));
} catch (Exception e) {
LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive", e);
recordEvent.getMetadata().addTags(tagsOnFailure);
try {
final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class);
if (groupsRaw == null) {
continue;
}
} else {
try {
outputMap.putAll(createNonRecursedMap(groups));
} catch (Exception e) {
LOG.error("Non-recursive parsing ran into an unexpected error", e);
recordEvent.getMetadata().addTags(tagsOnFailure);
final String[] groups = fieldDelimiterPattern.split(groupsRaw, 0);

if (keyValueProcessorConfig.getRecursive()) {
try {
JsonNode recursedTree = recurse(groupsRaw, mapper);
outputMap.putAll(createRecursedMap(recursedTree, mapper));
} catch (Exception e) {
LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive", e);
recordEvent.getMetadata().addTags(tagsOnFailure);
}
} else {
try {
outputMap.putAll(createNonRecursedMap(groups));
} catch (Exception e) {
LOG.error("Non-recursive parsing ran into an unexpected error", e);
recordEvent.getMetadata().addTags(tagsOnFailure);
}
}
}

final Map<String, Object> processedMap = executeConfigs(outputMap);
final Map<String, Object> processedMap = executeConfigs(outputMap);

if (Objects.isNull(keyValueProcessorConfig.getDestination())) {
writeToRoot(recordEvent, processedMap);
} else {
if (keyValueProcessorConfig.getOverwriteIfDestinationExists() ||
!recordEvent.containsKey(keyValueProcessorConfig.getDestination())) {
recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap);
if (Objects.isNull(keyValueProcessorConfig.getDestination())) {
writeToRoot(recordEvent, processedMap);
} else {
if (keyValueProcessorConfig.getOverwriteIfDestinationExists() ||
!recordEvent.containsKey(keyValueProcessorConfig.getDestination())) {
recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap);
}
}
} catch (final Exception e) {
LOG.error(EVENT, "There was an exception while processing on Event [{}]: ", recordEvent, e);
recordEvent.getMetadata().addTags(tagsOnFailure);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,45 +46,49 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
for(final Record<Event> record : records) {
final Event recordEvent = record.getData();

for(AddEntryProcessorConfig.Entry entry : entries) {
try {
for (AddEntryProcessorConfig.Entry entry : entries) {

if (Objects.nonNull(entry.getAddWhen()) && !expressionEvaluator.evaluateConditional(entry.getAddWhen(), recordEvent)) {
continue;
}

try {
final String key = entry.getKey();
final String metadataKey = entry.getMetadataKey();
Object value;
if (!Objects.isNull(entry.getValueExpression())) {
value = expressionEvaluator.evaluate(entry.getValueExpression(), recordEvent);
} else if (!Objects.isNull(entry.getFormat())) {
try {
value = recordEvent.formatString(entry.getFormat());
} catch (final EventKeyNotFoundException e) {
value = null;
}
} else {
value = entry.getValue();
if (Objects.nonNull(entry.getAddWhen()) && !expressionEvaluator.evaluateConditional(entry.getAddWhen(), recordEvent)) {
continue;
}
if (!Objects.isNull(key)) {
if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) {
recordEvent.put(key, value);
} else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) {
mergeValueToEvent(recordEvent, key, value);

try {
final String key = entry.getKey();
final String metadataKey = entry.getMetadataKey();
Object value;
if (!Objects.isNull(entry.getValueExpression())) {
value = expressionEvaluator.evaluate(entry.getValueExpression(), recordEvent);
} else if (!Objects.isNull(entry.getFormat())) {
try {
value = recordEvent.formatString(entry.getFormat());
} catch (final EventKeyNotFoundException e) {
value = null;
}
} else {
value = entry.getValue();
}
} else {
Map<String, Object> attributes = recordEvent.getMetadata().getAttributes();
if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) {
recordEvent.getMetadata().setAttribute(metadataKey, value);
} else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) {
mergeValueToEventMetadata(recordEvent, metadataKey, value);
if (!Objects.isNull(key)) {
if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) {
recordEvent.put(key, value);
} else if (recordEvent.containsKey(key) && entry.getAppendIfKeyExists()) {
mergeValueToEvent(recordEvent, key, value);
}
} else {
Map<String, Object> attributes = recordEvent.getMetadata().getAttributes();
if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) {
recordEvent.getMetadata().setAttribute(metadataKey, value);
} else if (attributes.containsKey(metadataKey) && entry.getAppendIfKeyExists()) {
mergeValueToEventMetadata(recordEvent, metadataKey, value);
}
}
} catch (Exception e) {
LOG.error(EVENT, "Error adding entry to record [{}] with key [{}], metadataKey [{}], value_expression [{}] format [{}], value [{}]",
recordEvent, entry.getKey(), entry.getMetadataKey(), entry.getValueExpression(), entry.getFormat(), entry.getValue(), e);
}
} catch (Exception e) {
LOG.error(EVENT, "Error adding entry to record [{}] with key [{}], metadataKey [{}], value_expression [{}] format [{}], value [{}]",
recordEvent, entry.getKey(), entry.getMetadataKey(), entry.getValueExpression(), entry.getFormat(), entry.getValue(), e);
}
} catch(final Exception e){
LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,30 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
for(final Record<Event> record : records) {
final Event recordEvent = record.getData();

if (Objects.nonNull(convertWhen) && !expressionEvaluator.evaluateConditional(convertWhen, recordEvent)) {
continue;
}
try {

if (Objects.nonNull(convertWhen) && !expressionEvaluator.evaluateConditional(convertWhen, recordEvent)) {
continue;
}

for(final String key : convertEntryKeys) {
Object keyVal = recordEvent.get(key, Object.class);
if (keyVal != null) {
if (!nullValues.contains(keyVal.toString())) {
try {
recordEvent.put(key, converter.convert(keyVal));
} catch (final RuntimeException e) {
LOG.error(EVENT, "Unable to convert key: {} with value: {} to {}", key, keyVal, type, e);
recordEvent.getMetadata().addTags(tagsOnFailure);
for (final String key : convertEntryKeys) {
Object keyVal = recordEvent.get(key, Object.class);
if (keyVal != null) {
if (!nullValues.contains(keyVal.toString())) {
try {
recordEvent.put(key, converter.convert(keyVal));
} catch (final RuntimeException e) {
LOG.error(EVENT, "Unable to convert key: {} with value: {} to {}", key, keyVal, type, e);
recordEvent.getMetadata().addTags(tagsOnFailure);
}
} else {
recordEvent.delete(key);
}
} else {
recordEvent.delete(key);
}
}
} catch (final Exception e) {
LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e);
recordEvent.getMetadata().addTags(tagsOnFailure);
}
}
return records;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Map;
import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

@DataPrepperPlugin(name = "copy_values", pluginType = Processor.class, pluginConfigurationType = CopyValueProcessorConfig.class)
public class CopyValueProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(CopyValueProcessor.class);
Expand All @@ -41,8 +43,9 @@ public CopyValueProcessor(final PluginMetrics pluginMetrics, final CopyValueProc
@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for(final Record<Event> record : records) {
final Event recordEvent = record.getData();

try {
final Event recordEvent = record.getData();
if (config.getFromList() != null || config.getToList() != null) {
// Copying entries between lists
if (recordEvent.containsKey(config.getToList()) && !config.getOverwriteIfToListExists()) {
Expand Down Expand Up @@ -80,9 +83,8 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}
}
}
} catch (Exception e) {
LOG.error("Fail to perform copy values operation", e);
//TODO: add tagging on failure
} catch (final Exception e) {
LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

@DataPrepperPlugin(name = "delete_entries", pluginType = Processor.class, pluginConfigurationType = DeleteEntryProcessorConfig.class)
public class DeleteEntryProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(DeleteEntryProcessor.class);
private final String[] entries;
private final String deleteWhen;

Expand All @@ -37,13 +43,17 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
for(final Record<Event> record : records) {
final Event recordEvent = record.getData();

if (Objects.nonNull(deleteWhen) && !expressionEvaluator.evaluateConditional(deleteWhen, recordEvent)) {
continue;
}
try {
if (Objects.nonNull(deleteWhen) && !expressionEvaluator.evaluateConditional(deleteWhen, recordEvent)) {
continue;
}


for(String entry : entries) {
recordEvent.delete(entry);
for (String entry : entries) {
recordEvent.delete(entry);
}
} catch (final Exception e) {
LOG.error(EVENT, "There was an exception while processing Event [{}]", recordEvent, e);
}
}

Expand Down
Loading

0 comments on commit 58fc576

Please sign in to comment.