Skip to content

Commit

Permalink
Added solace community license and code clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
SravanThotakura05 committed Dec 19, 2023
1 parent c10a047 commit 3610572
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 221 deletions.
201 changes: 0 additions & 201 deletions LICENSE

This file was deleted.

Binary file added LICENSE.pdf
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public interface SolaceLogging extends BasicLogger {
void messageSettled(String channel, String outcome, String reason);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55203, value = "Publishing error message to topic %s received from channel `%s` is unsuccessful")
void unsuccessfulToTopic(String topic, String channel);
@Message(id = 55203, value = "Publishing error message to topic %s received from channel `%s` is unsuccessful, reason: %s")
void unsuccessfulToTopic(String topic, String channel, String reason);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55204, value = "A exception occurred when publishing to topic %s")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public class SolaceInboundMessage<T> implements ContextAwareMessage<T>, Metadata
private final SolaceConnectorIncomingConfiguration ic;
private final T payload;
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker;

private Metadata metadata;

public SolaceInboundMessage(InboundMessage message, SolaceAckHandler ackHandler, SolaceFailureHandler nackHandler,
Expand Down Expand Up @@ -97,32 +96,45 @@ public CompletionStage<Void> ack() {

@Override
public CompletionStage<Void> nack(Throwable reason, Metadata nackMetadata) {
if (solaceErrorTopicPublisherHandler != null) {
if (solaceErrorTopicPublisherHandler == null) {
// REJECTED - Will move message to DMQ if enabled, FAILED - Will redeliver the message.
MessageAcknowledgementConfiguration.Outcome outcome = ic.getConsumerQueueEnableNacks()
? (ic.getConsumerQueueDiscardMessagesOnFailure() ? MessageAcknowledgementConfiguration.Outcome.REJECTED
: MessageAcknowledgementConfiguration.Outcome.FAILED)
: null; // if nacks are not supported on broker, no outcome is required.
if (outcome != null) {
// decrement the tracker, as the message might get redelivered or moved to DMQ
this.unacknowledgedMessageTracker.decrement();
return nackHandler.handle(this, reason, nackMetadata, outcome);
}
} else {
PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler.handle(this, ic)
.onFailure().retry().withBackOff(Duration.ofSeconds(1))
.atMost(ic.getConsumerQueueErrorMessageMaxDeliveryAttempts())
.onFailure().transform((throwable -> {
SolaceLogging.log.unsuccessfulToTopic(ic.getConsumerQueueErrorTopic().get(), ic.getChannel());
throw new RuntimeException(throwable); // TODO How to catch this exception in tests
}))
.await().atMost(Duration.ofSeconds(30));
.subscribeAsCompletionStage().exceptionally((t) -> {
SolaceLogging.log.unsuccessfulToTopic(ic.getConsumerQueueErrorTopic().get(), ic.getChannel(),
t.getMessage());
return null;
}).join();

if (publishReceipt != null) {
// decrement the tracker, as the message might get redelivered or moved to DMQ
this.unacknowledgedMessageTracker.decrement();
return nackHandler.handle(this, reason, nackMetadata, MessageAcknowledgementConfiguration.Outcome.ACCEPTED);
} else {
if (ic.getConsumerQueueEnableNacks()) {
// decrement the tracker, as the message might get redelivered or moved to DMQ
this.unacknowledgedMessageTracker.decrement();
return nackHandler.handle(this, reason, nackMetadata,
MessageAcknowledgementConfiguration.Outcome.FAILED);
}
}
}

MessageAcknowledgementConfiguration.Outcome outcome = ic.getConsumerQueueEnableNacks()
&& ic.getConsumerQueueDiscardMessagesOnFailure() && solaceErrorTopicPublisherHandler == null
? MessageAcknowledgementConfiguration.Outcome.REJECTED // will move message to DMQ is enabled on queue & message
: MessageAcknowledgementConfiguration.Outcome.FAILED; // will redeliver the message
if (outcome == MessageAcknowledgementConfiguration.Outcome.REJECTED) {
this.unacknowledgedMessageTracker.decrement();
}
return ic.getConsumerQueueEnableNacks()
? nackHandler.handle(this, reason, nackMetadata, outcome)
: Uni.createFrom().voidItem().subscribeAsCompletionStage(); // TODO Disconnect and reconnect the receiver in order to redeliver the message. Required when nacks are not supported by the broker version.
// decrement the tracker, as the message might get redelivered or moved to DMQ
this.unacknowledgedMessageTracker.decrement();
// return void stage if above check fail. This will not nack the message on broker.
return Uni.createFrom().voidItem().subscribeAsCompletionStage(); // TODO - Restart receiver to redeliver message, needed when nacks are not supported on broker.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
@ApplicationScoped
public class HelloConsumer {
/**
* Publish a simple string from using TryMe in Solace broker and you should see the message published to topic
* Publish a simple message using TryMe in Solace broker and you should see the message published to topic
*
* @param p
*/
Expand Down

0 comments on commit 3610572

Please sign in to comment.