Skip to content

Commit

Permalink
Header fix?
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan committed May 10, 2023
1 parent e263402 commit b9a8b70
Showing 1 changed file with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.connect.source.SourceRecord
import org.apache.kafka.connect.source.SourceTask

import java.nio.charset.CodingErrorAction
import scala.io.{Codec, Source}
import scala.jdk.CollectionConverters.ListHasAsScala
import scala.jdk.CollectionConverters.MapHasAsScala
import scala.util.Failure
Expand All @@ -43,11 +45,7 @@ class MqttSourceTask extends SourceTask with StrictLogging {
private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)

override def start(props: util.Map[String, String]): Unit = {

logger.info(scala.io.Source.fromInputStream(
this.getClass.getResourceAsStream("/mqtt-source-ascii.txt"),
).mkString + s" $version")
logger.info(manifest.printManifest())
printAsciiHeader()

val conf = if (context.configs().isEmpty) props else context.configs()

Expand Down Expand Up @@ -90,6 +88,17 @@ class MqttSourceTask extends SourceTask with StrictLogging {
enableProgress = settings.enableProgress
}

private def printAsciiHeader(): Unit = {
implicit val codec: Codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
logger.info(
Source.fromInputStream(
getClass.getResourceAsStream("/mqtt-source-ascii.txt"),
).mkString + s" $version")
logger.info(manifest.printManifest())
}

/**
* Get all the messages accumulated so far.
*/
Expand Down

0 comments on commit b9a8b70

Please sign in to comment.