Skip to content

Commit

Permalink
Run the influxdb reporter in StatsReporter
Browse files Browse the repository at this point in the history
  • Loading branch information
datdenkikniet committed Jul 10, 2023
1 parent 66ee57e commit 1dc9bf9
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 153 deletions.
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ num-traits = { version = "*", optional = true }
futures = { version = "0.3", optional = true }

[features]
default = [ ]
default = [ "influxdb2" ]
influxdb2 = [ "dep:influxdb2", "dep:influxdb2-structmap", "dep:num-traits", "dep:futures" ]

[dev-dependencies]
Expand Down
23 changes: 1 addition & 22 deletions server/src/influxdb.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
use std::{
sync::{atomic::AtomicBool, Arc},
time::Duration,
};

use futures::TryFutureExt;
use influxdb2::{
models::{DataPoint, Query},
Expand Down Expand Up @@ -44,22 +39,6 @@ impl InfluxDB {
}
}

pub async fn run(
mut self,
stats: Arc<Stats>,
keep_running: Arc<AtomicBool>,
interval: Duration,
) -> Result<(), String> {
let mut reporting_interval = tokio::time::interval(interval);

while keep_running.load(std::sync::atomic::Ordering::Relaxed) {
reporting_interval.tick().await;
self.write_stats(&stats).await?;
}

Ok(())
}

