Skip to content

Commit

Permalink
fix(performance): Return to specific scheduler used in v1.3.0 for mes…
Browse files Browse the repository at this point in the history
…sage selector (#61)
  • Loading branch information
juancgalvis authored Jul 4, 2024
1 parent ec9e124 commit 0b3d56d
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.internal.models.RetryableConfig;
import co.com.bancolombia.commons.jms.mq.config.health.MQListenerHealthIndicator;
import co.com.bancolombia.commons.jms.mq.listeners.MQExecutorService;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQExecutorService;
import co.com.bancolombia.commons.jms.mq.utils.MQUtils;
import co.com.bancolombia.commons.jms.utils.MQQueueUtils;
import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorBuilder;
import co.com.bancolombia.commons.jms.internal.models.RetryableConfig;
import co.com.bancolombia.commons.jms.mq.config.senders.MQSenderContainer;
import co.com.bancolombia.commons.jms.mq.listeners.MQExecutorService;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQExecutorService;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import co.com.bancolombia.commons.jms.api.MQQueuesContainer;
import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener;
import co.com.bancolombia.commons.jms.api.model.JmsMessage;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQExecutorService;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQMultiContextMessageSelectorListener;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQMultiContextMessageSelectorListenerSync;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextPerMessageStrategy;
Expand Down Expand Up @@ -163,8 +164,9 @@ private static MQRequestReplySelector fixedQueueWithMessageSelector(ReqReply ann
selectorModeProvider,
queuesContainer);
SelectorBuilder selector = resolver.getSelectorBuilder();
MQExecutorService executorService = resolver.getMqExecutorService();
MQMessageSelectorListener reactiveSelectorListener = new MQMultiContextMessageSelectorListener(
selectorListener);
selectorListener, executorService);
return new MQRequestReplySelector(
sender,
queuesContainer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorBuilder;
import co.com.bancolombia.commons.jms.internal.models.RetryableConfig;
import co.com.bancolombia.commons.jms.mq.listeners.MQExecutorService;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQExecutorService;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import com.ibm.mq.jakarta.jms.MQQueue;
import jakarta.jms.JMSContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorBuilder;
import co.com.bancolombia.commons.jms.internal.models.RetryableConfig;
import co.com.bancolombia.commons.jms.mq.config.senders.MQSenderContainer;
import co.com.bancolombia.commons.jms.mq.listeners.MQExecutorService;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQExecutorService;
import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import jakarta.jms.ConnectionFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import co.com.bancolombia.commons.jms.mq.config.MQProperties;
import co.com.bancolombia.commons.jms.mq.config.MQSpringResolver;
import co.com.bancolombia.commons.jms.mq.config.senders.MQSenderContainer;
import co.com.bancolombia.commons.jms.mq.listeners.MQExecutorService;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQExecutorService;
import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSConsumer;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package co.com.bancolombia.commons.jms.mq.listeners;
package co.com.bancolombia.commons.jms.internal.listener.selector;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
@AllArgsConstructor
public class MQMultiContextMessageSelectorListener implements MQMessageSelectorListener {
private final MQMessageSelectorListenerSync listenerSync; // MQMultiContextMessageSelectorListenerSync
private final MQExecutorService executorService;

@Override
public Mono<Message> getMessage(String correlationId) {
Expand Down Expand Up @@ -48,6 +49,6 @@ public Mono<Message> getMessageBySelector(String selector, long timeout, Destina

private Mono<Message> doAsync(Supplier<Message> supplier) {
return Mono.fromSupplier(supplier)
.subscribeOn(Schedulers.boundedElastic());
.subscribeOn(Schedulers.fromExecutor(executorService));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import reactor.test.StepVerifier;

import java.util.UUID;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import static co.com.bancolombia.commons.jms.internal.listener.selector.MQContextMessageSelectorListenerSync.DEFAULT_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -37,6 +39,8 @@

@ExtendWith(MockitoExtension.class)
class MQMultiContextMessageSelectorListenerTest {
public static final int MAX_THREADS = 200;
public static final long KEEP_ALIVE_SECONDS = 5L;
@Mock
private ConnectionFactory connectionFactory;
@Mock
Expand Down Expand Up @@ -73,7 +77,10 @@ void setup() {
MQMessageSelectorListenerSync listenerSync =
new MQMultiContextMessageSelectorListenerSync(config, healthListener,
retryableConfig, provider, container);
listener = new MQMultiContextMessageSelectorListener(listenerSync);
MQExecutorService executorService =
new MQExecutorService(0, MAX_THREADS, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new SynchronousQueue<>());
listener = new MQMultiContextMessageSelectorListener(listenerSync, executorService);
}

@Test
Expand Down

0 comments on commit 0b3d56d

Please sign in to comment.