Skip to content

Commit

Permalink
Use a separate thread to handle jobstats and cache it
Browse files Browse the repository at this point in the history
  • Loading branch information
RDruon committed Jul 15, 2024
1 parent 99658e4 commit d444fef
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 6 deletions.
5 changes: 5 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
lustrefs-exporter/src/snapshots/lustrefs_exporter__tests__valid_fixture_lustre-2.14.0_ddn125__ds86.txt.snap filter=lfs diff=lfs merge=lfs -text
lustre-collector/src/fixtures/valid/lustre-2.14.0_ddn125/ds86.txt filter=lfs diff=lfs merge=lfs -text
lustre-collector/src/snapshots/lustre_collector__parser__tests__valid_fixture_lustre-2.14.0_ddn125__ds86.txt.snap filter=lfs diff=lfs merge=lfs -text
lustre-collector/src/snapshots/lustre_collector__parser__tests__params_jobstats_only.snap filter=lfs diff=lfs merge=lfs -text
lustre-collector/src/snapshots/lustre_collector__parser__tests__params_no_jobstats.snap filter=lfs diff=lfs merge=lfs -text
lustre-collector/src/snapshots/lustre_collector__parser__tests__valid_fixture_lustre-2.14.0_ddn125__jobstats_only.txt.snap filter=lfs diff=lfs merge=lfs -text
lustrefs-exporter/src/snapshots/lustrefs_exporter__tests__valid_fixture_lustre-2.14.0_ddn125__jobstats_only.txt.snap filter=lfs diff=lfs merge=lfs -text
lustre-collector/src/fixtures/valid/lustre-2.14.0_ddn125/jobstats_only.txt filter=lfs diff=lfs merge=lfs -text
Git LFS file not shown
1 change: 1 addition & 0 deletions lustre-collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ fn check_output(records: Vec<Record>, state: &str) -> Result<Vec<Record>, Lustre
let params = crate::parser::params().join(" ");

