From 7935cde23be63f63e765c4fec7be5aa5a5482471 Mon Sep 17 00:00:00 2001
From: ChengJie1053 <18033291053@163.com>
Date: Mon, 20 Nov 2023 21:26:12 +0800
Subject: [PATCH] Modifying linkis rpc
---
linkis-commons/linkis-module/pom.xml | 4 +
.../linkis/DataWorkCloudApplication.java | 2 +
.../linkis/rpc/conf/DynamicFeignClient.java | 126 ++++++++++++++++++
.../apache/linkis/rpc/conf/FeignConfig.java | 62 +++++++++
.../linkis/rpc/constant/RpcConstant.java | 25 ++++
.../org/apache/linkis/rpc/BaseRPCSender.scala | 18 +--
.../apache/linkis/rpc/RPCReceiveRemote.scala | 18 ++-
.../linkis/rpc/RPCSpringBeanCache.scala | 8 ++
.../ServiceInstancePriorityLoadBalancer.java | 2 +-
9 files changed, 253 insertions(+), 12 deletions(-)
create mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java
create mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignConfig.java
create mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java
diff --git a/linkis-commons/linkis-module/pom.xml b/linkis-commons/linkis-module/pom.xml
index d4ffc38e2c..c5b247b9be 100644
--- a/linkis-commons/linkis-module/pom.xml
+++ b/linkis-commons/linkis-module/pom.xml
@@ -266,6 +266,10 @@
jedis
${jedis.version}
+
+ org.springframework.cloud
+ spring-cloud-openfeign-core
+
diff --git a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/DataWorkCloudApplication.java b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/DataWorkCloudApplication.java
index 6f0256fdfa..5610ded412 100644
--- a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/DataWorkCloudApplication.java
+++ b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/DataWorkCloudApplication.java
@@ -41,6 +41,7 @@
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
+import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
@@ -66,6 +67,7 @@
@SpringBootApplication(scanBasePackages = {"org.apache.linkis", "com.webank.wedatasphere"})
@EnableDiscoveryClient
@RefreshScope
+@EnableFeignClients
public class DataWorkCloudApplication extends SpringBootServletInitializer {
private static final Log logger = LogFactory.getLog(DataWorkCloudApplication.class);
diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java
new file mode 100644
index 0000000000..dd1f6a7dc9
--- /dev/null
+++ b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/DynamicFeignClient.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.rpc.conf;
+
+import org.apache.linkis.DataWorkCloudApplication;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.springframework.cloud.openfeign.FeignClientBuilder;
+import org.springframework.cloud.openfeign.FeignClientFactoryBean;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+@Component
+public class DynamicFeignClient {
+
+ private FeignClientBuilder feignClientBuilder;
+
+ private final ConcurrentHashMap CACHE_BEAN = new ConcurrentHashMap();
+
+ public DynamicFeignClient() {
+ this.feignClientBuilder =
+ new FeignClientBuilder(DataWorkCloudApplication.getApplicationContext());
+ }
+
+ public T getFeignClient(final Class type, final String serviceName) {
+ return getFeignClient(type, serviceName, null);
+ }
+
+ public T getFeignClient(
+ final Class type, final Class> fallbackFactory, final String serviceName) {
+ return getFeignClient(type, fallbackFactory, serviceName, null);
+ }
+
+ public T getFeignClient(
+ final Class type,
+ final FeignClientFactoryBean clientFactoryBean,
+ final String serviceName) {
+ return getFeignClient(type, clientFactoryBean, serviceName, null);
+ }
+
+ public T getFeignClient(final Class type, String serviceName, final String serviceUrl) {
+ String k = serviceName;
+ if (StringUtils.isNotEmpty(serviceUrl)) {
+ k = serviceUrl;
+ }
+ return CACHE_BEAN.computeIfAbsent(
+ k,
+ (t) -> {
+ FeignClientBuilder.Builder builder =
+ this.feignClientBuilder.forType(type, serviceName);
+ if (StringUtils.isNotEmpty(serviceUrl)) {
+ builder.url(serviceUrl);
+ }
+ return builder.build();
+ });
+ }
+
+ public T getFeignClient(
+ final Class type,
+ final Class> fallbackFactory,
+ final String serviceName,
+ final String serviceUrl) {
+ String k = serviceName;
+ if (StringUtils.isNotEmpty(serviceUrl)) {
+ k = serviceUrl;
+ }
+ return CACHE_BEAN.computeIfAbsent(
+ k,
+ (t) -> {
+ FeignClientFactoryBean feignClientFactoryBean = new FeignClientFactoryBean();
+ feignClientFactoryBean.setFallbackFactory(fallbackFactory);
+ FeignClientBuilder.Builder builder =
+ this.feignClientBuilder.forType(type, feignClientFactoryBean, serviceName);
+ if (StringUtils.isNotEmpty(serviceUrl)) {
+ builder.url(serviceUrl);
+ }
+ return builder.build();
+ });
+ }
+
+ public T getFeignClient(
+ final Class type,
+ final FeignClientFactoryBean clientFactoryBean,
+ final String serviceName,
+ final String serviceUrl) {
+ String k = serviceName;
+ if (StringUtils.isNotEmpty(serviceUrl)) {
+ k = serviceUrl;
+ }
+ return CACHE_BEAN.computeIfAbsent(
+ k,
+ (t) -> {
+ FeignClientBuilder.Builder builder =
+ this.feignClientBuilder.forType(type, clientFactoryBean, serviceName);
+ if (StringUtils.isNotEmpty(serviceUrl)) {
+ builder.url(serviceUrl);
+ }
+ return builder.build();
+ });
+ }
+
+ private T getFromCache(final String serviceName, final String serviceUrl) {
+ if (StringUtils.isNotEmpty(serviceUrl)) {
+ return CACHE_BEAN.get(serviceUrl);
+ } else {
+ return CACHE_BEAN.get(serviceName);
+ }
+ }
+}
diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignConfig.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignConfig.java
new file mode 100644
index 0000000000..1599969086
--- /dev/null
+++ b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/FeignConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.rpc.conf;
+
+import org.apache.linkis.rpc.constant.RpcConstant;
+import org.apache.linkis.server.security.SSOUtils$;
+import org.apache.linkis.server.security.SecurityFilter$;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.context.request.RequestContextHolder;
+import org.springframework.web.context.request.ServletRequestAttributes;
+
+import javax.servlet.http.HttpServletRequest;
+
+import java.util.Enumeration;
+
+import scala.Tuple2;
+
+import feign.RequestInterceptor;
+import feign.RequestTemplate;
+
+@Configuration
+public class FeignConfig implements RequestInterceptor {
+
+ @Override
+ public void apply(RequestTemplate requestTemplate) {
+ ServletRequestAttributes attributes =
+ (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
+ if (null != attributes) {
+ HttpServletRequest request = attributes.getRequest();
+ Enumeration headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String name = headerNames.nextElement();
+ String value = request.getHeader(name);
+ if (name.equalsIgnoreCase("content-length")) {
+ continue;
+ }
+ requestTemplate.header(name, value);
+ }
+ requestTemplate.header(
+ RpcConstant.LINKIS_LOAD_BALANCER_TYPE, RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC);
+ Tuple2 userTicketKV =
+ SSOUtils$.MODULE$.getUserTicketKV(SecurityFilter$.MODULE$.OTHER_SYSTEM_IGNORE_UM_USER());
+ requestTemplate.header(userTicketKV._1, userTicketKV._2);
+ }
+ }
+}
diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java
new file mode 100644
index 0000000000..e8fe3f5c4e
--- /dev/null
+++ b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/constant/RpcConstant.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.rpc.constant;
+
+public class RpcConstant {
+
+ public static final String LINKIS_LOAD_BALANCER_TYPE = "LinkisLoadBalancerType";
+
+ public static final String LINKIS_LOAD_BALANCER_TYPE_RPC = "RPC";
+}
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
index 149179f8b1..226a8c19e7 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
@@ -22,6 +22,7 @@ import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.exception.WarnException
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.protocol.Protocol
+import org.apache.linkis.rpc.conf.DynamicFeignClient
import org.apache.linkis.rpc.conf.RPCConfiguration.{
BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_FREE_TIME_MAX,
BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_MAX,
@@ -30,7 +31,6 @@ import org.apache.linkis.rpc.conf.RPCConfiguration.{
import org.apache.linkis.rpc.interceptor._
import org.apache.linkis.rpc.transform.{RPCConsumer, RPCProduct}
import org.apache.linkis.server.Message
-import org.apache.linkis.server.conf.ServerConfiguration
import java.util
@@ -38,11 +38,11 @@ import scala.concurrent.duration.Duration
import scala.runtime.BoxedUnit
import feign.{Feign, Retryer}
-import feign.slf4j.Slf4jLogger
private[rpc] class BaseRPCSender extends Sender with Logging {
private var name: String = _
private var rpc: RPCReceiveRemote = _
+ private var dynamicFeignClient: DynamicFeignClient[RPCReceiveRemote] = _
protected def getRPCInterceptors: Array[RPCInterceptor] = Array.empty
@@ -67,18 +67,20 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
rpc
}
+ private def getDynamicFeignClient: DynamicFeignClient[RPCReceiveRemote] = {
+ if (dynamicFeignClient == null) this synchronized {
+ if (dynamicFeignClient == null) dynamicFeignClient = new DynamicFeignClient()
+ }
+ dynamicFeignClient
+ }
+
private[rpc] def getApplicationName = name
protected def doBuilder(builder: Feign.Builder): Unit =
builder.retryer(Retryer.NEVER_RETRY)
protected def newRPC: RPCReceiveRemote = {
- val builder = Feign.builder.logger(new Slf4jLogger()).logLevel(feign.Logger.Level.FULL)
- doBuilder(builder)
- var url = if (name.startsWith("http://")) name else "http://" + name
- if (url.endsWith("/")) url = url.substring(0, url.length - 1)
- url += ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue
- builder.target(classOf[RPCReceiveRemote], url)
+ getDynamicFeignClient.getFeignClient(classOf[RPCReceiveRemote], name)
}
private def execute(message: Any)(op: => Any): Any = message match {
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRemote.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRemote.scala
index c539652d31..7bf441ff3b 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRemote.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRemote.scala
@@ -17,19 +17,31 @@
package org.apache.linkis.rpc
+import org.apache.linkis.rpc.constant.RpcConstant
import org.apache.linkis.server.Message
+import org.springframework.cloud.openfeign.FeignClient
import org.springframework.web.bind.annotation.{RequestBody, RequestMapping, RequestMethod}
+@FeignClient(name = RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC)
private[rpc] trait RPCReceiveRemote {
- @RequestMapping(value = Array("/rpc/receive"), method = Array(RequestMethod.POST))
+ @RequestMapping(
+ value = Array("${spring.mvc.servlet.path}/rpc/receive"),
+ method = Array(RequestMethod.POST)
+ )
def receive(@RequestBody message: Message): Message
- @RequestMapping(value = Array("/rpc/receiveAndReply"), method = Array(RequestMethod.POST))
+ @RequestMapping(
+ value = Array("${spring.mvc.servlet.path}/rpc/receiveAndReply"),
+ method = Array(RequestMethod.POST)
+ )
def receiveAndReply(@RequestBody message: Message): Message
- @RequestMapping(value = Array("/rpc/replyInMills"), method = Array(RequestMethod.POST))
+ @RequestMapping(
+ value = Array("${spring.mvc.servlet.path}/rpc/replyInMills"),
+ method = Array(RequestMethod.POST)
+ )
def receiveAndReplyInMills(@RequestBody message: Message): Message
}
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
index 00fa019d99..ff542aaad5 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
@@ -33,6 +33,7 @@ private[rpc] object RPCSpringBeanCache extends Logging {
private var rpcServerLoader: RPCServerLoader = _
private var senderBuilders: Array[BroadcastSenderBuilder] = _
private var rpcReceiveRestful: RPCReceiveRestful = _
+ private var rpcReceiveRemote: RPCReceiveRemote = _
def registerReceiver(receiverName: String, receiver: Receiver): Unit = {
if (beanNameToReceivers == null) {
@@ -63,6 +64,13 @@ private[rpc] object RPCSpringBeanCache extends Logging {
rpcReceiveRestful
}
+ def getRPCReceiveRemote: RPCReceiveRemote = {
+ if (rpcReceiveRemote == null) {
+ rpcReceiveRemote = getApplicationContext.getBean(classOf[RPCReceiveRemote])
+ }
+ rpcReceiveRemote
+ }
+
private[rpc] def getReceivers: util.Map[String, Receiver] = {
if (beanNameToReceivers == null) {
beanNameToReceivers = getApplicationContext.getBeansOfType(classOf[Receiver])
diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/loadbalancer/ServiceInstancePriorityLoadBalancer.java b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/loadbalancer/ServiceInstancePriorityLoadBalancer.java
index 673559789c..2fdbfa97a1 100644
--- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/loadbalancer/ServiceInstancePriorityLoadBalancer.java
+++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/loadbalancer/ServiceInstancePriorityLoadBalancer.java
@@ -114,7 +114,7 @@ private Response getInstanceResponse(
ServiceInstance chooseInstance = null;
for (ServiceInstance instance : instances) {
if (Objects.equals(ipAndPort[0], instance.getHost())
- && Objects.equals(ipAndPort[1], instance.getPort())) {
+ && Objects.equals(ipAndPort[1], String.valueOf(instance.getPort()))) {
chooseInstance = instance;
break;
}