-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implementation of class-based permission system #19
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
class Permission(object): | ||
def pre_validate(self, method, model, user=None): | ||
return True | ||
|
||
def filter_query(self, query, model, user=None): | ||
return query | ||
|
||
def post_validate(self, method, instances, user=None): | ||
return instances | ||
|
||
|
||
class PermissionAnyone(Permission): | ||
pass | ||
|
||
|
||
class PermissionAdmin(Permission): | ||
def __init__(self, meta_admin_property): | ||
self.meta_admin_property = meta_admin_property | ||
|
||
def _user_is_admin(self, user): | ||
if user is None: return False | ||
if not hasattr(user, 'RESTMeta') or not hasattr(user.RESTMeta, self.meta_admin_property): | ||
raise ValueError('The user model class does not have a properly configured admin property') | ||
admin_property = getattr(user.RESTMeta, self.meta_admin_property) | ||
return getattr(user, admin_property) | ||
|
||
def pre_validate(self, method, model, user=None): | ||
return self._user_is_admin(user) | ||
|
||
def post_validate(self, method, instances, user=None): | ||
if self._user_is_admin(user): | ||
return instances | ||
return False | ||
|
||
|
||
class PermissionUser(Permission): | ||
def pre_validate(self, method, model, user=None): | ||
return user is not None | ||
|
||
def post_validate(self, method, instances, user=None): | ||
if user is not None: | ||
return instances | ||
return False | ||
|
||
|
||
class PermissionOwner(PermissionUser): | ||
def __init__(self, meta_owner_property): | ||
self.meta_owner_property = meta_owner_property | ||
|
||
def _get_owner_property(self, model): | ||
if not hasattr(model, 'RESTMeta') or not hasattr(model.RESTMeta, self.meta_owner_property): | ||
raise ValueError('The user model class does not have a properly configured owner property') | ||
return getattr(model.RESTMeta, self.meta_owner_property) | ||
|
||
def filter_query(self, query, model, user=None): | ||
model_owner_property = getattr(model, self._get_owner_property(model)) | ||
return query.filter(model_owner_property == user.key) | ||
|
||
def post_validate(self, method, instances, user=None): | ||
for i in range(len(instances)): | ||
owner_property = self._get_owner_property(type(instances[i])) | ||
if method == "POST": | ||
setattr(instances[i], owner_property, user.key) | ||
else: | ||
if getattr(instances[i], owner_property) != user.key: | ||
return False | ||
return instances | ||
|
||
|
||
PERMISSION_ANYONE = [PermissionAnyone()] | ||
PERMISSION_LOGGED_IN_USER = [PermissionUser()] | ||
PERMISSION_OWNER_USER = [PermissionAdmin('admin_property'), PermissionOwner('user_owner_property')] | ||
PERMISSION_ADMIN = [PermissionAdmin('admin_property')] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,20 +22,14 @@ | |
from google.appengine.api import app_identity | ||
from google.net.proto.ProtocolBuffer import ProtocolBufferDecodeError | ||
|
||
from permissions import * | ||
|
||
try: | ||
import dateutil.parser | ||
except ImportError as e: | ||
dateutil = None | ||
|
||
|
||
# The REST permissions | ||
PERMISSION_ANYONE = 'anyone' | ||
PERMISSION_LOGGED_IN_USER = 'logged_in_user' | ||
PERMISSION_OWNER_USER = 'owner_user' | ||
PERMISSION_ADMIN = 'admin' | ||
|
||
|
||
|
||
class NDBEncoder(json.JSONEncoder): | ||
"""JSON encoding for NDB models and properties""" | ||
def _decode_key(self, key): | ||
|
@@ -575,7 +569,7 @@ class NewRESTMeta: pass | |
model.RESTMeta = NewRESTMeta | ||
model.RESTMeta.base_url = base_url | ||
|
||
permissions = { 'OPTIONS': PERMISSION_ANYONE } | ||
permissions = { 'OPTIONS': [PermissionAnyone()] } | ||
permissions.update(kwd.get('permissions', {})) | ||
allow_http_method_override = kwd.get('allow_http_method_override', True) | ||
allowed_origin = kwd.get('allowed_origin', None) | ||
|
@@ -589,13 +583,6 @@ class NewRESTMeta: pass | |
before_delete_callback = [kwd.get('before_delete_callback', None)] | ||
after_delete_callback = [kwd.get('after_delete_callback', None)] | ||
|
||
# Validate arguments (we do this at this stage in order to raise exceptions immediately rather than while the app is running) | ||
if PERMISSION_OWNER_USER in permissions.values(): | ||
if not hasattr(model, 'RESTMeta') or not hasattr(model.RESTMeta, 'user_owner_property'): | ||
raise ValueError('Must define a RESTMeta.user_owner_property for the model class %s if user-owner permission is used' % (model)) | ||
if not hasattr(model, model.RESTMeta.user_owner_property): | ||
raise ValueError('The user_owner_property "%s" (defined in RESTMeta.user_owner_property) does not exist in the given model %s' % (model.RESTMeta.user_owner_property, model)) | ||
|
||
def __init__(self, request, response): | ||
self.initialize(request, response) | ||
blobstore_handlers.BlobstoreUploadHandler.__init__(self, request, response) | ||
|
@@ -620,24 +607,20 @@ def inner_f(self, model_id, property_name=None): | |
return self.method_not_allowed() | ||
|
||
# Verify permissions | ||
permission = self.permissions[method_name] | ||
accepted_permission = self._get_permission(method_name) | ||
|
||
if (permission in [PERMISSION_LOGGED_IN_USER, PERMISSION_OWNER_USER, PERMISSION_ADMIN]) and (not self.user): | ||
# User not logged-in as required | ||
if accepted_permission is None: | ||
return self.unauthorized() | ||
|
||
elif permission == PERMISSION_ADMIN and not self.is_user_admin: | ||
# User is not an admin | ||
return self.permission_denied() | ||
|
||
try: | ||
# Call original method | ||
if model_id: | ||
model = self._model_id_to_model(model_id.lstrip('/')) # Get rid of '/' at the beginning | ||
|
||
if (permission == PERMISSION_OWNER_USER) and (self.get_model_owner(model) != self.user.key): | ||
# The currently logged-in user is not the owner of the model | ||
return self.permission_denied() | ||
model = accepted_permission.post_validate(method_name, [model], self.user) | ||
|
||
if model is None: | ||
return self.unauthorized() | ||
|
||
if property_name and model: | ||
# Get the original name of the property | ||
|
@@ -654,7 +637,7 @@ def inner_f(self, model_id, property_name=None): | |
# Only return a result (i.e. write to the response object) if it's not a NoResponseResult (used when serving blobs - BlobKeyProperty) | ||
return self.success(result) | ||
|
||
except RESTException, exc: | ||
except RESTException as exc: | ||
return self.error(exc) | ||
|
||
return inner_f | ||
|
@@ -681,9 +664,12 @@ def get(self, model, property_name=None): | |
|
||
query = self._filter_query() # Filter the results | ||
|
||
if self.permissions['GET'] == PERMISSION_OWNER_USER: | ||
# Return only models owned by currently logged-in user | ||
query = query.filter(getattr(self.model, self.user_owner_property) == self.user.key) | ||
permission = self._get_permission('GET') | ||
|
||
if permission is None: | ||
return self.unauthorized() | ||
|
||
query = permission.filter_query(query, self.model, self.user) | ||
|
||
query = self._order_query(query) # Order the results | ||
(results, cursor) = self._fetch_query(query) # Fetch them (with a limit / specific page, if provided) | ||
|
@@ -788,6 +774,11 @@ def post(self, model, property_name=None): | |
except Exception as exc: | ||
raise RESTException('Invalid JSON POST data - %s' % exc) | ||
|
||
# Attempt to fetch the models from the datastore to ensure that they don't already exist | ||
keys = [m.key for m in models] | ||
if any(ndb.get_multi(keys)): | ||
raise RESTException('Cannot POST to an existing model ID') | ||
|
||
if self.before_post_callback: | ||
models = self.before_post_callback(models, json_data) | ||
|
||
|
@@ -834,6 +825,10 @@ def put(self, model, property_name=None): | |
model = self._build_model_from_data(model_to_update, self.model, model) | ||
models.append(model) | ||
|
||
permission = self._get_permission('put') | ||
if not permission.post_validate('put', models, self.user): | ||
return self.unauthorized() | ||
|
||
if self.before_put_callback: | ||
models = self.before_put_callback(models, json_data) | ||
|
||
|
@@ -866,12 +861,11 @@ def delete(self, model, property_name=None): | |
else: | ||
# Delete multiple model instances | ||
|
||
if self.permissions['DELETE'] == PERMISSION_OWNER_USER: | ||
# Delete all models owned by the currently logged-in user | ||
query = self.model.query().filter(getattr(self.model, self.user_owner_property) == self.user.key) | ||
else: | ||
# Delete all models | ||
query = self.model.query() | ||
query = self.model.query() | ||
|
||
permission = self._get_permission('DELETE') | ||
|
||
query = permission.filter_query(query, self.model, self.user) | ||
|
||
# Delete the models (we might need to fetch several pages in case of many results) | ||
cursor = None | ||
|
@@ -900,32 +894,19 @@ def delete(self, model, property_name=None): | |
# Utility methods/properties | ||
# | ||
|
||
def _get_permission(self, method): | ||
accepted_permission = None | ||
permissions = self.permissions.get(method, []) | ||
|
||
@webapp2.cached_property | ||
def is_user_admin(self): | ||
"""Determines if the currently logged-in user is an admin or not (relies on the user class RESTMeta.admin_property)""" | ||
|
||
if not hasattr(self.user, 'RESTMeta') or not hasattr(self.user.RESTMeta, 'admin_property'): | ||
# This is caused due to a misconfiguration by the coder (didn't define a proper RESTMeta.admin_property) - we raise an exception so | ||
# it'll trigger a 500 internal server error. This specific argument validation is done here instead of the class definition (where the | ||
# rest of the arguments are being validated) since at that stage we can't see the webapp2 auth configuration to determine the User model. | ||
raise ValueError('The user model class %s must include a RESTMeta class with `admin_property` defined' % (self.user.__class__)) | ||
|
||
admin_property = self.user.RESTMeta.admin_property | ||
if not hasattr(self.user, admin_property): | ||
raise ValueError('The user model class %s does not have the property %s as defined in its RESTMeta.admin_property' % (self.user.__class__, admin_property)) | ||
|
||
return getattr(self.user, admin_property) | ||
|
||
@webapp2.cached_property | ||
def user_owner_property(self): | ||
"""Returns the name of the user_owner_property""" | ||
return self.model.RESTMeta.user_owner_property | ||
if not isinstance(permissions, list): | ||
permissions = [permissions] | ||
|
||
def get_model_owner(self, model): | ||
"""Returns the user owner of the given `model` (relies on RESTMeta.user_owner_property)""" | ||
return getattr(model, self.user_owner_property) | ||
for permission_object in permissions: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not add support for a singular permission instead of just a list? e.g. if not isinstance(permissions, list):
permissions = [permissions] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea. Adding this. |
||
if permission_object.pre_validate(method, self.model, self.user): | ||
accepted_permission = permission_object | ||
break | ||
|
||
return accepted_permission | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about adding permission check for the PUT method? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Our |
||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only do this if permission is not None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we've reached this point, permissions have already been checked. If it was None,
rest_method_wrapper
would have raised an exception. We only useself._get_permission
so that we can filter the query based on whatever kind of permission was granted by it. I could be wrong, but I don't see a scenario where it would be possible to get here ifself._get_permission
is returning None.