From 14b0cde9074d225fb262bb9b04ecff230baab638 Mon Sep 17 00:00:00 2001
From: MueezKhan246 <93375917+MueezKhan246@users.noreply.github.com>
Date: Wed, 5 Jun 2024 13:34:12 +0500
Subject: [PATCH] Pending enterprise customer admin user view set permission
update (#2122)
* feat: added is_in_provisioning_admin_group permission class
---
CHANGELOG.rst | 4 ++
enterprise/__init__.py | 2 +-
enterprise/api/v1/permissions.py | 15 +++++++
enterprise/api/v1/serializers.py | 32 ++++++++-------
.../pending_enterprise_customer_admin_user.py | 3 +-
tests/test_enterprise/api/constants.py | 2 +
tests/test_enterprise/api/test_views.py | 39 +++++++++++--------
7 files changed, 65 insertions(+), 32 deletions(-)
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index f7171d6e45..a7e7446488 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -17,6 +17,10 @@ Unreleased
----------
* nothing unreleased
+[4.19.8]
+--------
+* chore: updated permission class ``IsInProvisioningAdminGroup`` for ``PendingEnterpriseCustomerAdminUser`` viewpoint and handled exceptions in serializer.
+
[4.19.7]
--------
* feat: schema level improvement in integrated-channels
diff --git a/enterprise/__init__.py b/enterprise/__init__.py
index 4e617e328b..330beaf731 100644
--- a/enterprise/__init__.py
+++ b/enterprise/__init__.py
@@ -2,4 +2,4 @@
Your project description goes here.
"""
-__version__ = "4.19.7"
+__version__ = "4.19.8"
diff --git a/enterprise/api/v1/permissions.py b/enterprise/api/v1/permissions.py
index a90d7cfdb2..74ee12a075 100644
--- a/enterprise/api/v1/permissions.py
+++ b/enterprise/api/v1/permissions.py
@@ -18,3 +18,18 @@ def has_permission(self, request, view):
request.user.groups.filter(name__in=request.query_params.getlist('permissions')).exists()
)
)
+
+
+class IsInProvisioningAdminGroup(permissions.BasePermission):
+ """
+ Grant access to those users only who are part of the license provisiioning django group
+ """
+ ALLOWED_API_GROUPS = ['provisioning-admins-group']
+ message = 'Access denied: You do not have the necessary permissions to access this.'
+
+ def has_permission(self, request, view):
+ return (
+ super().has_permission(request, view) and (
+ request.user.groups.filter(name__in=self.ALLOWED_API_GROUPS).exists()
+ )
+ )
diff --git a/enterprise/api/v1/serializers.py b/enterprise/api/v1/serializers.py
index 73557da62d..c19a44b53b 100644
--- a/enterprise/api/v1/serializers.py
+++ b/enterprise/api/v1/serializers.py
@@ -17,6 +17,7 @@
from django.contrib import auth
from django.contrib.sites.models import Site
from django.core import exceptions as django_exceptions
+from django.db import IntegrityError, transaction
from django.utils.translation import gettext_lazy as _
from enterprise import models, utils # pylint: disable=cyclic-import
@@ -1644,20 +1645,6 @@ def validate(self, attrs):
user_email = attrs.get('user_email', instance.user_email if instance else None)
enterprise_customer = attrs.get('enterprise_customer', instance.enterprise_customer if instance else None)
- if instance:
- existing_instances = PendingEnterpriseCustomerAdminUser.objects.filter(
- user_email=user_email,
- enterprise_customer=enterprise_customer
- ).exclude(id=instance.id)
- else:
- existing_instances = PendingEnterpriseCustomerAdminUser.objects.filter(
- user_email=user_email,
- enterprise_customer=enterprise_customer
- )
-
- if existing_instances.exists():
- raise serializers.ValidationError('A pending user with this email and enterprise customer already exists.')
-
admin_instance = SystemWideEnterpriseUserRoleAssignment.objects.filter(
role__name=ENTERPRISE_ADMIN_ROLE, user__email=user_email, enterprise_customer=enterprise_customer
)
@@ -1669,6 +1656,23 @@ def validate(self, attrs):
return attrs
+ def save(self, **kwargs):
+ """
+ Attempts to save the pending enterprise customer admin user data while handling potential integrity errors.
+ """
+ try:
+ with transaction.atomic():
+ return super().save(**kwargs)
+ except IntegrityError as exc:
+ raise serializers.ValidationError(
+ 'A pending user with this email and enterprise customer already exists.'
+ ) from exc
+ except Exception as e:
+ error_message = f"An unexpected error occurred while saving PendingEnterpriseCustomerAdminUser: {e}"
+ data = self.validated_data
+ LOGGER.error(error_message, extra={'data': data})
+ raise serializers.ValidationError('An unexpected error occurred. Please try again later.')
+
class AnalyticsSummarySerializer(serializers.Serializer):
"""
diff --git a/enterprise/api/v1/views/pending_enterprise_customer_admin_user.py b/enterprise/api/v1/views/pending_enterprise_customer_admin_user.py
index 700be516b4..c289d1e3a5 100644
--- a/enterprise/api/v1/views/pending_enterprise_customer_admin_user.py
+++ b/enterprise/api/v1/views/pending_enterprise_customer_admin_user.py
@@ -7,6 +7,7 @@
from enterprise import models
from enterprise.api.v1 import serializers
+from enterprise.api.v1.permissions import IsInProvisioningAdminGroup
from enterprise.api.v1.views.base_views import EnterpriseReadWriteModelViewSet
from enterprise.logging import getEnterpriseLogger
@@ -21,7 +22,7 @@ class PendingEnterpriseCustomerAdminUserViewSet(EnterpriseReadWriteModelViewSet)
queryset = models.PendingEnterpriseCustomerAdminUser.objects.all()
filter_backends = (filters.OrderingFilter, DjangoFilterBackend)
serializer_class = serializers.PendingEnterpriseCustomerAdminUserSerializer
- permission_classes = (permissions.IsAuthenticated, permissions.IsAdminUser)
+ permission_classes = (permissions.IsAuthenticated, IsInProvisioningAdminGroup)
FIELDS = (
'enterprise_customer', 'user_email',
diff --git a/tests/test_enterprise/api/constants.py b/tests/test_enterprise/api/constants.py
index e7cf071dc0..6213f0cba3 100644
--- a/tests/test_enterprise/api/constants.py
+++ b/tests/test_enterprise/api/constants.py
@@ -11,3 +11,5 @@
(' ', "https://idp1.example.com"), # pylint: disable=line-too-long
('', "https://example.com") # pylint: disable=line-too-long
]
+
+PROVISIONING_ADMINS_GROUP = "provisioning-admins-group"
diff --git a/tests/test_enterprise/api/test_views.py b/tests/test_enterprise/api/test_views.py
index 9d2c8514c8..e9f128f70a 100644
--- a/tests/test_enterprise/api/test_views.py
+++ b/tests/test_enterprise/api/test_views.py
@@ -27,7 +27,7 @@
from testfixtures import LogCapture
from django.conf import settings
-from django.contrib.auth.models import Permission
+from django.contrib.auth.models import Group, Permission
from django.core.cache import cache
from django.test import override_settings
from django.utils import timezone
@@ -107,7 +107,7 @@
)
from test_utils.fake_enterprise_api import get_default_branding_object
-from .constants import FAKE_SSO_METADATA_XML_WITH_ENTITY_ID
+from .constants import FAKE_SSO_METADATA_XML_WITH_ENTITY_ID, PROVISIONING_ADMINS_GROUP
Application = get_application_model()
fake = Faker()
@@ -859,21 +859,21 @@ def setUp(self):
enterprise_customer=self.enterprise_customer,
user_email='test@example.com'
)
+ self.staff_user = factories.UserFactory(is_staff=True, is_active=True)
- def setup_admin_user(self, is_staff=True):
+ def setup_provisioning_admin_permission(self):
"""
- Creates an admin user and logs them in
+ Grants provisioning admin permissions to the staff user for testing purposes.
"""
- client_username = 'client_username'
- self.client.logout()
- self.create_user(username=client_username, password=TEST_PASSWORD, is_staff=is_staff)
- self.client.login(username=client_username, password=TEST_PASSWORD)
+ allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP)
+ self.staff_user.groups.add(allowed_group)
+ self.client.force_authenticate(user=self.staff_user)
def test_post_pending_enterprise_customer_admin_user_creation(self):
"""
Make sure service users can post new PendingEnterpriseCustomerAdminUsers.
"""
- self.setup_admin_user(True)
+ self.setup_provisioning_admin_permission()
data = {
'enterprise_customer': self.enterprise_customer.uuid,
@@ -888,9 +888,9 @@ def test_post_pending_enterprise_customer_admin_user_creation(self):
def test_post_pending_enterprise_customer_unauthorized_user(self):
"""
- Make sure unauthorized users can't post PendingEnterpriseCustomerAdminUsers.
+ Make sure unauthorized users, not added to IsInProvisioningAdminGroup,
+ can't post PendingEnterpriseCustomerAdminUsers.
"""
- self.setup_admin_user(False)
data = {
'enterprise_customer': self.enterprise_customer.uuid,
@@ -899,6 +899,9 @@ def test_post_pending_enterprise_customer_unauthorized_user(self):
response = self.client.post(settings.TEST_SERVER + PENDING_ENTERPRISE_CUSTOMER_ADMIN_LIST_ENDPOINT, data=data)
assert response.status_code == 403
+ error_message = response.json()['detail']
+ expected_message = "Access denied: You do not have the necessary permissions to access this."
+ self.assertIn(expected_message, error_message)
def test_post_pending_enterprise_customer_user_logged_out(self):
"""
@@ -916,6 +919,7 @@ def test_delete_pending_enterprise_customer_admin_user(self):
"""
Test deleting a pending enterprise customer admin user.
"""
+ self.setup_provisioning_admin_permission()
url = reverse('pending-enterprise-admin-detail', kwargs={'pk': self.pending_admin_user.id})
response = self.client.delete(settings.TEST_SERVER + url)
@@ -927,6 +931,7 @@ def test_get_pending_enterprise_customer_admin_user(self):
"""
Test retrieving a pending enterprise customer admin user.
"""
+ self.setup_provisioning_admin_permission()
url = reverse('pending-enterprise-admin-detail', kwargs={'pk': self.pending_admin_user.id})
response = self.client.get(url)
@@ -942,6 +947,7 @@ def test_patch_pending_enterprise_customer_admin_user(self):
"""
Test updating a pending enterprise customer admin user's email.
"""
+ self.setup_provisioning_admin_permission()
data = {
'user_email': 'updated@example.com'
}
@@ -958,6 +964,7 @@ def test_patch_pending_enterprise_customer_admin_user_existing_admin(self):
"""
Test updating a pending enterprise customer admin user with an email that already has admin permissions.
"""
+ self.setup_provisioning_admin_permission()
SystemWideEnterpriseUserRoleAssignment.objects.create(
role=admin_role(),
user=self.user,
@@ -982,7 +989,7 @@ def test_patch_pending_admin_user_with_existing_email(self):
Test patching a pending enterprise customer admin user with an email that already exists
for the same enterprise customer, expecting a validation error.
"""
-
+ self.setup_provisioning_admin_permission()
new_user_email = 'newtest@example.com'
PendingEnterpriseCustomerAdminUser.objects.create(
enterprise_customer=self.enterprise_customer,
@@ -998,7 +1005,7 @@ def test_patch_pending_admin_user_with_existing_email(self):
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
- error_message = response.json().get('non_field_errors', [])[0]
+ error_message = response.json()[0]
expected_message = 'A pending user with this email and enterprise customer already exists.'
self.assertEqual(error_message, expected_message)
@@ -1006,7 +1013,7 @@ def test_validate_existing_admin_user(self):
"""
Test validation error when creating a pending admin user with an email that already has admin permissions.
"""
- self.setup_admin_user(True)
+ self.setup_provisioning_admin_permission()
SystemWideEnterpriseUserRoleAssignment.objects.create(
role=admin_role(),
@@ -1031,7 +1038,7 @@ def test_validate_duplicate_user(self):
Test validation error when creating a pending admin user that already exists.
"""
- self.setup_admin_user(True)
+ self.setup_provisioning_admin_permission()
data = {
'enterprise_customer': self.enterprise_customer.uuid,
@@ -1043,7 +1050,7 @@ def test_validate_duplicate_user(self):
response = self.client.post(settings.TEST_SERVER + PENDING_ENTERPRISE_CUSTOMER_ADMIN_LIST_ENDPOINT, data=data)
self.assertEqual(response.status_code, 400)
- error_message = response.json().get('non_field_errors', [])
+ error_message = response.json()[0]
expected_message = "A pending user with this email and enterprise customer already exists."
self.assertIn(expected_message, error_message)