Skip to content

Commit

Permalink
Add a 'disable_jobstats' flag
Browse files Browse the repository at this point in the history
  • Loading branch information
RDruon committed Aug 5, 2024
1 parent 56c49b0 commit dbdd115
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 4 deletions.
10 changes: 10 additions & 0 deletions lustre-collector/src/mds/mdt_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ pub(crate) fn params() -> Vec<String> {
.collect()
}

pub(crate) fn params_no_jobstats() -> Vec<String> {
[
format!("mdt.*.{STATS}"),
format!("mdt.*MDT*.{NUM_EXPORTS}"),
format!("mdt.*MDT*.{EXPORTS_PARAMS}"),
]
.into_iter()
.collect()
}

fn target_name<I>() -> impl Parser<I, Output = Target>
where
I: Stream<Token = char>,
Expand Down
7 changes: 7 additions & 0 deletions lustre-collector/src/mds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ pub(crate) fn params() -> Vec<String> {
.collect()
}

pub(crate) fn params_no_jobstats() -> Vec<String> {
mds_parser::params()
.into_iter()
.chain(mdt_parser::params_no_jobstats())
.collect()
}

pub(crate) fn parse<I>() -> impl Parser<I, Output = Record>
where
I: Stream<Token = char>,
Expand Down
7 changes: 7 additions & 0 deletions lustre-collector/src/oss/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ pub(crate) fn params() -> Vec<String> {
.collect()
}

pub(crate) fn params_no_jobstats() -> Vec<String> {
obdfilter_parser::obd_params_no_jobstats()
.into_iter()
.chain(oss_parser::params())
.collect()
}

pub(crate) fn parse<I>() -> impl Parser<I, Output = Record>
where
I: Stream<Token = char>,
Expand Down
18 changes: 18 additions & 0 deletions lustre-collector/src/oss/obdfilter_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ pub(crate) const OBD_STATS: [&str; 7] = [
EXPORTS_PARAMS,
];

pub(crate) const OBD_STATS_NO_JOBSTATS: [&str; 6] = [
STATS,
NUM_EXPORTS,
TOT_DIRTY,
TOT_GRANTED,
TOT_PENDING,
EXPORTS_PARAMS,
];

