Skip to content

Commit

Permalink
Added blocking processor test and removed duplicate entry in pom.xml
Browse files Browse the repository at this point in the history
  • Loading branch information
SravanThotakura05 committed Jan 19, 2024
1 parent cd0c544 commit ec3369d
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 5 deletions.
5 changes: 0 additions & 5 deletions pubsub-plus-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,6 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

@Disabled
Expand Down Expand Up @@ -209,4 +210,177 @@ Multi<Message<String>> in(SolaceInboundMessage<byte[]> msg) {
}

}

@Test
public void endToEndBlockingProcessorPerformanceTesttWithBackPressureWaitAndWaitForPublishReceipt() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic)
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", processedTopic);

// Run app that consumes messages
runApplication(config, MyBlockingProcessor.class);

List<String> received = new CopyOnWriteArrayList<>();

// Start listening processed messages
PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
.withMessageAutoAcknowledgement()
.withSubscriptions(TopicSubscription.of(processedTopic))
.build(Queue.nonDurableExclusiveQueue());
receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString()));
receiver.start();

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
for (int i = 0; i < COUNT; i++) {
publisher.publish(String.valueOf(i + 1), tp);
}

await()
.atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS))
.until(() -> received.size() == COUNT);
}

@Test
public void endToEndBlockingProcessorPerformanceTesttWithBackPressureWaitAndNoWaitForPublishReceipt() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic)
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", processedTopic)
.with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false);

// Run app that consumes messages
runApplication(config, MyBlockingProcessor.class);

List<String> received = new CopyOnWriteArrayList<>();

// Start listening processed messages
PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
.withMessageAutoAcknowledgement()
.withSubscriptions(TopicSubscription.of(processedTopic))
.build(Queue.nonDurableExclusiveQueue());
receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString()));
receiver.start();

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
for (int i = 0; i < COUNT; i++) {
publisher.publish(String.valueOf(i + 1), tp);
}

await()
.atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS))
.until(() -> received.size() == COUNT);
}

@Test
public void endToEndBlockingProcessorPerformanceTesttWithBackPressureElasticAndWaitForPublishReceipt() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic)
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", processedTopic)
.with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic");

// Run app that consumes messages
runApplication(config, MyBlockingProcessor.class);

List<String> received = new CopyOnWriteArrayList<>();

// Start listening processed messages
PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
.withMessageAutoAcknowledgement()
.withSubscriptions(TopicSubscription.of(processedTopic))
.build(Queue.nonDurableExclusiveQueue());
receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString()));
receiver.start();

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
for (int i = 0; i < COUNT; i++) {
publisher.publish(String.valueOf(i + 1), tp);
}

await()
.atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS))
.until(() -> received.size() == COUNT);
}

@Test
public void endToEndBlockingProcessorPerformanceTesttWithBackPressureElasticAndNoWaitForPublishReceipt() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic)
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", processedTopic)
.with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic")
.with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false);

// Run app that consumes messages
runApplication(config, MyBlockingProcessor.class);

List<String> received = new CopyOnWriteArrayList<>();

// Start listening processed messages
PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
.withMessageAutoAcknowledgement()
.withSubscriptions(TopicSubscription.of(processedTopic))
.build(Queue.nonDurableExclusiveQueue());
receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString()));
receiver.start();

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
for (int i = 0; i < COUNT; i++) {
publisher.publish(String.valueOf(i + 1), tp);
}

await()
.atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS))
.until(() -> received.size() == COUNT);
}

@ApplicationScoped
static class MyBlockingProcessor {
@Incoming("in")
@Outgoing("out")
@Blocking(ordered = false)
Message<String> in(SolaceInboundMessage<byte[]> msg) {
return Message.of(msg.getMessage().getPayloadAsString()).withAck(() -> {
msg.ack();
return CompletableFuture.completedFuture(null);
});
}
}
}

0 comments on commit ec3369d

Please sign in to comment.