From ae6bf53f65da1acc3ebdd64df49648ac8ccad080 Mon Sep 17 00:00:00 2001 From: Axel Fischer Date: Thu, 28 Nov 2024 17:53:51 +0100 Subject: [PATCH 1/2] Switch-CTS_CS: working version --- src/Switch-CTS_CS/license.txt | 2 +- src/Switch-CTS_CS/main.py | 324 +++++++++++++++------------------- 2 files changed, 140 insertions(+), 186 deletions(-) diff --git a/src/Switch-CTS_CS/license.txt b/src/Switch-CTS_CS/license.txt index 11aaf8f6..98ef5718 100644 --- a/src/Switch-CTS_CS/license.txt +++ b/src/Switch-CTS_CS/license.txt @@ -5,7 +5,7 @@ find those in the corresponding folders or contact the maintainer. MIT License -Copyright (c) 2021 SweepMe! GmbH +Copyright (c) 2024 SweepMe! GmbH (sweep-me.net) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/src/Switch-CTS_CS/main.py b/src/Switch-CTS_CS/main.py index 7b2cfc87..ee6f2001 100644 --- a/src/Switch-CTS_CS/main.py +++ b/src/Switch-CTS_CS/main.py @@ -5,7 +5,7 @@ # # MIT License # -# Copyright (c) 2021 SweepMe! GmbH +# Copyright (c) 2024 SweepMe! GmbH (sweep-me.net) # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -26,17 +26,17 @@ # SOFTWARE. -# SweepMe! device class -# Type: Logger -# Device: CTS CS +# SweepMe! driver +# * Module: Switch +# * Instrument: CTS CS climate chambers import socket import time -from ErrorMessage import error, debug +from pysweepme.ErrorMessage import error, debug +from pysweepme.EmptyDeviceClass import EmptyDevice -from EmptyDeviceClass import EmptyDevice # Class comes with SweepMe! class Device(EmptyDevice): @@ -45,9 +45,11 @@ class Device(EmptyDevice):

 

Communication

