Skip to content

Commit

Permalink
Merge pull request #238 from SolaceDev/native-partitioning
Browse files Browse the repository at this point in the history
add support for PubSub+ partitioned queues
  • Loading branch information
Nephery authored Oct 12, 2023
2 parents 94a4bdf + 94375cc commit 8377693
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ In Solace, the above setup is called topic-to-queue mapping. So a typical messag
+
NOTE: Round-robin distribution only occurs if the consumer group's queue is configured for non-exclusive access. If the queue has exclusive access, then only one consumer will receive messages.

NOTE: Partitioning is not yet supported by this version of the binder.

IMPORTANT: Since consumer bindings always consumes from queues it is required that Assured Delivery is enabled on the Solace PubSub+ Message VPN being used (Assured Delivery is automatically enabled if using Solace Cloud). Additionally, the client username's client profile must be allowed to send and receive guaranteed messages.

For the sake of brevity, it will be assumed that you have a basic understanding of the Spring Cloud Stream project. If not, then please refer to https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/[Spring's documentation]. This document will solely focus on discussing components unique to Solace.
Expand Down Expand Up @@ -639,6 +637,12 @@ The consolidated list of message headers for a batch of messages where the heade
|
| Present and true to indicate when the PubSub+ message payload was null.

| solace_scst_partitionKey
| String
| Write
|
| The partition key for PubSub+ partitioned queues.

| solace_scst_serializedPayload
| Boolean
| Internal Binder Use Only
Expand Down Expand Up @@ -844,6 +848,34 @@ See <<Native Payload Types>> for more info regarding this binder's natively supp

To create a batch of messages, the binder will consume messages from the PubSub+ broker until either a maximum batch size or timeout has been achieved. After which, the binder will compose the batch message and send it to the consumer handler for processing. Both these batching parameters can be configured using the `batchMaxSize` and `batchTimeout` consumer config options.

== Partitioning

[NOTE]
====
The Solace PubSub+ broker supports partitioning natively.
The partitioning abstraction as described in the https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#partitioning[Spring Cloud Stream documentation] is not supported.
====

To publish messages that are intended for partitioned queues, you must provide a partition key by setting the `solace_scst_partitionKey` message header (accessible through the `SolaceBinderHeaders.PARTITION_KEY` constant).

For example:

[source,java]
----
public class MyMessageBuilder {
public Message<String> buildMeAMessage() {
return MessageBuilder.withPayload("payload")
.setHeader(SolaceBinderHeaders.PARTITION_KEY, "partition-key")
.build();
}
}
----

As for consuming messages from partitioned queues, this is handled transparently by the PubSub+ broker. That is to say, consuming messages from a partitioned queue is no different from consuming messages from any other queue.

See https://docs.solace.com/Messaging/Guaranteed-Msg/Queues.htm#partitioned-queues[Partitioned Queues] for more.

== Manual Message Acknowledgment

Message handlers can disable auto-acknowledgement and manually invoke the acknowledgement callback as follows:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package com.solace.spring.cloud.stream.binder.messaging;

import com.solace.spring.cloud.stream.binder.util.CorrelationData;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.solace.spring.cloud.stream.binder.util.CorrelationData;

public class SolaceBinderHeaderMeta<T> implements HeaderMeta<T> {
public static final Map<String, SolaceBinderHeaderMeta<?>> META = Stream.of(new Object[][] {
{SolaceBinderHeaders.PARTITION_KEY, new SolaceBinderHeaderMeta<>(String.class, false, true, Scope.WIRE)},
{SolaceBinderHeaders.MESSAGE_VERSION, new SolaceBinderHeaderMeta<>(Integer.class, true, false, Scope.WIRE)},
{SolaceBinderHeaders.SERIALIZED_PAYLOAD, new SolaceBinderHeaderMeta<>(Boolean.class, false, false, Scope.WIRE)},
{SolaceBinderHeaders.SERIALIZED_HEADERS, new SolaceBinderHeaderMeta<>(String.class, false, false, Scope.WIRE)},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ public final class SolaceBinderHeaders {
*/
static final String PREFIX = SolaceHeaders.PREFIX + "scst_";

/**
* <p><b>Acceptable Value Type:</b> {@link String}</p>
* <p><b>Access:</b> Write</p>
* <br>
* <p>The partition key for PubSub+ partitioned queues.</p>
*/
public static final String PARTITION_KEY = PREFIX + "partitionKey";

/**
* <p><b>Acceptable Value Type:</b> {@link Integer}</p>
* <p><b>Access:</b> Read</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaderMeta;
import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaders;
import com.solace.spring.cloud.stream.binder.messaging.SolaceHeaderMeta;
import com.solace.spring.cloud.stream.binder.messaging.SolaceHeaders;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solacesystems.common.util.ByteArray;
import com.solacesystems.jcsmp.BytesMessage;
Expand All @@ -29,7 +28,6 @@
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
Expand All @@ -50,10 +48,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class XMLMessageMapper {
private static final Log logger = LogFactory.getLog(XMLMessageMapper.class);
Expand Down Expand Up @@ -308,6 +305,12 @@ SDTMap map(MessageHeaders headers, Collection<String> excludedHeaders, boolean c
convertNonSerializableHeadersToString);
}

if (headers.containsKey(SolaceBinderHeaders.PARTITION_KEY)) {
rethrowableCall(metadata::putString, XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY,
this.<String, Class<String>, String>rethrowableCall(headers::get,
SolaceBinderHeaders.PARTITION_KEY, String.class));
}

if (!serializedHeaders.isEmpty()) {
rethrowableCall(metadata::putString, SolaceBinderHeaders.SERIALIZED_HEADERS,
rethrowableCall(stringSetWriter::writeValueAsString, serializedHeaders));
Expand Down Expand Up @@ -406,6 +409,10 @@ private <T,R> R rethrowableCall(ThrowingFunction<T,R> function, T var) {
return function.apply(var);
}

private <T,U,R> R rethrowableCall(ThrowingBiFunction<T,U,R> function, T var0, U var1) {
return function.apply(var0, var1);
}

private <T,U> void rethrowableCall(ThrowingBiConsumer<T,U> consumer, T var0, U var1) {
consumer.accept(var0, var1);
}
Expand Down Expand Up @@ -441,6 +448,23 @@ default R apply(T t) {
R applyThrows(T t) throws Exception;
}

@FunctionalInterface
private interface ThrowingBiFunction<T,U,R> extends BiFunction<T,U,R> {

@Override
default R apply(T t, U u) {
try {
return applyThrows(t, u);
} catch (Exception e) {
SolaceMessageConversionException wrappedException = new SolaceMessageConversionException(e);
logger.warn(wrappedException);
throw wrappedException;
}
}

R applyThrows(T t, U u) throws Exception;
}

@FunctionalInterface
private interface ThrowingBiConsumer<T,U> extends BiConsumer<T,U> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ public void testBinderHeaders(SoftAssertions softly) throws Exception {
for (Map.Entry<String, SolaceBinderHeaderMeta<?>> headerMeta : SolaceBinderHeaderMeta.META.entrySet()) {
if (!HeaderMeta.Scope.WIRE.equals(headerMeta.getValue().getScope())) continue;
String headerName = headerMeta.getKey();
if (headerName.equals(SolaceBinderHeaders.PARTITION_KEY)) {
headerName = XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY;
}
// Everything should be receivable as a String in JMS
softly.assertThat(msg.getStringProperty(headerName))
.withFailMessage("Expecting JMS property %s to not be null", headerName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ public void testMapSpringMessageToXMLMessage_WriteSolaceProperties() throws Exce
case SolaceHeaders.USER_DATA:
value = RandomStringUtils.randomAlphanumeric(10).getBytes();
break;
case SolaceBinderHeaders.PARTITION_KEY:
value = RandomStringUtils.randomAlphanumeric(10);
// This value is overwritten by binder-defined partition key header
messageBuilder.setHeader(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY,
RandomStringUtils.randomAlphanumeric(10));
break;
default:
value = null;
fail(String.format("no test for header %s", header.getKey()));
Expand Down Expand Up @@ -315,6 +321,10 @@ public void testMapSpringMessageToXMLMessage_WriteSolaceProperties() throws Exce
case SolaceHeaders.USER_DATA:
assertEquals(expectedValue, xmlMessage.getUserData());
break;
case SolaceBinderHeaders.PARTITION_KEY:
assertEquals(expectedValue, xmlMessage.getProperties()
.getString(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY));
break;
default:
fail(String.format("no test for header %s", header.getKey()));
}
Expand Down Expand Up @@ -366,7 +376,7 @@ public void testMapSpringMessageToXMLMessage_NonWriteableSolaceProperties() thro
assertFalse(xmlMessage.getRedelivered());
break;
case SolaceBinderHeaders.MESSAGE_VERSION:
assertEquals(new Integer(XMLMessageMapper.MESSAGE_VERSION),
assertEquals(Integer.valueOf(XMLMessageMapper.MESSAGE_VERSION),
xmlMessage.getProperties().getInteger(header.getKey()));
break;
case SolaceBinderHeaders.SERIALIZED_HEADERS:
Expand Down Expand Up @@ -485,10 +495,21 @@ public void testFailMapSpringMessageToXMLMessage_InvalidHeaderType() {
xmlMessageMapper.map(testSpringMessage, null, false);
fail(String.format("Expected message mapping to fail for header %s", header.getKey()));
} catch (SolaceMessageConversionException e) {
assertEquals(e.getMessage(), String.format(
"Message %s has an invalid value type for header %s. Expected %s but received %s.",
testSpringMessage.getHeaders().getId(), header.getKey(), header.getValue().getType(),
Object.class));
if (header.getValue() instanceof SolaceHeaderMeta<?>) {
assertEquals(e.getMessage(), String.format(
"Message %s has an invalid value type for header %s. Expected %s but received %s.",
testSpringMessage.getHeaders().getId(), header.getKey(), header.getValue().getType(),
Object.class));
} else {
switch (header.getKey()) {
case SolaceBinderHeaders.PARTITION_KEY ->
Assertions.assertThat(e).rootCause()
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContainingAll(header.getKey(), header.getValue().getType().toString());
default ->
fail(String.format("no test for header %s", header.getKey()));
}
}
}
}
}
Expand Down Expand Up @@ -585,6 +606,7 @@ public void testMapProducerSpringMessageToXMLMessage_WithExcludedHeader_ShouldNo
case SolaceHeaders.CORRELATION_ID:
case SolaceHeaders.HTTP_CONTENT_ENCODING:
case SolaceHeaders.SENDER_ID:
case SolaceBinderHeaders.PARTITION_KEY:
value = RandomStringUtils.randomAlphanumeric(10);
break;
case SolaceHeaders.DMQ_ELIGIBLE:
Expand Down Expand Up @@ -668,16 +690,22 @@ public void testMapProducerSpringMessageToXMLMessage_WithExcludedHeader_ShouldNo
case SolaceHeaders.USER_DATA:
assertEquals(expectedValue, xmlMessage.getUserData());
break;
case SolaceBinderHeaders.PARTITION_KEY:
assertEquals(expectedValue, xmlMessage.getProperties()
.getString(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY));
break;
default:
fail(String.format("no test for header %s", header.getKey()));
}
}

for (Map.Entry<String, SolaceBinderHeaderMeta<?>> binderHeaderMetaEntry : SolaceBinderHeaderMeta.META.entrySet()) {
if (SolaceHeaderMeta.Scope.WIRE.equals(binderHeaderMetaEntry.getValue().getScope())) {
assertNotNull(xmlMessage.getProperties().get(binderHeaderMetaEntry.getKey()));
}
}
Assertions.assertThat(SolaceBinderHeaderMeta.META
.entrySet()
.stream()
.filter(e -> SolaceHeaderMeta.Scope.WIRE.equals(e.getValue().getScope()))
.filter(e -> !e.getValue().isWritable()) // already tested earlier in this test
.map(Map.Entry::getKey))
.allSatisfy(h -> Assertions.assertThat(xmlMessage.getProperties().get(h)).isNotNull());

Mockito.verify(xmlMessageMapper).map(testSpringMessage, excludedHeaders, false);
}
Expand Down Expand Up @@ -1062,6 +1090,9 @@ public void testMapXMLMessageToSpringMessage_NonReadableSolaceProperties(boolean
case SolaceBinderHeaders.TARGET_DESTINATION_TYPE:
metadata.putString(header.getKey(), "topic");
break;
case SolaceBinderHeaders.PARTITION_KEY:
metadata.putString(header.getKey(), "partitionKey");
break;
default:
fail(String.format("no test for header %s", header.getKey()));
}
Expand Down Expand Up @@ -1303,6 +1334,17 @@ public void testMapXMLMessageToSpringMessage_WithListPayload(boolean batchMode)
}
}

@Test
public void testMapMessageHeadersToSDTMap_JMSXGroupID() throws Exception {
String jmsxGroupID = "partition-key-value";
SDTMap sdtMap = xmlMessageMapper.map(
new MessageHeaders(Collections.singletonMap(
XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, jmsxGroupID)),
Collections.emptyList(), false);
assertThat(sdtMap.keySet(), hasItem(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY));
assertEquals(jmsxGroupID, sdtMap.getString(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY));
}

@Test
public void testMapMessageHeadersToSDTMap_Serializable() throws Exception {
String key = "a";
Expand Down Expand Up @@ -1559,15 +1601,22 @@ private void validateXMLProperties(XMLMessage xmlMessage, Message<?> springMessa
.filter(h -> h.getValue().isReadable())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

for (Map.Entry<String,Object> header : expectedHeaders.entrySet()) {
if (readWriteableSolaceHeaders.containsKey(header.getKey())) {
Object value = readWriteableSolaceHeaders.get(header.getKey()).getReadAction().apply(xmlMessage);
assertEquals(header.getValue(), value);
} else if (!serializedHeaders.contains(header.getKey())) {
assertThat(metadata.keySet(), hasItem(header.getKey()));
assertEquals(header.getValue(), metadata.get(header.getKey()));
}
}
Assertions.assertThat(expectedHeaders)
.allSatisfy((headerKey, headerValue) -> {
if (readWriteableSolaceHeaders.containsKey(headerKey)) {
Object value = readWriteableSolaceHeaders.get(headerKey).getReadAction().apply(xmlMessage);
assertEquals(headerValue, value);
} else if (!serializedHeaders.contains(headerKey)) {
switch (headerKey) {
case SolaceBinderHeaders.PARTITION_KEY ->
headerKey = XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY;
case XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY ->
headerValue = expectedHeaders.getOrDefault(SolaceBinderHeaders.PARTITION_KEY, headerValue);
}
assertThat(metadata.keySet(), hasItem(headerKey));
assertEquals(headerValue, ((ThrowingFunction<String, Object>) metadata::get).apply(headerKey));
}
});
}

private void validateSpringHeaders(MessageHeaders messageHeaders, XMLMessage xmlMessage)
Expand Down

0 comments on commit 8377693

Please sign in to comment.