Skip to content

Commit

Permalink
Merge pull request #827 from sunnights/dev
Browse files Browse the repository at this point in the history
feat: decouple attachments for request & response
  • Loading branch information
rayzhang0603 authored May 15, 2019
2 parents ed0095d + 665aead commit ff6d3e9
Show file tree
Hide file tree
Showing 16 changed files with 571 additions and 617 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,6 @@

package com.weibo.api.motan.protocol.rpc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.lang3.StringUtils;

import com.weibo.api.motan.codec.AbstractCodec;
import com.weibo.api.motan.codec.Serialization;
import com.weibo.api.motan.common.MotanConstants;
Expand All @@ -40,21 +24,20 @@
import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.exception.MotanErrorMsgConstant;
import com.weibo.api.motan.exception.MotanFrameworkException;
import com.weibo.api.motan.rpc.DefaultRequest;
import com.weibo.api.motan.rpc.DefaultResponse;
import com.weibo.api.motan.rpc.Provider;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.rpc.Response;
import com.weibo.api.motan.rpc.*;
import com.weibo.api.motan.transport.Channel;
import com.weibo.api.motan.transport.support.DefaultRpcHeartbeatFactory;
import com.weibo.api.motan.util.ByteUtil;
import com.weibo.api.motan.util.ConcurrentHashSet;
import com.weibo.api.motan.util.ExceptionUtil;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MotanDigestUtil;
import com.weibo.api.motan.util.MotanFrameworkUtil;
import com.weibo.api.motan.util.MotanSwitcherUtil;
import com.weibo.api.motan.util.ReflectUtil;
import com.weibo.api.motan.util.*;
import org.apache.commons.lang3.StringUtils;

