Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PoE config for Juniper devices #2666

Merged
merged 25 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions python/nav/portadmin/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ def set_poe_state(self, interface: manage.Interface, state: PoeState):
raise NotImplementedError

def get_poe_states(
self, interfaces: Sequence[manage.Interface] = None
) -> Dict[int, Optional[PoeState]]:
self, interfaces: Optional[Sequence[manage.Interface]] = None
) -> Dict[str, Optional[PoeState]]:
"""Retrieves current PoE state for interfaces on this device.

:param interfaces: Optional sequence of interfaces to filter for, as fetching
Expand All @@ -311,7 +311,7 @@ def get_poe_states(
the default behavior is to filter on all Interface objects
registered for this device.
:returns: A dict mapping interfaces to their discovered PoE state.
The key matches the `ifindex` attribute for the related
The key matches the `ifname` attribute for the related
Interface object.
The value will be None if the interface does not support PoE.
"""
Expand All @@ -338,3 +338,17 @@ class ProtocolError(ManagementError):
"""Raised when some non-categorized error in the underlying protocol occurred
during communication
"""


class POENotSupportedError(ManagementError):
"""Raised when an interface that does not support PoE is used in a context
where PoE support is expected
"""


class POEStateNotSupportedError(ManagementError):
"""Raised when a PoE state is detected in a context where it is not supported"""


class XMLParseError(ManagementError):
"""Raised when failing to parse XML"""
115 changes: 114 additions & 1 deletion python/nav/portadmin/napalm/juniper.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@
"""
from __future__ import annotations
from operator import attrgetter
from typing import List, Any, Dict, Tuple, Sequence
from typing import List, Any, Dict, Tuple, Sequence, Optional

from django.template.loader import get_template
from napalm.base.exceptions import ConnectAuthError, ConnectionException
from jnpr.junos.op.vlan import VlanTable
from jnpr.junos.exception import RpcError
from lxml.etree import ElementTree

from nav.napalm import connect as napalm_connect
from nav.enterprise.ids import VENDOR_ID_JUNIPER_NETWORKS_INC
Expand All @@ -44,6 +45,10 @@
AuthenticationError,
NoResponseError,
ProtocolError,
PoeState,
POEStateNotSupportedError,
POENotSupportedError,
XMLParseError,
)
from nav.junos.nav_views import (
EthernetSwitchingInterfaceTable,
Expand Down Expand Up @@ -102,6 +107,9 @@

VENDOR = VENDOR_ID_JUNIPER_NETWORKS_INC
PROTOCOL = manage.ManagementProfile.PROTOCOL_NAPALM
POE_ENABLED = PoeState(state=1, name="ENABLED")
POE_DISABLED = PoeState(state=2, name="DISABLED")
POE_OPTIONS = [POE_ENABLED, POE_DISABLED]

def __init__(self, netbox: manage.Netbox, **kwargs):
super().__init__(netbox, **kwargs)
Expand Down Expand Up @@ -441,6 +449,111 @@
if not self.profile:
raise DeviceNotConfigurableError("Device has no NAPALM profile")

def get_poe_state_options(self) -> Sequence[PoeState]:
"""Returns the available options for enabling/disabling PoE on this netbox"""
return self.POE_OPTIONS

@wrap_unhandled_rpc_errors
def set_poe_state(self, interface: manage.Interface, state: PoeState):
"""Set state for enabling/disabling PoE on this interface.
Available options should be retrieved using `get_poe_state_options`
"""
if not isinstance(state, PoeState):
raise TypeError("state must be a PoeState object")
if state == self.POE_ENABLED:
template = get_template("portadmin/junos-enable-poe.djt")
elif state == self.POE_DISABLED:
template = get_template("portadmin/junos-disable-poe.djt")

Check warning on line 466 in python/nav/portadmin/napalm/juniper.py

View check run for this annotation

Codecov / codecov/patch

python/nav/portadmin/napalm/juniper.py#L461-L466

Added lines #L461 - L466 were not covered by tests
else:
raise POEStateNotSupportedError(f"state {state} is not a valid state")
master, _ = split_master_unit(interface.ifname)
config = template.render({"ifname": master})
self.device.load_merge_candidate(config=config)

Check warning on line 471 in python/nav/portadmin/napalm/juniper.py

View check run for this annotation

Codecov / codecov/patch

python/nav/portadmin/napalm/juniper.py#L468-L471

Added lines #L468 - L471 were not covered by tests

def get_poe_states(
self, interfaces: Optional[Sequence[manage.Interface]] = None
) -> Dict[str, Optional[PoeState]]:
"""Retrieves current PoE state for interfaces on this device.

