Skip to content

Commit

Permalink
Merge pull request #16 from vulogov/0.7.4
Browse files Browse the repository at this point in the history
0.7.4
  • Loading branch information
vulogov authored Jun 10, 2024
2 parents bc70a68 + 6794d02 commit c7f07d4
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 4 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "zbusdg"
version = "0.7.3"
version = "0.7.4"
edition = "2021"
description = "Universal Data Gateway for ZBUS project"
license-file = "LICENSE"
Expand Down Expand Up @@ -46,6 +46,7 @@ markov-chain = "0.1.1"
decorum = "0.3.1"
anomaly_detection = "0.3.0"
breakout = "0.2.1"
prometheus-parse = "0.2.5"

[dependencies.rhai]
version = "1.18.*"
Expand Down
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ zbusdg --zabbix-api http://127.0.0.1/zabbix gateway --zabbix --nats --zabbix-to
zbusdg --zabbix-api http://127.0.0.1/zabbix gateway --nats-catcher --zabbix-token zabbixtoken --stdout --pretty
```

### Catching processor ZBUS

When selected with CLI keyword --zbus-catcher, ZBUSUDG starts catching thread from ZBUS telemetry bus by subscribing to the topic specified by --zbus-subscribe-key. In this example we are catching metrics from telemetry bus and sending them to standard output.

```bash
zbusdg --zabbix-api http://127.0.0.1/zabbix gateway --zbus-catcher --stdout --pretty
```

### Catching processor PROMETHEUS_EXPORTER

When selected with CLI keyword --prometheus-exporter-catcher, ZBUSUDG starts collecting thread that will scrapte metrics from Prometheus exporters and convert them to ZBUS telemetry format. In this example we are scrapting Prometheus telemetry and sending them to standard output.

```bash
zbusdg --zabbix-api http://127.0.0.1/zabbix gateway --prometheus-exporter-catcher --stdout --pretty
```

## Output processor

The function of the UDG's output processor is to read prepared telemetry from the "OUT" internal pipeline and send it to the proper destination.
Expand Down Expand Up @@ -197,4 +213,4 @@ zbusudg monitor

## Real-time metrics computation

If you want to enable real-time metrics computation, you can use the --analysis CLI argument to activate the "Analysis" mode for the Universal Data Gateway (ZBUSUDG). This mode allows ZBUSUDG to perform real-time statistical computations and forecasts while collecting telemetry data. ZBUSUDG will gather the most recent 128 float-point type telemetry samples to conserve memory when enabled. It will then enhance all collected metrics with additional data attributes such as mean, max, min, variance, standard deviation, statistical oscillation, statistical time series forecast, anomalies detection using statistical analysis, breakouts in a sample and forecasting using Markov chains of the sample.
If you want to enable real-time metrics computation, you can use the --analysis CLI argument to activate the "Analysis" mode for the Universal Data Gateway (ZBUSUDG). This mode allows ZBUSUDG to perform real-time statistical computations and forecasts while collecting telemetry data. ZBUSUDG will gather the most recent 128 float-point type telemetry samples. Then it will then enhance relevant metric with additional data attributes such as mean, max, min, variance, standard deviation, statistical oscillation, statistical time series forecast, anomalies detection using statistical analysis, breakouts in a sample and forecasting using Markov chains of the sample.
23 changes: 22 additions & 1 deletion src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod zbus_gateway_processor_passthrough;
pub mod zbus_gateway_processor_filter;
pub mod zbus_gateway_processor_transformation;
pub mod zbus_gateway_processor_analysis;
pub mod zbus_gateway_processor_prometheus;
pub mod zbus_gateway_stdout_sender;
pub mod zbus_gateway_zbus_sender;
pub mod zbus_gateway_nats_sender;
Expand All @@ -28,6 +29,8 @@ pub mod zbus_gateway_clickhouse_sender;
pub mod zbus_gateway_tcpsocket_sender;
pub mod zbus_gateway_catcher_zabbix;
pub mod zbus_gateway_catcher_nats;
pub mod zbus_gateway_catcher_zbus;
pub mod zbus_gateway_catcher_prometheus_scraper;
pub mod zbus_version;
pub mod zbus_login;
pub mod zbus_json;
Expand Down Expand Up @@ -178,6 +181,9 @@ pub struct Gateway {
#[clap(help="ZBUS address", long, default_value_t = String::from(env::var("ZBUS_ADDRESS").unwrap_or("tcp/127.0.0.1:7447".to_string())))]
pub zbus_connect: String,

#[clap(help="ZBUS address for the catcher", long, default_value_t = String::from(env::var("ZBUS_CATCH_ADDRESS").unwrap_or("tcp/127.0.0.1:7447".to_string())))]
pub zbus_catcher_connect: String,

#[clap(help="NATS address", long, default_value_t = String::from(env::var("NATS_ADDRESS").unwrap_or("127.0.0.1:4222".to_string())))]
pub nats_connect: String,

Expand All @@ -190,6 +196,9 @@ pub struct Gateway {
#[clap(help="ZBUS aggregate key", long, default_value_t = String::from("aggregation"))]
pub zbus_aggregate_key: String,

#[clap(help="ZBUS key from which catcher will receive telemetry", long, default_value_t = String::from("aggregation"))]
pub zbus_subscribe_key: String,

#[clap(help="NATS aggregate key", long, default_value_t = String::from("aggregation"))]
pub nats_aggregate_key: String,

Expand All @@ -211,6 +220,9 @@ pub struct Gateway {
#[clap(help="CLICKHOUSE database", long, default_value_t = String::from("zbus"))]
pub clickhouse_database: String,

#[clap(help="Prometheus exporter endpoints", long)]
pub prometheus_exporter_connect: Vec<String>,

#[clap(long, action = clap::ArgAction::SetTrue, help="Disable multicast discovery of ZENOH bus")]
pub zbus_disable_multicast_scout: bool,

Expand All @@ -232,6 +244,9 @@ pub struct Gateway {
#[clap(long, default_value_t = 7, help="Width of anomalies window")]
pub anomalies_window: usize,

#[clap(long, default_value_t = 120, help="Delay (in seconds) between prometheus scraper run")]
pub prometheus_scraper_run_every: u16,

#[clap(flatten)]
catchers: CatcherArgGroup,

Expand Down Expand Up @@ -263,7 +278,7 @@ pub struct GatewayArgGroup {
#[clap(long, action = clap::ArgAction::SetTrue, help="Send catched data to TELEGRAF")]
pub telegraf: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Send catched data to TELEGRAF")]
#[clap(long, action = clap::ArgAction::SetTrue, help="Send catched data to CLICKHOUSE")]
pub clickhouse: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Send catched data to NONE")]
Expand All @@ -279,6 +294,12 @@ pub struct CatcherArgGroup {
#[clap(long, action = clap::ArgAction::SetTrue, help="Catch telemetry from NATS")]
pub nats_catcher: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Catch telemetry from ZBUS")]
pub zbus_catcher: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Receive telemetry from Prometheus scraper")]
pub prometheus_exporter_catcher: bool,

}

#[derive(Subcommand, Clone, Debug)]
Expand Down
8 changes: 8 additions & 0 deletions src/cmd/zbus_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ pub fn run(c: &cmd::Cli, gateway: &cmd::Gateway) {
cmd::zbus_gateway_processor::processor(c, gateway);
} else if gateway.catchers.nats_catcher {
cmd::zbus_gateway_processor_passthrough::processor(c, gateway);
} else if gateway.catchers.zbus_catcher {
cmd::zbus_gateway_processor_passthrough::processor(c, gateway);
} else if gateway.catchers.prometheus_exporter_catcher {
cmd::zbus_gateway_processor_prometheus::processor(c, gateway);
} else {
log::error!("Catcher is not specified");
return;
Expand Down Expand Up @@ -62,6 +66,10 @@ pub fn run(c: &cmd::Cli, gateway: &cmd::Gateway) {
cmd::zbus_gateway_catcher_zabbix::catcher(c, gateway);
} else if gateway.catchers.nats_catcher {
cmd::zbus_gateway_catcher_nats::catcher(c, gateway);
} else if gateway.catchers.zbus_catcher {
cmd::zbus_gateway_catcher_zbus::catcher(c, gateway);
} else if gateway.catchers.prometheus_exporter_catcher {
cmd::zbus_gateway_catcher_prometheus_scraper::catcher(c, gateway);
} else {
log::error!("Catcher is not specified");
return;
Expand Down
46 changes: 46 additions & 0 deletions src/cmd/zbus_gateway_catcher_prometheus_scraper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
extern crate log;
use crate::cmd;
use crate::stdlib;
use reqwest;

pub fn catcher(_c: &cmd::Cli, gateway: &cmd::Gateway) {
log::trace!("zbus_gateway_catcher_prometheus_scraper::run() reached");
let gateway = gateway.clone();
match stdlib::threads::THREADS.lock() {
Ok(t) => {
t.execute(move ||
{
log::debug!("Prometheus scraper catcher thread has been started");
loop {
log::debug!("Running Prometheus scrape");
for e in &gateway.prometheus_exporter_connect {
log::debug!("Attempting to scrape: {}", &e);
match reqwest::blocking::get(e.clone()) {
Ok(body) => {
let data: String = match body.text() {
Ok(data) => String::from(data),
Err(err) => {
log::error!("Exporter returned an empty response: {}", err);
continue;
}
};
log::debug!("Scraped len()={} bytes from {}", &data.len(), &e);
stdlib::channel::pipe_push("in".to_string(), data);
}
Err(err) => {
log::error!("Prometheus scraping error: {}", err);
}
}

}
stdlib::sleep::sleep(gateway.prometheus_scraper_run_every.into());
}
});
drop(t);
}
Err(err) => {
log::error!("Error accessing Thread Manager: {:?}", err);
return;
}
}
}
132 changes: 132 additions & 0 deletions src/cmd/zbus_gateway_catcher_zbus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
extern crate log;
use crate::cmd;
use crate::stdlib;
use std::str::FromStr;
use zenoh::prelude::sync::*;
use zenoh::config::{Config, ConnectConfig, ListenConfig, EndPoint, WhatAmI};

pub fn catcher(c: &cmd::Cli, gateway: &cmd::Gateway) {
log::trace!("zbus_gateway_catcher_zbus::run() reached");
let subscribe_key = format!("zbus/metric/{}/{}/{}", &c.protocol_version, &c.platform_name, &gateway.zbus_subscribe_key);
let gateway = gateway.clone();

match stdlib::threads::THREADS.lock() {
Ok(t) => {
t.execute(move ||
{
log::debug!("ZBUS catcher thread has been started");
let mut config = Config::default();

if gateway.zbus_disable_multicast_scout.clone() {
match config.scouting.multicast.set_enabled(Some(false)) {
Ok(_) => { log::debug!("Multicast discovery disabled")}
Err(err) => {
log::error!("Failure in disabling multicast discovery: {:?}", err);
return;
}
}
} else {
log::debug!("Multicast discovery enabled");
}
match EndPoint::from_str(&gateway.zbus_catcher_connect) {
Ok(zconn) => {
log::debug!("ZENOH bus set to: {:?}", &zconn);
let _ = config.set_connect(ConnectConfig::new(vec![zconn]).unwrap());
}
Err(err) => {
log::error!("Failure in parsing connect address: {:?}", err);
return;
}
}
match EndPoint::from_str(&gateway.zbus_listen) {
Ok(zlisten) => {
log::debug!("ZENOH listen set to: {:?}", &zlisten);
let _ = config.set_listen(ListenConfig::new(vec![zlisten]).unwrap());
}
Err(_) => {
log::debug!("ZENOH listen set to default");
}
}
if gateway.zbus_set_connect_mode {
log::debug!("ZENOH configured in CONNECT mode");
let _ = config.set_mode(Some(WhatAmI::Client));
} else {
log::debug!("ZENOH configured in PEER mode");
let _ = config.set_mode(Some(WhatAmI::Peer));
}
if config.validate() {
log::debug!("ZENOH config is OK");
} else {
log::error!("ZENOH config not OK");
return;
}

'outside: loop {
match zenoh::open(config.clone()).res() {
Ok(session) => {
log::debug!("Connection to ZENOH bus succesful");
log::debug!("ZBUS catcher subscribing to: {}", &subscribe_key);
match session.declare_subscriber(&subscribe_key)
.callback_mut(move |sample| {
let slices = &sample.value.payload.contiguous();
match std::str::from_utf8(slices) {
Ok(data) => {
match serde_json::from_str::<serde_json::Value>(&data) {
Ok(zjson) => {
log::debug!("ZBUS catcher received {} bytes", &data.len());
stdlib::channel::pipe_push("in".to_string(), zjson.to_string());
}
Err(err) => {
log::error!("Error while converting JSON data from ZENOH bus: {:?}", err);
}
}
}
Err(err) => {
log::error!("Error while extracting data from ZENOH bus: {:?}", err);
}
}
})
.res() {
Ok(_) => {
let receiver = match zenoh::scout(WhatAmI::Peer, config.clone())
.res() {
Ok(receiver) => receiver,
Err(err) => {
log::error!("ZBUS scout had failed: {}", err);
stdlib::sleep::sleep(5);
continue 'outside;
}
};
log::debug!("Running ZBUS scout to detect the health of connection");
while let Ok(hello) = receiver.recv() {
log::trace!("ZBUS catcher received: {}", hello);
std::thread::yield_now();
}
}
Err(err) => {
log::error!("Telemetry subscribe for key {} failed: {:?}", &subscribe_key, err);
stdlib::sleep::sleep(5);
continue 'outside;
}
}
let _ = session.close();
log::debug!("Connection to ZENOH bus is closed");
stdlib::sleep::sleep(5);
continue 'outside;
}
Err(err) => {
log::error!("Error connecting to ZENOH bus: {:?}", err);
stdlib::sleep::sleep(5);
continue 'outside;
}
}
}
});
drop(t);
}
Err(err) => {
log::error!("Error accessing Thread Manager: {:?}", err);
return;
}
}
}
2 changes: 1 addition & 1 deletion src/cmd/zbus_gateway_processor_analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub fn processor(c: &cmd::Cli, gateway: &cmd::Gateway) {
"body": {
"details": {
"details": {
"analythical_data": {
"analytical_data": {
"n_of_samples": sample.len(),
"statistical_oscillator": sample.oscillator(),
"tsf_next": sample.tsf_next(),
Expand Down
Loading

0 comments on commit c7f07d4

Please sign in to comment.