diff --git a/pubsub-plus-connector/pom.xml b/pubsub-plus-connector/pom.xml index 3d55cff..4c8d169 100644 --- a/pubsub-plus-connector/pom.xml +++ b/pubsub-plus-connector/pom.xml @@ -72,11 +72,6 @@ junit-jupiter-api test - - org.junit.jupiter - junit-jupiter-params - test - org.assertj assertj-core diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/EndToEndPerformanceTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/EndToEndPerformanceTest.java index 31c91c1..e2c2ee4 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/EndToEndPerformanceTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/EndToEndPerformanceTest.java @@ -24,6 +24,7 @@ import io.quarkiverse.solace.base.WeldTestBase; import io.quarkiverse.solace.incoming.SolaceInboundMessage; import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.annotations.Blocking; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @Disabled @@ -209,4 +210,177 @@ Multi> in(SolaceInboundMessage msg) { } } + + @Test + public void endToEndBlockingProcessorPerformanceTesttWithBackPressureWaitAndWaitForPublishReceipt() { + String processedTopic = topic + "/processed"; + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", processedTopic); + + // Run app that consumes messages + runApplication(config, MyBlockingProcessor.class); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening processed messages + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(processedTopic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + } + + @Test + public void endToEndBlockingProcessorPerformanceTesttWithBackPressureWaitAndNoWaitForPublishReceipt() { + String processedTopic = topic + "/processed"; + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", processedTopic) + .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false); + + // Run app that consumes messages + runApplication(config, MyBlockingProcessor.class); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening processed messages + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(processedTopic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + } + + @Test + public void endToEndBlockingProcessorPerformanceTesttWithBackPressureElasticAndWaitForPublishReceipt() { + String processedTopic = topic + "/processed"; + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", processedTopic) + .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic"); + + // Run app that consumes messages + runApplication(config, MyBlockingProcessor.class); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening processed messages + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(processedTopic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + } + + @Test + public void endToEndBlockingProcessorPerformanceTesttWithBackPressureElasticAndNoWaitForPublishReceipt() { + String processedTopic = topic + "/processed"; + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", processedTopic) + .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic") + .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false); + + // Run app that consumes messages + runApplication(config, MyBlockingProcessor.class); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening processed messages + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(processedTopic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + } + + @ApplicationScoped + static class MyBlockingProcessor { + @Incoming("in") + @Outgoing("out") + @Blocking(ordered = false) + Message in(SolaceInboundMessage msg) { + return Message.of(msg.getMessage().getPayloadAsString()).withAck(() -> { + msg.ack(); + return CompletableFuture.completedFuture(null); + }); + } + } }