From ae69c5fa0eceb1d39de7e2f76551d0c451d038f8 Mon Sep 17 00:00:00 2001 From: Marcel Koester Date: Sun, 30 Jul 2023 20:36:40 +0200 Subject: [PATCH] Added new ParallelCache and ParallelProcessingCache classes to support cached intermediate values during processing. --- Src/ILGPU/Util/ParallelCache.cs | 378 ++++++++++++++++++++++++++++++++ 1 file changed, 378 insertions(+) create mode 100644 Src/ILGPU/Util/ParallelCache.cs diff --git a/Src/ILGPU/Util/ParallelCache.cs b/Src/ILGPU/Util/ParallelCache.cs new file mode 100644 index 000000000..ecb4adbb9 --- /dev/null +++ b/Src/ILGPU/Util/ParallelCache.cs @@ -0,0 +1,378 @@ +// --------------------------------------------------------------------------------------- +// ILGPU +// Copyright (c) 2023 ILGPU Project +// www.ilgpu.net +// +// File: ParallelCache.cs +// +// This file is part of ILGPU and is distributed under the University of Illinois Open +// Source License. See LICENSE.txt for details. +// --------------------------------------------------------------------------------------- + +using System; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; + +namespace ILGPU.Util +{ + /// + /// Represents a parallel object cache to be used in combination with a + /// for implementation to avoid unnecessary temporary object + /// creation. + /// + /// The type of the elements to cache. + public abstract class ParallelCache : DisposeBase, IParallelCache + where T : class + { + #region Instance + + private InlineList cache; + private InlineList used; + + /// + /// Creates a new parallel cache. + /// + /// + /// The initial number of processing threads (if any). + /// + protected ParallelCache(int? initialCapacity = null) + { + int capacity = initialCapacity ?? Environment.ProcessorCount * 2; + cache = InlineList.Create(capacity); + used = InlineList.Create(capacity); + + LocalInitializer = GetOrCreate; + LocalFinalizer = FinishProcessing; + } + + #endregion + + #region Properties + + /// + /// Returns the underlying sync root object. + /// + public object SyncRoot { get; } = new object(); + + /// + /// Returns the local initializer function. + /// + public Func LocalInitializer { get; } + + /// + /// Returns the local finalizer action. + /// + public Action LocalFinalizer { get; } + + /// + /// Returns the underlying used intermediates. + /// + protected ReadOnlySpan Used => used; + + #endregion + + #region Methods + + /// + /// Initializes this parallel cache of the next parallel operation. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void InitializeProcessing() + { + // This method does not perform an operation at the moment but this may + // change in the future. For this reason, this (empty) method remains here + // and should be called in all cases prior to calling GetOrCreate(). + } + + /// + /// Gets or creates a new intermediate array tuple storing information for the + /// upcoming optimizer iteration. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public T GetOrCreate() + { + // Checks the cache contents to retrieve previously + T intermediate; + lock (SyncRoot) + { + if (cache.Count > 0) + { + int lastIndex = cache.Count - 1; + intermediate = cache[lastIndex]; + cache.RemoveAt(lastIndex); + } + else + { + // Create a new intermediate result + intermediate = CreateIntermediate(); + } + } + + // Initialize intermediate result and return + InitializeIntermediate(intermediate); + + // Add to our list of used intermediates + lock (SyncRoot) + used.Add(intermediate); + + return intermediate; + } + + /// + /// Finishes a parallel processing step. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void FinishProcessing() + { + // Return all used intermediates to the cache + cache.AddRange(used); + used.Clear(); + + } + + /// + /// Creates a new intermediate instance without initializing it properly. + /// + /// The created intermediate state. + protected abstract T CreateIntermediate(); + + /// + /// Initializes the given intermediate state in order to prepare it for + /// processing. + /// + /// The intermediate state to prepare. + protected virtual void InitializeIntermediate(T intermediateState) { } + + /// + /// Finishes processing of the current thread while getting an intermediate state. + /// + /// The intermediate state to operate on. + protected virtual void FinishProcessing(T intermediateState) { } + + #endregion + + #region IParallelCache + + /// + /// Creates a new intermediate instance without initializing it properly. + /// + /// The created intermediate state. + T IParallelCache.CreateIntermediate() => CreateIntermediate(); + + /// + /// Initializes the given intermediate state in order to prepare it for + /// processing. + /// + /// The intermediate state to prepare. + void IParallelCache.InitializeIntermediate(T intermediateState) => + InitializeIntermediate(intermediateState); + + /// + /// Finishes processing of the current thread while getting an intermediate state. + /// + /// The intermediate state to operate on. + void IParallelCache.FinishProcessing(T intermediateState) => + FinishProcessing(intermediateState); + + #endregion + + #region IDisposable + + /// + /// Disposes all created intermediate states (if required). + /// + protected override void Dispose(bool disposing) + { + // Check whether we need to dispose all elements + if (cache.Count > 0 && typeof(IDisposable).IsAssignableFrom(typeof(T))) + { + foreach (var intermediateStates in cache) + intermediateStates.AsNotNullCast().Dispose(); + } + base.Dispose(disposing); + } + + #endregion + } + + /// + /// An abstract parallel cache interface operating on intermediate states. + /// + /// The type of all intermediate states. + public interface IParallelCache + { + /// + /// Creates a new intermediate instance without initializing it properly. + /// + /// The created intermediate state. + T CreateIntermediate(); + + /// + /// Initializes the given intermediate state in order to prepare it for + /// processing. + /// + /// The intermediate state to prepare. + void InitializeIntermediate(T intermediateState); + + /// + /// Finishes processing of the current thread while getting an intermediate state. + /// + /// The intermediate state to operate on. + void FinishProcessing(T intermediateState); + } + + /// + /// An abstract parallel processing body representing a function to be executed + /// concurrently on a given value range. It operates on intermediate values that are + /// managed by its surrounding processing cache. + /// + /// The type of all intermediate states. + public interface IParallelProcessingBody + where T : class + { + /// + /// Initializes this processing body to prepare the upcoming parallel processing + /// steps. + /// + void Initialize(); + + /// + /// Processes a single element concurrently while accepting an intermediate state + /// on which this body operates on. + /// + /// The current processing element index. + /// The parallel loop state (if any). + /// + /// The current intermediate state for this thread. + /// + void Process( + int index, + ParallelLoopState? loopState, + T intermediateState); + + /// + /// Finalizes the current body operating while having the ability to inspect all + /// previously used intermediate states. + /// + /// + /// A span referring to all previously used intermediate states. + /// + void Finalize(ReadOnlySpan intermediateStates); + } + + /// + /// Static helpers for parallel processing extensions. + /// + public static class ParallelProcessing + { + /// + /// Gets or sets whether debug mode is enabled. Note that this assignment needs to + /// be changes before the first + /// instance has been created since the flag is cached locally to enable JIT + /// optimizations. + /// + public static bool DebugMode { get; set; } + } + + /// + /// Represents a parallel object cache to be used in combination with a + /// for implementation to avoid unnecessary temporary object + /// creation. Furthermore, this implementation operates on specialized body instances + /// to avoid virtual function calls in each processing step. + /// + /// The type of the elements to cache. + /// The type of the custom loop body instance. + public abstract class ParallelProcessingCache : ParallelCache + where T : class + where TBody : IParallelProcessingBody + { + /// + /// Returns true if the debug mode is enabled for all parallel processing + /// operations. + /// + private static readonly bool DebugMode = ParallelProcessing.DebugMode; + + private readonly Func body; + private readonly TBody bodyImplementation; + private readonly ParallelOptions defaultOptions = new(); + + /// + /// Creates a new parallel processing cache operating on intermediate states. + /// + /// + /// The initial number of processing threads (if any). + /// + [SuppressMessage( + "Usage", + "CA2214:Do not call overridable methods in constructors", + Justification = "This method is called here as it represents an abstract " + + "static factory method")] + protected ParallelProcessingCache(int? initialCapacity = null) + : base(initialCapacity) + { + bodyImplementation = CreateBody(); + body = (i, state, intermediate) => + { + bodyImplementation.Process(i, state, intermediate); + return intermediate; + }; + } + + /// + /// Creates the required parallel processing body to be used. + /// + /// The processing body to use. + protected abstract TBody CreateBody(); + + /// + /// Performs the current operation in parallel. + /// + /// The inclusive start index. + /// The exclusive end index. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void ParallelFor(int fromInclusive, int toExclusive) => + ParallelFor(fromInclusive, toExclusive, defaultOptions); + + /// + /// Performs the current operation in parallel. + /// + /// The inclusive start index. + /// The exclusive end index. + /// The parallel execution options. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void ParallelFor( + int fromInclusive, + int toExclusive, + ParallelOptions options) + { + // Initialize processing cache + InitializeProcessing(); + + // Initialize operation + bodyImplementation.Initialize(); + + // Check for enabled debug mode + if (DebugMode) + { + var intermediate = GetOrCreate(); + for (int i = fromInclusive; i < toExclusive; ++i) + body(i, null, intermediate); + } + else + { + Parallel.For( + fromInclusive, + toExclusive, + options, + LocalInitializer, + body, + LocalFinalizer); + } + + // Finalize operation + bodyImplementation.Finalize(Used); + FinishProcessing(); + } + } +}