From 85d54b205aef0744ecd7a0a5d9dcdc882b2bc27f Mon Sep 17 00:00:00 2001 From: Dung Ta Van Date: Sun, 7 Mar 2021 15:15:37 +0700 Subject: [PATCH] add response handlers --- quick-rpc-client/pom.xml | 2 +- .../quick/rpc/client/QuickRpcClient.java | 33 ++++++++++- .../client/annotation/RpcResponseHandled.java | 23 ++++++++ .../client/handler/RpcResponseHandler.java | 9 +++ .../client/handler/RpcResponseHandlers.java | 57 +++++++++++++++++++ .../util/RpcResponseHandledAnnotations.java | 20 +++++++ .../quick/rpc/server/QuickRpcServer.java | 4 +- ...java => RpcRequestHandledAnnotations.java} | 4 +- 8 files changed, 146 insertions(+), 6 deletions(-) create mode 100644 quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/annotation/RpcResponseHandled.java create mode 100644 quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/handler/RpcResponseHandler.java create mode 100644 quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/handler/RpcResponseHandlers.java create mode 100644 quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/util/RpcResponseHandledAnnotations.java rename quick-rpc-server/src/main/java/com/tvd12/quick/rpc/server/util/{RpcHandlerAnnotations.java => RpcRequestHandledAnnotations.java} (83%) diff --git a/quick-rpc-client/pom.xml b/quick-rpc-client/pom.xml index c14f16e..6d5ec6f 100644 --- a/quick-rpc-client/pom.xml +++ b/quick-rpc-client/pom.xml @@ -13,7 +13,7 @@ http://maven.apache.org - 1.0.3 + 1.0.4 diff --git a/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/QuickRpcClient.java b/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/QuickRpcClient.java index 97726a7..25fed0c 100644 --- a/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/QuickRpcClient.java +++ b/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/QuickRpcClient.java @@ -17,6 +17,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import com.tvd12.ezyfox.bean.EzyBeanContext; import com.tvd12.ezyfox.binding.EzyBindingContext; import com.tvd12.ezyfox.binding.EzyBindingContextBuilder; import com.tvd12.ezyfox.binding.EzyMarshaller; @@ -54,6 +55,8 @@ import com.tvd12.quick.rpc.client.exception.RpcClientMaxCapacityException; import com.tvd12.quick.rpc.client.exception.RpcClientNotConnectedException; import com.tvd12.quick.rpc.client.exception.RpcErrorException; +import com.tvd12.quick.rpc.client.handler.RpcResponseHandler; +import com.tvd12.quick.rpc.client.handler.RpcResponseHandlers; import com.tvd12.quick.rpc.client.net.RpcSocketAddress; import com.tvd12.quick.rpc.client.net.RpcURI; import com.tvd12.quick.rpc.core.constant.RpcInternalCommands; @@ -81,6 +84,7 @@ public class QuickRpcClient extends EzyLoggable implements EzyCloseable { protected final EzyBindingContext bindingContext; protected final Map commands; protected final Consumer messageSender; + protected final RpcResponseHandlers responseHandlers; protected final LinkedList transporters; protected final RpcRequestIdGenerator requestIdGenerator; protected final Map> futureMap; @@ -111,6 +115,7 @@ protected QuickRpcClient(Builder builder) { this.commands = new ConcurrentHashMap<>(builder.commands); this.serverAddresses = new LinkedList<>(builder.serverAddresses); this.requestIdGenerator = new RpcRequestIdGenerator(); + this.responseHandlers = builder.responseHandlers; this.connect(); this.messageSender = newMessageSender(); } @@ -410,7 +415,15 @@ public void handle(EzyApp app, EzyArray data) { String cmd = data.get(0, String.class); EzyFutureMap futures = futureMap.get(cmd); if(futures == null) { - logger.warn("has no future map to command: {}", cmd); + RpcResponseHandler responseHandler = responseHandlers.getHandler(cmd); + if(responseHandler != null) { + String id = data.get(1, String.class); + Object result = data.get(2); + responseHandler.handle(new RpcResponse(unmarshaller, cmd, id, result, false)); + } + else { + logger.warn("has no future map to command: {}", cmd); + } return; } String id = data.get(1, String.class); @@ -464,12 +477,19 @@ public static class Builder protected int threadPoolSize = 8; protected int processEventInterval = 3; protected int defaultRequestTimeout = 5000; + protected EzyBeanContext beanContext; protected EzyBindingContext bindingContext; protected EzyBindingContextBuilder bindingContextBuilder; + protected RpcResponseHandlers responseHandlers; + protected RpcResponseHandlers.Builder responseHandlersBuilder; protected Set packagesToScan = new HashSet<>(); protected Map commands = new HashMap<>(); protected RpcClientType clientType = RpcClientType.SINGLE; protected LinkedList serverAddresses = new LinkedList<>(); + + { + responseHandlersBuilder = RpcResponseHandlers.builder(); + } public Builder name(String name) { this.name = name; @@ -559,6 +579,15 @@ public Builder bindingContextBuilder(EzyBindingContextBuilder bindingContextBuil return this; } + public Builder onResponseRecevied(String cmd, Consumer consumer) { + return addResponseHandler(cmd, r -> consumer.accept(r)); + } + + public Builder addResponseHandler(String cmd, RpcResponseHandler handler) { + this.responseHandlersBuilder.addHandler(cmd, handler); + return this; + } + @Override public QuickRpcClient build() { if(serverAddresses.isEmpty()) @@ -599,6 +628,8 @@ public QuickRpcClient build() { } bindingContext = bindingContextBuilder.build(); } + responseHandlers = responseHandlersBuilder.build(); + if(serverAddresses.isEmpty()) serverAddresses.add(new RpcSocketAddress("127.0.0.1", 3005)); return newProduct(); diff --git a/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/annotation/RpcResponseHandled.java b/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/annotation/RpcResponseHandled.java new file mode 100644 index 0000000..e2797b6 --- /dev/null +++ b/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/annotation/RpcResponseHandled.java @@ -0,0 +1,23 @@ +package com.tvd12.quick.rpc.client.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * + * @author tavandung12 + * + */ +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ ElementType.TYPE }) +public @interface RpcResponseHandled { + + String value() default ""; + + String command() default ""; + +} diff --git a/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/handler/RpcResponseHandler.java b/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/handler/RpcResponseHandler.java new file mode 100644 index 0000000..dda3ad9 --- /dev/null +++ b/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/handler/RpcResponseHandler.java @@ -0,0 +1,9 @@ +package com.tvd12.quick.rpc.client.handler; + +import com.tvd12.quick.rpc.client.entity.RpcResponse; + +public interface RpcResponseHandler { + + void handle(RpcResponse response); + +} diff --git a/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/handler/RpcResponseHandlers.java b/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/handler/RpcResponseHandlers.java new file mode 100644 index 0000000..3394ae7 --- /dev/null +++ b/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/handler/RpcResponseHandlers.java @@ -0,0 +1,57 @@ +package com.tvd12.quick.rpc.client.handler; + +import java.util.HashMap; +import java.util.Map; + +import com.tvd12.ezyfox.builder.EzyBuilder; + +public class RpcResponseHandlers { + + protected final Map handlers; + + protected RpcResponseHandlers(Builder builder) { + this.handlers = new HashMap<>(builder.handlers); + } + + public RpcResponseHandler getHandler(String command) { + return handlers.get(command); + } + + @Override + public String toString() { + return "RpcResponseHandlers(" + handlers + ")"; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder implements EzyBuilder { + + protected Map handlers; + + public Builder() { + this.handlers = new HashMap<>(); + } + + public Builder addHandler(String cmd, RpcResponseHandler handler) { + handlers.put(cmd, handler); + return this; + } + + public Builder addHandlers(Map handlers) { + for(String cmd : handlers.keySet()) { + RpcResponseHandler handler = handlers.get(cmd); + addHandler(cmd, handler); + } + return this; + } + + @Override + public RpcResponseHandlers build() { + return new RpcResponseHandlers(this); + } + + } + +} diff --git a/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/util/RpcResponseHandledAnnotations.java b/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/util/RpcResponseHandledAnnotations.java new file mode 100644 index 0000000..b29239a --- /dev/null +++ b/quick-rpc-client/src/main/java/com/tvd12/quick/rpc/client/util/RpcResponseHandledAnnotations.java @@ -0,0 +1,20 @@ +package com.tvd12.quick.rpc.client.util; + +import com.tvd12.ezyfox.io.EzyStrings; +import com.tvd12.quick.rpc.client.annotation.RpcResponseHandled; + +public final class RpcResponseHandledAnnotations { + + private RpcResponseHandledAnnotations() {} + + public static String getCommand(RpcResponseHandled annotation) { + if(EzyStrings.isNoContent(annotation.value())) + return annotation.command(); + return annotation.value(); + } + + public static String getCommand(Class rpcHandlerClass) { + return getCommand(rpcHandlerClass.getAnnotation(RpcResponseHandled.class)); + } + +} diff --git a/quick-rpc-server/src/main/java/com/tvd12/quick/rpc/server/QuickRpcServer.java b/quick-rpc-server/src/main/java/com/tvd12/quick/rpc/server/QuickRpcServer.java index d40fc5c..bc66bd2 100644 --- a/quick-rpc-server/src/main/java/com/tvd12/quick/rpc/server/QuickRpcServer.java +++ b/quick-rpc-server/src/main/java/com/tvd12/quick/rpc/server/QuickRpcServer.java @@ -74,7 +74,7 @@ import com.tvd12.quick.rpc.server.setting.QuickRpcSettings; import com.tvd12.quick.rpc.server.transport.RpcAppEntryLoader; import com.tvd12.quick.rpc.server.transport.RpcPluginEntryLoader; -import com.tvd12.quick.rpc.server.util.RpcHandlerAnnotations; +import com.tvd12.quick.rpc.server.util.RpcRequestHandledAnnotations; @SuppressWarnings("rawtypes") public class QuickRpcServer @@ -241,7 +241,7 @@ public RpcServerContext start() throws Exception { requestHandlersBuilder.addHandlers( (Map)EzyMaps.newHashMap( beanContext.getSingletons(RpcRequestHandled.class), - it -> RpcHandlerAnnotations.getCommand(it.getClass()) + it -> RpcRequestHandledAnnotations.getCommand(it.getClass()) ) ); diff --git a/quick-rpc-server/src/main/java/com/tvd12/quick/rpc/server/util/RpcHandlerAnnotations.java b/quick-rpc-server/src/main/java/com/tvd12/quick/rpc/server/util/RpcRequestHandledAnnotations.java similarity index 83% rename from quick-rpc-server/src/main/java/com/tvd12/quick/rpc/server/util/RpcHandlerAnnotations.java rename to quick-rpc-server/src/main/java/com/tvd12/quick/rpc/server/util/RpcRequestHandledAnnotations.java index b9f3c91..776f860 100644 --- a/quick-rpc-server/src/main/java/com/tvd12/quick/rpc/server/util/RpcHandlerAnnotations.java +++ b/quick-rpc-server/src/main/java/com/tvd12/quick/rpc/server/util/RpcRequestHandledAnnotations.java @@ -3,9 +3,9 @@ import com.tvd12.ezyfox.io.EzyStrings; import com.tvd12.quick.rpc.server.annotation.RpcRequestHandled; -public final class RpcHandlerAnnotations { +public final class RpcRequestHandledAnnotations { - private RpcHandlerAnnotations() {} + private RpcRequestHandledAnnotations() {} public static String getCommand(RpcRequestHandled annotation) { if(EzyStrings.isNoContent(annotation.value()))