Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v0.7] RTU/TCP sync: Add optional timeout #130

Merged
merged 1 commit into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

## v0.7.0 (Unreleased)

- Added: Optional timeout for synchronous RTU/TCP operations [#125](https://github.com/slowtec/tokio-modbus/issues/125).

### Breaking Changes

- Features: Added "rtu-sync" as a replacement and superset of "rtu" and "sync"
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ tcp-server-unstable = [
"tokio/rt-multi-thread",
]
# The following features are internal and must not be used in dependencies.
sync = []
sync = ["dep:futures", "tokio/time"]
server = ["dep:futures"]

[badges]
Expand Down
111 changes: 89 additions & 22 deletions src/client/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,35 @@ pub mod rtu;
#[cfg(feature = "tcp-sync")]
pub mod tcp;

use std::{future::Future, io::Result, time::Duration};

use futures::future::Either;

use crate::{frame::*, slave::*};

use super::{
Client as AsyncClient, Context as AsyncContext, Reader as AsyncReader, SlaveContext,
Writer as AsyncWriter,
};

use crate::frame::*;
use crate::slave::*;

use std::io::Result;
fn block_on_with_timeout<T>(
runtime: &tokio::runtime::Runtime,
timeout: Option<Duration>,
task: impl Future<Output = Result<T>>,
) -> Result<T> {
let task = if let Some(duration) = timeout {
Either::Left(async move {
tokio::time::timeout(duration, task)
.await
.unwrap_or_else(|elapsed| {
Err(std::io::Error::new(std::io::ErrorKind::TimedOut, elapsed))
})
})
} else {
Either::Right(task)
};
runtime.block_on(task)
}

/// A transport independent synchronous client trait.
pub trait Client: SlaveContext {
Expand Down Expand Up @@ -52,13 +72,33 @@ pub trait Writer: Client {
/// A synchronous Modbus client context.
#[derive(Debug)]
pub struct Context {
core: tokio::runtime::Runtime,
runtime: tokio::runtime::Runtime,
async_ctx: AsyncContext,
timeout: Option<Duration>,
}

impl Context {
/// Returns the current timeout.
pub const fn timeout(&self) -> Option<Duration> {
self.timeout
}

/// Sets a timeout duration for all subsequent operations.
///
/// The timeout is disabled by passing `None`.
pub fn set_timeout(&mut self, duration: impl Into<Option<Duration>>) {
self.timeout = duration.into()
}

/// Disables the timeout for all subsequent operations.
pub fn reset_timeout(&mut self) {
self.timeout = None;
}
}

impl Client for Context {
fn call(&mut self, req: Request) -> Result<Response> {
self.core.block_on(self.async_ctx.call(req))
block_on_with_timeout(&self.runtime, self.timeout, self.async_ctx.call(req))
}
}

Expand All @@ -70,22 +110,35 @@ impl SlaveContext for Context {

impl Reader for Context {
fn read_coils(&mut self, addr: Address, cnt: Quantity) -> Result<Vec<Coil>> {
self.core.block_on(self.async_ctx.read_coils(addr, cnt))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.read_coils(addr, cnt),
)
}

fn read_discrete_inputs(&mut self, addr: Address, cnt: Quantity) -> Result<Vec<Coil>> {
self.core
.block_on(self.async_ctx.read_discrete_inputs(addr, cnt))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.read_discrete_inputs(addr, cnt),
)
}

fn read_input_registers(&mut self, addr: Address, cnt: Quantity) -> Result<Vec<Word>> {
self.core
.block_on(self.async_ctx.read_input_registers(addr, cnt))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.read_input_registers(addr, cnt),
)
}

fn read_holding_registers(&mut self, addr: Address, cnt: Quantity) -> Result<Vec<Word>> {
self.core
.block_on(self.async_ctx.read_holding_registers(addr, cnt))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.read_holding_registers(addr, cnt),
)
}

fn read_write_multiple_registers(
Expand All @@ -95,7 +148,9 @@ impl Reader for Context {
write_addr: Address,
write_data: &[Word],
) -> Result<Vec<Word>> {
self.core.block_on(
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx
.read_write_multiple_registers(read_addr, read_cnt, write_addr, write_data),
)
Expand All @@ -104,22 +159,34 @@ impl Reader for Context {

impl Writer for Context {
fn write_single_register(&mut self, addr: Address, data: Word) -> Result<()> {
self.core
.block_on(self.async_ctx.write_single_register(addr, data))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.write_single_register(addr, data),
)
}

fn write_multiple_registers(&mut self, addr: Address, data: &[Word]) -> Result<()> {
self.core
.block_on(self.async_ctx.write_multiple_registers(addr, data))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.write_multiple_registers(addr, data),
)
}

fn write_single_coil(&mut self, addr: Address, coil: Coil) -> Result<()> {
self.core
.block_on(self.async_ctx.write_single_coil(addr, coil))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.write_single_coil(addr, coil),
)
}

fn write_multiple_coils(&mut self, addr: Address, coils: &[Coil]) -> Result<()> {
self.core
.block_on(self.async_ctx.write_multiple_coils(addr, coils))
block_on_with_timeout(
&self.runtime,
self.timeout,
self.async_ctx.write_multiple_coils(addr, coils),
)
}
}
31 changes: 26 additions & 5 deletions src/client/sync/rtu.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// SPDX-FileCopyrightText: Copyright (c) 2017-2023 slowtec GmbH <[email protected]>
// SPDX-License-Identifier: MIT OR Apache-2.0

use super::{Context, Result};
use std::{io::Result, time::Duration};

use super::{block_on_with_timeout, Context};

use tokio_serial::{SerialPortBuilder, SerialStream};

Expand All @@ -14,17 +16,36 @@ pub fn connect(builder: &SerialPortBuilder) -> Result<Context> {
connect_slave(builder, Slave::broadcast())
}

/// Connect to no particular Modbus slave device for sending
/// broadcast messages with a timeout.
pub fn connect_with_timeout(
builder: &SerialPortBuilder,
timeout: Option<Duration>,
) -> Result<Context> {
connect_slave_with_timeout(builder, Slave::broadcast(), timeout)
}

/// Connect to any kind of Modbus slave device.
pub fn connect_slave(builder: &SerialPortBuilder, slave: Slave) -> Result<Context> {
let rt = tokio::runtime::Builder::new_current_thread()
connect_slave_with_timeout(builder, slave, None)
}

/// Connect to any kind of Modbus slave device with a timeout.
pub fn connect_slave_with_timeout(
builder: &SerialPortBuilder,
slave: Slave,
timeout: Option<Duration>,
) -> Result<Context> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()?;
// SerialStream::open requires a runtime at least on cfg(unix).
let serial = rt.block_on(async { SerialStream::open(builder) })?;
let async_ctx = rt.block_on(async_connect_slave(serial, slave))?;
let serial = runtime.block_on(async { SerialStream::open(builder) })?;
let async_ctx = block_on_with_timeout(&runtime, timeout, async_connect_slave(serial, slave))?;
let sync_ctx = Context {
core: rt,
runtime,
async_ctx,
timeout,
};
Ok(sync_ctx)
}
31 changes: 24 additions & 7 deletions src/client/sync/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,46 @@

