Skip to content

Commit

Permalink
Test access rights application on WFS-T queries
Browse files Browse the repository at this point in the history
  • Loading branch information
index-git committed Jan 3, 2024
1 parent f9ca94c commit 4350399
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 65 deletions.
57 changes: 0 additions & 57 deletions src/layman/geoserver_proxy_test.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,10 @@
import pytest

from geoserver.error import Error as GS_Error
from layman import app
from layman.layer import db
from test_tools import process_client
from test_tools.data import wfs as data_wfs
from test_tools.process_client import get_authz_headers
from . import geoserver_proxy


@pytest.mark.usefixtures('ensure_layman', 'oauth2_provider_mock')
def test_missing_attribute_authz():
username = 'testmissingattr_authz'
layername1 = 'testmissingattr_authz_layer'
username2 = 'testmissingattr_authz2'

authn_headers1 = get_authz_headers(username)
authn_headers2 = get_authz_headers(username2)

def do_test(wfs_query, attribute_names):
# Test, that unauthorized user will not cause new attribute
with app.app_context():
old_db_attributes = db.get_internal_table_all_column_names(username, layername1)
for attr_name in attribute_names:
assert attr_name not in old_db_attributes, f"old_db_attributes={old_db_attributes}, attr_name={attr_name}"
with pytest.raises(GS_Error) as exc:
process_client.post_wfst(wfs_query, headers=authn_headers2, workspace=username)
assert exc.value.data['status_code'] == 400

with app.app_context():
new_db_attributes = db.get_internal_table_all_column_names(username, layername1)
for attr_name in attribute_names:
assert attr_name not in new_db_attributes, f"new_db_attributes={new_db_attributes}, attr_name={attr_name}"

# Test, that authorized user will cause new attribute
process_client.post_wfst(wfs_query, headers=authn_headers1, workspace=username)
with app.app_context():
new_db_attributes = db.get_internal_table_all_column_names(username, layername1)
for attr_name in attribute_names:
assert attr_name in new_db_attributes, f"new_db_attributes={new_db_attributes}, attr_name={attr_name}"

process_client.reserve_username(username, headers=authn_headers1)
process_client.publish_workspace_layer(username,
layername1,
file_paths=['tmp/naturalearth/110m/cultural/ne_110m_admin_0_countries.geojson', ],
headers=authn_headers1)

# Testing, that user2 is not able to write to layer of user1
process_client.reserve_username(username2, headers=authn_headers2)

# INSERT
attr_names = ['inexisting_attribute_auth1', 'inexisting_attribute_auth2']
data_xml = data_wfs.get_wfs20_insert_points_new_attr(username, layername1, attr_names)
do_test(data_xml, attr_names)

# UPDATE
attr_names = ['inexisting_attribute_auth3', 'inexisting_attribute_auth4']
data_xml = data_wfs.get_wfs20_update_points_new_attr(username, layername1, attr_names)
do_test(data_xml, attr_names)

process_client.delete_workspace_layer(username, layername1, headers=authn_headers1)


