From d135a48ec73696108254d8e157f17819a66d6420 Mon Sep 17 00:00:00 2001 From: Xiao Gui Date: Thu, 29 Feb 2024 17:16:53 +0100 Subject: [PATCH] feat: rework metrics feat: trigger restart on data validation --- .docker/docker-compose.yaml | 22 ++ .docker/restart.sh | 10 + .github/workflows/docker-img.yml | 37 +--- api/common/timer.py | 40 ++++ api/server/metrics.py | 334 ++++++++++++++++--------------- 5 files changed, 250 insertions(+), 193 deletions(-) create mode 100644 .docker/docker-compose.yaml create mode 100644 .docker/restart.sh diff --git a/.docker/docker-compose.yaml b/.docker/docker-compose.yaml new file mode 100644 index 00000000..373df9e2 --- /dev/null +++ b/.docker/docker-compose.yaml @@ -0,0 +1,22 @@ +version: '3.0' +services: + api: + container_name: siibra-api + image: docker-registry.ebrains.eu/siibra/siibra-api:latest + ports: + - "5000:5000" + environment: + SIIBRA_CACHEDIR: /siibra-api-volume + SIIBRA_USE_CONFIGURATION: /siibra-configuration + volumes: + - ./siibra-api-volume:/siibra-api-volume + - ./siibra-configuration:/siibra-configuration + explorer: + container_name: siibra-explorer + image: docker-registry.ebrains.eu/siibra/siibra-explorer:staging + environment: + OVERWRITE_API_ENDPOINT: https://zam10189.zam.kfa-juelich.de/api/v3_0 + EXPERIMENTAL_FLAG: '1' + HOST_PATHNAME: '/explorer' + ports: + - "8080:8080" diff --git a/.docker/restart.sh b/.docker/restart.sh new file mode 100644 index 00000000..824f3e26 --- /dev/null +++ b/.docker/restart.sh @@ -0,0 +1,10 @@ +#! /bin/bash + +git -C ./siibra-configuration fetch && git -C ./siibra-configuration merge --ff-only origin/master + +docker pull docker-registry.ebrains.eu/siibra/siibra-api:latest +docker pull docker-registry.ebrains.eu/siibra/siibra-explorer:staging + +docker-compose down +sleep 5 +docker-compose up -d \ No newline at end of file diff --git a/.github/workflows/docker-img.yml b/.github/workflows/docker-img.yml index 3f6a90db..16df6b50 100644 --- a/.github/workflows/docker-img.yml +++ b/.github/workflows/docker-img.yml @@ -156,35 +156,14 @@ jobs: secrets: okd_token: ${{ secrets.OKD_PROD_SECRET }} -# disable deployment on jsc - - # data-validation-config-hash: - # if: ${{ github.event_name == 'release' && contains(github.ref, 'rc') }} - # runs-on: ubuntu-latest - # outputs: - # CONFIG_SHORT_REV: ${{ steps.parse-rev.outputs.CONFIG_SHORT_REV }} - # steps: - # - id: parse-rev - # name: Get short rev of HEAD at master - # run: | - # git clone https://jugit.fz-juelich.de/t.dickscheid/brainscapes-configurations.git - # CONFIG_SHORT_REV=$(git -C brainscapes-configurations rev-parse --short=6 HEAD) - # echo CONFIG_SHORT_REV=$CONFIG_SHORT_REV >> $GITHUB_OUTPUT - - # deploy-rc-on-data-validation: - # needs: - # - setup-envvar - # - data-validation-config-hash - # if: ${{ github.event_name == 'release' && contains(github.ref, 'rc') }} - # uses: ./.github/workflows/deploy-on-okd.yml - # with: - # okd_endpoint: https://okd.jsc.hbp.eu:443 - # flavor: rc - # queues: ${{ needs.setup-envvar.outputs.queues }} - # version: c.${{ needs.data-validation-config-hash.outputs.CONFIG_SHORT_REV }} - # workerimage: docker-registry.ebrains.eu/siibra/siibra-api:rc-worker - # secrets: - # okd_token: ${{ secrets.OKD_JSC_SECRET }} + deploy-rc-on-data-validation: + needs: + - build-docker-img + if: ${{ github.event_name == 'push' }} + runs-on: siibra-data-validation + steps: + - run: | + /bin/bash -c "cd /softwares/software && ./restart.sh" deploy-prod-on-okd: needs: setup-envvar diff --git a/api/common/timer.py b/api/common/timer.py index aa515130..9eb039ce 100644 --- a/api/common/timer.py +++ b/api/common/timer.py @@ -1,4 +1,5 @@ from threading import Timer +from typing import List, Callable class RepeatTimer(Timer): """RepeatTimer @@ -8,3 +9,42 @@ class RepeatTimer(Timer): def run(self): while not self.finished.wait(self.interval): self.function(*self.args, **self.kwargs) + +class Cron: + def __init__(self) -> None: + self._minutely_fns: List[Callable] = [] + self._ten_minutely_fns: List[Callable] = [] + + self._timers: List[RepeatTimer] = [ + RepeatTimer(60, self._run_minutely), + RepeatTimer(600, self._run_ten_minutely) + ] + + def _run_minutely(self): + for fn in self._minutely_fns: + fn() + + def _run_ten_minutely(self): + for fn in self._ten_minutely_fns: + fn() + + def minutely(self, fn: Callable): + self._minutely_fns.append(fn) + return fn + + def ten_minutely(self, fn: Callable): + self._ten_minutely_fns.append(fn) + return fn + + def run_all(self): + self._run_ten_minutely() + self._run_minutely() + + def start(self): + for timer in self._timers: + timer.start() + + def stop(self): + """On terminate""" + for timer in self._timers: + timer.cancel() diff --git a/api/server/metrics.py b/api/server/metrics.py index f94db4d9..74301de2 100644 --- a/api/server/metrics.py +++ b/api/server/metrics.py @@ -1,202 +1,208 @@ from fastapi import HTTPException from fastapi.responses import PlainTextResponse -from typing import List, Dict, Tuple +from typing import Dict, Tuple, Callable from subprocess import run import os from pathlib import Path from collections import defaultdict -from api.siibra_api_config import ROLE, CELERY_CONFIG, NAME_SPACE, MONITOR_FIRSTLVL_DIR, queues -from api.common.timer import RepeatTimer +from functools import wraps +from api.siibra_api_config import ROLE, CELERY_CONFIG, NAME_SPACE, MONITOR_FIRSTLVL_DIR +from api.common.timer import Cron from api.common import general_logger +def is_server(fn: Callable): + @wraps(fn) + def outer(): + if ROLE != 'server': + return + return fn() + return outer + +def has_metric_dir(fn: Callable): + @wraps(fn) + def outer(): + if not MONITOR_FIRSTLVL_DIR: + return + return fn() + return outer + +cron = Cron() + class Singleton: """Timer singleton""" cached_metrics=None cached_du: Dict[str, str] = {} - timers: List[RepeatTimer] = [] - res_mtime: float = None cached_res_usage: Dict[str, Tuple[float, float]] = {} @staticmethod - def populate_celery(): - if ROLE == 'server': - Singleton.cached_metrics = refresh_prom_metrics() - + @cron.minutely + @has_metric_dir + @is_server + def parse_metrics_txt(): + def parse_cpu(text: str) -> float: + if text.endswith("m"): + return float(text.replace("m", "")) + raise ValueError(f"Cannot parse cpu text {text}") + + def parse_memory(text: str) -> float: + if text.endswith("Mi"): + return float(text.replace("Mi", "")) * 1024 * 1024 + raise ValueError(f"Cannot parse memory text {text}") + + def parse_text(text: str): + titles = ["NAME", "CPU", "MEMORY"] + + Singleton.cached_res_usage.clear() - @staticmethod - def parse_cpu(text: str) -> float: - if text.endswith("m"): - return float(text.replace("m", "")) - raise ValueError(f"Cannot parse cpu text {text}") + for line in text.splitlines(): + if all(t in line for t in titles): + continue + podname, cpuusage, memoryusage = line.split() + try: + Singleton.cached_res_usage[podname] = ( + str(parse_cpu(cpuusage)), + str(parse_memory(memoryusage)), + ) + except Exception as e: + general_logger.error(f"Cannot parse line: {str(e)}") - @staticmethod - def parse_memory(text: str) -> float: - if text.endswith("Mi"): - return float(text.replace("Mi", "")) * 1024 * 1024 - raise ValueError(f"Cannot parse memory text {text}") + try: + metrics_path = Path(MONITOR_FIRSTLVL_DIR) / "metrics.txt" + metric_text = metrics_path.read_text() + Singleton.res_mtime = metrics_path.lstat().st_mtime + parse_text(metric_text) + + except FileNotFoundError as e: + ... + except Exception as e: + general_logger.error(f"Reading metrics.txt error: {str(e)}") @staticmethod - def parse_text(text: str): - titles = ["NAME", "CPU", "MEMORY"] + @cron.ten_minutely + @has_metric_dir + @is_server + def first_lvl_du(): + + try: + dirs = os.listdir(MONITOR_FIRSTLVL_DIR) + except Exception as e: + general_logger.warn(f"Failed to listdir of {MONITOR_FIRSTLVL_DIR}: {str(e)}") + return - Singleton.cached_res_usage.clear() - - for line in text.splitlines(): - if all(t in line for t in titles): + for dir in dirs: + if dir == "lost+found": continue - podname, cpuusage, memoryusage = line.split() - try: - Singleton.cached_res_usage[podname] = ( - str(Singleton.parse_cpu(cpuusage)), - str(Singleton.parse_memory(memoryusage)), - ) - except Exception as e: - general_logger.error(f"Cannot parse line: {str(e)}") - - @staticmethod - def timed_get_metrics(): - if ROLE == 'server' and MONITOR_FIRSTLVL_DIR: - Singleton.res_mtime = None + path_to_dir = Path(MONITOR_FIRSTLVL_DIR) / dir try: - metrics_path = Path(MONITOR_FIRSTLVL_DIR) / "metrics.txt" - metric_text = metrics_path.read_text() - Singleton.res_mtime = metrics_path.lstat().st_mtime - Singleton.parse_text(metric_text) - - except FileNotFoundError as e: - ... + result = run(["du", "-s", str(path_to_dir)], capture_output=True, text=True) + size_b, *_ = result.stdout.split("\t") + Singleton.cached_du[dir] = int(size_b) except Exception as e: - general_logger.error(f"Reading metrics.txt error: {str(e)}") - + general_logger.warn(f"Failed to check du of {str(path_to_dir)}: {str(e)}") - # n.b. cannot use shutil.disk_usage . It seems it - # queries mount used/free and not directory - try: - dirs = os.listdir(MONITOR_FIRSTLVL_DIR) - except Exception as e: - general_logger.warn(f"Failed to listdir of {MONITOR_FIRSTLVL_DIR}: {str(e)}") - return + @staticmethod + @cron.minutely + @is_server + def refresh_metric(): + """Refresh metrics.""" + from api.worker.app import app + from prometheus_client import Gauge, CollectorRegistry, generate_latest + + registry = CollectorRegistry() + common_kwargs = { + 'registry':registry, + 'namespace':NAME_SPACE, + } + + cpu_usage = Gauge("resource_usage_cpu", + "CPU usage by pods", + labelnames=("podname",), + **common_kwargs) + + memory_usage = Gauge("resource_usage_memory", + "RAM usage by pods", + labelnames=("podname",), + **common_kwargs) + + for podname, (cpu, ram) in Singleton.cached_res_usage.items(): + cpu_usage.labels(podname=podname).set(cpu) + memory_usage.labels(podname=podname).set(ram) + + res_timestamp = Gauge("resource_usage_timestamp", + "Timestamp", **common_kwargs) + if Singleton.res_mtime: + res_timestamp.set(Singleton.res_mtime) + + du = Gauge(f"firstlvl_folder_disk_usage", + "Bytes used by first level folders", + labelnames=("folder_name",), + **common_kwargs) + for folder_name, size_b in Singleton.cached_du.items(): + du.labels(folder_name=folder_name).set(size_b) + + num_task_in_q_gauge = Gauge(f"num_task_in_q", + "Number of tasks in queue (not yet picked up by workers)", + labelnames=("q_name",), + **common_kwargs) + num_worker_gauge = Gauge("num_workers", + "Number of workers", + labelnames=("version", "namespace", "queue"), **common_kwargs) + scheduled_gauge = Gauge("scheduled_tasks","Number of scheduled tasks", labelnames=("hostname",), **common_kwargs) + active_gauge = Gauge("active_tasks", "Number of active tasks", labelnames=("hostname",), **common_kwargs) + reserved_gauge = Gauge("reserved_tasks", "Number of reserved tasks", labelnames=("hostname",), **common_kwargs) + last_pinged = Gauge("last_pinged", "Last pinged time", labelnames=[], **common_kwargs) + + # assuming we are using redis as broker + import redis + + _r = redis.from_url(CELERY_CONFIG.broker_url) + + last_pinged.set_to_current_time() + + # number of tasks in queue + for q in CELERY_CONFIG.task_queues.keys(): + num_task_in_q_gauge.labels(q_name=q).set(_r.llen(q)) + + i = app.control.inspect() + + # number of active workers + result = app.control.inspect().active_queues() + + tally = defaultdict(int) + for hostname in (result or {}): + for queue in result[hostname]: + routing_key = queue.get("routing_key") + *_, namespace, queue = routing_key.split(".") + version = ".".join(_) + tally[(version, namespace, queue)] += 1 + + for ((version, namespace, queue), total) in tally.items(): + num_worker_gauge.labels(version=version, + namespace=namespace, + queue=queue).set(total) + + for workername, queue in (i.scheduled() or {}).items(): + scheduled_gauge.labels(hostname=workername).set(len(queue)) - for dir in dirs: - if dir == "lost+found": - continue - path_to_dir = Path(MONITOR_FIRSTLVL_DIR) / dir - try: - result = run(["du", "-s", str(path_to_dir)], capture_output=True, text=True) - size_b, *_ = result.stdout.split("\t") - Singleton.cached_du[dir] = int(size_b) - except Exception as e: - general_logger.warn(f"Failed to check du of {str(path_to_dir)}: {str(e)}") + for workername, queue in (i.active() or {}).items(): + active_gauge.labels(hostname=workername).set(len(queue)) + for workername, queue in (i.reserved() or {}).items(): + reserved_gauge.labels(hostname=workername).set(len(queue)) + + Singleton.cached_metrics = generate_latest(registry) + def on_startup(): """On startup""" - Singleton.populate_celery() - Singleton.timed_get_metrics() - - Singleton.timers = [ - RepeatTimer(60, Singleton.populate_celery), - RepeatTimer(600, Singleton.timed_get_metrics), - ] + cron.start() - for timer in Singleton.timers: - timer.start() - def on_terminate(): """On terminate""" - for timer in Singleton.timers: - timer.cancel() - -def refresh_prom_metrics(): - """Refresh metrics.""" - from api.worker.app import app - from prometheus_client import Gauge, CollectorRegistry, generate_latest - - registry = CollectorRegistry() - common_kwargs = { - 'registry':registry, - 'namespace':NAME_SPACE, - } - - cpu_usage = Gauge("resource_usage_cpu", - "CPU usage by pods", - labelnames=("podname",), - **common_kwargs) - - memory_usage = Gauge("resource_usage_memory", - "RAM usage by pods", - labelnames=("podname",), - **common_kwargs) - - for podname, (cpu, ram) in Singleton.cached_res_usage.items(): - cpu_usage.labels(podname=podname).set(cpu) - memory_usage.labels(podname=podname).set(ram) - - res_timestamp = Gauge("resource_usage_timestamp", - "Timestamp", **common_kwargs) - if Singleton.res_mtime: - res_timestamp.set(Singleton.res_mtime) - - du = Gauge(f"firstlvl_folder_disk_usage", - "Bytes used by first level folders", - labelnames=("folder_name",), - **common_kwargs) - for folder_name, size_b in Singleton.cached_du.items(): - du.labels(folder_name=folder_name).set(size_b) - - num_task_in_q_gauge = Gauge(f"num_task_in_q", - "Number of tasks in queue (not yet picked up by workers)", - labelnames=("q_name",), - **common_kwargs) - num_worker_gauge = Gauge("num_workers", - "Number of workers", - labelnames=("version", "namespace", "queue"), **common_kwargs) - scheduled_gauge = Gauge("scheduled_tasks","Number of scheduled tasks", labelnames=("hostname",), **common_kwargs) - active_gauge = Gauge("active_tasks", "Number of active tasks", labelnames=("hostname",), **common_kwargs) - reserved_gauge = Gauge("reserved_tasks", "Number of reserved tasks", labelnames=("hostname",), **common_kwargs) - last_pinged = Gauge("last_pinged", "Last pinged time", labelnames=[], **common_kwargs) - - # assuming we are using redis as broker - import redis - - _r = redis.from_url(CELERY_CONFIG.broker_url) - - last_pinged.set_to_current_time() - - # number of tasks in queue - for q in CELERY_CONFIG.task_queues.keys(): - num_task_in_q_gauge.labels(q_name=q).set(_r.llen(q)) - - i = app.control.inspect() - - # number of active workers - result = app.control.inspect().active_queues() - - tally = defaultdict(int) - for hostname in (result or {}): - for queue in result[hostname]: - routing_key = queue.get("routing_key") - *_, namespace, queue = routing_key.split(".") - version = ".".join(_) - tally[(version, namespace, queue)] += 1 - - for ((version, namespace, queue), total) in tally.items(): - num_worker_gauge.labels(version=version, - namespace=namespace, - queue=queue).set(total) - - for workername, queue in (i.scheduled() or {}).items(): - scheduled_gauge.labels(hostname=workername).set(len(queue)) - - for workername, queue in (i.active() or {}).items(): - active_gauge.labels(hostname=workername).set(len(queue)) - - for workername, queue in (i.reserved() or {}).items(): - reserved_gauge.labels(hostname=workername).set(len(queue)) + cron.stop() - return generate_latest(registry) def prom_metrics_resp(): """Return PlainTextResponse of metrics"""