Skip to content

Commit

Permalink
Simplified parallel structure
Browse files Browse the repository at this point in the history
  • Loading branch information
garrettmflynn committed Apr 15, 2024
1 parent 34a51eb commit d167e49
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 117 deletions.
62 changes: 0 additions & 62 deletions src/tqdm_publisher/_demos/_parallel_bars/_client.py

This file was deleted.

75 changes: 20 additions & 55 deletions src/tqdm_publisher/_demos/_parallel_bars/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,16 @@
import asyncio
import json
import sys
import threading
import time
import uuid
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List, Union
from typing import List

import requests
from flask import Flask, Response, jsonify, request
from flask_cors import CORS, cross_origin

from tqdm_publisher import TQDMProgressHandler, TQDMProgressPublisher
from tqdm_publisher._demos._parallel_bars._client import (
create_http_server,
find_free_port,
)

N_JOBS = 3

Expand All @@ -32,25 +27,7 @@
]

## TQDMProgressHandler cannot be called from a process...so we just use a global reference exposed to each subprocess
progress_handler = TQDMProgressHandler()


def forward_updates_over_server_sent_events(request_id, progress_bar_id, format_dict):
progress_handler.announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=format_dict))


class ThreadedHTTPServer:
def __init__(self, port: int, callback):
self.port = port
self.callback = callback

def run(self):
create_http_server(port=self.port, callback=self.callback)

def start(self):
thread = threading.Thread(target=self.run)
thread.start()

progress_handler = TQDMProgressHandler()

def forward_to_http_server(url: str, request_id: str, progress_bar_id: int, format_dict: dict):
"""
Expand Down Expand Up @@ -214,50 +191,39 @@ def listen_to_events():
app = Flask(__name__)
cors = CORS(app)
app.config["CORS_HEADERS"] = "Content-Type"
PORT = find_free_port()
PORT = 3768 # find_free_port()


@app.route("/start", methods=["POST"])
@cross_origin()
def start():
data = json.loads(request.data) if request.data else {}
request_id = data["request_id"]
run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=f"http://localhost:{PORT}")
url = f"http://localhost:{PORT}/update"
app.logger.info(url)

run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=url)
return jsonify({"status": "success"})


@app.route("/update", methods=["POST"])
@cross_origin()
def update():
data = json.loads(request.data) if request.data else {}
request_id = data["request_id"]
progress_bar_id = data["id"]
format_dict = data["data"]

# Forward updates over Sever-Side Events
progress_handler.announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=format_dict))

@app.route("/events", methods=["GET"])
@cross_origin()
def events():
return Response(listen_to_events(), mimetype="text/event-stream")


class ThreadedFlaskServer:
def __init__(self, port: int):
self.port = port

def run(self):
app.run(host="localhost", port=self.port)

def start(self):
thread = threading.Thread(target=self.run)
thread.start()


async def start_server(port):

flask_server = ThreadedFlaskServer(port=3768)
flask_server.start()

def update_queue(request_id: str, progress_bar_id: str, format_dict: dict):
forward_updates_over_server_sent_events(
request_id=request_id, progress_bar_id=progress_bar_id, format_dict=format_dict
)

http_server = ThreadedHTTPServer(port=PORT, callback=update_queue)
http_server.start()

await asyncio.Future()
app.run(host="localhost", port=port)


def run_parallel_bar_demo() -> None:
Expand All @@ -266,8 +232,7 @@ def run_parallel_bar_demo() -> None:


def _run_parallel_bars_demo(port: str, host: str):
URL = f"http://{host}:{port}"

URL = f"http://{host}:{port}/update"
request_id = uuid.uuid4()
run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL)

Expand Down

0 comments on commit d167e49

Please sign in to comment.