diff --git a/pyuvdata/hdf5_utils.py b/pyuvdata/hdf5_utils.py
new file mode 100644
index 0000000000..68fb7b83ba
--- /dev/null
+++ b/pyuvdata/hdf5_utils.py
@@ -0,0 +1,379 @@
+# -*- mode: python; coding: utf-8 -*-
+# Copyright (c) 2023 Radio Astronomy Software Group
+# Licensed under the 2-clause BSD License
+"""Utilities for working with HDF5 files."""
+from functools import cached_property
+from pathlib import Path
+from typing import Any
+
+import h5py
+import numpy as np
+
+from . import utils as uvutils
+
+hdf5plugin_present = True
+try:
+    import hdf5plugin  # noqa: F401
+except ImportError as error:
+    hdf5plugin_present = False
+    hdf5plugin_error = error
+
+
+def _check_complex_dtype(dtype):
+    """
+    Check that a specified custom datatype conforms to UVH5 standards.
+
+    According to the UVH5 spec, the data type for the data array must be a
+    compound datatype with an "r" field and an "i" field. Additionally, both
+    datatypes must be the same (e.g., "<i4", "<i4").
+
+    Parameters
+    ----------
+    dtype : numpy dtype
+        The numpy dtype to check.
+
+    Returns
+    -------
+    None
+    """
+    if not isinstance(dtype, np.dtype):
+        raise ValueError("dtype in a uvh5 file must be a numpy dtype")
+    if dtype.names is None or set(dtype.names) != set(("r", "i")):
+        raise ValueError("dtype must be a numpy dtype with an 'r' and 'i' field")
+    rkind = dtype["r"].kind + str(dtype["r"].itemsize)
+    ikind = dtype["i"].kind + str(dtype["i"].itemsize)
+    if rkind != ikind:
+        raise ValueError(
+            "dtype must have the same kind and itemsize for both 'r' and 'i' fields"
+        )
+    return
+
+
+def _read_complex_astype(dset, indices, dtype_out=np.complex64):
+    """
+    Read the given data set of a specified type to floating point complex data.
+
+    Parameters
+    ----------
+    dset : h5py dataset
+        A reference to an HDF5 dataset on disk.
+    indices : tuple
+        The indices to extract. Should be either lists of indices or numpy
+        slice objects.
+    dtype_out : str or numpy dtype
+        The datatype of the output array. One of (complex, np.complex64,
+        np.complex128). Default is np.complex64 (single-precision real and
+        imaginary floats).
+
+    Returns
+    -------
+    output_array : ndarray
+        The array referenced in the dataset cast to complex values.
+    """
+    if dtype_out not in (complex, np.complex64, np.complex128):
+        raise ValueError(
+            "output datatype must be one of (complex, np.complex64, np.complex128)"
+        )
+    dset_shape, indices = uvutils._get_dset_shape(dset, indices)
+    output_array = np.empty(dset_shape, dtype=dtype_out)
+    # dset is indexed in native dtype, but is upcast upon assignment
+    if dtype_out == np.complex64:
+        compound_dtype = [("r", "f4"), ("i", "f4")]
+    else:
+        compound_dtype = [("r", "f8"), ("i", "f8")]
+    with dset.astype(compound_dtype):
+        output_array.real = uvutils._index_dset(dset, indices)["r"]
+        output_array.imag = uvutils._index_dset(dset, indices)["i"]
+    return output_array
+
+
+def _write_complex_astype(data, dset, indices):
+    """
+    Write floating point complex data as a specified type.
+
+    Parameters
+    ----------
+    data : ndarray
+        The data array to write out. Should be a complex-valued array that
+        supports the .real and .imag attributes.
+    dset : h5py dataset
+        A reference to an HDF5 dataset on disk.
+    indices : tuple
+        The indices to write data to. Should be either lists of indices or
+        numpy slice objects.
+
+    Returns
+    -------
+    None
+    """
+    # get datatype from dataset; make doubly sure it is valid
+    dtype_out = dset.dtype
+    _check_complex_dtype(dtype_out)
+    # write the real and imaginary parts into the "r" and "i" fields
+    dset[(*indices, "r")] = data.real.astype(dtype_out["r"])
+    dset[(*indices, "i")] = data.imag.astype(dtype_out["i"])
+    return
+
+
+def _get_compression(compression):
+    """
+    Get the HDF5 compression and compression options to use.
+
+    Parameters
+    ----------
+    compression : str
+        HDF5 compression specification or "bitshuffle".
+
+    Returns
+    -------
+    compression_use : str
+        HDF5 compression specification.
+    compression_opts : tuple
+        HDF5 compression options.
+    """
+    if compression == "bitshuffle":
+        if not hdf5plugin_present:  # pragma: no cover
+            raise ImportError(
+                "The hdf5plugin package is not installed but is required to use "
+                "bitshuffle compression."
+            ) from hdf5plugin_error
+        compression_use = 32008
+        compression_opts = (0, 2)
+    else:
+        compression_use = compression
+        compression_opts = None
+    return compression_use, compression_opts
+
+
+class HDF5Meta:
+    """
+    A fast read-only interface to HDF5 file metadata that makes some assumptions.
+
+    This class is a thin wrapper over an HDF5 file that reads parts of the
+    metadata on demand. The file must have "Header" and "Data" groups at the
+    root level.
+
+    Parameters
+    ----------
+    path : str or Path or h5py.File or h5py.Group
+        The filepath to the file, or an open h5py File or Group object.
+    """
+
+    _defaults = {}
+    _string_attrs = frozenset()
+    _int_attrs = frozenset()
+    _float_attrs = frozenset()
+    _bool_attrs = frozenset()
+
+    def __init__(self, path: str | Path | h5py.File | h5py.Group):
+        self.__file = None
+
+        if isinstance(path, h5py.File):
+            self.path = Path(path.filename)
+            self.__file = path
+            self.__header = path["/Header"]
+            self.__datagrp = path["/Data"]
+        elif isinstance(path, h5py.Group):
+            self.path = Path(path.file.filename)
+            self.__file = path.file
+            self.__header = path
+            self.__datagrp = self.__file["/Data"]
+        elif isinstance(path, (str, Path)):
+            self.path = Path(path)
+
+    def is_open(self) -> bool:
+        """Whether the file is open."""
+        return bool(self.__file)
+
+    def __del__(self):
+        """Close the file when the object is deleted."""
+        if self.__file:
+            self.__file.close()
+
+    def __getstate__(self):
+        """Get the state of the object."""
+        return {
+            k: v
+            for k, v in self.__dict__.items()
+            if k
+            not in (
+                "_HDF5Meta__file",
+                "_HDF5Meta__header",
+                "_HDF5Meta__datagrp",
+                "header",
+                "datagrp",
+            )
+        }
+
+    def __setstate__(self, state):
+        """Set the state of the object."""
+        self.__dict__.update(state)
+        self.__file = None
+
+    def __eq__(self, other):
+        """Check equality of two HDF5Meta objects."""
+        if not isinstance(other, self.__class__):
+            return False
+
+        return self.path == other.path
+
+    def __hash__(self):
+        """Get a unique hash for the object."""
+        return hash(self.path)
+
+    def close(self):
+        """Close the file."""
+        self.__header = None
+        self.__datagrp = None
+
+        try:
+            del self.header  # need to refresh these
+        except AttributeError:
+            pass
+
+        try:
+            del self.datagrp
+        except AttributeError:
+            pass
+
+        if self.__file:
+            self.__file.close()
+            self.__file = None
+
+    def open(self):  # noqa: A003
+        """Open the file."""
+        if not self.__file:
+            self.__file = h5py.File(self.path, "r")
+            self.__header = self.__file["/Header"]
+            self.__datagrp = self.__file["/Data"]
+
+    @cached_property
+    def header(self) -> h5py.Group:
+        """Get the header group."""
+        if not self.__file:
+            self.open()
+        return self.__header
+
+    @cached_property
+    def datagrp(self) -> h5py.Group:
+        """Get the data group."""
+        if not self.__file:
+            self.open()
+        return self.__datagrp
+
+    def get_transactional(self, item: str, cache: bool = True) -> Any:
+        """Get an attribute from the metadata but close the file object afterwards.
+
+        Using this method is safer than direct attribute access when dealing with
+        many files.
+
+        Parameters
+        ----------
+        item
+            The attribute to get.
+        cache
+            Whether to cache the attribute in the object so that the next access is
+            faster.
+ """ + try: + val = getattr(self, item) + finally: + self.close() + + if not cache: + if item in self.__dict__: + del self.__dict__[item] + + return val + + def __getattr__(self, name: str) -> Any: + """Get attribute directly from header group.""" + try: + x = self.header[name][()] + if name in self._string_attrs: + x = bytes(x).decode("utf8") + elif name in self._int_attrs: + x = int(x) + elif name in self._float_attrs: + x = float(x) + + self.__dict__[name] = x + return x + except KeyError: + try: + return self._defaults[name] + except KeyError as e: + raise AttributeError(f"{name} not found in {self.path}") from e + + @cached_property + def extra_keywords(self) -> dict: + """The extra_keywords from the file.""" + header = self.header + if "extra_keywords" not in header: + return {} + + extra_keywords = {} + for key in header["extra_keywords"].keys(): + if header["extra_keywords"][key].dtype.type in (np.string_, np.object_): + extra_keywords[key] = bytes(header["extra_keywords"][key][()]).decode( + "utf8" + ) + else: + # special handling for empty datasets == python `None` type + if header["extra_keywords"][key].shape is None: + extra_keywords[key] = None + else: + extra_keywords[key] = header["extra_keywords"][key][()] + return extra_keywords diff --git a/pyuvdata/uvcal/calh5.py b/pyuvdata/uvcal/calh5.py index 2b0f33c624..f396b4d67c 100644 --- a/pyuvdata/uvcal/calh5.py +++ b/pyuvdata/uvcal/calh5.py @@ -8,18 +8,12 @@ import warnings from functools import cached_property from pathlib import Path -from typing import Any import h5py import numpy as np +from .. import hdf5_utils from .. import utils as uvutils -from ..uvdata.uvh5 import ( - _check_uvh5_dtype, - _get_compression, - _read_complex_astype, - _write_complex_astype, -) from .uvcal import UVCal, _future_array_shapes_warning, radian_tol hdf5plugin_present = True @@ -30,7 +24,7 @@ hdf5plugin_error = error -class FastCalH5Meta: +class FastCalH5Meta(hdf5_utils.HDF5Meta): """ A fast read-only interface to CalH5 file metadata that makes some assumptions. 
@@ -97,165 +91,6 @@ class FastCalH5Meta: _bool_attrs = frozenset(("wide_band",)) - def __init__(self, path: str | Path | h5py.File | h5py.Group): - self.__file = None - - if isinstance(path, h5py.File): - self.path = Path(path.filename) - self.__file = path - self.__header = path["/Header"] - self.__datagrp = path["/Data"] - elif isinstance(path, h5py.Group): - self.path = Path(path.file.filename) - self.__file = path.file - self.__header = path - self.__datagrp = self.__file["/Data"] - elif isinstance(path, (str, Path)): - self.path = Path(path) - - def is_open(self) -> bool: - """Whether the file is open.""" - return bool(self.__file) - - def __del__(self): - """Close the file when the object is deleted.""" - if self.__file: - self.__file.close() - - def __getstate__(self): - """Get the state of the object.""" - return { - k: v - for k, v in self.__dict__.items() - if k - not in ( - "_FastCalH5Meta__file", - "_FastCalH5Meta__header", - "_FastCalH5Meta__datagrp", - "header", - "datagrp", - ) - } - - def __setstate__(self, state): - """Set the state of the object.""" - self.__dict__.update(state) - self.__file = None - - def __eq__(self, other): - """Check equality of two FastCalH5Meta objects.""" - if not isinstance(other, FastCalH5Meta): - return False - - return self.path == other.path - - def __hash__(self): - """Get a unique hash for the object.""" - return hash(self.path) - - def close(self): - """Close the file.""" - self.__header = None - self.__datagrp = None - - try: - del self.header # need to refresh these - except AttributeError: - pass - - try: - del self.datagrp - except AttributeError: - pass - - if self.__file: - self.__file.close() - self.__file = None - - def open(self): # noqa: A003 - """Open the file.""" - if not self.__file: - self.__file = h5py.File(self.path, "r") - self.__header = self.__file["/Header"] - self.__datagrp = self.__file["/Data"] - - @cached_property - def header(self) -> h5py.Group: - """Get the header group.""" - if not self.__file: - self.open() - return self.__header - - @cached_property - def datagrp(self) -> h5py.Group: - """Get the header group.""" - if not self.__file: - self.open() - return self.__datagrp - - def get_transactional(self, item: str, cache: bool = True) -> Any: - """Get an attribute from the metadata but close the file object afterwards. - - Using this method is safer than direct attribute access when dealing with - many files. - - Parameters - ---------- - item - The attribute to get. - cache - Whether to cache the attribute in the object so that the next access is - faster. 
- """ - try: - val = getattr(self, item) - finally: - self.close() - - if not cache: - if item in self.__dict__: - del self.__dict__[item] - - return val - - def __getattr__(self, name: str) -> Any: - """Get attribute directly from header group.""" - try: - x = self.header[name][()] - if name in self._string_attrs: - x = bytes(x).decode("utf8") - elif name in self._int_attrs: - x = int(x) - - self.__dict__[name] = x - return x - except KeyError: - try: - return self._defaults[name] - except KeyError as e: - raise AttributeError(f"{name} not found in {self.path}") from e - - @cached_property - def extra_keywords(self) -> dict: - """The extra_keywords from the file.""" - header = self.header - if "extra_keywords" not in header: - return {} - - extra_keywords = {} - for key in header["extra_keywords"].keys(): - if header["extra_keywords"][key].dtype.type in (np.string_, np.object_): - extra_keywords[key] = bytes(header["extra_keywords"][key][()]).decode( - "utf8" - ) - else: - # special handling for empty datasets == python `None` type - if header["extra_keywords"][key].shape is None: - extra_keywords[key] = None - else: - extra_keywords[key] = header["extra_keywords"][key][()] - return extra_keywords - def check_lsts_against_times(self): """Check that LSTs consistent with the time_array and telescope location.""" lsts = uvutils.get_lst_for_time( @@ -554,7 +389,7 @@ def _get_data( # cast to floats caldata_dtype = dgrp["gains"].dtype if caldata_dtype not in ("complex64", "complex128"): - _check_uvh5_dtype(caldata_dtype) + hdf5_utils._check_uvh5_dtype(caldata_dtype) if gain_array_dtype not in (np.complex64, np.complex128): raise ValueError( "gain_array_dtype must be np.complex64 or np.complex128" @@ -577,7 +412,7 @@ def _get_data( inds = (np.s_[:], np.s_[:], np.s_[:], np.s_[:]) if self.cal_type == "gain": if custom_dtype: - self.gain_array = _read_complex_astype( + self.gain_array = hdf5_utils._read_complex_astype( dgrp["gains"], inds, gain_array_dtype ) else: @@ -691,7 +526,9 @@ def _get_data( # index datasets if custom_dtype: - cal_data = _read_complex_astype(caldata_dset, inds, gain_array_dtype) + cal_data = hdf5_utils._read_complex_astype( + caldata_dset, inds, gain_array_dtype + ) else: cal_data = uvutils._index_dset(caldata_dset, inds) flags = uvutils._index_dset(flags_dset, inds) @@ -1093,7 +930,9 @@ def write_calh5( revert_fas = True self.use_future_array_shapes() - data_compression, data_compression_opts = _get_compression(data_compression) + data_compression, data_compression_opts = hdf5_utils._get_compression( + data_compression + ) # open file for writing with h5py.File(filename, "w") as f: @@ -1110,7 +949,7 @@ def write_calh5( else: gain_write_dtype = "c16" if gain_write_dtype not in ("c8", "c16"): - _check_uvh5_dtype(gain_write_dtype) + hdf5_utils._check_uvh5_dtype(gain_write_dtype) gaindata = dgrp.create_dataset( "gains", self.gain_array.shape, @@ -1120,7 +959,7 @@ def write_calh5( dtype=gain_write_dtype, ) indices = (np.s_[:], np.s_[:], np.s_[:], np.s_[:]) - _write_complex_astype(self.gain_array, gaindata, indices) + hdf5_utils._write_complex_astype(self.gain_array, gaindata, indices) else: gaindata = dgrp.create_dataset( "gains", diff --git a/pyuvdata/uvdata/tests/test_uvh5.py b/pyuvdata/uvdata/tests/test_uvh5.py index bfef79cd44..e67ce8e55c 100644 --- a/pyuvdata/uvdata/tests/test_uvh5.py +++ b/pyuvdata/uvdata/tests/test_uvh5.py @@ -19,6 +19,7 @@ from astropy.time import Time from packaging import version +import pyuvdata.hdf5_utils as hdf5_utils import pyuvdata.tests as uvtest 
import pyuvdata.utils as uvutils
 from pyuvdata import UVData
@@ -2667,7 +2668,7 @@ def test_read_complex_astype(tmp_path):
     indices = (np.s_[:], np.s_[:], np.s_[:], np.s_[:])
     with h5py.File(test_file, "r") as h5f:
         dset = h5f["Data/testdata"]
-        file_data = uvh5._read_complex_astype(dset, indices, np.complex64)
+        file_data = hdf5_utils._read_complex_astype(dset, indices, np.complex64)

     assert np.allclose(file_data, test_data)

@@ -2697,7 +2698,7 @@ def test_read_complex_astype_errors(tmp_path):
     with h5py.File(test_file, "r") as h5f:
         dset = h5f["Data/testdata"]
         with pytest.raises(ValueError) as cm:
-            uvh5._read_complex_astype(dset, indices, np.int32)
+            hdf5_utils._read_complex_astype(dset, indices, np.int32)
     assert str(cm.value).startswith("output datatype must be one of (complex")

     # clean up
@@ -2719,7 +2720,7 @@ def test_write_complex_astype(tmp_path):
         "testdata", test_data_shape, dtype=uvh5._hera_corr_dtype
     )
     inds = (np.s_[:], np.s_[:], np.s_[:], np.s_[:])
-    uvh5._write_complex_astype(test_data, dset, inds)
+    hdf5_utils._write_complex_astype(test_data, dset, inds)

     # read the data back in to confirm it's right
     with h5py.File(test_file, "r") as h5f:
@@ -2736,19 +2737,19 @@ def test_check_uvh5_dtype_errors():
     # test passing in something that's not a dtype
     with pytest.raises(ValueError) as cm:
-        uvh5._check_uvh5_dtype("hi")
+        hdf5_utils._check_complex_dtype("hi")
     assert str(cm.value).startswith("dtype in a uvh5 file must be a numpy dtype")

     # test using a dtype with bad field names
     dtype = np.dtype([("a", "<f8"), ("b", "<f8")])
     with pytest.raises(ValueError) as cm:
-        uvh5._check_uvh5_dtype(dtype)
+        hdf5_utils._check_complex_dtype(dtype)
     assert str(cm.value).startswith("dtype must be a numpy dtype with an 'r' and 'i' field")

     # test using a dtype with mismatched "r" and "i" types
     dtype = np.dtype([("r", "<f4"), ("i", "<f8")])
     with pytest.raises(ValueError) as cm:
-        uvh5._check_uvh5_dtype(dtype)
+        hdf5_utils._check_complex_dtype(dtype)
     assert str(cm.value).startswith("dtype must have the same kind and itemsize")
diff --git a/pyuvdata/uvdata/uvh5.py b/pyuvdata/uvdata/uvh5.py
--- a/pyuvdata/uvdata/uvh5.py
+++ b/pyuvdata/uvdata/uvh5.py
@@ ... @@
 import h5py
 import numpy as np

+from .. import hdf5_utils
 from .. import utils as uvutils
@@ ... @@
-class FastUVH5Meta:
+class FastUVH5Meta(hdf5_utils.HDF5Meta):
     """
     A fast read-only interface to UVH5 file metadata that makes some assumptions.
@@ ... @@
-    def is_open(self) -> bool:
-        """Whether the file is open."""
-        return bool(self.__file)
-
-    def __del__(self):
-        """Close the file when the object is deleted."""
-        if self.__file:
-            self.__file.close()
-
-    def __getstate__(self):
-        """Get the state of the object."""
-        return {
-            k: v
-            for k, v in self.__dict__.items()
-            if k
-            not in (
-                "_FastUVH5Meta__file",
-                "_FastUVH5Meta__header",
-                "_FastUVH5Meta__datagrp",
-                "header",
-                "datagrp",
-            )
-        }
-
-    def __setstate__(self, state):
-        """Set the state of the object."""
-        self.__dict__.update(state)
-        self.__file = None
-
     def __eq__(self, other):
         """Check equality of two FastUVH5Meta objects."""
         if not isinstance(other, FastUVH5Meta):
@@ -379,91 +177,7 @@ def __eq__(self, other):

     def __hash__(self):
         """Get a unique hash for the object."""
-        return hash(self.path)
-
-    def close(self):
-        """Close the file."""
-        self.__header = None
-        self.__datagrp = None
-
-        try:
-            del self.header  # need to refresh these
-        except AttributeError:
-            pass
-
-        try:
-            del self.datagrp
-        except AttributeError:
-            pass
-
-        if self.__file:
-            self.__file.close()
-            self.__file = None
-
-    def open(self):  # noqa: A003
-        """Open the file."""
-        if not self.__file:
-            self.__file = h5py.File(self.path, "r")
-            self.__header = self.__file["/Header"]
-            self.__datagrp = self.__file["/Data"]
-
-    @cached_property
-    def header(self) -> h5py.Group:
-        """Get the header group."""
-        if not self.__file:
-            self.open()
-        return self.__header
-
-    @cached_property
-    def datagrp(self) -> h5py.Group:
-        """Get the header group."""
-        if not self.__file:
-            self.open()
-        return self.__datagrp
-
-    def get_transactional(self, item: str, cache: bool = True) -> Any:
-        """Get an attribute from the metadata but close the file object afterwards.
-
-        Using this method is safer than direct attribute access when dealing with
-        many files.
-
-        Parameters
-        ----------
-        item
-            The attribute to get.
-        cache
-            Whether to cache the attribute in the object so that the next access is
-            faster.
- """ - try: - val = getattr(self, item) - finally: - self.close() - - if not cache: - if item in self.__dict__: - del self.__dict__[item] - - return val - - def __getattr__(self, name: str) -> Any: - """Get attribute directly from header group.""" - try: - x = self.header[name][()] - if name in self._string_attrs: - x = bytes(x).decode("utf8") - elif name in self._int_attrs: - x = int(x) - elif name in self._float_attrs: - x = float(x) - - self.__dict__[name] = x - return x - except KeyError: - try: - return self._defaults[name] - except KeyError as e: - raise AttributeError(f"{name} not found in {self.path}") from e + return super().__hash__() @cached_property def Nbls(self) -> int: # noqa: N802 @@ -1201,7 +915,7 @@ def _get_data( # cast to floats visdata_dtype = dgrp["visdata"].dtype if visdata_dtype not in ("complex64", "complex128"): - _check_uvh5_dtype(visdata_dtype) + hdf5_utils._check_complex_dtype(visdata_dtype) if data_array_dtype not in (np.complex64, np.complex128): raise ValueError( "data_array_dtype must be np.complex64 or np.complex128" @@ -1214,7 +928,7 @@ def _get_data( # no select, read in all the data inds = (np.s_[:], np.s_[:], np.s_[:]) if custom_dtype: - self.data_array = _read_complex_astype( + self.data_array = hdf5_utils._read_complex_astype( dgrp["visdata"], inds, data_array_dtype ) else: @@ -1291,7 +1005,9 @@ def _get_data( # index datasets if custom_dtype: - visdata = _read_complex_astype(visdata_dset, inds, data_array_dtype) + visdata = hdf5_utils._read_complex_astype( + visdata_dset, inds, data_array_dtype + ) else: visdata = uvutils._index_dset(visdata_dset, inds) flags = uvutils._index_dset(flags_dset, inds) @@ -1337,7 +1053,9 @@ def _get_data( # index datasets if custom_dtype: - visdata = _read_complex_astype(visdata_dset, inds, data_array_dtype) + visdata = hdf5_utils._read_complex_astype( + visdata_dset, inds, data_array_dtype + ) else: visdata = uvutils._index_dset(visdata_dset, inds) flags = uvutils._index_dset(flags_dset, inds) @@ -1379,7 +1097,9 @@ def _get_data( # index datasets if custom_dtype: - visdata = _read_complex_astype(visdata_dset, inds, data_array_dtype) + visdata = hdf5_utils._read_complex_astype( + visdata_dset, inds, data_array_dtype + ) else: visdata = uvutils._index_dset(visdata_dset, inds) flags = uvutils._index_dset(flags_dset, inds) @@ -1807,7 +1527,9 @@ def write_uvh5( revert_fas = True self.use_future_array_shapes() - data_compression, data_compression_opts = _get_compression(data_compression) + data_compression, data_compression_opts = hdf5_utils._get_compression( + data_compression + ) # open file for writing with h5py.File(filename, "w") as f: @@ -1823,7 +1545,7 @@ def write_uvh5( else: data_write_dtype = "c16" if data_write_dtype not in ("c8", "c16"): - _check_uvh5_dtype(data_write_dtype) + hdf5_utils._check_complex_dtype(data_write_dtype) visdata = dgrp.create_dataset( "visdata", self.data_array.shape, @@ -1833,7 +1555,7 @@ def write_uvh5( dtype=data_write_dtype, ) indices = (np.s_[:], np.s_[:], np.s_[:]) - _write_complex_astype(self.data_array, visdata, indices) + hdf5_utils._write_complex_astype(self.data_array, visdata, indices) else: visdata = dgrp.create_dataset( "visdata", @@ -1949,7 +1671,9 @@ def initialize_uvh5_file( else: raise IOError("File exists; skipping") - data_compression, data_compression_opts = _get_compression(data_compression) + data_compression, data_compression_opts = hdf5_utils._get_compression( + data_compression + ) revert_fas = False if not self.future_array_shapes: @@ -1972,7 +1696,7 @@ def 
initialize_uvh5_file( data_write_dtype = "c16" if data_write_dtype not in ("c8", "c16"): # make sure the data type is correct - _check_uvh5_dtype(data_write_dtype) + hdf5_utils._check_complex_dtype(data_write_dtype) dgrp.create_dataset( "visdata", data_size, @@ -2360,7 +2084,7 @@ def write_uvh5_part( if n_reg_spaced >= 2: if custom_dtype: indices = (blt_inds, freq_inds, pol_inds) - _write_complex_astype(data_array, visdata_dset, indices) + hdf5_utils._write_complex_astype(data_array, visdata_dset, indices) else: visdata_dset[blt_inds, freq_inds, pol_inds] = data_array flags_dset[blt_inds, freq_inds, pol_inds] = flag_array @@ -2372,7 +2096,7 @@ def write_uvh5_part( for ipol, pol_idx in enumerate(pol_inds): if custom_dtype: indices = (blt_inds, freq_idx, pol_idx) - _write_complex_astype( + hdf5_utils._write_complex_astype( data_array[:, ifreq, ipol], visdata_dset, indices ) else: @@ -2390,7 +2114,7 @@ def write_uvh5_part( for ipol, pol_idx in enumerate(pol_inds): if custom_dtype: indices = (blt_idx, freq_inds, pol_idx) - _write_complex_astype( + hdf5_utils._write_complex_astype( data_array[iblt, :, ipol], visdata_dset, indices ) else: @@ -2408,7 +2132,7 @@ def write_uvh5_part( for ifreq, freq_idx in enumerate(freq_inds): if custom_dtype: indices = (blt_idx, freq_idx, pol_inds) - _write_complex_astype( + hdf5_utils._write_complex_astype( data_array[iblt, ifreq, :], visdata_dset, indices ) else: @@ -2429,7 +2153,7 @@ def write_uvh5_part( for ipol, pol_idx in enumerate(pol_inds): if custom_dtype: indices = (blt_idx, freq_idx, pol_idx) - _write_complex_astype( + hdf5_utils._write_complex_astype( data_array[iblt, ifreq, ipol], visdata_dset, indices ) else:
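
For reviewers who want to try the shared base class, here is a minimal usage sketch of the lazy, transactional metadata access that `HDF5Meta` provides. The file path `night1.calh5` is hypothetical, and the header items accessed (`history`, `Ntimes`) are assumed to be present in the CalH5 header:

```python
# Minimal sketch of HDF5Meta-style access via the FastCalH5Meta subclass.
from pyuvdata.uvcal.calh5 import FastCalH5Meta

meta = FastCalH5Meta("night1.calh5")  # hypothetical path; no I/O happens yet

# Plain attribute access opens the file via __getattr__, reads one header
# item, caches it on the instance, and leaves the h5py handle open.
print(meta.history)

# get_transactional() reads the item and then closes the underlying handle,
# which avoids exhausting open file handles when scanning many files.
ntimes = meta.get_transactional("Ntimes", cache=False)
assert not meta.is_open()
```

The design point worth noting is that `close()` also deletes the `header`/`datagrp` cached properties, so a later attribute access transparently reopens the file rather than dereferencing a closed h5py object.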
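
The `_read_complex_astype` / `_write_complex_astype` helpers moved into `hdf5_utils` implement the UVH5 on-disk convention of storing complex data as a compound dtype with matching "r" and "i" fields. A self-contained sketch of that round trip using plain h5py (the `demo.h5` path is a throwaway example; the int32 fields mirror the HERA correlator dtype referenced in the tests):

```python
import h5py
import numpy as np

# Compound "r"/"i" dtype; per the UVH5 spec both fields must be the same type.
corr_dtype = np.dtype([("r", "<i4"), ("i", "<i4")])
data = (np.arange(6) + 1j * np.arange(6)).astype(np.complex64).reshape(2, 3)

with h5py.File("demo.h5", "w") as f:
    dset = f.create_dataset("Data/visdata", data.shape, dtype=corr_dtype)
    # Pack real/imag parts into a structured buffer and write whole elements.
    buf = np.empty(data.shape, dtype=corr_dtype)
    buf["r"] = data.real.astype(np.int32)  # exact here; real data would be scaled
    buf["i"] = data.imag.astype(np.int32)
    dset[...] = buf

with h5py.File("demo.h5", "r") as f:
    raw = f["Data/visdata"][...]  # structured array with "r" and "i" fields
    out = np.empty(raw.shape, dtype=np.complex64)
    out.real = raw["r"]  # upcast on assignment, as in _read_complex_astype
    out.imag = raw["i"]

np.testing.assert_allclose(out, data)
```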
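
Finally, `_get_compression` special-cases the string `"bitshuffle"`. Assuming it resolves to HDF5 filter id 32008 with options `(0, 2)` (auto block size plus LZ4), as in the module above, the `hdf5plugin` package exposes the same filter through a friendlier API; a hypothetical sketch:

```python
import h5py
import hdf5plugin  # registers the bitshuffle filter (id 32008) with HDF5
import numpy as np

# "compressed.h5" is an illustrative path. hdf5plugin.Bitshuffle() with its
# defaults expands to compression=32008, compression_opts=(0, 2), i.e. the
# same pair _get_compression is assumed to return for "bitshuffle".
with h5py.File("compressed.h5", "w") as f:
    f.create_dataset(
        "Data/visdata",
        data=np.zeros((8, 16), dtype=np.float32),
        **hdf5plugin.Bitshuffle(),
    )
```

This is why both `write_uvh5` and `write_calh5` funnel through the shared helper: the import guard for `hdf5plugin` lives in one place, and callers only ever see a `(compression, compression_opts)` pair.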