[WIP] implement last_activity collection via prometheus metrics #229

6 changes: 6 additions & 0 deletions examples/metrics/README.md
@@ -0,0 +1,6 @@
# run prometheus

docker run \
-p 9090:9090 \
-v $PWD/prometheus.yml:/etc/prometheus/prometheus.yml \
prom/prometheus
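
To sanity-check that prometheus is scraping traefik, you can query the router byte counters directly; a minimal sketch, assuming prometheus is reachable at 127.0.0.1:9090 as started above:

import json
from urllib.parse import urlencode
from urllib.request import urlopen

# total bytes per router, as recorded by traefik and scraped by prometheus
query = "sum(traefik_router_requests_bytes_total) by (router)"
url = "http://127.0.0.1:9090/api/v1/query?" + urlencode({"query": query})
with urlopen(url) as resp:
    data = json.load(resp)
for record in data["data"]["result"]:
    print(record["metric"].get("router"), record["value"][1])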
34 changes: 34 additions & 0 deletions examples/metrics/jupyterhub_config.py
@@ -0,0 +1,34 @@
"""sample jupyterhub config file for testing

configures jupyterhub to run with traefik_file.

configures jupyterhub with dummyauthenticator and simplespawner
to enable testing without administrative privileges.

requires jupyterhub 1.0
"""

c = get_config() # noqa

c.JupyterHub.proxy_class = "traefik_file"
c.TraefikFileProviderProxy.traefik_api_username = "admin"
c.TraefikFileProviderProxy.traefik_api_password = "admin"
c.TraefikFileProviderProxy.traefik_log_level = "INFO"

c.TraefikProxy.enable_last_activity = True
c.TraefikProxy.last_activity_prometheus_url = "http://127.0.0.1:9090"
c.JupyterHub.log_level = "DEBUG"
# use dummy and simple auth/spawner for testing
c.JupyterHub.authenticator_class = "dummy"
c.JupyterHub.spawner_class = "simple"

# run notebooks in the current directory
from pathlib import Path

here = Path(__file__).absolute().parent
c.Spawner.notebook_dir = str(here)

# leave single-user servers running when the Hub shuts down
c.JupyterHub.cleanup_servers = False
# collect activity frequently for easier debugging
c.JupyterHub.last_activity_interval = 10
14 changes: 14 additions & 0 deletions examples/metrics/prometheus.yml
@@ -0,0 +1,14 @@
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: "traefik"

# Override the global default and scrape targets from this job every 5 seconds.
scrape_interval: 5s
basic_auth:
username: admin
password: admin

static_configs:
- targets:
- "localhost:8099"
- "localhost:12345"
244 changes: 242 additions & 2 deletions jupyterhub_traefik_proxy/proxy.py
@@ -19,12 +19,15 @@
# Distributed under the terms of the Modified BSD License.

import asyncio
import io
import json
import os
import re
import ssl
from datetime import datetime, timezone
from os.path import abspath
from subprocess import Popen, TimeoutExpired
from urllib.parse import urlencode, urlparse, urlunparse

from jupyterhub.proxy import Proxy
from jupyterhub.utils import exponential_backoff, new_token, url_path_join
@@ -300,6 +303,66 @@ def _warn_empty_username(self):
config=True,
help="""Timeout (in seconds) when waiting for traefik to register an updated route.""",
)
enable_metrics = Bool(
False,
config=True,
help="""Enable prometheus metrics""",
)

metrics_entrypoint = Unicode(
"",
config=True,
help="""
Name of the traefik entrypoint on which to expose prometheus metrics.

Defaults to a dedicated `metrics` entrypoint when
last_activity_prometheus_url is set, otherwise to the traefik API entrypoint.
""",
)

@default("metrics_entrypoint")
def _default_metrics_entrypoint(self):
if self.last_activity_prometheus_url:
return "metrics"
else:
# not using prometheus
return self.traefik_api_entrypoint

metrics_entrypoint_address = Unicode(
":12345",
config=True,
help="""
Set the bind address for the metrics entrypoint.

Used only when enable_metrics is True and metrics_entrypoint
is not already a defined entrypoint.
""",
)

enable_last_activity = Bool(
False,
config=True,
help="""
Enable updating user last_activity based on network traffic through the proxy.

Not usually required, but may be enabled to match
ConfigurableHTTPProxy's behavior.

If last_activity_prometheus_url is defined, uses prometheus,
otherwise communicates directly with the traefik API.

A prometheus endpoint is _required_ for accurate results if there is more than one traefik replica.
""",
)

@observe("enable_last_activity")
def _last_activity_enabled(self, change):
if change.new:
self.enable_metrics = True

last_activity_prometheus_url = Unicode(
config=True,
help="""
Compute last_activity data from prometheus

Required for accurate last_activity data with multiple replicas.
""",
)

def _generate_htpassword(self):
from passlib.hash import apr_md5_crypt
@@ -489,6 +552,21 @@ async def _setup_traefik_static_config(self):
}
}

# static_config["api"] = {"dashboard": True}
if self.enable_metrics:
metrics_prometheus = {}
static_config["metrics"] = {"prometheus": metrics_prometheus}
if self.metrics_entrypoint:
    # traefik's static config expects the camelCase `entryPoint` key here
    metrics_prometheus["entryPoint"] = self.metrics_entrypoint

if self.enable_last_activity:
# need router labels for last_activity
metrics_prometheus["addRoutersLabels"] = True
if self.metrics_entrypoint not in entrypoints:
entrypoints[self.metrics_entrypoint] = {
"address": self.metrics_entrypoint_address,
}

# load what we just defined at _lower_ priority
# than anything added to self.static_config in a subclass before this
self.static_config = deep_merge(static_config, self.static_config)
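
For illustration, with enable_metrics and enable_last_activity both on and the defaults above, the fragment merged into the static config comes out roughly like this (a sketch; the final entryPoints layout depends on what was already defined):

# sketch of the generated static-config fragment, given the defaults above
{
    "metrics": {
        "prometheus": {
            "entryPoint": "metrics",
            "addRoutersLabels": True,
        },
    },
    "entryPoints": {
        "metrics": {"address": ":12345"},
    },
}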
@@ -530,6 +608,15 @@ async def _setup_traefik_dynamic_config(self):
}
middlewares["auth_api"] = {"basicAuth": {"users": [api_credentials]}}

# add router for metrics if using internal last_activity
if self.enable_last_activity and not self.last_activity_prometheus_url:
routers["metrics_last_activity"] = {
"rule": f"Host(`{api_url.hostname}`) && Path(`/metrics`)",
"entryPoints": [self.traefik_api_entrypoint],
"service": "prometheus@internal",
"middlewares": ["auth_api"],
}

# add default ssl cert/keys
if self.ssl_cert and self.ssl_key:
dynamic_config["tls"] = {
@@ -721,6 +808,8 @@ async def delete_route(self, routespec):
routespec = self.validate_routespec(routespec)
traefik_keys, jupyterhub_keys = self._keys_for_route(routespec)
await self._delete_dynamic_config(traefik_keys, jupyterhub_keys)
router_name = traefik_utils.generate_alias(routespec, "router")
self._last_activity.pop(router_name, None)
self.log.debug("Route %s was deleted.", routespec)

async def _get_jupyterhub_dynamic_config(self):
@@ -735,6 +824,144 @@ async def check_routes(self, *args, **kwargs):
await self._start_future
return await super().check_routes(*args, **kwargs)

async def _collect_last_activity(self):
if self.last_activity_prometheus_url:
await self._collect_last_activity_prometheus()
else:
await self._collect_last_activity_api()

_previous_router_bytes = Dict()
_last_activity = Dict()

async def _collect_last_activity_api(self):
"""Collect last_activity via traefik metrics API

Assumes a single traefik replica, otherwise must use prometheus
to handle cross-replica aggregation
"""
self.log.debug("Collecting router activity from traefik API")
# string timestamp is what's expected
now = datetime.now(timezone.utc).isoformat()
metrics_resp = await self._traefik_api_request("/metrics")
router_bytes = self._sum_prometheus_router_bytes(
metrics_resp.body.decode("utf8", "replace")
)
previous_router_bytes = self._previous_router_bytes
self._previous_router_bytes = router_bytes

if not previous_router_bytes:
# don't compute differences on first collection,
# which would mark every router as 'active'
return

for router_name, current_bytes in router_bytes.items():
previous_bytes = previous_router_bytes.get(router_name, 0)
if current_bytes > previous_bytes:
# there has been traffic since last collection
self.log.debug("Observed activity on %s", router_name)
self._last_activity[router_name] = now

def _sum_prometheus_router_bytes(self, prometheus_metrics_page):
"""Given a prometheus metrics endpoint, compute total bytes per router

Computes the equivalent of

sum(traefik_router_requests_bytes_total) by (router)

as a dict of {router_name: num_bytes}.
This must be _compared_ with a previous collection

"""
# metric line looks like:
# traefik_router_requests_bytes_total{code="200",method="PATCH",protocol="http",router="router__2Fuser_2Ftest_2F@file",service="service__2Fuser_2Ftest_2F@file"} 144

router_bytes = {}
# a label ends at a comma (between labels) or at the end of the label string
label_re = re.compile(r'(\w+)="([^"]*)"(?:,|$)')

# count any increase in bytes _in either direction_
start_slugs = (
"traefik_router_requests_bytes_total{",
"traefik_router_responses_bytes_total{",
)
for line in io.StringIO(prometheus_metrics_page):
if not line.startswith(start_slugs):
# not a relevant line
continue

_metric_name, _, rest = line.partition("{")
label_s, _, value_s = rest.rpartition("}")
# prometheus values are always valid Python floats,
# e.g. '2.010629e+06'
value = float(value_s.strip())

for label, label_value in label_re.findall(label_s):
if label == "router":
# turn 'router@file' into 'router'
router_name, _, provider = label_value.rpartition("@")
if provider == self.provider_name:
break
else:
# router label not found
continue

if router_name not in router_bytes:
router_bytes[router_name] = value
else:
router_bytes[router_name] += value
return router_bytes
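
As a worked example, feeding the sample line from the comment above through this parser yields the per-router total; a sketch, assuming provider_name is "file":

# one sample metric line in, per-router byte totals out
page = (
    'traefik_router_requests_bytes_total{code="200",method="PATCH",'
    'protocol="http",router="router__2Fuser_2Ftest_2F@file",'
    'service="service__2Fuser_2Ftest_2F@file"} 144\n'
)
# proxy._sum_prometheus_router_bytes(page)
# -> {"router__2Fuser_2Ftest_2F": 144.0}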

_last_prometheus_collect = None

async def _collect_last_activity_prometheus(self):
"""Compute last_activity data from prometheus

Use this to aggregate activity when there are multiple traefik replicas
"""
now = datetime.now(timezone.utc)
now_ts = now.isoformat()
if self._last_prometheus_collect is None:
interval = "5m"
else:
interval_seconds = (now - self._last_prometheus_collect).total_seconds()
interval_seconds = max(int(interval_seconds), 10)
interval = f"{interval_seconds}s"
self._last_prometheus_collect = now
query = " + ".join(
[
f"sum(increase({metric}[{interval}])) by (router)"
for metric in (
"traefik_router_requests_bytes_total",
"traefik_router_responses_bytes_total",
)
]
)
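# e.g. with interval="5m", query becomes:
#   sum(increase(traefik_router_requests_bytes_total[5m])) by (router)
#     + sum(increase(traefik_router_responses_bytes_total[5m])) by (router)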
prometheus_query_url = url_path_join(
self.last_activity_prometheus_url, "api/v1/query"
)
self.log.debug("Fetching %s with query=%s", prometheus_query_url, query)
resp = await AsyncHTTPClient().fetch(
prometheus_query_url, method="POST", body=urlencode({"query": query})
)
response = json.loads(resp.body.decode("utf8", "replace"))
if response["status"] != "success":
    self.log.error("Prometheus query failed: %s", response)
    return
result = response["data"]["result"]
for record in result:
router_label = record['metric']['router']
router_name, _, provider = router_label.rpartition("@")
if provider != self.provider_name:
continue
_t, value_s = record['value']
value = float(value_s)
if value > 0:
self.log.debug("Observed activity %s on %s", value_s, router_name)
self._last_activity[router_name] = now_ts

async def get_all_routes(self):
"""Fetch and return all the routes associated by JupyterHub from the
proxy.
@@ -753,13 +980,26 @@ async def get_all_routes(self):
if self._start_future and not self._start_future.done():
await self._start_future

if self.enable_last_activity:
await self._collect_last_activity()

self.log.debug("Collected last_activity: %s", self._last_activity)
jupyterhub_config = await self._get_jupyterhub_dynamic_config()

all_routes = {}
for _key, route in jupyterhub_config.get("routes", {}).items():
route_data = route.get("data", {})
router_name = route["router"]
last_activity = self._last_activity.get(router_name, None)
# if we are collecting last_activity data
# add it to route data
if last_activity:
route_data["last_activity"] = last_activity
all_routes[route["routespec"]] = {
"routespec": route["routespec"],
"data": route.get("data", {}),
"data": route_data,
"target": route["target"],
}
return all_routes
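
With collection enabled, an entry returned by get_all_routes() now carries the activity timestamp; roughly (a sketch, with a hypothetical routespec and target):

# one get_all_routes() entry once activity has been observed
{
    "/user/test/": {
        "routespec": "/user/test/",
        "data": {"last_activity": "2024-01-01T12:34:56.789012+00:00"},
        "target": "http://127.0.0.1:54321",
    }
}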