diff --git a/anomaly_detection/Dockerfile b/anomaly_detection/Dockerfile
new file mode 100644
index 000000000..e7c70fc25
--- /dev/null
+++ b/anomaly_detection/Dockerfile
@@ -0,0 +1,17 @@
+# Pull base image
+ARG TAG=latest
+FROM ghcr.io/terragraph/tglib:$TAG
+
+ARG WORKDIR=/usr/local/anomaly_detection
+
+# Copy anomaly_detection source
+WORKDIR $WORKDIR
+COPY anomaly_detection/ .flake8 ./
+
+RUN echo "http://dl-cdn.alpinelinux.org/alpine/v3.13/main" >> /etc/apk/repositories && \
+    echo "http://dl-cdn.alpinelinux.org/alpine/v3.13/community" >> /etc/apk/repositories
+
+# Install anomaly_detection
+RUN apk add py3-numpy && \
+    apk add py3-scikit-learn && \
+    pip install .
diff --git a/anomaly_detection/anomaly_detection/__init__.py b/anomaly_detection/anomaly_detection/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/anomaly_detection/anomaly_detection/jobs.py b/anomaly_detection/anomaly_detection/jobs.py
new file mode 100644
index 000000000..5cd91786e
--- /dev/null
+++ b/anomaly_detection/anomaly_detection/jobs.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env python3
+# Copyright 2004-present Facebook. All Rights Reserved.
+
+import asyncio
+import logging
+import math
+import time
+from collections import defaultdict
+from copy import deepcopy
+from typing import Dict, Optional, List, Iterable
+
+import numpy as np
+from sklearn.covariance import EllipticEnvelope
+from sklearn.impute import KNNImputer
+from sklearn.neighbors import LocalOutlierFactor
+from sklearn.preprocessing import MinMaxScaler
+from tglib.clients import APIServiceClient
+from tglib.clients.prometheus_client import consts, PrometheusClient, PrometheusMetric
+from tglib.exceptions import ClientRuntimeError
+
+
+async def ad_single_metric_job(
+    time_ms: int, metric: str, step_s: int, duration_s: int, period_s: int
+) -> None:
+    """Load the given link metric from Prometheus for the past duration_s, sampled
+    at step_s, and run two anomaly detection (AD) algorithms on it.
+
+    The job generates AD samples (1 for no anomaly, -1 for an anomaly) at step_s
+    intervals for the past period_s; the rest of the data, up to duration_s, serves
+    as historical context for the detectors. Since samples cannot be written into
+    Prometheus in the past, they are written with a delay of period_s: an anomaly
+    at time 0 shows up as -1 at time period_s in the time series written back to
+    Prometheus.
+    """
+    end_time = math.ceil(time_ms / 1e3 / step_s) * step_s
+    num_samples = math.ceil(duration_s / step_s)
+    start_time = end_time - num_samples * step_s
+    logging.info(
+        f"Running anomaly detection job for {metric} from {start_time} to {end_time}"
+    )
+
+    # Divide the time range into chunks of at most max_prom_samples points,
+    # since only max_prom_samples can be read in a single range query
+    max_prom_samples = 11000
+    num_iterations = math.ceil(num_samples / max_prom_samples)
+
+    time_steps = [
+        [
+            start_time + i * max_prom_samples * step_s + step_s,
+            start_time + (i + 1) * max_prom_samples * step_s,
+        ]
+        for i in range(num_iterations)
+    ]
+    time_steps[-1][1] = end_time
+
+    network_names = APIServiceClient.network_names()
+    prom_client = PrometheusClient(timeout=15)
+    network_reps = []
+    coros = []
+
+    for network_name in network_names:
+        labels = {
+            consts.network: network_name,
+            consts.data_interval_s: str(step_s),
+        }
+        query = [prom_client.format_query(metric, labels)]
+        for query_start_time, query_end_time in time_steps:
+            coros.append(
+                fetch_metrics_from_queries(
+                    prom_client,
+                    network_name,
+                    query,
+                    query_start_time,
+                    query_end_time,
+                    step_s,
+                )
+            )
+            network_reps.append(network_name)
+
+    # Load link metric data for all networks
+    network_stats = zip(network_reps, await asyncio.gather(*coros))
+
+    # Reshape data and run AD
+    label_val_map = gather_data(network_stats, start_time, end_time, step_s)
+    stats_to_write = analyze_data(label_val_map, metric, end_time, step_s, period_s)
+
+    # Write AD samples back to Prometheus every step_s with a time delay
+    num_write = math.ceil(period_s / step_s)
+    for i in range(-num_write, 0):
+        logging.info(f"Writing to Prometheus for index {i}")
+        PrometheusClient.write_metrics(stats_to_write[i])
+        await asyncio.sleep(step_s)
+
+    return None
+
+
+async def fetch_metrics_from_queries(
+    client: PrometheusClient,
+    network_name: str,
+    queries: List[str],
+    start_time: int,
+    end_time: int,
+    step: int,
+) -> Optional[Dict]:
+    """Fetch the latest metrics for all links in the network."""
+    coros = []
+    for query in queries:
+        coros.append(
+            client.query_range(query, step=f"{step}s", start=start_time, end=end_time)
+        )
+    try:
+        results: Dict = {}
+        for query, response in zip(queries, await asyncio.gather(*coros)):
+            if response["status"] != "success":
+                logging.error(f"Failed to fetch {query} data for {network_name}")
+                continue
+            results[query] = response["data"]["result"]
+        return results
+    except ClientRuntimeError:
+        logging.exception("Failed to fetch metrics from Prometheus.")
+        return None
+
+
+def gather_data(
+    network_stats: Iterable, start_time: int, end_time: int, step: int
+) -> Dict:
+    """Reshape the Prometheus results into a multi-level dictionary mapping
+    network name -> link name -> link direction -> list of values."""
+
+    label_val_map: defaultdict = defaultdict(
+        lambda: defaultdict(lambda: defaultdict(list))
+    )
+    for network, prom_results in network_stats:
+        if prom_results is None:
+            continue
+        for query, values in prom_results.items():
+            logging.info(f"Processing data for network {network} and metric {query}")
+            if not values:
+                logging.debug(f"Found no {query} results for {network}")
+                continue
+            for result in values:
+                link_name = result["metric"][consts.link_name]
+                link_dir = result["metric"][consts.link_direction]
+                val_array = label_val_map[network][link_name][link_dir]
+                if len(val_array) == 0:
+                    # Create an empty array of length equal to duration_s
+                    # sampled at step_s
+                    val_array = [np.nan] * int((end_time - start_time) / step)
+                    label_val_map[network][link_name][link_dir] = val_array
+                for timestamp, metric_value in result["values"]:
+                    # Put values at the appropriate index of the array based
+                    # on the timestamp
+                    val_array[int((int(timestamp) - start_time) / step - 1)] = int(
+                        metric_value
+                    )
+    return label_val_map
+
+
+def analyze_data(
+    label_val_map: Dict, metric: str, end_time: int, step: int, period: int
+) -> Dict[int, List]:
+    """Unpack the label_val_map dictionary into labels of network name, link name,
+    and link direction, and create new Prometheus samples that are 1 or -1 based on
+    running the AD algorithms for each label set. The new AD stats are collected in
+    the stats_to_write dict so they can be written to Prometheus one by one with a
+    time delay."""
+
+    num_write = math.ceil(period / step)
+    stats_to_write: Dict[int, List] = {i: [] for i in range(-num_write, 0)}
+    for network, depth1 in label_val_map.items():
+        for link_name, depth2 in depth1.items():
+            for link_dir, val_array in depth2.items():
+
+                labels = {
+                    consts.network: network,
+                    consts.link_name: link_name,
+                    consts.link_direction: link_dir,
+                    "delay": f"{period}-sec",
+                }
+
+                outliers = run_ad(val_array)
+                for model, pred in outliers.items():
+
+                    labels_model = deepcopy(labels)
+                    labels_model["model"] = model
+                    for i in range(-num_write, 0):
+                        stats_to_write[i].append(
+                            PrometheusMetric(
+                                f"ad_{metric}",
+                                labels_model,
+                                pred[i],
+                                int((end_time + step * (i + num_write)) * 1000),
+                            )
+                        )
+
+    return stats_to_write
+
+
+def run_ad(train: List[float]) -> Dict[str, List]:
+    """Run two AD algorithms on the input data."""
+    contamination = 0.001
+    anomaly_algorithms = [
+        ("RobustCovariance", EllipticEnvelope(contamination=contamination)),
+        (
+            "LocalOutlierFactor",
+            LocalOutlierFactor(contamination=contamination, n_neighbors=100),
+        ),
+    ]
+
+    # Impute and scale data
+    imputer = KNNImputer(missing_values=np.nan, n_neighbors=10, weights="uniform")
+    scaler = MinMaxScaler()
+    train = np.array(train).reshape(-1, 1)
+    train_imp = imputer.fit_transform(train)
+    train_scaled = scaler.fit_transform(train_imp)
+
+    # Run anomaly detection algorithms
+    y_pred = {}
+    for model_name, algorithm in anomaly_algorithms:
+        t0 = time.time()
+        try:
+            if model_name == "LocalOutlierFactor":
+                # LocalOutlierFactor only exposes labels via fit_predict
+                pred = algorithm.fit_predict(train_scaled)
+            else:
+                pred = algorithm.fit(train_scaled).predict(train_scaled)
+            t1 = time.time()
+            y_pred[model_name] = pred
+            logging.debug(f"Ran {model_name} algorithm in {t1 - t0} seconds")
+            logging.info(
+                f"{model_name} has {sum(pred == -1)} outliers in {len(train)} samples"
+            )
+        except Exception:
+            logging.exception(f"Could not run {model_name} algorithm")
+
+    return y_pred
diff --git a/anomaly_detection/anomaly_detection/main.py b/anomaly_detection/anomaly_detection/main.py
new file mode 100644
index 000000000..1fe8e95ec
--- /dev/null
+++ b/anomaly_detection/anomaly_detection/main.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python3
+# Copyright 2004-present Facebook. All Rights Reserved.
+
+import asyncio
+import dataclasses
+import json
+import logging
+import sys
+import time
+from typing import Any, Dict, NoReturn
+
+from tglib import init
+from tglib.clients import APIServiceClient, PrometheusClient
+
+from . import jobs
+
+
+@dataclasses.dataclass
+class Job:
+    """Struct for representing pipeline job configurations."""
+
+    name: str
+    start_time_ms: int
+    params: Dict
+
+
+async def produce(
+    queue: asyncio.Queue, name: str, pipeline: Dict[str, Any]
+) -> NoReturn:
+    """Add jobs from the pipeline configuration to the shared queue."""
+    while True:
+        start_time = time.time()
+        tasks = []
+        for job in pipeline.get("jobs", []):
+            if job.get("enabled", False):
+                params = job.get("params", {})
+                params.update({"period_s": pipeline["period_s"]})
+                tasks.append(
+                    queue.put(
+                        Job(
+                            name=job["name"],
+                            start_time_ms=int(round(start_time * 1e3)),
+                            params=params,
+                        )
+                    )
+                )
+
+        # Add the jobs to the queue
+        await asyncio.gather(*tasks)
+
+        # Sleep until next invocation period
+        sleep_time = start_time + pipeline["period_s"] - time.time()
+
+        logging.info(
+            f"Done enqueuing jobs in '{name}'. "
+            f"Added {len(tasks)} job(s) to the queue. Sleeping for {sleep_time:0.2f}s"
+        )
+
+        await asyncio.sleep(sleep_time)
+
+
+async def consume(queue: asyncio.Queue) -> NoReturn:
+    """Consume and run a job from the shared queue."""
+    while True:
+        # Wait for a job from the producers
+        job = await queue.get()
+        logging.info(f"Starting the '{job.name}' job")
+
+        # Execute the job
+        function = getattr(jobs, job.name)
+        await function(job.start_time_ms, **job.params)
+        logging.info(f"Finished running the '{job.name}' job")
+
+
+async def async_main(config: Dict[str, Any]) -> None:
+    logging.info("#### Starting the 'anomaly_detection' service ####")
+    logging.debug(f"service config: {config}")
+
+    q: asyncio.Queue = asyncio.Queue()
+
+    # Create producer coroutines
+    producers = [
+        produce(q, name, pipeline) for name, pipeline in config["pipelines"].items()
+    ]
+
+    # Create consumer coroutines
+    consumers = [consume(q) for _ in range(config["num_consumers"])]
+
+    # Start the producer and consumer coroutines
+    await asyncio.gather(*producers, *consumers)
+
+
+def main() -> None:
+    try:
+        with open("./service_config.json") as f:
+            config = json.load(f)
+    except (json.JSONDecodeError, OSError, KeyError):
+        logging.exception("Failed to parse configuration file.")
+        sys.exit(1)
+
+    init(lambda: async_main(config), {APIServiceClient, PrometheusClient})
diff --git a/anomaly_detection/mypy.ini b/anomaly_detection/mypy.ini
new file mode 100644
index 000000000..a132a369c
--- /dev/null
+++ b/anomaly_detection/mypy.ini
@@ -0,0 +1,6 @@
+[mypy]
+check_untyped_defs = True
+ignore_missing_imports = True
+show_error_context = True
+warn_unused_ignores = True
+warn_return_any = True
diff --git a/anomaly_detection/ptrconfig b/anomaly_detection/ptrconfig
new file mode 100644
index 000000000..ea3bd834d
--- /dev/null
+++ b/anomaly_detection/ptrconfig
@@ -0,0 +1,8 @@
+[ptr]
+venv_pkgs =
+    black
+    coverage
+    flake8
+    mypy
+    pip
+    setuptools
diff --git a/anomaly_detection/setup.py b/anomaly_detection/setup.py
new file mode 100644
index 000000000..3d2ebf9c1
--- /dev/null
+++ b/anomaly_detection/setup.py
@@ -0,0 +1,28 @@
+#!/usr/bin/env python3
+# Copyright 2004-present Facebook. All Rights Reserved.
+
+from setuptools import find_packages, setup
+
+ptr_params = {
+    "disabled": True,
+    "entry_point_module": "anomaly_detection/main",
+    "test_suite": "tests.base",
+    "test_suite_timeout": 300,
+    "required_coverage": {},
+    "run_flake8": True,
+    "run_black": True,
+    "run_mypy": True,
+}
+
+setup(
+    name="anomaly_detection",
+    version="2022.03.31",
+    packages=find_packages(exclude=["tests"]),
+    python_requires=">=3.7",
+    install_requires=["numpy>=1.16.4,<2.0"],
+    extras_require={"ci": ["ptr"], "docs": ["aiohttp-swagger>=1.0.9,<2.0"]},
+    test_suite=ptr_params["test_suite"],
+    entry_points={
+        "console_scripts": ["anomaly_detection = anomaly_detection.main:main"]
+    },
+)
diff --git a/anomaly_detection/tests/__init__.py b/anomaly_detection/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/anomaly_detection/tests/base.py b/anomaly_detection/tests/base.py
new file mode 100644
index 000000000..681a4ff29
--- /dev/null
+++ b/anomaly_detection/tests/base.py
@@ -0,0 +1,12 @@
+#!/usr/bin/env python3
+# Copyright 2004-present Facebook. All Rights Reserved.
+
+import logging
+import unittest
+
+if __name__ == "__main__":
+    # Suppress logging statements during tests
+    logging.disable(logging.CRITICAL)
+
+    # Run all tests
+    unittest.main()
diff --git a/jenkins_scripts/develop.py b/jenkins_scripts/develop.py
index d5444fd16..7d91e7643 100644
--- a/jenkins_scripts/develop.py
+++ b/jenkins_scripts/develop.py
@@ -141,6 +141,7 @@ def prepare(ctx, machines):
         "topology-service",
         "queryservice",
         "crashlog-analysis-service",
+        "anomaly_detection",
     ]
 
     for service in msa_services:
@@ -259,7 +260,7 @@ async def go_impl(
         )
     else:
         cmd(
-            f"ssh {kube_manager} docker service update --image {image_name}:dev {swarm_service}"
+            f"ssh {kube_manager} docker service update --force --image {image_name}:dev {swarm_service}"
         )
 
 
diff --git a/nms_stack/nms_cli/k8s_nms/ansible/group_vars/all.yml b/nms_stack/nms_cli/k8s_nms/ansible/group_vars/all.yml
index 2ea938dda..f9bd4b924 100644
--- a/nms_stack/nms_cli/k8s_nms/ansible/group_vars/all.yml
+++ b/nms_stack/nms_cli/k8s_nms/ansible/group_vars/all.yml
@@ -264,6 +264,12 @@ msa_services:
     provider: climacell
     api_key: 1234
     roles: "tg_topology_read"
+  anomaly_detection:
+    uses_database: false
+    image: secure.cxl-terragraph.com:443/anomaly_detection:stable
+    command: "anomaly_detection"
+    enabled: true
+    roles: "tg_topology_read"
 
 E2E_CONFIG_FILE: cfg/controller_config.json
 E2E_TOPOLOGY_FILE: e2e_topology.conf
diff --git a/nms_stack/nms_cli/k8s_nms/manifests/msa/anomaly_detection/service_config.json b/nms_stack/nms_cli/k8s_nms/manifests/msa/anomaly_detection/service_config.json
new file mode 100644
index 000000000..d8943fc6f
--- /dev/null
+++ b/nms_stack/nms_cli/k8s_nms/manifests/msa/anomaly_detection/service_config.json
@@ -0,0 +1,19 @@
+{
+    "num_consumers": 10,
+    "pipelines": {
+        "pipeline 1": {
+            "period_s": 1800,
+            "jobs": [
+                {
+                    "name": "ad_single_metric_job",
+                    "enabled": true,
+                    "params": {
+                        "metric": "snr",
+                        "step_s": 30,
+                        "duration_s": 86400
+                    }
+                }
+            ]
+        }
+    }
+}
diff --git a/nms_stack/nms_cli/k8s_nms/manifests/stats/prometheus.config.yml b/nms_stack/nms_cli/k8s_nms/manifests/stats/prometheus.config.yml
index f135008a4..e6c8329d3 100644
--- a/nms_stack/nms_cli/k8s_nms/manifests/stats/prometheus.config.yml
+++ b/nms_stack/nms_cli/k8s_nms/manifests/stats/prometheus.config.yml
@@ -73,6 +73,11 @@ data:
         static_configs:
         - targets: ['weather-service:8080']
 
+      - job_name: 'anomaly_detection'
+        metrics_path: /metrics
+        static_configs:
+        - targets: ['anomaly_detection:8080']
+
   alertmanager-default.yml: |
     global:
       resolve_timeout: 5m
diff --git a/nms_stack/nms_cli/nms_stack/group_vars/all b/nms_stack/nms_cli/nms_stack/group_vars/all
index 72acadea9..7b9ed9b36 100644
--- a/nms_stack/nms_cli/nms_stack/group_vars/all
+++ b/nms_stack/nms_cli/nms_stack/group_vars/all
@@ -227,6 +227,7 @@ msa_network_test_image: ghcr.io/terragraph/network_test
 msa_scan_service_image: ghcr.io/terragraph/scan_service
 msa_topology_service_image: ghcr.io/terragraph/topology_service
 msa_weather_service_image: ghcr.io/terragraph/weather_service
+msa_anomaly_detection_image: ghcr.io/terragraph/anomaly_detection
 nms_image: ghcr.io/terragraph/nmsv2
 nginx_image: ghcr.io/terragraph/nms_nginx
 query_service_image: ghcr.io/terragraph/cpp_backends
diff --git a/nms_stack/nms_cli/nms_stack/roles/msa/tasks/main.yml b/nms_stack/nms_cli/nms_stack/roles/msa/tasks/main.yml
index 43c7fc193..33a4e64f4 100644
--- a/nms_stack/nms_cli/nms_stack/roles/msa/tasks/main.yml
+++ b/nms_stack/nms_cli/nms_stack/roles/msa/tasks/main.yml
@@ -35,6 +35,7 @@
     - { name: "scan_service", has_database: true, roles: "tg_scan_write,tg_topology_read" }
     - { name: "topology_service", has_database: true, roles: "tg_topology_read" }
     - { name: "weather_service", has_database: false, roles: "tg_topology_read" }
+    - { name: "anomaly_detection", has_database: false, roles: "tg_topology_read" }
   loop_control:
     loop_var: service
 
diff --git a/nms_stack/nms_cli/nms_stack/roles/msa/templates/anomaly_detection/service_config.json b/nms_stack/nms_cli/nms_stack/roles/msa/templates/anomaly_detection/service_config.json
new file mode 100644
index 000000000..d8943fc6f
--- /dev/null
+++ b/nms_stack/nms_cli/nms_stack/roles/msa/templates/anomaly_detection/service_config.json
@@ -0,0 +1,19 @@
+{
+    "num_consumers": 10,
+    "pipelines": {
+        "pipeline 1": {
+            "period_s": 1800,
+            "jobs": [
+                {
+                    "name": "ad_single_metric_job",
+                    "enabled": true,
+                    "params": {
+                        "metric": "snr",
+                        "step_s": 30,
+                        "duration_s": 86400
+                    }
+                }
+            ]
+        }
+    }
+}
diff --git a/nms_stack/nms_cli/nms_stack/roles/msa/templates/docker-compose.yml b/nms_stack/nms_cli/nms_stack/roles/msa/templates/docker-compose.yml
index 896fb6dec..16525f054 100644
--- a/nms_stack/nms_cli/nms_stack/roles/msa/templates/docker-compose.yml
+++ b/nms_stack/nms_cli/nms_stack/roles/msa/templates/docker-compose.yml
@@ -154,6 +154,24 @@ services:
         max-size: "20m"
         max-file: "10"
 
+  anomaly_detection:
+    image: {{ msa_anomaly_detection_image }}
+{% if keycloak_enabled %}
+    env_file:
+      - {{ msa_gfs_path }}/env/anomaly_detection_auth.env
+{% endif %}
+    volumes:
+      - {{ msa_gfs_path }}/config/config.json:/usr/local/anomaly_detection/config.json
+      - {{ msa_gfs_path }}/config/anomaly_detection/service_config.json:/usr/local/anomaly_detection/service_config.json
+    networks:
+      - terragraph_net
+    command: ["/bin/sh", "-c", "anomaly_detection"]
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "20m"
+        max-file: "10"
+
 networks:
   terragraph_net:
     external: true
diff --git a/nms_stack/nms_cli/nms_stack/roles/stats/templates/prometheus.yml b/nms_stack/nms_cli/nms_stack/roles/stats/templates/prometheus.yml
index eca87e8f6..8d4def540 100644
--- a/nms_stack/nms_cli/nms_stack/roles/stats/templates/prometheus.yml
+++ b/nms_stack/nms_cli/nms_stack/roles/stats/templates/prometheus.yml
@@ -67,3 +67,8 @@ scrape_configs:
     metrics_path: /metrics
     static_configs:
     - targets: ['weather_service:8080']
+
+  - job_name: 'anomaly_detection'
+    metrics_path: /metrics
+    static_configs:
+    - targets: ['anomaly_detection:8080']
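The period_s delay described in the ad_single_metric_job docstring above falls directly out of the timestamp arithmetic in analyze_data. Below is a minimal illustrative sketch (not part of the patch), using the step_s and period_s values from the default service_config.json and a hypothetical end_time; it shows that every re-written prediction is stamped (num_write - 1) * step_s = 1770 s, i.e. roughly period_s, after the sample it describes.

import math

# Values from the default service_config.json in this patch
step_s = 30
period_s = 1800
end_time = 1_700_000_000  # hypothetical job end time (unix seconds)

num_write = math.ceil(period_s / step_s)  # 60 predictions are written per run
for i in (-num_write, -1):  # oldest and newest re-written predictions
    sample_time = end_time + (i + 1) * step_s         # when the sample was observed
    write_time = end_time + step_s * (i + num_write)  # timestamp used in analyze_data
    print(i, write_time - sample_time)                # prints 1770 both times, ~period_s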
diff --git a/nms_stack/nms_cli/tests/configs/k8s_config.yml b/nms_stack/nms_cli/tests/configs/k8s_config.yml
index e06d85dc1..603c133b9 100644
--- a/nms_stack/nms_cli/tests/configs/k8s_config.yml
+++ b/nms_stack/nms_cli/tests/configs/k8s_config.yml
@@ -255,6 +255,12 @@ msa_services:
     provider: climacell
     api_key: 1234
     roles: "tg_topology_read"
+  anomaly_detection:
+    uses_database: false
+    image: secure.cxl-terragraph.com:443/anomaly_detection:stable
+    command: "anomaly_detection"
+    enabled: true
+    roles: "tg_topology_read"
 
 E2E_CONFIG_FILE: cfg/controller_config.json
 E2E_TOPOLOGY_FILE: e2e_topology.conf
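To sanity-check the detection step in isolation, run_ad from anomaly_detection.jobs can be exercised on a synthetic series. This is a hedged sketch, not part of the patch: it assumes the package has been installed (pip install ., as in the Dockerfile above), and the series values below are made up for illustration.

import numpy as np

from anomaly_detection.jobs import run_ad

# Synthetic SNR-like series: one day at 30s steps (duration_s / step_s = 2880 samples),
# with a missing sample and one obvious outlier
rng = np.random.default_rng(0)
series = (12 + rng.normal(0, 0.5, size=2880)).tolist()
series[100] = np.nan   # gaps are imputed by KNNImputer inside run_ad
series[2000] = -40.0   # an obvious anomaly

predictions = run_ad(series)
for model, pred in predictions.items():
    # 1 = inlier, -1 = anomaly, matching what analyze_data writes to Prometheus
    print(model, "anomaly indices:", np.where(pred == -1)[0][:5])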