Skip to content

Commit

Permalink
cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
derklaro committed Jan 10, 2024
1 parent 5e10c53 commit c4c7f53
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 53 deletions.
10 changes: 5 additions & 5 deletions openmetrics_udpserver/src/aggregator/average.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use fnv::FnvHashMap;
use crate::processor::ProcessorMetric;
use fnv::FnvHashMap;

pub struct AverageBucket {
pub sum: u64,
Expand All @@ -21,18 +21,18 @@ impl AggragatorAverageGauge {
let bucket: &mut AverageBucket = self
.buffer
.entry(metric.name.clone())
.or_insert(AverageBucket {sum: 0, count: 0});
.or_insert(AverageBucket { sum: 0, count: 0 });

bucket.sum += metric.count as u64;
bucket.sum += metric.count;
bucket.count += 1;
}

pub fn reset_and_fetch(&mut self) -> FnvHashMap<String, u64> {
let mut buf = FnvHashMap::default();
for (k, v) in &self.buffer {
buf.insert(k.to_string(), (v.sum / v.count) as u64);
buf.insert(k.to_string(), v.sum / v.count);
}

buf
}
}
}
9 changes: 6 additions & 3 deletions openmetrics_udpserver/src/aggregator/min.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use fnv::FnvHashMap;
use crate::processor::ProcessorMetric;
use fnv::FnvHashMap;

