From a33eb8283873d2d85ed66e5348bd7ec0f81bc3ac Mon Sep 17 00:00:00 2001 From: Florian Rau Date: Tue, 15 Aug 2023 11:16:59 +0100 Subject: [PATCH] refactoring --- .../hardware_validation/verify_hardware.py | 154 ++++++++++-------- 1 file changed, 84 insertions(+), 70 deletions(-) diff --git a/scripts/hardware_validation/verify_hardware.py b/scripts/hardware_validation/verify_hardware.py index 735ad8565..2b9b11070 100644 --- a/scripts/hardware_validation/verify_hardware.py +++ b/scripts/hardware_validation/verify_hardware.py @@ -18,17 +18,46 @@ # set up logging log = setup_logger('iblrig', level='DEBUG') +issues = 0 + # function for querying a serial device -def query(s_obj, req, n=1): +def query(s_obj, req, n=1, end: bool = False): s_obj.write(req) return s.read(n) -issues = 0 +def log_fun(msg_type: str = 'info', msg: str = '', last: bool = False): + global issues + tree = '└' if last else '├' + match msg_type: + case 'head': + pass + case 'pass': + level = 'info' + symbol = '✔' + case 'info': + level = 'info' + symbol = 'i' + case 'warn': + level = 'warning' + symbol = '!' + case 'fail': + level = 'critical' + symbol = '✘' + issues += 1 + case _: + level = 'critical' + symbol = '?' + + if msg_type == 'head': + log.info('\033[1m' + msg) + else: + getattr(log, level)(f' {tree} {symbol} {msg}') + # read hardware_settings.yaml -log.info('Checking hardware_settings.yaml:') +log_fun('head', 'Checking hardware_settings.yaml:') file_settings = Path(iblrig.__file__).parents[1].joinpath('settings', 'hardware_settings.yaml') hw_settings = iblrig.path_helper.load_settings_yaml(file_settings) @@ -39,71 +68,66 @@ def query(s_obj, req, n=1): # check for undefined serial ports (warn only) tmp = [(k, v) for k, v in ports.items() if v is None] for p in tmp: - log.warning(f" ├ ✘ {p[0]} is undefined (OK if you don't want to use it)") + log_fun('fail', f"{p[0]} is undefined (OK if you don't want to use it)") if not any(tmp): - log.info(' ├ ✔ no undefined serial ports found') + log_fun('pass', 'no undefined serial ports found') ports = {k: v for k, v in ports.items() if v is not None} # check for duplicate serial ports seen = set() for p in [x for x in ports.values() if x in seen or seen.add(x)]: - log.critical(f' ├ ✘ Duplicate serial port: "{p}"') - issues += 1 + log_fun('fail', f'Duplicate serial port: "{p}"') else: - log.info(' ├ ✔ no duplicate serial ports found') + log_fun('pass', 'no duplicate serial ports found') # collect valid ports match platform.system(): case 'Windows': valid_ports = [f'COM{i + 1}' for i in range(256)] case 'Linux': - valid_ports = glob('/dev/tty[A-Za-z]*') + valid_ports = glob('/dev/tty*') case _: - log.critical(f' ├ ✘ Unsupported platform: "{platform.system()}"') - issues += 1 + raise Exception(f'Unsupported platform: "{platform.system()}"') # check for invalid port-strings for p in [(k, v) for k, v in ports.items() if v not in valid_ports]: - log.critical(f'✘ Invalid serial port: "{p[1]}"') - issues += 1 + log_fun('fail', f'Invalid serial port: "{p[1]}"', last=True) else: - log.info(' └ ✔ no invalid port-strings found') + log_fun('pass', 'no invalid port-strings found', last=True) # check individual serial ports port_info = [i for i in serial.tools.list_ports.comports()] for (description, port) in ports.items(): - log.info(f'Checking serial port {description} ({port}):') + log_fun('head', f'Checking serial port {description} ({port}):') # check if serial port exists try: info = [i for i in serial.tools.list_ports.comports() if i.device == port][0] except IndexError: - log.critical( - f' ├ ✘ "{port}" ({description}) cannot be found - is the device connected to the computer?') - issues += 1 + log_fun('fail', f'{port} ({description}) cannot be found - is the device connected to the computer?', last=True) + continue else: - log.info(' ├ ✔ serial port exists') + log_fun('pass', 'serial port exists') # check if serial ports can be connected to try: s = Serial(port, timeout=1, writeTimeout=1) except serial.SerialException: - log.critical(f' ├ ✘ Cannot connect to "{port}" ({description}) - is another process using the port?') - issues += 1 + log_fun('fail', f'Cannot connect to {port} ({description}) - is another process using the port?', last=True) + continue else: - log.info(' ├ ✔ serial port can be connected to') + log_fun('pass', 'serial port can be connected to') # check correct assignments of serial ports + ok = False match description: case "COM_BPOD": - if query(s, b'6') == b'5': + device_name = 'Bpod Finite State Machine' + ok = query(s, b'6') == b'5' + if ok: s.write(b'Z') - log.info(' └ ✔ device seems to be a Bpod Finite State Machines') - else: - log.critical( - f' └ ✘ Device on port "{port}" does not appear to be a Bpod.') - issues += 1 case "COM_F2TTL": + device_name = 'Frame2TTL' try: s.write(b'C') except serial.serialutil.SerialTimeoutException: @@ -111,26 +135,22 @@ def query(s_obj, req, n=1): port_info = next((p for p in port_info if p.name == s.name), None) # SAMD21 mini issue: not recognized by windows after reboot. Confirmed by Josh. - log.critical( - f' └ ✘ Writing to port "{port}" timed out. Try to unplug and plug the device.') - issues += 1 - if s.read() == (218).to_bytes(1, 'little'): - log.info(' └ ✔ device seems to be a Frame2TTL') - else: - log.critical( - f' └ ✘ Device on port "{port}" does not appear to be a Frame2TTL.') - issues += 1 + log_fun('fail', f'writing to port {port} timed out. Try to unplug and plug the device.', last=True) + continue + finally: + ok = s.read() == (218).to_bytes(1, 'little') case "COM_ROTARY_ENCODER": - if len(query(s, b'Q', 2)) > 1 and query(s, b'P00', 1) == (1).to_bytes(1, 'little'): - log.info(' └ ✔ device seems to be a Rotary Encoder Module') - else: - log.critical(f' └ ✘ Device on port "{port}" does not appear to be a Rotary Encoder Module.') - issues += 1 + device_name = 'Rotary Encoder Module' + ok = len(query(s, b'Q', 2)) > 1 and query(s, b'P00', 1) == (1).to_bytes(1, 'little') case _: - log.critical(' └ ✘ How did you get here??') - issues += 1 + raise Exception('How did you get here??') s.close() + if ok: + log_fun('pass', f'Device on port {port} seems to be a {device_name}', last=True) + else: + log_fun('fail', f'Device on port {port} does not appear to be a {device_name}', last=True) + # To Do: Look into this required delay time.sleep(.02) @@ -139,16 +159,14 @@ def query(s_obj, req, n=1): modules = [m for m in bpod.bpod_modules.modules if m.connected] if 'COM_ROTARY_ENCODER' in ports.keys(): - log.info('Checking Rotary Encoder Module:') + log_fun('head', 'Checking Rotary Encoder Module:') module = [m for m in modules if m.name.startswith('RotaryEncoder')] if len(module) == 0: - log.critical(' ├ ✘ Rotary Encoder Module is not connected to the Bpod') - issues += 1 + log_fun('fail', 'Rotary Encoder Module is not connected to the Bpod') elif len(module) > 1: - log.critical(' ├ ✘ More than one Rotary Encoder Module connected to the Bpod') - issues += 1 + log_fun('fail', 'More than one Rotary Encoder Module connected to the Bpod') else: - log.info(f' ├ ✔ module "{module[0].name}" is connected to the Bpod\'s module port #{module[0].serial_port}') + log_fun('pass', f'module "{module[0].name}" is connected to the Bpod\'s module port #{module[0].serial_port}') s = serial.Serial(ports['COM_ROTARY_ENCODER']) s.write(b'I') @@ -156,51 +174,47 @@ def query(s_obj, req, n=1): if s.in_waiting == 0: s.write(b'x') v = "1.x" if s.read(1) == (1).to_bytes(1, 'little') else "2+" - log.info(f' ├ i hardware version: {v}') - log.info(f' ├ i firmware version: {bpod.modules[0].firmware_version}') + log_fun('info', f'hardware version: {v}') + log_fun('info', f'firmware version: {bpod.modules[0].firmware_version}') s.write(b'Z') p = np.frombuffer(query(s, b'Q', 2), dtype=np.int16)[0] - log.warning(' ├ ! please move the wheel to the left (animal\'s POV) by a quarter turn') + log_fun('warn', 'please move the wheel to the left (animal\'s POV) by a quarter turn') while np.abs(p) < 200: p = np.frombuffer(query(s, b'Q', 2), dtype=np.int16)[0] if p > 0: - log.critical(' └ ✘ Rotary encoder seems to be wired incorrectly - try swapping A and B') - issues += 1 + log_fun('fail', 'Rotary encoder seems to be wired incorrectly - try swapping A and B', last=True) else: - log.info(' └ ✔ rotary encoder is wired correctly') + log_fun('pass', 'rotary encoder is wired correctly', last=True) s.close() if 'device_sound' in hw_settings and 'OUTPUT' in hw_settings['device_sound']: match hw_settings['device_sound']['OUTPUT']: case 'harp': - log.info('Checking Harp Sound Card:') + log_fun('head', 'Checking Harp Sound Card:') dev = usb.core.find(idVendor=0x04D8, idProduct=0xEE6A) if not dev: log.critical(' ├ ✘ Cannot find Harp Sound Card') - issues += 1 else: - log.info(' ├ ✔ found USB device {:04X}:{:04X} (Harp Sound Card)'.format(dev.idVendor, dev.idProduct)) + log_fun('pass', 'found USB device {:04X}:{:04X} (Harp Sound Card)'.format(dev.idVendor, dev.idProduct)) dev = next((p for p in serial.tools.list_ports.comports() if (p.vid == 1027 and p.pid == 24577)), None) if not dev: log.critical( ' ├ ✘ Cannot find Harp Sound Card\'s Serial port - did you plug in *both* USB ports of the device?') - issues += 1 else: - log.info(' ├ ✔ found USB device {:04X}:{:04X} (FT232 UART), serial port: {}'.format(dev.vid, dev.pid, + log_fun('pass', 'found USB device {:04X}:{:04X} (FT232 UART), serial port: {}'.format(dev.vid, dev.pid, dev.name)) module = [m for m in modules if m.name.startswith('SoundCard')] if len(module) == 0: - log.critical(' └ ✘ Harp Sound Card is not connected to the Bpod') - issues += 1 + log_fun('fail', 'Harp Sound Card is not connected to the Bpod', last=True) elif len(module) > 1: - log.critical(' └ ✘ More than one Harp Sound Card connected to the Bpod') - issues += 1 + log_fun('fail', 'More than one Harp Sound Card connected to the Bpod', last=True) else: - log.info(f' └ ✔ module "{module[0].name}" is connected to the Bpod\'s module port #{module[0].serial_port}') + log_fun('pass', f'module "{module[0].name}" is connected to the Bpod\'s module port #{module[0].serial_port}', + last=True) case _: pass @@ -214,9 +228,9 @@ def query(s_obj, req, n=1): if issues: - logstr = f" ✘ {issues} issue{'s' if issues > 1 else ''} found" - log.info(' ' + '═' * (len(logstr) - 3)) - log.critical(logstr) + msg = f" ✘ {issues} issue{'s' if issues > 1 else ''} found" + log.info(' ' + '═' * (len(msg) - 3)) + log.critical(msg) else: log.info(' ══════════════════') - log.info(' ✔ no issues found') + log.info('\033[1m ✔ no issues found')