Skip to content

Commit

Permalink
Implements #57 SwimlaneDispatcher configurable - fixed capitalization…
Browse files Browse the repository at this point in the history
…: SwimLane -> Swimlane
  • Loading branch information
cer committed Aug 6, 2022
1 parent 8391e06 commit 3294297
Show file tree
Hide file tree
Showing 13 changed files with 40 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@

public class MessageConsumerKafkaImpl implements CommonMessageConsumer {

private Logger logger = LoggerFactory.getLogger(getClass());
private final Logger logger = LoggerFactory.getLogger(getClass());

private final String id = UUID.randomUUID().toString();

private String bootstrapServers;
private List<EventuateKafkaConsumer> consumers = new ArrayList<>();
private EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties;
private KafkaConsumerFactory kafkaConsumerFactory;
private EventuateKafkaMultiMessageConverter eventuateKafkaMultiMessageConverter = new EventuateKafkaMultiMessageConverter();
private TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping;
private final String bootstrapServers;
private final List<EventuateKafkaConsumer> consumers = new ArrayList<>();
private final EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties;
private final KafkaConsumerFactory kafkaConsumerFactory;
private final EventuateKafkaMultiMessageConverter eventuateKafkaMultiMessageConverter = new EventuateKafkaMultiMessageConverter();
private final TopicPartitionToSwimlaneMapping partitionToSwimLaneMapping;

public MessageConsumerKafkaImpl(String bootstrapServers,
EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties,
KafkaConsumerFactory kafkaConsumerFactory, TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping) {
KafkaConsumerFactory kafkaConsumerFactory, TopicPartitionToSwimlaneMapping partitionToSwimLaneMapping) {
this.bootstrapServers = bootstrapServers;
this.eventuateKafkaConsumerConfigurationProperties = eventuateKafkaConsumerConfigurationProperties;
this.kafkaConsumerFactory = kafkaConsumerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import java.util.concurrent.ConcurrentHashMap;

public class MultipleSwimlanesPerTopicPartitionMapping implements TopicPartitionToSwimLaneMapping {
public class MultipleSwimlanesPerTopicPartitionMapping implements TopicPartitionToSwimlaneMapping {

private final int swimlanesPerTopicPartition;

Expand All @@ -16,7 +16,7 @@ public MultipleSwimlanesPerTopicPartitionMapping(int swimlanesPerTopicPartition)


@Override
public Integer toSwimLane(TopicPartition topicPartition, String messageKey) {
public Integer toSwimlane(TopicPartition topicPartition, String messageKey) {
int startingSwimlane = mapping.computeIfAbsent(topicPartition, tp -> mapping.size() * swimlanesPerTopicPartition);
return startingSwimlane + messageKey.hashCode() % swimlanesPerTopicPartition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import org.apache.kafka.common.TopicPartition;

public class OriginalTopicPartitionToSwimLaneMapping implements TopicPartitionToSwimLaneMapping {
public class OriginalTopicPartitionToSwimlaneMapping implements TopicPartitionToSwimlaneMapping {
@Override
public Integer toSwimLane(TopicPartition topicPartition, String messageKey) {
public Integer toSwimlane(TopicPartition topicPartition, String messageKey) {
return topicPartition.partition();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ public class SwimlaneBasedDispatcher {
private final Executor executor;
private final String subscriberId;

private final TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping;
private final TopicPartitionToSwimlaneMapping partitionToSwimLaneMapping;

public SwimlaneBasedDispatcher(String subscriberId, Executor executor, TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping) {
public SwimlaneBasedDispatcher(String subscriberId, Executor executor, TopicPartitionToSwimlaneMapping partitionToSwimLaneMapping) {
this.subscriberId = subscriberId;
this.executor = executor;
this.partitionToSwimLaneMapping = partitionToSwimLaneMapping;
}

public SwimlaneDispatcherBacklog dispatch(RawKafkaMessage message, TopicPartition topicPartition, Consumer<RawKafkaMessage> target) {
SwimlaneDispatcher swimlaneDispatcher = getOrCreate(partitionToSwimLaneMapping.toSwimLane(topicPartition, message.getMessageKey()));
SwimlaneDispatcher swimlaneDispatcher = getOrCreate(partitionToSwimLaneMapping.toSwimlane(topicPartition, message.getMessageKey()));
return swimlaneDispatcher.dispatch(message, target);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

import java.util.concurrent.ConcurrentHashMap;

public class SwimlanePerTopicPartition implements TopicPartitionToSwimLaneMapping {
public class SwimlanePerTopicPartition implements TopicPartitionToSwimlaneMapping {

private final ConcurrentHashMap<TopicPartition, Integer> mapping = new ConcurrentHashMap<>();

@Override
public Integer toSwimLane(TopicPartition topicPartition, String messageKey) {
public Integer toSwimlane(TopicPartition topicPartition, String messageKey) {
return mapping.computeIfAbsent(topicPartition, tp -> mapping.size());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.eventuate.messaging.kafka.consumer;

import org.apache.kafka.common.TopicPartition;

public interface TopicPartitionToSwimlaneMapping {
Integer toSwimlane(TopicPartition topicPartition, String messageKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void shouldMap() {
}

private void assertSwimlaneWithinRange(MultipleSwimlanesPerTopicPartitionMapping mapping, TopicPartition topicPartition, String key, int minInclusive, int maxExclusive) {
assertThat(mapping.toSwimLane(topicPartition, key)).isGreaterThanOrEqualTo(minInclusive).isLessThan(maxExclusive);
assertThat(mapping.toSwimlane(topicPartition, key)).isGreaterThanOrEqualTo(minInclusive).isLessThan(maxExclusive);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ public class SwimlanePerTopicPartitionTest {
@Test
public void shouldComputeSwimlane() {
SwimlanePerTopicPartition mapping = new SwimlanePerTopicPartition();
assertEquals(Integer.valueOf(0), mapping.toSwimLane(tp0(), "X"));
assertEquals(Integer.valueOf(0), mapping.toSwimLane(tp0(), "Y"));
assertEquals(Integer.valueOf(1), mapping.toSwimLane(tp1(), "Z"));
assertEquals(Integer.valueOf(0), mapping.toSwimlane(tp0(), "X"));
assertEquals(Integer.valueOf(0), mapping.toSwimlane(tp0(), "Y"));
assertEquals(Integer.valueOf(1), mapping.toSwimlane(tp1(), "Z"));
}

private TopicPartition tp1() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
import io.eventuate.messaging.kafka.basic.consumer.KafkaConsumerFactory;
import io.eventuate.messaging.kafka.common.EventuateKafkaConfigurationProperties;
import io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl;
import io.eventuate.messaging.kafka.consumer.OriginalTopicPartitionToSwimLaneMapping;
import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimLaneMapping;
import io.eventuate.messaging.kafka.consumer.OriginalTopicPartitionToSwimlaneMapping;
import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimlaneMapping;
import io.micronaut.context.annotation.Factory;

import javax.inject.Singleton;

@Factory
public class MessageConsumerKafkaFactory {
private TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping = new OriginalTopicPartitionToSwimLaneMapping();
private final TopicPartitionToSwimlaneMapping partitionToSwimLaneMapping = new OriginalTopicPartitionToSwimlaneMapping();

@Singleton
public MessageConsumerKafkaImpl messageConsumerKafka(EventuateKafkaConfigurationProperties props,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import io.eventuate.messaging.kafka.common.EventuateKafkaConfigurationProperties;
import io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl;
import io.eventuate.messaging.kafka.consumer.OriginalTopicPartitionToSwimLaneMapping;
import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimLaneMapping;
import io.eventuate.messaging.kafka.consumer.OriginalTopicPartitionToSwimlaneMapping;
import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimlaneMapping;
import io.eventuate.messaging.kafka.producer.EventuateKafkaProducer;
import io.eventuate.messaging.kafka.producer.EventuateKafkaProducerConfigurationProperties;
import io.micronaut.context.annotation.Factory;
Expand All @@ -12,7 +12,7 @@

@Factory
public class EventuateKafkaProducerConsumerFactory {
private TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping = new OriginalTopicPartitionToSwimLaneMapping();
private final TopicPartitionToSwimlaneMapping partitionToSwimLaneMapping = new OriginalTopicPartitionToSwimlaneMapping();

@Singleton
public EventuateKafkaProducer producer(EventuateKafkaConfigurationProperties kafkaProperties, EventuateKafkaProducerConfigurationProperties producerProperties) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.eventuate.messaging.kafka.spring.consumer;

import io.eventuate.messaging.kafka.consumer.OriginalTopicPartitionToSwimLaneMapping;
import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimLaneMapping;
import io.eventuate.messaging.kafka.consumer.OriginalTopicPartitionToSwimlaneMapping;
import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimlaneMapping;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -17,7 +17,7 @@
@Import({EventuateKafkaPropertiesConfiguration.class, EventuateKafkaConsumerSpringConfigurationPropertiesConfiguration.class})
public class MessageConsumerKafkaConfiguration {
@Autowired(required=false)
private TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping = new OriginalTopicPartitionToSwimLaneMapping();
private TopicPartitionToSwimlaneMapping partitionToSwimLaneMapping = new OriginalTopicPartitionToSwimlaneMapping();

@Bean
public MessageConsumerKafkaImpl messageConsumerKafka(EventuateKafkaConfigurationProperties props,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import io.eventuate.messaging.kafka.basic.consumer.EventuateKafkaConsumerConfigurationProperties;
import io.eventuate.messaging.kafka.basic.consumer.KafkaConsumerFactory;
import io.eventuate.messaging.kafka.common.EventuateKafkaConfigurationProperties;
import io.eventuate.messaging.kafka.consumer.OriginalTopicPartitionToSwimLaneMapping;
import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimLaneMapping;
import io.eventuate.messaging.kafka.consumer.OriginalTopicPartitionToSwimlaneMapping;
import io.eventuate.messaging.kafka.consumer.TopicPartitionToSwimlaneMapping;
import io.eventuate.messaging.kafka.spring.common.EventuateKafkaPropertiesConfiguration;
import io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl;
import io.eventuate.messaging.kafka.producer.EventuateKafkaProducer;
Expand Down Expand Up @@ -35,7 +35,7 @@ public class EventuateKafkaBasicConsumerSpringTest extends AbstractEventuateKafk
public static class EventuateKafkaConsumerTestConfiguration {

@Autowired(required=false)
private TopicPartitionToSwimLaneMapping partitionToSwimLaneMapping = new OriginalTopicPartitionToSwimLaneMapping();
private TopicPartitionToSwimlaneMapping partitionToSwimLaneMapping = new OriginalTopicPartitionToSwimlaneMapping();

@Bean
public EventuateKafkaProducer producer(EventuateKafkaConfigurationProperties kafkaProperties,
Expand Down

0 comments on commit 3294297

Please sign in to comment.