Skip to content

Commit

Permalink
Feature/fixed queues container (#8)
Browse files Browse the repository at this point in the history
* add fixed queues to container, add current connected qm to fixed queue

* add tests and fix codesmells

* fix codesmells
  • Loading branch information
juancgalvis authored Jan 13, 2022
1 parent 5a29c4a commit eed6f54
Show file tree
Hide file tree
Showing 26 changed files with 725 additions and 183 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ JMS 2.0.
@EnableMQMessageSender
public class SampleMQMessageSender {
private final MQMessageSender sender;
// private final MQTemporaryQueuesContainer container; // Inject it to reference a temporary queue
// private final MQQueuesContainer container; // Inject it to reference a temporary queue

public Mono<String> send(String message) {
return sender.send(context -> {
Expand All @@ -168,7 +168,7 @@ public class SampleMQMessageSender {
@EnableMQMessageSender
public class SampleMQMessageSender {
private final MQMessageSenderSync sender;
// private final MQTemporaryQueuesContainer container; // Inject it to reference a temporary queue
// private final MQQueuesContainer container; // Inject it to reference a temporary queue

public String send(String message) {
return sender.send(context -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package co.com.bancolombia.commons.jms.api;

import javax.jms.JMSContext;
import javax.jms.Queue;

public interface MQBrokerUtils {
void setQueueManager(JMSContext context, Queue queue);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package co.com.bancolombia.commons.jms.api;

import javax.jms.Queue;

public interface MQQueuesContainer {
void registerQueue(String key, Queue queue);

Queue get(String alias);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

import javax.jms.TemporaryQueue;

/**
* @deprecated This class will be removed.
* Use {@link MQQueuesContainer} class instead.
*/
@Deprecated
public interface MQTemporaryQueuesContainer {
void registerTemporaryQueue(String alias, TemporaryQueue queue);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package co.com.bancolombia.commons.jms.mq.config;

import co.com.bancolombia.commons.jms.api.MQBrokerUtils;
import co.com.bancolombia.commons.jms.api.MQQueueCustomizer;
import co.com.bancolombia.commons.jms.api.MQQueuesContainer;
import co.com.bancolombia.commons.jms.api.MQTemporaryQueuesContainer;
import co.com.bancolombia.commons.jms.mq.utils.MQUtils;
import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp;
import co.com.bancolombia.commons.jms.utils.MQTemporaryQueuesContainerImp;
import com.ibm.mq.jms.MQQueue;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import static com.ibm.msg.client.wmq.common.CommonConstants.*;
import static com.ibm.msg.client.wmq.common.CommonConstants.WMQ_MQMD_READ_ENABLED;
import static com.ibm.msg.client.wmq.common.CommonConstants.WMQ_MQMD_WRITE_ENABLED;
import static com.ibm.msg.client.wmq.common.CommonConstants.WMQ_PUT_ASYNC_ALLOWED_ENABLED;
import static com.ibm.msg.client.wmq.common.CommonConstants.WMQ_READ_AHEAD_ALLOWED_ENABLED;
import static com.ibm.msg.client.wmq.common.CommonConstants.WMQ_TARGET_CLIENT;

@Configuration
public class MQAutoconfiguration {
Expand All @@ -26,9 +34,24 @@ public MQQueueCustomizer defaultMQQueueCustomizer() {
};
}

@Bean
@ConditionalOnMissingBean(MQQueuesContainer.class)
public MQQueuesContainer defaultMQQueuesContainer() {
return new MQQueuesContainerImp();
}

@Bean
@ConditionalOnMissingBean(MQTemporaryQueuesContainer.class)
public MQTemporaryQueuesContainer defaultMQTemporaryQueuesContainer() {
return new MQTemporaryQueuesContainerImp();
public MQTemporaryQueuesContainer defaultMQTemporaryQueuesContainer(MQQueuesContainer container) {
return new MQTemporaryQueuesContainerImp(container);
}

@Bean
@ConditionalOnMissingBean(MQBrokerUtils.class)
public MQBrokerUtils defaultMqBrokerUtils() {
return (context, queue) -> {
String qmName = MQUtils.extractQMName(context);
MQUtils.setQMName(queue, qmName);
};
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package co.com.bancolombia.commons.jms.mq.config;

import co.com.bancolombia.commons.jms.api.MQBrokerUtils;
import co.com.bancolombia.commons.jms.api.MQQueueCustomizer;
import co.com.bancolombia.commons.jms.api.MQTemporaryQueuesContainer;
import co.com.bancolombia.commons.jms.api.MQQueuesContainer;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.mq.MQListener;
import co.com.bancolombia.commons.jms.mq.MQListeners;
import co.com.bancolombia.commons.jms.mq.MQMessageListener;
import co.com.bancolombia.commons.jms.mq.MQReactiveMessageListener;
import co.com.bancolombia.commons.jms.mq.config.exceptions.MQInvalidListenerException;
import co.com.bancolombia.commons.jms.mq.utils.MQUtils;
import co.com.bancolombia.commons.jms.utils.MQMessageListenerUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
Expand All @@ -34,7 +36,9 @@
import java.util.Objects;
import java.util.Set;

import static co.com.bancolombia.commons.jms.mq.config.utils.AnnotationUtils.*;
import static co.com.bancolombia.commons.jms.mq.config.utils.AnnotationUtils.resolveConcurrency;
import static co.com.bancolombia.commons.jms.mq.config.utils.AnnotationUtils.resolveQueue;
import static co.com.bancolombia.commons.jms.mq.config.utils.AnnotationUtils.resolveRetries;

@Log4j2
@RequiredArgsConstructor
Expand Down Expand Up @@ -63,11 +67,9 @@ public void setBeanFactory(BeanFactory beanFactory) {

private void processAnnotated(Object bean, String beanName, Map<Method, Set<MQListener>> annotatedMethods) {
if (!annotatedMethods.isEmpty()) {
annotatedMethods.forEach((method, listeners) ->
listeners.forEach(listener -> processJmsListener(listener, method, bean)));
annotatedMethods.forEach((method, listeners) -> listeners.forEach(listener -> processJmsListener(listener, method, bean)));
if (log.isInfoEnabled()) {
log.info("{} @MQListener methods processed on bean '{}': {}",
annotatedMethods.size(), beanName, annotatedMethods);
log.info("{} @MQListener methods processed on bean '{}': {}", annotatedMethods.size(), beanName, annotatedMethods);
}
}
}
Expand All @@ -78,62 +80,48 @@ private void processJmsListener(MQListener mqListener, Method mostSpecificMethod
Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());
MessageListener processor = getEffectiveMessageListener(bean, invocableMethod, properties.isReactive(), config);
ConnectionFactory cf = resolveBeanWithName(mqListener.connectionFactory(), ConnectionFactory.class);
MQTemporaryQueuesContainer temporaryQueuesContainer = beanFactory.getBean(MQTemporaryQueuesContainer.class);
MQQueuesContainer queuesContainer = beanFactory.getBean(MQQueuesContainer.class);
MQBrokerUtils mqBrokerUtils = beanFactory.getBean(MQBrokerUtils.class);

try {
MQMessageListenerUtils.createListeners(cf, processor, temporaryQueuesContainer, config);
MQMessageListenerUtils.createListeners(cf, processor, queuesContainer, mqBrokerUtils, config);
} catch (JMSRuntimeException ex) {
throw new BeanInitializationException("Could not register MQ listener on [" +
mostSpecificMethod + "], using ConnectionFactory: " + cf, ex);
throw new BeanInitializationException("Could not register MQ listener on [" + mostSpecificMethod + "], using ConnectionFactory: " + cf, ex);
}
}

private MessageListener getEffectiveMessageListener(Object bean, Method invocableMethod, boolean isReactive,
MQListenerConfig config) {
return isReactive ?
MQReactiveMessageListener.fromBeanAndMethod(bean, invocableMethod, config.getMaxRetries()) :
MQMessageListener.fromBeanAndMethod(bean, invocableMethod, config.getMaxRetries());
private MessageListener getEffectiveMessageListener(Object bean, Method invocableMethod, boolean isReactive, MQListenerConfig config) {
return isReactive ? MQReactiveMessageListener.fromBeanAndMethod(bean, invocableMethod, config.getMaxRetries()) : MQMessageListener.fromBeanAndMethod(bean, invocableMethod, config.getMaxRetries());
}

private MQListenerConfig validateAnnotationConfig(MQListener config, MQProperties properties) {
// Resolve dynamic values
int concurrency = Integer.parseInt(Objects.
requireNonNull(embeddedValueResolver.resolveStringValue(config.concurrency())));
int concurrency = Integer.parseInt(Objects.requireNonNull(embeddedValueResolver.resolveStringValue(config.concurrency())));
String queue = embeddedValueResolver.resolveStringValue(config.value());
MQQueueCustomizer customizer = resolveBeanWithName(config.queueCustomizer(), MQQueueCustomizer.class);
// Map params
if (StringUtils.hasText(queue) && StringUtils.hasText(config.tempQueueAlias())) {
throw new MQInvalidListenerException("Invalid configuration, should define only one of value or " +
"tempQueueAlias in @MQListener annotation");
throw new MQInvalidListenerException("Invalid configuration, should define only one of value or " + "tempQueueAlias in @MQListener annotation");
}
if (StringUtils.hasText(properties.getInputQueue()) && StringUtils.hasText(properties.getInputQueueAlias())) {
throw new MQInvalidListenerException("Invalid configuration, should define only one of " +
"commons.jms.input-queue or commons.jms.input-queue-alias in your application.yaml file");
throw new MQInvalidListenerException("Invalid configuration, should define only one of " + "commons.jms.input-queue or commons.jms.input-queue-alias in your application.yaml file");
}
String temporaryQueue = resolveQueue(config.tempQueueAlias(), queue, properties.getInputQueueAlias());
String fixedQueue = resolveQueue(queue, temporaryQueue, properties.getInputQueue());
int finalConcurrency = resolveConcurrency(concurrency, properties.getInputConcurrency());
int maxRetries = resolveRetries(config.maxRetries());
MQListenerConfig listenerConfig = MQListenerConfig.builder()
.concurrency(finalConcurrency)
.tempQueueAlias(temporaryQueue)
.queue(fixedQueue)
.connectionFactory(config.connectionFactory())
.customizer(customizer)
.maxRetries(maxRetries)
.build();
MQListenerConfig listenerConfig = MQListenerConfig.builder().concurrency(finalConcurrency).tempQueueAlias(temporaryQueue).queue(fixedQueue).connectionFactory(config.connectionFactory()).customizer(customizer).maxRetries(maxRetries).build();
if (!StringUtils.hasText(listenerConfig.getQueue()) && !StringUtils.hasText(listenerConfig.getTempQueueAlias())) {
throw new MQInvalidListenerException("Invalid configuration, should define one of value or tempQueueAlias");
}
return listenerConfig;
}

private Map<Method, Set<MQListener>> getAnnotatedMethods(Class<?> targetClass) {
return MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<MQListener>>) method -> {
Set<MQListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, MQListener.class, MQListeners.class);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
return MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<MQListener>>) method -> {
Set<MQListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, MQListener.class, MQListeners.class);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
}

private <T> T resolveBeanWithName(String beanName, Class<T> tClass) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package co.com.bancolombia.commons.jms.mq.config.utils;

import co.com.bancolombia.commons.jms.mq.config.MQProperties;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.util.StringUtils;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class AnnotationUtils {

public static int resolveRetries(String maxRetriesStr) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package co.com.bancolombia.commons.jms.mq.utils;


import com.ibm.mq.jms.MQQueue;
import com.ibm.msg.client.jms.JmsReadablePropertyContext;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;

import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Queue;
import java.lang.reflect.AccessibleObject;
import java.lang.reflect.Field;

import static com.ibm.msg.client.wmq.common.CommonConstants.WMQ_QUEUE_MANAGER;

@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class MQUtils {
private static final String CONNECTION_PROPERTY = "connection";

public static String extractQMName(JMSContext context) {
try {
Field field = context.getClass().getDeclaredField(CONNECTION_PROPERTY);
AccessibleObject.setAccessible(new AccessibleObject[]{field}, true);
JmsReadablePropertyContext readable = (JmsReadablePropertyContext) field.get(context);
AccessibleObject.setAccessible(new AccessibleObject[]{field}, false);
String qmName = readable.getStringProperty(WMQ_QUEUE_MANAGER);
log.info("Listening queue manager name got successfully: {}", qmName);
return qmName;
} catch (NoSuchFieldException | JMSException | IllegalAccessException e) {
log.warn("Error getting queue manager name from JMSContext", e);
return "";
}
}

public static void setQMName(Queue queue, String qmName) {
try {
((MQQueue) queue).setBaseQueueManagerName(qmName);
} catch (JMSException e) {
log.warn("Error setting queue manager name to queue", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package co.com.bancolombia.commons.jms.mq.config;

import co.com.bancolombia.commons.jms.api.MQBrokerUtils;
import co.com.bancolombia.commons.jms.api.MQQueueCustomizer;
import co.com.bancolombia.commons.jms.api.MQQueuesContainer;
import co.com.bancolombia.commons.jms.api.MQTemporaryQueuesContainer;
import com.ibm.mq.jms.MQQueue;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import javax.jms.JMSContext;
import javax.jms.JMSException;

import static com.ibm.msg.client.wmq.common.CommonConstants.WMQ_TARGET_CLIENT;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
class MQAutoconfiguracionTest {
private final MQAutoconfiguration configuration = new MQAutoconfiguration();
@Mock
private MQQueue queue;
@Mock
private JMSContext context;

@Test
void shouldCreateCustomizer() throws JMSException {
MQQueueCustomizer customizer = configuration.defaultMQQueueCustomizer();
customizer.customize(queue);
verify(queue, times(1)).setProperty(WMQ_TARGET_CLIENT, "1");
}

@Test
void shouldCreateContainers() {
MQQueuesContainer container = configuration.defaultMQQueuesContainer();
MQTemporaryQueuesContainer containerTemporary = configuration.defaultMQTemporaryQueuesContainer(container);
Assertions.assertNull(containerTemporary.get("non-existent"));
}

@Test
void shouldCreateBrokerUtils() throws JMSException {
MQBrokerUtils utils = configuration.defaultMqBrokerUtils();
utils.setQueueManager(context, queue);
verify(queue, times(1)).setBaseQueueManagerName("");
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package co.com.bancolombia.commons.jms.mq.config;

import co.com.bancolombia.commons.jms.api.MQBrokerUtils;
import co.com.bancolombia.commons.jms.api.MQQueueCustomizer;
import co.com.bancolombia.commons.jms.api.MQTemporaryQueuesContainer;
import co.com.bancolombia.commons.jms.api.MQQueuesContainer;
import co.com.bancolombia.commons.jms.mq.MQListener;
import co.com.bancolombia.commons.jms.mq.config.exceptions.MQInvalidListenerException;
import co.com.bancolombia.commons.jms.utils.MQQueueUtils;
import lombok.extern.java.Log;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -21,18 +23,22 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class MQListenerAnnotationProcessorTest {
@Mock
private ConfigurableBeanFactory factory;
@Mock
private MQTemporaryQueuesContainer container;
private MQQueuesContainer container;
@Mock
private ConnectionFactory cf;
@Mock
private MQQueueCustomizer customizer;
@Mock
private MQBrokerUtils brokerUtils;
@InjectMocks
private MQListenerAnnotationProcessor processor;

Expand All @@ -49,7 +55,8 @@ void shouldProcessAnnotated() {
// Arrange
Object bean = new MyListener();
doReturn(new MQProperties()).when(factory).getBean(MQProperties.class);
doReturn(container).when(factory).getBean(MQTemporaryQueuesContainer.class);
doReturn(container).when(factory).getBean(MQQueuesContainer.class);
doReturn(brokerUtils).when(factory).getBean(MQBrokerUtils.class);
doReturn(cf).when(factory).getBean(ConnectionFactory.class);
doReturn(cf).when(factory).getBean("custom", ConnectionFactory.class);
// Act
Expand All @@ -65,7 +72,8 @@ void shouldProcessAnnotatedReactive() {
MQProperties properties = new MQProperties();
properties.setReactive(true);
doReturn(properties).when(factory).getBean(MQProperties.class);
doReturn(container).when(factory).getBean(MQTemporaryQueuesContainer.class);
doReturn(container).when(factory).getBean(MQQueuesContainer.class);
doReturn(brokerUtils).when(factory).getBean(MQBrokerUtils.class);
doReturn(cf).when(factory).getBean(ConnectionFactory.class);
doReturn(cf).when(factory).getBean("custom", ConnectionFactory.class);
// Act
Expand All @@ -81,7 +89,7 @@ void shouldWorksWithInvalidConcurrency() {
properties.setInputConcurrency(0);
properties.setReactive(true);
doReturn(properties).when(factory).getBean(MQProperties.class);
doReturn(container).when(factory).getBean(MQTemporaryQueuesContainer.class);
doReturn(container).when(factory).getBean(MQQueuesContainer.class);
doReturn(cf).when(factory).getBean(ConnectionFactory.class);
Object bean = new MyReactiveListenerInvalidConcurrency();
// Act
Expand Down
Loading

0 comments on commit eed6f54

Please sign in to comment.