Skip to content

Commit

Permalink
Thread-based DefaultParallelRunner
Browse files Browse the repository at this point in the history
  • Loading branch information
nrader95 committed Jul 3, 2023
1 parent 34470a3 commit d8929f3
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 141 deletions.
86 changes: 0 additions & 86 deletions source/DefaultEcs/Internal/Threading/WorkerBarrier.cs

This file was deleted.

140 changes: 87 additions & 53 deletions source/DefaultEcs/Threading/DefaultParallelRunner.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DefaultEcs.Internal.Threading;

namespace DefaultEcs.Threading
{
/// <summary>
/// Represents an object used to run an <see cref="IParallelRunnable"/> by using multiple <see cref="Task"/>.
/// Represents an object used to run an <see cref="IParallelRunnable"/> by using multiple <see cref="Thread"/>.
/// </summary>
public sealed class DefaultParallelRunner : IParallelRunner
{
#region Fields

internal static readonly DefaultParallelRunner Default = new(1);

private readonly CancellationTokenSource _disposeHandle;
private readonly WorkerBarrier _barrier;
private readonly Task[] _tasks;
private readonly object _syncObject = new object();

Check warning on line 15 in source/DefaultEcs/Threading/DefaultParallelRunner.cs

View workflow job for this annotation

GitHub Actions / pull_request

'new' expression can be simplified (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0090)
private readonly ManualResetEventSlim _workStartEvent;
private readonly Thread[] _threads;
private readonly bool[] _threadsWorkState;

private IParallelRunnable _currentRunnable;
private volatile bool _isAlive;
private volatile int _pendingTasks;
private volatile IParallelRunnable _currentRunnable;

#endregion

Expand All @@ -29,48 +28,69 @@ public sealed class DefaultParallelRunner : IParallelRunner
/// <summary>
/// Initialises a new instance of the <see cref="DefaultParallelRunner"/> class.
/// </summary>
/// <param name="degreeOfParallelism">The number of concurrent <see cref="Task"/> used to update an <see cref="IParallelRunnable"/> in parallel.</param>
/// <param name="degreeOfParallelism">The number of concurrent <see cref="Thread"/> used to update an <see cref="IParallelRunnable"/> in parallel.</param>
/// <param name="threadNamePrefix"> Name prefix for the threads in case you have more than one runner and need to mark them. </param>
/// <exception cref="ArgumentException"><paramref name="degreeOfParallelism"/> cannot be inferior to one.</exception>
public DefaultParallelRunner(int degreeOfParallelism)
public DefaultParallelRunner(int degreeOfParallelism, string threadNamePrefix = null)
{
IEnumerable<int> indices = degreeOfParallelism >= 1 ? Enumerable.Range(0, degreeOfParallelism - 1) : throw new ArgumentException("Argument cannot be inferior to one", nameof(degreeOfParallelism));

_disposeHandle = new CancellationTokenSource();
_tasks = indices.Select(index => new Task(Update, index, TaskCreationOptions.LongRunning)).ToArray();
_barrier = degreeOfParallelism > 1 ? new WorkerBarrier(_tasks.Length) : null;

foreach (Task task in _tasks)
if (0 >= degreeOfParallelism)
throw new ArgumentException($"Argument {nameof(degreeOfParallelism)} cannot be inferior to one!");

_isAlive = true;
threadNamePrefix ??= "";
_workStartEvent = new ManualResetEventSlim(initialState: false);
_threads = new Thread[degreeOfParallelism - 1];
_threadsWorkState = new bool[_threads.Length];
for (int threadIndex = 0; _threads.Length > threadIndex; threadIndex++)
{
task.Start(TaskScheduler.Default);
_threads[threadIndex] = new Thread(new ParameterizedThreadStart(this.ThreadExecutionLoop));
Thread newThread = _threads[threadIndex];
newThread.Name = threadNamePrefix + $"{nameof(DefaultParallelRunner)} worker {threadIndex + 1}";
newThread.IsBackground = true;
newThread.Start(threadIndex);
}
}

#endregion

#region Methods

private void Update(object state)
private void ThreadExecutionLoop(object initObject)
{
int index = (int)state;

#if !NETSTANDARD1_1
Thread.CurrentThread.Name = $"{nameof(DefaultParallelRunner)} worker {index + 1}";
#endif

goto Start;

Work:
Volatile.Read(ref _currentRunnable).Run(index, _tasks.Length);

_barrier.Signal();

Start:
_barrier.Start();
int workerIndex = (int)initObject;
while (_isAlive)
{
_workStartEvent.Wait();
if (!_isAlive) return;
if (!_threadsWorkState[workerIndex]) continue;

try
{
_currentRunnable?.Run(workerIndex, _threads.Length);
}
finally
{
lock (_syncObject)
{
_pendingTasks--;
_threadsWorkState[workerIndex] = false;
}
}
}
}

if (!_disposeHandle.IsCancellationRequested)
private bool IsAllThreadsStopped()
{
bool result = true;
foreach (Thread thread in _threads)
{
goto Work;
if (thread.ThreadState == ThreadState.Running)
{
result = false;
break;
}
}
return result;
}

#endregion
Expand All @@ -80,23 +100,39 @@ private void Update(object state)
/// <summary>
/// Gets the degree of parallelism used to run an <see cref="IParallelRunnable"/>.
/// </summary>
public int DegreeOfParallelism => _tasks.Length + 1;
public int DegreeOfParallelism => _threads.Length + 1;

/// <summary>
/// Runs the provided <see cref="IParallelRunnable"/>.
/// </summary>
/// <param name="runnable">The <see cref="IParallelRunnable"/> to run.</param>
public void Run(IParallelRunnable runnable)
{
if (!_isAlive) throw new InvalidOperationException("Runner was already disposed!");
runnable.ThrowIfNull();

Volatile.Write(ref _currentRunnable, runnable);

_barrier?.StartWorkers();

runnable.Run(_tasks.Length, _tasks.Length);

_barrier?.WaitForWorkers();
_currentRunnable = runnable;
if (_threads.Length > 0)
{
_pendingTasks = _threads.Length;
_threadsWorkState.Fill<bool>(true);
_workStartEvent.Set();
try
{
_currentRunnable.Run(index: _threads.Length, maxIndex: _threads.Length);
}
finally
{
SpinWait.SpinUntil(() => _pendingTasks == 0);
_workStartEvent.Reset();
SpinWait.SpinUntil(IsAllThreadsStopped);
}
}
else
{
_currentRunnable.Run(index: _threads.Length, maxIndex: _threads.Length);
}
_currentRunnable = null;
}

#endregion
Expand All @@ -108,14 +144,12 @@ public void Run(IParallelRunnable runnable)
/// </summary>
public void Dispose()
{
_disposeHandle.Cancel();

_barrier?.StartWorkers();

Task.WaitAll(_tasks);

_barrier?.Dispose();
_disposeHandle.Dispose();
if (!_isAlive) return;
_isAlive = false;
_currentRunnable = null;
_workStartEvent.Set();
Thread.Sleep(millisecondsTimeout: 1);
_workStartEvent.Dispose();
}

#endregion
Expand Down
7 changes: 5 additions & 2 deletions source/DefaultEcs/Threading/IParallelRunnable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ public interface IParallelRunnable
/// <summary>
/// Runs the part <paramref name="index"/> out of <paramref name="maxIndex"/> of the process.
/// </summary>
/// <param name="index"></param>
/// <param name="maxIndex"></param>
/// <param name="index">
/// Index of given ecs worker. <para/>

Check warning on line 12 in source/DefaultEcs/Threading/IParallelRunnable.cs

View workflow job for this annotation

GitHub Actions / pull_request

Remove trailing white-space. (http://pihrt.net/roslynator/analyzer?id=RCS1037)
/// Using same index as <paramref name="maxIndex"/> for main thread is preferable to process leftover entities there.
/// </param>
/// <param name="maxIndex">Max index for ecs workers</param>
void Run(int index, int maxIndex);
}
}

0 comments on commit d8929f3

Please sign in to comment.