Skip to content

Commit

Permalink
Wallbox interaction (#116)
Browse files Browse the repository at this point in the history
* add fetching wallbox data, toggle wallbox
* add batteryToCar
* update README.md
---------

Co-authored-by: Max Dhom <[email protected]>
  • Loading branch information
mstv and mdhom authored Apr 28, 2024
1 parent cd9abc0 commit e680921
Show file tree
Hide file tree
Showing 2 changed files with 302 additions and 2 deletions.
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ TCP_IP = '192.168.1.57'
USERNAME = '[email protected]'
PASS = 'MySecurePassword'
KEY = 'abc123'
CONFIG = {}
CONFIG = {}
# CONFIG = {"powermeters": [{"index": 6}]}

print("local connection")
Expand All @@ -96,7 +96,7 @@ from e3dc import E3DC
USERNAME = '[email protected]'
PASS = 'MySecurePassword'
SERIALNUMBER = 'S10-012345678910'
CONFIG = {}
CONFIG = {}

print("web connection")
e3dc_obj = E3DC(E3DC.CONNECT_WEB, username=USERNAME, password=PASS, serialNumber = SERIALNUMBER, isPasswordMd5=False, configuration = CONFIG)
Expand Down Expand Up @@ -148,9 +148,19 @@ Poll returns a dictionary like the following:
- `get_powermeter_data()`
- `get_powermeters_data()`
- `get_power_settings()`
- `get_wallbox_data()`
- `set_battery_to_car_mode()`
- `set_power_limits()`
- `set_powersave()`
- `set_wallbox_max_charge_current()`
- `set_wallbox_schuko()`
- `set_wallbox_sunmode()`
- `set_weather_regulated_charge()`
- `toggle_wallbox_charging()`
- `toggle_wallbox_phases()`

- `sendWallboxRequest()`
- `sendWallboxSetRequest()`

See the full documentation on [ReadTheDocs](https://python-e3dc.readthedocs.io/en/latest/)

Expand Down
290 changes: 290 additions & 0 deletions e3dc/_e3dc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import datetime
import hashlib
import struct
import time
import uuid
from calendar import monthrange
Expand Down Expand Up @@ -981,6 +982,295 @@ def get_system_status(self, keepAlive: bool = False):
outObj = {k: SystemStatusBools[v] for k, v in outObj.items()}
return outObj

def get_wallbox_data(self, wbIndex: int = 0, keepAlive: bool = False):
"""Polls the wallbox status via rscp protocol locally.
Args:
wbIndex (Optional[int]): Index of the wallbox to poll data for
keepAlive (Optional[bool]): True to keep connection alive
Returns:
dict: Dictionary containing the wallbox status structured as follows::
{
"appSoftware": <version of the app>,
"batteryToCar": <true if the wallbox may use the battery, otherwise false>,
"chargingActive": <true if charging is currently active, otherwise false>,
"chargingCanceled": <true if charging was manually canceled, otherwise false>,
"consumptionNet": <power currently consumed by the wallbox, provided by the grid in watts>,
"consumptionSun": <power currently consumed by the wallbox, provided by the solar panels in watts>,
"energyAll": <total consumed energy this month in watthours>,
"energyNet": <consumed net energy this month in watthours>,
"energySun": <consumed solar energy this month in watthours>,
"index": <index of the requested wallbox>,
"keyState": <state of the key switch at the wallbox>,
"maxChargeCurrent": <configured maximum charge current in A>,
"phases": <number of phases used for charging>,
"schukoOn": <true if the connected schuko of the wallbox is on, otherwise false>,
"soc": <state of charge>,
"sunModeOn": <true if sun-only-mode is active, false if mixed mode is active>
}
"""
req = self.sendRequest(
(
RscpTag.WB_REQ_DATA,
RscpType.Container,
[
(RscpTag.WB_INDEX, RscpType.UChar8, wbIndex),
(RscpTag.WB_REQ_EXTERN_DATA_ALG, RscpType.NoneType, None),
(RscpTag.WB_REQ_EXTERN_DATA_SUN, RscpType.NoneType, None),
(RscpTag.WB_REQ_EXTERN_DATA_NET, RscpType.NoneType, None),
(RscpTag.WB_REQ_APP_SOFTWARE, RscpType.NoneType, None),
(RscpTag.WB_REQ_KEY_STATE, RscpType.NoneType, None),
],
),
keepAlive=True,
)

outObj = {
"index": rscpFindTagIndex(req, RscpTag.WB_INDEX),
"appSoftware": rscpFindTagIndex(req, RscpTag.WB_APP_SOFTWARE),
}

extern_data_alg = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_ALG)
if extern_data_alg is not None:
extern_data = rscpFindTagIndex(extern_data_alg, RscpTag.WB_EXTERN_DATA)
status_byte = extern_data[2]
outObj["sunModeOn"] = (status_byte & 128) != 0
outObj["chargingCanceled"] = (status_byte & 64) != 0
outObj["chargingActive"] = (status_byte & 32) != 0
outObj["plugLocked"] = (status_byte & 16) != 0
outObj["plugged"] = (status_byte & 8) != 0
outObj["soc"] = extern_data[0]
outObj["phases"] = extern_data[1]
outObj["maxChargeCurrent"] = extern_data[3]
outObj["schukoOn"] = extern_data[5] != 0

extern_data_sun = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_SUN)
if extern_data_sun is not None:
extern_data = rscpFindTagIndex(extern_data_sun, RscpTag.WB_EXTERN_DATA)
outObj["consumptionSun"] = struct.unpack("h", extern_data[0:2])[0]
outObj["energySun"] = struct.unpack("i", extern_data[2:6])[0]

