diff --git a/posthog/rbac/test/test_user_access_control.py b/posthog/rbac/test/test_user_access_control.py index e683672a98e4f..366b5f5d3e185 100644 --- a/posthog/rbac/test/test_user_access_control.py +++ b/posthog/rbac/test/test_user_access_control.py @@ -1,6 +1,8 @@ import pytest from posthog.constants import AvailableFeature -from posthog.models.organization import OrganizationMembership +from posthog.models.dashboard import Dashboard +from posthog.models.organization import Organization, OrganizationMembership +from posthog.models.team.team import Team from posthog.models.user import User from posthog.rbac.user_access_control import UserAccessControl from posthog.test.base import BaseTest @@ -13,8 +15,7 @@ pass -@pytest.mark.ee -class TestUserTeamPermissions(BaseTest): +class BaseUserAccessControlTest(BaseTest): user_access_control: UserAccessControl def _create_access_control( @@ -50,15 +51,24 @@ def setUp(self): self.user_with_no_role = User.objects.create_and_join(self.organization, "norole@posthog.com", "testtest") self.user_with_no_role_access_control = UserAccessControl(self.user_with_no_role, self.team) - def test_ac_object_default_response_without_available_feature(self): + def _clear_uac_caches(self): + self.user_access_control._clear_cache() + self.other_user_access_control._clear_cache() + self.user_with_no_role_access_control._clear_cache() + + +@pytest.mark.ee +class TestUserAccessControl(BaseUserAccessControlTest): + def test_without_available_features(self): self.organization.available_features = [] self.organization.save() + self.organization_membership.level = OrganizationMembership.Level.ADMIN + self.organization_membership.save() - # This always will return false - the user of this class will take care of it - assert self.user_access_control.access_level_for_object(self.team) is None - assert self.user_access_control.check_access_level_for_object(self.team, "admin") is False - assert self.other_user_access_control.access_level_for_object(self.team) is None - assert self.other_user_access_control.check_access_level_for_object(self.team, "admin") is False + assert self.user_access_control.access_level_for_object(self.team) == "admin" + assert self.other_user_access_control.access_level_for_object(self.team) == "member" + assert self.user_access_control.access_level_for_resource("project") == "admin" + assert self.other_user_access_control.access_level_for_resource("project") == "member" def test_ac_object_default_response(self): assert self.user_access_control.access_level_for_object(self.team) == "member" @@ -81,6 +91,7 @@ def test_ac_object_user_access_control(self): ac.access_level = "member" ac.save() + self._clear_uac_caches() assert self.user_access_control.check_access_level_for_object(self.team, "admin") is False assert self.user_access_control.check_access_level_for_object(self.team, "member") is True @@ -91,7 +102,7 @@ def test_ac_object_user_access_control(self): def test_ac_object_project_access_control(self): # Setup no access by default - ac = self._create_access_control(access_level="none") + ac = self._create_access_control(resource_id=self.team.id, access_level="none") assert self.user_access_control.access_level_for_object(self.team) == "none" assert self.user_access_control.check_access_level_for_object(self.team, "admin") is False @@ -99,6 +110,7 @@ def test_ac_object_project_access_control(self): ac.access_level = "member" ac.save() + self._clear_uac_caches() assert self.user_access_control.check_access_level_for_object(self.team, "admin") is False assert self.user_access_control.check_access_level_for_object(self.team, "member") is True @@ -107,12 +119,13 @@ def test_ac_object_project_access_control(self): ac.access_level = "admin" ac.save() + self._clear_uac_caches() assert self.user_access_control.check_access_level_for_object(self.team, "admin") is True assert self.other_user_access_control.check_access_level_for_object(self.team, "admin") is True def test_ac_object_role_access_control(self): - ac = self._create_access_control(access_level="admin", role=self.role_a) + ac = self._create_access_control(resource_id=self.team.id, access_level="admin", role=self.role_a) assert self.user_access_control.access_level_for_object(self.team) == "admin" assert self.user_access_control.check_access_level_for_object(self.team, "admin") is True @@ -121,9 +134,10 @@ def test_ac_object_role_access_control(self): ac.access_level = "member" ac.save() + self._clear_uac_caches() # Make the default access level none - self._create_access_control(access_level="none") + self._create_access_control(resource_id=self.team.id, access_level="none") assert self.user_access_control.check_access_level_for_object(self.team, "admin") is False assert self.user_access_control.check_access_level_for_object(self.team, "member") is True @@ -133,13 +147,17 @@ def test_ac_object_role_access_control(self): def test_ac_object_mixed_access_controls(self): # No access by default - ac_project = self._create_access_control(access_level="none") + ac_project = self._create_access_control(resource_id=self.team.id, access_level="none") # Enroll self.user as member - ac_user = self._create_access_control(access_level="member", organization_member=self.organization_membership) + ac_user = self._create_access_control( + resource_id=self.team.id, access_level="member", organization_member=self.organization_membership + ) # Enroll role_a as admin - ac_role = self._create_access_control(access_level="admin", role=self.role_a) # The highest AC + ac_role = self._create_access_control( + resource_id=self.team.id, access_level="admin", role=self.role_a + ) # The highest AC # Enroll role_b as member - ac_role_2 = self._create_access_control(access_level="member", role=self.role_b) + ac_role_2 = self._create_access_control(resource_id=self.team.id, access_level="member", role=self.role_b) # Enroll self.user in both roles RoleMembership.objects.create(user=self.user, role=self.role_b) @@ -158,7 +176,7 @@ def test_ac_object_mixed_access_controls(self): assert self.user_access_control.access_level_for_object(self.team) == "admin" def test_org_admin_always_has_access(self): - self._create_access_control(access_level="none") + self._create_access_control(resource_id=self.team.id, access_level="none") assert self.other_user_access_control.check_access_level_for_object(self.team, "member") is False assert self.other_user_access_control.check_access_level_for_object(self.team, "admin") is False @@ -172,27 +190,58 @@ def test_leaving_the_org_revokes_access(self): self.user.leave(organization=self.organization) assert self.user_access_control.check_access_level_for_object(self.team, "member") is False + def test_filters_project_queryset_based_on_acs(self): + team2 = Team.objects.create(organization=self.organization) + team3 = Team.objects.create(organization=self.organization) + # No default access + self._create_access_control(resource="project", resource_id=team2.id, access_level="none") + # No default access + self._create_access_control(resource="project", resource_id=team3.id, access_level="none") + # This user access + self._create_access_control( + resource="project", + resource_id=team3.id, + access_level="member", + organization_member=self.organization_membership, + ) -# def test_team_ids_visible_for_user(self): -# assert self.permissions().team_ids_visible_for_user == [self.team.pk] + # NOTE: This is different to the API queries as the TeamAndOrgViewsetMixing takes care of filtering out based on the parent org + filtered_teams = list(self.user_access_control.filter_queryset_by_access_level(Team.objects.all())) + assert filtered_teams == [self.team, team3] -# def test_team_ids_visible_for_user_no_explicit_permissions(self): -# self.team.access_control = True -# self.team.save() + other_user_filtered_teams = list( + self.other_user_access_control.filter_queryset_by_access_level(Team.objects.all()) + ) + assert other_user_filtered_teams == [self.team] -# assert self.permissions().team_ids_visible_for_user == [] + def test_ac_resource_controls(self): + pass -# def test_team_ids_visible_for_user_explicit_permission(self): -# self.team.access_control = True -# self.team.save() -# ExplicitTeamMembership.objects.create( -# team=self.team, -# parent_membership=self.organization_membership, -# level=ExplicitTeamMembership.Level.ADMIN, -# ) +class TestUserAccessControlResourceSpecific(BaseUserAccessControlTest): + """ + Most things are identical between "project"s and other resources, but there are some differences particularly in level names + """ + + def setUp(self): + super().setUp() + + self.dashboard = Dashboard.objects.create(team=self.team) -# assert self.permissions().team_ids_visible_for_user == [self.team.pk] + def test_without_available_features(self): + self.organization.available_features = [] + self.organization.save() + self.organization_membership.level = OrganizationMembership.Level.ADMIN + self.organization_membership.save() + + assert self.user_access_control.access_level_for_object(self.dashboard) == "editor" + assert self.other_user_access_control.access_level_for_object(self.dashboard) == "editor" + assert self.user_access_control.access_level_for_resource("dashboard") == "editor" + assert self.other_user_access_control.access_level_for_resource("dashboard") == "editor" + + def test_ac_object_default_response(self): + assert self.user_access_control.access_level_for_object(self.dashboard) == "editor" + assert self.other_user_access_control.access_level_for_object(self.dashboard) == "editor" # class TestUserDashboardPermissions(BaseTest, WithPermissionsBase): diff --git a/posthog/rbac/user_access_control.py b/posthog/rbac/user_access_control.py index 3e7b25316dade..507312096207a 100644 --- a/posthog/rbac/user_access_control.py +++ b/posthog/rbac/user_access_control.py @@ -81,6 +81,11 @@ def __init__(self, user: User, team: Optional[Team] = None, organization: Option self._team = team self._organization: Organization = organization or team.organization + def _clear_cache(self): + # Primarily intended for tests + self._access_controls_for_object.cache_clear() + self._access_controls_for_resource.cache_clear() + @cached_property def _organization_membership(self) -> Optional[OrganizationMembership]: try: @@ -112,20 +117,25 @@ def _access_controls_for_object(self, obj: Model, resource: str) -> List[_Access role_memberships = self._user.role_memberships.select_related("role").all() role_ids = [membership.role.id for membership in role_memberships] if self.rbac_supported else [] - # TODO: Need to determine if there exists any ACs for the resource to determine if we should return None or not - return AccessControl.objects.filter( - Q( # Access controls applying to this team - team=self._team, resource=resource, resource_id=resource_id, organization_member=None, role=None - ) - | Q( # Access controls applying to this user - team=self._team, - resource=resource, - resource_id=resource_id, - organization_member__user=self._user, - role=None, - ) - | Q( # Access controls applying to this user's roles - team=self._team, resource=resource, resource_id=resource_id, organization_member=None, role__in=role_ids + return list( + AccessControl.objects.filter( + Q( # Access controls applying to this team + team=self._team, resource=resource, resource_id=resource_id, organization_member=None, role=None + ) + | Q( # Access controls applying to this user + team=self._team, + resource=resource, + resource_id=resource_id, + organization_member__user=self._user, + role=None, + ) + | Q( # Access controls applying to this user's roles + team=self._team, + resource=resource, + resource_id=resource_id, + organization_member=None, + role__in=role_ids, + ) ) ) @@ -138,19 +148,21 @@ def _access_controls_for_resource(self, resource: APIScopeObject) -> List[_Acces role_memberships = self._user.role_memberships.select_related("role").all() role_ids = [membership.role.id for membership in role_memberships] if self.rbac_supported else [] - return AccessControl.objects.filter( - Q( # Access controls applying to this team - team=self._team, resource=resource, resource_id=None, organization_member=None, role=None - ) - | Q( # Access controls applying to this user - team=self._team, - resource=resource, - resource_id=None, - organization_member__user=self._user, - role=None, - ) - | Q( # Access controls applying to this user's roles - team=self._team, resource=resource, resource_id=None, organization_member=None, role__in=role_ids + return list( + AccessControl.objects.filter( + Q( # Access controls applying to this team + team=self._team, resource=resource, resource_id=None, organization_member=None, role=None + ) + | Q( # Access controls applying to this user + team=self._team, + resource=resource, + resource_id=None, + organization_member__user=self._user, + role=None, + ) + | Q( # Access controls applying to this user's roles + team=self._team, resource=resource, resource_id=None, organization_member=None, role__in=role_ids + ) ) ) @@ -174,14 +186,14 @@ def access_level_for_object(self, obj: Model, resource: Optional[str] = None) -> # NOTE: Technically this is covered by Org Permission check so more of a sanity check return None - if not self.access_controls_supported: - # If access controls aren't supported, then we return the default access level - return default_access_level(resource) - # Org admins always have object level access if org_membership.level >= OrganizationMembership.Level.ADMIN: return ordered_access_levels(resource)[-1] + if not self.access_controls_supported: + # If access controls aren't supported, then we return the default access level + return default_access_level(resource) + access_controls = self._access_controls_for_object(obj, resource) if not access_controls: return default_access_level(resource) @@ -242,14 +254,14 @@ def access_level_for_resource(self, resource: APIScopeObject) -> Optional[str]: # In any of these cases, we can't determine the access level return None - if not self.access_controls_supported: - # If access controls aren't supported, then return the default access level - return default_access_level(resource) - # Org admins always have resource level access if org_membership.level >= OrganizationMembership.Level.ADMIN: return ordered_access_levels(resource)[-1] + if not self.access_controls_supported: + # If access controls aren't supported, then return the default access level + return default_access_level(resource) + access_controls = self._access_controls_for_resource(resource) if not access_controls: return None