Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
bimac committed Aug 15, 2023
1 parent 52dd19d commit a33eb82
Showing 1 changed file with 84 additions and 70 deletions.
154 changes: 84 additions & 70 deletions scripts/hardware_validation/verify_hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,46 @@
# set up logging
log = setup_logger('iblrig', level='DEBUG')

issues = 0


# function for querying a serial device
def query(s_obj, req, n=1):
def query(s_obj, req, n=1, end: bool = False):
s_obj.write(req)
return s.read(n)


issues = 0
def log_fun(msg_type: str = 'info', msg: str = '', last: bool = False):
global issues
tree = '└' if last else '├'
match msg_type:
case 'head':
pass
case 'pass':
level = 'info'
symbol = '✔'
case 'info':
level = 'info'
symbol = 'i'
case 'warn':
level = 'warning'
symbol = '!'
case 'fail':
level = 'critical'
symbol = '✘'
issues += 1
case _:
level = 'critical'
symbol = '?'

if msg_type == 'head':
log.info('\033[1m' + msg)
else:
getattr(log, level)(f' {tree} {symbol} {msg}')


# read hardware_settings.yaml
log.info('Checking hardware_settings.yaml:')
log_fun('head', 'Checking hardware_settings.yaml:')
file_settings = Path(iblrig.__file__).parents[1].joinpath('settings', 'hardware_settings.yaml')
hw_settings = iblrig.path_helper.load_settings_yaml(file_settings)

Expand All @@ -39,98 +68,89 @@ def query(s_obj, req, n=1):
# check for undefined serial ports (warn only)
tmp = [(k, v) for k, v in ports.items() if v is None]
for p in tmp:
log.warning(f" ├ ✘ {p[0]} is undefined (OK if you don't want to use it)")
log_fun('fail', f"{p[0]} is undefined (OK if you don't want to use it)")
if not any(tmp):
log.info(' ├ ✔ no undefined serial ports found')
log_fun('pass', 'no undefined serial ports found')
ports = {k: v for k, v in ports.items() if v is not None}

# check for duplicate serial ports
seen = set()
for p in [x for x in ports.values() if x in seen or seen.add(x)]:
log.critical(f' ├ ✘ Duplicate serial port: "{p}"')
issues += 1
log_fun('fail', f'Duplicate serial port: "{p}"')
else:
log.info(' ├ ✔ no duplicate serial ports found')
log_fun('pass', 'no duplicate serial ports found')

# collect valid ports
match platform.system():
case 'Windows':
valid_ports = [f'COM{i + 1}' for i in range(256)]
case 'Linux':
valid_ports = glob('/dev/tty[A-Za-z]*')
valid_ports = glob('/dev/tty*')
case _:
log.critical(f' ├ ✘ Unsupported platform: "{platform.system()}"')
issues += 1
raise Exception(f'Unsupported platform: "{platform.system()}"')

# check for invalid port-strings
for p in [(k, v) for k, v in ports.items() if v not in valid_ports]:
log.critical(f'✘ Invalid serial port: "{p[1]}"')
issues += 1
log_fun('fail', f'Invalid serial port: "{p[1]}"', last=True)
else:
log.info(' └ ✔ no invalid port-strings found')
log_fun('pass', 'no invalid port-strings found', last=True)

# check individual serial ports
port_info = [i for i in serial.tools.list_ports.comports()]
for (description, port) in ports.items():
log.info(f'Checking serial port {description} ({port}):')
log_fun('head', f'Checking serial port {description} ({port}):')

# check if serial port exists
try:
info = [i for i in serial.tools.list_ports.comports() if i.device == port][0]
except IndexError:
log.critical(
f' ├ ✘ "{port}" ({description}) cannot be found - is the device connected to the computer?')
issues += 1
log_fun('fail', f'{port} ({description}) cannot be found - is the device connected to the computer?', last=True)
continue
else:
log.info(' ├ ✔ serial port exists')
log_fun('pass', 'serial port exists')

