Skip to content

Commit

Permalink
Optimize getMessageType and getMethodParameter methods in DefaultRock…
Browse files Browse the repository at this point in the history
…etMQListenerContainer
  • Loading branch information
xuziyang committed Apr 15, 2024
1 parent c1dbf87 commit 2954c37
Showing 1 changed file with 16 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConversionException;
Expand All @@ -66,6 +67,7 @@
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ReflectionUtils;

@SuppressWarnings("WeakerAccess")
public class DefaultRocketMQListenerContainer implements InitializingBean,
Expand Down Expand Up @@ -553,61 +555,36 @@ private Object doConvertMessage(MessageExt messageExt) {

private MethodParameter getMethodParameter() {
Class<?> targetClass;
Class<?> consumerInterface;
if (rocketMQListener != null) {
targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
consumerInterface = RocketMQListener.class;
} else {
targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
consumerInterface = RocketMQReplyListener.class;
}
Type messageType = this.getMessageType();
Class clazz = null;
if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) {
clazz = (Class) ((ParameterizedType) messageType).getRawType();
} else if (messageType instanceof Class) {
clazz = (Class) messageType;
} else {
throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
}
try {
final Method method = targetClass.getMethod("onMessage", clazz);
return new MethodParameter(method, 0);
} catch (NoSuchMethodException e) {
e.printStackTrace();
throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported");
}
ResolvableType resolvableType = ResolvableType.forClass(targetClass).as(consumerInterface);
Class<?> clazz = resolvableType.getGeneric().resolve();
Method onMessage = ReflectionUtils.findMethod(targetClass, "onMessage", clazz);
return MethodParameter.forExecutable(onMessage, 0);
}


private Type getMessageType() {
Class<?> targetClass;
Class<?> consumerInterface;
if (rocketMQListener != null) {
targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
consumerInterface = RocketMQListener.class;
} else {
targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
consumerInterface = RocketMQReplyListener.class;
}
Type matchedGenericInterface = null;
while (Objects.nonNull(targetClass)) {
Type[] interfaces = targetClass.getGenericInterfaces();
if (Objects.nonNull(interfaces)) {
for (Type type : interfaces) {
if (type instanceof ParameterizedType &&
(Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class) || Objects.equals(((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))) {
matchedGenericInterface = type;
break;
}
}
}
targetClass = targetClass.getSuperclass();
}
if (Objects.isNull(matchedGenericInterface)) {
return Object.class;
}

Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return actualTypeArguments[0];
}
return Object.class;
ResolvableType resolvableType = ResolvableType.forClass(targetClass).as(consumerInterface);
return resolvableType.getGeneric().getType();
}


private void initRocketMQPushConsumer() throws MQClientException {
if (rocketMQListener == null && rocketMQReplyListener == null) {
throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
Expand Down

0 comments on commit 2954c37

Please sign in to comment.