Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAGO-82456: fix queueAdditionalSubscriptions when addDestinationAsSubscription=false #325

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,6 @@ private Queue provisionErrorQueue(String errorQueueName, ExtendedConsumerPropert
}

public void addSubscriptionToQueue(Queue queue, String topicName, SolaceCommonProperties properties, boolean isDestinationSubscription) {
if (!isDestinationSubscription && queue.isDurable() && !properties.isAddDestinationAsSubscriptionToQueue()) {
LOGGER.debug("Provision subscriptions to durable queues was disabled, queue {} will not be subscribed to topic {}",
queue.getName(), topicName);
return;
}

if (isDestinationSubscription && !properties.isAddDestinationAsSubscriptionToQueue()) {
LOGGER.debug("Adding destination as subscription was disabled, queue {} will not be subscribed to topic {}",
queue.getName(), topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -848,9 +851,11 @@ public void testFailConsumerProvisioningOnErrorQueuePropertyChange(JCSMPSession
}
}

@Test
@CartesianTest(name = "[{index}] addDestinationAsSubscriptionToQueue={0}")
@Execution(ExecutionMode.CONCURRENT)
public void testConsumerAdditionalSubscriptions(TestInfo testInfo) throws Exception {
public void testConsumerAdditionalSubscriptions(
@Values(booleans = {false, true}) boolean addDestinationAsSubscriptionToQueue,
TestInfo testInfo) throws Exception {
SolaceTestBinder binder = getBinder();

DirectChannel moduleOutputChannel0 = createBindableChannel("output0", new BindingProperties());
Expand All @@ -867,6 +872,8 @@ public void testConsumerAdditionalSubscriptions(TestInfo testInfo) throws Except
destination1, moduleOutputChannel1, createProducerProperties(testInfo));

ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties = createConsumerProperties();
// this flag shouldn't do anything for additional-subscriptions
consumerProperties.getExtension().setAddDestinationAsSubscriptionToQueue(addDestinationAsSubscriptionToQueue);
consumerProperties.getExtension()
.setQueueAdditionalSubscriptions(new String[]{wildcardDestination1, "some-random-sub"});

Expand All @@ -879,19 +886,31 @@ public void testConsumerAdditionalSubscriptions(TestInfo testInfo) throws Except

binderBindUnbindLatency();

final CountDownLatch latch = new CountDownLatch(2);
final BlockingQueue<Destination> receivedMsgDestinations = new ArrayBlockingQueue<>(10);
moduleInputChannel.subscribe(message1 -> {
logger.info(String.format("Received message %s", message1));
latch.countDown();
Optional.ofNullable(message1.getHeaders().get(SolaceHeaders.DESTINATION, Destination.class))
.ifPresent(receivedMsgDestinations::add);
});

logger.info(String.format("Sending message to destination %s: %s", destination0, message));
moduleOutputChannel0.send(message);
if (addDestinationAsSubscriptionToQueue) {
logger.info(String.format("Sending message to destination %s: %s", destination0, message));
moduleOutputChannel0.send(message);
assertThat(receivedMsgDestinations.poll(10, TimeUnit.SECONDS))
.extracting(Destination::getName)
.isEqualTo(destination0);
}

logger.info(String.format("Sending message to destination %s: %s", destination1, message));
moduleOutputChannel1.send(message);
assertThat(receivedMsgDestinations.poll(10, TimeUnit.SECONDS))
.extracting(Destination::getName)
.isEqualTo(destination1);

assertThat(receivedMsgDestinations)
.as("An unexpected message was read")
.isEmpty();

assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
TimeUnit.SECONDS.sleep(1); // Give bindings a sec to finish processing successful message consume
producerBinding0.unbind();
producerBinding1.unbind();
Expand Down
Loading