Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel demo #32

Closed
wants to merge 12 commits into from
Closed
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ test = [
]

demo = [
"websockets==12.0"
"websockets==12.0",
"requests==2.31.0"
]

[project.urls]
Expand Down
15 changes: 15 additions & 0 deletions src/tqdm_publisher/_demo/_client.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,32 @@
}

.progress {
position: relative;
width: 100%;
height: 20px;
background-color: #ddd;
}

.progress label {
position: absolute;
left: 10px;
top: 0;
transform: translateY(20%);
font-size: 0.8rem;
}

.progress div {
height: 100%;
background-color: #4caf50;
width: 0%;
}

.progress.closed div {
background-color: #af4c4c;
}



</style>
<script src="./_client.js" defer></script>

Expand Down
98 changes: 87 additions & 11 deletions src/tqdm_publisher/_demo/_demo_command_line_interface.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,84 @@
import os
import signal
import subprocess
import sys
import time
from pathlib import Path

from ._server import run_demo
from ._parallel._server import run_demo as _parallel_server
from ._server import run_demo as _server

DEMO_BASE_FOLDER_PATH = Path(__file__).parent

CLIENT_FILE_PATH = DEMO_BASE_FOLDER_PATH / "_client.html"
SERVER_FILE_PATH = DEMO_BASE_FOLDER_PATH / "_server.py"

PARALLEL_SERVER_FILE_PATH = DEMO_BASE_FOLDER_PATH / "_parallel" / "_server.py"
PARALLEL_CLIENT_FILE_PATH = DEMO_BASE_FOLDER_PATH / "_parallel" / "_client.py"

PORT_FLAG = "--port"
HOST_FLAG = "--host"
SERVER_FLAG = "--server"
CLIENT_FLAG = "--client"

SUBPROCESSES = []


def close_process():
for process in SUBPROCESSES:
process.terminate() # Send SIGTERM to subprocess
# sys.exit()


def signal_handler(signal, frame):
close_process()


signal.signal(signal.SIGINT, signal_handler)


def run_demo(demo, flags):

if demo == "demo":

print(flags)

# For convenience - automatically pop-up a browser window on the locally hosted HTML page
if flags["client"]:
if sys.platform == "win32":
os.system(f'start "" "{CLIENT_FILE_PATH}"')
else:
SUBPROCESSES.append(subprocess.Popen(["open", CLIENT_FILE_PATH]))

_server()

elif demo == "parallel-demo":
if flags["client"]:
client_args = ["python", PARALLEL_CLIENT_FILE_PATH]
if flags["both"]:
client_args += [PORT_FLAG, str(flags["port"]), HOST_FLAG, flags["host"]]
SUBPROCESSES.append(subprocess.Popen(client_args))
if flags["both"]:
time.sleep(1) # Ensure server starts before client connects

if flags["server"]:
if flags["both"]:
_parallel_server(flags["host"], flags["port"])
else:
_parallel_server()

close_process()

else:
print(f"{demo} is an invalid demo option.")


def get_flag(flags, flag, default=None):
if flag in flags:
flag_index = flags.index(flag)
return flags[flag_index + 1]
return default


def _command_line_interface():
"""A simple command line interface for running the demo for TQDM Publisher."""
Expand All @@ -18,7 +87,7 @@ def _command_line_interface():
return

