From 95d6bbf0794e7a91252f11122b4231d7ea990126 Mon Sep 17 00:00:00 2001 From: nHackel Date: Mon, 25 Sep 2023 18:30:27 +0200 Subject: [PATCH] Add multithreaded process option --- src/MiscAlgorithms/RuntimeAlgorithms.jl | 68 ++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/src/MiscAlgorithms/RuntimeAlgorithms.jl b/src/MiscAlgorithms/RuntimeAlgorithms.jl index cd18526..dd765ca 100644 --- a/src/MiscAlgorithms/RuntimeAlgorithms.jl +++ b/src/MiscAlgorithms/RuntimeAlgorithms.jl @@ -40,4 +40,70 @@ function pinnedRecoTask(algo::ThreadPinnedAlgorithm) end # TODO general async task, has to preserve order (cant just spawn task for each put) # TODO Timeout task with timeout options for put and take -# TODO maybe can be cancelled? \ No newline at end of file +# TODO maybe can be cancelled? + +export AbstractMultiThreadedProcessing +abstract type AbstractMultiThreadedProcessing <: AbstractImageReconstructionAlgorithm end + +mutable struct RoundRobinProcessAlgorithm <: AbstractMultiThreadedProcessing + threads::Vector{Int64} + threadNum::Int64 + threadTasks::Vector{Union{Task, Nothing}} + inputChannels::Vector{Channel{Any}} + inputOrder::Channel{Int64} + outputChannels::Vector{Channel{Any}} + processLock::ReentrantLock +end +RoundRobinProcessAlgorithm(threads::Vector{Int64}) = RoundRobinProcessAlgorithm(threads, 1, [nothing for i = 1:length(threads)], [Channel{Any}(Inf) for i = 1:length(threads)], Channel{Int64}(Inf), [Channel{Any}(Inf) for i = 1:length(threads)], ReentrantLock()) +RoundRobinProcessAlgorithm(threads::UnitRange) = RoundRobinProcessAlgorithm(collect(threads)) +function put!(algo::RoundRobinProcessAlgorithm, innerAlgo::AbstractImageReconstructionAlgorithm, params::AbstractImageReconstructionParameters, inputs...) + lock(algo.processLock) do + threadNum = algo.threadNum + task = algo.threadTasks[threadNum] + put!(algo.inputChannels[threadNum], (innerAlgo, params, inputs...)) + put!(algo.inputOrder, threadNum) + if isnothing(task) || istaskdone(task) + algo.threadTasks[threadNum] = @tspawnat algo.threads[threadNum] processInputs(algo, threadNum) + end + algo.threadNum = mod1(algo.threadNum + 1, length(algo.threads)) + end +end + +function processInputs(algo::RoundRobinProcessAlgorithm, threadNum) + input = algo.inputChannels[threadNum] + output = algo.outputChannels[threadNum] + while isready(input) + result = nothing + try + inner, params, inputs = take!(input) + result = process(inner, params, inputs) + catch e + @error "Error in image processing thread $(algo.threads[threadNum])" exception=(e, catch_backtrace()) + result = e + end + put!(output, result) + end +end + +function take!(algo::RoundRobinProcessAlgorithm) + outputOrder = take!(algo.inputOrder) + return take!(algo.outputChannels[outputOrder]) +end + +export MultiThreadedAlgorithmParameter, MultiThreadedAlgorithm, MultiThreadedInput +struct MultiThreadedInput + scheduler::AbstractMultiThreadedProcessing + inputs::Tuple +end +Base.@kwdef struct MultiThreadedAlgorithmParameter <: AbstractImageReconstructionParameters + threadIDs::Union{Vector{Int64}, UnitRange{Int64}} + algo::AbstractImageReconstructionAlgorithm +end + +mutable struct MultiThreadedAlgorithm <: AbstractImageReconstructionAlgorithm + params::MultiThreadedAlgorithmParameter + scheduler::RoundRobinProcessAlgorithm +end +MultiThreadedAlgorithm(params::MultiThreadedAlgorithmParameter) = MultiThreadedAlgorithm(params, RoundRobinProcessAlgorithm(params.threadIDs)) +put!(algo::MultiThreadedAlgorithm, inputs...) = put!(algo.params.algo, MultiThreadedInput(algo.scheduler, inputs)) +take!(algo::MultiThreadedAlgorithm) = take!(algo.params.algo) \ No newline at end of file