-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
c05fa43
commit d256172
Showing
2 changed files
with
128 additions
and
67 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
import pytest | ||
from posthog.constants import AvailableFeature | ||
from posthog.models.organization import OrganizationMembership | ||
from posthog.models.dashboard import Dashboard | ||
from posthog.models.organization import Organization, OrganizationMembership | ||
from posthog.models.team.team import Team | ||
from posthog.models.user import User | ||
from posthog.rbac.user_access_control import UserAccessControl | ||
from posthog.test.base import BaseTest | ||
|
@@ -13,8 +15,7 @@ | |
pass | ||
|
||
|
||
@pytest.mark.ee | ||
class TestUserTeamPermissions(BaseTest): | ||
class BaseUserAccessControlTest(BaseTest): | ||
user_access_control: UserAccessControl | ||
|
||
def _create_access_control( | ||
|
@@ -50,15 +51,24 @@ def setUp(self): | |
self.user_with_no_role = User.objects.create_and_join(self.organization, "[email protected]", "testtest") | ||
self.user_with_no_role_access_control = UserAccessControl(self.user_with_no_role, self.team) | ||
|
||
def test_ac_object_default_response_without_available_feature(self): | ||
def _clear_uac_caches(self): | ||
self.user_access_control._clear_cache() | ||
self.other_user_access_control._clear_cache() | ||
self.user_with_no_role_access_control._clear_cache() | ||
|
||
|
||
@pytest.mark.ee | ||
class TestUserAccessControl(BaseUserAccessControlTest): | ||
def test_without_available_features(self): | ||
self.organization.available_features = [] | ||
self.organization.save() | ||
self.organization_membership.level = OrganizationMembership.Level.ADMIN | ||
self.organization_membership.save() | ||
|
||
# This always will return false - the user of this class will take care of it | ||
assert self.user_access_control.access_level_for_object(self.team) is None | ||
assert self.user_access_control.check_access_level_for_object(self.team, "admin") is False | ||
assert self.other_user_access_control.access_level_for_object(self.team) is None | ||
assert self.other_user_access_control.check_access_level_for_object(self.team, "admin") is False | ||
assert self.user_access_control.access_level_for_object(self.team) == "admin" | ||
assert self.other_user_access_control.access_level_for_object(self.team) == "member" | ||
assert self.user_access_control.access_level_for_resource("project") == "admin" | ||
assert self.other_user_access_control.access_level_for_resource("project") == "member" | ||
|
||
def test_ac_object_default_response(self): | ||
assert self.user_access_control.access_level_for_object(self.team) == "member" | ||
|
@@ -81,6 +91,7 @@ def test_ac_object_user_access_control(self): | |
|
||
ac.access_level = "member" | ||
ac.save() | ||
self._clear_uac_caches() | ||
|
||
assert self.user_access_control.check_access_level_for_object(self.team, "admin") is False | ||
assert self.user_access_control.check_access_level_for_object(self.team, "member") is True | ||
|
@@ -91,14 +102,15 @@ def test_ac_object_user_access_control(self): | |
|
||
def test_ac_object_project_access_control(self): | ||
# Setup no access by default | ||
ac = self._create_access_control(access_level="none") | ||
ac = self._create_access_control(resource_id=self.team.id, access_level="none") | ||
|
||
assert self.user_access_control.access_level_for_object(self.team) == "none" | ||
assert self.user_access_control.check_access_level_for_object(self.team, "admin") is False | ||
assert self.other_user_access_control.check_access_level_for_object(self.team, "admin") is False | ||
|
||
ac.access_level = "member" | ||
ac.save() | ||
self._clear_uac_caches() | ||
|
||
assert self.user_access_control.check_access_level_for_object(self.team, "admin") is False | ||
assert self.user_access_control.check_access_level_for_object(self.team, "member") is True | ||
|
@@ -107,12 +119,13 @@ def test_ac_object_project_access_control(self): | |
|
||
ac.access_level = "admin" | ||
ac.save() | ||
self._clear_uac_caches() | ||
|
||
assert self.user_access_control.check_access_level_for_object(self.team, "admin") is True | ||
assert self.other_user_access_control.check_access_level_for_object(self.team, "admin") is True | ||
|
||
def test_ac_object_role_access_control(self): | ||
ac = self._create_access_control(access_level="admin", role=self.role_a) | ||
ac = self._create_access_control(resource_id=self.team.id, access_level="admin", role=self.role_a) | ||
|
||
assert self.user_access_control.access_level_for_object(self.team) == "admin" | ||
assert self.user_access_control.check_access_level_for_object(self.team, "admin") is True | ||
|
@@ -121,9 +134,10 @@ def test_ac_object_role_access_control(self): | |
|
||
ac.access_level = "member" | ||
ac.save() | ||
self._clear_uac_caches() | ||
|
||
# Make the default access level none | ||
self._create_access_control(access_level="none") | ||
self._create_access_control(resource_id=self.team.id, access_level="none") | ||
|
||
assert self.user_access_control.check_access_level_for_object(self.team, "admin") is False | ||
assert self.user_access_control.check_access_level_for_object(self.team, "member") is True | ||
|
@@ -133,13 +147,17 @@ def test_ac_object_role_access_control(self): | |
|
||
def test_ac_object_mixed_access_controls(self): | ||
# No access by default | ||
ac_project = self._create_access_control(access_level="none") | ||
ac_project = self._create_access_control(resource_id=self.team.id, access_level="none") | ||
# Enroll self.user as member | ||
ac_user = self._create_access_control(access_level="member", organization_member=self.organization_membership) | ||
ac_user = self._create_access_control( | ||
resource_id=self.team.id, access_level="member", organization_member=self.organization_membership | ||
) | ||
# Enroll role_a as admin | ||
ac_role = self._create_access_control(access_level="admin", role=self.role_a) # The highest AC | ||
ac_role = self._create_access_control( | ||
resource_id=self.team.id, access_level="admin", role=self.role_a | ||
) # The highest AC | ||
# Enroll role_b as member | ||
ac_role_2 = self._create_access_control(access_level="member", role=self.role_b) | ||
ac_role_2 = self._create_access_control(resource_id=self.team.id, access_level="member", role=self.role_b) | ||
# Enroll self.user in both roles | ||
RoleMembership.objects.create(user=self.user, role=self.role_b) | ||
|
||
|
@@ -158,7 +176,7 @@ def test_ac_object_mixed_access_controls(self): | |
assert self.user_access_control.access_level_for_object(self.team) == "admin" | ||
|
||
def test_org_admin_always_has_access(self): | ||
self._create_access_control(access_level="none") | ||
self._create_access_control(resource_id=self.team.id, access_level="none") | ||
assert self.other_user_access_control.check_access_level_for_object(self.team, "member") is False | ||
assert self.other_user_access_control.check_access_level_for_object(self.team, "admin") is False | ||
|
||
|
@@ -172,27 +190,58 @@ def test_leaving_the_org_revokes_access(self): | |
self.user.leave(organization=self.organization) | ||
assert self.user_access_control.check_access_level_for_object(self.team, "member") is False | ||
|
||
def test_filters_project_queryset_based_on_acs(self): | ||
team2 = Team.objects.create(organization=self.organization) | ||
team3 = Team.objects.create(organization=self.organization) | ||
# No default access | ||
self._create_access_control(resource="project", resource_id=team2.id, access_level="none") | ||
# No default access | ||
self._create_access_control(resource="project", resource_id=team3.id, access_level="none") | ||
# This user access | ||
self._create_access_control( | ||
resource="project", | ||
resource_id=team3.id, | ||
access_level="member", | ||
organization_member=self.organization_membership, | ||
) | ||
|
||
# def test_team_ids_visible_for_user(self): | ||
# assert self.permissions().team_ids_visible_for_user == [self.team.pk] | ||
# NOTE: This is different to the API queries as the TeamAndOrgViewsetMixing takes care of filtering out based on the parent org | ||
filtered_teams = list(self.user_access_control.filter_queryset_by_access_level(Team.objects.all())) | ||
assert filtered_teams == [self.team, team3] | ||
|
||
# def test_team_ids_visible_for_user_no_explicit_permissions(self): | ||
# self.team.access_control = True | ||
# self.team.save() | ||
other_user_filtered_teams = list( | ||
self.other_user_access_control.filter_queryset_by_access_level(Team.objects.all()) | ||
) | ||
assert other_user_filtered_teams == [self.team] | ||
|
||
# assert self.permissions().team_ids_visible_for_user == [] | ||
def test_ac_resource_controls(self): | ||
pass | ||
|
||
# def test_team_ids_visible_for_user_explicit_permission(self): | ||
# self.team.access_control = True | ||
# self.team.save() | ||
|
||
# ExplicitTeamMembership.objects.create( | ||
# team=self.team, | ||
# parent_membership=self.organization_membership, | ||
# level=ExplicitTeamMembership.Level.ADMIN, | ||
# ) | ||
class TestUserAccessControlResourceSpecific(BaseUserAccessControlTest): | ||
""" | ||
Most things are identical between "project"s and other resources, but there are some differences particularly in level names | ||
""" | ||
|
||
def setUp(self): | ||
super().setUp() | ||
|
||
self.dashboard = Dashboard.objects.create(team=self.team) | ||
|
||
# assert self.permissions().team_ids_visible_for_user == [self.team.pk] | ||
def test_without_available_features(self): | ||
self.organization.available_features = [] | ||
self.organization.save() | ||
self.organization_membership.level = OrganizationMembership.Level.ADMIN | ||
self.organization_membership.save() | ||
|
||
assert self.user_access_control.access_level_for_object(self.dashboard) == "editor" | ||
assert self.other_user_access_control.access_level_for_object(self.dashboard) == "editor" | ||
assert self.user_access_control.access_level_for_resource("dashboard") == "editor" | ||
assert self.other_user_access_control.access_level_for_resource("dashboard") == "editor" | ||
|
||
def test_ac_object_default_response(self): | ||
assert self.user_access_control.access_level_for_object(self.dashboard) == "editor" | ||
assert self.other_user_access_control.access_level_for_object(self.dashboard) == "editor" | ||
|
||
|
||
# class TestUserDashboardPermissions(BaseTest, WithPermissionsBase): | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters