diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQProducerTemplate.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQProducerTemplate.java new file mode 100644 index 00000000..7c2944a3 --- /dev/null +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/ExtRocketMQProducerTemplate.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.samples.springboot; + +import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; +import org.apache.rocketmq.spring.core.RocketMQTemplate; + +/** + * Class Name is ExtRocketMQProducerTemplate + * + * @author LiJun + * Created on 2022/2/14 17:11 + */ +@ExtRocketMQTemplateConfiguration(group = "my-group2") +public class ExtRocketMQProducerTemplate extends RocketMQTemplate { +} \ No newline at end of file diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java new file mode 100644 index 00000000..a858c83d --- /dev/null +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ExtProducerHotfixConsumer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.samples.springboot.consumer; + +import org.apache.rocketmq.samples.springboot.ExtRocketMQProducerTemplate; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; + +/** + * Class Name is ExtProducerHotfixConsumer + * + * @author LiJun + * Created on 2022/2/14 16:58 + */ +@Service +@RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_group_c") +public class ExtProducerHotfixConsumer implements RocketMQListener { + + @Resource(name = "extRocketMQProducerTemplate") + private ExtRocketMQProducerTemplate rocketMQTemplate; + + /** + * Simulate consumption and send a message as soon as the message arrives + */ + @Override + public void onMessage(String message) { + System.out.println("consumer msg=" + message); + sendMessage("test_c_topic", "C ->" + message); + } + + /** + * Send message when startup + */ + @PostConstruct + public void init() { + for (int i = 0; i < 100; i++) { + sendMessage("test_topic", i + ""); + } + } + + public void sendMessage(String topic, String msg) { + try { + System.out.println("send start message=" + msg + ", producer=" + rocketMQTemplate.getProducer()); + rocketMQTemplate.send(topic, new GenericMessage<>(msg)); + System.out.println("send end message=" + msg + ", producer=" + rocketMQTemplate.getProducer()); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java index 9468f2e9..1238ea6c 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java @@ -17,9 +17,6 @@ package org.apache.rocketmq.spring.autoconfigure; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; import org.apache.rocketmq.spring.core.RocketMQTemplate; @@ -30,7 +27,7 @@ import org.springframework.aop.framework.AopProxyUtils; import org.springframework.aop.scope.ScopedProxyUtils; import org.springframework.beans.BeansException; -import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.beans.factory.support.BeanDefinitionValidationException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -41,7 +38,7 @@ import org.springframework.util.StringUtils; @Configuration -public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton { +public class ExtProducerResetConfiguration implements ApplicationContextAware, BeanPostProcessor { private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class); private ConfigurableApplicationContext applicationContext; @@ -65,12 +62,13 @@ public void setApplicationContext(ApplicationContext applicationContext) throws } @Override - public void afterSingletonsInstantiated() { - Map beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class) - .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - beans.forEach(this::registerTemplate); + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + Class clazz = AopProxyUtils.ultimateTargetClass(bean); + if (clazz.isAnnotationPresent(ExtRocketMQTemplateConfiguration.class) + && !ScopedProxyUtils.isScopedTarget(beanName)) { + this.registerTemplate(beanName, bean); + } + return BeanPostProcessor.super.postProcessBeforeInitialization(bean, beanName); } private void registerTemplate(String beanName, Object bean) { @@ -85,12 +83,15 @@ private void registerTemplate(String beanName, Object bean) { validate(annotation, genericApplicationContext); DefaultMQProducer mqProducer = createProducer(annotation); - try { - mqProducer.start(); - } catch (MQClientException e) { - throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}", - beanName), e); - } + // Set instanceName same as the beanName + mqProducer.setInstanceName(beanName); + // rocketmqTemplate start at afterProperties + // try { + // mqProducer.start(); + // } catch (MQClientException e) { + // throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}", + // beanName), e); + // } RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean; rocketMQTemplate.setProducer(mqProducer); rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());