//! TCP client connections
use std::net::SocketAddr;
use std::{io::Result, net::SocketAddr, time::Duration};

use crate::client::tcp::connect_slave as async_connect_slave;
use crate::slave::Slave;
use crate::{client::tcp::connect_slave as async_connect_slave, slave::Slave};

use super::{Context, Result};
use super::{block_on_with_timeout, Context};

/// Establish a direct connection to a Modbus TCP coupler.
pub fn connect(socket_addr: SocketAddr) -> Result<Context> {
connect_slave(socket_addr, Slave::tcp_device())
}

/// Establish a direct connection to a Modbus TCP coupler with a timeout.
pub fn connect_with_timeout(socket_addr: SocketAddr, timeout: Option<Duration>) -> Result<Context> {
connect_slave_with_timeout(socket_addr, Slave::tcp_device(), timeout)
}

/// Connect to any kind of Modbus slave device, probably through a Modbus TCP/RTU
/// gateway that is forwarding messages to/from the corresponding unit identified
/// by the slave parameter.
pub fn connect_slave(socket_addr: SocketAddr, slave: Slave) -> Result<Context> {
let rt = tokio::runtime::Builder::new_current_thread()
connect_slave_with_timeout(socket_addr, slave, None)
}

/// Connect to any kind of Modbus slave device, probably through a Modbus TCP/RTU
/// gateway that is forwarding messages to/from the corresponding unit identified
/// by the slave parameter.
pub fn connect_slave_with_timeout(
socket_addr: SocketAddr,
slave: Slave,
timeout: Option<Duration>,
) -> Result<Context> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()?;
let async_ctx = rt.block_on(async_connect_slave(socket_addr, slave))?;
let async_ctx =
block_on_with_timeout(&runtime, timeout, async_connect_slave(socket_addr, slave))?;
let sync_ctx = Context {
core: rt,
runtime,
async_ctx,
timeout,
};
Ok(sync_ctx)
}