Skip to content

Commit

Permalink
uncommented integration test with fix
Browse files Browse the repository at this point in the history
Message sent in previous test is being consumed during the start of this test. Set ttl to the published message so that it is not consumed again.
  • Loading branch information
SravanThotakura05 committed Dec 18, 2023
1 parent 964cc52 commit 4f0f188
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import jakarta.enterprise.context.ApplicationScoped;

import org.awaitility.Durations;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
Expand Down Expand Up @@ -118,6 +119,7 @@ void consumerFailedProcessingPublishToErrorTopic() {
.with("mp.messaging.incoming.in.consumer.queue.publish-to-error-topic-on-failure", true)
.with("mp.messaging.incoming.in.consumer.queue.error.topic",
SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_SUBSCRIPTION)
.with("mp.messaging.incoming.in.consumer.queue.error.message.ttl", 1000)
.with("mp.messaging.incoming.error-in.connector", "quarkus-solace")
.with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME)
.with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive");
Expand All @@ -131,17 +133,14 @@ void consumerFailedProcessingPublishToErrorTopic() {
.start();
Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION);
OutboundMessageBuilder messageBuilder = messagingService.messageBuilder();
OutboundMessage outboundMessage = messageBuilder.build("2");
OutboundMessage outboundMessage = messageBuilder.build("1");
publisher.publish(outboundMessage, tp);

// Assert on published messages
await().untilAsserted(() -> assertThat(app.getReceived().size()).isEqualTo(0));
await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages().size()).isEqualTo(1));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages()).contains("1"));
await().pollDelay(Durations.FIVE_SECONDS).until(() -> true);
}

@Test
Expand Down Expand Up @@ -170,12 +169,13 @@ void consumerFailedProcessingMoveToDMQ() {
Properties properties = new Properties();
properties.setProperty(SolaceProperties.MessageProperties.PERSISTENT_DMQ_ELIGIBLE, "true");
messageBuilder.fromProperties(properties);
OutboundMessage outboundMessage = messageBuilder.build("1");
OutboundMessage outboundMessage = messageBuilder.build("12");
publisher.publish(outboundMessage, tp);

// Assert on published messages
await().untilAsserted(() -> assertThat(app.getReceived().size()).isEqualTo(0));
await().untilAsserted(() -> assertThat(app.getReceivedDMQMessages().size()).isEqualTo(1));
await().untilAsserted(() -> assertThat(app.getReceivedDMQMessages()).contains("12"));
}

@Test
Expand All @@ -200,33 +200,33 @@ void consumerCreateMissingResourceAddSubscriptionPermissionException() {
+ SolaceContainer.INTEGRATION_TEST_QUEUE_NAME + "' - Topic '" + topic));
}

// @Test
// @Order(7)
// void consumerPublishToErrorTopicPermissionException() {
// MapBasedConfig config = new MapBasedConfig()
// .with("mp.messaging.incoming.in.connector", "quarkus-solace")
// .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
// .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
// .with("mp.messaging.incoming.in.consumer.queue.publish-to-error-topic-on-failure", true)
// .with("mp.messaging.incoming.in.consumer.queue.error.topic",
// "publish/deny")
// .with("mp.messaging.incoming.error-in.connector", "quarkus-solace")
// .with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME)
// .with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive");
//
// // Run app that consumes messages
// MyErrorQueueConsumer app = runApplication(config, MyErrorQueueConsumer.class);
// // Produce messages
// PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
// .build()
// .start();
// Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION);
// OutboundMessageBuilder messageBuilder = messagingService.messageBuilder();
// OutboundMessage outboundMessage = messageBuilder.build("2");
// publisher.publish(outboundMessage, tp);
//
// await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages().size()).isEqualTo(0));
// }
@Test
@Order(7)
void consumerPublishToErrorTopicPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
.with("mp.messaging.incoming.in.consumer.queue.publish-to-error-topic-on-failure", true)
.with("mp.messaging.incoming.in.consumer.queue.error.topic",
"publish/deny")
.with("mp.messaging.incoming.error-in.connector", "quarkus-solace")
.with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME)
.with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive");

// Run app that consumes messages
MyErrorQueueConsumer app = runApplication(config, MyErrorQueueConsumer.class);
// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION);
OutboundMessageBuilder messageBuilder = messagingService.messageBuilder();
OutboundMessage outboundMessage = messageBuilder.build("2");
publisher.publish(outboundMessage, tp);

await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages().size()).isEqualTo(0));
}

@ApplicationScoped
static class MyConsumer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ private Transferable createConfigurationScript() {
updateConfigScript(scriptBuilder, "message-spool message-vpn default");
updateConfigScript(scriptBuilder, "create queue " + INTEGRATION_TEST_ERROR_QUEUE_NAME);
updateConfigScript(scriptBuilder, "access-type exclusive");
updateConfigScript(scriptBuilder, "respect-ttl");
updateConfigScript(scriptBuilder, "max-spool-usage 300");
updateConfigScript(scriptBuilder, "subscription topic " + INTEGRATION_TEST_ERROR_QUEUE_SUBSCRIPTION);
updateConfigScript(scriptBuilder, "permission all consume");
Expand Down

0 comments on commit 4f0f188

Please sign in to comment.