Skip to content

Commit

Permalink
[WIP] Spring cloud version upgraded and loadBalancer replaced ribbon
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Nov 7, 2023
1 parent b39a08e commit c7c86d2
Show file tree
Hide file tree
Showing 22 changed files with 433 additions and 430 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.swagger;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.servlet.mvc.method.RequestMappingInfoHandlerMapping;

import java.lang.reflect.Field;
import java.util.List;
import java.util.stream.Collectors;

import springfox.documentation.spring.web.plugins.WebFluxRequestHandlerProvider;
import springfox.documentation.spring.web.plugins.WebMvcRequestHandlerProvider;

@Configuration
public class SwaggerBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof WebMvcRequestHandlerProvider
|| bean instanceof WebFluxRequestHandlerProvider) {
List<RequestMappingInfoHandlerMapping> handlerMappings = getHandlerMappings(bean);
customizeSpringfoxHandlerMappings(handlerMappings);
}
return bean;
}

private <T extends RequestMappingInfoHandlerMapping> void customizeSpringfoxHandlerMappings(
List<T> mappings) {
List<T> copy =
mappings.stream()
.filter(mapping -> mapping.getPatternParser() == null)
.collect(Collectors.toList());
mappings.clear();
mappings.addAll(copy);
}

@SuppressWarnings("unchecked")
private List<RequestMappingInfoHandlerMapping> getHandlerMappings(Object bean) {
try {
Field field = ReflectionUtils.findField(bean.getClass(), "handlerMappings");
field.setAccessible(true);
return (List<RequestMappingInfoHandlerMapping>) field.get(bean);
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new IllegalStateException(e);
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.linkis.rpc

import org.apache.linkis.DataWorkCloudApplication
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.rpc.interceptor.{RPCInterceptor, RPCLoadBalancer, RPCServerLoader}
import org.apache.linkis.rpc.interceptor.{RPCInterceptor, RPCServerLoader}
import org.apache.linkis.rpc.interceptor.common.BroadcastSenderBuilder

import java.util
Expand All @@ -30,7 +30,6 @@ private[rpc] object RPCSpringBeanCache extends Logging {
import DataWorkCloudApplication.getApplicationContext
private var beanNameToReceivers: util.Map[String, Receiver] = _
private var rpcInterceptors: Array[RPCInterceptor] = _
private var rpcLoadBalancers: Array[RPCLoadBalancer] = _
private var rpcServerLoader: RPCServerLoader = _
private var senderBuilders: Array[BroadcastSenderBuilder] = _
private var rpcReceiveRestful: RPCReceiveRestful = _
Expand Down Expand Up @@ -83,18 +82,6 @@ private[rpc] object RPCSpringBeanCache extends Logging {
rpcInterceptors
}

private[rpc] def getRPCLoadBalancers: Array[RPCLoadBalancer] = {
if (rpcLoadBalancers == null) {
rpcLoadBalancers = getApplicationContext
.getBeansOfType(classOf[RPCLoadBalancer])
.asScala
.map(_._2)
.toArray
.sortBy(_.order)
}
rpcLoadBalancers
}

private[rpc] def getRPCServerLoader: RPCServerLoader = {
if (rpcServerLoader == null) {
rpcServerLoader = getApplicationContext.getBean(classOf[RPCServerLoader])
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,11 @@ import java.text.MessageFormat
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration

import com.netflix.loadbalancer.{DynamicServerListLoadBalancer, ILoadBalancer, Server}

trait RPCServerLoader {

@throws[NoInstanceExistsException]
def getOrRefreshServiceInstance(serviceInstance: ServiceInstance): Unit

@throws[NoInstanceExistsException]
def getServer(lb: ILoadBalancer, serviceInstance: ServiceInstance): Server

def getServiceInstances(applicationName: String): Array[ServiceInstance]

}
Expand All @@ -50,19 +45,12 @@ abstract class AbstractRPCServerLoader extends RPCServerLoader with Logging {

def refreshAllServers(): Unit

protected def refreshServerList(lb: ILoadBalancer): Unit = {
refreshAllServers()
lb match {
case d: DynamicServerListLoadBalancer[_] => d.updateListOfServers()
case _ =>
}
}

private def getOrRefresh(
refresh: => Unit,
refreshed: => Boolean,
serviceInstance: ServiceInstance
): Unit = {

val instanceNotExists = new NoInstanceExistsException(
APPLICATION_IS_NOT_EXISTS.getErrorCode,
MessageFormat.format(
Expand Down Expand Up @@ -101,15 +89,6 @@ abstract class AbstractRPCServerLoader extends RPCServerLoader with Logging {
serviceInstance
)

override def getServer(lb: ILoadBalancer, serviceInstance: ServiceInstance): Server = {
getOrRefresh(
refreshServerList(lb),
lb.getAllServers.asScala.exists(_.getHostPort == serviceInstance.getInstance),
serviceInstance
)
lb.getAllServers.asScala.find(_.getHostPort == serviceInstance.getInstance).get
}

def getDWCServiceInstance(serviceInstance: SpringCloudServiceInstance): ServiceInstance

override def getServiceInstances(applicationName: String): Array[ServiceInstance] =
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit c7c86d2

Please sign in to comment.