diff --git a/examples/metrics/README.md b/examples/metrics/README.md
new file mode 100644
index 00000000..a72baaa9
--- /dev/null
+++ b/examples/metrics/README.md
@@ -0,0 +1,17 @@
+# Metrics example
+
+Run Prometheus with the sample scrape config:
+
+    docker run \
+      -p 9090:9090 \
+      -v $PWD/prometheus.yml:/etc/prometheus/prometheus.yml \
+      prom/prometheus
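+
+## Verifying activity collection
+
+With JupyterHub running against `jupyterhub_config.py` from this
+directory, traefik exposes per-router byte counters and Prometheus
+scrapes them. Roughly the same query the proxy issues internally can
+be run by hand in the Prometheus UI at http://localhost:9090:
+
+    sum(increase(traefik_router_requests_bytes_total[5m])) by (router)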
diff --git a/examples/metrics/jupyterhub_config.py b/examples/metrics/jupyterhub_config.py
new file mode 100644
index 00000000..de613b9c
--- /dev/null
+++ b/examples/metrics/jupyterhub_config.py
@@ -0,0 +1,38 @@
+"""sample jupyterhub config file for testing
+
+configures jupyterhub to run with traefik_file.
+
+configures jupyterhub with dummyauthenticator and simplespawner
+to enable testing without administrative privileges.
+
+requires jupyterhub 1.0
+"""
+
+c = get_config()  # noqa
+
+c.JupyterHub.proxy_class = "traefik_file"
+c.TraefikFileProviderProxy.traefik_api_username = "admin"
+c.TraefikFileProviderProxy.traefik_api_password = "admin"
+c.TraefikFileProviderProxy.traefik_log_level = "INFO"
+
+c.TraefikProxy.enable_last_activity = True
+c.TraefikProxy.last_activity_prometheus_url = "http://127.0.0.1:9090"
+c.JupyterHub.log_level = "DEBUG"
+# use dummy and simple auth/spawner for testing
+c.JupyterHub.authenticator_class = "dummy"
+c.JupyterHub.spawner_class = "simple"
+
+# run notebooks in the current directory
+from pathlib import Path
+
+here = Path(__file__).absolute().parent
+c.Spawner.notebook_dir = str(here)
+
+# leave single-user servers running when the Hub exits
+c.JupyterHub.cleanup_servers = False
+# collect activity frequently for easier debugging
+c.JupyterHub.last_activity_interval = 10
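+
+# the metrics entrypoint binds on :12345 by default
+# (the port prometheus.yml scrapes); uncomment to change it:
+# c.TraefikProxy.metrics_entrypoint_address = ":12345"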
"traefik_router_requests_bytes_total" + metrics_resp = await self._traefik_api_request("/metrics") + router_bytes = self._sum_prometheus_router_bytes( + metrics_resp.body.decode("utf8", "replace") + ) + previous_router_bytes = self._previous_router_bytes + self._previous_router_bytes = router_bytes + + if not previous_router_bytes: + # don't compute differences on first collection, + # which would mark every router as 'active' + return + + for router_name, current_bytes in router_bytes.items(): + previous_bytes = previous_router_bytes.get(router_name, 0) + if current_bytes > previous_bytes: + # there has been traffic since last collection + self.log.debug("Observed activity on %s", router_name) + self._last_activity[router_name] = now + + def _sum_prometheus_router_bytes(self, prometheus_metrics_page): + """Given a prometheus metrics endpoint, compute total bytes per router + + Computes the equivalent of + + sum(traefik_router_requests_bytes_total) by (router) + + as a dict of {router_name: num_bytes}. + This must be _compared_ with a previous collection + + """ + # metric line looks like: + # traefik_router_requests_bytes_total{code="200",method="PATCH",protocol="http",router="router__2Fuser_2Ftest_2F@file",service="service__2Fuser_2Ftest_2F@file"} 144 + + router_bytes = {} + label_re = re.compile(r'''(\w+)="([^"]*)"[,$]''') + + # count any increase in bytes _in either direction_) + start_slugs = ( + "traefik_router_requests_bytes_total{", + "traefik_router_responses_bytes_total{", + ) + for line in io.StringIO(prometheus_metrics_page): + if not line.startswith(start_slugs): + # not a relevant line + continue + + _metric_name, _, rest = line.partition("{") + label_s, _, value_s = rest.rpartition("}") + # are prometheus numbers always valid Python floats? + # e.g. 
+        query = " + ".join(
+            [
+                f"sum(increase({metric}[{interval}])) by (router)"
+                for metric in (
+                    "traefik_router_requests_bytes_total",
+                    "traefik_router_responses_bytes_total",
+                )
+            ]
+        )
+        prometheus_query_url = url_path_join(
+            self.last_activity_prometheus_url, "api/v1/query"
+        )
+        self.log.debug("Fetching %s with query=%s", prometheus_query_url, query)
+        resp = await AsyncHTTPClient().fetch(
+            prometheus_query_url, method="POST", body=urlencode({"query": query})
+        )
+        response = json.loads(resp.body.decode("utf8", "replace"))
+        if response.get("status") != "success":
+            # TODO: richer error handling
+            self.log.error("Prometheus query failed: %s", response)
+            return
+        result = response["data"]["result"]
+        for record in result:
+            router_label = record["metric"]["router"]
+            router_name, _, provider = router_label.rpartition("@")
+            if provider != self.provider_name:
+                continue
+            _t, value_s = record["value"]
+            value = float(value_s)
+            if value > 0:
+                self.log.debug("Observed activity %s on %s", value_s, router_name)
+                self._last_activity[router_name] = now_ts
+
     async def get_all_routes(self):
         """Fetch and return all the routes associated by JupyterHub from
         the proxy.
@@ -753,13 +990,22 @@ async def get_all_routes(self):
         if self._start_future and not self._start_future.done():
             await self._start_future

+        if self.enable_last_activity:
+            await self._collect_last_activity()
+
         jupyterhub_config = await self._get_jupyterhub_dynamic_config()
         all_routes = {}
         for _key, route in jupyterhub_config.get("routes", {}).items():
+            route_data = route.get("data", {})
+            router_name = route["router"]
+            last_activity = self._last_activity.get(router_name, None)
+            # if we are collecting last_activity data,
+            # add it to the route data
+            if last_activity:
+                route_data["last_activity"] = last_activity
             all_routes[route["routespec"]] = {
                 "routespec": route["routespec"],
-                "data": route.get("data", {}),
+                "data": route_data,
                 "target": route["target"],
             }
         return all_routes