Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inspect progress bars with a global handler #695

Merged
merged 15 commits into from
May 30, 2024
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions src/pyflask/manageNeuroconv/manage_neuroconv.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
from shutil import copytree, rmtree
from typing import Any, Dict, List, Optional, Union

from .info import (
CONVERSION_SAVE_FOLDER_PATH,
GUIDE_ROOT_FOLDER,
STUB_SAVE_FOLDER_PATH,
announcer,
)
from tqdm_publisher import TQDMProgressHandler

from .info import CONVERSION_SAVE_FOLDER_PATH, GUIDE_ROOT_FOLDER, STUB_SAVE_FOLDER_PATH
from .info.sse import format_sse

progress_handler = TQDMProgressHandler()

EXCLUDED_RECORDING_INTERFACE_PROPERTIES = ["contact_vector", "contact_shapes", "group", "location"]

Expand Down Expand Up @@ -727,9 +727,6 @@ def convert_to_nwb(info: dict) -> str:

converter = instantiate_custom_converter(resolved_source_data, info["interfaces"])

def update_conversion_progress(**kwargs):
announcer.announce(dict(**kwargs, nwbfile_path=nwbfile_path), "conversion_progress")

# Assume all interfaces have the same conversion options for now
available_options = converter.get_conversion_options_schema()
options = (
Expand Down Expand Up @@ -915,10 +912,10 @@ def upload_project_to_dandi(

# Create an events endpoint
def listen_to_neuroconv_events():
messages = announcer.listen() # returns a queue.Queue
messages = progress_handler.listen() # returns a queue.Queue
while True:
msg = messages.get() # blocks until a new message arrives
yield msg
yield format_sse(msg)


def generate_dataset(input_path: str, output_path: str) -> dict:
Expand Down Expand Up @@ -1041,9 +1038,7 @@ def inspect_all(url, config):

nwbfile_paths = list(Path(path).rglob("*.nwb"))

request_id = config.get("request_id")
if request_id:
config.pop("request_id")
request_id = config.pop("request_id", None)

n_jobs = config.get("n_jobs", -2) # Default to all but one CPU
n_jobs = calculate_number_of_cpu(requested_cpu=n_jobs)
Expand All @@ -1068,7 +1063,7 @@ def inspect_all(url, config):
# Announce directly
def on_progress_update(message):
message["progress_bar_id"] = request_id # Ensure request_id matches
announcer.announce(
progress_handler.announce(
dict(
request_id=request_id,
**message,
Expand Down Expand Up @@ -1096,11 +1091,16 @@ def on_progress_update(message):
def inspect_nwb_folder(url, payload) -> dict:
from pickle import PicklingError

request_id = payload.get("request_id", None)

from nwbinspector import load_config
from nwbinspector.inspector_tools import format_messages, get_report_header
from nwbinspector.nwbinspector import InspectorOutputJSONEncoder

kwargs = dict(
n_jobs=-2, # uses number of CPU - 1
garrettmflynn marked this conversation as resolved.
Show resolved Hide resolved
progress_bar_class=progress_handler.create_progress_subscriber,
progress_bar_options=dict(additional_metadata=dict(request_id=request_id)),
ignore=[
"check_description",
"check_data_orientation",
Expand Down
Loading