Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate IP addresses in alert profiles #2649

57 changes: 51 additions & 6 deletions python/nav/web/alertprofiles/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@

# pylint: disable=R0903

from typing import Any, Dict

from django import forms
from django.db.models import Q

from crispy_forms.helper import FormHelper
from crispy_forms_foundation.layout import Layout, Row, Column, Field, Submit, HTML

from IPy import IP

from nav.alertengine.dispatchers.email_dispatcher import Email
from nav.alertengine.dispatchers.slack_dispatcher import Slack
from nav.alertengine.dispatchers.sms_dispatcher import Sms

from nav.models.profiles import MatchField, Filter, Expression, FilterGroup
from nav.models.profiles import Expression, Filter, FilterGroup, MatchField, Operator
from nav.models.profiles import AlertProfile, TimePeriod, AlertSubscription
from nav.models.profiles import AlertAddress, AlertSender
from nav.web.crispyforms import HelpField
Expand Down Expand Up @@ -536,8 +540,12 @@ class ExpressionForm(forms.ModelForm):
create expressions that can be used in a filter.
"""

filter = forms.IntegerField(widget=forms.widgets.HiddenInput)
match_field = forms.IntegerField(widget=forms.widgets.HiddenInput)
filter = forms.ModelChoiceField(
queryset=Filter.objects.all(), widget=forms.widgets.HiddenInput
)
match_field = forms.ModelChoiceField(
queryset=MatchField.objects.all(), widget=forms.widgets.HiddenInput
)
value = forms.CharField(required=True)

class Meta(object):
Expand All @@ -559,7 +567,7 @@ def __init__(self, *args, **kwargs):
# Values are selected from a multiple choice list.
# Populate that list with possible choices.

# MatcField stores which table and column alert engine should
# MatchField stores which table and column alert engine should
# watch, as well as a table and column for "friendly" names in
# the GUI and how we should sort the fields in the GUI (if we
# are displaying a list)
Expand Down Expand Up @@ -605,8 +613,8 @@ def __init__(self, *args, **kwargs):

choices = []
for obj in model_objects:
# ID is what is acctually used in the expression that will
# be evaluted by alert engine
# ID is what is actually used in the expression that will
# be evaluated by alert engine
ident = getattr(obj, attname)

if model == name_model:
Expand All @@ -625,3 +633,40 @@ def __init__(self, *args, **kwargs):

# At last we acctually add the multiple choice field.
self.fields['value'] = forms.MultipleChoiceField(choices=choices)

def clean(self) -> Dict[str, Any]:
validated_data = super().clean()

match_field = validated_data["match_field"]
operator_type = validated_data["operator"]
value = validated_data["value"]

if match_field.data_type == MatchField.IP:
if operator_type == Operator.IN:
ip_list = value.split()
else:
ip_list = [value]
validated_ip_addresses = []
for ip in ip_list:
try:
IP(ip)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I would advise against using IPy.IP for validation of strings, precisely because this library is extremely lenient in what is considered acceptable as input.

Case in point:

>>> from IPy import IP
>>> IP(42)
IP('0.0.0.42')
>>> IP('42')
IP('42.0.0.0')
>>> IP(23948234)
IP('1.109.107.202')
>> IP('100.200')
IP('100.200.0.0')
>>>

Please consider using nav.utils.is_valid_ip() instead (possibly with the strict flag set).

except ValueError:
self.add_error(
field="value",
error=forms.ValidationError(("Invalid IP address: %s" % ip)),
)
else:
validated_ip_addresses.append(str(ip))
# Bring ip address back into original format to be processed below
value = " ".join(validated_ip_addresses)

if operator_type == Operator.IN:
"""If input was a multiple choice list we have to join each option in one
string, where each option is separated by a | (pipe).
If input was a IP adress we should replace space with | (pipe)."""
if match_field.data_type == MatchField.IP:
validated_data["value"] = value.replace(' ', '|')
else:
validated_data["value"] = "|".join(value)

return validated_data
55 changes: 26 additions & 29 deletions python/nav/web/alertprofiles/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
# TODO Filter/filter_groups have owners, check that the account that performs
# the operation is the owner

from django.http import HttpResponseRedirect
from django.http import HttpResponseRedirect, QueryDict
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q
from django.shortcuts import render
Expand Down Expand Up @@ -1513,7 +1513,7 @@ def filter_remove(request):

@requires_post('alertprofiles-filters', ('id', 'matchfield'))
def filter_addexpression(request):
"""Shows the form to add en expression to a filter"""
"""Shows the form to add an expression to a filter"""
try:
filtr = Filter.objects.get(pk=request.POST.get('id'))
except Filter.DoesNotExist:
Expand Down Expand Up @@ -1563,40 +1563,37 @@ def filter_addexpression(request):
@requires_post('alertprofiles-filters')
def filter_saveexpression(request):
"""Saves an expression to a filter"""
# Get the MatchField, Filter and Operator objects associated with the
# input POST-data
filtr = Filter.objects.get(pk=request.POST.get('filter'))
type_ = request.POST.get('operator')
match_field = MatchField.objects.get(pk=request.POST.get('match_field'))
operator = Operator.objects.get(type=type_, match_field=match_field.pk)
if request.POST.get('id'):
existing_expression = Expression.objects.get(pk=request.POST.get('id'))
form = ExpressionForm(request.POST, instance=existing_expression)
else:
form = ExpressionForm(request.POST)

if not form.is_valid():
dictionary = {
'id': str(form.cleaned_data["filter"].pk),
'matchfield': str(form.cleaned_data["match_field"].pk),
}
qdict = QueryDict("", mutable=True)
qdict.update(dictionary)
request.POST = qdict
new_message(
request,
form.errors,
Messages.ERROR,
)

return filter_addexpression(request=request)

filtr = form.cleaned_data['filter']

if not account_owns_filters(get_account(request), filtr):
return alertprofiles_response_forbidden(
request, _('You do not own this filter.')
)

# Get the value
if operator.type == Operator.IN:
# If input was a multiple choice list we have to join each option
# in one string, where each option is separated by a | (pipe).
# If input was a IP adress we should replace space with | (pipe).
# FIXME We might want some data checks here
if match_field.data_type == MatchField.IP:
# FIXME We might want to check that it is a valid IP adress.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha, I love the fact that there was a FIXME comment here from before 😆

# If we do so, we need to remember both IPv4 and IPv6
value = request.POST.get('value').replace(' ', '|')
else:
value = "|".join([value for value in request.POST.getlist('value')])
else:
value = request.POST.get('value')
form.save()

expression = Expression(
filter=filtr,
match_field=match_field,
operator=operator.type,
value=value,
)
expression.save()
new_message(
request,
_('Added expression to filter %(name)s') % {'name': filtr.name},
Expand Down
153 changes: 153 additions & 0 deletions tests/integration/web/alertprofiles_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@
from django.test.client import RequestFactory
from django.urls import reverse
from nav.compatibility import smart_str

from nav.models.profiles import (
Account,
AlertAddress,
AlertPreference,
AlertProfile,
AlertSender,
AlertType,
Expression,
Filter,
MatchField,
Operator,
)
from nav.web.alertprofiles.views import set_active_profile

Expand Down Expand Up @@ -162,6 +168,146 @@ def test_alertprofiles_add_public_filter_should_succeed(client):
assert response.status_code == 200


def test_alertprofiles_add_expression_with_valid_ipv4_address_should_succeed(
client, dummy_filter
):
"""Tests that an expression with a valid IPv4 address can be added"""
ip_match_field = MatchField.objects.get(data_type=MatchField.IP)
url = reverse("alertprofiles-filters-saveexpression")
data = {
"filter": dummy_filter.pk,
"match_field": ip_match_field.pk,
"operator": Operator.EQUALS,
"value": "172.0.0.1",
}
response = client.post(url, data=data, follow=True)
assert response.status_code == 200
assert Expression.objects.filter(
filter=dummy_filter,
match_field=ip_match_field,
operator=Operator.EQUALS,
value=data["value"],
).exists()
assert f"Added expression to filter {dummy_filter}" in smart_str(response.content)


def test_alertprofiles_add_expression_with_valid_ipv6_address_should_succeed(
client, dummy_filter
):
"""Tests that an expression with a valid IPv6 address can be added"""
url = reverse("alertprofiles-filters-saveexpression")
ip_match_field = MatchField.objects.get(data_type=MatchField.IP)
data = {
"filter": dummy_filter.pk,
"match_field": ip_match_field.pk,
"operator": Operator.EQUALS,
"value": "2001:db8:3333:4444:5555:6666:7777:8888",
}
response = client.post(url, data=data, follow=True)
assert response.status_code == 200
assert Expression.objects.filter(
filter=dummy_filter,
match_field=ip_match_field,
operator=Operator.EQUALS,
value=data["value"],
).exists()
assert f"Added expression to filter {dummy_filter}" in smart_str(response.content)


def test_alertprofiles_add_expression_with_non_valid_ip_address_should_fail(
client, dummy_filter
):
"""Tests that an expression with a not valid IP address cannot be added"""
ip_match_field = MatchField.objects.get(data_type=MatchField.IP)
url = reverse("alertprofiles-filters-saveexpression")
data = {
"filter": dummy_filter.pk,
"match_field": ip_match_field.pk,
"operator": Operator.EQUALS,
"value": "wrong",
}
response = client.post(url, data=data, follow=True)
assert response.status_code == 200
assert not Expression.objects.filter(
filter=dummy_filter,
match_field=ip_match_field,
operator=Operator.EQUALS,
value=data["value"],
).exists()
assert f"Invalid IP address: {data['value']}" in smart_str(response.content)


def test_alertprofiles_add_expression_with_multiple_valid_ip_addresses_should_succeed(
client, dummy_filter
):
"""Tests that an expression with multiple valid IP addresses can be added"""
ip_match_field = MatchField.objects.get(data_type=MatchField.IP)
url = reverse("alertprofiles-filters-saveexpression")
data = {
"filter": dummy_filter.pk,
"match_field": ip_match_field.pk,
"operator": Operator.IN,
"value": "172.0.0.1 2001:db8:3333:4444:5555:6666:7777:8888",
}
response = client.post(url, data=data, follow=True)
assert response.status_code == 200
assert Expression.objects.filter(
filter=dummy_filter,
match_field=ip_match_field,
operator=Operator.IN,
value=data["value"].replace(' ', '|'),
).exists()
assert f"Added expression to filter {dummy_filter}" in smart_str(response.content)


def test_alertprofiles_add_expression_with_multiple_non_valid_ip_addresses_should_fail(
client, dummy_filter
):
"""Tests that an expression with a not valid IP address cannot be added"""
ip_match_field = MatchField.objects.get(data_type=MatchField.IP)
valid_ip = "172.0.0.1"
invalid_ip = "wrong"
url = reverse("alertprofiles-filters-saveexpression")
data = {
"filter": dummy_filter.pk,
"match_field": ip_match_field.pk,
"operator": Operator.IN,
"value": f"{valid_ip} {invalid_ip}",
}
response = client.post(url, data=data, follow=True)
assert response.status_code == 200
assert not Expression.objects.filter(
filter=dummy_filter,
match_field=ip_match_field,
operator=Operator.IN,
value=data["value"],
).exists()
assert f"Invalid IP address: {invalid_ip}" in smart_str(response.content)


def test_alertprofiles_add_expression_with_multiple_alert_types_should_succeed(
client, dummy_filter
):
"""Tests that an expression with multiple alert types can be added"""
string_match_field = MatchField.objects.get(name="Alert type")
url = reverse("alertprofiles-filters-saveexpression")
data = {
"filter": dummy_filter.pk,
"match_field": string_match_field.pk,
"operator": Operator.IN,
"value": ["apDown", "apUp"],
}
response = client.post(url, data=data, follow=True)
assert response.status_code == 200
assert Expression.objects.filter(
filter=dummy_filter,
match_field=string_match_field,
operator=Operator.IN,
value="|".join(data["value"]),
).exists()
assert f"Added expression to filter {dummy_filter}" in smart_str(response.content)


def test_set_accountgroup_permissions_should_not_crash(db, client):
"""Regression test for #2281"""
url = reverse('alertprofiles-permissions-save')
Expand Down Expand Up @@ -252,3 +398,10 @@ def activated_dummy_profile(dummy_profile):
)
preference.save()
return dummy_profile


@pytest.fixture(scope="function")
def dummy_filter():
filtr = Filter(name="dummy", owner=Account.objects.get(id=Account.ADMIN_ACCOUNT))
filtr.save()
return filtr
Loading