Skip to content

Commit

Permalink
add some internal metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
timglabisch committed Dec 19, 2023
1 parent 0cfca2f commit d2f2e2b
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions openmetrics_udpserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ fnv = "1.*"
# servedensity specific deps
md5 = "0.7.*"
reqwest = { version = "0.11.*", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots"] }
once_cell = "1.18.0"
3 changes: 3 additions & 0 deletions openmetrics_udpserver/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use crate::METRIC_COUNTER_REQUESTS;

#[derive(Clone)]
struct HttpServerState {
Expand All @@ -20,6 +21,8 @@ struct HttpServerState {
async fn get_metrics(
State(state): State<Arc<HttpServerState>>,
) -> Result<Response<String>, StatusCode> {
METRIC_COUNTER_REQUESTS.inc();

let registry = state.metric_registry.read().await;
let body = {
let mut buffer = String::new();
Expand Down
17 changes: 15 additions & 2 deletions openmetrics_udpserver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,18 @@ use anyhow::{anyhow, Context};
use clap::{Arg, ArgAction, Command};
use prometheus_client::registry::Registry;
use std::process::exit;
use std::sync::Arc;
use std::sync::{Arc};
use once_cell::sync::Lazy;
use prometheus_client::metrics::counter::Counter;
use tokio::sync::broadcast::channel;
use tokio::sync::RwLock;


pub static METRIC_COUNTER_REQUESTS: Lazy<Counter<u64>> = Lazy::new(|| Default::default());
pub static METRIC_COUNTER_ERRORS: Lazy<Counter<u64>> = Lazy::new(|| Default::default());
pub static METRIC_COUNTER_UDP_PACKETS: Lazy<Counter<u64>> = Lazy::new(|| Default::default());


#[tokio::main]
async fn main() -> anyhow::Result<(), anyhow::Error> {
let matches = Command::new("Prometheus UDP Monitor")
Expand Down Expand Up @@ -93,7 +101,12 @@ async fn main() -> anyhow::Result<(), anyhow::Error> {
println!("http host: {}", &config.http_bind);
println!("disable serverdensity: {}", &config.disable_serverdensity);

let metric_registry = Arc::new(RwLock::new(Registry::default()));
let mut registry = Registry::default();
registry.register("udpagent.requests.metrics", "requests to /metrics", METRIC_COUNTER_REQUESTS.clone());
registry.register("udpagent.errors", "internal errors", METRIC_COUNTER_ERRORS.clone());
registry.register("udpagent.udppackets", "udp packets", METRIC_COUNTER_UDP_PACKETS.clone());

let metric_registry = Arc::new(RwLock::new(registry));
let (sender, receiver) = channel::<InboundMetric>(100_000);

// server density aggregator
Expand Down
2 changes: 2 additions & 0 deletions openmetrics_udpserver/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio::sync::RwLock;
use crate::aggregator::average::AggragatorAverageGauge;
use crate::aggregator::min::AggragatorMinGauge;
use crate::aggregator::peak::AggragatorPeakGauge;
use crate::METRIC_COUNTER_ERRORS;

#[derive(Debug, Clone)]
pub struct InboundMetric {
Expand Down Expand Up @@ -82,6 +83,7 @@ impl Processor {
match msg {
Ok(inbound_metric) => self.handle_metric(&regex_allowed_chars, inbound_metric).await,
Err(e) => {
METRIC_COUNTER_ERRORS.inc();
eprintln!("processor recv error {:#?}, investigate!", e);
::tokio::time::sleep(Duration::from_millis(300)).await;
}
Expand Down
5 changes: 5 additions & 0 deletions openmetrics_udpserver/src/serverdensity/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::time::{Duration, SystemTime};
use anyhow::{anyhow, Context};
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::broadcast::Receiver;
use crate::METRIC_COUNTER_ERRORS;

#[derive(Clone, Debug)]
pub struct ServerDensityConfig {
Expand Down Expand Up @@ -167,6 +168,7 @@ impl ServerDensityAggregator {
};
}
Err(e) => {
METRIC_COUNTER_ERRORS.inc();
eprintln!("aggregator recv error {:#?}, investigate!", e);
::tokio::time::sleep(Duration::from_millis(300)).await;
}
Expand Down Expand Up @@ -259,6 +261,7 @@ impl ServerDensityAggregator {
(duration.as_secs() * 1000) + (duration.subsec_nanos() as u64 / 1000000)
}
Err(_) => {
METRIC_COUNTER_ERRORS.inc();
println!("seems to have trouble with the clock, should never happen.");
return;
}
Expand All @@ -272,11 +275,13 @@ impl ServerDensityAggregator {
println!("submitted to serverdensity, took {}ms \n--- metrics --- \n{:#?} \n\n{} \n----\n", &send_data_to_backend_tooked_in_ms, data, &content);
}
Err(err) => {
METRIC_COUNTER_ERRORS.inc();
println!("submitted to serverdentity, status: {}, but could not read response: {}", response_status, err);
}
}
}
Err(err) => {
METRIC_COUNTER_ERRORS.inc();
println!("failed to send to serverdensity, status {:?}", err.status());
println!("error: {:?}", err);
}
Expand Down
4 changes: 4 additions & 0 deletions openmetrics_udpserver/src/udp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use byteorder::ByteOrder;
use openmetrics_udpserver_lib::MetricType;
use tokio::net::UdpSocket;
use tokio::sync::broadcast::Sender;
use crate::{METRIC_COUNTER_ERRORS, METRIC_COUNTER_UDP_PACKETS};

pub struct UdpServer {
config: Config,
Expand All @@ -29,10 +30,12 @@ impl UdpServer {
match self.decode_buffer(&buf, read_bytes) {
Ok(inbound_metric) => {
if let Err(err) = self.metric_sender.send(inbound_metric) {
METRIC_COUNTER_ERRORS.inc();
eprintln!("Unable to process inbound metric: {}", err);
}
}
Err(err) => {
METRIC_COUNTER_ERRORS.inc();
// it could be, that we are so fast that we read a part of the message, may we need to improve this code.
eprintln!("could not decode message from socket: {}", err);
}
Expand All @@ -52,6 +55,7 @@ impl UdpServer {
.to_string()
.replace('"', "");

METRIC_COUNTER_UDP_PACKETS.inc();
Ok(InboundMetric {
count,
name,
Expand Down

0 comments on commit d2f2e2b

Please sign in to comment.