Skip to content

Commit

Permalink
Expanded plugin constructor capabilities (opensearch-project#481)
Browse files Browse the repository at this point in the history
Support more constructors for plugins by adding the DataPrepperPluginConstructor annotation. This is the preferred constructor. If no other constructor is available for a plugin, use the no-op constructor. Updated the HTTPSource plugin to use this capability to receive both a configuration model and PluginMetrics via the constructor. For a single parameter, un-annotated constructor in plugins, the only supported parameter is once again PluginSetting.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Nov 2, 2021
1 parent 4353495 commit 3c7ceec
Show file tree
Hide file tree
Showing 22 changed files with 553 additions and 404 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.amazon.dataprepper.model.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Add to a plugin class to indicate which constructor should be used by Data Prepper.
* <p>
* The current behavior for choosing a constructor is:
* <ol>
* <li>Choose the constructor annotated with {@link DataPrepperPluginConstructor}</li>
* <li>Choose a constructor which takes in a single parameter matching
* the {@link DataPrepperPlugin#pluginConfigurationType()} for the plugin</li>
* <li>Use the default (ie. empty) constructor</li>
* </ol>
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.CONSTRUCTOR})
public @interface DataPrepperPluginConstructor {
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,8 @@ public class InvalidPluginDefinitionException extends RuntimeException {
public InvalidPluginDefinitionException(final String message, final Throwable cause) {
super(message, cause);
}

public InvalidPluginDefinitionException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.amazon.dataprepper.model.plugin;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;

Expand All @@ -19,19 +21,41 @@ void setUp() {
cause = mock(Throwable.class);
}

private InvalidPluginDefinitionException createObjectUnderTest() {
return new InvalidPluginDefinitionException(message, cause);
@Nested
class MessageCauseConstructor {
private InvalidPluginDefinitionException createObjectUnderTest() {
return new InvalidPluginDefinitionException(message, cause);
}

@Test
void getMessage_returns_message() {
assertThat(createObjectUnderTest().getMessage(),
equalTo(message));
}

@Test
void getCause_returns_cause() {
assertThat(createObjectUnderTest().getCause(),
equalTo(cause));
}
}

@Test
void getMessage_returns_message() {
assertThat(createObjectUnderTest().getMessage(),
equalTo(message));
}

@Test
void getCause_returns_cause() {
assertThat(createObjectUnderTest().getCause(),
equalTo(cause));
@Nested
class MessageOnlyConstructor {
private InvalidPluginDefinitionException createObjectUnderTest() {
return new InvalidPluginDefinitionException(message);
}

@Test
void getMessage_returns_message() {
assertThat(createObjectUnderTest().getMessage(),
equalTo(message));
}

@Test
void getCause_returns_null() {
assertThat(createObjectUnderTest().getCause(),
nullValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public class DefaultPluginFactory implements PluginFactory {

public DefaultPluginFactory() {
this(new PluginProviderLoader(), new PluginCreator(), new PluginConfigurationConverter());
// TODO: Remove this along with the removal of com.amazon.dataprepper.plugins.PluginFactory
com.amazon.dataprepper.plugins.PluginFactory.dangerousMethod_setPluginFunction(
((pluginSetting, aClass) -> pluginCreator.newPluginInstance(aClass, getConstructionContext(pluginSetting, aClass), pluginSetting.getName()))
);
}

/**
Expand Down Expand Up @@ -52,9 +56,9 @@ public <T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSett
final String pluginName = pluginSetting.getName();
final Class<? extends T> pluginClass = getPluginClass(baseClass, pluginName);

final Object configuration = getConfiguration(pluginSetting, pluginClass);
final PluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass);

return pluginCreator.newPluginInstance(pluginClass, configuration, pluginName);
return pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName);
}

@Override
Expand All @@ -70,25 +74,30 @@ public <T> List<T> loadPlugins(
if(numberOfInstances == null || numberOfInstances < 0)
throw new IllegalArgumentException("The numberOfInstances must be provided as a non-negative integer.");

final Object configuration = getConfiguration(pluginSetting, pluginClass);
final PluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass);

final List<T> plugins = new ArrayList<>(numberOfInstances);
for (int i = 0; i < numberOfInstances; i++) {
plugins.add(pluginCreator.newPluginInstance(pluginClass, configuration, pluginName));
plugins.add(pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName));
}
return plugins;
}

private <T> Object getConfiguration(final PluginSetting pluginSetting, final Class<? extends T> pluginClass) {
private <T> PluginArgumentsContext getConstructionContext(final PluginSetting pluginSetting, final Class<? extends T> pluginClass) {
final DataPrepperPlugin pluginAnnotation = pluginClass.getAnnotation(DataPrepperPlugin.class);

final Class<?> pluginConfigurationType = pluginAnnotation.pluginConfigurationType();
return pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting);
final Object configuration = pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting);

return new PluginArgumentsContext.Builder()
.withPluginSetting(pluginSetting)
.withPluginConfiguration(configuration)
.build();
}

private <T> Class<? extends T> getPluginClass(final Class<T> baseClass, final String pluginName) {
return pluginProviders.stream()
.map(pluginProvider -> pluginProvider.<T>findPluginClass(baseClass, pluginName))
.map(pluginProvider -> pluginProvider.findPluginClass(baseClass, pluginName))
.filter(Optional::isPresent)
.map(Optional::get)
.findFirst()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.amazon.dataprepper.plugin;

import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.plugin.InvalidPluginDefinitionException;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

/**
* An internal class which represents all the data which can be provided
* when constructing a new plugin.
*/
class PluginArgumentsContext {
private final Map<Class<?>, Supplier<Object>> typedArgumentsSuppliers;

private PluginArgumentsContext(final Builder builder) {
Objects.requireNonNull(builder.pluginSetting,
"PluginArgumentsContext received a null Builder object. This is likely an error in the plugin framework.");

typedArgumentsSuppliers = new HashMap<>();

typedArgumentsSuppliers.put(builder.pluginSetting.getClass(), () -> builder.pluginSetting);

if(builder.pluginConfiguration != null) {
typedArgumentsSuppliers.put(builder.pluginConfiguration.getClass(), () -> builder.pluginConfiguration);
}

typedArgumentsSuppliers.put(PluginMetrics.class, () -> PluginMetrics.fromPluginSetting(builder.pluginSetting));
}

Object[] createArguments(final Class<?>[] parameterTypes) {
return Arrays.stream(parameterTypes)
.map(this::getRequiredArgumentSupplier)
.map(Supplier::get)
.toArray();
}

private Supplier<Object> getRequiredArgumentSupplier(final Class<?> parameterType) {
if(typedArgumentsSuppliers.containsKey(parameterType)) {
return typedArgumentsSuppliers.get(parameterType);
}

throw new InvalidPluginDefinitionException("Unable to create an argument for required plugin parameter type: " + parameterType);
}

static class Builder {
private Object pluginConfiguration;
private PluginSetting pluginSetting;

Builder withPluginConfiguration(final Object pluginConfiguration) {
this.pluginConfiguration = pluginConfiguration;
return this;
}

Builder withPluginSetting(final PluginSetting pluginSetting) {
this.pluginSetting = pluginSetting;
return this;
}

PluginArgumentsContext build() {
return new PluginArgumentsContext(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,29 +1,36 @@
package com.amazon.dataprepper.plugin;

import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.plugin.InvalidPluginDefinitionException;
import com.amazon.dataprepper.model.plugin.PluginInvocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import static java.lang.String.format;
import java.util.Optional;
import java.util.stream.Collectors;

class PluginCreator {
private static final Logger LOG = LoggerFactory.getLogger(PluginCreator.class);

<T> T newPluginInstance(final Class<T> pluginClass,
final Object pluginConfiguration,
final PluginArgumentsContext pluginArgumentsContext,
final String pluginName) {
Objects.requireNonNull(pluginClass);
Objects.requireNonNull(pluginArgumentsContext);
Objects.requireNonNull(pluginName);

final Constructor<?> constructor = getConstructor(pluginClass, pluginName);

final Class<?> pluginConfigurationType = pluginConfiguration.getClass();
final Constructor<?> constructor = getConstructor(pluginClass, pluginConfigurationType, pluginName);
final Object[] constructorArguments = pluginArgumentsContext.createArguments(constructor.getParameterTypes());

try {
return (T) constructor.newInstance(pluginConfiguration);
return (T) constructor.newInstance(constructorArguments);
} catch (final IllegalAccessException | InstantiationException ex) {
LOG.error("Encountered exception while instantiating the plugin {}", pluginClass.getSimpleName(), ex);
throw new InvalidPluginDefinitionException("Unable to access or instantiate the plugin '" + pluginClass.getSimpleName() + ".'", ex);
Expand All @@ -33,16 +40,54 @@ <T> T newPluginInstance(final Class<T> pluginClass,
}
}

private <T> Constructor<?> getConstructor(final Class<T> pluginClass, final Class<?> pluginConfigurationType, final String pluginName) {
try {
return pluginClass.getConstructor(pluginConfigurationType);
} catch (final NoSuchMethodException ex) {
LOG.error("Data Prepper plugin requires a constructor with {} parameter;" +
" Plugin {} with name {} is missing such constructor.", pluginConfigurationType,
pluginClass.getSimpleName(), pluginName, ex);
throw new InvalidPluginDefinitionException(format("Data Prepper plugin requires a constructor with %s parameter;" +
" Plugin %s with name %s is missing such constructor.", pluginConfigurationType,
pluginClass.getSimpleName(), pluginName), ex);
private <T> Constructor<?> getConstructor(final Class<T> pluginClass, final String pluginName) {

final Constructor<?>[] constructors = pluginClass.getConstructors();

final Optional<Constructor<?>> annotatedConstructor = getAnnotatedConstructor(pluginClass, constructors);
if(annotatedConstructor.isPresent())
return annotatedConstructor.get();

final Optional<Constructor<?>> pluginSettingOnlyConstructor = Arrays.stream(constructors)
.filter(c -> Arrays.equals(c.getParameterTypes(), new Class[]{PluginSetting.class}))
.findFirst();

if(pluginSettingOnlyConstructor.isPresent())
return pluginSettingOnlyConstructor.get();

final Optional<Constructor<?>> defaultConstructor = Arrays.stream(constructors)
.filter(c -> c.getParameterTypes().length == 0)
.findFirst();

if(defaultConstructor.isPresent())
return defaultConstructor.get();

final String error =
String.format("Data Prepper plugin %s with name %s does not have a valid plugin constructor. " +
"Please ensure the plugin has a constructor that either: " +
"1. Is annotated with @DataPrepperPlugin; " +
"2. Contains a single argument of type PluginSetting; or " +
"3. Is the default constructor.",
pluginClass.getSimpleName(), pluginName);

LOG.error("{}", error);
throw new InvalidPluginDefinitionException(error);
}

private Optional<Constructor<?>> getAnnotatedConstructor(final Class<?> pluginClass, final Constructor<?>[] constructors) {
final List<Constructor<?>> annotatedConstructors = Arrays.stream(constructors)
.filter(c -> c.isAnnotationPresent(DataPrepperPluginConstructor.class))
.collect(Collectors.toList());

if(annotatedConstructors.size() > 1) {
throw new InvalidPluginDefinitionException("The plugin type " + pluginClass +
" has more than one constructor annotated with @DataPrepperPluginConstructor. " +
"At most one constructor may have this annotation." );
}

if(annotatedConstructors.size() == 1) {
return Optional.of(annotatedConstructors.get(0));
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;
Expand Down Expand Up @@ -141,7 +143,7 @@ void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found() {
final Object convertedConfiguration = mock(Object.class);
given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting))
.willReturn(convertedConfiguration);
given(pluginCreator.newPluginInstance(expectedPluginClass, convertedConfiguration, pluginName))
given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(PluginArgumentsContext.class), eq(pluginName)))
.willReturn(expectedInstance);

assertThat(createObjectUnderTest().loadPlugin(baseClass, pluginSetting),
Expand Down Expand Up @@ -186,7 +188,7 @@ void loadPlugins_should_return_a_single_instance_when_the_the_numberOfInstances_
final Object convertedConfiguration = mock(Object.class);
given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting))
.willReturn(convertedConfiguration);
given(pluginCreator.newPluginInstance(expectedPluginClass, convertedConfiguration, pluginName))
given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(PluginArgumentsContext.class), eq(pluginName)))
.willReturn(expectedInstance);

final List<?> plugins = createObjectUnderTest().loadPlugins(
Expand All @@ -205,8 +207,7 @@ void loadPlugins_should_return_an_instance_for_the_total_count() {
final Object convertedConfiguration = mock(Object.class);
given(pluginConfigurationConverter.convert(PluginSetting.class, pluginSetting))
.willReturn(convertedConfiguration);

given(pluginCreator.newPluginInstance(expectedPluginClass, convertedConfiguration, pluginName))
given(pluginCreator.newPluginInstance(eq(expectedPluginClass), any(PluginArgumentsContext.class), eq(pluginName)))
.willReturn(expectedInstance1)
.willReturn(expectedInstance2)
.willReturn(expectedInstance3);
Expand Down
Loading

0 comments on commit 3c7ceec

Please sign in to comment.