Skip to content

Commit

Permalink
add checks for harp soundcard and improve log format
Browse files Browse the repository at this point in the history
  • Loading branch information
bimac committed Aug 14, 2023
1 parent ee1455c commit a416ac3
Showing 1 changed file with 98 additions and 41 deletions.
139 changes: 98 additions & 41 deletions scripts/hardware_validation/verify_hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from glob import glob
from pathlib import Path
import time
import usb.core

from serial import Serial
import serial.tools.list_ports
Expand All @@ -10,15 +11,12 @@
import numpy as np
# import pandas as pd

from iblutil.util import setup_logger
import iblrig.base_tasks
from pybpodapi.protocol import Bpod # StateMachine

# set up logging
logging.basicConfig(
format="%(levelname)-8s %(message)s",
level=logging.INFO
)
log = logging.getLogger(__name__)
log = setup_logger('iblrig', level='DEBUG')


# function for querying a serial device
Expand All @@ -27,6 +25,8 @@ def query(s_obj, req, n=1):
return s.read(n)


issues = 0

# read hardware_settings.yaml
log.info('Checking hardware_settings.yaml:')
file_settings = Path(iblrig.__file__).parents[1].joinpath('settings', 'hardware_settings.yaml')
Expand All @@ -39,79 +39,92 @@ def query(s_obj, req, n=1):
# check for undefined serial ports (warn only)
tmp = [(k, v) for k, v in ports.items() if v is None]
for p in tmp:
log.warning("✗ {} is undefined (OK if you don't want to use it)".format(p[0]))
log.warning(f" ├ ✘ {p[0]} is undefined (OK if you don't want to use it)")
if not any(tmp):
log.info(' no undefined serial ports found')
log.info(' ├ ✔ no undefined serial ports found')
ports = {k: v for k, v in ports.items() if v is not None}

# check for duplicate serial ports
seen = set()
for p in [x for x in ports.values() if x in seen or seen.add(x)]:
raise Exception('Duplicate serial port: "{}"'.format(p))
log.info('✓ no duplicate serial ports found')
log.critical(f' ├ ✘ Duplicate serial port: "{p}"')
issues += 1
log.info(' ├ ✔ no duplicate serial ports found')

# collect valid ports
match platform.system():
case 'Windows':
valid_ports = ['COM{:d}'.format(i + 1) for i in range(256)]
valid_ports = [f'COM{i + 1}' for i in range(256)]
case 'Linux':
valid_ports = glob('/dev/tty[A-Za-z]*')
case _:
raise Exception('Unsupported platform: "{}"'.format(platform.system()))
log.critical(f' ├ ✘ Unsupported platform: "{platform.system()}"')
issues += 1

# check for invalid port-strings
for p in [(k, v) for k, v in ports.items() if v not in valid_ports]:
raise Exception('Invalid serial port: "{}"'.format(p[1]))
log.info('✓ no invalid port-strings found')
log.critical(f'✘ Invalid serial port: "{p[1]}"')
issues += 1
log.info(' └ ✔ no invalid port-strings found')

# check individual serial ports
port_info = [i for i in serial.tools.list_ports.comports()]
for (description, port) in ports.items():
log.info('Checking serial port {} ({}):'.format(port, description))
log.info(f'Checking serial port {description} ({port}):')

# check if serial port exists
try:
info = [i for i in serial.tools.list_ports.comports() if i.device == port][0]
except IndexError:
raise Exception(
'"{}" ({}) cannot be found - is the device connected to the computer?'.format(port, description))
log.info('✓ serial port exists')
log.critical(
f' ├ ✘ "{port}" ({description}) cannot be found - is the device connected to the computer?')
issues += 1
log.info(' ├ ✔ serial port exists')

# check if serial ports can be connected to
try:
s = Serial(port, timeout=1, writeTimeout=1)
except serial.SerialException:
raise Exception('Cannot connect to "{}" ({}) - is another process using the port?'.format(port, description))
log.info('✓ serial port can be connected to')
log.critical(f' ├ ✘ Cannot connect to "{port}" ({description}) - is another process using the port?')
issues += 1
log.info(' ├ ✔ serial port can be connected to')

# check correct assignments of serial ports
match description:
case "COM_BPOD":
if query(s, b'6') == b'5':
s.write(b'Z')
log.info(' device seems to be a Bpod Finite State Machines')
log.info(' └ ✔ device seems to be a Bpod Finite State Machines')
else:
raise Exception(
'Device on port "{}" does not appear to be a Bpod.'.format(port))
log.critical(
f' └ ✘ Device on port "{port}" does not appear to be a Bpod.')
issues += 1
case "COM_F2TTL":
try:
s.write(b'C')
except serial.serialutil.SerialTimeoutException:
port_info = serial.tools.list_ports.comports()
port_info = next((p for p in port_info if p.name == s.name), None)

