Skip to content

Commit

Permalink
Create decompress processor to decompress gzipped keys (opensearch-p…
Browse files Browse the repository at this point in the history
…roject#4118)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Feb 14, 2024
1 parent ab58c96 commit 3e3f302
Show file tree
Hide file tree
Showing 16 changed files with 787 additions and 0 deletions.
13 changes: 13 additions & 0 deletions data-prepper-plugins/decompress-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

dependencies {
// IOUtils is used to read the decompressed stream into a String
implementation 'commons-io:commons-io:2.15.1'
// Core plugin/processor model types (Event, Record, annotations)
implementation project(':data-prepper-api')
// Provides shared codecs, including the GZip decompression engine
implementation project(':data-prepper-plugins:common')
// Jackson binds the processor's YAML/JSON configuration
implementation 'com.fasterxml.jackson.core:jackson-databind'
// Micrometer backs the processingErrors counter metric
implementation 'io.micrometer:micrometer-core'
testImplementation testLibs.mockito.inline
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import io.micrometer.core.instrument.Counter;
import org.apache.commons.io.IOUtils;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;

/**
 * Processor that decodes (base64) and then decompresses (e.g. gzip) the values of
 * configured Event keys, replacing each value in place with the decompressed UTF-8
 * string. Failures are logged, counted via the {@code processingErrors} metric, and
 * surfaced on the Event through configurable failure tags.
 */
@DataPrepperPlugin(name = "decompress", pluginType = Processor.class, pluginConfigurationType = DecompressProcessorConfig.class)
public class DecompressProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

    private static final Logger LOG = LoggerFactory.getLogger(DecompressProcessor.class);
    static final String DECOMPRESSION_PROCESSING_ERRORS = "processingErrors";

    private final DecompressProcessorConfig decompressProcessorConfig;
    private final ExpressionEvaluator expressionEvaluator;

    private final Counter decompressionProcessingErrors;

    /**
     * @param pluginMetrics metrics registry used to create the processingErrors counter
     * @param decompressProcessorConfig the user-supplied processor configuration
     * @param expressionEvaluator evaluator for the optional decompress_when condition
     * @throws InvalidPluginConfigurationException if decompress_when is set but is not a
     *         valid expression statement
     */
    @DataPrepperPluginConstructor
    public DecompressProcessor(final PluginMetrics pluginMetrics,
                               final DecompressProcessorConfig decompressProcessorConfig,
                               final ExpressionEvaluator expressionEvaluator) {
        super(pluginMetrics);
        this.decompressProcessorConfig = decompressProcessorConfig;
        this.expressionEvaluator = expressionEvaluator;
        this.decompressionProcessingErrors = pluginMetrics.counter(DECOMPRESSION_PROCESSING_ERRORS);

        // Fail fast at pipeline construction time on an invalid conditional expression.
        if (decompressProcessorConfig.getDecompressWhen() != null
                && !expressionEvaluator.isValidExpressionStatement(decompressProcessorConfig.getDecompressWhen())) {
            throw new InvalidPluginConfigurationException(
                    String.format("decompress_when value of %s is not a valid expression statement. " +
                            "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.", decompressProcessorConfig.getDecompressWhen()));
        }
    }

    /**
     * Decodes and decompresses the configured keys of every Event. Missing keys are
     * skipped silently; decode/decompress failures tag the Event with the configured
     * failure tags and increment the processingErrors counter.
     *
     * @param records incoming records; mutated in place
     * @return the same collection of records
     */
    @Override
    public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
        for (final Record<Event> record : records) {

            try {
                // Skip the Event entirely when the optional condition evaluates to false.
                if (decompressProcessorConfig.getDecompressWhen() != null && !expressionEvaluator.evaluateConditional(decompressProcessorConfig.getDecompressWhen(), record.getData())) {
                    continue;
                }

                for (final String key : decompressProcessorConfig.getKeys()) {

                    final String compressedValue = record.getData().get(key, String.class);

                    // A missing key is not an error; just move on to the next key.
                    if (compressedValue == null) {
                        continue;
                    }

                    // Decode (base64) first, then decompress the resulting bytes.
                    final byte[] compressedValueAsBytes = decompressProcessorConfig.getEncodingType().getDecoderEngine().decode(compressedValue);

                    try (final InputStream inputStream = decompressProcessorConfig.getDecompressionType().getDecompressionEngine().createInputStream(new ByteArrayInputStream(compressedValueAsBytes))) {
                        // StandardCharsets.UTF_8 replaces Guava's deprecated Charsets.UTF_8
                        // (Guava is not declared in this module's build.gradle); the
                        // decompressed bytes are interpreted as UTF-8 text.
                        final String decompressedString = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
                        record.getData().put(key, decompressedString);
                    } catch (final Exception e) {
                        LOG.error("Unable to decompress key {} using decompression type {}:",
                                key, decompressProcessorConfig.getDecompressionType(), e);
                        record.getData().getMetadata().addTags(decompressProcessorConfig.getTagsOnFailure());
                        decompressionProcessingErrors.increment();
                    }
                }
            } catch (final DecodingException e) {
                LOG.error("Unable to decode key with base64: {}", e.getMessage());
                record.getData().getMetadata().addTags(decompressProcessorConfig.getTagsOnFailure());
                decompressionProcessingErrors.increment();
            } catch (final Exception e) {
                LOG.error("An uncaught exception occurred while decompressing Events", e);
                record.getData().getMetadata().addTags(decompressProcessorConfig.getTagsOnFailure());
                decompressionProcessingErrors.increment();
            }
        }

        return records;
    }

    @Override
    public void prepareForShutdown() {

    }

    @Override
    public boolean isReadyForShutdown() {
        // No buffered state is held between doExecute calls; safe to stop at any time.
        return true;
    }

    @Override
    public void shutdown() {

    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.EncodingType;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.DecoderEngineFactory;

import java.util.List;

/**
 * Configuration for the {@code decompress} processor: which keys to decompress,
 * the compression type, an optional condition, and failure tagging.
 */
public class DecompressProcessorConfig {

    @JsonProperty("keys")
    @NotEmpty
    private List<String> keys;

    @JsonProperty("type")
    @NotNull
    private DecompressionType decompressionType;

    @JsonProperty("decompress_when")
    private String decompressWhen;

    @JsonProperty("tags_on_failure")
    private List<String> tagsOnFailure = List.of("_decompression_failure");

    // Only base64-encoded values are supported today, so this is fixed and
    // hidden from the user-facing configuration.
    @JsonIgnore
    private final EncodingType encodingType = EncodingType.BASE64;

    /** @return the Event keys whose values will be decoded and decompressed */
    public List<String> getKeys() {
        return keys;
    }

    /** @return the optional conditional expression gating decompression per Event */
    public String getDecompressWhen() {
        return decompressWhen;
    }

    /** @return tags added to an Event when decoding or decompression fails */
    public List<String> getTagsOnFailure() {
        return tagsOnFailure;
    }

    /** @return the configured decompression type (e.g. gzip) */
    public DecompressionEngineFactory getDecompressionType() {
        return decompressionType;
    }

    /** @return the encoding applied to values before decompression (always base64) */
    public DecoderEngineFactory getEncodingType() {
        return encodingType;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import org.opensearch.dataprepper.model.codec.DecompressionEngine;

/**
 * Supplies the {@link DecompressionEngine} used to decompress key values.
 * Implemented by {@link DecompressionType} so a configured type maps directly
 * to an engine.
 */
public interface DecompressionEngineFactory {
public DecompressionEngine getDecompressionEngine();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress;

import com.fasterxml.jackson.annotation.JsonCreator;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.plugins.codec.GZipDecompressionEngine;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Supported decompression types for the {@code decompress} processor, bound from
 * the {@code type} configuration value. Each constant carries its own
 * {@link DecompressionEngine}, so the option string is defined exactly once per
 * constant (the previous separate engine map duplicated the "gzip" literal).
 */
public enum DecompressionType implements DecompressionEngineFactory {
    GZIP("gzip", new GZipDecompressionEngine());

    private final String option;
    private final DecompressionEngine decompressionEngine;

    // Lookup table from the configuration string to the enum constant.
    private static final Map<String, DecompressionType> OPTIONS_MAP = Arrays.stream(DecompressionType.values())
            .collect(Collectors.toMap(
                    value -> value.option,
                    value -> value
            ));

    DecompressionType(final String option, final DecompressionEngine decompressionEngine) {
        this.option = option;
        this.decompressionEngine = decompressionEngine;
    }

    /**
     * Jackson creator: maps the configured string (e.g. "gzip") to a constant.
     * Returns null for unknown values, which the config's @NotNull validation rejects.
     */
    @JsonCreator
    static DecompressionType fromOptionValue(final String option) {
        return OPTIONS_MAP.get(option);
    }

    /** @return the engine that decompresses values of this type */
    @Override
    public DecompressionEngine getDecompressionEngine() {
        return decompressionEngine;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException;

import java.util.Base64;

/**
 * {@link DecoderEngine} that decodes base64-encoded strings into raw bytes.
 */
public class Base64DecoderEngine implements DecoderEngine {
    /**
     * Decodes a base64 string.
     *
     * @param encodedValue the base64-encoded value
     * @return the decoded bytes
     * @throws DecodingException if the value is not valid base64
     */
    @Override
    public byte[] decode(final String encodedValue) {
        try {
            return Base64.getDecoder().decode(encodedValue);
        } catch (final IllegalArgumentException e) {
            // Base64.Decoder.decode(String) throws IllegalArgumentException on malformed
            // input; catching it narrowly (instead of Exception) avoids masking
            // unrelated failures as decoding errors.
            throw new DecodingException(String.format("There was an error decoding with the base64 encoding type: %s", e.getMessage()));
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

import org.opensearch.dataprepper.plugins.processor.decompress.exceptions.DecodingException;

/**
 * Decodes an encoded string (e.g. base64) into the raw bytes that will be
 * handed to a {@link org.opensearch.dataprepper.model.codec.DecompressionEngine}.
 */
public interface DecoderEngine {
// DecodingException is unchecked; declared here to document the failure contract.
byte[] decode(final String encodedValue) throws DecodingException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

/**
 * Supplies the {@link DecoderEngine} used to decode key values before
 * decompression. Implemented by {@link EncodingType}.
 */
public interface DecoderEngineFactory {
DecoderEngine getDecoderEngine();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.encoding;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Supported encodings for compressed key values. Each constant carries its own
 * {@link DecoderEngine}, so the option string is defined exactly once per
 * constant (the previous separate decoder map duplicated the "base64" literal).
 */
public enum EncodingType implements DecoderEngineFactory {
    BASE64("base64");

    private final String option;
    private final DecoderEngine decoderEngine;

    // Lookup table from the option string to the enum constant.
    private static final Map<String, EncodingType> OPTIONS_MAP = Arrays.stream(EncodingType.values())
            .collect(Collectors.toMap(
                    value -> value.option,
                    value -> value
            ));

    EncodingType(final String option) {
        this.option = option;
        this.decoderEngine = new Base64DecoderEngine();
    }

    /**
     * Jackson creator: maps an option string (e.g. "base64") to a constant;
     * returns null for unknown values.
     */
    @JsonCreator
    static EncodingType fromOptionValue(final String option) {
        return OPTIONS_MAP.get(option);
    }

    /** @return the engine that decodes values with this encoding */
    @Override
    public DecoderEngine getDecoderEngine() {
        return decoderEngine;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.decompress.exceptions;

/**
 * Thrown when an encoded (e.g. base64) value cannot be decoded prior to
 * decompression.
 */
public class DecodingException extends RuntimeException {
    public DecodingException(final String message) {
        super(message);
    }

    /**
     * Preserves the underlying decoder failure as the cause so the original
     * stack trace is not lost. Backward-compatible addition; existing callers
     * of the single-argument constructor are unaffected.
     *
     * @param message description of the decoding failure
     * @param cause the underlying exception from the decoder
     */
    public DecodingException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
Loading

0 comments on commit 3e3f302

Please sign in to comment.