Commit 681d436

Implements #57 SwimlaneDispatcher configurable - hashing algorithm, threadpool size - defined TopicPartitionToSwimLaneMapping strategy

cer committed Aug 4, 2022
1 parent 1419a4f
Showing 8 changed files with 55 additions and 11 deletions.
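
The commit title refers to making the hashing algorithm and thread pool size configurable; what this commit actually adds is the TopicPartitionToSwimLaneMapping strategy, defined in the diff below. As an illustration of what a custom strategy might look like (the HashingTopicPartitionToSwimLaneMapping class and its constructor are hypothetical, not part of this commit), a hash-based mapping could bound the number of swimlanes per subscriber regardless of how many partitions are consumed:

package io.eventuate.messaging.kafka.consumer;

import org.apache.kafka.common.TopicPartition;

// Hypothetical example (not part of this commit): hash each TopicPartition
// into a fixed-size pool of swimlanes so the number of lanes stays bounded.
public class HashingTopicPartitionToSwimLaneMapping implements TopicPartitionToSwimLaneMapping {

  private final int swimlaneCount;

  public HashingTopicPartitionToSwimLaneMapping(int swimlaneCount) {
    this.swimlaneCount = swimlaneCount;
  }

  @Override
  public Integer toSwimLane(TopicPartition topicPartition) {
    // Math.floorMod keeps the lane index non-negative even for negative hash codes
    return Math.floorMod(topicPartition.hashCode(), swimlaneCount);
  }
}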
File: MessageConsumerKafkaImpl.java
@@ -9,6 +9,7 @@
import io.eventuate.messaging.kafka.common.EventuateKafkaMultiMessage;
import io.eventuate.messaging.partitionmanagement.CommonMessageConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+ import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -29,13 +30,15 @@ public class MessageConsumerKafkaImpl implements CommonMessageConsumer {
private EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties;
private KafkaConsumerFactory kafkaConsumerFactory;
private EventuateKafkaMultiMessageConverter eventuateKafkaMultiMessageConverter = new EventuateKafkaMultiMessageConverter();
+ private TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping;

public MessageConsumerKafkaImpl(String bootstrapServers,
EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties,
- KafkaConsumerFactory kafkaConsumerFactory) {
+ KafkaConsumerFactory kafkaConsumerFactory, TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping) {
this.bootstrapServers = bootstrapServers;
this.eventuateKafkaConsumerConfigurationProperties = eventuateKafkaConsumerConfigurationProperties;
this.kafkaConsumerFactory = kafkaConsumerFactory;
+ this.partitionToSwimLaneMapping = partitionToSwimLaneMapping;
}

public KafkaSubscription subscribe(String subscriberId, Set<String> channels, KafkaMessageHandler handler) {
@@ -47,13 +50,13 @@ public KafkaSubscription subscribe(String subscriberId, Set<String> channels, Ka

public KafkaSubscription subscribeWithReactiveHandler(String subscriberId, Set<String> channels, ReactiveKafkaMessageHandler handler) {

- SwimlaneBasedDispatcher swimlaneBasedDispatcher = new SwimlaneBasedDispatcher(subscriberId, Executors.newCachedThreadPool());
+ SwimlaneBasedDispatcher swimlaneBasedDispatcher = new SwimlaneBasedDispatcher(subscriberId, Executors.newCachedThreadPool(), partitionToSwimLaneMapping);

EventuateKafkaConsumerMessageHandler kcHandler = (record, callback) -> {
if (eventuateKafkaMultiMessageConverter.isMultiMessage(record.value())) {
return handleBatch(record, swimlaneBasedDispatcher, callback, handler);
} else {
- return swimlaneBasedDispatcher.dispatch(new RawKafkaMessage(record.value()), record.partition(), message -> handle(message, callback, handler));
+ return swimlaneBasedDispatcher.dispatch(new RawKafkaMessage(record.value()), new TopicPartition(record.topic(), record.partition()), message -> handle(message, callback, handler));
}
};

@@ -85,7 +88,7 @@ private SwimlaneDispatcherBacklog handleBatch(ConsumerRecord<String, byte[]> rec
.map(EventuateKafkaMultiMessage::getValue)
.map(KafkaMessage::new)
.map(kafkaMessage ->
- swimlaneBasedDispatcher.dispatch(new RawKafkaMessage(record.value()), record.partition(), message -> handle(message, callback, handler)))
+ swimlaneBasedDispatcher.dispatch(new RawKafkaMessage(record.value()), new TopicPartition(record.topic(), record.partition()), message -> handle(message, callback, handler)))
.collect(Collectors.toList()) // it is not possible to use "findAny()" now, because streams are lazy and only one message will be processed
.stream()
.findAny()
File: OriginalTopicPartitionToSwimLaneMapping.java (new)
@@ -0,0 +1,10 @@
package io.eventuate.messaging.kafka.consumer;

import org.apache.kafka.common.TopicPartition;

public class OriginalTopicPartitionToSwimLaneMapping implements TopicPartitionToSwimLaneMapping {
@Override
public Integer toSwimLane(TopicPartition topicPartition) {
return topicPartition.partition();
}
}
File: SwimlaneBasedDispatcher.java
@@ -1,5 +1,6 @@
package io.eventuate.messaging.kafka.consumer;

+ import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -15,13 +16,16 @@ public class SwimlaneBasedDispatcher {
private Executor executor;
private String subscriberId;

- public SwimlaneBasedDispatcher(String subscriberId, Executor executor) {
+ private TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping;

+ public SwimlaneBasedDispatcher(String subscriberId, Executor executor, TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping) {
this.subscriberId = subscriberId;
this.executor = executor;
+ this.partitionToSwimLaneMapping = partitionToSwimLaneMapping;
}

- public SwimlaneDispatcherBacklog dispatch(RawKafkaMessage message, Integer swimlane, Consumer<RawKafkaMessage> target) {
- SwimlaneDispatcher swimlaneDispatcher = getOrCreate(swimlane);
+ public SwimlaneDispatcherBacklog dispatch(RawKafkaMessage message, TopicPartition topicPartition, Consumer<RawKafkaMessage> target) {
+ SwimlaneDispatcher swimlaneDispatcher = getOrCreate(partitionToSwimLaneMapping.toSwimLane(topicPartition));
return swimlaneDispatcher.dispatch(message, target);
}

File: TopicPartitionToSwimLaneMapping.java (new)
@@ -0,0 +1,7 @@
package io.eventuate.messaging.kafka.consumer;

import org.apache.kafka.common.TopicPartition;

public interface TopicPartitionToSwimLaneMapping {
Integer toSwimLane(TopicPartition topicPartition);
}
File: MessageConsumerKafkaFactory.java
@@ -4,16 +4,20 @@
import io.eventuate.messaging.kafka.basic.consumer.KafkaConsumerFactory;
import io.eventuate.messaging.kafka.common.EventuateKafkaConfigurationProperties;
import io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl;
+ import io.eventuate.messaging.kafka.consumer.OriginalTopicPartitionToSwimLaneMapping;
+ import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimLaneMapping;
import io.micronaut.context.annotation.Factory;

import javax.inject.Singleton;

@Factory
public class MessageConsumerKafkaFactory {
+ private TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping = new OriginalTopicPartitionToSwimLaneMapping();

@Singleton
public MessageConsumerKafkaImpl messageConsumerKafka(EventuateKafkaConfigurationProperties props,
EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties,
KafkaConsumerFactory kafkaConsumerFactory) {
- return new MessageConsumerKafkaImpl(props.getBootstrapServers(), eventuateKafkaConsumerConfigurationProperties, kafkaConsumerFactory);
+ return new MessageConsumerKafkaImpl(props.getBootstrapServers(), eventuateKafkaConsumerConfigurationProperties, kafkaConsumerFactory, partitionToSwimLaneMapping);
}
}
File: EventuateKafkaProducerConsumerFactory.java
@@ -2,6 +2,8 @@

import io.eventuate.messaging.kafka.common.EventuateKafkaConfigurationProperties;
import io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl;
+ import io.eventuate.messaging.kafka.consumer.OriginalTopicPartitionToSwimLaneMapping;
+ import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimLaneMapping;
import io.eventuate.messaging.kafka.producer.EventuateKafkaProducer;
import io.eventuate.messaging.kafka.producer.EventuateKafkaProducerConfigurationProperties;
import io.micronaut.context.annotation.Factory;
@@ -10,6 +12,8 @@

@Factory
public class EventuateKafkaProducerConsumerFactory {
+ private TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping = new OriginalTopicPartitionToSwimLaneMapping();

@Singleton
public EventuateKafkaProducer producer(EventuateKafkaConfigurationProperties kafkaProperties, EventuateKafkaProducerConfigurationProperties producerProperties) {
return new EventuateKafkaProducer(kafkaProperties.getBootstrapServers(), producerProperties);
@@ -19,7 +23,7 @@ public EventuateKafkaProducer producer(EventuateKafkaConfigurationProperties kaf
public MessageConsumerKafkaImpl messageConsumerKafka(EventuateKafkaConfigurationProperties props,
EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties,
KafkaConsumerFactory kafkaConsumerFactory) {
- return new MessageConsumerKafkaImpl(props.getBootstrapServers(), eventuateKafkaConsumerConfigurationProperties, kafkaConsumerFactory);
+ return new MessageConsumerKafkaImpl(props.getBootstrapServers(), eventuateKafkaConsumerConfigurationProperties, kafkaConsumerFactory, partitionToSwimLaneMapping);
}

@Singleton
File: MessageConsumerKafkaConfiguration.java
@@ -1,5 +1,8 @@
package io.eventuate.messaging.kafka.spring.consumer;

+ import io.eventuate.messaging.kafka.consumer.OriginalTopicPartitionToSwimLaneMapping;
+ import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimLaneMapping;
+ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@@ -13,10 +16,13 @@
@Configuration
@Import({EventuateKafkaPropertiesConfiguration.class, EventuateKafkaConsumerSpringConfigurationPropertiesConfiguration.class})
public class MessageConsumerKafkaConfiguration {
+ @Autowired(required=false)
+ private TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping = new OriginalTopicPartitionToSwimLaneMapping();

@Bean
public MessageConsumerKafkaImpl messageConsumerKafka(EventuateKafkaConfigurationProperties props,
EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties,
KafkaConsumerFactory kafkaConsumerFactory) {
- return new MessageConsumerKafkaImpl(props.getBootstrapServers(), eventuateKafkaConsumerConfigurationProperties, kafkaConsumerFactory);
+ return new MessageConsumerKafkaImpl(props.getBootstrapServers(), eventuateKafkaConsumerConfigurationProperties, kafkaConsumerFactory, partitionToSwimLaneMapping);
}
}
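
Because MessageConsumerKafkaConfiguration above autowires TopicPartitionToSwimLaneMapping with required=false and falls back to OriginalTopicPartitionToSwimLaneMapping, an application should be able to plug in its own strategy simply by declaring a bean of that type. A minimal sketch, assuming standard Spring wiring (the configuration class name and the 8-swimlane choice are illustrative, not part of this commit):

import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimLaneMapping;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical application-level configuration that overrides the default mapping.
@Configuration
public class CustomSwimlaneMappingConfiguration {

  @Bean
  public TopicPartitionToSwimLaneMapping topicPartitionToSwimLaneMapping() {
    // TopicPartitionToSwimLaneMapping has a single method, so a lambda works;
    // here every topic-partition hashes into one of 8 swimlanes.
    return topicPartition -> Math.floorMod(topicPartition.hashCode(), 8);
  }
}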
File: EventuateKafkaBasicConsumerSpringTest.java
@@ -5,6 +5,8 @@
import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumerConfigurationProperties;
import io.eventuate.messaging.kafka.basic.consumer.KafkaConsumerFactory;
import io.eventuate.messaging.kafka.common.EventuateKafkaConfigurationProperties;
+ import io.eventuate.messaging.kafka.consumer.OriginalTopicPartitionToSwimLaneMapping;
+ import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimLaneMapping;
import io.eventuate.messaging.kafka.spring.common.EventuateKafkaPropertiesConfiguration;
import io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl;
import io.eventuate.messaging.kafka.producer.EventuateKafkaProducer;
@@ -31,6 +33,10 @@ public class EventuateKafkaBasicConsumerSpringTest extends AbstractEventuateKafk
EventuateKafkaProducerSpringConfigurationPropertiesConfiguration.class,
EventuateKafkaPropertiesConfiguration.class})
public static class EventuateKafkaConsumerTestConfiguration {

+ @Autowired(required=false)
+ private TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping = new OriginalTopicPartitionToSwimLaneMapping();

@Bean
public EventuateKafkaProducer producer(EventuateKafkaConfigurationProperties kafkaProperties,
EventuateKafkaProducerConfigurationProperties producerProperties) {
@@ -41,7 +47,7 @@ public EventuateKafkaProducer producer(EventuateKafkaConfigurationProperties kaf
public MessageConsumerKafkaImpl messageConsumerKafka(EventuateKafkaConfigurationProperties props,
EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties,
KafkaConsumerFactory kafkaConsumerFactory) {
- return new MessageConsumerKafkaImpl(props.getBootstrapServers(), eventuateKafkaConsumerConfigurationProperties, kafkaConsumerFactory);
+ return new MessageConsumerKafkaImpl(props.getBootstrapServers(), eventuateKafkaConsumerConfigurationProperties, kafkaConsumerFactory, partitionToSwimLaneMapping);
}

@Bean