From aa4f817856a6925f69fdbb376e197c96196060da Mon Sep 17 00:00:00 2001 From: "Jens L." Date: Mon, 23 Dec 2024 22:13:38 +0100 Subject: [PATCH] admin: monitor worker version (#12463) * root: include version in celery ping Signed-off-by: Jens Langhammer * check version in worker endpoint Signed-off-by: Jens Langhammer * include worker version in prom metrics Signed-off-by: Jens Langhammer * format Signed-off-by: Jens Langhammer * fix tests Signed-off-by: Jens Langhammer --------- Signed-off-by: Jens Langhammer --- authentik/admin/api/workers.py | 41 ++++++++++++++++--- authentik/admin/apps.py | 3 +- authentik/admin/signals.py | 27 ++++++++++-- authentik/admin/tests/test_api.py | 2 +- authentik/root/celery.py | 8 ++++ blueprints/schema.json | 2 +- schema.yml | 20 ++++++--- .../admin-overview/cards/WorkerStatusCard.ts | 23 +++++++---- 8 files changed, 100 insertions(+), 26 deletions(-) diff --git a/authentik/admin/api/workers.py b/authentik/admin/api/workers.py index 86578a50ebb5..b7c2f08b2cf0 100644 --- a/authentik/admin/api/workers.py +++ b/authentik/admin/api/workers.py @@ -1,12 +1,16 @@ """authentik administration overview""" +from socket import gethostname + from django.conf import settings from drf_spectacular.utils import extend_schema, inline_serializer -from rest_framework.fields import IntegerField +from packaging.version import parse +from rest_framework.fields import BooleanField, CharField from rest_framework.request import Request from rest_framework.response import Response from rest_framework.views import APIView +from authentik import get_full_version from authentik.rbac.permissions import HasPermission from authentik.root.celery import CELERY_APP @@ -16,11 +20,38 @@ class WorkerView(APIView): permission_classes = [HasPermission("authentik_rbac.view_system_info")] - @extend_schema(responses=inline_serializer("Workers", fields={"count": IntegerField()})) + @extend_schema( + responses=inline_serializer( + "Worker", + fields={ + "worker_id": CharField(), + "version": CharField(), + "version_matching": BooleanField(), + }, + many=True, + ) + ) def get(self, request: Request) -> Response: """Get currently connected worker count.""" - count = len(CELERY_APP.control.ping(timeout=0.5)) + raw: list[dict[str, dict]] = CELERY_APP.control.ping(timeout=0.5) + our_version = parse(get_full_version()) + response = [] + for worker in raw: + key = list(worker.keys())[0] + version = worker[key].get("version") + version_matching = False + if version: + version_matching = parse(version) == our_version + response.append( + {"worker_id": key, "version": version, "version_matching": version_matching} + ) # In debug we run with `task_always_eager`, so tasks are ran on the main process if settings.DEBUG: # pragma: no cover - count += 1 - return Response({"count": count}) + response.append( + { + "worker_id": f"authentik-debug@{gethostname()}", + "version": get_full_version(), + "version_matching": True, + } + ) + return Response(response) diff --git a/authentik/admin/apps.py b/authentik/admin/apps.py index d151a6f953ad..def7d51ede30 100644 --- a/authentik/admin/apps.py +++ b/authentik/admin/apps.py @@ -1,11 +1,10 @@ """authentik admin app config""" -from prometheus_client import Gauge, Info +from prometheus_client import Info from authentik.blueprints.apps import ManagedAppConfig PROM_INFO = Info("authentik_version", "Currently running authentik version") -GAUGE_WORKERS = Gauge("authentik_admin_workers", "Currently connected workers") class AuthentikAdminConfig(ManagedAppConfig): diff --git a/authentik/admin/signals.py b/authentik/admin/signals.py index 6bd906972c61..d6856b0fa9d4 100644 --- a/authentik/admin/signals.py +++ b/authentik/admin/signals.py @@ -1,14 +1,35 @@ """admin signals""" from django.dispatch import receiver +from packaging.version import parse +from prometheus_client import Gauge -from authentik.admin.apps import GAUGE_WORKERS +from authentik import get_full_version from authentik.root.celery import CELERY_APP from authentik.root.monitoring import monitoring_set +GAUGE_WORKERS = Gauge( + "authentik_admin_workers", + "Currently connected workers, their versions and if they are the same version as authentik", + ["version", "version_matched"], +) + + +_version = parse(get_full_version()) + @receiver(monitoring_set) def monitoring_set_workers(sender, **kwargs): """Set worker gauge""" - count = len(CELERY_APP.control.ping(timeout=0.5)) - GAUGE_WORKERS.set(count) + raw: list[dict[str, dict]] = CELERY_APP.control.ping(timeout=0.5) + worker_version_count = {} + for worker in raw: + key = list(worker.keys())[0] + version = worker[key].get("version") + version_matching = False + if version: + version_matching = parse(version) == _version + worker_version_count.setdefault(version, {"count": 0, "matching": version_matching}) + worker_version_count[version]["count"] += 1 + for version, stats in worker_version_count.items(): + GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"]) diff --git a/authentik/admin/tests/test_api.py b/authentik/admin/tests/test_api.py index 8d7d5eb44a5e..ed6656470ab7 100644 --- a/authentik/admin/tests/test_api.py +++ b/authentik/admin/tests/test_api.py @@ -34,7 +34,7 @@ def test_workers(self): response = self.client.get(reverse("authentik_api:admin_workers")) self.assertEqual(response.status_code, 200) body = loads(response.content) - self.assertEqual(body["count"], 0) + self.assertEqual(len(body), 0) def test_metrics(self): """Test metrics API""" diff --git a/authentik/root/celery.py b/authentik/root/celery.py index 4d669ec02d2d..0ec011e9b036 100644 --- a/authentik/root/celery.py +++ b/authentik/root/celery.py @@ -18,6 +18,7 @@ task_prerun, worker_ready, ) +from celery.worker.control import inspect_command from django.conf import settings from django.db import ProgrammingError from django_tenants.utils import get_public_schema_name @@ -25,6 +26,7 @@ from structlog.stdlib import get_logger from tenant_schemas_celery.app import CeleryApp as TenantAwareCeleryApp +from authentik import get_full_version from authentik.lib.sentry import before_send from authentik.lib.utils.errors import exception_to_string @@ -159,6 +161,12 @@ def update_heartbeat_file(self, worker: Worker): HEARTBEAT_FILE.touch() +@inspect_command(default_timeout=0.2) +def ping(state, **kwargs): + """Ping worker(s).""" + return {"ok": "pong", "version": get_full_version()} + + CELERY_APP.config_from_object(settings.CELERY) # Load task modules from all registered Django app configs. diff --git a/blueprints/schema.json b/blueprints/schema.json index 95fae197bcc1..360da1e18211 100644 --- a/blueprints/schema.json +++ b/blueprints/schema.json @@ -4159,7 +4159,7 @@ "re_evaluate_policies": { "type": "boolean", "title": "Re evaluate policies", - "description": "Evaluate policies when the Stage is present to the user." + "description": "Evaluate policies when the Stage is presented to the user." }, "order": { "type": "integer", diff --git a/schema.yml b/schema.yml index b33a64345d80..bcbe961d1293 100644 --- a/schema.yml +++ b/schema.yml @@ -349,7 +349,7 @@ paths: description: '' /admin/workers/: get: - operationId: admin_workers_retrieve + operationId: admin_workers_list description: Get currently connected worker count. tags: - admin @@ -360,7 +360,9 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/Workers' + type: array + items: + $ref: '#/components/schemas/Worker' description: '' '400': content: @@ -56987,13 +56989,19 @@ components: required: - aaguid - description - Workers: + Worker: type: object properties: - count: - type: integer + worker_id: + type: string + version: + type: string + version_matching: + type: boolean required: - - count + - version + - version_matching + - worker_id modelRequest: oneOf: - $ref: '#/components/schemas/GoogleWorkspaceProviderRequest' diff --git a/web/src/admin/admin-overview/cards/WorkerStatusCard.ts b/web/src/admin/admin-overview/cards/WorkerStatusCard.ts index 07cf161931d6..f4f640382f86 100644 --- a/web/src/admin/admin-overview/cards/WorkerStatusCard.ts +++ b/web/src/admin/admin-overview/cards/WorkerStatusCard.ts @@ -8,34 +8,41 @@ import { msg } from "@lit/localize"; import { TemplateResult, html } from "lit"; import { customElement } from "lit/decorators.js"; -import { AdminApi } from "@goauthentik/api"; +import { AdminApi, Worker } from "@goauthentik/api"; @customElement("ak-admin-status-card-workers") -export class WorkersStatusCard extends AdminStatusCard { +export class WorkersStatusCard extends AdminStatusCard { icon = "pf-icon pf-icon-server"; - getPrimaryValue(): Promise { - return new AdminApi(DEFAULT_CONFIG).adminWorkersRetrieve().then((workers) => { - return workers.count; - }); + getPrimaryValue(): Promise { + return new AdminApi(DEFAULT_CONFIG).adminWorkersList(); } renderHeader(): TemplateResult { return html`${msg("Workers")}`; } - getStatus(value: number): Promise { - if (value < 1) { + getStatus(value: Worker[]): Promise { + if (value.length < 1) { return Promise.resolve({ icon: "fa fa-times-circle pf-m-danger", message: html`${msg("No workers connected. Background tasks will not run.")}`, }); + } else if (value.filter((w) => !w.versionMatching).length > 0) { + return Promise.resolve({ + icon: "fa fa-times-circle pf-m-danger", + message: html`${msg("Worker with incorrect version connected.")}`, + }); } else { return Promise.resolve({ icon: "fa fa-check-circle pf-m-success", }); } } + + renderValue() { + return html`${this.value?.length}`; + } } declare global {