diff --git a/src/layman/geoserver_proxy_test.py b/src/layman/geoserver_proxy_test.py index 9ae490643..2d351113c 100644 --- a/src/layman/geoserver_proxy_test.py +++ b/src/layman/geoserver_proxy_test.py @@ -1,67 +1,10 @@ import pytest -from geoserver.error import Error as GS_Error from layman import app -from layman.layer import db -from test_tools import process_client from test_tools.data import wfs as data_wfs -from test_tools.process_client import get_authz_headers from . import geoserver_proxy -@pytest.mark.usefixtures('ensure_layman', 'oauth2_provider_mock') -def test_missing_attribute_authz(): - username = 'testmissingattr_authz' - layername1 = 'testmissingattr_authz_layer' - username2 = 'testmissingattr_authz2' - - authn_headers1 = get_authz_headers(username) - authn_headers2 = get_authz_headers(username2) - - def do_test(wfs_query, attribute_names): - # Test, that unauthorized user will not cause new attribute - with app.app_context(): - old_db_attributes = db.get_internal_table_all_column_names(username, layername1) - for attr_name in attribute_names: - assert attr_name not in old_db_attributes, f"old_db_attributes={old_db_attributes}, attr_name={attr_name}" - with pytest.raises(GS_Error) as exc: - process_client.post_wfst(wfs_query, headers=authn_headers2, workspace=username) - assert exc.value.data['status_code'] == 400 - - with app.app_context(): - new_db_attributes = db.get_internal_table_all_column_names(username, layername1) - for attr_name in attribute_names: - assert attr_name not in new_db_attributes, f"new_db_attributes={new_db_attributes}, attr_name={attr_name}" - - # Test, that authorized user will cause new attribute - process_client.post_wfst(wfs_query, headers=authn_headers1, workspace=username) - with app.app_context(): - new_db_attributes = db.get_internal_table_all_column_names(username, layername1) - for attr_name in attribute_names: - assert attr_name in new_db_attributes, f"new_db_attributes={new_db_attributes}, attr_name={attr_name}" - - process_client.reserve_username(username, headers=authn_headers1) - process_client.publish_workspace_layer(username, - layername1, - file_paths=['tmp/naturalearth/110m/cultural/ne_110m_admin_0_countries.geojson', ], - headers=authn_headers1) - - # Testing, that user2 is not able to write to layer of user1 - process_client.reserve_username(username2, headers=authn_headers2) - - # INSERT - attr_names = ['inexisting_attribute_auth1', 'inexisting_attribute_auth2'] - data_xml = data_wfs.get_wfs20_insert_points_new_attr(username, layername1, attr_names) - do_test(data_xml, attr_names) - - # UPDATE - attr_names = ['inexisting_attribute_auth3', 'inexisting_attribute_auth4'] - data_xml = data_wfs.get_wfs20_update_points_new_attr(username, layername1, attr_names) - do_test(data_xml, attr_names) - - process_client.delete_workspace_layer(username, layername1, headers=authn_headers1) - - @pytest.mark.parametrize('wfst_data_method, hardcoded_attrs', [ pytest.param( data_wfs.get_wfs20_insert_points_new_attr, diff --git a/test_tools/process_client.py b/test_tools/process_client.py index a803481bf..7d6d7b891 100644 --- a/test_tools/process_client.py +++ b/test_tools/process_client.py @@ -703,6 +703,19 @@ def post_wfst(xml, *, headers=None, url=None, workspace=None): raise gs_error.Error(code_or_message='WFS-T error', data={'status_code': response.status_code}) +def post_wfst_with_xml_getter(workspace, layer, *, xml_getter, actor_name=None, xml_getter_params=None): + xml_getter_params = xml_getter_params or {} + headers = {} + if actor_name: + assert TOKEN_HEADER not in headers + + if actor_name and actor_name != settings.ANONYM_USER: + headers.update(get_authz_headers(actor_name)) + + xml = xml_getter(workspace, layer, **xml_getter_params) + post_wfst(xml, headers=headers, workspace=workspace) + + def check_publication_status(response): try: current_status = response.json().get('layman_metadata', {}).get('publication_status') diff --git a/tests/dynamic_data/publications/access_rights/test_access_rights_application.py b/tests/dynamic_data/publications/access_rights/test_access_rights_application.py index 54337a8ee..e0c14f4c4 100644 --- a/tests/dynamic_data/publications/access_rights/test_access_rights_application.py +++ b/tests/dynamic_data/publications/access_rights/test_access_rights_application.py @@ -2,10 +2,13 @@ import inspect import pytest -from layman import settings, LaymanError +from geoserver.error import Error as GS_Error +from layman import app, settings, LaymanError +from layman.layer import db from test_tools import process_client, role_service as role_service_util +from test_tools.data import wfs from tests import Publication, EnumTestTypes -from tests.asserts.final.publication import geoserver_proxy +from tests.asserts.final.publication import geoserver_proxy, util as assert_publ_util from tests.dynamic_data import base_test ENDPOINTS_TO_TEST = { @@ -37,6 +40,12 @@ (geoserver_proxy.workspace_wfs_2_0_0_capabilities_available_if_vector, {}), ] +WFST_METHODS_TO_TEST = { + 'insert_lines': {'xml_getter': wfs.get_wfs20_insert_lines}, + 'insert_points_new_attr': {'xml_getter': wfs.get_wfs20_insert_points_new_attr, 'attr_names': ['inexist_attr1', 'inexist_attr2']}, + 'update_points_new_attr': {'xml_getter': wfs.get_wfs20_update_points_new_attr, 'attr_names': ['inexist_attr3', 'inexist_attr4']}, +} + def pytest_generate_tests(metafunc): # https://docs.pytest.org/en/6.2.x/parametrize.html#pytest-generate-tests @@ -48,8 +57,6 @@ def pytest_generate_tests(metafunc): test_cases = cls.test_cases[test_fn.__name__] for test_case in test_cases: - assert not test_case.publication, f"Not yet implemented" - assert not test_case.publication_type, f"Not yet implemented" assert not test_case.key, f"Not yet implemented" assert not test_case.specific_params, f"Not yet implemented" assert not test_case.specific_types, f"Not yet implemented" @@ -57,6 +64,8 @@ def pytest_generate_tests(metafunc): assert not test_case.post_before_test_args, f"Not yet implemented" assert test_case.type == EnumTestTypes.MANDATORY, f"Other types then MANDATORY are not implemented yet" arg_name_to_value = { + 'publication': test_case.publication, + 'publication_type': test_case.publication_type, 'rest_method': test_case.rest_method, 'rest_args': copy.deepcopy(test_case.rest_args), 'params': copy.deepcopy(test_case.params), @@ -164,6 +173,63 @@ def generate_geoserver_negative_test_cases(publications_user_can_read, publicati return tc_list +def add_wfst_publication_test_cases_to_list(tc_list, publication, user, username_for_id, test_cases): + method = process_client.post_wfst_with_xml_getter + all_args = { + 'workspace': publication.workspace, + 'name': publication.name, + 'layer': publication.name, + 'actor_name': user, + 'publication_type': publication.type, + 'publ_type': publication.type, + } + for key, test_case in test_cases.items(): + attribs = [attr + f'_{username_for_id.lower()}' for attr in test_case.get('attr_names', [])] + args = { + 'xml_getter': test_case['xml_getter'], + 'xml_getter_params': {'attr_names': attribs} if attribs else {}, + } + params = { + 'headers': process_client.get_authz_headers(user) if user != settings.ANONYM_USER else {} + } + if attribs: + params['new_attribs'] = attribs + pytest_id = f'{method.__name__}__{user.split("_")[-1]}__{publication.name[5:]}__{key}' + method_args = inspect.getfullargspec(method).args + inspect.getfullargspec(method).kwonlyargs + + test_case = base_test.TestCaseType(pytest_id=pytest_id, + publication=publication, + publication_type=process_client.LAYER_TYPE, + rest_method=method, + rest_args={**args, **{ + key: value for key, value in all_args.items() if key in method_args + }}, + params=params, + type=EnumTestTypes.MANDATORY, + ) + + tc_list.append(test_case) + + +def generate_wfst_negative_test_cases(publications_user_can_read, publication_all, test_cases): + tc_list = [] + for user, available_publications in publications_user_can_read.items(): + username_for_id = user.replace('--', "") + for publication in [publication for publication in publication_all if + publication not in available_publications and publication.type == process_client.LAYER_TYPE]: + add_wfst_publication_test_cases_to_list(tc_list, publication, user, username_for_id, test_cases) + return tc_list + + +def generate_positive_wfst_test_cases(publications_user_can_read, test_cases): + tc_list = [] + for user, publications in publications_user_can_read.items(): + username_for_id = user.replace('--', "") + for publication in [publication for publication in publications if publication.type == process_client.LAYER_TYPE]: + add_wfst_publication_test_cases_to_list(tc_list, publication, user, username_for_id, test_cases) + return tc_list + + @pytest.mark.timeout(60) @pytest.mark.usefixtures('ensure_layman_module', 'oauth2_provider_mock') class TestAccessRights: @@ -220,6 +286,8 @@ class TestAccessRights: 'test_single_negative': generate_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS), 'test_multiendpoint': generate_multiendpoint_test_cases(PUBLICATIONS_BY_USER, OWNER), 'test_geoserver_negative': generate_geoserver_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS), + 'test_wfst_positive': generate_positive_wfst_test_cases(PUBLICATIONS_BY_USER, WFST_METHODS_TO_TEST), + 'test_wfst_negative': generate_wfst_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS, WFST_METHODS_TO_TEST), } @pytest.fixture(scope='class', autouse=True) @@ -264,3 +332,27 @@ def test_geoserver_negative(self, rest_method, rest_args, ): with pytest.raises(AssertionError) as exc_info: rest_method(**rest_args) assert exc_info.value.args[0].startswith('Layer not found in Capabilities.') + + def test_wfst_positive(self, publication, rest_method, rest_args, params): + attr_names = params.get('new_attributes', []) + with app.app_context(): + old_db_attributes = db.get_internal_table_all_column_names(publication.workspace, publication.name) + for attr_name in attr_names: + assert attr_name not in old_db_attributes, \ + f"old_db_attributes={old_db_attributes}, attr_name={attr_name}" + rest_method(**rest_args) + process_client.wait_for_publication_status(publication.workspace, + publication.type, + publication.name, + headers=params['headers']) + assert_publ_util.is_publication_valid_and_complete(publication) + with app.app_context(): + new_db_attributes = db.get_internal_table_all_column_names(publication.workspace, publication.name) + for attr_name in attr_names: + assert attr_name in new_db_attributes, \ + f"new_db_attributes={new_db_attributes}, attr_name={attr_name}" + + def test_wfst_negative(self, rest_method, rest_args, ): + with pytest.raises(GS_Error) as exc_info: + rest_method(**rest_args) + assert exc_info.value.data['status_code'] == 400