@pytest.mark.parametrize('wfst_data_method, hardcoded_attrs', [
pytest.param(
data_wfs.get_wfs20_insert_points_new_attr,
Expand Down
13 changes: 13 additions & 0 deletions test_tools/process_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,19 @@ def post_wfst(xml, *, headers=None, url=None, workspace=None):
raise gs_error.Error(code_or_message='WFS-T error', data={'status_code': response.status_code})


def post_wfst_with_xml_getter(workspace, layer, *, xml_getter, actor_name=None, xml_getter_params=None):
xml_getter_params = xml_getter_params or {}
headers = {}
if actor_name:
assert TOKEN_HEADER not in headers

if actor_name and actor_name != settings.ANONYM_USER:
headers.update(get_authz_headers(actor_name))

xml = xml_getter(workspace, layer, **xml_getter_params)
post_wfst(xml, headers=headers, workspace=workspace)


def check_publication_status(response):
try:
current_status = response.json().get('layman_metadata', {}).get('publication_status')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import inspect
import pytest

from geoserver.error import Error as GS_Error
from layman import settings, LaymanError
from test_tools import process_client, role_service as role_service_util
from test_tools.data import wfs
from tests import Publication, EnumTestTypes
from tests.asserts.final.publication import geoserver_proxy
from tests.dynamic_data import base_test
Expand Down Expand Up @@ -37,6 +39,12 @@
(geoserver_proxy.workspace_wfs_2_0_0_capabilities_available_if_vector, {}),
]

WFST_METHODS_TO_TEST = [
(process_client.post_wfst_with_xml_getter, {'xml_method': wfs.get_wfs20_insert_lines}),
(process_client.post_wfst_with_xml_getter, {'xml_method': wfs.get_wfs20_insert_points_new_attr, 'xml_method_params': {'attr_names': ['inexisting_attribute_auth1', 'inexisting_attribute_auth2']}}),
(process_client.post_wfst_with_xml_getter, {'xml_method': wfs.get_wfs20_update_points_new_attr, 'xml_method_params': {'attr_names': ['inexisting_attribute_auth3', 'inexisting_attribute_auth4']}}),
]


def pytest_generate_tests(metafunc):
# https://docs.pytest.org/en/6.2.x/parametrize.html#pytest-generate-tests
Expand Down Expand Up @@ -72,7 +80,9 @@ def pytest_generate_tests(metafunc):
)


def add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_test):
def add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_test, *, args_to_id_fnc=None):
args_to_id_fnc = args_to_id_fnc or 'keys'
assert args_to_id_fnc in ['keys', 'values_name']
all_args = {
'workspace': publication.workspace,
'name': publication.name,
Expand All @@ -82,7 +92,9 @@ def add_publication_test_cases_to_list(tc_list, publication, user, endpoints_to_
'publ_type': publication.type,
}
for method, args in endpoints_to_test:
pytest_id = f'{method.__name__}__{user.split("_")[-1]}__{publication.name[5:]}{("__" + next(iter(args.keys()))) if args else ""}'

args_pytest_id = ("__" + (next(iter(args.keys())) if args_to_id_fnc == 'keys' else next(iter(args.values())).__name__)) if args else ""
pytest_id = f'{method.__name__}__{user.split("_")[-1]}__{publication.name[5:]}{args_pytest_id}'
method_args = inspect.getfullargspec(method).args + inspect.getfullargspec(method).kwonlyargs

test_case = base_test.TestCaseType(pytest_id=pytest_id,
Expand Down Expand Up @@ -146,21 +158,21 @@ def generate_multiendpoint_test_cases(publications_user_can_read, workspace, ):
return tc_list


def generate_positive_geoserver_test_cases(publications_user_can_read):
def generate_positive_geoserver_test_cases(publications_user_can_read, methods_to_test):
tc_list = []
for user, publications in publications_user_can_read.items():
for publication in publications:
if publication.type == process_client.LAYER_TYPE:
add_publication_test_cases_to_list(tc_list, publication, user, GEOSERVER_METHODS_TO_TEST)
add_publication_test_cases_to_list(tc_list, publication, user, methods_to_test, args_to_id_fnc='values_name')
return tc_list


def generate_geoserver_negative_test_cases(publications_user_can_read, publication_all):
def generate_geoserver_negative_test_cases(publications_user_can_read, publication_all, methods_to_test):
tc_list = []
for user, available_publications in publications_user_can_read.items():
for publication in publication_all:
if publication not in available_publications and publication.type == process_client.LAYER_TYPE:
add_publication_test_cases_to_list(tc_list, publication, user, GEOSERVER_METHODS_TO_TEST)
add_publication_test_cases_to_list(tc_list, publication, user, methods_to_test, args_to_id_fnc='values_name')
return tc_list


Expand Down Expand Up @@ -216,10 +228,11 @@ class TestAccessRights:
}

test_cases = {
'test_single_positive': generate_positive_test_cases(PUBLICATIONS_BY_USER) + generate_positive_geoserver_test_cases(PUBLICATIONS_BY_USER),
'test_single_positive': generate_positive_test_cases(PUBLICATIONS_BY_USER) + generate_positive_geoserver_test_cases(PUBLICATIONS_BY_USER, GEOSERVER_METHODS_TO_TEST) + generate_positive_geoserver_test_cases(PUBLICATIONS_BY_USER, WFST_METHODS_TO_TEST),
'test_single_negative': generate_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS),
'test_multiendpoint': generate_multiendpoint_test_cases(PUBLICATIONS_BY_USER, OWNER),
'test_geoserver_negative': generate_geoserver_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS),
'test_geoserver_negative': generate_geoserver_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS, GEOSERVER_METHODS_TO_TEST),
'test_wfst_negative': generate_geoserver_negative_test_cases(PUBLICATIONS_BY_USER, PUBLICATIONS, WFST_METHODS_TO_TEST),
}

@pytest.fixture(scope='class', autouse=True)
Expand Down Expand Up @@ -264,3 +277,8 @@ def test_geoserver_negative(self, rest_method, rest_args, ):
with pytest.raises(AssertionError) as exc_info:
rest_method(**rest_args)
assert exc_info.value.args[0].startswith('Layer not found in Capabilities.')

def test_wfst_negative(self, rest_method, rest_args, ):
with pytest.raises(GS_Error) as exc_info:
rest_method(**rest_args)
assert exc_info.value.data['status_code'] == 400

0 comments on commit 4350399

Please sign in to comment.