Skip to content

Commit

Permalink
Use zero-copy parser for jobstats
Browse files Browse the repository at this point in the history
  • Loading branch information
RDruon committed Jul 17, 2024
1 parent d444fef commit 92e5047
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 45 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ tracing-subscriber = {version = "0.3", features = ["env-filter"]}

[profile.release]
lto = true

[profile.test]
inherits = "release"
1 change: 1 addition & 0 deletions lustre-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ serde_yaml = "0.9"
thiserror = "1"
tracing-subscriber.workspace = true
tracing.workspace = true
memchr = "2.7.4"

[dev-dependencies]
include_dir.workspace = true
Expand Down
1 change: 0 additions & 1 deletion lustre-collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ fn check_output(records: Vec<Record>, state: &str) -> Result<Vec<Record>, Lustre
let params = crate::parser::params().join(" ");

if !state.is_empty() {
println!("Left : {}", state);
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("Content left in input buffer. Please run and supply to support: `lctl get_param {params}`"),
Expand Down
38 changes: 30 additions & 8 deletions lustre-collector/src/mds/job_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,53 @@ use combine::{
error::{ParseError, StreamError},
optional,
parser::{
char::{alpha_num, newline},
repeat::take_until,
char::{alpha_num, newline}, range::{take_fn, TakeRange}, repeat::take_until
},
stream::{Stream, StreamErrorFor},
Parser,
Parser, RangeStream,
};

pub(crate) fn parse<I>() -> impl Parser<I, Output = Option<Vec<JobStatMdt>>>
/// Returns the offset of the first occurrence of `needle`'s first byte in `haystack`
/// that is immediately followed by an ASCII alphanumeric byte.
///
/// Returns `Some(0)` when `needle` is empty and `Some(haystack.len())` when no such
/// position exists, so the result is always `Some`.
///
/// NOTE(review): `suffix` is bound but never read (only the first needle byte is
/// searched for); the CI annotations embedded below flag it as unused.
fn memslice(needle: &[u8], haystack: &[u8]) -> Option<usize> {
let (&prefix, suffix) = match needle.split_first() {

Check warning on line 18 in lustre-collector/src/mds/job_stats.rs

View workflow job for this annotation

GitHub Actions / Check

unused variable: `suffix`

Check failure on line 18 in lustre-collector/src/mds/job_stats.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `suffix`

error: unused variable: `suffix` --> lustre-collector/src/mds/job_stats.rs:18:19 | 18 | let (&prefix, suffix) = match needle.split_first() { | ^^^^^^ help: if this is intentional, prefix it with an underscore: `_suffix` | = note: `-D unused-variables` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(unused_variables)]`

Check warning on line 18 in lustre-collector/src/mds/job_stats.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused variable: `suffix`
Some(x) => x,
// An empty needle matches at the very start of the haystack.
None => return Some(0),
};
// Visit every position of the needle's first byte, in order.
for i in memchr::memchr_iter(prefix, haystack) {
// println!("{:#?}", std::str::from_utf8(&haystack[i + 1..]));
// Accept the position only if the next byte exists and is ASCII alphanumeric.
if haystack[i + 1..].first().map(|x| x.is_ascii_alphanumeric()).unwrap_or(false) {
return Some(i);
}
}
// No qualifying position: report the full haystack length.
Some(haystack.len())
}


pub(crate) fn parse<'a, I>() -> impl Parser<I, Output = Option<Vec<JobStatMdt>>> + 'a
where
I: Stream<Token = char>,
I: RangeStream<Token = char, Range = &'a str> + 'a,
I::Range: AsRef<[u8]> + combine::stream::Range,
I::Error: ParseError<I::Token, I::Range, I::Position>,
{
(
optional(newline()), // If Jobstats are present, the whole yaml blob will be on a newline
take_until(attempt((newline(), alpha_num()).map(drop).or(eof()))),
// take_until(attempt((newline(), alpha_num()).map(drop).or(eof()))),
take_fn(move |haystack: I::Range| {
let needle = b"\n";
let haystack = haystack.as_ref();
match memslice(needle, haystack) {
Some(i) => TakeRange::Found(i),
None => TakeRange::NotFound(haystack.len().saturating_sub(needle.len() - 1)),
}
}),
)
.skip(optional(newline()))
.skip(optional(eof()))
.and_then(|(_, x): (_, String)| {
.and_then(|(_, x): (_, &str)| {
serde_yaml::from_str(&x)

Check failure on line 53 in lustre-collector/src/mds/job_stats.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

error: this expression creates a reference which is immediately dereferenced by the compiler --> lustre-collector/src/mds/job_stats.rs:53:34 | 53 | serde_yaml::from_str(&x) | ^^ help: change this to: `x` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow = note: `-D clippy::needless-borrow` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::needless_borrow)]`
.map(|x: JobStatsMdt| x.job_stats)
.map_err(StreamErrorFor::<I>::other)
})
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
10 changes: 5 additions & 5 deletions lustre-collector/src/mds/mdt_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use combine::{
error::ParseError,
parser::char::{newline, string},
stream::Stream,
Parser,
Parser, RangeStream,
};

pub(crate) const JOB_STATS: &str = "job_stats";
Expand All @@ -30,9 +30,9 @@ enum MdtStat {
ExportStats(Vec<ExportStats>),
}

fn mdt_stat<I>() -> impl Parser<I, Output = (Param, MdtStat)>
fn mdt_stat<'a, I>() -> impl Parser<I, Output = (Param, MdtStat)> + 'a
where
I: Stream<Token = char>,
I: RangeStream<Token = char, Range = &'a str> + 'a,
I::Error: ParseError<I::Token, I::Range, I::Position>,
{
choice((
Expand Down Expand Up @@ -88,9 +88,9 @@ where
.message("while parsing target_name")
}

pub(crate) fn parse<I>() -> impl Parser<I, Output = Record>
pub(crate) fn parse<'a, I>() -> impl Parser<I, Output = Record> + 'a
where
I: Stream<Token = char>,
I: RangeStream<Token = char, Range = &'a str> + 'a,
I::Error: ParseError<I::Token, I::Range, I::Position>,
{
(target_name(), mdt_stat())
Expand Down
6 changes: 3 additions & 3 deletions lustre-collector/src/mds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub(crate) mod mds_parser;
pub(crate) mod mdt_parser;

use crate::types::Record;
use combine::{attempt, error::ParseError, Parser, Stream};
use combine::{attempt, error::ParseError, Parser, RangeStream};

pub(crate) fn params() -> Vec<String> {
mds_parser::params()
Expand All @@ -28,9 +28,9 @@ pub(crate) fn params_jobstats_only() -> Vec<String> {
mdt_parser::params_jobstats_only()
}

pub(crate) fn parse<I>() -> impl Parser<I, Output = Record>
pub(crate) fn parse<'a, I>() -> impl Parser<I, Output = Record> + 'a
where
I: Stream<Token = char>,
I: RangeStream<Token = char, Range = &'a str> + 'a,
I::Error: ParseError<I::Token, I::Range, I::Position>,
{
attempt(mds_parser::parse()).or(attempt(mdt_parser::parse()))
Expand Down
46 changes: 32 additions & 14 deletions lustre-collector/src/oss/job_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,48 @@

use crate::types::{JobStatOst, JobStatsOst};
use combine::{
attempt, eof,
error::{ParseError, StreamError},
optional,
parser::{
char::{alpha_num, newline},
repeat::take_until,
},
stream::{Stream, StreamErrorFor},
Parser,
attempt, eof, error::{ParseError, StreamError}, optional, parser::{

Check warning on line 7 in lustre-collector/src/oss/job_stats.rs

View workflow job for this annotation

GitHub Actions / Check

unused imports: `alpha_num`, `attempt`, `repeat::take_until`

Check failure on line 7 in lustre-collector/src/oss/job_stats.rs

View workflow job for this annotation

GitHub Actions / clippy

unused imports: `alpha_num`, `attempt`, `repeat::take_until`

error: unused imports: `alpha_num`, `attempt`, `repeat::take_until` --> lustre-collector/src/oss/job_stats.rs:7:5 | 7 | attempt, eof, error::{ParseError, StreamError}, optional, parser::{ | ^^^^^^^ 8 | char::{alpha_num, newline}, range::{take_fn, TakeRange}, repeat::take_until | ^^^^^^^^^ ^^^^^^^^^^^^^^^^^^

Check warning on line 7 in lustre-collector/src/oss/job_stats.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused imports: `alpha_num`, `attempt`, `repeat::take_until`
char::{alpha_num, newline}, range::{take_fn, TakeRange}, repeat::take_until
}, stream::StreamErrorFor, Parser, RangeStream
};

pub(crate) fn parse<I>() -> impl Parser<I, Output = Option<Vec<JobStatOst>>>
fn memslice(needle: &[u8], haystack: &[u8]) -> Option<usize> {
let (&prefix, _) = match needle.split_first() {
Some(x) => x,
None => return Some(0),
};
for i in memchr::memchr_iter(prefix, haystack) {
// println!("{:#?}", std::str::from_utf8(&haystack[i + 1..]));
if haystack[i + 1..].first().map(|x| x.is_ascii_alphanumeric()).unwrap_or(false) {
return Some(i);
}
}
Some(haystack.len())
}


pub(crate) fn parse<'a, I>() -> impl Parser<I, Output = Option<Vec<JobStatOst>>> + 'a
where
I: Stream<Token = char>,
I: RangeStream<Token = char, Range = &'a str> + 'a,
I::Range: AsRef<[u8]> + combine::stream::Range,
I::Error: ParseError<I::Token, I::Range, I::Position>,
{
(
optional(newline()), // If Jobstats are present, the whole yaml blob will be on a newline
take_until(attempt((newline(), alpha_num()).map(drop).or(eof()))),
// take_until(attempt((newline(), alpha_num()).map(drop).or(eof()))),
take_fn(move |haystack: I::Range| {
let needle = b"\n";
let haystack = haystack.as_ref();
match memslice(needle, haystack) {
Some(i) => TakeRange::Found(i),
None => TakeRange::NotFound(haystack.len().saturating_sub(needle.len() - 1)),
}
}),
)
.skip(optional(newline()))
.skip(optional(eof()))
.and_then(|(_, x): (_, String)| {
serde_yaml::from_str(&x)
.and_then(|(_, x): (_, &str)| {
serde_yaml::from_str(x)
.map(|x: JobStatsOst| x.job_stats)
.map_err(StreamErrorFor::<I>::other)
})
Expand Down
6 changes: 3 additions & 3 deletions lustre-collector/src/oss/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub(crate) mod obdfilter_parser;
pub(crate) mod oss_parser;

use crate::types::Record;
use combine::{attempt, error::ParseError, Parser, Stream};
use combine::{attempt, error::ParseError, Parser, RangeStream};

pub(crate) fn params() -> Vec<String> {
obdfilter_parser::obd_params()
Expand All @@ -27,9 +27,9 @@ pub(crate) fn params_jobstats_only() -> Vec<String> {
obdfilter_parser::obd_params_jobstats_only()
}

pub(crate) fn parse<I>() -> impl Parser<I, Output = Record>
pub(crate) fn parse<'a, I>() -> impl Parser<I, Output = Record> + 'a
where
I: Stream<Token = char>,
I: RangeStream<Token = char, Range = &'a str> + 'a,
I::Error: ParseError<I::Token, I::Range, I::Position>,
{
attempt(obdfilter_parser::parse()).or(attempt(oss_parser::parse()))
Expand Down
10 changes: 5 additions & 5 deletions lustre-collector/src/oss/obdfilter_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use combine::{
error::ParseError,
parser::char::{newline, string},
stream::Stream,
Parser,
Parser, RangeStream,
};

pub(crate) const JOBSTATS: &str = "job_stats";
Expand Down Expand Up @@ -95,9 +95,9 @@ enum ObdfilterStat {
TotPending(u64),
}

fn obdfilter_stat<I>() -> impl Parser<I, Output = (Param, ObdfilterStat)>
fn obdfilter_stat<'a, I>() -> impl Parser<I, Output = (Param, ObdfilterStat)> +'a
where
I: Stream<Token = char>,
I: RangeStream<Token = char, Range = &'a str> + 'a,
I::Error: ParseError<I::Token, I::Range, I::Position>,
{
choice((
Expand Down Expand Up @@ -130,9 +130,9 @@ where
.message("while parsing obdfilter")
}

pub(crate) fn parse<I>() -> impl Parser<I, Output = Record>
pub(crate) fn parse<'a, I>() -> impl Parser<I, Output = Record> +'a
where
I: Stream<Token = char>,
I: RangeStream<Token = char, Range = &'a str> + 'a,
I::Error: ParseError<I::Token, I::Range, I::Position>,
{
(target_name(), obdfilter_stat())
Expand Down
8 changes: 4 additions & 4 deletions lustre-collector/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::{
mds::{self, client_count_parser},
mgs::mgs_parser,
osd_parser, oss, quota, top_level_parser,
types::Record,
types::Record, JobStatsOst,

Check warning on line 10 in lustre-collector/src/parser.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `JobStatsOst`

Check failure on line 10 in lustre-collector/src/parser.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `JobStatsOst`

error: unused import: `JobStatsOst` --> lustre-collector/src/parser.rs:10:20 | 10 | types::Record, JobStatsOst, | ^^^^^^^^^^^

Check warning on line 10 in lustre-collector/src/parser.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `JobStatsOst`
};
use combine::{choice, error::ParseError, many, Parser, Stream};
use combine::{choice, error::ParseError, many, Parser, RangeStream, Stream};

Check warning on line 12 in lustre-collector/src/parser.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `Stream`

Check failure on line 12 in lustre-collector/src/parser.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `Stream`

error: unused import: `Stream` --> lustre-collector/src/parser.rs:12:69 | 12 | use combine::{choice, error::ParseError, many, Parser, RangeStream, Stream}; | ^^^^^^

Check warning on line 12 in lustre-collector/src/parser.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `Stream`

pub fn params() -> Vec<String> {
top_level_parser::top_level_params()
Expand Down Expand Up @@ -48,9 +48,9 @@ pub fn params_jobstats_only() -> Vec<String> {
.collect()
}

pub fn parse<I>() -> impl Parser<I, Output = Vec<Record>>
pub fn parse<'a, I>() -> impl Parser<I, Output = Vec<Record>> + 'a
where
I: Stream<Token = char>,
I: RangeStream<Token = char, Range = &'a str> + 'a,
I::Error: ParseError<I::Token, I::Range, I::Position>,
{
many(choice((
Expand Down

0 comments on commit 92e5047

Please sign in to comment.