diff --git a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfiguration.java b/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfiguration.java index 015e5d3..98f621f 100644 --- a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfiguration.java +++ b/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfiguration.java @@ -11,7 +11,7 @@ import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig; import co.com.bancolombia.commons.jms.internal.models.RetryableConfig; import co.com.bancolombia.commons.jms.mq.config.health.MQListenerHealthIndicator; -import co.com.bancolombia.commons.jms.mq.listeners.MQExecutorService; +import co.com.bancolombia.commons.jms.internal.listener.selector.MQExecutorService; import co.com.bancolombia.commons.jms.mq.utils.MQUtils; import co.com.bancolombia.commons.jms.utils.MQQueueUtils; import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp; diff --git a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQSpringResolver.java b/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQSpringResolver.java index ad926ed..e2bc853 100644 --- a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQSpringResolver.java +++ b/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQSpringResolver.java @@ -7,7 +7,7 @@ import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorBuilder; import co.com.bancolombia.commons.jms.internal.models.RetryableConfig; import co.com.bancolombia.commons.jms.mq.config.senders.MQSenderContainer; -import co.com.bancolombia.commons.jms.mq.listeners.MQExecutorService; +import co.com.bancolombia.commons.jms.internal.listener.selector.MQExecutorService; import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter; import jakarta.jms.ConnectionFactory; import jakarta.jms.Message; diff --git a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/factory/MQReqReplyFactory.java b/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/factory/MQReqReplyFactory.java index 3d5f429..a2e7e3e 100644 --- a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/factory/MQReqReplyFactory.java +++ b/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/factory/MQReqReplyFactory.java @@ -8,6 +8,7 @@ import co.com.bancolombia.commons.jms.api.MQQueuesContainer; import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener; import co.com.bancolombia.commons.jms.api.model.JmsMessage; +import co.com.bancolombia.commons.jms.internal.listener.selector.MQExecutorService; import co.com.bancolombia.commons.jms.internal.listener.selector.MQMultiContextMessageSelectorListener; import co.com.bancolombia.commons.jms.internal.listener.selector.MQMultiContextMessageSelectorListenerSync; import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextPerMessageStrategy; @@ -163,8 +164,9 @@ private static MQRequestReplySelector fixedQueueWithMessageSelector(ReqReply ann selectorModeProvider, queuesContainer); SelectorBuilder selector = resolver.getSelectorBuilder(); + MQExecutorService executorService = resolver.getMqExecutorService(); MQMessageSelectorListener reactiveSelectorListener = new MQMultiContextMessageSelectorListener( - selectorListener); + selectorListener, executorService); return new MQRequestReplySelector( sender, queuesContainer, diff --git a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfiguracionTest.java b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfiguracionTest.java index 6b8e7a9..fe939f9 100644 --- a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfiguracionTest.java +++ b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfiguracionTest.java @@ -9,7 +9,7 @@ import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener; import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorBuilder; import co.com.bancolombia.commons.jms.internal.models.RetryableConfig; -import co.com.bancolombia.commons.jms.mq.listeners.MQExecutorService; +import co.com.bancolombia.commons.jms.internal.listener.selector.MQExecutorService; import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter; import com.ibm.mq.jakarta.jms.MQQueue; import jakarta.jms.JMSContext; diff --git a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQSpringResolverTest.java b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQSpringResolverTest.java index a7c575d..271f9a9 100644 --- a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQSpringResolverTest.java +++ b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/MQSpringResolverTest.java @@ -8,7 +8,7 @@ import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorBuilder; import co.com.bancolombia.commons.jms.internal.models.RetryableConfig; import co.com.bancolombia.commons.jms.mq.config.senders.MQSenderContainer; -import co.com.bancolombia.commons.jms.mq.listeners.MQExecutorService; +import co.com.bancolombia.commons.jms.internal.listener.selector.MQExecutorService; import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp; import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter; import jakarta.jms.ConnectionFactory; diff --git a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/utils/MQFactoryBeanTest.java b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/utils/MQFactoryBeanTest.java index 48c8c30..7925721 100644 --- a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/utils/MQFactoryBeanTest.java +++ b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/config/utils/MQFactoryBeanTest.java @@ -13,7 +13,7 @@ import co.com.bancolombia.commons.jms.mq.config.MQProperties; import co.com.bancolombia.commons.jms.mq.config.MQSpringResolver; import co.com.bancolombia.commons.jms.mq.config.senders.MQSenderContainer; -import co.com.bancolombia.commons.jms.mq.listeners.MQExecutorService; +import co.com.bancolombia.commons.jms.internal.listener.selector.MQExecutorService; import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp; import jakarta.jms.ConnectionFactory; import jakarta.jms.JMSConsumer; diff --git a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/listeners/MQExecutorService.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQExecutorService.java similarity index 86% rename from commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/listeners/MQExecutorService.java rename to commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQExecutorService.java index 4117f87..57ba9fe 100644 --- a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/listeners/MQExecutorService.java +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQExecutorService.java @@ -1,4 +1,4 @@ -package co.com.bancolombia.commons.jms.mq.listeners; +package co.com.bancolombia.commons.jms.internal.listener.selector; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListener.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListener.java index 19650d4..b95b5ed 100644 --- a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListener.java +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListener.java @@ -15,6 +15,7 @@ @AllArgsConstructor public class MQMultiContextMessageSelectorListener implements MQMessageSelectorListener { private final MQMessageSelectorListenerSync listenerSync; // MQMultiContextMessageSelectorListenerSync + private final MQExecutorService executorService; @Override public Mono getMessage(String correlationId) { @@ -48,6 +49,6 @@ public Mono getMessageBySelector(String selector, long timeout, Destina private Mono doAsync(Supplier supplier) { return Mono.fromSupplier(supplier) - .subscribeOn(Schedulers.boundedElastic()); + .subscribeOn(Schedulers.fromExecutor(executorService)); } } diff --git a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerTest.java b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerTest.java index 11efbdb..206c728 100644 --- a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerTest.java +++ b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/internal/listener/selector/MQMultiContextMessageSelectorListenerTest.java @@ -25,6 +25,8 @@ import reactor.test.StepVerifier; import java.util.UUID; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; import static co.com.bancolombia.commons.jms.internal.listener.selector.MQContextMessageSelectorListenerSync.DEFAULT_TIMEOUT; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -37,6 +39,8 @@ @ExtendWith(MockitoExtension.class) class MQMultiContextMessageSelectorListenerTest { + public static final int MAX_THREADS = 200; + public static final long KEEP_ALIVE_SECONDS = 5L; @Mock private ConnectionFactory connectionFactory; @Mock @@ -73,7 +77,10 @@ void setup() { MQMessageSelectorListenerSync listenerSync = new MQMultiContextMessageSelectorListenerSync(config, healthListener, retryableConfig, provider, container); - listener = new MQMultiContextMessageSelectorListener(listenerSync); + MQExecutorService executorService = + new MQExecutorService(0, MAX_THREADS, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, + new SynchronousQueue<>()); + listener = new MQMultiContextMessageSelectorListener(listenerSync, executorService); } @Test