Skip to content

Commit

Permalink
Mutate Entry Processors (opensearch-project#1002)
Browse files Browse the repository at this point in the history
Add Generic Mutate Processors

Signed-off-by: David Powers <[email protected]>
  • Loading branch information
dapowers87 authored Feb 14, 2022
1 parent aac15ad commit eee07a7
Show file tree
Hide file tree
Showing 15 changed files with 1,328 additions and 0 deletions.
207 changes: 207 additions & 0 deletions data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
# Mutate Event Processors
The following is a list of processors available to mutate an event.

___

##AddEntryProcessor
A processor that adds entries to an event

###Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
processor:
- add_entries:
entries:
- key: "newMessage"
value: 3
overwrite_if_key_exists: true
sink:
- stdout:
```
Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```json
{"message": "value"}
```

When run, the processor will parse the message into the following output:

```json
{"message": "value", "newMessage": 3}
```

> If `newMessage` had already existed, its existing value would have been overwritten with `3`

###Configuration
* `entries` - (required) - A list of entries to add to an event
* `key` - (required) - The key of the new entry to be added
* `value` - (required) - The value of the new entry to be added. Strings, booleans, numbers, null, nested objects, and arrays containing the aforementioned data types are valid to use
* `overwrite_if_key_exists` - (optional) - When set to `true`, if `key` already exists in the event, then the existing value will be overwritten. The default is `false`.

___

##CopyValueProcessor
A processor that copies values within an event

###Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
processor:
- copy_values:
entries:
- from_key: "message"
to_key: "newMessage"
overwrite_if_to_key_exists: true
sink:
- stdout:
```

Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```json
{"message": "value"}
```

When run, the processor will parse the message into the following output:

```json
{"message": "value", "newMessage": "value"}
```

> If `newMessage` had already existed, its existing value would have been overwritten with `value`

###Configuration
* `entries` - (required) - A list of entries to be copied in an event
* `from_key` - (required) - The key of the entry to be copied
* `to_key` - (required) - The key of the new entry to be added
* `overwrite_if_to_key_exists` - (optional) - When set to `true`, if `to_key` already exists in the event, then the existing value will be overwritten. The default is `false`.

___

##DeleteEntryProcessor
A processor that deletes entries in an event

###Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
processor:
- delete_entries:
with_keys: ["message"]
sink:
- stdout:
```

Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```json
{"message": "value", "message2": "value2"}
```

When run, the processor will parse the message into the following output:

```json
{"message2": "value2"}
```

> If `message` had not existed in the event, then nothing would have happened

###Configuration
* `with_keys` - (required) - An array of keys of the entries to be deleted

___

##RenameKeyProcessor
A processor that renames keys in an event

###Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
processor:
- rename_keys:
entries:
- from_key: "message"
to_key: "newMessage"
overwrite_if_to_key_exists: true
sink:
- stdout:
```

Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```json
{"message": "value"}
```

When run, the processor will parse the message into the following output:

```json
{"newMessage": "value"}
```

> If `newMessage` had already existed, its existing value would have been overwritten with `value`

###Configuration
* `entries` - (required) - A list of entries to rename in an event
* `from_key` - (required) - The key of the entry to be renamed
* `to_key` - (required) - The new key of the entry
* `overwrite_if_to_key_exists` - (optional) - When set to `true`, if `to_key` already exists in the event, then the existing value will be overwritten. The default is `false`.

###Special Consideration
The renaming operation occurs in the order defined. This means that chaining is implicit with the RenameKeyProcessor. Take the following `piplines.yaml` for example:
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
processor:
- rename_key:
entries:
- from_key: "message"
to_key: "message2"
- from_key: "message2"
to_key: "message3"
sink:
- stdout:
```

Let the contents of `logs_json.log` be the following:
```json
{"message": "value"}
```

After the processor runs, this will be the output
```json
{"message3": "value"}
```

## Developer Guide
This plugin is compatible with Java 14. See
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/readme/monitoring.md)
25 changes: 25 additions & 0 deletions data-prepper-plugins/mutate-event-processors/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule {
limit {
minimum = 1.0
}
}
}
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.mutateevent;

import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor;
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.processor.AbstractProcessor;
import com.amazon.dataprepper.model.processor.Processor;
import com.amazon.dataprepper.model.record.Record;

import java.util.Collection;
import java.util.List;

@DataPrepperPlugin(name = "add_entries", pluginType = Processor.class, pluginConfigurationType = AddEntryProcessorConfig.class)
public class AddEntryProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private final List<AddEntryProcessorConfig.Entry> entries;

@DataPrepperPluginConstructor
public AddEntryProcessor(final PluginMetrics pluginMetrics, final AddEntryProcessorConfig config) {
super(pluginMetrics);
this.entries = config.getEntries();
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for(final Record<Event> record : records) {
final Event recordEvent = record.getData();

for(AddEntryProcessorConfig.Entry entry : entries) {
if (!recordEvent.containsKey(entry.getKey()) || entry.getOverwriteIfKeyExists()) {
recordEvent.put(entry.getKey(), entry.getValue());
}
}
}

return records;
}

@Override
public void prepareForShutdown() {
}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.mutateevent;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.util.List;

public class AddEntryProcessorConfig {
public static class Entry {
@NotEmpty
@NotNull
private String key;

@NotEmpty
@NotNull
private Object value;

@JsonProperty("overwrite_if_key_exists")
private boolean overwriteIfKeyExists = false;

public String getKey() {
return key;
}

public Object getValue() {
return value;
}

public boolean getOverwriteIfKeyExists() {
return overwriteIfKeyExists;
}

public Entry(final String key, final Object value, final boolean overwriteIfKeyExists)
{
this.key = key;
this.value = value;
this.overwriteIfKeyExists = overwriteIfKeyExists;
}

public Entry() {

}
}

@NotEmpty
@NotNull
private List<Entry> entries;

public List<Entry> getEntries() {
return entries;
}
}
Loading

0 comments on commit eee07a7

Please sign in to comment.