diff --git a/common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSFix.java b/common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSFix.java deleted file mode 100644 index 6c46eca15c7..00000000000 --- a/common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSFix.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.fasterxml.jackson.core.base; - -import org.apache.servicecomb.foundation.common.utils.JvmUtils; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper; -import com.fasterxml.jackson.core.json.ReaderBasedJsonParser; -import com.fasterxml.jackson.core.json.UTF8StreamJsonParser; -import com.fasterxml.jackson.databind.MappingJsonFactory; -import com.netflix.config.DynamicPropertyFactory; - -import javassist.CannotCompileException; -import javassist.ClassPool; -import javassist.CtClass; -import javassist.CtMethod; -import javassist.LoaderClassPath; -import javassist.NotFoundException; - -/** - * will be deleted after jackson fix the DoS problem: - * https://github.com/FasterXML/jackson-databind/issues/2157 - */ -public class DoSFix { - private static final String SUFFIX = "Fixed"; - - private static boolean enabled = DynamicPropertyFactory.getInstance() - .getBooleanProperty("servicecomb.jackson.fix.DoS.enabled", true).get(); - - private static boolean fixed; - - private static Class mappingJsonFactoryClass; - - public static synchronized void init() { - if (fixed || !enabled) { - return; - } - - fix(); - } - - public static JsonFactory createJsonFactory() { - try { - return (JsonFactory) mappingJsonFactoryClass.newInstance(); - } catch (Throwable e) { - throw new IllegalStateException("Failed to create JsonFactory.", e); - } - } - - private static void fix() { - try { - ClassLoader classLoader = JvmUtils.correctClassLoader(DoSFix.class.getClassLoader()); - ClassPool pool = new ClassPool(ClassPool.getDefault()); - pool.appendClassPath(new LoaderClassPath(classLoader)); - - fixParserBase(classLoader, pool); - fixReaderParser(classLoader, pool); - fixStreamParser(classLoader, pool); - fixByteSourceJsonBootstrapper(classLoader, pool); - - CtClass ctJsonFactoryFixedClass = fixJsonFactory(classLoader, pool); - fixMappingJsonFactoryClass(classLoader, pool, ctJsonFactoryFixedClass); - - fixed = true; - } catch (Throwable e) { - throw new IllegalStateException( - "Failed to fix jackson DoS bug.", - e); - } - } - - private static void fixMappingJsonFactoryClass(ClassLoader classLoader, ClassPool pool, - CtClass ctJsonFactoryFixedClass) throws NotFoundException, CannotCompileException { - CtClass ctMappingJsonFactoryClass = pool - .getAndRename(MappingJsonFactory.class.getName(), MappingJsonFactory.class.getName() + SUFFIX); - ctMappingJsonFactoryClass.setSuperclass(ctJsonFactoryFixedClass); - mappingJsonFactoryClass = ctMappingJsonFactoryClass.toClass(classLoader, null); - } - - private static CtClass fixJsonFactory(ClassLoader classLoader, ClassPool pool) - throws NotFoundException, CannotCompileException { - CtClass ctJsonFactoryClass = pool.getCtClass(JsonFactory.class.getName()); - CtClass ctJsonFactoryFixedClass = pool.makeClass(JsonFactory.class.getName() + SUFFIX); - ctJsonFactoryFixedClass.setSuperclass(ctJsonFactoryClass); - for (CtMethod ctMethod : ctJsonFactoryClass.getDeclaredMethods()) { - if (ctMethod.getName().equals("_createParser")) { - ctJsonFactoryFixedClass.addMethod(new CtMethod(ctMethod, ctJsonFactoryFixedClass, null)); - } - } - ctJsonFactoryFixedClass - .replaceClassName(ReaderBasedJsonParser.class.getName(), ReaderBasedJsonParser.class.getName() + SUFFIX); - ctJsonFactoryFixedClass - .replaceClassName(UTF8StreamJsonParser.class.getName(), UTF8StreamJsonParser.class.getName() + SUFFIX); - ctJsonFactoryFixedClass.replaceClassName(ByteSourceJsonBootstrapper.class.getName(), - ByteSourceJsonBootstrapper.class.getName() + SUFFIX); - ctJsonFactoryFixedClass.toClass(classLoader, null); - - return ctJsonFactoryFixedClass; - } - - private static void fixByteSourceJsonBootstrapper(ClassLoader classLoader, ClassPool pool) - throws NotFoundException, CannotCompileException { - CtClass ctByteSourceJsonBootstrapper = pool - .getAndRename(ByteSourceJsonBootstrapper.class.getName(), ByteSourceJsonBootstrapper.class.getName() + SUFFIX); - ctByteSourceJsonBootstrapper - .replaceClassName(UTF8StreamJsonParser.class.getName(), UTF8StreamJsonParser.class.getName() + SUFFIX); - ctByteSourceJsonBootstrapper - .replaceClassName(ReaderBasedJsonParser.class.getName(), ReaderBasedJsonParser.class.getName() + SUFFIX); - ctByteSourceJsonBootstrapper.toClass(classLoader, null); - } - - private static void fixStreamParser(ClassLoader classLoader, ClassPool pool) - throws NotFoundException, CannotCompileException { - CtClass ctStreamClass = pool - .getAndRename(UTF8StreamJsonParser.class.getName(), UTF8StreamJsonParser.class.getName() + SUFFIX); - ctStreamClass.replaceClassName(ParserBase.class.getName(), ParserBase.class.getName() + SUFFIX); - ctStreamClass.toClass(classLoader, null); - } - - private static void fixReaderParser(ClassLoader classLoader, ClassPool pool) - throws NotFoundException, CannotCompileException { - CtClass ctReaderClass = pool - .getAndRename(ReaderBasedJsonParser.class.getName(), ReaderBasedJsonParser.class.getName() + SUFFIX); - ctReaderClass.replaceClassName(ParserBase.class.getName(), ParserBase.class.getName() + SUFFIX); - ctReaderClass.toClass(classLoader, null); - } - - private static void fixParserBase(ClassLoader classLoader, ClassPool pool) - throws NotFoundException, CannotCompileException { - CtMethod ctMethodFixed = pool.get(DoSParserFixed.class.getName()).getDeclaredMethod("_parseSlowInt"); - CtClass baseClass = pool.getAndRename(ParserBase.class.getName(), ParserBase.class.getName() + SUFFIX); - baseClass.removeMethod(baseClass.getDeclaredMethod("_parseSlowInt")); - baseClass.addMethod(new CtMethod(ctMethodFixed, baseClass, null)); - baseClass.toClass(classLoader, null); - } -} diff --git a/common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSParserFixed.java b/common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSParserFixed.java deleted file mode 100644 index 71fbdc675b0..00000000000 --- a/common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSParserFixed.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.fasterxml.jackson.core.base; - -import java.io.IOException; -import java.io.Reader; -import java.math.BigInteger; - -import com.fasterxml.jackson.core.ObjectCodec; -import com.fasterxml.jackson.core.io.IOContext; -import com.fasterxml.jackson.core.io.NumberInput; -import com.fasterxml.jackson.core.json.ReaderBasedJsonParser; -import com.fasterxml.jackson.core.sym.CharsToNameCanonicalizer; - -/** - * will not be use directly - * just get _parseSlowInt/_parseSlowFloat bytecode and replace to ParserBase - */ -public abstract class DoSParserFixed extends ReaderBasedJsonParser { - public DoSParserFixed(IOContext ctxt, int features, Reader r, - ObjectCodec codec, CharsToNameCanonicalizer st, - char[] inputBuffer, int start, int end, boolean bufferRecyclable) { - super(ctxt, features, r, codec, st, inputBuffer, start, end, bufferRecyclable); - } - - private void _parseSlowInt(int expType) throws IOException { - String numStr = _textBuffer.contentsAsString(); - try { - int len = _intLength; - char[] buf = _textBuffer.getTextBuffer(); - int offset = _textBuffer.getTextOffset(); - if (_numberNegative) { - ++offset; - } - // Some long cases still... - if (NumberInput.inLongRange(buf, offset, len, _numberNegative)) { - // Probably faster to construct a String, call parse, than to use BigInteger - _numberLong = Long.parseLong(numStr); - _numTypesValid = NR_LONG; - } else { - // nope, need the heavy guns... (rare case) - - // *** fix DoS attack begin *** - if (NR_DOUBLE == expType || NR_FLOAT == expType) { - _numberDouble = Double.parseDouble(numStr); - _numTypesValid = NR_DOUBLE; - return; - } - if (NR_BIGINT != expType) { - throw new NumberFormatException("invalid numeric value '" + numStr + "'"); - } - // *** fix DoS attack end *** - - _numberBigInt = new BigInteger(numStr); - _numTypesValid = NR_BIGINT; - } - } catch (NumberFormatException nex) { - // Can this ever occur? Due to overflow, maybe? - _wrapError("Malformed numeric value '" + numStr + "'", nex); - } - } -} diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/AbstractRestObjectMapper.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/AbstractRestObjectMapper.java index 1f64d3d0543..0ca5fa6a019 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/AbstractRestObjectMapper.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/AbstractRestObjectMapper.java @@ -17,15 +17,10 @@ package org.apache.servicecomb.common.rest.codec; -import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.databind.ObjectMapper; public abstract class AbstractRestObjectMapper extends ObjectMapper { private static final long serialVersionUID = 189026839992490564L; - public AbstractRestObjectMapper(JsonFactory jsonFactory) { - super(jsonFactory); - } - abstract public String convertToString(Object value) throws Exception; } diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/RestObjectMapper.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/RestObjectMapper.java index ae1a45f7fcf..8c315c52aa2 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/RestObjectMapper.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/RestObjectMapper.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser.Feature; -import com.fasterxml.jackson.core.base.DoSFix; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonSerializer; @@ -35,9 +34,6 @@ import io.vertx.core.json.JsonObject; public class RestObjectMapper extends AbstractRestObjectMapper { - static { - DoSFix.init(); - } private static class JsonObjectSerializer extends JsonSerializer { @Override @@ -52,7 +48,6 @@ public void serialize(JsonObject value, JsonGenerator jgen, SerializerProvider p @SuppressWarnings("deprecation") public RestObjectMapper() { - super(DoSFix.createJsonFactory()); // swagger中要求date使用ISO8601格式传递,这里与之做了功能绑定,这在cse中是没有问题的 setDateFormat(new com.fasterxml.jackson.databind.util.ISO8601DateFormat() { diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/param/TestRestClientRequestImpl.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/param/TestRestClientRequestImpl.java index 6bcb3f9ec63..3e25612cf09 100644 --- a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/param/TestRestClientRequestImpl.java +++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/param/TestRestClientRequestImpl.java @@ -22,6 +22,7 @@ import java.util.UUID; import javax.servlet.http.Part; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import org.hamcrest.Matchers; @@ -35,7 +36,6 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.CaseInsensitiveHeaders; import io.vertx.core.http.HttpClientRequest; -import io.vertx.core.http.HttpHeaders; import mockit.Deencapsulation; import mockit.Expectations; import mockit.Mock; diff --git a/core/src/main/java/org/apache/servicecomb/core/transport/TransportVertxFactory.java b/core/src/main/java/org/apache/servicecomb/core/transport/TransportVertxFactory.java index 3aa8ab1bb2d..d4e96d380a3 100644 --- a/core/src/main/java/org/apache/servicecomb/core/transport/TransportVertxFactory.java +++ b/core/src/main/java/org/apache/servicecomb/core/transport/TransportVertxFactory.java @@ -35,6 +35,7 @@ public class TransportVertxFactory { public TransportVertxFactory() { vertxOptions.setMetricsOptions(metricsOptionsEx); transportVertx = VertxUtils.getOrCreateVertxByName("transport", vertxOptions); + metricsFactory.setVertx(transportVertx, vertxOptions); } public DefaultVertxMetricsFactory getMetricsFactory() { diff --git a/core/src/test/java/org/apache/servicecomb/core/transport/TestTransportVertxFactory.java b/core/src/test/java/org/apache/servicecomb/core/transport/TestTransportVertxFactory.java index aec2492b2e2..fee477f474e 100644 --- a/core/src/test/java/org/apache/servicecomb/core/transport/TestTransportVertxFactory.java +++ b/core/src/test/java/org/apache/servicecomb/core/transport/TestTransportVertxFactory.java @@ -26,7 +26,6 @@ public void getTransportVertx() { Assert.assertNotNull(vertxFactory.getTransportVertx()); Assert.assertSame(vertxFactory.getTransportVertx(), vertxFactory.getTransportVertx()); - Assert.assertSame(vertxFactory.getTransportVertx(), vertxFactory.getMetricsFactory().getVertxMetrics().getVertx()); vertxFactory.getTransportVertx().close(); } diff --git a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java index 56cb27e8fab..cc0a5df9405 100644 --- a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java +++ b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java @@ -163,6 +163,7 @@ private void refreshMembers(MemberDiscovery memberDiscovery) { String configCenter = memberDiscovery.getConfigServer(); IpPort ipPort = NetUtils.parseIpPortFromURI(configCenter); clientMgr.findThreadBindClientPool().runOnContext(client -> { + @SuppressWarnings("deprecation") HttpClientRequest request = client.get(ipPort.getPort(), ipPort.getHostOrIp(), uriConst.MEMBERS, rsp -> { if (rsp.statusCode() == HttpResponseStatus.OK.code()) { @@ -374,6 +375,7 @@ public void refreshConfig(String configcenter, boolean wait) { + ParseConfigUtils.getInstance().getCurrentVersionInfo(); clientMgr.findThreadBindClientPool().runOnContext(client -> { IpPort ipPort = NetUtils.parseIpPortFromURI(configcenter); + @SuppressWarnings("deprecation") HttpClientRequest request = client.get(ipPort.getPort(), ipPort.getHostOrIp(), path, rsp -> { if (rsp.statusCode() == HttpResponseStatus.OK.code()) { rsp.bodyHandler(buf -> { diff --git a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java index 5c70ac4a280..d6fad3af143 100644 --- a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java +++ b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java @@ -53,6 +53,7 @@ import mockit.MockUp; import mockit.Mocked; +@SuppressWarnings("deprecation") public class TestConfigCenterClient { @BeforeClass public static void setUpClass() { diff --git a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java index eb039afd66b..4fd33690b0a 100644 --- a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java +++ b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java @@ -55,10 +55,10 @@ public static void syncExecuteBlocking(Handler> blockingCodeHandle } @Override - public void executeBlocking(Action action, Handler> resultHandler) { + public void executeBlockingInternal(Handler> action, Handler> resultHandler) { syncExecuteBlocking((future) -> { try { - future.complete(action.perform()); + action.handle(future); } catch (Throwable e) { future.fail(e); } @@ -72,7 +72,7 @@ public void executeBlocking(Handler> blockingCodeHandler, boolean } @Override - void executeBlocking(Action action, Handler> blockingCodeHandler, + void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler, Executor exec, TaskQueue queue, @SuppressWarnings("rawtypes") PoolMetrics metrics) { syncExecuteBlocking(blockingCodeHandler, resultHandler); diff --git a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java index 5fada598c84..c200ed73cb9 100644 --- a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java +++ b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java @@ -20,6 +20,7 @@ import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; +import io.vertx.core.net.impl.transport.Transport; /** * after test finished, need to invoke vertx.close @@ -28,10 +29,12 @@ public class SyncVertx extends VertxImpl { private ContextImpl context = new SyncContext(this); public SyncVertx() { - this(null, null); + this(new VertxOptions(), null); } protected SyncVertx(VertxOptions options, Handler> resultHandler) { + super(options, Transport.transport(options.getPreferNativeTransport())); + init(); } @Override diff --git a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/VertxImpl.java b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/VertxImpl.java new file mode 100644 index 00000000000..cd2b055e909 --- /dev/null +++ b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -0,0 +1,1137 @@ +/* + * Copyright (c) 2011-2018 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core.impl; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.resolver.AddressResolverGroup; +import io.netty.util.ResourceLeakDetector; +import io.netty.util.concurrent.GenericFutureListener; +import io.vertx.core.AsyncResult; +import io.vertx.core.Closeable; +import io.vertx.core.Context; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.ServiceHelper; +import io.vertx.core.TimeoutStream; +import io.vertx.core.Verticle; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.datagram.DatagramSocket; +import io.vertx.core.datagram.DatagramSocketOptions; +import io.vertx.core.datagram.impl.DatagramSocketImpl; +import io.vertx.core.dns.AddressResolverOptions; +import io.vertx.core.dns.DnsClient; +import io.vertx.core.dns.DnsClientOptions; +import io.vertx.core.dns.impl.DnsClientImpl; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.impl.EventBusImpl; +import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus; +import io.vertx.core.file.FileSystem; +import io.vertx.core.file.impl.FileResolver; +import io.vertx.core.file.impl.FileSystemImpl; +import io.vertx.core.file.impl.WindowsFileSystem; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.impl.HttpClientImpl; +import io.vertx.core.http.impl.HttpServerImpl; +import io.vertx.core.impl.resolver.DnsResolverProvider; +import io.vertx.core.json.JsonObject; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.impl.NetClientImpl; +import io.vertx.core.net.impl.NetServerImpl; +import io.vertx.core.net.impl.ServerID; +import io.vertx.core.net.impl.transport.Transport; +import io.vertx.core.shareddata.SharedData; +import io.vertx.core.shareddata.impl.SharedDataImpl; +import io.vertx.core.spi.VerticleFactory; +import io.vertx.core.spi.VertxMetricsFactory; +import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.core.spi.metrics.Metrics; +import io.vertx.core.spi.metrics.MetricsProvider; +import io.vertx.core.spi.metrics.PoolMetrics; +import io.vertx.core.spi.metrics.VertxMetrics; + +/** + * @author Tim Fox + */ +public class VertxImpl implements VertxInternal, MetricsProvider { + + private static final Logger log = LoggerFactory.getLogger(VertxImpl.class); + + private static final String CLUSTER_MAP_NAME = "__vertx.haInfo"; + private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio"; + private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50); + + static { + // Netty resource leak detection has a performance overhead and we do not need it in Vert.x + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); + // Use the JDK deflater/inflater by default + System.setProperty("io.netty.noJdkZlibDecoder", "false"); + } + + static VertxImpl vertx(VertxOptions options) { + VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport())); + vertx.init(); + return vertx; + } + + static VertxImpl vertx(VertxOptions options, Transport transport) { + VertxImpl vertx = new VertxImpl(options, transport); + vertx.init(); + return vertx; + } + + static void clusteredVertx(VertxOptions options, Handler> resultHandler) { + VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport())); + vertx.joinCluster(options, resultHandler); + } + + private final FileSystem fileSystem = getFileSystem(); + private final SharedData sharedData; + private final VertxMetrics metrics; + private final ConcurrentMap timeouts = new ConcurrentHashMap<>(); + private final AtomicLong timeoutCounter = new AtomicLong(0); + private final ClusterManager clusterManager; + private final DeploymentManager deploymentManager; + private final FileResolver fileResolver; + private final Map sharedHttpServers = new HashMap<>(); + private final Map sharedNetServers = new HashMap<>(); + final WorkerPool workerPool; + final WorkerPool internalBlockingPool; + private final ThreadFactory eventLoopThreadFactory; + private final EventLoopGroup eventLoopGroup; + private final EventLoopGroup acceptorEventLoopGroup; + private final BlockedThreadChecker checker; + private final AddressResolver addressResolver; + private final AddressResolverOptions addressResolverOptions; + private final EventBus eventBus; + private volatile HAManager haManager; + private boolean closed; + private volatile Handler exceptionHandler; + private final Map namedWorkerPools; + private final int defaultWorkerPoolSize; + private final long defaultWorkerMaxExecTime; + private final CloseHooks closeHooks; + private final Transport transport; + + @SuppressWarnings("rawtypes") + public VertxImpl(VertxOptions options, Transport transport) { + // Sanity check + if (Vertx.currentContext() != null) { + log.warn("You're already on a Vert.x context, are you sure you want to create a new Vertx instance?"); + } + closeHooks = new CloseHooks(log); + checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(), options.getWarningExceptionTimeUnit()); + eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit()); + eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO); + ThreadFactory acceptorEventLoopThreadFactory = new VertxThreadFactory("vert.x-acceptor-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit()); + // The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections + // under a lot of load + acceptorEventLoopGroup = transport.eventLoopGroup(1, acceptorEventLoopThreadFactory, 100); + + metrics = initialiseMetrics(options); + + ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(), + new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit())); + PoolMetrics workerPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null; + ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(), + new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit())); + PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null; + internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics); + namedWorkerPools = new HashMap<>(); + workerPool = new WorkerPool(workerExec, workerPoolMetrics); + defaultWorkerPoolSize = options.getWorkerPoolSize(); + defaultWorkerMaxExecTime = options.getMaxWorkerExecuteTime(); + + this.transport = transport; + this.fileResolver = new FileResolver(options.getFileSystemOptions()); + this.addressResolverOptions = options.getAddressResolverOptions(); + this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions()); + this.deploymentManager = new DeploymentManager(this); + if (options.isClustered()) { + this.clusterManager = getClusterManager(options); + this.eventBus = new ClusteredEventBus(this, options, clusterManager); + } else { + this.clusterManager = null; + this.eventBus = new EventBusImpl(this); + } + this.sharedData = new SharedDataImpl(this, clusterManager); + } + + public void init() { + eventBus.start(ar -> {}); + if (metrics != null) { + metrics.vertxCreated(this); + } + } + + private void joinCluster(VertxOptions options, Handler> resultHandler) { + clusterManager.setVertx(this); + clusterManager.join(ar -> { + if (ar.succeeded()) { + createHaManager(options, resultHandler); + } else { + log.error("Failed to join cluster", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void createHaManager(VertxOptions options, Handler> resultHandler) { + this.>executeBlocking(fut -> { + fut.complete(clusterManager.getSyncMap(CLUSTER_MAP_NAME)); + }, false, ar -> { + if (ar.succeeded()) { + Map clusterMap = ar.result(); + haManager = new HAManager(this, deploymentManager, clusterManager, clusterMap, options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled()); + startEventBus(resultHandler); + } else { + log.error("Failed to start HAManager", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void startEventBus(Handler> resultHandler) { + eventBus.start(ar -> { + if (ar.succeeded()) { + initializeHaManager(resultHandler); + } else { + log.error("Failed to start event bus", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void initializeHaManager(Handler> resultHandler) { + this.executeBlocking(fut -> { + // Init the manager (i.e register listener and check the quorum) + // after the event bus has been fully started and updated its state + // it will have also set the clustered changed view handler on the ha manager + haManager.init(); + fut.complete(); + }, false, ar -> { + if (ar.succeeded()) { + if (metrics != null) { + metrics.vertxCreated(this); + } + resultHandler.handle(Future.succeededFuture(this)); + } else { + log.error("Failed to initialize HAManager", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + /** + * @return The FileSystem implementation for the OS + */ + protected FileSystem getFileSystem() { + return Utils.isWindows() ? new WindowsFileSystem(this) : new FileSystemImpl(this); + } + + @Override + public DatagramSocket createDatagramSocket(DatagramSocketOptions options) { + return DatagramSocketImpl.create(this, options); + } + + @Override + public DatagramSocket createDatagramSocket() { + return createDatagramSocket(new DatagramSocketOptions()); + } + + public NetServer createNetServer(NetServerOptions options) { + return new NetServerImpl(this, options); + } + + @Override + public NetServer createNetServer() { + return createNetServer(new NetServerOptions()); + } + + public NetClient createNetClient(NetClientOptions options) { + return new NetClientImpl(this, options); + } + + @Override + public NetClient createNetClient() { + return createNetClient(new NetClientOptions()); + } + + @Override + public Transport transport() { + return transport; + } + + @Override + public boolean isNativeTransportEnabled() { + return transport != Transport.JDK; + } + + public FileSystem fileSystem() { + return fileSystem; + } + + public SharedData sharedData() { + return sharedData; + } + + public HttpServer createHttpServer(HttpServerOptions serverOptions) { + return new HttpServerImpl(this, serverOptions); + } + + @Override + public HttpServer createHttpServer() { + return createHttpServer(new HttpServerOptions()); + } + + public HttpClient createHttpClient(HttpClientOptions options) { + return new HttpClientImpl(this, options); + } + + @Override + public HttpClient createHttpClient() { + return createHttpClient(new HttpClientOptions()); + } + + public EventBus eventBus() { + return eventBus; + } + + public long setPeriodic(long delay, Handler handler) { + return scheduleTimeout(getOrCreateContext(), handler, delay, true); + } + + @Override + public TimeoutStream periodicStream(long delay) { + return new TimeoutStreamImpl(delay, true); + } + + public long setTimer(long delay, Handler handler) { + return scheduleTimeout(getOrCreateContext(), handler, delay, false); + } + + @Override + public TimeoutStream timerStream(long delay) { + return new TimeoutStreamImpl(delay, false); + } + + public void runOnContext(Handler task) { + ContextImpl context = getOrCreateContext(); + context.runOnContext(task); + } + + // The background pool is used for making blocking calls to legacy synchronous APIs + public ExecutorService getWorkerPool() { + return workerPool.executor(); + } + + public EventLoopGroup getEventLoopGroup() { + return eventLoopGroup; + } + + public EventLoopGroup getAcceptorEventLoopGroup() { + return acceptorEventLoopGroup; + } + + public ContextImpl getOrCreateContext() { + ContextImpl ctx = getContext(); + if (ctx == null) { + // We are running embedded - Create a context + ctx = createEventLoopContext(null, null, new JsonObject(), Thread.currentThread().getContextClassLoader()); + } + return ctx; + } + + public Map sharedHttpServers() { + return sharedHttpServers; + } + + public Map sharedNetServers() { + return sharedNetServers; + } + + @Override + public boolean isMetricsEnabled() { + return metrics != null; + } + + @Override + public Metrics getMetrics() { + return metrics; + } + + public boolean cancelTimer(long id) { + InternalTimerHandler handler = timeouts.remove(id); + if (handler != null) { + handler.context.removeCloseHook(handler); + return handler.cancel(); + } else { + return false; + } + } + + @Override + public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) { + return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deploymentID, config, tccl); + } + + @Override + public ContextImpl createWorkerContext(boolean multiThreaded, String deploymentID, WorkerPool workerPool, JsonObject config, + ClassLoader tccl) { + if (workerPool == null) { + workerPool = this.workerPool; + } + if (multiThreaded) { + return new MultiThreadedWorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl); + } else { + return new WorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl); + } + } + + @Override + public DnsClient createDnsClient(int port, String host) { + return createDnsClient(new DnsClientOptions().setHost(host).setPort(port)); + } + + @Override + public DnsClient createDnsClient() { + return createDnsClient(new DnsClientOptions()); + } + + @Override + public DnsClient createDnsClient(DnsClientOptions options) { + String host = options.getHost(); + int port = options.getPort(); + if (host == null || port < 0) { + DnsResolverProvider provider = new DnsResolverProvider(this, addressResolverOptions); + InetSocketAddress address = provider.nameServerAddresses().get(0); + // provide the host and port + options = new DnsClientOptions(options) + .setHost(address.getAddress().getHostAddress()) + .setPort(address.getPort()); + } + return new DnsClientImpl(this, options); + } + + private VertxMetrics initialiseMetrics(VertxOptions options) { + if (options.getMetricsOptions() != null && options.getMetricsOptions().isEnabled()) { + VertxMetricsFactory factory = options.getMetricsOptions().getFactory(); + if (factory == null) { + factory = ServiceHelper.loadFactoryOrNull(VertxMetricsFactory.class); + if (factory == null) { + log.warn("Metrics has been set to enabled but no VertxMetricsFactory found on classpath"); + } + } + if (factory != null) { + VertxMetrics metrics = factory.metrics(options); + Objects.requireNonNull(metrics, "The metric instance created from " + factory + " cannot be null"); + return metrics; + } + } + return null; + } + + private ClusterManager getClusterManager(VertxOptions options) { + ClusterManager mgr = options.getClusterManager(); + if (mgr == null) { + String clusterManagerClassName = System.getProperty("vertx.cluster.managerClass"); + if (clusterManagerClassName != null) { + // We allow specify a sys prop for the cluster manager factory which overrides ServiceLoader + try { + Class clazz = Class.forName(clusterManagerClassName); + mgr = (ClusterManager) clazz.newInstance(); + } catch (Exception e) { + throw new IllegalStateException("Failed to instantiate " + clusterManagerClassName, e); + } + } else { + mgr = ServiceHelper.loadFactoryOrNull(ClusterManager.class); + if (mgr == null) { + throw new IllegalStateException("No ClusterManagerFactory instances found on classpath"); + } + } + } + return mgr; + } + + private long scheduleTimeout(ContextImpl context, Handler handler, long delay, boolean periodic) { + if (delay < 1) { + throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms"); + } + long timerId = timeoutCounter.getAndIncrement(); + InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, delay, context); + timeouts.put(timerId, task); + context.addCloseHook(task); + return timerId; + } + + public static Context context() { + Thread current = Thread.currentThread(); + if (current instanceof VertxThread) { + return ((VertxThread) current).getContext(); + } + return null; + } + + public ContextImpl getContext() { + ContextImpl context = (ContextImpl) context(); + if (context != null && context.owner == this) { + return context; + } + return null; + } + + public ClusterManager getClusterManager() { + return clusterManager; + } + + @Override + public void close() { + close(null); + } + + private void closeClusterManager(Handler> completionHandler) { + if (clusterManager != null) { + clusterManager.leave(ar -> { + if (ar.failed()) { + log.error("Failed to leave cluster", ar.cause()); + } + if (completionHandler != null) { + runOnContext(v -> completionHandler.handle(Future.succeededFuture())); + } + }); + } else if (completionHandler != null) { + runOnContext(v -> completionHandler.handle(Future.succeededFuture())); + } + } + + @Override + public synchronized void close(Handler> completionHandler) { + if (closed || eventBus == null) { + // Just call the handler directly since pools shutdown + if (completionHandler != null) { + completionHandler.handle(Future.succeededFuture()); + } + return; + } + closed = true; + + closeHooks.run(ar -> { + deploymentManager.undeployAll(ar1 -> { + HAManager haManager = haManager(); + Future haFuture = Future.future(); + if (haManager != null) { + this.executeBlocking(fut -> { + haManager.stop(); + fut.complete(); + }, false, haFuture); + } else { + haFuture.complete(); + } + haFuture.setHandler(ar2 -> { + addressResolver.close(ar3 -> { + eventBus.close(ar4 -> { + closeClusterManager(ar5 -> { + // Copy set to prevent ConcurrentModificationException + Set httpServers = new HashSet<>(sharedHttpServers.values()); + Set netServers = new HashSet<>(sharedNetServers.values()); + sharedHttpServers.clear(); + sharedNetServers.clear(); + + int serverCount = httpServers.size() + netServers.size(); + + AtomicInteger serverCloseCount = new AtomicInteger(); + + Handler> serverCloseHandler = res -> { + if (res.failed()) { + log.error("Failure in shutting down server", res.cause()); + } + if (serverCloseCount.incrementAndGet() == serverCount) { + deleteCacheDirAndShutdown(completionHandler); + } + }; + + for (HttpServerImpl server : httpServers) { + server.closeAll(serverCloseHandler); + } + for (NetServerImpl server : netServers) { + server.closeAll(serverCloseHandler); + } + if (serverCount == 0) { + deleteCacheDirAndShutdown(completionHandler); + } + }); + }); + }); + }); + }); + }); + } + + @Override + public void deployVerticle(Verticle verticle) { + deployVerticle(verticle, new DeploymentOptions(), null); + } + + @Override + public void deployVerticle(Verticle verticle, Handler> completionHandler) { + deployVerticle(verticle, new DeploymentOptions(), completionHandler); + } + + @Override + public void deployVerticle(String name, Handler> completionHandler) { + deployVerticle(name, new DeploymentOptions(), completionHandler); + } + + @Override + public void deployVerticle(Verticle verticle, DeploymentOptions options) { + deployVerticle(verticle, options, null); + } + + @Override + public void deployVerticle(Class verticleClass, DeploymentOptions options) { + deployVerticle(() -> { + try { + return verticleClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, options); + } + + @Override + public void deployVerticle(Supplier verticleSupplier, DeploymentOptions options) { + deployVerticle(verticleSupplier, options, null); + } + + @Override + public void deployVerticle(Verticle verticle, DeploymentOptions options, Handler> completionHandler) { + if (options.getInstances() != 1) { + throw new IllegalArgumentException("Can't specify > 1 instances for already created verticle"); + } + deployVerticle(() -> verticle, options, completionHandler); + } + + @Override + public void deployVerticle(Class verticleClass, DeploymentOptions options, Handler> completionHandler) { + deployVerticle(() -> { + try { + return verticleClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, options, completionHandler); + } + + @Override + public void deployVerticle(Supplier verticleSupplier, DeploymentOptions options, Handler> completionHandler) { + boolean closed; + synchronized (this) { + closed = this.closed; + } + if (closed) { + if (completionHandler != null) { + completionHandler.handle(Future.failedFuture("Vert.x closed")); + } + } else { + deploymentManager.deployVerticle(verticleSupplier, options, completionHandler); + } + } + + @Override + public void deployVerticle(String name) { + deployVerticle(name, new DeploymentOptions(), null); + } + + @Override + public void deployVerticle(String name, DeploymentOptions options) { + deployVerticle(name, options, null); + } + + @Override + public void deployVerticle(String name, DeploymentOptions options, Handler> completionHandler) { + if (options.isHa() && haManager() != null && haManager().isEnabled()) { + haManager().deployVerticle(name, options, completionHandler); + } else { + deploymentManager.deployVerticle(name, options, completionHandler); + } + } + + @Override + public String getNodeID() { + return clusterManager.getNodeID(); + } + + @Override + public void undeploy(String deploymentID) { + undeploy(deploymentID, res -> { + }); + } + + @Override + public void undeploy(String deploymentID, Handler> completionHandler) { + HAManager haManager = haManager(); + Future haFuture = Future.future(); + if (haManager != null && haManager.isEnabled()) { + this.executeBlocking(fut -> { + haManager.removeFromHA(deploymentID); + fut.complete(); + }, false, haFuture); + } else { + haFuture.complete(); + } + haFuture.compose(v -> { + Future deploymentFuture = Future.future(); + deploymentManager.undeployVerticle(deploymentID, deploymentFuture); + return deploymentFuture; + }).setHandler(completionHandler); + } + + @Override + public Set deploymentIDs() { + return deploymentManager.deployments(); + } + + @Override + public void registerVerticleFactory(VerticleFactory factory) { + deploymentManager.registerVerticleFactory(factory); + } + + @Override + public void unregisterVerticleFactory(VerticleFactory factory) { + deploymentManager.unregisterVerticleFactory(factory); + } + + @Override + public Set verticleFactories() { + return deploymentManager.verticleFactories(); + } + + @Override + public void executeBlockingInternal(Handler> blockingCodeHandler, Handler> resultHandler) { + ContextImpl context = getOrCreateContext(); + + context.executeBlockingInternal(blockingCodeHandler, resultHandler); + } + + @Override + public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, + Handler> asyncResultHandler) { + ContextImpl context = getOrCreateContext(); + context.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler); + } + + @Override + public void executeBlocking(Handler> blockingCodeHandler, + Handler> asyncResultHandler) { + executeBlocking(blockingCodeHandler, true, asyncResultHandler); + } + + @Override + public boolean isClustered() { + return clusterManager != null; + } + + @Override + public EventLoopGroup nettyEventLoopGroup() { + return eventLoopGroup; + } + + // For testing + public void simulateKill() { + if (haManager() != null) { + haManager().simulateKill(); + } + } + + @Override + public Deployment getDeployment(String deploymentID) { + return deploymentManager.getDeployment(deploymentID); + } + + @Override + public synchronized void failoverCompleteHandler(FailoverCompleteHandler failoverCompleteHandler) { + if (haManager() != null) { + haManager().setFailoverCompleteHandler(failoverCompleteHandler); + } + } + + @Override + public boolean isKilled() { + return haManager().isKilled(); + } + + @Override + public void failDuringFailover(boolean fail) { + if (haManager() != null) { + haManager().failDuringFailover(fail); + } + } + + @Override + public VertxMetrics metricsSPI() { + return metrics; + } + + @Override + public File resolveFile(String fileName) { + return fileResolver.resolveFile(fileName); + } + + @Override + public void resolveAddress(String hostname, Handler> resultHandler) { + addressResolver.resolveHostname(hostname, resultHandler); + } + + @Override + public AddressResolver addressResolver() { + return addressResolver; + } + + @Override + public AddressResolverGroup nettyAddressResolverGroup() { + return addressResolver.nettyAddressResolverGroup(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private void deleteCacheDirAndShutdown(Handler> completionHandler) { + executeBlockingInternal(fut -> { + try { + fileResolver.close(); + fut.complete(); + } catch (IOException e) { + fut.tryFail(e); + } + }, ar -> { + + workerPool.close(); + internalBlockingPool.close(); + new ArrayList<>(namedWorkerPools.values()).forEach(WorkerPool::close); + + acceptorEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (!future.isSuccess()) { + log.warn("Failure in shutting down acceptor event loop group", future.cause()); + } + eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (!future.isSuccess()) { + log.warn("Failure in shutting down event loop group", future.cause()); + } + if (metrics != null) { + metrics.close(); + } + + checker.close(); + + if (completionHandler != null) { + eventLoopThreadFactory.newThread(() -> { + completionHandler.handle(Future.succeededFuture()); + }).start(); + } + } + }); + } + }); + }); + } + + public HAManager haManager() { + return haManager; + } + + private class InternalTimerHandler implements Handler, Closeable { + final Handler handler; + final boolean periodic; + final long timerID; + final ContextImpl context; + final java.util.concurrent.Future future; + final AtomicBoolean cancelled; + + boolean cancel() { + if (cancelled.compareAndSet(false, true)) { + if (metrics != null) { + metrics.timerEnded(timerID, true); + } + future.cancel(false); + return true; + } else { + return false; + } + } + + InternalTimerHandler(long timerID, Handler runnable, boolean periodic, long delay, ContextImpl context) { + this.context = context; + this.timerID = timerID; + this.handler = runnable; + this.periodic = periodic; + this.cancelled = new AtomicBoolean(); + EventLoop el = context.nettyEventLoop(); + Runnable toRun = () -> context.runOnContext(this); + if (periodic) { + future = el.scheduleAtFixedRate(toRun, delay, delay, TimeUnit.MILLISECONDS); + } else { + future = el.schedule(toRun, delay, TimeUnit.MILLISECONDS); + } + if (metrics != null) { + metrics.timerCreated(timerID); + } + } + + public void handle(Void v) { + if (!cancelled.get()) { + try { + handler.handle(timerID); + } finally { + if (!periodic) { + // Clean up after it's fired + cleanupNonPeriodic(); + } + } + } + } + + private void cleanupNonPeriodic() { + VertxImpl.this.timeouts.remove(timerID); + if (metrics != null) { + metrics.timerEnded(timerID, false); + } + ContextImpl context = getContext(); + if (context != null) { + context.removeCloseHook(this); + } + } + + // Called via Context close hook when Verticle is undeployed + public void close(Handler> completionHandler) { + VertxImpl.this.timeouts.remove(timerID); + cancel(); + completionHandler.handle(Future.succeededFuture()); + } + + } + + /* + * + * This class is optimised for performance when used on the same event loop that is was passed to the handler with. + * However it can be used safely from other threads. + * + * The internal state is protected using the synchronized keyword. If always used on the same event loop, then + * we benefit from biased locking which makes the overhead of synchronized near zero. + * + */ + private class TimeoutStreamImpl implements TimeoutStream, Handler { + + private final long delay; + private final boolean periodic; + + private Long id; + private Handler handler; + private Handler endHandler; + private long demand; + + public TimeoutStreamImpl(long delay, boolean periodic) { + this.delay = delay; + this.periodic = periodic; + this.demand = Long.MAX_VALUE; + } + + @Override + public synchronized void handle(Long event) { + try { + if (demand > 0) { + demand--; + handler.handle(event); + } + } finally { + if (!periodic && endHandler != null) { + endHandler.handle(null); + } + } + } + + @Override + public synchronized TimeoutStream fetch(long amount) { + demand += amount; + if (demand < 0) { + demand = Long.MAX_VALUE; + } + return this; + } + + @Override + public TimeoutStream exceptionHandler(Handler handler) { + return this; + } + + @Override + public void cancel() { + if (id != null) { + VertxImpl.this.cancelTimer(id); + } + } + + @Override + public synchronized TimeoutStream handler(Handler handler) { + if (handler != null) { + if (id != null) { + throw new IllegalStateException(); + } + this.handler = handler; + id = scheduleTimeout(getOrCreateContext(), this, delay, periodic); + } else { + cancel(); + } + return this; + } + + @Override + public synchronized TimeoutStream pause() { + demand = 0; + return this; + } + + @Override + public synchronized TimeoutStream resume() { + demand = Long.MAX_VALUE; + return this; + } + + @Override + public synchronized TimeoutStream endHandler(Handler endHandler) { + this.endHandler = endHandler; + return this; + } + } + + class SharedWorkerPool extends WorkerPool { + + private final String name; + private int refCount = 1; + + @SuppressWarnings("rawtypes") + SharedWorkerPool(String name, ExecutorService workerExec, PoolMetrics workerMetrics) { + super(workerExec, workerMetrics); + this.name = name; + } + + @Override + void close() { + synchronized (VertxImpl.this) { + if (refCount > 0) { + refCount = 0; + super.close(); + } + } + } + + void release() { + synchronized (VertxImpl.this) { + if (--refCount == 0) { + namedWorkerPools.remove(name); + super.close(); + } + } + } + } + + @Override + public WorkerExecutorImpl createSharedWorkerExecutor(String name) { + return createSharedWorkerExecutor(name, defaultWorkerPoolSize); + } + + @Override + public WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize) { + return createSharedWorkerExecutor(name, poolSize, defaultWorkerMaxExecTime); + } + + @Override + public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime) { + return createSharedWorkerExecutor(name, poolSize, maxExecuteTime, TimeUnit.NANOSECONDS); + } + + @SuppressWarnings("rawtypes") + @Override + public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime, TimeUnit maxExecuteTimeUnit) { + if (poolSize < 1) { + throw new IllegalArgumentException("poolSize must be > 0"); + } + if (maxExecuteTime < 1) { + throw new IllegalArgumentException("maxExecuteTime must be > 0"); + } + SharedWorkerPool sharedWorkerPool = namedWorkerPools.get(name); + if (sharedWorkerPool == null) { + ExecutorService workerExec = Executors.newFixedThreadPool(poolSize, new VertxThreadFactory(name + "-", checker, true, maxExecuteTime, maxExecuteTimeUnit)); + PoolMetrics workerMetrics = metrics != null ? metrics.createPoolMetrics("worker", name, poolSize) : null; + namedWorkerPools.put(name, sharedWorkerPool = new SharedWorkerPool(name, workerExec, workerMetrics)); + } else { + sharedWorkerPool.refCount++; + } + ContextImpl context = getOrCreateContext(); + WorkerExecutorImpl namedExec = new WorkerExecutorImpl(context, sharedWorkerPool); + context.addCloseHook(namedExec); + return namedExec; + } + + @Override + public Vertx exceptionHandler(Handler handler) { + exceptionHandler = handler; + return this; + } + + @Override + public Handler exceptionHandler() { + return exceptionHandler; + } + + @Override + public void addCloseHook(Closeable hook) { + closeHooks.add(hook); + } + + @Override + public void removeCloseHook(Closeable hook) { + closeHooks.remove(hook); + } +} diff --git a/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImpl.java b/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImpl.java new file mode 100644 index 00000000000..5adc7231e8a --- /dev/null +++ b/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -0,0 +1,1118 @@ +/* + * Copyright (c) 2011-2018 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core.impl; + +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.resolver.AddressResolverGroup; +import io.netty.util.ResourceLeakDetector; +import io.netty.util.concurrent.GenericFutureListener; +import io.vertx.core.*; +import io.vertx.core.Future; +import io.vertx.core.datagram.DatagramSocket; +import io.vertx.core.datagram.DatagramSocketOptions; +import io.vertx.core.datagram.impl.DatagramSocketImpl; +import io.vertx.core.dns.AddressResolverOptions; +import io.vertx.core.dns.DnsClient; +import io.vertx.core.dns.DnsClientOptions; +import io.vertx.core.dns.impl.DnsClientImpl; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.impl.EventBusImpl; +import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus; +import io.vertx.core.file.FileSystem; +import io.vertx.core.file.impl.FileResolver; +import io.vertx.core.file.impl.FileSystemImpl; +import io.vertx.core.file.impl.WindowsFileSystem; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.impl.HttpClientImpl; +import io.vertx.core.http.impl.HttpServerImpl; +import io.vertx.core.impl.resolver.DnsResolverProvider; +import io.vertx.core.json.JsonObject; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.impl.NetClientImpl; +import io.vertx.core.net.impl.NetServerImpl; +import io.vertx.core.net.impl.ServerID; +import io.vertx.core.net.impl.transport.Transport; +import io.vertx.core.shareddata.SharedData; +import io.vertx.core.shareddata.impl.SharedDataImpl; +import io.vertx.core.spi.VerticleFactory; +import io.vertx.core.spi.VertxMetricsFactory; +import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.core.spi.metrics.Metrics; +import io.vertx.core.spi.metrics.MetricsProvider; +import io.vertx.core.spi.metrics.PoolMetrics; +import io.vertx.core.spi.metrics.VertxMetrics; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +/** + * @author Tim Fox + */ +public class VertxImpl implements VertxInternal, MetricsProvider { + + private static final Logger log = LoggerFactory.getLogger(VertxImpl.class); + + private static final String CLUSTER_MAP_NAME = "__vertx.haInfo"; + private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio"; + private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50); + + static { + // Netty resource leak detection has a performance overhead and we do not need it in Vert.x + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); + // Use the JDK deflater/inflater by default + System.setProperty("io.netty.noJdkZlibDecoder", "false"); + } + + static VertxImpl vertx(VertxOptions options) { + VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport())); + vertx.init(); + return vertx; + } + + static VertxImpl vertx(VertxOptions options, Transport transport) { + VertxImpl vertx = new VertxImpl(options, transport); + vertx.init(); + return vertx; + } + + static void clusteredVertx(VertxOptions options, Handler> resultHandler) { + VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport())); + vertx.joinCluster(options, resultHandler); + } + + private final FileSystem fileSystem = getFileSystem(); + private final SharedData sharedData; + private final VertxMetrics metrics; + private final ConcurrentMap timeouts = new ConcurrentHashMap<>(); + private final AtomicLong timeoutCounter = new AtomicLong(0); + private final ClusterManager clusterManager; + private final DeploymentManager deploymentManager; + private final FileResolver fileResolver; + private final Map sharedHttpServers = new HashMap<>(); + private final Map sharedNetServers = new HashMap<>(); + final WorkerPool workerPool; + final WorkerPool internalBlockingPool; + private final ThreadFactory eventLoopThreadFactory; + private final EventLoopGroup eventLoopGroup; + private final EventLoopGroup acceptorEventLoopGroup; + private final BlockedThreadChecker checker; + private final AddressResolver addressResolver; + private final AddressResolverOptions addressResolverOptions; + private final EventBus eventBus; + private volatile HAManager haManager; + private boolean closed; + private volatile Handler exceptionHandler; + private final Map namedWorkerPools; + private final int defaultWorkerPoolSize; + private final long defaultWorkerMaxExecTime; + private final CloseHooks closeHooks; + private final Transport transport; + + @SuppressWarnings("rawtypes") + public VertxImpl(VertxOptions options, Transport transport) { + // Sanity check + if (Vertx.currentContext() != null) { + log.warn("You're already on a Vert.x context, are you sure you want to create a new Vertx instance?"); + } + closeHooks = new CloseHooks(log); + checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(), options.getWarningExceptionTimeUnit()); + eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit()); + eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO); + ThreadFactory acceptorEventLoopThreadFactory = new VertxThreadFactory("vert.x-acceptor-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit()); + // The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections + // under a lot of load + acceptorEventLoopGroup = transport.eventLoopGroup(1, acceptorEventLoopThreadFactory, 100); + + metrics = initialiseMetrics(options); + + ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(), + new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit())); + PoolMetrics workerPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null; + ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(), + new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit())); + PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null; + internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics); + namedWorkerPools = new HashMap<>(); + workerPool = new WorkerPool(workerExec, workerPoolMetrics); + defaultWorkerPoolSize = options.getWorkerPoolSize(); + defaultWorkerMaxExecTime = options.getMaxWorkerExecuteTime(); + + this.transport = transport; + this.fileResolver = new FileResolver(options.getFileSystemOptions()); + this.addressResolverOptions = options.getAddressResolverOptions(); + this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions()); + this.deploymentManager = new DeploymentManager(this); + if (options.isClustered()) { + this.clusterManager = getClusterManager(options); + this.eventBus = new ClusteredEventBus(this, options, clusterManager); + } else { + this.clusterManager = null; + this.eventBus = new EventBusImpl(this); + } + this.sharedData = new SharedDataImpl(this, clusterManager); + } + + public void init() { + eventBus.start(ar -> {}); + if (metrics != null) { + metrics.vertxCreated(this); + } + } + + private void joinCluster(VertxOptions options, Handler> resultHandler) { + clusterManager.setVertx(this); + clusterManager.join(ar -> { + if (ar.succeeded()) { + createHaManager(options, resultHandler); + } else { + log.error("Failed to join cluster", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void createHaManager(VertxOptions options, Handler> resultHandler) { + this.>executeBlocking(fut -> { + fut.complete(clusterManager.getSyncMap(CLUSTER_MAP_NAME)); + }, false, ar -> { + if (ar.succeeded()) { + Map clusterMap = ar.result(); + haManager = new HAManager(this, deploymentManager, clusterManager, clusterMap, options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled()); + startEventBus(resultHandler); + } else { + log.error("Failed to start HAManager", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void startEventBus(Handler> resultHandler) { + eventBus.start(ar -> { + if (ar.succeeded()) { + initializeHaManager(resultHandler); + } else { + log.error("Failed to start event bus", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void initializeHaManager(Handler> resultHandler) { + this.executeBlocking(fut -> { + // Init the manager (i.e register listener and check the quorum) + // after the event bus has been fully started and updated its state + // it will have also set the clustered changed view handler on the ha manager + haManager.init(); + fut.complete(); + }, false, ar -> { + if (ar.succeeded()) { + if (metrics != null) { + metrics.vertxCreated(this); + } + resultHandler.handle(Future.succeededFuture(this)); + } else { + log.error("Failed to initialize HAManager", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + /** + * @return The FileSystem implementation for the OS + */ + protected FileSystem getFileSystem() { + return Utils.isWindows() ? new WindowsFileSystem(this) : new FileSystemImpl(this); + } + + @Override + public DatagramSocket createDatagramSocket(DatagramSocketOptions options) { + return DatagramSocketImpl.create(this, options); + } + + @Override + public DatagramSocket createDatagramSocket() { + return createDatagramSocket(new DatagramSocketOptions()); + } + + public NetServer createNetServer(NetServerOptions options) { + return new NetServerImpl(this, options); + } + + @Override + public NetServer createNetServer() { + return createNetServer(new NetServerOptions()); + } + + public NetClient createNetClient(NetClientOptions options) { + return new NetClientImpl(this, options); + } + + @Override + public NetClient createNetClient() { + return createNetClient(new NetClientOptions()); + } + + @Override + public Transport transport() { + return transport; + } + + @Override + public boolean isNativeTransportEnabled() { + return transport != Transport.JDK; + } + + public FileSystem fileSystem() { + return fileSystem; + } + + public SharedData sharedData() { + return sharedData; + } + + public HttpServer createHttpServer(HttpServerOptions serverOptions) { + return new HttpServerImpl(this, serverOptions); + } + + @Override + public HttpServer createHttpServer() { + return createHttpServer(new HttpServerOptions()); + } + + public HttpClient createHttpClient(HttpClientOptions options) { + return new HttpClientImpl(this, options); + } + + @Override + public HttpClient createHttpClient() { + return createHttpClient(new HttpClientOptions()); + } + + public EventBus eventBus() { + return eventBus; + } + + public long setPeriodic(long delay, Handler handler) { + return scheduleTimeout(getOrCreateContext(), handler, delay, true); + } + + @Override + public TimeoutStream periodicStream(long delay) { + return new TimeoutStreamImpl(delay, true); + } + + public long setTimer(long delay, Handler handler) { + return scheduleTimeout(getOrCreateContext(), handler, delay, false); + } + + @Override + public TimeoutStream timerStream(long delay) { + return new TimeoutStreamImpl(delay, false); + } + + public void runOnContext(Handler task) { + ContextImpl context = getOrCreateContext(); + context.runOnContext(task); + } + + // The background pool is used for making blocking calls to legacy synchronous APIs + public ExecutorService getWorkerPool() { + return workerPool.executor(); + } + + public EventLoopGroup getEventLoopGroup() { + return eventLoopGroup; + } + + public EventLoopGroup getAcceptorEventLoopGroup() { + return acceptorEventLoopGroup; + } + + public ContextImpl getOrCreateContext() { + ContextImpl ctx = getContext(); + if (ctx == null) { + // We are running embedded - Create a context + ctx = createEventLoopContext(null, null, new JsonObject(), Thread.currentThread().getContextClassLoader()); + } + return ctx; + } + + public Map sharedHttpServers() { + return sharedHttpServers; + } + + public Map sharedNetServers() { + return sharedNetServers; + } + + @Override + public boolean isMetricsEnabled() { + return metrics != null; + } + + @Override + public Metrics getMetrics() { + return metrics; + } + + public boolean cancelTimer(long id) { + InternalTimerHandler handler = timeouts.remove(id); + if (handler != null) { + handler.context.removeCloseHook(handler); + return handler.cancel(); + } else { + return false; + } + } + + @Override + public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) { + return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deploymentID, config, tccl); + } + + @Override + public ContextImpl createWorkerContext(boolean multiThreaded, String deploymentID, WorkerPool workerPool, JsonObject config, + ClassLoader tccl) { + if (workerPool == null) { + workerPool = this.workerPool; + } + if (multiThreaded) { + return new MultiThreadedWorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl); + } else { + return new WorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl); + } + } + + @Override + public DnsClient createDnsClient(int port, String host) { + return createDnsClient(new DnsClientOptions().setHost(host).setPort(port)); + } + + @Override + public DnsClient createDnsClient() { + return createDnsClient(new DnsClientOptions()); + } + + @Override + public DnsClient createDnsClient(DnsClientOptions options) { + String host = options.getHost(); + int port = options.getPort(); + if (host == null || port < 0) { + DnsResolverProvider provider = new DnsResolverProvider(this, addressResolverOptions); + InetSocketAddress address = provider.nameServerAddresses().get(0); + // provide the host and port + options = new DnsClientOptions(options) + .setHost(address.getAddress().getHostAddress()) + .setPort(address.getPort()); + } + return new DnsClientImpl(this, options); + } + + private VertxMetrics initialiseMetrics(VertxOptions options) { + if (options.getMetricsOptions() != null && options.getMetricsOptions().isEnabled()) { + VertxMetricsFactory factory = options.getMetricsOptions().getFactory(); + if (factory == null) { + factory = ServiceHelper.loadFactoryOrNull(VertxMetricsFactory.class); + if (factory == null) { + log.warn("Metrics has been set to enabled but no VertxMetricsFactory found on classpath"); + } + } + if (factory != null) { + VertxMetrics metrics = factory.metrics(options); + Objects.requireNonNull(metrics, "The metric instance created from " + factory + " cannot be null"); + return metrics; + } + } + return null; + } + + private ClusterManager getClusterManager(VertxOptions options) { + ClusterManager mgr = options.getClusterManager(); + if (mgr == null) { + String clusterManagerClassName = System.getProperty("vertx.cluster.managerClass"); + if (clusterManagerClassName != null) { + // We allow specify a sys prop for the cluster manager factory which overrides ServiceLoader + try { + Class clazz = Class.forName(clusterManagerClassName); + mgr = (ClusterManager) clazz.newInstance(); + } catch (Exception e) { + throw new IllegalStateException("Failed to instantiate " + clusterManagerClassName, e); + } + } else { + mgr = ServiceHelper.loadFactoryOrNull(ClusterManager.class); + if (mgr == null) { + throw new IllegalStateException("No ClusterManagerFactory instances found on classpath"); + } + } + } + return mgr; + } + + private long scheduleTimeout(ContextImpl context, Handler handler, long delay, boolean periodic) { + if (delay < 1) { + throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms"); + } + long timerId = timeoutCounter.getAndIncrement(); + InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, delay, context); + timeouts.put(timerId, task); + context.addCloseHook(task); + return timerId; + } + + public static Context context() { + Thread current = Thread.currentThread(); + if (current instanceof VertxThread) { + return ((VertxThread) current).getContext(); + } + return null; + } + + public ContextImpl getContext() { + ContextImpl context = (ContextImpl) context(); + if (context != null && context.owner == this) { + return context; + } + return null; + } + + public ClusterManager getClusterManager() { + return clusterManager; + } + + @Override + public void close() { + close(null); + } + + private void closeClusterManager(Handler> completionHandler) { + if (clusterManager != null) { + clusterManager.leave(ar -> { + if (ar.failed()) { + log.error("Failed to leave cluster", ar.cause()); + } + if (completionHandler != null) { + runOnContext(v -> completionHandler.handle(Future.succeededFuture())); + } + }); + } else if (completionHandler != null) { + runOnContext(v -> completionHandler.handle(Future.succeededFuture())); + } + } + + @Override + public synchronized void close(Handler> completionHandler) { + if (closed || eventBus == null) { + // Just call the handler directly since pools shutdown + if (completionHandler != null) { + completionHandler.handle(Future.succeededFuture()); + } + return; + } + closed = true; + + closeHooks.run(ar -> { + deploymentManager.undeployAll(ar1 -> { + HAManager haManager = haManager(); + Future haFuture = Future.future(); + if (haManager != null) { + this.executeBlocking(fut -> { + haManager.stop(); + fut.complete(); + }, false, haFuture); + } else { + haFuture.complete(); + } + haFuture.setHandler(ar2 -> { + addressResolver.close(ar3 -> { + eventBus.close(ar4 -> { + closeClusterManager(ar5 -> { + // Copy set to prevent ConcurrentModificationException + Set httpServers = new HashSet<>(sharedHttpServers.values()); + Set netServers = new HashSet<>(sharedNetServers.values()); + sharedHttpServers.clear(); + sharedNetServers.clear(); + + int serverCount = httpServers.size() + netServers.size(); + + AtomicInteger serverCloseCount = new AtomicInteger(); + + Handler> serverCloseHandler = res -> { + if (res.failed()) { + log.error("Failure in shutting down server", res.cause()); + } + if (serverCloseCount.incrementAndGet() == serverCount) { + deleteCacheDirAndShutdown(completionHandler); + } + }; + + for (HttpServerImpl server : httpServers) { + server.closeAll(serverCloseHandler); + } + for (NetServerImpl server : netServers) { + server.closeAll(serverCloseHandler); + } + if (serverCount == 0) { + deleteCacheDirAndShutdown(completionHandler); + } + }); + }); + }); + }); + }); + }); + } + + @Override + public void deployVerticle(Verticle verticle) { + deployVerticle(verticle, new DeploymentOptions(), null); + } + + @Override + public void deployVerticle(Verticle verticle, Handler> completionHandler) { + deployVerticle(verticle, new DeploymentOptions(), completionHandler); + } + + @Override + public void deployVerticle(String name, Handler> completionHandler) { + deployVerticle(name, new DeploymentOptions(), completionHandler); + } + + @Override + public void deployVerticle(Verticle verticle, DeploymentOptions options) { + deployVerticle(verticle, options, null); + } + + @Override + public void deployVerticle(Class verticleClass, DeploymentOptions options) { + deployVerticle(() -> { + try { + return verticleClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, options); + } + + @Override + public void deployVerticle(Supplier verticleSupplier, DeploymentOptions options) { + deployVerticle(verticleSupplier, options, null); + } + + @Override + public void deployVerticle(Verticle verticle, DeploymentOptions options, Handler> completionHandler) { + if (options.getInstances() != 1) { + throw new IllegalArgumentException("Can't specify > 1 instances for already created verticle"); + } + deployVerticle(() -> verticle, options, completionHandler); + } + + @Override + public void deployVerticle(Class verticleClass, DeploymentOptions options, Handler> completionHandler) { + deployVerticle(() -> { + try { + return verticleClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, options, completionHandler); + } + + @Override + public void deployVerticle(Supplier verticleSupplier, DeploymentOptions options, Handler> completionHandler) { + boolean closed; + synchronized (this) { + closed = this.closed; + } + if (closed) { + if (completionHandler != null) { + completionHandler.handle(Future.failedFuture("Vert.x closed")); + } + } else { + deploymentManager.deployVerticle(verticleSupplier, options, completionHandler); + } + } + + @Override + public void deployVerticle(String name) { + deployVerticle(name, new DeploymentOptions(), null); + } + + @Override + public void deployVerticle(String name, DeploymentOptions options) { + deployVerticle(name, options, null); + } + + @Override + public void deployVerticle(String name, DeploymentOptions options, Handler> completionHandler) { + if (options.isHa() && haManager() != null && haManager().isEnabled()) { + haManager().deployVerticle(name, options, completionHandler); + } else { + deploymentManager.deployVerticle(name, options, completionHandler); + } + } + + @Override + public String getNodeID() { + return clusterManager.getNodeID(); + } + + @Override + public void undeploy(String deploymentID) { + undeploy(deploymentID, res -> { + }); + } + + @Override + public void undeploy(String deploymentID, Handler> completionHandler) { + HAManager haManager = haManager(); + Future haFuture = Future.future(); + if (haManager != null && haManager.isEnabled()) { + this.executeBlocking(fut -> { + haManager.removeFromHA(deploymentID); + fut.complete(); + }, false, haFuture); + } else { + haFuture.complete(); + } + haFuture.compose(v -> { + Future deploymentFuture = Future.future(); + deploymentManager.undeployVerticle(deploymentID, deploymentFuture); + return deploymentFuture; + }).setHandler(completionHandler); + } + + @Override + public Set deploymentIDs() { + return deploymentManager.deployments(); + } + + @Override + public void registerVerticleFactory(VerticleFactory factory) { + deploymentManager.registerVerticleFactory(factory); + } + + @Override + public void unregisterVerticleFactory(VerticleFactory factory) { + deploymentManager.unregisterVerticleFactory(factory); + } + + @Override + public Set verticleFactories() { + return deploymentManager.verticleFactories(); + } + + @Override + public void executeBlockingInternal(Handler> blockingCodeHandler, Handler> resultHandler) { + ContextImpl context = getOrCreateContext(); + + context.executeBlockingInternal(blockingCodeHandler, resultHandler); + } + + @Override + public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, + Handler> asyncResultHandler) { + ContextImpl context = getOrCreateContext(); + context.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler); + } + + @Override + public void executeBlocking(Handler> blockingCodeHandler, + Handler> asyncResultHandler) { + executeBlocking(blockingCodeHandler, true, asyncResultHandler); + } + + @Override + public boolean isClustered() { + return clusterManager != null; + } + + @Override + public EventLoopGroup nettyEventLoopGroup() { + return eventLoopGroup; + } + + // For testing + public void simulateKill() { + if (haManager() != null) { + haManager().simulateKill(); + } + } + + @Override + public Deployment getDeployment(String deploymentID) { + return deploymentManager.getDeployment(deploymentID); + } + + @Override + public synchronized void failoverCompleteHandler(FailoverCompleteHandler failoverCompleteHandler) { + if (haManager() != null) { + haManager().setFailoverCompleteHandler(failoverCompleteHandler); + } + } + + @Override + public boolean isKilled() { + return haManager().isKilled(); + } + + @Override + public void failDuringFailover(boolean fail) { + if (haManager() != null) { + haManager().failDuringFailover(fail); + } + } + + @Override + public VertxMetrics metricsSPI() { + return metrics; + } + + @Override + public File resolveFile(String fileName) { + return fileResolver.resolveFile(fileName); + } + + @Override + public void resolveAddress(String hostname, Handler> resultHandler) { + addressResolver.resolveHostname(hostname, resultHandler); + } + + @Override + public AddressResolver addressResolver() { + return addressResolver; + } + + @Override + public AddressResolverGroup nettyAddressResolverGroup() { + return addressResolver.nettyAddressResolverGroup(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private void deleteCacheDirAndShutdown(Handler> completionHandler) { + executeBlockingInternal(fut -> { + try { + fileResolver.close(); + fut.complete(); + } catch (IOException e) { + fut.tryFail(e); + } + }, ar -> { + + workerPool.close(); + internalBlockingPool.close(); + new ArrayList<>(namedWorkerPools.values()).forEach(WorkerPool::close); + + acceptorEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (!future.isSuccess()) { + log.warn("Failure in shutting down acceptor event loop group", future.cause()); + } + eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (!future.isSuccess()) { + log.warn("Failure in shutting down event loop group", future.cause()); + } + if (metrics != null) { + metrics.close(); + } + + checker.close(); + + if (completionHandler != null) { + eventLoopThreadFactory.newThread(() -> { + completionHandler.handle(Future.succeededFuture()); + }).start(); + } + } + }); + } + }); + }); + } + + public HAManager haManager() { + return haManager; + } + + private class InternalTimerHandler implements Handler, Closeable { + final Handler handler; + final boolean periodic; + final long timerID; + final ContextImpl context; + final java.util.concurrent.Future future; + final AtomicBoolean cancelled; + + boolean cancel() { + if (cancelled.compareAndSet(false, true)) { + if (metrics != null) { + metrics.timerEnded(timerID, true); + } + future.cancel(false); + return true; + } else { + return false; + } + } + + InternalTimerHandler(long timerID, Handler runnable, boolean periodic, long delay, ContextImpl context) { + this.context = context; + this.timerID = timerID; + this.handler = runnable; + this.periodic = periodic; + this.cancelled = new AtomicBoolean(); + EventLoop el = context.nettyEventLoop(); + Runnable toRun = () -> context.runOnContext(this); + if (periodic) { + future = el.scheduleAtFixedRate(toRun, delay, delay, TimeUnit.MILLISECONDS); + } else { + future = el.schedule(toRun, delay, TimeUnit.MILLISECONDS); + } + if (metrics != null) { + metrics.timerCreated(timerID); + } + } + + public void handle(Void v) { + if (!cancelled.get()) { + try { + handler.handle(timerID); + } finally { + if (!periodic) { + // Clean up after it's fired + cleanupNonPeriodic(); + } + } + } + } + + private void cleanupNonPeriodic() { + VertxImpl.this.timeouts.remove(timerID); + if (metrics != null) { + metrics.timerEnded(timerID, false); + } + ContextImpl context = getContext(); + if (context != null) { + context.removeCloseHook(this); + } + } + + // Called via Context close hook when Verticle is undeployed + public void close(Handler> completionHandler) { + VertxImpl.this.timeouts.remove(timerID); + cancel(); + completionHandler.handle(Future.succeededFuture()); + } + + } + + /* + * + * This class is optimised for performance when used on the same event loop that is was passed to the handler with. + * However it can be used safely from other threads. + * + * The internal state is protected using the synchronized keyword. If always used on the same event loop, then + * we benefit from biased locking which makes the overhead of synchronized near zero. + * + */ + private class TimeoutStreamImpl implements TimeoutStream, Handler { + + private final long delay; + private final boolean periodic; + + private Long id; + private Handler handler; + private Handler endHandler; + private long demand; + + public TimeoutStreamImpl(long delay, boolean periodic) { + this.delay = delay; + this.periodic = periodic; + this.demand = Long.MAX_VALUE; + } + + @Override + public synchronized void handle(Long event) { + try { + if (demand > 0) { + demand--; + handler.handle(event); + } + } finally { + if (!periodic && endHandler != null) { + endHandler.handle(null); + } + } + } + + @Override + public synchronized TimeoutStream fetch(long amount) { + demand += amount; + if (demand < 0) { + demand = Long.MAX_VALUE; + } + return this; + } + + @Override + public TimeoutStream exceptionHandler(Handler handler) { + return this; + } + + @Override + public void cancel() { + if (id != null) { + VertxImpl.this.cancelTimer(id); + } + } + + @Override + public synchronized TimeoutStream handler(Handler handler) { + if (handler != null) { + if (id != null) { + throw new IllegalStateException(); + } + this.handler = handler; + id = scheduleTimeout(getOrCreateContext(), this, delay, periodic); + } else { + cancel(); + } + return this; + } + + @Override + public synchronized TimeoutStream pause() { + demand = 0; + return this; + } + + @Override + public synchronized TimeoutStream resume() { + demand = Long.MAX_VALUE; + return this; + } + + @Override + public synchronized TimeoutStream endHandler(Handler endHandler) { + this.endHandler = endHandler; + return this; + } + } + + class SharedWorkerPool extends WorkerPool { + + private final String name; + private int refCount = 1; + + @SuppressWarnings("rawtypes") + SharedWorkerPool(String name, ExecutorService workerExec, PoolMetrics workerMetrics) { + super(workerExec, workerMetrics); + this.name = name; + } + + @Override + void close() { + synchronized (VertxImpl.this) { + if (refCount > 0) { + refCount = 0; + super.close(); + } + } + } + + void release() { + synchronized (VertxImpl.this) { + if (--refCount == 0) { + namedWorkerPools.remove(name); + super.close(); + } + } + } + } + + @Override + public WorkerExecutorImpl createSharedWorkerExecutor(String name) { + return createSharedWorkerExecutor(name, defaultWorkerPoolSize); + } + + @Override + public WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize) { + return createSharedWorkerExecutor(name, poolSize, defaultWorkerMaxExecTime); + } + + @Override + public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime) { + return createSharedWorkerExecutor(name, poolSize, maxExecuteTime, TimeUnit.NANOSECONDS); + } + + @SuppressWarnings("rawtypes") + @Override + public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime, TimeUnit maxExecuteTimeUnit) { + if (poolSize < 1) { + throw new IllegalArgumentException("poolSize must be > 0"); + } + if (maxExecuteTime < 1) { + throw new IllegalArgumentException("maxExecuteTime must be > 0"); + } + SharedWorkerPool sharedWorkerPool = namedWorkerPools.get(name); + if (sharedWorkerPool == null) { + ExecutorService workerExec = Executors.newFixedThreadPool(poolSize, new VertxThreadFactory(name + "-", checker, true, maxExecuteTime, maxExecuteTimeUnit)); + PoolMetrics workerMetrics = metrics != null ? metrics.createPoolMetrics("worker", name, poolSize) : null; + namedWorkerPools.put(name, sharedWorkerPool = new SharedWorkerPool(name, workerExec, workerMetrics)); + } else { + sharedWorkerPool.refCount++; + } + ContextImpl context = getOrCreateContext(); + WorkerExecutorImpl namedExec = new WorkerExecutorImpl(context, sharedWorkerPool); + context.addCloseHook(namedExec); + return namedExec; + } + + @Override + public Vertx exceptionHandler(Handler handler) { + exceptionHandler = handler; + return this; + } + + @Override + public Handler exceptionHandler() { + return exceptionHandler; + } + + @Override + public void addCloseHook(Closeable hook) { + closeHooks.add(hook); + } + + @Override + public void removeCloseHook(Closeable hook) { + closeHooks.remove(hook); + } +} diff --git a/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImplEx.java b/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImplEx.java index 2020ff3b41d..2ea0182e92f 100644 --- a/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImplEx.java +++ b/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImplEx.java @@ -25,13 +25,14 @@ import io.vertx.core.VertxOptions; import io.vertx.core.json.JsonObject; +import io.vertx.core.net.impl.transport.Transport; public class VertxImplEx extends VertxImpl { private AtomicLong eventLoopContextCreated = new AtomicLong(); public VertxImplEx(String name, VertxOptions vertxOptions) { - super(vertxOptions); - + super(vertxOptions, Transport.transport(vertxOptions.getPreferNativeTransport())); + init(); if (StringUtils.isEmpty(name)) { return; } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/VertxUtils.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/VertxUtils.java index a3aabd69452..8861a793a2a 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/VertxUtils.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/VertxUtils.java @@ -41,7 +41,7 @@ import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; -import io.vertx.core.impl.FileResolver; +import io.vertx.core.file.impl.FileResolver; import io.vertx.core.impl.VertxImplEx; import io.vertx.core.logging.SLF4JLogDelegateFactory; diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultHttpServerMetrics.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultHttpServerMetrics.java index b99e8f9246d..4d007ed9ecf 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultHttpServerMetrics.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultHttpServerMetrics.java @@ -62,12 +62,7 @@ public void responseEnd(Object requestMetric, HttpServerResponse response) { } @Override - public Object upgrade(Object requestMetric, ServerWebSocket serverWebSocket) { - return null; - } - - @Override - public Object connected(DefaultHttpSocketMetric socketMetric, ServerWebSocket serverWebSocket) { + public Object connected(DefaultHttpSocketMetric socketMetric, Object requestMetric, ServerWebSocket serverWebSocket) { return null; } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetrics.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetrics.java index 4d59a296f99..f78b6b862e5 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetrics.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetrics.java @@ -24,9 +24,7 @@ import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; -import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientOptions; -import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; import io.vertx.core.metrics.impl.DummyVertxMetrics; import io.vertx.core.net.NetClientOptions; @@ -37,8 +35,6 @@ import io.vertx.core.spi.metrics.TCPMetrics; public class DefaultVertxMetrics extends DummyVertxMetrics { - private final Vertx vertx; - private VertxOptions vertxOptions; // to support listen multiple addresses, must use a map to manage the metric @@ -46,17 +42,12 @@ public class DefaultVertxMetrics extends DummyVertxMetrics { private volatile DefaultClientEndpointMetricManager clientEndpointMetricManager; - public DefaultVertxMetrics(Vertx vertx, VertxOptions vertxOptions) { - this.vertx = vertx; + public DefaultVertxMetrics(VertxOptions vertxOptions) { this.vertxOptions = vertxOptions; - this.clientEndpointMetricManager = new DefaultClientEndpointMetricManager(vertx, + this.clientEndpointMetricManager = new DefaultClientEndpointMetricManager( (MetricsOptionsEx) vertxOptions.getMetricsOptions()); } - public Vertx getVertx() { - return vertx; - } - public DefaultClientEndpointMetricManager getClientEndpointMetricManager() { return clientEndpointMetricManager; } @@ -66,27 +57,27 @@ public Map getServerEndpointMetricMa } @Override - public HttpServerMetrics createMetrics(HttpServer server, SocketAddress localAddress, - HttpServerOptions options) { + public HttpServerMetrics createHttpServerMetrics(HttpServerOptions options, SocketAddress localAddress + ) { DefaultServerEndpointMetric endpointMetric = serverEndpointMetricMap .computeIfAbsent(localAddress, DefaultServerEndpointMetric::new); return new DefaultHttpServerMetrics(endpointMetric); } @Override - public HttpClientMetrics createMetrics(HttpClient client, HttpClientOptions options) { + public HttpClientMetrics createHttpClientMetrics(HttpClientOptions options) { return new DefaultHttpClientMetrics(clientEndpointMetricManager); } @Override - public TCPMetrics createMetrics(SocketAddress localAddress, NetServerOptions options) { + public TCPMetrics createNetServerMetrics(NetServerOptions options, SocketAddress localAddress) { DefaultServerEndpointMetric endpointMetric = serverEndpointMetricMap .computeIfAbsent(localAddress, DefaultServerEndpointMetric::new); return new DefaultTcpServerMetrics(endpointMetric); } @Override - public TCPMetrics createMetrics(NetClientOptions options) { + public TCPMetrics createNetClientMetrics(NetClientOptions options) { return new DefaultTcpClientMetrics(clientEndpointMetricManager); } @@ -100,4 +91,8 @@ public boolean isMetricsEnabled() { public boolean isEnabled() { return true; } + + public void setVertx(Vertx vertx) { + clientEndpointMetricManager.setVertx(vertx); + } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetricsFactory.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetricsFactory.java index 1ceb91684bd..1c558029f50 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetricsFactory.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetricsFactory.java @@ -44,9 +44,9 @@ public DefaultVertxMetrics getVertxMetrics() { } @Override - public synchronized VertxMetrics metrics(Vertx vertx, VertxOptions options) { + public synchronized VertxMetrics metrics(VertxOptions options) { if (vertxMetrics == null) { - vertxMetrics = new DefaultVertxMetrics(vertx, options); + vertxMetrics = new DefaultVertxMetrics(options); } return vertxMetrics; } @@ -58,4 +58,11 @@ public MetricsOptions newOptions() { metricsOptions.setEnabled(true); return metricsOptions; } + + public void setVertx(Vertx vertx, VertxOptions options) { + if (vertxMetrics == null) { + vertxMetrics = new DefaultVertxMetrics(options); + } + vertxMetrics.setVertx(vertx); + } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/metric/DefaultClientEndpointMetricManager.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/metric/DefaultClientEndpointMetricManager.java index aa25d132fb9..e537b186549 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/metric/DefaultClientEndpointMetricManager.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/metric/DefaultClientEndpointMetricManager.java @@ -30,20 +30,19 @@ import io.vertx.core.net.SocketAddress; public class DefaultClientEndpointMetricManager { - private final Vertx vertx; - private final MetricsOptionsEx metricsOptionsEx; // to avoid save too many endpoint that not exist any more // must check expired periodically private Map clientEndpointMetricMap = new ConcurrentHashMapEx<>(); + private Vertx vertx; + private final ReadWriteLock rwlock = new ReentrantReadWriteLock(); private AtomicBoolean inited = new AtomicBoolean(false); - public DefaultClientEndpointMetricManager(Vertx vertx, MetricsOptionsEx metricsOptionsEx) { - this.vertx = vertx; + public DefaultClientEndpointMetricManager(MetricsOptionsEx metricsOptionsEx) { this.metricsOptionsEx = metricsOptionsEx; } @@ -88,4 +87,8 @@ public void onCheckClientEndpointMetricExpired(long periodic) { } } } + + public void setVertx(Vertx vertx) { + this.vertx = vertx; + } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java index 6d66fb60f98..18ed2745815 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java @@ -191,7 +191,7 @@ private void closeInputStream() { if (closed) { return; } - + closed = true; if (!autoCloseInputStream) { return; @@ -210,4 +210,9 @@ public ReadStream endHandler(Handler handler) { this.endHandler = handler; return this; } + + @Override + public ReadStream fetch(long amount) { + return this; + } } diff --git a/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java b/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java deleted file mode 100644 index a458f9bd6d8..00000000000 --- a/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.vertx.core.file.impl; - -import io.vertx.core.file.AsyncFile; -import io.vertx.core.file.OpenOptions; -import io.vertx.core.impl.ContextImpl; -import io.vertx.core.impl.VertxInternal; - -public class AsyncFileUitls { - public static AsyncFile createAsyncFile(VertxInternal vertx, String path, OpenOptions options, ContextImpl context) { - return new AsyncFileImpl(vertx, path, options, context); - } -} diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/http/TestHttpClientPoolFactory.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/http/TestHttpClientPoolFactory.java index 669d1da6697..eb63c1db8d7 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/http/TestHttpClientPoolFactory.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/http/TestHttpClientPoolFactory.java @@ -21,7 +21,7 @@ import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientOptions; -import io.vertx.core.impl.ContextImpl; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxImpl; import io.vertx.core.impl.VertxInternal; import mockit.Expectations; @@ -33,7 +33,7 @@ public class TestHttpClientPoolFactory { HttpClientPoolFactory factory = new HttpClientPoolFactory(httpClientOptions); @Test - public void createClientPool(@Mocked VertxInternal vertx, @Mocked ContextImpl context, + public void createClientPool(@Mocked VertxInternal vertx, @Mocked ContextInternal context, @Mocked HttpClient httpClient) { new Expectations(VertxImpl.class) { { diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnectionPool.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnectionPool.java index 2ca9736d8bc..f13e59ff75f 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnectionPool.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnectionPool.java @@ -21,12 +21,12 @@ import org.junit.Before; import org.junit.Test; -import io.vertx.core.impl.ContextImpl; +import io.vertx.core.impl.ContextInternal; import mockit.Mocked; public class TestTcpClientConnectionPool { @Mocked - ContextImpl context; + ContextInternal context; @Mocked NetClientWrapper netClientWrapper; diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java index 0e50c68ba7d..c2f6230f2b4 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java @@ -40,7 +40,7 @@ import io.vertx.core.file.FileSystemException; import io.vertx.core.file.OpenOptions; import io.vertx.core.http.HttpClientResponse; -import io.vertx.core.impl.ContextImpl; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.SyncVertx; import io.vertx.core.streams.WriteStream; import mockit.Expectations; @@ -51,7 +51,7 @@ public class TestReadStreamPart { static SyncVertx vertx = new SyncVertx(); - static ContextImpl context = vertx.getContext(); + static ContextInternal context = vertx.getContext(); static String src = "src"; diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpClientMetrics.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpClientMetrics.java index caeb25e1541..c026512a0e0 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpClientMetrics.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpClientMetrics.java @@ -99,9 +99,10 @@ private static DefaultHttpSocketMetric initSocketMetric(DefaultHttpClientMetrics @Before public void setup() { vertxOptions.setMetricsOptions(metricsOptionsEx); - defaultVertxMetrics = new DefaultVertxMetrics(vertx, vertxOptions); - clientMetrics_a = (DefaultHttpClientMetrics) defaultVertxMetrics.createMetrics(anyHttpClient, options); - clientMetrics_b = (DefaultHttpClientMetrics) defaultVertxMetrics.createMetrics(anyHttpClient, options); + defaultVertxMetrics = new DefaultVertxMetrics(vertxOptions); + defaultVertxMetrics.setVertx(vertx); + clientMetrics_a = (DefaultHttpClientMetrics) defaultVertxMetrics.createHttpClientMetrics(options); + clientMetrics_b = (DefaultHttpClientMetrics) defaultVertxMetrics.createHttpClientMetrics(options); nanoTime = 1; diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpServerMetrics.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpServerMetrics.java index b95ad5a6e3c..51505978c33 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpServerMetrics.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpServerMetrics.java @@ -93,18 +93,18 @@ public class TestDefaultHttpServerMetrics { @Before public void setup() { vertxOptions.setMetricsOptions(metricsOptionsEx); - defaultVertxMetrics = new DefaultVertxMetrics(vertx, vertxOptions); + defaultVertxMetrics = new DefaultVertxMetrics(vertxOptions); metrics_listen1_server1 = (DefaultHttpServerMetrics) defaultVertxMetrics - .createMetrics(listen1_server1, listen1_addr, options); + .createHttpServerMetrics(options, listen1_addr); metrics_listen1_server2 = (DefaultHttpServerMetrics) defaultVertxMetrics - .createMetrics(listen1_server2, listen1_addr, options); + .createHttpServerMetrics(options, listen1_addr); endpointMetric1 = metrics_listen1_server1.getEndpointMetric(); metrics_listen2_server1 = (DefaultHttpServerMetrics) defaultVertxMetrics - .createMetrics(listen2_server1, listen2_addr, options); + .createHttpServerMetrics(options, listen2_addr); metrics_listen2_server2 = (DefaultHttpServerMetrics) defaultVertxMetrics - .createMetrics(listen2_server2, listen2_addr, options); + .createHttpServerMetrics(options, listen2_addr); endpointMetric2 = metrics_listen2_server1.getEndpointMetric(); socketMetric_listen1_1 = metrics_listen1_server1.connected(anyRemoteAddr, remoteName); @@ -198,8 +198,7 @@ public void meaningless() { metrics_listen1_server1.requestReset(null); metrics_listen1_server1.responsePushed(null, null, null, null); metrics_listen1_server1.responseEnd(null, null); - metrics_listen1_server1.upgrade(null, null); - metrics_listen1_server1.connected((DefaultHttpSocketMetric) null, null); + metrics_listen1_server1.connected((DefaultHttpSocketMetric) null, null, null); metrics_listen1_server1.disconnected(null); metrics_listen1_server1.exceptionOccurred(null, null, null); metrics_listen1_server1.close(); diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpClientMetrics.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpClientMetrics.java index fbbe8dec921..140534ebd81 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpClientMetrics.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpClientMetrics.java @@ -94,9 +94,10 @@ private static DefaultTcpSocketMetric initSocketMetric(DefaultTcpClientMetrics m @Before public void setup() { vertxOptions.setMetricsOptions(metricsOptionsEx); - defaultVertxMetrics = new DefaultVertxMetrics(vertx, vertxOptions); - clientMetrics_a = (DefaultTcpClientMetrics) defaultVertxMetrics.createMetrics(options); - clientMetrics_b = (DefaultTcpClientMetrics) defaultVertxMetrics.createMetrics(options); + defaultVertxMetrics = new DefaultVertxMetrics(vertxOptions); + defaultVertxMetrics.setVertx(vertx); + clientMetrics_a = (DefaultTcpClientMetrics) defaultVertxMetrics.createNetClientMetrics(options); + clientMetrics_b = (DefaultTcpClientMetrics) defaultVertxMetrics.createNetClientMetrics(options); nanoTime = 1; diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpServerMetrics.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpServerMetrics.java index c6ed45e3a9b..8cdf0a847c9 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpServerMetrics.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpServerMetrics.java @@ -80,18 +80,18 @@ public class TestDefaultTcpServerMetrics { @Before public void setup() { vertxOptions.setMetricsOptions(metricsOptionsEx); - defaultVertxMetrics = new DefaultVertxMetrics(vertx, vertxOptions); + defaultVertxMetrics = new DefaultVertxMetrics(vertxOptions); metrics_listen1_server1 = (DefaultTcpServerMetrics) defaultVertxMetrics - .createMetrics(listen1_addr, options); + .createNetServerMetrics(options, listen1_addr); metrics_listen1_server2 = (DefaultTcpServerMetrics) defaultVertxMetrics - .createMetrics(listen1_addr, options); + .createNetServerMetrics(options, listen1_addr); endpointMetric1 = metrics_listen1_server1.getEndpointMetric(); metrics_listen2_server1 = (DefaultTcpServerMetrics) defaultVertxMetrics - .createMetrics(listen2_addr, options); + .createNetServerMetrics(options, listen2_addr); metrics_listen2_server2 = (DefaultTcpServerMetrics) defaultVertxMetrics - .createMetrics(listen2_addr, options); + .createNetServerMetrics(options, listen2_addr); endpointMetric2 = metrics_listen2_server1.getEndpointMetric(); socketMetric_listen1_1 = metrics_listen1_server1.connected(anyRemoteAddr, remoteName); diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultVertxMetricsFactory.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultVertxMetricsFactory.java index a96f23c7fa1..64af2df5704 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultVertxMetricsFactory.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultVertxMetricsFactory.java @@ -38,13 +38,12 @@ public class TestDefaultVertxMetricsFactory { public void metrics() { MetricsOptions metricsOptions = factory.newOptions(); options.setMetricsOptions(metricsOptions); - VertxMetrics vertxMetrics = factory.metrics(vertx, options); + VertxMetrics vertxMetrics = factory.metrics(options); Assert.assertSame(factory, metricsOptions.getFactory()); Assert.assertTrue(metricsOptions.isEnabled()); Assert.assertSame(factory.getVertxMetrics(), vertxMetrics); - Assert.assertSame(vertx, ((DefaultVertxMetrics) vertxMetrics).getVertx()); Assert.assertTrue(vertxMetrics.isMetricsEnabled()); Assert.assertTrue(vertxMetrics.isEnabled()); } diff --git a/java-chassis-dependencies/pom.xml b/java-chassis-dependencies/pom.xml index 4642339e3a4..c4f2cce1ef6 100644 --- a/java-chassis-dependencies/pom.xml +++ b/java-chassis-dependencies/pom.xml @@ -31,8 +31,8 @@ pom - 2.9.6 - 3.5.3 + 2.9.8 + 3.6.2 0.8 4.3.20.RELEASE 1.7.7 @@ -45,8 +45,8 @@ 4.5.2 1.5.2 1.5.12 - 4.1.24.Final - 2.0.7.Final + 4.1.28.Final + 2.0.14.Final ${basedir}/../.. 5.3.2.Final 3.1.6 diff --git a/java-chassis-distribution/src/release/LICENSE b/java-chassis-distribution/src/release/LICENSE index 5e1070c6ded..ffe745238be 100644 --- a/java-chassis-distribution/src/release/LICENSE +++ b/java-chassis-distribution/src/release/LICENSE @@ -436,14 +436,14 @@ Google Guice - Extensions - AssistedInject (https://github.com/google/guice/exte Google Guice - Extensions - MultiBindings (https://github.com/google/guice/extensions-parent/guice-multibindings) com.google.inject.extensions:guice-multibindings:jar:4.1.0 Hibernate Validator Engine (http://hibernate.org/validator/hibernate-validator) org.hibernate:hibernate-validator:jar:6.0.2.Final hystrix-core (https://github.com/Netflix/Hystrix) com.netflix.hystrix:hystrix-core:jar:1.5.12 -Jackson dataformat: protobuf (http://github.com/FasterXML/jackson-dataformats-binary) com.fasterxml.jackson.dataformat:jackson-dataformat-protobuf:bundle:2.9.6 +Jackson dataformat: protobuf (http://github.com/FasterXML/jackson-dataformats-binary) com.fasterxml.jackson.dataformat:jackson-dataformat-protobuf:bundle:2.9.8 Jackson module: Afterburner (https://github.com/FasterXML/jackson-modules-base) com.fasterxml.jackson.module:jackson-module-afterburner:bundle:2.8.11 Jackson module: JAXB-annotations (http://github.com/FasterXML/jackson-module-jaxb-annotations) com.fasterxml.jackson.module:jackson-module-jaxb-annotations:bundle:2.8.11 -Jackson-annotations (http://github.com/FasterXML/jackson) com.fasterxml.jackson.core:jackson-annotations:bundle:2.9.6 -Jackson-core (https://github.com/FasterXML/jackson-core) com.fasterxml.jackson.core:jackson-core:bundle:2.9.6 -jackson-databind (http://github.com/FasterXML/jackson) com.fasterxml.jackson.core:jackson-databind:bundle:2.9.6 -Jackson-dataformat-XML (http://wiki.fasterxml.com/JacksonExtensionXmlDataBinding) com.fasterxml.jackson.dataformat:jackson-dataformat-xml:bundle:2.9.6 -Jackson-dataformat-YAML (https://github.com/FasterXML/jackson-dataformats-text) com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:bundle:2.9.6 +Jackson-annotations (http://github.com/FasterXML/jackson) com.fasterxml.jackson.core:jackson-annotations:bundle:2.9.8 +Jackson-core (https://github.com/FasterXML/jackson-core) com.fasterxml.jackson.core:jackson-core:bundle:2.9.8 +jackson-databind (http://github.com/FasterXML/jackson) com.fasterxml.jackson.core:jackson-databind:bundle:2.9.8 +Jackson-dataformat-XML (http://wiki.fasterxml.com/JacksonExtensionXmlDataBinding) com.fasterxml.jackson.dataformat:jackson-dataformat-xml:bundle:2.9.8 +Jackson-dataformat-YAML (https://github.com/FasterXML/jackson-dataformats-text) com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:bundle:2.9.8 Javassist (http://www.javassist.org/) org.javassist:javassist:bundle:3.18.1-GA javax.inject (http://code.google.com/p/atinject/) javax.inject:javax.inject:jar:1 JBoss Logging 3 (http://www.jboss.org) org.jboss.logging:jboss-logging:jar:3.3.2.Final @@ -451,19 +451,19 @@ json-lib (http://json-lib.sourceforge.net) net.sf.json-lib:json-lib:jar:2.4 Log4j Implemented Over SLF4J (http://www.slf4j.org) org.slf4j:log4j-over-slf4j:jar:1.7.7 netflix-commons-util (https://github.com/Netflix/netflix-commons) com.netflix.netflix-commons:netflix-commons-util:jar:0.1.1 netflix-statistics (https://github.com/Netflix/netflix-commons) com.netflix.netflix-commons:netflix-statistics:jar:0.1.1 -Netty/Buffer (http://netty.io/netty-buffer/) io.netty:netty-buffer:jar:4.1.24.Final -Netty/Codec (http://netty.io/netty-codec/) io.netty:netty-codec:jar:4.1.24.Final -Netty/Codec/DNS (http://netty.io/netty-codec-dns/) io.netty:netty-codec-dns:jar:4.1.24.Final -Netty/Codec/HTTP (http://netty.io/netty-codec-http/) io.netty:netty-codec-http:jar:4.1.24.Final -Netty/Codec/HTTP2 (http://netty.io/netty-codec-http2/) io.netty:netty-codec-http2:jar:4.1.24.Final -Netty/Codec/Socks (http://netty.io/netty-codec-socks/) io.netty:netty-codec-socks:jar:4.1.24.Final -Netty/Common (http://netty.io/netty-common/) io.netty:netty-common:jar:4.1.24.Final -Netty/Handler (http://netty.io/netty-handler/) io.netty:netty-handler:jar:4.1.24.Final -Netty/Handler/Proxy (http://netty.io/netty-handler-proxy/) io.netty:netty-handler-proxy:jar:4.1.24.Final -Netty/Resolver (http://netty.io/netty-resolver/) io.netty:netty-resolver:jar:4.1.24.Final -Netty/Resolver/DNS (http://netty.io/netty-resolver-dns/) io.netty:netty-resolver-dns:jar:4.1.24.Final -Netty/TomcatNative(https://netty.io/wiki/forked-tomcat-native.html) io.netty:netty-tcnative-boringssl-static:2.0.7.Final -Netty/Transport (http://netty.io/netty-transport/) io.netty:netty-transport:jar:4.1.24.Final +Netty/Buffer (http://netty.io/netty-buffer/) io.netty:netty-buffer:jar:4.1.28.Final +Netty/Codec (http://netty.io/netty-codec/) io.netty:netty-codec:jar:4.1.28.Final +Netty/Codec/DNS (http://netty.io/netty-codec-dns/) io.netty:netty-codec-dns:jar:4.1.28.Final +Netty/Codec/HTTP (http://netty.io/netty-codec-http/) io.netty:netty-codec-http:jar:4.1.28.Final +Netty/Codec/HTTP2 (http://netty.io/netty-codec-http2/) io.netty:netty-codec-http2:jar:4.1.28.Final +Netty/Codec/Socks (http://netty.io/netty-codec-socks/) io.netty:netty-codec-socks:jar:4.1.28.Final +Netty/Common (http://netty.io/netty-common/) io.netty:netty-common:jar:4.1.28.Final +Netty/Handler (http://netty.io/netty-handler/) io.netty:netty-handler:jar:4.1.28.Final +Netty/Handler/Proxy (http://netty.io/netty-handler-proxy/) io.netty:netty-handler-proxy:jar:4.1.28.Final +Netty/Resolver (http://netty.io/netty-resolver/) io.netty:netty-resolver:jar:4.1.28.Final +Netty/Resolver/DNS (http://netty.io/netty-resolver-dns/) io.netty:netty-resolver-dns:jar:4.1.28.Final +Netty/TomcatNative(https://netty.io/wiki/forked-tomcat-native.html) io.netty:netty-tcnative-boringssl-static:2.0.14.Final +Netty/Transport (http://netty.io/netty-transport/) io.netty:netty-transport:jar:4.1.28.Final OkHttp (https://github.com/square/okhttp/okhttp) com.squareup.okhttp3:okhttp:jar:3.4.1 Okio (https://github.com/square/okio/okio) com.squareup.okio:okio:jar:1.9.0 Prometheus Java Simpleclient (http://github.com/prometheus/client_java/simpleclient) io.prometheus:simpleclient:bundle:0.1.0 @@ -523,10 +523,10 @@ tomcat-embed-core (http://tomcat.apache.org/) org.apache.tomcat.embed:tomcat-emb tomcat-embed-el (http://tomcat.apache.org/) org.apache.tomcat.embed:tomcat-embed-el:jar:8.0.33 tomcat-embed-logging-juli (http://tomcat.apache.org/) org.apache.tomcat.embed:tomcat-embed-logging-juli:jar:8.0.33 tomcat-embed-websocket (http://tomcat.apache.org/) org.apache.tomcat.embed:tomcat-embed-websocket:jar:8.0.33 -Vert.x Bridge Common (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-bridge-common) io.vertx:vertx-bridge-common:jar:3.5.3 -Vert.x Core (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-core) io.vertx:vertx-core:jar:3.5.3 -vertx-auth-common (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-auth/vertx-auth-common) io.vertx:vertx-auth-common:jar:3.5.3 -vertx-web (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-web-parent/vertx-web) io.vertx:vertx-web:jar:3.5.3 +Vert.x Bridge Common (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-bridge-common) io.vertx:vertx-bridge-common:jar:3.6.2 +Vert.x Core (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-core) io.vertx:vertx-core:jar:3.6.2 +vertx-auth-common (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-auth/vertx-auth-common) io.vertx:vertx-auth-common:jar:3.6.2 +vertx-web (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-web-parent/vertx-web) io.vertx:vertx-web:jar:3.6.2 Woodstox (https://github.com/FasterXML/woodstox) com.fasterxml.woodstox:woodstox-core:bundle:5.0.3 Zipkin Reporter Spring Factory Beans (https://github.com/openzipkin/zipkin-reporter-java/zipkin-reporter-spring-beans) io.zipkin.reporter2:zipkin-reporter-spring-beans:jar:2.7.13 Zipkin Reporter: Core (https://github.com/openzipkin/zipkin-reporter-java/zipkin-reporter) io.zipkin.reporter2:zipkin-reporter:jar:2.7.13 diff --git a/metrics/metrics-core/src/test/java/org/apache/servicecomb/metrics/core/TestVertxMetersInitializer.java b/metrics/metrics-core/src/test/java/org/apache/servicecomb/metrics/core/TestVertxMetersInitializer.java index ad5573fd187..6f5b9d53815 100644 --- a/metrics/metrics-core/src/test/java/org/apache/servicecomb/metrics/core/TestVertxMetersInitializer.java +++ b/metrics/metrics-core/src/test/java/org/apache/servicecomb/metrics/core/TestVertxMetersInitializer.java @@ -74,7 +74,7 @@ public void start(Future startFuture) { }); HttpServer server = vertx.createHttpServer(); - server.requestHandler(mainRouter::accept); + server.requestHandler(mainRouter); server.listen(0, "0.0.0.0", ar -> { if (ar.succeeded()) { port = ar.result().actualPort(); @@ -88,6 +88,7 @@ public void start(Future startFuture) { } public static class TestClientVerticle extends AbstractVerticle { + @SuppressWarnings("deprecation") @Override public void start(Future startFuture) { HttpClient client = vertx.createHttpClient(); diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/RestUtils.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/RestUtils.java index f9db0c3887d..bb797810742 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/RestUtils.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/RestUtils.java @@ -83,6 +83,7 @@ public static void httpDo(long timeout, RequestContext requestContext, Handler { responseHandler.handle(new RestResponse(requestContext, response)); diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java index 8af3b3adf25..4218da3c6a9 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java @@ -146,6 +146,7 @@ private HttpMethod getMethod() { return HttpMethod.valueOf(method); } + @SuppressWarnings("deprecation") void createRequest(IpPort ipPort, String path) { URIEndpointObject endpoint = (URIEndpointObject) invocation.getEndpoint().getAddress(); RequestOptions requestOptions = new RequestOptions(); diff --git a/transports/transport-rest/transport-rest-client/src/test/java/io/vertx/core/net/impl/ConnectionBase.java b/transports/transport-rest/transport-rest-client/src/test/java/io/vertx/core/net/impl/ConnectionBase.java new file mode 100644 index 00000000000..369b62c3fd3 --- /dev/null +++ b/transports/transport-rest/transport-rest-client/src/test/java/io/vertx/core/net/impl/ConnectionBase.java @@ -0,0 +1,392 @@ +/* + * Copyright (c) 2011-2018 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core.net.impl; + +import io.netty.channel.*; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import io.vertx.core.*; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.VertxInternal; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; +import io.vertx.core.net.SocketAddress; +import io.vertx.core.spi.metrics.NetworkMetrics; +import io.vertx.core.spi.metrics.TCPMetrics; + +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; +import javax.security.cert.X509Certificate; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; + +/** + * Abstract base class for TCP connections. + * + * This class is optimised for performance when used on the same event loop. However it can be used safely from other threads. + * + * The internal state is protected using the synchronized keyword. If always used on the same event loop, then + * we benefit from biased locking which makes the overhead of synchronized near zero. + * + * @author Tim Fox + */ +@SuppressWarnings({"unchecked"}) +public abstract class ConnectionBase { + + /** + * An exception used to signal a closed connection to an exception handler. Exception are + * expensive to create, this instance can be used for this purpose. It does not capture a stack + * trace to not be misleading. + */ + public static final VertxException CLOSED_EXCEPTION = new VertxException("Connection was closed", true); + private static final Logger log = LoggerFactory.getLogger(ConnectionBase.class); + private static final int MAX_REGION_SIZE = 1024 * 1024; + + private final VoidChannelPromise voidPromise; + protected final VertxInternal vertx; + protected final ChannelHandlerContext chctx; + protected final ContextInternal context; + private Handler exceptionHandler; + private Handler closeHandler; + private boolean read; + private boolean needsFlush; + private int writeInProgress; + private Object metric; + + protected ConnectionBase(VertxInternal vertx, ChannelHandlerContext chctx, ContextInternal context) { + this.vertx = vertx; + this.chctx = chctx; + this.context = context; + this.voidPromise = new VoidChannelPromise(chctx.channel(), false); + } + + /** + * Fail the connection, the {@code error} will be sent to the pipeline and the connection will + * stop processing any further message. + * + * @param error the {@code Throwable} to propagate + */ + public void fail(Throwable error) { + handler().fail(error); + } + + public VertxHandler handler() { + return (VertxHandler) chctx.handler(); + } + + protected synchronized final void endReadAndFlush() { + if (read) { + read = false; + if (needsFlush && writeInProgress == 0) { + needsFlush = false; + chctx.flush(); + } + } + } + + private void write(Object msg, ChannelPromise promise) { + if (read || writeInProgress > 0) { + needsFlush = true; + chctx.write(msg, promise); + } else { + needsFlush = false; + chctx.writeAndFlush(msg, promise); + } + } + + public synchronized void writeToChannel(Object msg, ChannelPromise promise) { + // Make sure we serialize all the messages as this method can be called from various threads: + // two "sequential" calls to writeToChannel (we can say that as it is synchronized) should preserve + // the message order independently of the thread. To achieve this we need to reschedule messages + // not on the event loop or if there are pending async message for the channel. + if (chctx.executor().inEventLoop() && writeInProgress == 0) { + write(msg, promise); + } else { + queueForWrite(msg, promise); + } + } + + private void queueForWrite(Object msg, ChannelPromise promise) { + writeInProgress++; + context.runOnContext(v -> { + synchronized (ConnectionBase.this) { + writeInProgress--; + write(msg, promise); + } + }); + } + + public void writeToChannel(Object obj) { + writeToChannel(obj, voidPromise); + } + + // This is a volatile read inside the Netty channel implementation + public boolean isNotWritable() { + return !chctx.channel().isWritable(); + } + + /** + * Close the connection + */ + public void close() { + // make sure everything is flushed out on close + endReadAndFlush(); + chctx.channel().close(); + } + + public synchronized ConnectionBase closeHandler(Handler handler) { + closeHandler = handler; + return this; + } + + public synchronized ConnectionBase exceptionHandler(Handler handler) { + this.exceptionHandler = handler; + return this; + } + + protected synchronized Handler exceptionHandler() { + return exceptionHandler; + } + + public void doPause() { + chctx.channel().config().setAutoRead(false); + } + + public void doResume() { + chctx.channel().config().setAutoRead(true); + } + + public void doSetWriteQueueMaxSize(int size) { + ChannelConfig config = chctx.channel().config(); + config.setWriteBufferWaterMark(new WriteBufferWaterMark(size / 2, size)); + } + + protected final void checkContext() { + // Sanity check + if (context != vertx.getContext()) { + throw new IllegalStateException("Wrong context!"); + } + } + + /** + * @return the Netty channel - for internal usage only + */ + public final Channel channel() { + return chctx.channel(); + } + + public final ChannelHandlerContext channelHandlerContext() { + return chctx; + } + + public final ContextInternal getContext() { + return context; + } + + public synchronized void metric(Object metric) { + this.metric = metric; + } + + public synchronized Object metric() { + return metric; + } + + public abstract NetworkMetrics metrics(); + + protected synchronized void handleException(Throwable t) { + NetworkMetrics metrics = metrics(); + if (metrics != null) { + metrics.exceptionOccurred((ConnectionBase)metric, remoteAddress(), t); + } + if (exceptionHandler != null) { + exceptionHandler.handle(t); + } else { + if (log.isDebugEnabled()) { + log.error(t.getMessage(), t); + } else { + log.error(t.getMessage()); + } + } + } + + protected void handleClosed() { + Handler handler; + synchronized (this) { + NetworkMetrics metrics = metrics(); + if (metrics != null && metrics instanceof TCPMetrics) { + ((TCPMetrics) metrics).disconnected(metric(), remoteAddress()); + } + handler = closeHandler; + } + if (handler != null) { + handler.handle(null); + } + } + + protected abstract void handleInterestedOpsChanged(); + + protected void addFuture(final Handler> completionHandler, final ChannelFuture future) { + if (future != null) { + future.addListener(channelFuture -> context.executeFromIO(v -> { + if (completionHandler != null) { + if (channelFuture.isSuccess()) { + completionHandler.handle(Future.succeededFuture()); + } else { + completionHandler.handle(Future.failedFuture(channelFuture.cause())); + } + } else if (!channelFuture.isSuccess()) { + handleException(channelFuture.cause()); + } + })); + } + } + + protected boolean supportsFileRegion() { + return !isSSL(); + } + + public void reportBytesRead(long numberOfBytes) { + NetworkMetrics metrics = metrics(); + if (metrics != null) { + metrics.bytesRead((ConnectionBase)metric(), remoteAddress(), numberOfBytes); + } + } + + public void reportBytesWritten(long numberOfBytes) { + NetworkMetrics metrics = metrics(); + if (metrics != null) { + metrics.bytesWritten((ConnectionBase)metric(), remoteAddress(), numberOfBytes); + } + } + + public boolean isSSL() { + return chctx.pipeline().get(SslHandler.class) != null; + } + + /** + * Send a file as a file region for zero copy transfer to the socket. + * + * The implementation splits the file into multiple regions to avoid stalling the pipeline + * and producing idle timeouts for very large files. + * + * @param file the file to send + * @param offset the file offset + * @param length the file length + * @param writeFuture the write future to be completed when the transfer is done or failed + */ + private void sendFileRegion(RandomAccessFile file, long offset, long length, ChannelPromise writeFuture) { + if (length < MAX_REGION_SIZE) { + writeToChannel(new DefaultFileRegion(file.getChannel(), offset, length), writeFuture); + } else { + ChannelPromise promise = chctx.newPromise(); + FileRegion region = new DefaultFileRegion(file.getChannel(), offset, MAX_REGION_SIZE); + // Retain explicitly this file region so the underlying channel is not closed by the NIO channel when it + // as been sent as we need it again + region.retain(); + writeToChannel(region, promise); + promise.addListener(future -> { + if (future.isSuccess()) { + sendFileRegion(file, offset + MAX_REGION_SIZE, length - MAX_REGION_SIZE, writeFuture); + } else { + future.cause().printStackTrace(); + writeFuture.setFailure(future.cause()); + } + }); + } + } + + protected ChannelFuture sendFile(RandomAccessFile raf, long offset, long length) throws IOException { + // Write the content. + ChannelPromise writeFuture = chctx.newPromise(); + if (!supportsFileRegion()) { + // Cannot use zero-copy + writeToChannel(new ChunkedFile(raf, offset, length, 8192), writeFuture); + } else { + // No encryption - use zero-copy. + sendFileRegion(raf, offset, length, writeFuture); + } + if (writeFuture != null) { + writeFuture.addListener(fut -> raf.close()); + } else { + raf.close(); + } + return writeFuture; + } + + public boolean isSsl() { + return chctx.pipeline().get(SslHandler.class) != null; + } + + public SSLSession sslSession() { + if (isSSL()) { + ChannelHandlerContext sslHandlerContext = chctx.pipeline().context("ssl"); + assert sslHandlerContext != null; + SslHandler sslHandler = (SslHandler) sslHandlerContext.handler(); + return sslHandler.engine().getSession(); + } else { + return null; + } + } + + public X509Certificate[] peerCertificateChain() throws SSLPeerUnverifiedException { + if (isSSL()) { + ChannelHandlerContext sslHandlerContext = chctx.pipeline().context(SslHandler.class); + assert sslHandlerContext != null; + SslHandler sslHandler = (SslHandler) sslHandlerContext.handler(); + return sslHandler.engine().getSession().getPeerCertificateChain(); + } else { + return null; + } + } + + public String indicatedServerName() { + if (chctx.channel().hasAttr(SslHandshakeCompletionHandler.SERVER_NAME_ATTR)) { + return chctx.channel().attr(SslHandshakeCompletionHandler.SERVER_NAME_ATTR).get(); + } else { + return null; + } + } + + public ChannelPromise channelFuture() { + return chctx.newPromise(); + } + + public String remoteName() { + InetSocketAddress addr = (InetSocketAddress) chctx.channel().remoteAddress(); + if (addr == null) return null; + // Use hostString that does not trigger a DNS resolution + return addr.getHostString(); + } + + public SocketAddress remoteAddress() { + InetSocketAddress addr = (InetSocketAddress) chctx.channel().remoteAddress(); + if (addr == null) return null; + return new SocketAddressImpl(addr); + } + + public SocketAddress localAddress() { + InetSocketAddress addr = (InetSocketAddress) chctx.channel().localAddress(); + if (addr == null) return null; + return new SocketAddressImpl(addr); + } + + final void handleRead(Object msg) { + synchronized (this) { + read = true; + } + handleMessage(msg); + } + + public void handleMessage(Object msg) { + } +} diff --git a/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestRestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestRestClientInvocation.java index 57a590c420d..aca627139fd 100644 --- a/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestRestClientInvocation.java +++ b/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestRestClientInvocation.java @@ -130,7 +130,7 @@ long nanoTime() { }; } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Before public void setup() { Deencapsulation.setField(restClientInvocation, "clientRequest", request); diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestBodyHandler.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestBodyHandler.java index 7e8105fadf8..257d95d98b6 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestBodyHandler.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestBodyHandler.java @@ -60,18 +60,29 @@ public class RestBodyHandler implements BodyHandler { private static final String BODY_HANDLED = "__body-handled"; private long bodyLimit = DEFAULT_BODY_LIMIT; - + private boolean handleFileUploads; private String uploadsDir; private boolean mergeFormAttributes = DEFAULT_MERGE_FORM_ATTRIBUTES; private boolean deleteUploadedFilesOnEnd = DEFAULT_DELETE_UPLOADED_FILES_ON_END; + private boolean isPreallocateBodyBuffer = DEFAULT_PREALLOCATE_BODY_BUFFER; + private static final int DEFAULT_INITIAL_BODY_BUFFER_SIZE = 1024; //bytes public RestBodyHandler() { - setUploadsDirectory(DEFAULT_UPLOADS_DIRECTORY); + this(true, DEFAULT_UPLOADS_DIRECTORY); + } + + public RestBodyHandler(boolean handleFileUploads) { + this(handleFileUploads, DEFAULT_UPLOADS_DIRECTORY); } public RestBodyHandler(String uploadDirectory) { + this(true, uploadDirectory); + } + + private RestBodyHandler(boolean handleFileUploads, String uploadDirectory) { + this.handleFileUploads = handleFileUploads; setUploadsDirectory(uploadDirectory); } @@ -86,7 +97,8 @@ public void handle(RoutingContext context) { // we need to keep state since we can be called again on reroute Boolean handled = context.get(BODY_HANDLED); if (handled == null || !handled) { - BHandler handler = new BHandler(context); + long contentLength = isPreallocateBodyBuffer ? parseContentLengthHeader(request) : -1; + BHandler handler = new BHandler(context, contentLength); request.handler(handler); request.endHandler(v -> handler.end()); context.put(BODY_HANDLED, true); @@ -100,6 +112,12 @@ public void handle(RoutingContext context) { } } + @Override + public BodyHandler setHandleFileUploads(boolean handleFileUploads) { + this.handleFileUploads = handleFileUploads; + return this; + } + @Override public BodyHandler setBodyLimit(long bodyLimit) { this.bodyLimit = bodyLimit; @@ -124,11 +142,32 @@ public BodyHandler setDeleteUploadedFilesOnEnd(boolean deleteUploadedFilesOnEnd) return this; } + @Override + public BodyHandler setPreallocateBodyBuffer(boolean isPreallocateBodyBuffer) { + this.isPreallocateBodyBuffer = isPreallocateBodyBuffer; + return this; + } + + private long parseContentLengthHeader(HttpServerRequest request) { + String contentLength = request.getHeader(HttpHeaders.CONTENT_LENGTH); + if(contentLength == null || contentLength.isEmpty()) { + return -1; + } + try { + long parsedContentLength = Long.parseLong(contentLength); + return parsedContentLength < 0 ? null : parsedContentLength; + } + catch (NumberFormatException ex) { + return -1; + } + } + private class BHandler implements Handler { + private static final int MAX_PREALLOCATED_BODY_BUFFER_BYTES = 65535; private RoutingContext context; - private Buffer body = Buffer.buffer(); + private Buffer body; private boolean failed; @@ -144,7 +183,7 @@ private class BHandler implements Handler { private final boolean isUrlEncoded; - BHandler(RoutingContext context) { + BHandler(RoutingContext context, long contentLength) { this.context = context; Set fileUploads = context.fileUploads(); @@ -158,9 +197,13 @@ private class BHandler implements Handler { isUrlEncoded = lowerCaseContentType.startsWith(HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString()); } + initBodyBuffer(contentLength); + if (isMultipart || isUrlEncoded) { - makeUploadDir(context.vertx().fileSystem()); context.request().setExpectMultipart(true); + if (handleFileUploads) { + makeUploadDir(context.vertx().fileSystem()); + } context.request().uploadHandler(upload -> { // *** cse begin *** if (uploadsDir == null) { @@ -180,17 +223,19 @@ private class BHandler implements Handler { return; } } - // we actually upload to a file with a generated filename - uploadCount.incrementAndGet(); - String uploadedFileName = new File(uploadsDir, UUID.randomUUID().toString()).getPath(); - upload.streamToFileSystem(uploadedFileName); - FileUploadImpl fileUpload = new FileUploadImpl(uploadedFileName, upload); - fileUploads.add(fileUpload); - upload.exceptionHandler(t -> { - deleteFileUploads(); - context.fail(t); - }); - upload.endHandler(v -> uploadEnded()); + if (handleFileUploads) { + // we actually upload to a file with a generated filename + uploadCount.incrementAndGet(); + String uploadedFileName = new File(uploadsDir, UUID.randomUUID().toString()).getPath(); + upload.streamToFileSystem(uploadedFileName); + FileUploadImpl fileUpload = new FileUploadImpl(uploadedFileName, upload); + fileUploads.add(fileUpload); + upload.exceptionHandler(t -> { + deleteFileUploads(); + context.fail(t); + }); + upload.endHandler(v -> uploadEnded()); + } }); } context.request().exceptionHandler(t -> { @@ -199,6 +244,25 @@ private class BHandler implements Handler { }); } + private void initBodyBuffer(long contentLength) { + int initialBodyBufferSize; + if(contentLength < 0) { + initialBodyBufferSize = DEFAULT_INITIAL_BODY_BUFFER_SIZE; + } + else if(contentLength > MAX_PREALLOCATED_BODY_BUFFER_BYTES) { + initialBodyBufferSize = MAX_PREALLOCATED_BODY_BUFFER_BYTES; + } + else { + initialBodyBufferSize = (int) contentLength; + } + + if(bodyLimit != -1) { + initialBodyBufferSize = (int)Math.min(initialBodyBufferSize, bodyLimit); + } + + this.body = Buffer.buffer(initialBodyBufferSize); + } + private void makeUploadDir(FileSystem fileSystem) { // *** cse begin *** if (uploadsDir == null) { @@ -271,7 +335,7 @@ void doEnd() { } private void deleteFileUploads() { - if (cleanup.compareAndSet(false, true)) { + if (cleanup.compareAndSet(false, true) && handleFileUploads) { for (FileUpload fileUpload : context.fileUploads()) { FileSystem fileSystem = context.vertx().fileSystem(); String uploadedFileName = fileUpload.uploadedFileName(); diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java index 3ac2ee7a21a..2e35b5a3ceb 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java @@ -92,7 +92,7 @@ public void start(Future startFuture) throws Exception { mountCorsHandler(mainRouter); initDispatcher(mainRouter); HttpServer httpServer = createHttpServer(); - httpServer.requestHandler(mainRouter::accept); + httpServer.requestHandler(mainRouter); httpServer.connectionHandler(connection -> { DefaultHttpServerMetrics serverMetrics = (DefaultHttpServerMetrics) ((ConnectionBase) connection).metrics(); DefaultServerEndpointMetric endpointMetric = serverMetrics.getEndpointMetric();