From a407e4f9eef2a82f4e1b3c078e9195463063fe1b Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Wed, 30 Aug 2023 12:31:05 -0500 Subject: [PATCH] ENH: allow extension configuration from data prepper configuration (#2851) * ADD: initial implementation on injecting extension config Signed-off-by: George Chen --- .../DataPrepperExtensionPlugin.java | 16 ++ .../opensearch/dataprepper/DataPrepper.java | 6 +- ...neParser.java => PipelineTransformer.java} | 132 ++++------------ .../parser/PipelinesDataflowModelParser.java | 94 ++++++++++++ .../config/PipelineParserConfiguration.java | 22 ++- .../model/DataPrepperConfiguration.java | 15 +- .../parser/model/PipelineExtensions.java | 25 +++ .../dataprepper/plugin/ExtensionLoader.java | 47 +++++- ...ExtensionPluginConfigurationConverter.java | 62 ++++++++ .../plugin/ObjectMapperConfiguration.java | 26 ++++ .../plugin/PluginConfigurationConverter.java | 16 +- .../dataprepper/DataPrepperTests.java | 12 +- .../dataprepper/TestDataProvider.java | 1 + ...sts.java => PipelineTransformerTests.java} | 145 ++++++++---------- .../PipelinesDataflowModelParserTest.java | 69 +++++++++ .../PipelineParserConfigurationTest.java | 18 +-- .../model/DataPrepperConfigurationTests.java | 13 ++ .../plugin/DefaultPluginFactoryIT.java | 9 +- .../plugin/ExtensionLoaderTest.java | 48 +++++- ...nsionPluginConfigurationConverterTest.java | 110 +++++++++++++ .../dataprepper/plugin/ExtensionsIT.java | 70 +++++++++ .../PluginConfigurationConverterTest.java | 4 +- .../TestPluginUsingExtensionWithConfig.java | 36 +++++ .../plugins/test/TestExtensionConfig.java | 12 ++ .../plugins/test/TestExtensionWithConfig.java | 80 ++++++++++ ...ata_prepper_config_with_test_extension.yml | 4 + .../src/test/resources/valid_pipeline.yml | 5 + 27 files changed, 866 insertions(+), 231 deletions(-) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/DataPrepperExtensionPlugin.java rename data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/{PipelineParser.java => PipelineTransformer.java} (71%) create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineExtensions.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverter.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java rename data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/{PipelineParserTests.java => PipelineTransformerTests.java} (76%) create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverterTest.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginUsingExtensionWithConfig.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/test/TestExtensionConfig.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/test/TestExtensionWithConfig.java create mode 100644 data-prepper-core/src/test/resources/valid_data_prepper_config_with_test_extension.yml create mode 100644 data-prepper-core/src/test/resources/valid_pipeline.yml diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/DataPrepperExtensionPlugin.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/DataPrepperExtensionPlugin.java new file mode 100644 index 0000000000..fb115cd622 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/annotations/DataPrepperExtensionPlugin.java @@ -0,0 +1,16 @@ +package org.opensearch.dataprepper.model.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface DataPrepperExtensionPlugin { + Class modelType(); + + String rootKey(); +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java index 865c46745f..1c93f41ec8 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java @@ -7,7 +7,7 @@ import io.micrometer.core.instrument.util.StringUtils; import org.opensearch.dataprepper.model.plugin.PluginFactory; -import org.opensearch.dataprepper.parser.PipelineParser; +import org.opensearch.dataprepper.parser.PipelineTransformer; import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderServer; import org.opensearch.dataprepper.pipeline.Pipeline; import org.opensearch.dataprepper.pipeline.PipelineObserver; @@ -58,13 +58,13 @@ public static String getServiceNameForMetrics() { @Inject public DataPrepper( - final PipelineParser pipelineParser, + final PipelineTransformer pipelineTransformer, final PluginFactory pluginFactory, final PeerForwarderServer peerForwarderServer, final Predicate> shouldShutdownOnPipelineFailurePredicate) { this.pluginFactory = pluginFactory; - transformationPipelines = pipelineParser.parseConfiguration(); + transformationPipelines = pipelineTransformer.transformConfiguration(); this.shouldShutdownOnPipelineFailurePredicate = shouldShutdownOnPipelineFailurePredicate; if (transformationPipelines.size() == 0) { throw new RuntimeException("No valid pipeline is available for execution, exiting"); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java similarity index 71% rename from data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java rename to data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java index 6a0a67d0f0..1462a38049 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java @@ -5,13 +5,9 @@ package org.opensearch.dataprepper.parser; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import org.opensearch.dataprepper.breaker.CircuitBreakerManager; import org.opensearch.dataprepper.model.annotations.SingleThread; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding; @@ -37,15 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileFilter; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.SequenceInputStream; import java.time.Duration; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -53,18 +41,15 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.lang.String.format; @SuppressWarnings("rawtypes") -public class PipelineParser { - private static final Logger LOG = LoggerFactory.getLogger(PipelineParser.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory()) - .enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY); +public class PipelineTransformer { + private static final Logger LOG = LoggerFactory.getLogger(PipelineTransformer.class); private static final String PIPELINE_TYPE = "pipeline"; private static final String ATTRIBUTE_NAME = "name"; - private final String pipelineConfigurationFileLocation; + private final PipelinesDataFlowModel pipelinesDataFlowModel; private final RouterFactory routerFactory; private final DataPrepperConfiguration dataPrepperConfiguration; private final CircuitBreakerManager circuitBreakerManager; @@ -75,16 +60,16 @@ public class PipelineParser { private final AcknowledgementSetManager acknowledgementSetManager; private final SourceCoordinatorFactory sourceCoordinatorFactory; - public PipelineParser(final String pipelineConfigurationFileLocation, - final PluginFactory pluginFactory, - final PeerForwarderProvider peerForwarderProvider, - final RouterFactory routerFactory, - final DataPrepperConfiguration dataPrepperConfiguration, - final CircuitBreakerManager circuitBreakerManager, - final EventFactory eventFactory, - final AcknowledgementSetManager acknowledgementSetManager, - final SourceCoordinatorFactory sourceCoordinatorFactory) { - this.pipelineConfigurationFileLocation = pipelineConfigurationFileLocation; + public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel, + final PluginFactory pluginFactory, + final PeerForwarderProvider peerForwarderProvider, + final RouterFactory routerFactory, + final DataPrepperConfiguration dataPrepperConfiguration, + final CircuitBreakerManager circuitBreakerManager, + final EventFactory eventFactory, + final AcknowledgementSetManager acknowledgementSetManager, + final SourceCoordinatorFactory sourceCoordinatorFactory) { + this.pipelinesDataFlowModel = pipelinesDataFlowModel; this.pluginFactory = Objects.requireNonNull(pluginFactory); this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider); this.routerFactory = routerFactory; @@ -95,82 +80,25 @@ public PipelineParser(final String pipelineConfigurationFileLocation, this.sourceCoordinatorFactory = sourceCoordinatorFactory; } - /** - * Parses the configuration file into Pipeline - */ - public Map parseConfiguration() { - try (final InputStream mergedPipelineConfigurationFiles = mergePipelineConfigurationFiles()) { - final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(mergedPipelineConfigurationFiles, - PipelinesDataFlowModel.class); - - final DataPrepperVersion version = pipelinesDataFlowModel.getDataPrepperVersion(); - validateDataPrepperVersion(version); - - final Map pipelineConfigurationMap = pipelinesDataFlowModel.getPipelines().entrySet() - .stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> new PipelineConfiguration(entry.getValue()) - )); - final List allPipelineNames = PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigurationMap); - - // LinkedHashMap to preserve insertion order - final Map pipelineMap = new LinkedHashMap<>(); - pipelineConfigurationMap.forEach((pipelineName, configuration) -> - configuration.updateCommonPipelineConfiguration(pipelineName)); - for (String pipelineName : allPipelineNames) { - if (!pipelineMap.containsKey(pipelineName) && pipelineConfigurationMap.containsKey(pipelineName)) { - buildPipelineFromConfiguration(pipelineName, pipelineConfigurationMap, pipelineMap); - } - } - return pipelineMap; - } catch (IOException e) { - LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation); - throw new ParseException(format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e); - } - } - - private void validateDataPrepperVersion(final DataPrepperVersion version) { - if (Objects.nonNull(version) && !DataPrepperVersion.getCurrentVersion().compatibleWith(version)) { - LOG.error("The version: {} is not compatible with the current version: {}", version, DataPrepperVersion.getCurrentVersion()); - throw new ParseException(format("The version: %s is not compatible with the current version: %s", - version, DataPrepperVersion.getCurrentVersion())); - } - } - - private InputStream mergePipelineConfigurationFiles() throws IOException { - final File configurationLocation = new File(pipelineConfigurationFileLocation); - - if (configurationLocation.isFile()) { - return new FileInputStream(configurationLocation); - } else if (configurationLocation.isDirectory()) { - FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml")); - List configurationFiles = Stream.of(configurationLocation.listFiles(yamlFilter)) - .map(file -> { - InputStream inputStream; - try { - inputStream = new FileInputStream(file); - LOG.info("Reading pipeline configuration from {}", file.getName()); - } catch (FileNotFoundException e) { - inputStream = null; - LOG.warn("Pipeline configuration file {} not found", file.getName()); - } - return inputStream; - }) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - if (configurationFiles.isEmpty()) { - LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation); - throw new ParseException( - format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation)); + public Map transformConfiguration() { + final Map pipelineConfigurationMap = pipelinesDataFlowModel.getPipelines().entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> new PipelineConfiguration(entry.getValue()) + )); + final List allPipelineNames = PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigurationMap); + + // LinkedHashMap to preserve insertion order + final Map pipelineMap = new LinkedHashMap<>(); + pipelineConfigurationMap.forEach((pipelineName, configuration) -> + configuration.updateCommonPipelineConfiguration(pipelineName)); + for (String pipelineName : allPipelineNames) { + if (!pipelineMap.containsKey(pipelineName) && pipelineConfigurationMap.containsKey(pipelineName)) { + buildPipelineFromConfiguration(pipelineName, pipelineConfigurationMap, pipelineMap); } - - return new SequenceInputStream(Collections.enumeration(configurationFiles)); - } else { - LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation); - throw new ParseException(format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation)); } + return pipelineMap; } private void buildPipelineFromConfiguration( diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java new file mode 100644 index 0000000000..0cfd8fa257 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java @@ -0,0 +1,94 @@ +package org.opensearch.dataprepper.parser; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileFilter; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.lang.String.format; + +public class PipelinesDataflowModelParser { + private static final Logger LOG = LoggerFactory.getLogger(PipelinesDataflowModelParser.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory()) + .enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY); + + private final String pipelineConfigurationFileLocation; + + public PipelinesDataflowModelParser(final String pipelineConfigurationFileLocation) { + this.pipelineConfigurationFileLocation = pipelineConfigurationFileLocation; + } + + public PipelinesDataFlowModel parseConfiguration() { + try (final InputStream mergedPipelineConfigurationFiles = mergePipelineConfigurationFiles()) { + final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(mergedPipelineConfigurationFiles, + PipelinesDataFlowModel.class); + + final DataPrepperVersion version = pipelinesDataFlowModel.getDataPrepperVersion(); + validateDataPrepperVersion(version); + + return pipelinesDataFlowModel; + } catch (IOException e) { + LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation); + throw new ParseException(format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e); + } + } + + private void validateDataPrepperVersion(final DataPrepperVersion version) { + if (Objects.nonNull(version) && !DataPrepperVersion.getCurrentVersion().compatibleWith(version)) { + LOG.error("The version: {} is not compatible with the current version: {}", version, DataPrepperVersion.getCurrentVersion()); + throw new ParseException(format("The version: %s is not compatible with the current version: %s", + version, DataPrepperVersion.getCurrentVersion())); + } + } + + private InputStream mergePipelineConfigurationFiles() throws IOException { + final File configurationLocation = new File(pipelineConfigurationFileLocation); + + if (configurationLocation.isFile()) { + return new FileInputStream(configurationLocation); + } else if (configurationLocation.isDirectory()) { + FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml")); + List configurationFiles = Stream.of(configurationLocation.listFiles(yamlFilter)) + .map(file -> { + InputStream inputStream; + try { + inputStream = new FileInputStream(file); + LOG.info("Reading pipeline configuration from {}", file.getName()); + } catch (FileNotFoundException e) { + inputStream = null; + LOG.warn("Pipeline configuration file {} not found", file.getName()); + } + return inputStream; + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + if (configurationFiles.isEmpty()) { + LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation); + throw new ParseException( + format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation)); + } + + return new SequenceInputStream(Collections.enumeration(configurationFiles)); + } else { + LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation); + throw new ParseException(format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation)); + } + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java index 56bb548476..43d7f3ad8d 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java @@ -6,8 +6,10 @@ package org.opensearch.dataprepper.parser.config; import org.opensearch.dataprepper.breaker.CircuitBreakerManager; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.plugin.PluginFactory; -import org.opensearch.dataprepper.parser.PipelineParser; +import org.opensearch.dataprepper.parser.PipelineTransformer; +import org.opensearch.dataprepper.parser.PipelinesDataflowModelParser; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider; import org.opensearch.dataprepper.pipeline.router.RouterFactory; @@ -21,8 +23,8 @@ public class PipelineParserConfiguration { @Bean - public PipelineParser pipelineParser( - final FileStructurePathProvider fileStructurePathProvider, + public PipelineTransformer pipelineParser( + final PipelinesDataFlowModel pipelinesDataFlowModel, final PluginFactory pluginFactory, final PeerForwarderProvider peerForwarderProvider, final RouterFactory routerFactory, @@ -32,7 +34,7 @@ public PipelineParser pipelineParser( final AcknowledgementSetManager acknowledgementSetManager, final SourceCoordinatorFactory sourceCoordinatorFactory ) { - return new PipelineParser(fileStructurePathProvider.getPipelineConfigFileLocation(), + return new PipelineTransformer(pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, routerFactory, @@ -42,4 +44,16 @@ public PipelineParser pipelineParser( acknowledgementSetManager, sourceCoordinatorFactory); } + + @Bean + public PipelinesDataflowModelParser pipelinesDataflowModelParser( + final FileStructurePathProvider fileStructurePathProvider) { + return new PipelinesDataflowModelParser(fileStructurePathProvider.getPipelineConfigFileLocation()); + } + + @Bean + public PipelinesDataFlowModel pipelinesDataFlowModel( + final PipelinesDataflowModelParser pipelinesDataflowModelParser) { + return pipelinesDataflowModelParser.parseConfiguration(); + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/DataPrepperConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/DataPrepperConfiguration.java index 5be4e05040..adccb80120 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/DataPrepperConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/DataPrepperConfiguration.java @@ -7,7 +7,10 @@ import com.fasterxml.jackson.annotation.JsonAlias; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSetter; +import com.fasterxml.jackson.annotation.Nulls; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.parser.config.MetricTagFilter; import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration; @@ -47,6 +50,7 @@ public class DataPrepperConfiguration { private PeerForwarderConfiguration peerForwarderConfiguration; private Duration processorShutdownTimeout; private Duration sinkShutdownTimeout; + private PipelineExtensions pipelineExtensions; public static final DataPrepperConfiguration DEFAULT_CONFIG = new DataPrepperConfiguration(); @@ -85,7 +89,11 @@ public DataPrepperConfiguration( final Duration sinkShutdownTimeout, @JsonProperty("circuit_breakers") final CircuitBreakerConfig circuitBreakerConfig, @JsonProperty("source_coordination") final SourceCoordinationConfig sourceCoordinationConfig, - @JsonProperty("pipeline_shutdown") final PipelineShutdownOption pipelineShutdown) { + @JsonProperty("pipeline_shutdown") final PipelineShutdownOption pipelineShutdown, + @JsonProperty("extensions") + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonSetter(nulls = Nulls.SKIP) + final PipelineExtensions pipelineExtensions) { this.authentication = authentication; this.circuitBreakerConfig = circuitBreakerConfig; this.sourceCoordinationConfig = Objects.isNull(sourceCoordinationConfig) @@ -111,6 +119,7 @@ public DataPrepperConfiguration( if (this.sinkShutdownTimeout.isNegative()) { throw new IllegalArgumentException("sinkShutdownTimeout must be non-negative."); } + this.pipelineExtensions = pipelineExtensions; } public int getServerPort() { @@ -214,4 +223,8 @@ public CircuitBreakerConfig getCircuitBreakerConfig() { public PipelineShutdownOption getPipelineShutdown() { return pipelineShutdown; } + + public PipelineExtensions getPipelineExtensions() { + return pipelineExtensions; + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineExtensions.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineExtensions.java new file mode 100644 index 0000000000..30212d5203 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineExtensions.java @@ -0,0 +1,25 @@ +package org.opensearch.dataprepper.parser.model; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class PipelineExtensions { + + @JsonAnySetter + private final Map extensionMap; + + @JsonCreator + public PipelineExtensions() { + extensionMap = new HashMap<>(); + } + + @JsonAnyGetter + public Map getExtensionMap() { + return Collections.unmodifiableMap(extensionMap); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java index 20b6690552..abbf0c68d7 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperExtensionPlugin; import org.opensearch.dataprepper.model.plugin.ExtensionPlugin; import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException; @@ -12,30 +13,49 @@ import javax.inject.Named; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; @Named public class ExtensionLoader { + private final ExtensionPluginConfigurationConverter extensionPluginConfigurationConverter; private final ExtensionClassProvider extensionClassProvider; private final PluginCreator pluginCreator; @Inject ExtensionLoader( + final ExtensionPluginConfigurationConverter extensionPluginConfigurationConverter, final ExtensionClassProvider extensionClassProvider, final PluginCreator pluginCreator) { + this.extensionPluginConfigurationConverter = extensionPluginConfigurationConverter; this.extensionClassProvider = extensionClassProvider; this.pluginCreator = pluginCreator; } List loadExtensions() { - final PluginArgumentsContext pluginArgumentsContext = new NoArgumentsArgumentsContext(); - return extensionClassProvider.loadExtensionPluginClasses() .stream() - .map(extensionClass -> pluginCreator.newPluginInstance(extensionClass, pluginArgumentsContext, convertClassToName(extensionClass))) + .map(extensionClass -> { + final PluginArgumentsContext pluginArgumentsContext = getConstructionContext(extensionClass); + return pluginCreator.newPluginInstance(extensionClass, pluginArgumentsContext, convertClassToName(extensionClass)); + }) .collect(Collectors.toList()); } + private PluginArgumentsContext getConstructionContext(final Class extensionPluginClass) { + final DataPrepperExtensionPlugin pluginAnnotation = extensionPluginClass.getAnnotation( + DataPrepperExtensionPlugin.class); + if (pluginAnnotation == null) { + return new NoArgumentsArgumentsContext(); + } else { + final Class pluginConfigurationType = pluginAnnotation.modelType(); + final String rootKey = pluginAnnotation.rootKey(); + final Object configuration = extensionPluginConfigurationConverter.convert( + pluginConfigurationType, rootKey); + return new SingleConfigArgumentArgumentsContext(configuration); + } + } + private String convertClassToName(final Class extensionClass) { final String className = extensionClass.getSimpleName(); return classNameToPluginName(className); @@ -51,7 +71,7 @@ static String classNameToPluginName(final String className) { .replace("$", ""); } - private static class NoArgumentsArgumentsContext implements PluginArgumentsContext { + protected static class NoArgumentsArgumentsContext implements PluginArgumentsContext { @Override public Object[] createArguments(final Class[] parameterTypes) { if(parameterTypes.length != 0) { @@ -60,4 +80,23 @@ public Object[] createArguments(final Class[] parameterTypes) { return new Object[0]; } } + + protected static class SingleConfigArgumentArgumentsContext implements PluginArgumentsContext { + private final Object extensionPluginConfiguration; + + SingleConfigArgumentArgumentsContext(final Object extensionPluginConfiguration) { + this.extensionPluginConfiguration = extensionPluginConfiguration; + } + + @Override + public Object[] createArguments(Class[] parameterTypes) { + if (parameterTypes.length != 1 && (Objects.nonNull(extensionPluginConfiguration) && + !parameterTypes[0].equals(extensionPluginConfiguration.getClass()))) { + throw new InvalidPluginDefinitionException(String.format( + "Single %s argument is permitted for extensions constructors.", + extensionPluginConfiguration.getClass())); + } + return new Object[] { extensionPluginConfiguration }; + } + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverter.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverter.java new file mode 100644 index 0000000000..ecf5633c2e --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverter.java @@ -0,0 +1,62 @@ +package org.opensearch.dataprepper.plugin; + +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.Validator; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; + +import javax.inject.Inject; +import javax.inject.Named; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +@Named +public class ExtensionPluginConfigurationConverter { + private final DataPrepperConfiguration dataPrepperConfiguration; + private final ObjectMapper objectMapper; + private final Validator validator; + + @Inject + public ExtensionPluginConfigurationConverter(final DataPrepperConfiguration dataPrepperConfiguration, + final Validator validator, + @Named("pluginConfigObjectMapper") + final ObjectMapper objectMapper) { + this.dataPrepperConfiguration = dataPrepperConfiguration; + this.objectMapper = objectMapper; + this.validator = validator; + } + + public Object convert(final Class extensionPluginConfigurationType, final String rootKey) { + Objects.requireNonNull(extensionPluginConfigurationType); + Objects.requireNonNull(rootKey); + + final Map extensionProperties = dataPrepperConfiguration.getPipelineExtensions() == null? + new HashMap<>() : dataPrepperConfiguration.getPipelineExtensions().getExtensionMap(); + + final Object configuration = convertSettings(extensionPluginConfigurationType, + extensionProperties.get(rootKey)); + + final Set> constraintViolations = configuration == null ? Collections.emptySet() : + validator.validate(configuration); + + if (!constraintViolations.isEmpty()) { + final String violationsString = constraintViolations.stream() + .map(v -> v.getPropertyPath().toString() + " " + v.getMessage()) + .collect(Collectors.joining(". ")); + + final String exceptionMessage = String.format("Extension %s in PipelineExtensions " + + "is configured incorrectly: %s", rootKey, violationsString); + throw new InvalidPluginConfigurationException(exceptionMessage); + } + return configuration; + } + + private Object convertSettings(final Class extensionPluginConfigurationType, final Object extensionPlugin) { + return objectMapper.convertValue(extensionPlugin, extensionPluginConfigurationType); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java new file mode 100644 index 0000000000..fbc2765d38 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java @@ -0,0 +1,26 @@ +package org.opensearch.dataprepper.plugin; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.opensearch.dataprepper.parser.DataPrepperDurationDeserializer; +import org.springframework.context.annotation.Bean; + +import javax.inject.Named; +import java.time.Duration; + +/** + * Application context for internal plugin framework beans. + */ +@Named +public class ObjectMapperConfiguration { + @Bean(name = "pluginConfigObjectMapper") + ObjectMapper objectMapper() { + final SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Duration.class, new DataPrepperDurationDeserializer()); + + return new ObjectMapper() + .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE) + .registerModule(simpleModule); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java index 72a68f2c2e..5f612993ce 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java @@ -9,14 +9,10 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategies; -import com.fasterxml.jackson.databind.module.SimpleModule; import jakarta.validation.ConstraintViolation; import jakarta.validation.Validator; -import org.opensearch.dataprepper.parser.DataPrepperDurationDeserializer; import javax.inject.Named; -import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -32,14 +28,10 @@ class PluginConfigurationConverter { private final ObjectMapper objectMapper; private final Validator validator; - PluginConfigurationConverter(final Validator validator) { - final SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(Duration.class, new DataPrepperDurationDeserializer()); - - this.objectMapper = new ObjectMapper() - .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE) - .registerModule(simpleModule); - + PluginConfigurationConverter(final Validator validator, + @Named("pluginConfigObjectMapper") + final ObjectMapper objectMapper) { + this.objectMapper = objectMapper; this.validator = validator; } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperTests.java index 826b411ced..670d9664c6 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperTests.java @@ -13,7 +13,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.plugin.PluginFactory; -import org.opensearch.dataprepper.parser.PipelineParser; +import org.opensearch.dataprepper.parser.PipelineTransformer; import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderServer; import org.opensearch.dataprepper.pipeline.Pipeline; import org.opensearch.dataprepper.pipeline.PipelineObserver; @@ -39,7 +39,7 @@ public class DataPrepperTests { private Map parseConfigurationFixture; @Mock - private PipelineParser pipelineParser; + private PipelineTransformer pipelineTransformer; @Mock private Pipeline pipeline; @Mock @@ -58,12 +58,12 @@ public void before() { lenient().when(pipeline.getName()).thenReturn("testKey"); lenient().when(pipeline.isReady()).thenReturn(true); - lenient().when(pipelineParser.parseConfiguration()) + lenient().when(pipelineTransformer.transformConfiguration()) .thenReturn(parseConfigurationFixture); } private DataPrepper createObjectUnderTest() throws NoSuchFieldException, IllegalAccessException { - final DataPrepper dataPrepper = new DataPrepper(pipelineParser, pluginFactory, peerForwarderServer, shouldShutdownOnPipelineFailurePredicate); + final DataPrepper dataPrepper = new DataPrepper(pipelineTransformer, pluginFactory, peerForwarderServer, shouldShutdownOnPipelineFailurePredicate); final Field dataPrepperServerField = dataPrepper.getClass().getDeclaredField("dataPrepperServer"); dataPrepperServerField.setAccessible(true); dataPrepperServerField.set(dataPrepper, dataPrepperServer); @@ -82,11 +82,11 @@ public void testGivenValidInputThenInstanceCreation() throws NoSuchFieldExceptio @Test public void testGivenInvalidInputThenExceptionThrown() { - final PipelineParser pipelineParser = mock(PipelineParser.class); + final PipelineTransformer pipelineTransformer = mock(PipelineTransformer.class); assertThrows( RuntimeException.class, - () -> new DataPrepper(pipelineParser, pluginFactory, peerForwarderServer, shouldShutdownOnPipelineFailurePredicate), + () -> new DataPrepper(pipelineTransformer, pluginFactory, peerForwarderServer, shouldShutdownOnPipelineFailurePredicate), "Exception should be thrown if pipeline parser has no pipeline configuration"); } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java index 6279b1ee11..c638463a10 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java @@ -77,6 +77,7 @@ public class TestDataProvider { public static final String VALID_PEER_FORWARDER_CONFIG_FILE = "src/test/resources/valid_peer_forwarder_config.yml"; public static final String VALID_PEER_FORWARDER_CONFIG_WITH_DRAIN_TIMEOUT_FILE = "src/test/resources/valid_peer_forwarder_config_with_drain_timeout.yml"; public static final String VALID_PEER_FORWARDER_CONFIG_WITH_ISO8601_DRAIN_TIMEOUT_FILE = "src/test/resources/valid_peer_forwarder_config_with_iso8601_drain_timeout.yml"; + public static final String VALID_DATA_PREPPER_CONFIG_WITH_TEST_EXTENSION_FILE = "src/test/resources/valid_data_prepper_config_with_test_extension.yml"; public static final String INVALID_PEER_FORWARDER_WITH_PORT_CONFIG_FILE = "src/test/resources/invalid_peer_forwarder_with_port_config.yml"; public static final String INVALID_PEER_FORWARDER_WITH_THREAD_COUNT_CONFIG_FILE = "src/test/resources/invalid_peer_forwarder_with_thread_count_config.yml"; public static final String INVALID_PEER_FORWARDER_WITH_CONNECTION_CONFIG_FILE = "src/test/resources/invalid_peer_forwarder_with_connection_config.yml"; diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineParserTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java similarity index 76% rename from data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineParserTests.java rename to data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java index 52a092b616..3e117f8204 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineParserTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java @@ -19,7 +19,7 @@ import org.opensearch.dataprepper.breaker.CircuitBreaker; import org.opensearch.dataprepper.breaker.CircuitBreakerManager; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; @@ -58,7 +58,7 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -class PipelineParserTests { +class PipelineTransformerTests { private PeerForwarderProvider peerForwarderProvider; @Mock @@ -98,6 +98,7 @@ void setUp() { coreContext.scan(DefaultAcknowledgementSetManager.class.getPackage().getName()); coreContext.scan(DefaultPluginFactory.class.getPackage().getName()); + coreContext.registerBean(DataPrepperConfiguration.class, () -> dataPrepperConfiguration); coreContext.refresh(); pluginFactory = coreContext.getBean(DefaultPluginFactory.class); } @@ -107,8 +108,10 @@ void tearDown() { verifyNoMoreInteractions(dataPrepperConfiguration); } - private PipelineParser createObjectUnderTest(final String pipelineConfigurationFileLocation) { - return new PipelineParser(pipelineConfigurationFileLocation, pluginFactory, peerForwarderProvider, + private PipelineTransformer createObjectUnderTest(final String pipelineConfigurationFileLocation) { + final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataflowModelParser( + pipelineConfigurationFileLocation).parseConfiguration(); + return new PipelineTransformer(pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory, acknowledgementSetManager, sourceCoordinatorFactory); } @@ -116,153 +119,128 @@ private PipelineParser createObjectUnderTest(final String pipelineConfigurationF @Test void parseConfiguration_with_multiple_valid_pipelines_creates_the_correct_pipelineMap() { mockDataPrepperConfigurationAccesses(); - final PipelineParser pipelineParser = + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_PIPELINE_CONFIG_FILE); - final Map actualPipelineMap = pipelineParser.parseConfiguration(); + final Map actualPipelineMap = pipelineTransformer.transformConfiguration(); assertThat(actualPipelineMap.keySet(), equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); verifyDataPrepperConfigurationAccesses(actualPipelineMap.keySet().size()); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void parseConfiguration_with_invalid_root_pipeline_creates_empty_pipelinesMap() { - final PipelineParser pipelineParser = + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.CONNECTED_PIPELINE_ROOT_SOURCE_INCORRECT); - final Map connectedPipelines = pipelineParser.parseConfiguration(); + final Map connectedPipelines = pipelineTransformer.transformConfiguration(); assertThat(connectedPipelines.size(), equalTo(0)); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void parseConfiguration_with_incorrect_child_pipeline_returns_empty_pipelinesMap() { mockDataPrepperConfigurationAccesses(); - final PipelineParser pipelineParser = + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT); - final Map connectedPipelines = pipelineParser.parseConfiguration(); + final Map connectedPipelines = pipelineTransformer.transformConfiguration(); assertThat(connectedPipelines.size(), equalTo(0)); verifyDataPrepperConfigurationAccesses(); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void parseConfiguration_with_a_single_pipeline_with_empty_source_settings_returns_that_pipeline() { mockDataPrepperConfigurationAccesses(); - final PipelineParser pipelineParser = + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.VALID_SINGLE_PIPELINE_EMPTY_SOURCE_PLUGIN_FILE); - final Map actualPipelineMap = pipelineParser.parseConfiguration(); + final Map actualPipelineMap = pipelineTransformer.transformConfiguration(); assertThat(actualPipelineMap.keySet().size(), equalTo(1)); verifyDataPrepperConfigurationAccesses(); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void parseConfiguration_with_cycles_in_multiple_pipelines_should_throw() { - final PipelineParser pipelineParser = + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.CYCLE_MULTIPLE_PIPELINE_CONFIG_FILE); - final RuntimeException actualException = assertThrows(RuntimeException.class, pipelineParser::parseConfiguration); + final RuntimeException actualException = assertThrows(RuntimeException.class, pipelineTransformer::transformConfiguration); assertThat(actualException.getMessage(), equalTo("Provided configuration results in a loop, check pipeline: test-pipeline-1")); - + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void parseConfiguration_with_incorrect_source_mapping_in_multiple_pipelines_should_throw() { - final PipelineParser pipelineParser = + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.INCORRECT_SOURCE_MULTIPLE_PIPELINE_CONFIG_FILE); - final RuntimeException actualException = assertThrows(RuntimeException.class, pipelineParser::parseConfiguration); + final RuntimeException actualException = assertThrows(RuntimeException.class, pipelineTransformer::transformConfiguration); assertThat(actualException.getMessage(), equalTo("Invalid configuration, expected source test-pipeline-1 for pipeline test-pipeline-2 is missing")); - } - - @Test - void parseConfiguration_with_incompatible_version_should_throw() { - final PipelineParser pipelineParser = - createObjectUnderTest(TestDataProvider.INCOMPATIBLE_VERSION_CONFIG_FILE); - - final RuntimeException actualException = assertThrows(RuntimeException.class, pipelineParser::parseConfiguration); - assertThat(actualException.getMessage(), - equalTo(String.format("The version: 3005.0 is not compatible with the current version: %s", DataPrepperVersion.getCurrentVersion()))); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void parseConfiguration_with_compatible_version() { - final PipelineParser pipelineParser = + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.COMPATIBLE_VERSION_CONFIG_FILE); - final Map connectedPipelines = pipelineParser.parseConfiguration(); + final Map connectedPipelines = pipelineTransformer.transformConfiguration(); assertThat(connectedPipelines.size(), equalTo(1)); verify(dataPrepperConfiguration).getProcessorShutdownTimeout(); verify(dataPrepperConfiguration).getSinkShutdownTimeout(); verify(dataPrepperConfiguration).getPeerForwarderConfiguration(); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void parseConfiguration_with_missing_pipeline_name_should_throw() { - final PipelineParser pipelineParser = + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.MISSING_NAME_MULTIPLE_PIPELINE_CONFIG_FILE); - final RuntimeException actualException = assertThrows(RuntimeException.class, pipelineParser::parseConfiguration); + final RuntimeException actualException = assertThrows(RuntimeException.class, pipelineTransformer::transformConfiguration); assertThat(actualException.getMessage(), equalTo("name is a required attribute for sink pipeline plugin, " + "check pipeline: test-pipeline-1")); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void parseConfiguration_with_missing_pipeline_name_in_multiple_pipelines_should_throw() { - final PipelineParser pipelineParser = + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.MISSING_PIPELINE_MULTIPLE_PIPELINE_CONFIG_FILE); - final RuntimeException actualException = assertThrows(RuntimeException.class, pipelineParser::parseConfiguration); + final RuntimeException actualException = assertThrows(RuntimeException.class, pipelineTransformer::transformConfiguration); assertThat(actualException.getMessage(), equalTo("Invalid configuration, no pipeline is defined with name test-pipeline-4")); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void testMultipleSinks() { mockDataPrepperConfigurationAccesses(); - final PipelineParser pipelineParser = + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_SINKS_CONFIG_FILE); - final Map pipelineMap = pipelineParser.parseConfiguration(); + final Map pipelineMap = pipelineTransformer.transformConfiguration(); assertThat(pipelineMap.keySet().size(), equalTo(3)); verifyDataPrepperConfigurationAccesses(pipelineMap.keySet().size()); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void testMultipleProcessors() { mockDataPrepperConfigurationAccesses(); - final PipelineParser pipelineParser = + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_PROCESSERS_CONFIG_FILE); - final Map pipelineMap = pipelineParser.parseConfiguration(); + final Map pipelineMap = pipelineTransformer.transformConfiguration(); assertThat(pipelineMap.keySet().size(), equalTo(3)); verifyDataPrepperConfigurationAccesses(pipelineMap.keySet().size()); - } - - @Test - void parseConfiguration_with_a_configuration_file_which_does_not_exist_should_throw() { - final PipelineParser pipelineParser = createObjectUnderTest("file_does_no_exist.yml"); - final RuntimeException actualException = assertThrows(RuntimeException.class, pipelineParser::parseConfiguration); - assertThat(actualException.getMessage(), equalTo("Pipelines configuration file not found at file_does_no_exist.yml")); - } - - @Test - void parseConfiguration_from_directory_with_multiple_files_creates_the_correct_pipelineMap() { - mockDataPrepperConfigurationAccesses(); - final PipelineParser pipelineParser = createObjectUnderTest(TestDataProvider.MULTI_FILE_PIPELINE_DIRECTOTRY); - final Map actualPipelineMap = pipelineParser.parseConfiguration(); - assertThat(actualPipelineMap.keySet(), equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); - verifyDataPrepperConfigurationAccesses(actualPipelineMap.keySet().size()); - } - - @Test - void parseConfiguration_from_directory_with_single_file_creates_the_correct_pipelineMap() { - mockDataPrepperConfigurationAccesses(); - final PipelineParser pipelineParser = createObjectUnderTest(TestDataProvider.SINGLE_FILE_PIPELINE_DIRECTOTRY); - final Map actualPipelineMap = pipelineParser.parseConfiguration(); - assertThat(actualPipelineMap.keySet(), equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); - verifyDataPrepperConfigurationAccesses(actualPipelineMap.keySet().size()); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void parseConfiguration_with_routes_creates_correct_pipeline() { mockDataPrepperConfigurationAccesses(); - final PipelineParser pipelineParser = + final PipelineTransformer pipelineTransformer = createObjectUnderTest("src/test/resources/valid_multiple_sinks_with_routes.yml"); - final Map pipelineMap = pipelineParser.parseConfiguration(); + final Map pipelineMap = pipelineTransformer.transformConfiguration(); assertThat(pipelineMap.keySet().size(), equalTo(3)); verifyDataPrepperConfigurationAccesses(pipelineMap.keySet().size()); @@ -270,25 +248,19 @@ void parseConfiguration_with_routes_creates_correct_pipeline() { assertThat(entryPipeline, notNullValue()); assertThat(entryPipeline.getSinks(), notNullValue()); assertThat(entryPipeline.getSinks().size(), equalTo(2)); - } - - @Test - void parseConfiguration_from_directory_with_no_yaml_files_should_throw() { - final PipelineParser pipelineParser = createObjectUnderTest(TestDataProvider.EMPTY_PIPELINE_DIRECTOTRY); - final RuntimeException actualException = assertThrows(RuntimeException.class, pipelineParser::parseConfiguration); - assertThat(actualException.getMessage(), equalTo( - String.format("Pipelines configuration file not found at %s", TestDataProvider.EMPTY_PIPELINE_DIRECTOTRY))); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void getPeerForwarderDrainDuration_peerForwarderConfigurationNotSet() { when(dataPrepperConfiguration.getPeerForwarderConfiguration()).thenReturn(null); - final PipelineParser pipelineParser = createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_PIPELINE_CONFIG_FILE); - final Duration result = pipelineParser.getPeerForwarderDrainTimeout(dataPrepperConfiguration); + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_PIPELINE_CONFIG_FILE); + final Duration result = pipelineTransformer.getPeerForwarderDrainTimeout(dataPrepperConfiguration); assertThat(result, is(Duration.ofSeconds(0))); verify(dataPrepperConfiguration).getPeerForwarderConfiguration(); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test @@ -297,12 +269,13 @@ void getPeerForwarderDrainDuration_IsSet() { when(dataPrepperConfiguration.getPeerForwarderConfiguration()).thenReturn(peerForwarderConfiguration); when(peerForwarderConfiguration.getDrainTimeout()).thenReturn(expectedResult); - final PipelineParser pipelineParser = createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_PIPELINE_CONFIG_FILE); - final Duration result = pipelineParser.getPeerForwarderDrainTimeout(dataPrepperConfiguration); + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_PIPELINE_CONFIG_FILE); + final Duration result = pipelineTransformer.getPeerForwarderDrainTimeout(dataPrepperConfiguration); assertThat(result, is(expectedResult)); verify(dataPrepperConfiguration).getPeerForwarderConfiguration(); verify(peerForwarderConfiguration).getDrainTimeout(); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test @@ -310,10 +283,10 @@ void parseConfiguration_uses_CircuitBreaking_buffer_when_circuit_breakers_applie final CircuitBreaker circuitBreaker = mock(CircuitBreaker.class); when(circuitBreakerManager.getGlobalCircuitBreaker()) .thenReturn(Optional.of(circuitBreaker)); - final PipelineParser objectUnderTest = + final PipelineTransformer objectUnderTest = createObjectUnderTest(TestDataProvider.VALID_SINGLE_PIPELINE_EMPTY_SOURCE_PLUGIN_FILE); - final Map pipelineMap = objectUnderTest.parseConfiguration(); + final Map pipelineMap = objectUnderTest.transformConfiguration(); assertThat(pipelineMap.size(), equalTo(1)); assertThat(pipelineMap, hasKey("test-pipeline-1")); @@ -324,16 +297,17 @@ void parseConfiguration_uses_CircuitBreaking_buffer_when_circuit_breakers_applie verify(dataPrepperConfiguration).getProcessorShutdownTimeout(); verify(dataPrepperConfiguration).getSinkShutdownTimeout(); verify(dataPrepperConfiguration).getPeerForwarderConfiguration(); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test void parseConfiguration_uses_unwrapped_buffer_when_no_circuit_breakers_are_applied() { when(circuitBreakerManager.getGlobalCircuitBreaker()) .thenReturn(Optional.empty()); - final PipelineParser objectUnderTest = + final PipelineTransformer objectUnderTest = createObjectUnderTest(TestDataProvider.VALID_SINGLE_PIPELINE_EMPTY_SOURCE_PLUGIN_FILE); - final Map pipelineMap = objectUnderTest.parseConfiguration(); + final Map pipelineMap = objectUnderTest.transformConfiguration(); assertThat(pipelineMap.size(), equalTo(1)); assertThat(pipelineMap, hasKey("test-pipeline-1")); @@ -345,6 +319,7 @@ void parseConfiguration_uses_unwrapped_buffer_when_no_circuit_breakers_are_appli verify(dataPrepperConfiguration).getProcessorShutdownTimeout(); verify(dataPrepperConfiguration).getSinkShutdownTimeout(); verify(dataPrepperConfiguration).getPeerForwarderConfiguration(); + verify(dataPrepperConfiguration).getPipelineExtensions(); } @Test @@ -352,10 +327,10 @@ void parseConfiguration_uses_unwrapped_buffer_for_pipeline_connectors() { final CircuitBreaker circuitBreaker = mock(CircuitBreaker.class); when(circuitBreakerManager.getGlobalCircuitBreaker()) .thenReturn(Optional.of(circuitBreaker)); - final PipelineParser objectUnderTest = + final PipelineTransformer objectUnderTest = createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_PIPELINE_CONFIG_FILE); - final Map pipelineMap = objectUnderTest.parseConfiguration(); + final Map pipelineMap = objectUnderTest.transformConfiguration(); assertThat(pipelineMap, hasKey("test-pipeline-1")); final Pipeline entryPipeline = pipelineMap.get("test-pipeline-1"); @@ -371,6 +346,7 @@ void parseConfiguration_uses_unwrapped_buffer_for_pipeline_connectors() { verify(dataPrepperConfiguration, times(3)).getProcessorShutdownTimeout(); verify(dataPrepperConfiguration, times(3)).getSinkShutdownTimeout(); verify(dataPrepperConfiguration, times(3)).getPeerForwarderConfiguration(); + verify(dataPrepperConfiguration).getPipelineExtensions(); } private void mockDataPrepperConfigurationAccesses() { @@ -397,12 +373,13 @@ void getSecondaryBuffers(final int outerMapEntryCount, final int innerMapEntryCo final Map>>> bufferMap = generateBufferMap(outerMapEntryCount, innerMapEntryCount); when(peerForwarderProvider.getPipelinePeerForwarderReceiveBufferMap()).thenReturn(bufferMap); - final PipelineParser pipelineParser = createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_PIPELINE_CONFIG_FILE); - final List secondaryBuffers = pipelineParser.getSecondaryBuffers(); + final PipelineTransformer pipelineTransformer = createObjectUnderTest(TestDataProvider.VALID_MULTIPLE_PIPELINE_CONFIG_FILE); + final List secondaryBuffers = pipelineTransformer.getSecondaryBuffers(); assertThat(secondaryBuffers.size(), is(outerMapEntryCount * innerMapEntryCount)); secondaryBuffers.forEach(retrievedBuffer -> assertThat(retrievedBuffer, is(buffer))); verify(peerForwarderProvider).getPipelinePeerForwarderReceiveBufferMap(); + verify(dataPrepperConfiguration).getPipelineExtensions(); } private static Stream provideGetSecondaryBufferArgs() { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java new file mode 100644 index 0000000000..aaa79eba34 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java @@ -0,0 +1,69 @@ +package org.opensearch.dataprepper.parser; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.TestDataProvider; +import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class PipelinesDataflowModelParserTest { + @Test + void parseConfiguration_with_multiple_valid_pipelines() { + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser(TestDataProvider.VALID_MULTIPLE_PIPELINE_CONFIG_FILE); + final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); + assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(), + equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); + } + + @Test + void parseConfiguration_with_incompatible_version_should_throw() { + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser(TestDataProvider.INCOMPATIBLE_VERSION_CONFIG_FILE); + + final RuntimeException actualException = assertThrows( + RuntimeException.class, pipelinesDataflowModelParser::parseConfiguration); + assertThat(actualException.getMessage(), + equalTo(String.format("The version: 3005.0 is not compatible with the current version: %s", DataPrepperVersion.getCurrentVersion()))); + } + + @Test + void parseConfiguration_with_a_configuration_file_which_does_not_exist_should_throw() { + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser("file_does_no_exist.yml"); + final RuntimeException actualException = assertThrows(RuntimeException.class, + pipelinesDataflowModelParser::parseConfiguration); + assertThat(actualException.getMessage(), equalTo("Pipelines configuration file not found at file_does_no_exist.yml")); + } + + @Test + void parseConfiguration_from_directory_with_multiple_files_creates_the_correct_model() { + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser(TestDataProvider.MULTI_FILE_PIPELINE_DIRECTOTRY); + final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); + assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(), + equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); + } + + @Test + void parseConfiguration_from_directory_with_single_file_creates_the_correct_model() { + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser(TestDataProvider.SINGLE_FILE_PIPELINE_DIRECTOTRY); + final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); + assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(), + equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); + } + + @Test + void parseConfiguration_from_directory_with_no_yaml_files_should_throw() { + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser(TestDataProvider.EMPTY_PIPELINE_DIRECTOTRY); + final RuntimeException actualException = assertThrows(RuntimeException.class, + pipelinesDataflowModelParser::parseConfiguration); + assertThat(actualException.getMessage(), equalTo( + String.format("Pipelines configuration file not found at %s", TestDataProvider.EMPTY_PIPELINE_DIRECTOTRY))); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java index c28f4beb5a..3037b6a68d 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java @@ -6,13 +6,14 @@ package org.opensearch.dataprepper.parser.config; import org.opensearch.dataprepper.breaker.CircuitBreakerManager; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.parser.PipelineTransformer; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.parser.PipelineParser; import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider; import org.opensearch.dataprepper.pipeline.router.RouterFactory; import org.opensearch.dataprepper.model.event.EventFactory; @@ -22,15 +23,13 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class PipelineParserConfigurationTest { private static final PipelineParserConfiguration pipelineParserConfiguration = new PipelineParserConfiguration(); @Mock - private FileStructurePathProvider fileStructurePathProvider; + private PipelinesDataFlowModel pipelinesDataFlowModel; @Mock private PluginFactory pluginFactory; @@ -58,15 +57,10 @@ class PipelineParserConfigurationTest { @Test void pipelineParser() { - final String pipelineConfigFileLocation = "hot soup"; - when(fileStructurePathProvider.getPipelineConfigFileLocation()) - .thenReturn(pipelineConfigFileLocation); - - final PipelineParser pipelineParser = pipelineParserConfiguration.pipelineParser( - fileStructurePathProvider, pluginFactory, peerForwarderProvider, routerFactory, + final PipelineTransformer pipelineTransformer = pipelineParserConfiguration.pipelineParser( + pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory, acknowledgementSetManager, sourceCoordinatorFactory); - assertThat(pipelineParser, is(notNullValue())); - verify(fileStructurePathProvider).getPipelineConfigFileLocation(); + assertThat(pipelineTransformer, is(notNullValue())); } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/DataPrepperConfigurationTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/DataPrepperConfigurationTests.java index ed8265fb12..c0ffc81ad1 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/DataPrepperConfigurationTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/DataPrepperConfigurationTests.java @@ -58,6 +58,7 @@ public void testSomeDefaultConfig() throws IOException { final DataPrepperConfiguration dataPrepperConfiguration = makeConfig(TestDataProvider.VALID_DATA_PREPPER_SOME_DEFAULT_CONFIG_FILE); Assert.assertEquals(DataPrepperConfiguration.DEFAULT_CONFIG.getServerPort(), dataPrepperConfiguration.getServerPort()); + Assert.assertNull(dataPrepperConfiguration.getPipelineExtensions()); } @Test @@ -262,4 +263,16 @@ void testConfigWithPipelineShutdown() throws IOException { assertThat(config, notNullValue()); assertThat(config.getPipelineShutdown(), equalTo(PipelineShutdownOption.ON_ALL_PIPELINE_FAILURES)); } + + @Test + void testConfigWithTestExtension() throws IOException { + final DataPrepperConfiguration dataPrepperConfiguration = makeConfig( + TestDataProvider.VALID_DATA_PREPPER_CONFIG_WITH_TEST_EXTENSION_FILE); + + assertThat(dataPrepperConfiguration, notNullValue()); + assertThat(dataPrepperConfiguration.getPipelineExtensions(), notNullValue()); + assertThat(dataPrepperConfiguration.getPipelineExtensions().getExtensionMap(), equalTo( + Map.of("test_extension", Map.of("test_attribute", "test_string")) + )); + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java index 581e6530a9..dea4e45fa0 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java @@ -5,8 +5,12 @@ package org.opensearch.dataprepper.plugin; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import org.opensearch.dataprepper.plugins.TestPlugin; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -28,8 +32,10 @@ * Integration test of the plugin framework. These tests should not mock any portion * of the plugin framework. But, they may mock inputs when appropriate. */ +@ExtendWith(MockitoExtension.class) class DefaultPluginFactoryIT { - + @Mock + private DataPrepperConfiguration dataPrepperConfiguration; private String pluginName; private String pipelineName; @@ -54,6 +60,7 @@ private DefaultPluginFactory createObjectUnderTest() { coreContext.scan(DefaultAcknowledgementSetManager.class.getPackage().getName()); coreContext.scan(DefaultPluginFactory.class.getPackage().getName()); coreContext.register(PluginBeanFactoryProvider.class); + coreContext.registerBean(DataPrepperConfiguration.class, () -> dataPrepperConfiguration); coreContext.refresh(); return coreContext.getBean(DefaultPluginFactory.class); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java index 00514f4e3e..4df0af1e33 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java @@ -5,24 +5,32 @@ package org.opensearch.dataprepper.plugin; +import org.apache.commons.lang3.stream.Streams; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.plugin.ExtensionPlugin; import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException; +import org.opensearch.dataprepper.plugins.test.TestExtensionConfig; +import org.opensearch.dataprepper.plugins.test.TestExtensionWithConfig; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.stream.Stream; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -36,13 +44,17 @@ @ExtendWith(MockitoExtension.class) class ExtensionLoaderTest { + @Mock + private ExtensionPluginConfigurationConverter extensionPluginConfigurationConverter; @Mock private ExtensionClassProvider extensionClassProvider; @Mock private PluginCreator pluginCreator; + @Captor + private ArgumentCaptor pluginArgumentsContextArgumentCaptor; private ExtensionLoader createObjectUnderTest() { - return new ExtensionLoader(extensionClassProvider, pluginCreator); + return new ExtensionLoader(extensionPluginConfigurationConverter, extensionClassProvider, pluginCreator); } @Test @@ -75,6 +87,34 @@ void loadExtensions_returns_single_extension_for_single_plugin_class() { assertThat(extensionPlugins.get(0), equalTo(expectedPlugin)); } + @ParameterizedTest + @MethodSource("validExtensionConfigs") + void loadExtensions_returns_single_extension_with_config_for_single_plugin_class( + final TestExtensionConfig testExtensionConfig) { + when(extensionClassProvider.loadExtensionPluginClasses()) + .thenReturn(Collections.singleton(TestExtensionWithConfig.class)); + + final TestExtensionWithConfig expectedPlugin = mock(TestExtensionWithConfig.class); + final String expectedPluginName = "test_extension_with_config"; + when(extensionPluginConfigurationConverter.convert(eq(TestExtensionConfig.class), + eq("test_extension"))).thenReturn(testExtensionConfig); + when(pluginCreator.newPluginInstance( + eq(TestExtensionWithConfig.class), + any(PluginArgumentsContext.class), + eq(expectedPluginName))) + .thenReturn(expectedPlugin); + + final List extensionPlugins = createObjectUnderTest().loadExtensions(); + + verify(pluginCreator).newPluginInstance(eq(TestExtensionWithConfig.class), + pluginArgumentsContextArgumentCaptor.capture(), eq(expectedPluginName)); + assertThat(pluginArgumentsContextArgumentCaptor.getValue(), instanceOf( + ExtensionLoader.SingleConfigArgumentArgumentsContext.class)); + assertThat(extensionPlugins, notNullValue()); + assertThat(extensionPlugins.size(), equalTo(1)); + assertThat(extensionPlugins.get(0), equalTo(expectedPlugin)); + } + @Test void loadExtensions_returns_multiple_extensions_for_multiple_plugin_classes() { final Collection> pluginClasses = new HashSet<>(); @@ -178,6 +218,12 @@ void classNameToPluginName_returns_name_split_by_uppercase(final String input, f assertThat(ExtensionLoader.classNameToPluginName(input), equalTo(expected)); } + private static Stream validExtensionConfigs() { + return Streams.of( + Arguments.of(new TestExtensionConfig()), + null); + } + private interface TestExtension1 extends ExtensionPlugin { } private interface TestExtension2 extends ExtensionPlugin { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverterTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverterTest.java new file mode 100644 index 0000000000..3b49a8daf6 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverterTest.java @@ -0,0 +1,110 @@ +package org.opensearch.dataprepper.plugin; + +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.Path; +import jakarta.validation.Validator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.parser.model.PipelineExtensions; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; +import org.opensearch.dataprepper.plugins.test.TestExtension; +import org.opensearch.dataprepper.plugins.test.TestExtensionConfig; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ExtensionPluginConfigurationConverterTest { + @Mock + private Validator validator; + @Mock + private DataPrepperConfiguration dataPrepperConfiguration; + @Mock + private PipelineExtensions pipelineExtensions; + @Mock + private ConstraintViolation constraintViolation; + + private final ObjectMapper objectMapper = new ObjectMapperConfiguration().objectMapper(); + private ExtensionPluginConfigurationConverter objectUnderTest; + + @BeforeEach + void setUp() { + objectUnderTest = new ExtensionPluginConfigurationConverter( + dataPrepperConfiguration, validator, objectMapper); + } + + @Test + void convert_with_null_extensionConfigurationType_should_throw() { + assertThrows(NullPointerException.class, + () -> objectUnderTest.convert(null, "testKey")); + } + + @Test + void convert_with_null_rootKey_should_throw() { + assertThrows(NullPointerException.class, + () -> objectUnderTest.convert(TestExtension.class, null)); + } + + @Test + void convert_with_test_extension_with_config() { + when(validator.validate(any())).thenReturn(Collections.emptySet()); + when(dataPrepperConfiguration.getPipelineExtensions()).thenReturn(pipelineExtensions); + final String rootKey = "test_extension"; + final String testValue = "test_value"; + when(pipelineExtensions.getExtensionMap()).thenReturn(Map.of( + rootKey, Map.of("test_attribute", testValue) + )); + final Object testExtensionConfig = objectUnderTest.convert(TestExtensionConfig.class, rootKey); + assertThat(testExtensionConfig, instanceOf(TestExtensionConfig.class)); + assertThat(((TestExtensionConfig) testExtensionConfig).getTestAttribute(), equalTo(testValue)); + } + + @Test + void convert_with_null_rootKey_value_should_return_null() { + when(dataPrepperConfiguration.getPipelineExtensions()).thenReturn(pipelineExtensions); + final String rootKey = "test_extension"; + when(pipelineExtensions.getExtensionMap()).thenReturn(Collections.emptyMap()); + final Object testExtensionConfig = objectUnderTest.convert(TestExtensionConfig.class, rootKey); + assertThat(testExtensionConfig, nullValue()); + } + + @Test + void convert_should_throw_exception_when_there_are_constraint_violations() { + when(dataPrepperConfiguration.getPipelineExtensions()).thenReturn(pipelineExtensions); + final String rootKey = UUID.randomUUID().toString(); + when(pipelineExtensions.getExtensionMap()).thenReturn(Map.of(rootKey, Collections.emptyMap())); + final String errorMessage = UUID.randomUUID().toString(); + given(constraintViolation.getMessage()).willReturn(errorMessage); + final String propertyPathString = UUID.randomUUID().toString(); + final Path propertyPath = mock(Path.class); + given(propertyPath.toString()).willReturn(propertyPathString); + given(constraintViolation.getPropertyPath()).willReturn(propertyPath); + + given(validator.validate(any())) + .willReturn(Collections.singleton(constraintViolation)); + + final InvalidPluginConfigurationException actualException = assertThrows(InvalidPluginConfigurationException.class, + () -> objectUnderTest.convert(TestExtensionConfig.class, rootKey)); + + assertThat(actualException.getMessage(), containsString(rootKey)); + assertThat(actualException.getMessage(), containsString(propertyPathString)); + assertThat(actualException.getMessage(), containsString(errorMessage)); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionsIT.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionsIT.java index 2f0675ff25..a498cd5f3b 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionsIT.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionsIT.java @@ -5,15 +5,30 @@ package org.opensearch.dataprepper.plugin; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager; +import org.opensearch.dataprepper.breaker.CircuitBreakerManager; import org.opensearch.dataprepper.event.DefaultEventFactory; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.parser.config.DataPrepperAppConfiguration; +import org.opensearch.dataprepper.parser.config.FileStructurePathProvider; +import org.opensearch.dataprepper.parser.config.PipelineParserConfiguration; +import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider; +import org.opensearch.dataprepper.pipeline.router.RouterFactory; +import org.opensearch.dataprepper.plugins.TestPluginUsingExtensionWithConfig; import org.opensearch.dataprepper.plugins.test.TestExtension; import org.opensearch.dataprepper.plugins.TestPluginUsingExtension; +import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.ArrayList; @@ -30,8 +45,24 @@ import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.not; +import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) public class ExtensionsIT { + @Mock + private FileStructurePathProvider fileStructurePathProvider; + @Mock + private PeerForwarderProvider peerForwarderProvider; + @Mock + private RouterFactory routerFactory; + @Mock + private CircuitBreakerManager circuitBreakerManager; + @Mock + private EventFactory eventFactory; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private SourceCoordinatorFactory sourceCoordinatorFactory; private AnnotationConfigApplicationContext publicContext; private AnnotationConfigApplicationContext coreContext; private PluginFactory pluginFactory; @@ -52,6 +83,25 @@ void setUp() { coreContext.scan(DefaultAcknowledgementSetManager.class.getPackage().getName()); coreContext.scan(DefaultPluginFactory.class.getPackage().getName()); + + when(fileStructurePathProvider.getPipelineConfigFileLocation()).thenReturn( + "src/test/resources/valid_pipeline.yml" + ); + when(fileStructurePathProvider.getDataPrepperConfigFileLocation()).thenReturn( + "src/test/resources/valid_data_prepper_config_with_test_extension.yml" + ); + + coreContext.registerBean(FileStructurePathProvider.class, () -> fileStructurePathProvider); + coreContext.registerBean(PeerForwarderProvider.class, () -> peerForwarderProvider); + coreContext.registerBean(RouterFactory.class, () -> routerFactory); + coreContext.registerBean(DataPrepperAppConfiguration.class, DataPrepperAppConfiguration::new); + coreContext.registerBean(CircuitBreakerManager.class, () -> circuitBreakerManager); + coreContext.registerBean(EventFactory.class, () -> eventFactory); + coreContext.registerBean(AcknowledgementSetManager.class, () -> acknowledgementSetManager); + coreContext.registerBean(SourceCoordinatorFactory.class, () -> sourceCoordinatorFactory); + coreContext.registerBean(ObjectMapperConfiguration.class, ObjectMapperConfiguration::new); + coreContext.registerBean(ObjectMapper.class, () -> new ObjectMapper(new YAMLFactory())); + coreContext.register(PipelineParserConfiguration.class); coreContext.refresh(); pluginFactory = coreContext.getBean(DefaultPluginFactory.class); @@ -85,6 +135,26 @@ void creating_a_plugin_using_an_extension() { assertThat(testPluginUsingExtension.getExtensionModel().getExtensionId(), notNullValue()); } + @Test + void creating_a_plugin_using_an_extension_with_config() { + pluginName = "test_plugin_using_extension_with_config"; + final String requiredStringValue = UUID.randomUUID().toString(); + final String optionalStringValue = UUID.randomUUID().toString(); + final Map pluginSettingMap = new HashMap<>(); + pluginSettingMap.put("required_string", requiredStringValue); + pluginSettingMap.put("optional_string", optionalStringValue); + final PluginSetting pluginSetting = createPluginSettings(pluginSettingMap); + + final TestPluggableInterface pluggableInterface = pluginFactory.loadPlugin(TestPluggableInterface.class, pluginSetting); + + assertThat(pluggableInterface, notNullValue()); + assertThat(pluggableInterface, instanceOf(TestPluginUsingExtensionWithConfig.class)); + final TestPluginUsingExtensionWithConfig testPluginUsingExtensionWithConfig = + (TestPluginUsingExtensionWithConfig) pluggableInterface; + assertThat(testPluginUsingExtensionWithConfig.getExtensionModel(), notNullValue()); + assertThat(testPluginUsingExtensionWithConfig.getExtensionModel().getTestAttribute(), equalTo("test_string")); + } + @Test void creating_multiple_plugins_using_an_extension() { final String requiredStringValue = UUID.randomUUID().toString(); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java index 5d5591f686..b70dafb09f 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugin; +import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import jakarta.validation.ConstraintViolation; @@ -32,6 +33,7 @@ class PluginConfigurationConverterTest { private PluginSetting pluginSetting; private Validator validator; + private final ObjectMapper objectMapper = new ObjectMapperConfiguration().objectMapper(); static class TestConfiguration { @SuppressWarnings("unused") @@ -50,7 +52,7 @@ void setUp() { } private PluginConfigurationConverter createObjectUnderTest() { - return new PluginConfigurationConverter(validator); + return new PluginConfigurationConverter(validator, objectMapper); } @Test diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginUsingExtensionWithConfig.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginUsingExtensionWithConfig.java new file mode 100644 index 0000000000..c97e2c4a0a --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestPluginUsingExtensionWithConfig.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.plugin.TestPluggableInterface; +import org.opensearch.dataprepper.plugin.TestPluginConfiguration; +import org.opensearch.dataprepper.plugins.test.TestExtensionWithConfig; + +/** + * Used for integration testing the plugin framework with extensions. + */ +@DataPrepperPlugin(name = "test_plugin_using_extension_with_config", pluginType = TestPluggableInterface.class, pluginConfigurationType = TestPluginConfiguration.class) +public class TestPluginUsingExtensionWithConfig implements TestPluggableInterface { + private final TestPluginConfiguration configuration; + private final TestExtensionWithConfig.TestModel extensionModel; + + @DataPrepperPluginConstructor + public TestPluginUsingExtensionWithConfig(final TestPluginConfiguration configuration, + final TestExtensionWithConfig.TestModel extensionModel) { + this.configuration = configuration; + this.extensionModel = extensionModel; + } + + public TestPluginConfiguration getConfiguration() { + return configuration; + } + + public TestExtensionWithConfig.TestModel getExtensionModel() { + return extensionModel; + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/test/TestExtensionConfig.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/test/TestExtensionConfig.java new file mode 100644 index 0000000000..9bd191190d --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/test/TestExtensionConfig.java @@ -0,0 +1,12 @@ +package org.opensearch.dataprepper.plugins.test; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class TestExtensionConfig { + @JsonProperty("test_attribute") + private String testAttribute; + + public String getTestAttribute() { + return testAttribute; + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/test/TestExtensionWithConfig.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/test/TestExtensionWithConfig.java new file mode 100644 index 0000000000..768f42ad76 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/test/TestExtensionWithConfig.java @@ -0,0 +1,80 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.test; + +import org.opensearch.dataprepper.model.annotations.DataPrepperExtensionPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.plugin.ExtensionPlugin; +import org.opensearch.dataprepper.model.plugin.ExtensionPoints; +import org.opensearch.dataprepper.model.plugin.ExtensionProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +@DataPrepperExtensionPlugin(modelType = TestExtensionConfig.class, rootKey = "test_extension") +public class TestExtensionWithConfig implements ExtensionPlugin { + private static final Logger LOG = LoggerFactory.getLogger(TestExtensionWithConfig.class); + private static final AtomicInteger CONSTRUCTED_COUNT = new AtomicInteger(0); + private final String extensionId; + private TestExtensionConfig testExtensionConfig; + + @DataPrepperPluginConstructor + public TestExtensionWithConfig(final TestExtensionConfig testExtensionConfig) { + LOG.info("Constructing test extension plugin."); + CONSTRUCTED_COUNT.incrementAndGet(); + extensionId = UUID.randomUUID().toString(); + this.testExtensionConfig = testExtensionConfig; + } + + @Override + public void apply(final ExtensionPoints extensionPoints) { + LOG.info("Applying test extension."); + extensionPoints.addExtensionProvider(new TestExtensionProvider()); + } + + public static void reset() { + CONSTRUCTED_COUNT.set(0); + } + + public static int getConstructedInstances() { + return CONSTRUCTED_COUNT.get(); + } + + public static class TestModel { + private final String extensionId; + + private final String testAttribute; + + private TestModel(final String extensionId, final String testAttribute) { + + this.extensionId = extensionId; + this.testAttribute = testAttribute; + } + public String getExtensionId() { + return this.extensionId; + } + + public String getTestAttribute() { + return testAttribute; + } + } + + private class TestExtensionProvider implements ExtensionProvider { + + @Override + public Optional provideInstance(final Context context) { + return Optional.of(new TestModel(extensionId, testExtensionConfig.getTestAttribute())); + } + + @Override + public Class supportedClass() { + return TestModel.class; + } + } +} diff --git a/data-prepper-core/src/test/resources/valid_data_prepper_config_with_test_extension.yml b/data-prepper-core/src/test/resources/valid_data_prepper_config_with_test_extension.yml new file mode 100644 index 0000000000..16ffd373b0 --- /dev/null +++ b/data-prepper-core/src/test/resources/valid_data_prepper_config_with_test_extension.yml @@ -0,0 +1,4 @@ +ssl: false +extensions: + test_extension: + test_attribute: test_string \ No newline at end of file diff --git a/data-prepper-core/src/test/resources/valid_pipeline.yml b/data-prepper-core/src/test/resources/valid_pipeline.yml new file mode 100644 index 0000000000..5135b36fed --- /dev/null +++ b/data-prepper-core/src/test/resources/valid_pipeline.yml @@ -0,0 +1,5 @@ +entry-pipeline: + source: + random: + sink: + - stdout: \ No newline at end of file