From d14da1c734c20c4092f15ff8454bc6e7d711c720 Mon Sep 17 00:00:00 2001 From: Jeremy Bonghwan Choi Date: Tue, 19 Sep 2023 17:01:13 +1000 Subject: [PATCH] Added connection timeout for get_token() for DefectDojo integration (#130) * added timeout for get_token(). added a few loggings * added initial pytest for dd integration * changed the name of get_token(). improved error handling and log messages --- exports/defect_dojo.py | 70 +++++++++++++++++++--------- rapidast.py | 13 +++++- tests/test_defectdojo_integration.py | 27 +++++++++++ 3 files changed, 87 insertions(+), 23 deletions(-) create mode 100644 tests/test_defectdojo_integration.py diff --git a/exports/defect_dojo.py b/exports/defect_dojo.py index 20d16367..5ce53618 100644 --- a/exports/defect_dojo.py +++ b/exports/defect_dojo.py @@ -1,7 +1,5 @@ -import json import logging from urllib import parse -from urllib import request import requests @@ -9,6 +7,8 @@ class DefectDojo: """This class instanciates a connection to DefectDojo, and pushes reports""" + DD_CONNECT_TIMEOUT = 10 # in seconds + def __init__(self, base_url, username=None, password=None, token=None): if not base_url: raise ValueError( @@ -26,7 +26,7 @@ def __init__(self, base_url, username=None, password=None, token=None): if token: self.headers["Authorization"] = f"Token {token}" - def get_token(self): + def _auth_and_set_token(self): """Force a refresh of the token using the username/password""" logging.debug("Defect Dojo: refreshing token") if not self.username or not self.password: @@ -35,32 +35,44 @@ def get_token(self): ) url = self.base_url + "/api/v2/api-token-auth/" data = {"username": self.username, "password": self.password} - data = parse.urlencode(data).encode("ascii") - with request.urlopen(url, data=data) as resp: - if resp.getcode() >= 400: - logging.warning( - f"Defect Dojo did not answer as expected during login (status: {resp.getcode()})" - ) + try: + resp = requests.post(url, timeout=self.DD_CONNECT_TIMEOUT, data=data) + resp.raise_for_status() + + logging.debug(f"resp: {resp.json()}") + self.token = resp.json()["token"] - self.token = json.load(resp)["token"] + self.headers["Authorization"] = f"Token {self.token}" + logging.debug("Defect Dojo: successfully refreshed token") + except requests.exceptions.ConnectTimeout as e: + logging.error( + f"Getting token failed. Check the URL for defectDojo in config file. err details: {e}" + ) + return 1 + except requests.exceptions.HTTPError as e: + logging.error( + f"Getting token failed: Check the username/password for defectDojo in the config file. err details: {e}" + ) + return 1 - self.headers["Authorization"] = f"Token {self.token}" - logging.debug("Defect Dojo: successfully refreshed token") + return 0 def engagement_exists(self, engagement_id=None, name=None): """Return True if an engagement exists, False otherwise Engagement is identified either by its name or its ID (positive integer)""" if not self.token: - self.get_token() + self._auth_and_set_token() if engagement_id: resp = requests.get( f"{self.base_url}/api/v2/engagements/?engagment={engagement_id}", + timeout=self.DD_CONNECT_TIMEOUT, headers=self.headers, ) elif name: resp = requests.get( f"{self.base_url}/api/v2/engagements/?name={parse.quote_plus(name)}", + timeout=self.DD_CONNECT_TIMEOUT, headers=self.headers, ) else: @@ -88,18 +100,30 @@ def _private_import(self, endpoint, data, filename): raise ValueError(f"Missing required entries for reimport: {missing}") if not self.token: - self.get_token() + if self._auth_and_set_token() == 1: + # failed to get token + return 1 resp = requests.post( endpoint, + timeout=self.DD_CONNECT_TIMEOUT, headers=self.headers, data=data, files={"file": open(filename, "rb")}, # pylint: disable=consider-using-with ) if resp.status_code >= 400: - print(vars(resp)) + logging.debug(vars(resp)) err = resp.json() - logging.warning(f"Error while exporting ({resp.status_code}, {err})") + logging.error(f"Error while exporting ({resp.status_code}, {err})") + + if "Invalid token" in err["detail"]: + logging.error( + "Please check your token in 'config.defectDojo' of the config file" + ) + + return 1 + + return 0 def reimport_scan(self, data, filename): """Reimport to an existing engagement with an existing compatible scan.""" @@ -113,7 +137,9 @@ def reimport_scan(self, data, filename): "Reimport needs to identify an existing test (by ID or names of product+engagement+test)" ) - self._private_import(f"{self.base_url}/api/v2/reimport-scan/", data, filename) + return self._private_import( + f"{self.base_url}/api/v2/reimport-scan/", data, filename + ) def import_scan(self, data, filename): """Import to an existing engagement.""" @@ -125,7 +151,9 @@ def import_scan(self, data, filename): "Import needs to identify an existing engagement (by ID or names of product+engagement)" ) - self._private_import(f"{self.base_url}/api/v2/import-scan/", data, filename) + return self._private_import( + f"{self.base_url}/api/v2/import-scan/", data, filename + ) def import_or_reimport_scan(self, data, filename): """Decide wether to import or reimport. Based on: @@ -135,9 +163,9 @@ def import_or_reimport_scan(self, data, filename): if not data or not filename: # missing data means nothing to do logging.debug("Insufficient data for Defect Dojo") - return + return 1 if data.get("test"): - self.reimport_scan(data, filename) + return self.reimport_scan(data, filename) else: - self.import_scan(data, filename) + return self.import_scan(data, filename) diff --git a/rapidast.py b/rapidast.py index e2500a43..18db11e7 100755 --- a/rapidast.py +++ b/rapidast.py @@ -117,7 +117,11 @@ def run_scanner(name, config, args, defect_d): # Part 6: export to defect dojo, if the scanner is compatible if defect_d and hasattr(scanner, "data_for_defect_dojo"): - defect_d.import_or_reimport_scan(*scanner.data_for_defect_dojo()) + logging.info("Exporting results to the Defect Dojo service as configured") + + if defect_d.import_or_reimport_scan(*scanner.data_for_defect_dojo()) == 1: + logging.error("Exporting results to DefectDojo failed") + return 1 return 0 @@ -190,7 +194,12 @@ def run(): for name in config.get("scanners"): logging.info(f"Next scanner: '{name}'") - scan_error_count += run_scanner(name, config, args, defect_d) + ret = run_scanner(name, config, args, defect_d) + if ret == 1: + logging.info(f"scanner: '{name}' failed") + scan_error_count = scan_error_count + 1 + else: + logging.info(f"scanner: '{name}' completed successfully") if scan_error_count > 0: logging.warning(f"Number of failed scanners: {scan_error_count}") diff --git a/tests/test_defectdojo_integration.py b/tests/test_defectdojo_integration.py new file mode 100644 index 00000000..e3ad938c --- /dev/null +++ b/tests/test_defectdojo_integration.py @@ -0,0 +1,27 @@ +import pytest +import requests + +from exports.defect_dojo import DefectDojo + + +# DefectDojo integration tests +def test_dd_invalid_url_scheme(): + with pytest.raises(ValueError): + defect_d = DefectDojo("invalid_url") + + +def test_dd_auth_and_set_token_no_username(): + defect_d = DefectDojo("https://127.0.0.1:12345") + with pytest.raises(ValueError) as e_info: + defect_d._auth_and_set_token() + + assert "A username and a password are required" in str(e_info) + + +def test_dd_auth_and_set_token_non_existent_url(): + # assuming 127.0.0.1:12345 is non-existent + defect_d = DefectDojo( + "https://127.0.0.1:12345", "random_username", "random_password", "random_token" + ) + with pytest.raises(requests.exceptions.ConnectionError): + defect_d._auth_and_set_token()