Skip to content

Commit

Permalink
Merge pull request #431 from timcassell/develop
Browse files Browse the repository at this point in the history
Merge develop to master
  • Loading branch information
timcassell authored Apr 12, 2024
2 parents 0e0b4b6 + b4831cb commit d9f6d11
Show file tree
Hide file tree
Showing 13 changed files with 688 additions and 323 deletions.
12 changes: 12 additions & 0 deletions Docs/Changelog/v3.0.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Change Log

## v3.0.2 - April 12, 2024

Fixes:

- Fixed `Promise.ParallelFor*` canceling workers too early.
- `Promise.ParallelFor*` and `AsyncEnumerable.Merge` propagate exceptions from cancelation token callbacks instead of deadlocking.

Misc:

- `AsyncEnumerable.Merge` more eagerly stops iteration if a rejection or cancelation occurs.
147 changes: 86 additions & 61 deletions Package/Core/Linq/Internal/MergeInternal.cs

Large diffs are not rendered by default.

80 changes: 50 additions & 30 deletions Package/Core/Promises/Internal/ParallelAsyncInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ internal sealed partial class PromiseParallelForEachAsync<TParallelBody, TSource
private int _waitCounter;
private List<Exception> _exceptions;
private Promise.State _completionState;
private bool _stopExecuting;
// We need an async lock to lock around MoveNextAsync.
// We just use a counter instead of AsyncLock, and implement the locking algorithm directly.
// This is possible because we only use the lock once in a single code path.
Expand Down Expand Up @@ -87,6 +88,7 @@ internal static PromiseParallelForEachAsync<TParallelBody, TSource> GetOrCreate(
promise._synchronizationContext = synchronizationContext ?? BackgroundSynchronizationContextSentinel.s_instance;
promise._remainingAvailableWorkers = maxDegreeOfParallelism;
promise._completionState = Promise.State.Resolved;
promise._stopExecuting = false;
promise._lockAndLaunchNext = 0;
var cancelRef = CancelationRef.GetOrCreate();
promise._cancelationRef = cancelRef;
Expand All @@ -112,7 +114,7 @@ internal override void MaybeDispose()
public void Cancel()
{
_completionState = Promise.State.Canceled;
_cancelationRef.Cancel();
CancelWorkers();
}

private bool TryEnterLockAndStoreLaunchNext()
Expand Down Expand Up @@ -171,6 +173,7 @@ private bool ExitLockAndGetLaunchNext()

private void ExitLockComplete()
{
_stopExecuting = true;
// There are no more iterations to execute. We have full control of the lock,
// so we don't need to schedule continuations, and we simply subtract all waiting workers.
unchecked
Expand Down Expand Up @@ -254,14 +257,14 @@ private void ExecuteWorker(bool fromMoveNext)
catch (OperationCanceledException)
{
_completionState = Promise.State.Canceled;
MaybeComplete(1);
CancelWorkersAndMaybeComplete(1);
}
catch (Exception e)
{
// Record the failure and then don't let the exception propagate. The last worker to complete
// will propagate exceptions as is appropriate to the top-level promise.
RecordException(e);
MaybeComplete(1);
CancelWorkersAndMaybeComplete(1);
}
ClearCurrentInvoker();
}
Expand All @@ -279,7 +282,7 @@ private void WorkerBody(bool fromMoveNext)
LoopStart:
// Get the next element from the enumerator. This requires locking around MoveNextAsync/Current.
// We already have acquired the lock at this point, either from the caller, or from the end of the loop.
if (_cancelationRef._state >= CancelationRef.State.Canceled)
if (_stopExecuting)
{
ExitLockComplete();
return;
Expand Down Expand Up @@ -384,11 +387,9 @@ internal override void Handle(PromiseRefBase handler, Promise.State state)
return;
}

if (state == Promise.State.Canceled)
{
_completionState = Promise.State.Canceled;
}
else
_completionState = state;
CancelWorkers();
if (state == Promise.State.Rejected)
{
// Record the failure. The last worker to complete will propagate exceptions as is appropriate to the top-level promise.
RecordRejection(rejectContainer);
Expand Down Expand Up @@ -420,38 +421,57 @@ private void RecordException(Exception e)
}
}

private void MaybeComplete(int completeCount)
private void CancelWorkers()
{
// We cancel the source to notify completion to other threads.
_stopExecuting = true;
// We cancel the source to notify the workers that they don't need to continue processing.
// This may be called multiple times. It's fine because it checks internally if it's already canceled.
_cancelationRef.Cancel();
try
{
_cancelationRef.Cancel();
}
catch (Exception e)
{
RecordException(e);
}
}