import java.io.*;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
* 压缩协议codec,支持开启gzip压缩。
Expand Down Expand Up @@ -361,7 +344,7 @@ private void replaceAttachmentParamsBySign(Channel channel, Map<String, String>
// attachment中的固定参数使用签名方式传递。首次跟server建立链接时传全部信息,之后只传签名。
AttachmentInfo info = getAttachmentInfoMap(attachments);
if (info != null) {
String sign = info.getAttachmetnSign();
String sign = info.getAttachmentSign();
if (sign != null) {
attachments.put(ATTACHMENT_SIGN, sign);

Expand Down Expand Up @@ -425,20 +408,20 @@ private byte[] encodeResponse(Channel channel, Response value) throws IOExceptio
serialize(output, value.getValue(), serialization);
// v2版本可以在response中添加attachment
Map<String, String> attachments = value.getAttachments();
Map<String, String> responseAttachments = new HashMap<>();
if (attachments != null) {
String signed = attachments.get(ATTACHMENT_SIGN);
String unSigned = attachments.get(UN_ATTACHMENT_SIGN);
attachments.clear(); // 除了attachment签名外不返回其他信息。

if (StringUtils.isNotBlank(signed)) {
attachments.put(ATTACHMENT_SIGN, signed);
responseAttachments.put(ATTACHMENT_SIGN, signed);
}
if (StringUtils.isNotBlank(unSigned)) {
attachments.put(UN_ATTACHMENT_SIGN, unSigned);
responseAttachments.put(UN_ATTACHMENT_SIGN, unSigned);
}
}
if (attachments != null && !attachments.isEmpty()) {// 需要回传附加数据
addAttachment(output, attachments);
if (!responseAttachments.isEmpty()) {// 需要回传附加数据
addAttachment(output, responseAttachments);
} else {
// empty attachments
output.writeShort(0);
Expand Down Expand Up @@ -723,7 +706,6 @@ private void putAttachmentInfoMap(AttachmentInfo attachmentInfo, Map<String, Str
private void removeAttachmentInfoMap(Map<String, String> attachments) {
if (attachments != null) {
attachments.remove(URLParamType.group.name());
attachments.remove(URLParamType.application.name());
attachments.remove(URLParamType.module.name());
attachments.remove(URLParamType.version.name());
}
Expand Down Expand Up @@ -931,13 +913,13 @@ public AttachmentInfo(String group, String application, String module, String ve
this.version = version;
}

public String getAttachmetnSign() {
public String getAttachmentSign() {
String signstr = group + application + module + version;
String hashcodeStr = null;
try {
hashcodeStr = MotanDigestUtil.md5LowerCase(signstr).substring(8, 12); // 取md5中的四个字符。
} catch (Exception e) {
LoggerUtil.warn("getAttachmetnSign fail!" + e.getMessage());
LoggerUtil.warn("getAttachmentSign fail!" + e.getMessage());
}
return hashcodeStr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,7 @@ protected void notify(URL refUrl, NotifyListener listener, List<URL> urls) {
}

// refresh local urls cache
for (String nodeType : nodeTypeUrlsInRs.keySet()) {
curls.put(nodeType, nodeTypeUrlsInRs.get(nodeType));
}
curls.putAll(nodeTypeUrlsInRs);

for (List<URL> us : nodeTypeUrlsInRs.values()) {
listener.notify(getUrl(), us);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.weibo.api.motan.core.StandardThreadExecutor;
import com.weibo.api.motan.exception.MotanServiceException;
import com.weibo.api.motan.rpc.URL;
import com.weibo.api.motan.util.CollectionUtil;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MathUtil;

Expand All @@ -32,7 +33,7 @@
* @author sunnights
*/
public abstract class AbstractSharedPoolClient extends AbstractClient {
private static final ThreadPoolExecutor executor = new StandardThreadExecutor(1, 300, 20000,
private static final ThreadPoolExecutor EXECUTOR = new StandardThreadExecutor(1, 300, 20000,
new DefaultThreadFactory("AbstractPoolClient-initPool-", true));
private final AtomicInteger idx = new AtomicInteger();
protected SharedObjectFactory factory;
Expand Down Expand Up @@ -62,7 +63,7 @@ protected void initPool() {

protected void initConnections(boolean async) {
if (async) {
executor.execute(new Runnable() {
EXECUTOR.execute(new Runnable() {
@Override
public void run() {
createConnections();
Expand All @@ -83,16 +84,17 @@ private void createConnections() {
}
}

protected Channel getChannel() throws MotanServiceException {
protected Channel getChannel() {
int index = MathUtil.getNonNegativeRange24bit(idx.getAndIncrement());
Channel channel;

for (int i = index; i < connections + index; i++) {
for (int i = index; i < connections + 1 + index; i++) {
channel = channels.get(i % connections);
if (!channel.isAvailable()) {
factory.rebuildObject(channel, i != connections + 1);
}
if (channel.isAvailable()) {
return channel;
} else {
factory.rebuildObject(channel);
}
}

Expand All @@ -102,8 +104,10 @@ protected Channel getChannel() throws MotanServiceException {
}

protected void closeAllChannels() {
for (Channel channel : channels) {
channel.close();
if (!CollectionUtil.isEmpty(channels)) {
for (Channel channel : channels) {
channel.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ public interface SharedObjectFactory<T> {

/**
* 重建对象
*
* @param obj
* @param async
* @return
*/
boolean rebuildObject(T obj);
boolean rebuildObject(T obj, boolean async);

}
14 changes: 0 additions & 14 deletions motan-core/src/main/java/com/weibo/api/motan/util/StatsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.commons.lang3.StringUtils;

import java.text.DecimalFormat;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
Expand Down Expand Up @@ -71,10 +70,6 @@ public void close() {
});
}

public static List<StatisticCallback> getStatisticCallbacks() {
return statisticCallbacks;
}

public static void registryStatisticCallback(StatisticCallback callback) {
if (callback == null) {
LoggerUtil.warn("StatsUtil registryStatisticCallback is null");
Expand All @@ -93,15 +88,6 @@ public static void unRegistryStatisticCallback(StatisticCallback callback) {
statisticCallbacks.remove(callback);
}

public static void unRegistryStatisticCallbacks(Collection<StatisticCallback> callbacks) {
if (CollectionUtil.isEmpty(callbacks)) {
LoggerUtil.warn("StatsUtil unRegistryStatisticCallbacks is empty");
return;
}

statisticCallbacks.removeAll(callbacks);
}

/**
* callStatus: 0 is normal, 1 is bizExceptin, 2 is otherException
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ public synchronized void close(int timeout) {
channel.close();
}
} catch (Exception e) {
LoggerUtil
.error("NettyChannel close Error: " + nettyClient.getUrl().getUri() + " local=" + localAddress, e);
LoggerUtil.error("NettyChannel close Error: " + nettyClient.getUrl().getUri() + " local=" + localAddress, e);
}
}

Expand All @@ -202,7 +201,7 @@ public boolean isClosed() {

@Override
public boolean isAvailable() {
return state.isAliveState();
return state.isAliveState() && channel != null && channel.isConnected();
}

@Override
Expand Down
Loading

0 comments on commit ff6d3e9

Please sign in to comment.