diff --git a/src/layman/authz/authz_test.py b/src/layman/authz/authz_test.py index a5fd460f1..41f2cfcfb 100644 --- a/src/layman/authz/authz_test.py +++ b/src/layman/authz/authz_test.py @@ -61,112 +61,6 @@ def test_authorize_publications_decorator_accepts_path(request_path): assert isinstance(exc_info.value, LaymanError), exc_info.traceback -class TestRestApiClass: - layername = 'test_authorize_decorator_layer' - mapname = 'test_authorize_decorator_map' - username = 'test_authorize_decorator_user' - authz_headers = process_client.get_authz_headers(username) - - @pytest.fixture(scope="class") - def provide_publications(self): - username = self.username - authz_headers = self.authz_headers - layername = self.layername - mapname = self.mapname - process_client.ensure_reserved_username(username, headers=authz_headers) - process_client.publish_workspace_layer(username, layername, headers=authz_headers) - process_client.publish_workspace_map(username, mapname, headers=authz_headers) - yield - process_client.delete_workspace_layer(username, layername, headers=authz_headers) - process_client.delete_workspace_map(username, mapname, headers=authz_headers) - - @staticmethod - def assert_response(response, exp_status_code, exp_data): - assert response.status_code == exp_status_code, response.text - if exp_status_code == 200 and exp_data is not None: - resp_json = response.json() - if callable(exp_data): - assert exp_data(resp_json), f"resp_json={resp_json}, exp_data={exp_data}" - else: - assert resp_json == exp_data - elif exp_status_code != 200 and exp_data is not None: - resp_json = response.json() - assert resp_json['code'] == exp_data, f"resp_json={resp_json}, exp_data={exp_data}" - - @staticmethod - def has_single_layer(r_json): - return {li['name'] for li in r_json} == {TestRestApiClass.layername} - - @staticmethod - def has_single_map(r_json): - return {li['name'] for li in r_json} == {TestRestApiClass.mapname} - - @staticmethod - def has_no_publication(r_json): - return {li['name'] for li in r_json} == set() - - @pytest.mark.parametrize( - "rest_action, url_for_params, authz_status_code, authz_response, unauthz_status_code, unauthz_response", - [ - ('rest_workspace_layers.get', {}, 200, has_single_layer.__func__, 200, has_no_publication.__func__), - ('rest_workspace_layer.get', {'layername': layername}, 200, None, 404, 15), - ('rest_workspace_layer_metadata_comparison.get', {'layername': layername}, 200, None, 404, 15), - ('rest_workspace_layer_style.get', {'layername': layername}, 200, None, 404, 15), - ('rest_workspace_layer_thumbnail.get', {'layername': layername}, 200, None, 404, 15), - ('rest_workspace_layer_chunk.get', {'layername': layername}, 400, 20, 404, 15), - ('rest_workspace_maps.get', {}, 200, has_single_map.__func__, 200, has_no_publication.__func__), - ('rest_workspace_map.get', {'mapname': mapname}, 200, None, 404, 26), - ('rest_workspace_map_file.get', {'mapname': mapname}, 200, None, 404, 26), - ('rest_workspace_map_metadata_comparison.get', {'mapname': mapname}, 200, None, 404, 26), - ('rest_workspace_map_thumbnail.get', {'mapname': mapname}, 200, None, 404, 26), - ], - ) - @pytest.mark.usefixtures('oauth2_provider_mock', 'ensure_layman', 'provide_publications') - def test_authorize_publications_decorator_on_rest_api( - self, - rest_action, - url_for_params, - authz_status_code, - authz_response, - unauthz_status_code, - unauthz_response, - ): - username = self.username - authz_headers = self.authz_headers - patch_method = None - publ_name = None - if '_layer' in rest_action: - patch_method = process_client.patch_workspace_layer - publ_name = self.layername - elif '_map' in rest_action: - patch_method = process_client.patch_workspace_map - publ_name = self.mapname - assert publ_name - - url_for_params['workspace'] = username - - with app.app_context(): - rest_url = url_for(rest_action, **url_for_params) - - patch_method(username, publ_name, headers=authz_headers, access_rights={ - 'read': username, - 'write': username, - }) - response = requests.get(rest_url, headers=authz_headers, timeout=settings.DEFAULT_CONNECTION_TIMEOUT) - self.assert_response(response, authz_status_code, authz_response) - response = requests.get(rest_url, timeout=settings.DEFAULT_CONNECTION_TIMEOUT) - self.assert_response(response, unauthz_status_code, unauthz_response) - - patch_method(username, publ_name, headers=authz_headers, access_rights={ - 'read': settings.RIGHTS_EVERYONE_ROLE, - 'write': settings.RIGHTS_EVERYONE_ROLE, - }) - response = requests.get(rest_url, headers=authz_headers, timeout=settings.DEFAULT_CONNECTION_TIMEOUT) - self.assert_response(response, authz_status_code, authz_response) - response = requests.get(rest_url, timeout=settings.DEFAULT_CONNECTION_TIMEOUT) - self.assert_response(response, authz_status_code, authz_response) - - @pytest.mark.parametrize('roles_and_users, exp_users, exp_roles', [ pytest.param([], [], [], id='no-names'), pytest.param(['user1', 'user2'], ['user1', 'user2'], [], id='only-users'), diff --git a/test_tools/process.py b/test_tools/process.py index 9b8d642dd..b14f5e0a8 100644 --- a/test_tools/process.py +++ b/test_tools/process.py @@ -77,6 +77,9 @@ def oauth2_provider_mock(): 'test_access_rights_role_user1': None, 'test_role_application_user': None, 'test_role_application_role_user': None, + 'test_access_rights_application_owner': None, + 'test_access_rights_application_reader': None, + 'test_access_rights_application_other_user': None, }, }, 'host': '0.0.0.0', diff --git a/tests/dynamic_data/publications/access_rights/test_access_rights_application.py b/tests/dynamic_data/publications/access_rights/test_access_rights_application.py new file mode 100644 index 000000000..93f01140a --- /dev/null +++ b/tests/dynamic_data/publications/access_rights/test_access_rights_application.py @@ -0,0 +1,229 @@ +import copy +import inspect +import pytest + +from layman import settings, LaymanError +from test_tools import process_client, role_service as role_service_util +from tests import Publication, EnumTestTypes +from tests.dynamic_data import base_test + +ENDPOINTS_TO_TEST = { + process_client.LAYER_TYPE: [ + process_client.get_workspace_publication, + process_client.get_workspace_publication_metadata_comparison, + process_client.get_workspace_layer_style, + process_client.get_workspace_publication_thumbnail, + # process_client.get_workspace_layer_chunk, + ], + process_client.MAP_TYPE: [ + process_client.get_workspace_publication, + process_client.get_workspace_map_file, + process_client.get_workspace_publication_metadata_comparison, + process_client.get_workspace_publication_thumbnail, + ], +} + + +def pytest_generate_tests(metafunc): + # https://docs.pytest.org/en/6.2.x/parametrize.html#pytest-generate-tests + cls = metafunc.cls + test_fn = metafunc.function + arg_names = [a for a in inspect.getfullargspec(test_fn).args if a != 'self'] + argvalues = [] + ids = [] + + test_cases = cls.test_cases[test_fn.__name__] + for test_case in test_cases: + assert not test_case.publication, f"Not yet implemented" + assert not test_case.publication_type, f"Not yet implemented" + assert not test_case.key, f"Not yet implemented" + assert not test_case.specific_params, f"Not yet implemented" + assert not test_case.specific_types, f"Not yet implemented" + assert not test_case.parametrization, f"Not yet implemented" + assert not test_case.post_before_test_args, f"Not yet implemented" + assert test_case.type == EnumTestTypes.MANDATORY, f"Other types then MANDATORY are not implemented yet" + arg_name_to_value = { + 'rest_method': test_case.rest_method, + 'rest_args': copy.deepcopy(test_case.rest_args), + 'params': copy.deepcopy(test_case.params), + } + arg_values = [arg_name_to_value[n] for n in arg_names] + + argvalues.append(pytest.param(*arg_values, marks=test_case.marks)) + ids.append(test_case.pytest_id) + metafunc.parametrize( + argnames=', '.join(arg_names), + argvalues=argvalues, + ids=ids, + ) + + +def generate_positive_test_cases(publications_user_can_read): + tc_list = [] + for user, publications in publications_user_can_read.items(): + for publication in publications: + all_args = { + 'workspace': publication.workspace, + 'name': publication.name, + 'layer': publication.name, + 'actor_name': user, + 'publication_type': publication.type, + } + for method in ENDPOINTS_TO_TEST[publication.type]: + pytest_id = f'{method.__name__}__{user.split("_")[-1]}__{publication.name[5:]}' + method_args = inspect.getfullargspec(method).args + inspect.getfullargspec(method).kwonlyargs + + test_case = base_test.TestCaseType(pytest_id=pytest_id, + rest_method=method, + rest_args={ + key: value for key, value in all_args.items() if key in method_args + }, + type=EnumTestTypes.MANDATORY, + ) + + tc_list.append(test_case) + return tc_list + + +def generate_negative_test_cases(publications_user_can_read, publication_all): + tc_list = [] + for user, available_publications in publications_user_can_read.items(): + for publication in publication_all: + if publication in available_publications: + continue + all_args = { + 'workspace': publication.workspace, + 'name': publication.name, + 'layer': publication.name, + 'actor_name': user, + 'publication_type': publication.type, + } + for method in ENDPOINTS_TO_TEST[publication.type]: + pytest_id = f'{method.__name__}__{user.split("_")[-1]}__{publication.name[5:]}' + method_args = inspect.getfullargspec(method).args + inspect.getfullargspec(method).kwonlyargs + + test_case = base_test.TestCaseType(pytest_id=pytest_id, + rest_method=method, + rest_args={ + key: value for key, value in all_args.items() if key in method_args + }, + type=EnumTestTypes.MANDATORY, + ) + + tc_list.append(test_case) + return tc_list + + +def generate_multiendpoint_test_cases(publications_user_can_read, role_user): + tc_list = [] + for user, publications in publications_user_can_read.items(): + if user == role_user: + continue + method = process_client.get_publications + available_publications = [(publication.workspace, publication.type, publication.name) for publication in publications] + available_publications.sort(key=lambda x: (x[0], x[2], x[1])) + pytest_id = f'{method.__name__}__{user.split("_")[-1]}' + test_case = base_test.TestCaseType(pytest_id=pytest_id, + rest_method=method, + rest_args={ + 'publication_type': None, + 'actor_name': user, + }, + params={'exp_publications': available_publications}, + type=EnumTestTypes.MANDATORY, + ) + + tc_list.append(test_case) + return tc_list + + +@pytest.mark.timeout(60) +@pytest.mark.usefixtures('ensure_layman_module', 'oauth2_provider_mock') +class TestAccessRights: + OWNER = 'test_access_rights_application_owner' + READER = 'test_access_rights_application_reader' + OTHER_USER = 'test_access_rights_application_other_user' + ROLE = 'TEST_ACCESS_RIGHTS_APPLICATION_ROLE' + + LAYER_NO_ACCESS = Publication(OWNER, process_client.LAYER_TYPE, 'test_no_access_layer') + LAYER_USER_ACCESS = Publication(OWNER, process_client.LAYER_TYPE, 'test_user_access_layer') + LAYER_ROLE_ACCESS = Publication(OWNER, process_client.LAYER_TYPE, 'test_role_access_layer') + LAYER_EVERYONE_ACCESS = Publication(OWNER, process_client.LAYER_TYPE, 'test_everyone_access_layer') + MAP_NO_ACCESS = Publication(OWNER, process_client.MAP_TYPE, 'test_no_access_map') + MAP_USER_ACCESS = Publication(OWNER, process_client.MAP_TYPE, 'test_user_access_map') + MAP_ROLE_ACCESS = Publication(OWNER, process_client.MAP_TYPE, 'test_role_access_map') + MAP_EVERYONE_ACCESS = Publication(OWNER, process_client.MAP_TYPE, 'test_everyone_access_map') + + ACCESS_RIGHT_NO_ACCESS = { + 'read': OWNER, + 'write': OWNER, + } + ACCESS_RIGHTS_USER_ACCESS = { + 'read': f'{OWNER}, {READER}', + 'write': OWNER, + } + ACCESS_RIGHTS_ROLE_ACCESS = { + 'read': f'{OWNER}, {ROLE}', + 'write': OWNER, + } + ACCESS_RIGHTS_EVERYONE_ACCESS = { + 'read': settings.RIGHTS_EVERYONE_ROLE, + 'write': OWNER, + } + + PUBLICATIONS_DEFS = [ + (LAYER_NO_ACCESS, ACCESS_RIGHT_NO_ACCESS), + (LAYER_USER_ACCESS, ACCESS_RIGHTS_USER_ACCESS), + (LAYER_ROLE_ACCESS, ACCESS_RIGHTS_ROLE_ACCESS), + (LAYER_EVERYONE_ACCESS, ACCESS_RIGHTS_EVERYONE_ACCESS), + (MAP_NO_ACCESS, ACCESS_RIGHT_NO_ACCESS), + (MAP_USER_ACCESS, ACCESS_RIGHTS_USER_ACCESS), + (MAP_ROLE_ACCESS, ACCESS_RIGHTS_ROLE_ACCESS), + (MAP_EVERYONE_ACCESS, ACCESS_RIGHTS_EVERYONE_ACCESS), + ] + + PUBLICATIONS = [publication for publication, _ in PUBLICATIONS_DEFS] + + PUBLICATIONS_USER_CAN_READ = { + OWNER: [publication for publication, _ in PUBLICATIONS_DEFS], + READER: [LAYER_USER_ACCESS, LAYER_ROLE_ACCESS, LAYER_EVERYONE_ACCESS, MAP_USER_ACCESS, MAP_ROLE_ACCESS, MAP_EVERYONE_ACCESS, ], + OTHER_USER: [LAYER_EVERYONE_ACCESS, MAP_EVERYONE_ACCESS, ], + settings.ANONYM_USER: [LAYER_EVERYONE_ACCESS, MAP_EVERYONE_ACCESS, ], + } + + test_cases = { + 'test_single_positive': generate_positive_test_cases(PUBLICATIONS_USER_CAN_READ), + 'test_single_negative': generate_negative_test_cases(PUBLICATIONS_USER_CAN_READ, PUBLICATIONS), + 'test_multiendpoint': generate_multiendpoint_test_cases(PUBLICATIONS_USER_CAN_READ, READER), + } + + @pytest.fixture(scope='class', autouse=True) + def class_fixture(self, request): + process_client.ensure_reserved_username(self.OWNER, process_client.get_authz_headers(self.OWNER)) + process_client.ensure_reserved_username(self.READER, process_client.get_authz_headers(self.READER)) + process_client.ensure_reserved_username(self.OTHER_USER, process_client.get_authz_headers(self.OTHER_USER)) + for publication, access_rights in self.PUBLICATIONS_DEFS: + process_client.publish_workspace_publication(publication.type, publication.workspace, publication.name, + actor_name=self.OWNER, access_rights=access_rights, ) + role_service_util.ensure_user_role(self.READER, self.ROLE) + yield + if request.node.session.testsfailed == 0 and not request.config.option.nocleanup: + for publication, access_rights in self.PUBLICATIONS_DEFS: + process_client.delete_workspace_publication(publication.type, publication.workspace, publication.name, + actor_name=self.OWNER, ) + role_service_util.delete_user_role(self.READER, self.ROLE) + role_service_util.delete_role(self.ROLE) + + def test_single_positive(self, rest_method, rest_args, ): + rest_method(**rest_args) + + def test_single_negative(self, rest_method, rest_args, ): + with pytest.raises(LaymanError) as exc_info: + rest_method(**rest_args) + assert exc_info.value.http_code == 404 + assert exc_info.value.code in [15, 26, ] + + def test_multiendpoint(self, rest_method, rest_args, params): + result = rest_method(**rest_args) + result_publications = [(publ['workspace'], f"layman.{publ['publication_type']}", publ['name']) for publ in result] + assert result_publications == params['exp_publications'] diff --git a/tests/dynamic_data/publications/access_rights/test_role_application.py b/tests/dynamic_data/publications/access_rights/test_role_application.py deleted file mode 100644 index 07805ee4d..000000000 --- a/tests/dynamic_data/publications/access_rights/test_role_application.py +++ /dev/null @@ -1,61 +0,0 @@ -import pytest - -from layman import LaymanError -from test_tools import process_client, role_service as role_service_util -from tests import EnumTestTypes -from tests.asserts.final.publication import util as assert_util -from tests.dynamic_data import base_test, base_test_classes -from tests.dynamic_data.publications import common_publications - -pytest_generate_tests = base_test.pytest_generate_tests - - -class PublicationTypes(base_test_classes.PublicationByDefinitionBase): - LAYER = (common_publications.LAYER_VECTOR_SLD, 'layer') - - -OWNER = 'test_role_application_user' -ROLE = 'TEST_ROLE_APPLICATION_ROLE' -ROLE_USER = 'test_role_application_role_user' -USERS_AND_ROLES = {OWNER, ROLE} - - -@pytest.mark.usefixtures('oauth2_provider_mock') -class TestPublication(base_test.TestSingleRestPublication): - workspace = OWNER - publication_type = process_client.LAYER_TYPE - - rest_parametrization = [ - ] - - usernames_to_reserve = [ - OWNER, - ROLE_USER, - ] - - test_cases = [base_test.TestCaseType(key='role_test', - rest_args={ - 'access_rights': { - 'read': ','.join(USERS_AND_ROLES), - }, - 'actor_name': OWNER, - }, - type=EnumTestTypes.MANDATORY, - )] - - def test_publication(self, layer, rest_method, rest_args): - rest_method.fn(layer, args=rest_args) - assert_util.is_publication_valid_and_complete(layer) - - with pytest.raises(LaymanError) as exc_info: - process_client.get_workspace_publication(layer.type, layer.workspace, layer.name, actor_name=ROLE_USER) - assert exc_info.value.http_code == 404 - assert exc_info.value.code == 15 - assert exc_info.value.message == 'Layer was not found' - - role_service_util.ensure_user_role(ROLE_USER, ROLE) - info = process_client.get_workspace_publication(layer.type, layer.workspace, layer.name, actor_name=ROLE_USER) - assert set(info['access_rights']['read']) == USERS_AND_ROLES - - role_service_util.delete_user_role(ROLE_USER, ROLE) - role_service_util.delete_role(ROLE)