async fn load_stat(&mut self, stat_name: &str) -> Result<u64, String> {
let Self {
server_name,
Expand Down Expand Up @@ -112,7 +91,7 @@ impl InfluxDB {
Stats::new_with(pixels_set as usize, bandwidth as usize)
}

async fn write_stats(&mut self, stats: &Stats) -> Result<(), String> {
pub async fn write_stats(&mut self, stats: &Stats) -> Result<(), String> {
let bandwidth_used = stats.bytes_read();
let pixels_set = stats.pixels();
let clients = stats.clients();
Expand Down
44 changes: 23 additions & 21 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,6 @@ fn main() {
let stats = runtime.block_on(build_stats(stat_save_opts));
let stats = Arc::new(stats);

#[cfg(feature = "influxdb2")]
stat_save_opts.influxdb_config().map(|c| {
if let Some(interval) = stat_save_opts.stats_save_interval() {
let client = influxdb::InfluxDB::new(c);
runtime.spawn(client.run(stats.clone(), keep_running.clone(), interval));
}
});

if let Some(dir) = arg_handler.save_dir.clone() {
let pixmap = pixmap.clone();
runtime.spawn(spawn_save_image(
Expand All @@ -90,6 +82,8 @@ fn main() {
let net_running_2 = keep_running.clone();
let opts = arg_handler.clone().into();

let renderer = build_renderer(&arg_handler, pixmap, stats, keep_running, &runtime);

let tokio_runtime = std::thread::spawn(move || {
runtime.block_on(async move {
listen(listener, net_pixmap, net_stats, opts).await;
Expand All @@ -98,7 +92,7 @@ fn main() {
});

if !arg_handler.no_render {
render(&arg_handler, pixmap, stats, keep_running);
renderer();
} else {
tokio_runtime.join().unwrap()
}
Expand Down Expand Up @@ -222,12 +216,13 @@ fn handle_socket(
}

/// Start the pixel map renderer.
fn render(
arg_handler: &Opts,
fn build_renderer<'a>(
arg_handler: &'a Opts,
pixmap: Arc<Pixmap>,
stats: Arc<Stats>,
net_running: Arc<AtomicBool>,
) {
runtime: &tokio::runtime::Runtime,
) -> impl FnOnce() -> () + 'a {
// Build the renderer
let renderer = Renderer::new(env!("CARGO_PKG_NAME"), pixmap);

Expand All @@ -242,16 +237,23 @@ fn render(
arg_handler.stat_options.stats_file.clone(),
stats,
Some(stats_text),
#[cfg(feature = "influxdb2")]
arg_handler
.stat_options
.influxdb_config()
.map(|c| influxdb::InfluxDB::new(c)),
);
reporter.start();
runtime.spawn(reporter.run());

// Render the canvas
renderer.run(
arg_handler.fullscreen,
arg_handler.stats_font_size,
arg_handler.stats_offset(),
arg_handler.stats_padding,
arg_handler.stats_col_spacing,
net_running,
);
|| {
renderer.run(
arg_handler.fullscreen,
arg_handler.stats_font_size,
arg_handler.stats_offset(),
arg_handler.stats_padding,
arg_handler.stats_col_spacing,
net_running,
)
}
}
198 changes: 89 additions & 109 deletions server/src/stat_reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use parking_lot::Mutex;
use std::cmp::min;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::{self, sleep};
use std::time::{Duration, SystemTime};
use std::time::{Duration, Instant};

use crate::stats::Stats;

Expand All @@ -17,27 +16,22 @@ pub struct StatReporter {
/// If none, no screen stats should be reported.
stdout_interval: Option<Duration>,

/// The interval to save the persistent file with.
/// The interval to save the persistent file and/or influxdb2 with.
/// If none, no stats will be saved.
save_interval: Option<Duration>,

/// The file to save persistent stats to.
save_path: Option<PathBuf>,

/// The last time the screen stats were updated.
screen_last: Arc<Mutex<Option<SystemTime>>>,

/// The last time the stdout stats were updated.
stdout_last: Arc<Mutex<Option<SystemTime>>>,

/// The last time the stats were saved.
save_last: Arc<Mutex<Option<SystemTime>>>,

/// A stats manager.
stats: Arc<Stats>,

/// A string mutex for text on the screen.
screen: Arc<Option<Arc<Mutex<String>>>>,
screen: Option<Arc<Mutex<String>>>,

/// The InfluxDB client to report stats to
#[cfg(feature = "influxdb2")]
influxdb_client: Option<crate::influxdb::InfluxDB>,
}

impl StatReporter {
Expand All @@ -49,136 +43,122 @@ impl StatReporter {
save_path: Option<PathBuf>,
stats: Arc<Stats>,
screen: Option<Arc<Mutex<String>>>,
#[cfg(feature = "influxdb2")] influxdb_client: Option<crate::influxdb::InfluxDB>,
) -> Self {
StatReporter {
screen_interval,
stdout_interval,
save_interval,
save_path,
screen_last: Arc::new(Mutex::new(None)),
stdout_last: Arc::new(Mutex::new(None)),
save_last: Arc::new(Mutex::new(None)),
stats,
screen: Arc::new(screen),
screen: screen,
#[cfg(feature = "influxdb2")]
influxdb_client,
}
}

/// Start the reporter, and spawn a thread internally which controls the
/// reporting.
pub fn start(&self) {
pub async fn run(mut self) {
// Do not actually start a thread if there is nothing to report
if self.screen_interval.is_none() && self.stdout_interval.is_none() {
let should_stop = self.screen_interval.is_none() && self.stdout_interval.is_none();

#[cfg(feature = "influxdb2")]
let should_stop = should_stop && self.influxdb_client.is_none();

if should_stop {
return;
}

// Clone the arcs for use in the reporter thread
let stats = self.stats.clone();
let screen = self.screen.clone();
let screen_interval = self.screen_interval.clone();
let stdout_interval = self.stdout_interval.clone();
let save_interval = self.save_interval.clone();
let screen_last = self.screen_last.clone();
let stdout_last = self.stdout_last.clone();
let save_last = self.save_last.clone();
let save_path = self.save_path.clone();
let mut screen_last = Instant::now();
let mut stdout_last = Instant::now();
let mut save_last = Instant::now();

// Update the statistics text each second in a separate thread
thread::spawn(move || {
loop {
// When the next update should happen, at least once a second
let mut next_update = Duration::from_secs(1);

// Check the screen update time
if let Some(interval) = screen_interval {
// Get the last screen time
let mut last = screen_last.lock();

// Get the number of elapsed seconds since the last report
let elapsed = last
.map(|last| last.elapsed().ok())
.unwrap_or(None)
.unwrap_or(Duration::from_secs(0));

// Report stats to the screen
if last.is_none() || elapsed >= interval {
if let Some(ref screen) = *screen {
Self::report_screen(&stats, screen);
*last = Some(SystemTime::now());
}
}

// See how long we should take, update the next update time
next_update = min(
next_update,
interval.checked_sub(elapsed).unwrap_or(interval),
);
}
loop {
// When the next update should happen, at least once a second
let mut next_update = Duration::from_secs(1);

// Check the stdout update time
if let Some(interval) = stdout_interval {
// Get the last stdout time
let mut last = stdout_last.lock();

// Get the number of elapsed seconds since the last report
let elapsed = last
.map(|last| last.elapsed().ok())
.unwrap_or(None)
.unwrap_or(Duration::from_secs(0));

// Report stats to the stdout
if last.is_none() || elapsed >= interval {
Self::report_stdout(&stats);
*last = Some(SystemTime::now());
// Check the screen update time
if let Some(interval) = self.screen_interval {
// Get the number of elapsed seconds since the last report
let elapsed = screen_last.elapsed();

// Report stats to the screen
if elapsed >= interval {
if let Some(screen) = &self.screen {
Self::report_screen(&self.stats, &mut screen.lock());
screen_last = Instant::now();
}
}

// See how long we should take, update the next update time
next_update = min(
next_update,
interval.checked_sub(elapsed).unwrap_or(interval),
);
// See how long we should take, update the next update time
next_update = min(
next_update,
interval.checked_sub(elapsed).unwrap_or(interval),
);
}

// Check the stdout update time
if let Some(interval) = self.stdout_interval {
// Get the number of elapsed seconds since the last report
let elapsed = stdout_last.elapsed();

// Report stats to the stdout
if elapsed >= interval {
Self::report_stdout(&self.stats);
stdout_last = Instant::now();
}

// Check the stats save update time
if let Some(interval) = save_interval {
// Get the last save time
let mut last = save_last.lock();

// Get the number of elapsed seconds since the last save
let elapsed = last
.map(|last| last.elapsed().ok())
.unwrap_or(None)
.unwrap_or(Duration::from_secs(0));

// Report stats to the stdout
if last.is_none() || elapsed >= interval {
// Create a raw stats instance
log::debug!("Saving persistent stats...");
let raw = stats.to_raw();

// Save the raw stats
if let Some(save_path) = &save_path {
raw.save(save_path.as_path())
}
// See how long we should take, update the next update time
next_update = min(
next_update,
interval.checked_sub(elapsed).unwrap_or(interval),
);
}

// Check the stats save update time
if let Some(interval) = self.save_interval {
// Get the number of elapsed seconds since the last save
let elapsed = save_last.elapsed();

// Report stats to the stdout
if elapsed >= interval {
// Create a raw stats instance
log::debug!("Saving persistent stats...");
let raw = self.stats.to_raw();

*last = Some(SystemTime::now());
// Save the raw stats
if let Some(save_path) = &self.save_path {
raw.save(save_path.as_path())
}

// See how long we should take, update the next update time
next_update = min(
next_update,
interval.checked_sub(elapsed).unwrap_or(interval),
);
if let Some(client) = &mut self.influxdb_client {
if let Err(e) = client.write_stats(&self.stats).await {
log::error!("Failed to write stats to influxdb: {e}");
}
}

save_last = Instant::now();
}

// Sleep for the specified duration
sleep(next_update);
// See how long we should take, update the next update time
next_update = min(
next_update,
interval.checked_sub(elapsed).unwrap_or(interval),
);
}
});

// Sleep for the specified duration
tokio::time::sleep(next_update).await;
}
}

/// Report the stats to the screen.
fn report_screen(stats: &Arc<Stats>, screen: &Arc<Mutex<String>>) {
*screen.lock() = format!(
fn report_screen(stats: &Stats, screen: &mut String) {
*screen = format!(
"CONNECT WITH: \tpx:\t{}\t{}\tclients: {}\ntelnet localhost 1234 \tin:\t{}\t{}",
stats.pixels_human(),
stats.pixels_sec_human(),
Expand Down

0 comments on commit 1dc9bf9

Please sign in to comment.