extern_data_net = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_NET)
if extern_data_net is not None:
extern_data = rscpFindTagIndex(extern_data_net, RscpTag.WB_EXTERN_DATA)
outObj["consumptionNet"] = struct.unpack("h", extern_data[0:2])[0]
outObj["energyNet"] = struct.unpack("i", extern_data[2:6])[0]

if "energySun" in outObj and "energyNet" in outObj:
outObj["energyAll"] = outObj["energyNet"] + outObj["energySun"]

key_state = rscpFindTag(req, RscpTag.WB_KEY_STATE)
if key_state is not None:
outObj["keyState"] = rscpFindTagIndex(key_state, RscpTag.WB_KEY_STATE)

req = self.sendRequest(
(RscpTag.EMS_REQ_BATTERY_TO_CAR_MODE, RscpType.NoneType, None),
keepAlive=keepAlive,
)
battery_to_car = rscpFindTag(req, RscpTag.EMS_BATTERY_TO_CAR_MODE)
if battery_to_car is not None:
outObj["batteryToCar"] = rscpFindTagIndex(
battery_to_car, RscpTag.EMS_BATTERY_TO_CAR_MODE
)

outObj = {k: v for k, v in sorted(outObj.items())}
return outObj

def set_wallbox_sunmode(
self, enable: bool, wbIndex: int = 0, keepAlive: bool = False
) -> bool:
"""Sets the sun mode of the wallbox via rscp protocol locally.
Args:
enable (bool): True to enable sun mode, otherwise false,
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success
False if error
"""
return self.sendWallboxSetRequest(
dataIndex=0, value=1 if enable else 2, wbIndex=wbIndex, keepAlive=keepAlive
)

def set_wallbox_schuko(
self, on: bool, wbIndex: int = 0, keepAlive: bool = False
) -> bool:
"""Sets the Schuko of the wallbox via rscp protocol locally.
Args:
on (bool): True to activate the Schuko, otherwise false
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success (wallbox has understood the request, but might have ignored an unsupported value)
False if error
"""
return self.sendWallboxSetRequest(
dataIndex=5, value=1 if on else 0, wbIndex=wbIndex, keepAlive=keepAlive
)

def set_wallbox_max_charge_current(
self, max_charge_current: int, wbIndex: int = 0, keepAlive: bool = False
) -> bool:
"""Sets the maximum charge current of the wallbox via rscp protocol locally.
Args:
max_charge_current (int): maximum allowed charge current in A
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success (wallbox has understood the request, but might have clipped the value)
False if error
"""
return self.sendWallboxSetRequest(
dataIndex=2,
value=max_charge_current,
request=RscpTag.WB_REQ_SET_PARAM_1,
wbIndex=wbIndex,
keepAlive=keepAlive,
)

