From 0ddba0f83c91cc87c9cbe798411243c08a05e041 Mon Sep 17 00:00:00 2001 From: Nathan Willoughby Date: Thu, 16 Nov 2023 16:38:43 +1000 Subject: [PATCH 1/4] Improve the way the read and write stream timeouts are changed for a particular operation --- .../Protocol/MessageExchangeStream.cs | 86 +++++++++---------- .../Protocol/MessageExchangeStreamTimeout.cs | 10 +++ .../Protocol/StreamTimeoutExtensionMethods.cs | 78 +++++++++++++++++ 3 files changed, 129 insertions(+), 45 deletions(-) create mode 100644 source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs create mode 100644 source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index f5c90674..a66bb8df 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -35,7 +35,8 @@ public MessageExchangeStream(Stream stream, IMessageSerializer serializer, Halib this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits; this.controlMessageReader = new ControlMessageReader(halibutTimeoutsAndLimits); this.serializer = serializer; - SetNormalTimeoutsAsync(); + + SetReadAndWriteTimeouts(MessageExchangeStreamTimeout.NormalTimeout); } static int streamCount; @@ -63,27 +64,29 @@ async Task SendIdentityMessageAsync(string identityLine, CancellationToken cance public async Task SendNextAsync(CancellationToken cancellationToken) { - SetShortTimeoutsAsync(); - await SendControlMessageAsync(Next, cancellationToken); - SetNormalTimeoutsAsync(); + await WithTimeout( + MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout, + async () => await SendControlMessageAsync(Next, cancellationToken)); } public async Task SendProceedAsync(CancellationToken cancellationToken) { - await SendControlMessageAsync(Proceed, cancellationToken); + await WithTimeout( + MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout, + async () => await SendControlMessageAsync(Proceed, cancellationToken)); } public async Task SendEndAsync(CancellationToken cancellationToken) { - SetShortTimeoutsAsync(); - await SendControlMessageAsync(End, cancellationToken); - SetNormalTimeoutsAsync(); + await WithTimeout( + MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout, + async () => await SendControlMessageAsync(End, cancellationToken)); } public async Task ExpectNextOrEndAsync(CancellationToken cancellationToken) { var line = await controlMessageReader.ReadUntilNonEmptyControlMessageAsync(stream, cancellationToken); - + return line switch { Next => true, @@ -95,21 +98,22 @@ public async Task ExpectNextOrEndAsync(CancellationToken cancellationToken public async Task ExpectProceedAsync(CancellationToken cancellationToken) { - SetShortTimeoutsAsync(); - - var line = await controlMessageReader.ReadUntilNonEmptyControlMessageAsync(stream, cancellationToken); - - if (line == null) - { - throw new AuthenticationException($"Expected {Proceed}, got no data"); - } + await WithTimeout( + MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout, + async () => + { + var line = await controlMessageReader.ReadUntilNonEmptyControlMessageAsync(stream, cancellationToken); - if (line != Proceed) - { - throw new ProtocolException($"Expected {Proceed}, got: " + line); - } + if (line == null) + { + throw new AuthenticationException($"Expected {Proceed}, got no data"); + } - SetNormalTimeoutsAsync(); + if (line != Proceed) + { + throw new ProtocolException($"Expected {Proceed}, got: " + line); + } + }); } public async Task IdentifyAsSubscriberAsync(string subscriptionId, CancellationToken cancellationToken) @@ -182,6 +186,21 @@ public async Task ReceiveAsync(CancellationToken cancellationToken) log.Write(EventType.Diagnostic, "Received: {0}", result); return result; } + + async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func func) + { + await stream.WithTimeout(halibutTimeoutsAndLimits, timeout, func); + } + + async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func> func) + { + return await stream.WithTimeout(halibutTimeoutsAndLimits, timeout, func); + } + + void SetReadAndWriteTimeouts(MessageExchangeStreamTimeout timeout) + { + stream.SetReadAndWriteTimeouts(timeout, halibutTimeoutsAndLimits); + } static RemoteIdentityType ParseIdentityType(string identityType) { @@ -282,28 +301,5 @@ async Task WriteEachStreamAsync(IEnumerable streams, CancellationTok await stream.FlushAsync(cancellationToken); } } - - void SetNormalTimeoutsAsync() - { - // TODO - ASYNC ME UP! - // We should always be given a stream that can timeout. - if (!stream.CanTimeout) - return; - - stream.WriteTimeout = (int)this.halibutTimeoutsAndLimits.TcpClientSendTimeout.TotalMilliseconds; - stream.ReadTimeout = (int)this.halibutTimeoutsAndLimits.TcpClientReceiveTimeout.TotalMilliseconds; - } - - void SetShortTimeoutsAsync() - { - - // TODO - ASYNC ME UP! - // We should always be given a stream that can timeout. - if (!stream.CanTimeout) - return; - - stream.WriteTimeout = (int)this.halibutTimeoutsAndLimits.TcpClientHeartbeatSendTimeout.TotalMilliseconds; - stream.ReadTimeout = (int)this.halibutTimeoutsAndLimits.TcpClientHeartbeatReceiveTimeout.TotalMilliseconds; - } } } diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs b/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs new file mode 100644 index 00000000..d79b5bed --- /dev/null +++ b/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs @@ -0,0 +1,10 @@ +using System; + +namespace Halibut.Transport.Protocol +{ + public enum MessageExchangeStreamTimeout + { + NormalTimeout, + ControlMessageExchangeShortTimeout + } +} \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs b/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs new file mode 100644 index 00000000..535531b3 --- /dev/null +++ b/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs @@ -0,0 +1,78 @@ +using System; +using System.IO; +using System.Threading.Tasks; +using Halibut.Diagnostics; + +namespace Halibut.Transport.Protocol +{ + public static class StreamTimeoutExtensionMethods + { + public static async Task WithTimeout(this Stream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, MessageExchangeStreamTimeout timeout, Func func) + { + if (!stream.CanTimeout) + { + await func(); + + return; + } + + var currentReadTimeout = stream.ReadTimeout; + var currentWriteTimeout = stream.WriteTimeout; + + try + { + stream.SetReadAndWriteTimeouts(timeout, halibutTimeoutsAndLimits); + await func(); + } + finally + { + stream.ReadTimeout = currentReadTimeout; + stream.WriteTimeout = currentWriteTimeout; + } + } + + public static async Task WithTimeout(this Stream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, MessageExchangeStreamTimeout timeout, Func> func) + { + if (!stream.CanTimeout) + { + return await func(); + } + + var currentReadTimeout = stream.ReadTimeout; + var currentWriteTimeout = stream.WriteTimeout; + + try + { + stream.SetReadAndWriteTimeouts(timeout, halibutTimeoutsAndLimits); + return await func(); + } + finally + { + stream.ReadTimeout = currentReadTimeout; + stream.WriteTimeout = currentWriteTimeout; + } + } + + public static void SetReadAndWriteTimeouts(this Stream stream, MessageExchangeStreamTimeout timeout, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits) + { + if (!stream.CanTimeout) + { + return; + } + + switch (timeout) + { + case MessageExchangeStreamTimeout.NormalTimeout: + stream.WriteTimeout = (int)halibutTimeoutsAndLimits.TcpClientSendTimeout.TotalMilliseconds; + stream.ReadTimeout = (int)halibutTimeoutsAndLimits.TcpClientReceiveTimeout.TotalMilliseconds; + break; + case MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout: + stream.WriteTimeout = (int)halibutTimeoutsAndLimits.TcpClientHeartbeatSendTimeout.TotalMilliseconds; + stream.ReadTimeout = (int)halibutTimeoutsAndLimits.TcpClientHeartbeatReceiveTimeout.TotalMilliseconds; + break; + default: + throw new ArgumentOutOfRangeException(nameof(timeout), timeout, null); + } + } + } +} From fccce9d932edf140f0f52f2f9255739ffee4baf7 Mon Sep 17 00:00:00 2001 From: Nathan Willoughby Date: Thu, 16 Nov 2023 15:19:56 +1000 Subject: [PATCH 2/4] Add an auth short timeout Add a polling for next request short timeout --- ...HalibutTimeoutsAndLimitsForTestsBuilder.cs | 15 ++++-- .../Support/SerilogLoggerBuilder.cs | 12 ++--- .../Transport/Protocol/ProtocolFixture.cs | 14 +++++ .../Diagnostics/HalibutTimeoutsAndLimits.cs | 5 ++ .../Protocol/IMessageExchangeStream.cs | 3 ++ .../Protocol/MessageExchangeProtocol.cs | 4 +- .../Protocol/MessageExchangeStream.cs | 54 +++++++++++++------ .../Protocol/MessageExchangeStreamTimeout.cs | 4 +- .../Protocol/StreamTimeoutExtensionMethods.cs | 8 +++ .../TcpClientTimeoutExtensionMethods.cs | 53 ++++++++++++++++++ source/Halibut/Transport/SecureListener.cs | 2 +- .../Halibut/Transport/TcpConnectionFactory.cs | 15 +++--- 12 files changed, 154 insertions(+), 35 deletions(-) create mode 100644 source/Halibut/Transport/Protocol/TcpClientTimeoutExtensionMethods.cs diff --git a/source/Halibut.Tests/HalibutTimeoutsAndLimitsForTestsBuilder.cs b/source/Halibut.Tests/HalibutTimeoutsAndLimitsForTestsBuilder.cs index ac5ce63c..46850364 100644 --- a/source/Halibut.Tests/HalibutTimeoutsAndLimitsForTestsBuilder.cs +++ b/source/Halibut.Tests/HalibutTimeoutsAndLimitsForTestsBuilder.cs @@ -6,6 +6,8 @@ namespace Halibut.Tests public class HalibutTimeoutsAndLimitsForTestsBuilder { public static readonly TimeSpan HalfTheTcpReceiveTimeout = TimeSpan.FromSeconds(22.5); + static readonly TimeSpan PollingQueueWaitTimeout = TimeSpan.FromSeconds(20); + static readonly TimeSpan ShortTimeout = TimeSpan.FromSeconds(15); public HalibutTimeoutsAndLimits Build() { @@ -22,10 +24,17 @@ public HalibutTimeoutsAndLimits Build() TcpClientSendTimeout = HalfTheTcpReceiveTimeout + HalfTheTcpReceiveTimeout, TcpClientReceiveTimeout = HalfTheTcpReceiveTimeout + HalfTheTcpReceiveTimeout, - TcpClientHeartbeatSendTimeout = TimeSpan.FromSeconds(15), - TcpClientHeartbeatReceiveTimeout = TimeSpan.FromSeconds(15), + TcpClientHeartbeatSendTimeout = ShortTimeout, + TcpClientHeartbeatReceiveTimeout = ShortTimeout, + + TcpClientAuthenticationSendTimeout = ShortTimeout, + TcpClientAuthenticationReceiveTimeout = ShortTimeout, + + TcpClientPollingForNextRequestSendTimeout = ShortTimeout, + TcpClientPollingForNextRequestReceiveTimeout = PollingQueueWaitTimeout + ShortTimeout, + TcpClientConnectTimeout = TimeSpan.FromSeconds(20), - PollingQueueWaitTimeout = TimeSpan.FromSeconds(20) + PollingQueueWaitTimeout = PollingQueueWaitTimeout }; } } diff --git a/source/Halibut.Tests/Support/SerilogLoggerBuilder.cs b/source/Halibut.Tests/Support/SerilogLoggerBuilder.cs index d447c312..52db0f7e 100644 --- a/source/Halibut.Tests/Support/SerilogLoggerBuilder.cs +++ b/source/Halibut.Tests/Support/SerilogLoggerBuilder.cs @@ -57,18 +57,18 @@ public ILogger Build() var testHash = CurrentTestHash(); var logger = Logger.ForContext("TestHash", testHash); - if (!HasLoggedTestHash.Contains(testName)) - { - HasLoggedTestHash.Add(testName); - logger.Information($"Test: {TestContext.CurrentContext.Test.Name} has hash {testHash}"); - } - if (traceFileLogger != null) { TraceLoggers.AddOrUpdate(testName, traceFileLogger, (_, _) => throw new Exception("This should never be updated. If it is, it means that a test is being run multiple times in a single test run")); traceFileLogger.SetTestHash(testHash); } + if (!HasLoggedTestHash.Contains(testName)) + { + HasLoggedTestHash.Add(testName); + logger.Information($"Test: {TestContext.CurrentContext.Test.Name} has hash {testHash}"); + } + return logger; } diff --git a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs index 87c5cf6a..7de27ab7 100644 --- a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs @@ -371,6 +371,20 @@ public async Task ReceiveAsync(CancellationToken cancellationToken) return (T)(nextReadQueue.Count > 0 ? nextReadQueue.Dequeue() : default(T)); } + public async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func func) + { + output.AppendLine("|-- Set Timeout " + timeout); + + await func(); + } + + public async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func> func) + { + output.AppendLine("|-- Set Timeout " + timeout); + + return await func(); + } + public override string ToString() { return output.ToString(); diff --git a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs index 965939cf..b0b7fab1 100644 --- a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs +++ b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs @@ -62,6 +62,11 @@ public HalibutTimeoutsAndLimits() { } public TimeSpan TcpClientHeartbeatSendTimeout { get; set; } = TimeSpan.FromSeconds(60); public TimeSpan TcpClientHeartbeatReceiveTimeout { get; set; } = TimeSpan.FromSeconds(60); + public TimeSpan TcpClientAuthenticationSendTimeout { get; set; } = TimeSpan.FromSeconds(60); + public TimeSpan TcpClientAuthenticationReceiveTimeout { get; set; } = TimeSpan.FromSeconds(60); + public TimeSpan TcpClientPollingForNextRequestSendTimeout { get; set; } = TimeSpan.FromSeconds(60); + public TimeSpan TcpClientPollingForNextRequestReceiveTimeout { get; set; } = TimeSpan.FromSeconds(30) + TimeSpan.FromSeconds(60); + /// /// Amount of time to wait for a successful TCP or WSS connection /// diff --git a/source/Halibut/Transport/Protocol/IMessageExchangeStream.cs b/source/Halibut/Transport/Protocol/IMessageExchangeStream.cs index 2b6e4b12..364bc8ce 100644 --- a/source/Halibut/Transport/Protocol/IMessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/IMessageExchangeStream.cs @@ -26,5 +26,8 @@ public interface IMessageExchangeStream Task SendAsync(T message, CancellationToken cancellationToken); Task ReceiveAsync(CancellationToken cancellationToken); + + Task WithTimeout(MessageExchangeStreamTimeout timeout, Func func); + Task WithTimeout(MessageExchangeStreamTimeout timeout, Func> func); } } \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs index 1a79bd68..60957702 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs @@ -93,7 +93,9 @@ public async Task ExchangeAsSubscriberAsync(Uri subscriptionId, Func> incomingRequestProcessor, CancellationToken cancellationToken) { - var request = await stream.ReceiveAsync(cancellationToken); + var request = await stream.WithTimeout( + MessageExchangeStreamTimeout.PollingForNextRequestShortTimeout, + async () => await stream.ReceiveAsync(cancellationToken)); if (request != null) { diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index a66bb8df..7ad6a738 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -43,9 +43,14 @@ public MessageExchangeStream(Stream stream, IMessageSerializer serializer, Halib public async Task IdentifyAsClientAsync(CancellationToken cancellationToken) { - log.Write(EventType.Diagnostic, "Identifying as a client"); - await SendIdentityMessageAsync($"{MxClient} {currentVersion}", cancellationToken); - await ExpectServerIdentityAsync(cancellationToken); + await WithTimeout( + MessageExchangeStreamTimeout.AuthenticationShortTimeout, + async () => + { + log.Write(EventType.Diagnostic, "Identifying as a client"); + await SendIdentityMessageAsync($"{MxClient} {currentVersion}", cancellationToken); + await ExpectServerIdentityAsync(cancellationToken); + }); } async Task SendControlMessageAsync(string message, CancellationToken cancellationToken) @@ -85,15 +90,20 @@ await WithTimeout( public async Task ExpectNextOrEndAsync(CancellationToken cancellationToken) { - var line = await controlMessageReader.ReadUntilNonEmptyControlMessageAsync(stream, cancellationToken); - - return line switch - { - Next => true, - null => false, - End => false, - _ => throw new ProtocolException($"Expected {Next} or {End}, got: " + line) - }; + return await WithTimeout( + MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout, + async () => + { + var line = await controlMessageReader.ReadUntilNonEmptyControlMessageAsync(stream, cancellationToken); + + return line switch + { + Next => true, + null => false, + End => false, + _ => throw new ProtocolException($"Expected {Next} or {End}, got: " + line) + }; + }); } public async Task ExpectProceedAsync(CancellationToken cancellationToken) @@ -118,13 +128,23 @@ await WithTimeout( public async Task IdentifyAsSubscriberAsync(string subscriptionId, CancellationToken cancellationToken) { - await SendIdentityMessageAsync($"{MxSubscriber} {currentVersion} {subscriptionId}", cancellationToken); - await ExpectServerIdentityAsync(cancellationToken); + await WithTimeout( + MessageExchangeStreamTimeout.AuthenticationShortTimeout, + async () => + { + await SendIdentityMessageAsync($"{MxSubscriber} {currentVersion} {subscriptionId}", cancellationToken); + await ExpectServerIdentityAsync(cancellationToken); + }); } public async Task IdentifyAsServerAsync(CancellationToken cancellationToken) { - await SendIdentityMessageAsync($"{MxServer} {currentVersion}", cancellationToken); + await WithTimeout( + MessageExchangeStreamTimeout.AuthenticationShortTimeout, + async () => + { + await SendIdentityMessageAsync($"{MxServer} {currentVersion}", cancellationToken); + }); } public async Task ReadRemoteIdentityAsync(CancellationToken cancellationToken) @@ -187,12 +207,12 @@ public async Task ReceiveAsync(CancellationToken cancellationToken) return result; } - async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func func) + public async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func func) { await stream.WithTimeout(halibutTimeoutsAndLimits, timeout, func); } - async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func> func) + public async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func> func) { return await stream.WithTimeout(halibutTimeoutsAndLimits, timeout, func); } diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs b/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs index d79b5bed..4e7ecb00 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStreamTimeout.cs @@ -5,6 +5,8 @@ namespace Halibut.Transport.Protocol public enum MessageExchangeStreamTimeout { NormalTimeout, - ControlMessageExchangeShortTimeout + ControlMessageExchangeShortTimeout, + AuthenticationShortTimeout, + PollingForNextRequestShortTimeout } } \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs b/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs index 535531b3..8b80f030 100644 --- a/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs +++ b/source/Halibut/Transport/Protocol/StreamTimeoutExtensionMethods.cs @@ -70,6 +70,14 @@ public static void SetReadAndWriteTimeouts(this Stream stream, MessageExchangeSt stream.WriteTimeout = (int)halibutTimeoutsAndLimits.TcpClientHeartbeatSendTimeout.TotalMilliseconds; stream.ReadTimeout = (int)halibutTimeoutsAndLimits.TcpClientHeartbeatReceiveTimeout.TotalMilliseconds; break; + case MessageExchangeStreamTimeout.AuthenticationShortTimeout: + stream.WriteTimeout = (int)halibutTimeoutsAndLimits.TcpClientAuthenticationSendTimeout.TotalMilliseconds; + stream.ReadTimeout = (int)halibutTimeoutsAndLimits.TcpClientAuthenticationReceiveTimeout.TotalMilliseconds; + break; + case MessageExchangeStreamTimeout.PollingForNextRequestShortTimeout: + stream.WriteTimeout = (int)halibutTimeoutsAndLimits.TcpClientPollingForNextRequestSendTimeout.TotalMilliseconds; + stream.ReadTimeout = (int)halibutTimeoutsAndLimits.TcpClientPollingForNextRequestReceiveTimeout.TotalMilliseconds; + break; default: throw new ArgumentOutOfRangeException(nameof(timeout), timeout, null); } diff --git a/source/Halibut/Transport/Protocol/TcpClientTimeoutExtensionMethods.cs b/source/Halibut/Transport/Protocol/TcpClientTimeoutExtensionMethods.cs new file mode 100644 index 00000000..a0cb4365 --- /dev/null +++ b/source/Halibut/Transport/Protocol/TcpClientTimeoutExtensionMethods.cs @@ -0,0 +1,53 @@ +using System; +using System.IO; +using System.Net.Sockets; +using System.Threading.Tasks; +using Halibut.Diagnostics; + +namespace Halibut.Transport.Protocol +{ + public static class TcpClientTimeoutExtensionMethods + { + public static async Task WithTimeout(this TcpClient stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, MessageExchangeStreamTimeout timeout, Func func) + { + var currentReadTimeout = stream.Client.ReceiveTimeout; + var currentWriteTimeout = stream.Client.SendTimeout; + + try + { + stream.SetReadAndWriteTimeouts(timeout, halibutTimeoutsAndLimits); + await func(); + } + finally + { + stream.ReceiveTimeout = currentReadTimeout; + stream.SendTimeout = currentWriteTimeout; + } + } + + public static void SetReadAndWriteTimeouts(this TcpClient stream, MessageExchangeStreamTimeout timeout, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits) + { + switch (timeout) + { + case MessageExchangeStreamTimeout.NormalTimeout: + stream.Client.SendTimeout = (int)halibutTimeoutsAndLimits.TcpClientSendTimeout.TotalMilliseconds; + stream.Client.ReceiveTimeout = (int)halibutTimeoutsAndLimits.TcpClientReceiveTimeout.TotalMilliseconds; + break; + case MessageExchangeStreamTimeout.ControlMessageExchangeShortTimeout: + stream.Client.SendTimeout = (int)halibutTimeoutsAndLimits.TcpClientHeartbeatSendTimeout.TotalMilliseconds; + stream.Client.ReceiveTimeout = (int)halibutTimeoutsAndLimits.TcpClientHeartbeatReceiveTimeout.TotalMilliseconds; + break; + case MessageExchangeStreamTimeout.AuthenticationShortTimeout: + stream.Client.SendTimeout = (int)halibutTimeoutsAndLimits.TcpClientAuthenticationSendTimeout.TotalMilliseconds; + stream.Client.ReceiveTimeout = (int)halibutTimeoutsAndLimits.TcpClientAuthenticationReceiveTimeout.TotalMilliseconds; + break; + case MessageExchangeStreamTimeout.PollingForNextRequestShortTimeout: + stream.Client.SendTimeout = (int)halibutTimeoutsAndLimits.TcpClientPollingForNextRequestSendTimeout.TotalMilliseconds; + stream.Client.ReceiveTimeout = (int)halibutTimeoutsAndLimits.TcpClientPollingForNextRequestReceiveTimeout.TotalMilliseconds; + break; + default: + throw new ArgumentOutOfRangeException(nameof(timeout), timeout, null); + } + } + } +} diff --git a/source/Halibut/Transport/SecureListener.cs b/source/Halibut/Transport/SecureListener.cs index cc943705..4d383aaf 100644 --- a/source/Halibut/Transport/SecureListener.cs +++ b/source/Halibut/Transport/SecureListener.cs @@ -276,7 +276,7 @@ async Task ExecuteRequest(TcpClient client) finally { if (!connectionAuthorizedAndObserved) - { + { connectionsObserver.ConnectionAccepted(false); } diff --git a/source/Halibut/Transport/TcpConnectionFactory.cs b/source/Halibut/Transport/TcpConnectionFactory.cs index d3e96627..d5eedd7c 100644 --- a/source/Halibut/Transport/TcpConnectionFactory.cs +++ b/source/Halibut/Transport/TcpConnectionFactory.cs @@ -43,16 +43,19 @@ public async Task EstablishNewConnectionAsync(ExchangeProtocolBuild log.Write(EventType.SecurityNegotiation, "Performing TLS handshake"); + await client.WithTimeout(halibutTimeoutsAndLimits, MessageExchangeStreamTimeout.AuthenticationShortTimeout, async () => + { #if NETFRAMEWORK - // TODO: ASYNC ME UP! - // AuthenticateAsClientAsync in .NET 4.8 does not support cancellation tokens. So `cancellationToken` is not respected here. - await ssl.AuthenticateAsClientAsync(serviceEndpoint.BaseUri.Host, new X509Certificate2Collection(clientCertificate), SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false); + // TODO: ASYNC ME UP! + // AuthenticateAsClientAsync in .NET 4.8 does not support cancellation tokens. So `cancellationToken` is not respected here. + await ssl.AuthenticateAsClientAsync(serviceEndpoint.BaseUri.Host, new X509Certificate2Collection(clientCertificate), SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false); #else - await ssl.AuthenticateAsClientEnforcingTimeout(serviceEndpoint, new X509Certificate2Collection(clientCertificate), cancellationToken); + await ssl.AuthenticateAsClientEnforcingTimeout(serviceEndpoint, new X509Certificate2Collection(clientCertificate), cancellationToken); #endif - await ssl.WriteAsync(MxLine, 0, MxLine.Length, cancellationToken); - await ssl.FlushAsync(cancellationToken); + await ssl.WriteAsync(MxLine, 0, MxLine.Length, cancellationToken); + await ssl.FlushAsync(cancellationToken); + }); log.Write(EventType.Security, "Secure connection established. Server at {0} identified by thumbprint: {1}, using protocol {2}", client.Client.RemoteEndPoint, serviceEndpoint.RemoteThumbprint, ssl.SslProtocol.ToString()); From f042d49b34e7c2ecea4c9905a115c7c6cfbbaa11 Mon Sep 17 00:00:00 2001 From: Nathan Willoughby Date: Thu, 16 Nov 2023 17:18:08 +1000 Subject: [PATCH 3/4] . --- .../Timeouts/TimeoutsApplyDuringHandShake.cs | 2 +- .../Transport/Protocol/ProtocolFixture.cs | 45 +++++++++++++++---- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/source/Halibut.Tests/Timeouts/TimeoutsApplyDuringHandShake.cs b/source/Halibut.Tests/Timeouts/TimeoutsApplyDuringHandShake.cs index a0f8fae3..a51decb2 100644 --- a/source/Halibut.Tests/Timeouts/TimeoutsApplyDuringHandShake.cs +++ b/source/Halibut.Tests/Timeouts/TimeoutsApplyDuringHandShake.cs @@ -64,7 +64,7 @@ int writeNumberToPauseOn // Ie pause on the first or second write } sw.Stop(); - sw.Elapsed.Should().BeCloseTo(clientAndService.Service.TimeoutsAndLimits.TcpClientReceiveTimeout, TimeSpan.FromSeconds(15), "Since a paused connection early on should not hang forever."); + sw.Elapsed.Should().BeCloseTo(clientAndService.Service.TimeoutsAndLimits.TcpClientAuthenticationSendTimeout, TimeSpan.FromSeconds(15), "Since a paused connection early on should not hang forever."); await echo.SayHelloAsync("The pump wont be paused here so this should work."); } diff --git a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs index 7de27ab7..0b5fb9dd 100644 --- a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs @@ -121,22 +121,32 @@ public async Task ShouldExchangeAsSubscriber() AssertOutput(@" --> MX-SUBSCRIBE subscriptionId <-- MX-SERVER +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> ResponseMessage --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> ResponseMessage --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> ResponseMessage --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED"); } @@ -179,37 +189,57 @@ public async Task ShouldExchangeAsSubscriberWithPooling() AssertOutput(@" --> MX-SUBSCRIBE subscriptionId <-- MX-SERVER +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> ResponseMessage --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> ResponseMessage --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> ResponseMessage --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED +|-- Set Timeout PollingForNextRequestShortTimeout <-- RequestMessage +|-- Revert Timeout PollingForNextRequestShortTimeout --> NEXT <-- PROCEED"); } @@ -279,17 +309,12 @@ public void SetNumberOfReads(int reads) numberOfReads = reads; } - public void IdentifyAsClient() - { - output.AppendLine("--> MX-CLIENT"); - output.AppendLine("<-- MX-SERVER"); - } - public async Task IdentifyAsClientAsync(CancellationToken cancellationToken) { await Task.CompletedTask; - IdentifyAsClient(); + output.AppendLine("--> MX-CLIENT"); + output.AppendLine("<-- MX-SERVER"); } public async Task SendNextAsync(CancellationToken cancellationToken) @@ -374,15 +399,17 @@ public async Task ReceiveAsync(CancellationToken cancellationToken) public async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func func) { output.AppendLine("|-- Set Timeout " + timeout); - await func(); + output.AppendLine("|-- Revert Timeout " + timeout); } public async Task WithTimeout(MessageExchangeStreamTimeout timeout, Func> func) { output.AppendLine("|-- Set Timeout " + timeout); + var response = await func(); + output.AppendLine("|-- Revert Timeout " + timeout); - return await func(); + return response; } public override string ToString() From 09b76d191683242b467d17a6d63907c4024583ab Mon Sep 17 00:00:00 2001 From: Nathan Willoughby Date: Thu, 16 Nov 2023 17:21:23 +1000 Subject: [PATCH 4/4] . --- source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs index b0b7fab1..ec43b3bf 100644 --- a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs +++ b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs @@ -47,17 +47,17 @@ public HalibutTimeoutsAndLimits() { } /// /// Amount of time to wait for a TCP or SslStream write to complete successfully /// - public TimeSpan TcpClientSendTimeout { get; set; } = TimeSpan.FromMinutes(10); + public TimeSpan TcpClientSendTimeout { get; set; } = TimeSpan.FromMinutes(1); /// /// Amount of time to wait for a TCP or SslStream read to complete successfully /// - public TimeSpan TcpClientReceiveTimeout { get; set; } = TimeSpan.FromMinutes(10); + public TimeSpan TcpClientReceiveTimeout { get; set; } = TimeSpan.FromMinutes(5); /// /// Amount of time a connection can stay in the pool /// - public TimeSpan TcpClientPooledConnectionTimeout { get; set; } = TimeSpan.FromMinutes(9); + public TimeSpan TcpClientPooledConnectionTimeout { get; set; } = TimeSpan.FromMinutes(4.5); public TimeSpan TcpClientHeartbeatSendTimeout { get; set; } = TimeSpan.FromSeconds(60); public TimeSpan TcpClientHeartbeatReceiveTimeout { get; set; } = TimeSpan.FromSeconds(60); @@ -65,7 +65,7 @@ public HalibutTimeoutsAndLimits() { } public TimeSpan TcpClientAuthenticationSendTimeout { get; set; } = TimeSpan.FromSeconds(60); public TimeSpan TcpClientAuthenticationReceiveTimeout { get; set; } = TimeSpan.FromSeconds(60); public TimeSpan TcpClientPollingForNextRequestSendTimeout { get; set; } = TimeSpan.FromSeconds(60); - public TimeSpan TcpClientPollingForNextRequestReceiveTimeout { get; set; } = TimeSpan.FromSeconds(30) + TimeSpan.FromSeconds(60); + public TimeSpan TcpClientPollingForNextRequestReceiveTimeout { get; set; } = TimeSpan.FromSeconds(60); /// /// Amount of time to wait for a successful TCP or WSS connection