Skip to content

Commit

Permalink
dbus: make dbus path parth of a CompositeDevice at construction
Browse files Browse the repository at this point in the history
DBus path cannot be None as part of the code expects it and it will call .unwrap() on the option: construct a CompositeDevice with that value in place and ready for every other subsequent usage.
  • Loading branch information
NeroReflex committed Dec 31, 2024
1 parent fe4a01c commit add970e
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 89 deletions.
84 changes: 33 additions & 51 deletions src/input/composite_device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use std::{
};

use evdev::InputEvent;
use tokio::{sync::mpsc, task::JoinSet, time::Duration};
use tokio::{
sync::mpsc,
task::{JoinHandle, JoinSet},
time::Duration,
};
use zbus::Connection;

use crate::{
Expand Down Expand Up @@ -96,7 +100,7 @@ pub struct CompositeDevice {
/// release events
emitted_mappings: HashMap<String, CapabilityMapping>,
/// The DBus path this [CompositeDevice] is listening on
dbus_path: Option<String>,
dbus_path: String,
/// Mode defining how inputs should be routed
intercept_mode: InterceptMode,
/// Transmit channel for sending commands to this composite device
Expand Down Expand Up @@ -159,6 +163,7 @@ impl CompositeDevice {
manager: mpsc::Sender<ManagerCommand>,
config: CompositeDeviceConfig,
device_info: UdevDevice,
dbus_path: String,
capability_map: Option<CapabilityMap>,
) -> Result<Self, Box<dyn Error>> {
log::info!("Creating CompositeDevice with config: {}", config.name);
Expand All @@ -177,7 +182,7 @@ impl CompositeDevice {
translatable_active_inputs: Vec::new(),
translated_recent_events: HashSet::new(),
emitted_mappings: HashMap::new(),
dbus_path: None,
dbus_path,
intercept_mode: InterceptMode::None,
tx,
rx,
Expand Down Expand Up @@ -233,22 +238,25 @@ impl CompositeDevice {
Ok(device)
}

/// Return the DBus path of the composite device
pub fn dbus_path(&self) -> String {
self.dbus_path.clone()
}

/// Creates a new instance of the composite device interface on DBus.
pub async fn listen_on_dbus(&mut self, path: String) -> Result<(), Box<dyn Error>> {
pub async fn listen_on_dbus(&self) -> Result<JoinHandle<()>, Box<dyn Error>> {
let conn = self.conn.clone();
let client = self.client();
self.dbus_path = Some(path.clone());
tokio::spawn(async move {
let path = self.dbus_path();
Ok(tokio::spawn(async move {
log::debug!("Starting dbus interface: {path}");
let iface = CompositeDeviceInterface::new(client);
if let Err(e) = conn.object_server().at(path.clone(), iface).await {
log::debug!("Failed to start dbus interface {path}: {e:?}");
} else {
log::debug!("Started dbus interface: {path}");
log::debug!("Started listening on dbus interface: {path}");
}
});
log::info!("Started listening on {}", self.dbus_path.as_ref().unwrap());
Ok(())
}))
}

/// Starts the [CompositeDevice] and listens for events from all source
Expand All @@ -259,6 +267,8 @@ impl CompositeDevice {
) -> Result<(), Box<dyn Error>> {
log::debug!("Starting composite device");

let dbus_path = self.dbus_path.clone();

// Start all source devices
self.run_source_devices().await?;

Expand Down Expand Up @@ -377,8 +387,7 @@ impl CompositeDevice {
}
if self.source_devices_used.is_empty() {
log::debug!(
"No source devices remain. Stopping CompositeDevice {:?}",
self.dbus_path
"No source devices remain. Stopping CompositeDevice {dbus_path}"
);
break 'main;
}
Expand Down Expand Up @@ -480,27 +489,18 @@ impl CompositeDevice {
self.set_intercept_activation(activation_caps, target_cap)
}
CompositeCommand::Stop => {
log::debug!(
"Got STOP signal. Stopping CompositeDevice: {:?}",
self.dbus_path
);
log::debug!("Got STOP signal. Stopping CompositeDevice: {dbus_path}");
break 'main;
}
CompositeCommand::Suspend(sender) => {
log::info!(
"Preparing for system suspend for: {}",
self.dbus_path.as_ref().unwrap_or(&"".to_string())
);
log::info!("Preparing for system suspend for: {dbus_path}");
self.handle_suspend().await;
if let Err(e) = sender.send(()).await {
log::error!("Failed to send suspend response: {e:?}");
}
}
CompositeCommand::Resume(sender) => {
log::info!(
"Preparing for system resume for: {}",
self.dbus_path.as_ref().unwrap_or(&"".to_string())
);
log::info!("Preparing for system resume for: {dbus_path}");
self.handle_resume().await;
if let Err(e) = sender.send(()).await {
log::error!("Failed to send resume response: {e:?}");
Expand All @@ -512,17 +512,11 @@ impl CompositeDevice {
// If no source devices remain after processing the queue, stop
// the device.
if devices_removed && self.source_devices_used.is_empty() {
log::debug!(
"No source devices remain. Stopping CompositeDevice {:?}",
self.dbus_path
);
log::debug!("No source devices remain. Stopping CompositeDevice {dbus_path}");
break 'main;
}
}
log::info!(
"CompositeDevice stopping: {}",
self.dbus_path.as_ref().unwrap()
);
log::info!("CompositeDevice stopping: {dbus_path}");

// Stop all target devices
log::debug!("Stopping target devices");
Expand Down Expand Up @@ -565,10 +559,7 @@ impl CompositeDevice {
res?;
}

log::info!(
"CompositeDevice stopped: {}",
self.dbus_path.as_ref().unwrap()
);
log::info!("CompositeDevice stopped: {dbus_path}");

Ok(())
}
Expand Down Expand Up @@ -1816,9 +1807,7 @@ impl CompositeDevice {
}
}

let Some(composite_path) = self.dbus_path.clone() else {
return Err("No composite device DBus path found".into());
};
let composite_path = self.dbus_path.clone();

// Create new target devices using the input manager
for kind in device_types_to_start {
Expand Down Expand Up @@ -1925,6 +1914,8 @@ impl CompositeDevice {
&mut self,
targets: HashMap<String, TargetDeviceClient>,
) -> Result<(), Box<dyn Error>> {
let dbus_path = self.dbus_path.clone();

// Keep track of all target devices
for (path, target) in targets.into_iter() {
log::debug!("Attaching target device: {path}");
Expand All @@ -1933,10 +1924,7 @@ impl CompositeDevice {
format!("Failed to set composite device for target device: {:?}", e).into(),
);
}
log::debug!(
"Attached device {path} to {:?}",
self.dbus_path.as_ref().unwrap_or(&"".to_string())
);
log::debug!("Attached device {path} to {dbus_path}");

// Query the target device for its capabilities
let caps = match target.get_capabilities().await {
Expand Down Expand Up @@ -1972,10 +1960,7 @@ impl CompositeDevice {

/// Emit a DBus signal when target devices change
async fn signal_targets_changed(&self) {
let Some(dbus_path) = self.dbus_path.clone() else {
log::error!("No DBus path for composite device exists to emit signal!");
return;
};
let dbus_path = self.dbus_path.clone();
let conn = self.conn.clone();

tokio::task::spawn(async move {
Expand Down Expand Up @@ -2007,10 +1992,7 @@ impl CompositeDevice {

/// Emit a DBus signal when source devices change
async fn signal_sources_changed(&self) {
let Some(dbus_path) = self.dbus_path.clone() else {
log::error!("No DBus path for composite device exists to emit signal!");
return;
};
let dbus_path = self.dbus_path.clone();
let conn = self.conn.clone();

tokio::task::spawn(async move {
Expand Down
67 changes: 29 additions & 38 deletions src/input/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use mio::{Events, Interest, Poll, Token};
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::task;
use tokio::task::JoinHandle;
use zbus::fdo::ManagedObjects;
use zbus::zvariant::ObjectPath;
use zbus::Connection;
Expand Down Expand Up @@ -483,6 +484,7 @@ impl Manager {
self.tx.clone(),
config,
device,
self.next_composite_dbus_path()?,
capability_map,
)?;

Expand Down Expand Up @@ -596,22 +598,19 @@ impl Manager {
config: CompositeDeviceConfig,
target_types: Option<Vec<String>>,
source_device: SourceDevice,
) -> Result<(), Box<dyn Error>> {
// Generate the DBus tree path for this composite device
let path = self.next_composite_dbus_path();

) -> Result<JoinHandle<()>, Box<dyn Error>> {
// Keep track of the source devices that this composite device is
// using.
let source_device_ids = device.get_source_devices_used();
let composite_path = device.dbus_path();
log::debug!(
"Starting CompositeDevice at {path} with the following sources: {source_device_ids:?}"
"Starting CompositeDevice at {composite_path} with the following sources: {source_device_ids:?}"
);
for id in source_device_ids {
self.source_devices_used.insert(id.clone(), path.clone());
self.source_devices_used.insert(id.clone(), composite_path.clone());
self.source_devices.insert(id, source_device.clone());
}

let composite_path = path.clone();
if !self.composite_device_sources.contains_key(&composite_path) {
self.composite_device_sources
.insert(composite_path.clone(), Vec::new());
Expand All @@ -622,8 +621,7 @@ impl Manager {
.unwrap();
sources.push(source_device);

// Create a DBus interface for the device
device.listen_on_dbus(path.clone()).await?;
device.listen_on_dbus().await?;

// Get a handle to the device
let client = device.client();
Expand All @@ -632,7 +630,7 @@ impl Manager {
let mut target_device_paths = Vec::new();

// Create a DBus target device
log::debug!("Creating target devices for {path}");
log::debug!("Creating target devices for {composite_path}");
let dbus_device = self.create_target_device("dbus").await?;
let dbus_devices = self.start_target_devices(vec![dbus_device]).await?;
let dbus_paths = dbus_devices.keys();
Expand All @@ -658,32 +656,30 @@ impl Manager {
}

// Run the device
let dbus_path = path.clone();
let tx = self.tx.clone();
tokio::spawn(async move {
if let Err(e) = device.run(targets).await {
log::error!("Error running {dbus_path}: {}", e.to_string());
}
log::debug!("Composite device stopped running: {dbus_path}");
if let Err(e) = tx
.send(ManagerCommand::CompositeDeviceStopped(dbus_path))
.await
{
log::error!("Error sending composite device stopped: {}", e.to_string());
}
});
let comp_path = path.clone();
let comp_path = composite_path.clone();

// Add the device to our maps
self.composite_devices.insert(comp_path, client);
log::trace!("Managed source devices: {:?}", self.source_devices_used);
self.used_configs.insert(path, config);
self.used_configs.insert(composite_path.clone(), config);
log::trace!("Used configs: {:?}", self.used_configs);
self.composite_device_targets
.insert(composite_path.clone(), target_device_paths);
log::trace!("Used target devices: {:?}", self.composite_device_targets);

Ok(())
Ok(tokio::spawn(async move {
if let Err(e) = device.run(targets).await {
log::error!("Error running {composite_path}: {}", e.to_string());
}
log::debug!("Composite device stopped running: {composite_path}");
if let Err(e) = tx
.send(ManagerCommand::CompositeDeviceStopped(composite_path.clone()))
.await
{
log::error!("Error sending to composite device {composite_path} the stopped signal: {}", e.to_string());
}
}))
}

/// Called when a composite device stops running
Expand Down Expand Up @@ -896,7 +892,7 @@ impl Manager {
target_devices_config,
source_device.clone(),
)
.await?
.await?;
}
};

Expand Down Expand Up @@ -1329,20 +1325,15 @@ impl Manager {
}

/// Returns the next available composite device dbus path
fn next_composite_dbus_path(&self) -> String {
let max = 2048;
let mut i = 0;
loop {
if i > max {
return "Devices exceeded".to_string();
}
fn next_composite_dbus_path(&self) -> Result<String, Box<dyn Error>> {
for i in 0u64.. {
let path = format!("{}/CompositeDevice{}", BUS_PREFIX, i);
if self.composite_devices.contains_key(&path) {
i += 1;
continue;
if !self.composite_devices.contains_key(&path) {
return Ok(path)
}
return path;
}

Err(Box::from("No available dbus path left"))
}

fn watch_iio_devices(
Expand Down

0 comments on commit add970e

Please sign in to comment.