From 129e37e7d69194a1568b615cca36083339e036b5 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 20 Mar 2024 11:30:03 -0700 Subject: [PATCH 01/10] Use a tqdm_publisher handler for managing queue --- pyflask/manageNeuroconv/manage_neuroconv.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/pyflask/manageNeuroconv/manage_neuroconv.py b/pyflask/manageNeuroconv/manage_neuroconv.py index c270a9f768..0c825f7858 100644 --- a/pyflask/manageNeuroconv/manage_neuroconv.py +++ b/pyflask/manageNeuroconv/manage_neuroconv.py @@ -13,10 +13,11 @@ from pathlib import Path from typing import Any, Dict, List, Optional -from sse import MessageAnnouncer +from sse import format_sse from .info import GUIDE_ROOT_FOLDER, STUB_SAVE_FOLDER_PATH, CONVERSION_SAVE_FOLDER_PATH -announcer = MessageAnnouncer() +from tqdm_publisher import TQDMProgressHandler +progress_handler = TQDMProgressHandler() EXCLUDED_RECORDING_INTERFACE_PROPERTIES = ["contact_vector", "contact_shapes", "group", "location"] EXTRA_RECORDING_INTERFACE_PROPERTIES = { @@ -547,9 +548,6 @@ def convert_to_nwb(info: dict) -> str: converter = instantiate_custom_converter(resolved_source_data, info["interfaces"]) - def update_conversion_progress(**kwargs): - announcer.announce(dict(**kwargs, nwbfile_path=nwbfile_path), "conversion_progress") - # Assume all interfaces have the same conversion options for now available_options = converter.get_conversion_options_schema() options = ( @@ -685,10 +683,10 @@ def upload_project_to_dandi( # Create an events endpoint def listen_to_neuroconv_events(): - messages = announcer.listen() # returns a queue.Queue + messages = progress_handler.listen() # returns a queue.Queue while True: msg = messages.get() # blocks until a new message arrives - yield msg + yield format_sse(msg) def generate_dataset(input_path: str, output_path: str): @@ -765,17 +763,17 @@ def inspect_nwb_folder(payload): from nwbinspector.nwbinspector import InspectorOutputJSONEncoder from pickle import PicklingError - from tqdm_publisher import TQDMProgressSubscriber - request_id = payload.get("request_id") if request_id: payload.pop("request_id") kwargs = dict( n_jobs=-2, # uses number of CPU - 1 - progress_bar_class=TQDMProgressSubscriber, + progress_bar_class=progress_handler.create, progress_bar_options=dict( - on_progress_update=lambda message: announcer.announce(dict(request_id=request_id, **message)) + additional_metadata = dict( + request_id = request_id + ) ), ignore=[ "check_description", From 309727948ac8fe222a73ab7c9bda92902682edd9 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 20 Mar 2024 18:34:56 +0000 Subject: [PATCH 02/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pyflask/manageNeuroconv/manage_neuroconv.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pyflask/manageNeuroconv/manage_neuroconv.py b/pyflask/manageNeuroconv/manage_neuroconv.py index 0c825f7858..971a0e4836 100644 --- a/pyflask/manageNeuroconv/manage_neuroconv.py +++ b/pyflask/manageNeuroconv/manage_neuroconv.py @@ -17,6 +17,7 @@ from .info import GUIDE_ROOT_FOLDER, STUB_SAVE_FOLDER_PATH, CONVERSION_SAVE_FOLDER_PATH from tqdm_publisher import TQDMProgressHandler + progress_handler = TQDMProgressHandler() EXCLUDED_RECORDING_INTERFACE_PROPERTIES = ["contact_vector", "contact_shapes", "group", "location"] @@ -770,11 +771,7 @@ def inspect_nwb_folder(payload): kwargs = dict( n_jobs=-2, # uses number of CPU - 1 progress_bar_class=progress_handler.create, - progress_bar_options=dict( - additional_metadata = dict( - request_id = request_id - ) - ), + progress_bar_options=dict(additional_metadata=dict(request_id=request_id)), ignore=[ "check_description", "check_data_orientation", From 00b5aa4de6e7345fed9c625ce4db02afe39ff9c8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 29 May 2024 15:11:10 +0000 Subject: [PATCH 03/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/pyflask/manageNeuroconv/manage_neuroconv.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/pyflask/manageNeuroconv/manage_neuroconv.py b/src/pyflask/manageNeuroconv/manage_neuroconv.py index cf2ef2c42e..07793cfa4c 100644 --- a/src/pyflask/manageNeuroconv/manage_neuroconv.py +++ b/src/pyflask/manageNeuroconv/manage_neuroconv.py @@ -11,11 +11,11 @@ from shutil import copytree, rmtree from typing import Any, Dict, List, Optional, Union -from .info.sse import format_sse -from .info import GUIDE_ROOT_FOLDER, STUB_SAVE_FOLDER_PATH, CONVERSION_SAVE_FOLDER_PATH - from tqdm_publisher import TQDMProgressHandler +from .info import CONVERSION_SAVE_FOLDER_PATH, GUIDE_ROOT_FOLDER, STUB_SAVE_FOLDER_PATH +from .info.sse import format_sse + progress_handler = TQDMProgressHandler() EXCLUDED_RECORDING_INTERFACE_PROPERTIES = ["contact_vector", "contact_shapes", "group", "location"] From 4f5a29499601231a8a2122e8b594c75177ac1644 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 29 May 2024 11:14:30 -0700 Subject: [PATCH 04/10] Update manage_neuroconv.py --- src/pyflask/manageNeuroconv/manage_neuroconv.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/pyflask/manageNeuroconv/manage_neuroconv.py b/src/pyflask/manageNeuroconv/manage_neuroconv.py index 07793cfa4c..2a70dd7a5b 100644 --- a/src/pyflask/manageNeuroconv/manage_neuroconv.py +++ b/src/pyflask/manageNeuroconv/manage_neuroconv.py @@ -1094,11 +1094,18 @@ def inspect_nwb_folder(url, payload) -> dict: request_id = payload.get("request_id", None) from nwbinspector import load_config + from nwbinspector.utils import calculate_number_of_cpu from nwbinspector.inspector_tools import format_messages, get_report_header from nwbinspector.nwbinspector import InspectorOutputJSONEncoder + # Prepare n_jobs + n_jobs = payload.pop("n_jobs", -2) # Default to all but one CPU + n_jobs = calculate_number_of_cpu(requested_cpu=n_jobs) + n_jobs = None if n_jobs == -1 else n_jobs + + # Organize keyword arguments kwargs = dict( - n_jobs=-2, # uses number of CPU - 1 + n_jobs, progress_bar_class=progress_handler.create_progress_subscriber, progress_bar_options=dict(additional_metadata=dict(request_id=request_id)), ignore=[ @@ -1109,6 +1116,7 @@ def inspect_nwb_folder(url, payload) -> dict: **payload, ) + try: messages = inspect_all(url, kwargs) except PicklingError as exception: From ed1bb89364ce47dc1cc88b32a235e9f809942984 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 29 May 2024 18:14:49 +0000 Subject: [PATCH 05/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/pyflask/manageNeuroconv/manage_neuroconv.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/pyflask/manageNeuroconv/manage_neuroconv.py b/src/pyflask/manageNeuroconv/manage_neuroconv.py index 2a70dd7a5b..2e95f7ae7a 100644 --- a/src/pyflask/manageNeuroconv/manage_neuroconv.py +++ b/src/pyflask/manageNeuroconv/manage_neuroconv.py @@ -1094,9 +1094,9 @@ def inspect_nwb_folder(url, payload) -> dict: request_id = payload.get("request_id", None) from nwbinspector import load_config - from nwbinspector.utils import calculate_number_of_cpu from nwbinspector.inspector_tools import format_messages, get_report_header from nwbinspector.nwbinspector import InspectorOutputJSONEncoder + from nwbinspector.utils import calculate_number_of_cpu # Prepare n_jobs n_jobs = payload.pop("n_jobs", -2) # Default to all but one CPU @@ -1116,7 +1116,6 @@ def inspect_nwb_folder(url, payload) -> dict: **payload, ) - try: messages = inspect_all(url, kwargs) except PicklingError as exception: From 2d205d532c4c6a38d90ca6adab6dd7e63ae3631a Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 29 May 2024 11:15:29 -0700 Subject: [PATCH 06/10] Update manage_neuroconv.py --- src/pyflask/manageNeuroconv/manage_neuroconv.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pyflask/manageNeuroconv/manage_neuroconv.py b/src/pyflask/manageNeuroconv/manage_neuroconv.py index 2a70dd7a5b..59d1ea6cd8 100644 --- a/src/pyflask/manageNeuroconv/manage_neuroconv.py +++ b/src/pyflask/manageNeuroconv/manage_neuroconv.py @@ -1105,7 +1105,7 @@ def inspect_nwb_folder(url, payload) -> dict: # Organize keyword arguments kwargs = dict( - n_jobs, + n_jobs=n_jobs, progress_bar_class=progress_handler.create_progress_subscriber, progress_bar_options=dict(additional_metadata=dict(request_id=request_id)), ignore=[ From cbce1c9f8a7b8d11f7b785e6b8938617364050d3 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 29 May 2024 11:20:23 -0700 Subject: [PATCH 07/10] Simplify parsing --- .../manageNeuroconv/manage_neuroconv.py | 28 ++----------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/src/pyflask/manageNeuroconv/manage_neuroconv.py b/src/pyflask/manageNeuroconv/manage_neuroconv.py index 3d92b5c7b3..cdf7c38898 100644 --- a/src/pyflask/manageNeuroconv/manage_neuroconv.py +++ b/src/pyflask/manageNeuroconv/manage_neuroconv.py @@ -1091,37 +1091,15 @@ def on_progress_update(message): def inspect_nwb_folder(url, payload) -> dict: from pickle import PicklingError - request_id = payload.get("request_id", None) - - from nwbinspector import load_config from nwbinspector.inspector_tools import format_messages, get_report_header from nwbinspector.nwbinspector import InspectorOutputJSONEncoder - from nwbinspector.utils import calculate_number_of_cpu - - # Prepare n_jobs - n_jobs = payload.pop("n_jobs", -2) # Default to all but one CPU - n_jobs = calculate_number_of_cpu(requested_cpu=n_jobs) - n_jobs = None if n_jobs == -1 else n_jobs - - # Organize keyword arguments - kwargs = dict( - n_jobs=n_jobs, - progress_bar_class=progress_handler.create_progress_subscriber, - progress_bar_options=dict(additional_metadata=dict(request_id=request_id)), - ignore=[ - "check_description", - "check_data_orientation", - ], # TODO: remove when metadata control is exposed - config=load_config(filepath_or_keyword="dandi"), - **payload, - ) try: - messages = inspect_all(url, kwargs) + messages = inspect_all(url, payload) except PicklingError as exception: if "attribute lookup auto_parse_some_output on nwbinspector.register_checks failed" in str(exception): - del kwargs["n_jobs"] - messages = inspect_all(url, kwargs) + del payload["n_jobs"] + messages = inspect_all(url, payload) else: raise exception except Exception as exception: From 45a1255bbc7a0f7bd43ef56cb8cafe076b354ea5 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 29 May 2024 11:20:30 -0700 Subject: [PATCH 08/10] Update manage_neuroconv.py --- src/pyflask/manageNeuroconv/manage_neuroconv.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pyflask/manageNeuroconv/manage_neuroconv.py b/src/pyflask/manageNeuroconv/manage_neuroconv.py index cdf7c38898..3cbf9ab0f4 100644 --- a/src/pyflask/manageNeuroconv/manage_neuroconv.py +++ b/src/pyflask/manageNeuroconv/manage_neuroconv.py @@ -1081,7 +1081,6 @@ def on_progress_update(message): i = 0 for future in inspection_iterable: i += 1 - # on_progress_update(dict(progress_bar_id=request_id, format_dict=dict(total=len(futures), n=i))) for message in future.result(): messages.append(message) From e6fbe936e4310655bc2a70be3e4390b87dbdcb92 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Thu, 30 May 2024 08:55:58 -0700 Subject: [PATCH 09/10] Fix parallel progress updates and improve naming --- .../core/components/utils/progress.js | 4 +-- src/pyflask/manageNeuroconv/__init__.py | 3 ++- src/pyflask/manageNeuroconv/info/__init__.py | 2 +- src/pyflask/manageNeuroconv/info/sse.py | 25 +------------------ .../manageNeuroconv/manage_neuroconv.py | 4 +-- src/pyflask/namespaces/neuroconv.py | 20 +++++++-------- 6 files changed, 17 insertions(+), 41 deletions(-) diff --git a/src/electron/frontend/core/components/utils/progress.js b/src/electron/frontend/core/components/utils/progress.js index ea0678d12d..b938de34af 100644 --- a/src/electron/frontend/core/components/utils/progress.js +++ b/src/electron/frontend/core/components/utils/progress.js @@ -88,7 +88,7 @@ export const createProgressPopup = async (options, tqdmCallback) => { return { ...commonReturnValue, id, close }; }; -const eventsURL = new URL("/neuroconv/events", baseUrl).href; +const progressEventsUrl = new URL("/neuroconv/events/progress", baseUrl).href; class ProgressHandler { constructor(props) { @@ -112,4 +112,4 @@ class ProgressHandler { removeEventListener = (...args) => this.source.removeEventListener(...args); } -export const progressHandler = new ProgressHandler({ url: eventsURL }); +export const progressHandler = new ProgressHandler({ url: progressEventsUrl }); diff --git a/src/pyflask/manageNeuroconv/__init__.py b/src/pyflask/manageNeuroconv/__init__.py index f9f55e0826..3f618ad797 100644 --- a/src/pyflask/manageNeuroconv/__init__.py +++ b/src/pyflask/manageNeuroconv/__init__.py @@ -12,10 +12,11 @@ inspect_multiple_filesystem_objects, inspect_nwb_file, inspect_nwb_folder, - listen_to_neuroconv_events, + listen_to_neuroconv_progress_events, locate_data, upload_folder_to_dandi, upload_multiple_filesystem_objects_to_dandi, upload_project_to_dandi, validate_metadata, + progress_handler ) diff --git a/src/pyflask/manageNeuroconv/info/__init__.py b/src/pyflask/manageNeuroconv/info/__init__.py index 3a29439d24..74e4eb6ef9 100644 --- a/src/pyflask/manageNeuroconv/info/__init__.py +++ b/src/pyflask/manageNeuroconv/info/__init__.py @@ -1,4 +1,4 @@ -from .sse import announcer, format_sse +from .sse import format_sse from .urls import ( CONVERSION_SAVE_FOLDER_PATH, GUIDE_ROOT_FOLDER, diff --git a/src/pyflask/manageNeuroconv/info/sse.py b/src/pyflask/manageNeuroconv/info/sse.py index b9593cba59..165d2a2293 100644 --- a/src/pyflask/manageNeuroconv/info/sse.py +++ b/src/pyflask/manageNeuroconv/info/sse.py @@ -1,30 +1,7 @@ import json -import queue - def format_sse(data: str, event=None) -> str: msg = f"data: {json.dumps(data)}\n\n" if event is not None: msg = f"event: {event}\n{msg}" - return msg - - -class MessageAnnouncer: - def __init__(self): - self.listeners = [] - - def listen(self): - q = queue.Queue(maxsize=5) - self.listeners.append(q) - return q - - def announce(self, msg, event=None): - msg = format_sse(msg, event) - for i in reversed(range(len(self.listeners))): - try: - self.listeners[i].put_nowait(msg) - except queue.Full: - del self.listeners[i] - - -announcer = MessageAnnouncer() + return msg \ No newline at end of file diff --git a/src/pyflask/manageNeuroconv/manage_neuroconv.py b/src/pyflask/manageNeuroconv/manage_neuroconv.py index 3cbf9ab0f4..ab3ba7dc24 100644 --- a/src/pyflask/manageNeuroconv/manage_neuroconv.py +++ b/src/pyflask/manageNeuroconv/manage_neuroconv.py @@ -732,7 +732,7 @@ def convert_to_nwb(info: dict) -> str: options = ( { interface: ( - {"stub_test": info["stub_test"]} # , "iter_opts": {"report_hook": update_conversion_progress}} + {"stub_test": info["stub_test"]} if available_options.get("properties").get(interface).get("properties", {}).get("stub_test") else {} ) @@ -911,7 +911,7 @@ def upload_project_to_dandi( # Create an events endpoint -def listen_to_neuroconv_events(): +def listen_to_neuroconv_progress_events(): messages = progress_handler.listen() # returns a queue.Queue while True: msg = messages.get() # blocks until a new message arrives diff --git a/src/pyflask/namespaces/neuroconv.py b/src/pyflask/namespaces/neuroconv.py index e3383fde3b..db2027387a 100644 --- a/src/pyflask/namespaces/neuroconv.py +++ b/src/pyflask/namespaces/neuroconv.py @@ -13,14 +13,14 @@ inspect_multiple_filesystem_objects, inspect_nwb_file, inspect_nwb_folder, - listen_to_neuroconv_events, + listen_to_neuroconv_progress_events, locate_data, upload_folder_to_dandi, upload_multiple_filesystem_objects_to_dandi, upload_project_to_dandi, validate_metadata, + progress_handler ) -from manageNeuroconv.info import announcer neuroconv_namespace = Namespace("neuroconv", description="Neuroconv neuroconv_namespace for the NWB GUIDE.") @@ -158,17 +158,16 @@ def post(self): class InspectNWBFolder(Resource): @neuroconv_namespace.doc(responses={200: "Success", 400: "Bad Request", 500: "Internal server error"}) def post(self): - url = f"{request.url_root}neuroconv/announce" + url = f"{request.url_root}neuroconv/announce/progress" return inspect_nwb_folder(url, neuroconv_namespace.payload) -@neuroconv_namespace.route("/announce") +@neuroconv_namespace.route("/announce/progress") class InspectNWBFolder(Resource): @neuroconv_namespace.doc(responses={200: "Success", 400: "Bad Request", 500: "Internal server error"}) def post(self): data = neuroconv_namespace.payload - announcer.announce(data) - + progress_handler.announce(data) return True @@ -178,7 +177,7 @@ class InspectNWBFolder(Resource): def post(self): from os.path import isfile - url = f"{request.url_root}neuroconv/announce" + url = f"{request.url_root}neuroconv/announce/progress" paths = neuroconv_namespace.payload["paths"] @@ -207,9 +206,8 @@ def post(self): # Create an events endpoint -# announcer.announce('test', 'publish') -@neuroconv_namespace.route("/events", methods=["GET"]) -class Events(Resource): +@neuroconv_namespace.route("/events/progress", methods=["GET"]) +class ProgressEvents(Resource): @neuroconv_namespace.doc(responses={200: "Success", 400: "Bad Request", 500: "Internal server error"}) def get(self): - return Response(listen_to_neuroconv_events(), mimetype="text/event-stream") + return Response(listen_to_neuroconv_progress_events(), mimetype="text/event-stream") From 1dc1bd1739dc5b8af4de1af87a6ad298aa1599fe Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 30 May 2024 15:56:15 +0000 Subject: [PATCH 10/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/pyflask/manageNeuroconv/__init__.py | 2 +- src/pyflask/manageNeuroconv/info/sse.py | 3 ++- src/pyflask/namespaces/neuroconv.py | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/pyflask/manageNeuroconv/__init__.py b/src/pyflask/manageNeuroconv/__init__.py index 3f618ad797..25136e1328 100644 --- a/src/pyflask/manageNeuroconv/__init__.py +++ b/src/pyflask/manageNeuroconv/__init__.py @@ -14,9 +14,9 @@ inspect_nwb_folder, listen_to_neuroconv_progress_events, locate_data, + progress_handler, upload_folder_to_dandi, upload_multiple_filesystem_objects_to_dandi, upload_project_to_dandi, validate_metadata, - progress_handler ) diff --git a/src/pyflask/manageNeuroconv/info/sse.py b/src/pyflask/manageNeuroconv/info/sse.py index 165d2a2293..55ee0f7ebf 100644 --- a/src/pyflask/manageNeuroconv/info/sse.py +++ b/src/pyflask/manageNeuroconv/info/sse.py @@ -1,7 +1,8 @@ import json + def format_sse(data: str, event=None) -> str: msg = f"data: {json.dumps(data)}\n\n" if event is not None: msg = f"event: {event}\n{msg}" - return msg \ No newline at end of file + return msg diff --git a/src/pyflask/namespaces/neuroconv.py b/src/pyflask/namespaces/neuroconv.py index db2027387a..f298a6a13a 100644 --- a/src/pyflask/namespaces/neuroconv.py +++ b/src/pyflask/namespaces/neuroconv.py @@ -15,11 +15,11 @@ inspect_nwb_folder, listen_to_neuroconv_progress_events, locate_data, + progress_handler, upload_folder_to_dandi, upload_multiple_filesystem_objects_to_dandi, upload_project_to_dandi, validate_metadata, - progress_handler ) neuroconv_namespace = Namespace("neuroconv", description="Neuroconv neuroconv_namespace for the NWB GUIDE.")