Skip to content

Commit

Permalink
RTU/TCP sync: Add optional timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
uklotzde committed Jan 28, 2023
1 parent c2a13b8 commit 0e9146e
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 37 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

## v0.6.0 (Unreleased)

- Features: Replace "sync" with "rtu-sync" and "tcp-sync" respectively
- Features: Replace "sync" with "rtu-sync" and "tcp-sync" respectively.
- Added: Optional timeout for synchronous RTU/TCP operations [#125](https://github.com/slowtec/tokio-modbus/issues/125).

## v0.5.4 (2023-01-18)

Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ tokio-serial = { version = "5.4.3", default-features = false }
default = ["rtu", "tcp"]
rtu = ["futures-util/sink"]
tcp = ["tokio/net", "futures-util/sink"]
rtu-sync = ["rtu", "tokio-serial"]
tcp-sync = ["tcp"]
rtu-sync = ["rtu", "futures", "tokio/time", "tokio-serial"]
tcp-sync = ["tcp", "futures", "tokio/time"]
server = ["futures"]
rtu-server = ["rtu", "server", "tokio/macros", "tokio-serial"]
tcp-server-unstable = [
Expand Down
111 changes: 89 additions & 22 deletions src/client/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,35 @@ pub mod rtu;
#[cfg(feature = "tcp-sync")]
pub mod tcp;

use std::{future::Future, io::Result, time::Duration};

use futures::future::Either;

use crate::{frame::*, slave::*};

use super::{
Client as AsyncClient, Context as AsyncContext, Reader as AsyncReader, SlaveContext,
Writer as AsyncWriter,
};

use crate::frame::*;
use crate::slave::*;

use std::io::Result;
fn block_on_with_timeout<T>(
runtime: &tokio::runtime::Runtime,
timeout: Option<Duration>,
task: impl Future<Output = Result<T>>,
) -> Result<T> {
let task = if let Some(duration) = timeout {
Either::Left(async move {
tokio::time::timeout(duration, task)
.await
.unwrap_or_else(|elapsed| {
Err(std::io::Error::new(std::io::ErrorKind::TimedOut, elapsed))
})
})
} else {
Either::Right(task)
};
runtime.block_on(task)
}

/// A transport independent synchronous client trait.
pub trait Client: SlaveContext {
Expand Down Expand Up @@ -52,13 +72,33 @@ pub trait Writer: Client {
/// A synchronous Modbus client context.
#[derive(Debug)]
pub struct Context {
core: tokio::runtime::Runtime,
runtime: tokio::runtime::Runtime,
async_ctx: AsyncContext,
timeout: Option<Duration>,
}

impl Context {
/// Returns the current timeout.
pub const fn timeout(&self) -> Option<Duration> {
self.timeout
}

/// Sets a timeout duration for all subsequent operations.
///
/// The timeout is disabled by passing `None`.
pub fn set_timeout(&mut self, duration: impl Into<Option<Duration>>) {
self.timeout = duration.into()
}

/// Disables the timeout for all subsequent operations.
pub fn reset_timeout(&mut self) {
self.timeout = None;
}
}

impl Client for Context {
fn call(&mut self, req: Request) -> Result<Response> {
self.core.block_on(self.async_ctx.call(req))
block_on_with_timeout(&self.runtime, self.timeout, self.async_ctx.call(req))
}
}

Expand All @@ -70,22 +110,35 @@ impl SlaveContext for Context {

impl Reader for Context {
fn read_coils(&mut self, addr: Address, cnt: Quantity) -> Result<Vec<Coil>> {
self.core.block_on(self.async_ctx.read_coils(addr, cnt))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.read_coils(addr, cnt),
)
}

fn read_discrete_inputs(&mut self, addr: Address, cnt: Quantity) -> Result<Vec<Coil>> {
self.core
.block_on(self.async_ctx.read_discrete_inputs(addr, cnt))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.read_discrete_inputs(addr, cnt),
)
}

fn read_input_registers(&mut self, addr: Address, cnt: Quantity) -> Result<Vec<Word>> {
self.core
.block_on(self.async_ctx.read_input_registers(addr, cnt))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.read_input_registers(addr, cnt),
)
}

fn read_holding_registers(&mut self, addr: Address, cnt: Quantity) -> Result<Vec<Word>> {
self.core
.block_on(self.async_ctx.read_holding_registers(addr, cnt))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.read_holding_registers(addr, cnt),
)
}

fn read_write_multiple_registers(
Expand All @@ -95,7 +148,9 @@ impl Reader for Context {
write_addr: Address,
write_data: &[Word],
) -> Result<Vec<Word>> {
self.core.block_on(
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx
.read_write_multiple_registers(read_addr, read_cnt, write_addr, write_data),
)
Expand All @@ -104,22 +159,34 @@ impl Reader for Context {

impl Writer for Context {
fn write_single_register(&mut self, addr: Address, data: Word) -> Result<()> {
self.core
.block_on(self.async_ctx.write_single_register(addr, data))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.write_single_register(addr, data),
)
}

fn write_multiple_registers(&mut self, addr: Address, data: &[Word]) -> Result<()> {
self.core
.block_on(self.async_ctx.write_multiple_registers(addr, data))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.write_multiple_registers(addr, data),
)
}

fn write_single_coil(&mut self, addr: Address, coil: Coil) -> Result<()> {
self.core
.block_on(self.async_ctx.write_single_coil(addr, coil))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.write_single_coil(addr, coil),
)
}

fn write_multiple_coils(&mut self, addr: Address, coils: &[Coil]) -> Result<()> {
self.core
.block_on(self.async_ctx.write_multiple_coils(addr, coils))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.write_multiple_coils(addr, coils),
)
}
}
31 changes: 26 additions & 5 deletions src/client/sync/rtu.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// SPDX-FileCopyrightText: Copyright (c) 2017-2023 slowtec GmbH <[email protected]>
// SPDX-License-Identifier: MIT OR Apache-2.0

use super::{Context, Result};
use std::{io::Result, time::Duration};

use super::{block_on_with_timeout, Context};

use tokio_serial::{SerialPortBuilder, SerialStream};

Expand All @@ -14,17 +16,36 @@ pub fn connect(builder: &SerialPortBuilder) -> Result<Context> {
connect_slave(builder, Slave::broadcast())
}

/// Connect to no particular Modbus slave device for sending
/// broadcast messages with a timeout.
pub fn connect_with_timeout(
builder: &SerialPortBuilder,
timeout: Option<Duration>,
) -> Result<Context> {
connect_slave_with_timeout(builder, Slave::broadcast(), timeout)
}

/// Connect to any kind of Modbus slave device.
pub fn connect_slave(builder: &SerialPortBuilder, slave: Slave) -> Result<Context> {
let rt = tokio::runtime::Builder::new_current_thread()
connect_slave_with_timeout(builder, slave, None)
}

/// Connect to any kind of Modbus slave device with a timeout.
pub fn connect_slave_with_timeout(
builder: &SerialPortBuilder,
slave: Slave,
timeout: Option<Duration>,
) -> Result<Context> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()?;
// SerialStream::open requires a runtime at least on cfg(unix).
let serial = rt.block_on(async { SerialStream::open(builder) })?;
let async_ctx = rt.block_on(async_connect_slave(serial, slave))?;
let serial = runtime.block_on(async { SerialStream::open(builder) })?;
let async_ctx = block_on_with_timeout(&runtime, timeout, async_connect_slave(serial, slave))?;
let sync_ctx = Context {
core: rt,
runtime,
async_ctx,
timeout,
};
Ok(sync_ctx)
}
31 changes: 24 additions & 7 deletions src/client/sync/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,46 @@

