diff --git a/ee/rbac/test/test_user_access_control.py b/ee/rbac/test/test_user_access_control.py index 5f60fcda722d2..f4fdec27f017b 100644 --- a/ee/rbac/test/test_user_access_control.py +++ b/ee/rbac/test/test_user_access_control.py @@ -27,6 +27,7 @@ def setUp(self): super().setUp() self.organization.available_features = [ AvailableFeature.PROJECT_BASED_PERMISSIONING, + AvailableFeature.ROLE_BASED_ACCESS, ] self.organization.save() diff --git a/ee/rbac/user_access_control.py b/ee/rbac/user_access_control.py index 1988f2caa0fcc..0e78ea6079181 100644 --- a/ee/rbac/user_access_control.py +++ b/ee/rbac/user_access_control.py @@ -5,6 +5,7 @@ from typing import List, Optional from ee.models.rbac.access_control import AccessControl +from posthog.constants import AvailableFeature from posthog.models import ( Organization, OrganizationMembership, @@ -32,7 +33,7 @@ def access_level_satisfied(resource: APIScopeObject, current_level: str, require class UserAccessControl: - def __init__(self, user: User, organization: Optional[Organization] = None, team: Optional[Team] = None): + def __init__(self, user: User, organization: Organization, team: Optional[Team] = None): self._user = user self._team = team self._organization = organization @@ -41,6 +42,19 @@ def __init__(self, user: User, organization: Optional[Organization] = None, team def _organization_membership(self, organization: Organization) -> Optional[OrganizationMembership]: return OrganizationMembership.objects.get(organization=organization, user=self.user) + @property + def _rbac_supported(self) -> bool: + return self._organization.is_feature_available(AvailableFeature.ROLE_BASED_ACCESS) + + @property + def _access_controls_supported(self) -> bool: + # NOTE: This is a proxy feature. We may want to consider making it explicit later + # ADVANCED_PERMISSIONS was only for dashboard collaborators, PROJECT_BASED_PERMISSIONING for project permissions + # both now apply to this generic access control + return self._organization.is_feature_available( + AvailableFeature.PROJECT_BASED_PERMISSIONING + ) or self._organization.is_feature_available(AvailableFeature.ADVANCED_PERMISSIONS) + # @cached_property def _access_controls_for_object(self, resource: APIScopeObject, resource_id: str) -> List[AccessControl]: """ @@ -48,7 +62,7 @@ def _access_controls_for_object(self, resource: APIScopeObject, resource_id: str """ # TODO: Make this more efficient role_memberships = self._user.role_memberships.select_related("role").all() - role_ids = [membership.role.id for membership in role_memberships] + role_ids = [membership.role.id for membership in role_memberships] if self._rbac_supported else [] # TODO: Need to determine if there exists any ACs for the resource to determine if we should return None or not return AccessControl.objects.filter( @@ -77,6 +91,9 @@ def access_control_for_object(self, resource: APIScopeObject, resource_id: str) # TODO: Override this based on your Org membership level + if not self._access_controls_supported: + return None + access_controls = self._access_controls_for_object(resource, resource_id) if not access_controls: return