diff --git a/README.md b/README.md index d3cf344..59ad566 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Build Status](https://github.com/JuliaImageRecon/AbstractImageReconstruction.jl/actions/workflows/CI.yml/badge.svg?branch=main)](https://github.com/JuliaImageRecon/AbstractImageReconstruction.jl/actions/workflows/CI.yml?query=branch%3Amain) -[![](https://img.shields.io/badge/docs-latest-blue.svg)](https://JuliaImageRecon.github.io/AbstractImageReconstruction.jl/latest) +[![](https://img.shields.io/badge/docs-latest-blue.svg)](https://JuliaImageRecon.github.io/AbstractImageReconstruction.jl) This package contains an interface and type hierarchy for image reconstruction algorithms and their parameters, together with associated utility tools. diff --git a/docs/make.jl b/docs/make.jl index 5cfc31d..c8e1ad8 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -39,6 +39,7 @@ makedocs( "Serialization" => "generated/howto/serialization.md", "Caching" => "generated/howto/caching.md", "Observables" => "generated/howto/observables.md", + "Multi-Threading" => "generated/howto/multi_threading.md", ], "API Reference" => "API/api.md", #"Regularization Terms" => "API/regularization.md"], diff --git a/docs/src/API/api.md b/docs/src/API/api.md index eb290e8..134580b 100644 --- a/docs/src/API/api.md +++ b/docs/src/API/api.md @@ -8,6 +8,10 @@ AbstractImageReconstruction.AbstractImageReconstructionAlgorithm AbstractImageReconstruction.reconstruct Base.put!(::AbstractImageReconstructionAlgorithm, ::Any) Base.take!(::AbstractImageReconstructionAlgorithm) +Base.lock(::AbstractImageReconstructionAlgorithm) +Base.unlock(::AbstractImageReconstructionAlgorithm) +Base.isready(::AbstractImageReconstructionAlgorithm) +Base.wait(::AbstractImageReconstructionAlgorithm) AbstractImageReconstruction.AbstractImageReconstructionParameters AbstractImageReconstruction.process AbstractImageReconstruction.parameter diff --git a/docs/src/literate/example/2_direct.jl b/docs/src/literate/example/2_direct.jl index 1fb6887..a3fb458 100644 --- a/docs/src/literate/example/2_direct.jl +++ b/docs/src/literate/example/2_direct.jl @@ -53,12 +53,20 @@ end # And they implement a method to retrieve the used parameters: AbstractImageReconstruction.parameter(algo::DirectRadonAlgorithm) = algo.parameter -# And implement the `put!` and `take!` functions, mimicking the behavior of a FIFO channel: +# Algorithms are assumed to be stateful. To ensure thread safety, we need to implement the `lock` and `unlock` functions. We will use the `output` channel as a lock: +Base.lock(algo::DirectRadonAlgorithm) = lock(algo.output) +Base.unlock(algo::DirectRadonAlgorithm) = unlock(algo.output) + +# And implement the `put!` and `take!` functions, mimicking the behavior of a FIFO channel for reconstructions: Base.take!(algo::DirectRadonAlgorithm) = Base.take!(algo.output) -function Base.put!(algo::DirectRadonAlgorithm, data::AbstractArray{T, 4}) where {T} - lock(algo.output) do +function Base.put!(algo::DirectRadonAlgorithm, data::AbstractArray{T, 4}) where {T} + lock(algo) do put!(algo.output, process(algo, algo.parameter, data)) end end # The way the behaviour is implemented here, the algorithm does not buffer any inputs and instead blocks until the currenct reconstruction is done. Outputs are stored until they are retrieved. + +# With `wait` and `isready` we can check if the algorithm is currently processing data or if it is ready to accept new inputs: +Base.wait(algo::DirectRadonAlgorithm) = wait(algo.output) +Base.isready(algo::DirectRadonAlgorithm) = isready(algo.output) \ No newline at end of file diff --git a/docs/src/literate/example/4_iterative.jl b/docs/src/literate/example/4_iterative.jl index ac71dd9..d52df96 100644 --- a/docs/src/literate/example/4_iterative.jl +++ b/docs/src/literate/example/4_iterative.jl @@ -64,7 +64,7 @@ function AbstractImageReconstruction.process(algo::IterativeRadonAlgorithm, para end # Note that initially the operator is `nothing` and the processing step would fail as it stands. To "fix" this we define a `process` method for the algorithm instance which creates the operator and stores it in the algorithm: -function AbstractImageReconstruction.process(algo::IterativeRadonAlgorithm, params::IterativeRadonReconstructionParameters, ::Nothing, data::AbstractArray{T, 4}) where {T} +function AbstractImageReconstruction.process(algo::IterativeRadonAlgorithm, params::AbstractIterativeRadonReconstructionParameters, ::Nothing, data::AbstractArray{T, 4}) where {T} op = RadonOp(T; shape = params.shape, angles = params.angles) algo.op = op return process(AbstractIterativeRadonAlgorithm, params, op, data) @@ -73,11 +73,15 @@ end # Our algorithm is not type stable. To fix this, we would need to know the element type of the sinograms during construction. Which is possible with a different parameterization of the algorithm. We will not do this here. # Often times the performance impact of this is negligible as the critical sections are in the preprocessing or the iterative solver, especially since we still dispatch on the operator. -# To finish up the implementation we need to implement the `put!`, `take!` and `parameters` functions: +# To finish up the implementation we need to implement the remaining runtime related functions: Base.take!(algo::IterativeRadonAlgorithm) = Base.take!(algo.output) function Base.put!(algo::IterativeRadonAlgorithm, data::AbstractArray{T, 4}) where {T} lock(algo.output) do put!(algo.output, process(algo, algo.parameter, data)) end end +Base.lock(algo::IterativeRadonAlgorithm) = lock(algo.output) +Base.unlock(algo::IterativeRadonAlgorithm) = unlock(algo.output) +Base.isready(algo::IterativeRadonAlgorithm) = isready(algo.output) +Base.wait(algo::IterativeRadonAlgorithm) = wait(algo.output) AbstractImageReconstruction.parameter(algo::IterativeRadonAlgorithm) = algo.parameter \ No newline at end of file diff --git a/docs/src/literate/example/example_include_data.jl b/docs/src/literate/example/example_include_data.jl index fc4e924..653cff7 100644 --- a/docs/src/literate/example/example_include_data.jl +++ b/docs/src/literate/example/example_include_data.jl @@ -31,4 +31,5 @@ angles, shape, sinograms, images = isDataDefined ? (angles, shape, sinograms, im sinograms[:, :, :, i] = Array(RadonKA.radon(images[:, :, :, i], angles)) end return angles, shape, sinograms, images -end; +end +nothing diff --git a/docs/src/literate/howto/multi_threading.jl b/docs/src/literate/howto/multi_threading.jl new file mode 100644 index 0000000..52ff8ba --- /dev/null +++ b/docs/src/literate/howto/multi_threading.jl @@ -0,0 +1,74 @@ +include("../../literate/example/example_include_all.jl") #hide + +# # Multi-Threading +# `AbstractImageReconstruction` assumes that algorithms are stateful. This is reflected in the FIFO behaviour and the locking interface of algorithms. +# The motivation behind this choice is that the nature of computations within an algorithms heavily impact if multi-threading is beneficial or not. +# For example, consider a GPU-accelerated reconstruction. There it might be faster to sequentially process all images on the GPU instead of processing them in parallel. Or consider, the preprocessing step of the Radon example where we average our data. If we were to extend our algorithm to read sinograms from a file, it might be inefficient to partially read and average frames from the file in parallel. +# Instead it would be more efficient to read the required file in one go and then average the frames in parallel. +# Therefore, the actual runtime behaviour is intended to be an implementation detail of an algorithm which is to be abstracted behind `reconstruct`. + +# In the following we will explore the results of this design decision. If we consider a n algorithm such as: +plan = RecoPlan(IterativeRadonAlgorithm, parameter = RecoPlan(IterativeRadonParameters, + pre = RecoPlan(RadonPreprocessingParameters, frames = collect(1:5)), + reco = RecoPlan(IterativeRadonReconstructionParameters, shape = size(images)[1:3], angles = angles, + iterations = 20, reg = [L2Regularization(0.001), PositiveRegularization()], solver = CGNR) +)) +algo = build(plan) + +# which acts on one frame at a time, we could in theory do: +# ```julia +# Threads.@threads for i = 1:size(sinograms, 4) +# res = reconstruct(algo, sinograms[:, :, :, i:i]) +# # Store res +# end +# ``` +# Due to the locking interface of the algorithm, this will not actually run in parallel. Instead the algorithm will be locked for each iteration and tasks will wait for the previous reconstruction to finish. + +# If a user wants to explicitly use multi-threading, we could the plan to create a new algorithm for each task: +# ```julia +# Threads.@threads for i = 1:size(sinograms, 4) +# algo = build(plan) +# res = reconstruct(algo, sinograms[:, :, :, i:i]) +# # Store res +# end +# ``` +# This way each task has its own algorithm and can run in parallel. As mentioned before, this parallelization might not be the most efficient parallel reconstruction, both in terms of memory and runtime. + +# To further improve the performance of the reconstruction, we could implement specific multi-threading `process`-ing steps for our algorithms. As an example, we will implement parallel processing for the iterative solver: +Base.@kwdef struct ThreadedIterativeReconstructionParameters{S <: AbstractLinearSolver, R <: AbstractRegularization, N} <: AbstractIterativeRadonReconstructionParameters + solver::Type{S} + iterations::Int64 + reg::Vector{R} + shape::NTuple{N, Int64} + angles::Vector{Float64} +end +# Our parameters are identical to the iterative reconstruction parameters from the iterative example. We only differ in the type of the parameters. This allows us to dispatch on the type of the parameters and implement a different `process` method for the threaded version: +function AbstractImageReconstruction.process(::Type{<:AbstractIterativeRadonAlgorithm}, params::ThreadedIterativeReconstructionParameters, op, data::AbstractArray{T, 4}) where {T} + + result = similar(data, params.shape..., size(data, 4)) + + Threads.@threads for i = 1:size(data, 4) + solver = createLinearSolver(params.solver, op; iterations = params.iterations, reg = params.reg) + result[:, :, :, i] = solve!(solver, vec(data[:, :, :, i])) + end + + return result +end + +# While the Radon operator is thread-safe, the normal operator constructed by the solver is not. Therefore, we can reuse the Radon operator but still have to construct new solvers for each task. + +# Our multi-threaded reconstruction can be constructed and used just like the single-threaded one:: +plan.parameter.pre.frames = collect(1:size(sinograms, 4)) +plan.parameter.reco = RecoPlan(ThreadedIterativeReconstructionParameters, shape = size(images)[1:3], angles = angles, + iterations = 20, reg = [L2Regularization(0.001), PositiveRegularization()], solver = CGNR) + +algo = build(plan) +imag_iter = reconstruct(algo, sinograms) +fig = Figure() +for i = 1:5 + plot_image(fig[i,1], reverse(images[:, :, 24, i])) + plot_image(fig[i,2], sinograms[:, :, 24, i]) + plot_image(fig[i,3], reverse(imag_iter[:, :, 24, i])) +end +resize_to_layout!(fig) +fig diff --git a/src/AbstractImageReconstruction.jl b/src/AbstractImageReconstruction.jl index ad1ed3d..f62ce67 100644 --- a/src/AbstractImageReconstruction.jl +++ b/src/AbstractImageReconstruction.jl @@ -6,7 +6,7 @@ using Observables using Scratch using LRUCache -import Base: put!, take!, fieldtypes, fieldtype, ismissing, propertynames, parent, hash, wait, isready +import Base: put!, take!, fieldtypes, fieldtype, ismissing, propertynames, parent, hash, wait, isready, lock, unlock include("AlgorithmInterface.jl") include("StructTransforms.jl") diff --git a/src/AlgorithmInterface.jl b/src/AlgorithmInterface.jl index f8c64e0..e522ba5 100644 --- a/src/AlgorithmInterface.jl +++ b/src/AlgorithmInterface.jl @@ -27,16 +27,57 @@ put!(algo::AbstractImageReconstructionAlgorithm, inputs...) = error("$(typeof(al Remove and return a stored result from the algorithm `algo`. Blocks until a result is available. """ take!(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement take!") +""" + isready(algo::AbstractImageReconstructionAlgorithm) + +Determine if the algorithm `algo` has a result available. +""" +isready(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement isready") +""" + wait(algo::AbstractImageReconstructionAlgorithm) + +Wait for a result to be available from the specified `algo`. +""" +wait(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement wait") +""" + lock(algo::AbstractImageReconstructionAlgorithm) + +Acquire a lock on the algorithm `algo`. If the lock is already acquired, wait until it is released. + +Each `lock` must be matched with a `unlock`. +""" +lock(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement lock") +""" + unlock(algo::AbstractImageReconstructionAlgorithm) + +Release a lock on the algorithm `algo`. +""" +unlock(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement unlock") +""" + lock(fn, algo::AbstractImageReconstructionAlgorithm) + +Acquire the `lock` on `algo`, execute `fn` and release the `lock` afterwards. +""" +function lock(fn, algo::AbstractImageReconstructionAlgorithm) + lock(algo) + try + fn() + finally + unlock(algo) + end +end export reconstruct """ reconstruct(algo::T, u) where {T<:AbstractImageReconstructionAlgorithm} -Reconstruct an image from input `u` using algorithm `algo`. +Reconstruct an image from input `u` using algorithm `algo`. The `àlgo` will be `lock`ed until the result is available or an error occurs. """ function reconstruct(algo::T, u) where {T<:AbstractImageReconstructionAlgorithm} - put!(algo, u) - return take!(algo) + lock(algo) do + put!(algo, u) + return take!(algo) + end end export process diff --git a/src/MiscAlgorithms/MiscAlgorithms.jl b/src/MiscAlgorithms/MiscAlgorithms.jl index c18c448..e92aa75 100644 --- a/src/MiscAlgorithms/MiscAlgorithms.jl +++ b/src/MiscAlgorithms/MiscAlgorithms.jl @@ -1 +1 @@ -include("RuntimeAlgorithms.jl") \ No newline at end of file +include("ThreadPinnedAlgorithm.jl") \ No newline at end of file diff --git a/src/MiscAlgorithms/RuntimeAlgorithms.jl b/src/MiscAlgorithms/MultiThreadedAlgorithms.jl similarity index 68% rename from src/MiscAlgorithms/RuntimeAlgorithms.jl rename to src/MiscAlgorithms/MultiThreadedAlgorithms.jl index 1257753..7d78449 100644 --- a/src/MiscAlgorithms/RuntimeAlgorithms.jl +++ b/src/MiscAlgorithms/MultiThreadedAlgorithms.jl @@ -1,47 +1,3 @@ -export ThreadPinnedAlgorithm, ThreadPinnedAlgorithmParameter -Base.@kwdef struct ThreadPinnedAlgorithmParameter <: AbstractImageReconstructionParameters - threadID::Int64 - algo::AbstractImageReconstructionAlgorithm -end - -mutable struct ThreadPinnedAlgorithm <: AbstractImageReconstructionAlgorithm - params::ThreadPinnedAlgorithmParameter - recoTask::Union{Nothing,Task} - taskLock::ReentrantLock - inputChannel::Channel{Any} - outputChannel::Channel{Any} -end - -ThreadPinnedAlgorithm(params::ThreadPinnedAlgorithmParameter) = ThreadPinnedAlgorithm(params, nothing, ReentrantLock(), Channel{Any}(Inf), Channel{Any}(Inf)) - -take!(algo::ThreadPinnedAlgorithm) = take!(algo.outputChannel) -function put!(algo::ThreadPinnedAlgorithm, u) - put!(algo.inputChannel, u) - lock(algo.taskLock) - try - if isnothing(algo.recoTask) || istaskdone(algo.recoTask) - algo.recoTask = @tspawnat algo.params.threadID pinnedRecoTask(algo) - end - finally - unlock(algo.taskLock) - end -end -function pinnedRecoTask(algo::ThreadPinnedAlgorithm) - while isready(algo.inputChannel) - result = nothing - try - put!(algo.params.algo, take!(algo.inputChannel)) - result = take!(algo.params.algo) - catch e - result = e - end - put!(algo.outputChannel, result) - end -end -# TODO general async task, has to preserve order (cant just spawn task for each put) -# TODO Timeout task with timeout options for put and take -# TODO maybe can be cancelled? - export AbstractMultiThreadedProcessing abstract type AbstractMultiThreadedProcessing <: AbstractImageReconstructionAlgorithm end diff --git a/src/MiscAlgorithms/ThreadPinnedAlgorithm.jl b/src/MiscAlgorithms/ThreadPinnedAlgorithm.jl new file mode 100644 index 0000000..cd18526 --- /dev/null +++ b/src/MiscAlgorithms/ThreadPinnedAlgorithm.jl @@ -0,0 +1,43 @@ +export ThreadPinnedAlgorithm, ThreadPinnedAlgorithmParameter +Base.@kwdef struct ThreadPinnedAlgorithmParameter <: AbstractImageReconstructionParameters + threadID::Int64 + algo::AbstractImageReconstructionAlgorithm +end + +mutable struct ThreadPinnedAlgorithm <: AbstractImageReconstructionAlgorithm + params::ThreadPinnedAlgorithmParameter + recoTask::Union{Nothing,Task} + taskLock::ReentrantLock + inputChannel::Channel{Any} + outputChannel::Channel{Any} +end + +ThreadPinnedAlgorithm(params::ThreadPinnedAlgorithmParameter) = ThreadPinnedAlgorithm(params, nothing, ReentrantLock(), Channel{Any}(Inf), Channel{Any}(Inf)) + +take!(algo::ThreadPinnedAlgorithm) = take!(algo.outputChannel) +function put!(algo::ThreadPinnedAlgorithm, u) + put!(algo.inputChannel, u) + lock(algo.taskLock) + try + if isnothing(algo.recoTask) || istaskdone(algo.recoTask) + algo.recoTask = @tspawnat algo.params.threadID pinnedRecoTask(algo) + end + finally + unlock(algo.taskLock) + end +end +function pinnedRecoTask(algo::ThreadPinnedAlgorithm) + while isready(algo.inputChannel) + result = nothing + try + put!(algo.params.algo, take!(algo.inputChannel)) + result = take!(algo.params.algo) + catch e + result = e + end + put!(algo.outputChannel, result) + end +end +# TODO general async task, has to preserve order (cant just spawn task for each put) +# TODO Timeout task with timeout options for put and take +# TODO maybe can be cancelled? \ No newline at end of file