From 3438eb1571b7ce7a852b67b756f91c2c845d1d79 Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Fri, 5 Jan 2024 17:35:13 +0530 Subject: [PATCH 1/4] Changes to handle graceful shutdown of consumer, publisher before destroy and added tests to assert graceful shutdown --- .../quarkiverse/solace/SolaceConnector.java | 16 ------ .../incoming/SolaceIncomingChannel.java | 2 + .../outgoing/SolaceOutgoingChannel.java | 2 + .../solace/SolaceConsumerTest.java | 52 +++++++++++++++++++ .../solace/SolacePublisherTest.java | 35 +++++++++++++ .../solace/runtime/SolaceRecorder.java | 2 +- 6 files changed, 92 insertions(+), 17 deletions(-) diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java index 7b598b3..a34b9fd 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java @@ -22,10 +22,8 @@ import com.solace.messaging.MessagingService; -import io.quarkiverse.solace.i18n.SolaceLogging; import io.quarkiverse.solace.incoming.SolaceIncomingChannel; import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel; -import io.quarkus.runtime.ShutdownEvent; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; import io.smallrye.reactive.messaging.connector.InboundConnector; import io.smallrye.reactive.messaging.connector.OutboundConnector; @@ -50,8 +48,6 @@ @ConnectorAttribute(name = "consumer.queue.replay.strategy", type = "string", direction = INCOMING, description = "The receiver replay strategy") @ConnectorAttribute(name = "consumer.queue.replay.timebased-start-time", type = "string", direction = INCOMING, description = "The receiver replay timebased start time") @ConnectorAttribute(name = "consumer.queue.replay.replication-group-message-id", type = "string", direction = INCOMING, description = "The receiver replay replication group message id") -// TODO implement consumer concurrency -//@ConnectorAttribute(name = "consumer.queue.concurrency", type = "int", direction = INCOMING, description = "The number of concurrent consumers", defaultValue = "1") @ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore") @ConnectorAttribute(name = "consumer.queue.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error") @ConnectorAttribute(name = "consumer.queue.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false") @@ -81,18 +77,6 @@ public class SolaceConnector implements InboundConnector, OutboundConnector, Hea List incomingChannels = new CopyOnWriteArrayList<>(); List outgoingChannels = new CopyOnWriteArrayList<>(); - public void onStop(@Observes ShutdownEvent shutdownEvent) { - if (solace.isConnected()) { - SolaceLogging.log.info("Waiting incoming channel messages to be acknowledged"); - incomingChannels.forEach(SolaceIncomingChannel::waitForUnAcknowledgedMessages); - SolaceLogging.log.info("All incoming channel messages are acknowledged"); - - SolaceLogging.log.info("Waiting for outgoing messages to be published"); - outgoingChannels.forEach(SolaceOutgoingChannel::waitForPublishedMessages); - SolaceLogging.log.info("All outgoing messages are published"); - } - } - public void terminate( @Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object event) { incomingChannels.forEach(SolaceIncomingChannel::close); diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java index 9ebe065..3b61dce 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java @@ -173,6 +173,7 @@ public Flow.Publisher> getStream() { public void waitForUnAcknowledgedMessages() { try { receiver.pause(); + SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged"); if (!unacknowledgedMessageTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) { SolaceLogging.log.info(String.format("Timed out while waiting for the" + " remaining messages to be acknowledged.")); @@ -184,6 +185,7 @@ public void waitForUnAcknowledgedMessages() { } public void close() { + waitForUnAcknowledgedMessages(); closed.compareAndSet(false, true); if (this.pollerThread != null) { this.pollerThread.shutdown(); diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java index ae8f794..25c0e65 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java @@ -179,6 +179,7 @@ public Flow.Subscriber> getSubscriber() { public void waitForPublishedMessages() { try { + SolaceLogging.log.info("Waiting for outgoing messages to be published"); if (!publishedMessagesTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) { SolaceLogging.log.info(String.format("Timed out while waiting for the" + " remaining messages to get publish acknowledgment.")); @@ -190,6 +191,7 @@ public void waitForPublishedMessages() { } public void close() { + waitForPublishedMessages(); if (processor != null) { processor.cancel(); } diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java index 5f949a3..7896237 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java @@ -4,14 +4,19 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; +import java.util.concurrent.TimeUnit; import jakarta.enterprise.context.ApplicationScoped; import org.awaitility.Durations; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; @@ -26,8 +31,12 @@ import io.quarkiverse.solace.base.SolaceContainer; import io.quarkiverse.solace.base.WeldTestBase; +import io.quarkiverse.solace.incoming.SolaceIncomingChannel; import io.quarkiverse.solace.logging.SolaceTestAppender; +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.core.Vertx; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class SolaceConsumerTest extends WeldTestBase { @@ -220,6 +229,49 @@ void consumerPublishToErrorTopicPermissionException() { @Test @Order(7) + void consumerGracefulCloseTest() { + MapBasedConfig config = new MapBasedConfig() + .with("channel-name", "in") + .with("consumer.queue.name", queue) + .with("consumer.queue.add-additional-subscriptions", true) + .with("consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("consumer.queue.subscriptions", topic); + + // Initialize incoming channel to consumes messages + SolaceIncomingChannel solaceIncomingChannel = new SolaceIncomingChannel(Vertx.vertx(), + new SolaceConnectorIncomingConfiguration(config), messagingService); + + List list = new ArrayList<>(); + + Flow.Publisher> stream = solaceIncomingChannel.getStream(); + Multi.createFrom().publisher(stream).subscribe().with(message -> { + list.add(message); + CompletableFuture.runAsync(message::ack); + }); + await().until(() -> { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); + solaceIncomingChannel.isReady(builder); + return builder.build().isOk(); + }); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + publisher.publish("1", tp); + publisher.publish("2", tp); + publisher.publish("3", tp); + publisher.publish("4", tp); + publisher.publish("5", tp); + + // Assert on consumed messages + await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() == 5); + solaceIncomingChannel.close(); + } + + @Test + @Order(8) void consumerCreateMissingResourceAddSubscriptionPermissionException() { MapBasedConfig config = new MapBasedConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java index 669d51a..358674f 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; import jakarta.enterprise.context.ApplicationScoped; @@ -20,8 +21,11 @@ import io.quarkiverse.solace.base.WeldTestBase; import io.quarkiverse.solace.outgoing.SolaceOutboundMetadata; +import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel; import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.core.Vertx; public class SolacePublisherTest extends WeldTestBase { @@ -99,6 +103,37 @@ void publisherWithBackPressureReject() { await().untilAsserted(() -> assertThat(app.getAcked().size()).isLessThan(5)); } + @Test + void publisherGracefulCloseTest() { + MapBasedConfig config = new MapBasedConfig() + .with("channel-name", "out") + .with("producer.topic", topic); + + List expected = new CopyOnWriteArrayList<>(); + + // Start listening first + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of(topic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + SolaceOutgoingChannel solaceOutgoingChannel = new SolaceOutgoingChannel(Vertx.vertx(), + new SolaceConnectorOutgoingConfiguration(config), messagingService); + // Publish messages + Multi.createFrom().range(0, 10) + .map(Message::of) + .subscribe((Flow.Subscriber>) solaceOutgoingChannel.getSubscriber()); + await().until(() -> { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); + solaceOutgoingChannel.isReady(builder); + return builder.build().isOk(); + }); + // Assert on received messages + await().untilAsserted(() -> assertThat(expected.size()).isEqualTo(10)); + solaceOutgoingChannel.close(); + } + // @Test // void publisherWithBackPressureRejectWaitForPublisherReadiness() { // MapBasedConfig config = new MapBasedConfig() diff --git a/runtime/src/main/java/io/quarkiverse/solace/runtime/SolaceRecorder.java b/runtime/src/main/java/io/quarkiverse/solace/runtime/SolaceRecorder.java index b3b20ae..43ec4dd 100644 --- a/runtime/src/main/java/io/quarkiverse/solace/runtime/SolaceRecorder.java +++ b/runtime/src/main/java/io/quarkiverse/solace/runtime/SolaceRecorder.java @@ -55,7 +55,7 @@ public MessagingService apply(SyntheticCreationalContext conte } var tmp = service; - shutdown.addShutdownTask(() -> { + shutdown.addLastShutdownTask(() -> { if (tmp.isConnected()) { tmp.disconnect(); } From a4b0a11e2fda0e862e97b61ce54d74ea4ce2da6f Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 8 Jan 2024 15:58:54 +0530 Subject: [PATCH 2/4] addressed comments in PR #18 --- .../quarkiverse/solace/SolaceConnector.java | 3 ++- ...ncomingMessagesUnsignedCounterBarrier.java | 12 ++++++++++ .../incoming/SolaceIncomingChannel.java | 12 ++++++---- ...utgoingMessagesUnsignedCounterBarrier.java | 12 ++++++++++ .../outgoing/SolaceOutgoingChannel.java | 12 ++++++---- .../solace/SolaceConsumerTest.java | 24 ++++++++----------- .../solace/SolacePublisherTest.java | 10 +++----- 7 files changed, 55 insertions(+), 30 deletions(-) diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java index a34b9fd..7f5baff 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java @@ -38,7 +38,8 @@ // TODO only persisted is implemented //@ConnectorAttribute(name = "client.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Direct or persisted", defaultValue = "persisted") @ConnectorAttribute(name = "client.lazy.start", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether the receiver or publisher is started at initialization or lazily at subscription time", defaultValue = "false") -@ConnectorAttribute(name = "client.shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000") +@ConnectorAttribute(name = "client.graceful-shutdown", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to shutdown client gracefully", defaultValue = "true") +@ConnectorAttribute(name = "client.graceful-shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000") @ConnectorAttribute(name = "consumer.queue.name", type = "string", direction = INCOMING, description = "The queue name of receiver") @ConnectorAttribute(name = "consumer.queue.type", type = "string", direction = INCOMING, description = "The queue type of receiver", defaultValue = "durable-non-exclusive") @ConnectorAttribute(name = "consumer.queue.missing-resource-creation-strategy", type = "string", direction = INCOMING, description = "Missing resource creation strategy", defaultValue = "do-not-create") diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java index 9c8352b..53d9e57 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java @@ -70,12 +70,24 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti return false; } isZero.await(realTimeout, TimeUnit.MILLISECONDS); + SolaceLogging.log + .info(String.format("Items remaining: %s", counter.get())); + if (counter.get() == 0l) { + SolaceLogging.log + .info(String.format("All incoming channel messages are acknowledged")); + } } return true; } else if (timeout < 0) { while (isGreaterThanZero()) { SolaceLogging.log.info(String.format("Waiting for %s items", counter.get())); isZero.await(5, TimeUnit.SECONDS); + SolaceLogging.log + .info(String.format("Items remaining: %s", counter.get())); + if (counter.get() == 0l) { + SolaceLogging.log + .info(String.format("All incoming channel messages are acknowledged")); + } } return true; } else { diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java index 3b61dce..d056d3c 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java @@ -45,7 +45,8 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi private final PersistentMessageReceiver receiver; private final Flow.Publisher> stream; private final ExecutorService pollerThread; - private long waitTimeout = -1; + private boolean gracefulShutdown; + private long gracefulShutdownWaitTimeout; // Assuming we won't ever exceed the limit of an unsigned long... private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier(); @@ -53,7 +54,8 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) { this.channel = ic.getChannel(); this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); - this.waitTimeout = ic.getClientShutdownWaitTimeout(); + this.gracefulShutdown = ic.getClientGracefulShutdown(); + this.gracefulShutdownWaitTimeout = ic.getClientGracefulShutdownWaitTimeout(); DirectMessageReceiver r = solace.createDirectMessageReceiverBuilder().build(); Outcome[] outcomes = new Outcome[] { Outcome.ACCEPTED }; if (ic.getConsumerQueueSupportsNacks()) { @@ -174,7 +176,7 @@ public void waitForUnAcknowledgedMessages() { try { receiver.pause(); SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged"); - if (!unacknowledgedMessageTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) { + if (!unacknowledgedMessageTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) { SolaceLogging.log.info(String.format("Timed out while waiting for the" + " remaining messages to be acknowledged.")); } @@ -185,7 +187,9 @@ public void waitForUnAcknowledgedMessages() { } public void close() { - waitForUnAcknowledgedMessages(); + if (this.gracefulShutdown) { + waitForUnAcknowledgedMessages(); + } closed.compareAndSet(false, true); if (this.pollerThread != null) { this.pollerThread.shutdown(); diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java index b38a030..e9f4f75 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java @@ -70,12 +70,24 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti return false; } isZero.await(realTimeout, TimeUnit.MILLISECONDS); + SolaceLogging.log + .info(String.format("Items remaining: %s", counter.get())); + if (counter.get() == 0l) { + SolaceLogging.log + .info(String.format("All outgoing channel messages are published")); + } } return true; } else if (timeout < 0) { while (isGreaterThanZero()) { SolaceLogging.log.info(String.format("Waiting for %s items", counter.get())); isZero.await(5, TimeUnit.SECONDS); + SolaceLogging.log + .info(String.format("Items remaining: %s", counter.get())); + if (counter.get() == 0l) { + SolaceLogging.log + .info(String.format("All outgoing channel messages are published")); + } } return true; } else { diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java index 25c0e65..8cc3627 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java @@ -36,8 +36,9 @@ public class SolaceOutgoingChannel private final Topic topic; private final SenderProcessor processor; private boolean isPublisherReady = true; + private boolean gracefulShutdown; - private long waitTimeout = -1; + private long gracefulShutdownWaitTimeout; // Assuming we won't ever exceed the limit of an unsigned long... private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier(); @@ -56,7 +57,8 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity()); break; } - this.waitTimeout = oc.getClientShutdownWaitTimeout(); + this.gracefulShutdown = oc.getClientGracefulShutdown(); + this.gracefulShutdownWaitTimeout = oc.getClientGracefulShutdownWaitTimeout(); oc.getProducerDeliveryAckTimeout().ifPresent(builder::withDeliveryAckTimeout); oc.getProducerDeliveryAckWindowSize().ifPresent(builder::withDeliveryAckWindowSize); this.publisher = builder.build(); @@ -180,7 +182,7 @@ public Flow.Subscriber> getSubscriber() { public void waitForPublishedMessages() { try { SolaceLogging.log.info("Waiting for outgoing messages to be published"); - if (!publishedMessagesTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) { + if (!publishedMessagesTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) { SolaceLogging.log.info(String.format("Timed out while waiting for the" + " remaining messages to get publish acknowledgment.")); } @@ -191,7 +193,9 @@ public void waitForPublishedMessages() { } public void close() { - waitForPublishedMessages(); + if (this.gracefulShutdown) { + waitForPublishedMessages(); + } if (processor != null) { processor.cancel(); } diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java index 7896237..6f2ee25 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java @@ -4,13 +4,9 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; -import java.util.ArrayList; import java.util.List; import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Flow; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import jakarta.enterprise.context.ApplicationScoped; @@ -34,7 +30,6 @@ import io.quarkiverse.solace.incoming.SolaceIncomingChannel; import io.quarkiverse.solace.logging.SolaceTestAppender; import io.smallrye.mutiny.Multi; -import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; import io.vertx.mutiny.core.Vertx; @@ -241,17 +236,17 @@ void consumerGracefulCloseTest() { SolaceIncomingChannel solaceIncomingChannel = new SolaceIncomingChannel(Vertx.vertx(), new SolaceConnectorIncomingConfiguration(config), messagingService); - List list = new ArrayList<>(); + CopyOnWriteArrayList list = new CopyOnWriteArrayList<>(); Flow.Publisher> stream = solaceIncomingChannel.getStream(); Multi.createFrom().publisher(stream).subscribe().with(message -> { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); list.add(message); - CompletableFuture.runAsync(message::ack); - }); - await().until(() -> { - HealthReport.HealthReportBuilder builder = HealthReport.builder(); - solaceIncomingChannel.isReady(builder); - return builder.build().isOk(); + executorService.schedule(() -> { + CompletableFuture.runAsync(message::ack); + list.remove(message); + }, 1, TimeUnit.SECONDS); + executorService.shutdown(); }); // Produce messages @@ -266,8 +261,9 @@ void consumerGracefulCloseTest() { publisher.publish("5", tp); // Assert on consumed messages - await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() == 5); + await().until(() -> list.size() == 5); solaceIncomingChannel.close(); + await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() == 0); } @Test diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java index 358674f..b07eca2 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java @@ -23,7 +23,6 @@ import io.quarkiverse.solace.outgoing.SolaceOutboundMetadata; import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel; import io.smallrye.mutiny.Multi; -import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; import io.vertx.mutiny.core.Vertx; @@ -124,14 +123,11 @@ void publisherGracefulCloseTest() { Multi.createFrom().range(0, 10) .map(Message::of) .subscribe((Flow.Subscriber>) solaceOutgoingChannel.getSubscriber()); - await().until(() -> { - HealthReport.HealthReportBuilder builder = HealthReport.builder(); - solaceOutgoingChannel.isReady(builder); - return builder.build().isOk(); - }); + + solaceOutgoingChannel.close(); // Assert on received messages await().untilAsserted(() -> assertThat(expected.size()).isEqualTo(10)); - solaceOutgoingChannel.close(); + } // @Test From b383e3400584d4e9300e59f45ca08605886ef5f0 Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Tue, 9 Jan 2024 16:58:44 +0530 Subject: [PATCH 3/4] changed variables to final --- .../quarkiverse/solace/incoming/SolaceIncomingChannel.java | 4 ++-- .../quarkiverse/solace/outgoing/SolaceOutgoingChannel.java | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java index d056d3c..21a8d19 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java @@ -45,8 +45,8 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi private final PersistentMessageReceiver receiver; private final Flow.Publisher> stream; private final ExecutorService pollerThread; - private boolean gracefulShutdown; - private long gracefulShutdownWaitTimeout; + private final boolean gracefulShutdown; + private final long gracefulShutdownWaitTimeout; // Assuming we won't ever exceed the limit of an unsigned long... private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier(); diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java index 8cc3627..cbbd90a 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java @@ -35,10 +35,9 @@ public class SolaceOutgoingChannel private final Flow.Subscriber> subscriber; private final Topic topic; private final SenderProcessor processor; + private final boolean gracefulShutdown; + private final long gracefulShutdownWaitTimeout; private boolean isPublisherReady = true; - private boolean gracefulShutdown; - - private long gracefulShutdownWaitTimeout; // Assuming we won't ever exceed the limit of an unsigned long... private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier(); From 33a4acfef60b6e3556709de6d8d583d705eddabf Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Tue, 9 Jan 2024 17:33:03 +0530 Subject: [PATCH 4/4] changed variable type and updated consumer graceful shutdown test --- .../solace/outgoing/SolaceOutgoingChannel.java | 2 +- .../io/quarkiverse/solace/SolaceConsumerTest.java | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java index cbbd90a..4afb63c 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java @@ -37,7 +37,7 @@ public class SolaceOutgoingChannel private final SenderProcessor processor; private final boolean gracefulShutdown; private final long gracefulShutdownWaitTimeout; - private boolean isPublisherReady = true; + private volatile boolean isPublisherReady = true; // Assuming we won't ever exceed the limit of an unsigned long... private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier(); diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java index 6f2ee25..69f9090 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java @@ -237,16 +237,16 @@ void consumerGracefulCloseTest() { new SolaceConnectorIncomingConfiguration(config), messagingService); CopyOnWriteArrayList list = new CopyOnWriteArrayList<>(); + CopyOnWriteArrayList ackedMessageList = new CopyOnWriteArrayList<>(); Flow.Publisher> stream = solaceIncomingChannel.getStream(); + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); Multi.createFrom().publisher(stream).subscribe().with(message -> { - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); list.add(message); executorService.schedule(() -> { + ackedMessageList.add(message); CompletableFuture.runAsync(message::ack); - list.remove(message); }, 1, TimeUnit.SECONDS); - executorService.shutdown(); }); // Produce messages @@ -260,10 +260,11 @@ void consumerGracefulCloseTest() { publisher.publish("4", tp); publisher.publish("5", tp); - // Assert on consumed messages await().until(() -> list.size() == 5); + // Assert on acknowledged messages solaceIncomingChannel.close(); - await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() == 0); + await().atMost(2, TimeUnit.MINUTES).until(() -> ackedMessageList.size() == 5); + executorService.shutdown(); } @Test