diff --git a/CHANGELOG.md b/CHANGELOG.md index 472ef0feb..b3cffecb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [unreleased] +### Fixed +- KafkaEventReceiver progress logger will actually log the process during normal application bootstrapping. + ## [1.4.2] 2023-09-21 ### No change - The release deployment for 1.4.1 failed (502 Bad Gateway) so releasing it again as 1.4.2 diff --git a/state/kafka-receiver/src/main/java/com/expediagroup/streamplatform/streamregistry/state/kafka/KafkaEventReceiver.java b/state/kafka-receiver/src/main/java/com/expediagroup/streamplatform/streamregistry/state/kafka/KafkaEventReceiver.java index c62c3960c..768c6514e 100644 --- a/state/kafka-receiver/src/main/java/com/expediagroup/streamplatform/streamregistry/state/kafka/KafkaEventReceiver.java +++ b/state/kafka-receiver/src/main/java/com/expediagroup/streamplatform/streamregistry/state/kafka/KafkaEventReceiver.java @@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import com.expediagroup.streamplatform.streamregistry.state.Configurator; import lombok.Builder; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -52,6 +51,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; +import com.expediagroup.streamplatform.streamregistry.state.Configurator; import com.expediagroup.streamplatform.streamregistry.state.EventReceiver; import com.expediagroup.streamplatform.streamregistry.state.EventReceiverListener; import com.expediagroup.streamplatform.streamregistry.state.avro.AvroConverter; @@ -62,6 +62,14 @@ @Slf4j @RequiredArgsConstructor(access = PACKAGE) public class KafkaEventReceiver implements EventReceiver { + + /** + * Most of the time, there will only ever be a single process running on the ExecutorService. However, we need this to be two during + * application bootstrapping. 1 thread for the consumer, 1 thread for the progress logger. + */ + private static final int THREAD_POOL_SIZE = 2; + + @NonNull private final Config config; private final EventCorrelator correlator; @NonNull private final AvroConverter converter; @@ -77,7 +85,7 @@ public KafkaEventReceiver(Config config, EventCorrelator correlator, Configurato correlator, new AvroConverter(), getKafkaConsumer(config, consumerConfigurator), - newScheduledThreadPool(1) + newScheduledThreadPool(THREAD_POOL_SIZE) ); }