Skip to content

Commit

Permalink
Fix the build errors
Browse files Browse the repository at this point in the history
  • Loading branch information
stheppi committed Nov 2, 2023
1 parent 1f7947f commit 61a7d89
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datamountaineer.streamreactor.connect.redis.sink

import com.datamountaineer.streamreactor.connect.redis.sink.config.RedisSinkSettings
import redis.clients.jedis.Jedis

import java.io.File
import java.io.FileNotFoundException

object JedisClientBuilder {
def createClient(sinkSettings: RedisSinkSettings): Jedis = {
val connection = sinkSettings.connectionInfo

if (connection.isSslConnection) {
connection.keyStoreFilepath match {
case Some(path) =>
if (!new File(path).exists) {
throw new FileNotFoundException(s"Keystore not found in: [$path]")
}

System.setProperty("javax.net.ssl.keyStorePassword", connection.keyStorePassword.getOrElse(""))
System.setProperty("javax.net.ssl.keyStore", path)
System.setProperty("javax.net.ssl.keyStoreType", connection.keyStoreType.getOrElse("jceks"))

case None =>
}

connection.trustStoreFilepath match {
case Some(path) =>
if (!new File(path).exists) {
throw new FileNotFoundException(s"Truststore not found in: $path")
}

System.setProperty("javax.net.ssl.trustStorePassword", connection.trustStorePassword.getOrElse(""))
System.setProperty("javax.net.ssl.trustStore", path)
System.setProperty("javax.net.ssl.trustStoreType", connection.trustStoreType.getOrElse("jceks"))

case None =>
}
}

val jedis = new Jedis(connection.host, connection.port, connection.isSslConnection)
connection.password.foreach(p => jedis.auth(p))
jedis
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.datamountaineer.streamreactor.connect.redis.sink

import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
import com.datamountaineer.streamreactor.common.sink.DbWriter
import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import com.datamountaineer.streamreactor.common.utils.ProgressCounter
Expand All @@ -40,7 +41,7 @@ import scala.jdk.CollectionConverters.ListHasAsScala
* target sink
*/
class RedisSinkTask extends SinkTask with StrictLogging {
var writer: List[RedisWriter] = List[RedisWriter]()
var writer: List[DbWriter] = List[DbWriter]()
private val progressCounter = new ProgressCounter
private var enableProgress: Boolean = false
private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
Expand Down Expand Up @@ -84,34 +85,31 @@ class RedisSinkTask extends SinkTask with StrictLogging {

val mode_STREAM = filterStream(settings)

val jedis = JedisClientBuilder.createClient(settings)

//-- Start as many writers as required
writer = (modeCache.kcqlSettings.headOption.map { _ =>
logger.info(s"Starting [${modeCache.kcqlSettings.size}] KCQLs with Redis Cache mode")
val writer = new RedisCache(modeCache)
writer.createClient(settings)
val writer = new RedisCache(modeCache, jedis)
List(writer)
} ++ mode_INSERT_SS.kcqlSettings.headOption.map { _ =>
logger.info(s"Starting ${mode_INSERT_SS.kcqlSettings.size}] KCQLs with Redis Insert Sorted Set mode")
val writer = new RedisInsertSortedSet(mode_INSERT_SS)
writer.createClient(settings)
val writer = new RedisInsertSortedSet(mode_INSERT_SS, jedis)
List(writer)
} ++ mode_PUBSUB.kcqlSettings.headOption.map { _ =>
logger.info(s"Starting [${mode_PUBSUB.kcqlSettings.size}] KCQLs with Redis PubSub mode")
val writer = new RedisPubSub(mode_PUBSUB)
writer.createClient(settings)
val writer = new RedisPubSub(mode_PUBSUB, jedis)
List(writer)
} ++ mode_PK_SS.kcqlSettings.headOption.map { _ =>
logger.info(s"Starting [${mode_PK_SS.kcqlSettings.size}] KCQLs with Redis Multiple Sorted Sets mode")
val writer = new RedisMultipleSortedSets(mode_PK_SS)
writer.createClient(settings)
val writer = new RedisMultipleSortedSets(mode_PK_SS, jedis)
List(writer)
} ++ mode_GEOADD.kcqlSettings.headOption.map { _ =>
logger.info(s"Starting [${mode_GEOADD.kcqlSettings.size}] KCQLs with Redis Geo Add mode")
List(new RedisGeoAdd(mode_GEOADD))
List(new RedisGeoAdd(mode_GEOADD, jedis))
} ++ mode_STREAM.kcqlSettings.headOption.map { _ =>
logger.info(s"Starting [${mode_STREAM.kcqlSettings.size}] KCQLs with Redis Stream mode")
val writer = new RedisStreams(mode_STREAM)
writer.createClient(settings)
val writer = new RedisStreams(mode_STREAM, jedis)
List(writer)
}).flatten.toList

Expand Down Expand Up @@ -163,7 +161,7 @@ class RedisSinkTask extends SinkTask with StrictLogging {
* @param settings The RedisSinkSettings containing all kcqlConfigs.
* @return A RedisSinkSettings object containing only the kcqlConfigs that use the PubSub mode.
*/
def filterModePubSub(settings: RedisSinkSettings): RedisSinkSettings = settings.copy(
private def filterModePubSub(settings: RedisSinkSettings): RedisSinkSettings = settings.copy(
kcqlSettings =
settings.kcqlSettings
.filter { k =>
Expand Down Expand Up @@ -200,7 +198,7 @@ class RedisSinkTask extends SinkTask with StrictLogging {
* @param settings The RedisSinkSettings containing all kcqlConfigs.
* @return A RedisSinkSettings object containing only the kcqlConfigs that use the Geo Add mode.
*/
def filterGeoAddMode(settings: RedisSinkSettings): RedisSinkSettings = settings.copy(
private def filterGeoAddMode(settings: RedisSinkSettings): RedisSinkSettings = settings.copy(
kcqlSettings =
settings.kcqlSettings
.filter { k =>
Expand Down

0 comments on commit 61a7d89

Please sign in to comment.