Skip to content

Commit

Permalink
Swapping over to proper object based checking
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Mar 20, 2024
1 parent 839ee79 commit 6dcab3c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 24 deletions.
25 changes: 10 additions & 15 deletions ee/api/rbac/access_control.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
from collections import OrderedDict
from typing import cast

from rest_framework import exceptions, mixins, serializers, status, viewsets
from rest_framework import exceptions, serializers, status
from rest_framework.decorators import action
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.request import Request
from rest_framework.response import Response

from ee.models.rbac.access_control import AccessControl
from posthog.api.routing import TeamAndOrgViewSetMixin
from posthog.constants import AvailableFeature
from posthog.models.personal_api_key import API_SCOPE_OBJECTS
from posthog.permissions import PremiumFeaturePermission
from posthog.models.user import User
from posthog.rbac.user_access_control import UserAccessControl, ordered_access_levels


Expand Down Expand Up @@ -48,6 +44,7 @@ def validate_access_level(self, access_level):
return access_level

def validate(self, data):
context = self.context
# Ensure that only one of organization_member or role is set
if data.get("organization_member") and data.get("role"):
raise serializers.ValidationError("You can not scope an access control to both a member and a role.")
Expand All @@ -58,23 +55,18 @@ def validate(self, data):

# We assume the highest level is required for the given resource to edit access controls
required_level = ordered_access_levels(resource)[-1]

# NOTE: For specific resources you are permitted if you are:
# 1. The creator of the resource
# 2. An Organization admin
# 3. A Project admin
team = context["view"].team
the_object = context["view"].get_object()

if resource_id:
# Check that they have the right access level for this specific resource object
if not access_control.check_access_level_for_object(
resource, data["resource_id"], required_level=required_level
):
if not access_control.check_can_modify_access_levels_for_object(the_object):
# TODO: Human readable resource name
raise exceptions.PermissionDenied(f"Must be {required_level} to modify {resource} permissions.")
else:
# If modifying the base resource rules then we are checking the parent membership (project or organization)
# NOTE: Currently we only support org level in the UI so its simply an org level check
if not access_control.check_access_level_for_object("organization", required_level="admin"):
if not access_control.check_can_modify_access_levels_for_object(team):
raise exceptions.PermissionDenied("Must be an Organization admin to modify project-wide permissions.")

return data
Expand All @@ -83,6 +75,9 @@ def validate(self, data):
class AccessControlViewSetMixin:
"""
Adds an "access_controls" action to the viewset that handles access control for the given resource
Why a mixin? We want to easily add this to any existing resource, including providing easy helpers for adding access control info such
as the current users access level to any response.
"""

# TODO: Now that we are on the viewset we can
Expand Down
13 changes: 9 additions & 4 deletions ee/api/rbac/test/test_access_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,25 @@ def setUp(self):
super().setUp()

self.notebook = Notebook.objects.create(
team=self.team, created_by=self.user, short_id="01234", title="first notebook"
team=self.team, created_by=self.user, short_id="0", title="first notebook"
)

self.other_user = self._create_user("other_user")
self.other_user_notebook = Notebook.objects.create(
team=self.team, created_by=self.other_user, short_id="1", title="first notebook"
)

def _get_access_controls(self, data={}):
return self.client.get(f"/api/projects/@current/notebooks/{self.notebook.short_id}/access_controls")

def _put_access_control(self, data={}):
def _put_access_control(self, data={}, notebook_id=None):
payload = {
"access_level": self.default_access_level,
}

payload.update(data)
return self.client.put(
f"/api/projects/@current/notebooks/{self.notebook.short_id}/access_controls",
f"/api/projects/@current/notebooks/{notebook_id or self.notebook.short_id}/access_controls",
payload,
)

Expand All @@ -124,7 +129,7 @@ def test_get_access_controls(self):

def test_change_rejected_if_not_org_admin(self):
self._org_membership(OrganizationMembership.Level.MEMBER)
res = self._put_access_control()
res = self._put_access_control(notebook_id=self.other_user_notebook.short_id)
assert res.status_code == status.HTTP_403_FORBIDDEN, res.json()

# def test_change_permitted_if_creator_of_the_resource(self):
Expand Down
27 changes: 22 additions & 5 deletions posthog/rbac/user_access_control.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from rest_framework.permissions import BasePermission

from functools import cached_property
from django.db.models import Q, QuerySet
from django.db.models import Model, Q, QuerySet
from typing import TYPE_CHECKING, List, Optional

from posthog.constants import AvailableFeature
Expand Down Expand Up @@ -141,7 +141,7 @@ def access_control_for_object(self, resource: APIScopeObject, resource_id: str)
)

def check_access_level_for_object(
self, resource: APIScopeObject, resource_id: str, required_level: str
self, obj: Model, resource: APIScopeObject, resource_id: str, required_level: str
) -> Optional[bool]:
"""
Entry point for all permissions around a specific object.
Expand All @@ -150,14 +150,29 @@ def check_access_level_for_object(
Returns true or false if access controls are applied, otherwise None
"""

access_control = self.access_control_for_object(resource, resource_id)
access_control = self.access_control_for_object(obj, resource, resource_id)

return (
None
if not access_control
else access_level_satisfied(resource, access_control.access_level, required_level)
)

def check_can_modify_access_levels_for_object(self, obj: Model) -> Optional[bool]:
"""
Special case for checking if the user can modify the access levels for an object.
Unlike check_access_level_for_object, this requires that one of these conditions is true:
1. The user is the creator of the object
2. The user is a project admin
2. The user is an org admin
"""

if getattr(obj, "created_by", None) == self._user:
# TODO: Should this always be the case, even for projects?
return True

return self.check_access_level_for_object(obj.team, "project", str(obj.id), "admin")

# Used for filtering a queryset by access level
def filter_queryset_by_access_level(self, queryset: QuerySet) -> QuerySet:
# TODO: Get list of all access controls for the user and then filter the queryset based on that
Expand Down Expand Up @@ -199,7 +214,9 @@ def has_object_permission(self, request, view, object) -> bool:

# TODO: How to determine action level to check...
required_level = "viewer"
has_access = uac.check_access_level_for_object(view.scope_object, str(object.id), required_level=required_level)
has_access = uac.check_access_level_for_object(
object, view.scope_object, str(object.id), required_level=required_level
)

if not has_access:
self.message = f"You do not have {required_level} access to this resource."
Expand All @@ -216,7 +233,7 @@ def has_permission(self, request, view) -> bool:

try:
team = view.team
is_member = uac.check_access_level_for_object("project", str(team.id), "member")
is_member = uac.check_access_level_for_object(view.team, "project", str(team.id), "member")

if not is_member:
self.message = f"You are not a member of this project."
Expand Down

0 comments on commit 6dcab3c

Please sign in to comment.