Simple AD service

Summary:
Simple anomaly detection framework.

```
{
  "num_consumers": 10,
  "pipelines": {
    "pipeline 1": {
      "period_s": 1800,
      "jobs": [
        {
          "name": "ad_single_metric_job",
          "enabled": true,
          "params": {
            "metric": "snr",
            "step_s": 30,
            "duration_s": 86400
          }
        }
      ]
    }
  }
}
```
- The service has a single job that runs two AD algorithms on single-metric link data.
- The 'metric' field can be any link stat available in the Prometheus DB.
- The job runs every period_s and loads link data for the past duration_s, sampled at step_s.
- The service generates AD samples (1 for no anomaly, -1 for anomaly), sampled at step_s, for the past period_s; the rest of the data, up to duration_s, is used only as historical context for anomaly detection. Since samples cannot be written into Prometheus in the past, they are written with a delay of period_s, so an anomaly at UTC time 0 shows up as -1 at UTC time period_s in the time series (see the timestamp sketch below).
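
A minimal sketch of that timestamp shift (the epoch value is hypothetical; the actual computation lives in analyze_data in jobs.py):

```
import math

# Hypothetical values matching the config above
step_s = 30        # sampling interval (seconds)
period_s = 1800    # pipeline period (seconds)
end_time = 1_648_684_800  # job end time in epoch seconds, aligned to step_s

num_write = math.ceil(period_s / step_s)  # AD samples written per run (60 here)
write_timestamps_ms = [
    int((end_time + step_s * (i + num_write)) * 1000) for i in range(-num_write, 0)
]
# The oldest prediction in the window is written at end_time and the newest at
# end_time + period_s - step_s, i.e. each sample lands roughly period_s after
# the data it describes.
print(write_timestamps_ms[0], write_timestamps_ms[-1])
```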

Reviewed By: pohanhf, aclave1

Differential Revision: D35087134

fbshipit-source-id: 6e0bcc44685084d7d904cad5f657ee7640381b73
halmehmood authored and facebook-github-bot committed Mar 31, 2022
1 parent a56b373 commit a6eb3a3
Showing 19 changed files with 486 additions and 1 deletion.
17 changes: 17 additions & 0 deletions anomaly_detection/Dockerfile
@@ -0,0 +1,17 @@
# Pull base image
ARG TAG=latest
FROM ghcr.io/terragraph/tglib:$TAG

ARG WORKDIR=/usr/local/anomaly_detection

# Copy anomaly_detection source
WORKDIR $WORKDIR
COPY anomaly_detection/ .flake8 ./

RUN echo "http://dl-cdn.alpinelinux.org/alpine/v3.13/main" >> /etc/apk/repositories && \
echo "http://dl-cdn.alpinelinux.org/alpine/v3.13/community" >> /etc/apk/repositories

# Install my_service
RUN apk add py3-numpy && \
apk add py3-scikit-learn && \
pip install .
Empty file.
232 changes: 232 additions & 0 deletions anomaly_detection/anomaly_detection/jobs.py
@@ -0,0 +1,232 @@
#!/usr/bin/env python3
# Copyright 2004-present Facebook. All Rights Reserved.
import asyncio
import logging
import math
import time
from collections import defaultdict
from copy import deepcopy
from typing import Dict, Optional, List, Iterable

import numpy as np
from sklearn.covariance import EllipticEnvelope
from sklearn.exceptions import NotFittedError
from sklearn.impute import KNNImputer
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import MinMaxScaler
from tglib.clients import APIServiceClient
from tglib.clients.prometheus_client import consts, PrometheusClient, PrometheusMetric
from tglib.exceptions import ClientRuntimeError


async def ad_single_metric_job(
    time_ms: int, metric: str, step_s: int, duration_s: int, period_s: int
) -> None:
    """This job loads the given link metric data from Prometheus for the past duration_s
    sampled at step_s and runs two AD algorithms on it. It generates AD samples (1 for no
    anomaly, -1 for anomaly) sampled at step_s for the past period_s (the rest of the data
    up to duration_s is historical data for anomaly detection).
    However, since samples cannot be written into Prometheus in the past, they are written
    with a delay of period_s. So an anomaly at time 0 will show up as -1 at time period_s
    in the time-series written into Prometheus."""

    end_time = math.ceil(time_ms / 1e3 / step_s) * step_s
    num_samples = math.ceil(duration_s / step_s)
    start_time = end_time - num_samples * step_s
    logging.info(
        f"Running anomaly detection job for {metric} from {start_time} to {end_time}"
    )

    # Divide time into max_prom_samples chunks because only max_prom_samples
    # can be read at one time
    max_prom_samples = 11000
    num_iterations = math.ceil(num_samples / max_prom_samples)

    time_steps = [
        [
            start_time + i * max_prom_samples * step_s + step_s,
            start_time + (i + 1) * max_prom_samples * step_s,
        ]
        for i in range(num_iterations)
    ]
    time_steps[-1][1] = end_time

    network_names = APIServiceClient.network_names()
    prom_client = PrometheusClient(timeout=15)
    network_reps = []
    coros = []

    for network_name in network_names:
        labels = {
            consts.network: network_name,
            consts.data_interval_s: str(step_s),
        }
        query = [prom_client.format_query(metric, labels)]
        for query_start_time, query_end_time in time_steps:
            coros.append(
                fetch_metrics_from_queries(
                    prom_client,
                    network_name,
                    query,
                    query_start_time,
                    query_end_time,
                    step_s,
                )
            )
            network_reps.append(network_name)

    # Load link metric data for all networks
    network_stats = zip(network_reps, await asyncio.gather(*coros))

    # Reshape data and run AD
    label_val_map = gather_data(network_stats, start_time, end_time, step_s)
    stats_to_write = analyze_data(label_val_map, metric, end_time, step_s, period_s)

    # Write AD samples back to Prometheus every step_s with a time delay
    num_write = math.ceil(period_s / step_s)
    for i in range(-num_write, 0):
        logging.info(f"Writing to Prometheus for index {i}")
        PrometheusClient.write_metrics(stats_to_write[i])
        await asyncio.sleep(step_s)

    return None


async def fetch_metrics_from_queries(
    client: PrometheusClient,
    network_name: str,
    queries: List[str],
    start_time: int,
    end_time: int,
    step: int,
) -> Optional[Dict]:
    """Fetch latest metrics for all links in the network"""
    coros = []
    for query in queries:
        coros.append(
            client.query_range(query, step=f"{step}s", start=start_time, end=end_time)
        )
    try:
        results: Dict = {}
        for query, response in zip(queries, await asyncio.gather(*coros)):
            if response["status"] != "success":
                logging.error(f"Failed to fetch {query} data for {network_name}")
                continue
            results[query] = response["data"]["result"]
        return results
    except ClientRuntimeError:
        logging.exception("Failed to fetch metrics from Prometheus.")
        return None


def gather_data(
    network_stats: Iterable, start_time: int, end_time: int, step: int
) -> Dict:
    """This function takes Prometheus data and reshapes it into a multi-level
    dictionary of network name to link name to link dir to list of values."""

    label_val_map: defaultdict = defaultdict(
        lambda: defaultdict(lambda: defaultdict(list))
    )
    for network, prom_results in network_stats:
        if prom_results is None:
            continue
        for query, values in prom_results.items():
            logging.info(f"Processing data for network {network} and metric {query}")
            if not values:
                logging.debug(f"Found no {query} results for {network}")
                continue
            for result in values:
                link_name = result["metric"][consts.link_name]
                link_dir = result["metric"][consts.link_direction]
                val_array = label_val_map[network][link_name][link_dir]
                if len(val_array) == 0:
                    # Create empty array of length equal to duration_s sampled at step_s
                    val_array = [np.nan] * int((end_time - start_time) / step)
                    label_val_map[network][link_name][link_dir] = val_array
                for timestamp, metric_value in result["values"]:
                    # Put values at the appropriate index of the array based on timestamp
                    val_array[int((int(timestamp) - start_time) / step - 1)] = int(
                        metric_value
                    )
    return label_val_map


def analyze_data(
    label_val_map: Dict, metric: str, end_time: int, step: int, period: int
) -> Dict[int, List]:
    """This function unpacks the label_val_map dictionary into labels of network name,
    link name and link direction. It creates new Prometheus samples that are 1 or -1
    based on running AD algorithm with these labels. The new AD stats are collected in
    the dict stats_to_write in order to be written one by one to Prometheus with a
    time delay."""

    num_write = math.ceil(period / step)
    stats_to_write: Dict[int, List] = {i: [] for i in range(-num_write, 0)}
    for network, depth1 in label_val_map.items():
        for link_name, depth2 in depth1.items():
            for link_dir, val_array in depth2.items():
                labels = {
                    consts.network: network,
                    consts.link_name: link_name,
                    consts.link_direction: link_dir,
                    "delay": f"{period}-sec",
                }

                outliers = run_ad(val_array)
                for model, pred in outliers.items():
                    labels_model = deepcopy(labels)
                    labels_model["model"] = model
                    for i in range(-num_write, 0):
                        stats_to_write[i].append(
                            PrometheusMetric(
                                f"ad_{metric}",
                                labels_model,
                                pred[i],
                                int((end_time + step * (i + num_write)) * 1000),
                            )
                        )

    return stats_to_write


def run_ad(train: List[float]) -> Dict[str, List]:
    """This function runs two AD algorithms on the input data."""
    contamination = 0.001
    anomaly_algorithms = [
        ("RobustCovariance", EllipticEnvelope(contamination=contamination)),
        (
            "LocalOutlierFactor",
            LocalOutlierFactor(contamination=contamination, n_neighbors=100),
        ),
    ]

    # Impute and scale data
    imputer = KNNImputer(missing_values=np.nan, n_neighbors=10, weights="uniform")
    scaler = MinMaxScaler()
    train = np.array(train).reshape(-1, 1)
    train_imp = imputer.fit_transform(train)
    train_scaled = scaler.fit_transform(train_imp)

    # Run anomaly detection algorithms
    y_pred = {}
    for model_name, algorithm in anomaly_algorithms:
        t0 = time.time()
        try:
            # LocalOutlierFactor (non-novelty mode) only supports fit_predict;
            # the other models are fit once and then predict on the same data
            if model_name == "LocalOutlierFactor":
                pred = algorithm.fit_predict(train_scaled)
            else:
                pred = algorithm.fit(train_scaled).predict(train_scaled)
            t1 = time.time()
            y_pred[model_name] = pred
            logging.debug(f"Ran {model_name} algorithm in {t1 - t0} seconds")
            logging.info(
                f"{model_name} has {sum(pred == -1)} outliers in {len(train)} samples"
            )
        except Exception:
            logging.exception(f"Could not run {model_name} algorithm")

    return y_pred
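
As a hypothetical, standalone illustration (not part of this diff), run_ad can be exercised on synthetic data; the series below is made up, and the flagged counts will vary with the seed and contamination setting:

```
import numpy as np

from anomaly_detection.jobs import run_ad

# Synthetic SNR-like series: ~one day at 30s steps, mostly around 20 with an
# injected dropout and some missing samples (handled by the KNN imputer).
rng = np.random.default_rng(0)
train = (20 + rng.normal(0, 0.5, size=2880)).tolist()
train[1000] = 2.0
train[2000:2010] = [np.nan] * 10

preds = run_ad(train)
for model, pred in preds.items():
    print(model, "flagged", int((pred == -1).sum()), "of", len(pred), "samples")
```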
101 changes: 101 additions & 0 deletions anomaly_detection/anomaly_detection/main.py
@@ -0,0 +1,101 @@
#!/usr/bin/env python3
# Copyright 2004-present Facebook. All Rights Reserved.

import asyncio
import dataclasses
import json
import logging
import sys
import time
from typing import Any, Dict, NoReturn

from tglib import init
from tglib.clients import APIServiceClient, PrometheusClient

from . import jobs


@dataclasses.dataclass
class Job:
    """Struct for representing pipeline job configurations."""

    name: str
    start_time_ms: int
    params: Dict


async def produce(
    queue: asyncio.Queue, name: str, pipeline: Dict[str, Any]
) -> NoReturn:
    """Add jobs from the pipeline configuration to the shared queue."""
    while True:
        start_time = time.time()
        tasks = []
        for job in pipeline.get("jobs", []):
            if job.get("enabled", False):
                params = job.get("params", {})
                params.update({"period_s": pipeline["period_s"]})
                tasks.append(
                    queue.put(
                        Job(
                            name=job["name"],
                            start_time_ms=int(round(start_time * 1e3)),
                            params=params,
                        )
                    )
                )

        # Add the jobs to the queue
        await asyncio.gather(*tasks)

        # Sleep until next invocation period
        sleep_time = start_time + pipeline["period_s"] - time.time()

        logging.info(
            f"Done enqueuing jobs in '{name}'. "
            f"Added {len(tasks)} job(s) to the queue. Sleeping for {sleep_time:0.2f}s"
        )

        await asyncio.sleep(sleep_time)


async def consume(queue: asyncio.Queue) -> NoReturn:
    """Consume and run a job from the shared queue."""
    while True:
        # Wait for a job from the producers
        job = await queue.get()
        logging.info(f"Starting the '{job.name}' job")

        # Execute the job
        function = getattr(jobs, job.name)
        await function(job.start_time_ms, **job.params)
        logging.info(f"Finished running the '{job.name}' job")


async def async_main(config: Dict[str, Any]) -> None:
    logging.info("#### Starting the 'anomaly_detection' service ####")
    logging.debug(f"service config: {config}")

    q: asyncio.Queue = asyncio.Queue()

    # Create producer coroutines
    producers = [
        produce(q, name, pipeline) for name, pipeline in config["pipelines"].items()
    ]

    # Create consumer coroutines
    consumers = [consume(q) for _ in range(config["num_consumers"])]

    # Start the producer and consumer coroutines
    await asyncio.gather(*producers, *consumers)


def main() -> None:
    try:
        with open("./service_config.json") as f:
            config = json.load(f)
    except (json.JSONDecodeError, OSError, KeyError):
        logging.exception("Failed to parse configuration file.")
        sys.exit(1)

    init(lambda: async_main(config), {APIServiceClient, PrometheusClient})
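
Since consume() dispatches with getattr(jobs, job.name) and calls await function(job.start_time_ms, **job.params), a new pipeline job only needs a coroutine in jobs.py whose keyword parameters match the config's params plus period_s. A hypothetical sketch (the job name and parameters below are made up):

```
from typing import List


async def ad_multi_metric_job(
    time_ms: int, metrics: List[str], step_s: int, duration_s: int, period_s: int
) -> None:
    """Placeholder body; would be referenced as "ad_multi_metric_job" in the
    "jobs" list of a pipeline in service_config.json."""
    ...
```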
6 changes: 6 additions & 0 deletions anomaly_detection/mypy.ini
@@ -0,0 +1,6 @@
[mypy]
check_untyped_defs = True
ignore_missing_imports = True
show_error_context = True
warn_unused_ignores = True
warn_return_any = True
8 changes: 8 additions & 0 deletions anomaly_detection/ptrconfig
@@ -0,0 +1,8 @@
[ptr]
venv_pkgs =
    black
    coverage
    flake8
    mypy
    pip
    setuptools
28 changes: 28 additions & 0 deletions anomaly_detection/setup.py
@@ -0,0 +1,28 @@
#!/usr/bin/env python3
# Copyright 2004-present Facebook. All Rights Reserved.

from setuptools import find_packages, setup

ptr_params = {
    "disabled": True,
    "entry_point_module": "anomaly_detection/main",
    "test_suite": "tests.base",
    "test_suite_timeout": 300,
    "required_coverage": {},
    "run_flake8": True,
    "run_black": True,
    "run_mypy": True,
}

setup(
    name="anomaly_detection",
    version="2022.03.31",
    packages=find_packages(exclude=["tests"]),
    python_requires=">=3.7",
    install_requires=["numpy>=1.16.4,<2.0"],
    extras_require={"ci": ["ptr"], "docs": ["aiohttp-swagger>=1.0.9,<2.0"]},
    test_suite=ptr_params["test_suite"],
    entry_points={
        "console_scripts": ["anomaly_detection = anomaly_detection.main:main"]
    },
)
Empty file.