diff --git a/e3dc/_e3dc.py b/e3dc/_e3dc.py index 86e2fdf..6c86a5d 100644 --- a/e3dc/_e3dc.py +++ b/e3dc/_e3dc.py @@ -74,7 +74,7 @@ def __init__(self, connectType: int, **kwargs: Any) -> None: ipAddress (str): IP address of the E3DC system - required for CONNECT_LOCAL key (str): encryption key as set in the E3DC settings - required for CONNECT_LOCAL serialNumber (str): the serial number of the system to monitor - required for CONNECT_WEB - isPasswordMd5 (Optional[bool]): indicates whether the password is already md5 digest (recommended, default = True) - required for CONNECT_WEB + isPasswordMd5 (bool): indicates whether the password is already md5 digest (recommended, default = True) - required for CONNECT_WEB configuration (Optional[dict]): dict containing details of the E3DC configuration. {"pvis": [{"index": 0, "strings": 2, "phases": 3}], "powermeters": [{"index": 0}], "batteries": [{"index": 0, "dcbs": 1}]} """ self.connectType = connectType @@ -208,8 +208,8 @@ def sendRequest( Args: request: the request to send - retries (Optional[int]): number of retries - keepAlive (Optional[bool]): True to keep connection alive + retries (int): number of retries. Defaults to 3. + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: An object with the received data @@ -250,8 +250,8 @@ def sendRequestTag( Args: tag (str): the request to send - retries (Optional[int]): number of retries - keepAlive (Optional[bool]): True to keep connection alive + retries (int): number of retries. Defaults to 3. + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: An object with the received data @@ -272,7 +272,7 @@ def poll(self, keepAlive: bool = False): """Polls via rscp protocol. Args: - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the condensed status information structured as follows:: @@ -331,7 +331,7 @@ def poll_switches(self, keepAlive: bool = False): """This function uses the RSCP interface to poll the switch status. Args: - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: list[dict]: list of the switches:: @@ -359,7 +359,7 @@ def poll_switches(self, keepAlive: bool = False): descList = switchDesc[2] # get the payload of the container statusList = switchStatus[2] - switchList = [] + switchList: List[Dict[str, Any]] = [] for switch in range(len(descList)): switchID = rscpFindTagIndex(descList[switch], RscpTag.HA_DATAPOINT_INDEX) @@ -387,7 +387,7 @@ def set_switch_onoff( Args: switchID (int): id of the switch value (str): value - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: True/False @@ -416,7 +416,7 @@ def get_idle_periods(self, keepAlive: bool = False): """Poll via rscp protocol to get idle periods. Args: - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the idle periods structured as follows:: @@ -427,15 +427,15 @@ def get_idle_periods(self, keepAlive: bool = False): { "day": , "start": - [ + ( , - ], + ), "end": - [ + ( , - ], + ), "active": } ], @@ -444,15 +444,15 @@ def get_idle_periods(self, keepAlive: bool = False): { "day": , "start": - [ + ( , - ], + ), "end": - [ + ( , - ], + ), "active": } ] @@ -465,23 +465,26 @@ def get_idle_periods(self, keepAlive: bool = False): if idlePeriodsRaw[0] != RscpTag.EMS_GET_IDLE_PERIODS: return None - idlePeriods = {"idleCharge": [{}] * 7, "idleDischarge": [{}] * 7} + idlePeriods: Dict[str, List[Dict[str, Any]]] = { + "idleCharge": [] * 7, + "idleDischarge": [] * 7, + } # initialize for period in idlePeriodsRaw[2]: - active = rscpFindTagIndex(period, RscpTag.EMS_IDLE_PERIOD_ACTIVE) + active: bool = rscpFindTagIndex(period, RscpTag.EMS_IDLE_PERIOD_ACTIVE) typ = rscpFindTagIndex(period, RscpTag.EMS_IDLE_PERIOD_TYPE) - day = rscpFindTagIndex(period, RscpTag.EMS_IDLE_PERIOD_DAY) + day: int = rscpFindTagIndex(period, RscpTag.EMS_IDLE_PERIOD_DAY) start = rscpFindTag(period, RscpTag.EMS_IDLE_PERIOD_START) - startHour = rscpFindTagIndex(start, RscpTag.EMS_IDLE_PERIOD_HOUR) - startMin = rscpFindTagIndex(start, RscpTag.EMS_IDLE_PERIOD_MINUTE) + startHour: int = rscpFindTagIndex(start, RscpTag.EMS_IDLE_PERIOD_HOUR) + startMin: int = rscpFindTagIndex(start, RscpTag.EMS_IDLE_PERIOD_MINUTE) end = rscpFindTag(period, RscpTag.EMS_IDLE_PERIOD_END) - endHour = rscpFindTagIndex(end, RscpTag.EMS_IDLE_PERIOD_HOUR) - endMin = rscpFindTagIndex(end, RscpTag.EMS_IDLE_PERIOD_MINUTE) + endHour: int = rscpFindTagIndex(end, RscpTag.EMS_IDLE_PERIOD_HOUR) + endMin: int = rscpFindTagIndex(end, RscpTag.EMS_IDLE_PERIOD_MINUTE) periodObj = { "day": day, - "start": [startHour, startMin], - "end": [endHour, endMin], + "start": (startHour, startMin), + "end": (endHour, endMin), "active": active, } @@ -506,15 +509,15 @@ def set_idle_periods( { "day": , "start": - [ + ( , - ], + ), "end": - [ + ( , - ], + ), "active": } ], @@ -523,26 +526,26 @@ def set_idle_periods( { "day": , "start": - [ + ( , - ], + ), "end": - [ + ( , - ], + ), "active": } ] } - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: True if success False if error """ - periodList = [] + periodList: List[Tuple[RscpTag, RscpType, Any]] = [] if "idleCharge" not in idlePeriods and "idleDischarge" not in idlePeriods: raise ValueError("neither key idleCharge nor idleDischarge in object") @@ -682,7 +685,7 @@ def get_db_data_timestamp( Args: startTimestamp (int): UNIX timestampt from where the db data should be collected timespanSeconds (int): number of seconds for which the data should be collected - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the stored db information structured as follows:: @@ -763,7 +766,7 @@ def get_db_data( startDate (datetime.date): start date for timespan, default today. Depending on timespan given, the startDate is automatically adjusted to the first of the month or the year timespan (str): string specifying the time span ["DAY", "MONTH", "YEAR"] - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the stored db information structured as follows:: @@ -811,7 +814,7 @@ def get_system_info_static(self, keepAlive: bool = False): """Polls the static system info via rscp protocol. Args: - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. """ self.deratePercent = ( self.sendRequestTag(RscpTag.EMS_REQ_DERATE_AT_PERCENT_VALUE, keepAlive=True) @@ -879,7 +882,7 @@ def get_system_info(self, keepAlive: bool = False): """Polls the system info via rscp protocol. Args: - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the system info structured as follows:: @@ -924,7 +927,7 @@ def get_system_status(self, keepAlive: bool = False): """Polls the system status via rscp protocol. Args: - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the system status structured as follows:: @@ -986,7 +989,8 @@ def get_batteries(self, keepAlive: bool = False): """Scans for installed batteries via rscp protocol. Args: - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. + Returns: list[dict]: List containing the found batteries as follows.: [ @@ -994,7 +998,7 @@ def get_batteries(self, keepAlive: bool = False): ] """ maxBatteries = 8 - outObj = [] + outObj: List[Dict[str, int]] = [] for batIndex in range(maxBatteries): try: req = self.sendRequest( @@ -1034,7 +1038,7 @@ def get_battery_data( Args: batIndex (Optional[int]): battery index dcbs (Optional[list]): dcb list - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the battery data structured as follows:: @@ -1170,7 +1174,7 @@ def get_battery_data( dcbCount = rscpFindTagIndex(req, RscpTag.BAT_DCB_COUNT) deviceStateContainer = rscpFindTag(req, RscpTag.BAT_DEVICE_STATE) - outObj = { + outObj: Dict[str, Any] = { "asoc": rscpFindTagIndex(req, RscpTag.BAT_ASOC), "chargeCycles": rscpFindTagIndex(req, RscpTag.BAT_CHARGE_CYCLES), "current": rscpFindTagIndex(req, RscpTag.BAT_CURRENT), @@ -1255,9 +1259,9 @@ def get_battery_data( # Initialize default values for DCB sensorCount = 0 - temperatures = [] + temperatures: List[float] = [] seriesCellCount = 0 - voltages = [] + voltages: List[float] = [] # Set temperatures, if available for the device temperatures_raw = rscpFindTag(req, RscpTag.BAT_DCB_ALL_CELL_TEMPERATURES) @@ -1283,7 +1287,7 @@ def get_battery_data( for cell in range(0, seriesCellCount): voltages.append(voltages_data[cell][2]) - dcbobj = { + dcbobj: Dict[str, Any] = { "current": rscpFindTagIndex(info, RscpTag.BAT_DCB_CURRENT), "currentAvg30s": rscpFindTagIndex( info, RscpTag.BAT_DCB_CURRENT_AVG_30S @@ -1358,7 +1362,7 @@ def get_batteries_data( Args: batteries (Optional[dict]): batteries dict - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: list[dict]: Returns a list of batteries data @@ -1366,7 +1370,7 @@ def get_batteries_data( if batteries is None: batteries = self.batteries - outObj = [] + outObj: List[Dict[str, Any]] = [] for battery in batteries: if "dcbs" in battery: @@ -1389,7 +1393,8 @@ def get_pvis(self, keepAlive: bool = False): """Scans for installed pvis via rscp protocol. Args: - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. + Returns: list[dict]: List containing the found pvis as follows.:: [ @@ -1397,7 +1402,7 @@ def get_pvis(self, keepAlive: bool = False): ] """ maxPvis = 8 - outObj = [] + outObj: List[Dict[str, Any]] = [] for pviIndex in range(maxPvis): req = self.sendRequest( ( @@ -1447,7 +1452,7 @@ def get_pvi_data( pviIndex (int): pv inverter index strings (Optional[list]): string list phases (Optional[list]): phase list - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the pvi data structured as follows:: @@ -1555,7 +1560,7 @@ def get_pvi_data( frequency = rscpFindTag(req, RscpTag.PVI_FREQUENCY_UNDER_OVER) deviceState = rscpFindTag(req, RscpTag.PVI_DEVICE_STATE) - outObj = { + outObj: Dict[str, Any] = { "acMaxApparentPower": rscpFindTagIndex( rscpFindTag(req, RscpTag.PVI_AC_MAX_APPARENTPOWER), RscpTag.PVI_VALUE ), @@ -1740,7 +1745,7 @@ def get_pvis_data( Args: pvis (Optional[dict]): pvis dict - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: list[dict]: Returns a list of pvi data @@ -1748,7 +1753,7 @@ def get_pvis_data( if pvis is None: pvis = self.pvis - outObj = [] + outObj: List[Dict[str, Any]] = [] for pvi in pvis: if "strings" in pvi: @@ -1778,7 +1783,7 @@ def get_powermeters(self, keepAlive: bool = False): """Scans for installed power meters via rscp protocol. Args: - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: list[dict]: List containing the found powermeters as follows.:: @@ -1789,7 +1794,7 @@ def get_powermeters(self, keepAlive: bool = False): ] """ maxPowermeters = 8 - outObj = [] + outObj: List[Dict[str, Any]] = [] for pmIndex in range( maxPowermeters ): # max 8 powermeters according to E3DC spec @@ -1823,7 +1828,7 @@ def get_powermeter_data(self, pmIndex: int | None = None, keepAlive: bool = Fals Args: pmIndex (Optional[int]): power meter index - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the power data structured as follows:: @@ -1912,7 +1917,7 @@ def get_powermeters_data( Args: powermeters (Optional[dict]): powermeters dict - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: list[dict]: Returns a list of powermeters data @@ -1920,7 +1925,7 @@ def get_powermeters_data( if powermeters is None: powermeters = self.powermeters - outObj = [] + outObj: List[Dict[str, Any]] = [] for powermeter in powermeters: outObj.append( @@ -1938,7 +1943,7 @@ def get_power_settings(self, keepAlive: bool = False): """Polls the power settings via rscp protocol. Args: - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: dict: Dictionary containing the power settings structured as follows:: @@ -1994,7 +1999,7 @@ def set_power_limits( max_charge (Optional[int]): maximum charge power max_discharge (Optional[int]: maximum discharge power discharge_start (Optional[int]: power where discharged is started - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: 0 if success @@ -2057,7 +2062,7 @@ def set_powersave(self, enable: bool, keepAlive: bool = False): Args: enable (bool): True/False - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: 0 if success @@ -2091,7 +2096,7 @@ def set_weather_regulated_charge(self, enable: bool, keepAlive: bool = False): Args: enable (bool): True/False - keepAlive (Optional[bool]): True to keep connection alive + keepAlive (bool): True to keep connection alive. Defaults to False. Returns: 0 if success diff --git a/e3dc/_e3dc_rscp_local.py b/e3dc/_e3dc_rscp_local.py index 55fcbf4..4563aaf 100644 --- a/e3dc/_e3dc_rscp_local.py +++ b/e3dc/_e3dc_rscp_local.py @@ -103,7 +103,7 @@ def sendRequest( except RSCPKeyError: self.disconnect() raise - except: + except Exception: self.disconnect() raise CommunicationError @@ -125,7 +125,7 @@ def connect(self) -> None: self.socket.connect((self.ip, PORT)) self.processedData = None self.connected = True - except: + except Exception: self.disconnect() raise CommunicationError self.encdec = RSCPEncryptDecrypt(self.key) diff --git a/e3dc/_e3dc_rscp_web.py b/e3dc/_e3dc_rscp_web.py index 08edb25..67e5c06 100644 --- a/e3dc/_e3dc_rscp_web.py +++ b/e3dc/_e3dc_rscp_web.py @@ -121,7 +121,7 @@ def __init__( self.ws = WebSocketApp( REMOTE_ADDRESS, on_message=lambda _, msg: self.on_message( # pyright: ignore [reportUnknownLambdaType] - msg + msg # pyright: ignore [reportUnknownArgumentType] ), on_close=lambda _: self.reset(), # pyright: ignore [reportUnknownLambdaType] on_error=lambda _: self.reset(), # pyright: ignore [reportUnknownLambdaType] @@ -130,7 +130,7 @@ def __init__( def reset(self): """Method to reset E3DC rscp web instance.""" - self.ws.close() + self.ws.close() # pyright: ignore [reportUnknownMemberType] self.conId: int = 0 self.authLevel = None self.virtConId = None @@ -413,7 +413,9 @@ def _sendRequest_internal( def connect(self): """Connect to E3DC system.""" self.reset() - self.thread = threading.Thread(target=self.ws.run_forever) + self.thread = threading.Thread( + target=self.ws.run_forever # pyright: ignore [reportUnknownMemberType, reportUnknownArgumentType] + ) self.thread.start() diff --git a/e3dc/_rscpLib.py b/e3dc/_rscpLib.py index 90a9b1c..aa37180 100644 --- a/e3dc/_rscpLib.py +++ b/e3dc/_rscpLib.py @@ -10,7 +10,7 @@ import struct import time import zlib -from typing import Any, Tuple +from typing import Any, List, Tuple from ._rscpTags import ( RscpTag, @@ -85,7 +85,10 @@ def rscpFindTag( if decodedMsg[0] == tagStr: return decodedMsg if isinstance(decodedMsg[2], list): - for msg in decodedMsg[2]: + msgList: List[ + Tuple[str | int | RscpTag, str | int | RscpType, Any] + ] = decodedMsg[2] + for msg in msgList: msgValue = rscpFindTag(msg, tag) if msgValue is not None: return msgValue @@ -171,7 +174,8 @@ def rscpEncode( elif rscptype == RscpType.Container: if isinstance(data, list): newData = b"" - for dataChunk in data: + dataList: List[Tuple[str | int | RscpTag, str | int | RscpType, Any]] = data + for dataChunk in dataList: newData += rscpEncode( dataChunk[0], dataChunk[1], dataChunk[2] ) # transform each dataChunk into byte array @@ -269,7 +273,7 @@ def rscpDecode( if type_ == RscpType.Container: # this is a container: parse the inside - dataList = [] + dataList: List[Tuple[str | int | RscpTag, str | int | RscpType, Any]] = [] curByte = headerSize while curByte < headerSize + length: innerData, usedLength = rscpDecode(data[curByte:]) diff --git a/pyproject.toml b/pyproject.toml index 4ff36ff..8bfffb3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,7 +73,7 @@ exclude = [".venv", ".git", "build", "docs"] max-line-length = 88 docstring-convention = "google" # E722 should be fixed -extend-ignore = ["E203", "E302", "E501", "W293", "E722"] +extend-ignore = ["E203", "E302", "E501", "W293"] [tool.isort] profile = "black" @@ -83,8 +83,4 @@ disable = "C0330, C0326" [tool.pyright] ignore = ["tools/*", "build/*", "dist/*", ".venv/*"] -typeCheckingMode = "strict" -reportUnknownVariableType = "warning" -reportUnknownArgumentType = "warning" -reportUnknownMemberType = "warning" -reportUnknownParameterType = "warning" \ No newline at end of file +typeCheckingMode = "strict" \ No newline at end of file