Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes virtual file merge logic & removes deprecated distributed set merge logic #206

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 25 additions & 26 deletions dedalus/tests_parallel/test_output_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
@pytest.mark.parametrize('dtype', [np.float64, np.complex128])
@pytest.mark.parametrize('dealias', [1, 3/2])
@pytest.mark.parametrize('output_scales', [1/2, 1, 3/2])
def test_cartesian_output(dtype, dealias, output_scales):
def test_cartesian_output_virtual(dtype, dealias, output_scales):
Nx = Ny = Nz = 16
Lx = Ly = Lz = 2 * np.pi
# Bases
Expand All @@ -41,18 +41,21 @@ def test_cartesian_output(dtype, dealias, output_scales):
# Output
tasks = [u, u(x=0), u(y=0), u(z=0), u(x=0,y=0), u(x=0,z=0), u(y=0,z=0), u(x=0,y=0,z=0),
v, v(x=0), v(y=0), v(z=0), v(x=0,y=0), v(x=0,z=0), v(y=0,z=0), v(x=0,y=0,z=0)]
output = solver.evaluator.add_file_handler('test_output', iter=1)
output = solver.evaluator.add_file_handler('test_output', iter=1, parallel='virtual')
for task in tasks:
output.add_task(task, layout='g', name=str(task), scales=output_scales)
solver.evaluator.evaluate_handlers([output])
solver.evaluator.evaluate_handlers([output], wall_time=1, timestep=1, sim_time=1, iteration=1)
# Check solution
errors = []
rank = d.comm.rank
with h5py.File('test_output/test_output_s1/test_output_s1_p{}.h5'.format(rank), mode='r') as file:
d.comm.Barrier()
with h5py.File('test_output/test_output_s1.h5', mode='r') as file:
for task in tasks:
task_saved = file['tasks'][str(task)][-1]
task_name = str(task)
task = task.evaluate()
task.change_scales(output_scales)
local_slices = (slice(None),) * len(task.tensorsig) + d.grid_layout.slices(task.domain, task.scales)
task_saved = file['tasks'][task_name][-1]
task_saved = task_saved[local_slices]
local_error = task['g'] - task_saved
if local_error.size:
errors.append(np.max(np.abs(task['g'] - task_saved)))
Expand All @@ -61,12 +64,11 @@ def test_cartesian_output(dtype, dealias, output_scales):
shutil.rmtree('test_output')
assert np.allclose(errors, 0)


@pytest.mark.mpi(min_size=4)
@pytest.mark.parametrize('dtype', [np.float64, np.complex128])
@pytest.mark.parametrize('dealias', [1, 3/2])
@pytest.mark.parametrize('output_scales', [1/2])
def test_cartesian_output_virtual(dtype, dealias, output_scales):
@pytest.mark.parametrize('output_scales', [1/2, 1, 3/2])
def test_cartesian_output_gather(dtype, dealias, output_scales):
Nx = Ny = Nz = 16
Lx = Ly = Lz = 2 * np.pi
# Bases
Expand All @@ -93,10 +95,10 @@ def test_cartesian_output_virtual(dtype, dealias, output_scales):
# Output
tasks = [u, u(x=0), u(y=0), u(z=0), u(x=0,y=0), u(x=0,z=0), u(y=0,z=0), u(x=0,y=0,z=0),
v, v(x=0), v(y=0), v(z=0), v(x=0,y=0), v(x=0,z=0), v(y=0,z=0), v(x=0,y=0,z=0)]
output = solver.evaluator.add_file_handler('test_output', iter=1, max_writes=1, virtual_file=True)
output = solver.evaluator.add_file_handler('test_output', iter=1, parallel='gather')
for task in tasks:
output.add_task(task, layout='g', name=str(task), scales=output_scales)
solver.evaluator.evaluate_handlers([output])
solver.evaluator.evaluate_handlers([output], wall_time=1, timestep=1, sim_time=1, iteration=1)
# Check solution
errors = []
d.comm.Barrier()
Expand All @@ -118,14 +120,14 @@ def test_cartesian_output_virtual(dtype, dealias, output_scales):

