Commit 7ba34e7
* [Fix_#3363] Allowing multiple topics for the Kafka listener

* [Fix_#3663] Manual ack and a non-thread-blocking approach
fjtirado authored and rgdoliveira committed Jan 25, 2024
1 parent c8d7e0f commit 7ba34e7
Showing 3 changed files with 79 additions and 45 deletions.
File 1 of 3 (package org.kie.kogito.addon.cloudevents.spring, EventReceiver implementation)

@@ -19,27 +19,26 @@
 package org.kie.kogito.addon.cloudevents.spring;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.kie.kogito.config.ConfigBean;
 import org.kie.kogito.event.CloudEventUnmarshallerFactory;
 import org.kie.kogito.event.DataEvent;
 import org.kie.kogito.event.EventReceiver;
 import org.kie.kogito.event.EventUnmarshaller;
 import org.kie.kogito.event.KogitoEventStreams;
 import org.kie.kogito.event.Subscription;
 import org.kie.kogito.event.impl.CloudEventConverter;
 import org.kie.kogito.event.impl.DataEventConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.stereotype.Component;
 
 import jakarta.annotation.PostConstruct;
@@ -67,34 +66,32 @@ private void init() {
     @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     public <T> void subscribe(Function<DataEvent<T>, CompletionStage<?>> consumer, Class<T> clazz) {
-
         consumers.add(
                 new Subscription(consumer, configBean.useCloudEvents() ? new CloudEventConverter<>(clazz, cloudEventUnmarshaller)
                         : new DataEventConverter<>(clazz, eventDataUnmarshaller)));
     }
 
-    @KafkaListener(topics = "${kogito.addon.cloudevents.kafka." + KogitoEventStreams.INCOMING + ":" + KogitoEventStreams.INCOMING + "}")
-    public void receive(@Payload Collection<String> messages) throws InterruptedException {
-        log.debug("Received {} events", messages.size());
-        Collection<CompletionStage<?>> futures = new ArrayList<>();
-        for (String message : messages) {
-            for (Subscription<Object, String> consumer : consumers) {
-                try {
-                    futures.add(consumer.getConsumer().apply(consumer.getConverter().convert(message)));
-                } catch (IOException e) {
-                    log.info("Cannot convert event to the proper type {}", e.getMessage());
-                }
-            }
-        }
-        // wait for this batch to complete
-        log.debug("Waiting for all operations in batch to complete");
-        for (CompletionStage<?> future : futures) {
-            try {
-                future.toCompletableFuture().get();
-            } catch (ExecutionException ex) {
-                log.error("Error executing consumer", ex.getCause());
-            }
-        }
-        log.debug("All operations in batch completed");
+    @KafkaListener(topics = { "#{springTopics.getIncomingTopics}" })
+    public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) throws InterruptedException {
+        log.debug("Received message with key {} for topic {}", message.key(), message.topic());
+        CompletionStage<?> future = CompletableFuture.completedFuture(null);
+        for (Subscription<Object, String> subscription : consumers) {
+            try {
+                Object object = subscription.getConverter().convert(message.value());
+                future = future.thenCompose(f -> subscription.getConsumer().apply(object));
+            } catch (IOException e) {
+                log.debug("Error converting event. Exception message is {}", e.getMessage());
+            }
+        }
+        future.whenComplete((v, e) -> acknowledge(e, ack));
     }
+
+    private void acknowledge(Throwable ex, Acknowledgment ack) {
+        if (ex != null) {
+            log.error("Event publishing failed", ex);
+        } else {
+            log.debug("Acknowledge message");
+            ack.acknowledge();
+        }
+    }
 }
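
The substantive change in this listener is the move from a blocking batch loop to a composed, non-blocking pipeline: rather than calling toCompletableFuture().get() on every future and parking the Kafka consumer thread, the new code chains each subscribed handler with thenCompose and acknowledges the record only once the whole chain has completed. A minimal standalone sketch of that pattern, in plain Java with hypothetical handler names and a Runnable standing in for Spring Kafka's Acknowledgment:

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.function.Function;

    public class NonBlockingAckSketch {

        public static void main(String[] args) {
            // Two hypothetical async handlers standing in for the registered subscriptions.
            List<Function<String, CompletionStage<Void>>> handlers = List.of(
                    event -> CompletableFuture.runAsync(() -> System.out.println("handler1: " + event)),
                    event -> CompletableFuture.runAsync(() -> System.out.println("handler2: " + event)));

            // A Runnable standing in for Acknowledgment::acknowledge.
            Runnable ack = () -> System.out.println("offset acknowledged");

            // join() here only keeps the demo JVM alive until the chain finishes;
            // the listener itself never blocks like this.
            receive("some-event", handlers, ack).toCompletableFuture().join();
        }

        static CompletionStage<Void> receive(String event, List<Function<String, CompletionStage<Void>>> handlers, Runnable ack) {
            // Start from an already-completed stage and compose each handler onto it,
            // so handlers run one after another without blocking the consumer thread.
            CompletionStage<Void> future = CompletableFuture.completedFuture(null);
            for (Function<String, CompletionStage<Void>> handler : handlers) {
                future = future.thenCompose(v -> handler.apply(event));
            }
            // Acknowledge only when every handler has completed; on failure, skip the
            // ack so the record's offset is never committed and it can be redelivered.
            return future.whenComplete((v, e) -> {
                if (e != null) {
                    System.err.println("processing failed, not acknowledging: " + e);
                } else {
                    ack.run();
                }
            });
        }
    }

As in the commit, the handlers for a record run sequentially, and a failure anywhere in the chain skips the acknowledgment so the offset stays uncommitted.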
File 2 of 3: SpringKafkaConsumerConfig.java

@@ -18,6 +18,8 @@
  */
 package org.kie.kogito.addon.cloudevents.spring;
 
+import java.util.Map;
+
 import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@@ -27,15 +29,17 @@
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
 
 @EnableKafka
 @Configuration
 public class SpringKafkaConsumerConfig {
 
     @Bean
     public ConsumerFactory<String, String> consumerFactory(KafkaProperties properties, ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
+        Map<String, Object> props = properties.buildConsumerProperties();
         DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(
-                properties.buildConsumerProperties());
+                props);
         customizers.orderedStream().forEach(customizer -> customizer.customize(factory));
         return factory;
     }
@@ -44,7 +48,8 @@ public ConsumerFactory<String, String> consumerFactory(KafkaProperties propertie
     public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(consumerFactory);
-        factory.setBatchListener(true);
+        factory.setBatchListener(false);
+        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
         return factory;
     }
 }
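
Two container settings change here: setBatchListener(false) switches the listener from a batch of records per invocation to a single ConsumerRecord, and AckMode.MANUAL_IMMEDIATE makes the container commit an offset the moment Acknowledgment.acknowledge() is called rather than at the end of the poll cycle. A minimal sketch of a listener running against this factory, with a hypothetical topic and class name:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;

    @Component
    public class ManualAckListenerSketch {

        // "demo-topic" is a placeholder; containerFactory refers to the
        // kafkaListenerContainerFactory bean configured above.
        @KafkaListener(topics = "demo-topic", containerFactory = "kafkaListenerContainerFactory")
        public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
            try {
                process(record.value());
                // Commits this record's offset immediately under MANUAL_IMMEDIATE.
                ack.acknowledge();
            } catch (RuntimeException e) {
                // Swallowing the error without acknowledging leaves the offset
                // uncommitted, so the record is seen again after a restart or rebalance.
            }
        }

        private void process(String value) {
            System.out.println("processing " + value);
        }
    }

This pairing is what lets the receiver above defer the commit until its CompletionStage chain finishes.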
File 3 of 3: SpringTopicDiscovery.java

@@ -19,40 +19,72 @@
 package org.kie.kogito.addon.cloudevents.spring;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.kie.kogito.addon.cloudevents.AbstractTopicDiscovery;
+import org.kie.kogito.event.ChannelType;
+import org.kie.kogito.event.EventKind;
 import org.kie.kogito.event.KogitoEventStreams;
 import org.kie.kogito.event.Topic;
-import org.springframework.beans.factory.annotation.Value;
+import org.kie.kogito.event.cloudevents.CloudEventMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 
-@Component
+@Component("springTopics")
 public class SpringTopicDiscovery extends AbstractTopicDiscovery {
 
-    // in the future we should be implementation agnostic
-    @Value(value = "${kogito.addon.cloudevents.kafka." + KogitoEventStreams.INCOMING + "}")
-    String incomingStreamTopic;
+    private static final Logger logger = LoggerFactory.getLogger(SpringTopicDiscovery.class);
+    private static final String KAFKA_PREFIX = "kogito.addon.cloudevents.kafka.";
+    private static final String INCOMING_PREFIX = KAFKA_PREFIX + KogitoEventStreams.INCOMING;
+    private static final String OUTGOING_PREFIX = KAFKA_PREFIX + KogitoEventStreams.OUTGOING;
 
-    @Value(value = "${kogito.addon.cloudevents.kafka." + KogitoEventStreams.OUTGOING + "}")
-    String outgoingStreamTopic;
+    @Autowired
+    private Environment env;
 
+    @Autowired(required = false)
+    private List<CloudEventMeta> cloudEventMetaList = Collections.emptyList();
+
+    public Set<String> getIncomingTopics() {
+        return getTopics(INCOMING_PREFIX, KogitoEventStreams.INCOMING, EventKind.CONSUMED);
+    }
+
+    public Set<String> getOutgoingTopics() {
+        return getTopics(OUTGOING_PREFIX, KogitoEventStreams.OUTGOING, EventKind.PRODUCED);
+    }
+
     @Override
     protected List<Topic> getTopics() {
-        final List<Topic> topics = new ArrayList<>();
-
-        if (incomingStreamTopic != null && !incomingStreamTopic.isEmpty()) {
-            final Topic incoming = DEFAULT_INCOMING_CHANNEL;
-            incoming.setName(incomingStreamTopic);
-            topics.add(incoming);
+        List<Topic> topics = new ArrayList<>();
+        for (String topic : getIncomingTopics()) {
+            topics.add(new Topic(topic, ChannelType.INCOMING));
         }
 
-        if (outgoingStreamTopic != null && !outgoingStreamTopic.isEmpty()) {
-            final Topic outgoing = DEFAULT_OUTGOING_CHANNEL;
-            outgoing.setName(outgoingStreamTopic);
-            topics.add(outgoing);
+        for (String topic : getOutgoingTopics()) {
+            topics.add(new Topic(topic, ChannelType.OUTGOING));
         }
+        logger.debug("Using this list of topics {}", topics);
         return topics;
     }
+
+    private Set<String> getTopics(String prefix, String defaultChannel, EventKind eventKind) {
+        final String defaultChannelName = env.getProperty(prefix, defaultChannel);
+        Set<String> topics = cloudEventMetaList.stream().filter(c -> c.getKind().equals(eventKind)).map(CloudEventMeta::getType).map(this::parserTopicType)
+                .map(t -> env.getProperty(prefix + "." + t, defaultChannelName))
+                .collect(Collectors.toSet());
+        if (topics.isEmpty()) {
+            logger.debug("Using default channel name {}", defaultChannelName);
+            topics.add(defaultChannelName);
+        }
+        return topics;
+    }
+
+    private String parserTopicType(String topic) {
+        int indexOf = topic.lastIndexOf('.');
+        return indexOf == -1 ? topic : topic.substring(indexOf + 1);
+    }
 }
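
The rewritten discovery resolves one topic per registered event type: it takes the segment after the last '.' of the CloudEventMeta type, looks up kogito.addon.cloudevents.kafka.<channel>.<segment>, falls back to the channel-level property, and finally to the channel name itself; the resulting set is what the #{springTopics.getIncomingTopics} expression feeds to @KafkaListener above. A self-contained sketch of that lookup, using a plain Map in place of Spring's Environment, with made-up event types and property values, and assuming KogitoEventStreams.INCOMING resolves to the literal string "kogito_incoming_stream":

    import java.util.LinkedHashMap;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class TopicResolutionSketch {

        public static void main(String[] args) {
            // Hypothetical configuration; a Map stands in for Spring's Environment.
            Map<String, String> props = new LinkedHashMap<>();
            props.put("kogito.addon.cloudevents.kafka.kogito_incoming_stream", "all-events");
            props.put("kogito.addon.cloudevents.kafka.kogito_incoming_stream.travellers", "travellers-topic");

            // Hypothetical consumed event types registered as CloudEventMeta.
            List<String> consumedTypes = List.of("org.acme.travel.travellers", "org.acme.travel.visas");

            System.out.println(resolve(props, "kogito.addon.cloudevents.kafka.kogito_incoming_stream",
                    "kogito_incoming_stream", consumedTypes));
            // prints [travellers-topic, all-events]
        }

        static Set<String> resolve(Map<String, String> env, String prefix, String defaultChannel, List<String> types) {
            // Channel-level override if configured, else the channel name itself.
            String defaultTopic = env.getOrDefault(prefix, defaultChannel);
            Set<String> topics = new LinkedHashSet<>();
            for (String type : types) {
                // Last segment of the event type, e.g. "travellers".
                int idx = type.lastIndexOf('.');
                String suffix = idx == -1 ? type : type.substring(idx + 1);
                // Per-type override if configured, else the channel default.
                topics.add(env.getOrDefault(prefix + "." + suffix, defaultTopic));
            }
            if (topics.isEmpty()) {
                topics.add(defaultTopic);
            }
            return topics;
        }
    }

Since the listener subscribes to the whole set, one consumer can now cover several topics at once, which is the multi-topic behavior the commit title describes.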
