-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
90c7081
commit 2251a94
Showing
9 changed files
with
246 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
""" | ||
Support for generic H5 files. | ||
This attempts to use some common names to read in some very simple h5 files. | ||
The FORESEE data in PubDAS motivated this module. | ||
""" | ||
from __future__ import annotations | ||
|
||
from .core import H5Simple |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
"""IO module for reading simple h5 data.""" | ||
from __future__ import annotations | ||
|
||
import dascore as dc | ||
from dascore.constants import SpoolType | ||
from dascore.io import FiberIO, HDF5Reader | ||
|
||
from .utils import _get_attrs_coords_and_data, _is_h5simple, _maybe_trim_data | ||
|
||
|
||
class H5Simple(FiberIO):
    """FiberIO plugin for bare-bones h5 files."""

    name = "H5Simple"
    preferred_extensions = ("hdf5", "h5")
    version = "1"

    def get_format(self, resource: HDF5Reader) -> tuple[str, str] | bool:
        """
        Return (name, version) if the resource is a simple h5 file.

        Returns False when the resource is not recognised.
        """
        if not _is_h5simple(resource):
            return False
        return self.name, self.version

    def read(self, resource: HDF5Reader, snap=True, **kwargs) -> SpoolType:
        """
        Read a simple h5 file into a spool holding a single patch.

        Parameters
        ----------
        resource
            The open h5 object.
        snap
            If True, snap each coordinate to be evenly sampled.
        **kwargs
            Passed to filtering coordinates.
        """
        attrs, coords, data_node = _get_attrs_coords_and_data(resource, snap, self)
        trimmed_coords, trimmed_data = _maybe_trim_data(coords, data_node, kwargs)
        # Slicing with [:] is applied to whatever the trimming step returned
        # before it is handed to the patch.
        patch = dc.Patch(coords=trimmed_coords, data=trimmed_data[:], attrs=attrs)
        return dc.spool([patch])

    def scan(self, resource: HDF5Reader, snap=True) -> list[dc.PatchAttrs]:
        """
        Return the attributes of a h5simple file as a one-element list.

        The coordinate summary and the file path of the resource are added to
        the attributes; the data node itself is not used.
        """
        attrs, coords, _ = _get_attrs_coords_and_data(resource, snap, self)
        attrs.update(coords=coords.to_summary_dict(), path=resource.filename)
        return [dc.PatchAttrs(**attrs)]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
"""Utilities for the simple h5 (H5Simple) format.""" | ||
from __future__ import annotations | ||
|
||
import numpy as np | ||
|
||
import dascore as dc | ||
from dascore.core import get_coord | ||
|
||
# --- Getting format/version | ||
|
||
# Root-level array names accepted as the data array.
DATA_ARRAY_NAMES = frozenset({"raw", "data"})
# Root-level array names accepted as the time coordinate.
TIME_ARRAY_NAMES = frozenset({"timestamp", "time", "timestamps"})
# Names of other coordinate arrays (not referenced by the helpers below).
OTHER_COORD_ARRAY_NAMES = frozenset({"channels", "distance"})

# Root attribute names that may declare the file's format.
FILE_FORMAT_ATTR_NAMES = frozenset({"__format__", "file_format", "format"})
# Root attributes excluded when collecting patch attrs
# (presumably PyTables bookkeeping attributes -- confirm).
DEFAULT_ATTRS = frozenset({"CLASS", "PYTABLES_FORMAT_VERSION", "TITLE", "VERSION"})
|
||
|
||
def _maybe_trim_data(cm, data, kwargs):
    """
    Trim the coordinate manager and data array using selection kwargs.

    The kwargs dict is forwarded as keyword arguments to ``cm.select`` along
    with the data array; the resulting (coords, data) pair is returned.
    """
    trimmed_cm, trimmed_data = cm.select(**kwargs, array=data)
    return trimmed_cm, trimmed_data
