Skip to content

Commit

Permalink
Revert cooperative cancellation (#560)
Browse files Browse the repository at this point in the history
* Revert "Fix a bug in cooperative cancellation for Polling Services causing a request to time out to quickly (#558)"

This reverts commit 79841cf.

* Revert "Co-operatively cancel all RPC Calls (#525)"

This reverts commit 1bde8c8.
  • Loading branch information
nathanwoctopusdeploy authored Nov 28, 2023
1 parent 65497aa commit d1757c4
Show file tree
Hide file tree
Showing 30 changed files with 539 additions and 579 deletions.
17 changes: 9 additions & 8 deletions source/Halibut.Tests/BadCertificatesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public async Task FailWhenPollingServicePresentsWrongCertificate_ButServiceIsCon
});

// Act
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken);
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken);

// Interestingly the message exchange error is logged to a non polling looking URL, perhaps because it has not been identified?
Wait.UntilActionSucceeds(() => {
Expand Down Expand Up @@ -148,14 +148,14 @@ public async Task FailWhenPollingServiceHasThumbprintRemovedViaTrustOnly(ClientA
});

// Works normally
await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token));
await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token));
await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None));
await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None));

// Act
clientAndBuilder.Client.TrustOnly(new List<string>());

// Assert
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken);
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken);

await Task.Delay(3000, CancellationToken);

Expand All @@ -164,7 +164,8 @@ public async Task FailWhenPollingServiceHasThumbprintRemovedViaTrustOnly(ClientA
var exception = await AssertionExtensions.Should(() => incrementCount).ThrowAsync<Exception>();

exception.And.Should().Match(e => e.GetType() == typeof(HalibutClientException)
|| e is OperationCanceledException);
|| e.GetType() == typeof(OperationCanceledException)
|| e.GetType() == typeof(TaskCanceledException));
}
}

Expand Down Expand Up @@ -212,8 +213,8 @@ public async Task FailWhenClientPresentsWrongCertificateToPollingService(ClientA
point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(2000);
});

var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken);
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken);

Func<LogEvent, bool> hasExpectedLog = logEvent =>
logEvent.FormattedMessage.Contains("The server at")
&& logEvent.FormattedMessage.Contains("presented an unexpected security certificate");
Expand Down Expand Up @@ -269,7 +270,7 @@ public async Task FailWhenPollingServicePresentsWrongCertificate(ClientAndServic
point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(10);
});

var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken);
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken);

// Interestingly the message exchange error is logged to a non polling looking URL, perhaps because it has not been identified?
Wait.UntilActionSucceeds(() => { AllLogs(serviceLoggers).Select(l => l.FormattedMessage).ToArray()
Expand Down
86 changes: 74 additions & 12 deletions source/Halibut.Tests/CancellationViaClientProxyFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Halibut.Exceptions;
using Halibut.Logging;
using Halibut.ServiceModel;
using Halibut.Tests.Support;
using Halibut.Tests.Support.ExtensionMethods;
Expand All @@ -21,11 +21,27 @@ public class CancellationViaClientProxyFixture : BaseTest
[Test]
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
[LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)]
public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase)
public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase)
{
var tokenSourceToCancel = new CancellationTokenSource();
var halibutRequestOption = new HalibutProxyRequestOptions(tokenSourceToCancel.Token);
var halibutRequestOption = new HalibutProxyRequestOptions(tokenSourceToCancel.Token, CancellationToken.None);

await CanCancel_ConnectingOrQueuedRequests(clientAndServiceTestCase, tokenSourceToCancel, halibutRequestOption);
}

[Test]
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
[LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)]
public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase)
{
var tokenSourceToCancel = new CancellationTokenSource();
var halibutRequestOption = new HalibutProxyRequestOptions(CancellationToken.None, tokenSourceToCancel.Token);

await CanCancel_ConnectingOrQueuedRequests(clientAndServiceTestCase, tokenSourceToCancel, halibutRequestOption);
}

async Task CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase, CancellationTokenSource tokenSourceToCancel, HalibutProxyRequestOptions halibutRequestOption)
{
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
.WithPortForwarding(port => PortForwarderUtil.ForwardingToLocalPort(port).Build())
.WithStandardServices()
Expand All @@ -39,19 +55,64 @@ public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_
point => point.TryAndConnectForALongTime());

tokenSourceToCancel.CancelAfter(TimeSpan.FromMilliseconds(100));

