Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create temp queue by thread, automatic one reconnecting retry when se… #33

Merged
merged 3 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

public interface MQQueuesContainer {
void registerQueue(String key, Queue queue);
void registerToQueueGroup(String groupId, Queue queue);
void unregisterFromQueueGroup(String groupId, Queue queue);

Queue get(String alias);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ public class MQAutoconfiguration {
@ConditionalOnMissingBean(MQQueueCustomizer.class)
public MQQueueCustomizer defaultMQQueueCustomizer() {
return queue -> {
MQQueue customized = (MQQueue) queue;
customized.setProperty(WMQ_TARGET_CLIENT, "1");
customized.setProperty(WMQ_MQMD_READ_ENABLED, "true");
customized.setProperty(WMQ_MQMD_WRITE_ENABLED, "true");
customized.setPutAsyncAllowed(WMQ_PUT_ASYNC_ALLOWED_ENABLED);
customized.setReadAheadAllowed(WMQ_READ_AHEAD_ALLOWED_ENABLED);
if (queue instanceof MQQueue) {
MQQueue customized = (MQQueue) queue;
customized.setProperty(WMQ_TARGET_CLIENT, "1");
customized.setProperty(WMQ_MQMD_READ_ENABLED, "true");
customized.setProperty(WMQ_MQMD_WRITE_ENABLED, "true");
customized.setPutAsyncAllowed(WMQ_PUT_ASYNC_ALLOWED_ENABLED);
customized.setReadAheadAllowed(WMQ_READ_AHEAD_ALLOWED_ENABLED);
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import co.com.bancolombia.commons.jms.api.MQRequestReply;
import co.com.bancolombia.commons.jms.exceptions.RelatedMessageNotFoundException;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.Queue;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import reactor.core.publisher.Mono;

import jakarta.jms.Destination;
import jakarta.jms.Message;
import java.time.Duration;

@Log4j2
Expand Down Expand Up @@ -60,7 +61,11 @@ public Mono<Message> requestReply(MQMessageCreator messageCreator, Duration time
private MQMessageCreator defaultCreator(String message) {
return ctx -> {
Message jmsMessage = ctx.createTextMessage(message);
jmsMessage.setJMSReplyTo(container.get(replyQueue));
Queue queue = container.get(replyQueue);
jmsMessage.setJMSReplyTo(queue);
if (log.isInfoEnabled() && queue != null) {
log.info("Setting queue for reply to: {}", queue.getQueueName());
}
return jmsMessage;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import co.com.bancolombia.commons.jms.mq.config.MQProperties;
import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import com.ibm.msg.client.jakarta.wmq.compat.jms.internal.JMSC;
import jakarta.jms.JMSContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -48,9 +50,7 @@ class InterfaceComponentProxyFactoryBeanTest {
@Mock
private MQHealthListener healthListener;
@Mock
private Connection connection;
@Mock
private Session session;
private JMSContext context;
@Mock
private TemporaryQueue queue;
@Mock
Expand Down Expand Up @@ -112,9 +112,8 @@ void shouldInstanceTheBean() throws JMSException {
return null;
});
// Listener mocks
when(connectionFactory.createConnection()).thenReturn(connection);
when(connection.createSession()).thenReturn(session);
when(session.createTemporaryQueue()).thenReturn(queue);
when(connectionFactory.createContext()).thenReturn(context);
when(context.createTemporaryQueue()).thenReturn(queue);
// Sender Mock
when(sender.send(any(Destination.class), any(MQMessageCreator.class))).thenReturn(Mono.empty());
// Act
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import jakarta.jms.JMSException;
import jakarta.jms.MessageListener;
import jakarta.jms.Queue;
import jakarta.jms.TemporaryQueue;
import lombok.experimental.SuperBuilder;
import lombok.extern.log4j.Log4j2;

Expand All @@ -23,38 +24,80 @@ public class MQContextListener extends AbstractJMSReconnectable<MQContextListene
private final MQListenerConfig config;
private final MQQueuesContainer container;
private final MQBrokerUtils utils;
private final boolean temporary;
private JMSConsumer consumer;
private JMSContext context;
private TemporaryQueue tempQueue;

@Override
protected String name() {
String[] parts = Thread.currentThread().getName().split("-");
String finalName = "mq-listener-fixed-queue-" + parts[parts.length - 1] + "[" + config.getQueue() + "]";
String finalName;
if (temporary) {
finalName = "mq-listener-tmp-queue-" + parts[parts.length - 1] + "[" + config.getTempQueueAlias() + "]";
} else {
finalName = "mq-listener-fixed-queue-" + parts[parts.length - 1] + "[" + config.getQueue() + "]";
}
Thread.currentThread().setName(finalName);
return finalName;
}

@Override
protected void disconnect() throws JMSException {
protected void disconnect() {
if (temporary) {
container.unregisterFromQueueGroup(config.getTempQueueAlias(), tempQueue);
}
if (consumer != null) {
consumer.close();
try {
consumer.close();
} catch (Exception ignored) {
// ignore because disconnection
}
}
if (tempQueue != null) {
try {
tempQueue.delete();
} catch (Exception ignored) {
// ignore because disconnection
}
}
if (context != null) {
context.close();
try {
context.close();
} catch (Exception ignored) {
// ignore because disconnection
}
}
}

@Override
protected MQContextListener connect() {
log.info("Starting listener {}", getProcess());
context = connectionFactory.createContext();
Destination destination = MQQueueUtils.setupFixedQueue(context, config);
consumer = context.createConsumer(destination);//NOSONAR
container.registerQueue(config.getQueue(), (Queue) destination);
utils.setQueueManager(context, (Queue) destination);
consumer.setMessageListener(listener);
context.setExceptionListener(this);
log.info("Listener {} started successfully", getProcess());
if (temporary) {
tempQueue = MQQueueUtils.setupTemporaryQueue(context, config);
container.registerToQueueGroup(config.getTempQueueAlias(), tempQueue);
consumer = context.createConsumer(tempQueue);//NOSONAR
consumer.setMessageListener(listener);
log.info("Listener {} started successfully with queue: {}", getProcess(), shortDestinationName());
} else {
Destination destination = MQQueueUtils.setupFixedQueue(context, config);
utils.setQueueManager(context, (Queue) destination);
container.registerQueue(config.getQueue(), (Queue) destination);
consumer = context.createConsumer(destination);//NOSONAR
consumer.setMessageListener(listener);
log.info("Listener {} started successfully with queue: {}", getProcess(), config.getQueue());
}
return this;
}

private String shortDestinationName() {
try {
return tempQueue.getQueueName().split("\\?")[0];
} catch (JMSException e) {
log.warn("Error getting temp queue name", e);
return "Error getting queue name";
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ protected String name() {
}

@Override
protected void disconnect() throws JMSException {
protected void disconnect() {
// do not disconnect to avoid another thread exceptions
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class AbstractJMSReconnectable<T> implements ExceptionListener,

protected abstract T connect();

protected abstract void disconnect() throws JMSException;
protected abstract void disconnect();

protected abstract String name();

Expand All @@ -37,17 +37,25 @@ public abstract class AbstractJMSReconnectable<T> implements ExceptionListener,
public T call() {
this.process = name();
healthListener.onInit(process);
return start();
}

protected T start() {
try {
this.disconnect();
} catch (Exception e) {
log.info("Error disconnecting but ignore it because is in reconnection process", e);
}
try {
T result = connect();
markAsStarted();
return result;
} catch (JMSRuntimeException e) {
log.warn("JMSRuntimeException in {}", process, e);
} catch (Exception e) {
log.warn("Exception in {}", process, e);
throw e;
}
}


public void onException(JMSRuntimeException exception) {
onException(new JMSException(exception.getMessage(), exception.getErrorCode(),
new Exception(exception.getCause())));
Expand All @@ -71,15 +79,7 @@ private void reconnect() {
Thread.currentThread().setName("reconnection-" + process);
try {
log.warn("Starting reconnection for {}", process);
RetryableTask.runWithRetries(process, retryableConfig, () -> {
try {
this.disconnect();
} catch (Exception e) {
log.info("Error disconnecting but ignore it because is in reconnection process", e);
}
this.connect();
});
markAsStarted();
RetryableTask.runWithRetries(process, retryableConfig, this::start);
log.warn("Reconnection successful for {}", process);
} catch (JMSRuntimeException ex) {
log.warn("Reconnection error for {}", process, ex);
Expand Down
Loading