Skip to content

Commit

Permalink
create temp queue by thread, automatic one reconnecting retry when se…
Browse files Browse the repository at this point in the history
…nd a message
  • Loading branch information
juancgalvis committed Oct 30, 2023
1 parent 8bf7166 commit c1edd6b
Show file tree
Hide file tree
Showing 23 changed files with 301 additions and 304 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

public interface MQQueuesContainer {
void registerQueue(String key, Queue queue);
void registerToQueueGroup(String groupId, Queue queue);
void unregisterFromQueueGroup(String groupId, Queue queue);

Queue get(String alias);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ public class MQAutoconfiguration {
@ConditionalOnMissingBean(MQQueueCustomizer.class)
public MQQueueCustomizer defaultMQQueueCustomizer() {
return queue -> {
MQQueue customized = (MQQueue) queue;
customized.setProperty(WMQ_TARGET_CLIENT, "1");
customized.setProperty(WMQ_MQMD_READ_ENABLED, "true");
customized.setProperty(WMQ_MQMD_WRITE_ENABLED, "true");
customized.setPutAsyncAllowed(WMQ_PUT_ASYNC_ALLOWED_ENABLED);
customized.setReadAheadAllowed(WMQ_READ_AHEAD_ALLOWED_ENABLED);
if (queue instanceof MQQueue) {
MQQueue customized = (MQQueue) queue;
customized.setProperty(WMQ_TARGET_CLIENT, "1");
customized.setProperty(WMQ_MQMD_READ_ENABLED, "true");
customized.setProperty(WMQ_MQMD_WRITE_ENABLED, "true");
customized.setPutAsyncAllowed(WMQ_PUT_ASYNC_ALLOWED_ENABLED);
customized.setReadAheadAllowed(WMQ_READ_AHEAD_ALLOWED_ENABLED);
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import co.com.bancolombia.commons.jms.api.MQRequestReply;
import co.com.bancolombia.commons.jms.exceptions.RelatedMessageNotFoundException;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.Queue;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import reactor.core.publisher.Mono;

import jakarta.jms.Destination;
import jakarta.jms.Message;
import java.time.Duration;

@Log4j2
Expand Down Expand Up @@ -60,7 +61,11 @@ public Mono<Message> requestReply(MQMessageCreator messageCreator, Duration time
private MQMessageCreator defaultCreator(String message) {
return ctx -> {
Message jmsMessage = ctx.createTextMessage(message);
jmsMessage.setJMSReplyTo(container.get(replyQueue));
Queue queue = container.get(replyQueue);
jmsMessage.setJMSReplyTo(queue);
if (log.isInfoEnabled() && queue != null) {
log.info("Setting queue for reply to: {}", queue.getQueueName());
}
return jmsMessage;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import co.com.bancolombia.commons.jms.mq.config.MQProperties;
import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import com.ibm.msg.client.jakarta.wmq.compat.jms.internal.JMSC;
import jakarta.jms.JMSContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -48,9 +50,7 @@ class InterfaceComponentProxyFactoryBeanTest {
@Mock
private MQHealthListener healthListener;
@Mock
private Connection connection;
@Mock
private Session session;
private JMSContext context;
@Mock
private TemporaryQueue queue;
@Mock
Expand Down Expand Up @@ -112,9 +112,8 @@ void shouldInstanceTheBean() throws JMSException {
return null;
});
// Listener mocks
when(connectionFactory.createConnection()).thenReturn(connection);
when(connection.createSession()).thenReturn(session);
when(session.createTemporaryQueue()).thenReturn(queue);
when(connectionFactory.createContext()).thenReturn(context);
when(context.createTemporaryQueue()).thenReturn(queue);
// Sender Mock
when(sender.send(any(Destination.class), any(MQMessageCreator.class))).thenReturn(Mono.empty());
// Act
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import jakarta.jms.Destination;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.MessageListener;
import jakarta.jms.Queue;
import lombok.experimental.SuperBuilder;
Expand All @@ -35,12 +34,20 @@ protected String name() {
}

@Override
protected void disconnect() throws JMSException {
protected void disconnect() {
if (consumer != null) {
consumer.close();
try {
consumer.close();
} catch (Exception ignored) {
// ignore because disconnection
}
}
if (context != null) {
context.close();
try {
context.close();
} catch (Exception ignored) {
// ignore because disconnection
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package co.com.bancolombia.commons.jms.internal.listener;

import co.com.bancolombia.commons.jms.api.MQQueuesContainer;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.internal.reconnect.AbstractJMSReconnectable;
import co.com.bancolombia.commons.jms.utils.MQQueueUtils;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.MessageListener;
import jakarta.jms.TemporaryQueue;
import lombok.experimental.SuperBuilder;
import lombok.extern.log4j.Log4j2;

@Log4j2
@SuperBuilder
public class MQContextTemporaryListener extends AbstractJMSReconnectable<MQContextTemporaryListener> {
private final ConnectionFactory connectionFactory;
private final MessageListener listener;
private final MQListenerConfig config;
private final MQQueuesContainer container;
private JMSConsumer consumer;
private JMSContext context;
private TemporaryQueue tempQueue;

@Override
protected String name() {
String[] parts = Thread.currentThread().getName().split("-");
String finalName = "mq-listener-tmp-queue-" + parts[parts.length - 1] + "[" + config.getTempQueueAlias() + "]";
Thread.currentThread().setName(finalName);
return finalName;
}

@Override
protected void disconnect() {
container.unregisterFromQueueGroup(config.getTempQueueAlias(), tempQueue);
if (consumer != null) {
try {
consumer.close();
} catch (Exception ignored) {
// ignore because disconnection
}
}
if (context != null) {
try {
context.close();
} catch (Exception ignored) {
// ignore because disconnection
}
}
}

@Override
protected MQContextTemporaryListener connect() {
log.info("Starting listener {}", getProcess());
context = connectionFactory.createContext();
context.setExceptionListener(this);
tempQueue = MQQueueUtils.setupTemporaryQueue(context, config);
consumer = context.createConsumer(tempQueue);//NOSONAR
consumer.setMessageListener(listener);
container.registerToQueueGroup(config.getTempQueueAlias(), tempQueue);
log.info("Listener {} started successfully with queue: {}", getProcess(), shortDestinationName());
return this;
}

private String shortDestinationName() {
try {
return tempQueue.getQueueName().split("\\?")[0];
} catch (JMSException e) {
log.warn("Error getting temp queue name", e);
return "Error getting queue name";
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ protected String name() {
}

@Override
protected void disconnect() throws JMSException {
protected void disconnect() {
// do not disconnect to avoid another thread exceptions
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class AbstractJMSReconnectable<T> implements ExceptionListener,

protected abstract T connect();

protected abstract void disconnect() throws JMSException;
protected abstract void disconnect();

protected abstract String name();

Expand All @@ -37,17 +37,25 @@ public abstract class AbstractJMSReconnectable<T> implements ExceptionListener,
public T call() {
this.process = name();
healthListener.onInit(process);
return start();
}

protected T start() {
try {
this.disconnect();
} catch (Exception e) {
log.info("Error disconnecting but ignore it because is in reconnection process", e);
}
try {
T result = connect();
markAsStarted();
return result;
} catch (JMSRuntimeException e) {
log.warn("JMSRuntimeException in {}", process, e);
} catch (Exception e) {
log.warn("Exception in {}", process, e);
throw e;
}
}


public void onException(JMSRuntimeException exception) {
onException(new JMSException(exception.getMessage(), exception.getErrorCode(),
new Exception(exception.getCause())));
Expand All @@ -71,15 +79,7 @@ private void reconnect() {
Thread.currentThread().setName("reconnection-" + process);
try {
log.warn("Starting reconnection for {}", process);
RetryableTask.runWithRetries(process, retryableConfig, () -> {
try {
this.disconnect();
} catch (Exception e) {
log.info("Error disconnecting but ignore it because is in reconnection process", e);
}
this.connect();
});
markAsStarted();
RetryableTask.runWithRetries(process, retryableConfig, this::start);
log.warn("Reconnection successful for {}", process);
} catch (JMSRuntimeException ex) {
log.warn("Reconnection error for {}", process, ex);
Expand Down
Loading

0 comments on commit c1edd6b

Please sign in to comment.