Skip to content

Commit

Permalink
Finally some code
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielaBreitman committed Jul 15, 2024
1 parent 9c21a56 commit 006b0f6
Show file tree
Hide file tree
Showing 4 changed files with 376 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/py21cmfast-tools/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from lc2ps import calculate_PS
347 changes: 347 additions & 0 deletions src/py21cmfast-tools/lc2ps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,347 @@
import numpy as np
import powerbox as pb
from powerbox.tools import get_power, ignore_zero_ki, ignore_zero_absk, power2delta, _magnitude_grid, regular_angular_generator


def calculate_PS(lc,
lc_redshifts,
L,
HII_DIM = None,
zs = None,
chunk_size = None,
chunk_skip = 37,
calc_2d = True,
nbins = 50,
k_weights = ignore_zero_ki,
postprocess = True,
kpar_bins = None,
log_bins = True,
crop = None,
calc_1d = False,
nbins_1d = 14,
calc_global = False,
mu = None,
bin_ave=True,
interp=None,
prefactor_fnc=power2delta,
interp_points_generator=None,
):

""" Calculate 1D and 2D PS
Parameters
----------
lc : np.ndarray
The lightcone whose power spectrum we want to calculate.
The lightcone should be a 3D array with shape [HII_DIM, HII_DIM, len(lc_redshifts)].
lc_redshifts : np.ndarray
The redshifts of the lightcone.
L : float
The side length of the box in cMpc.
HII_DIM : int, optional
The size of the lightcone in pixels. If None, inferred from the shape of the lightcone.
zs : np.ndarray, optional
The redshifts at which to calculate the power spectrum.
If None, the lightcone is broken up into chunks using arguments chunk_skip and chunk_size.
chunk_size : int, optional
The size of the chunks to break the lightcone into.
If None, the chunk is assumed to be a cube i.e. chunk_size = HII_DIM.
chunk_skip : int, optional
The number of lightcone slices to skip between chunks. Default is 37.
calc_2d : bool, optional
If True, calculate the 2D power spectrum.
nbins : int, optional
The number of bins to use for the kperp axis of the 2D PS.
k_weights : callable, optional
A function that takes a frequency tuple and returns a boolean mask for the k values to ignore.
See powerbox.tools.ignore_zero_ki for an example and powerbox.tools.get_power documentation for more details.
Default is powerbox.tools.ignore_zero_ki, which excludes the power any k_i = 0 mode.
Typically, only the central zero mode |k| = 0 is excluded, in which case use powerbox.tools.ignore_zero_absk.
postprocess : bool, optional
If True, postprocess the 2D PS. This step involves cropping out empty bins and/or log binning the kpar axis.
kpar_bins : int or np.ndarray, optional
Affects only the postprocessing step. The number of bins or the bin edges to use for binning the kpar axis.
If None, produces 16 bins.
log_bins : bool, optional
Affects only the postprocessing step. If True, log bin the kpar axis.
crop : list, optional
Affects only the postprocessing step. The crop range for the (log-binned) PS. If None, crops out only the empty bins.
calc_1d : bool, optional
If True, calculate the 1D power spectrum.
nbins_1d : int, optional
The number of bins on which to calculate 1D PS.
calc_global : bool, optional
If True, calculate the global brightness temperature.
mu : float, optional
The minimum value of :math:`\cos(\theta), \theta = \arctan (k_\perp/k_\parallel)` for all calculated PS.
If None, all modes are included.
bin_ave : bool, optional
If True, return the center value of each kperp and kpar bin i.e. len(kperp) = PS_2D.shape[0].
If False, return the left edge of each bin i.e. len(kperp) = PS_2D.shape[0] + 1.
interp : str, optional
If True, use linear interpolation to calculate the PS at the points specified by interp_points_generator.
Note that this significantly slows down the calculation.
prefactor_fnc : callable, optional
A function that takes a frequency tuple and returns the prefactor to multiply the PS with.
Default is powerbox.tools.power2delta, which converts the power P [mK^2 Mpc^{-3}] to the dimensionless power \delta^2 [mK^2].
interp_points_generator : callable, optional
A function that generates the points at which to interpolate the PS.
See powerbox.tools.get_power documentation for more details.
"""
# Split the lightcone into chunks for each redshift bin
if zs is None:
if chunk_size is None:
chunk_size = HII_DIM
n_slices = lc.shape[-1]
chunk_indices = list(range(0, n_slices - chunk_size, chunk_skip))
else:
if chunk_size is None:
chunk_size=HII_DIM
chunk_indices = np.array(np.max([np.zeros_like(zs),np.array([np.argmin(abs(lc_redshifts - z)) for z in zs]) - chunk_size//2], axis = 0), dtype = np.int32)
zs = [] # all redshifts that will be computed
lc_PS_2D = []
clean_lc_PS_2D = []
if calc_global:
Tb = []
if calc_1d:
lc_PS_1D = []
out = {}

if HII_DIM is None:
HII_DIM = lc.shape[0]

for i in chunk_indices:
start = i
end = i + chunk_size
if end > len(lc_redshifts):
shift_it_back_by_a_few_bins = end - len(lc_redshifts)
start -= shift_it_back_by_a_few_bins
end = len(lc_redshifts)
chunk = lc[..., start:end]
zs.append(lc_redshifts[(start + end) // 2])
if calc_global:
Tb.append(np.mean(chunk))
if calc_2d:
PS_2D, kperp, Nmodes,kpar = get_power(chunk,
(L,L,L*chunk.shape[-1]/HII_DIM),
res_ndim = 2,
bin_ave = bin_ave,
bins = nbins,
log_bins = log_bins,
nthreads=1,
k_weights=k_weights,
prefactor_fnc=prefactor_fnc,
interpolation_method=interp,
return_sumweights=True
)
if postprocess:
clean_PS_2D, clean_kperp, clean_kpar, clean_Nmodes = postprocess_ps(PS_2D, kperp, kpar,
log_bins = log_bins,
kpar_bins = kpar_bins,
crop = crop.copy() if crop is not None else crop,
kperp_modes = Nmodes,
return_modes = True,
)
clean_lc_PS_2D.append(clean_PS_2D)

lc_PS_2D.append(PS_2D)

if calc_1d:
if mu is not None:
if interp is None:
def mask_fnc(freq, absk):
kz_mesh = np.zeros((len(freq[0]), len(freq[1]), len(freq[2])))
kz = freq[2]
for i in range(len(kz)):
kz_mesh[:,:,i] = kz[i]
phi = np.arccos(kz_mesh/absk)
mu_mesh = abs(np.cos(phi))
kmag = _magnitude_grid([c for i, c in enumerate(freq) if i < 2])
return np.logical_and(mu_mesh > mu, ignore_zero_ki(freq, kmag))
k_weights1d=mask_fnc

if interp is not None:
k_weights1d = ignore_zero_ki
def above_mu_min_angular_generator(bins, dims2avg, angular_resolution=0.1, mu=mu):
r"""
Returns a set of spherical coordinates above a certain :math:`\\mu` value.
Parameters
----------
bins : array-like
1D array of radii at which we want to spherically average the field.
dims2avg : int
The number of dimensions to average over.
angular_resolution : float, optional
The angular resolution in radians for the sample points for the interpolation.
mu : float, optional
The minimum value of :math:`\\cos(\theta), \theta = \arctan (k_\\perp/k_\\parallel)`
for the sample points generated for the interpolation.
Returns
-------
r_n : array-like
1D array of radii.
phi_n : array-like
2D array of azimuthal angles with shape (ndim-1, N), where N is the number of points.
phi_n[0,:] :math:`\\in [0,2*\\pi]`, and phi_n[1:,:] :math:`\\in [0,\\pi]`.
"""

def generator():
r_n, phi_n = regular_angular_generator(
bins, dims2avg, angular_resolution=angular_resolution
)
# sine because the phi_n are wrt x-axis and we need them wrt z-axis.
if len(phi_n) == 1:
mask = np.sin(phi_n[0, :]) >= mu
else:
mask = np.all(np.sin(phi_n[1:, :]) >= mu, axis=0)
return r_n[mask], phi_n[:, mask]

return generator()
interp_points_generator = above_mu_min_angular_generator
else:
k_weights1d = ignore_zero_ki
PS_1D, k, Nmodes_1D = get_power(chunk,
(L,L,L*chunk.shape[-1]/HII_DIM),
bin_ave = bin_ave,
bins = nbins_1d,
log_bins = log_bins,
k_weights=k_weights1d,
prefactor_fnc=power2delta,
interpolation_method=interp,
interp_points_generator=interp_points_generator,
return_sumweights=True

)
lc_PS_1D.append(PS_1D)

if calc_1d:
out["k"] = k
out["PS_1D"] = np.array(lc_PS_1D)
out["Nmodes_1D"] = Nmodes_1D
out["mu"] = mu
if calc_2d:
out["full_kperp"] = kperp
out["full_kpar"] = kpar
out["full_PS_2D"] = np.array(lc_PS_2D)
out["final_PS_2D"] = np.array(clean_lc_PS_2D)
out["final_kpar"] = clean_kpar
out["final_kperp"] = clean_kperp
out["full_Nmodes"] = Nmodes
out["final_Nmodes"] = clean_Nmodes
if calc_global:
out["global_Tb"] = np.array(Tb)
out["redshifts"] = np.array(zs)


return out


def log_bin(ps, kperp, kpar, bins=None):
"""
Log bin a 2D PS along the kpar axis and crop out empty bins in both axes.
Parameters
----------
ps : np.ndarray
The 2D power spectrum of shape [len(kperp), len(kpar)].
kperp : np.ndarray
Values of kperp.
kpar : np.ndarray
Values of kpar.
bins : np.ndarray or int, optional
The number of bins or the bin edges to use for binning the kpar axis.
If None, produces 16 bins logarithmically spaced between the minimum and maximum `kpar` supplied.
"""
if bins is None:
bins = np.logspace(np.log10(kpar[0]), np.log10(kpar[-1]),17)
elif isinstance(bins, int):
bins = np.logspace(np.log10(kpar[0]), np.log10(kpar[-1]),bins+1)
elif isinstance(bins, np.ndarray) or isinstance(bins, list):
bins = np.array(bins)
else:
raise ValueError("Bins should be np.ndarray or int")
modes = np.zeros(len(bins)-1)
new_ps = np.zeros((len(kperp), len(bins)-1))
for i in range(len(bins)-1):
m = np.logical_and(kpar > bins[i], kpar < bins[i+1])
new_ps[:,i] = np.nanmean(ps[:,m], axis = 1)
modes[i] = np.sum(m)
#print('kpar modes', modes)
bin_centers = np.exp((np.log(bins[1:]) + np.log(bins[:-1])) / 2)
return new_ps, kperp, bin_centers, modes




def postprocess_ps(ps, kperp, kpar,
kpar_bins = None,
log_bins=True,
crop=None,
kperp_modes=None,
return_modes=False,
):
"""
Postprocess a cylindrical PS by cropping out empty bins and log binning the kpar axis.
Parameters
----------
ps : np.ndarray
The 2D power spectrum of shape [len(kperp), len(kpar)].
kperp : np.ndarray
Values of kperp.
kpar : np.ndarray
Values of kpar.
kpar_bins : np.ndarray or int, optional
The number of bins or the bin edges to use for binning the kpar axis.
If None, produces 16 bins logarithmically spaced between the minimum and maximum `kpar` supplied.
log_bins : bool, optional
If True, log bin the kpar axis.
crop : list, optional
The crop range for the log-binned PS. If None, crops out all empty bins.
kperp_modes : np.ndarray, optional
The number of modes in each kperp bin.
return_modes : bool, optional
If True, return a grid with the number of modes in each bin.
Requires kperp_modes to be supplied.
"""
kpar = kpar[0]
m = kpar > 1e-10
if ps.shape[0] < len(kperp):
if log_bins:
kperp = np.exp((np.log(kperp[1:]) + np.log(kperp[:-1]))/2.)
else:
kperp = (kperp[1:] + kperp[:-1])/2
kpar = kpar[m]
ps = ps[:,m]
mkperp = ~np.isnan(kperp)
if kperp_modes is not None:
kperp_modes = kperp_modes[mkperp]
kperp = kperp[mkperp]
ps = ps[mkperp,:]

# Bin kpar in log
rebinned_ps, kperp, log_kpar, kpar_weights = log_bin(ps, kperp, kpar, bins=kpar_bins)
if crop is None:
crop = [0, rebinned_ps.shape[0]+1, 0, rebinned_ps.shape[1]+1]
# Find last bin that is NaN and cut out all bins before
try:
lastnan_perp = np.where(np.isnan(np.nanmean(rebinned_ps, axis = 1)))[0][-1]+1
crop[0] = crop[0] + lastnan_perp
except:
pass
try:
lastnan_par = np.where(np.isnan(np.nanmean(rebinned_ps, axis = 0)))[0][-1]+1
crop[2] = crop[2] + lastnan_par
except:
pass
if kperp_modes is not None:
final_kperp_modes = kperp_modes[crop[0]:crop[1]]
kpar_grid, kperp_grid = np.meshgrid(kpar_weights[crop[2]:crop[3]], final_kperp_modes)

Ngrid = np.sqrt(kperp_grid**2 + kpar_grid**2)
if return_modes:
return rebinned_ps[crop[0]:crop[1]][:,crop[2]:crop[3]], kperp[crop[0]:crop[1]], log_kpar[crop[2]:crop[3]], Ngrid
else:
return rebinned_ps[crop[0]:crop[1]][:,crop[2]:crop[3]], kperp[crop[0]:crop[1]], log_kpar[crop[2]:crop[3]]

1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Tests for the py21cmFAST-tools package."""
27 changes: 27 additions & 0 deletions tests/test_ps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Test cases for the __main__ module."""

import shutil

import numpy as np
import pytest
from typeguard import suppress_type_checks

from py21cmfast_tools import calculate_PS


def test_calculate_PS():
test_lc = np.random.random(100*100*1000).reshape((100, 100, 1000))
test_redshifts = np.logspace(np.log10(5), np.log10(30), 1000)
zs = [5.,6.,10.,27.]

out = calculate_PS(test_lc, test_redshifts, HII_DIM=100, L=200, zs=zs,
calc_1D = True, calc_global=True)

out = calculate_PS(test_lc, test_redshifts, HII_DIM=100, L=200, zs=zs,
calc_1D = True, calc_global=True, interp=True)

out = calculate_PS(test_lc, test_redshifts, HII_DIM=100, L=200, zs=zs,
calc_1D = True, calc_global=True, mu = 0.5)

out = calculate_PS(test_lc, test_redshifts, HII_DIM=100, L=200, zs=zs,
calc_1D = True, calc_global=True, interp=True, mu = 0.5)

0 comments on commit 006b0f6

Please sign in to comment.