diff --git a/CHANGELOG.md b/CHANGELOG.md index 73b5e9cce..9476c8c72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ All notable changes to this project will be documented in this file. ## [Unreleased] ### Added +- New optional spatial interpolation method, ``interpolation_function="az_za_map_coordinates"`` that improves the linear +interpolation speed for data in ``az_za`` coordinates. - New UVParameter `pol_convention` on `UVData` and `UVCal`. This specifies the convention assumed for converting linear to stokes polarizations -- either "sum" or "avg". Also added to `uvcalibrate` to apply from the `UVCal` to the `UVData`. diff --git a/src/pyuvdata/uvbeam/beamfits.py b/src/pyuvdata/uvbeam/beamfits.py index b2744953a..49f9fabd3 100644 --- a/src/pyuvdata/uvbeam/beamfits.py +++ b/src/pyuvdata/uvbeam/beamfits.py @@ -547,23 +547,11 @@ def write_beamfits( else: ax_nums = reg_primary_ax_nums if self.Naxes1 > 1: - if not utils.tools._test_array_constant_spacing(self._axis1_array): - raise ValueError( - "The pixels are not evenly spaced along first axis. " - "The beam fits format does not support " - "unevenly spaced pixels." - ) axis1_spacing = self.axis1_array[1] - self.axis1_array[0] else: axis1_spacing = 1 if self.Naxes2 > 1: - if not utils.tools._test_array_constant_spacing(self._axis2_array): - raise ValueError( - "The pixels are not evenly spaced along second axis. " - "The beam fits format does not support " - "unevenly spaced pixels." - ) axis2_spacing = self.axis2_array[1] - self.axis2_array[0] else: axis2_spacing = 1 diff --git a/src/pyuvdata/uvbeam/uvbeam.py b/src/pyuvdata/uvbeam/uvbeam.py index 3ba8bf744..374a9ff94 100644 --- a/src/pyuvdata/uvbeam/uvbeam.py +++ b/src/pyuvdata/uvbeam/uvbeam.py @@ -12,7 +12,7 @@ from astropy import units from astropy.coordinates import Angle from docstring_parser import DocstringStyle -from scipy import interpolate +from scipy import interpolate, ndimage from .. import parameter as uvp, utils from ..docstrings import combine_docstrings, copy_replace_short_description @@ -60,6 +60,10 @@ class UVBeam(UVBase): "description": "scipy RectBivariate spline interpolation", "func": "_interp_az_za_rect_spline", }, + "az_za_map_coordinates": { + "description": "scipy map_coordinates interpolation", + "func": "_interp_az_za_map_coordinates", + }, "healpix_simple": { "description": "healpix nearest-neighbor bilinear interpolation", "func": "_interp_healpix_bilinear", @@ -838,6 +842,18 @@ def check( "files" ) + # Check if the interpolation points are evenly-spaced + if self.pixel_coordinate_system == "az_za": + for i, ax_param in enumerate((self._axis1_array, self._axis2_array)): + ax = ax_param.value + if len(ax) < 3: + continue + + if not utils.tools._test_array_constant_spacing(ax, tols=ax_param.tols): + raise ValueError( + f"axis{i+1}_array must be evenly spaced in az_za coordinates." + ) + return True def peak_normalize(self): @@ -1303,6 +1319,165 @@ def get_lambda(real_lut, imag_lut=None): return tuple(interp_arrays) + def _handle_input_for_freq_interpolation( + self, freq_array, *, freq_interp_kind="cubic", freq_interp_tol=1.0 + ): + """ + Handle "interp" inputs prior to calling "_interp_freq". + + This helper function that is used by the az_za and healpix beam + interpolation methods. Prior to performing interpolation along the + azimuth/zenith angle axes, this function checks if the provided frequency + array is not None. If it is not None, it performs frequency interpolation + using "_interp_freq". If the frequency array is None, it returns the + intrinsic data array and bandpass array. + """ + if freq_array is not None: + assert isinstance(freq_array, np.ndarray) + interp_arrays = self._interp_freq( + freq_array, kind=freq_interp_kind, tol=freq_interp_tol + ) + if self.antenna_type == "phased_array": + (input_data_array, interp_bandpass, interp_coupling_matrix) = ( + interp_arrays + ) + else: + input_data_array, interp_bandpass = interp_arrays + interp_coupling_matrix = None + input_nfreqs = freq_array.size + else: + input_data_array = self.data_array + input_nfreqs = self.Nfreqs + freq_array = self.freq_array + interp_bandpass = self.bandpass_array[0] + if self.antenna_type == "phased_array": + interp_coupling_matrix = self.coupling_matrix + else: + interp_coupling_matrix = None + + return ( + input_data_array, + interp_bandpass, + interp_coupling_matrix, + input_nfreqs, + freq_array, + ) + + def _check_interpolation_domain(self, az_array, za_array, phi_use, theta_use): + """Check if the interpolation domain is covered by the intrinsic data array.""" + max_axis_diff = max(np.diff(self.axis1_array)[0], np.diff(self.axis2_array)[0]) + za_sq_dist = np.full(len(za_array), np.inf) + az_sq_dist = np.full(len(az_array), np.inf) + if (len(theta_use) + len(phi_use)) > len(za_array): + # If there are fewer interpolation points than grid points, go + # through the grid points one-by-one to spot any outliers + for idx in range(az_array.size): + za_sq_dist[idx] = np.min((theta_use - za_array[idx]) ** 2.0) + az_sq_dist[idx] = np.min((phi_use - az_array[idx]) ** 2.0) + else: + # Otherwise, if we have lots of interpolation points, then it's faster + # to evaluate the grid steps one-by-one. + for theta_val in theta_use: + temp_arr = np.square(za_array - theta_val) + za_sq_dist = np.where(za_sq_dist > temp_arr, temp_arr, za_sq_dist) + + for phi_val in phi_use: + temp_arr = np.square(az_array - phi_val) + az_sq_dist = np.where(az_sq_dist > temp_arr, temp_arr, az_sq_dist) + + if np.any(np.sqrt(az_sq_dist + za_sq_dist) > (max_axis_diff * 2.0)): + raise ValueError( + "at least one interpolation location " + "is outside of the UVBeam pixel coverage." + ) + + def _prepare_coordinate_data(self, input_data_array): + """Prepare coordinate data for interpolation functions.""" + freq_axis = 2 + axis1_diff = np.diff(self.axis1_array)[0] + phi_length = np.abs(self.axis1_array[0] - self.axis1_array[-1]) + axis1_diff + phi_vals, theta_vals = self.axis1_array, self.axis2_array + + if np.isclose(phi_length, 2 * np.pi, atol=axis1_diff): + # phi wraps around, extend array in each direction to improve interpolation + extend_length = 3 + phi_use = np.concatenate( + ( + phi_vals[0] + (np.arange(-extend_length, 0) * axis1_diff), + phi_vals, + phi_vals[-1] + (np.arange(1, extend_length + 1) * axis1_diff), + ) + ) + + low_slice = input_data_array[..., :extend_length] + high_slice = input_data_array[..., -1 * extend_length :] + + data_use = np.concatenate( + (high_slice, input_data_array, low_slice), axis=freq_axis + 2 + ) + else: + phi_use = phi_vals + data_use = input_data_array + + return data_use, phi_use, theta_vals + + def _prepare_polarized_inputs(self, polarizations): + """Prepare inputs for polarized interpolation functions.""" + # Npols is only defined for power beams. For E-field beams need Nfeeds. + if self.beam_type == "power": + # get requested polarization indices + if polarizations is None: + Npol_feeds = self.Npols + pol_inds = np.arange(Npol_feeds) + else: + pols = [ + utils.polstr2num(p, x_orientation=self.x_orientation) + for p in polarizations + ] + pol_inds = [] + for pol in pols: + if pol not in self.polarization_array: + raise ValueError( + f"Requested polarization {pol} not found " + "in self.polarization_array" + ) + pol_inds.append(np.where(self.polarization_array == pol)[0][0]) + pol_inds = np.asarray(pol_inds) + Npol_feeds = len(pol_inds) + + else: + Npol_feeds = self.Nfeeds + pol_inds = np.arange(Npol_feeds) + + return Npol_feeds, pol_inds + + def _prepare_basis_vector_array(self, npoints): + """Prepare basis vector array for interpolation functions.""" + if self.basis_vector_array is not None: + if np.any(self.basis_vector_array[0, 1, :] > 0) or np.any( + self.basis_vector_array[1, 0, :] > 0 + ): + # Input basis vectors are not aligned to the native theta/phi + # coordinate system + raise NotImplementedError( + "interpolation for input basis " + "vectors that are not aligned to the " + "native theta/phi coordinate system " + "is not yet supported" + ) + else: + # The basis vector array comes in defined at the rectangular grid. + # Redefine it for the interpolation points + interp_basis_vector = np.zeros( + [self.Naxes_vec, self.Ncomponents_vec, npoints] + ) + interp_basis_vector[0, 0, :] = np.ones(npoints) # theta hat + interp_basis_vector[1, 1, :] = np.ones(npoints) # phi hat + else: + interp_basis_vector = None + + return interp_basis_vector + def _interp_az_za_rect_spline( self, *, @@ -1317,7 +1492,10 @@ def _interp_az_za_rect_spline( check_azza_domain: bool = True, ): """ - Interpolate in az_za coordinate system with a simple spline. + Interpolate in az_za coordinate system using RectBivariateSpline. + + Uses the :func:`scipy.interpolate.RectBivariateSpline` function to perform + interpolation in the azimuth-zenith angle coordinate system. Parameters ---------- @@ -1365,183 +1543,87 @@ def _interp_az_za_rect_spline( "function" ) - if freq_array is not None: - assert isinstance(freq_array, np.ndarray) - interp_arrays = self._interp_freq( - freq_array, kind=freq_interp_kind, tol=freq_interp_tol - ) - if self.antenna_type == "phased_array": - (input_data_array, interp_bandpass, interp_coupling_matrix) = ( - interp_arrays - ) - else: - input_data_array, interp_bandpass = interp_arrays - input_nfreqs = freq_array.size - else: - input_data_array = self.data_array - input_nfreqs = self.Nfreqs - freq_array = self.freq_array - interp_bandpass = self.bandpass_array[0] - if self.antenna_type == "phased_array": - interp_coupling_matrix = self.coupling_matrix + # Perform initial frequency interpolation to get the data array + ( + input_data_array, + interp_bandpass, + interp_coupling_matrix, + input_nfreqs, + freq_array, + ) = self._handle_input_for_freq_interpolation( + freq_array=freq_array, + freq_interp_kind=freq_interp_kind, + freq_interp_tol=freq_interp_tol, + ) + # If az_array and za_array are not provided, return the interpolated data if az_array is None or za_array is None: interp_arrays = [input_data_array, self.basis_vector_array, interp_bandpass] if self.antenna_type == "phased_array": interp_arrays.append(interp_coupling_matrix) return tuple(interp_arrays) + freq_axis = 2 + + # Check input arrays assert isinstance(az_array, np.ndarray) assert isinstance(za_array, np.ndarray) assert az_array.ndim == 1 assert az_array.shape == za_array.shape - - npoints = az_array.size - - axis1_diff = np.diff(self.axis1_array)[0] - axis2_diff = np.diff(self.axis2_array)[0] - max_axis_diff = np.max([axis1_diff, axis2_diff]) - - phi_length = np.abs(self.axis1_array[0] - self.axis1_array[-1]) + axis1_diff - - phi_vals, theta_vals = self.axis1_array, self.axis2_array - - freq_axis = 2 assert input_data_array.shape[freq_axis] == input_nfreqs + # Get the data type if np.iscomplexobj(input_data_array): data_type = np.complex128 else: data_type = np.float64 - if np.isclose(phi_length, 2 * np.pi, atol=axis1_diff): - # phi wraps around, extend array in each direction to improve interpolation - extend_length = 3 - phi_use = np.concatenate( - ( - phi_vals[0] + (np.arange(-extend_length, 0) * axis1_diff), - phi_vals, - phi_vals[-1] + (np.arange(1, extend_length + 1) * axis1_diff), - ) - ) + # Prepare the data for interpolation + data_use, phi_use, theta_use = self._prepare_coordinate_data(input_data_array) - low_slice = input_data_array[..., :extend_length] - high_slice = input_data_array[..., -1 * extend_length :] + # Prepare basis functions + interp_basis_vector = self._prepare_basis_vector_array(az_array.size) - data_use = np.concatenate( - (high_slice, input_data_array, low_slice), axis=freq_axis + 2 - ) - else: - phi_use = phi_vals - data_use = input_data_array + # Get number of polarizations and indices + Npol_feeds, pol_inds = self._prepare_polarized_inputs(polarizations) - # This is now always true, keeping this here to keep naming convention the same - theta_use = theta_vals + # Check if the interpolation points are within the data array + if check_azza_domain: + self._check_interpolation_domain(az_array, za_array, phi_use, theta_use) - if self.basis_vector_array is not None: - if np.any(self.basis_vector_array[0, 1, :] > 0) or np.any( - self.basis_vector_array[1, 0, :] > 0 - ): - # Input basis vectors are not aligned to the native theta/phi - # coordinate system - raise NotImplementedError( - "interpolation for input basis " - "vectors that are not aligned to the " - "native theta/phi coordinate system " - "is not yet supported" - ) - else: - # The basis vector array comes in defined at the rectangular grid. - # Redefine it for the interpolation points - interp_basis_vector = np.zeros( - [self.Naxes_vec, self.Ncomponents_vec, npoints] - ) - interp_basis_vector[0, 0, :] = np.ones(npoints) # theta hat - interp_basis_vector[1, 1, :] = np.ones(npoints) # phi hat - else: - interp_basis_vector = None + interp_data = np.zeros( + (self.Naxes_vec, Npol_feeds, input_nfreqs, az_array.size), dtype=data_type + ) - def get_lambda(real_lut, imag_lut=None): + def get_lambda(real_lut, imag_lut=None, **kwargs): # Returns function objects for interpolation reuse if imag_lut is None: - return lambda za, az: real_lut(za, az, grid=False) + return lambda za, az: real_lut(za, az, **kwargs) else: return lambda za, az: ( - real_lut(za, az, grid=False) + 1j * imag_lut(za, az, grid=False) - ) - - # Npols is only defined for power beams. For E-field beams need Nfeeds. - if self.beam_type == "power": - # get requested polarization indices - if polarizations is None: - Npol_feeds = self.Npols - pol_inds = np.arange(Npol_feeds) - else: - pols = [ - utils.polstr2num(p, x_orientation=self.x_orientation) - for p in polarizations - ] - pol_inds = [] - for pol in pols: - if pol not in self.polarization_array: - raise ValueError( - f"Requested polarization {pol} not found " - "in self.polarization_array" - ) - pol_inds.append(np.where(self.polarization_array == pol)[0][0]) - pol_inds = np.asarray(pol_inds) - Npol_feeds = len(pol_inds) - - else: - Npol_feeds = self.Nfeeds - pol_inds = np.arange(Npol_feeds) - - if check_azza_domain: - za_sq_dist = np.full(len(za_array), np.inf) - az_sq_dist = np.full(len(az_array), np.inf) - if (len(theta_use) + len(phi_use)) > len(za_array): - # If there are fewer interpolation points than grid points, go - # through the grid points one-by-one to spot any outliers - for idx in range(npoints): - za_sq_dist[idx] = np.min((theta_use - za_array[idx]) ** 2.0) - az_sq_dist[idx] = np.min((phi_use - az_array[idx]) ** 2.0) - else: - # Otherwise, if we have lots of interpolation points, then it's faster - # to evaluate the grid steps one-by-one. - for theta_val in theta_use: - temp_arr = np.square(za_array - theta_val) - za_sq_dist = np.where(za_sq_dist > temp_arr, temp_arr, za_sq_dist) - - for phi_val in phi_use: - temp_arr = np.square(az_array - phi_val) - az_sq_dist = np.where(az_sq_dist > temp_arr, temp_arr, az_sq_dist) - - if np.any(np.sqrt(az_sq_dist + za_sq_dist) > (max_axis_diff * 2.0)): - raise ValueError( - "at least one interpolation location " - "is outside of the UVBeam pixel coverage." + real_lut(za, az, **kwargs) + 1j * imag_lut(za, az, **kwargs) ) - data_shape = (self.Naxes_vec, Npol_feeds, input_nfreqs, npoints) - interp_data = np.zeros(data_shape, dtype=data_type) - if spline_opts is None or not isinstance(spline_opts, dict): spline_opts = {} if reuse_spline and not hasattr(self, "saved_interp_functions"): int_dict = {} self.saved_interp_functions = int_dict + for index3 in range(input_nfreqs): freq = freq_array[index3] for index0 in range(self.Naxes_vec): for pol_return_ind, index2 in enumerate(pol_inds): do_interp = True key = (freq, index2, index0) + if reuse_spline and key in self.saved_interp_functions: do_interp = False lut = self.saved_interp_functions[key] if do_interp: data_inds = (index0, index2, index3) + if np.iscomplexobj(data_use): # interpolate real and imaginary parts separately real_lut = interpolate.RectBivariateSpline( @@ -1556,12 +1638,13 @@ def get_lambda(real_lut, imag_lut=None): data_use[data_inds].imag, **spline_opts, ) - lut = get_lambda(real_lut, imag_lut) + lut = get_lambda(real_lut, imag_lut, grid=False) else: lut = interpolate.RectBivariateSpline( theta_use, phi_use, data_use[data_inds], **spline_opts ) - lut = get_lambda(lut) + lut = get_lambda(lut, grid=False) + if reuse_spline: self.saved_interp_functions[key] = lut @@ -1574,6 +1657,152 @@ def get_lambda(real_lut, imag_lut=None): interp_arrays.append(interp_coupling_matrix) return tuple(interp_arrays) + def _interp_az_za_map_coordinates( + self, + *, + az_array, + za_array, + freq_array, + freq_interp_kind="cubic", + freq_interp_tol=1.0, + polarizations=None, + spline_opts=None, + check_azza_domain: bool = True, + reuse_spline: bool = False, + ): + """ + Interpolate in az_za coordinate system using map_coordinates. + + Uses the :func:`scipy.ndimage.map_coordinates` function to perform + interpolation in the azimuth-zenith angle coordinate system. + + Parameters + ---------- + az_array : array_like of floats + Azimuth values to interpolate to in radians, specifying the azimuth + positions for every interpolation point (same length as `za_array`). + za_array : array_like of floats + Zenith values to interpolate to in radians, specifying the zenith + positions for every interpolation point (same length as `az_array`). + freq_array : array_like of floats + Frequency values to interpolate to. + freq_interp_kind : str + Interpolation method to use frequency. + See scipy.interpolate.interp1d for details. + polarizations : list of str + polarizations to interpolate if beam_type is 'power'. + Default is all polarizations in self.polarization_array. + reuse_spline : bool + Save the interpolation functions for reuse. + spline_opts : dict + Options (kx, ky, s) for numpy.RectBivariateSpline. + check_azza_domain : bool + Whether to check the domain of az/za to ensure that they are covered by the + intrinsic data array. Checking them can be quite computationally expensive. + + Returns + ------- + interp_data : array_like of float or complex + The array of interpolated data values, + shape: (Naxes_vec, Nfeeds or Npols, freq_array.size, az_array.size) + interp_basis_vector : array_like of float + The array of interpolated basis vectors, + shape: (Naxes_vec, Ncomponents_vec, az_array.size) + interp_bandpass : array_like of float + The interpolated bandpass. shape: (freq_array.size,) + interp_coupling_matrix : array_like of complex, optional + The interpolated coupling matrix. Only returned if antenna_type is + "phased_array". + shape: (Nelements, Nelements, Nfeeds, Nfeeds, freq_array.size) + + """ + if self.pixel_coordinate_system != "az_za": + raise ValueError( + "pixel_coordinate_system must be 'az_za' to use this interpolation " + "function" + ) + + # Perform initial frequency interpolation to get the data array + ( + input_data_array, + interp_bandpass, + interp_coupling_matrix, + input_nfreqs, + freq_array, + ) = self._handle_input_for_freq_interpolation( + freq_array=freq_array, + freq_interp_kind=freq_interp_kind, + freq_interp_tol=freq_interp_tol, + ) + + if az_array is None or za_array is None: + interp_arrays = [input_data_array, self.basis_vector_array, interp_bandpass] + if self.antenna_type == "phased_array": + interp_arrays.append(interp_coupling_matrix) + return tuple(interp_arrays) + + freq_axis = 2 + + # Check input arrays + assert isinstance(az_array, np.ndarray) + assert isinstance(za_array, np.ndarray) + assert az_array.ndim == 1 + assert az_array.shape == za_array.shape + assert input_data_array.shape[freq_axis] == input_nfreqs + + # Get the data type + if np.iscomplexobj(input_data_array): + data_type = np.complex128 + else: + data_type = np.float64 + + # Prepare the data for interpolation + data_use, phi_use, theta_use = self._prepare_coordinate_data(input_data_array) + + # Prepare basis functions + interp_basis_vector = self._prepare_basis_vector_array(az_array.size) + + # Get number of polarizations and indices + Npol_feeds, pol_inds = self._prepare_polarized_inputs(polarizations) + + # Check if the interpolation points are within the data array + if check_azza_domain: + self._check_interpolation_domain(az_array, za_array, phi_use, theta_use) + + interp_data = np.zeros( + (self.Naxes_vec, Npol_feeds, input_nfreqs, az_array.size), dtype=data_type + ) + + if spline_opts is None or not isinstance(spline_opts, dict): + spline_opts = {} + + _az_array = ( + (az_array - phi_use.min()) + / (phi_use.max() - phi_use.min()) + * (phi_use.size - 1) + ) + _za_array = ( + (za_array - theta_use.min()) + / (theta_use.max() - theta_use.min()) + * (theta_use.size - 1) + ) + + for index3 in range(input_nfreqs): + for index0 in range(self.Naxes_vec): + for pol_return_ind, index2 in enumerate(pol_inds): + interp_data[index0, pol_return_ind, index3, :] = ( + ndimage.map_coordinates( + data_use[index0, index2, index3], + np.array([_za_array, _az_array]), + **spline_opts, + ) + ) + + interp_arrays = [interp_data, interp_basis_vector, interp_bandpass] + if self.antenna_type == "phased_array": + interp_arrays.append(interp_coupling_matrix) + return tuple(interp_arrays) + def _interp_healpix_bilinear( self, *, @@ -1645,25 +1874,18 @@ def _interp_healpix_bilinear( "simple healpix interpolation requires healpix pixels to be in order." ) - if freq_array is not None: - assert isinstance(freq_array, np.ndarray) - interp_arrays = self._interp_freq( - freq_array, kind=freq_interp_kind, tol=freq_interp_tol - ) - if self.antenna_type == "phased_array": - (input_data_array, interp_bandpass, interp_coupling_matrix) = ( - interp_arrays - ) - else: - input_data_array, interp_bandpass = interp_arrays - input_nfreqs = freq_array.size - else: - input_data_array = self.data_array - input_nfreqs = self.Nfreqs - freq_array = self.freq_array[0] - interp_bandpass = self.bandpass_array[0] - if self.antenna_type == "phased_array": - interp_coupling_matrix = self.coupling_matrix + # Perform initial frequency interpolation to get the data array + ( + input_data_array, + interp_bandpass, + interp_coupling_matrix, + input_nfreqs, + freq_array, + ) = self._handle_input_for_freq_interpolation( + freq_array=freq_array, + freq_interp_kind=freq_interp_kind, + freq_interp_tol=freq_interp_tol, + ) if az_array is None or za_array is None: interp_arrays = [input_data_array, self.basis_vector_array, interp_bandpass] @@ -1671,68 +1893,26 @@ def _interp_healpix_bilinear( interp_arrays.append(interp_coupling_matrix) return tuple(interp_arrays) + # Check input arrays assert isinstance(az_array, np.ndarray) assert isinstance(za_array, np.ndarray) assert az_array.ndim == 1 assert az_array.shape == za_array.shape - npoints = az_array.size - - # Npols is only defined for power beams. For E-field beams need Nfeeds. - if self.beam_type == "power": - # get requested polarization indices - if polarizations is None: - Npol_feeds = self.Npols - pol_inds = np.arange(Npol_feeds) - else: - pols = [ - utils.polstr2num(p, x_orientation=self.x_orientation) - for p in polarizations - ] - pol_inds = [] - for pol in pols: - if pol not in self.polarization_array: - raise ValueError( - f"Requested polarization {pol} not found " - "in self.polarization_array" - ) - pol_inds.append(np.where(self.polarization_array == pol)[0][0]) - pol_inds = np.asarray(pol_inds) - Npol_feeds = len(pol_inds) - else: - Npol_feeds = self.Nfeeds - pol_inds = np.arange(Npol_feeds) + # Get number of polarizations and indices + Npol_feeds, pol_inds = self._prepare_polarized_inputs(polarizations) if np.iscomplexobj(input_data_array): data_type = np.complex128 else: data_type = np.float64 + interp_data = np.zeros( (self.Naxes_vec, Npol_feeds, input_nfreqs, len(az_array)), dtype=data_type ) - if self.basis_vector_array is not None: - if np.any(self.basis_vector_array[0, 1, :] > 0) or np.any( - self.basis_vector_array[1, 0, :] > 0 - ): - """Input basis vectors are not aligned to the native theta/phi - coordinate system""" - raise NotImplementedError( - "interpolation for input basis " - "vectors that are not aligned to the " - "native theta/phi coordinate system " - "is not yet supported" - ) - else: - """The basis vector array comes in defined at the rectangular grid. - Redefine it for the interpolation points""" - interp_basis_vector = np.zeros( - [self.Naxes_vec, self.Ncomponents_vec, npoints] - ) - interp_basis_vector[0, 0, :] = np.ones(npoints) # theta hat - interp_basis_vector[1, 1, :] = np.ones(npoints) # phi hat - else: - interp_basis_vector = None + # Prepare basis functions + interp_basis_vector = self._prepare_basis_vector_array(az_array.size) hp_obj = HEALPix(nside=self.nside, order=self.ordering) lat_array = Angle(np.pi / 2, units.radian) - Angle(za_array, units.radian) @@ -1797,6 +1977,8 @@ def interp( - "az_za_simple": Uses scipy RectBivariate spline interpolation, can only be used on objects with an "az_za" pixel_coordinate_system. + - "az_za_map_coordinates": Uses scipy map_coordinates interpolation, can only + be used on objects with an "az_za" pixel_coordinate_system. - "healpix_simple": Uses HEALPix nearest-neighbor bilinear interpolation, can only be used on objects with a "healpix" pixel_coordinate_system. @@ -1813,7 +1995,8 @@ def interp( interpolation_function : str, optional Specify the interpolation function to use. Defaults to: "az_za_simple" for objects with the "az_za" pixel_coordinate_system and "healpix_simple" for - objects with the "healpix" pixel_coordinate_system. + objects with the "healpix" pixel_coordinate_system. "az_za_map_coordinates" + is also available for objects with the "az_za" pixel_coordinate_system. freq_interp_kind : str Interpolation method to use frequency. See scipy.interpolate.interp1d for details. Defaults to "cubic" (Note that this is a change. It used to @@ -1849,7 +2032,7 @@ def interp( spline_opts : dict Provide options to numpy.RectBivariateSpline. This includes spline order parameters `kx` and `ky`, and smoothing parameter `s`. - Only applies for `az_za_simple` interpolation. + Applies for `az_za_simple` and `az_za_map_coordinates` interpolation. run_check : bool Only used if new_object is True. Option to check for the existence and proper shapes of required parameters on the new object. @@ -2137,6 +2320,8 @@ def to_healpix( - "az_za_simple": Uses scipy RectBivariate spline interpolation, can only be used on objects with an "az_za" pixel_coordinate_system. + - "az_za_map_coordinates": Uses scipy map_coordinates interpolation, can only + be used on objects with an "az_za" pixel_coordinate Parameters ---------- @@ -2145,8 +2330,10 @@ def to_healpix( the nside that gives the closest resolution that is higher than the input resolution. interpolation_function : str, optional - Specify the interpolation function to use. Defaults to to: "az_za_simple" - for objects with the "az_za" pixel_coordinate_system. + Specify the interpolation function to use. Defaults to to: + "az_za_simple" for objects with the "az_za" pixel_coordinate_system. + "az_za_map_coordinates" is also available for objects with the "az_za" + pixel_coordinate_system. run_check : bool Option to check for the existence and proper shapes of required parameters after converting to healpix. @@ -2969,18 +3156,17 @@ def select( axis1_inds = sorted(set(axis1_inds)) if min(axis1_inds) < 0 or max(axis1_inds) > beam_object.Naxes1 - 1: raise ValueError("axis1_inds must be > 0 and < Naxes1") - beam_object.Naxes1 = len(axis1_inds) - beam_object.axis1_array = beam_object.axis1_array[axis1_inds] - if beam_object.Naxes1 > 1 and not utils.tools._test_array_constant_spacing( - beam_object._axis1_array + if len(axis1_inds) > 1 and not utils.tools._test_array_constant_spacing( + beam_object.axis1_array[axis1_inds], tols=beam_object._axis1_array.tols ): - warnings.warn( - "Selected values along first image axis are " - "not evenly spaced. This is not supported by " - "the regularly gridded beam fits format" + raise ValueError( + "Selected values along first image axis must be evenly spaced." ) + beam_object.Naxes1 = len(axis1_inds) + beam_object.axis1_array = beam_object.axis1_array[axis1_inds] + beam_object.data_array = beam_object.data_array[..., axis1_inds] if beam_object.beam_type == "efield": beam_object.basis_vector_array = beam_object.basis_vector_array[ @@ -3002,18 +3188,17 @@ def select( axis2_inds = sorted(set(axis2_inds)) if min(axis2_inds) < 0 or max(axis2_inds) > beam_object.Naxes2 - 1: raise ValueError("axis2_inds must be > 0 and < Naxes2") - beam_object.Naxes2 = len(axis2_inds) - beam_object.axis2_array = beam_object.axis2_array[axis2_inds] - if beam_object.Naxes2 > 1 and not utils.tools._test_array_constant_spacing( - beam_object._axis2_array + if len(axis2_inds) > 1 and not utils.tools._test_array_constant_spacing( + beam_object.axis2_array[axis2_inds], tols=beam_object._axis2_array.tols ): - warnings.warn( - "Selected values along second image axis are " - "not evenly spaced. This is not supported by " - "the regularly gridded beam fits format" + raise ValueError( + "Selected values along second image axis must be evenly spaced." ) + beam_object.Naxes2 = len(axis2_inds) + beam_object.axis2_array = beam_object.axis2_array[axis2_inds] + beam_object.data_array = beam_object.data_array[..., axis2_inds, :] if beam_object.beam_type == "efield": beam_object.basis_vector_array = beam_object.basis_vector_array[ diff --git a/tests/uvbeam/test_uvbeam.py b/tests/uvbeam/test_uvbeam.py index d05be5cdc..21925f132 100644 --- a/tests/uvbeam/test_uvbeam.py +++ b/tests/uvbeam/test_uvbeam.py @@ -359,6 +359,20 @@ def test_check_auto_power_errors(cst_efield_2freq_cut): cst_efield_2freq_cut._fix_auto_power() +def test_check_irregular_grid_az_za_errors(cst_efield_2freq_cut): + for param_name in ("axis1_array", "axis2_array"): + uvb = cst_efield_2freq_cut.copy() + arr_val = getattr(uvb, param_name) + arr_val[0] += 0.01 + setattr(uvb, param_name, arr_val) + + with pytest.raises( + ValueError, + match=f"{param_name} must be evenly spaced in az_za coordinates.", + ): + uvb.check() + + @pytest.mark.parametrize("beam_type", ["efield", "power"]) def test_peak_normalize(beam_type, cst_efield_2freq, cst_power_2freq): if beam_type == "efield": @@ -606,7 +620,12 @@ def test_efield_to_power_errors(cst_efield_2freq_cut, cst_power_2freq_cut): @pytest.mark.parametrize("antenna_type", ["simple", "phased_array"]) -def test_freq_interpolation(antenna_type, cst_power_2freq, phased_array_beam_2freq): +@pytest.mark.parametrize( + "interpolation_function", ["az_za_simple", "az_za_map_coordinates"] +) +def test_freq_interpolation( + antenna_type, interpolation_function, cst_power_2freq, phased_array_beam_2freq +): if antenna_type == "simple": beam = cst_power_2freq else: @@ -623,6 +642,7 @@ def test_freq_interpolation(antenna_type, cst_power_2freq, phased_array_beam_2fr freq_interp_kind="linear", return_bandpass=True, return_coupling=need_coupling, + interpolation_function=interpolation_function, ) if antenna_type == "simple": interp_data, interp_basis_vector, interp_bandpass = interp_arrays @@ -650,6 +670,7 @@ def test_freq_interpolation(antenna_type, cst_power_2freq, phased_array_beam_2fr freq_interp_kind="cubic", return_bandpass=True, return_coupling=need_coupling, + interpolation_function=interpolation_function, ) if antenna_type == "simple": interp_data, interp_basis_vector, interp_bandpass = interp_arrays @@ -692,6 +713,7 @@ def test_freq_interpolation(antenna_type, cst_power_2freq, phased_array_beam_2fr freq_interp_tol=0.0, new_object=True, freq_interp_kind="linear", + interpolation_function=interpolation_function, ) np.testing.assert_array_almost_equal(new_beam_obj.freq_array, freq_orig_vals) assert not hasattr(new_beam_obj, "saved_interp_functions") @@ -709,6 +731,7 @@ def test_freq_interpolation(antenna_type, cst_power_2freq, phased_array_beam_2fr freq_interp_tol=1.0, freq_interp_kind="cubic", new_object=True, + interpolation_function=interpolation_function, ) assert isinstance(new_beam_obj, UVBeam) np.testing.assert_array_almost_equal(new_beam_obj.freq_array, freq_orig_vals) @@ -731,6 +754,7 @@ def test_freq_interpolation(antenna_type, cst_power_2freq, phased_array_beam_2fr freq_interp_tol=0.0, freq_interp_kind="linear", new_object=True, + interpolation_function=interpolation_function, ) assert isinstance(new_beam_obj, UVBeam) @@ -770,7 +794,9 @@ def test_freq_interpolation(antenna_type, cst_power_2freq, phased_array_beam_2fr with pytest.raises( ValueError, match="Only one frequency in UVBeam so cannot interpolate." ): - beam_singlef.interp(freq_array=np.array([150e6])) + beam_singlef.interp( + freq_array=np.array([150e6]), interpolation_function=interpolation_function + ) def test_freq_interp_real_and_complex(cst_power_2freq): @@ -813,8 +839,15 @@ def test_freq_interp_real_and_complex(cst_power_2freq): @pytest.mark.parametrize("beam_type", ["efield", "power", "phased_array"]) +@pytest.mark.parametrize( + "interpolation_function", ["az_za_simple", "az_za_map_coordinates", None] +) def test_spatial_interpolation_samepoints( - beam_type, cst_power_2freq_cut, cst_efield_2freq_cut, phased_array_beam_2freq + cst_power_2freq_cut, + cst_efield_2freq_cut, + phased_array_beam_2freq, + beam_type, + interpolation_function, ): """ check that interpolating to existing points gives the same answer @@ -831,20 +864,12 @@ def test_spatial_interpolation_samepoints( za_orig_vals = za_orig_vals.ravel(order="C") freq_orig_vals = np.array([123e6, 150e6]) - # test defaulting works if no interpolation function is set interp_data_array, interp_basis_vector = uvbeam.interp( - az_array=az_orig_vals, za_array=za_orig_vals, freq_array=freq_orig_vals - ) - - interp_data_array2, interp_basis_vector2 = uvbeam.interp( az_array=az_orig_vals, za_array=za_orig_vals, freq_array=freq_orig_vals, - interpolation_function="az_za_simple", + interpolation_function=interpolation_function, ) - assert np.allclose(interp_data_array, interp_data_array2) - if beam_type == "efield": - assert np.allclose(interp_basis_vector, interp_basis_vector2) interp_data_array = interp_data_array.reshape(uvbeam.data_array.shape, order="F") assert np.allclose(uvbeam.data_array, interp_data_array) @@ -869,8 +894,7 @@ def test_spatial_interpolation_samepoints( interpolation_function="healpix_simple", ) - # test warning if interpolation_function is set differently on object and in - # function call and error if not set to known function + # test error if not set to known function with pytest.raises( ValueError, match="interpolation_function not recognized, must be one of " ): @@ -902,12 +926,14 @@ def test_spatial_interpolation_samepoints( az_za_grid=True, freq_array=freq_orig_vals, new_object=True, + interpolation_function=interpolation_function, ) + interp_fn_str = interpolation_function + if interpolation_function is None: + interp_fn_str = "az_za_simple" assert new_beam.history == ( - uvbeam.history + " Interpolated in " - "frequency and to a new azimuth/zenith " - "angle grid using pyuvdata with " - "interpolation_function = az_za_simple " + uvbeam.history + " Interpolated in frequency and to a new azimuth/zenith " + f"angle grid using pyuvdata with interpolation_function = {interp_fn_str} " "and freq_interp_kind = nearest." ) # make histories & freq_interp_kind equal @@ -924,6 +950,7 @@ def test_spatial_interpolation_samepoints( za_array=za_orig_vals, freq_array=freq_orig_vals, new_object=True, + interpolation_function=interpolation_function, ) if beam_type == "power": @@ -933,6 +960,7 @@ def test_spatial_interpolation_samepoints( za_array=za_orig_vals, freq_array=freq_orig_vals, polarizations=["xx"], + interpolation_function=interpolation_function, ) data_array_compare = uvbeam.data_array[:, :1] @@ -1057,6 +1085,26 @@ def test_spatial_interpolation_everyother( assert np.allclose(select_data_array_orig, select_data_array_reused) del uvbeam.saved_interp_functions + # test comparison of different interpolation functions + spline_opts = {"kx": 1, "ky": 1} + linear_data_array, _ = uvbeam.interp( + az_array=az_interp_vals, + za_array=za_interp_vals, + freq_array=freq_interp_vals, + freq_interp_kind="linear", + spline_opts=spline_opts, + ) + + mp_data_array, _ = uvbeam.interp( + az_array=az_interp_vals, + za_array=za_interp_vals, + freq_array=freq_interp_vals, + freq_interp_kind="linear", + interpolation_function="az_za_map_coordinates", + spline_opts={"order": 1}, + ) + assert np.allclose(linear_data_array, mp_data_array) + @pytest.mark.parametrize("beam_type", ["efield", "power"]) def test_spatial_interp_cutsky(beam_type, cst_power_2freq_cut, cst_efield_2freq_cut): @@ -1092,7 +1140,10 @@ def test_spatial_interp_cutsky(beam_type, cst_power_2freq_cut, cst_efield_2freq_ assert select_beam == interp_beam -def test_spatial_interpolation_errors(cst_power_2freq_cut): +@pytest.mark.parametrize( + "interpolation_function", ["az_za_simple", "az_za_map_coordinates"] +) +def test_spatial_interpolation_errors(interpolation_function, cst_power_2freq_cut): uvbeam = cst_power_2freq_cut az_interp_vals = np.array( @@ -1111,7 +1162,10 @@ def test_spatial_interpolation_errors(cst_power_2freq_cut): "the UVBeam freq_array range.", ): uvbeam.interp( - az_array=az_interp_vals, za_array=za_interp_vals, freq_array=np.array([100]) + az_array=az_interp_vals, + za_array=za_interp_vals, + freq_array=np.array([100]), + interpolation_function=interpolation_function, ) # test errors if frequency interp values outside range @@ -1119,17 +1173,29 @@ def test_spatial_interpolation_errors(cst_power_2freq_cut): ValueError, match="If az_za_grid is set to True, az_array and za_array must be provided.", ): - uvbeam.interp(az_za_grid=True, freq_array=freq_interp_vals) + uvbeam.interp( + az_za_grid=True, + freq_array=freq_interp_vals, + interpolation_function=interpolation_function, + ) # test errors if positions outside range with pytest.raises( ValueError, match="at least one interpolation location " "is outside of the UVBeam pixel coverage.", ): - uvbeam.interp(az_array=az_interp_vals, za_array=za_interp_vals + np.pi / 2) + uvbeam.interp( + az_array=az_interp_vals, + za_array=za_interp_vals + np.pi / 2, + interpolation_function=interpolation_function, + ) # test no errors only frequency interpolation - _, _ = uvbeam.interp(freq_array=freq_interp_vals, freq_interp_kind="linear") + _, _ = uvbeam.interp( + freq_array=freq_interp_vals, + freq_interp_kind="linear", + interpolation_function=interpolation_function, + ) # assert polarization value error with pytest.raises( @@ -1137,7 +1203,10 @@ def test_spatial_interpolation_errors(cst_power_2freq_cut): match="Requested polarization 1 not found in self.polarization_array", ): uvbeam.interp( - az_array=az_interp_vals, za_array=za_interp_vals, polarizations=["pI"] + az_array=az_interp_vals, + za_array=za_interp_vals, + polarizations=["pI"], + interpolation_function=interpolation_function, ) # test error returning coupling matrix for simple antenna_types @@ -1146,7 +1215,10 @@ def test_spatial_interpolation_errors(cst_power_2freq_cut): match="return_coupling can only be set if antenna_type is phased_array", ): uvbeam.interp( - az_array=az_interp_vals, za_array=za_interp_vals, return_coupling=True + az_array=az_interp_vals, + za_array=za_interp_vals, + return_coupling=True, + interpolation_function=interpolation_function, ) @@ -1274,6 +1346,21 @@ def test_healpix_interpolation(antenna_type, cst_efield_2freq, phased_array_beam interpolation_function="az_za_simple", ) + # test error with using an incompatible interpolation function + with pytest.raises( + ValueError, + match=re.escape( + "pixel_coordinate_system must be 'az_za' to use this interpolation " + "function" + ), + ): + interp_data_array, _ = hpx_efield_beam.interp( + az_array=az_orig_vals, + za_array=za_orig_vals, + freq_array=freq_orig_vals, + interpolation_function="az_za_map_coordinates", + ) + # test that interp to every other point returns an object that matches a select pixel_inds = np.arange(0, hpx_efield_beam.Npixels, 2) select_beam = hpx_efield_beam.select(pixels=pixel_inds, inplace=False) @@ -1621,16 +1708,11 @@ def test_select_axis(cst_power_1freq, tmp_path): # check for warnings and errors associated with unevenly spaced image pixels power_beam2 = power_beam.copy() - with check_warnings( - UserWarning, "Selected values along first image axis are not evenly spaced" - ): - power_beam2.select(axis1_inds=[0, 5, 6]) with pytest.raises( ValueError, - match="The pixels are not evenly spaced along first axis. " - "The beam fits format does not support unevenly spaced pixels.", + match="Selected values along first image axis must be evenly spaced.", ): - power_beam2.write_beamfits(write_file_beamfits) + power_beam2.select(axis1_inds=[0, 5, 6], inplace=False) # Test selecting on axis2 inds2_to_keep = np.arange(5, 14) @@ -1663,16 +1745,11 @@ def test_select_axis(cst_power_1freq, tmp_path): # check for warnings and errors associated with unevenly spaced image pixels power_beam2 = power_beam.copy() - with check_warnings( - UserWarning, "Selected values along second image axis are not evenly spaced" - ): - power_beam2.select(axis2_inds=[0, 5, 6]) with pytest.raises( ValueError, - match="The pixels are not evenly spaced along second axis. " - "The beam fits format does not support unevenly spaced pixels.", + match="Selected values along second image axis must be evenly spaced.", ): - power_beam2.write_beamfits(write_file_beamfits) + power_beam2.select(axis2_inds=[0, 5, 6]) @pytest.mark.parametrize("antenna_type", ["simple", "phased_array"])