def toggle_wallbox_charging(
self, wbIndex: int = 0, keepAlive: bool = False
) -> bool:
"""Toggles charging of the wallbox via rscp protocol locally.
Args:
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success
False if error
"""
return self.sendWallboxSetRequest(
dataIndex=4, value=1, wbIndex=wbIndex, keepAlive=keepAlive
)

def toggle_wallbox_phases(self, wbIndex: int = 0, keepAlive: bool = False) -> bool:
"""Toggles the number of phases used for charging by the wallbox between 1 and 3 via rscp protocol locally.
Args:
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success
False if error
"""
return self.sendWallboxSetRequest(
dataIndex=3, value=1, wbIndex=wbIndex, keepAlive=keepAlive
)

def sendWallboxRequest(
self,
dataIndex: int,
value: int,
request: RscpTag = RscpTag.WB_REQ_SET_EXTERN,
wbIndex: int = 0,
keepAlive: bool = False,
) -> Tuple[str | int | RscpTag, str | int | RscpType, Any]:
"""Sends a low-level request with WB_EXTERN_DATA to the wallbox via rscp protocol locally.
Args:
dataIndex (int): byte index in the WB_EXTERN_DATA array (values: 0-5)
value (int): byte value to be set in the WB_EXTERN_DATA array at the given index
request (Optional[RscpTag]): request identifier (WB_REQ_SET_EXTERN, WB_REQ_SET_PARAM_1 or WB_REQ_SET_PARAM_2),
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
An object with the received data
"""
dataArray = bytearray([0, 0, 0, 0, 0, 0])
dataArray[dataIndex] = value
result = self.sendRequest(
(
RscpTag.WB_REQ_DATA,
RscpType.Container,
[
(RscpTag.WB_INDEX, RscpType.UChar8, wbIndex),
(
request,
RscpType.Container,
[
(RscpTag.WB_EXTERN_DATA, RscpType.ByteArray, dataArray),
(
RscpTag.WB_EXTERN_DATA_LEN,
RscpType.UChar8,
len(dataArray),
),
],
),
],
),
keepAlive=keepAlive,
)
return result

def sendWallboxSetRequest(
self,
dataIndex: int,
value: int,
request: RscpTag = RscpTag.WB_REQ_SET_EXTERN,
wbIndex: int = 0,
keepAlive: bool = False,
) -> bool:
"""Sends a low-level set request with WB_EXTERN_DATA to the wallbox via rscp protocol locally and evaluates the response.
Args:
dataIndex (int): byte index in the WB_EXTERN_DATA array (values: 0-5)
value (int): byte value to be set in the WB_EXTERN_DATA array at the given index
request (Optional[RscpTag]): request identifier (WB_REQ_SET_EXTERN, WB_REQ_SET_PARAM_1 or WB_REQ_SET_PARAM_2),
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success
False if error
"""
response = self.sendWallboxRequest(
dataIndex, value, request, wbIndex, keepAlive
)

if response[0] != RscpTag.WB_DATA.name:
return False
responseData = response[2][-1]
return (
responseData[0][2:] == request.name[6:]
and responseData[1] != RscpType.Error.name
)

def set_battery_to_car_mode(self, enabled: bool, keepAlive: bool = False):
"""Sets whether the wallbox may use the battery.
Args:
enabled (bool): True to enable charging the car using the battery
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success
False if error
"""
enabledValue = 1 if enabled else 0

response = self.sendRequest(
(RscpTag.EMS_REQ_SET_BATTERY_TO_CAR_MODE, RscpType.UChar8, enabledValue),
keepAlive=keepAlive,
)

return response == (
RscpTag.EMS_SET_BATTERY_TO_CAR_MODE.name,
RscpType.UChar8.name,
enabledValue,
)

def get_batteries(self, keepAlive: bool = False):
"""Scans for installed batteries via rscp protocol.
Expand Down

0 comments on commit e680921

Please sign in to comment.