diff --git a/.vscode/launch.json b/.vscode/launch.json index f981039..0f50b48 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -83,6 +83,22 @@ "module": "tplink_omada_client.cli", "justMyCode": true, "args": ["clients"] + }, + { + "name": "CLI: Get Client", + "type": "debugpy", + "request": "launch", + "module": "tplink_omada_client.cli", + "justMyCode": true, + "args": ["client", "Nanoleaf", "--dump"] + }, + { + "name": "CLI: Upload certificate", + "type": "debugpy", + "request": "launch", + "module": "tplink_omada_client.cli", + "justMyCode": true, + "args": ["upload-certificate", "/workspaces/tplink-omada-api/omada3.pfx"] } ] diff --git a/pyproject.toml b/pyproject.toml index 7054808..fe50413 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "tplink_omada_client" -version = "1.3.12" +version = "1.3.13" authors = [ { name="Mark Godwin", email="author@example.com" }, ] diff --git a/src/tplink_omada_client/__init__.py b/src/tplink_omada_client/__init__.py index 95c5019..50b1f05 100644 --- a/src/tplink_omada_client/__init__.py +++ b/src/tplink_omada_client/__init__.py @@ -1,6 +1,6 @@ from .devices import OmadaSwitchPortDetails from .omadaclient import OmadaClient, OmadaSite -from .omadasiteclient import OmadaSiteClient, SwitchPortOverrides, AccessPointPortSettings +from .omadasiteclient import OmadaSiteClient, SwitchPortOverrides, AccessPointPortSettings, GatewayPortSettings, OmadaClientSettings, OmadaClientFixedAddress from . import definitions from . import exceptions from . import clients @@ -10,6 +10,9 @@ "OmadaSite", "OmadaSiteClient", "AccessPointPortSettings", + "GatewayPortSettings", + "OmadaClientSettings", + "OmadaClientFixedAddress", "SwitchPortOverrides", "OmadaSwitchPortDetails", "definitions", diff --git a/src/tplink_omada_client/cli/__init__.py b/src/tplink_omada_client/cli/__init__.py index f98d1ea..d715430 100644 --- a/src/tplink_omada_client/cli/__init__.py +++ b/src/tplink_omada_client/cli/__init__.py @@ -7,25 +7,27 @@ from tplink_omada_client.exceptions import LoginFailed from . import ( + command_access_point, + command_access_points, command_block_client, + command_certificate, command_client, command_clients, command_default, command_devices, - command_known_clients, command_gateway, + command_known_clients, + command_poe, + command_reboot, + command_set_client_name, + command_set_device_led, command_switch, + command_switch_ports, command_switches, - command_access_points, - command_access_point, command_target, command_targets, - command_switch_ports, command_unblock_client, - command_set_device_led, - command_set_client_name, command_wan, - command_poe ) def main(argv: Union[Sequence[str], None] = None) -> int: @@ -44,6 +46,8 @@ def main(argv: Union[Sequence[str], None] = None) -> int: metavar='command', ) + command_access_point.arg_parser(subparsers) + command_access_points.arg_parser(subparsers) command_block_client.arg_parser(subparsers) command_client.arg_parser(subparsers) command_clients.arg_parser(subparsers) @@ -51,18 +55,18 @@ def main(argv: Union[Sequence[str], None] = None) -> int: command_devices.arg_parser(subparsers) command_gateway.arg_parser(subparsers) command_known_clients.arg_parser(subparsers) + command_poe.arg_parser(subparsers) + command_reboot.arg_parser(subparsers) + command_certificate.arg_parser(subparsers) + command_set_client_name.arg_parser(subparsers) + command_set_device_led.arg_parser(subparsers) command_switch.arg_parser(subparsers) - command_switches.arg_parser(subparsers) - command_access_points.arg_parser(subparsers) - command_access_point.arg_parser(subparsers) command_switch_ports.arg_parser(subparsers) + command_switches.arg_parser(subparsers) command_target.arg_parser(subparsers) command_targets.arg_parser(subparsers) command_unblock_client.arg_parser(subparsers) - command_set_device_led.arg_parser(subparsers) - command_set_client_name.arg_parser(subparsers) command_wan.arg_parser(subparsers) - command_poe.arg_parser(subparsers) try: args = parser.parse_args(args=argv) diff --git a/src/tplink_omada_client/cli/command_certificate.py b/src/tplink_omada_client/cli/command_certificate.py new file mode 100644 index 0000000..9e8a949 --- /dev/null +++ b/src/tplink_omada_client/cli/command_certificate.py @@ -0,0 +1,41 @@ +"""Implementation for 'set-certificate' command""" + +from argparse import ArgumentParser +import getpass + +from tplink_omada_client.definitions import GatewayPortMode, PoEMode + +from .config import get_target_config, to_omada_connection +from .util import get_target_argument + +async def command_certificate(args) -> int: + """Executes 'set-certificate' command""" + controller = get_target_argument(args) + config = get_target_config(controller) + + if args['password']: + password = args['password'] + else: + password = getpass.getpass() + + async with to_omada_connection(config) as client: + await client.set_certificate(args["cert-file"], password) + + print("Certificate uploaded successfully, and enabled. Please reboot the controller to apply the changes.") + return 0 + +def arg_parser(subparsers) -> None: + """Configures arguments parser for 'set-certificate' command""" + parser: ArgumentParser = subparsers.add_parser( + "set-certificate", + help="Sets a new certificate for the Omada controller." + ) + parser.set_defaults(func=command_certificate) + + parser.add_argument( + "cert-file", + help="The certificate file to upload. Must be in PKCS12 PFX format." + ) + + parser.add_argument('-p', '--password', help="The password for the certificate", required=False) + diff --git a/src/tplink_omada_client/cli/command_client.py b/src/tplink_omada_client/cli/command_client.py index 1158fed..2f09fd3 100644 --- a/src/tplink_omada_client/cli/command_client.py +++ b/src/tplink_omada_client/cli/command_client.py @@ -1,8 +1,9 @@ """Implementation for 'client' command""" -from argparse import _SubParsersAction +from argparse import _SubParsersAction, ArgumentError import datetime from tplink_omada_client.clients import OmadaWiredClientDetails, OmadaWirelessClientDetails +from tplink_omada_client.omadasiteclient import OmadaClientFixedAddress, OmadaClientSettings from .config import get_target_config, to_omada_connection from .util import dump_raw_data, get_client_mac, get_target_argument @@ -14,30 +15,53 @@ async def command_client(args) -> int: async with to_omada_connection(config) as client: site_client = await client.get_site_client(config.site) mac = await get_client_mac(site_client, args['mac']) - client = await site_client.get_client(mac) - print(f"Name: {client.name}") - print(f"MAC: {client.mac}") - if client.ip: - print(f"IP: {client.ip}") - if client.host_name: - print(f"Hostname: {client.host_name}") - print(f"Blocked: {client.is_blocked}") - if client.is_active: - uptime = str(datetime.timedelta(seconds=float(client.connection_time or 0))) - print(f"Uptime: {uptime}") - if isinstance(client, OmadaWiredClientDetails): - if client.connect_dev_type == 'switch': - print(f"Switch: {client.switch_name} ({client.switch_mac})") - print(f"Switch port: {client.port}") - elif client.connect_dev_type == 'gateway': - print(f"Gateway: {client.gateway_name} ({client.gateway_mac})") - elif isinstance(client, OmadaWirelessClientDetails): - print(f"SSID: {client.ssid}") - print(f"Access Point: {client.ap_name} ({client.ap_mac})") + + if args['set_name'] or args['lock_to_ap'] or args['unlock'] or args['fixed_ip'] or args['dynamic_ip']: + settings = OmadaClientSettings() + if args['set_name']: + settings.name = args['set_name'] + if args['lock_to_ap']: + settings.lock_to_aps = args['lock_to_ap'] + if args['unlock']: + settings.lock_to_aps = [] + if args['dynamic_ip']: + settings.fixed_address = OmadaClientFixedAddress() + elif args['fixed_ip']: + if not args['network']: + raise ArgumentError(args["network"], "Network ID must be specified when reserving an IP address") + settings.fixed_address = OmadaClientFixedAddress(network_id=args['network'], ip_address=args['fixed_ip']) + client = await site_client.update_client(mac, settings) + else: + client = await site_client.get_client(mac) + print_client(client) dump_raw_data(args, client) return 0 +def print_client(client): + print(f"Name: {client.name}") + print(f"MAC: {client.mac}") + if client.ip: + print(f"IP: {client.ip}") + if client.host_name: + print(f"Hostname: {client.host_name}") + print(f"Blocked: {client.is_blocked}") + if client.is_active: + uptime = str(datetime.timedelta(seconds=float(client.connection_time or 0))) + print(f"Uptime: {uptime}") + if isinstance(client, OmadaWiredClientDetails): + if client.connect_dev_type == 'switch': + print(f"Switch: {client.switch_name} ({client.switch_mac})") + print(f"Switch port: {client.port}") + elif client.connect_dev_type == 'gateway': + print(f"Gateway: {client.gateway_name} ({client.gateway_mac})") + elif isinstance(client, OmadaWirelessClientDetails): + print(f"SSID: {client.ssid}") + print(f"Access Point: {client.ap_name} ({client.ap_mac})") + +def list_of_strings(arg): + return arg.split(',') + def arg_parser(subparsers: _SubParsersAction) -> None: """Configures arguments parser for 'client' command""" client_parser = subparsers.add_parser( @@ -48,6 +72,19 @@ def arg_parser(subparsers: _SubParsersAction) -> None: "mac", help="The MAC address or name of the client", ) + client_parser.add_argument( + '-sn', '--set-name', help="Set the client's name", metavar="NAME" + ) + lock_grp = client_parser.add_mutually_exclusive_group() + lock_grp.add_argument('-l', '--lock-to-ap', help="Lock the client to the specified access point(s)", metavar="MACs", type=list_of_strings) + lock_grp.add_argument('-u', '--unlock', help="Unlock the client", action='store_true') + + fixed_ip_grp = client_parser.add_argument_group('IP Reservation') + fixed_ip_en_dis_grp = fixed_ip_grp.add_mutually_exclusive_group() + fixed_ip_en_dis_grp.add_argument('-ip', '--fixed-ip', help="Reserve the client's IP address") + fixed_ip_en_dis_grp.add_argument('-dyn', '--dynamic-ip', help="Remove the client's IP reservation", action='store_true') + fixed_ip_grp.add_argument('-n', '--network', help="Network ID for reservation") + client_parser.add_argument('-d', '--dump', help="Output raw client information", action='store_true') client_parser.set_defaults(func=command_client) diff --git a/src/tplink_omada_client/cli/command_reboot.py b/src/tplink_omada_client/cli/command_reboot.py new file mode 100644 index 0000000..73c4e49 --- /dev/null +++ b/src/tplink_omada_client/cli/command_reboot.py @@ -0,0 +1,29 @@ +"""Implementation for 'reboot' command""" + +from argparse import ArgumentParser +import getpass + +from tplink_omada_client.definitions import GatewayPortMode, PoEMode + +from .config import get_target_config, to_omada_connection +from .util import get_target_argument + +async def command_reboot(args) -> int: + """Executes 'reboot' command""" + controller = get_target_argument(args) + config = get_target_config(controller) + + async with to_omada_connection(config) as client: + reboot_time = await client.reboot() + + print(f"Controller is rebooting, and should be back up in approximately {reboot_time} seconds.") + return 0 + +def arg_parser(subparsers) -> None: + """Configures arguments parser for 'gateway' command""" + parser: ArgumentParser = subparsers.add_parser( + "reboot", + help="Reboot the Omada Controller" + ) + parser.set_defaults(func=command_reboot) + diff --git a/src/tplink_omada_client/cli/command_set_client_name.py b/src/tplink_omada_client/cli/command_set_client_name.py index 43ef8fc..156e674 100644 --- a/src/tplink_omada_client/cli/command_set_client_name.py +++ b/src/tplink_omada_client/cli/command_set_client_name.py @@ -2,8 +2,11 @@ from argparse import _SubParsersAction +from tplink_omada_client.cli.command_client import print_client +from tplink_omada_client.omadasiteclient import OmadaClientSettings + from .config import get_target_config, to_omada_connection -from .util import get_target_argument +from .util import dump_raw_data, get_client_mac, get_target_argument async def command_set_client_name(args) -> int: """Executes 'set-client-name' command""" @@ -12,9 +15,15 @@ async def command_set_client_name(args) -> int: async with to_omada_connection(config) as client: site_client = await client.get_site_client(config.site) - mac = await site_client.get_client(args['mac']) + mac = await get_client_mac(site_client, args['mac']) + name = args['name'] - await site_client.set_client_name(mac, name) + client = await site_client.update_client(mac, OmadaClientSettings(name=name)) + + print_client(client) + + dump_raw_data(args, client) + return 0 def arg_parser(subparsers: _SubParsersAction) -> None: @@ -24,8 +33,9 @@ def arg_parser(subparsers: _SubParsersAction) -> None: help="Sets the name of an omada client") parser.add_argument( "mac", - help="The MAC address of the client to set the name for", + help="The MAC address or name of the client to set the name for", ) parser.add_argument("name", help="The new name of the client") + parser.add_argument('-d', '--dump', help="Output raw client information", action='store_true') parser.set_defaults(func=command_set_client_name) diff --git a/src/tplink_omada_client/omadaapiconnection.py b/src/tplink_omada_client/omadaapiconnection.py index 178aad7..5222d40 100644 --- a/src/tplink_omada_client/omadaapiconnection.py +++ b/src/tplink_omada_client/omadaapiconnection.py @@ -5,7 +5,7 @@ import re from urllib.parse import urlsplit, urljoin -from aiohttp import client_exceptions, CookieJar +from aiohttp import Payload, client_exceptions, CookieJar from aiohttp.client import ClientSession from awesomeversion import AwesomeVersion @@ -104,7 +104,7 @@ async def login(self) -> str: auth = {"username": self._username, "password": self._password} response = await self._do_request( - "post", self.format_url("login"), payload=auth + "post", self.format_url("login"), json=auth ) self._csrf_token = response["token"] @@ -167,16 +167,16 @@ async def iterate_pages(self, url: str, params: Optional[dict[str, Any]]=None) - for item in data: yield item - async def request(self, method: str, url: str, params=None, payload=None) -> Any: + async def request(self, method: str, url: str, params=None, json=None, data: Optional[Payload] = None) -> Any: """Perform a request specific to the controlller, with authentication""" if not await self._check_login(): await self.login() - return await self._do_request(method, url, params=params, payload=payload) - + return await self._do_request(method, url, params=params, json=json, data=data) + async def _do_request( - self, method: str, url: str, params=None, payload=None + self, method: str, url: str, params=None, json=None, data: Optional[Payload] = None ) -> Any: """Perform a request on the controller, and unpack the response.""" @@ -194,7 +194,8 @@ async def _do_request( url, params=params, headers=headers, - json=payload, + json=json, + data=data, ssl=self._verify_ssl, ) as response: diff --git a/src/tplink_omada_client/omadaclient.py b/src/tplink_omada_client/omadaclient.py index aea6995..0c0e6f1 100644 --- a/src/tplink_omada_client/omadaclient.py +++ b/src/tplink_omada_client/omadaclient.py @@ -1,6 +1,9 @@ """ Simple Http client for Omada controller REST api. """ +import os from typing import NamedTuple, Optional, Union +from aiohttp import MultipartWriter from aiohttp.client import ClientSession +from multidict import CIMultiDict from .omadasiteclient import OmadaSiteClient from .omadaapiconnection import OmadaApiConnection @@ -92,3 +95,47 @@ async def _get_site_id(self, site_name: str): return site_id raise SiteNotFound(f"Site '{site_name}' not found") + + async def reboot(self) -> int: + """ + Reboot the Omada controller. + + Returns the estimated number of seconds until the reboot finishes. + """ + url = self._api.format_url("cmd/reboot") + result = await self._api.request("post", url) + + return result["rebootTime"] + + + async def set_certificate(self, file: str, cert_password: str): + """Upload a new PKCS12 PFX certificate to the controller.""" + + base_name = os.path.basename(file) + with open(file, "rb") as upload_file: + cert_data = upload_file.read() + + with MultipartWriter("form-data") as mpwriter: + file_part = mpwriter.append(cert_data, CIMultiDict({'Content-Type': 'application/x-pkcs12'})) + file_part.set_content_disposition("form-data", name="file", filename=base_name) + + data_part = mpwriter.append_json({"cerName": base_name}) + data_part.set_content_disposition("form-data", name="data") + + url = self._api.format_url("files/controller/certificate") + upload_result = await self._api.request("post", url, data=mpwriter) + cert_id = upload_result["cerId"] + cert_name = upload_result["cerName"] + + payload = { + "certificate": { + "cerId": cert_id, + "cerName": cert_name, + "cerType": "PFX", + "enable": True, + "keyPassword": cert_password, + } + } + url = self._api.format_url("controller/setting") + await self._api.request("patch", url, json=payload) + diff --git a/src/tplink_omada_client/omadasiteclient.py b/src/tplink_omada_client/omadasiteclient.py index 0d3bfbd..2a80826 100644 --- a/src/tplink_omada_client/omadasiteclient.py +++ b/src/tplink_omada_client/omadasiteclient.py @@ -92,6 +92,33 @@ def __init__( ): self.enable_poe = enable_poe +class OmadaClientFixedAddress: + """ + Describes a fixed IP address reservation for a client + """ + def __init__( + self, + network_id: Optional[str] = None, + ip_address: Optional[str] = None, + ): + self.network_id = network_id + self.ip_address = ip_address + +class OmadaClientSettings: + """ + Settings that can be applied to a client + """ + def __init__( + self, + name: Optional[str] = None, + lock_to_aps: Optional[list[str]] = None, + fixed_address: Optional[OmadaClientFixedAddress] = None, + + ): + self.name = name + self.lock_to_aps = lock_to_aps + self.fixed_address = fixed_address + class OmadaSiteClient: """Client for querying an Omada site's devices.""" @@ -132,6 +159,41 @@ async def get_client(self, mac_or_client: Union[str, OmadaNetworkClient]) -> Oma return OmadaWirelessClientDetails(result) else: return OmadaWiredClientDetails(result) + + async def update_client(self, mac_or_client: Union[str, OmadaNetworkClient], settings: OmadaClientSettings): + """Update configuration of a client""" + if isinstance(mac_or_client, OmadaConnectedClient): + mac = mac_or_client.mac + else: + mac = mac_or_client + + payload = {} + if settings.name: + payload["name"] = settings.name + if settings.lock_to_aps is not None: + payload["clientLockToApSetting"] = { + "enable": len(settings.lock_to_aps) > 0, + "aps": settings.lock_to_aps + } + if settings.fixed_address: + if settings.fixed_address.ip_address: + payload["ipSetting"] = { + "useFixedAddr": True, + "netId": settings.fixed_address.network_id, + "ip": settings.fixed_address.ip_address + } + else: + payload["ipSetting"] = { + "useFixedAddr": False + } + if payload == {}: + return await self.get_client(mac_or_client) + + result = await self._api.request("patch", self._api.format_url(f"clients/{mac}", self._site_id), json=payload) + if result.get("wireless"): + return OmadaWirelessClientDetails(result) + else: + return OmadaWiredClientDetails(result) async def get_connected_clients(self) -> AsyncIterable[OmadaConnectedClient]: """Get the clients connected to the site network.""" @@ -345,7 +407,7 @@ async def update_access_point_port( result = await self._api.request( "patch", self._api.format_url(f"eaps/{access_point.mac}", self._site_id), - payload=payload, + json=payload, ) updated_ap = OmadaAccessPoint(result) @@ -397,7 +459,7 @@ async def update_switch_port( await self._api.request( "patch", self._api.format_url(f"switches/{mac}/ports/{port.port}", self._site_id), - payload=payload, + json=payload, ) # Read back the new port settings @@ -451,7 +513,7 @@ async def start_firmware_upgrade( await self._api.request( "post", self._api.format_url(f"cmd/devices/{mac}/onlineUpgrade", self._site_id), - payload=payload, + json=payload, ) return True @@ -502,7 +564,7 @@ async def set_gateway_wan_port_connect_state(self, port_id: int, connect: bool, payload = {"portId": port_id, "operation": 1 if connect else 0} result = await self._api.request( - "post", self._api.format_url(f"cmd/gateways/{mac}/{'ipv6State' if ipv6 else 'internetState'}", self._site_id), payload=payload) + "post", self._api.format_url(f"cmd/gateways/{mac}/{'ipv6State' if ipv6 else 'internetState'}", self._site_id), json=payload) return OmadaGatewayPortStatus(result) async def set_gateway_port_settings(self, port_id: int, settings: GatewayPortSettings, mac_or_device: Union[str, OmadaDevice, None] = None) -> OmadaGatewayPortConfig: @@ -533,7 +595,7 @@ async def set_gateway_port_settings(self, port_id: int, settings: GatewayPortSet } await self._api.request( - "patch", self._api.format_url(gw.resource_path, self._site_id), payload=payload) + "patch", self._api.format_url(gw.resource_path, self._site_id), json=payload) # The result data includes an incomplete representation of the gateway port state, so we just request a new update return await self.get_gateway_port(port_id, mac) @@ -549,13 +611,13 @@ async def set_led_setting(self, mac_or_device: Union[str, OmadaDevice], setting: await self._api.request( "patch", self._api.format_url(device.resource_path, self._site_id), - payload=payload, + json=payload, ) return True async def set_client_name(self, mac_or_client: Union[str, OmadaNetworkClient], name): - """Sets the name of a client""" + """Sets the name of a client (Deprecated)""" if isinstance(mac_or_client, OmadaConnectedClient): mac = mac_or_client.mac else: @@ -564,7 +626,7 @@ async def set_client_name(self, mac_or_client: Union[str, OmadaNetworkClient], n await self._api.request( "patch", self._api.format_url(f"clients/{mac}", self._site_id), - payload=payload, + json=payload, ) return True