From 4148fa9b1fdabea2562ca848653a66679b4d6069 Mon Sep 17 00:00:00 2001 From: Alex <40795980+AlexProgrammerDE@users.noreply.github.com> Date: Tue, 10 Sep 2024 09:16:58 +0200 Subject: [PATCH] Static sizer and timeout handlers in the pipeline (#833) * Improve pipeline This simplifies all pipeline code and ensures some listeners like the sizer are always present. The code already assumed that the sizer is always there and thus causes issues. The sizer can be deactivated still now and has pretty much no performance losses from this. The profit from this PR is that there is less logic with modifying the PR and thus developers interacting with the channel can assume specific things about the order and placements of elements in the pipeline. This will be useful once ViaVersion is supported, and it is expected that certain elements always are in the pipeline and don't change. My plan is to also always have an encryption and compression handler in the pipeline that is controlled via AttributeKeys from netty, but for that first #828 needs to be merged. So this PR only completes the goal partially, but that's fine. PR is ready for review like it is right now. * Revert some stuff * Fix channel race condition * Fix closing race condition * Prevent client race conditions. * Fix test failure, idk how, idk why, but it works now * Address review * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java Co-authored-by: Konicai <71294714+Konicai@users.noreply.github.com> * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java Co-authored-by: Konicai <71294714+Konicai@users.noreply.github.com> * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java Co-authored-by: chris --------- Co-authored-by: Konicai <71294714+Konicai@users.noreply.github.com> Co-authored-by: chris --- .../mcprotocollib/network/BuiltinFlags.java | 23 ++- .../mcprotocollib/network/Session.java | 46 +---- .../network/tcp/TcpClientSession.java | 173 +++++++++--------- .../network/tcp/TcpPacketSizer.java | 34 ++-- .../mcprotocollib/network/tcp/TcpServer.java | 101 +++++----- .../mcprotocollib/network/tcp/TcpSession.java | 77 -------- 6 files changed, 172 insertions(+), 282 deletions(-) diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java index 7a060dd46..d602cb904 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java @@ -6,8 +6,11 @@ * Built-in PacketLib session flags. */ public class BuiltinFlags { - public static final Flag ENABLE_CLIENT_PROXY_PROTOCOL = new Flag<>("enable-client-proxy-protocol", Boolean.class); + /** + * Enables HAProxy protocol support. + * When this value is not null it represents the ip and port the client claims the connection is from. + */ public static final Flag CLIENT_PROXIED_ADDRESS = new Flag<>("client-proxied-address", InetSocketAddress.class); /** @@ -20,6 +23,24 @@ public class BuiltinFlags { */ public static final Flag TCP_FAST_OPEN = new Flag<>("tcp-fast-open", Boolean.class); + /** + * Connection timeout in seconds. + * Only used by the client. + */ + public static final Flag CLIENT_CONNECT_TIMEOUT = new Flag<>("client-connect-timeout", Integer.class); + + /** + * Read timeout in seconds. + * Used by both the server and client. + */ + public static final Flag READ_TIMEOUT = new Flag<>("read-timeout", Integer.class); + + /** + * Write timeout in seconds. + * Used by both the server and client. + */ + public static final Flag WRITE_TIMEOUT = new Flag<>("write-timeout", Integer.class); + private BuiltinFlags() { } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java index 7ccc0d220..a4af44e33 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java @@ -37,7 +37,7 @@ public interface Session { * @param wait Whether to wait for the connection to be established before returning. * @param transferring Whether the session is a client being transferred. */ - public void connect(boolean wait, boolean transferring); + void connect(boolean wait, boolean transferring); /** * Gets the host the session is connected to. @@ -138,7 +138,7 @@ public interface Session { * * @param flags Collection of flags */ - public void setFlags(Map flags); + void setFlags(Map flags); /** * Gets the listeners listening on this session. @@ -204,48 +204,6 @@ public interface Session { */ void enableEncryption(PacketEncryption encryption); - /** - * Gets the connect timeout for this session in seconds. - * - * @return The session's connect timeout. - */ - int getConnectTimeout(); - - /** - * Sets the connect timeout for this session in seconds. - * - * @param timeout Connect timeout to set. - */ - void setConnectTimeout(int timeout); - - /** - * Gets the read timeout for this session in seconds. - * - * @return The session's read timeout. - */ - int getReadTimeout(); - - /** - * Sets the read timeout for this session in seconds. - * - * @param timeout Read timeout to set. - */ - void setReadTimeout(int timeout); - - /** - * Gets the write timeout for this session in seconds. - * - * @return The session's write timeout. - */ - int getWriteTimeout(); - - /** - * Sets the write timeout for this session in seconds. - * - * @param timeout Write timeout to set. - */ - void setWriteTimeout(int timeout); - /** * Returns true if the session is connected. * diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java index f1dc98bae..a04d0d89d 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java @@ -4,9 +4,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.AddressedEnvelope; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -25,6 +22,8 @@ import io.netty.handler.proxy.HttpProxyHandler; import io.netty.handler.proxy.Socks4ProxyHandler; import io.netty.handler.proxy.Socks5ProxyHandler; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; import io.netty.resolver.dns.DnsNameResolver; import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.DefaultThreadFactory; @@ -40,6 +39,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -90,56 +90,51 @@ public void connect(boolean wait, boolean transferring) { createTcpEventLoopGroup(); } - try { - final Bootstrap bootstrap = new Bootstrap() - .channelFactory(TRANSPORT_TYPE.socketChannelFactory()) - .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.IP_TOS, 0x18) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout() * 1000) - .group(EVENT_LOOP_GROUP) - .remoteAddress(resolveAddress()) - .localAddress(bindAddress, bindPort) - .handler(new ChannelInitializer<>() { - @Override - public void initChannel(Channel channel) { - PacketProtocol protocol = getPacketProtocol(); - protocol.newClientSession(TcpClientSession.this, transferring); - - ChannelPipeline pipeline = channel.pipeline(); - - refreshReadTimeoutHandler(channel); - refreshWriteTimeoutHandler(channel); - - addProxy(pipeline); - - int size = protocol.getPacketHeader().getLengthSize(); - if (size > 0) { - pipeline.addLast("sizer", new TcpPacketSizer(TcpClientSession.this, size)); - } - - pipeline.addLast("codec", new TcpPacketCodec(TcpClientSession.this, true)); - pipeline.addLast("manager", TcpClientSession.this); - - addHAProxySupport(pipeline); - } - }); + final Bootstrap bootstrap = new Bootstrap() + .channelFactory(TRANSPORT_TYPE.socketChannelFactory()) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.IP_TOS, 0x18) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getFlag(BuiltinFlags.CLIENT_CONNECT_TIMEOUT, 30) * 1000) + .group(EVENT_LOOP_GROUP) + .remoteAddress(resolveAddress()) + .localAddress(bindAddress, bindPort) + .handler(new ChannelInitializer<>() { + @Override + public void initChannel(Channel channel) { + PacketProtocol protocol = getPacketProtocol(); + protocol.newClientSession(TcpClientSession.this, transferring); - if (getFlag(BuiltinFlags.TCP_FAST_OPEN, false) && TRANSPORT_TYPE.supportsTcpFastOpenClient()) { - bootstrap.option(ChannelOption.TCP_FASTOPEN_CONNECT, true); - } + ChannelPipeline pipeline = channel.pipeline(); - ChannelFuture future = bootstrap.connect(); - if (wait) { - future.sync(); - } + addProxy(pipeline); - future.addListener((futureListener) -> { - if (!futureListener.isSuccess()) { - exceptionCaught(null, futureListener.cause()); + initializeHAProxySupport(channel); + + pipeline.addLast("read-timeout", new ReadTimeoutHandler(getFlag(BuiltinFlags.READ_TIMEOUT, 30))); + pipeline.addLast("write-timeout", new WriteTimeoutHandler(getFlag(BuiltinFlags.WRITE_TIMEOUT, 0))); + + pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), getCodecHelper())); + + pipeline.addLast("codec", new TcpPacketCodec(TcpClientSession.this, true)); + pipeline.addLast("manager", TcpClientSession.this); } }); - } catch (Throwable t) { - exceptionCaught(null, t); + + if (getFlag(BuiltinFlags.TCP_FAST_OPEN, false) && TRANSPORT_TYPE.supportsTcpFastOpenClient()) { + bootstrap.option(ChannelOption.TCP_FASTOPEN_CONNECT, true); + } + + CompletableFuture handleFuture = new CompletableFuture<>(); + bootstrap.connect().addListener((futureListener) -> { + if (!futureListener.isSuccess()) { + exceptionCaught(null, futureListener.cause()); + } + + handleFuture.complete(null); + }); + + if (wait) { + handleFuture.join(); } } @@ -155,8 +150,8 @@ private InetSocketAddress resolveAddress() { if (getFlag(BuiltinFlags.ATTEMPT_SRV_RESOLVE, true) && (!this.host.matches(IP_REGEX) && !this.host.equalsIgnoreCase("localhost"))) { AddressedEnvelope envelope = null; try (DnsNameResolver resolver = new DnsNameResolverBuilder(EVENT_LOOP_GROUP.next()) - .channelFactory(TRANSPORT_TYPE.datagramChannelFactory()) - .build()) { + .channelFactory(TRANSPORT_TYPE.datagramChannelFactory()) + .build()) { envelope = resolver.query(new DefaultDnsQuestion(name, DnsRecordType.SRV)).get(); DnsResponse response = envelope.content(); @@ -206,54 +201,52 @@ private InetSocketAddress resolveAddress() { } private void addProxy(ChannelPipeline pipeline) { - if (proxy != null) { - switch (proxy.type()) { - case HTTP -> { - if (proxy.username() != null && proxy.password() != null) { - pipeline.addFirst("proxy", new HttpProxyHandler(proxy.address(), proxy.username(), proxy.password())); - } else { - pipeline.addFirst("proxy", new HttpProxyHandler(proxy.address())); - } + if (proxy == null) { + return; + } + + switch (proxy.type()) { + case HTTP -> { + if (proxy.username() != null && proxy.password() != null) { + pipeline.addLast("proxy", new HttpProxyHandler(proxy.address(), proxy.username(), proxy.password())); + } else { + pipeline.addLast("proxy", new HttpProxyHandler(proxy.address())); } - case SOCKS4 -> { - if (proxy.username() != null) { - pipeline.addFirst("proxy", new Socks4ProxyHandler(proxy.address(), proxy.username())); - } else { - pipeline.addFirst("proxy", new Socks4ProxyHandler(proxy.address())); - } + } + case SOCKS4 -> { + if (proxy.username() != null) { + pipeline.addLast("proxy", new Socks4ProxyHandler(proxy.address(), proxy.username())); + } else { + pipeline.addLast("proxy", new Socks4ProxyHandler(proxy.address())); } - case SOCKS5 -> { - if (proxy.username() != null && proxy.password() != null) { - pipeline.addFirst("proxy", new Socks5ProxyHandler(proxy.address(), proxy.username(), proxy.password())); - } else { - pipeline.addFirst("proxy", new Socks5ProxyHandler(proxy.address())); - } + } + case SOCKS5 -> { + if (proxy.username() != null && proxy.password() != null) { + pipeline.addLast("proxy", new Socks5ProxyHandler(proxy.address(), proxy.username(), proxy.password())); + } else { + pipeline.addLast("proxy", new Socks5ProxyHandler(proxy.address())); } - default -> throw new UnsupportedOperationException("Unsupported proxy type: " + proxy.type()); } + default -> throw new UnsupportedOperationException("Unsupported proxy type: " + proxy.type()); } } - private void addHAProxySupport(ChannelPipeline pipeline) { + private void initializeHAProxySupport(Channel channel) { InetSocketAddress clientAddress = getFlag(BuiltinFlags.CLIENT_PROXIED_ADDRESS); - if (getFlag(BuiltinFlags.ENABLE_CLIENT_PROXY_PROTOCOL, false) && clientAddress != null) { - pipeline.addFirst("proxy-protocol-packet-sender", new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet4Address ? HAProxyProxiedProtocol.TCP4 : HAProxyProxiedProtocol.TCP6; - InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); - ctx.channel().writeAndFlush(new HAProxyMessage( - HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol, - clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(), - clientAddress.getPort(), remoteAddress.getPort() - )); - ctx.pipeline().remove(this); - ctx.pipeline().remove("proxy-protocol-encoder"); - super.channelActive(ctx); - } - }); - pipeline.addFirst("proxy-protocol-encoder", HAProxyMessageEncoder.INSTANCE); + if (clientAddress == null) { + return; } + + channel.pipeline().addLast("proxy-protocol-encoder", HAProxyMessageEncoder.INSTANCE); + HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet4Address ? HAProxyProxiedProtocol.TCP4 : HAProxyProxiedProtocol.TCP6; + InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress(); + channel.writeAndFlush(new HAProxyMessage( + HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol, + clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(), + clientAddress.getPort(), remoteAddress.getPort() + )).addListener(future -> { + channel.pipeline().remove("proxy-protocol-encoder"); + }); } private static void createTcpEventLoopGroup() { @@ -264,7 +257,7 @@ private static void createTcpEventLoopGroup() { EVENT_LOOP_GROUP = TRANSPORT_TYPE.eventLoopGroupFactory().apply(newThreadFactory()); Runtime.getRuntime().addShutdownHook(new Thread( - () -> EVENT_LOOP_GROUP.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))); + () -> EVENT_LOOP_GROUP.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))); } protected static ThreadFactory newThreadFactory() { diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketSizer.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketSizer.java index f6d5e9b70..decb5069b 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketSizer.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketSizer.java @@ -5,29 +5,39 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageCodec; import io.netty.handler.codec.CorruptedFrameException; -import org.geysermc.mcprotocollib.network.Session; +import lombok.RequiredArgsConstructor; +import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper; +import org.geysermc.mcprotocollib.network.packet.PacketHeader; import java.util.List; +@RequiredArgsConstructor public class TcpPacketSizer extends ByteToMessageCodec { - private final Session session; - private final int size; - - public TcpPacketSizer(Session session, int size) { - this.session = session; - this.size = size; - } + private final PacketHeader header; + private final PacketCodecHelper codecHelper; @Override public void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) { + int size = header.getLengthSize(); + if (size == 0) { + out.writeBytes(in); + return; + } + int length = in.readableBytes(); - out.ensureWritable(this.session.getPacketProtocol().getPacketHeader().getLengthSize(length) + length); - this.session.getPacketProtocol().getPacketHeader().writeLength(out, this.session.getCodecHelper(), length); + out.ensureWritable(header.getLengthSize(length) + length); + header.writeLength(out, codecHelper, length); out.writeBytes(in); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) { + int size = header.getLengthSize(); + if (size == 0) { + out.add(buf.retain()); + return; + } + buf.markReaderIndex(); byte[] lengthBytes = new byte[size]; for (int index = 0; index < lengthBytes.length; index++) { @@ -37,8 +47,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) } lengthBytes[index] = buf.readByte(); - if ((this.session.getPacketProtocol().getPacketHeader().isLengthVariable() && lengthBytes[index] >= 0) || index == size - 1) { - int length = this.session.getPacketProtocol().getPacketHeader().readLength(Unpooled.wrappedBuffer(lengthBytes), this.session.getCodecHelper(), buf.readableBytes()); + if ((header.isLengthVariable() && lengthBytes[index] >= 0) || index == size - 1) { + int length = header.readLength(Unpooled.wrappedBuffer(lengthBytes), codecHelper, buf.readableBytes()); if (buf.readableBytes() < length) { buf.resetReaderIndex(); return; diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java index b3298fd1a..4d35e9666 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java @@ -2,13 +2,13 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.Future; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; import org.geysermc.mcprotocollib.network.AbstractServer; import org.geysermc.mcprotocollib.network.BuiltinFlags; import org.geysermc.mcprotocollib.network.helper.TransportHelper; @@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; public class TcpServer extends AbstractServer { @@ -60,13 +61,10 @@ public void initChannel(Channel channel) { ChannelPipeline pipeline = channel.pipeline(); - session.refreshReadTimeoutHandler(channel); - session.refreshWriteTimeoutHandler(channel); + pipeline.addLast("read-timeout", new ReadTimeoutHandler(session.getFlag(BuiltinFlags.READ_TIMEOUT, 30))); + pipeline.addLast("write-timeout", new WriteTimeoutHandler(session.getFlag(BuiltinFlags.WRITE_TIMEOUT, 0))); - int size = protocol.getPacketHeader().getLengthSize(); - if (size > 0) { - pipeline.addLast("sizer", new TcpPacketSizer(session, size)); - } + pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), session.getCodecHelper())); pipeline.addLast("codec", new TcpPacketCodec(session, false)); pipeline.addLast("manager", session); @@ -77,29 +75,22 @@ public void initChannel(Channel channel) { bootstrap.option(ChannelOption.TCP_FASTOPEN, 3); } - ChannelFuture future = bootstrap.bind(); - - if (wait) { - try { - future.sync(); - } catch (InterruptedException e) { + CompletableFuture handleFuture = new CompletableFuture<>(); + bootstrap.bind().addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + channel = future.channel(); + if (callback != null) { + callback.run(); + } + } else { + log.error("Failed to bind connection listener.", future.cause()); } - channel = future.channel(); - if (callback != null) { - callback.run(); - } - } else { - future.addListener((ChannelFutureListener) future1 -> { - if (future1.isSuccess()) { - channel = future1.channel(); - if (callback != null) { - callback.run(); - } - } else { - log.error("Failed to asynchronously bind connection listener.", future1.cause()); - } - }); + handleFuture.complete(null); + }); + + if (wait) { + handleFuture.join(); } } @@ -107,26 +98,21 @@ public void initChannel(Channel channel) { public void closeImpl(boolean wait, final Runnable callback) { if (this.channel != null) { if (this.channel.isOpen()) { - ChannelFuture future = this.channel.close(); - if (wait) { - try { - future.sync(); - } catch (InterruptedException e) { + CompletableFuture handleFuture = new CompletableFuture<>(); + this.channel.close().addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + if (callback != null) { + callback.run(); + } + } else { + log.error("Failed to close connection listener.", future.cause()); } - if (callback != null) { - callback.run(); - } - } else { - future.addListener((ChannelFutureListener) future1 -> { - if (future1.isSuccess()) { - if (callback != null) { - callback.run(); - } - } else { - log.error("Failed to asynchronously close connection listener.", future1.cause()); - } - }); + handleFuture.complete(null); + }); + + if (wait) { + handleFuture.join(); } } @@ -134,18 +120,17 @@ public void closeImpl(boolean wait, final Runnable callback) { } if (this.group != null) { - Future future = this.group.shutdownGracefully(); - if (wait) { - try { - future.sync(); - } catch (InterruptedException e) { + CompletableFuture handleFuture = new CompletableFuture<>(); + this.group.shutdownGracefully().addListener(future -> { + if (!future.isSuccess()) { + log.debug("Failed to close connection listener.", future.cause()); } - } else { - future.addListener(future1 -> { - if (!future1.isSuccess()) { - log.debug("Failed to asynchronously close connection listener.", future1.cause()); - } - }); + + handleFuture.complete(null); + }); + + if (wait) { + handleFuture.join(); } this.group = null; diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java index 94320ca43..462f85b4b 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java @@ -7,8 +7,6 @@ import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.timeout.ReadTimeoutHandler; -import io.netty.handler.timeout.WriteTimeoutHandler; import io.netty.util.concurrent.DefaultThreadFactory; import net.kyori.adventure.text.Component; import org.checkerframework.checker.nullness.qual.NonNull; @@ -50,9 +48,6 @@ public abstract class TcpSession extends SimpleChannelInboundHandler imp private final EventLoop eventLoop = createEventLoop(); private int compressionThreshold = -1; - private int connectTimeout = 30; - private int readTimeout = 30; - private int writeTimeout = 0; private final Map flags = new HashMap<>(); private final List listeners = new CopyOnWriteArrayList<>(); @@ -220,38 +215,6 @@ public void enableEncryption(PacketEncryption encryption) { channel.pipeline().addBefore("sizer", "encryption", new TcpPacketEncryptor(encryption)); } - @Override - public int getConnectTimeout() { - return this.connectTimeout; - } - - @Override - public void setConnectTimeout(int timeout) { - this.connectTimeout = timeout; - } - - @Override - public int getReadTimeout() { - return this.readTimeout; - } - - @Override - public void setReadTimeout(int timeout) { - this.readTimeout = timeout; - this.refreshReadTimeoutHandler(); - } - - @Override - public int getWriteTimeout() { - return this.writeTimeout; - } - - @Override - public void setWriteTimeout(int timeout) { - this.writeTimeout = timeout; - this.refreshWriteTimeoutHandler(); - } - @Override public boolean isConnected() { return this.channel != null && this.channel.isOpen() && !this.disconnected; @@ -313,46 +276,6 @@ public Channel getChannel() { return this.channel; } - protected void refreshReadTimeoutHandler() { - this.refreshReadTimeoutHandler(this.channel); - } - - protected void refreshReadTimeoutHandler(Channel channel) { - if (channel != null) { - if (this.readTimeout <= 0) { - if (channel.pipeline().get("readTimeout") != null) { - channel.pipeline().remove("readTimeout"); - } - } else { - if (channel.pipeline().get("readTimeout") == null) { - channel.pipeline().addFirst("readTimeout", new ReadTimeoutHandler(this.readTimeout)); - } else { - channel.pipeline().replace("readTimeout", "readTimeout", new ReadTimeoutHandler(this.readTimeout)); - } - } - } - } - - protected void refreshWriteTimeoutHandler() { - this.refreshWriteTimeoutHandler(this.channel); - } - - protected void refreshWriteTimeoutHandler(Channel channel) { - if (channel != null) { - if (this.writeTimeout <= 0) { - if (channel.pipeline().get("writeTimeout") != null) { - channel.pipeline().remove("writeTimeout"); - } - } else { - if (channel.pipeline().get("writeTimeout") == null) { - channel.pipeline().addFirst("writeTimeout", new WriteTimeoutHandler(this.writeTimeout)); - } else { - channel.pipeline().replace("writeTimeout", "writeTimeout", new WriteTimeoutHandler(this.writeTimeout)); - } - } - } - } - @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (this.disconnected || this.channel != null) {