Skip to content

Commit

Permalink
Add multithreaded process option
Browse files Browse the repository at this point in the history
nHackel committed Sep 25, 2023
1 parent f00557f commit 95d6bbf
Showing 1 changed file with 67 additions and 1 deletion.
68 changes: 67 additions & 1 deletion src/MiscAlgorithms/RuntimeAlgorithms.jl
Original file line number Diff line number Diff line change
@@ -40,4 +40,70 @@ function pinnedRecoTask(algo::ThreadPinnedAlgorithm)
end
# TODO general async task, has to preserve order (cant just spawn task for each put)
# TODO Timeout task with timeout options for put and take
# TODO maybe can be cancelled?
# TODO maybe can be cancelled?

export AbstractMultiThreadedProcessing
abstract type AbstractMultiThreadedProcessing <: AbstractImageReconstructionAlgorithm end

mutable struct RoundRobinProcessAlgorithm <: AbstractMultiThreadedProcessing
threads::Vector{Int64}
threadNum::Int64
threadTasks::Vector{Union{Task, Nothing}}
inputChannels::Vector{Channel{Any}}
inputOrder::Channel{Int64}
outputChannels::Vector{Channel{Any}}
processLock::ReentrantLock
end
RoundRobinProcessAlgorithm(threads::Vector{Int64}) = RoundRobinProcessAlgorithm(threads, 1, [nothing for i = 1:length(threads)], [Channel{Any}(Inf) for i = 1:length(threads)], Channel{Int64}(Inf), [Channel{Any}(Inf) for i = 1:length(threads)], ReentrantLock())
RoundRobinProcessAlgorithm(threads::UnitRange) = RoundRobinProcessAlgorithm(collect(threads))
function put!(algo::RoundRobinProcessAlgorithm, innerAlgo::AbstractImageReconstructionAlgorithm, params::AbstractImageReconstructionParameters, inputs...)
lock(algo.processLock) do
threadNum = algo.threadNum
task = algo.threadTasks[threadNum]
put!(algo.inputChannels[threadNum], (innerAlgo, params, inputs...))
put!(algo.inputOrder, threadNum)
if isnothing(task) || istaskdone(task)
algo.threadTasks[threadNum] = @tspawnat algo.threads[threadNum] processInputs(algo, threadNum)
end
algo.threadNum = mod1(algo.threadNum + 1, length(algo.threads))
end
end

function processInputs(algo::RoundRobinProcessAlgorithm, threadNum)
input = algo.inputChannels[threadNum]
output = algo.outputChannels[threadNum]
while isready(input)
result = nothing
try
inner, params, inputs = take!(input)
result = process(inner, params, inputs)
catch e
@error "Error in image processing thread $(algo.threads[threadNum])" exception=(e, catch_backtrace())
result = e
end
put!(output, result)
end
end

function take!(algo::RoundRobinProcessAlgorithm)
outputOrder = take!(algo.inputOrder)
return take!(algo.outputChannels[outputOrder])
end

export MultiThreadedAlgorithmParameter, MultiThreadedAlgorithm, MultiThreadedInput
struct MultiThreadedInput
scheduler::AbstractMultiThreadedProcessing
inputs::Tuple
end
Base.@kwdef struct MultiThreadedAlgorithmParameter <: AbstractImageReconstructionParameters
threadIDs::Union{Vector{Int64}, UnitRange{Int64}}
algo::AbstractImageReconstructionAlgorithm
end

mutable struct MultiThreadedAlgorithm <: AbstractImageReconstructionAlgorithm
params::MultiThreadedAlgorithmParameter
scheduler::RoundRobinProcessAlgorithm
end
MultiThreadedAlgorithm(params::MultiThreadedAlgorithmParameter) = MultiThreadedAlgorithm(params, RoundRobinProcessAlgorithm(params.threadIDs))
put!(algo::MultiThreadedAlgorithm, inputs...) = put!(algo.params.algo, MultiThreadedInput(algo.scheduler, inputs))
take!(algo::MultiThreadedAlgorithm) = take!(algo.params.algo)

0 comments on commit 95d6bbf

Please sign in to comment.