Skip to content

Commit

Permalink
Remove obsolete FunctionCode::Disconnect
Browse files Browse the repository at this point in the history
Use AsyncWriteExt::shutdown() to release all resources.
  • Loading branch information
uklotzde committed Aug 28, 2024
1 parent 14fa1ae commit 4251593
Show file tree
Hide file tree
Showing 14 changed files with 133 additions and 157 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@

# Changelog

## v0.15.0 (Unreleased)

### Breaking Changes

- Client: Added new `disconnect()` method to trait that returns `std::io::Result`.
- Removed `FunctionCode::Disconnect`.

## v0.14.0 (2024-07-21)

### Breaking Changes
Expand All @@ -15,7 +22,7 @@

### Breaking Changes

- Add `FunctionCode::Disconnect`.
- Added `FunctionCode::Disconnect`.

## v0.13.0 (2024-06-23, yanked)

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ log = "0.4.20"
smallvec = { version = "1.13.1", optional = true, default-features = false }
socket2 = { version = "0.5.5", optional = true, default-features = false }
thiserror = "1.0.58"
tokio = { version = "1.35.1", default-features = false }
tokio = { version = "1.35.1", default-features = false, features = ["io-util"] }
# Disable default-features to exclude unused dependency on libudev
tokio-serial = { version = "5.4.4", optional = true, default-features = false }
tokio-util = { version = "0.7.10", optional = true, default-features = false, features = [
Expand Down
2 changes: 1 addition & 1 deletion examples/rtu-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Sensor value is: {rsp:?}");

println!("Disconnecting");
ctx.disconnect().await??;
ctx.disconnect().await?;

Ok(())
}
2 changes: 1 addition & 1 deletion examples/tcp-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("The coupler ID is '{id}'");

println!("Disconnecting");
ctx.disconnect().await??;
ctx.disconnect().await?;

Ok(())
}
2 changes: 1 addition & 1 deletion examples/tls-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Reading Holding Registers");
let data = ctx.read_holding_registers(40000, 68).await?;
println!("Holding Registers Data is '{:?}'", data);
ctx.disconnect().await??;
ctx.disconnect().await?;

Ok(())
}
45 changes: 24 additions & 21 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{borrow::Cow, fmt::Debug, io};

use async_trait::async_trait;

use crate::{frame::*, slave::*, Error, Result};
use crate::{frame::*, slave::*, Result};

