Skip to content

Commit

Permalink
Merge pull request #2666 from stveit/poe-juniper
Browse files Browse the repository at this point in the history
Add PoE config for Juniper devices
  • Loading branch information
stveit authored Nov 15, 2023
2 parents 04daaaf + 9659a76 commit bb794f2
Show file tree
Hide file tree
Showing 7 changed files with 386 additions and 31 deletions.
20 changes: 17 additions & 3 deletions python/nav/portadmin/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ def set_poe_state(self, interface: manage.Interface, state: PoeState):
raise NotImplementedError

def get_poe_states(
self, interfaces: Sequence[manage.Interface] = None
) -> Dict[int, Optional[PoeState]]:
self, interfaces: Optional[Sequence[manage.Interface]] = None
) -> Dict[str, Optional[PoeState]]:
"""Retrieves current PoE state for interfaces on this device.
:param interfaces: Optional sequence of interfaces to filter for, as fetching
Expand All @@ -311,7 +311,7 @@ def get_poe_states(
the default behavior is to filter on all Interface objects
registered for this device.
:returns: A dict mapping interfaces to their discovered PoE state.
The key matches the `ifindex` attribute for the related
The key matches the `ifname` attribute for the related
Interface object.
The value will be None if the interface does not support PoE.
"""
Expand All @@ -338,3 +338,17 @@ class ProtocolError(ManagementError):
"""Raised when some non-categorized error in the underlying protocol occurred
during communication
"""


class POENotSupportedError(ManagementError):
"""Raised when an interface that does not support PoE is used in a context
where PoE support is expected
"""


class POEStateNotSupportedError(ManagementError):
"""Raised when a PoE state is detected in a context where it is not supported"""


