Skip to content

Commit

Permalink
addressed comments in PR #18
Browse files Browse the repository at this point in the history
  • Loading branch information
SravanThotakura05 committed Jan 8, 2024
1 parent 3438eb1 commit a4b0a11
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
// TODO only persisted is implemented
//@ConnectorAttribute(name = "client.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Direct or persisted", defaultValue = "persisted")
@ConnectorAttribute(name = "client.lazy.start", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether the receiver or publisher is started at initialization or lazily at subscription time", defaultValue = "false")
@ConnectorAttribute(name = "client.shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000")
@ConnectorAttribute(name = "client.graceful-shutdown", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to shutdown client gracefully", defaultValue = "true")
@ConnectorAttribute(name = "client.graceful-shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000")
@ConnectorAttribute(name = "consumer.queue.name", type = "string", direction = INCOMING, description = "The queue name of receiver")
@ConnectorAttribute(name = "consumer.queue.type", type = "string", direction = INCOMING, description = "The queue type of receiver", defaultValue = "durable-non-exclusive")
@ConnectorAttribute(name = "consumer.queue.missing-resource-creation-strategy", type = "string", direction = INCOMING, description = "Missing resource creation strategy", defaultValue = "do-not-create")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,24 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
return false;
}
isZero.await(realTimeout, TimeUnit.MILLISECONDS);
SolaceLogging.log
.info(String.format("Items remaining: %s", counter.get()));
if (counter.get() == 0l) {
SolaceLogging.log
.info(String.format("All incoming channel messages are acknowledged"));
}
}
return true;
} else if (timeout < 0) {
while (isGreaterThanZero()) {
SolaceLogging.log.info(String.format("Waiting for %s items", counter.get()));
isZero.await(5, TimeUnit.SECONDS);
SolaceLogging.log
.info(String.format("Items remaining: %s", counter.get()));
if (counter.get() == 0l) {
SolaceLogging.log
.info(String.format("All incoming channel messages are acknowledged"));
}
}
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi
private final PersistentMessageReceiver receiver;
private final Flow.Publisher<? extends Message<?>> stream;
private final ExecutorService pollerThread;
private long waitTimeout = -1;
private boolean gracefulShutdown;
private long gracefulShutdownWaitTimeout;

// Assuming we won't ever exceed the limit of an unsigned long...
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier();

public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
this.channel = ic.getChannel();
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.waitTimeout = ic.getClientShutdownWaitTimeout();
this.gracefulShutdown = ic.getClientGracefulShutdown();
this.gracefulShutdownWaitTimeout = ic.getClientGracefulShutdownWaitTimeout();
DirectMessageReceiver r = solace.createDirectMessageReceiverBuilder().build();
Outcome[] outcomes = new Outcome[] { Outcome.ACCEPTED };
if (ic.getConsumerQueueSupportsNacks()) {
Expand Down Expand Up @@ -174,7 +176,7 @@ public void waitForUnAcknowledgedMessages() {
try {
receiver.pause();
SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged");
if (!unacknowledgedMessageTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) {
if (!unacknowledgedMessageTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to be acknowledged."));
}
Expand All @@ -185,7 +187,9 @@ public void waitForUnAcknowledgedMessages() {
}

public void close() {
waitForUnAcknowledgedMessages();
if (this.gracefulShutdown) {
waitForUnAcknowledgedMessages();
}
closed.compareAndSet(false, true);
if (this.pollerThread != null) {
this.pollerThread.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,24 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
return false;
}
isZero.await(realTimeout, TimeUnit.MILLISECONDS);
SolaceLogging.log
.info(String.format("Items remaining: %s", counter.get()));
if (counter.get() == 0l) {
SolaceLogging.log
.info(String.format("All outgoing channel messages are published"));
}
}
return true;
} else if (timeout < 0) {
while (isGreaterThanZero()) {
SolaceLogging.log.info(String.format("Waiting for %s items", counter.get()));
isZero.await(5, TimeUnit.SECONDS);
SolaceLogging.log
.info(String.format("Items remaining: %s", counter.get()));
if (counter.get() == 0l) {
SolaceLogging.log
.info(String.format("All outgoing channel messages are published"));
}
}
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ public class SolaceOutgoingChannel
private final Topic topic;
private final SenderProcessor processor;
private boolean isPublisherReady = true;
private boolean gracefulShutdown;

private long waitTimeout = -1;
private long gracefulShutdownWaitTimeout;

// Assuming we won't ever exceed the limit of an unsigned long...
private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier();
Expand All @@ -56,7 +57,8 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity());
break;
}
this.waitTimeout = oc.getClientShutdownWaitTimeout();
this.gracefulShutdown = oc.getClientGracefulShutdown();
this.gracefulShutdownWaitTimeout = oc.getClientGracefulShutdownWaitTimeout();
oc.getProducerDeliveryAckTimeout().ifPresent(builder::withDeliveryAckTimeout);
oc.getProducerDeliveryAckWindowSize().ifPresent(builder::withDeliveryAckWindowSize);
this.publisher = builder.build();
Expand Down Expand Up @@ -180,7 +182,7 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber() {
public void waitForPublishedMessages() {
try {
SolaceLogging.log.info("Waiting for outgoing messages to be published");
if (!publishedMessagesTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) {
if (!publishedMessagesTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to get publish acknowledgment."));
}
Expand All @@ -191,7 +193,9 @@ public void waitForPublishedMessages() {
}

public void close() {
waitForPublishedMessages();
if (this.gracefulShutdown) {
waitForPublishedMessages();
}
if (processor != null) {
processor.cancel();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import jakarta.enterprise.context.ApplicationScoped;

Expand All @@ -34,7 +30,6 @@
import io.quarkiverse.solace.incoming.SolaceIncomingChannel;
import io.quarkiverse.solace.logging.SolaceTestAppender;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mutiny.core.Vertx;

Expand Down Expand Up @@ -241,17 +236,17 @@ void consumerGracefulCloseTest() {
SolaceIncomingChannel solaceIncomingChannel = new SolaceIncomingChannel(Vertx.vertx(),
new SolaceConnectorIncomingConfiguration(config), messagingService);

List<Object> list = new ArrayList<>();
CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>();

Flow.Publisher<? extends Message<?>> stream = solaceIncomingChannel.getStream();
Multi.createFrom().publisher(stream).subscribe().with(message -> {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
list.add(message);
CompletableFuture.runAsync(message::ack);
});
await().until(() -> {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
solaceIncomingChannel.isReady(builder);
return builder.build().isOk();
executorService.schedule(() -> {
CompletableFuture.runAsync(message::ack);
list.remove(message);
}, 1, TimeUnit.SECONDS);
executorService.shutdown();
});

// Produce messages
Expand All @@ -266,8 +261,9 @@ void consumerGracefulCloseTest() {
publisher.publish("5", tp);

// Assert on consumed messages
await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() == 5);
await().until(() -> list.size() == 5);
solaceIncomingChannel.close();
await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() == 0);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.quarkiverse.solace.outgoing.SolaceOutboundMetadata;
import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mutiny.core.Vertx;

Expand Down Expand Up @@ -124,14 +123,11 @@ void publisherGracefulCloseTest() {
Multi.createFrom().range(0, 10)
.map(Message::of)
.subscribe((Flow.Subscriber<? super Message<Integer>>) solaceOutgoingChannel.getSubscriber());
await().until(() -> {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
solaceOutgoingChannel.isReady(builder);
return builder.build().isOk();
});

solaceOutgoingChannel.close();
// Assert on received messages
await().untilAsserted(() -> assertThat(expected.size()).isEqualTo(10));
solaceOutgoingChannel.close();

}

// @Test
Expand Down

0 comments on commit a4b0a11

Please sign in to comment.