diff --git a/Cargo.lock b/Cargo.lock
index 1c626ef2..1fe92872 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -145,9 +145,9 @@ checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
 
 [[package]]
 name = "async-trait"
-version = "0.1.77"
+version = "0.1.80"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
+checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -344,6 +344,8 @@ dependencies = [
  "diesel",
  "glob",
  "hex",
+ "hyper",
+ "hyperlocal",
  "lazy_static",
  "libc",
  "libsqlite3-sys",
@@ -771,6 +773,17 @@ dependencies = [
  "const-random",
 ]
 
+[[package]]
+name = "dns-mock-server"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eadf0b53355eb10446bf1e4c203bdd1e366efe9f32d9cae90833c2125c32d67a"
+dependencies = [
+ "async-trait",
+ "hickory-server",
+ "tokio",
+]
+
 [[package]]
 name = "dns-parser"
 version = "0.8.0"
@@ -846,6 +859,18 @@ dependencies = [
  "tokio-tungstenite 0.21.0",
 ]
 
+[[package]]
+name = "enum-as-inner"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5ffccbb6966c05b32ef8fbac435df276c4ae4d3dc55a8cd0eb9745e6c12f546a"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "env_logger"
 version = "0.10.2"
@@ -1156,6 +1181,72 @@ dependencies = [
  "itertools 0.4.19",
 ]
 
+[[package]]
+name = "hickory-proto"
+version = "0.24.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "07698b8420e2f0d6447a436ba999ec85d8fbf2a398bbd737b82cac4a2e96e512"
+dependencies = [
+ "async-trait",
+ "cfg-if",
+ "data-encoding",
+ "enum-as-inner",
+ "futures-channel",
+ "futures-io",
+ "futures-util",
+ "idna 0.4.0",
+ "ipnet",
+ "once_cell",
+ "rand",
+ "thiserror",
+ "tinyvec",
+ "tokio",
+ "tracing",
+ "url",
+]
+
+[[package]]
+name = "hickory-resolver"
+version = "0.24.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "28757f23aa75c98f254cf0405e6d8c25b831b32921b050a66692427679b1f243"
+dependencies = [
+ "cfg-if",
+ "futures-util",
+ "hickory-proto",
+ "ipconfig",
+ "lru-cache",
+ "once_cell",
+ "parking_lot",
+ "rand",
+ "resolv-conf",
+ "smallvec",
+ "thiserror",
+ "tokio",
+ "tracing",
+]
+
+[[package]]
+name = "hickory-server"
+version = "0.24.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9be0e43c556b9b3fdb6c7c71a9a32153a2275d02419e3de809e520bfcfe40c37"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "cfg-if",
+ "enum-as-inner",
+ "futures-util",
+ "hickory-proto",
+ "serde",
+ "thiserror",
+ "time",
+ "tokio",
+ "tokio-rustls 0.24.1",
+ "tokio-util",
+ "tracing",
+]
+
 [[package]]
 name = "home"
 version = "0.5.9"
@@ -1165,6 +1256,17 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "hostname"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867"
+dependencies = [
+ "libc",
+ "match_cfg",
+ "winapi",
+]
+
 [[package]]
 name = "http"
 version = "0.2.12"
@@ -1290,6 +1392,16 @@ dependencies = [
  "cc",
 ]
 
+[[package]]
+name = "idna"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
+dependencies = [
+ "unicode-bidi",
+ "unicode-normalization",
+]
+
 [[package]]
 name = "idna"
 version = "0.5.0"
@@ -1342,6 +1454,18 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "ipconfig"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f"
+dependencies = [
+ "socket2",
+ "widestring",
+ "windows-sys 0.48.0",
+ "winreg",
+]
+
 [[package]]
 name = "ipnet"
 version = "2.9.0"
@@ -1460,7 +1584,7 @@ dependencies = [
  "futures-io",
  "futures-util",
  "httpdate",
- "idna",
+ "idna 0.5.0",
  "mime",
  "native-tls",
  "nom",
@@ -1515,6 +1639,12 @@ dependencies = [
  "threadpool",
 ]
 
+[[package]]
+name = "linked-hash-map"
+version = "0.5.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
+
 [[package]]
 name = "linux-raw-sys"
 version = "0.4.13"
@@ -1549,6 +1679,21 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "lru-cache"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c"
+dependencies = [
+ "linked-hash-map",
+]
+
+[[package]]
+name = "match_cfg"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
+
 [[package]]
 name = "matchit"
 version = "0.7.3"
@@ -1658,7 +1803,9 @@ version = "0.8.1"
 dependencies = [
  "bpf-builder",
  "bpf-common",
+ "dns-mock-server",
  "dns-parser",
+ "hickory-resolver",
  "log",
  "nix 0.27.1",
  "pulsar-core",
@@ -2211,6 +2358,16 @@ dependencies = [
  "winreg",
 ]
 
+[[package]]
+name = "resolv-conf"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "52e44394d2086d010551b14b53b1f24e31647570cd1deb0379e2c21b329aba00"
+dependencies = [
+ "hostname",
+ "quick-error",
+]
+
 [[package]]
 name = "ring"
 version = "0.17.8"
@@ -2837,9 +2994,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.36.0"
+version = "1.38.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
+checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a"
 dependencies = [
  "backtrace",
  "bytes",
@@ -2866,9 +3023,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-macros"
-version = "2.2.0"
+version = "2.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
+checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -3128,7 +3285,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633"
 dependencies = [
  "form_urlencoded",
- "idna",
+ "idna 0.5.0",
  "percent-encoding",
 ]
 
@@ -3313,6 +3470,12 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
+[[package]]
+name = "widestring"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311"
+
 [[package]]
 name = "winapi"
 version = "0.3.9"
"0.1.4" dns-parser = "0.8.0" diesel = { version = "2.1", features = ["sqlite"] } env_logger = "0.10.1" @@ -127,6 +128,7 @@ futures-util = "0.3.29" gethostname = "0.4.3" glob = "0.3.1" hex = "0.4.3" +hickory-resolver = "0.24.1" hyper = "0.14.28" hyperlocal = "0.8" indicatif = "0.17" diff --git a/crates/bpf-common/Cargo.toml b/crates/bpf-common/Cargo.toml index 493ff29a..57fe2d95 100644 --- a/crates/bpf-common/Cargo.toml +++ b/crates/bpf-common/Cargo.toml @@ -28,6 +28,8 @@ procfs = { workspace = true } libc = { workspace = true } glob = { workspace = true } hex = { workspace = true } +hyper = { workspace = true } +hyperlocal = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } lazy_static = { workspace = true } diff --git a/crates/bpf-common/src/containers/layers.rs b/crates/bpf-common/src/containers/layers.rs new file mode 100644 index 00000000..ba5adb06 --- /dev/null +++ b/crates/bpf-common/src/containers/layers.rs @@ -0,0 +1,311 @@ +use std::{ + fmt, + fs::{self, File}, + io::BufReader, + path::{Path, PathBuf}, + str::FromStr, +}; + +use hyper::{body, Client}; +use hyperlocal::{UnixClientExt, Uri as HyperlocalUri}; +use nix::unistd::Uid; +use serde::Deserialize; + +use super::ContainerError; + +const DOCKER_SOCKET: &str = "/var/run/docker.sock"; + +/// Docker API response for `image inspect` request. +#[derive(Debug, Deserialize)] +struct ImageInspect { + #[serde(rename = "GraphDriver")] + graph_driver: GraphDriver, +} + +#[derive(Debug, Deserialize)] +struct GraphDriver { + #[serde(rename = "Data")] + data: Option, + #[serde(rename = "Name")] + name: GraphDriverName, +} + +#[derive(Debug, Deserialize)] +struct GraphDriverData { + #[serde(rename = "LowerDir")] + lower_dir: Option, + #[serde(rename = "MergedDir")] + merged_dir: Option, + #[serde(rename = "UpperDir")] + upper_dir: Option, + #[serde(rename = "WorkDir")] + work_dir: Option, +} + +#[derive(Debug, Deserialize)] +enum GraphDriverName { + #[serde(rename = "btrfs")] + Btrfs, + #[serde(rename = "fuse-overlayfs")] + FuseOverlayfs, + #[serde(rename = "overlay2")] + Overlayfs, + #[serde(rename = "vfs")] + Vfs, + #[serde(rename = "zfs")] + Zfs, +} + +#[derive(Debug, Deserialize)] +struct LibpodLayer { + id: String, + parent: Option, +} + +impl fmt::Display for GraphDriverName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Btrfs => write!(f, "btrfs"), + Self::FuseOverlayfs => write!(f, "fuse-overlayfs"), + Self::Overlayfs => write!(f, "overlay2"), + Self::Vfs => write!(f, "vfs"), + Self::Zfs => write!(f, "zfs"), + } + } +} + +#[derive(Debug, Deserialize)] +struct ImageDbEntry { + rootfs: Rootfs, +} + +#[derive(Debug, Deserialize)] +struct Rootfs { + diff_ids: Vec, +} + +/// Returns a list of layer paths for the given Docker image ID. 
diff --git a/crates/bpf-common/src/containers/layers.rs b/crates/bpf-common/src/containers/layers.rs
new file mode 100644
index 00000000..ba5adb06
--- /dev/null
+++ b/crates/bpf-common/src/containers/layers.rs
@@ -0,0 +1,311 @@
+use std::{
+    fmt,
+    fs::{self, File},
+    io::BufReader,
+    path::{Path, PathBuf},
+    str::FromStr,
+};
+
+use hyper::{body, Client};
+use hyperlocal::{UnixClientExt, Uri as HyperlocalUri};
+use nix::unistd::Uid;
+use serde::Deserialize;
+
+use super::ContainerError;
+
+const DOCKER_SOCKET: &str = "/var/run/docker.sock";
+
+/// Docker API response for `image inspect` request.
+#[derive(Debug, Deserialize)]
+struct ImageInspect {
+    #[serde(rename = "GraphDriver")]
+    graph_driver: GraphDriver,
+}
+
+#[derive(Debug, Deserialize)]
+struct GraphDriver {
+    #[serde(rename = "Data")]
+    data: Option<GraphDriverData>,
+    #[serde(rename = "Name")]
+    name: GraphDriverName,
+}
+
+#[derive(Debug, Deserialize)]
+struct GraphDriverData {
+    #[serde(rename = "LowerDir")]
+    lower_dir: Option<String>,
+    #[serde(rename = "MergedDir")]
+    merged_dir: Option<PathBuf>,
+    #[serde(rename = "UpperDir")]
+    upper_dir: Option<PathBuf>,
+    #[serde(rename = "WorkDir")]
+    work_dir: Option<PathBuf>,
+}
+
+#[derive(Debug, Deserialize)]
+enum GraphDriverName {
+    #[serde(rename = "btrfs")]
+    Btrfs,
+    #[serde(rename = "fuse-overlayfs")]
+    FuseOverlayfs,
+    #[serde(rename = "overlay2")]
+    Overlayfs,
+    #[serde(rename = "vfs")]
+    Vfs,
+    #[serde(rename = "zfs")]
+    Zfs,
+}
+
+#[derive(Debug, Deserialize)]
+struct LibpodLayer {
+    id: String,
+    parent: Option<String>,
+}
+
+impl fmt::Display for GraphDriverName {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Btrfs => write!(f, "btrfs"),
+            Self::FuseOverlayfs => write!(f, "fuse-overlayfs"),
+            Self::Overlayfs => write!(f, "overlay2"),
+            Self::Vfs => write!(f, "vfs"),
+            Self::Zfs => write!(f, "zfs"),
+        }
+    }
+}
+
+#[derive(Debug, Deserialize)]
+struct ImageDbEntry {
+    rootfs: Rootfs,
+}
+
+#[derive(Debug, Deserialize)]
+struct Rootfs {
+    diff_ids: Vec<String>,
+}
+
+/// Returns a list of layer paths for the given Docker image ID.
+pub(crate) async fn docker_layers(image_id: &str) -> Result<Vec<PathBuf>, ContainerError> {
+    let client = Client::unix();
+    let uri = HyperlocalUri::new(DOCKER_SOCKET, &format!("/images/{}/json", image_id));
+    let uri: hyper::Uri = uri.into();
+
+    let response = client
+        .get(uri.clone())
+        .await
+        .map_err(|source| ContainerError::HyperRequest {
+            source,
+            uri: uri.clone(),
+        })?;
+    let body_bytes = body::to_bytes(response)
+        .await
+        .map_err(|source| ContainerError::HyperResponse {
+            source,
+            uri: uri.clone(),
+        })?;
+
+    let response: ImageInspect = serde_json::from_slice(&body_bytes)
+        .map_err(|source| ContainerError::ParseResponse { source, uri })?;
+
+    match response.graph_driver.name {
+        GraphDriverName::Btrfs => docker_btrfs_layers(image_id),
+        GraphDriverName::Overlayfs => docker_overlayfs_layers(response.graph_driver.data),
+        _ => {
+            log::warn!(
+                "Docker graph driver {} is unsupported",
+                response.graph_driver.name
+            );
+            Ok(Vec::new())
+        }
+    }
+}
+
+/// Returns a list of BTRFS layer paths for the given Docker image ID.
+///
+/// The procedure for BTRFS is not straightforward, since the `image inspect`
+/// response doesn't have direct information about layer directories. It
+/// consists of the following steps:
+///
+/// 1. Using the given image ID, find an "imagedb entry". It's located in
+///    `/var/lib/docker/image/btrfs/imagedb/content/sha256/<image_id>`.
+/// 2. Get the list of layer checksums from that entry.
+/// 3. For each layer, check whether a "layerdb entry" exists. It's located
+///    in `/var/lib/docker/image/btrfs/layerdb/sha256/<layer_checksum>`. The
+///    layerdb directory contains a `cache-id` file.
+/// 4. That `cache-id` file contains an ID of a BTRFS subvolume. The
+///    subvolume can be found in `/var/lib/docker/btrfs/subvolumes/<subvolume_id>`.
+fn docker_btrfs_layers(image_id: &str) -> Result<Vec<PathBuf>, ContainerError> {
+    const DOCKER_IMAGEDB_PATH: &str = "/var/lib/docker/image/btrfs/imagedb/content/sha256/";
+    const DOCKER_LAYERDB_PATH: &str = "/var/lib/docker/image/btrfs/layerdb/sha256/";
+    const DOCKER_BTRFS_SUBVOL_PATH: &str = "/var/lib/docker/btrfs/subvolumes/";
+
+    let mut layers = Vec::new();
+
+    let path = PathBuf::from(DOCKER_IMAGEDB_PATH).join(image_id);
+    let file = File::open(&path).map_err(|source| ContainerError::ReadFile {
+        source,
+        path: path.clone(),
+    })?;
+
+    let reader = BufReader::new(file);
+    let imagedb_entry: ImageDbEntry = serde_json::from_reader(reader)
+        .map_err(|source| ContainerError::ParseConfigFile { source, path })?;
+
+    for layer_id in imagedb_entry.rootfs.diff_ids {
+        let layer_id = layer_id
+            .split(':')
+            .last()
+            .ok_or(ContainerError::InvalidLayerID(layer_id.clone()))?;
+
+        let path = PathBuf::from(DOCKER_LAYERDB_PATH).join(layer_id);
+        if path.exists() {
+            let path = path.join("cache-id");
+            let btrfs_subvol_id = fs::read_to_string(&path)
+                .map_err(|source| ContainerError::ReadFile { source, path })?;
+            let btrfs_subvol_path = PathBuf::from(DOCKER_BTRFS_SUBVOL_PATH).join(btrfs_subvol_id);
+
+            layers.push(btrfs_subvol_path);
+        }
+    }
+
+    Ok(layers)
+}
+
+fn docker_overlayfs_layers(
+    graph_driver_data: Option<GraphDriverData>,
+) -> Result<Vec<PathBuf>, ContainerError> {
+    let mut layers = Vec::new();
+
+    if let Some(graph_driver_data) = graph_driver_data {
+        if let Some(lower_dirs) = graph_driver_data.lower_dir {
+            for lower_dir in lower_dirs.split(':') {
+                // `PathBuf::from_str` is infallible.
+                layers.push(PathBuf::from_str(lower_dir).unwrap());
+            }
+        }
+        if let Some(merged_dir) = graph_driver_data.merged_dir {
+            layers.push(merged_dir);
+        }
+        if let Some(upper_dir) = graph_driver_data.upper_dir {
+            layers.push(upper_dir);
+        }
+        if let Some(work_dir) = graph_driver_data.work_dir {
+            layers.push(work_dir);
+        }
+    }
+
+    Ok(layers)
+}
+
+/// For the given `top_layer_id`, return a vector which contains that layer and
+/// all parents of it. In other words, return a vector of all layers associated
+/// with an image.
+pub(crate) fn podman_layers<P: AsRef<Path>>(
+    top_layer_id: &str,
+    uid: Uid,
+    user_home: P,
+) -> Result<Vec<PathBuf>, ContainerError> {
+    let layer_store_path =
+        find_layer_store(uid, &user_home).ok_or(ContainerError::LayerStoreNotFound)?;
+    let layer_store_file =
+        File::open(&layer_store_path).map_err(|source| ContainerError::ReadFile {
+            source,
+            path: layer_store_path.clone(),
+        })?;
+    let reader = BufReader::new(layer_store_file);
+
+    let overlay_dir =
+        find_overlay_dir(uid, &user_home).ok_or(ContainerError::OverlayDirNotFound)?;
+
+    let mut layers = find_subdirs(overlay_dir.join(top_layer_id));
+    let config_layers: Vec<LibpodLayer> =
+        serde_json::from_reader(reader).map_err(|source| ContainerError::ParseConfigFile {
+            source,
+            path: layer_store_path.clone(),
+        })?;
+    let mut layer_id = top_layer_id;
+    let mut limit = config_layers.len();
+    loop {
+        let pos = config_layers[..limit]
+            .iter()
+            .rev()
+            .position(|layer| layer.id == layer_id)
+            .ok_or(ContainerError::LayerNotFound(layer_id.to_string()))?;
+        let layer = &config_layers[pos];
+        layers.extend(find_subdirs(overlay_dir.join(&layer.id)));
+        match layer.parent {
+            Some(ref parent) => {
+                layer_id = parent;
+                limit = pos;
+            }
+            None => break,
+        }
+    }
+
+    Ok(layers)
+}
+
+fn find_layer_store<P: AsRef<Path>>(uid: Uid, user_home: P) -> Option<PathBuf> {
+    const LIBPOD_LAYER_STORE_PATH: &str = "/var/lib/containers/storage/overlay-layers/layers.json";
+
+    let layer_store_path = if uid.is_root() {
+        PathBuf::from(LIBPOD_LAYER_STORE_PATH)
+    } else {
+        user_home
+            .as_ref()
+            .join(".local")
+            .join("share")
+            .join("containers")
+            .join("storage")
+            .join("overlay-layers")
+            .join("layers.json")
+    };
+
+    if !layer_store_path.exists() {
+        return None;
+    }
+
+    Some(layer_store_path)
+}
+
+fn find_overlay_dir<P: AsRef<Path>>(uid: Uid, user_home: P) -> Option<PathBuf> {
+    const OVERLAY_PATH: &str = "/var/lib/containers/storage/overlay";
+
+    let overlay_dir = if uid.is_root() {
+        PathBuf::from(OVERLAY_PATH)
+    } else {
+        user_home
+            .as_ref()
+            .join(".local")
+            .join("share")
+            .join("containers")
+            .join("storage")
+            .join("overlay")
+    };
+
+    if !overlay_dir.exists() {
+        return None;
+    }
+
+    Some(overlay_dir)
+}
+
+/// Returns all subdirectories of the given `parent_path`.
+fn find_subdirs<P: AsRef<Path>>(parent_path: P) -> Vec<PathBuf> {
+    let mut subdirectories = Vec::new();
+
+    if parent_path.as_ref().is_dir() {
+        if let Ok(entries) = fs::read_dir(parent_path) {
+            for entry in entries.flatten() {
+                let path = entry.path();
+                if path.is_dir() {
+                    subdirectories.push(path);
+                }
+            }
+        }
+    }
+
+    subdirectories
+}
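The BTRFS helper above leans on Docker's on-disk metadata rather than the API, and `ImageDbEntry` only keeps the `rootfs.diff_ids` field of the imagedb file; serde skips everything else. A sketch of the expected JSON shape, with made-up digests and heavily trimmed content:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn imagedb_entry_keeps_only_diff_ids() {
        // Hypothetical, heavily trimmed imagedb entry; the real file
        // contains many more fields, which serde ignores by default.
        let json = r#"{
            "rootfs": {
                "type": "layers",
                "diff_ids": ["sha256:0000aaaa", "sha256:1111bbbb"]
            }
        }"#;
        let entry: ImageDbEntry = serde_json::from_str(json).unwrap();
        assert_eq!(entry.rootfs.diff_ids.len(), 2);
    }
}
```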
diff --git a/crates/bpf-common/src/containers/mod.rs b/crates/bpf-common/src/containers/mod.rs
index 4b9ffcea..c1a9a808 100644
--- a/crates/bpf-common/src/containers/mod.rs
+++ b/crates/bpf-common/src/containers/mod.rs
@@ -18,6 +18,7 @@ use validatron::Validatron;
 
 use crate::parsing::procfs::ProcfsError;
 
+pub mod layers;
 pub mod schema;
 
 #[derive(Error, Debug)]
@@ -40,8 +41,26 @@ pub enum ContainerError {
         source: serde_json::error::Error,
         path: PathBuf,
     },
+    #[error("parsing response from `{uri:?}` failed")]
+    ParseResponse {
+        #[source]
+        source: serde_json::error::Error,
+        uri: hyper::Uri,
+    },
     #[error("path `{path}` is non-UTF-8")]
     PathNonUtf8 { path: PathBuf },
+    #[error("failed to make a request to the UNIX socket `{uri:?}`")]
+    HyperRequest {
+        #[source]
+        source: hyper::Error,
+        uri: hyper::Uri,
+    },
+    #[error("failed to parse a response from the UNIX socket `{uri:?}`")]
+    HyperResponse {
+        #[source]
+        source: hyper::Error,
+        uri: hyper::Uri,
+    },
     #[error("could not connect to the database `{path:?}`")]
     SqliteConnection {
         #[source]
@@ -52,6 +71,8 @@ pub enum ContainerError {
     ContainerNotFound { id: String },
     #[error("could not find libpod image store")]
     ImageStoreNotFound,
+    #[error("could not find libpod layer store")]
+    LayerStoreNotFound,
     #[error("could not find container image `{id}` in `{path:?}`")]
     ImageNotFound { id: String, path: PathBuf },
     #[error("parsing image digest {digest} failed")]
@@ -78,6 +99,14 @@ pub enum ContainerError {
     BoltBucketNotFound(String),
     #[error("bolt key `{0}` not found")]
     BoltKeyNotFound(String),
+    #[error("Invalid layer ID: `{0}`")]
+    InvalidLayerID(String),
+    #[error("Invalid image digest: `{0}`")]
+    InvalidImageDigest(String),
+    #[error("layer {0} not found in the layer store")]
+    LayerNotFound(String),
+    #[error("could not find overlay directory")]
+    OverlayDirNotFound,
 }
 
 /// A container ID.
@@ -128,6 +157,7 @@ struct LibpodConfig {
 struct LibpodImageConfig {
     id: String,
     digest: String,
+    layer: String,
 }
 
 /// Database schema of libpod.
@@ -145,6 +175,8 @@ pub struct ContainerInfo {
     pub name: String,
     pub image: String,
     pub image_digest: String,
+    #[validatron(skip)]
+    pub layers: Vec<PathBuf>,
 }
 
 impl fmt::Display for ContainerInfo {
@@ -158,19 +190,19 @@ impl fmt::Display for ContainerInfo {
 }
 
 impl ContainerInfo {
-    pub fn from_container_id(
+    pub async fn from_container_id(
         container_id: ContainerId,
         uid: Uid,
     ) -> Result<Option<Self>, ContainerError> {
         let info = match container_id {
-            ContainerId::Docker(id) => Self::from_docker_id(id),
+            ContainerId::Docker(id) => Self::from_docker_id(id).await,
             ContainerId::Libpod(id) => Self::from_libpod_id(id, uid),
         };
 
         info.map(Some)
     }
 
-    fn from_docker_id(id: String) -> Result<Self, ContainerError> {
+    async fn from_docker_id(id: String) -> Result<Self, ContainerError> {
         const DOCKER_CONTAINERS_PATH: &str = "/var/lib/docker/containers";
 
         let path = PathBuf::from(DOCKER_CONTAINERS_PATH)
@@ -194,11 +226,27 @@ impl ContainerInfo {
         let image = config.config.image;
         let image_digest = config.image_digest;
 
+        // `image_digest` has format like:
+        //
+        // ```
+        // sha256:1d34ffeaf190be23d3de5a8de0a436676b758f48f835c3a2d4768b798c15a7f1
+        // ```
+        //
+        // The unprefixed digest is used as an image ID.
+        let image_id = image_digest
+            .split(':')
+            .last()
+            .ok_or(ContainerError::InvalidImageDigest(image_digest.clone()))?;
+
+        let layers = layers::docker_layers(image_id).await?;
+        log::debug!("found layer filesystems for container {id}: {layers:?}");
+
         Ok(Self {
             id,
             name,
             image,
             image_digest,
+            layers,
         })
     }
 
@@ -261,11 +309,15 @@ impl ContainerInfo {
             },
         )?;
 
+        let layers = layers::podman_layers(&image.layer, uid, user_home)?;
+        log::debug!("found layer filesystems for container {id}: {layers:?}");
+
         Ok(Self {
             id,
             name: config.name,
             image: config.rootfs_image_name,
             image_digest: image.digest.clone(),
+            layers,
         })
     }
 }
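With the Docker branch doing socket I/O, `ContainerInfo::from_container_id` above becomes `async`, so callers just gain an `.await`. A hypothetical caller sketch (the `enrich` name and its inputs are assumptions for illustration):

```rust
use bpf_common::containers::{ContainerId, ContainerInfo};
use nix::unistd::Uid;

// Hypothetical enrichment step; container_id and uid would come from an event.
async fn enrich(container_id: ContainerId, uid: Uid) {
    match ContainerInfo::from_container_id(container_id, uid).await {
        Ok(Some(info)) => log::debug!("{} layer paths for {info}", info.layers.len()),
        Ok(None) => {}
        Err(err) => log::error!("{err}"),
    }
}
```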
diff --git a/crates/bpf-common/src/test_runner.rs b/crates/bpf-common/src/test_runner.rs
index ce29d0aa..a5799746 100644
--- a/crates/bpf-common/src/test_runner.rs
+++ b/crates/bpf-common/src/test_runner.rs
@@ -135,6 +135,28 @@ impl TestRunner {
             expectations: Vec::new(),
         }
     }
+
+    pub async fn run_async<F>(mut self, trigger_program: F) -> TestResult
+    where
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
+    {
+        let _program = self.ebpf.await.context("running eBPF").unwrap();
+        // Run the triggering code
+        let start_time = Timestamp::now();
+        trigger_program.await;
+        let end_time = Timestamp::now();
+        // Wait for eBPF to process pending events
+        tokio::time::sleep(MAX_TIMEOUT).await;
+        // Collect events
+        let events: Vec<_> = std::iter::from_fn(|| self.rx.try_recv().ok()).collect();
+        TestResult {
+            start_time,
+            end_time,
+            events,
+            expectations: Vec::new(),
+        }
+    }
 }
 
 /// Simple BpfSender used to collect `bpf_common::program::Program` events.
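Unlike the existing `run`, which takes a blocking closure, `run_async` awaits a caller-supplied future on the test runtime, so a test can drive async clients (sockets, resolvers) while the probe records events. A hypothetical test case wired the same way as the DNS tests further down (names and the `expect` shape are assumed from that usage):

```rust
fn my_async_case() -> TestCase {
    TestCase::new(
        "my_async_case",
        async {
            TestRunner::with_ebpf(program)
                .run_async(async {
                    // trigger events from async code, e.g. tokio sockets
                })
                .await
                .expect(|event| matches!(&event.payload, NetworkEvent::Close { .. }))
                .report()
        },
    )
}
```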
diff --git a/crates/modules/network-monitor/Cargo.toml b/crates/modules/network-monitor/Cargo.toml
index e54cb9c7..41e31b6d 100644
--- a/crates/modules/network-monitor/Cargo.toml
+++ b/crates/modules/network-monitor/Cargo.toml
@@ -6,13 +6,19 @@ edition.workspace = true
 repository.workspace = true
 
 [features]
-test-suite = ["bpf-common/test-utils"]
+test-suite = [
+    "bpf-common/test-utils",
+    "dns-mock-server",
+    "hickory-resolver"
+]
 
 [dependencies]
 bpf-common = { workspace = true }
 pulsar-core = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
+dns-mock-server = { workspace = true, optional = true }
+hickory-resolver = { workspace = true, optional = true }
 log = { workspace = true }
 nix = { workspace = true }
 dns-parser = { workspace = true }
diff --git a/crates/modules/network-monitor/src/lib.rs b/crates/modules/network-monitor/src/lib.rs
index f9f624ef..e7150ade 100644
--- a/crates/modules/network-monitor/src/lib.rs
+++ b/crates/modules/network-monitor/src/lib.rs
@@ -8,6 +8,10 @@ use bpf_common::{
     ProgramBuilder, ProgramError,
 };
 use nix::sys::socket::{SockaddrIn, SockaddrIn6};
+use pulsar_core::{
+    event::{DnsAnswer, DnsQuestion},
+    pdk::Payload,
+};
 
 const MODULE_NAME: &str = "network-monitor";
 
@@ -172,11 +176,44 @@ impl fmt::Display for NetworkEvent {
     }
 }
 
+fn parse_dns(data: &[u8]) -> Option<Payload> {
+    let dns = dns_parser::Packet::parse(data).ok()?;
+    let with_q = !dns.questions.is_empty();
+    let with_a = !dns.answers.is_empty();
+
+    let mut questions = Vec::new();
+    for q in dns.questions {
+        questions.push(DnsQuestion {
+            name: format!("{}", q.qname),
+            qtype: format!("{:?}", q.qtype),
+            qclass: format!("{:?}", q.qclass),
+        });
+    }
+
+    let mut answers = Vec::new();
+    for a in dns.answers {
+        answers.push(DnsAnswer {
+            name: format!("{}", a.name),
+            class: format!("{:?}", a.cls),
+            ttl: a.ttl,
+            data: format!("{:?}", a.data),
+        });
+    }
+
+    if with_q && !with_a {
+        Some(Payload::DnsQuery { questions })
+    } else if with_a {
+        Some(Payload::DnsResponse { answers, questions })
+    } else {
+        None
+    }
+}
+
 pub mod pulsar {
     use super::*;
     use bpf_common::{parsing::IndexError, program::BpfEvent, BpfSenderWrapper};
     use pulsar_core::{
-        event::{DnsAnswer, DnsQuestion, Host},
+        event::Host,
         pdk::{IntoPayload, ModuleContext, ModuleError, Payload, SimplePulsarModule},
     };
 
@@ -314,36 +351,7 @@ pub mod pulsar {
             .ok()?;
 
         // Check whether the payload contains any DNS data.
-        let dns = dns_parser::Packet::parse(data).ok()?;
-        let with_q = !dns.questions.is_empty();
-        let with_a = !dns.answers.is_empty();
-
-        let mut questions = Vec::new();
-        for q in dns.questions {
-            questions.push(DnsQuestion {
-                name: format!("{}", q.qname),
-                qtype: format!("{:?}", q.qtype),
-                qclass: format!("{:?}", q.qclass),
-            });
-        }
-
-        let mut answers = Vec::new();
-        for a in dns.answers {
-            answers.push(DnsAnswer {
-                name: format!("{}", a.name),
-                class: format!("{:?}", a.cls),
-                ttl: a.ttl,
-                data: format!("{:?}", a.data),
-            });
-        }
-
-        if with_q && !with_a {
-            Some(Payload::DnsQuery { questions })
-        } else if with_a {
-            Some(Payload::DnsResponse { answers, questions })
-        } else {
-            None
-        }
+        parse_dns(data)
     }
 }
 
@@ -351,7 +359,10 @@ pub mod pulsar {
 pub mod test_suite {
     use std::{
         io::{Read, Write},
-        net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
+        net::{
+            IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, TcpListener,
+            TcpStream, UdpSocket,
+        },
         time::Duration,
     };
 
@@ -359,10 +370,13 @@ pub mod test_suite {
         event_check,
         test_runner::{TestCase, TestReport, TestRunner, TestSuite},
     };
+    use dns_mock_server::Server;
+    use hickory_resolver::{config::*, TokioAsyncResolver};
    use nix::{
         libc::kill,
         unistd::{fork, ForkResult},
     };
+    use pulsar_core::pdk::Payload;
 
     use super::*;
 
@@ -386,6 +400,8 @@ pub mod test_suite {
             tcp_ipv6_sendmsg_recvmsg(),
             close_ipv4(),
             close_ipv6(),
+            dns_ipv4(),
+            dns_ipv6(),
         ],
     }
 }
@@ -654,4 +670,97 @@ pub mod test_suite {
             )
             .report()
     }
+
+    fn dns_ipv4() -> TestCase {
+        TestCase::new(
+            "dns_ipv4",
+            run_dns(
+                SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 18110)),
+                "example.io".to_string(),
+                IpAddr::V4(Ipv4Addr::LOCALHOST),
+                "A".to_string(),
+            ),
+        )
+    }
+
+    fn dns_ipv6() -> TestCase {
+        TestCase::new(
+            "dns_ipv6",
+            run_dns(
+                SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 18110, 0, 0)),
+                "example.io".to_string(),
+                IpAddr::V6(Ipv6Addr::LOCALHOST),
+                "AAAA".to_string(),
+            ),
+        )
+    }
+
+    async fn run_dns(
+        addr: SocketAddr,
+        domain: String,
+        address: IpAddr,
+        expected_record_type: String,
+    ) -> TestReport {
+        // Create necessary copies for each non-terminal closure.
+        let dns_server_domain = domain.clone();
+        let dns_query_domain = domain.clone();
+        TestRunner::with_ebpf(program)
+            .run_async(async move {
+                // DNS server.
+                let mut server = Server::default();
+                server
+                    .add_records(&dns_server_domain, vec![address])
+                    .unwrap();
+                let socket = tokio::net::UdpSocket::bind(&addr).await.unwrap();
+                let local_addr = socket.local_addr().unwrap();
+                tokio::spawn(async move {
+                    server.start(socket).await.unwrap();
+                });
+
+                // DNS requests.
+                let mut config = ResolverConfig::new();
+                config.add_name_server(NameServerConfig::new(local_addr, Protocol::Udp));
+                let resolver = TokioAsyncResolver::tokio(config, ResolverOpts::default());
+                resolver.lookup_ip(&dns_server_domain).await.unwrap();
+            })
+            .await
+            .expect(move |event| {
+                if let NetworkEvent::Send { data, .. } = &event.payload {
+                    if data.is_empty() {
+                        return false;
+                    }
+                    let data = data.bytes(&event.buffer).unwrap();
+                    match parse_dns(data) {
+                        Some(Payload::DnsQuery { questions }) => {
+                            questions.iter().any(|q| q.name == dns_query_domain)
+                        }
+                        _ => false,
+                    }
+                } else {
+                    false
+                }
+            })
+            .expect(move |event| {
+                if let NetworkEvent::Receive { data, .. } = &event.payload {
+                    if data.is_empty() {
+                        return false;
+                    }
+                    let data = data.bytes(&event.buffer).unwrap();
+                    match parse_dns(data) {
+                        Some(Payload::DnsResponse { questions, answers }) => {
+                            let expected_record =
+                                format!("{expected_record_type}(Record({address}))");
+                            questions.iter().any(|q| q.name == domain)
+                                && answers
+                                    .iter()
+                                    .any(|a| a.name == domain && a.data == expected_record)
+                        }
+                        _ => false,
+                    }
+                } else {
+                    false
+                }
+            })
+            .report()
+    }
 }
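Hoisting `parse_dns` out of the `pulsar` module above makes the query/response classification testable without any eBPF machinery. A sketch using the already-depended-on `dns-parser` builder (API as I understand the 0.8 docs; the domain is arbitrary):

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use dns_parser::{Builder, QueryClass, QueryType};
    use pulsar_core::pdk::Payload;

    #[test]
    fn query_only_packet_is_classified_as_dns_query() {
        // A packet with questions and no answers must map to Payload::DnsQuery.
        let mut builder = Builder::new_query(1, true);
        builder.add_question("example.io", false, QueryType::A, QueryClass::IN);
        let packet = builder.build().expect("valid query packet");

        match parse_dns(&packet) {
            Some(Payload::DnsQuery { questions }) => {
                assert_eq!(questions[0].name, "example.io");
            }
            _ => panic!("expected a DNS query payload"),
        }
    }
}
```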
diff --git a/crates/pulsar-core/src/pdk/process_tracker.rs b/crates/pulsar-core/src/pdk/process_tracker.rs
index cb928468..26163f0f 100644
--- a/crates/pulsar-core/src/pdk/process_tracker.rs
+++ b/crates/pulsar-core/src/pdk/process_tracker.rs
@@ -1,4 +1,8 @@
-use std::collections::{BTreeMap, HashMap};
+use std::{
+    collections::{BTreeMap, HashMap},
+    future::Future,
+    pin::Pin,
+};
 
 use bpf_common::{
     containers::{ContainerId, ContainerInfo},
@@ -209,7 +213,7 @@ impl ProcessTracker {
             tokio::select! {
                 msg = self.rx.recv() => match msg {
                     Some(msg) => {
-                        self.handle_message(msg);
+                        self.handle_message(msg).await;
                         self.cleanup();
                         // We check pending requests here and not periodically because
                         // the only way we can get a response is by handling a message.
@@ -224,9 +228,11 @@ impl ProcessTracker {
         }
     }
 
-    fn handle_message(&mut self, req: TrackerRequest) {
+    async fn handle_message(&mut self, req: TrackerRequest) {
         match req {
-            TrackerRequest::UpdateProcess(update) => self.handle_update(update),
+            TrackerRequest::UpdateProcess(update) => {
+                self.handle_update(update).await;
+            }
             TrackerRequest::GetProcessInfo(info_request) => {
                 let r = self.get_info(info_request.pid, info_request.ts);
                 match r {
@@ -263,100 +269,116 @@ impl ProcessTracker {
         }
     }
 
-    fn handle_update(&mut self, mut update: TrackerUpdate) {
-        match update {
-            TrackerUpdate::Fork {
-                pid,
-                uid,
-                gid,
-                timestamp,
-                ppid,
-                namespaces,
-                container_id,
-            } => {
-                let container =
-                    container_id.and_then(|c_id| {
-                        match ContainerInfo::from_container_id(c_id, uid) {
-                            Ok(container) => container,
-                            Err(err) => {
-                                log::error!("{err}");
-                                None
-                            }
-                        }
-                    });
+    fn handle_update<'a>(
+        &'a mut self,
+        mut update: TrackerUpdate,
+    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
+        Box::pin(async move {
+            match update {
+                TrackerUpdate::Fork {
+                    pid,
+                    uid,
+                    gid,
+                    timestamp,
+                    ppid,
+                    namespaces,
+                    container_id,
+                } => {
+                    let container = match container_id {
+                        Some(container_id) => {
+                            match ContainerInfo::from_container_id(container_id.clone(), uid).await
+                            {
+                                Ok(container) => container,
+                                Err(err) => {
+                                    log::error!("{err}");
+                                    None
+                                }
+                            }
+                        }
+                        None => None,
+                    };
 
-                self.processes.insert(
-                    pid,
-                    ProcessData {
-                        ppid,
-                        uid,
-                        gid,
-                        fork_time: timestamp,
-                        exit_time: None,
-                        original_image: self.get_image(ppid, timestamp),
-                        exec_changes: BTreeMap::new(),
-                        argv: self
-                            .processes
-                            .get(&ppid)
-                            .map(|parent| parent.argv.clone())
-                            .unwrap_or_default(),
-                        namespaces,
-                        container,
-                    },
-                );
-                if let Some(pending_updates) = self.pending_updates.remove(&pid) {
-                    pending_updates
-                        .into_iter()
-                        .for_each(|update| self.handle_update(update));
+                    self.processes.insert(
+                        pid,
+                        ProcessData {
+                            ppid,
+                            uid,
+                            gid,
+                            fork_time: timestamp,
+                            exit_time: None,
+                            original_image: self.get_image(ppid, timestamp),
+                            exec_changes: BTreeMap::new(),
+                            argv: self
+                                .processes
+                                .get(&ppid)
+                                .map(|parent| parent.argv.clone())
+                                .unwrap_or_default(),
+                            namespaces,
+                            container,
+                        },
+                    );
+                    if let Some(pending_updates) = self.pending_updates.remove(&pid) {
+                        for update in pending_updates {
+                            self.handle_update(update).await;
+                        }
+                    }
                 }
-            }
-            TrackerUpdate::Exec {
-                pid,
-                uid,
-                timestamp,
-                ref mut image,
-                ref mut argv,
-                namespaces,
-                ref container_id,
-            } => {
-                let container = container_id.clone().and_then(|c_id| {
-                    match ContainerInfo::from_container_id(c_id, uid) {
-                        Ok(container) => container,
-                        Err(err) => {
-                            log::error!("{err}");
-                            None
+                TrackerUpdate::Exec {
+                    pid,
+                    uid,
+                    timestamp,
+                    ref mut image,
+                    ref mut argv,
+                    namespaces,
+                    ref container_id,
+                } => {
+                    let container = match container_id {
+                        Some(container_id) => {
+                            match ContainerInfo::from_container_id(container_id.clone(), uid).await
+                            {
+                                Ok(container) => container,
+                                Err(err) => {
+                                    log::error!("{err}");
+                                    None
+                                }
+                            }
                         }
+                        None => None,
+                    };
+
+                    if let Some(p) = self.processes.get_mut(&pid) {
+                        p.exec_changes.insert(timestamp, std::mem::take(image));
+                        p.argv = std::mem::take(argv);
+                        p.namespaces = namespaces;
+                        p.container = container;
+                    } else {
+                        // if exec arrived before the fork, we save the event as pending
+                        log::debug!(
+                            "(exec) Process {pid} not found in process tree, saving for later"
+                        );
+                        self.pending_updates.entry(pid).or_default().push(update);
                     }
-                });
-
-                if let Some(p) = self.processes.get_mut(&pid) {
-                    p.exec_changes.insert(timestamp, std::mem::take(image));
-                    p.argv = std::mem::take(argv);
-                    p.namespaces = namespaces;
-                    p.container = container;
-                } else {
-                    // if exec arrived before the fork, we save the event as pending
-                    log::debug!("(exec) Process {pid} not found in process tree, saving for later");
-                    self.pending_updates.entry(pid).or_default().push(update);
                 }
-            }
-            TrackerUpdate::Exit { pid, timestamp } => {
-                if let Some(p) = self.processes.get_mut(&pid) {
-                    p.exit_time = Some(timestamp);
-                } else {
-                    // if exit arrived before the fork, we save the event as pending
-                    log::debug!("(exit) Process {pid} not found in process tree, saving for later");
-                    self.pending_updates.entry(pid).or_default().push(update);
+                TrackerUpdate::Exit { pid, timestamp } => {
+                    if let Some(p) = self.processes.get_mut(&pid) {
+                        p.exit_time = Some(timestamp);
+                    } else {
+                        // if exit arrived before the fork, we save the event as pending
+                        log::debug!(
+                            "(exit) Process {pid} not found in process tree, saving for later"
+                        );
+                        self.pending_updates.entry(pid).or_default().push(update);
+                    }
                 }
-            }
-            TrackerUpdate::SetNewParent { pid, ppid } => {
-                if let Some(p) = self.processes.get_mut(&pid) {
-                    p.ppid = ppid;
-                } else {
-                    log::warn!("{ppid} is the new parent of {pid}, but we couldn't find it")
+                TrackerUpdate::SetNewParent { pid, ppid } => {
+                    if let Some(p) = self.processes.get_mut(&pid) {
+                        p.ppid = ppid;
+                    } else {
+                        log::warn!("{ppid} is the new parent of {pid}, but we couldn't find it")
+                    }
                 }
             }
-        }
+        })
     }
 
     fn get_info(&self, pid: Pid, ts: Timestamp) -> Result<ProcessInfo, TrackerError> {
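The `handle_update` rewrite above cannot stay a plain `async fn`: it awaits itself when replaying pending updates, and a directly recursive `async fn` would produce an infinitely sized state machine. Returning a pinned boxed future breaks that cycle with one heap allocation per recursion level; a minimal self-contained illustration of the same pattern:

```rust
use std::{future::Future, pin::Pin};

// Minimal illustration of the Box::pin pattern used by handle_update:
// the boxed indirection gives the recursive future a known size.
fn countdown(n: u32) -> Pin<Box<dyn Future<Output = ()> + Send>> {
    Box::pin(async move {
        if n > 0 {
            countdown(n - 1).await;
        }
    })
}
```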
diff --git a/crates/validatron/src/reflection.rs b/crates/validatron/src/reflection.rs
index 070989d1..b49b8c2c 100644
--- a/crates/validatron/src/reflection.rs
+++ b/crates/validatron/src/reflection.rs
@@ -33,11 +33,11 @@
 //!
 //! The [ClassBuilder] can build 4 types of class
 //! - [Primitive] : representation of a base type. It needs a parsing function and a function to know which operator is available
-//! on it and how to use it.
+//!   on it and how to use it.
 //! - [Struct] : representation of a struct. It needs the description of its fields and how to access each one.
 //! - [Enum] : representation of a enum. It needs the description of its fields, including the relative variant, and how to access each one.
 //! - [Collection] : representation of a collection. It requires that the current type implements `IntoIterator` if `U` is the
-//! type of the items in the collection.
+//!   type of the items in the collection.
 
 use std::marker::PhantomData;
diff --git a/xtask/src/test.rs b/xtask/src/test.rs
index 8e0ca111..f4c66d60 100644
--- a/xtask/src/test.rs
+++ b/xtask/src/test.rs
@@ -41,6 +41,10 @@ pub(crate) struct Options {
     /// Space or comma separated list of kernel versions to use in architest/QEMU.
     #[clap(long, default_value = "6.6", value_delimiter = ',')]
     kernel_versions: Vec<String>,
+
+    /// Arguments to pass to the test suite
+    #[clap(name = "args")]
+    test_args: Vec<String>,
 }
 
 fn download_tarball<P: AsRef<Path>>(url: &str, tarball_path: P) -> Result<()>
@@ -105,6 +109,7 @@ fn test_architest(sh: Shell, options: Options, binary_file: &str) -> Result<()>
         target,
         preserve_tempdir,
         kernel_versions,
+        test_args,
         ..
    } = options;
 
@@ -175,7 +180,8 @@ fn test_architest(sh: Shell, options: Options, binary_file: &str) -> Result<()>
     std::thread::sleep(std::time::Duration::from_secs(12));
 
     cmd!(sh, "scp -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -P 3366 {binary_file} root@localhost:/tmp/").run()?;
-    cmd!(sh, "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null root@localhost -p 3366 /tmp/test-suite").run()?;
+    let test_args = test_args.clone();
+    cmd!(sh, "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null root@localhost -p 3366 /tmp/test-suite {test_args...}").run()?;
 
     qemu_process.kill()?;
     qemu_process.wait()?;
@@ -198,6 +204,7 @@ pub(crate) fn run(options: Options) -> Result<()> {
         release,
         features,
         force_architest,
+        test_args,
         ..
     } = &options;
     let mut args = Vec::new();
@@ -225,7 +232,7 @@ pub(crate) fn run(options: Options) -> Result<()> {
     if *force_architest || !target.starts_with(arch) {
         test_architest(sh, options, &binary_file)?;
     } else {
-        cmd!(sh, "sudo -E {binary_file}").run()?;
+        cmd!(sh, "sudo -E {binary_file} {test_args...}").run()?;
    }
 
     Ok(())
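With `test_args` forwarded in both branches, anything passed after the xtask options now reaches the test-suite binary, whether it runs locally under `sudo -E` or inside the architest VM over SSH — e.g. something like `cargo xtask test dns_ipv4`, assuming the test-suite binary accepts a test-name filter as its positional arguments.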