:param interfaces: Optional sequence of interfaces to filter for, as fetching
data for all interfaces may be a waste of time if only a
single interface is needed. If this parameter is omitted,
the default behavior is to filter on all Interface objects
registered for this device.
:returns: A dict mapping interfaces to their discovered PoE state.
The key matches the `ifname` attribute for the related
Interface object.
The value will be None if the interface does not support PoE.
"""
if not interfaces:
if self.netbox.interfaces:
interfaces = self.netbox.interfaces
else:
return {}
if len(interfaces) == 1:
interface = interfaces[0]
try:
state = self._get_single_poe_state(interface)
except POENotSupportedError:
state = None
return {interface.ifname: state}
else:
return self._get_poe_states_bulk(interfaces)

def _get_single_poe_state(self, interface: manage.Interface) -> PoeState:
tree = self._get_poe_interface_information(ifname=interface.ifname)
matching_elements = tree.xpath(
"//poe/interface-information-detail/interface-enabled-detail"
)
# Interfaces that do not support PoE will not have this element
if not matching_elements:
raise POENotSupportedError(
f"Interface {interface.ifname} does not support PoE"
)
if len(matching_elements) != 1:
raise XMLParseError(
f"Expected 1 matching element in xml response, "
f"{len(matching_elements)} found"
)
ifenabled = matching_elements[0].text.lower()
return self._poe_string_to_state(ifenabled)

def _get_poe_states_bulk(
self, interfaces: Sequence[manage.Interface]
) -> Dict[str, Optional[PoeState]]:
tree = self._get_all_poe_interface_information()
interface_information_elements = tree.findall(".//interface-information")
ifname_to_state_dict = {}
for element in interface_information_elements:
ifname = element.findall(".//interface-name")[0].text.strip().lower()
ifenabled = element.findall(".//interface-enabled")[0].text.strip().lower()
ifname_to_state_dict[ifname] = self._poe_string_to_state(ifenabled)
ifindex_to_state_dict = {
interface.ifname: ifname_to_state_dict.get(interface.ifname.lower())
for interface in interfaces
}
return ifindex_to_state_dict

@wrap_unhandled_rpc_errors
def _get_all_poe_interface_information(self) -> ElementTree:
return self.device.device.rpc.get_poe_interface_information()

Check warning on line 539 in python/nav/portadmin/napalm/juniper.py

View check run for this annotation

Codecov / codecov/patch

python/nav/portadmin/napalm/juniper.py#L539

Added line #L539 was not covered by tests

@wrap_unhandled_rpc_errors
def _get_poe_interface_information(self, ifname: str) -> ElementTree:
return self.device.device.rpc.get_poe_interface_information(ifname=ifname)

Check warning on line 543 in python/nav/portadmin/napalm/juniper.py

View check run for this annotation

Codecov / codecov/patch

python/nav/portadmin/napalm/juniper.py#L543

Added line #L543 was not covered by tests

def _poe_string_to_state(self, state_str: str) -> PoeState:
"""Converts from internal juniper state names to
corresponding PoeState objects
"""
state_cleaned = state_str.strip().lower()
if state_cleaned == "enabled":
return self.POE_ENABLED
elif state_cleaned == "disabled":
return self.POE_DISABLED
else:
raise POEStateNotSupportedError(f"Unknown PoE state {state_str}")

# FIXME Implement dot1x fetcher methods
# dot1x authentication configuration fetchers aren't implemented yet, for lack
# of configured devices to test on
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
set poe interface {{ ifname }} disable
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
delete poe interface {{ ifname }} disable
50 changes: 50 additions & 0 deletions tests/unittests/portadmin/napalm/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import pytest
from unittest.mock import Mock

from nav.enterprise.ids import VENDOR_ID_JUNIPER_NETWORKS_INC
from nav.models import manage
from nav.portadmin.napalm.juniper import Juniper


@pytest.fixture()
def netbox_mock(interface1_mock, interface2_mock):
"""Create netbox model mock object"""
netbox = Mock()
netbox.ip = '10.0.0.1'
netbox.type.get_enterprise_id.return_value = VENDOR_ID_JUNIPER_NETWORKS_INC
netbox.interfaces = [interface1_mock, interface2_mock]
yield netbox


@pytest.fixture()
def profile_mock():
"""Create management profile model mock object"""
profile = Mock()
profile.protocol = manage.ManagementProfile.PROTOCOL_NAPALM
profile.PROTOCOL_NAPALM = manage.ManagementProfile.PROTOCOL_NAPALM
profile.configuration = {"driver": "mock"}
yield profile


@pytest.fixture()
def handler_mock(netbox_mock, profile_mock):
"""Create management handler mock object"""
juniper = Juniper(netbox=netbox_mock)
juniper._profile = profile_mock
yield juniper


@pytest.fixture()
def interface1_mock():
interface = Mock()
interface.ifname = "ge-0/0/1"
interface.ifindex = 1
yield interface


@pytest.fixture()
def interface2_mock():
interface = Mock()
interface.ifname = "ge-0/0/2"
interface.ifindex = 2
yield interface
Loading
Loading