|
||
|
||
def _get_attrs_coords_and_data(h5, snap, fiber_io):
    """
    Return attrs, coordinate manager, and data node.

    Root attributes other than those in DEFAULT_ATTRS are collected into a
    dict, which is then stamped with the format name/version of ``fiber_io``
    and the dims of the coordinate manager.
    """
    root_attrs = h5.root._v_attrs
    attr_dict = {}
    for attr_name in set(root_attrs._v_attrnames) - DEFAULT_ATTRS:
        attr_dict[attr_name] = getattr(root_attrs, attr_name)
    attr_dict.update(file_version=fiber_io.version, file_format=fiber_io.name)
    # A "dims" root attribute, when present, decides the dimension order.
    requested_dims = attr_dict.get("dims")
    cm, data = _get_cm_and_data(h5, snap, dims=requested_dims)
    attr_dict["dims"] = cm.dims
    return attr_dict, cm, data
|
||
|
||
def _get_coord(v, snap, name):
    """
    Get the coord values from a node.

    Parameters
    ----------
    v
        The coordinate node; it is indexed with ``[0]``, ``[-1]`` and ``[:]``
        and must support ``len``.
    snap
        If True, build an evenly sampled coordinate running from the first
        to the last value of the node. Otherwise use the node's values as-is.
    name
        The coordinate name. Values of the "time" coordinate are converted
        with ``dc.to_datetime64``.
    """
    is_time = name == "time"
    # Snapping derives the step from (last - first) / (len - 1), which is a
    # division by zero for a single sample; with fewer than two samples use
    # the explicit values instead.
    if snap and len(v) > 1:
        start, stop = v[0], v[-1]
        if is_time:
            start, stop = dc.to_datetime64(start), dc.to_datetime64(stop)
        step = (stop - start) / (len(v) - 1)
        # The max passed is one step past the last value.
        coord = get_coord(min=start, max=stop + step, step=step)
        assert len(coord) == len(v)
    else:
        values = dc.to_datetime64(v[:]) if is_time else v[:]
        coord = get_coord(values=values)
    return coord
|
||
|
||
def _fill_coords(coord_shape_dict, other_nodes, data_node):
    """
    Add a "channel" coordinate for the one data dimension lacking a coord.

    This is needed because the foresee data on pubdas only specify time;
    we have to fill in channel number. Both input dicts are updated in place
    and returned as (other_nodes, coord_shape_dict).
    """
    # Lengths of data dimensions that no known coordinate accounts for.
    unmatched = {size for size in data_node.shape if size not in coord_shape_dict}
    assert len(unmatched) == 1, "can only fill one missing coord."
    (size,) = unmatched
    coord_shape_dict[size] = "channel"
    other_nodes["channel"] = np.arange(size)
    return other_nodes, coord_shape_dict
|
||
|
||
def _get_coords_and_dims(data_node, time_node, other_nodes, snap=True, dims=None):
    """
    Get dims tuple and coord dict.

    When ``dims`` is given it is used directly (a string is split on commas).
    Otherwise dims are inferred by matching the length of each 1D coordinate
    node against the data node's shape, which requires the shape values to be
    unique. ``other_nodes`` is updated in place with the time node.
    """
    if dims:
        if isinstance(dims, str):
            dims = dims.split(",")
    else:  # ascertain dims from shape
        data_shape = data_node.shape
        can_guess_shape = len(data_shape) == len(set(data_shape))
        assert can_guess_shape, "Cant determine dims; shape values not unique!"
        assert len(time_node.shape) == 1, "time node has more than one dimension!"
        # Map {length: coord_name} for every 1D coordinate node.
        coord_shape_dict = {}
        for coord_name, node in other_nodes.items():
            if len(node.shape) == 1:
                coord_shape_dict[len(node)] = coord_name
        coord_shape_dict[len(time_node)] = "time"
        # Some data dimension has no coordinate; it needs to be filled.
        if len(coord_shape_dict) != len(data_shape):
            other_nodes, coord_shape_dict = _fill_coords(
                coord_shape_dict,
                other_nodes,
                data_node,
            )
        dims = tuple(coord_shape_dict[size] for size in data_shape)
    other_nodes["time"] = time_node
    coords = {}
    for coord_name, node in other_nodes.items():
        coords[coord_name] = _get_coord(node, snap=snap, name=coord_name)
    return dims, coords
