diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java new file mode 100644 index 0000000000..4473ceffe0 --- /dev/null +++ b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/conf/EurekaClientCacheManualRefresher.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.rpc.conf; + +import org.apache.commons.lang.exception.ExceptionUtils; + +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.ReflectionUtils; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.netflix.discovery.DiscoveryClient; +import com.netflix.discovery.TimedSupervisorTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Component +public class EurekaClientCacheManualRefresher { + private static final Logger logger = + LoggerFactory.getLogger(EurekaClientCacheManualRefresher.class); + private final AtomicBoolean isRefreshing = new AtomicBoolean(false); + private final ExecutorService refreshExecutor = Executors.newSingleThreadExecutor(); + private final String cacheRefreshTaskField = "cacheRefreshTask"; + private TimedSupervisorTask cacheRefreshTask; + + private long lastRefreshMillis = 0; + private final Duration refreshIntervalDuration = Duration.ofSeconds(3); + + @Autowired private BeanFactory beanFactory; + + public void refreshOnExceptions(Exception e, List> clazzs) { + if (null == clazzs || clazzs.size() == 0) { + throw new IllegalArgumentException(); + } + + if (clazzs.stream() + .anyMatch( + clazz -> clazz.isInstance(e) || clazz.isInstance(ExceptionUtils.getRootCause(e)))) { + refresh(); + } + } + + public void refresh() { + if (isRefreshing.compareAndSet(false, true)) { + refreshExecutor.execute( + () -> { + try { + if (System.currentTimeMillis() + <= lastRefreshMillis + refreshIntervalDuration.toMillis()) { + logger.warn( + "Not manually refresh eureka client cache as refresh interval was not exceeded:{}", + refreshIntervalDuration.getSeconds()); + return; + } + + if (null == cacheRefreshTask) { + Field field = + ReflectionUtils.findField(DiscoveryClient.class, cacheRefreshTaskField); + if (null != field) { + ReflectionUtils.makeAccessible(field); + DiscoveryClient discoveryClient = beanFactory.getBean(DiscoveryClient.class); + cacheRefreshTask = + (TimedSupervisorTask) ReflectionUtils.getField(field, discoveryClient); + } + } + + if (null == cacheRefreshTask) { + logger.error( + "Field ({}) not found in class '{}'", + cacheRefreshTaskField, + DiscoveryClient.class.getSimpleName()); + return; + } + + lastRefreshMillis = System.currentTimeMillis(); + cacheRefreshTask.run(); + logger.info( + "Manually refresh eureka client cache completed(DiscoveryClient.cacheRefreshTask#run())"); + } catch (Exception e) { + logger.error("An exception occurred when manually refresh eureka client cache", e); + } finally { + isRefreshing.set(false); + } + }); + } else { + logger.warn( + "Not manually refresh eureka client cache as another thread is refreshing it already"); + } + } +} diff --git a/linkis-dist/package/conf/application-linkis.yml b/linkis-dist/package/conf/application-linkis.yml index a5bb4adf7c..82fb281211 100644 --- a/linkis-dist/package/conf/application-linkis.yml +++ b/linkis-dist/package/conf/application-linkis.yml @@ -42,6 +42,10 @@ pagehelper: params: countSql spring: + cloud: + loadbalancer: + cache: + enabled: false main: allow-circular-references: true mvc: diff --git a/linkis-dist/package/conf/linkis.properties b/linkis-dist/package/conf/linkis.properties index 191fee88a4..30904622b2 100644 --- a/linkis-dist/package/conf/linkis.properties +++ b/linkis-dist/package/conf/linkis.properties @@ -85,6 +85,7 @@ wds.linkis.gateway.conf.metadataquery.list=metadatamanager,metadataquery # note: org.springframework.cloud.config.client.ConfigServiceBootstrapConfiguration.configServicePropertySource need to disable spring.spring.cloud.config.enabled=false spring.spring.mvc.pathmatch.matching-strategy=ant_path_matcher +spring.spring.cloud.loadbalancer.cache.enabled=false # linkis user ticket sso # redis stand-alone diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/loadbalancer/ServiceInstancePriorityLoadBalancer.java b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/loadbalancer/ServiceInstancePriorityLoadBalancer.java index 2fdbfa97a1..9bf363a287 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/loadbalancer/ServiceInstancePriorityLoadBalancer.java +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/loadbalancer/ServiceInstancePriorityLoadBalancer.java @@ -18,8 +18,11 @@ package org.apache.linkis.gateway.springcloud.loadbalancer; import org.apache.linkis.gateway.springcloud.constant.GatewayConstant; +import org.apache.linkis.rpc.conf.EurekaClientCacheManualRefresher; +import org.apache.linkis.rpc.constant.RpcConstant; import org.apache.linkis.rpc.errorcode.LinkisRpcErrorCodeSummary; import org.apache.linkis.rpc.exception.NoInstanceExistsException; +import org.apache.linkis.rpc.sender.SpringCloudFeignConfigurationCache$; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -27,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.*; import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier; @@ -45,6 +49,9 @@ public class ServiceInstancePriorityLoadBalancer implements ReactorServiceInstanceLoadBalancer { private static final Log log = LogFactory.getLog(ServiceInstancePriorityLoadBalancer.class); + + @Autowired private EurekaClientCacheManualRefresher eurekaClientCacheManualRefresher; + private final String serviceId; final AtomicInteger position; @@ -78,15 +85,50 @@ public Mono> choose(Request request) { return supplier .get(request) .next() - .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, clientIp)); + .map( + serviceInstances -> + processInstanceResponse(request, supplier, serviceInstances, clientIp)); } private Response processInstanceResponse( + Request request, ServiceInstanceListSupplier supplier, List serviceInstances, String clientIp) { Response serviceInstanceResponse = getInstanceResponse(serviceInstances, clientIp); + Long endTtime = System.currentTimeMillis() + 2 * 60 * 1000; + + List linkisLoadBalancerTypeList = + ((RequestDataContext) request.getContext()) + .getClientRequest() + .getHeaders() + .get(RpcConstant.LINKIS_LOAD_BALANCER_TYPE); + String linkisLoadBalancerType = + CollectionUtils.isNotEmpty(linkisLoadBalancerTypeList) + ? linkisLoadBalancerTypeList.get(0) + : null; + + while (null == serviceInstanceResponse + && StringUtils.isNoneBlank(clientIp) + && isRPC(linkisLoadBalancerType) + && System.currentTimeMillis() < endTtime) { + eurekaClientCacheManualRefresher.refresh(); + List instances = + SpringCloudFeignConfigurationCache$.MODULE$.discoveryClient().getInstances(serviceId); + serviceInstanceResponse = getInstanceResponse(instances, clientIp); + if (null == serviceInstanceResponse) { + try { + this.wait(5000L); + eurekaClientCacheManualRefresher.refresh(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } else { + break; + } + } + if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) { ((SelectedInstanceCallback) supplier) .selectedServiceInstance(serviceInstanceResponse.getServer()); @@ -94,6 +136,11 @@ private Response processInstanceResponse( return serviceInstanceResponse; } + private boolean isRPC(String linkisLoadBalancerType) { + return StringUtils.isNotBlank(linkisLoadBalancerType) + && linkisLoadBalancerType.equalsIgnoreCase(RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC); + } + private Response getInstanceResponse( List instances, String clientIp) { if (instances.isEmpty()) { @@ -119,15 +166,15 @@ private Response getInstanceResponse( break; } } - if (null == chooseInstance) { - throw new NoInstanceExistsException( - LinkisRpcErrorCodeSummary.APPLICATION_IS_NOT_EXISTS.getErrorCode(), - MessageFormat.format( - LinkisRpcErrorCodeSummary.APPLICATION_IS_NOT_EXISTS.getErrorDesc(), - clientIp, - serviceId)); - } else { - return new DefaultResponse(chooseInstance); - } + // if (null == chooseInstance) { + // throw new NoInstanceExistsException( + // LinkisRpcErrorCodeSummary.APPLICATION_IS_NOT_EXISTS.getErrorCode(), + // MessageFormat.format( + // LinkisRpcErrorCodeSummary.APPLICATION_IS_NOT_EXISTS.getErrorDesc(), + // clientIp, + // serviceId)); + // } else { + return new DefaultResponse(chooseInstance); + // } } }