Merge pull request #119 from simonsobs/koopman/check-started
Create function to check if operations start properly
BrianJKoopman authored Dec 19, 2023
2 parents 4546436 + edb66d8 commit bfa059c
Showing 7 changed files with 175 additions and 17 deletions.
53 changes: 53 additions & 0 deletions src/sorunlib/_internal.py
@@ -4,6 +4,8 @@
"""

import time

import ocs


@@ -70,3 +72,54 @@ def check_running(client, response):
error = f"Operation {op} in Agent {instance} is not in the 'running' " + \
"state.\n" + str(response)
raise RuntimeError(error)


def check_started(client, response, timeout=60):
    """Check that a process has started and is in the 'running' state. If it
    has not finished starting, wait up to ``timeout`` seconds for it to start.
    If it is not running by then, raise an exception.

    This is meant to be called right after a process' start call is made, to
    ensure the process starts successfully. Use ``check_running()`` if you
    want to check that an already running process is still running.

    Args:
        client (ocs.ocs_client.OCSClient): OCS Client which returned the
            response.
        response (ocs.ocs_client.OCSReply): Response from an OCS operation
            call.
        timeout (int): How long to wait, in seconds, for the operation to go
            from 'starting' to 'running' before raising an exception.

    Raises:
        RuntimeError: When the Operation does not properly transition from
            'starting' to 'running' or is not 'running'.

    """
    op = response.session['op_name']
    instance = client.instance_id

    _check_error(client, response)

    op_code = response.session.get('op_code')

    if op_code == 2:  # STARTING
        # Look up the operation object on the client by name.
        _operation = client.__getattribute__(op)

        # Wait at most ~timeout seconds while checking the status
        for i in range(timeout):
            response = _operation.status()
            op_code = response.session.get('op_code')
            if op_code == 3:
                # Tricky to change state during testing w/little reward
                return  # pragma: no cover
            time.sleep(1)

        error = f"Check timed out. Operation {op} in Agent {instance} stuck in " + \
                "'starting' state.\n" + str(response)
        raise RuntimeError(error)

    if op_code != 3:  # RUNNING
        error = f"Operation {op} in Agent {instance} is not 'running'.\n" + \
                f"Current OpCode: {op_code}\n" + str(response)
        raise RuntimeError(error)
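
For context, a minimal usage sketch, not part of this commit: the numeric op codes 2 and 3 compared against above correspond to OpCode.STARTING and OpCode.RUNNING from ocs.base, and the intended call pattern is to invoke check_started() right after an operation's .start() call. The acu client and its generate_scan process below are hypothetical stand-ins.

# Hedged sketch; assumes a live OCS network with an ACU agent registered
# under the instance-id 'acu'.
from ocs.base import OpCode
from ocs.ocs_client import OCSClient

from sorunlib._internal import check_started

print(OpCode.STARTING.value)  # 2, the state check_started polls through
print(OpCode.RUNNING.value)   # 3, the state it waits for

acu = OCSClient('acu')                # hypothetical instance-id
resp = acu.generate_scan.start()      # scan parameters omitted for brevity
check_started(acu, resp, timeout=60)  # raises RuntimeError if not 'running'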
6 changes: 2 additions & 4 deletions src/sorunlib/seq.py
@@ -1,5 +1,5 @@
import sorunlib as run
from sorunlib._internal import check_response
from sorunlib._internal import check_response, check_started


OP_TIMEOUT = 60
@@ -41,9 +41,7 @@ def scan(description, stop_time, width, az_drift=0, tag=None, subtype=None):
                                   el_endpoint2=el,
                                   el_speed=0,
                                   az_drift=az_drift)

    if not resp.session:
        raise Exception(f"Generate Scan failed to start:\n {resp}")
    check_started(acu, resp)

    # Wait until stop time
    run.commands.wait_until(stop_time)
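
The resulting flow in scan() is: start the ACU's generate_scan process, verify it actually started via check_started(), then block until the stop time. A hedged usage sketch, assuming an initialized sorunlib session and a hypothetical ten-minute scan:

import datetime as dt

import sorunlib as run

run.initialize()  # build CLIENTS from the site config
stop = (dt.datetime.now(dt.timezone.utc) + dt.timedelta(minutes=10)).isoformat()
run.seq.scan(description='test-scan', stop_time=stop, width=20.)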
45 changes: 36 additions & 9 deletions src/sorunlib/smurf.py
@@ -14,7 +14,7 @@
import time

import sorunlib as run
from sorunlib._internal import check_response
from sorunlib._internal import check_response, check_started

# Timing between commanding separate SMuRF Controllers
# Yet to be determined in the field. Eventually might need this to be unique
@@ -30,6 +30,16 @@ def _wait_for_cryo(time_):
    time.sleep(wait)


def _check_smurf_threshold():
    cfg = run.config.load_config()
    threshold = cfg['smurf_failure_threshold']
    remaining = len(run.CLIENTS['smurf'])
    if remaining < threshold:
        error = 'Functional SMuRF count below failure threshold ' + \
                f'({remaining} < {threshold}). Aborting.'
        raise RuntimeError(error)
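
A standalone illustration of this threshold logic with hypothetical numbers; in sorunlib the threshold comes from the smurf_failure_threshold key of the loaded config and the count from run.CLIENTS['smurf']:

# Self-contained sketch; values are made up for illustration.
def check_threshold(remaining, threshold):
    if remaining < threshold:
        raise RuntimeError('Functional SMuRF count below failure threshold '
                           f'({remaining} < {threshold}). Aborting.')

check_threshold(remaining=5, threshold=3)    # enough SMuRFs remain, passes
# check_threshold(remaining=2, threshold=3)  # would raise RuntimeError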


def _run_op(operation, concurrent, settling_time, **kwargs):
    """Run operation across all active SMuRF controllers.
@@ -83,13 +93,7 @@ def _run_op(operation, concurrent, settling_time, **kwargs):
        run.CLIENTS['smurf'].remove(client)

    # Check if enough SMuRFs remain
    cfg = run.config.load_config()
    threshold = cfg['smurf_failure_threshold']
    remaining = len(run.CLIENTS['smurf'])
    if remaining < threshold:
        error = 'Functional SMuRF count below failure threshold ' + \
                f'({remaining} < {threshold}). Aborting.'
        raise RuntimeError(error)
    _check_smurf_threshold()


def set_targets(targets):
@@ -322,13 +326,36 @@ def stream(state, tag=None, subtype=None):
        subtype (str, optional): Operation subtype used to tag the stream.
    """
    clients_to_remove = []

    if state.lower() == 'on':
        for smurf in run.CLIENTS['smurf']:
            smurf.stream.start(subtype=subtype, tag=tag)

        for smurf in run.CLIENTS['smurf']:
            resp = smurf.stream.status()
            try:
                check_started(smurf, resp, timeout=60)
            except RuntimeError as e:
                print(f"Failed to start stream on {smurf}, removing from targets list.")
                print(e)
                clients_to_remove.append(smurf)

    else:
        for smurf in run.CLIENTS['smurf']:
            print(f'Turning off stream from {smurf.instance_id}.')
            smurf.stream.stop()
            resp = smurf.stream.wait()
            check_response(smurf, resp)
            try:
                check_response(smurf, resp)
            except RuntimeError as e:
                print(f"Failed to stop stream on {smurf}, removing from targets list.")
                print(e)
                clients_to_remove.append(smurf)

    # Remove failed SMuRF clients
    for client in clients_to_remove:
        run.CLIENTS['smurf'].remove(client)

    # Check if enough SMuRFs remain
    _check_smurf_threshold()
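
Note that stream() collects failed clients during iteration and removes them only afterwards, since removing items from a list while looping over it skips elements. A self-contained sketch of that pattern, with plain strings standing in for OCS clients:

clients = ['smurf1', 'smurf2', 'smurf3']
failed_checks = {'smurf2'}      # pretend smurf2 failed its stream check
clients_to_remove = []

for c in clients:
    try:
        if c in failed_checks:
            raise RuntimeError(f'stream check failed for {c}')
    except RuntimeError as e:
        print(f'Failed on {c}, removing from targets list.')
        print(e)
        clients_to_remove.append(c)

for c in clients_to_remove:     # mutate only after the loop finishes
    clients.remove(c)

print(clients)                  # ['smurf1', 'smurf3']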
27 changes: 25 additions & 2 deletions tests/test__internal.py
@@ -2,9 +2,10 @@
import ocs
import pytest

from unittest.mock import MagicMock, patch
from ocs.ocs_client import OCSReply

from sorunlib._internal import check_response, check_running
from sorunlib._internal import check_response, check_running, check_started

from util import create_session as create_unencoded_session

@@ -20,6 +21,7 @@ def create_session(*args, **kwargs):
class MockClient:
    def __init__(self):
        self.instance_id = 'test-id'
        self.test_op = MagicMock()


invalid_responses = [(MockClient(), OCSReply(ocs.TIMEOUT,
@@ -33,11 +35,14 @@ def __init__(self):
                     (MockClient(), OCSReply(ocs.OK, 'msg', create_session('test', success=True)))]

invalid_running_responses = [
    (MockClient(), OCSReply(ocs.OK, 'msg', create_session('test')))]
    (MockClient(), OCSReply(ocs.OK, 'msg', create_session('test', status='done')))]

running_responses = [
    (MockClient(), OCSReply(ocs.OK, 'msg', create_session('test', status='running')))]

invalid_starting_responses = [
    (MockClient(), OCSReply(ocs.OK, 'msg', create_session('test', status='starting')))]


@pytest.mark.parametrize("client,response", invalid_responses)
def test_check_response_raises(client, response):
@@ -59,3 +64,21 @@ def test_check_running_raises(client, response):
@pytest.mark.parametrize("client,response", running_responses)
def test_check_running(client, response):
    check_running(client, response)


@patch('sorunlib._internal.time.sleep', MagicMock())
@pytest.mark.parametrize("client,response", invalid_starting_responses)
def test_check_started_raises_timeout(client, response):
    with pytest.raises(RuntimeError):
        check_started(client, response)


@pytest.mark.parametrize("client,response", invalid_running_responses)
def test_check_started_raises(client, response):
    with pytest.raises(RuntimeError):
        check_started(client, response, timeout=1)


@pytest.mark.parametrize("client,response", running_responses)
def test_check_started(client, response):
    check_started(client, response)
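
One detail worth noting: the patch on sorunlib._internal.time.sleep keeps test_check_started_raises_timeout fast, since check_started would otherwise sleep through sixty one-second polls before raising. A self-contained sketch of the same trick, independent of this test suite:

import time
from unittest.mock import MagicMock, patch

def slow_poll():
    for _ in range(60):
        time.sleep(1)  # a full minute of real waiting if unpatched
    raise RuntimeError('Check timed out.')

with patch('time.sleep', MagicMock()):
    try:
        slow_poll()    # completes near-instantly under the patch
    except RuntimeError as e:
        print(e)       # -> Check timed out.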
35 changes: 33 additions & 2 deletions tests/test_seq.py
@@ -3,13 +3,15 @@
os.environ["SORUNLIB_CONFIG"] = "./data/example_config.yaml"
import datetime as dt

import ocs
import pytest
from unittest.mock import MagicMock, patch
from ocs.ocs_client import OCSReply

import sorunlib
from sorunlib import seq

from util import create_patch_clients
from util import create_patch_clients, create_session


patch_clients = create_patch_clients('satp')
@@ -22,7 +24,7 @@ def test_scan(patch_clients):


@patch('sorunlib.commands.time.sleep', MagicMock())
def test_scan_failed_to_start(patch_clients):
def test_scan_no_session(patch_clients):
    # Setup mock OCSReply without session object
    mock_reply = MagicMock()
    mock_reply.session = None
@@ -31,3 +33,32 @@ def test_scan_failed_to_start(patch_clients):
    target = dt.datetime.now() + dt.timedelta(seconds=1)
    with pytest.raises(Exception):
        seq.scan(description='test', stop_time=target.isoformat(), width=20.)


@patch('sorunlib.commands.time.sleep', MagicMock())
def test_scan_failed_to_start(patch_clients):
    # Setup mock OCSReply with failed status
    failed_session = create_session('generate_scan')
    failed_session.success = False
    failed_session.set_status('running')
    failed_session.add_message('A simulated error has occurred.')
    failed_session.set_status('done')

    mock_reply = OCSReply(ocs.OK, 'msg', failed_session.encoded())
    sorunlib.CLIENTS['acu'].generate_scan.start = MagicMock(return_value=mock_reply)
    sorunlib.CLIENTS['acu'].generate_scan.wait = MagicMock(return_value=mock_reply)

    # Example of the failed reply this test is trying to emulate.
    # OCSReply: OK : Operation "generate_scan" is currently not running (FAILED).
    #   generate_scan[session=15]; status=done with ERROR 116.5 s ago, took 68.8 s
    #   messages (4 of 4):
    #     1702337679.564 Status is now "starting".
    #     1702337679.565 Status is now "running".
    #     1702337748.356 Problems during scan
    #     1702337748.357 Status is now "done".
    #   other keys in .session: op_code, data
    print(mock_reply)

    target = dt.datetime.now() + dt.timedelta(seconds=1)
    with pytest.raises(RuntimeError):
        seq.scan(description='test', stop_time=target.isoformat(), width=20.)
20 changes: 20 additions & 0 deletions tests/test_smurf.py
@@ -10,6 +10,8 @@
from sorunlib import smurf
from util import create_patch_clients

os.environ["SORUNLIB_CONFIG"] = "./data/example_config.yaml"


# Use pytest-mock plugin to patch CLIENTS on all tests
patch_clients = create_patch_clients('satp', autouse=True)
@@ -133,3 +135,21 @@ def test_stream(state):
        client.stream.start.assert_called_once()
    else:
        client.stream.stop.assert_called_once()


@pytest.mark.parametrize("state", [("on"), ("off")])
def test_stream_single_failure(state):
    # Create failure on smurf1
    mocked_response = OCSReply(
        0, 'msg', {'success': False, 'op_name': 'stream'})
    # For state == 'on'
    smurf.run.CLIENTS['smurf'][0].stream.status.side_effect = [mocked_response]
    # For state == 'off'
    smurf.run.CLIENTS['smurf'][0].stream.wait.side_effect = [mocked_response]

    smurf.stream(state=state)
    for client in smurf.run.CLIENTS['smurf']:
        if state == "on":
            client.stream.start.assert_called_once()
        else:
            client.stream.stop.assert_called_once()
6 changes: 6 additions & 0 deletions tests/util.py
@@ -40,6 +40,12 @@ def _mock_smurf_client(instance_id):
    smurf.take_bgmap = MagicMock()
    smurf.take_noise = MagicMock()

    # smurf.stream
    session = create_session('stream', status='running')
    reply = OCSReply(ocs.OK, 'msg', session.encoded())
    smurf.stream.start = MagicMock(return_value=reply)
    smurf.stream.status = MagicMock(return_value=reply)

    return smurf


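With this fixture in place, every mocked SMuRF client's stream.start() and stream.status() return a reply whose session is already 'running', so check_started() passes by default; tests that need a failure override a single client, as test_stream_single_failure does above via side_effect. A minimal sketch of that override pattern with a hypothetical mock:

from unittest.mock import MagicMock

mock_client = MagicMock()
failure_reply = {'success': False}  # stand-in for a failed OCSReply
mock_client.stream.status.side_effect = [failure_reply]
print(mock_client.stream.status())  # -> {'success': False}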
