Skip to content

Commit

Permalink
[Fix_3383] Adding binary cloud event support for outgoing messages (a…
Browse files Browse the repository at this point in the history
…pache#3386)

* [Fix_3383] Do not include null values on json payload

* Revert "[Fix_3383] Do not include null values on json payload"

This reverts commit 47b9b4d.

* [Fix_#3383] Setting metadata when using binary
  • Loading branch information
fjtirado authored and rgdoliveira committed Feb 5, 2024
1 parent 5967c17 commit 1c5b3ff
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider;
import org.kie.kogito.event.CloudEventMarshaller;
import org.kie.kogito.event.DataEvent;
Expand All @@ -32,6 +34,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadataBuilder;

import jakarta.inject.Inject;

public abstract class AbstractQuarkusCloudEventEmitter<M> implements EventEmitter {
Expand All @@ -49,11 +54,11 @@ public abstract class AbstractQuarkusCloudEventEmitter<M> implements EventEmitte
public CompletionStage<Void> emit(DataEvent<?> dataEvent) {
logger.debug("publishing event {}", dataEvent);
try {
Message<M> message = messageDecorator.decorate(Message.of(getPayload(dataEvent))
Message<M> message = messageDecorator.decorate(getMessage(dataEvent))
.withNack(e -> {
logger.error("Error publishing event {}", dataEvent, e);
return CompletableFuture.completedFuture(null);
}));
});
emit(message);
return message.getAck().get();
} catch (IOException e) {
Expand All @@ -69,11 +74,26 @@ protected void setCloudEventMarshaller(CloudEventMarshaller<M> marshaller) {
this.cloudEventMarshaller = marshaller;
}

private <T> M getPayload(DataEvent<T> event) throws IOException {
private <T> Optional<OutgoingCloudEventMetadata<?>> getMetadata(DataEvent<T> event) {
if (event.getId() == null || event.getType() == null || event.getSource() == null || event.getSpecVersion() == null) {
return Optional.empty();
}
OutgoingCloudEventMetadataBuilder<Object> builder = OutgoingCloudEventMetadata.builder().withId(event.getId()).withSource(event.getSource()).withType(event.getType())
.withSubject(event.getSubject())
.withDataContentType(event.getDataContentType()).withDataSchema(event.getDataSchema()).withSpecVersion(event.getSpecVersion().name()).withTimestamp(event.getTime().toZonedDateTime());
for (String extName : event.getExtensionNames()) {
builder.withExtension(extName, event.getExtension(extName));
}
return Optional.of(builder.build());
}

private <T> Message<M> getMessage(DataEvent<T> event) throws IOException {
if (cloudEventMarshaller != null) {
return cloudEventMarshaller.marshall(event.asCloudEvent(cloudEventMarshaller.cloudEventDataFactory()));
return Message.of(cloudEventMarshaller.marshall(event.asCloudEvent(cloudEventMarshaller.cloudEventDataFactory())));
} else if (eventMarshaller != null) {
return eventMarshaller.marshall(event.getData());
Optional<OutgoingCloudEventMetadata<?>> metadata = getMetadata(event);
M payload = eventMarshaller.marshall(event.getData());
return metadata.isPresent() ? Message.of(payload, Metadata.of(metadata.orElseThrow())) : Message.of(payload);
} else {
throw new IllegalStateException("Not marshaller has been set for emitter " + this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@ public class ChannelInfo {
private final boolean isInput;
private final boolean isDefault;

private final Optional<CloudEventMode> cloudEventMode;

private final Optional<String> marshaller;
private final Optional<OnOverflowInfo> onOverflow;

protected ChannelInfo(String channelName, Collection<String> triggers, String className, boolean isInput, boolean isDefault, Optional<String> marshaller, Optional<OnOverflowInfo> onOverflow) {
protected ChannelInfo(String channelName, Collection<String> triggers, String className, boolean isInput, boolean isDefault, Optional<String> marshaller, Optional<OnOverflowInfo> onOverflow,
Optional<CloudEventMode> cloudEventMode) {
this.className = className;
this.channelName = channelName;
this.isInput = isInput;
this.isDefault = isDefault;
this.triggers = triggers;
this.marshaller = marshaller;
this.onOverflow = onOverflow;
this.cloudEventMode = cloudEventMode;
}

public Collection<String> getTriggers() {
Expand Down Expand Up @@ -93,9 +97,14 @@ public Optional<OnOverflowInfo> getOnOverflow() {
return onOverflow;
}

public Optional<CloudEventMode> getCloudEventMode() {
return cloudEventMode;
}

@Override
public String toString() {
return "ChannelInfo [channelName=" + channelName + ", className=" + className + ", triggers=" + triggers
+ ", isInput=" + isInput + ", isDefault=" + isDefault + ", marshaller=" + marshaller + "]";
+ ", isInput=" + isInput + ", isDefault=" + isDefault + ", cloudEventMode=" + cloudEventMode
+ ", marshaller=" + marshaller + ", onOverflow=" + onOverflow + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ private ChannelMappingStrategy() {
private static final String INCOMING_DEFAULT_CHANNEL = KOGITO_INCOMING_PREFIX + "defaultName";
private static final String OUTGOING_DEFAULT_CHANNEL = KOGITO_OUTGOING_PREFIX + "defaultName";

private static final String CLOUD_EVENT_MODE = KOGITO_OUTGOING_PREFIX + "cloudEventMode";

private static final String MARSHALLER_PREFIX = KOGITO_MESSAGING_PREFIX + "marshaller.";
private static final String UNMARSHALLLER_PREFIX = KOGITO_MESSAGING_PREFIX + "unmarshaller.";
private static final String KOGITO_EMITTER_PREFIX = KOGITO_MESSAGING_PREFIX + "emitter.";
Expand Down Expand Up @@ -96,7 +98,28 @@ private static ChannelInfo getChannelInfo(Config config, String property, String
return new ChannelInfo(name, triggers.getOrDefault(name, Collections.singleton(name)),
getClassName(config.getOptionalValue(getPropertyName(prefix, name, "value." + (isInput ? "deserializer" : "serializer")), String.class)), isInput,
name.equals(defaultChannelName), config.getOptionalValue((isInput ? UNMARSHALLLER_PREFIX : MARSHALLER_PREFIX) + name, String.class),
isInput ? Optional.empty() : onOverflowInfo(config, name));
isInput ? Optional.empty() : onOverflowInfo(config, name), cloudEventMode(config, name, property));
}

private static Optional<CloudEventMode> cloudEventMode(Config config, String name, String property) {
if (!config.getOptionalValue("kogito.messaging.as-cloudevents", Boolean.class).orElse(true)) {
return Optional.empty();
}
Optional<CloudEventMode> cloudEventMode = getCloudEventMode(config, CLOUD_EVENT_MODE + "." + name);
if (cloudEventMode.isPresent()) {
return cloudEventMode;
}
cloudEventMode = getCloudEventMode(config, CLOUD_EVENT_MODE);
if (cloudEventMode.isPresent()) {
return cloudEventMode;
}
// if no config, infer default from connector type
String connector = config.getValue(property, String.class);
return Optional.of(connector.equals("quarkus-http") ? CloudEventMode.BINARY : CloudEventMode.STRUCTURED);
}

private static Optional<CloudEventMode> getCloudEventMode(Config config, String propName) {
return config.getOptionalValue(propName, String.class).map(String::toUpperCase).map(CloudEventMode::valueOf);
}

private static Optional<OnOverflowInfo> onOverflowInfo(Config config, String name) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.addon.cloudevents.quarkus.deployment;

public enum CloudEventMode {
STRUCTURED,
BINARY
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class EventEmitterGenerator extends EventGenerator {

public EventEmitterGenerator(KogitoBuildContext context, ChannelInfo channelInfo) {
super(context, channelInfo, "EventEmitter");
if (context.getApplicationProperty("kogito.messaging.as-cloudevents", Boolean.class).orElse(true)) {
if (channelInfo.getCloudEventMode().filter(mode -> mode == CloudEventMode.STRUCTURED).isPresent()) {
generateMarshallerField("marshaller", "setCloudEventMarshaller", CloudEventMarshaller.class);
} else {
generateMarshallerField("marshaller", "setEventDataMarshaller", EventMarshaller.class);
Expand Down

0 comments on commit 1c5b3ff

Please sign in to comment.