Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split incoming/outgoing packet registry, transition protocol states correctly #841

Merged
merged 58 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
b6a7b0a
Initial code changes
AlexProgrammerDE Jul 20, 2024
0d18334
Make it compile
AlexProgrammerDE Jul 20, 2024
9f10790
Small inlining
AlexProgrammerDE Jul 20, 2024
def403a
Make less detectable by anticheats and fix keepalive during configura…
AlexProgrammerDE Jul 20, 2024
f2d67ef
Fix keepalive edge case
AlexProgrammerDE Jul 21, 2024
e33042c
Properly switch inbound protocol in server listener
AlexProgrammerDE Jul 26, 2024
1341744
Add flow control
AlexProgrammerDE Jul 26, 2024
fccce03
Make toggling automatic keepalive work in another way
AlexProgrammerDE Jul 26, 2024
a7e539f
Remove ping pong packets again
AlexProgrammerDE Jul 26, 2024
2b310d2
Address review
AlexProgrammerDE Jul 28, 2024
72721b8
Handle keepalive in configuration
AlexProgrammerDE Jul 28, 2024
576b311
Only spawn keepalive after login is acknowledged
AlexProgrammerDE Jul 28, 2024
68a426d
Prevent very unlikely race conditions with keepalive being switched d…
AlexProgrammerDE Jul 28, 2024
2ae3b2f
Add debug log for packet serialization and state switching
AlexProgrammerDE Aug 19, 2024
5d90ce1
Add one more debug print
AlexProgrammerDE Aug 19, 2024
2439c69
Update protocol/src/main/java/org/geysermc/mcprotocollib/network/Sess…
AlexProgrammerDE Aug 20, 2024
087e644
Update protocol/src/main/java/org/geysermc/mcprotocollib/protocol/Min…
AlexProgrammerDE Aug 20, 2024
608b03d
Update protocol/src/main/java/org/geysermc/mcprotocollib/protocol/Min…
AlexProgrammerDE Aug 20, 2024
b54a21b
Mark packet as nonnull
AlexProgrammerDE Aug 20, 2024
618c0b4
Merge remote-tracking branch 'origin/splitting-v3' into splitting-v3
AlexProgrammerDE Aug 20, 2024
324f066
Fix outbound writing race conditions
AlexProgrammerDE Aug 25, 2024
0ff09d1
Ensure packets are always sent on the event loop
AlexProgrammerDE Sep 9, 2024
b58eb3a
Merge remote-tracking branch 'upstream/master' into splitting-v3
AlexProgrammerDE Sep 10, 2024
8bafb95
Reduce log verbosity
AlexProgrammerDE Sep 10, 2024
a19d1d2
Put errors into debug
AlexProgrammerDE Sep 10, 2024
c838e00
Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/…
AlexProgrammerDE Sep 11, 2024
611cd8e
Add comment to always running in event loop
AlexProgrammerDE Sep 11, 2024
ef81342
Merge remote-tracking branch 'origin/splitting-v3' into splitting-v3
AlexProgrammerDE Sep 11, 2024
b67d76c
Handle auto read earlier to prevent race conditions
AlexProgrammerDE Sep 11, 2024
7f8affb
Make instance dynamic
AlexProgrammerDE Sep 11, 2024
3111bd1
Revert "Make instance dynamic"
AlexProgrammerDE Sep 11, 2024
9706bb3
Make flush packet priority
AlexProgrammerDE Sep 11, 2024
d4eddfa
Do not hide original line that is the cause of the exception
AlexProgrammerDE Sep 12, 2024
6507e77
Cancel packet using exception rather than return
AlexProgrammerDE Sep 12, 2024
4e2b64d
Properly iterate through parents
AlexProgrammerDE Sep 12, 2024
e82133d
Set log level to debug for unit tests
AlexProgrammerDE Sep 12, 2024
54228e5
Revert "Properly iterate through parents"
AlexProgrammerDE Sep 12, 2024
c57de2a
Revert "Cancel packet using exception rather than return"
AlexProgrammerDE Sep 12, 2024
d84ce87
Add write length filter
AlexProgrammerDE Sep 12, 2024
793dfae
Reuse bytebuf for fake flush to avoid unnecessary allocations
AlexProgrammerDE Sep 12, 2024
187e2c5
Make tests happy
AlexProgrammerDE Sep 12, 2024
615cec8
Remake dropping packets
AlexProgrammerDE Sep 12, 2024
792519a
Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/…
AlexProgrammerDE Sep 12, 2024
a12a952
Fix space
AlexProgrammerDE Sep 12, 2024
f9b86e5
Rename to flush packet
AlexProgrammerDE Sep 12, 2024
0b28bf7
Add mojmap reference
AlexProgrammerDE Sep 12, 2024
e4976d9
Share keepalive code
AlexProgrammerDE Sep 13, 2024
6cee7c1
Merge remote-tracking branch 'upstream/master' into splitting-v3
AlexProgrammerDE Sep 17, 2024
9a576e0
Merge remote-tracking branch 'upstream/master' into splitting-v3
AlexProgrammerDE Sep 19, 2024
fc868b6
Fix compilation
AlexProgrammerDE Sep 19, 2024
59fcdf1
Revert a tiny bit closer to vanilla
AlexProgrammerDE Sep 19, 2024
549170a
Inline lambda
AlexProgrammerDE Sep 19, 2024
8f85079
Inherit annotation
AlexProgrammerDE Sep 19, 2024
f3d32cd
Inherit annotation 2
AlexProgrammerDE Sep 19, 2024
ae1e7ed
Use checkerframework annotation
AlexProgrammerDE Sep 20, 2024
f72438b
Fixup grammar slightly
AlexProgrammerDE Sep 25, 2024
c36d179
Add reset states method
AlexProgrammerDE Sep 26, 2024
5090ca4
Add log marker for packet logging
AlexProgrammerDE Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ public void newServerSession(Server server, Session session) {
}

