Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve parallel demo #37

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ dist
.coverage
.coverage.*
codecov.xml

.spyproject/
78 changes: 0 additions & 78 deletions demo/parallel/server.py

This file was deleted.

5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ classifiers = [
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"License :: OSI Approved :: BSD License",
"Intended Audience :: Developers",
"Operating System :: Microsoft :: Windows",
Expand All @@ -51,8 +52,8 @@ demo = [
"Homepage" = "https://github.com/catalystneuro/tqdm_publisher"
"Bug Tracker" = "https://github.com/catalystneuro/tqdm_publisher/issues"

[project.gui-scripts]
tqdm_publisher = "demo.demo_cli:main"
[project.scripts]
tqdm_publisher = "tqdm_publisher._demo._demo_command_line_interface:_command_line_interface"

[tool.black]
line-length = 120
Expand Down
Empty file.
29 changes: 18 additions & 11 deletions demo/demo_cli.py → ...her/_demo/_demo_command_line_interface.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""Command line interface for running the TQDM Publisher demo."""

import os
import signal
import subprocess
import sys
Expand All @@ -15,21 +18,22 @@
subprocesses = []


def close_subprocesses():
def _close_subprocesses():
for process in subprocesses:
process.terminate() # Send SIGTERM to subprocess
sys.exit(0)


def signal_handler(signal, frame):
def _signal_handler(signal, frame):
print("Interrupt signal received. Shutting down subprocesses...")
close_subprocesses()
_close_subprocesses()


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGINT, _signal_handler)


def main():
def _command_line_interface():
"""Should be called only through the package entrypoint."""
if len(sys.argv) <= 1:
print("No command provided. Please specify a command (e.g. 'demo').")
return
Expand All @@ -50,10 +54,13 @@ def main():

if command == "demo":
if flags["client"]:
subprocesses.append(subprocess.Popen(["open", client_path]))
if sys.platform == "win32":
os.system(f'start "" "{client_path}"')
else:
subprocess.run(["open", client_path])

if flags["server"]:
subprocesses.append(subprocess.Popen(["python", server_path]))
subprocess.run(["python", server_path])

elif command == "parallel-demo":
HOST = "localhost"
Expand All @@ -75,7 +82,7 @@ def main():
client_args = ["python", parallel_client_path]
if flags["both"]:
client_args += ["--port", str(PORT), "--host", HOST]
subprocesses.append(subprocess.Popen(client_args))
subprocess.run(client_args)
if flags["both"]:
time.sleep(1) # Ensure server starts before client connects

Expand All @@ -91,8 +98,8 @@ def main():

if __name__ == "__main__":
try:
main()
except KeyboardInterrupt as e:
_command_line_interface()
except KeyboardInterrupt:
print("\n\nInterrupt signal received. Shutting down subprocesses...")
finally:
close_subprocesses()
_close_subprocesses()
File renamed without changes.
File renamed without changes.
Empty file.
File renamed without changes.
101 changes: 101 additions & 0 deletions src/tqdm_publisher/_demo/parallel/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Demo of parallel tqdm."""

import json
import sys
import time
import uuid
from concurrent.futures import ProcessPoolExecutor
from typing import List

import requests

from tqdm_publisher import TQDMPublisher


def to_main_process(id: int, url: str, n: int, total: int, **kwargs):
"""
This is the parallel callback definition.

Its parameters are attributes of a tqdm instance and their values are what a typical default tqdm printout
to console would contain (update step `n` out of `total` iterations).
"""
json_data = json.dumps(obj=dict(id=str(id), data=dict(n=n, total=total)))

requests.post(url=url, data=json_data, headers={"Content-Type": "application/json"})


def _run_sleep_tasks_in_subprocess(task_times: List[float], iteration_index: int, id: int, url: str):
"""
Run a 'task' that takes a certain amount of time to run on each worker.

In this case that task is simply time.sleep.

Parameters
----------
sleep_time : float
The amount of time this task emulates having taken to complete.
iteration_index : int
The index of this task in the list of all tasks from the buffer map.
Each index would map to a different tqdm position.
id : int
Identifier of ??.
url : str
The localhost URL to sent progress updates to.
"""
sub_progress_bar = TQDMPublisher(
iterable=task_times,
position=iteration_index + 1,
desc=f"Progress on iteration {iteration_index} ({id})",
leave=False,
)

if url:
sub_progress_bar.subscribe(to_main_process)

for sleep_time in sub_progress_bar:
time.sleep(sleep_time)


if __name__ == "__main__":
number_of_jobs = 3

# Each outer entry is a list of 'tasks' to perform on a particular worker
# For demonstration purposes, each in the list of tasks is the length of time in seconds
# that each iteration of the task takes to run and update the progress bar (emulated by sleeping)
all_task_times: List[List[float]] = [
[4.2, 6.7, 8.5, 10.3, 4.2, 8.1],
[20.5, 10.7, 5.3],
[12.4, 5.2, 4.9, 5.1],
[5.7, 5.8],
]

flags_list = sys.argv[1:]

port_flag = "--port" in flags_list
host_flag = "--host" in flags_list

if port_flag:
port_index = flags_list.index("--port")
PORT = flags_list[port_index + 1]

if host_flag:
host_index = flags_list.index("--host")
HOST = flags_list[host_index + 1]
else:
HOST = "localhost"

URL = f"http://{HOST}:{PORT}" if port_flag else None

with ProcessPoolExecutor(max_workers=number_of_jobs) as executor:
# Assign the parallel jobs
job_map = executor.map(
_run_sleep_tasks_in_subprocess,
[
(task_times, iteration_index, uuid.uuid4(), URL)
for iteration_index, task_times in enumerate(all_task_times)
],
)

# Perform iteration to deploy jobs
for _ in job_map:
pass
File renamed without changes.
Loading