Commit

#74 added a function to call MPI as a subprocess
felipeZ committed Sep 20, 2017
1 parent ce64782 commit fedb425
Showing 2 changed files with 45 additions and 11 deletions.
4 changes: 2 additions & 2 deletions nac/common.py
@@ -103,11 +103,11 @@ def search_data_in_hdf5(path_hdf5, xs):
 
 
 def store_arrays_in_hdf5(path_hdf5: str, paths, tensor: Array,
-                         dtype=np.float32, driver=None, comm=None)-> None:
+                         dtype=np.float32)-> None:
     """
     Store the corrected overlaps in the HDF5 file
     """
-    with h5py.File(path_hdf5, 'r+', driver=driver, comm=comm) as f5:
+    with h5py.File(path_hdf5, 'r+') as f5:
         if isinstance(paths, list):
             for k, path in enumerate(paths):
                 data = tensor[k]
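For reference, a minimal usage sketch of the simplified store_arrays_in_hdf5 (the file name and node paths below are illustrative, and the HDF5 file must already exist because the function opens it in 'r+' mode):

    import numpy as np
    from nac.common import store_arrays_in_hdf5

    # Store one tensor under a single HDF5 node
    overlaps = np.random.rand(40, 40)
    store_arrays_in_hdf5("quantum.hdf5", "overlaps/corrected", overlaps)

    # With a list of nodes, tensor[k] is stored under paths[k]
    couplings = np.random.rand(3, 40, 40)
    store_arrays_in_hdf5(
        "quantum.hdf5", ["coupling_0", "coupling_1", "coupling_2"], couplings)
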
52 changes: 43 additions & 9 deletions nac/mpi/mpi_interface.py
@@ -2,22 +2,50 @@
 
 from itertools import accumulate
 from mpi4py import MPI
-from nac.common import (Array, product)
+from nac.common import (
+    Array, product, retrieve_hdf5_data, store_arrays_in_hdf5)
+from subprocess import (PIPE, Popen)
+import logging
 import numpy as np
 
 # Type hints
-from typing import (Callable, List, Tuple)
+from typing import (Callable, Tuple)
 
+# Starting logger
+logger = logging.getLogger(__name__)


+def call_mpi_as_subprocess(
+        path_hdf5: str, n_processes: int, cmd_line_args: str):
+    """
+    Execute a function with MPI by calling it in a separate subprocess.
+    :param path_hdf5: Path to the HDF5 where the resulting data will be stored.
+    :param n_processes: Number of MPI processes to use.
+    :param cmd_line_args: Python command to run with MPI.
+    """
+    # Call MPI
+    cmd = "mpiexec -n {} python {}".format(n_processes, cmd_line_args)
+    p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
+    output, error = p.communicate()
+    logger.info("MPI output: {}".format(output))
+
+    # Check for errors
+    if error:
+        raise RuntimeError(error.decode())
+    else:
+        pass


 def distribute_tensor_mpi(
-        fun: Callable, arr: Array, path_hdf5: str, paths_results: List,
-        *args, **kwargs) -> None:
+        fun: Callable, path_hdf5: str, path_tensor: str, path_results: str,
+        shape_tensor: Tuple, *args, **kwargs) -> None:
     """
     Use MPI to distribute a computation over all the available CPUs.
     :param fun: Function to execute in each MPI process.
-    :param arr: Tensor (Vector, Matrix, etc.) to split among the workers.
     :param path_hdf5: Path to the HDF5 where the resulting data will be stored.
+    :param path_tensor: Path to the Tensor (Vector, Matrix, etc.) in the HDF5.
     :param path_results: Path to the nodes in the HDF5 where the results are
         stored.
     :param args: Extra positional arguments for `fun`
@@ -27,18 +55,24 @@ def distribute_tensor_mpi(
     comm = MPI.COMM_WORLD
     rank = comm.Get_rank()
     size = comm.Get_size()
-    if rank == 0:
-        print(size)
 
+    # Retrieve the tensor to be split among the processes
+    if rank == 0:
+        arr = retrieve_hdf5_data(path_hdf5, path_tensor)
+    else:
+        arr = np.empty(shape_tensor)
+
     # Shapes and array dimension information to distribute the computation
     shapes = calculate_strides_and_displacements(arr, rank, size)
-    if rank == 0:
-        print(shapes)
     shape_worker, strides, displacements = shapes
 
     # Array containing the corresponding chunk sliced from the input
     recv_buffer = np.empty(shape_worker)
 
     # Distribute the input array to the other processes
     comm.Scatterv([arr, strides, displacements, MPI.DOUBLE], recv_buffer)
 
     # Invoke the worker function
     recv_buffer = fun(recv_buffer, *args, **kwargs)
 
@@ -52,7 +86,7 @@ def distribute_tensor_mpi(
     comm.Barrier()
     comm.Gatherv(recv_buffer, [rs, strides, displacements, MPI.DOUBLE])
     if rank == 0:
-        np.save("array", rs)
+        store_arrays_in_hdf5(path_hdf5, path_results, rs)
 
     MPI.Finalize()
 
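To show how the two new functions are meant to work together, here is a hypothetical worker script driven by distribute_tensor_mpi (the script name, HDF5 nodes, tensor shape, and per-chunk function are illustrative and not part of this commit; the import assumes nac.mpi.mpi_interface is importable as a module). Rank 0 reads the tensor from the HDF5 file, Scatterv hands each rank a chunk, fun is applied, and Gatherv collects the result that rank 0 stores back into the HDF5 file:

    # correct_overlaps.py -- hypothetical script intended to run under mpiexec
    from nac.mpi.mpi_interface import distribute_tensor_mpi


    def scale_chunk(chunk):
        """Dummy per-chunk computation applied by every MPI rank."""
        return 2 * chunk


    if __name__ == "__main__":
        distribute_tensor_mpi(
            scale_chunk,             # fun applied to every scattered chunk
            "quantum.hdf5",          # path_hdf5
            "overlaps/raw",          # path_tensor: node read by rank 0
            "overlaps/corrected",    # path_results: node written by rank 0
            (10, 40, 40))            # shape_tensor, known to every rank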
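The non-MPI parent process could then launch that script through the new helper; the process count and file names are again only examples:

    from nac.mpi.mpi_interface import call_mpi_as_subprocess

    # Runs "mpiexec -n 4 python correct_overlaps.py" in a subprocess and
    # raises RuntimeError if the command writes anything to stderr.
    call_mpi_as_subprocess("quantum.hdf5", 4, "correct_overlaps.py")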
