diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceErrorMessageHandler.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceErrorMessageHandler.java index 3317a9d1..35f746f1 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceErrorMessageHandler.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceErrorMessageHandler.java @@ -12,7 +12,10 @@ import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.ErrorMessage; -import java.util.UUID; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class SolaceErrorMessageHandler implements MessageHandler { @@ -20,54 +23,54 @@ public class SolaceErrorMessageHandler implements MessageHandler { @Override public void handleMessage(Message message) throws MessagingException { - UUID springId = StaticMessageHeaderAccessor.getId(message); - StringBuilder info = new StringBuilder("Processing message ").append(springId).append(" <"); - if (!(message instanceof ErrorMessage errorMessage)) { - LOGGER.warn("Spring message {}: Expected an {}, not a {}", springId, ErrorMessage.class.getSimpleName(), - message.getClass().getSimpleName()); - return; + throw new IllegalArgumentException(String.format("Spring message %s: Expected an %s, but got a %s", + StaticMessageHeaderAccessor.getId(message), ErrorMessage.class.getSimpleName(), + message.getClass().getSimpleName())); } - Throwable payload = errorMessage.getPayload(); - - Message failedMsg; - if (payload instanceof MessagingException && ((MessagingException) payload).getFailedMessage() != null) { - failedMsg = ((MessagingException) payload).getFailedMessage(); - } else { - failedMsg = errorMessage.getOriginalMessage(); - } - - if (failedMsg != null) { - info.append("failed-message: ").append(StaticMessageHeaderAccessor.getId(failedMsg)).append(", "); - } - - Object sourceData = StaticMessageHeaderAccessor.getSourceData(message); - if (sourceData instanceof XMLMessage) { - info.append("source-message: ").append(((XMLMessage) sourceData).getMessageId()).append(", "); - } + handleErrorMessage(errorMessage); + } - LOGGER.info(info.append('>').toString()); + private void handleErrorMessage(ErrorMessage errorMessage) { + Message messagingExceptionFailedMessage = errorMessage.getPayload() instanceof MessagingException m ? + m.getFailedMessage() : null; + Set acknowledgmentCallbacks = Stream.of(Stream.of(errorMessage), + Stream.ofNullable(messagingExceptionFailedMessage), + Stream.ofNullable(errorMessage.getOriginalMessage())) + .flatMap(s -> s) + .map(StaticMessageHeaderAccessor::getAcknowledgmentCallback) + .filter(Objects::nonNull) + .collect(Collectors.toUnmodifiableSet()); - AcknowledgmentCallback acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(message); - if (acknowledgmentCallback == null && failedMsg != null) { - acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(failedMsg); - } + LOGGER.atInfo() + .setMessage("Processing message {} ") + .addArgument(() -> StaticMessageHeaderAccessor.getId(errorMessage)) + .addArgument(() -> messagingExceptionFailedMessage != null ? + StaticMessageHeaderAccessor.getId(messagingExceptionFailedMessage) : null) + .addArgument(() -> errorMessage.getOriginalMessage() != null ? + StaticMessageHeaderAccessor.getId(errorMessage.getOriginalMessage()) : null) + .addArgument(() -> StaticMessageHeaderAccessor.getSourceData(errorMessage) instanceof XMLMessage m ? + m.getMessageId() : null) + .log(); - if (acknowledgmentCallback == null) { + if (acknowledgmentCallbacks.isEmpty()) { // Should never happen under normal use - LOGGER.warn("Spring message {} does not contain an acknowledgment callback. Message cannot be acknowledged", - springId); - return; + throw new IllegalArgumentException(String.format( + "Spring error message %s does not contain an acknowledgment callback. Message cannot be acknowledged", + StaticMessageHeaderAccessor.getId(errorMessage))); } - try { - if (!SolaceAckUtil.republishToErrorQueue(acknowledgmentCallback)) { - AckUtils.requeue(acknowledgmentCallback); + for(AcknowledgmentCallback acknowledgmentCallback : acknowledgmentCallbacks) { + try { + if (!SolaceAckUtil.republishToErrorQueue(acknowledgmentCallback)) { + AckUtils.requeue(acknowledgmentCallback); + } + } catch (SolaceAcknowledgmentException e) { + LOGGER.error("Spring error message {}: exception in error handler", + StaticMessageHeaderAccessor.getId(errorMessage), e); + throw e; } - } catch (SolaceAcknowledgmentException e) { - LOGGER.error("Spring message {}: exception in error handler", springId, e); - throw e; } } } diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/SolaceErrorMessageHandlerTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/SolaceErrorMessageHandlerTest.java index d4792686..cb6aa8b1 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/SolaceErrorMessageHandlerTest.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/SolaceErrorMessageHandlerTest.java @@ -1,17 +1,17 @@ package com.solace.spring.cloud.stream.binder.util; -import static org.junit.jupiter.api.Assertions.assertThrows; import com.solacesystems.jcsmp.JCSMPFactory; import com.solacesystems.jcsmp.TextMessage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junitpioneer.jupiter.cartesian.CartesianTest; +import org.junitpioneer.jupiter.cartesian.CartesianTest.Values; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.core.AttributeAccessor; import org.springframework.integration.IntegrationMessageHeaderAccessor; -import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.acks.AcknowledgmentCallback; import org.springframework.integration.acks.AcknowledgmentCallback.Status; import org.springframework.integration.support.ErrorMessageUtils; @@ -20,6 +20,9 @@ import org.springframework.messaging.support.ErrorMessage; import org.springframework.messaging.support.MessageBuilder; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertThrows; + @ExtendWith(MockitoExtension.class) public class SolaceErrorMessageHandlerTest { SolaceMessageHeaderErrorMessageStrategy errorMessageStrategy = new SolaceMessageHeaderErrorMessageStrategy(); @@ -33,34 +36,45 @@ public void setup() { } @Test - public void testAcknowledgmentCallbackHeader(@Mock AcknowledgmentCallback acknowledgementCallback) { - Message inputMessage = MessageBuilder.withPayload("test") - .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, - Mockito.mock(AcknowledgmentCallback.class)) - .build(); + public void testNotAnErrorMessage() { + assertThatThrownBy(() -> errorMessageHandler.handleMessage(MessageBuilder.withPayload("test").build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Expected an ErrorMessage"); + } - attributeAccessor.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, inputMessage); + @Test + public void testErrorMessage(@Mock AcknowledgmentCallback acknowledgementCallback) { attributeAccessor.setAttribute(SolaceMessageHeaderErrorMessageStrategy.ATTR_SOLACE_ACKNOWLEDGMENT_CALLBACK, acknowledgementCallback); ErrorMessage errorMessage = errorMessageStrategy.buildErrorMessage( - new MessagingException(inputMessage), + new RuntimeException("test"), attributeAccessor); errorMessageHandler.handleMessage(errorMessage); Mockito.verify(acknowledgementCallback).acknowledge(Status.REQUEUE); - Mockito.verify(StaticMessageHeaderAccessor.getAcknowledgmentCallback(inputMessage), Mockito.never()) - .acknowledge(Mockito.any()); } @Test - public void testFailedMessageAcknowledgmentCallback(@Mock AcknowledgmentCallback acknowledgementCallback) { - Message inputMessage = MessageBuilder.withPayload("test") + public void testMessagingException(@Mock AcknowledgmentCallback acknowledgementCallback) { + ErrorMessage errorMessage = errorMessageStrategy.buildErrorMessage( + new MessagingException(MessageBuilder.withPayload("test") + .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgementCallback) + .build()), + attributeAccessor); + + errorMessageHandler.handleMessage(errorMessage); + Mockito.verify(acknowledgementCallback).acknowledge(Status.REQUEUE); + } + + @Test + public void testOriginalMessage(@Mock AcknowledgmentCallback acknowledgementCallback) { + attributeAccessor.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, MessageBuilder.withPayload("test") .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgementCallback) - .build(); - attributeAccessor.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, inputMessage); + .build()); + ErrorMessage errorMessage = errorMessageStrategy.buildErrorMessage( - new MessagingException(inputMessage), + new RuntimeException("test"), attributeAccessor); errorMessageHandler.handleMessage(errorMessage); @@ -68,13 +82,14 @@ public void testFailedMessageAcknowledgmentCallback(@Mock AcknowledgmentCallback } @Test - public void testNoFailedMessage(@Mock AcknowledgmentCallback acknowledgementCallback) { + public void testNoFailedMessage() { ErrorMessage errorMessage = errorMessageStrategy.buildErrorMessage( new MessagingException("test"), attributeAccessor); - errorMessageHandler.handleMessage(errorMessage); - Mockito.verify(acknowledgementCallback, Mockito.never()).acknowledge(AcknowledgmentCallback.Status.REJECT); + assertThatThrownBy(() -> errorMessageHandler.handleMessage(errorMessage)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("does not contain an acknowledgment callback"); } @Test @@ -91,21 +106,44 @@ public void testNonMessagingException(@Mock AcknowledgmentCallback acknowledgeme Mockito.verify(acknowledgementCallback).acknowledge(Status.REQUEUE); } - @Test + @CartesianTest(name = "[{index}] ackCallbackHeaderProvider={0}") public void testMessagingExceptionContainingDifferentFailedMessage( - @Mock AcknowledgmentCallback acknowledgementCallback) { - Message inputMessage = MessageBuilder.withPayload("test") - .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgementCallback) + @Values(strings = {"messaging-exception", "input-message", "error-message", "all:same", "all:different"}) + String ackCallbackHeaderProvider, + @Mock AcknowledgmentCallback acknowledgementCallback1, + @Mock AcknowledgmentCallback acknowledgementCallback2, + @Mock AcknowledgmentCallback acknowledgementCallback3) { + Message messageWithAckCallback = MessageBuilder.withPayload("with-callback-1") + .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgementCallback1) .build(); - attributeAccessor.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, - MessageBuilder.withPayload("some-other-message").build()); + Message messageNoAckCallback = MessageBuilder.withPayload("some-other-message").build(); - ErrorMessage errorMessage = errorMessageStrategy.buildErrorMessage( - new MessagingException(inputMessage), - attributeAccessor); + MessagingException exception = switch (ackCallbackHeaderProvider) { + case "messaging-exception", "all:same", "all:different" -> new MessagingException(messageWithAckCallback); + default -> new MessagingException(messageNoAckCallback); + }; + + attributeAccessor.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, switch (ackCallbackHeaderProvider) { + case "input-message", "all:same" -> messageWithAckCallback; + case "all:different" -> MessageBuilder.withPayload("with-callback-1") + .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, acknowledgementCallback2) + .build(); + default -> messageNoAckCallback; + }); + + if (ackCallbackHeaderProvider.equals("error-message") || ackCallbackHeaderProvider.startsWith("all:")) { + attributeAccessor.setAttribute(SolaceMessageHeaderErrorMessageStrategy.ATTR_SOLACE_ACKNOWLEDGMENT_CALLBACK, + ackCallbackHeaderProvider.equals("all:different") ? acknowledgementCallback3 : acknowledgementCallback1); + } + + ErrorMessage errorMessage = errorMessageStrategy.buildErrorMessage(exception, attributeAccessor); errorMessageHandler.handleMessage(errorMessage); - Mockito.verify(acknowledgementCallback).acknowledge(Status.REQUEUE); + Mockito.verify(acknowledgementCallback1).acknowledge(Status.REQUEUE); + Mockito.verify(acknowledgementCallback2, Mockito.times(ackCallbackHeaderProvider.equals("all:different") ? 1 : 0)) + .acknowledge(Status.REQUEUE); + Mockito.verify(acknowledgementCallback3, Mockito.times(ackCallbackHeaderProvider.equals("all:different") ? 1 : 0)) + .acknowledge(Status.REQUEUE); } @Test diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java index 210c856d..9b2580c4 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderBasicIT.java @@ -608,13 +608,15 @@ public void testSendBatchOverTransactionLimit( producerBinding.unbind(); } - @CartesianTest(name = "[{index}] channelType={0}, batchMode={1}, transacted={2}, namedConsumerGroup={3}") + @CartesianTest(name = "[{index}] channelType={0}, batchMode={1}, transacted={2}, namedConsumerGroup={3} maxAttempts={4} throwMessagingExceptionWithMissingAckCallback={5}") @Execution(ExecutionMode.CONCURRENT) public void testConsumerRequeue( @Values(classes = {DirectChannel.class, PollableSource.class}) Class channelType, @Values(booleans = {false, true}) boolean batchMode, @Values(booleans = {false, true}) boolean transacted, @Values(booleans = {false, true}) boolean namedConsumerGroup, + @Values(ints = {1, 3}) int maxAttempts, + @Values(booleans = {false, true}) boolean throwMessagingExceptionWithMissingAckCallback, SempV2Api sempV2Api, SoftAssertions softly, TestInfo testInfo) throws Exception { @@ -637,6 +639,7 @@ public void testConsumerRequeue( ExtendedConsumerProperties consumerProperties = createConsumerProperties(); consumerProperties.setBatchMode(batchMode); + consumerProperties.setMaxAttempts(maxAttempts); consumerProperties.getExtension().setTransacted(transacted); Binding consumerBinding = consumerInfrastructureUtil.createBinding(binder, destination0, group0, moduleInputChannel, consumerProperties); @@ -657,7 +660,12 @@ public void testConsumerRequeue( softly.assertThat(msg).satisfies(isValidMessage(channelType, consumerProperties, messages)); if (numRetriesRemaining.getAndDecrement() > 0) { callback.run(); - throw new RuntimeException("Throwing expected exception!"); + throw throwMessagingExceptionWithMissingAckCallback ? + new MessagingException(MessageBuilder.fromMessage(msg) + .removeHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK) + .build(), + "Throwing expected exception!") : + new RuntimeException("Throwing expected exception!"); } else { logger.info("Received message"); softly.assertThat(msg).satisfies(hasNestedHeader(SolaceHeaders.REDELIVERED, Boolean.class, @@ -687,12 +695,14 @@ public void testConsumerRequeue( consumerBinding.unbind(); } - @CartesianTest(name = "[{index}] channelType={0}, batchMode={1}, namedConsumerGroup={2}") + @CartesianTest(name = "[{index}] channelType={0}, batchMode={1}, namedConsumerGroup={2} maxAttempts={3} throwMessagingExceptionWithMissingAckCallback={4}") @Execution(ExecutionMode.CONCURRENT) public void testConsumerErrorQueueRepublish( @Values(classes = {DirectChannel.class, PollableSource.class}) Class channelType, @Values(booleans = {false, true}) boolean batchMode, @Values(booleans = {false, true}) boolean namedConsumerGroup, + @Values(ints = {1, 3}) int maxAttempts, + @Values(booleans = {false, true}) boolean throwMessagingExceptionWithMissingAckCallback, JCSMPSession jcsmpSession, SempV2Api sempV2Api, TestInfo testInfo) throws Exception { @@ -712,6 +722,7 @@ public void testConsumerErrorQueueRepublish( ExtendedConsumerProperties consumerProperties = createConsumerProperties(); consumerProperties.setBatchMode(batchMode); + consumerProperties.setMaxAttempts(maxAttempts); consumerProperties.getExtension().setAutoBindErrorQueue(true); Binding consumerBinding = consumerInfrastructureUtil.createBinding(binder, destination0, group0, moduleInputChannel, consumerProperties); @@ -729,7 +740,12 @@ public void testConsumerErrorQueueRepublish( () -> messages.forEach(moduleOutputChannel::send), (msg, callback) -> { callback.run(); - throw new RuntimeException("Throwing expected exception!"); + throw throwMessagingExceptionWithMissingAckCallback ? + new MessagingException(MessageBuilder.fromMessage(msg) + .removeHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK) + .build(), + "Throwing expected exception!") : + new RuntimeException("Throwing expected exception!"); }); assertThat(binder.getConsumerErrorQueueName(consumerBinding))