Skip to content

Commit

Permalink
Merge pull request #1 from vulogov/0.2.0
Browse files Browse the repository at this point in the history
0.2.0
  • Loading branch information
vulogov authored May 28, 2024
2 parents 3ad07c7 + 1120d07 commit bc2e7ac
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 2 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "zbusdg"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
description = "Universal Data Gateway for ZBUS project"
license-file = "LICENSE"
Expand Down Expand Up @@ -29,3 +29,4 @@ unit_conversion = "0.2.0"
timedmap = "1.0.2"
etime = "0.1.8"
zenoh = "0.10.1-rc"
nats = "0.25.0"
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,22 @@ Delivery without aggregation,to an individual item keys
zbusdg --zabbix-api http://192.168.86.29/zabbix gateway --zbus --zabbix-token zabbixapitoken
```

### Output processor NATS

Collected telemetry is shipped to the NATS.io server, stored for storage, and could be accessed by any NATS.io client. Delivery could be performed in aggregated or per Zabbix key mode. If aggregated delivery is specified, all telemetry will be delivered to a single key on the bus; otherwise, the gateway will extract a destination key from the telemetry message.

Delivery with telemetry aggregation

```
zbusdg --zabbix-api http://192.168.86.29/zabbix gateway --nats --zabbix-token zabbixapitoken --nats-aggregate --nats-aggregate-key mykey
```

Delivery without aggregation,to an individual item keys

```
zbusdg --zabbix-api http://192.168.86.29/zabbix gateway --nats --zabbix-token zabbixapitoken
```

## Monitor ZBUS submission

In order to verify and debug your gateway, you can run zbusudg in the "monitor mode", where you subscribing to the key on ZBUS and dump on STDOUT all data packets received on that key.
Expand Down
15 changes: 14 additions & 1 deletion src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod zbus_monitor;
pub mod zbus_gateway_processor;
pub mod zbus_gateway_stdout_sender;
pub mod zbus_gateway_zbus_sender;
pub mod zbus_gateway_nats_sender;
pub mod zbus_gateway_tcpsocket_sender;
pub mod zbus_version;
pub mod zbus_login;
Expand Down Expand Up @@ -152,18 +153,27 @@ pub struct Gateway {
#[clap(help="ZBUS address", long, default_value_t = String::from(env::var("ZBUS_ADDRESS").unwrap_or("tcp/127.0.0.1:7447".to_string())))]
pub zbus_connect: String,

#[clap(help="NATS address", long, default_value_t = String::from(env::var("NATS_ADDRESS").unwrap_or("127.0.0.1:4222".to_string())))]
pub nats_connect: String,

#[clap(help="ZBUS listen address", long, default_value_t = String::from_utf8(vec![]).unwrap())]
pub zbus_listen: String,

#[clap(help="ZBUS aggregate key", long, default_value_t = String::from("aggregation"))]
pub zbus_aggregate_key: String,

#[clap(help="NATS aggregate key", long, default_value_t = String::from("aggregation"))]
pub nats_aggregate_key: String,

#[clap(long, action = clap::ArgAction::SetTrue, help="Disable multicast discovery of ZENOH bus")]
pub zbus_disable_multicast_scout: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Aggregate all keys to a single topic")]
#[clap(long, action = clap::ArgAction::SetTrue, help="Aggregate all keys to a single ZBUS topic")]
pub zbus_aggregate: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Aggregate all keys to a single NATS subject")]
pub nats_aggregate: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Configure CONNECT mode for ZENOH bus")]
pub zbus_set_connect_mode: bool,

Expand All @@ -186,6 +196,9 @@ pub struct GatewayArgGroup {
#[clap(long, action = clap::ArgAction::SetTrue, help="Send catched data to ZBUS")]
pub zbus: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Send catched data to NATS")]
pub nats: bool,

#[clap(long, action = clap::ArgAction::SetTrue, help="Send catched data to NONE")]
pub none: bool,
}
Expand Down
2 changes: 2 additions & 0 deletions src/cmd/zbus_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub fn run(c: &cmd::Cli, gateway: &cmd::Gateway) {
cmd::zbus_gateway_tcpsocket_sender::sender(c, gateway);
} else if gateway.group.zbus {
cmd::zbus_gateway_zbus_sender::sender(c, gateway);
} else if gateway.group.nats {
cmd::zbus_gateway_nats_sender::sender(c, gateway);
} else if gateway.group.none {
log::info!("Sender is set to NONE");
} else {
Expand Down
87 changes: 87 additions & 0 deletions src/cmd/zbus_gateway_nats_sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
extern crate log;
use crate::cmd;
use crate::stdlib;
use nats;
use serde_json::{Deserializer, Value};


pub fn sender(c: &cmd::Cli, gateway: &cmd::Gateway) {
log::trace!("zbus_gateway_nats_sender::run() reached");
let gateway = gateway.clone();
let c = c.clone();
match stdlib::threads::THREADS.lock() {
Ok(t) => {
t.execute(move ||
{
log::debug!("NATS sender thread has been started");

let aggregate_key = format!("zbus/metric/{}/{}/{}", &c.protocol_version, &c.platform_name, &gateway.nats_aggregate_key);
if gateway.nats_aggregate {
log::debug!("Published telemetry will be aggregated to: {}", &aggregate_key);
} else {
log::debug!("Published telemetry will not be aggregated");
}
match nats::connect(gateway.nats_connect.clone()) {
Ok(session) => {
loop {
if ! stdlib::channel::pipe_is_empty_raw("out".to_string()) {
match stdlib::channel::pipe_pull("out".to_string()) {
Ok(res) => {
log::debug!("Received {} bytes by ZBUS processor", &res.len());
let stream = Deserializer::from_str(&res).into_iter::<Value>();
for value in stream {
match value {
Ok(zjson) => {
match serde_json::to_string(&zjson) {
Ok(payload) => {
if gateway.nats_aggregate {
match session.publish(&aggregate_key.clone(), payload.clone()) {
Ok(_) => log::debug!("ZBX catcher->NATS: {} len()={} bytes", &aggregate_key, &payload.len()),
Err(err) => log::error!("Error ingesting {} {:?}: {:?}", &aggregate_key, &payload, err),
}
} else {
let itemkey = match cmd::zbus_gateway_processor::zabbix_json_get_sub_subkey_raw(&zjson, "body".to_string(), "details".to_string(), "destination".to_string()) {
Some(key) => format!("zbus/metric/{}/{}{}", &c.protocol_version, &c.platform_name, key.as_str().unwrap()),
None => continue,
};
match session.publish(&itemkey.clone(), payload.clone()) {
Ok(_) => log::debug!("ZBX catcher->NATS: {} len()={} bytes", &itemkey, &payload.len()),
Err(err) => log::error!("Error ingesting {} {:?}: {:?}", &itemkey, &payload, err),
}
}
}
Err(err) => {
log::error!("Error convert JSON to string: {}", err);
}
}
// println!("{}", &zjson);
}
Err(err) => {
log::error!("Error converting JSON: {:?}", err);
}
}
log::debug!("End of JSON");
}
log::debug!("End of JSON series");
}
Err(err) => log::error!("Error getting data from channel: {:?}", err),
}
} else {
stdlib::sleep::sleep(1);
}
}
}
Err(err) => {
log::error!("Error connecting to the bus: {:?}", err);
return;
}
}
});
drop(t);
}
Err(err) => {
log::error!("Error accessing Thread Manager: {:?}", err);
return;
}
}
}

0 comments on commit bc2e7ac

Please sign in to comment.