
Commit

fix docstrings
jishnub committed Aug 19, 2020
1 parent 75cb555 commit c4bdbaf
Showing 5 changed files with 70 additions and 56 deletions.
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,7 +1,7 @@
name = "ParallelUtilities"
uuid = "fad6cfc8-4f83-11e9-06cc-151124046ad0"
authors = ["Jishnu Bhattacharya <[email protected]>"]
version = "0.7.4"
version = "0.7.5"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
34 changes: 25 additions & 9 deletions README.md
@@ -41,6 +41,22 @@ julia> pmapsum(x -> ones(2).*myid(), 1:nworkers())
5.0
```

# Performance

The `pmapreduce`-related functions are expected to be more performant than `@distributed` for loops. As an example, running the following on a Slurm cluster using 2 nodes with 28 cores each leads to

```julia
julia> @time @distributed (+) for i=1:nworkers()
ones(10_000, 1_000)
end;
22.355047 seconds (7.05 M allocations: 8.451 GiB, 6.73% gc time)

julia> @time pmapsum(x -> ones(10_000, 1_000), 1:nworkers());
2.672838 seconds (52.83 k allocations: 78.295 MiB, 0.53% gc time)
```

The difference becomes more apparent as larger amounts of data are communicated across workers. This is because the `ParallelUtilities.pmapreduce*` functions perform local reductions on each node before communicating across nodes.
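
The following is a minimal sketch of this idea using bare `Distributed` primitives. It is illustrative only, not the package's actual implementation, and it assumes workers have already been added across hosts with `addprocs`; the name `nodelocal_sum` is hypothetical.

```julia
using Distributed

# Sum f(x) over xs with one partial result per host, so that only one
# value per host crosses the network in the final reduction.
function nodelocal_sum(f, xs)
    # group workers by the host they run on
    hosts = Dict{String,Vector{Int}}()
    for p in workers()
        h = remotecall_fetch(Libc.gethostname, p)
        push!(get!(hosts, h, Int[]), p)
    end
    hostworkers = collect(values(hosts))
    # one contiguous chunk of the work per host
    chunksize = cld(length(xs), length(hostworkers))
    chunks = collect(Iterators.partition(xs, chunksize))
    # each host reduces its own chunk locally (on a single worker here, for brevity)
    hostsums = asyncmap(zip(hostworkers, chunks)) do (ps, chunk)
        remotecall_fetch(ys -> sum(f, ys), first(ps), chunk)
    end
    # cheap final reduction on the caller
    return sum(hostsums)
end
```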

# Usage

The package splits up a collection of ranges into subparts of roughly equal length, so that all the cores are approximately equally loaded. This is best understood using an example: let's say that we have a function `f` that is defined as
@@ -82,7 +98,7 @@ The first six processors receive 4 tuples of parameters each and the final four

The package provides versions of `pmap` with an optional reduction. These differ from the one provided by `Distributed` in a few key aspects: firstly, the iterator product of the arguments is what is passed to the function, not the arguments elementwise, so the i-th task will be `Iterators.product(args...)[i]` and not `[x[i] for x in args]`. Specifically, the second set of parameters in the example above will be `(2,2,3)` and not `(2,3,4)`.

-Secondly, the iterator is passed to the function in batches and not elementwise, and it is left to the function to iterate over the collection. Thirdly, the tasks are passed on to processors sorted by rank, so the first task is passed to the first processor and the last to the last active worker. The tasks are also approximately evenly distributed across processors. The function `pmapbatch_elementwise` is also exported that passes the elements to the function one-by-one as unwrapped tuples. This produces the same result as `pmap` where each worker is assigned batches of approximately equal sizes taken from the iterator product.
+Secondly, the iterator is passed to the function in batches and not elementwise, and it is left to the function to iterate over the collection. Thirdly, the tasks are passed on to processors sorted by rank, so the first task is passed to the first processor and the last to the last active worker. The tasks are also approximately evenly distributed across processors. The exported function `pmapbatch_elementwise` passes the elements to the function one-by-one as splatted tuples. This produces the same result as `pmap` for a single range as the argument.
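
For instance, a hypothetical session (assuming workers have been added) might look like

```julia
julia> pmapbatch_elementwise(x -> x^2, 1:4)
4-element Array{Int64,1}:
  1
  4
  9
 16

julia> pmapbatch_elementwise((x, y) -> x + y, (1:2, 4:5)) # f receives splatted tuples
4-element Array{Int64,1}:
 5
 6
 6
 7
```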

### pmapbatch and pmapbatch_elementwise

Expand All @@ -106,7 +122,7 @@ julia> Tuple(p)

### pmapsum and pmapreduce

-Often a parallel execution is followed by a reduction (e.g. a sum over the results). A reduction may be commutative (in which case the order of results does not matter), or non-commutative (in which the order does matter). There are two functions that are exported that carry out these tasks: `pmapreduce_commutative` and `pmapreduce`, where the former does not preserve ordering and the latter does. For convenience, the package also provides the function `pmapsum` that chooses `sum` as the reduction operator. The map-reduce operation is similar in many ways to the distributed `for` loop provided by julia, but the main difference is that the reduction operation is not binary for the functions in this package (e.g. we need `sum` and not `(+)` to add the results). There is also the difference as above that the function gets the parameters in batches, with functions having the suffix `_elementwise` taking on parameters individually as unwrapped tuples as above. The function `pmapreduce` does not take on parameters elementwise at this point, although this might be implemented in the future.
+Often a parallel execution is followed by a reduction (e.g. a sum over the results). A reduction may be commutative (in which case the order of results does not matter), or non-commutative (in which the order does matter). There are two functions that are exported that carry out these tasks: `pmapreduce_commutative` and `pmapreduce`, where the former does not preserve ordering and the latter does. For convenience, the package also provides the function `pmapsum` that chooses `sum` as the reduction operator. The map-reduce operation is similar in many ways to the distributed `for` loop provided by julia, but the main difference is that the reduction operation is not binary for the functions in this package (e.g. we need `sum` and not `(+)` to add the results). There is also the difference as above that the function gets the parameters in batches, with functions having the suffix `_elementwise` taking on parameters individually as splatted `Tuple`s. The function `pmapreduce` does not take on parameters elementwise at this point, although this might be implemented in the future.
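
For instance, a binary reducer such as `hcat` may be supplied as `Base.splat(hcat)`; a hypothetical session with two workers (mirroring the examples below) would be

```julia
julia> pmapreduce(x -> ones(2).*myid(), Base.splat(hcat), 1:nworkers())
2×2 Array{Float64,2}:
 2.0  3.0
 2.0  3.0
```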

As an example, to sum up a list of numbers in parallel we may call
```julia
@@ -137,7 +153,7 @@ julia> workers()
2
3

-# The signature is pmapreduce(fmap,freduce,iterable)
+# The signature is pmapreduce(fmap, freduce, range_or_tuple_of_ranges)
julia> pmapreduce(x -> ones(2).*myid(), x -> hcat(x...), 1:nworkers())
2×2 Array{Float64,2}:
2.0 3.0
Expand All @@ -146,7 +162,7 @@ julia> pmapreduce(x -> ones(2).*myid(), x -> hcat(x...), 1:nworkers())

The function `pmapreduce` produces the same result as `pmapreduce_commutative` if the reduction operator is commutative (i.e. the order of results received from the child workers does not matter).

-The function `pmapsum` sets the reduction operator to be a sum.
+The function `pmapsum` sets the reduction function to `sum`.

```julia
julia> sum(workers())
Expand All @@ -162,13 +178,13 @@ julia> pmapsum(x -> ones(2).*myid(), 1:nworkers())
It is possible to specify the return types of the map and reduce operations in these functions using the following variants:

```julia
-# Signature is pmapreduce(fmap, Tmap, freduce, Treduce, iterators)
+# Signature is pmapreduce(fmap, Tmap, freduce, Treduce, range_or_tuple_of_ranges)
julia> pmapreduce(x -> ones(2).*myid(), Vector{Float64}, x -> hcat(x...), Matrix{Float64}, 1:nworkers())
2×2 Array{Float64,2}:
2.0 3.0
2.0 3.0

-# Signature is pmapsum(fmap, Tmap, iterators)
+# Signature is pmapsum(fmap, Tmap, range_or_tuple_of_ranges)
julia> pmapsum(x -> ones(2).*myid(), Vector{Float64}, 1:nworkers())
2-element Array{Float64,1}:
5.0
@@ -244,7 +260,7 @@ julia> collect(ps)
where the object loops over values of `(x,y,z)`, and the values are sorted in reverse lexicographic order (the last index increases the slowest while the first index increases the fastest). The ranges roll over as expected. The tasks are evenly distributed, with the remainders being split among the first few processors. In this example, the first six processors receive 4 tasks each and the last four receive 3 each. We can see this by evaluating the length of the `ProductSplit` iterator on each processor

```julia
-julia> Tuple(length(ProductSplit((xrange,yrange,zrange),10,i)) for i=1:10)
+julia> Tuple(length(ProductSplit((xrange,yrange,zrange), 10, i)) for i=1:10)
(4, 4, 4, 4, 4, 4, 3, 3, 3, 3)
```
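
A smaller hypothetical split makes the contiguous reverse-lexicographic assignment explicit:

```julia
julia> collect(ProductSplit((1:2, 1:2), 2, 1))
2-element Array{Tuple{Int64,Int64},1}:
 (1, 1)
 (2, 1)

julia> collect(ProductSplit((1:2, 1:2), 2, 2))
2-element Array{Tuple{Int64,Int64},1}:
 (1, 2)
 (2, 2)
```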

Expand All @@ -268,11 +284,11 @@ julia> xrange_long,yrange_long,zrange_long = 1:3000,1:3000,1:3000

julia> params_long = (xrange_long,yrange_long,zrange_long);

-julia> ps_long = ProductSplit(params_long,10,4)
+julia> ps_long = ProductSplit(params_long, 10, 4)
ProductSplit{Tuple{Int64,Int64,Int64},3,UnitRange{Int64}}((1:3000, 1:3000, 1:3000), (0, 3000, 9000000), 10, 4, 8100000001, 10800000000)

# Evaluate length using random ranges to avoid compiler optimizations
-julia> @btime length(p) setup=(n=rand(3000:4000);p=ProductSplit((1:n,1:n,1:n),200,2));
+julia> @btime length(p) setup = (n = rand(3000:4000); p = ProductSplit((1:n,1:n,1:n), 200, 2));
2.674 ns (0 allocations: 0 bytes)

julia> @btime $ps_long[1000000] # also fast, does not iterate
29 changes: 13 additions & 16 deletions src/mapreduce.jl
@@ -305,10 +305,9 @@ or iterate over one to access individual tuples of integers.
The reduction function `freduce` is expected to accept a collection of mapped values.
Note that this is different from the standard `mapreduce` operation in julia that
-expects a binary reduction operator. For example, `fmap` should be
-`sum` and not `+`. In case a binary operator `op` is to be passed, one may wrap it in
-an anonymous function as `x->reduce(op,x)`, or as `x->op(x...)` in case the operator
-accepts multiple arguments that are processed in pairs.
+expects a binary reduction operator. For example, `freduce` should be
+`sum` and not `+`. In case a binary operator `op` is to be used in the reduction, one may pass it
+as `Base.splat(op)` or wrap it in an anonymous function as `x -> op(x...)`.
Arguments `mapargs` and keyword arguments `mapkwargs` — if provided — are
passed on to the mapping function `fmap`.
@@ -368,10 +367,9 @@ results obtained may be incorrect otherwise.
The reduction function `freduce` is expected to accept a collection of mapped values.
Note that this is different from the standard `mapreduce` operation in julia that
-expects a binary reduction operator. For example, `fmap` should be
-`sum` and not `+`. In case a binary operator `op` is to be passed, one may wrap it in
-an anonymous function as `x->reduce(op,x)`, or as `x->op(x...)` in case the operator
-accepts multiple arguments that are processed in pairs.
+expects a binary reduction operator. For example, `freduce` should be
+`sum` and not `+`. In case a binary operator `op` is to be used in the reduction, one may pass it
+as `Base.splat(op)` or wrap it in an anonymous function as `x -> op(x...)`.
Arguments `mapargs` and keyword arguments `mapkwargs` — if provided — are
passed on to the mapping function `fmap`.
@@ -513,9 +511,8 @@ or iterate over one to access individual tuples of integers.
The reduction function `freduce` is expected to accept a collection of mapped values.
Note that this is different from the standard `mapreduce` operation in julia that
expects a binary reduction operator. For example, `fmap` should be
-`sum` and not `+`. In case a binary operator `op` is to be passed, one may wrap it in
-an anonymous function as `x->reduce(op,x)`, or as `x->op(x...)` in case the operator
-accepts multiple arguments that are processed in pairs.
+`sum` and not `+`. In case a binary operator `op` is to be used in the reduction, one may pass it
+as `Base.splat(op)` or wrap it in an anonymous function as `x -> op(x...)`.
Arguments `mapargs` and keyword arguments `mapkwargs` — if provided — are
passed on to the mapping function `fmap`.
@@ -573,10 +570,9 @@ part of the entire parameter space sequentially. The argument
`iterators` needs to be a strictly-increasing range,
or a tuple of such ranges. The outer product of these ranges forms the
entire range of parameters that is processed in batches on
-the workers.
-Arguments `mapargs` and keyword arguments `mapkwargs` — if provided — are
+the workers. Arguments `mapargs` and keyword arguments `mapkwargs` — if provided — are
passed on to the function `f`.
Additionally, the number of workers to be used may be specified using the
keyword argument `num_workers`. In this case the first `num_workers` available
workers are used in the evaluation.
Expand All @@ -597,7 +593,7 @@ function pmapbatch(f::Function, iterators::Tuple, args...;
end

function pmapbatch(f::Function, ::Type{T}, iterators::Tuple, args...;
-    num_workers = nworkersactive(iterators),kwargs...) where {T}
+    num_workers = nworkersactive(iterators), kwargs...) where {T}

procs_used = workersactive(iterators)
if num_workers < length(procs_used)
@@ -634,7 +630,8 @@ part of the entire parameter space sequentially. The argument
`iterators` needs to be a strictly-increasing range of integers,
or a tuple of such ranges. The outer product of these ranges forms the
entire range of parameters that is processed elementwise by the function `f`.
-Given `n` ranges in `iterators`, the function `f` will receive `n` integers
+The individual tuples are splatted and passed as arguments to `f`.
+Given `n` ranges in `iterators`, the function `f` will receive `n` values
at a time.
Arguments `mapargs` and keyword arguments `mapkwargs` — if provided — are
57 changes: 29 additions & 28 deletions src/productsplit.jl
@@ -1,3 +1,8 @@
"""
ParallelUtilities.AbstractConstrainedProduct{T,N}
Supertype of [`ParallelUtilities.ProductSplit`](@ref) and [`ParallelUtilities.ProductSection`](@ref).
"""
abstract type AbstractConstrainedProduct{T,N} end
Base.eltype(::AbstractConstrainedProduct{T}) where {T} = T
Base.ndims(::AbstractConstrainedProduct{<:Any,N}) where {N} = N
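# Illustration (not part of the diff): with these definitions a hypothetical
# ps = ProductSplit((1:5, 2:4, 1:3), 7, 1) satisfies
# eltype(ps) == Tuple{Int64,Int64,Int64} and ndims(ps) == 3.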
@@ -132,7 +137,7 @@ ProductSplit(::Tuple{},::Integer,::Integer) = throw(ArgumentError("Need at least
"""
ProductSection(iterators::Tuple{Vararg{AbstractRange}}, inds::AbstractUnitRange)
-Construct a `ProductSection` iterator that represents a view of the outer product
+Construct a `ProductSection` iterator that represents a 1D view of the outer product
of the ranges provided in `iterators`, with the range of indices in the view being
specified by `inds`.
Expand All @@ -147,7 +152,7 @@ julia> collect(p)
(1, 6)
(2, 6)
-julia> collect(p) == collect(Iterators.product(1:3,4:6))[5:8]
+julia> collect(p) == collect(Iterators.product(1:3, 4:6))[5:8]
true
```
"""
@@ -206,7 +211,7 @@ outer product of the iterators.
# Examples
```jldoctest
-julia> ps = ProductSplit((1:5,2:4,1:3),7,1);
+julia> ps = ProductSplit((1:5, 2:4, 1:3), 7, 1);
julia> ParallelUtilities.childindex(ps, 6)
(1, 2, 1)
@@ -241,9 +246,9 @@ given an index of an `AbstractConstrainedProduct`.
# Examples
```jldoctest
-julia> ps = ProductSplit((1:5,2:4,1:3), 7, 3);
+julia> ps = ProductSplit((1:5, 2:4, 1:3), 7, 3);

-julia> cinds = ParallelUtilities.childindexshifted(ps,3)
+julia> cinds = ParallelUtilities.childindexshifted(ps, 3)
(2, 1, 2)
julia> getindex.(ps.iterators, cinds) == ps[3]
@@ -341,12 +346,11 @@ end
ParallelUtilities.nelements(ps::AbstractConstrainedProduct; dim::Integer)
ParallelUtilities.nelements(ps::AbstractConstrainedProduct, dim::Integer)
-Compute the number of unique values in the element number `dim` of the tuples
-that are returned when `ps` is iterated over.
+Compute the number of unique values in the section of the `dim`-th range contained in `ps`.
# Examples
```jldoctest
-julia> ps = ProductSplit((1:5,2:4,1:3),7,3);
+julia> ps = ProductSplit((1:5, 2:4, 1:3), 7, 3);
julia> collect(ps)
7-element Array{Tuple{Int64,Int64,Int64},1}:
Expand All @@ -358,14 +362,14 @@ julia> collect(ps)
(5, 2, 2)
(1, 3, 2)
-julia> ParallelUtilities.nelements(ps,3)
-2
+julia> ParallelUtilities.nelements(ps, 1)
+5

-julia> ParallelUtilities.nelements(ps,2)
+julia> ParallelUtilities.nelements(ps, 2)
3

-julia> ParallelUtilities.nelements(ps,1)
-5
+julia> ParallelUtilities.nelements(ps, 3)
+2
```
"""
nelements(ps::AbstractConstrainedProduct; dim::Integer) = nelements(ps,dim)
@@ -402,8 +406,7 @@ end
maximum(ps::AbstractConstrainedProduct; dim::Integer)
maximum(ps::AbstractConstrainedProduct, dim::Integer)
-Compute the maximum value of the range number `dim` that is
-contained in `ps`.
+Compute the maximum value of the section of the `dim`-th range contained in `ps`.
# Examples
```jldoctest
@@ -457,12 +460,11 @@ end
minimum(ps::AbstractConstrainedProduct; dim::Integer)
minimum(ps::AbstractConstrainedProduct, dim::Integer)
-Compute the minimum value of the range number `dim` that is
-contained in `ps`.
+Compute the minimum value of the section of the `dim`-th range contained in `ps`.
# Examples
```jldoctest
-julia> ps = ProductSplit((1:2,4:5), 2, 1);
+julia> ps = ProductSplit((1:2, 4:5), 2, 1);
julia> collect(ps)
2-element Array{Tuple{Int64,Int64},1}:
@@ -512,12 +514,11 @@ end
extrema(ps::AbstractConstrainedProduct; dim::Integer)
extrema(ps::AbstractConstrainedProduct, dim::Integer)
-Compute the minimum and maximum of the range number `dim` that is
-contained in `ps`.
+Compute the `extrema` of the section of the `dim`-th range contained in `ps`.
# Examples
```jldoctest
-julia> ps = ProductSplit((1:2,4:5), 2, 1);
+julia> ps = ProductSplit((1:2, 4:5), 2, 1);
julia> collect(ps)
2-element Array{Tuple{Int64,Int64},1}:
@@ -574,11 +575,11 @@ end
"""
extremadims(ps::AbstractConstrainedProduct)
-Compute the extrema of all the ranges contained in `ps`.
+Compute the extrema of the sections of all the ranges contained in `ps`.
# Examples
```jldoctest
-julia> ps = ProductSplit((1:2,4:5), 2, 1);
+julia> ps = ProductSplit((1:2, 4:5), 2, 1);
julia> collect(ps)
2-element Array{Tuple{Int64,Int64},1}:
Expand All @@ -601,9 +602,9 @@ _extremadims(::AbstractConstrainedProduct, ::Integer, ::Tuple{}) = ()
Return the reverse-lexicographic extrema of values taken from
ranges contained in `ps`, where the pairs of ranges are constructed
-by concatenating each dimension with the last one.
+by concatenating the ranges along each dimension with the last one.

-For two ranges this simply returns ([first(ps)],[last(ps)]).
+For two ranges this simply returns `([first(ps)], [last(ps)])`.
# Examples
```jldoctest
@@ -791,7 +792,7 @@ the ranges in `iterators`.
# Examples
```jldoctest
-julia> iters = (1:10,4:6,1:4);
+julia> iters = (1:10, 4:6, 1:4);
julia> ps = ProductSplit(iters, 5, 2);
@@ -833,7 +834,7 @@ is not found.
# Examples
```jldoctest
-julia> ps = ProductSplit((1:3,4:5:20), 3, 2);
+julia> ps = ProductSplit((1:3, 4:5:20), 3, 2);
julia> collect(ps)
4-element Array{Tuple{Int64,Int64},1}:
@@ -902,7 +903,7 @@ resulting `ProductSection` will be the same as in `ps`.
# Examples
```jldoctest
-julia> ps = ProductSplit((1:5,2:4,1:3),7,3);
+julia> ps = ProductSplit((1:5, 2:4, 1:3), 7, 3);
julia> collect(ps)
7-element Array{Tuple{Int64,Int64,Int64},1}:
4 changes: 2 additions & 2 deletions src/utils.jl
Expand Up @@ -24,10 +24,10 @@ workers are chosen.
gethostnames(procs = workers())
Return the hostname of each worker in `procs`. This is obtained by evaluating
-`Libc.gethostname()` on each worker.
+`Libc.gethostname()` on each worker asynchronously.
"""
function gethostnames(procs = workers())
-    hostnames = Vector{String}(undef,length(procs))
+    hostnames = Vector{String}(undef, length(procs))
@sync for (ind,p) in enumerate(procs)
@async hostnames[ind] = @fetchfrom p Libc.gethostname()
end

2 comments on commit c4bdbaf

@jishnub
Owner Author


@JuliaRegistrator


Registration pull request created: JuliaRegistries/General/19777

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

```
git tag -a v0.7.5 -m "<description of version>" c4bdbafe538e34ba3ff77a7836866914c3360634
git push origin v0.7.5
```
