From a7d6b3351a203f47e7eb100e769e14c1308a56e9 Mon Sep 17 00:00:00 2001 From: TOUFIKI Zakarya Date: Thu, 19 Dec 2024 18:02:23 +0100 Subject: [PATCH] Fix the body problem --- Stormshield/stormshield_module/base.py | 18 +++++++++++++++- Stormshield/tests/test_base.py | 29 +++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/Stormshield/stormshield_module/base.py b/Stormshield/stormshield_module/base.py index 2949ec12..1610ca13 100644 --- a/Stormshield/stormshield_module/base.py +++ b/Stormshield/stormshield_module/base.py @@ -12,6 +12,7 @@ ) from sekoia_automation.action import GenericAPIAction +from sekoia_automation.exceptions import MissingActionArgumentFileError class StormshieldAction(GenericAPIAction): @@ -29,6 +30,21 @@ def base_url(self) -> str: def get_headers(self) -> dict[str, str]: return {"Authorization": f"Bearer {self.api_token}"} + + def get_body(self, arguments: dict): + res = {} + for key, value in arguments.items(): + if isinstance(value, dict): + res[key] = self.get_body(value) + elif isinstance(value, bool): + res[key] = str(value).lower() + else: + try: + new_key = key.replace("_path", "") + res[new_key] = self.json_argument(new_key, arguments) + except MissingActionArgumentFileError: # pragma: no cover + res[key] = value + return res def treat_failed_response(self, response: Response) -> None: errors = { @@ -58,7 +74,7 @@ def get_url(self, arguments: dict[str, Any]) -> str: if k in arguments: value = arguments.pop(k) if isinstance(value, bool): - value = str(value) + value = str(value).lower() query_arguments.append(f"{k}={value}") if query_arguments: diff --git a/Stormshield/tests/test_base.py b/Stormshield/tests/test_base.py index 3eef85ee..d1b7d949 100644 --- a/Stormshield/tests/test_base.py +++ b/Stormshield/tests/test_base.py @@ -15,7 +15,7 @@ def test_base_get_headers(symphony_storage): assert header == {"Authorization": "Bearer token"} -def test_base_get_url(symphony_storage): +def test_base_normal_get_url(symphony_storage): module_configuration = { "api_token": "token", "url": "https://stormshield-api-example.eu/", @@ -24,6 +24,18 @@ def test_base_get_url(symphony_storage): action.module.configuration = module_configuration url = action.get_url({"id": "foo"}) assert url == "https://stormshield-api-example.eu/rest/api/v1/agents/foo/tasks/network-isolation" + + +def test_base_query_param_get_url(symphony_storage): + module_configuration = { + "api_token": "token", + "url": "https://stormshield-api-example.eu/", + } + action = EndpointAgentIsolationAction(data_path=symphony_storage) + action.module.configuration = module_configuration + action.query_parameters = ["param1", "param2"] + url = action.get_url({"id": "foo", "param1": "value1", "param2": "value2"}) + assert url == "https://stormshield-api-example.eu/rest/api/v1/agents/foo/tasks/network-isolation?param1=value1¶m2=value2" def test_treat_failed_response_authentication_failed_401(symphony_storage): @@ -46,3 +58,18 @@ def test_treat_failed_response_authentication_failed_500(symphony_storage): action.treat_failed_response(response) assert str(excinfo.value) == f"Error : Internal server error: Rate limit exceeded." + + +def test_get_body(symphony_storage): + module_configuration = { + "api_token": "token", + "url": "https://stormshield-api-example.eu/", + } + action = EndpointAgentIsolationAction(data_path=symphony_storage) + action.module.configuration = module_configuration + url = action.get_body({"id": "foo", "forceServerIsolation" : True, "comment": "test"}) + assert url == {"id": "foo", "forceServerIsolation": "true", "comment": "test"} + + # Test with dict parameter + url = action.get_body({"id": "foo", "dictparam": {"forceServerIsolation" : True, "comment": "test"}}) + assert url == {"id": "foo", "dictparam": {"forceServerIsolation": "true", "comment": "test"}} \ No newline at end of file