diff --git a/eudr_backend/util_classes.py b/eudr_backend/util_classes.py new file mode 100644 index 0000000..c6098f2 --- /dev/null +++ b/eudr_backend/util_classes.py @@ -0,0 +1,10 @@ +from rest_framework.permissions import BasePermission + + +class IsSuperUser(BasePermission): + """ + Allows access only to superusers. + """ + + def has_permission(self, request, view): + return request.user and request.user.is_superuser diff --git a/eudr_backend/views.py b/eudr_backend/views.py index 7010393..fc5afb6 100644 --- a/eudr_backend/views.py +++ b/eudr_backend/views.py @@ -14,6 +14,7 @@ from eudr_backend.models import EUDRCollectionSiteModel, EUDRFarmBackupModel, EUDRSharedMapAccessCodeModel, EUDRFarmModel, EUDRUploadedFilesModel from datetime import timedelta from eudr_backend.tasks import update_geoid +from eudr_backend.util_classes import IsSuperUser from eudr_backend.utils import extract_data_from_file, flatten_multipolygon_coordinates, generate_access_code, handle_failed_file_entry, store_failed_file_in_s3, transform_csv_to_json, transform_db_data_to_geojson from eudr_backend.validators import validate_csv, validate_geojson from .serializers import ( @@ -73,7 +74,7 @@ def create_user(request): } ) @api_view(["GET"]) -@permission_classes([IsAuthenticated]) +@permission_classes([IsSuperUser]) def retrieve_users(request): data = User.objects.all().order_by("-date_joined") serializer = EUDRUserModelSerializer(data, many=True) @@ -121,14 +122,25 @@ def retrieve_user(request, pk): @api_view(["PUT"]) @permission_classes([IsAuthenticated]) def update_user(request, pk): - user = User.objects.get(id=pk) - serializer = EUDRUserModelSerializer(instance=user, data=request.data) + try: + user = User.objects.get(id=pk) - if serializer.is_valid(): - serializer.save() - return Response(serializer.data, status=status.HTTP_200_OK) + # only superusers or the user themselves can update their details + if not request.user.is_superuser and request.user.id != user.id: + return Response({'error': 'You do not have permission to perform this action'}, status=status.HTTP_403_FORBIDDEN) - return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + if 'username' not in request.data: + request.data['username'] = user.username + + serializer = EUDRUserModelSerializer(instance=user, data=request.data) + + if serializer.is_valid(): + serializer.save() + return Response(serializer.data, status=status.HTTP_200_OK) + + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + except User.DoesNotExist: + return Response({'error': 'User does not exist'}, status=status.HTTP_400_BAD_REQUEST) @swagger_auto_schema( @@ -139,11 +151,17 @@ def update_user(request, pk): } ) @api_view(["DELETE"]) -@permission_classes([IsAuthenticated]) +@permission_classes([IsSuperUser]) def delete_user(request, pk): - user = User.objects.get(id=pk) - user.delete() - return Response(status=status.HTTP_204_NO_CONTENT) + if not request.user.is_superuser: + return Response({'error': 'You do not have permission to perform this action'}, status=status.HTTP_403_FORBIDDEN) + + try: + user = User.objects.get(id=pk) + user.delete() + return Response(status=status.HTTP_204_NO_CONTENT) + except User.DoesNotExist: + return Response({'error': 'User does not exist'}, status=status.HTTP_404_NOT_FOUND) @swagger_auto_schema( diff --git a/my_eudr_app/tests.py b/my_eudr_app/tests.py index 60e2013..141a446 100644 --- a/my_eudr_app/tests.py +++ b/my_eudr_app/tests.py @@ -185,10 +185,28 @@ def test_create_user(self): self.assertEqual(response.status_code, status.HTTP_201_CREATED) def test_retrieve_users(self): + # Create a superuser + superuser = User.objects.create_superuser( + username='superuser', password='superpassword') + + # Log in as superuser + self.client.login(username='superuser', password='superpassword') + + # Attempt to retrieve users url = reverse('user_list') response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) + # Log out superuser + self.client.logout() + + # Log in as a regular user + self.client.login(username='testuser', password='password123') + + # Attempt to retrieve users + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + def test_retrieve_user(self): url = reverse('user_detail', args=[self.user.id]) response = self.client.get(url) @@ -201,6 +219,10 @@ def test_update_user(self): self.assertEqual(response.status_code, status.HTTP_200_OK) def test_delete_user(self): + # Ensure only superuser can delete a user + superuser = User.objects.create_superuser( + username='superuser', password='superpassword') + self.client.login(username='superuser', password='superpassword') url = reverse('user_delete', args=[self.user.id]) response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)