Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add algorithm locking for thread-safety #13

Merged
merged 5 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[![Build Status](https://github.com/JuliaImageRecon/AbstractImageReconstruction.jl/actions/workflows/CI.yml/badge.svg?branch=main)](https://github.com/JuliaImageRecon/AbstractImageReconstruction.jl/actions/workflows/CI.yml?query=branch%3Amain)

[![](https://img.shields.io/badge/docs-latest-blue.svg)](https://JuliaImageRecon.github.io/AbstractImageReconstruction.jl/latest)
[![](https://img.shields.io/badge/docs-latest-blue.svg)](https://JuliaImageRecon.github.io/AbstractImageReconstruction.jl)


This package contains an interface and type hierarchy for image reconstruction algorithms and their parameters, together with associated utility tools.
Expand Down
1 change: 1 addition & 0 deletions docs/make.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ makedocs(
"Serialization" => "generated/howto/serialization.md",
"Caching" => "generated/howto/caching.md",
"Observables" => "generated/howto/observables.md",
"Multi-Threading" => "generated/howto/multi_threading.md",
],
"API Reference" => "API/api.md",
#"Regularization Terms" => "API/regularization.md"],
Expand Down
4 changes: 4 additions & 0 deletions docs/src/API/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ AbstractImageReconstruction.AbstractImageReconstructionAlgorithm
AbstractImageReconstruction.reconstruct
Base.put!(::AbstractImageReconstructionAlgorithm, ::Any)
Base.take!(::AbstractImageReconstructionAlgorithm)
Base.lock(::AbstractImageReconstructionAlgorithm)
Base.unlock(::AbstractImageReconstructionAlgorithm)
Base.isready(::AbstractImageReconstructionAlgorithm)
Base.wait(::AbstractImageReconstructionAlgorithm)
AbstractImageReconstruction.AbstractImageReconstructionParameters
AbstractImageReconstruction.process
AbstractImageReconstruction.parameter
Expand Down
14 changes: 11 additions & 3 deletions docs/src/literate/example/2_direct.jl
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,20 @@ end
# And they implement a method to retrieve the used parameters:
AbstractImageReconstruction.parameter(algo::DirectRadonAlgorithm) = algo.parameter

# And implement the `put!` and `take!` functions, mimicking the behavior of a FIFO channel:
# Algorithms are assumed to be stateful. To ensure thread safety, we need to implement the `lock` and `unlock` functions. We will use the `output` channel as a lock:
Base.lock(algo::DirectRadonAlgorithm) = lock(algo.output)
Base.unlock(algo::DirectRadonAlgorithm) = unlock(algo.output)

# And implement the `put!` and `take!` functions, mimicking the behavior of a FIFO channel for reconstructions:
Base.take!(algo::DirectRadonAlgorithm) = Base.take!(algo.output)
function Base.put!(algo::DirectRadonAlgorithm, data::AbstractArray{T, 4}) where {T}
lock(algo.output) do
function Base.put!(algo::DirectRadonAlgorithm, data::AbstractArray{T, 4}) where {T}
lock(algo) do
put!(algo.output, process(algo, algo.parameter, data))
end
end

# The way the behaviour is implemented here, the algorithm does not buffer any inputs and instead blocks until the currenct reconstruction is done. Outputs are stored until they are retrieved.

# With `wait` and `isready` we can check if the algorithm is currently processing data or if it is ready to accept new inputs:
Base.wait(algo::DirectRadonAlgorithm) = wait(algo.output)
Base.isready(algo::DirectRadonAlgorithm) = isready(algo.output)
8 changes: 6 additions & 2 deletions docs/src/literate/example/4_iterative.jl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ function AbstractImageReconstruction.process(algo::IterativeRadonAlgorithm, para
end

# Note that initially the operator is `nothing` and the processing step would fail as it stands. To "fix" this we define a `process` method for the algorithm instance which creates the operator and stores it in the algorithm:
function AbstractImageReconstruction.process(algo::IterativeRadonAlgorithm, params::IterativeRadonReconstructionParameters, ::Nothing, data::AbstractArray{T, 4}) where {T}
function AbstractImageReconstruction.process(algo::IterativeRadonAlgorithm, params::AbstractIterativeRadonReconstructionParameters, ::Nothing, data::AbstractArray{T, 4}) where {T}
op = RadonOp(T; shape = params.shape, angles = params.angles)
algo.op = op
return process(AbstractIterativeRadonAlgorithm, params, op, data)
Expand All @@ -73,11 +73,15 @@ end
# Our algorithm is not type stable. To fix this, we would need to know the element type of the sinograms during construction. Which is possible with a different parameterization of the algorithm. We will not do this here.
# Often times the performance impact of this is negligible as the critical sections are in the preprocessing or the iterative solver, especially since we still dispatch on the operator.

# To finish up the implementation we need to implement the `put!`, `take!` and `parameters` functions:
# To finish up the implementation we need to implement the remaining runtime related functions:
Base.take!(algo::IterativeRadonAlgorithm) = Base.take!(algo.output)
function Base.put!(algo::IterativeRadonAlgorithm, data::AbstractArray{T, 4}) where {T}
lock(algo.output) do
put!(algo.output, process(algo, algo.parameter, data))
end
end
Base.lock(algo::IterativeRadonAlgorithm) = lock(algo.output)
Base.unlock(algo::IterativeRadonAlgorithm) = unlock(algo.output)
Base.isready(algo::IterativeRadonAlgorithm) = isready(algo.output)
Base.wait(algo::IterativeRadonAlgorithm) = wait(algo.output)
AbstractImageReconstruction.parameter(algo::IterativeRadonAlgorithm) = algo.parameter
3 changes: 2 additions & 1 deletion docs/src/literate/example/example_include_data.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ angles, shape, sinograms, images = isDataDefined ? (angles, shape, sinograms, im
sinograms[:, :, :, i] = Array(RadonKA.radon(images[:, :, :, i], angles))
end
return angles, shape, sinograms, images
end;
end
nothing
74 changes: 74 additions & 0 deletions docs/src/literate/howto/multi_threading.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
include("../../literate/example/example_include_all.jl") #hide

# # Multi-Threading
# `AbstractImageReconstruction` assumes that algorithms are stateful. This is reflected in the FIFO behaviour and the locking interface of algorithms.
# The motivation behind this choice is that the nature of computations within an algorithms heavily impact if multi-threading is beneficial or not.
# For example, consider a GPU-accelerated reconstruction. There it might be faster to sequentially process all images on the GPU instead of processing them in parallel. Or consider, the preprocessing step of the Radon example where we average our data. If we were to extend our algorithm to read sinograms from a file, it might be inefficient to partially read and average frames from the file in parallel.
# Instead it would be more efficient to read the required file in one go and then average the frames in parallel.
# Therefore, the actual runtime behaviour is intended to be an implementation detail of an algorithm which is to be abstracted behind `reconstruct`.

# In the following we will explore the results of this design decision. If we consider a n algorithm such as:
plan = RecoPlan(IterativeRadonAlgorithm, parameter = RecoPlan(IterativeRadonParameters,
pre = RecoPlan(RadonPreprocessingParameters, frames = collect(1:5)),
reco = RecoPlan(IterativeRadonReconstructionParameters, shape = size(images)[1:3], angles = angles,
iterations = 20, reg = [L2Regularization(0.001), PositiveRegularization()], solver = CGNR)
))
algo = build(plan)

# which acts on one frame at a time, we could in theory do:
# ```julia
# Threads.@threads for i = 1:size(sinograms, 4)
# res = reconstruct(algo, sinograms[:, :, :, i:i])
# # Store res
# end
# ```
# Due to the locking interface of the algorithm, this will not actually run in parallel. Instead the algorithm will be locked for each iteration and tasks will wait for the previous reconstruction to finish.

# If a user wants to explicitly use multi-threading, we could the plan to create a new algorithm for each task:
# ```julia
# Threads.@threads for i = 1:size(sinograms, 4)
# algo = build(plan)
# res = reconstruct(algo, sinograms[:, :, :, i:i])
# # Store res
# end
# ```
# This way each task has its own algorithm and can run in parallel. As mentioned before, this parallelization might not be the most efficient parallel reconstruction, both in terms of memory and runtime.

# To further improve the performance of the reconstruction, we could implement specific multi-threading `process`-ing steps for our algorithms. As an example, we will implement parallel processing for the iterative solver:
Base.@kwdef struct ThreadedIterativeReconstructionParameters{S <: AbstractLinearSolver, R <: AbstractRegularization, N} <: AbstractIterativeRadonReconstructionParameters
solver::Type{S}
iterations::Int64
reg::Vector{R}
shape::NTuple{N, Int64}
angles::Vector{Float64}
end
# Our parameters are identical to the iterative reconstruction parameters from the iterative example. We only differ in the type of the parameters. This allows us to dispatch on the type of the parameters and implement a different `process` method for the threaded version:
function AbstractImageReconstruction.process(::Type{<:AbstractIterativeRadonAlgorithm}, params::ThreadedIterativeReconstructionParameters, op, data::AbstractArray{T, 4}) where {T}

result = similar(data, params.shape..., size(data, 4))

Threads.@threads for i = 1:size(data, 4)
solver = createLinearSolver(params.solver, op; iterations = params.iterations, reg = params.reg)
result[:, :, :, i] = solve!(solver, vec(data[:, :, :, i]))
end

return result
end

# While the Radon operator is thread-safe, the normal operator constructed by the solver is not. Therefore, we can reuse the Radon operator but still have to construct new solvers for each task.

# Our multi-threaded reconstruction can be constructed and used just like the single-threaded one::
plan.parameter.pre.frames = collect(1:size(sinograms, 4))
plan.parameter.reco = RecoPlan(ThreadedIterativeReconstructionParameters, shape = size(images)[1:3], angles = angles,
iterations = 20, reg = [L2Regularization(0.001), PositiveRegularization()], solver = CGNR)

algo = build(plan)
imag_iter = reconstruct(algo, sinograms)
fig = Figure()
for i = 1:5
plot_image(fig[i,1], reverse(images[:, :, 24, i]))
plot_image(fig[i,2], sinograms[:, :, 24, i])
plot_image(fig[i,3], reverse(imag_iter[:, :, 24, i]))
end
resize_to_layout!(fig)
fig
2 changes: 1 addition & 1 deletion src/AbstractImageReconstruction.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ using Observables
using Scratch
using LRUCache

import Base: put!, take!, fieldtypes, fieldtype, ismissing, propertynames, parent, hash, wait, isready
import Base: put!, take!, fieldtypes, fieldtype, ismissing, propertynames, parent, hash, wait, isready, lock, unlock

include("AlgorithmInterface.jl")
include("StructTransforms.jl")
Expand Down
47 changes: 44 additions & 3 deletions src/AlgorithmInterface.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,57 @@ put!(algo::AbstractImageReconstructionAlgorithm, inputs...) = error("$(typeof(al
Remove and return a stored result from the algorithm `algo`. Blocks until a result is available.
"""
take!(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement take!")
"""
isready(algo::AbstractImageReconstructionAlgorithm)

Determine if the algorithm `algo` has a result available.
"""
isready(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement isready")
"""
wait(algo::AbstractImageReconstructionAlgorithm)

Wait for a result to be available from the specified `algo`.
"""
wait(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement wait")
"""
lock(algo::AbstractImageReconstructionAlgorithm)

Acquire a lock on the algorithm `algo`. If the lock is already acquired, wait until it is released.

Each `lock` must be matched with a `unlock`.
"""
lock(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement lock")
"""
unlock(algo::AbstractImageReconstructionAlgorithm)

Release a lock on the algorithm `algo`.
"""
unlock(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement unlock")
"""
lock(fn, algo::AbstractImageReconstructionAlgorithm)

Acquire the `lock` on `algo`, execute `fn` and release the `lock` afterwards.
"""
function lock(fn, algo::AbstractImageReconstructionAlgorithm)
lock(algo)
try
fn()
finally
unlock(algo)
end
end

export reconstruct
"""
reconstruct(algo::T, u) where {T<:AbstractImageReconstructionAlgorithm}

Reconstruct an image from input `u` using algorithm `algo`.
Reconstruct an image from input `u` using algorithm `algo`. The `àlgo` will be `lock`ed until the result is available or an error occurs.
"""
function reconstruct(algo::T, u) where {T<:AbstractImageReconstructionAlgorithm}
put!(algo, u)
return take!(algo)
lock(algo) do
put!(algo, u)
return take!(algo)
end
end

export process
Expand Down
2 changes: 1 addition & 1 deletion src/MiscAlgorithms/MiscAlgorithms.jl
Original file line number Diff line number Diff line change
@@ -1 +1 @@
include("RuntimeAlgorithms.jl")
include("ThreadPinnedAlgorithm.jl")
Original file line number Diff line number Diff line change
@@ -1,47 +1,3 @@
export ThreadPinnedAlgorithm, ThreadPinnedAlgorithmParameter
Base.@kwdef struct ThreadPinnedAlgorithmParameter <: AbstractImageReconstructionParameters
threadID::Int64
algo::AbstractImageReconstructionAlgorithm
end

mutable struct ThreadPinnedAlgorithm <: AbstractImageReconstructionAlgorithm
params::ThreadPinnedAlgorithmParameter
recoTask::Union{Nothing,Task}
taskLock::ReentrantLock
inputChannel::Channel{Any}
outputChannel::Channel{Any}
end

ThreadPinnedAlgorithm(params::ThreadPinnedAlgorithmParameter) = ThreadPinnedAlgorithm(params, nothing, ReentrantLock(), Channel{Any}(Inf), Channel{Any}(Inf))

take!(algo::ThreadPinnedAlgorithm) = take!(algo.outputChannel)
function put!(algo::ThreadPinnedAlgorithm, u)
put!(algo.inputChannel, u)
lock(algo.taskLock)
try
if isnothing(algo.recoTask) || istaskdone(algo.recoTask)
algo.recoTask = @tspawnat algo.params.threadID pinnedRecoTask(algo)
end
finally
unlock(algo.taskLock)
end
end
function pinnedRecoTask(algo::ThreadPinnedAlgorithm)
while isready(algo.inputChannel)
result = nothing
try
put!(algo.params.algo, take!(algo.inputChannel))
result = take!(algo.params.algo)
catch e
result = e
end
put!(algo.outputChannel, result)
end
end
# TODO general async task, has to preserve order (cant just spawn task for each put)
# TODO Timeout task with timeout options for put and take
# TODO maybe can be cancelled?

export AbstractMultiThreadedProcessing
abstract type AbstractMultiThreadedProcessing <: AbstractImageReconstructionAlgorithm end

Expand Down
43 changes: 43 additions & 0 deletions src/MiscAlgorithms/ThreadPinnedAlgorithm.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
export ThreadPinnedAlgorithm, ThreadPinnedAlgorithmParameter
Base.@kwdef struct ThreadPinnedAlgorithmParameter <: AbstractImageReconstructionParameters
threadID::Int64
algo::AbstractImageReconstructionAlgorithm
end

mutable struct ThreadPinnedAlgorithm <: AbstractImageReconstructionAlgorithm
params::ThreadPinnedAlgorithmParameter
recoTask::Union{Nothing,Task}
taskLock::ReentrantLock
inputChannel::Channel{Any}
outputChannel::Channel{Any}
end

ThreadPinnedAlgorithm(params::ThreadPinnedAlgorithmParameter) = ThreadPinnedAlgorithm(params, nothing, ReentrantLock(), Channel{Any}(Inf), Channel{Any}(Inf))

take!(algo::ThreadPinnedAlgorithm) = take!(algo.outputChannel)
function put!(algo::ThreadPinnedAlgorithm, u)
put!(algo.inputChannel, u)
lock(algo.taskLock)
try
if isnothing(algo.recoTask) || istaskdone(algo.recoTask)
algo.recoTask = @tspawnat algo.params.threadID pinnedRecoTask(algo)
end
finally
unlock(algo.taskLock)
end
end
function pinnedRecoTask(algo::ThreadPinnedAlgorithm)
while isready(algo.inputChannel)
result = nothing
try
put!(algo.params.algo, take!(algo.inputChannel))
result = take!(algo.params.algo)
catch e
result = e
end
put!(algo.outputChannel, result)
end
end
# TODO general async task, has to preserve order (cant just spawn task for each put)
# TODO Timeout task with timeout options for put and take
# TODO maybe can be cancelled?
Loading