diff --git a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListener.java b/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListener.java index b4a022c..e4fac49 100644 --- a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListener.java +++ b/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListener.java @@ -31,8 +31,8 @@ @Log4j2 public class MQAutoconfigurationSelectorListener { - public static int MAX_THREADS = 200; - public static long KEEP_ALIVE_SECONDS = 5L; + public static final int MAX_THREADS = 200; + public static final long KEEP_ALIVE_SECONDS = 5L; @Bean @ConditionalOnMissingBean(ExecutorService.class) diff --git a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListenerTest.java b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListenerTest.java index 1520864..7786282 100644 --- a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListenerTest.java +++ b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListenerTest.java @@ -67,11 +67,12 @@ void shouldHandleError() { MQProperties properties = new MQProperties(); properties.setMaxRetries(10); properties.setInitialRetryIntervalMillis(1000); + SelectorModeProvider provider = SelectorModeProvider.defaultSelector(); // Act // Assert assertThrows(MQInvalidListenerException.class, () -> configurator.defaultMQMultiContextMessageSelectorListenerSync(null, config, healthListener, - properties, SelectorModeProvider.defaultSelector())); + properties, provider)); } @Test diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQContextMessageSelectorListenerSync.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQContextMessageSelectorListenerSync.java index 21c6f36..0dee609 100644 --- a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQContextMessageSelectorListenerSync.java +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQContextMessageSelectorListenerSync.java @@ -34,12 +34,13 @@ protected String name() { @Override protected void disconnect() throws JMSException { + // do not disconnect to avoid another thread exceptions } @Override protected MQContextMessageSelectorListenerSync connect() { long handled = System.currentTimeMillis(); - synchronized ("connectSelectorListener") { + synchronized (this) { if (handled > lastSuccess.get()) { log.info("Starting listener {}", getProcess()); JMSContext context = connectionFactory.createContext(); @@ -82,7 +83,7 @@ public Message getMessageBySelector(String selector, long timeout, Destination d return getMessageBySelector(selector, timeout, destination, true); } - public Message getMessageBySelector(String selector, long timeout, Destination destination, boolean retry) { + protected Message getMessageBySelector(String selector, long timeout, Destination destination, boolean retry) { try { return strategy.getMessageBySelector(selector, timeout, destination); } catch (JMSRuntimeException e) { diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListener.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListener.java index d36e347..81fdcca 100644 --- a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListener.java +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListener.java @@ -11,6 +11,7 @@ import java.time.Duration; import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; @Log4j2 @AllArgsConstructor @@ -22,84 +23,52 @@ public class MQMultiContextMessageSelectorListener implements MQMessageSelectorL @Override public Mono getMessage(String correlationId) { return router.wait(correlationId) - .doOnSubscribe(s -> executorService.submit(() -> { - try { - Message message = listenerSync.getMessage(correlationId); - router.reply(correlationId, message); - } catch (Exception e) { - log.warn("Error getting message with correlationId: {}", correlationId, e); - router.error(correlationId, e); - } - })); + .doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(correlationId, + () -> listenerSync.getMessage(correlationId)))); } @Override public Mono getMessage(String correlationId, long timeout) { return router.wait(correlationId, Duration.ofMillis(timeout)) - .doOnSubscribe(s -> executorService.submit(() -> { - try { - Message message = listenerSync.getMessage(correlationId, timeout); - router.reply(correlationId, message); - } catch (Exception e) { - log.warn("Error getting message with correlationId: {}", correlationId, e); - router.error(correlationId, e); - } - })); + .doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(correlationId, + () -> listenerSync.getMessage(correlationId, timeout)))); } @Override public Mono getMessage(String correlationId, long timeout, Destination destination) { return router.wait(correlationId, Duration.ofMillis(timeout)) - .doOnSubscribe(s -> executorService.submit(() -> { - try { - Message message = listenerSync.getMessage(correlationId, timeout, destination); - router.reply(correlationId, message); - } catch (Exception e) { - log.warn("Error getting message with correlationId: {}", correlationId, e); - router.error(correlationId, e); - } - })); + .doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(correlationId, + () -> listenerSync.getMessage(correlationId, timeout, destination)))); } @Override public Mono getMessageBySelector(String selector) { return router.wait(selector) - .doOnSubscribe(s -> executorService.submit(() -> { - try { - Message message = listenerSync.getMessageBySelector(selector); - router.reply(selector, message); - } catch (Exception e) { - log.warn("Error getting message with selector: {}", selector, e); - router.error(selector, e); - } - })); + .doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(selector, + () -> listenerSync.getMessageBySelector(selector)))); } @Override public Mono getMessageBySelector(String selector, long timeout) { return router.wait(selector, Duration.ofMillis(timeout)) - .doOnSubscribe(s -> executorService.submit(() -> { - try { - Message message = listenerSync.getMessageBySelector(selector, timeout); - router.reply(selector, message); - } catch (Exception e) { - log.warn("Error getting message with selector: {}", selector, e); - router.error(selector, e); - } - })); + .doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(selector, + () -> listenerSync.getMessageBySelector(selector, timeout)))); } @Override public Mono getMessageBySelector(String selector, long timeout, Destination destination) { return router.wait(selector, Duration.ofMillis(timeout)) - .doOnSubscribe(s -> executorService.submit(() -> { - try { - Message message = listenerSync.getMessageBySelector(selector, timeout, destination); - router.reply(selector, message); - } catch (Exception e) { - log.warn("Error getting message with selector: {}", selector, e); - router.error(selector, e); - } - })); + .doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(selector, + () -> listenerSync.getMessageBySelector(selector, timeout, destination)))); + } + + private void realGetMessageBySelector(String selector, Supplier supplier) { + try { + Message message = supplier.get(); + router.reply(selector, message); + } catch (Exception e) { + log.warn("Error getting message with selector: {}", selector, e); + router.error(selector, e); + } } } diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSync.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSync.java index a9b35d8..8804b0f 100644 --- a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSync.java +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSync.java @@ -72,7 +72,7 @@ public Message getMessageBySelector(String selector, long timeout, Destination d return getRandom().getMessageBySelector(selector, timeout, destination); } - private MQMessageSelectorListenerSync getRandom() { + protected MQMessageSelectorListenerSync getRandom() { int selectIndex = (int) (System.currentTimeMillis() % config.getConcurrency()); return adapterList.get(selectIndex); } diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/models/MQListenerConfig.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/models/MQListenerConfig.java index 8417c2b..9128970 100644 --- a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/models/MQListenerConfig.java +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/models/MQListenerConfig.java @@ -22,7 +22,7 @@ public class MQListenerConfig { @Builder.Default private final int maxRetries = -1; //NOSONAR @Builder.Default - private final MQQueueManagerSetter qmSetter = (ctx, queue) -> { + private final MQQueueManagerSetter qmSetter = (ctx, queueName) -> { }; //NOSONAR @Builder.Default diff --git a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/MQContextListenerTest.java b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/MQContextListenerTest.java index 88fc0a4..7b4cc21 100644 --- a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/MQContextListenerTest.java +++ b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/MQContextListenerTest.java @@ -4,17 +4,17 @@ import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener; import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig; import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - import jakarta.jms.ConnectionFactory; import jakarta.jms.JMSConsumer; import jakarta.jms.JMSContext; +import jakarta.jms.JMSException; import jakarta.jms.MessageListener; import jakarta.jms.Queue; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.times; @@ -62,4 +62,17 @@ void shouldStartListener() { // Assert verify(consumer, times(1)).setMessageListener(listener); } + + @Test + void shouldDisconnectAndConnectOnException() throws JMSException { + // Arrange + when(connectionFactory.createContext()).thenReturn(context); + when(context.createQueue(anyString())).thenReturn(queue); + when(context.createConsumer(queue)).thenReturn(consumer); + contextListener.call(); + // Act + contextListener.disconnect(); + // Assert + verify(consumer, times(1)).close(); + } } diff --git a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/MQMultiConnectionListenerTest.java b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/MQMultiConnectionListenerTest.java new file mode 100644 index 0000000..5735915 --- /dev/null +++ b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/MQMultiConnectionListenerTest.java @@ -0,0 +1,83 @@ +package co.com.bancolombia.commons.jms.internal.listener; + +import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener; +import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig; +import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp; +import jakarta.jms.Connection; +import jakarta.jms.ConnectionFactory; +import jakarta.jms.JMSException; +import jakarta.jms.JMSRuntimeException; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageListener; +import jakarta.jms.Session; +import jakarta.jms.TemporaryQueue; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.concurrent.Executors; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class MQMultiConnectionListenerTest { + @Mock + private Session session; + @Mock + private MessageListener listener; + @Mock + private Connection connection; + + @Mock + private ConnectionFactory connectionFactory; + + @Mock + private MQHealthListener healthListener; + + @Mock + private TemporaryQueue tmpQueue; + + private MQMultiConnectionListener connectionListener; + + @BeforeEach + void setup() throws JMSException { + when(connectionFactory.createConnection()).thenReturn(connection); + when(connection.createSession()).thenReturn(session); + when(session.createTemporaryQueue()).thenReturn(tmpQueue); + connectionListener = MQMultiConnectionListener.builder() + .listener(listener) + .connection(connection) + .connectionFactory(connectionFactory) + .container(new MQQueuesContainerImp()) + .healthListener(healthListener) + .config(MQListenerConfig.builder().build()) + .service(Executors.newCachedThreadPool()) + .build(); + } + + @Test + void shouldStartListener() throws JMSException { + // Arrange + // Act + connectionListener.call(); + // Assert + verify(connection, times(1)).start(); + } + + @Test + void shouldDisconnect() throws JMSException { + // Arrange + connectionListener.call(); + // Act + connectionListener.disconnect(); + // Assert + verify(connection, times(1)).close(); + } + + +} diff --git a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSyncTest.java b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSyncTest.java index e39019a..7e52279 100644 --- a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSyncTest.java +++ b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSyncTest.java @@ -1,26 +1,37 @@ package co.com.bancolombia.commons.jms.internal.listener.selector; -import co.com.bancolombia.commons.jms.api.MQMessageSelectorListenerSync; import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener; import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException; import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextPerMessageStrategy; import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider; import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig; import co.com.bancolombia.commons.jms.internal.models.RetryableConfig; +import jakarta.jms.ConnectionFactory; +import jakarta.jms.Destination; +import jakarta.jms.JMSConsumer; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSException; +import jakarta.jms.JMSRuntimeException; +import jakarta.jms.Message; +import jakarta.jms.Queue; +import jakarta.jms.TextMessage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import jakarta.jms.*; import java.util.UUID; import static co.com.bancolombia.commons.jms.internal.listener.selector.MQContextMessageSelectorListenerSync.DEFAULT_TIMEOUT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -37,8 +48,12 @@ class MQMultiContextMessageSelectorListenerSyncTest { private TextMessage message; @Mock private MQHealthListener healthListener; + @Mock + private Destination destination; + @Mock + private SelectorModeProvider selectorModeProvider; - private MQMessageSelectorListenerSync listenerSync; + private MQMultiContextMessageSelectorListenerSync listenerSync; private final MQListenerConfig config = MQListenerConfig .builder() @@ -56,8 +71,10 @@ class MQMultiContextMessageSelectorListenerSyncTest { void setup() { when(connectionFactory.createContext()).thenReturn(context); when(context.createQueue(anyString())).thenReturn(queue); + when(selectorModeProvider.get(any(), any())) + .thenReturn(SelectorModeProvider.defaultSelector().get(connectionFactory, context)); listenerSync = new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, - retryableConfig, SelectorModeProvider.defaultSelector()); + retryableConfig, selectorModeProvider); } @Test @@ -74,6 +91,7 @@ void shouldGetMessageWithContextPerMessage() { assertEquals(message, receivedMessage); verify(consumer, times(1)).receive(DEFAULT_TIMEOUT); } + @Test void shouldGetMessage() { listenerSync = new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, @@ -128,6 +146,50 @@ void shouldGetMessageBySelectorWithTimeout() { verify(consumer, times(1)).receive(DEFAULT_TIMEOUT); } + @Test + void shouldRetrySelectMessageById() { + // Arrange + String messageID = UUID.randomUUID().toString(); + when(context.createConsumer(any(Destination.class), anyString())).thenReturn(consumer); + when(consumer.receive(DEFAULT_TIMEOUT)).thenThrow(new JMSRuntimeException("")).thenReturn(message); + // Act + Message receivedMessage = listenerSync.getMessage(messageID, DEFAULT_TIMEOUT, destination); + // Assert + assertEquals(message, receivedMessage); + verify(connectionFactory, times(1)).createContext(); + verify(consumer, times(2)).receive(DEFAULT_TIMEOUT); + } + + @Test + void shouldReconnectWhenErrorBroken() { + // Arrange + String messageID = UUID.randomUUID().toString(); + when(context.createConsumer(any(Destination.class), anyString())).thenReturn(consumer); + when(consumer.receive(DEFAULT_TIMEOUT)) + .thenThrow(new JMSRuntimeException("error", "code", new Exception("Error CONNECTION_BROKEN"))) + .thenReturn(message); + // Act + Message receivedMessage = listenerSync.getMessage(messageID, DEFAULT_TIMEOUT, destination); + // Assert + assertEquals(message, receivedMessage); + verify(connectionFactory, times(2)).createContext(); + verify(consumer, times(2)).receive(DEFAULT_TIMEOUT); + } + + @Test + void shouldHandleErrorWhenRetryError() { + // Arrange + String messageID = UUID.randomUUID().toString(); + when(context.createConsumer(any(Destination.class), anyString())).thenReturn(consumer); + when(consumer.receive(DEFAULT_TIMEOUT)) + .thenThrow(new JMSRuntimeException("error")) + .thenThrow(new JMSRuntimeException("error")); + // Act + assertThrows(JMSRuntimeException.class, () -> listenerSync.getMessage(messageID, DEFAULT_TIMEOUT, destination)); + // Assert + verify(consumer, times(2)).receive(DEFAULT_TIMEOUT); + } + @Test void shouldHandleTimeoutErrorWithCustomTimeout() { // Arrange @@ -150,4 +212,14 @@ void shouldHandleTimeoutErrorWithCustomTimeoutBySelector() { assertThrows(ReceiveTimeoutException.class, () -> listenerSync.getMessageBySelector("JMSMessageID='" + messageID + "'", DEFAULT_TIMEOUT, queue)); } + @Test + void shouldNotDisconnect() throws JMSException { + // Arrange + MQContextMessageSelectorListenerSync listener = (MQContextMessageSelectorListenerSync) listenerSync.getRandom(); + // Act + listener.disconnect(); + // Assert + verify(consumer, never()).close(); + } + } diff --git a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/sender/MQMultiContextSenderSyncTest.java b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/sender/MQMultiContextSenderSyncTest.java index 4a7b107..472cd3d 100644 --- a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/sender/MQMultiContextSenderSyncTest.java +++ b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/sender/MQMultiContextSenderSyncTest.java @@ -90,4 +90,16 @@ void shouldHandleError() { }); } + @Test + void shouldReconnectWhenHandleError() { + // Arrange + // Assert + assertThrows(JMSRuntimeException.class, () -> { + // Act + senderSync.send(ctx -> { + throw new JMSRuntimeException("Error", "JMSCC0008", new Exception()); + }); + }); + } + } diff --git a/gradle.properties b/gradle.properties index 4da3519..29dfc6b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=1.2.0 +version=1.3.0 springBootVersion=3.1.1 gradleVersionsVersion=0.47.0 owaspDependencyCheckVersion=8.3.1