Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify demo structure #39

Merged
merged 3 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,50 @@ This is useful if you want to use `tqdm` to track the progress of a long-running
```bash
pip install tqdm_publisher
```
## Getting Started
### Basic Usage
To monitor the progress of an existing `tqdm` progress bar, simply swap the `tqdm`and `TQDMPublisher` constructors. Then, declare a callback function to handle progress updates, and subscribe it to the `TQDMPublisher` updates using the `subscribe` method _before iteration begins_.

## Usage
#### Original Code
```python
import random
import asyncio
import time

from tqdm_publisher import TQDMPublisher
from tqdm import tqdm

N_TASKS = 100

# Create a list of tasks
durations = [ random.uniform(0, 1.0) for _ in range(N_TASKS) ]

# Create a progress bar
progress_bar = tqdm(durations)

async def sleep_func(sleep_duration = 1):
await asyncio.sleep(delay=sleep_duration)
# Iterate over the progress bar
for duration in progress_bar:
time.sleep(duration) # Execute the task
```

async def run_multiple_sleeps(sleep_durations):
#### Modified Code

tasks = []
```python
import random
import time

for sleep_duration in sleep_durations:
task = asyncio.create_task(sleep_func(sleep_duration=sleep_duration))
tasks.append(task)
from tqdm_publisher import TQDMPublisher

progress_bar = TQDMPublisher(asyncio.as_completed(tasks), total=len(tasks))
callback_id = progress_bar.subscribe(lambda info: print('Progress Update', info))
N_TASKS = 100
durations = [ random.uniform(0, 1.0) for _ in range(N_TASKS) ]
progress_bar = TQDMPublisher(durations)

for f in progress_bar:
await f
# Declare a callback function to handle progress updates
on_update = lambda info: print('Progress Update', info)

progress_bar.unsubscribe(callback_id)
# Subscribe the callback to the TQDMPublisher
progress_bar.subscribe(on_update)

number_of_tasks = 10**5
sleep_durations = [random.uniform(0, 5.0) for _ in range(number_of_tasks)]
asyncio.run(run_multiple_sleeps(sleep_durations=sleep_durations))
for duration in progress_bar:
time.sleep(duration)
```

## Demo
Expand Down
58 changes: 45 additions & 13 deletions src/tqdm_publisher/demo/client.html
Original file line number Diff line number Diff line change
Expand Up @@ -74,35 +74,67 @@ <h1>tqdm_progress</h1>

<script>

const bars = document.querySelector('#bars');
const barElements = document.querySelector('#bars');

const createProgressBar = () => {
const element = document.createElement('div');
element.classList.add('progress');
const progress = document.createElement('div');
element.appendChild(progress);
barElements.appendChild(element);
return { element, progress };
}

