From 1109a783f653bf770e51100fcd02e5713e1ca144 Mon Sep 17 00:00:00 2001 From: David Sloan <33483659+davidsloan@users.noreply.github.com> Date: Fri, 27 Sep 2024 12:45:59 +0100 Subject: [PATCH] ReportingController NPE fix (#117) --- .../reporting/ReportingController.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/ReportingController.java b/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/ReportingController.java index 9333eedfc..f8efd2954 100644 --- a/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/ReportingController.java +++ b/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/ReportingController.java @@ -15,21 +15,12 @@ */ package io.lenses.streamreactor.connect.reporting; -import static io.lenses.streamreactor.common.util.StringUtils.isBlank; - import cyclops.control.Try; import io.lenses.streamreactor.common.exception.StreamReactorException; import io.lenses.streamreactor.connect.reporting.config.ReportProducerConfigConst; import io.lenses.streamreactor.connect.reporting.config.ReporterConfig; import io.lenses.streamreactor.connect.reporting.model.RecordReport; import io.lenses.streamreactor.connect.reporting.model.SinkRecordRecordReport; -import java.time.Duration; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; @@ -40,6 +31,16 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static io.lenses.streamreactor.common.util.StringUtils.isBlank; + @Slf4j public abstract class ReportingController { @@ -55,7 +56,7 @@ public abstract class ReportingController { private final Producer producer; private final ExecutorService executorService; private final String reportTopic; - private final String reportingClientId; + private final String reportingClientId = createProducerId(); protected ReportingController(Map senderConfig) { @@ -68,7 +69,6 @@ protected ReportingController(Map senderConfig) { this.producer = senderEnabled ? createKafkaProducer(senderConfig) : null; this.reportHolder = senderEnabled ? new ReportHolder(null) : null; this.executorService = senderEnabled ? Executors.newFixedThreadPool(1) : null; - this.reportingClientId = senderEnabled ? createProducerId() : null; } /**