Commit

demo refactor
CodyCBakerPhD committed Mar 3, 2024
1 parent fcf17fc commit 1e55a6b
Showing 2 changed files with 54 additions and 29 deletions.
.gitignore: 2 changes (2 additions, 0 deletions)
@@ -5,3 +5,5 @@ dist
.coverage
.coverage.*
codecov.xml

.spyproject/
demo/parallel/server.py: 81 changes (52 additions, 29 deletions)
@@ -1,54 +1,73 @@
"""Demo of parallel tqdm."""

# HTTP server addition
import http.server
import json
import socket
import socketserver
import sys
import threading
import time
import uuid
from concurrent.futures import ProcessPoolExecutor
from typing import List, Tuple
from typing import List

import requests

from tqdm_publisher import TQDMPublisher


def _run_sleep_in_subprocess(args: Tuple[int, int]):
    """The operation to run on each subprocess."""
    repeat = args[0]
    iteration_index = args[1]
    id = args[2]
    url = args[3]
    iterable = range(repeat)
def to_main_process(id: int, url: str, n: int, total: int, **kwargs):
    """
    This is the parallel callback definition.

    Its parameters are attributes of a tqdm instance and their values are what a typical default tqdm printout
    to console would contain (update step `n` out of `total` iterations).
    """
    json_data = json.dumps(obj=dict(id=str(id), data=dict(n=n, total=total)))

    requests.post(url=url, data=json_data, headers={"Content-Type": "application/json"})


def _run_sleep_tasks_in_subprocess(task_times: List[float], iteration_index: int, id: int, url: str):
    """
    Run a 'task' that takes a certain amount of time to run on each worker.

    In this case that task is simply time.sleep.

    Parameters
    ----------
    task_times : list of float
        The amount of time each task emulates having taken to complete.
    iteration_index : int
        The index of this task in the list of all tasks from the buffer map.
        Each index would map to a different tqdm position.
    id : int
        Unique identifier of this job, included in each progress update sent to the server.
    url : str
        The localhost URL to send progress updates to.
    """
    sub_progress_bar = TQDMPublisher(
        iterable=iterable,
        iterable=task_times,
        position=iteration_index + 1,
        desc=f"Progress on iteration {iteration_index} ({id})",
        leave=False,
    )

    if url:

        def to_main_process(n: int, total: int, **kwargs):

            json_data = json.dumps(dict(id=str(id), data=dict(n=n, total=total)))

            requests.post(url, data=json_data, headers={"Content-Type": "application/json"})

        sub_progress_bar.subscribe(to_main_process)

    for _ in sub_progress_bar:
        time.sleep(0.5)
    for sleep_time in sub_progress_bar:
        time.sleep(sleep_time)


if __name__ == "__main__":
    number_of_jobs = 3
    repeats = [4, 6, 8, 10, 4, 8, 20, 10, 5, 12, 5, 4, 5, 5, 5]

    # Each outer entry is a list of 'tasks' to perform on a particular worker
    # For demonstration purposes, each entry in the list of tasks is the length of time in seconds
    # that each iteration of the task takes to run and update the progress bar (emulated by sleeping)
    all_task_times: List[List[float]] = [
        [4.2, 6.7, 8.5, 10.3, 4.2, 8.1],
        [20.5, 10.7, 5.3],
        [12.4, 5.2, 4.9, 5.1],
        [5.7, 5.8],
    ]

    flags_list = sys.argv[1:]

@@ -67,12 +86,16 @@ def to_main_process(n: int, total: int, **kwargs):

    URL = f"http://{HOST}:{PORT}" if port_flag else None

    # Start the parallel jobs
    with ProcessPoolExecutor(max_workers=number_of_jobs) as executor:

        # Assign the parallel jobs
        job_map = executor.map(
            _run_sleep_in_subprocess,
            [(repeat, iteration_index, uuid.uuid4(), URL) for iteration_index, repeat in enumerate(repeats)],
            _run_sleep_tasks_in_subprocess,
            [
                (task_times, iteration_index, uuid.uuid4(), URL)
                for iteration_index, task_times in enumerate(all_task_times)
            ],
        )

        [_ for _ in job_map]  # perform iteration to deploy jobs
        # Perform iteration to deploy jobs
        for _ in job_map:
            pass
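
A note on how the refactored pieces fit together: the commit promotes the progress callback to module level as to_main_process, with id and url as leading parameters so it can be bound to a particular job before being subscribed to a TQDMPublisher. The sketch below is illustrative only and not part of this commit; it assumes the demo module is importable as server (e.g. when run from demo/parallel/) and that TQDMPublisher.subscribe supplies the bar's n and total to the callback, as the previous nested callback expected. The functools.partial wiring and the example URL are assumptions for illustration.

# Illustrative sketch only; not code from this commit.
import time
import uuid
from functools import partial

from server import to_main_process  # assumes demo/parallel/ is the working directory

from tqdm_publisher import TQDMPublisher

job_id = uuid.uuid4()
url = "http://localhost:8000"  # hypothetical endpoint for this example

progress_bar = TQDMPublisher(iterable=[1.0, 2.0, 0.5], desc=f"Example job ({job_id})", leave=False)

# Bind the job-specific values so the callback only receives the tqdm attributes at call time.
progress_bar.subscribe(partial(to_main_process, job_id, url))

for sleep_time in progress_bar:
    time.sleep(sleep_time)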

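The callback posts JSON of the form {"id": "...", "data": {"n": ..., "total": ...}} to a localhost URL built from HOST, PORT, and a port flag parsed from sys.argv. The file's name and its http.server, socketserver, and threading imports suggest the receiving endpoint lives in the portion of server.py not shown in this diff. As a rough idea of what such an endpoint can look like, here is a minimal, hypothetical handler built on the standard-library modules the demo already imports; the handler class, port, and print format are illustrative assumptions, not the demo's implementation.

# Illustrative sketch only; not the handler from this commit.
import http.server
import json
import socketserver

PORT = 8000  # hypothetical; the demo derives HOST and PORT from its command-line flags


class ProgressUpdateHandler(http.server.BaseHTTPRequestHandler):
    def do_POST(self):
        # Read the JSON body and report the progress update.
        length = int(self.headers.get("Content-Length", 0))
        update = json.loads(self.rfile.read(length))
        print(f"{update['id']}: step {update['data']['n']} of {update['data']['total']}")
        self.send_response(200)
        self.end_headers()


if __name__ == "__main__":
    with socketserver.TCPServer(("", PORT), ProgressUpdateHandler) as server:
        server.serve_forever()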