From dbc9d52a787fa2877bb34a9609eaaf5c542fa78d Mon Sep 17 00:00:00 2001 From: RenierM26 <66512715+RenierM26@users.noreply.github.com> Date: Tue, 21 Jun 2022 19:24:43 +0200 Subject: [PATCH] Change library from bluepy to Bleak (with async) (#32) * Update __init__.py * Basic framework for bleak conversion. * Update __init__.py * Fix methods that return static data. * Fixed last few functions. * Ensure CaSe for discovery data, connect_lock for sensitive functions. -No passive scan mode available so I removed references. * Updated with latest changes on pyswitchbot * Update messages to latest released version. * Rework get device by type functions. * Add missing model key. * Update __init__.py * Update __init__.py * Update __init__.py * Update __init__.py * Update __init__.py * Restructure for non blocking async. * Attempt to improve reliability on weak signal device. * Update __init__.py * Update __init__.py * Update __init__.py * Update __init__.py * Update __init__.py * Update __init__.py * A fix for a few dict keys being overwritten. * Replace sleep with asyncio sleep. * oops, forgot to await asyncio.sleep. * Should probably keep the connect_lock on this function. * Remove client.read_gatt_char, as notification is already in memory after writing. --- setup.py | 4 +- switchbot/__init__.py | 446 ++++++++++++++++-------------------------- 2 files changed, 171 insertions(+), 279 deletions(-) diff --git a/setup.py b/setup.py index 874b8b8..56bd1e1 100644 --- a/setup.py +++ b/setup.py @@ -3,8 +3,8 @@ setup( name = 'PySwitchbot', packages = ['switchbot'], - install_requires=['bluepy'], - version = '0.13.3', + install_requires=['bleak'], + version = '0.14.0', description = 'A library to communicate with Switchbot', author='Daniel Hjelseth Hoyer', url='https://github.com/Danielhiversen/pySwitchbot/', diff --git a/switchbot/__init__.py b/switchbot/__init__.py index ac7ce23..e7b0077 100644 --- a/switchbot/__init__.py +++ b/switchbot/__init__.py @@ -1,13 +1,13 @@ """Library to handle connection with Switchbot.""" from __future__ import annotations +import asyncio import binascii import logging -from threading import Lock -import time from typing import Any +from uuid import UUID -import bluepy +import bleak DEFAULT_RETRY_COUNT = 3 DEFAULT_RETRY_TIMEOUT = 1 @@ -38,16 +38,16 @@ KEY_PASSWORD_PREFIX = "571" _LOGGER = logging.getLogger(__name__) -CONNECT_LOCK = Lock() +CONNECT_LOCK = asyncio.Lock() -def _sb_uuid(comms_type: str = "service") -> bluepy.btle.UUID: +def _sb_uuid(comms_type: str = "service") -> UUID | str: """Return Switchbot UUID.""" _uuid = {"tx": "002", "rx": "003", "service": "d00"} if comms_type in _uuid: - return bluepy.btle.UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b") + return UUID(f"cba20{_uuid[comms_type]}-224d-11e6-9fb8-0002a5d5c51b") return "Incorrect type, choose between: tx, rx or service" @@ -100,55 +100,68 @@ def _process_wosensorth(data: bytes) -> dict[str, object]: return _wosensorth_data -def _process_btle_adv_data(dev: bluepy.btle.ScanEntry) -> dict[str, Any]: - """Process bt le adv data.""" - _adv_data = {"mac_address": dev.addr} - _data = dev.getValue(22)[2:] - - supported_types: dict[str, dict[str, Any]] = { - "H": {"modelName": "WoHand", "func": _process_wohand}, - "c": {"modelName": "WoCurtain", "func": _process_wocurtain}, - "T": {"modelName": "WoSensorTH", "func": _process_wosensorth}, - } - - _model = chr(_data[0] & 0b01111111) - _adv_data["isEncrypted"] = bool(_data[0] & 0b10000000) - _adv_data["model"] = _model - if _model in supported_types: - _adv_data["data"] = supported_types[_model]["func"](_data) - _adv_data["data"]["rssi"] = dev.rssi - _adv_data["modelName"] = supported_types[_model]["modelName"] - else: - _adv_data["rawAdvData"] = dev.getValueText(22) - - return _adv_data - - class GetSwitchbotDevices: """Scan for all Switchbot devices and return by type.""" def __init__(self, interface: int = 0) -> None: """Get switchbot devices class constructor.""" - self._interface = interface + self._interface = f"hci{interface}" self._adv_data: dict[str, Any] = {} - def discover( + def detection_callback( self, - retry: int = DEFAULT_RETRY_COUNT, - scan_timeout: int = DEFAULT_SCAN_TIMEOUT, - passive: bool = False, - mac: str | None = None, - ) -> dict[str, Any]: + device: bleak.backends.device.BLEDevice, + advertisement_data: bleak.backends.scanner.AdvertisementData, + ) -> None: + """BTLE adv scan callback.""" + _device = device.address.replace(":", "").lower() + _service_data = list(advertisement_data.service_data.values())[0] + _model = chr(_service_data[0] & 0b01111111) + + supported_types: dict[str, dict[str, Any]] = { + "H": {"modelName": "WoHand", "func": _process_wohand}, + "c": {"modelName": "WoCurtain", "func": _process_wocurtain}, + "T": {"modelName": "WoSensorTH", "func": _process_wosensorth}, + } + + self._adv_data[_device] = { + "mac_address": device.address.lower(), + "rawAdvData": list(advertisement_data.service_data.values())[0], + "data": { + "rssi": device.rssi, + }, + } + + if _model in supported_types: + + self._adv_data[_device].update( + { + "isEncrypted": bool(_service_data[0] & 0b10000000), + "model": _model, + "modelName": supported_types[_model]["modelName"], + "data": supported_types[_model]["func"](_service_data), + } + ) + + self._adv_data[_device]["data"]["rssi"] = device.rssi + + async def discover( + self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT + ) -> dict: """Find switchbot devices and their advertisement data.""" + devices = None - with CONNECT_LOCK: - try: - devices = bluepy.btle.Scanner(self._interface).scan( - scan_timeout, passive - ) - except bluepy.btle.BTLEManagementError: - _LOGGER.error("Error scanning for switchbot devices", exc_info=True) + devices = bleak.BleakScanner( + filters={"UUIDs": [str(_sb_uuid())]}, + adapter=self._interface, + ) + devices.register_detection_callback(self.detection_callback) + + async with CONNECT_LOCK: + await devices.start() + await asyncio.sleep(scan_timeout) + await devices.stop() if devices is None: if retry < 1: @@ -161,29 +174,15 @@ def discover( "Error scanning for Switchbot devices. Retrying (remaining: %d)", retry, ) - time.sleep(DEFAULT_RETRY_TIMEOUT) - return self.discover( - retry=retry - 1, - scan_timeout=scan_timeout, - passive=passive, - mac=mac, - ) - - for dev in devices: - if dev.getValueText(7) == str(_sb_uuid()): - dev_id = dev.addr.replace(":", "") - if mac: - if dev.addr.lower() == mac.lower(): - self._adv_data[dev_id] = _process_btle_adv_data(dev) - else: - self._adv_data[dev_id] = _process_btle_adv_data(dev) + await asyncio.sleep(DEFAULT_RETRY_TIMEOUT) + return await self.discover(retry - 1, scan_timeout) return self._adv_data - def get_curtains(self) -> dict: + async def get_curtains(self) -> dict: """Return all WoCurtain/Curtains devices with services data.""" if not self._adv_data: - self.discover() + await self.discover() _curtain_devices = { device: data @@ -193,10 +192,10 @@ def get_curtains(self) -> dict: return _curtain_devices - def get_bots(self) -> dict: + async def get_bots(self) -> dict[str, Any] | None: """Return all WoHand/Bot devices with services data.""" if not self._adv_data: - self.discover() + await self.discover() _bot_devices = { device: data @@ -206,10 +205,10 @@ def get_bots(self) -> dict: return _bot_devices - def get_tempsensors(self) -> dict: + async def get_tempsensors(self) -> dict[str, Any] | None: """Return all WoSensorTH/Temp sensor devices with services data.""" if not self._adv_data: - self.discover() + await self.discover() _bot_temp = { device: data @@ -219,10 +218,10 @@ def get_tempsensors(self) -> dict: return _bot_temp - def get_device_data(self, mac: str) -> dict: + async def get_device_data(self, mac: str) -> dict[str, Any] | None: """Return data for specific device.""" if not self._adv_data: - self.discover() + await self.discover() _switchbot_data = { device: data @@ -233,7 +232,7 @@ def get_device_data(self, mac: str) -> dict: return _switchbot_data -class SwitchbotDevice(bluepy.btle.Peripheral): +class SwitchbotDevice: """Base Representation of a Switchbot Device.""" def __init__( @@ -244,13 +243,7 @@ def __init__( **kwargs: Any, ) -> None: """Switchbot base class constructor.""" - bluepy.btle.Peripheral.__init__( - self, - deviceAddr=None, - addrType=bluepy.btle.ADDR_TYPE_RANDOM, - iface=interface, - ) - self._interface = interface + self._interface = f"hci{interface}" self._mac = mac.replace("-", ":").lower() self._sb_adv_data: dict[str, Any] = {} self._scan_timeout: int = kwargs.pop("scan_timeout", DEFAULT_SCAN_TIMEOUT) @@ -261,179 +254,80 @@ def __init__( self._password_encoded = "%x" % ( binascii.crc32(password.encode("ascii")) & 0xFFFFFFFF ) + self._last_notification = bytearray() - # pylint: disable=arguments-differ - def _connect(self, retry: int, timeout: int | None = None) -> None: - _LOGGER.debug("Connecting to Switchbot") - - if retry < 1: # failsafe - self._stopHelper() - raise bluepy.btle.BTLEDisconnectError( - "Failed to connect to peripheral %s" % self._mac - ) - - if self._helper is None: - self._startHelper(self._interface) - - self._writeCmd("conn %s %s\n" % (self._mac, bluepy.btle.ADDR_TYPE_RANDOM)) - - rsp = self._getResp(["stat", "err"], timeout) - - while rsp and rsp["state"][0] in [ - "tryconn", - "scan", - ]: # Wait for any operations to finish. - rsp = self._getResp(["stat", "err"], timeout) - - # If operation in progress, disc is returned. - # Bluepy helper can't handle state. Execute stop, wait and retry. - if rsp and rsp["state"][0] == "disc": - _LOGGER.warning("Bluepy busy, waiting before retry") - self._stopHelper() - time.sleep(self._scan_timeout) - return self._connect(retry - 1, timeout) - - if rsp and rsp["rsp"][0] == "err": - errcode = rsp["code"][0] - _LOGGER.debug( - "Error trying to connect to peripheral %s, error: %s", - self._mac, - errcode, - ) - - if errcode == "connfail": # not terminal, can retry connection. - return self._connect(retry - 1, timeout) - - if errcode == "nomgmt": - raise bluepy.btle.BTLEManagementError( - "Management not available (permissions problem?)", rsp - ) - if errcode == "atterr": - raise bluepy.btle.BTLEGattError("Bluetooth command failed", rsp) - - raise bluepy.btle.BTLEException( - "Error from bluepy-helper (%s)" % errcode, rsp - ) - - if rsp is None or rsp["state"][0] != "conn": - self._stopHelper() - - if rsp is None: - _LOGGER.warning( - "Timed out while trying to connect to peripheral %s", self._mac - ) - - _LOGGER.warning("Bluehelper returned unable to connect state: %s", rsp) - - raise bluepy.btle.BTLEDisconnectError( - "Failed to connect to peripheral %s, rsp: %s" % (self._mac, rsp) - ) + async def _notification_handler(self, sender: int, data: bytearray) -> None: + """Handle notification responses.""" + self._last_notification = data def _commandkey(self, key: str) -> str: + """Add password to key if set.""" if self._password_encoded is None: return key key_action = key[3] key_suffix = key[4:] return KEY_PASSWORD_PREFIX + key_action + self._password_encoded + key_suffix - def _writekey(self, key: str) -> bool: - _LOGGER.debug("Prepare to send") - if self._helper is None or self.getState() == "disc": - return False - try: - hand = self.getCharacteristics(uuid=_sb_uuid("tx"))[0] - _LOGGER.debug("Sending command, %s", key) - hand.write(bytes.fromhex(key), withResponse=False) - except bluepy.btle.BTLEException: - _LOGGER.warning("Error sending command to Switchbot", exc_info=True) - raise - else: - _LOGGER.info("Successfully sent command to Switchbot (MAC: %s)", self._mac) - - return True - - def _subscribe(self) -> bool: - _LOGGER.debug("Subscribe to notifications") - if self._helper is None or self.getState() == "disc": - return False - enable_notify_flag = b"\x01\x00" # standard gatt flag to enable notification - try: - handle = self.getCharacteristics(uuid=_sb_uuid("rx"))[0] - notify_handle = handle.getHandle() + 1 - self.writeCharacteristic( - notify_handle, enable_notify_flag, withResponse=False - ) - except bluepy.btle.BTLEException: - _LOGGER.warning( - "Error while enabling notifications on Switchbot", exc_info=True - ) - raise + async def _sendcommand(self, key: str, retry: int) -> bytes: + """Send command to device and read response.""" + command = bytearray.fromhex(self._commandkey(key)) + notify_msg = b"" + _LOGGER.debug("Sending command to switchbot %s", command) - return True + if len(self._mac.split(":")) != 6: + raise ValueError("Expected MAC address, got %s" % repr(self._mac)) - def _readkey(self) -> bytes: - _LOGGER.debug("Prepare to read notification from switchbot") - if self._helper is None: - return b"\x00" - try: - receive_handle = self.getCharacteristics(uuid=_sb_uuid("rx")) - except bluepy.btle.BTLEException: - _LOGGER.warning( - "Error while reading notifications from Switchbot", exc_info=True - ) - else: - for char in receive_handle: - read_result: bytes = char.read() - return read_result + async with CONNECT_LOCK: + try: + async with bleak.BleakClient( + address_or_ble_device=self._mac, timeout=float(self._scan_timeout) + ) as client: + _LOGGER.debug("Connnected to switchbot: %s", client.is_connected) + + _LOGGER.debug("Subscribe to notifications") + await client.start_notify( + _sb_uuid(comms_type="rx"), self._notification_handler + ) - # Could disconnect before reading response. Assume it worked as this is executed after issueing command. - if self._helper and self.getState() == "disc": - return b"\x01" + _LOGGER.debug("Sending command, %s", key) + await client.write_gatt_char( + _sb_uuid(comms_type="tx"), command, False + ) - return b"\x00" + await asyncio.sleep( + 1.0 + ) # Bot needs pause. Otherwise notification could be missed. - def _sendcommand(self, key: str, retry: int, timeout: int | None = 40) -> bytes: - command = self._commandkey(key) - send_success = False - notify_msg = None - _LOGGER.debug("Sending command to switchbot %s", command) + notify_msg = self._last_notification + _LOGGER.info("Notification received: %s", notify_msg) - if len(self._mac.split(":")) != 6: - raise ValueError("Expected MAC address, got %s" % repr(self._mac)) + _LOGGER.debug("UnSubscribe to notifications") + await client.stop_notify(_sb_uuid(comms_type="rx")) - with CONNECT_LOCK: - try: - self._connect(retry, timeout) - send_success = self._subscribe() - except bluepy.btle.BTLEException: - _LOGGER.warning("Error connecting to Switchbot", exc_info=True) - else: - try: - send_success = self._writekey(command) - except bluepy.btle.BTLEException: - _LOGGER.warning( - "Error sending commands to Switchbot", exc_info=True + except (bleak.BleakError, asyncio.exceptions.TimeoutError): + + if retry < 1: + _LOGGER.error( + "Switchbot communication failed. Stopping trying", exc_info=True ) - else: - notify_msg = self._readkey() - finally: - self.disconnect() + return b"\x00" - if notify_msg and send_success: + _LOGGER.debug("Switchbot communication failed with:", exc_info=True) + + if notify_msg: if notify_msg == b"\x07": _LOGGER.error("Password required") elif notify_msg == b"\t": _LOGGER.error("Password incorrect") return notify_msg - if retry < 1: - _LOGGER.error( - "Switchbot communication failed. Stopping trying", exc_info=True - ) - return b"\x00" _LOGGER.warning("Cannot connect to Switchbot. Retrying (remaining: %d)", retry) - time.sleep(DEFAULT_RETRY_TIMEOUT) - return self._sendcommand(key, retry - 1) + + if retry < 1: # failsafe + return b"\x00" + + await asyncio.sleep(DEFAULT_RETRY_TIMEOUT) + return await self._sendcommand(key, retry - 1) def get_mac(self) -> str: """Return mac address of device.""" @@ -441,26 +335,23 @@ def get_mac(self) -> str: def get_battery_percent(self) -> Any: """Return device battery level in percent.""" - if not self._sb_adv_data.get("data"): + if not self._sb_adv_data: return None - return self._sb_adv_data["data"].get("battery") + return self._sb_adv_data["data"]["battery"] - def get_device_data( - self, - retry: int = DEFAULT_RETRY_COUNT, - interface: int | None = None, - passive: bool = False, - ) -> dict[str, Any]: + async def get_device_data( + self, retry: int = DEFAULT_RETRY_COUNT, interface: int | None = None + ) -> dict | None: """Find switchbot devices and their advertisement data.""" - _interface: int = interface if interface else self._interface + if interface: + _interface: int = interface + else: + _interface = int(self._interface.replace("hci", "")) dev_id = self._mac.replace(":", "") - _data = GetSwitchbotDevices(interface=_interface).discover( - retry=retry, - scan_timeout=self._scan_timeout, - passive=passive, - mac=self._mac, + _data = await GetSwitchbotDevices(interface=_interface).discover( + retry=retry, scan_timeout=self._scan_timeout ) if _data.get(dev_id): @@ -478,15 +369,13 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self._inverse: bool = kwargs.pop("inverse_mode", False) self._settings: dict[str, Any] = {} - def update(self, interface: int | None = None, passive: bool = False) -> None: + async def update(self, interface: int | None = None) -> None: """Update mode, battery percent and state of device.""" - self.get_device_data( - retry=self._retry_count, interface=interface, passive=passive - ) + await self.get_device_data(retry=self._retry_count, interface=interface) - def turn_on(self) -> bool: + async def turn_on(self) -> bool: """Turn device on.""" - result = self._sendcommand(ON_KEY, self._retry_count) + result = await self._sendcommand(ON_KEY, self._retry_count) if result[0] == 1: return True @@ -497,9 +386,9 @@ def turn_on(self) -> bool: return False - def turn_off(self) -> bool: + async def turn_off(self) -> bool: """Turn device off.""" - result = self._sendcommand(OFF_KEY, self._retry_count) + result = await self._sendcommand(OFF_KEY, self._retry_count) if result[0] == 1: return True @@ -509,9 +398,9 @@ def turn_off(self) -> bool: return False - def hand_up(self) -> bool: + async def hand_up(self) -> bool: """Raise device arm.""" - result = self._sendcommand(UP_KEY, self._retry_count) + result = await self._sendcommand(UP_KEY, self._retry_count) if result[0] == 1: return True @@ -521,9 +410,9 @@ def hand_up(self) -> bool: return False - def hand_down(self) -> bool: + async def hand_down(self) -> bool: """Lower device arm.""" - result = self._sendcommand(DOWN_KEY, self._retry_count) + result = await self._sendcommand(DOWN_KEY, self._retry_count) if result[0] == 1: return True @@ -533,9 +422,9 @@ def hand_down(self) -> bool: return False - def press(self) -> bool: + async def press(self) -> bool: """Press command to device.""" - result = self._sendcommand(PRESS_KEY, self._retry_count) + result = await self._sendcommand(PRESS_KEY, self._retry_count) if result[0] == 1: return True @@ -545,14 +434,14 @@ def press(self) -> bool: return False - def set_switch_mode( + async def set_switch_mode( self, switch_mode: bool = False, strength: int = 100, inverse: bool = False ) -> bool: """Change bot mode.""" mode_key = format(switch_mode, "b") + format(inverse, "b") strength_key = f"{strength:0{2}x}" # to hex with padding to double digit - result = self._sendcommand( + result = await self._sendcommand( DEVICE_SET_MODE_KEY + strength_key + mode_key, self._retry_count ) @@ -561,11 +450,11 @@ def set_switch_mode( return False - def set_long_press(self, duration: int = 0) -> bool: + async def set_long_press(self, duration: int = 0) -> bool: """Set bot long press duration.""" duration_key = f"{duration:0{2}x}" # to hex with padding to double digit - result = self._sendcommand( + result = await self._sendcommand( DEVICE_SET_EXTENDED_KEY + "08" + duration_key, self._retry_count ) @@ -574,9 +463,9 @@ def set_long_press(self, duration: int = 0) -> bool: return False - def get_basic_info(self) -> dict[str, Any] | None: + async def get_basic_info(self) -> dict[str, Any] | None: """Get device basic settings.""" - _data = self._sendcommand( + _data = await self._sendcommand( key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count ) @@ -601,7 +490,7 @@ def switch_mode(self) -> Any: # To get actual position call update() first. if not self._sb_adv_data.get("data"): return None - return self._sb_adv_data.get("switchMode") + return self._sb_adv_data["data"].get("switchMode") def is_on(self) -> Any: """Return switch state from cache.""" @@ -635,45 +524,43 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.ext_info_sum: dict[str, Any] = {} self.ext_info_adv: dict[str, Any] = {} - def open(self) -> bool: + async def open(self) -> bool: """Send open command.""" - result = self._sendcommand(OPEN_KEY, self._retry_count) + result = await self._sendcommand(OPEN_KEY, self._retry_count) if result[0] == 1: return True return False - def close(self) -> bool: + async def close(self) -> bool: """Send close command.""" - result = self._sendcommand(CLOSE_KEY, self._retry_count) + result = await self._sendcommand(CLOSE_KEY, self._retry_count) if result[0] == 1: return True return False - def stop(self) -> bool: + async def stop(self) -> bool: """Send stop command to device.""" - result = self._sendcommand(STOP_KEY, self._retry_count) + result = await self._sendcommand(STOP_KEY, self._retry_count) if result[0] == 1: return True return False - def set_position(self, position: int) -> bool: + async def set_position(self, position: int) -> bool: """Send position command (0-100) to device.""" position = (100 - position) if self._reverse else position hex_position = "%0.2X" % position - result = self._sendcommand(POSITION_KEY + hex_position, self._retry_count) + result = await self._sendcommand(POSITION_KEY + hex_position, self._retry_count) if result[0] == 1: return True return False - def update(self, interface: int | None = None, passive: bool = False) -> None: + async def update(self, interface: int | None = None) -> None: """Update position, battery percent and light level of device.""" - self.get_device_data( - retry=self._retry_count, interface=interface, passive=passive - ) + await self.get_device_data(retry=self._retry_count, interface=interface) def get_position(self) -> Any: """Return cached position (0-100) of Curtain.""" @@ -682,9 +569,9 @@ def get_position(self) -> Any: return None return self._sb_adv_data["data"].get("position") - def get_basic_info(self) -> dict[str, Any] | None: + async def get_basic_info(self) -> dict[str, Any] | None: """Get device basic settings.""" - _data = self._sendcommand( + _data = await self._sendcommand( key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count ) @@ -713,9 +600,11 @@ def get_basic_info(self) -> dict[str, Any] | None: return self._settings - def get_extended_info_summary(self) -> dict[str, Any] | None: + async def get_extended_info_summary(self) -> dict[str, Any] | None: """Get basic info for all devices in chain.""" - _data = self._sendcommand(key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count) + _data = await self._sendcommand( + key=CURTAIN_EXT_SUM_KEY, retry=self._retry_count + ) if _data in (b"\x07", b"\x00"): _LOGGER.error("Unsuccessfull, please try again") @@ -743,9 +632,12 @@ def get_extended_info_summary(self) -> dict[str, Any] | None: return self.ext_info_sum - def get_extended_info_adv(self) -> dict[str, Any] | None: + async def get_extended_info_adv(self) -> dict[str, Any] | None: """Get advance page info for device chain.""" - _data = self._sendcommand(key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count) + + _data = await self._sendcommand( + key=CURTAIN_EXT_ADV_KEY, retry=self._retry_count + ) if _data in (b"\x07", b"\x00"): _LOGGER.error("Unsuccessfull, please try again")