From 5b63b1e09e02ca8d61ebcd12b289c975724cd439 Mon Sep 17 00:00:00 2001 From: Jackson Date: Tue, 12 Dec 2023 14:54:04 +1300 Subject: [PATCH] Reliable fixes --- Common/Networking/Channels/ReliableChannel.cs | 48 ++++--- Common/Networking/Peers/PeerBase.Mtu.cs | 5 - Common/Networking/Peers/PeerBase.cs | 16 +-- Common/Networking/README.md.meta | 7 - Runtime/PromulTransport.cs | 123 ++++++++---------- Tests~/NetworkCoreTests/LargeDataTests.cs | 1 + 6 files changed, 94 insertions(+), 106 deletions(-) delete mode 100644 Common/Networking/README.md.meta diff --git a/Common/Networking/Channels/ReliableChannel.cs b/Common/Networking/Channels/ReliableChannel.cs index 9e33007..fa6d773 100644 --- a/Common/Networking/Channels/ReliableChannel.cs +++ b/Common/Networking/Channels/ReliableChannel.cs @@ -20,7 +20,6 @@ internal sealed class ReliableChannel : ChannelBase private readonly SemaphoreSlim _pendingPacketsSem = new(1, 1); - private readonly SemaphoreSlim _pendingPacketsSemaphore = new(1, 1); private readonly NetworkPacket?[]? _receivedPackets; //for order private readonly int _windowSize; @@ -83,7 +82,7 @@ private async ValueTask ProcessAckAsync(NetworkPacket packet) return; } - await _pendingPacketsSemaphore.WaitAsync(); + await _pendingPacketsSem.WaitAsync(); try { for (var pendingSeq = _localWindowStart; @@ -91,11 +90,7 @@ private async ValueTask ProcessAckAsync(NetworkPacket packet) pendingSeq = (pendingSeq + 1) % NetConstants.MaxSequence) { var rel = NetUtils.RelativeSequenceNumber(pendingSeq, ackWindowStart); - if (rel >= _windowSize) - { - NetDebug.Write("[PA]REL: " + rel); - break; - } + if (rel >= _windowSize) break; var pendingIdx = pendingSeq % _windowSize; var currentByte = NetConstants.ChanneledHeaderSize + pendingIdx / BitsInByte; @@ -109,7 +104,7 @@ private async ValueTask ProcessAckAsync(NetworkPacket packet) } //Skip false ack - NetDebug.Write($"[PA]False ack: {pendingSeq}"); + Peer.LogDebug($"[Packet {packet.Sequence}] False ack for {pendingSeq}"); continue; } @@ -118,13 +113,18 @@ private async ValueTask ProcessAckAsync(NetworkPacket packet) _localWindowStart = (_localWindowStart + 1) % NetConstants.MaxSequence; //clear packet - if (await _pendingPackets[pendingIdx].ClearAsync(Peer)) - NetDebug.Write($"[PA]Received ack for sequence: {pendingSeq}"); + var p = _pendingPackets[pendingIdx]; + var r = await p.ClearAsync(Peer); + _pendingPackets[pendingIdx] = p; + if (r) + { + Peer.LogDebug($"[Packet {packet.Sequence}] Correct ack for {pendingSeq}"); + } } } finally { - _pendingPacketsSemaphore.Release(); + _pendingPacketsSem.Release(); } } @@ -151,7 +151,7 @@ protected override async Task FlushQueueAsync() try { await outgoingQueueSem.WaitAsync(); - try + try { while (OutgoingQueue.Count > 0) { @@ -162,7 +162,12 @@ protected override async Task FlushQueueAsync() var netPacket = OutgoingQueue.Dequeue(); netPacket.Sequence = (ushort)_localSequence; netPacket.ChannelId = _id; - _pendingPackets[_localSequence % _windowSize].Init(netPacket); + var prp = new PendingReliablePacket + { + _packet = netPacket + }; + _pendingPackets[_localSequence % _windowSize] = prp; + //_pendingPackets[_localSequence % _windowSize] = new PendingReliablePacket(); .Init(netPacket); _localSequence = (_localSequence + 1) % NetConstants.MaxSequence; } } @@ -175,8 +180,17 @@ protected override async Task FlushQueueAsync() for (var pendingSeq = _localWindowStart; pendingSeq != _localSequence; pendingSeq = (pendingSeq + 1) % NetConstants.MaxSequence) - if (await _pendingPackets[pendingSeq % _windowSize].TrySendAsync(currentTime, Peer)) + { + var p = _pendingPackets[pendingSeq % _windowSize]; + var sendSeq = await p.TrySendAsync(currentTime, Peer); + _pendingPackets[pendingSeq % _windowSize] = p; + //var sendSeq = await _pendingPackets[pendingSeq % _windowSize].TrySendAsync(currentTime, Peer); + Peer.LogDebug("Trying to send sequence number " + pendingSeq + ": " + sendSeq); + if (sendSeq) + { hasPendingPackets = true; + } + } } finally { @@ -281,7 +295,7 @@ public override async ValueTask HandlePacketAsync(NetworkPacket packet) //detailed check if (seq == _remoteSequence) { - Peer.LogDebug($"[Receive] {packet.Property} ({packet.Data.Count} bytes)"); + Peer.LogDebug($"[Receive] {packet.Property} ({packet.Data.Count} bytes) (sequence {packet.Sequence})"); await Peer.AddReliablePacket(_deliveryMethod, packet); _remoteSequence = (_remoteSequence + 1) % NetConstants.MaxSequence; @@ -326,7 +340,7 @@ public override async ValueTask HandlePacketAsync(NetworkPacket packet) private class PendingReliablePacket { private bool _isSent; - private NetworkPacket? _packet; + public NetworkPacket? _packet; private long _timeStamp; public override string ToString() @@ -344,7 +358,9 @@ public void Init(NetworkPacket packet) public async Task TrySendAsync(long utcNowTicks, PeerBase peer) { if (_packet == null) + { return false; + } if (_isSent) //check send time { diff --git a/Common/Networking/Peers/PeerBase.Mtu.cs b/Common/Networking/Peers/PeerBase.Mtu.cs index 679f976..3053c71 100644 --- a/Common/Networking/Peers/PeerBase.Mtu.cs +++ b/Common/Networking/Peers/PeerBase.Mtu.cs @@ -57,7 +57,6 @@ private async Task ProcessMtuPacketAsync(NetworkPacket packet) { case PacketProperty.MtuCheck: _mtuCheckAttempts = 0; - NetDebug.Write($"[MTU] Check OK for MTU value {frontCheck}. Sending back MTU OK: " + frontCheck); packet.Property = PacketProperty.MtuOk; await PromulManager.RawSendAsync(packet, EndPoint); break; @@ -84,10 +83,6 @@ private async Task ProcessMtuPacketAsync(NetworkPacket packet) _mtuNegotiationComplete = true; NetDebug.Write($"[MTU] Negotiation complete. MTU for this session: {MaximumTransferUnit}."); } - else - { - NetDebug.Write("[MTU] MTU confirm acknowledged. Setting MTU to " + MaximumTransferUnit); - } break; } diff --git a/Common/Networking/Peers/PeerBase.cs b/Common/Networking/Peers/PeerBase.cs index bd4a7ed..3c5319d 100644 --- a/Common/Networking/Peers/PeerBase.cs +++ b/Common/Networking/Peers/PeerBase.cs @@ -350,13 +350,13 @@ private async Task SendInternal( var packetDataSize = maxMtuCarryingCapacity - NetConstants.FragmentHeaderSize; var totalPackets = data.Count / packetDataSize + (data.Count % packetDataSize == 0 ? 0 : 1); - NetDebug.Write($@"Preparing to send {data.Count} bytes of fragmented data. - Complete data size (header + data): {completePackageSize} - Current MTU: {mtu} - Size of header for {property:G}: {headerSize} - Size of fragmentation header: {NetConstants.FragmentHeaderSize} - Maximum possible data per packet (MTU-header-fragment header): {packetDataSize} - That means we must send {totalPackets} total packets."); + // NetDebug.Write($@"Preparing to send {data.Count} bytes of fragmented data. + // Complete data size (header + data): {completePackageSize} + // Current MTU: {mtu} + // Size of header for {property:G}: {headerSize} + // Size of fragmentation header: {NetConstants.FragmentHeaderSize} + // Maximum possible data per packet (MTU-header-fragment header): {packetDataSize} + // That means we must send {totalPackets} total packets."); if (totalPackets > ushort.MaxValue) throw new TooBigPacketException("Data was split in " + totalPackets + " fragments, which exceeds " + @@ -678,7 +678,7 @@ internal async Task SendUserData(NetworkPacket packet) //if (mergedPacketSize + splitThreshold >= MaximumTransferUnit) //{ await PromulManager.RawSendAsync(packet, EndPoint); - LogDebug($"[Send] {packet.Property} ({packet.Data.Count} bytes)"); + LogDebug($"[Send] {packet.Property} ({packet.Data.Count} bytes) (sequence {packet.Sequence})"); //} // if (_mergePos + mergedPacketSize > MaximumTransferUnit) await SendMerged(); // diff --git a/Common/Networking/README.md.meta b/Common/Networking/README.md.meta deleted file mode 100644 index 645c7eb..0000000 --- a/Common/Networking/README.md.meta +++ /dev/null @@ -1,7 +0,0 @@ -fileFormatVersion: 2 -guid: 006ec120726235644b89c830c137e998 -TextScriptImporter: - externalObjects: {} - userData: - assetBundleName: - assetBundleVariant: diff --git a/Runtime/PromulTransport.cs b/Runtime/PromulTransport.cs index 624594a..21245d5 100644 --- a/Runtime/PromulTransport.cs +++ b/Runtime/PromulTransport.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.IO; using System.Linq; using System.Net; using System.Threading; @@ -7,57 +8,48 @@ using Promul.Common.Networking; using Promul.Common.Networking.Data; using Promul.Common.Structs; +using Unity.Netcode; +using UnityEngine; namespace Promul.Runtime { public class PromulTransport : NetworkTransport { - private readonly CancellationTokenSource _cts = new CancellationTokenSource(); - - private PromulManager _mPromulManager; - - private readonly ConcurrentQueue<(NetworkEvent, ulong, ArraySegment, float)> _queue = - new ConcurrentQueue<(NetworkEvent, ulong, ArraySegment, float)>(); - - private PeerBase? _relayPeer; + enum HostType + { + None, + Server, + Client + } + [Tooltip("The port of the relay server.")] + public ushort Port = 7777; [Tooltip("The address of the relay server.")] public string Address = "127.0.0.1"; + [Tooltip("Interval between ping packets used for detecting latency and checking connection, in seconds")] + public float PingInterval = 1f; [Tooltip("Maximum duration for a connection to survive without receiving packets, in seconds")] public float DisconnectTimeout = 5f; - - private HostType m_HostType; - + [Tooltip("Delay between connection attempts, in seconds")] + public float ReconnectDelay = 0.5f; [Tooltip("Maximum connection attempts before client stops and reports a disconnection")] public int MaxConnectAttempts = 10; - [Tooltip("Size of default buffer for decoding incoming packets, in bytes")] public int MessageBufferSize = 1024 * 5; - - [Tooltip("Interval between ping packets used for detecting latency and checking connection, in seconds")] - public float PingInterval = 1f; - - [Tooltip("The port of the relay server.")] - public ushort Port = 7777; - - [Tooltip("Delay between connection attempts, in seconds")] - public float ReconnectDelay = 0.5f; - - [Tooltip("Simulated maximum additional latency for packets in milliseconds (0 for no simulation")] - public int SimulateMaxLatency; - + [Tooltip("Simulated chance for a packet to be \"lost\", from 0 (no simulation) to 100 percent")] + public int SimulatePacketLossChance = 0; [Tooltip("Simulated minimum additional latency for packets in milliseconds (0 for no simulation)")] - public int SimulateMinLatency; + public int SimulateMinLatency = 0; + [Tooltip("Simulated maximum additional latency for packets in milliseconds (0 for no simulation")] + public int SimulateMaxLatency = 0; - [Tooltip("Simulated chance for a packet to be \"lost\", from 0 (no simulation) to 100 percent")] - public int SimulatePacketLossChance; + PromulManager _mPromulManager; public override ulong ServerClientId => 0; + HostType m_HostType; - public override bool IsSupported => Application.platform != RuntimePlatform.WebGLPlayer; - - private void OnValidate() + void OnValidate() { PingInterval = Math.Max(0, PingInterval); DisconnectTimeout = Math.Max(0, DisconnectTimeout); @@ -69,13 +61,20 @@ private void OnValidate() SimulateMaxLatency = Math.Max(SimulateMinLatency, SimulateMaxLatency); } + ConcurrentQueue<(NetworkEvent, ulong, ArraySegment)> _queue = new ConcurrentQueue<(NetworkEvent, ulong, ArraySegment)>(); + + public override bool IsSupported => Application.platform != RuntimePlatform.WebGLPlayer; + + PeerBase? _relayPeer; + CancellationTokenSource _cts = new CancellationTokenSource(); + public async Task SendControl(RelayControlMessage rcm, NetworkDelivery qos) { var writer = CompositeWriter.Create(); writer.Write(rcm); if (_relayPeer != null) await _relayPeer.SendAsync(writer, ConvertNetworkDelivery(qos)); } - + public override void Send(ulong clientId, ArraySegment data, NetworkDelivery qos) { Debug.Log("Sending to " + clientId + ": " + string.Join(" ", data.Select(e => e.ToString("X2")))); @@ -90,8 +89,7 @@ await SendControl(new RelayControlMessage }); } - private async Task OnNetworkReceive(PeerBase peer, CompositeReader reader, byte channel, - DeliveryMethod deliveryMethod) + async ValueTask OnNetworkReceive(PeerBase peer, CompositeReader reader, byte channel, DeliveryMethod deliveryMethod) { var message = reader.ReadRelayControlMessage(); var author = message.AuthorClientId; @@ -101,24 +99,24 @@ private async Task OnNetworkReceive(PeerBase peer, CompositeReader reader, byte // Either we are host and a client has connected, // or we're a client and we're connected. case RelayControlMessageType.Connected: - { - _queue.Enqueue((NetworkEvent.Connect, author, default, 0)); - break; - } + { + _queue.Enqueue((NetworkEvent.Connect, author, default)); + break; + } // A client has disconnected from the relay. case RelayControlMessageType.Disconnected: - { - _queue.Enqueue((NetworkEvent.Disconnect, author, default, 0)); - break; - } + { + _queue.Enqueue((NetworkEvent.Disconnect, author, default)); + break; + } // Relayed data case RelayControlMessageType.Data: { Debug.Log("Data: " + string.Join(" ", message.Data.Select(e => e.ToString("X2")))); var data = new byte[message.Data.Count]; message.Data.CopyTo(data); - _queue.Enqueue((NetworkEvent.Data, author, data, 0)); - break; + _queue.Enqueue((NetworkEvent.Data, author, data)); + break; } case RelayControlMessageType.KickFromRelay: break; @@ -128,8 +126,7 @@ private async Task OnNetworkReceive(PeerBase peer, CompositeReader reader, byte } } - public override NetworkEvent PollEvent(out ulong clientId, out ArraySegment payload, - out float receiveTime) + public override NetworkEvent PollEvent(out ulong clientId, out ArraySegment payload, out float receiveTime) { clientId = 0; receiveTime = Time.realtimeSinceStartup; @@ -141,11 +138,10 @@ public override NetworkEvent PollEvent(out ulong clientId, out ArraySegment { @@ -172,11 +168,7 @@ public override bool StartServer() public override void DisconnectRemoteClient(ulong clientId) { - SendControl( - new RelayControlMessage - { - Type = RelayControlMessageType.KickFromRelay, AuthorClientId = clientId, Data = Array.Empty() - }, NetworkDelivery.Reliable); + SendControl(new RelayControlMessage {Type = RelayControlMessageType.KickFromRelay, AuthorClientId = clientId, Data = Array.Empty() }, NetworkDelivery.Reliable); } public override void DisconnectLocalClient() @@ -198,7 +190,7 @@ public override void Shutdown() _mPromulManager.OnPeerDisconnected -= OnPeerDisconnected; _mPromulManager.OnReceive -= OnNetworkReceive; _ = _mPromulManager.StopAsync(); - + _cts.Cancel(); _relayPeer = null; m_HostType = HostType.None; @@ -225,7 +217,7 @@ public override void Initialize(NetworkManager? networkManager = null) _mPromulManager.OnReceive += OnNetworkReceive; } - private static DeliveryMethod ConvertNetworkDelivery(NetworkDelivery type) + static DeliveryMethod ConvertNetworkDelivery(NetworkDelivery type) { return type switch { @@ -237,29 +229,20 @@ private static DeliveryMethod ConvertNetworkDelivery(NetworkDelivery type) _ => throw new ArgumentOutOfRangeException(nameof(type), type, null) }; } - - private async Task OnConnectionRequest(ConnectionRequest request) + async ValueTask OnConnectionRequest(ConnectionRequest request) { await request.RejectAsync(force: true); } - - private async Task OnPeerDisconnected(PeerBase peer, DisconnectInfo disconnectInfo) + async ValueTask OnPeerDisconnected(PeerBase peer, DisconnectInfo disconnectInfo) { - Debug.Log("Disconnected " + disconnectInfo.Reason + " " + disconnectInfo.SocketErrorCode); - if (disconnectInfo.Reason != DisconnectReason.DisconnectPeerCalled) - _queue.Enqueue((NetworkEvent.TransportFailure, 0, new ArraySegment(), 0)); + Debug.Log("Disconnected " + disconnectInfo.Reason.ToString() + " " + disconnectInfo.SocketErrorCode.ToString()); + if (disconnectInfo.Reason != DisconnectReason.DisconnectPeerCalled) + _queue.Enqueue((NetworkEvent.TransportFailure, 0, new ArraySegment())); } - private static int SecondsToMilliseconds(float seconds) + static int SecondsToMilliseconds(float seconds) { return (int)Mathf.Ceil(seconds * 1000); } - - private enum HostType - { - None, - Server, - Client - } } } \ No newline at end of file diff --git a/Tests~/NetworkCoreTests/LargeDataTests.cs b/Tests~/NetworkCoreTests/LargeDataTests.cs index c041e91..8d25487 100644 --- a/Tests~/NetworkCoreTests/LargeDataTests.cs +++ b/Tests~/NetworkCoreTests/LargeDataTests.cs @@ -50,6 +50,7 @@ public async Task Test_Large_Data( [Test(Description = "Server sends client correct information, over 100,000 bytes.")] [Timeout(TestConstants.ExtendedTimeout)] + [Repeat(10)] public async Task Test_Very_Large_Data( [Values(100_000, 250_000, 500_000, 1_000_000, 10_000_000)] int nDataSize, [Values(DeliveryMethod.ReliableOrdered, DeliveryMethod.ReliableUnordered)] DeliveryMethod method)