From 29672397c381e12f9b38b0dd1d86665b736524f6 Mon Sep 17 00:00:00 2001 From: Amir Halatzi Date: Wed, 29 Dec 2021 13:43:40 +0200 Subject: [PATCH] feat(kafka): add extraOptions to KafkaOutputWriter (#467) --- config/job_config_sample.yaml | 3 +++ .../writers/kafka/KafkaOutputWriter.scala | 25 ++++++++++--------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/config/job_config_sample.yaml b/config/job_config_sample.yaml index 14aadc4a4..745911020 100644 --- a/config/job_config_sample.yaml +++ b/config/job_config_sample.yaml @@ -52,6 +52,9 @@ inputs: topic: some_topic schemaRegistryUrl: https://schema-registry-url # optional schemaSubject: subject # optional + # Add any other options supported by the DataStreamWriter/Kafka Producer + extraOptions: + opt: val input_6: cassandra: host: 127.0.0.1 diff --git a/src/main/scala/com/yotpo/metorikku/output/writers/kafka/KafkaOutputWriter.scala b/src/main/scala/com/yotpo/metorikku/output/writers/kafka/KafkaOutputWriter.scala index a13edc4fd..13af9dbfc 100644 --- a/src/main/scala/com/yotpo/metorikku/output/writers/kafka/KafkaOutputWriter.scala +++ b/src/main/scala/com/yotpo/metorikku/output/writers/kafka/KafkaOutputWriter.scala @@ -14,7 +14,8 @@ class KafkaOutputWriter(props: Map[String, String], config: Option[Kafka]) exten valueColumn: String, outputMode: String, triggerType: Option[String], - triggerDuration: String) + triggerDuration: String, + extraOptions: Option[Map[String, String]]) val log: Logger = LogManager.getLogger(this.getClass) @@ -33,22 +34,25 @@ class KafkaOutputWriter(props: Map[String, String], config: Option[Kafka]) exten valueColumn, props.getOrElse("outputMode", "append"), props.get("triggerType"), - props.getOrElse("triggerDuration", "10 seconds")) + props.getOrElse("triggerDuration", "10 seconds"), + props.get("extraOptions").asInstanceOf[Option[Map[String, String]]]) override def write(dataFrame: DataFrame): Unit = { config match { case Some(kafkaConfig) => - val bootstrapServers = kafkaConfig.servers.mkString(",") - log.info(s"Writing Dataframe to Kafka Topic ${kafkaOptions.topic}") val df: DataFrame = selectedColumnsDataframe(dataFrame) - df.write.format("kafka") - .option("kafka.bootstrap.servers", bootstrapServers) - .option("topic", kafkaOptions.topic) - .save() + log.info(s"Writing Dataframe to Kafka Topic ${kafkaOptions.topic}") + df.write.format("kafka").options(getKafkaOptions(kafkaConfig)).save() + case None => } } + private def getKafkaOptions[T](kafkaConfig:Kafka):Map[String,String] = { + Map[String,String]("kafka.bootstrap.servers"->kafkaConfig.servers.mkString(","), + "topic"->kafkaOptions.topic)++kafkaOptions.extraOptions.getOrElse(Map[String, String]()) + } + private def selectedColumnsDataframe(dataFrame: DataFrame) = { val selectExpression = kafkaOptions.keyColumn match { case None => @@ -62,14 +66,11 @@ class KafkaOutputWriter(props: Map[String, String], config: Option[Kafka]) exten override def writeStream(dataFrame: DataFrame, streamingConfig: Option[Streaming]): Unit = { config match { case Some(kafkaConfig) => - val bootstrapServers = kafkaConfig.servers.mkString(",") log.info(s"Writing Dataframe to Kafka Topic ${kafkaOptions.topic}") val df: DataFrame = selectedColumnsDataframe(dataFrame) val kafkaOutputStream = df.writeStream.format("kafka") - kafkaOutputStream - .option("kafka.bootstrap.servers", bootstrapServers) - .option("topic", kafkaOptions.topic) + kafkaOutputStream.options(getKafkaOptions(kafkaConfig)) kafkaConfig.compressionType match { case Some(compressionType) => kafkaOutputStream.option("kafka.compression.type",compressionType)