From d1757c4cd77a58f819770c2dc7fb6562e4c2c6ab Mon Sep 17 00:00:00 2001 From: Nathan Willoughby <86938706+nathanwoctopusdeploy@users.noreply.github.com> Date: Tue, 28 Nov 2023 12:44:22 +1000 Subject: [PATCH] Revert cooperative cancellation (#560) * Revert "Fix a bug in cooperative cancellation for Polling Services causing a request to time out to quickly (#558)" This reverts commit 79841cf71829a61c0fe83c5f61c688e0803402ab. * Revert "Co-operatively cancel all RPC Calls (#525)" This reverts commit 1bde8c8b19b2e4b9602a6e534a4394e3abafd337. --- source/Halibut.Tests/BadCertificatesTests.cs | 17 +- .../CancellationViaClientProxyFixture.cs | 86 ++++++-- .../PollingServiceTimeoutsFixture.cs | 112 ---------- .../ResponseMessageCacheFixture.cs | 8 +- .../PendingRequestQueueFixture.cs | 119 ++++++----- ...AndLatestServiceBuilderExtensionMethods.cs | 7 + ...ReceivingRequestMessagesTimeoutsFixture.cs | 8 +- .../ConnectionObserverFixture.cs | 4 +- .../Transport/Protocol/ProtocolFixture.cs | 14 +- .../Transport/SecureClientFixture.cs | 4 +- ...enCancellingARequestForAPollingTentacle.cs | 198 +++++++++--------- .../Exceptions/RequestCancelledException.cs | 47 ----- source/Halibut/HalibutRuntime.cs | 16 +- .../HalibutProxyRequestOptions.cs | 25 ++- .../ServiceModel/HalibutProxyWithAsync.cs | 29 ++- .../ServiceModel/IPendingRequestQueue.cs | 4 +- .../ServiceModel/PendingRequestQueueAsync.cs | 95 +++------ .../ServiceModel/RequestCancellationTokens.cs | 56 +++++ source/Halibut/Transport/ISecureClient.cs | 4 +- source/Halibut/Transport/PollingClient.cs | 33 +-- .../Protocol/MessageExchangeProtocol.cs | 23 +- .../Transport/Protocol/RequestMessage.cs | 1 + .../RequestMessageWithCancellationToken.cs | 16 -- .../Transport/Protocol/ResponseMessage.cs | 11 +- source/Halibut/Transport/SecureClient.cs | 22 +- .../Transport/SecureListeningClient.cs | 50 ++--- .../Transport/SecureWebSocketClient.cs | 15 +- .../Transport/Streams/NetworkTimeoutStream.cs | 79 +++---- source/Halibut/Util/Try.cs | 13 -- .../PortForwarder.cs | 2 - 30 files changed, 539 insertions(+), 579 deletions(-) delete mode 100644 source/Halibut.Tests/PollingServiceTimeoutsFixture.cs delete mode 100644 source/Halibut/Exceptions/RequestCancelledException.cs create mode 100644 source/Halibut/ServiceModel/RequestCancellationTokens.cs delete mode 100644 source/Halibut/Transport/Protocol/RequestMessageWithCancellationToken.cs diff --git a/source/Halibut.Tests/BadCertificatesTests.cs b/source/Halibut.Tests/BadCertificatesTests.cs index a63323961..fa1f12043 100644 --- a/source/Halibut.Tests/BadCertificatesTests.cs +++ b/source/Halibut.Tests/BadCertificatesTests.cs @@ -99,7 +99,7 @@ public async Task FailWhenPollingServicePresentsWrongCertificate_ButServiceIsCon }); // Act - var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken); + var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken); // Interestingly the message exchange error is logged to a non polling looking URL, perhaps because it has not been identified? Wait.UntilActionSucceeds(() => { @@ -148,14 +148,14 @@ public async Task FailWhenPollingServiceHasThumbprintRemovedViaTrustOnly(ClientA }); // Works normally - await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)); - await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)); + await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)); + await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)); // Act clientAndBuilder.Client.TrustOnly(new List()); // Assert - var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken); + var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken); await Task.Delay(3000, CancellationToken); @@ -164,7 +164,8 @@ public async Task FailWhenPollingServiceHasThumbprintRemovedViaTrustOnly(ClientA var exception = await AssertionExtensions.Should(() => incrementCount).ThrowAsync(); exception.And.Should().Match(e => e.GetType() == typeof(HalibutClientException) - || e is OperationCanceledException); + || e.GetType() == typeof(OperationCanceledException) + || e.GetType() == typeof(TaskCanceledException)); } } @@ -212,8 +213,8 @@ public async Task FailWhenClientPresentsWrongCertificateToPollingService(ClientA point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(2000); }); - var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken); - + var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken); + Func hasExpectedLog = logEvent => logEvent.FormattedMessage.Contains("The server at") && logEvent.FormattedMessage.Contains("presented an unexpected security certificate"); @@ -269,7 +270,7 @@ public async Task FailWhenPollingServicePresentsWrongCertificate(ClientAndServic point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(10); }); - var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken); + var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken); // Interestingly the message exchange error is logged to a non polling looking URL, perhaps because it has not been identified? Wait.UntilActionSucceeds(() => { AllLogs(serviceLoggers).Select(l => l.FormattedMessage).ToArray() diff --git a/source/Halibut.Tests/CancellationViaClientProxyFixture.cs b/source/Halibut.Tests/CancellationViaClientProxyFixture.cs index 83d7084da..e29212359 100644 --- a/source/Halibut.Tests/CancellationViaClientProxyFixture.cs +++ b/source/Halibut.Tests/CancellationViaClientProxyFixture.cs @@ -3,7 +3,7 @@ using System.Threading; using System.Threading.Tasks; using FluentAssertions; -using Halibut.Exceptions; +using Halibut.Logging; using Halibut.ServiceModel; using Halibut.Tests.Support; using Halibut.Tests.Support.ExtensionMethods; @@ -21,11 +21,27 @@ public class CancellationViaClientProxyFixture : BaseTest [Test] [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] [LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)] - public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase) + public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase) { var tokenSourceToCancel = new CancellationTokenSource(); - var halibutRequestOption = new HalibutProxyRequestOptions(tokenSourceToCancel.Token); + var halibutRequestOption = new HalibutProxyRequestOptions(tokenSourceToCancel.Token, CancellationToken.None); + await CanCancel_ConnectingOrQueuedRequests(clientAndServiceTestCase, tokenSourceToCancel, halibutRequestOption); + } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + [LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)] + public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase) + { + var tokenSourceToCancel = new CancellationTokenSource(); + var halibutRequestOption = new HalibutProxyRequestOptions(CancellationToken.None, tokenSourceToCancel.Token); + + await CanCancel_ConnectingOrQueuedRequests(clientAndServiceTestCase, tokenSourceToCancel, halibutRequestOption); + } + + async Task CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase, CancellationTokenSource tokenSourceToCancel, HalibutProxyRequestOptions halibutRequestOption) + { await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .WithPortForwarding(port => PortForwarderUtil.ForwardingToLocalPort(port).Build()) .WithStandardServices() @@ -39,19 +55,64 @@ public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_ point => point.TryAndConnectForALongTime()); tokenSourceToCancel.CancelAfter(TimeSpan.FromMilliseconds(100)); - + (await AssertAsync.Throws(() => echo.IncrementAsync(halibutRequestOption))) - .And.Should().Match(x => x is ConnectingRequestCancelledException || (x is HalibutClientException && x.As().Message.Contains("The Request was cancelled while Connecting"))); + .And.Message.Contains("The operation was canceled"); clientAndService.PortForwarder.ReturnToNormalMode(); - await echo.IncrementAsync(new HalibutProxyRequestOptions(CancellationToken)); + await echo.IncrementAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)); - (await echo.GetCurrentValueAsync(new HalibutProxyRequestOptions(CancellationToken))) + (await echo.GetCurrentValueAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None))) .Should().Be(1, "Since we cancelled the first call"); } } + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + [LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)] + public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanNotCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .WithHalibutLoggingLevel(LogLevel.Trace) + .WithStandardServices() + .Build(CancellationToken)) + { + var lockService = clientAndService.CreateAsyncClient(); + + var tokenSourceToCancel = new CancellationTokenSource(); + using var tmpDir = new TemporaryDirectory(); + var fileThatOnceDeletedEndsTheCall = tmpDir.CreateRandomFile(); + var callStartedFile = tmpDir.RandomFileName(); + + var inFlightRequest = Task.Run(async () => await lockService.WaitForFileToBeDeletedAsync( + fileThatOnceDeletedEndsTheCall, + callStartedFile, + new HalibutProxyRequestOptions(tokenSourceToCancel.Token, CancellationToken.None))); + + Logger.Information("Waiting for the RPC call to be inflight"); + while (!File.Exists(callStartedFile)) + { + await Task.Delay(TimeSpan.FromMilliseconds(100), CancellationToken); + } + + // The call is now in flight. Call cancel on the cancellation token for that in flight request. + tokenSourceToCancel.Cancel(); + + // Give time for the cancellation to do something + await Task.Delay(TimeSpan.FromSeconds(2), CancellationToken); + + if (inFlightRequest.Status == TaskStatus.Faulted) await inFlightRequest; + + inFlightRequest.IsCompleted.Should().Be(false, $"The cancellation token can not cancel in flight requests. Current state: {inFlightRequest.Status}"); + + File.Delete(fileThatOnceDeletedEndsTheCall); + + // Now the lock is released we should be able to complete the request. + await inFlightRequest; + } + } + [Test] // TODO: ASYNC ME UP! // net48 does not support cancellation of the request as the DeflateStream ends up using Begin and End methods which don't get passed the cancellation token @@ -62,7 +123,7 @@ public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_ [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] [LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)] #endif - public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase) + public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase) { await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .WithStandardServices() @@ -80,7 +141,7 @@ public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_ await lockService.WaitForFileToBeDeletedAsync( fileThatOnceDeletedEndsTheCall, callStartedFile, - new HalibutProxyRequestOptions(tokenSourceToCancel.Token)); + new HalibutProxyRequestOptions(CancellationToken.None, tokenSourceToCancel.Token)); }); Logger.Information("Waiting for the RPC call to be inflight"); @@ -95,8 +156,9 @@ await lockService.WaitForFileToBeDeletedAsync( // Give time for the cancellation to do something await Task.Delay(TimeSpan.FromSeconds(2), CancellationToken); - (await AssertionExtensions.Should(async () => await inFlightRequest).ThrowAsync()) - .And.Should().Match(x => x is TransferringRequestCancelledException || (x is HalibutClientException && x.As().Message.Contains("The Request was cancelled while Transferring"))); + (await AssertionExtensions.Should(async () => await inFlightRequest) + .ThrowAsync()).And + .Should().Match(x => x is OperationCanceledException || (x.GetType() == typeof(HalibutClientException) && x.Message.Contains("The ReadAsync operation was cancelled"))); } } @@ -128,7 +190,7 @@ public async Task HalibutProxyRequestOptionsCanBeSentToLatestAndOldServicesThatP { var echo = clientAndService.CreateAsyncClient(); - (await echo.SayHelloAsync("Hello!!", new HalibutProxyRequestOptions(CancellationToken))) + (await echo.SayHelloAsync("Hello!!", new HalibutProxyRequestOptions(CancellationToken, null))) .Should() .Be("Hello!!..."); } diff --git a/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs b/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs deleted file mode 100644 index 7090e3f10..000000000 --- a/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs +++ /dev/null @@ -1,112 +0,0 @@ -using System; -using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; -using FluentAssertions; -using Halibut.Tests.Support; -using Halibut.Tests.Support.TestAttributes; -using Halibut.Tests.Support.TestCases; -using Halibut.Tests.TestServices; -using Halibut.Tests.TestServices.Async; -using Halibut.TestUtils.Contracts; -using NUnit.Framework; - -namespace Halibut.Tests -{ - public class PollingServiceTimeoutsFixture : BaseTest - { - [Test] - [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)] - public async Task WhenThePollingRequestQueueTimeoutIsReached_TheRequestShouldTimeout(ClientAndServiceTestCase clientAndServiceTestCase) - { - var halibutTimeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); - halibutTimeoutsAndLimits.PollingRequestQueueTimeout = TimeSpan.FromSeconds(5); - - await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() - .As() - .NoService() - .WithStandardServices() - .WithHalibutTimeoutsAndLimits(halibutTimeoutsAndLimits) - .Build(CancellationToken)) - { - var client = clientAndService.CreateAsyncClient(); - - var stopwatch = Stopwatch.StartNew(); - (await AssertAsync.Throws(async () => await client.SayHelloAsync("Hello", new(CancellationToken)))) - .And.Message.Should().Contain("A request was sent to a polling endpoint, but the polling endpoint did not collect the request within the allowed time (00:00:05), so the request timed out."); - stopwatch.Stop(); - - stopwatch.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(8), "Should have timed out quickly"); - } - } - - [Test] - [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)] - public async Task WhenThePollingRequestQueueTimeoutIsReached_ButTheResponseIsReceivedBeforeThePollingRequestMaximumMessageProcessingTimeoutIsReached_TheRequestShouldSucceed(ClientAndServiceTestCase clientAndServiceTestCase) - { - var halibutTimeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); - halibutTimeoutsAndLimits.PollingRequestQueueTimeout = TimeSpan.FromSeconds(5); - halibutTimeoutsAndLimits.PollingRequestMaximumMessageProcessingTimeout = TimeSpan.FromSeconds(100); - - var responseDelay = TimeSpan.FromSeconds(10); - - await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() - .As() - .WithDoSomeActionService(() => Thread.Sleep(responseDelay)) - .WithHalibutTimeoutsAndLimits(halibutTimeoutsAndLimits) - .WithInstantReconnectPollingRetryPolicy() - .Build(CancellationToken)) - { - var doSomeActionClient = clientAndService.CreateAsyncClient(); - - var stopwatch = Stopwatch.StartNew(); - await doSomeActionClient.ActionAsync(new(CancellationToken)); - stopwatch.Stop(); - - stopwatch.Elapsed.Should() - .BeGreaterThan(halibutTimeoutsAndLimits.PollingRequestQueueTimeout, "Should have waited longer than the PollingRequestQueueTimeout").And - .BeLessThan(responseDelay + TimeSpan.FromSeconds(5), "Should have received the response after the 10 second delay + 5 second buffer"); - } - } - - [Test] - [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)] - public async Task WhenThePollingRequestMaximumMessageProcessingTimeoutIsReached_TheRequestShouldTimeout_AndTheTransferringPendingRequestCancelled(ClientAndServiceTestCase clientAndServiceTestCase) - { - var halibutTimeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); - halibutTimeoutsAndLimits.PollingRequestQueueTimeout = TimeSpan.FromSeconds(5); - halibutTimeoutsAndLimits.PollingRequestMaximumMessageProcessingTimeout = TimeSpan.FromSeconds(6); - - var waitSemaphore = new SemaphoreSlim(0, 1); - var connectionsObserver = new TestConnectionsObserver(); - - await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() - .As() - .WithDoSomeActionService(() => waitSemaphore.Wait(CancellationToken)) - .WithHalibutTimeoutsAndLimits(halibutTimeoutsAndLimits) - .WithInstantReconnectPollingRetryPolicy() - .WithConnectionObserverOnTcpServer(connectionsObserver) - .Build(CancellationToken)) - { - var doSomeActionClient = clientAndService.CreateAsyncClient(); - - var stopwatch = Stopwatch.StartNew(); - (await AssertAsync.Throws(async () => await doSomeActionClient.ActionAsync(new(CancellationToken)))) - .And.Message.Should().Contain("A request was sent to a polling endpoint, the polling endpoint collected it but did not respond in the allowed time (00:00:06), so the request timed out."); - stopwatch.Stop(); - - stopwatch.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(15), "Should have timed out quickly"); - - connectionsObserver.ConnectionAcceptedCount.Should().Be(1, "A single TCP connection should have been created"); - - waitSemaphore.Release(); - - Wait.UntilActionSucceeds(() => - { - connectionsObserver.ConnectionClosedCount.Should().Be(1, "Cancelling the PendingRequest should have caused the TCP Connection to be cancelled to stop the in-flight request"); - connectionsObserver.ConnectionAcceptedCount.Should().Be(2, "The Service should have reconnected after the request was cancelled"); - }, TimeSpan.FromSeconds(30), Logger, CancellationToken); - } - } - } -} \ No newline at end of file diff --git a/source/Halibut.Tests/ResponseMessageCacheFixture.cs b/source/Halibut.Tests/ResponseMessageCacheFixture.cs index 6b0cee5ff..9787dda98 100644 --- a/source/Halibut.Tests/ResponseMessageCacheFixture.cs +++ b/source/Halibut.Tests/ResponseMessageCacheFixture.cs @@ -48,8 +48,8 @@ public async Task ForAServiceThatDoesNotSupportCaching_WithClientInterface_Respo { var client = clientAndService.CreateAsyncClient(); - var result1 = await client.NonCachableCallAsync(new HalibutProxyRequestOptions(CancellationToken)); - var result2 = await client.NonCachableCallAsync(new HalibutProxyRequestOptions(CancellationToken)); + var result1 = await client.NonCachableCallAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)); + var result2 = await client.NonCachableCallAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)); result1.Should().NotBe(result2); } @@ -84,8 +84,8 @@ public async Task ForAServiceThatSupportsCaching_WithClientInterface_ResponseSho { var client = clientAndService.CreateAsyncClient(); - var result1 = await client.CachableCallAsync(new HalibutProxyRequestOptions(CancellationToken)); - var result2 = await client.CachableCallAsync(new HalibutProxyRequestOptions(CancellationToken)); + var result1 = await client.CachableCallAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)); + var result2 = await client.CachableCallAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)); result1.Should().Be(result2); } diff --git a/source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs b/source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs index e410e733d..c55a7b102 100644 --- a/source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs +++ b/source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs @@ -8,6 +8,7 @@ using FluentAssertions; using Halibut.ServiceModel; using Halibut.Tests.Builders; +using Halibut.Tests.Support.TestAttributes; using Halibut.Transport.Protocol; using NUnit.Framework; @@ -24,16 +25,17 @@ public async Task QueueAndWait_WillContinueWaitingUntilResponseIsApplied() var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build(); var request = new RequestMessageBuilder(endpoint).Build(); var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build(); - + var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, CancellationToken); await sut.DequeueAsync(CancellationToken); + // Act await Task.Delay(1000, CancellationToken); queueAndWaitTask.IsCompleted.Should().BeFalse(); await sut.ApplyResponse(expectedResponse, request.Destination); - + // Assert var response = await queueAndWaitTask; response.Should().Be(expectedResponse); @@ -83,7 +85,7 @@ public async Task QueueAndWait_WhenPollingRequestQueueTimeoutIsReached_WillStopW var request = new RequestMessageBuilder(endpoint) .WithServiceEndpoint(seb => seb.WithPollingRequestQueueTimeout(TimeSpan.FromMilliseconds(1000))) .Build(); - + // Act var stopwatch = Stopwatch.StartNew(); var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, CancellationToken); @@ -116,10 +118,8 @@ public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeo // Act var stopwatch = Stopwatch.StartNew(); - var (queueAndWaitTask, dequeued) = await QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(sut, request, CancellationToken); - + var (queueAndWaitTask, _) = await QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(sut, request, CancellationToken); var response = await queueAndWaitTask; - dequeued.CancellationToken.IsCancellationRequested.Should().BeTrue("Should have cancelled the request when the PollingRequestMaximumMessageProcessingTimeout is reached"); // Assert // Although we sleep for 2 second, sometimes it can be just under. So be generous with the buffer. @@ -130,7 +130,7 @@ public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeo var next = await sut.DequeueAsync(CancellationToken); next.Should().BeNull(); } - + [Test] public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeoutIsReached_ShouldWaitTillRequestRespondsAndClearRequest() { @@ -141,31 +141,28 @@ public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeo .WithEndpoint(endpoint) .WithPollingQueueWaitTimeout(TimeSpan.Zero) // Remove delay, otherwise we wait the full 20 seconds for DequeueAsync at the end of the test .Build(); - var request = new RequestMessageBuilder(endpoint) .WithServiceEndpoint(seb => seb.WithPollingRequestQueueTimeout(TimeSpan.FromMilliseconds(1000))) .Build(); - var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build(); // Act var (queueAndWaitTask, dequeued) = await QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(sut, request, CancellationToken); await Task.Delay(2000, CancellationToken); - dequeued.CancellationToken.IsCancellationRequested.Should().BeFalse("Should not have cancelled the request after PollingRequestQueueTimeout is reached"); await sut.ApplyResponse(expectedResponse, request.Destination); var response = await queueAndWaitTask; // Assert - dequeued.RequestMessage.Should().NotBeNull("We should have removed the item from the queue before it timed out.").And.Be(request); + dequeued.Should().NotBeNull("We should have removed the item from the queue before it timed out.").And.Be(request); response.Should().Be(expectedResponse); var next = await sut.DequeueAsync(CancellationToken); next.Should().BeNull(); } - + [Test] public async Task QueueAndWait_AddingMultipleItemsToQueueInOrder_ShouldDequeueInOrder() { @@ -190,7 +187,7 @@ public async Task QueueAndWait_AddingMultipleItemsToQueueInOrder_ShouldDequeueIn foreach (var expectedRequest in requestsInOrder) { var request = await sut.DequeueAsync(CancellationToken); - request.RequestMessage.Should().Be(expectedRequest); + request.Should().Be(expectedRequest); } await ApplyResponsesConcurrentlyAndEnsureAllQueueResponsesMatch(sut, requestsInOrder, queueAndWaitTasksInOrder); @@ -220,7 +217,7 @@ public async Task QueueAndWait_AddingMultipleItemsToQueueConcurrently_AllRequest for (int i = 0; i < requestsInOrder.Count; i++) { var request = await sut.DequeueAsync(CancellationToken); - requests.Add(request.RequestMessage); + requests.Add(request); } requests.Should().BeEquivalentTo(requestsInOrder); @@ -253,7 +250,7 @@ public async Task QueueAndWait_Can_Queue_Dequeue_Apply_VeryLargeNumberOfRequests .ToList(); await Task.WhenAll(dequeueTasks); - + // Assert await ApplyResponsesConcurrentlyAndEnsureAllQueueResponsesMatch(sut, requestsInOrder, queueAndWaitTasksInOrder); } @@ -268,7 +265,7 @@ public async Task QueueAndWait_Can_Queue_Dequeue_WhenRequestsAreBeingCancelled() const int minimumCancelledRequest = 100; var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build(); - + var requestsInOrder = Enumerable.Range(0, totalRequest) .Select(_ => new RequestMessageBuilder(endpoint).Build()) .ToList(); @@ -279,16 +276,16 @@ public async Task QueueAndWait_Can_Queue_Dequeue_WhenRequestsAreBeingCancelled() { var requestCancellationTokenSource = new CancellationTokenSource(); return new Tuple, CancellationTokenSource>( - StartQueueAndWait(sut, request, requestCancellationTokenSource.Token), + StartQueueAndWait(sut, request, requestCancellationTokenSource.Token), requestCancellationTokenSource); }) .ToList(); await WaitForQueueCountToBecome(sut, requestsInOrder.Count); - + var index = 0; var cancelled = 0; - var dequeueTasks = new ConcurrentBag>(); + var dequeueTasks = new ConcurrentBag>(); var cancelSomeTask = Task.Run(() => { @@ -296,10 +293,10 @@ public async Task QueueAndWait_Can_Queue_Dequeue_WhenRequestsAreBeingCancelled() { var currentIndex = Interlocked.Increment(ref index); - if (currentIndex % 2 == 0) + if(currentIndex % 2 == 0) { Interlocked.Increment(ref cancelled); - queueAndWaitTasksInOrder.ElementAt(index - 1).Item2.Cancel(); + queueAndWaitTasksInOrder.ElementAt(index-1).Item2.Cancel(); } } }); @@ -359,7 +356,7 @@ public async Task QueueAndWait_CancellingAPendingRequestBeforeItIsDequeued_Shoul } [Test] - public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_ShouldThrowExceptionAndClearRequest() + public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_ShouldWaitTillRequestRespondsAndClearRequest() { // Arrange const string endpoint = "poll://endpoint001"; @@ -368,8 +365,8 @@ public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_Should .WithEndpoint(endpoint) .WithPollingQueueWaitTimeout(TimeSpan.Zero) // Remove delay, otherwise we wait the full 20 seconds for DequeueAsync at the end of the test .Build(); - var request = new RequestMessageBuilder(endpoint).Build(); + var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build(); var cancellationTokenSource = new CancellationTokenSource(); @@ -381,17 +378,47 @@ public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_Should // Cancel, and give the queue time to start waiting for a response cancellationTokenSource.Cancel(); await Task.Delay(1000, CancellationToken); - dequeued.CancellationToken.IsCancellationRequested.Should().BeTrue("Should have cancelled the request"); - await AssertionExtensions.Should(() => queueAndWaitTask).ThrowAsync(); + await sut.ApplyResponse(expectedResponse, request.Destination); + + var response = await queueAndWaitTask; // Assert - dequeued.RequestMessage.Should().NotBeNull().And.Be(request); + dequeued.Should().NotBeNull().And.Be(request); + response.Should().Be(expectedResponse); var next = await sut.DequeueAsync(CancellationToken); next.Should().BeNull(); } + [Test] + public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_AndPollingRequestMaximumMessageProcessingTimeoutIsReached_WillStopWaiting() + { + // Arrange + const string endpoint = "poll://endpoint001"; + + var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build(); + var request = new RequestMessageBuilder(endpoint) + .WithServiceEndpoint(seb => seb.WithPollingRequestMaximumMessageProcessingTimeout(TimeSpan.FromMilliseconds(1000))) + .Build(); + + var cancellationTokenSource = new CancellationTokenSource(); + + // Act + var stopwatch = Stopwatch.StartNew(); + var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, cancellationTokenSource.Token); + + await sut.DequeueAsync(CancellationToken); + cancellationTokenSource.Cancel(); + var response = await queueAndWaitTask; + + // Assert + // Although we sleep for 1 second, sometimes it can be just under. So be generous with the buffer. + stopwatch.Elapsed.Should().BeGreaterThan(TimeSpan.FromMilliseconds(800)); + response.Id.Should().Be(request.Id); + response.Error.Message.Should().Be("A request was sent to a polling endpoint, the polling endpoint collected it but did not respond in the allowed time (00:00:01), so the request timed out."); + } + [Test] public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout_ShouldReturnNull() { @@ -402,7 +429,7 @@ public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout .WithEndpoint(endpoint) .WithPollingQueueWaitTimeout(TimeSpan.FromSeconds(1)) .Build(); - + // Act var stopwatch = Stopwatch.StartNew(); var request = await sut.DequeueAsync(CancellationToken); @@ -428,7 +455,7 @@ public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout var previousRequest = new RequestMessageBuilder(endpoint).Build(); var expectedPreviousResponse = ResponseMessageBuilder.FromRequest(previousRequest).Build(); - var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, previousRequest, CancellationToken); + var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, previousRequest ,CancellationToken); await sut.DequeueAsync(CancellationToken); await sut.ApplyResponse(expectedPreviousResponse, previousRequest.Destination); await queueAndWaitTask; @@ -436,7 +463,7 @@ public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout // Act var stopwatch = Stopwatch.StartNew(); var dequeuedRequest = await sut.DequeueAsync(CancellationToken); - + // Assert // Although we sleep for 1 second, sometimes it can be just under. So be generous with the buffer. stopwatch.Elapsed.Should().BeGreaterThan(TimeSpan.FromMilliseconds(800)); @@ -460,21 +487,21 @@ public async Task DequeueAsync_WillContinueWaitingUntilItemIsQueued() await Task.Delay(1000, CancellationToken); var queueAndWaitTask = StartQueueAndWait(sut, request, CancellationToken); - + var dequeuedRequest = await dequeueTask; - + // Assert // Although we sleep for 1 second, sometimes it can be just under. So be generous with the buffer. stopwatch.Elapsed.Should().BeGreaterThan(TimeSpan.FromMilliseconds(800)); - dequeuedRequest.RequestMessage.Should().Be(request); + dequeuedRequest.Should().Be(request); // Apply a response so we can prove this counts as taking a message. await sut.ApplyResponse(expectedResponse, request.Destination); var response = await queueAndWaitTask; response.Should().Be(expectedResponse); } - + [Test] public async Task DequeueAsync_WithMultipleDequeueCallsWaiting_WhenSingleRequestIsQueued_ThenOnlyOneCallersReceivesRequest() { @@ -484,7 +511,7 @@ public async Task DequeueAsync_WithMultipleDequeueCallsWaiting_WhenSingleRequest var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build(); var request = new RequestMessageBuilder(endpoint).Build(); var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build(); - + var dequeueTasks = Enumerable.Range(0, 30) .Select(_ => sut.DequeueAsync(CancellationToken)) .ToArray(); @@ -501,7 +528,7 @@ public async Task DequeueAsync_WithMultipleDequeueCallsWaiting_WhenSingleRequest await queueAndWaitTask; var singleDequeuedRequest = dequeueTasks.Should().ContainSingle(t => t.Result != null).Subject.Result; - singleDequeuedRequest.RequestMessage.Should().Be(request); + singleDequeuedRequest.Should().Be(request); } [Test] @@ -542,11 +569,11 @@ public async Task ApplyResponse_AfterRequestHasBeenCollected_AndWaitingHasBeenCa async Task> StartQueueAndWaitAndWaitForRequestToBeQueued( IPendingRequestQueue pendingRequestQueue, RequestMessage request, - CancellationToken requestCancellationToken) + CancellationToken queueAndWaitCancellationToken) { var count = pendingRequestQueue.Count; - var task = StartQueueAndWait(pendingRequestQueue, request, requestCancellationToken); + var task = StartQueueAndWait(pendingRequestQueue, request, queueAndWaitCancellationToken); await WaitForQueueCountToBecome(pendingRequestQueue, count + 1); @@ -561,26 +588,24 @@ async Task WaitForQueueCountToBecome(IPendingRequestQueue pendingRequestQueue, i } } - Task StartQueueAndWait( - IPendingRequestQueue pendingRequestQueue, - RequestMessage request, - CancellationToken requestCancellationToken) + Task StartQueueAndWait(IPendingRequestQueue pendingRequestQueue, RequestMessage request, CancellationToken queueAndWaitCancellationToken) { var task = Task.Run( - async () => await pendingRequestQueue.QueueAndWaitAsync(request, requestCancellationToken), + async () => await pendingRequestQueue.QueueAndWaitAsync(request, new RequestCancellationTokens(queueAndWaitCancellationToken, CancellationToken.None)), CancellationToken); return task; } - async Task<(Task queueAndWaitTask, RequestMessageWithCancellationToken dequeued)> QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition( - IPendingRequestQueue sut, + async Task<(Task queueAndWaitTask, RequestMessage dequeued)> QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition( + IPendingRequestQueue sut, RequestMessage request, CancellationToken cancellationToken) { //For most tests, this is not a good method to use. It is a fix for some specific tests to cope with a race condition when Team City runs out of resources (and causes tests to become flaky) + while (true) { - var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, cancellationToken); + var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, CancellationToken); sut.Count.Should().Be(1, "Item should be queued"); var dequeued = await sut.DequeueAsync(cancellationToken); @@ -610,9 +635,9 @@ static async Task ApplyResponsesConcurrentlyAndEnsureAllQueueResponsesMatch( //Concurrently apply responses to prove this does not cause issues. var applyResponseTasks = requestsInOrder - .Select((r, i) => Task.Factory.StartNew(async () => await sut.ApplyResponse(expectedResponsesInOrder[i], r.Destination))) + .Select((r,i) => Task.Factory.StartNew(async () => await sut.ApplyResponse(expectedResponsesInOrder[i], r.Destination))) .ToList(); - + await Task.WhenAll(applyResponseTasks); var index = 0; diff --git a/source/Halibut.Tests/Support/LatestClientAndLatestServiceBuilderExtensionMethods.cs b/source/Halibut.Tests/Support/LatestClientAndLatestServiceBuilderExtensionMethods.cs index 389414122..997801c1d 100644 --- a/source/Halibut.Tests/Support/LatestClientAndLatestServiceBuilderExtensionMethods.cs +++ b/source/Halibut.Tests/Support/LatestClientAndLatestServiceBuilderExtensionMethods.cs @@ -59,6 +59,13 @@ public static LatestClientAndLatestServiceBuilder WithInstantReconnectPollingRet { return builder.WithPollingReconnectRetryPolicy(() => new RetryPolicy(1, TimeSpan.Zero, TimeSpan.Zero)); } + + public static LatestClientAndLatestServiceBuilder WhenTestingAsyncClient(this LatestClientAndLatestServiceBuilder builder, ClientAndServiceTestCase clientAndServiceTestCase, Action action) + { + + action(builder); + return builder; + } public static LatestClientAndLatestServiceBuilder WithConnectionObserverOnTcpServer(this LatestClientAndLatestServiceBuilder builder, IConnectionsObserver connectionsObserver) { diff --git a/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs b/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs index bbf1d4e34..5a1616394 100644 --- a/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs +++ b/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs @@ -29,7 +29,12 @@ public async Task HalibutTimeoutsAndLimits_AppliesToTcpClientReceiveTimeout(Clie .WithPortForwarding(out var portForwarderRef) .WithEchoService() .WithDoSomeActionService(() => portForwarderRef.Value.PauseExistingConnections()) - .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build().WithAllTcpTimeoutsTo(TimeSpan.FromSeconds(133)).WithTcpClientReceiveTimeout(expectedTimeout)) + .WhenTestingAsyncClient(clientAndServiceTestCase, b => + { + b.WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build() + .WithAllTcpTimeoutsTo(TimeSpan.FromSeconds(133)) + .WithTcpClientReceiveTimeout(expectedTimeout)); + }) .WithInstantReconnectPollingRetryPolicy() .Build(CancellationToken)) { @@ -43,7 +48,6 @@ public async Task HalibutTimeoutsAndLimits_AppliesToTcpClientReceiveTimeout(Clie sw.Stop(); Logger.Error(e, "Received error"); AssertExceptionMessageLooksLikeAReadTimeout(e); - sw.Elapsed.Should().BeGreaterThan(expectedTimeout - TimeSpan.FromSeconds(2), "The receive timeout should apply, not the shorter heart beat timeout") // -2s give it a little slack to avoid it timed out slightly too early. .And .BeLessThan(expectedTimeout + HalibutTimeoutsAndLimitsForTestsBuilder.HalfTheTcpReceiveTimeout, "We should be timing out on the tcp receive timeout"); diff --git a/source/Halibut.Tests/Transport/Observability/ConnectionObserverFixture.cs b/source/Halibut.Tests/Transport/Observability/ConnectionObserverFixture.cs index b84817c6a..3ca8eaf0b 100644 --- a/source/Halibut.Tests/Transport/Observability/ConnectionObserverFixture.cs +++ b/source/Halibut.Tests/Transport/Observability/ConnectionObserverFixture.cs @@ -93,7 +93,7 @@ public async Task ObserveUnauthorizedPollingConnections(ClientAndServiceTestCase var echo = clientAndBuilder.CreateAsyncClient( point => { point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(2000); }); - var sayHelloTask = Task.Run(async () => await echo.SayHelloAsync("hello", new HalibutProxyRequestOptions(token)), CancellationToken); + var sayHelloTask = Task.Run(async () => await echo.SayHelloAsync("hello", new HalibutProxyRequestOptions(token, CancellationToken.None)), CancellationToken); await Task.Delay(3000, CancellationToken); @@ -126,7 +126,7 @@ public async Task ObserveUnauthorizedPollingWebSocketConnections(ClientAndServic var echo = clientAndBuilder.CreateAsyncClient( point => { point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(2000); }); - var sayHelloTask = Task.Run(async () => await echo.SayHelloAsync("hello", new HalibutProxyRequestOptions(token)), CancellationToken); + var sayHelloTask = Task.Run(async () => await echo.SayHelloAsync("hello", new HalibutProxyRequestOptions(token, CancellationToken.None)), CancellationToken); await Task.Delay(3000, CancellationToken); diff --git a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs index 31808f85c..ef148032a 100644 --- a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs @@ -146,9 +146,9 @@ public async Task ShouldExchangeAsServerOfSubscriberAsync() { stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Subscriber, new Uri("poll://12831"))); var requestQueue = Substitute.For(); - var queue = new Queue(); - queue.Enqueue(new (new RequestMessage(), CancellationToken.None)); - queue.Enqueue(new (new RequestMessage(), CancellationToken.None)); + var queue = new Queue(); + queue.Enqueue(new RequestMessage()); + queue.Enqueue(new RequestMessage()); requestQueue.DequeueAsync(CancellationToken.None).Returns(ci => queue.Count > 0 ? queue.Dequeue() : null); stream.SetNumberOfReads(2); @@ -219,16 +219,16 @@ public async Task ShouldExchangeAsServerOfSubscriberWithPoolingAsync() { stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Subscriber, new Uri("poll://12831"))); var requestQueue = Substitute.For(); - var queue = new Queue(); + var queue = new Queue(); requestQueue.DequeueAsync(CancellationToken.None).Returns(ci => queue.Count > 0 ? queue.Dequeue() : null); - queue.Enqueue(new (new RequestMessage(), CancellationToken.None)); - queue.Enqueue(new (new RequestMessage(), CancellationToken.None)); + queue.Enqueue(new RequestMessage()); + queue.Enqueue(new RequestMessage()); stream.SetNumberOfReads(2); await protocol.ExchangeAsServerAsync(req => Task.FromResult(ResponseMessage.FromException(req, new Exception("Divide by zero"))), ri => requestQueue, CancellationToken.None); - queue.Enqueue(new (new RequestMessage(), CancellationToken.None)); + queue.Enqueue(new RequestMessage()); stream.SetNumberOfReads(1); diff --git a/source/Halibut.Tests/Transport/SecureClientFixture.cs b/source/Halibut.Tests/Transport/SecureClientFixture.cs index ebef2bc60..07d568f09 100644 --- a/source/Halibut.Tests/Transport/SecureClientFixture.cs +++ b/source/Halibut.Tests/Transport/SecureClientFixture.cs @@ -80,7 +80,9 @@ public async Task SecureClientClearsPoolWhenAllConnectionsCorrupt() var secureClient = new SecureListeningClient((s, l) => GetProtocol(s, l), endpoint, Certificates.Octopus, log, connectionManager, tcpConnectionFactory); ResponseMessage response = null!; - await secureClient.ExecuteTransactionAsync(async (mep, ct) => response = await mep.ExchangeAsClientAsync(request, ct), CancellationToken.None); + using var requestCancellationTokens = new RequestCancellationTokens(CancellationToken.None, CancellationToken.None); + + await secureClient.ExecuteTransactionAsync(async (mep, ct) => response = await mep.ExchangeAsClientAsync(request, ct), requestCancellationTokens); // The pool should be cleared after the second failure await stream.Received(2).IdentifyAsClientAsync(Arg.Any()); diff --git a/source/Halibut.Tests/WhenCancellingARequestForAPollingTentacle.cs b/source/Halibut.Tests/WhenCancellingARequestForAPollingTentacle.cs index 15f234701..ee9b299b5 100644 --- a/source/Halibut.Tests/WhenCancellingARequestForAPollingTentacle.cs +++ b/source/Halibut.Tests/WhenCancellingARequestForAPollingTentacle.cs @@ -1,11 +1,9 @@ using System; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using FluentAssertions; -using Halibut.Exceptions; using Halibut.Logging; using Halibut.ServiceModel; using Halibut.Tests.Support; @@ -13,7 +11,6 @@ using Halibut.Tests.Support.TestCases; using Halibut.Tests.TestServices; using Halibut.Tests.TestServices.Async; -using Halibut.TestUtils.Contracts; using Halibut.Transport.Protocol; using NUnit.Framework; @@ -24,10 +21,15 @@ public class WhenCancellingARequestForAPollingTentacle public class AndTheRequestIsStillQueued : BaseTest { [Test] - [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)] - public async Task TheRequestShouldBeCancelled_WhenTheRequestCancellationTokenIsCancelled(ClientAndServiceTestCase clientAndServiceTestCase) + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, additionalParameters: new object[]{ true, false })] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, additionalParameters: new object[]{ false, true })] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, additionalParameters: new object[]{ true, true })] + public async Task TheRequestShouldBeCancelled_WhenTheConnectingOrInProgressCancellationTokenIsCancelled_OnAsyncClients( + ClientAndServiceTestCase clientAndServiceTestCase, + bool connectingCancellationTokenCancelled, + bool inProgressCancellationTokenCancelled) { - var (tokenSourcesToCancel, halibutProxyRequestOptions) = CreateTokenSourceAndHalibutProxyRequestOptions(); + var (tokenSourcesToCancel, halibutProxyRequestOptions) = CreateTokenSourceAndHalibutProxyRequestOptions(connectingCancellationTokenCancelled, inProgressCancellationTokenCancelled); await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .AsLatestClientAndLatestServiceBuilder() @@ -37,7 +39,7 @@ public async Task TheRequestShouldBeCancelled_WhenTheRequestCancellationTokenIsC { var doSomeActionService = clientAndService.CreateAsyncClient(); - await AssertAsync.Throws(() => doSomeActionService.ActionAsync(halibutProxyRequestOptions)); + await AssertAsync.Throws(() => doSomeActionService.ActionAsync(halibutProxyRequestOptions)); } } } @@ -46,79 +48,99 @@ public class AndTheRequestHasBeenDequeuedButNoResponseReceived : BaseTest { [Test] [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)] - public async Task TheRequestShouldBeCancelled_WhenTheRequestCancellationTokenIsCancelled(ClientAndServiceTestCase clientAndServiceTestCase) + public async Task TheRequestShouldNotBeCancelled_WhenTheConnectingCancellationTokenIsCancelled_OnAsyncClients(ClientAndServiceTestCase clientAndServiceTestCase) { - var responseMessages = new List(); - var shouldCancelWhenRequestDequeued = false; var calls = new List(); - var (tokenSourceToCancel, halibutProxyRequestOptions) = CreateTokenSourceAndHalibutProxyRequestOptions(); + var (tokenSourcesToCancel, halibutProxyRequestOptions) = CreateTokenSourceAndHalibutProxyRequestOptions( + connectingCancellationTokenCancelled: true, + inProgressCancellationTokenCancelled: false); await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .AsLatestClientAndLatestServiceBuilder() - .WithHalibutLoggingLevel(LogLevel.Trace) .WithDoSomeActionService(() => { calls.Add(DateTime.UtcNow); - while (!tokenSourceToCancel.IsCancellationRequested) + while (!tokenSourcesToCancel.All(x => x.IsCancellationRequested)) { - // Wait until the request is cancelled - Thread.Sleep(TimeSpan.FromMilliseconds(100)); + Thread.Sleep(TimeSpan.FromMilliseconds(10)); } Thread.Sleep(TimeSpan.FromSeconds(1)); }) - .WithEchoService() - .WithPendingRequestQueueFactoryBuilder(builder => builder.WithDecorator((_, inner) => - new CancelWhenRequestDequeuedPendingRequestQueueFactory(inner, tokenSourceToCancel, ShouldCancelOnDequeue, OnResponseApplied))) + .WithPendingRequestQueueFactoryBuilder(builder => builder.WithDecorator((_, inner) => new CancelWhenRequestDequeuedPendingRequestQueueFactory(inner, tokenSourcesToCancel))) .Build(CancellationToken)) { - - shouldCancelWhenRequestDequeued = true; var doSomeActionService = clientAndService.CreateAsyncClient(); - var echoService = clientAndService.CreateAsyncClient(); - await AssertAsync.Throws(() => doSomeActionService.ActionAsync(halibutProxyRequestOptions)); - - // Ensure we can send another message to the Service which will validate the Client had the request cancelled to the socket - shouldCancelWhenRequestDequeued = false; - var started = Stopwatch.StartNew(); - await echoService.SayHelloAsync(".", new HalibutProxyRequestOptions(CancellationToken.None)); - // This should return quickly - started.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(10)); + await doSomeActionService.ActionAsync(halibutProxyRequestOptions); } calls.Should().HaveCount(1); + } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, additionalParameters: new object[]{ false, true })] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, additionalParameters: new object[]{ true, true })] + public async Task TheRequestShouldBeCancelled_WhenTheInProgressCancellationTokenIsCancelled_OnAsyncClients( + ClientAndServiceTestCase clientAndServiceTestCase, + bool connectingCancellationTokenCancelled, + bool inProgressCancellationTokenCancelled) + { + var calls = new List(); + var (tokenSourcesToCancel, halibutProxyRequestOptions) = CreateTokenSourceAndHalibutProxyRequestOptions(connectingCancellationTokenCancelled, inProgressCancellationTokenCancelled); - // Wait for all responses have been received - await Task.Delay(TimeSpan.FromSeconds(5)); + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .AsLatestClientAndLatestServiceBuilder() + .WithHalibutLoggingLevel(LogLevel.Trace) + .WithDoSomeActionService(() => + { + calls.Add(DateTime.UtcNow); - // Ensure we did not get a valid response back from the doSomeActionService and that the request was cancelled to the socket. - responseMessages.Should().HaveCount(2); - responseMessages.ElementAt(0).Id.Should().Contain("IDoSomeActionService::ActionAsync"); - responseMessages.ElementAt(0).Error.Should().NotBeNull(); - responseMessages.ElementAt(0).Error.Message.Should().Contain("The Request was cancelled while Transferring"); - responseMessages.ElementAt(1).Error.Should().BeNull(); - responseMessages.ElementAt(1).Id.Should().Contain("IEchoService::SayHelloAsync"); + while (!tokenSourcesToCancel.All(x => x.IsCancellationRequested)) + { + Thread.Sleep(TimeSpan.FromMilliseconds(10)); + } - bool ShouldCancelOnDequeue() + Thread.Sleep(TimeSpan.FromSeconds(1)); + }) + .WithPendingRequestQueueFactoryBuilder(builder => builder.WithDecorator((_, inner) => new CancelWhenRequestDequeuedPendingRequestQueueFactory(inner, tokenSourcesToCancel))) + .Build(CancellationToken)) { - return shouldCancelWhenRequestDequeued; - } + var doSomeActionService = clientAndService.CreateAsyncClient(); - void OnResponseApplied(ResponseMessage response) - { - responseMessages.Add(response); + await AssertAsync.Throws(() => doSomeActionService.ActionAsync(halibutProxyRequestOptions)); } + + calls.Should().HaveCount(1); } } - static (CancellationTokenSource TokenSourceToCancel, HalibutProxyRequestOptions HalibutProxyRequestOptions) CreateTokenSourceAndHalibutProxyRequestOptions() + static (CancellationTokenSource[] ToeknSourcesToCancel, HalibutProxyRequestOptions HalibutProxyRequestOptions) CreateTokenSourceAndHalibutProxyRequestOptions( + bool connectingCancellationTokenCancelled, + bool inProgressCancellationTokenCancelled) { - var requestCancellationTokenSource = new CancellationTokenSource(); - var halibutProxyRequestOptions = new HalibutProxyRequestOptions(requestCancellationTokenSource.Token); + var connectingCancellationTokenSource = new CancellationTokenSource(); + var inProgressCancellationTokenSource = new CancellationTokenSource(); + + CancellationTokenSource[] tokenSourcesToCancel; + + if (connectingCancellationTokenCancelled && inProgressCancellationTokenCancelled) + { + tokenSourcesToCancel = new [] { connectingCancellationTokenSource, inProgressCancellationTokenSource }; + } + else if (connectingCancellationTokenCancelled) + { + tokenSourcesToCancel = new [] { connectingCancellationTokenSource }; + } + else + { + tokenSourcesToCancel = new [] { inProgressCancellationTokenSource }; + } + + var halibutProxyRequestOptions = new HalibutProxyRequestOptions(connectingCancellationTokenSource.Token, inProgressCancellationTokenSource.Token); - return (requestCancellationTokenSource, halibutProxyRequestOptions); + return (tokenSourcesToCancel, halibutProxyRequestOptions); } /// @@ -126,41 +148,40 @@ void OnResponseApplied(ResponseMessage response) /// class CancelWhenRequestQueuedPendingRequestQueueFactory : IPendingRequestQueueFactory { - readonly CancellationTokenSource cancellationTokenSource; + readonly CancellationTokenSource[] cancellationTokenSources; readonly IPendingRequestQueueFactory inner; - public CancelWhenRequestQueuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource cancellationTokenSource) + public CancelWhenRequestQueuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource[] cancellationTokenSources) { - this.cancellationTokenSource = cancellationTokenSource; + this.cancellationTokenSources = cancellationTokenSources; this.inner = inner; } - + + public CancelWhenRequestQueuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource cancellationTokenSource) : this(inner, new[]{ cancellationTokenSource }) { + } + public IPendingRequestQueue CreateQueue(Uri endpoint) { - return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSource); + return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSources); } class Decorator : IPendingRequestQueue { - readonly CancellationTokenSource cancellationTokenSource; + readonly CancellationTokenSource[] cancellationTokenSources; readonly IPendingRequestQueue inner; - public Decorator(IPendingRequestQueue inner, CancellationTokenSource cancellationTokenSource) + public Decorator(IPendingRequestQueue inner, CancellationTokenSource[] cancellationTokenSources) { this.inner = inner; - this.cancellationTokenSource = cancellationTokenSource; + this.cancellationTokenSources = cancellationTokenSources; } public bool IsEmpty => inner.IsEmpty; public int Count => inner.Count; - public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination) - { - await inner.ApplyResponse(response, destination); - } + public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination) => await inner.ApplyResponse(response, destination); + public async Task DequeueAsync(CancellationToken cancellationToken) => await inner.DequeueAsync(cancellationToken); - public async Task DequeueAsync(CancellationToken cancellationToken) => await inner.DequeueAsync(cancellationToken); - - public async Task QueueAndWaitAsync(RequestMessage request, CancellationToken requestCancellationToken) + public async Task QueueAndWaitAsync(RequestMessage request, RequestCancellationTokens requestCancellationTokens) { var task = Task.Run(async () => { @@ -169,11 +190,11 @@ public async Task QueueAndWaitAsync(RequestMessage request, Can await Task.Delay(TimeSpan.FromMilliseconds(10), CancellationToken.None); } - cancellationTokenSource.Cancel(); + Parallel.ForEach(cancellationTokenSources, cancellationTokenSource => cancellationTokenSource.Cancel()); }, CancellationToken.None); - var result = await inner.QueueAndWaitAsync(request, requestCancellationToken); + var result = await inner.QueueAndWaitAsync(request, requestCancellationTokens); await task; return result; } @@ -185,61 +206,50 @@ public async Task QueueAndWaitAsync(RequestMessage request, Can /// class CancelWhenRequestDequeuedPendingRequestQueueFactory : IPendingRequestQueueFactory { - readonly CancellationTokenSource cancellationTokenSource; - readonly Func shouldCancelOnDequeue; - readonly Action onResponseApplied; + readonly CancellationTokenSource[] cancellationTokenSources; readonly IPendingRequestQueueFactory inner; - public CancelWhenRequestDequeuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource cancellationTokenSource, Func shouldCancelOnDequeue, Action onResponseApplied) + public CancelWhenRequestDequeuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource[] cancellationTokenSources) { - this.cancellationTokenSource = cancellationTokenSource; - this.shouldCancelOnDequeue = shouldCancelOnDequeue; + this.cancellationTokenSources = cancellationTokenSources; this.inner = inner; - this.onResponseApplied = onResponseApplied; + } + + public CancelWhenRequestDequeuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource cancellationTokenSource): this(inner, new []{ cancellationTokenSource }) + { } public IPendingRequestQueue CreateQueue(Uri endpoint) { - return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSource, shouldCancelOnDequeue, onResponseApplied); + return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSources); } class Decorator : IPendingRequestQueue { - readonly CancellationTokenSource cancellationTokenSource; - readonly Func shouldCancel; - readonly Action onResponseApplied; + readonly CancellationTokenSource[] cancellationTokenSources; readonly IPendingRequestQueue inner; - public Decorator(IPendingRequestQueue inner, CancellationTokenSource cancellationTokenSource, Func shouldCancel, Action onResponseApplied) + public Decorator(IPendingRequestQueue inner, CancellationTokenSource[] cancellationTokenSources) { this.inner = inner; - this.cancellationTokenSource = cancellationTokenSource; - this.shouldCancel = shouldCancel; - this.onResponseApplied = onResponseApplied; + this.cancellationTokenSources = cancellationTokenSources; } public bool IsEmpty => inner.IsEmpty; public int Count => inner.Count; - public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination) - { - onResponseApplied(response); - await inner.ApplyResponse(response, destination); - } - - public async Task DequeueAsync(CancellationToken cancellationToken) + public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination) => await inner.ApplyResponse(response, destination); + + public async Task DequeueAsync(CancellationToken cancellationToken) { var response = await inner.DequeueAsync(cancellationToken); - - if (shouldCancel()) - { - cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(2)); - } + + Parallel.ForEach(cancellationTokenSources, cancellationTokenSource => cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(2))); return response; } - public async Task QueueAndWaitAsync(RequestMessage request, CancellationToken requestCancellationToken) - => await inner.QueueAndWaitAsync(request, requestCancellationToken); + public async Task QueueAndWaitAsync(RequestMessage request, RequestCancellationTokens requestCancellationTokens) + => await inner.QueueAndWaitAsync(request, requestCancellationTokens); } } } diff --git a/source/Halibut/Exceptions/RequestCancelledException.cs b/source/Halibut/Exceptions/RequestCancelledException.cs deleted file mode 100644 index 18553a41b..000000000 --- a/source/Halibut/Exceptions/RequestCancelledException.cs +++ /dev/null @@ -1,47 +0,0 @@ -using System; - -namespace Halibut.Exceptions -{ - public abstract class RequestCancelledException : OperationCanceledException - { - protected RequestCancelledException(string message, Exception innerException) - : base(message, innerException) - { - } - } - - public class ConnectingRequestCancelledException : RequestCancelledException - { - public ConnectingRequestCancelledException(Exception innerException) - : this("The Request was cancelled while Connecting.", innerException) - { - } - - public ConnectingRequestCancelledException(string message, Exception innerException) - : base(message, innerException) - { - } - public ConnectingRequestCancelledException(string message, string innerException) - : base(message, new Exception(innerException)) - { - } - } - - public class TransferringRequestCancelledException : RequestCancelledException - { - public TransferringRequestCancelledException(Exception innerException) - : this("The Request was cancelled while Transferring.", innerException) - { - } - - public TransferringRequestCancelledException(string message, Exception innerException) - : base(message, innerException) - { - } - - public TransferringRequestCancelledException(string message, string innerException) - : base(message, new Exception(innerException)) - { - } - } -} \ No newline at end of file diff --git a/source/Halibut/HalibutRuntime.cs b/source/Halibut/HalibutRuntime.cs index 271415f1a..784320e01 100644 --- a/source/Halibut/HalibutRuntime.cs +++ b/source/Halibut/HalibutRuntime.cs @@ -192,11 +192,11 @@ public TAsyncClientService CreateAsyncClient(Serv var logger = logs.ForEndpoint(endpoint.BaseUri); var proxy = DispatchProxyAsync.Create(); - (proxy as HalibutProxyWithAsync)!.Configure(SendOutgoingRequestAsync, typeof(TService), endpoint, logger); + (proxy as HalibutProxyWithAsync)!.Configure(SendOutgoingRequestAsync, typeof(TService), endpoint, logger, CancellationToken.None); return proxy; } - async Task SendOutgoingRequestAsync(RequestMessage request, MethodInfo methodInfo, CancellationToken cancellationToken) + async Task SendOutgoingRequestAsync(RequestMessage request, MethodInfo methodInfo, RequestCancellationTokens requestCancellationTokens) { var endPoint = request.Destination; @@ -212,10 +212,10 @@ async Task SendOutgoingRequestAsync(RequestMessage request, Met switch (endPoint.BaseUri.Scheme.ToLowerInvariant()) { case "https": - response = await SendOutgoingHttpsRequestAsync(request, cancellationToken).ConfigureAwait(false); + response = await SendOutgoingHttpsRequestAsync(request, requestCancellationTokens).ConfigureAwait(false); break; case "poll": - response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false); + response = await SendOutgoingPollingRequestAsync(request, requestCancellationTokens).ConfigureAwait(false); break; default: throw new ArgumentException("Unknown endpoint type: " + endPoint.BaseUri.Scheme); } @@ -225,7 +225,7 @@ async Task SendOutgoingRequestAsync(RequestMessage request, Met return response; } - async Task SendOutgoingHttpsRequestAsync(RequestMessage request, CancellationToken cancellationToken) + async Task SendOutgoingHttpsRequestAsync(RequestMessage request, RequestCancellationTokens requestCancellationTokens) { var client = new SecureListeningClient(ExchangeProtocolBuilder(), request.Destination, serverCertificate, logs.ForEndpoint(request.Destination.BaseUri), connectionManager, tcpConnectionFactory); @@ -236,15 +236,15 @@ await client.ExecuteTransactionAsync( { response = await protocol.ExchangeAsClientAsync(request, cts).ConfigureAwait(false); }, - cancellationToken).ConfigureAwait(false); + requestCancellationTokens).ConfigureAwait(false); return response; } - async Task SendOutgoingPollingRequestAsync(RequestMessage request, CancellationToken cancellationToken) + async Task SendOutgoingPollingRequestAsync(RequestMessage request, RequestCancellationTokens requestCancellationTokens) { var queue = GetQueue(request.Destination.BaseUri); - return await queue.QueueAndWaitAsync(request, cancellationToken); + return await queue.QueueAndWaitAsync(request, requestCancellationTokens); } async Task HandleIncomingRequestAsync(RequestMessage request) diff --git a/source/Halibut/ServiceModel/HalibutProxyRequestOptions.cs b/source/Halibut/ServiceModel/HalibutProxyRequestOptions.cs index 253c6046c..598442d4a 100644 --- a/source/Halibut/ServiceModel/HalibutProxyRequestOptions.cs +++ b/source/Halibut/ServiceModel/HalibutProxyRequestOptions.cs @@ -1,3 +1,5 @@ + + using System.Threading; namespace Halibut.ServiceModel @@ -5,13 +7,28 @@ namespace Halibut.ServiceModel public class HalibutProxyRequestOptions { /// - /// When cancelled, will cancel a connecting or in-progress/in-flight RPC call. + /// When cancelled, will only stop an RPC call if it is known to not be received by the service. + /// For a Listening Service, cancellation can occur when the Client is still connecting to the Service. + /// For a Polling Service, cancellation can occur when the Client has queued a Request but the Service has not yet Dequeued it. + /// + public CancellationToken? ConnectingCancellationToken { get; } + + /// + /// When cancelled, will try to cancel an in-progress / in-flight RPC call. + /// This is a best effort cancellation and is not guaranteed. + /// For Sync Halibut, providing this cancellation token is not supported. + /// For Async Halibut this will attempt to cancel the RPC call. + /// If the call is to a Listening Service, then cancellation is performed all the way down to the Socket operations. + /// if the call is to a Polling Service, then cancellation is performed all the way down to the Polling Queue, + /// this means the client can cancel the call but the service will still process the request and return a response. /// - public CancellationToken RequestCancellationToken { get; } + public CancellationToken? InProgressRequestCancellationToken { get; } - public HalibutProxyRequestOptions(CancellationToken cancellationToken) + public HalibutProxyRequestOptions(CancellationToken? connectingCancellationToken, + CancellationToken? inProgressRequestCancellationToken) { - RequestCancellationToken = cancellationToken; + ConnectingCancellationToken = connectingCancellationToken; + InProgressRequestCancellationToken = inProgressRequestCancellationToken; } } } \ No newline at end of file diff --git a/source/Halibut/ServiceModel/HalibutProxyWithAsync.cs b/source/Halibut/ServiceModel/HalibutProxyWithAsync.cs index b73fd9c70..3570cf514 100644 --- a/source/Halibut/ServiceModel/HalibutProxyWithAsync.cs +++ b/source/Halibut/ServiceModel/HalibutProxyWithAsync.cs @@ -9,7 +9,7 @@ namespace Halibut.ServiceModel { - public delegate Task MessageRouter(RequestMessage request, MethodInfo serviceMethod, CancellationToken cancellationToken); + public delegate Task MessageRouter(RequestMessage request, MethodInfo serviceMethod, RequestCancellationTokens requestCancellationTokens); public class HalibutProxyWithAsync : DispatchProxyAsync { @@ -18,17 +18,20 @@ public class HalibutProxyWithAsync : DispatchProxyAsync ServiceEndPoint endPoint; long callId; bool configured; + CancellationToken globalCancellationToken; ILog logger; public void Configure( MessageRouter messageRouter, Type contractType, ServiceEndPoint endPoint, - ILog logger) + ILog logger, + CancellationToken cancellationToken) { this.messageRouter = messageRouter; this.contractType = contractType; this.endPoint = endPoint; + this.globalCancellationToken = cancellationToken; this.configured = true; this.logger = logger; } @@ -69,7 +72,9 @@ public override async Task InvokeAsyncT(MethodInfo asyncMethod, object[] a var request = CreateRequest(asyncMethod, serviceMethod, args); - var response = await messageRouter(request, serviceMethod, halibutProxyRequestOptions?.RequestCancellationToken ?? CancellationToken.None); + using var requestCancellationTokens = RequestCancellationTokens(halibutProxyRequestOptions); + + var response = await messageRouter(request, serviceMethod, requestCancellationTokens); EnsureNotError(response); @@ -98,7 +103,19 @@ RequestMessage CreateRequest(MethodInfo asyncMethod, MethodInfo targetMethod, ob }; return request; } - + + RequestCancellationTokens RequestCancellationTokens(HalibutProxyRequestOptions halibutProxyRequestOptions) + { + if (halibutProxyRequestOptions == null) + { + return new RequestCancellationTokens(globalCancellationToken, CancellationToken.None); + } + + return new RequestCancellationTokens( + halibutProxyRequestOptions.ConnectingCancellationToken ?? CancellationToken.None, + halibutProxyRequestOptions.InProgressRequestCancellationToken ?? CancellationToken.None); + } + void EnsureNotError(ResponseMessage responseMessage) { if (responseMessage == null) @@ -122,7 +139,7 @@ internal static void ThrowExceptionFromReceivedError(ServerError error, ILog log if (theType != null && theType != typeof(HalibutClientException)) { var ctor = theType.GetConstructor(new[] { typeof(string), typeof(string) }); - var e = (Exception)ctor.Invoke(new object[] { error.Message, realException }); + Exception e = (Exception)ctor.Invoke(new object[] { error.Message, realException }); throw e; } } @@ -148,7 +165,7 @@ internal static void ThrowExceptionFromReceivedError(ServerError error, ILog log } } - catch (Exception exception) when (exception is not HalibutClientException && exception is not RequestCancelledException) + catch (Exception exception) when (!(exception is HalibutClientException)) { // Something went wrong trying to understand the ServerError revert back to the old behaviour of just // throwing a standard halibut client exception. diff --git a/source/Halibut/ServiceModel/IPendingRequestQueue.cs b/source/Halibut/ServiceModel/IPendingRequestQueue.cs index 861c81dee..648e1d9a2 100644 --- a/source/Halibut/ServiceModel/IPendingRequestQueue.cs +++ b/source/Halibut/ServiceModel/IPendingRequestQueue.cs @@ -10,7 +10,7 @@ public interface IPendingRequestQueue bool IsEmpty { get; } int Count { get; } Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination); - Task DequeueAsync(CancellationToken cancellationToken); - Task QueueAndWaitAsync(RequestMessage request, CancellationToken cancellationToken); + Task DequeueAsync(CancellationToken cancellationToken); + Task QueueAndWaitAsync(RequestMessage request, RequestCancellationTokens requestCancellationTokens); } } \ No newline at end of file diff --git a/source/Halibut/ServiceModel/PendingRequestQueueAsync.cs b/source/Halibut/ServiceModel/PendingRequestQueueAsync.cs index 65df5e2b8..fa23f32e1 100644 --- a/source/Halibut/ServiceModel/PendingRequestQueueAsync.cs +++ b/source/Halibut/ServiceModel/PendingRequestQueueAsync.cs @@ -4,7 +4,6 @@ using System.Threading; using System.Threading.Tasks; using Halibut.Diagnostics; -using Halibut.Exceptions; using Halibut.Transport.Protocol; using Nito.AsyncEx; @@ -30,27 +29,20 @@ public PendingRequestQueueAsync(ILog log, TimeSpan pollingQueueWaitTimeout) this.pollingQueueWaitTimeout = pollingQueueWaitTimeout; } - public async Task QueueAndWaitAsync(RequestMessage request, CancellationToken cancellationToken) + public async Task QueueAndWaitAsync(RequestMessage request, RequestCancellationTokens requestCancellationTokens) { using var pending = new PendingRequest(request, log); - try - { - using (await queueLock.LockAsync(cancellationToken)) - { - queue.Add(pending); - inProgress.Add(request.Id, pending); - itemAddedToQueue.Set(); - } - } - catch (OperationCanceledException ex) + using (await queueLock.LockAsync(requestCancellationTokens.LinkedCancellationToken)) { - throw new ConnectingRequestCancelledException(ex); + queue.Add(pending); + inProgress.Add(request.Id, pending); + itemAddedToQueue.Set(); } try { - await pending.WaitUntilComplete(cancellationToken); + await pending.WaitUntilComplete(requestCancellationTokens); } finally { @@ -88,7 +80,7 @@ public int Count } } - public async Task DequeueAsync(CancellationToken cancellationToken) + public async Task DequeueAsync(CancellationToken cancellationToken) { var timer = Stopwatch.StartNew(); @@ -104,7 +96,7 @@ public async Task DequeueAsync(Cancellation var result = await pending.BeginTransfer(); if (result) { - return new (pending.Request, pending.PendingRequestCancellationToken); + return pending.Request; } } } @@ -177,20 +169,16 @@ class PendingRequest : IDisposable readonly SemaphoreSlim transferLock = new(1, 1); bool transferBegun; bool completed; - readonly CancellationTokenSource pendingRequestCancellationTokenSource; public PendingRequest(RequestMessage request, ILog log) { this.request = request; this.log = log; - - pendingRequestCancellationTokenSource = new CancellationTokenSource(); - PendingRequestCancellationToken = pendingRequestCancellationTokenSource.Token; } public RequestMessage Request => request; - public async Task WaitUntilComplete(CancellationToken cancellationToken) + public async Task WaitUntilComplete(RequestCancellationTokens requestCancellationTokens) { log.Write(EventType.MessageExchange, "Request {0} was queued", request); @@ -199,20 +187,19 @@ public async Task WaitUntilComplete(CancellationToken cancellationToken) try { - responseSet = await WaitForResponseToBeSet( - request.Destination.PollingRequestQueueTimeout, - // Don't cancel a dequeued request as we need to wait PollingRequestMaximumMessageProcessingTimeout for it to complete - cancelTheRequestWhenTransferHasBegun: false, - cancellationToken); - + responseSet = await WaitForResponseToBeSet(request.Destination.PollingRequestQueueTimeout, requestCancellationTokens.LinkedCancellationToken); if (responseSet) { log.Write(EventType.MessageExchange, "Request {0} was collected by the polling endpoint", request); return; } } - catch (RequestCancelledException) + catch (Exception ex) when (ex is TaskCanceledException or OperationCanceledException) { + // responseWaiter.Set is only called when the request has been collected and the response received. + // It is possible that the transfer has already started once the requestCancellationTokens.LinkedCancellationToke is cancelled + // If the requestCancellationTokens.InProgressCancellationToken is Ct.None or not cancelled then + // we cannot walk away from the request as it is already in progress and no longer in the connecting phase cancelled = true; using (await transferLock.LockAsync(CancellationToken.None)) @@ -223,6 +210,11 @@ public async Task WaitUntilComplete(CancellationToken cancellationToken) log.Write(EventType.MessageExchange, "Request {0} was cancelled before it could be collected by the polling endpoint", request); throw; } + + if (!requestCancellationTokens.CanCancelInProgressRequest()) + { + log.Write(EventType.MessageExchange, "Request {0} was cancelled after it had been collected by the polling endpoint and will not be cancelled", request); + } } } @@ -241,12 +233,8 @@ public async Task WaitUntilComplete(CancellationToken cancellationToken) if (waitForTransferToComplete) { - responseSet = await WaitForResponseToBeSet( - request.Destination.PollingRequestMaximumMessageProcessingTimeout, - // Cancel the dequeued request to force Reads and Writes to be cancelled - cancelTheRequestWhenTransferHasBegun: true, - cancellationToken); - + // We cannot use requestCancellationTokens.ConnectingCancellationToken here, because if we were cancelled, and the transfer has begun, we should attempt to wait for it. + responseSet = await WaitForResponseToBeSet(request.Destination.PollingRequestMaximumMessageProcessingTimeout, requestCancellationTokens.InProgressRequestCancellationToken); if (responseSet) { // We end up here when the request is cancelled but already being transferred so we need to adjust the log message accordingly @@ -261,7 +249,7 @@ public async Task WaitUntilComplete(CancellationToken cancellationToken) } else { - if (cancellationToken.IsCancellationRequested) + if (requestCancellationTokens.InProgressRequestCancellationToken.IsCancellationRequested) { log.Write(EventType.MessageExchange, "Request {0} was cancelled before a response was received", request); SetResponse(ResponseMessage.FromException(request, new TimeoutException($"A request was sent to a polling endpoint, the polling endpoint collected it but the request was cancelled before the polling endpoint responded."))); @@ -280,37 +268,18 @@ public async Task WaitUntilComplete(CancellationToken cancellationToken) } } - async Task WaitForResponseToBeSet(TimeSpan timeout, bool cancelTheRequestWhenTransferHasBegun, CancellationToken cancellationToken) + async Task WaitForResponseToBeSet(TimeSpan timeout, CancellationToken cancellationToken) { - using var timeoutCancellationTokenSource = new CancellationTokenSource(timeout); - + using var cancellationTokenSource = new CancellationTokenSource(timeout); try { - using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timeoutCancellationTokenSource.Token, cancellationToken); + using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, cancellationToken); await responseWaiter.WaitAsync(linkedTokenSource.Token); } - catch (OperationCanceledException ex) + catch (OperationCanceledException) { - using (await transferLock.LockAsync(CancellationToken.None)) - { - if (transferBegun && cancelTheRequestWhenTransferHasBegun) - { - // Cancel the dequeued request. This will cause co-operative cancellation on the thread dequeuing the request - pendingRequestCancellationTokenSource.Cancel(); - } - else if (!transferBegun) - { - // Cancel the queued request. This will flag the request as cancelled to stop it being dequeued - pendingRequestCancellationTokenSource.Cancel(); - } - - if (timeoutCancellationTokenSource.IsCancellationRequested) - { - return false; - } - - throw transferBegun ? new TransferringRequestCancelledException(ex) : new ConnectingRequestCancelledException(ex); - } + if (cancellationTokenSource.IsCancellationRequested) return false; + throw; } return true; @@ -327,9 +296,7 @@ public async Task BeginTransfer() { using (await transferLock.LockAsync(CancellationToken.None)) { - // Check if the request has already been completed or if the request has been cancelled - // to ensure we don't dequeue an already completed or already cancelled request - if (completed || pendingRequestCancellationTokenSource.IsCancellationRequested) + if (completed) { return false; } @@ -345,7 +312,6 @@ public async Task BeginTransfer() } public ResponseMessage Response { get; private set; } - public CancellationToken PendingRequestCancellationToken { get; } public void SetResponse(ResponseMessage response) { @@ -355,7 +321,6 @@ public void SetResponse(ResponseMessage response) public void Dispose() { - pendingRequestCancellationTokenSource?.Dispose(); transferLock?.Dispose(); } } diff --git a/source/Halibut/ServiceModel/RequestCancellationTokens.cs b/source/Halibut/ServiceModel/RequestCancellationTokens.cs new file mode 100644 index 000000000..ebd2e028e --- /dev/null +++ b/source/Halibut/ServiceModel/RequestCancellationTokens.cs @@ -0,0 +1,56 @@ +using System; +using System.Threading; + +namespace Halibut.ServiceModel +{ + public class RequestCancellationTokens : IDisposable + { + CancellationTokenSource linkedCancellationTokenSource; + + public RequestCancellationTokens(CancellationToken connectingCancellationToken, CancellationToken inProgressRequestCancellationToken) + { + ConnectingCancellationToken = connectingCancellationToken; + InProgressRequestCancellationToken = inProgressRequestCancellationToken; + + if (ConnectingCancellationToken == CancellationToken.None && InProgressRequestCancellationToken == CancellationToken.None) + { + LinkedCancellationToken = CancellationToken.None; + } + else if (InProgressRequestCancellationToken == CancellationToken.None) + { + LinkedCancellationToken = ConnectingCancellationToken; + } + else if (ConnectingCancellationToken == CancellationToken.None) + { + LinkedCancellationToken = InProgressRequestCancellationToken; + } + else if (ConnectingCancellationToken == InProgressRequestCancellationToken) + { + LinkedCancellationToken = ConnectingCancellationToken; + } + else + { + linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(ConnectingCancellationToken, InProgressRequestCancellationToken); + + LinkedCancellationToken = linkedCancellationTokenSource.Token; + } + } + + public CancellationToken ConnectingCancellationToken { get; set; } + public CancellationToken InProgressRequestCancellationToken { get; set; } + + public CancellationToken LinkedCancellationToken { get; private set; } + + public void Dispose() + { + LinkedCancellationToken = CancellationToken.None; + linkedCancellationTokenSource?.Dispose(); + linkedCancellationTokenSource = null; + } + + public bool CanCancelInProgressRequest() + { + return InProgressRequestCancellationToken != CancellationToken.None; + } + } +} \ No newline at end of file diff --git a/source/Halibut/Transport/ISecureClient.cs b/source/Halibut/Transport/ISecureClient.cs index 2b5a36a39..97d41970d 100644 --- a/source/Halibut/Transport/ISecureClient.cs +++ b/source/Halibut/Transport/ISecureClient.cs @@ -1,5 +1,5 @@ -using System.Threading; using System.Threading.Tasks; +using Halibut.ServiceModel; using Halibut.Transport.Protocol; namespace Halibut.Transport @@ -7,6 +7,6 @@ namespace Halibut.Transport public interface ISecureClient { ServiceEndPoint ServiceEndpoint { get; } - Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, CancellationToken cancellationToken); + Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, RequestCancellationTokens requestCancellationTokens); } } diff --git a/source/Halibut/Transport/PollingClient.cs b/source/Halibut/Transport/PollingClient.cs index ad3ce1ec3..5cf440745 100644 --- a/source/Halibut/Transport/PollingClient.cs +++ b/source/Halibut/Transport/PollingClient.cs @@ -16,9 +16,13 @@ public class PollingClient : IPollingClient readonly ILog log; readonly ISecureClient secureClient; readonly Uri subscription; + Task? pollingClientLoopTask; - readonly CancellationTokenSource cancellationTokenSource; + readonly CancellationTokenSource workingCancellationTokenSource; + readonly CancellationToken cancellationToken; + readonly Func createRetryPolicy; + RequestCancellationTokens? requestCancellationTokens; public PollingClient(Uri subscription, ISecureClient secureClient, Func handleIncomingRequest, ILog log, CancellationToken cancellationToken, Func createRetryPolicy) { @@ -26,7 +30,8 @@ public PollingClient(Uri subscription, ISecureClient secureClient, Func await ExecutePollingLoopAsyncCatchingExceptions(requestCancellationToken)); + requestCancellationTokens = new RequestCancellationTokens(workingCancellationTokenSource.Token, workingCancellationTokenSource.Token); + pollingClientLoopTask = Task.Run(async () => await ExecutePollingLoopAsyncCatchingExceptions(requestCancellationTokens)); } public void Dispose() { - Try.CatchingError(cancellationTokenSource.Cancel, _ => { }); - Try.CatchingError(cancellationTokenSource.Dispose, _ => { }); + Try.CatchingError(workingCancellationTokenSource.Cancel, _ => { }); + Try.CatchingError(workingCancellationTokenSource.Dispose, _ => { }); + Try.CatchingError(() => requestCancellationTokens?.Dispose(), _ => { }); } /// /// Runs ExecutePollingLoopAsync but catches any exception that falls out of it, log here /// rather than let it be unobserved. We are not expecting an exception but just in case. /// - async Task ExecutePollingLoopAsyncCatchingExceptions(CancellationToken cancellationToken) + async Task ExecutePollingLoopAsyncCatchingExceptions(RequestCancellationTokens requestCancellationTokens) { try { - await ExecutePollingLoopAsync(cancellationToken); + await ExecutePollingLoopAsync(requestCancellationTokens); } catch (Exception e) { @@ -68,11 +75,11 @@ async Task ExecutePollingLoopAsyncCatchingExceptions(CancellationToken cancellat } } - async Task ExecutePollingLoopAsync(CancellationToken cancellationToken) + async Task ExecutePollingLoopAsync(RequestCancellationTokens requestCancellationTokens) { var retry = createRetryPolicy(); var sleepFor = TimeSpan.Zero; - while (!cancellationToken.IsCancellationRequested) + while (!requestCancellationTokens.LinkedCancellationToken.IsCancellationRequested) { try { @@ -85,7 +92,7 @@ await secureClient.ExecuteTransactionAsync(async (protocol, ct) => // Subsequent connection issues will try and reconnect quickly and then back-off retry.Success(); await protocol.ExchangeAsSubscriberAsync(subscription, handleIncomingRequestAsync, int.MaxValue, ct); - }, cancellationToken); + }, requestCancellationTokens); retry.Success(); } finally @@ -103,7 +110,7 @@ await secureClient.ExecuteTransactionAsync(async (protocol, ct) => } finally { - await Task.Delay(sleepFor, cancellationToken); + await Task.Delay(sleepFor, requestCancellationTokens.LinkedCancellationToken); } } } diff --git a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs index b411bb2da..c2e26af38 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs @@ -3,7 +3,6 @@ using System.Threading; using System.Threading.Tasks; using Halibut.Diagnostics; -using Halibut.Exceptions; using Halibut.ServiceModel; using Halibut.Transport.Observability; @@ -215,36 +214,26 @@ async Task ProcessSubscriberAsync(IPendingRequestQueue pendingRequests, Cancella } } - async Task ProcessReceiverInternalAsync(IPendingRequestQueue pendingRequests, RequestMessageWithCancellationToken nextRequest, CancellationToken cancellationToken) + async Task ProcessReceiverInternalAsync(IPendingRequestQueue pendingRequests, RequestMessage nextRequest, CancellationToken cancellationToken) { try { if (nextRequest != null) { - using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(nextRequest.CancellationToken, cancellationToken); - var linkedCancellationToken = linkedTokenSource.Token; - - var response = await SendAndReceiveRequest(nextRequest.RequestMessage, linkedCancellationToken); - await pendingRequests.ApplyResponse(response, nextRequest.RequestMessage.Destination); + var response = await SendAndReceiveRequest(nextRequest, cancellationToken); + await pendingRequests.ApplyResponse(response, nextRequest.Destination); } else { - await stream.SendAsync(null, cancellationToken); + await stream.SendAsync(nextRequest, cancellationToken); } } catch (Exception ex) { if (nextRequest != null) { - var cancellationException = nextRequest.CancellationToken.IsCancellationRequested ? new TransferringRequestCancelledException(ex) : ex; - - var response = ResponseMessage.FromException(nextRequest.RequestMessage, cancellationException); - await pendingRequests.ApplyResponse(response, nextRequest.RequestMessage.Destination); - - if (nextRequest.CancellationToken.IsCancellationRequested) - { - throw cancellationException; - } + var response = ResponseMessage.FromException(nextRequest, ex); + await pendingRequests.ApplyResponse(response, nextRequest.Destination); } return false; diff --git a/source/Halibut/Transport/Protocol/RequestMessage.cs b/source/Halibut/Transport/Protocol/RequestMessage.cs index 56df18dd7..2bdd68ee4 100644 --- a/source/Halibut/Transport/Protocol/RequestMessage.cs +++ b/source/Halibut/Transport/Protocol/RequestMessage.cs @@ -1,4 +1,5 @@ using System; +using Halibut.Diagnostics; using Newtonsoft.Json; namespace Halibut.Transport.Protocol diff --git a/source/Halibut/Transport/Protocol/RequestMessageWithCancellationToken.cs b/source/Halibut/Transport/Protocol/RequestMessageWithCancellationToken.cs deleted file mode 100644 index d85c71e2a..000000000 --- a/source/Halibut/Transport/Protocol/RequestMessageWithCancellationToken.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System.Threading; - -namespace Halibut.Transport.Protocol -{ - public class RequestMessageWithCancellationToken - { - public RequestMessageWithCancellationToken(RequestMessage requestMessage, CancellationToken cancellationToken) - { - RequestMessage = requestMessage; - CancellationToken = cancellationToken; - } - - public RequestMessage RequestMessage { get; } - public CancellationToken CancellationToken { get; } - } -} \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/ResponseMessage.cs b/source/Halibut/Transport/Protocol/ResponseMessage.cs index 4bcf6b2b0..cbfeb25ad 100644 --- a/source/Halibut/Transport/Protocol/ResponseMessage.cs +++ b/source/Halibut/Transport/Protocol/ResponseMessage.cs @@ -1,6 +1,5 @@ using System; using Halibut.Diagnostics; -using Halibut.Exceptions; using Newtonsoft.Json; namespace Halibut.Transport.Protocol @@ -33,14 +32,12 @@ public static ResponseMessage FromException(RequestMessage request, Exception ex internal static ServerError ServerErrorFromException(Exception ex) { - string errorType = null; - - if (ex is HalibutClientException or RequestCancelledException) + string ErrorType = null; + if (ex is HalibutClientException) { - errorType = ex.GetType().FullName; + ErrorType = ex.GetType().FullName; } - - return new ServerError { Message = ex.UnpackFromContainers().Message, Details = ex.ToString(), HalibutErrorType = errorType }; + return new ServerError { Message = ex.UnpackFromContainers().Message, Details = ex.ToString(), HalibutErrorType = ErrorType }; } } } \ No newline at end of file diff --git a/source/Halibut/Transport/SecureClient.cs b/source/Halibut/Transport/SecureClient.cs index 8d2ea7634..c1f989452 100644 --- a/source/Halibut/Transport/SecureClient.cs +++ b/source/Halibut/Transport/SecureClient.cs @@ -8,7 +8,9 @@ using System.Threading; using System.Threading.Tasks; using Halibut.Diagnostics; +using Halibut.ServiceModel; using Halibut.Transport.Protocol; +using Halibut.Transport.Streams; using Halibut.Util; namespace Halibut.Transport @@ -38,7 +40,7 @@ public SecureClient(ExchangeProtocolBuilder protocolBuilder, public ServiceEndPoint ServiceEndpoint { get; } - public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, CancellationToken cancellationToken) + public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, RequestCancellationTokens requestCancellationTokens) { var retryInterval = ServiceEndpoint.RetryListeningSleepInterval; @@ -51,7 +53,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, C { if (i > 0) { - await Task.Delay(retryInterval, cancellationToken).ConfigureAwait(false); + await Task.Delay(retryInterval, requestCancellationTokens.LinkedCancellationToken).ConfigureAwait(false); log.Write(EventType.OpeningNewConnection, $"Retrying connection to {ServiceEndpoint.Format()} - attempt #{i}."); } @@ -67,11 +69,14 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, C tcpConnectionFactory, ServiceEndpoint, log, - cancellationToken).ConfigureAwait(false); + requestCancellationTokens.LinkedCancellationToken).ConfigureAwait(false); // Beyond this point, we have no way to be certain that the server hasn't tried to process a request; therefore, we can't retry after this point retryAllowed = false; - await protocolHandler(connection.Protocol, cancellationToken).ConfigureAwait(false); + + // TODO: Enhancement: Pass the RequestCancellationTokens to the protocol handler so that it can cancel + // PrepareExchangeAsClientAsync as part of the ConnectingCancellationToken being cancelled + await protocolHandler(connection.Protocol, requestCancellationTokens.InProgressRequestCancellationToken).ConfigureAwait(false); } catch { @@ -79,17 +84,14 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, C { await connection.DisposeAsync(); } - + if (connectionManager.IsDisposed) - { return; - } - throw; } // Only return the connection to the pool if all went well - await connectionManager.ReleaseConnectionAsync(ServiceEndpoint, connection, cancellationToken); + await connectionManager.ReleaseConnectionAsync(ServiceEndpoint, connection, requestCancellationTokens.InProgressRequestCancellationToken); } catch (AuthenticationException ex) { @@ -123,7 +125,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, C // against all connections in the pool being bad if (i == 1) { - await connectionManager.ClearPooledConnectionsAsync(ServiceEndpoint, log, cancellationToken); + await connectionManager.ClearPooledConnectionsAsync(ServiceEndpoint, log, requestCancellationTokens.InProgressRequestCancellationToken); } } catch (IOException ex) when (ex.IsSocketConnectionReset()) diff --git a/source/Halibut/Transport/SecureListeningClient.cs b/source/Halibut/Transport/SecureListeningClient.cs index 0de893fdf..96eaf6f4c 100644 --- a/source/Halibut/Transport/SecureListeningClient.cs +++ b/source/Halibut/Transport/SecureListeningClient.cs @@ -5,10 +5,9 @@ using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; using System.Text; -using System.Threading; using System.Threading.Tasks; using Halibut.Diagnostics; -using Halibut.Exceptions; +using Halibut.ServiceModel; using Halibut.Transport.Protocol; using Halibut.Util; @@ -39,7 +38,7 @@ public SecureListeningClient(ExchangeProtocolBuilder exchangeProtocolBuilder, public ServiceEndPoint ServiceEndpoint { get; } - public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, CancellationToken cancellationToken) + public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, RequestCancellationTokens requestCancellationTokens) { var retryInterval = ServiceEndpoint.RetryListeningSleepInterval; @@ -52,15 +51,8 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, C { if (i > 0) { - try - { - await Task.Delay(retryInterval, cancellationToken).ConfigureAwait(false); - log.Write(EventType.OpeningNewConnection, $"Retrying connection to {ServiceEndpoint.Format()} - attempt #{i}."); - } - catch (Exception ex) when (cancellationToken.IsCancellationRequested) - { - throw new ConnectingRequestCancelledException(ex); - } + await Task.Delay(retryInterval, requestCancellationTokens.LinkedCancellationToken).ConfigureAwait(false); + log.Write(EventType.OpeningNewConnection, $"Retrying connection to {ServiceEndpoint.Format()} - attempt #{i}."); } try @@ -70,31 +62,19 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, C IConnection connection = null; try { - try - { - connection = await connectionManager.AcquireConnectionAsync( - exchangeProtocolBuilder, - tcpConnectionFactory, - ServiceEndpoint, - log, - cancellationToken).ConfigureAwait(false); - } - catch (Exception ex) when (cancellationToken.IsCancellationRequested) - { - throw new ConnectingRequestCancelledException(ex); - } + connection = await connectionManager.AcquireConnectionAsync( + exchangeProtocolBuilder, + tcpConnectionFactory, + ServiceEndpoint, + log, + requestCancellationTokens.LinkedCancellationToken).ConfigureAwait(false); // Beyond this point, we have no way to be certain that the server hasn't tried to process a request; therefore, we can't retry after this point retryAllowed = false; - try - { - await protocolHandler(connection.Protocol, cancellationToken).ConfigureAwait(false); - } - catch (Exception ex) when (cancellationToken.IsCancellationRequested) - { - throw new TransferringRequestCancelledException(ex); - } + // TODO: Enhancement: Pass the RequestCancellationTokens to the protocol handler so that it can cancel + // PrepareExchangeAsClientAsync as part of the ConnectingCancellationToken being cancelled + await protocolHandler(connection.Protocol, requestCancellationTokens.InProgressRequestCancellationToken).ConfigureAwait(false); } catch { @@ -109,7 +89,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, C } // Only return the connection to the pool if all went well - await connectionManager.ReleaseConnectionAsync(ServiceEndpoint, connection, cancellationToken); + await connectionManager.ReleaseConnectionAsync(ServiceEndpoint, connection, requestCancellationTokens.InProgressRequestCancellationToken); } catch (AuthenticationException ex) { @@ -148,7 +128,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, C // against all connections in the pool being bad if (i == 1) { - await connectionManager.ClearPooledConnectionsAsync(ServiceEndpoint, log, cancellationToken); + await connectionManager.ClearPooledConnectionsAsync(ServiceEndpoint, log, requestCancellationTokens.InProgressRequestCancellationToken); } } catch (IOException ex) when (ex.IsSocketConnectionReset()) diff --git a/source/Halibut/Transport/SecureWebSocketClient.cs b/source/Halibut/Transport/SecureWebSocketClient.cs index 588cba551..c3a1f0bdf 100644 --- a/source/Halibut/Transport/SecureWebSocketClient.cs +++ b/source/Halibut/Transport/SecureWebSocketClient.cs @@ -50,7 +50,7 @@ public SecureWebSocketClient(ExchangeProtocolBuilder protocolBuilder, public ServiceEndPoint ServiceEndpoint => serviceEndpoint; - public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, CancellationToken cancellationToken) + public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, RequestCancellationTokens requestCancellationTokens) { var retryInterval = ServiceEndpoint.RetryListeningSleepInterval; @@ -63,7 +63,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, C { if (i > 0) { - await Task.Delay(retryInterval, cancellationToken).ConfigureAwait(false); + await Task.Delay(retryInterval, requestCancellationTokens.LinkedCancellationToken).ConfigureAwait(false); log.Write(EventType.OpeningNewConnection, $"Retrying connection to {serviceEndpoint.Format()} - attempt #{i}."); } @@ -79,11 +79,14 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, C new WebSocketConnectionFactory(clientCertificate, halibutTimeoutsAndLimits, streamFactory), serviceEndpoint, log, - cancellationToken).ConfigureAwait(false); + requestCancellationTokens.LinkedCancellationToken).ConfigureAwait(false); // Beyond this point, we have no way to be certain that the server hasn't tried to process a request; therefore, we can't retry after this point retryAllowed = false; - await protocolHandler(connection.Protocol, cancellationToken).ConfigureAwait(false); + + // TODO: Enhancement: Pass the RequestCancellationTokens to the protocol handler so that it can cancel + // PrepareExchangeAsClientAsync as part of the ConnectingCancellationToken being cancelled + await protocolHandler(connection.Protocol, requestCancellationTokens.InProgressRequestCancellationToken).ConfigureAwait(false); } catch { @@ -96,7 +99,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, C } // Only return the connection to the pool if all went well - await connectionManager.ReleaseConnectionAsync(serviceEndpoint, connection, cancellationToken); + await connectionManager.ReleaseConnectionAsync(serviceEndpoint, connection, requestCancellationTokens.InProgressRequestCancellationToken); } catch (AuthenticationException aex) { @@ -133,7 +136,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, C // against all connections in the pool being bad if (i == 1) { - await connectionManager.ClearPooledConnectionsAsync(serviceEndpoint, log, cancellationToken); + await connectionManager.ClearPooledConnectionsAsync(serviceEndpoint, log, requestCancellationTokens.InProgressRequestCancellationToken); } } diff --git a/source/Halibut/Transport/Streams/NetworkTimeoutStream.cs b/source/Halibut/Transport/Streams/NetworkTimeoutStream.cs index 07311d881..94bf60407 100644 --- a/source/Halibut/Transport/Streams/NetworkTimeoutStream.cs +++ b/source/Halibut/Transport/Streams/NetworkTimeoutStream.cs @@ -15,8 +15,8 @@ namespace Halibut.Transport.Streams class NetworkTimeoutStream : AsyncStream { readonly Stream inner; - bool hasCancelledOrTimedOut = false; - Exception? cancellationOrTimeoutException = null; + bool hasTimedOut = false; + Exception? timeoutException = null; public NetworkTimeoutStream(Stream inner) { @@ -30,7 +30,7 @@ public override async ValueTask DisposeAsync() public override async Task FlushAsync(CancellationToken cancellationToken) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); await WrapWithCancellationAndTimeout( async ct => @@ -46,7 +46,7 @@ await WrapWithCancellationAndTimeout( public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return await WrapWithCancellationAndTimeout( async ct => await inner.ReadAsync(buffer, offset, count, ct), @@ -58,7 +58,7 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); await WrapWithCancellationAndTimeout( async ct => @@ -75,7 +75,7 @@ await WrapWithCancellationAndTimeout( #if !NETFRAMEWORK public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return await WrapWithCancellationAndTimeout( async ct => await inner.ReadAsync(buffer, ct), @@ -87,7 +87,7 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); await WrapWithCancellationAndTimeout( async ct => @@ -109,7 +109,7 @@ public override void Close() public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); await base.CopyToAsync(destination, bufferSize, cancellationToken); } @@ -131,14 +131,14 @@ public override int Read(byte[] buffer, int offset, int count) public override long Seek(long offset, SeekOrigin origin) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return inner.Seek(offset, origin); } public override void SetLength(long value) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); inner.SetLength(value); } @@ -156,14 +156,14 @@ public override void WriteByte(byte value) #if !NETFRAMEWORK public override void CopyTo(Stream destination, int bufferSize) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); base.CopyTo(destination, bufferSize); } public override int Read(Span buffer) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); try { @@ -183,7 +183,7 @@ public override int Read(Span buffer) public override void Write(ReadOnlySpan buffer) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); try { @@ -205,14 +205,14 @@ public override void Write(ReadOnlySpan buffer) #if NETFRAMEWORK public override ObjRef CreateObjRef(Type requestedType) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return inner.CreateObjRef(requestedType); } public override object? InitializeLifetimeService() { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return inner.InitializeLifetimeService(); } @@ -222,12 +222,12 @@ public override int ReadTimeout { get { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return inner.ReadTimeout; } set { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); inner.ReadTimeout = value; } } @@ -236,12 +236,12 @@ public override int WriteTimeout { get { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return inner.WriteTimeout; } set { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); inner.WriteTimeout = value; } } @@ -250,7 +250,7 @@ public override bool CanTimeout { get { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return inner.CanTimeout; } } @@ -259,7 +259,7 @@ public override bool CanRead { get { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return inner.CanRead; } } @@ -268,7 +268,7 @@ public override bool CanSeek { get { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return inner.CanSeek; } } @@ -277,7 +277,7 @@ public override bool CanWrite { get { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return inner.CanWrite; } } @@ -286,7 +286,7 @@ public override long Length { get { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return inner.Length; } } @@ -295,12 +295,12 @@ public override long Position { get { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); return inner.Position; } set { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); inner.Position = value; } } @@ -329,10 +329,15 @@ async Task WrapWithCancellationAndTimeout( async Task SafelyDisposeStream(Exception exception) { - cancellationOrTimeoutException = exception; - hasCancelledOrTimedOut = true; - - await Try.CatchingError(async () => await DisposeAsync(), _ => { }); + try + { + timeoutException = exception; + hasTimedOut = true; + await inner.DisposeAsync(); + } + catch + { + } } Exception CreateExceptionOnTimeout() @@ -344,7 +349,7 @@ Exception CreateExceptionOnTimeout() void TryCloseOnTimeout(Action action) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); try { @@ -364,7 +369,7 @@ void TryCloseOnTimeout(Action action) T TryCloseOnTimeout(Func action) { - ThrowIfAlreadyCancelledOrTimedOut(); + ThrowIfAlreadyTimedOut(); try { @@ -384,8 +389,8 @@ T TryCloseOnTimeout(Func action) void CloseOnTimeout(Exception ex) { - cancellationOrTimeoutException = ex; - hasCancelledOrTimedOut = true; + timeoutException = ex; + hasTimedOut = true; inner.Close(); } @@ -399,11 +404,11 @@ static bool IsTimeoutException(Exception exception) return exception.InnerException != null && IsTimeoutException(exception.InnerException); } - void ThrowIfAlreadyCancelledOrTimedOut() + void ThrowIfAlreadyTimedOut() { - if (hasCancelledOrTimedOut) + if (hasTimedOut) { - throw cancellationOrTimeoutException ?? new SocketException((int)SocketError.TimedOut); + throw timeoutException ?? new SocketException((int)SocketError.TimedOut); } } } diff --git a/source/Halibut/Util/Try.cs b/source/Halibut/Util/Try.cs index 4c01d353c..d45fedcf1 100644 --- a/source/Halibut/Util/Try.cs +++ b/source/Halibut/Util/Try.cs @@ -1,6 +1,5 @@ using System; using System.IO; -using System.Threading.Tasks; namespace Halibut.Util { @@ -18,18 +17,6 @@ public static void CatchingError(Action tryThisAction, Action onFailu } } - public static async Task CatchingError(Func tryThisAction, Action onFailure) - { - try - { - await tryThisAction(); - } - catch (Exception e) - { - onFailure(e); - } - } - public static SilentStreamDisposer CatchingErrorOnDisposal(Stream streamToDispose, Action onFailure) { return new SilentStreamDisposer(streamToDispose, onFailure); diff --git a/source/Octopus.TestPortForwarder/PortForwarder.cs b/source/Octopus.TestPortForwarder/PortForwarder.cs index 736d29b25..83806e5a6 100644 --- a/source/Octopus.TestPortForwarder/PortForwarder.cs +++ b/source/Octopus.TestPortForwarder/PortForwarder.cs @@ -7,7 +7,6 @@ using System.Threading.Tasks; using Serilog; using System.Diagnostics.CodeAnalysis; -using Serilog.Core; namespace Octopus.TestPortForwarder { @@ -222,7 +221,6 @@ public void UnPauseExistingConnections() public void PauseExistingConnections() { - logger.Information("Pausing existing connections"); lock (pumps) { foreach (var pump in pumps)