From 41dca79ed1ebbce3e7240c19cf8ebea1c76612f1 Mon Sep 17 00:00:00 2001 From: Etienne de Montalivet Date: Mon, 18 Dec 2023 14:36:46 +0100 Subject: [PATCH] fix: micromed buffer --- extern/read_tcp_to_epoch.py | 125 ---------------------- extern/tcp_to_epoch.py | 201 ++++++++++++++++++++++++++++++++++++ micromed_io/buffer.py | 2 +- micromed_io/in_out.py | 17 +++ 4 files changed, 219 insertions(+), 126 deletions(-) delete mode 100644 extern/read_tcp_to_epoch.py create mode 100644 extern/tcp_to_epoch.py diff --git a/extern/read_tcp_to_epoch.py b/extern/read_tcp_to_epoch.py deleted file mode 100644 index 5b24e41..0000000 --- a/extern/read_tcp_to_epoch.py +++ /dev/null @@ -1,125 +0,0 @@ -""" -Read data sent by Micromed through TCP. -In this case, we use a circular buffer of epoch_duration seconds with epoch_overlap seconds -overlap emulating a sliding window. Each time the buffer is filled in the next slided window, -update_epoch_buffer() returns True and one can access to the window. - -HOW TO USE: -> python read_tcp_to_epoch.py --help -> python read_tcp_to_epoch.py -""" -import socket - -import numpy as np - -from micromed_io.buffer import MicromedBuffer -from micromed_io.tcp import decode_tcp_header_packet, MicromedPacketType -import logging -import argparse - -if __name__ == "__main__": - logging.basicConfig(level=0, format=("%(asctime)s\t\t%(levelname)s\t\t%(message)s")) - parser = argparse.ArgumentParser(description="Read data from a Micromed TCP client") - parser.add_argument( - "-p", - "--port", - type=int, - default=5123, - help="The TCP port number to connect to. The default is 5123.", - ) - parser.add_argument( - "-a", - "--address", - type=str, - default="localhost", - help="the TCP address to connect to. The default is localhost", - ) - parser.add_argument( - "-v", - "--verbosity", - type=int, - choices=[0, 1, 2], - default=1, - help="Increase output verbosity. The default is 1.", - ) - args = parser.parse_args() - - # convert to variable - tcp_port = args.port - tcp_address = args.address - verbosity = args.verbosity - - # create micromed - micromed_buffer = MicromedBuffer(epoch_duration=5, epoch_overlap=2.5) - - # Create a TCP/IP socket - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - # Bind the socket to the port - server_address = (tcp_address, tcp_port) - if verbosity >= 1: - logging.info("starting up on %s port %s" % server_address) - sock.bind(server_address) - sock.listen(1) - - sock.settimeout(5) - DONE = False - connection = None - - if verbosity >= 1: - logging.info("Waiting for a connection...") - while connection is None: - # Wait for a connection - try: - connection, client_address = sock.accept() - if verbosity >= 1: - logging.info("Connection from %s port %s" % server_address) - except Exception as e: - logging.warning(e) - - sock.settimeout(None) - - try: - while True: - header = connection.recv(2048) # 10 is enoughbut more is fine too - b_header = bytearray(header) - packet_type, next_packet_size = decode_tcp_header_packet(b_header) - - if packet_type is not None: - data = connection.recv(next_packet_size) - b_data = bytearray(data) - - if packet_type == MicromedPacketType.HEADER: - micromed_buffer.decode_data_header_packet(b_data) - if verbosity >= 1: - logging.info("Micromed header") - print(micromed_buffer.micromed_header) - - elif packet_type == MicromedPacketType.EEG_DATA: - if not micromed_buffer.decode_data_eeg_packet(b_data): - logging.error("Error in EEG data packet") - if micromed_buffer.update_epoch_buffer(): - logging.info("Buffer full: PROCESS HERE") - print(micromed_buffer.current_epoch.shape) - print(micromed_buffer.current_epoch) - - elif packet_type == MicromedPacketType.NOTE: - # TODO: check tcp note parsing - micromed_buffer.decode_operator_note_packet(b_data) - if verbosity >= 1: - logging.info("Note") - print(b_data) - else: - raise ValueError( - f"ERROR in packet ! Unknown tcp_data_type: {packet_type}" - ) - else: - raise ValueError("ERROR: Wrong header. Skipping data") - - finally: - # Clean up the connection - if connection is not None: - if verbosity >= 1: - logging.info("Closing the connection") - connection.close() - sock.close() diff --git a/extern/tcp_to_epoch.py b/extern/tcp_to_epoch.py new file mode 100644 index 0000000..8dec792 --- /dev/null +++ b/extern/tcp_to_epoch.py @@ -0,0 +1,201 @@ +""" +Read data sent by Micromed through TCP and store it into a buffer. Every +time the buffer is full, read the epoch and print it + +How to use +========== + +.. code:: bash + + $ python tcp_to_epoch.py --help +""" +import logging +import socket +from datetime import datetime +import click + + +import micromed_io.tcp as mmio_tcp +from micromed_io.buffer import MicromedBuffer + + +def recvall(sock, n): + # Helper function to recv n bytes or return None if EOF is hit + data = bytearray() + while len(data) < n: + packet = sock.recv(n - len(data)) + if not packet: + return None + data.extend(packet) + return data + + +@click.command(context_settings=dict(max_content_width=120)) +@click.option( + "--address", + "-a", + default="localhost", + type=str, + required=False, + help="the TCP address to use for the server (your IP)", + show_default=True, +) +@click.option( + "--port", + "-p", + default=5123, + type=int, + required=False, + help="The TCP port number to use", + show_default=True, +) +@click.option( + "--verbosity", + "-v", + default="1", + type=click.Choice(["0", "1", "2"]), + required=False, + help="Increase output verbosity", + show_default=True, +) +@click.option( + "--epoch-size", + default=5.0, + type=float, + required=False, + help="The epoch/window size in sec", + show_default=True, +) +@click.option( + "--overlap", + default=2.5, + type=float, + required=False, + help="The overlap between 2 successive epochs in sec", + show_default=True, +) +def run( + address: str = "localhost", + port: int = 5123, + epoch_size: float = 5.0, + overlap: float = 2.5, + verbosity: int = 1, +) -> None: + """Read online TCP data from Micromed device and store it into a buffer. Every time the buffer is full, print it.""" + logging.basicConfig( + level=0, + format=( + "[%(asctime)s - %(filename)s:%(lineno)d]\t\t%(levelname)s\t\t%(message)s" + ), + ) + + verbosity = int(verbosity) # because of click choice... + + while True: + # Create a IPv4 based (AF_INET) TCP (SOCK_STREAM) connection + # https://docs.python.org/3/library/socket.html#example + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Bind the socket to the port + server_address = (address, port) + if verbosity >= 1: + logging.info("starting up on %s port %s" % server_address) + sock.bind(server_address) + sock.listen(1) + + sock.settimeout(5) + connection = None + + logging.info("Waiting for a connection...") + while connection is None: + # Wait for a connection + try: + connection, _ = sock.accept() + if verbosity >= 1: + logging.info("Connection from %s port %s" % server_address) + except Exception as e: + logging.warning(e) + + sock.settimeout(None) + + # create micromed buffer + micromed_buffer = MicromedBuffer( + epoch_duration=epoch_size, epoch_overlap=overlap + ) + previous_eeg_epoch_time = datetime.now() + try: + while True: + header = connection.recv(10) # 10 is enoughbut more is fine too + b_header = bytearray(header) + packet_type, next_packet_size = mmio_tcp.decode_tcp_header_packet( + b_header + ) + + if packet_type is not None: + data = recvall(connection, next_packet_size) + b_data = bytearray(data) + + if packet_type == mmio_tcp.MicromedPacketType.HEADER: + micromed_buffer.decode_data_header_packet(b_data) + + logging.info("Got Micromed header.") + if verbosity >= 1: + logging.debug( + f"n_channels={micromed_buffer.micromed_header.nb_of_channels}, " + + f"sfreq={micromed_buffer.sfreq}, " + + f"first 10 ch_names: {micromed_buffer.micromed_header.ch_names[:10]}" + ) + + elif packet_type == mmio_tcp.MicromedPacketType.EEG_DATA: + if not micromed_buffer.decode_data_eeg_packet(b_data): + logging.error( + "Error in EEG data packet. (You can drop this data or " + + "interpolate or whatever)" + ) + if micromed_buffer.update_epoch_buffer(): + logging.info( + f"Buffer of size {micromed_buffer.current_epoch.shape} " + + "is full: PROCESS HERE using the micromed_buffer.current_epoch." + + f" [delta time = {(datetime.now() - previous_eeg_epoch_time).total_seconds()}s]" + ) + print(micromed_buffer.current_epoch) + previous_eeg_epoch_time = datetime.now() + + elif packet_type == mmio_tcp.MicromedPacketType.NOTE: + note_sample, note_value = mmio_tcp.decode_tcp_note_packet( + b_data + ) + if verbosity >= 1: + logging.info( + f"Received note: sample={note_sample} ,value={note_value}" + ) + + elif packet_type == mmio_tcp.MicromedPacketType.MARKER: + marker_sample, marker_value = mmio_tcp.decode_tcp_marker_packet( + b_data + ) + if verbosity >= 1: + logging.info( + f"Received marker: sample={marker_sample} ,value={marker_value}" + ) + else: + raise ValueError( + f"ERROR in packet ! Unknown tcp_data_type: {packet_type}" + ) + else: + raise ValueError("ERROR: Wrong header. Skipping data") + + except Exception as e: + logging.error(e) + + finally: + # Clean up the connection + if connection is not None: + if verbosity >= 1: + logging.info("Closing the connection") + connection.close() + sock.close() + + +if __name__ == "__main__": + run() diff --git a/micromed_io/buffer.py b/micromed_io/buffer.py index 2036452..7bb66de 100644 --- a/micromed_io/buffer.py +++ b/micromed_io/buffer.py @@ -162,6 +162,6 @@ def update_epoch_buffer(self) -> bool: self.epoch_buffer[ :, n_overlaping_samples : (n_overlaping_samples + over_size) ] = np.copy(self.current_data_eeg[self.picks_id, remaining_size:]) - self.current_buffer_length = over_size + self.current_buffer_length = over_size + n_overlaping_samples return has_new_epoch diff --git a/micromed_io/in_out.py b/micromed_io/in_out.py index a779b84..c971247 100644 --- a/micromed_io/in_out.py +++ b/micromed_io/in_out.py @@ -94,6 +94,23 @@ def decode_data_header_packet(self, packet: bytearray) -> None: self.micromed_header.header_type = self._header["header_type"] self.micromed_header.stored_channels = self._header["order"] self.micromed_header.ch_names = [d["chan_name"] for d in self._header["chans"]] + + # construct the indexes of channels to pick in epoch buffer + if self.picks is None: + self.picks_id = np.arange(len(self.micromed_header.stored_channels)) + else: + # Check that all channels are pickable + for ch in self.picks: + if ch not in self.micromed_header.ch_names: + raise ValueError( + f"[MICROMED IO] {ch} is not in " + + f"{self.micromed_header.ch_names}. Please fix it in config.ini file." + ) + self.picks_id = np.array( + [self.micromed_header.ch_names.index(ch) for ch in self.picks], + dtype=int, + ) + # elec_refs is a list of electrode references. Dim 2 is # [logic_min, logic_max, logic_ground, phy_min, phy_max, units] self.micromed_header.elec_refs = [