diff --git a/.github/workflows/contrib.yml b/.github/workflows/contrib.yml index c1ff5449..afe95808 100644 --- a/.github/workflows/contrib.yml +++ b/.github/workflows/contrib.yml @@ -27,7 +27,7 @@ jobs: python -VV python -m site python -m pip install --upgrade pip setuptools wheel - python -m pip install --upgrade pycryptodome requests colorama + python -m pip install --upgrade cryptography requests colorama - name: "Run testcontrib.py on ${{ matrix.python-version }}" run: "python -m testcontrib.py" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 73b709d9..8f0ccd6c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -27,7 +27,7 @@ jobs: python -VV python -m site python -m pip install --upgrade pip setuptools wheel - python -m pip install --upgrade pycryptodome requests colorama + python -m pip install --upgrade cryptography requests colorama - name: "Run test.py on ${{ matrix.python-version }}" run: "python -m test.py" diff --git a/README.md b/README.md index cb1e3aca..656a8972 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ TinyTuya supports python versions 2.7 and 3.x (recommended). python -m pip install tinytuya ``` -Pip will attempt to install `pycryptodome`, `requests` and `colorama` if not already installed. +Pip will attempt to install `cryptography`, `requests` and `colorama` if not already installed. ## Tuya Device Preparation @@ -419,7 +419,7 @@ print( json.dumps(r, indent=2) ) ### Encryption Notes -Tuya devices use AES encryption which is not available in the Python standard library. **PyCryptodome** is recommended and installed by default. Other options include **PyCrypto** and **pyaes**. +Tuya devices use AES encryption which is not available in the Python standard library. **PyCA/cryptography** is recommended and installed by default. Other options include **PyCryptodome** , **PyCrypto** and **pyaes**. * Deprecation notice for pyaes: The pyaes library works for Tuya Protocol <= 3.4 but will not work for 3.5 devices. This is because pyaes does not support GCM which is required for v3.5 devices. diff --git a/RELEASE.md b/RELEASE.md index 21b69e4e..026c9f58 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,5 +1,12 @@ # RELEASE NOTES +## v1.13.0 - Crypto Library Update + +* PyPI 1.13.0 +* Updates AESCipher() to make it a bit easier to add additional crypto libraries. It also adds pyca/cryptography as the default. By @uzlonewolf in https://github.com/jasonacox/tinytuya/pull/423 +* Fixes issue with tinytuya.find_device() for v3.1 devices and the infinite loop in Contrib/IRRemoteControlDevice.py (Closes #403). +* Officially removes Python 2.7 support. + ## v1.12.11 - Bug Fix for _get_socket() * PyPI 1.12.11 diff --git a/requirements.txt b/requirements.txt index 97b7b7db..25f395c9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ # -pycryptodome # Encryption - AES can also be provided via PyCrypto or pyaes +cryptography # Encryption - AES can also be provided via PyCryptodome or pyaes or pyca/cryptography requests # Used for Setup Wizard - Tuya IoT Platform calls colorama # Makes ANSI escape character sequences work under MS Windows. diff --git a/server/server.py b/server/server.py index ca797d3e..3e1ab46a 100644 --- a/server/server.py +++ b/server/server.py @@ -67,7 +67,7 @@ import tinytuya -BUILD = "t8" +BUILD = "t9" # Defaults APIPORT = 8888 @@ -293,11 +293,7 @@ def tuyalisten(port): gwId = dname = dkey = mac = "" result = data try: - result = data[20:-8] - try: - result = tinytuya.decrypt_udp(result) - except: - result = result.decode() + result = tinytuya.decrypt_udp( data ) result = json.loads(result) #log.debug("Received valid UDP packet: %r", result) ip = result["ip"] diff --git a/setup.py b/setup.py index 2190221d..f8e69bde 100644 --- a/setup.py +++ b/setup.py @@ -1,10 +1,34 @@ import setuptools +from pkg_resources import DistributionNotFound, get_distribution from tinytuya import __version__ with open("README.md", "r") as fh: long_description = fh.read() +INSTALL_REQUIRES = [ + 'requests', # Used for Setup Wizard - Tuya IoT Platform calls + 'colorama', # Makes ANSI escape character sequences work under MS Windows. +] + +CHOOSE_CRYPTO_LIB = [ + 'cryptography', # pyca/cryptography - https://cryptography.io/en/latest/ + 'pycryptodome', # PyCryptodome - https://pycryptodome.readthedocs.io/en/latest/ + 'pyaes', # pyaes - https://github.com/ricmoo/pyaes + 'pycrypto', # PyCrypto - https://www.pycrypto.org/ +] + +pref_lib = CHOOSE_CRYPTO_LIB[0] +for cryptolib in CHOOSE_CRYPTO_LIB: + try: + get_distribution(cryptolib) + pref_lib = cryptolib + break + except DistributionNotFound: + pass + +INSTALL_REQUIRES.append( pref_lib ) + setuptools.setup( name="tinytuya", version=__version__, @@ -15,13 +39,8 @@ long_description_content_type="text/markdown", url='https://github.com/jasonacox/tinytuya', packages=setuptools.find_packages(exclude=("sandbox",)), - install_requires=[ - 'pycryptodome', # Encryption - AES can also be provided via PyCrypto or pyaes - 'requests', # Used for Setup Wizard - Tuya IoT Platform calls - 'colorama', # Makes ANSI escape character sequences work under MS Windows. - ], + install_requires=INSTALL_REQUIRES, classifiers=[ - "Programming Language :: Python :: 2", "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", diff --git a/tinytuya/Contrib/IRRemoteControlDevice.py b/tinytuya/Contrib/IRRemoteControlDevice.py index c6d0d59b..87816554 100644 --- a/tinytuya/Contrib/IRRemoteControlDevice.py +++ b/tinytuya/Contrib/IRRemoteControlDevice.py @@ -184,24 +184,18 @@ def detect_control_type( self ): self.study_end() self.control_type = 0 status = self.status() - while status: - if status and 'dps' in status: - # original devices using DPS 201/202 - if self.DP_SEND_IR in status['dps']: - log.debug( 'Detected control type 1' ) - self.control_type = 1 - break - # newer devices using DPS 1-13 - elif self.DP_MODE in status['dps']: - log.debug( 'Detected control type 2' ) - self.control_type = 2 - break + while status and 'dps' in status: + # original devices using DPS 201/202 + if self.DP_SEND_IR in status['dps']: + log.debug( 'Detected control type 1' ) + self.control_type = 1 + # newer devices using DPS 1-13 + elif self.DP_MODE in status['dps']: + log.debug( 'Detected control type 2' ) + self.control_type = 2 status = self._send_receive(None) if not self.control_type: log.warning( 'Detect control type failed! control_type= must be set manually' ) - elif status: - # try and make sure no data is waiting to be read - status = self._send_receive(None) self.set_socketTimeout( old_timeout ) self.set_socketPersistent( old_persist ) diff --git a/tinytuya/core.py b/tinytuya/core.py index 1a5c19a6..6637152f 100644 --- a/tinytuya/core.py +++ b/tinytuya/core.py @@ -88,18 +88,39 @@ except NameError: pass -# Required module: pycryptodome -try: - import Crypto - from Crypto.Cipher import AES # PyCrypto -except ImportError: - Crypto = AES = None - import pyaes # https://github.com/ricmoo/pyaes +for clib in ('pyca/cryptography', 'PyCryptodomex', 'PyCrypto', 'pyaes'): + Crypto = AES = CRYPTOLIB = None + try: + if clib == 'pyca/cryptography': # https://cryptography.io/en/latest/ + from cryptography.hazmat.primitives.ciphers import Cipher as Crypto + from cryptography.hazmat.primitives.ciphers import modes as Crypto_modes + from cryptography.hazmat.primitives.ciphers.algorithms import AES + from cryptography import __version__ as Crypto_version + elif clib == 'PyCryptodomex': # https://pycryptodome.readthedocs.io/en/latest/ + # PyCryptodome is installed as "Cryptodome" when installed by + # `apt install python3-pycryptodome` or `pip install pycryptodomex` + import Cryptodome as Crypto + from Cryptodome.Cipher import AES + elif clib == 'PyCrypto': # https://www.pycrypto.org/ + import Crypto + from Crypto.Cipher import AES + # v1/v2 is PyCrypto, v3 is PyCryptodome + clib = 'PyCrypto' if Crypto.version_info[0] < 3 else 'PyCryptodome' + elif clib == 'pyaes': + import pyaes # https://github.com/ricmoo/pyaes + else: + continue + CRYPTOLIB = clib + break + except ImportError: + continue +if CRYPTOLIB is None: + raise ModuleNotFoundError('No crypto library found, please "pip install" cryptography, pycryptodome, or pyaes') # Colorama terminal color capability for all platforms init() -version_tuple = (1, 12, 11) +version_tuple = (1, 13, 0) version = __version__ = "%d.%d.%d" % version_tuple __author__ = "jasonacox" @@ -226,97 +247,170 @@ class DecodeError(Exception): pass # Cryptography Helpers -class AESCipher(object): +class _AESCipher_Base(object): def __init__(self, key): - self.bs = 16 self.key = key - def encrypt(self, raw, use_base64=True, pad=True, iv=False, header=None): # pylint: disable=W0621 - if Crypto: - if iv: # initialization vector or nonce (number used once) - if iv is True: - if log.isEnabledFor( logging.DEBUG ): - iv = b'0123456789ab' - else: - iv = str(time.time() * 10)[:12].encode('utf8') - cipher = AES.new(self.key, mode=AES.MODE_GCM, nonce=iv) - if header: - cipher.update(header) - crypted_text, tag = cipher.encrypt_and_digest(raw) - crypted_text = cipher.nonce + crypted_text + tag + @classmethod + def get_encryption_iv( cls, iv ): + if not cls.CRYPTOLIB_HAS_GCM: + raise NotImplementedError( 'Crypto library does not support GCM' ) + if iv is True: + if log.isEnabledFor( logging.DEBUG ): + iv = b'0123456789ab' else: - if pad: raw = self._pad(raw) - cipher = AES.new(self.key, mode=AES.MODE_ECB) - crypted_text = cipher.encrypt(raw) - else: - if iv: - # GCM required for 3.5 devices - raise NotImplementedError( 'pyaes does not support GCM, please install PyCryptodome' ) - - _ = self._pad(raw) - # pylint: disable-next=used-before-assignment - cipher = pyaes.blockfeeder.Encrypter( - pyaes.AESModeOfOperationECB(self.key), - pyaes.PADDING_DEFAULT if pad else pyaes.PADDING_NONE - ) # no IV, auto pads to 16 - crypted_text = cipher.feed(raw) - crypted_text += cipher.feed() # flush final block + iv = str(time.time() * 10)[:12].encode('utf8') + return iv + + @classmethod + def get_decryption_iv( cls, iv, data ): + if not cls.CRYPTOLIB_HAS_GCM: + raise NotImplementedError( 'Crypto library does not support GCM' ) + if iv is True: + iv = data[:12] + data = data[12:] + return iv, data - if use_base64: - return base64.b64encode(crypted_text) + @staticmethod + def _pad(s, bs): + padnum = bs - len(s) % bs + return s + padnum * chr(padnum).encode() + + @staticmethod + def _unpad(s, verify_padding=False): + padlen = ord(s[-1:]) + if padlen < 1 or padlen > 16: + raise ValueError("invalid padding length byte") + if verify_padding and s[-padlen:] != (padlen * chr(padlen).encode()): + raise ValueError("invalid padding data") + return s[:-padlen] + +class _AESCipher_pyca(_AESCipher_Base): + def encrypt(self, raw, use_base64=True, pad=True, iv=False, header=None): # pylint: disable=W0621 + if iv: # initialization vector or nonce (number used once) + iv = self.get_encryption_iv( iv ) + encryptor = Crypto( AES(self.key), Crypto_modes.GCM(iv) ).encryptor() + if header: + encryptor.authenticate_additional_data(header) + crypted_text = encryptor.update(raw) + encryptor.finalize() + crypted_text = iv + crypted_text + encryptor.tag else: - return crypted_text + if pad: raw = self._pad(raw, 16) + encryptor = Crypto( AES(self.key), Crypto_modes.ECB() ).encryptor() + crypted_text = encryptor.update(raw) + encryptor.finalize() + + return base64.b64encode(crypted_text) if use_base64 else crypted_text def decrypt(self, enc, use_base64=True, decode_text=True, verify_padding=False, iv=False, header=None, tag=None): if not iv: if use_base64: enc = base64.b64decode(enc) - if len(enc) % 16 != 0: raise ValueError("invalid length") + if iv: + iv, enc = self.get_decryption_iv( iv, enc ) + decryptor = Crypto( AES(self.key), Crypto_modes.GCM(iv, tag) ).decryptor() + if header: + decryptor.authenticate_additional_data( header ) + #if tag is None: + # raw = decryptor.update( enc ) + #else: + raw = decryptor.update( enc ) + decryptor.finalize() + else: + decryptor = Crypto( AES(self.key), Crypto_modes.ECB() ).decryptor() + raw = decryptor.update( enc ) + decryptor.finalize() + raw = self._unpad(raw, verify_padding) + return raw.decode("utf-8") if decode_text else raw - if Crypto: - if iv: - if iv is True: - iv = enc[:12] - enc = enc[12:] - cipher = AES.new(self.key, AES.MODE_GCM, nonce=iv) - if header: - cipher.update(header) - if tag: - raw = cipher.decrypt_and_verify(enc, tag) - else: - raw = cipher.decrypt(enc) +class _AESCipher_PyCrypto(_AESCipher_Base): + def encrypt(self, raw, use_base64=True, pad=True, iv=False, header=None): # pylint: disable=W0621 + if iv: # initialization vector or nonce (number used once) + iv = self.get_encryption_iv( iv ) + cipher = AES.new(self.key, mode=AES.MODE_GCM, nonce=iv) + if header: + cipher.update(header) + crypted_text, tag = cipher.encrypt_and_digest(raw) + crypted_text = cipher.nonce + crypted_text + tag + else: + if pad: raw = self._pad(raw, 16) + cipher = AES.new(self.key, mode=AES.MODE_ECB) + crypted_text = cipher.encrypt(raw) + + return base64.b64encode(crypted_text) if use_base64 else crypted_text + + def decrypt(self, enc, use_base64=True, decode_text=True, verify_padding=False, iv=False, header=None, tag=None): + if not iv: + if use_base64: + enc = base64.b64decode(enc) + if len(enc) % 16 != 0: + raise ValueError("invalid length") + if iv: + iv, enc = self.get_decryption_iv( iv, enc ) + cipher = AES.new(self.key, AES.MODE_GCM, nonce=iv) + if header: + cipher.update(header) + if tag: + raw = cipher.decrypt_and_verify(enc, tag) else: - cipher = AES.new(self.key, AES.MODE_ECB) raw = cipher.decrypt(enc) - raw = self._unpad(raw, verify_padding) - return raw.decode("utf-8") if decode_text else raw else: - if iv: - # GCM required for 3.5 devices - raise NotImplementedError( 'pyaes does not support GCM, please install PyCryptodome' ) - cipher = pyaes.blockfeeder.Decrypter( - pyaes.AESModeOfOperationECB(self.key), - pyaes.PADDING_NONE if verify_padding else pyaes.PADDING_DEFAULT - ) # no IV, auto pads to 16 - raw = cipher.feed(enc) - raw += cipher.feed() # flush final block - if verify_padding: raw = self._unpad(raw, verify_padding) - return raw.decode("utf-8") if decode_text else raw - - def _pad(self, s): - padnum = self.bs - len(s) % self.bs - return s + padnum * chr(padnum).encode() + cipher = AES.new(self.key, AES.MODE_ECB) + raw = cipher.decrypt(enc) + raw = self._unpad(raw, verify_padding) + return raw.decode("utf-8") if decode_text else raw - @staticmethod - def _unpad(s, verify_padding=False): - padlen = ord(s[-1:]) - if padlen < 1 or padlen > 16: - raise ValueError("invalid padding length byte") - if verify_padding and s[-padlen:] != (padlen * chr(padlen).encode()): - raise ValueError("invalid padding data") - return s[:-padlen] +class _AESCipher_pyaes(_AESCipher_Base): + def encrypt(self, raw, use_base64=True, pad=True, iv=False, header=None): # pylint: disable=W0621 + if iv: + # GCM required for 3.5 devices + raise NotImplementedError( 'pyaes does not support GCM, please install PyCryptodome' ) + + # pylint: disable-next=used-before-assignment + cipher = pyaes.blockfeeder.Encrypter( + pyaes.AESModeOfOperationECB(self.key), + pyaes.PADDING_DEFAULT if pad else pyaes.PADDING_NONE + ) # no IV, auto pads to 16 + crypted_text = cipher.feed(raw) + crypted_text += cipher.feed() # flush final block + return base64.b64encode(crypted_text) if use_base64 else crypted_text + + def decrypt(self, enc, use_base64=True, decode_text=True, verify_padding=False, iv=False, header=None, tag=None): + if iv: + # GCM required for 3.5 devices + raise NotImplementedError( 'pyaes does not support GCM, please install PyCryptodome' ) + + if use_base64: + enc = base64.b64decode(enc) + + if len(enc) % 16 != 0: + raise ValueError("invalid length") + + cipher = pyaes.blockfeeder.Decrypter( + pyaes.AESModeOfOperationECB(self.key), + pyaes.PADDING_NONE if verify_padding else pyaes.PADDING_DEFAULT + ) # no IV, auto pads to 16 + + raw = cipher.feed(enc) + raw += cipher.feed() # flush final block + + if verify_padding: raw = self._unpad(raw, verify_padding) + return raw.decode("utf-8") if decode_text else raw + +if CRYPTOLIB[:8] == 'PyCrypto': # PyCrypto, PyCryptodome, and PyCryptodomex + class AESCipher(_AESCipher_PyCrypto): + CRYPTOLIB = CRYPTOLIB + CRYPTOLIB_VER = '.'.join( [str(x) for x in Crypto.version_info] ) + CRYPTOLIB_HAS_GCM = getattr( AES, 'MODE_GCM', False ) # only PyCryptodome supports GCM, PyCrypto does not +elif CRYPTOLIB == 'pyaes': + class AESCipher(_AESCipher_pyaes): + CRYPTOLIB = CRYPTOLIB + CRYPTOLIB_VER = '.'.join( [str(x) for x in pyaes.VERSION] ) + CRYPTOLIB_HAS_GCM = False +elif CRYPTOLIB == 'pyca/cryptography': + class AESCipher(_AESCipher_pyca): + CRYPTOLIB = CRYPTOLIB + CRYPTOLIB_VER = Crypto_version + CRYPTOLIB_HAS_GCM = getattr( Crypto_modes, 'GCM', False ) # Misc Helpers def bin2hex(x, pretty=False): @@ -348,11 +442,11 @@ def set_debug(toggle=True, color=True): log.setLevel(logging.DEBUG) log.debug("TinyTuya [%s]\n", __version__) log.debug("Python %s on %s", sys.version, sys.platform) - if Crypto is None: - # pylint: disable-next=used-before-assignment - log.debug("Using pyaes version %r", pyaes.VERSION) + if AESCipher.CRYPTOLIB_HAS_GCM == False: + log.debug("Using %s %s for crypto", AESCipher.CRYPTOLIB, AESCipher.CRYPTOLIB_VER) + log.debug("Warning: Crypto library does not support AES-GCM, v3.5 devices will not work!") else: - log.debug("Using PyCrypto %r", Crypto.version_info) + log.debug("Using %s %s for crypto, GCM is supported", AESCipher.CRYPTOLIB, AESCipher.CRYPTOLIB_VER) else: log.setLevel(logging.NOTSET) @@ -571,11 +665,7 @@ def find_device(dev_id=None, address=None): gwId = version = "" # pylint: disable=W0621 result = data try: - try: - result = decrypt_udp(result) - except: - result = result.decode() - + result = decrypt_udp(result) result = json.loads(result) ip = result["ip"] gwId = result["gwId"] @@ -1918,9 +2008,21 @@ def decrypt(msg, key): udpkey = md5(b"yGAdlopoPVldABfn").digest() def decrypt_udp(msg): - if msg[:4] == PREFIX_55AA_BIN: - return decrypt(msg[20:-8], udpkey) - if msg[:4] == PREFIX_6699_BIN: + try: + header = parse_header(msg) + except: + header = None + if not header: + return decrypt(msg, udpkey) + if header.prefix == PREFIX_55AA_VALUE: + payload = unpack_message(msg).payload + try: + if payload[:1] == b'{' and payload[-1:] == b'}': + return payload.decode() + except: + pass + return decrypt(payload, udpkey) + if header.prefix == PREFIX_6699_VALUE: unpacked = unpack_message(msg, hmac_key=udpkey, no_retcode=None) payload = unpacked.payload.decode() # app sometimes has extra bytes at the end diff --git a/tinytuya/scanner.py b/tinytuya/scanner.py index 1e750e1c..1102ab3b 100644 --- a/tinytuya/scanner.py +++ b/tinytuya/scanner.py @@ -962,7 +962,7 @@ def devices(verbose=False, scantime=None, color=True, poll=True, forcescan=False Parameters: verbose = True or False, print formatted output to stdout [Default: False] - scantime = The time to wait to pick up UDP from all devices + scantime = The time to wait to pick up UDP from all devices (ignored when discover=False)) color = True or False, print output in color [Default: True] poll = True or False, poll dps status for devices if possible forcescan = True, False, or a list of networks to force scan for device IP addresses @@ -1339,10 +1339,7 @@ def tuyaLookup(deviceid): ip = addr[0] result = b'' try: - if sock is client: - result = tinytuya.unpack_message( data ).payload.decode() - else: - result = tinytuya.decrypt_udp( data ) + result = tinytuya.decrypt_udp( data ) result = json.loads(result) log.debug("Received valid UDP packet: %r", result) except: diff --git a/tools/pcap_parse.py b/tools/pcap_parse.py index a0898bea..663185a4 100644 --- a/tools/pcap_parse.py +++ b/tools/pcap_parse.py @@ -279,7 +279,7 @@ def process_pcap( pcap_file, args ): continue if( isinstance(eth.ip.data, dpkt.udp.UDP) ): - if( eth.ip.udp.dport == 6667 and eth.ip.src not in ip_devs ): + if( (eth.ip.udp.dport == 6667 or eth.ip.udp.dport == 6666) and eth.ip.src not in ip_devs ): try: data = eth.ip.udp.data devmac = mac_to_str( eth.src )