Skip to content

Commit

Permalink
Merge pull request #14 from SolaceCoEExt/fix-issues
Browse files Browse the repository at this point in the history
Issues
  • Loading branch information
ozangunalp authored Jan 4, 2024
2 parents 76d9f4f + 94a7e3e commit a9b4978
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 66 deletions.
13 changes: 13 additions & 0 deletions pubsub-plus-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,19 @@
</annotationProcessors>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<java.util.logging.manager>java.util.logging.LogManager</java.util.logging.manager>
</systemPropertyVariables>
<classpathDependencyExcludes>
<classpathDependencyExclude>org.jboss.slf4j:slf4j-jboss-logmanager</classpathDependencyExclude>
<classpathDependencyExclude>org.jboss.logmanager:jboss-logmanager-embedded</classpathDependencyExclude>
</classpathDependencyExcludes>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@

import com.solace.messaging.MessagingService;
import com.solace.messaging.config.MessageAcknowledgementConfiguration;
import com.solace.messaging.publisher.PersistentMessagePublisher;
import com.solace.messaging.receiver.AcknowledgementSupport;

import io.quarkiverse.solace.i18n.SolaceLogging;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.mutiny.Uni;

public class SolaceErrorTopic implements SolaceFailureHandler {
private final String channel;
Expand Down Expand Up @@ -50,24 +48,18 @@ public void setTimeToLive(Long timeToLive) {

@Override
public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reason, Metadata metadata) {
PersistentMessagePublisher.PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler
.handle(msg, errorTopic, dmqEligible, timeToLive)
return solaceErrorTopicPublisherHandler.handle(msg, errorTopic, dmqEligible, timeToLive)
.onFailure().retry().withBackOff(Duration.ofSeconds(1))
.atMost(maxDeliveryAttempts)
.subscribeAsCompletionStage().exceptionally((t) -> {
SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel,
t.getMessage());
return null;
}).join();

if (publishReceipt != null) {
return Uni.createFrom().voidItem()
.invoke(() -> ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED))
.runSubscriptionOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}

return Uni.createFrom().<Void> failure(reason)
.emitOn(msg::runOnMessageContext).subscribeAsCompletionStage();
.onItem().invoke(() -> {
SolaceLogging.log.messageSettled(channel,
MessageAcknowledgementConfiguration.Outcome.ACCEPTED.toString().toLowerCase(),
"Message is published to error topic and acknowledged on queue.");
ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED);
})
.replaceWithVoid()
.onFailure().invoke(t -> SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, t))
.emitOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,33 @@
class SolaceErrorTopicPublisherHandler implements PersistentMessagePublisher.MessagePublishReceiptListener {

private final MessagingService solace;
private String errorTopic;
private final PersistentMessagePublisher publisher;
private final OutboundErrorMessageMapper outboundErrorMessageMapper;

public SolaceErrorTopicPublisherHandler(MessagingService solace) {
this.solace = solace;

publisher = solace.createPersistentMessagePublisherBuilder().build();
publisher.setMessagePublishReceiptListener(this);
publisher.start();
outboundErrorMessageMapper = new OutboundErrorMessageMapper();
}

public Uni<PublishReceipt> handle(SolaceInboundMessage<?> message,
String errorTopic,
boolean dmqEligible, Long timeToLive) {
this.errorTopic = errorTopic;
OutboundMessage outboundMessage = outboundErrorMessageMapper.mapError(this.solace.messageBuilder(),
message.getMessage(),
dmqEligible, timeToLive);
publisher.setMessagePublishReceiptListener(this);
// }
return Uni.createFrom().<PublishReceipt> emitter(e -> {
try {
// always wait for error message publish receipt to ensure it is successfully spooled on broker.
publisher.publish(outboundMessage, Topic.of(errorTopic), e);
} catch (Throwable t) {
SolaceLogging.log.publishException(this.errorTopic);
e.fail(t);
}
}).invoke(() -> System.out.println(""));
}).onFailure().invoke(t -> SolaceLogging.log.publishException(errorTopic, t));
}