/// Takes OBD_STATS and produces a list of params for
/// consumption in proper ltcl get_param format.
pub(crate) fn obd_params() -> Vec<String> {
Expand All @@ -48,6 +57,15 @@ pub(crate) fn obd_params() -> Vec<String> {
.collect()
}

/// Takes OBD_STATS_NO_JOBSTATS and produces a list of params for
/// consumption in proper ltcl get_param format.
pub(crate) fn obd_params_no_jobstats() -> Vec<String> {
OBD_STATS_NO_JOBSTATS
.iter()
.map(|x| format!("obdfilter.*OST*.{x}"))
.collect()
}

/// Parses the name of a target
fn target_name<I>() -> impl Parser<I, Output = Target>
where
Expand Down
20 changes: 20 additions & 0 deletions lustre-collector/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ pub fn params() -> Vec<String> {
.collect()
}

pub fn params_no_jobstats() -> Vec<String> {
top_level_parser::top_level_params()
.into_iter()
.chain(client_count_parser::params())
.chain(osd_parser::params())
.chain(mgs_parser::params())
.chain(oss::params_no_jobstats())
.chain(mds::params_no_jobstats())
.chain(ldlm::params())
.chain(llite::params())
.chain(mdd_parser::params())
.chain(quota::params())
.collect()
}

pub fn parse<I>() -> impl Parser<I, Output = Vec<Record>>
where
I: Stream<Token = char>,
Expand Down Expand Up @@ -90,6 +105,11 @@ mod tests {
assert_debug_snapshot!(params());
}

#[test]
fn test_params_no_jobstats() {
assert_debug_snapshot!(params_no_jobstats());
}

#[test]
fn test_mdt_output() {
let x = r#"memused=343719411
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
---
source: lustre-collector/src/parser.rs
expression: params_no_jobstats()
---
[
"memused",
"memused_max",
"lnet_memused",
"health_check",
"mdt.*.exports.*.uuid",
"osd-*.*.filesfree",
"osd-*.*.filestotal",
"osd-*.*.fstype",
"osd-*.*.kbytesavail",
"osd-*.*.kbytesfree",
"osd-*.*.kbytestotal",
"osd-*.*.brw_stats",
"osd-*.*.quota_slave.acct_group",
"osd-*.*.quota_slave.acct_user",
"osd-*.*.quota_slave.acct_project",
"mgs.*.mgs.stats",
"mgs.*.mgs.threads_max",
"mgs.*.mgs.threads_min",
"mgs.*.mgs.threads_started",
"mgs.*.num_exports",
"obdfilter.*OST*.stats",
"obdfilter.*OST*.num_exports",
"obdfilter.*OST*.tot_dirty",
"obdfilter.*OST*.tot_granted",
"obdfilter.*OST*.tot_pending",
"obdfilter.*OST*.exports.*.stats",
"ost.OSS.ost.stats",
"ost.OSS.ost_io.stats",
"ost.OSS.ost_create.stats",
"ost.OSS.ost_out.stats",
"ost.OSS.ost_seq.stats",
"mds.MDS.mdt.stats",
"mds.MDS.mdt_fld.stats",
"mds.MDS.mdt_io.stats",
"mds.MDS.mdt_out.stats",
"mds.MDS.mdt_readpage.stats",
"mds.MDS.mdt_seqm.stats",
"mds.MDS.mdt_seqs.stats",
"mds.MDS.mdt_setattr.stats",
"mdt.*.md_stats",
"mdt.*MDT*.num_exports",
"mdt.*MDT*.exports.*.stats",
"ldlm.namespaces.{mdt-,filter-}*.contended_locks",
"ldlm.namespaces.{mdt-,filter-}*.contention_seconds",
"ldlm.namespaces.{mdt-,filter-}*.ctime_age_limit",
"ldlm.namespaces.{mdt-,filter-}*.early_lock_cancel",
"ldlm.namespaces.{mdt-,filter-}*.lock_count",
"ldlm.namespaces.{mdt-,filter-}*.lock_timeouts",
"ldlm.namespaces.{mdt-,filter-}*.lock_unused_count",
"ldlm.namespaces.{mdt-,filter-}*.lru_max_age",
"ldlm.namespaces.{mdt-,filter-}*.lru_size",
"ldlm.namespaces.{mdt-,filter-}*.max_nolock_bytes",
"ldlm.namespaces.{mdt-,filter-}*.max_parallel_ast",
"ldlm.namespaces.{mdt-,filter-}*.resource_count",
"ldlm.services.ldlm_canceld.stats",
"ldlm.services.ldlm_cbd.stats",
"llite.*.stats",
"mdd.*.changelog_users",
"qmt.*.*.glb-usr",
"qmt.*.*.glb-prj",
"qmt.*.*.glb-grp",
]
49 changes: 49 additions & 0 deletions lustrefs-exporter/benches/jobstats_fast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use lustre_collector::{parse_lctl_output, Record};

fn test_data(repeat: usize) -> String {
let job = r#"
- job_id: "SLURM_JOB_machine184_74186:0:ma"
snapshot_time: 1720516680
read_bytes: { samples: 0, unit: bytes, min: 0, max: 0, sum: 0, sumsq: 0 }
write_bytes: { samples: 52, unit: bytes, min: 4096, max: 475136, sum: 5468160, sumsq: 1071040692224 }
read: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
write: { samples: 52, unit: usecs, min: 12, max: 40081, sum: 692342, sumsq: 17432258604 }
getattr: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
setattr: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
punch: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
sync: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
destroy: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
create: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
statfs: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
get_info: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
set_info: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
quotactl: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
prealloc: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }"#;
let lctl_output = r#"obdfilter.ds002-OST0000.job_stats=
job_stats:"#;

let input = format!("{lctl_output}{}", job.to_string().repeat(repeat));

input
}

fn parse_jobstats(repeat: usize) -> Vec<Record> {
let input = test_data(repeat);
parse_lctl_output(input.as_bytes()).unwrap()
}

fn criterion_benchmark_fast(c: &mut Criterion) {
c.bench_function("jobstats 100", |b| {
b.iter(|| parse_jobstats(black_box(100)))
});
c.bench_function("jobstats 1000", |b| {
b.iter(|| parse_jobstats(black_box(1000)))
});
}
criterion_group! {
name = benches;
config = Criterion::default();
targets = criterion_benchmark_fast
}
criterion_main!(benches);
52 changes: 52 additions & 0 deletions lustrefs-exporter/benches/jobstats_slow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use std::time::Duration;

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use lustre_collector::{parse_lctl_output, Record};

fn test_data(repeat: usize) -> String {
let job = r#"
- job_id: "SLURM_JOB_machine184_74186:0:ma"
snapshot_time: 1720516680
read_bytes: { samples: 0, unit: bytes, min: 0, max: 0, sum: 0, sumsq: 0 }
write_bytes: { samples: 52, unit: bytes, min: 4096, max: 475136, sum: 5468160, sumsq: 1071040692224 }
read: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
write: { samples: 52, unit: usecs, min: 12, max: 40081, sum: 692342, sumsq: 17432258604 }
getattr: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
setattr: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
punch: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
sync: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
destroy: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
create: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
statfs: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
get_info: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
set_info: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
quotactl: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }
prealloc: { samples: 0, unit: usecs, min: 0, max: 0, sum: 0, sumsq: 0 }"#;
let lctl_output = r#"obdfilter.ds002-OST0000.job_stats=
job_stats:"#;

let input = format!("{lctl_output}{}", job.to_string().repeat(repeat));

input
}

