From 94375cc144f3f3fa23fd3d25755bb0ac1016da40 Mon Sep 17 00:00:00 2001 From: Jeffrey Douangpaseuth <11084623+Nephery@users.noreply.github.com> Date: Thu, 12 Oct 2023 15:48:02 -0400 Subject: [PATCH] add solace_scst_partitionKey header --- .../README.adoc | 36 +++++++- .../messaging/SolaceBinderHeaderMeta.java | 5 +- .../binder/messaging/SolaceBinderHeaders.java | 8 ++ .../stream/binder/util/XMLMessageMapper.java | 32 ++++++- .../binder/util/JmsCompatibilityIT.java | 3 + .../binder/util/XMLMessageMapperTest.java | 87 +++++++++++++++---- 6 files changed, 144 insertions(+), 27 deletions(-) diff --git a/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc b/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc index 52ed13a4..2d4c6294 100644 --- a/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc +++ b/solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc @@ -35,8 +35,6 @@ In Solace, the above setup is called topic-to-queue mapping. So a typical messag + NOTE: Round-robin distribution only occurs if the consumer group's queue is configured for non-exclusive access. If the queue has exclusive access, then only one consumer will receive messages. -NOTE: Partitioning is not yet supported by this version of the binder. - IMPORTANT: Since consumer bindings always consumes from queues it is required that Assured Delivery is enabled on the Solace PubSub+ Message VPN being used (Assured Delivery is automatically enabled if using Solace Cloud). Additionally, the client username's client profile must be allowed to send and receive guaranteed messages. For the sake of brevity, it will be assumed that you have a basic understanding of the Spring Cloud Stream project. If not, then please refer to https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/[Spring's documentation]. This document will solely focus on discussing components unique to Solace. @@ -639,6 +637,12 @@ The consolidated list of message headers for a batch of messages where the heade | | Present and true to indicate when the PubSub+ message payload was null. +| solace_scst_partitionKey +| String +| Write +| +| The partition key for PubSub+ partitioned queues. + | solace_scst_serializedPayload | Boolean | Internal Binder Use Only @@ -844,6 +848,34 @@ See <> for more info regarding this binder's natively supp To create a batch of messages, the binder will consume messages from the PubSub+ broker until either a maximum batch size or timeout has been achieved. After which, the binder will compose the batch message and send it to the consumer handler for processing. Both these batching parameters can be configured using the `batchMaxSize` and `batchTimeout` consumer config options. +== Partitioning + +[NOTE] +==== +The Solace PubSub+ broker supports partitioning natively. + +The partitioning abstraction as described in the https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#partitioning[Spring Cloud Stream documentation] is not supported. +==== + +To publish messages that are intended for partitioned queues, you must provide a partition key by setting the `solace_scst_partitionKey` message header (accessible through the `SolaceBinderHeaders.PARTITION_KEY` constant). + +For example: + +[source,java] +---- +public class MyMessageBuilder { + public Message buildMeAMessage() { + return MessageBuilder.withPayload("payload") + .setHeader(SolaceBinderHeaders.PARTITION_KEY, "partition-key") + .build(); + } +} +---- + +As for consuming messages from partitioned queues, this is handled transparently by the PubSub+ broker. That is to say, consuming messages from a partitioned queue is no different from consuming messages from any other queue. + +See https://docs.solace.com/Messaging/Guaranteed-Msg/Queues.htm#partitioned-queues[Partitioned Queues] for more. + == Manual Message Acknowledgment Message handlers can disable auto-acknowledgement and manually invoke the acknowledgement callback as follows: diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaderMeta.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaderMeta.java index 65144160..a1d2f950 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaderMeta.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaderMeta.java @@ -1,14 +1,15 @@ package com.solace.spring.cloud.stream.binder.messaging; +import com.solace.spring.cloud.stream.binder.util.CorrelationData; + import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; -import com.solace.spring.cloud.stream.binder.util.CorrelationData; - public class SolaceBinderHeaderMeta implements HeaderMeta { public static final Map> META = Stream.of(new Object[][] { + {SolaceBinderHeaders.PARTITION_KEY, new SolaceBinderHeaderMeta<>(String.class, false, true, Scope.WIRE)}, {SolaceBinderHeaders.MESSAGE_VERSION, new SolaceBinderHeaderMeta<>(Integer.class, true, false, Scope.WIRE)}, {SolaceBinderHeaders.SERIALIZED_PAYLOAD, new SolaceBinderHeaderMeta<>(Boolean.class, false, false, Scope.WIRE)}, {SolaceBinderHeaders.SERIALIZED_HEADERS, new SolaceBinderHeaderMeta<>(String.class, false, false, Scope.WIRE)}, diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaders.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaders.java index d0d07782..dcabbf44 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaders.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaders.java @@ -24,6 +24,14 @@ public final class SolaceBinderHeaders { */ static final String PREFIX = SolaceHeaders.PREFIX + "scst_"; + /** + *

Acceptable Value Type: {@link String}

+ *

Access: Write

+ *
+ *

The partition key for PubSub+ partitioned queues.

+ */ + public static final String PARTITION_KEY = PREFIX + "partitionKey"; + /** *

Acceptable Value Type: {@link Integer}

*

Access: Read

diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapper.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapper.java index 0d5d107f..469aea4e 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapper.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapper.java @@ -7,7 +7,6 @@ import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaderMeta; import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaders; import com.solace.spring.cloud.stream.binder.messaging.SolaceHeaderMeta; -import com.solace.spring.cloud.stream.binder.messaging.SolaceHeaders; import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties; import com.solacesystems.common.util.ByteArray; import com.solacesystems.jcsmp.BytesMessage; @@ -29,7 +28,6 @@ import org.springframework.integration.acks.AcknowledgmentCallback; import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.integration.support.DefaultMessageBuilderFactory; -import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.support.MessageBuilderFactory; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -50,10 +48,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; public class XMLMessageMapper { private static final Log logger = LogFactory.getLog(XMLMessageMapper.class); @@ -308,6 +305,12 @@ SDTMap map(MessageHeaders headers, Collection excludedHeaders, boolean c convertNonSerializableHeadersToString); } + if (headers.containsKey(SolaceBinderHeaders.PARTITION_KEY)) { + rethrowableCall(metadata::putString, XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, + this., String>rethrowableCall(headers::get, + SolaceBinderHeaders.PARTITION_KEY, String.class)); + } + if (!serializedHeaders.isEmpty()) { rethrowableCall(metadata::putString, SolaceBinderHeaders.SERIALIZED_HEADERS, rethrowableCall(stringSetWriter::writeValueAsString, serializedHeaders)); @@ -406,6 +409,10 @@ private R rethrowableCall(ThrowingFunction function, T var) { return function.apply(var); } + private R rethrowableCall(ThrowingBiFunction function, T var0, U var1) { + return function.apply(var0, var1); + } + private void rethrowableCall(ThrowingBiConsumer consumer, T var0, U var1) { consumer.accept(var0, var1); } @@ -441,6 +448,23 @@ default R apply(T t) { R applyThrows(T t) throws Exception; } + @FunctionalInterface + private interface ThrowingBiFunction extends BiFunction { + + @Override + default R apply(T t, U u) { + try { + return applyThrows(t, u); + } catch (Exception e) { + SolaceMessageConversionException wrappedException = new SolaceMessageConversionException(e); + logger.warn(wrappedException); + throw wrappedException; + } + } + + R applyThrows(T t, U u) throws Exception; + } + @FunctionalInterface private interface ThrowingBiConsumer extends BiConsumer { diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java index d0fc3ec1..ffe18e2e 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java @@ -169,6 +169,9 @@ public void testBinderHeaders(SoftAssertions softly) throws Exception { for (Map.Entry> headerMeta : SolaceBinderHeaderMeta.META.entrySet()) { if (!HeaderMeta.Scope.WIRE.equals(headerMeta.getValue().getScope())) continue; String headerName = headerMeta.getKey(); + if (headerName.equals(SolaceBinderHeaders.PARTITION_KEY)) { + headerName = XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY; + } // Everything should be receivable as a String in JMS softly.assertThat(msg.getStringProperty(headerName)) .withFailMessage("Expecting JMS property %s to not be null", headerName) diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapperTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapperTest.java index f4d55838..1a0044c5 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapperTest.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapperTest.java @@ -259,6 +259,12 @@ public void testMapSpringMessageToXMLMessage_WriteSolaceProperties() throws Exce case SolaceHeaders.USER_DATA: value = RandomStringUtils.randomAlphanumeric(10).getBytes(); break; + case SolaceBinderHeaders.PARTITION_KEY: + value = RandomStringUtils.randomAlphanumeric(10); + // This value is overwritten by binder-defined partition key header + messageBuilder.setHeader(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, + RandomStringUtils.randomAlphanumeric(10)); + break; default: value = null; fail(String.format("no test for header %s", header.getKey())); @@ -315,6 +321,10 @@ public void testMapSpringMessageToXMLMessage_WriteSolaceProperties() throws Exce case SolaceHeaders.USER_DATA: assertEquals(expectedValue, xmlMessage.getUserData()); break; + case SolaceBinderHeaders.PARTITION_KEY: + assertEquals(expectedValue, xmlMessage.getProperties() + .getString(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)); + break; default: fail(String.format("no test for header %s", header.getKey())); } @@ -366,7 +376,7 @@ public void testMapSpringMessageToXMLMessage_NonWriteableSolaceProperties() thro assertFalse(xmlMessage.getRedelivered()); break; case SolaceBinderHeaders.MESSAGE_VERSION: - assertEquals(new Integer(XMLMessageMapper.MESSAGE_VERSION), + assertEquals(Integer.valueOf(XMLMessageMapper.MESSAGE_VERSION), xmlMessage.getProperties().getInteger(header.getKey())); break; case SolaceBinderHeaders.SERIALIZED_HEADERS: @@ -485,10 +495,21 @@ public void testFailMapSpringMessageToXMLMessage_InvalidHeaderType() { xmlMessageMapper.map(testSpringMessage, null, false); fail(String.format("Expected message mapping to fail for header %s", header.getKey())); } catch (SolaceMessageConversionException e) { - assertEquals(e.getMessage(), String.format( - "Message %s has an invalid value type for header %s. Expected %s but received %s.", - testSpringMessage.getHeaders().getId(), header.getKey(), header.getValue().getType(), - Object.class)); + if (header.getValue() instanceof SolaceHeaderMeta) { + assertEquals(e.getMessage(), String.format( + "Message %s has an invalid value type for header %s. Expected %s but received %s.", + testSpringMessage.getHeaders().getId(), header.getKey(), header.getValue().getType(), + Object.class)); + } else { + switch (header.getKey()) { + case SolaceBinderHeaders.PARTITION_KEY -> + Assertions.assertThat(e).rootCause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContainingAll(header.getKey(), header.getValue().getType().toString()); + default -> + fail(String.format("no test for header %s", header.getKey())); + } + } } } } @@ -585,6 +606,7 @@ public void testMapProducerSpringMessageToXMLMessage_WithExcludedHeader_ShouldNo case SolaceHeaders.CORRELATION_ID: case SolaceHeaders.HTTP_CONTENT_ENCODING: case SolaceHeaders.SENDER_ID: + case SolaceBinderHeaders.PARTITION_KEY: value = RandomStringUtils.randomAlphanumeric(10); break; case SolaceHeaders.DMQ_ELIGIBLE: @@ -668,16 +690,22 @@ public void testMapProducerSpringMessageToXMLMessage_WithExcludedHeader_ShouldNo case SolaceHeaders.USER_DATA: assertEquals(expectedValue, xmlMessage.getUserData()); break; + case SolaceBinderHeaders.PARTITION_KEY: + assertEquals(expectedValue, xmlMessage.getProperties() + .getString(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)); + break; default: fail(String.format("no test for header %s", header.getKey())); } } - for (Map.Entry> binderHeaderMetaEntry : SolaceBinderHeaderMeta.META.entrySet()) { - if (SolaceHeaderMeta.Scope.WIRE.equals(binderHeaderMetaEntry.getValue().getScope())) { - assertNotNull(xmlMessage.getProperties().get(binderHeaderMetaEntry.getKey())); - } - } + Assertions.assertThat(SolaceBinderHeaderMeta.META + .entrySet() + .stream() + .filter(e -> SolaceHeaderMeta.Scope.WIRE.equals(e.getValue().getScope())) + .filter(e -> !e.getValue().isWritable()) // already tested earlier in this test + .map(Map.Entry::getKey)) + .allSatisfy(h -> Assertions.assertThat(xmlMessage.getProperties().get(h)).isNotNull()); Mockito.verify(xmlMessageMapper).map(testSpringMessage, excludedHeaders, false); } @@ -1062,6 +1090,9 @@ public void testMapXMLMessageToSpringMessage_NonReadableSolaceProperties(boolean case SolaceBinderHeaders.TARGET_DESTINATION_TYPE: metadata.putString(header.getKey(), "topic"); break; + case SolaceBinderHeaders.PARTITION_KEY: + metadata.putString(header.getKey(), "partitionKey"); + break; default: fail(String.format("no test for header %s", header.getKey())); } @@ -1303,6 +1334,17 @@ public void testMapXMLMessageToSpringMessage_WithListPayload(boolean batchMode) } } + @Test + public void testMapMessageHeadersToSDTMap_JMSXGroupID() throws Exception { + String jmsxGroupID = "partition-key-value"; + SDTMap sdtMap = xmlMessageMapper.map( + new MessageHeaders(Collections.singletonMap( + XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, jmsxGroupID)), + Collections.emptyList(), false); + assertThat(sdtMap.keySet(), hasItem(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)); + assertEquals(jmsxGroupID, sdtMap.getString(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)); + } + @Test public void testMapMessageHeadersToSDTMap_Serializable() throws Exception { String key = "a"; @@ -1559,15 +1601,22 @@ private void validateXMLProperties(XMLMessage xmlMessage, Message springMessa .filter(h -> h.getValue().isReadable()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - for (Map.Entry header : expectedHeaders.entrySet()) { - if (readWriteableSolaceHeaders.containsKey(header.getKey())) { - Object value = readWriteableSolaceHeaders.get(header.getKey()).getReadAction().apply(xmlMessage); - assertEquals(header.getValue(), value); - } else if (!serializedHeaders.contains(header.getKey())) { - assertThat(metadata.keySet(), hasItem(header.getKey())); - assertEquals(header.getValue(), metadata.get(header.getKey())); - } - } + Assertions.assertThat(expectedHeaders) + .allSatisfy((headerKey, headerValue) -> { + if (readWriteableSolaceHeaders.containsKey(headerKey)) { + Object value = readWriteableSolaceHeaders.get(headerKey).getReadAction().apply(xmlMessage); + assertEquals(headerValue, value); + } else if (!serializedHeaders.contains(headerKey)) { + switch (headerKey) { + case SolaceBinderHeaders.PARTITION_KEY -> + headerKey = XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY; + case XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY -> + headerValue = expectedHeaders.getOrDefault(SolaceBinderHeaders.PARTITION_KEY, headerValue); + } + assertThat(metadata.keySet(), hasItem(headerKey)); + assertEquals(headerValue, ((ThrowingFunction) metadata::get).apply(headerKey)); + } + }); } private void validateSpringHeaders(MessageHeaders messageHeaders, XMLMessage xmlMessage)