Skip to content

Commit

Permalink
Test access rights application on WFS-T queries
Browse files Browse the repository at this point in the history
  • Loading branch information
index-git committed Jan 3, 2024
1 parent 0de894d commit 1f4a91f
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 61 deletions.
57 changes: 0 additions & 57 deletions src/layman/geoserver_proxy_test.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,10 @@
import pytest

from geoserver.error import Error as GS_Error
from layman import app
from layman.layer import db
from test_tools import process_client
from test_tools.data import wfs as data_wfs
from test_tools.process_client import get_authz_headers
from . import geoserver_proxy


@pytest.mark.usefixtures('ensure_layman', 'oauth2_provider_mock')
def test_missing_attribute_authz():
username = 'testmissingattr_authz'
layername1 = 'testmissingattr_authz_layer'
username2 = 'testmissingattr_authz2'

authn_headers1 = get_authz_headers(username)
authn_headers2 = get_authz_headers(username2)

def do_test(wfs_query, attribute_names):
# Test, that unauthorized user will not cause new attribute
with app.app_context():
old_db_attributes = db.get_internal_table_all_column_names(username, layername1)
for attr_name in attribute_names:
assert attr_name not in old_db_attributes, f"old_db_attributes={old_db_attributes}, attr_name={attr_name}"
with pytest.raises(GS_Error) as exc:
process_client.post_wfst(wfs_query, headers=authn_headers2, workspace=username)
assert exc.value.data['status_code'] == 400

with app.app_context():
new_db_attributes = db.get_internal_table_all_column_names(username, layername1)
for attr_name in attribute_names:
assert attr_name not in new_db_attributes, f"new_db_attributes={new_db_attributes}, attr_name={attr_name}"

# Test, that authorized user will cause new attribute
process_client.post_wfst(wfs_query, headers=authn_headers1, workspace=username)
with app.app_context():
new_db_attributes = db.get_internal_table_all_column_names(username, layername1)
for attr_name in attribute_names:
assert attr_name in new_db_attributes, f"new_db_attributes={new_db_attributes}, attr_name={attr_name}"

process_client.reserve_username(username, headers=authn_headers1)
process_client.publish_workspace_layer(username,
layername1,
file_paths=['tmp/naturalearth/110m/cultural/ne_110m_admin_0_countries.geojson', ],
headers=authn_headers1)

# Testing, that user2 is not able to write to layer of user1
process_client.reserve_username(username2, headers=authn_headers2)

# INSERT
attr_names = ['inexisting_attribute_auth1', 'inexisting_attribute_auth2']
data_xml = data_wfs.get_wfs20_insert_points_new_attr(username, layername1, attr_names)
do_test(data_xml, attr_names)

# UPDATE
attr_names = ['inexisting_attribute_auth3', 'inexisting_attribute_auth4']
data_xml = data_wfs.get_wfs20_update_points_new_attr(username, layername1, attr_names)
do_test(data_xml, attr_names)

process_client.delete_workspace_layer(username, layername1, headers=authn_headers1)


