Skip to content

Commit

Permalink
add default retries (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
juancgalvis authored Oct 11, 2021
1 parent 003ce89 commit 5a29c4a
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,11 @@
* default empty and uses available MQQueueCustomizer.class bean
*/
String queueCustomizer() default "";

/**
* Max message processing retries when error handled
*
* @return max retries, specify a negative value for infinite retries
*/
String maxRetries() default "10";
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.messaging.support.MessageBuilder;
Expand All @@ -10,13 +11,17 @@
import javax.jms.MessageListener;
import java.lang.reflect.Method;

import static com.ibm.msg.client.jms.JmsConstants.JMSX_DELIVERY_COUNT;

@Log4j2
@AllArgsConstructor
public final class MQMessageListener implements MessageListener {
private final InvocableHandlerMethod method;
private final int maxRetries;

public static MQMessageListener fromBeanAndMethod(Object bean, Method invocableMethod) {
public static MQMessageListener fromBeanAndMethod(Object bean, Method invocableMethod, int retries) {
InvocableHandlerMethod handlerMethod = new InvocableHandlerMethod(bean, invocableMethod);
return new MQMessageListener(handlerMethod);
return new MQMessageListener(handlerMethod, retries);
}

@SneakyThrows
Expand All @@ -26,6 +31,17 @@ public void onMessage(Message message) {
}

private void callRealMethod(Message message) throws Exception {
method.invoke(MessageBuilder.createMessage("", new MessageHeaders(null)), message);
try {
method.invoke(MessageBuilder.createMessage("", new MessageHeaders(null)), message);
} catch (Exception error) {
if (maxRetries != -1 && maxRetries < message.getIntProperty(JMSX_DELIVERY_COUNT)) {
log.warn("Discarding message {} after {} retries", message.getJMSMessageID(), maxRetries);
log.warn("Cause", error);
} else {
log.warn("Message {} will be retried", message.getJMSMessageID());
log.warn("Cause", error);
throw error;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package co.com.bancolombia.commons.jms.mq;

import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.invocation.reactive.InvocableHandlerMethod;
import org.springframework.messaging.support.MessageBuilder;
Expand All @@ -11,24 +13,46 @@
import javax.jms.MessageListener;
import java.lang.reflect.Method;

import static com.ibm.msg.client.jms.JmsConstants.JMSX_DELIVERY_COUNT;

@Log4j2
@AllArgsConstructor
public final class MQReactiveMessageListener implements MessageListener {
private final InvocableHandlerMethod method;
private final int maxRetries;

public static MQReactiveMessageListener fromBeanAndMethod(Object bean, Method invocableMethod) {
public static MQReactiveMessageListener fromBeanAndMethod(Object bean, Method invocableMethod, int retries) {
InvocableHandlerMethod handlerMethod = new InvocableHandlerMethod(bean, invocableMethod);
return new MQReactiveMessageListener(handlerMethod);
return new MQReactiveMessageListener(handlerMethod, retries);
}

@Override
public void onMessage(Message message) {
onMessageAsync(message)
.subscribe();
onMessageAsync(message).toFuture().join();
}

@SneakyThrows
private Mono<Object> onMessageAsync(Message message) {
Mono<Object> flow = Mono.defer(() -> callRealMethod(message));
if (maxRetries != -1 && maxRetries < message.getIntProperty(JMSX_DELIVERY_COUNT)) {
flow = flow.onErrorResume(e -> discardMessage(message, e));
} else {
flow = flow.doOnError(error -> logRetry(message, error));
}
return flow.subscribeOn(Schedulers.boundedElastic());
}

@SneakyThrows
private Mono<Object> discardMessage(Message message, Throwable error) {
log.warn("Discarding message {} after {} retries", message.getJMSMessageID(), maxRetries);
log.warn("Cause", error);
return Mono.empty();
}

protected Mono<Object> onMessageAsync(Message message) {
return Mono.defer(() -> callRealMethod(message))
.subscribeOn(Schedulers.boundedElastic());
@SneakyThrows
private void logRetry(Message message, Throwable error) {
log.warn("Message {} will be retried", message.getJMSMessageID());
log.warn("Cause", error);
}

private Mono<Object> callRealMethod(Message message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@
import java.util.Objects;
import java.util.Set;

import static co.com.bancolombia.commons.jms.mq.config.utils.AnnotationUtils.*;

@Log4j2
@RequiredArgsConstructor
@Configuration
public class MQListenerAnnotationProcessor implements BeanPostProcessor, BeanFactoryAware {
public static final int DEFAULT_MAX_RETRIES = 10;
private final BeanFactory beanFactory;
private StringValueResolver embeddedValueResolver;

Expand Down Expand Up @@ -73,7 +76,7 @@ private void processJmsListener(MQListener mqListener, Method mostSpecificMethod
MQProperties properties = resolveBeanWithName("", MQProperties.class);
MQListenerConfig config = validateAnnotationConfig(mqListener, properties);
Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());
MessageListener processor = getEffectiveMessageListener(bean, invocableMethod, properties.isReactive());
MessageListener processor = getEffectiveMessageListener(bean, invocableMethod, properties.isReactive(), config);
ConnectionFactory cf = resolveBeanWithName(mqListener.connectionFactory(), ConnectionFactory.class);
MQTemporaryQueuesContainer temporaryQueuesContainer = beanFactory.getBean(MQTemporaryQueuesContainer.class);
try {
Expand All @@ -84,10 +87,11 @@ private void processJmsListener(MQListener mqListener, Method mostSpecificMethod
}
}

private MessageListener getEffectiveMessageListener(Object bean, Method invocableMethod, boolean isReactive) {
private MessageListener getEffectiveMessageListener(Object bean, Method invocableMethod, boolean isReactive,
MQListenerConfig config) {
return isReactive ?
MQReactiveMessageListener.fromBeanAndMethod(bean, invocableMethod) :
MQMessageListener.fromBeanAndMethod(bean, invocableMethod);
MQReactiveMessageListener.fromBeanAndMethod(bean, invocableMethod, config.getMaxRetries()) :
MQMessageListener.fromBeanAndMethod(bean, invocableMethod, config.getMaxRetries());
}

private MQListenerConfig validateAnnotationConfig(MQListener config, MQProperties properties) {
Expand All @@ -107,39 +111,22 @@ private MQListenerConfig validateAnnotationConfig(MQListener config, MQPropertie
}
String temporaryQueue = resolveQueue(config.tempQueueAlias(), queue, properties.getInputQueueAlias());
String fixedQueue = resolveQueue(queue, temporaryQueue, properties.getInputQueue());
int finalConcurrency = resolveConcurrency(concurrency, properties.getInputConcurrency());
int maxRetries = resolveRetries(config.maxRetries());
MQListenerConfig listenerConfig = MQListenerConfig.builder()
.concurrency(resolveConcurrency(concurrency, properties.getInputConcurrency()))
.concurrency(finalConcurrency)
.tempQueueAlias(temporaryQueue)
.queue(fixedQueue)
.connectionFactory(config.connectionFactory())
.customizer(customizer)
.maxRetries(maxRetries)
.build();
if (!StringUtils.hasText(listenerConfig.getQueue()) && !StringUtils.hasText(listenerConfig.getTempQueueAlias())) {
throw new MQInvalidListenerException("Invalid configuration, should define one of value or tempQueueAlias");
}
return listenerConfig;
}

private int resolveConcurrency(int concurrencyAnnotation, int concurrencyProperties) {
if (concurrencyAnnotation > 0) {
return concurrencyAnnotation;
}
if (concurrencyProperties > 0) {
return concurrencyProperties;
}
return MQProperties.DEFAULT_CONCURRENCY;
}

private String resolveQueue(String primaryAnnotation, String secondaryValue, String queueProperties) {
if (StringUtils.hasText(primaryAnnotation)) {
return primaryAnnotation;
}
if (!StringUtils.hasText(secondaryValue) && StringUtils.hasText(queueProperties)) {
return queueProperties;
}
return null;
}

private Map<Method, Set<MQListener>> getAnnotatedMethods(Class<?> targetClass) {
return MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<MQListener>>) method -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
@ConfigurationProperties(prefix = "commons.jms")
public class MQProperties {
public static final int DEFAULT_CONCURRENCY = 1;
public static final int DEFAULT_MAX_RETRIES = 10;
private int outputConcurrency = DEFAULT_CONCURRENCY;
private String outputQueue;
private int inputConcurrency = DEFAULT_CONCURRENCY;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package co.com.bancolombia.commons.jms.mq.config.utils;

import co.com.bancolombia.commons.jms.mq.config.MQProperties;
import org.springframework.util.StringUtils;

public class AnnotationUtils {

public static int resolveRetries(String maxRetriesStr) {
try {
int maxRetries = Integer.parseInt(maxRetriesStr);
if (maxRetries < 0) {
maxRetries = -1;
}
return maxRetries;
} catch (Exception ignored) {
return MQProperties.DEFAULT_MAX_RETRIES;
}
}

public static int resolveConcurrency(int concurrencyAnnotation, int concurrencyProperties) {
if (concurrencyAnnotation > 0) {
return concurrencyAnnotation;
}
if (concurrencyProperties > 0) {
return concurrencyProperties;
}
return MQProperties.DEFAULT_CONCURRENCY;
}

public static String resolveQueue(String primaryAnnotation, String secondaryValue, String queueProperties) {
if (StringUtils.hasText(primaryAnnotation)) {
return primaryAnnotation;
}
if (!StringUtils.hasText(secondaryValue) && StringUtils.hasText(queueProperties)) {
return queueProperties;
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package co.com.bancolombia.commons.jms.mq;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.messaging.handler.invocation.reactive.InvocableHandlerMethod;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import javax.jms.JMSException;
import javax.jms.Message;

import static com.ibm.msg.client.jms.JmsConstants.JMSX_DELIVERY_COUNT;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
Expand All @@ -19,18 +21,40 @@ class MQReactiveMessageListenerTest {
private InvocableHandlerMethod handlerMethod;
@Mock
private Message message;
@InjectMocks
private MQReactiveMessageListener listener;

@BeforeEach
void setup() {
listener = new MQReactiveMessageListener(handlerMethod, 1);
}

@Test
void shouldListen() {
// Arrange
when(handlerMethod.invoke(any(), any())).thenReturn(Mono.empty());
// Act
Mono<Object> listen = listener.onMessageAsync(message);
listener.onMessage(message);
// Assert
verify(handlerMethod, times(1)).invoke(any(), any(Message.class));
}

@Test
void shouldThrowError() throws JMSException {
// Arrange
when(handlerMethod.invoke(any(), any())).thenReturn(Mono.error(new RuntimeException()));
when(message.getIntProperty(JMSX_DELIVERY_COUNT)).thenReturn(0);
// Act
// Assert
assertThrows(RuntimeException.class, () -> listener.onMessage(message));
}

StepVerifier.create(listen)
.verifyComplete();
@Test
void shouldHandleErrorWhenReachedRetriesAttempts() throws JMSException {
// Arrange
when(handlerMethod.invoke(any(), any())).thenReturn(Mono.error(new RuntimeException()));
when(message.getIntProperty(JMSX_DELIVERY_COUNT)).thenReturn(2);
// Act
listener.onMessage(message);
// Assert
verify(handlerMethod, times(1)).invoke(any(), any(Message.class));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package co.com.bancolombia.commons.jms.mq.config.utils;

import co.com.bancolombia.commons.jms.mq.config.MQProperties;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

public class AnnotationUtilsTest {

@Test
void shouldResolveRetries() {
assertEquals(1, AnnotationUtils.resolveRetries("1"));
assertEquals(0, AnnotationUtils.resolveRetries("0"));
assertEquals(-1, AnnotationUtils.resolveRetries("-10"));
assertEquals(20, AnnotationUtils.resolveRetries("20"));
assertEquals(MQProperties.DEFAULT_MAX_RETRIES, AnnotationUtils.resolveRetries("NO"));
assertEquals(MQProperties.DEFAULT_MAX_RETRIES, AnnotationUtils.resolveRetries(""));
}

@Test
void shouldResolveConcurrency() {
assertEquals(1, AnnotationUtils.resolveConcurrency(1, -1));
assertEquals(2, AnnotationUtils.resolveConcurrency(0, 2));
assertEquals(MQProperties.DEFAULT_CONCURRENCY, AnnotationUtils.resolveConcurrency(0, 0));
}

@Test
void shouldResolveQueue() {
assertEquals("Q1", AnnotationUtils.resolveQueue("Q1", "Q2", "Q3"));
assertNull(AnnotationUtils.resolveQueue("", "Q2", "Q3"));
assertNull(AnnotationUtils.resolveQueue("", "", ""));
assertEquals("Q3", AnnotationUtils.resolveQueue("", "", "Q3"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ public class MQListenerConfig {
@Builder.Default
private final MQQueueCustomizer customizer = ignored -> { //NOSONAR
};
@Builder.Default
private final int maxRetries = -1; //NOSONAR
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class MyMQListener {
private final ReactiveReplyRouterUseCase useCase;
private final ObjectMapper mapper;

@MQListener
@MQListener(maxRetries = "10")
public Mono<Void> process(Message message) throws JMSException, JsonProcessingException {
log.info("Received and processing");
TextMessage textMessage = (TextMessage) message;
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=0.0.4
version=0.0.5
springBootVersion=2.4.8
gradleVersionsVersion=0.28.0
mqJMSVersion=2.5.0
Expand Down

0 comments on commit 5a29c4a

Please sign in to comment.