From 61a7d89885ba13de47cf6f964236fbbb7633d546 Mon Sep 17 00:00:00 2001 From: stheppi Date: Thu, 2 Nov 2023 10:26:58 +0000 Subject: [PATCH] Fix the build errors --- .../redis/sink/JedisClientBuilder.scala | 60 +++++++++++++++++++ .../connect/redis/sink/RedisSinkTask.scala | 26 ++++---- 2 files changed, 72 insertions(+), 14 deletions(-) create mode 100644 kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/JedisClientBuilder.scala diff --git a/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/JedisClientBuilder.scala b/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/JedisClientBuilder.scala new file mode 100644 index 000000000..8305dfb62 --- /dev/null +++ b/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/JedisClientBuilder.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datamountaineer.streamreactor.connect.redis.sink + +import com.datamountaineer.streamreactor.connect.redis.sink.config.RedisSinkSettings +import redis.clients.jedis.Jedis + +import java.io.File +import java.io.FileNotFoundException + +object JedisClientBuilder { + def createClient(sinkSettings: RedisSinkSettings): Jedis = { + val connection = sinkSettings.connectionInfo + + if (connection.isSslConnection) { + connection.keyStoreFilepath match { + case Some(path) => + if (!new File(path).exists) { + throw new FileNotFoundException(s"Keystore not found in: [$path]") + } + + System.setProperty("javax.net.ssl.keyStorePassword", connection.keyStorePassword.getOrElse("")) + System.setProperty("javax.net.ssl.keyStore", path) + System.setProperty("javax.net.ssl.keyStoreType", connection.keyStoreType.getOrElse("jceks")) + + case None => + } + + connection.trustStoreFilepath match { + case Some(path) => + if (!new File(path).exists) { + throw new FileNotFoundException(s"Truststore not found in: $path") + } + + System.setProperty("javax.net.ssl.trustStorePassword", connection.trustStorePassword.getOrElse("")) + System.setProperty("javax.net.ssl.trustStore", path) + System.setProperty("javax.net.ssl.trustStoreType", connection.trustStoreType.getOrElse("jceks")) + + case None => + } + } + + val jedis = new Jedis(connection.host, connection.port, connection.isSslConnection) + connection.password.foreach(p => jedis.auth(p)) + jedis + } +} diff --git a/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/RedisSinkTask.scala b/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/RedisSinkTask.scala index b8e901ce3..8b952e68a 100644 --- a/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/RedisSinkTask.scala +++ b/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/RedisSinkTask.scala @@ -16,6 +16,7 @@ package com.datamountaineer.streamreactor.connect.redis.sink import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy +import com.datamountaineer.streamreactor.common.sink.DbWriter import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader import com.datamountaineer.streamreactor.common.utils.JarManifest import com.datamountaineer.streamreactor.common.utils.ProgressCounter @@ -40,7 +41,7 @@ import scala.jdk.CollectionConverters.ListHasAsScala * target sink */ class RedisSinkTask extends SinkTask with StrictLogging { - var writer: List[RedisWriter] = List[RedisWriter]() + var writer: List[DbWriter] = List[DbWriter]() private val progressCounter = new ProgressCounter private var enableProgress: Boolean = false private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation) @@ -84,34 +85,31 @@ class RedisSinkTask extends SinkTask with StrictLogging { val mode_STREAM = filterStream(settings) + val jedis = JedisClientBuilder.createClient(settings) + //-- Start as many writers as required writer = (modeCache.kcqlSettings.headOption.map { _ => logger.info(s"Starting [${modeCache.kcqlSettings.size}] KCQLs with Redis Cache mode") - val writer = new RedisCache(modeCache) - writer.createClient(settings) + val writer = new RedisCache(modeCache, jedis) List(writer) } ++ mode_INSERT_SS.kcqlSettings.headOption.map { _ => logger.info(s"Starting ${mode_INSERT_SS.kcqlSettings.size}] KCQLs with Redis Insert Sorted Set mode") - val writer = new RedisInsertSortedSet(mode_INSERT_SS) - writer.createClient(settings) + val writer = new RedisInsertSortedSet(mode_INSERT_SS, jedis) List(writer) } ++ mode_PUBSUB.kcqlSettings.headOption.map { _ => logger.info(s"Starting [${mode_PUBSUB.kcqlSettings.size}] KCQLs with Redis PubSub mode") - val writer = new RedisPubSub(mode_PUBSUB) - writer.createClient(settings) + val writer = new RedisPubSub(mode_PUBSUB, jedis) List(writer) } ++ mode_PK_SS.kcqlSettings.headOption.map { _ => logger.info(s"Starting [${mode_PK_SS.kcqlSettings.size}] KCQLs with Redis Multiple Sorted Sets mode") - val writer = new RedisMultipleSortedSets(mode_PK_SS) - writer.createClient(settings) + val writer = new RedisMultipleSortedSets(mode_PK_SS, jedis) List(writer) } ++ mode_GEOADD.kcqlSettings.headOption.map { _ => logger.info(s"Starting [${mode_GEOADD.kcqlSettings.size}] KCQLs with Redis Geo Add mode") - List(new RedisGeoAdd(mode_GEOADD)) + List(new RedisGeoAdd(mode_GEOADD, jedis)) } ++ mode_STREAM.kcqlSettings.headOption.map { _ => logger.info(s"Starting [${mode_STREAM.kcqlSettings.size}] KCQLs with Redis Stream mode") - val writer = new RedisStreams(mode_STREAM) - writer.createClient(settings) + val writer = new RedisStreams(mode_STREAM, jedis) List(writer) }).flatten.toList @@ -163,7 +161,7 @@ class RedisSinkTask extends SinkTask with StrictLogging { * @param settings The RedisSinkSettings containing all kcqlConfigs. * @return A RedisSinkSettings object containing only the kcqlConfigs that use the PubSub mode. */ - def filterModePubSub(settings: RedisSinkSettings): RedisSinkSettings = settings.copy( + private def filterModePubSub(settings: RedisSinkSettings): RedisSinkSettings = settings.copy( kcqlSettings = settings.kcqlSettings .filter { k => @@ -200,7 +198,7 @@ class RedisSinkTask extends SinkTask with StrictLogging { * @param settings The RedisSinkSettings containing all kcqlConfigs. * @return A RedisSinkSettings object containing only the kcqlConfigs that use the Geo Add mode. */ - def filterGeoAddMode(settings: RedisSinkSettings): RedisSinkSettings = settings.copy( + private def filterGeoAddMode(settings: RedisSinkSettings): RedisSinkSettings = settings.copy( kcqlSettings = settings.kcqlSettings .filter { k =>