Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add arbitrary shuffling for openPMD input #622

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 188 additions & 35 deletions mala/datahandling/data_shuffler.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,187 @@ def __init__(self):
self.rank = 0
self.size = 1

# Takes the `extent` of an n-dimensional grid, the coordinates of a start
# point `x` and of an end point `y`.
# Interpreting the grid as a row-major n-dimensional array yields a
# contiguous slice from `x` to `y`.
# !!! Both `x` and `y` are **inclusive** ends as the semantics of what would
# be the next exclusive upper limit are unnecessarily complex in
# n dimensions, especially since the function recurses over the number of
# dimensions. !!!
# This slice is not in general an n-dimensional cuboid within the grid, but
# may be a non-convex form composed of multiple cuboid chunks.
#
# This function returns a list of cuboid sub-chunks of the n-dimensional
# grid such that:
#
# 1. The union of those chunks is equal to the contiguous slice spanned
# between `x` and `y`, both ends inclusive.
# 2. The chunks do not overlap.
# 3. Each single chunk is a contiguous slice within the row-major grid.
# 4. The chunks are sorted in ascending order of their position within the
# row-major grid.
#
# Row-major within the context of this function means that the indexes go
# from the slowest-running dimension at the left end to the fastest-running
# dimension at the right end.
#
# The output is a list of chunks as defined in openPMD, i.e. a start offset
# and the extent, counted from this offset (not from the grid origin!).
#
# Example: From the below 2D grid with extent [8, 10], the slice marked by
# the X-es should be loaded. The slice is defined by its first item
# at [2, 3] and its last item at [6, 6].
# The result would be a list of three chunks:
#
# 1. ([2, 3], [1, 7])
# 2. ([3, 0], [3, 10])
# 3. ([6, 0], [1, 7])
#
# OOOOOOOOOO
# OOOOOOOOOO
# OOOXXXXXXX
# XXXXXXXXXX
# XXXXXXXXXX
# XXXXXXXXXX
# XXXXXXXOOO
# OOOOOOOOOO

def __contiguous_slice_within_ndim_grid_as_blocks(
extent: list[int], x: list[int], y: list[int]
):
# Used for converting a block defined by inclusive lower and upper
# coordinate to a chunk as defined by openPMD.
# The openPMD extent is defined by (to-from)+1 (to make up for the
# inclusive upper end).
def get_extent(from_, to_):
res = [upper + 1 - lower for (lower, upper) in zip(from_, to_)]
if any(x < 1 for x in res):
raise RuntimeError(
f"Cannot compute block extent from {from_} to {to_}."
)
return res

if len(extent) != len(x):
raise RuntimeError(
f"__shuffle_openpmd: Internal indexing error extent={extent} and x={x} must have the same length. This is a bug."
)
if len(extent) != len(y):
raise RuntimeError(
f"__shuffle_openpmd: Internal indexing error extent={extent} and y={y} must have the same length. This is a bug."
)

# Recursive bottom cases
# 1. No items left
if y < x:
return []
# 2. No distinct dimensions left
elif x[0:-1] == y[0:-1]:
return [(x.copy(), get_extent(x, y))]

# Take the frontside and backside one-dimensional slices with extent
# defined by the last dimension, process the remainder recursively.

# The offset of the front slice is equal to x.
front_slice = (x.copy(), [1 for _ in range(len(x))])
# The chunk extent in the last dimension is the distance from the x
# coordinate to the grid extent in the last dimension.
# As the slice is one-dimensional, the extent is 1 in all other
# dimensions.
front_slice[1][-1] = extent[-1] - x[-1]

# Similar for the back slice.
# The offset of the back slice is 0 in the last dimension, equal to y
# in all other dimensions.
back_slice = (y.copy(), [1 for _ in range(len(y))])
back_slice[0][-1] = 0
# The extent is equal to the coordinate of y+1 (to convert inclusive to
# exclusive upper end) in the last dimension, 1 otherwise.
back_slice[1][-1] = y[-1] + 1