@Override
public PacketRegistry getPacketRegistry() {
public PacketRegistry getInboundPacketRegistry() {
return registry;
}

@Override
public PacketRegistry getOutboundPacketRegistry() {
return registry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void packetReceived(Session session, Packet packet) {
@Override
public void sessionRemoved(SessionRemovedEvent event) {
MinecraftProtocol protocol = (MinecraftProtocol) event.getSession().getPacketProtocol();
if (protocol.getState() == ProtocolState.GAME) {
if (protocol.getOutboundState() == ProtocolState.GAME) {
log.info("Closing server.");
event.getServer().close(false);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.geysermc.mcprotocollib.network;

import io.netty.channel.Channel;
import net.kyori.adventure.text.Component;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -10,6 +11,7 @@
import org.geysermc.mcprotocollib.network.event.session.SessionListener;
import org.geysermc.mcprotocollib.network.packet.Packet;
import org.geysermc.mcprotocollib.network.packet.PacketProtocol;
import org.geysermc.mcprotocollib.network.tcp.FlushHandler;

import java.net.SocketAddress;
import java.util.List;
Expand Down Expand Up @@ -212,7 +214,17 @@ public interface Session {
*
* @param packet Packet to send.
*/
void send(Packet packet);
default void send(@NonNull Packet packet) {
this.send(packet, null);
}

/**
* Sends a packet and runs the specified callback when the packet has been sent.
*
* @param packet Packet to send.
* @param onSent Callback to run when the packet has been sent.
*/
void send(@NonNull Packet packet, @Nullable Runnable onSent);

/**
* Disconnects the session.
Expand Down Expand Up @@ -255,4 +267,48 @@ default void disconnect(@NonNull Component reason) {
* @param cause Throwable responsible for disconnecting.
*/
void disconnect(@NonNull Component reason, @Nullable Throwable cause);

/**
* Auto read in netty means that the server is automatically reading from the channel.
* Turning it off means that we won't get more packets being decoded until we turn it back on.
* We use this to hold off on reading packets until we are ready to process them.
* For example this is used for switching inbound states with {@link #switchInboundState(Runnable)}.
*
* @param autoRead Whether to enable auto read.
* Default is true.
*/
void setAutoRead(boolean autoRead);

/**
* Returns the underlying netty channel of this session.
*
* @return The netty channel
*/
Channel getChannel();
Konicai marked this conversation as resolved.
Show resolved Hide resolved

/**
* Changes the inbound state of the session and then re-enables auto read.
* This is used after a terminal packet was handled and the session is ready to receive more packets in the new state.
*
* @param switcher The runnable that switches the inbound state.
*/
default void switchInboundState(Runnable switcher) {
switcher.run();

// We switched to the new inbound state
// we can start reading again
setAutoRead(true);
}

/**
* Flushes all packets that are due to be sent and changes the outbound state of the session.
* This makes sure no other threads have scheduled packets to be sent.
*
* @param switcher The runnable that switches the outbound state.
*/
default void switchOutboundState(Runnable switcher) {
getChannel().writeAndFlush(FlushHandler.FLUSH_PACKET).syncUninterruptibly();

switcher.run();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.geysermc.mcprotocollib.network.packet;

import io.netty.buffer.ByteBuf;
import org.geysermc.mcprotocollib.network.Session;

/**
* A network packet. Any given packet must have a constructor that takes in a {@link ByteBuf}.
Expand All @@ -17,4 +18,14 @@ public interface Packet {
default boolean isPriority() {
return false;
}

/**
* Returns whether the packet is terminal. If true, this should be the last packet sent inside a protocol state.
* Subsequently, {@link Session#setAutoRead(boolean)} should be disabled when a terminal packet is received, until the session has switched into a new state and is ready to receive more packets.
*
* @return Whether the packet is terminal.
*/
default boolean isTerminal() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,16 @@ public abstract class PacketProtocol {
public abstract void newServerSession(Server server, Session session);

/**
* Gets the packet registry for this protocol.
* Gets the inbound packet registry for this protocol.
*
* @return The protocol's packet registry.
* @return The protocol's inbound packet registry.
*/
public abstract PacketRegistry getPacketRegistry();
public abstract PacketRegistry getInboundPacketRegistry();

/**
* Gets the outbound packet registry for this protocol.
*
* @return The protocol's outbound packet registry.
*/
public abstract PacketRegistry getOutboundPacketRegistry();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.geysermc.mcprotocollib.network.tcp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
* Sending a {@link FlushPacket} will ensure all before were sent.
* This handler makes sure it's dropped before it reaches the encoder.
* This logic is similar to the Minecraft UnconfiguredPipelineHandler.OutboundConfigurationTask.
*/
public class FlushHandler extends ChannelOutboundHandlerAdapter {
public static final FlushPacket FLUSH_PACKET = new FlushPacket();

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg == FLUSH_PACKET) {
promise.setSuccess();
} else {
super.write(ctx, msg, promise);
}
}

public static class FlushPacket {
private FlushPacket() {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.geysermc.mcprotocollib.network.BuiltinFlags;
import org.geysermc.mcprotocollib.network.ProxyInfo;
import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper;
Expand Down Expand Up @@ -100,7 +101,7 @@ public void connect(boolean wait, boolean transferring) {
.localAddress(bindAddress, bindPort)
.handler(new ChannelInitializer<>() {
@Override
public void initChannel(Channel channel) {
public void initChannel(@NonNull Channel channel) {
PacketProtocol protocol = getPacketProtocol();
protocol.newClientSession(TcpClientSession.this, transferring);

Expand All @@ -117,7 +118,9 @@ public void initChannel(Channel channel) {
pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), getCodecHelper()));
pipeline.addLast("compression", new TcpPacketCompression(getCodecHelper()));

pipeline.addLast("flow-control", new TcpFlowControlHandler());
pipeline.addLast("codec", new TcpPacketCodec(TcpClientSession.this, true));
pipeline.addLast("flush-handler", new FlushHandler());
pipeline.addLast("manager", TcpClientSession.this);
}
});
Expand Down Expand Up @@ -246,9 +249,7 @@ private void initializeHAProxySupport(Channel channel) {
HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol,
clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(),
clientAddress.getPort(), remoteAddress.getPort()
)).addListener(future -> {
channel.pipeline().remove("proxy-protocol-encoder");
});
)).addListener(future -> channel.pipeline().remove("proxy-protocol-encoder"));
}

private static void createTcpEventLoopGroup() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.geysermc.mcprotocollib.network.tcp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.flow.FlowControlHandler;

/**
* A flow control handler for TCP connections.
* When auto-read is disabled, this will halt decoding of packets until auto-read is re-enabled.
* This is needed because auto-read still allows packets to be decoded, even if the channel is not reading anymore from the network.
* This can happen when the channel already read a packet, but the packet is not yet decoded.
* This will halt all decoding until the channel is ready to process more packets.
*/
public class TcpFlowControlHandler extends FlowControlHandler {
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().config().isAutoRead()) {
super.read(ctx);
}
AlexProgrammerDE marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,27 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToMessageCodec;
import org.geysermc.mcprotocollib.network.Session;
import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper;
import org.geysermc.mcprotocollib.network.codec.PacketDefinition;
import org.geysermc.mcprotocollib.network.event.session.PacketErrorEvent;
import org.geysermc.mcprotocollib.network.packet.Packet;
import org.geysermc.mcprotocollib.network.packet.PacketProtocol;
import org.geysermc.mcprotocollib.network.packet.PacketRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

import java.util.List;

public class TcpPacketCodec extends ByteToMessageCodec<Packet> {
public class TcpPacketCodec extends MessageToMessageCodec<ByteBuf, Packet> {
private static final Marker marker = MarkerFactory.getMarker("packet_logging");
private static final Logger log = LoggerFactory.getLogger(TcpPacketCodec.class);

private final Session session;
private final boolean client;

Expand All @@ -23,57 +33,87 @@ public TcpPacketCodec(Session session, boolean client) {

@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf buf) {
int initial = buf.writerIndex();
public void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) {
if (log.isTraceEnabled()) {
log.trace(marker, "Encoding packet: {}", packet.getClass().getSimpleName());
}

PacketProtocol packetProtocol = this.session.getPacketProtocol();
PacketRegistry packetRegistry = packetProtocol.getOutboundPacketRegistry();
PacketCodecHelper codecHelper = this.session.getCodecHelper();
try {
int packetId = this.client ? packetProtocol.getPacketRegistry().getServerboundId(packet) : packetProtocol.getPacketRegistry().getClientboundId(packet);
PacketDefinition definition = this.client ? packetProtocol.getPacketRegistry().getServerboundDefinition(packetId) : packetProtocol.getPacketRegistry().getClientboundDefinition(packetId);
int packetId = this.client ? packetRegistry.getServerboundId(packet) : packetRegistry.getClientboundId(packet);
PacketDefinition definition = this.client ? packetRegistry.getServerboundDefinition(packetId) : packetRegistry.getClientboundDefinition(packetId);

ByteBuf buf = ctx.alloc().buffer();
packetProtocol.getPacketHeader().writePacketId(buf, codecHelper, packetId);
definition.getSerializer().serialize(buf, codecHelper, packet);

out.add(buf);

if (log.isDebugEnabled()) {
log.debug(marker, "Encoded packet {} ({})", packet.getClass().getSimpleName(), packetId);
}
} catch (Throwable t) {
// Reset writer index to make sure incomplete data is not written out.
buf.writerIndex(initial);
log.debug(marker, "Error encoding packet", t);

PacketErrorEvent e = new PacketErrorEvent(this.session, t);
this.session.callEvent(e);
if (!e.shouldSuppress()) {
throw t;
throw new EncoderException(t);
}
}
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
// Vanilla also checks for 0 length
if (buf.readableBytes() == 0) {
return;
}

int initial = buf.readerIndex();

PacketProtocol packetProtocol = this.session.getPacketProtocol();
PacketRegistry packetRegistry = packetProtocol.getInboundPacketRegistry();
PacketCodecHelper codecHelper = this.session.getCodecHelper();
Packet packet = null;
try {
int id = packetProtocol.getPacketHeader().readPacketId(buf, codecHelper);
if (id == -1) {
buf.readerIndex(initial);
return;
}

Packet packet = this.client ? packetProtocol.getPacketRegistry().createClientboundPacket(id, buf, codecHelper) : packetProtocol.getPacketRegistry().createServerboundPacket(id, buf, codecHelper);
log.trace(marker, "Decoding packet with id: {}", id);

packet = this.client ? packetRegistry.createClientboundPacket(id, buf, codecHelper) : packetRegistry.createServerboundPacket(id, buf, codecHelper);

if (buf.readableBytes() > 0) {
throw new IllegalStateException("Packet \"" + packet.getClass().getSimpleName() + "\" not fully read.");
}

out.add(packet);

if (log.isDebugEnabled()) {
log.debug(marker, "Decoded packet {} ({})", packet.getClass().getSimpleName(), id);
}
} catch (Throwable t) {
log.debug(marker, "Error decoding packet", t);

// Advance buffer to end to make sure remaining data in this packet is skipped.
buf.readerIndex(buf.readerIndex() + buf.readableBytes());

PacketErrorEvent e = new PacketErrorEvent(this.session, t);
this.session.callEvent(e);
if (!e.shouldSuppress()) {
throw t;
throw new DecoderException(t);
}
} finally {
if (packet != null && packet.isTerminal()) {
// Next packets are in a different protocol state, so we must
// disable auto-read to prevent reading wrong packets.
session.setAutoRead(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.geysermc.mcprotocollib.network.AbstractServer;
import org.geysermc.mcprotocollib.network.BuiltinFlags;
import org.geysermc.mcprotocollib.network.helper.TransportHelper;
Expand Down Expand Up @@ -52,7 +53,7 @@ public void bindImpl(boolean wait, final Runnable callback) {
.localAddress(this.getHost(), this.getPort())
.childHandler(new ChannelInitializer<>() {
@Override
public void initChannel(Channel channel) {
public void initChannel(@NonNull Channel channel) {
InetSocketAddress address = (InetSocketAddress) channel.remoteAddress();
PacketProtocol protocol = createPacketProtocol();

Expand All @@ -68,7 +69,9 @@ public void initChannel(Channel channel) {
pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), session.getCodecHelper()));
pipeline.addLast("compression", new TcpPacketCompression(session.getCodecHelper()));

pipeline.addLast("flow-control", new TcpFlowControlHandler());
pipeline.addLast("codec", new TcpPacketCodec(session, false));
pipeline.addLast("flush-handler", new FlushHandler());
pipeline.addLast("manager", session);
}
});
Expand Down
Loading
Loading