Skip to content

Commit

Permalink
Add EurekaClientCacheManualRefresher
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Nov 22, 2023
1 parent b5b5f1a commit d73b4d3
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.rpc.conf;

import org.apache.commons.lang.exception.ExceptionUtils;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import com.netflix.discovery.DiscoveryClient;
import com.netflix.discovery.TimedSupervisorTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component
public class EurekaClientCacheManualRefresher {
private static final Logger logger =
LoggerFactory.getLogger(EurekaClientCacheManualRefresher.class);
private final AtomicBoolean isRefreshing = new AtomicBoolean(false);
private final ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
private final String cacheRefreshTaskField = "cacheRefreshTask";
private TimedSupervisorTask cacheRefreshTask;

private long lastRefreshMillis = 0;
private final Duration refreshIntervalDuration = Duration.ofSeconds(3);

@Autowired private BeanFactory beanFactory;

public void refreshOnExceptions(Exception e, List<Class<? extends Exception>> clazzs) {
if (null == clazzs || clazzs.size() == 0) {
throw new IllegalArgumentException();
}

if (clazzs.stream()
.anyMatch(
clazz -> clazz.isInstance(e) || clazz.isInstance(ExceptionUtils.getRootCause(e)))) {
refresh();
}
}

public void refresh() {
if (isRefreshing.compareAndSet(false, true)) {
refreshExecutor.execute(
() -> {
try {
if (System.currentTimeMillis()
<= lastRefreshMillis + refreshIntervalDuration.toMillis()) {
logger.warn(
"Not manually refresh eureka client cache as refresh interval was not exceeded:{}",
refreshIntervalDuration.getSeconds());
return;
}

if (null == cacheRefreshTask) {
Field field =
ReflectionUtils.findField(DiscoveryClient.class, cacheRefreshTaskField);
if (null != field) {
ReflectionUtils.makeAccessible(field);
DiscoveryClient discoveryClient = beanFactory.getBean(DiscoveryClient.class);
cacheRefreshTask =
(TimedSupervisorTask) ReflectionUtils.getField(field, discoveryClient);
}
}

if (null == cacheRefreshTask) {
logger.error(
"Field ({}) not found in class '{}'",
cacheRefreshTaskField,
DiscoveryClient.class.getSimpleName());
return;
}

lastRefreshMillis = System.currentTimeMillis();
cacheRefreshTask.run();
logger.info(
"Manually refresh eureka client cache completed(DiscoveryClient.cacheRefreshTask#run())");
} catch (Exception e) {
logger.error("An exception occurred when manually refresh eureka client cache", e);
} finally {
isRefreshing.set(false);
}
});
} else {
logger.warn(
"Not manually refresh eureka client cache as another thread is refreshing it already");
}
}
}
4 changes: 4 additions & 0 deletions linkis-dist/package/conf/application-linkis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ pagehelper:
params: countSql

spring:
cloud:
loadbalancer:
cache:
enabled: false
main:
allow-circular-references: true
mvc:
Expand Down
1 change: 1 addition & 0 deletions linkis-dist/package/conf/linkis.properties
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ wds.linkis.gateway.conf.metadataquery.list=metadatamanager,metadataquery
# note: org.springframework.cloud.config.client.ConfigServiceBootstrapConfiguration.configServicePropertySource need to disable
spring.spring.cloud.config.enabled=false
spring.spring.mvc.pathmatch.matching-strategy=ant_path_matcher
spring.spring.cloud.loadbalancer.cache.enabled=false

# linkis user ticket sso
# redis stand-alone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
package org.apache.linkis.gateway.springcloud.loadbalancer;

import org.apache.linkis.gateway.springcloud.constant.GatewayConstant;
import org.apache.linkis.rpc.conf.EurekaClientCacheManualRefresher;
import org.apache.linkis.rpc.constant.RpcConstant;
import org.apache.linkis.rpc.errorcode.LinkisRpcErrorCodeSummary;
import org.apache.linkis.rpc.exception.NoInstanceExistsException;
import org.apache.linkis.rpc.sender.SpringCloudFeignConfigurationCache$;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.*;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
Expand All @@ -45,6 +49,9 @@
public class ServiceInstancePriorityLoadBalancer implements ReactorServiceInstanceLoadBalancer {

private static final Log log = LogFactory.getLog(ServiceInstancePriorityLoadBalancer.class);

@Autowired private EurekaClientCacheManualRefresher eurekaClientCacheManualRefresher;

private final String serviceId;

final AtomicInteger position;
Expand Down Expand Up @@ -78,22 +85,62 @@ public Mono<Response<ServiceInstance>> choose(Request request) {
return supplier
.get(request)
.next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, clientIp));
.map(
serviceInstances ->
processInstanceResponse(request, supplier, serviceInstances, clientIp));
}

private Response<ServiceInstance> processInstanceResponse(
Request request,
ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances,
String clientIp) {
Response<ServiceInstance> serviceInstanceResponse =
getInstanceResponse(serviceInstances, clientIp);
Long endTtime = System.currentTimeMillis() + 2 * 60 * 1000;

List<String> linkisLoadBalancerTypeList =
((RequestDataContext) request.getContext())
.getClientRequest()
.getHeaders()
.get(RpcConstant.LINKIS_LOAD_BALANCER_TYPE);
String linkisLoadBalancerType =
CollectionUtils.isNotEmpty(linkisLoadBalancerTypeList)
? linkisLoadBalancerTypeList.get(0)
: null;

while (null == serviceInstanceResponse
&& StringUtils.isNoneBlank(clientIp)
&& isRPC(linkisLoadBalancerType)
&& System.currentTimeMillis() < endTtime) {
eurekaClientCacheManualRefresher.refresh();
List<ServiceInstance> instances =
SpringCloudFeignConfigurationCache$.MODULE$.discoveryClient().getInstances(serviceId);
serviceInstanceResponse = getInstanceResponse(instances, clientIp);
if (null == serviceInstanceResponse) {
try {
this.wait(5000L);
eurekaClientCacheManualRefresher.refresh();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
break;
}
}

if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier)
.selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}

private boolean isRPC(String linkisLoadBalancerType) {
return StringUtils.isNotBlank(linkisLoadBalancerType)
&& linkisLoadBalancerType.equalsIgnoreCase(RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC);
}

private Response<ServiceInstance> getInstanceResponse(
List<ServiceInstance> instances, String clientIp) {
if (instances.isEmpty()) {
Expand All @@ -119,15 +166,15 @@ private Response<ServiceInstance> getInstanceResponse(
break;
}
}
if (null == chooseInstance) {
throw new NoInstanceExistsException(
LinkisRpcErrorCodeSummary.APPLICATION_IS_NOT_EXISTS.getErrorCode(),
MessageFormat.format(
LinkisRpcErrorCodeSummary.APPLICATION_IS_NOT_EXISTS.getErrorDesc(),
clientIp,
serviceId));
} else {
return new DefaultResponse(chooseInstance);
}
// if (null == chooseInstance) {
// throw new NoInstanceExistsException(
// LinkisRpcErrorCodeSummary.APPLICATION_IS_NOT_EXISTS.getErrorCode(),
// MessageFormat.format(
// LinkisRpcErrorCodeSummary.APPLICATION_IS_NOT_EXISTS.getErrorDesc(),
// clientIp,
// serviceId));
// } else {
return new DefaultResponse(chooseInstance);
// }
}
}

0 comments on commit d73b4d3

Please sign in to comment.