@pytest.mark.mpi(min_size=4)
@pytest.mark.parametrize('dtype', [np.float64, np.complex128])
@pytest.mark.parametrize('dealias', [1, 1.5, ])
@pytest.mark.parametrize('output_scales', [1/2, 1])
def test_cartesian_output_merged_virtual(dtype, dealias, output_scales):
@pytest.mark.parametrize('dealias', [1, 3/2])
@pytest.mark.parametrize('output_scales', [1/2, 1, 3/2])
def test_cartesian_output_mpio(dtype, dealias, output_scales):
Nx = Ny = Nz = 16
Lx = Ly = Lz = 2 * np.pi
# Bases
c = coords.CartesianCoordinates('x', 'y', 'z')
d = distributor.Distributor((c,), mesh=[2,2])
d = distributor.Distributor((c,), mesh=(2,2))
Fourier = {np.float64: basis.RealFourier, np.complex128: basis.ComplexFourier}[dtype]
xb = Fourier(c.coords[0], size=Nx, bounds=(0, Lx), dealias=dealias)
yb = Fourier(c.coords[1], size=Ny, bounds=(0, Ly), dealias=dealias)
Expand All @@ -147,16 +149,13 @@ def test_cartesian_output_merged_virtual(dtype, dealias, output_scales):
# Output
tasks = [u, u(x=0), u(y=0), u(z=0), u(x=0,y=0), u(x=0,z=0), u(y=0,z=0), u(x=0,y=0,z=0),
v, v(x=0), v(y=0), v(z=0), v(x=0,y=0), v(x=0,z=0), v(y=0,z=0), v(x=0,y=0,z=0)]
output = solver.evaluator.add_file_handler('test_output', iter=1, max_writes=1, virtual_file=True)
output = solver.evaluator.add_file_handler('test_output', iter=1, parallel='mpio')
for task in tasks:
output.add_task(task, layout='g', name=str(task), scales=output_scales)
solver.evaluator.evaluate_handlers([output])
solver.evaluator.evaluate_handlers([output], wall_time=1, timestep=1, sim_time=1, iteration=1)
# Check solution
errors = []

post.merge_virtual_analysis('test_output', cleanup=True)
d.comm.Barrier()

with h5py.File('test_output/test_output_s1.h5', mode='r') as file:
for task in tasks:
task_name = str(task)
Expand All @@ -175,9 +174,9 @@ def test_cartesian_output_merged_virtual(dtype, dealias, output_scales):

@pytest.mark.mpi(min_size=4)
@pytest.mark.parametrize('dtype', [np.float64, np.complex128])
@pytest.mark.parametrize('dealias', [1, 1.5, ])
@pytest.mark.parametrize('output_scales', [1/2, 1])
def test_cartesian_output_merged(dtype, dealias, output_scales):
@pytest.mark.parametrize('dealias', [1, 3/2])
@pytest.mark.parametrize('output_scales', [1/2, 1, 3/2])
def test_cartesian_output_merged_virtual(dtype, dealias, output_scales):
Nx = Ny = Nz = 16
Lx = Ly = Lz = 2 * np.pi
# Bases
Expand All @@ -204,14 +203,14 @@ def test_cartesian_output_merged(dtype, dealias, output_scales):
# Output
tasks = [u, u(x=0), u(y=0), u(z=0), u(x=0,y=0), u(x=0,z=0), u(y=0,z=0), u(x=0,y=0,z=0),
v, v(x=0), v(y=0), v(z=0), v(x=0,y=0), v(x=0,z=0), v(y=0,z=0), v(x=0,y=0,z=0)]
output = solver.evaluator.add_file_handler('test_output', iter=1, max_writes=1, virtual_file=False)
output = solver.evaluator.add_file_handler('test_output', iter=1, parallel='virtual')
for task in tasks:
output.add_task(task, layout='g', name=str(task), scales=output_scales)
solver.evaluator.evaluate_handlers([output])
solver.evaluator.evaluate_handlers([output], wall_time=1, timestep=1, sim_time=1, iteration=1)
# Check solution
errors = []

