Skip to content

Commit

Permalink
DATAGO-68275: fix SolaceErrorMessageHandler acknowledgmentCallback de…
Browse files Browse the repository at this point in the history
…tection and error handling (#331)

* Detect and handle acknowledgmentCallbacks from all potential sources:
    * ErrorMessage header
    * ErrorMessage.getOriginalMessage()
    * MessagingException.getFailedMessage()
* throw exception instead of returning when error handler isn't able to process the error message.
  • Loading branch information
Nephery authored Aug 29, 2024
1 parent 9b4c6c3 commit b64f11d
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,62 +12,65 @@
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;

import java.util.UUID;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SolaceErrorMessageHandler implements MessageHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(SolaceErrorMessageHandler.class);

@Override
public void handleMessage(Message<?> message) throws MessagingException {
UUID springId = StaticMessageHeaderAccessor.getId(message);
StringBuilder info = new StringBuilder("Processing message ").append(springId).append(" <");

if (!(message instanceof ErrorMessage errorMessage)) {
LOGGER.warn("Spring message {}: Expected an {}, not a {}", springId, ErrorMessage.class.getSimpleName(),
message.getClass().getSimpleName());
return;
throw new IllegalArgumentException(String.format("Spring message %s: Expected an %s, but got a %s",
StaticMessageHeaderAccessor.getId(message), ErrorMessage.class.getSimpleName(),
message.getClass().getSimpleName()));
}

Throwable payload = errorMessage.getPayload();

Message<?> failedMsg;
if (payload instanceof MessagingException && ((MessagingException) payload).getFailedMessage() != null) {
failedMsg = ((MessagingException) payload).getFailedMessage();
} else {
failedMsg = errorMessage.getOriginalMessage();
}

if (failedMsg != null) {
info.append("failed-message: ").append(StaticMessageHeaderAccessor.getId(failedMsg)).append(", ");
}

Object sourceData = StaticMessageHeaderAccessor.getSourceData(message);
if (sourceData instanceof XMLMessage) {
info.append("source-message: ").append(((XMLMessage) sourceData).getMessageId()).append(", ");
}
handleErrorMessage(errorMessage);
}

LOGGER.info(info.append('>').toString());
private void handleErrorMessage(ErrorMessage errorMessage) {
Message<?> messagingExceptionFailedMessage = errorMessage.getPayload() instanceof MessagingException m ?
m.getFailedMessage() : null;
Set<AcknowledgmentCallback> acknowledgmentCallbacks = Stream.of(Stream.of(errorMessage),
Stream.ofNullable(messagingExceptionFailedMessage),
Stream.ofNullable(errorMessage.getOriginalMessage()))
.flatMap(s -> s)
.map(StaticMessageHeaderAccessor::getAcknowledgmentCallback)
.filter(Objects::nonNull)
.collect(Collectors.toUnmodifiableSet());

AcknowledgmentCallback acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(message);
if (acknowledgmentCallback == null && failedMsg != null) {
acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(failedMsg);
}
LOGGER.atInfo()
.setMessage("Processing message {} <messaging-exception-message: {}, original-message: {}, source-jcsmp-message: {}>")
.addArgument(() -> StaticMessageHeaderAccessor.getId(errorMessage))
.addArgument(() -> messagingExceptionFailedMessage != null ?
StaticMessageHeaderAccessor.getId(messagingExceptionFailedMessage) : null)
.addArgument(() -> errorMessage.getOriginalMessage() != null ?
StaticMessageHeaderAccessor.getId(errorMessage.getOriginalMessage()) : null)
.addArgument(() -> StaticMessageHeaderAccessor.getSourceData(errorMessage) instanceof XMLMessage m ?
m.getMessageId() : null)
.log();

if (acknowledgmentCallback == null) {
if (acknowledgmentCallbacks.isEmpty()) {
// Should never happen under normal use
LOGGER.warn("Spring message {} does not contain an acknowledgment callback. Message cannot be acknowledged",
springId);
return;
throw new IllegalArgumentException(String.format(
"Spring error message %s does not contain an acknowledgment callback. Message cannot be acknowledged",
StaticMessageHeaderAccessor.getId(errorMessage)));
}

try {
if (!SolaceAckUtil.republishToErrorQueue(acknowledgmentCallback)) {
AckUtils.requeue(acknowledgmentCallback);
for(AcknowledgmentCallback acknowledgmentCallback : acknowledgmentCallbacks) {
try {
if (!SolaceAckUtil.republishToErrorQueue(acknowledgmentCallback)) {
AckUtils.requeue(acknowledgmentCallback);
}
} catch (SolaceAcknowledgmentException e) {
LOGGER.error("Spring error message {}: exception in error handler",
StaticMessageHeaderAccessor.getId(errorMessage), e);
throw e;
}
} catch (SolaceAcknowledgmentException e) {
LOGGER.error("Spring message {}: exception in error handler", springId, e);
throw e;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package com.solace.spring.cloud.stream.binder.util;

import static org.junit.jupiter.api.Assertions.assertThrows;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.TextMessage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junitpioneer.jupiter.cartesian.CartesianTest;
import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
import org.springframework.integration.support.ErrorMessageUtils;
Expand All @@ -20,6 +20,9 @@
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.MessageBuilder;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertThrows;

@ExtendWith(MockitoExtension.class)
public class SolaceErrorMessageHandlerTest {
SolaceMessageHeaderErrorMessageStrategy errorMessageStrategy = new SolaceMessageHeaderErrorMessageStrategy();
Expand All @@ -33,48 +36,60 @@ public void setup() {
}

@Test
public void testAcknowledgmentCallbackHeader(@Mock AcknowledgmentCallback acknowledgementCallback) {
Message<?> inputMessage = MessageBuilder.withPayload("test")
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
Mockito.mock(AcknowledgmentCallback.class))
.build();
public void testNotAnErrorMessage() {
assertThatThrownBy(() -> errorMessageHandler.handleMessage(MessageBuilder.withPayload("test").build()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Expected an ErrorMessage");
}

attributeAccessor.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, inputMessage);
@Test
public void testErrorMessage(@Mock AcknowledgmentCallback acknowledgementCallback) {
attributeAccessor.setAttribute(SolaceMessageHeaderErrorMessageStrategy.ATTR_SOLACE_ACKNOWLEDGMENT_CALLBACK,
acknowledgementCallback);

ErrorMessage errorMessage = errorMessageStrategy.buildErrorMessage(
new MessagingException(inputMessage),
new RuntimeException("test"),
attributeAccessor);

errorMessageHandler.handleMessage(errorMessage);
Mockito.verify(acknowledgementCallback).acknowledge(Status.REQUEUE);
Mockito.verify(StaticMessageHeaderAccessor.getAcknowledgmentCallback(inputMessage), Mockito.never())
.acknowledge(Mockito.any());
}

@Test
public void testFailedMessageAcknowledgmentCallback(@Mock AcknowledgmentCallback acknowledgementCallback) {
Message<?> inputMessage = MessageBuilder.withPayload("test")
public void testMessagingException(@Mock AcknowledgmentCallback acknowledgementCallback) {
ErrorMessage errorMessage = errorMessageStrategy.buildErrorMessage(
new MessagingException(MessageBuilder.withPayload("test")
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgementCallback)
.build()),
attributeAccessor);

errorMessageHandler.handleMessage(errorMessage);
Mockito.verify(acknowledgementCallback).acknowledge(Status.REQUEUE);
}

@Test
public void testOriginalMessage(@Mock AcknowledgmentCallback acknowledgementCallback) {
attributeAccessor.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, MessageBuilder.withPayload("test")
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgementCallback)
.build();
attributeAccessor.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, inputMessage);
.build());

ErrorMessage errorMessage = errorMessageStrategy.buildErrorMessage(
new MessagingException(inputMessage),
new RuntimeException("test"),
attributeAccessor);

errorMessageHandler.handleMessage(errorMessage);
Mockito.verify(acknowledgementCallback).acknowledge(Status.REQUEUE);
}

@Test
public void testNoFailedMessage(@Mock AcknowledgmentCallback acknowledgementCallback) {
public void testNoFailedMessage() {
ErrorMessage errorMessage = errorMessageStrategy.buildErrorMessage(
new MessagingException("test"),
attributeAccessor);

errorMessageHandler.handleMessage(errorMessage);
Mockito.verify(acknowledgementCallback, Mockito.never()).acknowledge(AcknowledgmentCallback.Status.REJECT);
assertThatThrownBy(() -> errorMessageHandler.handleMessage(errorMessage))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("does not contain an acknowledgment callback");
}

@Test
Expand All @@ -91,21 +106,44 @@ public void testNonMessagingException(@Mock AcknowledgmentCallback acknowledgeme
Mockito.verify(acknowledgementCallback).acknowledge(Status.REQUEUE);
}

@Test
@CartesianTest(name = "[{index}] ackCallbackHeaderProvider={0}")
public void testMessagingExceptionContainingDifferentFailedMessage(
@Mock AcknowledgmentCallback acknowledgementCallback) {
Message<?> inputMessage = MessageBuilder.withPayload("test")
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgementCallback)
@Values(strings = {"messaging-exception", "input-message", "error-message", "all:same", "all:different"})
String ackCallbackHeaderProvider,
@Mock AcknowledgmentCallback acknowledgementCallback1,
@Mock AcknowledgmentCallback acknowledgementCallback2,
@Mock AcknowledgmentCallback acknowledgementCallback3) {
Message<?> messageWithAckCallback = MessageBuilder.withPayload("with-callback-1")
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgementCallback1)
.build();
attributeAccessor.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY,
MessageBuilder.withPayload("some-other-message").build());
Message<String> messageNoAckCallback = MessageBuilder.withPayload("some-other-message").build();

ErrorMessage errorMessage = errorMessageStrategy.buildErrorMessage(
new MessagingException(inputMessage),
attributeAccessor);
MessagingException exception = switch (ackCallbackHeaderProvider) {
case "messaging-exception", "all:same", "all:different" -> new MessagingException(messageWithAckCallback);
default -> new MessagingException(messageNoAckCallback);
};

attributeAccessor.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, switch (ackCallbackHeaderProvider) {
case "input-message", "all:same" -> messageWithAckCallback;
case "all:different" -> MessageBuilder.withPayload("with-callback-1")
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgementCallback2)
.build();
default -> messageNoAckCallback;
});

if (ackCallbackHeaderProvider.equals("error-message") || ackCallbackHeaderProvider.startsWith("all:")) {
attributeAccessor.setAttribute(SolaceMessageHeaderErrorMessageStrategy.ATTR_SOLACE_ACKNOWLEDGMENT_CALLBACK,
ackCallbackHeaderProvider.equals("all:different") ? acknowledgementCallback3 : acknowledgementCallback1);
}

ErrorMessage errorMessage = errorMessageStrategy.buildErrorMessage(exception, attributeAccessor);

errorMessageHandler.handleMessage(errorMessage);
Mockito.verify(acknowledgementCallback).acknowledge(Status.REQUEUE);
Mockito.verify(acknowledgementCallback1).acknowledge(Status.REQUEUE);
Mockito.verify(acknowledgementCallback2, Mockito.times(ackCallbackHeaderProvider.equals("all:different") ? 1 : 0))
.acknowledge(Status.REQUEUE);
Mockito.verify(acknowledgementCallback3, Mockito.times(ackCallbackHeaderProvider.equals("all:different") ? 1 : 0))
.acknowledge(Status.REQUEUE);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,13 +608,15 @@ public void testSendBatchOverTransactionLimit(
producerBinding.unbind();
}

@CartesianTest(name = "[{index}] channelType={0}, batchMode={1}, transacted={2}, namedConsumerGroup={3}")
@CartesianTest(name = "[{index}] channelType={0}, batchMode={1}, transacted={2}, namedConsumerGroup={3} maxAttempts={4} throwMessagingExceptionWithMissingAckCallback={5}")
@Execution(ExecutionMode.CONCURRENT)
public <T> void testConsumerRequeue(
@Values(classes = {DirectChannel.class, PollableSource.class}) Class<T> channelType,
@Values(booleans = {false, true}) boolean batchMode,
@Values(booleans = {false, true}) boolean transacted,
@Values(booleans = {false, true}) boolean namedConsumerGroup,
@Values(ints = {1, 3}) int maxAttempts,
@Values(booleans = {false, true}) boolean throwMessagingExceptionWithMissingAckCallback,
SempV2Api sempV2Api,
SoftAssertions softly,
TestInfo testInfo) throws Exception {
Expand All @@ -637,6 +639,7 @@ public <T> void testConsumerRequeue(

ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setBatchMode(batchMode);
consumerProperties.setMaxAttempts(maxAttempts);
consumerProperties.getExtension().setTransacted(transacted);
Binding<T> consumerBinding = consumerInfrastructureUtil.createBinding(binder,
destination0, group0, moduleInputChannel, consumerProperties);
Expand All @@ -657,7 +660,12 @@ public <T> void testConsumerRequeue(
softly.assertThat(msg).satisfies(isValidMessage(channelType, consumerProperties, messages));
if (numRetriesRemaining.getAndDecrement() > 0) {
callback.run();
throw new RuntimeException("Throwing expected exception!");
throw throwMessagingExceptionWithMissingAckCallback ?
new MessagingException(MessageBuilder.fromMessage(msg)
.removeHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK)
.build(),
"Throwing expected exception!") :
new RuntimeException("Throwing expected exception!");
} else {
logger.info("Received message");
softly.assertThat(msg).satisfies(hasNestedHeader(SolaceHeaders.REDELIVERED, Boolean.class,
Expand Down Expand Up @@ -687,12 +695,14 @@ public <T> void testConsumerRequeue(
consumerBinding.unbind();
}

@CartesianTest(name = "[{index}] channelType={0}, batchMode={1}, namedConsumerGroup={2}")
@CartesianTest(name = "[{index}] channelType={0}, batchMode={1}, namedConsumerGroup={2} maxAttempts={3} throwMessagingExceptionWithMissingAckCallback={4}")
@Execution(ExecutionMode.CONCURRENT)
public <T> void testConsumerErrorQueueRepublish(
@Values(classes = {DirectChannel.class, PollableSource.class}) Class<T> channelType,
@Values(booleans = {false, true}) boolean batchMode,
@Values(booleans = {false, true}) boolean namedConsumerGroup,
@Values(ints = {1, 3}) int maxAttempts,
@Values(booleans = {false, true}) boolean throwMessagingExceptionWithMissingAckCallback,
JCSMPSession jcsmpSession,
SempV2Api sempV2Api,
TestInfo testInfo) throws Exception {
Expand All @@ -712,6 +722,7 @@ public <T> void testConsumerErrorQueueRepublish(

ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setBatchMode(batchMode);
consumerProperties.setMaxAttempts(maxAttempts);
consumerProperties.getExtension().setAutoBindErrorQueue(true);
Binding<T> consumerBinding = consumerInfrastructureUtil.createBinding(binder,
destination0, group0, moduleInputChannel, consumerProperties);
Expand All @@ -729,7 +740,12 @@ public <T> void testConsumerErrorQueueRepublish(
() -> messages.forEach(moduleOutputChannel::send),
(msg, callback) -> {
callback.run();
throw new RuntimeException("Throwing expected exception!");
throw throwMessagingExceptionWithMissingAckCallback ?
new MessagingException(MessageBuilder.fromMessage(msg)
.removeHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK)
.build(),
"Throwing expected exception!") :
new RuntimeException("Throwing expected exception!");
});

assertThat(binder.getConsumerErrorQueueName(consumerBinding))
Expand Down

0 comments on commit b64f11d

Please sign in to comment.