class ProgressClient {
constructor() {
this.socket = new WebSocket('ws://localhost:8000');
#connect = (props = {}) => {

this.element = document.createElement('div');
this.element.classList.add('progress');
const progress = document.createElement('div');
this.element.appendChild(progress);
bars.appendChild(this.element);
const {
onopen = () => {},
onclose = () => {},
onmessage = () => {}
} = props;

this.socket.addEventListener('message', function (event) {
const data = JSON.parse(event.data);
progress.style.width = 100 * (data.n / data.total) + '%';
this.socket = new WebSocket('ws://localhost:8000');
this.socket.addEventListener('open', onopen);
this.socket.addEventListener('close', () => {
onclose();
setTimeout(() => this.#connect(props), 1000);
});
this.socket.addEventListener('message', onmessage);
}
constructor(props) {

this.#connect(props);

}

close() {
this.socket.close();
this.element.remove()
}

}


const bars = {}
const client = new ProgressClient({
onopen: () => console.log('Connected'),
onclose: () => console.log('Disconnected'),
onmessage: (data) => {
const { id, payload } = JSON.parse(event.data);
console.log(id, payload, bars[id]);
bars[id].style.width = 100 * (payload.n / payload.total) + '%';
}
});

const button = document.querySelector('button');
button.addEventListener('click', () => {
const client = new ProgressClient();
const { element, progress } = createProgressBar();
barElements.appendChild(element);

const id = Math.random().toString(36).substring(7);
bars[id] = progress;

client.socket.send(JSON.stringify({ command: 'start', id }));
})

</script>
Expand Down
130 changes: 23 additions & 107 deletions src/tqdm_publisher/demo/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import random
import threading
import time
from typing import List
from uuid import uuid4

Expand All @@ -12,136 +13,51 @@
from tqdm_publisher import TQDMPublisher


async def sleep_func(sleep_duration: float = 1) -> float:
await asyncio.sleep(delay=sleep_duration)
def generate_task_durations(n=100) -> List[float]:
return [random.uniform(0, 1.0) for _ in range(n)]


def create_tasks():
n = 10**5
sleep_durations = [random.uniform(0, 5.0) for _ in range(n)]
tasks = []

for sleep_duration in sleep_durations:
task = asyncio.create_task(sleep_func(sleep_duration=sleep_duration))
tasks.append(task)

return tasks


class ProgressHandler:
def __init__(self):
self.started = False
self.callbacks = []
self.callback_ids = []

def subscribe(self, callback):
self.callbacks.append(callback)

if hasattr(self, "progress_bar"):
self._subscribe(callback)

def unsubscribe(self, callback_id):
self.progress_bar.unsubscribe(callback_id)

def clear(self):
self.callbacks = []
self._clear()

def _clear(self):
for callback_id in self.callback_ids:
self.unsubscribe(callback_id)

self.callback_ids = []

async def run(self):
for f in self.progress_bar:
await f

def stop(self):
self.started = False
self.clear()
self.thread.join()

def _subscribe(self, callback):
callback_id = self.progress_bar.subscribe(callback)
self.callback_ids.append(callback_id)

async def run(self):
if hasattr(self, "progress_bar"):
print("Progress bar already running")
return

self.tasks = create_tasks()
self.progress_bar = TQDMPublisher(asyncio.as_completed(self.tasks), total=len(self.tasks))

for callback in self.callbacks:
self._subscribe(callback)

for f in self.progress_bar:
await f

self._clear()
del self.progress_bar

def thread_loop(self):
while self.started:
asyncio.run(self.run())

def start(self):
if self.started:
return

self.started = True

self.thread = threading.Thread(target=self.thread_loop) # Start infinite loop of progress bar thread
self.thread.start()


progress_handler = ProgressHandler()
def start_progress_bar(id, callback):
durations = generate_task_durations()
progress_bar = TQDMPublisher(durations)
progress_bar.subscribe(lambda info: callback(id, info))
for duration in progress_bar:
time.sleep(duration)


class WebSocketHandler:
def __init__(self):
self.clients = {}

# Initialize with any state you need
pass

def handle_task_result(self, task):
try:
task.result() # This will re-raise any exception that occurred in the task
except websockets.exceptions.ConnectionClosedOK:
print("WebSocket closed while sending message")
except Exception as e:
print(f"Error in task: {e}")
async def send(self, id, data):
await self.clients[id].send(json.dumps(data))

async def handler(self, websocket):
id = str(uuid4())
self.clients[id] = websocket # Register client connection
identifier = str(uuid4())
self.clients[identifier] = websocket # Register client connection

progress_handler.start() # Start if not started
def on_progress(id, info):

def on_progress(info):
task = asyncio.create_task(websocket.send(json.dumps(info)))
task.add_done_callback(self.handle_task_result) # Handle task result or exception

progress_handler.subscribe(on_progress)
asyncio.run(self.send(identifier, dict(id=id, payload=info)))

try:
async for message in websocket:
print("Message from client received:", message)

info = json.loads(message)

if info["command"] == "start":
thread = threading.Thread(target=start_progress_bar, args=[info["id"], on_progress])
thread.start()

finally:
# This is called when the connection is closed
del self.clients[id]
if len(self.clients) == 0:
progress_handler.stop()
del self.clients[identifier] # This is called when the connection is closed


async def spawn_server():
handler = WebSocketHandler().handler
async with websockets.serve(handler, "", 8000):
await asyncio.Future() # run forever
await asyncio.Future()


def main():
Expand Down
Loading