diff --git a/build.sbt b/build.sbt index b8264c77d..27a59589a 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,5 @@ import Dependencies.Versions +import Dependencies.`cyclopsPure` import Dependencies.`cyclops` import Dependencies.`lombok` import Dependencies.globalExcludeDeps @@ -71,7 +72,7 @@ lazy val `query-language` = (project in file("java-connectors/kafka-connect-quer Seq( name := "kafka-connect-query-language", description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores", - libraryDependencies ++= Seq(cyclops, lombok), + libraryDependencies ++= Seq(cyclops, cyclopsPure, lombok), publish / skip := true, ), ) diff --git a/java-connectors/build.gradle b/java-connectors/build.gradle index fc2c6fdac..f4e909470 100644 --- a/java-connectors/build.gradle +++ b/java-connectors/build.gradle @@ -31,6 +31,7 @@ allprojects { apacheToConfluentVersionAxis = ["2.8.1": "6.2.2", "3.3.0": "7.3.1"] caffeineVersion = '3.1.8' cyclopsVersion = '10.4.1' + bouncyCastleVersion = "1.78.1" //Other Manifest Info mainClassName = '' @@ -64,6 +65,7 @@ allprojects { // functional java implementation group: 'com.oath.cyclops', name: 'cyclops', version: cyclopsVersion + implementation group: 'com.oath.cyclops', name: 'cyclops-pure', version: cyclopsVersion //tests testImplementation group: 'org.mockito', name: 'mockito-core', version: mockitoJupiterVersion diff --git a/java-connectors/kafka-connect-common/build.gradle b/java-connectors/kafka-connect-common/build.gradle index 800c50649..65456e61b 100644 --- a/java-connectors/kafka-connect-common/build.gradle +++ b/java-connectors/kafka-connect-common/build.gradle @@ -12,6 +12,12 @@ project(":kafka-connect-common") { api group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion testImplementation(project(path: ':test-utils', configuration: 'testArtifacts')) + testImplementation group: 'org.bouncycastle', name:'bcprov-jdk18on', version: bouncyCastleVersion + testImplementation group: 'org.bouncycastle', name:'bcutil-jdk18on', version: bouncyCastleVersion + testImplementation group: 'org.bouncycastle', name:'bcpkix-jdk18on', version: bouncyCastleVersion + testImplementation group: 'org.bouncycastle', name:'bcpg-jdk18on', version: bouncyCastleVersion + testImplementation group: 'org.bouncycastle', name:'bctls-jdk18on', version: bouncyCastleVersion + //confluent - may be needed soon // implementation group: 'io.confluent', name: 'kafka-json-schema-serializer', version: apacheToConfluentVersionAxis.get(kafkaVersion) // implementation group: 'io.confluent', name: 'kafka-connect-avro-converter', version: apacheToConfluentVersionAxis.get(kafkaVersion) diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/exception/SecuritySetupException.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/exception/SecuritySetupException.java new file mode 100644 index 000000000..f7e7e994a --- /dev/null +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/exception/SecuritySetupException.java @@ -0,0 +1,27 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.exception; + +public class SecuritySetupException extends StreamReactorException { + + public SecuritySetupException(String message) { + super(message); + } + + public SecuritySetupException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/KeyStoreInfo.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/KeyStoreInfo.java new file mode 100644 index 000000000..9bf4ceed9 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/KeyStoreInfo.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.security; + +import cyclops.control.Option; +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.nio.file.Path; + +@AllArgsConstructor +@Data +public class KeyStoreInfo implements StoreInfo { + + private Path storePath; + + private StoreType storeType; + + private String storePassword; + + private Option managerAlgorithm; + +} diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/StoreInfo.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/StoreInfo.java new file mode 100644 index 000000000..0f72defa9 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/StoreInfo.java @@ -0,0 +1,29 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.security; + +import cyclops.control.Option; + +import java.nio.file.Path; + +interface StoreInfo { + + Path getStorePath(); + + StoreType getStoreType(); + + Option getManagerAlgorithm(); +} diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/StoreType.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/StoreType.java new file mode 100644 index 000000000..3ad2d48f3 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/StoreType.java @@ -0,0 +1,44 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.security; + +import cyclops.control.Either; +import cyclops.control.Try; +import io.lenses.streamreactor.common.exception.SecuritySetupException; +import lombok.Getter; + +@Getter +public enum StoreType { + + JKS("JKS"), + PKCS12("PKCS12"); + + private final String type; + + StoreType(String type) { + this.type = type; + } + + public static Either valueOfCaseInsensitive(String storeType) { + return Try + .withCatch(() -> StoreType.valueOf(storeType.toUpperCase())) + .toEither() + .mapLeft(ex -> new SecuritySetupException(String.format("Unable to retrieve Store type %s", storeType), ex)); + } + + public static final StoreType DEFAULT_STORE_TYPE = StoreType.JKS; + +} diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/StoresInfo.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/StoresInfo.java new file mode 100644 index 000000000..9159ca7d0 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/StoresInfo.java @@ -0,0 +1,249 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.security; + +import cyclops.control.Either; +import cyclops.control.Option; +import cyclops.control.Try; +import cyclops.instances.control.TryInstances; +import cyclops.typeclasses.Do; +import io.lenses.streamreactor.common.exception.SecuritySetupException; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.val; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.FileInputStream; +import java.nio.file.Path; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static io.lenses.streamreactor.common.security.StoreType.DEFAULT_STORE_TYPE; + +@AllArgsConstructor +@Data +public class StoresInfo { + + private static final String PROTOCOL_TLS = "TLS"; + + private Option sslProtocol; + private Option maybeTrustStore; + private Option maybeKeyStore; + + private Try getJksStore(Path path, StoreType storeType, Option password) { + return Try.withCatch( + () -> { + val keyStore = KeyStore.getInstance(storeType.toString()); + val inputStream = new FileInputStream(path.toFile()); + keyStore.load(inputStream, password.map(String::toCharArray).orElse(null)); + return keyStore; + }, + Exception.class + ).mapFailure(ex -> new SecuritySetupException("unable to retrieve keystore", ex)); + + } + + public Either> toSslContext() { + + final Option> maybeTrustFactory = + maybeTrustStore.map( + trustStore -> trustManagers( + trustStore.getStorePath(), + trustStore.getStoreType(), + trustStore.getStorePassword(), + trustStore.getManagerAlgorithm() + ) + ); + + final Option> maybeKeyFactory = + maybeKeyStore.map( + keyStore -> keyManagers( + keyStore.getStorePath(), + keyStore.getStoreType(), + keyStore.getStorePassword(), + keyStore.getManagerAlgorithm() + ) + ); + + val failures = + Stream.of( + maybeTrustFactory.filter(Try::isFailure).flatMap(Try::failureGet).stream(), + maybeKeyFactory.filter(Try::isFailure).flatMap(Try::failureGet).stream() + ) + .flatMap(Function.identity()) + .collect(Collectors.toUnmodifiableList()); + + val maybeFailure = + Option.fromOptional( + failures + .stream() + .findFirst() + ); + + return maybeFailure + .toEither(getAndInitSslContext(maybeKeyFactory, maybeTrustFactory, sslProtocol)) + .swap() + .fold(Either::left, either -> either.fold(Either::left, Either::right)); + + } + + private static Either> getAndInitSslContext( + Option> maybeKeyFactory, + Option> maybeTrustFactory, + Option sslProtocol + ) { + return Try.withCatch(() -> { + // If either factory is present, initialize SSLContext + if (maybeKeyFactory.isPresent() || maybeTrustFactory.isPresent()) { + val sslContext = SSLContext.getInstance(sslProtocol.orElse(PROTOCOL_TLS)); + sslContext.init( + maybeKeyFactory.flatMap(Try::toOption).map(KeyManagerFactory::getKeyManagers).orElse(null), + maybeTrustFactory.flatMap(Try::toOption).map(TrustManagerFactory::getTrustManagers).orElse(null), + null + ); + return Option.of(sslContext); + } + return Option.none(); + }).mapFailure(ex -> new SecuritySetupException("unable to retrieve keystore", ex)) + .toEither(); + } + + private Try trustManagers(Path path, StoreType storeType, + Option password, Option algorithm) { + return Try.narrowK( + Do.forEach( + TryInstances.monad() + ) + .__(getJksStore(path, storeType, password)) + .__((KeyStore keyStore) -> StoresInfo.getTrustManagerFactoryFromKeyStore(keyStore, algorithm)) + .yield( + (KeyStore keyStore, TrustManagerFactory trustManagerFactory) -> trustManagerFactory + ) + .unwrap()); + + } + + private Try keyManagers(Path path, StoreType storeType, + String password, Option algorithm) { + return Try.narrowK( + Do.forEach( + TryInstances.monad() + ) + .__(getJksStore(path, storeType, Option.of(password))) + .__((KeyStore keyStore) -> StoresInfo.getKeyManagerFactoryFromKeyStore(keyStore, password, algorithm)) + .yield( + (KeyStore keyStore, KeyManagerFactory trustManagerFactory) -> trustManagerFactory + ) + .unwrap()); + } + + private static Try getTrustManagerFactoryFromKeyStore( + KeyStore keyStore, Option algorithm) { + return Try.withCatch(() -> { + val trustManagerFactory = + TrustManagerFactory.getInstance(algorithm.orElse(TrustManagerFactory.getDefaultAlgorithm())); + trustManagerFactory.init(keyStore); + return trustManagerFactory; + }, NoSuchAlgorithmException.class, KeyStoreException.class) + .mapFailure(ex -> new SecuritySetupException("Unable to get trust manager factory from keystore", ex)); + } + + private static Try getKeyManagerFactoryFromKeyStore(KeyStore keyStore, + String password, Option algorithm) { + return Try.withCatch(() -> { + val keyManagerFactory = KeyManagerFactory.getInstance(algorithm.orElse(KeyManagerFactory.getDefaultAlgorithm())); + keyManagerFactory.init(keyStore, password.toCharArray()); + return keyManagerFactory; + }, NoSuchAlgorithmException.class, KeyStoreException.class, UnrecoverableKeyException.class) + .mapFailure(ex -> new SecuritySetupException("Unable to get trust manager factory from truststore", ex)); + } + + public static Either fromConfig(AbstractConfig config) { + val sslProtocol = configToSslProtocol(config); + val trustStore = configToTrustStoreInfo(config); + val keyStore = configToKeyStoreInfo(config); + + val failures = + Stream.of(trustStore, keyStore) + .flatMap(option -> option.stream().flatMap(either -> either.swap().stream())) + .collect(Collectors.toUnmodifiableList()); + + return failures.isEmpty() + ? Either.right(new StoresInfo( + sslProtocol, + trustStore.flatMap(Either::toOption), + keyStore.flatMap(Either::toOption) + )) + : Either.left(failures.iterator().next()); + } + + private static Option> configToTrustStoreInfo(AbstractConfig config) { + return Option.ofNullable(config.getString(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) + .map(Path::of) + .map(storePath -> fromConfigOption(config, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG) + .map(storeType -> { + val storePassword = + Option.ofNullable(config.getPassword(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) + .map(Password::value); + val managerAlgorithm = Option.ofNullable(config.getString(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG)); + return new TrustStoreInfo(storePath, storeType, storePassword, managerAlgorithm); + } + )); + } + + private static Option> configToKeyStoreInfo( + AbstractConfig config + ) { + return Option.ofNullable(config.getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) + .map(Path::of) + .flatMap(storePath -> fromConfigOption(config, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG) + .map(storeType -> Option.ofNullable(config.getPassword(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)) + .map(Password::value) + .toEither(new SecuritySetupException("Password is required for key store")) + .map(pw -> { + val managerAlgorithm = + Option.ofNullable(config.getString(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG)); + return new KeyStoreInfo(storePath, storeType, pw, managerAlgorithm); + }) + ) + .toOption() + ); + } + + private static Option configToSslProtocol( + AbstractConfig config + ) { + return Option.ofNullable(config.getString(SslConfigs.SSL_PROTOCOL_CONFIG)); + } + + private static Either fromConfigOption(AbstractConfig config, String configKey) { + return Option + .fromNullable(config.getString(configKey)) + .map(StoreType::valueOfCaseInsensitive) + .orElse(Either.right(DEFAULT_STORE_TYPE)); + } + +} diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/TrustStoreInfo.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/TrustStoreInfo.java new file mode 100644 index 000000000..6809907fd --- /dev/null +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/security/TrustStoreInfo.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.security; + +import cyclops.control.Option; +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.nio.file.Path; + +@AllArgsConstructor +@Data +public class TrustStoreInfo implements StoreInfo { + + private Path storePath; + + private StoreType storeType; + + private Option storePassword; + + private Option managerAlgorithm; + +} diff --git a/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/security/KeyStoreUtils.java b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/security/KeyStoreUtils.java new file mode 100644 index 000000000..9adb3eee4 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/security/KeyStoreUtils.java @@ -0,0 +1,171 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.security; +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import lombok.extern.slf4j.Slf4j; +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; +import org.bouncycastle.cert.X509v3CertificateBuilder; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; + +import java.io.FileOutputStream; +import java.math.BigInteger; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.KeyStore; +import java.security.Security; +import java.security.cert.X509Certificate; +import java.security.interfaces.RSAPrivateKey; +import java.time.Duration; +import java.time.Instant; +import java.util.Date; + +/** + * Utility class for creating and managing keystore and truststore files using RSA keys and X.509 certificates. + */ +@Slf4j +public class KeyStoreUtils { + + private static final String COMMON_NAME = "CN"; + private static final String DIRECTORY_SECURITY = "security"; + private static final String ALGORITHM_RSA = "RSA"; + private static final String PROVIDER_BC = "BC"; + private static final int KEY_SIZE = 2048; + private static final int DAYS_IN_YEAR = 365; + private static final String SIGNER_SIGNATURE_ALGORITHM = "SHA256WithRSAEncryption"; + private static final String KEYSTORE_TYPE = "JKS"; + private static final String KEYSTORE_FILE = "keystore.jks"; + private static final String TRUSTSTORE_FILE = "truststore.jks"; + private static final String TEST_ALIAS = "alias"; + + static { + Security.addProvider(new BouncyCastleProvider()); + } + + /** + * Creates a new keystore and truststore with an RSA key pair and X.509 certificate. + * + * @param commonName the common name for the certificate + * @param keyStorePassword the password for the keystore + * @param trustStorePassword the password for the truststore + * @return Path to the temporary directory where keystore and truststore are created + * @throws Exception if an error occurs during the creation process + */ + public static Path createKeystore(String commonName, String keyStorePassword, String trustStorePassword) + throws Exception { + Path tmpDir = Files.createTempDirectory(DIRECTORY_SECURITY); + + KeyPairGenerator keyPairGen = KeyPairGenerator.getInstance(ALGORITHM_RSA, PROVIDER_BC); + keyPairGen.initialize(KEY_SIZE); + KeyPair keyPair = keyPairGen.generateKeyPair(); + + Date notBefore = new Date(Instant.now().toEpochMilli()); + Date notAfter = new Date(Instant.now().plus(Duration.ofDays(DAYS_IN_YEAR)).toEpochMilli()); + + SubjectPublicKeyInfo publicKeyInfo = SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded()); + + X509v3CertificateBuilder certBuilder = + new X509v3CertificateBuilder( + new X500Name(COMMON_NAME + "=" + commonName), + BigInteger.valueOf(System.currentTimeMillis()), + notBefore, + notAfter, + new X500Name(COMMON_NAME + "=" + commonName), + publicKeyInfo + ); + + JcaContentSignerBuilder contentSignerBuilder = + new JcaContentSignerBuilder(SIGNER_SIGNATURE_ALGORITHM).setProvider(PROVIDER_BC); + X509Certificate certificate = + new JcaX509CertificateConverter().setProvider(PROVIDER_BC) + .getCertificate(certBuilder.build(contentSignerBuilder.build(keyPair.getPrivate()))); + + createAndSaveKeystore(tmpDir, keyStorePassword, certificate, (RSAPrivateKey) keyPair.getPrivate()); + createAndSaveTruststore(tmpDir, trustStorePassword, certificate); + + log.info("container -> Creating keystore at " + tmpDir); + return tmpDir; + } + + /** + * Creates and saves the keystore with the given password and certificate. + * + * @param tmpDir the temporary directory where the keystore will be saved + * @param password the password for the keystore + * @param certificate the certificate to store in the keystore + * @param privateKey the private key to store in the keystore + * @return the path to the created keystore file + * @throws Exception if an error occurs during the keystore creation process + */ + private static String createAndSaveKeystore(Path tmpDir, String password, X509Certificate certificate, + RSAPrivateKey privateKey) throws Exception { + KeyStore keyStore = KeyStore.getInstance(KEYSTORE_TYPE); + keyStore.load(null, password.toCharArray()); + + keyStore.setKeyEntry(TEST_ALIAS, privateKey, password.toCharArray(), new java.security.cert.Certificate[]{ + certificate}); + + String keyStorePath = tmpDir.resolve(KEYSTORE_FILE).toString(); + try (FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStorePath)) { + keyStore.store(keyStoreOutputStream, password.toCharArray()); + } + + return keyStorePath; + } + + /** + * Creates and saves the truststore with the given password and certificate. + * + * @param tmpDir the temporary directory where the truststore will be saved + * @param password the password for the truststore + * @param certificate the certificate to store in the truststore + * @return the path to the created truststore file + * @throws Exception if an error occurs during the truststore creation process + */ + private static String createAndSaveTruststore(Path tmpDir, String password, X509Certificate certificate) + throws Exception { + KeyStore trustStore = KeyStore.getInstance(KEYSTORE_TYPE); + trustStore.load(null, password.toCharArray()); + + trustStore.setCertificateEntry(TEST_ALIAS, certificate); + String trustStorePath = tmpDir.resolve(TRUSTSTORE_FILE).toString(); + + try (FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStorePath)) { + trustStore.store(trustStoreOutputStream, password.toCharArray()); + } + + return trustStorePath; + } +} diff --git a/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/security/StoresInfoTest.java b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/security/StoresInfoTest.java new file mode 100644 index 000000000..987e429eb --- /dev/null +++ b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/security/StoresInfoTest.java @@ -0,0 +1,174 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.security; + +import io.lenses.streamreactor.common.config.base.BaseConfig; +import io.lenses.streamreactor.common.exception.SecuritySetupException; +import lombok.val; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.junit.jupiter.api.Test; + +import java.io.FileNotFoundException; +import java.nio.file.Path; + +import static cyclops.control.Either.right; +import static cyclops.control.Option.none; +import static cyclops.control.Option.some; +import static io.lenses.streamreactor.test.utils.EitherValues.assertLeft; +import static io.lenses.streamreactor.test.utils.EitherValues.assertRight; +import static io.lenses.streamreactor.test.utils.EitherValues.getLeft; +import static io.lenses.streamreactor.test.utils.EitherValues.getRight; +import static io.lenses.streamreactor.test.utils.OptionValues.getValue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class StoresInfoTest { + + private static final String KEY_OR_TRUST_MANAGER_ALGORITHM = "PKIX"; + private static final String STORE_PASSWORD = "changeIt"; + private static final String SSL_PROTOCOL_TLS = "TLS"; + + private final Path keystoreDir = KeyStoreUtils.createKeystore("TestCommonName", STORE_PASSWORD, STORE_PASSWORD); + private final Path keystoreFile = keystoreDir.resolve("keystore.jks"); + private final Path truststoreFile = keystoreDir.resolve("truststore.jks"); + + StoresInfoTest() throws Exception { + } + + @Test + void testToSslContextWithBothNone() { + val storesInfo = new StoresInfo(none(), none(), none()); + assertEquals(right(none()), storesInfo.toSslContext()); + } + + @Test + void testToSslContextWithKeyStoreDefined() { + val storeInfo = new KeyStoreInfo(keystoreFile, StoreType.JKS, STORE_PASSWORD, none()); + val storesInfo = new StoresInfo(some(SSL_PROTOCOL_TLS), none(), some(storeInfo)); + + val sslContext = getRight(storesInfo.toSslContext()); + + assertEquals(SSL_PROTOCOL_TLS, getValue(sslContext).getProtocol()); + } + + @Test + void testToSslContextWithTrustStoreDefined() { + val storeInfo = new TrustStoreInfo(keystoreFile, StoreType.JKS, some(STORE_PASSWORD), none()); + val storesInfo = new StoresInfo(none(), some(storeInfo), none()); + + val sslContext = getRight(storesInfo.toSslContext()); + + assertEquals(SSL_PROTOCOL_TLS, getValue(sslContext).getProtocol()); + } + + @Test + void testToSslContextWithBothStoresDefined() { + val keyStoreInfo = new KeyStoreInfo(keystoreFile, StoreType.JKS, STORE_PASSWORD, none()); + val trustStoreInfo = new TrustStoreInfo(truststoreFile, StoreType.JKS, some(STORE_PASSWORD), none()); + val storesInfo = new StoresInfo(some(SSL_PROTOCOL_TLS), some(trustStoreInfo), some(keyStoreInfo)); + + val sslContext = getRight(storesInfo.toSslContext()); + + assertEquals(SSL_PROTOCOL_TLS, getValue(sslContext).getProtocol()); + } + + @Test + void testToSslContextThrowsFileNotFoundExceptionForInvalidKeyStorePath() { + val keyStoreInfo = new KeyStoreInfo(Path.of("/invalid/path/to/keystore"), StoreType.JKS, STORE_PASSWORD, none()); + val storesInfo = new StoresInfo(none(), none(), some(keyStoreInfo)); + + assertEquals(FileNotFoundException.class, getLeft(storesInfo.toSslContext()).getCause().getClass()); + } + + @Test + void testToSslContextThrowsFileNotFoundExceptionForInvalidTrustStorePath() { + val trustStoreInfo = + new TrustStoreInfo(Path.of("/invalid/path/to/truststore"), StoreType.JKS, some(STORE_PASSWORD), none()); + val storesInfo = new StoresInfo(some(SSL_PROTOCOL_TLS), some(trustStoreInfo), none()); + + assertEquals(FileNotFoundException.class, getLeft(storesInfo.toSslContext()).getCause().getClass()); + + } + + @Test + void testStoresInfoCreationFromBaseConfig() { + BaseConfig mockConfig = mock(BaseConfig.class); + when(mockConfig.getString(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)).thenReturn("/path/to/truststore"); + when(mockConfig.getString(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)).thenReturn("JKS"); + when(mockConfig.getPassword(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).thenReturn(null); + when(mockConfig.getString(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG)).thenReturn(KEY_OR_TRUST_MANAGER_ALGORITHM); + + when(mockConfig.getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)).thenReturn("/path/to/keystore"); + when(mockConfig.getString(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)).thenReturn("JKS"); + when(mockConfig.getPassword(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).thenReturn(new Password(STORE_PASSWORD)); + when(mockConfig.getString(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG)).thenReturn(KEY_OR_TRUST_MANAGER_ALGORITHM); + + val storesInfo = StoresInfo.fromConfig(mockConfig); + + assertRight(storesInfo).isEqualTo( + new StoresInfo( + none(), + some(new TrustStoreInfo(Path.of("/path/to/truststore"), StoreType.JKS, none(), some( + KEY_OR_TRUST_MANAGER_ALGORITHM))), + some(new KeyStoreInfo(Path.of("/path/to/keystore"), StoreType.JKS, STORE_PASSWORD, some( + KEY_OR_TRUST_MANAGER_ALGORITHM))) + ) + ); + } + + @Test + void testStoresInfoCreationFromBaseConfigFailsWithoutTrustStorePassword() { + BaseConfig mockConfig = mock(BaseConfig.class); + when(mockConfig.getString(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)).thenReturn("/path/to/truststore"); + when(mockConfig.getString(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)).thenReturn("JKS"); + when(mockConfig.getPassword(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).thenReturn(null); + + when(mockConfig.getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)).thenReturn("/path/to/keystore"); + when(mockConfig.getString(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)).thenReturn("JKS"); + when(mockConfig.getPassword(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).thenReturn(null); + + val storesInfo = StoresInfo.fromConfig(mockConfig); + + assertLeft(storesInfo) + .isInstanceOf(SecuritySetupException.class) + .satisfies(ex -> assertThat(ex.getMessage()).contains("Password is required for key store")); + + } + + @Test + void testStoresInfoCreationWithNoneValuesWithMissingConfigs() { + BaseConfig mockConfig = mock(BaseConfig.class); + when(mockConfig.getString(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)).thenReturn(null); + when(mockConfig.getString(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)).thenReturn(null); + when(mockConfig.getPassword(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).thenReturn(null); + + when(mockConfig.getString(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)).thenReturn(null); + when(mockConfig.getString(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)).thenReturn(null); + when(mockConfig.getPassword(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).thenReturn(null); + + val storesInfo = StoresInfo.fromConfig(mockConfig); + assertRight(storesInfo).isEqualTo( + new StoresInfo( + none(), + none(), + none() + ) + ); + } +} diff --git a/java-connectors/test-utils/src/test/java/io/lenses/streamreactor/test/utils/OptionValues.java b/java-connectors/test-utils/src/test/java/io/lenses/streamreactor/test/utils/OptionValues.java new file mode 100644 index 000000000..0d43868dc --- /dev/null +++ b/java-connectors/test-utils/src/test/java/io/lenses/streamreactor/test/utils/OptionValues.java @@ -0,0 +1,29 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.test.utils; + +import cyclops.control.Option; +import lombok.val; + +public class OptionValues { + + public static X getValue(Option opt) { + val ex = new AssertionError("Expected Some, got None"); + return opt.orElseGet(() -> { + throw ex; + }); + } +} diff --git a/kafka-connect-common/src/main/scala/io/lenses/streamreactor/common/utils/CyclopsToScalaOption.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/common/utils/CyclopsToScalaOption.scala new file mode 100644 index 000000000..550d1a3a6 --- /dev/null +++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/common/utils/CyclopsToScalaOption.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.utils + +import cyclops.control.{ Option => CyclopsOption } + +import scala.jdk.OptionConverters.RichOptional +import scala.{ Option => ScalaOption } + +/** + * Utility object for converting Cyclops Option to Scala Option. + * + * This object provides a method to convert an instance of Cyclops Option to a Scala Option. + */ +object CyclopsToScalaOption { + + /** + * Converts a Cyclops Option to a Scala Option. + * + * This method converts an instance of Cyclops Option to a Scala Option. + * + * @tparam M the type of the value contained in the Option + * @param cyclopsOption the Cyclops Option to convert + * @return the converted Scala Option + */ + def convertToScalaOption[M](cyclopsOption: CyclopsOption[M]): ScalaOption[M] = + cyclopsOption.toOptional.toScala + +} diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/KElasticClient.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/KElasticClient.scala index e20f5e487..dd2d416d0 100644 --- a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/KElasticClient.scala +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/KElasticClient.scala @@ -21,13 +21,14 @@ import com.sksamuel.elastic4s.http._ import com.sksamuel.elastic4s.http.bulk.BulkResponse import com.typesafe.scalalogging.StrictLogging import io.lenses.kcql.Kcql +import io.lenses.streamreactor.common.util.EitherUtils.unpackOrThrow import io.lenses.streamreactor.connect.elastic6.config.ElasticSettings import io.lenses.streamreactor.connect.elastic6.indexname.CreateIndex.getIndexNameForAutoCreate import org.apache.http.auth.AuthScope import org.apache.http.auth.UsernamePasswordCredentials +import org.apache.http.client.config.RequestConfig.Builder import org.apache.http.impl.client.BasicCredentialsProvider import org.apache.http.impl.nio.client.HttpAsyncClientBuilder -import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback import scala.concurrent.Future @@ -39,29 +40,29 @@ trait KElasticClient extends AutoCloseable { object KElasticClient extends StrictLogging { - def createHttpClient(settings: ElasticSettings, endpoints: Seq[ElasticNodeEndpoint]): KElasticClient = - if (settings.httpBasicAuthUsername.nonEmpty && settings.httpBasicAuthPassword.nonEmpty) { - lazy val provider = { - val provider = new BasicCredentialsProvider - val credentials = - new UsernamePasswordCredentials(settings.httpBasicAuthUsername, settings.httpBasicAuthPassword) + def createHttpClient(settings: ElasticSettings, endpoints: Seq[ElasticNodeEndpoint]): KElasticClient = { + val maybeProvider: Option[BasicCredentialsProvider] = { + for { + httpBasicAuthUsername <- Option.when(settings.httpBasicAuthUsername.nonEmpty)(settings.httpBasicAuthUsername) + httpBasicAuthPassword <- Option.when(settings.httpBasicAuthPassword.nonEmpty)(settings.httpBasicAuthPassword) + } yield { + val credentials = new UsernamePasswordCredentials(httpBasicAuthUsername, httpBasicAuthPassword) + val provider = new BasicCredentialsProvider provider.setCredentials(AuthScope.ANY, credentials) provider } - val callback = new HttpClientConfigCallback { - override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = - httpClientBuilder.setDefaultCredentialsProvider(provider) - } - val client: ElasticClient = ElasticClient( - ElasticProperties(endpoints), - requestConfigCallback = NoOpRequestConfigCallback, - httpClientConfigCallback = callback, - ) - new HttpKElasticClient(client) - } else { - val client: ElasticClient = ElasticClient(ElasticProperties(endpoints)) - new HttpKElasticClient(client) } + val client: ElasticClient = ElasticClient( + ElasticProperties(endpoints), + (requestConfigBuilder: Builder) => requestConfigBuilder, + (httpClientBuilder: HttpAsyncClientBuilder) => { + maybeProvider.foreach(httpClientBuilder.setDefaultCredentialsProvider) + unpackOrThrow(settings.storesInfo.toSslContext).map(httpClientBuilder.setSSLContext(_)) + httpClientBuilder + }, + ) + new HttpKElasticClient(client) + } } class HttpKElasticClient(client: ElasticClient) extends KElasticClient { diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/config/ElasticConfig.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/config/ElasticConfig.scala index b44b10a5e..24cd98725 100644 --- a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/config/ElasticConfig.scala +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/config/ElasticConfig.scala @@ -191,6 +191,7 @@ object ElasticConfig { ConfigDef.Width.MEDIUM, ElasticConfigConstants.PROGRESS_COUNTER_ENABLED_DISPLAY, ) + .withClientSslSupport() } /** diff --git a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/config/ElasticSettings.scala b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/config/ElasticSettings.scala index 9d3b86713..15f194afc 100644 --- a/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/config/ElasticSettings.scala +++ b/kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/config/ElasticSettings.scala @@ -15,8 +15,11 @@ */ package io.lenses.streamreactor.connect.elastic6.config +import cyclops.control.Option.none import io.lenses.kcql.Kcql import io.lenses.streamreactor.common.errors.ErrorPolicy +import io.lenses.streamreactor.common.security.StoresInfo +import io.lenses.streamreactor.common.util.EitherUtils.unpackOrThrow /** * Created by andrew@datamountaineer.com on 13/05/16. @@ -25,12 +28,13 @@ import io.lenses.streamreactor.common.errors.ErrorPolicy case class ElasticSettings( kcqls: Seq[Kcql], errorPolicy: ErrorPolicy, - taskRetries: Int = ElasticConfigConstants.NBR_OF_RETIRES_DEFAULT, - writeTimeout: Int = ElasticConfigConstants.WRITE_TIMEOUT_DEFAULT, - batchSize: Int = ElasticConfigConstants.BATCH_SIZE_DEFAULT, - pkJoinerSeparator: String = ElasticConfigConstants.PK_JOINER_SEPARATOR_DEFAULT, - httpBasicAuthUsername: String = ElasticConfigConstants.CLIENT_HTTP_BASIC_AUTH_USERNAME_DEFAULT, - httpBasicAuthPassword: String = ElasticConfigConstants.CLIENT_HTTP_BASIC_AUTH_USERNAME_DEFAULT, + taskRetries: Int = ElasticConfigConstants.NBR_OF_RETIRES_DEFAULT, + writeTimeout: Int = ElasticConfigConstants.WRITE_TIMEOUT_DEFAULT, + batchSize: Int = ElasticConfigConstants.BATCH_SIZE_DEFAULT, + pkJoinerSeparator: String = ElasticConfigConstants.PK_JOINER_SEPARATOR_DEFAULT, + httpBasicAuthUsername: String = ElasticConfigConstants.CLIENT_HTTP_BASIC_AUTH_USERNAME_DEFAULT, + httpBasicAuthPassword: String = ElasticConfigConstants.CLIENT_HTTP_BASIC_AUTH_USERNAME_DEFAULT, + storesInfo: StoresInfo = new StoresInfo(none(), none(), none()), ) object ElasticSettings { @@ -46,14 +50,16 @@ object ElasticSettings { val batchSize = config.getInt(ElasticConfigConstants.BATCH_SIZE_CONFIG) - ElasticSettings(kcql, - errorPolicy, - retries, - writeTimeout, - batchSize, - pkJoinerSeparator, - httpBasicAuthUsername, - httpBasicAuthPassword, + ElasticSettings( + kcql, + errorPolicy, + retries, + writeTimeout, + batchSize, + pkJoinerSeparator, + httpBasicAuthUsername, + httpBasicAuthPassword, + unpackOrThrow(StoresInfo.fromConfig(config)), ) } } diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/KElasticClient.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/KElasticClient.scala index 3733ab34c..209ff9217 100644 --- a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/KElasticClient.scala +++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/KElasticClient.scala @@ -25,6 +25,7 @@ import com.sksamuel.elastic4s.requests.bulk.BulkRequest import com.sksamuel.elastic4s.requests.bulk.BulkResponse import com.typesafe.scalalogging.StrictLogging import io.lenses.kcql.Kcql +import io.lenses.streamreactor.common.util.EitherUtils.unpackOrThrow import io.lenses.streamreactor.connect.elastic7.config.ElasticSettings import io.lenses.streamreactor.connect.elastic7.indexname.CreateIndex.getIndexNameForAutoCreate import org.apache.http.auth.AuthScope @@ -43,28 +44,31 @@ trait KElasticClient extends AutoCloseable { object KElasticClient extends StrictLogging { - def createHttpClient(settings: ElasticSettings, endpoints: Seq[ElasticNodeEndpoint]): KElasticClient = - if (settings.httpBasicAuthUsername.nonEmpty && settings.httpBasicAuthPassword.nonEmpty) { - lazy val provider = { - val provider = new BasicCredentialsProvider - val credentials = - new UsernamePasswordCredentials(settings.httpBasicAuthUsername, settings.httpBasicAuthPassword) + def createHttpClient(settings: ElasticSettings, endpoints: Seq[ElasticNodeEndpoint]): KElasticClient = { + val maybeProvider: Option[BasicCredentialsProvider] = { + for { + httpBasicAuthUsername <- Option.when(settings.httpBasicAuthUsername.nonEmpty)(settings.httpBasicAuthUsername) + httpBasicAuthPassword <- Option.when(settings.httpBasicAuthPassword.nonEmpty)(settings.httpBasicAuthPassword) + } yield { + val credentials = new UsernamePasswordCredentials(httpBasicAuthUsername, httpBasicAuthPassword) + val provider = new BasicCredentialsProvider provider.setCredentials(AuthScope.ANY, credentials) provider } - - val javaClient = JavaClient( + } + val client: ElasticClient = ElasticClient( + JavaClient( ElasticProperties(endpoints), (requestConfigBuilder: Builder) => requestConfigBuilder, - (httpClientBuilder: HttpAsyncClientBuilder) => httpClientBuilder.setDefaultCredentialsProvider(provider), - ) - - val client: ElasticClient = ElasticClient(javaClient) - new HttpKElasticClient(client) - } else { - val client: ElasticClient = ElasticClient(JavaClient(ElasticProperties(endpoints))) - new HttpKElasticClient(client) - } + (httpClientBuilder: HttpAsyncClientBuilder) => { + maybeProvider.foreach(httpClientBuilder.setDefaultCredentialsProvider) + unpackOrThrow(settings.storesInfo.toSslContext).map(httpClientBuilder.setSSLContext(_)) + httpClientBuilder + }, + ), + ) + new HttpKElasticClient(client) + } } class HttpKElasticClient(client: ElasticClient) extends KElasticClient { @@ -80,7 +84,6 @@ class HttpKElasticClient(client: ElasticClient) extends KElasticClient { createIndex(indexName) } } - () } diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/config/ElasticConfig.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/config/ElasticConfig.scala index 31237b4c6..f1a542ca9 100644 --- a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/config/ElasticConfig.scala +++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/config/ElasticConfig.scala @@ -191,6 +191,7 @@ object ElasticConfig { ConfigDef.Width.MEDIUM, ElasticConfigConstants.PROGRESS_COUNTER_ENABLED_DISPLAY, ) + .withClientSslSupport() } /** diff --git a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/config/ElasticSettings.scala b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/config/ElasticSettings.scala index 83341edd3..e1cd3e506 100644 --- a/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/config/ElasticSettings.scala +++ b/kafka-connect-elastic7/src/main/scala/io/lenses/streamreactor/connect/elastic7/config/ElasticSettings.scala @@ -15,8 +15,11 @@ */ package io.lenses.streamreactor.connect.elastic7.config +import cyclops.control.Option.none import io.lenses.kcql.Kcql import io.lenses.streamreactor.common.errors.ErrorPolicy +import io.lenses.streamreactor.common.security.StoresInfo +import io.lenses.streamreactor.common.util.EitherUtils.unpackOrThrow /** * Created by andrew@datamountaineer.com on 13/05/16. @@ -25,12 +28,13 @@ import io.lenses.streamreactor.common.errors.ErrorPolicy case class ElasticSettings( kcqls: Seq[Kcql], errorPolicy: ErrorPolicy, - taskRetries: Int = ElasticConfigConstants.NBR_OF_RETIRES_DEFAULT, - writeTimeout: Int = ElasticConfigConstants.WRITE_TIMEOUT_DEFAULT, - batchSize: Int = ElasticConfigConstants.BATCH_SIZE_DEFAULT, - pkJoinerSeparator: String = ElasticConfigConstants.PK_JOINER_SEPARATOR_DEFAULT, - httpBasicAuthUsername: String = ElasticConfigConstants.CLIENT_HTTP_BASIC_AUTH_USERNAME_DEFAULT, - httpBasicAuthPassword: String = ElasticConfigConstants.CLIENT_HTTP_BASIC_AUTH_USERNAME_DEFAULT, + taskRetries: Int = ElasticConfigConstants.NBR_OF_RETIRES_DEFAULT, + writeTimeout: Int = ElasticConfigConstants.WRITE_TIMEOUT_DEFAULT, + batchSize: Int = ElasticConfigConstants.BATCH_SIZE_DEFAULT, + pkJoinerSeparator: String = ElasticConfigConstants.PK_JOINER_SEPARATOR_DEFAULT, + httpBasicAuthUsername: String = ElasticConfigConstants.CLIENT_HTTP_BASIC_AUTH_USERNAME_DEFAULT, + httpBasicAuthPassword: String = ElasticConfigConstants.CLIENT_HTTP_BASIC_AUTH_USERNAME_DEFAULT, + storesInfo: StoresInfo = new StoresInfo(none(), none(), none()), ) object ElasticSettings { @@ -46,14 +50,16 @@ object ElasticSettings { val batchSize = config.getInt(ElasticConfigConstants.BATCH_SIZE_CONFIG) - ElasticSettings(kcql, - errorPolicy, - retries, - writeTimeout, - batchSize, - pkJoinerSeparator, - httpBasicAuthUsername, - httpBasicAuthPassword, + ElasticSettings( + kcql, + errorPolicy, + retries, + writeTimeout, + batchSize, + pkJoinerSeparator, + httpBasicAuthUsername, + httpBasicAuthPassword, + unpackOrThrow(StoresInfo.fromConfig(config)), ) } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 26042528e..2dd45a0b1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -192,7 +192,8 @@ object Dependencies { val `junitJupiterParams` = "org.junit.jupiter" % "junit-jupiter-params" % junitJupiterVersion val `assertjCore` = "org.assertj" % "assertj-core" % assertjCoreVersion - val `cyclops` = "com.oath.cyclops" % "cyclops" % cyclopsVersion + val `cyclops` = "com.oath.cyclops" % "cyclops" % cyclopsVersion + val `cyclopsPure` = "com.oath.cyclops" % "cyclops-pure" % cyclopsVersion val catsEffectScalatest = "org.typelevel" %% "cats-effect-testing-scalatest" % `cats-effect-testing` @@ -437,7 +438,7 @@ trait Dependencies { jerseyCommon, avro4s, kafkaClients, - ) ++ enumeratum ++ circe ++ http4s + ) ++ enumeratum ++ circe ++ http4s ++ bouncyCastle //Specific modules dependencies val sqlCommonDeps: Seq[ModuleID] = loggingDeps ++ Seq( @@ -468,8 +469,9 @@ trait Dependencies { confluentJsonSchemaSerializer, ) ++ enumeratum ++ circe - val javaCommonDeps: Seq[ModuleID] = Seq(lombok, kafkaConnectJson, kafkaClients, cyclops) - val javaCommonTestDeps: Seq[ModuleID] = Seq(junitJupiter, junitJupiterParams, assertjCore, `mockitoJava`, logback) + val javaCommonDeps: Seq[ModuleID] = Seq(lombok, kafkaConnectJson, kafkaClients, cyclops, `cyclopsPure`) + val javaCommonTestDeps: Seq[ModuleID] = + Seq(junitJupiter, junitJupiterParams, assertjCore, `mockitoJava`, logback) ++ bouncyCastle //Specific modules dependencies