pub struct AggragatorMinGauge {
buffer: FnvHashMap<String, u64>,
Expand All @@ -13,7 +13,10 @@ impl AggragatorMinGauge {
}

pub fn handle(&mut self, metric: &ProcessorMetric) {
let e: &mut u64 = self.buffer.entry(metric.name.clone()).or_insert(metric.count);
let e: &mut u64 = self
.buffer
.entry(metric.name.clone())
.or_insert(metric.count);

if *e > metric.count {
*e = metric.count;
Expand All @@ -25,4 +28,4 @@ impl AggragatorMinGauge {
::std::mem::swap(&mut swap_map, &mut self.buffer);
swap_map
}
}
}
4 changes: 2 additions & 2 deletions openmetrics_udpserver/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod peak;
pub mod average;
pub mod min;
pub mod average;
pub mod peak;
9 changes: 6 additions & 3 deletions openmetrics_udpserver/src/aggregator/peak.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use fnv::FnvHashMap;
use crate::processor::ProcessorMetric;
use fnv::FnvHashMap;

pub struct AggragatorPeakGauge {
buffer: FnvHashMap<String, u64>,
Expand All @@ -13,7 +13,10 @@ impl AggragatorPeakGauge {
}

pub fn handle(&mut self, metric: &ProcessorMetric) {
let e: &mut u64 = self.buffer.entry(metric.name.clone()).or_insert(metric.count);
let e: &mut u64 = self
.buffer
.entry(metric.name.clone())
.or_insert(metric.count);

if *e < metric.count {
*e = metric.count;
Expand All @@ -25,4 +28,4 @@ impl AggragatorPeakGauge {
::std::mem::swap(&mut swap_map, &mut self.buffer);
swap_map
}
}
}
6 changes: 3 additions & 3 deletions openmetrics_udpserver/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,20 @@ async fn get_metrics(
let registry = state.metric_registry.read().await;
let body = {
let mut buffer = String::new();
if !encode(&mut buffer, &registry).is_ok() {
if encode(&mut buffer, &registry).is_err() {
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
buffer
};

return Ok(Response::builder()
Response::builder()
.status(StatusCode::OK)
.header(
"Content-Type",
"application/openmetrics-text; version=1.0.0; charset=utf-8",
)
.body(body)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?);
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}

pub(crate) fn bind(
Expand Down
44 changes: 28 additions & 16 deletions openmetrics_udpserver/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
mod aggregator;
mod config;
mod http_server;
mod processor;
mod serverdensity;
mod udp_server;
mod aggregator;

use crate::config::Config;
use crate::processor::{InboundMetric, Processor};
use crate::serverdensity::aggregator::{ServerDensityAggregator, ServerDensityConfig};
use crate::udp_server::UdpServer;
use anyhow::{anyhow, Context};
use clap::{Arg, ArgAction, Command};
use prometheus_client::registry::Registry;
use std::process::exit;
use std::sync::{Arc};
use once_cell::sync::Lazy;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::registry::Registry;
use std::process::exit;
use std::sync::Arc;
use tokio::sync::broadcast::channel;
use tokio::sync::RwLock;


pub static METRIC_COUNTER_REQUESTS: Lazy<Counter<u64>> = Lazy::new(|| Default::default());
pub static METRIC_COUNTER_ERRORS: Lazy<Counter<u64>> = Lazy::new(|| Default::default());
pub static METRIC_COUNTER_UDP_PACKETS: Lazy<Counter<u64>> = Lazy::new(|| Default::default());

pub static METRIC_COUNTER_REQUESTS: Lazy<Counter<u64>> = Lazy::new(Default::default);
pub static METRIC_COUNTER_ERRORS: Lazy<Counter<u64>> = Lazy::new(Default::default);
pub static METRIC_COUNTER_UDP_PACKETS: Lazy<Counter<u64>> = Lazy::new(Default::default);

#[tokio::main]
async fn main() -> anyhow::Result<(), anyhow::Error> {
Expand Down Expand Up @@ -92,7 +90,7 @@ async fn main() -> anyhow::Result<(), anyhow::Error> {
.get_one::<String>("http-bind")
.ok_or(anyhow!("HTTP bind host is missing"))?
.to_string(),
disable_serverdensity: matches.get_flag("disable-serverdensity")
disable_serverdensity: matches.get_flag("disable-serverdensity"),
};

println!("UDP Monitor for OpenMetrics");
Expand All @@ -102,9 +100,21 @@ async fn main() -> anyhow::Result<(), anyhow::Error> {
println!("disable serverdensity: {}", &config.disable_serverdensity);

let mut registry = Registry::default();
registry.register("udpagent.requests.metrics", "requests to /metrics", METRIC_COUNTER_REQUESTS.clone());
registry.register("udpagent.errors", "internal errors", METRIC_COUNTER_ERRORS.clone());
registry.register("udpagent.udppackets", "udp packets", METRIC_COUNTER_UDP_PACKETS.clone());
registry.register(
"udpagent.requests.metrics",
"requests to /metrics",
METRIC_COUNTER_REQUESTS.clone(),
);
registry.register(
"udpagent.errors",
"internal errors",
METRIC_COUNTER_ERRORS.clone(),
);
registry.register(
"udpagent.udppackets",
"udp packets",
METRIC_COUNTER_UDP_PACKETS.clone(),
);

let metric_registry = Arc::new(RwLock::new(registry));
let (sender, receiver) = channel::<InboundMetric>(100_000);
Expand All @@ -113,11 +123,14 @@ async fn main() -> anyhow::Result<(), anyhow::Error> {
let server_density_aggregator_handle = if config.disable_serverdensity {
None
} else {
let server_density_config = ServerDensityConfig::from_args(matches).context("serverdensity args")?;
let server_density_config =
ServerDensityConfig::from_args(matches).context("serverdensity args")?;
let server_density_aggregator_receiver = sender.subscribe();
Some(tokio::spawn(async move {
let server_density_aggregator = ServerDensityAggregator::new(server_density_config);
server_density_aggregator.run(server_density_aggregator_receiver).await;
server_density_aggregator
.run(server_density_aggregator_receiver)
.await;
}))
};

Expand All @@ -135,7 +148,6 @@ async fn main() -> anyhow::Result<(), anyhow::Error> {
udp_server.run().await;
});


// bind the http server to serve open metrics requests
let http_server_registry = metric_registry.clone();
let http_server_handle = http_server::bind(&config, http_server_registry);
Expand Down
29 changes: 14 additions & 15 deletions openmetrics_udpserver/src/processor.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use std::collections::hash_map::Entry;
use crate::aggregator::average::AggragatorAverageGauge;
use crate::aggregator::min::AggragatorMinGauge;
use crate::aggregator::peak::AggragatorPeakGauge;
use crate::config::Config;
use crate::METRIC_COUNTER_ERRORS;
use openmetrics_udpserver_lib::MetricType;
use prometheus_client::registry::{Metric, Registry};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use regex::Regex;
use std::sync::Arc;
use std::collections::hash_map::Entry;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;
use std::time::Duration;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::gauge::Gauge;
use tokio::sync::broadcast::Receiver;
use tokio::sync::RwLock;
use crate::aggregator::average::AggragatorAverageGauge;
use crate::aggregator::min::AggragatorMinGauge;
use crate::aggregator::peak::AggragatorPeakGauge;
use crate::METRIC_COUNTER_ERRORS;

#[derive(Debug, Clone)]
pub struct InboundMetric {
Expand All @@ -40,7 +40,6 @@ pub struct ProcessorMetric {

impl ProcessorMetric {
pub fn from_inbound(name: String, inbound_metric: InboundMetric) -> Self {

let name = match inbound_metric.metric_type {
// this is some kind of legacy. we would end up with _total_total because the application is already sending _total and the client is also appending _total
MetricType::Sum => name.trim_end_matches("_total").to_string(),
Expand Down Expand Up @@ -138,11 +137,11 @@ impl Processor {
async fn handle_counter(&mut self, metric: &ProcessorMetric) {
match self.counters.entry(metric.name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().inc_by(metric.count as u64);
},
v.get_mut().inc_by(metric.count);
}
Entry::Vacant(vacant) => {
let counter = Counter::<u64, AtomicU64>::default();
counter.inc_by(metric.count as u64);
counter.inc_by(metric.count);
vacant.insert(counter.clone());

{
Expand All @@ -157,9 +156,9 @@ impl Processor {
match self.gauges.entry(metric_name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().set(metric_count as i64);
},
}
Entry::Vacant(vacant) => {
let mut gauge = Gauge::<i64, AtomicI64>::default();
let gauge = Gauge::<i64, AtomicI64>::default();
gauge.set(metric_count as i64);
vacant.insert(gauge.clone());

Expand Down
8 changes: 3 additions & 5 deletions openmetrics_udpserver/src/serverdensity/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::processor::InboundMetric;
use crate::serverdensity::{AverageHandler, MinHandler, PeakHandler, SumHandler};
use crate::METRIC_COUNTER_ERRORS;
use anyhow::{anyhow, Context};
use clap::ArgMatches;
use openmetrics_udpserver_lib::MetricType;
use regex::Regex;
Expand All @@ -8,10 +10,7 @@ use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, Read};
use std::time::{Duration, SystemTime};
use anyhow::{anyhow, Context};
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::broadcast::Receiver;
use crate::METRIC_COUNTER_ERRORS;

#[derive(Clone, Debug)]
pub struct ServerDensityConfig {
Expand Down Expand Up @@ -133,7 +132,6 @@ impl ServerDensityAggregator {
let mut flush_interval = ::tokio::time::interval(Duration::from_secs(10));

loop {

::tokio::select! {
_ = flush_interval.tick() => {
handler_sum.flush(&mut metricmap);
Expand Down Expand Up @@ -174,7 +172,7 @@ impl ServerDensityAggregator {
}
};
}
};
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion openmetrics_udpserver/src/udp_server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::config::Config;
use crate::processor::InboundMetric;
use crate::{METRIC_COUNTER_ERRORS, METRIC_COUNTER_UDP_PACKETS};
use byteorder::BigEndian;
use byteorder::ByteOrder;
use openmetrics_udpserver_lib::MetricType;
use tokio::net::UdpSocket;
use tokio::sync::broadcast::Sender;
use crate::{METRIC_COUNTER_ERRORS, METRIC_COUNTER_UDP_PACKETS};

pub struct UdpServer {
config: Config,
Expand Down

0 comments on commit c4c7f53

Please sign in to comment.