diff --git a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListener.java b/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListener.java index c05ccff..b4a022c 100644 --- a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListener.java +++ b/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListener.java @@ -7,6 +7,9 @@ import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener; import co.com.bancolombia.commons.jms.internal.listener.selector.MQMultiContextMessageSelectorListener; import co.com.bancolombia.commons.jms.internal.listener.selector.MQMultiContextMessageSelectorListenerSync; +import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextPerMessageStrategy; +import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextSharedStrategy; +import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider; import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig; import co.com.bancolombia.commons.jms.internal.models.RetryableConfig; import co.com.bancolombia.commons.jms.mq.config.exceptions.MQInvalidListenerException; @@ -16,6 +19,7 @@ import jakarta.jms.Message; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -39,6 +43,15 @@ public ExecutorService defaultBySelectorExecutorService() { new SynchronousQueue<>()); } + @Bean + @ConditionalOnMissingBean(SelectorModeProvider.class) + public SelectorModeProvider defaultSelectorModeProvider(@Value("${commons.jms.selector-mode:DEFAULT}") String mode) { + if (MQListenerConfig.SelectorMode.CONTEXT_PER_MESSAGE.name().equals(mode)) { + return (factory, context) -> new ContextPerMessageStrategy(factory); + } + return SelectorModeProvider.defaultSelector(); + } + @Bean @ConditionalOnMissingBean(ReactiveReplyRouter.class) public ReactiveReplyRouter selectorReactiveReplyRouter() { @@ -59,7 +72,7 @@ public MQMessageSelectorListener defaultMQMessageSelectorListener( @ConditionalOnMissingBean(MQMultiContextMessageSelectorListenerSync.class) public MQMultiContextMessageSelectorListenerSync defaultMQMultiContextMessageSelectorListenerSync( ConnectionFactory cf, @Qualifier("messageSelectorListenerConfig") MQListenerConfig config, - MQHealthListener healthListener, MQProperties properties) { + MQHealthListener healthListener, MQProperties properties, SelectorModeProvider selectorModeProvider) { if (config.getConcurrency() < 1) { throw new MQInvalidListenerException("Invalid property commons.jms.input-concurrency, minimum value 1, " + "you have passed " + config.getConcurrency()); @@ -72,7 +85,7 @@ public MQMultiContextMessageSelectorListenerSync defaultMQMultiContextMessageSel .initialRetryIntervalMillis(properties.getInitialRetryIntervalMillis()) .multiplier(properties.getRetryMultiplier()) .build(); - return new MQMultiContextMessageSelectorListenerSync(cf, config, healthListener, retryableConfig); + return new MQMultiContextMessageSelectorListenerSync(cf, config, healthListener, retryableConfig, selectorModeProvider); } @Bean diff --git a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListenerTest.java b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListenerTest.java index 4947488..1520864 100644 --- a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListenerTest.java +++ b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListenerTest.java @@ -4,6 +4,8 @@ import co.com.bancolombia.commons.jms.api.MQQueueManagerSetter; import co.com.bancolombia.commons.jms.api.MQQueuesContainer; import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener; +import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextPerMessageStrategy; +import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider; import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig; import co.com.bancolombia.commons.jms.mq.config.exceptions.MQInvalidListenerException; import co.com.bancolombia.commons.jms.mq.helper.JmsContextImpl; @@ -68,7 +70,8 @@ void shouldHandleError() { // Act // Assert assertThrows(MQInvalidListenerException.class, - () -> configurator.defaultMQMultiContextMessageSelectorListenerSync(null, config, healthListener, properties)); + () -> configurator.defaultMQMultiContextMessageSelectorListenerSync(null, config, healthListener, + properties, SelectorModeProvider.defaultSelector())); } @Test @@ -85,7 +88,8 @@ void shouldCreateDefaultMessageSelectorListener() { when(context.createQueue(anyString())).thenReturn(queue); // Act MQMessageSelectorListenerSync listener = configurator. - defaultMQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, properties); + defaultMQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, + properties, SelectorModeProvider.defaultSelector()); // Assert assertNotNull(listener); } diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQContextMessageSelectorListenerSync.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQContextMessageSelectorListenerSync.java index ca9aa2d..21c6f36 100644 --- a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQContextMessageSelectorListenerSync.java +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQContextMessageSelectorListenerSync.java @@ -1,13 +1,14 @@ package co.com.bancolombia.commons.jms.internal.listener.selector; import co.com.bancolombia.commons.jms.api.MQMessageSelectorListenerSync; -import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException; +import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextSharedStrategy; +import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider; +import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorStrategy; import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig; import co.com.bancolombia.commons.jms.internal.reconnect.AbstractJMSReconnectable; import co.com.bancolombia.commons.jms.utils.MQQueueUtils; import jakarta.jms.ConnectionFactory; import jakarta.jms.Destination; -import jakarta.jms.JMSConsumer; import jakarta.jms.JMSContext; import jakarta.jms.JMSException; import jakarta.jms.JMSRuntimeException; @@ -21,8 +22,9 @@ public class MQContextMessageSelectorListenerSync extends AbstractJMSReconnectab public static final long DEFAULT_TIMEOUT = 5000L; private final ConnectionFactory connectionFactory; private final MQListenerConfig config; + private final SelectorModeProvider selectorModeProvider; + private SelectorStrategy strategy; private Destination destination; - private JMSContext context; @Override protected String name() { @@ -40,9 +42,10 @@ protected MQContextMessageSelectorListenerSync connect() { synchronized ("connectSelectorListener") { if (handled > lastSuccess.get()) { log.info("Starting listener {}", getProcess()); - context = connectionFactory.createContext(); + JMSContext context = connectionFactory.createContext(); context.setExceptionListener(this); destination = MQQueueUtils.setupFixedQueue(context, config); + strategy = selectorModeProvider.get(connectionFactory, context); log.info("Listener {} started successfully", getProcess()); lastSuccess.set(System.currentTimeMillis()); } else { @@ -80,16 +83,12 @@ public Message getMessageBySelector(String selector, long timeout, Destination d } public Message getMessageBySelector(String selector, long timeout, Destination destination, boolean retry) { - try (JMSConsumer consumer = context.createConsumer(destination, selector)) { - log.info("Waiting message with selector {}", selector); - Message message = consumer.receive(timeout); - if (message == null) { - throw new ReceiveTimeoutException("Message not received in " + timeout); - } - return message; + try { + return strategy.getMessageBySelector(selector, timeout, destination); } catch (JMSRuntimeException e) { // Connection is broken - if (e.getCause() != null && e.getCause().getMessage() != null && e.getCause().getMessage().contains("CONNECTION_BROKEN")) { + if (strategy instanceof ContextSharedStrategy && e.getCause() != null && e.getCause().getMessage() != null + && e.getCause().getMessage().contains("CONNECTION_BROKEN")) { connect(); } if (retry) { diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSync.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSync.java index 71f54f1..a9b35d8 100644 --- a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSync.java +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSync.java @@ -2,6 +2,7 @@ import co.com.bancolombia.commons.jms.api.MQMessageSelectorListenerSync; import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener; +import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider; import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig; import co.com.bancolombia.commons.jms.internal.models.RetryableConfig; import jakarta.jms.ConnectionFactory; @@ -18,13 +19,15 @@ public class MQMultiContextMessageSelectorListenerSync implements MQMessageSelec private final MQHealthListener healthListener; private List adapterList; private final RetryableConfig retryableConfig; + private final SelectorModeProvider selectorModeProvider; public MQMultiContextMessageSelectorListenerSync(ConnectionFactory connectionFactory, MQListenerConfig config, - MQHealthListener healthListener, RetryableConfig retryableConfig) { + MQHealthListener healthListener, RetryableConfig retryableConfig, SelectorModeProvider selectorModeProvider) { this.connectionFactory = connectionFactory; this.config = config; this.healthListener = healthListener; this.retryableConfig = retryableConfig; + this.selectorModeProvider = selectorModeProvider; start(); } @@ -35,6 +38,7 @@ public void start() { .config(config) .healthListener(healthListener) .retryableConfig(retryableConfig) + .selectorModeProvider(selectorModeProvider) .build() .call()) .collect(Collectors.toList()); diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/strategy/ContextPerMessageStrategy.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/strategy/ContextPerMessageStrategy.java new file mode 100644 index 0000000..7b09c5b --- /dev/null +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/strategy/ContextPerMessageStrategy.java @@ -0,0 +1,30 @@ +package co.com.bancolombia.commons.jms.internal.listener.selector.strategy; + +import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException; +import jakarta.jms.ConnectionFactory; +import jakarta.jms.Destination; +import jakarta.jms.JMSConsumer; +import jakarta.jms.JMSContext; +import jakarta.jms.Message; +import lombok.AllArgsConstructor; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +@AllArgsConstructor +public class ContextPerMessageStrategy implements SelectorStrategy { + private final ConnectionFactory factory; + + @Override + public Message getMessageBySelector(String selector, long timeout, Destination destination) { + try (JMSContext context = factory.createContext()) { + try (JMSConsumer consumer = context.createConsumer(destination, selector)) { + log.info("Waiting message with selector {}", selector); + Message message = consumer.receive(timeout); + if (message == null) { + throw new ReceiveTimeoutException("Message not received in " + timeout); + } + return message; + } + } + } +} diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/strategy/ContextSharedStrategy.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/strategy/ContextSharedStrategy.java new file mode 100644 index 0000000..2d4c09b --- /dev/null +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/strategy/ContextSharedStrategy.java @@ -0,0 +1,27 @@ +package co.com.bancolombia.commons.jms.internal.listener.selector.strategy; + +import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException; +import jakarta.jms.Destination; +import jakarta.jms.JMSConsumer; +import jakarta.jms.JMSContext; +import jakarta.jms.Message; +import lombok.AllArgsConstructor; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +@AllArgsConstructor +public class ContextSharedStrategy implements SelectorStrategy { + private final JMSContext context; + + @Override + public Message getMessageBySelector(String selector, long timeout, Destination destination) { + try (JMSConsumer consumer = context.createConsumer(destination, selector)) { + log.info("Waiting message with selector {}", selector); + Message message = consumer.receive(timeout); + if (message == null) { + throw new ReceiveTimeoutException("Message not received in " + timeout); + } + return message; + } + } +} diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/strategy/SelectorModeProvider.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/strategy/SelectorModeProvider.java new file mode 100644 index 0000000..9187f40 --- /dev/null +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/strategy/SelectorModeProvider.java @@ -0,0 +1,12 @@ +package co.com.bancolombia.commons.jms.internal.listener.selector.strategy; + +import jakarta.jms.ConnectionFactory; +import jakarta.jms.JMSContext; + +public interface SelectorModeProvider { + SelectorStrategy get(ConnectionFactory factory, JMSContext context); + + static SelectorModeProvider defaultSelector(){ + return (factory, context) -> new ContextSharedStrategy(context); + } +} diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/strategy/SelectorStrategy.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/strategy/SelectorStrategy.java new file mode 100644 index 0000000..c5784f1 --- /dev/null +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/strategy/SelectorStrategy.java @@ -0,0 +1,8 @@ +package co.com.bancolombia.commons.jms.internal.listener.selector.strategy; + +import jakarta.jms.Destination; +import jakarta.jms.Message; + +public interface SelectorStrategy { + Message getMessageBySelector(String selector, long timeout, Destination destination); +} diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/models/MQListenerConfig.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/models/MQListenerConfig.java index c4ce5ea..8417c2b 100644 --- a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/models/MQListenerConfig.java +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/models/MQListenerConfig.java @@ -22,5 +22,14 @@ public class MQListenerConfig { @Builder.Default private final int maxRetries = -1; //NOSONAR @Builder.Default - private final MQQueueManagerSetter qmSetter = (ctx, queue) -> {}; //NOSONAR + private final MQQueueManagerSetter qmSetter = (ctx, queue) -> { + }; //NOSONAR + + @Builder.Default + SelectorMode selectorMode = SelectorMode.CONTEXT_SHARED; + + public enum SelectorMode { + CONTEXT_SHARED, + CONTEXT_PER_MESSAGE + } } diff --git a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSyncTest.java b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSyncTest.java index 720d17a..e39019a 100644 --- a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSyncTest.java +++ b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerSyncTest.java @@ -3,6 +3,8 @@ import co.com.bancolombia.commons.jms.api.MQMessageSelectorListenerSync; import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener; import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException; +import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextPerMessageStrategy; +import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider; import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig; import co.com.bancolombia.commons.jms.internal.models.RetryableConfig; import org.junit.jupiter.api.BeforeEach; @@ -38,26 +40,44 @@ class MQMultiContextMessageSelectorListenerSyncTest { private MQMessageSelectorListenerSync listenerSync; + private final MQListenerConfig config = MQListenerConfig + .builder() + .concurrency(1) + .queue("QUEUE") + .build(); + private final RetryableConfig retryableConfig = RetryableConfig + .builder() + .maxRetries(10) + .initialRetryIntervalMillis(1000) + .multiplier(1.5) + .build(); + @BeforeEach void setup() { when(connectionFactory.createContext()).thenReturn(context); when(context.createQueue(anyString())).thenReturn(queue); - MQListenerConfig config = MQListenerConfig - .builder() - .concurrency(1) - .queue("QUEUE") - .build(); - RetryableConfig retryableConfig = RetryableConfig - .builder() - .maxRetries(10) - .initialRetryIntervalMillis(1000) - .multiplier(1.5) - .build(); - listenerSync = new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, retryableConfig); + listenerSync = new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, + retryableConfig, SelectorModeProvider.defaultSelector()); } + @Test + void shouldGetMessageWithContextPerMessage() { + listenerSync = new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, + retryableConfig, (factory, ignored) -> new ContextPerMessageStrategy(factory)); + // Arrange + String messageID = UUID.randomUUID().toString(); + when(context.createConsumer(any(Destination.class), anyString())).thenReturn(consumer); + when(consumer.receive(DEFAULT_TIMEOUT)).thenReturn(message); + // Act + Message receivedMessage = listenerSync.getMessage(messageID); + // Assert + assertEquals(message, receivedMessage); + verify(consumer, times(1)).receive(DEFAULT_TIMEOUT); + } @Test void shouldGetMessage() { + listenerSync = new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, + retryableConfig, SelectorModeProvider.defaultSelector()); // Arrange String messageID = UUID.randomUUID().toString(); when(context.createConsumer(any(Destination.class), anyString())).thenReturn(consumer); diff --git a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerTest.java b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerTest.java index 00d47a5..cfc2097 100644 --- a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerTest.java +++ b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerTest.java @@ -4,6 +4,7 @@ import co.com.bancolombia.commons.jms.api.MQMessageSelectorListenerSync; import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener; import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException; +import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider; import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig; import co.com.bancolombia.commons.jms.internal.models.RetryableConfig; import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter; @@ -66,7 +67,8 @@ void setup() { .multiplier(1.5) .build(); MQMessageSelectorListenerSync listenerSync = - new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, retryableConfig); + new MQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, + retryableConfig, SelectorModeProvider.defaultSelector()); listener = new MQMultiContextMessageSelectorListener(listenerSync, Executors.newCachedThreadPool(), new ReactiveReplyRouter<>()); } diff --git a/examples/mq-reactive-selector/src/main/resources/application.yaml b/examples/mq-reactive-selector/src/main/resources/application.yaml index de39747..8e68c8a 100644 --- a/examples/mq-reactive-selector/src/main/resources/application.yaml +++ b/examples/mq-reactive-selector/src/main/resources/application.yaml @@ -16,6 +16,7 @@ commons: input-queue: "DEV.QUEUE.2" input-queue-alias: "" input-queue-set-queue-manager: true # enable it to set queue manager using a temporary queue + selector-mode: CONTEXT_PER_MESSAGE # CONTEXT_SHARED | CONTEXT_PER_MESSAGE ibm: mq: channel: "DEV.APP.SVRCONN"