Skip to content

Commit

Permalink
admin: monitor worker version (goauthentik#12463)
Browse files Browse the repository at this point in the history
* root: include version in celery ping

Signed-off-by: Jens Langhammer <[email protected]>

* check version in worker endpoint

Signed-off-by: Jens Langhammer <[email protected]>

* include worker version in prom metrics

Signed-off-by: Jens Langhammer <[email protected]>

* format

Signed-off-by: Jens Langhammer <[email protected]>

* fix tests

Signed-off-by: Jens Langhammer <[email protected]>

---------

Signed-off-by: Jens Langhammer <[email protected]>
  • Loading branch information
BeryJu authored Dec 23, 2024
1 parent c3aefd5 commit aa4f817
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 26 deletions.
41 changes: 36 additions & 5 deletions authentik/admin/api/workers.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
"""authentik administration overview"""

from socket import gethostname

from django.conf import settings
from drf_spectacular.utils import extend_schema, inline_serializer
from rest_framework.fields import IntegerField
from packaging.version import parse
from rest_framework.fields import BooleanField, CharField
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView

from authentik import get_full_version
from authentik.rbac.permissions import HasPermission
from authentik.root.celery import CELERY_APP

Expand All @@ -16,11 +20,38 @@ class WorkerView(APIView):

permission_classes = [HasPermission("authentik_rbac.view_system_info")]

@extend_schema(responses=inline_serializer("Workers", fields={"count": IntegerField()}))
@extend_schema(
responses=inline_serializer(
"Worker",
fields={
"worker_id": CharField(),
"version": CharField(),
"version_matching": BooleanField(),
},
many=True,
)
)
def get(self, request: Request) -> Response:
"""Get currently connected worker count."""
count = len(CELERY_APP.control.ping(timeout=0.5))
raw: list[dict[str, dict]] = CELERY_APP.control.ping(timeout=0.5)
our_version = parse(get_full_version())
response = []
for worker in raw:
key = list(worker.keys())[0]
version = worker[key].get("version")
version_matching = False
if version:
version_matching = parse(version) == our_version
response.append(
{"worker_id": key, "version": version, "version_matching": version_matching}
)
# In debug we run with `task_always_eager`, so tasks are ran on the main process
if settings.DEBUG: # pragma: no cover
count += 1
return Response({"count": count})
response.append(
{
"worker_id": f"authentik-debug@{gethostname()}",
"version": get_full_version(),
"version_matching": True,
}
)
return Response(response)
3 changes: 1 addition & 2 deletions authentik/admin/apps.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
"""authentik admin app config"""

from prometheus_client import Gauge, Info
from prometheus_client import Info

from authentik.blueprints.apps import ManagedAppConfig

PROM_INFO = Info("authentik_version", "Currently running authentik version")
GAUGE_WORKERS = Gauge("authentik_admin_workers", "Currently connected workers")


class AuthentikAdminConfig(ManagedAppConfig):
Expand Down
27 changes: 24 additions & 3 deletions authentik/admin/signals.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
"""admin signals"""

from django.dispatch import receiver
from packaging.version import parse
from prometheus_client import Gauge

from authentik.admin.apps import GAUGE_WORKERS
from authentik import get_full_version
from authentik.root.celery import CELERY_APP
from authentik.root.monitoring import monitoring_set

GAUGE_WORKERS = Gauge(
"authentik_admin_workers",
"Currently connected workers, their versions and if they are the same version as authentik",
["version", "version_matched"],
)


_version = parse(get_full_version())


@receiver(monitoring_set)
def monitoring_set_workers(sender, **kwargs):
"""Set worker gauge"""
count = len(CELERY_APP.control.ping(timeout=0.5))
GAUGE_WORKERS.set(count)
raw: list[dict[str, dict]] = CELERY_APP.control.ping(timeout=0.5)
worker_version_count = {}
for worker in raw:
key = list(worker.keys())[0]
version = worker[key].get("version")
version_matching = False
if version:
version_matching = parse(version) == _version
worker_version_count.setdefault(version, {"count": 0, "matching": version_matching})
worker_version_count[version]["count"] += 1
for version, stats in worker_version_count.items():
GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"])
2 changes: 1 addition & 1 deletion authentik/admin/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_workers(self):
response = self.client.get(reverse("authentik_api:admin_workers"))
self.assertEqual(response.status_code, 200)
body = loads(response.content)
self.assertEqual(body["count"], 0)
self.assertEqual(len(body), 0)

def test_metrics(self):
"""Test metrics API"""
Expand Down
8 changes: 8 additions & 0 deletions authentik/root/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
task_prerun,
worker_ready,
)
from celery.worker.control import inspect_command
from django.conf import settings
from django.db import ProgrammingError
from django_tenants.utils import get_public_schema_name
from structlog.contextvars import STRUCTLOG_KEY_PREFIX
from structlog.stdlib import get_logger
from tenant_schemas_celery.app import CeleryApp as TenantAwareCeleryApp

