Skip to content

Commit

Permalink
Read functions should now try multiple times and not crash the agent
Browse files Browse the repository at this point in the history
  • Loading branch information
Bryce Bixler committed Sep 30, 2024
1 parent 113f12b commit ef14f44
Showing 1 changed file with 48 additions and 37 deletions.
85 changes: 48 additions & 37 deletions socs/agents/hwp_pid/drivers/pid_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@
from typing import Optional, Union


def retry_multiple_times(loops=3):
def dec_wrapper(func):
def inner(*args, **kwargs):
for i in range(loops):
try:
return func(*args, **kwargs)
except:
time.sleep(0.2)
print(f'Could not complete {func.__name__} after {loops} attempt(s)')
return None
return inner
return dec_wrapper


@dataclass
class DecodedResponse:
msg_type: str
Expand Down Expand Up @@ -221,6 +235,7 @@ def tune_freq(self):
tune_params = [0.2, 63, 0]
self.set_pid(tune_params)

@retry_multiple_times(loops=3)
def get_freq(self):
"""Returns the current frequency of the CHWP.
Expand All @@ -236,17 +251,17 @@ def get_freq(self):
responses = []
responses.append(self.send_message("*X01"))
decoded_resp = self.return_messages(responses)[0]
attempts = 3
for attempt in range(attempts):
if self.verb:
print(responses)
print(decoded_resp)
if decoded_resp.msg_type == 'measure':
return decoded_resp.measure
elif decoded_resp.msg_type == 'error':
print(f"Error reading freq: {decoded_resp.msg}")
raise ValueError('Could not get current frequency')
if self.verb:
print(responses)
print(decoded_resp)
if decoded_resp.msg_type == 'measure':
return decoded_resp.measure
elif decoded_resp.msg_type == 'error':
raise ValueError(f"Error reading freq: {decoded_resp.msg}")
else:
raise ValueError("Unknown freq response")

@retry_multiple_times(loops=3)
def get_target(self):
"""Returns the target frequency of the CHWP.
Expand All @@ -262,17 +277,17 @@ def get_target(self):
responses = []
responses.append(self.send_message("*R01"))
decoded_resp = self.return_messages(responses)[0]
attempts = 3
for attempt in range(attempts):
if self.verb:
print(responses)
print(decoded_resp)
if decoded_resp.msg_type == 'read':
return decoded_resp.measure
elif decoded_resp.msg_type == 'error':
print(f"Error reading target: {decoded_resp.msg}")
raise ValueError('Could not get target frequency')
if self.verb:
print(responses)
print(decoded_resp)
if decoded_resp.msg_type == 'read':
return decoded_resp.measure
elif decoded_resp.msg_type == 'error':
raise ValueError(f"Error reading target: {decoded_resp.msg}")
else:
raise ValueError('Unknown target response')

@retry_multipe_times(loops=3)
def get_direction(self):
"""Get the current rotation direction.
Expand All @@ -291,16 +306,15 @@ def get_direction(self):
responses = []
responses.append(self.send_message("*R02"))
decoded_resp = self.return_messages(responses)[0]
attempts = 3
for attempt in range(attempts):
if self.verb:
print(responses)
print(decoded_resp)
if decoded_resp.msg_type == 'read':
return decoded_resp.measure
elif decoded_resp.msg_type == 'error':
print(f"Error reading direction: {decoded_resp.msg}")
raise ValueError('Could not get direction')
if self.verb:
print(responses)
print(decoded_resp)
if decoded_resp.msg_type == 'read':
return decoded_resp.measure
elif decoded_resp.msg_type == 'error':
raise ValueError(f"Error reading direction: {decoded_resp.msg}")
else:
raise ValueError('Unknown direction response')

def set_pid(self, params):
"""Sets the PID parameters of the controller.
Expand Down Expand Up @@ -524,19 +538,16 @@ def _decode_read(string):
"""
end_string = string.split('\r')[-1]
if len(end_string) == 9:
read_type = end_string[1:3]
elif len(end_string) == 8:
read_type = '0' + end_string[1:2]
else:
read_type = end_string[1:3]
if len(end_string) != 9:
return DecodedResponse(msg_type='error', msg='Unrecognized Read Length')
# Decode target
if read_type == '01':
target = float(int(end_string[-5:], 16) / 1000.)
target = float(int(end_string[4:], 16) / 1000.)
return DecodedResponse(msg_type='read', msg='Setpoint = ' + str(target), measure=target)
# Decode direction
elif read_type == '02':
if int(end_string[-5:], 16) / 1000. > 2.5:
if int(end_string[4:], 16) / 1000. > 2.5:
return DecodedResponse(msg_type='read', msg='Direction = Reverse', measure=1)
else:
return DecodedResponse(msg_type='read', msg='Direction = Forward', measure=0)
Expand Down

0 comments on commit ef14f44

Please sign in to comment.