@pytest.mark.parametrize('wfst_data_method, hardcoded_attrs', [
pytest.param(
data_wfs.get_wfs20_insert_points_new_attr,
Expand Down
13 changes: 13 additions & 0 deletions test_tools/process_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,19 @@ def post_wfst(xml, *, headers=None, url=None, workspace=None):
raise gs_error.Error(code_or_message='WFS-T error', data={'status_code': response.status_code})


def post_wfst_with_xml_getter(workspace, layer, *, xml_getter, actor_name=None, xml_getter_params=None):
xml_getter_params = xml_getter_params or {}
headers = {}
if actor_name:
assert TOKEN_HEADER not in headers

if actor_name and actor_name != settings.ANONYM_USER:
headers.update(get_authz_headers(actor_name))

xml = xml_getter(workspace, layer, **xml_getter_params)
post_wfst(xml, headers=headers, workspace=workspace)


def check_publication_status(response):
try:
current_status = response.json().get('layman_metadata', {}).get('publication_status')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
import inspect
import pytest

from layman import settings, LaymanError
from geoserver.error import Error as GS_Error
from layman import app, settings, LaymanError
from layman.layer import db
from test_tools import process_client, role_service as role_service_util
from test_tools.data import wfs
from tests import Publication, EnumTestTypes
from tests.asserts.final.publication import geoserver_proxy
from tests.asserts.final.publication import geoserver_proxy, util as assert_publ_util
from tests.dynamic_data import base_test

ENDPOINTS_TO_TEST = {
Expand Down Expand Up @@ -37,6 +40,12 @@
(geoserver_proxy.workspace_wfs_2_0_0_capabilities_available_if_vector, {}),
]

WFST_METHODS_TO_TEST = {
'insert_lines': {'xml_getter': wfs.get_wfs20_insert_lines},
'insert_points_new_attr': {'xml_getter': wfs.get_wfs20_insert_points_new_attr, 'attr_names': ['inexist_attr1', 'inexist_attr2']},
'update_points_new_attr': {'xml_getter': wfs.get_wfs20_update_points_new_attr, 'attr_names': ['inexist_attr3', 'inexist_attr4']},
}


def pytest_generate_tests(metafunc):
# https://docs.pytest.org/en/6.2.x/parametrize.html#pytest-generate-tests
Expand All @@ -48,15 +57,15 @@ def pytest_generate_tests(metafunc):

test_cases = cls.test_cases[test_fn.__name__]
for test_case in test_cases:
assert not test_case.publication, f"Not yet implemented"
assert not test_case.publication_type, f"Not yet implemented"
assert not test_case.key, f"Not yet implemented"
assert not test_case.specific_params, f"Not yet implemented"
assert not test_case.specific_types, f"Not yet implemented"
assert not test_case.parametrization, f"Not yet implemented"
assert not test_case.post_before_test_args, f"Not yet implemented"
assert test_case.type == EnumTestTypes.MANDATORY, f"Other types then MANDATORY are not implemented yet"
arg_name_to_value = {
'publication': test_case.publication,
'publication_type': test_case.publication_type,
'rest_method': test_case.rest_method,
'rest_args': copy.deepcopy(test_case.rest_args),
'params': copy.deepcopy(test_case.params),
Expand Down Expand Up @@ -164,6 +173,63 @@ def generate_geoserver_negative_test_cases(publications_user_can_read, publicati
return tc_list


def add_wfst_publication_test_cases_to_list(tc_list, publication, user, username_for_id, test_cases):
method = process_client.post_wfst_with_xml_getter
all_args = {
'workspace': publication.workspace,
'name': publication.name,
'layer': publication.name,
'actor_name': user,
'publication_type': publication.type,
'publ_type': publication.type,
}
for key, test_case in test_cases.items():
attribs = [attr + f'_{username_for_id.lower()}' for attr in test_case.get('attr_names', [])]
args = {
'xml_getter': test_case['xml_getter'],
'xml_getter_params': {'attr_names': attribs} if attribs else {},
}
params = {
'headers': process_client.get_authz_headers(user) if user != settings.ANONYM_USER else {}
}
if attribs:
params['new_attribs'] = attribs
pytest_id = f'{method.__name__}__{user.split("_")[-1]}__{publication.name[5:]}__{key}'
method_args = inspect.getfullargspec(method).args + inspect.getfullargspec(method).kwonlyargs

test_case = base_test.TestCaseType(pytest_id=pytest_id,
publication=publication,
publication_type=process_client.LAYER_TYPE,
rest_method=method,
rest_args={**args, **{
key: value for key, value in all_args.items() if key in method_args
}},
params=params,
type=EnumTestTypes.MANDATORY,
)

tc_list.append(test_case)


def generate_wfst_negative_test_cases(publications_user_can_read, publication_all, test_cases):
tc_list = []
for user, available_publications in publications_user_can_read.items():
username_for_id = user.replace('--', "")
for publication in [publication for publication in publication_all if
publication not in available_publications and publication.type == process_client.LAYER_TYPE]:
add_wfst_publication_test_cases_to_list(tc_list, publication, user, username_for_id, test_cases)
return tc_list


def generate_positive_wfst_test_cases(publications_user_can_read, test_cases):
tc_list = []
for user, publications in publications_user_can_read.items():
username_for_id = user.replace('--', "")
for publication in [publication for publication in publications if publication.type == process_client.LAYER_TYPE]:
add_wfst_publication_test_cases_to_list(tc_list, publication, user, username_for_id, test_cases)
return tc_list


@pytest.mark.timeout(60)
@pytest.mark.usefixtures('ensure_layman_module', 'oauth2_provider_mock')
class TestAccessRights:
Expand Down Expand Up @@ -220,6 +286,8 @@ class TestAccessRights:
'test_single_negative': generate_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS),
'test_multiendpoint': generate_multiendpoint_test_cases(PUBLICATIONS_BY_USER, OWNER),
'test_geoserver_negative': generate_geoserver_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS),
'test_wfst_positive': generate_positive_wfst_test_cases(PUBLICATIONS_BY_USER, WFST_METHODS_TO_TEST),
'test_wfst_negative': generate_wfst_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS, WFST_METHODS_TO_TEST),
}

@pytest.fixture(scope='class', autouse=True)
Expand Down Expand Up @@ -264,3 +332,27 @@ def test_geoserver_negative(self, rest_method, rest_args, ):
with pytest.raises(AssertionError) as exc_info:
rest_method(**rest_args)
assert exc_info.value.args[0].startswith('Layer not found in Capabilities.')

def test_wfst_positive(self, publication, rest_method, rest_args, params):
attr_names = params.get('new_attributes', [])
with app.app_context():
old_db_attributes = db.get_internal_table_all_column_names(publication.workspace, publication.name)
for attr_name in attr_names:
assert attr_name not in old_db_attributes, \
f"old_db_attributes={old_db_attributes}, attr_name={attr_name}"
rest_method(**rest_args)
process_client.wait_for_publication_status(publication.workspace,
publication.type,
publication.name,
headers=params['headers'])
assert_publ_util.is_publication_valid_and_complete(publication)
with app.app_context():
new_db_attributes = db.get_internal_table_all_column_names(publication.workspace, publication.name)
for attr_name in attr_names:
assert attr_name in new_db_attributes, \
f"new_db_attributes={new_db_attributes}, attr_name={attr_name}"

def test_wfst_negative(self, rest_method, rest_args, ):
with pytest.raises(GS_Error) as exc_info:
rest_method(**rest_args)
assert exc_info.value.data['status_code'] == 400

0 comments on commit 1f4a91f

Please sign in to comment.