Skip to content

Commit

Permalink
<feature> shim: Complete the metrics collection
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzzzzzzy9 committed May 27, 2024
1 parent 200f788 commit 7ced6d4
Showing 1 changed file with 326 additions and 13 deletions.
339 changes: 326 additions & 13 deletions crates/shim/src/cgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,32 @@ use std::{
};

use cgroups_rs::{
cgroup::get_cgroups_relative_paths_by_pid, hierarchies, Cgroup, CgroupPid, MaxValue, Subsystem,
blkio::{BlkIoData, IoService},
cgroup::get_cgroups_relative_paths_by_pid,
hierarchies, Cgroup, CgroupPid, MaxValue, Subsystem,
};
use containerd_shim_protos::{
cgroups::metrics::{CPUStat, CPUUsage, MemoryEntry, MemoryStat, Metrics, PidsStat, Throttle},
cgroups::metrics::{
BlkIOEntry, BlkIOStat, CPUStat, CPUUsage, HugetlbStat, MemoryEntry, MemoryStat, Metrics,
PidsStat, Throttle,
},
protobuf::{well_known_types::any::Any, Message},
shim::oci::Options,
};
use log::debug;
use oci_spec::runtime::LinuxResources;

use crate::error::{Error, Result};

// OOM_SCORE_ADJ_MAX is from https://github.com/torvalds/linux/blob/master/include/uapi/linux/oom.h#L10
const OOM_SCORE_ADJ_MAX: i64 = 1000;
const BLKIO_DEVICES_INFO_PATH: &str = "/proc/partitions";