# Strip the last (i.e. fast-running) dimension for a recursive call with
# one dimensionality reduced.
recursive_x = x[0:-1]
recursive_y = y[0:-1]

# Since the first and last row are dealt with above, those rows are now
# stripped from the second-to-last dimension for the recursive call
# (in which they will be the last dimension)
recursive_x[-1] += 1
recursive_y[-1] -= 1

rec_res = DataShuffler.__contiguous_slice_within_ndim_grid_as_blocks(
extent[0:-1], recursive_x, recursive_y
)

# Add the last dimension again. The last dimension is covered from start
# to end since the leftovers have been dealt with by the front and back
# slice.
rec_res = [(xx + [0], yy + [extent[-1]]) for (xx, yy) in rec_res]

return [front_slice] + rec_res + [back_slice]

# Interpreting `ndim_extent` as the extents of an n-dimensional grid,
# returns the n-dimensional coordinate of the `idx`th item in the row-major
# representation of the grid.
def __resolve_flattened_index_into_ndim(idx: int, ndim_extent: list[int]):
if not ndim_extent:
raise RuntimeError("Cannot index into a zero-dimensional array.")
strides = []
current_stride = 1
for ext in reversed(ndim_extent):
strides = [current_stride] + strides
# sic!, the last stride is ignored, as it's just the entire grid
# extent
current_stride *= ext

def worker(inner_idx, inner_strides):
if not inner_strides:
if inner_idx != 0:
raise RuntimeError(
"This cannot happen. There is bug somewhere.")
else:
return []
div, mod = divmod(inner_idx, inner_strides[0])
return [div] + worker(mod, inner_strides[1:])

return worker(idx, strides)

# Load a chunk from the openPMD `record` into the buffer at `arr`.
# The indexes `offset` and `extent` are scalar 1-dimensional coordinates,
# and apply to the n-dimensional record by reinterpreting (reshaped) it as
# a one-dimensional array.
# The function deals internally with splitting the 1-dimensional slice into
# a sequence of n-dimensional block load operations.
def __load_chunk_1D(mesh, arr, offset, extent):
start_idx = DataShuffler.__resolve_flattened_index_into_ndim(
offset, mesh.shape
)
# Inclusive upper end. See the documentation in
# __contiguous_slice_within_ndim_grid_as_blocks() for the reason why
# we work with inclusive upper ends.
end_idx = DataShuffler.__resolve_flattened_index_into_ndim(
offset + extent - 1, mesh.shape
)
blocks_to_load = (
DataShuffler.__contiguous_slice_within_ndim_grid_as_blocks(
mesh.shape, start_idx, end_idx
)
)
# print(f"\n\nLOADING {offset}\t+{extent}\tFROM {mesh.shape}")
current_offset = 0 # offset within arr
for nd_offset, nd_extent in blocks_to_load:
flat_extent = np.prod(nd_extent)
# print(
# f"\t{nd_offset}\t-{nd_extent}\t->[{current_offset}:{current_offset + flat_extent}]"
# )
mesh.load_chunk(
arr[current_offset : current_offset + flat_extent],
nd_offset,
nd_extent,
)
current_offset += flat_extent