# check if serial ports can be connected to
try:
s = Serial(port, timeout=1, writeTimeout=1)
except serial.SerialException:
log.critical(f' ├ ✘ Cannot connect to "{port}" ({description}) - is another process using the port?')
issues += 1
log_fun('fail', f'Cannot connect to {port} ({description}) - is another process using the port?', last=True)
continue
else:
log.info(' ├ ✔ serial port can be connected to')
log_fun('pass', 'serial port can be connected to')

# check correct assignments of serial ports
ok = False
match description:
case "COM_BPOD":
if query(s, b'6') == b'5':
device_name = 'Bpod Finite State Machine'
ok = query(s, b'6') == b'5'
if ok:
s.write(b'Z')
log.info(' └ ✔ device seems to be a Bpod Finite State Machines')
else:
log.critical(
f' └ ✘ Device on port "{port}" does not appear to be a Bpod.')
issues += 1
case "COM_F2TTL":
device_name = 'Frame2TTL'
try:
s.write(b'C')
except serial.serialutil.SerialTimeoutException:
port_info = serial.tools.list_ports.comports()
port_info = next((p for p in port_info if p.name == s.name), None)

# SAMD21 mini issue: not recognized by windows after reboot. Confirmed by Josh.
log.critical(
f' └ ✘ Writing to port "{port}" timed out. Try to unplug and plug the device.')
issues += 1
if s.read() == (218).to_bytes(1, 'little'):
log.info(' └ ✔ device seems to be a Frame2TTL')
else:
log.critical(
f' └ ✘ Device on port "{port}" does not appear to be a Frame2TTL.')
issues += 1
log_fun('fail', f'writing to port {port} timed out. Try to unplug and plug the device.', last=True)
continue
finally:
ok = s.read() == (218).to_bytes(1, 'little')
case "COM_ROTARY_ENCODER":
if len(query(s, b'Q', 2)) > 1 and query(s, b'P00', 1) == (1).to_bytes(1, 'little'):
log.info(' └ ✔ device seems to be a Rotary Encoder Module')
else:
log.critical(f' └ ✘ Device on port "{port}" does not appear to be a Rotary Encoder Module.')
issues += 1
device_name = 'Rotary Encoder Module'
ok = len(query(s, b'Q', 2)) > 1 and query(s, b'P00', 1) == (1).to_bytes(1, 'little')
case _:
log.critical(' └ ✘ How did you get here??')
issues += 1
raise Exception('How did you get here??')
s.close()

if ok:
log_fun('pass', f'Device on port {port} seems to be a {device_name}', last=True)
else:
log_fun('fail', f'Device on port {port} does not appear to be a {device_name}', last=True)

# To Do: Look into this required delay
time.sleep(.02)

Expand All @@ -139,68 +159,62 @@ def query(s_obj, req, n=1):
modules = [m for m in bpod.bpod_modules.modules if m.connected]

if 'COM_ROTARY_ENCODER' in ports.keys():
log.info('Checking Rotary Encoder Module:')
log_fun('head', 'Checking Rotary Encoder Module:')
module = [m for m in modules if m.name.startswith('RotaryEncoder')]
if len(module) == 0:
log.critical(' ├ ✘ Rotary Encoder Module is not connected to the Bpod')
issues += 1
log_fun('fail', 'Rotary Encoder Module is not connected to the Bpod')
elif len(module) > 1:
log.critical(' ├ ✘ More than one Rotary Encoder Module connected to the Bpod')
issues += 1
log_fun('fail', 'More than one Rotary Encoder Module connected to the Bpod')
else:
log.info(f' ├ ✔ module "{module[0].name}" is connected to the Bpod\'s module port #{module[0].serial_port}')
log_fun('pass', f'module "{module[0].name}" is connected to the Bpod\'s module port #{module[0].serial_port}')

