From 120138414f340e108940a08f636572361830a3dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=BE=BD?= Date: Mon, 29 Apr 2019 21:52:43 +0800 Subject: [PATCH 1/5] feat: new attachments for response --- .../motan/protocol/rpc/CompressRpcCodec.java | 51 +++++++------------ 1 file changed, 17 insertions(+), 34 deletions(-) diff --git a/motan-core/src/main/java/com/weibo/api/motan/protocol/rpc/CompressRpcCodec.java b/motan-core/src/main/java/com/weibo/api/motan/protocol/rpc/CompressRpcCodec.java index 60619e8e1..ba914cb4b 100644 --- a/motan-core/src/main/java/com/weibo/api/motan/protocol/rpc/CompressRpcCodec.java +++ b/motan-core/src/main/java/com/weibo/api/motan/protocol/rpc/CompressRpcCodec.java @@ -16,22 +16,6 @@ package com.weibo.api.motan.protocol.rpc; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -import org.apache.commons.lang3.StringUtils; - import com.weibo.api.motan.codec.AbstractCodec; import com.weibo.api.motan.codec.Serialization; import com.weibo.api.motan.common.MotanConstants; @@ -40,21 +24,20 @@ import com.weibo.api.motan.core.extension.SpiMeta; import com.weibo.api.motan.exception.MotanErrorMsgConstant; import com.weibo.api.motan.exception.MotanFrameworkException; -import com.weibo.api.motan.rpc.DefaultRequest; -import com.weibo.api.motan.rpc.DefaultResponse; -import com.weibo.api.motan.rpc.Provider; -import com.weibo.api.motan.rpc.Request; -import com.weibo.api.motan.rpc.Response; +import com.weibo.api.motan.rpc.*; import com.weibo.api.motan.transport.Channel; import com.weibo.api.motan.transport.support.DefaultRpcHeartbeatFactory; -import com.weibo.api.motan.util.ByteUtil; -import com.weibo.api.motan.util.ConcurrentHashSet; -import com.weibo.api.motan.util.ExceptionUtil; -import com.weibo.api.motan.util.LoggerUtil; -import com.weibo.api.motan.util.MotanDigestUtil; -import com.weibo.api.motan.util.MotanFrameworkUtil; -import com.weibo.api.motan.util.MotanSwitcherUtil; -import com.weibo.api.motan.util.ReflectUtil; +import com.weibo.api.motan.util.*; +import org.apache.commons.lang3.StringUtils; + +import java.io.*; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; /** * 压缩协议codec,支持开启gzip压缩。 @@ -425,20 +408,20 @@ private byte[] encodeResponse(Channel channel, Response value) throws IOExceptio serialize(output, value.getValue(), serialization); // v2版本可以在response中添加attachment Map attachments = value.getAttachments(); + Map responseAttachments = new HashMap<>(); if (attachments != null) { String signed = attachments.get(ATTACHMENT_SIGN); String unSigned = attachments.get(UN_ATTACHMENT_SIGN); - attachments.clear(); // 除了attachment签名外不返回其他信息。 if (StringUtils.isNotBlank(signed)) { - attachments.put(ATTACHMENT_SIGN, signed); + responseAttachments.put(ATTACHMENT_SIGN, signed); } if (StringUtils.isNotBlank(unSigned)) { - attachments.put(UN_ATTACHMENT_SIGN, unSigned); + responseAttachments.put(UN_ATTACHMENT_SIGN, unSigned); } } - if (attachments != null && !attachments.isEmpty()) {// 需要回传附加数据 - addAttachment(output, attachments); + if (!responseAttachments.isEmpty()) {// 需要回传附加数据 + addAttachment(output, responseAttachments); } else { // empty attachments output.writeShort(0); From 8c8562c73c26b569e409163015cfaea9702884bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=BE=BD?= Date: Tue, 30 Apr 2019 10:49:31 +0800 Subject: [PATCH 2/5] fix: cleanup netty when close --- .../motan/protocol/rpc/CompressRpcCodec.java | 7 +- .../motan/transport/netty/NettyClient.java | 835 +++++++++--------- .../motan/transport/netty/NettyServer.java | 39 +- .../motan/transport/netty4/NettyClient.java | 38 +- .../motan/transport/netty4/NettyServer.java | 54 +- .../transport/netty4/NettyServerTest.java | 1 - .../src/test/resources/log4j.properties | 19 + 7 files changed, 501 insertions(+), 492 deletions(-) create mode 100644 motan-transport-netty4/src/test/resources/log4j.properties diff --git a/motan-core/src/main/java/com/weibo/api/motan/protocol/rpc/CompressRpcCodec.java b/motan-core/src/main/java/com/weibo/api/motan/protocol/rpc/CompressRpcCodec.java index ba914cb4b..ac78ec001 100644 --- a/motan-core/src/main/java/com/weibo/api/motan/protocol/rpc/CompressRpcCodec.java +++ b/motan-core/src/main/java/com/weibo/api/motan/protocol/rpc/CompressRpcCodec.java @@ -344,7 +344,7 @@ private void replaceAttachmentParamsBySign(Channel channel, Map // attachment中的固定参数使用签名方式传递。首次跟server建立链接时传全部信息,之后只传签名。 AttachmentInfo info = getAttachmentInfoMap(attachments); if (info != null) { - String sign = info.getAttachmetnSign(); + String sign = info.getAttachmentSign(); if (sign != null) { attachments.put(ATTACHMENT_SIGN, sign); @@ -706,7 +706,6 @@ private void putAttachmentInfoMap(AttachmentInfo attachmentInfo, Map attachments) { if (attachments != null) { attachments.remove(URLParamType.group.name()); - attachments.remove(URLParamType.application.name()); attachments.remove(URLParamType.module.name()); attachments.remove(URLParamType.version.name()); } @@ -914,13 +913,13 @@ public AttachmentInfo(String group, String application, String module, String ve this.version = version; } - public String getAttachmetnSign() { + public String getAttachmentSign() { String signstr = group + application + module + version; String hashcodeStr = null; try { hashcodeStr = MotanDigestUtil.md5LowerCase(signstr).substring(8, 12); // 取md5中的四个字符。 } catch (Exception e) { - LoggerUtil.warn("getAttachmetnSign fail!" + e.getMessage()); + LoggerUtil.warn("getAttachmentSign fail!" + e.getMessage()); } return hashcodeStr; } diff --git a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyClient.java b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyClient.java index 272565491..6b59a5495 100644 --- a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyClient.java +++ b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyClient.java @@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicLong; /** - * *
  * 		netty client 相关
  * 			1)  timeout 设置 (connecttimeout,sotimeout, application timeout)
@@ -59,431 +58,425 @@
  *
  * @author maijunsheng
  * @version 创建时间:2013-5-31
- *
  */
 public class NettyClient extends AbstractPoolClient implements StatisticCallback {
     //这里采用默认的CPU数*2
-	private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(
-			Executors.newCachedThreadPool(new DefaultThreadFactory("nettyClientBoss", true)),
-			Executors.newCachedThreadPool(new DefaultThreadFactory("nettyClientWorker", true)));
-
-	// 回收过期任务
-	private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(4);
-
-	// 异步的request,需要注册callback future
-	// 触发remove的操作有: 1) service的返回结果处理。 2) timeout thread cancel
-	protected ConcurrentMap callbackMap = new ConcurrentHashMap();
-
-	private ScheduledFuture timeMonitorFuture = null;
-
-
-	// 连续失败次数
-	private AtomicLong errorCount = new AtomicLong(0);
-	// 最大连接数
-	private int maxClientConnection = 0;
-
-	private ClientBootstrap bootstrap;
-
-	public NettyClient(URL url) {
-		super(url);
-
-		maxClientConnection = url.getIntParameter(URLParamType.maxClientConnection.getName(),
-				URLParamType.maxClientConnection.getIntValue());
-
-		timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay(
-				new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" + url.getPort()),
-				MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD,
-				TimeUnit.MILLISECONDS);
-	}
-
-	@Override
-	public Response request(Request request) throws TransportException {
-		if (!isAvailable()) {
-			throw new MotanServiceException("NettyChannel is unavaliable: url=" + url.getUri()
-					+ MotanFrameworkUtil.toString(request));
-		}
-		boolean isAsync = false;
-		Object async = RpcContext.getContext().getAttribute(MotanConstants.ASYNC_SUFFIX);
-		if(async != null && async instanceof Boolean){
-		    isAsync = (Boolean)async;
-		}
-		return request(request, isAsync);
-	}
-
-	@Override
-	public void heartbeat(Request request) {
-		// 如果节点还没有初始化或者节点已经被close掉了,那么heartbeat也不需要进行了
-		if (state.isUnInitState() || state.isCloseState()) {
-			LoggerUtil.warn("NettyClient heartbeat Error: state={} url={}", state.name(), url.getUri());
-			return;
-		}
-
-		LoggerUtil.info("NettyClient heartbeat request: url={}", url.getUri());
-
-		try {
-			// async request后,如果service is
-			// available,那么将会自动把该client设置成可用
-			request(request, true);
-		} catch (Exception e) {
-			LoggerUtil.error("NettyClient heartbeat Error: url=" + url.getUri() + ", " + e.getMessage());
-		}
-	}
-
-	/**
-	 * 请求remote service
-	 *
-	 * 
-	 * 		1)  get connection from pool
-	 * 		2)  async requset
-	 * 		3)  return connection to pool
-	 * 		4)  check if async return response, true: return ResponseFuture;  false: return result
-	 * 
- * - * @param request - * @param async - * @return - * @throws TransportException - */ - private Response request(Request request, boolean async) throws TransportException { - Channel channel = null; - - Response response = null; - - try { - // return channel or throw exception(timeout or connection_fail) - channel = borrowObject(); - MotanFrameworkUtil.logRequestEvent(request.getRequestId(), "after get server connection " + this.getUrl().getServerPortStr(), System.currentTimeMillis()); - - if (channel == null) { - LoggerUtil.error("NettyClient borrowObject null: url=" + url.getUri() + " " - + MotanFrameworkUtil.toString(request)); - return null; - } - - // async request - response = channel.request(request); - // return channel to pool - returnObject(channel); - } catch (Exception e) { - LoggerUtil.error( - "NettyClient request Error: url=" + url.getUri() + " " + MotanFrameworkUtil.toString(request) + ", " + e.getMessage()); - //TODO 对特定的异常回收channel - invalidateObject(channel); - - if (e instanceof MotanAbstractException) { - throw (MotanAbstractException) e; - } else { - throw new MotanServiceException("NettyClient request Error: url=" + url.getUri() + " " - + MotanFrameworkUtil.toString(request), e); - } - } - - // aysnc or sync result - response = asyncResponse(response, async); - - return response; - } - - /** - * 如果async是false,那么同步获取response的数据 - * - * @param response - * @param async - * @return - */ - private Response asyncResponse(Response response, boolean async) { - if (async || !(response instanceof ResponseFuture)) { - return response; - } - - return new DefaultResponse(response); - } - - @Override - public synchronized boolean open() { - if (isAvailable()) { - return true; - } - - // 初始化netty client bootstrap - initClientBootstrap(); - - // 初始化连接池 - initPool(); - - LoggerUtil.info("NettyClient finish Open: url={}", url); - - // 注册统计回调 - StatsUtil.registryStatisticCallback(this); - - // 设置可用状态 - state = ChannelState.ALIVE; - return state.isAliveState(); - } - - /** - * 初始化 netty clientBootstrap - */ - private void initClientBootstrap() { - bootstrap = new ClientBootstrap(channelFactory); - - bootstrap.setOption("keepAlive", true); - bootstrap.setOption("tcpNoDelay", true); - - // 实际上,极端情况下,connectTimeout会达到500ms,因为netty nio的实现中,是依赖BossThread来控制超时, - // 如果为了严格意义的timeout,那么需要应用端进行控制。 - int timeout = getUrl().getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue()); + private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory( + Executors.newCachedThreadPool(new DefaultThreadFactory("nettyClientBoss", true)), + Executors.newCachedThreadPool(new DefaultThreadFactory("nettyClientWorker", true))); + + // 回收过期任务 + private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(4); + + // 异步的request,需要注册callback future + // 触发remove的操作有: 1) service的返回结果处理。 2) timeout thread cancel + protected ConcurrentMap callbackMap = new ConcurrentHashMap(); + + private ScheduledFuture timeMonitorFuture = null; + + + // 连续失败次数 + private AtomicLong errorCount = new AtomicLong(0); + // 最大连接数 + private int maxClientConnection = 0; + + private ClientBootstrap bootstrap; + + public NettyClient(URL url) { + super(url); + + maxClientConnection = url.getIntParameter(URLParamType.maxClientConnection.getName(), + URLParamType.maxClientConnection.getIntValue()); + + timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay( + new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" + url.getPort()), + MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, + TimeUnit.MILLISECONDS); + } + + @Override + public Response request(Request request) throws TransportException { + if (!isAvailable()) { + throw new MotanServiceException("NettyChannel is unavaliable: url=" + url.getUri() + + MotanFrameworkUtil.toString(request)); + } + boolean isAsync = false; + Object async = RpcContext.getContext().getAttribute(MotanConstants.ASYNC_SUFFIX); + if (async != null && async instanceof Boolean) { + isAsync = (Boolean) async; + } + return request(request, isAsync); + } + + @Override + public void heartbeat(Request request) { + // 如果节点还没有初始化或者节点已经被close掉了,那么heartbeat也不需要进行了 + if (state.isUnInitState() || state.isCloseState()) { + LoggerUtil.warn("NettyClient heartbeat Error: state={} url={}", state.name(), url.getUri()); + return; + } + + LoggerUtil.info("NettyClient heartbeat request: url={}", url.getUri()); + + try { + // async request后,如果service is + // available,那么将会自动把该client设置成可用 + request(request, true); + } catch (Exception e) { + LoggerUtil.error("NettyClient heartbeat Error: url=" + url.getUri() + ", " + e.getMessage()); + } + } + + /** + * 请求remote service + * + *
+     * 		1)  get connection from pool
+     * 		2)  async requset
+     * 		3)  return connection to pool
+     * 		4)  check if async return response, true: return ResponseFuture;  false: return result
+     * 
+ * + * @param request + * @param async + * @return + * @throws TransportException + */ + private Response request(Request request, boolean async) throws TransportException { + Channel channel = null; + + Response response = null; + + try { + // return channel or throw exception(timeout or connection_fail) + channel = borrowObject(); + MotanFrameworkUtil.logRequestEvent(request.getRequestId(), "after get server connection " + this.getUrl().getServerPortStr(), System.currentTimeMillis()); + + if (channel == null) { + LoggerUtil.error("NettyClient borrowObject null: url=" + url.getUri() + " " + + MotanFrameworkUtil.toString(request)); + return null; + } + + // async request + response = channel.request(request); + // return channel to pool + returnObject(channel); + } catch (Exception e) { + LoggerUtil.error( + "NettyClient request Error: url=" + url.getUri() + " " + MotanFrameworkUtil.toString(request) + ", " + e.getMessage()); + //TODO 对特定的异常回收channel + invalidateObject(channel); + + if (e instanceof MotanAbstractException) { + throw (MotanAbstractException) e; + } else { + throw new MotanServiceException("NettyClient request Error: url=" + url.getUri() + " " + + MotanFrameworkUtil.toString(request), e); + } + } + + // aysnc or sync result + response = asyncResponse(response, async); + + return response; + } + + /** + * 如果async是false,那么同步获取response的数据 + * + * @param response + * @param async + * @return + */ + private Response asyncResponse(Response response, boolean async) { + if (async || !(response instanceof ResponseFuture)) { + return response; + } + + return new DefaultResponse(response); + } + + @Override + public synchronized boolean open() { + if (isAvailable()) { + return true; + } + + // 初始化netty client bootstrap + initClientBootstrap(); + + // 初始化连接池 + initPool(); + + LoggerUtil.info("NettyClient finish Open: url={}", url); + + // 注册统计回调 + StatsUtil.registryStatisticCallback(this); + + // 设置可用状态 + state = ChannelState.ALIVE; + return state.isAliveState(); + } + + /** + * 初始化 netty clientBootstrap + */ + private void initClientBootstrap() { + bootstrap = new ClientBootstrap(channelFactory); + + bootstrap.setOption("keepAlive", true); + bootstrap.setOption("tcpNoDelay", true); + + // 实际上,极端情况下,connectTimeout会达到500ms,因为netty nio的实现中,是依赖BossThread来控制超时, + // 如果为了严格意义的timeout,那么需要应用端进行控制。 + int timeout = getUrl().getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue()); if (timeout <= 0) { throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.", MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR); } - bootstrap.setOption("connectTimeoutMillis", timeout); - - // 最大响应包限制 - final int maxContentLength = url.getIntParameter(URLParamType.maxContentLength.getName(), - URLParamType.maxContentLength.getIntValue()); - - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() { - final ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("decoder", new NettyDecoder(codec, NettyClient.this, maxContentLength)); - pipeline.addLast("encoder", new NettyEncoder(codec, NettyClient.this)); - pipeline.addLast("handler", new NettyChannelHandler(NettyClient.this, new MessageHandler() { - @Override - public Object handle(Channel channel, Object message) { - Response response = (Response) message; - - ResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId()); - - if (responseFuture == null) { - LoggerUtil.warn( - "NettyClient has response from server, but resonseFuture not exist, requestId={}", - response.getRequestId()); - return null; - } - - if (response.getException() != null) { - responseFuture.onFailure(response); - } else { - responseFuture.onSuccess(response); - } - - return null; - } - })); - return pipeline; - } - }); - } - - @Override - public synchronized void close() { - close(0); - } - - /** - * 目前close不支持timeout的概念 - */ - @Override - public synchronized void close(int timeout) { - if (state.isCloseState()) { - LoggerUtil.info("NettyClient close fail: already close, url={}", url.getUri()); - return; - } - - // 如果当前nettyClient还没有初始化,那么就没有close的理由。 - if (state.isUnInitState()) { - LoggerUtil.info("NettyClient close Fail: don't need to close because node is unInit state: url={}", - url.getUri()); - return; - } - - try { - // 取消定期的回收任务 - timeMonitorFuture.cancel(true); - // 关闭连接池 - pool.close(); - // 清空callback - callbackMap.clear(); - - // 设置close状态 - state = ChannelState.CLOSE; - // 解除统计回调的注册 - StatsUtil.unRegistryStatisticCallback(this); - LoggerUtil.info("NettyClient close Success: url={}", url.getUri()); - } catch (Exception e) { - LoggerUtil.error("NettyClient close Error: url=" + url.getUri(), e); - } - - } - - @Override - public boolean isClosed() { - return state.isCloseState(); - } - - @Override - public boolean isAvailable() { - return state.isAliveState(); - } - - @Override - public URL getUrl() { - return url; - } - - /** - * connection factory - */ - @Override - protected BasePoolableObjectFactory createChannelFactory() { - return new NettyChannelFactory(this); - } - - /** - * 增加调用失败的次数: - * - *
-	 * 	 	如果连续失败的次数 >= maxClientConnection, 那么把client设置成不可用状态
-	 * 
- * - */ - void incrErrorCount() { - long count = errorCount.incrementAndGet(); - - // 如果节点是可用状态,同时当前连续失败的次数超过限制maxClientConnection次,那么把该节点标示为不可用 - if (count >= maxClientConnection && state.isAliveState()) { - synchronized (this) { - count = errorCount.longValue(); - - if (count >= maxClientConnection && state.isAliveState()) { - LoggerUtil.error("NettyClient unavailable Error: url=" + url.getIdentity() + " " - + url.getServerPortStr()); - state = ChannelState.UNALIVE; - } - } - } - } - - /** - * 重置调用失败的计数 : - * - *
-	 * 把节点设置成可用
-	 * 
- * - */ - void resetErrorCount() { - errorCount.set(0); - - if (state.isAliveState()) { - return; - } - - synchronized (this) { - if (state.isAliveState()) { - return; - } - - // 如果节点是unalive才进行设置,而如果是 close 或者 uninit,那么直接忽略 - if (state.isUnAliveState()) { - long count = errorCount.longValue(); - - // 过程中有其他并发更新errorCount的,因此这里需要进行一次判断 - if (count < maxClientConnection) { - state = ChannelState.ALIVE; - LoggerUtil.info("NettyClient recover available: url=" + url.getIdentity() + " " - + url.getServerPortStr()); - } - } - } - } - - /** - * 注册回调的resposne - * - *
-	 *
-	 * 		进行最大的请求并发数的控制,如果超过NETTY_CLIENT_MAX_REQUEST的话,那么throw reject exception
-	 *
-	 * 
- * - * @throws MotanServiceException - * @param requestId - * @param nettyResponseFuture - */ - public void registerCallback(long requestId, ResponseFuture nettyResponseFuture) { - if (this.callbackMap.size() >= MotanConstants.NETTY_CLIENT_MAX_REQUEST) { - // reject request, prevent from OutOfMemoryError - throw new MotanServiceException("NettyClient over of max concurrent request, drop request, url: " - + url.getUri() + " requestId=" + requestId, MotanErrorMsgConstant.SERVICE_REJECT); - } - - this.callbackMap.put(requestId, nettyResponseFuture); - } - - /** - * 统计回调接口 - */ - @Override - public String statisticCallback() { - //避免消息泛滥,如果节点是可用状态,并且堆积的请求不超过100的话,那么就不记录log了 - if (isAvailable() && callbackMap.size() < 100) { - return null; - } - - return String.format("identity: %s available: %s concurrent_count: %s", url.getIdentity(), isAvailable(), - callbackMap.size()); - } - - /** - * 移除回调的response - * - * @param requestId - * @return - */ - public ResponseFuture removeCallback(long requestId) { - return callbackMap.remove(requestId); - } - - public ClientBootstrap getBootstrap() { - return bootstrap; - } - - /** - * 回收超时任务 - * - * @author maijunsheng - * - */ - class TimeoutMonitor implements Runnable { - private String name; - - public TimeoutMonitor(String name) { - this.name = name; - } - - @Override - public void run() { - - long currentTime = System.currentTimeMillis(); - - for (Map.Entry entry : callbackMap.entrySet()) { - try { - ResponseFuture future = entry.getValue(); - - if (future.getCreateTime() + future.getTimeout() < currentTime) { - // timeout: remove from callback list, and then cancel - removeCallback(entry.getKey()); - future.cancel(); - } - } catch (Exception e) { - LoggerUtil.error( - name + " clear timeout future Error: uri=" + url.getUri() + " requestId=" + entry.getKey(), - e); - } - } - } - } + bootstrap.setOption("connectTimeoutMillis", timeout); + + // 最大响应包限制 + final int maxContentLength = url.getIntParameter(URLParamType.maxContentLength.getName(), + URLParamType.maxContentLength.getIntValue()); + + bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() { + final ChannelPipeline pipeline = Channels.pipeline(); + pipeline.addLast("decoder", new NettyDecoder(codec, NettyClient.this, maxContentLength)); + pipeline.addLast("encoder", new NettyEncoder(codec, NettyClient.this)); + pipeline.addLast("handler", new NettyChannelHandler(NettyClient.this, new MessageHandler() { + @Override + public Object handle(Channel channel, Object message) { + Response response = (Response) message; + + ResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId()); + + if (responseFuture == null) { + LoggerUtil.warn( + "NettyClient has response from server, but resonseFuture not exist, requestId={}", + response.getRequestId()); + return null; + } + + if (response.getException() != null) { + responseFuture.onFailure(response); + } else { + responseFuture.onSuccess(response); + } + + return null; + } + })); + return pipeline; + } + }); + } + + @Override + public synchronized void close() { + close(0); + } + + /** + * 目前close不支持timeout的概念 + */ + @Override + public synchronized void close(int timeout) { + try { + if (state.isCloseState() || state.isUnInitState()) { + LoggerUtil.info("NettyClient close fail: state={}, url={}", state.value, url.getUri()); + cleanup(); + return; + } + + cleanup(); + // 设置close状态 + state = ChannelState.CLOSE; + LoggerUtil.info("NettyClient close Success: url={}", url.getUri()); + } catch (Exception e) { + LoggerUtil.error("NettyClient close Error: url=" + url.getUri(), e); + } + } + + public void cleanup() throws Exception { + // 取消定期的回收任务 + timeMonitorFuture.cancel(true); + // 关闭连接池 + if (pool != null) { + pool.close(); + } + // 清空callback + callbackMap.clear(); + // 解除统计回调的注册 + StatsUtil.unRegistryStatisticCallback(this); + } + + @Override + public boolean isClosed() { + return state.isCloseState(); + } + + @Override + public boolean isAvailable() { + return state.isAliveState(); + } + + @Override + public URL getUrl() { + return url; + } + + /** + * connection factory + */ + @Override + protected BasePoolableObjectFactory createChannelFactory() { + return new NettyChannelFactory(this); + } + + /** + * 增加调用失败的次数: + * + *
+     * 	 	如果连续失败的次数 >= maxClientConnection, 那么把client设置成不可用状态
+     * 
+ */ + void incrErrorCount() { + long count = errorCount.incrementAndGet(); + + // 如果节点是可用状态,同时当前连续失败的次数超过限制maxClientConnection次,那么把该节点标示为不可用 + if (count >= maxClientConnection && state.isAliveState()) { + synchronized (this) { + count = errorCount.longValue(); + + if (count >= maxClientConnection && state.isAliveState()) { + LoggerUtil.error("NettyClient unavailable Error: url=" + url.getIdentity() + " " + + url.getServerPortStr()); + state = ChannelState.UNALIVE; + } + } + } + } + + /** + * 重置调用失败的计数 : + * + *
+     * 把节点设置成可用
+     * 
+ */ + void resetErrorCount() { + errorCount.set(0); + + if (state.isAliveState()) { + return; + } + + synchronized (this) { + if (state.isAliveState()) { + return; + } + + // 如果节点是unalive才进行设置,而如果是 close 或者 uninit,那么直接忽略 + if (state.isUnAliveState()) { + long count = errorCount.longValue(); + + // 过程中有其他并发更新errorCount的,因此这里需要进行一次判断 + if (count < maxClientConnection) { + state = ChannelState.ALIVE; + LoggerUtil.info("NettyClient recover available: url=" + url.getIdentity() + " " + + url.getServerPortStr()); + } + } + } + } + + /** + * 注册回调的resposne + * + *
+     *
+     * 		进行最大的请求并发数的控制,如果超过NETTY_CLIENT_MAX_REQUEST的话,那么throw reject exception
+     *
+     * 
+ * + * @param requestId + * @param nettyResponseFuture + * @throws MotanServiceException + */ + public void registerCallback(long requestId, ResponseFuture nettyResponseFuture) { + if (this.callbackMap.size() >= MotanConstants.NETTY_CLIENT_MAX_REQUEST) { + // reject request, prevent from OutOfMemoryError + throw new MotanServiceException("NettyClient over of max concurrent request, drop request, url: " + + url.getUri() + " requestId=" + requestId, MotanErrorMsgConstant.SERVICE_REJECT); + } + + this.callbackMap.put(requestId, nettyResponseFuture); + } + + /** + * 统计回调接口 + */ + @Override + public String statisticCallback() { + //避免消息泛滥,如果节点是可用状态,并且堆积的请求不超过100的话,那么就不记录log了 + if (isAvailable() && callbackMap.size() < 100) { + return null; + } + + return String.format("identity: %s available: %s concurrent_count: %s", url.getIdentity(), isAvailable(), + callbackMap.size()); + } + + /** + * 移除回调的response + * + * @param requestId + * @return + */ + public ResponseFuture removeCallback(long requestId) { + return callbackMap.remove(requestId); + } + + public ClientBootstrap getBootstrap() { + return bootstrap; + } + + /** + * 回收超时任务 + * + * @author maijunsheng + */ + class TimeoutMonitor implements Runnable { + private String name; + + public TimeoutMonitor(String name) { + this.name = name; + } + + @Override + public void run() { + + long currentTime = System.currentTimeMillis(); + + for (Map.Entry entry : callbackMap.entrySet()) { + try { + ResponseFuture future = entry.getValue(); + + if (future.getCreateTime() + future.getTimeout() < currentTime) { + // timeout: remove from callback list, and then cancel + removeCallback(entry.getKey()); + future.cancel(); + } + } catch (Exception e) { + LoggerUtil.error( + name + " clear timeout future Error: uri=" + url.getUri() + " requestId=" + entry.getKey(), + e); + } + } + } + } } diff --git a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyServer.java b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyServer.java index ecede158a..07a1605f3 100644 --- a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyServer.java +++ b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyServer.java @@ -160,35 +160,34 @@ public synchronized void close() { @Override public synchronized void close(int timeout) { - if (state.isCloseState()) { - LoggerUtil.info("NettyServer close fail: already close, url={}", url.getUri()); - return; - } - - if (state.isUnInitState()) { - LoggerUtil.info("NettyServer close Fail: don't need to close because node is unInit state: url={}", - url.getUri()); - return; - } - try { - // close listen socket - serverChannel.close(); - // close all clients's channel - channelManage.close(); - // shutdown the threadPool - standardThreadExecutor.shutdownNow(); + if (state.isCloseState() || state.isUnInitState()) { + LoggerUtil.info("NettyServer close fail: state={}, url={}", state.value, url.getUri()); + cleanup(); + return; + } + + cleanup(); // 设置close状态 state = ChannelState.CLOSE; - // 取消统计回调的注册 - StatsUtil.unRegistryStatisticCallback(nettyChannelHandler); - StatsUtil.unRegistryStatisticCallback(this); LoggerUtil.info("NettyServer close Success: url={}", url.getUri()); } catch (Exception e) { LoggerUtil.error("NettyServer close Error: url=" + url.getUri(), e); } } + public void cleanup() { + // close listen socket + serverChannel.close(); + // close all clients's channel + channelManage.close(); + // shutdown the threadPool + standardThreadExecutor.shutdownNow(); + // 取消统计回调的注册 + StatsUtil.unRegistryStatisticCallback(nettyChannelHandler); + StatsUtil.unRegistryStatisticCallback(this); + } + @Override public boolean isClosed() { return state.isCloseState(); diff --git a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyClient.java b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyClient.java index 13d50eb2f..05735c1bd 100644 --- a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyClient.java +++ b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyClient.java @@ -205,34 +205,34 @@ public synchronized void close() { @Override public synchronized void close(int timeout) { - if (state.isCloseState()) { - LoggerUtil.info("NettyClient close fail: already close, url={}", url.getUri()); - return; - } - - // 如果当前nettyClient还没有初始化,那么就没有close的理由。 - if (state.isUnInitState()) { - LoggerUtil.info("NettyClient close Fail: don't need to close because node is unInit state: url={}", url.getUri()); - return; - } - try { - // 取消定期的回收任务 - timeMonitorFuture.cancel(true); - // 清空callback - callbackMap.clear(); + if (state.isCloseState() || state.isUnInitState()) { + LoggerUtil.info("NettyClient close fail: state={}, url={}", state.value, url.getUri()); + cleanup(); + return; + } + + cleanup(); // 设置close状态 state = ChannelState.CLOSE; - // 关闭client持有的channel - closeAllChannels(); - // 解除统计回调的注册 - StatsUtil.unRegistryStatisticCallback(this); LoggerUtil.info("NettyClient close Success: url={}", url.getUri()); } catch (Exception e) { LoggerUtil.error("NettyClient close Error: url=" + url.getUri(), e); + } finally { + // 解除统计回调的注册 + StatsUtil.unRegistryStatisticCallback(this); } } + public void cleanup() { + // 取消定期的回收任务 + timeMonitorFuture.cancel(true); + // 清空callback + callbackMap.clear(); + // 关闭client持有的channel + closeAllChannels(); + } + @Override public boolean isClosed() { return state.isCloseState(); diff --git a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyServer.java b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyServer.java index b6663ce18..dfc2cfeb4 100644 --- a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyServer.java +++ b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyServer.java @@ -118,43 +118,43 @@ public synchronized void close() { @Override public synchronized void close(int timeout) { - if (state.isCloseState()) { - LoggerUtil.info("NettyServer close fail: already close, url={}", url.getUri()); - return; - } - - if (state.isUnInitState()) { - LoggerUtil.info("NettyServer close Fail: don't need to close because node is unInit state: url={}", url.getUri()); - return; - } - try { - // close listen socket - if (serverChannel != null) { - serverChannel.close(); - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - bossGroup = null; - workerGroup = null; - } - // close all clients's channel - if (channelManage != null) { - channelManage.close(); - } - // shutdown the threadPool - if (standardThreadExecutor != null) { - standardThreadExecutor.shutdownNow(); + if (state.isCloseState() || state.isUnInitState()) { + LoggerUtil.info("NettyServer close fail: state={}, url={}", state.value, url.getUri()); + cleanup(); + return; } + + cleanup(); // 设置close状态 state = ChannelState.CLOSE; - // 取消统计回调的注册 - clearStatisticCallback(); LoggerUtil.info("NettyServer close Success: url={}", url.getUri()); } catch (Exception e) { LoggerUtil.error("NettyServer close Error: url=" + url.getUri(), e); } } + public void cleanup() { + // close listen socket + if (serverChannel != null) { + serverChannel.close(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + bossGroup = null; + workerGroup = null; + } + // close all clients's channel + if (channelManage != null) { + channelManage.close(); + } + // shutdown the threadPool + if (standardThreadExecutor != null) { + standardThreadExecutor.shutdownNow(); + } + // 取消统计回调的注册 + clearStatisticCallback(); + } + @Override public boolean isClosed() { return state.isCloseState(); diff --git a/motan-transport-netty4/src/test/java/com/weibo/api/motan/transport/netty4/NettyServerTest.java b/motan-transport-netty4/src/test/java/com/weibo/api/motan/transport/netty4/NettyServerTest.java index 8c84bb138..4695ac3a3 100644 --- a/motan-transport-netty4/src/test/java/com/weibo/api/motan/transport/netty4/NettyServerTest.java +++ b/motan-transport-netty4/src/test/java/com/weibo/api/motan/transport/netty4/NettyServerTest.java @@ -100,7 +100,6 @@ public Object handle(Channel channel, Object message) { Assert.assertTrue(StatsUtil.getStatisticCallbacks().size() >= minClientConnection + 1); nettyServer.close(); - Thread.sleep(100); Assert.assertEquals(0, StatsUtil.getStatisticCallbacks().size()); } diff --git a/motan-transport-netty4/src/test/resources/log4j.properties b/motan-transport-netty4/src/test/resources/log4j.properties new file mode 100644 index 000000000..1d49534c2 --- /dev/null +++ b/motan-transport-netty4/src/test/resources/log4j.properties @@ -0,0 +1,19 @@ +# +# Copyright 2009-2016 Weibo, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +log4j.rootLogger=error,console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=(%F:%L)|%m%n From 280be47a354736728f2028ec25829a238d6d296f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=BE=BD?= Date: Thu, 9 May 2019 11:37:34 +0800 Subject: [PATCH 3/5] fix: netty3 check channel available --- .../weibo/api/motan/registry/support/AbstractRegistry.java | 4 +--- .../com/weibo/api/motan/transport/netty/NettyChannel.java | 5 ++--- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/motan-core/src/main/java/com/weibo/api/motan/registry/support/AbstractRegistry.java b/motan-core/src/main/java/com/weibo/api/motan/registry/support/AbstractRegistry.java index e10fa6b72..4fdbd77ed 100644 --- a/motan-core/src/main/java/com/weibo/api/motan/registry/support/AbstractRegistry.java +++ b/motan-core/src/main/java/com/weibo/api/motan/registry/support/AbstractRegistry.java @@ -210,9 +210,7 @@ protected void notify(URL refUrl, NotifyListener listener, List urls) { } // refresh local urls cache - for (String nodeType : nodeTypeUrlsInRs.keySet()) { - curls.put(nodeType, nodeTypeUrlsInRs.get(nodeType)); - } + curls.putAll(nodeTypeUrlsInRs); for (List us : nodeTypeUrlsInRs.values()) { listener.notify(getUrl(), us); diff --git a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyChannel.java b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyChannel.java index e57b720d5..bd82d4ca0 100644 --- a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyChannel.java +++ b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyChannel.java @@ -180,8 +180,7 @@ public synchronized void close(int timeout) { channel.close(); } } catch (Exception e) { - LoggerUtil - .error("NettyChannel close Error: " + nettyClient.getUrl().getUri() + " local=" + localAddress, e); + LoggerUtil.error("NettyChannel close Error: " + nettyClient.getUrl().getUri() + " local=" + localAddress, e); } } @@ -202,7 +201,7 @@ public boolean isClosed() { @Override public boolean isAvailable() { - return state.isAliveState(); + return state.isAliveState() && channel != null && channel.isConnected(); } @Override From 8cba0656ec8b3424a7da8933e0c844229b50adfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=BE=BD?= Date: Thu, 9 May 2019 20:19:16 +0800 Subject: [PATCH 4/5] feat: sharedPoolClient optimize rebuildObject when get channel --- .../motan/transport/AbstractSharedPoolClient.java | 13 +++++++------ .../api/motan/transport/SharedObjectFactory.java | 4 +++- .../motan/transport/netty4/NettyChannelFactory.java | 12 ++++++++++-- .../api/motan/transport/netty4/NettyClient.java | 8 +++++--- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/motan-core/src/main/java/com/weibo/api/motan/transport/AbstractSharedPoolClient.java b/motan-core/src/main/java/com/weibo/api/motan/transport/AbstractSharedPoolClient.java index 7906a3c98..722544ad6 100644 --- a/motan-core/src/main/java/com/weibo/api/motan/transport/AbstractSharedPoolClient.java +++ b/motan-core/src/main/java/com/weibo/api/motan/transport/AbstractSharedPoolClient.java @@ -32,7 +32,7 @@ * @author sunnights */ public abstract class AbstractSharedPoolClient extends AbstractClient { - private static final ThreadPoolExecutor executor = new StandardThreadExecutor(1, 300, 20000, + private static final ThreadPoolExecutor EXECUTOR = new StandardThreadExecutor(1, 300, 20000, new DefaultThreadFactory("AbstractPoolClient-initPool-", true)); private final AtomicInteger idx = new AtomicInteger(); protected SharedObjectFactory factory; @@ -62,7 +62,7 @@ protected void initPool() { protected void initConnections(boolean async) { if (async) { - executor.execute(new Runnable() { + EXECUTOR.execute(new Runnable() { @Override public void run() { createConnections(); @@ -83,16 +83,17 @@ private void createConnections() { } } - protected Channel getChannel() throws MotanServiceException { + protected Channel getChannel() { int index = MathUtil.getNonNegativeRange24bit(idx.getAndIncrement()); Channel channel; - for (int i = index; i < connections + index; i++) { + for (int i = index; i < connections + 1 + index; i++) { channel = channels.get(i % connections); + if (!channel.isAvailable()) { + factory.rebuildObject(channel, i != connections + 1); + } if (channel.isAvailable()) { return channel; - } else { - factory.rebuildObject(channel); } } diff --git a/motan-core/src/main/java/com/weibo/api/motan/transport/SharedObjectFactory.java b/motan-core/src/main/java/com/weibo/api/motan/transport/SharedObjectFactory.java index 66e78dbcb..cef04e7ae 100644 --- a/motan-core/src/main/java/com/weibo/api/motan/transport/SharedObjectFactory.java +++ b/motan-core/src/main/java/com/weibo/api/motan/transport/SharedObjectFactory.java @@ -14,9 +14,11 @@ public interface SharedObjectFactory { /** * 重建对象 + * * @param obj + * @param async * @return */ - boolean rebuildObject(T obj); + boolean rebuildObject(T obj, boolean async); } diff --git a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannelFactory.java b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannelFactory.java index d9162054d..1effd061c 100644 --- a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannelFactory.java +++ b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannelFactory.java @@ -31,14 +31,22 @@ public NettyChannel makeObject() { } @Override - public boolean rebuildObject(NettyChannel nettyChannel) { + public boolean rebuildObject(NettyChannel nettyChannel, boolean async) { ReentrantLock lock = nettyChannel.getLock(); if (lock.tryLock()) { try { if (!nettyChannel.isAvailable() && !nettyChannel.isReconnect()) { nettyChannel.reconnect(); - rebuildExecutorService.submit(new RebuildTask(nettyChannel)); + if (async) { + rebuildExecutorService.submit(new RebuildTask(nettyChannel)); + } else { + nettyChannel.close(); + nettyChannel.open(); + LoggerUtil.info("rebuild channel success: " + nettyChannel.getUrl()); + } } + } catch (Exception e) { + LoggerUtil.error("rebuild error: " + this.toString() + ", " + nettyChannel.getUrl(), e); } finally { lock.unlock(); } diff --git a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyClient.java b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyClient.java index 05735c1bd..108268d33 100644 --- a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyClient.java +++ b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyClient.java @@ -41,6 +41,7 @@ public class NettyClient extends AbstractSharedPoolClient implements StatisticCa protected ConcurrentMap callbackMap = new ConcurrentHashMap<>(); private ScheduledFuture timeMonitorFuture = null; private Bootstrap bootstrap; + private int maxClientConnection; /** * 连续失败次数 */ @@ -48,6 +49,7 @@ public class NettyClient extends AbstractSharedPoolClient implements StatisticCa public NettyClient(URL url) { super(url); + maxClientConnection = url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue()); timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay( new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" + url.getPort()), MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, @@ -273,11 +275,11 @@ void incrErrorCount() { long count = errorCount.incrementAndGet(); // 如果节点是可用状态,同时当前连续失败的次数超过连接数,那么把该节点标示为不可用 - if (count >= connections && state.isAliveState()) { + if (count >= maxClientConnection && state.isAliveState()) { synchronized (this) { count = errorCount.longValue(); - if (count >= connections && state.isAliveState()) { + if (count >= maxClientConnection && state.isAliveState()) { LoggerUtil.error("NettyClient unavailable Error: url=" + url.getIdentity() + " " + url.getServerPortStr()); state = ChannelState.UNALIVE; @@ -309,7 +311,7 @@ void resetErrorCount() { long count = errorCount.longValue(); // 过程中有其他并发更新errorCount的,因此这里需要进行一次判断 - if (count < connections) { + if (count < maxClientConnection) { state = ChannelState.ALIVE; LoggerUtil.info("NettyClient recover available: url=" + url.getIdentity() + " " + url.getServerPortStr()); From 665aeadb0720d8d2e79f213875f8f9be49f1d411 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=BE=BD?= Date: Wed, 15 May 2019 17:43:46 +0800 Subject: [PATCH 5/5] refactor: NettyServer statistic callback --- .../transport/AbstractSharedPoolClient.java | 7 ++- .../com/weibo/api/motan/util/StatsUtil.java | 14 ------ .../motan/transport/netty/NettyClient.java | 9 ++-- .../motan/transport/netty/NettyServer.java | 9 ++-- .../transport/netty/NettyClientTest.java | 6 +-- .../transport/netty4/NettyChannelHandler.java | 19 ++------ .../motan/transport/netty4/NettyClient.java | 14 +++--- .../motan/transport/netty4/NettyServer.java | 43 ++++++++++--------- .../transport/netty4/NettyClientTest.java | 4 +- .../transport/netty4/NettyServerTest.java | 31 ------------- .../src/test/resources/log4j.properties | 4 +- 11 files changed, 56 insertions(+), 104 deletions(-) diff --git a/motan-core/src/main/java/com/weibo/api/motan/transport/AbstractSharedPoolClient.java b/motan-core/src/main/java/com/weibo/api/motan/transport/AbstractSharedPoolClient.java index 722544ad6..05f5f9058 100644 --- a/motan-core/src/main/java/com/weibo/api/motan/transport/AbstractSharedPoolClient.java +++ b/motan-core/src/main/java/com/weibo/api/motan/transport/AbstractSharedPoolClient.java @@ -21,6 +21,7 @@ import com.weibo.api.motan.core.StandardThreadExecutor; import com.weibo.api.motan.exception.MotanServiceException; import com.weibo.api.motan.rpc.URL; +import com.weibo.api.motan.util.CollectionUtil; import com.weibo.api.motan.util.LoggerUtil; import com.weibo.api.motan.util.MathUtil; @@ -103,8 +104,10 @@ protected Channel getChannel() { } protected void closeAllChannels() { - for (Channel channel : channels) { - channel.close(); + if (!CollectionUtil.isEmpty(channels)) { + for (Channel channel : channels) { + channel.close(); + } } } } diff --git a/motan-core/src/main/java/com/weibo/api/motan/util/StatsUtil.java b/motan-core/src/main/java/com/weibo/api/motan/util/StatsUtil.java index 6fbce3d18..6b5d47f81 100644 --- a/motan-core/src/main/java/com/weibo/api/motan/util/StatsUtil.java +++ b/motan-core/src/main/java/com/weibo/api/motan/util/StatsUtil.java @@ -27,7 +27,6 @@ import org.apache.commons.lang3.StringUtils; import java.text.DecimalFormat; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.*; @@ -71,10 +70,6 @@ public void close() { }); } - public static List getStatisticCallbacks() { - return statisticCallbacks; - } - public static void registryStatisticCallback(StatisticCallback callback) { if (callback == null) { LoggerUtil.warn("StatsUtil registryStatisticCallback is null"); @@ -93,15 +88,6 @@ public static void unRegistryStatisticCallback(StatisticCallback callback) { statisticCallbacks.remove(callback); } - public static void unRegistryStatisticCallbacks(Collection callbacks) { - if (CollectionUtil.isEmpty(callbacks)) { - LoggerUtil.warn("StatsUtil unRegistryStatisticCallbacks is empty"); - return; - } - - statisticCallbacks.removeAll(callbacks); - } - /** * callStatus: 0 is normal, 1 is bizExceptin, 2 is otherException * diff --git a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyClient.java b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyClient.java index 6b59a5495..67dffe308 100644 --- a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyClient.java +++ b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyClient.java @@ -285,14 +285,17 @@ public synchronized void close() { */ @Override public synchronized void close(int timeout) { + if (state.isCloseState()) { + return; + } + try { - if (state.isCloseState() || state.isUnInitState()) { + cleanup(); + if (state.isUnInitState()) { LoggerUtil.info("NettyClient close fail: state={}, url={}", state.value, url.getUri()); - cleanup(); return; } - cleanup(); // 设置close状态 state = ChannelState.CLOSE; LoggerUtil.info("NettyClient close Success: url={}", url.getUri()); diff --git a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyServer.java b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyServer.java index 07a1605f3..e31054c77 100644 --- a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyServer.java +++ b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyServer.java @@ -160,14 +160,17 @@ public synchronized void close() { @Override public synchronized void close(int timeout) { + if (state.isCloseState()) { + return; + } + try { - if (state.isCloseState() || state.isUnInitState()) { + cleanup(); + if (state.isUnInitState()) { LoggerUtil.info("NettyServer close fail: state={}, url={}", state.value, url.getUri()); - cleanup(); return; } - cleanup(); // 设置close状态 state = ChannelState.CLOSE; LoggerUtil.info("NettyServer close Success: url={}", url.getUri()); diff --git a/motan-transport-netty/src/test/java/com/weibo/api/motan/transport/netty/NettyClientTest.java b/motan-transport-netty/src/test/java/com/weibo/api/motan/transport/netty/NettyClientTest.java index 36eca163e..b04f43ebe 100644 --- a/motan-transport-netty/src/test/java/com/weibo/api/motan/transport/netty/NettyClientTest.java +++ b/motan-transport-netty/src/test/java/com/weibo/api/motan/transport/netty/NettyClientTest.java @@ -23,7 +23,6 @@ import com.weibo.api.motan.transport.Channel; import com.weibo.api.motan.transport.MessageHandler; import com.weibo.api.motan.util.RequestIdGenerator; -import com.weibo.api.motan.util.StatsUtil; import junit.framework.Assert; import org.junit.After; import org.junit.Before; @@ -74,12 +73,9 @@ public Object handle(Channel channel, Object message) { } @After - public void tearDown() throws InterruptedException { + public void tearDown() { nettyClient.close(); nettyServer.close(); - - Thread.sleep(100); - assertEquals(0, StatsUtil.getStatisticCallbacks().size()); } @Test diff --git a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannelHandler.java b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannelHandler.java index 5185df84c..863bf5813 100644 --- a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannelHandler.java +++ b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannelHandler.java @@ -13,7 +13,6 @@ import com.weibo.api.motan.util.LoggerUtil; import com.weibo.api.motan.util.MotanFrameworkUtil; import com.weibo.api.motan.util.NetUtils; -import com.weibo.api.motan.util.StatisticCallback; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -23,17 +22,15 @@ import java.net.SocketAddress; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicInteger; /** * @author sunnights */ -public class NettyChannelHandler extends ChannelDuplexHandler implements StatisticCallback { +public class NettyChannelHandler extends ChannelDuplexHandler { private ThreadPoolExecutor threadPoolExecutor; private MessageHandler messageHandler; private Channel channel; private Codec codec; - private AtomicInteger rejectCounter = new AtomicInteger(0); public NettyChannelHandler(Channel channel, MessageHandler messageHandler) { this.channel = channel; @@ -102,7 +99,9 @@ private void rejectMessage(ChannelHandlerContext ctx, NettyMessage msg) { LoggerUtil.error("process thread pool is full, reject, active={} poolSize={} corePoolSize={} maxPoolSize={} taskCount={} requestId={}", threadPoolExecutor.getActiveCount(), threadPoolExecutor.getPoolSize(), threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getTaskCount(), msg.getRequestId()); - rejectCounter.incrementAndGet(); + if (channel instanceof NettyServer) { + ((NettyServer) channel).getRejectCounter().incrementAndGet(); + } } } @@ -210,14 +209,4 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E LoggerUtil.error("NettyChannelHandler exceptionCaught: remote={} local={} event={}", ctx.channel().remoteAddress(), ctx.channel().localAddress(), cause.getMessage(), cause); ctx.channel().close(); } - - @Override - public String statisticCallback() { - int count = rejectCounter.getAndSet(0); - if (count > 0) { - return String.format("type: motan name: reject_request_pool total_count: %s reject_count: %s", threadPoolExecutor.getPoolSize(), count); - } else { - return null; - } - } } diff --git a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyClient.java b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyClient.java index 108268d33..dbe017aa8 100644 --- a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyClient.java +++ b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyClient.java @@ -207,22 +207,22 @@ public synchronized void close() { @Override public synchronized void close(int timeout) { + if (state.isCloseState()) { + return; + } + try { - if (state.isCloseState() || state.isUnInitState()) { + cleanup(); + if (state.isUnInitState()) { LoggerUtil.info("NettyClient close fail: state={}, url={}", state.value, url.getUri()); - cleanup(); return; } - cleanup(); // 设置close状态 state = ChannelState.CLOSE; LoggerUtil.info("NettyClient close Success: url={}", url.getUri()); } catch (Exception e) { LoggerUtil.error("NettyClient close Error: url=" + url.getUri(), e); - } finally { - // 解除统计回调的注册 - StatsUtil.unRegistryStatisticCallback(this); } } @@ -233,6 +233,8 @@ public void cleanup() { callbackMap.clear(); // 关闭client持有的channel closeAllChannels(); + // 解除统计回调的注册 + StatsUtil.unRegistryStatisticCallback(this); } @Override diff --git a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyServer.java b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyServer.java index dfc2cfeb4..f5aa01c21 100644 --- a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyServer.java +++ b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyServer.java @@ -22,8 +22,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** * @author sunnights @@ -35,7 +34,12 @@ public class NettyServer extends AbstractServer implements StatisticCallback { private Channel serverChannel; private MessageHandler messageHandler; private StandardThreadExecutor standardThreadExecutor = null; - private List statisticCallbackList = new ArrayList<>(); + + private AtomicInteger rejectCounter = new AtomicInteger(0); + + public AtomicInteger getRejectCounter() { + return rejectCounter; + } public NettyServer(URL url, MessageHandler messageHandler) { super(url); @@ -97,7 +101,6 @@ protected void initChannel(SocketChannel ch) throws Exception { pipeline.addLast("encoder", new NettyEncoder()); NettyChannelHandler handler = new NettyChannelHandler(NettyServer.this, messageHandler, standardThreadExecutor); pipeline.addLast("handler", handler); - addStatisticCallback(handler); } }); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true); @@ -106,7 +109,7 @@ protected void initChannel(SocketChannel ch) throws Exception { channelFuture.syncUninterruptibly(); serverChannel = channelFuture.channel(); state = ChannelState.ALIVE; - addStatisticCallback(this); + StatsUtil.registryStatisticCallback(this); LoggerUtil.info("NettyServer ServerChannel finish Open: url=" + url); return state.isAliveState(); } @@ -118,14 +121,17 @@ public synchronized void close() { @Override public synchronized void close(int timeout) { + if (state.isCloseState()) { + return; + } + try { - if (state.isCloseState() || state.isUnInitState()) { + cleanup(); + if (state.isUnInitState()) { LoggerUtil.info("NettyServer close fail: state={}, url={}", state.value, url.getUri()); - cleanup(); return; } - cleanup(); // 设置close状态 state = ChannelState.CLOSE; LoggerUtil.info("NettyServer close Success: url={}", url.getUri()); @@ -138,9 +144,13 @@ public void cleanup() { // close listen socket if (serverChannel != null) { serverChannel.close(); + } + if (bossGroup != null) { bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); bossGroup = null; + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); workerGroup = null; } // close all clients's channel @@ -152,7 +162,7 @@ public void cleanup() { standardThreadExecutor.shutdownNow(); } // 取消统计回调的注册 - clearStatisticCallback(); + StatsUtil.unRegistryStatisticCallback(this); } @Override @@ -172,18 +182,9 @@ public URL getUrl() { @Override public String statisticCallback() { - return String.format("identity: %s connectionCount: %s taskCount: %s queueCount: %s maxThreadCount: %s maxTaskCount: %s", + return String.format("identity: %s connectionCount: %s taskCount: %s queueCount: %s maxThreadCount: %s maxTaskCount: %s executorRejectCount: %s", url.getIdentity(), channelManage.getChannels().size(), standardThreadExecutor.getSubmittedTasksCount(), standardThreadExecutor.getQueue().size(), standardThreadExecutor.getMaximumPoolSize(), - standardThreadExecutor.getMaxSubmittedTaskCount()); - } - - private void addStatisticCallback(StatisticCallback callback) { - StatsUtil.registryStatisticCallback(callback); - statisticCallbackList.add(callback); - } - - private void clearStatisticCallback() { - StatsUtil.unRegistryStatisticCallbacks(statisticCallbackList); + standardThreadExecutor.getMaxSubmittedTaskCount(), rejectCounter.getAndSet(0)); } } diff --git a/motan-transport-netty4/src/test/java/com/weibo/api/motan/transport/netty4/NettyClientTest.java b/motan-transport-netty4/src/test/java/com/weibo/api/motan/transport/netty4/NettyClientTest.java index b118a06a8..671f53692 100644 --- a/motan-transport-netty4/src/test/java/com/weibo/api/motan/transport/netty4/NettyClientTest.java +++ b/motan-transport-netty4/src/test/java/com/weibo/api/motan/transport/netty4/NettyClientTest.java @@ -143,7 +143,7 @@ public void testAbNormal() throws InterruptedException { @Test public void testAbNormal2() throws InterruptedException { // 模拟失败连接的次数大于或者等于设置的次数,client期望为不可用 - url.addParameter(URLParamType.minClientConnection.getName(), "1"); + url.addParameter(URLParamType.maxClientConnection.getName(), "1"); url.addParameter(URLParamType.requestTimeout.getName(), "1"); NettyTestClient nettyClient = new NettyTestClient(url); this.nettyClient = nettyClient; @@ -154,7 +154,7 @@ public void testAbNormal2() throws InterruptedException { } catch (Exception e) { } - Thread.sleep(3000); + Thread.sleep(100); try { nettyClient.request(request); } catch (MotanServiceException e) { diff --git a/motan-transport-netty4/src/test/java/com/weibo/api/motan/transport/netty4/NettyServerTest.java b/motan-transport-netty4/src/test/java/com/weibo/api/motan/transport/netty4/NettyServerTest.java index 4695ac3a3..42ff07cfc 100644 --- a/motan-transport-netty4/src/test/java/com/weibo/api/motan/transport/netty4/NettyServerTest.java +++ b/motan-transport-netty4/src/test/java/com/weibo/api/motan/transport/netty4/NettyServerTest.java @@ -6,7 +6,6 @@ import com.weibo.api.motan.rpc.URL; import com.weibo.api.motan.transport.Channel; import com.weibo.api.motan.transport.MessageHandler; -import com.weibo.api.motan.util.StatsUtil; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -73,34 +72,4 @@ public Object handle(Channel channel, Object message) { Assert.assertEquals(0, nettyServer.channelManage.getChannels().size()); } - @Test - public void testCallbacks() throws InterruptedException { - int minClientConnection = 2; - int maxServerConnection = 2; - url.addParameter(URLParamType.minClientConnection.getName(), String.valueOf(minClientConnection)); - url.addParameter(URLParamType.maxServerConnection.getName(), String.valueOf(maxServerConnection)); - url.addParameter(URLParamType.requestTimeout.getName(), "10000"); - nettyServer = new NettyServer(url, new MessageHandler() { - @Override - public Object handle(Channel channel, Object message) { - Request request = (Request) message; - DefaultResponse response = new DefaultResponse(); - response.setRequestId(request.getRequestId()); - response.setValue("method: " + request.getMethodName() + " requestId: " + request.getRequestId()); - return response; - } - }); - nettyServer.open(); - Assert.assertEquals(0, nettyServer.channelManage.getChannels().size()); - - NettyClient nettyClient = new NettyClient(url); - nettyClient.open(); - Thread.sleep(100); - nettyClient.close(); - - Assert.assertTrue(StatsUtil.getStatisticCallbacks().size() >= minClientConnection + 1); - nettyServer.close(); - Assert.assertEquals(0, StatsUtil.getStatisticCallbacks().size()); - } - } \ No newline at end of file diff --git a/motan-transport-netty4/src/test/resources/log4j.properties b/motan-transport-netty4/src/test/resources/log4j.properties index 1d49534c2..5705c84f8 100644 --- a/motan-transport-netty4/src/test/resources/log4j.properties +++ b/motan-transport-netty4/src/test/resources/log4j.properties @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -log4j.rootLogger=error,console +log4j.rootLogger=fatal,console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=(%F:%L)|%m%n +log4j.appender.console.layout.ConversionPattern=%-d{HH:mm:ss,SSS} %t %m%n