Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(multi-context-selector): Enable multi context selector listener … #39

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
* @return selector mode: CONTEXT_SHARED | CONTEXT_PER_MESSAGE
* default CONTEXT_SHARED
*/
MQListenerConfig.SelectorMode selectorMode() default MQListenerConfig.SelectorMode.CONTEXT_SHARED;
String selectorMode() default "CONTEXT_SHARED";


}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import co.com.bancolombia.commons.jms.internal.listener.selector.MQMultiContextMessageSelectorListener;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQMultiContextMessageSelectorListenerSync;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextPerMessageStrategy;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.MultiContextSharedStrategy;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorBuilder;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
Expand Down Expand Up @@ -61,21 +62,18 @@ public static Object createMQReqReply(ReqReply annotation, MQSpringResolver reso
RetryableConfig retryableConfig = resolver.getRetryableConfig();
Destination destination = resolveDestination(annotation, resolver, properties);
if (listenerConfig.getQueueType() == MQListenerConfig.QueueType.FIXED) {
SelectorModeProvider selectorModeProvider = getSelectorModeProvider(annotation.selectorMode());
SelectorModeProvider selectorModeProvider = getSelectorModeProvider(resolver, annotation.selectorMode(),
listenerConfig.getConcurrency());
MQMultiContextMessageSelectorListenerSync selectorListener = new MQMultiContextMessageSelectorListenerSync(
listenerConfig,
healthListener,
retryableConfig,
selectorModeProvider,
queuesContainer);
if (properties.isReactive()) {
ReactiveReplyRouter<Message> router = resolver.resolveReplier();
MQExecutorService executorService = resolver.getMqExecutorService();
SelectorBuilder selector = resolver.getSelectorBuilder();
MQMessageSelectorListener reactiveSelectorListener = new MQMultiContextMessageSelectorListener(
selectorListener,
executorService,
router);
selectorListener);
return new MQRequestReplySelector(
sender,
queuesContainer,
Expand Down Expand Up @@ -137,7 +135,7 @@ private static MQListenerConfig validateAnnotationConfig(ReqReply annotation, MQ
String replyQueue = resolver.resolveString(annotation.replyQueue());
String queueCustomizerName = annotation.queueCustomizer();
MQListenerConfig.QueueType queueType = annotation.queueType();
MQListenerConfig.SelectorMode selectorMode = annotation.selectorMode();
String selectorMode = annotation.selectorMode();

// Resolve dynamic values
int finalConcurrency = resolveConcurrency(Integer.parseInt(concurrencyStr), properties.getInputConcurrency());
Expand Down Expand Up @@ -168,11 +166,17 @@ private static MQListenerConfig validateAnnotationConfig(ReqReply annotation, MQ
}

// Selector
private static SelectorModeProvider getSelectorModeProvider(MQListenerConfig.SelectorMode mode) {
if (MQListenerConfig.SelectorMode.CONTEXT_PER_MESSAGE == mode) {
private static SelectorModeProvider getSelectorModeProvider(MQSpringResolver resolver, String mode, int concurrency) {
if (MQListenerConfig.SelectorMode.CONTEXT_PER_MESSAGE.name().equals(mode)) {
return (factory, context) -> new ContextPerMessageStrategy(factory);
}
return SelectorModeProvider.defaultSelector();
if (MQListenerConfig.SelectorMode.MULTI_CONTEXT_SHARED.name().equals(mode)) {
return (factory, context) -> new MultiContextSharedStrategy(factory, concurrency);
}
if (MQListenerConfig.SelectorMode.CONTEXT_SHARED.name().equals(mode)) {
return SelectorModeProvider.defaultSelector();
}
return resolver.resolveBean(mode, SelectorModeProvider.class);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -2,73 +2,52 @@

import co.com.bancolombia.commons.jms.api.MQMessageSelectorListener;
import co.com.bancolombia.commons.jms.api.MQMessageSelectorListenerSync;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

@Log4j2
@AllArgsConstructor
public class MQMultiContextMessageSelectorListener implements MQMessageSelectorListener {
private final MQMessageSelectorListenerSync listenerSync; // MQMultiContextMessageSelectorListenerSync
private final ExecutorService executorService;
private final ReactiveReplyRouter<Message> router;

@Override
public Mono<Message> getMessage(String correlationId) {
return router.wait(correlationId)
.doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(correlationId,
() -> listenerSync.getMessage(correlationId))));
return doAsync(() -> listenerSync.getMessage(correlationId));
}

@Override
public Mono<Message> getMessage(String correlationId, long timeout) {
return router.wait(correlationId, Duration.ofMillis(timeout))
.doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(correlationId,
() -> listenerSync.getMessage(correlationId, timeout))));
return doAsync(() -> listenerSync.getMessage(correlationId, timeout));
}

@Override
public Mono<Message> getMessage(String correlationId, long timeout, Destination destination) {
return router.wait(correlationId, Duration.ofMillis(timeout))
.doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(correlationId,
() -> listenerSync.getMessage(correlationId, timeout, destination))));
return doAsync(() -> listenerSync.getMessage(correlationId, timeout, destination));
}

@Override
public Mono<Message> getMessageBySelector(String selector) {
return router.wait(selector)
.doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(selector,
() -> listenerSync.getMessageBySelector(selector))));
return doAsync(() -> listenerSync.getMessageBySelector(selector));
}