//! TCP client connections
use std::net::SocketAddr;
use std::{io::Result, net::SocketAddr, time::Duration};

use crate::client::tcp::connect_slave as async_connect_slave;
use crate::slave::Slave;
use crate::{client::tcp::connect_slave as async_connect_slave, slave::Slave};

use super::{Context, Result};
use super::{block_on_with_timeout, Context};

/// Establish a direct connection to a Modbus TCP coupler.
pub fn connect(socket_addr: SocketAddr) -> Result<Context> {
connect_slave(socket_addr, Slave::tcp_device())
}

/// Establish a direct connection to a Modbus TCP coupler with a timeout.
pub fn connect_with_timeout(socket_addr: SocketAddr, timeout: Option<Duration>) -> Result<Context> {
connect_slave_with_timeout(socket_addr, Slave::tcp_device(), timeout)
}

/// Connect to any kind of Modbus slave device, probably through a Modbus TCP/RTU
/// gateway that is forwarding messages to/from the corresponding unit identified
/// by the slave parameter.
pub fn connect_slave(socket_addr: SocketAddr, slave: Slave) -> Result<Context> {
let rt = tokio::runtime::Builder::new_current_thread()
connect_slave_with_timeout(socket_addr, slave, None)
}

/// Connect to any kind of Modbus slave device, probably through a Modbus TCP/RTU
/// gateway that is forwarding messages to/from the corresponding unit identified
/// by the slave parameter.
pub fn connect_slave_with_timeout(
socket_addr: SocketAddr,
slave: Slave,
timeout: Option<Duration>,
) -> Result<Context> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()?;
let async_ctx = rt.block_on(async_connect_slave(socket_addr, slave))?;
let async_ctx =
block_on_with_timeout(&runtime, timeout, async_connect_slave(socket_addr, slave))?;
let sync_ctx = Context {
core: rt,
runtime,
async_ctx,
timeout,
};
Ok(sync_ctx)
}

0 comments on commit 0e9146e

Please sign in to comment.