Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cryomech: Move PTC class to drivers module #771

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 1 addition & 182 deletions socs/agents/cryomech_cpa/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,193 +3,12 @@
# Sanah Bhimani, May 2022

import argparse
import random
import socket
import struct
import time

from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock

STX = '\x02'
ADDR = '\x10'
CMD = '\x80'
CR = '\x0D'
DATA_WRITE = '\x61'
DATA_READ = '\x63'
ESC = '\x07'
ESC_STX = '\x30'
ESC_CR = '\x31'
ESC_ESC = '\x32'


class PTC:
def __init__(self, ip_address, port=502, timeout=10, fake_errors=False):
self.ip_address = ip_address
self.port = int(port)
self.fake_errors = fake_errors

self.model = None
self.serial = None
self.software_revision = None

self.comm = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.comm.connect((self.ip_address, self.port)) # connects to the PTC
self.comm.settimeout(timeout)

def get_data(self):
"""
Gets the raw data from the ptc and returns it in a usable format.
"""
self.comm.sendall(self.buildRegistersQuery())
data = self.comm.recv(1024)
data_flag, brd = self.breakdownReplyData(data)

return data_flag, brd

@staticmethod
def buildRegistersQuery():
query = bytes([0x09, 0x99, # Message ID
0x00, 0x00, # Unused
0x00, 0x06, # Message size in bytes
0x01, # Slave Address
0x04, # Function Code
0x00, 0x01, # The starting Register Number
0x00, 0x35]) # How many to read
return query

def power(self, state):
"""Turn the PTC on or off.

Parameters
----------
state : str
Desired power state of the PTC, either 'on', or 'off'.

"""
command = [0x09, 0x99, # Message ID
0x00, 0x00, # Unused
0x00, 0x06, # Message size in bytes
0x01, # Slave Address
0x06, # Function Code
0x00, 0x01] # Register Number

if state.lower() == 'on':
command.extend([0x00, 0x01])
elif state.lower() == 'off':
command.extend([0x00, 0xff])
else:
raise ValueError(f"Invalid state: {state}")

self.comm.sendall(bytes(command))
self.comm.recv(1024) # Discard the echoed command

def breakdownReplyData(self, rawdata):
"""Take in raw ptc data, and return a dictionary.

The dictionary keys are the data labels, the dictionary values are the
data in floats or ints.

Returns
-------
data_flag : bool
False if data is valid, True if output could not be interpretted.
data : dict
Data dictionary already formatted for passing to OCS Feed.

"""

# Associations between keys and their location in rawData
keyloc = {"Operating_State": [9, 10],
"Compressor_State": [11, 12],
"Warning_State": [15, 16, 13, 14],
"Alarm_State": [19, 20, 17, 18],
"Coolant_In_Temp": [23, 24, 21, 22],
"Coolant_Out_Temp": [27, 28, 25, 26],
"Oil_Temp": [31, 32, 29, 30],
"Helium_Temp": [35, 36, 33, 34],
"Low_Pressure": [39, 40, 37, 38],
"Low_Pressure_Average": [43, 44, 41, 42],
"High_Pressure": [47, 48, 45, 46],
"High_Pressure_Average": [51, 52, 49, 50],
"Delta_Pressure_Average": [55, 56, 53, 54],
"Motor_Current": [59, 60, 57, 58],
"Hours_of_Operation": [63, 64, 61, 62],
"Pressure_Unit": [65, 66],
"Temperature_Unit": [67, 68],
"Serial_Number": [69, 70],
"Model": [71, 72],
"Software_Revision": [73, 74]}

# Iterate through all keys and return the data in a usable format.
# If there is an error in the string format, print the
# error to logs, return an empty dictionary, and flag the data as bad
data = {}

# If fake_errors=True, then randomly output the string 'FAKE ERROR'
# instead of the actual data 50% of the time
if self.fake_errors:
if random.random() < 0.5:
rawdata = "FAKE ERROR"

try:
for key in keyloc.keys():
locs = keyloc[key]
wkrBytes = bytes([rawdata[loc] for loc in locs])

# four different data formats to unpack
# Big endian unsigned integer 16 bits
if key in [
"Operating_State",
"Compressor_State",
"Pressure_Unit",
"Temperature_Unit",
"Serial_Number",
]:
state = struct.unpack(">H", wkrBytes)[0]
# Serial number is an attribute, not publishable data
if key == "Serial_Number":
self.serial = state
else:
data[key] = state
# 32bit signed integer which is actually stored as a
# 32bit IEEE float (silly)
elif key in ["Warning_State", "Alarm_State"]:
state = int(struct.unpack(">f", wkrBytes)[0])
data[key] = state
# 2 x 8-bit lookup tables.
elif key in ["Model"]:
model_major = struct.unpack(
">B", bytes([rawdata[locs[0]]]))[0]
model_minor = struct.unpack(
">B", bytes([rawdata[locs[1]]]))[0]
# Model is an attribute, not publishable data
self.model = str(model_major) + "_" + str(model_minor)
elif key in ["Software_Revision"]:
version_major = struct.unpack(
">B", bytes([rawdata[locs[0]]]))[0]
version_minor = struct.unpack(
">B", bytes([rawdata[locs[1]]]))[0]
self.software_revision = str(version_major) + "." + str(version_minor)
# 32 bit Big endian IEEE floating point
else:
data[key] = struct.unpack(">f", wkrBytes)[0]

data_flag = False

except BaseException:
data_flag = True
print("Compressor output could not be converted to numbers."
f"Skipping this data block. Bad output string is {rawdata}")

return data_flag, data

def __del__(self):
"""
If the PTC class instance is destroyed, close the connection to the
ptc.
"""
self.comm.close()
from socs.agents.cryomech_cpa.drivers import PTC


class PTCAgent:
Expand Down
183 changes: 183 additions & 0 deletions socs/agents/cryomech_cpa/drivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import random
import socket
import struct

STX = '\x02'
ADDR = '\x10'
CMD = '\x80'
CR = '\x0D'
DATA_WRITE = '\x61'
DATA_READ = '\x63'
ESC = '\x07'
ESC_STX = '\x30'
ESC_CR = '\x31'
ESC_ESC = '\x32'


class PTC:
def __init__(self, ip_address, port=502, timeout=10, fake_errors=False):
self.ip_address = ip_address
self.port = int(port)
self.fake_errors = fake_errors

self.model = None
self.serial = None
self.software_revision = None

self.comm = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.comm.connect((self.ip_address, self.port)) # connects to the PTC
self.comm.settimeout(timeout)

def get_data(self):
"""
Gets the raw data from the ptc and returns it in a usable format.
"""
self.comm.sendall(self.buildRegistersQuery())
data = self.comm.recv(1024)
data_flag, brd = self.breakdownReplyData(data)

return data_flag, brd

@staticmethod
def buildRegistersQuery():
query = bytes([0x09, 0x99, # Message ID
0x00, 0x00, # Unused
0x00, 0x06, # Message size in bytes
0x01, # Slave Address
0x04, # Function Code
0x00, 0x01, # The starting Register Number
0x00, 0x35]) # How many to read
return query

def power(self, state):
"""Turn the PTC on or off.

Parameters
----------
state : str
Desired power state of the PTC, either 'on', or 'off'.

"""
command = [0x09, 0x99, # Message ID
0x00, 0x00, # Unused
0x00, 0x06, # Message size in bytes
0x01, # Slave Address
0x06, # Function Code
0x00, 0x01] # Register Number

if state.lower() == 'on':
command.extend([0x00, 0x01])
elif state.lower() == 'off':
command.extend([0x00, 0xff])
else:
raise ValueError(f"Invalid state: {state}")

self.comm.sendall(bytes(command))
self.comm.recv(1024) # Discard the echoed command

def breakdownReplyData(self, rawdata):
"""Take in raw ptc data, and return a dictionary.

The dictionary keys are the data labels, the dictionary values are the
data in floats or ints.

Returns
-------
data_flag : bool
False if data is valid, True if output could not be interpretted.
data : dict
Data dictionary already formatted for passing to OCS Feed.

"""

# Associations between keys and their location in rawData
keyloc = {"Operating_State": [9, 10],
"Compressor_State": [11, 12],
"Warning_State": [15, 16, 13, 14],
"Alarm_State": [19, 20, 17, 18],
"Coolant_In_Temp": [23, 24, 21, 22],
"Coolant_Out_Temp": [27, 28, 25, 26],
"Oil_Temp": [31, 32, 29, 30],
"Helium_Temp": [35, 36, 33, 34],
"Low_Pressure": [39, 40, 37, 38],
"Low_Pressure_Average": [43, 44, 41, 42],
"High_Pressure": [47, 48, 45, 46],
"High_Pressure_Average": [51, 52, 49, 50],
"Delta_Pressure_Average": [55, 56, 53, 54],
"Motor_Current": [59, 60, 57, 58],
"Hours_of_Operation": [63, 64, 61, 62],
"Pressure_Unit": [65, 66],
"Temperature_Unit": [67, 68],
"Serial_Number": [69, 70],
"Model": [71, 72],
"Software_Revision": [73, 74]}

# Iterate through all keys and return the data in a usable format.
# If there is an error in the string format, print the
# error to logs, return an empty dictionary, and flag the data as bad
data = {}

# If fake_errors=True, then randomly output the string 'FAKE ERROR'
# instead of the actual data 50% of the time
if self.fake_errors:
if random.random() < 0.5:
rawdata = "FAKE ERROR"

try:
for key in keyloc.keys():
locs = keyloc[key]
wkrBytes = bytes([rawdata[loc] for loc in locs])

# four different data formats to unpack
# Big endian unsigned integer 16 bits
if key in [
"Operating_State",
"Compressor_State",
"Pressure_Unit",
"Temperature_Unit",
"Serial_Number",
]:
state = struct.unpack(">H", wkrBytes)[0]
# Serial number is an attribute, not publishable data
if key == "Serial_Number":
self.serial = state
else:
data[key] = state
# 32bit signed integer which is actually stored as a
# 32bit IEEE float (silly)
elif key in ["Warning_State", "Alarm_State"]:
state = int(struct.unpack(">f", wkrBytes)[0])
data[key] = state
# 2 x 8-bit lookup tables.
elif key in ["Model"]:
model_major = struct.unpack(
">B", bytes([rawdata[locs[0]]]))[0]
model_minor = struct.unpack(
">B", bytes([rawdata[locs[1]]]))[0]
# Model is an attribute, not publishable data
self.model = str(model_major) + "_" + str(model_minor)
elif key in ["Software_Revision"]:
version_major = struct.unpack(
">B", bytes([rawdata[locs[0]]]))[0]
version_minor = struct.unpack(
">B", bytes([rawdata[locs[1]]]))[0]
self.software_revision = str(version_major) + "." + str(version_minor)
# 32 bit Big endian IEEE floating point
else:
data[key] = struct.unpack(">f", wkrBytes)[0]

data_flag = False

except BaseException:
data_flag = True
print("Compressor output could not be converted to numbers."
f"Skipping this data block. Bad output string is {rawdata}")

return data_flag, data

def __del__(self):
"""
If the PTC class instance is destroyed, close the connection to the
ptc.
"""
self.comm.close()