# kaiyu_odor_rig.py
#
# NOTE: The repository this file came from was archived by its owner on
# Jul 11, 2023 and is now read-only.
from __future__ import print_function, division
from modular_device import ModularDevices
import time
import yaml
# Default value of the `debug` keyword of KaiyuOdorRig, used when the caller
# does not pass one.
DEBUG = True
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably a serial baud rate -- confirm before relying on or removing it.
BAUDRATE = 9600
class KaiyuOdorRig(object):
    '''
    Coordinate the two modular devices that make up the odor rig.

    On construction the rig looks up exactly one 'power_switch_controller'
    (stored as ``self.psc``) and exactly one 'aalborg_mfc_interface'
    (stored as ``self.ami``) through ``ModularDevices``.

    Attributes:
        psc: the power switch controller device object.
        ami: the Aalborg MFC interface device object.
        pulse_duration: value handed to ``ami.pulse_bnc_b`` at the start of
            every protocol step (units are defined by that device method,
            not by this file).
    '''

    def __init__(self, *args, **kwargs):
        '''
        Find both devices and store them on the instance.

        Keyword Args:
            debug: when true, progress messages are printed. Defaults to
                the module-level ``DEBUG``.

        Raises:
            RuntimeError: if either device is missing, or if more than one
                device of the same kind is connected.
        '''
        # setdefault both reads the flag and records the effective value in
        # kwargs, exactly as the original if/else did.
        self._debug = kwargs.setdefault('debug', DEBUG)
        self._debug_print('Initializing KaiyuOdorRig...')
        modular_devices = ModularDevices()

        modular_device_name = 'power_switch_controller'
        self.psc = self._find_single_device(modular_devices, modular_device_name)
        self._debug_print('Found ' + modular_device_name + ' on port ' + str(self.psc.get_port()))

        modular_device_name = 'aalborg_mfc_interface'
        self.ami = self._find_single_device(modular_devices, modular_device_name)
        self._debug_print('Found ' + modular_device_name + ' on port ' + str(self.ami.get_port()))

        self.pulse_duration = 10

    @staticmethod
    def _find_single_device(modular_devices, modular_device_name):
        '''
        Return the only device registered under ``modular_device_name``.

        Args:
            modular_devices: mapping of device name to a mapping of
                devices of that kind.
            modular_device_name: name of the device kind to look up.

        Raises:
            RuntimeError: if no device, or more than one device, of that
                kind is present.
        '''
        try:
            dev_dict = modular_devices[modular_device_name]
        except KeyError:
            raise RuntimeError('Could not find ' + modular_device_name + '. Check connections and permissions.')
        if len(dev_dict) > 1:
            raise RuntimeError('More than one ' + modular_device_name + ' found. Only one should be connected.')
        if not dev_dict:
            # An empty mapping means nothing usable is connected; report it
            # the same way as a missing entry instead of an IndexError.
            raise RuntimeError('Could not find ' + modular_device_name + '. Check connections and permissions.')
        # list(...) is required on Python 3, where keys() is a view that
        # cannot be indexed; it is harmless on Python 2.
        only_key = list(dev_dict.keys())[0]
        return dev_dict[only_key]

    @staticmethod
    def _load_yaml(file_path):
        '''
        Parse the YAML file at ``file_path`` and return its contents.

        NOTE(review): the original called ``yaml.load(stream)`` with no
        Loader. That form can construct arbitrary Python objects and is
        rejected outright by PyYAML >= 6. ``safe_load`` is used instead;
        it handles plain mappings, lists, numbers and strings. If a
        settings file relies on Python-specific YAML tags this must be
        revisited.
        '''
        # The with-block closes the file even if parsing fails.
        with open(file_path, 'r') as stream:
            return yaml.safe_load(stream)

    def setup_mfcs(self, mfc_settings_file_path):
        '''
        Switch all channels off, apply MFC flow settings, then wait.

        Args:
            mfc_settings_file_path: path to a YAML mapping with the keys
                'mfc_setting_0', 'mfc_setting_1' and 'sleep_duration'
                (seconds, passed to ``time.sleep``).
        '''
        mfc_settings = self._load_yaml(mfc_settings_file_path)
        # The third flow entry is always 0 in this rig configuration.
        flow_settings = [mfc_settings['mfc_setting_0'], mfc_settings['mfc_setting_1'], 0]
        self.psc.set_all_channels_off()
        self._debug_print('set_all_channels_off()')
        self.ami.set_mfc_flows(flow_settings)
        self._debug_print('set_mfc_flows(' + str(flow_settings) + ')')
        self._debug_print('sleeping for ' + str(mfc_settings['sleep_duration']) + 's...')
        time.sleep(mfc_settings['sleep_duration'])

    def run_protocol(self, protocol_file_path):
        '''
        Execute every step of a protocol file in order.

        Each step pulses BNC B, switches the step's channels on, waits,
        switches all channels off, and waits again.

        Args:
            protocol_file_path: path to a YAML sequence of steps; each step
                is a mapping with the keys 'channels_on',
                'sleep_duration_on' and 'sleep_duration_off' (durations in
                seconds, passed to ``time.sleep``).
        '''
        protocol = self._load_yaml(protocol_file_path)
        # An empty YAML file parses to None; treat it as "no steps".
        for step_count, step in enumerate(protocol or []):
            self._debug_print('running step ' + str(step_count))
            self.ami.pulse_bnc_b(self.pulse_duration)
            self._debug_print('pulse_bnc_b(' + str(self.pulse_duration) + ')')
            self.psc.set_channels_on(step['channels_on'])
            self._debug_print('set_channels_on(' + str(step['channels_on']) + ')')
            self._debug_print('sleeping for ' + str(step['sleep_duration_on']) + 's...')
            time.sleep(step['sleep_duration_on'])
            # The original also printed 'set_channels_off()' before this
            # call; no such call is made, so only the real one is logged.
            self.psc.set_all_channels_off()
            self._debug_print('set_all_channels_off()')
            self._debug_print('sleeping for ' + str(step['sleep_duration_off']) + 's...')
            time.sleep(step['sleep_duration_off'])
            self._debug_print('')

    def _debug_print(self, *args):
        '''Print ``args`` only when debugging is enabled on this instance.'''
        if self._debug:
            print(*args)
# -----------------------------------------------------------------------------------------
if __name__ == '__main__':
    # Script entry point: set up the MFCs, then run a protocol.
    #
    # KaiyuOdorRig has no run() method, so the original `odor_rig.run()`
    # always failed with AttributeError. The two methods the class does
    # provide both need a file path, so the paths come from the command line.
    import argparse

    parser = argparse.ArgumentParser(
        description='Set up the MFCs and run an odor protocol on the rig.')
    parser.add_argument('mfc_settings_file_path',
                        help='YAML file passed to KaiyuOdorRig.setup_mfcs')
    parser.add_argument('protocol_file_path',
                        help='YAML file passed to KaiyuOdorRig.run_protocol')
    # Parse before constructing the rig so a usage error is reported
    # without first trying to connect to the devices.
    cli_args = parser.parse_args()

    debug = True
    odor_rig = KaiyuOdorRig(debug=debug)
    odor_rig.setup_mfcs(cli_args.mfc_settings_file_path)
    odor_rig.run_protocol(cli_args.protocol_file_path)