From 5cda1af8fd0126ce2195f6c2df316c73a6ae3e72 Mon Sep 17 00:00:00 2001 From: Sasha Romijn Date: Tue, 2 Nov 2021 21:40:20 +0100 Subject: [PATCH] Ref #575, #412 - Initial work on separate auth validator tests --- irrd/updates/tests/test_validators.py | 256 ++++++++++++++++++++++++++ irrd/updates/validators.py | 4 +- irrd/utils/rpsl_samples.py | 8 +- 3 files changed, 264 insertions(+), 4 deletions(-) create mode 100644 irrd/updates/tests/test_validators.py diff --git a/irrd/updates/tests/test_validators.py b/irrd/updates/tests/test_validators.py new file mode 100644 index 000000000..00c3d0332 --- /dev/null +++ b/irrd/updates/tests/test_validators.py @@ -0,0 +1,256 @@ +# flake8: noqa: W293 +import itertools +import textwrap +from unittest.mock import Mock + +import pytest +from passlib.handlers.md5_crypt import md5_crypt +from pytest import raises + +from irrd.conf import PASSWORD_HASH_DUMMY_VALUE +from irrd.rpki.status import RPKIStatus +from irrd.rpsl.rpsl_objects import rpsl_object_from_text +from irrd.scopefilter.status import ScopeFilterStatus +from irrd.scopefilter.validators import ScopeFilterValidator +from irrd.storage.models import JournalEntryOrigin +from irrd.utils.rpsl_samples import SAMPLE_INETNUM, SAMPLE_AS_SET, SAMPLE_MNTNER_CRYPT, SAMPLE_PERSON, SAMPLE_MNTNER, \ + SAMPLE_ROUTE, SAMPLE_MNTNER_MD5 +from irrd.utils.test_utils import flatten_mock_calls +from irrd.utils.text import splitline_unicodesafe, remove_auth_hashes +from ..parser import parse_change_requests +from ..parser_state import UpdateRequestType, UpdateRequestStatus +from ..validators import ReferenceValidator, AuthValidator, ValidatorResult + +VALID_PW = 'override-password' +INVALID_PW = 'not-override-password' +VALID_PW_HASH = '$1$J6KycItM$MbPaBU6iFSGFV299Rk7Di0' + + +@pytest.fixture() +def prepare_mocks(monkeypatch): + mock_dh = Mock() + mock_dq = Mock() + monkeypatch.setattr('irrd.updates.parser.RPSLDatabaseQuery', lambda: mock_dq) + monkeypatch.setattr('irrd.updates.validators.RPSLDatabaseQuery', lambda: mock_dq) + + validator = AuthValidator(mock_dh, None) + yield validator, mock_dq, mock_dh + + +class TestAuthValidatorOverride: + def test_valid_override(self, prepare_mocks, config_override): + config_override({ + 'auth': {'override_password': VALID_PW_HASH}, + }) + validator, mock_dq, mock_dh = prepare_mocks + person = rpsl_object_from_text(SAMPLE_PERSON) + + validator.overrides = [VALID_PW] + result = validator.process_auth(person, None) + assert result.is_valid(), result.error_messages + assert result.used_override + + person = rpsl_object_from_text(SAMPLE_PERSON) + result = validator.process_auth(person, person) + assert result.is_valid(), result.error_messages + assert result.used_override + + def test_invalid_or_missing_override(self, prepare_mocks, config_override): + # This test mostly ignores the regular process that happens + # after override validation fails. + validator, mock_dq, mock_dh = prepare_mocks + mock_dh.execute_query = lambda q: [] + person = rpsl_object_from_text(SAMPLE_PERSON) + + validator.overrides = [VALID_PW] + result = validator.process_auth(person, None) + assert not result.is_valid() + assert not result.used_override + + config_override({ + 'auth': {'override_password': VALID_PW_HASH}, + }) + validator.overrides = [] + result = validator.process_auth(person, None) + assert not result.is_valid() + assert not result.used_override + + validator.overrides = [INVALID_PW] + result = validator.process_auth(person, None) + assert not result.is_valid() + assert not result.used_override + + config_override({ + 'auth': {'override_password': 'not-valid-hash'}, + }) + person = rpsl_object_from_text(SAMPLE_PERSON) + result = validator.process_auth(person, None) + assert not result.is_valid() + assert not result.used_override + + +class TestAuthValidator: + def test_valid_new_person(self, prepare_mocks): + validator, mock_dq, mock_dh = prepare_mocks + person = rpsl_object_from_text(SAMPLE_PERSON) + mock_dh.execute_query = lambda q: [ + {'object_class': 'mntner', 'object_text': SAMPLE_MNTNER}, + ] + + validator.passwords = [SAMPLE_MNTNER_MD5] + result = validator.process_auth(person, None) + assert result.is_valid(), result.error_messages + assert not result.used_override + assert len(result.mntners_notify) == 1 + assert result.mntners_notify[0].pk() == 'TEST-MNT' + + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT'},), {}], + ] + + def test_existing_person_mntner_change(self, prepare_mocks): + validator, mock_dq, mock_dh = prepare_mocks + # TEST-MNT is in both maintainers + person_new = rpsl_object_from_text(SAMPLE_PERSON + 'mnt-by: TEST-NEW-MNT\n') + person_old = rpsl_object_from_text(SAMPLE_PERSON + 'mnt-by: TEST-OLD-MNT\n') + query_results = itertools.cycle([ + [ + { + 'object_class': 'mntner', + 'object_text': SAMPLE_MNTNER.replace('TEST-MNT', 'TEST-NEW-MNT').replace('MD5', 'nomd5') + }, { + 'object_class': 'mntner', + 'object_text': SAMPLE_MNTNER.replace('MD5', 'nomd5').replace('CRYPT', 'nocrypt') + }, + ], + [ + { + 'object_class': 'mntner', + 'object_text': SAMPLE_MNTNER.replace('TEST-MNT', 'TEST-OLD-MNT').replace('CRYPT', 'nocrypt') + }, + ], + ]) + mock_dh.execute_query = lambda q: next(query_results) + + validator.passwords = [SAMPLE_MNTNER_CRYPT, SAMPLE_MNTNER_MD5] + result = validator.process_auth(person_new, person_old) + + assert result.is_valid(), result.error_messages + assert not result.used_override + assert {m.pk() for m in result.mntners_notify} == {'TEST-MNT', 'TEST-OLD-MNT'} + + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT', 'TEST-NEW-MNT'},), {}], + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-OLD-MNT'},), {}], # TEST-MNT is cached + ] + + validator.passwords = [SAMPLE_MNTNER_MD5] + result = validator.process_auth(person_new, person_old) + assert not result.is_valid() + print(result.error_messages) + assert result.error_messages == {'Authorisation for person PERSON-TEST failed: ' + 'must by authenticated by one of: TEST-MNT, TEST-NEW-MNT'} + + validator.passwords = [SAMPLE_MNTNER_CRYPT] + result = validator.process_auth(person_new, person_old) + assert not result.is_valid() + assert result.error_messages == {'Authorisation for person PERSON-TEST failed: ' + 'must by authenticated by one of: TEST-MNT, TEST-OLD-MNT'} + + # TODO: the preapprove function needs a bunch of refactoring for this to work + # def test_valid_new_person_preapproved_mntner(self, prepare_mocks): + # validator, mock_dq, mock_dh = prepare_mocks + # person = rpsl_object_from_text(SAMPLE_PERSON) + # mock_dh.execute_query = lambda q: [ + # {'object_class': 'mntner', 'object_text': SAMPLE_MNTNER}, + # ] + + # result_mntner = parse_change_requests(SAMPLE_MNTNER + 'override: override-password', + # mock_dh, Mock(), Mock())[0] + # validator.pre_approve([result_mntner]) + + # result = validator.process_auth(person, None) + # assert result.is_valid(), result.error_messages + # assert not result.used_override + # assert len(result.mntners_notify) == 1 + # assert result.mntners_notify[0].pk() == 'TEST-MNT' + # assert not flatten_mock_calls(mock_dq, flatten_objects=True) + + def test_create_mntner_requires_override(self, prepare_mocks, config_override): + validator, mock_dq, mock_dh = prepare_mocks + mntner = rpsl_object_from_text(SAMPLE_MNTNER) + mock_dh.execute_query = lambda q: [ + {'object_class': 'mntner', 'object_text': SAMPLE_MNTNER}, + ] + + validator.passwords = [SAMPLE_MNTNER_MD5] + result = validator.process_auth(mntner, None) + assert not result.is_valid() + assert not result.used_override + assert result.error_messages == {'New mntner objects must be added by an administrator.'} + + assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ + ['sources', (['TEST'],), {}], + ['object_classes', (['mntner'],), {}], + ['rpsl_pks', ({'TEST-MNT', 'OTHER1-MNT', 'OTHER2-MNT'},), {}], + ] + + validator.overrides = [VALID_PW] + config_override({ + 'auth': {'override_password': VALID_PW_HASH}, + }) + + result = validator.process_auth(mntner, None) + assert result.is_valid(), result.error_messages + assert result.used_override + + def test_modify_mntner(self, prepare_mocks, config_override): + validator, mock_dq, mock_dh = prepare_mocks + mntner = rpsl_object_from_text(SAMPLE_MNTNER) + mock_dh.execute_query = lambda q: [ + {'object_class': 'mntner', 'object_text': SAMPLE_MNTNER}, + ] + + # This counts as submitting all new hashes. + validator.passwords = [SAMPLE_MNTNER_MD5] + result = validator.process_auth(mntner, mntner) + assert result.is_valid() + assert not result.info_messages + + # This counts as submitting all new hashes, but not matching any password + new_mntner = rpsl_object_from_text(SAMPLE_MNTNER.replace('CRYPT', '').replace('MD5', '')) + validator.passwords = [SAMPLE_MNTNER_MD5] + result = validator.process_auth(new_mntner, mntner) + assert not result.is_valid() + assert result.error_messages == { + 'Authorisation failed for the auth methods on this mntner object.' + } + + # This counts as submitting all dummy hashes. + mntner_no_auth_hashes = remove_auth_hashes(SAMPLE_MNTNER) + new_mntner = rpsl_object_from_text(mntner_no_auth_hashes) + result = validator.process_auth(new_mntner, mntner) + assert result.is_valid() + assert not new_mntner.has_dummy_auth_value() + assert result.info_messages == { + 'As you submitted dummy hash values, all password hashes on this ' + 'object were replaced with a new MD5-PW hash of the password you ' + 'provided for authentication.' + } + + # # This is a multi password submission which is rejected + validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT] + new_mntner = rpsl_object_from_text(mntner_no_auth_hashes) + result = validator.process_auth(new_mntner, mntner) + assert not result.is_valid() + assert not result.info_messages + assert result.error_messages == { + 'Object submitted with dummy hash values, but multiple or no passwords ' + 'submitted. Either submit only full hashes, or a single password.' + } diff --git a/irrd/updates/validators.py b/irrd/updates/validators.py index a28f238fc..303296ab6 100644 --- a/irrd/updates/validators.py +++ b/irrd/updates/validators.py @@ -146,6 +146,8 @@ class AuthValidator: def __init__(self, database_handler: DatabaseHandler, keycert_obj_pk=None) -> None: self.database_handler = database_handler + self.passwords = [] + self.overrides = [] self._mntner_db_cache: Set[RPSLMntner] = set() self._pre_approved: Set[str] = set() self.keycert_obj_pk = keycert_obj_pk @@ -252,7 +254,7 @@ def _check_mntners(self, mntner_pk_list: List[str], source: str) -> Tuple[bool, Returns True if at least one of the mntners in mntner_list passes authentication, given self.passwords and self.keycert_obj_pk. Updates and checks self._mntner_db_cache - to prevent double checking of maintainers. + to prevent double retrieval of maintainers. """ mntner_pk_set = set(mntner_pk_list) mntner_objs: List[RPSLMntner] = [ diff --git a/irrd/utils/rpsl_samples.py b/irrd/utils/rpsl_samples.py index eafcb764a..d7df1aa6a 100644 --- a/irrd/utils/rpsl_samples.py +++ b/irrd/utils/rpsl_samples.py @@ -160,8 +160,8 @@ 127.0.0.0/8^- , 169.254.0.0/16^- , # The next two lines were introduced to test blank lines with correct cont. chars 172.16.0.0/12^- , # Note that the trailing whitespace is significant. -\t -+ +\t ++ 192.0.2.0/24^- , 192.168.0.0/16^- , 197.0.0.0/8^- , @@ -225,7 +225,7 @@ owner: Sasha Romijn fingerpr: 8626 1D8D BEBD A4F5 4692 D64D A838 3BA7 80F2 38C6 certif: -----BEGIN PGP PUBLIC KEY BLOCK----- - + mQINBFnY7YoBEADH5ooPsoR9G/dNxrdHRMJeDHXCSQbwgXWEez5/F8/BZKV9occ/ certif: jZ7w2wH+Ghj4vTQl1DhuNcxi60qDv9DAPxG73DkBrK0I3fDUJUPrdOKW9SXvZCAq certif: LrVEdDVH+YEKhQLlGG7DTODGsfMglL98mn7GD/wD64LtRF3eBAucTIjaOl9hvoqX @@ -494,6 +494,8 @@ source: TEST """ +SAMPLE_MNTNER_MD5 = 'md5-password' +SAMPLE_MNTNER_CRYPT = 'crypt-password' SAMPLE_MNTNER = """mntner: TEST-MNT admin-c: PERSON-TEST notify: notify@example.net