#[cfg(feature = "rtu")]
pub mod rtu;
Expand All @@ -21,11 +21,22 @@ pub mod sync;
/// Transport independent asynchronous client trait
#[async_trait]
pub trait Client: SlaveContext + Send + Debug {
/// Invoke a _Modbus_ function
/// Invokes a _Modbus_ function.
async fn call(&mut self, request: Request<'_>) -> Result<Response>;

/// Disconnects the client.
///
/// Permanently disconnects the client by shutting down the
/// underlying stream in a graceful manner (`AsyncDrop`).
///
/// Dropping the client without explicitly disconnecting it
/// beforehand should also work and free all resources. The
/// actual behavior might depend on the underlying transport
/// protocol (RTU/TCP) that is used by the client.
async fn disconnect(&mut self) -> io::Result<()>;
}

/// Asynchronous Modbus reader
/// Asynchronous _Modbus_ reader
#[async_trait]
pub trait Reader: Client {
/// Read multiple coils (0x01)
Expand Down Expand Up @@ -83,22 +94,6 @@ pub struct Context {
client: Box<dyn Client>,
}

impl Context {
/// Disconnect the client
pub async fn disconnect(&mut self) -> Result<()> {
// Disconnecting is expected to fail!
let res = self.client.call(Request::Disconnect).await;
match res {
Ok(_) => unreachable!(),
Err(Error::Transport(err)) => match err.kind() {
io::ErrorKind::NotConnected | io::ErrorKind::BrokenPipe => Ok(Ok(())),
_ => Err(Error::Transport(err)),
},
Err(err) => Err(err),
}
}
}

impl From<Box<dyn Client>> for Context {
fn from(client: Box<dyn Client>) -> Self {
Self { client }
Expand All @@ -116,6 +111,10 @@ impl Client for Context {
async fn call(&mut self, request: Request<'_>) -> Result<Response> {
self.client.call(request).await
}

async fn disconnect(&mut self) -> io::Result<()> {
self.client.disconnect().await
}
}

impl SlaveContext for Context {
Expand Down Expand Up @@ -319,10 +318,10 @@ impl Writer for Context {

#[cfg(test)]
mod tests {
use crate::Result;
use crate::{Error, Result};

use super::*;
use std::sync::Mutex;
use std::{io, sync::Mutex};

#[derive(Default, Debug)]
pub(crate) struct ClientMock {
Expand Down Expand Up @@ -358,6 +357,10 @@ mod tests {
Err(err) => Err(err),
}
}

async fn disconnect(&mut self) -> io::Result<()> {
Ok(())
}
}

impl SlaveContext for ClientMock {
Expand Down
2 changes: 0 additions & 2 deletions src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ impl<'a> TryFrom<Request<'a>> for Bytes {
data.put_u8(*d);
}
}
Disconnect => unreachable!(),
}
Ok(data.freeze())
}
Expand Down Expand Up @@ -452,7 +451,6 @@ fn request_byte_count(req: &Request<'_>) -> usize {
MaskWriteRegister(_, _, _) => 7,
ReadWriteMultipleRegisters(_, _, _, ref data) => 10 + data.len() * 2,
Custom(_, ref data) => 1 + data.len(),
Disconnect => unreachable!(),
}
}

Expand Down
36 changes: 4 additions & 32 deletions src/codec/rtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,7 @@ impl Decoder for ServerCodec {
// to transmission errors, because the frame's bytes
// have already been verified with the CRC.
RequestPdu::try_from(pdu_data)
.map(|pdu| {
Some(RequestAdu {
hdr,
pdu,
disconnect: false,
})
})
.map(|pdu| Some(RequestAdu { hdr, pdu }))
.map_err(|err| {
// Unrecoverable error
log::error!("Failed to decode request PDU: {}", err);
Expand All @@ -329,21 +323,7 @@ impl<'a> Encoder<RequestAdu<'a>> for ClientCodec {
type Error = Error;

fn encode(&mut self, adu: RequestAdu<'a>, buf: &mut BytesMut) -> Result<()> {
let RequestAdu {
hdr,
pdu,
disconnect,
} = adu;
if disconnect {
// The disconnect happens implicitly after letting this request
// fail by returning an error. This will drop the attached
// transport, e.g. for closing a stale, exclusive connection
// to a serial port before trying to reconnect.
return Err(Error::new(
ErrorKind::NotConnected,
"Disconnecting - not an error",
));
}
let RequestAdu { hdr, pdu } = adu;
let pdu_data: Bytes = pdu.try_into()?;
buf.reserve(pdu_data.len() + 3);
buf.put_u8(hdr.slave_id);
Expand Down Expand Up @@ -729,11 +709,7 @@ mod tests {
let pdu = req.into();
let slave_id = 0x01;
let hdr = Header { slave_id };
let adu = RequestAdu {
hdr,
pdu,
disconnect: false,
};
let adu = RequestAdu { hdr, pdu };
codec.encode(adu, &mut buf).unwrap();

assert_eq!(
Expand All @@ -749,11 +725,7 @@ mod tests {
let pdu = req.into();
let slave_id = 0x01;
let hdr = Header { slave_id };
let adu = RequestAdu {
hdr,
pdu,
disconnect: false,
};
let adu = RequestAdu { hdr, pdu };
let mut buf = BytesMut::with_capacity(40);
#[allow(unsafe_code)]
unsafe {
Expand Down
33 changes: 4 additions & 29 deletions src/codec/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,7 @@ impl Decoder for ServerCodec {
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<RequestAdu<'static>>> {
if let Some((hdr, pdu_data)) = self.decoder.decode(buf)? {
let pdu = RequestPdu::try_from(pdu_data)?;
Ok(Some(RequestAdu {
hdr,
pdu,
disconnect: false,
}))
Ok(Some(RequestAdu { hdr, pdu }))
} else {
Ok(None)
}
Expand All @@ -137,20 +133,7 @@ impl<'a> Encoder<RequestAdu<'a>> for ClientCodec {
type Error = Error;

fn encode(&mut self, adu: RequestAdu<'a>, buf: &mut BytesMut) -> Result<()> {
let RequestAdu {
hdr,
pdu,
disconnect,
} = adu;
if disconnect {
// The disconnect happens implicitly after letting this request
// fail by returning an error. This will drop the attached
// transport, e.g. for terminating an open connection.
return Err(Error::new(
ErrorKind::NotConnected,
"Disconnecting - not an error",
));
}
let RequestAdu { hdr, pdu } = adu;
let pdu_data: Bytes = pdu.try_into()?;
buf.reserve(pdu_data.len() + 7);
buf.put_u16(hdr.transaction_id);
Expand Down Expand Up @@ -284,11 +267,7 @@ mod tests {
transaction_id: TRANSACTION_ID,
unit_id: UNIT_ID,
};
let adu = RequestAdu {
hdr,
pdu,
disconnect: false,
};
let adu = RequestAdu { hdr, pdu };
codec.encode(adu, &mut buf).unwrap();
// header
assert_eq!(buf[0], TRANSACTION_ID_HI);
Expand All @@ -312,11 +291,7 @@ mod tests {
transaction_id: TRANSACTION_ID,
unit_id: UNIT_ID,
};
let adu = RequestAdu {
hdr,
pdu,
disconnect: false,
};
let adu = RequestAdu { hdr, pdu };
let mut buf = BytesMut::with_capacity(40);
#[allow(unsafe_code)]
unsafe {
Expand Down
23 changes: 1 addition & 22 deletions src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ pub enum FunctionCode {

/// Custom Modbus Function Code.
Custom(u8),

Disconnect,
}

impl FunctionCode {
Expand All @@ -69,11 +67,7 @@ impl FunctionCode {
}
}

/// Get the [`u8`] value of the current [`FunctionCode`].
///
/// # Panics
///
/// Panics on [`Disconnect`](Self::Disconnect) which has no corresponding Modbus function code.
/// Gets the [`u8`] value of the current [`FunctionCode`].
#[must_use]
pub const fn value(self) -> u8 {
match self {
Expand All @@ -88,7 +82,6 @@ impl FunctionCode {
Self::MaskWriteRegister => 0x16,
Self::ReadWriteMultipleRegisters => 0x17,
Self::Custom(code) => code,
Self::Disconnect => unreachable!(),
}
}
}
Expand Down Expand Up @@ -181,17 +174,6 @@ pub enum Request<'a> {
/// The first parameter is the Modbus function code.
/// The second parameter is the raw bytes of the request.
Custom(u8, Cow<'a, [u8]>),

/// A poison pill for stopping the client service and to release
/// the underlying transport, e.g. for disconnecting from an
/// exclusively used serial port.
///
/// This is an ugly workaround, because `tokio-proto` does not
/// provide other means to gracefully shut down the `ClientService`.
/// Otherwise the bound transport is never freed as long as the
/// executor is active, even when dropping the Modbus client
/// context.
Disconnect,
}

impl<'a> Request<'a> {
Expand Down Expand Up @@ -220,7 +202,6 @@ impl<'a> Request<'a> {
ReadWriteMultipleRegisters(addr, qty, write_addr, Cow::Owned(words.into_owned()))
}
Custom(func, bytes) => Custom(func, Cow::Owned(bytes.into_owned())),
Disconnect => Disconnect,
}
}

Expand All @@ -247,8 +228,6 @@ impl<'a> Request<'a> {
ReadWriteMultipleRegisters(_, _, _, _) => FunctionCode::ReadWriteMultipleRegisters,

Custom(code, _) => FunctionCode::Custom(*code),

Disconnect => unreachable!(),
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/frame/rtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ pub(crate) struct Header {
pub struct RequestAdu<'a> {
pub(crate) hdr: Header,
pub(crate) pdu: RequestPdu<'a>,
pub(crate) disconnect: bool,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
1 change: 0 additions & 1 deletion src/frame/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub(crate) struct Header {
pub struct RequestAdu<'a> {
pub(crate) hdr: Header,
pub(crate) pdu: RequestPdu<'a>,
pub(crate) disconnect: bool,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
Loading

0 comments on commit 4251593

Please sign in to comment.