Skip to content

Commit

Permalink
handle socketcan network down
Browse files Browse the repository at this point in the history
  • Loading branch information
pd0wm committed Apr 1, 2024
1 parent 2051824 commit 412388d
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/can/async_can.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn process<T: CanAdapter>(
let mut callbacks: HashMap<BusIdentifier, VecDeque<FrameCallback>> = HashMap::new();

while shutdown_receiver.try_recv().is_err() {
let frames: Vec<Frame> = adapter.recv().unwrap();
let frames: Vec<Frame> = adapter.recv().expect("Failed to Receive CAN Frames");

for frame in frames {
if DEBUG {
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub enum Error {
MalformedFrame,
#[error("Timeout")]
Timeout,
#[error("Disconnected")]
Disconnected,
#[error(transparent)]
IsoTPError(#[from] crate::isotp::Error),
#[error(transparent)]
Expand Down
19 changes: 15 additions & 4 deletions src/socketcan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,21 @@ impl CanAdapter for SocketCan {
fn recv(&mut self) -> Result<Vec<Frame>> {
let mut frames = vec![];

while let Ok((frame, meta)) = self.socket.read_frame_with_meta() {
let mut frame: crate::can::Frame = frame.into();
frame.loopback = meta.loopback;
frames.push(frame);
loop {
match self.socket.read_frame_with_meta() {
Ok((frame, meta)) => {
let mut frame: crate::can::Frame = frame.into();
frame.loopback = meta.loopback;
frames.push(frame);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
break;
}
Err(e) => {
tracing::error!("Error reading frame: {}", e);
return Err(crate::error::Error::Disconnected);
}
}
}

// Add fake loopback frames to the receive queue
Expand Down
8 changes: 5 additions & 3 deletions tests/adapter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use std::time::Duration;

static BULK_NUM_FRAMES_SYNC: u64 = 0x100;
static BULK_NUM_FRAMES_ASYNC: u64 = 0x1000;
static BULK_TIMEOUT_MS: u64 = 1000;
static BULK_SYNC_TIMEOUT_MS: u64 = 1000;
static BULK_ASYNC_TIMEOUT_MS: u64 = 5000;

/// Sends a large number of frames to a "blocking" adapter, and then reads back all sent messages.
/// This verified the adapter doesn't drop messages and reads them back in the same order as they are sent,
Expand All @@ -27,7 +28,8 @@ fn bulk_send_sync<T: CanAdapter>(adapter: &mut T) {
let start = std::time::Instant::now();

let mut received: Vec<Frame> = vec![];
while received.len() < frames.len() && start.elapsed() < Duration::from_millis(BULK_TIMEOUT_MS)
while received.len() < frames.len()
&& start.elapsed() < Duration::from_millis(BULK_SYNC_TIMEOUT_MS)
{
let rx = adapter.recv().unwrap();
let rx: Vec<Frame> = rx.into_iter().filter(|frame| frame.loopback).collect();
Expand Down Expand Up @@ -55,7 +57,7 @@ async fn bulk_send(adapter: &AsyncCanAdapter) {

let r = frames.iter().map(|frame| adapter.send(frame));
tokio::time::timeout(
Duration::from_millis(BULK_TIMEOUT_MS),
Duration::from_millis(BULK_ASYNC_TIMEOUT_MS),
futures::future::join_all(r),
)
.await
Expand Down

0 comments on commit 412388d

Please sign in to comment.