s = serial.Serial(ports['COM_ROTARY_ENCODER'])
s.write(b'I')
time.sleep(.02)
if s.in_waiting == 0:
s.write(b'x')
v = "1.x" if s.read(1) == (1).to_bytes(1, 'little') else "2+"
log.info(f' ├ i hardware version: {v}')
log.info(f' ├ i firmware version: {bpod.modules[0].firmware_version}')
log_fun('info', f'hardware version: {v}')
log_fun('info', f'firmware version: {bpod.modules[0].firmware_version}')

s.write(b'Z')
p = np.frombuffer(query(s, b'Q', 2), dtype=np.int16)[0]
log.warning(' ├ ! please move the wheel to the left (animal\'s POV) by a quarter turn')
log_fun('warn', 'please move the wheel to the left (animal\'s POV) by a quarter turn')
while np.abs(p) < 200:
p = np.frombuffer(query(s, b'Q', 2), dtype=np.int16)[0]
if p > 0:
log.critical(' └ ✘ Rotary encoder seems to be wired incorrectly - try swapping A and B')
issues += 1
log_fun('fail', 'Rotary encoder seems to be wired incorrectly - try swapping A and B', last=True)
else:
log.info(' └ ✔ rotary encoder is wired correctly')
log_fun('pass', 'rotary encoder is wired correctly', last=True)
s.close()

if 'device_sound' in hw_settings and 'OUTPUT' in hw_settings['device_sound']:
match hw_settings['device_sound']['OUTPUT']:
case 'harp':
log.info('Checking Harp Sound Card:')
log_fun('head', 'Checking Harp Sound Card:')

dev = usb.core.find(idVendor=0x04D8, idProduct=0xEE6A)
if not dev:
log.critical(' ├ ✘ Cannot find Harp Sound Card')
issues += 1
else:
log.info(' ├ ✔ found USB device {:04X}:{:04X} (Harp Sound Card)'.format(dev.idVendor, dev.idProduct))
log_fun('pass', 'found USB device {:04X}:{:04X} (Harp Sound Card)'.format(dev.idVendor, dev.idProduct))

dev = next((p for p in serial.tools.list_ports.comports() if (p.vid == 1027 and p.pid == 24577)), None)
if not dev:
log.critical(
' ├ ✘ Cannot find Harp Sound Card\'s Serial port - did you plug in *both* USB ports of the device?')
issues += 1
else:
log.info(' ├ ✔ found USB device {:04X}:{:04X} (FT232 UART), serial port: {}'.format(dev.vid, dev.pid,
log_fun('pass', 'found USB device {:04X}:{:04X} (FT232 UART), serial port: {}'.format(dev.vid, dev.pid,
dev.name))

module = [m for m in modules if m.name.startswith('SoundCard')]
if len(module) == 0:
log.critical(' └ ✘ Harp Sound Card is not connected to the Bpod')
issues += 1
log_fun('fail', 'Harp Sound Card is not connected to the Bpod', last=True)
elif len(module) > 1:
log.critical(' └ ✘ More than one Harp Sound Card connected to the Bpod')
issues += 1
log_fun('fail', 'More than one Harp Sound Card connected to the Bpod', last=True)
else:
log.info(f' └ ✔ module "{module[0].name}" is connected to the Bpod\'s module port #{module[0].serial_port}')
log_fun('pass', f'module "{module[0].name}" is connected to the Bpod\'s module port #{module[0].serial_port}',
last=True)
case _:
pass

Expand All @@ -214,9 +228,9 @@ def query(s_obj, req, n=1):


if issues:
logstr = f" ✘ {issues} issue{'s' if issues > 1 else ''} found"
log.info(' ' + '═' * (len(logstr) - 3))
log.critical(logstr)
msg = f" ✘ {issues} issue{'s' if issues > 1 else ''} found"
log.info(' ' + '═' * (len(msg) - 3))
log.critical(msg)
else:
log.info(' ══════════════════')
log.info(' ✔ no issues found')
log.info('\033[1m ✔ no issues found')

0 comments on commit a33eb82

Please sign in to comment.