From 1c5b3ff10c0e819f02c0baf8a0cce6c58a89aa3f Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti <65240126+fjtirado@users.noreply.github.com> Date: Fri, 2 Feb 2024 14:26:21 +0100 Subject: [PATCH] [Fix_3383] Adding binary cloud event support for outgoing messages (#3386) * [Fix_3383] Do not include null values on json payload * Revert "[Fix_3383] Do not include null values on json payload" This reverts commit 47b9b4d7599badd9f8bdabbec22ddf10ab7b82c7. * [Fix_#3383] Setting metadata when using binary --- .../AbstractQuarkusCloudEventEmitter.java | 30 +++++++++++++++---- .../quarkus/deployment/ChannelInfo.java | 13 ++++++-- .../deployment/ChannelMappingStrategy.java | 25 +++++++++++++++- .../quarkus/deployment/CloudEventMode.java | 24 +++++++++++++++ .../deployment/EventEmitterGenerator.java | 2 +- 5 files changed, 85 insertions(+), 9 deletions(-) create mode 100644 quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/CloudEventMode.java diff --git a/quarkus/addons/messaging/common/src/main/java/org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.java b/quarkus/addons/messaging/common/src/main/java/org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.java index 2b879347029..ceed88f4783 100644 --- a/quarkus/addons/messaging/common/src/main/java/org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.java +++ b/quarkus/addons/messaging/common/src/main/java/org/kie/kogito/addon/quarkus/messaging/common/AbstractQuarkusCloudEventEmitter.java @@ -20,10 +20,12 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider; import org.kie.kogito.event.CloudEventMarshaller; import org.kie.kogito.event.DataEvent; @@ -32,6 +34,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata; +import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadataBuilder; + import jakarta.inject.Inject; public abstract class AbstractQuarkusCloudEventEmitter implements EventEmitter { @@ -49,11 +54,11 @@ public abstract class AbstractQuarkusCloudEventEmitter implements EventEmitte public CompletionStage emit(DataEvent dataEvent) { logger.debug("publishing event {}", dataEvent); try { - Message message = messageDecorator.decorate(Message.of(getPayload(dataEvent)) + Message message = messageDecorator.decorate(getMessage(dataEvent)) .withNack(e -> { logger.error("Error publishing event {}", dataEvent, e); return CompletableFuture.completedFuture(null); - })); + }); emit(message); return message.getAck().get(); } catch (IOException e) { @@ -69,11 +74,26 @@ protected void setCloudEventMarshaller(CloudEventMarshaller marshaller) { this.cloudEventMarshaller = marshaller; } - private M getPayload(DataEvent event) throws IOException { + private Optional> getMetadata(DataEvent event) { + if (event.getId() == null || event.getType() == null || event.getSource() == null || event.getSpecVersion() == null) { + return Optional.empty(); + } + OutgoingCloudEventMetadataBuilder builder = OutgoingCloudEventMetadata.builder().withId(event.getId()).withSource(event.getSource()).withType(event.getType()) + .withSubject(event.getSubject()) + .withDataContentType(event.getDataContentType()).withDataSchema(event.getDataSchema()).withSpecVersion(event.getSpecVersion().name()).withTimestamp(event.getTime().toZonedDateTime()); + for (String extName : event.getExtensionNames()) { + builder.withExtension(extName, event.getExtension(extName)); + } + return Optional.of(builder.build()); + } + + private Message getMessage(DataEvent event) throws IOException { if (cloudEventMarshaller != null) { - return cloudEventMarshaller.marshall(event.asCloudEvent(cloudEventMarshaller.cloudEventDataFactory())); + return Message.of(cloudEventMarshaller.marshall(event.asCloudEvent(cloudEventMarshaller.cloudEventDataFactory()))); } else if (eventMarshaller != null) { - return eventMarshaller.marshall(event.getData()); + Optional> metadata = getMetadata(event); + M payload = eventMarshaller.marshall(event.getData()); + return metadata.isPresent() ? Message.of(payload, Metadata.of(metadata.orElseThrow())) : Message.of(payload); } else { throw new IllegalStateException("Not marshaller has been set for emitter " + this); } diff --git a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelInfo.java b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelInfo.java index 5d528da3143..4db58fae5c6 100644 --- a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelInfo.java +++ b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelInfo.java @@ -30,10 +30,13 @@ public class ChannelInfo { private final boolean isInput; private final boolean isDefault; + private final Optional cloudEventMode; + private final Optional marshaller; private final Optional onOverflow; - protected ChannelInfo(String channelName, Collection triggers, String className, boolean isInput, boolean isDefault, Optional marshaller, Optional onOverflow) { + protected ChannelInfo(String channelName, Collection triggers, String className, boolean isInput, boolean isDefault, Optional marshaller, Optional onOverflow, + Optional cloudEventMode) { this.className = className; this.channelName = channelName; this.isInput = isInput; @@ -41,6 +44,7 @@ protected ChannelInfo(String channelName, Collection triggers, String cl this.triggers = triggers; this.marshaller = marshaller; this.onOverflow = onOverflow; + this.cloudEventMode = cloudEventMode; } public Collection getTriggers() { @@ -93,9 +97,14 @@ public Optional getOnOverflow() { return onOverflow; } + public Optional getCloudEventMode() { + return cloudEventMode; + } + @Override public String toString() { return "ChannelInfo [channelName=" + channelName + ", className=" + className + ", triggers=" + triggers - + ", isInput=" + isInput + ", isDefault=" + isDefault + ", marshaller=" + marshaller + "]"; + + ", isInput=" + isInput + ", isDefault=" + isDefault + ", cloudEventMode=" + cloudEventMode + + ", marshaller=" + marshaller + ", onOverflow=" + onOverflow + "]"; } } diff --git a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelMappingStrategy.java b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelMappingStrategy.java index c62f4acfe49..485ce4749e1 100644 --- a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelMappingStrategy.java +++ b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/ChannelMappingStrategy.java @@ -47,6 +47,8 @@ private ChannelMappingStrategy() { private static final String INCOMING_DEFAULT_CHANNEL = KOGITO_INCOMING_PREFIX + "defaultName"; private static final String OUTGOING_DEFAULT_CHANNEL = KOGITO_OUTGOING_PREFIX + "defaultName"; + private static final String CLOUD_EVENT_MODE = KOGITO_OUTGOING_PREFIX + "cloudEventMode"; + private static final String MARSHALLER_PREFIX = KOGITO_MESSAGING_PREFIX + "marshaller."; private static final String UNMARSHALLLER_PREFIX = KOGITO_MESSAGING_PREFIX + "unmarshaller."; private static final String KOGITO_EMITTER_PREFIX = KOGITO_MESSAGING_PREFIX + "emitter."; @@ -96,7 +98,28 @@ private static ChannelInfo getChannelInfo(Config config, String property, String return new ChannelInfo(name, triggers.getOrDefault(name, Collections.singleton(name)), getClassName(config.getOptionalValue(getPropertyName(prefix, name, "value." + (isInput ? "deserializer" : "serializer")), String.class)), isInput, name.equals(defaultChannelName), config.getOptionalValue((isInput ? UNMARSHALLLER_PREFIX : MARSHALLER_PREFIX) + name, String.class), - isInput ? Optional.empty() : onOverflowInfo(config, name)); + isInput ? Optional.empty() : onOverflowInfo(config, name), cloudEventMode(config, name, property)); + } + + private static Optional cloudEventMode(Config config, String name, String property) { + if (!config.getOptionalValue("kogito.messaging.as-cloudevents", Boolean.class).orElse(true)) { + return Optional.empty(); + } + Optional cloudEventMode = getCloudEventMode(config, CLOUD_EVENT_MODE + "." + name); + if (cloudEventMode.isPresent()) { + return cloudEventMode; + } + cloudEventMode = getCloudEventMode(config, CLOUD_EVENT_MODE); + if (cloudEventMode.isPresent()) { + return cloudEventMode; + } + // if no config, infer default from connector type + String connector = config.getValue(property, String.class); + return Optional.of(connector.equals("quarkus-http") ? CloudEventMode.BINARY : CloudEventMode.STRUCTURED); + } + + private static Optional getCloudEventMode(Config config, String propName) { + return config.getOptionalValue(propName, String.class).map(String::toUpperCase).map(CloudEventMode::valueOf); } private static Optional onOverflowInfo(Config config, String name) { diff --git a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/CloudEventMode.java b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/CloudEventMode.java new file mode 100644 index 00000000000..bb53eaa7737 --- /dev/null +++ b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/CloudEventMode.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.addon.cloudevents.quarkus.deployment; + +public enum CloudEventMode { + STRUCTURED, + BINARY +} diff --git a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java index e2999c9bc4d..1dba1d91441 100644 --- a/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java +++ b/quarkus/addons/messaging/deployment/src/main/java/org/kie/kogito/addon/cloudevents/quarkus/deployment/EventEmitterGenerator.java @@ -26,7 +26,7 @@ public class EventEmitterGenerator extends EventGenerator { public EventEmitterGenerator(KogitoBuildContext context, ChannelInfo channelInfo) { super(context, channelInfo, "EventEmitter"); - if (context.getApplicationProperty("kogito.messaging.as-cloudevents", Boolean.class).orElse(true)) { + if (channelInfo.getCloudEventMode().filter(mode -> mode == CloudEventMode.STRUCTURED).isPresent()) { generateMarshallerField("marshaller", "setCloudEventMarshaller", CloudEventMarshaller.class); } else { generateMarshallerField("marshaller", "setEventDataMarshaller", EventMarshaller.class);