diff --git a/data-prepper-api/src/main/java/com/amazon/dataprepper/model/peerforwarder/RequiresPeerForwarding.java b/data-prepper-api/src/main/java/com/amazon/dataprepper/model/peerforwarder/RequiresPeerForwarding.java index d5fc951b82..4af6654d76 100644 --- a/data-prepper-api/src/main/java/com/amazon/dataprepper/model/peerforwarder/RequiresPeerForwarding.java +++ b/data-prepper-api/src/main/java/com/amazon/dataprepper/model/peerforwarder/RequiresPeerForwarding.java @@ -5,13 +5,15 @@ package com.amazon.dataprepper.model.peerforwarder; +import com.amazon.dataprepper.model.processor.Processor; + import java.util.Set; /** * An interface that a {@link com.amazon.dataprepper.model.processor.Processor} will implement which must have peer forwarding prior to processing events. * @since 2.0 */ -public interface RequiresPeerForwarding { +public interface RequiresPeerForwarding extends Processor { /** * Gets the identification keys which Peer Forwarder uses to allocate Events to specific Data Prepper nodes. * diff --git a/data-prepper-core/build.gradle b/data-prepper-core/build.gradle index 0d95f001e7..a391bdfd74 100644 --- a/data-prepper-core/build.gradle +++ b/data-prepper-core/build.gradle @@ -49,6 +49,7 @@ dependencies { implementation 'software.amazon.awssdk:servicediscovery' testImplementation "org.mockito:mockito-inline:${versionMap.mockito}" testImplementation 'org.apache.commons:commons-lang3:3.12.0' + testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' } jacocoTestCoverageVerification { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfig.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfig.java index fab23d08ba..09ea0847cd 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfig.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfig.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; +import org.opensearch.dataprepper.peerforwarder.certificate.CertificateProviderFactory; import org.opensearch.dataprepper.peerforwarder.client.PeerForwarderClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -30,18 +31,26 @@ public PeerClientPool peerClientPool() { return new PeerClientPool(); } + @Bean + public CertificateProviderFactory certificateProviderFactory(final PeerForwarderConfiguration peerForwarderConfiguration) { + return new CertificateProviderFactory(peerForwarderConfiguration); + } + @Bean public PeerForwarderClientFactory peerForwarderClientFactory( final PeerForwarderConfiguration peerForwarderConfiguration, - final PeerClientPool peerClientPool + final PeerClientPool peerClientPool, + final CertificateProviderFactory certificateProviderFactory ) { - return new PeerForwarderClientFactory(peerForwarderConfiguration, peerClientPool); + return new PeerForwarderClientFactory(peerForwarderConfiguration, peerClientPool, certificateProviderFactory); } @Bean - public PeerForwarderClient peerForwarderClient(final ObjectMapper objectMapper, - final PeerForwarderClientFactory peerForwarderClientFactory) { - return new PeerForwarderClient(objectMapper, peerForwarderClientFactory); + public PeerForwarderClient peerForwarderClient(final PeerForwarderConfiguration peerForwarderConfiguration, + final PeerForwarderClientFactory peerForwarderClientFactory, + final ObjectMapper objectMapper + ) { + return new PeerForwarderClient(peerForwarderConfiguration, peerForwarderClientFactory, objectMapper); } @Bean diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactory.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactory.java index 9980a2d81c..c27c3ab86e 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactory.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactory.java @@ -15,11 +15,14 @@ public class PeerForwarderClientFactory { private final PeerForwarderConfiguration peerForwarderConfiguration; private final PeerClientPool peerClientPool; + private final CertificateProviderFactory certificateProviderFactory; - public PeerForwarderClientFactory(PeerForwarderConfiguration peerForwarderConfiguration, - PeerClientPool peerClientPool) { + public PeerForwarderClientFactory(final PeerForwarderConfiguration peerForwarderConfiguration, + final PeerClientPool peerClientPool, + final CertificateProviderFactory certificateProviderFactory) { this.peerForwarderConfiguration = peerForwarderConfiguration; this.peerClientPool = peerClientPool; + this.certificateProviderFactory = certificateProviderFactory; } public HashRing createHashRing() { @@ -40,7 +43,6 @@ public PeerClientPool setPeerClientPool() { if (ssl || useAcmCertForSsl) { peerClientPool.setSsl(true); - final CertificateProviderFactory certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration); peerClientPool.setCertificate(certificateProviderFactory.getCertificateProvider().getCertificate()); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfiguration.java index 28147337a1..cfe523fc7a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfiguration.java @@ -21,11 +21,12 @@ * @since 2.0 */ public class PeerForwarderConfiguration { + public static final String DEFAULT_PEER_FORWARDING_URI = "/event/forward"; private static final String S3_PREFIX = "s3://"; private Integer serverPort = 21890; private Integer requestTimeout = 10_000; - private Integer threadCount = 200; + private Integer serverThreadCount = 200; private Integer maxConnectionCount = 500; private Integer maxPendingRequests = 1024; private boolean ssl = true; @@ -42,6 +43,7 @@ public class PeerForwarderConfiguration { private Map awsCloudMapQueryParameters = Collections.emptyMap(); private String domainName; private List staticEndpoints = new ArrayList<>(); + private Integer clientThreadCount = 200; private Integer batchSize = 48; private Integer bufferSize = 512; private boolean sslCertAndKeyFileInS3; @@ -52,7 +54,7 @@ public PeerForwarderConfiguration() {} public PeerForwarderConfiguration ( @JsonProperty("port") final Integer serverPort, @JsonProperty("request_timeout") final Integer requestTimeout, - @JsonProperty("thread_count") final Integer threadCount, + @JsonProperty("server_thread_count") final Integer serverThreadCount, @JsonProperty("max_connection_count") final Integer maxConnectionCount, @JsonProperty("max_pending_requests") final Integer maxPendingRequests, @JsonProperty("ssl") final Boolean ssl, @@ -69,12 +71,13 @@ public PeerForwarderConfiguration ( @JsonProperty("aws_cloud_map_query_parameters") final Map awsCloudMapQueryParameters, @JsonProperty("domain_name") final String domainName, @JsonProperty("static_endpoints") final List staticEndpoints, + @JsonProperty("client_thread_count") final Integer clientThreadCount, @JsonProperty("batch_size") final Integer batchSize, @JsonProperty("buffer_size") final Integer bufferSize ) { setServerPort(serverPort); setRequestTimeout(requestTimeout); - setThreadCount(threadCount); + setServerThreadCount(serverThreadCount); setMaxConnectionCount(maxConnectionCount); setMaxPendingRequests(maxPendingRequests); setSsl(ssl); @@ -91,6 +94,7 @@ public PeerForwarderConfiguration ( setAwsCloudMapQueryParameters(awsCloudMapQueryParameters); setDomainName(domainName); setStaticEndpoints(staticEndpoints); + setClientThreadCount(clientThreadCount); setBatchSize(batchSize); setBufferSize(bufferSize); checkForCertAndKeyFileInS3(); @@ -104,8 +108,8 @@ public int getRequestTimeout() { return requestTimeout; } - public int getThreadCount() { - return threadCount; + public int getServerThreadCount() { + return serverThreadCount; } public int getMaxConnectionCount() { @@ -172,6 +176,10 @@ public String getDomainName() { return domainName; } + public Integer getClientThreadCount() { + return clientThreadCount; + } + public int getBatchSize() { return batchSize; } @@ -198,12 +206,12 @@ private void setRequestTimeout(final Integer requestTimeout) { } } - private void setThreadCount(final Integer threadCount) { - if (threadCount != null) { - if (threadCount <= 0) { - throw new IllegalArgumentException("Thread count must be a positive integer."); + private void setServerThreadCount(final Integer serverThreadCount) { + if (serverThreadCount != null) { + if (serverThreadCount <= 0) { + throw new IllegalArgumentException("Server thread count must be a positive integer."); } - this.threadCount = threadCount; + this.serverThreadCount = serverThreadCount; } } @@ -335,6 +343,15 @@ private void setStaticEndpoints(final List staticEndpoints) { } } + public void setClientThreadCount(final Integer clientThreadCount) { + if (clientThreadCount != null) { + if (clientThreadCount <= 0) { + throw new IllegalArgumentException("Client thread count must be a positive integer."); + } + this.clientThreadCount = clientThreadCount; + } + } + private void setBatchSize(final Integer batchSize) { if (batchSize != null) { if (batchSize <= 0) { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java index 7f06de3b61..43d026af0f 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java @@ -9,6 +9,8 @@ import com.amazon.dataprepper.model.peerforwarder.RequiresPeerForwarding; import com.amazon.dataprepper.model.processor.Processor; import com.amazon.dataprepper.model.record.Record; +import org.opensearch.dataprepper.peerforwarder.exception.EmptyPeerForwarderPluginIdentificationKeysException; +import org.opensearch.dataprepper.peerforwarder.exception.UnsupportedPeerForwarderPluginException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +36,13 @@ public PeerForwardingProcessorDecorator(final Processor innerProcessor, if (innerProcessor instanceof RequiresPeerForwarding) { identificationKeys = ((RequiresPeerForwarding) innerProcessor).getIdentificationKeys(); } + else { + throw new UnsupportedPeerForwarderPluginException("Peer Forwarding is only supported for plugins which implement RequiresPeerForwarding interface."); + } + if (identificationKeys.isEmpty()) { + throw new EmptyPeerForwarderPluginIdentificationKeysException("Peer Forwarder Plugin: %s cannot have empty identification keys." + pluginId); + } + // TODO: remove this log message after implementing peer forwarder LOG.info("Peer Forwarder not implemented yet, processing events locally."); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/client/PeerForwarderClient.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/client/PeerForwarderClient.java index 7abae65980..5fdac72930 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/client/PeerForwarderClient.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/client/PeerForwarderClient.java @@ -11,35 +11,42 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.linecorp.armeria.client.WebClient; import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpStatus; import org.opensearch.dataprepper.peerforwarder.PeerClientPool; import org.opensearch.dataprepper.peerforwarder.PeerForwarderClientFactory; +import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration; +import org.opensearch.dataprepper.peerforwarder.model.WireEvent; +import org.opensearch.dataprepper.peerforwarder.model.WireEvents; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration.DEFAULT_PEER_FORWARDING_URI; + public class PeerForwarderClient { private static final Logger LOG = LoggerFactory.getLogger(PeerForwarderClient.class); - // TODO: Update the URI to something similar to "/event/forward" - private static final String URI = "/log/ingest"; - private static final int ASYNC_REQUEST_THREAD_COUNT = 200; - private final ObjectMapper objectMapper; + private final PeerForwarderConfiguration peerForwarderConfiguration; private final PeerForwarderClientFactory peerForwarderClientFactory; + private final ObjectMapper objectMapper; private ExecutorService executorService; private PeerClientPool peerClientPool; - public PeerForwarderClient(final ObjectMapper objectMapper, - final PeerForwarderClientFactory peerForwarderClientFactory) { - this.objectMapper = objectMapper; + public PeerForwarderClient(final PeerForwarderConfiguration peerForwarderConfiguration, + final PeerForwarderClientFactory peerForwarderClientFactory, + final ObjectMapper objectMapper) { + this.peerForwarderConfiguration = peerForwarderConfiguration; this.peerForwarderClientFactory = peerForwarderClientFactory; - executorService = Executors.newFixedThreadPool(ASYNC_REQUEST_THREAD_COUNT); + this.objectMapper = objectMapper; + executorService = Executors.newFixedThreadPool(peerForwarderConfiguration.getClientThreadCount()); } public AggregatedHttpResponse serializeRecordsAndSendHttpRequest( @@ -48,32 +55,48 @@ public AggregatedHttpResponse serializeRecordsAndSendHttpRequest( final String pluginId) { // TODO: decide the default values of peer forwarder configuration and move the PeerClientPool to constructor peerClientPool = peerForwarderClientFactory.setPeerClientPool(); - List wireEventList = new ArrayList<>(); - for (Record record : records) { - final Event event = record.getData(); - wireEventList.add(new WireEvent(event.getMetadata().getEventType(), event.toJsonString())); - } + final WebClient client = peerClientPool.getClient(ipAddress); + + final Optional serializedJsonString = getSerializedJsonString(records, pluginId); + return serializedJsonString.map(value -> { + final CompletableFuture aggregatedHttpResponseCompletableFuture = + processHttpRequest(client, value); + return getAggregatedHttpResponse(aggregatedHttpResponseCompletableFuture); + }) + .orElse(AggregatedHttpResponse.of(HttpStatus.BAD_REQUEST)); + } + private Optional getSerializedJsonString(final Collection> records, final String pluginId) { + final List wireEventList = getWireEventList(records); final WireEvents wireEvents = new WireEvents(wireEventList, pluginId); - String serializedJsonString; + String serializedJsonString = null; try { serializedJsonString = objectMapper.writeValueAsString(wireEvents); } catch (JsonProcessingException e) { - throw new RuntimeException(e); + LOG.warn("Unable to send request to peer, processing locally.", e); } - final WebClient client = peerClientPool.getClient(ipAddress); - final CompletableFuture aggregatedHttpResponseCompletableFuture = processHttpRequest(client, serializedJsonString); + return Optional.ofNullable(serializedJsonString); + } - AggregatedHttpResponse response = null; - try { - response = aggregatedHttpResponseCompletableFuture.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Problem with asynchronous peer forwarding", e); + private List getWireEventList(final Collection> records) { + final List wireEventList = new ArrayList<>(); + + for (final Record record : records) { + final Event event = record.getData(); + wireEventList.add(getWireEvent(event)); } + return wireEventList; + } - return response; + private WireEvent getWireEvent(final Event event) { + return new WireEvent( + event.getMetadata().getEventType(), + event.getMetadata().getTimeReceived(), + event.getMetadata().getAttributes(), + event.toJsonString() + ); } private CompletableFuture processHttpRequest(final WebClient client, final String content) { @@ -81,7 +104,7 @@ private CompletableFuture processHttpRequest(final WebCl return CompletableFuture.supplyAsync(() -> { try { - final CompletableFuture aggregate = client.post(URI, content).aggregate(); + final CompletableFuture aggregate = client.post(DEFAULT_PEER_FORWARDING_URI, content).aggregate(); return aggregate.join(); } catch (Exception e) { LOG.error("Failed to forward request to address: {}", authority, e); @@ -90,4 +113,13 @@ private CompletableFuture processHttpRequest(final WebCl }, executorService); } + private AggregatedHttpResponse getAggregatedHttpResponse(final CompletableFuture aggregatedHttpResponseCompletableFuture) { + try { + return aggregatedHttpResponseCompletableFuture.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Problem with asynchronous peer forwarding", e); + } + return AggregatedHttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE); + } + } \ No newline at end of file diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/client/WireEvent.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/client/WireEvent.java deleted file mode 100644 index 2d10cd8c1c..0000000000 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/client/WireEvent.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.peerforwarder.client; - -import org.opensearch.dataprepper.peerforwarder.PeerForwarder; - -/** - * A class for {@link com.amazon.dataprepper.model.event.EventType} and JSON representation of event data used by {@link PeerForwarder} - * - * @since 2.0 - */ -class WireEvent { - private final String eventType; - private final String eventData; - - public WireEvent(final String eventType, final String eventData) { - this.eventType = eventType; - this.eventData = eventData; - } - - public String getEventType() { - return eventType; - } - - public String getEventData() { - return eventData; - } -} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/exception/EmptyPeerForwarderPluginIdentificationKeysException.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/exception/EmptyPeerForwarderPluginIdentificationKeysException.java new file mode 100644 index 0000000000..2e7b532fe1 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/exception/EmptyPeerForwarderPluginIdentificationKeysException.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.peerforwarder.exception; + +/** + * This exception is thrown when processor which implements + * {@link com.amazon.dataprepper.model.peerforwarder.RequiresPeerForwarding} interface returns an empty set of identification keys. + * + * @since 2.0 + */ +public class EmptyPeerForwarderPluginIdentificationKeysException extends RuntimeException { + + public EmptyPeerForwarderPluginIdentificationKeysException(final String errorMessage) { + super(errorMessage); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/exception/UnsupportedPeerForwarderPluginException.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/exception/UnsupportedPeerForwarderPluginException.java new file mode 100644 index 0000000000..7d119530ad --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/exception/UnsupportedPeerForwarderPluginException.java @@ -0,0 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.peerforwarder.exception; + +/** + * This exception is thrown when processor which doesn't implement + * {@link com.amazon.dataprepper.model.peerforwarder.RequiresPeerForwarding} interface is passed to + * {@link org.opensearch.dataprepper.peerforwarder.PeerForwardingProcessorDecorator}. + * + * @since 2.0 + */ +public class UnsupportedPeerForwarderPluginException extends RuntimeException { + + public UnsupportedPeerForwarderPluginException(final String errorMessage) { + super(errorMessage); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/model/WireEvent.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/model/WireEvent.java new file mode 100644 index 0000000000..e23a87a574 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/model/WireEvent.java @@ -0,0 +1,54 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.peerforwarder.model; + +import org.opensearch.dataprepper.peerforwarder.PeerForwarder; + +import java.time.Instant; +import java.util.Map; + +/** + * A class for {@link com.amazon.dataprepper.model.event.EventType} and JSON representation of event data used by {@link PeerForwarder} + * + * @since 2.0 + */ +public class WireEvent { + private String eventType; + private Instant eventTimeReceived; + private Map eventAttributes; + private String eventData; + + public WireEvent() { + } + + // TODO: Add a toJsonString method to EventMetadata and use that instead of metadata fields + + public WireEvent(final String eventType, + final Instant eventTimeReceived, + final Map eventAttributes, + final String eventData) { + this.eventType = eventType; + this.eventTimeReceived = eventTimeReceived; + this.eventAttributes = eventAttributes; + this.eventData = eventData; + } + + public String getEventType() { + return eventType; + } + + public Instant getEventTimeReceived() { + return eventTimeReceived; + } + + public Map getEventAttributes() { + return eventAttributes; + } + + public String getEventData() { + return eventData; + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/client/WireEvents.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/model/WireEvents.java similarity index 77% rename from data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/client/WireEvents.java rename to data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/model/WireEvents.java index 8470ea3012..98e6a3b6b3 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/client/WireEvents.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/model/WireEvents.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.peerforwarder.client; +package org.opensearch.dataprepper.peerforwarder.model; import org.opensearch.dataprepper.peerforwarder.PeerForwarder; @@ -14,9 +14,12 @@ * * @since 2.0 */ -class WireEvents { - private final List events; - private final String destinationPluginId; +public class WireEvents { + private List events; + private String destinationPluginId; + + public WireEvents() { + } public WireEvents(final List events, final String destinationPluginId) { this.events = events; diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfigIT.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfigIT.java index bbbe75ec71..73800d763c 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfigIT.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfigIT.java @@ -33,13 +33,14 @@ void PeerForwarderConfiguration_default_values_test() { final PeerForwarderConfiguration objectUnderTest = createObjectUnderTest(); assertThat(objectUnderTest.getServerPort(), equalTo(21890)); assertThat(objectUnderTest.getRequestTimeout(), equalTo(10_000)); - assertThat(objectUnderTest.getThreadCount(), equalTo(200)); + assertThat(objectUnderTest.getServerThreadCount(), equalTo(200)); assertThat(objectUnderTest.getMaxConnectionCount(), equalTo(500)); assertThat(objectUnderTest.getMaxPendingRequests(), equalTo(1024)); assertThat(objectUnderTest.isSsl(), equalTo(true)); assertThat(objectUnderTest.getSslCertificateFile(), equalTo(null)); assertThat(objectUnderTest.getSslKeyFile(), equalTo(null)); assertThat(objectUnderTest.getDiscoveryMode(), equalTo(DiscoveryMode.STATIC)); + assertThat(objectUnderTest.getClientThreadCount(), equalTo(200)); assertThat(objectUnderTest.getBatchSize(), equalTo(48)); assertThat(objectUnderTest.getBufferSize(), equalTo(512)); } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfigTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfigTest.java index c7c6a30c63..1776a93c49 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfigTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfigTest.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.peerforwarder.certificate.CertificateProviderFactory; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.notNullValue; @@ -49,7 +50,8 @@ void peerClientPool_should_return_test() { void peerForwarderClientFactory_should_return_test() { PeerForwarderClientFactory peerForwarderClientFactory = peerForwarderAppConfig.peerForwarderClientFactory( mock(PeerForwarderConfiguration.class), - mock(PeerClientPool.class) + mock(PeerClientPool.class), + mock(CertificateProviderFactory.class) ); assertThat(peerForwarderClientFactory, notNullValue()); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactoryTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactoryTest.java index c28fa2f093..7ffd0675f2 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactoryTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactoryTest.java @@ -15,6 +15,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.hamcrest.MatcherAssert.assertThat; import org.hamcrest.core.IsInstanceOf; +import org.opensearch.dataprepper.peerforwarder.certificate.CertificateProviderFactory; import org.opensearch.dataprepper.peerforwarder.discovery.DiscoveryMode; import static org.mockito.Mockito.when; @@ -28,8 +29,11 @@ class PeerForwarderClientFactoryTest { @Mock PeerClientPool peerClientPool; + @Mock + CertificateProviderFactory certificateProviderFactory; + private PeerForwarderClientFactory createObjectUnderTest() { - return new PeerForwarderClientFactory(peerForwarderConfiguration, peerClientPool); + return new PeerForwarderClientFactory(peerForwarderConfiguration, peerClientPool, certificateProviderFactory); } @Test diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfigurationTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfigurationTest.java index 600a503ce1..1a6bedd88c 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfigurationTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfigurationTest.java @@ -36,13 +36,14 @@ void testPeerForwarderDefaultConfig() throws IOException { assertThat(peerForwarderConfiguration.getServerPort(), equalTo(21890)); assertThat(peerForwarderConfiguration.getRequestTimeout(), equalTo(10_000)); - assertThat(peerForwarderConfiguration.getThreadCount(), equalTo(200)); + assertThat(peerForwarderConfiguration.getServerThreadCount(), equalTo(200)); assertThat(peerForwarderConfiguration.getMaxConnectionCount(), equalTo(500)); assertThat(peerForwarderConfiguration.getMaxPendingRequests(), equalTo(1024)); assertThat(peerForwarderConfiguration.isSsl(), equalTo(false)); assertThat(peerForwarderConfiguration.getAcmPrivateKeyPassword(), equalTo(null)); assertThat(peerForwarderConfiguration.isUseAcmCertificateForSsl(), equalTo(false)); assertThat(peerForwarderConfiguration.getDiscoveryMode(), equalTo(DiscoveryMode.STATIC)); + assertThat(peerForwarderConfiguration.getClientThreadCount(), equalTo(200)); assertThat(peerForwarderConfiguration.getBatchSize(), equalTo(48)); assertThat(peerForwarderConfiguration.getBufferSize(), equalTo(512)); } @@ -53,7 +54,7 @@ void testValidPeerForwarderConfig() throws IOException { assertThat(peerForwarderConfiguration.getServerPort(), equalTo(21895)); assertThat(peerForwarderConfiguration.getRequestTimeout(), equalTo(1000)); - assertThat(peerForwarderConfiguration.getThreadCount(), equalTo(100)); + assertThat(peerForwarderConfiguration.getServerThreadCount(), equalTo(100)); assertThat(peerForwarderConfiguration.getMaxConnectionCount(), equalTo(100)); assertThat(peerForwarderConfiguration.getMaxPendingRequests(), equalTo(512)); assertThat(peerForwarderConfiguration.isSsl(), equalTo(false)); @@ -64,6 +65,7 @@ void testValidPeerForwarderConfig() throws IOException { assertThat(peerForwarderConfiguration.getDomainName(), equalTo(null)); assertThat(peerForwarderConfiguration.getAwsCloudMapNamespaceName(), equalTo(null)); assertThat(peerForwarderConfiguration.getAwsCloudMapServiceName(), equalTo(null)); + assertThat(peerForwarderConfiguration.getClientThreadCount(), equalTo(100)); assertThat(peerForwarderConfiguration.getBatchSize(), equalTo(100)); assertThat(peerForwarderConfiguration.getBufferSize(), equalTo(100)); } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java index e3d9e781dc..60d692e9c6 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java @@ -15,18 +15,20 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.peerforwarder.exception.EmptyPeerForwarderPluginIdentificationKeysException; +import org.opensearch.dataprepper.peerforwarder.exception.UnsupportedPeerForwarderPluginException; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; -import static org.mockito.ArgumentMatchers.any; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -39,7 +41,7 @@ class PeerForwardingProcessingDecoratorTest { @Mock Processor processor; - @Mock(extraInterfaces = Processor.class) + @Mock RequiresPeerForwarding requiresPeerForwarding; @Mock @@ -56,14 +58,15 @@ private PeerForwardingProcessorDecorator createObjectUnderTest(Processor process @Test void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentificationKeys() { - createObjectUnderTest((Processor) requiresPeerForwarding); + when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(Set.of(TEST_IDENTIFICATION_KEY)); + + createObjectUnderTest(requiresPeerForwarding); verify(requiresPeerForwarding).getIdentificationKeys(); } @Test void PeerForwardingProcessingDecorator_should_not_have_any_interactions_if_its_not_an_instance_of_RequiresPeerForwarding() { - createObjectUnderTest(processor); - verifyNoInteractions(processor); + assertThrows(UnsupportedPeerForwarderPluginException.class, () -> createObjectUnderTest(processor)); } @Test @@ -74,10 +77,9 @@ void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correc when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys); when(peerForwarder.forwardRecords(testData, identificationKeys, TEST_PLUGIN_ID)).thenReturn(testData); - Processor processor = (Processor) requiresPeerForwarding; - when(processor.execute(testData)).thenReturn(testData); + when(requiresPeerForwarding.execute(testData)).thenReturn(testData); - final PeerForwardingProcessorDecorator objectUnderTest = createObjectUnderTest(processor); + final PeerForwardingProcessorDecorator objectUnderTest = createObjectUnderTest(requiresPeerForwarding); final Collection> records = objectUnderTest.execute(testData); verify(requiresPeerForwarding).getIdentificationKeys(); @@ -87,33 +89,47 @@ void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correc assertThat(records, equalTo(testData)); } + @Test + void PeerForwardingProcessingDecorator_execute_with_empty_identification_keys_should_throw() { + when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(Collections.emptySet()); + + assertThrows(EmptyPeerForwarderPluginIdentificationKeysException.class, () -> createObjectUnderTest(requiresPeerForwarding)); + } + @Test void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execute() { - PeerForwardingProcessorDecorator objectUnderTest = createObjectUnderTest(processor); + when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(Set.of(TEST_IDENTIFICATION_KEY)); + PeerForwardingProcessorDecorator objectUnderTest = createObjectUnderTest(requiresPeerForwarding); Collection> testData = Collections.singletonList(record); objectUnderTest.execute(testData); - verify(processor).execute(any(Collection.class)); + verify(requiresPeerForwarding).execute(anyCollection()); } @Test void PeerForwardingProcessingDecorator_prepareForShutdown_will_call_inner_processors_prepareForShutdown() { - PeerForwardingProcessorDecorator objectUnderTest = createObjectUnderTest(processor); + when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(Set.of(TEST_IDENTIFICATION_KEY)); + PeerForwardingProcessorDecorator objectUnderTest = createObjectUnderTest(requiresPeerForwarding); + objectUnderTest.prepareForShutdown(); - verify(processor).prepareForShutdown(); + verify(requiresPeerForwarding).prepareForShutdown(); } @Test void PeerForwardingProcessingDecorator_isReadyForShutdown_will_call_inner_processors_isReadyForShutdown() { - PeerForwardingProcessorDecorator objectUnderTest = createObjectUnderTest(processor); + when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(Set.of(TEST_IDENTIFICATION_KEY)); + PeerForwardingProcessorDecorator objectUnderTest = createObjectUnderTest(requiresPeerForwarding); + objectUnderTest.isReadyForShutdown(); - verify(processor).isReadyForShutdown(); + verify(requiresPeerForwarding).isReadyForShutdown(); } @Test void PeerForwardingProcessingDecorator_shutdown_will_call_inner_processors_shutdown() { - PeerForwardingProcessorDecorator objectUnderTest = createObjectUnderTest(processor); + when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(Set.of(TEST_IDENTIFICATION_KEY)); + PeerForwardingProcessorDecorator objectUnderTest = createObjectUnderTest(requiresPeerForwarding); + objectUnderTest.shutdown(); - verify(processor).shutdown(); + verify(requiresPeerForwarding).shutdown(); } } \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/client/PeerForwarderClientTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/client/PeerForwarderClientTest.java index bbb7a1aee8..09cb6b85fd 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/client/PeerForwarderClientTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/client/PeerForwarderClientTest.java @@ -9,7 +9,10 @@ import com.amazon.dataprepper.model.event.JacksonEvent; import com.amazon.dataprepper.model.log.JacksonLog; import com.amazon.dataprepper.model.record.Record; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.linecorp.armeria.client.ClientBuilder; import com.linecorp.armeria.client.Clients; import com.linecorp.armeria.client.WebClient; @@ -24,6 +27,8 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.peerforwarder.PeerClientPool; import org.opensearch.dataprepper.peerforwarder.PeerForwarderClientFactory; +import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration; +import org.opensearch.dataprepper.peerforwarder.model.WireEvents; import java.io.IOException; import java.io.OutputStream; @@ -39,7 +44,10 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration.DEFAULT_PEER_FORWARDING_URI; @ExtendWith(MockitoExtension.class) class PeerForwarderClientTest { @@ -47,7 +55,11 @@ class PeerForwarderClientTest { private static final String LOCAL_IP = "127.0.0.1"; private static final String TEST_PLUGIN_ID = "test_plugin_id"; - private final ObjectMapper objectMapper = new ObjectMapper(); + private final ObjectMapper objectMapper = new ObjectMapper() + .registerModule(new JavaTimeModule()); + + @Mock + PeerForwarderConfiguration peerForwarderConfiguration; @Mock PeerClientPool peerClientPool; @@ -55,8 +67,9 @@ class PeerForwarderClientTest { @Mock PeerForwarderClientFactory peerForwarderClientFactory; - PeerForwarderClient createObjectUnderTest() { - return new PeerForwarderClient(objectMapper, peerForwarderClientFactory); + PeerForwarderClient createObjectUnderTest(final ObjectMapper objectMapper) { + when(peerForwarderConfiguration.getClientThreadCount()).thenReturn(200); + return new PeerForwarderClient(peerForwarderConfiguration, peerForwarderClientFactory, objectMapper); } @Test @@ -64,14 +77,14 @@ void test_serializeRecordsAndSendHttpRequest_with_actual_client_and_server_shoul when(peerForwarderClientFactory.setPeerClientPool()).thenReturn(peerClientPool); final HttpServer server = createServer(2022); - server.createContext("/log/ingest", new TestHandler()); + server.createContext(DEFAULT_PEER_FORWARDING_URI, new TestHandler()); server.start(); final InetSocketAddress address = server.getAddress(); final WebClient testClient = getTestClient(String.valueOf(address.getPort())); when(peerClientPool.getClient(anyString())).thenReturn(testClient); - final PeerForwarderClient peerForwarderClient = createObjectUnderTest(); + final PeerForwarderClient peerForwarderClient = createObjectUnderTest(objectMapper); final AggregatedHttpResponse aggregatedHttpResponse = peerForwarderClient.serializeRecordsAndSendHttpRequest(generateBatchRecords(1), address.toString(), TEST_PLUGIN_ID); @@ -82,6 +95,24 @@ void test_serializeRecordsAndSendHttpRequest_with_actual_client_and_server_shoul server.stop(0); } + @Test + void test_serializeRecordsAndSendHttpRequest_with_bad_wireEvents_should_return_BAD_REQUEST() throws JsonProcessingException { + ObjectMapper objectMapper = mock(ObjectMapper.class); + objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + lenient().when(objectMapper.writeValueAsString(instanceOf(WireEvents.class))).thenThrow(JsonProcessingException.class); + + when(peerForwarderClientFactory.setPeerClientPool()).thenReturn(peerClientPool); + + final PeerForwarderClient peerForwarderClient = createObjectUnderTest(objectMapper); + + final AggregatedHttpResponse aggregatedHttpResponse = + peerForwarderClient.serializeRecordsAndSendHttpRequest(generateBatchRecords(1), LOCAL_IP, TEST_PLUGIN_ID); + + assertThat(aggregatedHttpResponse, notNullValue()); + assertThat(aggregatedHttpResponse, instanceOf(AggregatedHttpResponse.class)); + assertThat(aggregatedHttpResponse.status(), equalTo(HttpStatus.BAD_REQUEST)); + } + private Collection> generateBatchRecords(final int numRecords) { final Collection> results = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/exception/EmptyPeerForwarderPluginIdentificationKeysExceptionTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/exception/EmptyPeerForwarderPluginIdentificationKeysExceptionTest.java new file mode 100644 index 0000000000..c9b50d07a7 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/exception/EmptyPeerForwarderPluginIdentificationKeysExceptionTest.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.peerforwarder.exception; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +class EmptyPeerForwarderPluginIdentificationKeysExceptionTest { + private String message; + + @BeforeEach + void setUp() { + message = UUID.randomUUID().toString(); + } + + private EmptyPeerForwarderPluginIdentificationKeysException createObjectUnderTest() { + return new EmptyPeerForwarderPluginIdentificationKeysException(message); + } + + @Test + void getMessage_returns_message() { + assertThat(createObjectUnderTest().getMessage(), equalTo(message)); + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/exception/UnsupportedPeerForwarderPluginExceptionTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/exception/UnsupportedPeerForwarderPluginExceptionTest.java new file mode 100644 index 0000000000..e679221cf4 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/exception/UnsupportedPeerForwarderPluginExceptionTest.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.peerforwarder.exception; + +import com.amazon.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +class UnsupportedPeerForwarderPluginExceptionTest { + private String message; + + @BeforeEach + void setUp() { + message = UUID.randomUUID().toString(); + } + + private InvalidPluginConfigurationException createObjectUnderTest() { + return new InvalidPluginConfigurationException(message); + } + + @Test + void getMessage_returns_message() { + assertThat(createObjectUnderTest().getMessage(), equalTo(message)); + } +} diff --git a/data-prepper-core/src/test/resources/valid_peer_forwarder_config.yml b/data-prepper-core/src/test/resources/valid_peer_forwarder_config.yml index c38597867c..59ef210b50 100644 --- a/data-prepper-core/src/test/resources/valid_peer_forwarder_config.yml +++ b/data-prepper-core/src/test/resources/valid_peer_forwarder_config.yml @@ -1,10 +1,11 @@ port: 21895 request_timeout: 1000 -thread_count: 100 +server_thread_count: 100 max_connection_count: 100 max_pending_requests: 512 ssl: false use_acm_certificate_for_ssl: false discovery_mode: static +client_thread_count: 100 batch_size: 100 buffer_size: 100 \ No newline at end of file