Skip to content

Commit

Permalink
ENH: allow extension configuration from data prepper configuration (#2851)
Browse files Browse the repository at this point in the history

* ADD: initial implementation on injecting extension config

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Aug 30, 2023
1 parent e845966 commit a407e4f
Show file tree
Hide file tree
Showing 27 changed files with 866 additions and 231 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.opensearch.dataprepper.model.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marks a type as a Data Prepper extension plugin and declares how its
 * configuration is modeled and located.
 * <p>
 * Retained at runtime so the plugin framework can discover annotated types
 * reflectively; applicable to types only.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface DataPrepperExtensionPlugin {
/** The class that models this extension's configuration. */
Class<?> modelType();

/**
 * The configuration key associated with this extension.
 * NOTE(review): presumably the key under the Data Prepper configuration from
 * which the extension's configuration is read — confirm against the framework
 * code that consumes this annotation.
 */
String rootKey();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import io.micrometer.core.instrument.util.StringUtils;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.PipelineParser;
import org.opensearch.dataprepper.parser.PipelineTransformer;
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderServer;
import org.opensearch.dataprepper.pipeline.Pipeline;
import org.opensearch.dataprepper.pipeline.PipelineObserver;
Expand Down Expand Up @@ -58,13 +58,13 @@ public static String getServiceNameForMetrics() {

@Inject
public DataPrepper(
final PipelineParser pipelineParser,
final PipelineTransformer pipelineTransformer,
final PluginFactory pluginFactory,
final PeerForwarderServer peerForwarderServer,
final Predicate<Map<String, Pipeline>> shouldShutdownOnPipelineFailurePredicate) {
this.pluginFactory = pluginFactory;

transformationPipelines = pipelineParser.parseConfiguration();
transformationPipelines = pipelineTransformer.transformConfiguration();
this.shouldShutdownOnPipelineFailurePredicate = shouldShutdownOnPipelineFailurePredicate;
if (transformationPipelines.size() == 0) {
throw new RuntimeException("No valid pipeline is available for execution, exiting");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@

package org.opensearch.dataprepper.parser;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.model.annotations.SingleThread;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding;
Expand All @@ -37,34 +33,23 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.lang.String.format;

@SuppressWarnings("rawtypes")
public class PipelineParser {
private static final Logger LOG = LoggerFactory.getLogger(PipelineParser.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory())
.enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
public class PipelineTransformer {
private static final Logger LOG = LoggerFactory.getLogger(PipelineTransformer.class);
private static final String PIPELINE_TYPE = "pipeline";
private static final String ATTRIBUTE_NAME = "name";
private final String pipelineConfigurationFileLocation;
private final PipelinesDataFlowModel pipelinesDataFlowModel;
private final RouterFactory routerFactory;
private final DataPrepperConfiguration dataPrepperConfiguration;
private final CircuitBreakerManager circuitBreakerManager;
Expand All @@ -75,16 +60,16 @@ public class PipelineParser {
private final AcknowledgementSetManager acknowledgementSetManager;
private final SourceCoordinatorFactory sourceCoordinatorFactory;

public PipelineParser(final String pipelineConfigurationFileLocation,
final PluginFactory pluginFactory,
final PeerForwarderProvider peerForwarderProvider,
final RouterFactory routerFactory,
final DataPrepperConfiguration dataPrepperConfiguration,
final CircuitBreakerManager circuitBreakerManager,
final EventFactory eventFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory) {
this.pipelineConfigurationFileLocation = pipelineConfigurationFileLocation;
public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
final PluginFactory pluginFactory,
final PeerForwarderProvider peerForwarderProvider,
final RouterFactory routerFactory,
final DataPrepperConfiguration dataPrepperConfiguration,
final CircuitBreakerManager circuitBreakerManager,
final EventFactory eventFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory) {
this.pipelinesDataFlowModel = pipelinesDataFlowModel;
this.pluginFactory = Objects.requireNonNull(pluginFactory);
this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider);
this.routerFactory = routerFactory;
Expand All @@ -95,82 +80,25 @@ public PipelineParser(final String pipelineConfigurationFileLocation,
this.sourceCoordinatorFactory = sourceCoordinatorFactory;
}

/**
* Parses the configuration file into Pipeline
*/
public Map<String, Pipeline> parseConfiguration() {
try (final InputStream mergedPipelineConfigurationFiles = mergePipelineConfigurationFiles()) {
final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(mergedPipelineConfigurationFiles,
PipelinesDataFlowModel.class);

final DataPrepperVersion version = pipelinesDataFlowModel.getDataPrepperVersion();
validateDataPrepperVersion(version);

final Map<String, PipelineConfiguration> pipelineConfigurationMap = pipelinesDataFlowModel.getPipelines().entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new PipelineConfiguration(entry.getValue())
));
final List<String> allPipelineNames = PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigurationMap);

// LinkedHashMap to preserve insertion order
final Map<String, Pipeline> pipelineMap = new LinkedHashMap<>();
pipelineConfigurationMap.forEach((pipelineName, configuration) ->
configuration.updateCommonPipelineConfiguration(pipelineName));
for (String pipelineName : allPipelineNames) {
if (!pipelineMap.containsKey(pipelineName) && pipelineConfigurationMap.containsKey(pipelineName)) {
buildPipelineFromConfiguration(pipelineName, pipelineConfigurationMap, pipelineMap);
}
}
return pipelineMap;
} catch (IOException e) {
LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation);
throw new ParseException(format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e);
}
}

private void validateDataPrepperVersion(final DataPrepperVersion version) {
if (Objects.nonNull(version) && !DataPrepperVersion.getCurrentVersion().compatibleWith(version)) {
LOG.error("The version: {} is not compatible with the current version: {}", version, DataPrepperVersion.getCurrentVersion());
throw new ParseException(format("The version: %s is not compatible with the current version: %s",
version, DataPrepperVersion.getCurrentVersion()));
}
}

private InputStream mergePipelineConfigurationFiles() throws IOException {
final File configurationLocation = new File(pipelineConfigurationFileLocation);

if (configurationLocation.isFile()) {
return new FileInputStream(configurationLocation);
} else if (configurationLocation.isDirectory()) {
FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml"));
List<InputStream> configurationFiles = Stream.of(configurationLocation.listFiles(yamlFilter))
.map(file -> {
InputStream inputStream;
try {
inputStream = new FileInputStream(file);
LOG.info("Reading pipeline configuration from {}", file.getName());
} catch (FileNotFoundException e) {
inputStream = null;
LOG.warn("Pipeline configuration file {} not found", file.getName());
}
return inputStream;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());

if (configurationFiles.isEmpty()) {
LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation);
throw new ParseException(
format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation));
public Map<String, Pipeline> transformConfiguration() {
final Map<String, PipelineConfiguration> pipelineConfigurationMap = pipelinesDataFlowModel.getPipelines().entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new PipelineConfiguration(entry.getValue())
));
final List<String> allPipelineNames = PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigurationMap);

// LinkedHashMap to preserve insertion order
final Map<String, Pipeline> pipelineMap = new LinkedHashMap<>();
pipelineConfigurationMap.forEach((pipelineName, configuration) ->
configuration.updateCommonPipelineConfiguration(pipelineName));
for (String pipelineName : allPipelineNames) {
if (!pipelineMap.containsKey(pipelineName) && pipelineConfigurationMap.containsKey(pipelineName)) {
buildPipelineFromConfiguration(pipelineName, pipelineConfigurationMap, pipelineMap);
}

return new SequenceInputStream(Collections.enumeration(configurationFiles));
} else {
LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation);
throw new ParseException(format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation));
}
return pipelineMap;
}

private void buildPipelineFromConfiguration(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package org.opensearch.dataprepper.parser;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.lang.String.format;

/**
 * Reads pipeline configuration YAML from a file or a directory of
 * {@code .yaml}/{@code .yml} files and deserializes it into a
 * {@link PipelinesDataFlowModel}, validating the declared Data Prepper
 * version for compatibility.
 */
public class PipelinesDataflowModelParser {
    private static final Logger LOG = LoggerFactory.getLogger(PipelinesDataflowModelParser.class);
    // YAML mapper; duplicate keys are a configuration error, so fail loudly.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory())
            .enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);

    private final String pipelineConfigurationFileLocation;

    /**
     * @param pipelineConfigurationFileLocation path to a single pipeline
     *        configuration file, or to a directory containing one or more
     *        {@code .yaml}/{@code .yml} files to be merged
     */
    public PipelinesDataflowModelParser(final String pipelineConfigurationFileLocation) {
        this.pipelineConfigurationFileLocation = pipelineConfigurationFileLocation;
    }

    /**
     * Parses the configured location into a {@link PipelinesDataFlowModel}.
     *
     * @return the deserialized dataflow model
     * @throws ParseException if the location cannot be read, the YAML cannot
     *         be deserialized, or the declared version is incompatible
     */
    public PipelinesDataFlowModel parseConfiguration() {
        try (final InputStream mergedPipelineConfigurationFiles = mergePipelineConfigurationFiles()) {
            final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(mergedPipelineConfigurationFiles,
                    PipelinesDataFlowModel.class);

            final DataPrepperVersion version = pipelinesDataFlowModel.getDataPrepperVersion();
            validateDataPrepperVersion(version);

            return pipelinesDataFlowModel;
        } catch (IOException e) {
            LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation);
            throw new ParseException(format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e);
        }
    }

    /** Rejects models whose declared version is incompatible with the running version. */
    private void validateDataPrepperVersion(final DataPrepperVersion version) {
        // A null version is allowed; only an explicit, incompatible version fails.
        if (Objects.nonNull(version) && !DataPrepperVersion.getCurrentVersion().compatibleWith(version)) {
            LOG.error("The version: {} is not compatible with the current version: {}", version, DataPrepperVersion.getCurrentVersion());
            throw new ParseException(format("The version: %s is not compatible with the current version: %s",
                    version, DataPrepperVersion.getCurrentVersion()));
        }
    }

    /**
     * Opens the configured location as a single stream: the file itself, or
     * the concatenation of all YAML files in the directory.
     *
     * @return an open stream over the configuration content; caller closes it
     * @throws IOException if the location cannot be opened
     */
    private InputStream mergePipelineConfigurationFiles() throws IOException {
        final File configurationLocation = new File(pipelineConfigurationFileLocation);

        if (configurationLocation.isFile()) {
            return new FileInputStream(configurationLocation);
        } else if (configurationLocation.isDirectory()) {
            final FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml"));
            final File[] yamlFiles = configurationLocation.listFiles(yamlFilter);
            // listFiles returns null if an I/O error occurs, even for an existing directory.
            if (yamlFiles == null) {
                LOG.error("Unable to list pipeline configuration files at {}", pipelineConfigurationFileLocation);
                throw new ParseException(
                        format("Unable to list pipeline configuration files at %s", pipelineConfigurationFileLocation));
            }
            // listFiles returns entries in no guaranteed order; sort by path so the
            // merged configuration is deterministic across platforms and runs.
            Arrays.sort(yamlFiles);
            final List<InputStream> configurationFiles = Stream.of(yamlFiles)
                    .map(file -> {
                        InputStream inputStream;
                        try {
                            inputStream = new FileInputStream(file);
                            LOG.info("Reading pipeline configuration from {}", file.getName());
                        } catch (FileNotFoundException e) {
                            // Best-effort: a file deleted between listing and opening is skipped.
                            inputStream = null;
                            LOG.warn("Pipeline configuration file {} not found", file.getName());
                        }
                        return inputStream;
                    })
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());

            if (configurationFiles.isEmpty()) {
                LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation);
                throw new ParseException(
                        format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation));
            }

            return new SequenceInputStream(Collections.enumeration(configurationFiles));
        } else {
            LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation);
            throw new ParseException(format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
package org.opensearch.dataprepper.parser.config;

import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.PipelineParser;
import org.opensearch.dataprepper.parser.PipelineTransformer;
import org.opensearch.dataprepper.parser.PipelinesDataflowModelParser;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
Expand All @@ -21,8 +23,8 @@
public class PipelineParserConfiguration {

@Bean
public PipelineParser pipelineParser(
final FileStructurePathProvider fileStructurePathProvider,
public PipelineTransformer pipelineParser(
final PipelinesDataFlowModel pipelinesDataFlowModel,
final PluginFactory pluginFactory,
final PeerForwarderProvider peerForwarderProvider,
final RouterFactory routerFactory,
Expand All @@ -32,7 +34,7 @@ public PipelineParser pipelineParser(
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory
) {
return new PipelineParser(fileStructurePathProvider.getPipelineConfigFileLocation(),
return new PipelineTransformer(pipelinesDataFlowModel,
pluginFactory,
peerForwarderProvider,
routerFactory,
Expand All @@ -42,4 +44,16 @@ public PipelineParser pipelineParser(
acknowledgementSetManager,
sourceCoordinatorFactory);
}

/**
 * Provides the parser that reads pipeline configuration YAML from the
 * location supplied by the {@link FileStructurePathProvider}.
 */
@Bean
public PipelinesDataflowModelParser pipelinesDataflowModelParser(
final FileStructurePathProvider fileStructurePathProvider) {
return new PipelinesDataflowModelParser(fileStructurePathProvider.getPipelineConfigFileLocation());
}

/**
 * Provides the parsed {@link PipelinesDataFlowModel} as a singleton bean,
 * parsing the pipeline configuration once at context startup.
 */
@Bean
public PipelinesDataFlowModel pipelinesDataFlowModel(
final PipelinesDataflowModelParser pipelinesDataflowModelParser) {
return pipelinesDataflowModelParser.parseConfiguration();
}
}
Loading

0 comments on commit a407e4f

Please sign in to comment.