diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java index 7a060dd46..d602cb904 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java @@ -6,8 +6,11 @@ * Built-in PacketLib session flags. */ public class BuiltinFlags { - public static final Flag ENABLE_CLIENT_PROXY_PROTOCOL = new Flag<>("enable-client-proxy-protocol", Boolean.class); + /** + * Enables HAProxy protocol support. + * When this value is not null it represents the ip and port the client claims the connection is from. + */ public static final Flag CLIENT_PROXIED_ADDRESS = new Flag<>("client-proxied-address", InetSocketAddress.class); /** @@ -20,6 +23,24 @@ public class BuiltinFlags { */ public static final Flag TCP_FAST_OPEN = new Flag<>("tcp-fast-open", Boolean.class); + /** + * Connection timeout in seconds. + * Only used by the client. + */ + public static final Flag CLIENT_CONNECT_TIMEOUT = new Flag<>("client-connect-timeout", Integer.class); + + /** + * Read timeout in seconds. + * Used by both the server and client. + */ + public static final Flag READ_TIMEOUT = new Flag<>("read-timeout", Integer.class); + + /** + * Write timeout in seconds. + * Used by both the server and client. + */ + public static final Flag WRITE_TIMEOUT = new Flag<>("write-timeout", Integer.class); + private BuiltinFlags() { } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java index 7ccc0d220..a4af44e33 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java @@ -37,7 +37,7 @@ public interface Session { * @param wait Whether to wait for the connection to be established before returning. * @param transferring Whether the session is a client being transferred. */ - public void connect(boolean wait, boolean transferring); + void connect(boolean wait, boolean transferring); /** * Gets the host the session is connected to. @@ -138,7 +138,7 @@ public interface Session { * * @param flags Collection of flags */ - public void setFlags(Map flags); + void setFlags(Map flags); /** * Gets the listeners listening on this session. @@ -204,48 +204,6 @@ public interface Session { */ void enableEncryption(PacketEncryption encryption); - /** - * Gets the connect timeout for this session in seconds. - * - * @return The session's connect timeout. - */ - int getConnectTimeout(); - - /** - * Sets the connect timeout for this session in seconds. - * - * @param timeout Connect timeout to set. - */ - void setConnectTimeout(int timeout); - - /** - * Gets the read timeout for this session in seconds. - * - * @return The session's read timeout. - */ - int getReadTimeout(); - - /** - * Sets the read timeout for this session in seconds. - * - * @param timeout Read timeout to set. - */ - void setReadTimeout(int timeout); - - /** - * Gets the write timeout for this session in seconds. - * - * @return The session's write timeout. - */ - int getWriteTimeout(); - - /** - * Sets the write timeout for this session in seconds. - * - * @param timeout Write timeout to set. - */ - void setWriteTimeout(int timeout); - /** * Returns true if the session is connected. * diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java index f1dc98bae..a04d0d89d 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java @@ -4,9 +4,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.AddressedEnvelope; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -25,6 +22,8 @@ import io.netty.handler.proxy.HttpProxyHandler; import io.netty.handler.proxy.Socks4ProxyHandler; import io.netty.handler.proxy.Socks5ProxyHandler; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; import io.netty.resolver.dns.DnsNameResolver; import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.DefaultThreadFactory; @@ -40,6 +39,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -90,56 +90,51 @@ public void connect(boolean wait, boolean transferring) { createTcpEventLoopGroup(); } - try { - final Bootstrap bootstrap = new Bootstrap() - .channelFactory(TRANSPORT_TYPE.socketChannelFactory()) - .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.IP_TOS, 0x18) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout() * 1000) - .group(EVENT_LOOP_GROUP) - .remoteAddress(resolveAddress()) - .localAddress(bindAddress, bindPort) - .handler(new ChannelInitializer<>() { - @Override - public void initChannel(Channel channel) { - PacketProtocol protocol = getPacketProtocol(); - protocol.newClientSession(TcpClientSession.this, transferring); - - ChannelPipeline pipeline = channel.pipeline(); - - refreshReadTimeoutHandler(channel); - refreshWriteTimeoutHandler(channel); - - addProxy(pipeline); - - int size = protocol.getPacketHeader().getLengthSize(); - if (size > 0) { - pipeline.addLast("sizer", new TcpPacketSizer(TcpClientSession.this, size)); - } - - pipeline.addLast("codec", new TcpPacketCodec(TcpClientSession.this, true)); - pipeline.addLast("manager", TcpClientSession.this); - - addHAProxySupport(pipeline); - } - }); + final Bootstrap bootstrap = new Bootstrap() + .channelFactory(TRANSPORT_TYPE.socketChannelFactory()) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.IP_TOS, 0x18) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getFlag(BuiltinFlags.CLIENT_CONNECT_TIMEOUT, 30) * 1000) + .group(EVENT_LOOP_GROUP) + .remoteAddress(resolveAddress()) + .localAddress(bindAddress, bindPort) + .handler(new ChannelInitializer<>() { + @Override + public void initChannel(Channel channel) { + PacketProtocol protocol = getPacketProtocol(); + protocol.newClientSession(TcpClientSession.this, transferring); - if (getFlag(BuiltinFlags.TCP_FAST_OPEN, false) && TRANSPORT_TYPE.supportsTcpFastOpenClient()) { - bootstrap.option(ChannelOption.TCP_FASTOPEN_CONNECT, true); - } + ChannelPipeline pipeline = channel.pipeline(); - ChannelFuture future = bootstrap.connect(); - if (wait) { - future.sync(); - } + addProxy(pipeline); - future.addListener((futureListener) -> { - if (!futureListener.isSuccess()) { - exceptionCaught(null, futureListener.cause()); + initializeHAProxySupport(channel); + + pipeline.addLast("read-timeout", new ReadTimeoutHandler(getFlag(BuiltinFlags.READ_TIMEOUT, 30))); + pipeline.addLast("write-timeout", new WriteTimeoutHandler(getFlag(BuiltinFlags.WRITE_TIMEOUT, 0))); + + pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), getCodecHelper())); + + pipeline.addLast("codec", new TcpPacketCodec(TcpClientSession.this, true)); + pipeline.addLast("manager", TcpClientSession.this); } }); - } catch (Throwable t) { - exceptionCaught(null, t); + + if (getFlag(BuiltinFlags.TCP_FAST_OPEN, false) && TRANSPORT_TYPE.supportsTcpFastOpenClient()) { + bootstrap.option(ChannelOption.TCP_FASTOPEN_CONNECT, true); + } + + CompletableFuture handleFuture = new CompletableFuture<>(); + bootstrap.connect().addListener((futureListener) -> { + if (!futureListener.isSuccess()) { + exceptionCaught(null, futureListener.cause()); + } + + handleFuture.complete(null); + }); + + if (wait) { + handleFuture.join(); } } @@ -155,8 +150,8 @@ private InetSocketAddress resolveAddress() { if (getFlag(BuiltinFlags.ATTEMPT_SRV_RESOLVE, true) && (!this.host.matches(IP_REGEX) && !this.host.equalsIgnoreCase("localhost"))) { AddressedEnvelope envelope = null; try (DnsNameResolver resolver = new DnsNameResolverBuilder(EVENT_LOOP_GROUP.next()) - .channelFactory(TRANSPORT_TYPE.datagramChannelFactory()) - .build()) { + .channelFactory(TRANSPORT_TYPE.datagramChannelFactory()) + .build()) { envelope = resolver.query(new DefaultDnsQuestion(name, DnsRecordType.SRV)).get(); DnsResponse response = envelope.content(); @@ -206,54 +201,52 @@ private InetSocketAddress resolveAddress() { } private void addProxy(ChannelPipeline pipeline) { - if (proxy != null) { - switch (proxy.type()) { - case HTTP -> { - if (proxy.username() != null && proxy.password() != null) { - pipeline.addFirst("proxy", new HttpProxyHandler(proxy.address(), proxy.username(), proxy.password())); - } else { - pipeline.addFirst("proxy", new HttpProxyHandler(proxy.address())); - } + if (proxy == null) { + return; + } + + switch (proxy.type()) { + case HTTP -> { + if (proxy.username() != null && proxy.password() != null) { + pipeline.addLast("proxy", new HttpProxyHandler(proxy.address(), proxy.username(), proxy.password())); + } else { + pipeline.addLast("proxy", new HttpProxyHandler(proxy.address())); } - case SOCKS4 -> { - if (proxy.username() != null) { - pipeline.addFirst("proxy", new Socks4ProxyHandler(proxy.address(), proxy.username())); - } else { - pipeline.addFirst("proxy", new Socks4ProxyHandler(proxy.address())); - } + } + case SOCKS4 -> { + if (proxy.username() != null) { + pipeline.addLast("proxy", new Socks4ProxyHandler(proxy.address(), proxy.username())); + } else { + pipeline.addLast("proxy", new Socks4ProxyHandler(proxy.address())); } - case SOCKS5 -> { - if (proxy.username() != null && proxy.password() != null) { - pipeline.addFirst("proxy", new Socks5ProxyHandler(proxy.address(), proxy.username(), proxy.password())); - } else { - pipeline.addFirst("proxy", new Socks5ProxyHandler(proxy.address())); - } + } + case SOCKS5 -> { + if (proxy.username() != null && proxy.password() != null) { + pipeline.addLast("proxy", new Socks5ProxyHandler(proxy.address(), proxy.username(), proxy.password())); + } else { + pipeline.addLast("proxy", new Socks5ProxyHandler(proxy.address())); } - default -> throw new UnsupportedOperationException("Unsupported proxy type: " + proxy.type()); } + default -> throw new UnsupportedOperationException("Unsupported proxy type: " + proxy.type()); } } - private void addHAProxySupport(ChannelPipeline pipeline) { + private void initializeHAProxySupport(Channel channel) { InetSocketAddress clientAddress = getFlag(BuiltinFlags.CLIENT_PROXIED_ADDRESS); - if (getFlag(BuiltinFlags.ENABLE_CLIENT_PROXY_PROTOCOL, false) && clientAddress != null) { - pipeline.addFirst("proxy-protocol-packet-sender", new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet4Address ? HAProxyProxiedProtocol.TCP4 : HAProxyProxiedProtocol.TCP6; - InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); - ctx.channel().writeAndFlush(new HAProxyMessage( - HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol, - clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(), - clientAddress.getPort(), remoteAddress.getPort() - )); - ctx.pipeline().remove(this); - ctx.pipeline().remove("proxy-protocol-encoder"); - super.channelActive(ctx); - } - }); - pipeline.addFirst("proxy-protocol-encoder", HAProxyMessageEncoder.INSTANCE); + if (clientAddress == null) { + return; } + + channel.pipeline().addLast("proxy-protocol-encoder", HAProxyMessageEncoder.INSTANCE); + HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet4Address ? HAProxyProxiedProtocol.TCP4 : HAProxyProxiedProtocol.TCP6; + InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress(); + channel.writeAndFlush(new HAProxyMessage( + HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol, + clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(), + clientAddress.getPort(), remoteAddress.getPort() + )).addListener(future -> { + channel.pipeline().remove("proxy-protocol-encoder"); + }); } private static void createTcpEventLoopGroup() { @@ -264,7 +257,7 @@ private static void createTcpEventLoopGroup() { EVENT_LOOP_GROUP = TRANSPORT_TYPE.eventLoopGroupFactory().apply(newThreadFactory()); Runtime.getRuntime().addShutdownHook(new Thread( - () -> EVENT_LOOP_GROUP.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))); + () -> EVENT_LOOP_GROUP.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))); } protected static ThreadFactory newThreadFactory() { diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketSizer.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketSizer.java index f6d5e9b70..decb5069b 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketSizer.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketSizer.java @@ -5,29 +5,39 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageCodec; import io.netty.handler.codec.CorruptedFrameException; -import org.geysermc.mcprotocollib.network.Session; +import lombok.RequiredArgsConstructor; +import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper; +import org.geysermc.mcprotocollib.network.packet.PacketHeader; import java.util.List; +@RequiredArgsConstructor public class TcpPacketSizer extends ByteToMessageCodec { - private final Session session; - private final int size; - - public TcpPacketSizer(Session session, int size) { - this.session = session; - this.size = size; - } + private final PacketHeader header; + private final PacketCodecHelper codecHelper; @Override public void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) { + int size = header.getLengthSize(); + if (size == 0) { + out.writeBytes(in); + return; + } + int length = in.readableBytes(); - out.ensureWritable(this.session.getPacketProtocol().getPacketHeader().getLengthSize(length) + length); - this.session.getPacketProtocol().getPacketHeader().writeLength(out, this.session.getCodecHelper(), length); + out.ensureWritable(header.getLengthSize(length) + length); + header.writeLength(out, codecHelper, length); out.writeBytes(in); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) { + int size = header.getLengthSize(); + if (size == 0) { + out.add(buf.retain()); + return; + } + buf.markReaderIndex(); byte[] lengthBytes = new byte[size]; for (int index = 0; index < lengthBytes.length; index++) { @@ -37,8 +47,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) } lengthBytes[index] = buf.readByte(); - if ((this.session.getPacketProtocol().getPacketHeader().isLengthVariable() && lengthBytes[index] >= 0) || index == size - 1) { - int length = this.session.getPacketProtocol().getPacketHeader().readLength(Unpooled.wrappedBuffer(lengthBytes), this.session.getCodecHelper(), buf.readableBytes()); + if ((header.isLengthVariable() && lengthBytes[index] >= 0) || index == size - 1) { + int length = header.readLength(Unpooled.wrappedBuffer(lengthBytes), codecHelper, buf.readableBytes()); if (buf.readableBytes() < length) { buf.resetReaderIndex(); return; diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java index b3298fd1a..4d35e9666 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java @@ -2,13 +2,13 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.Future; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; import org.geysermc.mcprotocollib.network.AbstractServer; import org.geysermc.mcprotocollib.network.BuiltinFlags; import org.geysermc.mcprotocollib.network.helper.TransportHelper; @@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; public class TcpServer extends AbstractServer { @@ -60,13 +61,10 @@ public void initChannel(Channel channel) { ChannelPipeline pipeline = channel.pipeline(); - session.refreshReadTimeoutHandler(channel); - session.refreshWriteTimeoutHandler(channel); + pipeline.addLast("read-timeout", new ReadTimeoutHandler(session.getFlag(BuiltinFlags.READ_TIMEOUT, 30))); + pipeline.addLast("write-timeout", new WriteTimeoutHandler(session.getFlag(BuiltinFlags.WRITE_TIMEOUT, 0))); - int size = protocol.getPacketHeader().getLengthSize(); - if (size > 0) { - pipeline.addLast("sizer", new TcpPacketSizer(session, size)); - } + pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), session.getCodecHelper())); pipeline.addLast("codec", new TcpPacketCodec(session, false)); pipeline.addLast("manager", session); @@ -77,29 +75,22 @@ public void initChannel(Channel channel) { bootstrap.option(ChannelOption.TCP_FASTOPEN, 3); } - ChannelFuture future = bootstrap.bind(); - - if (wait) { - try { - future.sync(); - } catch (InterruptedException e) { + CompletableFuture handleFuture = new CompletableFuture<>(); + bootstrap.bind().addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + channel = future.channel(); + if (callback != null) { + callback.run(); + } + } else { + log.error("Failed to bind connection listener.", future.cause()); } - channel = future.channel(); - if (callback != null) { - callback.run(); - } - } else { - future.addListener((ChannelFutureListener) future1 -> { - if (future1.isSuccess()) { - channel = future1.channel(); - if (callback != null) { - callback.run(); - } - } else { - log.error("Failed to asynchronously bind connection listener.", future1.cause()); - } - }); + handleFuture.complete(null); + }); + + if (wait) { + handleFuture.join(); } } @@ -107,26 +98,21 @@ public void initChannel(Channel channel) { public void closeImpl(boolean wait, final Runnable callback) { if (this.channel != null) { if (this.channel.isOpen()) { - ChannelFuture future = this.channel.close(); - if (wait) { - try { - future.sync(); - } catch (InterruptedException e) { + CompletableFuture handleFuture = new CompletableFuture<>(); + this.channel.close().addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + if (callback != null) { + callback.run(); + } + } else { + log.error("Failed to close connection listener.", future.cause()); } - if (callback != null) { - callback.run(); - } - } else { - future.addListener((ChannelFutureListener) future1 -> { - if (future1.isSuccess()) { - if (callback != null) { - callback.run(); - } - } else { - log.error("Failed to asynchronously close connection listener.", future1.cause()); - } - }); + handleFuture.complete(null); + }); + + if (wait) { + handleFuture.join(); } } @@ -134,18 +120,17 @@ public void closeImpl(boolean wait, final Runnable callback) { } if (this.group != null) { - Future future = this.group.shutdownGracefully(); - if (wait) { - try { - future.sync(); - } catch (InterruptedException e) { + CompletableFuture handleFuture = new CompletableFuture<>(); + this.group.shutdownGracefully().addListener(future -> { + if (!future.isSuccess()) { + log.debug("Failed to close connection listener.", future.cause()); } - } else { - future.addListener(future1 -> { - if (!future1.isSuccess()) { - log.debug("Failed to asynchronously close connection listener.", future1.cause()); - } - }); + + handleFuture.complete(null); + }); + + if (wait) { + handleFuture.join(); } this.group = null; diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java index 94320ca43..462f85b4b 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java @@ -7,8 +7,6 @@ import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.timeout.ReadTimeoutHandler; -import io.netty.handler.timeout.WriteTimeoutHandler; import io.netty.util.concurrent.DefaultThreadFactory; import net.kyori.adventure.text.Component; import org.checkerframework.checker.nullness.qual.NonNull; @@ -50,9 +48,6 @@ public abstract class TcpSession extends SimpleChannelInboundHandler imp private final EventLoop eventLoop = createEventLoop(); private int compressionThreshold = -1; - private int connectTimeout = 30; - private int readTimeout = 30; - private int writeTimeout = 0; private final Map flags = new HashMap<>(); private final List listeners = new CopyOnWriteArrayList<>(); @@ -220,38 +215,6 @@ public void enableEncryption(PacketEncryption encryption) { channel.pipeline().addBefore("sizer", "encryption", new TcpPacketEncryptor(encryption)); } - @Override - public int getConnectTimeout() { - return this.connectTimeout; - } - - @Override - public void setConnectTimeout(int timeout) { - this.connectTimeout = timeout; - } - - @Override - public int getReadTimeout() { - return this.readTimeout; - } - - @Override - public void setReadTimeout(int timeout) { - this.readTimeout = timeout; - this.refreshReadTimeoutHandler(); - } - - @Override - public int getWriteTimeout() { - return this.writeTimeout; - } - - @Override - public void setWriteTimeout(int timeout) { - this.writeTimeout = timeout; - this.refreshWriteTimeoutHandler(); - } - @Override public boolean isConnected() { return this.channel != null && this.channel.isOpen() && !this.disconnected; @@ -313,46 +276,6 @@ public Channel getChannel() { return this.channel; } - protected void refreshReadTimeoutHandler() { - this.refreshReadTimeoutHandler(this.channel); - } - - protected void refreshReadTimeoutHandler(Channel channel) { - if (channel != null) { - if (this.readTimeout <= 0) { - if (channel.pipeline().get("readTimeout") != null) { - channel.pipeline().remove("readTimeout"); - } - } else { - if (channel.pipeline().get("readTimeout") == null) { - channel.pipeline().addFirst("readTimeout", new ReadTimeoutHandler(this.readTimeout)); - } else { - channel.pipeline().replace("readTimeout", "readTimeout", new ReadTimeoutHandler(this.readTimeout)); - } - } - } - } - - protected void refreshWriteTimeoutHandler() { - this.refreshWriteTimeoutHandler(this.channel); - } - - protected void refreshWriteTimeoutHandler(Channel channel) { - if (channel != null) { - if (this.writeTimeout <= 0) { - if (channel.pipeline().get("writeTimeout") != null) { - channel.pipeline().remove("writeTimeout"); - } - } else { - if (channel.pipeline().get("writeTimeout") == null) { - channel.pipeline().addFirst("writeTimeout", new WriteTimeoutHandler(this.writeTimeout)); - } else { - channel.pipeline().replace("writeTimeout", "writeTimeout", new WriteTimeoutHandler(this.writeTimeout)); - } - } - } - } - @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (this.disconnected || this.channel != null) {