Skip to content

Commit

Permalink
Ascii Art Loading Fix (All Connectors) (#941)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan authored May 12, 2023
1 parent e263402 commit 6df7680
Show file tree
Hide file tree
Showing 23 changed files with 145 additions and 76 deletions.
28 changes: 28 additions & 0 deletions kafka-connect-aws-s3/src/main/resources/aws-s3-sink-ascii.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

████████▀▀▀▀▀███████████████████████████████████████████████████████████████████
█████▀ ▀████████████████████████████████████████████████████████████████
███▀ ▄█████▄ ▀██████████████████████████████████████████████████████████████
███ ▄███████▄ ██████ █████▌ █▌ ████ ███ ▄▄ ██ ███ ▄▄ ███
███ █████████ ██████ █████▌ ██████▌ ▀██ ██ ██████ ██████ ███████
███ ▀███████▀ ██████ █████▌ ██▌ █▄ █ ███▄▄ ██ ███▄▄ ███
████▄ ▄███████ █████▌ ██████▌ ███ ███████ █ ███████████ ██
█████████ ████████████ ▌ █▌ ████▄ ██▄ ▄██ █▄ ▄███
█████████ ████████████████████████████████████████████████████████████████████
█████████ ▄████████████████████████████████████████████████████████████████████
████████████████████████████████████████████████████████████████████████████████
__ _______ _____ ____
/\ \ / / ____| / ____|___ \
/ \ \ /\ / / (___ | (___ __) |
/ /\ \ \/ \/ / \___ \ \___ \ |__ <
/ ____ \ /\ / ____) | ____) |___) |
/_/____\_\/ \/ _|_____/ |_____/|____/
/ ____(_) | |
| (___ _ _ __ | | __
\___ \| | '_ \| |/ /
____) | | | | | <
|_____/|_|_| |_|_|\_\ _
/ ____| | |
| | ___ _ __ _ __ ___ ___| |_ ___ _ __
| | / _ \| '_ \| '_ \ / _ \/ __| __/ _ \| '__|
| |___| (_) | | | | | | | __/ (__| || (_) | |
\_____\___/|_| |_|_| |_|\___|\___|\__\___/|_|
28 changes: 28 additions & 0 deletions kafka-connect-aws-s3/src/main/resources/aws-s3-source-ascii.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

████████▀▀▀▀▀███████████████████████████████████████████████████████████████████
█████▀ ▀████████████████████████████████████████████████████████████████
███▀ ▄█████▄ ▀██████████████████████████████████████████████████████████████
███ ▄███████▄ ██████ █████▌ █▌ ████ ███ ▄▄ ██ ███ ▄▄ ███
███ █████████ ██████ █████▌ ██████▌ ▀██ ██ ██████ ██████ ███████
███ ▀███████▀ ██████ █████▌ ██▌ █▄ █ ███▄▄ ██ ███▄▄ ███
████▄ ▄███████ █████▌ ██████▌ ███ ███████ █ ███████████ ██
█████████ ████████████ ▌ █▌ ████▄ ██▄ ▄██ █▄ ▄███
█████████ ████████████████████████████████████████████████████████████████████
█████████ ▄████████████████████████████████████████████████████████████████████
████████████████████████████████████████████████████████████████████████████████
__ _______ _____ ____
/\ \ / / ____| / ____|___ \
/ \ \ /\ / / (___ | (___ __) |
/ /\ \ \/ \/ / \___ \ \___ \ |__ <
/ ____ \ /\ / ____) | ____) |___) |
/_/____\_\/ \/ |_____/ |_____/|____/
/ ____|
| (___ ___ _ _ _ __ ___ ___
\___ \ / _ \| | | | '__/ __/ _ \
____) | (_) | |_| | | | (_| __/
|_____/ \___/ \__,_|_| \___\___| _
/ ____| | |
| | ___ _ __ _ __ ___ ___| |_ ___ _ __
| | / _ \| '_ \| '_ \ / _ \/ __| __/ _ \| '__|
| |___| (_) | | | | | | | __/ (__| || (_) | |
\_____\___/|_| |_|_| |_|\___|\___|\__\___/|_|
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.lenses.streamreactor.connect.aws.s3.sink
import cats.implicits._
import com.datamountaineer.streamreactor.common.errors.ErrorHandler
import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.connect.aws.s3.auth.AuthResources
import io.lenses.streamreactor.connect.aws.s3.config.S3Config
Expand Down Expand Up @@ -50,6 +51,9 @@ class S3SinkTask extends SinkTask with ErrorHandler {
override def version(): String = manifest.version()

override def start(props: util.Map[String, String]): Unit = {

printAsciiHeader(manifest, "/aws-s3-sink-ascii.txt")

sinkName = getSinkName(props).getOrElse("MissingSinkName")

logger.debug(s"[{}] S3SinkTask.start", sinkName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.lenses.streamreactor.connect.aws.s3.source

import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.aws.s3.auth.AuthResources
Expand Down Expand Up @@ -55,6 +56,9 @@ class S3SourceTask extends SourceTask with LazyLogging {
* Start sets up readers for every configured connection in the properties
*/
override def start(props: util.Map[String, String]): Unit = {

printAsciiHeader(manifest, "/aws-s3-source-ascii.txt")

sourceName = getSourceName(props).getOrElse("MissingSourceName")

logger.debug(s"Received call to S3SourceTask.start with ${props.size()} properties")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.datamountaineer.streamreactor.connect.azure.documentdb.sink

import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import com.datamountaineer.streamreactor.common.utils.ProgressCounter
import com.datamountaineer.streamreactor.connect.azure.documentdb.config.DocumentDbConfig
Expand Down Expand Up @@ -57,10 +58,7 @@ class DocumentDbSinkTask extends SinkTask with StrictLogging {
case Success(s) => s
}

logger.info(scala.io.Source.fromInputStream(
this.getClass.getResourceAsStream("/documentdb-sink-ascii.txt"),
).mkString + s" $version")
logger.info(manifest.printManifest())
printAsciiHeader(manifest, "/documentdb-sink-ascii.txt")

writer = Some(DocumentDbWriter(taskConfig, context))
enableProgress = taskConfig.getBoolean(DocumentDbConfigConstants.PROGRESS_COUNTER_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.datamountaineer.streamreactor.connect.cassandra.sink

import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import com.datamountaineer.streamreactor.common.utils.ProgressCounter

Expand Down Expand Up @@ -50,10 +51,7 @@ class CassandraSinkTask extends SinkTask with StrictLogging {
* Parse the configurations and setup the writer
*/
override def start(props: util.Map[String, String]): Unit = {
logger.info(
scala.io.Source.fromInputStream(getClass.getResourceAsStream("/cass-sink-ascii.txt")).mkString + s" $version",
)
logger.info(manifest.printManifest())
printAsciiHeader(manifest, "/cass-sink-ascii.txt")

val config = if (context.configs().isEmpty) props else context.configs()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.datamountaineer.streamreactor.connect.cassandra.source

import com.datamountaineer.streamreactor.common.queues.QueueHelpers
import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest

import java.util
Expand Down Expand Up @@ -62,11 +63,7 @@ class CassandraSourceTask extends SourceTask with StrictLogging {
* @param props A map of supplied properties.
*/
override def start(props: util.Map[String, String]): Unit = {

logger.info(
scala.io.Source.fromInputStream(getClass.getResourceAsStream("/cass-source-ascii.txt")).mkString + version,
)
logger.info(manifest.printManifest())
printAsciiHeader(manifest, "/cass-source-ascii.txt")

val config = if (context.configs().isEmpty) props else context.configs()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datamountaineer.streamreactor.common.utils

import com.typesafe.scalalogging.LazyLogging

import java.nio.charset.CodingErrorAction
import scala.io.Codec
import scala.io.Source

object AsciiArtPrinter extends LazyLogging {

def printAsciiHeader(manifest: JarManifest, asciiArtResource: String): Unit = {
implicit val codec: Codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
logger.info(
Source.fromInputStream(
getClass.getResourceAsStream(asciiArtResource),
).mkString + s" ${manifest.version()}",
)
logger.info(manifest.printManifest())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.datamountaineer.streamreactor.connect.elastic6

import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import com.datamountaineer.streamreactor.common.utils.ProgressCounter
import com.datamountaineer.streamreactor.connect.elastic6.config.ElasticConfig
Expand All @@ -40,10 +41,7 @@ class ElasticSinkTask extends SinkTask with StrictLogging {
* Parse the configurations and setup the writer
*/
override def start(props: util.Map[String, String]): Unit = {
logger.info(
scala.io.Source.fromInputStream(getClass.getResourceAsStream("/elastic-ascii.txt")).mkString + s" $version",
)
logger.info(manifest.printManifest())
printAsciiHeader(manifest, "/elastic-ascii.txt")

val conf = if (context.configs().isEmpty) props else context.configs()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.datamountaineer.streamreactor.connect.elastic7

import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import com.datamountaineer.streamreactor.common.utils.ProgressCounter
import com.datamountaineer.streamreactor.connect.elastic7.config.ElasticConfig
Expand All @@ -40,10 +41,7 @@ class ElasticSinkTask extends SinkTask with StrictLogging {
* Parse the configurations and setup the writer
*/
override def start(props: util.Map[String, String]): Unit = {
logger.info(
scala.io.Source.fromInputStream(getClass.getResourceAsStream("/elastic-ascii.txt")).mkString + s" $version",
)
logger.info(manifest.printManifest())
printAsciiHeader(manifest, "/elastic-ascii.txt")

val conf = if (context.configs().isEmpty) props else context.configs()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.datamountaineer.streamreactor.connect.ftp.source

import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.connect.connector.Task
Expand Down Expand Up @@ -44,9 +45,9 @@ class FtpSourceConnector extends SourceConnector with StrictLogging {
logger.info("stop")

override def start(props: util.Map[String, String]): Unit = {
logger.info(
scala.io.Source.fromInputStream(getClass.getResourceAsStream("/ftp-source-ascii.txt")).mkString + s" $version",
)

printAsciiHeader(manifest, "/ftp-source-ascii.txt")

logger.info(s"start FtpSourceConnector")

configProps = Some(props)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.datamountaineer.streamreactor.connect.hazelcast.sink

import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import com.datamountaineer.streamreactor.common.utils.ProgressCounter
import com.datamountaineer.streamreactor.connect.hazelcast.config.HazelCastSinkConfig
Expand Down Expand Up @@ -45,10 +46,7 @@ class HazelCastSinkTask extends SinkTask with StrictLogging {
* Parse the configurations and setup the writer
*/
override def start(props: util.Map[String, String]): Unit = {
logger.info(
scala.io.Source.fromInputStream(getClass.getResourceAsStream("/hazelcast-ascii.txt")).mkString + s" $version",
)
logger.info(manifest.printManifest())
printAsciiHeader(manifest, "/hazelcast-ascii.txt")

if (Option(System.getProperty("hazelcast.logging.type")).isEmpty) {
System.setProperty("hazelcast.logging.type", "slf4j")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.datamountaineer.streamreactor.connect.hbase

import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import com.datamountaineer.streamreactor.common.utils.ProgressCounter
import com.datamountaineer.streamreactor.connect.hbase.config.ConfigurationBuilder
Expand Down Expand Up @@ -53,8 +54,9 @@ class HbaseSinkTask extends SinkTask with StrictLogging {
* Parse the configurations and setup the writer
*/
override def start(props: util.Map[String, String]): Unit = {
logger.info(scala.io.Source.fromInputStream(getClass.getResourceAsStream("/hbase-ascii.txt")).mkString)
logger.info(manifest.printManifest())

printAsciiHeader(manifest, "/hbase-ascii.txt")

val conf = if (context.configs().isEmpty) props else context.configs()

HBaseConfig.config.parse(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.datamountaineer.streamreactor.connect.influx

import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import com.datamountaineer.streamreactor.common.utils.ProgressCounter
import com.datamountaineer.streamreactor.connect.influx.config.InfluxConfig
Expand Down Expand Up @@ -48,10 +49,7 @@ class InfluxSinkTask extends SinkTask with StrictLogging {
* Parse the configurations and setup the writer
*/
override def start(props: util.Map[String, String]): Unit = {
logger.info(
scala.io.Source.fromInputStream(getClass.getResourceAsStream("/influx-ascii.txt")).mkString + s" $version",
)
logger.info(manifest.printManifest())
printAsciiHeader(manifest, "/influx-ascii.txt")

val conf = if (context.configs().isEmpty) props else context.configs()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.datamountaineer.streamreactor.connect.jms.sink

import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import com.datamountaineer.streamreactor.common.utils.ProgressCounter
import com.datamountaineer.streamreactor.connect.jms.config.JMSConfig
Expand Down Expand Up @@ -47,10 +48,7 @@ class JMSSinkTask extends SinkTask with StrictLogging {
* Parse the configurations and setup the writer
*/
override def start(props: util.Map[String, String]): Unit = {
logger.info(
scala.io.Source.fromInputStream(getClass.getResourceAsStream("/jms-sink-ascii.txt")).mkString + s" $version",
)
logger.info(manifest.printManifest())
printAsciiHeader(manifest, "/jms-sink-ascii.txt")

val conf = if (context.configs().isEmpty) props else context.configs()
JMSConfig.config.parse(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.datamountaineer.streamreactor.connect.jms.source

import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import com.datamountaineer.streamreactor.common.utils.JarManifest
import com.datamountaineer.streamreactor.common.utils.ProgressCounter
import com.datamountaineer.streamreactor.connect.jms.config.JMSConfig
Expand Down Expand Up @@ -55,10 +56,7 @@ class JMSSourceTask extends SourceTask with StrictLogging {
private var evictThreshold: Int = 0

override def start(props: util.Map[String, String]): Unit = {
logger.info(
scala.io.Source.fromInputStream(getClass.getResourceAsStream("/jms-source-ascii.txt")).mkString + s" $version",
)
logger.info(manifest.printManifest())
printAsciiHeader(manifest, "/jms-source-ascii.txt")

val conf = if (context.configs().isEmpty) props else context.configs()

Expand Down
Loading

0 comments on commit 6df7680

Please sign in to comment.