From 716f229e6568a0532450fa6e0050ccc39ffa0178 Mon Sep 17 00:00:00 2001 From: masmc05 <63639746+masmc05@users.noreply.github.com> Date: Tue, 3 Sep 2024 01:34:43 +0300 Subject: [PATCH 1/6] Support shouldAuthenticate = false (#856) --- .../geysermc/mcprotocollib/protocol/ClientListener.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ClientListener.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ClientListener.java index 011449666..0b6f8e35a 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ClientListener.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ClientListener.java @@ -45,6 +45,7 @@ import java.io.IOException; import java.security.NoSuchAlgorithmException; import java.util.Collections; +import java.util.Objects; /** * Handles making initial login and status requests for clients. @@ -63,7 +64,7 @@ public void packetReceived(Session session, Packet packet) { GameProfile profile = session.getFlag(MinecraftConstants.PROFILE_KEY); String accessToken = session.getFlag(MinecraftConstants.ACCESS_TOKEN_KEY); - if (profile == null || accessToken == null) { + if ((profile == null || accessToken == null) && helloPacket.isShouldAuthenticate()) { throw new UnexpectedEncryptionException(); } @@ -81,7 +82,9 @@ public void packetReceived(Session session, Packet packet) { // TODO: Add generic error, disabled multiplayer and banned from playing online errors try { - sessionService.joinServer(profile, accessToken, serverId); + if (helloPacket.isShouldAuthenticate()) { + sessionService.joinServer(Objects.requireNonNull(profile, "final shouldAuthenticate changed value?"), accessToken, serverId); + } } catch (IOException e) { session.disconnect(Component.translatable("disconnect.loginFailedInfo", Component.text(e.getMessage())), e); return; From 4148fa9b1fdabea2562ca848653a66679b4d6069 Mon Sep 17 00:00:00 2001 From: Alex <40795980+AlexProgrammerDE@users.noreply.github.com> Date: Tue, 10 Sep 2024 09:16:58 +0200 Subject: [PATCH 2/6] Static sizer and timeout handlers in the pipeline (#833) * Improve pipeline This simplifies all pipeline code and ensures some listeners like the sizer are always present. The code already assumed that the sizer is always there and thus causes issues. The sizer can be deactivated still now and has pretty much no performance losses from this. The profit from this PR is that there is less logic with modifying the PR and thus developers interacting with the channel can assume specific things about the order and placements of elements in the pipeline. This will be useful once ViaVersion is supported, and it is expected that certain elements always are in the pipeline and don't change. My plan is to also always have an encryption and compression handler in the pipeline that is controlled via AttributeKeys from netty, but for that first #828 needs to be merged. So this PR only completes the goal partially, but that's fine. PR is ready for review like it is right now. * Revert some stuff * Fix channel race condition * Fix closing race condition * Prevent client race conditions. * Fix test failure, idk how, idk why, but it works now * Address review * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java Co-authored-by: Konicai <71294714+Konicai@users.noreply.github.com> * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java Co-authored-by: Konicai <71294714+Konicai@users.noreply.github.com> * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java Co-authored-by: chris --------- Co-authored-by: Konicai <71294714+Konicai@users.noreply.github.com> Co-authored-by: chris --- .../mcprotocollib/network/BuiltinFlags.java | 23 ++- .../mcprotocollib/network/Session.java | 46 +---- .../network/tcp/TcpClientSession.java | 173 +++++++++--------- .../network/tcp/TcpPacketSizer.java | 34 ++-- .../mcprotocollib/network/tcp/TcpServer.java | 101 +++++----- .../mcprotocollib/network/tcp/TcpSession.java | 77 -------- 6 files changed, 172 insertions(+), 282 deletions(-) diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java index 7a060dd46..d602cb904 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/BuiltinFlags.java @@ -6,8 +6,11 @@ * Built-in PacketLib session flags. */ public class BuiltinFlags { - public static final Flag ENABLE_CLIENT_PROXY_PROTOCOL = new Flag<>("enable-client-proxy-protocol", Boolean.class); + /** + * Enables HAProxy protocol support. + * When this value is not null it represents the ip and port the client claims the connection is from. + */ public static final Flag CLIENT_PROXIED_ADDRESS = new Flag<>("client-proxied-address", InetSocketAddress.class); /** @@ -20,6 +23,24 @@ public class BuiltinFlags { */ public static final Flag TCP_FAST_OPEN = new Flag<>("tcp-fast-open", Boolean.class); + /** + * Connection timeout in seconds. + * Only used by the client. + */ + public static final Flag CLIENT_CONNECT_TIMEOUT = new Flag<>("client-connect-timeout", Integer.class); + + /** + * Read timeout in seconds. + * Used by both the server and client. + */ + public static final Flag READ_TIMEOUT = new Flag<>("read-timeout", Integer.class); + + /** + * Write timeout in seconds. + * Used by both the server and client. + */ + public static final Flag WRITE_TIMEOUT = new Flag<>("write-timeout", Integer.class); + private BuiltinFlags() { } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java index 7ccc0d220..a4af44e33 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java @@ -37,7 +37,7 @@ public interface Session { * @param wait Whether to wait for the connection to be established before returning. * @param transferring Whether the session is a client being transferred. */ - public void connect(boolean wait, boolean transferring); + void connect(boolean wait, boolean transferring); /** * Gets the host the session is connected to. @@ -138,7 +138,7 @@ public interface Session { * * @param flags Collection of flags */ - public void setFlags(Map flags); + void setFlags(Map flags); /** * Gets the listeners listening on this session. @@ -204,48 +204,6 @@ public interface Session { */ void enableEncryption(PacketEncryption encryption); - /** - * Gets the connect timeout for this session in seconds. - * - * @return The session's connect timeout. - */ - int getConnectTimeout(); - - /** - * Sets the connect timeout for this session in seconds. - * - * @param timeout Connect timeout to set. - */ - void setConnectTimeout(int timeout); - - /** - * Gets the read timeout for this session in seconds. - * - * @return The session's read timeout. - */ - int getReadTimeout(); - - /** - * Sets the read timeout for this session in seconds. - * - * @param timeout Read timeout to set. - */ - void setReadTimeout(int timeout); - - /** - * Gets the write timeout for this session in seconds. - * - * @return The session's write timeout. - */ - int getWriteTimeout(); - - /** - * Sets the write timeout for this session in seconds. - * - * @param timeout Write timeout to set. - */ - void setWriteTimeout(int timeout); - /** * Returns true if the session is connected. * diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java index f1dc98bae..a04d0d89d 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java @@ -4,9 +4,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.AddressedEnvelope; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -25,6 +22,8 @@ import io.netty.handler.proxy.HttpProxyHandler; import io.netty.handler.proxy.Socks4ProxyHandler; import io.netty.handler.proxy.Socks5ProxyHandler; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; import io.netty.resolver.dns.DnsNameResolver; import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.DefaultThreadFactory; @@ -40,6 +39,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -90,56 +90,51 @@ public void connect(boolean wait, boolean transferring) { createTcpEventLoopGroup(); } - try { - final Bootstrap bootstrap = new Bootstrap() - .channelFactory(TRANSPORT_TYPE.socketChannelFactory()) - .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.IP_TOS, 0x18) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout() * 1000) - .group(EVENT_LOOP_GROUP) - .remoteAddress(resolveAddress()) - .localAddress(bindAddress, bindPort) - .handler(new ChannelInitializer<>() { - @Override - public void initChannel(Channel channel) { - PacketProtocol protocol = getPacketProtocol(); - protocol.newClientSession(TcpClientSession.this, transferring); - - ChannelPipeline pipeline = channel.pipeline(); - - refreshReadTimeoutHandler(channel); - refreshWriteTimeoutHandler(channel); - - addProxy(pipeline); - - int size = protocol.getPacketHeader().getLengthSize(); - if (size > 0) { - pipeline.addLast("sizer", new TcpPacketSizer(TcpClientSession.this, size)); - } - - pipeline.addLast("codec", new TcpPacketCodec(TcpClientSession.this, true)); - pipeline.addLast("manager", TcpClientSession.this); - - addHAProxySupport(pipeline); - } - }); + final Bootstrap bootstrap = new Bootstrap() + .channelFactory(TRANSPORT_TYPE.socketChannelFactory()) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.IP_TOS, 0x18) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getFlag(BuiltinFlags.CLIENT_CONNECT_TIMEOUT, 30) * 1000) + .group(EVENT_LOOP_GROUP) + .remoteAddress(resolveAddress()) + .localAddress(bindAddress, bindPort) + .handler(new ChannelInitializer<>() { + @Override + public void initChannel(Channel channel) { + PacketProtocol protocol = getPacketProtocol(); + protocol.newClientSession(TcpClientSession.this, transferring); - if (getFlag(BuiltinFlags.TCP_FAST_OPEN, false) && TRANSPORT_TYPE.supportsTcpFastOpenClient()) { - bootstrap.option(ChannelOption.TCP_FASTOPEN_CONNECT, true); - } + ChannelPipeline pipeline = channel.pipeline(); - ChannelFuture future = bootstrap.connect(); - if (wait) { - future.sync(); - } + addProxy(pipeline); - future.addListener((futureListener) -> { - if (!futureListener.isSuccess()) { - exceptionCaught(null, futureListener.cause()); + initializeHAProxySupport(channel); + + pipeline.addLast("read-timeout", new ReadTimeoutHandler(getFlag(BuiltinFlags.READ_TIMEOUT, 30))); + pipeline.addLast("write-timeout", new WriteTimeoutHandler(getFlag(BuiltinFlags.WRITE_TIMEOUT, 0))); + + pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), getCodecHelper())); + + pipeline.addLast("codec", new TcpPacketCodec(TcpClientSession.this, true)); + pipeline.addLast("manager", TcpClientSession.this); } }); - } catch (Throwable t) { - exceptionCaught(null, t); + + if (getFlag(BuiltinFlags.TCP_FAST_OPEN, false) && TRANSPORT_TYPE.supportsTcpFastOpenClient()) { + bootstrap.option(ChannelOption.TCP_FASTOPEN_CONNECT, true); + } + + CompletableFuture handleFuture = new CompletableFuture<>(); + bootstrap.connect().addListener((futureListener) -> { + if (!futureListener.isSuccess()) { + exceptionCaught(null, futureListener.cause()); + } + + handleFuture.complete(null); + }); + + if (wait) { + handleFuture.join(); } } @@ -155,8 +150,8 @@ private InetSocketAddress resolveAddress() { if (getFlag(BuiltinFlags.ATTEMPT_SRV_RESOLVE, true) && (!this.host.matches(IP_REGEX) && !this.host.equalsIgnoreCase("localhost"))) { AddressedEnvelope envelope = null; try (DnsNameResolver resolver = new DnsNameResolverBuilder(EVENT_LOOP_GROUP.next()) - .channelFactory(TRANSPORT_TYPE.datagramChannelFactory()) - .build()) { + .channelFactory(TRANSPORT_TYPE.datagramChannelFactory()) + .build()) { envelope = resolver.query(new DefaultDnsQuestion(name, DnsRecordType.SRV)).get(); DnsResponse response = envelope.content(); @@ -206,54 +201,52 @@ private InetSocketAddress resolveAddress() { } private void addProxy(ChannelPipeline pipeline) { - if (proxy != null) { - switch (proxy.type()) { - case HTTP -> { - if (proxy.username() != null && proxy.password() != null) { - pipeline.addFirst("proxy", new HttpProxyHandler(proxy.address(), proxy.username(), proxy.password())); - } else { - pipeline.addFirst("proxy", new HttpProxyHandler(proxy.address())); - } + if (proxy == null) { + return; + } + + switch (proxy.type()) { + case HTTP -> { + if (proxy.username() != null && proxy.password() != null) { + pipeline.addLast("proxy", new HttpProxyHandler(proxy.address(), proxy.username(), proxy.password())); + } else { + pipeline.addLast("proxy", new HttpProxyHandler(proxy.address())); } - case SOCKS4 -> { - if (proxy.username() != null) { - pipeline.addFirst("proxy", new Socks4ProxyHandler(proxy.address(), proxy.username())); - } else { - pipeline.addFirst("proxy", new Socks4ProxyHandler(proxy.address())); - } + } + case SOCKS4 -> { + if (proxy.username() != null) { + pipeline.addLast("proxy", new Socks4ProxyHandler(proxy.address(), proxy.username())); + } else { + pipeline.addLast("proxy", new Socks4ProxyHandler(proxy.address())); } - case SOCKS5 -> { - if (proxy.username() != null && proxy.password() != null) { - pipeline.addFirst("proxy", new Socks5ProxyHandler(proxy.address(), proxy.username(), proxy.password())); - } else { - pipeline.addFirst("proxy", new Socks5ProxyHandler(proxy.address())); - } + } + case SOCKS5 -> { + if (proxy.username() != null && proxy.password() != null) { + pipeline.addLast("proxy", new Socks5ProxyHandler(proxy.address(), proxy.username(), proxy.password())); + } else { + pipeline.addLast("proxy", new Socks5ProxyHandler(proxy.address())); } - default -> throw new UnsupportedOperationException("Unsupported proxy type: " + proxy.type()); } + default -> throw new UnsupportedOperationException("Unsupported proxy type: " + proxy.type()); } } - private void addHAProxySupport(ChannelPipeline pipeline) { + private void initializeHAProxySupport(Channel channel) { InetSocketAddress clientAddress = getFlag(BuiltinFlags.CLIENT_PROXIED_ADDRESS); - if (getFlag(BuiltinFlags.ENABLE_CLIENT_PROXY_PROTOCOL, false) && clientAddress != null) { - pipeline.addFirst("proxy-protocol-packet-sender", new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet4Address ? HAProxyProxiedProtocol.TCP4 : HAProxyProxiedProtocol.TCP6; - InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); - ctx.channel().writeAndFlush(new HAProxyMessage( - HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol, - clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(), - clientAddress.getPort(), remoteAddress.getPort() - )); - ctx.pipeline().remove(this); - ctx.pipeline().remove("proxy-protocol-encoder"); - super.channelActive(ctx); - } - }); - pipeline.addFirst("proxy-protocol-encoder", HAProxyMessageEncoder.INSTANCE); + if (clientAddress == null) { + return; } + + channel.pipeline().addLast("proxy-protocol-encoder", HAProxyMessageEncoder.INSTANCE); + HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet4Address ? HAProxyProxiedProtocol.TCP4 : HAProxyProxiedProtocol.TCP6; + InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress(); + channel.writeAndFlush(new HAProxyMessage( + HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol, + clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(), + clientAddress.getPort(), remoteAddress.getPort() + )).addListener(future -> { + channel.pipeline().remove("proxy-protocol-encoder"); + }); } private static void createTcpEventLoopGroup() { @@ -264,7 +257,7 @@ private static void createTcpEventLoopGroup() { EVENT_LOOP_GROUP = TRANSPORT_TYPE.eventLoopGroupFactory().apply(newThreadFactory()); Runtime.getRuntime().addShutdownHook(new Thread( - () -> EVENT_LOOP_GROUP.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))); + () -> EVENT_LOOP_GROUP.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))); } protected static ThreadFactory newThreadFactory() { diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketSizer.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketSizer.java index f6d5e9b70..decb5069b 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketSizer.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketSizer.java @@ -5,29 +5,39 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageCodec; import io.netty.handler.codec.CorruptedFrameException; -import org.geysermc.mcprotocollib.network.Session; +import lombok.RequiredArgsConstructor; +import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper; +import org.geysermc.mcprotocollib.network.packet.PacketHeader; import java.util.List; +@RequiredArgsConstructor public class TcpPacketSizer extends ByteToMessageCodec { - private final Session session; - private final int size; - - public TcpPacketSizer(Session session, int size) { - this.session = session; - this.size = size; - } + private final PacketHeader header; + private final PacketCodecHelper codecHelper; @Override public void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) { + int size = header.getLengthSize(); + if (size == 0) { + out.writeBytes(in); + return; + } + int length = in.readableBytes(); - out.ensureWritable(this.session.getPacketProtocol().getPacketHeader().getLengthSize(length) + length); - this.session.getPacketProtocol().getPacketHeader().writeLength(out, this.session.getCodecHelper(), length); + out.ensureWritable(header.getLengthSize(length) + length); + header.writeLength(out, codecHelper, length); out.writeBytes(in); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) { + int size = header.getLengthSize(); + if (size == 0) { + out.add(buf.retain()); + return; + } + buf.markReaderIndex(); byte[] lengthBytes = new byte[size]; for (int index = 0; index < lengthBytes.length; index++) { @@ -37,8 +47,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) } lengthBytes[index] = buf.readByte(); - if ((this.session.getPacketProtocol().getPacketHeader().isLengthVariable() && lengthBytes[index] >= 0) || index == size - 1) { - int length = this.session.getPacketProtocol().getPacketHeader().readLength(Unpooled.wrappedBuffer(lengthBytes), this.session.getCodecHelper(), buf.readableBytes()); + if ((header.isLengthVariable() && lengthBytes[index] >= 0) || index == size - 1) { + int length = header.readLength(Unpooled.wrappedBuffer(lengthBytes), codecHelper, buf.readableBytes()); if (buf.readableBytes() < length) { buf.resetReaderIndex(); return; diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java index b3298fd1a..4d35e9666 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java @@ -2,13 +2,13 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.Future; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; import org.geysermc.mcprotocollib.network.AbstractServer; import org.geysermc.mcprotocollib.network.BuiltinFlags; import org.geysermc.mcprotocollib.network.helper.TransportHelper; @@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; public class TcpServer extends AbstractServer { @@ -60,13 +61,10 @@ public void initChannel(Channel channel) { ChannelPipeline pipeline = channel.pipeline(); - session.refreshReadTimeoutHandler(channel); - session.refreshWriteTimeoutHandler(channel); + pipeline.addLast("read-timeout", new ReadTimeoutHandler(session.getFlag(BuiltinFlags.READ_TIMEOUT, 30))); + pipeline.addLast("write-timeout", new WriteTimeoutHandler(session.getFlag(BuiltinFlags.WRITE_TIMEOUT, 0))); - int size = protocol.getPacketHeader().getLengthSize(); - if (size > 0) { - pipeline.addLast("sizer", new TcpPacketSizer(session, size)); - } + pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), session.getCodecHelper())); pipeline.addLast("codec", new TcpPacketCodec(session, false)); pipeline.addLast("manager", session); @@ -77,29 +75,22 @@ public void initChannel(Channel channel) { bootstrap.option(ChannelOption.TCP_FASTOPEN, 3); } - ChannelFuture future = bootstrap.bind(); - - if (wait) { - try { - future.sync(); - } catch (InterruptedException e) { + CompletableFuture handleFuture = new CompletableFuture<>(); + bootstrap.bind().addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + channel = future.channel(); + if (callback != null) { + callback.run(); + } + } else { + log.error("Failed to bind connection listener.", future.cause()); } - channel = future.channel(); - if (callback != null) { - callback.run(); - } - } else { - future.addListener((ChannelFutureListener) future1 -> { - if (future1.isSuccess()) { - channel = future1.channel(); - if (callback != null) { - callback.run(); - } - } else { - log.error("Failed to asynchronously bind connection listener.", future1.cause()); - } - }); + handleFuture.complete(null); + }); + + if (wait) { + handleFuture.join(); } } @@ -107,26 +98,21 @@ public void initChannel(Channel channel) { public void closeImpl(boolean wait, final Runnable callback) { if (this.channel != null) { if (this.channel.isOpen()) { - ChannelFuture future = this.channel.close(); - if (wait) { - try { - future.sync(); - } catch (InterruptedException e) { + CompletableFuture handleFuture = new CompletableFuture<>(); + this.channel.close().addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + if (callback != null) { + callback.run(); + } + } else { + log.error("Failed to close connection listener.", future.cause()); } - if (callback != null) { - callback.run(); - } - } else { - future.addListener((ChannelFutureListener) future1 -> { - if (future1.isSuccess()) { - if (callback != null) { - callback.run(); - } - } else { - log.error("Failed to asynchronously close connection listener.", future1.cause()); - } - }); + handleFuture.complete(null); + }); + + if (wait) { + handleFuture.join(); } } @@ -134,18 +120,17 @@ public void closeImpl(boolean wait, final Runnable callback) { } if (this.group != null) { - Future future = this.group.shutdownGracefully(); - if (wait) { - try { - future.sync(); - } catch (InterruptedException e) { + CompletableFuture handleFuture = new CompletableFuture<>(); + this.group.shutdownGracefully().addListener(future -> { + if (!future.isSuccess()) { + log.debug("Failed to close connection listener.", future.cause()); } - } else { - future.addListener(future1 -> { - if (!future1.isSuccess()) { - log.debug("Failed to asynchronously close connection listener.", future1.cause()); - } - }); + + handleFuture.complete(null); + }); + + if (wait) { + handleFuture.join(); } this.group = null; diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java index 94320ca43..462f85b4b 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java @@ -7,8 +7,6 @@ import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.timeout.ReadTimeoutHandler; -import io.netty.handler.timeout.WriteTimeoutHandler; import io.netty.util.concurrent.DefaultThreadFactory; import net.kyori.adventure.text.Component; import org.checkerframework.checker.nullness.qual.NonNull; @@ -50,9 +48,6 @@ public abstract class TcpSession extends SimpleChannelInboundHandler imp private final EventLoop eventLoop = createEventLoop(); private int compressionThreshold = -1; - private int connectTimeout = 30; - private int readTimeout = 30; - private int writeTimeout = 0; private final Map flags = new HashMap<>(); private final List listeners = new CopyOnWriteArrayList<>(); @@ -220,38 +215,6 @@ public void enableEncryption(PacketEncryption encryption) { channel.pipeline().addBefore("sizer", "encryption", new TcpPacketEncryptor(encryption)); } - @Override - public int getConnectTimeout() { - return this.connectTimeout; - } - - @Override - public void setConnectTimeout(int timeout) { - this.connectTimeout = timeout; - } - - @Override - public int getReadTimeout() { - return this.readTimeout; - } - - @Override - public void setReadTimeout(int timeout) { - this.readTimeout = timeout; - this.refreshReadTimeoutHandler(); - } - - @Override - public int getWriteTimeout() { - return this.writeTimeout; - } - - @Override - public void setWriteTimeout(int timeout) { - this.writeTimeout = timeout; - this.refreshWriteTimeoutHandler(); - } - @Override public boolean isConnected() { return this.channel != null && this.channel.isOpen() && !this.disconnected; @@ -313,46 +276,6 @@ public Channel getChannel() { return this.channel; } - protected void refreshReadTimeoutHandler() { - this.refreshReadTimeoutHandler(this.channel); - } - - protected void refreshReadTimeoutHandler(Channel channel) { - if (channel != null) { - if (this.readTimeout <= 0) { - if (channel.pipeline().get("readTimeout") != null) { - channel.pipeline().remove("readTimeout"); - } - } else { - if (channel.pipeline().get("readTimeout") == null) { - channel.pipeline().addFirst("readTimeout", new ReadTimeoutHandler(this.readTimeout)); - } else { - channel.pipeline().replace("readTimeout", "readTimeout", new ReadTimeoutHandler(this.readTimeout)); - } - } - } - } - - protected void refreshWriteTimeoutHandler() { - this.refreshWriteTimeoutHandler(this.channel); - } - - protected void refreshWriteTimeoutHandler(Channel channel) { - if (channel != null) { - if (this.writeTimeout <= 0) { - if (channel.pipeline().get("writeTimeout") != null) { - channel.pipeline().remove("writeTimeout"); - } - } else { - if (channel.pipeline().get("writeTimeout") == null) { - channel.pipeline().addFirst("writeTimeout", new WriteTimeoutHandler(this.writeTimeout)); - } else { - channel.pipeline().replace("writeTimeout", "writeTimeout", new WriteTimeoutHandler(this.writeTimeout)); - } - } - } - } - @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (this.disconnected || this.channel != null) { From de6bbe5f82ea8ff460fc18b902607f0df91c3d92 Mon Sep 17 00:00:00 2001 From: Alex <40795980+AlexProgrammerDE@users.noreply.github.com> Date: Tue, 17 Sep 2024 03:44:28 +0200 Subject: [PATCH 3/6] Allow accessing backing class for a channel instead of only its factory (#859) Some netty libraries that are poorly coded only accept classes. Example: https://github.com/CloudburstMC/Network/pull/42 --- .../network/helper/TransportHelper.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/helper/TransportHelper.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/helper/TransportHelper.java index 698110ad6..7e5268373 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/helper/TransportHelper.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/helper/TransportHelper.java @@ -34,8 +34,11 @@ public enum TransportMethod { } public record TransportType(TransportMethod method, + Class serverSocketChannelClass, ChannelFactory serverSocketChannelFactory, + Class socketChannelClass, ChannelFactory socketChannelFactory, + Class datagramChannelClass, ChannelFactory datagramChannelFactory, Function eventLoopGroupFactory, boolean supportsTcpFastOpenServer, @@ -46,8 +49,11 @@ public static TransportType determineTransportMethod() { if (isClassAvailable("io.netty.incubator.channel.uring.IOUring") && IOUring.isAvailable()) { return new TransportType( TransportMethod.IO_URING, + IOUringServerSocketChannel.class, IOUringServerSocketChannel::new, + IOUringSocketChannel.class, IOUringSocketChannel::new, + IOUringDatagramChannel.class, IOUringDatagramChannel::new, factory -> new IOUringEventLoopGroup(0, factory), IOUring.isTcpFastOpenServerSideAvailable(), @@ -58,8 +64,11 @@ public static TransportType determineTransportMethod() { if (isClassAvailable("io.netty.channel.epoll.Epoll") && Epoll.isAvailable()) { return new TransportType( TransportMethod.EPOLL, + EpollServerSocketChannel.class, EpollServerSocketChannel::new, + EpollSocketChannel.class, EpollSocketChannel::new, + EpollDatagramChannel.class, EpollDatagramChannel::new, factory -> new EpollEventLoopGroup(0, factory), Epoll.isTcpFastOpenServerSideAvailable(), @@ -70,8 +79,11 @@ public static TransportType determineTransportMethod() { if (isClassAvailable("io.netty.channel.kqueue.KQueue") && KQueue.isAvailable()) { return new TransportType( TransportMethod.KQUEUE, + KQueueServerSocketChannel.class, KQueueServerSocketChannel::new, + KQueueSocketChannel.class, KQueueSocketChannel::new, + KQueueDatagramChannel.class, KQueueDatagramChannel::new, factory -> new KQueueEventLoopGroup(0, factory), KQueue.isTcpFastOpenServerSideAvailable(), @@ -81,8 +93,11 @@ public static TransportType determineTransportMethod() { return new TransportType( TransportMethod.NIO, + NioServerSocketChannel.class, NioServerSocketChannel::new, + NioSocketChannel.class, NioSocketChannel::new, + NioDatagramChannel.class, NioDatagramChannel::new, factory -> new NioEventLoopGroup(0, factory), false, From b2c9268633760ab4346d1c4ec7ef44ba4972ddb7 Mon Sep 17 00:00:00 2001 From: Alex <40795980+AlexProgrammerDE@users.noreply.github.com> Date: Thu, 19 Sep 2024 18:40:36 +0200 Subject: [PATCH 4/6] Implement static compression and encryption pipeline (#858) * Implement static compression and encryption pipeline * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java Co-authored-by: chris * Update Session.java * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java Co-authored-by: Konicai <71294714+Konicai@users.noreply.github.com> * Use temp var * Add nullable annotation * enable -> create * Rename encryption method * Fix compression id * Fix -1 compression in ServerListener * Compress and encrypt in unit tests --------- Co-authored-by: chris Co-authored-by: Konicai <71294714+Konicai@users.noreply.github.com> --- .../example/ClientSessionListener.java | 2 +- .../network/example/ServerListener.java | 2 +- .../network/example/TestProtocol.java | 8 +-- .../example/MinecraftProtocolTest.java | 10 ++-- .../network/NetworkConstants.java | 10 ++++ .../mcprotocollib/network/Session.java | 24 ++++---- .../compression/CompressionConfig.java | 4 ++ .../network/crypt/EncryptionConfig.java | 4 ++ .../network/tcp/TcpClientSession.java | 2 + .../network/tcp/TcpPacketCompression.java | 55 +++++++++++-------- .../network/tcp/TcpPacketEncryptor.java | 25 ++++++--- .../mcprotocollib/network/tcp/TcpServer.java | 2 + .../mcprotocollib/network/tcp/TcpSession.java | 43 +++++++-------- .../protocol/ClientListener.java | 8 ++- .../protocol/MinecraftConstants.java | 9 ++- .../protocol/MinecraftProtocol.java | 8 +-- .../protocol/ServerListener.java | 19 ++++--- .../protocol/MinecraftProtocolTest.java | 12 ++-- 18 files changed, 147 insertions(+), 100 deletions(-) create mode 100644 protocol/src/main/java/org/geysermc/mcprotocollib/network/NetworkConstants.java create mode 100644 protocol/src/main/java/org/geysermc/mcprotocollib/network/compression/CompressionConfig.java create mode 100644 protocol/src/main/java/org/geysermc/mcprotocollib/network/crypt/EncryptionConfig.java diff --git a/example/src/main/java/org/geysermc/mcprotocollib/network/example/ClientSessionListener.java b/example/src/main/java/org/geysermc/mcprotocollib/network/example/ClientSessionListener.java index 029e6365c..ac42ccf38 100644 --- a/example/src/main/java/org/geysermc/mcprotocollib/network/example/ClientSessionListener.java +++ b/example/src/main/java/org/geysermc/mcprotocollib/network/example/ClientSessionListener.java @@ -39,7 +39,7 @@ public void packetSent(Session session, Packet packet) { public void connected(ConnectedEvent event) { log.info("CLIENT Connected"); - event.getSession().enableEncryption(((TestProtocol) event.getSession().getPacketProtocol()).getEncryption()); + event.getSession().setEncryption(((TestProtocol) event.getSession().getPacketProtocol()).getEncryption()); event.getSession().send(new PingPacket("hello")); } diff --git a/example/src/main/java/org/geysermc/mcprotocollib/network/example/ServerListener.java b/example/src/main/java/org/geysermc/mcprotocollib/network/example/ServerListener.java index 13e8b5eb8..f654d41a4 100644 --- a/example/src/main/java/org/geysermc/mcprotocollib/network/example/ServerListener.java +++ b/example/src/main/java/org/geysermc/mcprotocollib/network/example/ServerListener.java @@ -38,7 +38,7 @@ public void serverClosed(ServerClosedEvent event) { public void sessionAdded(SessionAddedEvent event) { log.info("SERVER Session Added: {}:{}", event.getSession().getHost(), event.getSession().getPort()); ((TestProtocol) event.getSession().getPacketProtocol()).setSecretKey(this.key); - event.getSession().enableEncryption(((TestProtocol) event.getSession().getPacketProtocol()).getEncryption()); + event.getSession().setEncryption(((TestProtocol) event.getSession().getPacketProtocol()).getEncryption()); } @Override diff --git a/example/src/main/java/org/geysermc/mcprotocollib/network/example/TestProtocol.java b/example/src/main/java/org/geysermc/mcprotocollib/network/example/TestProtocol.java index 5a7a8a270..cc108fb2c 100644 --- a/example/src/main/java/org/geysermc/mcprotocollib/network/example/TestProtocol.java +++ b/example/src/main/java/org/geysermc/mcprotocollib/network/example/TestProtocol.java @@ -8,7 +8,7 @@ import org.geysermc.mcprotocollib.network.codec.PacketDefinition; import org.geysermc.mcprotocollib.network.codec.PacketSerializer; import org.geysermc.mcprotocollib.network.crypt.AESEncryption; -import org.geysermc.mcprotocollib.network.crypt.PacketEncryption; +import org.geysermc.mcprotocollib.network.crypt.EncryptionConfig; import org.geysermc.mcprotocollib.network.packet.DefaultPacketHeader; import org.geysermc.mcprotocollib.network.packet.PacketHeader; import org.geysermc.mcprotocollib.network.packet.PacketProtocol; @@ -23,7 +23,7 @@ public class TestProtocol extends PacketProtocol { private static final Logger log = LoggerFactory.getLogger(TestProtocol.class); private final PacketHeader header = new DefaultPacketHeader(); private final PacketRegistry registry = new PacketRegistry(); - private AESEncryption encrypt; + private EncryptionConfig encrypt; @SuppressWarnings("unused") public TestProtocol() { @@ -51,7 +51,7 @@ public PingPacket deserialize(ByteBuf buf, PacketCodecHelper helper, PacketDefin }); try { - this.encrypt = new AESEncryption(key); + this.encrypt = new EncryptionConfig(new AESEncryption(key)); } catch (GeneralSecurityException e) { log.error("Failed to create encryption", e); } @@ -67,7 +67,7 @@ public PacketHeader getPacketHeader() { return this.header; } - public PacketEncryption getEncryption() { + public EncryptionConfig getEncryption() { return this.encrypt; } diff --git a/example/src/main/java/org/geysermc/mcprotocollib/protocol/example/MinecraftProtocolTest.java b/example/src/main/java/org/geysermc/mcprotocollib/protocol/example/MinecraftProtocolTest.java index b3e7e93f7..71250757d 100644 --- a/example/src/main/java/org/geysermc/mcprotocollib/protocol/example/MinecraftProtocolTest.java +++ b/example/src/main/java/org/geysermc/mcprotocollib/protocol/example/MinecraftProtocolTest.java @@ -48,7 +48,8 @@ public class MinecraftProtocolTest { private static final Logger log = LoggerFactory.getLogger(MinecraftProtocolTest.class); private static final boolean SPAWN_SERVER = true; - private static final boolean VERIFY_USERS = false; + private static final boolean ENCRYPT_CONNECTION = true; + private static final boolean SHOULD_AUTHENTICATE = false; private static final String HOST = "127.0.0.1"; private static final int PORT = 25565; private static final ProxyInfo PROXY = null; @@ -63,7 +64,8 @@ public static void main(String[] args) { Server server = new TcpServer(HOST, PORT, MinecraftProtocol::new); server.setGlobalFlag(MinecraftConstants.SESSION_SERVICE_KEY, sessionService); - server.setGlobalFlag(MinecraftConstants.VERIFY_USERS_KEY, VERIFY_USERS); + server.setGlobalFlag(MinecraftConstants.ENCRYPT_CONNECTION, ENCRYPT_CONNECTION); + server.setGlobalFlag(MinecraftConstants.SHOULD_AUTHENTICATE, SHOULD_AUTHENTICATE); server.setGlobalFlag(MinecraftConstants.SERVER_INFO_BUILDER_KEY, session -> new ServerStatusInfo( Component.text("Hello world!"), @@ -100,7 +102,7 @@ public static void main(String[] args) { )) ); - server.setGlobalFlag(MinecraftConstants.SERVER_COMPRESSION_THRESHOLD, 100); + server.setGlobalFlag(MinecraftConstants.SERVER_COMPRESSION_THRESHOLD, 256); server.addListener(new ServerAdapter() { @Override public void serverClosed(ServerClosedEvent event) { @@ -177,7 +179,7 @@ private static void status() { private static void login() { MinecraftProtocol protocol; - if (VERIFY_USERS) { + if (SHOULD_AUTHENTICATE) { StepFullJavaSession.FullJavaSession fullJavaSession; try { fullJavaSession = MinecraftAuth.JAVA_CREDENTIALS_LOGIN.getFromInput( diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/NetworkConstants.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/NetworkConstants.java new file mode 100644 index 000000000..c29889051 --- /dev/null +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/NetworkConstants.java @@ -0,0 +1,10 @@ +package org.geysermc.mcprotocollib.network; + +import io.netty.util.AttributeKey; +import org.geysermc.mcprotocollib.network.compression.CompressionConfig; +import org.geysermc.mcprotocollib.network.crypt.EncryptionConfig; + +public class NetworkConstants { + public static final AttributeKey COMPRESSION_ATTRIBUTE_KEY = AttributeKey.valueOf("compression"); + public static final AttributeKey ENCRYPTION_ATTRIBUTE_KEY = AttributeKey.valueOf("encryption"); +} diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java index a4af44e33..054c76ff1 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java @@ -4,7 +4,8 @@ import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper; -import org.geysermc.mcprotocollib.network.crypt.PacketEncryption; +import org.geysermc.mcprotocollib.network.compression.CompressionConfig; +import org.geysermc.mcprotocollib.network.crypt.EncryptionConfig; import org.geysermc.mcprotocollib.network.event.session.SessionEvent; import org.geysermc.mcprotocollib.network.event.session.SessionListener; import org.geysermc.mcprotocollib.network.packet.Packet; @@ -183,26 +184,21 @@ public interface Session { void callPacketSent(Packet packet); /** - * Gets the compression packet length threshold for this session (-1 = disabled). + * Sets the compression config for this session. * - * @return This session's compression threshold. + * @param compressionConfig the compression to compress with, + * or null to disable compression */ - int getCompressionThreshold(); + void setCompression(@Nullable CompressionConfig compressionConfig); /** - * Sets the compression packet length threshold for this session (-1 = disabled). + * Sets encryption for this session. * - * @param threshold The new compression threshold. - * @param validateDecompression whether to validate that the decompression fits within size checks. - */ - void setCompressionThreshold(int threshold, boolean validateDecompression); - - /** - * Enables encryption for this session. + * @param encryptionConfig the encryption to encrypt with, + * or null to disable encryption * - * @param encryption the encryption to encrypt with */ - void enableEncryption(PacketEncryption encryption); + void setEncryption(@Nullable EncryptionConfig encryptionConfig); /** * Returns true if the session is connected. diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/compression/CompressionConfig.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/compression/CompressionConfig.java new file mode 100644 index 000000000..20444c22f --- /dev/null +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/compression/CompressionConfig.java @@ -0,0 +1,4 @@ +package org.geysermc.mcprotocollib.network.compression; + +public record CompressionConfig(int threshold, PacketCompression compression, boolean validateDecompression) { +} diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/crypt/EncryptionConfig.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/crypt/EncryptionConfig.java new file mode 100644 index 000000000..ed5c3660f --- /dev/null +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/crypt/EncryptionConfig.java @@ -0,0 +1,4 @@ +package org.geysermc.mcprotocollib.network.crypt; + +public record EncryptionConfig(PacketEncryption encryption) { +} diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java index a04d0d89d..a430d09bd 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java @@ -113,7 +113,9 @@ public void initChannel(Channel channel) { pipeline.addLast("read-timeout", new ReadTimeoutHandler(getFlag(BuiltinFlags.READ_TIMEOUT, 30))); pipeline.addLast("write-timeout", new WriteTimeoutHandler(getFlag(BuiltinFlags.WRITE_TIMEOUT, 0))); + pipeline.addLast("encryption", new TcpPacketEncryptor()); pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), getCodecHelper())); + pipeline.addLast("compression", new TcpPacketCompression(getCodecHelper())); pipeline.addLast("codec", new TcpPacketCodec(TcpClientSession.this, true)); pipeline.addLast("manager", TcpClientSession.this); diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketCompression.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketCompression.java index 8a9b928c6..e83ef8f50 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketCompression.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketCompression.java @@ -4,44 +4,49 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.MessageToMessageCodec; -import org.geysermc.mcprotocollib.network.Session; -import org.geysermc.mcprotocollib.network.compression.PacketCompression; +import lombok.RequiredArgsConstructor; +import org.geysermc.mcprotocollib.network.NetworkConstants; +import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper; +import org.geysermc.mcprotocollib.network.compression.CompressionConfig; import java.util.List; +@RequiredArgsConstructor public class TcpPacketCompression extends MessageToMessageCodec { private static final int MAX_UNCOMPRESSED_SIZE = 8 * 1024 * 1024; // 8MiB - - private final Session session; - private final PacketCompression compression; - private final boolean validateDecompression; - - public TcpPacketCompression(Session session, PacketCompression compression, boolean validateDecompression) { - this.session = session; - this.compression = compression; - this.validateDecompression = validateDecompression; - } + private final PacketCodecHelper helper; @Override public void handlerRemoved(ChannelHandlerContext ctx) { - this.compression.close(); + CompressionConfig config = ctx.channel().attr(NetworkConstants.COMPRESSION_ATTRIBUTE_KEY).get(); + if (config == null) { + return; + } + + config.compression().close(); } @Override public void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) { + CompressionConfig config = ctx.channel().attr(NetworkConstants.COMPRESSION_ATTRIBUTE_KEY).get(); + if (config == null) { + out.add(msg.retain()); + return; + } + int uncompressed = msg.readableBytes(); if (uncompressed > MAX_UNCOMPRESSED_SIZE) { throw new IllegalArgumentException("Packet too big (is " + uncompressed + ", should be less than " + MAX_UNCOMPRESSED_SIZE + ")"); } ByteBuf outBuf = ctx.alloc().directBuffer(uncompressed); - if (uncompressed < this.session.getCompressionThreshold()) { + if (uncompressed < config.threshold()) { // Under the threshold, there is nothing to do. - this.session.getCodecHelper().writeVarInt(outBuf, 0); + this.helper.writeVarInt(outBuf, 0); outBuf.writeBytes(msg); } else { - this.session.getCodecHelper().writeVarInt(outBuf, uncompressed); - compression.deflate(msg, outBuf); + this.helper.writeVarInt(outBuf, uncompressed); + config.compression().deflate(msg, outBuf); } out.add(outBuf); @@ -49,15 +54,21 @@ public void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { - int claimedUncompressedSize = this.session.getCodecHelper().readVarInt(in); + CompressionConfig config = ctx.channel().attr(NetworkConstants.COMPRESSION_ATTRIBUTE_KEY).get(); + if (config == null) { + out.add(in.retain()); + return; + } + + int claimedUncompressedSize = this.helper.readVarInt(in); if (claimedUncompressedSize == 0) { out.add(in.retain()); return; } - if (validateDecompression) { - if (claimedUncompressedSize < this.session.getCompressionThreshold()) { - throw new DecoderException("Badly compressed packet - size of " + claimedUncompressedSize + " is below server threshold of " + this.session.getCompressionThreshold()); + if (config.validateDecompression()) { + if (claimedUncompressedSize < config.threshold()) { + throw new DecoderException("Badly compressed packet - size of " + claimedUncompressedSize + " is below server threshold of " + config.threshold()); } if (claimedUncompressedSize > MAX_UNCOMPRESSED_SIZE) { @@ -67,7 +78,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { ByteBuf uncompressed = ctx.alloc().directBuffer(claimedUncompressedSize); try { - compression.inflate(in, uncompressed, claimedUncompressedSize); + config.compression().inflate(in, uncompressed, claimedUncompressedSize); out.add(uncompressed); } catch (Exception e) { uncompressed.release(); diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketEncryptor.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketEncryptor.java index 2d8d1c21b..819c5c6e9 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketEncryptor.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketEncryptor.java @@ -6,26 +6,27 @@ import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.EncoderException; import io.netty.handler.codec.MessageToMessageCodec; -import org.geysermc.mcprotocollib.network.crypt.PacketEncryption; +import org.geysermc.mcprotocollib.network.NetworkConstants; +import org.geysermc.mcprotocollib.network.crypt.EncryptionConfig; import java.util.List; public class TcpPacketEncryptor extends MessageToMessageCodec { - private final PacketEncryption encryption; - - public TcpPacketEncryptor(PacketEncryption encryption) { - this.encryption = encryption; - } - @Override public void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) { + EncryptionConfig config = ctx.channel().attr(NetworkConstants.ENCRYPTION_ATTRIBUTE_KEY).get(); + if (config == null) { + out.add(msg.retain()); + return; + } + ByteBuf heapBuf = this.ensureHeapBuffer(ctx.alloc(), msg); int inBytes = heapBuf.readableBytes(); int baseOffset = heapBuf.arrayOffset() + heapBuf.readerIndex(); try { - encryption.encrypt(heapBuf.array(), baseOffset, inBytes, heapBuf.array(), baseOffset); + config.encryption().encrypt(heapBuf.array(), baseOffset, inBytes, heapBuf.array(), baseOffset); out.add(heapBuf); } catch (Exception e) { heapBuf.release(); @@ -35,13 +36,19 @@ public void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { + EncryptionConfig config = ctx.channel().attr(NetworkConstants.ENCRYPTION_ATTRIBUTE_KEY).get(); + if (config == null) { + out.add(in.retain()); + return; + } + ByteBuf heapBuf = this.ensureHeapBuffer(ctx.alloc(), in).slice(); int inBytes = heapBuf.readableBytes(); int baseOffset = heapBuf.arrayOffset() + heapBuf.readerIndex(); try { - encryption.decrypt(heapBuf.array(), baseOffset, inBytes, heapBuf.array(), baseOffset); + config.encryption().decrypt(heapBuf.array(), baseOffset, inBytes, heapBuf.array(), baseOffset); out.add(heapBuf); } catch (Exception e) { heapBuf.release(); diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java index 4d35e9666..b6fdaab32 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java @@ -64,7 +64,9 @@ public void initChannel(Channel channel) { pipeline.addLast("read-timeout", new ReadTimeoutHandler(session.getFlag(BuiltinFlags.READ_TIMEOUT, 30))); pipeline.addLast("write-timeout", new WriteTimeoutHandler(session.getFlag(BuiltinFlags.WRITE_TIMEOUT, 0))); + pipeline.addLast("encryption", new TcpPacketEncryptor()); pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), session.getCodecHelper())); + pipeline.addLast("compression", new TcpPacketCompression(session.getCodecHelper())); pipeline.addLast("codec", new TcpPacketCodec(session, false)); pipeline.addLast("manager", session); diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java index 462f85b4b..8a1ee37cb 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java @@ -12,9 +12,10 @@ import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.geysermc.mcprotocollib.network.Flag; +import org.geysermc.mcprotocollib.network.NetworkConstants; import org.geysermc.mcprotocollib.network.Session; -import org.geysermc.mcprotocollib.network.compression.ZlibCompression; -import org.geysermc.mcprotocollib.network.crypt.PacketEncryption; +import org.geysermc.mcprotocollib.network.compression.CompressionConfig; +import org.geysermc.mcprotocollib.network.crypt.EncryptionConfig; import org.geysermc.mcprotocollib.network.event.session.ConnectedEvent; import org.geysermc.mcprotocollib.network.event.session.DisconnectedEvent; import org.geysermc.mcprotocollib.network.event.session.DisconnectingEvent; @@ -23,6 +24,8 @@ import org.geysermc.mcprotocollib.network.event.session.SessionListener; import org.geysermc.mcprotocollib.network.packet.Packet; import org.geysermc.mcprotocollib.network.packet.PacketProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.SocketAddress; import java.util.Collections; @@ -34,6 +37,8 @@ import java.util.concurrent.TimeoutException; public abstract class TcpSession extends SimpleChannelInboundHandler implements Session { + private static final Logger log = LoggerFactory.getLogger(TcpSession.class); + /** * Controls whether non-priority packets are handled in a separate event loop */ @@ -47,8 +52,6 @@ public abstract class TcpSession extends SimpleChannelInboundHandler imp private final PacketProtocol protocol; private final EventLoop eventLoop = createEventLoop(); - private int compressionThreshold = -1; - private final Map flags = new HashMap<>(); private final List listeners = new CopyOnWriteArrayList<>(); @@ -188,31 +191,23 @@ public void callPacketSent(Packet packet) { } @Override - public int getCompressionThreshold() { - return this.compressionThreshold; - } - - @Override - public void setCompressionThreshold(int threshold, boolean validateDecompression) { - this.compressionThreshold = threshold; - if (this.channel != null) { - if (this.compressionThreshold >= 0) { - if (this.channel.pipeline().get("compression") == null) { - this.channel.pipeline().addBefore("codec", "compression", - new TcpPacketCompression(this, new ZlibCompression(), validateDecompression)); - } - } else if (this.channel.pipeline().get("compression") != null) { - this.channel.pipeline().remove("compression"); - } + public void setCompression(@Nullable CompressionConfig compressionConfig) { + if (this.channel == null) { + throw new IllegalStateException("You need to connect to set the compression!"); } + + log.debug("Setting compression for session {}", this); + channel.attr(NetworkConstants.COMPRESSION_ATTRIBUTE_KEY).set(compressionConfig); } @Override - public void enableEncryption(PacketEncryption encryption) { + public void setEncryption(@Nullable EncryptionConfig encryptionConfig) { if (channel == null) { - throw new IllegalStateException("Connect the client before initializing encryption!"); + throw new IllegalStateException("You need to connect to set the encryption!"); } - channel.pipeline().addBefore("sizer", "encryption", new TcpPacketEncryptor(encryption)); + + log.debug("Setting encryption for session {}", this); + channel.attr(NetworkConstants.ENCRYPTION_ATTRIBUTE_KEY).set(encryptionConfig); } @Override @@ -267,7 +262,7 @@ public void disconnect(@NonNull Component reason, @Nullable Throwable cause) { // daemon threads and their interaction with the runtime. PACKET_EVENT_LOOP = new DefaultEventLoopGroup(new DefaultThreadFactory(this.getClass(), true)); Runtime.getRuntime().addShutdownHook(new Thread( - () -> PACKET_EVENT_LOOP.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))); + () -> PACKET_EVENT_LOOP.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))); } return PACKET_EVENT_LOOP.next(); } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ClientListener.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ClientListener.java index 0b6f8e35a..6b9ead048 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ClientListener.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ClientListener.java @@ -7,6 +7,8 @@ import org.geysermc.mcprotocollib.auth.GameProfile; import org.geysermc.mcprotocollib.auth.SessionService; import org.geysermc.mcprotocollib.network.Session; +import org.geysermc.mcprotocollib.network.compression.CompressionConfig; +import org.geysermc.mcprotocollib.network.compression.ZlibCompression; import org.geysermc.mcprotocollib.network.event.session.ConnectedEvent; import org.geysermc.mcprotocollib.network.event.session.SessionAdapter; import org.geysermc.mcprotocollib.network.packet.Packet; @@ -91,13 +93,15 @@ public void packetReceived(Session session, Packet packet) { } session.send(new ServerboundKeyPacket(helloPacket.getPublicKey(), key, helloPacket.getChallenge())); - session.enableEncryption(protocol.enableEncryption(key)); + session.setEncryption(protocol.createEncryption(key)); } else if (packet instanceof ClientboundGameProfilePacket) { session.send(new ServerboundLoginAcknowledgedPacket()); } else if (packet instanceof ClientboundLoginDisconnectPacket loginDisconnectPacket) { session.disconnect(loginDisconnectPacket.getReason()); } else if (packet instanceof ClientboundLoginCompressionPacket loginCompressionPacket) { - session.setCompressionThreshold(loginCompressionPacket.getThreshold(), false); + int threshold = loginCompressionPacket.getThreshold(); + session.setCompression(threshold >= 0 ? + new CompressionConfig(threshold, new ZlibCompression(), false) : null); } } else if (protocol.getState() == ProtocolState.STATUS) { if (packet instanceof ClientboundStatusResponsePacket statusResponsePacket) { diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftConstants.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftConstants.java index 19e59e3f2..40b0b3165 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftConstants.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftConstants.java @@ -63,9 +63,14 @@ public final class MinecraftConstants { // Server Key Constants /** - * Session flag for determining whether to verify users. Server only. + * Session flag for determining whether to encrypt the connection. Server only. */ - public static final Flag VERIFY_USERS_KEY = new Flag<>("verify-users", Boolean.class); + public static final Flag ENCRYPT_CONNECTION = new Flag<>("encrypt-connection", Boolean.class); + + /** + * Session flag for determining whether to authenticate users with the session service. Server only. + */ + public static final Flag SHOULD_AUTHENTICATE = new Flag<>("should-authenticate", Boolean.class); /** * Session flag for determining whether to accept transferred connections. Server only. diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java index cc3bd0e93..435c0113d 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java @@ -10,7 +10,7 @@ import org.geysermc.mcprotocollib.network.Server; import org.geysermc.mcprotocollib.network.Session; import org.geysermc.mcprotocollib.network.crypt.AESEncryption; -import org.geysermc.mcprotocollib.network.crypt.PacketEncryption; +import org.geysermc.mcprotocollib.network.crypt.EncryptionConfig; import org.geysermc.mcprotocollib.network.packet.PacketHeader; import org.geysermc.mcprotocollib.network.packet.PacketProtocol; import org.geysermc.mcprotocollib.network.packet.PacketRegistry; @@ -177,11 +177,11 @@ public PacketRegistry getPacketRegistry() { return this.stateRegistry; } - protected PacketEncryption enableEncryption(Key key) { + protected EncryptionConfig createEncryption(Key key) { try { - return new AESEncryption(key); + return new EncryptionConfig(new AESEncryption(key)); } catch (GeneralSecurityException e) { - throw new Error("Failed to enable protocol encryption.", e); + throw new IllegalStateException("Failed to create protocol encryption.", e); } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ServerListener.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ServerListener.java index f63659ce4..cd1d7a6c5 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ServerListener.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ServerListener.java @@ -8,6 +8,8 @@ import org.geysermc.mcprotocollib.auth.GameProfile; import org.geysermc.mcprotocollib.auth.SessionService; import org.geysermc.mcprotocollib.network.Session; +import org.geysermc.mcprotocollib.network.compression.CompressionConfig; +import org.geysermc.mcprotocollib.network.compression.ZlibCompression; import org.geysermc.mcprotocollib.network.event.session.ConnectedEvent; import org.geysermc.mcprotocollib.network.event.session.DisconnectingEvent; import org.geysermc.mcprotocollib.network.event.session.SessionAdapter; @@ -116,10 +118,10 @@ public void packetReceived(Session session, Packet packet) { if (packet instanceof ServerboundHelloPacket helloPacket) { this.username = helloPacket.getUsername(); - if (session.getFlag(MinecraftConstants.VERIFY_USERS_KEY, true)) { - session.send(new ClientboundHelloPacket(SERVER_ID, KEY_PAIR.getPublic(), this.challenge, true)); + if (session.getFlag(MinecraftConstants.ENCRYPT_CONNECTION, true)) { + session.send(new ClientboundHelloPacket(SERVER_ID, KEY_PAIR.getPublic(), this.challenge, session.getFlag(MinecraftConstants.SHOULD_AUTHENTICATE, true))); } else { - new Thread(new UserAuthTask(session, null)).start(); + new Thread(new UserAuthTask(session, false, null)).start(); } } else if (packet instanceof ServerboundKeyPacket keyPacket) { PrivateKey privateKey = KEY_PAIR.getPrivate(); @@ -129,8 +131,8 @@ public void packetReceived(Session session, Packet packet) { } SecretKey key = keyPacket.getSecretKey(privateKey); - session.enableEncryption(protocol.enableEncryption(key)); - new Thread(new UserAuthTask(session, key)).start(); + session.setEncryption(protocol.createEncryption(key)); + new Thread(new UserAuthTask(session, session.getFlag(MinecraftConstants.SHOULD_AUTHENTICATE, true), key)).start(); } else if (packet instanceof ServerboundLoginAcknowledgedPacket) { protocol.setState(ProtocolState.CONFIGURATION); @@ -202,7 +204,9 @@ public void packetReceived(Session session, Packet packet) { @Override public void packetSent(Session session, Packet packet) { if (packet instanceof ClientboundLoginCompressionPacket loginCompressionPacket) { - session.setCompressionThreshold(loginCompressionPacket.getThreshold(), true); + int threshold = loginCompressionPacket.getThreshold(); + session.setCompression(threshold >= 0 ? + new CompressionConfig(threshold, new ZlibCompression(), true) : null); session.send(new ClientboundGameProfilePacket(session.getFlag(MinecraftConstants.PROFILE_KEY), true)); } } @@ -220,12 +224,13 @@ public void disconnecting(DisconnectingEvent event) { @RequiredArgsConstructor private class UserAuthTask implements Runnable { private final Session session; + private final boolean shouldAuthenticate; private final SecretKey key; @Override public void run() { GameProfile profile; - if (this.key != null) { + if (this.shouldAuthenticate && this.key != null) { SessionService sessionService = this.session.getFlag(MinecraftConstants.SESSION_SERVICE_KEY, new SessionService()); try { profile = sessionService.getProfileByServer(username, SessionService.getServerId(SERVER_ID, KEY_PAIR.getPublic(), this.key)); diff --git a/protocol/src/test/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocolTest.java b/protocol/src/test/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocolTest.java index 49a45c0b9..2dff91c35 100644 --- a/protocol/src/test/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocolTest.java +++ b/protocol/src/test/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocolTest.java @@ -27,7 +27,6 @@ import java.util.concurrent.CountDownLatch; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.geysermc.mcprotocollib.protocol.MinecraftConstants.*; import static org.junit.jupiter.api.Assertions.*; public class MinecraftProtocolTest { @@ -49,10 +48,11 @@ public class MinecraftProtocolTest { @BeforeAll public static void setupServer() { server = new TcpServer(HOST, PORT, MinecraftProtocol::new); - server.setGlobalFlag(VERIFY_USERS_KEY, false); - server.setGlobalFlag(SERVER_COMPRESSION_THRESHOLD, 100); - server.setGlobalFlag(SERVER_INFO_BUILDER_KEY, session -> SERVER_INFO); - server.setGlobalFlag(SERVER_LOGIN_HANDLER_KEY, session -> { + server.setGlobalFlag(MinecraftConstants.ENCRYPT_CONNECTION, true); + server.setGlobalFlag(MinecraftConstants.SHOULD_AUTHENTICATE, false); + server.setGlobalFlag(MinecraftConstants.SERVER_COMPRESSION_THRESHOLD, 256); + server.setGlobalFlag(MinecraftConstants.SERVER_INFO_BUILDER_KEY, session -> SERVER_INFO); + server.setGlobalFlag(MinecraftConstants.SERVER_LOGIN_HANDLER_KEY, session -> { // Seems like in this setup the server can reply too quickly to ServerboundFinishConfigurationPacket // before the client can transition CONFIGURATION -> GAME. There is probably something wrong here and this is just a band-aid. try { @@ -79,7 +79,7 @@ public void testStatus() throws InterruptedException { Session session = new TcpClientSession(HOST, PORT, new MinecraftProtocol()); try { ServerInfoHandlerTest handler = new ServerInfoHandlerTest(); - session.setFlag(SERVER_INFO_HANDLER_KEY, handler); + session.setFlag(MinecraftConstants.SERVER_INFO_HANDLER_KEY, handler); session.addListener(new DisconnectListener()); session.connect(); From f8460356db2b92fbf7cb506757fe8f87a011a1f7 Mon Sep 17 00:00:00 2001 From: Alex <40795980+AlexProgrammerDE@users.noreply.github.com> Date: Tue, 8 Oct 2024 15:45:26 +0200 Subject: [PATCH 5/6] Split incoming/outgoing packet registry, transition protocol states correctly (#841) * Initial code changes * Make it compile * Small inlining * Make less detectable by anticheats and fix keepalive during configuration * Fix keepalive edge case * Properly switch inbound protocol in server listener * Add flow control * Make toggling automatic keepalive work in another way * Remove ping pong packets again * Address review * Handle keepalive in configuration * Only spawn keepalive after login is acknowledged * Prevent very unlikely race conditions with keepalive being switched during a task * Add debug log for packet serialization and state switching * Add one more debug print * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java Co-authored-by: chris * Update protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java Co-authored-by: chris * Mark packet as nonnull * Fix outbound writing race conditions * Ensure packets are always sent on the event loop This replicates the same approach Mojang uses in their networking code. * Reduce log verbosity * Put errors into debug * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java Co-authored-by: chris * Add comment to always running in event loop * Handle auto read earlier to prevent race conditions * Make instance dynamic * Revert "Make instance dynamic" This reverts commit 7f8affbdc58b0c94c119d1b383ddcbb76cc2e321. * Make flush packet priority * Do not hide original line that is the cause of the exception * Cancel packet using exception rather than return * Properly iterate through parents * Set log level to debug for unit tests * Revert "Properly iterate through parents" This reverts commit 4e2b64d9832919ba89735880ae5b07074112062b. * Revert "Cancel packet using exception rather than return" This reverts commit 6507e77bbe39e63b6de342284d564075e171ad67. * Add write length filter * Reuse bytebuf for fake flush to avoid unnecessary allocations * Make tests happy * Remake dropping packets * Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java Co-authored-by: chris * Fix space * Rename to flush packet * Add mojmap reference * Share keepalive code * Fix compilation * Revert a tiny bit closer to vanilla * Inline lambda * Inherit annotation * Inherit annotation 2 * Use checkerframework annotation * Fixup grammar slightly * Add reset states method * Add log marker for packet logging --------- Co-authored-by: chris --- .../network/example/TestProtocol.java | 7 +- .../example/MinecraftProtocolTest.java | 2 +- .../mcprotocollib/network/Session.java | 58 ++++- .../mcprotocollib/network/packet/Packet.java | 11 + .../network/packet/PacketProtocol.java | 13 +- .../network/tcp/FlushHandler.java | 28 +++ .../network/tcp/TcpClientSession.java | 9 +- .../network/tcp/TcpFlowControlHandler.java | 20 ++ .../network/tcp/TcpPacketCodec.java | 62 +++++- .../mcprotocollib/network/tcp/TcpServer.java | 5 +- .../mcprotocollib/network/tcp/TcpSession.java | 20 +- .../protocol/ClientListener.java | 68 +++--- .../protocol/MinecraftProtocol.java | 67 ++++-- .../protocol/ServerListener.java | 201 ++++++++++-------- .../ClientboundFinishConfigurationPacket.java | 5 + .../ServerboundFinishConfigurationPacket.java | 5 + .../serverbound/ClientIntentionPacket.java | 5 + .../ClientboundStartConfigurationPacket.java | 5 + ...rboundConfigurationAcknowledgedPacket.java | 5 + .../ClientboundGameProfilePacket.java | 5 + .../ServerboundLoginAcknowledgedPacket.java | 5 + .../test/resources/simplelogger.properties | 1 + 22 files changed, 447 insertions(+), 160 deletions(-) create mode 100644 protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/FlushHandler.java create mode 100644 protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpFlowControlHandler.java create mode 100644 protocol/src/test/resources/simplelogger.properties diff --git a/example/src/main/java/org/geysermc/mcprotocollib/network/example/TestProtocol.java b/example/src/main/java/org/geysermc/mcprotocollib/network/example/TestProtocol.java index cc108fb2c..3b114f413 100644 --- a/example/src/main/java/org/geysermc/mcprotocollib/network/example/TestProtocol.java +++ b/example/src/main/java/org/geysermc/mcprotocollib/network/example/TestProtocol.java @@ -82,7 +82,12 @@ public void newServerSession(Server server, Session session) { } @Override - public PacketRegistry getPacketRegistry() { + public PacketRegistry getInboundPacketRegistry() { + return registry; + } + + @Override + public PacketRegistry getOutboundPacketRegistry() { return registry; } } diff --git a/example/src/main/java/org/geysermc/mcprotocollib/protocol/example/MinecraftProtocolTest.java b/example/src/main/java/org/geysermc/mcprotocollib/protocol/example/MinecraftProtocolTest.java index 71250757d..e42cb1adf 100644 --- a/example/src/main/java/org/geysermc/mcprotocollib/protocol/example/MinecraftProtocolTest.java +++ b/example/src/main/java/org/geysermc/mcprotocollib/protocol/example/MinecraftProtocolTest.java @@ -135,7 +135,7 @@ public void packetReceived(Session session, Packet packet) { @Override public void sessionRemoved(SessionRemovedEvent event) { MinecraftProtocol protocol = (MinecraftProtocol) event.getSession().getPacketProtocol(); - if (protocol.getState() == ProtocolState.GAME) { + if (protocol.getOutboundState() == ProtocolState.GAME) { log.info("Closing server."); event.getServer().close(false); } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java index 054c76ff1..b472b4612 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java @@ -1,5 +1,6 @@ package org.geysermc.mcprotocollib.network; +import io.netty.channel.Channel; import net.kyori.adventure.text.Component; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -10,6 +11,7 @@ import org.geysermc.mcprotocollib.network.event.session.SessionListener; import org.geysermc.mcprotocollib.network.packet.Packet; import org.geysermc.mcprotocollib.network.packet.PacketProtocol; +import org.geysermc.mcprotocollib.network.tcp.FlushHandler; import java.net.SocketAddress; import java.util.List; @@ -212,7 +214,17 @@ public interface Session { * * @param packet Packet to send. */ - void send(Packet packet); + default void send(@NonNull Packet packet) { + this.send(packet, null); + } + + /** + * Sends a packet and runs the specified callback when the packet has been sent. + * + * @param packet Packet to send. + * @param onSent Callback to run when the packet has been sent. + */ + void send(@NonNull Packet packet, @Nullable Runnable onSent); /** * Disconnects the session. @@ -255,4 +267,48 @@ default void disconnect(@NonNull Component reason) { * @param cause Throwable responsible for disconnecting. */ void disconnect(@NonNull Component reason, @Nullable Throwable cause); + + /** + * Auto read in netty means that the server is automatically reading from the channel. + * Turning it off means that we won't get more packets being decoded until we turn it back on. + * We use this to hold off on reading packets until we are ready to process them. + * For example this is used for switching inbound states with {@link #switchInboundState(Runnable)}. + * + * @param autoRead Whether to enable auto read. + * Default is true. + */ + void setAutoRead(boolean autoRead); + + /** + * Returns the underlying netty channel of this session. + * + * @return The netty channel + */ + Channel getChannel(); + + /** + * Changes the inbound state of the session and then re-enables auto read. + * This is used after a terminal packet was handled and the session is ready to receive more packets in the new state. + * + * @param switcher The runnable that switches the inbound state. + */ + default void switchInboundState(Runnable switcher) { + switcher.run(); + + // We switched to the new inbound state + // we can start reading again + setAutoRead(true); + } + + /** + * Flushes all packets that are due to be sent and changes the outbound state of the session. + * This makes sure no other threads have scheduled packets to be sent. + * + * @param switcher The runnable that switches the outbound state. + */ + default void switchOutboundState(Runnable switcher) { + getChannel().writeAndFlush(FlushHandler.FLUSH_PACKET).syncUninterruptibly(); + + switcher.run(); + } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/packet/Packet.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/packet/Packet.java index 56cd2db7e..94d68197f 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/packet/Packet.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/packet/Packet.java @@ -1,6 +1,7 @@ package org.geysermc.mcprotocollib.network.packet; import io.netty.buffer.ByteBuf; +import org.geysermc.mcprotocollib.network.Session; /** * A network packet. Any given packet must have a constructor that takes in a {@link ByteBuf}. @@ -17,4 +18,14 @@ public interface Packet { default boolean isPriority() { return false; } + + /** + * Returns whether the packet is terminal. If true, this should be the last packet sent inside a protocol state. + * Subsequently, {@link Session#setAutoRead(boolean)} should be disabled when a terminal packet is received, until the session has switched into a new state and is ready to receive more packets. + * + * @return Whether the packet is terminal. + */ + default boolean isTerminal() { + return false; + } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/packet/PacketProtocol.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/packet/PacketProtocol.java index e50a2ba0c..00aa281aa 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/packet/PacketProtocol.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/packet/PacketProtocol.java @@ -49,9 +49,16 @@ public abstract class PacketProtocol { public abstract void newServerSession(Server server, Session session); /** - * Gets the packet registry for this protocol. + * Gets the inbound packet registry for this protocol. * - * @return The protocol's packet registry. + * @return The protocol's inbound packet registry. */ - public abstract PacketRegistry getPacketRegistry(); + public abstract PacketRegistry getInboundPacketRegistry(); + + /** + * Gets the outbound packet registry for this protocol. + * + * @return The protocol's outbound packet registry. + */ + public abstract PacketRegistry getOutboundPacketRegistry(); } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/FlushHandler.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/FlushHandler.java new file mode 100644 index 000000000..85d4ff1a4 --- /dev/null +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/FlushHandler.java @@ -0,0 +1,28 @@ +package org.geysermc.mcprotocollib.network.tcp; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; + +/** + * Sending a {@link FlushPacket} will ensure all before were sent. + * This handler makes sure it's dropped before it reaches the encoder. + * This logic is similar to the Minecraft UnconfiguredPipelineHandler.OutboundConfigurationTask. + */ +public class FlushHandler extends ChannelOutboundHandlerAdapter { + public static final FlushPacket FLUSH_PACKET = new FlushPacket(); + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg == FLUSH_PACKET) { + promise.setSuccess(); + } else { + super.write(ctx, msg, promise); + } + } + + public static class FlushPacket { + private FlushPacket() { + } + } +} diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java index a430d09bd..9f1303635 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java @@ -27,6 +27,7 @@ import io.netty.resolver.dns.DnsNameResolver; import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.DefaultThreadFactory; +import org.checkerframework.checker.nullness.qual.NonNull; import org.geysermc.mcprotocollib.network.BuiltinFlags; import org.geysermc.mcprotocollib.network.ProxyInfo; import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper; @@ -100,7 +101,7 @@ public void connect(boolean wait, boolean transferring) { .localAddress(bindAddress, bindPort) .handler(new ChannelInitializer<>() { @Override - public void initChannel(Channel channel) { + public void initChannel(@NonNull Channel channel) { PacketProtocol protocol = getPacketProtocol(); protocol.newClientSession(TcpClientSession.this, transferring); @@ -117,7 +118,9 @@ public void initChannel(Channel channel) { pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), getCodecHelper())); pipeline.addLast("compression", new TcpPacketCompression(getCodecHelper())); + pipeline.addLast("flow-control", new TcpFlowControlHandler()); pipeline.addLast("codec", new TcpPacketCodec(TcpClientSession.this, true)); + pipeline.addLast("flush-handler", new FlushHandler()); pipeline.addLast("manager", TcpClientSession.this); } }); @@ -246,9 +249,7 @@ private void initializeHAProxySupport(Channel channel) { HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol, clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(), clientAddress.getPort(), remoteAddress.getPort() - )).addListener(future -> { - channel.pipeline().remove("proxy-protocol-encoder"); - }); + )).addListener(future -> channel.pipeline().remove("proxy-protocol-encoder")); } private static void createTcpEventLoopGroup() { diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpFlowControlHandler.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpFlowControlHandler.java new file mode 100644 index 000000000..e22bf593a --- /dev/null +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpFlowControlHandler.java @@ -0,0 +1,20 @@ +package org.geysermc.mcprotocollib.network.tcp; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.flow.FlowControlHandler; + +/** + * A flow control handler for TCP connections. + * When auto-read is disabled, this will halt decoding of packets until auto-read is re-enabled. + * This is needed because auto-read still allows packets to be decoded, even if the channel is not reading anymore from the network. + * This can happen when the channel already read a packet, but the packet is not yet decoded. + * This will halt all decoding until the channel is ready to process more packets. + */ +public class TcpFlowControlHandler extends FlowControlHandler { + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + if (ctx.channel().config().isAutoRead()) { + super.read(ctx); + } + } +} diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketCodec.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketCodec.java index ce543bfe3..98a3e2f46 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketCodec.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpPacketCodec.java @@ -2,17 +2,27 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageCodec; +import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.EncoderException; +import io.netty.handler.codec.MessageToMessageCodec; import org.geysermc.mcprotocollib.network.Session; import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper; import org.geysermc.mcprotocollib.network.codec.PacketDefinition; import org.geysermc.mcprotocollib.network.event.session.PacketErrorEvent; import org.geysermc.mcprotocollib.network.packet.Packet; import org.geysermc.mcprotocollib.network.packet.PacketProtocol; +import org.geysermc.mcprotocollib.network.packet.PacketRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; import java.util.List; -public class TcpPacketCodec extends ByteToMessageCodec { +public class TcpPacketCodec extends MessageToMessageCodec { + private static final Marker marker = MarkerFactory.getMarker("packet_logging"); + private static final Logger log = LoggerFactory.getLogger(TcpPacketCodec.class); + private final Session session; private final boolean client; @@ -23,35 +33,51 @@ public TcpPacketCodec(Session session, boolean client) { @SuppressWarnings({"rawtypes", "unchecked"}) @Override - public void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf buf) { - int initial = buf.writerIndex(); + public void encode(ChannelHandlerContext ctx, Packet packet, List out) { + if (log.isTraceEnabled()) { + log.trace(marker, "Encoding packet: {}", packet.getClass().getSimpleName()); + } PacketProtocol packetProtocol = this.session.getPacketProtocol(); + PacketRegistry packetRegistry = packetProtocol.getOutboundPacketRegistry(); PacketCodecHelper codecHelper = this.session.getCodecHelper(); try { - int packetId = this.client ? packetProtocol.getPacketRegistry().getServerboundId(packet) : packetProtocol.getPacketRegistry().getClientboundId(packet); - PacketDefinition definition = this.client ? packetProtocol.getPacketRegistry().getServerboundDefinition(packetId) : packetProtocol.getPacketRegistry().getClientboundDefinition(packetId); + int packetId = this.client ? packetRegistry.getServerboundId(packet) : packetRegistry.getClientboundId(packet); + PacketDefinition definition = this.client ? packetRegistry.getServerboundDefinition(packetId) : packetRegistry.getClientboundDefinition(packetId); + ByteBuf buf = ctx.alloc().buffer(); packetProtocol.getPacketHeader().writePacketId(buf, codecHelper, packetId); definition.getSerializer().serialize(buf, codecHelper, packet); + + out.add(buf); + + if (log.isDebugEnabled()) { + log.debug(marker, "Encoded packet {} ({})", packet.getClass().getSimpleName(), packetId); + } } catch (Throwable t) { - // Reset writer index to make sure incomplete data is not written out. - buf.writerIndex(initial); + log.debug(marker, "Error encoding packet", t); PacketErrorEvent e = new PacketErrorEvent(this.session, t); this.session.callEvent(e); if (!e.shouldSuppress()) { - throw t; + throw new EncoderException(t); } } } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) { + // Vanilla also checks for 0 length + if (buf.readableBytes() == 0) { + return; + } + int initial = buf.readerIndex(); PacketProtocol packetProtocol = this.session.getPacketProtocol(); + PacketRegistry packetRegistry = packetProtocol.getInboundPacketRegistry(); PacketCodecHelper codecHelper = this.session.getCodecHelper(); + Packet packet = null; try { int id = packetProtocol.getPacketHeader().readPacketId(buf, codecHelper); if (id == -1) { @@ -59,21 +85,35 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) return; } - Packet packet = this.client ? packetProtocol.getPacketRegistry().createClientboundPacket(id, buf, codecHelper) : packetProtocol.getPacketRegistry().createServerboundPacket(id, buf, codecHelper); + log.trace(marker, "Decoding packet with id: {}", id); + + packet = this.client ? packetRegistry.createClientboundPacket(id, buf, codecHelper) : packetRegistry.createServerboundPacket(id, buf, codecHelper); if (buf.readableBytes() > 0) { throw new IllegalStateException("Packet \"" + packet.getClass().getSimpleName() + "\" not fully read."); } out.add(packet); + + if (log.isDebugEnabled()) { + log.debug(marker, "Decoded packet {} ({})", packet.getClass().getSimpleName(), id); + } } catch (Throwable t) { + log.debug(marker, "Error decoding packet", t); + // Advance buffer to end to make sure remaining data in this packet is skipped. buf.readerIndex(buf.readerIndex() + buf.readableBytes()); PacketErrorEvent e = new PacketErrorEvent(this.session, t); this.session.callEvent(e); if (!e.shouldSuppress()) { - throw t; + throw new DecoderException(t); + } + } finally { + if (packet != null && packet.isTerminal()) { + // Next packets are in a different protocol state, so we must + // disable auto-read to prevent reading wrong packets. + session.setAutoRead(false); } } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java index b6fdaab32..809a88c7d 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java @@ -9,6 +9,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; +import org.checkerframework.checker.nullness.qual.NonNull; import org.geysermc.mcprotocollib.network.AbstractServer; import org.geysermc.mcprotocollib.network.BuiltinFlags; import org.geysermc.mcprotocollib.network.helper.TransportHelper; @@ -52,7 +53,7 @@ public void bindImpl(boolean wait, final Runnable callback) { .localAddress(this.getHost(), this.getPort()) .childHandler(new ChannelInitializer<>() { @Override - public void initChannel(Channel channel) { + public void initChannel(@NonNull Channel channel) { InetSocketAddress address = (InetSocketAddress) channel.remoteAddress(); PacketProtocol protocol = createPacketProtocol(); @@ -68,7 +69,9 @@ public void initChannel(Channel channel) { pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), session.getCodecHelper())); pipeline.addLast("compression", new TcpPacketCompression(session.getCodecHelper())); + pipeline.addLast("flow-control", new TcpFlowControlHandler()); pipeline.addLast("codec", new TcpPacketCodec(session, false)); + pipeline.addLast("flush-handler", new FlushHandler()); pipeline.addLast("manager", session); } }); diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java index 8a1ee37cb..f17fa910a 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpSession.java @@ -216,11 +216,17 @@ public boolean isConnected() { } @Override - public void send(Packet packet) { + public void send(@NonNull Packet packet, @Nullable Runnable onSent) { if (this.channel == null) { return; } + // Same behaviour as vanilla, always offload packet sending to the event loop + if (!this.channel.eventLoop().inEventLoop()) { + this.channel.eventLoop().execute(() -> this.send(packet, onSent)); + return; + } + PacketSendingEvent sendingEvent = new PacketSendingEvent(this, packet); this.callEvent(sendingEvent); @@ -228,6 +234,10 @@ public void send(Packet packet) { final Packet toSend = sendingEvent.getPacket(); this.channel.writeAndFlush(toSend).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { + if (onSent != null) { + onSent.run(); + } + callPacketSent(toSend); } else { exceptionCaught(null, future.cause()); @@ -252,6 +262,13 @@ public void disconnect(@NonNull Component reason, @Nullable Throwable cause) { } } + @Override + public void setAutoRead(boolean autoRead) { + if (this.channel != null) { + this.channel.config().setAutoRead(autoRead); + } + } + private @Nullable EventLoop createEventLoop() { if (!USE_EVENT_LOOP_FOR_PACKETS) { return null; @@ -267,6 +284,7 @@ public void disconnect(@NonNull Component reason, @Nullable Throwable cause) { return PACKET_EVENT_LOOP.next(); } + @Override public Channel getChannel() { return this.channel; } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ClientListener.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ClientListener.java index 6b9ead048..ad017f614 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ClientListener.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ClientListener.java @@ -61,7 +61,7 @@ public class ClientListener extends SessionAdapter { @Override public void packetReceived(Session session, Packet packet) { MinecraftProtocol protocol = (MinecraftProtocol) session.getPacketProtocol(); - if (protocol.getState() == ProtocolState.LOGIN) { + if (protocol.getInboundState() == ProtocolState.LOGIN) { if (packet instanceof ClientboundHelloPacket helloPacket) { GameProfile profile = session.getFlag(MinecraftConstants.PROFILE_KEY); String accessToken = session.getFlag(MinecraftConstants.ACCESS_TOKEN_KEY); @@ -92,18 +92,21 @@ public void packetReceived(Session session, Packet packet) { return; } - session.send(new ServerboundKeyPacket(helloPacket.getPublicKey(), key, helloPacket.getChallenge())); - session.setEncryption(protocol.createEncryption(key)); + session.send(new ServerboundKeyPacket(helloPacket.getPublicKey(), key, helloPacket.getChallenge()), + () -> session.setEncryption(protocol.createEncryption(key))); } else if (packet instanceof ClientboundGameProfilePacket) { + session.switchInboundState(() -> protocol.setInboundState(ProtocolState.CONFIGURATION)); session.send(new ServerboundLoginAcknowledgedPacket()); + session.switchOutboundState(() -> protocol.setOutboundState(ProtocolState.CONFIGURATION)); } else if (packet instanceof ClientboundLoginDisconnectPacket loginDisconnectPacket) { session.disconnect(loginDisconnectPacket.getReason()); } else if (packet instanceof ClientboundLoginCompressionPacket loginCompressionPacket) { int threshold = loginCompressionPacket.getThreshold(); - session.setCompression(threshold >= 0 ? - new CompressionConfig(threshold, new ZlibCompression(), false) : null); + if (threshold >= 0) { + session.setCompression(new CompressionConfig(threshold, new ZlibCompression(), false)); + } } - } else if (protocol.getState() == ProtocolState.STATUS) { + } else if (protocol.getInboundState() == ProtocolState.STATUS) { if (packet instanceof ClientboundStatusResponsePacket statusResponsePacket) { ServerStatusInfo info = statusResponsePacket.parseInfo(); ServerInfoHandler handler = session.getFlag(MinecraftConstants.SERVER_INFO_HANDLER_KEY); @@ -121,13 +124,15 @@ public void packetReceived(Session session, Packet packet) { session.disconnect(Component.translatable("multiplayer.status.finished")); } - } else if (protocol.getState() == ProtocolState.GAME) { + } else if (protocol.getInboundState() == ProtocolState.GAME) { if (packet instanceof ClientboundKeepAlivePacket keepAlivePacket && session.getFlag(MinecraftConstants.AUTOMATIC_KEEP_ALIVE_MANAGEMENT, true)) { session.send(new ServerboundKeepAlivePacket(keepAlivePacket.getPingId())); } else if (packet instanceof ClientboundDisconnectPacket disconnectPacket) { session.disconnect(disconnectPacket.getReason()); } else if (packet instanceof ClientboundStartConfigurationPacket) { + session.switchInboundState(() -> protocol.setInboundState(ProtocolState.CONFIGURATION)); session.send(new ServerboundConfigurationAcknowledgedPacket()); + session.switchOutboundState(() -> protocol.setOutboundState(ProtocolState.CONFIGURATION)); } else if (packet instanceof ClientboundTransferPacket transferPacket) { if (session.getFlag(MinecraftConstants.FOLLOW_TRANSFERS, true)) { TcpClientSession newSession = new TcpClientSession(transferPacket.getHost(), transferPacket.getPort(), session.getPacketProtocol()); @@ -136,9 +141,13 @@ public void packetReceived(Session session, Packet packet) { newSession.connect(true, true); } } - } else if (protocol.getState() == ProtocolState.CONFIGURATION) { - if (packet instanceof ClientboundFinishConfigurationPacket) { + } else if (protocol.getInboundState() == ProtocolState.CONFIGURATION) { + if (packet instanceof ClientboundKeepAlivePacket keepAlivePacket && session.getFlag(MinecraftConstants.AUTOMATIC_KEEP_ALIVE_MANAGEMENT, true)) { + session.send(new ServerboundKeepAlivePacket(keepAlivePacket.getPingId())); + } else if (packet instanceof ClientboundFinishConfigurationPacket) { + session.switchInboundState(() -> protocol.setInboundState(ProtocolState.GAME)); session.send(new ServerboundFinishConfigurationPacket()); + session.switchOutboundState(() -> protocol.setOutboundState(ProtocolState.GAME)); } else if (packet instanceof ClientboundSelectKnownPacks) { if (session.getFlag(MinecraftConstants.SEND_BLANK_KNOWN_PACKS_RESPONSE, true)) { session.send(new ServerboundSelectKnownPacks(Collections.emptyList())); @@ -155,38 +164,25 @@ public void packetReceived(Session session, Packet packet) { } @Override - public void packetSent(Session session, Packet packet) { + public void connected(ConnectedEvent event) { + Session session = event.getSession(); MinecraftProtocol protocol = (MinecraftProtocol) session.getPacketProtocol(); - if (packet instanceof ClientIntentionPacket) { - // Once the HandshakePacket has been sent, switch to the next protocol mode. - protocol.setState(this.targetState); + ClientIntentionPacket intention = new ClientIntentionPacket(protocol.getCodec().getProtocolVersion(), session.getHost(), session.getPort(), switch (targetState) { + case LOGIN -> transferring ? HandshakeIntent.TRANSFER : HandshakeIntent.LOGIN; + case STATUS -> HandshakeIntent.STATUS; + default -> throw new IllegalStateException("Unexpected value: " + targetState); + }); - if (this.targetState == ProtocolState.LOGIN) { + session.switchInboundState(() -> protocol.setInboundState(this.targetState)); + session.send(intention); + session.switchOutboundState(() -> protocol.setOutboundState(this.targetState)); + switch (this.targetState) { + case LOGIN -> { GameProfile profile = session.getFlag(MinecraftConstants.PROFILE_KEY); session.send(new ServerboundHelloPacket(profile.getName(), profile.getId())); - } else { - session.send(new ServerboundStatusRequestPacket()); - } - } else if (packet instanceof ServerboundLoginAcknowledgedPacket) { - protocol.setState(ProtocolState.CONFIGURATION); // LOGIN -> CONFIGURATION - } else if (packet instanceof ServerboundFinishConfigurationPacket) { - protocol.setState(ProtocolState.GAME); // CONFIGURATION -> GAME - } else if (packet instanceof ServerboundConfigurationAcknowledgedPacket) { - protocol.setState(ProtocolState.CONFIGURATION); // GAME -> CONFIGURATION - } - } - - @Override - public void connected(ConnectedEvent event) { - MinecraftProtocol protocol = (MinecraftProtocol) event.getSession().getPacketProtocol(); - if (this.targetState == ProtocolState.LOGIN) { - if (this.transferring) { - event.getSession().send(new ClientIntentionPacket(protocol.getCodec().getProtocolVersion(), event.getSession().getHost(), event.getSession().getPort(), HandshakeIntent.TRANSFER)); - } else { - event.getSession().send(new ClientIntentionPacket(protocol.getCodec().getProtocolVersion(), event.getSession().getHost(), event.getSession().getPort(), HandshakeIntent.LOGIN)); } - } else if (this.targetState == ProtocolState.STATUS) { - event.getSession().send(new ClientIntentionPacket(protocol.getCodec().getProtocolVersion(), event.getSession().getHost(), event.getSession().getPort(), HandshakeIntent.STATUS)); + case STATUS -> session.send(new ServerboundStatusRequestPacket()); + default -> throw new IllegalStateException("Unexpected value: " + targetState); } } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java index 435c0113d..3fa83080f 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java @@ -18,6 +18,8 @@ import org.geysermc.mcprotocollib.protocol.codec.MinecraftCodecHelper; import org.geysermc.mcprotocollib.protocol.codec.PacketCodec; import org.geysermc.mcprotocollib.protocol.data.ProtocolState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.InputStream; import java.security.GeneralSecurityException; @@ -29,6 +31,7 @@ * Implements the Minecraft protocol. */ public class MinecraftProtocol extends PacketProtocol { + private static final Logger log = LoggerFactory.getLogger(MinecraftProtocol.class); /** * The network codec sent from the server to the client during {@link ProtocolState#CONFIGURATION}. @@ -44,8 +47,11 @@ public class MinecraftProtocol extends PacketProtocol { @Getter private final PacketCodec codec; - private ProtocolState state; - private PacketRegistry stateRegistry; + private ProtocolState inboundState; + private PacketRegistry inboundStateRegistry; + + private ProtocolState outboundState; + private PacketRegistry outboundStateRegistry; private final ProtocolState targetState; @@ -84,7 +90,7 @@ public MinecraftProtocol(PacketCodec codec) { this.codec = codec; this.targetState = ProtocolState.STATUS; - this.setState(ProtocolState.HANDSHAKE); + resetStates(); } /** @@ -129,7 +135,7 @@ public MinecraftProtocol(@NonNull PacketCodec codec, @NonNull GameProfile profil this.profile = profile; this.accessToken = accessToken; - this.setState(ProtocolState.HANDSHAKE); + resetStates(); } @Override @@ -152,7 +158,7 @@ public void newClientSession(Session session, boolean transferring) { session.setFlag(MinecraftConstants.PROFILE_KEY, this.profile); session.setFlag(MinecraftConstants.ACCESS_TOKEN_KEY, this.accessToken); - this.setState(ProtocolState.HANDSHAKE); + resetStates(); if (this.useDefaultListeners) { session.addListener(new ClientListener(this.targetState, transferring)); @@ -161,7 +167,7 @@ public void newClientSession(Session session, boolean transferring) { @Override public void newServerSession(Server server, Session session) { - this.setState(ProtocolState.HANDSHAKE); + resetStates(); if (this.useDefaultListeners) { if (DEFAULT_NETWORK_CODEC == null) { @@ -173,8 +179,13 @@ public void newServerSession(Server server, Session session) { } @Override - public PacketRegistry getPacketRegistry() { - return this.stateRegistry; + public PacketRegistry getInboundPacketRegistry() { + return this.inboundStateRegistry; + } + + @Override + public PacketRegistry getOutboundPacketRegistry() { + return this.outboundStateRegistry; } protected EncryptionConfig createEncryption(Key key) { @@ -186,17 +197,43 @@ protected EncryptionConfig createEncryption(Key key) { } /** - * Gets the current {@link ProtocolState} the client is in. + * Resets the protocol states to {@link ProtocolState#HANDSHAKE}. + */ + public void resetStates() { + this.setInboundState(ProtocolState.HANDSHAKE); + this.setOutboundState(ProtocolState.HANDSHAKE); + } + + /** + * Gets the current inbound {@link ProtocolState} we're in. * - * @return The current {@link ProtocolState}. + * @return The current inbound {@link ProtocolState}. */ - public ProtocolState getState() { - return this.state; + public ProtocolState getInboundState() { + return this.inboundState; } - public void setState(ProtocolState state) { - this.state = state; - this.stateRegistry = this.codec.getCodec(state); + /** + * Gets the current outbound {@link ProtocolState} we're in. + * + * @return The current outbound {@link ProtocolState}. + */ + public ProtocolState getOutboundState() { + return this.outboundState; + } + + public void setInboundState(ProtocolState state) { + log.debug("Setting inbound protocol state to: {}", state); + + this.inboundState = state; + this.inboundStateRegistry = this.codec.getCodec(state); + } + + public void setOutboundState(ProtocolState state) { + log.debug("Setting outbound protocol state to: {}", state); + + this.outboundState = state; + this.outboundStateRegistry = this.codec.getCodec(state); } public static NbtMap loadNetworkCodec() { diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ServerListener.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ServerListener.java index cd1d7a6c5..5de069283 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ServerListener.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/ServerListener.java @@ -1,6 +1,6 @@ package org.geysermc.mcprotocollib.protocol; -import lombok.RequiredArgsConstructor; +import lombok.Getter; import net.kyori.adventure.key.Key; import net.kyori.adventure.text.Component; import org.cloudburstmc.nbt.NbtMap; @@ -77,9 +77,10 @@ public class ServerListener extends SessionAdapter { private final byte[] challenge = new byte[4]; private String username = ""; + private KeepAliveState keepAliveState; - private long lastPingTime = 0; - private int lastPingId = 0; + @Getter + private boolean isTransfer = false; public ServerListener(NbtMap networkCodec) { this.networkCodec = networkCodec; @@ -88,40 +89,33 @@ public ServerListener(NbtMap networkCodec) { @Override public void connected(ConnectedEvent event) { - event.getSession().setFlag(MinecraftConstants.PING_KEY, 0L); + Session session = event.getSession(); + session.setFlag(MinecraftConstants.PING_KEY, 0L); } @Override public void packetReceived(Session session, Packet packet) { MinecraftProtocol protocol = (MinecraftProtocol) session.getPacketProtocol(); - if (protocol.getState() == ProtocolState.HANDSHAKE) { + if (protocol.getInboundState() == ProtocolState.HANDSHAKE) { if (packet instanceof ClientIntentionPacket intentionPacket) { switch (intentionPacket.getIntent()) { - case STATUS -> protocol.setState(ProtocolState.STATUS); - case TRANSFER -> { - if (!session.getFlag(MinecraftConstants.ACCEPT_TRANSFERS_KEY, false)) { - session.disconnect(Component.translatable("multiplayer.disconnect.transfers_disabled")); - } - } - case LOGIN -> { - protocol.setState(ProtocolState.LOGIN); - if (intentionPacket.getProtocolVersion() > protocol.getCodec().getProtocolVersion()) { - session.disconnect(Component.translatable("multiplayer.disconnect.incompatible", Component.text(protocol.getCodec().getMinecraftVersion()))); - } else if (intentionPacket.getProtocolVersion() < protocol.getCodec().getProtocolVersion()) { - session.disconnect(Component.translatable("multiplayer.disconnect.outdated_client", Component.text(protocol.getCodec().getMinecraftVersion()))); - } + case STATUS -> { + protocol.setOutboundState(ProtocolState.STATUS); + session.switchInboundState(() -> protocol.setInboundState(ProtocolState.STATUS)); } + case TRANSFER -> beginLogin(session, protocol, intentionPacket, true); + case LOGIN -> beginLogin(session, protocol, intentionPacket, false); default -> throw new UnsupportedOperationException("Invalid client intent: " + intentionPacket.getIntent()); } } - } else if (protocol.getState() == ProtocolState.LOGIN) { + } else if (protocol.getInboundState() == ProtocolState.LOGIN) { if (packet instanceof ServerboundHelloPacket helloPacket) { this.username = helloPacket.getUsername(); if (session.getFlag(MinecraftConstants.ENCRYPT_CONNECTION, true)) { session.send(new ClientboundHelloPacket(SERVER_ID, KEY_PAIR.getPublic(), this.challenge, session.getFlag(MinecraftConstants.SHOULD_AUTHENTICATE, true))); } else { - new Thread(new UserAuthTask(session, false, null)).start(); + new Thread(() -> authenticate(session, false, null)).start(); } } else if (packet instanceof ServerboundKeyPacket keyPacket) { PrivateKey privateKey = KEY_PAIR.getPrivate(); @@ -132,9 +126,15 @@ public void packetReceived(Session session, Packet packet) { SecretKey key = keyPacket.getSecretKey(privateKey); session.setEncryption(protocol.createEncryption(key)); - new Thread(new UserAuthTask(session, session.getFlag(MinecraftConstants.SHOULD_AUTHENTICATE, true), key)).start(); + new Thread(() -> authenticate(session, session.getFlag(MinecraftConstants.SHOULD_AUTHENTICATE, true), key)).start(); } else if (packet instanceof ServerboundLoginAcknowledgedPacket) { - protocol.setState(ProtocolState.CONFIGURATION); + protocol.setOutboundState(ProtocolState.CONFIGURATION); + session.switchInboundState(() -> protocol.setInboundState(ProtocolState.CONFIGURATION)); + keepAliveState = new KeepAliveState(); + if (session.getFlag(MinecraftConstants.AUTOMATIC_KEEP_ALIVE_MANAGEMENT, true)) { + // If keepalive state is null, lets assume there is no keepalive thread yet + new Thread(() -> keepAlive(session)).start(); + } // Credit ViaVersion: https://github.com/ViaVersion/ViaVersion/blob/dev/common/src/main/java/com/viaversion/viaversion/protocols/protocol1_20_5to1_20_3/rewriter/EntityPacketRewriter1_20_5.java for (Map.Entry entry : networkCodec.entrySet()) { @@ -156,7 +156,7 @@ public void packetReceived(Session session, Packet packet) { session.send(new ClientboundFinishConfigurationPacket()); } - } else if (protocol.getState() == ProtocolState.STATUS) { + } else if (protocol.getInboundState() == ProtocolState.STATUS) { if (packet instanceof ServerboundStatusRequestPacket) { ServerInfoBuilder builder = session.getFlag(MinecraftConstants.SERVER_INFO_BUILDER_KEY); if (builder == null) { @@ -174,103 +174,132 @@ public void packetReceived(Session session, Packet packet) { } else if (packet instanceof ServerboundPingRequestPacket pingRequestPacket) { session.send(new ClientboundPongResponsePacket(pingRequestPacket.getPingTime())); } - } else if (protocol.getState() == ProtocolState.GAME) { + } else if (protocol.getInboundState() == ProtocolState.GAME) { if (packet instanceof ServerboundKeepAlivePacket keepAlivePacket) { - if (keepAlivePacket.getPingId() == this.lastPingId) { - long time = System.currentTimeMillis() - this.lastPingTime; - session.setFlag(MinecraftConstants.PING_KEY, time); - } + handleKeepAlive(session, keepAlivePacket); } else if (packet instanceof ServerboundConfigurationAcknowledgedPacket) { - protocol.setState(ProtocolState.CONFIGURATION); + // The developer who sends ClientboundStartConfigurationPacket needs to setOutboundState to CONFIGURATION + // after sending the packet. We can't do it in this class because it needs to be a method call right after it was sent. + // Using nettys event loop to change outgoing state may cause differences to vanilla. + session.switchInboundState(() -> protocol.setInboundState(ProtocolState.CONFIGURATION)); + keepAliveState = new KeepAliveState(); } else if (packet instanceof ServerboundPingRequestPacket pingRequestPacket) { session.send(new ClientboundPongResponsePacket(pingRequestPacket.getPingTime())); session.disconnect(Component.translatable("multiplayer.status.request_handled")); } - } else if (protocol.getState() == ProtocolState.CONFIGURATION) { - if (packet instanceof ServerboundFinishConfigurationPacket) { - protocol.setState(ProtocolState.GAME); + } else if (protocol.getInboundState() == ProtocolState.CONFIGURATION) { + if (packet instanceof ServerboundKeepAlivePacket keepAlivePacket) { + handleKeepAlive(session, keepAlivePacket); + } else if (packet instanceof ServerboundFinishConfigurationPacket) { + protocol.setOutboundState(ProtocolState.GAME); + session.switchInboundState(() -> protocol.setInboundState(ProtocolState.GAME)); + keepAliveState = new KeepAliveState(); ServerLoginHandler handler = session.getFlag(MinecraftConstants.SERVER_LOGIN_HANDLER_KEY); if (handler != null) { handler.loggedIn(session); } + } + } + } - if (session.getFlag(MinecraftConstants.AUTOMATIC_KEEP_ALIVE_MANAGEMENT, true)) { - new Thread(new KeepAliveTask(session)).start(); - } + private void handleKeepAlive(Session session, ServerboundKeepAlivePacket keepAlivePacket) { + KeepAliveState currentKeepAliveState = this.keepAliveState; + if (currentKeepAliveState != null) { + if (currentKeepAliveState.keepAlivePending && keepAlivePacket.getPingId() == currentKeepAliveState.keepAliveChallenge) { + currentKeepAliveState.keepAlivePending = false; + session.setFlag(MinecraftConstants.PING_KEY, System.currentTimeMillis() - currentKeepAliveState.keepAliveTime); + } else { + session.disconnect(Component.translatable("disconnect.timeout")); } } } - @Override - public void packetSent(Session session, Packet packet) { - if (packet instanceof ClientboundLoginCompressionPacket loginCompressionPacket) { - int threshold = loginCompressionPacket.getThreshold(); - session.setCompression(threshold >= 0 ? - new CompressionConfig(threshold, new ZlibCompression(), true) : null); - session.send(new ClientboundGameProfilePacket(session.getFlag(MinecraftConstants.PROFILE_KEY), true)); + private void beginLogin(Session session, MinecraftProtocol protocol, ClientIntentionPacket packet, boolean transferred) { + isTransfer = transferred; + protocol.setOutboundState(ProtocolState.LOGIN); + if (transferred && !session.getFlag(MinecraftConstants.ACCEPT_TRANSFERS_KEY)) { + session.disconnect(Component.translatable("multiplayer.disconnect.transfers_disabled")); + } else if (packet.getProtocolVersion() > protocol.getCodec().getProtocolVersion()) { + session.disconnect(Component.translatable("multiplayer.disconnect.incompatible", Component.text(protocol.getCodec().getMinecraftVersion()))); + } else if (packet.getProtocolVersion() < protocol.getCodec().getProtocolVersion()) { + session.disconnect(Component.translatable("multiplayer.disconnect.outdated_client", Component.text(protocol.getCodec().getMinecraftVersion()))); + } else { + session.switchInboundState(() -> protocol.setInboundState(ProtocolState.LOGIN)); } } @Override public void disconnecting(DisconnectingEvent event) { - MinecraftProtocol protocol = (MinecraftProtocol) event.getSession().getPacketProtocol(); - if (protocol.getState() == ProtocolState.LOGIN) { - event.getSession().send(new ClientboundLoginDisconnectPacket(event.getReason())); - } else if (protocol.getState() == ProtocolState.GAME) { - event.getSession().send(new ClientboundDisconnectPacket(event.getReason())); + Session session = event.getSession(); + MinecraftProtocol protocol = (MinecraftProtocol) session.getPacketProtocol(); + if (protocol.getOutboundState() == ProtocolState.LOGIN) { + session.send(new ClientboundLoginDisconnectPacket(event.getReason())); + } else if (protocol.getOutboundState() == ProtocolState.GAME) { + session.send(new ClientboundDisconnectPacket(event.getReason())); } } - @RequiredArgsConstructor - private class UserAuthTask implements Runnable { - private final Session session; - private final boolean shouldAuthenticate; - private final SecretKey key; - - @Override - public void run() { - GameProfile profile; - if (this.shouldAuthenticate && this.key != null) { - SessionService sessionService = this.session.getFlag(MinecraftConstants.SESSION_SERVICE_KEY, new SessionService()); - try { - profile = sessionService.getProfileByServer(username, SessionService.getServerId(SERVER_ID, KEY_PAIR.getPublic(), this.key)); - } catch (IOException e) { - session.disconnect(Component.translatable("multiplayer.disconnect.authservers_down"), e); - return; - } + private void authenticate(Session session, boolean shouldAuthenticate, SecretKey key) { + GameProfile profile; + if (shouldAuthenticate && key != null) { + SessionService sessionService = session.getFlag(MinecraftConstants.SESSION_SERVICE_KEY, new SessionService()); + try { + profile = sessionService.getProfileByServer(username, SessionService.getServerId(SERVER_ID, KEY_PAIR.getPublic(), key)); + } catch (IOException e) { + session.disconnect(Component.translatable("multiplayer.disconnect.authservers_down"), e); + return; + } - if (profile == null) { - session.disconnect(Component.translatable("multiplayer.disconnect.unverified_username")); - return; - } - } else { - profile = new GameProfile(UUID.nameUUIDFromBytes(("OfflinePlayer:" + username).getBytes()), username); + if (profile == null) { + session.disconnect(Component.translatable("multiplayer.disconnect.unverified_username")); + return; } + } else { + profile = new GameProfile(UUID.nameUUIDFromBytes(("OfflinePlayer:" + username).getBytes()), username); + } - this.session.setFlag(MinecraftConstants.PROFILE_KEY, profile); + session.setFlag(MinecraftConstants.PROFILE_KEY, profile); - int threshold = session.getFlag(MinecraftConstants.SERVER_COMPRESSION_THRESHOLD, DEFAULT_COMPRESSION_THRESHOLD); - this.session.send(new ClientboundLoginCompressionPacket(threshold)); + int threshold = session.getFlag(MinecraftConstants.SERVER_COMPRESSION_THRESHOLD, DEFAULT_COMPRESSION_THRESHOLD); + if (threshold >= 0) { + session.send(new ClientboundLoginCompressionPacket(threshold), () -> + session.setCompression(new CompressionConfig(threshold, new ZlibCompression(), true))); } + + session.send(new ClientboundGameProfilePacket(profile, true)); } - @RequiredArgsConstructor - private class KeepAliveTask implements Runnable { - private final Session session; + private void keepAlive(Session session) { + while (session.isConnected()) { + KeepAliveState currentKeepAliveState = this.keepAliveState; + if (currentKeepAliveState != null) { + if (System.currentTimeMillis() - currentKeepAliveState.keepAliveTime >= 15000L) { + if (currentKeepAliveState.keepAlivePending) { + session.disconnect(Component.translatable("disconnect.timeout")); + break; + } - @Override - public void run() { - while (this.session.isConnected()) { - lastPingTime = System.currentTimeMillis(); - lastPingId = (int) lastPingTime; - this.session.send(new ClientboundKeepAlivePacket(lastPingId)); + long time = System.currentTimeMillis(); - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - break; + currentKeepAliveState.keepAlivePending = true; + currentKeepAliveState.keepAliveChallenge = time; + currentKeepAliveState.keepAliveTime = time; + session.send(new ClientboundKeepAlivePacket(currentKeepAliveState.keepAliveChallenge)); } } + + // TODO: Implement proper tick loop rather than sleeping + try { + Thread.sleep(50); + } catch (InterruptedException e) { + break; + } } } + + private static class KeepAliveState { + private boolean keepAlivePending; + private long keepAliveChallenge; + private long keepAliveTime = System.currentTimeMillis(); + } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/configuration/clientbound/ClientboundFinishConfigurationPacket.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/configuration/clientbound/ClientboundFinishConfigurationPacket.java index 3c3afc8bd..f248a38c3 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/configuration/clientbound/ClientboundFinishConfigurationPacket.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/configuration/clientbound/ClientboundFinishConfigurationPacket.java @@ -15,4 +15,9 @@ public ClientboundFinishConfigurationPacket(ByteBuf in, MinecraftCodecHelper hel @Override public void serialize(ByteBuf out, MinecraftCodecHelper helper) { } + + @Override + public boolean isTerminal() { + return true; + } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/configuration/serverbound/ServerboundFinishConfigurationPacket.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/configuration/serverbound/ServerboundFinishConfigurationPacket.java index dd573ad94..75147f6c1 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/configuration/serverbound/ServerboundFinishConfigurationPacket.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/configuration/serverbound/ServerboundFinishConfigurationPacket.java @@ -15,4 +15,9 @@ public ServerboundFinishConfigurationPacket(ByteBuf in, MinecraftCodecHelper hel public void serialize(ByteBuf buf, MinecraftCodecHelper helper) { } + + @Override + public boolean isTerminal() { + return true; + } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/handshake/serverbound/ClientIntentionPacket.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/handshake/serverbound/ClientIntentionPacket.java index 60254a3ac..d1799dbc2 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/handshake/serverbound/ClientIntentionPacket.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/handshake/serverbound/ClientIntentionPacket.java @@ -37,4 +37,9 @@ public void serialize(ByteBuf out, MinecraftCodecHelper helper) { public boolean isPriority() { return true; } + + @Override + public boolean isTerminal() { + return true; + } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/ingame/clientbound/ClientboundStartConfigurationPacket.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/ingame/clientbound/ClientboundStartConfigurationPacket.java index e6969d373..d7844e6e3 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/ingame/clientbound/ClientboundStartConfigurationPacket.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/ingame/clientbound/ClientboundStartConfigurationPacket.java @@ -15,4 +15,9 @@ public ClientboundStartConfigurationPacket(ByteBuf in, MinecraftCodecHelper help public void serialize(ByteBuf out, MinecraftCodecHelper helper) { } + + @Override + public boolean isTerminal() { + return true; + } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/ingame/serverbound/ServerboundConfigurationAcknowledgedPacket.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/ingame/serverbound/ServerboundConfigurationAcknowledgedPacket.java index 6665a56db..56ec5fdaa 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/ingame/serverbound/ServerboundConfigurationAcknowledgedPacket.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/ingame/serverbound/ServerboundConfigurationAcknowledgedPacket.java @@ -15,4 +15,9 @@ public ServerboundConfigurationAcknowledgedPacket(ByteBuf in, MinecraftCodecHelp public void serialize(ByteBuf out, MinecraftCodecHelper helper) { } + + @Override + public boolean isTerminal() { + return true; + } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/login/clientbound/ClientboundGameProfilePacket.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/login/clientbound/ClientboundGameProfilePacket.java index 561abcda8..37099e9ba 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/login/clientbound/ClientboundGameProfilePacket.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/login/clientbound/ClientboundGameProfilePacket.java @@ -35,4 +35,9 @@ public void serialize(ByteBuf out, MinecraftCodecHelper helper) { public boolean isPriority() { return true; } + + @Override + public boolean isTerminal() { + return true; + } } diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/login/serverbound/ServerboundLoginAcknowledgedPacket.java b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/login/serverbound/ServerboundLoginAcknowledgedPacket.java index 14800c6de..5370be57f 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/login/serverbound/ServerboundLoginAcknowledgedPacket.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/protocol/packet/login/serverbound/ServerboundLoginAcknowledgedPacket.java @@ -15,4 +15,9 @@ public ServerboundLoginAcknowledgedPacket(ByteBuf in, MinecraftCodecHelper helpe @Override public void serialize(ByteBuf out, MinecraftCodecHelper helper) { } + + @Override + public boolean isTerminal() { + return true; + } } diff --git a/protocol/src/test/resources/simplelogger.properties b/protocol/src/test/resources/simplelogger.properties new file mode 100644 index 000000000..beb56b2e1 --- /dev/null +++ b/protocol/src/test/resources/simplelogger.properties @@ -0,0 +1 @@ +org.slf4j.simpleLogger.defaultLogLevel=debug From fda25b59295a5401108564c24e254822910406fb Mon Sep 17 00:00:00 2001 From: Alex <40795980+AlexProgrammerDE@users.noreply.github.com> Date: Thu, 10 Oct 2024 17:59:34 +0200 Subject: [PATCH 6/6] Fix/proxy protocol (#862) --- .../network/tcp/TcpClientSession.java | 75 ++++++++++--------- 1 file changed, 41 insertions(+), 34 deletions(-) diff --git a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java index 9f1303635..4c3ac62ff 100644 --- a/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java +++ b/protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java @@ -4,6 +4,8 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.AddressedEnvelope; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -153,42 +155,39 @@ private InetSocketAddress resolveAddress() { log.debug("Attempting SRV lookup for \"{}\".", name); if (getFlag(BuiltinFlags.ATTEMPT_SRV_RESOLVE, true) && (!this.host.matches(IP_REGEX) && !this.host.equalsIgnoreCase("localhost"))) { - AddressedEnvelope envelope = null; try (DnsNameResolver resolver = new DnsNameResolverBuilder(EVENT_LOOP_GROUP.next()) .channelFactory(TRANSPORT_TYPE.datagramChannelFactory()) .build()) { - envelope = resolver.query(new DefaultDnsQuestion(name, DnsRecordType.SRV)).get(); - - DnsResponse response = envelope.content(); - if (response.count(DnsSection.ANSWER) > 0) { - DefaultDnsRawRecord record = response.recordAt(DnsSection.ANSWER, 0); - if (record.type() == DnsRecordType.SRV) { - ByteBuf buf = record.content(); - buf.skipBytes(4); // Skip priority and weight. - - int port = buf.readUnsignedShort(); - String host = DefaultDnsRecordDecoder.decodeName(buf); - if (host.endsWith(".")) { - host = host.substring(0, host.length() - 1); + AddressedEnvelope envelope = resolver.query(new DefaultDnsQuestion(name, DnsRecordType.SRV)).get(); + try { + DnsResponse response = envelope.content(); + if (response.count(DnsSection.ANSWER) > 0) { + DefaultDnsRawRecord record = response.recordAt(DnsSection.ANSWER, 0); + if (record.type() == DnsRecordType.SRV) { + ByteBuf buf = record.content(); + buf.skipBytes(4); // Skip priority and weight. + + int port = buf.readUnsignedShort(); + String host = DefaultDnsRecordDecoder.decodeName(buf); + if (host.endsWith(".")) { + host = host.substring(0, host.length() - 1); + } + + log.debug("Found SRV record containing \"{}:{}\".", host, port); + + this.host = host; + this.port = port; + } else { + log.debug("Received non-SRV record in response."); } - - log.debug("Found SRV record containing \"{}:{}\".", host, port); - - this.host = host; - this.port = port; } else { - log.debug("Received non-SRV record in response."); + log.debug("No SRV record found."); } - } else { - log.debug("No SRV record found."); + } finally { + envelope.release(); } } catch (Exception e) { log.debug("Failed to resolve SRV record.", e); - } finally { - if (envelope != null) { - envelope.release(); - } - } } else { log.debug("Not resolving SRV record for {}", this.host); @@ -243,13 +242,21 @@ private void initializeHAProxySupport(Channel channel) { } channel.pipeline().addLast("proxy-protocol-encoder", HAProxyMessageEncoder.INSTANCE); - HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet4Address ? HAProxyProxiedProtocol.TCP4 : HAProxyProxiedProtocol.TCP6; - InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress(); - channel.writeAndFlush(new HAProxyMessage( - HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol, - clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(), - clientAddress.getPort(), remoteAddress.getPort() - )).addListener(future -> channel.pipeline().remove("proxy-protocol-encoder")); + channel.pipeline().addLast("proxy-protocol-packet-sender", new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); + HAProxyProxiedProtocol proxiedProtocol = clientAddress.getAddress() instanceof Inet4Address ? HAProxyProxiedProtocol.TCP4 : HAProxyProxiedProtocol.TCP6; + ctx.channel().writeAndFlush(new HAProxyMessage( + HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol, + clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(), + clientAddress.getPort(), remoteAddress.getPort() + )).addListener(future -> channel.pipeline().remove("proxy-protocol-encoder")); + ctx.pipeline().remove(this); + + super.channelActive(ctx); + } + }); } private static void createTcpEventLoopGroup() {