diff --git a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventReceiver.java b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventReceiver.java index a009f26c5a8..aa6f1edf214 100644 --- a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventReceiver.java +++ b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventReceiver.java @@ -19,19 +19,18 @@ package org.kie.kogito.addon.cloudevents.spring; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; import java.util.function.Function; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.kie.kogito.config.ConfigBean; import org.kie.kogito.event.CloudEventUnmarshallerFactory; import org.kie.kogito.event.DataEvent; import org.kie.kogito.event.EventReceiver; import org.kie.kogito.event.EventUnmarshaller; -import org.kie.kogito.event.KogitoEventStreams; import org.kie.kogito.event.Subscription; import org.kie.kogito.event.impl.CloudEventConverter; import org.kie.kogito.event.impl.DataEventConverter; @@ -39,7 +38,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import jakarta.annotation.PostConstruct; @@ -67,34 +66,32 @@ private void init() { @SuppressWarnings({ "rawtypes", "unchecked" }) @Override public void subscribe(Function, CompletionStage> 
consumer, Class clazz) { - consumers.add( new Subscription(consumer, configBean.useCloudEvents() ? new CloudEventConverter<>(clazz, cloudEventUnmarshaller) : new DataEventConverter<>(clazz, eventDataUnmarshaller))); } - @KafkaListener(topics = "${kogito.addon.cloudevents.kafka." + KogitoEventStreams.INCOMING + ":" + KogitoEventStreams.INCOMING + "}") - public void receive(@Payload Collection messages) throws InterruptedException { - log.debug("Received {} events", messages.size()); - Collection> futures = new ArrayList<>(); - for (String message : messages) { - for (Subscription consumer : consumers) { - try { - futures.add(consumer.getConsumer().apply(consumer.getConverter().convert(message))); - } catch (IOException e) { - log.info("Cannot convert event to the proper type {}", e.getMessage()); - } - } - } - // wait for this batch to complete - log.debug("Waiting for all operations in batch to complete"); - for (CompletionStage future : futures) { + @KafkaListener(topics = { "#{springTopics.getIncomingTopics}" }) + public void receive(ConsumerRecord message, Acknowledgment ack) throws InterruptedException { + log.debug("Receive message with key {} for topic {}", message.key(), message.topic()); + CompletionStage future = CompletableFuture.completedFuture(null); + for (Subscription subscription : consumers) { try { - future.toCompletableFuture().get(); - } catch (ExecutionException ex) { - log.error("Error executing consumer", ex.getCause()); + Object object = subscription.getConverter().convert(message.value()); + future = future.thenCompose(f -> subscription.getConsumer().apply(object)); + } catch (IOException e) { + log.debug("Error converting event. 
Exception message is {}", e.getMessage()); } } - log.debug("All operations in batch completed"); + future.whenComplete((v, e) -> acknowledge(e, ack)); + } + + private void acknowledge(Throwable ex, Acknowledgment ack) { + if (ex != null) { + log.error("Event publishing failed", ex); + } else { + log.debug("Acknowledge message"); + ack.acknowledge(); + } } } diff --git a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaConsumerConfig.java b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaConsumerConfig.java index 1085bb33125..fee6d229556 100644 --- a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaConsumerConfig.java +++ b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaConsumerConfig.java @@ -18,6 +18,8 @@ */ package org.kie.kogito.addon.cloudevents.spring; +import java.util.Map; + import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; @@ -27,6 +29,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; @EnableKafka @Configuration @@ -34,8 +37,9 @@ public class SpringKafkaConsumerConfig { @Bean public ConsumerFactory consumerFactory(KafkaProperties properties, ObjectProvider customizers) { + Map props = properties.buildConsumerProperties(); DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory<>( - properties.buildConsumerProperties()); + props); customizers.orderedStream().forEach(customizer -> customizer.customize(factory)); return factory; } 
@@ -44,7 +48,8 @@ public ConsumerFactory consumerFactory(KafkaProperties propertie public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); - factory.setBatchListener(true); + factory.setBatchListener(false); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } } diff --git a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringTopicDiscovery.java b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringTopicDiscovery.java index b3d462fdf22..bae999886cb 100644 --- a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringTopicDiscovery.java +++ b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringTopicDiscovery.java @@ -19,40 +19,72 @@ package org.kie.kogito.addon.cloudevents.spring; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.kie.kogito.addon.cloudevents.AbstractTopicDiscovery; +import org.kie.kogito.event.ChannelType; +import org.kie.kogito.event.EventKind; import org.kie.kogito.event.KogitoEventStreams; import org.kie.kogito.event.Topic; -import org.springframework.beans.factory.annotation.Value; +import org.kie.kogito.event.cloudevents.CloudEventMeta; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; -@Component +@Component("springTopics") public class SpringTopicDiscovery extends AbstractTopicDiscovery { - // in the future we should be 
implementation agnostic - @Value(value = "${kogito.addon.cloudevents.kafka." + KogitoEventStreams.INCOMING + "}") - String incomingStreamTopic; + private static final Logger logger = LoggerFactory.getLogger(SpringTopicDiscovery.class); + private static final String KAFKA_PREFIX = "kogito.addon.cloudevents.kafka."; + private static final String INCOMING_PREFIX = KAFKA_PREFIX + KogitoEventStreams.INCOMING; + private static final String OUTGOING_PREFIX = KAFKA_PREFIX + KogitoEventStreams.OUTGOING; - @Value(value = "${kogito.addon.cloudevents.kafka." + KogitoEventStreams.OUTGOING + "}") - String outgoingStreamTopic; + @Autowired + private Environment env; + + @Autowired(required = false) + private List cloudEventMetaList = Collections.emptyList(); + + public Set getIncomingTopics() { + return getTopics(INCOMING_PREFIX, KogitoEventStreams.INCOMING, EventKind.CONSUMED); + } + + public Set getOutgoingTopics() { + return getTopics(OUTGOING_PREFIX, KogitoEventStreams.OUTGOING, EventKind.PRODUCED); + } @Override protected List getTopics() { - final List topics = new ArrayList<>(); - - if (incomingStreamTopic != null && !incomingStreamTopic.isEmpty()) { - final Topic incoming = DEFAULT_INCOMING_CHANNEL; - incoming.setName(incomingStreamTopic); - topics.add(incoming); + List topics = new ArrayList<>(); + for (String topic : getIncomingTopics()) { + topics.add(new Topic(topic, ChannelType.INCOMING)); } - - if (outgoingStreamTopic != null && !outgoingStreamTopic.isEmpty()) { - final Topic outgoing = DEFAULT_OUTGOING_CHANNEL; - outgoing.setName(outgoingStreamTopic); - topics.add(outgoing); + for (String topic : getOutgoingTopics()) { + topics.add(new Topic(topic, ChannelType.OUTGOING)); } + logger.debug("Using this list of topics {}", topics); + return topics; + } + private Set getTopics(String prefix, String defaultChannel, EventKind eventKind) { + final String defaultChannelName = env.getProperty(prefix, defaultChannel); + Set topics = cloudEventMetaList.stream().filter(c -> 
c.getKind().equals(eventKind)).map(CloudEventMeta::getType).map(this::parserTopicType) + .map(t -> env.getProperty(prefix + "." + t, defaultChannelName)) + .collect(Collectors.toSet()); + if (topics.isEmpty()) { + logger.debug("Using default channel name {}", defaultChannelName); + topics.add(defaultChannelName); + } return topics; } + + private String parserTopicType(String topic) { + int indexOf = topic.lastIndexOf('.'); + return indexOf == -1 ? topic : topic.substring(indexOf + 1); + } }