From 7dd2594b918c491f762e4c2d46d2187a1c6c9067 Mon Sep 17 00:00:00 2001 From: Denis Benato Date: Fri, 3 Jan 2025 03:03:36 +0100 Subject: [PATCH 1/4] Improv(main): remove the useless usage of pending Pending is an infinite loop, this means that if run() terminates the program will be stuck in an endless (unresponsive) loop: remove the usage of pending and replace it with a join on run so that if the program exits due to an error systemd can restart it; this also avoids using an unnecessary task. Also avoid losing reference to the spawned Ctrl+C task. --- src/main.rs | 65 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 22 deletions(-) diff --git a/src/main.rs b/src/main.rs index 49043a46..dc305e45 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,5 @@ use std::env; use std::error::Error; -use std::future::pending; use std::process; use zbus::fdo::ObjectManager; use zbus::Connection; @@ -32,17 +31,6 @@ async fn main() -> Result<(), Box> { const VERSION: &str = env!("CARGO_PKG_VERSION"); log::info!("Starting InputPlumber v{}", VERSION); - // Setup CTRL+C handler - tokio::spawn(async move { - tokio::signal::ctrl_c().await.unwrap(); - log::info!("Un-hiding all devices"); - if let Err(e) = unhide_all().await { - log::error!("Unable to un-hide devices: {:?}", e); - } - log::info!("Shutting down"); - process::exit(0); - }); - // Configure the DBus connection let connection = Connection::system().await?; @@ -57,19 +45,52 @@ async fn main() -> Result<(), Box> { // Create an InputManager instance let mut input_manager = Manager::new(connection.clone()); - // Start the input manager and listen on DBus - tokio::spawn(async move { - log::debug!("Starting input manager thread"); - if let Err(e) = input_manager.run().await { - log::error!("Error running input manager: {:?}", e); + let (ctrl_c_result, input_man_result, request_name_result) = tokio::join!( + // Setup CTRL+C handler + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + log::info!("Un-hiding all devices"); + if let Err(e) = unhide_all().await { + log::error!("Unable to un-hide devices: {:?}", e); + } + log::info!("Shutting down"); + process::exit(0); + }), + // Start the input manager and listen on DBus + input_manager.run(), + // Request the named bus + connection.request_name(BUS_NAME) + ); + + match ctrl_c_result { + Ok(_) => { + log::info!("The input manager task has exited"); } - }); + Err(err) => { + log::error!("Error in joining ctrl+C watcher: {err}"); + return Err(Box::new(err) as Box); + } + } - // Request the named bus - connection.request_name(BUS_NAME).await?; + match request_name_result { + Ok(_) => { + log::info!("The input manager task has exited"); + } + Err(err) => { + log::error!("Error in joining dbus request name operation: {err}"); + return Err(Box::new(err)); + } + }; - // Do other things or go to wait forever - pending::<()>().await; + match input_man_result { + Ok(_) => { + log::info!("The input manager task has exited"); + } + Err(err) => { + log::error!("Error in joining ctrl+C watcher: {err}"); + return Err(err); + } + }; log::info!("InputPlumber stopped"); From 7bc921ad2b07ef8ff687d9c1dffbaab44b774c79 Mon Sep 17 00:00:00 2001 From: Denis Benato Date: Fri, 3 Jan 2025 03:36:59 +0100 Subject: [PATCH 2/4] Improv(manager): reorganize code and reduce dependency of UdevDevice::devnode being populated For a future implementation of leds it is important checks of devnode being populated are pushed where devnode is acually used: do that while also splitting up a gigantic method that both leaked a reference to a spaned task and was very hard to work with. --- src/input/manager.rs | 356 ++++++++++++++++++++++--------------------- 1 file changed, 183 insertions(+), 173 deletions(-) diff --git a/src/input/manager.rs b/src/input/manager.rs index 35c0ba13..ecfec90e 100644 --- a/src/input/manager.rs +++ b/src/input/manager.rs @@ -39,6 +39,7 @@ use crate::input::source::iio; use crate::input::target::TargetDevice; use crate::input::target::TargetDeviceTypeId; use crate::udev; +use crate::udev::device::AttributeGetter; use crate::udev::device::UdevDevice; use super::composite_device::client::CompositeDeviceClient; @@ -194,15 +195,7 @@ impl Manager { } } - /// Starts listening for [Command] messages to be sent from clients and - /// dispatch those events. - pub async fn run(&mut self) -> Result<(), Box> { - // Start tasks for discovering new input devices - self.watch_input_devices().await?; - - // Create a DBus interface - self.listen_on_dbus().await?; - + async fn events_loop(&mut self) -> Result<(), Box> { // Loop and listen for command events while let Some(cmd) = self.rx.recv().await { log::debug!("Received command: {:?}", cmd); @@ -296,13 +289,16 @@ impl Manager { self.target_devices.remove(&path); } ManagerCommand::DeviceAdded { device } => { + let dev_name = device.name(); + let dev_sysname = device.sysname(); + if let Err(e) = self.on_device_added(device).await { - log::error!("Error adding device: {e:?}"); + log::error!("Error adding device '{dev_name} ({dev_sysname})': {e}"); } } ManagerCommand::DeviceRemoved { device } => { if let Err(e) = self.on_device_removed(device).await { - log::error!("Error removing device: {e:?}"); + log::error!("Error removing device: {e}"); } } ManagerCommand::SetManageAllDevices(manage_all_devices) => { @@ -399,6 +395,54 @@ impl Manager { Ok(()) } + /// Starts listening for [Command] messages to be sent from clients and + /// dispatch those events. + pub async fn run(&mut self) -> Result<(), Box> { + let tx_for_listen_on_dbus = self.tx.clone(); + let dbus_for_listen_on_dbus = self.dbus.clone(); + + let cmd_tx_all_devices = self.tx.clone(); + let cmd_tx_iio = self.tx.clone(); + let cmd_tx_hidraw = self.tx.clone(); + + // Watch for hidraw/evdev inotify events. + // TODO: when we reload the udev device it triggers the udev watcher. We do this to break + // access to the file descriptor for processes that have already authenticated. Figure out + // a way to do this only using the udev events. + let (watcher_tx, mut watcher_rx) = mpsc::channel(BUFFER_SIZE); + if std::path::Path::new(DEV_PATH).exists() { + let tx = watcher_tx.clone(); + tokio::task::spawn_blocking(move || { + log::info!("Started hidraw device discovery thread"); + watcher::watch(DEV_PATH.into(), tx) + }); + } + if std::path::Path::new(INPUT_PATH).exists() { + let tx = watcher_tx.clone(); + tokio::task::spawn_blocking(move || { + log::info!("Started evdev device discovery thread"); + watcher::watch(INPUT_PATH.into(), tx) + }); + } + + log::debug!("Starting input manager task..."); + + let _ = tokio::join!( + // Discover all (initial) devices + Self::discover_all_devices(&cmd_tx_all_devices), + // Watch for IIO device events + Self::watch_iio_devices(cmd_tx_iio), + // Watch for iio and other devices + Self::watch_hidraw_devices(cmd_tx_hidraw, &mut watcher_rx), + // Create a DBus interface + Self::listen_on_dbus(dbus_for_listen_on_dbus, tx_for_listen_on_dbus), + // Manage events generated by other tasks + self.events_loop() + ); + + Ok(()) + } + /// Create a new [CompositeDevice] from the given [CompositeDeviceConfig] async fn create_composite_device( &mut self, @@ -609,14 +653,14 @@ impl Manager { let tx = self.tx.clone(); tokio::spawn(async move { if let Err(e) = device.run(targets).await { - log::error!("Error running {dbus_path}: {:?}", e); + log::error!("Error running {dbus_path}: {e}"); } log::debug!("Composite device stopped running: {dbus_path}"); if let Err(e) = tx .send(ManagerCommand::CompositeDeviceStopped(dbus_path)) .await { - log::error!("Error sending composite device stopped: {:?}", e); + log::error!("Error sending composite device stopped: {e}"); } }); let comp_path = path.clone(); @@ -1205,7 +1249,8 @@ impl Manager { device: UdevDevice, id: String, ) -> Result<(), Box> { - log::debug!("Source device removed: {}", device.devnode()); + let dev_name = device.name(); + log::debug!("Source device removed: {dev_name}"); let Some(composite_device_path) = self.source_devices_used.get(&id) else { log::debug!("Source device not being managed by a composite device"); return Ok(()); @@ -1240,11 +1285,9 @@ impl Manager { /// Called when a new device is detected by udev async fn on_device_added(&mut self, device: UdevDevice) -> Result<(), Box> { - // We REQUIRE a dev node - let dev_node = device.devnode(); - if dev_node.is_empty() { - return Ok(()); - } + let dev_path = device.devpath(); + let dev_name = device.name(); + let dev_sysname = device.sysname(); let sys_name = device.sysname(); if sys_name.is_empty() { return Ok(()); @@ -1252,7 +1295,7 @@ impl Manager { let sysname = sys_name.clone(); let dev = device.clone(); - log::debug!("Device added: {}", device.devnode()); + log::debug!("Device added: {dev_name} ({dev_sysname}): {dev_path}"); // Get the device subsystem let subsystem = device.subsystem(); @@ -1268,7 +1311,9 @@ impl Manager { // Create a DBus interface for the event device let conn = self.dbus.clone(); let path = evdev::get_dbus_path(sys_name.clone()); - log::debug!("Attempting to listen on dbus for {dev_node} | {sysname}"); + log::debug!( + "Attempting to listen on dbus for {dev_path} | {dev_name} ({dev_sysname})" + ); let dbus_path = path.clone(); task::spawn(async move { @@ -1296,13 +1341,13 @@ impl Manager { // Check to see if the device is virtual if device.is_virtual() { // Look up the connected device using udev - let device_info = udev::get_device(dev_node.clone()).await?; + let device_info = udev::get_device(dev_path.clone()).await?; // Check if the virtual device is using the bluetooth bus // TODO: Can we get properties from UdevDevice::get_attribute_from_tree? let id_bus = device_info.properties.get("ID_BUS"); - log::debug!("Bus ID for {dev_node}: {id_bus:?}"); + log::debug!("Bus ID for {dev_path}: {id_bus:?}"); let is_bluetooth = { if let Some(bus) = id_bus { bus == "bluetooth" @@ -1312,15 +1357,12 @@ impl Manager { }; if !is_bluetooth { - log::debug!("{} is virtual, skipping consideration.", dev_node); + log::debug!("{dev_name} ({dev_sysname}) is virtual, skipping consideration for {dev_path}"); return Ok(()); } - log::debug!( - "{} is a virtual device node for a bluetooth device. Treating as real.", - dev_node - ) + log::debug!("{dev_name} ({dev_sysname}) is a virtual device node for a bluetooth device. Treating as real - {dev_path}") } else { - log::trace!("{} is a real device.", dev_node); + log::trace!("{dev_name} ({dev_sysname}) is a real device - {dev_path}"); } // Signal that a source device was added @@ -1334,7 +1376,7 @@ impl Manager { let conn = self.dbus.clone(); let path = hidraw::get_dbus_path(sys_name.clone()); - log::debug!("Attempting to listen on dbus for {dev_node} | {sysname}"); + log::debug!("Attempting to listen on dbus for {dev_path} | {dev_sysname}"); let dbus_path = path.clone(); task::spawn(async move { let result = SourceUdevDeviceInterface::listen_on_dbus( @@ -1362,7 +1404,7 @@ impl Manager { // Check to see if this virtual device is a bluetooth device let uniq = device.uniq(); if uniq.is_empty() { - log::debug!("{} is virtual, skipping consideration.", dev_node); + log::debug!("{dev_name} ({dev_sysname}) is virtual, skipping consideration for {dev_path}."); return Ok(()); }; @@ -1407,15 +1449,12 @@ impl Manager { } if !matches_bluetooth { - log::debug!("{} is virtual, skipping consideration.", dev_node); + log::debug!("{dev_name} ({dev_sysname}) is virtual, skipping consideration for {dev_path}."); return Ok(()); } - log::debug!( - "{} is a virtual device node for a bluetooth device. Treating as real.", - dev_node - ); + log::debug!("{dev_name} ({dev_sysname}) is a virtual device node for a bluetooth device. Treating as real - {dev_path}"); } else { - log::trace!("{} is a real device.", dev_node); + log::trace!("{dev_name} ({dev_sysname}) is a real device -{dev_path}"); } // Signal that a source device was added @@ -1431,7 +1470,7 @@ impl Manager { let conn = self.dbus.clone(); let path = iio::get_dbus_path(sys_name.clone()); - log::debug!("Attempting to listen on dbus for {dev_node} | {sysname}"); + log::debug!("Attempting to listen on dbus for device {dev_name} ({dev_sysname}) | {dev_path}"); let dbus_path = path.clone(); task::spawn(async move { let result = SourceUdevDeviceInterface::listen_on_dbus( @@ -1457,10 +1496,10 @@ impl Manager { // Check to see if the device is virtual if device.is_virtual() { - log::debug!("{} is virtual, skipping consideration.", dev_node); + log::debug!("{dev_name} ({dev_sysname}) is virtual, skipping consideration for {dev_path}"); return Ok(()); } else { - log::trace!("Real device: {}", dev_node); + log::trace!("Device {dev_name} ({dev_sysname}) is real - {dev_path}"); } // Signal that a source device was added @@ -1478,9 +1517,10 @@ impl Manager { } async fn on_device_removed(&mut self, device: UdevDevice) -> Result<(), Box> { - log::debug!("Device removed: {:?}", device.devnode()); + let dev_name = device.name(); let sys_name = device.sysname(); let subsystem = device.subsystem(); + log::debug!("Device removed: {dev_name} ({sys_name})"); let path = ObjectPath::from_string_unchecked(format!("{BUS_SOURCES_PREFIX}/{sys_name}")); log::debug!("Device dbus path: {path}"); let conn = self.dbus.clone(); @@ -1573,18 +1613,9 @@ impl Manager { } } - /// Starts watching for input devices that are added and removed. - async fn watch_input_devices(&self) -> Result<(), Box> { - log::debug!("Performing initial input device discovery"); - let cmd_tx = self.tx.clone(); - task::spawn(async move { - if let Err(e) = Manager::discover_all_devices(&cmd_tx).await { - log::error!("Failed to perform initial device discovery: {e:?}"); - } - }); - - // Watch for IIO device events. - let cmd_tx = self.tx.clone(); + fn watch_iio_devices( + cmd_tx: mpsc::Sender, + ) -> tokio::task::JoinHandle>> { task::spawn_blocking(move || { let mut monitor = MonitorBuilder::new()?.match_subsystem("iio")?.listen()?; @@ -1601,150 +1632,129 @@ impl Manager { for event in monitor.iter() { let action = event.action().unwrap_or_default(); let device = event.device(); - let Some(path) = device.devnode() else { - log::trace!("No devnode found for device: {device:?}"); - continue; - }; + let dev_name = device.name(); + let dev_sysname = device.sysname().to_string_lossy(); - match action.to_str().unwrap() { + match action.to_string_lossy().trim() { "add" => { - log::debug!("Got udev add action for {path:?}"); + log::debug!( + "Got udev add action for iio device {dev_name} ({dev_sysname})" + ); cmd_tx.blocking_send(ManagerCommand::DeviceAdded { device: device.into(), })?; } "remove" => { - log::debug!("Got udev remove action for {path:?}"); + log::debug!( + "Got udev remove action for iio device {dev_name} ({dev_sysname})" + ); cmd_tx.blocking_send(ManagerCommand::DeviceRemoved { device: device.into(), })?; } - _ => { - log::trace!("Unhandled udev action: {action:?}"); + unhandled_action => { + log::trace!("Unhandled udev action for iio device {dev_name} ({dev_sysname}: {unhandled_action}"); } } } std::thread::sleep(Duration::from_millis(10)); } - #[allow(unreachable_code)] - Ok::<(), Box>(()) - }); - - // Watch for hidraw/evdev inotify events. - // TODO: when we reload the udev device it triggers the udev watcher. We do this to break - // access to the file descriptor for processes that have already authenticated. Figure out - // a way to do this only using the udev events. - let cmd_tx = self.tx.clone(); - let (watcher_tx, mut watcher_rx) = mpsc::channel(BUFFER_SIZE); - if std::path::Path::new(DEV_PATH).exists() { - let tx = watcher_tx.clone(); - tokio::task::spawn_blocking(move || { - log::info!("Started hidraw device discovery thread"); - watcher::watch(DEV_PATH.into(), tx) - }); - } - if std::path::Path::new(INPUT_PATH).exists() { - let tx = watcher_tx.clone(); - tokio::task::spawn_blocking(move || { - log::info!("Started evdev device discovery thread"); - watcher::watch(INPUT_PATH.into(), tx) - }); - } + }) + } - task::spawn(async move { - 'outer: while let Some(event) = watcher_rx.recv().await { - match event { - WatchEvent::Create { name, base_path } => { - let subsystem = { - match base_path.as_str() { - "/dev" => { - if !name.starts_with("hidraw") { - None - } else { - Some("hidraw") - } + async fn watch_hidraw_devices( + cmd_tx: mpsc::Sender, + watcher_rx: &mut mpsc::Receiver, + ) -> () { + 'outer: while let Some(event) = watcher_rx.recv().await { + match event { + WatchEvent::Create { name, base_path } => { + let subsystem = { + match base_path.as_str() { + "/dev" => { + if !name.starts_with("hidraw") { + None + } else { + Some("hidraw") } - "/dev/input" => Some("input"), - - _ => None, } - }; - let Some(subsystem) = subsystem else { - log::trace!("No supported subsystem detected for {base_path}/{name}"); - continue; - }; + "/dev/input" => Some("input"), - // Wait until the device has initialized with udev - const MAX_TRIES: u8 = 80; - let mut attempt: u8 = 0; - loop { - // Break after max attempts reached - if attempt > MAX_TRIES { - log::warn!("Unable to create initialized UdevDevice for {base_path}/{name} after {MAX_TRIES} attempts."); - continue 'outer; - } + _ => None, + } + }; + let Some(subsystem) = subsystem else { + log::trace!("No supported subsystem detected for {base_path}/{name}"); + continue; + }; - // Try to get the device from udev to check its initialization state - { - let Ok(device) = ::udev::Device::from_subsystem_sysname( - subsystem.to_string(), - name.clone(), - ) else { - log::debug!( - "Unable to create UdevDevice from {base_path}/{name} to check initialization" - ); - attempt += 1; - tokio::time::sleep(Duration::from_millis(10)).await; - continue; - }; + // Wait until the device has initialized with udev + const MAX_TRIES: u8 = 80; + let mut attempt: u8 = 0; + loop { + // Break after max attempts reached + if attempt > MAX_TRIES { + log::warn!("Unable to create initialized UdevDevice for {base_path}/{name} after {MAX_TRIES} attempts."); + continue 'outer; + } - if device.is_initialized() { - break; - } + // Try to get the device from udev to check its initialization state + { + let Ok(device) = ::udev::Device::from_subsystem_sysname( + subsystem.to_string(), + name.clone(), + ) else { + log::debug!( + "Unable to create UdevDevice from {base_path}/{name} to check initialization" + ); + attempt += 1; + tokio::time::sleep(Duration::from_millis(10)).await; + continue; }; - log::trace!("{base_path}/{name} is not yet initialized by udev"); - - tokio::time::sleep(Duration::from_millis(10)).await; - attempt += 1; - } - // Create a udev device for the device - let Ok(device) = ::udev::Device::from_subsystem_sysname( - subsystem.to_string(), - name.clone(), - ) else { - log::warn!("Unable to create UdevDevice from {base_path}/{name}"); - continue; + if device.is_initialized() { + break; + } }; + log::trace!("{base_path}/{name} is not yet initialized by udev"); - // Notify the manager that a device was added - log::debug!("Got inotify add action for {base_path}/{name}"); - let result = cmd_tx - .send(ManagerCommand::DeviceAdded { - device: device.into(), - }) - .await; - if let Err(e) = result { - log::error!("Unable to send command: {:?}", e); - } + tokio::time::sleep(Duration::from_millis(10)).await; + attempt += 1; } - WatchEvent::Delete { name, base_path } => { - let device = UdevDevice::from_devnode(base_path.as_str(), name.as_str()); - log::debug!("Got inotify remove action for {base_path}/{name}"); - let result = cmd_tx.send(ManagerCommand::DeviceRemoved { device }).await; - if let Err(e) = result { - log::error!("Unable to send command: {:?}", e); - } + + // Create a udev device for the device + let Ok(device) = + ::udev::Device::from_subsystem_sysname(subsystem.to_string(), name.clone()) + else { + log::warn!("Unable to create UdevDevice from {base_path}/{name}"); + continue; + }; + + // Notify the manager that a device was added + log::debug!("Got inotify add action for {base_path}/{name}"); + let result = cmd_tx + .send(ManagerCommand::DeviceAdded { + device: device.into(), + }) + .await; + if let Err(e) = result { + log::error!("Unable to send command: {:?}", e); } - WatchEvent::Modify { - name: _, - base_path: _, - } => (), } + WatchEvent::Delete { name, base_path } => { + let device = UdevDevice::from_devnode(base_path.as_str(), name.as_str()); + log::debug!("Got inotify remove action for {base_path}/{name}"); + let result = cmd_tx.send(ManagerCommand::DeviceRemoved { device }).await; + if let Err(e) = result { + log::error!("Unable to send command: {:?}", e); + } + } + WatchEvent::Modify { + name: _, + base_path: _, + } => (), } - }); - - Ok(()) + } } /// Performs initial input device discovery of all supported subsystems @@ -1879,17 +1889,17 @@ impl Manager { } /// Creates a DBus object - async fn listen_on_dbus(&self) -> Result<(), Box> { - let iface = ManagerInterface::new(self.tx.clone()); + async fn listen_on_dbus( + dbus: Connection, + tx: mpsc::Sender, + ) -> tokio::task::JoinHandle<()> { + let iface = ManagerInterface::new(tx); let manager_path = format!("{}/Manager", BUS_PREFIX); - let dbus = self.dbus.clone(); task::spawn(async move { if let Err(e) = dbus.object_server().at(manager_path, iface).await { log::error!("Failed create manager dbus interface: {e:?}"); } - }); - - Ok(()) + }) } async fn add_device_to_composite_device( From d02c15c810f37aafd656c15a05029dee031b10f5 Mon Sep 17 00:00:00 2001 From: Denis Benato Date: Mon, 30 Dec 2024 23:36:53 +0100 Subject: [PATCH 3/4] Improv(manager): check for devnode() validity for device types that actually requires it In order to implement LED(s) handling and potentially other device types that don't have a node in /dev early checks for devnode() being valid must be dropped. --- src/input/manager.rs | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/input/manager.rs b/src/input/manager.rs index ecfec90e..269ac76c 100644 --- a/src/input/manager.rs +++ b/src/input/manager.rs @@ -1290,6 +1290,7 @@ impl Manager { let dev_sysname = device.sysname(); let sys_name = device.sysname(); if sys_name.is_empty() { + log::debug!("Device discarded for missing sysname: {dev_name} at {dev_path}"); return Ok(()); } let sysname = sys_name.clone(); @@ -1306,7 +1307,12 @@ impl Manager { // Create a DBus interface depending on the device subsystem match subsystem.as_str() { "input" => { - log::debug!("Event device added"); + if device.devnode().is_empty() { + log::debug!("Event device discarded for missing devnode: {dev_name} ({dev_sysname}) at {dev_path}"); + return Ok(()); + } + + log::debug!("Event device added: {dev_name} ({dev_sysname})"); // Create a DBus interface for the event device let conn = self.dbus.clone(); @@ -1371,7 +1377,13 @@ impl Manager { log::debug!("Finished adding {id}"); } "hidraw" => { - log::debug!("hidraw device added"); + if device.devnode().is_empty() { + log::debug!("hidraw device discarded for missing devnode: {dev_name} ({dev_sysname}) at {dev_path}"); + return Ok(()); + } + + log::debug!("hidraw device added: {dev_name} ({dev_sysname})"); + // Create a DBus interface for the event device let conn = self.dbus.clone(); let path = hidraw::get_dbus_path(sys_name.clone()); @@ -1464,7 +1476,12 @@ impl Manager { } "iio" => { - log::debug!("iio device added"); + if device.devnode().is_empty() { + log::warn!("iio device discarded for missing devnode: {dev_name} ({dev_sysname}) at {dev_path}"); + return Ok(()); + } + + log::debug!("iio device added: {} ({})", device.name(), device.sysname()); // Create a DBus interface for the event device let conn = self.dbus.clone(); From 9134c8890c04e47df238c533a65350767164d072 Mon Sep 17 00:00:00 2001 From: Denis Benato Date: Tue, 31 Dec 2024 03:38:52 +0100 Subject: [PATCH 4/4] Improv(manager): remove duplicate code with minimal specialization in the manager component This makes it easier in the future to add new device types suc as led devices. --- src/input/manager.rs | 587 +++++++++---------------------------------- 1 file changed, 125 insertions(+), 462 deletions(-) diff --git a/src/input/manager.rs b/src/input/manager.rs index 269ac76c..217862ef 100644 --- a/src/input/manager.rs +++ b/src/input/manager.rs @@ -195,6 +195,47 @@ impl Manager { } } + /// Starts listening for [Command] messages to be sent from clients and + /// dispatch those events. + pub async fn run(&mut self) -> Result<(), Box> { + let dbus_for_listen_on_dbus = self.dbus.clone(); + + let cmd_tx_all_devices = self.tx.clone(); + + // Watch for hidraw/evdev inotify events. + // TODO: when we reload the udev device it triggers the udev watcher. We do this to break + // access to the file descriptor for processes that have already authenticated. Figure out + // a way to do this only using the udev events. + let (watcher_tx, mut watcher_rx) = mpsc::channel(BUFFER_SIZE); + if std::path::Path::new(DEV_PATH).exists() { + let tx = watcher_tx.clone(); + tokio::task::spawn_blocking(move || { + log::info!("Started hidraw device discovery thread"); + watcher::watch(DEV_PATH.into(), tx) + }); + } + if std::path::Path::new(INPUT_PATH).exists() { + let tx = watcher_tx.clone(); + tokio::task::spawn_blocking(move || { + log::info!("Started evdev device discovery thread"); + watcher::watch(INPUT_PATH.into(), tx) + }); + } + + log::debug!("Starting input manager task..."); + + let _ = tokio::join!( + Self::discover_all_devices(&cmd_tx_all_devices), + Self::watch_iio_devices(self.tx.clone()), + Self::watch_devnodes(self.tx.clone(), &mut watcher_rx), + Self::listen_on_dbus(dbus_for_listen_on_dbus, self.tx.clone()), + self.events_loop() + ); + + Ok(()) + } + + /// Manage events generated by various components async fn events_loop(&mut self) -> Result<(), Box> { // Loop and listen for command events while let Some(cmd) = self.rx.recv().await { @@ -395,54 +436,6 @@ impl Manager { Ok(()) } - /// Starts listening for [Command] messages to be sent from clients and - /// dispatch those events. - pub async fn run(&mut self) -> Result<(), Box> { - let tx_for_listen_on_dbus = self.tx.clone(); - let dbus_for_listen_on_dbus = self.dbus.clone(); - - let cmd_tx_all_devices = self.tx.clone(); - let cmd_tx_iio = self.tx.clone(); - let cmd_tx_hidraw = self.tx.clone(); - - // Watch for hidraw/evdev inotify events. - // TODO: when we reload the udev device it triggers the udev watcher. We do this to break - // access to the file descriptor for processes that have already authenticated. Figure out - // a way to do this only using the udev events. - let (watcher_tx, mut watcher_rx) = mpsc::channel(BUFFER_SIZE); - if std::path::Path::new(DEV_PATH).exists() { - let tx = watcher_tx.clone(); - tokio::task::spawn_blocking(move || { - log::info!("Started hidraw device discovery thread"); - watcher::watch(DEV_PATH.into(), tx) - }); - } - if std::path::Path::new(INPUT_PATH).exists() { - let tx = watcher_tx.clone(); - tokio::task::spawn_blocking(move || { - log::info!("Started evdev device discovery thread"); - watcher::watch(INPUT_PATH.into(), tx) - }); - } - - log::debug!("Starting input manager task..."); - - let _ = tokio::join!( - // Discover all (initial) devices - Self::discover_all_devices(&cmd_tx_all_devices), - // Watch for IIO device events - Self::watch_iio_devices(cmd_tx_iio), - // Watch for iio and other devices - Self::watch_hidraw_devices(cmd_tx_hidraw, &mut watcher_rx), - // Create a DBus interface - Self::listen_on_dbus(dbus_for_listen_on_dbus, tx_for_listen_on_dbus), - // Manage events generated by other tasks - self.events_loop() - ); - - Ok(()) - } - /// Create a new [CompositeDevice] from the given [CompositeDeviceConfig] async fn create_composite_device( &mut self, @@ -762,300 +755,88 @@ impl Manager { // If the CompositeDevice only allows a maximum number of source devices, // check to see if that limit has been reached. If that limit is reached, // then a new CompositeDevice will be created for the source device. - if let Some(max_sources) = config.maximum_sources { - // If maximum_sources is less than 1 (e.g. 0, -1) then consider - // the maximum to be 'unlimited'. - if max_sources > 0 { - // Check to see how many source devices this composite device is - // currently managing. - if let Some(sources) = self.composite_device_sources.get(composite_device) { - let sources_count = sources.len() as i32; - if sources_count >= max_sources { - log::trace!( - "{composite_device:?} maximum source devices reached: {max_sources}. Skipping." - ); - continue; - } - } + // If maximum_sources is less than 1 (e.g. 0, -1) then consider + // the maximum to be 'unlimited'. + if let Some(max_sources) = config + .maximum_sources + .filter(|max_sources| *max_sources > 0) + { + // Check to see how many source devices this composite device is + // currently managing. + if self + .composite_device_sources + .get(composite_device) + .map_or(false, |sources| (sources.len() as i32) >= max_sources) + { + log::trace!( + "{composite_device:?} maximum source devices reached: {max_sources}. Skipping." + ); + continue; } } // Check if this device matches any source udev configs of the running // CompositeDevice. - for source_device in config.source_devices.iter() { - log::trace!("Checking if existing composite device is missing udev device {id}"); - let Some(udev_config) = source_device.udev.as_ref() else { - continue; - }; - if !config.has_matching_udev(&device, udev_config) { - continue; - } - - // Check if the device has already been used in this config or not, - // stop here if the device must be unique. - if let Some(sources) = self.composite_device_sources.get(composite_device) { - for source in sources { - if source != source_device { - continue; - } - if let Some(ignored) = source_device.ignore { - if ignored { - log::debug!( - "Ignoring device {:?}, not adding to composite device: {}", - source_device, - composite_device - ); - break 'start; - } - } - if let Some(unique) = source_device.clone().unique { - if unique { - log::trace!( - "Found unique device {:?}, not adding to composite device {}", - source_device, - composite_device - ); - break 'start; - } - // Default to being unique - } else { - log::trace!( - "Found unique device {:?}, not adding to composite device {}", - source_device, - composite_device - ); - break 'start; - } - } - } - - log::info!("Found missing device, adding source device {id:?} to existing composite device: {composite_device:?}"); - let client = self.composite_devices.get(composite_device.as_str()); - if client.is_none() { - log::error!("No existing composite device found for key {composite_device:?}"); - continue; - } - self.add_device_to_composite_device(device, client.unwrap()) - .await?; - self.source_devices_used - .insert(id.clone(), composite_device.clone()); - let composite_id = composite_device.clone(); - if !self.composite_device_sources.contains_key(&composite_id) { - self.composite_device_sources - .insert(composite_id.clone(), Vec::new()); - } - let sources = self - .composite_device_sources - .get_mut(&composite_id) - .unwrap(); - sources.push(source_device.clone()); - self.source_devices.insert(id, source_device.clone()); - - return Ok(()); - } - - // TODO: Consolidate these - match device.subsystem().as_str() { - "input" => { - log::trace!( - "Checking if existing composite device is missing evdev device: {:?}", - device.name() - ); - for source_device in config.source_devices.iter() { - let Some(evdev_config) = source_device.evdev.as_ref() else { - log::trace!("Evdev section is empty"); - continue; - }; - if !config.has_matching_evdev(&device, evdev_config) { - continue; - } - - // Check if the device has already been used in this config or not, stop here if the device must be unique. - if let Some(sources) = self.composite_device_sources.get(composite_device) { - for source in sources { - if source != source_device { - continue; - } - if let Some(ignored) = source_device.ignore { - if ignored { - log::debug!("Ignoring device {:?}, not adding to composite device: {}", source_device, composite_device); - break 'start; - } - } - if let Some(unique) = source_device.clone().unique { - if unique { - log::trace!("Found unique device {:?}, not adding to composite device {}", source_device, composite_device); - break 'start; - } - // Default to being unique - } else { - log::trace!("Found unique device {:?}, not adding to composite device {}", source_device, composite_device); - break 'start; - } - } - } + let Some(source_device) = config.get_matching_device(&device) else { + log::trace!( + "Device {id} does not match existing device: {:?}", + config.name + ); - log::info!("Found missing device, adding source device {id:?} to existing composite device: {composite_device:?}"); - let client = self.composite_devices.get(composite_device.as_str()); - if client.is_none() { - log::error!( - "No existing composite device found for key {composite_device:?}" - ); - continue; - } - self.add_device_to_composite_device(device, client.unwrap()) - .await?; - self.source_devices_used - .insert(id.clone(), composite_device.clone()); - let composite_id = composite_device.clone(); - if !self.composite_device_sources.contains_key(&composite_id) { - self.composite_device_sources - .insert(composite_id.clone(), Vec::new()); - } - let sources = self - .composite_device_sources - .get_mut(&composite_id) - .unwrap(); - sources.push(source_device.clone()); - self.source_devices.insert(id, source_device.clone()); + continue; + }; - return Ok(()); + // Check if the device has already been used in this config or not, + // stop here if the device must be unique. + if let Some(sources) = self.composite_device_sources.get(composite_device) { + for source in sources { + if *source != source_device { + continue; } - } - "hidraw" => { - log::trace!( - "Checking if existing composite device is missing hidraw device: {:?}", - device.name() - ); - for source_device in config.source_devices.iter() { - let Some(hidraw_config) = source_device.hidraw.as_ref() else { - continue; - }; - if !config.has_matching_hidraw(&device, hidraw_config) { - continue; - } - // Check if the device has already been used in this config or not, stop here if the device must be unique. - if let Some(sources) = self.composite_device_sources.get(composite_device) { - for source in sources { - if source != source_device { - continue; - } - if let Some(ignored) = source_device.ignore { - if ignored { - log::debug!("Ignoring device {:?}, not adding to composite device: {}", source_device, composite_device); - break 'start; - } - } - if let Some(unique) = source_device.clone().unique { - if unique { - log::trace!("Found unique device {:?}, not adding to composite device {}", source_device, composite_device); - break 'start; - } - } else { - log::trace!("Found unique device {:?}, not adding to composite device {}", source_device, composite_device); - break 'start; - } - } - } - - log::info!("Found missing device, adding source device {id} to existing composite device: {composite_device}"); - let handle = self.composite_devices.get(composite_device.as_str()); - if handle.is_none() { - log::error!( - "No existing composite device found for key {}", - composite_device.as_str() - ); - continue; - } - self.add_device_to_composite_device(device, handle.unwrap()) - .await?; - self.source_devices_used - .insert(id.clone(), composite_device.clone()); - let composite_id = composite_device.clone(); - if !self.composite_device_sources.contains_key(&composite_id) { - self.composite_device_sources - .insert(composite_id.clone(), Vec::new()); - } - let sources = self - .composite_device_sources - .get_mut(&composite_id) - .unwrap(); - sources.push(source_device.clone()); + if source_device.ignore.map_or(false, |ignored| ignored) { + log::debug!( + "Ignoring device {:?}, not adding to composite device: {composite_device}", + source_device + ); + break 'start; + } - self.source_devices.insert(id, source_device.clone()); - return Ok(()); + // Check if the composite device has to be unique (default to being unique) + if source_device.unique.map_or(true, |unique| unique) { + log::trace!( + "Found unique device {:?}, not adding to composite device {composite_device}", + source_device + ); + break 'start; } } - "iio" => { - log::trace!("Checking if existing composite device is missing iio device"); - for source_device in config.source_devices.iter() { - let Some(iio_config) = source_device.iio.as_ref() else { - continue; - }; - if !config.has_matching_iio(&device, iio_config) { - continue; - } + } - // Check if the device has already been used in this config or not, stop here if the device must be unique. - if let Some(sources) = self.composite_device_sources.get(composite_device) { - for source in sources { - if source != source_device { - continue; - } - if let Some(ignored) = source_device.ignore { - if ignored { - log::debug!("Ignoring device {:?}, not adding to composite device: {}", source_device, composite_device); - continue; - } - } - if let Some(unique) = source_device.clone().unique { - if unique { - log::trace!("Found unique device {:?}, not adding to composite device {}", source_device, composite_device); - break 'start; - } - } else { - log::trace!("Found unique device {:?}, not adding to composite device {}", source_device, composite_device); - break 'start; - } - } - } + log::info!("Found missing {} device, adding source device {id} to existing composite device: {composite_device:?}", device.subsystem()); + let Some(client) = self.composite_devices.get(composite_device.as_str()) else { + log::error!("No existing composite device found for key {composite_device:?}"); + continue; + }; - log::info!("Found missing device, adding source device {id} to existing composite device: {composite_device}"); - let handle = self.composite_devices.get(composite_device.as_str()); - if handle.is_none() { - log::error!( - "No existing composite device found for key {}", - composite_device.as_str() - ); - continue; - } - self.add_device_to_composite_device(device, handle.unwrap()) - .await?; - self.source_devices_used - .insert(id.clone(), composite_device.clone()); - let composite_id = composite_device.clone(); - if !self.composite_device_sources.contains_key(&composite_id) { - self.composite_device_sources - .insert(composite_id.clone(), Vec::new()); - } - let sources = self - .composite_device_sources - .get_mut(&composite_id) - .unwrap(); - sources.push(source_device.clone()); + self.add_device_to_composite_device(device, client).await?; + self.source_devices_used + .insert(id.clone(), composite_device.clone()); + let composite_id = composite_device.clone(); + if !self.composite_device_sources.contains_key(&composite_id) { + self.composite_device_sources + .insert(composite_id.clone(), Vec::new()); + } + let sources = self + .composite_device_sources + .get_mut(&composite_id) + .unwrap(); + sources.push(source_device.clone()); + self.source_devices.insert(id, source_device.clone()); - self.source_devices.insert(id, source_device.clone()); - return Ok(()); - } - } - _ => (), - }; - log::trace!( - "Device {id} does not match existing device: {:?}", - config.name - ); + return Ok(()); } + log::debug!("No existing composite device matches device {id}."); // Check all CompositeDevice configs to see if this device creates @@ -1066,13 +847,11 @@ impl Manager { log::trace!("Checking config {:?} for device", config.name); // Check to see if 'auto_manage' is enabled for this config. - let auto_manage = { - if let Some(options) = config.options.as_ref() { - options.auto_manage.unwrap_or(false) - } else { - false - } - }; + let auto_manage = config + .options + .as_ref() + .map(|options| options.auto_manage.unwrap_or(false)) + .unwrap_or(false); if !self.manage_all_devices && !auto_manage { log::trace!( "Config {:?} does not have 'auto_manage' option enabled. Skipping.", @@ -1087,22 +866,18 @@ impl Manager { continue; } - // Check if this device matches any source udev configs - for source_device in config.source_devices.iter() { - let Some(udev_config) = source_device.udev.as_ref() else { - continue; - }; - if !config.has_matching_udev(&device, udev_config) { - continue; - } - + // Check if this device matches any source configs + if let Some(source_device) = config.get_matching_device(&device) { if let Some(ignored) = source_device.ignore { if ignored { log::trace!("Event device configured to ignore: {:?}", device); return Ok(()); } } - log::info!("Found a matching udev device {id}, creating CompositeDevice"); + log::info!( + "Found a matching {} device {id}, creating CompositeDevice", + device.subsystem() + ); let dev = self .create_composite_device_from_config(&config, device) .await?; @@ -1122,120 +897,6 @@ impl Manager { return Ok(()); } - let source_devices = config.source_devices.clone(); - match device.subsystem().as_str() { - "input" => { - for source_device in source_devices { - if source_device.evdev.is_none() { - continue; - } - // how to refrence source devices used by this config? - - if config.has_matching_evdev(&device, &source_device.clone().evdev.unwrap()) - { - if let Some(ignored) = source_device.ignore { - if ignored { - log::trace!("Event device configured to ignore: {:?}", device); - return Ok(()); - } - } - log::info!( - "Found a matching evdev device {id}, creating CompositeDevice" - ); - let dev = self - .create_composite_device_from_config(&config, device) - .await?; - - // Get the target input devices from the config - let target_devices_config = config.target_devices.clone(); - - // Create the composite deivce - self.start_composite_device( - dev, - config, - target_devices_config, - source_device.clone(), - ) - .await?; - - return Ok(()); - } - } - } - "hidraw" => { - for source_device in source_devices { - if source_device.hidraw.is_none() { - continue; - } - if config - .has_matching_hidraw(&device, &source_device.clone().hidraw.unwrap()) - { - if let Some(ignored) = source_device.ignore { - if ignored { - log::trace!("hidraw device configured to ignore: {:?}", device); - return Ok(()); - } - } - log::info!( - "Found a matching hidraw device {id}, creating CompositeDevice" - ); - let dev = self - .create_composite_device_from_config(&config, device) - .await?; - - // Get the target input devices from the config - let target_devices_config = config.target_devices.clone(); - - // Create the composite deivce - self.start_composite_device( - dev, - config, - target_devices_config, - source_device.clone(), - ) - .await?; - - return Ok(()); - } - } - } - "iio" => { - for source_device in source_devices { - if source_device.iio.is_none() { - continue; - } - if config.has_matching_iio(&device, &source_device.clone().iio.unwrap()) { - if let Some(ignored) = source_device.ignore { - if ignored { - log::trace!("iio device configured to ignore: {:?}", device); - return Ok(()); - } - } - log::info!( - "Found a matching iio device {id}, creating CompositeDevice" - ); - let dev = self - .create_composite_device_from_config(&config, device.clone()) - .await?; - - // Get the target input devices from the config - let target_devices_config = config.target_devices.clone(); - - // Create the composite deivce - self.start_composite_device( - dev, - config, - target_devices_config, - source_device.clone(), - ) - .await?; - - return Ok(()); - } - } - } - _ => (), - } log::trace!("Device does not match config: {:?}", config.name); } log::debug!("No unused configs found for device."); @@ -1630,6 +1291,7 @@ impl Manager { } } + /// Watch for IIO device events fn watch_iio_devices( cmd_tx: mpsc::Sender, ) -> tokio::task::JoinHandle>> { @@ -1679,10 +1341,11 @@ impl Manager { }) } - async fn watch_hidraw_devices( + /// Watch for appearance and disappearence of devices is /dev and associate the corresponding udev device + async fn watch_devnodes( cmd_tx: mpsc::Sender, watcher_rx: &mut mpsc::Receiver, - ) -> () { + ) { 'outer: while let Some(event) = watcher_rx.recv().await { match event { WatchEvent::Create { name, base_path } => { @@ -1725,7 +1388,7 @@ impl Manager { "Unable to create UdevDevice from {base_path}/{name} to check initialization" ); attempt += 1; - tokio::time::sleep(Duration::from_millis(10)).await; + tokio::time::sleep(Duration::from_millis(50)).await; continue; }; @@ -1735,7 +1398,7 @@ impl Manager { }; log::trace!("{base_path}/{name} is not yet initialized by udev"); - tokio::time::sleep(Duration::from_millis(10)).await; + tokio::time::sleep(Duration::from_millis(50)).await; attempt += 1; } @@ -1905,7 +1568,7 @@ impl Manager { result.unwrap_or_default() } - /// Creates a DBus object + /// Creates a DBus object and return the (active) handle to the listener async fn listen_on_dbus( dbus: Connection, tx: mpsc::Sender,