From f311f3269718812aeb02f3967684d7f8cf4b6daa Mon Sep 17 00:00:00 2001 From: Mohamed Abdel Wedoud Date: Mon, 19 Feb 2024 18:41:10 +0100 Subject: [PATCH] docs(permission-db): update following code review --- antarest/study/repository.py | 16 ++++++---------- antarest/study/service.py | 11 ++++++----- tests/study/test_repository.py | 20 ++++++-------------- 3 files changed, 18 insertions(+), 29 deletions(-) diff --git a/antarest/study/repository.py b/antarest/study/repository.py index 97e7bc0a0b..a8761b7b02 100644 --- a/antarest/study/repository.py +++ b/antarest/study/repository.py @@ -3,7 +3,7 @@ import typing as t from pydantic import BaseModel, NonNegativeInt -from sqlalchemy import and_, func, not_, or_ # type: ignore +from sqlalchemy import and_, func, not_, or_, sql # type: ignore from sqlalchemy.orm import Query, Session, joinedload, with_polymorphic # type: ignore from antarest.core.interfaces.cache import ICache @@ -52,6 +52,7 @@ def from_params(cls, params: t.Union[RequestParameters, JWTUser]) -> "AccessPerm """ This function makes it easier to pass on user ids and groups into the repository filtering function by extracting the associated `AccessPermissions` object. + Args: params: `RequestParameters` or `JWTUser` holding user ids and groups @@ -89,7 +90,7 @@ class StudyFilter(BaseModel, frozen=True, extra="forbid"): exists: if raw study missing workspace: optional workspace of the study folder: optional folder prefix of the study - access_permissions: query user id, groups and admins status + access_permissions: query user ID, groups and admins status """ name: str = "" @@ -233,7 +234,6 @@ def get_all( Returns: The matching studies in proper order and pagination. """ - # When we fetch a study, we also need to fetch the associated owner and groups # to check the permissions of the current user efficiently. # We also need to fetch the additional data to display the study information @@ -242,10 +242,6 @@ def get_all( q = self._search_studies(study_filter) - # permissions filtering - if not study_filter.access_permissions.is_admin and study_filter.access_permissions.user_id is None: - return [] - # sorting if sort_by: if sort_by == StudySortBy.DATE_DESC: @@ -278,9 +274,6 @@ def count_studies(self, study_filter: StudyFilter = StudyFilter()) -> int: """ q = self._search_studies(study_filter) - # permissions filtering - if not study_filter.access_permissions.is_admin and study_filter.access_permissions.user_id is None: - return 0 total: int = q.count() return total @@ -356,6 +349,9 @@ def _search_studies( ) else: q = q1.union(q.filter(or_(condition_1, condition_2))) + elif not study_filter.access_permissions.is_admin and study_filter.access_permissions.user_id is None: + # return empty result + q = q.filter(sql.false()) elif study_filter.groups: q = q.join(entity.groups).filter(Group.id.in_(study_filter.groups)) diff --git a/antarest/study/service.py b/antarest/study/service.py index f64f6d36f2..7290cf692d 100644 --- a/antarest/study/service.py +++ b/antarest/study/service.py @@ -2146,14 +2146,15 @@ def update_matrix( def check_and_update_all_study_versions_in_database(self, params: RequestParameters) -> None: """ - This function updates studies version on the db. \n + This function updates studies version on the db. + **Warnings: Only users with Admins rights should be able to run this function.** - Args: - params: `RequestParameters` holding user ids and groups - Returns: + Args: + params: Request parameters holding user ID and groups - Raises: `UserHasNotPermissionError` if params user is not admin. + Raises: + UserHasNotPermissionError: if params user is not admin. """ if params.user and not params.user.is_site_admin(): diff --git a/tests/study/test_repository.py b/tests/study/test_repository.py index b4ecbfb872..cfeb3057bb 100644 --- a/tests/study/test_repository.py +++ b/tests/study/test_repository.py @@ -874,20 +874,16 @@ def test_get_all__non_admin_permissions_filter( ) # use the db recorder to check that: - # 1- retrieving all studies requires only 1 query if user_id is not None + # 1- retrieving all studies requires only 1 query # 2- accessing studies attributes does not require additional queries to db - # 3- having an exact total of queries equals to 1 if user_id is not None + # 3- having an exact total of queries equals to 1 with DBStatementRecorder(db_session.bind) as db_recorder: all_studies = repository.get_all(study_filter=study_filter) _ = [s.owner for s in all_studies] _ = [s.groups for s in all_studies] _ = [s.additional_data for s in all_studies] _ = [s.tags for s in all_studies] - if user_id: - assert len(db_recorder.sql_statements) == 1, str(db_recorder) - else: - # no query should be executed if user_id is None - assert len(db_recorder.sql_statements) == 0, str(db_recorder) + assert len(db_recorder.sql_statements) == 1, str(db_recorder) if expected_ids is not None: assert {s.id for s in all_studies} == expected_ids @@ -1006,20 +1002,16 @@ def test_get_all__admin_permissions_filter( else StudyFilter(access_permissions=access_permissions) ) # use the db recorder to check that: - # 1- retrieving all studies requires only 1 query if user is admin + # 1- retrieving all studies requires only 1 query # 2- accessing studies attributes does not require additional queries to db - # 3- having an exact total of queries equals to 1 if user is admin + # 3- having an exact total of queries equals to 1 with DBStatementRecorder(db_session.bind) as db_recorder: all_studies = repository.get_all(study_filter=study_filter) _ = [s.owner for s in all_studies] _ = [s.groups for s in all_studies] _ = [s.additional_data for s in all_studies] _ = [s.tags for s in all_studies] - if is_admin: - assert len(db_recorder.sql_statements) == 1, str(db_recorder) - else: - # no query should be executed if user is not admin - assert len(db_recorder.sql_statements) == 0, str(db_recorder) + assert len(db_recorder.sql_statements) == 1, str(db_recorder) if expected_ids is not None: assert {s.id for s in all_studies} == expected_ids