Skip to content

Commit

Permalink
Improve selector listener (#32)
Browse files Browse the repository at this point in the history
* Reconnect on some runtime exceptions, use single thread for reconnections, us ThreadPool to listen getBySelector on reactive

* enable same thread autoreconnect and retry for getMessageBySelector
  • Loading branch information
juancgalvis authored Oct 27, 2023
1 parent f392e48 commit 8bf7166
Show file tree
Hide file tree
Showing 26 changed files with 594 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,72 @@
import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQMultiContextMessageSelectorListener;
import co.com.bancolombia.commons.jms.internal.listener.selector.MQMultiContextMessageSelectorListenerSync;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextPerMessageStrategy;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextSharedStrategy;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.internal.models.RetryableConfig;
import co.com.bancolombia.commons.jms.mq.config.exceptions.MQInvalidListenerException;
import co.com.bancolombia.commons.jms.mq.utils.MQUtils;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Log4j2
public class MQAutoconfigurationSelectorListener {
public static final int MAX_THREADS = 200;
public static final long KEEP_ALIVE_SECONDS = 5L;

@Bean
@ConditionalOnMissingBean(ExecutorService.class)
@Qualifier("selectorExecutorService")
@ConditionalOnProperty(prefix = "commons.jms", name = "reactive", havingValue = "true")
public ExecutorService defaultBySelectorExecutorService() {
return new ThreadPoolExecutor(0, MAX_THREADS, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new SynchronousQueue<>());
}

@Bean
@ConditionalOnMissingBean(SelectorModeProvider.class)
public SelectorModeProvider defaultSelectorModeProvider(@Value("${commons.jms.selector-mode:DEFAULT}") String mode) {
if (MQListenerConfig.SelectorMode.CONTEXT_PER_MESSAGE.name().equals(mode)) {
return (factory, context) -> new ContextPerMessageStrategy(factory);
}
return SelectorModeProvider.defaultSelector();
}

@Bean
@ConditionalOnMissingBean(ReactiveReplyRouter.class)
public ReactiveReplyRouter<Message> selectorReactiveReplyRouter() {
return new ReactiveReplyRouter<>();
}

@Bean
@ConditionalOnMissingBean(MQMessageSelectorListener.class)
@ConditionalOnProperty(prefix = "commons.jms", name = "reactive", havingValue = "true")
public MQMessageSelectorListener defaultMQMessageSelectorListener(
MQMultiContextMessageSelectorListenerSync senderSync) {
return new MQMultiContextMessageSelectorListener(senderSync);
MQMultiContextMessageSelectorListenerSync senderSync,
@Qualifier("selectorExecutorService") ExecutorService bySelectorExecutorService,
ReactiveReplyRouter<Message> router) {
return new MQMultiContextMessageSelectorListener(senderSync, bySelectorExecutorService, router);
}

@Bean
@ConditionalOnMissingBean(MQMultiContextMessageSelectorListenerSync.class)
public MQMultiContextMessageSelectorListenerSync defaultMQMultiContextMessageSelectorListenerSync(
ConnectionFactory cf, @Qualifier("messageSelectorListenerConfig") MQListenerConfig config,
MQHealthListener healthListener, MQProperties properties) {
MQHealthListener healthListener, MQProperties properties, SelectorModeProvider selectorModeProvider) {
if (config.getConcurrency() < 1) {
throw new MQInvalidListenerException("Invalid property commons.jms.input-concurrency, minimum value 1, " +
"you have passed " + config.getConcurrency());
Expand All @@ -46,7 +85,7 @@ public MQMultiContextMessageSelectorListenerSync defaultMQMultiContextMessageSel
.initialRetryIntervalMillis(properties.getInitialRetryIntervalMillis())
.multiplier(properties.getRetryMultiplier())
.build();
return new MQMultiContextMessageSelectorListenerSync(cf, config, healthListener, retryableConfig);
return new MQMultiContextMessageSelectorListenerSync(cf, config, healthListener, retryableConfig, selectorModeProvider);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import co.com.bancolombia.commons.jms.api.MQQueueManagerSetter;
import co.com.bancolombia.commons.jms.api.MQQueuesContainer;
import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextPerMessageStrategy;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.mq.config.exceptions.MQInvalidListenerException;
import co.com.bancolombia.commons.jms.mq.helper.JmsContextImpl;
Expand Down Expand Up @@ -65,10 +67,12 @@ void shouldHandleError() {
MQProperties properties = new MQProperties();
properties.setMaxRetries(10);
properties.setInitialRetryIntervalMillis(1000);
SelectorModeProvider provider = SelectorModeProvider.defaultSelector();
// Act
// Assert
assertThrows(MQInvalidListenerException.class,
() -> configurator.defaultMQMultiContextMessageSelectorListenerSync(null, config, healthListener, properties));
() -> configurator.defaultMQMultiContextMessageSelectorListenerSync(null, config, healthListener,
properties, provider));
}

@Test
Expand All @@ -85,7 +89,8 @@ void shouldCreateDefaultMessageSelectorListener() {
when(context.createQueue(anyString())).thenReturn(queue);
// Act
MQMessageSelectorListenerSync listener = configurator.
defaultMQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener, properties);
defaultMQMultiContextMessageSelectorListenerSync(connectionFactory, config, healthListener,
properties, SelectorModeProvider.defaultSelector());
// Assert
assertNotNull(listener);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package co.com.bancolombia.commons.jms.internal.listener;

import lombok.AllArgsConstructor;
import lombok.Builder;

import jakarta.jms.JMSException;
import jakarta.jms.JMSRuntimeException;
import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import jakarta.jms.TemporaryQueue;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.extern.log4j.Log4j2;

@Log4j2
@Builder
@AllArgsConstructor
public class MQConnectionListener implements Runnable {
Expand All @@ -23,7 +24,11 @@ public void run() {
Thread.currentThread().setName("mq-listener-temporary-queue-" + sequence + "[" + shortDestinationName() + "]");
session.createConsumer(destination)//NOSONAR
.setMessageListener(listener);
} catch (JMSRuntimeException e) {
log.warn("JMSRuntimeException in MQConnectionListener", e);
throw e;
} catch (JMSException ex) {
log.warn("JMSException in MQConnectionListener", ex);
throw new JMSRuntimeException(ex.getMessage(), ex.getErrorCode(), ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.internal.reconnect.AbstractJMSReconnectable;
import co.com.bancolombia.commons.jms.utils.MQQueueUtils;
import lombok.experimental.SuperBuilder;
import lombok.extern.log4j.Log4j2;

import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.MessageListener;
import jakarta.jms.Queue;
import lombok.experimental.SuperBuilder;
import lombok.extern.log4j.Log4j2;

@Log4j2
@SuperBuilder
Expand All @@ -23,6 +23,8 @@ public class MQContextListener extends AbstractJMSReconnectable<MQContextListene
private final MQListenerConfig config;
private final MQQueuesContainer container;
private final MQBrokerUtils utils;
private JMSConsumer consumer;
private JMSContext context;

@Override
protected String name() {
Expand All @@ -32,12 +34,22 @@ protected String name() {
return finalName;
}

@Override
protected void disconnect() throws JMSException {
if (consumer != null) {
consumer.close();
}
if (context != null) {
context.close();
}
}

@Override
protected MQContextListener connect() {
log.info("Starting listener {}", getProcess());
JMSContext context = connectionFactory.createContext();
context = connectionFactory.createContext();
Destination destination = MQQueueUtils.setupFixedQueue(context, config);
JMSConsumer consumer = context.createConsumer(destination);//NOSONAR
consumer = context.createConsumer(destination);//NOSONAR
container.registerQueue(config.getQueue(), (Queue) destination);
utils.setQueueManager(context, (Queue) destination);
consumer.setMessageListener(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.internal.reconnect.AbstractJMSReconnectable;
import co.com.bancolombia.commons.jms.utils.MQQueueUtils;
import lombok.experimental.SuperBuilder;
import lombok.extern.log4j.Log4j2;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.JMSRuntimeException;
import jakarta.jms.MessageListener;
import jakarta.jms.TemporaryQueue;
import lombok.experimental.SuperBuilder;
import lombok.extern.log4j.Log4j2;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -24,21 +24,30 @@ public class MQMultiConnectionListener extends AbstractJMSReconnectable<MQMultiC
private final MQQueuesContainer container;
private final MQListenerConfig config;
private ExecutorService service;
private Connection connection;

@Override
protected String name() {
return "mq-lister-temporary-queue-[" + config.getTempQueueAlias() + "]";
}


@Override
@SuppressWarnings("resource")
protected MQMultiConnectionListener connect() {
log.info("Starting listener {}", getProcess());
protected void disconnect() throws JMSException {
if (service != null && !service.isTerminated() && !service.isShutdown()) {
service.shutdown();
}
if (connection != null) {
connection.close();
}
}

@Override
@SuppressWarnings("resource")
protected MQMultiConnectionListener connect() {
log.info("Starting listener {}", getProcess());
try {
Connection connection = connectionFactory.createConnection();//NOSONAR
connection = connectionFactory.createConnection();//NOSONAR
connection.setExceptionListener(this);
TemporaryQueue destination = MQQueueUtils.setupTemporaryQueue(connection.createSession(), config);
container.registerQueue(config.getTempQueueAlias(), destination);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package co.com.bancolombia.commons.jms.internal.listener.selector;

import co.com.bancolombia.commons.jms.api.MQMessageSelectorListenerSync;
import co.com.bancolombia.commons.jms.api.exceptions.ReceiveTimeoutException;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.ContextSharedStrategy;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorModeProvider;
import co.com.bancolombia.commons.jms.internal.listener.selector.strategy.SelectorStrategy;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.internal.reconnect.AbstractJMSReconnectable;
import co.com.bancolombia.commons.jms.utils.MQQueueUtils;
import lombok.experimental.SuperBuilder;

import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.JMSRuntimeException;
import jakarta.jms.Message;
import lombok.experimental.SuperBuilder;
import lombok.extern.log4j.Log4j2;

@Log4j2
Expand All @@ -20,22 +22,37 @@ public class MQContextMessageSelectorListenerSync extends AbstractJMSReconnectab
public static final long DEFAULT_TIMEOUT = 5000L;
private final ConnectionFactory connectionFactory;
private final MQListenerConfig config;
private final SelectorModeProvider selectorModeProvider;
private SelectorStrategy strategy;
private Destination destination;
private JMSContext context;

@Override
protected String name() {
String[] parts = this.toString().split("\\.");
return parts[parts.length - 1];
}

@Override
protected void disconnect() throws JMSException {
// do not disconnect to avoid another thread exceptions
}

@Override
protected MQContextMessageSelectorListenerSync connect() {
log.info("Starting listener {}", getProcess());
context = connectionFactory.createContext();
context.setExceptionListener(this);
destination = MQQueueUtils.setupFixedQueue(context, config);
log.info("Listener {} started successfully", getProcess());
long handled = System.currentTimeMillis();
synchronized (this) {
if (handled > lastSuccess.get()) {
log.info("Starting listener {}", getProcess());
JMSContext context = connectionFactory.createContext();
context.setExceptionListener(this);
destination = MQQueueUtils.setupFixedQueue(context, config);
strategy = selectorModeProvider.get(connectionFactory, context);
log.info("Listener {} started successfully", getProcess());
lastSuccess.set(System.currentTimeMillis());
} else {
log.warn("Reconnection ignored because already connected");
}
}
return this;
}

Expand Down Expand Up @@ -63,12 +80,25 @@ public Message getMessageBySelector(String selector, long timeout) {
}

public Message getMessageBySelector(String selector, long timeout, Destination destination) {
try (JMSConsumer consumer = context.createConsumer(destination, selector)) {
Message message = consumer.receive(timeout);
if (message == null) {
throw new ReceiveTimeoutException("Message not received in " + timeout);
return getMessageBySelector(selector, timeout, destination, true);
}

protected Message getMessageBySelector(String selector, long timeout, Destination destination, boolean retry) {
try {
return strategy.getMessageBySelector(selector, timeout, destination);
} catch (JMSRuntimeException e) {
// Connection is broken
if (strategy instanceof ContextSharedStrategy && e.getCause() != null && e.getCause().getMessage() != null
&& e.getCause().getMessage().contains("CONNECTION_BROKEN")) {
connect();
}
if (retry) {
log.warn("Retrying because: {}", e.getMessage());
return getMessageBySelector(selector, timeout, destination, false);
} else {
log.warn("Retry has failed with {}, this will rethrow", e.getMessage());
throw e;
}
return message;
}
}

Expand Down
Loading

0 comments on commit 8bf7166

Please sign in to comment.