diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java index a34b9fd..7f5baff 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java @@ -38,7 +38,8 @@ // TODO only persisted is implemented //@ConnectorAttribute(name = "client.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Direct or persisted", defaultValue = "persisted") @ConnectorAttribute(name = "client.lazy.start", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether the receiver or publisher is started at initialization or lazily at subscription time", defaultValue = "false") -@ConnectorAttribute(name = "client.shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000") +@ConnectorAttribute(name = "client.graceful-shutdown", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to shutdown client gracefully", defaultValue = "true") +@ConnectorAttribute(name = "client.graceful-shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000") @ConnectorAttribute(name = "consumer.queue.name", type = "string", direction = INCOMING, description = "The queue name of receiver") @ConnectorAttribute(name = "consumer.queue.type", type = "string", direction = INCOMING, description = "The queue type of receiver", defaultValue = "durable-non-exclusive") @ConnectorAttribute(name = "consumer.queue.missing-resource-creation-strategy", type = "string", direction = INCOMING, description = "Missing resource creation strategy", defaultValue = "do-not-create") diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java index 9c8352b..53d9e57 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/IncomingMessagesUnsignedCounterBarrier.java @@ -70,12 +70,24 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti return false; } isZero.await(realTimeout, TimeUnit.MILLISECONDS); + SolaceLogging.log + .info(String.format("Items remaining: %s", counter.get())); + if (counter.get() == 0l) { + SolaceLogging.log + .info(String.format("All incoming channel messages are acknowledged")); + } } return true; } else if (timeout < 0) { while (isGreaterThanZero()) { SolaceLogging.log.info(String.format("Waiting for %s items", counter.get())); isZero.await(5, TimeUnit.SECONDS); + SolaceLogging.log + .info(String.format("Items remaining: %s", counter.get())); + if (counter.get() == 0l) { + SolaceLogging.log + .info(String.format("All incoming channel messages are acknowledged")); + } } return true; } else { diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java index 3b61dce..d056d3c 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java @@ -45,7 +45,8 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi private final PersistentMessageReceiver receiver; private final Flow.Publisher> stream; private final ExecutorService pollerThread; - private long waitTimeout = -1; + private boolean gracefulShutdown; + private long gracefulShutdownWaitTimeout; // Assuming we won't ever exceed the limit of an unsigned long... private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier(); @@ -53,7 +54,8 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) { this.channel = ic.getChannel(); this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); - this.waitTimeout = ic.getClientShutdownWaitTimeout(); + this.gracefulShutdown = ic.getClientGracefulShutdown(); + this.gracefulShutdownWaitTimeout = ic.getClientGracefulShutdownWaitTimeout(); DirectMessageReceiver r = solace.createDirectMessageReceiverBuilder().build(); Outcome[] outcomes = new Outcome[] { Outcome.ACCEPTED }; if (ic.getConsumerQueueSupportsNacks()) { @@ -174,7 +176,7 @@ public void waitForUnAcknowledgedMessages() { try { receiver.pause(); SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged"); - if (!unacknowledgedMessageTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) { + if (!unacknowledgedMessageTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) { SolaceLogging.log.info(String.format("Timed out while waiting for the" + " remaining messages to be acknowledged.")); } @@ -185,7 +187,9 @@ public void waitForUnAcknowledgedMessages() { } public void close() { - waitForUnAcknowledgedMessages(); + if (this.gracefulShutdown) { + waitForUnAcknowledgedMessages(); + } closed.compareAndSet(false, true); if (this.pollerThread != null) { this.pollerThread.shutdown(); diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java index b38a030..e9f4f75 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/OutgoingMessagesUnsignedCounterBarrier.java @@ -70,12 +70,24 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti return false; } isZero.await(realTimeout, TimeUnit.MILLISECONDS); + SolaceLogging.log + .info(String.format("Items remaining: %s", counter.get())); + if (counter.get() == 0l) { + SolaceLogging.log + .info(String.format("All outgoing channel messages are published")); + } } return true; } else if (timeout < 0) { while (isGreaterThanZero()) { SolaceLogging.log.info(String.format("Waiting for %s items", counter.get())); isZero.await(5, TimeUnit.SECONDS); + SolaceLogging.log + .info(String.format("Items remaining: %s", counter.get())); + if (counter.get() == 0l) { + SolaceLogging.log + .info(String.format("All outgoing channel messages are published")); + } } return true; } else { diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java index 25c0e65..8cc3627 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java @@ -36,8 +36,9 @@ public class SolaceOutgoingChannel private final Topic topic; private final SenderProcessor processor; private boolean isPublisherReady = true; + private boolean gracefulShutdown; - private long waitTimeout = -1; + private long gracefulShutdownWaitTimeout; // Assuming we won't ever exceed the limit of an unsigned long... private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier(); @@ -56,7 +57,8 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity()); break; } - this.waitTimeout = oc.getClientShutdownWaitTimeout(); + this.gracefulShutdown = oc.getClientGracefulShutdown(); + this.gracefulShutdownWaitTimeout = oc.getClientGracefulShutdownWaitTimeout(); oc.getProducerDeliveryAckTimeout().ifPresent(builder::withDeliveryAckTimeout); oc.getProducerDeliveryAckWindowSize().ifPresent(builder::withDeliveryAckWindowSize); this.publisher = builder.build(); @@ -180,7 +182,7 @@ public Flow.Subscriber> getSubscriber() { public void waitForPublishedMessages() { try { SolaceLogging.log.info("Waiting for outgoing messages to be published"); - if (!publishedMessagesTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) { + if (!publishedMessagesTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) { SolaceLogging.log.info(String.format("Timed out while waiting for the" + " remaining messages to get publish acknowledgment.")); } @@ -191,7 +193,9 @@ public void waitForPublishedMessages() { } public void close() { - waitForPublishedMessages(); + if (this.gracefulShutdown) { + waitForPublishedMessages(); + } if (processor != null) { processor.cancel(); } diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java index 7896237..6f2ee25 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java @@ -4,13 +4,9 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; -import java.util.ArrayList; import java.util.List; import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Flow; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import jakarta.enterprise.context.ApplicationScoped; @@ -34,7 +30,6 @@ import io.quarkiverse.solace.incoming.SolaceIncomingChannel; import io.quarkiverse.solace.logging.SolaceTestAppender; import io.smallrye.mutiny.Multi; -import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; import io.vertx.mutiny.core.Vertx; @@ -241,17 +236,17 @@ void consumerGracefulCloseTest() { SolaceIncomingChannel solaceIncomingChannel = new SolaceIncomingChannel(Vertx.vertx(), new SolaceConnectorIncomingConfiguration(config), messagingService); - List list = new ArrayList<>(); + CopyOnWriteArrayList list = new CopyOnWriteArrayList<>(); Flow.Publisher> stream = solaceIncomingChannel.getStream(); Multi.createFrom().publisher(stream).subscribe().with(message -> { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); list.add(message); - CompletableFuture.runAsync(message::ack); - }); - await().until(() -> { - HealthReport.HealthReportBuilder builder = HealthReport.builder(); - solaceIncomingChannel.isReady(builder); - return builder.build().isOk(); + executorService.schedule(() -> { + CompletableFuture.runAsync(message::ack); + list.remove(message); + }, 1, TimeUnit.SECONDS); + executorService.shutdown(); }); // Produce messages @@ -266,8 +261,9 @@ void consumerGracefulCloseTest() { publisher.publish("5", tp); // Assert on consumed messages - await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() == 5); + await().until(() -> list.size() == 5); solaceIncomingChannel.close(); + await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() == 0); } @Test diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java index 358674f..b07eca2 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolacePublisherTest.java @@ -23,7 +23,6 @@ import io.quarkiverse.solace.outgoing.SolaceOutboundMetadata; import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel; import io.smallrye.mutiny.Multi; -import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; import io.vertx.mutiny.core.Vertx; @@ -124,14 +123,11 @@ void publisherGracefulCloseTest() { Multi.createFrom().range(0, 10) .map(Message::of) .subscribe((Flow.Subscriber>) solaceOutgoingChannel.getSubscriber()); - await().until(() -> { - HealthReport.HealthReportBuilder builder = HealthReport.builder(); - solaceOutgoingChannel.isReady(builder); - return builder.build().isOk(); - }); + + solaceOutgoingChannel.close(); // Assert on received messages await().untilAsserted(() -> assertThat(expected.size()).isEqualTo(10)); - solaceOutgoingChannel.close(); + } // @Test