Skip to content

Commit

Permalink
Added custom exceptions and code refactoring (opensearch-project#1698)
Browse files Browse the repository at this point in the history
* Added custom exceptions and code refactoring

Signed-off-by: Asif Sohail Mohammed <[email protected]>
  • Loading branch information
asifsmohammed authored Aug 31, 2022
1 parent 1584744 commit 81d9760
Show file tree
Hide file tree
Showing 21 changed files with 365 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

package com.amazon.dataprepper.model.peerforwarder;

import com.amazon.dataprepper.model.processor.Processor;

import java.util.Set;

/**
* An interface that a {@link com.amazon.dataprepper.model.processor.Processor} will implement which must have peer forwarding prior to processing events.
* @since 2.0
*/
public interface RequiresPeerForwarding {
public interface RequiresPeerForwarding extends Processor {
/**
* Gets the identification keys which Peer Forwarder uses to allocate Events to specific Data Prepper nodes.
*
Expand Down
1 change: 1 addition & 0 deletions data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ dependencies {
implementation 'software.amazon.awssdk:servicediscovery'
testImplementation "org.mockito:mockito-inline:${versionMap.mockito}"
testImplementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.peerforwarder.certificate.CertificateProviderFactory;
import org.opensearch.dataprepper.peerforwarder.client.PeerForwarderClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
Expand All @@ -30,18 +31,26 @@ public PeerClientPool peerClientPool() {
return new PeerClientPool();
}

@Bean
public CertificateProviderFactory certificateProviderFactory(final PeerForwarderConfiguration peerForwarderConfiguration) {
return new CertificateProviderFactory(peerForwarderConfiguration);
}

@Bean
public PeerForwarderClientFactory peerForwarderClientFactory(
final PeerForwarderConfiguration peerForwarderConfiguration,
final PeerClientPool peerClientPool
final PeerClientPool peerClientPool,
final CertificateProviderFactory certificateProviderFactory
) {
return new PeerForwarderClientFactory(peerForwarderConfiguration, peerClientPool);
return new PeerForwarderClientFactory(peerForwarderConfiguration, peerClientPool, certificateProviderFactory);
}

@Bean
public PeerForwarderClient peerForwarderClient(final ObjectMapper objectMapper,
final PeerForwarderClientFactory peerForwarderClientFactory) {
return new PeerForwarderClient(objectMapper, peerForwarderClientFactory);
public PeerForwarderClient peerForwarderClient(final PeerForwarderConfiguration peerForwarderConfiguration,
final PeerForwarderClientFactory peerForwarderClientFactory,
final ObjectMapper objectMapper
) {
return new PeerForwarderClient(peerForwarderConfiguration, peerForwarderClientFactory, objectMapper);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ public class PeerForwarderClientFactory {

private final PeerForwarderConfiguration peerForwarderConfiguration;
private final PeerClientPool peerClientPool;
private final CertificateProviderFactory certificateProviderFactory;

public PeerForwarderClientFactory(PeerForwarderConfiguration peerForwarderConfiguration,
PeerClientPool peerClientPool) {
public PeerForwarderClientFactory(final PeerForwarderConfiguration peerForwarderConfiguration,
final PeerClientPool peerClientPool,
final CertificateProviderFactory certificateProviderFactory) {
this.peerForwarderConfiguration = peerForwarderConfiguration;
this.peerClientPool = peerClientPool;
this.certificateProviderFactory = certificateProviderFactory;
}

public HashRing createHashRing() {
Expand All @@ -40,7 +43,6 @@ public PeerClientPool setPeerClientPool() {
if (ssl || useAcmCertForSsl) {
peerClientPool.setSsl(true);

final CertificateProviderFactory certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration);
peerClientPool.setCertificate(certificateProviderFactory.getCertificateProvider().getCertificate());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
* @since 2.0
*/
public class PeerForwarderConfiguration {
public static final String DEFAULT_PEER_FORWARDING_URI = "/event/forward";
private static final String S3_PREFIX = "s3://";

private Integer serverPort = 21890;
private Integer requestTimeout = 10_000;
private Integer threadCount = 200;
private Integer serverThreadCount = 200;
private Integer maxConnectionCount = 500;
private Integer maxPendingRequests = 1024;
private boolean ssl = true;
Expand All @@ -42,6 +43,7 @@ public class PeerForwarderConfiguration {
private Map<String, String> awsCloudMapQueryParameters = Collections.emptyMap();
private String domainName;
private List<String> staticEndpoints = new ArrayList<>();
private Integer clientThreadCount = 200;
private Integer batchSize = 48;
private Integer bufferSize = 512;
private boolean sslCertAndKeyFileInS3;
Expand All @@ -52,7 +54,7 @@ public PeerForwarderConfiguration() {}
public PeerForwarderConfiguration (
@JsonProperty("port") final Integer serverPort,
@JsonProperty("request_timeout") final Integer requestTimeout,
@JsonProperty("thread_count") final Integer threadCount,
@JsonProperty("server_thread_count") final Integer serverThreadCount,
@JsonProperty("max_connection_count") final Integer maxConnectionCount,
@JsonProperty("max_pending_requests") final Integer maxPendingRequests,
@JsonProperty("ssl") final Boolean ssl,
Expand All @@ -69,12 +71,13 @@ public PeerForwarderConfiguration (
@JsonProperty("aws_cloud_map_query_parameters") final Map<String, String> awsCloudMapQueryParameters,
@JsonProperty("domain_name") final String domainName,
@JsonProperty("static_endpoints") final List<String> staticEndpoints,
@JsonProperty("client_thread_count") final Integer clientThreadCount,
@JsonProperty("batch_size") final Integer batchSize,
@JsonProperty("buffer_size") final Integer bufferSize
) {
setServerPort(serverPort);
setRequestTimeout(requestTimeout);
setThreadCount(threadCount);
setServerThreadCount(serverThreadCount);
setMaxConnectionCount(maxConnectionCount);
setMaxPendingRequests(maxPendingRequests);
setSsl(ssl);
Expand All @@ -91,6 +94,7 @@ public PeerForwarderConfiguration (
setAwsCloudMapQueryParameters(awsCloudMapQueryParameters);
setDomainName(domainName);
setStaticEndpoints(staticEndpoints);
setClientThreadCount(clientThreadCount);
setBatchSize(batchSize);
setBufferSize(bufferSize);
checkForCertAndKeyFileInS3();
Expand All @@ -104,8 +108,8 @@ public int getRequestTimeout() {
return requestTimeout;
}

public int getThreadCount() {
return threadCount;
public int getServerThreadCount() {
return serverThreadCount;
}

public int getMaxConnectionCount() {
Expand Down Expand Up @@ -172,6 +176,10 @@ public String getDomainName() {
return domainName;
}

public Integer getClientThreadCount() {
return clientThreadCount;
}

public int getBatchSize() {
return batchSize;
}
Expand All @@ -198,12 +206,12 @@ private void setRequestTimeout(final Integer requestTimeout) {
}
}

private void setThreadCount(final Integer threadCount) {
if (threadCount != null) {
if (threadCount <= 0) {
throw new IllegalArgumentException("Thread count must be a positive integer.");
private void setServerThreadCount(final Integer serverThreadCount) {
if (serverThreadCount != null) {
if (serverThreadCount <= 0) {
throw new IllegalArgumentException("Server thread count must be a positive integer.");
}
this.threadCount = threadCount;
this.serverThreadCount = serverThreadCount;
}
}

Expand Down Expand Up @@ -335,6 +343,15 @@ private void setStaticEndpoints(final List<String> staticEndpoints) {
}
}

public void setClientThreadCount(final Integer clientThreadCount) {
if (clientThreadCount != null) {
if (clientThreadCount <= 0) {
throw new IllegalArgumentException("Client thread count must be a positive integer.");
}
this.clientThreadCount = clientThreadCount;
}
}

private void setBatchSize(final Integer batchSize) {
if (batchSize != null) {
if (batchSize <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.amazon.dataprepper.model.peerforwarder.RequiresPeerForwarding;
import com.amazon.dataprepper.model.processor.Processor;
import com.amazon.dataprepper.model.record.Record;
import org.opensearch.dataprepper.peerforwarder.exception.EmptyPeerForwarderPluginIdentificationKeysException;
import org.opensearch.dataprepper.peerforwarder.exception.UnsupportedPeerForwarderPluginException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,6 +36,13 @@ public PeerForwardingProcessorDecorator(final Processor innerProcessor,
if (innerProcessor instanceof RequiresPeerForwarding) {
identificationKeys = ((RequiresPeerForwarding) innerProcessor).getIdentificationKeys();
}
else {
throw new UnsupportedPeerForwarderPluginException("Peer Forwarding is only supported for plugins which implement RequiresPeerForwarding interface.");
}
if (identificationKeys.isEmpty()) {
throw new EmptyPeerForwarderPluginIdentificationKeysException("Peer Forwarder Plugin: %s cannot have empty identification keys." + pluginId);
}
// TODO: remove this log message after implementing peer forwarder
LOG.info("Peer Forwarder not implemented yet, processing events locally.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,42 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import org.opensearch.dataprepper.peerforwarder.PeerClientPool;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderClientFactory;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration;
import org.opensearch.dataprepper.peerforwarder.model.WireEvent;
import org.opensearch.dataprepper.peerforwarder.model.WireEvents;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration.DEFAULT_PEER_FORWARDING_URI;

public class PeerForwarderClient {
private static final Logger LOG = LoggerFactory.getLogger(PeerForwarderClient.class);
// TODO: Update the URI to something similar to "/event/forward"
private static final String URI = "/log/ingest";
private static final int ASYNC_REQUEST_THREAD_COUNT = 200;

private final ObjectMapper objectMapper;
private final PeerForwarderConfiguration peerForwarderConfiguration;
private final PeerForwarderClientFactory peerForwarderClientFactory;
private final ObjectMapper objectMapper;
private ExecutorService executorService;
private PeerClientPool peerClientPool;

public PeerForwarderClient(final ObjectMapper objectMapper,
final PeerForwarderClientFactory peerForwarderClientFactory) {
this.objectMapper = objectMapper;
public PeerForwarderClient(final PeerForwarderConfiguration peerForwarderConfiguration,
final PeerForwarderClientFactory peerForwarderClientFactory,
final ObjectMapper objectMapper) {
this.peerForwarderConfiguration = peerForwarderConfiguration;
this.peerForwarderClientFactory = peerForwarderClientFactory;
executorService = Executors.newFixedThreadPool(ASYNC_REQUEST_THREAD_COUNT);
this.objectMapper = objectMapper;
executorService = Executors.newFixedThreadPool(peerForwarderConfiguration.getClientThreadCount());
}

public AggregatedHttpResponse serializeRecordsAndSendHttpRequest(
Expand All @@ -48,40 +55,56 @@ public AggregatedHttpResponse serializeRecordsAndSendHttpRequest(
final String pluginId) {
// TODO: decide the default values of peer forwarder configuration and move the PeerClientPool to constructor
peerClientPool = peerForwarderClientFactory.setPeerClientPool();
List<WireEvent> wireEventList = new ArrayList<>();

for (Record<Event> record : records) {
final Event event = record.getData();
wireEventList.add(new WireEvent(event.getMetadata().getEventType(), event.toJsonString()));
}
final WebClient client = peerClientPool.getClient(ipAddress);

final Optional<String> serializedJsonString = getSerializedJsonString(records, pluginId);
return serializedJsonString.map(value -> {
final CompletableFuture<AggregatedHttpResponse> aggregatedHttpResponseCompletableFuture =
processHttpRequest(client, value);
return getAggregatedHttpResponse(aggregatedHttpResponseCompletableFuture);
})
.orElse(AggregatedHttpResponse.of(HttpStatus.BAD_REQUEST));
}

private Optional<String> getSerializedJsonString(final Collection<Record<Event>> records, final String pluginId) {
final List<WireEvent> wireEventList = getWireEventList(records);
final WireEvents wireEvents = new WireEvents(wireEventList, pluginId);

String serializedJsonString;
String serializedJsonString = null;
try {
serializedJsonString = objectMapper.writeValueAsString(wireEvents);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
LOG.warn("Unable to send request to peer, processing locally.", e);
}
final WebClient client = peerClientPool.getClient(ipAddress);
final CompletableFuture<AggregatedHttpResponse> aggregatedHttpResponseCompletableFuture = processHttpRequest(client, serializedJsonString);
return Optional.ofNullable(serializedJsonString);
}

AggregatedHttpResponse response = null;
try {
response = aggregatedHttpResponseCompletableFuture.get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Problem with asynchronous peer forwarding", e);
private List<WireEvent> getWireEventList(final Collection<Record<Event>> records) {
final List<WireEvent> wireEventList = new ArrayList<>();

for (final Record<Event> record : records) {
final Event event = record.getData();
wireEventList.add(getWireEvent(event));
}
return wireEventList;
}

return response;
private WireEvent getWireEvent(final Event event) {
return new WireEvent(
event.getMetadata().getEventType(),
event.getMetadata().getTimeReceived(),
event.getMetadata().getAttributes(),
event.toJsonString()
);
}

private CompletableFuture<AggregatedHttpResponse> processHttpRequest(final WebClient client, final String content) {
final String authority = client.uri().getAuthority();
return CompletableFuture.supplyAsync(() ->
{
try {
final CompletableFuture<AggregatedHttpResponse> aggregate = client.post(URI, content).aggregate();
final CompletableFuture<AggregatedHttpResponse> aggregate = client.post(DEFAULT_PEER_FORWARDING_URI, content).aggregate();
return aggregate.join();
} catch (Exception e) {
LOG.error("Failed to forward request to address: {}", authority, e);
Expand All @@ -90,4 +113,13 @@ private CompletableFuture<AggregatedHttpResponse> processHttpRequest(final WebCl
}, executorService);
}

private AggregatedHttpResponse getAggregatedHttpResponse(final CompletableFuture<AggregatedHttpResponse> aggregatedHttpResponseCompletableFuture) {
try {
return aggregatedHttpResponseCompletableFuture.get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Problem with asynchronous peer forwarding", e);
}
return AggregatedHttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE);
}

}

This file was deleted.

Loading

0 comments on commit 81d9760

Please sign in to comment.