#[derive(Debug)]
struct DeviceKey {
major: u64,
minor: u64,
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "Info"))]
pub fn set_cgroup_and_oom_score(pid: u32) -> Result<()> {
Expand Down Expand Up @@ -102,15 +115,194 @@ fn write_process_oom_score(pid: u32, score: i64) -> Result<()> {
.map_err(io_error!(e, "write oom score"))
}

/// Collect process cgroup stats, return only necessary parts of it
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
pub fn collect_metrics(pid: u32) -> Result<Metrics> {
let cgroup = get_cgroup(pid)?;
if cgroup.v2() {
collect_metrics_v2(pid)
} else {
collect_metrics_v1(pid)
}
}

/// Collect process cgroup stats, return only necessary parts of it
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
pub fn collect_metrics_v1(pid: u32) -> Result<Metrics> {
let mut metrics = Metrics::new();

let cgroup = get_cgroup(pid)?;

// to make it easy, fill the necessary metrics only.
for sub_system in Cgroup::subsystems(&cgroup) {
for sub_system in cgroup.subsystems() {
match sub_system {
Subsystem::Cpu(cpu_ctr) => {
let mut throttle = Throttle::new();
let stat = cpu_ctr.cpu().stat;
for line in stat.lines() {
let parts = line.split(' ').collect::<Vec<&str>>();
if parts.len() != 2 {
Err(Error::Other(format!("invalid cpu stat line: {}", line)))?;
}

// https://github.com/opencontainers/runc/blob/dbe8434359ca35af1c1e10df42b1f4391c1e1010/libcontainer/cgroups/fs2/cpu.go#L70
match parts[0] {
"nr_periods" => {
throttle.set_periods(parts[1].parse::<u64>().unwrap());
}
"nr_throttled" => {
throttle.set_throttled_periods(parts[1].parse::<u64>().unwrap());
}
"throttled_time" => {
throttle.set_throttled_time(parts[1].parse::<u64>().unwrap());
}
_ => {}
}
}
let mut cpu_stats = CPUStat::new();
cpu_stats.set_throttling(throttle);
metrics.set_cpu(cpu_stats);
}
Subsystem::Mem(mem_ctr) => {
let mut mem_stat = MemoryStat::new();

// Memory
let mem = mem_ctr.memory_stat();
let mut mem_entry = MemoryEntry::new();
mem_entry.set_usage(mem.usage_in_bytes);
mem_entry.set_max(mem.max_usage_in_bytes);
mem_entry.set_failcnt(mem.fail_cnt);
if mem.limit_in_bytes < 0 {
mem_entry.set_limit(u64::MAX);
} else {
mem_entry.set_limit(mem.limit_in_bytes as u64);
}

// add Memory
mem_stat.set_usage(mem_entry);
mem_stat.set_total_inactive_file(mem.stat.total_inactive_file);
// kmem
let kmem = mem_ctr.kmem_stat();
let mut kmem_entry = MemoryEntry::new();
kmem_entry.set_usage(kmem.usage_in_bytes);
kmem_entry.set_max(kmem.max_usage_in_bytes);
kmem_entry.set_failcnt(kmem.fail_cnt);
if kmem.limit_in_bytes < 0 {
kmem_entry.set_limit(u64::MAX);
} else {
kmem_entry.set_limit(kmem.limit_in_bytes as u64);
}
// add kmem
mem_stat.set_kernel(kmem_entry);

// memswap
let memsw = mem_ctr.memswap();
let mut memsw_entry = MemoryEntry::new();
memsw_entry.set_usage(memsw.usage_in_bytes);
memsw_entry.set_max(memsw.max_usage_in_bytes);
memsw_entry.set_failcnt(memsw.fail_cnt);
if memsw.limit_in_bytes < 0 {
memsw_entry.set_limit(u64::MAX);
} else {
memsw_entry.set_limit(memsw.limit_in_bytes as u64);
}
// add Memory
mem_stat.set_swap(memsw_entry);

//kernel_tcp
let tcp = mem_ctr.kmem_tcp_stat();
let mut tcp_entry = MemoryEntry::new();
tcp_entry.set_usage(tcp.usage_in_bytes);
tcp_entry.set_max(tcp.max_usage_in_bytes);
tcp_entry.set_failcnt(tcp.fail_cnt);
if tcp.limit_in_bytes < 0 {
tcp_entry.set_limit(u64::MAX);
} else {
tcp_entry.set_limit(tcp.limit_in_bytes as u64);
}
// add Memory
mem_stat.set_kernel_tcp(tcp_entry);
metrics.set_memory(mem_stat);
}
Subsystem::Pid(pid_ctr) => {
let mut pid_stats = PidsStat::new();
pid_stats.set_current(
pid_ctr
.get_pid_current()
.map_err(other_error!(e, "get current pid"))?,
);
pid_stats.set_limit(
pid_ctr
.get_pid_max()
.map(|val| match val {
// See https://github.com/opencontainers/runc/blob/dbe8434359ca35af1c1e10df42b1f4391c1e1010/libcontainer/cgroups/fs/pids.go#L55
cgroups_rs::MaxValue::Max => 0,
cgroups_rs::MaxValue::Value(val) => val as u64,
})
.map_err(other_error!(e, "get pid limit"))?,
);
metrics.set_pids(pid_stats)
}
Subsystem::HugeTlb(ht_ctr) => {
let mut ht_stats: Vec<HugetlbStat> = Vec::new();
let hp_sizes = ht_ctr.get_sizes();
for hp_size in hp_sizes {
let mut ht_stat = HugetlbStat::new();
ht_stat.set_pagesize(hp_size.clone());
ht_stat.set_usage(ht_ctr.usage_in_bytes(hp_size.as_str()).unwrap());
ht_stat.set_max(ht_ctr.max_usage_in_bytes(hp_size.as_str()).unwrap());
ht_stat.set_failcnt(ht_ctr.failcnt(hp_size.as_str()).unwrap());
ht_stats.push(ht_stat);
}
metrics.set_hugetlb(ht_stats);
}
Subsystem::BlkIo(blkio_ctr) => {
let mut blkio_stat = BlkIOStat::new();
let blkio = blkio_ctr.blkio();
blkio_stat.set_io_merged_recursive(set_blkio_entries(&blkio.io_merged_recursive));
blkio_stat.set_io_queued_recursive(set_blkio_entries(&blkio.io_queued_recursive));
blkio_stat.set_io_service_bytes_recursive(set_blkio_entries(
&blkio.io_service_bytes_recursive,
));
blkio_stat.set_io_service_time_recursive(set_blkio_entries(
&blkio.io_service_time_recursive,
));
blkio_stat
.set_io_serviced_recursive(set_blkio_entries(&blkio.io_serviced_recursive));
blkio_stat
.set_io_wait_time_recursive(set_blkio_entries(&blkio.io_wait_time_recursive));
blkio_stat.set_sectors_recursive(set_blkio_entries_from_blkiodata(
&blkio.sectors_recursive,
));
blkio_stat
.set_io_time_recursive(set_blkio_entries_from_blkiodata(&blkio.time_recursive));

// only when blkio_stat is empty, use throttle.
if blkio_stat_is_empty(&blkio_stat) {
blkio_stat
.set_io_serviced_recursive(set_blkio_entries(&blkio.throttle.io_serviced));
blkio_stat.set_io_service_bytes_recursive(set_blkio_entries(
&blkio.throttle.io_service_bytes,
));
}
metrics.set_blkio(blkio_stat);
}
// TODO: rdma. lib cgroup-rs's rdma is not completed, waiting for it.
// https://github.com/kata-containers/cgroups-rs/pull/135
_ => {}
}
debug!("container metrics = {:?}", metrics);
}
Ok(metrics)
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
pub fn collect_metrics_v2(pid: u32) -> Result<Metrics> {
let mut metrics = Metrics::new();

let cgroup = get_cgroup(pid)?;

// to make it easy, fill the necessary metrics only.
for sub_system in cgroup.subsystems() {
match sub_system {
Subsystem::Cpu(cpu_ctr) => {
let mut cpu_usage = CPUUsage::new();
Expand Down Expand Up @@ -150,15 +342,8 @@ pub fn collect_metrics(pid: u32) -> Result<Metrics> {
cpu_stats.set_usage(cpu_usage);
metrics.set_cpu(cpu_stats);
}
Subsystem::Mem(mem_ctr) => {
let mem = mem_ctr.memory_stat();
let mut mem_entry = MemoryEntry::new();
mem_entry.set_usage(mem.usage_in_bytes);
let mut mem_stat = MemoryStat::new();
mem_stat.set_usage(mem_entry);
mem_stat.set_total_inactive_file(mem.stat.total_inactive_file);
metrics.set_memory(mem_stat);
}
// TODO: other cgroup v2
Subsystem::Mem(_mem_ctr) => {}
Subsystem::Pid(pid_ctr) => {
let mut pid_stats = PidsStat::new();
pid_stats.set_current(
Expand All @@ -184,6 +369,134 @@ pub fn collect_metrics(pid: u32) -> Result<Metrics> {
Ok(metrics)
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn get_devices(device_key: &DeviceKey) -> Result<String> {
let blkio_devices_path = Path::new(BLKIO_DEVICES_INFO_PATH);
match fs::File::open(blkio_devices_path) {
Err(e) => Err(
Error::IoError {
context: "open blkio_devices_path: /proc/partitions error".to_string(),
err: e,
}, // Error::NotFoundError("file not found: ".to_string() + blkio_devices_path.display().to_string().as_str()),
),
Ok(mut file) => {
let mut string = String::new();
match file.read_to_string(&mut string) {
Ok(_) => {
let lines = string.trim().split('\n').collect::<Vec<&str>>();
for line in lines {
let s = line.split_whitespace().collect::<Vec<&str>>();
if s.len() < 4 {
continue;
}
if *s[0] == device_key.major.to_string()
&& *s[1] == device_key.minor.to_string()
{
return Ok(s[3].to_string());
}
}
// if we don't find the device in /proc/partitions, just return a nil string.
Ok("".to_string())
}
Err(e) => Err(Error::Other(
"cannot read file: ".to_string()
+ BLKIO_DEVICES_INFO_PATH
+ "error: "
+ e.to_string().as_str(),
)),
}
}
}
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn blkio_stat_is_empty(blkio_stat: &BlkIOStat) -> bool {
blkio_stat.io_service_bytes_recursive.is_empty()
&& blkio_stat.io_serviced_recursive.is_empty()
&& blkio_stat.io_queued_recursive.is_empty()
&& blkio_stat.io_service_time_recursive.is_empty()
&& blkio_stat.io_wait_time_recursive.is_empty()
&& blkio_stat.io_merged_recursive.is_empty()
&& blkio_stat.io_time_recursive.is_empty()
&& blkio_stat.sectors_recursive.is_empty()
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn set_blkio_entries_from_blkiodata(blkio_datas: &[BlkIoData]) -> Vec<BlkIOEntry> {
blkio_datas
.iter()
.map(|blkio_data| {
let mut blkio_entry = BlkIOEntry::new();
blkio_entry.set_major(blkio_data.major as u64);
blkio_entry.set_minor(blkio_data.minor as u64);
blkio_entry.set_device(
get_devices(&DeviceKey {
major: blkio_data.major as u64,
minor: blkio_data.minor as u64,
})
.unwrap(),
);
blkio_entry.set_value(blkio_data.data);
blkio_entry
})
.collect::<Vec<BlkIOEntry>>()
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn set_blkio_entries(io_services: &[IoService]) -> Vec<BlkIOEntry> {
let operations = [
(
"Read",
Box::new(|io_service: &IoService| -> u64 { io_service.read })
as Box<dyn Fn(&IoService) -> u64>,
),
(
"Write",
Box::new(|io_service: &IoService| -> u64 { io_service.write })
as Box<dyn Fn(&IoService) -> u64>,
),
(
"Discard",
Box::new(|io_service: &IoService| -> u64 { io_service.discard })
as Box<dyn Fn(&IoService) -> u64>,
),
(
"Async",
Box::new(|io_service: &IoService| -> u64 { io_service.r#async })
as Box<dyn Fn(&IoService) -> u64>,
),
(
"Sync",
Box::new(|io_service: &IoService| -> u64 { io_service.sync })
as Box<dyn Fn(&IoService) -> u64>,
),
];

io_services
.iter()
.flat_map(|io_service| {
operations
.iter()
.map(|(op, value)| {
let mut entry = BlkIOEntry::new();
entry.set_major(io_service.major as u64);
entry.set_minor(io_service.minor as u64);
entry.set_op(op.to_string());
entry.set_value(value(io_service));
entry.set_device(
get_devices(&DeviceKey {
major: io_service.major as u64,
minor: io_service.minor as u64,
})
.unwrap(),
);
entry
})
.collect::<Vec<BlkIOEntry>>()
})
.collect::<Vec<BlkIOEntry>>()
}

// get_cgroup will return either cgroup v1 or v2 depending on system configuration
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn get_cgroup(pid: u32) -> Result<Cgroup> {
Expand Down

0 comments on commit 7ced6d4

Please sign in to comment.