From 74260e4b76cac45907ba1e5275eece65664c0ac2 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Wed, 2 Oct 2024 18:03:22 -0400 Subject: [PATCH 1/6] Split out TCP interface from PTC driver class --- socs/agents/cryomech_cpa/drivers.py | 95 ++---------------- socs/tcp.py | 150 ++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+), 88 deletions(-) create mode 100644 socs/tcp.py diff --git a/socs/agents/cryomech_cpa/drivers.py b/socs/agents/cryomech_cpa/drivers.py index 63e3c434e..b811aeadc 100644 --- a/socs/agents/cryomech_cpa/drivers.py +++ b/socs/agents/cryomech_cpa/drivers.py @@ -1,8 +1,8 @@ import random -import selectors -import socket import struct +from socs.tcp import TCPInterface + STX = '\x02' ADDR = '\x10' CMD = '\x80' @@ -15,7 +15,7 @@ ESC_ESC = '\x32' -class PTC: +class PTC(TCPInterface): def __init__(self, ip_address, port=502, timeout=10, fake_errors=False): self.fake_errors = fake_errors @@ -23,89 +23,15 @@ def __init__(self, ip_address, port=502, timeout=10, fake_errors=False): self.serial = None self.software_revision = None - self.ip_address = ip_address - self.port = int(port) - self.timeout = timeout - self.comm = self._connect((self.ip_address, self.port)) - - def _connect(self, address): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(self.timeout) - try: - sock.connect(address) - except TimeoutError: - print(f"Connection not established within {self.timeout}.") - return - except OSError as e: - print(f"Unable to connect. {e}") - return - except Exception as e: - print(f"Caught unexpected {type(e).__name__} while connecting:") - print(f" {e}") - return - return sock - - def reset(self): - print("Resetting the connection to the compressor.") - self.comm = self._connect((self.ip_address, self.port)) - - def _write(self, msg): - if self.comm is None: - print("Connection not established. Unable to send command.") - self.reset() - return - - try: - self.comm.sendall(msg) - return - except (BrokenPipeError, ConnectionResetError) as e: - print(f"Connection error: {e}") - self.reset() - except TimeoutError as e: - print(f"Timeout error while writing: {e}") - self.reset() - except Exception as e: - print(f"Caught unexpected {type(e).__name__} during write:") - print(f" {e}") - self.reset() - - # Try a second time before giving up - try: - self.comm.sendall(msg) - except (BrokenPipeError, ConnectionResetError) as e: - print(f"Connection error: {e}") - raise ConnectionError - except TimeoutError as e: - print(f"Timeout error while writing: {e}") - raise ConnectionError - except AttributeError: - raise ConnectionError("Unable to reset connection.") - except Exception as e: - print(f"Caught unexpected {type(e).__name__} during write:") - print(f" {e}") - raise ConnectionError - - def _check_ready(self): - """Check socket is ready to read from.""" - if self.comm is None: - raise ConnectionError("Connection not established, not ready to read.") - - sel = selectors.DefaultSelector() - sel.register(self.comm, selectors.EVENT_READ) - if not sel.select(self.timeout): - raise ConnectionError - - def _read(self): - self._check_ready() - data = self.comm.recv(1024) - return data + # Setup the TCP Interface + super().__init__(ip_address, port, timeout) def get_data(self): """ Gets the raw data from the ptc and returns it in a usable format. """ - self._write(self.buildRegistersQuery()) - data = self._read() + self.write(self.buildRegistersQuery()) + data = self.read() data_flag, brd = self.breakdownReplyData(data) return data_flag, brd @@ -246,10 +172,3 @@ def breakdownReplyData(self, rawdata): f"Skipping this data block. Bad output string is {rawdata}") return data_flag, data - - def __del__(self): - """ - If the PTC class instance is destroyed, close the connection to the - ptc. - """ - self.comm.close() diff --git a/socs/tcp.py b/socs/tcp.py new file mode 100644 index 000000000..20bc9a1bb --- /dev/null +++ b/socs/tcp.py @@ -0,0 +1,150 @@ +import selectors +import socket + + +class TCPInterface: + """Interface class for connecting to devices using TCP. + + Parameters + ---------- + ip_address : str + IP address of the device. + port : int + Associated port for TCP communication. + timeout : float + Duration in seconds that operations wait before giving up. + + Attributes + ---------- + ip_address : str + IP address of the device. + port : int + Associated port for TCP communication. + timeout : float + Duration in seconds that operations wait before giving up. + comm : socket.socket + Socket object that forms the connection to the device. + + """ + + def __init__(self, ip_address, port, timeout): + self.ip_address = ip_address + self.port = int(port) + self.timeout = timeout + self.comm = self._connect((self.ip_address, self.port)) + + def _connect(self, address): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(self.timeout) + try: + sock.connect(address) + except TimeoutError: + print(f"Connection not established within {self.timeout}.") + return + except OSError as e: + print(f"Unable to connect. {e}") + return + except Exception as e: + print(f"Caught unexpected {type(e).__name__} while connecting:") + print(f" {e}") + return + return sock + + def _reset(self): + print("Resetting the connection to the device.") + self.comm = self._connect((self.ip_address, self.port)) + + def write(self, msg): + """Write command to socket. + + This method will try to send the message and if it runs into any issues + will try to re-establish the socket connection before trying to send + the message again. If it fails a second time it raises an exception. + + If the connection has failed to reset from a previous ``write``, or has + not yet been established, it will first try to connnect before sending + the message. If it fails to establish the connection it will raise an + exception. + + Parameters + ---------- + msg : str + Message string to send on socket. + + Raises + ------ + ConnectionError + Raised if the communication fails for any reason. + + """ + if self.comm is None: + print("Connection not established. Unable to send command.") + self._reset() + return + + try: + self.comm.sendall(msg) + return + except (BrokenPipeError, ConnectionResetError) as e: + print(f"Connection error: {e}") + self._reset() + except TimeoutError as e: + print(f"Timeout error while writing: {e}") + self._reset() + except Exception as e: + print(f"Caught unexpected {type(e).__name__} during write:") + print(f" {e}") + self._reset() + + # Try a second time before giving up + try: + self.comm.sendall(msg) + except (BrokenPipeError, ConnectionResetError) as e: + print(f"Connection error: {e}") + raise ConnectionError + except TimeoutError as e: + print(f"Timeout error while writing: {e}") + raise ConnectionError + except AttributeError: + raise ConnectionError("Unable to reset connection.") + except Exception as e: + print(f"Caught unexpected {type(e).__name__} during write:") + print(f" {e}") + raise ConnectionError + + def _check_ready(self): + """Check socket is ready to read from.""" + if self.comm is None: + raise ConnectionError("Connection not established, not ready to read.") + + sel = selectors.DefaultSelector() + sel.register(self.comm, selectors.EVENT_READ) + if not sel.select(self.timeout): + raise ConnectionError("Socket not ready to read. Possible timeout.") + + def read(self): + """Read response from the device. + + This method will check if the socket is ready to be read from before + performing the read. If there is no data to read, or the socket is + otherwise unready an exception is raised. + + Returns + ------- + ``str`` or ``bytes`` + The response from the device. The return type + depends on the device. + + Raises + ------ + ConnectionError + Raised if the socket is not ready to read from. + + """ + self._check_ready() + data = self.comm.recv(1024) + return data + + def __del__(self): + if self.comm: + self.comm.close() From 10c154f75e3976c6c582470894f378b27eb157d0 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Fri, 4 Oct 2024 17:50:35 -0400 Subject: [PATCH 2/6] Rename 'write' to 'send' and 'read' to 'recv' This better mimics the socket API, which developers looking to use TCP might already be familiar with. --- socs/agents/cryomech_cpa/drivers.py | 4 ++-- socs/tcp.py | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/socs/agents/cryomech_cpa/drivers.py b/socs/agents/cryomech_cpa/drivers.py index b811aeadc..592cee5b0 100644 --- a/socs/agents/cryomech_cpa/drivers.py +++ b/socs/agents/cryomech_cpa/drivers.py @@ -30,8 +30,8 @@ def get_data(self): """ Gets the raw data from the ptc and returns it in a usable format. """ - self.write(self.buildRegistersQuery()) - data = self.read() + self.send(self.buildRegistersQuery()) + data = self.recv() data_flag, brd = self.breakdownReplyData(data) return data_flag, brd diff --git a/socs/tcp.py b/socs/tcp.py index 20bc9a1bb..facba1913 100644 --- a/socs/tcp.py +++ b/socs/tcp.py @@ -54,14 +54,14 @@ def _reset(self): print("Resetting the connection to the device.") self.comm = self._connect((self.ip_address, self.port)) - def write(self, msg): - """Write command to socket. + def send(self, msg): + """Send message to socket. This method will try to send the message and if it runs into any issues will try to re-establish the socket connection before trying to send the message again. If it fails a second time it raises an exception. - If the connection has failed to reset from a previous ``write``, or has + If the connection has failed to reset from a previous ``send``, or has not yet been established, it will first try to connnect before sending the message. If it fails to establish the connection it will raise an exception. @@ -92,7 +92,7 @@ def write(self, msg): print(f"Timeout error while writing: {e}") self._reset() except Exception as e: - print(f"Caught unexpected {type(e).__name__} during write:") + print(f"Caught unexpected {type(e).__name__} during send:") print(f" {e}") self._reset() @@ -108,7 +108,7 @@ def write(self, msg): except AttributeError: raise ConnectionError("Unable to reset connection.") except Exception as e: - print(f"Caught unexpected {type(e).__name__} during write:") + print(f"Caught unexpected {type(e).__name__} during send:") print(f" {e}") raise ConnectionError @@ -122,11 +122,11 @@ def _check_ready(self): if not sel.select(self.timeout): raise ConnectionError("Socket not ready to read. Possible timeout.") - def read(self): - """Read response from the device. + def recv(self): + """Receive response from the device. This method will check if the socket is ready to be read from before - performing the read. If there is no data to read, or the socket is + performing the recv. If there is no data to read, or the socket is otherwise unready an exception is raised. Returns From 93facd394b367ce149f382b5dc19b833a10f1cc6 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Fri, 4 Oct 2024 12:34:50 -0400 Subject: [PATCH 3/6] Raise if unable to reset at start of send --- socs/tcp.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/socs/tcp.py b/socs/tcp.py index facba1913..da5ac8e5a 100644 --- a/socs/tcp.py +++ b/socs/tcp.py @@ -78,9 +78,10 @@ def send(self, msg): """ if self.comm is None: - print("Connection not established. Unable to send command.") + print("Connection not established.") self._reset() - return + if self.comm is None: + raise ConnectionError("Unable to establish connection.") try: self.comm.sendall(msg) From ffadc36435df599cd1c27dfc56960fbc5b8ff44b Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Fri, 4 Oct 2024 12:34:24 -0400 Subject: [PATCH 4/6] Create TCP interface docs page --- docs/api.rst | 9 +++ docs/developer/{ => interfaces}/snmp.rst | 0 docs/developer/interfaces/tcp.rst | 83 ++++++++++++++++++++++++ docs/index.rst | 3 +- 4 files changed, 94 insertions(+), 1 deletion(-) rename docs/developer/{ => interfaces}/snmp.rst (100%) create mode 100644 docs/developer/interfaces/tcp.rst diff --git a/docs/api.rst b/docs/api.rst index 2ef255f28..fb9a9e996 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -100,6 +100,15 @@ socs.snmp :undoc-members: :show-inheritance: +socs.tcp +--------- + +.. automodule:: socs.tcp + :members: + :undoc-members: + :show-inheritance: + :noindex: + socs.testing ------------ diff --git a/docs/developer/snmp.rst b/docs/developer/interfaces/snmp.rst similarity index 100% rename from docs/developer/snmp.rst rename to docs/developer/interfaces/snmp.rst diff --git a/docs/developer/interfaces/tcp.rst b/docs/developer/interfaces/tcp.rst new file mode 100644 index 000000000..7bed36869 --- /dev/null +++ b/docs/developer/interfaces/tcp.rst @@ -0,0 +1,83 @@ +.. _tcp: + +=================================== +Transmission Control Protocol (TCP) +=================================== + +SOCS provides a standard interface for connecting to devices using TCP. This +page details how to use this interface. The primary benefit to using this +interface is the included error handling. + +A few important things to know about the behavior of the interface class: + +* The interface tries to connect to the device when instantiated. +* It will log but not raise an error if it cannot connect, instead + ``self.comm`` will be ``None``. +* The connection will be reset when ``send()`` is called if this happens. + An exception will be raised if it still cannot connect. +* The interface is built to mimic ``socket.send()`` and ``socket.recv()``, but + uses ``socket.sendall()`` in its implementation, so all bytes in the included + message are sent to the socket. + +See the example below for how to implement use of the ``TCPInterface`` class in +your device drivers and how to add error handling to the agent. + +Example +------- +An example of using ``TCPInterface`` to create a class that interfaces with a +device:: + + from socs.tcp import TCPInterface + + class Device(TCPInterface): + def __init__(self, ip_address, port=501, timeout=10, *args, **kwargs): + # Setup the TCP Interface + super().__init__(ip_address, port, timeout) + + def get_data(self): + self.send(query_string) + data = self.recv() + # Optionally perform any decoding required + return data + +Within the agent code where ``Device.get_data`` is used you should now handle +the possible ``ConnectionError``, as shown below. + +.. note:: + This example is stripped down to focus on the error handling. Important + parts of the agent process are missing here, like obtaining the lock and + publishing data to a feed. + +.. code-block:: + + class DeviceAgent: + self.device = Device('192.168.1.2') + + def main(self, session, params): + """Main data acquisition process.""" + + while session.status in ['starting', 'running']: + try: + data = self.device.get_data() + if session.degraded: + self.log.info("Connection re-established.") + session.degraded = False + except ConnectionError: + self.log.error("Failed to get data from device. Check network connection.") + session.degraded = True + time.sleep(1) # wait between reconnection attempts + continue + + return True, "Main process exited successfully." + +See existing TCP agents, such as the Cryomech CPA Agent (which the above +example is based on) for more examples. + +API +--- + +If you are developing an agent that connects to a device using TCP, the +``TCPInterface`` class is available for use and detailed here: + +.. autoclass:: socs.tcp.TCPInterface + :members: diff --git a/docs/index.rst b/docs/index.rst index 01ad2acbe..4f1c59e4f 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -98,7 +98,8 @@ API Reference Full API documentation for core parts of the SOCS library. :caption: Developer Guide :maxdepth: 2 - developer/snmp + developer/interfaces/snmp + developer/interfaces/tcp developer/testing .. toctree:: From 74a89e30d9f1dbf9903caa6b485f5c04642e63fa Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Fri, 4 Oct 2024 16:45:55 -0400 Subject: [PATCH 5/6] Write docstring for PTC class --- docs/agents/cryomech_cpa.rst | 1 + socs/agents/cryomech_cpa/drivers.py | 24 +++++++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/docs/agents/cryomech_cpa.rst b/docs/agents/cryomech_cpa.rst index 3af58368a..194cd14f2 100644 --- a/docs/agents/cryomech_cpa.rst +++ b/docs/agents/cryomech_cpa.rst @@ -91,3 +91,4 @@ Supporting APIs .. autoclass:: socs.agents.cryomech_cpa.agent.PTC :members: + :show-inheritance: diff --git a/socs/agents/cryomech_cpa/drivers.py b/socs/agents/cryomech_cpa/drivers.py index 592cee5b0..179ee319e 100644 --- a/socs/agents/cryomech_cpa/drivers.py +++ b/socs/agents/cryomech_cpa/drivers.py @@ -16,6 +16,28 @@ class PTC(TCPInterface): + """Interface class for connecting to the pulse tube compressor. + + Parameters + ---------- + ip_address : str + IP address of the device. + port : int + Associated port for TCP communication. Default is 502. + timeout : float + Duration in seconds that operations wait before giving up. Default is + 10 seconds. + fake_errors : bool + Flag that generates random fake errors if True. Does not generate + errors if False. Defaults to False. + + Attributes + ---------- + comm : socket.socket + Socket object that forms the connection to the compressor. + + """ + def __init__(self, ip_address, port=502, timeout=10, fake_errors=False): self.fake_errors = fake_errors @@ -28,7 +50,7 @@ def __init__(self, ip_address, port=502, timeout=10, fake_errors=False): def get_data(self): """ - Gets the raw data from the ptc and returns it in a usable format. + Gets the raw data from the PTC and returns it in a usable format. """ self.send(self.buildRegistersQuery()) data = self.recv() From 97d00a186dcf48d97a60c9f7bd235a1670d9a9f1 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 14 Oct 2024 17:48:32 -0400 Subject: [PATCH 6/6] Add bufsize argument to TCPInterface.recv() Default to 4096, the reasonable example value from upstream docs, which is commonly used in existing agents. --- socs/agents/cryomech_cpa/drivers.py | 2 +- socs/tcp.py | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/socs/agents/cryomech_cpa/drivers.py b/socs/agents/cryomech_cpa/drivers.py index 179ee319e..8851011f5 100644 --- a/socs/agents/cryomech_cpa/drivers.py +++ b/socs/agents/cryomech_cpa/drivers.py @@ -53,7 +53,7 @@ def get_data(self): Gets the raw data from the PTC and returns it in a usable format. """ self.send(self.buildRegistersQuery()) - data = self.recv() + data = self.recv(1024) data_flag, brd = self.breakdownReplyData(data) return data_flag, brd diff --git a/socs/tcp.py b/socs/tcp.py index da5ac8e5a..3b6746d6a 100644 --- a/socs/tcp.py +++ b/socs/tcp.py @@ -123,13 +123,18 @@ def _check_ready(self): if not sel.select(self.timeout): raise ConnectionError("Socket not ready to read. Possible timeout.") - def recv(self): + def recv(self, bufsize=4096): """Receive response from the device. This method will check if the socket is ready to be read from before performing the recv. If there is no data to read, or the socket is otherwise unready an exception is raised. + Parameters + ---------- + bufsize : int + Amount of data to be recieved in bytes. Defaults to 4096. + Returns ------- ``str`` or ``bytes`` @@ -143,7 +148,7 @@ def recv(self): """ self._check_ready() - data = self.comm.recv(1024) + data = self.comm.recv(bufsize) return data def __del__(self):