Skip to content

Commit

Permalink
add some unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
juancgalvis committed Oct 27, 2023
1 parent cc632c4 commit 47aa8be
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQMultiContextMessageSelectorListener;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQMultiContextMessageSelectorListenerSync;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextPerMessageStrategy;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextSharedStrategy;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.internal.models.RetryableConfig;
import co.com.bancolombia.commons.jms.mq.config.exceptions.MQInvalidListenerException;
Expand All @@ -16,6 +19,7 @@
import jakarta.jms.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
Expand All @@ -39,6 +43,15 @@ public ExecutorService defaultBySelectorExecutorService() {
new SynchronousQueue<>());
}

@Bean
@ConditionalOnMissingBean(SelectorModeProvider.class)
public SelectorModeProvider defaultSelectorModeProvider(@Value("${commons.jms.selector-mode:DEFAULT}") String mode) {
if (MQListenerConfig.SelectorMode.CONTEXT_PER_MESSAGE.name().equals(mode)) {
return (factory, context) -> new ContextPerMessageStrategy(factory);
}
return SelectorModeProvider.defaultSelector();
}

@Bean
@ConditionalOnMissingBean(ReactiveReplyRouter.class)
public ReactiveReplyRouter<Message> selectorReactiveReplyRouter() {
Expand All @@ -59,7 +72,7 @@ public MQMessageSelectorListener defaultMQMessageSelectorListener(
@ConditionalOnMissingBean(MQMultiContextMessageSelectorListenerSync.class)
public MQMultiContextMessageSelectorListenerSync defaultMQMultiContextMessageSelectorListenerSync(
ConnectionFactory cf, @Qualifier("messageSelectorListenerConfig") MQListenerConfig config,
MQHealthListener healthListener, MQProperties properties) {
MQHealthListener healthListener, MQProperties properties, SelectorModeProvider selectorModeProvider) {
if (config.getConcurrency() < 1) {
throw new MQInvalidListenerException("Invalid property commons.jms.input-concurrency, minimum value 1, " +
"you have passed " + config.getConcurrency());
Expand All @@ -72,7 +85,7 @@ public MQMultiContextMessageSelectorListenerSync defaultMQMultiContextMessageSel
.initialRetryIntervalMillis(properties.getInitialRetryIntervalMillis())
.multiplier(properties.getRetryMultiplier())
.build();
return new MQMultiContextMessageSelectorListenerSync(cf, config, healthListener, retryableConfig);
return new MQMultiContextMessageSelectorListenerSync(cf, config, healthListener, retryableConfig, selectorModeProvider);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import co.com.bancolombia.commons.jms.api.MQQueueManagerSetter;
import co.com.bancolombia.commons.jms.api.MQQueuesContainer;
import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextPerMessageStrategy;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.mq.config.exceptions.MQInvalidListenerException;
import co.com.bancolombia.commons.jms.mq.helper.JmsContextImpl;
Expand Down Expand Up @@ -68,7 +70,8 @@ void shouldHandleError() {
// Act
// Assert
assertThrows(MQInvalidListenerException.class,
() -> configurator.defaultMQMultiContextMessageSelectorListenerSync(null, config, healthListener, properties));
() -> configurator.defaultMQMultiContextMessageSelectorListenerSync(null, config, healthListener,
properties, SelectorModeProvider.defaultSelector()));
}

@Test
Expand All @@ -85,7 +88,8 @@ void shouldCreateDefaultMessageSelectorListener() {
when(context.createQueue(anyString())).thenReturn(queue);
// Act
MQMessageSelectorListenerSync listener = configurator.
defaultMQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, properties);
defaultMQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener,
properties, SelectorModeProvider.defaultSelector());
// Assert
assertNotNull(listener);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package co.com.bancolombia.commons.jms.internal.listener.selector;

import co.com.bancolombia.commons.jms.api.MQMessageSelectorListenerSync;
import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextSharedStrategy;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorStrategy;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.internal.reconnect.AbstractJMSReconnectable;
import co.com.bancolombia.commons.jms.utils.MQQueueUtils;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.JMSRuntimeException;
Expand All @@ -21,8 +22,9 @@ public class MQContextMessageSelectorListenerSync extends AbstractJMSReconnectab
public static final long DEFAULT_TIMEOUT = 5000L;
private final ConnectionFactory connectionFactory;
private final MQListenerConfig config;
private final SelectorModeProvider selectorModeProvider;
private SelectorStrategy strategy;
private Destination destination;
private JMSContext context;

@Override
protected String name() {
Expand All @@ -40,9 +42,10 @@ protected MQContextMessageSelectorListenerSync connect() {
synchronized ("connectSelectorListener") {
if (handled > lastSuccess.get()) {
log.info("Starting listener {}", getProcess());
context = connectionFactory.createContext();
JMSContext context = connectionFactory.createContext();
context.setExceptionListener(this);
destination = MQQueueUtils.setupFixedQueue(context, config);
strategy = selectorModeProvider.get(connectionFactory, context);
log.info("Listener {} started successfully", getProcess());
lastSuccess.set(System.currentTimeMillis());
} else {
Expand Down Expand Up @@ -80,16 +83,12 @@ public Message getMessageBySelector(String selector, long timeout, Destination d
}

public Message getMessageBySelector(String selector, long timeout, Destination destination, boolean retry) {
try (JMSConsumer consumer = context.createConsumer(destination, selector)) {
log.info("Waiting message with selector {}", selector);
Message message = consumer.receive(timeout);
if (message == null) {
throw new ReceiveTimeoutException("Message not received in " + timeout);
}
return message;
try {
return strategy.getMessageBySelector(selector, timeout, destination);
} catch (JMSRuntimeException e) {
// Connection is broken
if (e.getCause() != null && e.getCause().getMessage() != null && e.getCause().getMessage().contains("CONNECTION_BROKEN")) {
if (strategy instanceof ContextSharedStrategy && e.getCause() != null && e.getCause().getMessage() != null
&& e.getCause().getMessage().contains("CONNECTION_BROKEN")) {
connect();
}
if (retry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import co.com.bancolombia.commons.jms.api.MQMessageSelectorListenerSync;
import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.internal.models.RetryableConfig;
import jakarta.jms.ConnectionFactory;
Expand All @@ -18,13 +19,15 @@ public class MQMultiContextMessageSelectorListenerSync implements MQMessageSelec
private final MQHealthListener healthListener;
private List<MQContextMessageSelectorListenerSync> adapterList;
private final RetryableConfig retryableConfig;
private final SelectorModeProvider selectorModeProvider;

public MQMultiContextMessageSelectorListenerSync(ConnectionFactory connectionFactory, MQListenerConfig config,
MQHealthListener healthListener, RetryableConfig retryableConfig) {
MQHealthListener healthListener, RetryableConfig retryableConfig, SelectorModeProvider selectorModeProvider) {
this.connectionFactory = connectionFactory;
this.config = config;
this.healthListener = healthListener;
this.retryableConfig = retryableConfig;
this.selectorModeProvider = selectorModeProvider;
start();
}

Expand All @@ -35,6 +38,7 @@ public void start() {
.config(config)
.healthListener(healthListener)
.retryableConfig(retryableConfig)
.selectorModeProvider(selectorModeProvider)
.build()
.call())
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package co.com.bancolombia.commons.jms.internal.listener.selector.strategy;

import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.Message;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;

@Log4j2
@AllArgsConstructor
public class ContextPerMessageStrategy implements SelectorStrategy {
private final ConnectionFactory factory;

@Override
public Message getMessageBySelector(String selector, long timeout, Destination destination) {
try (JMSContext context = factory.createContext()) {
try (JMSConsumer consumer = context.createConsumer(destination, selector)) {
log.info("Waiting message with selector {}", selector);
Message message = consumer.receive(timeout);
if (message == null) {
throw new ReceiveTimeoutException("Message not received in " + timeout);
}
return message;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package co.com.bancolombia.commons.jms.internal.listener.selector.strategy;

import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException;
import jakarta.jms.Destination;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.Message;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;

@Log4j2
@AllArgsConstructor
public class ContextSharedStrategy implements SelectorStrategy {
private final JMSContext context;

@Override
public Message getMessageBySelector(String selector, long timeout, Destination destination) {
try (JMSConsumer consumer = context.createConsumer(destination, selector)) {
log.info("Waiting message with selector {}", selector);
Message message = consumer.receive(timeout);
if (message == null) {
throw new ReceiveTimeoutException("Message not received in " + timeout);
}
return message;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package co.com.bancolombia.commons.jms.internal.listener.selector.strategy;

import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSContext;

public interface SelectorModeProvider {
SelectorStrategy get(ConnectionFactory factory, JMSContext context);

static SelectorModeProvider defaultSelector(){
return (factory, context) -> new ContextSharedStrategy(context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package co.com.bancolombia.commons.jms.internal.listener.selector.strategy;

import jakarta.jms.Destination;
import jakarta.jms.Message;

public interface SelectorStrategy {
Message getMessageBySelector(String selector, long timeout, Destination destination);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,14 @@ public class MQListenerConfig {
@Builder.Default
private final int maxRetries = -1; //NOSONAR
@Builder.Default
private final MQQueueManagerSetter qmSetter = (ctx, queue) -> {}; //NOSONAR
private final MQQueueManagerSetter qmSetter = (ctx, queue) -> {
}; //NOSONAR

@Builder.Default
SelectorMode selectorMode = SelectorMode.CONTEXT_SHARED;

public enum SelectorMode {
CONTEXT_SHARED,
CONTEXT_PER_MESSAGE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import co.com.bancolombia.commons.jms.api.MQMessageSelectorListenerSync;
import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener;
import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextPerMessageStrategy;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.internal.models.RetryableConfig;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -38,26 +40,44 @@ class MQMultiContextMessageSelectorListenerSyncTest {

private MQMessageSelectorListenerSync listenerSync;

private final MQListenerConfig config = MQListenerConfig
.builder()
.concurrency(1)
.queue("QUEUE")
.build();
private final RetryableConfig retryableConfig = RetryableConfig
.builder()
.maxRetries(10)
.initialRetryIntervalMillis(1000)
.multiplier(1.5)
.build();

@BeforeEach
void setup() {
when(connectionFactory.createContext()).thenReturn(context);
when(context.createQueue(anyString())).thenReturn(queue);
MQListenerConfig config = MQListenerConfig
.builder()
.concurrency(1)
.queue("QUEUE")
.build();
RetryableConfig retryableConfig = RetryableConfig
.builder()
.maxRetries(10)
.initialRetryIntervalMillis(1000)
.multiplier(1.5)
.build();
listenerSync = new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, retryableConfig);
listenerSync = new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener,
retryableConfig, SelectorModeProvider.defaultSelector());
}

@Test
void shouldGetMessageWithContextPerMessage() {
listenerSync = new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener,
retryableConfig, (factory, ignored) -> new ContextPerMessageStrategy(factory));
// Arrange
String messageID = UUID.randomUUID().toString();
when(context.createConsumer(any(Destination.class), anyString())).thenReturn(consumer);
when(consumer.receive(DEFAULT_TIMEOUT)).thenReturn(message);
// Act
Message receivedMessage = listenerSync.getMessage(messageID);
// Assert
assertEquals(message, receivedMessage);
verify(consumer, times(1)).receive(DEFAULT_TIMEOUT);
}
@Test
void shouldGetMessage() {
listenerSync = new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener,
retryableConfig, SelectorModeProvider.defaultSelector());
// Arrange
String messageID = UUID.randomUUID().toString();
when(context.createConsumer(any(Destination.class), anyString())).thenReturn(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import co.com.bancolombia.commons.jms.api.MQMessageSelectorListenerSync;
import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener;
import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.internal.models.RetryableConfig;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
Expand Down Expand Up @@ -66,7 +67,8 @@ void setup() {
.multiplier(1.5)
.build();
MQMessageSelectorListenerSync listenerSync =
new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, retryableConfig);
new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener,
retryableConfig, SelectorModeProvider.defaultSelector());
listener = new MQMultiContextMessageSelectorListener(listenerSync, Executors.newCachedThreadPool(),
new ReactiveReplyRouter<>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ commons:
input-queue: "DEV.QUEUE.2"
input-queue-alias: ""
input-queue-set-queue-manager: true # enable it to set queue manager using a temporary queue
selector-mode: CONTEXT_PER_MESSAGE # CONTEXT_SHARED | CONTEXT_PER_MESSAGE
ibm:
mq:
channel: "DEV.APP.SVRCONN"
Expand Down

0 comments on commit 47aa8be

Please sign in to comment.