diff --git a/dascore/data_registry.txt b/dascore/data_registry.txt
index 62971e2e..9f54edff 100644
--- a/dascore/data_registry.txt
+++ b/dascore/data_registry.txt
@@ -19,3 +19,4 @@ PoroTomo_iDAS_1.h5 967a2885e79937ac0426b2022a9c03d5f24790ecf3abbaa9a16eb28055566
 DASDMSShot00_20230328155653619.das 12ac53f78b32d8b0e32cc674c43ff5b4c79a6c8b19de2ad577fd481679b2b7b3 https://github.com/dasdae/test_data/raw/master/das/DASDMSShot00_20230328155653619.das
 opto_das_1.hdf5 0437d1f02d93c9f00d31133388efaf6a28c21883bcfac457b97f1224464c7dca https://github.com/dasdae/test_data/raw/master/das/opto_das_1.hdf5
 whale_1.hdf5 a09922969e740307bf26dc6ffa7fb9fbb834dc7cd7d4ced02c66b159fb1ce0cd http://piweb.ooirsn.uw.edu/das/data/Optasense/NorthCable/TransmitFiber/North-C1-LR-P1kHz-GL50m-Sp2m-FS200Hz_2021-11-03T15_06_51-0700/North-C1-LR-P1kHz-GL50m-Sp2m-FS200Hz_2021-11-04T020002Z.h5
+febus_1.h5 73eba2b6e183b3bca51f8a1448c3b423c979d05ce6c18bfd7fb76b4f9bda5c0b https://github.com/dasdae/test_data/raw/master/das/febus_1.h5
diff --git a/dascore/io/febus/__init__.py b/dascore/io/febus/__init__.py
new file mode 100644
index 00000000..d546bb9e
--- /dev/null
+++ b/dascore/io/febus/__init__.py
@@ -0,0 +1,9 @@
+"""
+Support for the Febus format.
+
+This is used by the Febus DAS interrogator.
+
+More info about Febus can be found here: https://www.febus-optics.com/en/
+"""
+from __future__ import annotations
+from .core import Febus2
diff --git a/dascore/io/febus/core.py b/dascore/io/febus/core.py
new file mode 100644
index 00000000..ea6936be
--- /dev/null
+++ b/dascore/io/febus/core.py
@@ -0,0 +1,108 @@
+"""
+IO module for reading Febus data.
+"""
+from __future__ import annotations
+
+import numpy as np
+
+import dascore as dc
+from dascore.constants import opt_timeable_types
+from dascore.io import FiberIO
+from dascore.utils.hdf5 import H5Reader
+from dascore.utils.models import UTF8Str
+
+from .utils import (
+    _get_febus_version_str,
+    _read_febus,
+    _yield_attrs_coords,
+)
+
+
+class FebusPatchAttrs(dc.PatchAttrs):
+    """
+    Patch attrs for febus.
+
+    Attributes
+    ----------
+    source
+        The source designation.
+    zone
+        The zone designation.
+    """
+
+    gauge_length: float = np.NaN
+    gauge_length_units: str = "m"
+    pulse_width: float = np.NaN
+    pulse_width_units: str = "m"
+
+    group: str = ""
+    source: str = ""
+    zone: str = ""
+
+    folog_a1_software_version: UTF8Str = ""
+
+
+class Febus2(FiberIO):
+    """Support for Febus V2.
+
+    This should cover all versions 2.* of the format (maybe).
+    """
+
+    name = "febus"
+    preferred_extensions = ("hdf5", "h5")
+    version = "2"
+
+    def get_format(self, resource: H5Reader) -> tuple[str, str] | bool:
+        """
+        Return the format name and version if the file contains febus data.
+
+        Parameters
+        ----------
+        resource
+            A path to the file which may contain febus data.
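+
+        Returns
+        -------
+        A (name, version) tuple if the resource is a febus file, otherwise
+        an implicit None (falsy) so other format parsers can be tried.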
+ """ + version_str = _get_febus_version_str(resource) + if version_str: + return self.name, version_str + + def scan(self, resource: H5Reader) -> list[dc.PatchAttrs]: + """Scan a febus file, return summary information about the file's contents.""" + out = [] + file_version = _get_febus_version_str(resource) + extras = { + "path": resource.filename, + "file_format": self.name, + "file_version": str(file_version), + } + for attr, cm, _ in _yield_attrs_coords(resource): + attr["coords"] = cm.to_summary_dict() + attr.update(dict(extras)) + out.append(FebusPatchAttrs(**attr)) + return out + + def read( + self, + resource: H5Reader, + time: tuple[opt_timeable_types, opt_timeable_types] | None = None, + distance: tuple[float | None, float | None] | None = None, + **kwargs, + ) -> dc.BaseSpool: + """Read a febus spool of patches.""" + patches = _read_febus( + resource, time=time, distance=distance, attr_cls=FebusPatchAttrs + ) + return dc.spool(patches) + + +class Febus1(Febus2): + """Support for Febus V 1. + + This is here to support legacy febus (eg pubdas Valencia) + """ + + version = "1" diff --git a/dascore/io/febus/utils.py b/dascore/io/febus/utils.py new file mode 100644 index 00000000..2059ac95 --- /dev/null +++ b/dascore/io/febus/utils.py @@ -0,0 +1,252 @@ +"""Utilities for Febus.""" +from __future__ import annotations + +from collections import namedtuple + +import numpy as np + +import dascore as dc +from dascore.core import get_coord, get_coord_manager +from dascore.core.coordmanager import CoordManager +from dascore.utils.misc import ( + _maybe_unpack, + broadcast_for_index, + maybe_get_items, + unbyte, +) + +# --- Getting format/version + +FebusSlice = namedtuple( + "FebusSlice", + ["group", "group_name", "source", "source_name", "zone", "zone_name", "data_name"], +) + + +def _flatten_febus_info(fi) -> tuple[FebusSlice, ...]: + """ + Given a febus file, return a tuple of named tuples with key info. + + This flattens the iteration nesting to a single layer. + """ + out = [] + for group_name, group in fi.items(): + for source_name, source in group.items(): + for zone_name, zone in source.items(): + # Skip time dataset (we only want zone groups). + if zone_name == "time": + continue + # get dataset name (not always StrainRate for older data) + possible_ds_names = list(zone.keys()) + assert len(possible_ds_names) == 1 + data_name = possible_ds_names[0] + zlice = FebusSlice( + group, group_name, source, source_name, zone, zone_name, data_name + ) + out.append(zlice) + return tuple(out) + + +def _get_febus_version_str(hdf_fi) -> str: + """Return the version string for febus file.""" + # Define a few root attrs that act as a "fingerprint" + # all Febus DAS files have folders that start with fa (I hope). + inst_keys = sorted(hdf_fi.keys()) + expected_source_attrs = { + "AmpliPower", + "Hostname", + "WholeExtent", + "SamplingRate", + } + # iterate instrument keys + is_febus = all([x.startswith("fa") for x in inst_keys]) + # Version 1, or what I think is version one (eg Valencia PubDAS data) + # did not include a Version attr in Source dataset, so we use that as + # the default. + version = "1" + for inst_key in inst_keys: + inst = hdf_fi[inst_key] + source_keys = set(inst.keys()) + is_febus = is_febus and all(x.startswith("Source") for x in source_keys) + for source_key in source_keys: + source = inst[source_key] + # If the version is set in a Source use that version. + # Hopefully this is the file version... 
+            version = unbyte(source.attrs.get("Version", version)).split(".")[0]
+            is_febus = is_febus and expected_source_attrs.issubset(set(source.attrs))
+    if is_febus:
+        return version
+    return ""
+
+
+def _get_febus_attrs(feb: FebusSlice) -> dict:
+    """Get non-coordinate attrs from febus slice."""
+    zone_attrs = feb.zone.attrs
+    attr_mapping = {
+        "GaugeLength": "gauge_length",
+        "PulseWidth": "pulse_width",
+        "Version": "folog_a1_software_version",
+    }
+    out = maybe_get_items(zone_attrs, attr_mapping, unpack_names=set(attr_mapping))
+    out["group"] = feb.group_name
+    out["source"] = feb.source_name
+    out["zone"] = feb.zone_name
+    out["schema_version"] = out.get("folog_a1_software_version", "").split(".")[0]
+    out["dims"] = ("time", "distance")
+    return out
+
+
+def _get_time_coord(feb):
+    """Get the time coordinate contained in the febus slice."""
+    time = feb.source["time"]
+    # In older versions the time shape is different; always grab the first element.
+    first_slice = tuple(0 for _ in time.shape)
+    t_0 = time[first_slice]
+    # Data dimensions are block_index, time, distance.
+    data_shape = feb.zone[feb.data_name].shape
+    n_blocks = data_shape[0]
+    # Since the data have overlaps in each block's time dimension, we need to
+    # trim the overlap off the time dimension to avoid having to merge blocks later.
+    overlap_percentage = _maybe_unpack(feb.zone.attrs.get("BlockOverlap", 0))
+    rows_to_remove = int(np.round(data_shape[1] * overlap_percentage / 100))
+    total_time_rows = (data_shape[1] - 2 * rows_to_remove) * n_blocks
+    # Get spacing between time samples (in s).
+    time_step = feb.zone.attrs["Spacing"][1] / 1_000  # value in ms, convert to s.
+    # Get origin info; these are offsets from t_0 for the start of the block.
+    time_origin = feb.zone.attrs["Origin"][1] / 1_000  # also convert to s
+    # Get the start/stop indices for the zone.
+    extent = feb.zone.attrs["Extent"]
+    time_ids = (extent[2], extent[3])
+    # Create time coord.
+    # Need to account for removing overlap times.
+    total_start = time_origin - rows_to_remove * time_step + time_ids[0] * time_step
+    total_end = total_start + total_time_rows * time_step
+    time_coord = get_coord(
+        start=dc.to_datetime64(t_0 + total_start),
+        stop=dc.to_datetime64(t_0 + total_end),
+        step=dc.to_timedelta64(time_step),
+    )
+    return time_coord.change_length(total_time_rows)
+
+
+def _get_distance_coord(feb):
+    """Get the distance coordinate associated with febus slice."""
+    data_shape = feb.zone[feb.data_name].shape
+    total_distance_inds = data_shape[2]
+    # Get spacing between channels (in m).
+    distance_step = feb.zone.attrs["Spacing"][0]
+    # Get origin info; these are absolute for distance.
+    distance_origin = feb.zone.attrs["Origin"][0]
+    # Get the start/stop indices for the zone.
+    extent = feb.zone.attrs["Extent"]
+    dist_ids = (extent[0], extent[1])
+    # Create distance coord.
+    # Unlike time, distance has no block overlap to trim.
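+    # Absolute start (m) = zone start index * channel spacing + origin offset.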
+    start = dist_ids[0] * distance_step + distance_origin
+    stop = start + total_distance_inds * distance_step
+    dist_coord = get_coord(
+        start=start,
+        stop=stop,
+        step=distance_step,
+        units="m",
+    )
+    return dist_coord.change_length(total_distance_inds)
+
+
+def _get_febus_coord_manager(feb: FebusSlice) -> CoordManager:
+    """Get a coordinate manager for febus slice."""
+    coords = dict(
+        time=_get_time_coord(feb),
+        distance=_get_distance_coord(feb),
+    )
+    cm = get_coord_manager(coords=coords, dims=("time", "distance"))
+    return cm
+
+
+def _yield_attrs_coords(fi) -> tuple[dict, CoordManager, FebusSlice]:
+    """Scan a febus file, yielding metadata about each zone."""
+    febuses = _flatten_febus_info(fi)
+    for febus in febuses:
+        attr = _get_febus_attrs(febus)
+        cm = _get_febus_coord_manager(febus)
+        yield attr, cm, febus
+
+
+def _get_data_new_cm(cm, febus, distance=None, time=None):
+    """
+    Get the data from a febus file, maybe filtering on time/distance.
+
+    This is a bit more complicated since the febus data are stored in a 3d array,
+    but we want a 2d output.
+    """
+
+    def _get_start_end_time_array(time_coord, total_time_rows, data_shape, time):
+        """Get a 2d array where columns are start/end times for each block."""
+        block_count = data_shape[0]
+        block_duration = total_time_rows * time_coord.step
+        start = (
+            np.arange(block_count) * block_duration + time_coord.step
+        ) + time_coord.min()
+        end = start + block_duration
+        return np.stack([start, end], axis=-1)
+
+    def _get_time_filtered_data(data, t_start_end, time, total_slice, time_coord):
+        """Get new data array filtered from time query."""
+        assert len(time) == 2
+        t1, t2 = time
+        # Blocks for which all data are needed.
+        in_time = np.ones(len(t_start_end), dtype=bool)
+        if t1 is not None and t1 is not ...:
+            in_time = np.logical_and(in_time, ~(t_start_end[:, 1] < t1))
+        if t2 is not None and t2 is not ...:
+            in_time = np.logical_and(in_time, ~(t_start_end[:, 0] > t2))
+        times = t_start_end[in_time]
+        tmin = times[:, 0].min()
+        tmax = times[:, 1].max()
+        # Get start/stop indexes for complete blocks.
+        start = np.argmax(in_time)
+        stop = np.argmax(np.cumsum(in_time))
+        total_slice[0] = slice(start, stop)
+        # Load data from disk.
+        data_2d = data[tuple(total_slice)].reshape(-1, data.shape[-1])
+        # Next, get new time coord and slice.
+        new_coord, time_slice = (
+            get_coord(min=tmin, max=tmax, step=time_coord.step)
+            .change_length(len(data_2d))
+            .select((t1, t2))
+        )
+        return data_2d[time_slice], new_coord
+
+    dist_coord, time_coord = cm.coord_map["distance"], cm.coord_map["time"]
+    data = febus.zone[febus.data_name]
+    data_shape = data.shape
+    overlap_percentage = _maybe_unpack(febus.zone.attrs.get("BlockOverlap", 0))
+    skip_rows = int(np.round(overlap_percentage / 100 * data_shape[1]))
+    # Need to handle the case where skip_rows == 0.
+    data_slice = slice(skip_rows, -skip_rows if skip_rows else None)
+    total_slice = list(broadcast_for_index(3, 1, data_slice))
+    total_time_rows = data_shape[1] - 2 * skip_rows
+    if distance:
+        dist_coord, total_slice[2] = dist_coord.select(distance)
+    if time:  # need to sub-select blocks to get the data we are after.
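+        # First get each block's absolute (start, end) time, then use it to
+        # keep only the blocks that overlap the requested window.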
+        t_start_end = _get_start_end_time_array(
+            time_coord, total_time_rows, data_shape, time
+        )
+        data, time_coord = _get_time_filtered_data(
+            data, t_start_end, time, total_slice, time_coord
+        )
+    else:  # no need to mess with blocks, all time is selected
+        data_3d = data[tuple(total_slice)]
+        data = data_3d.reshape(-1, data_3d.shape[2])
+    cm = get_coord_manager({"time": time_coord, "distance": dist_coord}, dims=cm.dims)
+    return data, cm
+
+
+def _read_febus(fi, distance=None, time=None, attr_cls=dc.PatchAttrs):
+    """Read the febus values into patches, one per zone."""
+    out = []
+    for attr, cm, febus in _yield_attrs_coords(fi):
+        data, new_cm = _get_data_new_cm(cm, febus, distance=distance, time=time)
+        patch = dc.Patch(data=data, coords=new_cm, attrs=attr_cls(**attr))
+        out.append(patch)
+    return out
diff --git a/dascore/io/optodas/utils.py b/dascore/io/optodas/utils.py
index bea41479..7dc53861 100644
--- a/dascore/io/optodas/utils.py
+++ b/dascore/io/optodas/utils.py
@@ -1,4 +1,4 @@
-"""Utilities for terra15."""
+"""Utilities for OptoDAS."""
 from __future__ import annotations
 
 import dascore as dc
diff --git a/dascore/utils/misc.py b/dascore/utils/misc.py
index 94c905fd..812848c7 100644
--- a/dascore/utils/misc.py
+++ b/dascore/utils/misc.py
@@ -389,16 +389,49 @@ def maybe_get_attrs(obj, attr_map: Mapping):
     return out
 
 
-def maybe_get_items(obj, attr_map: Mapping):
-    """Maybe get items from a mapping (if they exist)."""
+def maybe_get_items(
+    obj, attr_map: Mapping[str, str], unpack_names: None | set[str] = None
+):
+    """
+    Maybe get items from a mapping (if they exist).
+
+    Parameters
+    ----------
+    obj
+        Any map-like object.
+    attr_map
+        A mapping of {current_name: output_name}.
+    unpack_names
+        A set of names which should be unpacked (i.e. collapse size-1 arrays).
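+
+    Examples
+    --------
+    >>> # Illustrative only; the attr values here are assumed, not real:
+    >>> # maybe_get_items(
+    >>> #     {"GaugeLength": np.array([10.0])},  # source mapping
+    >>> #     {"GaugeLength": "gauge_length"},  # rename map
+    >>> #     unpack_names={"GaugeLength"},  # collapse the size-1 array
+    >>> # )
+    >>> # -> {"gauge_length": 10.0}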
+ """ + unpack_names = set() if unpack_names is None else unpack_names out = {} for old_name, new_name in attr_map.items(): if not (value := obj.get(old_name, None)): continue - out[new_name] = unbyte(value) + val = unbyte(value) + out[new_name] = _maybe_unpack(val) if old_name in unpack_names else val return out +def _maybe_unpack(maybe_array): + """Unpack an array like object if it is size one, else return input.""" + size = getattr(maybe_array, "size", 0) + if size == 1: + maybe_array = maybe_array[0] + return maybe_array + + @cache def _get_compiled_suffix_prefix_regex( suffixes: str | tuple[str], diff --git a/pyproject.toml b/pyproject.toml index 3c55f8b1..43c1e094 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -103,6 +103,8 @@ dev = ["dascore[test]", "dascore[docs]", "dascore[profile]", "dascore[extras]"] DASDAE__V1 = "dascore.io.dasdae.core:DASDAEV1" DASHDF5__V1 = "dascore.io.dashdf5.core:DASHDF5" H5SIMPLE__V1_0 = "dascore.io.h5simple.core:H5Simple" +FEBUS__V1 = "dascore.io.febus.core:Febus1" +FEBUS__V2 = "dascore.io.febus.core:Febus2" OPTODAS__V8 = "dascore.io.optodas.core:OptoDASV8" PICKLE = "dascore.io.pickle.core:PickleIO" PRODML__V2_0 = "dascore.io.prodml.core:ProdMLV2_0" diff --git a/tests/test_core/test_coordmanager.py b/tests/test_core/test_coordmanager.py index 8e584ed6..fd66f3b5 100644 --- a/tests/test_core/test_coordmanager.py +++ b/tests/test_core/test_coordmanager.py @@ -299,12 +299,12 @@ def test_iterate(self, basic_coord_manager): def test_coord_size(self, random_patch): """Ensure we can get size of the coordinate.""" - expected = len(random_patch.coords["time"]) + expected = len(random_patch.get_coord("time")) assert random_patch.coords.coord_size("time") == expected def test_coord_range(self, random_patch): """Ensure we can get a scaler value for the coordinate.""" - coord_array = random_patch.coords["time"] + coord_array = random_patch.get_coord("time").data expected = ( np.max(coord_array) - np.min(coord_array) + random_patch.attrs["time_step"] ) diff --git a/tests/test_core/test_patch.py b/tests/test_core/test_patch.py index af68ab61..aa22ef54 100644 --- a/tests/test_core/test_patch.py +++ b/tests/test_core/test_patch.py @@ -572,7 +572,7 @@ def test_seconds(self, random_patch_with_lat): def test_channel_count(self, random_patch_with_lat): """Ensure we can get number of channels in the patch.""" - expected = len(random_patch_with_lat.coords["distance"]) + expected = len(random_patch_with_lat.get_coord("distance")) assert random_patch_with_lat.channel_count == expected diff --git a/tests/test_io/test_common_io.py b/tests/test_io/test_common_io.py index b27295e4..392a0efb 100644 --- a/tests/test_io/test_common_io.py +++ b/tests/test_io/test_common_io.py @@ -23,6 +23,7 @@ from dascore.io import BinaryReader from dascore.io.dasdae import DASDAEV1 from dascore.io.dashdf5 import DASHDF5 +from dascore.io.febus import Febus2 from dascore.io.h5simple import H5Simple from dascore.io.optodas import OptoDASV8 from dascore.io.pickle import PickleIO @@ -48,6 +49,7 @@ # See the docs on adding a new IO format, in the contributing section, # for more details. COMMON_IO_READ_TESTS = { + Febus2(): ("febus_1.h5",), OptoDASV8(): ("opto_das_1.hdf5",), DASDAEV1(): ("example_dasdae_event_1.h5",), H5Simple(): ("h5_simple_2.h5", "h5_simple_1.h5"),