Skip to content

Commit

Permalink
REF: grok processor with the latest config model (opensearch-project#…
Browse files Browse the repository at this point in the history
…4731)

* REF: grok processor with the latest config model

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Jul 15, 2024
1 parent 731de12 commit 286edc2
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.annotations.SingleThread;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
Expand Down Expand Up @@ -59,7 +59,7 @@


@SingleThread
@DataPrepperPlugin(name = "grok", pluginType = Processor.class)
@DataPrepperPlugin(name = "grok", pluginType = Processor.class, pluginConfigurationType = GrokProcessorConfig.class)
public class GrokProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
static final long EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = 300L;

Expand Down Expand Up @@ -89,20 +89,28 @@ public class GrokProcessor extends AbstractProcessor<Record<Event>, Record<Event
private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public GrokProcessor(final PluginSetting pluginSetting, final ExpressionEvaluator expressionEvaluator) {
this(pluginSetting, GrokCompiler.newInstance(), Executors.newSingleThreadExecutor(), expressionEvaluator);
public GrokProcessor(final PluginMetrics pluginMetrics,
final GrokProcessorConfig grokProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
this(pluginMetrics, grokProcessorConfig, GrokCompiler.newInstance(),
Executors.newSingleThreadExecutor(), expressionEvaluator);
}

GrokProcessor(final PluginSetting pluginSetting, final GrokCompiler grokCompiler, final ExecutorService executorService, final ExpressionEvaluator expressionEvaluator) {
super(pluginSetting);
this.grokProcessorConfig = GrokProcessorConfig.buildConfig(pluginSetting);
GrokProcessor(final PluginMetrics pluginMetrics,
final GrokProcessorConfig grokProcessorConfig,
final GrokCompiler grokCompiler,
final ExecutorService executorService,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.grokProcessorConfig = grokProcessorConfig;
this.keysToOverwrite = new HashSet<>(grokProcessorConfig.getkeysToOverwrite());
this.grokCompiler = grokCompiler;
this.fieldToGrok = new LinkedHashMap<>();
this.executorService = executorService;
this.expressionEvaluator = expressionEvaluator;
this.tagsOnMatchFailure = grokProcessorConfig.getTagsOnMatchFailure();
this.tagsOnTimeout = grokProcessorConfig.getTagsOnTimeout();
this.tagsOnTimeout = grokProcessorConfig.getTagsOnTimeout().isEmpty() ?
grokProcessorConfig.getTagsOnMatchFailure() : grokProcessorConfig.getTagsOnTimeout();
grokProcessingMatchCounter = pluginMetrics.counter(GROK_PROCESSING_MATCH);
grokProcessingMismatchCounter = pluginMetrics.counter(GROK_PROCESSING_MISMATCH);
grokProcessingErrorsCounter = pluginMetrics.counter(GROK_PROCESSING_ERRORS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

package org.opensearch.dataprepper.plugins.processor.grok;

import org.opensearch.dataprepper.model.configuration.PluginSetting;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;

import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -39,69 +41,57 @@ public class GrokProcessorConfig {
static final int DEFAULT_TIMEOUT_MILLIS = 30000;
static final String DEFAULT_TARGET_KEY = null;

private final boolean breakOnMatch;
private final boolean keepEmptyCaptures;
private final Map<String, List<String>> match;
private final boolean namedCapturesOnly;
private final List<String> keysToOverwrite;
private final List<String> patternsDirectories;
private final String patternsFilesGlob;
private final Map<String, String> patternDefinitions;
private final int timeoutMillis;
private final String targetKey;
private final String grokWhen;
private final List<String> tagsOnMatchFailure;
private final List<String> tagsOnTimeout;

private final boolean includePerformanceMetadata;

private GrokProcessorConfig(final boolean breakOnMatch,
final boolean keepEmptyCaptures,
final Map<String, List<String>> match,
final boolean namedCapturesOnly,
final List<String> keysToOverwrite,
final List<String> patternsDirectories,
final String patternsFilesGlob,
final Map<String, String> patternDefinitions,
final int timeoutMillis,
final String targetKey,
final String grokWhen,
final List<String> tagsOnMatchFailure,
final List<String> tagsOnTimeout,
final boolean includePerformanceMetadata) {

this.breakOnMatch = breakOnMatch;
this.keepEmptyCaptures = keepEmptyCaptures;
this.match = match;
this.namedCapturesOnly = namedCapturesOnly;
this.keysToOverwrite = keysToOverwrite;
this.patternsDirectories = patternsDirectories;
this.patternsFilesGlob = patternsFilesGlob;
this.patternDefinitions = patternDefinitions;
this.timeoutMillis = timeoutMillis;
this.targetKey = targetKey;
this.grokWhen = grokWhen;
this.tagsOnMatchFailure = tagsOnMatchFailure;
this.tagsOnTimeout = tagsOnTimeout.isEmpty() ? tagsOnMatchFailure : tagsOnTimeout;
this.includePerformanceMetadata = includePerformanceMetadata;
}

public static GrokProcessorConfig buildConfig(final PluginSetting pluginSetting) {
return new GrokProcessorConfig(pluginSetting.getBooleanOrDefault(BREAK_ON_MATCH, DEFAULT_BREAK_ON_MATCH),
pluginSetting.getBooleanOrDefault(KEEP_EMPTY_CAPTURES, DEFAULT_KEEP_EMPTY_CAPTURES),
pluginSetting.getTypedListMap(MATCH, String.class, String.class),
pluginSetting.getBooleanOrDefault(NAMED_CAPTURES_ONLY, DEFAULT_NAMED_CAPTURES_ONLY),
pluginSetting.getTypedList(KEYS_TO_OVERWRITE, String.class),
pluginSetting.getTypedList(PATTERNS_DIRECTORIES, String.class),
pluginSetting.getStringOrDefault(PATTERNS_FILES_GLOB, DEFAULT_PATTERNS_FILES_GLOB),
pluginSetting.getTypedMap(PATTERN_DEFINITIONS, String.class, String.class),
pluginSetting.getIntegerOrDefault(TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS),
pluginSetting.getStringOrDefault(TARGET_KEY, DEFAULT_TARGET_KEY),
pluginSetting.getStringOrDefault(GROK_WHEN, null),
pluginSetting.getTypedList(TAGS_ON_MATCH_FAILURE, String.class),
pluginSetting.getTypedList(TAGS_ON_TIMEOUT, String.class),
pluginSetting.getBooleanOrDefault(INCLUDE_PERFORMANCE_METADATA, false));
}
@JsonProperty(BREAK_ON_MATCH)
@JsonPropertyDescription("Specifies whether to match all patterns (`false`) or stop once the first successful " +
"match is found (`true`). Default is `true`.")
private boolean breakOnMatch = DEFAULT_BREAK_ON_MATCH;
@JsonProperty(KEEP_EMPTY_CAPTURES)
@JsonPropertyDescription("Enables the preservation of `null` captures from the processed output. Default is `false`.")
private boolean keepEmptyCaptures = DEFAULT_KEEP_EMPTY_CAPTURES;
@JsonProperty(MATCH)
@JsonPropertyDescription("Specifies which keys should match specific patterns. Default is an empty response body.")
private Map<String, List<String>> match = Collections.emptyMap();
@JsonProperty(NAMED_CAPTURES_ONLY)
@JsonPropertyDescription("Specifies whether to keep only named captures. Default is `true`.")
private boolean namedCapturesOnly = DEFAULT_NAMED_CAPTURES_ONLY;
@JsonProperty(KEYS_TO_OVERWRITE)
@JsonPropertyDescription("Specifies which existing keys will be overwritten if there is a capture with the same key value. " +
"Default is `[]`.")
private List<String> keysToOverwrite = Collections.emptyList();
@JsonProperty(PATTERNS_DIRECTORIES)
@JsonPropertyDescription("Specifies which directory paths contain the custom pattern files. Default is an empty list.")
private List<String> patternsDirectories = Collections.emptyList();
@JsonProperty(PATTERNS_FILES_GLOB)
@JsonPropertyDescription("Specifies which pattern files to use from the directories specified for " +
"`pattern_directories`. Default is `*`.")
private String patternsFilesGlob = DEFAULT_PATTERNS_FILES_GLOB;
@JsonProperty(PATTERN_DEFINITIONS)
@JsonPropertyDescription("Allows for a custom pattern that can be used inline inside the response body. " +
"Default is an empty response body.")
private Map<String, String> patternDefinitions = Collections.emptyMap();
@JsonProperty(TIMEOUT_MILLIS)
@JsonPropertyDescription("The maximum amount of time during which matching occurs. " +
"Setting to `0` prevents any matching from occurring. Default is `30,000`.")
private int timeoutMillis = DEFAULT_TIMEOUT_MILLIS;
@JsonProperty(TARGET_KEY)
@JsonPropertyDescription("Specifies a parent-level key used to store all captures. Default value is `null`.")
private String targetKey = DEFAULT_TARGET_KEY;
@JsonProperty(GROK_WHEN)
@JsonPropertyDescription("Specifies under what condition the `grok` processor should perform matching. " +
"Default is no condition.")
private String grokWhen;
@JsonProperty(TAGS_ON_MATCH_FAILURE)
@JsonPropertyDescription("A `List` of `String`s that specifies the tags to be set in the event when grok fails to " +
"match or an unknown exception occurs while matching. This tag may be used in conditional expressions in " +
"other parts of the configuration")
private List<String> tagsOnMatchFailure = Collections.emptyList();
@JsonProperty(TAGS_ON_TIMEOUT)
@JsonPropertyDescription("A `List` of `String`s that specifies the tags to be set in the event when grok match times out.")
private List<String> tagsOnTimeout = Collections.emptyList();
@JsonProperty(INCLUDE_PERFORMANCE_METADATA)
@JsonPropertyDescription("A `Boolean` on whether to include performance metadata into event metadata, " +
"e.g. _total_grok_patterns_attempted, _total_grok_processing_time.")
private boolean includePerformanceMetadata = false;

public boolean isBreakOnMatch() {
return breakOnMatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.processor.grok;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand All @@ -27,6 +28,7 @@
import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessorConfig.DEFAULT_TIMEOUT_MILLIS;

public class GrokProcessorConfigTests {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String PLUGIN_NAME = "grok";

private static final Map<String, List<String>> TEST_MATCH = new HashMap<>();
Expand Down Expand Up @@ -62,7 +64,8 @@ public static void setUp() {

@Test
public void testDefault() {
final GrokProcessorConfig grokProcessorConfig = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME, null));
final GrokProcessorConfig grokProcessorConfig = OBJECT_MAPPER.convertValue(
Collections.emptyMap(), GrokProcessorConfig.class);

assertThat(grokProcessorConfig.isBreakOnMatch(), equalTo(DEFAULT_BREAK_ON_MATCH));
assertThat(grokProcessorConfig.isKeepEmptyCaptures(), equalTo(DEFAULT_KEEP_EMPTY_CAPTURES));
Expand Down Expand Up @@ -95,7 +98,8 @@ public void testValidConfig() {
TEST_TARGET_KEY,
true);

final GrokProcessorConfig grokProcessorConfig = GrokProcessorConfig.buildConfig(validPluginSetting);
final GrokProcessorConfig grokProcessorConfig = OBJECT_MAPPER.convertValue(
validPluginSetting.getSettings(), GrokProcessorConfig.class);

assertThat(grokProcessorConfig.isBreakOnMatch(), equalTo(false));
assertThat(grokProcessorConfig.isKeepEmptyCaptures(), equalTo(true));
Expand Down Expand Up @@ -127,7 +131,8 @@ public void testInvalidConfig() {

invalidPluginSetting.getSettings().put(GrokProcessorConfig.MATCH, TEST_INVALID_MATCH);

assertThrows(IllegalArgumentException.class, () -> GrokProcessorConfig.buildConfig(invalidPluginSetting));
assertThrows(IllegalArgumentException.class, () -> OBJECT_MAPPER.convertValue(
invalidPluginSetting.getSettings(), GrokProcessorConfig.class));
}

private PluginSetting completePluginSettingForGrokProcessor(final boolean breakOnMatch,
Expand Down Expand Up @@ -160,43 +165,31 @@ private PluginSetting completePluginSettingForGrokProcessor(final boolean breakO
@Test
void getTagsOnMatchFailure_returns_tagOnMatch() {
final List<String> tagsOnMatch = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
final GrokProcessorConfig objectUnderTest = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME,
Map.of(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, tagsOnMatch)
));
final GrokProcessorConfig objectUnderTest = OBJECT_MAPPER.convertValue(
Map.of(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, tagsOnMatch), GrokProcessorConfig.class);

assertThat(objectUnderTest.getTagsOnMatchFailure(), equalTo(tagsOnMatch));
}

@Test
void getTagsOnTimeout_returns_tagsOnMatch_if_no_tagsOnTimeout() {
final List<String> tagsOnMatch = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
final GrokProcessorConfig objectUnderTest = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME,
Map.of(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, tagsOnMatch)
));

assertThat(objectUnderTest.getTagsOnTimeout(), equalTo(tagsOnMatch));
}

@Test
void getTagsOnTimeout_returns_tagsOnTimeout_if_present() {
final List<String> tagsOnMatch = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
final List<String> tagsOnTimeout = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
final GrokProcessorConfig objectUnderTest = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME,
final GrokProcessorConfig objectUnderTest = OBJECT_MAPPER.convertValue(
Map.of(
GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, tagsOnMatch,
GrokProcessorConfig.TAGS_ON_TIMEOUT, tagsOnTimeout
)
));
),
GrokProcessorConfig.class);

assertThat(objectUnderTest.getTagsOnTimeout(), equalTo(tagsOnTimeout));
}

@Test
void getTagsOnTimeout_returns_tagsOnTimeout_if_present_and_no_tagsOnMatch() {
final List<String> tagsOnTimeout = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
final GrokProcessorConfig objectUnderTest = GrokProcessorConfig.buildConfig(new PluginSetting(PLUGIN_NAME,
Map.of(GrokProcessorConfig.TAGS_ON_TIMEOUT, tagsOnTimeout)
));
final GrokProcessorConfig objectUnderTest = OBJECT_MAPPER.convertValue(
Map.of(GrokProcessorConfig.TAGS_ON_TIMEOUT, tagsOnTimeout), GrokProcessorConfig.class);

assertThat(objectUnderTest.getTagsOnTimeout(), equalTo(tagsOnTimeout));
}
Expand Down
Loading

0 comments on commit 286edc2

Please sign in to comment.