diff --git a/source/DefaultEcs/Internal/Threading/WorkerBarrier.cs b/source/DefaultEcs/Internal/Threading/WorkerBarrier.cs deleted file mode 100644 index 91bf3975..00000000 --- a/source/DefaultEcs/Internal/Threading/WorkerBarrier.cs +++ /dev/null @@ -1,86 +0,0 @@ -using System; -using System.Runtime.CompilerServices; -using System.Threading; - -namespace DefaultEcs.Internal.Threading -{ - internal sealed class WorkerBarrier : IDisposable - { - #region Fields - - private readonly int _count; - private readonly ManualResetEventSlim _endHandle; - private readonly ManualResetEventSlim _startHandle; - - private bool _allStarted; - private int _runningCount; - - #endregion - - #region Initialisation - - public WorkerBarrier(int workerCount) - { - _count = workerCount; - _endHandle = new ManualResetEventSlim(false); - _startHandle = new ManualResetEventSlim(false); - - _allStarted = false; - _runningCount = 0; - } - - #endregion - - #region Methods - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void StartWorkers() - { - Volatile.Write(ref _allStarted, false); - _startHandle.Set(); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Start() - { - _startHandle.Wait(); - - if (Interlocked.Increment(ref _runningCount) == _count) - { - _startHandle.Reset(); - Volatile.Write(ref _allStarted, true); - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Signal() - { - while (!Volatile.Read(ref _allStarted)) - { } - - if (Interlocked.Decrement(ref _runningCount) == 0) - { - _endHandle.Set(); - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void WaitForWorkers() - { - _endHandle.Wait(); - _endHandle.Reset(); - } - - #endregion - - #region IDisposable - - public void Dispose() - { - _endHandle.Dispose(); - _startHandle.Dispose(); - } - - #endregion - } -} diff --git a/source/DefaultEcs/Threading/DefaultParallelRunner.cs b/source/DefaultEcs/Threading/DefaultParallelRunner.cs index adc5dd25..2acebbe4 100644 --- a/source/DefaultEcs/Threading/DefaultParallelRunner.cs +++ b/source/DefaultEcs/Threading/DefaultParallelRunner.cs @@ -1,14 +1,10 @@ using System; -using System.Collections.Generic; -using System.Linq; using System.Threading; -using System.Threading.Tasks; -using DefaultEcs.Internal.Threading; namespace DefaultEcs.Threading { /// - /// Represents an object used to run an by using multiple . + /// Represents an object used to run an by using multiple . /// public sealed class DefaultParallelRunner : IParallelRunner { @@ -16,11 +12,14 @@ public sealed class DefaultParallelRunner : IParallelRunner internal static readonly DefaultParallelRunner Default = new(1); - private readonly CancellationTokenSource _disposeHandle; - private readonly WorkerBarrier _barrier; - private readonly Task[] _tasks; + private readonly object _syncObject = new object(); + private readonly ManualResetEventSlim _workStartEvent; + private readonly Thread[] _threads; + private readonly bool[] _threadsWorkState; - private IParallelRunnable _currentRunnable; + private volatile bool _isAlive; + private volatile int _pendingTasks; + private volatile IParallelRunnable _currentRunnable; #endregion @@ -29,19 +28,26 @@ public sealed class DefaultParallelRunner : IParallelRunner /// /// Initialises a new instance of the class. /// - /// The number of concurrent used to update an in parallel. + /// The number of concurrent used to update an in parallel. + /// Name prefix for the threads in case you have more than one runner and need to mark them. /// cannot be inferior to one. - public DefaultParallelRunner(int degreeOfParallelism) + public DefaultParallelRunner(int degreeOfParallelism, string threadNamePrefix = null) { - IEnumerable indices = degreeOfParallelism >= 1 ? Enumerable.Range(0, degreeOfParallelism - 1) : throw new ArgumentException("Argument cannot be inferior to one", nameof(degreeOfParallelism)); - - _disposeHandle = new CancellationTokenSource(); - _tasks = indices.Select(index => new Task(Update, index, TaskCreationOptions.LongRunning)).ToArray(); - _barrier = degreeOfParallelism > 1 ? new WorkerBarrier(_tasks.Length) : null; - - foreach (Task task in _tasks) + if (0 >= degreeOfParallelism) + throw new ArgumentException($"Argument {nameof(degreeOfParallelism)} cannot be inferior to one!"); + + _isAlive = true; + threadNamePrefix ??= ""; + _workStartEvent = new ManualResetEventSlim(initialState: false); + _threads = new Thread[degreeOfParallelism - 1]; + _threadsWorkState = new bool[_threads.Length]; + for (int threadIndex = 0; _threads.Length > threadIndex; threadIndex++) { - task.Start(TaskScheduler.Default); + _threads[threadIndex] = new Thread(new ParameterizedThreadStart(this.ThreadExecutionLoop)); + Thread newThread = _threads[threadIndex]; + newThread.Name = threadNamePrefix + $"{nameof(DefaultParallelRunner)} worker {threadIndex + 1}"; + newThread.IsBackground = true; + newThread.Start(threadIndex); } } @@ -49,28 +55,42 @@ public DefaultParallelRunner(int degreeOfParallelism) #region Methods - private void Update(object state) + private void ThreadExecutionLoop(object initObject) { - int index = (int)state; - -#if !NETSTANDARD1_1 - Thread.CurrentThread.Name = $"{nameof(DefaultParallelRunner)} worker {index + 1}"; -#endif - - goto Start; - - Work: - Volatile.Read(ref _currentRunnable).Run(index, _tasks.Length); - - _barrier.Signal(); - - Start: - _barrier.Start(); + int workerIndex = (int)initObject; + while (_isAlive) + { + _workStartEvent.Wait(); + if (!_isAlive) return; + if (!_threadsWorkState[workerIndex]) continue; + + try + { + _currentRunnable?.Run(workerIndex, _threads.Length); + } + finally + { + lock (_syncObject) + { + _pendingTasks--; + _threadsWorkState[workerIndex] = false; + } + } + } + } - if (!_disposeHandle.IsCancellationRequested) + private bool IsAllThreadsStopped() + { + bool result = true; + foreach (Thread thread in _threads) { - goto Work; + if (thread.ThreadState == ThreadState.Running) + { + result = false; + break; + } } + return result; } #endregion @@ -80,7 +100,7 @@ private void Update(object state) /// /// Gets the degree of parallelism used to run an . /// - public int DegreeOfParallelism => _tasks.Length + 1; + public int DegreeOfParallelism => _threads.Length + 1; /// /// Runs the provided . @@ -88,15 +108,31 @@ private void Update(object state) /// The to run. public void Run(IParallelRunnable runnable) { + if (!_isAlive) throw new InvalidOperationException("Runner was already disposed!"); runnable.ThrowIfNull(); - Volatile.Write(ref _currentRunnable, runnable); - - _barrier?.StartWorkers(); - - runnable.Run(_tasks.Length, _tasks.Length); - - _barrier?.WaitForWorkers(); + _currentRunnable = runnable; + if (_threads.Length > 0) + { + _pendingTasks = _threads.Length; + _threadsWorkState.Fill(true); + _workStartEvent.Set(); + try + { + _currentRunnable.Run(index: _threads.Length, maxIndex: _threads.Length); + } + finally + { + SpinWait.SpinUntil(() => _pendingTasks == 0); + _workStartEvent.Reset(); + SpinWait.SpinUntil(IsAllThreadsStopped); + } + } + else + { + _currentRunnable.Run(index: _threads.Length, maxIndex: _threads.Length); + } + _currentRunnable = null; } #endregion @@ -108,14 +144,12 @@ public void Run(IParallelRunnable runnable) /// public void Dispose() { - _disposeHandle.Cancel(); - - _barrier?.StartWorkers(); - - Task.WaitAll(_tasks); - - _barrier?.Dispose(); - _disposeHandle.Dispose(); + if (!_isAlive) return; + _isAlive = false; + _currentRunnable = null; + _workStartEvent.Set(); + Thread.Sleep(millisecondsTimeout: 1); + _workStartEvent.Dispose(); } #endregion diff --git a/source/DefaultEcs/Threading/IParallelRunnable.cs b/source/DefaultEcs/Threading/IParallelRunnable.cs index d8baf78c..c34908a7 100644 --- a/source/DefaultEcs/Threading/IParallelRunnable.cs +++ b/source/DefaultEcs/Threading/IParallelRunnable.cs @@ -8,8 +8,11 @@ public interface IParallelRunnable /// /// Runs the part out of of the process. /// - /// - /// + /// + /// Index of given ecs worker. + /// Using same index as for main thread is preferable to process leftover entities there. + /// + /// Max index for ecs workers void Run(int index, int maxIndex); } }