-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Hwp site changes #541
Merged
Merged
Hwp site changes #541
Changes from 4 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
959fd9b
minor bugfixes to the hwp-gripper agent
e8623ac
Modified hwp-pid and hwp-pmx agents to handle connection interruptions
a50ca05
[pre-commit.ci] pre-commit autoupdate (#540)
pre-commit-ci[bot] fa26efa
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] f939874
Changed connection protocal from an infinite loop into a set number o…
f7cc7e4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 0735a59
Fixes to pre-commit failures
a7f009f
Added read=False to set_current_limit meathod
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
import socket | ||
import time | ||
from socket import AF_INET, SOCK_STREAM, socket | ||
|
||
protection_status_key = [ | ||
'Over voltage', | ||
|
@@ -21,26 +21,51 @@ class PMX: | |
""" | ||
|
||
def __init__(self, ip, port): | ||
self.sock = socket(AF_INET, SOCK_STREAM) | ||
self.sock.connect((ip, port)) | ||
self.sock.settimeout(5) | ||
|
||
self.ip = ip | ||
self.port = port | ||
self.wait_time = 0.01 | ||
self.buffer_size = 128 | ||
self.conn = self._establish_connection(self.ip, int(self.port)) | ||
|
||
def _establish_connection(self, ip, port, timeout=5): | ||
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
conn.settimeout(timeout) | ||
while True: | ||
try: | ||
conn.connect((ip, port)) | ||
break | ||
except (ConnectionRefusedError, OSError) as e: | ||
print(f"Failed to connect to device at {ip}:{port}") | ||
time.sleep(5) | ||
return conn | ||
|
||
def close(self): | ||
self.sock.close() | ||
|
||
def read(self): | ||
return self.sock.recv(self.buffer_size).decode('utf-8') | ||
self.conn.close() | ||
|
||
def send_message(self, msg, read=True): | ||
for attempt in range(2): | ||
try: | ||
self.conn.sendall(msg) | ||
time.sleep(0.5) | ||
if read: | ||
data = self.conn.recv(self.buffer_size).decode('utf-8') | ||
return data | ||
return | ||
except (socket.timeout, OSError) as e: | ||
print("Caught timeout waiting for responce from PMX. Trying again...") | ||
time.sleep(1) | ||
if attempt == 1: | ||
print("Resetting connection") | ||
self.conn.close() | ||
self.conn = self._establish_connection(self.ip, int(self.port)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as mentioned above, if there is no connection, |
||
return self.send_message(msg, read=read) | ||
|
||
def wait(self): | ||
time.sleep(self.wait_time) | ||
|
||
def check_output(self): | ||
""" Return the output status """ | ||
self.sock.sendall(b'output?\n') | ||
val = int(self.read()) | ||
val = int(self.send_message(b'output?\n')) | ||
msg = "Measured output state = " | ||
states = {0: 'OFF', 1: 'ON'} | ||
if val in states: | ||
|
@@ -51,103 +76,95 @@ def check_output(self): | |
|
||
def check_error(self): | ||
""" Check oldest error from error queues. Error queues store up to 255 errors """ | ||
self.sock.sendall(b':system:error?\n') | ||
val = self.read() | ||
val = self.send_message(b':system:error?\n') | ||
code, msg = val.split(',') | ||
code = int(code) | ||
msg = msg[1:-2] | ||
return msg, code | ||
|
||
def clear_alarm(self): | ||
""" Clear alarm """ | ||
self.sock.sendall(b'output:protection:clear\n') | ||
self.send_message(b'output:protection:clear\n', read=False) | ||
|
||
def turn_on(self): | ||
""" Turn the PMX on """ | ||
self.sock.sendall(b'output 1\n') | ||
self.send_message(b'output 1\n', read=False) | ||
self.wait() | ||
return self.check_output() | ||
|
||
def turn_off(self): | ||
""" Turn the PMX off """ | ||
self.sock.sendall(b'output 0\n') | ||
self.send_message(b'output 0\n', read=False) | ||
self.wait() | ||
return self.check_output() | ||
|
||
def check_current(self): | ||
""" Check the current setting """ | ||
self.sock.sendall(b'curr?\n') | ||
val = float(self.read()) | ||
val = float(self.send_message(b'curr?\n')) | ||
msg = "Current setting = {:.3f} A".format(val) | ||
return msg, val | ||
|
||
def check_voltage(self): | ||
""" Check the voltage setting """ | ||
self.sock.sendall(b'volt?\n') | ||
val = float(self.read()) | ||
val = float(self.send_message(b'volt?\n')) | ||
msg = "Voltage setting = {:.3f} V".format(val) | ||
return msg, val | ||
|
||
def meas_current(self): | ||
""" Measure the current """ | ||
self.sock.sendall(b'meas:curr?\n') | ||
val = float(self.read()) | ||
val = float(self.send_message(b'meas:curr?\n')) | ||
msg = "Measured current = {:.3f} A".format(val) | ||
return msg, val | ||
|
||
def meas_voltage(self): | ||
""" Measure the voltage """ | ||
self.sock.sendall(b'meas:volt?\n') | ||
val = float(self.read()) | ||
val = float(self.send_message(b'meas:volt?\n')) | ||
msg = "Measured voltage = {:.3f} V".format(val) | ||
return msg, val | ||
|
||
def set_current(self, curr): | ||
""" Set the current """ | ||
self.sock.sendall(b'curr %a\n' % curr) | ||
self.send_message(b'curr %a\n' % curr, read=False) | ||
self.wait() | ||
return self.check_current() | ||
|
||
def set_voltage(self, vol): | ||
""" Set the voltage """ | ||
self.sock.sendall(b'volt %a\n' % vol) | ||
self.send_message(b'volt %a\n' % vol, read=False) | ||
self.wait() | ||
return self.check_voltage() | ||
|
||
def check_source(self): | ||
""" Check the source of PMX """ | ||
self.sock.sendall(b'volt:ext:sour?\n') | ||
val = self.read() | ||
val = self.send_message(b'volt:ext:sour?\n') | ||
msg = "Source: " + val | ||
return msg, val | ||
|
||
def use_external_voltage(self): | ||
""" Set PMX to use external voltage """ | ||
self.sock.sendall(b'volt:ext:sour volt\n') | ||
self.send_message(b'volt:ext:sour volt\n', read=False) | ||
self.wait() | ||
return self.check_source() | ||
|
||
def ign_external_voltage(self): | ||
""" Set PMX to ignore external voltage """ | ||
self.sock.sendall(b'volt:ext:sour none\n') | ||
self.send_message(b'volt:ext:sour none\n', read=False) | ||
self.wait() | ||
return self.check_source() | ||
|
||
def set_current_limit(self, curr_lim): | ||
""" Set the PMX current limit """ | ||
self.sock.sendall(b'curr:prot %a\n' % curr_lim) | ||
self.send_message(b'curr:prot %a\n' % curr_lim) | ||
BrianJKoopman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.wait() | ||
self.sock.sendall(b'curr:prot?\n') | ||
val = float(self.read()) | ||
val = float(self.send_message(b'curr:prot?\n')) | ||
msg = "Current Limit: {:.3f} A".format(val) | ||
return msg | ||
|
||
def set_voltage_limit(self, vol_lim): | ||
""" Set the PMX voltage limit """ | ||
self.sock.sendall(b'volt:prot %a\n' % vol_lim) | ||
self.send_message(b'volt:prot %a\n' % vol_lim, read=False) | ||
self.wait() | ||
self.sock.sendall(b'volt:prot?\n') | ||
val = float(self.read()) | ||
val = float(self.send_message(b'volt:prot?\n')) | ||
msg = "Voltage Limit: {:.3f} V".format(val) | ||
return msg | ||
|
||
|
@@ -156,8 +173,7 @@ def check_prot(self): | |
Return: | ||
val (int): protection status code | ||
""" | ||
self.sock.sendall(b'stat:ques?\n') | ||
val = int(self.read()) | ||
val = int(self.send_message(b'stat:ques?\n')) | ||
return val | ||
|
||
def get_prot_msg(self, val): | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love this being an infinite loop, since control is never passed back to the agent. Can we keep this to a finite number of attempts, and handle re-connection at the agent level?