Skip to content

Commit

Permalink
[ISSUE #632 ] Fix NPE caused by using ExtRocketMQTemplateConfiguratio…
Browse files Browse the repository at this point in the history
…n annotation extension to send messages
  • Loading branch information
lilinjiang authored Mar 3, 2024
1 parent 404085e commit b03f552
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.OrderComparator;
import org.springframework.core.annotation.AnnotationUtils;

Expand All @@ -32,14 +33,16 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean {
public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean, SmartLifecycle {

private ApplicationContext applicationContext;

private AnnotationEnhancer enhancer;

private RocketMQMessageListenerContainerRegistrar listenerContainerRegistrar;

private boolean running = false;

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
Expand All @@ -58,6 +61,34 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
return bean;
}

@Override
public int getPhase() {
return Integer.MAX_VALUE - 2000;
}

@Override
public void start() {
if (!isRunning()) {
this.setRunning(true);
listenerContainerRegistrar.startContainer();
}
}

@Override
public void stop() {

}

public void setRunning(boolean running) {
this.running = running;
}


@Override
public boolean isRunning() {
return running;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class RocketMQMessageListenerContainerRegistrar implements ApplicationContextAware {
Expand All @@ -52,6 +54,8 @@ public class RocketMQMessageListenerContainerRegistrar implements ApplicationCon

private final RocketMQMessageConverter rocketMQMessageConverter;

private final List<DefaultRocketMQListenerContainer> containers = new ArrayList<>();

public RocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter,
ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
Expand Down Expand Up @@ -97,18 +101,25 @@ public void registerContainer(String beanName, Object bean, RocketMQMessageListe
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}

containers.add(container);

log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}

public void startContainer() {
for (DefaultRocketMQListenerContainer container : containers) {
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
}
}

private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
Expand Down

0 comments on commit b03f552

Please sign in to comment.