@Override
Expand All @@ -53,7 +50,6 @@ public void onPublishReceipt(PublishReceipt publishReceipt) {
.getUserContext();
PubSubPlusClientException exception = publishReceipt.getException();
if (exception != null) {
SolaceLogging.log.publishException(this.errorTopic);
uniEmitter.fail(exception);
} else {
uniEmitter.complete(publishReceipt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.LogMessage;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageLogger;
import org.jboss.logging.annotations.Once;
import org.jboss.logging.annotations.*;

/**
* Logging for Solace PubSub Connector
Expand All @@ -30,12 +27,12 @@ public interface SolaceLogging extends BasicLogger {
void messageSettled(String channel, String outcome, String reason);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55203, value = "Publishing error message to topic %s received from channel `%s` is unsuccessful, reason: %s")
void unsuccessfulToTopic(String topic, String channel, String reason);
@Message(id = 55203, value = "Publishing error message to topic %s received from channel `%s` is unsuccessful")
void unsuccessfulToTopic(String topic, String channel, @Cause Throwable cause);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55204, value = "A exception occurred when publishing to topic %s")
void publishException(String topic);
void publishException(String topic, @Cause Throwable cause);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55205, value = "A exception occurred during shutdown %s")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import io.quarkiverse.solace.i18n.SolaceLogging;

class IncomingMessagesUnsignedCounterBarrier {
private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1)
private final Lock awaitLock = new ReentrantLock();
private final Condition isZero = awaitLock.newCondition();

private static final Log logger = LogFactory.getLog(IncomingMessagesUnsignedCounterBarrier.class);

public IncomingMessagesUnsignedCounterBarrier(long initialValue) {
counter = new AtomicLong(initialValue);
}
Expand Down Expand Up @@ -64,7 +61,8 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
awaitLock.lock();
try {
if (timeout > 0) {
logger.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit));
SolaceLogging.log
.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit));
final long expiry = unit.toMillis(timeout) + System.currentTimeMillis();
while (isGreaterThanZero()) {
long realTimeout = expiry - System.currentTimeMillis();
Expand All @@ -76,7 +74,7 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
return true;
} else if (timeout < 0) {
while (isGreaterThanZero()) {
logger.info(String.format("Waiting for %s items", counter.get()));
SolaceLogging.log.info(String.format("Waiting for %s items", counter.get()));
isZero.await(5, TimeUnit.SECONDS);
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import io.quarkiverse.solace.i18n.SolaceLogging;

class OutgoingMessagesUnsignedCounterBarrier {
private final AtomicLong counter; // Treated as an unsigned long (i.e. range from 0 -> -1)
private final Lock awaitLock = new ReentrantLock();
private final Condition isZero = awaitLock.newCondition();

private static final Log logger = LogFactory.getLog(OutgoingMessagesUnsignedCounterBarrier.class);

public OutgoingMessagesUnsignedCounterBarrier(long initialValue) {
counter = new AtomicLong(initialValue);
}
Expand Down Expand Up @@ -64,7 +61,8 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
awaitLock.lock();
try {
if (timeout > 0) {
logger.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit));
SolaceLogging.log
.info(String.format("Waiting for %s items, time remaining: %s %s", counter.get(), timeout, unit));
final long expiry = unit.toMillis(timeout) + System.currentTimeMillis();
while (isGreaterThanZero()) {
long realTimeout = expiry - System.currentTimeMillis();
Expand All @@ -76,7 +74,7 @@ public boolean awaitEmpty(long timeout, TimeUnit unit) throws InterruptedExcepti
return true;
} else if (timeout < 0) {
while (isGreaterThanZero()) {
logger.info(String.format("Waiting for %s items", counter.get()));
SolaceLogging.log.info(String.format("Waiting for %s items", counter.get()));
isZero.await(5, TimeUnit.SECONDS);
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@

import io.quarkiverse.solace.base.SolaceContainer;
import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.logging.SolaceTestAppender;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SolaceConsumerTest extends WeldTestBase {
private org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getLogger("io.quarkiverse.solace");
private SolaceTestAppender solaceTestAppender = new SolaceTestAppender();

private SolaceConsumerTest() {
rootLogger.addAppender(solaceTestAppender);
}

@Test
@Order(1)
Expand Down Expand Up @@ -180,28 +187,6 @@ void consumerFailedProcessingMoveToDMQ() {

@Test
@Order(6)
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);

Exception exception = assertThrows(Exception.class, () -> {
// Run app that consumes messages
MyConsumer app = runApplication(config, MyConsumer.class);
});

// Assert on published messages
await().untilAsserted(() -> assertThat(exception.getMessage())
.contains("com.solacesystems.jcsmp.AccessDeniedException: Permission Not Allowed - Queue '"
+ SolaceContainer.INTEGRATION_TEST_QUEUE_NAME + "' - Topic '" + topic));
}

@Test
@Order(7)
void consumerPublishToErrorTopicPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
Expand All @@ -210,6 +195,7 @@ void consumerPublishToErrorTopicPermissionException() {
.with("mp.messaging.incoming.in.consumer.queue.failure-strategy", "error_topic")
.with("mp.messaging.incoming.in.consumer.queue.error.topic",
"publish/deny")
.with("mp.messaging.incoming.in.consumer.queue.error.message.max-delivery-attempts", 0)
.with("mp.messaging.incoming.error-in.connector", "quarkus-solace")
.with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME)
.with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive");
Expand All @@ -226,6 +212,32 @@ void consumerPublishToErrorTopicPermissionException() {
publisher.publish(outboundMessage, tp);

await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages().size()).isEqualTo(0));
// await().untilAsserted(() -> assertThat(inMemoryLogHandler.getRecords().stream().filter(record -> record.getMessage().contains("A exception occurred when publishing to topic")).count()).isEqualTo(4));
await().untilAsserted(() -> assertThat(solaceTestAppender.getLog().stream()
.anyMatch(record -> record.getMessage().toString().contains("Publishing error message to topic")))
.isEqualTo(true));
}

@Test
@Order(7)
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);

Exception exception = assertThrows(Exception.class, () -> {
// Run app that consumes messages
MyConsumer app = runApplication(config, MyConsumer.class);
});

// Assert on published messages
await().untilAsserted(() -> assertThat(exception.getMessage())
.contains("com.solacesystems.jcsmp.AccessDeniedException: Permission Not Allowed - Queue '"
+ SolaceContainer.INTEGRATION_TEST_QUEUE_NAME + "' - Topic '" + topic));
}

@ApplicationScoped
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.quarkiverse.solace.logging;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;

public class SolaceTestAppender extends AppenderSkeleton {
private List<LoggingEvent> log = new ArrayList<>();

@Override
protected void append(LoggingEvent loggingEvent) {
log.add(loggingEvent);
}

@Override
public void close() {

}

@Override
public boolean requiresLayout() {
return false;
}

public List<LoggingEvent> getLog() {
return new ArrayList<LoggingEvent>(log);
}
}

0 comments on commit a9b4978

Please sign in to comment.