Skip to content

Commit

Permalink
feat: configuration for network and credentials
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuagruenstein committed Sep 22, 2024
1 parent adf7ced commit ad5bdd9
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 11 deletions.
8 changes: 5 additions & 3 deletions examples/example.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import asyncio
import os
from dataclasses import replace

from aiohttp import BasicAuth, ClientSession

from intellinet_pdu_ctrl.api import IPU
from intellinet_pdu_ctrl.types import OutletCommand


async def main() -> None:
Expand All @@ -16,8 +16,10 @@ async def main() -> None:
),
)
) as ipu:
await ipu.set_outlets(OutletCommand.ON, 0)
print(await ipu.get_status())
await ipu.set_network_configuration(
replace(await ipu.get_network_configuration(), hostname="robotpdu")
)
print(await ipu.get_network_configuration())


if __name__ == "__main__":
Expand Down
49 changes: 42 additions & 7 deletions intellinet_pdu_ctrl/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

from intellinet_pdu_ctrl.types import (
AllOutletsConfig,
NetworkConfiguration,
OutletCommand,
PDUStatus,
ThresholdsConfig,
UserVerifyResult,
)


Expand All @@ -33,6 +35,8 @@ def __init__(
):
self.session = session

assert self.session.auth is not None, "session must have auth set"

async def __aenter__(self) -> "IPU":
return self

Expand Down Expand Up @@ -64,8 +68,7 @@ async def _post_request(
)

async def get_status(self) -> PDUStatus:
e = await self._get_request(PDUEndpoints.status)
return PDUStatus.from_xml(e)
return PDUStatus.from_xml(await self._get_request(PDUEndpoints.status))

async def set_outlets_config(self, outlet_configs: AllOutletsConfig) -> None:
settings = dict[str, Any]()
Expand All @@ -77,13 +80,14 @@ async def set_outlets_config(self, outlet_configs: AllOutletsConfig) -> None:
await self._post_request(PDUEndpoints.config_pdu, data=settings)

async def get_outlets_config(self) -> AllOutletsConfig:
etree = await self._get_request(PDUEndpoints.config_pdu)

return AllOutletsConfig.from_xml(etree)
return AllOutletsConfig.from_xml(
await self._get_request(PDUEndpoints.config_pdu)
)

async def get_thresholds_config(self) -> ThresholdsConfig:
etree = await self._get_request(PDUEndpoints.thresholds)
return ThresholdsConfig.from_xml(etree)
return ThresholdsConfig.from_xml(
await self._get_request(PDUEndpoints.thresholds)
)

async def set_thresholds_config(self, threshold_config: ThresholdsConfig) -> None:
await self._post_request(
Expand All @@ -96,3 +100,34 @@ async def set_outlets(self, state: OutletCommand, *list_of_outlet_ids: int) -> N
outlet_states["submit"] = "Anwenden"

await self._get_request(PDUEndpoints.outlet, params=outlet_states)

async def set_credentials(self, new_credentials: aiohttp.BasicAuth) -> None:
current_credentials = self.session.auth
assert current_credentials is not None, "session must have auth set"

await self._post_request(
PDUEndpoints.users,
data=dict(
oldnm=current_credentials.login,
oldpas=current_credentials.password,
newnm=new_credentials.login,
newpas=new_credentials.password,
confirm=new_credentials.password,
),
)

status = await self.get_status()

assert status.user_verify_result == UserVerifyResult.CREDENTIALS_CHANGED

self.session._default_auth = new_credentials

async def get_network_configuration(self) -> NetworkConfiguration:
return NetworkConfiguration.from_xml(
await self._get_request(PDUEndpoints.network)
)

async def set_network_configuration(
self, network_config: NetworkConfiguration
) -> None:
await self._post_request(PDUEndpoints.network, data=network_config.to_dict())
51 changes: 51 additions & 0 deletions intellinet_pdu_ctrl/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,20 @@ def from_xml(cls, etree: et._Element) -> Self:
return cls(**config)


class UserVerifyResult(Enum):
NA = 0
CREDENTIALS_CHANGED = 1
CREDENTIALS_ERRORED = 2


@dataclass(frozen=True)
class PDUStatus:
current_amps: float
degree_celcius: int
humidity_percent: int
status: str # todo: make this an enum
outlet_states: tuple[OutletState, ...]
user_verify_result: UserVerifyResult

@classmethod
def from_xml(cls, e: et._Element) -> Self:
Expand All @@ -134,4 +141,48 @@ def from_xml(cls, e: et._Element) -> Self:
OutletState(extract_text_from_child(e, "outletStat{}".format(i)))
for i in range(0, 8)
),
user_verify_result=UserVerifyResult(
int(extract_text_from_child(e, "userVerifyRes"))
),
)


@dataclass(frozen=True)
class NetworkConfiguration:
hostname: str
ip_address: str
subnet_mask: str
gateway: str
enable_dhcp: bool
primary_dns_ip: str
secondary_dns_ip: str

@classmethod
def from_xml(cls, e: et._Element) -> Self:
dhcp_checkbox = cast(list[et._Element], e.xpath("//*[@id='dhcp']"))[0]
enable_dhcp = "checked" in dhcp_checkbox.attrib

return cls(
hostname=find_input_value_in_xml(e, "host"),
ip_address=find_input_value_in_xml(e, "ip"),
subnet_mask=find_input_value_in_xml(e, "mask"),
gateway=find_input_value_in_xml(e, "gate"),
enable_dhcp=enable_dhcp,
primary_dns_ip=find_input_value_in_xml(e, "dns1"),
secondary_dns_ip=find_input_value_in_xml(e, "dns2"),
)

def to_dict(self) -> dict[str, str]:
data = {
"host": self.hostname,
"ip": self.ip_address,
"mask": self.subnet_mask,
"gate": self.gateway,
"dns1": self.primary_dns_ip,
"dns2": self.secondary_dns_ip,
}

if self.enable_dhcp:
data["dhcp"] = "on"

return data
1 change: 0 additions & 1 deletion intellinet_pdu_ctrl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@


def find_input_value_in_xml(et: et._Element, id: str) -> str:
print(type(et))
xpath = f"//*[@id='{id}']/@value | //*[@name='{id}']/@value"
result = cast(list[str] | None, et.xpath(xpath))
if not result:
Expand Down

0 comments on commit ad5bdd9

Please sign in to comment.