@@ -67,27 +69,19 @@ def __init__(self): EmptyDevice.__init__(self) - self.shortname = "CTS CS" # short name will be shown in the sequencer - - - # self.port_manager = True - - # self.port_types = ["TCPIP"] - - + self.shortname = "CTS CS" + self.port_properties = { - "timeout": 3, - "baudrate": 19200, - "EOL": "", - "parity": "O", - } - - + "timeout": 3, + "baudrate": 19200, + "EOL": "", + "parity": "O", + } + self.STX = 0x02 - self.ADR = 0x81 # 0x81 - 0xA0 (Adresse 01 - 32); Default address is 01 = 0x81, + self.ADR = 0x81 # 0x81 - 0xA0 (Address 01 - 32); Default address is 01 = 0x81, self.ETX = 0x03 - self.analog_channels_info = { "Temperature": { "unit": "°C", @@ -144,8 +138,7 @@ def __init__(self): }, } - - + """ Wert Kanal-Nr. Kanäle Grenzen Ax CID @@ -165,61 +158,47 @@ def __init__(self): A= 14 t SiedeK in [°C] min. -100.00 [°C], max. 200.00 [°C] A> 15 Überh.VK in [K] min. 0.00 [K], max. 20.00 [K] A? 16 SaugdrVK in [bar] min. 2.00 [bar], max. 6.00 [bar] - """ + """ + # add the command to readout the channel self.analog_channels_get = { - "Temperature": "A0", - "Humidity": "A1", - "Pt100 movable": "A2", - "TempSupplyAir": "A3", - "TempExhAir": "A4", - "HumidSupplAir": "A5", - "HumidExhAir": "A6", - "Water storage": "A7", - "Dew point": "A8", - } + "Temperature": "A0", + "Humidity": "A1", + "Pt100 movable": "A2", + "TempSupplyAir": "A3", + "TempExhAir": "A4", + "HumidSupplAir": "A5", + "HumidExhAir": "A6", + "Water storage": "A7", + "Dew point": "A8", + } self.analog_channels_set = { - "Temperature": 0, - "Humidity": 1, - } - - + "Temperature": 0, + "Humidity": 1, + } + # index is the number of the parameter, related to the command 'O' + # 'O' is part of the returned message self.digital_channels_get = { - "Compressed air": 8, - } - - """ - Befehl Kanal-Nr. Kanäle Typ Bedeutung - sx CID - s1 - START / STOPP SYSTEM Kammer ein-/ausschalten - s2 - SAMMELSTÖRUNG SYSTEM Fehler quittieren - s3 1 PAUSE Merker 1 Kammer unterbrechen - s4 2 Feuchte Merker 2 nicht änderbar - s5 3 Tür zu Merker 3 nicht änderbar - s6 4 Türverrieg Merker 4 nicht änderbar - s7 1 Druckluft Softkey 1 Softkey kann gesetzt werden! - s8 2 RegZuluft Softkey 2 Softkey kann gesetzt werden! - s9 3 Dig.Ausg1 Softkey 3 Softkey kann gesetzt werden! - s: 4 Dig.Ausg2 Softkey 4 Softkey kann gesetzt werden! - """ - + "Compressed air": 8, + } + # command as related to the command "s" self.digital_channels_set = { - # "Start/Stop": "s1", - # "Pause": "s3", - "Compressed air": "s7", - } + # "Start/Stop": "s1", + # "Pause": "s3", + "Compressed air": "s7", + } self.status_channels = { - 2: "Temperature", - 3: "Humidity", - 4: "Door closed", - 5: "Door locked", - 6: "Compressed air", - 7: "RegSupplyAir", - } + 2: "Temperature", + 3: "Humidity", + 4: "Door closed", + 5: "Door locked", + 6: "Compressed air", + 7: "RegSupplyAir", + } # "Paused": answer[2] == "1", # "Humidity": answer[3] == "1", @@ -245,50 +224,42 @@ def __init__(self): self.plottype.append(True) self.savetype.append(True) - self.variables += ["Remaining hold time", "reached"] # define as many variables you need - self.units += ["min", ""] # make sure that you have as many units as you have variables + self.variables += ["Remaining hold time", "reached"] # define as many variables you need + self.units += ["min", ""] # make sure that you have as many units as you have variables self.plottype += [True, True] # True to plot data, corresponding to self.variables self.savetype += [True, True] # True to save data, corresponding to self.variables self._temperature_tolerance = 2.0 self._humidity_tolerance = 2.0 - - + def set_GUIparameter(self): - # add keys and values to generate GUI elements in the Parameters-Box - # If you use this template to create a driver for modules other than Logger or Switch, you need to use fixed keys that are defined for each module. - - GUIparameter = { - "SweepMode": ["None"], #"Temperature in °C", "Humidity in %rF"], - "Port": "10.10.1.5", - - "Temperature in °C": "", - "Humidity in %rF": "", - "Compressed air": "", - "Hold time in min": "", - "Reference temperature": ["Temper", "Pt100 movable"], - } + gui_parameter = { + "SweepMode": ["None"], + "Port": "10.10.1.5", + + "Temperature in °C": "", + "Humidity in %rF": "", + "Reach humidity": "", + "Compressed air": "", + "Hold time in min": "", + "Reference temperature": ["Temper", "Pt100 movable"], + } - return GUIparameter + return gui_parameter def get_GUIparameter(self, parameter): + self.port_string = parameter["Port"] - ### the port selected by the user is readout here and saved in a variable that can be later used to open the correct port - self.port_string = parameter["Port"] # use this string to open the right port object later during 'connect' - # print("Selected port", self.port_string) - self.hold_time_min = parameter["Hold time in min"] self.temperature_set = parameter["Temperature in °C"] self.humidity_set = parameter["Humidity in %rF"] + self.reach_humidity = parameter["Reach humidity"] self.compressed_air_set = parameter["Compressed air"] self.reference_temperature = parameter["Reference temperature"] - - # print("Hold time min", self.hold_time_min) - """ here, semantic standard functions start that are called by SweepMe! during a measurement """ def connect(self): @@ -296,10 +267,9 @@ def connect(self): self.comm_type = "TCPIP" self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.client.connect((self.port_string, 1080)) # 1080 is the standard port for the ASCII TCPIP protocol + self.client.connect((self.port_string, 1080)) # 1080 is the standard port for the ASCII TCPIP protocol self.client.settimeout(3) - def initialize(self): it = self.get_instrument_time() @@ -308,21 +278,28 @@ def initialize(self): sv = self.get_software_version() # print("Software version:", sv) - self.set_program_number(0) # switch off all programs - - + self.set_program_number(0) # switch off all programs + def configure(self): + + self.temperature_set = str(self.temperature_set) + self.humidity_set = str(self.humidity_set) - self.hold_starttime = time.perf_counter() + self.hold_starttime = time.time() if self.hold_time_min == "" or self.hold_time_min == "-": self.hold_time_min = 0.0 self.hold_time_min = float(self.hold_time_min) + self.reach_temperature = False + if self.temperature_set != "" and self.temperature_set != "-": + self._temperature_set_value = float(self.temperature_set) + self.reach_temperature = True + if self.hold_time_min > 0.0: # we calculate the ramp based on the current set value current_temperature_set = self.get_temperature_set() @@ -335,16 +312,23 @@ def configure(self): else: self._temperature_set_value = None + + self.reach_humidity = self.reach_humidity == "1" or self.reach_humidity == "True" or self.reach_humidity is True if self.humidity_set != "" and self.humidity_set != "-": + self._humidity_set_value = float(self.humidity_set) - - if self.hold_time_min > 0.0: - # we calculate the ramp based on the current set value - current_humidity_set = self.get_humidity_set() - self.set_humidity_ramp_rate( (float(self.humidity_set)-current_humidity_set) / self.hold_time_min ) - else: - self.set_humidity_ramp_rate(999.9) + + # no ramp rate adjustable ramp rate for humidity, immediately tries to reach the new value + self.set_humidity_ramp_rate(999.9) + + # code to change humidity within the hold time to the new set value starting from the current humidity + #if self.hold_time_min > 0.0: + # # we calculate the ramp based on the current set value + # current_humidity_set = self.get_humidity_set() + # self.set_humidity_ramp_rate( (float(self.humidity_set)-current_humidity_set) / self.hold_time_min ) + #else: + # self.set_humidity_ramp_rate(999.9) # we set humidity after ramp rate to take effect self.set_humidity(self._humidity_set_value) @@ -359,8 +343,7 @@ def configure(self): elif self.compressed_air_set in ["Off", "off", "0"]: self.set_compressed_air(0) - - + # print("Temperature set:", self.get_temperature_set()) # print("Temperature ramp:", self.get_temperature_ramp_rates()) @@ -370,39 +353,22 @@ def configure(self): # print("Temperature ramp parameters:", self.get_temperature_ramp_parameters()) # print("Humidity ramp parameters:", self.get_humidity_ramp_parameters()) - def unconfigure(self): pass - - # def reconfigure(self, parameters, keys): - """ 'reconfigure' is called whenever parameters of the GUI change by using the {...}-parameter system """ - - # print() - # print("reconfigure") - # print("Parameters:", parameters) - # print("Changed keys:", keys) - - ## The following two lines are the default behavior that is used by EmptyDevice if you do not overwrite 'reconfigure' - # self.get_GUIparameter(parameters) - # self.configure() - - + def poweron(self): self.start_chamber() def poweroff(self): self.stop_chamber() - def signin(self): pass - # self.hold_starttime = time.perf_counter() # this is needed if two times the same step is used and configure is not called. + # self.hold_starttime = time.time() # this is needed if two times the same step is used and configure is not called. # print("CTS CS driver: Hold starttime set back during signin() function.") - """ the following functions are called for each measurement point """ - def measure(self): self.results = [] @@ -412,28 +378,27 @@ def measure(self): self.write_message(self.analog_channels_get[key]) answer = self.read_message().decode('latin-1') vals = answer.split() - # print(key, vals) self.results.append(float(vals[1])) # todo: change to command Aa to read all channels. However, firmware version must support this digital_channel_values = self.get_digital_channels() + for key in self.digital_channels_get: - self.results.append(digital_channel_values[self.digital_channels_get[key]] == 1) - + self.results.append(digital_channel_values[self.digital_channels_get[key]] == "1") def call(self): status = self.get_status() # print(status) - time_to_hold = self.hold_time_min - (time.perf_counter() - self.hold_starttime)/60.0 + time_to_hold = self.hold_time_min - (time.time() - self.hold_starttime)/60.0 self.results.append(time_to_hold) - reached = time_to_hold < 0.0 if self.hold_time_min > 0.0 else True # only if hold time larger 0, we need to check it, otherwise it is automatically reached + # only if hold time larger 0, we need to check it, otherwise it is automatically reached + reached = time_to_hold < 0.0 if self.hold_time_min > 0.0 else True - - if not self._temperature_set_value is None: + if self.reach_temperature: # depending on the user selection, a different temperature sensor will be used for reaching the state if self.reference_temperature == "Temper": @@ -444,26 +409,23 @@ def call(self): self._is_temperature_reached = abs(self.results[index]-self._temperature_set_value) < self._temperature_tolerance reached = reached and self._is_temperature_reached - if not self._humidity_set_value is None: + if self.reach_humidity: self._is_humidity_reached = abs(self.results[1]-self._humidity_set_value) < self._humidity_tolerance reached = reached and self._is_humidity_reached # we set back the hold starttime if all steps are reached. # If another process step is started with the same hold time, it has to wait again even if configure was not called if reached: - self.hold_starttime = time.perf_counter() + self.hold_starttime = time.time() self.results.append(reached) return self.results - - + def write_message(self, cmd): if self.comm_type == "TCPIP": self.client.send(cmd.encode("latin-1")) - # self.port.write(cmd) - elif self.commetype == "COM": @@ -474,7 +436,6 @@ def write_message(self, cmd): self.port.write(msg) self.last_cmd = cmd - def read_message(self): if self.comm_type == "TCPIP": @@ -487,7 +448,6 @@ def read_message(self): # todo: COM port commuication is not implemented yet answer = self.port.read() - def get_software_version(self): """ get software versions, return a list with SPS-Version, ITC-Version, and SPSNummer """ @@ -519,14 +479,11 @@ def get_analog_channel(self, channel): answer = self.read_message().decode('latin-1') vals = answer.split() - def get_analog_channels(self): self.write_message("Aa") answer = self.read_message().decode('latin-1') channels = answer[1:].split() - - - + def set_analog_channel(self, channel, value): """ @@ -539,20 +496,16 @@ def set_analog_channel(self, channel, value): self.write_message("a%i %#05.1f" % (int(channel), float(value) ) ) # command structure: ax_yyy.y return self.read_message().decode('latin-1') - def set_temperature(self, value): answer = self.set_analog_channel(self.analog_channels_set["Temperature"], value) return answer - def set_humidity(self, value): answer = self.set_analog_channel(self.analog_channels_set["Humidity"], value) return answer - - """" Befehl/Wert Kanal-Nr. Kanäle Grenzen u/d/U/E/Rx CID @@ -575,7 +528,6 @@ def get_temperature_ramp_parameters(self): 5. Set value """ - self.write_message("R%i" % self.analog_channels_set["Temperature"]) return self.read_message().decode('latin-1').split() @@ -598,8 +550,9 @@ def set_temperature_ramp_rate_heating(self, value): if float(value) < 0.1: value = 0.1 - - self.write_message("u%i %#05.1f" % (self.analog_channels_set["Temperature"], float(value)) ) # command structure: ux_yyy.y + + # command structure: ux_yyy.y + self.write_message("u%i %#05.1f" % (self.analog_channels_set["Temperature"], float(value))) return self.read_message().decode('latin-1') def set_temperature_ramp_rate_cooling(self, value): @@ -611,20 +564,19 @@ def set_temperature_ramp_rate_cooling(self, value): if float(value) < 0.1: value = 0.1 - - self.write_message("d%i %#05.1f" % (self.analog_channels_set["Temperature"], float(value)) ) # command structure: dx_yyy.y + + # command structure: dx_yyy.y + self.write_message("d%i %#05.1f" % (self.analog_channels_set["Temperature"], float(value)) ) return self.read_message().decode('latin-1') - - def get_humidity_set(self): self.write_message("E%i" % self.analog_channels_set["Humidity"]) return float(self.read_message().decode('latin-1').split()[1]) def get_humidity_ramp_parameters(self): - """ returns a list with ramp parameters - + """ Returns a list with ramp parameters + 1. Channel number 2. two integers for ramp active and ramp control on 3. Ramp up @@ -649,13 +601,14 @@ def set_humidity_ramp_rate_heating(self, value): value = abs(value) - if float(value) > 999.9: # max possible value + if float(value) > 999.9: # max possible value value = 999.9 if float(value) < 0.1: value = 0.1 - - self.write_message("u%i %#05.1f" % (self.analog_channels_set["Humidity"], float(value)) ) # command structure: ux_yyy.y + + # command structure: ux_yyy.y + self.write_message("u%i %#05.1f" % (self.analog_channels_set["Humidity"], float(value))) return self.read_message().decode('latin-1') def set_humidity_ramp_rate_cooling(self, value): @@ -667,12 +620,11 @@ def set_humidity_ramp_rate_cooling(self, value): if float(value) < 0.01: value = 0.01 - - self.write_message("d%i %#05.1f" % (self.analog_channels_set["Humidity"], float(value)) ) # command structure: dx_yyy.y + + # command structure: dx_yyy.y + self.write_message("d%i %#05.1f" % (self.analog_channels_set["Humidity"], float(value))) return self.read_message().decode('latin-1') - - def get_status(self): self.write_message("S") @@ -685,7 +637,6 @@ def get_status(self): "Error number": ord(answer[8]), "Error message": "No error", } - # "Paused": answer[2] == "1", # "Humidity": answer[3] == "1", @@ -703,7 +654,6 @@ def get_status(self): print(error_messages) return response_dict - """ Pos. Wert Kanal-Nr. Kanäle Typ Bedeutung @@ -722,26 +672,35 @@ def get_status(self): w siehe Tabelle FEHLERNUMMER SYSTEM Kapitel 4.6.4 """ - - - - pass def set_digital_channel(self, cmd, state): """ set a digital channel with state 0 or 1""" state = int(state) - - self.write_message("%s %i" % (cmd, state ) ) # command structure: sx_y + + # command structure: sx_y + self.write_message("%s %i" % (cmd, state)) return self.read_message().decode('latin-1') def get_digital_channels(self): self.write_message("O") - answer = self.read_message().decode('latin-1') - print("Digital channels:", answer) + return self.read_message().decode('latin-1') # returns with lead 'O' - return answer + """ + Befehl Kanal-Nr. Kanäle Typ Bedeutung + sx CID + s1 - START / STOPP SYSTEM Kammer ein-/ausschalten + s2 - SAMMELSTÖRUNG SYSTEM Fehler quittieren + s3 1 PAUSE Merker 1 Kammer unterbrechen + s4 2 Feuchte Merker 2 nicht änderbar + s5 3 Tür zu Merker 3 nicht änderbar + s6 4 Türverrieg Merker 4 nicht änderbar + s7 1 Druckluft Softkey 1 Softkey kann gesetzt werden! + s8 2 RegZuluft Softkey 2 Softkey kann gesetzt werden! + s9 3 Dig.Ausg1 Softkey 3 Softkey kann gesetzt werden! + s: 4 Dig.Ausg2 Softkey 4 Softkey kann gesetzt werden! + """ def start_chamber(self): """ starts the automatic control """ @@ -770,7 +729,6 @@ def set_compressed_air(self, state): else: raise Exception("Climate chamber has no digital channel for compressed air. Compressed air cannot be set.") - def read_error_number(self): self.write_message("H01") answer = self.read_message().decode('latin-1').split()[1] @@ -780,7 +738,3 @@ def read_error_messages(self): self.write_message("H02") answer = self.read_message().decode('latin-1') return answer - - - -""" """ \ No newline at end of file From 02fb5def9291a4d65428c357a5d425330f540e48 Mon Sep 17 00:00:00 2001 From: Axel Fischer Date: Fri, 20 Dec 2024 16:04:56 +0100 Subject: [PATCH 2/2] CTS Climate chamber: evaluate reach humidity bool in get_GUIparameter --- src/Switch-CTS_CS/main.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Switch-CTS_CS/main.py b/src/Switch-CTS_CS/main.py index ee6f2001..75a934de 100644 --- a/src/Switch-CTS_CS/main.py +++ b/src/Switch-CTS_CS/main.py @@ -255,7 +255,7 @@ def get_GUIparameter(self, parameter): self.hold_time_min = parameter["Hold time in min"] self.temperature_set = parameter["Temperature in °C"] self.humidity_set = parameter["Humidity in %rF"] - self.reach_humidity = parameter["Reach humidity"] + self.reach_humidity = parameter["Reach humidity"] in ["1", "True", True] self.compressed_air_set = parameter["Compressed air"] self.reference_temperature = parameter["Reference temperature"] @@ -313,8 +313,6 @@ def configure(self): else: self._temperature_set_value = None - self.reach_humidity = self.reach_humidity == "1" or self.reach_humidity == "True" or self.reach_humidity is True - if self.humidity_set != "" and self.humidity_set != "-": self._humidity_set_value = float(self.humidity_set)