From 7e2331f1af4e646e49047de3bb90d642fc69dc46 Mon Sep 17 00:00:00 2001 From: David Venable Date: Fri, 16 Sep 2022 15:19:03 -0500 Subject: [PATCH] Support Mutual TLS authentication for Core Peer Forwarding. Resolves #1758. (#1771) Signed-off-by: David Venable --- .../ForwardingAuthentication.java | 38 ++++ .../peerforwarder/PeerClientPool.java | 26 ++- .../PeerForwarderClientFactory.java | 2 + .../PeerForwarderConfiguration.java | 28 +++ .../PeerForwarderHttpServerProvider.java | 16 +- .../ForwardingAuthenticationTest.java | 65 +++++++ .../peerforwarder/PeerClientPoolTest.java | 23 +++ .../PeerForwarderClientFactoryTest.java | 16 +- .../PeerForwarderConfigurationTest.java | 22 ++- .../PeerForwarder_ClientServerIT.java | 175 +++++++++++++----- ...warder_config_with_many_authentication.yml | 16 ++ ...rwarder_config_with_mutual_tls_not_ssl.yml | 15 ++ .../src/test/resources/test-alternate-crt.crt | 19 ++ .../src/test/resources/test-alternate-key.key | 28 +++ ..._peer_forwarder_config_with_mutual_tls.yml | 15 ++ ..._forwarder_config_with_unauthenticated.yml | 13 ++ 16 files changed, 458 insertions(+), 59 deletions(-) create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/ForwardingAuthentication.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/ForwardingAuthenticationTest.java create mode 100644 data-prepper-core/src/test/resources/invalid_peer_forwarder_config_with_many_authentication.yml create mode 100644 data-prepper-core/src/test/resources/invalid_peer_forwarder_config_with_mutual_tls_not_ssl.yml create mode 100644 data-prepper-core/src/test/resources/test-alternate-crt.crt create mode 100644 data-prepper-core/src/test/resources/test-alternate-key.key create mode 100644 data-prepper-core/src/test/resources/valid_peer_forwarder_config_with_mutual_tls.yml create mode 100644 data-prepper-core/src/test/resources/valid_peer_forwarder_config_with_unauthenticated.yml diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/ForwardingAuthentication.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/ForwardingAuthentication.java new file mode 100644 index 0000000000..78ec756e90 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/ForwardingAuthentication.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.peerforwarder; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public enum ForwardingAuthentication { + MUTUAL_TLS("mutual_tls"), + UNAUTHENTICATED("unauthenticated"); + + private static final Map STRING_NAME_TO_ENUM_MAP = new HashMap<>(); + + private final String name; + + static { + Arrays.stream(ForwardingAuthentication.values()) + .forEach(enumValue -> STRING_NAME_TO_ENUM_MAP.put(enumValue.name, enumValue)); + } + + ForwardingAuthentication(final String name){ + this.name = name; + } + + public String getName(){ + return name; + } + + static ForwardingAuthentication getByName(final String name) { + return Optional.ofNullable(STRING_NAME_TO_ENUM_MAP.get(name)) + .orElseThrow(() -> new IllegalArgumentException("Unrecognized ForwardingAuthentication: " + name)); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerClientPool.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerClientPool.java index 523a4d9da9..44ad2bda26 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerClientPool.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerClientPool.java @@ -5,11 +5,12 @@ package org.opensearch.dataprepper.peerforwarder; -import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import com.linecorp.armeria.client.ClientBuilder; import com.linecorp.armeria.client.ClientFactory; +import com.linecorp.armeria.client.ClientFactoryBuilder; import com.linecorp.armeria.client.Clients; import com.linecorp.armeria.client.WebClient; +import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; @@ -26,6 +27,7 @@ public class PeerClientPool { private int clientTimeoutSeconds = 3; private boolean ssl; private Certificate certificate; + private ForwardingAuthentication authentication; public PeerClientPool() { peerClients = new ConcurrentHashMap<>(); @@ -47,6 +49,10 @@ public void setCertificate(final Certificate certificate) { this.certificate = certificate; } + public void setAuthentication(ForwardingAuthentication authentication) { + this.authentication = authentication; + } + public WebClient getClient(final String address) { return peerClients.computeIfAbsent(address, this::getHTTPClient); } @@ -58,14 +64,20 @@ private WebClient getHTTPClient(final String ipAddress) { .writeTimeout(Duration.ofSeconds(clientTimeoutSeconds)); if (ssl) { - final ClientFactory clientFactory = ClientFactory.builder() + final ClientFactoryBuilder clientFactoryBuilder = ClientFactory.builder() .tlsCustomizer(sslContextBuilder -> sslContextBuilder.trustManager( - new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)) + new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)) ) - ).tlsNoVerifyHosts(ipAddress) - .build(); - - clientBuilder = clientBuilder.factory(clientFactory); + ) + .tlsNoVerifyHosts(ipAddress); + // TODO: Add keyManager configuration here + if (authentication == ForwardingAuthentication.MUTUAL_TLS) { + clientFactoryBuilder.tlsCustomizer(sslContextBuilder -> sslContextBuilder.keyManager( + new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), + new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)) + )); + } + clientBuilder = clientBuilder.factory(clientFactoryBuilder.build()); } return clientBuilder.build(WebClient.class); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactory.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactory.java index c27c3ab86e..f29287324f 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactory.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactory.java @@ -40,6 +40,8 @@ public PeerClientPool setPeerClientPool() { final boolean ssl = peerForwarderConfiguration.isSsl(); final boolean useAcmCertForSsl = peerForwarderConfiguration.isUseAcmCertificateForSsl(); + peerClientPool.setAuthentication(peerForwarderConfiguration.getAuthentication()); + if (ssl || useAcmCertForSsl) { peerClientPool.setSsl(true); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfiguration.java index 7bdb2e3e60..e527f685f7 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfiguration.java @@ -32,6 +32,7 @@ public class PeerForwarderConfiguration { private boolean ssl = false; private String sslCertificateFile; private String sslKeyFile; + private ForwardingAuthentication authentication = ForwardingAuthentication.UNAUTHENTICATED; private boolean useAcmCertificateForSsl = false; private String acmCertificateArn; private String acmPrivateKeyPassword; @@ -60,6 +61,7 @@ public PeerForwarderConfiguration ( @JsonProperty("ssl") final Boolean ssl, @JsonProperty("ssl_certificate_file") final String sslCertificateFile, @JsonProperty("ssl_key_file") final String sslKeyFile, + @JsonProperty("authentication") final Map authentication, @JsonProperty("use_acm_certificate_for_ssl") final Boolean useAcmCertificateForSsl, @JsonProperty("acm_certificate_arn") final String acmCertificateArn, @JsonProperty("acm_private_key_password") final String acmPrivateKeyPassword, @@ -84,6 +86,7 @@ public PeerForwarderConfiguration ( setUseAcmCertificateForSsl(useAcmCertificateForSsl); setSslCertificateFile(sslCertificateFile); setSslKeyFile(sslKeyFile); + setAuthentication(authentication); setAcmCertificateArn(acmCertificateArn); this.acmPrivateKeyPassword = acmPrivateKeyPassword; setAcmCertificateTimeoutMillis(acmCertificateTimeoutMillis); @@ -98,6 +101,7 @@ public PeerForwarderConfiguration ( setBatchSize(batchSize); setBufferSize(bufferSize); checkForCertAndKeyFileInS3(); + validateSslAndAuthentication(); } public int getServerPort() { @@ -259,6 +263,25 @@ private void setSslKeyFile(final String sslKeyFile) { } } + private void setAuthentication(final Map authentication) { + if(authentication == null) + return; + + if (authentication.isEmpty()) + return; + + if (authentication.size() > 1) + throw new IllegalArgumentException("Invalid authentication configuration."); + + final String authenticationName = authentication.keySet().iterator().next(); + + this.authentication = ForwardingAuthentication.getByName(authenticationName); + } + + public ForwardingAuthentication getAuthentication() { + return authentication; + } + private void setUseAcmCertificateForSsl(final Boolean useAcmCertificateForSsl) { if (useAcmCertificateForSsl != null) { this.useAcmCertificateForSsl = useAcmCertificateForSsl; @@ -382,4 +405,9 @@ private void checkForCertAndKeyFileInS3() { public boolean isSslCertAndKeyFileInS3() { return sslCertAndKeyFileInS3; } + + private void validateSslAndAuthentication() { + if(authentication == ForwardingAuthentication.MUTUAL_TLS && !ssl) + throw new IllegalArgumentException("Mutual TLS is only available when SSL is enabled."); + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/server/PeerForwarderHttpServerProvider.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/server/PeerForwarderHttpServerProvider.java index eb234a3b34..7d426f39bd 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/server/PeerForwarderHttpServerProvider.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/server/PeerForwarderHttpServerProvider.java @@ -5,12 +5,14 @@ package org.opensearch.dataprepper.peerforwarder.server; -import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; -import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; +import io.netty.handler.ssl.ClientAuth; +import org.opensearch.dataprepper.peerforwarder.ForwardingAuthentication; import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration; import org.opensearch.dataprepper.peerforwarder.certificate.CertificateProviderFactory; +import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; +import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,9 +49,9 @@ public Server get() { sb.disableServerHeader(); if (peerForwarderConfiguration.isSsl()) { - LOG.info("Creating http source with SSL/TLS enabled."); final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); final Certificate certificate = certificateProvider.getCertificate(); + LOG.info("Creating http source with SSL/TLS enabled."); // TODO: enable encrypted key with password sb.https(peerForwarderConfiguration.getServerPort()) .tls( @@ -57,11 +59,19 @@ public Server get() { new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) ) ); + + if (peerForwarderConfiguration.getAuthentication() == ForwardingAuthentication.MUTUAL_TLS) { + sb.tlsCustomizer(sslContextBuilder -> sslContextBuilder.trustManager( + new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)) + ) + .clientAuth(ClientAuth.REQUIRE)); + } } else { LOG.warn("Creating Peer Forwarder server without SSL/TLS. This is not secure."); sb.http(peerForwarderConfiguration.getServerPort()); } + sb.maxNumConnections(peerForwarderConfiguration.getMaxConnectionCount()); sb.requestTimeout(Duration.ofMillis(peerForwarderConfiguration.getRequestTimeout())); final int threadCount = peerForwarderConfiguration.getServerThreadCount(); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/ForwardingAuthenticationTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/ForwardingAuthenticationTest.java new file mode 100644 index 0000000000..06b6c090b6 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/ForwardingAuthenticationTest.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.peerforwarder; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.UUID; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +class ForwardingAuthenticationTest { + + @ParameterizedTest + @ArgumentsSource(EnumToStringNameArgumentsProvider.class) + void getValue_returns_expected_value (final ForwardingAuthentication enumValue, final String expectedName) { + assertThat(enumValue.getName(), equalTo(expectedName)); + } + + @ParameterizedTest + @EnumSource(ForwardingAuthentication.class) + void getByName_returns_correct_enum_from_expected_name(final ForwardingAuthentication enumValue) { + + final String stringName = enumValue.getName(); + + assertThat(ForwardingAuthentication.getByName(stringName), equalTo(enumValue)); + } + + @Test + void getByName_throws_for_null() { + assertThrows(IllegalArgumentException.class, () -> ForwardingAuthentication.getByName(null)); + } + + @Test + void getByName_throws_for_empty_string() { + assertThrows(IllegalArgumentException.class, () -> ForwardingAuthentication.getByName("")); + } + + @Test + void getByName_throws_for_unrecognized_non_empty_name() { + assertThrows(IllegalArgumentException.class, () -> ForwardingAuthentication.getByName(UUID.randomUUID().toString())); + } + + private static class EnumToStringNameArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments(ForwardingAuthentication.MUTUAL_TLS, "mutual_tls"), + arguments(ForwardingAuthentication.UNAUTHENTICATED, "unauthenticated") + ); + } + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerClientPoolTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerClientPoolTest.java index f1f056341b..a746d78609 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerClientPoolTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerClientPoolTest.java @@ -21,6 +21,7 @@ import java.util.Objects; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; @ExtendWith(MockitoExtension.class) @@ -62,4 +63,26 @@ void testGetClientWithSSL(final String address) throws IOException { assertThat(client.uri(), equalTo(URI.create("https://" + address + ":" + PORT + "/"))); } + @ParameterizedTest + @ValueSource(strings = {VALID_ADDRESS, LOCALHOST}) + void testGetClientWithMutualTls(final String address) throws IOException { + final PeerClientPool objectUnderTest = new PeerClientPool(); + objectUnderTest.setSsl(true); + objectUnderTest.setPort(PORT); + objectUnderTest.setAuthentication(ForwardingAuthentication.MUTUAL_TLS); + + final Path certFilePath = new File(Objects.requireNonNull(PeerClientPoolTest.class.getClassLoader().getResource("test-crt.crt")).getFile()).toPath(); + final Path keyFilePath = new File(Objects.requireNonNull(PeerClientPoolTest.class.getClassLoader().getResource("test-key.key")).getFile()).toPath(); + final String certAsString = Files.readString(certFilePath); + final String keyAsString = Files.readString(keyFilePath); + final Certificate certificate = new Certificate(certAsString, keyAsString); + + objectUnderTest.setCertificate(certificate); + + final WebClient client = objectUnderTest.getClient(address); + + assertThat(client, notNullValue()); + assertThat(client.uri(), equalTo(URI.create("https://" + address + ":" + PORT + "/"))); + } + } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactoryTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactoryTest.java index 7ffd0675f2..d485cc5962 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactoryTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderClientFactoryTest.java @@ -7,17 +7,21 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import java.util.Collections; +import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.hamcrest.MatcherAssert.assertThat; import org.hamcrest.core.IsInstanceOf; import org.opensearch.dataprepper.peerforwarder.certificate.CertificateProviderFactory; import org.opensearch.dataprepper.peerforwarder.discovery.DiscoveryMode; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -58,10 +62,16 @@ void testCreateHashRing_without_endpoints_should_throw() { void testCreatePeerClientPool_should_return() { PeerForwarderClientFactory peerForwarderClientFactory = createObjectUnderTest(); - PeerClientPool peerClientPool = peerForwarderClientFactory.setPeerClientPool(); + PeerClientPool returnedPeerClientPool = peerForwarderClientFactory.setPeerClientPool(); - assertThat(peerClientPool, new IsInstanceOf(PeerClientPool.class)); + assertThat(returnedPeerClientPool, equalTo(peerClientPool)); } - + @ParameterizedTest + @EnumSource(ForwardingAuthentication.class) + void testCreatePeerClientPool_should_set_the_authentication(final ForwardingAuthentication authentication) { + when(peerForwarderConfiguration.getAuthentication()).thenReturn(authentication); + createObjectUnderTest().setPeerClientPool(); + verify(peerClientPool).setAuthentication(authentication); + } } \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfigurationTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfigurationTest.java index 8bb3f1563d..9a624c2a24 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfigurationTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderConfigurationTest.java @@ -46,6 +46,7 @@ void testPeerForwarderDefaultConfig() throws IOException { assertThat(peerForwarderConfiguration.getClientThreadCount(), equalTo(200)); assertThat(peerForwarderConfiguration.getBatchSize(), equalTo(48)); assertThat(peerForwarderConfiguration.getBufferSize(), equalTo(512)); + assertThat(peerForwarderConfiguration.getAuthentication(), equalTo(ForwardingAuthentication.UNAUTHENTICATED)); } @Test @@ -68,6 +69,23 @@ void testValidPeerForwarderConfig() throws IOException { assertThat(peerForwarderConfiguration.getClientThreadCount(), equalTo(100)); assertThat(peerForwarderConfiguration.getBatchSize(), equalTo(100)); assertThat(peerForwarderConfiguration.getBufferSize(), equalTo(100)); + assertThat(peerForwarderConfiguration.getAuthentication(), equalTo(ForwardingAuthentication.UNAUTHENTICATED)); + } + + @Test + void testValidPeerForwarderConfig_with_Mutual_TLS() throws IOException { + final PeerForwarderConfiguration peerForwarderConfiguration = makeConfig("src/test/resources/valid_peer_forwarder_config_with_mutual_tls.yml"); + + assertThat(peerForwarderConfiguration.isSsl(), equalTo(true)); + assertThat(peerForwarderConfiguration.getAuthentication(), equalTo(ForwardingAuthentication.MUTUAL_TLS)); + } + + @Test + void testValidPeerForwarderConfig_with_Unauthenticated() throws IOException { + final PeerForwarderConfiguration peerForwarderConfiguration = makeConfig("src/test/resources/valid_peer_forwarder_config_with_unauthenticated.yml"); + + assertThat(peerForwarderConfiguration.isSsl(), equalTo(false)); + assertThat(peerForwarderConfiguration.getAuthentication(), equalTo(ForwardingAuthentication.UNAUTHENTICATED)); } @Test @@ -93,7 +111,9 @@ void test_with_acm_should_create_PeerForwarderConfiguration_object_even_with_nul TestDataProvider.INVALID_PEER_FORWARDER_WITH_CLOUD_MAP_WITHOUT_NAMESPACE_NAME_CONFIG_FILE, TestDataProvider.INVALID_PEER_FORWARDER_WITH_CLOUD_MAP_WITHOUT_REGION_CONFIG_FILE, TestDataProvider.INVALID_PEER_FORWARDER_WITH_DNS_WITHOUT_DOMAIN_NAME_CONFIG_FILE, - TestDataProvider.INVALID_PEER_FORWARDER_WITH_SSL + TestDataProvider.INVALID_PEER_FORWARDER_WITH_SSL, + "src/test/resources/invalid_peer_forwarder_config_with_many_authentication.yml", + "src/test/resources/invalid_peer_forwarder_config_with_mutual_tls_not_ssl.yml" }) void invalid_InvalidPeerForwarderConfig_test(final String filePath) { assertThrows(ValueInstantiationException.class, () -> makeConfig(filePath)); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarder_ClientServerIT.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarder_ClientServerIT.java index 1cf771a1ae..e824583647 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarder_ClientServerIT.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarder_ClientServerIT.java @@ -43,8 +43,10 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; import static org.junit.jupiter.api.Assertions.assertThrows; /** @@ -53,11 +55,14 @@ */ class PeerForwarder_ClientServerIT { - public static final String LOCALHOST = "127.0.0.1"; + private static final String LOCALHOST = "127.0.0.1"; + private static final String SSL_CERTIFICATE_FILE = "src/test/resources/test-crt.crt"; + private static final String SSL_KEY_FILE = "src/test/resources/test-key.key"; private ObjectMapper objectMapper; private String pipelineName; private String pluginId; private List> outgoingRecords; + private Set expectedMessages; @BeforeEach void setUp() { @@ -70,6 +75,11 @@ void setUp() { .collect(Collectors.toList()); pipelineName = UUID.randomUUID().toString(); pluginId = UUID.randomUUID().toString(); + + expectedMessages = outgoingRecords.stream() + .map(Record::getData) + .map(e -> e.get("message", String.class)) + .collect(Collectors.toSet()); } private PeerForwarderServer createServer( @@ -90,36 +100,43 @@ private PeerForwarderServer createServer( private PeerForwarderProvider createPeerForwarderProvider( final PeerForwarderConfiguration peerForwarderConfiguration, final CertificateProviderFactory certificateProviderFactory) { - final PeerForwarderClient clientForProvider = createClient(peerForwarderConfiguration, certificateProviderFactory); + final PeerForwarderClient clientForProvider = createClient(peerForwarderConfiguration); final PeerClientPool peerClientPool = new PeerClientPool(); final PeerForwarderClientFactory clientFactoryForProvider = new PeerForwarderClientFactory(peerForwarderConfiguration, peerClientPool, certificateProviderFactory); return new PeerForwarderProvider(clientFactoryForProvider, clientForProvider, peerForwarderConfiguration); } private PeerForwarderClient createClient( - final PeerForwarderConfiguration peerForwarderConfiguration, - final CertificateProviderFactory certificateProviderFactory) { + final PeerForwarderConfiguration peerForwarderConfiguration) { Objects.requireNonNull(peerForwarderConfiguration, "Nested classes must supply peerForwarderConfiguration"); - Objects.requireNonNull(certificateProviderFactory, "Nested classes must supply certificateProviderFactory"); + final CertificateProviderFactory certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration); final PeerClientPool peerClientPool = new PeerClientPool(); final PeerForwarderClientFactory peerForwarderClientFactory = new PeerForwarderClientFactory(peerForwarderConfiguration, peerClientPool, certificateProviderFactory); peerForwarderClientFactory.setPeerClientPool(); return new PeerForwarderClient(peerForwarderConfiguration, peerForwarderClientFactory, objectMapper); } + private Collection> getServerSideRecords(final PeerForwarderProvider peerForwarderProvider) { + final Map>> pluginBufferMap = peerForwarderProvider.getPipelinePeerForwarderReceiveBufferMap().get(pipelineName); + assertThat(pluginBufferMap, notNullValue()); + final PeerForwarderReceiveBuffer> receiveBuffer = pluginBufferMap.get(pluginId); + + final Map.Entry>, CheckpointState> bufferEntry = receiveBuffer.read(1000); + return bufferEntry.getKey(); + } + @Nested class WithSSL { private PeerForwarderConfiguration peerForwarderConfiguration; - private CertificateProviderFactory certificateProviderFactory; private PeerForwarderServer server; private PeerForwarderProvider peerForwarderProvider; @BeforeEach void setUp() { - peerForwarderConfiguration = createConfiguration(true); + peerForwarderConfiguration = createConfiguration(true, ForwardingAuthentication.UNAUTHENTICATED); - certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration); + final CertificateProviderFactory certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration); peerForwarderProvider = createPeerForwarderProvider(peerForwarderConfiguration, certificateProviderFactory); peerForwarderProvider.register(pipelineName, pluginId, Collections.singleton(UUID.randomUUID().toString())); server = createServer(peerForwarderConfiguration, certificateProviderFactory, peerForwarderProvider); @@ -133,18 +150,13 @@ void tearDown() { @Test void send_Events_to_server() { - final PeerForwarderClient client = createClient(peerForwarderConfiguration, certificateProviderFactory); + final PeerForwarderClient client = createClient(peerForwarderConfiguration); final AggregatedHttpResponse httpResponse = client.serializeRecordsAndSendHttpRequest(outgoingRecords, LOCALHOST, pluginId, pipelineName); assertThat(httpResponse.status(), equalTo(HttpStatus.OK)); - final Map>> pluginBufferMap = peerForwarderProvider.getPipelinePeerForwarderReceiveBufferMap().get(pipelineName); - assertThat(pluginBufferMap, notNullValue()); - final PeerForwarderReceiveBuffer> receiveBuffer = pluginBufferMap.get(pluginId); - - final Map.Entry>, CheckpointState> bufferEntry = receiveBuffer.read(1000); - final Collection> receivedRecords = bufferEntry.getKey(); + final Collection> receivedRecords = getServerSideRecords(peerForwarderProvider); assertThat(receivedRecords, notNullValue()); assertThat(receivedRecords.size(), equalTo(outgoingRecords.size())); @@ -158,22 +170,20 @@ void send_Events_to_server() { receivedMessages.add(message); } - final Set expectedMessages = outgoingRecords.stream() - .map(Record::getData) - .map(e -> e.get("message", String.class)) - .collect(Collectors.toSet()); - assertThat(receivedMessages, equalTo(expectedMessages)); } @Test void send_Events_to_server_when_client_does_not_expect_SSL_should_throw() { - final PeerForwarderConfiguration peerForwarderConfiguration = createConfiguration(false); - certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration); + final PeerForwarderConfiguration peerForwarderConfiguration = createConfiguration(false, ForwardingAuthentication.UNAUTHENTICATED); - final PeerForwarderClient client = createClient(peerForwarderConfiguration, certificateProviderFactory); + final PeerForwarderClient client = createClient(peerForwarderConfiguration); assertThrows(ClosedSessionException.class, () -> client.serializeRecordsAndSendHttpRequest(outgoingRecords, LOCALHOST, pluginId, pipelineName)); + + final Collection> receivedRecords = getServerSideRecords(peerForwarderProvider); + assertThat(receivedRecords, notNullValue()); + assertThat(receivedRecords, is(empty())); } } @@ -181,15 +191,14 @@ void send_Events_to_server_when_client_does_not_expect_SSL_should_throw() { class WithoutSSL { private PeerForwarderConfiguration peerForwarderConfiguration; - private CertificateProviderFactory certificateProviderFactory; private PeerForwarderServer server; private PeerForwarderProvider peerForwarderProvider; @BeforeEach void setUp() { - peerForwarderConfiguration = createConfiguration(false); + peerForwarderConfiguration = createConfiguration(false, ForwardingAuthentication.UNAUTHENTICATED); - certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration); + final CertificateProviderFactory certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration); peerForwarderProvider = createPeerForwarderProvider(peerForwarderConfiguration, certificateProviderFactory); peerForwarderProvider.register(pipelineName, pluginId, Collections.singleton(UUID.randomUUID().toString())); server = createServer(peerForwarderConfiguration, certificateProviderFactory, peerForwarderProvider); @@ -203,18 +212,13 @@ void tearDown() { @Test void send_Events_to_server() { - final PeerForwarderClient client = createClient(peerForwarderConfiguration, certificateProviderFactory); + final PeerForwarderClient client = createClient(peerForwarderConfiguration); final AggregatedHttpResponse httpResponse = client.serializeRecordsAndSendHttpRequest(outgoingRecords, LOCALHOST, pluginId, pipelineName); assertThat(httpResponse.status(), equalTo(HttpStatus.OK)); - final Map>> pluginBufferMap = peerForwarderProvider.getPipelinePeerForwarderReceiveBufferMap().get(pipelineName); - assertThat(pluginBufferMap, notNullValue()); - final PeerForwarderReceiveBuffer> receiveBuffer = pluginBufferMap.get(pluginId); - - final Map.Entry>, CheckpointState> bufferEntry = receiveBuffer.read(1000); - final Collection> receivedRecords = bufferEntry.getKey(); + final Collection> receivedRecords = getServerSideRecords(peerForwarderProvider); assertThat(receivedRecords, notNullValue()); assertThat(receivedRecords.size(), equalTo(outgoingRecords.size())); @@ -228,30 +232,110 @@ void send_Events_to_server() { receivedMessages.add(message); } - final Set expectedMessages = outgoingRecords.stream() - .map(Record::getData) - .map(e -> e.get("message", String.class)) - .collect(Collectors.toSet()); - assertThat(receivedMessages, equalTo(expectedMessages)); } @Test void send_Events_to_server_when_expecting_SSL_should_throw() { - final PeerForwarderConfiguration peerForwarderConfiguration = createConfiguration(true); - certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration); + final PeerForwarderConfiguration peerForwarderConfiguration = createConfiguration(true, ForwardingAuthentication.UNAUTHENTICATED); - final PeerForwarderClient client = createClient(peerForwarderConfiguration, certificateProviderFactory); + final PeerForwarderClient client = createClient(peerForwarderConfiguration); final UnprocessedRequestException actualException = assertThrows(UnprocessedRequestException.class, () -> client.serializeRecordsAndSendHttpRequest(outgoingRecords, LOCALHOST, pluginId, pipelineName)); assertThat(actualException.getCause(), instanceOf(SSLHandshakeException.class)); + + final Collection> receivedRecords = getServerSideRecords(peerForwarderProvider); + assertThat(receivedRecords, notNullValue()); + assertThat(receivedRecords, is(empty())); + } + } + + @Nested + class WithMutualTls { + + private PeerForwarderConfiguration peerForwarderConfiguration; + private PeerForwarderServer server; + private PeerForwarderProvider peerForwarderProvider; + + @BeforeEach + void setUp() { + peerForwarderConfiguration = createConfiguration(true, ForwardingAuthentication.MUTUAL_TLS); + + final CertificateProviderFactory certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration); + peerForwarderProvider = createPeerForwarderProvider(peerForwarderConfiguration, certificateProviderFactory); + peerForwarderProvider.register(pipelineName, pluginId, Collections.singleton(UUID.randomUUID().toString())); + server = createServer(peerForwarderConfiguration, certificateProviderFactory, peerForwarderProvider); + server.start(); } + + @AfterEach + void tearDown() { + server.stop(); + } + + @Test + void send_Events_to_server() { + final PeerForwarderClient client = createClient(peerForwarderConfiguration); + + final AggregatedHttpResponse httpResponse = client.serializeRecordsAndSendHttpRequest(outgoingRecords, LOCALHOST, pluginId, pipelineName); + + assertThat(httpResponse.status(), equalTo(HttpStatus.OK)); + + final Collection> receivedRecords = getServerSideRecords(peerForwarderProvider); + assertThat(receivedRecords, notNullValue()); + assertThat(receivedRecords.size(), equalTo(outgoingRecords.size())); + + final Set receivedMessages = new HashSet<>(); + for (Record receivedRecord : receivedRecords) { + assertThat(receivedRecord, notNullValue()); + assertThat(receivedRecord.getData(), instanceOf(Event.class)); + final Event event = (Event) receivedRecord.getData(); + final String message = event.get("message", String.class); + assertThat(message, notNullValue()); + receivedMessages.add(message); + } + + assertThat(receivedMessages, equalTo(expectedMessages)); + } + + @Test + void send_Events_to_server_without_client_certificate_closes() { + final PeerForwarderConfiguration peerForwarderConfiguration = createConfiguration(false, ForwardingAuthentication.UNAUTHENTICATED); + + final PeerForwarderClient client = createClient(peerForwarderConfiguration); + + assertThrows(ClosedSessionException.class, () -> client.serializeRecordsAndSendHttpRequest(outgoingRecords, LOCALHOST, pluginId, pipelineName)); + + final Collection> receivedRecords = getServerSideRecords(peerForwarderProvider); + assertThat(receivedRecords, notNullValue()); + assertThat(receivedRecords, is(empty())); + } + + @Test + void send_Events_to_server_with_unknown_certificate_key_closes() { + final String alternateSigningKeyFile = "src/test/resources/test-alternate-key.key"; + final PeerForwarderConfiguration peerForwarderConfiguration = createConfiguration( + true, ForwardingAuthentication.MUTUAL_TLS, alternateSigningKeyFile); + + final PeerForwarderClient client = createClient(peerForwarderConfiguration); + assertThrows(UnprocessedRequestException.class, () -> client.serializeRecordsAndSendHttpRequest(outgoingRecords, LOCALHOST, pluginId, pipelineName)); + + final Collection> receivedRecords = getServerSideRecords(peerForwarderProvider); + assertThat(receivedRecords, notNullValue()); + assertThat(receivedRecords, is(empty())); + } + } + + private PeerForwarderConfiguration createConfiguration(final boolean ssl, final ForwardingAuthentication authentication) { + return createConfiguration(ssl, authentication, SSL_KEY_FILE); } - private PeerForwarderConfiguration createConfiguration(final boolean ssl) { - final String sslCertificateFile = "src/test/resources/test-crt.crt"; - final String sslKeyFile = "src/test/resources/test-key.key"; + private PeerForwarderConfiguration createConfiguration( + final boolean ssl, + final ForwardingAuthentication authentication, + final String sslKeyFile) { + final Map authenticationMap = Collections.singletonMap(authentication.getName(), null); return new PeerForwarderConfiguration( 21890, 10_000, @@ -259,8 +343,9 @@ private PeerForwarderConfiguration createConfiguration(final boolean ssl) { 500, 1024, ssl, - sslCertificateFile, + SSL_CERTIFICATE_FILE, sslKeyFile, + authenticationMap, false, null, null, diff --git a/data-prepper-core/src/test/resources/invalid_peer_forwarder_config_with_many_authentication.yml b/data-prepper-core/src/test/resources/invalid_peer_forwarder_config_with_many_authentication.yml new file mode 100644 index 0000000000..509644be05 --- /dev/null +++ b/data-prepper-core/src/test/resources/invalid_peer_forwarder_config_with_many_authentication.yml @@ -0,0 +1,16 @@ +port: 21895 +request_timeout: 1000 +server_thread_count: 100 +max_connection_count: 100 +max_pending_requests: 512 +ssl: true +ssl_certificate_file: src/test/resources/test-crt.crt +ssl_key_file: src/test/resources/test-key.crt +use_acm_certificate_for_ssl: false +discovery_mode: static +client_thread_count: 100 +batch_size: 100 +buffer_size: 100 +authentication: + mutual_tls: + unauthenticated: \ No newline at end of file diff --git a/data-prepper-core/src/test/resources/invalid_peer_forwarder_config_with_mutual_tls_not_ssl.yml b/data-prepper-core/src/test/resources/invalid_peer_forwarder_config_with_mutual_tls_not_ssl.yml new file mode 100644 index 0000000000..3eec5b3871 --- /dev/null +++ b/data-prepper-core/src/test/resources/invalid_peer_forwarder_config_with_mutual_tls_not_ssl.yml @@ -0,0 +1,15 @@ +port: 21895 +request_timeout: 1000 +server_thread_count: 100 +max_connection_count: 100 +max_pending_requests: 512 +ssl: false +ssl_certificate_file: src/test/resources/test-crt.crt +ssl_key_file: src/test/resources/test-key.crt +use_acm_certificate_for_ssl: false +discovery_mode: static +client_thread_count: 100 +batch_size: 100 +buffer_size: 100 +authentication: + mutual_tls: \ No newline at end of file diff --git a/data-prepper-core/src/test/resources/test-alternate-crt.crt b/data-prepper-core/src/test/resources/test-alternate-crt.crt new file mode 100644 index 0000000000..00e130297d --- /dev/null +++ b/data-prepper-core/src/test/resources/test-alternate-crt.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDDjCCAfYCCQCXNkX+YubACjANBgkqhkiG9w0BAQsFADBJMQswCQYDVQQGEwJV +UzELMAkGA1UECAwCVFgxDzANBgNVBAcMBkF1c3RpbjENMAsGA1UECgwEVGVzdDEN +MAsGA1UECwwEVGVzdDAeFw0yMjA5MTUxNDE5MzJaFw0yNTA5MTQxNDE5MzJaMEkx +CzAJBgNVBAYTAlVTMQswCQYDVQQIDAJUWDEPMA0GA1UEBwwGQXVzdGluMQ0wCwYD +VQQKDARUZXN0MQ0wCwYDVQQLDARUZXN0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEArGuHRKJ+CVk2i4P7KCRC95imo5z0RaMm0Cq32FvvHas+rpe95fk0 +Er4I4QRhJBAAqtpUfM1dyimSkRoHrAWPf8uOfdbejhu3LdawSKFNeOnp+2XG7iCq +MD22JOplHe8SKva1SgI1CMa2gwTDC9TxMsNejep6tKOSxR0S5K4Zau95LARWpWqx +gPpQLnGwVAiproCd9Z/YdTMAex8GlGif1DlUGd0rFCWWbaUpWwf9AzN020hPbXhM +bC/KUw6n3P01uFmR0Q7WYld/CENPm3XPvENF9UGeJ/Yv708Ik7hTiBs98Il1uXZt +a7mkJMntdPefTGrITcWo/dchP1ZORZMSTQIDAQABMA0GCSqGSIb3DQEBCwUAA4IB +AQANWSdgBJbBtx6DL3/3RU1g4gIieNaa5xQyFPV74uWlO11w+8GmzPccq1KfziwW +e3u0+clbGCsh0nliOIwo991PAFAnQmdHlM5EZOcVPyV0HbD1BDxzocUahpUolZpI +b2Kq8adPXNx7Yhf3fpRbT26STzQW8I+TJlPvaElb/CambrADHKZATX02Km4eFub0 +diRMMaeY+7VqDUJYnFodJhGdnlELyEppFlmZ/bij3y3yGVGhL1zJ5Dhq2KyugzV0 +vhcgYsnmdLaNzeVvmeYlmk8M0oPnfVRteaXqvfprqYwxG5evmLQwsRzNMBD5hE4t +Y9IMxWvIMcWsN5tnl/uaNtNe +-----END CERTIFICATE----- diff --git a/data-prepper-core/src/test/resources/test-alternate-key.key b/data-prepper-core/src/test/resources/test-alternate-key.key new file mode 100644 index 0000000000..34ae6eb411 --- /dev/null +++ b/data-prepper-core/src/test/resources/test-alternate-key.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCsa4dEon4JWTaL +g/soJEL3mKajnPRFoybQKrfYW+8dqz6ul73l+TQSvgjhBGEkEACq2lR8zV3KKZKR +GgesBY9/y4591t6OG7ct1rBIoU146en7ZcbuIKowPbYk6mUd7xIq9rVKAjUIxraD +BMML1PEyw16N6nq0o5LFHRLkrhlq73ksBFalarGA+lAucbBUCKmugJ31n9h1MwB7 +HwaUaJ/UOVQZ3SsUJZZtpSlbB/0DM3TbSE9teExsL8pTDqfc/TW4WZHRDtZiV38I +Q0+bdc+8Q0X1QZ4n9i/vTwiTuFOIGz3wiXW5dm1ruaQkye10959MashNxaj91yE/ +Vk5FkxJNAgMBAAECggEBAI3aveSTB7zrQDwSru08wDfyyI7tz+8HJyS3MKB6KsWv +IRgF0f/GQrh0zttKfh1saRAoJsCzOcnc4DAvSUaXIJPUxnvz7xDd7pxw+IIY66Ni +V9Y0yzEZgMvFyfifO+gfzEO6zDaCkF2TF4+9uDcgz/yizkVWN/Qsa9FudR/duR3H +Y1aAihcszm2Ikhchz7w1z77ouL1ZWLHs0i7X6jNfIL8jy1H4WT/m/JCEemo8sxyj +6YmdXyUIZvl8h/kylKrY9XRncXdmwkWu+NbWYm5e5wUvfz0REnO1vx19xt2WuyMC +BrulT9rwGIub47edG83O/n6ps4s5MPCaqjYWUIQLpgUCgYEA5TdEmc1/6UMCvhzN +aijsg3/VmmDd8nbylpBSCZaOXiftOYQVXAMd+X2vaB8vL4sKo2MAP5yJcYmaFMFW +mhEgntw4glHCFsxqUequplNnFXmMtC0E12aoMARa+ikFq8iv3a1cmJwl/+2gr5ro +JLTBL5gi7HOxN2BBuRLprmBsNvcCgYEAwJFG71t3dWBBkpLXS5XqT6Cx+dyjv9B3 +LNGL4DfcQnNOXcwEVC0o4kNjo8FySjQfWaQLACuGPs3sqNAggewZ993R4sN+BfOd +aN+AWAhPpxpBET2ZLURpRbD1bnbdIrzTimWAiG4xucmRQ7dgdvR+CF0m2I6lptNp +edAiA4lsG9sCgYB+rfjf988Fn99YKprsXQm582CeHYMMieiwhbqRSgAM0YwuCW20 +mJRsWqYiqXlukS2j9wcgIOdlNGNPrTqrk2Ov6I3imEToTLlF5Kn5ioaiO7cjrO3M +DhRN1Vpif2F++z8XCMTwIvxtYWs5IifCgZNzNmW2wInkzPIF0woLMYcdKQKBgH7+ +T92CYKGTvSrAlCNQ1pl0zGqPS54wCKgR+UEBFczs5f2Nkj1BGvk8n/Vamdi9zv76 +5BZUnc/FVhFuUEHeRrHkI4p8ihu6sVB8NNPZh04YTljkWqQzvYksTm9vDB7gkFFw +5vcSVNDFUXtq778DNiql4/xk9nGycpjW3defRSCJAoGARgu/z25yVd4XruyHLMDr +qohh9dPuSZtUl5KIL4VgRl/MqKANEmcJyuDw3hzglDFYcXJPpRP6T/8Uu/8TZcJJ +JTIV2R/ELWTprQFS+dW0ard3nKmith+igf1p60OZgzZNI+8vRgeGK6tokaCKTvox +jYuXGLtjzeDasKQxfBNTOkU= +-----END PRIVATE KEY----- diff --git a/data-prepper-core/src/test/resources/valid_peer_forwarder_config_with_mutual_tls.yml b/data-prepper-core/src/test/resources/valid_peer_forwarder_config_with_mutual_tls.yml new file mode 100644 index 0000000000..361dec0bc0 --- /dev/null +++ b/data-prepper-core/src/test/resources/valid_peer_forwarder_config_with_mutual_tls.yml @@ -0,0 +1,15 @@ +port: 21895 +request_timeout: 1000 +server_thread_count: 100 +max_connection_count: 100 +max_pending_requests: 512 +ssl: true +ssl_certificate_file: src/test/resources/test-crt.crt +ssl_key_file: src/test/resources/test-key.crt +use_acm_certificate_for_ssl: false +discovery_mode: static +client_thread_count: 100 +batch_size: 100 +buffer_size: 100 +authentication: + mutual_tls: \ No newline at end of file diff --git a/data-prepper-core/src/test/resources/valid_peer_forwarder_config_with_unauthenticated.yml b/data-prepper-core/src/test/resources/valid_peer_forwarder_config_with_unauthenticated.yml new file mode 100644 index 0000000000..0a668c631b --- /dev/null +++ b/data-prepper-core/src/test/resources/valid_peer_forwarder_config_with_unauthenticated.yml @@ -0,0 +1,13 @@ +port: 21895 +request_timeout: 1000 +server_thread_count: 100 +max_connection_count: 100 +max_pending_requests: 512 +ssl: false +use_acm_certificate_for_ssl: false +discovery_mode: static +client_thread_count: 100 +batch_size: 100 +buffer_size: 100 +authentication: + unauthenticated: \ No newline at end of file