Skip to content

Commit

Permalink
Merge branch 'main' into time-alignment-ui
Browse files Browse the repository at this point in the history
  • Loading branch information
CodyCBakerPhD authored May 30, 2024
2 parents 5231454 + 2c15f7c commit 08b171e
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 68 deletions.
4 changes: 2 additions & 2 deletions src/electron/frontend/core/components/utils/progress.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export const createProgressPopup = async (options, tqdmCallback) => {
return { ...commonReturnValue, id, close };
};

const eventsURL = new URL("/neuroconv/events", baseUrl).href;
const progressEventsUrl = new URL("/neuroconv/events/progress", baseUrl).href;

class ProgressHandler {
constructor(props) {
Expand All @@ -112,4 +112,4 @@ class ProgressHandler {
removeEventListener = (...args) => this.source.removeEventListener(...args);
}

export const progressHandler = new ProgressHandler({ url: eventsURL });
export const progressHandler = new ProgressHandler({ url: progressEventsUrl });
3 changes: 2 additions & 1 deletion src/pyflask/manageNeuroconv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
inspect_multiple_filesystem_objects,
inspect_nwb_file,
inspect_nwb_folder,
listen_to_neuroconv_events,
listen_to_neuroconv_progress_events,
locate_data,
progress_handler,
upload_folder_to_dandi,
upload_multiple_filesystem_objects_to_dandi,
upload_project_to_dandi,
Expand Down
2 changes: 1 addition & 1 deletion src/pyflask/manageNeuroconv/info/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .sse import announcer, format_sse
from .sse import format_sse
from .urls import (
CONVERSION_SAVE_FOLDER_PATH,
GUIDE_ROOT_FOLDER,
Expand Down
22 changes: 0 additions & 22 deletions src/pyflask/manageNeuroconv/info/sse.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,8 @@
import json
import queue


def format_sse(data: str, event=None) -> str:
msg = f"data: {json.dumps(data)}\n\n"
if event is not None:
msg = f"event: {event}\n{msg}"
return msg


class MessageAnnouncer:
def __init__(self):
self.listeners = []

def listen(self):
q = queue.Queue(maxsize=5)
self.listeners.append(q)
return q

def announce(self, msg, event=None):
msg = format_sse(msg, event)
for i in reversed(range(len(self.listeners))):
try:
self.listeners[i].put_nowait(msg)
except queue.Full:
del self.listeners[i]


announcer = MessageAnnouncer()
46 changes: 15 additions & 31 deletions src/pyflask/manageNeuroconv/manage_neuroconv.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
from shutil import copytree, rmtree
from typing import Any, Dict, List, Optional, Union

from .info import (
CONVERSION_SAVE_FOLDER_PATH,
GUIDE_ROOT_FOLDER,
STUB_SAVE_FOLDER_PATH,
announcer,
)
from tqdm_publisher import TQDMProgressHandler

from .info import CONVERSION_SAVE_FOLDER_PATH, GUIDE_ROOT_FOLDER, STUB_SAVE_FOLDER_PATH
from .info.sse import format_sse

progress_handler = TQDMProgressHandler()

EXCLUDED_RECORDING_INTERFACE_PROPERTIES = ["contact_vector", "contact_shapes", "group", "location"]

Expand Down Expand Up @@ -859,15 +859,12 @@ def convert_to_nwb(info: dict) -> str:
alignment_info=info.get("alignment", dict()),
)

def update_conversion_progress(**kwargs):
announcer.announce(dict(**kwargs, nwbfile_path=nwbfile_path), "conversion_progress")

# Assume all interfaces have the same conversion options for now
available_options = converter.get_conversion_options_schema()
options = (
{
interface: (
{"stub_test": info["stub_test"]} # , "iter_opts": {"report_hook": update_conversion_progress}}
{"stub_test": info["stub_test"]}
if available_options.get("properties").get(interface).get("properties", {}).get("stub_test")
else {}
)
Expand Down Expand Up @@ -1046,11 +1043,11 @@ def upload_project_to_dandi(


# Create an events endpoint
def listen_to_neuroconv_events():
messages = announcer.listen() # returns a queue.Queue
def listen_to_neuroconv_progress_events():
messages = progress_handler.listen() # returns a queue.Queue
while True:
msg = messages.get() # blocks until a new message arrives
yield msg
yield format_sse(msg)


def generate_dataset(input_path: str, output_path: str) -> dict:
Expand Down Expand Up @@ -1173,9 +1170,7 @@ def inspect_all(url, config):

nwbfile_paths = list(Path(path).rglob("*.nwb"))

request_id = config.get("request_id")
if request_id:
config.pop("request_id")
request_id = config.pop("request_id", None)

n_jobs = config.get("n_jobs", -2) # Default to all but one CPU
n_jobs = calculate_number_of_cpu(requested_cpu=n_jobs)
Expand All @@ -1200,7 +1195,7 @@ def inspect_all(url, config):
# Announce directly
def on_progress_update(message):
message["progress_bar_id"] = request_id # Ensure request_id matches
announcer.announce(
progress_handler.announce(
dict(
request_id=request_id,
**message,
Expand All @@ -1218,7 +1213,6 @@ def on_progress_update(message):
i = 0
for future in inspection_iterable:
i += 1
# on_progress_update(dict(progress_bar_id=request_id, format_dict=dict(total=len(futures), n=i)))
for message in future.result():
messages.append(message)

Expand All @@ -1228,25 +1222,15 @@ def on_progress_update(message):
def inspect_nwb_folder(url, payload) -> dict:
from pickle import PicklingError

from nwbinspector import load_config
from nwbinspector.inspector_tools import format_messages, get_report_header
from nwbinspector.nwbinspector import InspectorOutputJSONEncoder

kwargs = dict(
ignore=[
"check_description",
"check_data_orientation",
], # TODO: remove when metadata control is exposed
config=load_config(filepath_or_keyword="dandi"),
**payload,
)

try:
messages = inspect_all(url, kwargs)
messages = inspect_all(url, payload)
except PicklingError as exception:
if "attribute lookup auto_parse_some_output on nwbinspector.register_checks failed" in str(exception):
del kwargs["n_jobs"]
messages = inspect_all(url, kwargs)
del payload["n_jobs"]
messages = inspect_all(url, payload)
else:
raise exception
except Exception as exception:
Expand Down
20 changes: 9 additions & 11 deletions src/pyflask/namespaces/neuroconv.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
inspect_multiple_filesystem_objects,
inspect_nwb_file,
inspect_nwb_folder,
listen_to_neuroconv_events,
listen_to_neuroconv_progress_events,
locate_data,
progress_handler,
upload_folder_to_dandi,
upload_multiple_filesystem_objects_to_dandi,
upload_project_to_dandi,
validate_metadata,
)
from manageNeuroconv.info import announcer

neuroconv_namespace = Namespace("neuroconv", description="Neuroconv neuroconv_namespace for the NWB GUIDE.")

Expand Down Expand Up @@ -158,17 +158,16 @@ def post(self):
class InspectNWBFolder(Resource):
@neuroconv_namespace.doc(responses={200: "Success", 400: "Bad Request", 500: "Internal server error"})
def post(self):
url = f"{request.url_root}neuroconv/announce"
url = f"{request.url_root}neuroconv/announce/progress"
return inspect_nwb_folder(url, neuroconv_namespace.payload)


@neuroconv_namespace.route("/announce")
@neuroconv_namespace.route("/announce/progress")
class InspectNWBFolder(Resource):
@neuroconv_namespace.doc(responses={200: "Success", 400: "Bad Request", 500: "Internal server error"})
def post(self):
data = neuroconv_namespace.payload
announcer.announce(data)

progress_handler.announce(data)
return True


Expand All @@ -178,7 +177,7 @@ class InspectNWBFolder(Resource):
def post(self):
from os.path import isfile

url = f"{request.url_root}neuroconv/announce"
url = f"{request.url_root}neuroconv/announce/progress"

paths = neuroconv_namespace.payload["paths"]

Expand Down Expand Up @@ -207,9 +206,8 @@ def post(self):


# Create an events endpoint
# announcer.announce('test', 'publish')
@neuroconv_namespace.route("/events", methods=["GET"])
class Events(Resource):
@neuroconv_namespace.route("/events/progress", methods=["GET"])
class ProgressEvents(Resource):
@neuroconv_namespace.doc(responses={200: "Success", 400: "Bad Request", 500: "Internal server error"})
def get(self):
return Response(listen_to_neuroconv_events(), mimetype="text/event-stream")
return Response(listen_to_neuroconv_progress_events(), mimetype="text/event-stream")

0 comments on commit 08b171e

Please sign in to comment.