diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index fb7762e9..9bcdeb55 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -58,6 +58,7 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.context.SmartLifecycle; import org.springframework.core.MethodParameter; +import org.springframework.core.ResolvableType; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.MessageConversionException; @@ -66,6 +67,7 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.util.MimeTypeUtils; +import org.springframework.util.ReflectionUtils; @SuppressWarnings("WeakerAccess") public class DefaultRocketMQListenerContainer implements InitializingBean, @@ -553,61 +555,36 @@ private Object doConvertMessage(MessageExt messageExt) { private MethodParameter getMethodParameter() { Class targetClass; + Class consumerInterface; if (rocketMQListener != null) { targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); + consumerInterface = RocketMQListener.class; } else { targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener); + consumerInterface = RocketMQReplyListener.class; } - Type messageType = this.getMessageType(); - Class clazz = null; - if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) { - clazz = (Class) ((ParameterizedType) messageType).getRawType(); - } else if (messageType instanceof Class) { - clazz = (Class) messageType; - } else { - throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported"); - } - try { - final Method method = targetClass.getMethod("onMessage", clazz); - return new MethodParameter(method, 0); - } catch (NoSuchMethodException e) { - e.printStackTrace(); - throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported"); - } + ResolvableType resolvableType = ResolvableType.forClass(targetClass).as(consumerInterface); + Class clazz = resolvableType.getGeneric().resolve(); + Method onMessage = ReflectionUtils.findMethod(targetClass, "onMessage", clazz); + return MethodParameter.forExecutable(onMessage, 0); } + private Type getMessageType() { Class targetClass; + Class consumerInterface; if (rocketMQListener != null) { targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); + consumerInterface = RocketMQListener.class; } else { targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener); + consumerInterface = RocketMQReplyListener.class; } - Type matchedGenericInterface = null; - while (Objects.nonNull(targetClass)) { - Type[] interfaces = targetClass.getGenericInterfaces(); - if (Objects.nonNull(interfaces)) { - for (Type type : interfaces) { - if (type instanceof ParameterizedType && - (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class) || Objects.equals(((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))) { - matchedGenericInterface = type; - break; - } - } - } - targetClass = targetClass.getSuperclass(); - } - if (Objects.isNull(matchedGenericInterface)) { - return Object.class; - } - - Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments(); - if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { - return actualTypeArguments[0]; - } - return Object.class; + ResolvableType resolvableType = ResolvableType.forClass(targetClass).as(consumerInterface); + return resolvableType.getGeneric().getType(); } + private void initRocketMQPushConsumer() throws MQClientException { if (rocketMQListener == null && rocketMQReplyListener == null) { throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");