Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel demo #32

Closed
wants to merge 12 commits into from
4 changes: 4 additions & 0 deletions demo/demo_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

client_path = demo_base_path / "client.html"
server_path = demo_base_path / "server.py"
parallel_script_path = demo_base_path / "parallel_script.py"


def main():
Expand Down Expand Up @@ -33,6 +34,9 @@ def main():
if flags["server"]:
subprocess.run(["python", server_path])

elif command == "parallel":
subprocess.run(["python", parallel_script_path])

else:
print(f"{command} is an invalid command.")

Expand Down
127 changes: 127 additions & 0 deletions demo/parallel_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Demo of parallel tqdm."""

# HTTP server addition
import http.server
import json
import socket
import socketserver
import threading
import time
import uuid
from concurrent.futures import ProcessPoolExecutor
from typing import List, Tuple

import requests

from tqdm_publisher import TQDMPublisher

id_ref = uuid.uuid4()  # NOTE(review): appears unused anywhere in this module — confirm before removing


def find_free_port():
    """Ask the OS for an unused TCP port and return its number."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Binding to port 0 makes the host assign any currently free port.
        sock.bind(("", 0))
        _, port = sock.getsockname()
        return port
    finally:
        sock.close()


def _run_sleep_in_subprocess(args: Tuple[int, int, uuid.UUID, int]):
    """The operation to run on each subprocess.

    Parameters
    ----------
    args : tuple
        ``(repeat, iteration_index, progress_bar_id, port)`` — the number of
        iterations this job sleeps through, the job's position in the outer
        dispatch, a unique ID identifying this progress bar, and the port of
        the central HTTP server that receives progress updates.
    """
    # Fix: the original annotation claimed Tuple[int, int] but four values are
    # unpacked; also renamed the local `id` to avoid shadowing the builtin.
    repeat, iteration_index, progress_bar_id, port = args

    sub_progress_bar = TQDMPublisher(
        iterable=range(repeat),
        position=iteration_index + 1,
        desc=f"Progress on iteration {iteration_index} ({progress_bar_id})",
        leave=False,
    )

    url = f"http://localhost:{port}"

    def to_main_process(format_dict):
        """POST a single progress update to the managing process's HTTP server."""
        json_data = json.dumps(dict(id=str(progress_bar_id), update=format_dict))
        requests.post(url, data=json_data, headers={"Content-Type": "application/json"})

    sub_progress_bar.subscribe(to_main_process)

    for _ in sub_progress_bar:
        time.sleep(0.5)


class ParallelProgressHandler:
    """Dispatches parallel jobs and fans their progress updates out to subscribers."""

    def __init__(self):
        # Maps subscription ID -> callback(id, format_dict).
        self.callbacks = {}
        # Guards against run() being dispatched more than once per instance.
        self.started = False

    def _run_callbacks(self, id, format_dict):
        """Forward one progress update to every subscribed callback."""
        for subscriber in self.callbacks.values():
            subscriber(id, format_dict)

def run_server(self, port):

def run_callbacks(id, format_dict):
self._run_callbacks(id, format_dict)

class MyHttpRequestHandler(http.server.SimpleHTTPRequestHandler):

def do_POST(self):
content_length = int(self.headers["Content-Length"])
post_data = json.loads(self.rfile.read(content_length).decode("utf-8"))
run_callbacks(post_data["id"], post_data["update"])
self.send_response(200)
self.end_headers()

with socketserver.TCPServer(("", port), MyHttpRequestHandler) as httpd:
print(f"Serving HTTP on port {port}")
httpd.serve_forever()

def run(self, number_of_jobs: int, repeats: List[int]):
"""The main (outer) iteration run on the central managing process."""

if self.started:
return

self.started = True

PORT = find_free_port()

# Start the server in a new thread
server_thread = threading.Thread(target=self.run_server, args=[PORT])
server_thread.daemon = True
server_thread.start()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to move the PORT related stuff to an argument of this function? Such that the run command only does parallel dispatch of some operation with some map of input arguments (which can also include the port info)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or better yet make it more generic to only receive the URL at which to make the post request

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! Now the process "server" and HTTP "client" are run in parallel and communicate to each other through the URL provided to the former.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also trying out the kwargs approach discussed yesterday. Let me know if this makes sense.

It's a bit more cumbersome than I'd anticipated because all kwargs are considered required to be defined on the callback function (or simply **kwargs)


# Start the parallel jobs
with ProcessPoolExecutor(max_workers=number_of_jobs) as executor:

job_map = executor.map(
_run_sleep_in_subprocess,
[(repeat, iteration_index, uuid.uuid4(), PORT) for iteration_index, repeat in enumerate(repeats)],
)

[_ for _ in job_map] # perform iteration to deploy jobs

server_thread.join()

def subscribe(self, callback):
id = uuid.uuid4()
self.callbacks[id] = callback
return id


if __name__ == "__main__":
    # Demo configuration: up to 3 concurrent workers over 15 jobs of varying length.
    job_limit = 3
    repeat_schedule = [4, 6, 8, 10, 4, 8, 20, 10, 5, 12, 5, 4, 5, 5, 5]

    def send_to_website(id, format_dict):
        """Stand-in subscriber that echoes every progress update to stdout."""
        print("Send to website", id, format_dict)

    handler = ParallelProgressHandler()
    handler.subscribe(send_to_website)
    handler.run(number_of_jobs=job_limit, repeats=repeat_schedule)
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ test = [
]

demo = [
"websockets==12.0"
"websockets==12.0",
"requests==2.31.0"
]

[project.urls]
Expand Down
Loading