command = sys.argv[1]
if "-" in command:
if "--" in command:
print(
f"No command provided, but flag {command} was received. "
"Please specify a command before the flag (e.g. 'demo')."
Expand All @@ -30,14 +99,21 @@ def _command_line_interface():
print(f"No flags are accepted at this time, but flags {flags_list} were received.")
return

if command == "demo":
# For convenience - automatically pop-up a browser window on the locally hosted HTML page
if sys.platform == "win32":
os.system(f'start "" "{CLIENT_FILE_PATH}"')
else:
subprocess.run(["open", CLIENT_FILE_PATH])
client_flag = "--client" in flags_list
server_flag = "--server" in flags_list
both_flags = "--server" in flags_list and "--client" in flags_list

run_demo()
flags = dict(
client=not server_flag or both_flags,
server=not client_flag or both_flags,
both=(client_flag and server_flag) or (not client_flag and not server_flag),
host=get_flag(flags_list, HOST_FLAG, "localhost"),
port=get_flag(flags_list, PORT_FLAG, 8000),
)

else:
print(f"{command} is an invalid command.")
try:
run_demo(command, flags)
except KeyboardInterrupt as e:
print("\n\nInterrupt signal received. Shutting down subprocesses...")
finally:
close_process()
60 changes: 60 additions & 0 deletions src/tqdm_publisher/_demo/_parallel/_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Demo of parallel tqdm client."""

# HTTP server addition
import http.server
import json

# Kill server on interrupt
import signal
import socket
import socketserver
import sys


def find_free_port():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0)) # Bind to a free port provided by the host
return s.getsockname()[1] # Return the port number assigned


class MyTCPServer(socketserver.TCPServer):
allow_reuse_address = True # This allows the server to bind to an address that is in a TIME_WAIT state


def GLOBAL_CALLBACK(id, n, total):
print("Global Update", id, f"{n}/{total}")


if __name__ == "__main__":

flags_list = sys.argv[1:]

port_flag = "--port" in flags_list

if port_flag:
port_index = flags_list.index("--port")
PORT = int(flags_list[port_index + 1])
else:
PORT = find_free_port()

class MyHttpRequestHandler(http.server.SimpleHTTPRequestHandler):

def do_POST(self):
content_length = int(self.headers["Content-Length"])
post_data = json.loads(self.rfile.read(content_length).decode("utf-8"))
GLOBAL_CALLBACK(post_data["id"], **post_data["data"])
self.send_response(200)
self.end_headers()

with MyTCPServer(("", PORT), MyHttpRequestHandler) as httpd:

def signal_handler(signal, frame):
print("\n\nInterrupt signal received. Closing server...")
httpd.shutdown() # Stop the serve_forever loop
httpd.server_close() # Clean up the server socket
print("Server closed.")

signal.signal(signal.SIGINT, signal_handler)

print(f"Serving HTTP on port {PORT}")
httpd.serve_forever()
80 changes: 80 additions & 0 deletions src/tqdm_publisher/_demo/_parallel/_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Demo of parallel tqdm."""

# HTTP server addition
import json
import sys
import time
import uuid
from concurrent.futures import ProcessPoolExecutor
from typing import Tuple

import requests

from tqdm_publisher import TQDMPublisher

NUMBER_OF_JOBS = 3
REPEATS = [4, 6, 8, 10, 4, 8, 20, 10, 5, 12, 5, 4, 5, 5, 5]


def _run_sleep_in_subprocess(args: Tuple[int, int]):
"""The operation to run on each subprocess."""
repeat = args[0]
iteration_index = args[1]
id = args[2]
url = args[3]

iterable = range(repeat)
sub_progress_bar = TQDMPublisher(
iterable=iterable,
position=iteration_index + 1,
desc=f"Progress on iteration {iteration_index} ({id})",
leave=False,
)

if url:

def to_main_process(format_dict):

json_data = json.dumps(dict(id=str(id), data=dict(n=format_dict["n"], total=format_dict["total"])))
requests.post(url, data=json_data, headers={"Content-Type": "application/json"})

sub_progress_bar.subscribe(to_main_process)

for _ in sub_progress_bar:
time.sleep(0.5)


def run_demo(host, port):

URL = f"http://{host}:{port}" if port else None

# Start the parallel jobs
with ProcessPoolExecutor(max_workers=NUMBER_OF_JOBS) as executor:

job_map = executor.map(
_run_sleep_in_subprocess,
[(repeat, iteration_index, uuid.uuid4(), URL) for iteration_index, repeat in enumerate(REPEATS)],
)

[_ for _ in job_map] # perform iteration to deploy jobs


if __name__ == "__main__":

flags_list = sys.argv[1:]

port_flag = "--port" in flags_list
host_flag = "--host" in flags_list

PORT = None
HOST = "localhost"

if port_flag:
port_index = flags_list.index("--port")
PORT = flags_list[port_index + 1]

if host_flag:
host_index = flags_list.index("--host")
HOST = flags_list[host_index + 1]

run_demo(HOST, PORT)
3 changes: 2 additions & 1 deletion src/tqdm_publisher/_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ def __init__(self, *args, **kwargs):
def update(self, n: int = 1) -> Union[bool, None]:
displayed = super().update(n)

format_dict = self.format_dict
for callback in self.callbacks.values():
callback(self.format_dict)
callback(format_dict)

return displayed

Expand Down
6 changes: 3 additions & 3 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ def test_initialization():
async def test_subscription_and_callback_execution():
n_callback_executions = dict()

def test_callback(identifier, data):
def test_callback(identifier, **kwargs):
nonlocal n_callback_executions

if identifier not in n_callback_executions:
n_callback_executions[identifier] = 0

n_callback_executions[identifier] += 1

assert "n" in data and "total" in data
assert "n" in kwargs and "total" in kwargs

tasks = create_tasks()
publisher = TQDMPublisher(asyncio.as_completed(tasks), total=len(tasks))
Expand All @@ -47,7 +47,7 @@ def test_callback(identifier, data):


def test_unsubscription():
def dummy_callback(data):
def dummy_callback(**kwargs):
pass

tasks = []
Expand Down
Loading