diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..0b07f99 --- /dev/null +++ b/.flake8 @@ -0,0 +1,4 @@ +[flake8] +ignore = E226,E741,W504 +exclude = .git,__pycache__ +max-line-length = 115 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..fd8c8e1 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,28 @@ +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v3.2.0 + hooks: + - id: no-commit-to-branch + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-ast + - id: check-case-conflict + - id: check-json + - id: check-merge-conflict + - id: check-symlinks + - id: check-xml + - id: check-yaml + exclude: '^(conda-recipe/meta.yaml)$' + - id: debug-statements + +- repo: https://gitlab.com/pycqa/flake8.git + rev: 3.8.3 + hooks: + - id: flake8 + +- repo: https://github.com/timothycrosley/isort + rev: 5.5.3 + hooks: + - id: isort diff --git a/hxrsnd/aerotech.py b/hxrsnd/aerotech.py index 802c5d7..b4dc90e 100644 --- a/hxrsnd/aerotech.py +++ b/hxrsnd/aerotech.py @@ -5,15 +5,17 @@ import os import numpy as np -from ophyd import Component as Cmp, FormattedComponent as FrmCmp -from ophyd.utils import LimitError +from ophyd import Component as Cmp +from ophyd import FormattedComponent as FrmCmp from ophyd.signal import EpicsSignal, EpicsSignalRO, Signal from ophyd.status import wait as status_wait +from ophyd.utils import LimitError -from .sndmotor import SndEpicsMotor +from .exceptions import (BadN2Pressure, MotorDisabled, MotorFaulted, + MotorStopped) from .pneumatic import PressureSwitch +from .sndmotor import SndEpicsMotor from .utils import absolute_submodule_path, as_list, stop_on_keyboardinterrupt -from .exceptions import MotorDisabled, MotorFaulted, MotorStopped, BadN2Pressure logger = logging.getLogger(__name__) @@ -283,7 +285,8 @@ def mv(self, position, wait=True, print_move=True, if wait: logger.info("Move completed for '{0}'.".format(self.desc)) else: - logger.info("Move command sent to '{0}'.".format(self.desc)) + logger.info( + "Move command sent to '{0}'.".format(self.desc)) return status # Catch all the common motor exceptions @@ -613,7 +616,8 @@ def expert_screen(self, print_msg=True): print_msg : bool, optional Prints that the screen is being launched. """ - path = absolute_submodule_path("hxrsnd/screens/motor_expert_screens.sh") + path = absolute_submodule_path( + "hxrsnd/screens/motor_expert_screens.sh") if print_msg: logger.info("Launching expert screen.") os.system("{0} {1} {2} &".format(path, self.prefix, "aerotech")) @@ -653,12 +657,12 @@ def status(self, status="", offset=0, print_status=True, newline=False, status += "{0}Faulted: {1:>20}\n".format(" "*(offset+2), str(self.faulted)) status += "{0}State: {1:>22}\n".format(" "*(offset+2), - str(self.state)) + str(self.state)) status += "{0}Position: {1:>19}\n".format(" "*(offset+2), np.round(self.wm(), 6)) status += "{0}Dial: {1:>23}\n".format(" "*(offset+2), - np.round(self.dial.get(), - 6)) + np.round(self.dial.get(), + 6)) status += "{0}Limits: {1:>21}\n".format( " "*(offset+2), str((int(self.low_limit), int(self.high_limit)))) diff --git a/hxrsnd/attocube.py b/hxrsnd/attocube.py index 873ac06..e6fd667 100644 --- a/hxrsnd/attocube.py +++ b/hxrsnd/attocube.py @@ -1,19 +1,19 @@ """ Attocube devices """ -import os import logging +import os import numpy as np -from ophyd import PositionerBase from ophyd import Component as Cmp -from ophyd.utils import LimitError +from ophyd import PositionerBase from ophyd.signal import EpicsSignal, EpicsSignalRO from ophyd.status import wait as status_wait +from ophyd.utils import LimitError -from .sndmotor import SndMotor -from .snddevice import SndDevice from .exceptions import MotorDisabled, MotorError, MotorFaulted +from .snddevice import SndDevice +from .sndmotor import SndMotor from .utils import absolute_submodule_path, as_list logger = logging.getLogger(__name__) @@ -48,6 +48,31 @@ class EccBase(SndMotor, PositionerBase): """ ECC Motor Class """ + tab_component_names = True + tab_whitelist = [ + 'check_status', + # 'check_value', + 'connected', + 'disable', + 'egu', + 'enable', + 'enabled', + 'error', + 'expert_screen', + 'high_limit', + 'limits', + 'low_limit', + 'move', + 'mv', + 'position', + 'reference', + 'referenced', + 'reset', + 'set_limits', + 'status', + 'stop', + ] + # position user_readback = Cmp(EpicsSignalRO, ":POSITION", auto_monitor=True) user_setpoint = Cmp(EpicsSignal, ":CMD:TARGET") @@ -361,7 +386,6 @@ def mv(self, position, print_move=True, wait=None, logger.warning("Cannot move - motor {0} is currently faulted. Try " "running 'motor.clear()'.".format(self.desc)) - def check_status(self, position=None): """ Checks the status of the motor to make sure it is ready to move. Checks @@ -456,7 +480,8 @@ def expert_screen(self, print_msg=True): Prints that the screen is being launched. """ # Get the absolute path to the screen - path = absolute_submodule_path("hxrsnd/screens/motor_expert_screens.sh") + path = absolute_submodule_path( + "hxrsnd/screens/motor_expert_screens.sh") if print_msg: logger.info("Launching expert screen.") os.system("{0} {1} {2} &".format(path, self.prefix, "attocube")) diff --git a/hxrsnd/bragg.py b/hxrsnd/bragg.py index 2104f69..e65e408 100644 --- a/hxrsnd/bragg.py +++ b/hxrsnd/bragg.py @@ -27,32 +27,33 @@ _manual_energy = None # Chemical Formula Aliases -alias={'Air':'N1.562O.42C.0003Ar.0094', - 'air':'N1.562O.42C.0003Ar.0094', - 'YAG':'Y3Al5O12', - 'yag':'Y3Al5O12', - 'Sapphire':'Al2O3', - 'sapphire':'Al2O3', -} +alias = {'Air': 'N1.562O.42C.0003Ar.0094', + 'air': 'N1.562O.42C.0003Ar.0094', + 'YAG': 'Y3Al5O12', + 'yag': 'Y3Al5O12', + 'Sapphire': 'Al2O3', + 'sapphire': 'Al2O3', + } # Crystal Lattice parameters (a, b, c, alpha, beta, gamma) # a,b,c in angstroms # alpha, beta, gamma in degrees lattice_parameters = { - 'Si':(5.4310205,5.4310205,5.4310205,90,90,90), + 'Si': (5.4310205, 5.4310205, 5.4310205, 90, 90, 90), } -#define units and constants +# define units and constants u = { 'ang': 1e10, } # Trigonometric Functions + def sind(A): - """ + """ Sin of an angle specified in degrees. - + Parameters ---------- A : float @@ -64,13 +65,14 @@ def sind(A): Sin of the angle. """ Arad = np.deg2rad(A) - x = np.sin(Arad) + x = np.sin(Arad) return x - + + def cosd(A): - """ + """ Cos of an angle specified in degrees. - + Parameters ---------- A : float @@ -82,13 +84,14 @@ def cosd(A): Cos of the angle. """ Arad = np.deg2rad(A) - x = np.cos(Arad) + x = np.cos(Arad) return x + def tand(A): - """ + """ Tan of an angle specified in degrees. - + Parameters ---------- A : float @@ -100,11 +103,12 @@ def tand(A): Tan of the angle. """ Arad = np.deg2rad(A) - x = np.tan(Arad) + x = np.tan(Arad) return x - + + def asind(x): - """ + """ Calculates the arcsin in degrees. Parameters @@ -118,11 +122,12 @@ def asind(x): Arcsin of the value in degrees. """ A = np.arcsin(x) - A = np.rad2deg(A) + A = np.rad2deg(A) return A - + + def acosd(x): - """ + """ Calculates the arccos in degrees. Parameters @@ -134,13 +139,14 @@ def acosd(x): ------- A : float Arccos of the value in degrees. - """ + """ A = np.arccos(x) - A = np.rad2deg(A) + A = np.rad2deg(A) return A + def atand(x): - """ + """ Calculates the arctan in degrees. Parameters @@ -152,22 +158,23 @@ def atand(x): ------- A : float Arctan of the value in degrees. - """ + """ A = np.arctan(x) - A = np.rad2deg(A) + A = np.rad2deg(A) return A # Frequency, wavelength and energy conversions + def lam(E, o=0): - """ + """ Computes photon wavelength in m - + Parameters ---------- E : float The input energy in eV or KeV - + o : float, optional Set o to 0 if working at sub-100 eV energies @@ -177,21 +184,22 @@ def lam(E, o=0): Input energy converted to wavelength """ if o: - E=E + E = E else: - E=eV(E) - lam=(12398.4/E)/u['ang'] + E = eV(E) + lam = (12398.4/E)/u['ang'] return lam + def lam2E(l): """ Computes photon energy in eV Parameters - ---------- + ---------- l : float Photon wavelength in m - + Returns ------- E : float @@ -200,25 +208,27 @@ def lam2E(l): E = 12398.4/(l*u['ang']) return E + def lam2f(l): """ Computes the photon frequency in Hz Parameters - ---------- + ---------- l : float Photon wavelength in m - + Returns ------- f : float Frequency in Hz """ f = 299792458/l - return f + return f # Higher level functions + def eV(E): """ Returns photon energy in eV if specified in eV or KeV. Assumes that any @@ -232,12 +242,13 @@ def eV(E): Returns ------- E : float - Energy converted to eV from KeV + Energy converted to eV from KeV """ if E < 100: E *= 1000.0 return float(E) + def check_id(ID): """ Checks to see if you are using an alias. Returns the chemical formula @@ -256,6 +267,7 @@ def check_id(ID): except KeyError: return ID + def get_e(energy=None, correct_ev=True): """ Get working energy @@ -283,9 +295,10 @@ def get_e(energy=None, correct_ev=True): en = eV(en) return en + def d_space(ID, hkl): """ - Computes the d spacing (m) of the specified material and reflection + Computes the d spacing (m) of the specified material and reflection Parameters ---------- @@ -321,13 +334,14 @@ def d_space(ID, hkl): sg = sind(gamma) invdsqr = 1 / (1.+2.*ca*cb*cg-ca**2.-cb**2.-cg**2.) * \ - (h**2.*sa**2./a**2. + k**2.*sb**2./b**2. + l**2.*sg**2./c**2. + - 2.*h*k*(ca*cb-cg)/a/b+2.*k*l*(cb*cg-ca)/b/c+2.*h*l*(ca*cg-cb)/a/c) - + (h**2.*sa**2./a**2. + k**2.*sb**2./b**2. + l**2.*sg**2./c**2. + + 2.*h*k*(ca*cb-cg)/a/b+2.*k*l*(cb*cg-ca)/b/c+2.*h*l*(ca*cg-cb)/a/c) + d = invdsqr**-0.5 return d -def bragg_angle(E=None, ID="Si", hkl=(2,2,0)): + +def bragg_angle(E=None, ID="Si", hkl=(2, 2, 0)): """ Computes the Bragg angle (deg) of the specified material, reflection and photon energy. @@ -354,7 +368,8 @@ def bragg_angle(E=None, ID="Si", hkl=(2,2,0)): two_theta = asind(lam(E)/2/d) return two_theta -def bragg_energy(theta, ID="Si", hkl=(2,2,0)): + +def bragg_energy(theta, ID="Si", hkl=(2, 2, 0)): """ Computes the photon energy that satisfies the Bragg condition of the specified material, reflection and theta angle. @@ -363,7 +378,7 @@ def bragg_energy(theta, ID="Si", hkl=(2,2,0)): ---------- theta : float, optional The scattering angle in degrees - + ID : str, optional Chemical fomula : 'Si' @@ -381,6 +396,7 @@ def bragg_energy(theta, ID="Si", hkl=(2,2,0)): E = lam2E(l) return E + def snd_L(E1, E2, delay, gap=55): """ Calculates the theta angles of the towers and the delay length based on the @@ -390,7 +406,7 @@ def snd_L(E1, E2, delay, gap=55): ---------- E1 : float Energy of the delay branch in eV - + E2 : float Energy of the channel-cut branch in eV @@ -404,7 +420,7 @@ def snd_L(E1, E2, delay, gap=55): ------- theta_L : float The necessary angle of the delay branch in degrees. - + theta_cc : float The necessary angle of the channel-cut branch in degrees. @@ -412,8 +428,8 @@ def snd_L(E1, E2, delay, gap=55): The necessary length of the delay crystals in mm. """ cl = 0.3 - theta_L = bragg_angle('Si',(2,2,0),E1) - theta_cc= bragg_angle('Si',(2,2,0),E2) + theta_L = bragg_angle('Si', (2, 2, 0), E1) + theta_cc = bragg_angle('Si', (2, 2, 0), E2) # gap is the distance between the two faces of the channel cut crystal L = (delay*cl/2.+gap*(1-cosd(2*theta_cc))/sind(theta_cc))/(1-cosd( 2*theta_L)) @@ -423,6 +439,7 @@ def snd_L(E1, E2, delay, gap=55): logger.info("t2.th=t3.th = {} degree".format(theta_cc)) return theta_L, theta_cc, L + def snd_diag(E1, E2, delay, gap=55): """ Calculates the positions of the middle diagnostics of the system based on @@ -432,7 +449,7 @@ def snd_diag(E1, E2, delay, gap=55): ---------- E1 : float Energy of the delay branch in eV - + E2 : float Energy of the channel-cut branch in eV @@ -446,14 +463,14 @@ def snd_diag(E1, E2, delay, gap=55): ------- dd_x : float The necessary position of the middle delay diagnostic in mm - + dcc_x : float The necessary position of the middle channel-cut diagnostic in mm """ cl = 0.3 # speed of light - theta_L = bragg_angle('Si',(2,2,0),E1) - theta_cc= bragg_angle('Si',(2,2,0),E2) + theta_L = bragg_angle('Si', (2, 2, 0), E1) + theta_cc = bragg_angle('Si', (2, 2, 0), E2) dcc_x = 2*cosd(theta_cc)*gap L = (delay*cl/2.+gap*(1-cosd(2*theta_cc))/sind(theta_cc))/(1-cosd( 2*theta_L)) @@ -462,6 +479,7 @@ def snd_diag(E1, E2, delay, gap=55): logger.info("dcc.x = {}".format(dcc_x)) return dd_x, dcc_x + def snd_delay(E1, E2, L, gap=55): """ Calculates the delay of the system based on the inputted energies and the @@ -471,7 +489,7 @@ def snd_delay(E1, E2, L, gap=55): ---------- E1 : float Energy of the delay branch in eV - + E2 : float Energy of the channel-cut branch in eV @@ -484,9 +502,8 @@ def snd_delay(E1, E2, L, gap=55): The delay of the system in picoseconds """ cl = 0.3 - theta_L = bragg_angle('Si',(2,2,0),E1) - theta_cc= bragg_angle('Si',(2,2,0),E2) + theta_L = bragg_angle('Si', (2, 2, 0), E1) + theta_cc = bragg_angle('Si', (2, 2, 0), E2) delay = 2*(L*(1-cosd(2*theta_L)) - gap*(1-cosd(2*theta_cc))/sind( - theta_cc))/cl + theta_cc))/cl return delay - diff --git a/hxrsnd/diode.py b/hxrsnd/diode.py index 0b4b014..70024d1 100644 --- a/hxrsnd/diode.py +++ b/hxrsnd/diode.py @@ -5,11 +5,12 @@ import numpy as np from ophyd import EpicsSignalRO -from ophyd.device import Component as C, FormattedComponent as FC +from ophyd.device import Component as C +from ophyd.device import FormattedComponent as FC from pcdsdevices.areadetector.detectors import PCDSAreaDetector -from .snddevice import SndDevice from .aerotech import DiodeAero +from .snddevice import SndDevice logger = logging.getLogger(__name__) @@ -18,7 +19,7 @@ class DiodeBase(SndDevice): """ Base class for the diode. """ - pass + pass class HamamatsuDiode(DiodeBase): @@ -32,9 +33,14 @@ class HamamatsuXMotionDiode(SndDevice): """ Class for the Hamamatsu diode but with an X motor """ + + tab_component_names = True + tab_whitelist = ['block', 'blocked', 'unblock'] + diode = C(HamamatsuDiode, ":DIODE") x = C(DiodeAero, ":X") - def __init__(self, prefix, name=None, block_pos=5, unblock_pos=0, *args, + + def __init__(self, prefix, name=None, block_pos=5, unblock_pos=0, *args, block_atol=0.001, desc=None, **kwargs): super().__init__(prefix, name=name, *args, **kwargs) self.block_pos = block_pos @@ -49,12 +55,12 @@ def blocked(self): """ if np.isclose(self.x.position, self.block_pos, atol=self.block_atol): return True - elif np.isclose(self.x.position, self.unblock_pos, + elif np.isclose(self.x.position, self.unblock_pos, atol=self.block_atol): return False else: - return "Unknown" - + return "Unknown" + def block(self, *args, **kwargs): """ Moves the diode into the blocking position. @@ -75,7 +81,7 @@ def block(self, *args, **kwargs): Returns ------- - status : MoveStatus + status : MoveStatus Status object for the move. """ return self.x.mv(self.block_pos, *args, **kwargs) @@ -100,7 +106,7 @@ def unblock(self, *args, **kwargs): Returns ------- - status : MoveStatus + status : MoveStatus Status object for the move. """ return self.x.mv(self.unblock_pos, *args, **kwargs) @@ -110,12 +116,15 @@ class HamamatsuXYMotionCamDiode(SndDevice): """ Class for the Hamamatsu diode but with X and Y motors """ + tab_component_names = True + tab_whitelist = ['block', 'blocked', 'unblock'] + diode = C(HamamatsuDiode, ":DIODE") x = C(DiodeAero, ":X") y = C(DiodeAero, ":Y") cam = C(PCDSAreaDetector, ":CAM:", lazy=True) - def __init__(self, prefix, name=None, block_pos=5, pos_func=None, + def __init__(self, prefix, name=None, block_pos=5, pos_func=None, block_atol=0.001, desc=None, *args, **kwargs): super().__init__(prefix, name=name, *args, **kwargs) self.block_pos = block_pos @@ -135,10 +144,10 @@ def blocked(self): Returns 'Unknown' if it is far from either of those positions. """ if callable(self.pos_func): - if np.isclose(self.x.position, self.pos_func()+self.block_pos, + if np.isclose(self.x.position, self.pos_func()+self.block_pos, atol=self.block_atol): return True - elif np.isclose(self.x.position, self.pos_func(), + elif np.isclose(self.x.position, self.pos_func(), atol=self.block_atol): return False return "Unknown" @@ -146,7 +155,7 @@ def blocked(self): def block(self, *args, **kwargs): """ Moves the diode by the blocking position defined by the position - function plus the block position. + function plus the block position. Parameters ---------- @@ -164,7 +173,7 @@ def block(self, *args, **kwargs): Returns ------- - status : MoveStatus + status : MoveStatus Status object for the move. """ # Move to the blocked position if we aren't already there @@ -196,7 +205,7 @@ def unblock(self, *args, **kwargs): Returns ------- - status : MoveStatus + status : MoveStatus Status object for the move. """ # Move to the blocked position if we aren't already there @@ -223,17 +232,19 @@ class DiodeIO(SndDevice): name : str Name of Wave8 device """ - peakA = FC(EpicsSignalRO,'{self.prefix}:_peakA_{self.channel}') - peakT = FC(EpicsSignalRO,'{self.prefix}:_peakT_{self.channel}') + tab_component_names = True + + peakA = FC(EpicsSignalRO, '{self.prefix}:_peakA_{self.channel}') + peakT = FC(EpicsSignalRO, '{self.prefix}:_peakT_{self.channel}') def __init__(self, prefix, channel, name, *, read_attrs=None, **kwargs): - #Store the channel + # Store the channel self.channel = channel - #Default read attributes + # Default read attributes if read_attrs is None: read_attrs = ['peakT'] - #Initialize device + # Initialize device super().__init__(prefix, name=name, read_attrs=read_attrs, **kwargs) @@ -243,16 +254,17 @@ class Wave8(SndDevice): A system of sixteen diodes, each with two peaks; A and T. """ - diode_0 = C(DiodeIO, '', channel=0, name='Diode 0') - diode_1 = C(DiodeIO, '', channel=1, name='Diode 1') - diode_2 = C(DiodeIO, '', channel=2, name='Diode 2') - diode_3 = C(DiodeIO, '', channel=3, name='Diode 3') - diode_4 = C(DiodeIO, '', channel=4, name='Diode 4') - diode_5 = C(DiodeIO, '', channel=5, name='Diode 5') - diode_6 = C(DiodeIO, '', channel=6, name='Diode 6') - diode_7 = C(DiodeIO, '', channel=7, name='Diode 7') - diode_8 = C(DiodeIO, '', channel=8, name='Diode 8') - diode_9 = C(DiodeIO, '', channel=9, name='Diode 9') + tab_component_names = True + diode_0 = C(DiodeIO, '', channel=0, name='Diode 0') + diode_1 = C(DiodeIO, '', channel=1, name='Diode 1') + diode_2 = C(DiodeIO, '', channel=2, name='Diode 2') + diode_3 = C(DiodeIO, '', channel=3, name='Diode 3') + diode_4 = C(DiodeIO, '', channel=4, name='Diode 4') + diode_5 = C(DiodeIO, '', channel=5, name='Diode 5') + diode_6 = C(DiodeIO, '', channel=6, name='Diode 6') + diode_7 = C(DiodeIO, '', channel=7, name='Diode 7') + diode_8 = C(DiodeIO, '', channel=8, name='Diode 8') + diode_9 = C(DiodeIO, '', channel=9, name='Diode 9') diode_10 = C(DiodeIO, '', channel=10, name='Diode 10') diode_11 = C(DiodeIO, '', channel=11, name='Diode 11') diode_12 = C(DiodeIO, '', channel=12, name='Diode 12') diff --git a/hxrsnd/exceptions.py b/hxrsnd/exceptions.py index e1466e5..6417266 100644 --- a/hxrsnd/exceptions.py +++ b/hxrsnd/exceptions.py @@ -10,6 +10,7 @@ # Exceptions + class InputError(Exception): """ Exception when the inputs to a function or method are invalid. @@ -40,7 +41,7 @@ class MotorDisabled(SndException): class MotorFaulted(SndException): """ - Exception raised when an action requiring the motor not be faulted is + Exception raised when an action requiring the motor not be faulted is requested. """ pass @@ -48,7 +49,7 @@ class MotorFaulted(SndException): class MotorError(SndException): """ - Exception raised when an action requiring the motor not have an error is + Exception raised when an action requiring the motor not have an error is requested. """ pass @@ -56,7 +57,7 @@ class MotorError(SndException): class MotorStopped(SndException): """ - Exception raised when an action requiring the motor to not be stopped is + Exception raised when an action requiring the motor to not be stopped is requested. """ pass @@ -64,7 +65,7 @@ class MotorStopped(SndException): class BadN2Pressure(SndException): """ - Exception raised when an action requiring the N2 pressure be good is + Exception raised when an action requiring the N2 pressure be good is requested with a bad pressure. """ pass diff --git a/hxrsnd/macromotor.py b/hxrsnd/macromotor.py index dc3664b..11643da 100644 --- a/hxrsnd/macromotor.py +++ b/hxrsnd/macromotor.py @@ -6,22 +6,19 @@ import logging from functools import reduce -import numpy as np +from ophyd.device import Component as Cmp from ophyd.signal import AttributeSignal from ophyd.sim import NullStatus -from ophyd.device import Component as Cmp -from ophyd.utils import LimitError from ophyd.status import wait as status_wait - +from ophyd.utils import LimitError from pcdsdevices.areadetector.detectors import PCDSAreaDetector from pswalker.utils import field_prepend -from .snddevice import SndDevice -from .sndmotor import SndMotor, CalibMotor -from .utils import flatten, nan_if_no_parent from .bragg import bragg_angle, cosd, sind -from .exceptions import (MotorDisabled, MotorFaulted, MotorStopped, - BadN2Pressure) +from .exceptions import (BadN2Pressure, MotorDisabled, MotorFaulted, + MotorStopped) +from .sndmotor import CalibMotor, SndMotor +from .utils import flatten, nan_if_no_parent logger = logging.getLogger(__name__) @@ -34,12 +31,16 @@ class MacroBase(SndMotor): c = 0.299792458 # mm/ps gap = 55 # m + tab_component_names = True + tab_whitelist = ['aligned', 'move', 'position', 'set', 'set_position', + 'status', 'wait', 'c', 'gap'] + # Set add_prefix to be blank so cmp doesnt append the parent prefix readback = Cmp(AttributeSignal, "position", add_prefix='') def __init__(self, prefix, name=None, read_attrs=None, *args, **kwargs): read_attrs = read_attrs or ["readback"] - super().__init__(prefix, name=name, read_attrs=read_attrs, *args, + super().__init__(prefix, name=name, read_attrs=read_attrs, *args, **kwargs) self._status = NullStatus() @@ -57,7 +58,7 @@ def __init__(self, prefix, name=None, read_attrs=None, *args, **kwargs): def position(self): """ Returns the current position - + Returns ------- energy : float @@ -76,9 +77,9 @@ def _verify_move(self, *args, **kwargs): def _check_towers_and_diagnostics(self, *args, **kwargs): """ - Checks the towers in the delay line and the channel cut line to make - sure they can be moved. Depending on if E1, E2 or delay are entered, - the delay line energy motors, channel cut line energy motors or the + Checks the towers in the delay line and the channel cut line to make + sure they can be moved. Depending on if E1, E2 or delay are entered, + the delay line energy motors, channel cut line energy motors or the delay stages will be checked for faults, if they are enabled, if the requested energy requires moving beyond the limits and if the pressure in the tower N2 is good. To be overrided in subclasses. @@ -89,7 +90,7 @@ def _move_towers_and_diagnostics(self, *args, **kwargs): """ Moves all the tower and diagnostic motors according to the inputted energies and delay. If any of the inputted information is set to None - then that component of the move is ignored. To be overrided in + then that component of the move is ignored. To be overrided in subclasses. """ pass @@ -109,14 +110,15 @@ def _add_verify_header(self, string=""): string : str String with the header added to it. """ - header = "\n{:^15}|{:^15}|{:^15}".format("Motor", "Current", "Proposed") + header = "\n{:^15}|{:^15}|{:^15}".format( + "Motor", "Current", "Proposed") header += "\n" + "-"*len(header) return string + header def wait(self, status=None): """ Waits for the status objects to complete the motions. - + Parameters ---------- status : list or None, optional @@ -128,7 +130,7 @@ def wait(self, status=None): status_wait(status) logger.info("Move completed.") - def set(self, position, wait=True, verify_move=True, ret_status=True, + def set(self, position, wait=True, verify_move=True, ret_status=True, use_diag=True, use_calib=True): """ Moves the macro-motor to the inputted position, optionally waiting for @@ -138,11 +140,11 @@ def set(self, position, wait=True, verify_move=True, ret_status=True, ---------- position : float Position to move the macro-motor to. - + wait : bool, optional Wait for each motor to complete the motion before returning the console. - + verify_move : bool, optional Prints the current system state and a proposed system state and then prompts the user to accept the proposal before changing the @@ -156,7 +158,7 @@ def set(self, position, wait=True, verify_move=True, ret_status=True, use_calib : bool, optional Use the configurated calibration parameters - + Returns ------- status : list @@ -176,11 +178,11 @@ def move(self, position, wait=True, verify_move=True, use_diag=True, ---------- position : float Position to move the macro-motor to. - + wait : bool, optional Wait for each motor to complete the motion before returning the console. - + verify_move : bool, optional Prints the current system state and a proposed system state and then prompts the user to accept the proposal before changing the @@ -188,20 +190,20 @@ def move(self, position, wait=True, verify_move=True, use_diag=True, ret_status : bool, optional Return the status object of the move. - + use_diag : bool, optional Move the daignostic motors to align with the beam. use_calib : bool, optional Use the configurated calibration parameters - + Returns ------- status : list List of status objects for each motor that was involved in the move. """ # Check the towers and diagnostics - diag_pos = self._check_towers_and_diagnostics(position, + diag_pos = self._check_towers_and_diagnostics(position, use_diag=use_diag) # Prompt the user about the move before making it @@ -219,20 +221,20 @@ def move(self, position, wait=True, verify_move=True, use_diag=True, # Wait for all the motors to finish moving if wait: self.wait(status) - + return status def mv(self, position, wait=True, verify_move=True, use_diag=True, *args, **kwargs): """ - Moves the macro parameters to the inputted positions. For energy, this - moves the energies of both lines, energy1 moves just the delay line, - energy2 moves just the channel cut line, and delay moves just the delay + Moves the macro parameters to the inputted positions. For energy, this + moves the energies of both lines, energy1 moves just the delay line, + energy2 moves just the channel cut line, and delay moves just the delay motors. - + mv() is different from move() by catching all the common exceptions that - this motor can raise and just raises a logger warning. Therefore if - building higher level functionality, do not use this method and use + this motor can raise and just raises a logger warning. Therefore if + building higher level functionality, do not use this method and use move() instead, otherwise none of the exceptions will propagate beyond this method. @@ -244,7 +246,7 @@ def mv(self, position, wait=True, verify_move=True, use_diag=True, wait : bool, optional Wait for each motor to complete the motion before returning the console. - + verify_move : bool, optional Prints the current system state and a proposed system state and then prompts the user to accept the proposal before changing the @@ -255,12 +257,12 @@ def mv(self, position, wait=True, verify_move=True, use_diag=True, use_diag : bool, optional Move the daignostic motors to align with the beam. - + Exceptions Caught ----------------- LimitError Error raised when the inputted position is beyond the soft limits. - + MotorDisabled Error raised if the motor is disabled and move is requested. @@ -279,9 +281,9 @@ def mv(self, position, wait=True, verify_move=True, use_diag=True, List of status objects for each motor that was involved in the move. """ try: - return self.move(position, wait=wait, verify_move=verify_move, - use_diag=use_diag, *args, **kwargs) - # Catch all the common motor exceptions + return self.move(position, wait=wait, verify_move=verify_move, + use_diag=use_diag, *args, **kwargs) + # Catch all the common motor exceptions except LimitError: logger.warning("Requested move is outside the soft limits") except MotorDisabled: @@ -299,8 +301,8 @@ def mv(self, position, wait=True, verify_move=True, use_diag=True, def set_position(self, *args, **kwargs): """ - Sets the current positions of the motors in the towers to be the - calculated positions based on the inputted energies or delay. To be + Sets the current positions of the motors in the towers to be the + calculated positions based on the inputted energies or delay. To be overrided in subclasses. """ pass @@ -308,7 +310,7 @@ def set_position(self, *args, **kwargs): @property def aligned(self, *args, **kwargs): """ - Checks to see if the towers are in aligned in energy and delay. To be + Checks to see if the towers are in aligned in energy and delay. To be overrided in subclasses. """ pass @@ -334,17 +336,17 @@ def _confirm_move(self, string): return True else: logger.debug("\nMove confirmed.") - return False - + return False + def status(self, status="", offset=0, print_status=True, newline=False): """ Returns the status of the device. - + Parameters ---------- status : str, optional The string to append the status to. - + offset : int, optional Amount to offset each line of the status. @@ -353,13 +355,13 @@ def status(self, status="", offset=0, print_status=True, newline=False): """ try: status += "\n{0}{1:<16} {2:^16}".format( - " "*offset, - self.desc+":", + " "*offset, + self.desc+":", self.position) except TypeError: status += "\n{0}{1:<16} {2:^}".format( - " "*offset, - self.desc+":", + " "*offset, + self.desc+":", str(self.position)) if newline: @@ -367,13 +369,14 @@ def status(self, status="", offset=0, print_status=True, newline=False): if print_status: logger.info(status) else: - return status - + return status + class DelayTowerMacro(MacroBase): """ Class for the delay tower macros """ + def _delay_to_length(self, delay, theta1=None, theta2=None): """ Converts the inputted delay to the lengths on the delay arm linear @@ -401,26 +404,25 @@ def _delay_to_length(self, delay, theta1=None, theta2=None): theta2 = theta2 or self.parent.theta2 # Length calculation - length = ((delay*self.c/2 + self.gap*(1 - cosd(2*theta2)) / - sind(theta2)) / (1 - cosd(2*theta1))) + length = ((delay*self.c/2 + self.gap*(1 - cosd(2*theta2)) / sind(theta2)) / (1 - cosd(2*theta1))) return length def _get_delay_diagnostic_position(self, E1=None, E2=None, delay=None): """ - Gets the position the delay diagnostic needs to move to based on the - inputted energies and delay or the current bragg angles and current + Gets the position the delay diagnostic needs to move to based on the + inputted energies and delay or the current bragg angles and current delay of the system. Parameters ---------- E1 : float or None, optional - Energy in eV to use for the delay line. Uses the current energy if + Energy in eV to use for the delay line. Uses the current energy if None is inputted. E2 : float or None, optional - Energy in eV to use for the channel cut line. Uses the current + Energy in eV to use for the channel cut line. Uses the current energy if None is inputted. - + delay : float or None, optional Delay in picoseconds to use for the calculation. Uses current delay if None is inputted. @@ -428,7 +430,7 @@ def _get_delay_diagnostic_position(self, E1=None, E2=None, delay=None): Returns ------- position : float - Position in mm the delay diagnostic should move to given the + Position in mm the delay diagnostic should move to given the inputted parameters. """ # Use current bragg angle @@ -447,11 +449,11 @@ def _get_delay_diagnostic_position(self, E1=None, E2=None, delay=None): else: theta2 = bragg_angle(E=E2) length = self._delay_to_length(delay, theta1=theta1, theta2=theta2) - + # Calculate position the diagnostic needs to move to position = -length*sind(2*theta1) return position - + class DelayMacro(CalibMotor, DelayTowerMacro): """ @@ -466,7 +468,7 @@ class DelayMacro(CalibMotor, DelayTowerMacro): # ---------- # rtol : float, optional # Relative tolerance to use when comparing value differences. - + # atol : float, optional # Absolute tolerance to use when comparing value differences. @@ -488,11 +490,11 @@ class DelayMacro(CalibMotor, DelayTowerMacro): def __init__(self, prefix, name=None, *args, **kwargs): super().__init__(prefix, name=name, *args, **kwargs) if self.parent: - self.motor_fields=['readback'] - self.calib_motors=[self.parent.t1.chi1, self.parent.t1.y1] - self.calib_fields=[field_prepend('user_readback', calib_motor) - for calib_motor in self.calib_motors] - self.detector_fields=['stats2_centroid_x', 'stats2_centroid_y',] + self.motor_fields = ['readback'] + self.calib_motors = [self.parent.t1.chi1, self.parent.t1.y1] + self.calib_fields = [field_prepend('user_readback', calib_motor) + for calib_motor in self.calib_motors] + self.detector_fields = ['stats2_centroid_x', 'stats2_centroid_y', ] def _length_to_delay(self, L=None, theta1=None, theta2=None): """ @@ -504,7 +506,7 @@ def _length_to_delay(self, L=None, theta1=None, theta2=None): ---------- L : float or None, optional Position of the linear delay stage. - + theta1 : float or None, optional Bragg angle the delay line is set to maximize. @@ -522,17 +524,16 @@ def _length_to_delay(self, L=None, theta1=None, theta2=None): theta2 = theta2 or self.parent.theta2 # Delay calculation - delay = (2*(L*(1 - cosd(2*theta1)) - self.gap*(1 - cosd(2*theta2)) / - sind(theta2))/self.c) + delay = (2*(L*(1 - cosd(2*theta1)) - self.gap*(1 - cosd(2*theta2)) / sind(theta2))/self.c) return delay def _verify_move(self, delay, string="", use_header=True, confirm_move=True, use_diag=True): """ Prints a summary of the current positions and the proposed positions - of the delay motors based on the inputs. It then prompts the user to + of the delay motors based on the inputs. It then prompts the user to confirm the move. - + Parameters ---------- delay : float @@ -549,7 +550,7 @@ def _verify_move(self, delay, string="", use_header=True, confirm_move=True, use_diag : bool, optional Add the diagnostic motor to the list of motors to verify. - + Returns ------- allowed : bool @@ -579,9 +580,9 @@ def _verify_move(self, delay, string="", use_header=True, confirm_move=True, def _check_towers_and_diagnostics(self, delay, use_diag=True): """ - Checks the staus of the delay stages on the delay towers. Raises the + Checks the staus of the delay stages on the delay towers. Raises the basic motor errors if any of the motors are not ready to be moved. - + Parameters ---------- delay : float @@ -589,12 +590,12 @@ def _check_towers_and_diagnostics(self, delay, use_diag=True): use_diag : bool, optional Check the position of the diagnostic motor. - + Raises ------ LimitError Error raised when the inputted position is beyond the soft limits. - + MotorDisabled Error raised if the motor is disabled and move is requested. @@ -629,7 +630,7 @@ def _move_towers_and_diagnostics(self, delay, position_dd, use_diag=True): """ Moves the delay stages and delay diagnostic according to the inputted delay and diagnostic position. - + Parameters ---------- delay : float @@ -640,21 +641,21 @@ def _move_towers_and_diagnostics(self, delay, position_dd, use_diag=True): use_diag : bool, optional Move the daignostic motors to align with the beam. - + Returns ------- status : list Nested list of status objects from each tower. - """ + """ # Get the desired length for the delay stage length = self._delay_to_length(delay) # Move the delay stages - status = [tower.set_length(length, wait=False, check_status=False) - for tower in self._delay_towers] + status = [tower.set_length(length, wait=False, check_status=False) + for tower in self._delay_towers] # Log the delay change logger.debug("Setting delay to {0}.".format(delay)) - + if use_diag: # Move the delay diagnostic to the inputted position status += [self.parent.dd.x.move(position_dd, wait=False)] @@ -662,7 +663,7 @@ def _move_towers_and_diagnostics(self, delay, position_dd, use_diag=True): # Perform the compensation if self.has_calib and self.use_calib: status.append(self._calib_compensate(delay)) - + return status @property @@ -670,25 +671,25 @@ def _move_towers_and_diagnostics(self, delay, position_dd, use_diag=True): def position(self): """ Returns the current energy of the channel cut line. - + Returns ------- energy : float Energy the channel cut line is set to in eV. """ return self._length_to_delay() - + def set_position(self, delay=None, print_set=True, use_diag=True, verify_move=True): """ - Sets the current positions of the motors in the towers to be the + Sets the current positions of the motors in the towers to be the calculated positions based on the inputted energies or delay. - + Parameters ---------- delay : float or None, optional Delay to set the delay stages to. - + print_set : bool, optional Print a message to the console that the set has been made. @@ -696,21 +697,21 @@ def set_position(self, delay=None, print_set=True, use_diag=True, Prints the current system state and a proposed system state and then prompts the user to accept the proposal before changing the system. - """ + """ # Prompt the user about the move before making it if verify_move and self._verify_move(delay, use_diag=use_diag): return length = self._delay_to_length(delay) - + # Set the position of each delay stage for tower in self._delay_towers: tower.L.set_position(length, print_set=False) - + # Diagnostic if use_diag: position_dd = self._get_delay_diagnostic_position(delay=delay) - self.parent.dd.x.set_position(position_dd, print_set=False) + self.parent.dd.x.set_position(position_dd, print_set=False) if print_set: logger.info("Setting positions for delay to {0}.".format(delay)) @@ -729,7 +730,7 @@ class Energy1Macro(DelayTowerMacro): # ---------- # rtol : float, optional # Relative tolerance to use when comparing value differences. - + # atol : float, optional # Absolute tolerance to use when comparing value differences. @@ -743,7 +744,7 @@ class Energy1Macro(DelayTowerMacro): # is_aligned = np.isclose(t1.energy, t4.energy, atol=atol, rtol=rtol) # if not is_aligned: # logger.warning("Energy mismatch between t1 and t4. t1: {0:.3f} eV," - # " t4: {1:.3f} eV".format(t1.energy, t4.energy)) + # " t4: {1:.3f} eV".format(t1.energy, t4.energy)) # return is_aligned def _length_to_delay(self, L=None, theta1=None, theta2=None): @@ -756,7 +757,7 @@ def _length_to_delay(self, L=None, theta1=None, theta2=None): ---------- L : float or None, optional Position of the linear delay stage. - + theta1 : float or None, optional Bragg angle the delay line is set to maximize. @@ -774,8 +775,7 @@ def _length_to_delay(self, L=None, theta1=None, theta2=None): theta2 = theta2 or self.parent.theta2 # Delay calculation - delay = (2*(L*(1 - cosd(2*theta1)) - self.gap*(1 - cosd(2*theta2)) / - sind(theta2))/self.c) + delay = (2*(L*(1 - cosd(2*theta1)) - self.gap*(1 - cosd(2*theta2)) / sind(theta2))/self.c) return delay def _verify_move(self, E1, string="", use_header=True, confirm_move=True, @@ -784,7 +784,7 @@ def _verify_move(self, E1, string="", use_header=True, confirm_move=True, Prints a summary of the current positions and the proposed positions of the motors based on the inputs. It then prompts the user to confirm the move. - + Parameters ---------- E1 : float @@ -801,7 +801,7 @@ def _verify_move(self, E1, string="", use_header=True, confirm_move=True, use_diag : bool, optional Add the diagnostic motor to the list of motors to verify. - + Returns ------- allowed : bool @@ -831,9 +831,9 @@ def _verify_move(self, E1, string="", use_header=True, confirm_move=True, def _check_towers_and_diagnostics(self, E1, use_diag=True): """ - Checks the staus of the delay tower energy motors. Raises the basic + Checks the staus of the delay tower energy motors. Raises the basic motor errors if any of the motors are not ready to be moved - + Parameters ---------- E1 : float @@ -841,12 +841,12 @@ def _check_towers_and_diagnostics(self, E1, use_diag=True): use_diag : bool, optional Check the position of the diagnostic motor. - + Raises ------ LimitError Error raised when the inputted position is beyond the soft limits. - + MotorDisabled Error raised if the motor is disabled and move is requested. @@ -878,18 +878,18 @@ def _move_towers_and_diagnostics(self, E1, position_dd, use_diag=True): """ Moves the delay line energy motors and diagnostic to the inputted energy and diagnostic position. - + Parameters ---------- E1 : float Energy to set the delay line to. - + position_dd : float Position to move the delay diagnostic to. use_diag : bool, optional Move the daignostic motors to align with the beam. - + Returns ------- status : list @@ -897,7 +897,7 @@ def _move_towers_and_diagnostics(self, E1, position_dd, use_diag=True): """ # Move the towers to the specified energy status = [tower.set_energy(E1, wait=False, check_status=False) for - tower in self._delay_towers] + tower in self._delay_towers] # Log the energy change logger.debug("Setting E1 to {0}.".format(E1)) @@ -909,20 +909,20 @@ def _move_towers_and_diagnostics(self, E1, position_dd, use_diag=True): def _get_delay_diagnostic_position(self, E1=None): """ - Gets the position the delay diagnostic needs to move to based on the - inputted energies and delay or the current bragg angles and current + Gets the position the delay diagnostic needs to move to based on the + inputted energies and delay or the current bragg angles and current delay of the system. Parameters ---------- E1 : float or None, optional - Energy in eV to use for the delay line. Uses the current energy if + Energy in eV to use for the delay line. Uses the current energy if None is inputted. Returns ------- position : float - Position in mm the delay diagnostic should move to given the + Position in mm the delay diagnostic should move to given the inputted parameters. """ return super()._get_delay_diagnostic_position(E1=E1) @@ -932,7 +932,7 @@ def _get_delay_diagnostic_position(self, E1=None): def position(self): """ Returns the current energy of the delay line. - + Returns ------- energy : float @@ -943,14 +943,14 @@ def position(self): def set_position(self, E1=None, print_set=True, verify_move=True, use_diag=True): """ - Sets the current positions of the motors in the towers to be the + Sets the current positions of the motors in the towers to be the calculated positions based on the inputted energies or delay. - + Parameters ---------- E1 : float or None, optional Energy to set the delay line to. - + print_set : bool, optional Print a message to the console that the set has been made. @@ -986,13 +986,14 @@ class Energy1CCMacro(Energy1Macro): """ Macro-motor for the energy 1 channel cut macro-motor. """ + def _verify_move(self, E1, string="", use_header=True, confirm_move=True, use_diag=True): """ Prints a summary of the current positions and the proposed positions of the motors based on the inputs. It then prompts the user to confirm the move. - + Parameters ---------- E1 : float @@ -1009,7 +1010,7 @@ def _verify_move(self, E1, string="", use_header=True, confirm_move=True, use_diag : bool, optional Add the diagnostic motor to the list of motors to verify. - + Returns ------- allowed : bool @@ -1022,7 +1023,7 @@ def _verify_move(self, E1, string="", use_header=True, confirm_move=True, # Get move for each motor in the delay towers for tower in self._delay_towers: string += "\n{:<15}|{:^15.4f}|{:^15.4f}".format( - tower.tth.desc, tower.tth.position, 2*bragg_angle(E1)) + tower.tth.desc, tower.tth.position, 2*bragg_angle(E1)) if use_diag: position_dd = self._get_delay_diagnostic_position(E1) @@ -1037,9 +1038,9 @@ def _verify_move(self, E1, string="", use_header=True, confirm_move=True, def _check_towers_and_diagnostics(self, E1, use_diag=True): """ - Checks the staus of the delay tower energy motors. Raises the basic + Checks the staus of the delay tower energy motors. Raises the basic motor errors if any of the motors are not ready to be moved - + Parameters ---------- E1 : float @@ -1047,12 +1048,12 @@ def _check_towers_and_diagnostics(self, E1, use_diag=True): use_diag : bool, optional Check the position of the diagnostic motor. - + Raises ------ LimitError Error raised when the inputted position is beyond the soft limits. - + MotorDisabled Error raised if the motor is disabled and move is requested. @@ -1084,25 +1085,25 @@ def _move_towers_and_diagnostics(self, E1, position_dd, use_diag=True): """ Moves the delay line energy motors and diagnostic to the inputted energy and diagnostic position. - + Parameters ---------- E1 : float Energy to set the delay line to. - + position_dd : float Position to move the delay diagnostic to. use_diag : bool, optional Move the daignostic motors to align with the beam. - + Returns ------- status : list Nested list of status objects from each tower. """ # Move the towers to the specified energy - status = [tower.tth.move(2*bragg_angle(E1), wait=False, + status = [tower.tth.move(2*bragg_angle(E1), wait=False, check_status=False) for tower in self._delay_towers] # Log the energy change @@ -1117,14 +1118,14 @@ def _move_towers_and_diagnostics(self, E1, position_dd, use_diag=True): def set_position(self, E1=None, print_set=True, verify_move=True, use_diag=True): """ - Sets the current positions of the motors in the towers to be the + Sets the current positions of the motors in the towers to be the calculated positions based on the inputted energies or delay. - + Parameters ---------- E1 : float or None, optional Energy to set the delay line to. - + print_set : bool, optional Print a message to the console that the set has been made. @@ -1158,21 +1159,22 @@ class Energy2Macro(MacroBase): """ Macro-motor for the energy 2 macro-motor. """ + def _get_channelcut_diagnostic_position(self, E2=None): """ - Gets the position the channel cut diagnostic needs to move to based on + Gets the position the channel cut diagnostic needs to move to based on the inputted energy or the current energy of the channel cut line. Parameters ---------- E2 : float or None, optional - Energy in eV to use for the channel cut line. Uses the current + Energy in eV to use for the channel cut line. Uses the current energy if None is inputted. Returns ------- position : float - Position in mm the delay diagnostic should move to given the + Position in mm the delay diagnostic should move to given the inputted parameters. """ # Use the current theta2 of the system or calc based on inputted energy @@ -1180,11 +1182,11 @@ def _get_channelcut_diagnostic_position(self, E2=None): theta2 = self.parent.theta2 else: theta2 = bragg_angle(E=E2) - + # Calculate position the diagnostic needs to move to position = 2*cosd(theta2)*self.gap return position - + # @property # def aligned(self, rtol=0, atol=0.001): # """ @@ -1194,7 +1196,7 @@ def _get_channelcut_diagnostic_position(self, E2=None): # ---------- # rtol : float, optional # Relative tolerance to use when comparing value differences. - + # atol : float, optional # Absolute tolerance to use when comparing value differences. @@ -1217,7 +1219,7 @@ def _verify_move(self, E2, string="", use_header=True, confirm_move=True, Prints a summary of the current positions and the proposed positions of the motors based on the inputs. It then prompts the user to confirm the move. - + Parameters ---------- E2 : float @@ -1234,7 +1236,7 @@ def _verify_move(self, E2, string="", use_header=True, confirm_move=True, use_diag : bool, optional Add the diagnostic motor to the list of motors to verify. - + Returns ------- allowed : bool @@ -1255,9 +1257,9 @@ def _verify_move(self, E2, string="", use_header=True, confirm_move=True, if use_diag: position_dcc = self._get_channelcut_diagnostic_position(E2) string += "\n{:<15}|{:^15.4f}|{:^15.4f}".format( - self.parent.dcc.x.desc, self.parent.dcc.x.position, + self.parent.dcc.x.desc, self.parent.dcc.x.position, position_dcc) - + # Prompt the user for a confirmation or return the string if confirm_move is True: return self._confirm_move(string) @@ -1266,9 +1268,9 @@ def _verify_move(self, E2, string="", use_header=True, confirm_move=True, def _check_towers_and_diagnostics(self, E2, use_diag=True): """ - Checks the staus of the channel cut tower energy motors. Raises the + Checks the staus of the channel cut tower energy motors. Raises the basic motor errors if any of the motors are not ready to be moved. - + Parameters ---------- E2 : float @@ -1276,12 +1278,12 @@ def _check_towers_and_diagnostics(self, E2, use_diag=True): use_diag : bool, optional Check the position of the diagnostic motor. - + Raises ------ LimitError Error raised when the inputted position is beyond the soft limits. - + MotorDisabled Error raised if the motor is disabled and move is requested. @@ -1311,26 +1313,26 @@ def _check_towers_and_diagnostics(self, E2, use_diag=True): def _move_towers_and_diagnostics(self, E2, position_dcc, use_diag=True): """ - Moves the channel cut line energy motors and diagnostic to the inputted + Moves the channel cut line energy motors and diagnostic to the inputted energy and diagnostic position. - + Parameters ---------- E2 : float Energy to set the channel cut line to. - + position_dcc : float or None, optional Position to move the channel cut diagnostic to. use_diag : bool, optional Move the daignostic motors to align with the beam. - + Returns ------- status : list Nested list of status objects from each tower. """ - status = [] + status = [] # Move the channel cut towers status += [tower.set_energy(E2, wait=False, check_status=False) for tower in self._channelcut_towers] @@ -1340,14 +1342,14 @@ def _move_towers_and_diagnostics(self, E2, position_dcc, use_diag=True): # Log the energy change logger.debug("Setting E2 to {0}.".format(E2)) - return status + return status @property @nan_if_no_parent def position(self): """ Returns the current energy of the channel cut line. - + Returns ------- energy : float @@ -1358,14 +1360,14 @@ def position(self): def set_position(self, E2=None, print_set=True, verify_move=True, use_diag=True): """ - Sets the current positions of the motors in the towers to be the + Sets the current positions of the motors in the towers to be the calculated positions based on the inputted energies or delay. - + Parameters ---------- E2 : float or None, optional Energy to set the channel cut line to. - + print_set : bool, optional Print a message to the console that the set has been made. @@ -1383,7 +1385,7 @@ def set_position(self, E2=None, print_set=True, verify_move=True, # Set position of each E2 motor in each channel cut tower for tower in self._channelcut_towers: - for motor, pos in zip(tower._energy_motors, + for motor, pos in zip(tower._energy_motors, tower._get_move_positions(E2)): motor.set_position(pos, print_set=False) diff --git a/hxrsnd/pneumatic.py b/hxrsnd/pneumatic.py index 4b5bebb..e3e504a 100644 --- a/hxrsnd/pneumatic.py +++ b/hxrsnd/pneumatic.py @@ -15,6 +15,9 @@ class PneuBase(SndDevice): """ Base class for the penumatics. """ + tab_component_names = True + tab_whitelist = ['status'] + def status(self, status="", offset=0, print_status=True, newline=False): """ Returns the status of the device. @@ -68,6 +71,7 @@ class ProportionalValve(PneuBase): valve : EpicsSignal Valve control and readback pv. """ + tab_whitelist = ['close', 'closed', 'open', 'opened', 'position'] valve = Cmp(EpicsSignal, ":VGP") def open(self): @@ -140,6 +144,7 @@ class PressureSwitch(PneuBase): pressure : EpicsSignalRO Pressure readbac signal. """ + tab_whitelist = ['bad', 'good', 'position'] pressure = Cmp(EpicsSignalRO, ":GPS") @property @@ -209,6 +214,8 @@ class SndPneumatics(SndDevice): vac_pressure : PressureSwitch Pressure switch on the overall system. """ + tab_whitelist = ['close', 'open', 'pressures', 'status', 'valves'] + t1_valve = Cmp(ProportionalValve, ":N2:T1", desc="T1 Valve") t4_valve = Cmp(ProportionalValve, ":N2:T4", desc="T4 Valve") vac_valve = Cmp(ProportionalValve, ":VAC", desc="Vacuum Valve") diff --git a/hxrsnd/sequencer.py b/hxrsnd/sequencer.py index cc5f668..a0b20e2 100644 --- a/hxrsnd/sequencer.py +++ b/hxrsnd/sequencer.py @@ -16,18 +16,19 @@ class SeqBase(SndDevice): """ Base sequencer class. """ + tab_whitelist = ['start', 'stop'] state_control = Cmp(EpicsSignal, ":PLYCTL") - + def start(self): """ Start the sequencer. """ status = self.state_control.set(1, timeout=self.timeout) status_wait(status) - + def stop(self): """ Stop the sequencer. """ status = self.state_control.set(0, timeout=self.timeout) - status_wait(status) + status_wait(status) diff --git a/hxrsnd/snddevice.py b/hxrsnd/snddevice.py index 88b30fb..bb8a3ea 100644 --- a/hxrsnd/snddevice.py +++ b/hxrsnd/snddevice.py @@ -4,21 +4,26 @@ import logging from ophyd.device import Device +from pcdsdevices.interface import BaseInterface logger = logging.getLogger(__name__) -class SndDevice(Device): +class SndDevice(BaseInterface, Device): """ Base Sndmotor class """ - def __init__(self, prefix, name=None, desc=None, set_timeout=1, *args, + + tab_component_names = True + tab_whitelist = ['st'] + + def __init__(self, prefix, name=None, desc=None, set_timeout=1, *args, **kwargs): super().__init__(prefix, name=name, *args, **kwargs) self.desc = desc or self.name self.set_timeout = set_timeout - def _apply_all(self, method, subclass=object, *method_args, + def _apply_all(self, method, subclass=object, *method_args, **method_kwargs): """ Runs the method for all devices that are of the inputted subclass. All @@ -51,7 +56,7 @@ def _apply_all(self, method, subclass=object, *method_args, def st(self, *args, **kwargs): """ Returns or prints the status of the device. Alias for 'device.status()'. - + Parameters ---------- print_status : bool, optional @@ -60,7 +65,7 @@ def st(self, *args, **kwargs): short : bool, optional Use a shortened list or all relevant parameters """ - return self.status(*args, **kwargs) + return self.status(*args, **kwargs) def __repr__(self): """ diff --git a/hxrsnd/sndmotor.py b/hxrsnd/sndmotor.py index e996817..d327e17 100644 --- a/hxrsnd/sndmotor.py +++ b/hxrsnd/sndmotor.py @@ -1,24 +1,23 @@ """ Script for abstract motor classes used in the SnD. """ -import time import logging -from functools import reduce +import time from collections import OrderedDict +from functools import reduce import pandas as pd +from bluesky.preprocessors import run_wrapper from ophyd.device import Component as Cmp from ophyd.signal import Signal from ophyd.utils import LimitError from pcdsdevices.epics_motor import PCDSMotorBase from pcdsdevices.mv_interface import FltMvInterface -from bluesky.preprocessors import run_wrapper -from pcdsdevices.signal import Signal -from .snddevice import SndDevice +from .exceptions import InputError from .plans.calibration import calibrate_motor from .plans.preprocessors import return_to_start as _return_to_start -from .exceptions import InputError +from .snddevice import SndDevice from .utils import as_list logger = logging.getLogger(__name__) @@ -35,7 +34,7 @@ class SndMotor(FltMvInterface, SndDevice): class SndEpicsMotor(PCDSMotorBase, SndMotor): """ - SnD motor that inherits from EpicsMotor, therefore having all the relevant + SnD motor that inherits from EpicsMotor, therefore having all the relevant signals """ direction_of_travel = Cmp(Signal) @@ -57,7 +56,7 @@ def check_value(self, value, retries=5): """ if value is None: raise ValueError('Cannot write None to epics PVs') - + for i in range(retries): try: low_limit, high_limit = self.limits @@ -75,14 +74,16 @@ def check_value(self, value, retries=5): # TODO: Add ability to save caibrations to disk # TODO: Add ability to load calibrations from disk # TODO: Add ability to display calibrations -# TODO: Add ability to change post-processing done to scan. +# TODO: Add ability to change post-processing done to scan. # TODO: Add ability to redo scaling on scan class CalibMotor(SndDevice): """ Provides the calibration macro methods. """ - def __init__(self, prefix, name=None, calib_detector=None, - calib_motors=None, calib_fields=None, motor_fields=None, + tab_whitelist = ['calibrate', 'calibration', 'has_calib', 'use_calib'] + + def __init__(self, prefix, name=None, calib_detector=None, + calib_motors=None, calib_fields=None, motor_fields=None, *args, **kwargs): super().__init__(prefix, name=name, *args, **kwargs) self.calib_motors = calib_motors @@ -95,7 +96,7 @@ def __init__(self, prefix, name=None, calib_detector=None, def calibrate(self, start, stop, steps, average=100, confirm_overwrite=True, detector=None, detector_fields=None, RE=None, return_to_start=True, *args, **kwargs): - """Performs a calibration scan for this motor and updates the + """Performs a calibration scan for this motor and updates the configuration. Warning: This has not been commissioned. @@ -128,7 +129,7 @@ def calibrate(self, start, stop, steps, average=100, confirm_overwrite=True, return_to_start : bool, optional Move all the motors to their original positions after the scan has been - completed + completed """ # Remove this once the calibration routine has been tested logger.warning('Calibration functionality has not been commissioned.') @@ -144,7 +145,7 @@ def inner(): _, _ = yield from calibrate_motor( detector, detector_fields, - self, + self, self.motor_fields, self.calib_motors, self.calib_fields, @@ -153,7 +154,7 @@ def inner(): confirm_overwrite=confirm_overwrite, return_to_start=False, *args, **kwargs) - + # Run the inner plan RE(run_wrapper(inner())) @@ -161,30 +162,30 @@ def inner(): def calibration(self): """ Returns the current calibration of the motor as a dictionary. - + Returns ------- calibration : dict - Dictionary containing calib, motors, scan, scale, and start + Dictionary containing calib, motors, scan, scale, and start calibration parameters. """ if self.has_calib: config = self.read_configuration() # Grab the values of each of the calibration parameters - calib = {fld : config[fld]['value'] + calib = {fld: config[fld]['value'] for fld in ['calib', 'scan', 'scale', 'start']} # Make sure there are motors before we iterate through the list if config['motors']['value']: # If the motors have name attributes, just return those - calib['motors'] = [mot.name if hasattr(mot, 'name') else mot + calib['motors'] = [mot.name if hasattr(mot, 'name') else mot for mot in config['motors']['value']] else: calib['motors'] = None return calib - else: + else: return None - def configure(self, *, calib=None, motors=None, scan=None, scale=None, + def configure(self, *, calib=None, motors=None, scan=None, scale=None, start=None): """ Configure the calib-motor's move parameters. @@ -205,8 +206,8 @@ def configure(self, *, calib=None, motors=None, scan=None, scale=None, List of scales in the units of motor egu / detector value start : list, optional - List of the initial positions of the motors before the walk - + List of the initial positions of the motors before the walk + Returns ------- configs : tuple of dict @@ -244,20 +245,20 @@ def _config_calib(self, calib, motors, scan, scale, start): List of scales in the units of motor egu / detector value start : list - List of the initial positions of the motors before the walk + List of the initial positions of the motors before the walk """ # Start with all the previous calibration parameters save_calib = OrderedDict(self._calib) motors = as_list(motors) or None # Add in the new parameters if they are not None or empty. If they are, - # None or empty, check if they already exist as keys in the dict and + # None or empty, check if they already exist as keys in the dict and # only add them if they do not. - for key, value in {'calib': calib, 'motors': motors, 'scan': scan, + for key, value in {'calib': calib, 'motors': motors, 'scan': scan, 'scale': scale, 'start': start}.items(): if value is not None or (value is None and key not in save_calib): save_calib[key] = {'value': value, 'timestamp': time.time()} - + # Now check all those changes, raising errors if needed self._check_calib(save_calib) # We made it through the check, therefore it is safe to use @@ -276,7 +277,7 @@ def _check_calib(self, save_calib): table and motors have a mismatched number of columns and motors. TypeError - If a correction table is passed that is not a dataframe. + If a correction table is passed that is not a dataframe. """ # Let's get all the values we will update the calibration with calib = save_calib['calib']['value'] @@ -286,7 +287,8 @@ def _check_calib(self, save_calib): start = save_calib['start']['value'] # If no correction table is passed, then there isn't anything to check - if calib is None: pass + if calib is None: + pass # We have a correction table but it isnt a Dataframe elif not isinstance(calib, pd.DataFrame): @@ -304,7 +306,7 @@ def _check_calib(self, save_calib): elif len(calib.columns) != len(motors): raise InputError("Mismatched calibration size and number of " "motors. Got {0} columns for {1} motors.".format( - len(calib.columns),len(motors))) + len(calib.columns), len(motors))) # We have the correct correction table and motors but one of the of the # extra parameters were not passed, which is critical for corrected @@ -320,7 +322,7 @@ def _calib_compensate(self, position, *args, **kwargs): and the user has indicated that they want to perform them. Parameters - ---------- + ---------- position Position to move to. @@ -337,11 +339,11 @@ def _calib_compensate(self, position, *args, **kwargs): # Only perform the compensation if there is a valid calibration and we # want to use the calibration if not self.has_calib or not self.use_calib: - return + return # Grab the two rows where the main motor position (column 0) is # closest to the inputted position - top = calib.iloc[(calib.iloc[:,0] - position).abs().argsort().iloc[:2]] + top = calib.iloc[(calib.iloc[:, 0] - position).abs().argsort().iloc[:2]] first, second = top.iloc[0], top.iloc[1] # Get the slope between the lines between each of the motor values using @@ -365,21 +367,21 @@ def has_calib(self): """ Indicator if there is a valid calibration that can be used to perform correction moves. - + Because the only requirements to perform a corrected move are the correction table and the calibration motors, they are the only calibration parameters that must be defines. These parameters must also pass ``_check_calib`` without raising any exceptions. - + Returns ------- has_calib : bool - True if there is a calibration that can be used for correction + True if there is a calibration that can be used for correction motion, False otherwise. """ # Grab the current correction table and calibration motors calib = self._calib['calib']['value'] - motors = self._calib['motors']['value'] + _ = self._calib['motors']['value'] # Return False if we dont have a correction table if calib is None: @@ -388,29 +390,29 @@ def has_calib(self): # If we make it through the check, we have a valid calibration self._check_calib(self._calib) return True - except: + except Exception: # An exception was raised, the config is somehow invalid return False @property def use_calib(self): """ - Returns whether the user indicated that corrected motions should be + Returns whether the user indicated that corrected motions should be used. - + Returns ------- use_calib : bool Internal indicator if the user wants to perform correction motions. """ return self._use_calib - + @use_calib.setter def use_calib(self, indicator): """ - Setter for use_calib. Will warn the user if corrected motions are + Setter for use_calib. Will warn the user if corrected motions are desired but there is no valid configuration to use. - + Parameters ---------- indicator : bool @@ -420,7 +422,7 @@ def use_calib(self, indicator): if self._use_calib is True and not self.has_calib: logger.warning("use_calib is currently set to True but there is " "no valid calibration to use") - + def read_configuration(self): return self._calib @@ -431,7 +433,6 @@ def describe_configuration(self): shape = self._calib['calib']['value'].shape else: shape = [len(self._calib)] - return OrderedDict(**dict(calib=dict(source='calibrate', dtype='array', - shape=shape)), + return OrderedDict(**dict(calib=dict(source='calibrate', dtype='array', + shape=shape)), **super().describe_configuration()) - diff --git a/hxrsnd/sndsystem.py b/hxrsnd/sndsystem.py index def4f36..4d2b67d 100644 --- a/hxrsnd/sndsystem.py +++ b/hxrsnd/sndsystem.py @@ -3,17 +3,16 @@ All units of time are in picoseconds, units of length are in mm. """ -import os import logging from ophyd import Component as Cmp -from .snddevice import SndDevice +from .diode import HamamatsuXMotionDiode, HamamatsuXYMotionCamDiode +from .macromotor import DelayMacro, Energy1CCMacro, Energy1Macro, Energy2Macro from .pneumatic import SndPneumatics +from .snddevice import SndDevice +from .tower import ChannelCutTower, DelayTower from .utils import absolute_submodule_path -from .tower import DelayTower, ChannelCutTower -from .diode import HamamatsuXMotionDiode, HamamatsuXYMotionCamDiode -from .macromotor import Energy1Macro, Energy1CCMacro, Energy2Macro, DelayMacro logger = logging.getLogger(__name__) @@ -41,7 +40,7 @@ class SplitAndDelay(SndDevice): di : HamamatsuXYMotionCamDiode Input diode for the system. - + dd : HamamatsuXYMotionCamDiode Diode between the two delay towers. @@ -50,10 +49,10 @@ class SplitAndDelay(SndDevice): dci : HamamatsuXMotionDiode Input diode for the channel cut line. - + dcc : HamamatsuXMotionDiode Diode between the two channel cut towers. - + dco : HamamatsuXMotionDiode Input diode for the channel cut line. @@ -66,16 +65,19 @@ class SplitAndDelay(SndDevice): delay : DelayMacro Delay pseudomotor. """ + tab_component_names = True + tab_whitelist = ['st', 'status', 'diag_status', 'theta1', 'theta2', + 'main_screen', 'status'] # Delay Towers - t1 = Cmp(DelayTower, ":T1", pos_inserted=21.1, pos_removed=0, - desc="Tower 1") - t4 = Cmp(DelayTower, ":T4", pos_inserted=21.1, pos_removed=0, - desc="Tower 4") + t1 = Cmp(DelayTower, ":T1", pos_inserted=21.1, pos_removed=0, + desc="Tower 1") + t4 = Cmp(DelayTower, ":T4", pos_inserted=21.1, pos_removed=0, + desc="Tower 4") # Channel Cut Towers - t2 = Cmp(ChannelCutTower, ":T2", pos_inserted=None, pos_removed=0, + t2 = Cmp(ChannelCutTower, ":T2", pos_inserted=None, pos_removed=0, desc="Tower 2") - t3 = Cmp(ChannelCutTower, ":T3", pos_inserted=None, pos_removed=0, + t3 = Cmp(ChannelCutTower, ":T3", pos_inserted=None, pos_removed=0, desc="Tower 3") # Pneumatic Air Bearings @@ -89,14 +91,14 @@ class SplitAndDelay(SndDevice): # Channel Cut Diagnostics dci = Cmp(HamamatsuXMotionDiode, ":DIA:DCI", block_pos=-5, desc="DCI") dcc = Cmp(HamamatsuXYMotionCamDiode, ":DIA:DCC", block_pos=-5, desc="DCC") - dco = Cmp(HamamatsuXMotionDiode, ":DIA:DCO", block_pos=-5, desc="DCO") + dco = Cmp(HamamatsuXMotionDiode, ":DIA:DCO", block_pos=-5, desc="DCO") # Macro motors E1 = Cmp(Energy1Macro, "", desc="Delay Energy") E1_cc = Cmp(Energy1CCMacro, "", desc="CC Delay Energy") E2 = Cmp(Energy2Macro, "", desc="CC Energy") delay = Cmp(DelayMacro, "", desc="Delay") - + def __init__(self, prefix, name=None, daq=None, RE=None, *args, **kwargs): super().__init__(prefix, name=name, *args, **kwargs) self.daq = daq @@ -109,11 +111,11 @@ def __init__(self, prefix, name=None, daq=None, RE=None, *args, **kwargs): self._diagnostics = self._delay_diagnostics+self._channelcut_diagnostics # Set the position calculators of dd and dcc - self.dd.pos_func = lambda : \ - self.E1._get_delay_diagnostic_position() - self.dcc.pos_func = lambda : \ - self.E2._get_channelcut_diagnostic_position() - + self.dd.pos_func = lambda: \ + self.E1._get_delay_diagnostic_position() + self.dcc.pos_func = lambda: \ + self.E2._get_channelcut_diagnostic_position() + def diag_status(self): """ Prints a string containing the blocking status and the position of the @@ -135,7 +137,7 @@ def theta1(self): Returns ------- theta1 : float - The bragg angle the delay line is currently set to maximize + The bragg angle the delay line is currently set to maximize in degrees. """ return self.t1.theta @@ -149,10 +151,10 @@ def theta2(self): Returns ------- theta2 : float - The bragg angle the channel cut line is currently set to maximize + The bragg angle the channel cut line is currently set to maximize in degrees. """ - return self.t2.theta + return self.t2.theta def main_screen(self, print_msg=True): """ @@ -162,17 +164,19 @@ def main_screen(self, print_msg=True): path = absolute_submodule_path("hxrsnd/screens/snd_main") if print_msg: logger.info("Launching expert screen.") - os.system("{0} {1} {2} &".format(path, p, axis)) - + logger.error('TODO - not yet implemented (path: %s)', path) + # NOTE: this was the command ,where `p` and `axis` were not defined: + # os.system("{0} {1} {2} &".format(path, p, axis)) + def status(self, print_status=True): """ Returns the status of the split and delay system. - + Returns ------- - Status : str + Status : str """ - status = "Split and Delay System Status\n" + status = "Split and Delay System Status\n" status += "-----------------------------" status = self.E1.status(status, 0, print_status=False) status = self.E2.status(status, 0, print_status=False) diff --git a/hxrsnd/tower.py b/hxrsnd/tower.py index 323e851..1f04bd3 100644 --- a/hxrsnd/tower.py +++ b/hxrsnd/tower.py @@ -7,31 +7,36 @@ from ophyd import Component as Cmp from ophyd.status import wait as status_wait -from .snddevice import SndDevice +from .aerotech import (AeroBase, InterLinearAero, InterRotationAero, + LinearAero, RotationAero) +from .attocube import DiodeEcc, EccBase, GoniometerEcc, TranslationEcc from .bragg import bragg_angle, bragg_energy -from .attocube import EccBase, TranslationEcc, GoniometerEcc, DiodeEcc -from .aerotech import (AeroBase, RotationAero, InterRotationAero, - LinearAero, InterLinearAero) +from .snddevice import SndDevice logger = logging.getLogger(__name__) + class TowerBase(SndDevice): """ Base tower class. """ - def __init__(self, prefix, name=None, pos_inserted=None, pos_removed=None, + tab_whitelist = ['check_status', 'clear', 'disable', 'enable', 'energy', + 'insert', 'inserted', 'position', 'remove', 'set_energy', + 'status', 'stop', 'theta'] + + def __init__(self, prefix, name=None, pos_inserted=None, pos_removed=None, *args, **kwargs): super().__init__(prefix, name=name, *args, **kwargs) self.pos_inserted = pos_inserted self.pos_removed = pos_removed self.desc_short = "".join([s[0] for s in self.desc.split(" ")]) - + # Add Tower short name to desc for sig_name in self.component_names: signal = getattr(self, sig_name) if hasattr(signal, "desc"): signal.desc = "{0} {1}".format(self.desc_short, signal.desc) - + def set_energy(self, E, *args, **kwargs): """ Placeholder for the energy setter. Implement for each TowerBase @@ -56,7 +61,7 @@ def energy(self): @energy.setter def energy(self, E): """ - Sets the theta of the tower to the desired energy. Alias for + Sets the theta of the tower to the desired energy. Alias for set_energy(E). Parameters @@ -64,8 +69,8 @@ def energy(self, E): E : float Desired energy to set the tower to. """ - status = self.set_energy(E) - + _ = self.set_energy(E) + @property def position(self): """ @@ -119,7 +124,7 @@ def remove(self, *args, **kwargs): If pos_removed is set to None and remove() is called. """ if self.pos_removed is None: - raise ValueError("Must set pos_removed to use remove method.") + raise ValueError("Must set pos_removed to use remove method.") return self.x.move(self.pos_removed, *args, **kwargs) @property @@ -135,7 +140,7 @@ def inserted(self): Raises ------ ValueError - If pos_inserted is set to None and inserted is called. + If pos_inserted is set to None and inserted is called. """ if self.pos_inserted is None: raise ValueError("Must set pos_inserted to check if inserted.") @@ -161,7 +166,7 @@ def _get_move_positions(self, E): def check_status(self, energy=None, length=None, no_raise=False): """ - Checks to make sure that all the energy motors are not in a bad state. + Checks to make sure that all the energy motors are not in a bad state. Will include the delay motor if the delay argument is True. Parameters @@ -182,9 +187,9 @@ def check_status(self, energy=None, length=None, no_raise=False): # Get all the energy parameters if energy is not None: motors += self._energy_motors - theta = bragg_angle(energy) + _ = bragg_angle(energy) positions += self._get_move_positions(energy) - + # Get the delay parameters try: if length is not None: @@ -193,7 +198,7 @@ def check_status(self, energy=None, length=None, no_raise=False): except AttributeError: if not no_raise: raise - + # Check that we can move all the motors for motor, position in zip(motors, positions): try: @@ -208,7 +213,7 @@ def stop(self): Stops the motions of all the motors. """ self._apply_all("stop", (AeroBase, EccBase), print_set=False) - + def enable(self): """ Enables all the aerotech motors. @@ -227,16 +232,16 @@ def clear(self): """ self._apply_all("clear", AeroBase, print_set=False) - def status(self, status="", offset=0, print_status=True, newline=False, + def status(self, status="", offset=0, print_status=True, newline=False, short=True): """ Returns the status of the tower. - + Parameters ---------- status : str, optional The string to append the status to. - + offset : int, optional Amount to offset each line of the status. @@ -258,30 +263,35 @@ def status(self, status="", offset=0, print_status=True, newline=False, # Aerotech body status_list_aero = self._apply_all( - "status", AeroBase, offset=offset+2, print_status=False, + "status", AeroBase, offset=offset+2, print_status=False, short=True) if status_list_aero: # Aerotech header - status += "\n{0}{1:<16}|{2:^16}|{3:^16}\n{4}{5}".format( - " "*(offset+2), "Motor", "Position", "Dial", " "*(offset+2), - "-"*50) + status += ( + "\n{0}{1:<16}|{2:^16}|{3:^16}\n{4}{5}".format( + " "*(offset+2), "Motor", "Position", "Dial", " "*(offset+2), "-"*50 + ) + ) status += "".join(status_list_aero) # Attocube body status_list_atto = self._apply_all( - "status", EccBase, offset=offset+2, print_status=False, + "status", EccBase, offset=offset+2, print_status=False, short=True) if status_list_atto: # Attocube Header - status += "\n{0}{1}\n{2}{3:<16}|{4:^16}|{5:^16}\n{6}{7}".format( - " "*(offset+2), "-"*50, " "*(offset+2), "Motor", "Position", - "Reference", " "*(offset+2), "-"*50) + status += ( + "\n{0}{1}\n{2}{3:<16}|{4:^16}|{5:^16}\n{6}{7}".format( + " "*(offset+2), "-"*50, " " * (offset+2), + "Motor", "Position", "Reference", " "*(offset+2), "-"*50) + ) status += "".join(status_list_atto) else: status += "{0}{1}:\n{2}{3}\n".format( - " "*offset, self.desc, " "*offset, "-"*(len(self.desc)+1)) - status_list = self._apply_all("status", (AeroBase, EccBase), + " "*offset, self.desc, " "*offset, "-"*(len(self.desc)+1) + ) + status_list = self._apply_all("status", (AeroBase, EccBase), offset=offset+2, print_status=False) status += "".join(status_list) @@ -292,11 +302,11 @@ def status(self, status="", offset=0, print_status=True, newline=False, else: return status - + class DelayTower(TowerBase): """ Delay Tower - + Components tth : RotationAeroInterlocked @@ -335,6 +345,8 @@ class DelayTower(TowerBase): temp : OmegaRTD RTD temperature sensor for the nitrogen. """ + tab_whitelist = ['length', 'position', 'set_energy', 'set_length', 'theta'] + # Rotation stages tth = Cmp(InterRotationAero, ":TTH", desc="TTH") th1 = Cmp(RotationAero, ":TH1", desc="TH1") @@ -354,13 +366,13 @@ class DelayTower(TowerBase): # Diode motion dh = Cmp(DiodeEcc, ":DH", desc="DH") - + # # Diode # diode = Cmp(HamamatsuDiode, ":DIODE", desc="Tower Diode") # # Temperature monitor # temp = Cmp(OmegaRTD, ":TEMP", desc="Tower RTD") - + def __init__(self, prefix, *args, **kwargs): super().__init__(prefix, *args, **kwargs) self._energy_motors = [self.tth, self.th1, self.th2] @@ -408,8 +420,8 @@ def _get_move_positions(self, E): def set_energy(self, E, wait=False, check_status=True): """ Sets the angles of the crystals in the delay line to maximize the - inputted energy. - + inputted energy. + Parameters --------- E : float @@ -424,10 +436,10 @@ def set_energy(self, E, wait=False, check_status=True): # Check to make sure the motors are in a valid state to move if check_status: self.check_status(energy=E) - + # Perform the move status = [motor.move(pos, wait=False, check_status=False) for - motor, pos in zip(self._energy_motors, + motor, pos in zip(self._energy_motors, self._get_move_positions(E))] # Wait for the motions to finish @@ -436,7 +448,7 @@ def set_energy(self, E, wait=False, check_status=True): logger.info("Waiting for {} to finish move ...".format( s.device.name)) status_wait(s) - + return status def set_length(self, position, wait=False, *args, **kwargs): @@ -480,7 +492,7 @@ def length(self, position): position : float Position to move the delay motor to. """ - status = self.L.mv(position) + _ = self.L.mv(position) @property def theta(self): @@ -492,8 +504,8 @@ def theta(self): position : float Current position of the tower. """ - return self.position/2 - + return self.position/2 + class ChannelCutTower(TowerBase): """ @@ -507,6 +519,8 @@ class ChannelCutTower(TowerBase): x : LinearAero Translation stage of the tower """ + tab_whitelist = ['position', 'set_energy'] + # Rotation th = Cmp(RotationAero, ":TH", desc="TH") @@ -532,8 +546,8 @@ def position(self): def set_energy(self, E, wait=False, check_status=True): """ Sets the angles of the crystals in the channel cut line to maximize the - inputted energy. - + inputted energy. + Parameters --------- E : float @@ -547,7 +561,7 @@ def set_energy(self, E, wait=False, check_status=True): """ # Convert to theta theta = bragg_angle(E=E) - + # Check to make sure the motors are in a valid state to move if check_status: self.check_status(E) diff --git a/hxrsnd/utils.py b/hxrsnd/utils.py index 189a166..1e490b8 100644 --- a/hxrsnd/utils.py +++ b/hxrsnd/utils.py @@ -3,11 +3,10 @@ """ import inspect import logging +from collections.abc import Iterable +from functools import wraps from math import nan from pathlib import Path -from functools import wraps -from collections.abc import Iterable - logger = logging.getLogger(__name__) @@ -70,7 +69,7 @@ def as_list(obj, length=None, tp=None, iter_to_list=True): if length is None: return [] return [None] * length - + # If it is already a list do nothing elif isinstance(obj, list): pass @@ -78,22 +77,23 @@ def as_list(obj, length=None, tp=None, iter_to_list=True): # If it is an iterable (and not str), convert it to a list elif isiterable(obj) and iter_to_list: obj = list(obj) - + # Otherwise, just enclose in a list making it the inputted length else: try: obj = [obj] * length except TypeError: obj = [obj] - + # Cast to type; Let exceptions here bubble up to the top. if tp is not None: obj = [tp(o) for o in obj] return obj + def isiterable(obj): """ - Function that determines if an object is an iterable, not including + Function that determines if an object is an iterable, not including str. Parameters @@ -111,6 +111,7 @@ def isiterable(obj): else: return isinstance(obj, Iterable) + def _flatten(inp_iter): """ Recursively iterate through values in nested iterables. @@ -131,7 +132,8 @@ def _flatten(inp_iter): yield ival else: yield val - + + def flatten(inp_iter): """ Returns a flattened list of the inputted iterable. @@ -148,10 +150,11 @@ def flatten(inp_iter): """ return list(_flatten(inp_iter)) + def stop_on_keyboardinterrupt(func): """ Decorator that runs the object's `stop` method if a keyboard interrupt is - raised. This is meant to be used on ophyd device methods and expects the + raised. This is meant to be used on ophyd device methods and expects the first argument to be `self`. """ @wraps(func) @@ -169,6 +172,7 @@ def stop_dev_on_keyboardinterrupt(obj, *args, **kwargs): obj)) return stop_dev_on_keyboardinterrupt + def nan_if_no_parent(method): """ Decorator that will return None if the object passed via self does not have