class XMLParseError(ManagementError):
"""Raised when failing to parse XML"""
115 changes: 114 additions & 1 deletion python/nav/portadmin/napalm/juniper.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@
"""
from __future__ import annotations
from operator import attrgetter
from typing import List, Any, Dict, Tuple, Sequence
from typing import List, Any, Dict, Tuple, Sequence, Optional

from django.template.loader import get_template
from napalm.base.exceptions import ConnectAuthError, ConnectionException
from jnpr.junos.op.vlan import VlanTable
from jnpr.junos.exception import RpcError
from lxml.etree import ElementTree

from nav.napalm import connect as napalm_connect
from nav.enterprise.ids import VENDOR_ID_JUNIPER_NETWORKS_INC
Expand All @@ -44,6 +45,10 @@
AuthenticationError,
NoResponseError,
ProtocolError,
PoeState,
POEStateNotSupportedError,
POENotSupportedError,
XMLParseError,
)
from nav.junos.nav_views import (
EthernetSwitchingInterfaceTable,
Expand Down Expand Up @@ -102,6 +107,9 @@ class Juniper(ManagementHandler):

VENDOR = VENDOR_ID_JUNIPER_NETWORKS_INC
PROTOCOL = manage.ManagementProfile.PROTOCOL_NAPALM
POE_ENABLED = PoeState(state=1, name="ENABLED")
POE_DISABLED = PoeState(state=2, name="DISABLED")
POE_OPTIONS = [POE_ENABLED, POE_DISABLED]

def __init__(self, netbox: manage.Netbox, **kwargs):
super().__init__(netbox, **kwargs)
Expand Down Expand Up @@ -441,6 +449,111 @@ def raise_if_not_configurable(self):
if not self.profile:
raise DeviceNotConfigurableError("Device has no NAPALM profile")

def get_poe_state_options(self) -> Sequence[PoeState]:
"""Returns the available options for enabling/disabling PoE on this netbox"""
return self.POE_OPTIONS

@wrap_unhandled_rpc_errors
def set_poe_state(self, interface: manage.Interface, state: PoeState):
"""Set state for enabling/disabling PoE on this interface.
Available options should be retrieved using `get_poe_state_options`
"""
if not isinstance(state, PoeState):
raise TypeError("state must be a PoeState object")
if state == self.POE_ENABLED:
template = get_template("portadmin/junos-enable-poe.djt")
elif state == self.POE_DISABLED:
template = get_template("portadmin/junos-disable-poe.djt")
else:
raise POEStateNotSupportedError(f"state {state} is not a valid state")
master, _ = split_master_unit(interface.ifname)
config = template.render({"ifname": master})
self.device.load_merge_candidate(config=config)

def get_poe_states(
self, interfaces: Optional[Sequence[manage.Interface]] = None
) -> Dict[str, Optional[PoeState]]:
"""Retrieves current PoE state for interfaces on this device.
:param interfaces: Optional sequence of interfaces to filter for, as fetching
data for all interfaces may be a waste of time if only a
single interface is needed. If this parameter is omitted,
the default behavior is to filter on all Interface objects
registered for this device.
:returns: A dict mapping interfaces to their discovered PoE state.
The key matches the `ifname` attribute for the related
Interface object.
The value will be None if the interface does not support PoE.
"""
if not interfaces:
if self.netbox.interfaces:
interfaces = self.netbox.interfaces
else:
return {}
if len(interfaces) == 1:
interface = interfaces[0]
try:
state = self._get_single_poe_state(interface)
except POENotSupportedError:
state = None
return {interface.ifname: state}
else:
return self._get_poe_states_bulk(interfaces)

def _get_single_poe_state(self, interface: manage.Interface) -> PoeState:
tree = self._get_poe_interface_information(ifname=interface.ifname)
matching_elements = tree.xpath(
"//poe/interface-information-detail/interface-enabled-detail"
)
# Interfaces that do not support PoE will not have this element
if not matching_elements:
raise POENotSupportedError(
f"Interface {interface.ifname} does not support PoE"
)
if len(matching_elements) != 1:
raise XMLParseError(
f"Expected 1 matching element in xml response, "
f"{len(matching_elements)} found"
)
ifenabled = matching_elements[0].text.lower()
return self._poe_string_to_state(ifenabled)

def _get_poe_states_bulk(
self, interfaces: Sequence[manage.Interface]
) -> Dict[str, Optional[PoeState]]:
tree = self._get_all_poe_interface_information()
interface_information_elements = tree.findall(".//interface-information")
ifname_to_state_dict = {}
for element in interface_information_elements:
ifname = element.findall(".//interface-name")[0].text.strip().lower()
ifenabled = element.findall(".//interface-enabled")[0].text.strip().lower()
ifname_to_state_dict[ifname] = self._poe_string_to_state(ifenabled)
ifindex_to_state_dict = {
interface.ifname: ifname_to_state_dict.get(interface.ifname.lower())
for interface in interfaces
}
return ifindex_to_state_dict

@wrap_unhandled_rpc_errors
def _get_all_poe_interface_information(self) -> ElementTree:
return self.device.device.rpc.get_poe_interface_information()

@wrap_unhandled_rpc_errors
def _get_poe_interface_information(self, ifname: str) -> ElementTree:
return self.device.device.rpc.get_poe_interface_information(ifname=ifname)

def _poe_string_to_state(self, state_str: str) -> PoeState:
"""Converts from internal juniper state names to
corresponding PoeState objects
"""
state_cleaned = state_str.strip().lower()
if state_cleaned == "enabled":
return self.POE_ENABLED
elif state_cleaned == "disabled":
return self.POE_DISABLED
else:
raise POEStateNotSupportedError(f"Unknown PoE state {state_str}")

# FIXME Implement dot1x fetcher methods
# dot1x authentication configuration fetchers aren't implemented yet, for lack
# of configured devices to test on
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
set poe interface {{ ifname }} disable
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
delete poe interface {{ ifname }} disable
50 changes: 50 additions & 0 deletions tests/unittests/portadmin/napalm/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import pytest
from unittest.mock import Mock

from nav.enterprise.ids import VENDOR_ID_JUNIPER_NETWORKS_INC
from nav.models import manage
from nav.portadmin.napalm.juniper import Juniper


@pytest.fixture()
def netbox_mock(interface1_mock, interface2_mock):
"""Create netbox model mock object"""
netbox = Mock()
netbox.ip = '10.0.0.1'
netbox.type.get_enterprise_id.return_value = VENDOR_ID_JUNIPER_NETWORKS_INC
netbox.interfaces = [interface1_mock, interface2_mock]
yield netbox


@pytest.fixture()
def profile_mock():
"""Create management profile model mock object"""
profile = Mock()
profile.protocol = manage.ManagementProfile.PROTOCOL_NAPALM
profile.PROTOCOL_NAPALM = manage.ManagementProfile.PROTOCOL_NAPALM
profile.configuration = {"driver": "mock"}
yield profile


@pytest.fixture()
def handler_mock(netbox_mock, profile_mock):
"""Create management handler mock object"""
juniper = Juniper(netbox=netbox_mock)
juniper._profile = profile_mock
yield juniper


@pytest.fixture()
def interface1_mock():
interface = Mock()
interface.ifname = "ge-0/0/1"
interface.ifindex = 1
yield interface


@pytest.fixture()
def interface2_mock():
interface = Mock()
interface.ifname = "ge-0/0/2"
interface.ifindex = 2
yield interface
Loading

0 comments on commit bb794f2

Please sign in to comment.