From 5969b4d3408ed09927fbec25b9b3d818f92c3ec6 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Sun, 7 Apr 2024 21:15:08 -0400 Subject: [PATCH 01/34] add docstrings and annotations --- src/tqdm_publisher/__init__.py | 2 +- .../_demos/_parallel_bars/_server.py | 36 ++++++++-------- src/tqdm_publisher/_handler.py | 41 +++++++++++++------ src/tqdm_publisher/_publisher.py | 12 +++--- src/tqdm_publisher/_subscriber.py | 22 +++++++--- 5 files changed, 71 insertions(+), 42 deletions(-) diff --git a/src/tqdm_publisher/__init__.py b/src/tqdm_publisher/__init__.py index 37c74a8..0d28ffd 100644 --- a/src/tqdm_publisher/__init__.py +++ b/src/tqdm_publisher/__init__.py @@ -2,4 +2,4 @@ from ._publisher import TQDMPublisher from ._subscriber import TQDMProgressSubscriber -__all__ = ["TQDMPublisher", "TQDMProgressSubscriber", "TQDMProgressHandler"] +__all__ = ["TQDMProgressPublisher", "TQDMProgressSubscriber", "TQDMProgressHandler"] diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 0ab5190..b0ed787 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -13,7 +13,7 @@ from flask import Flask, Response, jsonify, request from flask_cors import CORS, cross_origin -from tqdm_publisher import TQDMProgressHandler, TQDMPublisher +from tqdm_publisher import TQDMProgressHandler, TQDMProgressPublisher from tqdm_publisher._demos._parallel_bars._client import ( create_http_server, find_free_port, @@ -24,20 +24,20 @@ # Each outer entry is a list of 'tasks' to perform on a particular worker # For demonstration purposes, each in the list of tasks is the length of time in seconds # that each iteration of the task takes to run and update the progress bar (emulated by sleeping) -SECONDS_PER_TASK = 1 +BASE_SECONDS_PER_TASK = 0.5 # The base time for each task; actual time increases proportional to the index of the task NUMBER_OF_TASKS_PER_JOB = 6 TASK_TIMES: List[List[float]] = [ - [SECONDS_PER_TASK * task_index] * task_index for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) + [BASE_SECONDS_PER_TASK * task_index] * NUMBER_OF_TASKS_PER_JOB for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) ] WEBSOCKETS = {} -## NOTE: TQDMProgressHandler cannot be called from a process...so we just use a queue directly +## NOTE: TQDMProgressHandler cannot be called from a process...so we just use a global reference exposed to each subprocess progress_handler = TQDMProgressHandler() -def forward_updates_over_sse(request_id, id, n, total, **kwargs): - progress_handler._announce(dict(request_id=request_id, id=id, format_dict=dict(n=n, total=total))) +def forward_updates_over_server_side_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): + progress_handler._announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs)) class ThreadedHTTPServer: @@ -107,17 +107,17 @@ def _run_sleep_tasks_in_subprocess( time.sleep(sleep_time) -def run_parallel_processes(request_id, url: str): +def run_parallel_processes(all_task_times: List[List[float]], request_id: str, url: str): futures = list() with ProcessPoolExecutor(max_workers=N_JOBS) as executor: # # Assign the parallel jobs - for iteration_index, task_times in enumerate(TASK_TIMES): + for iteration_index, task_times_per_job in enumerate(all_task_times): futures.append( executor.submit( _run_sleep_tasks_in_subprocess, - task_times=task_times, + task_times=task_times_per_job, iteration_index=iteration_index, request_id=request_id, url=url, @@ -129,21 +129,23 @@ def run_parallel_processes(request_id, url: str): iterable=total_tasks_iterable, total=len(TASK_TIMES), desc="Total tasks completed" ) + # The 'total' progress bar bas an ID equivalent to the request ID total_tasks_progress_bar.subscribe( lambda format_dict: forward_to_http_server( url=url, request_id=request_id, progress_bar_id=request_id, **format_dict ) ) + # Trigger the deployment of the parallel jobs for _ in total_tasks_progress_bar: pass -def format_sse(data: str, event=None) -> str: - msg = f"data: {json.dumps(data)}\n\n" +def format_sse(data: str, event: Union[str, None]=None) -> str: + message = f"data: {json.dumps(data)}\n\n" if event is not None: - msg = f"event: {event}\n{msg}" - return msg + message = f"event: {event}\n{message}" + return message def listen_to_events(): @@ -197,8 +199,8 @@ async def start_server(port): # await asyncio.Future() # DEMO TWO: Queue - def update_queue(request_id, id, n, total, **kwargs): - forward_updates_over_sse(request_id, id, n, total) + def update_queue(request_id: str, progress_bar_id : str, n: int, total: int, **kwargs): + forward_updates_over_server_side_events(request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total) http_server = ThreadedHTTPServer(port=PORT, callback=update_queue) http_server.start() @@ -207,8 +209,8 @@ def update_queue(request_id, id, n, total, **kwargs): def run_parallel_bar_demo() -> None: - """Asynchronously start the servers""" - asyncio.run(start_server(PORT)) + """Asynchronously start the servers.""" + asyncio.run(start_server(port=PORT)) if __name__ == "__main__": diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index d72555e..a5958ca 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -1,27 +1,42 @@ import queue +from typing import List, Iterable, Any, Dict from ._subscriber import TQDMProgressSubscriber class TQDMProgressHandler: def __init__(self): - self.listeners = [] + self.listeners: List[queue.Queue] = [] - def listen(self): - q = queue.Queue(maxsize=25) - self.listeners.append(q) - return q + def listen(self) -> queue.Queue + new_queue = queue.Queue(maxsize=25) + self.listeners.append(new_queue) + return new_queue + + def create_progress_subscriber(self, iterable: Iterable[Any], additional_metadata: dict = dict(), **tqdm_kwargs) -> TQDMProgressSubscriber: + + def on_progress_update(progress_update: dict): + """ + This is the injection called on every update of the progress bar. + + It triggers the announcement event over all listeners on each update of the progress bar. + """ + self._announce(message=dict(**progress_update, **additional_metadata)) - def create(self, iterable, additional_metadata: dict = dict(), **tqdm_kwargs): return TQDMProgressSubscriber( - iterable, - lambda progress_update: self._announce(dict(**progress_update, **additional_metadata)), - **tqdm_kwargs, + iterable=iterable, + on_progress_update=on_progress_update, + **tqdm_kwargs ) - def _announce(self, msg): - for i in reversed(range(len(self.listeners))): + def _announce(self, message: Dict[Any, Any]): + """ + Announce a message to all listeners. + + @garrett - can you describe the expected structure of this message? + """ + for listener_index in reversed(range(len(self.listeners))): try: - self.listeners[i].put_nowait(msg) + self.listeners[listener_index].put_nowait(item=message) except queue.Full: - del self.listeners[i] + del self.listeners[listener_index] diff --git a/src/tqdm_publisher/_publisher.py b/src/tqdm_publisher/_publisher.py index 81a7ee8..7c5d5ce 100644 --- a/src/tqdm_publisher/_publisher.py +++ b/src/tqdm_publisher/_publisher.py @@ -4,10 +4,10 @@ from tqdm import tqdm as base_tqdm -class TQDMPublisher(base_tqdm): +class TQDMProgressPublisher(base_tqdm): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.id = str(uuid4()) + self.progress_bar_id = str(uuid4()) self.callbacks = dict() # Override the update method to run callbacks @@ -19,7 +19,7 @@ def update(self, n: int = 1) -> Union[bool, None]: return displayed - def subscribe(self, callback: callable): + def subscribe(self, callback: callable) -> str: """ Subscribe to updates from the progress bar. @@ -37,7 +37,7 @@ def subscribe(self, callback: callable): Returns ------- - str + callback_id : str A unique identifier for the callback. This ID is a UUID string and can be used to reference the registered callback in future operations. @@ -55,7 +55,7 @@ def subscribe(self, callback: callable): self.callbacks[callback_id] = callback return callback_id - def unsubscribe(self, callback_id: str): + def unsubscribe(self, callback_id: str) -> bool: """ Unsubscribe a previously registered callback from the progress bar updates. @@ -72,7 +72,7 @@ def unsubscribe(self, callback_id: str): Returns ------- - bool + success : bool Returns True if the callback was successfully removed, or False if no callback was found with the given ID. diff --git a/src/tqdm_publisher/_subscriber.py b/src/tqdm_publisher/_subscriber.py index 289d3d6..99e8359 100644 --- a/src/tqdm_publisher/_subscriber.py +++ b/src/tqdm_publisher/_subscriber.py @@ -1,7 +1,19 @@ -from ._publisher import TQDMPublisher +from typing import Iterable, Any, Dict +from ._publisher import TQDMProgressPublisher -class TQDMProgressSubscriber(TQDMPublisher): - def __init__(self, iterable, on_progress_update: callable, **tqdm_kwargs): - super().__init__(iterable, **tqdm_kwargs) - self.subscribe(lambda format_dict: on_progress_update(dict(progress_bar_id=self.id, format_dict=format_dict))) + +class TQDMProgressSubscriber(TQDMProgressPublisher): + def __init__(self, iterable: Iterable[Any], on_progress_update: callable, **tqdm_kwargs): + super().__init__(iterable=iterable, **tqdm_kwargs) + + def run_on_progress_update(format_dict: Dict[str, Any]): + """ + This is the injection called on every update of the progress bar. + + It calls the `on_progress_update` function, which must take a dictionary + containing the progress bar ID and `format_dict`. + """ + on_progress_update(dict(progress_bar_id=self.progress_bar_id, format_dict=format_dict)) + + self.subscribe(run_on_progress_update) From 9a368cc46764491192b7f1f64a8eeb9147519ac7 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 8 Apr 2024 10:37:56 -0400 Subject: [PATCH 02/34] debug --- .../_demos/_demo_command_line_interface.py | 1 - .../_demos/_parallel_bars/_server.py | 18 +++++++++++------- src/tqdm_publisher/_handler.py | 14 ++++++-------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/src/tqdm_publisher/_demos/_demo_command_line_interface.py b/src/tqdm_publisher/_demos/_demo_command_line_interface.py index 107fe18..da27593 100644 --- a/src/tqdm_publisher/_demos/_demo_command_line_interface.py +++ b/src/tqdm_publisher/_demos/_demo_command_line_interface.py @@ -54,6 +54,5 @@ def _command_line_interface(): webbrowser.open_new_tab(f"http://localhost:{CLIENT_PORT}/{client_relative_path}") demo_info["server"]() - else: print(f"{command} is an invalid command.") diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index b0ed787..e6eb279 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -27,7 +27,8 @@ BASE_SECONDS_PER_TASK = 0.5 # The base time for each task; actual time increases proportional to the index of the task NUMBER_OF_TASKS_PER_JOB = 6 TASK_TIMES: List[List[float]] = [ - [BASE_SECONDS_PER_TASK * task_index] * NUMBER_OF_TASKS_PER_JOB for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) + [BASE_SECONDS_PER_TASK * task_index] * NUMBER_OF_TASKS_PER_JOB + for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) ] WEBSOCKETS = {} @@ -37,7 +38,9 @@ def forward_updates_over_server_side_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): - progress_handler._announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs)) + progress_handler._announce( + dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs) + ) class ThreadedHTTPServer: @@ -141,7 +144,7 @@ def run_parallel_processes(all_task_times: List[List[float]], request_id: str, u pass -def format_sse(data: str, event: Union[str, None]=None) -> str: +def format_sse(data: str, event: Union[str, None] = None) -> str: message = f"data: {json.dumps(data)}\n\n" if event is not None: message = f"event: {event}\n{message}" @@ -199,8 +202,10 @@ async def start_server(port): # await asyncio.Future() # DEMO TWO: Queue - def update_queue(request_id: str, progress_bar_id : str, n: int, total: int, **kwargs): - forward_updates_over_server_side_events(request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total) + def update_queue(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): + forward_updates_over_server_side_events( + request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total + ) http_server = ThreadedHTTPServer(port=PORT, callback=update_queue) http_server.start() @@ -213,8 +218,7 @@ def run_parallel_bar_demo() -> None: asyncio.run(start_server(port=PORT)) -if __name__ == "__main__": - +def _run_parallel_bars_demo(port_flag: bool, host_flag: bool): flags_list = sys.argv[1:] port_flag = "--port" in flags_list diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index a5958ca..7c4a3b1 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -1,5 +1,5 @@ import queue -from typing import List, Iterable, Any, Dict +from typing import Any, Dict, Iterable, List from ._subscriber import TQDMProgressSubscriber @@ -8,12 +8,14 @@ class TQDMProgressHandler: def __init__(self): self.listeners: List[queue.Queue] = [] - def listen(self) -> queue.Queue + def listen(self) -> queue.Queue: new_queue = queue.Queue(maxsize=25) self.listeners.append(new_queue) return new_queue - def create_progress_subscriber(self, iterable: Iterable[Any], additional_metadata: dict = dict(), **tqdm_kwargs) -> TQDMProgressSubscriber: + def create_progress_subscriber( + self, iterable: Iterable[Any], additional_metadata: dict = dict(), **tqdm_kwargs + ) -> TQDMProgressSubscriber: def on_progress_update(progress_update: dict): """ @@ -23,11 +25,7 @@ def on_progress_update(progress_update: dict): """ self._announce(message=dict(**progress_update, **additional_metadata)) - return TQDMProgressSubscriber( - iterable=iterable, - on_progress_update=on_progress_update, - **tqdm_kwargs - ) + return TQDMProgressSubscriber(iterable=iterable, on_progress_update=on_progress_update, **tqdm_kwargs) def _announce(self, message: Dict[Any, Any]): """ From 3d9ab79c524a7630f6737d954a04f8209140304d Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 8 Apr 2024 10:56:14 -0400 Subject: [PATCH 03/34] debugging --- src/tqdm_publisher/__init__.py | 2 +- src/tqdm_publisher/_demos/_multiple_bars/_server.py | 2 +- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 12 ++++++------ .../_demos/_parallel_bars/_server_ws.py | 8 ++++---- src/tqdm_publisher/_demos/_single_bar/_server.py | 2 +- tests/test_basic.py | 8 ++++---- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/tqdm_publisher/__init__.py b/src/tqdm_publisher/__init__.py index 0d28ffd..770ada9 100644 --- a/src/tqdm_publisher/__init__.py +++ b/src/tqdm_publisher/__init__.py @@ -1,5 +1,5 @@ from ._handler import TQDMProgressHandler -from ._publisher import TQDMPublisher +from ._publisher import TQDMProgressPublisher from ._subscriber import TQDMProgressSubscriber __all__ = ["TQDMProgressPublisher", "TQDMProgressSubscriber", "TQDMProgressHandler"] diff --git a/src/tqdm_publisher/_demos/_multiple_bars/_server.py b/src/tqdm_publisher/_demos/_multiple_bars/_server.py index ca4a6bf..d81b912 100644 --- a/src/tqdm_publisher/_demos/_multiple_bars/_server.py +++ b/src/tqdm_publisher/_demos/_multiple_bars/_server.py @@ -44,7 +44,7 @@ def run(self): showcase of an alternative approach to defining and scoping the execution. """ all_task_durations_in_seconds = [1.0 for _ in range(10)] # Ten seconds at one task per second - self.progress_bar = tqdm_publisher.TQDMPublisher(iterable=all_task_durations_in_seconds) + self.progress_bar = tqdm_publisher.TQDMProgressPublisher(iterable=all_task_durations_in_seconds) self.progress_bar.subscribe(callback=self.update) for task_duration in self.progress_bar: diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index e6eb279..142b593 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -7,7 +7,7 @@ import time import uuid from concurrent.futures import ProcessPoolExecutor, as_completed -from typing import List +from typing import List, Union import requests from flask import Flask, Response, jsonify, request @@ -93,7 +93,7 @@ def _run_sleep_tasks_in_subprocess( subprogress_bar_id = uuid.uuid4() - sub_progress_bar = TQDMPublisher( + sub_progress_bar = TQDMProgressPublisher( iterable=task_times, position=iteration_index + 1, desc=f"Progress on iteration {iteration_index} ({id})", @@ -110,7 +110,7 @@ def _run_sleep_tasks_in_subprocess( time.sleep(sleep_time) -def run_parallel_processes(all_task_times: List[List[float]], request_id: str, url: str): +def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str, url: str): futures = list() with ProcessPoolExecutor(max_workers=N_JOBS) as executor: @@ -128,7 +128,7 @@ def run_parallel_processes(all_task_times: List[List[float]], request_id: str, u ) total_tasks_iterable = as_completed(futures) - total_tasks_progress_bar = TQDMPublisher( + total_tasks_progress_bar = TQDMProgressPublisher( iterable=total_tasks_iterable, total=len(TASK_TIMES), desc="Total tasks completed" ) @@ -169,7 +169,7 @@ def listen_to_events(): def start(): data = json.loads(request.data) if request.data else {} request_id = data["request_id"] - run_parallel_processes(request_id, f"http://localhost:{PORT}") + run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=f"http://localhost:{PORT}") return jsonify({"status": "success"}) @@ -241,4 +241,4 @@ def _run_parallel_bars_demo(port_flag: bool, host_flag: bool): # Just run the parallel processes request_id = uuid.uuid4() - run_parallel_processes(request_id, URL) + run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py index 088a8c7..e63dcda 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py @@ -12,7 +12,7 @@ import requests import websockets -from tqdm_publisher import TQDMProgressHandler, TQDMPublisher +from tqdm_publisher import TQDMProgressHandler, TQDMProgressPublisher from tqdm_publisher._demos._parallel_bars._client import ( create_http_server, find_free_port, @@ -119,7 +119,7 @@ def _run_sleep_tasks_in_subprocess( id = uuid.uuid4() - sub_progress_bar = TQDMPublisher( + sub_progress_bar = TQDMProgressPublisher( iterable=task_times, position=iteration_index + 1, desc=f"Progress on iteration {iteration_index} ({id})", @@ -132,7 +132,7 @@ def _run_sleep_tasks_in_subprocess( time.sleep(sleep_time) -def run_parallel_processes(request_id, url: str): +def run_parallel_processes(*, request_id: str, url: str): with ProcessPoolExecutor(max_workers=N_JOBS) as executor: @@ -222,4 +222,4 @@ def run_parallel_bar_demo() -> None: # Just run the parallel processes request_id = uuid.uuid4() - run_parallel_processes(request_id, URL) + run_parallel_processes(request_id=request_id, url=URL) diff --git a/src/tqdm_publisher/_demos/_single_bar/_server.py b/src/tqdm_publisher/_demos/_single_bar/_server.py index 32025fb..6bca796 100644 --- a/src/tqdm_publisher/_demos/_single_bar/_server.py +++ b/src/tqdm_publisher/_demos/_single_bar/_server.py @@ -15,7 +15,7 @@ def start_progress_bar(*, progress_callback: callable) -> None: Defaults are chosen for a deterministic and regular update period of one second for a total time of 10 seconds. """ all_task_durations_in_seconds = [1.0 for _ in range(10)] # Ten seconds at one second per task - progress_bar = tqdm_publisher.TQDMPublisher(iterable=all_task_durations_in_seconds) + progress_bar = tqdm_publisher.TQDMProgressPublisher(iterable=all_task_durations_in_seconds) def run_function_on_progress_update(format_dict: dict) -> None: """ diff --git a/tests/test_basic.py b/tests/test_basic.py index 5c2b10f..a511080 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -2,12 +2,12 @@ import pytest -from tqdm_publisher import TQDMPublisher +from tqdm_publisher import TQDMProgressPublisher from tqdm_publisher.testing import create_tasks def test_initialization(): - publisher = TQDMPublisher() + publisher = TQDMProgressPublisher() assert len(publisher.callbacks) == 0 @@ -27,7 +27,7 @@ def test_callback(identifier, data): assert "n" in data and "total" in data tasks = create_tasks() - publisher = TQDMPublisher(asyncio.as_completed(tasks), total=len(tasks)) + publisher = TQDMProgressPublisher(asyncio.as_completed(tasks), total=len(tasks)) n_subscriptions = 10 for i in range(n_subscriptions): @@ -51,7 +51,7 @@ def dummy_callback(data): pass tasks = [] - publisher = TQDMPublisher(asyncio.as_completed(tasks), total=len(tasks)) + publisher = TQDMProgressPublisher(asyncio.as_completed(tasks), total=len(tasks)) callback_id = publisher.subscribe(dummy_callback) result = publisher.unsubscribe(callback_id) assert result == True From 0724613099cfc84bfafddd5b300f74ad09cb83be Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 8 Apr 2024 11:17:31 -0400 Subject: [PATCH 04/34] swap from try/except; enhance verbosity --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 5 +++-- src/tqdm_publisher/_handler.py | 9 ++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 142b593..fe77702 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -25,7 +25,7 @@ # For demonstration purposes, each in the list of tasks is the length of time in seconds # that each iteration of the task takes to run and update the progress bar (emulated by sleeping) BASE_SECONDS_PER_TASK = 0.5 # The base time for each task; actual time increases proportional to the index of the task -NUMBER_OF_TASKS_PER_JOB = 6 +NUMBER_OF_TASKS_PER_JOB = 10 TASK_TIMES: List[List[float]] = [ [BASE_SECONDS_PER_TASK * task_index] * NUMBER_OF_TASKS_PER_JOB for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) @@ -38,6 +38,7 @@ def forward_updates_over_server_side_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): + # TODO: shouldn't this line use `create_progress_subscriber`? Otherwise consider making `.accounce` non-private progress_handler._announce( dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs) ) @@ -86,7 +87,7 @@ def _run_sleep_tasks_in_subprocess( The index of this task in the list of all tasks from the buffer map. Each index would map to a different tqdm position. request_id : int - Identifier of ??. + Identifier of the request. url : str The localhost URL to sent progress updates to. """ diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index 7c4a3b1..c4aca23 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -33,8 +33,11 @@ def _announce(self, message: Dict[Any, Any]): @garrett - can you describe the expected structure of this message? """ - for listener_index in reversed(range(len(self.listeners))): - try: + number_of_listeners = len(self.listeners) + listener_indices = range(number_of_listeners) + listener_indices_from_newest_to_oldest = reversed(listener_indices) + for listener_index in listener_indices_from_newest_to_oldest: + if not self.listeners[listener_index].full(): self.listeners[listener_index].put_nowait(item=message) - except queue.Full: + else: # When full, remove the newest listener in the stack del self.listeners[listener_index] From 14e2a55a7da9f071769756df596a21e052b542ea Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 8 Apr 2024 11:20:43 -0400 Subject: [PATCH 05/34] explain locality --- src/tqdm_publisher/_handler.py | 3 +++ src/tqdm_publisher/_subscriber.py | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index c4aca23..5276353 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -22,6 +22,9 @@ def on_progress_update(progress_update: dict): This is the injection called on every update of the progress bar. It triggers the announcement event over all listeners on each update of the progress bar. + + It must be defined inside this local scope to communicate the `additional_metadata` from the level above + without including it in the method signature. """ self._announce(message=dict(**progress_update, **additional_metadata)) diff --git a/src/tqdm_publisher/_subscriber.py b/src/tqdm_publisher/_subscriber.py index 99e8359..e5715b3 100644 --- a/src/tqdm_publisher/_subscriber.py +++ b/src/tqdm_publisher/_subscriber.py @@ -1,4 +1,4 @@ -from typing import Iterable, Any, Dict +from typing import Any, Dict, Iterable from ._publisher import TQDMProgressPublisher @@ -13,6 +13,9 @@ def run_on_progress_update(format_dict: Dict[str, Any]): It calls the `on_progress_update` function, which must take a dictionary containing the progress bar ID and `format_dict`. + + It must be defined inside this local scope to include the `.progress_bar_id` attribute from the level above + without including it in the method signature. """ on_progress_update(dict(progress_bar_id=self.progress_bar_id, format_dict=format_dict)) From babf1acb137706e387beb7e1d3bfd5c63d82a956 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 8 Apr 2024 11:27:11 -0400 Subject: [PATCH 06/34] simplify main runner --- .../_demos/_parallel_bars/_server.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index fe77702..ec514fd 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -219,7 +219,14 @@ def run_parallel_bar_demo() -> None: asyncio.run(start_server(port=PORT)) -def _run_parallel_bars_demo(port_flag: bool, host_flag: bool): +def _run_parallel_bars_demo(port: str, host: str): + URL = f"http://{HOST}:{PORT}" + + request_id = uuid.uuid4() + run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL) + + +if __name__ == "main": flags_list = sys.argv[1:] port_flag = "--port" in flags_list @@ -235,11 +242,4 @@ def _run_parallel_bars_demo(port_flag: bool, host_flag: bool): else: HOST = "localhost" - URL = f"http://{HOST}:{PORT}" if port_flag else None - - if URL is None: - raise ValueError("URL is not defined.") - - # Just run the parallel processes - request_id = uuid.uuid4() - run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL) + _run_parallel_bars_demo(port=PORT, host=HOST) From 07d598411c14becf9beafbf722d75344c39088eb Mon Sep 17 00:00:00 2001 From: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com> Date: Mon, 8 Apr 2024 11:30:14 -0400 Subject: [PATCH 07/34] Update src/tqdm_publisher/_demos/_parallel_bars/_server.py --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index ec514fd..0ba62a6 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -220,7 +220,7 @@ def run_parallel_bar_demo() -> None: def _run_parallel_bars_demo(port: str, host: str): - URL = f"http://{HOST}:{PORT}" + URL = f"http://{host}:{port}" request_id = uuid.uuid4() run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL) From d4be7329ea58c1ad6670333bd9f59558b4e6416f Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Mon, 8 Apr 2024 12:19:33 -0700 Subject: [PATCH 08/34] GF updates --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 2 +- src/tqdm_publisher/_handler.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 0ba62a6..f7cb129 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -87,7 +87,7 @@ def _run_sleep_tasks_in_subprocess( The index of this task in the list of all tasks from the buffer map. Each index would map to a different tqdm position. request_id : int - Identifier of the request. + Identifier of the request, provided by the client. url : str The localhost URL to sent progress updates to. """ diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index 5276353..d59926e 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -34,6 +34,10 @@ def _announce(self, message: Dict[Any, Any]): """ Announce a message to all listeners. + This message can be any dictionary. But, when used internally, is + expected to contain the progress_bar_id and format_dict of the TQDMProgressSubscriber class, + as well as any additional metadata supplied by the create_progress_subscriber method. + @garrett - can you describe the expected structure of this message? """ number_of_listeners = len(self.listeners) From 1d90a0e9df73c7447363e85d239a3124452194ab Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 8 Apr 2024 19:19:40 +0000 Subject: [PATCH 09/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index d59926e..4782859 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -34,7 +34,7 @@ def _announce(self, message: Dict[Any, Any]): """ Announce a message to all listeners. - This message can be any dictionary. But, when used internally, is + This message can be any dictionary. But, when used internally, is expected to contain the progress_bar_id and format_dict of the TQDMProgressSubscriber class, as well as any additional metadata supplied by the create_progress_subscriber method. From 44dd1694561a0dbcf560d906e47ecefdd2de960c Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Mon, 8 Apr 2024 12:20:07 -0700 Subject: [PATCH 10/34] Update _handler.py --- src/tqdm_publisher/_handler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index d59926e..041d3a0 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -35,10 +35,9 @@ def _announce(self, message: Dict[Any, Any]): Announce a message to all listeners. This message can be any dictionary. But, when used internally, is - expected to contain the progress_bar_id and format_dict of the TQDMProgressSubscriber class, + expected to contain the progress_bar_id and format_dict of the TQDMProgressSubscriber update function, as well as any additional metadata supplied by the create_progress_subscriber method. - @garrett - can you describe the expected structure of this message? """ number_of_listeners = len(self.listeners) listener_indices = range(number_of_listeners) From b6b5ea5d433b16461de1b49e8ed6ec396e0b95e6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 8 Apr 2024 19:20:29 +0000 Subject: [PATCH 11/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index 041d3a0..b0919b5 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -34,7 +34,7 @@ def _announce(self, message: Dict[Any, Any]): """ Announce a message to all listeners. - This message can be any dictionary. But, when used internally, is + This message can be any dictionary. But, when used internally, is expected to contain the progress_bar_id and format_dict of the TQDMProgressSubscriber update function, as well as any additional metadata supplied by the create_progress_subscriber method. From 27afa8988db603601a9856004dac6830ffb4929f Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Mon, 8 Apr 2024 13:36:09 -0700 Subject: [PATCH 12/34] Show as much metadata as possible from format_dict --- src/tqdm_publisher/_demos/_client.css | 78 ++++++++++--- .../_demos/_multiple_bars/_client.js | 4 +- .../_demos/_parallel_bars/_client.js | 42 +------ .../_demos/_parallel_bars/_client.py | 6 +- .../_demos/_parallel_bars/_server.py | 22 ++-- .../_demos/_parallel_bars/_server_ws.py | 62 +++++----- .../_demos/_single_bar/_client.js | 7 +- src/tqdm_publisher/_demos/utils/elements.js | 110 +++++++++++++++++- 8 files changed, 231 insertions(+), 100 deletions(-) diff --git a/src/tqdm_publisher/_demos/_client.css b/src/tqdm_publisher/_demos/_client.css index b478ddb..39cd6ca 100644 --- a/src/tqdm_publisher/_demos/_client.css +++ b/src/tqdm_publisher/_demos/_client.css @@ -1,7 +1,9 @@ +* { + box-sizing: border-box; +} html, body { font-family: sans-serif; - box-sizing: border-box; } h1 { @@ -10,49 +12,99 @@ h1 { font-size: 1.5rem; } +h2 { + margin: 0; + padding: 0; + font-size: 1.2rem; +} + header { display: flex; justify-content: space-between; align-items: center; + padding: 20px 0px; +} + +body > header { padding: 20px; } + #bars { display: flex; justify-content: center; align-items: center; gap: 20px; flex-wrap: wrap; + padding: 0px 20px; } -.progress { +.request-container { width: 100%; - height: 20px; - background-color: #ddd; - box-sizing: border-box; } -.progress div { - height: 100%; - background-color: #4caf50; - width: 0%; +.bar { + padding: 10px; } -.progress[data-small="true"] { - height: 10px; +.bar, .progress { + width: 100%; +} + +/* Small Bar */ +.bar[data-small="true"] { border-bottom: 1px solid gainsboro; } -.progress[data-small="true"]:last-child { +.bar > div { + display: flex; + justify-content: space-between; + align-items: center; + gap: 10px; +} + + +.bar[data-small="true"] .progress { + height: 10px; +} + +.bar[data-small="true"] .progress:last-child { border-bottom: none; } -.progress[data-small="false"] { +/* Large Bar */ +.bar[data-small="false"] { border-bottom: 1px solid black; +} + +.bar[data-small="false"] .progress { padding: 4px; } +.progress { + height: 20px; + background-color: #ddd; +} + +.progress div { + height: 100%; + background-color: #4caf50; + width: 0%; +} + .bar-container { border: 1px solid black; } + +small { + font-size: 0.7rem; + color: gray; + width: min-content; + white-space: nowrap; +} + +.bar > div:last-child > small { + font-size: 0.65rem; + margin-top: 5px; +} \ No newline at end of file diff --git a/src/tqdm_publisher/_demos/_multiple_bars/_client.js b/src/tqdm_publisher/_demos/_multiple_bars/_client.js index 29b5da9..80dde4d 100644 --- a/src/tqdm_publisher/_demos/_multiple_bars/_client.js +++ b/src/tqdm_publisher/_demos/_multiple_bars/_client.js @@ -6,7 +6,9 @@ const bars = {} // Track progress bars // Update the specified progress bar when a message is received from the server const onProgressUpdate = (event) => { const { request_id, format_dict } = JSON.parse(event.data); - bars[request_id].style.width = 100 * (format_dict.n / format_dict.total) + '%'; + const { update } = bars[request_id]; + update(format_dict); + } // Create a new WebSocket client diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_client.js b/src/tqdm_publisher/_demos/_parallel_bars/_client.js index 43bbcf9..aabc9fe 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_client.js +++ b/src/tqdm_publisher/_demos/_parallel_bars/_client.js @@ -1,49 +1,13 @@ import { EventSourceManager } from '../utils/EventSourceManager.js'; import { WebSocketManager } from '../utils/WebSocketManager.js'; -import { createProgressBar } from '../utils/elements.js'; +import { getBar } from '../utils/elements.js'; -const bars = {} // Track progress bars -const requests = {} // Track request containers - -function getRequestContainer(request_id) { - const existing = requests[request_id] - if (existing) return existing; - - // Create a new container for the progress bar - const container = document.createElement('div'); - container.id = request_id; - container.classList.add('request-container'); - const h2 = document.createElement('h2'); - h2.innerText = `Request ID: ${request_id}`; - container.append(h2); - document.body.append(container); - const barContainer = document.createElement('div'); - barContainer.classList.add('bar-container'); - container.append(barContainer); - requests[request_id] = barContainer; - - return barContainer; -} - -// Create and/or render a progress bar -const getBar = (request_id, id) => { - - if (bars[id]) return bars[id]; - - const container = getRequestContainer(request_id); - const bar = createProgressBar(container); - - bar.parentElement.setAttribute('data-small', request_id !== id); // Add a small style to the progress bar if it is not the main request bar - - return bars[id] = bar; - -} // Update the specified progress bar when a message is received from the server const onProgressUpdate = (event) => { const { request_id, id, format_dict } = JSON.parse(event.data); - const bar = getBar(request_id, id); - bar.style.width = 100 * (format_dict.n / format_dict.total) + '%'; + const { update } = getBar(request_id, id); + update(format_dict, { request_id, id }); } // Create a new message client diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_client.py b/src/tqdm_publisher/_demos/_parallel_bars/_client.py index 7168383..0f45979 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_client.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_client.py @@ -15,8 +15,8 @@ def find_free_port(): return s.getsockname()[1] # Return the port number assigned -def GLOBAL_CALLBACK(request_id, id, n, total, **kwargs): - print("Global Update", request_id, id, f"{n}/{total}") +def GLOBAL_CALLBACK(request_id, id, format_dict): + print("Global Update", request_id, id, f"{format_dict['n']}/{format_dict['total']}") def create_http_server(port: int, callback): @@ -25,7 +25,7 @@ class MyHttpRequestHandler(http.server.SimpleHTTPRequestHandler): def do_POST(self): content_length = int(self.headers["Content-Length"]) post_data = json.loads(self.rfile.read(content_length).decode("utf-8")) - callback(post_data["request_id"], post_data["id"], **post_data["data"]) + callback(post_data["request_id"], post_data["id"], post_data["data"]) self.send_response(200) self.end_headers() diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 0ab5190..639f8fd 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -36,8 +36,8 @@ progress_handler = TQDMProgressHandler() -def forward_updates_over_sse(request_id, id, n, total, **kwargs): - progress_handler._announce(dict(request_id=request_id, id=id, format_dict=dict(n=n, total=total))) +def forward_updates_over_sse(request_id, id, format_dict): + progress_handler._announce(dict(request_id=request_id, id=id, format_dict=format_dict)) class ThreadedHTTPServer: @@ -53,13 +53,13 @@ def start(self): thread.start() -def forward_to_http_server(url: str, request_id: str, progress_bar_id: int, n: int, total: int, **kwargs): +def forward_to_http_server(url: str, request_id: str, progress_bar_id: int, format_dict: dict): """ This is the parallel callback definition. Its parameters are attributes of a tqdm instance and their values are what a typical default tqdm printout to console would contain (update step `n` out of `total` iterations). """ - json_data = json.dumps(obj=dict(request_id=request_id, id=str(progress_bar_id), data=dict(n=n, total=total))) + json_data = json.dumps(obj=dict(request_id=request_id, id=str(progress_bar_id), data=format_dict)) requests.post(url=url, data=json_data, headers={"Content-Type": "application/json"}) @@ -93,13 +93,13 @@ def _run_sleep_tasks_in_subprocess( sub_progress_bar = TQDMPublisher( iterable=task_times, position=iteration_index + 1, - desc=f"Progress on iteration {iteration_index} ({id})", + desc=f"Progress on iteration {iteration_index} ({subprogress_bar_id})", leave=False, ) sub_progress_bar.subscribe( lambda format_dict: forward_to_http_server( - url=url, request_id=request_id, progress_bar_id=subprogress_bar_id, **format_dict + url=url, request_id=request_id, progress_bar_id=subprogress_bar_id, format_dict=format_dict ) ) @@ -126,12 +126,14 @@ def run_parallel_processes(request_id, url: str): total_tasks_iterable = as_completed(futures) total_tasks_progress_bar = TQDMPublisher( - iterable=total_tasks_iterable, total=len(TASK_TIMES), desc="Total tasks completed" + iterable=total_tasks_iterable, + total=len(TASK_TIMES), + desc=f"Total tasks completed for {request_id}" ) total_tasks_progress_bar.subscribe( lambda format_dict: forward_to_http_server( - url=url, request_id=request_id, progress_bar_id=request_id, **format_dict + url=url, request_id=request_id, progress_bar_id=request_id, format_dict=format_dict ) ) @@ -197,8 +199,8 @@ async def start_server(port): # await asyncio.Future() # DEMO TWO: Queue - def update_queue(request_id, id, n, total, **kwargs): - forward_updates_over_sse(request_id, id, n, total) + def update_queue(request_id, id, format_dict): + forward_updates_over_sse(request_id, id, format_dict) http_server = ThreadedHTTPServer(port=PORT, callback=update_queue) http_server.start() diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py index 088a8c7..28eb52b 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py @@ -6,7 +6,7 @@ import threading import time import uuid -from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import ProcessPoolExecutor, as_completed from typing import List import requests @@ -37,7 +37,7 @@ progress_handler = TQDMProgressHandler() -def forward_updates_over_websocket(request_id, id, n, total, **kwargs): +def forward_updates_over_websocket(request_id, id, format_dict): ws = WEBSOCKETS.get(request_id) if ws: @@ -46,7 +46,7 @@ def forward_updates_over_websocket(request_id, id, n, total, **kwargs): ws["ref"].send( message=json.dumps( obj=dict( - format_dict=dict(n=n, total=total), + format_dict=format_dict, id=id, request_id=request_id, ) @@ -82,20 +82,22 @@ def start(self): thread.start() -def forward_to_http_server(url: str, request_id: str, id: int, n: int, total: int, **kwargs): +def forward_to_http_server(url: str, request_id: str, id: int, format_dict: dict): """ This is the parallel callback definition. Its parameters are attributes of a tqdm instance and their values are what a typical default tqdm printout to console would contain (update step `n` out of `total` iterations). """ - json_data = json.dumps(obj=dict(request_id=request_id, id=str(id), data=dict(n=n, total=total))) + json_data = json.dumps(obj=dict(request_id=request_id, id=str(id), data=format_dict)) requests.post(url=url, data=json_data, headers={"Content-Type": "application/json"}) def _run_sleep_tasks_in_subprocess( - args, - # task_times: List[float], iteration_index: int, id: int, url: str + task_times: List[float], + iteration_index: int, + request_id: str, + url: str, ): """ Run a 'task' that takes a certain amount of time to run on each worker. @@ -115,8 +117,6 @@ def _run_sleep_tasks_in_subprocess( The localhost URL to sent progress updates to. """ - task_times, iteration_index, request_id, url = args - id = uuid.uuid4() sub_progress_bar = TQDMPublisher( @@ -126,7 +126,7 @@ def _run_sleep_tasks_in_subprocess( leave=False, ) - sub_progress_bar.subscribe(lambda format_dict: forward_to_http_server(url, request_id, id, **format_dict)) + sub_progress_bar.subscribe(lambda format_dict: forward_to_http_server(url, request_id, id, format_dict)) for sleep_time in sub_progress_bar: time.sleep(sleep_time) @@ -134,21 +134,37 @@ def _run_sleep_tasks_in_subprocess( def run_parallel_processes(request_id, url: str): + futures = list() with ProcessPoolExecutor(max_workers=N_JOBS) as executor: - # Assign the parallel jobs - job_map = executor.map( - _run_sleep_tasks_in_subprocess, - [(task_times, iteration_index, request_id, url) for iteration_index, task_times in enumerate(TASK_TIMES)], + # # Assign the parallel jobs + for iteration_index, task_times in enumerate(TASK_TIMES): + futures.append( + executor.submit( + _run_sleep_tasks_in_subprocess, + task_times=task_times, + iteration_index=iteration_index, + request_id=request_id, + url=url, + ) + ) + + total_tasks_iterable = as_completed(futures) + total_tasks_progress_bar = TQDMPublisher( + iterable=total_tasks_iterable, + total=len(TASK_TIMES), + desc=f"Total tasks completed for {request_id}" ) - # Send initialization for pool progress bar - forward_to_http_server(url, request_id, id=request_id, n=0, total=len(TASK_TIMES)) + total_tasks_progress_bar.subscribe( + lambda format_dict: forward_to_http_server( + url=url, request_id=request_id, id=request_id, format_dict=format_dict + ) + ) - for _ in job_map: + for _ in total_tasks_progress_bar: pass - WEBSOCKETS = {} @@ -176,14 +192,8 @@ async def spawn_server() -> None: async with websockets.serve(ws_handler=lambda websocket: handler(URL, websocket), host="", port=3768): - # # DEMO ONE: Direct updates from HTTP server - # http_server = ThreadedHTTPServer(port=PORT, callback=forward_updates_over_websocket) - # http_server.start() - # await asyncio.Future() - - # DEMO TWO: Queue - def update_queue(request_id, id, n, total, **kwargs): - progress_handler._announce(dict(request_id=request_id, id=id, n=n, total=total)) + def update_queue(request_id, id, format_dict): + progress_handler._announce(dict(request_id=request_id, id=id, format_dict=format_dict)) http_server = ThreadedHTTPServer(port=PORT, callback=update_queue) http_server.start() diff --git a/src/tqdm_publisher/_demos/_single_bar/_client.js b/src/tqdm_publisher/_demos/_single_bar/_client.js index 1596925..d08289f 100644 --- a/src/tqdm_publisher/_demos/_single_bar/_client.js +++ b/src/tqdm_publisher/_demos/_single_bar/_client.js @@ -1,14 +1,13 @@ import { WebSocketManager } from '../utils/WebSocketManager.js'; import { createProgressBar } from '../utils/elements.js'; - -const bar = createProgressBar(); // Create and render a progress bar +const { update, progress } = createProgressBar(); // Create and render a progress bar // Update the specified progress bar when a message is received from the server const onProgressUpdate = (event) => { const { format_dict } = JSON.parse(event.data); const ratio = format_dict.n / format_dict.total; - bar.style.width = `${100 * ratio}%`; + update(format_dict); // Update the progress bar with the new progress if (ratio === 1) button.removeAttribute('disabled'); // Enable the button when the progress bar is complete } @@ -20,6 +19,6 @@ const client = new WebSocketManager({ onmessage: onProgressUpdate }); const button = document.querySelector('button'); button.addEventListener('click', () => { button.setAttribute('disabled', true); // Disable the button to prevent multiple progress bars from being created - bar.style.width = 0; // Reset the progress bar + progress.style.width = 0; // Reset the progress bar client.socket.send(JSON.stringify({ command: 'start' })); // Send a message to the server to start the progress bar }) diff --git a/src/tqdm_publisher/_demos/utils/elements.js b/src/tqdm_publisher/_demos/utils/elements.js index f2ca5f9..f4e4737 100644 --- a/src/tqdm_publisher/_demos/utils/elements.js +++ b/src/tqdm_publisher/_demos/utils/elements.js @@ -1,11 +1,113 @@ export const barContainer = document.querySelector('#bars'); // Create a progress bar and append it to the bar container -export const createProgressBar = (container = barContainer) => { +export const createProgressBar = (requestContainer = barContainer) => { + + const container = document.createElement('div'); + container.classList.add('bar'); + + const row1 = document.createElement('div'); + const row2 = document.createElement('div'); + const element = document.createElement('div'); element.classList.add('progress'); const progress = document.createElement('div'); - element.appendChild(progress); - container.appendChild(element); // Render the progress bar - return progress; + + const readout = document.createElement('small'); + element.append(progress); + + row1.append(element, readout); + + const descriptionEl = document.createElement('small'); + row2.append(descriptionEl); + + container.append(row1, row2); + + requestContainer.appendChild(container); // Render the progress bar + + + const update = ( format_dict, { request_id, id } = {}) => { + + const { total, n, elapsed, rate, prefix } = format_dict; + + const percent = 100 * (n / total); + progress.style.width = `${percent}%` + + readout.innerText = `${n} / ${total} (${percent.toFixed(1)}%)`; + + + const remaining = rate && total ? (total - n) / rate : 0; // Seconds + + const description = `${prefix ? `${prefix} — ` : ''}${elapsed.toFixed(1)}s elapsed, ${remaining.toFixed(1)}s remaining`; + + if (!request_id || !id) return descriptionEl.innerText = description; + + const resolvedDescriptionEl = request_id === id ? getRequestContainer(request_id).description : descriptionEl; + + resolvedDescriptionEl.innerText = description + } + + + return { + element, + description: descriptionEl, + progress, + readout, + container, + update + }; +} + + +const BARS = {} // Track progress bars +const REQUESTS = {} // Track request containers + +// Create and/or render a progress bar +export const getBar = (request_id, id) => { + + if (BARS[id]) return BARS[id]; + + const bar = createProgressBar(getRequestContainer(request_id).bars); + + const { container } = bar; + container.setAttribute('data-small', request_id !== id); // Add a small style to the progress bar if it is not the main request bar + + return BARS[id] = bar; + +} + +export function getRequestContainer(request_id) { + const existing = REQUESTS[request_id] + if (existing) return existing; + + // Create a new container for the progress bar + const container = document.createElement('div'); + container.id = request_id; + container.classList.add('request-container'); + + const header = document.createElement('header'); + + const firstHeaderContainer = document.createElement('div'); + const h2 = document.createElement('h2'); + h2.innerText = `Request ID: ${request_id}`; + + const description = document.createElement('small'); + + firstHeaderContainer.append(h2, description); + header.append(firstHeaderContainer); + + const barsElement = document.createElement('div'); + barsElement.classList.add('bar-container'); + + container.append(header, barsElement); + barContainer.append(container); + + return REQUESTS[request_id] = { + header: h2, + description, + bars: barsElement, + element: container + }; + + } From 4cb2b06090dc399d14cf0b3fe198738066eb63bb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 8 Apr 2024 20:38:37 +0000 Subject: [PATCH 13/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_demos/_client.css | 4 ++-- .../_demos/_parallel_bars/_server.py | 4 +--- .../_demos/_parallel_bars/_server_ws.py | 7 +++---- src/tqdm_publisher/_demos/utils/elements.js | 16 ++++++++-------- 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/src/tqdm_publisher/_demos/_client.css b/src/tqdm_publisher/_demos/_client.css index 39cd6ca..214eddd 100644 --- a/src/tqdm_publisher/_demos/_client.css +++ b/src/tqdm_publisher/_demos/_client.css @@ -39,7 +39,7 @@ body > header { padding: 0px 20px; } -.request-container { +.request-container { width: 100%; } @@ -107,4 +107,4 @@ small { .bar > div:last-child > small { font-size: 0.65rem; margin-top: 5px; -} \ No newline at end of file +} diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 6c09671..2c2e05a 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -127,9 +127,7 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str total_tasks_iterable = as_completed(futures) total_tasks_progress_bar = TQDMPublisher( - iterable=total_tasks_iterable, - total=len(TASK_TIMES), - desc=f"Total tasks completed for {request_id}" + iterable=total_tasks_iterable, total=len(TASK_TIMES), desc=f"Total tasks completed for {request_id}" ) # The 'total' progress bar bas an ID equivalent to the request ID diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py index c9bf972..48c9946 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py @@ -94,7 +94,7 @@ def forward_to_http_server(url: str, request_id: str, id: int, format_dict: dict def _run_sleep_tasks_in_subprocess( - task_times: List[float], + task_times: List[float], iteration_index: int, request_id: str, url: str, @@ -151,9 +151,7 @@ def run_parallel_processes(*, request_id: str, url: str): total_tasks_iterable = as_completed(futures) total_tasks_progress_bar = TQDMPublisher( - iterable=total_tasks_iterable, - total=len(TASK_TIMES), - desc=f"Total tasks completed for {request_id}" + iterable=total_tasks_iterable, total=len(TASK_TIMES), desc=f"Total tasks completed for {request_id}" ) total_tasks_progress_bar.subscribe( @@ -165,6 +163,7 @@ def run_parallel_processes(*, request_id: str, url: str): for _ in total_tasks_progress_bar: pass + WEBSOCKETS = {} diff --git a/src/tqdm_publisher/_demos/utils/elements.js b/src/tqdm_publisher/_demos/utils/elements.js index f4e4737..2ae4719 100644 --- a/src/tqdm_publisher/_demos/utils/elements.js +++ b/src/tqdm_publisher/_demos/utils/elements.js @@ -2,7 +2,7 @@ export const barContainer = document.querySelector('#bars'); // Create a progress bar and append it to the bar container export const createProgressBar = (requestContainer = barContainer) => { - + const container = document.createElement('div'); container.classList.add('bar'); @@ -35,19 +35,19 @@ export const createProgressBar = (requestContainer = barContainer) => { readout.innerText = `${n} / ${total} (${percent.toFixed(1)}%)`; - - const remaining = rate && total ? (total - n) / rate : 0; // Seconds + + const remaining = rate && total ? (total - n) / rate : 0; // Seconds const description = `${prefix ? `${prefix} — ` : ''}${elapsed.toFixed(1)}s elapsed, ${remaining.toFixed(1)}s remaining`; if (!request_id || !id) return descriptionEl.innerText = description; const resolvedDescriptionEl = request_id === id ? getRequestContainer(request_id).description : descriptionEl; - + resolvedDescriptionEl.innerText = description } - - + + return { element, description: descriptionEl, @@ -90,7 +90,7 @@ export function getRequestContainer(request_id) { const firstHeaderContainer = document.createElement('div'); const h2 = document.createElement('h2'); h2.innerText = `Request ID: ${request_id}`; - + const description = document.createElement('small'); firstHeaderContainer.append(h2, description); @@ -105,7 +105,7 @@ export function getRequestContainer(request_id) { return REQUESTS[request_id] = { header: h2, description, - bars: barsElement, + bars: barsElement, element: container }; From 920ad34a99444dc3a36a615d9c82a2bdc1be59f2 Mon Sep 17 00:00:00 2001 From: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com> Date: Mon, 8 Apr 2024 22:26:47 -0400 Subject: [PATCH 14/34] add docs and simplify logic --- .../_demos/_parallel_bars/_server.py | 49 +++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index f7cb129..272b7bf 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -145,11 +145,52 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str pass -def format_sse(data: str, event: Union[str, None] = None) -> str: - message = f"data: {json.dumps(data)}\n\n" +def format_server_side_events(*, data: str, event: Union[str, None] = None) -> str: + """ + Format multiple `data` and `event` type server-side events in a way the frontend expects. + + With reference to the following demonstration of frontend elements. + + ```javascript + const server_side_event = new EventSource("/api/v1/sse"); + + /* + * This will listen only for events + * similar to the following: + * + * event: notice + * data: useful data + * id: someid + */ + server_side_event.addEventListener("notice", (event) => { + console.log(event.data); + }); + + /* + * Similarly, this will listen for events + * with the field `event: update` + */ + server_side_event.addEventListener("update", (event) => { + console.log(event.data); + }); + + /* + * The event "message" is a special case, as it + * will capture events without an event field + * as well as events that have the specific type + * `event: message` It will not trigger on any + * other event type. + */ + server_side_event.addEventListener("message", (event) => { + console.log(event.data); + }); + ``` + """ + all_events = "" if event is not None: - message = f"event: {event}\n{message}" - return message + all_events += f"event: {event}\n" + all_events += f"data: {data}\n\n" + return all_events def listen_to_events(): From 54f4950c1cf3b685f0966f008e92ec16adc96c66 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 9 Apr 2024 02:27:40 +0000 Subject: [PATCH 15/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 272b7bf..ec52a6a 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -165,7 +165,7 @@ def format_server_side_events(*, data: str, event: Union[str, None] = None) -> s server_side_event.addEventListener("notice", (event) => { console.log(event.data); }); - + /* * Similarly, this will listen for events * with the field `event: update` @@ -173,7 +173,7 @@ def format_server_side_events(*, data: str, event: Union[str, None] = None) -> s server_side_event.addEventListener("update", (event) => { console.log(event.data); }); - + /* * The event "message" is a special case, as it * will capture events without an event field From b501b3e93ceb667340f52d02b7f81d09db31186c Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Tue, 9 Apr 2024 07:44:25 -0700 Subject: [PATCH 16/34] Update _server.py --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index ec52a6a..6260842 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -145,9 +145,9 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str pass -def format_server_side_events(*, data: str, event: Union[str, None] = None) -> str: +def format_server_side_events(*, data: str, event: Union[str, None] = "message") -> str: """ - Format multiple `data` and `event` type server-side events in a way the frontend expects. + Format multiple `data` and `event` type server-side events in a way expected by Server-Sent Events. With reference to the following demonstration of frontend elements. @@ -186,11 +186,11 @@ def format_server_side_events(*, data: str, event: Union[str, None] = None) -> s }); ``` """ - all_events = "" + message = "" if event is not None: - all_events += f"event: {event}\n" - all_events += f"data: {data}\n\n" - return all_events + message += f"event: {event}\n" + message += f"data: {data}\n\n" + return message def listen_to_events(): From c474dfa02fb8e7f2df01e9144443c911a6ec2349 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Tue, 9 Apr 2024 07:45:04 -0700 Subject: [PATCH 17/34] Update _server.py --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 6260842..d98bd34 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -145,7 +145,7 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str pass -def format_server_side_events(*, data: str, event: Union[str, None] = "message") -> str: +def format_server_side_events(*, data: str, event: str = "message") -> str: """ Format multiple `data` and `event` type server-side events in a way expected by Server-Sent Events. From 8058bdb7ceb2c60580e3a78c4c8e86e9f1dd2c5b Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Tue, 9 Apr 2024 07:47:36 -0700 Subject: [PATCH 18/34] Update _server.py --- .../_demos/_parallel_bars/_server.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index d98bd34..d86e405 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -37,7 +37,7 @@ progress_handler = TQDMProgressHandler() -def forward_updates_over_server_side_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): +def forward_updates_over_server_sent_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): # TODO: shouldn't this line use `create_progress_subscriber`? Otherwise consider making `.accounce` non-private progress_handler._announce( dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs) @@ -145,14 +145,14 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str pass -def format_server_side_events(*, data: str, event: str = "message") -> str: +def format_server_sent_events(*, data: str, event: str = "message") -> str: """ - Format multiple `data` and `event` type server-side events in a way expected by Server-Sent Events. + Format multiple `data` and `event` type server-sent events in a way expected by the EventSource browser implementation. With reference to the following demonstration of frontend elements. ```javascript - const server_side_event = new EventSource("/api/v1/sse"); + const server_sent_event = new EventSource("/api/v1/sse"); /* * This will listen only for events @@ -162,7 +162,7 @@ def format_server_side_events(*, data: str, event: str = "message") -> str: * data: useful data * id: someid */ - server_side_event.addEventListener("notice", (event) => { + server_sent_event.addEventListener("notice", (event) => { console.log(event.data); }); @@ -170,7 +170,7 @@ def format_server_side_events(*, data: str, event: str = "message") -> str: * Similarly, this will listen for events * with the field `event: update` */ - server_side_event.addEventListener("update", (event) => { + server_sent_event.addEventListener("update", (event) => { console.log(event.data); }); @@ -181,7 +181,7 @@ def format_server_side_events(*, data: str, event: str = "message") -> str: * `event: message` It will not trigger on any * other event type. */ - server_side_event.addEventListener("message", (event) => { + server_sent_event.addEventListener("message", (event) => { console.log(event.data); }); ``` @@ -197,7 +197,7 @@ def listen_to_events(): messages = progress_handler.listen() # returns a queue.Queue while True: msg = messages.get() # blocks until a new message arrives - yield format_sse(msg) + yield format_server_sent_events(msg) app = Flask(__name__) @@ -245,7 +245,7 @@ async def start_server(port): # DEMO TWO: Queue def update_queue(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): - forward_updates_over_server_side_events( + forward_updates_over_server_sent_events( request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total ) From 91857a30f5c4c8f8e8d8e25af2ddf3b0968f85be Mon Sep 17 00:00:00 2001 From: Cody Baker Date: Wed, 10 Apr 2024 11:04:01 -0400 Subject: [PATCH 19/34] change to public method and enhance docstring --- pyproject.toml | 7 +++-- .../_demos/_parallel_bars/_server.py | 28 +++++++++++++------ src/tqdm_publisher/_handler.py | 4 +-- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9f735ab..0aa5d2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,9 +44,10 @@ test = [ ] demo = [ - "websockets==12.0", - "flask==2.3.2", - "flask-cors==4.0.0" + "requests", + "websockets", + "flask", + "flask-cors" ] [project.urls] diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index d86e405..6c8597e 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -39,7 +39,7 @@ def forward_updates_over_server_sent_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): # TODO: shouldn't this line use `create_progress_subscriber`? Otherwise consider making `.accounce` non-private - progress_handler._announce( + progress_handler.announce( dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs) ) @@ -145,9 +145,9 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str pass -def format_server_sent_events(*, data: str, event: str = "message") -> str: +def format_server_sent_events(*, message_data: str, event_type: str = "message") -> str: """ - Format multiple `data` and `event` type server-sent events in a way expected by the EventSource browser implementation. + Format a `data` and `event` type server-sent events in a way expected by the EventSource browser implementation. With reference to the following demonstration of frontend elements. @@ -185,19 +185,29 @@ def format_server_sent_events(*, data: str, event: str = "message") -> str: console.log(event.data); }); ``` + + Parameters + ---------- + message_data : str + The message data to be sent to the client. + event_type : str, default="message" + The type of event corresponding to the message data. + + Returns + ------- + formatted_message : str + The formatted message to be sent to the client. """ - message = "" - if event is not None: - message += f"event: {event}\n" - message += f"data: {data}\n\n" + message = f"event: {event_type}\n" if event_type is not None else "" + message += f"data: {message_data}\n\n" return message def listen_to_events(): messages = progress_handler.listen() # returns a queue.Queue while True: - msg = messages.get() # blocks until a new message arrives - yield format_server_sent_events(msg) + message_data = messages.get() # blocks until a new message arrives + yield format_server_sent_events(message_data=message_data) app = Flask(__name__) diff --git a/src/tqdm_publisher/_handler.py b/src/tqdm_publisher/_handler.py index b0919b5..9cabad4 100644 --- a/src/tqdm_publisher/_handler.py +++ b/src/tqdm_publisher/_handler.py @@ -26,11 +26,11 @@ def on_progress_update(progress_update: dict): It must be defined inside this local scope to communicate the `additional_metadata` from the level above without including it in the method signature. """ - self._announce(message=dict(**progress_update, **additional_metadata)) + self.announce(message=dict(**progress_update, **additional_metadata)) return TQDMProgressSubscriber(iterable=iterable, on_progress_update=on_progress_update, **tqdm_kwargs) - def _announce(self, message: Dict[Any, Any]): + def announce(self, message: Dict[Any, Any]): """ Announce a message to all listeners. From 81e34adb696c53766bcc751285a95b393eb928bd Mon Sep 17 00:00:00 2001 From: Cody Baker Date: Wed, 10 Apr 2024 11:22:22 -0400 Subject: [PATCH 20/34] simplify logic --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 6c8597e..194cd66 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -198,7 +198,7 @@ def format_server_sent_events(*, message_data: str, event_type: str = "message") formatted_message : str The formatted message to be sent to the client. """ - message = f"event: {event_type}\n" if event_type is not None else "" + message = f"event: {event_type}\n" if event_type != "" else "" message += f"data: {message_data}\n\n" return message From 2443e876bd104be5d4c189bf620db3ae80920265 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 10 Apr 2024 09:05:45 -0700 Subject: [PATCH 21/34] Fix id references --- .../_demos/_parallel_bars/_client.js | 4 +- .../_demos/_parallel_bars/_server.py | 25 +++--- .../_demos/_parallel_bars/_server_ws.py | 87 +++++++++++-------- .../_demos/_single_bar/_server.py | 2 +- 4 files changed, 64 insertions(+), 54 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_client.js b/src/tqdm_publisher/_demos/_parallel_bars/_client.js index 43bbcf9..aa46df6 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_client.js +++ b/src/tqdm_publisher/_demos/_parallel_bars/_client.js @@ -41,8 +41,8 @@ const getBar = (request_id, id) => { // Update the specified progress bar when a message is received from the server const onProgressUpdate = (event) => { - const { request_id, id, format_dict } = JSON.parse(event.data); - const bar = getBar(request_id, id); + const { request_id, progress_bar_id, format_dict } = JSON.parse(event.data); + const bar = getBar(request_id, progress_bar_id); bar.style.width = 100 * (format_dict.n / format_dict.total) + '%'; } diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 194cd66..4fb88d7 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -31,14 +31,11 @@ for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) ] -WEBSOCKETS = {} - -## NOTE: TQDMProgressHandler cannot be called from a process...so we just use a global reference exposed to each subprocess +## TQDMProgressHandler cannot be called from a process...so we just use a global reference exposed to each subprocess progress_handler = TQDMProgressHandler() def forward_updates_over_server_sent_events(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): - # TODO: shouldn't this line use `create_progress_subscriber`? Otherwise consider making `.accounce` non-private progress_handler.announce( dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=dict(n=n, total=total), **kwargs) ) @@ -130,7 +127,7 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str total_tasks_iterable = as_completed(futures) total_tasks_progress_bar = TQDMProgressPublisher( - iterable=total_tasks_iterable, total=len(TASK_TIMES), desc="Total tasks completed" + iterable=total_tasks_iterable, total=len(all_task_times), desc="Total tasks completed" ) # The 'total' progress bar bas an ID equivalent to the request ID @@ -198,8 +195,14 @@ def format_server_sent_events(*, message_data: str, event_type: str = "message") formatted_message : str The formatted message to be sent to the client. """ - message = f"event: {event_type}\n" if event_type != "" else "" - message += f"data: {message_data}\n\n" + + # message = f"event: {event_type}\n" if event_type != "" else "" + # message += f"data: {message_data}\n\n" + # return message + + message = f"data: {message_data}\n\n" + if event_type != "": + message = f"event: {event_type}\n{message}" return message @@ -207,7 +210,8 @@ def listen_to_events(): messages = progress_handler.listen() # returns a queue.Queue while True: message_data = messages.get() # blocks until a new message arrives - yield format_server_sent_events(message_data=message_data) + print("Message data", message_data) + yield format_server_sent_events(message_data=json.dumps(message_data)) app = Flask(__name__) @@ -248,12 +252,7 @@ async def start_server(port): flask_server = ThreadedFlaskServer(port=3768) flask_server.start() - # # DEMO ONE: Direct updates from HTTP server - # http_server = ThreadedHTTPServer(port=port, callback=forward_updates_over_sse) - # http_server.start() - # await asyncio.Future() - # DEMO TWO: Queue def update_queue(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): forward_updates_over_server_sent_events( request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py index e63dcda..8f50126 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py @@ -6,7 +6,7 @@ import threading import time import uuid -from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import ProcessPoolExecutor, as_completed from typing import List import requests @@ -20,24 +20,25 @@ N_JOBS = 3 +N_JOBS = 3 + # Each outer entry is a list of 'tasks' to perform on a particular worker # For demonstration purposes, each in the list of tasks is the length of time in seconds # that each iteration of the task takes to run and update the progress bar (emulated by sleeping) +BASE_SECONDS_PER_TASK = 0.5 # The base time for each task; actual time increases proportional to the index of the task +NUMBER_OF_TASKS_PER_JOB = 10 TASK_TIMES: List[List[float]] = [ - [0.1 for _ in range(100)], - [0.2 for _ in range(100)], - [0.3 for _ in range(10)], - [0.4 for _ in range(10)], - [0.5 for _ in range(10)], + [BASE_SECONDS_PER_TASK * task_index] * NUMBER_OF_TASKS_PER_JOB + for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) ] WEBSOCKETS = {} -## NOTE: TQDMProgressHandler cannot be called from a process...so we just use a queue directly +## TQDMProgressHandler cannot be called from a process...so we just use a queue directly progress_handler = TQDMProgressHandler() -def forward_updates_over_websocket(request_id, id, n, total, **kwargs): +def forward_updates_over_websocket(request_id, progress_bar_id, n, total, **kwargs): ws = WEBSOCKETS.get(request_id) if ws: @@ -47,7 +48,7 @@ def forward_updates_over_websocket(request_id, id, n, total, **kwargs): message=json.dumps( obj=dict( format_dict=dict(n=n, total=total), - id=id, + progress_bar_id=progress_bar_id, request_id=request_id, ) ) @@ -82,20 +83,22 @@ def start(self): thread.start() -def forward_to_http_server(url: str, request_id: str, id: int, n: int, total: int, **kwargs): +def forward_to_http_server(url: str, request_id: str, progress_bar_id: int, n: int, total: int, **kwargs): """ This is the parallel callback definition. Its parameters are attributes of a tqdm instance and their values are what a typical default tqdm printout to console would contain (update step `n` out of `total` iterations). """ - json_data = json.dumps(obj=dict(request_id=request_id, id=str(id), data=dict(n=n, total=total))) + json_data = json.dumps(obj=dict(request_id=request_id, id=str(progress_bar_id), data=dict(n=n, total=total))) requests.post(url=url, data=json_data, headers={"Content-Type": "application/json"}) def _run_sleep_tasks_in_subprocess( - args, - # task_times: List[float], iteration_index: int, id: int, url: str + task_times: List[float], + iteration_index: int, + request_id: int, + url: str ): """ Run a 'task' that takes a certain amount of time to run on each worker. @@ -110,45 +113,59 @@ def _run_sleep_tasks_in_subprocess( The index of this task in the list of all tasks from the buffer map. Each index would map to a different tqdm position. id : int - Identifier of ??. + Identifier of the request, provided by the client. url : str The localhost URL to sent progress updates to. """ - task_times, iteration_index, request_id, url = args - - id = uuid.uuid4() + subprogress_bar_id = uuid.uuid4() sub_progress_bar = TQDMProgressPublisher( iterable=task_times, position=iteration_index + 1, - desc=f"Progress on iteration {iteration_index} ({id})", + desc=f"Progress on iteration {iteration_index} ({subprogress_bar_id})", leave=False, ) - sub_progress_bar.subscribe(lambda format_dict: forward_to_http_server(url, request_id, id, **format_dict)) + sub_progress_bar.subscribe(lambda format_dict: forward_to_http_server(url, request_id, subprogress_bar_id, **format_dict)) for sleep_time in sub_progress_bar: time.sleep(sleep_time) -def run_parallel_processes(*, request_id: str, url: str): +def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str, url: str): + futures = list() with ProcessPoolExecutor(max_workers=N_JOBS) as executor: # Assign the parallel jobs - job_map = executor.map( - _run_sleep_tasks_in_subprocess, - [(task_times, iteration_index, request_id, url) for iteration_index, task_times in enumerate(TASK_TIMES)], + for iteration_index, task_times_per_job in enumerate(all_task_times): + futures.append( + executor.submit( + _run_sleep_tasks_in_subprocess, + task_times=task_times_per_job, + iteration_index=iteration_index, + request_id=request_id, + url=url, + ) + ) + + total_tasks_iterable = as_completed(futures) + total_tasks_progress_bar = TQDMProgressPublisher( + iterable=total_tasks_iterable, total=len(all_task_times), desc="Total tasks completed" ) - # Send initialization for pool progress bar - forward_to_http_server(url, request_id, id=request_id, n=0, total=len(TASK_TIMES)) + # The 'total' progress bar bas an ID equivalent to the request ID + total_tasks_progress_bar.subscribe( + lambda format_dict: forward_to_http_server( + url=url, request_id=request_id, progress_bar_id=request_id, **format_dict + ) + ) - for _ in job_map: + # Trigger the deployment of the parallel jobs + for _ in total_tasks_progress_bar: pass - WEBSOCKETS = {} @@ -164,7 +181,7 @@ async def handler(url: str, websocket: websockets.WebSocketServerProtocol) -> No if message_from_client["command"] == "start": request_id = message_from_client["request_id"] WEBSOCKETS[request_id] = dict(ref=websocket, id=connection_id) - run_parallel_processes(request_id, url) + run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=url) async def spawn_server() -> None: @@ -175,15 +192,9 @@ async def spawn_server() -> None: URL = f"http://localhost:{PORT}" async with websockets.serve(ws_handler=lambda websocket: handler(URL, websocket), host="", port=3768): - - # # DEMO ONE: Direct updates from HTTP server - # http_server = ThreadedHTTPServer(port=PORT, callback=forward_updates_over_websocket) - # http_server.start() - # await asyncio.Future() - - # DEMO TWO: Queue - def update_queue(request_id, id, n, total, **kwargs): - progress_handler._announce(dict(request_id=request_id, id=id, n=n, total=total)) + + def update_queue(request_id, progress_bar_id, n, total, **kwargs): + progress_handler.announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total)) http_server = ThreadedHTTPServer(port=PORT, callback=update_queue) http_server.start() @@ -222,4 +233,4 @@ def run_parallel_bar_demo() -> None: # Just run the parallel processes request_id = uuid.uuid4() - run_parallel_processes(request_id=request_id, url=URL) + run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL) diff --git a/src/tqdm_publisher/_demos/_single_bar/_server.py b/src/tqdm_publisher/_demos/_single_bar/_server.py index 6bca796..2e4fd03 100644 --- a/src/tqdm_publisher/_demos/_single_bar/_server.py +++ b/src/tqdm_publisher/_demos/_single_bar/_server.py @@ -29,7 +29,7 @@ def run_function_on_progress_update(format_dict: dict) -> None: This specifically requires the `id` of the progress bar and the `format_dict` of the TQDM instance. """ - progress_callback(format_dict=format_dict, progress_bar_id=progress_bar.id) + progress_callback(format_dict=format_dict, progress_bar_id=progress_bar.progress_bar_id) progress_bar.subscribe(callback=run_function_on_progress_update) From 0106b5874c1f2ae9618855bcfd8016947ceb8f70 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 10 Apr 2024 09:05:48 -0700 Subject: [PATCH 22/34] Update _server.py --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 4fb88d7..390bff7 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -144,7 +144,7 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str def format_server_sent_events(*, message_data: str, event_type: str = "message") -> str: """ - Format a `data` and `event` type server-sent events in a way expected by the EventSource browser implementation. + Format an `event_type` type server-sent event with `data` in a way expected by the EventSource browser implementation. With reference to the following demonstration of frontend elements. From 92402352a3de2cc84dfa523f65ef34739dcb0841 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 10 Apr 2024 16:05:54 +0000 Subject: [PATCH 23/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../_demos/_parallel_bars/_server.py | 1 - .../_demos/_parallel_bars/_server_ws.py | 14 ++++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 390bff7..34da542 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -252,7 +252,6 @@ async def start_server(port): flask_server = ThreadedFlaskServer(port=3768) flask_server.start() - def update_queue(request_id: str, progress_bar_id: str, n: int, total: int, **kwargs): forward_updates_over_server_sent_events( request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py index 8f50126..6f301cf 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py @@ -94,12 +94,7 @@ def forward_to_http_server(url: str, request_id: str, progress_bar_id: int, n: i requests.post(url=url, data=json_data, headers={"Content-Type": "application/json"}) -def _run_sleep_tasks_in_subprocess( - task_times: List[float], - iteration_index: int, - request_id: int, - url: str -): +def _run_sleep_tasks_in_subprocess(task_times: List[float], iteration_index: int, request_id: int, url: str): """ Run a 'task' that takes a certain amount of time to run on each worker. @@ -127,7 +122,9 @@ def _run_sleep_tasks_in_subprocess( leave=False, ) - sub_progress_bar.subscribe(lambda format_dict: forward_to_http_server(url, request_id, subprogress_bar_id, **format_dict)) + sub_progress_bar.subscribe( + lambda format_dict: forward_to_http_server(url, request_id, subprogress_bar_id, **format_dict) + ) for sleep_time in sub_progress_bar: time.sleep(sleep_time) @@ -166,6 +163,7 @@ def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str for _ in total_tasks_progress_bar: pass + WEBSOCKETS = {} @@ -192,7 +190,7 @@ async def spawn_server() -> None: URL = f"http://localhost:{PORT}" async with websockets.serve(ws_handler=lambda websocket: handler(URL, websocket), host="", port=3768): - + def update_queue(request_id, progress_bar_id, n, total, **kwargs): progress_handler.announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, n=n, total=total)) From 788b7849beb4736d3f6249911f0f2427985df17d Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 10 Apr 2024 09:14:20 -0700 Subject: [PATCH 24/34] Update elements.js --- src/tqdm_publisher/_demos/utils/elements.js | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/tqdm_publisher/_demos/utils/elements.js b/src/tqdm_publisher/_demos/utils/elements.js index 2ae4719..cfb5190 100644 --- a/src/tqdm_publisher/_demos/utils/elements.js +++ b/src/tqdm_publisher/_demos/utils/elements.js @@ -26,7 +26,7 @@ export const createProgressBar = (requestContainer = barContainer) => { requestContainer.appendChild(container); // Render the progress bar - const update = ( format_dict, { request_id, id } = {}) => { + const update = ( format_dict, { request_id, progress_bar_id } = {}) => { const { total, n, elapsed, rate, prefix } = format_dict; @@ -40,9 +40,9 @@ export const createProgressBar = (requestContainer = barContainer) => { const description = `${prefix ? `${prefix} — ` : ''}${elapsed.toFixed(1)}s elapsed, ${remaining.toFixed(1)}s remaining`; - if (!request_id || !id) return descriptionEl.innerText = description; + if (!request_id || !progress_bar_id) return descriptionEl.innerText = description; - const resolvedDescriptionEl = request_id === id ? getRequestContainer(request_id).description : descriptionEl; + const resolvedDescriptionEl = request_id === progress_bar_id ? getRequestContainer(request_id).description : descriptionEl; resolvedDescriptionEl.innerText = description } @@ -63,16 +63,16 @@ const BARS = {} // Track progress bars const REQUESTS = {} // Track request containers // Create and/or render a progress bar -export const getBar = (request_id, id) => { +export const getBar = (request_id, progress_bar_id) => { - if (BARS[id]) return BARS[id]; + if (BARS[progress_bar_id]) return BARS[progress_bar_id]; const bar = createProgressBar(getRequestContainer(request_id).bars); const { container } = bar; - container.setAttribute('data-small', request_id !== id); // Add a small style to the progress bar if it is not the main request bar + container.setAttribute('data-small', request_id !== progress_bar_id); // Add a small style to the progress bar if it is not the main request bar - return BARS[id] = bar; + return BARS[progress_bar_id] = bar; } From 19a7886b863ff46c2bfbdde20ebe9d389039e5ec Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 10 Apr 2024 09:20:05 -0700 Subject: [PATCH 25/34] Fix client --- src/tqdm_publisher/_demos/_client.css | 2 +- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 1 - src/tqdm_publisher/_demos/utils/elements.js | 12 ++++++------ 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/tqdm_publisher/_demos/_client.css b/src/tqdm_publisher/_demos/_client.css index 214eddd..7cd9a6c 100644 --- a/src/tqdm_publisher/_demos/_client.css +++ b/src/tqdm_publisher/_demos/_client.css @@ -104,7 +104,7 @@ small { white-space: nowrap; } -.bar > div:last-child > small { +.bar > div:last-child > small:not(:empty) { font-size: 0.65rem; margin-top: 5px; } diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index d38eb77..57d1f89 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -208,7 +208,6 @@ def listen_to_events(): messages = progress_handler.listen() # returns a queue.Queue while True: message_data = messages.get() # blocks until a new message arrives - print("Message data", message_data) yield format_server_sent_events(message_data=json.dumps(message_data)) diff --git a/src/tqdm_publisher/_demos/utils/elements.js b/src/tqdm_publisher/_demos/utils/elements.js index cfb5190..276272c 100644 --- a/src/tqdm_publisher/_demos/utils/elements.js +++ b/src/tqdm_publisher/_demos/utils/elements.js @@ -1,3 +1,7 @@ + +const BARS = {} // Track progress bars +const REQUESTS = {} // Track request containers + export const barContainer = document.querySelector('#bars'); // Create a progress bar and append it to the bar container @@ -58,12 +62,8 @@ export const createProgressBar = (requestContainer = barContainer) => { }; } - -const BARS = {} // Track progress bars -const REQUESTS = {} // Track request containers - -// Create and/or render a progress bar -export const getBar = (request_id, progress_bar_id) => { +// Create + render a progress bar +export function getBar (request_id, progress_bar_id) { if (BARS[progress_bar_id]) return BARS[progress_bar_id]; From 1b26552d4c11c904b17ec68874ee2b83bd6b38fa Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Wed, 10 Apr 2024 12:04:27 -0700 Subject: [PATCH 26/34] Remove WS demo for parallel demos --- .../_demos/_demo_command_line_interface.py | 7 +- .../_demos/_parallel_bars/_client.js | 3 - .../_demos/_parallel_bars/_server_ws.py | 233 ------------------ 3 files changed, 2 insertions(+), 241 deletions(-) delete mode 100644 src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py diff --git a/src/tqdm_publisher/_demos/_demo_command_line_interface.py b/src/tqdm_publisher/_demos/_demo_command_line_interface.py index da27593..7148b73 100644 --- a/src/tqdm_publisher/_demos/_demo_command_line_interface.py +++ b/src/tqdm_publisher/_demos/_demo_command_line_interface.py @@ -6,9 +6,7 @@ from tqdm_publisher._demos._multiple_bars._server import run_multiple_bar_demo from tqdm_publisher._demos._parallel_bars._server import run_parallel_bar_demo -from tqdm_publisher._demos._parallel_bars._server_ws import ( - run_parallel_bar_demo as run_parallel_bar_demo_ws, -) + from tqdm_publisher._demos._single_bar._server import run_single_bar_demo CLIENT_PORT = 1234 @@ -16,8 +14,7 @@ DEMOS = { "demo_single": dict(subpath="_single_bar", server=run_single_bar_demo), "demo_multiple": dict(subpath="_multiple_bars", server=run_multiple_bar_demo), - "demo_parallel": dict(subpath="_parallel_bars", server=run_parallel_bar_demo), - "demo_parallel_ws": dict(subpath="_parallel_bars", server=run_parallel_bar_demo_ws), + "demo_parallel": dict(subpath="_parallel_bars", server=run_parallel_bar_demo) } DEMO_BASE_FOLDER_PATH = Path(__file__).parent diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_client.js b/src/tqdm_publisher/_demos/_parallel_bars/_client.js index a0cf2bd..c0324e4 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_client.js +++ b/src/tqdm_publisher/_demos/_parallel_bars/_client.js @@ -1,5 +1,4 @@ import { EventSourceManager } from '../utils/EventSourceManager.js'; -import { WebSocketManager } from '../utils/WebSocketManager.js'; import { getBar } from '../utils/elements.js'; @@ -11,7 +10,6 @@ const onProgressUpdate = (event) => { } // Create a new message client -const wsClient = new WebSocketManager({ onmessage: onProgressUpdate }, 3); const client = new EventSourceManager({ onmessage: onProgressUpdate }); // Declare that the HTML Button should create a new progress bar when clicked @@ -20,5 +18,4 @@ button.addEventListener('click', async () => { const request_id = Math.random().toString(36).substring(7); // Create a unique ID for the progress bar getBar(request_id, request_id); // Create a bar specifically for this request await client.send({ command: 'start', request_id }).catch(() => {}); // Send a message to the server to start the progress bar - wsClient.send({ command: 'start', request_id }); // Send a message to the server to start the progress bar }) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py b/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py deleted file mode 100644 index 7f5d71a..0000000 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server_ws.py +++ /dev/null @@ -1,233 +0,0 @@ -"""Demo of parallel tqdm.""" - -import asyncio -import json -import sys -import threading -import time -import uuid -from concurrent.futures import ProcessPoolExecutor, as_completed -from typing import List - -import requests -import websockets - -from tqdm_publisher import TQDMProgressHandler, TQDMProgressPublisher -from tqdm_publisher._demos._parallel_bars._client import ( - create_http_server, - find_free_port, -) - -N_JOBS = 3 - -N_JOBS = 3 - -# Each outer entry is a list of 'tasks' to perform on a particular worker -# For demonstration purposes, each in the list of tasks is the length of time in seconds -# that each iteration of the task takes to run and update the progress bar (emulated by sleeping) -BASE_SECONDS_PER_TASK = 0.5 # The base time for each task; actual time increases proportional to the index of the task -NUMBER_OF_TASKS_PER_JOB = 10 -TASK_TIMES: List[List[float]] = [ - [BASE_SECONDS_PER_TASK * task_index] * NUMBER_OF_TASKS_PER_JOB - for task_index in range(1, NUMBER_OF_TASKS_PER_JOB + 1) -] - -WEBSOCKETS = {} - -## TQDMProgressHandler cannot be called from a process...so we just use a queue directly -progress_handler = TQDMProgressHandler() - - -def forward_updates_over_websocket(request_id, progress_bar_id, format_dict): - ws = WEBSOCKETS.get(request_id) - - if ws: - - asyncio.run( - ws["ref"].send( - message=json.dumps( - obj=dict( - format_dict=format_dict, - progress_bar_id=progress_bar_id, - request_id=request_id, - ) - ) - ) - ) - - -class ThreadedHTTPServer: - def __init__(self, port: int, callback): - self.port = port - self.callback = callback - - def run(self): - create_http_server(port=self.port, callback=self.callback) - - def start(self): - thread = threading.Thread(target=self.run) - thread.start() - - -class ThreadedQueueTask: - def run(self): - - progress_queue = progress_handler.listen() - - while True: - msg = progress_queue.get() - forward_updates_over_websocket(**msg) - - def start(self): - thread = threading.Thread(target=self.run) - thread.start() - - -def forward_to_http_server(url: str, request_id: str, progress_bar_id: int, format_dict: dict): - """ - This is the parallel callback definition. - Its parameters are attributes of a tqdm instance and their values are what a typical default tqdm printout - to console would contain (update step `n` out of `total` iterations). - """ - json_data = json.dumps(obj=dict(request_id=request_id, id=str(progress_bar_id), data=format_dict)) - - requests.post(url=url, data=json_data, headers={"Content-Type": "application/json"}) - - -def _run_sleep_tasks_in_subprocess( - task_times: List[float], - iteration_index: int, - request_id: str, - url: str, -): - """ - Run a 'task' that takes a certain amount of time to run on each worker. - - In this case that task is simply time.sleep. - - Parameters - ---------- - sleep_time : float - The amount of time this task emulates having taken to complete. - iteration_index : int - The index of this task in the list of all tasks from the buffer map. - Each index would map to a different tqdm position. - id : int - Identifier of the request, provided by the client. - url : str - The localhost URL to sent progress updates to. - """ - - subprogress_bar_id = uuid.uuid4() - - sub_progress_bar = TQDMProgressPublisher( - iterable=task_times, - position=iteration_index + 1, - desc=f"Progress on iteration {iteration_index} ({subprogress_bar_id})", - leave=False, - ) - - sub_progress_bar.subscribe(lambda format_dict: forward_to_http_server(url, request_id, subprogress_bar_id, format_dict)) - - for sleep_time in sub_progress_bar: - time.sleep(sleep_time) - - -def run_parallel_processes(*, all_task_times: List[List[float]], request_id: str, url: str): - - futures = list() - with ProcessPoolExecutor(max_workers=N_JOBS) as executor: - - # Assign the parallel jobs - for iteration_index, task_times_per_job in enumerate(all_task_times): - futures.append( - executor.submit( - _run_sleep_tasks_in_subprocess, - task_times=task_times_per_job, - iteration_index=iteration_index, - request_id=request_id, - url=url, - ) - ) - - total_tasks_iterable = as_completed(futures) - total_tasks_progress_bar = TQDMProgressPublisher( - iterable=total_tasks_iterable, total=len(all_task_times), desc=f"Total tasks completed for {request_id}" - ) - - # The 'total' progress bar bas an ID equivalent to the request ID - total_tasks_progress_bar.subscribe( - lambda format_dict: forward_to_http_server( - url=url, request_id=request_id, progress_bar_id=request_id, format_dict=format_dict - ) - ) - - # Trigger the deployment of the parallel jobs - for _ in total_tasks_progress_bar: - pass - -async def handler(url: str, websocket: websockets.WebSocketServerProtocol) -> None: - """Handle messages from the client and manage the client connections.""" - - connection_id = uuid.uuid4() - - # Wait for messages from the client - async for message in websocket: - message_from_client = json.loads(message) - - if message_from_client["command"] == "start": - request_id = message_from_client["request_id"] - WEBSOCKETS[request_id] = dict(ref=websocket, id=connection_id) - run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=url) - - -async def spawn_server() -> None: - """Spawn the server asynchronously.""" - - PORT = find_free_port() - - URL = f"http://localhost:{PORT}" - - async with websockets.serve(ws_handler=lambda websocket: handler(URL, websocket), host="", port=3768): - - def update_queue(request_id, progress_bar_id, format_dict): - progress_handler.announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=format_dict)) - - http_server = ThreadedHTTPServer(port=PORT, callback=update_queue) - http_server.start() - - queue_task = ThreadedQueueTask() - queue_task.start() - await asyncio.Future() - - -def run_parallel_bar_demo() -> None: - """Trigger the execution of the asynchronous spawn.""" - asyncio.run(spawn_server()) - - -if __name__ == "__main__": - - flags_list = sys.argv[1:] - - port_flag = "--port" in flags_list - host_flag = "--host" in flags_list - - if port_flag: - port_index = flags_list.index("--port") - PORT = flags_list[port_index + 1] - - if host_flag: - host_index = flags_list.index("--host") - HOST = flags_list[host_index + 1] - else: - HOST = "localhost" - - URL = f"http://{HOST}:{PORT}" if port_flag else None - - if URL is None: - raise ValueError("URL is not defined.") - - # Just run the parallel processes - request_id = uuid.uuid4() - run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL) From 7627082b2b969ad9897b31a7c767a6bb1a3afb5c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 10 Apr 2024 19:14:45 +0000 Subject: [PATCH 27/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_demos/_demo_command_line_interface.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/tqdm_publisher/_demos/_demo_command_line_interface.py b/src/tqdm_publisher/_demos/_demo_command_line_interface.py index 7148b73..452d37a 100644 --- a/src/tqdm_publisher/_demos/_demo_command_line_interface.py +++ b/src/tqdm_publisher/_demos/_demo_command_line_interface.py @@ -6,7 +6,6 @@ from tqdm_publisher._demos._multiple_bars._server import run_multiple_bar_demo from tqdm_publisher._demos._parallel_bars._server import run_parallel_bar_demo - from tqdm_publisher._demos._single_bar._server import run_single_bar_demo CLIENT_PORT = 1234 @@ -14,7 +13,7 @@ DEMOS = { "demo_single": dict(subpath="_single_bar", server=run_single_bar_demo), "demo_multiple": dict(subpath="_multiple_bars", server=run_multiple_bar_demo), - "demo_parallel": dict(subpath="_parallel_bars", server=run_parallel_bar_demo) + "demo_parallel": dict(subpath="_parallel_bars", server=run_parallel_bar_demo), } DEMO_BASE_FOLDER_PATH = Path(__file__).parent From 38575085bcaf0b59ed32f7ac321c6eb0f0ea3366 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Sat, 13 Apr 2024 09:01:49 -0700 Subject: [PATCH 28/34] Request initial state immediately --- src/tqdm_publisher/_publisher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_publisher.py b/src/tqdm_publisher/_publisher.py index 7c5d5ce..647571b 100644 --- a/src/tqdm_publisher/_publisher.py +++ b/src/tqdm_publisher/_publisher.py @@ -16,7 +16,7 @@ def update(self, n: int = 1) -> Union[bool, None]: for callback in self.callbacks.values(): callback(self.format_dict) - + return displayed def subscribe(self, callback: callable) -> str: @@ -53,6 +53,7 @@ def subscribe(self, callback: callable) -> str: callback_id = str(uuid4()) self.callbacks[callback_id] = callback + callback(self.format_dict) # Call the callback immediately to show the current state return callback_id def unsubscribe(self, callback_id: str) -> bool: From 4594d38cd6b294a3d4e719909165e4023d46477e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 13 Apr 2024 16:01:57 +0000 Subject: [PATCH 29/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_publisher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tqdm_publisher/_publisher.py b/src/tqdm_publisher/_publisher.py index 647571b..7ff34c2 100644 --- a/src/tqdm_publisher/_publisher.py +++ b/src/tqdm_publisher/_publisher.py @@ -16,7 +16,7 @@ def update(self, n: int = 1) -> Union[bool, None]: for callback in self.callbacks.values(): callback(self.format_dict) - + return displayed def subscribe(self, callback: callable) -> str: @@ -53,7 +53,7 @@ def subscribe(self, callback: callable) -> str: callback_id = str(uuid4()) self.callbacks[callback_id] = callback - callback(self.format_dict) # Call the callback immediately to show the current state + callback(self.format_dict) # Call the callback immediately to show the current state return callback_id def unsubscribe(self, callback_id: str) -> bool: From 95a3c274dc978fd34a1793b8947d11b96d51b679 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Sat, 13 Apr 2024 20:51:55 -0700 Subject: [PATCH 30/34] Update elements.js --- src/tqdm_publisher/_demos/utils/elements.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tqdm_publisher/_demos/utils/elements.js b/src/tqdm_publisher/_demos/utils/elements.js index 276272c..ddd8dc9 100644 --- a/src/tqdm_publisher/_demos/utils/elements.js +++ b/src/tqdm_publisher/_demos/utils/elements.js @@ -46,8 +46,8 @@ export const createProgressBar = (requestContainer = barContainer) => { if (!request_id || !progress_bar_id) return descriptionEl.innerText = description; - const resolvedDescriptionEl = request_id === progress_bar_id ? getRequestContainer(request_id).description : descriptionEl; - + // const resolvedDescriptionEl = request_id === progress_bar_id ? getRequestContainer(request_id).description : descriptionEl; + const resolvedDescriptionEl = descriptionEl; resolvedDescriptionEl.innerText = description } From 34a51eb0d7b91f4c6020b029fa4ffaf93ba1a24a Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Sat, 13 Apr 2024 20:52:36 -0700 Subject: [PATCH 31/34] Update elements.js --- src/tqdm_publisher/_demos/utils/elements.js | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/tqdm_publisher/_demos/utils/elements.js b/src/tqdm_publisher/_demos/utils/elements.js index ddd8dc9..26633b4 100644 --- a/src/tqdm_publisher/_demos/utils/elements.js +++ b/src/tqdm_publisher/_demos/utils/elements.js @@ -42,13 +42,7 @@ export const createProgressBar = (requestContainer = barContainer) => { const remaining = rate && total ? (total - n) / rate : 0; // Seconds - const description = `${prefix ? `${prefix} — ` : ''}${elapsed.toFixed(1)}s elapsed, ${remaining.toFixed(1)}s remaining`; - - if (!request_id || !progress_bar_id) return descriptionEl.innerText = description; - - // const resolvedDescriptionEl = request_id === progress_bar_id ? getRequestContainer(request_id).description : descriptionEl; - const resolvedDescriptionEl = descriptionEl; - resolvedDescriptionEl.innerText = description + descriptionEl.innerText = `${prefix ? `${prefix} — ` : ''}${elapsed.toFixed(1)}s elapsed, ${remaining.toFixed(1)}s remaining`; } From d167e494414ede953068282a402dc409fc2cd207 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Mon, 15 Apr 2024 13:35:01 -0500 Subject: [PATCH 32/34] Simplified parallel structure --- .../_demos/_parallel_bars/_client.py | 62 --------------- .../_demos/_parallel_bars/_server.py | 75 +++++-------------- 2 files changed, 20 insertions(+), 117 deletions(-) delete mode 100644 src/tqdm_publisher/_demos/_parallel_bars/_client.py diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_client.py b/src/tqdm_publisher/_demos/_parallel_bars/_client.py deleted file mode 100644 index 0f45979..0000000 --- a/src/tqdm_publisher/_demos/_parallel_bars/_client.py +++ /dev/null @@ -1,62 +0,0 @@ -"""Demo of parallel tqdm client.""" - -# HTTP server addition -import http.server -import json -import signal -import socket -import socketserver -import sys - - -def find_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("", 0)) # Bind to a free port provided by the host - return s.getsockname()[1] # Return the port number assigned - - -def GLOBAL_CALLBACK(request_id, id, format_dict): - print("Global Update", request_id, id, f"{format_dict['n']}/{format_dict['total']}") - - -def create_http_server(port: int, callback): - class MyHttpRequestHandler(http.server.SimpleHTTPRequestHandler): - - def do_POST(self): - content_length = int(self.headers["Content-Length"]) - post_data = json.loads(self.rfile.read(content_length).decode("utf-8")) - callback(post_data["request_id"], post_data["id"], post_data["data"]) - self.send_response(200) - self.end_headers() - - with socketserver.TCPServer(("", port), MyHttpRequestHandler) as httpd: - - def signal_handler(signal, frame): - print("\n\nInterrupt signal received. Closing server...") - httpd.server_close() - httpd.socket.close() - print("Server closed.") - sys.exit(0) - - try: - signal.signal(signal.SIGINT, signal_handler) - except: - pass # Allow to work in thread - - print(f"Serving HTTP on port {port}") - httpd.serve_forever() - - -if __name__ == "__main__": - - flags_list = sys.argv[1:] - - port_flag = "--port" in flags_list - - if port_flag: - port_index = flags_list.index("--port") - PORT = int(flags_list[port_index + 1]) - else: - PORT = find_free_port() - - create_http_server(port=PORT, callback=GLOBAL_CALLBACK) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 57d1f89..3ff1b3f 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -3,21 +3,16 @@ import asyncio import json import sys -import threading import time import uuid from concurrent.futures import ProcessPoolExecutor, as_completed -from typing import List, Union +from typing import List import requests from flask import Flask, Response, jsonify, request from flask_cors import CORS, cross_origin from tqdm_publisher import TQDMProgressHandler, TQDMProgressPublisher -from tqdm_publisher._demos._parallel_bars._client import ( - create_http_server, - find_free_port, -) N_JOBS = 3 @@ -32,25 +27,7 @@ ] ## TQDMProgressHandler cannot be called from a process...so we just use a global reference exposed to each subprocess -progress_handler = TQDMProgressHandler() - - -def forward_updates_over_server_sent_events(request_id, progress_bar_id, format_dict): - progress_handler.announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=format_dict)) - - -class ThreadedHTTPServer: - def __init__(self, port: int, callback): - self.port = port - self.callback = callback - - def run(self): - create_http_server(port=self.port, callback=self.callback) - - def start(self): - thread = threading.Thread(target=self.run) - thread.start() - +progress_handler = TQDMProgressHandler() def forward_to_http_server(url: str, request_id: str, progress_bar_id: int, format_dict: dict): """ @@ -214,7 +191,7 @@ def listen_to_events(): app = Flask(__name__) cors = CORS(app) app.config["CORS_HEADERS"] = "Content-Type" -PORT = find_free_port() +PORT = 3768 # find_free_port() @app.route("/start", methods=["POST"]) @@ -222,42 +199,31 @@ def listen_to_events(): def start(): data = json.loads(request.data) if request.data else {} request_id = data["request_id"] - run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=f"http://localhost:{PORT}") + url = f"http://localhost:{PORT}/update" + app.logger.info(url) + + run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=url) return jsonify({"status": "success"}) +@app.route("/update", methods=["POST"]) +@cross_origin() +def update(): + data = json.loads(request.data) if request.data else {} + request_id = data["request_id"] + progress_bar_id = data["id"] + format_dict = data["data"] + + # Forward updates over Sever-Side Events + progress_handler.announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=format_dict)) + @app.route("/events", methods=["GET"]) @cross_origin() def events(): return Response(listen_to_events(), mimetype="text/event-stream") - -class ThreadedFlaskServer: - def __init__(self, port: int): - self.port = port - - def run(self): - app.run(host="localhost", port=self.port) - - def start(self): - thread = threading.Thread(target=self.run) - thread.start() - - async def start_server(port): - - flask_server = ThreadedFlaskServer(port=3768) - flask_server.start() - - def update_queue(request_id: str, progress_bar_id: str, format_dict: dict): - forward_updates_over_server_sent_events( - request_id=request_id, progress_bar_id=progress_bar_id, format_dict=format_dict - ) - - http_server = ThreadedHTTPServer(port=PORT, callback=update_queue) - http_server.start() - - await asyncio.Future() + app.run(host="localhost", port=port) def run_parallel_bar_demo() -> None: @@ -266,8 +232,7 @@ def run_parallel_bar_demo() -> None: def _run_parallel_bars_demo(port: str, host: str): - URL = f"http://{host}:{port}" - + URL = f"http://{host}:{port}/update" request_id = uuid.uuid4() run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL) From 49f828bb385a0eba5d83a2ce7c4fb994f14e7b4f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 15 Apr 2024 18:37:22 +0000 Subject: [PATCH 33/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index 3ff1b3f..d651a79 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -27,7 +27,8 @@ ] ## TQDMProgressHandler cannot be called from a process...so we just use a global reference exposed to each subprocess -progress_handler = TQDMProgressHandler() +progress_handler = TQDMProgressHandler() + def forward_to_http_server(url: str, request_id: str, progress_bar_id: int, format_dict: dict): """ @@ -191,7 +192,7 @@ def listen_to_events(): app = Flask(__name__) cors = CORS(app) app.config["CORS_HEADERS"] = "Content-Type" -PORT = 3768 # find_free_port() +PORT = 3768 # find_free_port() @app.route("/start", methods=["POST"]) @@ -217,11 +218,13 @@ def update(): # Forward updates over Sever-Side Events progress_handler.announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=format_dict)) + @app.route("/events", methods=["GET"]) @cross_origin() def events(): return Response(listen_to_events(), mimetype="text/event-stream") + async def start_server(port): app.run(host="localhost", port=port) From 54734ca19124031f523d59a42a54a8dcd04f7670 Mon Sep 17 00:00:00 2001 From: Garrett Michael Flynn Date: Tue, 16 Apr 2024 14:44:10 -0500 Subject: [PATCH 34/34] Update _server.py --- src/tqdm_publisher/_demos/_parallel_bars/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tqdm_publisher/_demos/_parallel_bars/_server.py b/src/tqdm_publisher/_demos/_parallel_bars/_server.py index d651a79..46a8a60 100644 --- a/src/tqdm_publisher/_demos/_parallel_bars/_server.py +++ b/src/tqdm_publisher/_demos/_parallel_bars/_server.py @@ -192,7 +192,7 @@ def listen_to_events(): app = Flask(__name__) cors = CORS(app) app.config["CORS_HEADERS"] = "Content-Type" -PORT = 3768 # find_free_port() +PORT = 3768 @app.route("/start", methods=["POST"])