From 5969b4d3408ed09927fbec25b9b3d818f92c3ec6 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Sun, 7 Apr 2024 21:15:08 -0400 Subject: [PATCH 01/21] add docstrings and annotations --- src/tqdm_publisher/__init__.py | 2 +- .../_demos/_parallel_bars/_server.py | 36 ++++++++-------- src/tqdm_publisher/_handler.py | 41 +++++++++++++------ src/tqdm_publisher/_publisher.py | 12 +++--- src/tqdm_publisher/_subscriber.py | 22 +++++++--- 5 files changed, 71 insertions(+), 42 deletions(-) diff --git a/src/tqdm_publisher/__init__.py b/src/tqdm_publisher/__init__.py index 37c74a8..0d28ffd 100644 --- a/src/tqdm_publisher/__init__.py +++ b/src/tqdm_publisher/__init__.py @@ -2,4 +2,4 @@ from ._publisher import TQDMPublisher from ._subscriber import TQDMProgressSubscriber -__all__ = ["TQDMPublisher", "TQDMProgressSubscriber", "TQDMProgressHandler"] +__all__ = ["TQDMProgressPublisher", "TQDMProgressSubscriber", "TQDMProgressHandler"] diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 0ab5190..b0ed787 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -13,7 +13,7 @@ from flask import Flask, Response, jsonify, request from flask_cors import CORS, cross_origin -from tqdm_publisher import TQDMProgressHandler, TQDMPublisher +from tqdm_publisher import TQDMProgressHandler, TQDMProgressPublisher from tqdm_publisher._demos._parallel_bars._client import ( create_http_server, find_free_port, @@ -24,20 +24,20 @@ # Each outer entry is a list of 'tasks' to perform on a particular worker # For demonstration purposes, each in the list of tasks is the length of time in seconds # that each iteration of the task takes to run and update the progress bar (emulated by sleeping) -SECONDS_PER_TASK = 1 +BASE_SECONDS_PER_TASK = 0.5 # The base time for each task; actual time increases proportional to the index of the task NUMBER_OF_TASKS_PER_JOB = 6 TASK_TIMES: List[List[float]] = [ - [SECONDS_PER_TASK * task_index] * task_index for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) + [BASE_SECONDS_PER_TASK * task_index] * NUMBER_OF_TASKS_PER_JOB for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) ] WEBSOCKETS = {} -## NOTE: TQDMProgressHandler cannot be called from a process...so we just use a queue directly +## NOTE: TQDMProgressHandler cannot be called from a process...so we just use a global reference exposed to each subprocess progress_handler = TQDMProgressHandler() -def forward_updates_over_sse(request_id, id, n, total, **kwargs): - progress_handler._announce(dict(request_id=request_id, id=id, format_dict=dict(n=n, total=total))) +def forward_updates_over_server_side_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): + progress_handler._announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs)) class ThreadedHTTPServer: @@ -107,17 +107,17 @@ def _run_sleep_tasks_in_subprocess( time.sleep(sleep_time) -def run_parallel_processes(request_id, url: str): +def run_parallel_processes(all_task_times: List[List[float]], request_id: str, url: str): futures = list() with ProcessPoolExecutor(max_workers=N_JOBS) as executor: # # Assign the parallel jobs - for iteration_index, task_times in enumerate(TASK_TIMES): + for iteration_index, task_times_per_job in enumerate(all_task_times): futures.append( executor.submit( _run_sleep_tasks_in_subprocess, - task_times=task_times, + task_times=task_times_per_job, iteration_index=iteration_index, request_id=request_id, url=url, @@ -129,21 +129,23 @@ def run_parallel_processes(request_id, url: str): iterable=total_tasks_iterable, total=len(TASK_TIMES), desc="Total tasks completed" ) + # The 'total' progress bar bas an ID equivalent to the request ID total_tasks_progress_bar.subscribe( lambda format_dict: forward_to_http_server( url=url, request_id=request_id, progress_bar_id=request_id, **format_dict ) ) + # Trigger the deployment of the parallel jobs for _ in total_tasks_progress_bar: pass -def format_sse(data: str, event=None) -> str: - msg = f"data: {json.dumps(data)}\n\n" +def format_sse(data: str, event: Union[str, None]=None) -> str: + message = f"data: {json.dumps(data)}\n\n" if event is not None: - msg = f"event: {event}\n{msg}" - return msg + message = f"event: {event}\n{message}" + return message def listen_to_events(): @@ -197,8 +199,8 @@ async def start_server(port): # await asyncio.Future() # DEMO TWO: Queue - def update_queue(request_id, id, n, total, **kwargs): - forward_updates_over_sse(request_id, id, n, total) + def update_queue(request_id: str, progress_bar_id : str, n: int, total: int, **kwargs): + forward_updates_over_server_side_events(request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total) http_server = ThreadedHTTPServer(port=PORT, callback=update_queue) http_server.start() @@ -207,8 +209,8 @@ def update_queue(request_id, id, n, total, **kwargs): def run_parallel_bar_demo() -> None: - """Asynchronously start the servers""" - asyncio.run(start_server(PORT)) + """Asynchronously start the servers.""" + asyncio.run(start_server(port=PORT)) if __name__ == "__main__": diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index d72555e..a5958ca 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -1,27 +1,42 @@ import queue +from typing import List, Iterable, Any, Dict from ._subscriber import TQDMProgressSubscriber class TQDMProgressHandler: def __init__(self): - self.listeners = [] + self.listeners: List[queue.Queue] = [] - def listen(self): - q = queue.Queue(maxsize=25) - self.listeners.append(q) - return q + def listen(self) -> queue.Queue + new_queue = queue.Queue(maxsize=25) + self.listeners.append(new_queue) + return new_queue + + def create_progress_subscriber(self, iterable: Iterable[Any], additional_metadata: dict = dict(), **tqdm_kwargs) -> TQDMProgressSubscriber: + + def on_progress_update(progress_update: dict): + """ + This is the injection called on every update of the progress bar. + + It triggers the announcement event over all listeners on each update of the progress bar. + """ + self._announce(message=dict(**progress_update, **additional_metadata)) - def create(self, iterable, additional_metadata: dict = dict(), **tqdm_kwargs): return TQDMProgressSubscriber( - iterable, - lambda progress_update: self._announce(dict(**progress_update, **additional_metadata)), - **tqdm_kwargs, + iterable=iterable, + on_progress_update=on_progress_update, + **tqdm_kwargs ) - def _announce(self, msg): - for i in reversed(range(len(self.listeners))): + def _announce(self, message: Dict[Any, Any]): + """ + Announce a message to all listeners. + + @garrett - can you describe the expected structure of this message? + """ + for listener_index in reversed(range(len(self.listeners))): try: - self.listeners[i].put_nowait(msg) + self.listeners[listener_index].put_nowait(item=message) except queue.Full: - del self.listeners[i] + del self.listeners[listener_index] diff --git a/src/tqdm_publisher/_publisher.py b/src/tqdm_publisher/_publisher.py index 81a7ee8..7c5d5ce 100644 --- a/src/tqdm_publisher/_publisher.py +++ b/src/tqdm_publisher/_publisher.py @@ -4,10 +4,10 @@ from tqdm import tqdm as base_tqdm -class TQDMPublisher(base_tqdm): +class TQDMProgressPublisher(base_tqdm): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.id = str(uuid4()) + self.progress_bar_id = str(uuid4()) self.callbacks = dict() # Override the update method to run callbacks @@ -19,7 +19,7 @@ def update(self, n: int = 1) -> Union[bool, None]: return displayed - def subscribe(self, callback: callable): + def subscribe(self, callback: callable) -> str: """ Subscribe to updates from the progress bar. @@ -37,7 +37,7 @@ def subscribe(self, callback: callable): Returns ------- - str + callback_id : str A unique identifier for the callback. This ID is a UUID string and can be used to reference the registered callback in future operations. @@ -55,7 +55,7 @@ def subscribe(self, callback: callable): self.callbacks[callback_id] = callback return callback_id - def unsubscribe(self, callback_id: str): + def unsubscribe(self, callback_id: str) -> bool: """ Unsubscribe a previously registered callback from the progress bar updates. @@ -72,7 +72,7 @@ def unsubscribe(self, callback_id: str): Returns ------- - bool + success : bool Returns True if the callback was successfully removed, or False if no callback was found with the given ID. diff --git a/src/tqdm_publisher/_subscriber.py b/src/tqdm_publisher/_subscriber.py index 289d3d6..99e8359 100644 --- a/src/tqdm_publisher/_subscriber.py +++ b/src/tqdm_publisher/_subscriber.py @@ -1,7 +1,19 @@ -from ._publisher import TQDMPublisher +from typing import Iterable, Any, Dict +from ._publisher import TQDMProgressPublisher -class TQDMProgressSubscriber(TQDMPublisher): - def __init__(self, iterable, on_progress_update: callable, **tqdm_kwargs): - super().__init__(iterable, **tqdm_kwargs) - self.subscribe(lambda format_dict: on_progress_update(dict(progress_bar_id=self.id, format_dict=format_dict))) + +class TQDMProgressSubscriber(TQDMProgressPublisher): + def __init__(self, iterable: Iterable[Any], on_progress_update: callable, **tqdm_kwargs): + super().__init__(iterable=iterable, **tqdm_kwargs) + + def run_on_progress_update(format_dict: Dict[str, Any]): + """ + This is the injection called on every update of the progress bar. + + It calls the `on_progress_update` function, which must take a dictionary + containing the progress bar ID and `format_dict`. + """ + on_progress_update(dict(progress_bar_id=self.progress_bar_id, format_dict=format_dict)) + + self.subscribe(run_on_progress_update) From 9a368cc46764491192b7f1f64a8eeb9147519ac7 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 8 Apr 2024 10:37:56 -0400 Subject: [PATCH 02/21] debug --- .../_demos/_demo_command_line_interface.py | 1 - .../_demos/_parallel_bars/_server.py | 18 +++++++++++------- src/tqdm_publisher/_handler.py | 14 ++++++-------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/src/tqdm_publisher/_demos/_demo_command_line_interface.py b/src/tqdm_publisher/_demos/_demo_command_line_interface.py index 107fe18..da27593 100644 --- a/src/tqdm_publisher/_demos/_demo_command_line_interface.py +++ b/src/tqdm_publisher/_demos/_demo_command_line_interface.py @@ -54,6 +54,5 @@ def _command_line_interface(): webbrowser.open_new_tab(f"http://localhost:{CLIENT_PORT}/{client_relative_path}") demo_info["server"]() - else: print(f"{command} is an invalid command.") diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index b0ed787..e6eb279 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -27,7 +27,8 @@ BASE_SECONDS_PER_TASK = 0.5 # The base time for each task; actual time increases proportional to the index of the task NUMBER_OF_TASKS_PER_JOB = 6 TASK_TIMES: List[List[float]] = [ - [BASE_SECONDS_PER_TASK * task_index] * NUMBER_OF_TASKS_PER_JOB for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) + [BASE_SECONDS_PER_TASK * task_index] * NUMBER_OF_TASKS_PER_JOB + for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) ] WEBSOCKETS = {} @@ -37,7 +38,9 @@ def forward_updates_over_server_side_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): - progress_handler._announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs)) + progress_handler._announce( + dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs) + ) class ThreadedHTTPServer: @@ -141,7 +144,7 @@ def run_parallel_processes(all_task_times: List[List[float]], request_id: str, u pass -def format_sse(data: str, event: Union[str, None]=None) -> str: +def format_sse(data: str, event: Union[str, None] = None) -> str: message = f"data: {json.dumps(data)}\n\n" if event is not None: message = f"event: {event}\n{message}" @@ -199,8 +202,10 @@ async def start_server(port): # await asyncio.Future() # DEMO TWO: Queue - def update_queue(request_id: str, progress_bar_id : str, n: int, total: int, **kwargs): - forward_updates_over_server_side_events(request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total) + def update_queue(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): + forward_updates_over_server_side_events( + request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total + ) http_server = ThreadedHTTPServer(port=PORT, callback=update_queue) http_server.start() @@ -213,8 +218,7 @@ def run_parallel_bar_demo() -> None: asyncio.run(start_server(port=PORT)) -if __name__ == "__main__": - +def _run_parallel_bars_demo(port_flag: bool, host_flag: bool): flags_list = sys.argv[1:] port_flag = "--port" in flags_list diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index a5958ca..7c4a3b1 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -1,5 +1,5 @@ import queue -from typing import List, Iterable, Any, Dict +from typing import Any, Dict, Iterable, List from ._subscriber import TQDMProgressSubscriber @@ -8,12 +8,14 @@ class TQDMProgressHandler: def __init__(self): self.listeners: List[queue.Queue] = [] - def listen(self) -> queue.Queue + def listen(self) -> queue.Queue: new_queue = queue.Queue(maxsize=25) self.listeners.append(new_queue) return new_queue - def create_progress_subscriber(self, iterable: Iterable[Any], additional_metadata: dict = dict(), **tqdm_kwargs) -> TQDMProgressSubscriber: + def create_progress_subscriber( + self, iterable: Iterable[Any], additional_metadata: dict = dict(), **tqdm_kwargs + ) -> TQDMProgressSubscriber: def on_progress_update(progress_update: dict): """ @@ -23,11 +25,7 @@ def on_progress_update(progress_update: dict): """ self._announce(message=dict(**progress_update, **additional_metadata)) - return TQDMProgressSubscriber( - iterable=iterable, - on_progress_update=on_progress_update, - **tqdm_kwargs - ) + return TQDMProgressSubscriber(iterable=iterable, on_progress_update=on_progress_update, **tqdm_kwargs) def _announce(self, message: Dict[Any, Any]): """ From 3d9ab79c524a7630f6737d954a04f8209140304d Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 8 Apr 2024 10:56:14 -0400 Subject: [PATCH 03/21] debugging --- src/tqdm_publisher/__init__.py | 2 +- src/tqdm_publisher/_demos/_multiple_bars/_server.py | 2 +- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 12 ++++++------ .../_demos/_parallel_bars/_server_ws.py | 8 ++++---- src/tqdm_publisher/_demos/_single_bar/_server.py | 2 +- tests/test_basic.py | 8 ++++---- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/tqdm_publisher/__init__.py b/src/tqdm_publisher/__init__.py index 0d28ffd..770ada9 100644 --- a/src/tqdm_publisher/__init__.py +++ b/src/tqdm_publisher/__init__.py @@ -1,5 +1,5 @@ from ._handler import TQDMProgressHandler -from ._publisher import TQDMPublisher +from ._publisher import TQDMProgressPublisher from ._subscriber import TQDMProgressSubscriber __all__ = ["TQDMProgressPublisher", "TQDMProgressSubscriber", "TQDMProgressHandler"] diff --git a/src/tqdm_publisher/_demos/_multiple_bars/_server.py b/src/tqdm_publisher/_demos/_multiple_bars/_server.py index ca4a6bf..d81b912 100644 --- a/src/tqdm_publisher/_demos/_multiple_bars/_server.py +++ b/src/tqdm_publisher/_demos/_multiple_bars/_server.py @@ -44,7 +44,7 @@ def run(self): showcase of an alternative approach to defining and scoping the execution. """ all_task_durations_in_seconds = [1.0 for _ in range(10)] # Ten seconds at one task per second - self.progress_bar = tqdm_publisher.TQDMPublisher(iterable=all_task_durations_in_seconds) + self.progress_bar = tqdm_publisher.TQDMProgressPublisher(iterable=all_task_durations_in_seconds) self.progress_bar.subscribe(callback=self.update) for task_duration in self.progress_bar: diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index e6eb279..142b593 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -7,7 +7,7 @@ import time import uuid from concurrent.futures import ProcessPoolExecutor, as_completed -from typing import List +from typing import List, Union import requests from flask import Flask, Response, jsonify, request @@ -93,7 +93,7 @@ def _run_sleep_tasks_in_subprocess( subprogress_bar_id = uuid.uuid4() - sub_progress_bar = TQDMPublisher( + sub_progress_bar = TQDMProgressPublisher( iterable=task_times, position=iteration_index + 1, desc=f"Progress on iteration {iteration_index} ({id})", @@ -110,7 +110,7 @@ def _run_sleep_tasks_in_subprocess( time.sleep(sleep_time) -def run_parallel_processes(all_task_times: List[List[float]], request_id: str, url: str): +def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str, url: str): futures = list() with ProcessPoolExecutor(max_workers=N_JOBS) as executor: @@ -128,7 +128,7 @@ def run_parallel_processes(all_task_times: List[List[float]], request_id: str, u ) total_tasks_iterable = as_completed(futures) - total_tasks_progress_bar = TQDMPublisher( + total_tasks_progress_bar = TQDMProgressPublisher( iterable=total_tasks_iterable, total=len(TASK_TIMES), desc="Total tasks completed" ) @@ -169,7 +169,7 @@ def listen_to_events(): def start(): data = json.loads(request.data) if request.data else {} request_id = data["request_id"] - run_parallel_processes(request_id, f"http://localhost:{PORT}") + run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=f"http://localhost:{PORT}") return jsonify({"status": "success"}) @@ -241,4 +241,4 @@ def _run_parallel_bars_demo(port_flag: bool, host_flag: bool): # Just run the parallel processes request_id = uuid.uuid4() - run_parallel_processes(request_id, URL) + run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py index 088a8c7..e63dcda 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py @@ -12,7 +12,7 @@ import requests import websockets -from tqdm_publisher import TQDMProgressHandler, TQDMPublisher +from tqdm_publisher import TQDMProgressHandler, TQDMProgressPublisher from tqdm_publisher._demos._parallel_bars._client import ( create_http_server, find_free_port, @@ -119,7 +119,7 @@ def _run_sleep_tasks_in_subprocess( id = uuid.uuid4() - sub_progress_bar = TQDMPublisher( + sub_progress_bar = TQDMProgressPublisher( iterable=task_times, position=iteration_index + 1, desc=f"Progress on iteration {iteration_index} ({id})", @@ -132,7 +132,7 @@ def _run_sleep_tasks_in_subprocess( time.sleep(sleep_time) -def run_parallel_processes(request_id, url: str): +def run_parallel_processes(*, request_id: str, url: str): with ProcessPoolExecutor(max_workers=N_JOBS) as executor: @@ -222,4 +222,4 @@ def run_parallel_bar_demo() -> None: # Just run the parallel processes request_id = uuid.uuid4() - run_parallel_processes(request_id, URL) + run_parallel_processes(request_id=request_id, url=URL) diff --git a/src/tqdm_publisher/_demos/_single_bar/_server.py b/src/tqdm_publisher/_demos/_single_bar/_server.py index 32025fb..6bca796 100644 --- a/src/tqdm_publisher/_demos/_single_bar/_server.py +++ b/src/tqdm_publisher/_demos/_single_bar/_server.py @@ -15,7 +15,7 @@ def start_progress_bar(*, progress_callback: callable) -> None: Defaults are chosen for a deterministic and regular update period of one second for a total time of 10 seconds. """ all_task_durations_in_seconds = [1.0 for _ in range(10)] # Ten seconds at one second per task - progress_bar = tqdm_publisher.TQDMPublisher(iterable=all_task_durations_in_seconds) + progress_bar = tqdm_publisher.TQDMProgressPublisher(iterable=all_task_durations_in_seconds) def run_function_on_progress_update(format_dict: dict) -> None: """ diff --git a/tests/test_basic.py b/tests/test_basic.py index 5c2b10f..a511080 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -2,12 +2,12 @@ import pytest -from tqdm_publisher import TQDMPublisher +from tqdm_publisher import TQDMProgressPublisher from tqdm_publisher.testing import create_tasks def test_initialization(): - publisher = TQDMPublisher() + publisher = TQDMProgressPublisher() assert len(publisher.callbacks) == 0 @@ -27,7 +27,7 @@ def test_callback(identifier, data): assert "n" in data and "total" in data tasks = create_tasks() - publisher = TQDMPublisher(asyncio.as_completed(tasks), total=len(tasks)) + publisher = TQDMProgressPublisher(asyncio.as_completed(tasks), total=len(tasks)) n_subscriptions = 10 for i in range(n_subscriptions): @@ -51,7 +51,7 @@ def dummy_callback(data): pass tasks = [] - publisher = TQDMPublisher(asyncio.as_completed(tasks), total=len(tasks)) + publisher = TQDMProgressPublisher(asyncio.as_completed(tasks), total=len(tasks)) callback_id = publisher.subscribe(dummy_callback) result = publisher.unsubscribe(callback_id) assert result == True From 0724613099cfc84bfafddd5b300f74ad09cb83be Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 8 Apr 2024 11:17:31 -0400 Subject: [PATCH 04/21] swap from try/except; enhance verbosity --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 5 +++-- src/tqdm_publisher/_handler.py | 9 ++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 142b593..fe77702 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -25,7 +25,7 @@ # For demonstration purposes, each in the list of tasks is the length of time in seconds # that each iteration of the task takes to run and update the progress bar (emulated by sleeping) BASE_SECONDS_PER_TASK = 0.5 # The base time for each task; actual time increases proportional to the index of the task -NUMBER_OF_TASKS_PER_JOB = 6 +NUMBER_OF_TASKS_PER_JOB = 10 TASK_TIMES: List[List[float]] = [ [BASE_SECONDS_PER_TASK * task_index] * NUMBER_OF_TASKS_PER_JOB for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) @@ -38,6 +38,7 @@ def forward_updates_over_server_side_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): + # TODO: shouldn't this line use `create_progress_subscriber`? Otherwise consider making `.accounce` non-private progress_handler._announce( dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs) ) @@ -86,7 +87,7 @@ def _run_sleep_tasks_in_subprocess( The index of this task in the list of all tasks from the buffer map. Each index would map to a different tqdm position. request_id : int - Identifier of ??. + Identifier of the request. url : str The localhost URL to sent progress updates to. """ diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index 7c4a3b1..c4aca23 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -33,8 +33,11 @@ def _announce(self, message: Dict[Any, Any]): @garrett - can you describe the expected structure of this message? """ - for listener_index in reversed(range(len(self.listeners))): - try: + number_of_listeners = len(self.listeners) + listener_indices = range(number_of_listeners) + listener_indices_from_newest_to_oldest = reversed(listener_indices) + for listener_index in listener_indices_from_newest_to_oldest: + if not self.listeners[listener_index].full(): self.listeners[listener_index].put_nowait(item=message) - except queue.Full: + else: # When full, remove the newest listener in the stack del self.listeners[listener_index] From 14e2a55a7da9f071769756df596a21e052b542ea Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 8 Apr 2024 11:20:43 -0400 Subject: [PATCH 05/21] explain locality --- src/tqdm_publisher/_handler.py | 3 +++ src/tqdm_publisher/_subscriber.py | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index c4aca23..5276353 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -22,6 +22,9 @@ def on_progress_update(progress_update: dict): This is the injection called on every update of the progress bar. It triggers the announcement event over all listeners on each update of the progress bar. + + It must be defined inside this local scope to communicate the `additional_metadata` from the level above + without including it in the method signature. """ self._announce(message=dict(**progress_update, **additional_metadata)) diff --git a/src/tqdm_publisher/_subscriber.py b/src/tqdm_publisher/_subscriber.py index 99e8359..e5715b3 100644 --- a/src/tqdm_publisher/_subscriber.py +++ b/src/tqdm_publisher/_subscriber.py @@ -1,4 +1,4 @@ -from typing import Iterable, Any, Dict +from typing import Any, Dict, Iterable from ._publisher import TQDMProgressPublisher @@ -13,6 +13,9 @@ def run_on_progress_update(format_dict: Dict[str, Any]): It calls the `on_progress_update` function, which must take a dictionary containing the progress bar ID and `format_dict`. + + It must be defined inside this local scope to include the `.progress_bar_id` attribute from the level above + without including it in the method signature. """ on_progress_update(dict(progress_bar_id=self.progress_bar_id, format_dict=format_dict)) From babf1acb137706e387beb7e1d3bfd5c63d82a956 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 8 Apr 2024 11:27:11 -0400 Subject: [PATCH 06/21] simplify main runner --- .../_demos/_parallel_bars/_server.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index fe77702..ec514fd 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -219,7 +219,14 @@ def run_parallel_bar_demo() -> None: asyncio.run(start_server(port=PORT)) -def _run_parallel_bars_demo(port_flag: bool, host_flag: bool): +def _run_parallel_bars_demo(port: str, host: str): + URL = f"http://{HOST}:{PORT}" + + request_id = uuid.uuid4() + run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL) + + +if __name__ == "main": flags_list = sys.argv[1:] port_flag = "--port" in flags_list @@ -235,11 +242,4 @@ def _run_parallel_bars_demo(port_flag: bool, host_flag: bool): else: HOST = "localhost" - URL = f"http://{HOST}:{PORT}" if port_flag else None - - if URL is None: - raise ValueError("URL is not defined.") - - # Just run the parallel processes - request_id = uuid.uuid4() - run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL) + _run_parallel_bars_demo(port=PORT, host=HOST) From 07d598411c14becf9beafbf722d75344c39088eb Mon Sep 17 00:00:00 2001 From: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com> Date: Mon, 8 Apr 2024 11:30:14 -0400 Subject: [PATCH 07/21] Update src/tqdm_publisher/_demos/_parallel_bars/_server.py --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index ec514fd..0ba62a6 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -220,7 +220,7 @@ def run_parallel_bar_demo() -> None: def _run_parallel_bars_demo(port: str, host: str): - URL = f"http://{HOST}:{PORT}" + URL = f"http://{host}:{port}" request_id = uuid.uuid4() run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL) From d4be7329ea58c1ad6670333bd9f59558b4e6416f Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Mon, 8 Apr 2024 12:19:33 -0700 Subject: [PATCH 08/21] GF updates --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 2 +- src/tqdm_publisher/_handler.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 0ba62a6..f7cb129 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -87,7 +87,7 @@ def _run_sleep_tasks_in_subprocess( The index of this task in the list of all tasks from the buffer map. Each index would map to a different tqdm position. request_id : int - Identifier of the request. + Identifier of the request, provided by the client. url : str The localhost URL to sent progress updates to. """ diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index 5276353..d59926e 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -34,6 +34,10 @@ def _announce(self, message: Dict[Any, Any]): """ Announce a message to all listeners. + This message can be any dictionary. But, when used internally, is + expected to contain the progress_bar_id and format_dict of the TQDMProgressSubscriber class, + as well as any additional metadata supplied by the create_progress_subscriber method. + @garrett - can you describe the expected structure of this message? """ number_of_listeners = len(self.listeners) From 1d90a0e9df73c7447363e85d239a3124452194ab Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 8 Apr 2024 19:19:40 +0000 Subject: [PATCH 09/21] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index d59926e..4782859 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -34,7 +34,7 @@ def _announce(self, message: Dict[Any, Any]): """ Announce a message to all listeners. - This message can be any dictionary. But, when used internally, is + This message can be any dictionary. But, when used internally, is expected to contain the progress_bar_id and format_dict of the TQDMProgressSubscriber class, as well as any additional metadata supplied by the create_progress_subscriber method. From 44dd1694561a0dbcf560d906e47ecefdd2de960c Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Mon, 8 Apr 2024 12:20:07 -0700 Subject: [PATCH 10/21] Update _handler.py --- src/tqdm_publisher/_handler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index d59926e..041d3a0 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -35,10 +35,9 @@ def _announce(self, message: Dict[Any, Any]): Announce a message to all listeners. This message can be any dictionary. But, when used internally, is - expected to contain the progress_bar_id and format_dict of the TQDMProgressSubscriber class, + expected to contain the progress_bar_id and format_dict of the TQDMProgressSubscriber update function, as well as any additional metadata supplied by the create_progress_subscriber method. - @garrett - can you describe the expected structure of this message? """ number_of_listeners = len(self.listeners) listener_indices = range(number_of_listeners) From b6b5ea5d433b16461de1b49e8ed6ec396e0b95e6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 8 Apr 2024 19:20:29 +0000 Subject: [PATCH 11/21] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index 041d3a0..b0919b5 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -34,7 +34,7 @@ def _announce(self, message: Dict[Any, Any]): """ Announce a message to all listeners. - This message can be any dictionary. But, when used internally, is + This message can be any dictionary. But, when used internally, is expected to contain the progress_bar_id and format_dict of the TQDMProgressSubscriber update function, as well as any additional metadata supplied by the create_progress_subscriber method. From 920ad34a99444dc3a36a615d9c82a2bdc1be59f2 Mon Sep 17 00:00:00 2001 From: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com> Date: Mon, 8 Apr 2024 22:26:47 -0400 Subject: [PATCH 12/21] add docs and simplify logic --- .../_demos/_parallel_bars/_server.py | 49 +++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index f7cb129..272b7bf 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -145,11 +145,52 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str pass -def format_sse(data: str, event: Union[str, None] = None) -> str: - message = f"data: {json.dumps(data)}\n\n" +def format_server_side_events(*, data: str, event: Union[str, None] = None) -> str: + """ + Format multiple `data` and `event` type server-side events in a way the frontend expects. + + With reference to the following demonstration of frontend elements. + + ```javascript + const server_side_event = new EventSource("/api/v1/sse"); + + /* + * This will listen only for events + * similar to the following: + * + * event: notice + * data: useful data + * id: someid + */ + server_side_event.addEventListener("notice", (event) => { + console.log(event.data); + }); + + /* + * Similarly, this will listen for events + * with the field `event: update` + */ + server_side_event.addEventListener("update", (event) => { + console.log(event.data); + }); + + /* + * The event "message" is a special case, as it + * will capture events without an event field + * as well as events that have the specific type + * `event: message` It will not trigger on any + * other event type. + */ + server_side_event.addEventListener("message", (event) => { + console.log(event.data); + }); + ``` + """ + all_events = "" if event is not None: - message = f"event: {event}\n{message}" - return message + all_events += f"event: {event}\n" + all_events += f"data: {data}\n\n" + return all_events def listen_to_events(): From 54f4950c1cf3b685f0966f008e92ec16adc96c66 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 9 Apr 2024 02:27:40 +0000 Subject: [PATCH 13/21] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 272b7bf..ec52a6a 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -165,7 +165,7 @@ def format_server_side_events(*, data: str, event: Union[str, None] = None) -> s server_side_event.addEventListener("notice", (event) => { console.log(event.data); }); - + /* * Similarly, this will listen for events * with the field `event: update` @@ -173,7 +173,7 @@ def format_server_side_events(*, data: str, event: Union[str, None] = None) -> s server_side_event.addEventListener("update", (event) => { console.log(event.data); }); - + /* * The event "message" is a special case, as it * will capture events without an event field From b501b3e93ceb667340f52d02b7f81d09db31186c Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Tue, 9 Apr 2024 07:44:25 -0700 Subject: [PATCH 14/21] Update _server.py --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index ec52a6a..6260842 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -145,9 +145,9 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str pass -def format_server_side_events(*, data: str, event: Union[str, None] = None) -> str: +def format_server_side_events(*, data: str, event: Union[str, None] = "message") -> str: """ - Format multiple `data` and `event` type server-side events in a way the frontend expects. + Format multiple `data` and `event` type server-side events in a way expected by Server-Sent Events. With reference to the following demonstration of frontend elements. @@ -186,11 +186,11 @@ def format_server_side_events(*, data: str, event: Union[str, None] = None) -> s }); ``` """ - all_events = "" + message = "" if event is not None: - all_events += f"event: {event}\n" - all_events += f"data: {data}\n\n" - return all_events + message += f"event: {event}\n" + message += f"data: {data}\n\n" + return message def listen_to_events(): From c474dfa02fb8e7f2df01e9144443c911a6ec2349 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Tue, 9 Apr 2024 07:45:04 -0700 Subject: [PATCH 15/21] Update _server.py --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 6260842..d98bd34 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -145,7 +145,7 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str pass -def format_server_side_events(*, data: str, event: Union[str, None] = "message") -> str: +def format_server_side_events(*, data: str, event: str = "message") -> str: """ Format multiple `data` and `event` type server-side events in a way expected by Server-Sent Events. From 8058bdb7ceb2c60580e3a78c4c8e86e9f1dd2c5b Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Tue, 9 Apr 2024 07:47:36 -0700 Subject: [PATCH 16/21] Update _server.py --- .../_demos/_parallel_bars/_server.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index d98bd34..d86e405 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -37,7 +37,7 @@ progress_handler = TQDMProgressHandler() -def forward_updates_over_server_side_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): +def forward_updates_over_server_sent_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): # TODO: shouldn't this line use `create_progress_subscriber`? Otherwise consider making `.accounce` non-private progress_handler._announce( dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs) @@ -145,14 +145,14 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str pass -def format_server_side_events(*, data: str, event: str = "message") -> str: +def format_server_sent_events(*, data: str, event: str = "message") -> str: """ - Format multiple `data` and `event` type server-side events in a way expected by Server-Sent Events. + Format multiple `data` and `event` type server-sent events in a way expected by the EventSource browser implementation. With reference to the following demonstration of frontend elements. ```javascript - const server_side_event = new EventSource("/api/v1/sse"); + const server_sent_event = new EventSource("/api/v1/sse"); /* * This will listen only for events @@ -162,7 +162,7 @@ def format_server_side_events(*, data: str, event: str = "message") -> str: * data: useful data * id: someid */ - server_side_event.addEventListener("notice", (event) => { + server_sent_event.addEventListener("notice", (event) => { console.log(event.data); }); @@ -170,7 +170,7 @@ def format_server_side_events(*, data: str, event: str = "message") -> str: * Similarly, this will listen for events * with the field `event: update` */ - server_side_event.addEventListener("update", (event) => { + server_sent_event.addEventListener("update", (event) => { console.log(event.data); }); @@ -181,7 +181,7 @@ def format_server_side_events(*, data: str, event: str = "message") -> str: * `event: message` It will not trigger on any * other event type. */ - server_side_event.addEventListener("message", (event) => { + server_sent_event.addEventListener("message", (event) => { console.log(event.data); }); ``` @@ -197,7 +197,7 @@ def listen_to_events(): messages = progress_handler.listen() # returns a queue.Queue while True: msg = messages.get() # blocks until a new message arrives - yield format_sse(msg) + yield format_server_sent_events(msg) app = Flask(__name__) @@ -245,7 +245,7 @@ async def start_server(port): # DEMO TWO: Queue def update_queue(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): - forward_updates_over_server_side_events( + forward_updates_over_server_sent_events( request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total ) From 91857a30f5c4c8f8e8d8e25af2ddf3b0968f85be Mon Sep 17 00:00:00 2001 From: Cody Baker Date: Wed, 10 Apr 2024 11:04:01 -0400 Subject: [PATCH 17/21] change to public method and enhance docstring --- pyproject.toml | 7 +++-- .../_demos/_parallel_bars/_server.py | 28 +++++++++++++------ src/tqdm_publisher/_handler.py | 4 +-- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9f735ab..0aa5d2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,9 +44,10 @@ test = [ ] demo = [ - "websockets==12.0", - "flask==2.3.2", - "flask-cors==4.0.0" + "requests", + "websockets", + "flask", + "flask-cors" ] [project.urls] diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index d86e405..6c8597e 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -39,7 +39,7 @@ def forward_updates_over_server_sent_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): # TODO: shouldn't this line use `create_progress_subscriber`? Otherwise consider making `.accounce` non-private - progress_handler._announce( + progress_handler.announce( dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs) ) @@ -145,9 +145,9 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str pass -def format_server_sent_events(*, data: str, event: str = "message") -> str: +def format_server_sent_events(*, message_data: str, event_type: str = "message") -> str: """ - Format multiple `data` and `event` type server-sent events in a way expected by the EventSource browser implementation. + Format a `data` and `event` type server-sent events in a way expected by the EventSource browser implementation. With reference to the following demonstration of frontend elements. @@ -185,19 +185,29 @@ def format_server_sent_events(*, data: str, event: str = "message") -> str: console.log(event.data); }); ``` + + Parameters + ---------- + message_data : str + The message data to be sent to the client. + event_type : str, default="message" + The type of event corresponding to the message data. + + Returns + ------- + formatted_message : str + The formatted message to be sent to the client. """ - message = "" - if event is not None: - message += f"event: {event}\n" - message += f"data: {data}\n\n" + message = f"event: {event_type}\n" if event_type is not None else "" + message += f"data: {message_data}\n\n" return message def listen_to_events(): messages = progress_handler.listen() # returns a queue.Queue while True: - msg = messages.get() # blocks until a new message arrives - yield format_server_sent_events(msg) + message_data = messages.get() # blocks until a new message arrives + yield format_server_sent_events(message_data=message_data) app = Flask(__name__) diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index b0919b5..9cabad4 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -26,11 +26,11 @@ def on_progress_update(progress_update: dict): It must be defined inside this local scope to communicate the `additional_metadata` from the level above without including it in the method signature. """ - self._announce(message=dict(**progress_update, **additional_metadata)) + self.announce(message=dict(**progress_update, **additional_metadata)) return TQDMProgressSubscriber(iterable=iterable, on_progress_update=on_progress_update, **tqdm_kwargs) - def _announce(self, message: Dict[Any, Any]): + def announce(self, message: Dict[Any, Any]): """ Announce a message to all listeners. From 81e34adb696c53766bcc751285a95b393eb928bd Mon Sep 17 00:00:00 2001 From: Cody Baker Date: Wed, 10 Apr 2024 11:22:22 -0400 Subject: [PATCH 18/21] simplify logic --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 6c8597e..194cd66 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -198,7 +198,7 @@ def format_server_sent_events(*, message_data: str, event_type: str = "message") formatted_message : str The formatted message to be sent to the client. """ - message = f"event: {event_type}\n" if event_type is not None else "" + message = f"event: {event_type}\n" if event_type != "" else "" message += f"data: {message_data}\n\n" return message From 2443e876bd104be5d4c189bf620db3ae80920265 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 10 Apr 2024 09:05:45 -0700 Subject: [PATCH 19/21] Fix id references --- .../_demos/_parallel_bars/_client.js | 4 +- .../_demos/_parallel_bars/_server.py | 25 +++--- .../_demos/_parallel_bars/_server_ws.py | 87 +++++++++++-------- .../_demos/_single_bar/_server.py | 2 +- 4 files changed, 64 insertions(+), 54 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_client.js b/src/tqdm_publisher/_demos/_parallel_bars/_client.js index 43bbcf9..aa46df6 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_client.js +++ b/src/tqdm_publisher/_demos/_parallel_bars/_client.js @@ -41,8 +41,8 @@ const getBar = (request_id, id) => { // Update the specified progress bar when a message is received from the server const onProgressUpdate = (event) => { - const { request_id, id, format_dict } = JSON.parse(event.data); - const bar = getBar(request_id, id); + const { request_id, progress_bar_id, format_dict } = JSON.parse(event.data); + const bar = getBar(request_id, progress_bar_id); bar.style.width = 100 * (format_dict.n / format_dict.total) + '%'; } diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 194cd66..4fb88d7 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -31,14 +31,11 @@ for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) ] -WEBSOCKETS = {} - -## NOTE: TQDMProgressHandler cannot be called from a process...so we just use a global reference exposed to each subprocess +## TQDMProgressHandler cannot be called from a process...so we just use a global reference exposed to each subprocess progress_handler = TQDMProgressHandler() def forward_updates_over_server_sent_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): - # TODO: shouldn't this line use `create_progress_subscriber`? Otherwise consider making `.accounce` non-private progress_handler.announce( dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs) ) @@ -130,7 +127,7 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str total_tasks_iterable = as_completed(futures) total_tasks_progress_bar = TQDMProgressPublisher( - iterable=total_tasks_iterable, total=len(TASK_TIMES), desc="Total tasks completed" + iterable=total_tasks_iterable, total=len(all_task_times), desc="Total tasks completed" ) # The 'total' progress bar bas an ID equivalent to the request ID @@ -198,8 +195,14 @@ def format_server_sent_events(*, message_data: str, event_type: str = "message") formatted_message : str The formatted message to be sent to the client. """ - message = f"event: {event_type}\n" if event_type != "" else "" - message += f"data: {message_data}\n\n" + + # message = f"event: {event_type}\n" if event_type != "" else "" + # message += f"data: {message_data}\n\n" + # return message + + message = f"data: {message_data}\n\n" + if event_type != "": + message = f"event: {event_type}\n{message}" return message @@ -207,7 +210,8 @@ def listen_to_events(): messages = progress_handler.listen() # returns a queue.Queue while True: message_data = messages.get() # blocks until a new message arrives - yield format_server_sent_events(message_data=message_data) + print("Message data", message_data) + yield format_server_sent_events(message_data=json.dumps(message_data)) app = Flask(__name__) @@ -248,12 +252,7 @@ async def start_server(port): flask_server = ThreadedFlaskServer(port=3768) flask_server.start() - # # DEMO ONE: Direct updates from HTTP server - # http_server = ThreadedHTTPServer(port=port, callback=forward_updates_over_sse) - # http_server.start() - # await asyncio.Future() - # DEMO TWO: Queue def update_queue(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): forward_updates_over_server_sent_events( request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py index e63dcda..8f50126 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py @@ -6,7 +6,7 @@ import threading import time import uuid -from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import ProcessPoolExecutor, as_completed from typing import List import requests @@ -20,24 +20,25 @@ N_JOBS = 3 +N_JOBS = 3 + # Each outer entry is a list of 'tasks' to perform on a particular worker # For demonstration purposes, each in the list of tasks is the length of time in seconds # that each iteration of the task takes to run and update the progress bar (emulated by sleeping) +BASE_SECONDS_PER_TASK = 0.5 # The base time for each task; actual time increases proportional to the index of the task +NUMBER_OF_TASKS_PER_JOB = 10 TASK_TIMES: List[List[float]] = [ - [0.1 for _ in range(100)], - [0.2 for _ in range(100)], - [0.3 for _ in range(10)], - [0.4 for _ in range(10)], - [0.5 for _ in range(10)], + [BASE_SECONDS_PER_TASK * task_index] * NUMBER_OF_TASKS_PER_JOB + for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) ] WEBSOCKETS = {} -## NOTE: TQDMProgressHandler cannot be called from a process...so we just use a queue directly +## TQDMProgressHandler cannot be called from a process...so we just use a queue directly progress_handler = TQDMProgressHandler() -def forward_updates_over_websocket(request_id, id, n, total, **kwargs): +def forward_updates_over_websocket(request_id, progress_bar_id, n, total, **kwargs): ws = WEBSOCKETS.get(request_id) if ws: @@ -47,7 +48,7 @@ def forward_updates_over_websocket(request_id, id, n, total, **kwargs): message=json.dumps( obj=dict( format_dict=dict(n=n, total=total), - id=id, + progress_bar_id=progress_bar_id, request_id=request_id, ) ) @@ -82,20 +83,22 @@ def start(self): thread.start() -def forward_to_http_server(url: str, request_id: str, id: int, n: int, total: int, **kwargs): +def forward_to_http_server(url: str, request_id: str, progress_bar_id: int, n: int, total: int, **kwargs): """ This is the parallel callback definition. Its parameters are attributes of a tqdm instance and their values are what a typical default tqdm printout to console would contain (update step `n` out of `total` iterations). """ - json_data = json.dumps(obj=dict(request_id=request_id, id=str(id), data=dict(n=n, total=total))) + json_data = json.dumps(obj=dict(request_id=request_id, id=str(progress_bar_id), data=dict(n=n, total=total))) requests.post(url=url, data=json_data, headers={"Content-Type": "application/json"}) def _run_sleep_tasks_in_subprocess( - args, - # task_times: List[float], iteration_index: int, id: int, url: str + task_times: List[float], + iteration_index: int, + request_id: int, + url: str ): """ Run a 'task' that takes a certain amount of time to run on each worker. @@ -110,45 +113,59 @@ def _run_sleep_tasks_in_subprocess( The index of this task in the list of all tasks from the buffer map. Each index would map to a different tqdm position. id : int - Identifier of ??. + Identifier of the request, provided by the client. url : str The localhost URL to sent progress updates to. """ - task_times, iteration_index, request_id, url = args - - id = uuid.uuid4() + subprogress_bar_id = uuid.uuid4() sub_progress_bar = TQDMProgressPublisher( iterable=task_times, position=iteration_index + 1, - desc=f"Progress on iteration {iteration_index} ({id})", + desc=f"Progress on iteration {iteration_index} ({subprogress_bar_id})", leave=False, ) - sub_progress_bar.subscribe(lambda format_dict: forward_to_http_server(url, request_id, id, **format_dict)) + sub_progress_bar.subscribe(lambda format_dict: forward_to_http_server(url, request_id, subprogress_bar_id, **format_dict)) for sleep_time in sub_progress_bar: time.sleep(sleep_time) -def run_parallel_processes(*, request_id: str, url: str): +def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str, url: str): + futures = list() with ProcessPoolExecutor(max_workers=N_JOBS) as executor: # Assign the parallel jobs - job_map = executor.map( - _run_sleep_tasks_in_subprocess, - [(task_times, iteration_index, request_id, url) for iteration_index, task_times in enumerate(TASK_TIMES)], + for iteration_index, task_times_per_job in enumerate(all_task_times): + futures.append( + executor.submit( + _run_sleep_tasks_in_subprocess, + task_times=task_times_per_job, + iteration_index=iteration_index, + request_id=request_id, + url=url, + ) + ) + + total_tasks_iterable = as_completed(futures) + total_tasks_progress_bar = TQDMProgressPublisher( + iterable=total_tasks_iterable, total=len(all_task_times), desc="Total tasks completed" ) - # Send initialization for pool progress bar - forward_to_http_server(url, request_id, id=request_id, n=0, total=len(TASK_TIMES)) + # The 'total' progress bar bas an ID equivalent to the request ID + total_tasks_progress_bar.subscribe( + lambda format_dict: forward_to_http_server( + url=url, request_id=request_id, progress_bar_id=request_id, **format_dict + ) + ) - for _ in job_map: + # Trigger the deployment of the parallel jobs + for _ in total_tasks_progress_bar: pass - WEBSOCKETS = {} @@ -164,7 +181,7 @@ async def handler(url: str, websocket: websockets.WebSocketServerProtocol) -> No if message_from_client["command"] == "start": request_id = message_from_client["request_id"] WEBSOCKETS[request_id] = dict(ref=websocket, id=connection_id) - run_parallel_processes(request_id, url) + run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=url) async def spawn_server() -> None: @@ -175,15 +192,9 @@ async def spawn_server() -> None: URL = f"http://localhost:{PORT}" async with websockets.serve(ws_handler=lambda websocket: handler(URL, websocket), host="", port=3768): - - # # DEMO ONE: Direct updates from HTTP server - # http_server = ThreadedHTTPServer(port=PORT, callback=forward_updates_over_websocket) - # http_server.start() - # await asyncio.Future() - - # DEMO TWO: Queue - def update_queue(request_id, id, n, total, **kwargs): - progress_handler._announce(dict(request_id=request_id, id=id, n=n, total=total)) + + def update_queue(request_id, progress_bar_id, n, total, **kwargs): + progress_handler.announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total)) http_server = ThreadedHTTPServer(port=PORT, callback=update_queue) http_server.start() @@ -222,4 +233,4 @@ def run_parallel_bar_demo() -> None: # Just run the parallel processes request_id = uuid.uuid4() - run_parallel_processes(request_id=request_id, url=URL) + run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL) diff --git a/src/tqdm_publisher/_demos/_single_bar/_server.py b/src/tqdm_publisher/_demos/_single_bar/_server.py index 6bca796..2e4fd03 100644 --- a/src/tqdm_publisher/_demos/_single_bar/_server.py +++ b/src/tqdm_publisher/_demos/_single_bar/_server.py @@ -29,7 +29,7 @@ def run_function_on_progress_update(format_dict: dict) -> None: This specifically requires the `id` of the progress bar and the `format_dict` of the TQDM instance. """ - progress_callback(format_dict=format_dict, progress_bar_id=progress_bar.id) + progress_callback(format_dict=format_dict, progress_bar_id=progress_bar.progress_bar_id) progress_bar.subscribe(callback=run_function_on_progress_update) From 0106b5874c1f2ae9618855bcfd8016947ceb8f70 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 10 Apr 2024 09:05:48 -0700 Subject: [PATCH 20/21] Update _server.py --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 4fb88d7..390bff7 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -144,7 +144,7 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str def format_server_sent_events(*, message_data: str, event_type: str = "message") -> str: """ - Format a `data` and `event` type server-sent events in a way expected by the EventSource browser implementation. + Format an `event_type` type server-sent event with `data` in a way expected by the EventSource browser implementation. With reference to the following demonstration of frontend elements. From 92402352a3de2cc84dfa523f65ef34739dcb0841 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 10 Apr 2024 16:05:54 +0000 Subject: [PATCH 21/21] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../_demos/_parallel_bars/_server.py | 1 - .../_demos/_parallel_bars/_server_ws.py | 14 ++++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 390bff7..34da542 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -252,7 +252,6 @@ async def start_server(port): flask_server = ThreadedFlaskServer(port=3768) flask_server.start() - def update_queue(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): forward_updates_over_server_sent_events( request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py index 8f50126..6f301cf 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py @@ -94,12 +94,7 @@ def forward_to_http_server(url: str, request_id: str, progress_bar_id: int, n: i requests.post(url=url, data=json_data, headers={"Content-Type": "application/json"}) -def _run_sleep_tasks_in_subprocess( - task_times: List[float], - iteration_index: int, - request_id: int, - url: str -): +def _run_sleep_tasks_in_subprocess(task_times: List[float], iteration_index: int, request_id: int, url: str): """ Run a 'task' that takes a certain amount of time to run on each worker. @@ -127,7 +122,9 @@ def _run_sleep_tasks_in_subprocess( leave=False, ) - sub_progress_bar.subscribe(lambda format_dict: forward_to_http_server(url, request_id, subprogress_bar_id, **format_dict)) + sub_progress_bar.subscribe( + lambda format_dict: forward_to_http_server(url, request_id, subprogress_bar_id, **format_dict) + ) for sleep_time in sub_progress_bar: time.sleep(sleep_time) @@ -166,6 +163,7 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str for _ in total_tasks_progress_bar: pass + WEBSOCKETS = {} @@ -192,7 +190,7 @@ async def spawn_server() -> None: URL = f"http://localhost:{PORT}" async with websockets.serve(ws_handler=lambda websocket: handler(URL, websocket), host="", port=3768): - + def update_queue(request_id, progress_bar_id, n, total, **kwargs): progress_handler.announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total))