(await AssertAsync.Throws<Exception>(() => echo.IncrementAsync(halibutRequestOption)))
.And.Should().Match(x => x is ConnectingRequestCancelledException || (x is HalibutClientException && x.As<HalibutClientException>().Message.Contains("The Request was cancelled while Connecting")));
.And.Message.Contains("The operation was canceled");

clientAndService.PortForwarder.ReturnToNormalMode();

await echo.IncrementAsync(new HalibutProxyRequestOptions(CancellationToken));
await echo.IncrementAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None));

(await echo.GetCurrentValueAsync(new HalibutProxyRequestOptions(CancellationToken)))
(await echo.GetCurrentValueAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)))
.Should().Be(1, "Since we cancelled the first call");
}
}

[Test]
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
[LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)]
public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanNotCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase)
{
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
.WithHalibutLoggingLevel(LogLevel.Trace)
.WithStandardServices()
.Build(CancellationToken))
{
var lockService = clientAndService.CreateAsyncClient<ILockService, IAsyncClientLockServiceWithOptions>();

var tokenSourceToCancel = new CancellationTokenSource();
using var tmpDir = new TemporaryDirectory();
var fileThatOnceDeletedEndsTheCall = tmpDir.CreateRandomFile();
var callStartedFile = tmpDir.RandomFileName();

var inFlightRequest = Task.Run(async () => await lockService.WaitForFileToBeDeletedAsync(
fileThatOnceDeletedEndsTheCall,
callStartedFile,
new HalibutProxyRequestOptions(tokenSourceToCancel.Token, CancellationToken.None)));

Logger.Information("Waiting for the RPC call to be inflight");
while (!File.Exists(callStartedFile))
{
await Task.Delay(TimeSpan.FromMilliseconds(100), CancellationToken);
}

// The call is now in flight. Call cancel on the cancellation token for that in flight request.
tokenSourceToCancel.Cancel();

// Give time for the cancellation to do something
await Task.Delay(TimeSpan.FromSeconds(2), CancellationToken);

if (inFlightRequest.Status == TaskStatus.Faulted) await inFlightRequest;

inFlightRequest.IsCompleted.Should().Be(false, $"The cancellation token can not cancel in flight requests. Current state: {inFlightRequest.Status}");

File.Delete(fileThatOnceDeletedEndsTheCall);

// Now the lock is released we should be able to complete the request.
await inFlightRequest;
}
}

[Test]
// TODO: ASYNC ME UP!
// net48 does not support cancellation of the request as the DeflateStream ends up using Begin and End methods which don't get passed the cancellation token
Expand All @@ -62,7 +123,7 @@ public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
[LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)]
#endif
public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase)
public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase)
{
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
.WithStandardServices()
Expand All @@ -80,7 +141,7 @@ public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_
await lockService.WaitForFileToBeDeletedAsync(
fileThatOnceDeletedEndsTheCall,
callStartedFile,
new HalibutProxyRequestOptions(tokenSourceToCancel.Token));
new HalibutProxyRequestOptions(CancellationToken.None, tokenSourceToCancel.Token));
});

Logger.Information("Waiting for the RPC call to be inflight");
Expand All @@ -95,8 +156,9 @@ await lockService.WaitForFileToBeDeletedAsync(
// Give time for the cancellation to do something
await Task.Delay(TimeSpan.FromSeconds(2), CancellationToken);

(await AssertionExtensions.Should(async () => await inFlightRequest).ThrowAsync<Exception>())
.And.Should().Match(x => x is TransferringRequestCancelledException || (x is HalibutClientException && x.As<HalibutClientException>().Message.Contains("The Request was cancelled while Transferring")));
(await AssertionExtensions.Should(async () => await inFlightRequest)
.ThrowAsync<Exception>()).And
.Should().Match<Exception>(x => x is OperationCanceledException || (x.GetType() == typeof(HalibutClientException) && x.Message.Contains("The ReadAsync operation was cancelled")));
}
}

Expand Down Expand Up @@ -128,7 +190,7 @@ public async Task HalibutProxyRequestOptionsCanBeSentToLatestAndOldServicesThatP
{
var echo = clientAndService.CreateAsyncClient<IEchoService, IAsyncClientEchoServiceWithOptions>();

(await echo.SayHelloAsync("Hello!!", new HalibutProxyRequestOptions(CancellationToken)))
(await echo.SayHelloAsync("Hello!!", new HalibutProxyRequestOptions(CancellationToken, null)))
.Should()
.Be("Hello!!...");
}
Expand Down
112 changes: 0 additions & 112 deletions source/Halibut.Tests/PollingServiceTimeoutsFixture.cs

This file was deleted.

Loading

0 comments on commit d1757c4

Please sign in to comment.