post.merge_analysis('test_output', cleanup=True)
post.merge_virtual_analysis('test_output', cleanup=True)
d.comm.Barrier()

with h5py.File('test_output/test_output_s1.h5', mode='r') as file:
Expand Down
183 changes: 24 additions & 159 deletions dedalus/tools/post.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,183 +166,48 @@ def merge_virtual_file(virtual_file_path, cleanup=False):
shutil.rmtree(folder)


def merge_analysis(base_path, cleanup=False):
"""
Merge distributed analysis sets from a FileHandler.

Parameters
----------
base_path : str or pathlib.Path
Base path of FileHandler output
cleanup : bool, optional
Delete distributed files after merging (default: False)

Notes
-----
This function is parallelized over sets, and so can be effectively
parallelized up to the number of distributed sets.

"""
set_path = pathlib.Path(base_path)
logger.info("Merging files from {}".format(base_path))

set_paths = get_assigned_sets(base_path, distributed=True)
for set_path in set_paths:
merge_distributed_set(set_path, cleanup=cleanup)


def merge_distributed_set(set_path, cleanup=False):
"""
Merge a distributed analysis set from a FileHandler.

Parameters
----------
set_path : str of pathlib.Path
Path to distributed analysis set folder
cleanup : bool, optional
Delete distributed files after merging (default: False)

"""
set_path = pathlib.Path(set_path)
logger.info("Merging set {}".format(set_path))

set_stem = set_path.stem
proc_paths = set_path.glob("{}_p*.h5".format(set_stem))
proc_paths = natural_sort(proc_paths)
joint_path = set_path.parent.joinpath("{}.h5".format(set_stem))

# Create joint file, overwriting if it already exists
with h5py.File(str(joint_path), mode='w') as joint_file:
# Setup joint file based on first process file (arbitrary)
merge_setup(joint_file, proc_paths)
# Merge data from all process files
for proc_path in proc_paths:
merge_data(joint_file, proc_path)
# Cleanup after completed merge, if directed
if cleanup:
for proc_path in proc_paths:
proc_path.unlink()
set_path.rmdir()


def merge_setup(joint_file, proc_paths, virtual=False):
def merge_virtual(joint_file, virtual_path):
"""
Merge HDF5 setup from part of a distributed analysis set into a joint file.

Parameters
----------
joint_file : HDF5 file
Joint file
proc_paths : list of [str or pathlib.Path]
List of files in a distributed analysis set
virtual: bool, optional
If True, merging a virtual file into a single file rather than distributed set

virtual_path : path to virtual file [str or pathlib.Path]
Virtual file
"""
proc_path0 = pathlib.Path(proc_paths[0])
virt_path = pathlib.Path(virtual_path)
logger.info("Merging setup for {}".format(joint_file))
with h5py.File(str(proc_path0), mode='r') as proc_file:
# File metadata
try:
joint_file.attrs['set_number'] = proc_file.attrs['set_number']
except KeyError:
joint_file.attrs['set_number'] = proc_file.attrs['file_number']
joint_file.attrs['handler_name'] = proc_file.attrs['handler_name']
try:
joint_file.attrs['writes'] = writes = proc_file.attrs['writes']
except KeyError:
joint_file.attrs['writes'] = writes = len(proc_file['scales']['write_number'])
with h5py.File(str(virt_path), mode='r') as virt_file:
# Copy scales (distributed files all have global scales)
if virtual:
proc_file.copy('scales', joint_file)
else:
needed_hashes = []
joint_scales = joint_file.create_group('scales')
for scalename in proc_file['scales']:
if 'hash_' in scalename:
needed_hashes.append(scalename)
else:
joint_scales.create_dataset(name=scalename, data=proc_file['scales'][scalename])
virt_file.copy('scales', joint_file)

