diff --git a/CHANGES.rst b/CHANGES.rst index 78c07570..fd1b299a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -11,12 +11,18 @@ Changes Features / Changes ~~~~~~~~~~~~~~~~~~~~~ +* Add CLI helper ``batch_update_permissions`` that allows registering one or more `Permission` configuration files + against a running `Magpie` instance. * Security fix: bump Docker base ``python:3.11-alpine3.19``. * Update ``authomatic[OpenID]==1.3.0`` to resolve temporary workarounds (relates to `authomatic/authomatic#195 `_ and `authomatic/authomatic#233 `_, fixes `#583 `_). +Bug Fixes +~~~~~~~~~ +* Fix `Permission` update from configuration file using the ``requests`` code path. + .. _changes_4.0.0: `4.0.0 `_ (2024-04-26) diff --git a/docs/conf.py b/docs/conf.py index 25063df0..66d768da 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -211,6 +211,8 @@ def ignore_down_providers(): # ignore false-positive broken links to local doc files used for rendering on GitHub "CHANGES.rst", r"docs/\w+.rst", + "https://wso2.com/", # sporadic broken (probably robots or similar) + "https://docs.wso2.com/*", "https://pcmdi.llnl.gov/", # works, but very often causes false-positive 'broken' links ] + ignore_down_providers() linkcheck_anchors_ignore = [ diff --git a/magpie/cli/batch_update_permissions.py b/magpie/cli/batch_update_permissions.py new file mode 100644 index 00000000..a020b3de --- /dev/null +++ b/magpie/cli/batch_update_permissions.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +""" +Magpie helper to create or delete a set of permissions. + +When parsing permissions to create, any underlying user, group, or intermediate resource +that are missing, but that can be resolved with reasonable defaults or with an explicit +definition in the configuration file, will be dynamically created prior to setting the +corresponding permission on it. All referenced services should exist beforehand. Consider +using the 'register_providers' utility to register services beforehand as needed. + +See https://pavics-magpie.readthedocs.io/en/latest/configuration.html#file-permissions-cfg for more details. +""" +import argparse +from typing import TYPE_CHECKING + +from magpie.cli.utils import make_logging_options, setup_logger_from_options +from magpie.register import magpie_register_permissions_from_config +from magpie.utils import get_logger + +if TYPE_CHECKING: + from typing import Any, Optional, Sequence + +LOGGER = get_logger(__name__, + message_format="%(asctime)s - %(levelname)s - %(message)s", + datetime_format="%d-%b-%y %H:%M:%S", force_stdout=False) + +ERROR_PARAMS = 2 +ERROR_EXEC = 1 + + +def make_parser(): + # type: () -> argparse.ArgumentParser + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("-u", "--url", "--magpie-url", help=( + "URL used to access the magpie service (if omitted, will try using 'MAGPIE_URL' environment variable)." + )) + parser.add_argument("-U", "--username", "--magpie-admin-user", help=( + "Admin username for magpie login (if omitted, will try using 'MAGPIE_ADMIN_USER' environment variable)." + )) + parser.add_argument("-P", "--password", "--magpie-admin-password", help=( + "Admin password for magpie login (if omitted, will try using 'MAGPIE_ADMIN_PASSWORD' environment variable)." + )) + parser.add_argument("-c", "--config", required=True, nargs="+", action="append", help=( + "Path to a single configuration file or a directory containing configuration file that contains permissions. " + "The option can be specified multiple times to provide multiple lookup directories or specific files to load. " + "Configuration files must be in JSON or YAML format, with their respective extensions, or the '.cfg' extension." + )) + make_logging_options(parser) + return parser + + +def main(args=None, parser=None, namespace=None): + # type: (Optional[Sequence[str]], Optional[argparse.ArgumentParser], Optional[argparse.Namespace]) -> Any + if not parser: + parser = make_parser() + ns = parser.parse_args(args=args, namespace=namespace) + setup_logger_from_options(LOGGER, ns) + + all_configs = [cfg for cfg_args in ns.config for cfg in cfg_args] + try: + for config in all_configs: + LOGGER.info("Processing: [%s]", config) + magpie_register_permissions_from_config(config) + except Exception as exc: + LOGGER.error("Failed permissions parsing and update from specified configurations [%s].", str(exc)) + return ERROR_EXEC + return 0 + + +if __name__ == "__main__": + main() diff --git a/magpie/constants.py b/magpie/constants.py index fc183b2c..d05ae7e4 100644 --- a/magpie/constants.py +++ b/magpie/constants.py @@ -206,7 +206,7 @@ def get_constant(constant_name, # type: Str 5. search in environment variables Parameter :paramref:`constant_name` is expected to have the format ``MAGPIE_[VARIABLE_NAME]`` although any value can - be passed to retrieve generic settings from all above mentioned search locations. + be passed to retrieve generic settings from all above-mentioned search locations. If :paramref:`settings_name` is provided as alternative name, it is used as is to search for results if :paramref:`constant_name` was not found. Otherwise, ``magpie.[variable_name]`` is used for additional search when diff --git a/magpie/register.py b/magpie/register.py index 2c56b92a..06dfc86e 100644 --- a/magpie/register.py +++ b/magpie/register.py @@ -5,7 +5,7 @@ import subprocess # nosec import time from tempfile import NamedTemporaryFile -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, overload import requests import six @@ -56,10 +56,12 @@ AnyCookiesType, AnyResolvedSettings, AnyResponseType, + AnySettingsContainer, CombinedConfig, CookiesOrSessionType, GroupsConfig, GroupsSettings, + Literal, MultiConfigs, PermissionConfigItem, PermissionsConfig, @@ -67,7 +69,8 @@ ServicesSettings, Str, UsersConfig, - UsersSettings + UsersSettings, + WebhooksConfig ) @@ -572,6 +575,36 @@ def _load_config(path_or_dict, section, allow_missing=False): CONFIG_KNOWN_EXTENSIONS = frozenset([".cfg", ".json", ".yml", ".yaml"]) +@overload +def get_all_configs(path_or_dict, section, allow_missing=False): + # type: (Union[Str, CombinedConfig], Literal["groups"], bool) -> GroupsConfig + ... + + +@overload +def get_all_configs(path_or_dict, section, allow_missing=False): + # type: (Union[Str, CombinedConfig], Literal["users"], bool) -> UsersConfig + ... + + +@overload +def get_all_configs(path_or_dict, section, allow_missing=False): + # type: (Union[Str, CombinedConfig], Literal["permissions"], bool) -> PermissionsConfig + ... + + +@overload +def get_all_configs(path_or_dict, section, allow_missing=False): + # type: (Union[Str, CombinedConfig], Literal["services"], bool) -> ServicesConfig + ... + + +@overload +def get_all_configs(path_or_dict, section, allow_missing=False): + # type: (Union[Str, CombinedConfig], Literal["webhooks"], bool) -> WebhooksConfig + ... + + def get_all_configs(path_or_dict, section, allow_missing=False): # type: (Union[Str, CombinedConfig], Str, bool) -> MultiConfigs """ @@ -776,10 +809,9 @@ def _parse_resource_path(permission_config_entry, # type: PermissionConfigItem res_path = None if _use_request(cookies_or_session): - res_path = get_magpie_url() + ServiceResourcesAPI.path.format(service_name=svc_name) + res_path = magpie_url + ServiceResourcesAPI.path.format(service_name=svc_name) res_resp = requests.get(res_path, cookies=cookies_or_session, timeout=5) - svc_json = get_json(res_resp)[svc_name] # type: JSON - res_dict = svc_json["resources"] + res_dict = get_json(res_resp)[svc_name] # type: JSON else: from magpie.api.management.service.service_formats import format_service_resources svc = models.Service.by_service_name(svc_name, db_session=cookies_or_session) @@ -860,16 +892,16 @@ def _apply_request(_usr_name=None, _grp_name=None): Apply operation using HTTP request. """ action_oper = None - if usr_name: - action_oper = UserResourcePermissionsAPI.format(user_name=_usr_name, resource_id=resource_id) - if grp_name: - action_oper = GroupResourcePermissionsAPI.format(group_name=_grp_name, resource_id=resource_id) + if _usr_name: + action_oper = UserResourcePermissionsAPI.path.format(user_name=_usr_name, resource_id=resource_id) + if _grp_name: + action_oper = GroupResourcePermissionsAPI.path.format(group_name=_grp_name, resource_id=resource_id) if not action_oper: return None action_func = requests.post if create_perm else requests.delete action_body = {"permission": perm.json()} action_path = "{url}{path}".format(url=magpie_url, path=action_oper) - action_resp = action_func(action_path, json=action_body, cookies=cookies_or_session) + action_resp = action_func(action_path, json=action_body, cookies=cookies_or_session, timeout=5) return action_resp def _apply_session(_usr_name=None, _grp_name=None): @@ -921,10 +953,10 @@ def _apply_profile(_usr_name=None, _grp_name=None): if _use_request(cookies_or_session): if _usr_name: path = "{url}{path}".format(url=magpie_url, path=UsersAPI.path) - return requests.post(path, json=usr_data, timeout=5) + return requests.post(path, json=usr_data, cookies=cookies_or_session, timeout=5) if _grp_name: path = "{url}{path}".format(url=magpie_url, path=GroupsAPI.path) - return requests.post(path, json=grp_data, timeout=5) + return requests.post(path, json=grp_data, cookies=cookies_or_session, timeout=5) else: if _usr_name: from magpie.api.management.user.user_utils import create_user @@ -988,13 +1020,19 @@ def _validate_response(operation, is_create, item_type="Permission"): _validate_response(lambda: _apply_session(usr_name, None), is_create=create_perm) -def magpie_register_permissions_from_config(permissions_config, magpie_url=None, db_session=None, raise_errors=False): - # type: (Union[Str, PermissionsConfig], Optional[Str], Optional[Session], bool) -> None +def magpie_register_permissions_from_config( + permissions_config, # type: Union[Str, PermissionsConfig] + settings=None, # type: Optional[AnySettingsContainer] + db_session=None, # type: Optional[Session] + raise_errors=False, # type: bool +): # type: (...) -> None """ Applies `permissions` specified in configuration(s) defined as file, directory with files or literal configuration. :param permissions_config: file/dir path to `permissions` config or JSON/YAML equivalent pre-loaded. - :param magpie_url: URL to magpie instance (when using requests; default: `magpie.url` from this app's config). + :param settings: Magpie settings to resolve an instance session when using requests instead of DB session. + Will look for ``magpie.url``, ``magpie.admin_user`` and ``magpie.admin_password`` by default, or any + corresponding environment variable resolution if omitted in the settings. :param db_session: db session to use instead of requests to directly create/remove permissions with config. :param raise_errors: raises errors related to permissions, instead of just logging the info. @@ -1003,9 +1041,9 @@ def magpie_register_permissions_from_config(permissions_config, magpie_url=None, """ LOGGER.info("Starting permissions processing.") + magpie_url = None if _use_request(db_session): - magpie_url = magpie_url or get_magpie_url() - settings = {"magpie.url": magpie_url} + magpie_url = get_magpie_url(settings) LOGGER.debug("Editing permissions using requests to [%s]...", magpie_url) err_msg = "Invalid credentials to register Magpie permissions." cookies_or_session = get_admin_cookies(settings, raise_message=err_msg) @@ -1014,19 +1052,36 @@ def magpie_register_permissions_from_config(permissions_config, magpie_url=None, cookies_or_session = db_session LOGGER.debug("Loading configurations.") - permissions = get_all_configs(permissions_config, "permissions") # type: List[PermissionsConfig] + if isinstance(permissions_config, list): + permissions = [permissions_config] + else: + permissions = get_all_configs(permissions_config, "permissions") perms_cfg_count = len(permissions) LOGGER.log(logging.INFO if perms_cfg_count else logging.WARNING, "Found %s permissions configurations.", perms_cfg_count) users_settings = groups_settings = None if perms_cfg_count: - users = get_all_configs(permissions_config, "users", allow_missing=True) # type: List[UsersConfig] - groups = get_all_configs(permissions_config, "groups", allow_missing=True) # type: List[GroupsConfig] + if isinstance(permissions_config, str): + users = get_all_configs(permissions_config, "users", allow_missing=True) + else: + users = [] + if isinstance(permissions_config, str): + groups = get_all_configs(permissions_config, "groups", allow_missing=True) + else: + groups = [] users_settings = _resolve_config_registry(users, "username") or {} groups_settings = _resolve_config_registry(groups, "name") or {} for i, perms in enumerate(permissions): LOGGER.info("Processing permissions from configuration (%s/%s).", i + 1, perms_cfg_count) - _process_permissions(perms, magpie_url, cookies_or_session, users_settings, groups_settings, raise_errors) + _process_permissions( + perms, + magpie_url, + cookies_or_session, + users_settings, + groups_settings, + settings, + raise_errors, + ) LOGGER.info("All permissions processed.") @@ -1055,8 +1110,15 @@ def _resolve_config_registry(config_files, key): return config_map -def _process_permissions(permissions, magpie_url, cookies_or_session, users=None, groups=None, raise_errors=False): - # type: (PermissionsConfig, Str, Session, Optional[UsersSettings], Optional[GroupsSettings], bool) -> None +def _process_permissions( + permissions, # type: PermissionsConfig + magpie_url, # type: Str + cookies_or_session, # type: Session + users=None, # type: Optional[UsersSettings] + groups=None, # type: Optional[GroupsSettings] + settings=None, # type: Optional[AnySettingsContainer] + raise_errors=False, # type: bool +): # type: (...) -> None """ Processes a single `permissions` configuration. """ @@ -1064,7 +1126,7 @@ def _process_permissions(permissions, magpie_url, cookies_or_session, users=None LOGGER.warning("Permissions configuration are empty.") return - anon_user = get_constant("MAGPIE_ANONYMOUS_USER") + anon_user = get_constant("MAGPIE_ANONYMOUS_USER", settings) perm_count = len(permissions) LOGGER.log(logging.INFO if perm_count else logging.WARNING, "Found %s permissions to evaluate from configuration.", perm_count) @@ -1103,7 +1165,8 @@ def _process_permissions(permissions, magpie_url, cookies_or_session, users=None if svc_resp.status_code != 200: _handle_permission("Unknown service [{!s}]".format(svc_name), i, raise_errors=raise_errors) continue - service_info = get_json(svc_resp)[svc_name] + service_json = get_json(svc_resp) + service_info = service_json.get(svc_name) or service_json.get("service") # format depends on magpie version else: transaction.commit() # force any pending transaction to be applied to find possible dependencies svc = models.Service.by_service_name(svc_name, db_session=cookies_or_session) diff --git a/setup.cfg b/setup.cfg index 4143995e..1e548a24 100644 --- a/setup.cfg +++ b/setup.cfg @@ -95,6 +95,8 @@ exclude_lines = LOGGER.error LOGGER.exception LOGGER.log + @overload + ... [tool:pytest] addopts = diff --git a/setup.py b/setup.py index cc5f73ee..b922df83 100644 --- a/setup.py +++ b/setup.py @@ -247,6 +247,7 @@ def _extra_requirements(base_requirements, other_requirements): "console_scripts": [ "magpie_cli = magpie.cli:magpie_helper_cli", # redirect to others below "magpie_helper = magpie.cli:magpie_helper_cli", # alias to helper + "magpie_batch_update_permissions = magpie.cli.batch_update_permissions:main", "magpie_batch_update_users = magpie.cli.batch_update_users:main", "magpie_register_defaults = magpie.cli.register_defaults:main", "magpie_register_providers = magpie.cli.register_providers:main", diff --git a/tests/interfaces.py b/tests/interfaces.py index 04e5b054..e0959709 100644 --- a/tests/interfaces.py +++ b/tests/interfaces.py @@ -2780,7 +2780,7 @@ def test_GetUserResourcePermissions_MultipleGroupPermissions(self): [(effect_perm1_deny, grp1_reason), (effect_perm2_deny, PERMISSION_REASON_DEFAULT)] ) - # apply allow user permission on parent service (on level above, not same resource as previous tests) + # apply 'allow' user permission on parent service (on level above, not same resource as previous tests) # allow user permission takes priority over deny from second group, but only during effective resolution # even if second group deny still exists, the user allow permission takes priority as it is more specific # during check of local inherited permissions (no recursive considered), second group deny remains the result diff --git a/tests/test_magpie_cli.py b/tests/test_magpie_cli.py index 11a313cd..f063c38d 100644 --- a/tests/test_magpie_cli.py +++ b/tests/test_magpie_cli.py @@ -7,26 +7,33 @@ Tests for :mod:`magpie.cli` module. """ - +import contextlib import json import os import subprocess import tempfile +from typing import TYPE_CHECKING +from urllib.parse import urlparse import mock import six -from magpie.cli import batch_update_users, magpie_helper_cli +from magpie.cli import batch_update_permissions, batch_update_users, magpie_helper_cli from magpie.constants import get_constant +from magpie.permissions import Permission, PermissionType +from magpie.services import ServiceAPI, ServiceWPS from tests import runner, utils if six.PY2: from backports import tempfile as tempfile2 # noqa # pylint: disable=E0611,no-name-in-module # Python 2 else: tempfile2 = tempfile # pylint: disable=C0103,invalid-name +if TYPE_CHECKING: + from magpie.typedefs import JSON KNOWN_HELPERS = [ "batch_update_users", + "batch_update_permissions", "register_defaults", "register_providers", "run_db_migration", @@ -82,6 +89,138 @@ def test_magpie_helper_as_python(): raise AssertionError("expected exit code on help call not raised") +@runner.MAGPIE_TEST_CLI +@runner.MAGPIE_TEST_LOCAL +def test_magpie_batch_update_permissions_help_via_magpie_helper(): + out_lines = run_and_get_output("magpie_helper batch_update_permissions --help") + assert "usage: magpie_helper batch_update_permissions" in out_lines[0] + assert "Magpie helper to create or delete a set of permissions." in out_lines[1] + + +@runner.MAGPIE_TEST_CLI +@runner.MAGPIE_TEST_LOCAL +def test_magpie_batch_update_permissions_help_directly(): + out_lines = run_and_get_output("magpie_batch_update_permissions --help") + assert "usage: magpie_batch_update_permissions" in out_lines[0] + assert "Magpie helper to create or delete a set of permissions." in out_lines[1] + + +@runner.MAGPIE_TEST_CLI +@runner.MAGPIE_TEST_LOCAL +def test_magpie_batch_update_permissions_command(): + test_admin_usr = get_constant("MAGPIE_TEST_ADMIN_USERNAME", raise_not_set=False, raise_missing=False) + test_admin_pwd = get_constant("MAGPIE_TEST_ADMIN_PASSWORD", raise_not_set=False, raise_missing=False) + if not test_admin_usr or not test_admin_pwd: + test_admin_usr = get_constant("MAGPIE_ADMIN_USER") + test_admin_pwd = get_constant("MAGPIE_ADMIN_PASSWORD") + + test_grp = "unittest-batch-update-perms-cmd-group" + test_usr = "unittest-batch-update-perms-cmd-user" + test_api = "unittest-batch-update-perms-cmd-api" + test_wps = "unittest-batch-update-perms-cmd-wps" + test_url = "http://localhost" + test_app = utils.get_test_magpie_app() + test_args = [ + "-u", test_url, + "-U", test_admin_usr, + "-P", test_admin_pwd, + ] + + def mock_request(method, url, *args, **kwargs): + _path = urlparse(url).path + # because CLI utility does multiple login tests, we must force TestApp logout to forget session + # otherwise, error is raised because of user session mismatch between previous login and new one requested + if _path.startswith("/signin"): + utils.check_or_try_logout_user(test_app) + return utils.test_request(test_app, method, _path, *args, **kwargs) + + _, test_admin_cookies = utils.check_or_try_login_user(test_app, username=test_admin_usr, password=test_admin_pwd) + + # cleanup in case of previous run + utils.TestSetup.delete_TestUser(test_app, override_user_name=test_usr, override_cookies=test_admin_cookies) + utils.TestSetup.delete_TestGroup(test_app, override_group_name=test_grp, override_cookies=test_admin_cookies) + utils.TestSetup.delete_TestService(test_app, override_service_name=test_api, override_cookies=test_admin_cookies) + utils.TestSetup.delete_TestService(test_app, override_service_name=test_wps, override_cookies=test_admin_cookies) + + utils.TestSetup.create_TestService(test_app, + override_service_name=test_api, + override_service_type=ServiceAPI.service_type) + utils.TestSetup.create_TestService(test_app, + override_service_name=test_wps, + override_service_type=ServiceWPS.service_type) + + def run_command(__operation_name, operation_args): + with mock.patch("requests.Session.request", side_effect=mock_request): + with mock.patch("requests.request", side_effect=mock_request): + err_code = batch_update_permissions.main(operation_args) + assert err_code == 0, "failed execution due to invalid arguments" + + with contextlib.ExitStack() as stack: + tmp_file = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".json")) + perms1 = { + "permissions": [ + { + "group": test_grp, + "service": test_api, + "resource": "/v1", + "permission": Permission.READ.value, + "action": "create", + } + ] + } + tmp_file.write(json.dumps(perms1)) + tmp_file.flush() + tmp_file.seek(0) + test_args += ["-c", tmp_file.name] + + tmp_file = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".json")) + perms2 = { + "permissions": [ + { + "user": test_usr, + "service": test_wps, + "permission": Permission.GET_CAPABILITIES.value, + "action": "create", + } + ] + } + tmp_file.write(json.dumps(perms2)) + tmp_file.flush() + tmp_file.seek(0) + test_args += ["-c", tmp_file.name] + + test_args += ["--verbose"] + run_command("batch_update_permissions", test_args) + + path = f"/groups/{test_grp}/resources" + resp = utils.test_request(test_app, "GET", path) + body = utils.check_response_basic_info(resp) + res_api = body["resources"][ServiceAPI.service_type][test_api] # type: JSON + res_wps = body["resources"][ServiceWPS.service_type][test_wps] # type: JSON + perms_res_api = res_api["permissions"] + perms_res_wps = res_wps["permissions"] + utils.check_val_equal(len(perms_res_wps), 0) + utils.check_val_equal(len(perms_res_api), 0, msg="Permission should not be created directly on the service.") + res_res_api = res_api["resources"] + utils.check_val_equal(len(res_res_api), 1, msg="Resource should have been created dynamically.") + perms_sub_api = res_res_api[list(res_res_api)[0]]["permissions"] # type: JSON + utils.check_val_equal(len(perms_sub_api), 1) + utils.check_val_equal(perms_sub_api[0]["name"], Permission.READ.value) + utils.check_val_equal(perms_sub_api[0]["type"], PermissionType.INHERITED.value) + + path = f"/users/{test_usr}/resources" + resp = utils.test_request(test_app, "GET", path) + body = utils.check_response_basic_info(resp) + res_api = body["resources"][ServiceAPI.service_type][test_api] + res_wps = body["resources"][ServiceWPS.service_type][test_wps] + perms_res_api = res_api["permissions"] + perms_res_wps = res_wps["permissions"] + utils.check_val_equal(len(perms_res_api), 0) + utils.check_val_equal(len(perms_res_wps), 1) + utils.check_val_equal(perms_res_wps[0]["name"], Permission.GET_CAPABILITIES.value) + utils.check_val_equal(perms_res_wps[0]["type"], PermissionType.DIRECT.value) + + @runner.MAGPIE_TEST_CLI @runner.MAGPIE_TEST_LOCAL def test_magpie_batch_update_users_help_via_magpie_helper(): diff --git a/tests/test_register.py b/tests/test_register.py index 6036336f..6f80540c 100644 --- a/tests/test_register.py +++ b/tests/test_register.py @@ -524,18 +524,21 @@ def test_register_process_permissions_from_multiple_files(): json.dump(cfg2, cfg2_file) with utils.wrapped_call("magpie.register._process_permissions") as mock_process_perms: with utils.wrapped_call("magpie.register.get_admin_cookies", side_effect=lambda *_, **__: {}): - register.magpie_register_permissions_from_config(tmpdir, "http://dontcare.com") + register.magpie_register_permissions_from_config( + tmpdir, + settings={"magpie.url": "https://dontcare.com"}, + ) assert mock_process_perms.call_count == 2 expect_users = {"usr1": cfg1["users"][0], "usr2": cfg1["users"][1], "usr3": cfg2["users"][0]} expect_groups = {"grp1": cfg1["groups"][0], "grp2": cfg1["groups"][1]} - perms, _, _, users, groups, _ = mock_process_perms.call_args_list[0].args + perms, _, _, users, groups, _, _ = mock_process_perms.call_args_list[0].args assert perms == cfg1["permissions"] assert users == expect_users assert groups == expect_groups - perms, _, _, users, groups, _ = mock_process_perms.call_args_list[1].args + perms, _, _, users, groups, _, _ = mock_process_perms.call_args_list[1].args assert perms == cfg2["permissions"] assert users == expect_users assert groups == expect_groups diff --git a/tests/utils.py b/tests/utils.py index e8cd192d..568dc6d0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -193,7 +193,7 @@ def test_func(): @functools.wraps(run_option) def wrap(test_func, *_, **__): # type: (Callable, *Any, **Any) -> Callable - pytest_marker = getattr(pytest.mark, run_option.marker) + pytest_marker = getattr(pytest.mark, run_option.marker) # type: ignore unittest_skip = unittest.skipUnless(*run_option()) test_func = pytest_marker(test_func) test_func = unittest_skip(test_func) @@ -583,7 +583,7 @@ def get_headers(app_or_url, header_dict): Obtains stored headers in the class implementation. """ if isinstance(app_or_url, TestApp): - return header_dict.items() + return dict(header_dict.items()) return header_dict @@ -900,7 +900,7 @@ class MockUser(object): except Exception as exc: raise AssertionError("Expected 'TestFakeConnectError' from mocked 'send_email' during connection, " "but other exception was raised: {!r}".format(exc)) - raise AssertionError("Expected 'send_email' mock but it was not captured as intended.") + raise AssertionError("Expected 'send_email' mock, but it was not captured as intended.") # Run the test - full user registration procedure! with wrapped_call("magpie.api.notifications.get_smtp_server_connection", side_effect=fake_connect) as wrapped_conn: @@ -1111,7 +1111,7 @@ def test_request(test_item, # type: AnyMagpieTestItemType kwargs["content_type"] = CONTENT_TYPE_JSON # enforce if only 'json' keyword provided # always recalculate in case it was fixed with override JSON payload, or simply incorrect value provided kwargs["headers"]["Content-Length"] = str(len(kwargs["params"])) - if status and status >= 300: + if isinstance(status, int) and status >= 300: kwargs["expect_errors"] = True # cleanup unknown parameters @@ -1194,7 +1194,7 @@ def get_session_user(app_or_url, headers=None): else: resp = requests.get("{}/session".format(app_or_url), headers=headers) if resp.status_code != 200: - raise Exception("cannot retrieve logged in user information") + raise Exception("cannot retrieve logged-in user information") return resp @@ -1299,7 +1299,7 @@ def _is_logged_out(): raise Exception("logout did not succeed" + msg) -def create_or_assign_user_group_with_terms(test_case, # type: AnyMagpieTestCaseType +def create_or_assign_user_group_with_terms(test_case, # type: AnyMagpieTestItemType path, # type: Str data, # type: Union[JSON, Str] headers, # type: HeadersType @@ -1898,7 +1898,7 @@ class TestSetup(object): @staticmethod def get_Version(test_case, real_version=False, override_headers=null, override_cookies=null): - # type: (AnyMagpieTestCaseType, bool, Optional[HeadersType], Optional[CookiesType]) -> Str + # type: (AnyMagpieTestItemType, bool, Optional[HeadersType], Optional[CookiesType]) -> Str """ Obtains the `Magpie` version of the test instance (local or remote). This version can then be used in combination with :class:`TestVersion` comparisons or :func:`warn_version` to toggle test execution of certain @@ -1936,7 +1936,7 @@ def get_Version(test_case, real_version=False, override_headers=null, override_c return json_body["version"] @staticmethod - def check_UpStatus(test_case, # type: TestAppOrUrlType + def check_UpStatus(test_case, # type: AnyMagpieTestItemType method, # type: Str path, # type: Str override_headers=null, # type: Optional[HeadersType] @@ -1979,7 +1979,7 @@ def check_UpStatus(test_case, # type: TestAppOrUrlType return resp @staticmethod - def check_FormSubmit(test_case, # type: AnyMagpieTestCaseType + def check_FormSubmit(test_case, # type: AnyMagpieTestItemType form_match, # type: Union[Str, int, Dict[Str, Str], Form] form_data=None, # type: Optional[FormSearch] form_submit="submit", # type: Union[Str, int] @@ -2072,16 +2072,17 @@ def check_FormSubmit(test_case, # type: AnyMagpieTestCas @staticmethod def check_Unauthorized(test_case, method, path, expected_type=CONTENT_TYPE_JSON, override_cookies=null): - # type: (AnyMagpieTestCaseType, Str, Str, Str, Optional[CookiesType]) -> Union[JSON, Str] + # type: (AnyMagpieTestItemType, Str, Str, Str, Optional[CookiesType]) -> Union[JSON, Str] """ Verifies that Magpie returned an Unauthorized response. Validates that at the bare minimum, no underlying internal error occurred from the API or UI calls. """ app_or_url = get_app_or_url(test_case) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) resp = test_request(app_or_url, method, path, headers={"Accept": expected_type}, - cookies=override_cookies if override_cookies is not null else test_case.cookies, + cookies=cookies, expect_errors=True) if path.startswith("/ui"): check_ui_response_basic_info(resp, expected_code=401, expected_type=expected_type) @@ -2091,7 +2092,7 @@ def check_Unauthorized(test_case, method, path, expected_type=CONTENT_TYPE_JSON, return check_response_basic_info(resp, expected_code=401, expected_type=expected_type, expected_method=method) @staticmethod - def check_ResourceStructure(test_case, # type: AnyMagpieTestCaseType + def check_ResourceStructure(test_case, # type: AnyMagpieTestItemType body, # type: JSON resource_name, # type: Str resource_type, # type: Str @@ -2129,7 +2130,7 @@ def check_ResourceStructure(test_case, # type: AnyMagpieTes check_val_type(body["resource_id"], int) @staticmethod - def check_ResourceChildren(test_case, # type: AnyMagpieTestCaseType + def check_ResourceChildren(test_case, # type: AnyMagpieTestItemType resource_children, # type: JSON parent_resource_id, # type: int root_service_id, # type: int @@ -2147,7 +2148,7 @@ def check_ResourceChildren(test_case, # type: AnyMagpieTestCaseType check_val_type(resource_children, dict) for resource_id in resource_children: check_val_type(resource_id, six.string_types) - resource_int_id = int(resource_id) # should by an 'int' string, no error raised + resource_int_id = int(resource_id) # should be an 'int' string, no error raised resource_info = resource_children[resource_id] # type: JSON check_val_is_in("root_service_id", resource_info) check_val_type(resource_info["root_service_id"], int) @@ -2168,7 +2169,7 @@ def check_ResourceChildren(test_case, # type: AnyMagpieTestCaseType TestSetup.check_ResourceChildren(test_case, resource_info["children"], resource_int_id, root_service_id) @staticmethod - def check_ServiceFormat(test_case, # type: AnyMagpieTestCaseType + def check_ServiceFormat(test_case, # type: AnyMagpieTestItemType service, # type: JSON override_permissions=null, # type: Optional[Iterable[Str]] skip_permissions=False, # type: bool @@ -2239,7 +2240,7 @@ def check_ServiceFormat(test_case, # type: AnyMagpieTestCas check_val_not_in("children", service) @staticmethod - def get_AnyServiceOfTestServiceType(test_case, # type: AnyMagpieTestCaseType + def get_AnyServiceOfTestServiceType(test_case, # type: AnyMagpieTestItemType override_service_type=null, # type: Optional[Str] override_headers=null, # type: Optional[HeadersType] override_cookies=null, # type: Optional[CookiesType] @@ -2264,7 +2265,7 @@ def get_AnyServiceOfTestServiceType(test_case, # type: AnyM return list(services_dict.values())[0] @staticmethod - def create_TestServiceResource(test_case, # type: AnyMagpieTestCaseType + def create_TestServiceResource(test_case, # type: AnyMagpieTestItemType override_service_name=null, # type: Optional[Str] override_service_type=null, # type: Optional[Str] override_resource_name=null, # type: Optional[Str] @@ -2288,26 +2289,26 @@ def create_TestServiceResource(test_case, # type: AnyMagpi app_or_url = get_app_or_url(test_case) svc_name = override_service_name if override_service_name is not null else test_case.test_service_name svc_type = override_service_type if override_service_type is not null else test_case.test_service_type + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) svc_json = TestSetup.create_TestService(test_case, override_service_name=svc_name, override_service_type=svc_type, - override_headers=override_headers, override_cookies=override_cookies) + override_headers=headers, override_cookies=cookies) path = "/services/{svc}/resources".format(svc=svc_name) data = override_data if override_data is not null else { "resource_name": override_resource_name or test_case.test_resource_name, "resource_type": override_resource_type or test_case.test_resource_type, } - resp = test_request(app_or_url, "POST", path, json=data, expect_errors=True, - headers=override_headers if override_headers is not null else test_case.json_headers, - cookies=override_cookies if override_cookies is not null else test_case.cookies) + resp = test_request(app_or_url, "POST", path, json=data, expect_errors=True, headers=headers, cookies=cookies) if ignore_conflict and resp.status_code == 409: svc_info = TestSetup.get_ExistingTestServiceInfo(test_case, override_service_info=svc_json, - override_headers=override_headers, - override_cookies=override_cookies) + override_headers=headers, + override_cookies=cookies) svc_res = TestSetup.get_TestServiceDirectResources(test_case, override_service_name=svc_info["service_name"], - override_headers=override_headers, - override_cookies=override_cookies) + override_headers=headers, + override_cookies=cookies) res_found = [ res for res in svc_res if res["resource_name"] == data["resource_name"] and res["resource_type"] == data["resource_type"] @@ -2317,7 +2318,7 @@ def create_TestServiceResource(test_case, # type: AnyMagpi return check_response_basic_info(resp, 201, expected_method="POST") @staticmethod - def create_TestServiceResourceTree(test_case, # type: AnyMagpieTestCaseType + def create_TestServiceResourceTree(test_case, # type: AnyMagpieTestItemType resource_depth=null, # type: Optional[int] override_service_name=null, # type: Optional[Str] override_service_type=null, # type: Optional[Str] @@ -2378,7 +2379,7 @@ def create_TestServiceResourceTree(test_case, # type: AnyM return all_ids @staticmethod - def create_TestResource(test_case, # type: AnyMagpieTestCaseType + def create_TestResource(test_case, # type: AnyMagpieTestItemType parent_resource_id, # type: int override_resource_name=null, # type: Optional[Str] override_resource_type=null, # type: Optional[Str] @@ -2394,11 +2395,11 @@ def create_TestResource(test_case, # type: AnyMagpieTestCas :raises AssertionError: if the response correspond to failure to create the test resource. """ + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) app_or_url = get_app_or_url(test_case) path = "/resources/{}".format(parent_resource_id) - resp = test_request(app_or_url, "GET", path, - headers=override_headers if override_headers is not null else test_case.json_headers, - cookies=override_cookies if override_cookies is not null else test_case.cookies) + resp = test_request(app_or_url, "GET", path, headers=headers, cookies=cookies) check_response_basic_info(resp) data = override_data if override_data is not null else { "resource_name": override_resource_name or test_case.test_resource_name, @@ -2406,19 +2407,15 @@ def create_TestResource(test_case, # type: AnyMagpieTestCas "parent_id": parent_resource_id, } # creation response provides only 'basic' info, fetch detailed ones with additional get - resp = test_request(app_or_url, "POST", "/resources", json=data, - headers=override_headers if override_headers is not null else test_case.json_headers, - cookies=override_cookies if override_cookies is not null else test_case.cookies) + resp = test_request(app_or_url, "POST", "/resources", json=data, headers=headers, cookies=cookies) body = check_response_basic_info(resp, 201, expected_method="POST") info = TestSetup.get_ResourceInfo(test_case, override_body=body) path = "/resources/{}".format(info["resource_id"]) - resp = test_request(app_or_url, "GET", path, - headers=override_headers if override_headers is not null else test_case.json_headers, - cookies=override_cookies if override_cookies is not null else test_case.cookies) + resp = test_request(app_or_url, "GET", path, headers=headers, cookies=cookies) return check_response_basic_info(resp) @staticmethod - def update_TestAnyResourcePermission(test_case, # type: AnyMagpieTestCaseType + def update_TestAnyResourcePermission(test_case, # type: AnyMagpieTestItemType item_type, # type: Str method, # type: Str # POST|PUT|DELETE override_item_name=null, # type: Optional[Str] @@ -2459,9 +2456,9 @@ def update_TestAnyResourcePermission(test_case, # type: raise ValueError("invalid item-type: [{}]".format(item_type)) data = {"permission": PermissionSet(override_permission).json()} path = "{}/resources/{}/permissions".format(item_path, res_id) - resp = test_request(test_case, method, path, data=data, - headers=override_headers if override_headers is not null else test_case.json_headers, - cookies=override_cookies if override_cookies is not null else test_case.cookies) + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) + resp = test_request(test_case, method, path, data=data, headers=headers, cookies=cookies) code = 200 if method == "PUT": code = [200, 201] @@ -2472,7 +2469,7 @@ def update_TestAnyResourcePermission(test_case, # type: return check_response_basic_info(resp, code, expected_method=method) @staticmethod - def create_TestUserResourcePermission(test_case, # type: AnyMagpieTestCaseType + def create_TestUserResourcePermission(test_case, # type: AnyMagpieTestItemType resource_info=null, # type: Optional[JSON] override_resource_id=null, # type: Optional[int] override_permission=null, # type: Optional[AnyPermissionType] @@ -2499,7 +2496,7 @@ def create_TestUserResourcePermission(test_case, # type: ) @staticmethod - def create_TestGroupResourcePermission(test_case, # type: AnyMagpieTestCaseType + def create_TestGroupResourcePermission(test_case, # type: AnyMagpieTestItemType resource_info=null, # type: Optional[JSON] override_resource_id=null, # type: Optional[int] override_permission=null, # type: Optional[AnyPermissionType] @@ -2526,7 +2523,7 @@ def create_TestGroupResourcePermission(test_case, # type: ) @staticmethod - def get_PermissionNames(test_case, # type: AnyMagpieTestCaseType + def get_PermissionNames(test_case, # type: AnyMagpieTestItemType permissions, # type: Union[AnyPermissionType, Collection[AnyPermissionType]] combinations=False, # type: bool ): # type: (...) -> List[Str] @@ -2554,7 +2551,7 @@ def get_PermissionNames(test_case, # type: AnyMagpieTestCaseType return list(perm_names) @staticmethod - def get_ResourceInfo(test_case, # type: AnyMagpieTestCaseType + def get_ResourceInfo(test_case, # type: AnyMagpieTestItemType override_body=None, # type: Optional[JSON] full_detail=False, # type: bool resource_id=None, # type: Optional[int] @@ -2572,6 +2569,8 @@ def get_ResourceInfo(test_case, # type: AnyMagpieTestCaseType resource creation. This is essentially the same as requesting the details directly with :paramref:`resource_id`. """ body = override_body + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) if override_body: if TestVersion(test_case.version) >= TestVersion("0.6.3"): # skip if sub-element was already extracted and provided as input override_body @@ -2580,15 +2579,14 @@ def get_ResourceInfo(test_case, # type: AnyMagpieTestCaseType check_val_type(body, dict) resource_id = body["resource_id"] if resource_id and full_detail: - resp = test_request(test_case, "GET", "/resources/{}".format(resource_id), - headers=override_headers if override_headers is not null else test_case.json_headers, - cookies=override_cookies if override_cookies is not null else test_case.cookies) + path = "/resources/{}".format(resource_id) + resp = test_request(test_case, "GET", path, headers=headers, cookies=cookies) body = check_response_basic_info(resp) body = TestSetup.get_ResourceInfo(test_case, override_body=body, resource_id=None, full_detail=False) return body @staticmethod - def get_ExistingTestServiceInfo(test_case, # type: AnyMagpieTestCaseType + def get_ExistingTestServiceInfo(test_case, # type: AnyMagpieTestItemType override_service_name=null, # type: Optional[Str] override_service_info=null, # type: Optional[JSON] override_headers=null, # type: Optional[HeadersType] @@ -2600,14 +2598,14 @@ def get_ExistingTestServiceInfo(test_case, # type: AnyMagpi :raises AssertionError: if the response correspond to missing service or failure to retrieve it. """ svc_name = override_service_name if override_service_name is not null else test_case.test_service_name + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) if override_service_info: json_body = override_service_info else: app_or_url = get_app_or_url(test_case) path = "/services/{svc}".format(svc=svc_name) - resp = test_request(app_or_url, "GET", path, - headers=override_headers if override_headers is not null else test_case.json_headers, - cookies=override_cookies if override_cookies is not null else test_case.cookies) + resp = test_request(app_or_url, "GET", path, headers=headers, cookies=cookies) json_body = get_json_body(resp) svc_getter = "service" if TestVersion(test_case.version) < TestVersion("0.9.1"): @@ -2615,7 +2613,7 @@ def get_ExistingTestServiceInfo(test_case, # type: AnyMagpi return json_body[svc_getter] @staticmethod - def get_TestServiceDirectResources(test_case, # type: AnyMagpieTestCaseType + def get_TestServiceDirectResources(test_case, # type: AnyMagpieTestItemType ignore_missing_service=False, # type: bool override_service_name=null, # type: Optional[Str] override_headers=null, # type: Optional[HeadersType] @@ -2628,10 +2626,11 @@ def get_TestServiceDirectResources(test_case, # type: AnyM """ app_or_url = get_app_or_url(test_case) svc_name = override_service_name if override_service_name is not null else test_case.test_service_name + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) path = "/services/{svc}/resources".format(svc=svc_name) resp = test_request(app_or_url, "GET", path, - headers=override_headers if override_headers is not null else test_case.json_headers, - cookies=override_cookies if override_cookies is not null else test_case.cookies, + headers=headers, cookies=cookies, expect_errors=ignore_missing_service) if ignore_missing_service and resp.status_code == 404: return [] @@ -2641,7 +2640,7 @@ def get_TestServiceDirectResources(test_case, # type: AnyM @staticmethod def check_NonExistingTestServiceResource(test_case, override_service_name=null, override_resource_name=null): - # type: (AnyMagpieTestCaseType, Optional[Str], Optional[Str]) -> None + # type: (AnyMagpieTestItemType, Optional[Str], Optional[Str]) -> None """ Validates that test resource nested *immediately* under test service does not exist. @@ -2656,7 +2655,7 @@ def check_NonExistingTestServiceResource(test_case, override_service_name=null, check_val_not_in(override_resource_name or test_case.test_resource_name, resources_names) @staticmethod - def delete_TestServiceResource(test_case, # type: AnyMagpieTestCaseType + def delete_TestServiceResource(test_case, # type: AnyMagpieTestItemType override_service_name=null, # type: Optional[Str] override_resource_name=null, # type: Optional[Str] override_headers=null, # type: Optional[HeadersType] @@ -2667,25 +2666,25 @@ def delete_TestServiceResource(test_case, # type: AnyMagpi If the resource does not exist, skip. Otherwise, delete it and validate that it was indeed removed. - :raises AssertionError: if any response does not correspond to non existing service's resource after execution. + :raises AssertionError: if any response does not correspond to non-existing service's resource after execution. """ app_or_url = get_app_or_url(test_case) resource_name = override_resource_name if override_resource_name is not null else test_case.test_resource_name resources = TestSetup.get_TestServiceDirectResources(test_case, ignore_missing_service=True) test_resource = list(filter(lambda r: r["resource_name"] == resource_name, resources)) + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) # delete as required, skip if non-existing if len(test_resource) > 0: resource_id = test_resource[0]["resource_id"] svc_name = override_service_name if override_service_name is not null else test_case.test_service_name path = "/services/{svc}/resources/{res_id}".format(svc=svc_name, res_id=resource_id) - resp = test_request(app_or_url, "DELETE", path, - headers=override_headers if override_headers is not null else test_case.json_headers, - cookies=override_cookies if override_cookies is not null else test_case.cookies) + resp = test_request(app_or_url, "DELETE", path, headers=headers, cookies=cookies) check_val_equal(resp.status_code, 200) TestSetup.check_NonExistingTestServiceResource(test_case) @staticmethod - def create_TestService(test_case, # type: AnyMagpieTestCaseType + def create_TestService(test_case, # type: AnyMagpieTestItemType override_service_name=null, # type: Optional[Str] override_service_type=null, # type: Optional[Str] override_headers=null, # type: Optional[HeadersType] @@ -2702,32 +2701,32 @@ def create_TestService(test_case, # type: AnyMagpieTestCaseTyp app_or_url = get_app_or_url(test_case) svc_name = override_service_name if override_service_name is not null else test_case.test_service_name svc_type = override_service_type if override_service_type is not null else test_case.test_service_type - override_headers = override_headers if override_headers is not null else test_case.json_headers - override_cookies = override_cookies if override_cookies is not null else test_case.cookies + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) data = { "service_name": svc_name, "service_type": svc_type, "service_url": "http://localhost:9000/{}".format(svc_name) } - if svc_name: + if svc_name and hasattr(test_case, "extra_service_names"): test_case.extra_service_names.add(svc_name) # indicate potential removal at a later point resp = test_request(app_or_url, "POST", "/services", json=data, expect_errors=True, - headers=override_headers, cookies=override_cookies) + headers=headers, cookies=cookies) if resp.status_code == 409: if override_exist is not null and override_exist: TestSetup.delete_TestService(test_case, - override_service_name=override_service_name, - override_headers=override_headers, - override_cookies=override_cookies) + override_service_name=svc_name, + override_headers=headers, + override_cookies=cookies) return TestSetup.create_TestService(test_case, - override_service_name=override_service_name, - override_service_type=override_service_type, - override_headers=override_headers, - override_cookies=override_cookies, + override_service_name=svc_name, + override_service_type=svc_type, + override_headers=headers, + override_cookies=cookies, override_exist=False) if override_exist is null: path = "/services/{svc}".format(svc=svc_name) - resp = test_request(app_or_url, "GET", path, headers=override_headers, cookies=override_cookies) + resp = test_request(app_or_url, "GET", path, headers=headers, cookies=cookies) body = check_response_basic_info(resp, 200, expected_method="GET") if TestVersion(test_case.version) < TestVersion("0.9.1"): body.update({"service": body[svc_name]}) @@ -2737,7 +2736,7 @@ def create_TestService(test_case, # type: AnyMagpieTestCaseTyp @staticmethod def check_NonExistingTestService(test_case, override_service_name=null): - # type: (AnyMagpieTestCaseType, Optional[Str]) -> None + # type: (AnyMagpieTestItemType, Optional[Str]) -> None """ Validates that the test service does not exist. @@ -2750,7 +2749,7 @@ def check_NonExistingTestService(test_case, override_service_name=null): @staticmethod def delete_TestService(test_case, override_service_name=null, override_headers=null, override_cookies=null): - # type: (AnyMagpieTestCaseType, Optional[Str], Optional[HeadersType], Optional[CookiesType]) -> None + # type: (AnyMagpieTestItemType, Optional[Str], Optional[HeadersType], Optional[CookiesType]) -> None """ Deletes the test service. @@ -2759,32 +2758,36 @@ def delete_TestService(test_case, override_service_name=null, override_headers=n :raises AssertionError: if the response does not correspond to successful validation or removal of the service. """ app_or_url = get_app_or_url(test_case) - service_name = override_service_name if override_service_name is not null else test_case.test_service_name + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) + service_name = ( + override_service_name + if override_service_name is not null else + getattr(test_case, "test_service_name", None) + ) services_info = TestSetup.get_RegisteredServicesList(test_case, - override_headers=override_headers, - override_cookies=override_cookies) + override_headers=headers, + override_cookies=cookies) test_service = list(filter(lambda r: r["service_name"] == service_name, services_info)) # delete as required, skip if non-existing if len(test_service) > 0: path = "/services/{svc_name}".format(svc_name=service_name) - resp = test_request(app_or_url, "DELETE", path, - headers=override_headers if override_headers is not null else test_case.json_headers, - cookies=override_cookies if override_cookies is not null else test_case.cookies) + resp = test_request(app_or_url, "DELETE", path, headers=headers, cookies=cookies) check_val_equal(resp.status_code, 200) TestSetup.check_NonExistingTestService(test_case, override_service_name=service_name) @staticmethod def get_RegisteredServicesList(test_case, override_headers=null, override_cookies=null): - # type: (AnyMagpieTestCaseType, Optional[HeadersType], Optional[CookiesType]) -> List[JSON] + # type: (AnyMagpieTestItemType, Optional[HeadersType], Optional[CookiesType]) -> List[JSON] """ Obtains the list of registered services names. :raises AssertionError: if the response does not correspond to successful retrieval of user names. """ app_or_url = get_app_or_url(test_case) - resp = test_request(app_or_url, "GET", "/services", - headers=override_headers if override_headers is not null else test_case.json_headers, - cookies=override_cookies if override_cookies is not null else test_case.cookies) + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) + resp = test_request(app_or_url, "GET", "/services", headers=headers, cookies=cookies) json_body = check_response_basic_info(resp, 200, expected_method="GET") # type: JSON # prepare a flat list of registered services @@ -2796,13 +2799,13 @@ def get_RegisteredServicesList(test_case, override_headers=null, override_cookie @staticmethod def delete_TestResource(test_case, resource_id, override_headers=null, override_cookies=null): - # type: (AnyMagpieTestCaseType, int, Optional[HeadersType], Optional[CookiesType]) -> None + # type: (AnyMagpieTestItemType, int, Optional[HeadersType], Optional[CookiesType]) -> None """ Deletes the test resource directly using its ID. - If the resource does not exists, skips the operation. Otherwise, delete it and validate its removal. + If the resource does not exist, skips the operation. Otherwise, delete it and validate its removal. - :raises AssertionError: if the response does not correspond to non existing resource. + :raises AssertionError: if the response does not correspond to non-existing resource. """ app_or_url = get_app_or_url(test_case) path = "/resources/{res_id}".format(res_id=resource_id) @@ -2818,24 +2821,25 @@ def delete_TestResource(test_case, resource_id, override_headers=null, override_ @staticmethod def get_RegisteredUsersList(test_case, override_headers=null, override_cookies=null, pending=False): - # type: (AnyMagpieTestCaseType, Optional[HeadersType], Optional[CookiesType], bool) -> List[Str] + # type: (AnyMagpieTestItemType, Optional[HeadersType], Optional[CookiesType], bool) -> List[Str] """ Obtains the list of registered users. :raises AssertionError: if the response does not correspond to successful retrieval of user names. """ app_or_url = get_app_or_url(test_case) + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) resp = test_request(app_or_url, "GET", "/register/users" if pending else "/users", expect_errors=pending, # route does not exist if not enabled - headers=override_headers if override_headers is not null else test_case.json_headers, - cookies=override_cookies if override_cookies is not null else test_case.cookies) + headers=headers, cookies=cookies) if pending and resp.status_code == 404: # user-registration was not enabled return [] json_body = check_response_basic_info(resp, 200, expected_method="GET") return json_body["registrations"] if pending else json_body["user_names"] @staticmethod - def check_NonExistingTestUser(test_case, # type: AnyMagpieTestCaseType + def check_NonExistingTestUser(test_case, # type: AnyMagpieTestItemType override_user_name=null, # type: Optional[Str] override_headers=null, # type: Optional[HeadersType] override_cookies=null, # type: Optional[CookiesType] @@ -2852,7 +2856,7 @@ def check_NonExistingTestUser(test_case, # type: AnyMagpieTestCas check_val_not_in(user_name, users) @staticmethod - def create_TestUser(test_case, # type: AnyMagpieTestCaseType + def create_TestUser(test_case, # type: AnyMagpieTestItemType override_data=null, # type: Optional[JSON] override_user_name=null, # type: Optional[Str] override_email=null, # type: Optional[Str] @@ -2928,7 +2932,7 @@ def create_TestUser(test_case, # type: AnyMagpieTestCaseType return check_response_basic_info(create_user_resp, 201, expected_method="POST") @staticmethod - def delete_TestUser(test_case, # type: AnyMagpieTestCaseType + def delete_TestUser(test_case, # type: AnyMagpieTestItemType override_user_name=null, # type: Optional[Str] override_headers=null, # type: Optional[HeadersType] override_cookies=null, # type: Optional[CookiesType] @@ -2942,8 +2946,8 @@ def delete_TestUser(test_case, # type: AnyMagpieTestCaseType :raises AssertionError: if any request response does not match successful validation or removal from group. """ app_or_url = get_app_or_url(test_case) - headers = override_headers if override_headers is not null else test_case.json_headers - cookies = override_cookies if override_cookies is not null else test_case.cookies + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) users = TestSetup.get_RegisteredUsersList(test_case, pending=pending, override_headers=headers, override_cookies=cookies) user_name = override_user_name if override_user_name is not null else test_case.test_user_name @@ -2956,15 +2960,15 @@ def delete_TestUser(test_case, # type: AnyMagpieTestCaseType override_headers=headers, override_cookies=cookies) @staticmethod - def clear_PendingUsers(test_case, # type: AnyMagpieTestCaseType + def clear_PendingUsers(test_case, # type: AnyMagpieTestItemType override_headers=null, # type: Optional[HeadersType] override_cookies=null, # type: Optional[CookiesType] ): # type: (...) -> None """ Removes all existing pending user registrations. """ - headers = override_headers if override_headers is not null else test_case.json_headers - cookies = override_cookies if override_cookies is not null else test_case.cookies + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", null) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", null) users = TestSetup.get_RegisteredUsersList(test_case, pending=True, override_headers=headers, override_cookies=cookies) for user in users: @@ -2975,7 +2979,7 @@ def clear_PendingUsers(test_case, # type: AnyMagpieTestCaseType check_val_equal(len(users), 0) @staticmethod - def get_UserInfo(test_case, # type: AnyMagpieTestCaseType + def get_UserInfo(test_case, # type: AnyMagpieTestItemType override_body=None, # type: JSON override_username=null, # type: Optional[Str] override_version=null, # type: Optional[Str] @@ -3005,7 +3009,7 @@ def get_UserInfo(test_case, # type: AnyMagpieTestCaseType return body or {} @staticmethod - def get_GroupInfo(test_case, # type: AnyMagpieTestCaseType + def get_GroupInfo(test_case, # type: AnyMagpieTestItemType override_body=None, # type: JSON override_group_name=null, # type: Optional[Str] override_version=null, # type: Optional[Str] @@ -3040,7 +3044,7 @@ def get_GroupInfo(test_case, # type: AnyMagpieTestCaseType return body["group"] or {} @staticmethod - def check_UserGroupMembership(test_case, # type: AnyMagpieTestCaseType + def check_UserGroupMembership(test_case, # type: AnyMagpieTestItemType member=True, # type: bool override_user_name=null, # type: Optional[Str] override_group_name=null, # type: Optional[Str] @@ -3066,7 +3070,7 @@ def check_UserGroupMembership(test_case, # type: AnyMagpieTes check_val_not_in(usr_name, body["user_names"]) @staticmethod - def assign_TestUserGroup(test_case, # type: AnyMagpieTestCaseType + def assign_TestUserGroup(test_case, # type: AnyMagpieTestItemType override_user_name=null, # type: Optional[Str] override_group_name=null, # type: Optional[Str] override_headers=null, # type: Optional[HeadersType] @@ -3120,7 +3124,7 @@ def assign_TestUserGroup(test_case, # type: AnyMagpieTestCaseTyp @staticmethod def get_RegisteredGroupsList(test_case, only_discoverable=False, override_headers=null, override_cookies=null): - # type: (AnyMagpieTestCaseType, bool, Optional[HeadersType], Optional[CookiesType]) -> List[Str] + # type: (AnyMagpieTestItemType, bool, Optional[HeadersType], Optional[CookiesType]) -> List[Str] """ Obtains all existing group names, or optionally, only return the publicly discoverable ones. @@ -3136,7 +3140,7 @@ def get_RegisteredGroupsList(test_case, only_discoverable=False, override_header @staticmethod def check_NonExistingTestGroup(test_case, override_group_name=null, override_headers=null, override_cookies=null): - # type: (AnyMagpieTestCaseType, Optional[Str], Optional[HeadersType], Optional[CookiesType]) -> None + # type: (AnyMagpieTestItemType, Optional[Str], Optional[HeadersType], Optional[CookiesType]) -> None """ Validate that test group does not exist. @@ -3149,7 +3153,7 @@ def check_NonExistingTestGroup(test_case, override_group_name=null, override_hea check_val_not_in(group_name, groups) @staticmethod - def create_TestGroup(test_case, # type: AnyMagpieTestCaseType + def create_TestGroup(test_case, # type: AnyMagpieTestItemType override_group_name=null, # type: Optional[Str] override_terms=null, # type: Optional[Str] override_discoverable=null, # type: Optional[bool] @@ -3194,7 +3198,7 @@ def create_TestGroup(test_case, # type: AnyMagpieTestCaseTyp @staticmethod def delete_TestGroup(test_case, override_group_name=null, override_headers=null, override_cookies=null): - # type: (AnyMagpieTestCaseType, Optional[Str], Optional[HeadersType], Optional[CookiesType]) -> None + # type: (AnyMagpieTestItemType, Optional[Str], Optional[HeadersType], Optional[CookiesType]) -> None """ Delete the test group. @@ -3204,8 +3208,8 @@ def delete_TestGroup(test_case, override_group_name=null, override_headers=null, :return: nothing. Group is ensured to not exist. """ app_or_url = get_app_or_url(test_case) - headers = override_headers if override_headers is not null else test_case.json_headers - cookies = override_cookies if override_cookies is not null else test_case.cookies + headers = override_headers if override_headers is not null else getattr(test_case, "json_headers", None) + cookies = override_cookies if override_cookies is not null else getattr(test_case, "cookies", None) groups = TestSetup.get_RegisteredGroupsList(test_case, override_headers=headers, override_cookies=cookies) group_name = override_group_name if override_group_name is not null else test_case.test_group_name # delete as required, skip if non-existing @@ -3217,7 +3221,7 @@ def delete_TestGroup(test_case, override_group_name=null, override_headers=null, override_headers=headers, override_cookies=cookies) @staticmethod - def delete_TestUserResourcePermission(test_case, # type: AnyMagpieTestCaseType + def delete_TestUserResourcePermission(test_case, # type: AnyMagpieTestItemType resource_info=null, # type: Optional[JSON] override_resource_id=null, # type: Optional[int] override_permission=null, # type: Optional[AnyPermissionType] @@ -3247,7 +3251,7 @@ def delete_TestUserResourcePermission(test_case, # type: return result @staticmethod - def delete_TestGroupResourcePermission(test_case, # type: AnyMagpieTestCaseType + def delete_TestGroupResourcePermission(test_case, # type: AnyMagpieTestItemType resource_info=null, # type: Optional[JSON] override_resource_id=null, # type: Optional[int] override_permission=null, # type: Optional[AnyPermissionType] @@ -3278,7 +3282,7 @@ def delete_TestGroupResourcePermission(test_case, # type: @staticmethod def check_GetUserResourcePermissions(test_case, user_name, resource_id, query=None): - # type: (AnyMagpieTestCaseType, Str, int, Optional[Str]) -> JSON + # type: (AnyMagpieTestItemType, Str, int, Optional[Str]) -> JSON query = "?{}".format(query) if query else "" path = "/users/{usr}/resources/{res}/permissions{q}".format(usr=user_name, res=resource_id, q=query) resp = test_request(test_case, "GET", path, headers=test_case.json_headers, cookies=test_case.cookies) @@ -3289,7 +3293,7 @@ def check_GetUserResourcePermissions(test_case, user_name, resource_id, query=No @staticmethod def check_GetUserResourcesOrService(test_case, user_name, service_or_resource_path, query=None): - # type: (AnyMagpieTestCaseType, Str, Str, Optional[Str]) -> JSON + # type: (AnyMagpieTestItemType, Str, Str, Optional[Str]) -> JSON query = "?{}".format(query) if query else "" path = "/users/{usr}/{sr}{q}".format(usr=user_name, sr=service_or_resource_path, q=query) resp = test_request(test_case, "GET", path, headers=test_case.json_headers, cookies=test_case.cookies) @@ -3300,10 +3304,10 @@ def check_GetUserResourcesOrService(test_case, user_name, service_or_resource_pa @staticmethod def check_GetUserServices(test_case, user_name, query=None): - # type: (AnyMagpieTestCaseType, Str, Optional[Str]) -> JSON + # type: (AnyMagpieTestItemType, Str, Optional[Str]) -> JSON return TestSetup.check_GetUserResourcesOrService(test_case, user_name, "services", query=query) @staticmethod def check_GetUserResources(test_case, user_name, query=None): - # type: (AnyMagpieTestCaseType, Str, Optional[Str]) -> JSON + # type: (AnyMagpieTestItemType, Str, Optional[Str]) -> JSON return TestSetup.check_GetUserResourcesOrService(test_case, user_name, "resources", query=query)