diff --git a/src/electron/frontend/core/components/utils/progress.js b/src/electron/frontend/core/components/utils/progress.js index ea0678d12d..b938de34af 100644 --- a/src/electron/frontend/core/components/utils/progress.js +++ b/src/electron/frontend/core/components/utils/progress.js @@ -88,7 +88,7 @@ export const createProgressPopup = async (options, tqdmCallback) => { return { ...commonReturnValue, id, close }; }; -const eventsURL = new URL("/neuroconv/events", baseUrl).href; +const progressEventsUrl = new URL("/neuroconv/events/progress", baseUrl).href; class ProgressHandler { constructor(props) { @@ -112,4 +112,4 @@ class ProgressHandler { removeEventListener = (...args) => this.source.removeEventListener(...args); } -export const progressHandler = new ProgressHandler({ url: eventsURL }); +export const progressHandler = new ProgressHandler({ url: progressEventsUrl }); diff --git a/src/pyflask/manageNeuroconv/__init__.py b/src/pyflask/manageNeuroconv/__init__.py index f9f55e0826..25136e1328 100644 --- a/src/pyflask/manageNeuroconv/__init__.py +++ b/src/pyflask/manageNeuroconv/__init__.py @@ -12,8 +12,9 @@ inspect_multiple_filesystem_objects, inspect_nwb_file, inspect_nwb_folder, - listen_to_neuroconv_events, + listen_to_neuroconv_progress_events, locate_data, + progress_handler, upload_folder_to_dandi, upload_multiple_filesystem_objects_to_dandi, upload_project_to_dandi, diff --git a/src/pyflask/manageNeuroconv/info/__init__.py b/src/pyflask/manageNeuroconv/info/__init__.py index 3a29439d24..74e4eb6ef9 100644 --- a/src/pyflask/manageNeuroconv/info/__init__.py +++ b/src/pyflask/manageNeuroconv/info/__init__.py @@ -1,4 +1,4 @@ -from .sse import announcer, format_sse +from .sse import format_sse from .urls import ( CONVERSION_SAVE_FOLDER_PATH, GUIDE_ROOT_FOLDER, diff --git a/src/pyflask/manageNeuroconv/info/sse.py b/src/pyflask/manageNeuroconv/info/sse.py index b9593cba59..55ee0f7ebf 100644 --- a/src/pyflask/manageNeuroconv/info/sse.py +++ b/src/pyflask/manageNeuroconv/info/sse.py @@ -1,5 +1,4 @@ import json -import queue def format_sse(data: str, event=None) -> str: @@ -7,24 +6,3 @@ def format_sse(data: str, event=None) -> str: if event is not None: msg = f"event: {event}\n{msg}" return msg - - -class MessageAnnouncer: - def __init__(self): - self.listeners = [] - - def listen(self): - q = queue.Queue(maxsize=5) - self.listeners.append(q) - return q - - def announce(self, msg, event=None): - msg = format_sse(msg, event) - for i in reversed(range(len(self.listeners))): - try: - self.listeners[i].put_nowait(msg) - except queue.Full: - del self.listeners[i] - - -announcer = MessageAnnouncer() diff --git a/src/pyflask/manageNeuroconv/manage_neuroconv.py b/src/pyflask/manageNeuroconv/manage_neuroconv.py index dedabd6836..ab3ba7dc24 100644 --- a/src/pyflask/manageNeuroconv/manage_neuroconv.py +++ b/src/pyflask/manageNeuroconv/manage_neuroconv.py @@ -11,12 +11,12 @@ from shutil import copytree, rmtree from typing import Any, Dict, List, Optional, Union -from .info import ( - CONVERSION_SAVE_FOLDER_PATH, - GUIDE_ROOT_FOLDER, - STUB_SAVE_FOLDER_PATH, - announcer, -) +from tqdm_publisher import TQDMProgressHandler + +from .info import CONVERSION_SAVE_FOLDER_PATH, GUIDE_ROOT_FOLDER, STUB_SAVE_FOLDER_PATH +from .info.sse import format_sse + +progress_handler = TQDMProgressHandler() EXCLUDED_RECORDING_INTERFACE_PROPERTIES = ["contact_vector", "contact_shapes", "group", "location"] @@ -727,15 +727,12 @@ def convert_to_nwb(info: dict) -> str: converter = instantiate_custom_converter(resolved_source_data, info["interfaces"]) - def update_conversion_progress(**kwargs): - announcer.announce(dict(**kwargs, nwbfile_path=nwbfile_path), "conversion_progress") - # Assume all interfaces have the same conversion options for now available_options = converter.get_conversion_options_schema() options = ( { interface: ( - {"stub_test": info["stub_test"]} # , "iter_opts": {"report_hook": update_conversion_progress}} + {"stub_test": info["stub_test"]} if available_options.get("properties").get(interface).get("properties", {}).get("stub_test") else {} ) @@ -914,11 +911,11 @@ def upload_project_to_dandi( # Create an events endpoint -def listen_to_neuroconv_events(): - messages = announcer.listen() # returns a queue.Queue +def listen_to_neuroconv_progress_events(): + messages = progress_handler.listen() # returns a queue.Queue while True: msg = messages.get() # blocks until a new message arrives - yield msg + yield format_sse(msg) def generate_dataset(input_path: str, output_path: str) -> dict: @@ -1041,9 +1038,7 @@ def inspect_all(url, config): nwbfile_paths = list(Path(path).rglob("*.nwb")) - request_id = config.get("request_id") - if request_id: - config.pop("request_id") + request_id = config.pop("request_id", None) n_jobs = config.get("n_jobs", -2) # Default to all but one CPU n_jobs = calculate_number_of_cpu(requested_cpu=n_jobs) @@ -1068,7 +1063,7 @@ def inspect_all(url, config): # Announce directly def on_progress_update(message): message["progress_bar_id"] = request_id # Ensure request_id matches - announcer.announce( + progress_handler.announce( dict( request_id=request_id, **message, @@ -1086,7 +1081,6 @@ def on_progress_update(message): i = 0 for future in inspection_iterable: i += 1 - # on_progress_update(dict(progress_bar_id=request_id, format_dict=dict(total=len(futures), n=i))) for message in future.result(): messages.append(message) @@ -1096,25 +1090,15 @@ def on_progress_update(message): def inspect_nwb_folder(url, payload) -> dict: from pickle import PicklingError - from nwbinspector import load_config from nwbinspector.inspector_tools import format_messages, get_report_header from nwbinspector.nwbinspector import InspectorOutputJSONEncoder - kwargs = dict( - ignore=[ - "check_description", - "check_data_orientation", - ], # TODO: remove when metadata control is exposed - config=load_config(filepath_or_keyword="dandi"), - **payload, - ) - try: - messages = inspect_all(url, kwargs) + messages = inspect_all(url, payload) except PicklingError as exception: if "attribute lookup auto_parse_some_output on nwbinspector.register_checks failed" in str(exception): - del kwargs["n_jobs"] - messages = inspect_all(url, kwargs) + del payload["n_jobs"] + messages = inspect_all(url, payload) else: raise exception except Exception as exception: diff --git a/src/pyflask/namespaces/neuroconv.py b/src/pyflask/namespaces/neuroconv.py index e3383fde3b..f298a6a13a 100644 --- a/src/pyflask/namespaces/neuroconv.py +++ b/src/pyflask/namespaces/neuroconv.py @@ -13,14 +13,14 @@ inspect_multiple_filesystem_objects, inspect_nwb_file, inspect_nwb_folder, - listen_to_neuroconv_events, + listen_to_neuroconv_progress_events, locate_data, + progress_handler, upload_folder_to_dandi, upload_multiple_filesystem_objects_to_dandi, upload_project_to_dandi, validate_metadata, ) -from manageNeuroconv.info import announcer neuroconv_namespace = Namespace("neuroconv", description="Neuroconv neuroconv_namespace for the NWB GUIDE.") @@ -158,17 +158,16 @@ def post(self): class InspectNWBFolder(Resource): @neuroconv_namespace.doc(responses={200: "Success", 400: "Bad Request", 500: "Internal server error"}) def post(self): - url = f"{request.url_root}neuroconv/announce" + url = f"{request.url_root}neuroconv/announce/progress" return inspect_nwb_folder(url, neuroconv_namespace.payload) -@neuroconv_namespace.route("/announce") +@neuroconv_namespace.route("/announce/progress") class InspectNWBFolder(Resource): @neuroconv_namespace.doc(responses={200: "Success", 400: "Bad Request", 500: "Internal server error"}) def post(self): data = neuroconv_namespace.payload - announcer.announce(data) - + progress_handler.announce(data) return True @@ -178,7 +177,7 @@ class InspectNWBFolder(Resource): def post(self): from os.path import isfile - url = f"{request.url_root}neuroconv/announce" + url = f"{request.url_root}neuroconv/announce/progress" paths = neuroconv_namespace.payload["paths"] @@ -207,9 +206,8 @@ def post(self): # Create an events endpoint -# announcer.announce('test', 'publish') -@neuroconv_namespace.route("/events", methods=["GET"]) -class Events(Resource): +@neuroconv_namespace.route("/events/progress", methods=["GET"]) +class ProgressEvents(Resource): @neuroconv_namespace.doc(responses={200: "Success", 400: "Bad Request", 500: "Internal server error"}) def get(self): - return Response(listen_to_neuroconv_events(), mimetype="text/event-stream") + return Response(listen_to_neuroconv_progress_events(), mimetype="text/event-stream")