# SAMD21 mini issue: not recognized by windows after reboot. Confirmed by Josh.
raise Exception(
'Writing to port "{}" timed out. Try to unplug and plug the device.'.format(port))
log.critical(
f' └ ✘ Writing to port "{port}" timed out. Try to unplug and plug the device.')
issues += 1
if s.read() == (218).to_bytes(1, 'little'):
log.info(' device seems to be a Frame2TTL')
log.info(' └ ✔ device seems to be a Frame2TTL')
else:
raise Exception(
'Device on port "{}" does not appear to be a Frame2TTL.'.format(port))
log.critical(
f' └ ✘ Device on port "{port}" does not appear to be a Frame2TTL.')
issues += 1
case "COM_ROTARY_ENCODER":
if len(query(s, b'Q', 2)) > 1 and query(s, b'P00', 1) == (1).to_bytes(1, 'little'):
log.info(' device seems to be a Rotary Encoder Module')
log.info(' └ ✔ device seems to be a Rotary Encoder Module')
else:
raise Exception('Device on port "{}" does not appear to be a Rotary Encoder Module.'.format(port))
log.critical(f' └ ✘ Device on port "{port}" does not appear to be a Rotary Encoder Module.')
issues += 1
case _:
raise Exception('How did you get here??')
log.critical(' └ ✘ How did you get here??')
issues += 1
s.close()

# To Do: Look into this required delay
Expand All @@ -125,36 +138,80 @@ def query(s_obj, req, n=1):
log.info('Checking Rotary Encoder Module:')
module = [m for m in modules if m.name.startswith('RotaryEncoder')]
if len(module) == 0:
raise Exception('Rotary Encoder Module is not connected to the Bpod')
log.critical(' ├ ✘ Rotary Encoder Module is not connected to the Bpod')
issues += 1
if len(module) > 1:
raise Exception('More than one Rotary Encoder Module connected to the Bpod')
log.info('✓ module "{}" is connected to the Bpod\'s module port #{}'.format(module[0].name, module[0].serial_port))
log.critical(' ├ ✘ More than one Rotary Encoder Module connected to the Bpod')
issues += 1
log.info(
f' ├ ✔ module "{module[0].name}" is connected to the Bpod\'s module port #{module[0].serial_port}')

s = serial.Serial(ports['COM_ROTARY_ENCODER'])
s.write(b'I')
time.sleep(.02)
if s.in_waiting == 0:
s.write(b'x')
v = "1.x" if s.read(1) == (1).to_bytes(1, 'little') else "2+"
log.info(hardware version: {}'.format(v))
log.info(firmware version: {}'.format(bpod.modules[0].firmware_version))
log.info(f' ├ i hardware version: {v}')
log.info(f' ├ i firmware version: {bpod.modules[0].firmware_version}')

s.write(b'Z')
p = np.frombuffer(query(s, b'Q', 2), dtype=np.int16)[0]
log.info('! please move the wheel to the left (animal\'s POV) by a quarter turn')
log.warning(' ├ ! please move the wheel to the left (animal\'s POV) by a quarter turn')
while np.abs(p) < 200:
p = np.frombuffer(query(s, b'Q', 2), dtype=np.int16)[0]
if p > 0:
raise Exception('Rotary encoder seems to be wired incorrectly - try swapping A and B')
log.info('✓ rotary encoder is wired correctly')
log.critical(' └ ✘ Rotary encoder seems to be wired incorrectly - try swapping A and B')
issues += 1
log.info(' └ ✔ rotary encoder is wired correctly')
s.close()

if 'device_sound' in hw_settings and 'OUTPUT' in hw_settings['device_sound']:
match hw_settings['device_sound']['OUTPUT']:
case 'harp':
log.info('Checking Harp Sound Card:')

dev = usb.core.find(idVendor=0x04D8, idProduct=0xEE6A)
if not dev:
log.critical(' ├ ✘ Cannot find Harp Sound Card')
issues += 1
else:
log.info(' ├ ✔ found USB device {:04X}:{:04X} (Harp Sound Card)'.format(dev.idVendor, dev.idProduct))

dev = next((p for p in serial.tools.list_ports.comports() if (p.vid == 1027 and p.pid == 24577)), None)
if not dev:
log.critical(
' ├ ✘ Cannot find Harp Sound Card\'s Serial port - did you plug in *both* USB ports of the device?')
issues += 1
else:
log.info(' ├ ✔ found USB device {:04X}:{:04X} (FT232 UART), serial port: {}'.format(dev.vid, dev.pid,
dev.name))

module = [m for m in modules if m.name.startswith('SoundCard')]
if len(module) == 0:
log.critical(' └ ✘ Harp Sound Card is not connected to the Bpod')
issues += 1
elif len(module) > 1:
log.critical(' └ ✘ More than one Harp Sound Card connected to the Bpod')
issues += 1
else:
log.info(f' └ ✔ module "{module[0].name}" is connected to the Bpod\'s module port #{module[0].serial_port}')
case _:
pass

bpod.close()

# TO DO: bpod
# TO DO: BNC connections
# TO DO: camera
# TO DO: sound output (harp can be matched through vendor/device ID?)
# TO DO: sound output
# TO DO: ambient module

log.info('---------------')
log.info('No issues found')

if issues:
logstr = f' ✘ {issues} issues found' if issues > 1 else ' ✘ 1 issue found'
log.info(' ' + '═' * (len(logstr) - 3))
log.critical(logstr)
else:
log.info(' ══════════════════')
log.info(' ✔ no issues found')

0 comments on commit a416ac3

Please sign in to comment.