diff --git a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/swagger/SwaggerBeanPostProcessor.java b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/swagger/SwaggerBeanPostProcessor.java new file mode 100644 index 0000000000..f07b7ed0a6 --- /dev/null +++ b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/swagger/SwaggerBeanPostProcessor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.swagger; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.ReflectionUtils; +import org.springframework.web.servlet.mvc.method.RequestMappingInfoHandlerMapping; + +import java.lang.reflect.Field; +import java.util.List; +import java.util.stream.Collectors; + +import springfox.documentation.spring.web.plugins.WebFluxRequestHandlerProvider; +import springfox.documentation.spring.web.plugins.WebMvcRequestHandlerProvider; + +@Configuration +public class SwaggerBeanPostProcessor implements BeanPostProcessor { + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + if (bean instanceof WebMvcRequestHandlerProvider + || bean instanceof WebFluxRequestHandlerProvider) { + List handlerMappings = getHandlerMappings(bean); + customizeSpringfoxHandlerMappings(handlerMappings); + } + return bean; + } + + private void customizeSpringfoxHandlerMappings( + List mappings) { + List copy = + mappings.stream() + .filter(mapping -> mapping.getPatternParser() == null) + .collect(Collectors.toList()); + mappings.clear(); + mappings.addAll(copy); + } + + @SuppressWarnings("unchecked") + private List getHandlerMappings(Object bean) { + try { + Field field = ReflectionUtils.findField(bean.getClass(), "handlerMappings"); + field.setAccessible(true); + return (List) field.get(bean); + } catch (IllegalArgumentException | IllegalAccessException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtils.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtils.java deleted file mode 100644 index f022fc8c7f..0000000000 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtils.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.message.utils; - -import org.springframework.cloud.openfeign.ribbon.LoadBalancerFeignClient; - -import java.lang.reflect.Field; - -import feign.Request.Options; - -public class LoadBalancerOptionsUtils { - - private static Options DEFAULT_OPTIONS = null; - - private static Object locker = new Object(); - - public static Options getDefaultOptions() throws NoSuchFieldException, IllegalAccessException { - if (null == DEFAULT_OPTIONS) { - synchronized (locker) { - Class clazz = LoadBalancerFeignClient.class; - Field optionField = clazz.getDeclaredField("DEFAULT_OPTIONS"); - optionField.setAccessible(true); - Object o = optionField.get(clazz); - DEFAULT_OPTIONS = (Options) o; - } - } - return DEFAULT_OPTIONS; - } -} diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala index aa92605f9b..00fa019d99 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala @@ -19,7 +19,7 @@ package org.apache.linkis.rpc import org.apache.linkis.DataWorkCloudApplication import org.apache.linkis.common.utils.Logging -import org.apache.linkis.rpc.interceptor.{RPCInterceptor, RPCLoadBalancer, RPCServerLoader} +import org.apache.linkis.rpc.interceptor.{RPCInterceptor, RPCServerLoader} import org.apache.linkis.rpc.interceptor.common.BroadcastSenderBuilder import java.util @@ -30,7 +30,6 @@ private[rpc] object RPCSpringBeanCache extends Logging { import DataWorkCloudApplication.getApplicationContext private var beanNameToReceivers: util.Map[String, Receiver] = _ private var rpcInterceptors: Array[RPCInterceptor] = _ - private var rpcLoadBalancers: Array[RPCLoadBalancer] = _ private var rpcServerLoader: RPCServerLoader = _ private var senderBuilders: Array[BroadcastSenderBuilder] = _ private var rpcReceiveRestful: RPCReceiveRestful = _ @@ -83,18 +82,6 @@ private[rpc] object RPCSpringBeanCache extends Logging { rpcInterceptors } - private[rpc] def getRPCLoadBalancers: Array[RPCLoadBalancer] = { - if (rpcLoadBalancers == null) { - rpcLoadBalancers = getApplicationContext - .getBeansOfType(classOf[RPCLoadBalancer]) - .asScala - .map(_._2) - .toArray - .sortBy(_.order) - } - rpcLoadBalancers - } - private[rpc] def getRPCServerLoader: RPCServerLoader = { if (rpcServerLoader == null) { rpcServerLoader = getApplicationContext.getBean(classOf[RPCServerLoader]) diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/RPCLoadBalancer.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/RPCLoadBalancer.scala deleted file mode 100644 index 7ae331265b..0000000000 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/RPCLoadBalancer.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.interceptor - -import org.apache.linkis.common.ServiceInstance -import org.apache.linkis.protocol.Protocol - -import com.netflix.loadbalancer.ILoadBalancer - -trait RPCLoadBalancer { - - val order: Int - - def choose( - protocol: Protocol, - originService: ServiceInstance, - lb: ILoadBalancer - ): Option[ServiceInstance] - -} diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/RPCServerLoader.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/RPCServerLoader.scala index 8cab6d7d0f..e4259466b1 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/RPCServerLoader.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/RPCServerLoader.scala @@ -28,16 +28,11 @@ import java.text.MessageFormat import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration -import com.netflix.loadbalancer.{DynamicServerListLoadBalancer, ILoadBalancer, Server} - trait RPCServerLoader { @throws[NoInstanceExistsException] def getOrRefreshServiceInstance(serviceInstance: ServiceInstance): Unit - @throws[NoInstanceExistsException] - def getServer(lb: ILoadBalancer, serviceInstance: ServiceInstance): Server - def getServiceInstances(applicationName: String): Array[ServiceInstance] } @@ -50,19 +45,12 @@ abstract class AbstractRPCServerLoader extends RPCServerLoader with Logging { def refreshAllServers(): Unit - protected def refreshServerList(lb: ILoadBalancer): Unit = { - refreshAllServers() - lb match { - case d: DynamicServerListLoadBalancer[_] => d.updateListOfServers() - case _ => - } - } - private def getOrRefresh( refresh: => Unit, refreshed: => Boolean, serviceInstance: ServiceInstance ): Unit = { + val instanceNotExists = new NoInstanceExistsException( APPLICATION_IS_NOT_EXISTS.getErrorCode, MessageFormat.format( @@ -101,15 +89,6 @@ abstract class AbstractRPCServerLoader extends RPCServerLoader with Logging { serviceInstance ) - override def getServer(lb: ILoadBalancer, serviceInstance: ServiceInstance): Server = { - getOrRefresh( - refreshServerList(lb), - lb.getAllServers.asScala.exists(_.getHostPort == serviceInstance.getInstance), - serviceInstance - ) - lb.getAllServers.asScala.find(_.getHostPort == serviceInstance.getInstance).get - } - def getDWCServiceInstance(serviceInstance: SpringCloudServiceInstance): ServiceInstance override def getServiceInstances(applicationName: String): Array[ServiceInstance] = diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/InstanceRPCLoadBalancer.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/InstanceRPCLoadBalancer.scala deleted file mode 100644 index 6cdac0df9f..0000000000 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/InstanceRPCLoadBalancer.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.interceptor.common - -import org.apache.linkis.common.ServiceInstance -import org.apache.linkis.protocol.{InstanceProtocol, Protocol} -import org.apache.linkis.rpc.interceptor.RPCLoadBalancer - -import org.springframework.stereotype.Component - -import com.netflix.loadbalancer.ILoadBalancer - -@Component -class InstanceRPCLoadBalancer extends RPCLoadBalancer { - override val order: Int = 10 - - override def choose( - protocol: Protocol, - originService: ServiceInstance, - lb: ILoadBalancer - ): Option[ServiceInstance] = protocol match { - case instance: InstanceProtocol => - instance.choseInstance.map(ServiceInstance(originService.getApplicationName, _)) - case _ => None - } - -} diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/SingleInstanceRPCLoadBalancer.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/SingleInstanceRPCLoadBalancer.scala deleted file mode 100644 index b007838ea6..0000000000 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/SingleInstanceRPCLoadBalancer.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.interceptor.common - -import org.apache.linkis.common.ServiceInstance -import org.apache.linkis.common.utils.Logging -import org.apache.linkis.protocol.{Protocol, SingleInstanceProtocol} -import org.apache.linkis.rpc.interceptor.RPCLoadBalancer - -import org.apache.commons.lang3.StringUtils - -import org.springframework.stereotype.Component - -import com.netflix.loadbalancer.ILoadBalancer - -@Component -class SingleInstanceRPCLoadBalancer extends RPCLoadBalancer with Logging { - override val order: Int = 20 - - override def choose( - protocol: Protocol, - originService: ServiceInstance, - lb: ILoadBalancer - ): Option[ServiceInstance] = protocol match { - case _: SingleInstanceProtocol => - if (StringUtils.isEmpty(originService.getInstance)) synchronized { - if (StringUtils.isEmpty(originService.getInstance)) { - val servers = lb.getAllServers - val server = servers.get((math.random * servers.size()).toInt) - originService.setInstance(server.getHostPort) - logger.warn( - originService.getApplicationName + " choose " + server.getHostPort + " to build a single instance connection." - ) - } - } - Some(originService) - case _ => None - } - -} diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/LinkisLoadBalancer.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/LinkisLoadBalancer.scala new file mode 100644 index 0000000000..0c869093f9 --- /dev/null +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/LinkisLoadBalancer.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.rpc.sender + +import org.springframework.beans.factory.ObjectProvider +import org.springframework.cloud.client.ServiceInstance +import org.springframework.cloud.client.loadbalancer.{ + DefaultResponse, + EmptyResponse, + Request, + Response +} +import org.springframework.cloud.loadbalancer.core.{ + ReactorServiceInstanceLoadBalancer, + ServiceInstanceListSupplier +} + +import java.util + +import scala.collection.JavaConverters._ + +import reactor.core.publisher.Mono + +private class LinkisLoadBalancer( + serviceInstance: ServiceInstance, + serviceInstanceListSupplierProvider: ObjectProvider[ServiceInstanceListSupplier] +) extends ReactorServiceInstanceLoadBalancer { + + override def choose(request: Request[_]): Mono[Response[ServiceInstance]] = { + val supplier: ServiceInstanceListSupplier = serviceInstanceListSupplierProvider.getIfAvailable() + supplier.get(request).next.map(this.getInstanceResponse) + } + + private def getInstanceResponse( + instances: util.List[ServiceInstance] + ): Response[ServiceInstance] = { + if (instances.isEmpty) { + new EmptyResponse + } + var instanceResult: ServiceInstance = null + + instances.asScala.find(instance => + serviceInstance.equals(instance.getInstanceId) && + serviceInstance.getHost.equals(instance.getHost) && + serviceInstance.getPort == instance.getPort + ) match { + case Some(instance) => instanceResult = instance + case _ => + } + +// if (instanceResult == null) instanceResult = instances.get(0) + if (instanceResult == null) { + // 则重试 + +// getOrRefresh( +// refreshAllServers(), +// getServiceInstances(serviceInstance.getApplicationName).contains(serviceInstance), +// serviceInstance +// ) + } + new DefaultResponse(instanceResult) + } + +} diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/LinkisLoadBalancerClientFactory.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/LinkisLoadBalancerClientFactory.scala new file mode 100644 index 0000000000..53cdc08b4b --- /dev/null +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/LinkisLoadBalancerClientFactory.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.rpc.sender + +import org.apache.linkis.common.ServiceInstance + +import org.apache.commons.lang3.StringUtils + +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.cloud.client.DefaultServiceInstance +import org.springframework.cloud.client.loadbalancer.LoadBalancerClientsProperties +import org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer +import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier +import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory +import org.springframework.core.env.Environment + +private class LinkisLoadBalancerClientFactory( + serviceInstance: ServiceInstance, + loadBalancerClientsProperties: LoadBalancerClientsProperties +) extends LoadBalancerClientFactory(loadBalancerClientsProperties: LoadBalancerClientsProperties) { + + @Autowired + private var env: Environment = _ + + @Autowired + private var loadBalancerClientFactory: LoadBalancerClientFactory = _ + + override def getInstance( + serviceId: String + ): ReactiveLoadBalancer[org.springframework.cloud.client.ServiceInstance] = { + if (null != serviceInstance && StringUtils.isNotBlank(serviceInstance.getInstance)) { + val hostAndPort: Array[String] = serviceInstance.getInstance.split(":") + val defaultServiceInstance: DefaultServiceInstance = new DefaultServiceInstance( + serviceInstance.getApplicationName, + serviceId, + hostAndPort.head, + hostAndPort.last.toInt, + true + ) + val name: String = env.getProperty(LoadBalancerClientFactory.PROPERTY_NAME) + new LinkisLoadBalancer( + defaultServiceInstance, + super.getLazyProvider(name, classOf[ServiceInstanceListSupplier]) + ) + } else { + super.getInstance(serviceId) + } + + } + +} diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringCloudFeignConfigurationCache.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringCloudFeignConfigurationCache.scala index 06f13c70a9..b8b41524d5 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringCloudFeignConfigurationCache.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringCloudFeignConfigurationCache.scala @@ -24,7 +24,7 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.AutoConfigureBefore import org.springframework.cloud.client.discovery.DiscoveryClient import org.springframework.cloud.client.loadbalancer.LoadBalancedRetryFactory -import org.springframework.cloud.netflix.ribbon.SpringClientFactory +import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory import org.springframework.cloud.openfeign.FeignClientsConfiguration import org.springframework.context.annotation.{Configuration, Import} @@ -48,7 +48,7 @@ class SpringCloudFeignConfigurationCache( private var discoveryClient: DiscoveryClient = _ @Autowired - private var clientFactory: SpringClientFactory = _ + private var loadBalancerClientFactory: LoadBalancerClientFactory = _ @Autowired(required = false) private var loadBalancedRetryFactory: LoadBalancedRetryFactory = _ @@ -56,7 +56,7 @@ class SpringCloudFeignConfigurationCache( @PostConstruct def storeFeignConfiguration(): Unit = { SpringCloudFeignConfigurationCache.client = client - SpringCloudFeignConfigurationCache.clientFactory = clientFactory + SpringCloudFeignConfigurationCache.loadBalancerClientFactory = loadBalancerClientFactory SpringCloudFeignConfigurationCache.loadBalancedRetryFactory = loadBalancedRetryFactory SpringCloudFeignConfigurationCache.contract = contract SpringCloudFeignConfigurationCache.decoder = decoder @@ -71,7 +71,9 @@ private[linkis] object SpringCloudFeignConfigurationCache { private[SpringCloudFeignConfigurationCache] var decoder: Decoder = _ private[SpringCloudFeignConfigurationCache] var contract: Contract = _ private[SpringCloudFeignConfigurationCache] var client: Client = _ - private[SpringCloudFeignConfigurationCache] var clientFactory: SpringClientFactory = _ + + private[SpringCloudFeignConfigurationCache] var loadBalancerClientFactory + : LoadBalancerClientFactory = _ private[SpringCloudFeignConfigurationCache] var loadBalancedRetryFactory : LoadBalancedRetryFactory = _ @@ -92,7 +94,7 @@ private[linkis] object SpringCloudFeignConfigurationCache { client } - private[rpc] def getClientFactory = clientFactory + private[rpc] def getLoadloadBalancerClientFactory = loadBalancerClientFactory private[rpc] def getLoadBalancedRetryFactory = loadBalancedRetryFactory private[linkis] def getDiscoveryClient = { diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala index ab4f2d7fe3..490aa73569 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala @@ -19,32 +19,17 @@ package org.apache.linkis.rpc.sender import org.apache.linkis.common.ServiceInstance import org.apache.linkis.common.conf.{Configuration => DWCConfiguration} -import org.apache.linkis.protocol.Protocol import org.apache.linkis.rpc.{BaseRPCSender, RPCMessageEvent, RPCSpringBeanCache} -import org.apache.linkis.rpc.conf.RPCConfiguration -import org.apache.linkis.rpc.interceptor.{ - RPCInterceptor, - RPCLoadBalancer, - ServiceInstanceRPCInterceptorChain -} -import org.apache.linkis.rpc.message.utils.LoadBalancerOptionsUtils -import org.apache.linkis.rpc.transform.RPCConsumer -import org.apache.linkis.server.{BDPJettyServerHelper, Message} +import org.apache.linkis.rpc.interceptor.{RPCInterceptor, ServiceInstanceRPCInterceptorChain} import org.apache.commons.lang3.StringUtils -import org.springframework.cloud.netflix.ribbon.ServerIntrospector -import org.springframework.cloud.openfeign.ribbon.{ - CachingSpringLoadBalancerFactory, - FeignLoadBalancer, - LoadBalancerFeignClient -} - -import java.lang.reflect.Field +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.cloud.client.loadbalancer.LoadBalancerClientsProperties +import org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient +import org.springframework.cloud.openfeign.loadbalancer.FeignBlockingLoadBalancerClient +import org.springframework.core.env.Environment -import com.netflix.client.ClientRequest -import com.netflix.client.config.IClientConfig -import com.netflix.loadbalancer.reactive.LoadBalancerCommand import feign._ private[rpc] class SpringMVCRPCSender private[rpc] ( @@ -59,70 +44,25 @@ private[rpc] class SpringMVCRPCSender private[rpc] ( override protected def createRPCInterceptorChain() = new ServiceInstanceRPCInterceptorChain(0, getRPCInterceptors, serviceInstance) - protected def getRPCLoadBalancers: Array[RPCLoadBalancer] = - RPCSpringBeanCache.getRPCLoadBalancers + @Autowired + private var env: Environment = _ override protected def doBuilder(builder: Feign.Builder): Unit = { - val client = getClient.asInstanceOf[LoadBalancerFeignClient] - val newClient = new LoadBalancerFeignClient( + val client = getClient.asInstanceOf[FeignBlockingLoadBalancerClient] + val loadBalancerClientFactory = + new LinkisLoadBalancerClientFactory(serviceInstance, new LoadBalancerClientsProperties) + + val blockingLoadBalancerClient: BlockingLoadBalancerClient = new BlockingLoadBalancerClient( + loadBalancerClientFactory + ) + + val newClient = new FeignBlockingLoadBalancerClient( client.getDelegate, - new CachingSpringLoadBalancerFactory(getClientFactory) { - override def create(clientName: String): FeignLoadBalancer = { - val serverIntrospector = - getClientFactory.getInstance(clientName, classOf[ServerIntrospector]) - new FeignLoadBalancer( - getClientFactory.getLoadBalancer(clientName), - getClientFactory.getClientConfig(clientName), - serverIntrospector - ) { - override def customizeLoadBalancerCommandBuilder( - request: FeignLoadBalancer.RibbonRequest, - config: IClientConfig, - builder: LoadBalancerCommand.Builder[FeignLoadBalancer.RibbonResponse] - ): Unit = { - val instance = - if (getRPCLoadBalancers.isEmpty) None - else { - val requestBody = SpringMVCRPCSender.getRequest(request).body() - val requestStr = new String(requestBody, DWCConfiguration.BDP_ENCODING.getValue) - val obj = RPCConsumer.getRPCConsumer.toObject( - BDPJettyServerHelper.gson.fromJson(requestStr, classOf[Message]) - ) - obj match { - case protocol: Protocol => - var serviceInstance: Option[ServiceInstance] = None - for (lb <- getRPCLoadBalancers if serviceInstance.isEmpty) - serviceInstance = lb.choose( - protocol, - SpringMVCRPCSender.this.serviceInstance, - getLoadBalancer - ) - serviceInstance.foreach(f => - logger.info( - "origin serviceInstance: " + SpringMVCRPCSender.this.serviceInstance + ", chose serviceInstance: " + f - ) - ) // TODO just for test - serviceInstance - case _ => None - } - } - instance - .orElse(Option(SpringMVCRPCSender.this.serviceInstance)) - .filter(s => StringUtils.isNotBlank(s.getInstance)) - .foreach { serviceInstance => - val server = RPCSpringBeanCache.getRPCServerLoader - .getServer(getLoadBalancer, serviceInstance) - builder.withServer(server) - } - } - } - } - }, - getClientFactory + blockingLoadBalancerClient, + // getLoadBalancedRetryFactory, + loadBalancerClientFactory ) - if (RPCConfiguration.ENABLE_SPRING_PARAMS) { - builder.options(LoadBalancerOptionsUtils.getDefaultOptions) - } + super.doBuilder(builder) builder .contract(getContract) @@ -160,18 +100,3 @@ private[rpc] class SpringMVCRPCSender private[rpc] ( } else s"RPCSender($getApplicationName, ${serviceInstance.getInstance})" } - -private object SpringMVCRPCSender { - private var requestField: Field = _ - - def getRequest(req: ClientRequest): Request = { - if (requestField == null) synchronized { - if (requestField == null) { - requestField = req.getClass.getDeclaredField("request") - requestField.setAccessible(true) - } - } - requestField.get(req).asInstanceOf[Request] - } - -} diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/utils/RPCUtils.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/utils/RPCUtils.scala index e7d48305ac..5ee3b1ca48 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/utils/RPCUtils.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/utils/RPCUtils.scala @@ -25,13 +25,14 @@ import org.apache.linkis.rpc.sender.{SpringCloudFeignConfigurationCache, SpringM import org.apache.commons.lang3.StringUtils +import org.springframework.cloud.client.loadbalancer.RetryableStatusCodeException + import java.lang.reflect.UndeclaredThrowableException import java.net.ConnectException import java.util.Locale import scala.collection.JavaConverters._ -import com.netflix.client.ClientException import feign.RetryableException object RPCUtils { @@ -53,11 +54,10 @@ object RPCUtils { } case t: RuntimeException => t.getCause match { - case client: ClientException => - StringUtils.isNotBlank(client.getErrorMessage) && - client.getErrorMessage.contains( - "Load balancer does not have available server for client" - ) + // case client: ClientException => + case client: RetryableStatusCodeException => + StringUtils.isNotBlank(client.getMessage) && + client.getMessage.contains("Load balancer does not have available server for client") case _ => false } case _ => false diff --git a/linkis-commons/linkis-rpc/src/test/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtilsTest.java b/linkis-commons/linkis-rpc/src/test/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtilsTest.java deleted file mode 100644 index d265371d60..0000000000 --- a/linkis-commons/linkis-rpc/src/test/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtilsTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.message.utils; - -import feign.Request; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -public class LoadBalancerOptionsUtilsTest { - - @Test - @DisplayName("getDefaultOptionsTest") - public void getDefaultOptionsTest() throws NoSuchFieldException, IllegalAccessException { - - Request.Options defaultOptions = LoadBalancerOptionsUtils.getDefaultOptions(); - Assertions.assertNotNull(defaultOptions); - } -} diff --git a/linkis-dist/package/conf/application-linkis.yml b/linkis-dist/package/conf/application-linkis.yml index 868a942946..a5bb4adf7c 100644 --- a/linkis-dist/package/conf/application-linkis.yml +++ b/linkis-dist/package/conf/application-linkis.yml @@ -42,6 +42,11 @@ pagehelper: params: countSql spring: + main: + allow-circular-references: true + mvc: + pathmatch: + matching-strategy: ant_path_matcher servlet: multipart: max-file-size: 500MB diff --git a/linkis-dist/package/conf/linkis.properties b/linkis-dist/package/conf/linkis.properties index c04294c1b0..191fee88a4 100644 --- a/linkis-dist/package/conf/linkis.properties +++ b/linkis-dist/package/conf/linkis.properties @@ -1,4 +1,4 @@ -# +# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -84,6 +84,7 @@ wds.linkis.gateway.conf.metadataquery.list=metadatamanager,metadataquery # note: org.springframework.cloud.config.client.ConfigServiceBootstrapConfiguration.configServicePropertySource need to disable spring.spring.cloud.config.enabled=false +spring.spring.mvc.pathmatch.matching-strategy=ant_path_matcher # linkis user ticket sso # redis stand-alone diff --git a/linkis-spring-cloud-services/linkis-service-discovery/linkis-eureka/pom.xml b/linkis-spring-cloud-services/linkis-service-discovery/linkis-eureka/pom.xml index 355864379e..3780541468 100644 --- a/linkis-spring-cloud-services/linkis-service-discovery/linkis-eureka/pom.xml +++ b/linkis-spring-cloud-services/linkis-service-discovery/linkis-eureka/pom.xml @@ -36,6 +36,28 @@ org.apache.linkis linkis-module provided + + + com.github.xiaoymin + knife4j-spring-boot-starter + + + io.springfox + springfox-schema + + + io.springfox + springfox-spring-web + + + io.springfox + springfox-spring-webflux + + + io.springfox + springfox-spring-webmvc + + diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/pom.xml b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/pom.xml index 37f15698ad..322c2c61b7 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/pom.xml +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/pom.xml @@ -80,6 +80,16 @@ springfox-schema ${springfox.version} + + io.springfox + springfox-spring-webmvc + ${springfox.version} + + + io.springfox + springfox-spring-webflux + ${springfox.version} + org.springframework spring-webmvc diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewaySpringConfiguration.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewaySpringConfiguration.scala index 046edea0c9..52f4d51081 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewaySpringConfiguration.scala +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-core/src/main/scala/org/apache/linkis/gateway/config/GatewaySpringConfiguration.scala @@ -40,27 +40,23 @@ import java.util.stream.Collectors @Configuration class GatewaySpringConfiguration { - @Autowired - private var userRestful: UserRestful = _ - @Autowired private var tokenService: TokenService = _ @PostConstruct def init(): Unit = { - SecurityFilter.setUserRestful(userRestful) TokenAuthentication.setTokenService(tokenService) } -// @Bean(Array("defaultGatewayParser")) -// @ConditionalOnMissingBean -// @Autowired(required = false) -// def createGatewayParser(gatewayParsers: Array[GatewayParser]): DefaultGatewayParser = -// new DefaultGatewayParser(gatewayParsers) -// -// @Bean(Array("defaultGatewayRouter")) -// @ConditionalOnMissingBean -// def createGatewayRouter(): DefaultGatewayParser = new DefaultGatewayRouter + // @Bean(Array("defaultGatewayParser")) + // @ConditionalOnMissingBean + // @Autowired(required = false) + // def createGatewayParser(gatewayParsers: Array[GatewayParser]): DefaultGatewayParser = + // new DefaultGatewayParser(gatewayParsers) + // + // @Bean(Array("defaultGatewayRouter")) + // @ConditionalOnMissingBean + // def createGatewayRouter(): DefaultGatewayParser = new DefaultGatewayRouter @Bean(Array("userRestful")) @ConditionalOnMissingBean @@ -68,6 +64,7 @@ class GatewaySpringConfiguration { def createUserRestful(securityHooks: Array[SecurityHook]): UserRestful = { val userRestful = new LDAPUserRestful if (securityHooks != null) userRestful.setSecurityHooks(securityHooks) + SecurityFilter.setUserRestful(userRestful) userRestful } diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/pom.xml b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/pom.xml index 6dec0ef829..ce3e4f8091 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/pom.xml +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/pom.xml @@ -80,10 +80,6 @@ org.springframework.boot spring-boot-starter-log4j2 - - org.springframework.cloud - spring-cloud-gateway-core - org.springframework.boot spring-boot-starter-reactor-netty diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/GatewayAuthorizationFilter.java b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/GatewayAuthorizationFilter.java index 71e53fb2b6..9adf694604 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/GatewayAuthorizationFilter.java +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/http/GatewayAuthorizationFilter.java @@ -36,7 +36,6 @@ import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.cloud.gateway.route.Route; import org.springframework.cloud.gateway.route.RouteDefinition; -import org.springframework.cloud.gateway.support.DefaultServerRequest; import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; import org.springframework.core.Ordered; import org.springframework.core.codec.AbstractDataBufferDecoder; @@ -47,6 +46,8 @@ import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.web.reactive.function.server.HandlerStrategies; +import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.server.ServerWebExchange; import java.nio.charset.StandardCharsets; @@ -79,11 +80,12 @@ public GatewayAuthorizationFilter( } private String getRequestBody(ServerWebExchange exchange) { - // StringBuilder requestBody = new StringBuilder(); - DefaultServerRequest serverRequest = new DefaultServerRequest(exchange); + ServerRequest defaultServerRequest = + ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders()); + String requestBody = null; try { - requestBody = serverRequest.bodyToMono(String.class).toFuture().get(); + requestBody = defaultServerRequest.bodyToMono(String.class).toFuture().get(); } catch (Exception e) { GatewayWarnException exception = new GatewayWarnException( @@ -246,7 +248,8 @@ public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); BaseGatewayContext gatewayContext = getBaseGatewayContext(exchange, route); if (!gatewayContext.isWebSocketRequest() && parser.shouldContainRequestBody(gatewayContext)) { - DefaultServerRequest defaultServerRequest = new DefaultServerRequest(exchange); + ServerRequest defaultServerRequest = + ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders()); defaultServerRequest.messageReaders().stream() .filter(reader -> reader instanceof DecoderHttpMessageReader) .filter( diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/scala/org/apache/linkis/gateway/springcloud/SpringCloudGatewayConfiguration.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/scala/org/apache/linkis/gateway/springcloud/SpringCloudGatewayConfiguration.scala index dd1cf6c038..f2a8b49571 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/scala/org/apache/linkis/gateway/springcloud/SpringCloudGatewayConfiguration.scala +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/scala/org/apache/linkis/gateway/springcloud/SpringCloudGatewayConfiguration.scala @@ -27,25 +27,29 @@ import org.apache.linkis.gateway.springcloud.http.{ } import org.apache.linkis.gateway.springcloud.websocket.SpringCloudGatewayWebsocketFilter import org.apache.linkis.rpc.Sender +import org.apache.linkis.rpc.interceptor.ServiceInstanceUtils import org.apache.linkis.server.conf.ServerConfiguration import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.AutoConfigureAfter import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.cloud.client +import org.springframework.cloud.client.DefaultServiceInstance import org.springframework.cloud.client.loadbalancer.LoadBalancerClient import org.springframework.cloud.gateway.config.{GatewayAutoConfiguration, GatewayProperties} import org.springframework.cloud.gateway.filter._ import org.springframework.cloud.gateway.route.{Route, RouteLocator} -import org.springframework.cloud.gateway.route.builder.{PredicateSpec, RouteLocatorBuilder} -import org.springframework.cloud.netflix.ribbon._ +import org.springframework.cloud.gateway.route.builder.{ + Buildable, + PredicateSpec, + RouteLocatorBuilder +} +import org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient +import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory import org.springframework.context.annotation.{Bean, Configuration} import org.springframework.web.reactive.socket.client.WebSocketClient import org.springframework.web.reactive.socket.server.WebSocketService -import scala.collection.JavaConverters._ - -import com.netflix.loadbalancer.Server import org.slf4j.{Logger, LoggerFactory} @Configuration @@ -89,9 +93,9 @@ class SpringCloudGatewayConfiguration { .routes() .route( "api", - new java.util.function.Function[PredicateSpec, Route.AsyncBuilder] { + new java.util.function.Function[PredicateSpec, Buildable[Route]] { - override def apply(t: PredicateSpec): Route.AsyncBuilder = t + override def apply(t: PredicateSpec): Buildable[Route] = t .path(API_URL_PREFIX + "**") .uri(ROUTE_URI_FOR_HTTP_HEADER + Sender.getThisServiceInstance.getApplicationName) @@ -99,9 +103,9 @@ class SpringCloudGatewayConfiguration { ) .route( "dws", - new java.util.function.Function[PredicateSpec, Route.AsyncBuilder] { + new java.util.function.Function[PredicateSpec, Buildable[Route]] { - override def apply(t: PredicateSpec): Route.AsyncBuilder = t + override def apply(t: PredicateSpec): Buildable[Route] = t .path(PROXY_URL_PREFIX + "**") .uri(ROUTE_URI_FOR_HTTP_HEADER + Sender.getThisServiceInstance.getApplicationName) @@ -109,9 +113,9 @@ class SpringCloudGatewayConfiguration { ) .route( "ws_http", - new java.util.function.Function[PredicateSpec, Route.AsyncBuilder] { + new java.util.function.Function[PredicateSpec, Buildable[Route]] { - override def apply(t: PredicateSpec): Route.AsyncBuilder = t + override def apply(t: PredicateSpec): Buildable[Route] = t .path(SpringCloudGatewayConfiguration.WEBSOCKET_URI + "info/**") .uri(ROUTE_URI_FOR_HTTP_HEADER + Sender.getThisServiceInstance.getApplicationName) @@ -119,9 +123,9 @@ class SpringCloudGatewayConfiguration { ) .route( "ws", - new java.util.function.Function[PredicateSpec, Route.AsyncBuilder] { + new java.util.function.Function[PredicateSpec, Buildable[Route]] { - override def apply(t: PredicateSpec): Route.AsyncBuilder = t + override def apply(t: PredicateSpec): Buildable[Route] = t .path(SpringCloudGatewayConfiguration.WEBSOCKET_URI + "**") .uri(ROUTE_URI_FOR_WEB_SOCKET_HEADER + Sender.getThisServiceInstance.getApplicationName) @@ -129,55 +133,127 @@ class SpringCloudGatewayConfiguration { ) .build() - @Bean - def createLoadBalancerClient(springClientFactory: SpringClientFactory): RibbonLoadBalancerClient = - new RibbonLoadBalancerClient(springClientFactory) { - - override def getServer(serviceId: String): Server = if (isMergeModuleInstance(serviceId)) { - val serviceInstance = getServiceInstance(serviceId) - logger.info("redirect to " + serviceInstance) - val lb = this.getLoadBalancer(serviceInstance.getApplicationName) - lb.getAllServers.asScala.find(_.getHostPort == serviceInstance.getInstance).get - } else super.getServer(serviceId) - - def isSecure(server: Server, serviceId: String) = { - val config = springClientFactory.getClientConfig(serviceId) - val serverIntrospector = serverIntrospectorFun(serviceId) - RibbonUtils.isSecure(config, serverIntrospector, server) - } + // @Bean + // def createLoadBalancerClient(springClientFactory: SpringClientFactory): RibbonLoadBalancerClient = + // new RibbonLoadBalancerClient(springClientFactory) { + // + // override def getServer(serviceId: String): Server = if (isMergeModuleInstance(serviceId)) { + // val serviceInstance = getServiceInstance(serviceId) + // logger.info("redirect to " + serviceInstance) + // val lb = this.getLoadBalancer(serviceInstance.getApplicationName) + // lb.getAllServers.asScala.find(_.getHostPort == serviceInstance.getInstance).get + // } else super.getServer(serviceId) + // + // def isSecure(server: Server, serviceId: String) = { + // val config = springClientFactory.getClientConfig(serviceId) + // val serverIntrospector = serverIntrospectorFun(serviceId) + // RibbonUtils.isSecure(config, serverIntrospector, server) + // } + // + // def serverIntrospectorFun(serviceId: String) = { + // var serverIntrospector = + // springClientFactory.getInstance(serviceId, classOf[ServerIntrospector]) + // if (serverIntrospector == null) serverIntrospector = new DefaultServerIntrospector + // serverIntrospector + // } + // + // override def choose(serviceId: String, hint: Any): client.ServiceInstance = + // if (isMergeModuleInstance(serviceId)) { + // val serviceInstance = getServiceInstance(serviceId) + // logger.info("redirect to " + serviceInstance) + // val lb = this.getLoadBalancer(serviceInstance.getApplicationName) + // val serverOption = + // lb.getAllServers.asScala.find(_.getHostPort == serviceInstance.getInstance) + // if (serverOption.isDefined) { + // val server = serverOption.get + // new RibbonLoadBalancerClient.RibbonServer( + // serviceId, + // server, + // isSecure(server, serviceId), + // serverIntrospectorFun(serviceId).getMetadata(server) + // ) + // } else { + // logger.warn( + // "RibbonLoadBalancer not have Server, execute default super choose method" + serviceInstance + // ) + // super.choose(serviceInstance.getApplicationName, hint) + // } + // } else super.choose(serviceId, hint) + // + // } - def serverIntrospectorFun(serviceId: String) = { - var serverIntrospector = - springClientFactory.getInstance(serviceId, classOf[ServerIntrospector]) - if (serverIntrospector == null) serverIntrospector = new DefaultServerIntrospector - serverIntrospector - } + @Bean + def createLoadBalancerClient( + loadBalancerClientFactory: LoadBalancerClientFactory + ): BlockingLoadBalancerClient = + new BlockingLoadBalancerClient(loadBalancerClientFactory) { - override def choose(serviceId: String, hint: Any): client.ServiceInstance = + override def choose(serviceId: String): client.ServiceInstance = { + // serviceId = merge-gw-18linkis-cg-entrance192—168—217–172—9104 if (isMergeModuleInstance(serviceId)) { + // serviceInstance = (linkis-cg-entrance,192.168.217.172:9104) val serviceInstance = getServiceInstance(serviceId) logger.info("redirect to " + serviceInstance) - val lb = this.getLoadBalancer(serviceInstance.getApplicationName) - val serverOption = - lb.getAllServers.asScala.find(_.getHostPort == serviceInstance.getInstance) + + val serverOption: Option[ServiceInstance] = ServiceInstanceUtils.getRPCServerLoader + .getServiceInstances(serviceInstance.getApplicationName) + .find(_.getInstance == serviceInstance.getInstance) + if (serverOption.isDefined) { val server = serverOption.get - new RibbonLoadBalancerClient.RibbonServer( + // serviceInstance.getApplicationName = linkis-cg-entrance + // super.choose(server.getApplicationName) + // SpringCloudFeignConfigurationCache.getDiscoveryClient + // .getInstances(server.getApplicationName).get(0) + val hostAndPort: Array[String] = server.getInstance.split(":") + new DefaultServiceInstance( + server.getApplicationName, serviceId, - server, - isSecure(server, serviceId), - serverIntrospectorFun(serviceId).getMetadata(server) + hostAndPort.head, + hostAndPort.last.toInt, + true ) } else { logger.warn( - "RibbonLoadBalancer not have Server, execute default super choose method" + serviceInstance + "BlockingLoadBalancer not have Server, execute default super choose method" + serviceInstance ) - super.choose(serviceInstance.getApplicationName, hint) + super.choose(serviceInstance.getApplicationName) } - } else super.choose(serviceId, hint) + } else super.choose(serviceId) + } } + // @Bean + // def createLoadBalancerClient( + // loadBalancerClientFactory: LoadBalancerClientFactory + // ): BlockingLoadBalancerClient = + // new BlockingLoadBalancerClient(loadBalancerClientFactory) { + // override def choose(serviceId: String): client.ServiceInstance = { + // // serviceId = merge-gw-18linkis-cg-entrance192—168—217–172—9104 + // if (isMergeModuleInstance(serviceId)) { + // // serviceInstance = (linkis-cg-entrance,192.168.217.172:9104) + // val serviceInstance = getServiceInstance(serviceId) + // logger.info("redirect to " + serviceInstance) + // + // val serverOption: Option[client.ServiceInstance] = SpringCloudFeignConfigurationCache.getDiscoveryClient + // .getInstances(serviceInstance.getApplicationName).iterator() + // .asScala.find(s => s.getHost + ":" + s.getPort == serviceInstance.getInstance) + // + // if (serverOption.isDefined) { + // val server: client.ServiceInstance = serverOption.get + // // serviceInstance.getApplicationName = linkis-cg-entrance + // // super.choose(serviceInstance.getApplicationName) + // super.choose(server.getServiceId) + // } else { + // logger.warn( + // "BlockingLoadBalancer not have Server, execute default super choose method" + serviceInstance + // ) + // super.choose(serviceId) + // } + // } else super.choose(serviceId) + // } + // } @Bean @ConditionalOnProperty(name = Array("spring.cloud.gateway.url.enabled"), matchIfMissing = true) def linkisGatewayHttpHeadersFilter(): LinkisGatewayHttpHeadersFilter = { diff --git a/pom.xml b/pom.xml index db87cce8c3..b7224d64b3 100644 --- a/pom.xml +++ b/pom.xml @@ -160,8 +160,8 @@ 4.5.13 ${httpclient.version} - 2.0.9 - 2.10.5 + 3.0.3 + 3.0.0 5.2.3 0.9.12 @@ -212,11 +212,11 @@ 2.2.220 - 5.2.23.RELEASE - 5.7.5 - 2.3.12.RELEASE - 2.2.9.RELEASE - Hoxton.SR12 + 5.3.27 + 5.7.8 + 2.7.11 + 3.1.7 + 2021.0.8 UTF-8 @@ -1860,7 +1860,7 @@ 3 true -XX:MaxMetaspaceSize=2g ${extraJavaTestArgs} - -Dio.netty.tryReflectionSetAccessible=true + -Dio.netty.tryReflectionSetAccessible=true