Skip to content

Commit

Permalink
Simplified parallel structure (#57)
Browse files Browse the repository at this point in the history
* add docstrings and annotations

* debug

* debugging

* swap from try/except; enhance verbosity

* explain locality

* simplify main runner

* Update src/tqdm_publisher/_demos/_parallel_bars/_server.py

* GF updates

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update _handler.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Show as much metadata as possible from format_dict

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add docs and simplify logic

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update _server.py

* Update _server.py

* Update _server.py

* change to public method and enhance docstring

* simplify logic

* Fix id references

* Update _server.py

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update elements.js

* Fix client

* Remove WS demo for parallel demos

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Request initial state immediately

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update elements.js

* Update elements.js

* Simplified parallel structure

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update _server.py

---------

Co-authored-by: CodyCBakerPhD <[email protected]>
Co-authored-by: Cody Baker <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Cody Baker <[email protected]>
  • Loading branch information
5 people authored Apr 17, 2024
1 parent 4d2763b commit 2e39378
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 115 deletions.
62 changes: 0 additions & 62 deletions src/tqdm_publisher/_demos/_parallel_bars/_client.py

This file was deleted.

74 changes: 21 additions & 53 deletions src/tqdm_publisher/_demos/_parallel_bars/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,16 @@
import asyncio
import json
import sys
import threading
import time
import uuid
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List, Union
from typing import List

import requests
from flask import Flask, Response, jsonify, request
from flask_cors import CORS, cross_origin

from tqdm_publisher import TQDMProgressHandler, TQDMProgressPublisher
from tqdm_publisher._demos._parallel_bars._client import (
create_http_server,
find_free_port,
)

N_JOBS = 3

Expand All @@ -35,23 +30,6 @@
progress_handler = TQDMProgressHandler()


def forward_updates_over_server_sent_events(request_id, progress_bar_id, format_dict):
progress_handler.announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=format_dict))


class ThreadedHTTPServer:
def __init__(self, port: int, callback):
self.port = port
self.callback = callback

def run(self):
create_http_server(port=self.port, callback=self.callback)

def start(self):
thread = threading.Thread(target=self.run)
thread.start()


def forward_to_http_server(url: str, request_id: str, progress_bar_id: int, format_dict: dict):
"""
This is the parallel callback definition.
Expand Down Expand Up @@ -214,60 +192,50 @@ def listen_to_events():
app = Flask(__name__)
cors = CORS(app)
app.config["CORS_HEADERS"] = "Content-Type"
PORT = find_free_port()
PORT = 3768


@app.route("/start", methods=["POST"])
@cross_origin()
def start():
data = json.loads(request.data) if request.data else {}
request_id = data["request_id"]
run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=f"http://localhost:{PORT}")
url = f"http://localhost:{PORT}/update"
app.logger.info(url)

run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=url)
return jsonify({"status": "success"})


@app.route("/events", methods=["GET"])
@app.route("/update", methods=["POST"])
@cross_origin()
def events():
return Response(listen_to_events(), mimetype="text/event-stream")

def update():
data = json.loads(request.data) if request.data else {}
request_id = data["request_id"]
progress_bar_id = data["id"]
format_dict = data["data"]

class ThreadedFlaskServer:
def __init__(self, port: int):
self.port = port
# Forward updates over Sever-Side Events
progress_handler.announce(dict(request_id=request_id, progress_bar_id=progress_bar_id, format_dict=format_dict))

def run(self):
app.run(host="localhost", port=self.port)

def start(self):
thread = threading.Thread(target=self.run)
thread.start()
@app.route("/events", methods=["GET"])
@cross_origin()
def events():
return Response(listen_to_events(), mimetype="text/event-stream")


async def start_server(port):

flask_server = ThreadedFlaskServer(port=3768)
flask_server.start()

def update_queue(request_id: str, progress_bar_id: str, format_dict: dict):
forward_updates_over_server_sent_events(
request_id=request_id, progress_bar_id=progress_bar_id, format_dict=format_dict
)

http_server = ThreadedHTTPServer(port=PORT, callback=update_queue)
http_server.start()

await asyncio.Future()
app.run(host="localhost", port=port)


def run_parallel_bar_demo() -> None:
"""Asynchronously start the servers."""
"""Asynchronously start the server."""
asyncio.run(start_server(port=PORT))


def _run_parallel_bars_demo(port: str, host: str):
URL = f"http://{host}:{port}"

URL = f"http://{host}:{port}/update"
request_id = uuid.uuid4()
run_parallel_processes(all_task_times=TASK_TIMES, request_id=request_id, url=URL)

Expand Down

0 comments on commit 2e39378

Please sign in to comment.