From dda4ed60a0d9aa41da34398d035fc6ea5e3b24a5 Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Sun, 5 Jan 2025 15:45:12 +0100 Subject: [PATCH 01/17] implement a way to switch Tarif color based on Summation --- DevicesModules/custom_zlinky.py | 130 ++++++++++++++++++++---- Modules/zlinky.py | 171 +++++++++++++++++++++++--------- 2 files changed, 235 insertions(+), 66 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index a3abf4485..c3d885c3e 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -1,31 +1,61 @@ -import binascii +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Implementation of Zigbee for Domoticz plugin. +# +# This file is part of Zigbee for Domoticz plugin. https://github.com/zigbeefordomoticz/Domoticz-Zigbee +# (C) 2015-2024 +# +# Initial authors: zaraki673 & pipiche38 +# +# SPDX-License-Identifier: GPL-3.0 license +# + +import binascii from Modules.domoMaj import MajDomoDevice -from Modules.tools import checkAndStoreAttributeValue +from Modules.tools import checkAndStoreAttributeValue, getAttributeValue from Modules.zlinky import (ZLINK_CONF_MODEL, ZLinky_TIC_COMMAND, - convert_kva_to_ampere, decode_STEG, linky_mode, - store_ZLinky_infos, + convert_kva_to_ampere, decode_STEG, get_OPTARIF, + linky_mode, store_ZLinky_infos, update_zlinky_device_model_if_needed, zlinky_check_alarm, zlinky_color_tarif, zlinky_totalisateur) def zlinky_clusters(self, Devices, nwkid, ep, cluster, attribut, value): - self.log.logging( "ZLinky", "Debug", "zlinky_clusters %s - %s/%s Attribute: %s Value: %s" % ( - cluster, nwkid, ep, attribut, value), nwkid, ) - - if cluster == "0b01": - zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, value) - - elif cluster == "0702": - zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value) - - elif cluster == "0b04": - zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, attribut, value) + """ + Handles the processing of different ZLinky clusters based on the cluster type. - elif cluster == "ff66": - zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, value) - + Parameters: + Devices: The list of devices to process. + nwkid: The network ID of the device. + ep: The endpoint identifier. + cluster: The cluster type (e.g., "0b01", "0702"). + attribut: The attribute name being processed. + value: The value associated with the attribute. + """ + self.log.logging( + "ZLinky", + "Debug", + f"zlinky_clusters {cluster} - {nwkid}/{ep} Attribute: {attribut} Value: {value}", + nwkid + ) + + # Mapping clusters to their corresponding functions + cluster_handlers = { + "0b01": zlinky_meter_identification, + "0702": zlinky_cluster_metering, + "0b04": zlinky_cluster_electrical_measurement, + "ff66": zlinky_cluster_lixee_private + } + + # Call the appropriate handler function if the cluster is found in the mapping + handler = cluster_handlers.get(cluster) + if handler: + handler(self, Devices, nwkid, ep, cluster, attribut, value) + + def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, value): self.log.logging( "ZLinky", "Debug", "zlinky_meter_identification %s - %s/%s Attribute: %s Value: %s" % ( cluster, nwkid, ep, attribut, value), nwkid, ) @@ -50,7 +80,63 @@ def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, val elif attribut == "000e": store_ZLinky_infos( self, nwkid, 'PCOUP', value) + +def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value): + """ + Sets the color based on the counter and tariff information for the given device. + + Parameters: + Devices: The list of devices to process. + nwkid: The network ID of the device. + ep: The endpoint identifier. + cluster: The cluster type (e.g., "0b01"). + attribut: The attribute being processed. + value: The current value of the attribute. + """ + op_tarifiare = get_OPTARIF(self, nwkid) + + # Exit if the tariff is BASE or not one of the supported types + if op_tarifiare == "BASE" or op_tarifiare not in ("TEMPO", "HC.."): + return + + # Get previous value to ensure there's a change + previous_value = getAttributeValue(self, nwkid, ep, cluster, attribut, value) + if value == 0 or previous_value == value: + return + + # Set value based on attribute and tariff type + if attribut == "0000" and op_tarifiare == "HC..": + value = "HP.." + elif attribut == "0100": + if op_tarifiare == "HC..": + value = "HC.." + elif op_tarifiare == "TEMPO": + value = "BHC" + elif attribut == "0102": + if op_tarifiare == "HC..": + value = "HP.." + elif op_tarifiare == "TEMPO": + value = "BHP" + + # If the tariff is not TEMPO, proceed with updating the device + if op_tarifiare != "TEMPO": + MajDomoDevice(self, Devices, nwkid, ep, "0009", str(value), Attribute_="0020") + return + # Handle attributes specific to "TEMPO" tariff + if attribut == "0104": + value = "WHC" + elif attribut == "0106": + value = "WHP" + elif attribut == "0108": + value = "RHC" + elif attribut == "010a": + value = "RHP" + + # Update device with the final value + MajDomoDevice(self, Devices, nwkid, ep, "0009", str(value), Attribute_="0020") + + def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): # Smart Energy Metering @@ -69,7 +155,6 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): self.log.logging("Cluster", "Debug", "Cluster0702 - CURRENT_SUMMATION_RECEIVED %s " % (value), nwkid) checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EAIT', value) - elif attribut == "0020": if value == 0: @@ -91,6 +176,7 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): store_ZLinky_infos( self, nwkid, 'HCHC', value) store_ZLinky_infos( self, nwkid, 'EJPHN', value) store_ZLinky_infos( self, nwkid, 'BBRHCJB', value) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "0102": # HP or BBRHPJB @@ -104,6 +190,7 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): store_ZLinky_infos( self, nwkid, 'HCHP', value) store_ZLinky_infos( self, nwkid, 'EJPHPM', value) store_ZLinky_infos( self, nwkid, 'BBRHCJW', value) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "0104": if value == 0: @@ -113,8 +200,8 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) store_ZLinky_infos( self, nwkid, 'EASF03', value) store_ZLinky_infos( self, nwkid, 'BBRHCJW', value) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) - elif attribut == "0106": if value == 0: return @@ -123,6 +210,7 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) store_ZLinky_infos( self, nwkid, 'EASF04', value) store_ZLinky_infos( self, nwkid, 'BBRHPJW', value) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "0108": if value == 0: @@ -132,6 +220,7 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) store_ZLinky_infos( self, nwkid, 'EASF05', value) store_ZLinky_infos( self, nwkid, 'BBRHCJR', value) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "010a": if value == 0: @@ -141,6 +230,7 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) store_ZLinky_infos( self, nwkid, 'EASF06', value) store_ZLinky_infos( self, nwkid, 'BBRHPJR', value) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "010c": if value == 0: diff --git a/Modules/zlinky.py b/Modules/zlinky.py index e0751aa94..94084cc56 100644 --- a/Modules/zlinky.py +++ b/Modules/zlinky.py @@ -52,6 +52,7 @@ "ZLinky_TIC-standard-tri": (), "ZLinky_TIC-standard-tri-prod": (), } + ZLinky_TIC_COMMAND = { # Mode Historique "0000": "OPTARIF", @@ -99,19 +100,28 @@ "0300": "PROTOCOL Linky" } -def convert_kva_to_ampere( kva ): - return ( kva * 1000) / 200 + +def convert_kva_to_ampere(kva: float) -> float: + """ + Converts kilovolt-amperes (kVA) to amperes (A) assuming a voltage of 200V. + + Parameters: + kva (float): The value in kilovolt-amperes to convert. + + Returns: + float: The equivalent value in amperes. + """ + VOLTAGE = 200 # Assumed voltage in volts + return (kva * 1000) / VOLTAGE + def zlinky_color_tarif(self, MsgSrcAddr, color): - if "ZLinky" not in self.ListOfDevices[MsgSrcAddr]: - self.ListOfDevices[MsgSrcAddr]["ZLinky"] = {} - self.ListOfDevices[MsgSrcAddr]["ZLinky"]["Color"] = color + self.ListOfDevices.setdefault(MsgSrcAddr, {}).setdefault("ZLinky", {})["Color"] = color + -def store_ZLinky_infos( self, nwkid, command_tic, value): +def store_ZLinky_infos(self, nwkid, command_tic, value): + self.ListOfDevices.setdefault(nwkid, {}).setdefault('ZLinky', {})[command_tic] = value - if 'ZLinky' not in self.ListOfDevices[ nwkid ]: - self.ListOfDevices[ nwkid ][ 'ZLinky' ] = {} - self.ListOfDevices[ nwkid ][ 'ZLinky' ][ command_tic ] = value def get_ISOUSC( self, nwkid ): @@ -145,23 +155,53 @@ def get_ISOUSC( self, nwkid ): return 0 -def get_OPTARIF( self, nwkid): - if ( - "ZLinky" in self.ListOfDevices[nwkid] - and "OPTARIF" in self.ListOfDevices[nwkid]["ZLinky"] - ): - return self.ListOfDevices[nwkid]["ZLinky"]["OPTARIF"] +def get_OPTARIF(self, nwkid): + """ + Retrieves the OPTARIF value for a given network ID (nwkid). + If not found, defaults to "BASE". + + Parameters: + nwkid: The network ID to retrieve the value for. + + Returns: + The OPTARIF value as a string or "BASE" if not found. + """ + return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("OPTARIF", "BASE") - return "BASE" -def get_instant_power( self, nwkid ): - return round(float(self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"]["050f"]), 2) if "0b04" in self.ListOfDevices[nwkid]["Ep"]["01"] and "050f" in self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"] else 0 +def get_instant_power(self, nwkid): + """ + Retrieves the instant power value for a given network ID (nwkid). + If not available, returns 0. + + Parameters: + nwkid: The network ID to retrieve the value for. + + Returns: + The instant power value as a rounded float, or 0 if not found. + """ + try: + value = self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"]["050f"] + return round(float(value), 2) + except (KeyError, TypeError, ValueError): + return 0 -def get_tarif_color( self, nwkid ): - return self.ListOfDevices[nwkid]["ZLinky"]["Color"] if "ZLinky" in self.ListOfDevices[nwkid] and "Color" in self.ListOfDevices[nwkid]["ZLinky"] else None +def get_tarif_color(self, nwkid): + """ + Retrieves the tarif color for a given network ID (nwkid). + If not found, returns None. + Parameters: + nwkid: The network ID to retrieve the color for. + + Returns: + The color value, or None if not found. + """ + return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("Color") + + def zlinky_check_alarm(self, Devices, MsgSrcAddr, MsgSrcEp, value): if value == 0: @@ -189,55 +229,93 @@ def zlinky_check_alarm(self, Devices, MsgSrcAddr, MsgSrcEp, value): return "00|Normal" - -def linky_mode( self, nwkid , protocol=False): +def linky_mode(self, nwkid, protocol=False): + """ + Retrieves the Linky mode for a given network ID (nwkid). + If not available, returns None. If `protocol` is True, returns the protocol value instead. - if 'ZLinky' not in self.ListOfDevices[ nwkid ]: - return None + Parameters: + nwkid: The network ID to retrieve the mode for. + protocol: A boolean flag to return the protocol value instead of the mode (default is False). - if 'PROTOCOL Linky' not in self.ListOfDevices[ nwkid ]['ZLinky']: - return get_linky_mode_from_ep(self, nwkid ) + Returns: + The Linky mode or protocol, or None if not found. + """ + device = self.ListOfDevices.get(nwkid, {}) + zlinky = device.get('ZLinky', {}) - if self.ListOfDevices[ nwkid ]['ZLinky']['PROTOCOL Linky'] in ZLINKY_MODE and not protocol: - return ZLINKY_MODE[ self.ListOfDevices[ nwkid ]['ZLinky']['PROTOCOL Linky'] ]["Mode"] - elif protocol: - return self.ListOfDevices[ nwkid ]['ZLinky']['PROTOCOL Linky'] - - return None + # Return the protocol value if requested + if protocol: + return zlinky.get('PROTOCOL Linky') + + # Return mode if the protocol is in ZLINKY_MODE + protocol_linky = zlinky.get('PROTOCOL Linky') + if protocol_linky in ZLINKY_MODE: + return ZLINKY_MODE[protocol_linky].get("Mode") + + # Fallback to getting mode from Ep + return get_linky_mode_from_ep(self, nwkid) if protocol_linky is None else None def get_linky_mode_from_ep(self, nwkid): + """ + Retrieves the Linky mode from the "Ep" section of a given network ID (nwkid). + If the mode is found in ZLINKY_MODE, it returns the mode. Otherwise, returns None. + + Parameters: + nwkid: The network ID to retrieve the mode from. + + Returns: + The Linky mode if found in ZLINKY_MODE, or None if not found. + """ ep = self.ListOfDevices.get(nwkid, {}).get("Ep", {}).get("01", {}).get("ff66", {}).get("0300") - return ep if ep in ZLINKY_MODE else None + return ZLINKY_MODE.get(ep) if ep in ZLINKY_MODE else None def linky_device_conf(self, nwkid): + """ + Retrieves the configuration for a Linky device based on its protocol. + If the protocol is not found, it attempts to get the mode from the Ep section. + + Parameters: + nwkid: The network ID of the device. + + Returns: + The configuration value for the Linky device or "ZLinky_TIC" if not found. + """ device = self.ListOfDevices.get(nwkid, {}) zlinky_info = device.get('ZLinky', {}) protocol_linky = zlinky_info.get('PROTOCOL Linky') + # If protocol_linky is not present, try to retrieve the mode from the Ep section if not protocol_linky: mode = get_linky_mode_from_ep(self, nwkid) if mode: self.log.logging("Cluster", "Status", f"linky_device_conf {nwkid} found 0xff66/0x0300: {mode}") zlinky_info['PROTOCOL Linky'] = mode - return ZLINKY_MODE[mode]["Conf"] - else: - return "ZLinky_TIC" + return ZLINKY_MODE.get(mode, {}).get("Conf", "ZLinky_TIC") - if protocol_linky not in ZLINKY_MODE: - return "ZLinky_TIC" - - self.log.logging("Cluster", "Debug", f"linky_device_conf {nwkid} found Protocol Linky: {protocol_linky}") - return ZLINKY_MODE[protocol_linky]["Conf"] + # If the protocol is in ZLINKY_MODE, return its configuration + if protocol_linky in ZLINKY_MODE: + self.log.logging("Cluster", "Debug", f"linky_device_conf {nwkid} found Protocol Linky: {protocol_linky}") + return ZLINKY_MODE[protocol_linky].get("Conf", "ZLinky_TIC") + + return "ZLinky_TIC" -def linky_upgrade_authorized( current_model, new_model ): +def linky_upgrade_authorized(current_model, new_model): + """ + Checks if an upgrade from the current model to the new model is authorized. + + Parameters: + current_model: The model currently in use. + new_model: The model to upgrade to. + + Returns: + True if the upgrade is authorized, otherwise False. + """ + return ZLINKY_UPGRADE_PATHS.get(current_model, {}).get(new_model, False) - return ( - current_model in ZLINKY_UPGRADE_PATHS - and new_model in ZLINKY_UPGRADE_PATHS[current_model] - ) def update_zlinky_device_model_if_needed( self, nwkid ): @@ -278,6 +356,7 @@ def update_zlinky_device_model_if_needed( self, nwkid ): if "Heartbeat" in self.ListOfDevices[nwkid]: self.ListOfDevices[nwkid]["Heartbeat"] = "-1" + CONTACT_SEC = { 0: "fermé", 1: "ouvert" From efe279a97d0f505b67e415eebb389ef3e10b2157 Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Sun, 5 Jan 2025 15:52:20 +0100 Subject: [PATCH 02/17] improve color detection --- DevicesModules/custom_zlinky.py | 34 ++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index c3d885c3e..f68549195 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -17,7 +17,7 @@ from Modules.tools import checkAndStoreAttributeValue, getAttributeValue from Modules.zlinky import (ZLINK_CONF_MODEL, ZLinky_TIC_COMMAND, convert_kva_to_ampere, decode_STEG, get_OPTARIF, - linky_mode, store_ZLinky_infos, + get_tarif_color, linky_mode, store_ZLinky_infos, update_zlinky_device_model_if_needed, zlinky_check_alarm, zlinky_color_tarif, zlinky_totalisateur) @@ -103,38 +103,46 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu previous_value = getAttributeValue(self, nwkid, ep, cluster, attribut, value) if value == 0 or previous_value == value: return + + previous_color = get_tarif_color(self, nwkid) # Set value based on attribute and tariff type if attribut == "0000" and op_tarifiare == "HC..": - value = "HP.." + new_color = "HP.." elif attribut == "0100": if op_tarifiare == "HC..": - value = "HC.." + new_color = "HC.." elif op_tarifiare == "TEMPO": - value = "BHC" + new_color = "BHC" elif attribut == "0102": if op_tarifiare == "HC..": - value = "HP.." + new_color = "HP.." elif op_tarifiare == "TEMPO": - value = "BHP" + new_color = "BHP" # If the tariff is not TEMPO, proceed with updating the device if op_tarifiare != "TEMPO": - MajDomoDevice(self, Devices, nwkid, ep, "0009", str(value), Attribute_="0020") + if new_color != previous_color: + self.log.logging( "ZLinky", "Status", "Updating ZLinky color based on Counter from {previous_color} to {new_color}", nwkid) + MajDomoDevice(self, Devices, nwkid, ep, "0009", new_color, Attribute_="0020") + zlinky_color_tarif(self, nwkid, new_color) return # Handle attributes specific to "TEMPO" tariff if attribut == "0104": - value = "WHC" + new_color = "WHC" elif attribut == "0106": - value = "WHP" + new_color = "WHP" elif attribut == "0108": - value = "RHC" + new_color = "RHC" elif attribut == "010a": - value = "RHP" + new_color = "RHP" - # Update device with the final value - MajDomoDevice(self, Devices, nwkid, ep, "0009", str(value), Attribute_="0020") + if new_color != previous_color: + # Update device with the final value + self.log.logging( "ZLinky", "Status", "Updating ZLinky color based on Counter", nwkid) + MajDomoDevice(self, Devices, nwkid, ep, "0009", new_color, Attribute_="0020") + zlinky_color_tarif(self, nwkid, new_color) def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): From 0e2cc884af863f58553bd28fe4e8e50d018fd01d Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Sun, 5 Jan 2025 15:55:57 +0100 Subject: [PATCH 03/17] fix syntax --- DevicesModules/custom_zlinky.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index f68549195..ac01c0ae3 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -100,7 +100,7 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu return # Get previous value to ensure there's a change - previous_value = getAttributeValue(self, nwkid, ep, cluster, attribut, value) + previous_value = getAttributeValue(self, nwkid, ep, cluster, attribut) if value == 0 or previous_value == value: return From 2100d2676f7b6993b313b2bc1317dc27622e4812 Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Mon, 6 Jan 2025 15:42:11 +0100 Subject: [PATCH 04/17] fix several issues --- DevicesModules/custom_zlinky.py | 106 ++++++++------------ Modules/zlinky.py | 171 +++++++++----------------------- 2 files changed, 89 insertions(+), 188 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index ac01c0ae3..605e5bb6d 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -9,7 +9,6 @@ # Initial authors: zaraki673 & pipiche38 # # SPDX-License-Identifier: GPL-3.0 license -# import binascii @@ -24,42 +23,26 @@ def zlinky_clusters(self, Devices, nwkid, ep, cluster, attribut, value): - """ - Handles the processing of different ZLinky clusters based on the cluster type. - - Parameters: - Devices: The list of devices to process. - nwkid: The network ID of the device. - ep: The endpoint identifier. - cluster: The cluster type (e.g., "0b01", "0702"). - attribut: The attribute name being processed. - value: The value associated with the attribute. - """ - self.log.logging( - "ZLinky", - "Debug", - f"zlinky_clusters {cluster} - {nwkid}/{ep} Attribute: {attribut} Value: {value}", - nwkid - ) - - # Mapping clusters to their corresponding functions - cluster_handlers = { - "0b01": zlinky_meter_identification, - "0702": zlinky_cluster_metering, - "0b04": zlinky_cluster_electrical_measurement, - "ff66": zlinky_cluster_lixee_private - } - - # Call the appropriate handler function if the cluster is found in the mapping - handler = cluster_handlers.get(cluster) - if handler: - handler(self, Devices, nwkid, ep, cluster, attribut, value) + self.log.logging( "ZLinky", "Debug", "zlinky_clusters %s - %s/%s Attribute: %s Value: %s" % ( + cluster, nwkid, ep, attribut, value), nwkid, ) + + if cluster == "0b01": + zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, value) + + elif cluster == "0702": + zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value) + + elif cluster == "0b04": + zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, attribut, value) + + elif cluster == "ff66": + zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, value) def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, value): self.log.logging( "ZLinky", "Debug", "zlinky_meter_identification %s - %s/%s Attribute: %s Value: %s" % ( cluster, nwkid, ep, attribut, value), nwkid, ) - + checkAndStoreAttributeValue( self, nwkid, ep, cluster, attribut, value, ) if attribut == "000d": # Looks like in standard mode PREF is in VA while in historique mode ISOUSC is in A @@ -73,14 +56,14 @@ def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, val # Mode standard store_ZLinky_infos( self, nwkid, 'PREF', value) store_ZLinky_infos( self, nwkid, 'ISOUSC', convert_kva_to_ampere(value) ) - + elif attribut == "000a": store_ZLinky_infos( self, nwkid, 'VTIC', value) - + elif attribut == "000e": store_ZLinky_infos( self, nwkid, 'PCOUP', value) - + def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value): """ Sets the color based on the counter and tariff information for the given device. @@ -98,12 +81,12 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu # Exit if the tariff is BASE or not one of the supported types if op_tarifiare == "BASE" or op_tarifiare not in ("TEMPO", "HC.."): return - + # Get previous value to ensure there's a change previous_value = getAttributeValue(self, nwkid, ep, cluster, attribut) if value == 0 or previous_value == value: return - + previous_color = get_tarif_color(self, nwkid) # Set value based on attribute and tariff type @@ -119,7 +102,7 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu new_color = "HP.." elif op_tarifiare == "TEMPO": new_color = "BHP" - + # If the tariff is not TEMPO, proceed with updating the device if op_tarifiare != "TEMPO": if new_color != previous_color: @@ -143,8 +126,8 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu self.log.logging( "ZLinky", "Status", "Updating ZLinky color based on Counter", nwkid) MajDomoDevice(self, Devices, nwkid, ep, "0009", new_color, Attribute_="0020") zlinky_color_tarif(self, nwkid, new_color) - - + + def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): # Smart Energy Metering @@ -180,11 +163,11 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF01', value) store_ZLinky_infos( self, nwkid, 'HCHC', value) store_ZLinky_infos( self, nwkid, 'EJPHN', value) store_ZLinky_infos( self, nwkid, 'BBRHCJB', value) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "0102": # HP or BBRHPJB @@ -194,11 +177,11 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF02', value) store_ZLinky_infos( self, nwkid, 'HCHP', value) store_ZLinky_infos( self, nwkid, 'EJPHPM', value) store_ZLinky_infos( self, nwkid, 'BBRHCJW', value) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "0104": if value == 0: @@ -206,9 +189,9 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF03', value) store_ZLinky_infos( self, nwkid, 'BBRHCJW', value) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "0106": if value == 0: @@ -216,9 +199,9 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF04', value) store_ZLinky_infos( self, nwkid, 'BBRHPJW', value) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "0108": if value == 0: @@ -226,9 +209,9 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF05', value) store_ZLinky_infos( self, nwkid, 'BBRHCJR', value) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "010a": if value == 0: @@ -236,9 +219,9 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF06', value) store_ZLinky_infos( self, nwkid, 'BBRHPJR', value) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "010c": if value == 0: @@ -267,10 +250,9 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): elif attribut == "0307": # PRM store_ZLinky_infos( self, nwkid, 'PRM', value) - + elif attribut == "0308": # Serial Number self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0308 - Serial Number %s" % (value), nwkid, ) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'ADC0', value) store_ZLinky_infos( self, nwkid, 'ADSC', value) @@ -286,15 +268,14 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att elif attribut == "050e": store_ZLinky_infos( self, nwkid, 'ERQ2', value) - + elif attribut == "090e": store_ZLinky_infos( self, nwkid, 'ERQ3', value) elif attribut == "0a0e": store_ZLinky_infos( self, nwkid, 'ERQ4', value) - + elif attribut == "050b": # Active Power - self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Power %s" % (cluster, nwkid, ep, value)) checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value)) @@ -302,7 +283,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att elif attribut == "090b": store_ZLinky_infos( self, nwkid, 'CCASN-1',value) - + elif attribut in ("0505", "0905", "0a05"): # RMS Voltage self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Voltage %s" % (cluster, nwkid, ep, value)) if value == 0xFFFF: @@ -354,13 +335,13 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att if value == 0x8000: return checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - + _linkyMode = linky_mode( self, nwkid, protocol=True ) - + if _linkyMode in ( 0, 2,) and attribut == "050d": # Historique Tri store_ZLinky_infos( self, nwkid, 'PMAX', value) - + elif _linkyMode in ( 1, 5, ) and attribut == "050d": # Historic Mono store_ZLinky_infos( self, nwkid, 'SMAXN', value) @@ -402,18 +383,17 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att self.log.logging( "ZLinky", "Error", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s out of range !!!" % (cluster, nwkid, ep, value), nwkid, ) return checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - + self.log.logging( "ZLinky", "Debug", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s" % (cluster, nwkid, ep, value), nwkid, ) # ApparentPower (Represents the single phase or Phase A, current demand of apparent (Square root of active and reactive power) power, in VA.) self.log.logging( "ZLinky", "Debug", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s" % (cluster, nwkid, ep, value), nwkid, ) - _linkyMode = linky_mode( self, nwkid, protocol=True ) - + if _linkyMode in ( 0, 2,) and attribut == "050f": # Historique Tri store_ZLinky_infos( self, nwkid, 'PAPP', value) - + elif _linkyMode in ( 1, 5, ) and attribut == "050f": # Historic Mono store_ZLinky_infos( self, nwkid, 'SINSTS', value) @@ -431,7 +411,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att self.log.logging( "ZLinky", "Error", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Unexpected %s/%s linkyMode: %s" % ( cluster, nwkid, ep, attribut, value, _linkyMode ), nwkid, ) return - + tarif_color = None if "ZLinky" in self.ListOfDevices[nwkid] and "Color" in self.ListOfDevices[nwkid]["ZLinky"]: tarif_color = self.ListOfDevices[nwkid]["ZLinky"]["Color"] @@ -460,7 +440,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att elif attribut in ( "0a0f", ): store_ZLinky_infos( self, nwkid, 'SINSTS3', value) - + elif attribut in ("0908", "0a08"): # Current Phase 2 and Current Phase 3 # from random import randrange # value = randrange( 0x0, 0x3c) @@ -478,7 +458,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s %s Current L3 %s" % (cluster, nwkid, ep, attribut, value), nwkid) MajDomoDevice( self, Devices, nwkid, "f3", "0009", zlinky_check_alarm(self, Devices, nwkid, ep, value), Attribute_="0005", ) store_ZLinky_infos( self, nwkid, 'IRMS3', value) - + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) elif attribut == "0511": @@ -489,7 +469,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att elif attribut == "0a11": store_ZLinky_infos( self, nwkid, 'UMOY3', value) - + def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, value): if nwkid not in self.ListOfDevices: diff --git a/Modules/zlinky.py b/Modules/zlinky.py index 94084cc56..e0751aa94 100644 --- a/Modules/zlinky.py +++ b/Modules/zlinky.py @@ -52,7 +52,6 @@ "ZLinky_TIC-standard-tri": (), "ZLinky_TIC-standard-tri-prod": (), } - ZLinky_TIC_COMMAND = { # Mode Historique "0000": "OPTARIF", @@ -100,28 +99,19 @@ "0300": "PROTOCOL Linky" } - -def convert_kva_to_ampere(kva: float) -> float: - """ - Converts kilovolt-amperes (kVA) to amperes (A) assuming a voltage of 200V. - - Parameters: - kva (float): The value in kilovolt-amperes to convert. - - Returns: - float: The equivalent value in amperes. - """ - VOLTAGE = 200 # Assumed voltage in volts - return (kva * 1000) / VOLTAGE - +def convert_kva_to_ampere( kva ): + return ( kva * 1000) / 200 def zlinky_color_tarif(self, MsgSrcAddr, color): - self.ListOfDevices.setdefault(MsgSrcAddr, {}).setdefault("ZLinky", {})["Color"] = color - + if "ZLinky" not in self.ListOfDevices[MsgSrcAddr]: + self.ListOfDevices[MsgSrcAddr]["ZLinky"] = {} + self.ListOfDevices[MsgSrcAddr]["ZLinky"]["Color"] = color -def store_ZLinky_infos(self, nwkid, command_tic, value): - self.ListOfDevices.setdefault(nwkid, {}).setdefault('ZLinky', {})[command_tic] = value +def store_ZLinky_infos( self, nwkid, command_tic, value): + if 'ZLinky' not in self.ListOfDevices[ nwkid ]: + self.ListOfDevices[ nwkid ][ 'ZLinky' ] = {} + self.ListOfDevices[ nwkid ][ 'ZLinky' ][ command_tic ] = value def get_ISOUSC( self, nwkid ): @@ -155,53 +145,23 @@ def get_ISOUSC( self, nwkid ): return 0 +def get_OPTARIF( self, nwkid): -def get_OPTARIF(self, nwkid): - """ - Retrieves the OPTARIF value for a given network ID (nwkid). - If not found, defaults to "BASE". - - Parameters: - nwkid: The network ID to retrieve the value for. - - Returns: - The OPTARIF value as a string or "BASE" if not found. - """ - return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("OPTARIF", "BASE") + if ( + "ZLinky" in self.ListOfDevices[nwkid] + and "OPTARIF" in self.ListOfDevices[nwkid]["ZLinky"] + ): + return self.ListOfDevices[nwkid]["ZLinky"]["OPTARIF"] + return "BASE" -def get_instant_power(self, nwkid): - """ - Retrieves the instant power value for a given network ID (nwkid). - If not available, returns 0. - - Parameters: - nwkid: The network ID to retrieve the value for. - - Returns: - The instant power value as a rounded float, or 0 if not found. - """ - try: - value = self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"]["050f"] - return round(float(value), 2) - except (KeyError, TypeError, ValueError): - return 0 +def get_instant_power( self, nwkid ): + return round(float(self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"]["050f"]), 2) if "0b04" in self.ListOfDevices[nwkid]["Ep"]["01"] and "050f" in self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"] else 0 +def get_tarif_color( self, nwkid ): + return self.ListOfDevices[nwkid]["ZLinky"]["Color"] if "ZLinky" in self.ListOfDevices[nwkid] and "Color" in self.ListOfDevices[nwkid]["ZLinky"] else None -def get_tarif_color(self, nwkid): - """ - Retrieves the tarif color for a given network ID (nwkid). - If not found, returns None. - Parameters: - nwkid: The network ID to retrieve the color for. - - Returns: - The color value, or None if not found. - """ - return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("Color") - - def zlinky_check_alarm(self, Devices, MsgSrcAddr, MsgSrcEp, value): if value == 0: @@ -229,93 +189,55 @@ def zlinky_check_alarm(self, Devices, MsgSrcAddr, MsgSrcEp, value): return "00|Normal" -def linky_mode(self, nwkid, protocol=False): - """ - Retrieves the Linky mode for a given network ID (nwkid). - If not available, returns None. If `protocol` is True, returns the protocol value instead. - - Parameters: - nwkid: The network ID to retrieve the mode for. - protocol: A boolean flag to return the protocol value instead of the mode (default is False). - - Returns: - The Linky mode or protocol, or None if not found. - """ - device = self.ListOfDevices.get(nwkid, {}) - zlinky = device.get('ZLinky', {}) + +def linky_mode( self, nwkid , protocol=False): - # Return the protocol value if requested - if protocol: - return zlinky.get('PROTOCOL Linky') + if 'ZLinky' not in self.ListOfDevices[ nwkid ]: + return None - # Return mode if the protocol is in ZLINKY_MODE - protocol_linky = zlinky.get('PROTOCOL Linky') - if protocol_linky in ZLINKY_MODE: - return ZLINKY_MODE[protocol_linky].get("Mode") + if 'PROTOCOL Linky' not in self.ListOfDevices[ nwkid ]['ZLinky']: + return get_linky_mode_from_ep(self, nwkid ) - # Fallback to getting mode from Ep - return get_linky_mode_from_ep(self, nwkid) if protocol_linky is None else None + if self.ListOfDevices[ nwkid ]['ZLinky']['PROTOCOL Linky'] in ZLINKY_MODE and not protocol: + return ZLINKY_MODE[ self.ListOfDevices[ nwkid ]['ZLinky']['PROTOCOL Linky'] ]["Mode"] + elif protocol: + return self.ListOfDevices[ nwkid ]['ZLinky']['PROTOCOL Linky'] + + return None def get_linky_mode_from_ep(self, nwkid): - """ - Retrieves the Linky mode from the "Ep" section of a given network ID (nwkid). - If the mode is found in ZLINKY_MODE, it returns the mode. Otherwise, returns None. - - Parameters: - nwkid: The network ID to retrieve the mode from. - - Returns: - The Linky mode if found in ZLINKY_MODE, or None if not found. - """ ep = self.ListOfDevices.get(nwkid, {}).get("Ep", {}).get("01", {}).get("ff66", {}).get("0300") - return ZLINKY_MODE.get(ep) if ep in ZLINKY_MODE else None + return ep if ep in ZLINKY_MODE else None def linky_device_conf(self, nwkid): - """ - Retrieves the configuration for a Linky device based on its protocol. - If the protocol is not found, it attempts to get the mode from the Ep section. - - Parameters: - nwkid: The network ID of the device. - - Returns: - The configuration value for the Linky device or "ZLinky_TIC" if not found. - """ device = self.ListOfDevices.get(nwkid, {}) zlinky_info = device.get('ZLinky', {}) protocol_linky = zlinky_info.get('PROTOCOL Linky') - # If protocol_linky is not present, try to retrieve the mode from the Ep section if not protocol_linky: mode = get_linky_mode_from_ep(self, nwkid) if mode: self.log.logging("Cluster", "Status", f"linky_device_conf {nwkid} found 0xff66/0x0300: {mode}") zlinky_info['PROTOCOL Linky'] = mode - return ZLINKY_MODE.get(mode, {}).get("Conf", "ZLinky_TIC") + return ZLINKY_MODE[mode]["Conf"] + else: + return "ZLinky_TIC" - # If the protocol is in ZLINKY_MODE, return its configuration - if protocol_linky in ZLINKY_MODE: - self.log.logging("Cluster", "Debug", f"linky_device_conf {nwkid} found Protocol Linky: {protocol_linky}") - return ZLINKY_MODE[protocol_linky].get("Conf", "ZLinky_TIC") - - return "ZLinky_TIC" + if protocol_linky not in ZLINKY_MODE: + return "ZLinky_TIC" + + self.log.logging("Cluster", "Debug", f"linky_device_conf {nwkid} found Protocol Linky: {protocol_linky}") + return ZLINKY_MODE[protocol_linky]["Conf"] -def linky_upgrade_authorized(current_model, new_model): - """ - Checks if an upgrade from the current model to the new model is authorized. - - Parameters: - current_model: The model currently in use. - new_model: The model to upgrade to. - - Returns: - True if the upgrade is authorized, otherwise False. - """ - return ZLINKY_UPGRADE_PATHS.get(current_model, {}).get(new_model, False) +def linky_upgrade_authorized( current_model, new_model ): + return ( + current_model in ZLINKY_UPGRADE_PATHS + and new_model in ZLINKY_UPGRADE_PATHS[current_model] + ) def update_zlinky_device_model_if_needed( self, nwkid ): @@ -356,7 +278,6 @@ def update_zlinky_device_model_if_needed( self, nwkid ): if "Heartbeat" in self.ListOfDevices[nwkid]: self.ListOfDevices[nwkid]["Heartbeat"] = "-1" - CONTACT_SEC = { 0: "fermé", 1: "ouvert" From e92096e2b8f01ad4fa67af7adf1720dcab00e425 Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Wed, 8 Jan 2025 17:57:50 +0100 Subject: [PATCH 05/17] adding more debug --- DevicesModules/custom_zlinky.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index 605e5bb6d..c51d88a27 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -76,19 +76,22 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu attribut: The attribute being processed. value: The current value of the attribute. """ + self.log.logging( "ZLinky", "Debug", f"zlinky_set_color_based_on_counter Cluster: {cluster} Attribute: {attribut} Value: {value}", nwkid) + op_tarifiare = get_OPTARIF(self, nwkid) - + self.log.logging( "ZLinky", "Debug", f"OPTARIF {op_tarifiare}", nwkid) # Exit if the tariff is BASE or not one of the supported types if op_tarifiare == "BASE" or op_tarifiare not in ("TEMPO", "HC.."): return # Get previous value to ensure there's a change previous_value = getAttributeValue(self, nwkid, ep, cluster, attribut) + previous_color = get_tarif_color(self, nwkid) + self.log.logging( "ZLinky", "Debug", f"PrevValue {previous_value} PrevColor {previous_color}", nwkid) + if value == 0 or previous_value == value: return - previous_color = get_tarif_color(self, nwkid) - # Set value based on attribute and tariff type if attribut == "0000" and op_tarifiare == "HC..": new_color = "HP.." @@ -103,8 +106,10 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu elif op_tarifiare == "TEMPO": new_color = "BHP" - # If the tariff is not TEMPO, proceed with updating the device + + # If the tariff is not TEMPO, proceed with updating the device (HC/HP) if op_tarifiare != "TEMPO": + self.log.logging( "ZLinky", "Debug", f"Not TEMPO / PrevColor {previous_color} NewColor {new_color}", nwkid) if new_color != previous_color: self.log.logging( "ZLinky", "Status", "Updating ZLinky color based on Counter from {previous_color} to {new_color}", nwkid) MajDomoDevice(self, Devices, nwkid, ep, "0009", new_color, Attribute_="0020") @@ -122,6 +127,7 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu new_color = "RHP" if new_color != previous_color: + self.log.logging( "ZLinky", "Debug", f"PrevColor {previous_color} NewColor {new_color}", nwkid) # Update device with the final value self.log.logging( "ZLinky", "Status", "Updating ZLinky color based on Counter", nwkid) MajDomoDevice(self, Devices, nwkid, ep, "0009", new_color, Attribute_="0020") From 8a6b7ed4def81ff3b5ea1ce8d549a67ef360f586 Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Wed, 8 Jan 2025 18:06:42 +0100 Subject: [PATCH 06/17] refactor --- DevicesModules/custom_zlinky.py | 104 +++++++++++++++----------------- 1 file changed, 48 insertions(+), 56 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index c51d88a27..0d15f716c 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -63,7 +63,6 @@ def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, val elif attribut == "000e": store_ZLinky_infos( self, nwkid, 'PCOUP', value) - def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value): """ Sets the color based on the counter and tariff information for the given device. @@ -76,62 +75,54 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu attribut: The attribute being processed. value: The current value of the attribute. """ - self.log.logging( "ZLinky", "Debug", f"zlinky_set_color_based_on_counter Cluster: {cluster} Attribute: {attribut} Value: {value}", nwkid) + def update_color(nwkid, previous_color, new_color): + """Update the device color if it has changed.""" + if new_color != previous_color: + self.log.logging("ZLinky", "Status", f"Updating ZLinky color from {previous_color} to {new_color}", nwkid) + MajDomoDevice(self, Devices, nwkid, ep, "0009", new_color, Attribute_="0020") + zlinky_color_tarif(self, nwkid, new_color) + + def get_new_color(attribut, op_tarifiare): + """Determine the new color based on the attribute and tariff type.""" + color_map = { + "HC..": {"0000": "HP..", "0100": "HC..", "0102": "HP.."}, + "TEMPO": {"0100": "BHC", "0102": "BHP", "0104": "WHC", "0106": "WHP", "0108": "RHC", "010a": "RHP"} + } + return color_map.get(op_tarifiare, {}).get(attribut) + + self.log.logging("ZLinky", "Debug", f"Cluster: {cluster}, Attribute: {attribut}, Value: {value}", nwkid) + # Fetch current tariff op_tarifiare = get_OPTARIF(self, nwkid) - self.log.logging( "ZLinky", "Debug", f"OPTARIF {op_tarifiare}", nwkid) - # Exit if the tariff is BASE or not one of the supported types - if op_tarifiare == "BASE" or op_tarifiare not in ("TEMPO", "HC.."): + self.log.logging("ZLinky", "Debug", f"OPTARIF: {op_tarifiare}", nwkid) + + # Exit early for unsupported tariffs + if op_tarifiare == "BASE" or op_tarifiare not in {"TEMPO", "HC.."}: return - # Get previous value to ensure there's a change + # Get previous values previous_value = getAttributeValue(self, nwkid, ep, cluster, attribut) previous_color = get_tarif_color(self, nwkid) - self.log.logging( "ZLinky", "Debug", f"PrevValue {previous_value} PrevColor {previous_color}", nwkid) + self.log.logging("ZLinky", "Debug", f"PrevValue: {previous_value}, PrevColor: {previous_color}", nwkid) + # Exit if value is zero or hasn't changed if value == 0 or previous_value == value: return - # Set value based on attribute and tariff type - if attribut == "0000" and op_tarifiare == "HC..": - new_color = "HP.." - elif attribut == "0100": - if op_tarifiare == "HC..": - new_color = "HC.." - elif op_tarifiare == "TEMPO": - new_color = "BHC" - elif attribut == "0102": - if op_tarifiare == "HC..": - new_color = "HP.." - elif op_tarifiare == "TEMPO": - new_color = "BHP" + # Determine the new color + new_color = get_new_color(attribut, op_tarifiare) + if not new_color: + return - - # If the tariff is not TEMPO, proceed with updating the device (HC/HP) + # Handle updates for non-TEMPO tariffs if op_tarifiare != "TEMPO": - self.log.logging( "ZLinky", "Debug", f"Not TEMPO / PrevColor {previous_color} NewColor {new_color}", nwkid) - if new_color != previous_color: - self.log.logging( "ZLinky", "Status", "Updating ZLinky color based on Counter from {previous_color} to {new_color}", nwkid) - MajDomoDevice(self, Devices, nwkid, ep, "0009", new_color, Attribute_="0020") - zlinky_color_tarif(self, nwkid, new_color) + self.log.logging("ZLinky", "Debug", f"Non-TEMPO: PrevColor: {previous_color}, NewColor: {new_color}", nwkid) + update_color(nwkid, previous_color, new_color) return - # Handle attributes specific to "TEMPO" tariff - if attribut == "0104": - new_color = "WHC" - elif attribut == "0106": - new_color = "WHP" - elif attribut == "0108": - new_color = "RHC" - elif attribut == "010a": - new_color = "RHP" - - if new_color != previous_color: - self.log.logging( "ZLinky", "Debug", f"PrevColor {previous_color} NewColor {new_color}", nwkid) - # Update device with the final value - self.log.logging( "ZLinky", "Status", "Updating ZLinky color based on Counter", nwkid) - MajDomoDevice(self, Devices, nwkid, ep, "0009", new_color, Attribute_="0020") - zlinky_color_tarif(self, nwkid, new_color) + # Handle updates for TEMPO-specific tariffs + self.log.logging("ZLinky", "Debug", f"TEMPO: PrevColor: {previous_color}, NewColor: {new_color}", nwkid) + update_color(nwkid, previous_color, new_color) def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): @@ -143,10 +134,10 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): if attribut == "0000": # CurrentSummationDelivered # HP or Base self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0000 ZLinky_TIC Value: %s" % (value), nwkid, ) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) store_ZLinky_infos( self, nwkid, 'BASE', value) store_ZLinky_infos( self, nwkid, 'EAST', value) + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) elif attribut == "0001": # CURRENT_SUMMATION_RECEIVED self.log.logging("Cluster", "Debug", "Cluster0702 - CURRENT_SUMMATION_RECEIVED %s " % (value), nwkid) @@ -166,68 +157,69 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): if value == "": return self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0100 ZLinky_TIC Conso: %s " % (value), nwkid, ) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF01', value) store_ZLinky_infos( self, nwkid, 'HCHC', value) store_ZLinky_infos( self, nwkid, 'EJPHN', value) store_ZLinky_infos( self, nwkid, 'BBRHCJB', value) + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) elif attribut == "0102": # HP or BBRHPJB if value == 0: return self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0100 ZLinky_TIC Conso: %s " % (value), nwkid, ) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF02', value) store_ZLinky_infos( self, nwkid, 'HCHP', value) store_ZLinky_infos( self, nwkid, 'EJPHPM', value) store_ZLinky_infos( self, nwkid, 'BBRHCJW', value) + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) elif attribut == "0104": if value == 0: return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF03', value) store_ZLinky_infos( self, nwkid, 'BBRHCJW', value) + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) elif attribut == "0106": if value == 0: return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF04', value) store_ZLinky_infos( self, nwkid, 'BBRHPJW', value) + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) elif attribut == "0108": if value == 0: return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF05', value) store_ZLinky_infos( self, nwkid, 'BBRHCJR', value) + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) elif attribut == "010a": if value == 0: return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) + store_ZLinky_infos( self, nwkid, 'EASF06', value) store_ZLinky_infos( self, nwkid, 'BBRHPJR', value) + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) elif attribut == "010c": if value == 0: From b752d6ff8ad6c844697dcd0607f34e707f968bea Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Wed, 8 Jan 2025 18:13:59 +0100 Subject: [PATCH 07/17] refactor zlinky_cluster_metering --- DevicesModules/custom_zlinky.py | 185 ++++++++++---------------------- 1 file changed, 59 insertions(+), 126 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index 0d15f716c..a801227f3 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -126,134 +126,67 @@ def get_new_color(attribut, op_tarifiare): def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): - # Smart Energy Metering - - self.log.logging( "ZLinky", "Debug", "zlinky_cluster_metering - %s - %s/%s attribut: %s value: %s" % ( - cluster, nwkid, ep, attribut, value), nwkid, ) - - if attribut == "0000": # CurrentSummationDelivered - # HP or Base - self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0000 ZLinky_TIC Value: %s" % (value), nwkid, ) - MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'BASE', value) - store_ZLinky_infos( self, nwkid, 'EAST', value) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - - elif attribut == "0001": # CURRENT_SUMMATION_RECEIVED - self.log.logging("Cluster", "Debug", "Cluster0702 - CURRENT_SUMMATION_RECEIVED %s " % (value), nwkid) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'EAIT', value) - - elif attribut == "0020": - if value == 0: - return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - MajDomoDevice(self, Devices, nwkid, ep, "0009", str(value), Attribute_="0020") - zlinky_color_tarif(self, nwkid, str(value)) - store_ZLinky_infos( self, nwkid, 'PTEC', value) - - elif attribut == "0100": - # HC or Base or BBRHCJB - if value == "": - return - self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0100 ZLinky_TIC Conso: %s " % (value), nwkid, ) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) - zlinky_totalisateur(self, nwkid, attribut, value) - MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'EASF01', value) - store_ZLinky_infos( self, nwkid, 'HCHC', value) - store_ZLinky_infos( self, nwkid, 'EJPHN', value) - store_ZLinky_infos( self, nwkid, 'BBRHCJB', value) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - - elif attribut == "0102": - # HP or BBRHPJB - if value == 0: - return - self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0100 ZLinky_TIC Conso: %s " % (value), nwkid, ) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) - zlinky_totalisateur(self, nwkid, attribut, value) - MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'EASF02', value) - store_ZLinky_infos( self, nwkid, 'HCHP', value) - store_ZLinky_infos( self, nwkid, 'EJPHPM', value) - store_ZLinky_infos( self, nwkid, 'BBRHCJW', value) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - - elif attribut == "0104": - if value == 0: - return - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) - zlinky_totalisateur(self, nwkid, attribut, value) - MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'EASF03', value) - store_ZLinky_infos( self, nwkid, 'BBRHCJW', value) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - - elif attribut == "0106": - if value == 0: - return - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) - zlinky_totalisateur(self, nwkid, attribut, value) - MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'EASF04', value) - store_ZLinky_infos( self, nwkid, 'BBRHPJW', value) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - - elif attribut == "0108": - if value == 0: - return - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) - zlinky_totalisateur(self, nwkid, attribut, value) - MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'EASF05', value) - store_ZLinky_infos( self, nwkid, 'BBRHCJR', value) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - - elif attribut == "010a": - if value == 0: - return - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) - zlinky_totalisateur(self, nwkid, attribut, value) - MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) - - store_ZLinky_infos( self, nwkid, 'EASF06', value) - store_ZLinky_infos( self, nwkid, 'BBRHPJR', value) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - - elif attribut == "010c": - if value == 0: - return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'EASF07', value) - - elif attribut == "010e": - if value == 0: - return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'EASF08', value) - - elif attribut == "0110": - if value == 0: - return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'EASF09', value) + """ + Handles Smart Energy Metering cluster attributes and updates devices accordingly. - elif attribut == "0112": - if value == 0: + Parameters: + Devices: The list of devices to process. + nwkid: The network ID of the device. + ep: The endpoint identifier. + cluster: The cluster type. + attribut: The attribute being processed. + value: The current value of the attribute. + """ + def handle_attribut_value(attribut, store_keys=None, update_color=False, totalize=False, maj_ep=None): + """Helper function to handle attribute values.""" + if not value: return - - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'EASF10', value) - - elif attribut == "0307": # PRM - store_ZLinky_infos( self, nwkid, 'PRM', value) - - elif attribut == "0308": # Serial Number - self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0308 - Serial Number %s" % (value), nwkid, ) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'ADC0', value) - store_ZLinky_infos( self, nwkid, 'ADSC', value) + self.log.logging("ZLinky", "Debug", f"Cluster0702 - {attribut} ZLinky_TIC Value: {value}", nwkid) + maj_ep = maj_ep or ep + MajDomoDevice(self, Devices, nwkid, maj_ep, cluster, str(value), Attribute_=attribut) + + if update_color: + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) + + if totalize: + zlinky_totalisateur(self, nwkid, attribut, value) + + if store_keys: + for key in store_keys: + store_ZLinky_infos(self, nwkid, key, value) + + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) + + self.log.logging( + "ZLinky", "Debug", + f"zlinky_cluster_metering - {cluster} - {nwkid}/{ep} attribut: {attribut} value: {value}", + nwkid, + ) + + # Define attribute handlers + attribute_handlers = { + "0000": lambda: handle_attribut_value("0000", ["BASE", "EAST"]), + "0001": lambda: handle_attribut_value("0001", ["EAIT"]), + "0020": lambda: handle_attribut_value("0020", ["PTEC"], maj_ep="0009"), + "0100": lambda: handle_attribut_value("0100", ["EASF01", "HCHC", "EJPHN", "BBRHCJB"], update_color=True, totalize=True), + "0102": lambda: handle_attribut_value("0102", ["EASF02", "HCHP", "EJPHPM", "BBRHCJW"], update_color=True, totalize=True), + "0104": lambda: handle_attribut_value("0104", ["EASF03", "BBRHCJW"], update_color=True, totalize=True, maj_ep="f2"), + "0106": lambda: handle_attribut_value("0106", ["EASF04", "BBRHPJW"], update_color=True, totalize=True, maj_ep="f2"), + "0108": lambda: handle_attribut_value("0108", ["EASF05", "BBRHCJR"], update_color=True, totalize=True, maj_ep="f3"), + "010a": lambda: handle_attribut_value("010a", ["EASF06", "BBRHPJR"], update_color=True, totalize=True, maj_ep="f3"), + "010c": lambda: handle_attribut_value("010c", ["EASF07"]), + "010e": lambda: handle_attribut_value("010e", ["EASF08"]), + "0110": lambda: handle_attribut_value("0110", ["EASF09"]), + "0112": lambda: handle_attribut_value("0112", ["EASF10"]), + "0307": lambda: store_ZLinky_infos(self, nwkid, "PRM", value), + "0308": lambda: handle_attribut_value("0308", ["ADC0", "ADSC"]), + } + + # Process attribute using handler + if attribut in attribute_handlers: + attribute_handlers[attribut]() + else: + self.log.logging("ZLinky", "Warning", f"Unhandled attribute: {attribut}", nwkid) def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, attribut, value): From 73ab19f68ee766e19e917a7cedc7302e479a2207 Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Wed, 8 Jan 2025 18:18:50 +0100 Subject: [PATCH 08/17] rename Devices into domoticz_devices --- DevicesModules/custom_zlinky.py | 86 ++++++++++++++++----------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index a801227f3..4f47d22ea 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -22,24 +22,24 @@ zlinky_totalisateur) -def zlinky_clusters(self, Devices, nwkid, ep, cluster, attribut, value): +def zlinky_clusters(self, domoticz_devices, nwkid, ep, cluster, attribut, value): self.log.logging( "ZLinky", "Debug", "zlinky_clusters %s - %s/%s Attribute: %s Value: %s" % ( cluster, nwkid, ep, attribut, value), nwkid, ) if cluster == "0b01": - zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, value) + zlinky_meter_identification(self, domoticz_devices, nwkid, ep, cluster, attribut, value) elif cluster == "0702": - zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value) + zlinky_cluster_metering(self, domoticz_devices, nwkid, ep, cluster, attribut, value) elif cluster == "0b04": - zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, attribut, value) + zlinky_cluster_electrical_measurement(self, domoticz_devices, nwkid, ep, cluster, attribut, value) elif cluster == "ff66": - zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, value) + zlinky_cluster_lixee_private(self, domoticz_devices, nwkid, ep, cluster, attribut, value) -def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, value): +def zlinky_meter_identification(self, domoticz_devices, nwkid, ep, cluster, attribut, value): self.log.logging( "ZLinky", "Debug", "zlinky_meter_identification %s - %s/%s Attribute: %s Value: %s" % ( cluster, nwkid, ep, attribut, value), nwkid, ) @@ -63,12 +63,12 @@ def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, val elif attribut == "000e": store_ZLinky_infos( self, nwkid, 'PCOUP', value) -def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value): +def zlinky_set_color_based_on_counter(self, domoticz_devices, nwkid, ep, cluster, attribut, value): """ Sets the color based on the counter and tariff information for the given device. Parameters: - Devices: The list of devices to process. + domoticz_devices: The list of domoticz_devices to process. nwkid: The network ID of the device. ep: The endpoint identifier. cluster: The cluster type (e.g., "0b01"). @@ -79,7 +79,7 @@ def update_color(nwkid, previous_color, new_color): """Update the device color if it has changed.""" if new_color != previous_color: self.log.logging("ZLinky", "Status", f"Updating ZLinky color from {previous_color} to {new_color}", nwkid) - MajDomoDevice(self, Devices, nwkid, ep, "0009", new_color, Attribute_="0020") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", new_color, Attribute_="0020") zlinky_color_tarif(self, nwkid, new_color) def get_new_color(attribut, op_tarifiare): @@ -125,12 +125,12 @@ def get_new_color(attribut, op_tarifiare): update_color(nwkid, previous_color, new_color) -def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): +def zlinky_cluster_metering(self, domoticz_devices, nwkid, ep, cluster, attribut, value): """ - Handles Smart Energy Metering cluster attributes and updates devices accordingly. + Handles Smart Energy Metering cluster attributes and updates domoticz_devices accordingly. Parameters: - Devices: The list of devices to process. + domoticz_devices: The list of domoticz_devices to process. nwkid: The network ID of the device. ep: The endpoint identifier. cluster: The cluster type. @@ -143,10 +143,10 @@ def handle_attribut_value(attribut, store_keys=None, update_color=False, totaliz return self.log.logging("ZLinky", "Debug", f"Cluster0702 - {attribut} ZLinky_TIC Value: {value}", nwkid) maj_ep = maj_ep or ep - MajDomoDevice(self, Devices, nwkid, maj_ep, cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, maj_ep, cluster, str(value), Attribute_=attribut) if update_color: - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) + zlinky_set_color_based_on_counter(self, domoticz_devices, nwkid, ep, cluster, attribut, value) if totalize: zlinky_totalisateur(self, nwkid, attribut, value) @@ -189,7 +189,7 @@ def handle_attribut_value(attribut, store_keys=None, update_color=False, totaliz self.log.logging("ZLinky", "Warning", f"Unhandled attribute: {attribut}", nwkid) -def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, attribut, value): +def zlinky_cluster_electrical_measurement(self, domoticz_devices, nwkid, ep, cluster, attribut, value): self.log.logging( "ZLinky", "Debug", "zlinky_cluster_electrical_measurement - %s - %s/%s attribut: %s value: %s" % ( cluster, nwkid, ep, attribut, value), nwkid, ) @@ -209,7 +209,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att elif attribut == "050b": # Active Power self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Power %s" % (cluster, nwkid, ep, value)) checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value)) + MajDomoDevice(self, domoticz_devices, nwkid, ep, cluster, str(value)) store_ZLinky_infos( self, nwkid, 'CCASN', value) elif attribut == "090b": @@ -221,7 +221,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att return checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) if attribut == "0505": - MajDomoDevice(self, Devices, nwkid, ep, "0001", str(value)) + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0001", str(value)) if "Model" in self.ListOfDevices[nwkid] and self.ListOfDevices[nwkid]["Model"] in ZLINK_CONF_MODEL: store_ZLinky_infos( self, nwkid, 'URMS1', value) elif attribut == "0905": @@ -244,10 +244,10 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att store_ZLinky_infos( self, nwkid, 'IRMS1', value) checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, ep, cluster, str(value), Attribute_=attribut) # Check if Intensity is below subscription level - MajDomoDevice( self, Devices, nwkid, ep, "0009", zlinky_check_alarm(self, Devices, nwkid, ep, value), Attribute_="0005", ) + MajDomoDevice( self, domoticz_devices, nwkid, ep, "0009", zlinky_check_alarm(self, domoticz_devices, nwkid, ep, value), Attribute_="0005", ) elif attribut in ("050a", "090a", "0a0a"): # Max Current if value == 0xFFFF: @@ -347,22 +347,22 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att if "ZLinky" in self.ListOfDevices[nwkid] and "Color" in self.ListOfDevices[nwkid]["ZLinky"]: tarif_color = self.ListOfDevices[nwkid]["ZLinky"]["Color"] if tarif_color == "White": - MajDomoDevice(self, Devices, nwkid, "01", cluster, str(0), Attribute_=attribut) - MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) - MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(0), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "01", cluster, str(0), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "f3", cluster, str(0), Attribute_=attribut) elif tarif_color == "Red": - MajDomoDevice(self, Devices, nwkid, "01", cluster, str(0), Attribute_=attribut) - MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(0), Attribute_=attribut) - MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "01", cluster, str(0), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "f2", cluster, str(0), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) else: # All others - MajDomoDevice(self, Devices, nwkid, "01", cluster, str(value), Attribute_=attribut) - MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(0), Attribute_=attribut) - MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(0), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "01", cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "f2", cluster, str(0), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "f3", cluster, str(0), Attribute_=attribut) else: - MajDomoDevice(self, Devices, nwkid, "01", cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "01", cluster, str(value), Attribute_=attribut) self.log.logging( "ZLinky", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s" % (cluster, nwkid, ep, value), nwkid, ) @@ -378,16 +378,16 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att if value == 0xFFFF: return - MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, ep, cluster, str(value), Attribute_=attribut) # Check if Intensity is below subscription level if attribut == "0908": self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s %s Current L2 %s" % (cluster, nwkid, ep, attribut, value), nwkid) - MajDomoDevice( self, Devices, nwkid, "f2", "0009", zlinky_check_alarm(self, Devices, nwkid, ep, value), Attribute_="0005", ) + MajDomoDevice( self, domoticz_devices, nwkid, "f2", "0009", zlinky_check_alarm(self, domoticz_devices, nwkid, ep, value), Attribute_="0005", ) store_ZLinky_infos( self, nwkid, 'IRMS2', value) elif attribut == "0a08": self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s %s Current L3 %s" % (cluster, nwkid, ep, attribut, value), nwkid) - MajDomoDevice( self, Devices, nwkid, "f3", "0009", zlinky_check_alarm(self, Devices, nwkid, ep, value), Attribute_="0005", ) + MajDomoDevice( self, domoticz_devices, nwkid, "f3", "0009", zlinky_check_alarm(self, domoticz_devices, nwkid, ep, value), Attribute_="0005", ) store_ZLinky_infos( self, nwkid, 'IRMS3', value) checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) @@ -402,7 +402,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att store_ZLinky_infos( self, nwkid, 'UMOY3', value) -def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, value): +def zlinky_cluster_lixee_private(self, domoticz_devices, nwkid, ep, cluster, attribut, value): if nwkid not in self.ListOfDevices: return if "Ep" not in self.ListOfDevices[nwkid]: @@ -441,13 +441,13 @@ def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, va # Couleur du Lendemain DEMAIN Trigger Alarm if value == "BLAN": - MajDomoDevice(self, Devices, nwkid, ep, "0009", "20|Tomorrow WHITE day", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "20|Tomorrow WHITE day", Attribute_="0001") elif value == "BLEU": - MajDomoDevice(self, Devices, nwkid, ep, "0009", "10|Tomorrow BLUE day", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "10|Tomorrow BLUE day", Attribute_="0001") elif value == "ROUG": - MajDomoDevice(self, Devices, nwkid, ep, "0009", "40|Tomorrow RED day", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "40|Tomorrow RED day", Attribute_="0001") else: - MajDomoDevice(self, Devices, nwkid, ep, "0009", "00|No information", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "00|No information", Attribute_="0001") checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) @@ -476,9 +476,9 @@ def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, va value = int(value) if value == 0: - MajDomoDevice(self, Devices, nwkid, ep, "0009", "00|No information", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "00|No information", Attribute_="0001") else: - MajDomoDevice( self, Devices, nwkid, ep, "0009", "40|Mobile peak preannoncement: %s" % value, Attribute_="0001", ) + MajDomoDevice( self, domoticz_devices, nwkid, ep, "0009", "40|Mobile peak preannoncement: %s" % value, Attribute_="0001", ) checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) @@ -501,15 +501,15 @@ def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, va _tmpattr = "0a08" if value == 0: - MajDomoDevice(self, Devices, nwkid, _tmpep, "0009", "00|Normal", Attribute_="0005") + MajDomoDevice(self, domoticz_devices, nwkid, _tmpep, "0009", "00|Normal", Attribute_="0005") return # value is equal to the Amper over the souscription # Issue critical alarm - MajDomoDevice(self, Devices, nwkid, _tmpep, "0009", "04|Critical", Attribute_="0005") + MajDomoDevice(self, domoticz_devices, nwkid, _tmpep, "0009", "04|Critical", Attribute_="0005") # Isse Current on the corresponding Ampere - MajDomoDevice(self, Devices, nwkid, ep, "0b04", str(value), Attribute_=_tmpattr) + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0b04", str(value), Attribute_=_tmpattr) elif attribut == "0201": # Standard : NTARF @@ -533,7 +533,7 @@ def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, va elif "HC" in value: s_tarif += "HC" - MajDomoDevice(self, Devices, nwkid, ep, "0009", s_tarif, Attribute_="0020") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", s_tarif, Attribute_="0020") checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'NTARF', value) From d4103b4f1771ae88d10400f2a770c54830eb4f97 Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Wed, 8 Jan 2025 18:22:52 +0100 Subject: [PATCH 09/17] in case we found a color change we trigger a ReadAttributeReq_Scheduled_ZLinky --- DevicesModules/custom_zlinky.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index 4f47d22ea..ca149d799 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -20,6 +20,7 @@ update_zlinky_device_model_if_needed, zlinky_check_alarm, zlinky_color_tarif, zlinky_totalisateur) +from Modules.readAttributes import ReadAttributeReq_Scheduled_ZLinky def zlinky_clusters(self, domoticz_devices, nwkid, ep, cluster, attribut, value): @@ -81,6 +82,7 @@ def update_color(nwkid, previous_color, new_color): self.log.logging("ZLinky", "Status", f"Updating ZLinky color from {previous_color} to {new_color}", nwkid) MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", new_color, Attribute_="0020") zlinky_color_tarif(self, nwkid, new_color) + ReadAttributeReq_Scheduled_ZLinky(self, nwkid) def get_new_color(attribut, op_tarifiare): """Determine the new color based on the attribute and tariff type.""" From 6d9b09a6e6dfce04ada80cc09ed098ff642c342a Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Wed, 8 Jan 2025 18:23:15 +0100 Subject: [PATCH 10/17] sort imports --- DevicesModules/custom_zlinky.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index ca149d799..889de322d 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -13,6 +13,7 @@ import binascii from Modules.domoMaj import MajDomoDevice +from Modules.readAttributes import ReadAttributeReq_Scheduled_ZLinky from Modules.tools import checkAndStoreAttributeValue, getAttributeValue from Modules.zlinky import (ZLINK_CONF_MODEL, ZLinky_TIC_COMMAND, convert_kva_to_ampere, decode_STEG, get_OPTARIF, @@ -20,7 +21,6 @@ update_zlinky_device_model_if_needed, zlinky_check_alarm, zlinky_color_tarif, zlinky_totalisateur) -from Modules.readAttributes import ReadAttributeReq_Scheduled_ZLinky def zlinky_clusters(self, domoticz_devices, nwkid, ep, cluster, attribut, value): From 7492630000c3134693406e4ad400c2ebb4dcfd77 Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Thu, 9 Jan 2025 09:31:44 +0100 Subject: [PATCH 11/17] fix mistake introduce in refactoring --- DevicesModules/custom_zlinky.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index 889de322d..0696d1abe 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -64,6 +64,7 @@ def zlinky_meter_identification(self, domoticz_devices, nwkid, ep, cluster, attr elif attribut == "000e": store_ZLinky_infos( self, nwkid, 'PCOUP', value) + def zlinky_set_color_based_on_counter(self, domoticz_devices, nwkid, ep, cluster, attribut, value): """ Sets the color based on the counter and tariff information for the given device. @@ -80,7 +81,7 @@ def update_color(nwkid, previous_color, new_color): """Update the device color if it has changed.""" if new_color != previous_color: self.log.logging("ZLinky", "Status", f"Updating ZLinky color from {previous_color} to {new_color}", nwkid) - MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", new_color, Attribute_="0020") + MajDomoDevice(self, domoticz_devices, nwkid, "01", "0009", new_color, Attribute_="0020") zlinky_color_tarif(self, nwkid, new_color) ReadAttributeReq_Scheduled_ZLinky(self, nwkid) @@ -147,6 +148,9 @@ def handle_attribut_value(attribut, store_keys=None, update_color=False, totaliz maj_ep = maj_ep or ep MajDomoDevice(self, domoticz_devices, nwkid, maj_ep, cluster, str(value), Attribute_=attribut) + if attribut == "0020": + zlinky_color_tarif(self, nwkid, str(value)) + if update_color: zlinky_set_color_based_on_counter(self, domoticz_devices, nwkid, ep, cluster, attribut, value) @@ -169,7 +173,7 @@ def handle_attribut_value(attribut, store_keys=None, update_color=False, totaliz attribute_handlers = { "0000": lambda: handle_attribut_value("0000", ["BASE", "EAST"]), "0001": lambda: handle_attribut_value("0001", ["EAIT"]), - "0020": lambda: handle_attribut_value("0020", ["PTEC"], maj_ep="0009"), + "0020": lambda: handle_attribut_value("0020", ["PTEC"]), "0100": lambda: handle_attribut_value("0100", ["EASF01", "HCHC", "EJPHN", "BBRHCJB"], update_color=True, totalize=True), "0102": lambda: handle_attribut_value("0102", ["EASF02", "HCHP", "EJPHPM", "BBRHCJW"], update_color=True, totalize=True), "0104": lambda: handle_attribut_value("0104", ["EASF03", "BBRHCJW"], update_color=True, totalize=True, maj_ep="f2"), From bef0b16090480f79c92ee2c67d55958b793ab537 Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Thu, 9 Jan 2025 11:51:39 +0100 Subject: [PATCH 12/17] remove un-necessary parameter un zlinky_meter_identification()b signature --- DevicesModules/custom_zlinky.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index 0696d1abe..9c12bf593 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -28,7 +28,7 @@ def zlinky_clusters(self, domoticz_devices, nwkid, ep, cluster, attribut, value) cluster, nwkid, ep, attribut, value), nwkid, ) if cluster == "0b01": - zlinky_meter_identification(self, domoticz_devices, nwkid, ep, cluster, attribut, value) + zlinky_meter_identification(self, nwkid, ep, cluster, attribut, value) elif cluster == "0702": zlinky_cluster_metering(self, domoticz_devices, nwkid, ep, cluster, attribut, value) @@ -40,7 +40,7 @@ def zlinky_clusters(self, domoticz_devices, nwkid, ep, cluster, attribut, value) zlinky_cluster_lixee_private(self, domoticz_devices, nwkid, ep, cluster, attribut, value) -def zlinky_meter_identification(self, domoticz_devices, nwkid, ep, cluster, attribut, value): +def zlinky_meter_identification(self, nwkid, ep, cluster, attribut, value): self.log.logging( "ZLinky", "Debug", "zlinky_meter_identification %s - %s/%s Attribute: %s Value: %s" % ( cluster, nwkid, ep, attribut, value), nwkid, ) From 9b2927858c6adbcdf37db16d54874d822f76011f Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Sun, 12 Jan 2025 18:27:40 +0100 Subject: [PATCH 13/17] 0x0702/0x0000 must not be used for HC/HP, it is only for Base --- DevicesModules/custom_zlinky.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index 9c12bf593..ffa237036 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -88,8 +88,16 @@ def update_color(nwkid, previous_color, new_color): def get_new_color(attribut, op_tarifiare): """Determine the new color based on the attribute and tariff type.""" color_map = { - "HC..": {"0000": "HP..", "0100": "HC..", "0102": "HP.."}, - "TEMPO": {"0100": "BHC", "0102": "BHP", "0104": "WHC", "0106": "WHP", "0108": "RHC", "010a": "RHP"} + "HC..": { + "0100": "HC..", + "0102": "HP.."}, + "TEMPO": { + "0100": "BHC", + "0102": "BHP", + "0104": "WHC", + "0106": "WHP", + "0108": "RHC", + "010a": "RHP"} } return color_map.get(op_tarifiare, {}).get(attribut) From 116ebd18ac7494a3d3a09f9375deb96441c006fd Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Mon, 13 Jan 2025 14:23:53 +0100 Subject: [PATCH 14/17] update and fix --- DevicesModules/custom_zlinky.py | 10 +++++----- Modules/zlinky.py | 26 +++++++++++++++++--------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index ffa237036..85fb66cb7 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -79,11 +79,10 @@ def zlinky_set_color_based_on_counter(self, domoticz_devices, nwkid, ep, cluster """ def update_color(nwkid, previous_color, new_color): """Update the device color if it has changed.""" - if new_color != previous_color: - self.log.logging("ZLinky", "Status", f"Updating ZLinky color from {previous_color} to {new_color}", nwkid) - MajDomoDevice(self, domoticz_devices, nwkid, "01", "0009", new_color, Attribute_="0020") - zlinky_color_tarif(self, nwkid, new_color) - ReadAttributeReq_Scheduled_ZLinky(self, nwkid) + self.log.logging("ZLinky", "Status", f"Updating ZLinky color from {previous_color} to {new_color}", nwkid) + MajDomoDevice(self, domoticz_devices, nwkid, "01", "0009", new_color, Attribute_="0020") + zlinky_color_tarif(self, nwkid, new_color) + ReadAttributeReq_Scheduled_ZLinky(self, nwkid) def get_new_color(attribut, op_tarifiare): """Determine the new color based on the attribute and tariff type.""" @@ -157,6 +156,7 @@ def handle_attribut_value(attribut, store_keys=None, update_color=False, totaliz MajDomoDevice(self, domoticz_devices, nwkid, maj_ep, cluster, str(value), Attribute_=attribut) if attribut == "0020": + MajDomoDevice(self, domoticz_devices, nwkid, "01", "0009", value, Attribute_="0020") zlinky_color_tarif(self, nwkid, str(value)) if update_color: diff --git a/Modules/zlinky.py b/Modules/zlinky.py index e0751aa94..af6ace3da 100644 --- a/Modules/zlinky.py +++ b/Modules/zlinky.py @@ -102,17 +102,17 @@ def convert_kva_to_ampere( kva ): return ( kva * 1000) / 200 + def zlinky_color_tarif(self, MsgSrcAddr, color): - if "ZLinky" not in self.ListOfDevices[MsgSrcAddr]: - self.ListOfDevices[MsgSrcAddr]["ZLinky"] = {} - self.ListOfDevices[MsgSrcAddr]["ZLinky"]["Color"] = color + self.ListOfDevices.setdefault(MsgSrcAddr, {}).setdefault("ZLinky", {})["Color"] = color -def store_ZLinky_infos( self, nwkid, command_tic, value): +def store_ZLinky_infos( self, nwkid, command_tic, value): if 'ZLinky' not in self.ListOfDevices[ nwkid ]: self.ListOfDevices[ nwkid ][ 'ZLinky' ] = {} self.ListOfDevices[ nwkid ][ 'ZLinky' ][ command_tic ] = value + def get_ISOUSC( self, nwkid ): if ( @@ -155,12 +155,20 @@ def get_OPTARIF( self, nwkid): return "BASE" -def get_instant_power( self, nwkid ): - return round(float(self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"]["050f"]), 2) if "0b04" in self.ListOfDevices[nwkid]["Ep"]["01"] and "050f" in self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"] else 0 +def get_instant_power(self, nwkid): + try: + device = self.ListOfDevices.get(nwkid, {}) + ep = device.get("Ep", {}).get("01", {}) + cluster = ep.get("0b04", {}) + power = cluster.get("050f") + return round(float(power), 2) if power is not None else 0 + except (ValueError, TypeError): + return 0 + + +def get_tarif_color(self, nwkid): + return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("Color") -def get_tarif_color( self, nwkid ): - return self.ListOfDevices[nwkid]["ZLinky"]["Color"] if "ZLinky" in self.ListOfDevices[nwkid] and "Color" in self.ListOfDevices[nwkid]["ZLinky"] else None - def zlinky_check_alarm(self, Devices, MsgSrcAddr, MsgSrcEp, value): From 6087db1ef74d6736b5d76cc7ac51f9e9c424e935 Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Mon, 13 Jan 2025 17:16:37 +0100 Subject: [PATCH 15/17] adding function to retreive PTEC --- Modules/zlinky.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Modules/zlinky.py b/Modules/zlinky.py index af6ace3da..85689b624 100644 --- a/Modules/zlinky.py +++ b/Modules/zlinky.py @@ -169,7 +169,12 @@ def get_instant_power(self, nwkid): def get_tarif_color(self, nwkid): return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("Color") - + +def get_ptec(self, nwkid): + """ Retreive Current Tarif.""" + return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("PTEC") + + def zlinky_check_alarm(self, Devices, MsgSrcAddr, MsgSrcEp, value): if value == 0: From 47d93694c1b71c18ed16f833720edaa5169ab47c Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Mon, 13 Jan 2025 17:19:32 +0100 Subject: [PATCH 16/17] in case PTEC is not aligned with new color, trigger a Read Attribute --- DevicesModules/custom_zlinky.py | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index 85fb66cb7..0f9942ef2 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -16,7 +16,7 @@ from Modules.readAttributes import ReadAttributeReq_Scheduled_ZLinky from Modules.tools import checkAndStoreAttributeValue, getAttributeValue from Modules.zlinky import (ZLINK_CONF_MODEL, ZLinky_TIC_COMMAND, - convert_kva_to_ampere, decode_STEG, get_OPTARIF, + convert_kva_to_ampere, decode_STEG, get_OPTARIF,get_ptec, get_tarif_color, linky_mode, store_ZLinky_infos, update_zlinky_device_model_if_needed, zlinky_check_alarm, zlinky_color_tarif, @@ -77,14 +77,19 @@ def zlinky_set_color_based_on_counter(self, domoticz_devices, nwkid, ep, cluster attribut: The attribute being processed. value: The current value of the attribute. """ - def update_color(nwkid, previous_color, new_color): - """Update the device color if it has changed.""" - self.log.logging("ZLinky", "Status", f"Updating ZLinky color from {previous_color} to {new_color}", nwkid) + def _zlinky_update_color(nwkid, previous_color, new_color): + """Update the device color, if it has changed request a Read Attribute to get the Color""" + MajDomoDevice(self, domoticz_devices, nwkid, "01", "0009", new_color, Attribute_="0020") zlinky_color_tarif(self, nwkid, new_color) - ReadAttributeReq_Scheduled_ZLinky(self, nwkid) - def get_new_color(attribut, op_tarifiare): + ptect_value = get_ptec(self, nwkid) + if ptect_value != new_color: + ### Looks like the PTEC info is not aligned with the current color ! + self.log.logging("ZLinky", "Status", f"Requesting PTEC as not inline {ptect_value} to {previous_color}/{new_color}", nwkid) + ReadAttributeReq_Scheduled_ZLinky(self, nwkid) + + def get_corresponding_color(attribut, op_tarifiare): """Determine the new color based on the attribute and tariff type.""" color_map = { "HC..": { @@ -100,6 +105,7 @@ def get_new_color(attribut, op_tarifiare): } return color_map.get(op_tarifiare, {}).get(attribut) + self.log.logging("ZLinky", "Debug", f"Cluster: {cluster}, Attribute: {attribut}, Value: {value}", nwkid) # Fetch current tariff @@ -112,27 +118,30 @@ def get_new_color(attribut, op_tarifiare): # Get previous values previous_value = getAttributeValue(self, nwkid, ep, cluster, attribut) - previous_color = get_tarif_color(self, nwkid) - self.log.logging("ZLinky", "Debug", f"PrevValue: {previous_value}, PrevColor: {previous_color}", nwkid) # Exit if value is zero or hasn't changed if value == 0 or previous_value == value: return - # Determine the new color - new_color = get_new_color(attribut, op_tarifiare) + # Get previous Color + previous_color_value = getAttributeValue(self, nwkid, ep, "0702", "0020") + previous_color = get_tarif_color(self, nwkid) + self.log.logging("ZLinky", "Debug", f"PrevValue: {previous_value}, PrevValueAttributColor: {previous_color_value} PrevColor: {previous_color}", nwkid) + + # Determine the current color + new_color = get_corresponding_color(attribut, op_tarifiare) if not new_color: return # Handle updates for non-TEMPO tariffs if op_tarifiare != "TEMPO": self.log.logging("ZLinky", "Debug", f"Non-TEMPO: PrevColor: {previous_color}, NewColor: {new_color}", nwkid) - update_color(nwkid, previous_color, new_color) + _zlinky_update_color(nwkid, previous_color, new_color) return # Handle updates for TEMPO-specific tariffs self.log.logging("ZLinky", "Debug", f"TEMPO: PrevColor: {previous_color}, NewColor: {new_color}", nwkid) - update_color(nwkid, previous_color, new_color) + _zlinky_update_color(nwkid, previous_color, new_color) def zlinky_cluster_metering(self, domoticz_devices, nwkid, ep, cluster, attribut, value): From 2a5560573101089310cd80c5aee304203b77dd3a Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Tue, 14 Jan 2025 15:58:03 +0100 Subject: [PATCH 17/17] cosmetic --- DevicesModules/custom_zlinky.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index 0f9942ef2..49ceba55c 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -16,8 +16,9 @@ from Modules.readAttributes import ReadAttributeReq_Scheduled_ZLinky from Modules.tools import checkAndStoreAttributeValue, getAttributeValue from Modules.zlinky import (ZLINK_CONF_MODEL, ZLinky_TIC_COMMAND, - convert_kva_to_ampere, decode_STEG, get_OPTARIF,get_ptec, - get_tarif_color, linky_mode, store_ZLinky_infos, + convert_kva_to_ampere, decode_STEG, get_OPTARIF, + get_ptec, get_tarif_color, linky_mode, + store_ZLinky_infos, update_zlinky_device_model_if_needed, zlinky_check_alarm, zlinky_color_tarif, zlinky_totalisateur) @@ -85,7 +86,7 @@ def _zlinky_update_color(nwkid, previous_color, new_color): ptect_value = get_ptec(self, nwkid) if ptect_value != new_color: - ### Looks like the PTEC info is not aligned with the current color ! + # Looks like the PTEC info is not aligned with the current color ! self.log.logging("ZLinky", "Status", f"Requesting PTEC as not inline {ptect_value} to {previous_color}/{new_color}", nwkid) ReadAttributeReq_Scheduled_ZLinky(self, nwkid)