From 1da93224380350d8c91c5f831cd41b8ddcfdbb6d Mon Sep 17 00:00:00 2001 From: uzlonewolf Date: Mon, 5 Aug 2024 02:31:45 -0700 Subject: [PATCH] Cache tweaks and pylint cleanup --- tinytuya/BulbDevice.py | 10 +++---- tinytuya/core.py | 60 +++++++++++++++++++++++++----------------- 2 files changed, 41 insertions(+), 29 deletions(-) diff --git a/tinytuya/BulbDevice.py b/tinytuya/BulbDevice.py index e8557d4..6703f96 100644 --- a/tinytuya/BulbDevice.py +++ b/tinytuya/BulbDevice.py @@ -200,7 +200,7 @@ def _hexvalue_to_hsv(hexvalue, bulb="A"): else: # Unsupported bulb type raise ValueError(f"Unsupported bulb type {bulb} - unable to determine HSV values.") - + return (h, s, v) def turn_on(self, switch=0, nowait=False): @@ -276,7 +276,7 @@ def set_scene(self, scene, nowait=False): return self.set_value( self.DPS_INDEX_MODE[self.bulb_type], s, nowait=nowait ) - def set_colour(self, r, g, b, nowait=False, force=False): + def set_colour(self, r, g, b, nowait=False): """ Set colour of an rgb bulb. @@ -319,7 +319,7 @@ def set_colour(self, r, g, b, nowait=False, force=False): # check to see if power and mode also need to be set state = self.cached_status(nowait=True) - if (not force) and state and self.DPS in state and state[self.DPS]: + if state and self.DPS in state and state[self.DPS]: # last state is cached, so check to see if 'mode' needs to be set if (dp_index_mode not in state[self.DPS]) or (state[self.DPS][dp_index_mode] != self.DPS_MODE_COLOUR): payload[dp_index_mode] = self.DPS_MODE_COLOUR @@ -404,7 +404,7 @@ def set_white_percentage(self, brightness=100, colourtemp=0, nowait=False): data = self.set_white(b, c, nowait=nowait) return data - def set_white(self, brightness=-1, colourtemp=-1, nowait=False, force=False): + def set_white(self, brightness=-1, colourtemp=-1, nowait=False): """ Set white coloured theme of an rgb bulb. @@ -456,7 +456,7 @@ def set_white(self, brightness=-1, colourtemp=-1, nowait=False, force=False): # check to see if power and mode also need to be set state = self.cached_status(nowait=True) - if (not force) and state and self.DPS in state and state[self.DPS]: + if state and self.DPS in state and state[self.DPS]: # last state is cached, so check to see if 'mode' needs to be set if (dp_index_mode not in state[self.DPS]) or (state[self.DPS][dp_index_mode] != self.DPS_MODE_WHITE): payload[dp_index_mode] = self.DPS_MODE_WHITE diff --git a/tinytuya/core.py b/tinytuya/core.py index bda7f1e..70f11f9 100644 --- a/tinytuya/core.py +++ b/tinytuya/core.py @@ -13,7 +13,7 @@ * XenonDevice(...) - Base Tuya Objects and Functions XenonDevice(dev_id, address=None, local_key="", dev_type="default", connection_timeout=5, version="3.1", persist=False, cid/node_id=None, parent=None, connection_retry_limit=5, - connection_retry_delay=5) + connection_retry_delay=5, max_simultaneous_dps=0) * Device(XenonDevice) - Tuya Class for Devices Module Functions @@ -76,7 +76,6 @@ import json import logging import socket -import select import struct import sys import time @@ -364,7 +363,7 @@ def decrypt(self, enc, use_base64=True, decode_text=True, verify_padding=False, return raw.decode("utf-8") if decode_text else raw class _AESCipher_pyaes(_AESCipher_Base): - def encrypt(self, raw, use_base64=True, pad=True, iv=False, header=None): # pylint: disable=W0621 + def encrypt(self, raw, use_base64=True, pad=True, iv=False, header=None): # pylint: disable=W0613,W0621 if iv: # GCM required for 3.5 devices raise NotImplementedError( 'pyaes does not support GCM, please install PyCryptodome' ) @@ -378,7 +377,7 @@ def encrypt(self, raw, use_base64=True, pad=True, iv=False, header=None): # pyli crypted_text += cipher.feed() # flush final block return base64.b64encode(crypted_text) if use_base64 else crypted_text - def decrypt(self, enc, use_base64=True, decode_text=True, verify_padding=False, iv=False, header=None, tag=None): + def decrypt(self, enc, use_base64=True, decode_text=True, verify_padding=False, iv=False, header=None, tag=None): # pylint: disable=W0613 if iv: # GCM required for 3.5 devices raise NotImplementedError( 'pyaes does not support GCM, please install PyCryptodome' ) @@ -446,7 +445,7 @@ def set_debug(toggle=True, color=True): log.setLevel(logging.DEBUG) log.debug("TinyTuya [%s]\n", __version__) log.debug("Python %s on %s", sys.version, sys.platform) - if AESCipher.CRYPTOLIB_HAS_GCM == False: + if not AESCipher.CRYPTOLIB_HAS_GCM: log.debug("Using %s %s for crypto", AESCipher.CRYPTOLIB, AESCipher.CRYPTOLIB_VER) log.debug("Warning: Crypto library does not support AES-GCM, v3.5 devices will not work!") else: @@ -696,7 +695,7 @@ def assign_dp_mappings( tuyadevices, mappings ): raise ValueError( '\'mappings\' must be a dict' ) if (not mappings) or (not tuyadevices): - return None + return for dev in tuyadevices: try: @@ -820,7 +819,11 @@ def assign_dp_mappings( tuyadevices, mappings ): class XenonDevice(object): def __init__( - self, dev_id, address=None, local_key="", dev_type="default", connection_timeout=5, version=3.1, persist=False, cid=None, node_id=None, parent=None, connection_retry_limit=5, connection_retry_delay=5, port=TCPPORT, max_simultaneous_dps=0 # pylint: disable=W0621 + self, dev_id, address=None, local_key="", dev_type="default", connection_timeout=5, + version=3.1, # pylint: disable=W0621 + persist=False, cid=None, node_id=None, parent=None, + connection_retry_limit=5, connection_retry_delay=5, port=TCPPORT, + max_simultaneous_dps=0 ): """ Represents a Tuya device. @@ -864,7 +867,12 @@ def __init__( self.local_nonce = b'0123456789abcdef' # not-so-random random key self.remote_nonce = b'' self.payload_dict = None - self.max_simultaneous_dps = max_simultaneous_dps + self._last_status = {} + self.max_simultaneous_dps = max_simultaneous_dps if max_simultaneous_dps else 0 + self.version = 0.0 + self.version_str = 'v0.0' + self.version_bytes = b'0.0' + self.version_header = b'' if not local_key: local_key = "" @@ -895,7 +903,7 @@ def __init__( bcast_data = find_device(dev_id) if bcast_data['ip'] is None: log.debug("Unable to find device on network (specify IP address)") - raise Exception("Unable to find device on network (specify IP address)") + raise RuntimeError("Unable to find device on network (specify IP address)") self.address = bcast_data['ip'] self.set_version(float(bcast_data['version'])) time.sleep(0.1) @@ -907,14 +915,7 @@ def __init__( XenonDevice.set_version(self, 3.1) def __del__(self): - # In case we have a lingering socket connection, close it - try: - if self.socket: - # self.socket.shutdown(socket.SHUT_RDWR) - self.socket.close() - self.socket = None - except: - pass + self.close() def __repr__(self): # FIXME can do better than this @@ -1388,16 +1389,16 @@ def _decode_payload(self, payload): return json_payload - def _cache_response(self, response): # pylint: disable=R0201 + def _cache_response(self, response): """ Save (cache) the last value of every DP """ if (not self.socketPersistent) or (not self.socket): return - _merge_results(self._last_status, response) + merge_dps_results(self._last_status, response) - def _process_response(self, response): # pylint: disable=R0201 + def _process_response(self, response): """ Override this function in a sub-class if you want to do some processing on the received data """ @@ -1593,6 +1594,9 @@ def cached_status(self, nowait=False): log.debug("Have status cache, returning it") return self._last_status + def cache_clear(self): + self._last_status = {} + def subdev_query( self, nowait=False ): """Query for a list of sub-devices and their status""" # final payload should look like: {"data":{"cids":[]},"reqType":"subdev_online_stat_query"} @@ -1684,7 +1688,15 @@ def set_sendWait(self, s): self.sendWait = s def close(self): - self.__del__() + # In case we have a lingering socket connection, close it + try: + if self.socket: + # self.socket.shutdown(socket.SHUT_RDWR) + self.socket.close() + except: + pass + + self.socket = None @staticmethod def find(did): @@ -1952,7 +1964,7 @@ def set_multiple_values(self, data, nowait=False): ret = {} for k in data: result = self.set_value(k, data[k], nowait=nowait) - _merge_results(ret, result) + merge_dps_results(ret, result) time.sleep(1) return ret @@ -1976,7 +1988,7 @@ def set_multiple_values(self, data, nowait=False): result = res for k in out: res = self.set_value(k, out[k], nowait=nowait) - _merge_results(result, res) + merge_dps_results(result, res) return result def turn_on(self, switch=1, nowait=False): @@ -2122,7 +2134,7 @@ def deviceScan(verbose=False, maxretry=None, color=True, poll=True, forcescan=Fa return scanner.devices(verbose=verbose, scantime=maxretry, color=color, poll=poll, forcescan=forcescan, byID=byID) # Merge multiple results into a single dict -def _merge_results(dest, src): +def merge_dps_results(dest, src): if src and isinstance(src, dict) and 'Error' not in src and 'Err' not in src: for k in src: if k == 'dps' and src[k] and isinstance(src[k], dict):