From a52186893383b0b46a043c26c2e613f36c9c009b Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Tue, 31 Jan 2023 14:14:04 +0100 Subject: [PATCH] RTU/TCP sync: Add optional timeout --- CHANGELOG.md | 3 +- Cargo.toml | 2 +- src/client/sync/mod.rs | 111 +++++++++++++++++++++++++++++++++-------- src/client/sync/rtu.rs | 31 ++++++++++-- src/client/sync/tcp.rs | 31 +++++++++--- 5 files changed, 142 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f9cbcfec..1a5a9f57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,8 @@ ## v0.7.0 (Unreleased) -- Features: Replace "sync" with "rtu-sync" and "tcp-sync" respectively +- Features: Replace "sync" with "rtu-sync" and "tcp-sync" respectively. +- Added: Optional timeout for synchronous RTU/TCP operations [#125](https://github.com/slowtec/tokio-modbus/issues/125). ## v0.6.0 (2023-01-30) diff --git a/Cargo.toml b/Cargo.toml index 7c2dce89..fc14725d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ rtu = ["futures-util/sink"] tcp = ["tokio/net", "futures-util/sink"] # The internal feature `sync` has no effect when used alone. # It is always enabled together with `rtu-sync` or `tcp-sync`. -sync = [] +sync = ["dep:futures", "tokio/time"] rtu-sync = ["rtu", "sync", "dep:tokio-serial"] tcp-sync = ["tcp", "sync"] server = ["dep:futures"] diff --git a/src/client/sync/mod.rs b/src/client/sync/mod.rs index e9e9524f..6eb4652c 100644 --- a/src/client/sync/mod.rs +++ b/src/client/sync/mod.rs @@ -11,15 +11,35 @@ pub mod rtu; #[cfg(feature = "tcp-sync")] pub mod tcp; +use std::{future::Future, io::Result, time::Duration}; + +use futures::future::Either; + +use crate::{frame::*, slave::*}; + use super::{ Client as AsyncClient, Context as AsyncContext, Reader as AsyncReader, SlaveContext, Writer as AsyncWriter, }; -use crate::frame::*; -use crate::slave::*; - -use std::io::Result; +fn block_on_with_timeout( + runtime: &tokio::runtime::Runtime, + timeout: Option, + task: impl Future>, +) -> Result { + let task = if let Some(duration) = timeout { + Either::Left(async move { + tokio::time::timeout(duration, task) + .await + .unwrap_or_else(|elapsed| { + Err(std::io::Error::new(std::io::ErrorKind::TimedOut, elapsed)) + }) + }) + } else { + Either::Right(task) + }; + runtime.block_on(task) +} /// A transport independent synchronous client trait. pub trait Client: SlaveContext { @@ -52,13 +72,33 @@ pub trait Writer: Client { /// A synchronous Modbus client context. #[derive(Debug)] pub struct Context { - core: tokio::runtime::Runtime, + runtime: tokio::runtime::Runtime, async_ctx: AsyncContext, + timeout: Option, +} + +impl Context { + /// Returns the current timeout. + pub const fn timeout(&self) -> Option { + self.timeout + } + + /// Sets a timeout duration for all subsequent operations. + /// + /// The timeout is disabled by passing `None`. + pub fn set_timeout(&mut self, duration: impl Into>) { + self.timeout = duration.into() + } + + /// Disables the timeout for all subsequent operations. + pub fn reset_timeout(&mut self) { + self.timeout = None; + } } impl Client for Context { fn call(&mut self, req: Request) -> Result { - self.core.block_on(self.async_ctx.call(req)) + block_on_with_timeout(&self.runtime, self.timeout, self.async_ctx.call(req)) } } @@ -70,22 +110,35 @@ impl SlaveContext for Context { impl Reader for Context { fn read_coils(&mut self, addr: Address, cnt: Quantity) -> Result> { - self.core.block_on(self.async_ctx.read_coils(addr, cnt)) + block_on_with_timeout( + &self.runtime, + self.timeout, + self.async_ctx.read_coils(addr, cnt), + ) } fn read_discrete_inputs(&mut self, addr: Address, cnt: Quantity) -> Result> { - self.core - .block_on(self.async_ctx.read_discrete_inputs(addr, cnt)) + block_on_with_timeout( + &self.runtime, + self.timeout, + self.async_ctx.read_discrete_inputs(addr, cnt), + ) } fn read_input_registers(&mut self, addr: Address, cnt: Quantity) -> Result> { - self.core - .block_on(self.async_ctx.read_input_registers(addr, cnt)) + block_on_with_timeout( + &self.runtime, + self.timeout, + self.async_ctx.read_input_registers(addr, cnt), + ) } fn read_holding_registers(&mut self, addr: Address, cnt: Quantity) -> Result> { - self.core - .block_on(self.async_ctx.read_holding_registers(addr, cnt)) + block_on_with_timeout( + &self.runtime, + self.timeout, + self.async_ctx.read_holding_registers(addr, cnt), + ) } fn read_write_multiple_registers( @@ -95,7 +148,9 @@ impl Reader for Context { write_addr: Address, write_data: &[Word], ) -> Result> { - self.core.block_on( + block_on_with_timeout( + &self.runtime, + self.timeout, self.async_ctx .read_write_multiple_registers(read_addr, read_cnt, write_addr, write_data), ) @@ -104,22 +159,34 @@ impl Reader for Context { impl Writer for Context { fn write_single_register(&mut self, addr: Address, data: Word) -> Result<()> { - self.core - .block_on(self.async_ctx.write_single_register(addr, data)) + block_on_with_timeout( + &self.runtime, + self.timeout, + self.async_ctx.write_single_register(addr, data), + ) } fn write_multiple_registers(&mut self, addr: Address, data: &[Word]) -> Result<()> { - self.core - .block_on(self.async_ctx.write_multiple_registers(addr, data)) + block_on_with_timeout( + &self.runtime, + self.timeout, + self.async_ctx.write_multiple_registers(addr, data), + ) } fn write_single_coil(&mut self, addr: Address, coil: Coil) -> Result<()> { - self.core - .block_on(self.async_ctx.write_single_coil(addr, coil)) + block_on_with_timeout( + &self.runtime, + self.timeout, + self.async_ctx.write_single_coil(addr, coil), + ) } fn write_multiple_coils(&mut self, addr: Address, coils: &[Coil]) -> Result<()> { - self.core - .block_on(self.async_ctx.write_multiple_coils(addr, coils)) + block_on_with_timeout( + &self.runtime, + self.timeout, + self.async_ctx.write_multiple_coils(addr, coils), + ) } } diff --git a/src/client/sync/rtu.rs b/src/client/sync/rtu.rs index bd040498..f5cd0e04 100644 --- a/src/client/sync/rtu.rs +++ b/src/client/sync/rtu.rs @@ -1,7 +1,9 @@ // SPDX-FileCopyrightText: Copyright (c) 2017-2023 slowtec GmbH // SPDX-License-Identifier: MIT OR Apache-2.0 -use super::{Context, Result}; +use std::{io::Result, time::Duration}; + +use super::{block_on_with_timeout, Context}; use tokio_serial::{SerialPortBuilder, SerialStream}; @@ -14,17 +16,36 @@ pub fn connect(builder: &SerialPortBuilder) -> Result { connect_slave(builder, Slave::broadcast()) } +/// Connect to no particular Modbus slave device for sending +/// broadcast messages with a timeout. +pub fn connect_with_timeout( + builder: &SerialPortBuilder, + timeout: Option, +) -> Result { + connect_slave_with_timeout(builder, Slave::broadcast(), timeout) +} + /// Connect to any kind of Modbus slave device. pub fn connect_slave(builder: &SerialPortBuilder, slave: Slave) -> Result { - let rt = tokio::runtime::Builder::new_current_thread() + connect_slave_with_timeout(builder, slave, None) +} + +/// Connect to any kind of Modbus slave device with a timeout. +pub fn connect_slave_with_timeout( + builder: &SerialPortBuilder, + slave: Slave, + timeout: Option, +) -> Result { + let runtime = tokio::runtime::Builder::new_current_thread() .enable_io() .build()?; // SerialStream::open requires a runtime at least on cfg(unix). - let serial = rt.block_on(async { SerialStream::open(builder) })?; - let async_ctx = rt.block_on(async_connect_slave(serial, slave))?; + let serial = runtime.block_on(async { SerialStream::open(builder) })?; + let async_ctx = block_on_with_timeout(&runtime, timeout, async_connect_slave(serial, slave))?; let sync_ctx = Context { - core: rt, + runtime, async_ctx, + timeout, }; Ok(sync_ctx) } diff --git a/src/client/sync/tcp.rs b/src/client/sync/tcp.rs index 915177bf..e8af1428 100644 --- a/src/client/sync/tcp.rs +++ b/src/client/sync/tcp.rs @@ -3,29 +3,46 @@ //! TCP client connections -use std::net::SocketAddr; +use std::{io::Result, net::SocketAddr, time::Duration}; -use crate::client::tcp::connect_slave as async_connect_slave; -use crate::slave::Slave; +use crate::{client::tcp::connect_slave as async_connect_slave, slave::Slave}; -use super::{Context, Result}; +use super::{block_on_with_timeout, Context}; /// Establish a direct connection to a Modbus TCP coupler. pub fn connect(socket_addr: SocketAddr) -> Result { connect_slave(socket_addr, Slave::tcp_device()) } +/// Establish a direct connection to a Modbus TCP coupler with a timeout. +pub fn connect_with_timeout(socket_addr: SocketAddr, timeout: Option) -> Result { + connect_slave_with_timeout(socket_addr, Slave::tcp_device(), timeout) +} + /// Connect to any kind of Modbus slave device, probably through a Modbus TCP/RTU /// gateway that is forwarding messages to/from the corresponding unit identified /// by the slave parameter. pub fn connect_slave(socket_addr: SocketAddr, slave: Slave) -> Result { - let rt = tokio::runtime::Builder::new_current_thread() + connect_slave_with_timeout(socket_addr, slave, None) +} + +/// Connect to any kind of Modbus slave device, probably through a Modbus TCP/RTU +/// gateway that is forwarding messages to/from the corresponding unit identified +/// by the slave parameter. +pub fn connect_slave_with_timeout( + socket_addr: SocketAddr, + slave: Slave, + timeout: Option, +) -> Result { + let runtime = tokio::runtime::Builder::new_current_thread() .enable_io() .build()?; - let async_ctx = rt.block_on(async_connect_slave(socket_addr, slave))?; + let async_ctx = + block_on_with_timeout(&runtime, timeout, async_connect_slave(socket_addr, slave))?; let sync_ctx = Context { - core: rt, + runtime, async_ctx, + timeout, }; Ok(sync_ctx) }