Painless open source anomaly detection for your metrics!
Check out this Data Engineering Podcast episode where we discussed Anomstack and anomaly detection in general.
Note: If you are already using Airflow then also check out the `airflow-provider-anomaly-detection` package.
- What is Anomstack?
- Project structure
- Quickstart
- Adding your metrics
- Visualization
- Concepts
- Alerts
- LLM Alerts
- Contributing
Supported sources and databases for your metrics to live in and be queried from:

| Python | BigQuery | Snowflake | DuckDB | SQLite | Redshift |
|---|---|---|---|---|---|
| ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
Supported storage for your trained models:

| Local | GCS | S3 | Azure Blob |
|---|---|---|---|
| ✅ | ✅ | ✅ | 🚧 |
Supported ways to receive alerts:

| Slack | Email |
|---|---|
| ✅ | ✅ |
Supported ways to run this project:

| Python Env | Docker | Dagster Cloud | GitHub Codespaces |
|---|---|---|---|
| ✅ | ✅ | ✅ | ✅ |
Anomstack is a lightweight (README buzzword bingo alert!) data app built on top of Dagster (for orchestration) that lets you easily get great anomaly detection (using PyOD for the ML stuff) for your metrics (whatever data platform you use) with as little pain as physically possible.

It's similar in scope and goal to the Airflow Anomaly Detection provider I also made, but easier to get going since it does not require Airflow, so it's easier to set up and run yourself or via Dagster Cloud in a serverless manner.
- Define your metrics (part of a "metric batch") in a `.sql` file with corresponding config in a `.yaml` file. You can also define your own custom Python ingest function instead of just SQL; check out the `python_ingest_simple` example.
- Run Anomstack and it will automatically ingest, train, score, and alert ("jobs") on your metrics and detect anomalies (alerts via email/Slack etc.).
- Get alerts when metrics look anomalous.
It's still too hard and messy to get decent out of the box anomaly detection on your metrics with minimal fuss. You either have to build some custom solution yourself or buy some sexy modern data stack tool that does it for you. This project aims to make it as easy as possible to get anomaly detection on your metrics without having to buy anything or build anything from scratch yourself.
Here is a list of features of Anomstack:

- You bring your metrics; Anomstack will do the ML (using PyOD).
- Easy to run yourself or via Dagster Cloud.
- Very flexible config; you can see all params in `defaults.yaml` and override them in each metric batch config.
- Ability to define your own custom Python ingest function instead of just SQL; check out the `python_ingest_simple` example.
- Ability to define your own custom Python preprocess function instead of the default at `/metrics/defaults/python/preprocess.py`.
- Email alerting with fancy(ish) ASCII art plots of your metrics and anomaly scores.
- Slack alerts too (want to make these nicer).
- LLM based alerts (ChatGPT) - see LLM Alerts. p.s. they don't work great yet - experimental :)
- Ability to ingest at whatever frequency you want and then aggregate to a different level for training/scoring; see the `freq` example.
- Plot jobs so you can just eyeball your metrics in Dagster job logs, see #dagster-ui-plots.
- Minimal infrastructure requirements: Anomstack just reads from and writes to whatever database you use.
- A nice little local Streamlit dashboard to visualize your metrics and anomaly scores, see #streamlit.
- Dockerized for easy deployment.
- Scores & alerts saved to the database so you can query them and do whatever you want with them.
- Add custom metric tags for more complex alert routing, e.g. priority or subject area based.
- Change detection jobs out of the box.
- Ability to snooze alerts for a period of time to reduce repeated and duplicate alerts.
- Daily summary emails.
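Since scores and alerts land in the same metrics table as the raw metric values, you can query them with plain SQL. Here is a minimal, self-contained sketch using an in-memory SQLite database (one of the supported sources) with made-up rows in the long-format layout described below; in a real deployment you would connect to whatever database your metric batches write to:

```python
import sqlite3

# in-memory stand-in for the real metrics table
con = sqlite3.connect(":memory:")
con.execute(
    """
    CREATE TABLE metrics (
        metric_timestamp TEXT,
        metric_batch TEXT,
        metric_name TEXT,
        metric_type TEXT,
        metric_value REAL
    )
    """
)
# one raw value, one anomaly score, one alert for the same timestamp
con.executemany(
    "INSERT INTO metrics VALUES (?, ?, ?, ?, ?)",
    [
        ("2023-11-12", "gsod", "gsod_us_temp_avg", "metric", 44.4758),
        ("2023-11-12", "gsod", "gsod_us_temp_avg", "score", 1.0),
        ("2023-11-12", "gsod", "gsod_us_temp_avg", "alert", 1.0),
    ],
)
# pull back just the alert rows
alerts = con.execute(
    "SELECT metric_name, metric_value FROM metrics WHERE metric_type = 'alert'"
).fetchall()
print(alerts)  # [('gsod_us_temp_avg', 1.0)]
```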
```mermaid
flowchart LR;
    metric_batch_config[".yaml"]
    metric_batch_sql[".sql"]
    metric_batch_ingest_py["ingest.py"]
    metric_batch_preprocess_py["preprocess.py"]
    ingest[[ingest]]
    train[[train]]
    score[[score]]
    alert[[alert]]
    change[[change]]
    llmalert[[llmalert]]
    plot[[plot]]
    dashboardpy["dashboard.py"]
    subgraph metric_batch
    metric_batch_config
    metric_batch_sql
    metric_batch_ingest_py
    metric_batch_preprocess_py
    end
    subgraph dagster_jobs
    ingest
    train
    score
    alert
    change
    llmalert
    plot
    end
    subgraph alerts
    email
    slack
    end
    subgraph datasources
    duckdb
    bigquery
    snowflake
    python
    end
    subgraph user_inputs
    metric_batch
    end
    subgraph anomstack
    dagster_jobs
    datasources
    model_store
    alerts
    llmalert
    dashboard
    end
    subgraph model_store
    local
    gcs
    s3
    end
    subgraph dashboard
    dashboardpy
    end
    ingest --> train
    train --> score
    score --> alert
    score --> llmalert
    score --> plot
    ingest --> change
    change --> alert
    metric_batch --> dagster_jobs
    alert --> email
    alert --> slack
    llmalert --> email
    llmalert --> slack
    datasources <--> dagster_jobs
    train --> model_store
    model_store --> score
    datasources --> dashboard
```
Core to what Anomstack is doing is reading from and appending to a "metrics" table for each metric batch. This is a "long" format table where new metrics are appended as they come in, or as you define and configure new metric batches.
Here are the columns in the metrics table:

- `metric_timestamp`: Timestamp of the metric (defined in `ingest_sql` or `ingest_fn`).
- `metric_batch`: Name of the metric batch (defined from `metric_batch` in the YAML config for the batch).
- `metric_name`: Name of the metric (defined in `ingest_sql` or `ingest_fn`).
- `metric_type`: Type of the metric the row relates to: `metric` for the raw metric value, `score` for the anomaly score (a float from 0-1), and `alert` for an alert (a 1 when an alert was raised).
- `metric_value`: Value of the metric, coming from the ingest, score, or alert jobs (see concepts for more details).
```sql
SELECT
  metric_timestamp,
  metric_batch,
  metric_name,
  metric_type,
  metric_value
FROM
  `metrics.metrics`
WHERE
  metric_batch = 'gsod'
  AND metric_name = 'gsod_us_temp_avg'
ORDER BY metric_timestamp DESC
LIMIT 10
/*
+--------------------------+------------+----------------+-----------+------------+
|metric_timestamp          |metric_batch|metric_name     |metric_type|metric_value|
+--------------------------+------------+----------------+-----------+------------+
|2023-11-12 00:00:00.000000|gsod        |gsod_us_temp_avg|score      |1           |
|2023-11-12 00:00:00.000000|gsod        |gsod_us_temp_avg|score      |1           |
|2023-11-12 00:00:00.000000|gsod        |gsod_us_temp_avg|alert      |1           |
|2023-11-12 00:00:00.000000|gsod        |gsod_us_temp_avg|metric     |44.4758     |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|score      |1           |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|score      |1           |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|score      |1           |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|metric     |46.3212     |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|score      |1           |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|metric     |46.3212     |
+--------------------------+------------+----------------+-----------+------------+
*/
```
Of course you can easily pivot this table into a slightly more "wide" format if you prefer, which can be easier to work with in your analytics tools etc.
```sql
SELECT
  metric_timestamp,
  metric_batch,
  metric_name,
  AVG(IF(metric_type = 'metric', metric_value, NULL)) AS metric_value,
  AVG(IF(metric_type = 'score', metric_value, NULL)) AS metric_score,
  MAX(IF(metric_type = 'alert', metric_value, 0)) AS metric_alert
FROM
  `metrics.metrics`
WHERE
  metric_batch = 'gsod'
  AND metric_name = 'gsod_us_temp_avg'
GROUP BY 1, 2, 3
ORDER BY metric_timestamp DESC
LIMIT 10
/*
+--------------------------+------------+----------------+------------------+------------+------------+
|metric_timestamp          |metric_batch|metric_name     |metric_value      |metric_score|metric_alert|
+--------------------------+------------+----------------+------------------+------------+------------+
|2023-11-12 00:00:00.000000|gsod        |gsod_us_temp_avg|44.4758           |1           |1           |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|46.3212           |1           |1           |
|2023-11-10 00:00:00.000000|gsod        |gsod_us_temp_avg|47.51435          |1           |0           |
|2023-11-08 00:00:00.000000|gsod        |gsod_us_temp_avg|51.7557           |1           |0           |
|2023-11-07 00:00:00.000000|gsod        |gsod_us_temp_avg|54.1946           |1           |0           |
|2023-11-06 00:00:00.000000|gsod        |gsod_us_temp_avg|53.8131           |1           |0           |
|2023-11-05 00:00:00.000000|gsod        |gsod_us_temp_avg|52.0883           |1           |0           |
|2023-11-04 00:00:00.000000|gsod        |gsod_us_temp_avg|47.8              |1           |0           |
|2023-11-03 00:00:00.000000|gsod        |gsod_us_temp_avg|48.752422407267225|1           |0           |
|2023-11-02 00:00:00.000000|gsod        |gsod_us_temp_avg|38.999010833725855|1           |0           |
+--------------------------+------------+----------------+------------------+------------+------------+
*/
```
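If you pull the long-format rows into pandas (e.g. in a notebook or your own dashboard), a similar pivot can be sketched with `pivot_table`; the sample rows below are made up to mirror the table above:

```python
import pandas as pd

# long-format rows, one per (timestamp, metric, metric_type)
df = pd.DataFrame(
    {
        "metric_timestamp": ["2023-11-12"] * 3,
        "metric_batch": ["gsod"] * 3,
        "metric_name": ["gsod_us_temp_avg"] * 3,
        "metric_type": ["metric", "score", "alert"],
        "metric_value": [44.4758, 1.0, 1.0],
    }
)

# pivot to one row per timestamp with a column per metric_type
wide = df.pivot_table(
    index=["metric_timestamp", "metric_batch", "metric_name"],
    columns="metric_type",
    values="metric_value",
    aggfunc="mean",
).reset_index()
```

This mirrors the `AVG(IF(...))` trick in the SQL above; `aggfunc="mean"` averages any duplicate rows per timestamp the same way.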
Here are some specific examples; there are lots more in the `./metrics/examples/` folder.

In `./metrics/examples/hackernews/` you will find an example of using a custom Python function (`hn_top_stories_scores.py`) to pull the current top 10 stories from the Hacker News API and derive some metrics based on their scores. This is all defined in the `hn_top_stories_scores.yaml` configuration file for this metric batch.
In `./metrics/examples/gsod/` you will find an example of just defining some SQL to derive a metric batch on data already in BigQuery (`gsod.sql`) and ingest it into a table called `metrics` in a `metrics` dataset in a Google BigQuery project. This is all defined in the `gsod.yaml` configuration file for this metric batch.
In `./metrics/examples/weather/` you will find an example of using a custom Python function (`ingest_weather.py`) to pull current temperature data for some cities from the Open Meteo API and ingest it into a table called `metrics` in a `metrics` dataset in a Google BigQuery project. This is all defined in the `weather.yaml` configuration file for this metric batch.
In `./metrics/examples/yfinance/` you will find an example of using a custom Python function (`yfinance.py`) to pull current stock price data for some stocks and ingest it into a table called `metrics` in a `metrics` dataset in a Google BigQuery project. This is all defined in the `yfinance.yaml` configuration file for this metric batch.
- `./anomstack`: source code for Anomstack.
- `./metrics`: metrics `.sql` and `.yaml` configuration files. This is where you define your metrics (check out the `examples` folder). Default params etc. live in the `defaults` folder in `defaults.yaml`.
Below are some quick start instructions for getting up and running with Anomstack and a local DB using DuckDB and some example metrics. For proper use you would need to set up all your metrics and environment variables etc., but this should get you started.

By default Anomstack will run on port 3000, so you can go to http://localhost:3000 to see the Dagster UI. You can then enable the jobs you want to run and watch them run in the UI.

Note: you will need to wait for it to run a dozen or so ingest jobs before there is enough data for the train, score, and alert jobs to run successfully.
There are some more detailed instructions (WIP) in `/docs/deployment/`.
You can run Anomstack using Docker in a GitHub Codespace. This is a great way to get started and familiar with Anomstack without having to install or run anything locally.

You can see the `.devcontainer` folder for the config used to run Anomstack in a Codespace, and the post create script `post_create_command.sh` for the commands the devcontainer runs to get Anomstack up and running.
You can run this project in Dagster Cloud. Fork the repo (or make a completely new repo using the `andrewm4894/anomstack` GitHub template) and then follow the instructions here to deploy to Dagster Cloud from your forked repo.

You can then manage your metrics via PRs in your GitHub repo (here is a PR to add Google Trends metrics) and run them in Dagster Cloud, which will just sync with your repo.
To get started with Anomstack, you can run it via docker compose.

```bash
# clone repo
git clone https://github.com/andrewm4894/anomstack.git
# or clone repo at a specific release tag
# git clone -b v0.0.1 https://github.com/andrewm4894/anomstack.git

# cd into project
cd anomstack

# generate your .env file based on the example
cp .example.env .env

# run docker compose up to start anomstack
docker compose up -d

# anomstack should now be running on port 3000
```

To update and rebuild after adding metrics or changing code, you can run:

```bash
# rebuild docker compose
docker compose build

# run docker compose up to re-start anomstack
docker compose up -d
```
You can also run Anomstack locally via a Python virtual env.

```bash
# clone repo
git clone https://github.com/andrewm4894/anomstack.git
# or clone repo at a specific release tag
# git clone -b v0.0.1 https://github.com/andrewm4894/anomstack.git

# cd into project
cd anomstack

# make virtual env
python3 -m venv .venv

# activate virtual env
source .venv/bin/activate

# install deps
pip3 install -r requirements.txt

# cp example env file
cp .example.env .env

# run locally
dagster dev -f anomstack/main.py

# anomstack should now be running on port 3000
```
To add metrics, add them to the `metrics` folder. You can see some examples in the `metrics/examples` folder.

For example, here is the PR that added Google Trends metrics to the examples.

You can customize the default params for your metrics in the `metrics/defaults` folder.

Environment variables for your metrics can be set in the `.env` file (see `.example.env` for examples and comments) or in the `docker-compose.yml` file.
Visualization of the metrics and anomaly scores is a bit outside the scope of this project, but we do provide a couple of ways to visualize them.

Within Dagster there is the `plot.py` job to generate some plots of your metrics and anomaly scores for quick eyeballing within the Dagster UI.

You can also use the little Streamlit app in `./dashboard.py` to visualize your metrics and anomaly scores.

```bash
# run streamlit app
streamlit run ./dashboard.py
```

...Or you can run it via `make dashboard`.
- "Metric Batch": You configure metric batches in Anomstack. A metric batch is a collection of metrics that you want to run together, with its own separate set of parameters. A metric batch can of course contain just one metric if you want, but typically it makes more sense to group metrics in ways that make sense for you. A metric batch is just some SQL or custom Python that results in a Pandas DataFrame with `metric_timestamp`, `metric_name` and `metric_value` columns.
- "Jobs": At its core Anomstack runs a few jobs (Dagster Jobs) for each metric batch. These jobs are:
  - "Ingest" (`ingest.py`): Runs the SQL query (or Python function) for the metric batch and ingests the data into the database.
  - "Train" (`train.py`): Trains a model for each metric.
  - "Score" (`score.py`): Scores metrics using the latest trained model for each metric.
  - "Alert" (`alert.py`): Alerts you when the metric looks anomalous.
  - "LLM Alert" (`llmalert.py`): Alerts you when the metric looks anomalous as decided by an LLM (ChatGPT).
  - "Plot" (`plot.py`): Plots metric values and scores for a batch at regular intervals so you can see some charts from within the Dagster UI.
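The custom Python option for a metric batch can be sketched as a function returning that long-format DataFrame shape. Everything here (the function name, the metric names and values) is hypothetical and illustrative, not Anomstack's exact API; only the three required columns come from the concepts above:

```python
import pandas as pd


def ingest() -> pd.DataFrame:
    """Hypothetical custom ingest function for a metric batch.

    Returns a long-format DataFrame with the three columns a metric
    batch is expected to produce: metric_timestamp, metric_name and
    metric_value.
    """
    now = pd.Timestamp.now(tz="UTC").floor("min")
    return pd.DataFrame(
        {
            "metric_timestamp": [now, now],
            "metric_name": ["orders_count", "revenue_total"],
            "metric_value": [123.0, 4567.89],
        }
    )


df = ingest()
print(df)
```

Each row is one observation of one metric; Anomstack's ingest job appends rows like these to the metrics table for the batch.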
Check out more example alerts in the anomaly gallery.

Anomstack supports alerts via email and Slack. You can configure these in the `.env` file (see `.example.env` for examples and comments).
Below is an example of an alert via email. It has some ASCII art plotting recent metric values and showing which observations were anomalous. Attached is a PNG plot with more details.
And the attached plot will look something like this:
Yes! I have managed to find a way to ram a large language model (LLM) into this project. But you know what, it might just work...
Update: It works horribly, but it works! π€£. Still need to do a lot more prompt engineering to get this to work well, but it's a start.
The idea here is to just send the metric data and a prompt to an LLM (ChatGPT) and ask it if it thinks the metric looks anomalous. If it does, we alert.
Note: If you don't want to send your metric data to OpenAI then just set `disable_llmalert` to `True` in your metric batch config.
Below is an example of an LLM alert via email. In this case we include a description of the LLM's reasoning about why it thinks the metric looks anomalous.
Read the contributing guide to learn about our development process, how to propose bugfixes and improvements, and how to build and test your changes to Anomstack.