From 7c7a92eae91dd2a885ba73a189a4ea997f9b96d7 Mon Sep 17 00:00:00 2001 From: skwasjer <11424653+skwasjer@users.noreply.github.com> Date: Mon, 9 Sep 2024 18:22:48 +0200 Subject: [PATCH] chore: replace our own TaskHelpers with a more robust impl., adapted from RestSharp/Rebus. We cannot really avoid doing sync over async in some areas, specifically in older .NET SDK's which do not have sync HttpClient API's. Perhaps this also fixes our CI issues of random AppDomain unloads as well when running unit tests (because of xunit's own AppDomain/SynchronizationContext logic). --- .../Extensions/ResponseBuilderExtensions.cs | 2 +- src/MockHttp/MockHttpHandler.cs | 2 +- src/MockHttp/Threading/AsyncHelpers.cs | 167 ++++++++++++++++++ src/MockHttp/Threading/TaskHelpers.cs | 58 ------ 4 files changed, 169 insertions(+), 60 deletions(-) create mode 100644 src/MockHttp/Threading/AsyncHelpers.cs delete mode 100644 src/MockHttp/Threading/TaskHelpers.cs diff --git a/src/MockHttp/Extensions/ResponseBuilderExtensions.cs b/src/MockHttp/Extensions/ResponseBuilderExtensions.cs index 3bcff83f..8ae76e82 100644 --- a/src/MockHttp/Extensions/ResponseBuilderExtensions.cs +++ b/src/MockHttp/Extensions/ResponseBuilderExtensions.cs @@ -54,7 +54,7 @@ public static IWithContentResult Body(this IWithContent builder, HttpContent htt #if NET6_0_OR_GREATER using Stream stream = httpContent.ReadAsStream(); #else - using Stream stream = Threading.TaskHelpers.RunSync(httpContent.ReadAsStreamAsync, TimeSpan.FromMinutes(1)); + using Stream stream = Threading.AsyncHelpers.RunSync(httpContent.ReadAsStreamAsync); #endif return (IWithContentResult)BufferedStreamBody(builder, stream) .Headers(httpContent.Headers.Select(kvp => new KeyValuePair>(kvp.Key, kvp.Value))); diff --git a/src/MockHttp/MockHttpHandler.cs b/src/MockHttp/MockHttpHandler.cs index baf1575a..1aa48c41 100644 --- a/src/MockHttp/MockHttpHandler.cs +++ b/src/MockHttp/MockHttpHandler.cs @@ -147,7 +147,7 @@ public void Verify(Action matching, Func times, string? /// public void Verify(Action matching, IsSent times, string? because = null) { - TaskHelpers.RunSync(() => VerifyAsync(matching, times, because), TimeSpan.FromSeconds(30)); + AsyncHelpers.RunSync(() => VerifyAsync(matching, times, because)); } /// diff --git a/src/MockHttp/Threading/AsyncHelpers.cs b/src/MockHttp/Threading/AsyncHelpers.cs new file mode 100644 index 00000000..65610deb --- /dev/null +++ b/src/MockHttp/Threading/AsyncHelpers.cs @@ -0,0 +1,167 @@ +// Copyright (c) .NET Foundation and Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from Rebus +// Adapted from RestSharp (sha: 159c8a79963b): +// - dispose ManualResetEvent +// - added DebuggerStepThroughAttribute + +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.ExceptionServices; + +namespace MockHttp.Threading; + +[DebuggerStepThrough] +internal static class AsyncHelpers +{ + /// + /// Executes a task synchronously on the calling thread by installing a temporary synchronization context that queues continuations + /// + /// Callback for asynchronous task to run + public static void RunSync(Func task) + { + SynchronizationContext? currentContext = SynchronizationContext.Current; + using var customContext = new CustomSynchronizationContext(task); + + try + { + SynchronizationContext.SetSynchronizationContext(customContext); + customContext.Run(); + } + finally + { + SynchronizationContext.SetSynchronizationContext(currentContext); + } + } + + /// + /// Executes a task synchronously on the calling thread by installing a temporary synchronization context that queues continuations + /// + /// Callback for asynchronous task to run + /// Return type for the task + /// Return value from the task + public static T RunSync(Func> task) + { + T result = default!; +#pragma warning disable CA2007 + RunSync(async () => { result = await task(); }); +#pragma warning restore CA2007 + return result; + } + + /// + /// Synchronization context that can be "pumped" in order to have it execute continuations posted back to it + /// + private sealed class CustomSynchronizationContext : SynchronizationContext, IDisposable + { + private readonly ConcurrentQueue> _items = new(); + private readonly Func _task; + private readonly AutoResetEvent _workItemsWaiting = new(false); + private ExceptionDispatchInfo? _caughtException; + private bool _done; + + /// + /// Constructor for the custom context + /// + /// Task to execute + public CustomSynchronizationContext(Func task) + { + _task = task ?? throw new ArgumentNullException(nameof(task), "Please remember to pass a Task to be executed"); + } + + public void Dispose() + { + _workItemsWaiting.Dispose(); + } + + /// + /// When overridden in a derived class, dispatches an asynchronous message to a synchronization context. + /// + /// Callback function + /// Callback state + public override void Post(SendOrPostCallback function, object? state) + { + _items.Enqueue(Tuple.Create(function, state)); + _workItemsWaiting.Set(); + } + + /// + /// Enqueues the function to be executed and executes all resulting continuations until it is completely done + /// + public void Run() + { + Post(PostCallback, null); + + while (!_done) + { + if (_items.TryDequeue(out Tuple? task)) + { + task.Item1(task.Item2); + if (_caughtException is null) + { + continue; + } + + _caughtException.Throw(); + } + else + { + _workItemsWaiting.WaitOne(); + } + } + + return; + + async void PostCallback(object? _) + { + try + { + await _task().ConfigureAwait(false); + } + catch (Exception exception) + { + _caughtException = ExceptionDispatchInfo.Capture(exception); + throw; + } + finally + { + Post(_ => _done = true, null); + } + } + } + + /// + /// When overridden in a derived class, dispatches a synchronous message to a synchronization context. + /// + /// Callback function + /// Callback state + [ExcludeFromCodeCoverage] + public override void Send(SendOrPostCallback function, object? state) + { + throw new NotSupportedException("Cannot send to same thread"); + } + + /// + /// When overridden in a derived class, creates a copy of the synchronization context. Not needed, so just return ourselves. + /// + /// Copy of the context + [ExcludeFromCodeCoverage] + public override SynchronizationContext CreateCopy() + { + return this; + } + } +} diff --git a/src/MockHttp/Threading/TaskHelpers.cs b/src/MockHttp/Threading/TaskHelpers.cs deleted file mode 100644 index db0d4c58..00000000 --- a/src/MockHttp/Threading/TaskHelpers.cs +++ /dev/null @@ -1,58 +0,0 @@ -using System.Runtime.ExceptionServices; - -namespace MockHttp.Threading; - -internal static class TaskHelpers -{ - public static T RunSync(Func> action, TimeSpan timeout) - { - Task? task = null; - - RunSync(() => - { - task = action(); - return (Task)task; - }, - timeout); - - return task is null ? default! : task.Result; - } - - public static void RunSync(Func action, TimeSpan timeout) - { - if (SynchronizationContext.Current is null) - { - RunSyncAndWait(action, timeout); - } - else - { - RunSyncAndWait(() => Task.Factory.StartNew(action, - CancellationToken.None, - TaskCreationOptions.None, - TaskScheduler.Default - ) - .Unwrap(), - timeout); - } - } - - private static void RunSyncAndWait(Func action, TimeSpan timeout) - { - try - { - action().Wait(timeout); - } - catch (AggregateException ex) - { - AggregateException flattened = ex.Flatten(); - if (flattened.InnerExceptions.Count == 1) - { - ExceptionDispatchInfo.Capture(ex.InnerException!).Throw(); - } - else - { - throw; - } - } - } -}