diff --git a/src/sorunlib/_internal.py b/src/sorunlib/_internal.py
index a286ed3..59f0120 100644
--- a/src/sorunlib/_internal.py
+++ b/src/sorunlib/_internal.py
@@ -4,6 +4,8 @@

 """

+import time
+
 import ocs


@@ -70,3 +72,54 @@ def check_running(client, response):
         error = f"Operation {op} in Agent {instance} is not in the 'running' " + \
                 "state.\n" + str(response)
         raise RuntimeError(error)
+
+
+def check_started(client, response, timeout=60):
+    """Check that a process has started and is in the 'running' state. If it
+    has not finished starting, wait for timeout seconds for it to start. If
+    it is not running by then, raise an exception.
+
+    This is meant to be called right after a process' start call is made, to
+    ensure the process starts successfully. Use ``check_running()`` if you
+    want to check that an already running process is still running.
+
+    Args:
+        client (ocs.ocs_client.OCSClient): OCS Client which returned the
+            response.
+        response (ocs.ocs_client.OCSReply): Response from an OCS operation
+            call.
+        timeout (float): How long to wait, in seconds, for the operation to go
+            from 'starting' to 'running' before raising an exception.
+
+    Raises:
+        RuntimeError: When Operation does not properly transition from
+            'starting' to 'running' or is not 'running'.
+
+    """
+    op = response.session['op_name']
+    instance = client.instance_id
+
+    _check_error(client, response)
+
+    op_code = response.session.get('op_code')
+
+    if op_code == 2:  # STARTING
+        _operation = client.__getattribute__(op)
+
+        # Wait at most ~timeout seconds while checking the status
+        for i in range(timeout):
+            response = _operation.status()
+            op_code = response.session.get('op_code')
+            if op_code == 3:
+                # Tricky to change state during testing w/little reward
+                return  # pragma: no cover
+            time.sleep(1)
+
+        error = f"Check timed out. Operation {op} in Agent {instance} stuck in " + \
+                "'starting' state.\n" + str(response)
+        raise RuntimeError(error)
+
+    if op_code != 3:  # RUNNING
+        error = f"Operation {op} in Agent {instance} is not 'running'.\n" + \
+                f"Current OpCode: {op_code}\n" + str(response)
+        raise RuntimeError(error)
diff --git a/src/sorunlib/seq.py b/src/sorunlib/seq.py
index db88fcf..7815a90 100644
--- a/src/sorunlib/seq.py
+++ b/src/sorunlib/seq.py
@@ -1,5 +1,5 @@
 import sorunlib as run
-from sorunlib._internal import check_response
+from sorunlib._internal import check_response, check_started


 OP_TIMEOUT = 60
@@ -41,9 +41,7 @@ def scan(description, stop_time, width, az_drift=0, tag=None, subtype=None):
                                    el_endpoint2=el,
                                    el_speed=0,
                                    az_drift=az_drift)
-
-    if not resp.session:
-        raise Exception(f"Generate Scan failed to start:\n {resp}")
+    check_started(acu, resp)

     # Wait until stop time
     run.commands.wait_until(stop_time)
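For context on the bare op_code values above: 2 and 3 correspond to the STARTING and RUNNING members of OCS's OpCode enum (OpCode in ocs.base), as the inline comments indicate. The sketch below is illustrative only and not part of the patch; it mirrors the call pattern that the seq.scan() change adopts, with the scan_sketch() name and the elided scan arguments being placeholders rather than the real scan geometry.

    import sorunlib as run
    from sorunlib._internal import check_started

    def scan_sketch(stop_time):
        """Illustrative only: start a process, then confirm it reached 'running'."""
        acu = run.CLIENTS['acu']
        resp = acu.generate_scan.start()  # the real call passes the full scan geometry
        # check_started() polls the operation's status() for up to 60 seconds and
        # raises RuntimeError if the process never reaches the 'running' state.
        check_started(acu, resp)
        run.commands.wait_until(stop_time)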
diff --git a/src/sorunlib/smurf.py b/src/sorunlib/smurf.py
index a5eee6a..38c3f7d 100644
--- a/src/sorunlib/smurf.py
+++ b/src/sorunlib/smurf.py
@@ -14,7 +14,7 @@
 import time

 import sorunlib as run
-from sorunlib._internal import check_response
+from sorunlib._internal import check_response, check_started

 # Timing between commanding separate SMuRF Controllers
 # Yet to be determined in the field. Eventually might need this to be unique
@@ -30,6 +30,16 @@ def _wait_for_cryo(time_):
     time.sleep(wait)


+def _check_smurf_threshold():
+    cfg = run.config.load_config()
+    threshold = cfg['smurf_failure_threshold']
+    remaining = len(run.CLIENTS['smurf'])
+    if remaining < threshold:
+        error = 'Functional SMuRF count below failure threshold ' + \
+                f'({remaining} < {threshold}). Aborting.'
+        raise RuntimeError(error)
+
+
 def _run_op(operation, concurrent, settling_time, **kwargs):
     """Run operation across all active SMuRF controllers.

@@ -83,13 +93,7 @@ def _run_op(operation, concurrent, settling_time, **kwargs):
             run.CLIENTS['smurf'].remove(client)

     # Check if enough SMuRFs remain
-    cfg = run.config.load_config()
-    threshold = cfg['smurf_failure_threshold']
-    remaining = len(run.CLIENTS['smurf'])
-    if remaining < threshold:
-        error = 'Functional SMuRF count below failure threshold ' + \
-                f'({remaining} < {threshold}). Aborting.'
-        raise RuntimeError(error)
+    _check_smurf_threshold()


 def set_targets(targets):
@@ -322,13 +326,36 @@ def stream(state, tag=None, subtype=None):
         subtype (str, optional): Operation subtype used to tag the stream.

     """
+    clients_to_remove = []
+
     if state.lower() == 'on':
         for smurf in run.CLIENTS['smurf']:
             smurf.stream.start(subtype=subtype, tag=tag)

+        for smurf in run.CLIENTS['smurf']:
+            resp = smurf.stream.status()
+            try:
+                check_started(smurf, resp, timeout=60)
+            except RuntimeError as e:
+                print(f"Failed to start stream on {smurf}, removing from targets list.")
+                print(e)
+                clients_to_remove.append(smurf)
+
     else:
         for smurf in run.CLIENTS['smurf']:
             print(f'Turning off stream from {smurf.instance_id}.')
             smurf.stream.stop()
             resp = smurf.stream.wait()
-            check_response(smurf, resp)
+            try:
+                check_response(smurf, resp)
+            except RuntimeError as e:
+                print(f"Failed to stop stream on {smurf}, removing from targets list.")
+                print(e)
+                clients_to_remove.append(smurf)
+
+    # Remove failed SMuRF clients
+    for client in clients_to_remove:
+        run.CLIENTS['smurf'].remove(client)
+
+    # Check if enough SMuRFs remain
+    _check_smurf_threshold()
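The new _check_smurf_threshold() helper reads smurf_failure_threshold from the sorunlib configuration loaded via run.config.load_config() (the file pointed to by SORUNLIB_CONFIG). From an observing script's point of view, the reworked stream() behaves roughly as sketched below; this is illustrative only, and start_streams_sketch() plus the tag and subtype values are made-up names, not part of the patch.

    import sorunlib as run
    from sorunlib import smurf

    def start_streams_sketch():
        """Illustrative only: stream() now tolerates individual SMuRF failures."""
        before = len(run.CLIENTS['smurf'])
        # Controllers whose stream never reaches 'running' are dropped from
        # run.CLIENTS['smurf']; a RuntimeError propagates only if the remaining
        # count falls below the configured smurf_failure_threshold.
        smurf.stream('on', tag='example', subtype='cal')
        remaining = len(run.CLIENTS['smurf'])
        print(f'{before - remaining} controller(s) dropped; {remaining} still streaming.')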
diff --git a/tests/test__internal.py b/tests/test__internal.py
index 9f1c44e..976d78b 100644
--- a/tests/test__internal.py
+++ b/tests/test__internal.py
@@ -2,9 +2,10 @@

 import ocs
 import pytest
+from unittest.mock import MagicMock, patch

 from ocs.ocs_client import OCSReply
-from sorunlib._internal import check_response, check_running
+from sorunlib._internal import check_response, check_running, check_started

 from util import create_session as create_unencoded_session

@@ -20,6 +21,7 @@ def create_session(*args, **kwargs):
 class MockClient:
     def __init__(self):
         self.instance_id = 'test-id'
+        self.test_op = MagicMock()


 invalid_responses = [(MockClient(), OCSReply(ocs.TIMEOUT,
@@ -33,11 +35,14 @@ def __init__(self):
     (MockClient(), OCSReply(ocs.OK, 'msg', create_session('test', success=True)))]

 invalid_running_responses = [
-    (MockClient(), OCSReply(ocs.OK, 'msg', create_session('test')))]
+    (MockClient(), OCSReply(ocs.OK, 'msg', create_session('test', status='done')))]

 running_responses = [
     (MockClient(), OCSReply(ocs.OK, 'msg', create_session('test', status='running')))]

+invalid_starting_responses = [
+    (MockClient(), OCSReply(ocs.OK, 'msg', create_session('test', status='starting')))]
+

 @pytest.mark.parametrize("client,response", invalid_responses)
 def test_check_response_raises(client, response):
@@ -59,3 +64,21 @@ def test_check_running_raises(client, response):
 @pytest.mark.parametrize("client,response", running_responses)
 def test_check_running(client, response):
     check_running(client, response)
+
+
+@patch('sorunlib._internal.time.sleep', MagicMock())
+@pytest.mark.parametrize("client,response", invalid_starting_responses)
+def test_check_started_raises_timeout(client, response):
+    with pytest.raises(RuntimeError):
+        check_started(client, response)
+
+
+@pytest.mark.parametrize("client,response", invalid_running_responses)
+def test_check_started_raises(client, response):
+    with pytest.raises(RuntimeError):
+        check_started(client, response, timeout=1)
+
+
+@pytest.mark.parametrize("client,response", running_responses)
+def test_check_started(client, response):
+    check_started(client, response)
diff --git a/tests/test_seq.py b/tests/test_seq.py
index e47ad4d..342487c 100644
--- a/tests/test_seq.py
+++ b/tests/test_seq.py
@@ -3,13 +3,15 @@
 os.environ["SORUNLIB_CONFIG"] = "./data/example_config.yaml"

 import datetime as dt
+import ocs
 import pytest

 from unittest.mock import MagicMock, patch
+from ocs.ocs_client import OCSReply

 import sorunlib
 from sorunlib import seq
-from util import create_patch_clients
+from util import create_patch_clients, create_session


 patch_clients = create_patch_clients('satp')
@@ -22,7 +24,7 @@ def test_scan(patch_clients):


 @patch('sorunlib.commands.time.sleep', MagicMock())
-def test_scan_failed_to_start(patch_clients):
+def test_scan_no_session(patch_clients):
     # Setup mock OCSReply without session object
     mock_reply = MagicMock()
     mock_reply.session = None
@@ -31,3 +33,32 @@ def test_scan_failed_to_start(patch_clients):
     target = dt.datetime.now() + dt.timedelta(seconds=1)
     with pytest.raises(Exception):
         seq.scan(description='test', stop_time=target.isoformat(), width=20.)
+
+
+@patch('sorunlib.commands.time.sleep', MagicMock())
+def test_scan_failed_to_start(patch_clients):
+    # Setup mock OCSReply with failed status
+    failed_session = create_session('generate_scan')
+    failed_session.success = False
+    failed_session.set_status('running')
+    failed_session.add_message('A simulated error has occurred.')
+    failed_session.set_status('done')
+
+    mock_reply = OCSReply(ocs.OK, 'msg', failed_session.encoded())
+    sorunlib.CLIENTS['acu'].generate_scan.start = MagicMock(return_value=mock_reply)
+    sorunlib.CLIENTS['acu'].generate_scan.wait = MagicMock(return_value=mock_reply)
+
+    # Example of failed reply this is trying to emulate.
+    # OCSReply: OK : Operation "generate_scan" is currently not running (FAILED).
+    #   generate_scan[session=15]; status=done with ERROR 116.5 s ago, took 68.8 s
+    #   messages (4 of 4):
+    #     1702337679.564 Status is now "starting".
+    #     1702337679.565 Status is now "running".
+    #     1702337748.356 Problems during scan
+    #     1702337748.357 Status is now "done".
+    #   other keys in .session: op_code, data
+    print(mock_reply)
+
+    target = dt.datetime.now() + dt.timedelta(seconds=1)
+    with pytest.raises(RuntimeError):
+        seq.scan(description='test', stop_time=target.isoformat(), width=20.)
diff --git a/tests/test_smurf.py b/tests/test_smurf.py
index e6718f6..102cdca 100644
--- a/tests/test_smurf.py
+++ b/tests/test_smurf.py
@@ -10,6 +10,8 @@
 from sorunlib import smurf
 from util import create_patch_clients

+os.environ["SORUNLIB_CONFIG"] = "./data/example_config.yaml"
+
 # Use pytest-mock plugin to patch CLIENTS on all tests
 patch_clients = create_patch_clients('satp', autouse=True)

@@ -133,3 +135,21 @@ def test_stream(state):
             client.stream.start.assert_called_once()
         else:
             client.stream.stop.assert_called_once()
+
+
+@pytest.mark.parametrize("state", [("on"), ("off")])
+def test_stream_single_failure(state):
+    # Create failure on smurf1
+    mocked_response = OCSReply(
+        0, 'msg', {'success': False, 'op_name': 'stream'})
+    # For state == 'on'
+    smurf.run.CLIENTS['smurf'][0].stream.status.side_effect = [mocked_response]
+    # For state == 'off'
+    smurf.run.CLIENTS['smurf'][0].stream.wait.side_effect = [mocked_response]
+
+    smurf.stream(state=state)
+    for client in smurf.run.CLIENTS['smurf']:
+        if state == "on":
+            client.stream.start.assert_called_once()
+        else:
+            client.stream.stop.assert_called_once()
diff --git a/tests/util.py b/tests/util.py
index c1dde37..663d5cb 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -40,6 +40,12 @@ def _mock_smurf_client(instance_id):
     smurf.take_bgmap = MagicMock()
     smurf.take_noise = MagicMock()

+    # smurf.stream
+    session = create_session('stream', status='running')
+    reply = OCSReply(ocs.OK, 'msg', session.encoded())
+    smurf.stream.start = MagicMock(return_value=reply)
+    smurf.stream.status = MagicMock(return_value=reply)
+
     return smurf
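As a reading aid for the mocks above: the helpers exercised by these tests only consult a handful of session fields, so a minimal stand-in session looks roughly like the dict below. The values are illustrative, and real OCS sessions also carry a message log, timestamps, and a data field; this is a sketch based only on the accesses visible in this patch.

    import ocs
    from ocs.ocs_client import OCSReply

    # Keys read by the checks exercised in these tests (values illustrative).
    fake_session = {
        'op_name': 'stream',   # used when building error messages
        'op_code': 3,          # 2 = STARTING, 3 = RUNNING
        'success': True,       # what the create_session(success=...) fixtures vary
        'status': 'running',   # 'starting', 'running', or 'done' in the fixtures
    }
    reply = OCSReply(ocs.OK, 'msg', fake_session)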