diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java index f29a920c132..a2783038b2a 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java @@ -35,8 +35,16 @@ public MultipleProcessInstanceDataEvent(URI source, Collection>> toCloudEvent(MultipleProcessInstanceDataEvent event, ObjectMapper objectMapper) { + if (MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(event.getDataContentType())) { + return event.isCompressed() ? compressedToBytes : binaryToBytes; + } else { + return objectMapper::writeValueAsBytes; + } + } + public static Converter>> fromCloudEvent(CloudEvent cloudEvent, ObjectMapper objectMapper) { if (MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(cloudEvent.getDataContentType())) { return isCompressed(cloudEvent) ? compressedConverter : binaryConverter; @@ -49,10 +57,13 @@ public static Converter>> compressedToBytes = data -> serialize(data, true); + + private static ToBytes>> binaryToBytes = data -> serialize(data, false); + private static Converter>> binaryConverter = data -> deserialize(data, false); @@ -62,4 +73,9 @@ private static boolean isCompressed(CloudEvent event) { private static Collection> deserialize(CloudEventData data, boolean compress) throws IOException { return MultipleProcessInstanceDataEventDeserializer.readFromBytes(Base64.getDecoder().decode(data.toBytes()), compress); } + + private static byte[] serialize(Collection> data, + boolean compress) throws IOException { + return Base64.getEncoder().encode(MultipleProcessInstanceDataEventSerializer.dataAsBytes(data, compress)); + } } diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java index 42825e9679c..42b219b46f1 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java @@ -62,14 +62,14 @@ public void serialize(MultipleProcessInstanceDataEvent value, JsonGenerator gen, if (compress) { gen.writeBooleanField(MultipleProcessInstanceDataEvent.COMPRESS_DATA, true); } - gen.writeBinaryField("data", dataAsBytes(gen, value.getData(), compress)); + gen.writeBinaryField("data", dataAsBytes(value.getData(), compress)); gen.writeEndObject(); } else { defaultSerializer.serialize(value, gen, serializers); } } - private byte[] dataAsBytes(JsonGenerator gen, Collection> data, boolean compress) throws IOException { + static byte[] dataAsBytes(Collection> data, boolean compress) throws IOException { ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); try (DataOutputStream out = new DataOutputStream(compress ? new GZIPOutputStream(bytesOut) : bytesOut)) { logger.trace("Writing size {}", data.size()); diff --git a/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java b/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java index d278e2c4279..50d2f6e8674 100644 --- a/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java +++ b/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java @@ -22,8 +22,11 @@ import java.net.URI; import java.time.OffsetDateTime; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Set; +import java.util.function.BiConsumer; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; @@ -40,8 +43,17 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; import io.cloudevents.SpecVersion; +import io.cloudevents.core.data.BytesCloudEventData; +import io.cloudevents.core.data.PojoCloudEventData; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl; import io.cloudevents.jackson.JsonFormat; +import io.cloudevents.rw.CloudEventContextWriter; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; import static org.assertj.core.api.Assertions.assertThat; import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.toDate; @@ -130,14 +142,103 @@ void processInstanceDataEvent() throws Exception { @Test void multipleInstanceDataEvent() throws IOException { JsonNode expectedVarValue = OBJECT_MAPPER.createObjectNode().put("name", "John Doe"); - int standard = processMultipleInstanceDataEvent(expectedVarValue, false, false); - int binary = processMultipleInstanceDataEvent(expectedVarValue, true, false); - int binaryCompressed = processMultipleInstanceDataEvent(expectedVarValue, true, true); - assertThat(standard).isGreaterThan(binary); - assertThat(binary).isGreaterThan(binaryCompressed); + processMultipleInstanceDataEvent(expectedVarValue, false, false, this::serializeAsStructured); + processMultipleInstanceDataEvent(expectedVarValue, true, false, this::serializeAsStructured); + processMultipleInstanceDataEvent(expectedVarValue, true, true, this::serializeAsStructured); + processMultipleInstanceDataEvent(expectedVarValue, false, false, this::serializeAsBinary); + processMultipleInstanceDataEvent(expectedVarValue, true, false, this::serializeAsBinary); + processMultipleInstanceDataEvent(expectedVarValue, true, true, this::serializeAsBinary); } - private int processMultipleInstanceDataEvent(JsonNode expectedVarValue, boolean binary, boolean compress) throws IOException { + private MultipleProcessInstanceDataEvent serializeAsStructured(MultipleProcessInstanceDataEvent event) throws IOException { + return OBJECT_MAPPER.readValue(OBJECT_MAPPER.writeValueAsBytes(event), MultipleProcessInstanceDataEvent.class); + } + + private record CloudEventHolder(Map headers, byte[] data) { + } + + private static class TestMessageWriter implements CloudEventWriter, MessageWriter, CloudEventHolder> { + + private Map headers = new HashMap<>(); + private byte[] value; + + @Override + public TestMessageWriter create(SpecVersion version) throws CloudEventRWException { + headers.put("specversion", version.toString()); + return this; + } + + @Override + public CloudEventHolder setEvent(EventFormat format, byte[] value) throws CloudEventRWException { + this.value = value; + return this.end(); + } + + @Override + public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { + headers.put(name, value); + return this; + } + + @Override + public CloudEventHolder end(CloudEventData data) throws CloudEventRWException { + this.value = data.toBytes(); + return this.end(); + } + + @Override + public CloudEventHolder end() throws CloudEventRWException { + return new CloudEventHolder(headers, value); + } + } + + private class TestMessageReader extends BaseGenericBinaryMessageReaderImpl { + private Map headers; + + protected TestMessageReader(SpecVersion version, CloudEventHolder body) { + super(version, body.data() == null ? null : BytesCloudEventData.wrap(body.data())); + this.headers = body.headers(); + } + + @Override + protected boolean isContentTypeHeader(String key) { + return false; + } + + @Override + protected boolean isCloudEventsHeader(String key) { + return true; + } + + @Override + protected String toCloudEventsKey(String key) { + return key; + } + + @Override + protected void forEachHeader(BiConsumer fn) { + headers.forEach(fn); + } + + @Override + protected String toCloudEventsValue(String value) { + return value; + } + + } + + private MultipleProcessInstanceDataEvent serializeAsBinary(MultipleProcessInstanceDataEvent event) throws IOException { + CloudEvent toSerialize = event.asCloudEvent(value -> PojoCloudEventData.wrap(value, MultipleProcessDataInstanceConverterFactory.toCloudEvent(event, OBJECT_MAPPER))); + CloudEventHolder holder = new TestMessageWriter().writeBinary(toSerialize); + CloudEvent deserialized = new TestMessageReader(SpecVersion.V1, holder).toEvent(); + return DataEventFactory.from(new MultipleProcessInstanceDataEvent(), deserialized, MultipleProcessDataInstanceConverterFactory.fromCloudEvent(deserialized, OBJECT_MAPPER)); + } + + private static interface CheckedUnaryOperator { + T apply(T obj) throws IOException; + } + + private void processMultipleInstanceDataEvent(JsonNode expectedVarValue, boolean binary, boolean compress, CheckedUnaryOperator operator) throws IOException { ProcessInstanceStateDataEvent stateEvent = new ProcessInstanceStateDataEvent(); setBaseEventValues(stateEvent, ProcessInstanceStateDataEvent.STATE_TYPE); stateEvent.setData(ProcessInstanceStateEventBody.create().eventDate(toDate(TIME)).eventType(EVENT_TYPE).eventUser(SUBJECT) @@ -185,20 +286,9 @@ private int processMultipleInstanceDataEvent(JsonNode expectedVarValue, boolean event.setCompressed(compress); } - byte[] json = OBJECT_MAPPER.writeValueAsBytes(event); - logger.info("Serialized chunk size is {}", json.length); - - // cloud event structured mode check - MultipleProcessInstanceDataEvent deserializedEvent = OBJECT_MAPPER.readValue(json, MultipleProcessInstanceDataEvent.class); - assertThat(deserializedEvent.getData()).hasSize(event.getData().size()); - assertMultipleIntance(deserializedEvent, expectedVarValue); - - // cloud event binary mode check - CloudEvent cloudEvent = OBJECT_MAPPER.readValue(json, CloudEvent.class); - deserializedEvent = DataEventFactory.from(new MultipleProcessInstanceDataEvent(), cloudEvent, MultipleProcessDataInstanceConverterFactory.fromCloudEvent(cloudEvent, OBJECT_MAPPER)); + MultipleProcessInstanceDataEvent deserializedEvent = operator.apply(event); assertThat(deserializedEvent.getData()).hasSize(event.getData().size()); assertMultipleIntance(deserializedEvent, expectedVarValue); - return json.length; } private void assertMultipleIntance(MultipleProcessInstanceDataEvent deserializedEvent, JsonNode expectedVarValue) {