From 43503994ba3fa3d668c520ecd3c5d43f05adcccc Mon Sep 17 00:00:00 2001 From: index-git Date: Tue, 2 Jan 2024 15:56:26 +0100 Subject: [PATCH] Test access rights application on WFS-T queries --- src/layman/geoserver_proxy_test.py | 57 ------------------- test_tools/process_client.py | 13 +++++ .../test_access_rights_application.py | 34 ++++++++--- 3 files changed, 39 insertions(+), 65 deletions(-) diff --git a/src/layman/geoserver_proxy_test.py b/src/layman/geoserver_proxy_test.py index 9ae490643..2d351113c 100644 --- a/src/layman/geoserver_proxy_test.py +++ b/src/layman/geoserver_proxy_test.py @@ -1,67 +1,10 @@ import pytest -from geoserver.error import Error as GS_Error from layman import app -from layman.layer import db -from test_tools import process_client from test_tools.data import wfs as data_wfs -from test_tools.process_client import get_authz_headers from . import geoserver_proxy -@pytest.mark.usefixtures('ensure_layman', 'oauth2_provider_mock') -def test_missing_attribute_authz(): - username = 'testmissingattr_authz' - layername1 = 'testmissingattr_authz_layer' - username2 = 'testmissingattr_authz2' - - authn_headers1 = get_authz_headers(username) - authn_headers2 = get_authz_headers(username2) - - def do_test(wfs_query, attribute_names): - # Test, that unauthorized user will not cause new attribute - with app.app_context(): - old_db_attributes = db.get_internal_table_all_column_names(username, layername1) - for attr_name in attribute_names: - assert attr_name not in old_db_attributes, f"old_db_attributes={old_db_attributes}, attr_name={attr_name}" - with pytest.raises(GS_Error) as exc: - process_client.post_wfst(wfs_query, headers=authn_headers2, workspace=username) - assert exc.value.data['status_code'] == 400 - - with app.app_context(): - new_db_attributes = db.get_internal_table_all_column_names(username, layername1) - for attr_name in attribute_names: - assert attr_name not in new_db_attributes, f"new_db_attributes={new_db_attributes}, attr_name={attr_name}" - - # Test, that authorized user will cause new attribute - process_client.post_wfst(wfs_query, headers=authn_headers1, workspace=username) - with app.app_context(): - new_db_attributes = db.get_internal_table_all_column_names(username, layername1) - for attr_name in attribute_names: - assert attr_name in new_db_attributes, f"new_db_attributes={new_db_attributes}, attr_name={attr_name}" - - process_client.reserve_username(username, headers=authn_headers1) - process_client.publish_workspace_layer(username, - layername1, - file_paths=['tmp/naturalearth/110m/cultural/ne_110m_admin_0_countries.geojson', ], - headers=authn_headers1) - - # Testing, that user2 is not able to write to layer of user1 - process_client.reserve_username(username2, headers=authn_headers2) - - # INSERT - attr_names = ['inexisting_attribute_auth1', 'inexisting_attribute_auth2'] - data_xml = data_wfs.get_wfs20_insert_points_new_attr(username, layername1, attr_names) - do_test(data_xml, attr_names) - - # UPDATE - attr_names = ['inexisting_attribute_auth3', 'inexisting_attribute_auth4'] - data_xml = data_wfs.get_wfs20_update_points_new_attr(username, layername1, attr_names) - do_test(data_xml, attr_names) - - process_client.delete_workspace_layer(username, layername1, headers=authn_headers1) - - @pytest.mark.parametrize('wfst_data_method, hardcoded_attrs', [ pytest.param( data_wfs.get_wfs20_insert_points_new_attr, diff --git a/test_tools/process_client.py b/test_tools/process_client.py index a803481bf..7d6d7b891 100644 --- a/test_tools/process_client.py +++ b/test_tools/process_client.py @@ -703,6 +703,19 @@ def post_wfst(xml, *, headers=None, url=None, workspace=None): raise gs_error.Error(code_or_message='WFS-T error', data={'status_code': response.status_code}) +def post_wfst_with_xml_getter(workspace, layer, *, xml_getter, actor_name=None, xml_getter_params=None): + xml_getter_params = xml_getter_params or {} + headers = {} + if actor_name: + assert TOKEN_HEADER not in headers + + if actor_name and actor_name != settings.ANONYM_USER: + headers.update(get_authz_headers(actor_name)) + + xml = xml_getter(workspace, layer, **xml_getter_params) + post_wfst(xml, headers=headers, workspace=workspace) + + def check_publication_status(response): try: current_status = response.json().get('layman_metadata', {}).get('publication_status') diff --git a/tests/dynamic_data/publications/access_rights/test_access_rights_application.py b/tests/dynamic_data/publications/access_rights/test_access_rights_application.py index 54337a8ee..e60b97358 100644 --- a/tests/dynamic_data/publications/access_rights/test_access_rights_application.py +++ b/tests/dynamic_data/publications/access_rights/test_access_rights_application.py @@ -2,8 +2,10 @@ import inspect import pytest +from geoserver.error import Error as GS_Error from layman import settings, LaymanError from test_tools import process_client, role_service as role_service_util +from test_tools.data import wfs from tests import Publication, EnumTestTypes from tests.asserts.final.publication import geoserver_proxy from tests.dynamic_data import base_test @@ -37,6 +39,12 @@ (geoserver_proxy.workspace_wfs_2_0_0_capabilities_available_if_vector, {}), ] +WFST_METHODS_TO_TEST = [ + (process_client.post_wfst_with_xml_getter, {'xml_method': wfs.get_wfs20_insert_lines}), + (process_client.post_wfst_with_xml_getter, {'xml_method': wfs.get_wfs20_insert_points_new_attr, 'xml_method_params': {'attr_names': ['inexisting_attribute_auth1', 'inexisting_attribute_auth2']}}), + (process_client.post_wfst_with_xml_getter, {'xml_method': wfs.get_wfs20_update_points_new_attr, 'xml_method_params': {'attr_names': ['inexisting_attribute_auth3', 'inexisting_attribute_auth4']}}), +] + def pytest_generate_tests(metafunc): # https://docs.pytest.org/en/6.2.x/parametrize.html#pytest-generate-tests @@ -72,7 +80,9 @@ def pytest_generate_tests(metafunc): ) -def add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_test): +def add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_test, *, args_to_id_fnc=None): + args_to_id_fnc = args_to_id_fnc or 'keys' + assert args_to_id_fnc in ['keys', 'values_name'] all_args = { 'workspace': publication.workspace, 'name': publication.name, @@ -82,7 +92,9 @@ def add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_ 'publ_type': publication.type, } for method, args in endpoints_to_test: - pytest_id = f'{method.__name__}__{user.split("_")[-1]}__{publication.name[5:]}{("__" + next(iter(args.keys()))) if args else ""}' + + args_pytest_id = ("__" + (next(iter(args.keys())) if args_to_id_fnc == 'keys' else next(iter(args.values())).__name__)) if args else "" + pytest_id = f'{method.__name__}__{user.split("_")[-1]}__{publication.name[5:]}{args_pytest_id}' method_args = inspect.getfullargspec(method).args + inspect.getfullargspec(method).kwonlyargs test_case = base_test.TestCaseType(pytest_id=pytest_id, @@ -146,21 +158,21 @@ def generate_multiendpoint_test_cases(publications_user_can_read, workspace, ): return tc_list -def generate_positive_geoserver_test_cases(publications_user_can_read): +def generate_positive_geoserver_test_cases(publications_user_can_read, methods_to_test): tc_list = [] for user, publications in publications_user_can_read.items(): for publication in publications: if publication.type == process_client.LAYER_TYPE: - add_publication_test_cases_to_list(tc_list, publication, user, GEOSERVER_METHODS_TO_TEST) + add_publication_test_cases_to_list(tc_list, publication, user, methods_to_test, args_to_id_fnc='values_name') return tc_list -def generate_geoserver_negative_test_cases(publications_user_can_read, publication_all): +def generate_geoserver_negative_test_cases(publications_user_can_read, publication_all, methods_to_test): tc_list = [] for user, available_publications in publications_user_can_read.items(): for publication in publication_all: if publication not in available_publications and publication.type == process_client.LAYER_TYPE: - add_publication_test_cases_to_list(tc_list, publication, user, GEOSERVER_METHODS_TO_TEST) + add_publication_test_cases_to_list(tc_list, publication, user, methods_to_test, args_to_id_fnc='values_name') return tc_list @@ -216,10 +228,11 @@ class TestAccessRights: } test_cases = { - 'test_single_positive': generate_positive_test_cases(PUBLICATIONS_BY_USER) + generate_positive_geoserver_test_cases(PUBLICATIONS_BY_USER), + 'test_single_positive': generate_positive_test_cases(PUBLICATIONS_BY_USER) + generate_positive_geoserver_test_cases(PUBLICATIONS_BY_USER, GEOSERVER_METHODS_TO_TEST) + generate_positive_geoserver_test_cases(PUBLICATIONS_BY_USER, WFST_METHODS_TO_TEST), 'test_single_negative': generate_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS), 'test_multiendpoint': generate_multiendpoint_test_cases(PUBLICATIONS_BY_USER, OWNER), - 'test_geoserver_negative': generate_geoserver_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS), + 'test_geoserver_negative': generate_geoserver_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS, GEOSERVER_METHODS_TO_TEST), + 'test_wfst_negative': generate_geoserver_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS, WFST_METHODS_TO_TEST), } @pytest.fixture(scope='class', autouse=True) @@ -264,3 +277,8 @@ def test_geoserver_negative(self, rest_method, rest_args, ): with pytest.raises(AssertionError) as exc_info: rest_method(**rest_args) assert exc_info.value.args[0].startswith('Layer not found in Capabilities.') + + def test_wfst_negative(self, rest_method, rest_args, ): + with pytest.raises(GS_Error) as exc_info: + rest_method(**rest_args) + assert exc_info.value.data['status_code'] == 400