diff --git a/python/nav/portadmin/handlers.py b/python/nav/portadmin/handlers.py
index 250342272d..7c74e559df 100644
--- a/python/nav/portadmin/handlers.py
+++ b/python/nav/portadmin/handlers.py
@@ -301,8 +301,8 @@ def set_poe_state(self, interface: manage.Interface, state: PoeState):
raise NotImplementedError
def get_poe_states(
- self, interfaces: Sequence[manage.Interface] = None
- ) -> Dict[int, Optional[PoeState]]:
+ self, interfaces: Optional[Sequence[manage.Interface]] = None
+ ) -> Dict[str, Optional[PoeState]]:
"""Retrieves current PoE state for interfaces on this device.
:param interfaces: Optional sequence of interfaces to filter for, as fetching
@@ -311,7 +311,7 @@ def get_poe_states(
the default behavior is to filter on all Interface objects
registered for this device.
:returns: A dict mapping interfaces to their discovered PoE state.
- The key matches the `ifindex` attribute for the related
+ The key matches the `ifname` attribute for the related
Interface object.
The value will be None if the interface does not support PoE.
"""
@@ -338,3 +338,17 @@ class ProtocolError(ManagementError):
"""Raised when some non-categorized error in the underlying protocol occurred
during communication
"""
+
+
+class POENotSupportedError(ManagementError):
+ """Raised when an interface that does not support PoE is used in a context
+ where PoE support is expected
+ """
+
+
+class POEStateNotSupportedError(ManagementError):
+ """Raised when a PoE state is detected in a context where it is not supported"""
+
+
+class XMLParseError(ManagementError):
+ """Raised when failing to parse XML"""
diff --git a/python/nav/portadmin/napalm/juniper.py b/python/nav/portadmin/napalm/juniper.py
index 455e510cba..bdf3648560 100644
--- a/python/nav/portadmin/napalm/juniper.py
+++ b/python/nav/portadmin/napalm/juniper.py
@@ -28,12 +28,13 @@
"""
from __future__ import annotations
from operator import attrgetter
-from typing import List, Any, Dict, Tuple, Sequence
+from typing import List, Any, Dict, Tuple, Sequence, Optional
from django.template.loader import get_template
from napalm.base.exceptions import ConnectAuthError, ConnectionException
from jnpr.junos.op.vlan import VlanTable
from jnpr.junos.exception import RpcError
+from lxml.etree import ElementTree
from nav.napalm import connect as napalm_connect
from nav.enterprise.ids import VENDOR_ID_JUNIPER_NETWORKS_INC
@@ -44,6 +45,10 @@
AuthenticationError,
NoResponseError,
ProtocolError,
+ PoeState,
+ POEStateNotSupportedError,
+ POENotSupportedError,
+ XMLParseError,
)
from nav.junos.nav_views import (
EthernetSwitchingInterfaceTable,
@@ -102,6 +107,9 @@ class Juniper(ManagementHandler):
VENDOR = VENDOR_ID_JUNIPER_NETWORKS_INC
PROTOCOL = manage.ManagementProfile.PROTOCOL_NAPALM
+ POE_ENABLED = PoeState(state=1, name="ENABLED")
+ POE_DISABLED = PoeState(state=2, name="DISABLED")
+ POE_OPTIONS = [POE_ENABLED, POE_DISABLED]
def __init__(self, netbox: manage.Netbox, **kwargs):
super().__init__(netbox, **kwargs)
@@ -441,6 +449,111 @@ def raise_if_not_configurable(self):
if not self.profile:
raise DeviceNotConfigurableError("Device has no NAPALM profile")
+ def get_poe_state_options(self) -> Sequence[PoeState]:
+ """Returns the available options for enabling/disabling PoE on this netbox"""
+ return self.POE_OPTIONS
+
+ @wrap_unhandled_rpc_errors
+ def set_poe_state(self, interface: manage.Interface, state: PoeState):
+ """Set state for enabling/disabling PoE on this interface.
+ Available options should be retrieved using `get_poe_state_options`
+ """
+ if not isinstance(state, PoeState):
+ raise TypeError("state must be a PoeState object")
+ if state == self.POE_ENABLED:
+ template = get_template("portadmin/junos-enable-poe.djt")
+ elif state == self.POE_DISABLED:
+ template = get_template("portadmin/junos-disable-poe.djt")
+ else:
+ raise POEStateNotSupportedError(f"state {state} is not a valid state")
+ master, _ = split_master_unit(interface.ifname)
+ config = template.render({"ifname": master})
+ self.device.load_merge_candidate(config=config)
+
+ def get_poe_states(
+ self, interfaces: Optional[Sequence[manage.Interface]] = None
+ ) -> Dict[str, Optional[PoeState]]:
+ """Retrieves current PoE state for interfaces on this device.
+
+ :param interfaces: Optional sequence of interfaces to filter for, as fetching
+ data for all interfaces may be a waste of time if only a
+ single interface is needed. If this parameter is omitted,
+ the default behavior is to filter on all Interface objects
+ registered for this device.
+ :returns: A dict mapping interfaces to their discovered PoE state.
+ The key matches the `ifname` attribute for the related
+ Interface object.
+ The value will be None if the interface does not support PoE.
+ """
+ if not interfaces:
+ if self.netbox.interfaces:
+ interfaces = self.netbox.interfaces
+ else:
+ return {}
+ if len(interfaces) == 1:
+ interface = interfaces[0]
+ try:
+ state = self._get_single_poe_state(interface)
+ except POENotSupportedError:
+ state = None
+ return {interface.ifname: state}
+ else:
+ return self._get_poe_states_bulk(interfaces)
+
+ def _get_single_poe_state(self, interface: manage.Interface) -> PoeState:
+ tree = self._get_poe_interface_information(ifname=interface.ifname)
+ matching_elements = tree.xpath(
+ "//poe/interface-information-detail/interface-enabled-detail"
+ )
+ # Interfaces that do not support PoE will not have this element
+ if not matching_elements:
+ raise POENotSupportedError(
+ f"Interface {interface.ifname} does not support PoE"
+ )
+ if len(matching_elements) != 1:
+ raise XMLParseError(
+ f"Expected 1 matching element in xml response, "
+ f"{len(matching_elements)} found"
+ )
+ ifenabled = matching_elements[0].text.lower()
+ return self._poe_string_to_state(ifenabled)
+
+ def _get_poe_states_bulk(
+ self, interfaces: Sequence[manage.Interface]
+ ) -> Dict[str, Optional[PoeState]]:
+ tree = self._get_all_poe_interface_information()
+ interface_information_elements = tree.findall(".//interface-information")
+ ifname_to_state_dict = {}
+ for element in interface_information_elements:
+ ifname = element.findall(".//interface-name")[0].text.strip().lower()
+ ifenabled = element.findall(".//interface-enabled")[0].text.strip().lower()
+ ifname_to_state_dict[ifname] = self._poe_string_to_state(ifenabled)
+ ifindex_to_state_dict = {
+ interface.ifname: ifname_to_state_dict.get(interface.ifname.lower())
+ for interface in interfaces
+ }
+ return ifindex_to_state_dict
+
+ @wrap_unhandled_rpc_errors
+ def _get_all_poe_interface_information(self) -> ElementTree:
+ return self.device.device.rpc.get_poe_interface_information()
+
+ @wrap_unhandled_rpc_errors
+ def _get_poe_interface_information(self, ifname: str) -> ElementTree:
+ return self.device.device.rpc.get_poe_interface_information(ifname=ifname)
+
+ def _poe_string_to_state(self, state_str: str) -> PoeState:
+ """Converts from internal juniper state names to
+ corresponding PoeState objects
+ """
+ state_cleaned = state_str.strip().lower()
+ if state_cleaned == "enabled":
+ return self.POE_ENABLED
+ elif state_cleaned == "disabled":
+ return self.POE_DISABLED
+ else:
+ raise POEStateNotSupportedError(f"Unknown PoE state {state_str}")
+
# FIXME Implement dot1x fetcher methods
# dot1x authentication configuration fetchers aren't implemented yet, for lack
# of configured devices to test on
diff --git a/python/nav/portadmin/napalm/templates/portadmin/junos-disable-poe.djt b/python/nav/portadmin/napalm/templates/portadmin/junos-disable-poe.djt
new file mode 100644
index 0000000000..da2379be33
--- /dev/null
+++ b/python/nav/portadmin/napalm/templates/portadmin/junos-disable-poe.djt
@@ -0,0 +1 @@
+set poe interface {{ ifname }} disable
diff --git a/python/nav/portadmin/napalm/templates/portadmin/junos-enable-poe.djt b/python/nav/portadmin/napalm/templates/portadmin/junos-enable-poe.djt
new file mode 100644
index 0000000000..20a8a9e08c
--- /dev/null
+++ b/python/nav/portadmin/napalm/templates/portadmin/junos-enable-poe.djt
@@ -0,0 +1 @@
+delete poe interface {{ ifname }} disable
diff --git a/tests/unittests/portadmin/napalm/conftest.py b/tests/unittests/portadmin/napalm/conftest.py
new file mode 100644
index 0000000000..fb0d7394e7
--- /dev/null
+++ b/tests/unittests/portadmin/napalm/conftest.py
@@ -0,0 +1,50 @@
+import pytest
+from unittest.mock import Mock
+
+from nav.enterprise.ids import VENDOR_ID_JUNIPER_NETWORKS_INC
+from nav.models import manage
+from nav.portadmin.napalm.juniper import Juniper
+
+
+@pytest.fixture()
+def netbox_mock(interface1_mock, interface2_mock):
+ """Create netbox model mock object"""
+ netbox = Mock()
+ netbox.ip = '10.0.0.1'
+ netbox.type.get_enterprise_id.return_value = VENDOR_ID_JUNIPER_NETWORKS_INC
+ netbox.interfaces = [interface1_mock, interface2_mock]
+ yield netbox
+
+
+@pytest.fixture()
+def profile_mock():
+ """Create management profile model mock object"""
+ profile = Mock()
+ profile.protocol = manage.ManagementProfile.PROTOCOL_NAPALM
+ profile.PROTOCOL_NAPALM = manage.ManagementProfile.PROTOCOL_NAPALM
+ profile.configuration = {"driver": "mock"}
+ yield profile
+
+
+@pytest.fixture()
+def handler_mock(netbox_mock, profile_mock):
+ """Create management handler mock object"""
+ juniper = Juniper(netbox=netbox_mock)
+ juniper._profile = profile_mock
+ yield juniper
+
+
+@pytest.fixture()
+def interface1_mock():
+ interface = Mock()
+ interface.ifname = "ge-0/0/1"
+ interface.ifindex = 1
+ yield interface
+
+
+@pytest.fixture()
+def interface2_mock():
+ interface = Mock()
+ interface.ifname = "ge-0/0/2"
+ interface.ifindex = 2
+ yield interface
diff --git a/tests/unittests/portadmin/napalm/juniper_poe_test.py b/tests/unittests/portadmin/napalm/juniper_poe_test.py
new file mode 100644
index 0000000000..5c38b7ea4c
--- /dev/null
+++ b/tests/unittests/portadmin/napalm/juniper_poe_test.py
@@ -0,0 +1,196 @@
+import pytest
+from unittest.mock import Mock
+
+from lxml import etree
+
+from nav.portadmin.handlers import (
+ POEStateNotSupportedError,
+ POENotSupportedError,
+ XMLParseError,
+)
+from nav.portadmin.napalm.juniper import Juniper
+
+
+def test_returns_correct_state_options(handler_mock):
+ state_options = handler_mock.get_poe_state_options()
+ assert Juniper.POE_ENABLED in state_options
+ assert Juniper.POE_DISABLED in state_options
+
+
+def test_state_converter_returns_correct_states(handler_mock):
+ assert handler_mock._poe_string_to_state("enabled") == Juniper.POE_ENABLED
+ assert handler_mock._poe_string_to_state("disabled") == Juniper.POE_DISABLED
+
+
+def test_state_converter_raises_error_for_invalid_states(handler_mock):
+ with pytest.raises(POEStateNotSupportedError):
+ handler_mock._poe_string_to_state("invalid_state")
+
+
+class TestGetPoeStates:
+ def test_interfaces_from_db_is_used_if_input_is_none(self, handler_mock, xml_bulk):
+ expected_interfaces = handler_mock.netbox.interfaces
+ handler_mock._get_all_poe_interface_information = Mock(return_value=xml_bulk)
+ return_dict = handler_mock.get_poe_states()
+ for interface in expected_interfaces:
+ assert interface.ifname in return_dict
+
+ def test_interfaces_from_db_is_used_if_input_is_empty(self, handler_mock, xml_bulk):
+ expected_interfaces = handler_mock.netbox.interfaces
+ handler_mock._get_all_poe_interface_information = Mock(return_value=xml_bulk)
+ return_dict = handler_mock.get_poe_states([])
+ for interface in expected_interfaces:
+ assert interface.ifname in return_dict
+
+ def test_returns_empty_dict_if_no_input_and_no_interfaces_in_db(
+ self, handler_mock, xml_bulk
+ ):
+ handler_mock.netbox.interfaces = []
+ return_dict = handler_mock.get_poe_states()
+ assert return_dict == {}
+
+ def test_returns_correct_state_if_input_has_one_interface(
+ self, handler_mock, xml, interface1_mock
+ ):
+ handler_mock._get_poe_interface_information = Mock(return_value=xml)
+ return_dict = handler_mock.get_poe_states([interface1_mock])
+ assert return_dict[interface1_mock.ifname] == handler_mock.POE_ENABLED
+
+ def test_returns_correct_states_if_input_has_multiple_interfaces(
+ self, handler_mock, xml_bulk, interface1_mock, interface2_mock
+ ):
+ handler_mock._get_all_poe_interface_information = Mock(return_value=xml_bulk)
+ return_dict = handler_mock.get_poe_states([interface1_mock, interface2_mock])
+ assert return_dict[interface1_mock.ifname] == Juniper.POE_ENABLED
+ assert return_dict[interface2_mock.ifname] == Juniper.POE_DISABLED
+
+ def test_returns_none_for_single_interface_that_does_not_support_poe(
+ self, handler_mock, interface1_mock
+ ):
+ handler_mock._get_single_poe_state = Mock(side_effect=POENotSupportedError)
+ return_dict = handler_mock.get_poe_states([interface1_mock])
+ assert return_dict[interface1_mock.ifname] is None
+
+ def test_returns_none_for_multiple_interfaces_that_does_not_support_poe(
+ self, handler_mock, interface1_mock, interface2_mock
+ ):
+ bulk_return_dict = {interface1_mock.ifname: None, interface2_mock.ifname: None}
+ handler_mock._get_poe_states_bulk = Mock(return_value=bulk_return_dict)
+ return_dict = handler_mock.get_poe_states([interface1_mock, interface2_mock])
+ assert return_dict[interface1_mock.ifname] is None
+ assert return_dict[interface2_mock.ifname] is None
+
+
+class TestGetSinglePoeState:
+ def test_returns_correct_state_for_interface_that_exists_in_xml_response(
+ self, handler_mock, xml, interface1_mock
+ ):
+ handler_mock._get_poe_interface_information = Mock(return_value=xml)
+ state = handler_mock._get_single_poe_state(interface1_mock)
+ assert state == Juniper.POE_ENABLED
+
+ def test_raises_exception_if_no_interfaces_in_xml(
+ self, handler_mock, interface1_mock, xml_empty
+ ):
+ handler_mock._get_poe_interface_information = Mock(return_value=xml_empty)
+ with pytest.raises(POENotSupportedError):
+ handler_mock._get_single_poe_state(interface1_mock)
+
+ def test_raises_exception_if_multiple_interfaces_in_xml(
+ self, handler_mock, interface1_mock, xml_bulk_wrong_format
+ ):
+ handler_mock._get_poe_interface_information = Mock(
+ return_value=xml_bulk_wrong_format
+ )
+ with pytest.raises(XMLParseError):
+ handler_mock._get_single_poe_state(interface1_mock)
+
+
+class TestGetPoeStatesBulk:
+ def test_returns_correct_states(
+ self, handler_mock, xml_bulk, interface1_mock, interface2_mock
+ ):
+ handler_mock._get_all_poe_interface_information = Mock(return_value=xml_bulk)
+ states = handler_mock._get_poe_states_bulk([interface1_mock, interface2_mock])
+ assert states[interface1_mock.ifname] == Juniper.POE_ENABLED
+ assert states[interface2_mock.ifname] == Juniper.POE_DISABLED
+
+ def test_maps_interface_to_none_if_poe_not_supported(self, handler_mock, xml_bulk):
+ handler_mock._get_all_poe_interface_information = Mock(return_value=xml_bulk)
+ if_mock = Mock()
+ if_mock.ifname == "random_if"
+ if_mock.ifindex = 0
+ states = handler_mock._get_poe_states_bulk([if_mock])
+ assert states[if_mock.ifname] is None
+
+ def test_returns_none_values_if_no_interfaces_in_xml(
+ self, handler_mock, interface1_mock, interface2_mock, xml_empty
+ ):
+ handler_mock._get_all_poe_interface_information = Mock(return_value=xml_empty)
+ return_dict = handler_mock._get_poe_states_bulk(
+ [interface1_mock, interface2_mock]
+ )
+ assert return_dict[interface1_mock.ifname] is None
+ assert return_dict[interface2_mock.ifname] is None
+
+
+@pytest.fixture()
+def xml(interface1_mock):
+ """Creates a ElementTree containing poe information for one interface"""
+ tree_string = f"""
+
+
+ {interface1_mock.ifname}
+ Enabled
+
+ """
+ tree = etree.fromstring(tree_string)
+ yield tree
+
+
+@pytest.fixture()
+def xml_bulk_wrong_format(interface1_mock, interface2_mock):
+ """Creates a ElementTree with the format meant for a single interface in the response,
+ but it contains poe information for two interfaces
+ """
+ tree_string = f"""
+
+
+ {interface1_mock.ifname}
+ Enabled
+
+
+ {interface2_mock.ifname}
+ Enabled
+
+ """
+ tree = etree.fromstring(tree_string)
+ yield tree
+
+
+@pytest.fixture()
+def xml_bulk(interface1_mock, interface2_mock):
+ """Creates a ElementTree containing poe information for two interfaces"""
+ tree_string = f"""
+
+
+ {interface1_mock.ifname}
+ Enabled
+
+
+ {interface2_mock.ifname}
+ Disabled
+
+ """
+ tree = etree.fromstring(tree_string)
+ yield tree
+
+
+@pytest.fixture()
+def xml_empty():
+ """Creates a ElementTree containing no poe state for any interface"""
+ tree_string = f"""
+
+ """
+ tree = etree.fromstring(tree_string)
+ yield tree
diff --git a/tests/unittests/portadmin/napalm/juniper_test.py b/tests/unittests/portadmin/napalm/juniper_test.py
index 4f3fa2a9c5..58c088edae 100644
--- a/tests/unittests/portadmin/napalm/juniper_test.py
+++ b/tests/unittests/portadmin/napalm/juniper_test.py
@@ -20,31 +20,14 @@
from jnpr.junos.exception import RpcError
-from nav.enterprise.ids import VENDOR_ID_RESERVED, VENDOR_ID_JUNIPER_NETWORKS_INC
-from nav.models import manage
-from nav.portadmin.handlers import DeviceNotConfigurableError, ProtocolError
+from nav.enterprise.ids import VENDOR_ID_RESERVED
+from nav.portadmin.handlers import (
+ DeviceNotConfigurableError,
+ ProtocolError,
+)
from nav.portadmin.napalm.juniper import wrap_unhandled_rpc_errors, Juniper
-@pytest.fixture()
-def netbox_mock():
- """Create netbox model mock object"""
- netbox = Mock()
- netbox.ip = '10.0.0.1'
- netbox.type.get_enterprise_id.return_value = VENDOR_ID_JUNIPER_NETWORKS_INC
- yield netbox
-
-
-@pytest.fixture()
-def profile_mock():
- """Create management profile model mock object"""
- profile = Mock()
- profile.protocol = manage.ManagementProfile.PROTOCOL_NAPALM
- profile.PROTOCOL_NAPALM = manage.ManagementProfile.PROTOCOL_NAPALM
- profile.configuration = {"driver": "mock"}
- yield profile
-
-
class TestWrapUnhandledRpcErrors:
def test_rpcerrors_should_become_protocolerrors(self):
@wrap_unhandled_rpc_errors
@@ -64,7 +47,7 @@ def wrapped_function():
class TestJuniper:
- def test_juniper_device_returns_device_connection(self, netbox_mock, profile_mock):
+ def test_juniper_device_returns_device_connection(self, handler_mock):
driver = napalm.get_network_driver('mock')
device = driver(
hostname='foo',
@@ -73,10 +56,7 @@ def test_juniper_device_returns_device_connection(self, netbox_mock, profile_moc
optional_args={},
)
device.open()
- juniper = Juniper(netbox=netbox_mock)
- juniper._profile = profile_mock
-
- assert juniper.device
+ assert handler_mock.device
def test_juniper_device_raises_error_if_vendor_not_juniper(
self, netbox_mock, profile_mock