From d7be93c65756d89eadcc2729af6d6351d317c464 Mon Sep 17 00:00:00 2001 From: Jack Lashner Date: Tue, 9 Jul 2024 18:03:01 +0000 Subject: [PATCH 1/5] Adds TES parameter correction module --- sodetlib/tes_param_correction.py | 529 +++++++++++++++++++++++++++++++ 1 file changed, 529 insertions(+) create mode 100644 sodetlib/tes_param_correction.py diff --git a/sodetlib/tes_param_correction.py b/sodetlib/tes_param_correction.py new file mode 100644 index 00000000..20d20fc6 --- /dev/null +++ b/sodetlib/tes_param_correction.py @@ -0,0 +1,529 @@ +""" +This is a module for a more accurate estimation of TES parameters than what +currently exists in the IV and Bias Step Modules. + +It contains the follwing procedures: + - recompute_ivpars: corrects errors in IV analysis + - run_correction: Takes IV and bias step data to compute corrected TES params + based on the RP fitting method described here: + https://simonsobs.atlassian.net/wiki/spaces/~5570586d07625a6be74c8780e4b96f6156f5e6/blog/2024/02/02/286228683/Nonlinear+TES+model+using+RP+curve + +Authors: Remy Gerras, Satoru Takakura, Jack Lashner +""" + +import numpy as np +from scipy.interpolate import interp1d +from scipy.optimize import minimize, OptimizeResult +from tqdm.auto import trange +import multiprocessing +from typing import Optional, List, Tuple +from dataclasses import dataclass +import traceback +import matplotlib.pyplot as plt + +from sodetlib.operations import bias_steps, iv + +def model_logalpha(r, logp0=0.0, p1=7.0, p2=1.0, logp3=0.0): + """ + Fits Alpha' parameter, or alpha / GT. P0 has units of [uW]^-1. + """ + return logp0 + p1 * r - p2 * r ** np.exp(logp3) * np.log(1 - r) + + +@dataclass +class AnalysisCfg: + """ + Class for configuring behavior of the parameter estimation functions + """ + + "Default number of processes to use when running correction in parallel." + default_nprocs: int = 10 + + "If true, will raise exceptions thrown in the run_correction function." + raise_exceptions: bool = False + + "rfrac for which psat is defined." + psat_level=0.9 + + "Min rfrac used for RP fits and estimation." + rpfit_min_rfrac: float = 0.1 + + "Max rfrac used for RP fits and estiamtion." + rpfit_max_rfrac: float = 0.9 + + "Number of samples to use in the RP fit and integral." + rpfit_intg_nsamps: int = 500 + + "Bias step rfrac threshold below which TES should be considered " + "superconducting, and the correction skipped." + sc_rfrac_thresh: float = 0.02 + + def __post_init__(self): + if self.rpfit_min_rfrac < 0.0001: + raise ValueError("rp_fit_min_rfrac must be larger than 0.0001") + if self.rpfit_max_rfrac > 0.9999: + raise ValueError("rp_fit_max_rfrac must be smaller than 0.9999") + + +@dataclass +class RpFitChanData: + """ + Channel data needed to perform RP correction, pulled from bias step and IV + calibration functions. + """ + bg: int + Rsh: float + + iv_idx_sc: int + iv_R: np.ndarray # Ohm + iv_p_tes: np.ndarray # pW + iv_Rn: float + iv_ibias: np.ndarray # uA + iv_si: np.ndarray # 1/uV + + bs_meas_dIrat: float + bs_Rtes: float = np.nan # Ohm + bs_Ibias: float = np.nan # uA + bs_Si: float = np.nan # 1/uV + + @classmethod + def from_data(cls, iva, bsa, band, channel): + """Create class from IV and BSA dicts""" + iv_idx = np.where( + (iva['channels'] == channel) & (iva['bands'] == band) + )[0] + if len(iv_idx) == 0: + raise ValueError(f"Could not find band={band} channel={channel} in IVAnalysis") + iv_idx = iv_idx[0] + + bs_idx = np.where( + (bsa['channels'] == channel) & (bsa['bands'] == band) + )[0] + if len(bs_idx) == 0: + raise ValueError(f"Could not find band={band} channel={channel} in Bias Steps") + bs_idx = bs_idx[0] + + bg = iva['bgmap'][iv_idx] + + return cls( + bg=bg, + Rsh=iva['meta']['R_sh'], + iv_idx_sc = iva['idxs'][iv_idx, 0], + iv_R=iva['R'][iv_idx], + iv_Rn=iva['R_n'][iv_idx], + iv_p_tes=iva['p_tes'][iv_idx] * 1e12, + iv_si = iva['si'][iv_idx] * 1e-6, + iv_ibias=iva['i_bias'] * 1e6, + bs_meas_dIrat=bsa['dItes'][bs_idx] / bsa['dIbias'][bg], + bs_Rtes = bsa['R0'][bs_idx], + bs_Ibias = bsa['Ibias'][bg] * 1e6, + bs_Si = bsa['Si'][bs_idx] * 1e-6, + ) + +@dataclass +class CorrectionResults: + """ + Results from the parameter correction procedure + """ + + "Channel data used for correction" + chdata: RpFitChanData + + "If correction finished successfully" + success: bool = False + "Traceback on failure" + traceback: Optional[str] = None + + "Log(alpha') fit popts" + logalpha_popts: Optional[np.ndarray] = None + "Log(alpha') fit results object" + logalpha_fit_results: Optional[OptimizeResult] = None + "delta Popt between IV and bias steps" + delta_Popt: Optional[float] = None + + "TES Current" + corrected_I0: Optional[float] = None + "TES resistance" + corrected_R0: Optional[float] = None + "Bias power" + corrected_Pj: Optional[float] = None + "Responsivity" + corrected_Si: Optional[float] = None + "Loopgain at bias current" + L0: Optional[float] = None + + +def find_logalpha_popts(chdata: RpFitChanData, cfg: AnalysisCfg) -> Tuple[np.ndarray, float]: + """ + Fit Log(alpha') to IV data. + + Returns + --------- + popts: np.ndarray + Optimal parameters for log(alpha') fit. + pbias_model_offset: float + Offset between pbias_model from alpha' integration and data [aW] + """ + smooth_dist = 5 + w_len = 2 * smooth_dist + 1 + w = (1./float(w_len))*np.ones(w_len) # window + + rfrac = chdata.iv_R / chdata.iv_Rn + mask = np.ones_like(rfrac, dtype=bool) + mask[:chdata.iv_idx_sc] = False + mask &= (rfrac > cfg.rpfit_min_rfrac) * (rfrac < cfg.rpfit_max_rfrac) + rfrac = np.convolve(rfrac, w, mode='same') + pbias = np.convolve(chdata.iv_p_tes, w, mode='same') + + rfrac = rfrac[mask] + pbias = pbias[mask] + drfrac = np.diff(rfrac, prepend=np.nan) + dpbias = np.diff(pbias, prepend=np.nan) + logalpha = np.log(1 / rfrac * drfrac / dpbias) + logalpha_mask = np.isfinite(logalpha) + + def fit_func(logalpha_pars): + model = model_logalpha(rfrac, *logalpha_pars) + chi2 = np.sum((logalpha[logalpha_mask] - model[logalpha_mask])**2) + return chi2 + + fitres = minimize(fit_func, [4, 10, 1, 1]) + logalpha_popts = fitres.x + + # Find pBias offset + logalpha_model = model_logalpha(rfrac, *fitres.x) + pbias_model = np.nancumsum(drfrac / (rfrac * np.exp(logalpha_model))) + def fit_func(pbias_offset): + return np.sum((pbias_model + pbias_offset - pbias)**2) + pbias_model_offset = minimize(fit_func, pbias[0]).x[0] + + return logalpha_popts, pbias_model_offset + + +def run_correction(chdata: RpFitChanData, cfg: AnalysisCfg) -> CorrectionResults: + """ + Runs TES param correction procedure. + + Args + ---- + chdata: RpFitChanData + IV and Bias Step calibration data + cfg: AnalysisCfg + Correction analysis configuration object. + """ + res = CorrectionResults(chdata) + try: + if np.isnan(chdata.iv_Rn): + raise ValueError("IV Rn is NaN") + + if np.abs(chdata.bs_Rtes / chdata.iv_Rn) < cfg.sc_rfrac_thresh: + # Cannot perform correction, since detectors are SC + res.corrected_R0 = 0 + res.corrected_Si = 0 + res.corrected_Pj = 0 + res.success = True + return res + + # Find logalpha popts + logalpha_popts, pbias_model_offset = find_logalpha_popts(chdata, cfg) + + # compute dPopt by minimizing (dIrat_IV - dIrat_BS) at chosen bias voltage with respect to delta_popt + dIrat_meas = chdata.bs_meas_dIrat + Rsh = chdata.Rsh + Ibias_setpoint = chdata.bs_Ibias + rfrac = np.linspace(cfg.rpfit_min_rfrac, cfg.rpfit_max_rfrac, cfg.rpfit_intg_nsamps) + dr = rfrac[1] - rfrac[0] + logalpha_model = model_logalpha(rfrac, *logalpha_popts) + alpha = np.exp(logalpha_model) + R = rfrac * chdata.iv_Rn + pbias = np.cumsum(dr / (rfrac * np.exp(logalpha_model))) + pbias_model_offset + + def dIrat_IV(dPopt): + P_ = pbias - dPopt + L0_ = alpha * P_ + i_tes_ = np.sqrt(P_ / R) + i_bias_ = i_tes_ * (R + Rsh) / Rsh + irat = (1.0 - L0_) / ((1.0 - L0_) + (1.0 + L0_) * R / Rsh) + return np.interp(Ibias_setpoint, i_bias_, irat) + + def fit_func(dPopt): + return (dIrat_IV(dPopt) - dIrat_meas)**2 + + dPopt = minimize(fit_func, [0]).x[0] + res.delta_Popt = dPopt + + # Adjust IV parameter curves with delta_Popt, and find params at Ibias setpoint + pbias -= dPopt + ok = (pbias > 0.0) * (R > Rsh) + pbias = pbias[ok] + R = R[ok] + L0 = alpha[ok] * pbias + L = (R - Rsh) / (R + Rsh) * L0 + Ites = np.sqrt(pbias / R) + Ibias = Ites * (R + Rsh) / Rsh + Si = -1 / (Ites * (R - Rsh)) * (L / (L + 1)) + + res.corrected_I0 = np.interp(Ibias_setpoint, Ibias, Ites) + res.corrected_R0 = np.interp(Ibias_setpoint, Ibias, R) + res.corrected_Pj = np.interp(Ibias_setpoint, Ibias, pbias) + res.corrected_Si = np.interp(Ibias_setpoint, Ibias, Si) + res.L0 = np.interp(Ibias_setpoint, Ibias, L0) + res.success = True + + except Exception: + if cfg.raise_exceptions: + raise + else: + res.traceback = traceback.format_exc() + res.success = False + + return res + +def run_corrections_parallel( + iva, bsa, cfg: AnalysisCfg, nprocs=None, pool=None) -> List[CorrectionResults]: + """ + Runs correction procedure in parallel for all channels in IV and BSA object + """ + + nchans = iva['nchans'] + pb = trange(nchans) + results = [] + + if nprocs is None: + nprocs = cfg.default_nprocs + + def callback(res: CorrectionResults): + pb.update(1) + results.append(res) + + def errback(e): + raise e + + if pool is None: + pool = multiprocessing.get_context('spawn').Pool(nprocs) + close_pool = True + else: + close_pool = False + + try: + async_results = [] + for idx in range(nchans): + chdata = RpFitChanData.from_data( + iva, bsa, iva['bands'][idx], iva['channels'][idx] + ) + r = pool.apply_async( + run_correction, args=(chdata, cfg), callback=callback, error_callback=errback + ) + async_results.append(r) + for r in async_results: + r.wait() + pb.close() + finally: + if close_pool: + pool.close() + pool.join() + + return results + +def plot_corrected_Rfrac_comparison(results: List[CorrectionResults]): + iv_r = [] + bs_r = [] + corrected_bs_r = [] + for res in results: + if res.success: + idx = np.argmin(np.abs(res.chdata.iv_ibias - res.chdata.bs_Ibias)) + iv_r.append(res.chdata.iv_R[idx]/res.chdata.iv_Rn) + bs_r.append(res.chdata.bs_Rtes/res.chdata.iv_Rn) + corrected_bs_r.append(res.corrected_R0/res.chdata.iv_Rn) + + plt.plot([0, 2], [0, 2], 'k--', alpha=0.5) + plt.plot(iv_r, bs_r, '.', alpha=0.2, label="Bias Step Estimation") + plt.plot(iv_r, corrected_bs_r, '.', alpha=0.2, label='Corrected Rfrac') + plt.legend() + plt.xlim(0, 1.1) + plt.ylim(0, 1.1) + plt.xlabel("IV Rfrac") + plt.ylabel("Bias Step Rfrac") + +def plot_corrected_Si_comparison(results: List[CorrectionResults]): + rfrac = np.full(len(results), np.nan) + bs_si = np.full(len(results), np.nan) + corrected_bs_si = np.full(len(results), np.nan) + bg = np.full(len(results), -1, dtype=int) + for i, res in enumerate(results): + if res.success: + idx = np.argmin(np.abs(res.chdata.iv_ibias - res.chdata.bs_Ibias)) + rfrac[i] = res.chdata.iv_R[idx]/res.chdata.iv_Rn + rfrac[i] = res.corrected_R0/res.chdata.iv_Rn + bs_si[i] = res.chdata.bs_Si + corrected_bs_si[i] = res.corrected_Si + bg[i] = res.chdata.bg + + m = rfrac > 0.2 + m &= bg != -1 + m &= bs_si < 0 + m &= np.abs(bs_si) < 60 + m &= np.abs(corrected_bs_si) < 60 + # m &= bg == 2 + plt.plot(rfrac[m], bs_si[m], '.', alpha=0.2, label="Bias Step Estimation") + plt.plot(rfrac[m], corrected_bs_si[m], '.', alpha=0.2, color='C1', label="Corrected Responsivity") + plt.legend() + plt.xlabel("Corrected Rfrac") + plt.ylabel("Si (1/uV)") + + +def recompute_psats(iva2, cfg: AnalysisCfg): + """ + Re-computes Psat for an IVAnalysis object. Will save results to iva.p_sat. + This assumes i_tes, v_tes, and r_tes have already been calculated. + + Args + ---- + iva2 : Dictionary + Dictionary built from original IV Analysis .npy file + psat_level : float + R_frac level for which Psat is defined. If 0.9, Psat will be the + power on the TES when R_frac = 0.9. + + Returns + ------- + p_sat : np.ndarray + Array of length (nchan) with the p-sat computed for each channel (W) + """ + # calculates P_sat as P_TES when Rfrac = psat_level + # if the TES is at 90% R_n more than once, just take the first crossing + iva2["p_sat"] = np.full(iva2["p_sat"].shape, np.nan) + + for i in range(iva2["nchans"]): + if np.isnan(iva2["R_n"][i]): + continue + + level = cfg.psat_level + R = iva2["R"][i] + R_n = iva2["R_n"][i] + p_tes = iva2["p_tes"][i] + cross_idx = np.where(R / R_n > level)[0] + + if len(cross_idx) == 0: + iva2["p_sat"][i] = np.nan + continue + + # Takes cross-index to be the first time Rfrac crosses psat_level + cross_idx = cross_idx[0] + if cross_idx == 0: + iva2["p_sat"][i] = np.nan + continue + + iva2["idxs"][i, 2] = cross_idx + try: + iva2["p_sat"][i] = interp1d( + R[cross_idx - 1 : cross_idx + 1] / R_n, + p_tes[cross_idx - 1 : cross_idx + 1], + )(level) + except ValueError: + iva2["p_sat"][i] = np.nan + + return iva2["p_sat"] + + +def recompute_si(iva2): + """ + Recalculates responsivity using the thevenin equivalent voltage. + + Args + ---- + iva2 : Dictionary + Dictionary built from original IVAnalysis object (un-analyzed) + + Returns + ------- + si : np.ndarray + Array of length (nchan, nbiases) with the responsivity as a fn of + thevenin equivalent voltage for each channel (V^-1). + + """ + iva2["si"] = np.full(iva2["si"].shape, np.nan) + smooth_dist = 5 + w_len = 2 * smooth_dist + 1 + w = (1.0 / float(w_len)) * np.ones(w_len) # window + + v_thev_smooth = np.convolve(iva2["v_thevenin"], w, mode="same") + dv_thev = np.diff(v_thev_smooth) + + R_bl = iva2["meta"]["bias_line_resistance"] + R_sh = iva2["meta"]["R_sh"] + + for i in range(iva2["nchans"]): + sc_idx = iva2["idxs"][i, 0] + + if np.isnan(iva2["R_n"][i]) or sc_idx == -1: + # iva2['si'][i] = np.nan + continue + + # Running average + i_tes_smooth = np.convolve(iva2["i_tes"][i], w, mode="same") + v_tes_smooth = np.convolve(iva2["v_tes"][i], w, mode="same") + r_tes_smooth = v_tes_smooth / i_tes_smooth + + R_L = iva2["R_L"][i] + + # Take derivatives + di_tes = np.diff(i_tes_smooth) + dv_tes = np.diff(v_tes_smooth) + R_L_smooth = np.ones(len(r_tes_smooth - 1)) * R_L + R_L_smooth[:sc_idx] = dv_tes[:sc_idx] / di_tes[:sc_idx] + r_tes_smooth_noStray = r_tes_smooth - R_L_smooth + i0 = i_tes_smooth[:-1] + r0 = r_tes_smooth_noStray[:-1] + rL = R_L_smooth[:-1] + beta = 0.0 + + # artificially setting rL to 0 for now, + # to avoid issues in the SC branch + # don't expect a large change, given the + # relative size of rL to the other terms + # rL = 0 + + # Responsivity estimate, derivation done here by MSF + # https://www.overleaf.com/project/613978cb38d9d22e8550d45c + si = -(1.0 / (i0 * r0 * (2 + beta))) * ( + 1 - ((r0 * (1 + beta) + rL) / (dv_thev / di_tes)) + ) + si[:sc_idx] = np.nan + iva2["si"][i, :-1] = si + + return iva2["si"] + + +def recompute_ivpars(iva2, cfg: AnalysisCfg): + R_sh = iva2["meta"]["R_sh"] + R_bl = iva2["meta"]["bias_line_resistance"] + iva2["i_bias"] = iva2["v_bias"] / R_bl + iva2["v_tes"] = np.full(iva2["v_tes"].shape, np.nan) + iva2["i_tes"] = np.full(iva2["i_tes"].shape, np.nan) + iva2["p_tes"] = np.full(iva2["p_tes"].shape, np.nan) + iva2["R"] = np.full(iva2["R"].shape, np.nan) + iva2["R_n"] = np.full(iva2["R_n"].shape, np.nan) + iva2["R_L"] = np.full(iva2["R_L"].shape, np.nan) + + for i in range(iva2["nchans"]): + sc_idx = iva2["idxs"][i, 0] + nb_idx = iva2["idxs"][i, 1] + + R = R_sh * (iva2["i_bias"] / (iva2["resp"][i]) - 1) + R_par = np.nanmean(R[1:sc_idx]) + R_n = np.nanmean(R[nb_idx:]) - R_par + R_L = R_sh + R_par + R_tes = R - R_par + + iva2["v_thevenin"] = iva2["i_bias"] * R_sh + iva2["v_tes"][i] = iva2["v_thevenin"] * (R_tes / (R_tes + R_L)) + iva2["i_tes"][i] = iva2["v_tes"][i] / R_tes + iva2["p_tes"][i] = iva2["v_tes"][i] ** 2 / R_tes + + iva2["R"][i] = R_tes + iva2["R_n"][i] = R_n + iva2["R_L"][i] = R_L + + recompute_psats(iva2, cfg) + recompute_si(iva2) \ No newline at end of file From dc2a03cc3ab282346972c7397759cfe250976fae Mon Sep 17 00:00:00 2001 From: Jack Lashner Date: Thu, 1 Aug 2024 09:35:53 -0700 Subject: [PATCH 2/5] bug fixes --- sodetlib/operations/bias_steps.py | 7 ++++ sodetlib/tes_param_correction.py | 65 +++++++++++++++++++------------ 2 files changed, 47 insertions(+), 25 deletions(-) diff --git a/sodetlib/operations/bias_steps.py b/sodetlib/operations/bias_steps.py index 086566d2..385bad3f 100644 --- a/sodetlib/operations/bias_steps.py +++ b/sodetlib/operations/bias_steps.py @@ -348,6 +348,13 @@ def save(self, path=None): make_path=True ) + @classmethod + def from_dict(cls, data): + self = cls() + for k, v in data.items(): + setattr(self, k, v) + return self + @classmethod def load(cls, filepath): self = cls() diff --git a/sodetlib/tes_param_correction.py b/sodetlib/tes_param_correction.py index 20d20fc6..80e28c70 100644 --- a/sodetlib/tes_param_correction.py +++ b/sodetlib/tes_param_correction.py @@ -35,28 +35,31 @@ class AnalysisCfg: """ Class for configuring behavior of the parameter estimation functions """ - - "Default number of processes to use when running correction in parallel." default_nprocs: int = 10 + "Default number of processes to use when running correction in parallel." - "If true, will raise exceptions thrown in the run_correction function." raise_exceptions: bool = False + "If true, will raise exceptions thrown in the run_correction function." - "rfrac for which psat is defined." psat_level=0.9 + "rfrac for which psat is defined." + rpfit_min_rfrac: float = 0.05 "Min rfrac used for RP fits and estimation." - rpfit_min_rfrac: float = 0.1 + rpfit_max_rfrac: float = 0.95 "Max rfrac used for RP fits and estiamtion." - rpfit_max_rfrac: float = 0.9 - "Number of samples to use in the RP fit and integral." rpfit_intg_nsamps: int = 500 + "Number of samples to use in the RP fit and integral." + sc_rfrac_thresh: float = 0.02 "Bias step rfrac threshold below which TES should be considered " "superconducting, and the correction skipped." - sc_rfrac_thresh: float = 0.02 + + show_pb: bool = True + "If true, will show a pb when running on all channels together" + def __post_init__(self): if self.rpfit_min_rfrac < 0.0001: @@ -74,7 +77,11 @@ class RpFitChanData: bg: int Rsh: float + band: int + channel: int + iv_idx_sc: int + iv_resp: np.ndarray iv_R: np.ndarray # Ohm iv_p_tes: np.ndarray # pW iv_Rn: float @@ -106,9 +113,10 @@ def from_data(cls, iva, bsa, band, channel): bg = iva['bgmap'][iv_idx] return cls( - bg=bg, + bg=bg, band=band, channel=channel, Rsh=iva['meta']['R_sh'], iv_idx_sc = iva['idxs'][iv_idx, 0], + iv_resp = iva['resp'][iv_idx], iv_R=iva['R'][iv_idx], iv_Rn=iva['R_n'][iv_idx], iv_p_tes=iva['p_tes'][iv_idx] * 1e12, @@ -136,21 +144,20 @@ class CorrectionResults: "Log(alpha') fit popts" logalpha_popts: Optional[np.ndarray] = None - "Log(alpha') fit results object" - logalpha_fit_results: Optional[OptimizeResult] = None + pbias_model_offset: Optional[float] = None "delta Popt between IV and bias steps" delta_Popt: Optional[float] = None - "TES Current" + "TES Current [uA]" corrected_I0: Optional[float] = None - "TES resistance" + "TES resistance [Ohm]" corrected_R0: Optional[float] = None - "Bias power" + "Bias power [aW]" corrected_Pj: Optional[float] = None - "Responsivity" + "Responsivity [1/uV]" corrected_Si: Optional[float] = None "Loopgain at bias current" - L0: Optional[float] = None + loopgain: Optional[float] = None def find_logalpha_popts(chdata: RpFitChanData, cfg: AnalysisCfg) -> Tuple[np.ndarray, float]: @@ -170,7 +177,7 @@ def find_logalpha_popts(chdata: RpFitChanData, cfg: AnalysisCfg) -> Tuple[np.nda rfrac = chdata.iv_R / chdata.iv_Rn mask = np.ones_like(rfrac, dtype=bool) - mask[:chdata.iv_idx_sc] = False + mask[:chdata.iv_idx_sc + w_len + 2] = False # Try to cut out SC branch + smoothing mask &= (rfrac > cfg.rpfit_min_rfrac) * (rfrac < cfg.rpfit_max_rfrac) rfrac = np.convolve(rfrac, w, mode='same') pbias = np.convolve(chdata.iv_p_tes, w, mode='same') @@ -218,14 +225,16 @@ def run_correction(chdata: RpFitChanData, cfg: AnalysisCfg) -> CorrectionResults if np.abs(chdata.bs_Rtes / chdata.iv_Rn) < cfg.sc_rfrac_thresh: # Cannot perform correction, since detectors are SC - res.corrected_R0 = 0 - res.corrected_Si = 0 - res.corrected_Pj = 0 + res.corrected_R0 = np.float(0) + res.corrected_Si = np.float(0) + res.corrected_Pj = np.float(0) res.success = True return res # Find logalpha popts logalpha_popts, pbias_model_offset = find_logalpha_popts(chdata, cfg) + res.logalpha_popts = logalpha_popts + res.pbias_model_offset = pbias_model_offset # compute dPopt by minimizing (dIrat_IV - dIrat_BS) at chosen bias voltage with respect to delta_popt dIrat_meas = chdata.bs_meas_dIrat @@ -247,9 +256,14 @@ def dIrat_IV(dPopt): return np.interp(Ibias_setpoint, i_bias_, irat) def fit_func(dPopt): - return (dIrat_IV(dPopt) - dIrat_meas)**2 + diff = (dIrat_IV(dPopt) - dIrat_meas)**2 + return diff if not np.isnan(diff) else np.inf - dPopt = minimize(fit_func, [0]).x[0] + dPopt_fitres = minimize(fit_func, [0]) + if not dPopt_fitres.success: + raise RuntimeError("dPopt fit failed") + + dPopt = dPopt_fitres.x[0] res.delta_Popt = dPopt # Adjust IV parameter curves with delta_Popt, and find params at Ibias setpoint @@ -267,7 +281,7 @@ def fit_func(dPopt): res.corrected_R0 = np.interp(Ibias_setpoint, Ibias, R) res.corrected_Pj = np.interp(Ibias_setpoint, Ibias, pbias) res.corrected_Si = np.interp(Ibias_setpoint, Ibias, Si) - res.L0 = np.interp(Ibias_setpoint, Ibias, L0) + res.loopgain = np.interp(Ibias_setpoint, Ibias, L0) res.success = True except Exception: @@ -286,7 +300,7 @@ def run_corrections_parallel( """ nchans = iva['nchans'] - pb = trange(nchans) + pb = trange(nchans, disable=(not cfg.show_pb)) results = [] if nprocs is None: @@ -320,8 +334,9 @@ def errback(e): pb.close() finally: if close_pool: - pool.close() + pool.terminate() pool.join() + pool.close() return results From e8209b285ff4981b8afea9cc0107942af57dba64 Mon Sep 17 00:00:00 2001 From: Jack Lashner Date: Tue, 20 Aug 2024 22:19:07 +0000 Subject: [PATCH 3/5] some docs and small changes --- sodetlib/tes_param_correction.py | 69 +++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 18 deletions(-) diff --git a/sodetlib/tes_param_correction.py b/sodetlib/tes_param_correction.py index 80e28c70..2845d241 100644 --- a/sodetlib/tes_param_correction.py +++ b/sodetlib/tes_param_correction.py @@ -73,6 +73,39 @@ class RpFitChanData: """ Channel data needed to perform RP correction, pulled from bias step and IV calibration functions. + + Args + ------- + bg: int + Bias Line + Rsh: float + Shunt Resistance [Ohms] + band: int + Readout band + channel :int + Readout channel + iv_idx_sc: int + Index in IV data for SC transition + iv_resp: np.ndarray + TES current response relative from IV. + iv_R: np.ndarray + TES resistance [Ohms] through IV. + iv_p_tes: np.ndarray + TES Bias power [pW] through IV. + iv_Rn: float + TES normal resistance, measured from IV. + iv_ibias: np.ndarray + TES bias current [uA] throughout IV. + iv_si: np.ndarray + TES responsivity [1/uV] throughout IV. + bs_meas_dIrat: float + TES measured dIdrat from bias steps. + bs_Rtes: float + TES resistance [Ohm] measured from bias steps. + bs_Ibias: float + TES bias current [uA] measured from bias steps. + bs_Si: float + TES responsivity [1/uV] measured from bias steps. """ bg: int Rsh: float @@ -84,17 +117,17 @@ class RpFitChanData: iv_resp: np.ndarray iv_R: np.ndarray # Ohm iv_p_tes: np.ndarray # pW - iv_Rn: float + iv_Rn: float iv_ibias: np.ndarray # uA iv_si: np.ndarray # 1/uV - bs_meas_dIrat: float + bs_meas_dIrat: float bs_Rtes: float = np.nan # Ohm bs_Ibias: float = np.nan # uA bs_Si: float = np.nan # 1/uV @classmethod - def from_data(cls, iva, bsa, band, channel): + def from_data(cls, iva, bsa, band, channel) -> 'RpFitChanData': """Create class from IV and BSA dicts""" iv_idx = np.where( (iva['channels'] == channel) & (iva['bands'] == band) @@ -102,7 +135,7 @@ def from_data(cls, iva, bsa, band, channel): if len(iv_idx) == 0: raise ValueError(f"Could not find band={band} channel={channel} in IVAnalysis") iv_idx = iv_idx[0] - + bs_idx = np.where( (bsa['channels'] == channel) & (bsa['bands'] == band) )[0] @@ -163,7 +196,7 @@ class CorrectionResults: def find_logalpha_popts(chdata: RpFitChanData, cfg: AnalysisCfg) -> Tuple[np.ndarray, float]: """ Fit Log(alpha') to IV data. - + Returns --------- popts: np.ndarray @@ -178,7 +211,7 @@ def find_logalpha_popts(chdata: RpFitChanData, cfg: AnalysisCfg) -> Tuple[np.nda rfrac = chdata.iv_R / chdata.iv_Rn mask = np.ones_like(rfrac, dtype=bool) mask[:chdata.iv_idx_sc + w_len + 2] = False # Try to cut out SC branch + smoothing - mask &= (rfrac > cfg.rpfit_min_rfrac) * (rfrac < cfg.rpfit_max_rfrac) + mask &= (rfrac > cfg.rpfit_min_rfrac) * (rfrac < cfg.rpfit_max_rfrac) rfrac = np.convolve(rfrac, w, mode='same') pbias = np.convolve(chdata.iv_p_tes, w, mode='same') @@ -197,8 +230,8 @@ def fit_func(logalpha_pars): fitres = minimize(fit_func, [4, 10, 1, 1]) logalpha_popts = fitres.x - # Find pBias offset logalpha_model = model_logalpha(rfrac, *fitres.x) + pbias_model = np.nancumsum(drfrac / (rfrac * np.exp(logalpha_model))) def fit_func(pbias_offset): return np.sum((pbias_model + pbias_offset - pbias)**2) @@ -223,11 +256,11 @@ def run_correction(chdata: RpFitChanData, cfg: AnalysisCfg) -> CorrectionResults if np.isnan(chdata.iv_Rn): raise ValueError("IV Rn is NaN") - if np.abs(chdata.bs_Rtes / chdata.iv_Rn) < cfg.sc_rfrac_thresh: + if np.abs(chdata.bs_Rtes / chdata.iv_Rn) < cfg.sc_rfrac_thresh: # Cannot perform correction, since detectors are SC - res.corrected_R0 = np.float(0) - res.corrected_Si = np.float(0) - res.corrected_Pj = np.float(0) + res.corrected_R0 = 0.0 + res.corrected_Si = 0.0 + res.corrected_Pj = 0.0 res.success = True return res @@ -258,7 +291,7 @@ def dIrat_IV(dPopt): def fit_func(dPopt): diff = (dIrat_IV(dPopt) - dIrat_meas)**2 return diff if not np.isnan(diff) else np.inf - + dPopt_fitres = minimize(fit_func, [0]) if not dPopt_fitres.success: raise RuntimeError("dPopt fit failed") @@ -277,11 +310,11 @@ def fit_func(dPopt): Ibias = Ites * (R + Rsh) / Rsh Si = -1 / (Ites * (R - Rsh)) * (L / (L + 1)) - res.corrected_I0 = np.interp(Ibias_setpoint, Ibias, Ites) - res.corrected_R0 = np.interp(Ibias_setpoint, Ibias, R) - res.corrected_Pj = np.interp(Ibias_setpoint, Ibias, pbias) - res.corrected_Si = np.interp(Ibias_setpoint, Ibias, Si) - res.loopgain = np.interp(Ibias_setpoint, Ibias, L0) + res.corrected_I0 = np.interp(Ibias_setpoint, Ibias, Ites).item() + res.corrected_R0 = np.interp(Ibias_setpoint, Ibias, R).item() + res.corrected_Pj = np.interp(Ibias_setpoint, Ibias, pbias).item() + res.corrected_Si = np.interp(Ibias_setpoint, Ibias, Si).item() + res.loopgain = np.interp(Ibias_setpoint, Ibias, L0).item() res.success = True except Exception: @@ -350,7 +383,7 @@ def plot_corrected_Rfrac_comparison(results: List[CorrectionResults]): iv_r.append(res.chdata.iv_R[idx]/res.chdata.iv_Rn) bs_r.append(res.chdata.bs_Rtes/res.chdata.iv_Rn) corrected_bs_r.append(res.corrected_R0/res.chdata.iv_Rn) - + plt.plot([0, 2], [0, 2], 'k--', alpha=0.5) plt.plot(iv_r, bs_r, '.', alpha=0.2, label="Bias Step Estimation") plt.plot(iv_r, corrected_bs_r, '.', alpha=0.2, label='Corrected Rfrac') From 1a488b2aefd9f6a6a9283b1f88997b9cfe747a57 Mon Sep 17 00:00:00 2001 From: Jack Lashner Date: Thu, 22 Aug 2024 18:57:56 -0400 Subject: [PATCH 4/5] Type checking, docs, to_dict and from_dict --- sodetlib/operations/bias_steps.py | 32 +-- sodetlib/operations/iv.py | 23 +- sodetlib/tes_param_correction.py | 420 +++++++++++++++--------------- 3 files changed, 243 insertions(+), 232 deletions(-) diff --git a/sodetlib/operations/bias_steps.py b/sodetlib/operations/bias_steps.py index 385bad3f..9840c059 100644 --- a/sodetlib/operations/bias_steps.py +++ b/sodetlib/operations/bias_steps.py @@ -6,6 +6,7 @@ import matplotlib.pyplot as plt import scipy.optimize from sodetlib.operations import iv +from typing import Dict, Any np.seterr(all='ignore') @@ -275,9 +276,9 @@ class BiasStepAnalysis: I0 (array (float) shape (nchans)): Computed TES currents for each channel (amps) Pj (array (float) shape (nchans)): - Bias power computed for each channel + Bias power computed for each channel (W) Si (array (float) shape (nchans)): - Responsivity computed for each channel + Responsivity computed for each channel (1/V) step_fit_tmin (float): Time after bias step to start fitting exponential (sec) step_fit_popts (array (float) of shape (nchans, 3)): @@ -313,7 +314,15 @@ def __init__(self, S=None, cfg=None, bgs=None, run_kwargs=None): self.run_kwargs = run_kwargs self.high_current_mode = run_kwargs.get("high_current_mode", True) - def save(self, path=None): + + @classmethod + def from_dict(cls, data) -> 'BiasStepAnalysis': + self = cls() + for k, v in data.items(): + setattr(self, k, v) + return self + + def to_dict(self) -> Dict[str, Any]: data = {} saved_fields = [ # Run data and metadata @@ -338,7 +347,10 @@ def save(self, path=None): print(f"WARNING: field {f} does not exist... " "defaulting to None") data[f] = getattr(self, f, None) + return data + def save(self, path=None) -> None: + data = self.to_dict() if path is not None: np.save(path, data, allow_pickle=True) self.filepath = path @@ -349,18 +361,8 @@ def save(self, path=None): ) @classmethod - def from_dict(cls, data): - self = cls() - for k, v in data.items(): - setattr(self, k, v) - return self - - @classmethod - def load(cls, filepath): - self = cls() - data = np.load(filepath, allow_pickle=True).item() - for k, v in data.items(): - setattr(self, k, v) + def load(cls, filepath) -> 'BiasStepAnalysis': + self = cls.from_dict(np.load(filepath, allow_pickle=True).item()) self.filepath = filepath return self diff --git a/sodetlib/operations/iv.py b/sodetlib/operations/iv.py index 04e349f0..f699f285 100644 --- a/sodetlib/operations/iv.py +++ b/sodetlib/operations/iv.py @@ -6,7 +6,7 @@ import os from copy import deepcopy from tqdm.auto import trange -from typing import Optional +from typing import Optional, Dict, Any from dataclasses import dataclass, asdict import warnings @@ -106,7 +106,7 @@ class IVAnalysis: current. """ def __init__(self, S=None, cfg=None, run_kwargs=None, sid=None, - start_times=None, stop_times=None): + start_times=None, stop_times=None) -> None: self._S = S self._cfg = cfg @@ -130,6 +130,7 @@ def __init__(self, S=None, cfg=None, run_kwargs=None, sid=None, self.channels = am.ch_info.channel self.v_bias = np.full((self.nbiases, ), np.nan) self.i_bias = np.full((self.nbiases, ), np.nan) + self.v_thevenin = np.full((self.nbiases, ), np.nan) self.resp = np.full((self.nchans, self.nbiases), np.nan) self.R = np.full((self.nchans, self.nbiases), np.nan) self.p_tes = np.full((self.nchans, self.nbiases), np.nan) @@ -145,16 +146,19 @@ def __init__(self, S=None, cfg=None, run_kwargs=None, sid=None, self.bands, self.channels, self.meta['bgmap_file'] ) - def save(self, path=None, update_cfg=False): + def to_dict(self): saved_fields = [ 'meta', 'bands', 'channels', 'sid', 'start_times', 'stop_times', 'run_kwargs', 'biases_cmd', 'bias_groups', 'nbiases', 'nchans', 'bgmap', 'polarity', 'resp', 'v_bias', 'i_bias', 'R', 'R_n', 'R_L', - 'idxs', 'p_tes', 'v_tes', 'i_tes', 'p_sat', 'si', + 'idxs', 'p_tes', 'v_tes', 'i_tes', 'p_sat', 'si', 'v_thevenin' ] - data = { + return { field: getattr(self, field, None) for field in saved_fields } + + def save(self, path=None, update_cfg=False): + data = self.to_dict() if path is not None: np.save(path, data, allow_pickle=True) else: @@ -166,9 +170,9 @@ def save(self, path=None, update_cfg=False): update_file=True) @classmethod - def load(cls, path): + def from_dict(cls, data: Dict[str, Any]) -> 'IVAnalysis': iva = cls() - for key, val in np.load(path, allow_pickle=True).item().items(): + for key, val in data.items(): setattr(iva, key, val) if len(iva.start_times) == 1: @@ -180,6 +184,11 @@ def load(cls, path): return iva + @classmethod + def load(cls, path) -> 'IVAnalysis': + return cls.from_dict(np.load(path, allow_pickle=True).item()) + + def _load_am(self, arc=None): if self.am is None: if arc: diff --git a/sodetlib/tes_param_correction.py b/sodetlib/tes_param_correction.py index 2845d241..b8873ca9 100644 --- a/sodetlib/tes_param_correction.py +++ b/sodetlib/tes_param_correction.py @@ -13,17 +13,22 @@ import numpy as np from scipy.interpolate import interp1d -from scipy.optimize import minimize, OptimizeResult +from scipy.optimize import minimize from tqdm.auto import trange import multiprocessing -from typing import Optional, List, Tuple +from typing import Optional, List, Tuple, TypeVar from dataclasses import dataclass import traceback -import matplotlib.pyplot as plt +from copy import deepcopy +from numpy.typing import NDArray -from sodetlib.operations import bias_steps, iv +from sodetlib.operations.iv import IVAnalysis +from sodetlib.operations.bias_steps import BiasStepAnalysis -def model_logalpha(r, logp0=0.0, p1=7.0, p2=1.0, logp3=0.0): +D = TypeVar("D", float, np.ndarray) + + +def model_logalpha(r: D, logp0=0.0, p1=7.0, p2=1.0, logp3=0.0) -> D: """ Fits Alpha' parameter, or alpha / GT. P0 has units of [uW]^-1. """ @@ -35,13 +40,14 @@ class AnalysisCfg: """ Class for configuring behavior of the parameter estimation functions """ + default_nprocs: int = 10 "Default number of processes to use when running correction in parallel." raise_exceptions: bool = False "If true, will raise exceptions thrown in the run_correction function." - psat_level=0.9 + psat_level: float = 0.9 "rfrac for which psat is defined." rpfit_min_rfrac: float = 0.05 @@ -60,7 +66,6 @@ class AnalysisCfg: show_pb: bool = True "If true, will show a pb when running on all channels together" - def __post_init__(self): if self.rpfit_min_rfrac < 0.0001: raise ValueError("rp_fit_min_rfrac must be larger than 0.0001") @@ -107,6 +112,7 @@ class RpFitChanData: bs_Si: float TES responsivity [1/uV] measured from bias steps. """ + bg: int Rsh: float @@ -115,85 +121,100 @@ class RpFitChanData: iv_idx_sc: int iv_resp: np.ndarray - iv_R: np.ndarray # Ohm - iv_p_tes: np.ndarray # pW + iv_R: np.ndarray # Ohm + iv_p_tes: np.ndarray # pW iv_Rn: float - iv_ibias: np.ndarray # uA - iv_si: np.ndarray # 1/uV + iv_ibias: np.ndarray # uA + iv_si: np.ndarray # 1/uV bs_meas_dIrat: float bs_Rtes: float = np.nan # Ohm - bs_Ibias: float = np.nan # uA - bs_Si: float = np.nan # 1/uV + bs_Ibias: float = np.nan # uA + bs_Si: float = np.nan # 1/uV @classmethod - def from_data(cls, iva, bsa, band, channel) -> 'RpFitChanData': + def from_data( + cls, iva: IVAnalysis, bsa: BiasStepAnalysis, band: int, channel: int + ) -> "RpFitChanData": """Create class from IV and BSA dicts""" - iv_idx = np.where( - (iva['channels'] == channel) & (iva['bands'] == band) - )[0] + iv_idx = np.where((iva.channels == channel) & (iva.bands == band))[0] if len(iv_idx) == 0: - raise ValueError(f"Could not find band={band} channel={channel} in IVAnalysis") + raise ValueError( + f"Could not find band={band} channel={channel} in IVAnalysis" + ) iv_idx = iv_idx[0] - bs_idx = np.where( - (bsa['channels'] == channel) & (bsa['bands'] == band) - )[0] + bs_idx = np.where((bsa.channels == channel) & (bsa.bands == band))[0] if len(bs_idx) == 0: - raise ValueError(f"Could not find band={band} channel={channel} in Bias Steps") + raise ValueError( + f"Could not find band={band} channel={channel} in Bias Steps" + ) bs_idx = bs_idx[0] - bg = iva['bgmap'][iv_idx] + bg = iva.bgmap[iv_idx] return cls( - bg=bg, band=band, channel=channel, - Rsh=iva['meta']['R_sh'], - iv_idx_sc = iva['idxs'][iv_idx, 0], - iv_resp = iva['resp'][iv_idx], - iv_R=iva['R'][iv_idx], - iv_Rn=iva['R_n'][iv_idx], - iv_p_tes=iva['p_tes'][iv_idx] * 1e12, - iv_si = iva['si'][iv_idx] * 1e-6, - iv_ibias=iva['i_bias'] * 1e6, - bs_meas_dIrat=bsa['dItes'][bs_idx] / bsa['dIbias'][bg], - bs_Rtes = bsa['R0'][bs_idx], - bs_Ibias = bsa['Ibias'][bg] * 1e6, - bs_Si = bsa['Si'][bs_idx] * 1e-6, + bg=bg, + band=band, + channel=channel, + Rsh=iva.meta["R_sh"], + iv_idx_sc=iva.idxs[iv_idx, 0], + iv_resp=iva.resp[iv_idx], + iv_R=iva.R[iv_idx], + iv_Rn=iva.R_n[iv_idx], + iv_p_tes=iva.p_tes[iv_idx] * 1e12, + iv_si=iva.si[iv_idx] * 1e-6, + iv_ibias=iva.i_bias * 1e6, + bs_meas_dIrat=bsa.dItes[bs_idx] / bsa.dIbias[bg], + bs_Rtes=bsa.R0[bs_idx], + bs_Ibias=bsa.Ibias[bg] * 1e6, + bs_Si=bsa.Si[bs_idx] * 1e-6, ) + +@dataclass +class CorrectedParams: + "Log(alpha') fit popts" + + logalpha_popts: np.ndarray + "Log alpha fit parameters" + pbias_model_offset: float + "delta Popt between IV and bias steps" + delta_Popt: float + "TES Current [uA]" + corrected_I0: float + "TES resistance [Ohm]" + corrected_R0: float + "Bias power [aW]" + corrected_Pj: float + "Responsivity [1/uV]" + corrected_Si: float + "Loopgain at bias current" + loopgain: float + + @dataclass class CorrectionResults: """ Results from the parameter correction procedure """ - "Channel data used for correction" chdata: RpFitChanData + "Channel data used for correction" - "If correction finished successfully" success: bool = False - "Traceback on failure" + "If correction finished successfully" + traceback: Optional[str] = None + "Traceback on failure" - "Log(alpha') fit popts" - logalpha_popts: Optional[np.ndarray] = None - pbias_model_offset: Optional[float] = None - "delta Popt between IV and bias steps" - delta_Popt: Optional[float] = None - - "TES Current [uA]" - corrected_I0: Optional[float] = None - "TES resistance [Ohm]" - corrected_R0: Optional[float] = None - "Bias power [aW]" - corrected_Pj: Optional[float] = None - "Responsivity [1/uV]" - corrected_Si: Optional[float] = None - "Loopgain at bias current" - loopgain: Optional[float] = None + corrected_params: Optional[CorrectedParams] = None + "Model popts and corrected TES parameters" -def find_logalpha_popts(chdata: RpFitChanData, cfg: AnalysisCfg) -> Tuple[np.ndarray, float]: +def find_logalpha_popts( + chdata: RpFitChanData, cfg: AnalysisCfg +) -> Tuple[np.ndarray, float]: """ Fit Log(alpha') to IV data. @@ -206,14 +227,14 @@ def find_logalpha_popts(chdata: RpFitChanData, cfg: AnalysisCfg) -> Tuple[np.nda """ smooth_dist = 5 w_len = 2 * smooth_dist + 1 - w = (1./float(w_len))*np.ones(w_len) # window + w = (1.0 / float(w_len)) * np.ones(w_len) # window rfrac = chdata.iv_R / chdata.iv_Rn mask = np.ones_like(rfrac, dtype=bool) - mask[:chdata.iv_idx_sc + w_len + 2] = False # Try to cut out SC branch + smoothing + mask[: chdata.iv_idx_sc + w_len + 2] = False # Try to cut out SC branch + smoothing mask &= (rfrac > cfg.rpfit_min_rfrac) * (rfrac < cfg.rpfit_max_rfrac) - rfrac = np.convolve(rfrac, w, mode='same') - pbias = np.convolve(chdata.iv_p_tes, w, mode='same') + rfrac = np.convolve(rfrac, w, mode="same") + pbias = np.convolve(chdata.iv_p_tes, w, mode="same") rfrac = rfrac[mask] pbias = pbias[mask] @@ -222,20 +243,22 @@ def find_logalpha_popts(chdata: RpFitChanData, cfg: AnalysisCfg) -> Tuple[np.nda logalpha = np.log(1 / rfrac * drfrac / dpbias) logalpha_mask = np.isfinite(logalpha) - def fit_func(logalpha_pars): + def logalpha_fit_func(logalpha_pars): model = model_logalpha(rfrac, *logalpha_pars) - chi2 = np.sum((logalpha[logalpha_mask] - model[logalpha_mask])**2) + chi2 = np.sum((logalpha[logalpha_mask] - model[logalpha_mask]) ** 2) return chi2 - fitres = minimize(fit_func, [4, 10, 1, 1]) + fitres = minimize(logalpha_fit_func, [4, 10, 1, 1]) logalpha_popts = fitres.x logalpha_model = model_logalpha(rfrac, *fitres.x) pbias_model = np.nancumsum(drfrac / (rfrac * np.exp(logalpha_model))) - def fit_func(pbias_offset): - return np.sum((pbias_model + pbias_offset - pbias)**2) - pbias_model_offset = minimize(fit_func, pbias[0]).x[0] + + def pbias_offset_fit_func(pbias_offset): + return np.sum((pbias_model + pbias_offset - pbias) ** 2) + + pbias_model_offset = minimize(pbias_offset_fit_func, pbias[0]).x[0] return logalpha_popts, pbias_model_offset @@ -256,24 +279,31 @@ def run_correction(chdata: RpFitChanData, cfg: AnalysisCfg) -> CorrectionResults if np.isnan(chdata.iv_Rn): raise ValueError("IV Rn is NaN") + # Find logalpha popts + logalpha_popts, pbias_model_offset = find_logalpha_popts(chdata, cfg) + if np.abs(chdata.bs_Rtes / chdata.iv_Rn) < cfg.sc_rfrac_thresh: # Cannot perform correction, since detectors are SC - res.corrected_R0 = 0.0 - res.corrected_Si = 0.0 - res.corrected_Pj = 0.0 + res.corrected_params = CorrectedParams( + logalpha_popts=logalpha_popts, + pbias_model_offset=pbias_model_offset, + delta_Popt=np.nan, + corrected_I0=0.0, + corrected_R0=0.0, + corrected_Pj=0.0, + corrected_Si=0.0, + loopgain=np.nan, + ) res.success = True return res - # Find logalpha popts - logalpha_popts, pbias_model_offset = find_logalpha_popts(chdata, cfg) - res.logalpha_popts = logalpha_popts - res.pbias_model_offset = pbias_model_offset - # compute dPopt by minimizing (dIrat_IV - dIrat_BS) at chosen bias voltage with respect to delta_popt dIrat_meas = chdata.bs_meas_dIrat Rsh = chdata.Rsh Ibias_setpoint = chdata.bs_Ibias - rfrac = np.linspace(cfg.rpfit_min_rfrac, cfg.rpfit_max_rfrac, cfg.rpfit_intg_nsamps) + rfrac = np.linspace( + cfg.rpfit_min_rfrac, cfg.rpfit_max_rfrac, cfg.rpfit_intg_nsamps + ) dr = rfrac[1] - rfrac[0] logalpha_model = model_logalpha(rfrac, *logalpha_popts) alpha = np.exp(logalpha_model) @@ -289,7 +319,7 @@ def dIrat_IV(dPopt): return np.interp(Ibias_setpoint, i_bias_, irat) def fit_func(dPopt): - diff = (dIrat_IV(dPopt) - dIrat_meas)**2 + diff = (dIrat_IV(dPopt) - dIrat_meas) ** 2 return diff if not np.isnan(diff) else np.inf dPopt_fitres = minimize(fit_func, [0]) @@ -297,7 +327,6 @@ def fit_func(dPopt): raise RuntimeError("dPopt fit failed") dPopt = dPopt_fitres.x[0] - res.delta_Popt = dPopt # Adjust IV parameter curves with delta_Popt, and find params at Ibias setpoint pbias -= dPopt @@ -310,13 +339,17 @@ def fit_func(dPopt): Ibias = Ites * (R + Rsh) / Rsh Si = -1 / (Ites * (R - Rsh)) * (L / (L + 1)) - res.corrected_I0 = np.interp(Ibias_setpoint, Ibias, Ites).item() - res.corrected_R0 = np.interp(Ibias_setpoint, Ibias, R).item() - res.corrected_Pj = np.interp(Ibias_setpoint, Ibias, pbias).item() - res.corrected_Si = np.interp(Ibias_setpoint, Ibias, Si).item() - res.loopgain = np.interp(Ibias_setpoint, Ibias, L0).item() + res.corrected_params = CorrectedParams( + logalpha_popts=logalpha_popts, + pbias_model_offset=pbias_model_offset, + delta_Popt=dPopt, + corrected_I0=np.interp(Ibias_setpoint, Ibias, Ites).item(), + corrected_R0=np.interp(Ibias_setpoint, Ibias, R).item(), + corrected_Pj=np.interp(Ibias_setpoint, Ibias, pbias).item(), + corrected_Si=np.interp(Ibias_setpoint, Ibias, Si).item(), + loopgain=np.interp(Ibias_setpoint, Ibias, L0).item(), + ) res.success = True - except Exception: if cfg.raise_exceptions: raise @@ -326,13 +359,15 @@ def fit_func(dPopt): return res + def run_corrections_parallel( - iva, bsa, cfg: AnalysisCfg, nprocs=None, pool=None) -> List[CorrectionResults]: + iva, bsa, cfg: AnalysisCfg, nprocs=None, pool=None +) -> List[CorrectionResults]: """ Runs correction procedure in parallel for all channels in IV and BSA object """ - nchans = iva['nchans'] + nchans = iva["nchans"] pb = trange(nchans, disable=(not cfg.show_pb)) results = [] @@ -342,12 +377,12 @@ def run_corrections_parallel( def callback(res: CorrectionResults): pb.update(1) results.append(res) - + def errback(e): raise e - + if pool is None: - pool = multiprocessing.get_context('spawn').Pool(nprocs) + pool = multiprocessing.get_context("spawn").Pool(nprocs) close_pool = True else: close_pool = False @@ -356,10 +391,13 @@ def errback(e): async_results = [] for idx in range(nchans): chdata = RpFitChanData.from_data( - iva, bsa, iva['bands'][idx], iva['channels'][idx] + iva, bsa, iva["bands"][idx], iva["channels"][idx] ) r = pool.apply_async( - run_correction, args=(chdata, cfg), callback=callback, error_callback=errback + run_correction, + args=(chdata, cfg), + callback=callback, + error_callback=errback, ) async_results.append(r) for r in async_results: @@ -373,57 +411,12 @@ def errback(e): return results -def plot_corrected_Rfrac_comparison(results: List[CorrectionResults]): - iv_r = [] - bs_r = [] - corrected_bs_r = [] - for res in results: - if res.success: - idx = np.argmin(np.abs(res.chdata.iv_ibias - res.chdata.bs_Ibias)) - iv_r.append(res.chdata.iv_R[idx]/res.chdata.iv_Rn) - bs_r.append(res.chdata.bs_Rtes/res.chdata.iv_Rn) - corrected_bs_r.append(res.corrected_R0/res.chdata.iv_Rn) - - plt.plot([0, 2], [0, 2], 'k--', alpha=0.5) - plt.plot(iv_r, bs_r, '.', alpha=0.2, label="Bias Step Estimation") - plt.plot(iv_r, corrected_bs_r, '.', alpha=0.2, label='Corrected Rfrac') - plt.legend() - plt.xlim(0, 1.1) - plt.ylim(0, 1.1) - plt.xlabel("IV Rfrac") - plt.ylabel("Bias Step Rfrac") - -def plot_corrected_Si_comparison(results: List[CorrectionResults]): - rfrac = np.full(len(results), np.nan) - bs_si = np.full(len(results), np.nan) - corrected_bs_si = np.full(len(results), np.nan) - bg = np.full(len(results), -1, dtype=int) - for i, res in enumerate(results): - if res.success: - idx = np.argmin(np.abs(res.chdata.iv_ibias - res.chdata.bs_Ibias)) - rfrac[i] = res.chdata.iv_R[idx]/res.chdata.iv_Rn - rfrac[i] = res.corrected_R0/res.chdata.iv_Rn - bs_si[i] = res.chdata.bs_Si - corrected_bs_si[i] = res.corrected_Si - bg[i] = res.chdata.bg - - m = rfrac > 0.2 - m &= bg != -1 - m &= bs_si < 0 - m &= np.abs(bs_si) < 60 - m &= np.abs(corrected_bs_si) < 60 - # m &= bg == 2 - plt.plot(rfrac[m], bs_si[m], '.', alpha=0.2, label="Bias Step Estimation") - plt.plot(rfrac[m], corrected_bs_si[m], '.', alpha=0.2, color='C1', label="Corrected Responsivity") - plt.legend() - plt.xlabel("Corrected Rfrac") - plt.ylabel("Si (1/uV)") - - -def recompute_psats(iva2, cfg: AnalysisCfg): + +def compute_psats(iva: IVAnalysis, cfg: AnalysisCfg) -> Tuple[np.ndarray, np.ndarray]: """ Re-computes Psat for an IVAnalysis object. Will save results to iva.p_sat. - This assumes i_tes, v_tes, and r_tes have already been calculated. + This assumes i_tes, v_tes, and r_tes have already been calculated. This will + not modify the original IVAnalysis object. Args ---- @@ -437,83 +430,82 @@ def recompute_psats(iva2, cfg: AnalysisCfg): ------- p_sat : np.ndarray Array of length (nchan) with the p-sat computed for each channel (W) + psat_cross_idx : np.ndarray + Array of indices at which the psat level is crossed for each channel """ # calculates P_sat as P_TES when Rfrac = psat_level # if the TES is at 90% R_n more than once, just take the first crossing - iva2["p_sat"] = np.full(iva2["p_sat"].shape, np.nan) + psats = np.full(iva.nchans, np.nan) + psat_cross_idx = np.full(iva.nchans, -1) - for i in range(iva2["nchans"]): - if np.isnan(iva2["R_n"][i]): - continue + for i in range(iva.nchans): + R = iva.R[i] + R_n = iva.R_n[i] + p_tes = iva.p_tes[i] - level = cfg.psat_level - R = iva2["R"][i] - R_n = iva2["R_n"][i] - p_tes = iva2["p_tes"][i] - cross_idx = np.where(R / R_n > level)[0] + if np.isnan(R_n): + continue + cross_idx = np.where(R / R_n > cfg.psat_level)[0] if len(cross_idx) == 0: - iva2["p_sat"][i] = np.nan continue # Takes cross-index to be the first time Rfrac crosses psat_level cross_idx = cross_idx[0] if cross_idx == 0: - iva2["p_sat"][i] = np.nan continue - iva2["idxs"][i, 2] = cross_idx + psat_cross_idx[i] = cross_idx try: - iva2["p_sat"][i] = interp1d( + psat = interp1d( R[cross_idx - 1 : cross_idx + 1] / R_n, p_tes[cross_idx - 1 : cross_idx + 1], - )(level) + )(cfg.psat_level) except ValueError: - iva2["p_sat"][i] = np.nan + continue + psats[i] = psat - return iva2["p_sat"] + return psats, psat_cross_idx -def recompute_si(iva2): +def compute_si(iva: IVAnalysis) -> np.ndarray: """ - Recalculates responsivity using the thevenin equivalent voltage. + Recalculates responsivity using the thevenin equivalent voltage. This will + not modify the original IVAnalysis object. - Args + Args ---- - iva2 : Dictionary - Dictionary built from original IVAnalysis object (un-analyzed) + iva : IVAnalysis + IVAnalysis object for which you want to compute Si. This should already + have items like R, R_n, R_L, i_tes, v_tes, computed. Returns ------- si : np.ndarray Array of length (nchan, nbiases) with the responsivity as a fn of thevenin equivalent voltage for each channel (V^-1). - """ - iva2["si"] = np.full(iva2["si"].shape, np.nan) + si_all = np.full(iva.si.shape, np.nan) + smooth_dist = 5 w_len = 2 * smooth_dist + 1 w = (1.0 / float(w_len)) * np.ones(w_len) # window - v_thev_smooth = np.convolve(iva2["v_thevenin"], w, mode="same") + v_thev_smooth = np.convolve(iva.v_thevenin, w, mode="same") dv_thev = np.diff(v_thev_smooth) - R_bl = iva2["meta"]["bias_line_resistance"] - R_sh = iva2["meta"]["R_sh"] + for i in range(iva.nchans): + sc_idx = iva.idxs[i, 0] - for i in range(iva2["nchans"]): - sc_idx = iva2["idxs"][i, 0] - - if np.isnan(iva2["R_n"][i]) or sc_idx == -1: - # iva2['si'][i] = np.nan + if np.isnan(iva.R_n[i]) or sc_idx == -1: continue # Running average - i_tes_smooth = np.convolve(iva2["i_tes"][i], w, mode="same") - v_tes_smooth = np.convolve(iva2["v_tes"][i], w, mode="same") + i_tes_smooth = np.convolve(iva.i_tes[i], w, mode="same") + v_tes_smooth = np.convolve(iva.v_tes[i], w, mode="same") r_tes_smooth = v_tes_smooth / i_tes_smooth - R_L = iva2["R_L"][i] + R_L = iva.R_L[i] # Take derivatives di_tes = np.diff(i_tes_smooth) @@ -538,40 +530,48 @@ def recompute_si(iva2): 1 - ((r0 * (1 + beta) + rL) / (dv_thev / di_tes)) ) si[:sc_idx] = np.nan - iva2["si"][i, :-1] = si - - return iva2["si"] - - -def recompute_ivpars(iva2, cfg: AnalysisCfg): - R_sh = iva2["meta"]["R_sh"] - R_bl = iva2["meta"]["bias_line_resistance"] - iva2["i_bias"] = iva2["v_bias"] / R_bl - iva2["v_tes"] = np.full(iva2["v_tes"].shape, np.nan) - iva2["i_tes"] = np.full(iva2["i_tes"].shape, np.nan) - iva2["p_tes"] = np.full(iva2["p_tes"].shape, np.nan) - iva2["R"] = np.full(iva2["R"].shape, np.nan) - iva2["R_n"] = np.full(iva2["R_n"].shape, np.nan) - iva2["R_L"] = np.full(iva2["R_L"].shape, np.nan) - - for i in range(iva2["nchans"]): - sc_idx = iva2["idxs"][i, 0] - nb_idx = iva2["idxs"][i, 1] - - R = R_sh * (iva2["i_bias"] / (iva2["resp"][i]) - 1) - R_par = np.nanmean(R[1:sc_idx]) - R_n = np.nanmean(R[nb_idx:]) - R_par - R_L = R_sh + R_par - R_tes = R - R_par - - iva2["v_thevenin"] = iva2["i_bias"] * R_sh - iva2["v_tes"][i] = iva2["v_thevenin"] * (R_tes / (R_tes + R_L)) - iva2["i_tes"][i] = iva2["v_tes"][i] / R_tes - iva2["p_tes"][i] = iva2["v_tes"][i] ** 2 / R_tes - - iva2["R"][i] = R_tes - iva2["R_n"][i] = R_n - iva2["R_L"][i] = R_L - - recompute_psats(iva2, cfg) - recompute_si(iva2) \ No newline at end of file + si_all[i] = si + + return si_all + + +def recompute_ivpars(iva: IVAnalysis, cfg: AnalysisCfg) -> IVAnalysis: + """ + Takes in an IV Analysis object and analysis cfg params, and recomputes + TES voltage, current, bias power, resistance, responsivity, and saturation + powers using the corrected TES formulas for R_L and V_thevenin. + """ + iva_new = IVAnalysis.from_dict(deepcopy(iva.to_dict())) + R_sh = iva.meta["R_sh"] + R_bl = iva.meta["bias_line_resistance"] + iva_new.i_bias = iva_new.v_bias / R_bl + iva_new.v_thevenin = iva_new.i_bias * R_sh + iva_new.v_tes = np.full(iva.v_tes.shape, np.nan) + iva_new.i_tes = np.full(iva.i_tes.shape, np.nan) + iva_new.p_tes = np.full(iva.p_tes.shape, np.nan) + iva_new.R = np.full(iva.R.shape, np.nan) + iva_new.R_n = np.full(iva.R_n.shape, np.nan) + iva_new.R_L = np.full(iva.R_L.shape, np.nan) + + iva_new.resp + for i in range(iva.nchans): + sc_idx = iva.idxs[i, 0] + nb_idx = iva.idxs[i, 1] + + R: NDArray[np.float] = R_sh * (iva.i_bias / iva.resp[i] - 1) + R_par: float = np.nanmean(R[1:sc_idx]) + R_n: float = np.nanmean(R[nb_idx:]) - R_par + R_L: float = R_sh + R_par + R_tes: NDArray[np.float] = R - R_par + + iva_new.v_tes[i] = iva_new.v_thevenin * (R_tes / (R_tes + R_L)) + iva_new.i_tes[i] = iva_new.v_tes[i] / R_tes + iva_new.p_tes[i] = iva_new.v_tes[i] ** 2 / R_tes + iva_new.R[i] = R_tes + iva_new.R_n[i] = R_n + iva_new.R_L[i] = R_L + + iva_new.p_sat, iva_new.idxs[:, 2] = compute_psats(iva_new, cfg) + iva_new.si = compute_si(iva_new) + + return iva_new From 5664ec38b8945c140ae953cd53dfad39ba9d18a6 Mon Sep 17 00:00:00 2001 From: Jack Lashner Date: Wed, 28 Aug 2024 15:59:56 +0000 Subject: [PATCH 5/5] Bug fix and py.typed file --- sodetlib/py.typed | 0 sodetlib/tes_param_correction.py | 10 +++++----- 2 files changed, 5 insertions(+), 5 deletions(-) create mode 100644 sodetlib/py.typed diff --git a/sodetlib/py.typed b/sodetlib/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/sodetlib/tes_param_correction.py b/sodetlib/tes_param_correction.py index b8873ca9..19902e96 100644 --- a/sodetlib/tes_param_correction.py +++ b/sodetlib/tes_param_correction.py @@ -291,7 +291,7 @@ def run_correction(chdata: RpFitChanData, cfg: AnalysisCfg) -> CorrectionResults corrected_I0=0.0, corrected_R0=0.0, corrected_Pj=0.0, - corrected_Si=0.0, + corrected_Si=np.nan, loopgain=np.nan, ) res.success = True @@ -361,13 +361,13 @@ def fit_func(dPopt): def run_corrections_parallel( - iva, bsa, cfg: AnalysisCfg, nprocs=None, pool=None + iva: IVAnalysis, bsa: BiasStepAnalysis, cfg: AnalysisCfg, nprocs=None, pool=None ) -> List[CorrectionResults]: """ Runs correction procedure in parallel for all channels in IV and BSA object """ - nchans = iva["nchans"] + nchans = iva.nchans pb = trange(nchans, disable=(not cfg.show_pb)) results = [] @@ -391,7 +391,7 @@ def errback(e): async_results = [] for idx in range(nchans): chdata = RpFitChanData.from_data( - iva, bsa, iva["bands"][idx], iva["channels"][idx] + iva, bsa, iva.bands[idx], iva.channels[idx] ) r = pool.apply_async( run_correction, @@ -530,7 +530,7 @@ def compute_si(iva: IVAnalysis) -> np.ndarray: 1 - ((r0 * (1 + beta) + rL) / (dv_thev / di_tes)) ) si[:sc_idx] = np.nan - si_all[i] = si + si_all[i, :-1] = si return si_all