From 1e55a6ba7f065efeae02b2c7f091cfd7a2a421e8 Mon Sep 17 00:00:00 2001 From: codycbakerphd Date: Sun, 3 Mar 2024 12:43:46 -0500 Subject: [PATCH 1/3] demo refactor --- .gitignore | 2 + demo/parallel/server.py | 81 ++++++++++++++++++++++++++--------------- 2 files changed, 54 insertions(+), 29 deletions(-) diff --git a/.gitignore b/.gitignore index 34fbcc3..6f8f449 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ dist .coverage .coverage.* codecov.xml + +.spyproject/ \ No newline at end of file diff --git a/demo/parallel/server.py b/demo/parallel/server.py index 7a13a68..248e40a 100644 --- a/demo/parallel/server.py +++ b/demo/parallel/server.py @@ -1,54 +1,73 @@ """Demo of parallel tqdm.""" -# HTTP server addition -import http.server import json -import socket -import socketserver import sys -import threading import time import uuid from concurrent.futures import ProcessPoolExecutor -from typing import List, Tuple +from typing import List import requests from tqdm_publisher import TQDMPublisher -def _run_sleep_in_subprocess(args: Tuple[int, int]): - """The operation to run on each subprocess.""" - repeat = args[0] - iteration_index = args[1] - id = args[2] - url = args[3] +def to_main_process(id: int, url: str, n: int, total: int, **kwargs): + """ + This is the parallel callback definition. - iterable = range(repeat) + Its parameters are attributes of a tqdm instance and their values are what a typical default tqdm printout + to console would contain (update step `n` out of `total` iterations). + """ + json_data = json.dumps(obj=dict(id=str(id), data=dict(n=n, total=total))) + + requests.post(url=url, data=json_data, headers={"Content-Type": "application/json"}) + + +def _run_sleep_tasks_in_subprocess(task_times: List[float], iteration_index: int, id: int, url: str): + """ + Run a 'task' that takes a certain amount of time to run on each worker. + + In this case that task is simply time.sleep. + + Parameters + ---------- + sleep_time : float + The amount of time this task emulates having taken to complete. + iteration_index : int + The index of this task in the list of all tasks from the buffer map. + Each index would map to a different tqdm position. + id : int + Identifier of ??. + url : str + The localhost URL to sent progress updates to. + """ sub_progress_bar = TQDMPublisher( - iterable=iterable, + iterable=task_times, position=iteration_index + 1, desc=f"Progress on iteration {iteration_index} ({id})", leave=False, ) if url: - - def to_main_process(n: int, total: int, **kwargs): - - json_data = json.dumps(dict(id=str(id), data=dict(n=n, total=total))) - - requests.post(url, data=json_data, headers={"Content-Type": "application/json"}) - sub_progress_bar.subscribe(to_main_process) - for _ in sub_progress_bar: - time.sleep(0.5) + for sleep_time in sub_progress_bar: + time.sleep(sleep_time) if __name__ == "__main__": number_of_jobs = 3 - repeats = [4, 6, 8, 10, 4, 8, 20, 10, 5, 12, 5, 4, 5, 5, 5] + + # Each outer entry is a list of 'tasks' to perform on a particular worker + # For demonstration purposes, each in the list of tasks is the length of time in seconds + # that each iteration of the task takes to run and update the progress bar (emulated by sleeping) + all_task_times: List[List[float]] = [ + [4.2, 6.7, 8.5, 10.3, 4.2, 8.1], + [20.5, 10.7, 5.3], + [12.4, 5.2, 4.9, 5.1], + [5.7, 5.8], + ] flags_list = sys.argv[1:] @@ -67,12 +86,16 @@ def to_main_process(n: int, total: int, **kwargs): URL = f"http://{HOST}:{PORT}" if port_flag else None - # Start the parallel jobs with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: - + # Assign the parallel jobs job_map = executor.map( - _run_sleep_in_subprocess, - [(repeat, iteration_index, uuid.uuid4(), URL) for iteration_index, repeat in enumerate(repeats)], + _run_sleep_tasks_in_subprocess, + [ + (task_times, iteration_index, uuid.uuid4(), URL) + for iteration_index, task_times in enumerate(all_task_times) + ], ) - [_ for _ in job_map] # perform iteration to deploy jobs + # Perform iteration to deploy jobs + for _ in job_map: + pass From d3085dc3d97bc53cc2005726bba2be816605e46d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 3 Mar 2024 19:31:59 +0000 Subject: [PATCH 2/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 6f8f449..b4f6cca 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,4 @@ dist .coverage.* codecov.xml -.spyproject/ \ No newline at end of file +.spyproject/ From bf595cd3ce07db9120e3ee2dd6a2439544ff1ab8 Mon Sep 17 00:00:00 2001 From: codycbakerphd Date: Sun, 3 Mar 2024 14:47:20 -0500 Subject: [PATCH 3/3] move demo inside to fix entrypoint --- pyproject.toml | 5 ++-- src/tqdm_publisher/_demo/__init__.py | 0 .../_demo/_demo_command_line_interface.py | 29 ++++++++++++------- .../tqdm_publisher/_demo}/client.html | 0 {demo => src/tqdm_publisher/_demo}/client.py | 0 src/tqdm_publisher/_demo/parallel/__init__.py | 0 .../tqdm_publisher/_demo}/parallel/client.py | 0 .../tqdm_publisher/_demo}/parallel/server.py | 0 {demo => src/tqdm_publisher/_demo}/server.py | 0 9 files changed, 21 insertions(+), 13 deletions(-) create mode 100644 src/tqdm_publisher/_demo/__init__.py rename demo/demo_cli.py => src/tqdm_publisher/_demo/_demo_command_line_interface.py (78%) rename {demo => src/tqdm_publisher/_demo}/client.html (100%) rename {demo => src/tqdm_publisher/_demo}/client.py (100%) create mode 100644 src/tqdm_publisher/_demo/parallel/__init__.py rename {demo => src/tqdm_publisher/_demo}/parallel/client.py (100%) rename {demo => src/tqdm_publisher/_demo}/parallel/server.py (100%) rename {demo => src/tqdm_publisher/_demo}/server.py (100%) diff --git a/pyproject.toml b/pyproject.toml index 6083aee..073722c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ classifiers = [ "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "License :: OSI Approved :: BSD License", "Intended Audience :: Developers", "Operating System :: Microsoft :: Windows", @@ -51,8 +52,8 @@ demo = [ "Homepage" = "https://github.com/catalystneuro/tqdm_publisher" "Bug Tracker" = "https://github.com/catalystneuro/tqdm_publisher/issues" -[project.gui-scripts] -tqdm_publisher = "demo.demo_cli:main" +[project.scripts] +tqdm_publisher = "tqdm_publisher._demo._demo_command_line_interface:_command_line_interface" [tool.black] line-length = 120 diff --git a/src/tqdm_publisher/_demo/__init__.py b/src/tqdm_publisher/_demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/demo/demo_cli.py b/src/tqdm_publisher/_demo/_demo_command_line_interface.py similarity index 78% rename from demo/demo_cli.py rename to src/tqdm_publisher/_demo/_demo_command_line_interface.py index 2428be1..ad58140 100644 --- a/demo/demo_cli.py +++ b/src/tqdm_publisher/_demo/_demo_command_line_interface.py @@ -1,3 +1,6 @@ +"""Command line interface for running the TQDM Publisher demo.""" + +import os import signal import subprocess import sys @@ -15,21 +18,22 @@ subprocesses = [] -def close_subprocesses(): +def _close_subprocesses(): for process in subprocesses: process.terminate() # Send SIGTERM to subprocess sys.exit(0) -def signal_handler(signal, frame): +def _signal_handler(signal, frame): print("Interrupt signal received. Shutting down subprocesses...") - close_subprocesses() + _close_subprocesses() -signal.signal(signal.SIGINT, signal_handler) +signal.signal(signal.SIGINT, _signal_handler) -def main(): +def _command_line_interface(): + """Should be called only through the package entrypoint.""" if len(sys.argv) <= 1: print("No command provided. Please specify a command (e.g. 'demo').") return @@ -50,10 +54,13 @@ def main(): if command == "demo": if flags["client"]: - subprocesses.append(subprocess.Popen(["open", client_path])) + if sys.platform == "win32": + os.system(f'start "" "{client_path}"') + else: + subprocess.run(["open", client_path]) if flags["server"]: - subprocesses.append(subprocess.Popen(["python", server_path])) + subprocess.run(["python", server_path]) elif command == "parallel-demo": HOST = "localhost" @@ -75,7 +82,7 @@ def main(): client_args = ["python", parallel_client_path] if flags["both"]: client_args += ["--port", str(PORT), "--host", HOST] - subprocesses.append(subprocess.Popen(client_args)) + subprocess.run(client_args) if flags["both"]: time.sleep(1) # Ensure server starts before client connects @@ -91,8 +98,8 @@ def main(): if __name__ == "__main__": try: - main() - except KeyboardInterrupt as e: + _command_line_interface() + except KeyboardInterrupt: print("\n\nInterrupt signal received. Shutting down subprocesses...") finally: - close_subprocesses() + _close_subprocesses() diff --git a/demo/client.html b/src/tqdm_publisher/_demo/client.html similarity index 100% rename from demo/client.html rename to src/tqdm_publisher/_demo/client.html diff --git a/demo/client.py b/src/tqdm_publisher/_demo/client.py similarity index 100% rename from demo/client.py rename to src/tqdm_publisher/_demo/client.py diff --git a/src/tqdm_publisher/_demo/parallel/__init__.py b/src/tqdm_publisher/_demo/parallel/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/demo/parallel/client.py b/src/tqdm_publisher/_demo/parallel/client.py similarity index 100% rename from demo/parallel/client.py rename to src/tqdm_publisher/_demo/parallel/client.py diff --git a/demo/parallel/server.py b/src/tqdm_publisher/_demo/parallel/server.py similarity index 100% rename from demo/parallel/server.py rename to src/tqdm_publisher/_demo/parallel/server.py diff --git a/demo/server.py b/src/tqdm_publisher/_demo/server.py similarity index 100% rename from demo/server.py rename to src/tqdm_publisher/_demo/server.py