From f67f1d439531ce36a0c448821a8f363769634a8b Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Mon, 21 Oct 2024 11:32:32 -0500 Subject: [PATCH 01/12] encapsulate group processing --- st2auth_ldap/ldap_backend.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/st2auth_ldap/ldap_backend.py b/st2auth_ldap/ldap_backend.py index e40a671..b258bf6 100644 --- a/st2auth_ldap/ldap_backend.py +++ b/st2auth_ldap/ldap_backend.py @@ -176,11 +176,8 @@ def authenticate(self, username, password): username=username) # Assume group entries are not case sensitive. - user_groups = set([entry.lower() for entry in user_groups]) - required_groups = set([entry.lower() for entry in self._group_dns]) - result = self._verify_user_group_membership(username=username, - required_groups=required_groups, + required_groups=self._group_dns, user_groups=user_groups, check_behavior=self._group_dns_check) if not result: @@ -377,6 +374,9 @@ def _verify_user_group_membership(self, username, required_groups, user_groups, in the config (and / or). """ + user_groups = set([entry.lower() for entry in user_groups]) + required_groups = set([entry.lower() for entry in required_groups]) + if check_behavior == 'and': additional_msg = ('user needs to be member of all the following groups "%s" for ' 'authentication to succeeed') From 7c9df179ea3ba23ec08caf20735f02fae9cd0702 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Mon, 21 Oct 2024 11:38:05 -0500 Subject: [PATCH 02/12] combine multiple if statements --- st2auth_ldap/ldap_backend.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/st2auth_ldap/ldap_backend.py b/st2auth_ldap/ldap_backend.py index b258bf6..41e4896 100644 --- a/st2auth_ldap/ldap_backend.py +++ b/st2auth_ldap/ldap_backend.py @@ -389,12 +389,13 @@ def _verify_user_group_membership(self, username, required_groups, user_groups, LOG.debug('Verifying user group membership using "%s" behavior (%s)' % (check_behavior, additional_msg)) - if check_behavior == 'and': - if required_groups.issubset(user_groups): - return True - elif check_behavior == 'or': - if required_groups.intersection(user_groups): - return True + # simple fully qualified group name match + if ( + check_behavior == 'and' and required_groups.issubset(user_groups) + ) or ( + check_behavior == 'or' and required_groups.intersection(user_groups) + ): + return True msg = ('Unable to verify membership for user "%s (required_groups=%s,' 'actual_groups=%s,check_behavior=%s)".' % (username, str(required_groups), From 78fbe174959f7b88b9c6168823b9deb199001e2a Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Mon, 21 Oct 2024 12:30:56 -0500 Subject: [PATCH 03/12] more robust group DN normalization --- st2auth_ldap/ldap_backend.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/st2auth_ldap/ldap_backend.py b/st2auth_ldap/ldap_backend.py index 41e4896..2c663ae 100644 --- a/st2auth_ldap/ldap_backend.py +++ b/st2auth_ldap/ldap_backend.py @@ -21,6 +21,7 @@ import logging import ldap +import ldap.dn import ldap.filter import ldapurl @@ -374,8 +375,17 @@ def _verify_user_group_membership(self, username, required_groups, user_groups, in the config (and / or). """ - user_groups = set([entry.lower() for entry in user_groups]) - required_groups = set([entry.lower() for entry in required_groups]) + # normalize DN strings (lowercase, remove spaces, etc) + user_group_dns = [ + ldap.dn.str2dn(group.lower(), flags=ldap.DN_FORMAT_LDAPV3) + for group in user_groups + ] + required_group_dns = [ + ldap.dn.str2dn(group.lower(), flags=ldap.DN_FORMAT_LDAPV3) + for group in required_groups + ] + norm_user_groups = {ldap.dn.dn2str(dn) for dn in user_group_dns} + norm_required_groups = {ldap.dn.dn2str(dn) for dn in required_group_dns} if check_behavior == 'and': additional_msg = ('user needs to be member of all the following groups "%s" for ' @@ -389,11 +399,13 @@ def _verify_user_group_membership(self, username, required_groups, user_groups, LOG.debug('Verifying user group membership using "%s" behavior (%s)' % (check_behavior, additional_msg)) - # simple fully qualified group name match + # simple fully qualified DN match if ( - check_behavior == 'and' and required_groups.issubset(user_groups) + check_behavior == 'and' + and norm_required_groups.issubset(norm_user_groups) ) or ( - check_behavior == 'or' and required_groups.intersection(user_groups) + check_behavior == 'or' + and norm_required_groups.intersection(norm_user_groups) ): return True From d468a9f874768387e2c924ed433e04049d799c34 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Mon, 21 Oct 2024 14:42:26 -0500 Subject: [PATCH 04/12] fmt some log lines --- st2auth_ldap/ldap_backend.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/st2auth_ldap/ldap_backend.py b/st2auth_ldap/ldap_backend.py index 2c663ae..263cd39 100644 --- a/st2auth_ldap/ldap_backend.py +++ b/st2auth_ldap/ldap_backend.py @@ -394,10 +394,11 @@ def _verify_user_group_membership(self, username, required_groups, user_groups, additional_msg = ('user needs to be member of one or more of the following groups "%s" ' 'for authentication to succeeed') - additional_msg = additional_msg % (str(list(required_groups))) + additional_msg = additional_msg % (str(list(sorted(required_groups)))) - LOG.debug('Verifying user group membership using "%s" behavior (%s)' % - (check_behavior, additional_msg)) + LOG.debug( + f'Verifying user group membership using "{check_behavior}" behavior ({additional_msg})' + ) # simple fully qualified DN match if ( @@ -409,10 +410,12 @@ def _verify_user_group_membership(self, username, required_groups, user_groups, ): return True - msg = ('Unable to verify membership for user "%s (required_groups=%s,' - 'actual_groups=%s,check_behavior=%s)".' % (username, str(required_groups), - str(user_groups), check_behavior)) - LOG.exception(msg) + LOG.exception( + f'Unable to verify membership for user "{username" ' + f"(required_groups={list(sorted(required_groups))}," + f"actual_groups={list(sorted(norm_user_groups))}," + f"check_behavior={check_behavior})" + ) # Final safe guard return False From 4eb38d46100b06715f2042308270631adebbeab5 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Mon, 21 Oct 2024 14:44:01 -0500 Subject: [PATCH 05/12] add _normalize_group_dns method --- st2auth_ldap/ldap_backend.py | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/st2auth_ldap/ldap_backend.py b/st2auth_ldap/ldap_backend.py index 263cd39..78ef700 100644 --- a/st2auth_ldap/ldap_backend.py +++ b/st2auth_ldap/ldap_backend.py @@ -133,7 +133,7 @@ def __init__(self, bind_dn, bind_password, base_ou, group_dns, host, port=389, '%s.' % (group_dns_check, valid_values)) self._group_dns_check = group_dns_check - self._group_dns = group_dns + self._group_dns = self._normalize_group_dns(group_dns) self._cache_user_groups_response = cache_user_groups_response self._cache_user_groups_cache_ttl = int(cache_user_groups_cache_ttl) @@ -368,24 +368,28 @@ def _get_groups_for_user(self, connection, user_dn, username): return groups - def _verify_user_group_membership(self, username, required_groups, user_groups, - check_behavior='and'): + @staticmethod + def _normalize_group_dns(groups: List[str]) -> Set[str]: + """Normalize group DNs by lowercasing and parsing to strip spaces.""" + return { + ldap.dn.dn2str( + ldap.dn.str2dn(group.lower(), flags=ldap.DN_FORMAT_LDAPV3) + ) for group in groups + } + + def _verify_user_group_membership( + self, + username: str, + required_groups: Set[str], + user_groups: List[str], + check_behavior: str = "and", + ): """ Validate that the user is a member of required groups based on the check behavior defined in the config (and / or). """ - # normalize DN strings (lowercase, remove spaces, etc) - user_group_dns = [ - ldap.dn.str2dn(group.lower(), flags=ldap.DN_FORMAT_LDAPV3) - for group in user_groups - ] - required_group_dns = [ - ldap.dn.str2dn(group.lower(), flags=ldap.DN_FORMAT_LDAPV3) - for group in required_groups - ] - norm_user_groups = {ldap.dn.dn2str(dn) for dn in user_group_dns} - norm_required_groups = {ldap.dn.dn2str(dn) for dn in required_group_dns} + norm_user_groups = self._normalize_group_dns(user_groups) if check_behavior == 'and': additional_msg = ('user needs to be member of all the following groups "%s" for ' @@ -403,10 +407,10 @@ def _verify_user_group_membership(self, username, required_groups, user_groups, # simple fully qualified DN match if ( check_behavior == 'and' - and norm_required_groups.issubset(norm_user_groups) + and required_groups.issubset(norm_user_groups) ) or ( check_behavior == 'or' - and norm_required_groups.intersection(norm_user_groups) + and required_groups.intersection(norm_user_groups) ): return True From 227241fb22be9ec20526bca8a05c04606274044f Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Tue, 22 Oct 2024 10:45:36 -0500 Subject: [PATCH 06/12] add support for RDN matching of group names --- st2auth_ldap/ldap_backend.py | 77 +++++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 24 deletions(-) diff --git a/st2auth_ldap/ldap_backend.py b/st2auth_ldap/ldap_backend.py index 78ef700..4b27587 100644 --- a/st2auth_ldap/ldap_backend.py +++ b/st2auth_ldap/ldap_backend.py @@ -20,6 +20,8 @@ import os import logging +from typing import List, Set, Tuple + import ldap import ldap.dn import ldap.filter @@ -134,6 +136,7 @@ def __init__(self, bind_dn, bind_password, base_ou, group_dns, host, port=389, self._group_dns_check = group_dns_check self._group_dns = self._normalize_group_dns(group_dns) + self._group_dns_are_fqdns = all(len(dn) > 1 for dn in self._group_dns) self._cache_user_groups_response = cache_user_groups_response self._cache_user_groups_cache_ttl = int(cache_user_groups_cache_ttl) @@ -175,12 +178,12 @@ def authenticate(self, username, password): try: user_groups = self._get_groups_for_user(connection=connection, user_dn=user_dn, username=username) + user_group_dns = self._normalize_group_dns(user_groups) - # Assume group entries are not case sensitive. - result = self._verify_user_group_membership(username=username, - required_groups=self._group_dns, - user_groups=user_groups, - check_behavior=self._group_dns_check) + result = self._verify_user_group_membership( + username=username, + user_group_dns=user_group_dns, + ) if not result: return False except Exception: @@ -369,27 +372,33 @@ def _get_groups_for_user(self, connection, user_dn, username): return groups @staticmethod - def _normalize_group_dns(groups: List[str]) -> Set[str]: - """Normalize group DNs by lowercasing and parsing to strip spaces.""" + def _normalize_group_dns(groups: List[str]) -> Set[Tuple[str]]: + """Normalize group DNs by lowercasing and parsing into RDN parts. + + Assumes group entries are not case sensitive. + Returns a set of tuples of "type=value" strings. + """ return { - ldap.dn.dn2str( - ldap.dn.str2dn(group.lower(), flags=ldap.DN_FORMAT_LDAPV3) - ) for group in groups + tuple(ldap.dn.explode_dn(group.lower(), flags=ldap.DN_FORMAT_LDAPV3)) + for group in groups } + @staticmethod + def _str_dns(dns: Set[Tuple[str]]) -> str: + return str(list(','.join(dn) for dn in sorted(dns))) + def _verify_user_group_membership( self, username: str, - required_groups: Set[str], - user_groups: List[str], - check_behavior: str = "and", + user_group_dns: Set[Tuple[str]], ): """ Validate that the user is a member of required groups based on the check behavior defined in the config (and / or). """ - - norm_user_groups = self._normalize_group_dns(user_groups) + required_group_dns = self._group_dns + check_behavior = self._group_dns_check # default: "and" + use_fqdns = self._group_dns_are_fqdns if check_behavior == 'and': additional_msg = ('user needs to be member of all the following groups "%s" for ' @@ -398,26 +407,46 @@ def _verify_user_group_membership( additional_msg = ('user needs to be member of one or more of the following groups "%s" ' 'for authentication to succeeed') - additional_msg = additional_msg % (str(list(sorted(required_groups)))) + additional_msg = additional_msg % self._str_dns(required_group_dns) LOG.debug( f'Verifying user group membership using "{check_behavior}" behavior ({additional_msg})' ) - # simple fully qualified DN match if ( - check_behavior == 'and' - and required_groups.issubset(norm_user_groups) + use_fqdns + and check_behavior == 'and' + and required_group_dns.issubset(norm_user_groups) ) or ( - check_behavior == 'or' - and required_groups.intersection(norm_user_groups) + use_fqdns + and check_behavior == 'or' + and required_group_dns.intersection(norm_user_groups) ): + # simple fully qualified DN(s) matched return True + elif not use_fqdns: + user_group_rdns = { + (group_dn[0],) for group_dn in user_group_dns + } + #need to check each required DN for RDN + for group_dn in required_group_dns: + has_group = False + if len(group_dn) == 1: + if group_dn in user_group_rdns: + has_group = True + else: + if group_dn in user_group_dns: + has_group = True + if check_behavior == 'or' and has_group: + return True + if check_behavior == 'and' and not has_group: + # missing a required group, no need to check more groups. + break LOG.exception( - f'Unable to verify membership for user "{username" ' - f"(required_groups={list(sorted(required_groups))}," - f"actual_groups={list(sorted(norm_user_groups))}," + f'Unable to verify membership for user "{username}" ' + f"(required_groups={self._str_dns(required_group_dns)}," + f"actual_groups={self._str_dns(user_group_dns)}," f"check_behavior={check_behavior})" ) From cf29b6fb828376c75997d4aaf8649adf9873e8d3 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Tue, 22 Oct 2024 10:58:58 -0500 Subject: [PATCH 07/12] fmt --- st2auth_ldap/ldap_backend.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/st2auth_ldap/ldap_backend.py b/st2auth_ldap/ldap_backend.py index 4b27587..103e215 100644 --- a/st2auth_ldap/ldap_backend.py +++ b/st2auth_ldap/ldap_backend.py @@ -397,7 +397,7 @@ def _verify_user_group_membership( in the config (and / or). """ required_group_dns = self._group_dns - check_behavior = self._group_dns_check # default: "and" + check_behavior = self._group_dns_check # default: "and" use_fqdns = self._group_dns_are_fqdns if check_behavior == 'and': @@ -414,21 +414,19 @@ def _verify_user_group_membership( ) if ( - use_fqdns - and check_behavior == 'and' - and required_group_dns.issubset(norm_user_groups) + use_fqdns and + check_behavior == 'and' and + required_group_dns.issubset(user_group_dns) ) or ( - use_fqdns - and check_behavior == 'or' - and required_group_dns.intersection(norm_user_groups) + use_fqdns and + check_behavior == 'or' and + required_group_dns.intersection(user_group_dns) ): # simple fully qualified DN(s) matched return True elif not use_fqdns: - user_group_rdns = { - (group_dn[0],) for group_dn in user_group_dns - } - #need to check each required DN for RDN + user_group_rdns = {(group_dn[0],) for group_dn in user_group_dns} + # need to check each required DN for RDN for group_dn in required_group_dns: has_group = False if len(group_dn) == 1: From dee1a56cd0b7624445728a1d115ed6ae9381085b Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Wed, 23 Oct 2024 14:22:23 -0500 Subject: [PATCH 08/12] Add parametrized test cases that use RDN instead of FQDN --- tests/unit/test_backend.py | 675 ++++++++++++++++++++++++------------- 1 file changed, 450 insertions(+), 225 deletions(-) diff --git a/tests/unit/test_backend.py b/tests/unit/test_backend.py index eda60cf..4be0d68 100644 --- a/tests/unit/test_backend.py +++ b/tests/unit/test_backend.py @@ -16,6 +16,7 @@ import os import time import uuid +from typing import List import ldap import pytest @@ -30,6 +31,11 @@ LDAP_BIND_DN = "cn=Administrator,cn=users,dc=stackstorm,dc=net" LDAP_BIND_PASSWORD = uuid.uuid4().hex LDAP_GROUP_DNS = ["cn=testers,dc=stackstorm,dc=net"] +LDAP_GROUP_RDNS = ["cn=testers"] +LDAP_GROUP_DNS_CASES = { + "group_fqdn": LDAP_GROUP_DNS, + "group_rdn": LDAP_GROUP_RDNS, +} LDAP_CACERT = "../fixtures/certs/cacert.pem" LDAP_CACERT_REAL_PATH = os.path.join( os.path.dirname(os.path.abspath(__file__)), LDAP_CACERT @@ -142,16 +148,23 @@ def test_instantiate_no_group_dns_provided(): @pytest.mark.parametrize( - "mock_ldap_search", - ([LDAP_USER_SEARCH_RESULT, LDAP_GROUP_SEARCH_RESULT],), - indirect=True, + "required_group_dns,mock_ldap_search", + ( + pytest.param( + group_dns, [LDAP_USER_SEARCH_RESULT, LDAP_GROUP_SEARCH_RESULT], id=test_name + ) + for test_name, group_dns in LDAP_GROUP_DNS_CASES.items() + ), + indirect=["mock_ldap_search"], ) -def test_authenticate(mock_ldap_bind: MockType, mock_ldap_search: MockType): +def test_authenticate( + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType +): backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, LDAP_BASE_OU, - LDAP_GROUP_DNS, + required_group_dns, LDAP_HOST, id_attr=LDAP_ID_ATTR, ) @@ -222,18 +235,25 @@ def test_authenticate_failure_bad_bind_cred(mocker: MockerFixture): @pytest.mark.parametrize( - "mock_ldap_search", - ([LDAP_USER_SEARCH_RESULT, LDAP_GROUP_SEARCH_RESULT],), - indirect=True, + "required_group_dns,mock_ldap_search", + ( + pytest.param( + group_dns, [LDAP_USER_SEARCH_RESULT, LDAP_GROUP_SEARCH_RESULT], id=test_name + ) + for test_name, group_dns in LDAP_GROUP_DNS_CASES.items() + ), + indirect=["mock_ldap_search"], ) def test_authenticate_failure_bad_user_password( - mock_ldap_auth_failure: MockType, mock_ldap_search: MockType + required_group_dns: List[str], + mock_ldap_auth_failure: MockType, + mock_ldap_search: MockType, ): backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, LDAP_BASE_OU, - LDAP_GROUP_DNS, + required_group_dns, LDAP_HOST, id_attr=LDAP_ID_ATTR, ) @@ -243,22 +263,31 @@ def test_authenticate_failure_bad_user_password( @pytest.mark.parametrize( - "group_dns_check,mock_ldap_search", + "group_dns_check,required_group_dns,mock_ldap_search", ( - pytest.param(group_dns_check, [LDAP_USER_SEARCH_RESULT, []], id=group_dns_check) + pytest.param( + group_dns_check, + group_dns, + [LDAP_USER_SEARCH_RESULT, []], + id=f"{group_dns_check}-{test_name}", + ) for group_dns_check in ("and", "or") + for test_name, group_dns in LDAP_GROUP_DNS_CASES.items() ), indirect=["mock_ldap_search"], ) def test_authenticate_failure_non_group_member_no_groups( - group_dns_check: str, mock_ldap_bind: MockType, mock_ldap_search: MockType + group_dns_check: str, + required_group_dns: List[str], + mock_ldap_bind: MockType, + mock_ldap_search: MockType, ): # User is not a member of any of the required groups backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, LDAP_BASE_OU, - LDAP_GROUP_DNS, + required_group_dns, LDAP_HOST, id_attr=LDAP_ID_ATTR, group_dns_check=group_dns_check, @@ -269,26 +298,31 @@ def test_authenticate_failure_non_group_member_no_groups( @pytest.mark.parametrize( - "group_dns_check,mock_ldap_search", + "group_dns_check,required_group_dns,mock_ldap_search", ( pytest.param( group_dns_check, + group_dns, [LDAP_USER_SEARCH_RESULT, [("cn=group1,dc=stackstorm,dc=net", ())]], - id=group_dns_check, + id=f"{group_dns_check}-{test_name}", ) for group_dns_check in ("and", "or") + for test_name, group_dns in LDAP_GROUP_DNS_CASES.items() ), indirect=["mock_ldap_search"], ) def test_authenticate_failure_non_group_member_non_required_group( - group_dns_check: str, mock_ldap_bind: MockType, mock_ldap_search: MockType + group_dns_check: str, + required_group_dns: List[str], + mock_ldap_bind: MockType, + mock_ldap_search: MockType, ): # User is member of a group which is not required backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, LDAP_BASE_OU, - LDAP_GROUP_DNS, + required_group_dns, LDAP_HOST, id_attr=LDAP_ID_ATTR, group_dns_check=group_dns_check, @@ -299,28 +333,41 @@ def test_authenticate_failure_non_group_member_non_required_group( @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, + pytest.param( + group_dns, [ - ("cn=group1,dc=stackstorm,dc=net", ()), - ("cn=group3,dc=stackstorm,dc=net", ()), + LDAP_USER_SEARCH_RESULT, + [ + ("cn=group1,dc=stackstorm,dc=net", ()), + ("cn=group3,dc=stackstorm,dc=net", ()), + ], ], - ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group1,dc=stackstorm,dc=net", + "cn=group2,dc=stackstorm,dc=net", + "cn=group3,dc=stackstorm,dc=net", + ], + "group_rdn": [ + "cn=group1", + "cn=group2", + "cn=group3", + ], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_authenticate_and_behavior_failure_non_group_member_of_all_required_groups_1( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], + mock_ldap_bind: MockType, + mock_ldap_search: MockType, ): # User is member of two of the required groups (1 and 3) but not all three of them # (1, 2, 3) - required_group_dns = [ - "cn=group1,dc=stackstorm,dc=net", - "cn=group2,dc=stackstorm,dc=net", - "cn=group3,dc=stackstorm,dc=net", - ] backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, @@ -336,26 +383,36 @@ def test_authenticate_and_behavior_failure_non_group_member_of_all_required_grou @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, + pytest.param( + group_dns, [ - ("cn=group1,dc=stackstorm,dc=net", ()), - ("cn=group3,dc=stackstorm,dc=net", ()), + LDAP_USER_SEARCH_RESULT, + [ + ("cn=group1,dc=stackstorm,dc=net", ()), + ("cn=group3,dc=stackstorm,dc=net", ()), + ], ], - ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group7,dc=stackstorm,dc=net", + "cn=group8,dc=stackstorm,dc=net", + ], + "group_rdn": [ + "cn=group7", + "cn=group8", + ], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_authenticate_and_behavior_failure_non_group_member_of_all_required_groups_2( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): # User is member of two of the groups, but none of them are required - required_group_dns = [ - "cn=group7,dc=stackstorm,dc=net", - "cn=group8,dc=stackstorm,dc=net", - ] backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, @@ -371,31 +428,43 @@ def test_authenticate_and_behavior_failure_non_group_member_of_all_required_grou @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, + pytest.param( + group_dns, [ - ("cn=group1,dc=stackstorm,dc=net", ()), - ("cn=group2,dc=stackstorm,dc=net", ()), - ("cn=group3,dc=stackstorm,dc=net", ()), - ("cn=group4,dc=stackstorm,dc=net", ()), + LDAP_USER_SEARCH_RESULT, + [ + ("cn=group1,dc=stackstorm,dc=net", ()), + ("cn=group2,dc=stackstorm,dc=net", ()), + ("cn=group3,dc=stackstorm,dc=net", ()), + ("cn=group4,dc=stackstorm,dc=net", ()), + ], ], - ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group1,dc=stackstorm,dc=net", + "cn=group2,dc=stackstorm,dc=net", + "cn=group5,dc=stackstorm,dc=net", + "cn=group6,dc=stackstorm,dc=net", + ], + "group_rdn": [ + "cn=group1", + "cn=group2", + "cn=group5", + "cn=group6", + ], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_authenticate_and_behavior_failure_non_group_member_of_all_required_groups_3( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): # User is member of two of the required groups and two non-required, but not # all of the required groups - required_group_dns = [ - "cn=group1,dc=stackstorm,dc=net", - "cn=group2,dc=stackstorm,dc=net", - "cn=group5,dc=stackstorm,dc=net", - "cn=group6,dc=stackstorm,dc=net", - ] backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, @@ -411,32 +480,44 @@ def test_authenticate_and_behavior_failure_non_group_member_of_all_required_grou @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, + pytest.param( + group_dns, [ - ("cn=group1,dc=stackstorm,dc=net", ()), - ("cn=group2,dc=stackstorm,dc=net", ()), - ("cn=group3,dc=stackstorm,dc=net", ()), - ("cn=group4,dc=stackstorm,dc=net", ()), + LDAP_USER_SEARCH_RESULT, + [ + ("cn=group1,dc=stackstorm,dc=net", ()), + ("cn=group2,dc=stackstorm,dc=net", ()), + ("cn=group3,dc=stackstorm,dc=net", ()), + ("cn=group4,dc=stackstorm,dc=net", ()), + ], + ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group1,dc=stackstorm,dc=net", + "cn=group2,dc=stackstorm,dc=net", + "cn=group5,dc=stackstorm,dc=net", + "cn=group6,dc=stackstorm,dc=net", ], - ], + "group_rdn": [ + "cn=group1", + "cn=group2", + "cn=group5", + "cn=group6", + ], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_authenticate_and_is_default_behavior_non_group_member_of_all_required_groups( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): # User is member of two of the required groups and two non-required, but not # all of the required groups # Verify "and" is a default group_dns_check_behavior - required_group_dns = [ - "cn=group1,dc=stackstorm,dc=net", - "cn=group2,dc=stackstorm,dc=net", - "cn=group5,dc=stackstorm,dc=net", - "cn=group6,dc=stackstorm,dc=net", - ] backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, @@ -451,12 +532,22 @@ def test_authenticate_and_is_default_behavior_non_group_member_of_all_required_g @pytest.mark.parametrize( - "mock_ldap_search", - ([LDAP_USER_SEARCH_RESULT, [("cn=group1,dc=stackstorm,dc=net", ())]],), - indirect=True, + "required_group_dns,mock_ldap_search", + ( + pytest.param( + group_dns, + [LDAP_USER_SEARCH_RESULT, [("cn=group1,dc=stackstorm,dc=net", ())]], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": ["cn=group1,dc=stackstorm,dc=net"], + "group_rdn": ["cn=group1"], + }.items() + ), + indirect=["mock_ldap_search"], ) def test_authenticate_or_behavior_success_member_of_single_group_1( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): # User is a member of single of possible required groups required_group_dns = ["cn=group1,dc=stackstorm,dc=net"] @@ -475,20 +566,34 @@ def test_authenticate_or_behavior_success_member_of_single_group_1( @pytest.mark.parametrize( - "mock_ldap_search", - ([LDAP_USER_SEARCH_RESULT, [("cn=group1,dc=stackstorm,dc=net", ())]],), - indirect=True, + "required_group_dns,mock_ldap_search", + ( + pytest.param( + group_dns, + [LDAP_USER_SEARCH_RESULT, [("cn=group1,dc=stackstorm,dc=net", ())]], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group1,dc=stackstorm,dc=net", + "cn=group2,dc=stackstorm,dc=net", + "cn=group3,dc=stackstorm,dc=net", + "cn=group4,dc=stackstorm,dc=net", + ], + "group_rdn": [ + "cn=group1", + "cn=group2", + "cn=group3", + "cn=group4", + ], + }.items() + ), + indirect=["mock_ldap_search"], ) def test_authenticate_or_behavior_success_member_of_single_group_2( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): # User is a member of single of possible required groups - required_group_dns = [ - "cn=group1,dc=stackstorm,dc=net", - "cn=group2,dc=stackstorm,dc=net", - "cn=group3,dc=stackstorm,dc=net", - "cn=group4,dc=stackstorm,dc=net", - ] backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, @@ -504,20 +609,34 @@ def test_authenticate_or_behavior_success_member_of_single_group_2( @pytest.mark.parametrize( - "mock_ldap_search", - ([LDAP_USER_SEARCH_RESULT, [("cn=group3,dc=stackstorm,dc=net", ())]],), - indirect=True, + "required_group_dns,mock_ldap_search", + ( + pytest.param( + group_dns, + [LDAP_USER_SEARCH_RESULT, [("cn=group3,dc=stackstorm,dc=net", ())]], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group1,dc=stackstorm,dc=net", + "cn=group2,dc=stackstorm,dc=net", + "cn=group3,dc=stackstorm,dc=net", + "cn=group4,dc=stackstorm,dc=net", + ], + "group_rdn": [ + "cn=group1", + "cn=group2", + "cn=group3", + "cn=group4", + ], + }.items() + ), + indirect=["mock_ldap_search"], ) def test_authenticate_or_behavior_success_member_of_single_group_2b( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): # User is a member of single of possible required groups - required_group_dns = [ - "cn=group1,dc=stackstorm,dc=net", - "cn=group2,dc=stackstorm,dc=net", - "cn=group3,dc=stackstorm,dc=net", - "cn=group4,dc=stackstorm,dc=net", - ] backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, @@ -533,29 +652,42 @@ def test_authenticate_or_behavior_success_member_of_single_group_2b( @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, + pytest.param( + group_dns, [ - ("cn=group1,dc=stackstorm,dc=net", ()), - ("cn=group4,dc=stackstorm,dc=net", ()), + LDAP_USER_SEARCH_RESULT, + [ + ("cn=group1,dc=stackstorm,dc=net", ()), + ("cn=group4,dc=stackstorm,dc=net", ()), + ], + ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group1,dc=stackstorm,dc=net", + "cn=group2,dc=stackstorm,dc=net", + "cn=group3,dc=stackstorm,dc=net", + "cn=group4,dc=stackstorm,dc=net", + "cn=group5,dc=stackstorm,dc=net", + ], + "group_rdn": [ + "cn=group1", + "cn=group2", + "cn=group3", + "cn=group4", + "cn=group5", ], - ], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_authenticate_or_behavior_success_member_of_multiple_groups_1( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): # User is a member of multiple of required groups - required_group_dns = [ - "cn=group1,dc=stackstorm,dc=net", - "cn=group2,dc=stackstorm,dc=net", - "cn=group3,dc=stackstorm,dc=net", - "cn=group4,dc=stackstorm,dc=net", - "cn=group5,dc=stackstorm,dc=net", - ] backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, @@ -571,26 +703,36 @@ def test_authenticate_or_behavior_success_member_of_multiple_groups_1( @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, + pytest.param( + group_dns, [ - ("cn=group1,dc=stackstorm,dc=net", ()), - ("cn=group4,dc=stackstorm,dc=net", ()), + LDAP_USER_SEARCH_RESULT, + [ + ("cn=group1,dc=stackstorm,dc=net", ()), + ("cn=group4,dc=stackstorm,dc=net", ()), + ], + ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group1,dc=stackstorm,dc=net", + "cn=group4,dc=stackstorm,dc=net", + ], + "group_rdn": [ + "cn=group1", + "cn=group4", ], - ], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_authenticate_or_behavior_success_member_of_multiple_groups_2( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): # User is a member of multiple of required groups - required_group_dns = [ - "cn=group1,dc=stackstorm,dc=net", - "cn=group4,dc=stackstorm,dc=net", - ] backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, @@ -606,24 +748,31 @@ def test_authenticate_or_behavior_success_member_of_multiple_groups_2( @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, + pytest.param( + group_dns, [ - ("cn=group1,dc=stackstorm,dc=net", ()), - ("cn=group3,dc=stackstorm,dc=net", ()), - ("cn=group6,dc=stackstorm,dc=net", ()), + LDAP_USER_SEARCH_RESULT, + [ + ("cn=group1,dc=stackstorm,dc=net", ()), + ("cn=group3,dc=stackstorm,dc=net", ()), + ("cn=group6,dc=stackstorm,dc=net", ()), + ], ], - ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": ["cn=group3,dc=stackstorm,dc=net"], + "group_rdn": ["cn=group3"], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_authenticate_or_behavior_success_member_of_multiple_groups_3( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): - # User is a member of multiple of required groups - required_group_dns = ["cn=group3,dc=stackstorm,dc=net"] + # User is a member of multiple groups including the required group backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, @@ -639,27 +788,37 @@ def test_authenticate_or_behavior_success_member_of_multiple_groups_3( @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, + pytest.param( + group_dns, [ - ("cn=group1,dc=stackstorm,dc=net", ()), - ("cn=group3,dc=stackstorm,dc=net", ()), - ("cn=group6,dc=stackstorm,dc=net", ()), + LDAP_USER_SEARCH_RESULT, + [ + ("cn=group1,dc=stackstorm,dc=net", ()), + ("cn=group3,dc=stackstorm,dc=net", ()), + ("cn=group6,dc=stackstorm,dc=net", ()), + ], ], - ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group3,dc=stackstorm,dc=net", + "cn=group1,dc=stackstorm,dc=net", + ], + "group_rdn": [ + "cn=group3", + "cn=group1", + ], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_authenticate_or_behavior_success_member_of_multiple_groups_3b( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): # User is a member of multiple of required groups - required_group_dns = [ - "cn=group3,dc=stackstorm,dc=net", - "cn=group1,dc=stackstorm,dc=net", - ] backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, @@ -900,16 +1059,23 @@ def test_get_user_multiple_results( @pytest.mark.parametrize( - "mock_ldap_search", - ([LDAP_USER_SEARCH_RESULT, LDAP_GROUP_SEARCH_RESULT],), - indirect=True, + "required_group_dns,mock_ldap_search", + ( + pytest.param( + group_dns, [LDAP_USER_SEARCH_RESULT, LDAP_GROUP_SEARCH_RESULT], id=test_name + ) + for test_name, group_dns in LDAP_GROUP_DNS_CASES.items() + ), + indirect=["mock_ldap_search"], ) -def test_get_user_groups(mock_ldap_bind: MockType, mock_ldap_search: MockType): +def test_get_user_groups( + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType +): backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, LDAP_BASE_OU, - LDAP_GROUP_DNS, + required_group_dns, LDAP_HOST, id_attr=LDAP_ID_ATTR, ) @@ -921,22 +1087,28 @@ def test_get_user_groups(mock_ldap_bind: MockType, mock_ldap_search: MockType): @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, - [("cn=group1,dc=stackstorm,dc=net", ())], - LDAP_USER_SEARCH_RESULT, - [("cn=group1,dc=stackstorm,dc=net", ())], - ], + pytest.param( + group_dns, + [ + LDAP_USER_SEARCH_RESULT, + [("cn=group1,dc=stackstorm,dc=net", ())], + LDAP_USER_SEARCH_RESULT, + [("cn=group1,dc=stackstorm,dc=net", ())], + ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": ["cn=group1,dc=stackstorm,dc=net"], + "group_rdn": ["cn=group1"], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_authenticate_and_get_user_groups_caching_disabled( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): - required_group_dns = ["cn=group1,dc=stackstorm,dc=net"] - backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, @@ -963,22 +1135,28 @@ def test_authenticate_and_get_user_groups_caching_disabled( @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, - [("cn=group1,dc=stackstorm,dc=net", ())], - LDAP_USER_SEARCH_RESULT, - [("cn=group1,dc=stackstorm,dc=net", ())], - ], + pytest.param( + group_dns, + [ + LDAP_USER_SEARCH_RESULT, + [("cn=group1,dc=stackstorm,dc=net", ())], + LDAP_USER_SEARCH_RESULT, + [("cn=group1,dc=stackstorm,dc=net", ())], + ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": ["cn=group1,dc=stackstorm,dc=net"], + "group_rdn": ["cn=group1"], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_authenticate_and_get_user_groups_caching_enabled( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): - required_group_dns = ["cn=group1,dc=stackstorm,dc=net"] - backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, @@ -1003,18 +1181,33 @@ def test_authenticate_and_get_user_groups_caching_enabled( @pytest.mark.parametrize( - "mock_ldap_search", - ([LDAP_USER_SEARCH_RESULT],), - indirect=True, + "required_group_dns,mock_ldap_search", + ( + pytest.param( + group_dns, + [LDAP_USER_SEARCH_RESULT], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group3,dc=stackstorm,dc=net", + "cn=group4,dc=stackstorm,dc=net", + ], + "group_rdn": [ + "cn=group3", + "cn=group4", + ], + }.items() + ), + indirect=["mock_ldap_search"], ) def test_get_user_specifying_account_pattern( - mock_ldap_bind: MockType, mock_ldap_search: MockType, mocker: MockerFixture + required_group_dns: List[str], + mock_ldap_bind: MockType, + mock_ldap_search: MockType, + mocker: MockerFixture, ): expected_username = "unique_username_1" - required_group_dns = [ - "cn=group3,dc=stackstorm,dc=net", - "cn=group4,dc=stackstorm,dc=net", - ] scope = "subtree" scope_number = ldap_backend.SEARCH_SCOPES[scope] @@ -1052,26 +1245,39 @@ def test_get_user_specifying_account_pattern( @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, - [("cn=group3,dc=stackstorm,dc=net", ())], - LDAP_USER_SEARCH_RESULT, - [("cn=group4,dc=stackstorm,dc=net", ())], - ], + pytest.param( + group_dns, + [ + LDAP_USER_SEARCH_RESULT, + [("cn=group3,dc=stackstorm,dc=net", ())], + LDAP_USER_SEARCH_RESULT, + [("cn=group4,dc=stackstorm,dc=net", ())], + ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group3,dc=stackstorm,dc=net", + "cn=group4,dc=stackstorm,dc=net", + ], + "group_rdn": [ + "cn=group3", + "cn=group4", + ], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_get_user_groups_specifying_group_pattern( - mock_ldap_bind: MockType, mock_ldap_search: MockType, mocker: MockerFixture + required_group_dns: List[str], + mock_ldap_bind: MockType, + mock_ldap_search: MockType, + mocker: MockerFixture, ): expected_user_dn = "unique_userdn_1" expected_username = "unique_username_2" - required_group_dns = [ - "cn=group3,dc=stackstorm,dc=net", - "cn=group4,dc=stackstorm,dc=net", - ] scope = "subtree" scope_number = ldap_backend.SEARCH_SCOPES[scope] @@ -1118,24 +1324,34 @@ def test_get_user_groups_specifying_group_pattern( @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, - [("cn=group3,dc=stackstorm,dc=net", ())], - LDAP_USER_SEARCH_RESULT, - [("cn=group4,dc=stackstorm,dc=net", ())], - ], + pytest.param( + group_dns, + [ + LDAP_USER_SEARCH_RESULT, + [("cn=group3,dc=stackstorm,dc=net", ())], + LDAP_USER_SEARCH_RESULT, + [("cn=group4,dc=stackstorm,dc=net", ())], + ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group3,dc=stackstorm,dc=net", + "cn=group4,dc=stackstorm,dc=net", + ], + "group_rdn": [ + "cn=group3", + "cn=group4", + ], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_get_groups_caching_no_cross_username_cache_pollution( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): - required_group_dns = [ - "cn=group3,dc=stackstorm,dc=net", - "cn=group4,dc=stackstorm,dc=net", - ] # Test which verifies that cache items are correctly scoped per username backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, @@ -1161,25 +1377,34 @@ def test_get_groups_caching_no_cross_username_cache_pollution( @pytest.mark.parametrize( - "mock_ldap_search", + "required_group_dns,mock_ldap_search", ( - [ - LDAP_USER_SEARCH_RESULT, - [("cn=group3,dc=stackstorm,dc=net", ())], - LDAP_USER_SEARCH_RESULT, - [("cn=group4,dc=stackstorm,dc=net", ())], - ], + pytest.param( + group_dns, + [ + LDAP_USER_SEARCH_RESULT, + [("cn=group3,dc=stackstorm,dc=net", ())], + LDAP_USER_SEARCH_RESULT, + [("cn=group4,dc=stackstorm,dc=net", ())], + ], + id=test_name, + ) + for test_name, group_dns in { + "group_fqdn": [ + "cn=group3,dc=stackstorm,dc=net", + "cn=group4,dc=stackstorm,dc=net", + ], + "group_rdn": [ + "cn=group3", + "cn=group4", + ], + }.items() ), - indirect=True, + indirect=["mock_ldap_search"], ) def test_get_groups_caching_cache_ttl( - mock_ldap_bind: MockType, mock_ldap_search: MockType + required_group_dns: List[str], mock_ldap_bind: MockType, mock_ldap_search: MockType ): - required_group_dns = [ - "cn=group3,dc=stackstorm,dc=net", - "cn=group4,dc=stackstorm,dc=net", - ] - backend = ldap_backend.LDAPAuthenticationBackend( LDAP_BIND_DN, LDAP_BIND_PASSWORD, From f48dc65ea89ab09d47df77666b2207731a3be2b9 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Wed, 23 Oct 2024 14:22:55 -0500 Subject: [PATCH 09/12] Fix RDN handling to make tests pass --- st2auth_ldap/ldap_backend.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/st2auth_ldap/ldap_backend.py b/st2auth_ldap/ldap_backend.py index 103e215..1de72f9 100644 --- a/st2auth_ldap/ldap_backend.py +++ b/st2auth_ldap/ldap_backend.py @@ -427,19 +427,21 @@ def _verify_user_group_membership( elif not use_fqdns: user_group_rdns = {(group_dn[0],) for group_dn in user_group_dns} # need to check each required DN for RDN + found_groups = 0 for group_dn in required_group_dns: - has_group = False if len(group_dn) == 1: - if group_dn in user_group_rdns: - has_group = True + has_group = group_dn in user_group_rdns else: - if group_dn in user_group_dns: - has_group = True + has_group = group_dn in user_group_dns if check_behavior == 'or' and has_group: return True - if check_behavior == 'and' and not has_group: - # missing a required group, no need to check more groups. - break + if check_behavior == 'and': + if not has_group: + # missing a required group, no need to check more groups. + break + found_groups += 1 + if check_behavior == 'and' and found_groups == len(required_group_dns): + return True LOG.exception( f'Unable to verify membership for user "{username}" ' From eb8d13dde3a1e4c554dabc14899a49082b62bb7f Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Wed, 23 Oct 2024 16:25:18 -0500 Subject: [PATCH 10/12] Add more docs about `group_dns` parameter --- README.md | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 96e3763..2cca00e 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Ubuntu / Debian sudo apt-get install -y python-dev libldap2-dev libsasl2-dev libssl-dev ldap-utils ``` -CentOS / RHEL / Fedora +Rocky / CentOS / RHEL / Fedora ```bash sudo dnf install python2-devel python3-devel openldap-devel @@ -22,8 +22,8 @@ sudo dnf install python2-devel python3-devel openldap-devel |----------------------------|----------|----------------|--------------------------------------------------------------------------------------------------------------------------------| | bind_dn | yes | | DN of the service account to bind with the LDAP server | | bind_password | yes | | Password of the service account | -| base_ou | yes | | Base OU to search for user and group entries | -| group_dns | yes | | Which groups user must be member of to be granted access (group names are considered case-insensitive) | +| base_ou | yes | | Base OU to search for user entries (and group entries if `base_ou_group` is not provided) | +| group_dns | yes | | Which group DNs (Distinguished Names) a user must be member of to be granted access (group names are considered case-insensitive). Each group must be a fully-qualified DN, or just the last RDN (Relative DN) which is typically a `CN=` entry (like `CN=st2users`). | | group_dns_check | no | `and` | What kind of check to perform when validating user group membership (`and` / `or`). When `and` behavior is used, user needs to be part of all the specified groups and when `or` behavior is used, user needs to be part of at least one or more of the specified groups. | | host | yes | | Hostname of the LDAP server. Multiple comma-separated entries are allowed. | | port | yes | | Port of the LDAP server | @@ -33,14 +33,14 @@ sudo dnf install python2-devel python3-devel openldap-devel | id_attr | no | `uid` | Field name of the user ID attribute; ignored if `account_pattern` is specified. | | account_pattern | no | `{id_attr}={{username}}` | LDAP subtree pattern to match user. The user's `username` is escaped and interpolated into this string (see example). | | group_pattern | no | `(\|(&(objectClass=*)(\|(member={user_dn})(uniqueMember={user_dn})(memberUid={username}))))` | LDAP subtree pattern for user groups. Both `user_dn` and `username` are escaped and then interpolated into this string (see example). | -| scope | no | `subtree` | Search scope (base, onelevel, or subtree) | +| scope | no | `subtree` | Search scope (`base`, `onelevel`, or `subtree`) | | network_timeout | no | `10.0` | Timeout for network operations (in seconds) | | chase_referrals | no | `false` | True if the referrals should be automatically chased within the underlying LDAP C lib | | debug | no | `false` | Enable debug mode. When debug mode is enabled all the calls (including the results) to LDAP server are logged | | client_options | no | | A dictionary with additional Python LDAP client options which can be passed to `set_connection()` method | | cache_user_groups_response | no | `true` | When true, LDAP user groups response is cached for 120 seconds (by default) in memory. This decreases load on LDAP server and increases performance when remote LDAP group to RBAC role sync is enabled and / or when the same user authenticates concurrency in a short time frame. Keep in mind that even when this feature is enabled, single (authenticate) request to LDAP server will still be performed when user authenticates to st2auth - authentication information is not cached - only user groups are cached. | -| cache_user_groups_ttl | no | `120` | How long (in seconds) | -| base_ou_group | no | `None` | Base OU to search for group entries. If not specified will default to None and take value of base_ou | +| cache_user_groups_ttl | no | `120` | How long (in seconds) to cache LDAP user groups responses. | +| base_ou_group | no | `None` (If `None`, uses value of `base_ou`) | Base OU to search for group entries. If this is `None`, group searches use the value of `base_ou` instead. Defaults to `None`. | ### Escaping Password Characters @@ -169,6 +169,29 @@ This search string will query for LDAP objects that have `objectClass` attribute The `user_dn` value is the user's `bind_dn` attribute returned by the LDAP server in step 2. +### Configuration Specifying `group_dns` + +The `group_dns` option takes a list of fully-qualified DNs (Distinguished Names), or RDNs (Relative Distinguished Names). + +```ini +[auth] +mode = standalone +backend = ldap +backend_kwargs = {"bind_dn": "CN=st2admin,ou=users,dc=example,dc=com", "bind_password": "foobar123", "base_ou": "dc=example,dc=com", "group_dns": ["CN=st2users", "CN=st2developers"], "host": "identity.example.com", "port": 636, "use_ssl": true, "cacert": "/path/to/cacert.pem"} +enable = True +debug = False +use_ssl = True +cert = /path/to/mycert.crt +key = /path/to/mycert.key +logging = /path/to/st2auth.logging.conf +api_url = http://myhost.example.com:9101/ +``` + +#### Explanation + +In this example, the `group_dns` option has the RDNs `CN=st2users` and `CN=st2developers` instead of the fully qualified DNs `CN=st2users,ou=groups,dc=example,dc=com` and `CN=st2developers,ou=groups,dc=example,dc=com`. +Using RDNs can be especially helpful when using Active Directory as group names (the `CN` group attributes) are globally unique, no matter how they are organized (no matter which `OU` is the parent). + ## Running tests Unit tests: From 891c58fd85d8f5d3fa345e96ff522257d6b44e81 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Wed, 23 Oct 2024 16:26:02 -0500 Subject: [PATCH 11/12] Reformat README table --- README.md | 46 +++++++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 2cca00e..34b886c 100644 --- a/README.md +++ b/README.md @@ -18,29 +18,29 @@ sudo dnf install python2-devel python3-devel openldap-devel ## Configuration Options -| option | required | default | description | -|----------------------------|----------|----------------|--------------------------------------------------------------------------------------------------------------------------------| -| bind_dn | yes | | DN of the service account to bind with the LDAP server | -| bind_password | yes | | Password of the service account | -| base_ou | yes | | Base OU to search for user entries (and group entries if `base_ou_group` is not provided) | -| group_dns | yes | | Which group DNs (Distinguished Names) a user must be member of to be granted access (group names are considered case-insensitive). Each group must be a fully-qualified DN, or just the last RDN (Relative DN) which is typically a `CN=` entry (like `CN=st2users`). | -| group_dns_check | no | `and` | What kind of check to perform when validating user group membership (`and` / `or`). When `and` behavior is used, user needs to be part of all the specified groups and when `or` behavior is used, user needs to be part of at least one or more of the specified groups. | -| host | yes | | Hostname of the LDAP server. Multiple comma-separated entries are allowed. | -| port | yes | | Port of the LDAP server | -| use_ssl | no | `false` | Use LDAPS to connect | -| use_tls | no | `false` | Start TLS on LDAP to connect | -| cacert | no | `None` | Path to the CA cert used to validate certificate | -| id_attr | no | `uid` | Field name of the user ID attribute; ignored if `account_pattern` is specified. | -| account_pattern | no | `{id_attr}={{username}}` | LDAP subtree pattern to match user. The user's `username` is escaped and interpolated into this string (see example). | -| group_pattern | no | `(\|(&(objectClass=*)(\|(member={user_dn})(uniqueMember={user_dn})(memberUid={username}))))` | LDAP subtree pattern for user groups. Both `user_dn` and `username` are escaped and then interpolated into this string (see example). | -| scope | no | `subtree` | Search scope (`base`, `onelevel`, or `subtree`) | -| network_timeout | no | `10.0` | Timeout for network operations (in seconds) | -| chase_referrals | no | `false` | True if the referrals should be automatically chased within the underlying LDAP C lib | -| debug | no | `false` | Enable debug mode. When debug mode is enabled all the calls (including the results) to LDAP server are logged | -| client_options | no | | A dictionary with additional Python LDAP client options which can be passed to `set_connection()` method | -| cache_user_groups_response | no | `true` | When true, LDAP user groups response is cached for 120 seconds (by default) in memory. This decreases load on LDAP server and increases performance when remote LDAP group to RBAC role sync is enabled and / or when the same user authenticates concurrency in a short time frame. Keep in mind that even when this feature is enabled, single (authenticate) request to LDAP server will still be performed when user authenticates to st2auth - authentication information is not cached - only user groups are cached. | -| cache_user_groups_ttl | no | `120` | How long (in seconds) to cache LDAP user groups responses. | -| base_ou_group | no | `None` (If `None`, uses value of `base_ou`) | Base OU to search for group entries. If this is `None`, group searches use the value of `base_ou` instead. Defaults to `None`. | +| option | required | default | description | +|----------------------------|----------|----------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| bind_dn | yes | | DN of the service account to bind with the LDAP server | +| bind_password | yes | | Password of the service account | +| base_ou | yes | | Base OU to search for user entries (and group entries if `base_ou_group` is not provided) | +| group_dns | yes | | Which group DNs (Distinguished Names) a user must be member of to be granted access (group names are considered case-insensitive). Each group must be a fully-qualified DN, or just the last RDN (Relative DN) which is typically a `CN=` entry (like `CN=st2users`). | +| group_dns_check | no | `and` | What kind of check to perform when validating user group membership (`and` / `or`). When `and` behavior is used, user needs to be part of all the specified groups and when `or` behavior is used, user needs to be part of at least one or more of the specified groups. | +| host | yes | | Hostname of the LDAP server. Multiple comma-separated entries are allowed. | +| port | yes | | Port of the LDAP server | +| use_ssl | no | `false` | Use LDAPS to connect | +| use_tls | no | `false` | Start TLS on LDAP to connect | +| cacert | no | `None` | Path to the CA cert used to validate certificate | +| id_attr | no | `uid` | Field name of the user ID attribute; ignored if `account_pattern` is specified. | +| account_pattern | no | `{id_attr}={{username}}` | LDAP subtree pattern to match user. The user's `username` is escaped and interpolated into this string (see example). | +| group_pattern | no | `(\|(&(objectClass=*)(\|(member={user_dn})(uniqueMember={user_dn})(memberUid={username}))))` | LDAP subtree pattern for user groups. Both `user_dn` and `username` are escaped and then interpolated into this string (see example). | +| scope | no | `subtree` | Search scope (`base`, `onelevel`, or `subtree`) | +| network_timeout | no | `10.0` | Timeout for network operations (in seconds) | +| chase_referrals | no | `false` | True if the referrals should be automatically chased within the underlying LDAP C lib | +| debug | no | `false` | Enable debug mode. When debug mode is enabled all the calls (including the results) to LDAP server are logged | +| client_options | no | | A dictionary with additional Python LDAP client options which can be passed to `set_connection()` method | +| cache_user_groups_response | no | `true` | When true, LDAP user groups response is cached for 120 seconds (by default) in memory. This decreases load on LDAP server and increases performance when remote LDAP group to RBAC role sync is enabled and / or when the same user authenticates concurrency in a short time frame. Keep in mind that even when this feature is enabled, single (authenticate) request to LDAP server will still be performed when user authenticates to st2auth - authentication information is not cached - only user groups are cached. | +| cache_user_groups_ttl | no | `120` | How long (in seconds) to cache LDAP user groups responses. | +| base_ou_group | no | `None` (If `None`, uses value of `base_ou`) | Base OU to search for group entries. If this is `None`, group searches use the value of `base_ou` instead. Defaults to `None`. | ### Escaping Password Characters From 006b570936e0b0da9bb1f5d0669364fc8f9a9c2f Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Wed, 23 Oct 2024 16:33:59 -0500 Subject: [PATCH 12/12] Add note about OU != groups. --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 34b886c..b9400ec 100644 --- a/README.md +++ b/README.md @@ -187,6 +187,10 @@ logging = /path/to/st2auth.logging.conf api_url = http://myhost.example.com:9101/ ``` +Note: groups, like users, are objects in LDAP. Groups are not the same as the `OU` (Organizational Unit) "folders" that you can see in DNs. +To login, users must be members of all groups--identified by their `DN` or `RDN`--in the `base_dns` option (assuming `group_dns_check` is `and`). +If `group_dns_check` is `or`, then the user only needs to be in one of the required groups, not all of them. + #### Explanation In this example, the `group_dns` option has the RDNs `CN=st2users` and `CN=st2developers` instead of the fully qualified DNs `CN=st2users,ou=groups,dc=example,dc=com` and `CN=st2developers,ou=groups,dc=example,dc=com`.