Commit: pmapbatch with type
jishnub committed Aug 16, 2020
1 parent 3d323af commit 75cb555
Showing 9 changed files with 436 additions and 479 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
Manifest.toml
*.cov
coverage
docs/build
14 changes: 11 additions & 3 deletions .travis.yml
@@ -7,12 +7,20 @@ julia:
- 1.2
- 1
- nightly
matrix:
jobs:
allow_failures:
- julia: nightly
fast_finish: true
include:
- stage: "Documentation"
julia: 1.4
os: linux
script:
- julia --project=docs/ -e 'using Pkg; Pkg.develop(PackageSpec(path=pwd()));
Pkg.instantiate()'
- julia --project=docs/ docs/make.jl
after_success: skip
notifications:
email: false
after_success:
- julia -e 'using Pkg; Pkg.add("Coverage"); using Coverage; Coveralls.submit(process_folder())'
- julia -e 'using Pkg; Pkg.add("Coverage"); using Coverage; Codecov.submit(process_folder())'
102 changes: 31 additions & 71 deletions README.md
@@ -1,8 +1,9 @@
# ParallelUtilities.jl

[![Build Status](https://travis-ci.com/jishnub/ParallelUtilities.jl.svg?branch=master)](https://travis-ci.com/jishnub/ParallelUtilities.jl)
[![Coverage Status](https://coveralls.io/repos/github/jishnub/ParallelUtilities.jl/badge.svg?branch=master)](https://coveralls.io/github/jishnub/ParallelUtilities.jl?branch=master)
[![codecov](https://codecov.io/gh/jishnub/ParallelUtilities.jl/branch/master/graph/badge.svg)](https://codecov.io/gh/jishnub/ParallelUtilities.jl)
[![Stable](https://img.shields.io/badge/docs-stable-blue.svg)](https://jishnub.github.io/ParallelUtilities.jl/stable)
[![Dev](https://img.shields.io/badge/docs-dev-blue.svg)](https://jishnub.github.io/ParallelUtilities.jl/dev)

Parallel mapreduce and other helpful functions for HPC, meant primarily for embarrassingly parallel operations that often require splitting a list of tasks into subsections that can be processed on individual cores.

@@ -14,29 +15,6 @@ Install the package using
pkg> add ParallelUtilities
julia> using ParallelUtilities
```

# Exported functions

* `pmap`-related functions
* `pmapreduce`
* `pmapreduce_commutative`
* `pmapsum`
* `pmapreduce_elementwise`
* `pmapsum_elementwise`
* Functions to evenly split a Tuple of ranges
* `ProductSplit`
* `ntasks`
* `whichproc`
* `procrange_recast`
* `localindex`
* `whichproc_localindex`
* `extremadims`
* `extrema_commonlastdim`
* Utility functions to query the cluster
* `gethostnames`
* `nodenames`
* `nprocs_node`

# Quick start

```julia
@@ -47,17 +25,17 @@ julia> addprocs(2)

julia> @everywhere using ParallelUtilities

julia> pmapreduce(x->ones(2).*myid(),x->hcat(x...),1:nworkers())
julia> pmapreduce(x -> ones(2).*myid(), x -> hcat(x...), 1:nworkers())
2×2 Array{Float64,2}:
2.0 3.0
2.0 3.0

julia> pmapreduce_commutative(x->ones(2).*myid(),sum,1:nworkers())
julia> pmapreduce_commutative(x -> ones(2).*myid(), sum, 1:nworkers())
2-element Array{Float64,1}:
5.0
5.0

julia> pmapsum(x->ones(2).*myid(),1:nworkers())
julia> pmapsum(x -> ones(2).*myid(), 1:nworkers())
2-element Array{Float64,1}:
5.0
5.0
@@ -76,7 +54,7 @@ julia> @everywhere begin
where each parameter takes values in a range, and we would like to sample the entire parameter space. As an example, we choose the ranges to be

```julia
julia> xrange,yrange,zrange = 1:3,2:4,3:6 # ranges should be strictly increasing
julia> xrange, yrange, zrange = 1:3, 2:4, 3:6 # ranges should be strictly increasing
```

There are a total of 36 possible `(x,y,z)` combinations given these ranges. Let's say that we would like to split the evaluation of the function over 10 processors. We describe the simple way to evaluate this and then explain how this is achieved.
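
To make the division concrete, here is a sketch of the even-split arithmetic (assumed to mirror what the package does, but purely illustrative, not its internals): with 36 tasks and 10 processors, the first `rem(36, 10) = 6` ranks receive one extra task.

```julia
# Illustrative even split of 36 tasks over 10 processors (not the package's code):
# the first rem(36, 10) = 6 ranks get div(36, 10) + 1 = 4 tasks, the rest get 3
ntasks_total, np = 36, 10
counts = [div(ntasks_total, np) + (rank <= rem(ntasks_total, np)) for rank in 1:np]
# counts == [4, 4, 4, 4, 4, 4, 3, 3, 3, 3], and sum(counts) == 36
```
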
@@ -111,33 +89,16 @@ Secondly, the iterator is passed to the function in batches and not elementwise,
As an example, we demonstrate how to evaluate the function `f` for the ranges of parameters listed above:

```julia
julia> p = pmapbatch_elementwise(f,(xrange,yrange,zrange));
julia> p = pmapbatch_elementwise(f, (xrange,yrange,zrange));

julia> Tuple(p)
(6, 7, 8, 7, 8, 9, 8, 9, 10, 7, 8, 9, 8, 9, 10, 9, 10, 11, 8, 9, 10, 9, 10, 11, 10, 11, 12, 9, 10, 11, 10, 11, 12, 11, 12, 13)

# Check for correctness
julia> p == map(f,vec(collect(Iterators.product(xrange,yrange,zrange))))
true

# pmapbatch_elementwise produces the same result as pmap, although the internals are different
julia> pmapbatch_elementwise(x->x^2,1:3)
3-element Array{Int64,1}:
1
4
9

julia> pmap(x->x^2,1:3)
3-element Array{Int64,1}:
1
4
9
```

There is also a function `pmapbatch` that deals with batches of parameters that are passed to each processor; `pmapbatch_elementwise` calls this function under the hood to process the parameters one by one. We may use `pmapbatch` directly as well if we need the entire batch for some reason (e.g. reading values off a disk, which needs to be done once for the entire set and not for every parameter). As an example, we demonstrate how to obtain the same result as above using `pmapbatch`:

```julia
julia> p = pmapbatch(x->[f(i...) for i in x],(xrange,yrange,zrange));
julia> p = pmapbatch(x->[f(i...) for i in x], (xrange,yrange,zrange));

julia> Tuple(p)
(6, 7, 8, 7, 8, 9, 8, 9, 10, 7, 8, 9, 8, 9, 10, 9, 10, 11, 8, 9, 10, 9, 10, 11, 10, 11, 12, 9, 10, 11, 10, 11, 12, 11, 12, 13)
@@ -149,22 +110,22 @@ Often a parallel execution is followed by a reduction (eg. a sum over the results)

As an example, to sum up a list of numbers in parallel we may call
```julia
julia> pmapsum_elementwise(identity,1:1000)
julia> pmapsum_elementwise(identity, 1:1000)
500500
```

Here the mapped function is taken to be `identity`, which just returns its argument. To sum the squares of the numbers in a list we may use

```julia
julia> pmapsum_elementwise(x->x^2,1:1000)
julia> pmapsum_elementwise(x -> x^2, 1:1000)
333833500
```

We may choose an arbitrary reduction operator in the functions `pmapreduce` and `pmapreduce_commutative`, and in the elementwise variant `pmapreduce_commutative_elementwise`. The reduction is carried out as a binary tree across all workers.

```julia
# Compute 1^2 * 2^2 * 3^2 in parallel
julia> pmapreduce_commutative_elementwise(x->x^2,prod,1:3)
julia> pmapreduce_commutative_elementwise(x -> x^2, prod, 1:3)
36
```
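
For illustration, here is a minimal serial sketch of such a pairwise tree reduction; the package performs these combinations across workers, and the helper `treereduce` below is hypothetical rather than part of the API.

```julia
# Serial sketch of a binary-tree reduction (illustrative only, not the package's code)
function treereduce(op, xs)
    length(xs) == 1 && return first(xs)
    mid = length(xs) ÷ 2
    # reduce each half recursively, then combine the two partial results
    op(treereduce(op, xs[1:mid]), treereduce(op, xs[mid+1:end]))
end

treereduce(*, [1, 4, 9])  # 36, the same result as the parallel call above
```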

@@ -177,7 +138,7 @@ julia> workers()
3

# The signature is pmapreduce(fmap,freduce,iterable)
julia> pmapreduce(x->ones(2).*myid(),x->hcat(x...),1:nworkers())
julia> pmapreduce(x -> ones(2).*myid(), x -> hcat(x...), 1:nworkers())
2×2 Array{Float64,2}:
2.0 3.0
2.0 3.0
@@ -192,7 +153,7 @@ julia> sum(workers())
5

# We compute ones(2).*sum(workers()) in parallel
julia> pmapsum(x->ones(2).*myid(),1:nworkers())
julia> pmapsum(x -> ones(2).*myid(), 1:nworkers())
2-element Array{Float64,1}:
5.0
5.0
@@ -201,14 +162,14 @@ julia> pmapsum(x->ones(2).*myid(),1:nworkers())
It is possible to specify the return types of the map and reduce operations in these functions. To do so, use the following variants:

```julia
# Signature is pmapreduce(fmap,Tmap,freduce,Treduce,iterators)
julia> pmapreduce(x->ones(2).*myid(),Vector{Float64},x->hcat(x...),Matrix{Float64},1:nworkers())
# Signature is pmapreduce(fmap, Tmap, freduce, Treduce, iterators)
julia> pmapreduce(x -> ones(2).*myid(), Vector{Float64}, x -> hcat(x...), Matrix{Float64}, 1:nworkers())
2×2 Array{Float64,2}:
2.0 3.0
2.0 3.0

# Signature is pmapsum(fmap,Tmap,iterators)
julia> pmapsum(x->ones(2).*myid(),Vector{Float64},1:nworkers())
# Signature is pmapsum(fmap, Tmap, iterators)
julia> pmapsum(x -> ones(2).*myid(), Vector{Float64}, 1:nworkers())
2-element Array{Float64,1}:
5.0
5.0
@@ -219,13 +180,13 @@ Specifying the types would lead to a type coercion if possible, or an error if a
```julia
# The result is converted from Vector{Float64} to Vector{Int}.
# Conversion works as the numbers are integers
julia> pmapsum(x->ones(2).*myid(),Vector{Int},1:nworkers())
julia> pmapsum(x -> ones(2).*myid(), Vector{Int}, 1:nworkers())
2-element Array{Int64,1}:
5
5

# Conversion fails here as the numbers aren't integers
julia> pmapsum(x->rand(2),Vector{Int},1:nworkers())
julia> pmapsum(x -> rand(2), Vector{Int}, 1:nworkers())
ERROR: On worker 2:
InexactError: Int64(0.7742577217010362)
```
@@ -236,12 +197,12 @@ The progress of the map-reduce operation might be tracked by setting the keyword

```julia
# Running on 8 workers, artificially induce load using sleep
julia> pmapreduce(x->(sleep(myid());myid()),x->hcat(x...),1:nworkers(),showprogress=true)
julia> pmapreduce(x -> (sleep(myid()); myid()), x -> hcat(x...), 1:nworkers(), showprogress=true)
Progress in pmapreduce : 100%|██████████████████████████████████████████████████| Time: 0:00:09
1×8 Array{Int64,2}:
2 3 4 5 6 7 8 9

julia> pmapreduce(x->(sleep(myid());myid()),x->hcat(x...),1:nworkers(),showprogress=true,progressdesc="Progress : ")
julia> pmapreduce(x -> (sleep(myid()); myid()), x -> hcat(x...), 1:nworkers(), showprogress=true, progressdesc="Progress : ")
Progress : 100%|████████████████████████████████████████████████████████████████| Time: 0:00:09
1×8 Array{Int64,2}:
2 3 4 5 6 7 8 9
@@ -264,14 +225,13 @@ with appropriately chosen parameters, and in many ways a `ProductSplit` behaves
The signature of the constructor is

```julia
ProductSplit(tuple_of_ranges,number_of_processors,processor_rank)
ProductSplit(tuple_of_ranges, number_of_processors, processor_rank)
```

where `processor_rank` takes values in `1:number_of_processors`. Note that this is different from MPI, where the rank starts from 0. For example, we check the tasks that are passed on to processor number 4:

```julia
julia> ps = ProductSplit((xrange,yrange,zrange),10,4)
ProductSplit{Tuple{Int64,Int64,Int64},3,UnitRange{Int64}}((1:3, 2:4, 3:5), (0, 3, 9), 10, 4, 10, 12)
julia> ps = ProductSplit((xrange, yrange, zrange), 10, 4);

julia> collect(ps)
4-element Array{Tuple{Int64,Int64,Int64},1}:
@@ -337,10 +297,10 @@ julia> val = (3,3,4)
julia> val in ps
true

julia> localindex(ps,val)
julia> localindex(ps, val)
3

julia> val=(10,2,901);
julia> val = (10,2,901);

julia> @btime $val in $ps_long
50.183 ns (0 allocations: 0 bytes)
@@ -354,10 +314,10 @@ julia> @btime localindex($ps_long, $val)
Another useful function is `whichproc`, which returns the rank of the processor that a specific set of parameters will reside on, given the total number of processors. This is also computed using a binary search.

```julia
julia> whichproc(params_long,val,10)
julia> whichproc(params_long, val, 10)
4

julia> @btime whichproc($params_long,$val,10)
julia> @btime whichproc($params_long, $val, 10)
1.264 μs (14 allocations: 448 bytes)
4
```
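
A rough sketch of the idea (not the package's internal code): with the tasks laid out in a fixed global order, the rank owning a given global task index can be found by a binary search over the cumulative task counts of the ranks.

```julia
# Illustrative only, using the assumed even split of 36 tasks over 10 ranks from above
counts = [4, 4, 4, 4, 4, 4, 3, 3, 3, 3]
lasts = cumsum(counts)                       # last global task index held by each rank
rank_of(idx) = searchsortedfirst(lasts, idx) # binary search, O(log nranks)
rank_of(13)  # 4, i.e. global task 13 lives on rank 4
```
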
@@ -367,18 +327,18 @@
We can compute the ranges of each variable on any processor in `O(1)` time.

```julia
julia> extrema(ps,dim=2) # extrema of the second parameter on this processor
julia> extrema(ps, dim=2) # extrema of the second parameter on this processor
(3, 4)

julia> Tuple(extrema(ps,dim=i) for i in 1:3)
julia> Tuple(extrema(ps, dim=i) for i in 1:3)
((1, 3), (3, 4), (4, 4))

# Minimum and maximum work similarly

julia> (minimum(ps,dim=2),maximum(ps,dim=2))
julia> (minimum(ps, dim=2), maximum(ps, dim=2))
(3, 4)

julia> @btime extrema($ps_long,dim=2)
julia> @btime extrema($ps_long, dim=2)
52.813 ns (0 allocations: 0 bytes)
(1, 3000)
```
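
A sketch of why the extrema along the slowest dimension cost `O(1)` (an illustration, not the implementation): a `ProductSplit` is a contiguous chunk of the column-major product, so the last component never decreases along the chunk, and its extrema can be read off the first and last elements.

```julia
# Rank 4 of the assumed 36-task split holds the contiguous chunk of positions 13:16
chunk = vec(collect(Iterators.product(1:3, 2:4, 3:6)))[13:16]
# chunk: (1, 3, 4), (2, 3, 4), (3, 3, 4), (1, 4, 4)
(first(chunk)[3], last(chunk)[3])  # (4, 4), matching extrema(ps, dim=3) above
```
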
5 changes: 5 additions & 0 deletions docs/Project.toml
@@ -0,0 +1,5 @@
[deps]
Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"

[compat]
Documenter = "0.25"
23 changes: 23 additions & 0 deletions docs/make.jl
@@ -0,0 +1,23 @@
using Documenter
using ParallelUtilities

DocMeta.setdocmeta!(ParallelUtilities, :DocTestSetup, :(using ParallelUtilities); recursive=true)

makedocs(;
modules=[ParallelUtilities],
authors="Jishnu Bhattacharya",
repo="https://github.com/jishnub/ParallelUtilities.jl/blob/{commit}{path}#L{line}",
sitename="ParallelUtilities.jl",
format=Documenter.HTML(;
prettyurls=get(ENV, "CI", "false") == "true",
canonical="https://jishnub.github.io/ParallelUtilities.jl",
assets=String[],
),
pages=[
"Reference" => "index.md",
],
)

deploydocs(;
repo="github.com/jishnub/ParallelUtilities.jl",
)
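
For reference, the documentation can be built locally with the same steps the Travis documentation stage runs above (a standard Documenter workflow; this snippet is an assumed convenience, run from the repository root):

```julia
# Mirrors the .travis.yml documentation stage
using Pkg
Pkg.activate("docs")
Pkg.develop(PackageSpec(path=pwd()))  # develop this package into the docs environment
Pkg.instantiate()
include(joinpath("docs", "make.jl"))
```
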
9 changes: 9 additions & 0 deletions docs/src/index.md
@@ -0,0 +1,9 @@
```@meta
CurrentModule = ParallelUtilities
```

# ParallelUtilities.jl

```@autodocs
Modules = [ParallelUtilities]
```