Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into fix/remove_process_…
Browse files Browse the repository at this point in the history
…binding_consumer_producer_link
  • Loading branch information
jamespfaulkner committed Oct 31, 2023
2 parents 6637877 + 6647bc7 commit fb19165
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [unreleased]
### Fixed
- KafkaEventReceiver progress logger will actually log the process during normal application bootstrapping.

## [1.4.2] 2023-09-21
### No change
- The release deployment for 1.4.1 failed (502 Bad Gateway) so releasing it again as 1.4.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import com.expediagroup.streamplatform.streamregistry.state.Configurator;
import lombok.Builder;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -52,6 +51,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import com.expediagroup.streamplatform.streamregistry.state.Configurator;
import com.expediagroup.streamplatform.streamregistry.state.EventReceiver;
import com.expediagroup.streamplatform.streamregistry.state.EventReceiverListener;
import com.expediagroup.streamplatform.streamregistry.state.avro.AvroConverter;
Expand All @@ -62,6 +62,14 @@
@Slf4j
@RequiredArgsConstructor(access = PACKAGE)
public class KafkaEventReceiver implements EventReceiver {

/**
* Most of the time, there will only ever be a single process running on the ExecutorService. However, we need this to be two during
* application bootstrapping. 1 thread for the consumer, 1 thread for the progress logger.
*/
private static final int THREAD_POOL_SIZE = 2;


@NonNull private final Config config;
private final EventCorrelator correlator;
@NonNull private final AvroConverter converter;
Expand All @@ -77,7 +85,7 @@ public KafkaEventReceiver(Config config, EventCorrelator correlator, Configurato
correlator,
new AvroConverter(),
getKafkaConsumer(config, consumerConfigurator),
newScheduledThreadPool(1)
newScheduledThreadPool(THREAD_POOL_SIZE)
);
}

Expand Down

0 comments on commit fb19165

Please sign in to comment.