|
||
|
||
def _get_cm_and_data(h5, snap=False, dims=None):
    """
    Extract coordinate manager and data node.

    Root-level nodes that have a ``shape`` attribute are split by name into
    the data node, the time node, and all remaining nodes, which are treated
    as other coordinates. Exactly one data node and one time node must exist.
    """
    array_names = set()
    for node in h5.list_nodes("/"):
        if hasattr(node, "shape"):
            array_names.add(node.name)
    data_node_name = array_names.intersection(DATA_ARRAY_NAMES)
    time_node_name = array_names.intersection(TIME_ARRAY_NAMES)
    other_node_names = array_names.difference(data_node_name, time_node_name)

    assert len(data_node_name) == 1, f"{h5} doesn't have exactly one data node."
    assert len(time_node_name) == 1, f"{h5} doesn't have exactly one time node"

    root = h5.root
    data_node = getattr(root, next(iter(data_node_name)))
    time_node = getattr(root, next(iter(time_node_name)))
    other_nodes = {}
    for node_name in other_node_names:
        other_nodes[node_name] = getattr(root, node_name)

    dims, coords = _get_coords_and_dims(data_node, time_node, other_nodes, snap, dims)
    cm = dc.core.get_coord_manager(coords, dims=dims)
    return cm, data_node
|
||
|
||
def _is_h5simple(h5):
    """
    Determine if open h5 file is simple H5.

    The file qualifies when it has the required arrays and either declares
    no format or declares the simple h5 format.
    """
    # Both checks are always run, matching the original evaluation order.
    arrays_present = _has_required_arrays(h5)
    format_ok = _no_format_or_simple_specified(h5)
    return bool(arrays_present and format_ok)
|
||
|
||
def _has_required_arrays(h5):
    """
    Determine if h5 file has required arrays to be h5 simple.

    Among root-level nodes that have a ``shape`` attribute, at least one
    must be named as a data array and at least one as a time array.
    """
    names = set()
    for node in h5.list_nodes("/"):
        if hasattr(node, "shape"):
            names.add(node.name)
    has_data = not names.isdisjoint(DATA_ARRAY_NAMES)
    has_time = not names.isdisjoint(TIME_ARRAY_NAMES)
    return has_data and has_time
|
||
|
||
def _no_format_or_simple_specified(h5):
    """
    Ensure no other format is specified, or that simpleH5 is.

    Returns True when none of the FILE_FORMAT_ATTR_NAMES attributes exist on
    the root node, or when the one found names the simple h5 format. The
    comparison ignores case so that "H5Simple" (the spelling this plugin
    reports as its format name) is accepted as well as "h5simple".
    """
    root_attrs = h5.root._v_attrs
    format_attr_names = set(root_attrs._v_attrnames) & FILE_FORMAT_ATTR_NAMES
    if not format_attr_names:
        # Nothing declared; do not rule the file out.
        return True
    declared = getattr(root_attrs, next(iter(format_attr_names)))
    # A bytes-valued attribute never compares equal to a str, so decode it.
    if isinstance(declared, bytes):
        declared = declared.decode("utf8", errors="replace")
    return str(declared).lower() == "h5simple"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
"""Tests for simple h5 format.""" | ||
from __future__ import annotations | ||
|
||
import shutil | ||
|
||
import pytest | ||
import tables | ||
|
||
import dascore as dc | ||
from dascore.utils.downloader import fetch | ||
|
||
|
||
class TestH5Simple:
    """Tests for h5simple that aren't covered in common tests."""

    @pytest.fixture(scope="class")
    def h5simple_path(self):
        """Return the path to a fetched h5 simple file."""
        path = fetch("h5_simple_1.h5")
        return path

    @pytest.fixture(scope="class")
    def h5simple_with_dim_attrs_path(self, tmp_path_factory):
        """Create a h5simple file which has dimensions specified in attrs."""
        source = fetch("h5_simple_2.h5")
        out_dir = tmp_path_factory.mktemp("h5simple_dim_attrs")
        destination = out_dir / "simple.h5"
        # Work on a copy so the fetched file is not modified.
        shutil.copy2(source, destination)
        with tables.open_file(destination, "a") as h5:
            h5.root._v_attrs["dims"] = "distance,time"
        return destination

    def test_no_snap(self, h5simple_path):
        """Ensure when snap is not used it still reads patch."""
        spool = dc.read(h5simple_path, file_format="h5simple", snap=False)
        assert isinstance(spool[0], dc.Patch)

    def test_dims_in_attrs(self, h5simple_with_dim_attrs_path):
        """Ensure if 'dims' is in attrs it gets used."""
        spool = dc.spool(h5simple_with_dim_attrs_path, file_format="h5simple")
        assert isinstance(spool[0], dc.Patch)