Skip to content

Commit

Permalink
ReportingController NPE fix (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan authored Sep 27, 2024
1 parent 9287343 commit 1109a78
Showing 1 changed file with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,12 @@
*/
package io.lenses.streamreactor.connect.reporting;

import static io.lenses.streamreactor.common.util.StringUtils.isBlank;

import cyclops.control.Try;
import io.lenses.streamreactor.common.exception.StreamReactorException;
import io.lenses.streamreactor.connect.reporting.config.ReportProducerConfigConst;
import io.lenses.streamreactor.connect.reporting.config.ReporterConfig;
import io.lenses.streamreactor.connect.reporting.model.RecordReport;
import io.lenses.streamreactor.connect.reporting.model.SinkRecordRecordReport;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand All @@ -40,6 +31,16 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static io.lenses.streamreactor.common.util.StringUtils.isBlank;

@Slf4j
public abstract class ReportingController {

Expand All @@ -55,7 +56,7 @@ public abstract class ReportingController {
private final Producer<byte[], String> producer;
private final ExecutorService executorService;
private final String reportTopic;
private final String reportingClientId;
private final String reportingClientId = createProducerId();

protected ReportingController(Map<String, Object> senderConfig) {

Expand All @@ -68,7 +69,6 @@ protected ReportingController(Map<String, Object> senderConfig) {
this.producer = senderEnabled ? createKafkaProducer(senderConfig) : null;
this.reportHolder = senderEnabled ? new ReportHolder(null) : null;
this.executorService = senderEnabled ? Executors.newFixedThreadPool(1) : null;
this.reportingClientId = senderEnabled ? createProducerId() : null;
}

/**
Expand Down

0 comments on commit 1109a78

Please sign in to comment.