Skip to content

Commit

Permalink
Regroup modules
Browse files Browse the repository at this point in the history
  • Loading branch information
uklotzde committed Dec 4, 2024
1 parent 52f0abe commit a349580
Show file tree
Hide file tree
Showing 9 changed files with 497 additions and 512 deletions.
22 changes: 22 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,28 @@ impl Writer for Context {
}
}

#[cfg(any(feature = "rtu", feature = "tcp"))]
pub(crate) async fn disconnect_framed<T, C>(
framed: tokio_util::codec::Framed<T, C>,
) -> std::io::Result<()>
where
T: tokio::io::AsyncWrite + Unpin,
{
use tokio::io::AsyncWriteExt as _;

framed
.into_inner()
.shutdown()
.await
.or_else(|err| match err.kind() {
std::io::ErrorKind::NotConnected | std::io::ErrorKind::BrokenPipe => {
// Already disconnected.
Ok(())
}
_ => Err(err),
})
}

#[cfg(test)]
mod tests {
use crate::{Error, Result};
Expand Down
261 changes: 255 additions & 6 deletions src/client/rtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,278 @@

//! RTU client connections
use std::{fmt, io};

use futures_util::{SinkExt as _, StreamExt as _};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;

use crate::rtu::{Client, ClientContext};
use crate::{
codec::rtu::ClientCodec,
frame::{
rtu::{Header, RequestAdu},
RequestPdu,
},
slave::SlaveContext,
FunctionCode, Request, Response, Result, Slave,
};

use super::*;
use super::{disconnect_framed, Context};

/// Connect to no particular Modbus slave device for sending
/// Connect to no particular _Modbus_ slave device for sending
/// broadcast messages.
pub fn attach<T>(transport: T) -> Context
where
T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + fmt::Debug + Unpin + Send + 'static,
{
attach_slave(transport, Slave::broadcast())
}

/// Connect to any kind of Modbus slave device.
/// Connect to any kind of _Modbus_ slave device.
pub fn attach_slave<T>(transport: T, slave: Slave) -> Context
where
T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + fmt::Debug + Unpin + Send + 'static,
{
let client = Client::new(transport);
let context = ClientContext::new(client, slave);
Context {
client: Box::new(context),
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RequestContext {
pub(crate) function_code: FunctionCode,
pub(crate) header: Header,
}

impl RequestContext {
#[must_use]
pub const fn function_code(&self) -> FunctionCode {
self.function_code
}
}

/// _Modbus_ RTU client.
#[derive(Debug)]
pub struct Client<T> {
framed: Framed<T, ClientCodec>,
}

impl<T> Client<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub fn new(transport: T) -> Self {
let framed = Framed::new(transport, ClientCodec::default());
Self { framed }
}

pub async fn disconnect(self) -> io::Result<()> {
let Self { framed } = self;
disconnect_framed(framed).await
}

pub async fn call<'a>(&mut self, server: Slave, request: Request<'a>) -> Result<Response> {
let request_context = self.send_request(server, request).await?;
self.recv_response(request_context).await
}

pub async fn send_request<'a>(
&mut self,
server: Slave,
request: Request<'a>,
) -> io::Result<RequestContext> {
self.send_request_pdu(server, request).await
}

async fn send_request_pdu<'a, R>(
&mut self,
server: Slave,
request_pdu: R,
) -> io::Result<RequestContext>
where
R: Into<RequestPdu<'a>>,
{
let request_adu = request_adu(server, request_pdu);
self.send_request_adu(request_adu).await
}

async fn send_request_adu<'a>(
&mut self,
request_adu: RequestAdu<'a>,
) -> io::Result<RequestContext> {
let request_context = request_adu.context();

self.framed.read_buffer_mut().clear();
self.framed.send(request_adu).await?;

Ok(request_context)
}

pub async fn recv_response(&mut self, request_context: RequestContext) -> Result<Response> {
let response_adu = self
.framed
.next()
.await
.unwrap_or_else(|| Err(io::Error::from(io::ErrorKind::BrokenPipe)))?;

response_adu.try_into_response(request_context)
}
}

/// _Modbus_ RTU client with (server) context and connection state.
///
/// Client that invokes methods (request/response) on a single or many (broadcast) server(s).
///
/// The server can be switched between method calls.
#[derive(Debug)]
pub struct ClientContext<T> {
client: Option<Client<T>>,
server: Slave,
}

impl<T> ClientContext<T> {
pub fn new(client: Client<T>, server: Slave) -> Self {
Self {
client: Some(client),
server,
}
}

#[must_use]
pub const fn is_connected(&self) -> bool {
self.client.is_some()
}

#[must_use]
pub const fn server(&self) -> Slave {
self.server
}

pub fn set_server(&mut self, server: Slave) {
self.server = server;
}
}

impl<T> ClientContext<T>
where
T: AsyncWrite + Unpin,
{
pub async fn disconnect(&mut self) -> io::Result<()> {
let Some(client) = self.client.take() else {
// Already disconnected.
return Ok(());
};
disconnect_framed(client.framed).await
}
}

impl<T> ClientContext<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub async fn call(&mut self, request: Request<'_>) -> Result<Response> {
log::debug!("Call {:?}", request);

let Some(client) = &mut self.client else {
return Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected").into());
};

client.call(self.server, request).await
}
}

impl<T> ClientContext<T>
where
T: AsyncRead + AsyncWrite + Unpin + fmt::Debug + Send + 'static,
{
#[must_use]
pub fn boxed(self) -> Box<dyn crate::client::Client> {
Box::new(self)
}
}

impl<T> SlaveContext for ClientContext<T> {
fn set_slave(&mut self, slave: Slave) {
self.set_server(slave);
}
}

#[async_trait::async_trait]
impl<T> crate::client::Client for ClientContext<T>
where
T: fmt::Debug + AsyncRead + AsyncWrite + Send + Unpin,
{
async fn call(&mut self, req: Request<'_>) -> Result<Response> {
self.call(req).await
}

async fn disconnect(&mut self) -> io::Result<()> {
self.disconnect().await
}
}

fn request_adu<'a, R>(server: Slave, request_pdu: R) -> RequestAdu<'a>
where
R: Into<RequestPdu<'a>>,
{
let hdr = Header { slave: server };
let pdu = request_pdu.into();
RequestAdu { hdr, pdu }
}

#[cfg(test)]
mod tests {
use core::{
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf, Result};

use crate::Error;

use super::*;

#[derive(Debug)]
struct MockTransport;

impl Unpin for MockTransport {}

impl AsyncRead for MockTransport {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
}

impl AsyncWrite for MockTransport {
fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, _: &[u8]) -> Poll<Result<usize>> {
Poll::Ready(Ok(2))
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
unimplemented!()
}
}

#[tokio::test]
async fn handle_broken_pipe() {
let transport = MockTransport;
let client = Client::new(transport);
let mut context = ClientContext::new(client, Slave::broadcast());
let res = context.call(Request::ReadCoils(0x00, 5)).await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(
matches!(err, Error::Transport(err) if err.kind() == std::io::ErrorKind::BrokenPipe)
);
}
}
Loading

0 comments on commit a349580

Please sign in to comment.