diff --git a/README.md b/README.md index 6dd30cf..2086356 100644 --- a/README.md +++ b/README.md @@ -11,36 +11,50 @@ This is useful if you want to use `tqdm` to track the progress of a long-running ```bash pip install tqdm_publisher ``` +## Getting Started +### Basic Usage +To monitor the progress of an existing `tqdm` progress bar, simply swap the `tqdm`and `TQDMPublisher` constructors. Then, declare a callback function to handle progress updates, and subscribe it to the `TQDMPublisher` updates using the `subscribe` method _before iteration begins_. -## Usage +#### Original Code ```python import random -import asyncio +import time -from tqdm_publisher import TQDMPublisher +from tqdm import tqdm + +N_TASKS = 100 + +# Create a list of tasks +durations = [ random.uniform(0, 1.0) for _ in range(N_TASKS) ] + +# Create a progress bar +progress_bar = tqdm(durations) -async def sleep_func(sleep_duration = 1): - await asyncio.sleep(delay=sleep_duration) +# Iterate over the progress bar +for duration in progress_bar: + time.sleep(duration) # Execute the task +``` -async def run_multiple_sleeps(sleep_durations): +#### Modified Code - tasks = [] +```python +import random +import time - for sleep_duration in sleep_durations: - task = asyncio.create_task(sleep_func(sleep_duration=sleep_duration)) - tasks.append(task) +from tqdm_publisher import TQDMPublisher - progress_bar = TQDMPublisher(asyncio.as_completed(tasks), total=len(tasks)) - callback_id = progress_bar.subscribe(lambda info: print('Progress Update', info)) +N_TASKS = 100 +durations = [ random.uniform(0, 1.0) for _ in range(N_TASKS) ] +progress_bar = TQDMPublisher(durations) - for f in progress_bar: - await f +# Declare a callback function to handle progress updates +on_update = lambda info: print('Progress Update', info) - progress_bar.unsubscribe(callback_id) +# Subscribe the callback to the TQDMPublisher +progress_bar.subscribe(on_update) -number_of_tasks = 10**5 -sleep_durations = [random.uniform(0, 5.0) for _ in range(number_of_tasks)] -asyncio.run(run_multiple_sleeps(sleep_durations=sleep_durations)) +for duration in progress_bar: + time.sleep(duration) ``` ## Demo diff --git a/src/tqdm_publisher/demo/client.html b/src/tqdm_publisher/demo/client.html index 67ed769..3b716e2 100644 --- a/src/tqdm_publisher/demo/client.html +++ b/src/tqdm_publisher/demo/client.html @@ -74,35 +74,67 @@

tqdm_progress

diff --git a/src/tqdm_publisher/demo/server.py b/src/tqdm_publisher/demo/server.py index 02d68dd..abaa212 100644 --- a/src/tqdm_publisher/demo/server.py +++ b/src/tqdm_publisher/demo/server.py @@ -4,6 +4,7 @@ import json import random import threading +import time from typing import List from uuid import uuid4 @@ -12,136 +13,51 @@ from tqdm_publisher import TQDMPublisher -async def sleep_func(sleep_duration: float = 1) -> float: - await asyncio.sleep(delay=sleep_duration) +def generate_task_durations(n=100) -> List[float]: + return [random.uniform(0, 1.0) for _ in range(n)] -def create_tasks(): - n = 10**5 - sleep_durations = [random.uniform(0, 5.0) for _ in range(n)] - tasks = [] - - for sleep_duration in sleep_durations: - task = asyncio.create_task(sleep_func(sleep_duration=sleep_duration)) - tasks.append(task) - - return tasks - - -class ProgressHandler: - def __init__(self): - self.started = False - self.callbacks = [] - self.callback_ids = [] - - def subscribe(self, callback): - self.callbacks.append(callback) - - if hasattr(self, "progress_bar"): - self._subscribe(callback) - - def unsubscribe(self, callback_id): - self.progress_bar.unsubscribe(callback_id) - - def clear(self): - self.callbacks = [] - self._clear() - - def _clear(self): - for callback_id in self.callback_ids: - self.unsubscribe(callback_id) - - self.callback_ids = [] - - async def run(self): - for f in self.progress_bar: - await f - - def stop(self): - self.started = False - self.clear() - self.thread.join() - - def _subscribe(self, callback): - callback_id = self.progress_bar.subscribe(callback) - self.callback_ids.append(callback_id) - - async def run(self): - if hasattr(self, "progress_bar"): - print("Progress bar already running") - return - - self.tasks = create_tasks() - self.progress_bar = TQDMPublisher(asyncio.as_completed(self.tasks), total=len(self.tasks)) - - for callback in self.callbacks: - self._subscribe(callback) - - for f in self.progress_bar: - await f - - self._clear() - del self.progress_bar - - def thread_loop(self): - while self.started: - asyncio.run(self.run()) - - def start(self): - if self.started: - return - - self.started = True - - self.thread = threading.Thread(target=self.thread_loop) # Start infinite loop of progress bar thread - self.thread.start() - - -progress_handler = ProgressHandler() +def start_progress_bar(id, callback): + durations = generate_task_durations() + progress_bar = TQDMPublisher(durations) + progress_bar.subscribe(lambda info: callback(id, info)) + for duration in progress_bar: + time.sleep(duration) class WebSocketHandler: def __init__(self): self.clients = {} - - # Initialize with any state you need pass - def handle_task_result(self, task): - try: - task.result() # This will re-raise any exception that occurred in the task - except websockets.exceptions.ConnectionClosedOK: - print("WebSocket closed while sending message") - except Exception as e: - print(f"Error in task: {e}") + async def send(self, id, data): + await self.clients[id].send(json.dumps(data)) async def handler(self, websocket): - id = str(uuid4()) - self.clients[id] = websocket # Register client connection + identifier = str(uuid4()) + self.clients[identifier] = websocket # Register client connection - progress_handler.start() # Start if not started + def on_progress(id, info): - def on_progress(info): - task = asyncio.create_task(websocket.send(json.dumps(info))) - task.add_done_callback(self.handle_task_result) # Handle task result or exception - - progress_handler.subscribe(on_progress) + asyncio.run(self.send(identifier, dict(id=id, payload=info))) try: async for message in websocket: - print("Message from client received:", message) + + info = json.loads(message) + + if info["command"] == "start": + thread = threading.Thread(target=start_progress_bar, args=[info["id"], on_progress]) + thread.start() finally: - # This is called when the connection is closed - del self.clients[id] - if len(self.clients) == 0: - progress_handler.stop() + del self.clients[identifier] # This is called when the connection is closed async def spawn_server(): handler = WebSocketHandler().handler async with websockets.serve(handler, "", 8000): - await asyncio.Future() # run forever + await asyncio.Future() def main():