From ad5bdd991c8f36814cdbf615108c20db853a6a3c Mon Sep 17 00:00:00 2001 From: Josh Gruenstein Date: Sun, 22 Sep 2024 16:19:20 -0400 Subject: [PATCH] feat: configuration for network and credentials --- examples/example.py | 8 +++--- intellinet_pdu_ctrl/api.py | 49 +++++++++++++++++++++++++++++----- intellinet_pdu_ctrl/types.py | 51 ++++++++++++++++++++++++++++++++++++ intellinet_pdu_ctrl/utils.py | 1 - 4 files changed, 98 insertions(+), 11 deletions(-) diff --git a/examples/example.py b/examples/example.py index f122b1d..e3f4542 100644 --- a/examples/example.py +++ b/examples/example.py @@ -1,10 +1,10 @@ import asyncio import os +from dataclasses import replace from aiohttp import BasicAuth, ClientSession from intellinet_pdu_ctrl.api import IPU -from intellinet_pdu_ctrl.types import OutletCommand async def main() -> None: @@ -16,8 +16,10 @@ async def main() -> None: ), ) ) as ipu: - await ipu.set_outlets(OutletCommand.ON, 0) - print(await ipu.get_status()) + await ipu.set_network_configuration( + replace(await ipu.get_network_configuration(), hostname="robotpdu") + ) + print(await ipu.get_network_configuration()) if __name__ == "__main__": diff --git a/intellinet_pdu_ctrl/api.py b/intellinet_pdu_ctrl/api.py index ab614e8..e0eee65 100644 --- a/intellinet_pdu_ctrl/api.py +++ b/intellinet_pdu_ctrl/api.py @@ -7,9 +7,11 @@ from intellinet_pdu_ctrl.types import ( AllOutletsConfig, + NetworkConfiguration, OutletCommand, PDUStatus, ThresholdsConfig, + UserVerifyResult, ) @@ -33,6 +35,8 @@ def __init__( ): self.session = session + assert self.session.auth is not None, "session must have auth set" + async def __aenter__(self) -> "IPU": return self @@ -64,8 +68,7 @@ async def _post_request( ) async def get_status(self) -> PDUStatus: - e = await self._get_request(PDUEndpoints.status) - return PDUStatus.from_xml(e) + return PDUStatus.from_xml(await self._get_request(PDUEndpoints.status)) async def set_outlets_config(self, outlet_configs: AllOutletsConfig) -> None: settings = dict[str, Any]() @@ -77,13 +80,14 @@ async def set_outlets_config(self, outlet_configs: AllOutletsConfig) -> None: await self._post_request(PDUEndpoints.config_pdu, data=settings) async def get_outlets_config(self) -> AllOutletsConfig: - etree = await self._get_request(PDUEndpoints.config_pdu) - - return AllOutletsConfig.from_xml(etree) + return AllOutletsConfig.from_xml( + await self._get_request(PDUEndpoints.config_pdu) + ) async def get_thresholds_config(self) -> ThresholdsConfig: - etree = await self._get_request(PDUEndpoints.thresholds) - return ThresholdsConfig.from_xml(etree) + return ThresholdsConfig.from_xml( + await self._get_request(PDUEndpoints.thresholds) + ) async def set_thresholds_config(self, threshold_config: ThresholdsConfig) -> None: await self._post_request( @@ -96,3 +100,34 @@ async def set_outlets(self, state: OutletCommand, *list_of_outlet_ids: int) -> N outlet_states["submit"] = "Anwenden" await self._get_request(PDUEndpoints.outlet, params=outlet_states) + + async def set_credentials(self, new_credentials: aiohttp.BasicAuth) -> None: + current_credentials = self.session.auth + assert current_credentials is not None, "session must have auth set" + + await self._post_request( + PDUEndpoints.users, + data=dict( + oldnm=current_credentials.login, + oldpas=current_credentials.password, + newnm=new_credentials.login, + newpas=new_credentials.password, + confirm=new_credentials.password, + ), + ) + + status = await self.get_status() + + assert status.user_verify_result == UserVerifyResult.CREDENTIALS_CHANGED + + self.session._default_auth = new_credentials + + async def get_network_configuration(self) -> NetworkConfiguration: + return NetworkConfiguration.from_xml( + await self._get_request(PDUEndpoints.network) + ) + + async def set_network_configuration( + self, network_config: NetworkConfiguration + ) -> None: + await self._post_request(PDUEndpoints.network, data=network_config.to_dict()) diff --git a/intellinet_pdu_ctrl/types.py b/intellinet_pdu_ctrl/types.py index 05ba8b9..2036d7d 100644 --- a/intellinet_pdu_ctrl/types.py +++ b/intellinet_pdu_ctrl/types.py @@ -115,6 +115,12 @@ def from_xml(cls, etree: et._Element) -> Self: return cls(**config) +class UserVerifyResult(Enum): + NA = 0 + CREDENTIALS_CHANGED = 1 + CREDENTIALS_ERRORED = 2 + + @dataclass(frozen=True) class PDUStatus: current_amps: float @@ -122,6 +128,7 @@ class PDUStatus: humidity_percent: int status: str # todo: make this an enum outlet_states: tuple[OutletState, ...] + user_verify_result: UserVerifyResult @classmethod def from_xml(cls, e: et._Element) -> Self: @@ -134,4 +141,48 @@ def from_xml(cls, e: et._Element) -> Self: OutletState(extract_text_from_child(e, "outletStat{}".format(i))) for i in range(0, 8) ), + user_verify_result=UserVerifyResult( + int(extract_text_from_child(e, "userVerifyRes")) + ), ) + + +@dataclass(frozen=True) +class NetworkConfiguration: + hostname: str + ip_address: str + subnet_mask: str + gateway: str + enable_dhcp: bool + primary_dns_ip: str + secondary_dns_ip: str + + @classmethod + def from_xml(cls, e: et._Element) -> Self: + dhcp_checkbox = cast(list[et._Element], e.xpath("//*[@id='dhcp']"))[0] + enable_dhcp = "checked" in dhcp_checkbox.attrib + + return cls( + hostname=find_input_value_in_xml(e, "host"), + ip_address=find_input_value_in_xml(e, "ip"), + subnet_mask=find_input_value_in_xml(e, "mask"), + gateway=find_input_value_in_xml(e, "gate"), + enable_dhcp=enable_dhcp, + primary_dns_ip=find_input_value_in_xml(e, "dns1"), + secondary_dns_ip=find_input_value_in_xml(e, "dns2"), + ) + + def to_dict(self) -> dict[str, str]: + data = { + "host": self.hostname, + "ip": self.ip_address, + "mask": self.subnet_mask, + "gate": self.gateway, + "dns1": self.primary_dns_ip, + "dns2": self.secondary_dns_ip, + } + + if self.enable_dhcp: + data["dhcp"] = "on" + + return data diff --git a/intellinet_pdu_ctrl/utils.py b/intellinet_pdu_ctrl/utils.py index 6d54ffb..aecd74e 100644 --- a/intellinet_pdu_ctrl/utils.py +++ b/intellinet_pdu_ctrl/utils.py @@ -4,7 +4,6 @@ def find_input_value_in_xml(et: et._Element, id: str) -> str: - print(type(et)) xpath = f"//*[@id='{id}']/@value | //*[@name='{id}']/@value" result = cast(list[str] | None, et.xpath(xpath)) if not result: