From 74b28c503682c10a19911f7363c7619ce93be520 Mon Sep 17 00:00:00 2001 From: lijun Date: Mon, 14 Feb 2022 17:19:08 +0800 Subject: [PATCH 1/3] [ISSUE #424] Fix NPE when using ExtRocketMQTemplateConfiguration register RocketMQTemplate --- .../ExtRocketMQProducerTemplate.java | 14 +++++ .../consumer/ExtProducerHotfixConsumer.java | 54 +++++++++++++++++++ .../ExtProducerResetConfiguration.java | 33 ++++++------ 3 files changed, 84 insertions(+), 17 deletions(-) create mode 100644 rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQProducerTemplate.java create mode 100644 rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQProducerTemplate.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQProducerTemplate.java new file mode 100644 index 00000000..22a0eb24 --- /dev/null +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQProducerTemplate.java @@ -0,0 +1,14 @@ +package org.apache.rocketmq.samples.springboot; + +import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; +import org.apache.rocketmq.spring.core.RocketMQTemplate; + +/** + * Class Name is ExtRocketMQProducerTemplate + * + * @author LiJun + * Created on 2022/2/14 17:11 + */ +@ExtRocketMQTemplateConfiguration(group = "my-group2") +public class ExtRocketMQProducerTemplate extends RocketMQTemplate { +} \ No newline at end of file diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java new file mode 100644 index 00000000..771aa696 --- /dev/null +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java @@ -0,0 +1,54 @@ +package org.apache.rocketmq.samples.springboot.consumer; + +import org.apache.rocketmq.samples.springboot.ExtRocketMQProducerTemplate; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; + +/** + * Class Name is ExtProducerHotfixConsumer + * + * @author LiJun + * Created on 2022/2/14 16:58 + */ +@Service +@RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_group_c") +public class ExtProducerHotfixConsumer implements RocketMQListener { + + @Resource(name = "extRocketMQProducerTemplate") + private ExtRocketMQProducerTemplate rocketMQTemplate; + + /** + * 模拟消费到消息马上发消息 + */ + @Override + public void onMessage(String message) { + System.out.println("consumer msg=" + message); + sendMessage("test_c_topic", "C ->" + message); + } + + /** + * 模拟启动时 发消息 验证producer 是否为空 + */ + @PostConstruct + public void init() { + for (int i = 0; i < 100; i++) { + sendMessage("test_topic", i + ""); + } + } + + public void sendMessage(String topic, String msg) { + try { + System.out.println("send start message=" + msg + ", producer=" + rocketMQTemplate.getProducer()); + rocketMQTemplate.send(topic, new GenericMessage<>(msg)); + System.out.println("send end message=" + msg + ", producer=" + rocketMQTemplate.getProducer()); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java index ed9fb7d6..cd891d19 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java @@ -17,9 +17,6 @@ package org.apache.rocketmq.spring.autoconfigure; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; import org.apache.rocketmq.spring.core.RocketMQTemplate; @@ -30,7 +27,7 @@ import org.springframework.aop.framework.AopProxyUtils; import org.springframework.aop.scope.ScopedProxyUtils; import org.springframework.beans.BeansException; -import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.beans.factory.support.BeanDefinitionValidationException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -41,7 +38,7 @@ import org.springframework.util.StringUtils; @Configuration -public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton { +public class ExtProducerResetConfiguration implements ApplicationContextAware, BeanPostProcessor { private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class); private ConfigurableApplicationContext applicationContext; @@ -65,12 +62,13 @@ public void setApplicationContext(ApplicationContext applicationContext) throws } @Override - public void afterSingletonsInstantiated() { - Map beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class) - .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - beans.forEach(this::registerTemplate); + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + Class clazz = AopProxyUtils.ultimateTargetClass(bean); + if (clazz.isAnnotationPresent(ExtRocketMQTemplateConfiguration.class) + && !ScopedProxyUtils.isScopedTarget(beanName)) { + this.registerTemplate(beanName, bean); + } + return BeanPostProcessor.super.postProcessBeforeInitialization(bean, beanName); } private void registerTemplate(String beanName, Object bean) { @@ -87,12 +85,13 @@ private void registerTemplate(String beanName, Object bean) { DefaultMQProducer mqProducer = createProducer(annotation); // Set instanceName same as the beanName mqProducer.setInstanceName(beanName); - try { - mqProducer.start(); - } catch (MQClientException e) { - throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}", - beanName), e); - } + // rocketmqTempalte afterProperties 会启动 + // try { + // mqProducer.start(); + // } catch (MQClientException e) { + // throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}", + // beanName), e); + // } RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean; rocketMQTemplate.setProducer(mqProducer); rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); From df7e093d3a9ba9d5d9bf799b7e2f57bf863b90bf Mon Sep 17 00:00:00 2001 From: lijun Date: Fri, 17 Jun 2022 16:21:12 +0800 Subject: [PATCH 2/3] update comment --- .../springboot/consumer/ExtProducerHotfixConsumer.java | 4 ++-- .../spring/autoconfigure/ExtProducerResetConfiguration.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java index 771aa696..98fd2997 100644 --- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java @@ -23,7 +23,7 @@ public class ExtProducerHotfixConsumer implements RocketMQListener { private ExtRocketMQProducerTemplate rocketMQTemplate; /** - * 模拟消费到消息马上发消息 + * Simulate consumption and send a message as soon as the message arrives */ @Override public void onMessage(String message) { @@ -32,7 +32,7 @@ public void onMessage(String message) { } /** - * 模拟启动时 发消息 验证producer 是否为空 + * Send message when startup */ @PostConstruct public void init() { diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java index cd891d19..4424456e 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java @@ -85,7 +85,7 @@ private void registerTemplate(String beanName, Object bean) { DefaultMQProducer mqProducer = createProducer(annotation); // Set instanceName same as the beanName mqProducer.setInstanceName(beanName); - // rocketmqTempalte afterProperties 会启动 + // rocketmqTemplate start at afterProperties // try { // mqProducer.start(); // } catch (MQClientException e) { From c97b77dbaac973116921f8a2f3b670225a38aef9 Mon Sep 17 00:00:00 2001 From: lijun Date: Fri, 12 Aug 2022 10:13:15 +0800 Subject: [PATCH 3/3] update comment --- .../springboot/ExtRocketMQProducerTemplate.java | 17 +++++++++++++++++ .../consumer/ExtProducerHotfixConsumer.java | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQProducerTemplate.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQProducerTemplate.java index 22a0eb24..7c2944a3 100644 --- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQProducerTemplate.java +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQProducerTemplate.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.samples.springboot; import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java index 98fd2997..a858c83d 100644 --- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.samples.springboot.consumer; import org.apache.rocketmq.samples.springboot.ExtRocketMQProducerTemplate;