fn parse_jobstats(repeat: usize) -> Vec<Record> {
let input = test_data(repeat);
parse_lctl_output(input.as_bytes()).unwrap()
}

fn criterion_benchmark_slow(c: &mut Criterion) {
c.bench_function("jobstats 10000", |b| {
b.iter(|| parse_jobstats(black_box(10000)))
});
c.bench_function("jobstats 100000", |b| {
b.iter(|| parse_jobstats(black_box(100000)))
});
}

criterion_group! {
name = benches;
config = Criterion::default().sample_size(10).measurement_time(Duration::from_secs(600));
targets = criterion_benchmark_slow
}
criterion_main!(benches);
18 changes: 14 additions & 4 deletions lustrefs-exporter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use axum::{
body::Body,
extract::State,
http::{self, StatusCode},
response::{IntoResponse, Response},
routing::get,
Expand Down Expand Up @@ -39,11 +40,14 @@ impl IntoResponse for Error {
}
}

#[derive(Debug, Parser)]
#[derive(Clone, Debug, Parser)]
pub struct CommandOpts {
/// Port that exporter will listen to
#[clap(short, long, env = "LUSTREFS_EXPORTER_PORT", default_value = LUSTREFS_EXPORTER_PORT)]
pub port: u16,
/// Disable jobstats processing
#[clap(short, long, env = "LUSTREFS_EXPORTER_JOBSTATS")]
pub disable_jobstats: bool,
}

#[tokio::main]
Expand All @@ -58,19 +62,25 @@ async fn main() -> Result<(), Error> {

let listener = tokio::net::TcpListener::bind(("0.0.0.0", opts.port)).await?;

let app = Router::new().route("/metrics", get(scrape));
let app = Router::new()
.route("/metrics", get(scrape))
.with_state(opts);

axum::serve(listener, app).await?;

Ok(())
}

async fn scrape() -> Result<Response<Body>, Error> {
async fn scrape(State(state): State<CommandOpts>) -> Result<Response<Body>, Error> {
let mut output = vec![];

let lctl = Command::new("lctl")
.arg("get_param")
.args(parser::params())
.args(if state.disable_jobstats {
parser::params_no_jobstats()
} else {
parser::params()
})
.kill_on_drop(true)
.output()
.await?;
Expand Down

0 comments on commit dbdd115

Please sign in to comment.