From 7ced6d41fcfd4e7f795d190bfa275a6a961e111e Mon Sep 17 00:00:00 2001 From: zzzzzzzzzy9 Date: Mon, 27 May 2024 16:42:52 +0800 Subject: [PATCH] shim: Complete the metrics collection --- crates/shim/src/cgroup.rs | 339 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 326 insertions(+), 13 deletions(-) diff --git a/crates/shim/src/cgroup.rs b/crates/shim/src/cgroup.rs index 1df260f3..f0a82115 100644 --- a/crates/shim/src/cgroup.rs +++ b/crates/shim/src/cgroup.rs @@ -23,19 +23,32 @@ use std::{ }; use cgroups_rs::{ - cgroup::get_cgroups_relative_paths_by_pid, hierarchies, Cgroup, CgroupPid, MaxValue, Subsystem, + blkio::{BlkIoData, IoService}, + cgroup::get_cgroups_relative_paths_by_pid, + hierarchies, Cgroup, CgroupPid, MaxValue, Subsystem, }; use containerd_shim_protos::{ - cgroups::metrics::{CPUStat, CPUUsage, MemoryEntry, MemoryStat, Metrics, PidsStat, Throttle}, + cgroups::metrics::{ + BlkIOEntry, BlkIOStat, CPUStat, CPUUsage, HugetlbStat, MemoryEntry, MemoryStat, Metrics, + PidsStat, Throttle, + }, protobuf::{well_known_types::any::Any, Message}, shim::oci::Options, }; +use log::debug; use oci_spec::runtime::LinuxResources; use crate::error::{Error, Result}; // OOM_SCORE_ADJ_MAX is from https://github.com/torvalds/linux/blob/master/include/uapi/linux/oom.h#L10 const OOM_SCORE_ADJ_MAX: i64 = 1000; +const BLKIO_DEVICES_INFO_PATH: &str = "/proc/partitions"; + +#[derive(Debug)] +struct DeviceKey { + major: u64, + minor: u64, +} #[cfg_attr(feature = "tracing", tracing::instrument(level = "Info"))] pub fn set_cgroup_and_oom_score(pid: u32) -> Result<()> { @@ -102,15 +115,194 @@ fn write_process_oom_score(pid: u32, score: i64) -> Result<()> { .map_err(io_error!(e, "write oom score")) } -/// Collect process cgroup stats, return only necessary parts of it #[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))] pub fn collect_metrics(pid: u32) -> Result { + let cgroup = get_cgroup(pid)?; + if cgroup.v2() { + collect_metrics_v2(pid) + } else { + collect_metrics_v1(pid) + } +} + +/// Collect process cgroup stats, return only necessary parts of it +#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))] +pub fn collect_metrics_v1(pid: u32) -> Result { let mut metrics = Metrics::new(); let cgroup = get_cgroup(pid)?; // to make it easy, fill the necessary metrics only. - for sub_system in Cgroup::subsystems(&cgroup) { + for sub_system in cgroup.subsystems() { + match sub_system { + Subsystem::Cpu(cpu_ctr) => { + let mut throttle = Throttle::new(); + let stat = cpu_ctr.cpu().stat; + for line in stat.lines() { + let parts = line.split(' ').collect::>(); + if parts.len() != 2 { + Err(Error::Other(format!("invalid cpu stat line: {}", line)))?; + } + + // https://github.com/opencontainers/runc/blob/dbe8434359ca35af1c1e10df42b1f4391c1e1010/libcontainer/cgroups/fs2/cpu.go#L70 + match parts[0] { + "nr_periods" => { + throttle.set_periods(parts[1].parse::().unwrap()); + } + "nr_throttled" => { + throttle.set_throttled_periods(parts[1].parse::().unwrap()); + } + "throttled_time" => { + throttle.set_throttled_time(parts[1].parse::().unwrap()); + } + _ => {} + } + } + let mut cpu_stats = CPUStat::new(); + cpu_stats.set_throttling(throttle); + metrics.set_cpu(cpu_stats); + } + Subsystem::Mem(mem_ctr) => { + let mut mem_stat = MemoryStat::new(); + + // Memory + let mem = mem_ctr.memory_stat(); + let mut mem_entry = MemoryEntry::new(); + mem_entry.set_usage(mem.usage_in_bytes); + mem_entry.set_max(mem.max_usage_in_bytes); + mem_entry.set_failcnt(mem.fail_cnt); + if mem.limit_in_bytes < 0 { + mem_entry.set_limit(u64::MAX); + } else { + mem_entry.set_limit(mem.limit_in_bytes as u64); + } + + // add Memory + mem_stat.set_usage(mem_entry); + mem_stat.set_total_inactive_file(mem.stat.total_inactive_file); + // kmem + let kmem = mem_ctr.kmem_stat(); + let mut kmem_entry = MemoryEntry::new(); + kmem_entry.set_usage(kmem.usage_in_bytes); + kmem_entry.set_max(kmem.max_usage_in_bytes); + kmem_entry.set_failcnt(kmem.fail_cnt); + if kmem.limit_in_bytes < 0 { + kmem_entry.set_limit(u64::MAX); + } else { + kmem_entry.set_limit(kmem.limit_in_bytes as u64); + } + // add kmem + mem_stat.set_kernel(kmem_entry); + + // memswap + let memsw = mem_ctr.memswap(); + let mut memsw_entry = MemoryEntry::new(); + memsw_entry.set_usage(memsw.usage_in_bytes); + memsw_entry.set_max(memsw.max_usage_in_bytes); + memsw_entry.set_failcnt(memsw.fail_cnt); + if memsw.limit_in_bytes < 0 { + memsw_entry.set_limit(u64::MAX); + } else { + memsw_entry.set_limit(memsw.limit_in_bytes as u64); + } + // add Memory + mem_stat.set_swap(memsw_entry); + + //kernel_tcp + let tcp = mem_ctr.kmem_tcp_stat(); + let mut tcp_entry = MemoryEntry::new(); + tcp_entry.set_usage(tcp.usage_in_bytes); + tcp_entry.set_max(tcp.max_usage_in_bytes); + tcp_entry.set_failcnt(tcp.fail_cnt); + if tcp.limit_in_bytes < 0 { + tcp_entry.set_limit(u64::MAX); + } else { + tcp_entry.set_limit(tcp.limit_in_bytes as u64); + } + // add Memory + mem_stat.set_kernel_tcp(tcp_entry); + metrics.set_memory(mem_stat); + } + Subsystem::Pid(pid_ctr) => { + let mut pid_stats = PidsStat::new(); + pid_stats.set_current( + pid_ctr + .get_pid_current() + .map_err(other_error!(e, "get current pid"))?, + ); + pid_stats.set_limit( + pid_ctr + .get_pid_max() + .map(|val| match val { + // See https://github.com/opencontainers/runc/blob/dbe8434359ca35af1c1e10df42b1f4391c1e1010/libcontainer/cgroups/fs/pids.go#L55 + cgroups_rs::MaxValue::Max => 0, + cgroups_rs::MaxValue::Value(val) => val as u64, + }) + .map_err(other_error!(e, "get pid limit"))?, + ); + metrics.set_pids(pid_stats) + } + Subsystem::HugeTlb(ht_ctr) => { + let mut ht_stats: Vec = Vec::new(); + let hp_sizes = ht_ctr.get_sizes(); + for hp_size in hp_sizes { + let mut ht_stat = HugetlbStat::new(); + ht_stat.set_pagesize(hp_size.clone()); + ht_stat.set_usage(ht_ctr.usage_in_bytes(hp_size.as_str()).unwrap()); + ht_stat.set_max(ht_ctr.max_usage_in_bytes(hp_size.as_str()).unwrap()); + ht_stat.set_failcnt(ht_ctr.failcnt(hp_size.as_str()).unwrap()); + ht_stats.push(ht_stat); + } + metrics.set_hugetlb(ht_stats); + } + Subsystem::BlkIo(blkio_ctr) => { + let mut blkio_stat = BlkIOStat::new(); + let blkio = blkio_ctr.blkio(); + blkio_stat.set_io_merged_recursive(set_blkio_entries(&blkio.io_merged_recursive)); + blkio_stat.set_io_queued_recursive(set_blkio_entries(&blkio.io_queued_recursive)); + blkio_stat.set_io_service_bytes_recursive(set_blkio_entries( + &blkio.io_service_bytes_recursive, + )); + blkio_stat.set_io_service_time_recursive(set_blkio_entries( + &blkio.io_service_time_recursive, + )); + blkio_stat + .set_io_serviced_recursive(set_blkio_entries(&blkio.io_serviced_recursive)); + blkio_stat + .set_io_wait_time_recursive(set_blkio_entries(&blkio.io_wait_time_recursive)); + blkio_stat.set_sectors_recursive(set_blkio_entries_from_blkiodata( + &blkio.sectors_recursive, + )); + blkio_stat + .set_io_time_recursive(set_blkio_entries_from_blkiodata(&blkio.time_recursive)); + + // only when blkio_stat is empty, use throttle. + if blkio_stat_is_empty(&blkio_stat) { + blkio_stat + .set_io_serviced_recursive(set_blkio_entries(&blkio.throttle.io_serviced)); + blkio_stat.set_io_service_bytes_recursive(set_blkio_entries( + &blkio.throttle.io_service_bytes, + )); + } + metrics.set_blkio(blkio_stat); + } + // TODO: rdma. lib cgroup-rs's rdma is not completed, waiting for it. + // https://github.com/kata-containers/cgroups-rs/pull/135 + _ => {} + } + debug!("container metrics = {:?}", metrics); + } + Ok(metrics) +} + +#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))] +pub fn collect_metrics_v2(pid: u32) -> Result { + let mut metrics = Metrics::new(); + + let cgroup = get_cgroup(pid)?; + + // to make it easy, fill the necessary metrics only. + for sub_system in cgroup.subsystems() { match sub_system { Subsystem::Cpu(cpu_ctr) => { let mut cpu_usage = CPUUsage::new(); @@ -150,15 +342,8 @@ pub fn collect_metrics(pid: u32) -> Result { cpu_stats.set_usage(cpu_usage); metrics.set_cpu(cpu_stats); } - Subsystem::Mem(mem_ctr) => { - let mem = mem_ctr.memory_stat(); - let mut mem_entry = MemoryEntry::new(); - mem_entry.set_usage(mem.usage_in_bytes); - let mut mem_stat = MemoryStat::new(); - mem_stat.set_usage(mem_entry); - mem_stat.set_total_inactive_file(mem.stat.total_inactive_file); - metrics.set_memory(mem_stat); - } + // TODO: other cgroup v2 + Subsystem::Mem(_mem_ctr) => {} Subsystem::Pid(pid_ctr) => { let mut pid_stats = PidsStat::new(); pid_stats.set_current( @@ -184,6 +369,134 @@ pub fn collect_metrics(pid: u32) -> Result { Ok(metrics) } +#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))] +fn get_devices(device_key: &DeviceKey) -> Result { + let blkio_devices_path = Path::new(BLKIO_DEVICES_INFO_PATH); + match fs::File::open(blkio_devices_path) { + Err(e) => Err( + Error::IoError { + context: "open blkio_devices_path: /proc/partitions error".to_string(), + err: e, + }, // Error::NotFoundError("file not found: ".to_string() + blkio_devices_path.display().to_string().as_str()), + ), + Ok(mut file) => { + let mut string = String::new(); + match file.read_to_string(&mut string) { + Ok(_) => { + let lines = string.trim().split('\n').collect::>(); + for line in lines { + let s = line.split_whitespace().collect::>(); + if s.len() < 4 { + continue; + } + if *s[0] == device_key.major.to_string() + && *s[1] == device_key.minor.to_string() + { + return Ok(s[3].to_string()); + } + } + // if we don't find the device in /proc/partitions, just return a nil string. + Ok("".to_string()) + } + Err(e) => Err(Error::Other( + "cannot read file: ".to_string() + + BLKIO_DEVICES_INFO_PATH + + "error: " + + e.to_string().as_str(), + )), + } + } + } +} + +#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))] +fn blkio_stat_is_empty(blkio_stat: &BlkIOStat) -> bool { + blkio_stat.io_service_bytes_recursive.is_empty() + && blkio_stat.io_serviced_recursive.is_empty() + && blkio_stat.io_queued_recursive.is_empty() + && blkio_stat.io_service_time_recursive.is_empty() + && blkio_stat.io_wait_time_recursive.is_empty() + && blkio_stat.io_merged_recursive.is_empty() + && blkio_stat.io_time_recursive.is_empty() + && blkio_stat.sectors_recursive.is_empty() +} + +#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))] +fn set_blkio_entries_from_blkiodata(blkio_datas: &[BlkIoData]) -> Vec { + blkio_datas + .iter() + .map(|blkio_data| { + let mut blkio_entry = BlkIOEntry::new(); + blkio_entry.set_major(blkio_data.major as u64); + blkio_entry.set_minor(blkio_data.minor as u64); + blkio_entry.set_device( + get_devices(&DeviceKey { + major: blkio_data.major as u64, + minor: blkio_data.minor as u64, + }) + .unwrap(), + ); + blkio_entry.set_value(blkio_data.data); + blkio_entry + }) + .collect::>() +} + +#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))] +fn set_blkio_entries(io_services: &[IoService]) -> Vec { + let operations = [ + ( + "Read", + Box::new(|io_service: &IoService| -> u64 { io_service.read }) + as Box u64>, + ), + ( + "Write", + Box::new(|io_service: &IoService| -> u64 { io_service.write }) + as Box u64>, + ), + ( + "Discard", + Box::new(|io_service: &IoService| -> u64 { io_service.discard }) + as Box u64>, + ), + ( + "Async", + Box::new(|io_service: &IoService| -> u64 { io_service.r#async }) + as Box u64>, + ), + ( + "Sync", + Box::new(|io_service: &IoService| -> u64 { io_service.sync }) + as Box u64>, + ), + ]; + + io_services + .iter() + .flat_map(|io_service| { + operations + .iter() + .map(|(op, value)| { + let mut entry = BlkIOEntry::new(); + entry.set_major(io_service.major as u64); + entry.set_minor(io_service.minor as u64); + entry.set_op(op.to_string()); + entry.set_value(value(io_service)); + entry.set_device( + get_devices(&DeviceKey { + major: io_service.major as u64, + minor: io_service.minor as u64, + }) + .unwrap(), + ); + entry + }) + .collect::>() + }) + .collect::>() +} + // get_cgroup will return either cgroup v1 or v2 depending on system configuration #[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))] fn get_cgroup(pid: u32) -> Result {