Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not print error on jobstats issue #73

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 60 additions & 32 deletions lustrefs-exporter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use serde::Deserialize;
use std::{
borrow::Cow,
convert::Infallible,
io::{self, BufReader},
io::{self, BufRead, BufReader},
net::SocketAddr,
};
use tokio::process::Command;
Expand Down Expand Up @@ -90,6 +90,58 @@ async fn main() -> Result<(), Error> {
}

async fn scrape(Query(params): Query<Params>) -> Result<Response<Body>, Error> {
let jobstats = if params.jobstats {
let child = tokio::task::spawn_blocking(move || {
let child = std::process::Command::new("lctl")
.arg("get_param")
.args(["obdfilter.*OST*.job_stats", "mdt.*.job_stats"])
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()?;

Ok::<_, Error>(child)
})
.await?;

match child {
Ok(mut child) => {
let reader = BufReader::with_capacity(
128 * 1_024,
child.stdout.take().ok_or(io::Error::new(
io::ErrorKind::NotFound,
"stdout missing for lctl jobstats call.",
))?,
);

let reader_stderr = BufReader::new(child.stderr.take().ok_or(io::Error::new(
io::ErrorKind::NotFound,
"stderr missing for lctl jobstats call.",
))?);

tokio::task::spawn(async move {
for line in reader_stderr.lines().map_while(Result::ok) {
tracing::debug!("stderr: {}", line);
}
});

let (_, rx) = lustrefs_exporter::jobstats::jobstats_stream(reader);

let stream = ReceiverStream::new(rx)
.map(|x| Bytes::from_iter(x.into_bytes()))
.map(Ok::<_, Infallible>);

Some(stream)
}
Err(e) => {
tracing::debug!("Error while spawning lctl jobstats: {e}");

None
}
}
} else {
None
};

let mut output = vec![];

let lctl = Command::new("lctl")
Expand Down Expand Up @@ -125,46 +177,22 @@ async fn scrape(Query(params): Query<Params>) -> Result<Response<Body>, Error> {

output.append(&mut lnetctl_stats_record);

let s = if params.jobstats {
let reader = tokio::task::spawn_blocking(move || {
let mut lctl_jobstats = std::process::Command::new("lctl")
.arg("get_param")
.args(["obdfilter.*OST*.job_stats", "mdt.*.job_stats"])
.stdout(std::process::Stdio::piped())
.spawn()?;

let reader = BufReader::with_capacity(
128 * 1_024,
lctl_jobstats.stdout.take().ok_or(io::Error::new(
io::ErrorKind::NotFound,
"stdout missing for lctl jobstats call.",
))?,
);

Ok::<_, Error>(reader)
})
.await??;

let (_, rx) = lustrefs_exporter::jobstats::jobstats_stream(reader);

let stream = ReceiverStream::new(rx)
.map(|x| Bytes::from_iter(x.into_bytes()))
.map(Ok);

let lustre_stats = Ok::<_, Infallible>(build_lustre_stats(output).into());
let lustre_stats = build_lustre_stats(output);

let merged = tokio_stream::StreamExt::merge(tokio_stream::once(lustre_stats), stream);
let body = if let Some(stream) = jobstats {
let merged =
tokio_stream::StreamExt::merge(stream, tokio_stream::once(Ok(lustre_stats.into())));

Body::from_stream(merged)
} else {
tracing::debug!("Jobstats is disabled");
tracing::debug!("Jobstats collection disabled");

Body::from(build_lustre_stats(output))
Body::from(lustre_stats)
};

let response_builder = Response::builder().status(StatusCode::OK);

let resp = response_builder.body(s)?;
let resp = response_builder.body(body)?;

Ok(resp)
}
Expand Down
Loading