-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream_data.py
63 lines (52 loc) · 2.76 KB
/
stream_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import argparse
import time
import numpy as np
import brainflow
from brainflow.board_shim import BoardShim, BrainFlowInputParams, BoardIds
from brainflow.data_filter import DataFilter, FilterTypes
def main():
parser = argparse.ArgumentParser()
# use docs to check which parameters are required for specific board, e.g. for Cyton - set serial port
parser.add_argument('--ip-port', type=int, help='ip port', required=False, default=0)
parser.add_argument('--ip-protocol', type=int, help='ip protocol, check IpProtocolType enum', required=False,
default=0)
parser.add_argument('--ip-address', type=str, help='ip address', required=False, default='')
parser.add_argument('--serial-port', type=str, help='serial port', required=False, default='COM4')
parser.add_argument('--mac-address', type=str, help='mac address', required=False, default='')
parser.add_argument('--other-info', type=str, help='other info', required=False, default='')
parser.add_argument('--streamer-params', type=str, help='other info', required=False, default='')
parser.add_argument('--board-id', type=int, help='board id, check docs to get a list of supported boards',
required=True, default='0') # board_id = 0 / BoardIds.CYTON_BOARD.value
parser.add_argument('--log', action='store_true')
args = parser.parse_args()
params = BrainFlowInputParams()
params.ip_port = args.ip_port
params.serial_port = args.serial_port
params.mac_address = args.mac_address
params.other_info = args.other_info
params.ip_address = args.ip_address
params.ip_protocol = args.ip_protocol
if (args.log):
BoardShim.enable_dev_board_logger()
else:
BoardShim.disable_board_logger()
board = BoardShim(args.board_id, params)
board.prepare_session()
board.start_stream()
time.sleep(5)
board.config_board('/2') # enable analog mode only for Cyton Based Boards!
time.sleep(5)
data = board.get_board_data()
board.stop_stream()
board.release_session()
"""
data[BoardShim.get_other_channels(args.board_id)[0]] contains cyton end byte
data[BoardShim.get_other_channels(args.board_id)[1....]] contains unprocessed bytes
if end byte is 0xC0 there are accel data in data[BoardShim.get_accel_channels(args.board_id)[....]] else there are zeros
if end byte is 0xC1 there are analog data in data[BoardShim.get_analog_channels(args.board_id)[....]] else there are zeros
"""
print(data[BoardShim.get_other_channels(args.board_id)[0]][0:5]) # should be standard end byte 0xC0
print(data[BoardShim.get_other_channels(args.board_id)[0]][-5:]) # should be analog and byte 0xC1
DataFilter.write_file(data, 'cyton_data.csv', 'w')
if __name__ == "__main__":
main()