diff --git a/socs/testing/device_emulator.py b/socs/testing/device_emulator.py index 88e933db6..0efba1014 100644 --- a/socs/testing/device_emulator.py +++ b/socs/testing/device_emulator.py @@ -12,7 +12,7 @@ import serial -def create_device_emulator(responses, relay_type, port=9001, encoding='utf-8', +def create_device_emulator(responses, relay_type, port=0, encoding='utf-8', reconnect=False): """Create a device emulator fixture. @@ -24,7 +24,8 @@ def create_device_emulator(responses, relay_type, port=9001, encoding='utf-8', values. See :class:`.DeviceEmulator` for details. relay_type (str): Communication relay type. Either 'serial' or 'tcp'. port (int): Port for the TCP relay to listen for connections on. - Defaults to 9001. Only used if relay_type is 'tcp'. + Defaults to 0, which will select a random available port. Only used + if relay_type is 'tcp'. encoding (str): Encoding for the messages and responses. See :func:`socs.testing.device_emulator.DeviceEmulator` for more details. @@ -81,6 +82,8 @@ class DeviceEmulator: Defaults to None. encoding (str): Encoding for the messages and responses, set by the encoding argument. + port (int): Port that the DeviceEmulator is listening on if using the + 'tcp' relay. _type (str): Relay type, either 'serial' or 'tcp'. _read (bool): Used to stop the background reading of data recieved on the relay. @@ -96,6 +99,7 @@ def __init__(self, responses, encoding='utf-8', reconnect=False): self._type = None self._read = True self._conn = None + self.port = None self.logger = logging.getLogger(self.__class__.__name__) self.logger.setLevel(logging.DEBUG) @@ -249,6 +253,7 @@ def _read_socket(self, port): try: self._sock.bind(('127.0.0.1', port)) self._sock_bound = True + self.port = self._sock.getsockname()[1] except OSError: self.logger.error(f"Failed to bind to port {port}, trying again...") time.sleep(1) @@ -310,7 +315,9 @@ def create_tcp_relay(self, port): DeviceEmulator object within a given test. Args: - port (int): Port for the TCP relay to listen for connections on. + port (int): Port for the TCP relay to listen for connections on. A + port of 0 will select a random available port. The port number + will then be available at ``self.port``. Notes: This will not return until the socket is properly bound to the diff --git a/tests/common/test_moxa_serial.py b/tests/common/test_moxa_serial.py index bd1b8bab6..63435b5af 100644 --- a/tests/common/test_moxa_serial.py +++ b/tests/common/test_moxa_serial.py @@ -6,15 +6,15 @@ from socs.testing.device_emulator import create_device_emulator tcp_emulator = create_device_emulator({'ping': 'pong\r'}, - 'tcp', 19001) + 'tcp') # Tried this as a fixture, but connections weren't cleaning up properly. -def create_tcpserver(): +def create_tcpserver(port): # Connection might not work on first attempt for i in range(5): try: - ser = moxa_serial.Serial_TCPServer(('127.0.0.1', 19001), 0.1) + ser = moxa_serial.Serial_TCPServer(('127.0.0.1', port), 0.1) break except ConnectionRefusedError: print("Could not connect, waiting and trying again.") @@ -24,24 +24,24 @@ def create_tcpserver(): @pytest.mark.integtest def test_moxa_serial_create_serial_tcpserver(tcp_emulator): - create_tcpserver() + create_tcpserver(tcp_emulator.port) @pytest.mark.integtest def test_moxa_serial_write(tcp_emulator): - ser = create_tcpserver() + ser = create_tcpserver(tcp_emulator.port) ser.write('ping') @pytest.mark.integtest def test_moxa_serial_writeread(tcp_emulator): - ser = create_tcpserver() + ser = create_tcpserver(tcp_emulator.port) response = ser.writeread('ping') assert response == 'pong' @pytest.mark.integtest def test_moxa_serial_write_readline(tcp_emulator): - ser = create_tcpserver() + ser = create_tcpserver(tcp_emulator.port) ser.write('ping') assert ser.readline() == 'pong\r' diff --git a/tests/integration/test_pfeiffer_tc400_agent_integration.py b/tests/integration/test_pfeiffer_tc400_agent_integration.py index 51ac78146..15746020a 100644 --- a/tests/integration/test_pfeiffer_tc400_agent_integration.py +++ b/tests/integration/test_pfeiffer_tc400_agent_integration.py @@ -43,7 +43,7 @@ def format_reply(data): run_agent = create_agent_runner_fixture( '../socs/agents/pfeiffer_tc400/agent.py', 'tc400_agent') client = create_client_fixture('pfeifferturboA') -emulator = create_device_emulator({}, relay_type='tcp') +emulator = create_device_emulator({}, relay_type='tcp', port=9001) @pytest.mark.integtest diff --git a/tests/test_device_emulator.py b/tests/test_device_emulator.py index 26fb820a7..df029b3da 100644 --- a/tests/test_device_emulator.py +++ b/tests/test_device_emulator.py @@ -6,7 +6,7 @@ from socs.testing import device_emulator tcp_emulator = device_emulator.create_device_emulator({'ping': 'pong'}, - 'tcp', 9001) + 'tcp') def test_create_device_emulator_invalid_type(): @@ -15,11 +15,12 @@ def test_create_device_emulator_invalid_type(): def test_create_device_emulator_tcp_relay(tcp_emulator): + port = tcp_emulator.port with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: # Connection might not work on first attempt for i in range(5): try: - s.connect(('127.0.0.1', 9001)) + s.connect(('127.0.0.1', port)) break except ConnectionRefusedError: print("Could not connect, waiting and trying again.")