diff --git a/README.md b/README.md index 8a74c6c..a073674 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ TCP_IP = '192.168.1.57' USERNAME = 'test@test.com' PASS = 'MySecurePassword' KEY = 'abc123' -CONFIG = {} +CONFIG = {} # CONFIG = {"powermeters": [{"index": 6}]} print("local connection") @@ -96,7 +96,7 @@ from e3dc import E3DC USERNAME = 'test@test.com' PASS = 'MySecurePassword' SERIALNUMBER = 'S10-012345678910' -CONFIG = {} +CONFIG = {} print("web connection") e3dc_obj = E3DC(E3DC.CONNECT_WEB, username=USERNAME, password=PASS, serialNumber = SERIALNUMBER, isPasswordMd5=False, configuration = CONFIG) @@ -148,9 +148,19 @@ Poll returns a dictionary like the following: - `get_powermeter_data()` - `get_powermeters_data()` - `get_power_settings()` +- `get_wallbox_data()` +- `set_battery_to_car_mode()` - `set_power_limits()` - `set_powersave()` +- `set_wallbox_max_charge_current()` +- `set_wallbox_schuko()` +- `set_wallbox_sunmode()` - `set_weather_regulated_charge()` +- `toggle_wallbox_charging()` +- `toggle_wallbox_phases()` + +- `sendWallboxRequest()` +- `sendWallboxSetRequest()` See the full documentation on [ReadTheDocs](https://python-e3dc.readthedocs.io/en/latest/) diff --git a/e3dc/_e3dc.py b/e3dc/_e3dc.py index 5942a91..0764d24 100644 --- a/e3dc/_e3dc.py +++ b/e3dc/_e3dc.py @@ -7,6 +7,7 @@ import datetime import hashlib +import struct import time import uuid from calendar import monthrange @@ -981,6 +982,295 @@ def get_system_status(self, keepAlive: bool = False): outObj = {k: SystemStatusBools[v] for k, v in outObj.items()} return outObj + def get_wallbox_data(self, wbIndex: int = 0, keepAlive: bool = False): + """Polls the wallbox status via rscp protocol locally. + + Args: + wbIndex (Optional[int]): Index of the wallbox to poll data for + keepAlive (Optional[bool]): True to keep connection alive + + Returns: + dict: Dictionary containing the wallbox status structured as follows:: + + { + "appSoftware": , + "batteryToCar": , + "chargingActive": , + "chargingCanceled": , + "consumptionNet": , + "consumptionSun": , + "energyAll": , + "energyNet": , + "energySun": , + "index": , + "keyState": , + "maxChargeCurrent": , + "phases": , + "schukoOn": , + "soc": , + "sunModeOn": + } + """ + req = self.sendRequest( + ( + RscpTag.WB_REQ_DATA, + RscpType.Container, + [ + (RscpTag.WB_INDEX, RscpType.UChar8, wbIndex), + (RscpTag.WB_REQ_EXTERN_DATA_ALG, RscpType.NoneType, None), + (RscpTag.WB_REQ_EXTERN_DATA_SUN, RscpType.NoneType, None), + (RscpTag.WB_REQ_EXTERN_DATA_NET, RscpType.NoneType, None), + (RscpTag.WB_REQ_APP_SOFTWARE, RscpType.NoneType, None), + (RscpTag.WB_REQ_KEY_STATE, RscpType.NoneType, None), + ], + ), + keepAlive=True, + ) + + outObj = { + "index": rscpFindTagIndex(req, RscpTag.WB_INDEX), + "appSoftware": rscpFindTagIndex(req, RscpTag.WB_APP_SOFTWARE), + } + + extern_data_alg = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_ALG) + if extern_data_alg is not None: + extern_data = rscpFindTagIndex(extern_data_alg, RscpTag.WB_EXTERN_DATA) + status_byte = extern_data[2] + outObj["sunModeOn"] = (status_byte & 128) != 0 + outObj["chargingCanceled"] = (status_byte & 64) != 0 + outObj["chargingActive"] = (status_byte & 32) != 0 + outObj["plugLocked"] = (status_byte & 16) != 0 + outObj["plugged"] = (status_byte & 8) != 0 + outObj["soc"] = extern_data[0] + outObj["phases"] = extern_data[1] + outObj["maxChargeCurrent"] = extern_data[3] + outObj["schukoOn"] = extern_data[5] != 0 + + extern_data_sun = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_SUN) + if extern_data_sun is not None: + extern_data = rscpFindTagIndex(extern_data_sun, RscpTag.WB_EXTERN_DATA) + outObj["consumptionSun"] = struct.unpack("h", extern_data[0:2])[0] + outObj["energySun"] = struct.unpack("i", extern_data[2:6])[0] + + extern_data_net = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_NET) + if extern_data_net is not None: + extern_data = rscpFindTagIndex(extern_data_net, RscpTag.WB_EXTERN_DATA) + outObj["consumptionNet"] = struct.unpack("h", extern_data[0:2])[0] + outObj["energyNet"] = struct.unpack("i", extern_data[2:6])[0] + + if "energySun" in outObj and "energyNet" in outObj: + outObj["energyAll"] = outObj["energyNet"] + outObj["energySun"] + + key_state = rscpFindTag(req, RscpTag.WB_KEY_STATE) + if key_state is not None: + outObj["keyState"] = rscpFindTagIndex(key_state, RscpTag.WB_KEY_STATE) + + req = self.sendRequest( + (RscpTag.EMS_REQ_BATTERY_TO_CAR_MODE, RscpType.NoneType, None), + keepAlive=keepAlive, + ) + battery_to_car = rscpFindTag(req, RscpTag.EMS_BATTERY_TO_CAR_MODE) + if battery_to_car is not None: + outObj["batteryToCar"] = rscpFindTagIndex( + battery_to_car, RscpTag.EMS_BATTERY_TO_CAR_MODE + ) + + outObj = {k: v for k, v in sorted(outObj.items())} + return outObj + + def set_wallbox_sunmode( + self, enable: bool, wbIndex: int = 0, keepAlive: bool = False + ) -> bool: + """Sets the sun mode of the wallbox via rscp protocol locally. + + Args: + enable (bool): True to enable sun mode, otherwise false, + wbIndex (Optional[int]): index of the requested wallbox, + keepAlive (Optional[bool]): True to keep connection alive + + Returns: + True if success + False if error + """ + return self.sendWallboxSetRequest( + dataIndex=0, value=1 if enable else 2, wbIndex=wbIndex, keepAlive=keepAlive + ) + + def set_wallbox_schuko( + self, on: bool, wbIndex: int = 0, keepAlive: bool = False + ) -> bool: + """Sets the Schuko of the wallbox via rscp protocol locally. + + Args: + on (bool): True to activate the Schuko, otherwise false + wbIndex (Optional[int]): index of the requested wallbox, + keepAlive (Optional[bool]): True to keep connection alive + + Returns: + True if success (wallbox has understood the request, but might have ignored an unsupported value) + False if error + """ + return self.sendWallboxSetRequest( + dataIndex=5, value=1 if on else 0, wbIndex=wbIndex, keepAlive=keepAlive + ) + + def set_wallbox_max_charge_current( + self, max_charge_current: int, wbIndex: int = 0, keepAlive: bool = False + ) -> bool: + """Sets the maximum charge current of the wallbox via rscp protocol locally. + + Args: + max_charge_current (int): maximum allowed charge current in A + wbIndex (Optional[int]): index of the requested wallbox, + keepAlive (Optional[bool]): True to keep connection alive + + Returns: + True if success (wallbox has understood the request, but might have clipped the value) + False if error + """ + return self.sendWallboxSetRequest( + dataIndex=2, + value=max_charge_current, + request=RscpTag.WB_REQ_SET_PARAM_1, + wbIndex=wbIndex, + keepAlive=keepAlive, + ) + + def toggle_wallbox_charging( + self, wbIndex: int = 0, keepAlive: bool = False + ) -> bool: + """Toggles charging of the wallbox via rscp protocol locally. + + Args: + wbIndex (Optional[int]): index of the requested wallbox, + keepAlive (Optional[bool]): True to keep connection alive + + Returns: + True if success + False if error + """ + return self.sendWallboxSetRequest( + dataIndex=4, value=1, wbIndex=wbIndex, keepAlive=keepAlive + ) + + def toggle_wallbox_phases(self, wbIndex: int = 0, keepAlive: bool = False) -> bool: + """Toggles the number of phases used for charging by the wallbox between 1 and 3 via rscp protocol locally. + + Args: + wbIndex (Optional[int]): index of the requested wallbox, + keepAlive (Optional[bool]): True to keep connection alive + + Returns: + True if success + False if error + """ + return self.sendWallboxSetRequest( + dataIndex=3, value=1, wbIndex=wbIndex, keepAlive=keepAlive + ) + + def sendWallboxRequest( + self, + dataIndex: int, + value: int, + request: RscpTag = RscpTag.WB_REQ_SET_EXTERN, + wbIndex: int = 0, + keepAlive: bool = False, + ) -> Tuple[str | int | RscpTag, str | int | RscpType, Any]: + """Sends a low-level request with WB_EXTERN_DATA to the wallbox via rscp protocol locally. + + Args: + dataIndex (int): byte index in the WB_EXTERN_DATA array (values: 0-5) + value (int): byte value to be set in the WB_EXTERN_DATA array at the given index + request (Optional[RscpTag]): request identifier (WB_REQ_SET_EXTERN, WB_REQ_SET_PARAM_1 or WB_REQ_SET_PARAM_2), + wbIndex (Optional[int]): index of the requested wallbox, + keepAlive (Optional[bool]): True to keep connection alive + + Returns: + An object with the received data + """ + dataArray = bytearray([0, 0, 0, 0, 0, 0]) + dataArray[dataIndex] = value + result = self.sendRequest( + ( + RscpTag.WB_REQ_DATA, + RscpType.Container, + [ + (RscpTag.WB_INDEX, RscpType.UChar8, wbIndex), + ( + request, + RscpType.Container, + [ + (RscpTag.WB_EXTERN_DATA, RscpType.ByteArray, dataArray), + ( + RscpTag.WB_EXTERN_DATA_LEN, + RscpType.UChar8, + len(dataArray), + ), + ], + ), + ], + ), + keepAlive=keepAlive, + ) + return result + + def sendWallboxSetRequest( + self, + dataIndex: int, + value: int, + request: RscpTag = RscpTag.WB_REQ_SET_EXTERN, + wbIndex: int = 0, + keepAlive: bool = False, + ) -> bool: + """Sends a low-level set request with WB_EXTERN_DATA to the wallbox via rscp protocol locally and evaluates the response. + + Args: + dataIndex (int): byte index in the WB_EXTERN_DATA array (values: 0-5) + value (int): byte value to be set in the WB_EXTERN_DATA array at the given index + request (Optional[RscpTag]): request identifier (WB_REQ_SET_EXTERN, WB_REQ_SET_PARAM_1 or WB_REQ_SET_PARAM_2), + wbIndex (Optional[int]): index of the requested wallbox, + keepAlive (Optional[bool]): True to keep connection alive + + Returns: + True if success + False if error + """ + response = self.sendWallboxRequest( + dataIndex, value, request, wbIndex, keepAlive + ) + + if response[0] != RscpTag.WB_DATA.name: + return False + responseData = response[2][-1] + return ( + responseData[0][2:] == request.name[6:] + and responseData[1] != RscpType.Error.name + ) + + def set_battery_to_car_mode(self, enabled: bool, keepAlive: bool = False): + """Sets whether the wallbox may use the battery. + + Args: + enabled (bool): True to enable charging the car using the battery + keepAlive (Optional[bool]): True to keep connection alive + + Returns: + True if success + False if error + """ + enabledValue = 1 if enabled else 0 + + response = self.sendRequest( + (RscpTag.EMS_REQ_SET_BATTERY_TO_CAR_MODE, RscpType.UChar8, enabledValue), + keepAlive=keepAlive, + ) + + return response == ( + RscpTag.EMS_SET_BATTERY_TO_CAR_MODE.name, + RscpType.UChar8.name, + enabledValue, + ) + def get_batteries(self, keepAlive: bool = False): """Scans for installed batteries via rscp protocol.