diff --git a/CHANGELOG.md b/CHANGELOG.md index cb68c40..ad11c0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # pylhc Changelog +## Version 0.5.0 + +Removed `irnl_rdt_correction`. Is now in https://github.com/pylhc/irnl_rdt_correction + ## Version 0.4.1 Minor bugfixes in `machine_settings_info`. diff --git a/README.md b/README.md index 69b1cce..9ae4754 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,6 @@ To use those, you should make sure to install the relevant extra dependencies wi - `Machine Settings Info` - Prints an overview over the machine settings at a given time. ([**machine_settings_info.py**](pylhc/machine_settings_info.py)) - `BSRT Logger` and `BSRT Analysis` - Saves data coming straight from LHC BSRT FESA class and allows subsequent analysis. ([**bsrt_logger.py**](pylhc/bsrt_logger.py) & [**bsrt_analysis.py**](pylhc/bsrt_analysis.py) ) - `BPM Calibration Factors` - Compute the BPM calibration factors using ballistic optics. Two methods are available: using the beta function and using the dispersion. ([**bpm_calibration.py**](pylhc/bpm_calibration.py)) -- `IR NonLinear RDT Correction` - Script to compute RDT correction in the (HL)LHC IRs including feed-down effects. ([**irnl_rdt_correction.py**](pylhc/irnl_rdt_correction.py)) ## License diff --git a/doc/entrypoints/irnl_rdt_correction.rst b/doc/entrypoints/irnl_rdt_correction.rst deleted file mode 100644 index 2ba24d2..0000000 --- a/doc/entrypoints/irnl_rdt_correction.rst +++ /dev/null @@ -1,5 +0,0 @@ -IRNL RDT Correction -************************** - -.. automodule:: pylhc.irnl_rdt_correction - :members: diff --git a/pylhc/__init__.py b/pylhc/__init__.py index af81da1..a21eb3d 100644 --- a/pylhc/__init__.py +++ b/pylhc/__init__.py @@ -10,7 +10,7 @@ __title__ = "pylhc" __description__ = "An accelerator physics script collection for the OMC team at CERN." __url__ = "https://github.com/pylhc/pylhc" -__version__ = "0.4.2" +__version__ = "0.5.0" __author__ = "pylhc" __author_email__ = "pylhc@github.com" __license__ = "MIT" diff --git a/pylhc/irnl_rdt_correction.py b/pylhc/irnl_rdt_correction.py deleted file mode 100644 index 20b0845..0000000 --- a/pylhc/irnl_rdt_correction.py +++ /dev/null @@ -1,1174 +0,0 @@ -""" -Nonlinear Correction calculation in the IRs --------------------------------------------- - -Performs local correction of the Resonance Driving Terms (RDTs) -in the Insertion Regions (IRs) based on the principle described in -[#BruningDynamicApertureStudies2004]_ with the addition of correcting -feed-down and using feed-down to correct lower order RDTs. -Details can be found in [#DillyNonlinearIRCorrections2022]_ . - -This script is written to be used stand-alone if needed -with python 3.6+ and with ``tfs-pandas`` installed. - - -.. rubric:: References - -.. [#BruningDynamicApertureStudies2004] - O. Bruening et al., - Dynamic aperture studies for the LHC separation dipoles. (2004) - https://cds.cern.ch/record/742967 - -.. [#DillyNonlinearIRCorrections2022] - J. Dilly et al., - Corrections of high-order nonlinear errors in the LHC and HL-LHC insertion regions. (2022) - - -author: Joschua Dilly - -""" -import argparse -import logging -import sys -from pathlib import Path -from time import time -from typing import Sequence, Tuple, Iterable, Sized, Set, Callable - -import numpy as np -import tfs -from pandas import DataFrame, Series - -LOG = logging.getLogger(__name__) - - -# Classes ---------------------------------------------------------------------- -class IRCorrector: - def __init__(self, field_component: str, accel: str, ip: int, side: str): - order, skew = field_component2order(field_component) - - self.field_component = field_component - self.order = order - self.skew = skew - self.accel = accel - self.ip = ip - self.side = side - - main_name = f'C{ORDER_NAME_MAP[order]}{SKEW_NAME_MAP[skew]}X' - extra = "F" if accel == "hllhc" and ip in [1, 5] else "" - self.type = f'M{main_name}{extra}' - self.name = f'{self.type}.{POSITION:d}{side}{ip:d}' - self.circuit = f'K{main_name}{POSITION:d}.{side}{ip:d}' - self.strength_component = f"K{order-1}{SKEW_NAME_MAP[skew]}L" # MAD-X order notation - self.value = 0 - - def __repr__(self): - return f"IRCorrector object {str(self)}" - - def __str__(self): - return f"{self.name} ({self.field_component}), {self.strength_component}: {self.value: 6E}" - - def __lt__(self, other): - if self.order == other.order: - if self.skew == other.skew: - if self.ip == other.ip: - return (self.side == SIDES[1]) < (other.side == SIDES[1]) - return self.ip < other.ip - return self.skew < other.skew - return self.order < other.order - - def __gt__(self, other): - if self.order == other.order: - if self.skew == other.skew: - if self.ip == other.ip: - return (self.side == SIDES[1]) > (other.side == SIDES[1]) - return self.ip > other.ip - return self.skew > other.skew - return self.order > other.order - - def __hash__(self): - return hash(self.name) - - def __eq__(self, other): - return self.name == other.name - - -class RDT: - def __init__(self, name: str): - self.name = name - self.jklm = tuple(int(i) for i in name[1:5]) - self.j, self.k, self.l, self.m = self.jklm - self.skew = bool((self.l + self.m) % 2) - self.order = sum(self.jklm) - self.swap_beta_exp = name.endswith("*") # swap beta-exponents - - def __repr__(self): - return f"RDT object {str(self)}" - - def __str__(self): - return f"{self.name} ({order2field_component(self.order, self.skew)})" - - def __lt__(self, other): - if self.order == other.order: - if self.skew == other.skew: - return len(self.name) < len(other.name) - return self.skew < other.skew - return self.order < other.order - - def __gt__(self, other): - if self.order == other.order: - if self.skew == other.skew: - return len(self.name) > len(other.name) - return self.skew > other.skew - return self.order > other.order - - def __hash__(self): - return hash(self.name) - - def __eq__(self, other): - return self.name == other.name - - -# Additional Classes --- - -class DotDict(dict): - """ Make dict fields accessible by attributes.""" - def __init__(self, *args, **kwargs): - super(DotDict, self).__init__(*args, **kwargs) - - __setattr__ = dict.__setitem__ - __delattr__ = dict.__delitem__ - - def __getattr__(self, key): - """ Needed to raise the correct exceptions """ - try: - return super(DotDict, self).__getitem__(key) - except KeyError as e: - raise AttributeError(e).with_traceback(e.__traceback__) from e - - -class Timer: - """ Collect Times and print a summary at the end. """ - def __init__(self, name: str = "start", print_fun: Callable[[str], None] = print): - self.steps = {} - self.step(name) - self.print = print_fun - - def step(self, name: str = None): - if not name: - name = str(len(self.steps)) - time_ = time() - self.steps[name] = time_ - return time_ - - def time_since_step(self, step=None): - if not step: - step = list(self.steps.keys())[-1] - dtime = time() - self.steps[step] - return dtime - - def time_between_steps(self, start: str = None, end: str = None): - list_steps = list(self.steps.keys()) - if not start: - start = list_steps[0] - if not end: - end = list_steps[-1] - dtime = self.steps[end] - self.steps[start] - return dtime - - def summary(self): - str_length = max(len(s) for s in self.steps.keys()) - time_length = len(f"{int(self.time_between_steps()):d}") - format_str = (f"{{step:{str_length}s}}:" - f" +{{dtime: {time_length:d}.5f}}s" - f" ({{ttime: {time_length:d}.3f}}s total)") - last_time = None - start_time = None - for step, step_time in self.steps.items(): - if last_time is None: - start_time = step_time - self.print(f"Timing Summary ----") - self.print(format_str.format(step=step, dtime=0, ttime=0)) - else: - self.print(format_str.format( - step=step, dtime=step_time-last_time, ttime=step_time-start_time) - ) - last_time = step_time - - -# Constants -------------------------------------------------------------------- - -POSITION = 3 # all NL correctors are at POSITION 3, to be adapted for Linear -ORDER_NAME_MAP = {1: "B", 2: "Q", 3: "S", 4: "O", 5: "D", 6: "T"} -SKEW_NAME_MAP = {True: "S", False: ""} -SKEW_FIELD_MAP = {True: "a", False: "b"} -FIELD_SKEW_MAP = {v: k for k, v in SKEW_FIELD_MAP.items()} -SKEW_CHAR_MAP = {True: "J", False: "K"} - -PLANES = ("X", "Y") -BETA = "BET" -DELTA = "D" -KEYWORD = "KEYWORD" -MULTIPOLE = "MULTIPOLE" -X, Y = PLANES -SIDES = ("L", "R") - - -# Default input options -DEFAULTS = {'feeddown': 0, - 'ips': [1, 2, 5, 8], - 'accel': 'lhc', - 'solver': 'lstsq', - 'update_optics': True, - 'iterations': 1, - 'ignore_corrector_settings': False, - 'rdts2': None, - 'ignore_missing_columns': False, - 'output': None, - } - -DEFAULT_RDTS = { - 'lhc': ('F0003', 'F0003*', # correct a3 errors with F0003 - 'F1002', 'F1002*', # correct b3 errors with F1002 - 'F1003', 'F3001', # correct a4 errors with F1003 and F3001 - 'F4000', 'F0004', # correct b4 errors with F4000 and F0004 - 'F6000', 'F0006', # correct b6 errors with F6000 and F0006 - ), - 'hllhc': ('F0003', 'F0003*', # correct a3 errors with F0003 - 'F1002', 'F1002*', # correct b3 errors with F1002 - 'F1003', 'F3001', # correct a4 errors with F1003 and F3001 - 'F0004', 'F4000', # correct b4 errors with F0004 and F4000 - 'F0005', 'F0005*', # correct a5 errors with F0005 - 'F5000', 'F5000*', # correct b5 errors with F5000 - 'F5001', 'F1005', # correct a6 errors with F5001 and F1005 - 'F6000', 'F0006', # correct b6 errors with F6000 and F0006 - ), -} - -EXT_TFS = ".tfs" # suffix for dataframe file -EXT_MADX = ".madx" # suffix for madx-command file - - -# Solving functions --- - -def _solve_linear(correctors, lhs, rhs): - res = np.linalg.solve(lhs, rhs) - _assign_corrector_values(correctors, res) - - -def _solve_invert(correctors, lhs, rhs): - res = np.linalg.inv(lhs).dot(rhs) - _assign_corrector_values(correctors, res) - - -def _solve_lstsq(correctors, lhs, rhs): - res = np.linalg.lstsq(lhs, rhs, rcond=None) - _assign_corrector_values(correctors, res[0]) - if len(res[1]): - LOG.info(f"Residuals ||I - Bx||_2: {list2str(res[1])}") - LOG.debug(f"Rank of Beta-Matrix: {res[2]}") - - -SOLVER_MAP = {'inv': _solve_invert, - 'lstsq': _solve_lstsq, - 'linear': _solve_linear,} - - -APPROXIMATE_SOLVERS = ['lstsq'] - - -# Main Functions --- - -def get_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser() - parser.add_argument( - "--optics", - dest="optics", - nargs="+", - help="Path(s) to optics file(s). Defines which elements to correct for!", - required=True, - ) - parser.add_argument( - "--errors", - dest="errors", - nargs="+", - help="Path(s) to error file(s).", - required=True, - ) - parser.add_argument( - "--beams", - dest="beams", - type=int, - nargs="+", - help="Which beam the files come from (1, 2 or 4)", - required=True, - ) - parser.add_argument( - "--output", - dest="output", - help=("Path to write command and tfs_df file. " - f"Extension (if given) is ignored and replaced with {EXT_TFS} and {EXT_MADX} " - "for the Dataframe and the command file respectively." - ), - ) - parser.add_argument( - "--rdts", - dest="rdts", - nargs="+", - help=("RDTs to correct." - " Format: 'Fjklm'; or 'Fjklm*'" - " to correct for this RDT in Beam 2 using" - " beta-symmetry (jk <-> lm)."), - ) - parser.add_argument( - "--rdts2", - dest="rdts2", - nargs="+", - help=("RDTs to correct for second beam/file, if different from first." - " Same format rules as for `rdts`."), - ) - parser.add_argument( - "--accel", - dest="accel", - type=str.lower, - choices=list(DEFAULT_RDTS.keys()), - default=DEFAULTS['accel'], - help="Which accelerator we have.", - ) - parser.add_argument( - "--feeddown", - dest="feeddown", - type=int, - help="Order of Feeddown to include.", - default=DEFAULTS['feeddown'], - ) - parser.add_argument( - "--ips", - dest="ips", - nargs="+", - help="In which IPs to correct.", - type=int, - default=DEFAULTS['ips'], - ) - parser.add_argument( - "--solver", - dest="solver", - help="Solving method to use.", - type=str.lower, - choices=list(SOLVER_MAP.keys()), - default=DEFAULTS['solver'], - ) - parser.add_argument( - "--update_optics", - dest="update_optics", - type=bool, - help=("Sorts the RDTs to start with the highest order and updates the " - "corrector strengths in the optics after calculation, so the " - "feeddown to lower order correctors is included." - ), - default=DEFAULTS["update_optics"] - ) - parser.add_argument( - "--iterations", - dest="iterations", - type=int, - help=("Reiterate correction, " - "starting with the previously calculated values."), - default=DEFAULTS["iterations"] - ) - parser.add_argument( - "--ignore_corrector_settings", - dest="ignore_corrector_settings", - help=("Ignore the current settings of the correctors. If this is not set " - "the corrector values of the optics are used as initial conditions."), - action="store_true", - ) - parser.add_argument( - "--ignore_missing_columns", - dest="ignore_missing_columns", - help=("If set, missing strength columns in any of the input files " - "are assumed to be zero, instead of raising an error."), - action="store_true", - ) - return parser - - -def main(**opt) -> Tuple[str, tfs.TfsDataFrame]: - """ Get correctors and their optimal powering to minimize the given RDTs. - - Keyword Args: - - optics (list[str/Path/DataFrame]): Path(s) to optics file(s) or DataFrame(s) of optics. - Needs to contain only the elements to be corrected - (e.g. only the ones in the IRs). - All elements from the error-files need to be present. - Required! - errors (list[str/Path/DataFrame]): Path(s) to error file(s) or DataFrame(s) of errors. - Can contain less elements than the optics files, - these elements are then assumed to have no errors. - Required! - beams (list[int]): Which beam the files come from (1, 2 or 4). - Required! - output (str/Path): Path to write command and tfs_df file. - Extension (if given) is ignored and replaced with '.tfs' and '.madx' - for the Dataframe and the command file respectively. - Default: ``None``. - rdts (list[str], dict[str, list[str]): - RDTs to correct. - Format: 'Fjklm'; or 'Fjklm*' to correct for - this RDT in Beam 2 using beta-symmetry (jk <-> lm). - The RDTs can be either given as a list, then the appropriate correctors - are determined by jklmn. - Alternatively, the input can be a dictionary, - where the keys are the RDT names as before, and the values are a list - of correctors to use, e.g. 'b5' for normal decapole corrector, - 'a3' for skew sextupole, etc. - If the order of the corrector is higher than the order of the RDT, - the feed-down from the corrector is used for correction. - In the case where multiple orders of correctors are used, - increasing ``iterations`` might be useful. - Default: Standard RDTs for given ``accel`` (see ``DEFAULT_RDTS`` in this file). - rdts2 (list[str], dict[str, list[str]): - RDTs to correct for second beam/file, if different from first. - Same format rules as for ``rdts``. Default: ``None``. - accel (str): Which accelerator we have. One of 'lhc', 'hllhc'. - Default: ``lhc``. - feeddown (int): Order of Feeddown to include. - Default: ``0``. - ips (list[int]): In which IPs to correct. - Default: ``[1, 2, 5, 8]``. - solver (str): Solver to use. One of 'lstsq', 'inv' or 'linear'. - Default ``lstsq``. - update_optics (bool): Sorts the RDTs to start with the highest order - and updates the corrector strengths in the optics - after calculation, so the feeddown to lower order - correctors is included. - Default: ``True``. - ignore_corrector_settings (bool): Ignore the current settings of the correctors. - If this is not set the corrector values of the - optics are used as initial conditions. - Default: ``False``. - ignore_missing_columns (bool): If set, missing strength columns in any - of the input files are assumed to be - zero, instead of raising an error. - Default: ``False``. - iterations (int): Reiterate correction, starting with the previously - calculated values. - Default: ``1``. - - - Returns: - - tuple[string, Dataframe]: - the string contains the madx-commands used to power the correctors; - the dataframe contains the same values in a pandas DataFrame. - """ - LOG.info("Starting IRNL Correction.") - timer = Timer("Start", print_fun=LOG.debug) - if not len(opt): - opt = vars(get_parser().parse_args()) - opt = DotDict(opt) - opt = _check_opt(opt) - timer.step("Opt Parsed") - - rdt_maps = sort_rdts(opt.rdts, opt.rdts2) - _check_corrector_order(rdt_maps, opt.update_optics, opt.feeddown) - needed_orders = _get_needed_orders(rdt_maps, opt.feeddown) - timer.step("RDT Sorted") - - optics_dfs = get_tfs(opt.optics) - errors_dfs = get_tfs(opt.errors) - timer.step("Optics Loaded") - - _check_dfs(optics_dfs, errors_dfs, opt.beams, needed_orders, opt.ignore_missing_columns) - switch_signs_for_beam4(optics_dfs, errors_dfs, opt.beams) - timer.step("Optics Checked") - - correctors = solve(rdt_maps, optics_dfs, errors_dfs, opt) - timer.step("Done") - - timer.summary() - if len(correctors) == 0: - raise EnvironmentError('No correctors found in input optics.') - - return get_and_write_output(opt.output, correctors) - - -def solve(rdt_maps, optics_dfs, errors_dfs, opt): - """ Calculate corrections. - They are grouped into rdt's with common correctors. If possible, these are - ordered from highest order to lowest, to be able to update optics and include - their feed-down. """ - all_correctors = [] - remaining_rdt_maps = rdt_maps - while remaining_rdt_maps: - current_rdt_maps, remaining_rdt_maps, corrector_names = _get_current_rdt_maps(remaining_rdt_maps) - - for ip in opt.ips: - correctors = get_available_correctors(corrector_names, opt.accel, ip, optics_dfs) - all_correctors += correctors - if not len(correctors): - continue # warnings are logged in get_available_correctors - - saved_corrector_values = init_corrector_and_optics_values(correctors, optics_dfs, - opt.update_optics, - opt.ignore_corrector_settings) - - beta_matrix, integral = build_equation_system( - current_rdt_maps, correctors, - ip, optics_dfs, errors_dfs, opt.feeddown, - ) - for iteration in range(opt.iterations): - solve_equation_system(correctors, beta_matrix, integral, opt.solver) # changes corrector values - update_optics(correctors, optics_dfs) # update corrector values in optics - - # update integral values after iteration: - integral_before = integral - _, integral = build_equation_system( - current_rdt_maps, [], # empty correctors list skips beta-matrix calculation - ip, optics_dfs, errors_dfs, opt.feeddown - ) - _log_correction(integral_before, integral, current_rdt_maps, optics_dfs, iteration, ip) - - LOG.info(f"Correction of IP{ip:d} complete.") - restore_optics_values(saved_corrector_values, optics_dfs) # hint: nothing saved if update_optics is True - return sorted(all_correctors) - - -def _get_current_rdt_maps(rdt_maps): - """ Creates a new rdt_map with all rdt's that share correctors. """ - n_maps = len(rdt_maps) - new_rdt_map = [{} for _ in rdt_maps] - for rdt_map in rdt_maps: - # get next RDT/correctors - if len(rdt_map): - rdt, correctors = list(rdt_map.items())[0] - break - else: - raise ValueError("rdt_maps are empty. " - "This should have triggered an end of the solver loop " - "earlier. Please investigate.") - - correctors = set(correctors) - checked_correctors = set() - while len(correctors): - # find all RDTs with the same correctors - checked_correctors |= correctors - additional_correctors = set() # new correctors found this loop - - for corrector in correctors: - for idx in range(n_maps): - for rdt_current, rdt_correctors in rdt_maps[idx].items(): - if corrector in rdt_correctors: - new_rdt_map[idx][rdt_current] = rdt_correctors - additional_correctors |= (set(rdt_correctors) - checked_correctors) - - correctors = additional_correctors - - remaining_rdt_map = [{k: v for k, v in rdt_maps[idx].items() - if k not in new_rdt_map[idx].keys()} for idx in range(n_maps)] - - if not any(len(rrm) for rrm in remaining_rdt_map): - remaining_rdt_map = None - - return new_rdt_map, remaining_rdt_map, checked_correctors - - -# RDT Sorting ------------------------------------------------------------------ - -def sort_rdts(rdts: Sequence, rdts2: Sequence) -> Tuple[dict, dict]: - """ Sorts RDTs by reversed-order and orientation (skew, normal). """ - LOG.debug("Sorting RDTs") - LOG.debug(" - First Optics:") - rdt_dict = _build_rdt_dict(rdts) - if rdts2 is not None: - LOG.debug(" - Second Optics:") - rdt_dict2 = _build_rdt_dict(rdts2) - else: - LOG.debug(" - Second Optics: same RDTs as first.") - rdt_dict2 = rdt_dict.copy() - return rdt_dict, rdt_dict2 - - -def _build_rdt_dict(rdts: Sequence) -> dict: - LOG.debug("Building RDT dictionary.") - if not isinstance(rdts, dict): - rdts = {rdt: [] for rdt in rdts} - - rdt_dict = {} - for rdt_name, correctors in rdts.items(): - rdt = RDT(rdt_name) - if not len(correctors): - skew = rdt.skew - correctors = [order2field_component(rdt.order, skew)] - - rdt_dict[rdt] = correctors - LOG.debug(f"Added: {rdt} with correctors: {list2str(correctors)}") - rdt_dict = dict(sorted(rdt_dict.items(), reverse=True)) # sorts by highest order and skew first - return rdt_dict - - -def _get_needed_orders(rdt_maps: Sequence[dict], feed_down: int) -> Sequence[int]: - """Returns the sorted orders needed for correction, based on the order - of the RDTs to correct plus the feed-down involved and the order of the - corrector, which can be higher than the RDTs in case one wants to correct - via feeddown.""" - needed_orders = set() - for rdt_map in rdt_maps: - for rdt, correctors in rdt_map.items(): - # get orders from RDTs + feed-down - for fd in range(feed_down+1): - needed_orders |= {rdt.order + fd, } - - # get orders from correctors - for corrector in correctors: - needed_orders |= {int(corrector[1]), } - return sorted(needed_orders) - - -# Equation System ------------------------------------------------------------- - -def build_equation_system(rdt_maps: Sequence[dict], correctors: Sequence[IRCorrector], ip: int, - optics_dfs: Sequence, errors_dfs: Sequence, - feeddown: int) -> Tuple[np.ndarray, np.ndarray]: - """ Builds equation system as in Eq. (43) in [#DillyNonlinearIRCorrections2022]_ - for a given ip for all given optics and error files (i.e. beams) and rdts. - - Returns - b_matrix: np.array N_rdts x N_correctors - integral: np.array N_rdts x 1 - """ - n_rdts = sum(len(rdt_map.keys()) for rdt_map, _ in zip(rdt_maps, optics_dfs)) - b_matrix = np.zeros([n_rdts, len(correctors)]) - integral = np.zeros([n_rdts, 1]) - - idx_row = 0 # row in equation system - for idx_file, rdts, optics_df, errors_df in zip(range(1, 3), rdt_maps, optics_dfs, errors_dfs): - for rdt, rdt_correctors in rdts.items(): - LOG.info(f"Calculating {rdt}, optics {idx_file}/{len(optics_dfs)}, IP{ip:d}") - integral[idx_row] = get_elements_integral(rdt, ip, optics_df, errors_df, feeddown) - - for idx_corrector, corrector in enumerate(correctors): - if corrector.field_component not in rdt_correctors: - continue - b_matrix[idx_row][idx_corrector] = get_corrector_coefficient(rdt, corrector, optics_df, errors_df) - idx_row += 1 - - return b_matrix, integral - - -def _log_correction(integral_before: np.array, integral_after: np.array, rdt_maps: Sequence[dict], - optics_dfs: Sequence[DataFrame], iteration: int, ip: int): - """ Log the correction initial and final value of this iteration. """ - LOG.info(f"RDT change in IP{ip:d}, iteration {iteration+1:d}:") - integral_iter = zip(integral_before, integral_after) - for idx_optics, rdts, _ in zip(range(1, 3), rdt_maps, optics_dfs): - for rdt in rdts.keys(): - val_before, val_after = next(integral_iter) - delta = val_after - val_before - LOG.info(f"Optics {idx_optics}, {rdt.name}: {val_before[0]:.2e} -> {val_after[0]:.2e} ({delta[0]:+.2e})") - - -def get_available_correctors(field_components: Set[str], accel: str, ip: int, - optics_dfs: Sequence[DataFrame]) -> Sequence[IRCorrector]: - """ Gets the available correctors by checking for their presence in the optics. - If the corrector is not found in this ip, the ip is skipped. - If only one corrector (left or right) is present a warning is issued. - If one corrector is present in only one optics (and not in the other) - an Environment Error is raised. """ - correctors = [] - for field_component in field_components: - corrector_not_found = [] - corrector_names = [] - for side in SIDES: - corrector = IRCorrector(field_component, accel, ip, side) - corr_in_optics = [_corrector_in_optics(corrector.name, df) for df in optics_dfs] - if all(corr_in_optics): - correctors.append(corrector) - corrector_names.append(corrector.name) - elif any(corr_in_optics): - # Raise informative Error - idx_true = corr_in_optics.index(True) - idx_false = (idx_true + 1) % 2 - raise EnvironmentError(f'Corrector {corrector.name} was found' - f'in the {idx2str(idx_true + 1)} optics' - f'but not in the {idx2str(idx_false + 1)}' - f'optics.') - else: - # Ignore Corrector - corrector_not_found.append(corrector.name) - - if len(corrector_not_found) == 1: - LOG.warning(f'Corrector {corrector_not_found[0]} could not be found in ' - f'optics, yet {corrector_names[0]} was present. ' - f'Correction will be performed with available corrector(s) only.') - elif len(corrector_not_found): - LOG.info(f'Correctors {list2str(corrector_not_found)} were not found in' - f' optics. Skipping IP{ip}.') - - return list(sorted(correctors)) # do not have to be sorted, but looks nicer - - -def init_corrector_and_optics_values(correctors: Sequence[IRCorrector], optics_dfs: Sequence[DataFrame], update_optics: bool, ignore_settings: bool): - """ If wished save original corrector values from optics (for later restoration) - and sync corrector values in list and optics. """ - saved_values = {} - - for corrector in correctors: - values = [df.loc[corrector.name, corrector.strength_component] for df in optics_dfs] - - if not update_optics: - saved_values[corrector] = values - - if ignore_settings: - # set corrector value in optics to zero - for df in optics_dfs: - df.loc[corrector.name, corrector.strength_component] = 0 - else: - if any(np.diff(values)): - raise ValueError(f"Initial value for corrector {corrector.name} differs " - f"between optics.") - # use optics value as initial value - corrector.value = values[0] - return saved_values - - -def restore_optics_values(saved_values: dict, optics_dfs: Sequence[DataFrame]): - """ Restore saved initial corrector values (if any) to optics. """ - for corrector, values in saved_values.items(): - for df, val in zip(optics_dfs, values): - df.loc[corrector.name, corrector.strength_component] = val - - -def get_elements_integral(rdt, ip, optics_df, errors_df, feeddown): - """ Calculate the RDT integral for all elements of the IP. """ - integral = 0 - lm, jk = rdt.l + rdt.m, rdt.j + rdt.k - # Integral on side --- - for side in SIDES: - LOG.debug(f" - Integral on side {side}.") - side_sign = get_integral_sign(rdt.order, side) - - # get IP elements, errors and twiss have same elements because of check_dfs - elements = optics_df.index[optics_df.index.str.match(fr".*{side}{ip:d}(\.B[12])?")] - - betax = optics_df.loc[elements, f"{BETA}{X}"] - betay = optics_df.loc[elements, f"{BETA}{Y}"] - if rdt.swap_beta_exp: - # in case of beta-symmetry, this corrects for the same RDT in the opposite beam. - betax = betax**(lm/2.) - betay = betay**(jk/2.) - else: - betax = betax**(jk/2.) - betay = betay**(lm/2.) - - dx = optics_df.loc[elements, X] + errors_df.loc[elements, f"{DELTA}{X}"] - dy = optics_df.loc[elements, Y] + errors_df.loc[elements, f"{DELTA}{Y}"] - dx_idy = dx + 1j*dy - - k_sum = Series(0j, index=elements) # Complex sum of strengths (from K_n + iJ_n) and feed-down to them - - for q in range(feeddown+1): - n_mad = rdt.order+q-1 - kl_opt = optics_df.loc[elements, f"K{n_mad:d}L"] - kl_err = errors_df.loc[elements, f"K{n_mad:d}L"] - iksl_opt = 1j*optics_df.loc[elements, f"K{n_mad:d}SL"] - iksl_err = 1j*errors_df.loc[elements, f"K{n_mad:d}SL"] - - k_sum += ((kl_opt + kl_err + iksl_opt + iksl_err) * - (dx_idy**q) / np.math.factorial(q)) - - integral += -sum(np.real(i_pow(lm) * k_sum.to_numpy()) * (side_sign * betax * betay).to_numpy()) - LOG.debug(f" -> Sum value: {integral}") - return integral - - -def get_corrector_coefficient(rdt: RDT, corrector: IRCorrector, optics_df: DataFrame, errors_df: DataFrame): - """ Calculate B-Matrix Element for Corrector. """ - LOG.debug(f" - Corrector {corrector.name}.") - lm, jk = rdt.l + rdt.m, rdt.j + rdt.k - - sign_i = np.real(i_pow(lm + (lm % 2))) # i_pow is always real - sign_corrector = sign_i * get_integral_sign(rdt.order, corrector.side) - - betax = optics_df.loc[corrector.name, f"{BETA}{X}"] - betay = optics_df.loc[corrector.name, f"{BETA}{Y}"] - if rdt.swap_beta_exp: - # in case of beta-symmetry, this corrects for the same RDT in the opposite beam. - betax = betax**(lm/2.) - betay = betay**(jk/2.) - else: - betax = betax**(jk/2.) - betay = betay**(lm/2.) - - z = 1 - p = corrector.order - rdt.order - if p: - # Corrector contributes via feed-down - dx = optics_df.loc[corrector.name, X] + errors_df.loc[corrector.name, f"{DELTA}{X}"] - dy = optics_df.loc[corrector.name, Y] + errors_df.loc[corrector.name, f"{DELTA}{Y}"] - dx_idy = dx + 1j*dy - z_cmplx = (dx_idy**p) / np.math.factorial(p) - if (corrector.skew and is_odd(lm)) or (not corrector.skew and is_even(lm)): - z = np.real(z_cmplx) - else: - z = np.imag(z_cmplx) - if not corrector.skew: - z = -z - if abs(z) < 1e-15: - LOG.warning(f"Z-coefficient for {corrector.name} in {rdt.name} is very small.") - - return sign_corrector * z * betax * betay - - -def get_integral_sign(n: int, side: str) -> int: - """ Sign of the integral and corrector for this side. - - This is the exp(iπnθ(s_w−s_IP)) part of Eq. (40) in - [#DillyNonlinearIRCorrections2022]_, - """ - if side == "R": - # return (-1)**n - return -1 if n % 2 else 1 - return 1 - - -def _corrector_in_optics(name: str, df: DataFrame) -> bool: - """ Checks if corrector is present in optics and has the KEYWORD MULTIPOLE. """ - if name not in df.index: - LOG.debug(f'{name} not found in optics.') - return False - - if KEYWORD in df.columns: - if df.loc[name, KEYWORD].upper() != MULTIPOLE: - LOG.warning(f"{name} found in optics, yet the Keyword was {df.loc[name, KEYWORD]}" - f" (should be '{MULTIPOLE}').") - return False - else: - LOG.warning(f"'{KEYWORD}' column not found in optics." - f" Assumes you have filtered {MULTIPOLE}s beforehand!") - - return True - - -# Solve --- - -def solve_equation_system(correctors: Sequence[IRCorrector], lhs: np.array, rhs: np.array, solver: str): - """ Solves the system with the given solver. - - The result is transferred to the corrector values internally. """ - if len(rhs) > len(correctors) and solver not in APPROXIMATE_SOLVERS: - raise ValueError("Overdetermined equation systems can only be solved " - "by one of the approximate solvers" - f" '{list2str(APPROXIMATE_SOLVERS)}'. " - f"Instead '{solver}' was chosen.") - - LOG.debug(f"Solving Equation system via {solver}.") - # lhs x corrector = rhs -> correctors = lhs\rhs - SOLVER_MAP[solver](correctors, lhs, rhs) - - -def _assign_corrector_values(correctors: Sequence[IRCorrector], values: Sequence): - """ Assigns calculated values to the correctors. """ - for corr, val in zip(correctors, values): - if len(val) > 1: - raise ValueError(f"Multiple Values for corrector {str(corr)} found." - f" There should be only one.") - LOG.debug(f"Updating Corrector: {str(corr)} {val[0]:+.2e}.") - corr.value += val[0] - LOG.info(str(corr)) - - -# Update Optics ---------------------------------------------------------------- - -def update_optics(correctors: Sequence[IRCorrector], optics_dfs: Sequence[DataFrame]): - """ Updates the corrector strength values in the current optics. """ - for optics in optics_dfs: - for corrector in correctors: - optics.loc[corrector.name, corrector.strength_component] = corrector.value - - -# Order Checks ----------------------------------------------------------------- - -def _check_corrector_order(rdt_maps: Sequence[dict], update_optics: bool, feed_down: int): - """ Perform checks on corrector orders compared to RDT orders and feed-down. """ - for rdt_map in rdt_maps: - for rdt, correctors in rdt_map.items(): - _check_corrector_order_not_lower(rdt, correctors) - _check_update_optics(rdt, correctors, rdt_map, update_optics, feed_down) - - -def _check_update_optics(rdt: RDT, correctors: list, rdt_map: dict, update_optics: bool, feed_down: int): - """ Check if corrector values are set before they would be needed for feed-down. """ - if not update_optics: - return - - for corrector in correctors: - corrector_order = int(corrector[1]) - for rdt_comp in rdt_map.keys(): - if rdt_comp.order == rdt.order: - break - if rdt_comp.order <= corrector_order <= rdt_comp.order + feed_down: - raise ValueError( - "Updating the optics is in this configuration not possible," - f" as corrector {corrector} influences {rdt_comp.name}" - f" with the given feeddown of {feed_down}. Yet the value of" - f" the corrector is defined by {rdt.name}.") - - -def _check_corrector_order_not_lower(rdt, correctors): - """ Check if only higher and equal order correctors are defined to correct - a given rdt.""" - wrong_correctors = [c for c in correctors if int(c[1]) < rdt.order] - if len(wrong_correctors): - raise ValueError( - "Correctors can not correct RDTs of higher order." - f" Yet for {rdt.name} the corrector(s)" - f" '{list2str(wrong_correctors)}' was (were) given." - ) - - -# IO Handling ------------------------------------------------------------------ -# Input --- - -def _check_opt(opt: DotDict) -> DotDict: - # check for unkown input - parser = get_parser() - known_opts = [a.dest for a in parser._actions if not isinstance(a, argparse._HelpAction)] # best way I could figure out - unknown_opts = [k for k in opt.keys() if k not in known_opts] - if len(unknown_opts): - raise AttributeError(f"Unknown arguments found: '{list2str(unknown_opts)}'.\n" - f"Allowed input parameters are: '{list2str(known_opts)}'") - - # Set defaults - for name, default in DEFAULTS.items(): - if opt.get(name) is None: - LOG.debug(f"Setting input '{name}' to default value '{default}'.") - opt[name] = default - - # check accel - opt.accel = opt.accel.lower() # let's not care about case - if opt.accel not in DEFAULT_RDTS.keys(): - raise ValueError(f"Parameter 'accel' needs to be one of '{list2str(list(DEFAULT_RDTS.keys()))}' " - f"but was '{opt.accel}' instead.") - - # Set rdts: - if opt.get('rdts') is None: - opt.rdts = DEFAULT_RDTS[opt.accel] - - # Check required and rdts: - for name in ('optics', 'errors', 'beams', 'rdts'): - inputs = opt.get(name) - if inputs is None or isinstance(inputs, str) or not isinstance(inputs, (Iterable, Sized)): - raise ValueError(f"Parameter '{name}' is required and needs to be " - "iterable, even if only of length 1. " - f"Instead was '{inputs}'.") - - # Copy DataFrames as they might be modified - opt.optics = [o.copy() for o in opt.optics] - opt.errors = [e.copy() for e in opt.errors] - - if opt.feeddown < 0 or not (opt.feeddown == int(opt.feeddown)): - raise ValueError("'feeddown' needs to be a positive integer.") - - if opt.iterations < 1: - raise ValueError("At least one iteration (see: 'iterations') needs to " - "be done for correction.") - return opt - - -def get_tfs(paths: Sequence) -> Sequence[tfs.TfsDataFrame]: - if isinstance(paths[0], str) or isinstance(paths[0], Path): - return tuple(tfs.read_tfs(path, index="NAME") for path in paths) - return paths - - -def _check_dfs(optics_dfs, errors_dfs, beams, orders, ignore_missing_columns): - if len(optics_dfs) > 2 or len(errors_dfs) > 2: - raise NotImplementedError("A maximum of two optics can be corrected " - "at the same time, for now.") - - if len(optics_dfs) != len(errors_dfs): - raise ValueError(f"The number of given optics ({len(optics_dfs):d}) " - "does not equal the number of given error files " - f"({len(errors_dfs):d}). Hint: it should.") - - if len(optics_dfs) != len(beams): - raise ValueError(f"The number of given optics ({len(optics_dfs):d}) " - "does not equal the number of given beams " - f"({len(beams):d}). Please specify a beam for each " - "optics.") - - for idx_file, (optics, errors) in enumerate(zip(optics_dfs, errors_dfs)): - not_found_errors = errors.index.difference(optics.index) - if len(not_found_errors): - raise IOError("The following elements were found in the " - f"{idx2str(idx_file)} given errors file but not in" - f"the optics: {list2str(not_found_errors.to_list())}") - - not_found_optics = optics.index.difference(errors.index) - if len(not_found_optics): - LOG.debug("The following elements were found in the " - f"{idx2str(idx_file)} given optics file but not in " - f"the errors: {list2str(not_found_optics.to_list())}." - f"They are assumed to be zero.") - for indx in not_found_optics: - errors.loc[indx, :] = 0 - - needed_values = [f"K{order-1:d}{orientation}L" # -1 for madx-order - for order in orders - for orientation in ("S", "")] - - for df, file_type in ((optics, "optics"), (errors, "error")): - not_found_strengths = [s for s in needed_values if s not in df.columns] - if len(not_found_strengths): - text = ("Some strength values were not found in the " - f"{idx2str(idx_file)} given {file_type} file: " - f"{list2str(not_found_strengths)}.") - - if not ignore_missing_columns: - raise IOError(text) - LOG.warning(text + " They are assumed to be zero.") - for kl in not_found_strengths: - df[kl] = 0 - - -def switch_signs_for_beam4(optics_dfs: Iterable[tfs.TfsDataFrame], - error_dfs: Iterable[tfs.TfsDataFrame], beams: Iterable[int]): - """ Switch the signs for Beam 4 optics. - This is due to the switch in direction for this beam and - (anti-) symmetry after a rotation of 180deg around the y-axis of magnets, - combined with the fact that the KL values in MAD-X twiss do not change sign, - but in the errors they do (otherwise it would compensate). - Magnet orders that show anti-symmetry are: a1 (K0SL), b2 (K1L), a3 (K2SL), b4 (K3L) etc. - Also the sign for (delta) X is switched back to have the same orientation as beam2.""" - for idx, (optics_df, error_df, beam) in enumerate(zip(optics_dfs, error_dfs, beams)): - if beam == 4: - LOG.debug(f"Beam 4 input found. Switching signs for X and K(S)L values when needed.") - optics_df[X] = -optics_df[X] - error_df[f"{DELTA}{X}"] = -error_df[f"{DELTA}{X}"] - - max_order = error_df.columns.str.extract(r"^K(\d+)S?L$", expand=False).dropna().astype(int).max() - for order in range(max_order+1): - name = f"K{order:d}{SKEW_NAME_MAP[is_even(order)]}L" - if name in error_df.columns: - error_df[name] = -error_df[name] - - -# Output -------- - -def get_and_write_output(out_path: str, correctors: Sequence) -> Tuple[str, tfs.TfsDataFrame]: - correction_text = build_correction_str(correctors) - correction_df = build_correction_df(correctors) - - if out_path is not None: - out_path = Path(out_path) - write_command(out_path, correction_text) - write_tfs(out_path, correction_df) - - return correction_text, correction_df - - -# Build --- - -def build_correction_df(correctors: Sequence) -> tfs.TfsDataFrame: - attributes = vars(correctors[0]) - return tfs.TfsDataFrame( - data=[[getattr(cor, attr) for attr in attributes] for cor in correctors], - columns=attributes, - ) - - -def build_correction_str(correctors: Sequence) -> str: - return _build_correction_str(corr for corr in correctors) - - -def build_correction_str_from_df(correctors_df: DataFrame) -> str: - return _build_correction_str(row[1] for row in correctors_df.iterrows()) - - -def _build_correction_str(iterator: Iterable) -> str: - """ Creates madx commands (assignments) to run for correction""" - last_type = '' - text = '' - for corr in iterator: - if not _types_almost_equal(corr.type, last_type): - text += f"\n!! {_nice_type(corr.type)} ({corr.field_component}) corrector\n" - last_type = corr.type - text += f"{corr.circuit} := {corr.value: 6E} / l.{corr.type} ;\n" - return text.lstrip("\n") - - -def _types_almost_equal(type_a: str, type_b: str) -> bool: - """ Groups correctors with and without ending F. """ - if type_a == type_b: - return True - if type_a.endswith('F'): - return type_a[:-1] == type_b - if type_b.endswith('F'): - return type_b[:-1] == type_a - - -def _nice_type(mtype: str) -> str: - if mtype.endswith('F'): - return f'{mtype[:-1]}[F]' - return mtype - - -# Write --- - -def write_command(out_path: Path, correction_text: str): - out_path_cmd = out_path.with_suffix(EXT_MADX) - with open(out_path_cmd, "w") as out: - out.write(correction_text) - - -def write_tfs(out_path: Path, correction_df: tfs.TfsDataFrame): - out_path_df = out_path.with_suffix(EXT_TFS) - tfs.write(out_path_df, correction_df) - - -# Helper ----------------------------------------------------------------------- - -def list2str(list_: list) -> str: - return str(list_).strip('[]') - - -def idx2str(idx: int) -> str: - return {1: 'first', 2: 'second', 3: 'third', 4: 'fourth', 5: 'fifth'}[idx+1] - - -def order2field_component(order: int, skew: bool) -> str: - return f"{SKEW_FIELD_MAP[skew]:s}{order:d}" - - -def field_component2order(field_component) -> Tuple[int, bool]: - return int(field_component[1]), FIELD_SKEW_MAP[field_component[0]] - - -def is_odd(n): - return bool(n % 2) - - -def is_even(n): - return not is_odd(n) - - -def i_pow(n): - return 1j**(n % 4) # more exact with modulo - - -# Script Mode ------------------------------------------------------------------ - - -def log_setup(): - """ Set up a basic logger. """ - logging.basicConfig( - stream=sys.stdout, - level=logging.INFO, - format="%(levelname)7s | %(message)s | %(name)s" - ) - - -if __name__ == '__main__': - log_setup() - main() diff --git a/tests/inputs/model_lhc_thin_30cm/job.lhc.b1.madx_commands.madx b/tests/inputs/model_lhc_thin_30cm/job.lhc.b1.madx_commands.madx deleted file mode 100644 index 6ac6e1e..0000000 --- a/tests/inputs/model_lhc_thin_30cm/job.lhc.b1.madx_commands.madx +++ /dev/null @@ -1,53 +0,0 @@ -mylhcbeam = 1; -call, file="/afs/cern.ch/eng/lhc/optics/runII/2018/toolkit/macro.madx"; -call, file="/afs/cern.ch/eng/lhc/optics/runII/2018/lhc_as-built.seq"; -slicefactor = 4; -beam; -call, file="/afs/cern.ch/eng/lhc/optics/runII/2018/toolkit/myslice.madx"; -beam; -use, sequence=lhcb1; -makethin, sequence=lhcb1, style=teapot, makedipedge=true; -seqedit, sequence=lhcb1; -flatten; -cycle, start=IP3; -endedit; -call, file="/afs/cern.ch/eng/lhc/optics/runII/2018/PROTON/opticsfile.22_ctpps2"; -beam, sequence=lhcb1, bv=1, energy:=NRJ, particle=proton, npart=10000000000.0, kbunch=1, ex=7.29767146889e-09, ey=7.29767146889e-09; -on_crab1 = 0; -on_crab5 = 0; -on_o1 = 0; -on_sep2 = 1.4; -on_oe2 = 0; -on_ov2 = 0; -on_o5 = 0; -on_ov5 = 0; -on_sep8h = 0; -on_x8v = 0; -on_alice = 0; -on_sol_alice = 0; -on_lhcb = 0; -on_sol_atlas = 0; -on_sol_cms = 0; -use, sequence=lhcb1; -match, chrom=true; -global, sequence=lhcb1, q1=62.31, q2=60.32; -vary, name="dqx.b1", step=1e-07; -vary, name="dqy.b1", step=1e-07; -lmdif, calls=100, tolerance=1e-21; -endmatch; -match, chrom=true; -global, sequence=lhcb1, dq1=3, dq2=3; -vary, name="dqpx.b1", step=1e-07; -vary, name="dqpy.b1", step=1e-07; -lmdif, calls=100, tolerance=1e-21; -endmatch; -match, chrom=true; -global, sequence=lhcb1, q1=62.31, q2=60.32, dq1=3, dq2=3; -vary, name="dqx.b1", step=1e-07; -vary, name="dqy.b1", step=1e-07; -vary, name="dqpx.b1", step=1e-07; -vary, name="dqpy.b1", step=1e-07; -lmdif, calls=100, tolerance=1e-21; -endmatch; -twiss, sequence=lhcb1, file="twiss.lhc.b1.nominal.tfs"; -quit; diff --git a/tests/inputs/model_lhc_thin_30cm/job.lhc.b2.madx_commands.madx b/tests/inputs/model_lhc_thin_30cm/job.lhc.b2.madx_commands.madx deleted file mode 100644 index 3434b1a..0000000 --- a/tests/inputs/model_lhc_thin_30cm/job.lhc.b2.madx_commands.madx +++ /dev/null @@ -1,53 +0,0 @@ -mylhcbeam = 2; -call, file="/afs/cern.ch/eng/lhc/optics/runII/2018/toolkit/macro.madx"; -call, file="/afs/cern.ch/eng/lhc/optics/runII/2018/lhc_as-built.seq"; -slicefactor = 4; -beam; -call, file="/afs/cern.ch/eng/lhc/optics/runII/2018/toolkit/myslice.madx"; -beam; -use, sequence=lhcb2; -makethin, sequence=lhcb2, style=teapot, makedipedge=true; -seqedit, sequence=lhcb2; -flatten; -cycle, start=IP3; -endedit; -call, file="/afs/cern.ch/eng/lhc/optics/runII/2018/PROTON/opticsfile.22_ctpps2"; -beam, sequence=lhcb2, bv=-1, energy:=NRJ, particle=proton, npart=10000000000.0, kbunch=1, ex=7.29767146889e-09, ey=7.29767146889e-09; -on_crab1 = 0; -on_crab5 = 0; -on_o1 = 0; -on_sep2 = 1.4; -on_oe2 = 0; -on_ov2 = 0; -on_o5 = 0; -on_ov5 = 0; -on_sep8h = 0; -on_x8v = 0; -on_alice = 0; -on_sol_alice = 0; -on_lhcb = 0; -on_sol_atlas = 0; -on_sol_cms = 0; -use, sequence=lhcb2; -match, chrom=true; -global, sequence=lhcb2, q1=62.31, q2=60.32; -vary, name="dqx.b2", step=1e-07; -vary, name="dqy.b2", step=1e-07; -lmdif, calls=100, tolerance=1e-21; -endmatch; -match, chrom=true; -global, sequence=lhcb2, dq1=3, dq2=3; -vary, name="dqpx.b2", step=1e-07; -vary, name="dqpy.b2", step=1e-07; -lmdif, calls=100, tolerance=1e-21; -endmatch; -match, chrom=true; -global, sequence=lhcb2, q1=62.31, q2=60.32, dq1=3, dq2=3; -vary, name="dqx.b2", step=1e-07; -vary, name="dqy.b2", step=1e-07; -vary, name="dqpx.b2", step=1e-07; -vary, name="dqpy.b2", step=1e-07; -lmdif, calls=100, tolerance=1e-21; -endmatch; -twiss, sequence=lhcb2, file="twiss.lhc.b2.nominal.tfs"; -quit; diff --git a/tests/inputs/model_lhc_thin_30cm/job.lhc.b4.madx_commands.madx b/tests/inputs/model_lhc_thin_30cm/job.lhc.b4.madx_commands.madx deleted file mode 100644 index 2f00e86..0000000 --- a/tests/inputs/model_lhc_thin_30cm/job.lhc.b4.madx_commands.madx +++ /dev/null @@ -1,53 +0,0 @@ -mylhcbeam = 4; -call, file="/afs/cern.ch/eng/lhc/optics/runII/2018/toolkit/macro.madx"; -call, file="/afs/cern.ch/eng/lhc/optics/runII/2018/lhcb4_as-built.seq"; -slicefactor = 4; -beam; -call, file="/afs/cern.ch/eng/lhc/optics/runII/2018/toolkit/myslice.madx"; -beam; -use, sequence=lhcb2; -makethin, sequence=lhcb2, style=teapot, makedipedge=true; -seqedit, sequence=lhcb2; -flatten; -cycle, start=IP3; -endedit; -call, file="/afs/cern.ch/eng/lhc/optics/runII/2018/PROTON/opticsfile.22_ctpps2"; -beam, sequence=lhcb2, bv=1, energy:=NRJ, particle=proton, npart=10000000000.0, kbunch=1, ex=7.29767146889e-09, ey=7.29767146889e-09; -on_crab1 = 0; -on_crab5 = 0; -on_o1 = 0; -on_sep2 = 1.4; -on_oe2 = 0; -on_ov2 = 0; -on_o5 = 0; -on_ov5 = 0; -on_sep8h = 0; -on_x8v = 0; -on_alice = 0; -on_sol_alice = 0; -on_lhcb = 0; -on_sol_atlas = 0; -on_sol_cms = 0; -use, sequence=lhcb2; -match, chrom=true; -global, sequence=lhcb2, q1=62.31, q2=60.32; -vary, name="dqx.b2", step=1e-07; -vary, name="dqy.b2", step=1e-07; -lmdif, calls=100, tolerance=1e-21; -endmatch; -match, chrom=true; -global, sequence=lhcb2, dq1=3, dq2=3; -vary, name="dqpx.b2", step=1e-07; -vary, name="dqpy.b2", step=1e-07; -lmdif, calls=100, tolerance=1e-21; -endmatch; -match, chrom=true; -global, sequence=lhcb2, q1=62.31, q2=60.32, dq1=3, dq2=3; -vary, name="dqx.b2", step=1e-07; -vary, name="dqy.b2", step=1e-07; -vary, name="dqpx.b2", step=1e-07; -vary, name="dqpy.b2", step=1e-07; -lmdif, calls=100, tolerance=1e-21; -endmatch; -twiss, sequence=lhcb2, file="twiss.lhc.b4.nominal.tfs"; -quit; diff --git a/tests/inputs/model_lhc_thin_30cm/twiss.lhc.b1.nominal.hd5 b/tests/inputs/model_lhc_thin_30cm/twiss.lhc.b1.nominal.hd5 deleted file mode 100644 index a97ab69..0000000 Binary files a/tests/inputs/model_lhc_thin_30cm/twiss.lhc.b1.nominal.hd5 and /dev/null differ diff --git a/tests/inputs/model_lhc_thin_30cm/twiss.lhc.b2.nominal.hd5 b/tests/inputs/model_lhc_thin_30cm/twiss.lhc.b2.nominal.hd5 deleted file mode 100644 index a1c818d..0000000 Binary files a/tests/inputs/model_lhc_thin_30cm/twiss.lhc.b2.nominal.hd5 and /dev/null differ diff --git a/tests/inputs/model_lhc_thin_30cm/twiss.lhc.b4.nominal.hd5 b/tests/inputs/model_lhc_thin_30cm/twiss.lhc.b4.nominal.hd5 deleted file mode 100644 index ead01f8..0000000 Binary files a/tests/inputs/model_lhc_thin_30cm/twiss.lhc.b4.nominal.hd5 and /dev/null differ diff --git a/tests/unit/test_irnl_rdt_correction.py b/tests/unit/test_irnl_rdt_correction.py deleted file mode 100644 index 6b5a651..0000000 --- a/tests/unit/test_irnl_rdt_correction.py +++ /dev/null @@ -1,821 +0,0 @@ -from typing import List, Iterable - -import numpy as np -import pandas as pd -import pytest -import re -import tfs -from pathlib import Path -from pandas.testing import assert_frame_equal, assert_series_equal -from pylhc.irnl_rdt_correction import ( - main as irnl_correct, BETA, KEYWORD, X, Y, MULTIPOLE, - get_integral_sign, list2str, switch_signs_for_beam4, - IRCorrector, RDT -) -from pylhc.utils import tfs_tools - -INPUTS = Path(__file__).parent.parent / "inputs" -LHC_MODELS_PATH = INPUTS / "model_lhc_thin_30cm" - -EPS = 1e-13 # to compare floating point numbers against -ABC = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" # the alphabet -MAX_N = 6 # 2 == Sextupole - -PLACEHOLDER = "PLACEHOLDER" # MADX Keyword PLACEHOLDER - -# fields of IRCorrector --> columns in corrections tfs -VALUE = "value" -STRENGTH = "strength_component" -FIELD = "field_component" -ORDER = "order" -IP = "ip" -CIRCUIT = "circuit" -NAME = "name" - - -class TestStandardCorrection: - @pytest.mark.parametrize('order', range(3, MAX_N+1)) # 3 == Sextupole - @pytest.mark.parametrize('orientation', ('skew', 'normal')) - @pytest.mark.parametrize('accel', ('lhc', 'hllhc')) - def test_basic_correction(self, tmp_path: Path, order: int, orientation: str, accel: str): - """Tests the basic correction functionality and performs some sanity checks. - Operates on a pseudo-model so that the corrector values are easily known. - Sanity Checks: - - all correctors found - - correctors have the correct value (as set by errors or zero) - - all corrector circuits present in madx-script - """ - # Parameters ----------------------------------------------------------- - if accel == 'lhc': - if order == 5: - pytest.skip("LHC has no decapole correctors") - if order == 6 and orientation == 'skew': - pytest.skip("LHC has no skew dodecapole correctors") - - orientation = "S" if orientation is "skew" else "" - - correct_ips = (1, 3) - error_value = 2 - n_magnets = 4 - n_ips = 4 - - n_correct_ips = len(correct_ips) - n_sides = len("LR") - n_orientation = len(["S", ""]) - - # Setup ---------------------------------------------------------------- - optics = generate_pseudo_model(accel=accel, n_ips=n_ips, n_magnets=n_magnets) - errors = generate_errortable(index=get_some_magnet_names(n_ips=n_ips, n_magnets=n_magnets)) - error_component = f"K{order-1}{orientation}L" - errors[error_component] = error_value - - if order % 2: # order is odd -> sides have different sign in rdt - left_hand_magnets = errors.index.str.match(r".*L\d$") - errors.loc[left_hand_magnets, error_component] = errors.loc[left_hand_magnets, error_component] / 2 # so they don't fully compensate - - # Correction ----------------------------------------------------------- - madx_corrections, df_corrections = irnl_correct( - accel=accel, - optics=[optics], - errors=[errors], - beams=[1], - output=tmp_path / "correct", - feeddown=0, - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - ) - - # Testing -------------------------------------------------------------- - # Check output data --- - assert len(list(tmp_path.glob("correct.*"))) == 2 - - # Check all found correctors --- - if accel == 'lhc': - assert len(df_corrections.index) == ( - n_orientation * n_sides * n_correct_ips * len("SO") + - n_sides * n_correct_ips * len("T") - ) - - if accel == 'hllhc': - assert len(df_corrections.index) == n_orientation * n_sides * n_correct_ips * len("SODT") - - # All circuits in madx script --- - for circuit in df_corrections[CIRCUIT]: - assert circuit in madx_corrections - - # Check corrector values --- - for test_order in range(3, MAX_N+1): - for test_orientation in ("S", ""): - for ip in correct_ips: - mask = ( - (df_corrections[STRENGTH] == f"K{test_order-1}{test_orientation}L") & - (df_corrections[IP] == ip) - ) - if (test_order == order) and (test_orientation == orientation): - if order % 2: - corrector_strengths = sum(df_corrections.loc[mask, VALUE]) - assert abs(corrector_strengths) < EPS # correctors should be equally distributed - - corrector_strengths = -sum(df_corrections.loc[mask, VALUE].abs()) - # as beta cancels out (and is 1 anyway) - error_strengths = n_magnets * error_value / 2 # account for partial compensation (from above) - else: - corrector_strengths = sum(df_corrections.loc[mask, VALUE]) - assert all(abs(df_corrections.loc[mask, VALUE] - corrector_strengths / n_sides) < EPS) - # as beta cancels out (and is 1 anyway) - error_strengths = (n_sides * n_magnets * error_value) - assert abs(corrector_strengths + error_strengths) < EPS # compensation of RDT - else: - assert all(df_corrections.loc[mask, VALUE] == 0.) - - @pytest.mark.parametrize('beam', (1, 2, 4)) - def test_lhc_correction(self, tmp_path: Path, beam: int): - """Test LHC optics with random errors assigned. - Sanity Checks: - - all correctors found - - all correctors have a value - - all corrector circuits present in madx-script - """ - # Setup ---------------------------------------------------------------- - np.random.seed(20211108) - optics = read_lhc_model(beam) - mask_ir = _get_ir_magnets_mask(optics.index) - optics = optics.loc[mask_ir, :] - correctors = optics.index[_get_corrector_magnets_mask(optics.index)] - correct_ips = (1, 5) - correctors = [c for c in correctors if int(c[-1]) in correct_ips] - - errors = generate_errortable(index=optics.index) - - # here: 2 == sextupole - errors.loc[:, [f"K{order}{orientation}L" - for order in range(2, MAX_N) for orientation in ("S", "")]] = np.random.random([len(errors.index), 8]) - if beam == 4: - negative_columns = _get_opposite_sign_beam4_kl_columns(range(2, MAX_N)) - errors.loc[:, negative_columns] = -errors.loc[:, negative_columns] - - # Correction ----------------------------------------------------------- - madx_corrections, df_corrections = irnl_correct( - accel='lhc', - optics=[optics], - errors=[errors], - beams=[beam], - output=tmp_path / "correct", - feeddown=0, - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - ) - - # Testing -------------------------------------------------------------- - # Check output data --- - assert len(list(tmp_path.glob("correct.*"))) == 2 - - # All correctors present with a value --- - assert len(df_corrections.index) == 2 * 2 * 5 - 1 # sides * ips * corrector orders - faulty MCOSX.3L1 - assert all(df_corrections[VALUE] != 0) - - found_correctors = df_corrections[NAME].to_numpy() - for name in correctors: - if optics.loc[name, KEYWORD] == PLACEHOLDER: - continue - assert name in found_correctors - - # all corrector strengths are negative because all errors are positive (np.random.random) - # this checks, that there is no sign-change between beam 1, 2 and 4. - assert all(df_corrections[VALUE] < 0) - - # All circuits in madx script --- - for circuit in df_corrections[CIRCUIT]: - assert circuit in madx_corrections - - -class TestDualOptics: - def test_dual_optics(self, tmp_path: Path): - """Test that given two different optics, an approximative solution - will be found.""" - # Parameters ----------------------------------------------------------- - accel = 'hllhc' - - correct_ips = (1, 3) - n_magnets = 4 - n_ips = 4 - n_sides = 2 - - # Setup ---------------------------------------------------------------- - beta = 2 - error_value = 2 - optics1 = generate_pseudo_model( - accel=accel, n_ips=n_ips, n_magnets=n_magnets, betax=beta, betay=beta) - errors1 = generate_errortable( - index=get_some_magnet_names(n_ips=n_ips, n_magnets=n_magnets), - value=error_value, - ) - - # Optics 2 - beta2 = 4 - error_value2 = 3 * error_value - optics2 = generate_pseudo_model( - accel=accel, n_ips=n_ips, n_magnets=n_magnets, betax=beta2, betay=beta2) - errors2 = generate_errortable( - index=get_some_magnet_names(n_ips=n_ips, n_magnets=n_magnets), - value=error_value2, - ) - - # Correction --------------------------------------------------------------- - rdt = "f4000" - - # The corrector values in this example are not uniquely defined - # so these methods will fail: - for solver in ["inv", "linear"]: - with pytest.raises(np.linalg.LinAlgError): - _, df_corrections = irnl_correct( - accel=accel, - optics=[optics1, optics2], - errors=[errors1, errors2], - beams=[1, 1], - rdts=[rdt, ], - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - solver=solver - ) - - # Best approximation for corrector values, via least-squares: - _, df_corrections = irnl_correct( - accel=accel, - optics=[optics1, optics2], - errors=[errors1, errors2], - beams=[1, 1], - rdts=[rdt, ], - output=tmp_path / "correct_dual", - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - solver="lstsq", - ) - - # as beta cancels out: - error_strengths1 = n_sides * n_magnets * error_value - error_strengths2 = n_sides * n_magnets * error_value2 - - # build the equation system manually, and solve with least square - # (basically what the correction should do): - exp_x = (int(rdt[1]) + int(rdt[2])) / 2 - exp_y = (int(rdt[2]) + int(rdt[3])) / 2 - b1 = beta**(exp_x+exp_y) - b2 = beta2**(exp_x+exp_y) - dual_correction = np.linalg.lstsq(np.array([[b1, b1], [b2, b2]]), - np.array([-b1*error_strengths1, -b2*error_strengths2]))[0] - - assert all(np.abs(dual_correction) > 0) # just for safety, that there is a solution - - for ip in correct_ips: - mask = df_corrections[IP] == ip - assert all(np.abs((df_corrections.loc[mask, VALUE] - dual_correction)) < EPS) - - def test_dual_optics_rdts(self, tmp_path: Path): - """Test calculations given two different optics and different RDTs.""" - # Parameters ----------------------------------------------------------- - accel = 'hllhc' - - correct_ips = (1, 3) - n_magnets = 4 - n_ips = 4 - n_sides = 2 - - # Setup ---------------------------------------------------------------- - rdt1 = "f4000" - beta = 2 - error_value = 2 - optics1 = generate_pseudo_model( - accel=accel, n_ips=n_ips, n_magnets=n_magnets, betax=beta, betay=beta) - errors1 = generate_errortable( - index=get_some_magnet_names(n_ips=n_ips, n_magnets=n_magnets), - value=error_value, - ) - - # Optics that require same strengths with rdt2 - rdt2 = "f2002" - beta2 = 4 - error_value2 = error_value - optics2 = generate_pseudo_model( - accel=accel, n_ips=n_ips, n_magnets=n_magnets, betax=beta2, betay=beta2) - - errors2 = generate_errortable( - index=get_some_magnet_names(n_ips=n_ips, n_magnets=n_magnets), - value=error_value2, - ) - - # Correction --------------------------------------------------------------- - _, df_corrections = irnl_correct( - accel=accel, - optics=[optics1, optics2], - errors=[errors1, errors2], - beams=[1, 1], - rdts=[rdt1, ], - rdts2=[rdt2, ], - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - ) - - # as beta cancels out: - error_strengths = n_sides * n_magnets * error_value - - for ip in correct_ips: - mask = df_corrections[IP] == ip - corrector_strengths = sum(df_corrections.loc[mask, VALUE]) - assert abs(corrector_strengths + error_strengths) < EPS # compensation of RDT - - -class TestRDT: - def test_different_rdts(self, tmp_path: Path): - """Test that different RDTs can be corrected and only their correctors - are returned. Also checks that the corrector values are varying between RDTs - when they should. Octupole RDTs are used for this example. - """ - # Parameters ----------------------------------------------------------- - accel = 'lhc' - - correct_ips = (1, 3) - error_value = 2 - n_magnets = 4 - n_ips = 4 - - # Setup ---------------------------------------------------------------- - optics = generate_pseudo_model(accel=accel, n_ips=n_ips, n_magnets=n_magnets) - - # use different beta for correctors to avoid beta cancellation - # so that different RDTs give different corrector strengths - correctors_mask = _get_corrector_magnets_mask(optics.index) - optics.loc[correctors_mask, f"{BETA}Y"] = 3 - - errors = generate_errortable(index=get_some_magnet_names(n_ips=n_ips, n_magnets=n_magnets)) - errors["K3L"] = error_value - - # Correction ----------------------------------------------------------- - _, df_corrections_f4000 = irnl_correct( - accel=accel, - optics=[optics], - errors=[errors], - beams=[1], - rdts=["f4000",], - output=tmp_path / "correct4000", - feeddown=0, - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - ) - - _, df_corrections_f2200 = irnl_correct( - accel=accel, - optics=[optics], - errors=[errors], - beams=[1], - rdts=["f2200",], - output=tmp_path / "correct2200", - feeddown=0, - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - ) - - _, df_corrections_f2002 = irnl_correct( - accel=accel, - optics=[optics], - errors=[errors], - beams=[1], - rdts=["f2002", ], - output=tmp_path / "correct2002", - feeddown=0, - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - ) - - # Testing -------------------------------------------------------------- - # Check output data --- - assert len(list(tmp_path.glob("correct*"))) == 6 - - # Check all found correctors --- - # only octupole correctors should be present - for correction in (df_corrections_f4000, df_corrections_f2200, df_corrections_f2002): - assert len(correction.index) == 4 - assert all(correction['order'] == 4) - - # f4000 and f2200 should give same values for correction - assert_frame_equal(df_corrections_f4000, df_corrections_f2200) - - # f4000 and f2002 should give different values for correction - with pytest.raises(AssertionError): - assert_series_equal(df_corrections_f4000[VALUE], df_corrections_f2002[VALUE]) - - # frames are equal apart from value, though - non_val_columns = [col for col in df_corrections_f2200.columns if col != VALUE] - assert_frame_equal(df_corrections_f4000[non_val_columns], df_corrections_f2002[non_val_columns]) - - def test_switched_beta(self): - """Test using the special RDTs* where the beta-exponents are switched.""" - # Parameters ----------------------------------------------------------- - accel = 'hllhc' - - correct_ips = (1, 3) - n_magnets = 4 - n_ips = 4 - n_sides = 2 - - # Setup ---------------------------------------------------------------- - beta = 2 - error_value = 2 - optics = generate_pseudo_model( - accel=accel, n_ips=n_ips, n_magnets=n_magnets, betax=beta, betay=beta) - errors = generate_errortable( - index=get_some_magnet_names(n_ips=n_ips, n_magnets=n_magnets), - value=error_value, - ) - - # Correction --------------------------------------------------------------- - _, df_corrections = irnl_correct( - accel=accel, - optics=[optics, ], - errors=[errors, ], - beams=[1, ], - rdts=["f4000", ], - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - ) - - _, df_corrections_switched = irnl_correct( - accel=accel, - optics=[optics, ], - errors=[errors, ], - beams=[1, ], - rdts=["f0004*", ], # only for testing purposes use this RDT - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - ) - - # as beta cancels out: - error_strengths = n_sides * n_magnets * error_value - - for ip in correct_ips: - mask = df_corrections_switched[IP] == ip - corrector_strengths_switched = sum(df_corrections_switched.loc[mask, VALUE]) - assert abs(corrector_strengths_switched + error_strengths) < EPS # compensation of RDT - assert_frame_equal(df_corrections, df_corrections_switched) - - -class TestFeeddown: - @pytest.mark.parametrize('x', (2, 0)) - @pytest.mark.parametrize('y', (1.5, 0)) - def test_general_feeddown(self, tmp_path: Path, x: float, y: float): - """Test feeddown functionality from decapoles to octupoles and sextupoles.""" - # Parameters ----------------------------------------------------------- - accel = 'lhc' - - correct_ips = (1, 3) - error_value = 2 - n_magnets = 4 - n_ips = 4 - n_sides = 2 - - # Setup ---------------------------------------------------------------- - optics = generate_pseudo_model( - accel=accel, n_ips=n_ips, n_magnets=n_magnets, x=x, y=y) - errors = generate_errortable( - index=get_some_magnet_names(n_ips=n_ips, n_magnets=n_magnets), - ) - errors["K4L"] = error_value # normal decapole errors - - # Correction --------------------------------------------------------------- - rdts = "f4000", "f3001" - _, df_corrections = irnl_correct( - accel=accel, - optics=[optics], - errors=[errors], - beams=[1], - rdts=rdts, - output=tmp_path / "correct", - feeddown=0, - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - ) - - _, df_corrections_fd1 = irnl_correct( - accel=accel, - optics=[optics], - errors=[errors], - beams=[1], - rdts=rdts, - output=tmp_path / "correct_fd1", - feeddown=1, - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - ) - - errors["K4L"] = 0 - errors["K5L"] = error_value # normal dodecapole errors - _, df_corrections_fd2 = irnl_correct( - accel=accel, - optics=[optics], - errors=[errors], - beams=[1], - rdts=rdts, - output=tmp_path / "correct_fd2", - feeddown=2, - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - ) - - # Testing ------------------------------------------------------------------ - # Check output data --- - assert len(list(tmp_path.glob("correct*"))) == 6 - - # Check all found correctors --- - # no corrections with feed-down - assert all(df_corrections[VALUE] == 0) - - if x == 0 and y == 0: - assert all(df_corrections_fd1[VALUE] == 0) - assert all(df_corrections_fd2[VALUE] == 0) - - else: - for ip in correct_ips: - normal_oct_mask = (df_corrections[STRENGTH] == "K3L") & (df_corrections[IP] == ip) - skew_oct_mask = (df_corrections[STRENGTH] == "K3SL") & (df_corrections[IP] == ip) - dodecapole_error_sum = error_value * n_magnets * n_sides - norm_oct_corr_fd1 = sum(df_corrections_fd1.loc[normal_oct_mask, VALUE]) - skew_oct_corr_fd1 = sum(df_corrections_fd1.loc[skew_oct_mask, VALUE]) - assert abs(norm_oct_corr_fd1 + x * dodecapole_error_sum) < EPS - assert abs(skew_oct_corr_fd1 + y * dodecapole_error_sum) < EPS - - norm_oct_corr_fd2 = sum(df_corrections_fd2.loc[normal_oct_mask, VALUE]) - skew_oct_corr_fd2 = sum(df_corrections_fd2.loc[skew_oct_mask, VALUE]) - assert abs(norm_oct_corr_fd2 + 0.5 * (x**2 - y**2) * dodecapole_error_sum) < EPS - assert abs(skew_oct_corr_fd2 + x * y * dodecapole_error_sum) < EPS - - @pytest.mark.parametrize('corrector', ("a5", "b5", "a6", "b6")) - @pytest.mark.parametrize('x', (2, 0)) - @pytest.mark.parametrize('y', (2, 1.5, 0)) - def test_correct_via_feeddown(self, tmp_path: Path, x: float, y: float, corrector: str): - """Test correct RDT via feeddown from higher order corrector. - In this example: Use normal and skew deca- and dodecapole correctors - to correct for normal octupole errors (which make it easy to - just sum up over both sides). - """ - # Parameters ----------------------------------------------------------- - accel = 'hllhc' - - correct_ips = (1, 3) - error_value = 2 - n_magnets = 4 - n_ips = 4 - n_sides = 2 - - # Setup ---------------------------------------------------------------- - optics = generate_pseudo_model( - accel=accel, n_ips=n_ips, n_magnets=n_magnets, x=x, y=y) - errors = generate_errortable( - index=get_some_magnet_names(n_ips=n_ips, n_magnets=n_magnets), - ) - errors["K3L"] = error_value # octupole errors - - # Correction --------------------------------------------------------------- - rdts = {"f4000": [corrector]} - _, df_corrections = irnl_correct( - accel=accel, - optics=[optics], - errors=[errors], - beams=[1], - rdts=rdts, - output=tmp_path / "correct", - feeddown=0, - ips=correct_ips, - ignore_missing_columns=True, - iterations=1, - ) - - assert len(df_corrections.index) == len(correct_ips) * n_sides - assert all(df_corrections[FIELD] == corrector) - - coeff = {"a5": y, "b5": x, "a6": y*x, "b6": 0.5*(x**2 - y**2)}[corrector] - if coeff == 0: - # No Feed-down possible - assert all(df_corrections[VALUE] < EPS) - else: - # as beta cancels out (and is 1 anyway) - error_strengths = n_sides * n_magnets * error_value - for ip in correct_ips: - mask = df_corrections[IP] == ip - corrector_strengths = coeff * sum(df_corrections.loc[mask, VALUE]) - assert abs(corrector_strengths + error_strengths) < EPS # compensation of RDT - - -class TestUnit: - """Unit Tests for easy to test functions.""" - def test_get_integral_sign(self): - for n in range(10): - assert get_integral_sign(n, "R") == (-1)**n - assert get_integral_sign(n, "L") == 1 - - def test_list_to_str(self): - assert ABC == "".join(list2str(list(ABC)).replace(" ", "").replace("'", "").replace('"', "").split(',')) - - def test_wrong_arguments(self): - with pytest.raises(AttributeError) as e: - irnl_correct( - feddown=0, - itterations=1, - ) - assert "feddown" in str(e) - assert "itterations" in str(e) - - @pytest.mark.parametrize('beam', (1, 2, 4)) - def test_switch_signs(self, beam: int): - all_k = [f"K{order}{orientation}L" for order in range(2, MAX_N) for orientation in ("S", "")] - optics = generate_pseudo_model(n_ips=1, n_magnets=10, accel='lhc', x=10, y=5) - optics[all_k] = 1 - - errors = generate_errortable(index=optics.index, value=2.) - - # make copies as it switches in place - optics_switch = optics.copy() - errors_switch = errors.copy() - switch_signs_for_beam4([optics_switch], [errors_switch], [beam]) - - if beam != 4: - assert_frame_equal(optics, optics_switch) - assert_frame_equal(errors, errors_switch) - else: - # in madx optics only X changes sign for beam 4 ... - switch_col_optics_mask = optics.columns.isin(["X"]) - assert_frame_equal(optics.loc[:, switch_col_optics_mask], -optics_switch.loc[:, switch_col_optics_mask]) - assert_frame_equal(optics.loc[:, ~switch_col_optics_mask], optics_switch.loc[:, ~switch_col_optics_mask]) - - # ... but in the errors DX and the anti-symmetric KL change sign - switch_col_errors_mask = errors.columns.isin(["DX"] + _get_opposite_sign_beam4_kl_columns(range(MAX_N))) - assert_frame_equal(errors.loc[:, switch_col_errors_mask], -errors_switch.loc[:, switch_col_errors_mask]) - assert_frame_equal(errors.loc[:, ~switch_col_errors_mask], errors_switch.loc[:, ~switch_col_errors_mask]) - - def test_ircorrector_class(self): - # Test Corrector - a5_corrector_L1 = IRCorrector(field_component="a5", accel="lhc", ip=1, side="L") - - # Test Equality - assert a5_corrector_L1 == IRCorrector(field_component="a5", accel="lhc", ip=1, side="L") - assert a5_corrector_L1 != IRCorrector(field_component="a4", accel="lhc", ip=1, side="L") - - # Test > and < per order (important for feed-down!) - assert a5_corrector_L1 > IRCorrector(field_component="a4", accel="lhc", ip=1, side="L") - assert a5_corrector_L1 > IRCorrector(field_component="a4", accel="lhc", ip=2, side="R") - assert a5_corrector_L1 > IRCorrector(field_component="b4", accel="lhc", ip=1, side="L") - assert a5_corrector_L1 < IRCorrector(field_component="a6", accel="lhc", ip=1, side="L") - assert a5_corrector_L1 < IRCorrector(field_component="b6", accel="lhc", ip=1, side="L") - assert a5_corrector_L1 < IRCorrector(field_component="b6", accel="lhc", ip=8, side="R") - - # These ones are arbitrary, just to allow sorting/make sorting unique - assert a5_corrector_L1 > IRCorrector(field_component="b5", accel="lhc", ip=1, side="L") - assert a5_corrector_L1 < IRCorrector(field_component="a5", accel="lhc", ip=1, side="R") - assert a5_corrector_L1 < IRCorrector(field_component="a5", accel="lhc", ip=2, side="L") - - def test_ircorrector_accel(self): - a4_corrector_L1 = IRCorrector(field_component="a4", accel="lhc", ip=1, side="L") - assert "F" not in a4_corrector_L1.name - - a4_corrector_L1_hllhc = IRCorrector(field_component="a4", accel="hllhc", ip=1, side="L") - assert "F" in a4_corrector_L1_hllhc.name - assert a4_corrector_L1_hllhc.name.startswith("MCOS") - assert a4_corrector_L1 != a4_corrector_L1_hllhc - - assert IRCorrector(field_component="a4", accel="lhc", ip=2, side="L") == IRCorrector(field_component="a4", accel="hllhc", ip=2, side="L") - - assert IRCorrector(field_component="b2", accel="hllhc", ip=1, side="L").name.startswith("MCQ") - assert IRCorrector(field_component="a2", accel="hllhc", ip=1, side="L").name.startswith("MCQS") - - assert IRCorrector(field_component="b3", accel="hllhc", ip=1, side="L").name.startswith("MCS") - assert IRCorrector(field_component="a3", accel="hllhc", ip=1, side="L").name.startswith("MCSS") - - assert IRCorrector(field_component="b4", accel="hllhc", ip=1, side="L").name.startswith("MCO") - assert IRCorrector(field_component="a4", accel="hllhc", ip=1, side="L").name.startswith("MCOS") - - assert IRCorrector(field_component="b5", accel="hllhc", ip=1, side="L").name.startswith("MCD") - assert IRCorrector(field_component="a5", accel="hllhc", ip=1, side="L").name.startswith("MCDS") - - assert IRCorrector(field_component="b6", accel="hllhc", ip=1, side="L").name.startswith("MCT") - assert IRCorrector(field_component="a6", accel="hllhc", ip=1, side="L").name.startswith("MCTS") - - def test_rdt_init(self): - jklm = (1, 2, 3, 4) - - rdt = RDT(name=f"f{''.join(str(ii) for ii in jklm)}") - assert rdt.order == sum(jklm) - assert rdt.jklm == jklm - assert rdt.j == jklm[0] - assert rdt.k == jklm[1] - assert rdt.l == jklm[2] - assert rdt.m == jklm[3] - assert not rdt.swap_beta_exp - assert RDT("f1001*").swap_beta_exp - - def test_rdt_equality(self): - assert RDT("f2110") == RDT("f2110") - assert RDT("f2110") != RDT("f2110*") - - def test_rdt_sortable(self): - # sortable by order - assert RDT("f1001") < RDT("f2001") - assert RDT("f1003") > RDT("f2001") - - # arbitrary (so sorting is unique) - assert RDT("f1001") > RDT("f2000") - assert RDT("f3002") < RDT("f2003") - assert RDT("f2110") < RDT("f2110*") - assert RDT("f1001*") > RDT("f1001") - - -# Helper ------------------------------------------------------------------------------------------- - -def read_lhc_model(beam: int) -> tfs.TfsDataFrame: - """Read the LHC model from the input directory.""" - # tfs files were too big, but if generated from the `.madx` the `.tfs` can be used directly. - # E.g. for debugging purposes. - # return tfs.read_tfs(LHC_MODELS_PATH / f"twiss.lhc.b{beam}.nominal.tfs", index="NAME") - return tfs_tools.read_hdf(LHC_MODELS_PATH / f"twiss.lhc.b{beam}.nominal.hd5") - - -def generate_pseudo_model(n_ips: int, n_magnets: int, accel: str, - betax: float = 1, betay: float = 1, x: float = 0, y: float = 0) -> pd.DataFrame: - """Generate a Twiss-Like DataFrame with magnets as index and Beta and Orbit columns.""" - df = pd.DataFrame( - index=( - get_some_magnet_names(n_ips=n_ips, n_magnets=n_magnets) + - get_lhc_corrector_names(n_ips=n_ips, accelerator=accel) - ), - columns=[f"{BETA}{X}", f"{BETA}{Y}", X, Y, KEYWORD] - ) - df[f"{BETA}{X}"] = betax - df[f"{BETA}{Y}"] = betay - df[X] = x - df[Y] = y - df[KEYWORD] = MULTIPOLE - return df - - -def generate_errortable(index: pd.Series, value: float = 0) -> pd.DataFrame: - """Return DataFrame from index and KN(S)L + D[XY] columns.""" - return pd.DataFrame(value, - index=index, - columns=[f"K{n}{o}L" for n in range(MAX_N) for o in ("", "S")] + [f"D{plane}" for plane in "XY"] - ) - - -def get_some_magnet_names(n_ips: int, n_magnets: int) -> List[str]: - r"""More or less random magnet names, ending in ``[LR]\d``. - n_magnets < 26 because their names come from alphabet. - """ - return [ - f"M{name}.{number+1}{side}{ip}" - for ip in range(1, n_ips+1) - for side in "LR" - for number, name in enumerate(ABC[:n_magnets]) - ] - - -def get_lhc_corrector_names(n_ips: int, accelerator: str = 'lhc') -> List[str]: - r"""Corrector names as defined in LHC/HLLHC as the correction script is looking for them. - Need to start with ``MC`` and end in ``X.3[LR]\d`` or ``XF.3[LR][15]``""" - magnets = [ - f"MC{order}{orientation}X.3{side}{ip}" - for order in "SODT" - for orientation in ("S", "") - for side in "LR" - for ip in range(1, n_ips+1) - ] - if accelerator == 'hllhc': - magnets = [ - name.replace("X", "XF") if name[-1] in "15" else name - for name in magnets - ] - return magnets - - -def _get_ir_magnets_mask(index: pd.Index) -> pd.Series: - """Returns a boolean mask for magnets in the IR (n<=13) in the index.""" - return index.str.match(r"M.*\.(1[0123]|[0-9])[LR]\d(\.B\d)?", flags=re.IGNORECASE) - - -def _get_corrector_magnets_mask(index: pd.Index) -> pd.Series: - """Returns a boolean mask for the nonlinear corrector magnets in index.""" - return index.str.match(r"MC.*XF?\.3[LR]\d$", flags=re.IGNORECASE) - - -def _get_opposite_sign_beam4_kl_columns(range_: Iterable): - """Get the KN(S)L columns that have opposite signs for beam 4.""" - return [f"K{order}{'' if order % 2 else 'S'}L" for order in range_]