# Tasks
joint_tasks = joint_file.create_group('tasks')
proc_tasks = proc_file['tasks']
for taskname in proc_tasks:
virt_tasks = virt_file['tasks']
for taskname in virt_tasks:
try:
joint_file.attrs['writes'] = writes = virt_file.attrs['writes']
except KeyError:
joint_file.attrs['writes'] = writes = len(virt_file['scales']['write_number'])
# Setup dataset with automatic chunking
proc_dset = proc_tasks[taskname]
if virtual:
joint_dset = joint_tasks.create_dataset(name=proc_dset.name, data=proc_dset)
else:
spatial_shape = proc_dset.attrs['global_shape']
joint_shape = (writes,) + tuple(spatial_shape)
joint_dset = joint_tasks.create_dataset(name=proc_dset.name,
shape=joint_shape,
dtype=proc_dset.dtype,
chunks=True)
virt_dset = virt_tasks[taskname]
joint_dset = joint_tasks.create_dataset(name=virt_dset.name, data=virt_dset)

# Dataset metadata
joint_dset.attrs['task_number'] = proc_dset.attrs['task_number']
joint_dset.attrs['constant'] = proc_dset.attrs['constant']
joint_dset.attrs['grid_space'] = proc_dset.attrs['grid_space']
joint_dset.attrs['scales'] = proc_dset.attrs['scales']

for i, proc_dim in enumerate(proc_dset.dims):
joint_dset.dims[i].label = proc_dim.label
if joint_dset.dims[i].label == 't':
for scalename in ['sim_time', 'world_time', 'wall_time', 'timestep', 'iteration', 'write_number']:
scale = joint_file['scales'][scalename]
joint_dset.dims.create_scale(scale, scalename)
joint_dset.dims[i].attach_scale(scale)
else:
if proc_dim.label == 'constant' or proc_dim.label == '':
joint_dset.attrs['grid_space'] = virt_dset.attrs['grid_space']

for i, virt_dim in enumerate(virt_dset.dims):
joint_dset.dims[i].label = virt_dim.label
if joint_dset.dims[i].label != 't':
if virt_dim.label == 'constant' or virt_dim.label == '':
scalename = 'constant'
else:
hashval = hashlib.sha1(np.array(proc_dset.dims[i][0])).hexdigest()
scalename = 'hash_' + hashval
if not virtual and scalename in needed_hashes:
if joint_shape[i] == 1:
scale_data = np.zeros(1)
else:
scale_data = np.zeros(joint_shape[i])
filled = np.zeros(joint_shape[i], dtype=bool)
for proc_path in proc_paths:
with h5py.File(proc_path, 'r') as pf:
start = pf['tasks'][taskname].attrs['start'][i-1]
stop = start+pf['tasks'][taskname].attrs['count'][i-1]
scale_data[start:stop] = pf['scales'][scalename]
filled[start:stop] = 1
if np.sum(filled) == scale_data.size: break #stop filling
joint_scales.create_dataset(name=scalename, data=scale_data)
needed_hashes.remove(scalename)
hashval = hashlib.sha1(np.array(virt_dset.dims[i][0])).hexdigest()
scalename = virt_dim.label + '_hash_' + hashval

scale = joint_file['scales'][scalename]
joint_dset.dims.create_scale(scale, scalename)
joint_dset.dims[i].attach_scale(scale)

def merge_data(joint_file, proc_path):
"""
Merge data from part of a distributed analysis set into a joint file.

Parameters
----------
joint_file : HDF5 file
Joint file
proc_path : str or pathlib.Path
Path to part of a distributed analysis set

"""
proc_path = pathlib.Path(proc_path)
logger.info("Merging data from {}".format(proc_path))

with h5py.File(str(proc_path), mode='r') as proc_file:
for taskname in proc_file['tasks']:
joint_dset = joint_file['tasks'][taskname]
proc_dset = proc_file['tasks'][taskname]
# Merge across spatial distribution
start = proc_dset.attrs['start']
count = proc_dset.attrs['count']
spatial_slices = tuple(slice(s, s+c) for (s,c) in zip(start, count))
# Merge maintains same set of writes
slices = (slice(None),) + spatial_slices
joint_dset[slices] = proc_dset[:]


merge_virtual = lambda joint_file, virtual_path: merge_setup(joint_file, [virtual_path,], virtual=True)