Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Jean-Louis Leroy <[email protected]>
  • Loading branch information
jll63 committed Jan 31, 2024
1 parent 42696e2 commit a59c5dc
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 85 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
- name: Run Integration Tests
run: |
${{ github.workspace }}/src/integration-tests/run-tests \
--log-level ERROR \
--log-level info \
--log-file-level=info \
--bmq-tolerate-dirty-shutdown \
--bmq-log-dir=failure-logs \
Expand Down
84 changes: 0 additions & 84 deletions src/python/blazingmq/dev/configurator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,16 @@


import abc
import contextlib
import copy
import functools
import itertools
import logging
import subprocess
import threading
from dataclasses import dataclass, field
from decimal import Decimal
from pathlib import Path
from shutil import rmtree
from typing import IO, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union

from termcolor import colored
from xsdata.formats.dataclass.context import XmlContext
from xsdata.formats.dataclass.serializers import JsonSerializer
from xsdata.formats.dataclass.serializers.config import SerializerConfig
Expand All @@ -31,23 +27,7 @@
from blazingmq.schemas import mqbcfg, mqbconf

__all__ = ["Configurator"]

COLORS = {
"green": 32,
"yellow": 33,
"magenta": 35,
"cyan": 36,
"blue": 34,
"light_green": 92,
"light_yellow": 93,
"light_blue": 94,
"light_magenta": 95,
"light_cyan": 96,
}


logger = logging.getLogger(__name__)
broker_logger = logger.getChild("broker")


RUN_SCRIPT = """#! /usr/bin/env bash
Expand Down Expand Up @@ -725,21 +705,6 @@ def domains(self) -> List[Domain]:
]


class MonitoredProcess:
process: Optional[subprocess.Popen] = None
thread: Optional[threading.Thread] = None


def broker_monitor(out: IO[str], prefix: str, color: str):
while not out.closed:
line = out.readline()
if line == "":
break
line = line.rstrip(" \n\r")
if line:
broker_logger.info(colored("%s | %s", color), prefix, line)


def _json_filter(kv_pairs: Tuple) -> Dict:
return {k: (float(v) if "Ratio" in k else v) for k, v in kv_pairs if v is not None}

Expand Down Expand Up @@ -785,52 +750,3 @@ def create_json_file(self, path: Union[str, Path], content) -> None:
with open(path, "w", encoding="ascii") as out:
serializer.write(out, content)
path.chmod(0o644)


@dataclass
class Session(contextlib.AbstractContextManager):
configurator: Configurator
root: Path
brokers: Dict[Broker, MonitoredProcess] = field(default_factory=dict)

def __exit__(self, *args):
for broker in reversed(self.brokers.values()):
if broker.process is not None:
broker.process.__exit__(*args)

for broker in reversed(self.brokers.values()):
if broker.thread is not None:
broker.thread.join()

def stop(self):
for broker in self.brokers.values():
if broker.process is not None:
broker.process.terminate()
broker.process.wait()

def run(self):
colors = itertools.cycle(COLORS)
prefix_len = max(len(name) for name in self.configurator.brokers)

for broker in self.configurator.brokers.values():
monitored = MonitoredProcess()
self.brokers[broker] = monitored

monitored.process = subprocess.Popen(
[self.root.joinpath(broker.name, "run")],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding="ASCII",
bufsize=0,
)

assert monitored.process.stdout is not None
monitored.thread = threading.Thread(
target=broker_monitor,
args=(
monitored.process.stdout,
broker.name.ljust(prefix_len),
next(colors),
),
)
monitored.thread.start()
90 changes: 90 additions & 0 deletions src/python/blazingmq/dev/configurator/session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import contextlib
import itertools
import logging
import subprocess
import threading
from dataclasses import dataclass, field
from pathlib import Path
from typing import IO, Dict, Optional

from blazingmq.dev.configurator import Broker, Configurator
from termcolor import colored

logger = logging.getLogger(__name__)

COLORS = {
"green": 32,
"yellow": 33,
"magenta": 35,
"cyan": 36,
"blue": 34,
"light_green": 92,
"light_yellow": 93,
"light_blue": 94,
"light_magenta": 95,
"light_cyan": 96,
}


class MonitoredProcess:
process: Optional[subprocess.Popen] = None
thread: Optional[threading.Thread] = None


def broker_monitor(out: IO[str], prefix: str, color: str):
while not out.closed:
line = out.readline()
if line == "":
break
line = line.rstrip(" \n\r")
if line:
logger.info(colored("%s | %s", color), prefix, line)


@dataclass
class Session(contextlib.AbstractContextManager):
configurator: Configurator
root: Path
brokers: Dict[Broker, MonitoredProcess] = field(default_factory=dict)

def __exit__(self, *args):
for broker in reversed(self.brokers.values()):
if broker.process is not None:
broker.process.__exit__(*args)

for broker in reversed(self.brokers.values()):
if broker.thread is not None:
broker.thread.join()

def stop(self):
for broker in self.brokers.values():
if broker.process is not None:
broker.process.terminate()
broker.process.wait()

def run(self):
colors = itertools.cycle(COLORS)
prefix_len = max(len(name) for name in self.configurator.brokers)

for broker in self.configurator.brokers.values():
monitored = MonitoredProcess()
self.brokers[broker] = monitored

monitored.process = subprocess.Popen(
[self.root.joinpath(broker.name, "run")],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding="ASCII",
bufsize=0,
)

assert monitored.process.stdout is not None
monitored.thread = threading.Thread(
target=broker_monitor,
args=(
monitored.process.stdout,
broker.name.ljust(prefix_len),
next(colors),
),
)
monitored.thread.start()
1 change: 1 addition & 0 deletions src/python/blazingmq/dev/it/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def cluster_fixture(request, configure) -> Generator:
log_file_path = re.sub(r"^[^:]+::", "", request.node.nodeid)
log_file_path = re.sub(r"/", "-", log_file_path)
log_file_path = (log_dir / (log_file_path + ".log")).resolve()
logger.debug("log file = %s", log_file_path)
log_file_handler = logging.FileHandler(
log_file_path, mode="w", encoding="UTF-8"
)
Expand Down
5 changes: 5 additions & 0 deletions src/python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,8 @@ pytest-rerunfailures
pytest-xdist
termcolor
xsdata


pyyaml
docker
termcolor

0 comments on commit a59c5dc

Please sign in to comment.