diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceEndpointProvisioner.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceEndpointProvisioner.java index fb5fa14e..7ef63c3c 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceEndpointProvisioner.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceEndpointProvisioner.java @@ -256,12 +256,6 @@ private Queue provisionErrorQueue(String errorQueueName, ExtendedConsumerPropert } public void addSubscriptionToQueue(Queue queue, String topicName, SolaceCommonProperties properties, boolean isDestinationSubscription) { - if (!isDestinationSubscription && queue.isDurable() && !properties.isAddDestinationAsSubscriptionToQueue()) { - LOGGER.debug("Provision subscriptions to durable queues was disabled, queue {} will not be subscribed to topic {}", - queue.getName(), topicName); - return; - } - if (isDestinationSubscription && !properties.isAddDestinationAsSubscriptionToQueue()) { LOGGER.debug("Adding destination as subscription was disabled, queue {} will not be subscribed to topic {}", queue.getName(), topicName); diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java index 2e2b7535..210c856d 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java @@ -96,8 +96,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -848,9 +851,11 @@ public void testFailConsumerProvisioningOnErrorQueuePropertyChange(JCSMPSession } } - @Test + @CartesianTest(name = "[{index}] addDestinationAsSubscriptionToQueue={0}") @Execution(ExecutionMode.CONCURRENT) - public void testConsumerAdditionalSubscriptions(TestInfo testInfo) throws Exception { + public void testConsumerAdditionalSubscriptions( + @Values(booleans = {false, true}) boolean addDestinationAsSubscriptionToQueue, + TestInfo testInfo) throws Exception { SolaceTestBinder binder = getBinder(); DirectChannel moduleOutputChannel0 = createBindableChannel("output0", new BindingProperties()); @@ -867,6 +872,8 @@ public void testConsumerAdditionalSubscriptions(TestInfo testInfo) throws Except destination1, moduleOutputChannel1, createProducerProperties(testInfo)); ExtendedConsumerProperties consumerProperties = createConsumerProperties(); + // this flag shouldn't do anything for additional-subscriptions + consumerProperties.getExtension().setAddDestinationAsSubscriptionToQueue(addDestinationAsSubscriptionToQueue); consumerProperties.getExtension() .setQueueAdditionalSubscriptions(new String[]{wildcardDestination1, "some-random-sub"}); @@ -879,19 +886,31 @@ public void testConsumerAdditionalSubscriptions(TestInfo testInfo) throws Except binderBindUnbindLatency(); - final CountDownLatch latch = new CountDownLatch(2); + final BlockingQueue receivedMsgDestinations = new ArrayBlockingQueue<>(10); moduleInputChannel.subscribe(message1 -> { logger.info(String.format("Received message %s", message1)); - latch.countDown(); + Optional.ofNullable(message1.getHeaders().get(SolaceHeaders.DESTINATION, Destination.class)) + .ifPresent(receivedMsgDestinations::add); }); - logger.info(String.format("Sending message to destination %s: %s", destination0, message)); - moduleOutputChannel0.send(message); + if (addDestinationAsSubscriptionToQueue) { + logger.info(String.format("Sending message to destination %s: %s", destination0, message)); + moduleOutputChannel0.send(message); + assertThat(receivedMsgDestinations.poll(10, TimeUnit.SECONDS)) + .extracting(Destination::getName) + .isEqualTo(destination0); + } logger.info(String.format("Sending message to destination %s: %s", destination1, message)); moduleOutputChannel1.send(message); + assertThat(receivedMsgDestinations.poll(10, TimeUnit.SECONDS)) + .extracting(Destination::getName) + .isEqualTo(destination1); + + assertThat(receivedMsgDestinations) + .as("An unexpected message was read") + .isEmpty(); - assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); TimeUnit.SECONDS.sleep(1); // Give bindings a sec to finish processing successful message consume producerBinding0.unbind(); producerBinding1.unbind(); diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderSubscriptionsIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderSubscriptionsIT.java index 534a88fc..8ff46244 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderSubscriptionsIT.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderSubscriptionsIT.java @@ -29,7 +29,6 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -170,9 +169,7 @@ private static void assertActualSubscriptionsAreCorrect(SpringCloudStreamContext boolean addDestinationAsSubscriptionToQueue) throws ApiException { String msgVpnName = (String) context.getJcsmpSession().getProperty(JCSMPProperties.VPN_NAME); - Set expectedSubscriptions = getExpectedQueueSubscriptions( - sempV2Api.monitor().getMsgVpnQueue(msgVpnName, queueName, null).getData().isDurable(), - addDestinationAsSubscriptionToQueue); + Set expectedSubscriptions = getExpectedQueueSubscriptions(addDestinationAsSubscriptionToQueue); List actualSubscriptions = sempV2Api.monitor().getMsgVpnQueueSubscriptions(msgVpnName, queueName, null, null, null, null).getData(); @@ -183,17 +180,14 @@ private static void assertActualSubscriptionsAreCorrect(SpringCloudStreamContext assertEquals(expectedSubscriptions.size(), actualSubscriptions.size(), "Some subscriptions are missing"); } - private static Set getExpectedQueueSubscriptions(boolean isDurableQueue, - boolean addDestinationAsSubscriptionToQueue) { + private static Set getExpectedQueueSubscriptions(boolean addDestinationAsSubscriptionToQueue) { if (addDestinationAsSubscriptionToQueue) { Set expectedSubscriptions = new HashSet<>(); expectedSubscriptions.add(DESTINATION); expectedSubscriptions.addAll(Arrays.asList(ADDITIONAL_SUBSCRIPTIONS)); return expectedSubscriptions; - } else if (!isDurableQueue) { - return new HashSet<>(Arrays.asList(ADDITIONAL_SUBSCRIPTIONS)); } else { - return Collections.emptySet(); + return new HashSet<>(Arrays.asList(ADDITIONAL_SUBSCRIPTIONS)); } } }