private void CancelWorkersAndMaybeComplete(int completeCount)
{
CancelWorkers();
MaybeComplete(completeCount);
}

private void MaybeComplete(int completeCount)
{
// If we're the last worker to complete, clean up and complete the operation.
if (InterlockedAddWithUnsignedOverflowCheck(ref _waitCounter, -completeCount) == 0)
if (InterlockedAddWithUnsignedOverflowCheck(ref _waitCounter, -completeCount) != 0)
{
_externalCancelationRegistration.Dispose();
_externalCancelationRegistration = default;
_cancelationRef.TryDispose(_cancelationRef.SourceId);
_cancelationRef = null;

Promise disposePromise;
try
{
disposePromise = _asyncEnumerator.DisposeAsync();
}
catch (Exception e)
{
RecordException(e);
OnComplete();
return;
}
return;
}

HookUpDisposeAsync(disposePromise);
Promise disposePromise;
try
{
disposePromise = _asyncEnumerator.DisposeAsync();
}
catch (Exception e)
{
RecordException(e);
OnComplete();
return;
}

HookUpDisposeAsync(disposePromise);
}

private void OnComplete()
{
_externalCancelationRegistration.Dispose();
_externalCancelationRegistration = default;
_cancelationRef.TryDispose(_cancelationRef.SourceId);
_cancelationRef = null;

// Finally, complete the promise returned to the ParallelForEachAsync caller.
// This must be the very last thing done.
if (_exceptions != null)
Expand Down
120 changes: 72 additions & 48 deletions Package/Core/Promises/Internal/ParallelInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#pragma warning disable IDE0251 // Make member 'readonly'

using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
Expand All @@ -19,7 +18,7 @@ partial class Internal
{
internal interface IParallelEnumerator<T> : IDisposable
{
bool TryMoveNext(object lockObj, CancelationRef cancelationRef, out T value);
bool TryMoveNext(object lockObj, ref bool stopExecuting, out T value);
}

#if !PROTO_PROMISE_DEVELOPER_MODE
Expand All @@ -38,23 +37,25 @@ internal ForLoopEnumerator(int fromIndex, int toIndex)
}

[MethodImpl(InlineOption)]
public bool TryMoveNext(object lockObj, CancelationRef cancelationRef, out int value)
public bool TryMoveNext(object lockObj, ref bool stopExecuting, out int value)
{
int current;
do
int current = _current;
while (true)
{
// cancelationRef._state is a volatile read, so we don't need to volatile read _current.
var canceled = cancelationRef._state >= CancelationRef.State.Canceled;
current = _current;
if (current >= _toIndex | canceled)
// Interlocked.CompareExchange has an implicit memory barrier, so we can get away without volatile read of stopExecuting.
if (current >= _toIndex | stopExecuting)
{
value = 0;
return false;
}
} while (Interlocked.CompareExchange(ref _current, current + 1, current) != current);

value = current;
return true;
int oldValue = Interlocked.CompareExchange(ref _current, current + 1, current);
if (oldValue == current)
{
value = current;
return true;
}
current = oldValue;
}
}

[MethodImpl(InlineOption)]
Expand All @@ -79,13 +80,14 @@ internal GenericEnumerator(TEnumerator enumerator)
}