if !state.is_empty() {
println!("Left : {}", state);
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("Content left in input buffer. Please run and supply to support: `lctl get_param {params}`"),
Expand Down
7 changes: 4 additions & 3 deletions lustre-collector/src/mds/job_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::types::{JobStatMdt, JobStatsMdt};
use combine::{
attempt,
attempt, eof,
error::{ParseError, StreamError},
optional,
parser::{
Expand All @@ -22,9 +22,10 @@ where
{
(
optional(newline()), // If Jobstats are present, the whole yaml blob will be on a newline
take_until(attempt((newline(), alpha_num()))),
take_until(attempt((newline(), alpha_num()).map(drop).or(eof()))),
)
.skip(newline())
.skip(optional(newline()))
.skip(optional(eof()))
.and_then(|(_, x): (_, String)| {
serde_yaml::from_str(&x)
.map(|x: JobStatsMdt| x.job_stats)
Expand Down
14 changes: 14 additions & 0 deletions lustre-collector/src/mds/mdt_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ pub(crate) fn params() -> Vec<String> {
.collect()
}

/// Lists the `mdt` parameter patterns that are queried without job stats.
///
/// The returned patterns are built, in order, from `STATS`, `NUM_EXPORTS`
/// and `EXPORTS_PARAMS`.
pub(crate) fn params_no_jobstats() -> Vec<String> {
    vec![
        format!("mdt.*.{STATS}"),
        format!("mdt.*MDT*.{NUM_EXPORTS}"),
        format!("mdt.*MDT*.{EXPORTS_PARAMS}"),
    ]
}

/// Lists the single `mdt` parameter pattern built from `JOB_STATS`.
pub(crate) fn params_jobstats_only() -> Vec<String> {
    let job_stats_param = format!("mdt.*.{JOB_STATS}");

    Vec::from([job_stats_param])
}

fn target_name<I>() -> impl Parser<I, Output = Target>
where
I: Stream<Token = char>,
Expand Down
11 changes: 11 additions & 0 deletions lustre-collector/src/mds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ pub(crate) fn params() -> Vec<String> {
.collect()
}

/// Concatenates `mds_parser::params()` with
/// `mdt_parser::params_no_jobstats()`, in that order.
pub(crate) fn params_no_jobstats() -> Vec<String> {
    let mut all: Vec<String> = Vec::new();

    all.extend(mds_parser::params());
    all.extend(mdt_parser::params_no_jobstats());

    all
}

/// Returns the job stats parameter patterns for this module; currently this
/// is exactly what `mdt_parser::params_jobstats_only()` yields.
pub(crate) fn params_jobstats_only() -> Vec<String> {
mdt_parser::params_jobstats_only()
}

pub(crate) fn parse<I>() -> impl Parser<I, Output = Record>
where
I: Stream<Token = char>,
Expand Down
11 changes: 11 additions & 0 deletions lustre-collector/src/oss/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ pub(crate) fn params() -> Vec<String> {
.collect()
}

/// Concatenates `obdfilter_parser::obd_params_no_jobstats()` with
/// `oss_parser::params()`, in that order.
pub(crate) fn params_no_jobstats() -> Vec<String> {
    let mut all: Vec<String> = Vec::new();

    all.extend(obdfilter_parser::obd_params_no_jobstats());
    all.extend(oss_parser::params());

    all
}

/// Returns the job stats parameter patterns for this module; currently this
/// is exactly what `obdfilter_parser::obd_params_jobstats_only()` yields.
pub(crate) fn params_jobstats_only() -> Vec<String> {
obdfilter_parser::obd_params_jobstats_only()
}

pub(crate) fn parse<I>() -> impl Parser<I, Output = Record>
where
I: Stream<Token = char>,
Expand Down
25 changes: 25 additions & 0 deletions lustre-collector/src/oss/obdfilter_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ pub(crate) const OBD_STATS: [&str; 7] = [
EXPORTS_PARAMS,
];

/// Parameter names expanded by `obd_params_no_jobstats`. Unlike
/// `OBD_STATS_JOBSTATS_ONLY`, this list does not contain `JOBSTATS`.
pub(crate) const OBD_STATS_NO_JOBSTATS: [&str; 6] = [
STATS,
NUM_EXPORTS,
TOT_DIRTY,
TOT_GRANTED,
TOT_PENDING,
EXPORTS_PARAMS,
];

/// Parameter names expanded by `obd_params_jobstats_only`: just `JOBSTATS`.
pub(crate) const OBD_STATS_JOBSTATS_ONLY: [&str; 1] = [JOBSTATS];
/// Takes OBD_STATS and produces a list of params for
/// consumption in proper lctl get_param format.
pub(crate) fn obd_params() -> Vec<String> {
Expand All @@ -48,6 +58,21 @@ pub(crate) fn obd_params() -> Vec<String> {
.collect()
}

/// Takes OBD_STATS and produces a list of params for
/// consumption in proper ltcl get_param format.
pub(crate) fn obd_params_no_jobstats() -> Vec<String> {
OBD_STATS_NO_JOBSTATS
.iter()
.map(|x| format!("obdfilter.*OST*.{x}"))
.collect()
}
pub(crate) fn obd_params_jobstats_only() -> Vec<String> {
OBD_STATS_JOBSTATS_ONLY
.iter()
.map(|x| format!("obdfilter.*OST*.{x}"))
.collect()
}

/// Parses the name of a target
fn target_name<I>() -> impl Parser<I, Output = Target>
where
Expand Down
32 changes: 32 additions & 0 deletions lustre-collector/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,28 @@ pub fn params() -> Vec<String> {
.collect()
}

/// Lists the parameter patterns of every sub-parser, using the
/// `params_no_jobstats` variants for the `oss` and `mds` modules.
///
/// The order of the returned patterns follows the order in which the
/// sub-parser lists are appended below.
pub fn params_no_jobstats() -> Vec<String> {
    let mut all: Vec<String> = Vec::new();

    all.extend(top_level_parser::top_level_params());
    all.extend(client_count_parser::params());
    all.extend(osd_parser::params());
    all.extend(mgs_parser::params());
    all.extend(oss::params_no_jobstats());
    all.extend(mds::params_no_jobstats());
    all.extend(ldlm::params());
    all.extend(llite::params());
    all.extend(mdd_parser::params());
    all.extend(quota::params());

    all
}

/// Lists only the job stats parameter patterns: those of the `oss` module
/// first, followed by those of the `mds` module.
pub fn params_jobstats_only() -> Vec<String> {
    let mut all: Vec<String> = Vec::new();

    all.extend(oss::params_jobstats_only());
    all.extend(mds::params_jobstats_only());

    all
}

pub fn parse<I>() -> impl Parser<I, Output = Vec<Record>>
where
I: Stream<Token = char>,
Expand Down Expand Up @@ -90,6 +112,16 @@ mod tests {
assert_debug_snapshot!(params());
}

// Snapshot test: pins the debug representation of the list returned by
// `params_no_jobstats()`.
#[test]
fn test_params_no_jobstats() {
assert_debug_snapshot!(params_no_jobstats());
}

// Snapshot test: pins the debug representation of the list returned by
// `params_jobstats_only()`.
#[test]
fn test_params_jobstats_only() {
assert_debug_snapshot!(params_jobstats_only());
}

#[test]
fn test_mdt_output() {
let x = r#"memused=343719411
Expand Down
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
65 changes: 62 additions & 3 deletions lustrefs-exporter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use std::{sync::Arc, time::Duration};

use clap::Parser;
use lustre_collector::{parse_lctl_output, parse_lnetctl_output, parse_lnetctl_stats, parser};
use lustrefs_exporter::build_lustre_stats;
use prometheus_exporter_base::prelude::*;

use tokio::process::Command;
use tokio::{
process::Command,
sync::Mutex,
time::{interval, MissedTickBehavior},
};

const LUSTREFS_EXPORTER_PORT: &str = "32221";

Expand All @@ -31,14 +37,60 @@ async fn main() {
authorization: Authorization::None,
};

let mtx_record = Arc::new(Mutex::new(None));
let mtx_record2 = Arc::clone(&mtx_record);

let mut ticker = interval(Duration::from_secs(10));
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

tokio::spawn(async move {
loop {
ticker.tick().await;

let lctl = match Command::new("lctl")
.arg("get_param")
.args(parser::params_jobstats_only())
.kill_on_drop(true)
.output()
.await
{
Ok(r) => r,
Err(e) => {
tracing::debug!("Failed to retrieve jobstats parameters. {e}");
continue;
}
};

// Offload CPU-intensive parsing to a blocking task
let parsed_result =
tokio::task::spawn_blocking(move || parse_lctl_output(&lctl.stdout)).await;

match parsed_result {
Ok(Ok(r)) => {
let stat = build_lustre_stats(r);
let mut lock = mtx_record.lock().await;
*lock = Some(stat);
}
Ok(Err(e)) => {
tracing::debug!("Failed to parse jobstats information. {e}");
continue;
}
Err(e) => {
tracing::debug!("Failed to execute parse_lctl_output in blocking task. {e}");
continue;
}
}
}
});

render_prometheus(server_opts, Options, |request, options| async move {
tracing::debug!(?request, ?options);

let mut output = vec![];

let lctl = Command::new("lctl")
.arg("get_param")
.args(parser::params())
.args(parser::params_no_jobstats())
.kill_on_drop(true)
.output()
.await?;
Expand Down Expand Up @@ -69,7 +121,14 @@ async fn main() {

output.append(&mut lnetctl_stats_record);

Ok(build_lustre_stats(output))
let lustre_stats = build_lustre_stats(output);

let jobstats = { mtx_record2.lock().await.clone() };

match jobstats {
Some(jobstats) => Ok([lustre_stats, jobstats].join("\n")),
None => Ok(lustre_stats),
}
})
.await;
}
Expand Down
Git LFS file not shown

0 comments on commit d444fef

Please sign in to comment.