def __shuffle_openpmd(
self,
dot: __DescriptorOrTarget,
Expand Down Expand Up @@ -369,23 +550,12 @@ def __shuffle_openpmd(
# This gets the offset and extent of the i'th such slice.
# The extent is given as in openPMD, i.e. the size of the block
# (not its upper coordinate).
def from_chunk_i(i, n, dset, slice_dimension=0):
def from_chunk_i(i, n, dset):
if isinstance(dset, io.Dataset):
dset = dset.extent
dset = list(dset)
offset = [0 for _ in dset]
extent = dset
extent_dim_0 = dset[slice_dimension]
if extent_dim_0 % n != 0:
raise Exception(
"Dataset {} cannot be split into {} chunks on dimension {}.".format(
dset, n, slice_dimension
)
)
single_chunk_len = extent_dim_0 // n
offset[slice_dimension] = i * single_chunk_len
extent[slice_dimension] = single_chunk_len
return offset, extent
flat_extent = np.prod(dset)
one_slice_extent = flat_extent // n
return i * one_slice_extent, one_slice_extent

import json

Expand Down Expand Up @@ -446,9 +616,10 @@ def from_chunk_i(i, n, dset, slice_dimension=0):
i, number_of_new_snapshots, extent_in
)
to_chunk_offset = to_chunk_extent
to_chunk_extent = to_chunk_offset + np.prod(from_chunk_extent)
to_chunk_extent = to_chunk_offset + from_chunk_extent
for dimension in range(len(mesh_in)):
mesh_in[str(dimension)].load_chunk(
DataShuffler.__load_chunk_1D(
mesh_in[str(dimension)],
new_array[dimension, to_chunk_offset:to_chunk_extent],
from_chunk_offset,
from_chunk_extent,
Expand Down Expand Up @@ -552,24 +723,6 @@ def shuffle_snapshots(
if number_of_shuffled_snapshots is None:
number_of_shuffled_snapshots = self.nr_snapshots

# Currently, the openPMD interface is not feature-complete.
if snapshot_type == "openpmd" and np.any(
np.array(
[
snapshot.grid_dimension[0] % number_of_shuffled_snapshots
for snapshot in self.parameters.snapshot_directories_list
]
)
!= 0
):
raise ValueError(
"Shuffling from OpenPMD files currently only "
"supported if first dimension of all snapshots "
"can evenly be divided by number of snapshots. "
"Please select a different number of shuffled "
"snapshots or use the numpy interface. "
)

shuffled_gridsizes = snapshot_size_list // number_of_shuffled_snapshots

if np.any(
Expand Down
38 changes: 33 additions & 5 deletions test/shuffling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,9 @@ def test_training_openpmd(self):
new_loss = test_trainer.final_validation_loss
assert old_loss > new_loss

def test_arbitrary_number_snapshots(self):
def worker_arbitrary_number_snapshots(
self, ext_in, ext_out, snapshot_type
):
parameters = mala.Parameters()

# This ensures reproducibility of the created data sets.
Expand All @@ -337,20 +339,46 @@ def test_arbitrary_number_snapshots(self):

for i in range(5):
data_shuffler.add_snapshot(
"Be_snapshot0.in.npy",
f"Be_snapshot0.in.{ext_in}",
data_path,
"Be_snapshot0.out.npy",
f"Be_snapshot0.out.{ext_in}",
data_path,
snapshot_type=snapshot_type,
)
data_shuffler.shuffle_snapshots(
complete_save_path=".",
save_name="Be_shuffled*",
save_name=f"Be_shuffled*{ext_out}",
number_of_shuffled_snapshots=5,
)
for i in range(4):

def test_arbitrary_number_snapshots(self):
self.worker_arbitrary_number_snapshots("npy", "", "numpy")
for i in range(5):
bispectrum = np.load("Be_shuffled" + str(i) + ".in.npy")
ldos = np.load("Be_shuffled" + str(i) + ".out.npy")
assert not np.any(np.where(np.all(ldos == 0, axis=-1).squeeze()))
assert not np.any(
np.where(np.all(bispectrum == 0, axis=-1).squeeze())
)
self.worker_arbitrary_number_snapshots("h5", ".h5", "openpmd")
import openpmd_api as opmd

bispectrum_series = opmd.Series(
"Be_shuffled%T.in.h5", opmd.Access.read_only
)
ldos_series = opmd.Series(
"Be_shuffled%T.out.h5", opmd.Access.read_only
)
for i in range(5):
for name, series in [("Bispectrum", bispectrum_series), ("LDOS", ldos_series)]:
loaded_array = [
component.load_chunk().squeeze()
for _, component in series.iterations[i]
.meshes[name]
.items()
]
series.flush()
loaded_array = np.array(loaded_array)
assert not np.any(
np.where(np.all(loaded_array == 0, axis=0).squeeze())
)
Loading