[MethodImpl(InlineOption)]
public bool TryMoveNext(object lockObj, CancelationRef cancelationRef, out T value)
public bool TryMoveNext(object lockObj, ref bool stopExecuting, out T value)
{
// Get the next element from the enumerator. This requires locking around MoveNext/Current.
lock (lockObj)
{
if (cancelationRef._state >= CancelationRef.State.Canceled || !_enumerator.MoveNext())
if (stopExecuting || !_enumerator.MoveNext())
{
stopExecuting = true;
// Exit the lock before writing the value.
goto ReturnFalse;
}
Expand Down Expand Up @@ -221,6 +223,7 @@ internal sealed partial class PromiseParallelForEach<TEnumerator, TParallelBody,
private int _waitCounter;
private List<Exception> _exceptions;
private Promise.State _completionState;
private bool _stopExecuting;

private PromiseParallelForEach() { }

Expand All @@ -242,6 +245,7 @@ internal static PromiseParallelForEach<TEnumerator, TParallelBody, TSource> GetO
promise._synchronizationContext = synchronizationContext ?? BackgroundSynchronizationContextSentinel.s_instance;
promise._remainingAvailableWorkers = maxDegreeOfParallelism;
promise._completionState = Promise.State.Resolved;
promise._stopExecuting = false;
promise._cancelationRef = CancelationRef.GetOrCreate();
cancelationToken.TryRegister(promise, out promise._externalCancelationRegistration);
if (Promise.Config.AsyncFlowExecutionContextEnabled)
Expand All @@ -264,7 +268,7 @@ internal override void MaybeDispose()
public void Cancel()
{
_completionState = Promise.State.Canceled;
_cancelationRef.Cancel();
CancelWorkers();
}

internal void MaybeLaunchWorker(bool launchWorker)
Expand Down Expand Up @@ -320,14 +324,14 @@ private void ExecuteWorker(bool launchNext)
catch (OperationCanceledException)
{
_completionState = Promise.State.Canceled;
MaybeComplete();
CancelWorkersAndMaybeComplete();
}
catch (Exception e)
{
// Record the failure and then don't let the exception propagate. The last worker to complete
// will propagate exceptions as is appropriate to the top-level promise.
RecordException(e);
MaybeComplete();
CancelWorkersAndMaybeComplete();
}
ClearCurrentInvoker();
}
Expand All @@ -337,7 +341,7 @@ private void WorkerBody(bool launchNext)
// The worker body. Each worker will execute this same body.
while (true)
{
if (!_enumerator.TryMoveNext(this, _cancelationRef, out var element))
if (!_enumerator.TryMoveNext(this, ref _stopExecuting, out var element))
{
MaybeComplete();
return;
Expand Down Expand Up @@ -386,10 +390,11 @@ internal override void Handle(PromiseRefBase handler, Promise.State state)
else if (state == Promise.State.Canceled)
{
_completionState = Promise.State.Canceled;
MaybeComplete();
CancelWorkersAndMaybeComplete();
}
else
{
CancelWorkers();
// Record the failure. The last worker to complete will propagate exceptions as is appropriate to the top-level promise.
var container = rejectContainer;
var exception = container.Value as Exception
Expand All @@ -408,41 +413,60 @@ private void RecordException(Exception e)
}
}

private void MaybeComplete()
private void CancelWorkers()
{
// We cancel the source to notify completion to other threads.
_stopExecuting = true;
// We cancel the source to notify the workers that they don't need to continue processing.
// This may be called multiple times. It's fine because it checks internally if it's already canceled.
_cancelationRef.Cancel();
try
{
_cancelationRef.Cancel();
}
catch (Exception e)
{
RecordException(e);
}
}

private void CancelWorkersAndMaybeComplete()
{
CancelWorkers();
MaybeComplete();
}

private void MaybeComplete()
{
// If we're the last worker to complete, clean up and complete the operation.
if (InterlockedAddWithUnsignedOverflowCheck(ref _waitCounter, -1) == 0)
if (InterlockedAddWithUnsignedOverflowCheck(ref _waitCounter, -1) != 0)
{
_externalCancelationRegistration.Dispose();
_externalCancelationRegistration = default;
_cancelationRef.TryDispose(_cancelationRef.SourceId);
_cancelationRef = null;
return;
}

try
{
_enumerator.Dispose();
}
catch (Exception e)
{
RecordException(e);
}
_externalCancelationRegistration.Dispose();
_externalCancelationRegistration = default;
_cancelationRef.TryDispose(_cancelationRef.SourceId);
_cancelationRef = null;

// Finally, complete the promise returned to the ParallelForEach caller.
// This must be the very last thing done.
if (_exceptions != null)
{
_rejectContainer = CreateRejectContainer(new AggregateException(_exceptions), int.MinValue, null, this);
_exceptions = null;
HandleNextInternal(Promise.State.Rejected);
}
else
{
HandleNextInternal(_completionState);
}
try
{
_enumerator.Dispose();
}
catch (Exception e)
{
RecordException(e);
}

// Finally, complete the promise returned to the ParallelForEach caller.
// This must be the very last thing done.
if (_exceptions != null)
{
_rejectContainer = CreateRejectContainer(new AggregateException(_exceptions), int.MinValue, null, this);
_exceptions = null;
HandleNextInternal(Promise.State.Rejected);
}
else
{
HandleNextInternal(_completionState);
}
}

Expand Down
Loading

0 comments on commit d9f6d11

Please sign in to comment.