from authentik import get_full_version
from authentik.lib.sentry import before_send
from authentik.lib.utils.errors import exception_to_string

Expand Down Expand Up @@ -159,6 +161,12 @@ def update_heartbeat_file(self, worker: Worker):
HEARTBEAT_FILE.touch()


@inspect_command(default_timeout=0.2)
def ping(state, **kwargs):
"""Ping worker(s)."""
return {"ok": "pong", "version": get_full_version()}


CELERY_APP.config_from_object(settings.CELERY)

# Load task modules from all registered Django app configs.
Expand Down
2 changes: 1 addition & 1 deletion blueprints/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -4159,7 +4159,7 @@
"re_evaluate_policies": {
"type": "boolean",
"title": "Re evaluate policies",
"description": "Evaluate policies when the Stage is present to the user."
"description": "Evaluate policies when the Stage is presented to the user."
},
"order": {
"type": "integer",
Expand Down
20 changes: 14 additions & 6 deletions schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ paths:
description: ''
/admin/workers/:
get:
operationId: admin_workers_retrieve
operationId: admin_workers_list
description: Get currently connected worker count.
tags:
- admin
Expand All @@ -360,7 +360,9 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/Workers'
type: array
items:
$ref: '#/components/schemas/Worker'
description: ''
'400':
content:
Expand Down Expand Up @@ -56987,13 +56989,19 @@ components:
required:
- aaguid
- description
Workers:
Worker:
type: object
properties:
count:
type: integer
worker_id:
type: string
version:
type: string
version_matching:
type: boolean
required:
- count
- version
- version_matching
- worker_id
modelRequest:
oneOf:
- $ref: '#/components/schemas/GoogleWorkspaceProviderRequest'
Expand Down
23 changes: 15 additions & 8 deletions web/src/admin/admin-overview/cards/WorkerStatusCard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,41 @@ import { msg } from "@lit/localize";
import { TemplateResult, html } from "lit";
import { customElement } from "lit/decorators.js";

import { AdminApi } from "@goauthentik/api";
import { AdminApi, Worker } from "@goauthentik/api";

@customElement("ak-admin-status-card-workers")
export class WorkersStatusCard extends AdminStatusCard<number> {
export class WorkersStatusCard extends AdminStatusCard<Worker[]> {
icon = "pf-icon pf-icon-server";

getPrimaryValue(): Promise<number> {
return new AdminApi(DEFAULT_CONFIG).adminWorkersRetrieve().then((workers) => {
return workers.count;
});
getPrimaryValue(): Promise<Worker[]> {
return new AdminApi(DEFAULT_CONFIG).adminWorkersList();
}

renderHeader(): TemplateResult {
return html`${msg("Workers")}`;
}

getStatus(value: number): Promise<AdminStatus> {
if (value < 1) {
getStatus(value: Worker[]): Promise<AdminStatus> {
if (value.length < 1) {
return Promise.resolve<AdminStatus>({
icon: "fa fa-times-circle pf-m-danger",
message: html`${msg("No workers connected. Background tasks will not run.")}`,
});
} else if (value.filter((w) => !w.versionMatching).length > 0) {
return Promise.resolve<AdminStatus>({
icon: "fa fa-times-circle pf-m-danger",
message: html`${msg("Worker with incorrect version connected.")}`,
});
} else {
return Promise.resolve<AdminStatus>({
icon: "fa fa-check-circle pf-m-success",
});
}
}

renderValue() {
return html`${this.value?.length}`;
}
}

declare global {
Expand Down

0 comments on commit aa4f817

Please sign in to comment.