Skip to content

Commit

Permalink
Modifying linkis rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Nov 20, 2023
1 parent fa3011f commit 7935cde
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 12 deletions.
4 changes: 4 additions & 0 deletions linkis-commons/linkis-module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-openfeign-core</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
Expand All @@ -66,6 +67,7 @@
@SpringBootApplication(scanBasePackages = {"org.apache.linkis", "com.webank.wedatasphere"})
@EnableDiscoveryClient
@RefreshScope
@EnableFeignClients
public class DataWorkCloudApplication extends SpringBootServletInitializer {
private static final Log logger = LogFactory.getLog(DataWorkCloudApplication.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.rpc.conf;

import org.apache.linkis.DataWorkCloudApplication;

import org.apache.commons.lang3.StringUtils;

import org.springframework.cloud.openfeign.FeignClientBuilder;
import org.springframework.cloud.openfeign.FeignClientFactoryBean;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;

@Component
public class DynamicFeignClient<T> {

private FeignClientBuilder feignClientBuilder;

private final ConcurrentHashMap<String, T> CACHE_BEAN = new ConcurrentHashMap();

public DynamicFeignClient() {
this.feignClientBuilder =
new FeignClientBuilder(DataWorkCloudApplication.getApplicationContext());
}

public T getFeignClient(final Class<T> type, final String serviceName) {
return getFeignClient(type, serviceName, null);
}

public T getFeignClient(
final Class<T> type, final Class<?> fallbackFactory, final String serviceName) {
return getFeignClient(type, fallbackFactory, serviceName, null);
}

public T getFeignClient(
final Class<T> type,
final FeignClientFactoryBean clientFactoryBean,
final String serviceName) {
return getFeignClient(type, clientFactoryBean, serviceName, null);
}

public T getFeignClient(final Class<T> type, String serviceName, final String serviceUrl) {
String k = serviceName;
if (StringUtils.isNotEmpty(serviceUrl)) {
k = serviceUrl;
}
return CACHE_BEAN.computeIfAbsent(
k,
(t) -> {
FeignClientBuilder.Builder<T> builder =
this.feignClientBuilder.forType(type, serviceName);
if (StringUtils.isNotEmpty(serviceUrl)) {
builder.url(serviceUrl);
}
return builder.build();
});
}

public T getFeignClient(
final Class<T> type,
final Class<?> fallbackFactory,
final String serviceName,
final String serviceUrl) {
String k = serviceName;
if (StringUtils.isNotEmpty(serviceUrl)) {
k = serviceUrl;
}
return CACHE_BEAN.computeIfAbsent(
k,
(t) -> {
FeignClientFactoryBean feignClientFactoryBean = new FeignClientFactoryBean();
feignClientFactoryBean.setFallbackFactory(fallbackFactory);
FeignClientBuilder.Builder<T> builder =
this.feignClientBuilder.forType(type, feignClientFactoryBean, serviceName);
if (StringUtils.isNotEmpty(serviceUrl)) {
builder.url(serviceUrl);
}
return builder.build();
});
}

public T getFeignClient(
final Class<T> type,
final FeignClientFactoryBean clientFactoryBean,
final String serviceName,
final String serviceUrl) {
String k = serviceName;
if (StringUtils.isNotEmpty(serviceUrl)) {
k = serviceUrl;
}
return CACHE_BEAN.computeIfAbsent(
k,
(t) -> {
FeignClientBuilder.Builder<T> builder =
this.feignClientBuilder.forType(type, clientFactoryBean, serviceName);
if (StringUtils.isNotEmpty(serviceUrl)) {
builder.url(serviceUrl);
}
return builder.build();
});
}

private T getFromCache(final String serviceName, final String serviceUrl) {
if (StringUtils.isNotEmpty(serviceUrl)) {
return CACHE_BEAN.get(serviceUrl);
} else {
return CACHE_BEAN.get(serviceName);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.rpc.conf;

import org.apache.linkis.rpc.constant.RpcConstant;
import org.apache.linkis.server.security.SSOUtils$;
import org.apache.linkis.server.security.SecurityFilter$;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;

import java.util.Enumeration;

import scala.Tuple2;

import feign.RequestInterceptor;
import feign.RequestTemplate;

@Configuration
public class FeignConfig implements RequestInterceptor {

@Override
public void apply(RequestTemplate requestTemplate) {
ServletRequestAttributes attributes =
(ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (null != attributes) {
HttpServletRequest request = attributes.getRequest();
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String name = headerNames.nextElement();
String value = request.getHeader(name);
if (name.equalsIgnoreCase("content-length")) {
continue;
}
requestTemplate.header(name, value);
}
requestTemplate.header(
RpcConstant.LINKIS_LOAD_BALANCER_TYPE, RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC);
Tuple2<String, String> userTicketKV =
SSOUtils$.MODULE$.getUserTicketKV(SecurityFilter$.MODULE$.OTHER_SYSTEM_IGNORE_UM_USER());
requestTemplate.header(userTicketKV._1, userTicketKV._2);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.rpc.constant;

public class RpcConstant {

public static final String LINKIS_LOAD_BALANCER_TYPE = "LinkisLoadBalancerType";

public static final String LINKIS_LOAD_BALANCER_TYPE_RPC = "RPC";
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.exception.WarnException
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.protocol.Protocol
import org.apache.linkis.rpc.conf.DynamicFeignClient
import org.apache.linkis.rpc.conf.RPCConfiguration.{
BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_FREE_TIME_MAX,
BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_MAX,
Expand All @@ -30,19 +31,18 @@ import org.apache.linkis.rpc.conf.RPCConfiguration.{
import org.apache.linkis.rpc.interceptor._
import org.apache.linkis.rpc.transform.{RPCConsumer, RPCProduct}
import org.apache.linkis.server.Message
import org.apache.linkis.server.conf.ServerConfiguration

import java.util

import scala.concurrent.duration.Duration
import scala.runtime.BoxedUnit

import feign.{Feign, Retryer}
import feign.slf4j.Slf4jLogger

private[rpc] class BaseRPCSender extends Sender with Logging {
private var name: String = _
private var rpc: RPCReceiveRemote = _
private var dynamicFeignClient: DynamicFeignClient[RPCReceiveRemote] = _

protected def getRPCInterceptors: Array[RPCInterceptor] = Array.empty

Expand All @@ -67,18 +67,20 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
rpc
}

private def getDynamicFeignClient: DynamicFeignClient[RPCReceiveRemote] = {
if (dynamicFeignClient == null) this synchronized {
if (dynamicFeignClient == null) dynamicFeignClient = new DynamicFeignClient()
}
dynamicFeignClient
}

private[rpc] def getApplicationName = name

protected def doBuilder(builder: Feign.Builder): Unit =
builder.retryer(Retryer.NEVER_RETRY)

protected def newRPC: RPCReceiveRemote = {
val builder = Feign.builder.logger(new Slf4jLogger()).logLevel(feign.Logger.Level.FULL)
doBuilder(builder)
var url = if (name.startsWith("http://")) name else "http://" + name
if (url.endsWith("/")) url = url.substring(0, url.length - 1)
url += ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue
builder.target(classOf[RPCReceiveRemote], url)
getDynamicFeignClient.getFeignClient(classOf[RPCReceiveRemote], name)
}

private def execute(message: Any)(op: => Any): Any = message match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,31 @@

package org.apache.linkis.rpc

import org.apache.linkis.rpc.constant.RpcConstant
import org.apache.linkis.server.Message

import org.springframework.cloud.openfeign.FeignClient
import org.springframework.web.bind.annotation.{RequestBody, RequestMapping, RequestMethod}

@FeignClient(name = RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC)
private[rpc] trait RPCReceiveRemote {

@RequestMapping(value = Array("/rpc/receive"), method = Array(RequestMethod.POST))
@RequestMapping(
value = Array("${spring.mvc.servlet.path}/rpc/receive"),
method = Array(RequestMethod.POST)
)
def receive(@RequestBody message: Message): Message

@RequestMapping(value = Array("/rpc/receiveAndReply"), method = Array(RequestMethod.POST))
@RequestMapping(
value = Array("${spring.mvc.servlet.path}/rpc/receiveAndReply"),
method = Array(RequestMethod.POST)
)
def receiveAndReply(@RequestBody message: Message): Message

@RequestMapping(value = Array("/rpc/replyInMills"), method = Array(RequestMethod.POST))
@RequestMapping(
value = Array("${spring.mvc.servlet.path}/rpc/replyInMills"),
method = Array(RequestMethod.POST)
)
def receiveAndReplyInMills(@RequestBody message: Message): Message

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ private[rpc] object RPCSpringBeanCache extends Logging {
private var rpcServerLoader: RPCServerLoader = _
private var senderBuilders: Array[BroadcastSenderBuilder] = _
private var rpcReceiveRestful: RPCReceiveRestful = _
private var rpcReceiveRemote: RPCReceiveRemote = _

def registerReceiver(receiverName: String, receiver: Receiver): Unit = {
if (beanNameToReceivers == null) {
Expand Down Expand Up @@ -63,6 +64,13 @@ private[rpc] object RPCSpringBeanCache extends Logging {
rpcReceiveRestful
}

def getRPCReceiveRemote: RPCReceiveRemote = {
if (rpcReceiveRemote == null) {
rpcReceiveRemote = getApplicationContext.getBean(classOf[RPCReceiveRemote])
}
rpcReceiveRemote
}

private[rpc] def getReceivers: util.Map[String, Receiver] = {
if (beanNameToReceivers == null) {
beanNameToReceivers = getApplicationContext.getBeansOfType(classOf[Receiver])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private Response<ServiceInstance> getInstanceResponse(
ServiceInstance chooseInstance = null;
for (ServiceInstance instance : instances) {
if (Objects.equals(ipAndPort[0], instance.getHost())
&& Objects.equals(ipAndPort[1], instance.getPort())) {
&& Objects.equals(ipAndPort[1], String.valueOf(instance.getPort()))) {
chooseInstance = instance;
break;
}
Expand Down

0 comments on commit 7935cde

Please sign in to comment.