From cffd91ffffc25496b5236e601fbb038dceb335f1 Mon Sep 17 00:00:00 2001 From: Ben Hall Date: Mon, 18 Mar 2024 12:39:22 -0400 Subject: [PATCH] re-worked writing --- py_data_acq/broadcast-test.py | 4 +- .../py_data_acq/common/common_types.py | 11 +- py_data_acq/py_data_acq/mcap_writer/writer.py | 76 ++++++++--- .../py_data_acq/web_server/mcap_server.py | 51 ++++---- py_data_acq/runner.py | 122 ++++++++++++------ 5 files changed, 177 insertions(+), 87 deletions(-) diff --git a/py_data_acq/broadcast-test.py b/py_data_acq/broadcast-test.py index f40fb8e..8cb756f 100644 --- a/py_data_acq/broadcast-test.py +++ b/py_data_acq/broadcast-test.py @@ -10,8 +10,8 @@ from hytech_np_proto_py import hytech_pb2 # Define the IP and port for the UDP socket -# bus1 = can.interface.Bus('can0', bustype='virtual') -bus1 = can.Bus(channel="can0", interface='socketcan') +bus1 = can.interface.Bus(channel=UdpMulticastBus.DEFAULT_GROUP_IPv6, interface="udp_multicast") +# bus1 = can.Bus(channel="can0", interface='socketcan') def main(): path_to_dbc = os.environ.get('DBC_PATH') full_path = os.path.join(path_to_dbc, "hytech.dbc") diff --git a/py_data_acq/py_data_acq/common/common_types.py b/py_data_acq/py_data_acq/common/common_types.py index d911178..151dad0 100644 --- a/py_data_acq/py_data_acq/common/common_types.py +++ b/py_data_acq/py_data_acq/common/common_types.py @@ -3,4 +3,13 @@ class QueueData(): def __init__(self, schema_name: str, msg): self.name = schema_name self.data = msg.SerializeToString() - self.pb_msg = msg \ No newline at end of file + self.pb_msg = msg + +class MCAPServerStatusQueueData(): + def __init__(self, writing_status: bool, writing_file: str): + self.is_writing = writing_status + self.writing_file = writing_file + +class MCAPFileWriterCommand(): + def __init__(self, write: bool): + self.writing = write \ No newline at end of file diff --git a/py_data_acq/py_data_acq/mcap_writer/writer.py b/py_data_acq/py_data_acq/mcap_writer/writer.py index b76c886..4b7ef64 100644 --- a/py_data_acq/py_data_acq/mcap_writer/writer.py +++ b/py_data_acq/py_data_acq/mcap_writer/writer.py @@ -3,48 +3,82 @@ import time from mcap_protobuf.writer import Writer from datetime import datetime -from typing import ( - Any, - Optional, - Set -) +from typing import Any, Optional, Set import os -class HTPBMcapWriter(Writer): - def __init__(self, mcap_base_path, msg_names: list[str], msg_classes): + +class HTPBMcapWriter: + def __init__(self, mcap_base_path, init_writing: bool): self.base_path = mcap_base_path - messages = msg_names - self.message_classes = msg_classes - now = datetime.now() - date_time_filename = now.strftime("%m_%d_%Y_%H_%M_%S"+".mcap") - self.actual_path = os.path.join(mcap_base_path, date_time_filename) - self.writing_file = open(self.actual_path, "wb") - super().__init__(self.writing_file) + if init_writing: + now = datetime.now() + date_time_filename = now.strftime("%m_%d_%Y_%H_%M_%S" + ".mcap") + self.actual_path = os.path.join(mcap_base_path, date_time_filename) + self.writing_file = open(self.actual_path, "wb") + self.mcap_writer_class = Writer(self.writing_file) + self.is_writing = True + else: + self.is_writing = False + self.actual_path = None + self.writing_file = None + self.mcap_writer_class = None def __await__(self): async def closure(): print("await") return self + return closure().__await__() + def __enter__(self): return self + def __exit__(self, exc_, exc_type_, tb_): - super().finish() + self.mcap_writer_class.finish() self.writing_file.close() + def __aenter__(self): return self + async def __aexit__(self, exc_type: Any, exc_val: Any, traceback: Any): - super().finish() + self.mcap_writer_class.finish() self.writing_file.close() + + async def close_writer(self): + if self.is_writing: + self.is_writing = False + self.mcap_writer_class.finish() + self.writing_file.close() + + return True + + async def open_new_writer(self): + if self.is_writing: + self.is_writing = False + self.mcap_writer_class.finish() + self.writing_file.close() + + now = datetime.now() + date_time_filename = now.strftime("%m_%d_%Y_%H_%M_%S" + ".mcap") + self.actual_path = os.path.join(self.base_path, date_time_filename) + self.writing_file = open(self.actual_path, "wb") + self.mcap_writer_class = Writer(self.writing_file) + self.is_writing = True + + return True + async def write_msg(self, msg): - super().write_message(topic=msg.DESCRIPTOR.name+"_data", message=msg, log_time=int(time.time_ns()), publish_time=int(time.time_ns())) - self.writing_file.flush() + if self.is_writing: + self.mcap_writer_class.write_message( + topic=msg.DESCRIPTOR.name + "_data", + message=msg, + log_time=int(time.time_ns()), + publish_time=int(time.time_ns()), + ) + self.writing_file.flush() return True async def write_data(self, queue): msg = await queue.get() if msg is not None: return await self.write_msg(msg.pb_msg) - - - diff --git a/py_data_acq/py_data_acq/web_server/mcap_server.py b/py_data_acq/py_data_acq/web_server/mcap_server.py index 7402a8c..4cdee7e 100644 --- a/py_data_acq/py_data_acq/web_server/mcap_server.py +++ b/py_data_acq/py_data_acq/web_server/mcap_server.py @@ -3,19 +3,25 @@ import json from py_data_acq.mcap_writer.writer import HTPBMcapWriter import py_data_acq.common.protobuf_helpers as pb_helpers +from py_data_acq.common.common_types import MCAPServerStatusQueueData, MCAPFileWriterCommand from typing import Any class MCAPServer: - def __init__(self, host='0.0.0.0', port=6969, mcap_writer=None,path='.'): + def __init__(self, writer_command_queue: asyncio.Queue, writer_status_queue: asyncio.Queue, init_writing= True, init_filename = '.',host='0.0.0.0', port=6969): self.host = host self.port = port - self.mcap_writer = mcap_writer - self.path = path - if mcap_writer is not None: - self.mcap_status_message = f"An MCAP file is being written: {self.mcap_writer.writing_file.name}" + + self.is_writing = init_writing + self.cmd_queue = writer_command_queue + self.status_queue = writer_status_queue + + if(init_writing): + self.is_writing = True + self.mcap_status_message = f"An MCAP file is being written: {init_filename}" else: + self.is_writing = False self.mcap_status_message = "No MCAP file is being written." - self.html_content = b""" + self.html_content = b""" @@ -75,31 +81,30 @@ async def serve_file(self): header = b"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n" return header + current_html_content - async def start_mcap_generation(self): - if self.mcap_writer is None: - list_of_msg_names, msg_pb_classes = pb_helpers.get_msg_names_and_classes() - self.mcap_writer = HTPBMcapWriter(self.path, list_of_msg_names, msg_pb_classes) - self.mcap_status_message = f"An MCAP file is being written: {self.mcap_writer.writing_file.name}" - - async def stop_mcap_generation(self): - if self.mcap_writer is not None: - # await self.mcap_writer.__aexit__(None, None, None) - self.mcap_writer.finish() - self.mcap_writer.writing_file.close() - self.mcap_status_message = "No MCAP file is being written." - self.mcap_writer = None + async def start_stop_mcap_generation(self, input_cmd: bool): + await self.cmd_queue.put(MCAPFileWriterCommand(input_cmd)) + while True: + # Wait for the next message from the queue + message = await self.status_queue.get() + if message.is_writing: + self.is_writing = True + self.mcap_status_message = f"An MCAP file is being written: {message.writing_file}" + else: + self.is_writing = False + self.mcap_status_message = f"No MCAP file is being written." + # Important: Mark the current task as done to allow the queue to proceed + self.status_queue.task_done() def handle_command(self, command): if command == '/start': - asyncio.create_task(self.start_mcap_generation()) + asyncio.create_task(self.start_stop_mcap_generation(True)) return "MCAP generation started." elif command == '/stop': - asyncio.create_task(self.stop_mcap_generation()) + asyncio.create_task(self.start_stop_mcap_generation(False)) return "MCAP generation stopped." else: return "Command not recognized." - # Checks if client connected and updates them on different actions async def handle_client(self, reader, writer): addr = writer.get_extra_info('peername') @@ -117,7 +122,7 @@ async def handle_client(self, reader, writer): elif url == '/status': status_response = { "statusMessage": self.mcap_status_message, - "isRecording": self.mcap_writer is not None + "isRecording": self.is_writing } response_bytes = json.dumps(status_response).encode('utf-8') response = (f"HTTP/1.1 200 OK\r\n" diff --git a/py_data_acq/runner.py b/py_data_acq/runner.py index 18d60c0..b954132 100644 --- a/py_data_acq/runner.py +++ b/py_data_acq/runner.py @@ -5,6 +5,10 @@ from py_data_acq.mcap_writer.writer import HTPBMcapWriter from py_data_acq.common.common_types import QueueData import py_data_acq.common.protobuf_helpers as pb_helpers +from py_data_acq.common.common_types import ( + MCAPServerStatusQueueData, + MCAPFileWriterCommand, +) from py_data_acq.web_server.mcap_server import MCAPServer from hytech_np_proto_py import hytech_pb2 import concurrent.futures @@ -16,25 +20,28 @@ import logging # TODO we may want to have a config file handling to set params such as: -# - file save interval for MCAP file # - foxglove server port # - foxglove server ip -# - protobuf binary schema file location and file name # - config to inform io handler (say for different CAN baudrates) can_methods = { - "debug": [UdpMulticastBus.DEFAULT_GROUP_IPv4, 'udp_multicast'], - "local_can_usb_KV": [0, 'kvaser'], - "local_debug": ["vcan0", 'socketcan'] + "debug": [UdpMulticastBus.DEFAULT_GROUP_IPv4, "udp_multicast"], + "local_can_usb_KV": [0, "kvaser"], + "local_debug": ["vcan0", "socketcan"], } + + def find_can_interface(): """Find a CAN interface by checking /sys/class/net/.""" - for interface in os.listdir('/sys/class/net/'): - if interface.startswith('can'): + for interface in os.listdir("/sys/class/net/"): + if interface.startswith("can"): return interface return None -async def continuous_can_receiver(can_msg_decoder: cantools.db.Database, message_classes, queue, q2, can_bus): + +async def continuous_can_receiver( + can_msg_decoder: cantools.db.Database, message_classes, queue, q2, can_bus +): loop = asyncio.get_event_loop() reader = can.AsyncBufferedReader() notifier = can.Notifier(can_bus, [reader], loop=loop) @@ -44,13 +51,17 @@ async def continuous_can_receiver(can_msg_decoder: cantools.db.Database, message msg = await reader.get_message() # print("got msg") - id = msg.arbitration_id + id = msg.arbitration_id try: - decoded_msg = can_msg_decoder.decode_message(msg.arbitration_id, msg.data, decode_containers=True) + decoded_msg = can_msg_decoder.decode_message( + msg.arbitration_id, msg.data, decode_containers=True + ) # print("decoded msg") msg = can_msg_decoder.get_message_by_frame_id(msg.arbitration_id) # print("got msg by id") - msg = pb_helpers.pack_protobuf_msg(decoded_msg, msg.name.lower(), message_classes) + msg = pb_helpers.pack_protobuf_msg( + decoded_msg, msg.name.lower(), message_classes + ) # print("created pb msg successfully") data = QueueData(msg.DESCRIPTOR.name, msg) await queue.put(data) @@ -64,80 +75,111 @@ async def continuous_can_receiver(can_msg_decoder: cantools.db.Database, message notifier.stop() -async def write_data_to_mcap(queue, mcap_writer): +async def write_data_to_mcap( + writer_cmd_queue: asyncio.Queue, + writer_status_queue: asyncio.Queue, + data_queue: asyncio.Queue, + mcap_writer: HTPBMcapWriter, + write_on_init: bool, +): + writing = write_on_init async with mcap_writer as mcw: while True: - await mcw.write_data(queue) + response_needed = False + if not writer_cmd_queue.empty(): + cmd_msg = writer_cmd_queue.get_nowait() + writing = cmd_msg.writing + response_needed = True + + if writing: + if response_needed: + await mcw.open_new_writer() + await writer_status_queue.put(MCAPServerStatusQueueData(True, mcw.actual_path)) + await mcw.write_data(data_queue) + else: + if response_needed: + await writer_status_queue.put(MCAPServerStatusQueueData(False, mcw.actual_path)) + await mcw.close_writer() + # still keep getting the msgs from queue so it doesnt fill up + trash_msg = await data_queue.get() + async def fxglv_websocket_consume_data(queue, foxglove_server): async with foxglove_server as fz: while True: await fz.send_msgs_from_queue(queue) + async def run(logger): - - # for example, we will have CAN as our only input as of right now but we may need to add in + # for example, we will have CAN as our only input as of right now but we may need to add in # a sensor that inputs over UART or ethernet - can_interface = find_can_interface() - + if can_interface: print(f"Found CAN interface: {can_interface}") try: # Attempt to initialize the CAN bus - bus = can.interface.Bus(channel=can_interface, bustype='socketcan') + bus = can.interface.Bus(channel=can_interface, bustype="socketcan") print(f"Successfully initialized CAN bus on {can_interface}") # Interface exists and bus is initialized, but this doesn't ensure the interface is 'up' except can.CanError as e: print(f"Failed to initialize CAN bus on {can_interface}: {e}") else: - print("defaulting to using virtual can interface vcan0") - bus = can.Bus(channel=UdpMulticastBus.DEFAULT_GROUP_IPv6, interface='udp_multicast') + bus = can.Bus( + channel=UdpMulticastBus.DEFAULT_GROUP_IPv6, interface="udp_multicast" + ) queue = asyncio.Queue() queue2 = asyncio.Queue() path_to_bin = "" path_to_dbc = "" - + if len(sys.argv) > 2: path_to_bin = sys.argv[1] path_to_dbc = sys.argv[2] else: - path_to_bin = os.environ.get('BIN_PATH') - path_to_dbc = os.environ.get('DBC_PATH') + path_to_bin = os.environ.get("BIN_PATH") + path_to_dbc = os.environ.get("DBC_PATH") full_path = os.path.join(path_to_bin, "hytech.bin") full_path_to_dbc = os.path.join(path_to_dbc, "hytech.dbc") db = cantools.db.load_file(full_path_to_dbc) - list_of_msg_names, msg_pb_classes = pb_helpers.get_msg_names_and_classes() - fx_s = HTProtobufFoxgloveServer("0.0.0.0", 8765, "asdf", full_path, list_of_msg_names) + fx_s = HTProtobufFoxgloveServer( + "0.0.0.0", 8765, "hytech-foxglove", full_path, list_of_msg_names + ) path_to_mcap = "." - if(os.path.exists('/etc/nixos')): + if os.path.exists("/etc/nixos"): logger.info("detected running on nixos") path_to_mcap = "/home/nixos/recordings" - - mcap_writer = HTPBMcapWriter(path_to_mcap, list_of_msg_names, True) - mcap_server = MCAPServer(mcap_writer=mcap_writer, path=path_to_mcap) - receiver_task = asyncio.create_task(continuous_can_receiver(db, msg_pb_classes, queue, queue2, bus)) + + init_writing_on_start = False + mcap_writer_status_queue = asyncio.Queue(maxsize=1) + mcap_writer_cmd_queue = asyncio.Queue(maxsize=1) + mcap_writer = HTPBMcapWriter(path_to_mcap, init_writing_on_start) + mcap_web_server = MCAPServer( + writer_command_queue=mcap_writer_cmd_queue, + writer_status_queue=mcap_writer_status_queue, + init_writing=init_writing_on_start, + init_filename=mcap_writer.actual_path + ) + receiver_task = asyncio.create_task( + continuous_can_receiver(db, msg_pb_classes, queue, queue2, bus) + ) fx_task = asyncio.create_task(fxglv_websocket_consume_data(queue, fx_s)) - mcap_task = asyncio.create_task(write_data_to_mcap(queue2, mcap_writer)) - srv_task = asyncio.create_task(mcap_server.start_server()) + mcap_task = asyncio.create_task(write_data_to_mcap(mcap_writer_cmd_queue, mcap_writer_status_queue, queue2, mcap_writer, init_writing_on_start)) + srv_task = asyncio.create_task(mcap_web_server.start_server()) logger.info("created tasks") # in the mcap task I actually have to deserialize the any protobuf msg into the message ID and # the encoded message for the message id. I will need to handle the same association of message id - # and schema in the foxglove websocket server. - - await asyncio.gather(receiver_task, fx_task, mcap_task, srv_task) - # await asyncio.gather(receiver_task, fx_task, mcap_task) + # and schema in the foxglove websocket server. - # await asyncio.gather(receiver_task, mcap_task, srv_task) - # await asyncio.gather(receiver_task) + await asyncio.gather(receiver_task, fx_task, mcap_task, srv_task) if __name__ == "__main__": logging.basicConfig() - logger = logging.getLogger('data_writer_service') + logger = logging.getLogger("data_writer_service") logger.setLevel(logging.INFO) - asyncio.run(run(logger)) \ No newline at end of file + asyncio.run(run(logger))