@Override
public Mono<Message> getMessageBySelector(String selector, long timeout) {
return router.wait(selector, Duration.ofMillis(timeout))
.doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(selector,
() -> listenerSync.getMessageBySelector(selector, timeout))));
return doAsync(() -> listenerSync.getMessageBySelector(selector, timeout));
}

@Override
public Mono<Message> getMessageBySelector(String selector, long timeout, Destination destination) {
return router.wait(selector, Duration.ofMillis(timeout))
.doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(selector,
() -> listenerSync.getMessageBySelector(selector, timeout, destination))));
return doAsync(() -> listenerSync.getMessageBySelector(selector, timeout, destination));
}

private void realGetMessageBySelector(String selector, Supplier<Message> supplier) {
try {
Message message = supplier.get();
router.reply(selector, message);
} catch (Exception e) {
log.warn("Error getting message with selector: {}", selector, e);
router.error(selector, e);
}
private Mono<Message> doAsync(Supplier<Message> supplier) {
return Mono.fromSupplier(supplier)
.subscribeOn(Schedulers.boundedElastic());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package co.com.bancolombia.commons.jms.internal.listener.selector.strategy;

import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Log4j2
@AllArgsConstructor
public class MultiContextSharedStrategy implements SelectorStrategy {
private final List<ContextSharedStrategy> contexts;
private final int concurrency;

public MultiContextSharedStrategy(ConnectionFactory factory, int concurrency) {
this.concurrency = concurrency;
this.contexts = IntStream.range(0, concurrency)
.mapToObj(idx -> new ContextSharedStrategy(factory.createContext()))
.collect(Collectors.toList());
}

@Override
public Message getMessageBySelector(String selector, long timeout, Destination destination) {
return getRandom().getMessageBySelector(selector, timeout, destination);
}

protected ContextSharedStrategy getRandom() {
int selectIndex = (int) (System.currentTimeMillis() % concurrency);
return contexts.get(selectIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ public class MQListenerConfig {
private final MQQueueManagerSetter qmSetter = (ctx, queueName) -> {
}; //NOSONAR
@Builder.Default
private final SelectorMode selectorMode = SelectorMode.CONTEXT_SHARED;
private final String selectorMode = SelectorMode.CONTEXT_SHARED.name();

public enum SelectorMode {
CONTEXT_SHARED,
MULTI_CONTEXT_SHARED,
CONTEXT_PER_MESSAGE
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import co.com.bancolombia.commons.jms.api.MQQueuesContainer;
import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener;
import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.MultiContextSharedStrategy;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.internal.models.RetryableConfig;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSConsumer;
Expand All @@ -25,7 +25,6 @@
import reactor.test.StepVerifier;

import java.util.UUID;
import java.util.concurrent.Executors;

import static co.com.bancolombia.commons.jms.internal.listener.selector.MQContextMessageSelectorListenerSync.DEFAULT_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -70,11 +69,11 @@ void setup() {
.initialRetryIntervalMillis(1000)
.multiplier(1.5)
.build();
SelectorModeProvider provider = (cf, context) -> new MultiContextSharedStrategy(cf, 2);
MQMessageSelectorListenerSync listenerSync =
new MQMultiContextMessageSelectorListenerSync(config, healthListener,
retryableConfig, SelectorModeProvider.defaultSelector(), container);
listener = new MQMultiContextMessageSelectorListener(listenerSync, Executors.newCachedThreadPool(),
new ReactiveReplyRouter<>());
retryableConfig, provider, container);
listener = new MQMultiContextMessageSelectorListener(listenerSync);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
Expand Down Expand Up @@ -42,11 +43,11 @@ public MQProducerCustomizer producerCustomizer() {
@Bean
@Primary
@SneakyThrows
public ConnectionFactory cachingJmsConnectionFactory(MQConfigurationProperties properties, ObjectProvider<List<MQConnectionFactoryCustomizer>> factoryCustomizers, JmsProperties jmsProperties) {
public ConnectionFactory cachingJmsConnectionFactory(MQConfigurationProperties properties, ObjectProvider<SslBundles> sslBundles, ObjectProvider<List<MQConnectionFactoryCustomizer>> factoryCustomizers, JmsProperties jmsProperties) {
JmsProperties.Cache cacheProperties = jmsProperties.getCache();
properties.setQueueManager("QM1");
properties.setConnName("localhost(1414)");
MQConnectionFactory wrappedConnectionFactory = createConnectionFactory(properties, factoryCustomizers);
MQConnectionFactory wrappedConnectionFactory = createConnectionFactory(properties, sslBundles, factoryCustomizers);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(wrappedConnectionFactory);
connectionFactory.setCacheConsumers(cacheProperties.isConsumers());
connectionFactory.setCacheProducers(cacheProperties.isProducers());
Expand All @@ -56,19 +57,19 @@ public ConnectionFactory cachingJmsConnectionFactory(MQConfigurationProperties p

@Bean
@SneakyThrows
public ConnectionFactory domainB(MQConfigurationProperties properties, ObjectProvider<List<MQConnectionFactoryCustomizer>> factoryCustomizers, JmsProperties jmsProperties) {
public ConnectionFactory domainB(MQConfigurationProperties properties, ObjectProvider<SslBundles> sslBundles, ObjectProvider<List<MQConnectionFactoryCustomizer>> factoryCustomizers, JmsProperties jmsProperties) {
JmsProperties.Cache cacheProperties = jmsProperties.getCache();
properties.setQueueManager("QM2");
properties.setConnName("localhost(1415)");
MQConnectionFactory wrappedConnectionFactory = createConnectionFactory(properties, factoryCustomizers);
MQConnectionFactory wrappedConnectionFactory = createConnectionFactory(properties, sslBundles, factoryCustomizers);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(wrappedConnectionFactory);
connectionFactory.setCacheConsumers(cacheProperties.isConsumers());
connectionFactory.setCacheProducers(cacheProperties.isProducers());
connectionFactory.setSessionCacheSize(cacheProperties.getSessionCacheSize());
return connectionFactory;
}

private static MQConnectionFactory createConnectionFactory(MQConfigurationProperties properties, ObjectProvider<List<MQConnectionFactoryCustomizer>> factoryCustomizers) {
return (new MQConnectionFactoryFactory(properties, factoryCustomizers.getIfAvailable())).createConnectionFactory(MQConnectionFactory.class);
private static MQConnectionFactory createConnectionFactory(MQConfigurationProperties properties, ObjectProvider<SslBundles> sslBundles, ObjectProvider<List<MQConnectionFactoryCustomizer>> factoryCustomizers) {
return (new MQConnectionFactoryFactory(properties, (SslBundles) sslBundles.getIfAvailable(), (List) factoryCustomizers.getIfAvailable())).createConnectionFactory(MQConnectionFactory.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
Expand Down Expand Up @@ -52,11 +53,11 @@ public ReactiveReplyRouter<Result> resultReactiveReplyRouter() {
@Bean
@Primary
@SneakyThrows
public ConnectionFactory cachingJmsConnectionFactory(MQConfigurationProperties properties, ObjectProvider<List<MQConnectionFactoryCustomizer>> factoryCustomizers, JmsProperties jmsProperties) {
public ConnectionFactory cachingJmsConnectionFactory(MQConfigurationProperties properties, ObjectProvider<SslBundles> sslBundles, ObjectProvider<List<MQConnectionFactoryCustomizer>> factoryCustomizers, JmsProperties jmsProperties) {
JmsProperties.Cache cacheProperties = jmsProperties.getCache();
properties.setQueueManager("QM1");
properties.setConnName("localhost(1414)");
MQConnectionFactory wrappedConnectionFactory = createConnectionFactory(properties, factoryCustomizers);
MQConnectionFactory wrappedConnectionFactory = createConnectionFactory(properties, sslBundles, factoryCustomizers);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(wrappedConnectionFactory);
connectionFactory.setCacheConsumers(cacheProperties.isConsumers());
connectionFactory.setCacheProducers(cacheProperties.isProducers());
Expand All @@ -66,20 +67,20 @@ public ConnectionFactory cachingJmsConnectionFactory(MQConfigurationProperties p

@Bean
@SneakyThrows
public ConnectionFactory domainB(MQConfigurationProperties properties, ObjectProvider<List<MQConnectionFactoryCustomizer>> factoryCustomizers, JmsProperties jmsProperties) {
public ConnectionFactory domainB(MQConfigurationProperties properties, ObjectProvider<SslBundles> sslBundles, ObjectProvider<List<MQConnectionFactoryCustomizer>> factoryCustomizers, JmsProperties jmsProperties) {
JmsProperties.Cache cacheProperties = jmsProperties.getCache();
properties.setQueueManager("QM2");
properties.setConnName("localhost(1415)");
MQConnectionFactory wrappedConnectionFactory = createConnectionFactory(properties, factoryCustomizers);
MQConnectionFactory wrappedConnectionFactory = createConnectionFactory(properties, sslBundles, factoryCustomizers);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(wrappedConnectionFactory);
connectionFactory.setCacheConsumers(cacheProperties.isConsumers());
connectionFactory.setCacheProducers(cacheProperties.isProducers());
connectionFactory.setSessionCacheSize(cacheProperties.getSessionCacheSize());
return connectionFactory;
}

private static MQConnectionFactory createConnectionFactory(MQConfigurationProperties properties, ObjectProvider<List<MQConnectionFactoryCustomizer>> factoryCustomizers) {
return (new MQConnectionFactoryFactory(properties, factoryCustomizers.getIfAvailable())).createConnectionFactory(MQConnectionFactory.class);
private static MQConnectionFactory createConnectionFactory(MQConfigurationProperties properties, ObjectProvider<SslBundles> sslBundles, ObjectProvider<List<MQConnectionFactoryCustomizer>> factoryCustomizers) {
return (new MQConnectionFactoryFactory(properties, (SslBundles) sslBundles.getIfAvailable(), (List) factoryCustomizers.getIfAvailable())).createConnectionFactory(MQConnectionFactory.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@

import static co.com.bancolombia.commons.jms.internal.models.MQListenerConfig.QueueType.FIXED;

@ReqReply(requestQueue = "DEV.QUEUE.1", replyQueue = "DEV.QUEUE.2", queueType = FIXED, connectionFactory = "domainB")
@ReqReply(requestQueue = "DEV.QUEUE.1", replyQueue = "DEV.QUEUE.2", queueType = FIXED, selectorMode = "{}", connectionFactory = "domainB")
public interface MyRequestReply extends MQRequestReply {
}
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
version=2.0.2
version=2.1.0
springBootVersion=3.2.3
gradleVersionsVersion=0.47.0
owaspDependencyCheckVersion=9.0.9
mqJMSVersion=3.2.1
mqJMSVersion=3.2.3
jsmAPI=3.1.0
commonsLang=3.14.0
resilience4j=2.2.0
Expand Down
Loading
Loading