diff --git a/dedalus/tests_parallel/test_output_parallel.py b/dedalus/tests_parallel/test_output_parallel.py
index 023b88d6..e080446f 100644
--- a/dedalus/tests_parallel/test_output_parallel.py
+++ b/dedalus/tests_parallel/test_output_parallel.py
@@ -14,7 +14,7 @@
 @pytest.mark.parametrize('dtype', [np.float64, np.complex128])
 @pytest.mark.parametrize('dealias', [1, 3/2])
 @pytest.mark.parametrize('output_scales', [1/2, 1, 3/2])
-def test_cartesian_output(dtype, dealias, output_scales):
+def test_cartesian_output_virtual(dtype, dealias, output_scales):
     Nx = Ny = Nz = 16
     Lx = Ly = Lz = 2 * np.pi
     # Bases
@@ -41,18 +41,21 @@ def test_cartesian_output(dtype, dealias, output_scales):
     # Output
     tasks = [u, u(x=0), u(y=0), u(z=0), u(x=0,y=0), u(x=0,z=0), u(y=0,z=0), u(x=0,y=0,z=0),
              v, v(x=0), v(y=0), v(z=0), v(x=0,y=0), v(x=0,z=0), v(y=0,z=0), v(x=0,y=0,z=0)]
-    output = solver.evaluator.add_file_handler('test_output', iter=1)
+    output = solver.evaluator.add_file_handler('test_output', iter=1, parallel='virtual')
     for task in tasks:
         output.add_task(task, layout='g', name=str(task), scales=output_scales)
-    solver.evaluator.evaluate_handlers([output])
+    solver.evaluator.evaluate_handlers([output], wall_time=1, timestep=1, sim_time=1, iteration=1)
     # Check solution
     errors = []
-    rank = d.comm.rank
-    with h5py.File('test_output/test_output_s1/test_output_s1_p{}.h5'.format(rank), mode='r') as file:
+    d.comm.Barrier()
+    with h5py.File('test_output/test_output_s1.h5', mode='r') as file:
         for task in tasks:
-            task_saved = file['tasks'][str(task)][-1]
+            task_name = str(task)
             task = task.evaluate()
             task.change_scales(output_scales)
+            local_slices = (slice(None),) * len(task.tensorsig) + d.grid_layout.slices(task.domain, task.scales)
+            task_saved = file['tasks'][task_name][-1]
+            task_saved = task_saved[local_slices]
             local_error = task['g'] - task_saved
             if local_error.size:
                 errors.append(np.max(np.abs(task['g'] - task_saved)))
@@ -61,12 +64,11 @@
     shutil.rmtree('test_output')
     assert np.allclose(errors, 0)

-
 @pytest.mark.mpi(min_size=4)
 @pytest.mark.parametrize('dtype', [np.float64, np.complex128])
 @pytest.mark.parametrize('dealias', [1, 3/2])
-@pytest.mark.parametrize('output_scales', [1/2])
-def test_cartesian_output_virtual(dtype, dealias, output_scales):
+@pytest.mark.parametrize('output_scales', [1/2, 1, 3/2])
+def test_cartesian_output_gather(dtype, dealias, output_scales):
     Nx = Ny = Nz = 16
     Lx = Ly = Lz = 2 * np.pi
     # Bases
@@ -93,10 +95,10 @@
     # Output
     tasks = [u, u(x=0), u(y=0), u(z=0), u(x=0,y=0), u(x=0,z=0), u(y=0,z=0), u(x=0,y=0,z=0),
              v, v(x=0), v(y=0), v(z=0), v(x=0,y=0), v(x=0,z=0), v(y=0,z=0), v(x=0,y=0,z=0)]
-    output = solver.evaluator.add_file_handler('test_output', iter=1, max_writes=1, virtual_file=True)
+    output = solver.evaluator.add_file_handler('test_output', iter=1, parallel='gather')
     for task in tasks:
         output.add_task(task, layout='g', name=str(task), scales=output_scales)
-    solver.evaluator.evaluate_handlers([output])
+    solver.evaluator.evaluate_handlers([output], wall_time=1, timestep=1, sim_time=1, iteration=1)
     # Check solution
     errors = []
     d.comm.Barrier()
@@ -118,14 +120,14 @@

 @pytest.mark.mpi(min_size=4)
 @pytest.mark.parametrize('dtype', [np.float64, np.complex128])
-@pytest.mark.parametrize('dealias', [1, 1.5, ])
-@pytest.mark.parametrize('output_scales', [1/2, 1])
-def test_cartesian_output_merged_virtual(dtype, dealias, output_scales):
+@pytest.mark.parametrize('dealias', [1, 3/2])
+@pytest.mark.parametrize('output_scales', [1/2, 1, 3/2])
+def test_cartesian_output_mpio(dtype, dealias, output_scales):
     Nx = Ny = Nz = 16
     Lx = Ly = Lz = 2 * np.pi
     # Bases
     c = coords.CartesianCoordinates('x', 'y', 'z')
-    d = distributor.Distributor((c,), mesh=[2,2])
+    d = distributor.Distributor((c,), mesh=(2,2))
     Fourier = {np.float64: basis.RealFourier, np.complex128: basis.ComplexFourier}[dtype]
     xb = Fourier(c.coords[0], size=Nx, bounds=(0, Lx), dealias=dealias)
     yb = Fourier(c.coords[1], size=Ny, bounds=(0, Ly), dealias=dealias)
@@ -147,16 +149,13 @@
     # Output
     tasks = [u, u(x=0), u(y=0), u(z=0), u(x=0,y=0), u(x=0,z=0), u(y=0,z=0), u(x=0,y=0,z=0),
              v, v(x=0), v(y=0), v(z=0), v(x=0,y=0), v(x=0,z=0), v(y=0,z=0), v(x=0,y=0,z=0)]
-    output = solver.evaluator.add_file_handler('test_output', iter=1, max_writes=1, virtual_file=True)
+    output = solver.evaluator.add_file_handler('test_output', iter=1, parallel='mpio')
     for task in tasks:
         output.add_task(task, layout='g', name=str(task), scales=output_scales)
-    solver.evaluator.evaluate_handlers([output])
+    solver.evaluator.evaluate_handlers([output], wall_time=1, timestep=1, sim_time=1, iteration=1)
     # Check solution
     errors = []
-
-    post.merge_virtual_analysis('test_output', cleanup=True)
     d.comm.Barrier()
-
     with h5py.File('test_output/test_output_s1.h5', mode='r') as file:
         for task in tasks:
             task_name = str(task)
@@ -175,9 +174,9 @@

 @pytest.mark.mpi(min_size=4)
 @pytest.mark.parametrize('dtype', [np.float64, np.complex128])
-@pytest.mark.parametrize('dealias', [1, 1.5, ])
-@pytest.mark.parametrize('output_scales', [1/2, 1])
-def test_cartesian_output_merged(dtype, dealias, output_scales):
+@pytest.mark.parametrize('dealias', [1, 3/2])
+@pytest.mark.parametrize('output_scales', [1/2, 1, 3/2])
+def test_cartesian_output_merged_virtual(dtype, dealias, output_scales):
     Nx = Ny = Nz = 16
     Lx = Ly = Lz = 2 * np.pi
     # Bases
@@ -204,14 +203,14 @@
     # Output
     tasks = [u, u(x=0), u(y=0), u(z=0), u(x=0,y=0), u(x=0,z=0), u(y=0,z=0), u(x=0,y=0,z=0),
              v, v(x=0), v(y=0), v(z=0), v(x=0,y=0), v(x=0,z=0), v(y=0,z=0), v(x=0,y=0,z=0)]
-    output = solver.evaluator.add_file_handler('test_output', iter=1, max_writes=1, virtual_file=False)
+    output = solver.evaluator.add_file_handler('test_output', iter=1, parallel='virtual')
     for task in tasks:
         output.add_task(task, layout='g', name=str(task), scales=output_scales)
-    solver.evaluator.evaluate_handlers([output])
+    solver.evaluator.evaluate_handlers([output], wall_time=1, timestep=1, sim_time=1, iteration=1)
     # Check solution
     errors = []

-    post.merge_analysis('test_output', cleanup=True)
+    post.merge_virtual_analysis('test_output', cleanup=True)
     d.comm.Barrier()

     with h5py.File('test_output/test_output_s1.h5', mode='r') as file:
diff --git a/dedalus/tools/post.py b/dedalus/tools/post.py
index a3a89d6f..7a9926bd 100644
--- a/dedalus/tools/post.py
+++ b/dedalus/tools/post.py
@@ -166,66 +166,7 @@
         shutil.rmtree(folder)


-def merge_analysis(base_path, cleanup=False):
-    """
-    Merge distributed analysis sets from a FileHandler.
-
-    Parameters
-    ----------
-    base_path : str or pathlib.Path
-        Base path of FileHandler output
-    cleanup : bool, optional
-        Delete distributed files after merging (default: False)
-
-    Notes
-    -----
-    This function is parallelized over sets, and so can be effectively
-    parallelized up to the number of distributed sets.
-
-    """
-    set_path = pathlib.Path(base_path)
-    logger.info("Merging files from {}".format(base_path))
-
-    set_paths = get_assigned_sets(base_path, distributed=True)
-    for set_path in set_paths:
-        merge_distributed_set(set_path, cleanup=cleanup)
-
-
-def merge_distributed_set(set_path, cleanup=False):
-    """
-    Merge a distributed analysis set from a FileHandler.
-
-    Parameters
-    ----------
-    set_path : str of pathlib.Path
-        Path to distributed analysis set folder
-    cleanup : bool, optional
-        Delete distributed files after merging (default: False)
-
-    """
-    set_path = pathlib.Path(set_path)
-    logger.info("Merging set {}".format(set_path))
-
-    set_stem = set_path.stem
-    proc_paths = set_path.glob("{}_p*.h5".format(set_stem))
-    proc_paths = natural_sort(proc_paths)
-    joint_path = set_path.parent.joinpath("{}.h5".format(set_stem))
-
-    # Create joint file, overwriting if it already exists
-    with h5py.File(str(joint_path), mode='w') as joint_file:
-        # Setup joint file based on first process file (arbitrary)
-        merge_setup(joint_file, proc_paths)
-        # Merge data from all process files
-        for proc_path in proc_paths:
-            merge_data(joint_file, proc_path)
-    # Cleanup after completed merge, if directed
-    if cleanup:
-        for proc_path in proc_paths:
-            proc_path.unlink()
-        set_path.rmdir()
-
-
-def merge_setup(joint_file, proc_paths, virtual=False):
+def merge_virtual(joint_file, virtual_path):
     """
-    Merge HDF5 setup from part of a distributed analysis set into a joint file.
+    Merge a virtual file from a FileHandler into a single joint file.

@@ -233,116 +174,40 @@
     ----------
     joint_file : HDF5 file
         Joint file
-    proc_paths : list of [str or pathlib.Path]
-        List of files in a distributed analysis set
-    virtual: bool, optional
-        If True, merging a virtual file into a single file rather than distributed set
-
+    virtual_path : str or pathlib.Path
+        Path to virtual file
     """
-    proc_path0 = pathlib.Path(proc_paths[0])
+    virt_path = pathlib.Path(virtual_path)
     logger.info("Merging setup for {}".format(joint_file))
-    with h5py.File(str(proc_path0), mode='r') as proc_file:
-        # File metadata
-        try:
-            joint_file.attrs['set_number'] = proc_file.attrs['set_number']
-        except KeyError:
-            joint_file.attrs['set_number'] = proc_file.attrs['file_number']
-        joint_file.attrs['handler_name'] = proc_file.attrs['handler_name']
-        try:
-            joint_file.attrs['writes'] = writes = proc_file.attrs['writes']
-        except KeyError:
-            joint_file.attrs['writes'] = writes = len(proc_file['scales']['write_number'])
-        # Copy scales (distributed files all have global scales)
-        if virtual:
-            proc_file.copy('scales', joint_file)
-        else:
-            needed_hashes = []
-            joint_scales = joint_file.create_group('scales')
-            for scalename in proc_file['scales']:
-                if 'hash_' in scalename:
-                    needed_hashes.append(scalename)
-                else:
-                    joint_scales.create_dataset(name=scalename, data=proc_file['scales'][scalename])
+    with h5py.File(str(virt_path), mode='r') as virt_file:
+        try:
+            joint_file.attrs['writes'] = virt_file.attrs['writes']
+        except KeyError:
+            joint_file.attrs['writes'] = len(virt_file['scales']['write_number'])
+        # Copy scales (the virtual file carries the global scales)
+        virt_file.copy('scales', joint_file)
+
         # Tasks
         joint_tasks = joint_file.create_group('tasks')
-        proc_tasks = proc_file['tasks']
-        for taskname in proc_tasks:
+        virt_tasks = virt_file['tasks']
+        for taskname in virt_tasks:
             # Setup dataset with automatic chunking
-            proc_dset = proc_tasks[taskname]
-            if virtual:
-                joint_dset = joint_tasks.create_dataset(name=proc_dset.name, data=proc_dset)
-            else:
-                spatial_shape = proc_dset.attrs['global_shape']
-                joint_shape = (writes,) + tuple(spatial_shape)
-                joint_dset = joint_tasks.create_dataset(name=proc_dset.name,
-                                                        shape=joint_shape,
-                                                        dtype=proc_dset.dtype,
-                                                        chunks=True)
+            virt_dset = virt_tasks[taskname]
+            joint_dset = joint_tasks.create_dataset(name=virt_dset.name, data=virt_dset)
+
             # Dataset metadata
-            joint_dset.attrs['task_number'] = proc_dset.attrs['task_number']
-            joint_dset.attrs['constant'] = proc_dset.attrs['constant']
-            joint_dset.attrs['grid_space'] = proc_dset.attrs['grid_space']
-            joint_dset.attrs['scales'] = proc_dset.attrs['scales']
-
-            for i, proc_dim in enumerate(proc_dset.dims):
-                joint_dset.dims[i].label = proc_dim.label
-                if joint_dset.dims[i].label == 't':
-                    for scalename in ['sim_time', 'world_time', 'wall_time', 'timestep', 'iteration', 'write_number']:
-                        scale = joint_file['scales'][scalename]
-                        joint_dset.dims.create_scale(scale, scalename)
-                        joint_dset.dims[i].attach_scale(scale)
-                else:
-                    if proc_dim.label == 'constant' or proc_dim.label == '':
+            joint_dset.attrs['grid_space'] = virt_dset.attrs['grid_space']
+
+            for i, virt_dim in enumerate(virt_dset.dims):
+                joint_dset.dims[i].label = virt_dim.label
+                if joint_dset.dims[i].label != 't':
+                    if virt_dim.label == 'constant' or virt_dim.label == '':
                         scalename = 'constant'
                     else:
-                        hashval = hashlib.sha1(np.array(proc_dset.dims[i][0])).hexdigest()
-                        scalename = 'hash_' + hashval
-                        if not virtual and scalename in needed_hashes:
-                            if joint_shape[i] == 1:
-                                scale_data = np.zeros(1)
-                            else:
-                                scale_data = np.zeros(joint_shape[i])
-                            filled = np.zeros(joint_shape[i], dtype=bool)
-                            for proc_path in proc_paths:
-                                with h5py.File(proc_path, 'r') as pf:
-                                    start = pf['tasks'][taskname].attrs['start'][i-1]
-                                    stop = start+pf['tasks'][taskname].attrs['count'][i-1]
-                                    scale_data[start:stop] = pf['scales'][scalename]
-                                    filled[start:stop] = 1
-                                if np.sum(filled) == scale_data.size: break #stop filling
-                            joint_scales.create_dataset(name=scalename, data=scale_data)
-                            needed_hashes.remove(scalename)
+                        hashval = hashlib.sha1(np.array(virt_dset.dims[i][0])).hexdigest()
+                        scalename = virt_dim.label + '_hash_' + hashval
                     scale = joint_file['scales'][scalename]
                     joint_dset.dims.create_scale(scale, scalename)
                     joint_dset.dims[i].attach_scale(scale)


-def merge_data(joint_file, proc_path):
-    """
-    Merge data from part of a distributed analysis set into a joint file.
-
-    Parameters
-    ----------
-    joint_file : HDF5 file
-        Joint file
-    proc_path : str or pathlib.Path
-        Path to part of a distributed analysis set
-
-    """
-    proc_path = pathlib.Path(proc_path)
-    logger.info("Merging data from {}".format(proc_path))
-
-    with h5py.File(str(proc_path), mode='r') as proc_file:
-        for taskname in proc_file['tasks']:
-            joint_dset = joint_file['tasks'][taskname]
-            proc_dset = proc_file['tasks'][taskname]
-            # Merge across spatial distribution
-            start = proc_dset.attrs['start']
-            count = proc_dset.attrs['count']
-            spatial_slices = tuple(slice(s, s+c) for (s,c) in zip(start, count))
-            # Merge maintains same set of writes
-            slices = (slice(None),) + spatial_slices
-            joint_dset[slices] = proc_dset[:]
-
-
-merge_virtual = lambda joint_file, virtual_path: merge_setup(joint_file, [virtual_path,], virtual=True)
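Editor's note, not part of the patch: the following is a minimal, self-contained h5py sketch of the HDF5 virtual-dataset mechanism that the new parallel='virtual' handler mode and merge_virtual build on. It is illustrative only; the file names, shapes, and the 'tasks/u' dataset path are hypothetical and do not reflect Dedalus's actual output layout.

    import numpy as np
    import h5py

    # Two "process" files, each holding a (4, 8) slab of an (8, 8) global grid
    for rank, start in enumerate([0, 4]):
        with h5py.File('proc_p{}.h5'.format(rank), 'w') as f:
            f['tasks/u'] = np.arange(start, start + 4)[:, None] * np.ones(8)

    # Virtual file: an (8, 8) dataset backed by the slab files, with no data copied
    layout = h5py.VirtualLayout(shape=(8, 8), dtype='f8')
    for rank, start in enumerate([0, 4]):
        layout[start:start+4] = h5py.VirtualSource('proc_p{}.h5'.format(rank), 'tasks/u', shape=(4, 8))
    with h5py.File('virtual.h5', 'w') as f:
        f.create_group('tasks').create_virtual_dataset('u', layout)

    # "Merging" just reads through the virtual layer into one contiguous file,
    # which is essentially what merge_virtual does per task dataset (plus metadata)
    with h5py.File('virtual.h5', 'r') as virt, h5py.File('joint.h5', 'w') as joint:
        joint.create_dataset('tasks/u', data=virt['tasks/u'][:])

Because reads through the virtual file already return the stitched global data, the tests above can open test_output/test_output_s1.h5 directly after a barrier; post.merge_virtual_analysis is only needed when a single portable file, independent of the per-process pieces, is wanted, as in test_cartesian_output_merged_virtual.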