diff --git a/configmodel/__init__.py b/configmodel/__init__.py index 99c9bd07..3da2fe4f 100644 --- a/configmodel/__init__.py +++ b/configmodel/__init__.py @@ -77,9 +77,7 @@ def delete(self, path): except AttributeError: pass # Failed to iterate until the end: the path does not exist - logging.warning( - f"RapidastConfigModel.delete(): Config path {path} was not found. No deletion" - ) + logging.warning(f"RapidastConfigModel.delete(): Config path {path} was not found. No deletion") return False def exists(self, path): @@ -122,9 +120,7 @@ def set(self, path, value, overwrite=True): tmp = walk[key] # case 3: not a "dictionary" type: warn and overwrite (if True) if not isinstance(tmp, dict): - logging.warning( - f"RapidastConfigModel.set: Incompatible {path} at {tmp}" - ) + logging.warning(f"RapidastConfigModel.set: Incompatible {path} at {tmp}") if not overwrite: logging.info("RapidastConfigModel.set: no overwrite: early return") return False @@ -162,9 +158,7 @@ def merge(self, merge, preserve=False, root=None): if not merge: return if not isinstance(merge, dict): - raise TypeError( - f"RapidastConfigModel.merge: merge must be a dict (was: {type(merge)})" - ) + raise TypeError(f"RapidastConfigModel.merge: merge must be a dict (was: {type(merge)})") root = path_to_list(root) diff --git a/configmodel/converter.py b/configmodel/converter.py index fcd66ab3..8177d888 100755 --- a/configmodel/converter.py +++ b/configmodel/converter.py @@ -45,9 +45,7 @@ def dispatch(version): def convert_configmodel(conf): """This is the base function, attached to error reporting""" version = conf.get("config.configVersion", 0) - raise RuntimeError( - f"There was an error in converting configuration. No convertion available for version {version}" - ) + raise RuntimeError(f"There was an error in converting configuration. No convertion available for version {version}") @convert_configmodel.register(4) @@ -60,9 +58,7 @@ def convert_from_version_4_to_5(old): new = copy.deepcopy(old) for key in old.conf["scanners"]: - if key.startswith("zap") and old.exists( - f"scanners.{key}.miscOptions.oauth2OpenapiManualDownload" - ): + if key.startswith("zap") and old.exists(f"scanners.{key}.miscOptions.oauth2OpenapiManualDownload"): new.move( f"scanners.{key}.miscOptions.oauth2OpenapiManualDownload", f"scanners.{key}.miscOptions.oauth2ManualDownload", @@ -174,8 +170,7 @@ def convert_from_version_0_to_1(old): auth_method = old.get("scan.auth_method", default=None) if ( auth_method == "scriptBasedAuthentication" - and old.get("scan.scriptAuth.authScriptFilePath", default="") - == "scripts/offline-token.js" + and old.get("scan.scriptAuth.authScriptFilePath", default="") == "scripts/offline-token.js" ): # probably OAuth2 new.set( @@ -183,20 +178,14 @@ def convert_from_version_0_to_1(old): { "type": "oauth2_rtoken", "parameters": { - "client_id": old.get( - "scan.scriptAuth.authClientID", default="cloud-services" - ), - "token_endpoint": old.get( - "scan.scriptAuth.authTokenEndpoint", default="" - ), + "client_id": old.get("scan.scriptAuth.authClientID", default="cloud-services"), + "token_endpoint": old.get("scan.scriptAuth.authTokenEndpoint", default=""), "rtoken_var_name": "RTOKEN", }, }, ) else: - logging.warning( - "The config version translator does not support this particular authentication" - ) + logging.warning("The config version translator does not support this particular authentication") # "Scanners.Zap" section new.set( @@ -206,13 +195,9 @@ def convert_from_version_0_to_1(old): ### OpenAPI if old.get("openapi.importFromUrl", default=False): - new.set( - "scanners.zap.apiScan.apis.apiUrl", old.get("openapi.url", default=None) - ) + new.set("scanners.zap.apiScan.apis.apiUrl", old.get("openapi.url", default=None)) elif old.get("openapi.directory", default=""): - logging.warning( - "The config version translator does not support Directory based OpenAPI" - ) + logging.warning("The config version translator does not support Directory based OpenAPI") ## Passive scan new.set("scanners.zap.passiveScan", {}) @@ -225,9 +210,7 @@ def convert_from_version_0_to_1(old): ## Active scan # Active scanner was always enabled, so we do the same: new.set("scanners.zap.activeScan", {}) - new.set( - "scanners.zap.activeScan.policy", old.get("scan.policies.scanPolicyName", None) - ) + new.set("scanners.zap.activeScan.policy", old.get("scan.policies.scanPolicyName", None)) # Finally, set the correct version number new.set("config.configVersion", 1) diff --git a/exports/defect_dojo.py b/exports/defect_dojo.py index ee13c124..297c2506 100644 --- a/exports/defect_dojo.py +++ b/exports/defect_dojo.py @@ -11,9 +11,7 @@ class DefectDojo: def __init__(self, base_url, login=None, token=None, ssl=None): if not base_url: - raise ValueError( - "Defect Dojo invalid configuration: URL is a mandatory value" - ) + raise ValueError("Defect Dojo invalid configuration: URL is a mandatory value") parsed = parse.urlparse(base_url) # expects to raise exception on invalid URL if parsed.scheme not in ["http", "https"]: raise ValueError("Defect Dojo invalid configuration: URL is not correct") @@ -27,9 +25,7 @@ def __init__(self, base_url, login=None, token=None, ssl=None): self.username = login["username"] self.password = login["password"] except KeyError: - logging.error( - "RapiDAST BUG: DefectDojo was created with invalid login information..." - ) + logging.error("RapiDAST BUG: DefectDojo was created with invalid login information...") logging.error("RapiDAST BUG: ...[continuing without credentials]") self.token = token @@ -47,9 +43,7 @@ def _auth_and_set_token(self): """Force a refresh of the token using the username/password""" logging.debug("Defect Dojo: refreshing token") if not self.username or not self.password: - raise ValueError( - "Defect Dojo invalid configuration: A username and a password are required to get a token" - ) + raise ValueError("Defect Dojo invalid configuration: A username and a password are required to get a token") url = self.base_url + "/api/v2/api-token-auth/" data = {"username": self.username, "password": self.password} @@ -63,9 +57,7 @@ def _auth_and_set_token(self): self.headers["Authorization"] = f"Token {self.token}" logging.debug("Defect Dojo: successfully refreshed token") except requests.exceptions.ConnectTimeout as e: - logging.error( - f"Getting token failed. Check the URL for defectDojo in config file. err details: {e}" - ) + logging.error(f"Getting token failed. Check the URL for defectDojo in config file. err details: {e}") return 1 except requests.exceptions.HTTPError as e: logging.error( @@ -96,9 +88,7 @@ def engagement_exists(self, engagement_id=None, name=None): raise ValueError("Either an engagement name or ID must be provided") if resp.status_code >= 400: - logging.warning( - f"Error while looking for engagement ({resp.status_code}, {resp.get('message')})" - ) + logging.warning(f"Error while looking for engagement ({resp.status_code}, {resp.get('message')})") counts = resp.json()["counts"] if counts > 1: logging.warning("Error while looking for engagement: too many hits") @@ -134,9 +124,7 @@ def _private_import(self, endpoint, data, filename): logging.error(f"Error while exporting ({resp.status_code}, {err})") if "Invalid token" in err["detail"]: - logging.error( - "Please check your token in 'config.defectDojo' of the config file" - ) + logging.error("Please check your token in 'config.defectDojo' of the config file") return 1 @@ -146,31 +134,19 @@ def reimport_scan(self, data, filename): """Reimport to an existing engagement with an existing compatible scan.""" if not data.get("test") and not ( - data.get("engagement_name") - and data.get("product_name") - and data.get("test_title") + data.get("engagement_name") and data.get("product_name") and data.get("test_title") ): - raise ValueError( - "Reimport needs to identify an existing test (by ID or names of product+engagement+test)" - ) + raise ValueError("Reimport needs to identify an existing test (by ID or names of product+engagement+test)") - return self._private_import( - f"{self.base_url}/api/v2/reimport-scan/", data, filename - ) + return self._private_import(f"{self.base_url}/api/v2/reimport-scan/", data, filename) def import_scan(self, data, filename): """export to an existing engagement, via the `import-scan` endpoint.""" - if not data.get("engagement") and not ( - data.get("engagement_name") and data.get("product_name") - ): - raise ValueError( - "Import needs to identify an existing engagement (by ID or names of product+engagement)" - ) + if not data.get("engagement") and not (data.get("engagement_name") and data.get("product_name")): + raise ValueError("Import needs to identify an existing engagement (by ID or names of product+engagement)") - return self._private_import( - f"{self.base_url}/api/v2/import-scan/", data, filename - ) + return self._private_import(f"{self.base_url}/api/v2/import-scan/", data, filename) def export_scan(self, data, filename): """Decide wether to import or reimport. Based on: diff --git a/exports/google_cloud_storage.py b/exports/google_cloud_storage.py index 2fe4883f..4cc7d684 100755 --- a/exports/google_cloud_storage.py +++ b/exports/google_cloud_storage.py @@ -56,9 +56,7 @@ def export_scan(self, data, filename): metadata = self.create_metadata(data) - logging.info( - f"GoogleCloudStorage: sending {filename}. UUID: {metadata['uuid']}" - ) + logging.info(f"GoogleCloudStorage: sending {filename}. UUID: {metadata['uuid']}") # export data as a metadata.json file json_stream = StringIO() @@ -82,11 +80,7 @@ def export_scan(self, data, filename): unique_id = "{}-RapiDAST-{}-{}.tgz".format( # pylint: disable=C0209 datetime.datetime.now(tz=datetime.timezone.utc).isoformat(), self.app_name, - "".join( - random.choices( - string.ascii_letters + string.ascii_uppercase + string.digits, k=6 - ) - ), + "".join(random.choices(string.ascii_letters + string.ascii_uppercase + string.digits, k=6)), ) blob_name = self.directory + "/" + unique_id diff --git a/rapidast.py b/rapidast.py index 5b05ba4e..a03d0372 100755 --- a/rapidast.py +++ b/rapidast.py @@ -108,9 +108,7 @@ def run_scanner(name, config, args, scan_exporter): # Part 5: cleanup if not scanner.state == scanners.State.PROCESSED: - logging.error( - f"Something is wrong. Scanner {name} is not in PROCESSED state: the workdir won't be cleaned up" - ) + logging.error(f"Something is wrong. Scanner {name} is not in PROCESSED state: the workdir won't be cleaned up") return 1 if not args.no_cleanup: @@ -155,19 +153,13 @@ def run(): args.loglevel = args.loglevel.upper() add_logging_level("VERBOSE", logging.DEBUG + 5) logging.basicConfig(format="%(levelname)s:%(message)s", level=args.loglevel) - logging.debug( - f"log level set to debug. Config file: '{parser.parse_args().config_file}'" - ) + logging.debug(f"log level set to debug. Config file: '{parser.parse_args().config_file}'") # Load config file try: - config = configmodel.RapidastConfigModel( - yaml.safe_load(load_config_file(parser.parse_args().config_file)) - ) + config = configmodel.RapidastConfigModel(yaml.safe_load(load_config_file(parser.parse_args().config_file))) except yaml.YAMLError as exc: - raise RuntimeError( - f"YAML error in config {parser.parse_args().config_file}':\n {str(exc)}" - ) from exc + raise RuntimeError(f"YAML error in config {parser.parse_args().config_file}':\n {str(exc)}") from exc # Optionally adds default if file exists (will not overwrite existing entries) default_conf = os.path.join(os.path.dirname(__file__), "rapidast-defaults.yaml") @@ -176,18 +168,14 @@ def run(): try: config.merge(yaml.safe_load(load_config_file(default_conf)), preserve=True) except yaml.YAMLError as exc: - raise RuntimeError( - f"YAML error in config {default_conf}':\n {str(exc)}" - ) from exc + raise RuntimeError(f"YAML error in config {default_conf}':\n {str(exc)}") from exc # Update to latest config schema if need be config = configmodel.converter.update_to_latest_config(config) config.set("config.results_dir", get_full_result_dir_path(config)) - logging.debug( - f"The entire loaded configuration is as follow:\n=====\n{pp.pformat(config)}\n=====" - ) + logging.debug(f"The entire loaded configuration is as follow:\n=====\n{pp.pformat(config)}\n=====") # Do early: load the environment file if one is there load_environment(config) @@ -196,9 +184,7 @@ def run(): scan_exporter = None if config.get("config.googleCloudStorage.bucketName"): scan_exporter = GoogleCloudStorage( - bucket_name=config.get( - "config.googleCloudStorage.bucketName", "default-bucket-name" - ), + bucket_name=config.get("config.googleCloudStorage.bucketName", "default-bucket-name"), app_name=config.get_official_app_name(), directory=config.get("config.googleCloudStorage.directory", None), keyfile=config.get("config.googleCloudStorage.keyFile", None), @@ -207,12 +193,8 @@ def run(): scan_exporter = DefectDojo( config.get("config.defectDojo.url"), { - "username": config.get( - "config.defectDojo.authorization.username", default="" - ), - "password": config.get( - "config.defectDojo.authorization.password", default="" - ), + "username": config.get("config.defectDojo.authorization.username", default=""), + "password": config.get("config.defectDojo.authorization.password", default=""), }, config.get("config.defectDojo.authorization.token"), config.get("config.defectDojo.ssl", default=True), diff --git a/scanners/__init__.py b/scanners/__init__.py index c9d41b5c..c0f5df05 100644 --- a/scanners/__init__.py +++ b/scanners/__init__.py @@ -25,9 +25,7 @@ def __init__(self, config, ident): self.config = config self.state = State.UNCONFIGURED - self.results_dir = os.path.join( - self.config.get("config.results_dir", default="results"), self.ident - ) + self.results_dir = os.path.join(self.config.get("config.results_dir", default="results"), self.ident) # When requested to create a temporary file or directory, it will be a subdir of # this temporary directory @@ -77,8 +75,7 @@ def _should_export_to_defect_dojo(self): - this particular scanner's export is not explicitely disabled (`defectDojoExport` is not False) """ return self.my_conf("defectDojoExport") is not False and ( - self.config.get("config.googleCloudStorage") - or self.config.get("config.defectDojo") + self.config.get("config.googleCloudStorage") or self.config.get("config.defectDojo") ) def _fill_up_data_for_defect_dojo(self, data): @@ -124,9 +121,7 @@ def _fill_up_data_for_defect_dojo(self, data): # A default product name was chosen as part of `self.get_default_defectdojo_data()` # Generate an engagement name if none are set if not data.get("engagement_name"): - data[ - "engagement_name" - ] = f"RapiDAST-{data['product_name']}-{datetime.date.today()}" + data["engagement_name"] = f"RapiDAST-{data['product_name']}-{datetime.date.today()}" return data diff --git a/scanners/downloaders.py b/scanners/downloaders.py index cc26e317..d812a56d 100644 --- a/scanners/downloaders.py +++ b/scanners/downloaders.py @@ -57,9 +57,7 @@ def oauth2_get_token_from_rtoken(auth, proxy=None, session=None): resp = session.post(auth["url"], data=payload, headers=headers, proxies=proxy) resp.raise_for_status() except requests.exceptions.ConnectTimeout: - logging.error( - "Getting oauth2 token failed: server unresponsive. Check the Authentication URL parameters" - ) + logging.error("Getting oauth2 token failed: server unresponsive. Check the Authentication URL parameters") return False except requests.exceptions.HTTPError as e: logging.error(f"Getting token failed: Check the RTOKEN. err details: {e}") @@ -68,9 +66,7 @@ def oauth2_get_token_from_rtoken(auth, proxy=None, session=None): try: token = yaml.safe_load(resp.text)["access_token"] except KeyError as exc: - logging.error( - f"Unable to extract access token from OAuth2 authentication:\n {str(exc)}" - ) + logging.error(f"Unable to extract access token from OAuth2 authentication:\n {str(exc)}") return False return token @@ -96,9 +92,7 @@ def authenticated_download_with_rtoken(url, dest, auth, proxy=None): resp = session.get(url, proxies=proxy, headers=authenticated_headers) if resp.status_code >= 400: - logging.warning( - f"ERROR: download failed with {resp.status_code}. Aborting download for {url}" - ) + logging.warning(f"ERROR: download failed with {resp.status_code}. Aborting download for {url}") return False with open(dest, "w", encoding="utf-8") as file: diff --git a/scanners/generic/generic.py b/scanners/generic/generic.py index 74b304c1..6347e5fe 100644 --- a/scanners/generic/generic.py +++ b/scanners/generic/generic.py @@ -125,9 +125,7 @@ def _setup_generic_cli(self): # disabling these 2 rules only here since they might actually be useful else where # pylint: disable=unused-argument def _add_env(self, key, value=None): - logging.warning( - "_add_env() was called on the parent Generic class. This is likely a bug. No operation done" - ) + logging.warning("_add_env() was called on the parent Generic class. This is likely a bug. No operation done") ############################################################### # PRIVATE METHODS # diff --git a/scanners/generic/generic_none.py b/scanners/generic/generic_none.py index e881256b..c9673ef7 100644 --- a/scanners/generic/generic_none.py +++ b/scanners/generic/generic_none.py @@ -53,9 +53,7 @@ def setup(self): """ if self.state != State.UNCONFIGURED: - raise RuntimeError( - f"generic_none setup encountered an unexpected state: {self.state}" - ) + raise RuntimeError(f"generic_none setup encountered an unexpected state: {self.state}") self._setup_generic_cli() @@ -78,11 +76,7 @@ def run(self): cli = self.generic_cli # The result is stdout if "results" is undefined or `*stdout` - stdout_store = ( - subprocess.PIPE - if not self.my_conf("results") or self.my_conf("results") == "*stdout" - else None - ) + stdout_store = subprocess.PIPE if not self.my_conf("results") or self.my_conf("results") == "*stdout" else None # DO STUFF @@ -105,21 +99,13 @@ def run(self): for line in scanning.stdout: print(line, end="") scanning_stdout_results += line - logging.debug( - f"generic returned the following:\n=====\n{pp.pformat(scanning)}\n=====" - ) - - if scanning.returncode in self.my_conf( - "container.parameters.validReturns", [0] - ): - logging.info( - f"The generic process finished correctly, and exited with code {scanning.returncode}" - ) + logging.debug(f"generic returned the following:\n=====\n{pp.pformat(scanning)}\n=====") + + if scanning.returncode in self.my_conf("container.parameters.validReturns", [0]): + logging.info(f"The generic process finished correctly, and exited with code {scanning.returncode}") self.state = State.DONE else: - logging.warning( - f"The generic process did not finish correctly, and exited with code {scanning.returncode}" - ) + logging.warning(f"The generic process did not finish correctly, and exited with code {scanning.returncode}") self.state = State.ERROR # If we captured an output, let's save it into a temporary file, and use that as a new result parameter @@ -128,17 +114,13 @@ def run(self): with open(report_path, "w", encoding="utf-8") as results: results.write(scanning_stdout_results) # Now that the result is a file, change the config to point to it - logging.debug( - f"Overloading {self.ident} config result parameter to {report_path}" - ) + logging.debug(f"Overloading {self.ident} config result parameter to {report_path}") self.set_my_conf("results", value=report_path, overwrite=True) def postprocess(self): logging.info("Running postprocess for the generic environment") if not self.state == State.DONE: - raise RuntimeError( - "No post-processing as generic has not successfully run yet." - ) + raise RuntimeError("No post-processing as generic has not successfully run yet.") super().postprocess() diff --git a/scanners/generic/generic_podman.py b/scanners/generic/generic_podman.py index be3d8e92..04ee2f6a 100644 --- a/scanners/generic/generic_podman.py +++ b/scanners/generic/generic_podman.py @@ -64,9 +64,7 @@ def setup(self): """ if self.state != State.UNCONFIGURED: - raise RuntimeError( - f"generic_podman setup encountered an unexpected state: {self.state}" - ) + raise RuntimeError(f"generic_podman setup encountered an unexpected state: {self.state}") self._setup_podman_cli() self._setup_generic_cli() @@ -87,38 +85,24 @@ def run(self): cli = self.podman.get_complete_cli(self.generic_cli) # The result is stdout if "results" is undefined or `*stdout` - stdout_store = ( - subprocess.PIPE - if not self.my_conf("results") or self.my_conf("results") == "*stdout" - else None - ) + stdout_store = subprocess.PIPE if not self.my_conf("results") or self.my_conf("results") == "*stdout" else None # DO STUFF logging.info(f"Running generic with the following command:\n{cli}") scanning_stdout_results = "" - with subprocess.Popen( - cli, stdout=stdout_store, bufsize=1, universal_newlines=True - ) as scanning: + with subprocess.Popen(cli, stdout=stdout_store, bufsize=1, universal_newlines=True) as scanning: if stdout_store: logging.debug("Storing podman's standard output") for line in scanning.stdout: print(line, end="") scanning_stdout_results += line - logging.debug( - f"generic returned the following:\n=====\n{pp.pformat(scanning)}\n=====" - ) + logging.debug(f"generic returned the following:\n=====\n{pp.pformat(scanning)}\n=====") - if scanning.returncode in self.my_conf( - "container.parameters.validReturns", [0] - ): - logging.info( - f"The generic process finished correctly, and exited with code {scanning.returncode}" - ) + if scanning.returncode in self.my_conf("container.parameters.validReturns", [0]): + logging.info(f"The generic process finished correctly, and exited with code {scanning.returncode}") self.state = State.DONE else: - logging.warning( - f"The generic process did not finish correctly, and exited with code {scanning.returncode}" - ) + logging.warning(f"The generic process did not finish correctly, and exited with code {scanning.returncode}") self.state = State.ERROR # If we captured an output, let's save it into a temporary file, and use that as a new result parameter @@ -127,17 +111,13 @@ def run(self): with open(report_path, "w", encoding="utf-8") as results: results.write(scanning_stdout_results) # Now that the result is a file, change the config to point to it - logging.debug( - f"Overloading {self.ident} config result parameter to {report_path}" - ) + logging.debug(f"Overloading {self.ident} config result parameter to {report_path}") self.set_my_conf("results", value=report_path, overwrite=True) def postprocess(self): logging.info("Running postprocess for the generic Podman environment") if not self.state == State.DONE: - raise RuntimeError( - "No post-processing as generic has not successfully run yet." - ) + raise RuntimeError("No post-processing as generic has not successfully run yet.") super().postprocess() diff --git a/scanners/generic/tools/convert_trivy_k8s_to_sarif.py b/scanners/generic/tools/convert_trivy_k8s_to_sarif.py index ccc038c5..15c52f78 100755 --- a/scanners/generic/tools/convert_trivy_k8s_to_sarif.py +++ b/scanners/generic/tools/convert_trivy_k8s_to_sarif.py @@ -9,8 +9,8 @@ # import argparse import json -import sys import logging +import sys def read_json_block(json_file): @@ -36,9 +36,7 @@ def convert_json_to_sarif(json_data): "version": "2.1.0", "runs": [ { - "tool": { - "driver": {"name": "Trivy-k8s", "version": "0.49.1", "rules": []} - }, + "tool": {"driver": {"name": "Trivy-k8s", "version": "0.49.1", "rules": []}}, "results": [], } ], @@ -63,13 +61,7 @@ def convert_json_to_sarif(json_data): "ruleId": misconf["ID"], "level": misconf["Severity"], "message": {"text": misconf["Message"]}, - "locations": [ - { - "physicalLocation": { - "artifactLocation": {"uri": artifact_location} - } - } - ], + "locations": [{"physicalLocation": {"artifactLocation": {"uri": artifact_location}}}], } # It is observed there are no "StartLine" exists and "Code.Lines" is null in the result file @@ -82,11 +74,7 @@ def convert_json_to_sarif(json_data): new_report["locations"][0]["physicalLocation"]["region"] = { "startLine": misconf["CauseMetadata"]["StartLine"], "endLine": misconf["CauseMetadata"]["EndLine"], - "snippet": { - "text": json.dumps( - misconf["CauseMetadata"]["Code"]["Lines"] - ) - }, + "snippet": {"text": json.dumps(misconf["CauseMetadata"]["Code"]["Lines"])}, } if misconf["ID"] not in rule_ids: @@ -108,9 +96,7 @@ def convert_json_to_sarif(json_data): def main(): # Parse command-line arguments - parser = argparse.ArgumentParser( - description="Convert JSON data to SARIF format with JSON block added to message." - ) + parser = argparse.ArgumentParser(description="Convert JSON data to SARIF format with JSON block added to message.") parser.add_argument( "-f", "--filename", diff --git a/scanners/generic/tools/oobtkube.py b/scanners/generic/tools/oobtkube.py index be691fea..c02548fd 100644 --- a/scanners/generic/tools/oobtkube.py +++ b/scanners/generic/tools/oobtkube.py @@ -124,9 +124,7 @@ def count_total_leaf_keys(data): # pylint: disable=R0913 -def find_leaf_keys_and_test( - data, original_file, ipaddr, port, total_leaf_keys, processed_leaf_keys=0 -): +def find_leaf_keys_and_test(data, original_file, ipaddr, port, total_leaf_keys, processed_leaf_keys=0): """ Iterate the spec data and test each parameter by modifying the value with the attack payload. Test cases: appending 'curl' command, TBD @@ -139,9 +137,7 @@ def find_leaf_keys_and_test( ) else: processed_leaf_keys += 1 - logging.info( - f"Testing a leaf key: '{key}', ({processed_leaf_keys} / {total_leaf_keys})" - ) + logging.info(f"Testing a leaf key: '{key}', ({processed_leaf_keys} / {total_leaf_keys})") cmd = f"sed 's/{key}:.*/{key}: \"echo oobt; curl {ipaddr}:{port}\\/{key}\"/g' {original_file} > {tmp_file}" logging.debug(f"Command run: {cmd}") os.system(cmd) @@ -173,9 +169,7 @@ def scan_with_k8s_config(cfg_file_path: str, obj_data: dict, ipaddr: str, port: spec_data = obj_data.get("spec", {}) total_leaf_keys = count_total_leaf_keys(spec_data) # Apply Kubernetes config (e.g. CR for Operator, or Pod/resource for webhook) - find_leaf_keys_and_test( - spec_data, cfg_file_path, ipaddr, port, total_leaf_keys - ) + find_leaf_keys_and_test(spec_data, cfg_file_path, ipaddr, port, total_leaf_keys) def start_socket_listener(port, shared_queue, data_received, stop_event, duration): @@ -183,9 +177,7 @@ def start_socket_listener(port, shared_queue, data_received, stop_event, duratio try: server_socket.bind((SERVER_HOST, port)) except OSError as e: - logging.error( - f"{e}. Stopping the server. It might take a few seconds. Please try again later." - ) + logging.error(f"{e}. Stopping the server. It might take a few seconds. Please try again later.") stop_event.set() server_socket.close() return @@ -215,9 +207,7 @@ def start_socket_listener(port, shared_queue, data_received, stop_event, duratio break except socket.timeout: - logging.info( - "Socket timeout reached as the test duration expired. Stopping the server." - ) + logging.info("Socket timeout reached as the test duration expired. Stopping the server.") except Exception as e: raise RuntimeError("An error occurred. See logs for details.") from e @@ -292,9 +282,7 @@ def check_can_create(obj_data: dict) -> bool: # pylint: disable=R0915 def main(): # Parse command-line arguments - parser = argparse.ArgumentParser( - description="Simulate a socket listener and respond to requests." - ) + parser = argparse.ArgumentParser(description="Simulate a socket listener and respond to requests.") parser.add_argument( "-i", "--ip-addr", @@ -316,9 +304,7 @@ def main(): default=300, help="Duration for the listener thread to run in seconds (default: 300 seconds)", ) - parser.add_argument( - "-f", "--filename", type=str, required=True, help="Kubernetes config file path" - ) + parser.add_argument("-f", "--filename", type=str, required=True, help="Kubernetes config file path") # add argument for '-o' to output the result to a file parser.add_argument( "-o", @@ -397,9 +383,7 @@ def main(): time.sleep(1) # Adjust the sleep duration as needed elapsed_time_main = time.time() - start_time_main if elapsed_time_main >= args.duration: - logging.debug( - f"The duration of {args.duration} seconds has reached. Exiting..." - ) + logging.debug(f"The duration of {args.duration} seconds has reached. Exiting...") stop_event.set() if data_received.is_set(): @@ -408,9 +392,7 @@ def main(): print_result(sarif_output, args.output, True) vulnerability_count += 1 - logging.info( - f"A vulnerability has been found. Total: {vulnerability_count}" - ) + logging.info(f"A vulnerability has been found. Total: {vulnerability_count}") data_has_been_received = True diff --git a/scanners/podman_wrapper.py b/scanners/podman_wrapper.py index 809924cc..f29086b2 100644 --- a/scanners/podman_wrapper.py +++ b/scanners/podman_wrapper.py @@ -22,9 +22,7 @@ class PodmanWrapper: def __init__(self, app_name, scan_name, image): # First verify that "podman" can be called if not shutil.which("podman"): - raise OSError( - "Podman is not installed or not in the PATH. It is required to run a podman based scanner" - ) + raise OSError("Podman is not installed or not in the PATH. It is required to run a podman based scanner") # Image to use self.image = image @@ -52,9 +50,7 @@ def get_complete_cli(self, cmd=None): def delete_yourself(self): """Deletes the container image created by the run command""" - ret = subprocess.run( - ["podman", "rm", self.container_name], check=False - ).returncode + ret = subprocess.run(["podman", "rm", self.container_name], check=False).returncode if ret: logging.warning(f"Failed to delete container {self.container_name}") return ret @@ -118,13 +114,9 @@ def change_user_id(self, runas_uid, runas_gid): self.add_option("--userns", f"keep-id:uid={runas_uid},gid={runas_gid}") except json.JSONDecodeError as exc: - raise RuntimeError( - f"Unable to parse `podman version` output: {exc}" - ) from exc + raise RuntimeError(f"Unable to parse `podman version` output: {exc}") from exc except (KeyError, AttributeError) as exc: - raise RuntimeError( - f"Unexpected podman version output: Version not found: {exc}" - ) from exc + raise RuntimeError(f"Unexpected podman version output: Version not found: {exc}") from exc except ValueError as exc: raise RuntimeError( f"Unexpected podman version output: unable to decode major/minor version: {exc}" @@ -150,9 +142,7 @@ def change_user_id_workaround(self, runas_uid, runas_gid): logging.debug(f"podman UID mapping: {info['host']['idMappings']['uidmap']}") if info["host"]["idMappings"]["uidmap"] is not None: - subuid_size = ( - sum(i["size"] for i in info["host"]["idMappings"]["uidmap"]) - 1 - ) + subuid_size = sum(i["size"] for i in info["host"]["idMappings"]["uidmap"]) - 1 else: logging.warning( f"the value of host.idMappings.uidmap in 'podman info' is null. \ @@ -160,9 +150,7 @@ def change_user_id_workaround(self, runas_uid, runas_gid): DEFAULT_MAP_SIZE {self.DEFAULT_ID_MAPPING_MAP_SIZE} applied" ) if info["host"]["idMappings"]["gidmap"] is not None: - subgid_size = ( - sum(i["size"] for i in info["host"]["idMappings"]["gidmap"]) - 1 - ) + subgid_size = sum(i["size"] for i in info["host"]["idMappings"]["gidmap"]) - 1 else: logging.warning( f"the value of host.idMappings.gidmap in 'podman info' is null. \ @@ -173,9 +161,7 @@ def change_user_id_workaround(self, runas_uid, runas_gid): except json.JSONDecodeError as exc: raise RuntimeError(f"Unable to parse `podman info` output: {exc}") from exc except (KeyError, AttributeError) as exc: - raise RuntimeError( - f"Unexpected podman info output: entry not found: {exc}" - ) from exc + raise RuntimeError(f"Unexpected podman info output: entry not found: {exc}") from exc except Exception as exc: logging.error(f"change_user_id unexpected error: {exc}") raise RuntimeError(f"Unable to retrieve podman UID mapping: {exc}") from exc @@ -184,12 +170,8 @@ def change_user_id_workaround(self, runas_uid, runas_gid): if subuid_size >= runas_uid: self.add_option("--uidmap", f"0:1:{runas_uid}") self.add_option("--uidmap", f"{runas_uid}:0:1") - self.add_option( - "--uidmap", f"{runas_uid+1}:{runas_uid+1}:{subuid_size-runas_uid}" - ) - logging.debug( - "podman enabled UID mapping arguments (using uidmap workaround)" - ) + self.add_option("--uidmap", f"{runas_uid+1}:{runas_uid+1}:{subuid_size-runas_uid}") + logging.debug("podman enabled UID mapping arguments (using uidmap workaround)") else: raise RuntimeError( "subUIDs seem to be disabled/misconfigured for the current user. \ @@ -200,12 +182,8 @@ def change_user_id_workaround(self, runas_uid, runas_gid): if subgid_size >= runas_gid: self.add_option("--gidmap", f"0:1:{runas_gid}") self.add_option("--gidmap", f"{runas_gid}:0:1") - self.add_option( - "--gidmap", f"{runas_gid+1}:{runas_gid+1}:{subgid_size-runas_gid}" - ) - logging.debug( - "podman enabled GID mapping arguments (using uidmap workaround)" - ) + self.add_option("--gidmap", f"{runas_gid+1}:{runas_gid+1}:{subgid_size-runas_gid}") + logging.debug("podman enabled GID mapping arguments (using uidmap workaround)") else: raise RuntimeError( "subGIDs seem to be disabled/misconfigured for the current user. \ diff --git a/scanners/zap/zap.py b/scanners/zap/zap.py index 4f30ba74..6021645a 100644 --- a/scanners/zap/zap.py +++ b/scanners/zap/zap.py @@ -124,10 +124,7 @@ def data_for_defect_dojo(self): def get_update_command(self): """Returns a list of all options required to update ZAP plugins""" - if not ( - self.my_conf("miscOptions.updateAddons") - or self.my_conf("miscOptions.additionalAddons") - ): + if not (self.my_conf("miscOptions.updateAddons") or self.my_conf("miscOptions.additionalAddons")): return [] command = [ @@ -142,9 +139,7 @@ def get_update_command(self): if isinstance(addons, str): addons = addons.split(",") if len(addons) else [] if not isinstance(addons, list): - logging.warning( - "miscOptions.additionalAddons MUST be either a list or a string of comma-separated values" - ) + logging.warning("miscOptions.additionalAddons MUST be either a list or a string of comma-separated values") addons = [] for addon in addons: @@ -178,9 +173,7 @@ def _setup_zap_cli(self): self.zap_cli.extend(self._get_standard_options()) # Create a session, to store them as evidence - self.zap_cli.extend( - ["-newsession", f"{self.container_work_dir}/session_data/session"] - ) + self.zap_cli.extend(["-newsession", f"{self.container_work_dir}/session_data/session"]) if not self.my_conf("miscOptions.enableUI", default=False): # Disable UI @@ -208,9 +201,7 @@ def _get_standard_options(self): standard = [] # Proxy workaround (because it currently can't be configured from Automation Framework) - p_host, p_port = self.my_conf("proxy.proxyHost"), self.my_conf( - "proxy.proxyPort" - ) + p_host, p_port = self.my_conf("proxy.proxyHost"), self.my_conf("proxy.proxyPort") if p_host and p_port: standard.extend(["-config", f"network.connection.httpProxy.host={p_host}"]) standard.extend(["-config", f"network.connection.httpProxy.port={p_port}"]) @@ -222,9 +213,7 @@ def _get_standard_options(self): # Select a port that is unlikely to collide with anything else, but let the user able to # override it if need be local_port = self.my_conf("miscOptions.zapPort", 47691) - standard.extend( - ["-config", f"network.localServers.mainProxy.port={local_port}"] - ) + standard.extend(["-config", f"network.localServers.mainProxy.port={local_port}"]) # By default, ZAP allocates ΒΌ of the available RAM to the Java process. # This is not efficient when RapiDAST is executed in a dedicated environment. @@ -241,9 +230,7 @@ def _get_standard_options(self): # disabling these 2 rules only here since they might actually be useful else where # pylint: disable=unused-argument def _add_env(self, key, value=None): - logging.warning( - "_add_env() was called on the parent ZAP class. This is likely a bug. No operation done" - ) + logging.warning("_add_env() was called on the parent ZAP class. This is likely a bug. No operation done") def _include_file(self, host_path, dest_in_container=None): """Copies the file from host_path on the host to dest_in_container in the container @@ -324,9 +311,7 @@ def _setup_zap_automation(self): with open(af_template, "r", encoding="utf-8") as stream: self.automation_config = yaml.safe_load(stream) except yaml.YAMLError as exc: - raise RuntimeError( - f"Something went wrong while parsing the config '{af_template}':\n {str(exc)}" - ) from exc + raise RuntimeError(f"Something went wrong while parsing the config '{af_template}':\n {str(exc)}") from exc # Configure the basic environment target try: @@ -404,15 +389,11 @@ def _setup_api(self): # copy the file in the container's result directory # This allows the OpenAPI to be kept as evidence container_openapi_file = f"{self.container_work_dir}/openapi.json" - self._include_file( - host_path=api_file, dest_in_container=container_openapi_file - ) + self._include_file(host_path=api_file, dest_in_container=container_openapi_file) openapi["parameters"]["apiFile"] = container_openapi_file else: - raise ValueError( - "No apiUrl or apiFile is defined in the config, in apiScan.apis" - ) + raise ValueError("No apiUrl or apiFile is defined in the config, in apiScan.apis") # default target: main URL, or can be overridden in apiScan openapi["parameters"]["targetUrl"] = self._append_slash_to_url( @@ -633,22 +614,14 @@ def _setup_report(self): appended = 0 for format_id in formats: try: - logging.debug( - f"report {format_id}, filename: {reports[format_id].name}" - ) - self.automation_config["jobs"].append( - self._construct_report_af(reports[format_id]) - ) + logging.debug(f"report {format_id}, filename: {reports[format_id].name}") + self.automation_config["jobs"].append(self._construct_report_af(reports[format_id])) appended += 1 except KeyError as exc: - logging.warning( - f"Reports: {exc.args[0]} is not a valid format. Ignoring" - ) + logging.warning(f"Reports: {exc.args[0]} is not a valid format. Ignoring") if not appended: logging.warning("Creating a default report as no valid were found") - self.automation_config["jobs"].append( - self._construct_report_af(reports["json"]) - ) + self.automation_config["jobs"].append(self._construct_report_af(reports["json"])) def _setup_summary(self): """Adds a outputSummary job""" @@ -684,9 +657,7 @@ def _enforce_job_parameters(self, job): @generic_authentication_factory() def authentication_factory(self): """This is the default function, attached to error reporting""" - raise RuntimeError( - f"No valid authenticator found for ZAP. ZAP current config is: {self.config}" - ) + raise RuntimeError(f"No valid authenticator found for ZAP. ZAP current config is: {self.config}") @authentication_factory.register(None) def authentication_set_anonymous(self): @@ -841,9 +812,7 @@ def authentication_set_oauth2_rtoken(self): token = oauth2_get_token_from_rtoken(auth, proxy=self.my_conf("proxy")) if token: # Delete previous config, and creating a new one - logging.debug( - "successfully retrieved a token, hijacking authentication" - ) + logging.debug("successfully retrieved a token, hijacking authentication") self.set_my_conf("authentication.type", "http_header") self.set_my_conf(f"{params_path}", {}) self.set_my_conf(f"{params_path}.name", "Authorization") @@ -851,9 +820,7 @@ def authentication_set_oauth2_rtoken(self): # re-run authentication return self.authentication_factory() else: - logging.warning( - "Preauthentication failed, continuing with regular oauth2" - ) + logging.warning("Preauthentication failed, continuing with regular oauth2") # 1- complete the context: script, verification and user context_["authentication"] = { @@ -951,12 +918,8 @@ def _manual_oauth2_download(self, auth, proxy): url = self.my_conf(change.config_url) if url: if authenticated_download_with_rtoken(url, change.path, auth, proxy): - logging.info( - f"Successful download of scanner's {change.config_url}" - ) - self.config.set( - f"scanners.{self.ident}.{change.config_path}", change.path - ) + logging.info(f"Successful download of scanner's {change.config_url}") + self.config.set(f"scanners.{self.ident}.{change.config_path}", change.path) self.config.delete(f"scanners.{self.ident}.{change.config_url}") else: logging.warning("Failed to download scanner's {change.config_url}") diff --git a/scanners/zap/zap_none.py b/scanners/zap/zap_none.py index 552b246b..bb171103 100644 --- a/scanners/zap/zap_none.py +++ b/scanners/zap/zap_none.py @@ -129,22 +129,16 @@ def run(self): cli = ["sh", "-c", self._zap_cli_list_to_str_for_sh(self.zap_cli)] result = subprocess.run(cli, check=False) - logging.debug( - f"ZAP returned the following:\n=====\n{pp.pformat(result)}\n=====" - ) + logging.debug(f"ZAP returned the following:\n=====\n{pp.pformat(result)}\n=====") # Zap's return codes : https://www.zaproxy.org/docs/desktop/addons/automation-framework/ if result.returncode in [0, 2]: # 0: ZAP returned correctly. 2: ZAP returned warning - logging.info( - f"The ZAP process finished with no errors, and exited with code {result.returncode}" - ) + logging.info(f"The ZAP process finished with no errors, and exited with code {result.returncode}") self.state = State.DONE else: # 1: Zap hit an error - logging.warning( - f"The ZAP process did not finish correctly, and exited with code {result.returncode}" - ) + logging.warning(f"The ZAP process did not finish correctly, and exited with code {result.returncode}") self.state = State.ERROR def postprocess(self): @@ -192,9 +186,7 @@ def _setup_ajax_spider(self): "Make sure that /dev/shm/ is at least 1GB in size [ideally at least 2GB]" ) except FileNotFoundError: - logging.warning( - "/dev/shm not present. Unable to calcuate shared memory size" - ) + logging.warning("/dev/shm not present. Unable to calcuate shared memory size") # Firefox tends to use _a lot_ of threads # Assume we're regulated by cgroup v2 @@ -202,13 +194,9 @@ def _setup_ajax_spider(self): with open("/sys/fs/cgroup/pids.max", encoding="utf-8") as f: pid_val = f.readline().rstrip() if pid_val == "max" or int(pid_val) > 10000: - logging.debug( - f"cgroup v2 has a sufficient pid limit: {pid_val}" - ) + logging.debug(f"cgroup v2 has a sufficient pid limit: {pid_val}") else: - logging.warning( - f"Number of threads may be too low for SpiderAjax: cgroupv2 pids.max={pid_val}" - ) + logging.warning(f"Number of threads may be too low for SpiderAjax: cgroupv2 pids.max={pid_val}") except FileNotFoundError: # open /sys/fs/cgroup/pids.max failed: root cgroup (unlimited pids) or no cgroup v2 at all. # assume the former @@ -298,10 +286,7 @@ def _check_plugin_status(self): result = subprocess.run(command, check=False, capture_output=True) if result.returncode == 0: logging.debug("ZAP appears to be in a correct state") - elif ( - result.stderr.find(bytes("The mandatory add-on was not found:", "ascii")) - > 0 - ): + elif result.stderr.find(bytes("The mandatory add-on was not found:", "ascii")) > 0: logging.info("Missing mandatory plugins. Fixing") url_root = "https://github.com/zaproxy/zap-extensions/releases/download" anonymous_download( @@ -326,9 +311,7 @@ def _check_plugin_status(self): result = subprocess.run(command, check=False) else: - logging.warning( - f"ZAP appears to be in a incorrect state. Error: {result.stderr}" - ) + logging.warning(f"ZAP appears to be in a incorrect state. Error: {result.stderr}") def _create_home_if_needed(self): """Some tools (most notably: ZAP's Ajax Spider with Firefox) require a writable home directory. diff --git a/scanners/zap/zap_podman.py b/scanners/zap/zap_podman.py index f7c08e79..131a97fe 100644 --- a/scanners/zap/zap_podman.py +++ b/scanners/zap/zap_podman.py @@ -109,30 +109,22 @@ def run(self): # DO STUFF logging.info(f"Running ZAP with the following command:\n{cli}") result = subprocess.run(cli, check=False) - logging.debug( - f"ZAP returned the following:\n=====\n{pp.pformat(result)}\n=====" - ) + logging.debug(f"ZAP returned the following:\n=====\n{pp.pformat(result)}\n=====") # Zap's return codes : https://www.zaproxy.org/docs/desktop/addons/automation-framework/ if result.returncode in [0, 2]: # 0: ZAP returned correctly. 2: ZAP returned warning - logging.info( - f"The ZAP process finished with no errors, and exited with code {result.returncode}" - ) + logging.info(f"The ZAP process finished with no errors, and exited with code {result.returncode}") self.state = State.DONE else: # 1: Zap hit an error, >125 : podman returned an error - logging.warning( - f"The ZAP process did not finish correctly, and exited with code {result.returncode}" - ) + logging.warning(f"The ZAP process did not finish correctly, and exited with code {result.returncode}") self.state = State.ERROR def postprocess(self): logging.info("Running postprocess for the ZAP Podman environment") if not self.state == State.DONE: - raise RuntimeError( - "No post-processing as ZAP has not successfully run yet." - ) + raise RuntimeError("No post-processing as ZAP has not successfully run yet.") super().postprocess() diff --git a/tests/configmodel/test_convert.py b/tests/configmodel/test_convert.py index bf8ff206..54b996ae 100644 --- a/tests/configmodel/test_convert.py +++ b/tests/configmodel/test_convert.py @@ -38,9 +38,7 @@ def test_v2_to_v3(config_v2): newconf = configmodel.converter.convert_from_version_2_to_3(oldconf) # Check that new path was created - assert newconf.get("scanners.zap.miscOptions.updateAddons", "x") == oldconf.get( - "scanners.zap.updateAddons", "y" - ) + assert newconf.get("scanners.zap.miscOptions.updateAddons", "x") == oldconf.get("scanners.zap.updateAddons", "y") # Check that old path was deleted assert not newconf.exists("scanners.zap.updateAddons") @@ -60,9 +58,9 @@ def test_v4_to_v5(config_v4): newconf = configmodel.converter.convert_from_version_4_to_5(oldconf) # Check that new path was created - assert newconf.get( - "scanners.zap.miscOptions.oauth2ManualDownload", "x" - ) == oldconf.get("scanners.zap.miscOptions.oauth2OpenapiManualDownload", "y") + assert newconf.get("scanners.zap.miscOptions.oauth2ManualDownload", "x") == oldconf.get( + "scanners.zap.miscOptions.oauth2OpenapiManualDownload", "y" + ) # Check that old path was deleted assert not newconf.exists("scanners.zap.miscOptions.oauth2OpenapiManualDownload") @@ -79,12 +77,8 @@ def test_v1_to_v2(config_v1): def test_v0_to_v1(config_v0): conf_v1 = configmodel.converter.convert_from_version_0_to_1(config_v0) - assert conf_v1.get("application.shortName", "x") == config_v0.get( - "general.serviceName", "y" - ) - assert conf_v1.get("scanners.zap.activeScan.policy", "x") == config_v0.get( - "scan.policies.scanPolicyName", "y" - ) + assert conf_v1.get("application.shortName", "x") == config_v0.get("general.serviceName", "y") + assert conf_v1.get("scanners.zap.activeScan.policy", "x") == config_v0.get("scan.policies.scanPolicyName", "y") def test_basic_config_updater(): @@ -97,10 +91,7 @@ def test_basic_config_updater(): oldest = configmodel.RapidastConfigModel({}) last = configmodel.converter.update_to_latest_config(oldest) - assert ( - int(last.get("config.configVersion")) - == configmodel.converter.CURR_CONFIG_VERSION - ) + assert int(last.get("config.configVersion")) == configmodel.converter.CURR_CONFIG_VERSION if __name__ == "__main__": diff --git a/tests/exports/test_google_cloud_storage.py b/tests/exports/test_google_cloud_storage.py index f438cf89..beabb5ea 100644 --- a/tests/exports/test_google_cloud_storage.py +++ b/tests/exports/test_google_cloud_storage.py @@ -1,11 +1,12 @@ -import pytest - -from unittest.mock import Mock, MagicMock, patch, mock_open - import datetime +from unittest.mock import MagicMock +from unittest.mock import Mock +from unittest.mock import mock_open +from unittest.mock import patch -from exports.google_cloud_storage import GoogleCloudStorage +import pytest +from exports.google_cloud_storage import GoogleCloudStorage @patch("exports.google_cloud_storage.storage.Client.from_service_account_json") @@ -21,6 +22,7 @@ def test_GCS_simple_init_keyfile(mock_from_json): mock_from_json.assert_called_once_with("/key/file.json") mock_client.get_bucket.assert_called_once_with("bucket_name") + @patch("exports.google_cloud_storage.storage.Client") def test_GCS_simple_init_no_keyfile(mock_client): gcs = GoogleCloudStorage("bucket_name", "app_name", "directory_name") @@ -33,7 +35,6 @@ def test_GCS_simple_init_no_keyfile(mock_client): @patch("exports.google_cloud_storage.storage.Client") @patch("exports.google_cloud_storage.uuid") def test_GCS_create_metadata(mock_uuid, mock_client): - mock_uuid.uuid1.return_value = 123 gcs = GoogleCloudStorage("bucket_name", "app_name", "directory_name") @@ -62,7 +63,7 @@ def test_GCS_export_scan(MockRandom, MockDateTime, MockClient): # Forcing the date mock_now = MagicMock() - mock_now.isoformat.return_value = '2024-01-31T00:00:00' + mock_now.isoformat.return_value = "2024-01-31T00:00:00" MockDateTime.now.return_value = mock_now # catching the Client @@ -83,10 +84,7 @@ def test_GCS_export_scan(MockRandom, MockDateTime, MockClient): gcs = GoogleCloudStorage("bucket_name", "app_name", "directory_name") - import_data = { - "scan_type": "ABC", - "foo": "bar" - } + import_data = {"scan_type": "ABC", "foo": "bar"} # hack: use the pytest file itself as a scan gcs.export_scan(import_data, __file__) diff --git a/tests/scanners/generic/test_generic.py b/tests/scanners/generic/test_generic.py index 242428ce..118896c0 100644 --- a/tests/scanners/generic/test_generic.py +++ b/tests/scanners/generic/test_generic.py @@ -19,9 +19,7 @@ def test_generic_podman_cli(test_config): scanner = GenericPodman(config=test_config) scanner.setup() - assert {"podman", "run", "--name", "myimage", "--pod", "myPod"}.issubset( - set(scanner.podman.get_complete_cli()) - ) + assert {"podman", "run", "--name", "myimage", "--pod", "myPod"}.issubset(set(scanner.podman.get_complete_cli())) def test_generic_podman_volume(test_config): diff --git a/tests/scanners/generic/tools/test_convert_trivy_k8s.py b/tests/scanners/generic/tools/test_convert_trivy_k8s.py index eff0623c..dc671aa8 100644 --- a/tests/scanners/generic/tools/test_convert_trivy_k8s.py +++ b/tests/scanners/generic/tools/test_convert_trivy_k8s.py @@ -3,7 +3,8 @@ import pytest -from scanners.generic.tools.convert_trivy_k8s_to_sarif import convert_json_to_sarif, read_json_block +from scanners.generic.tools.convert_trivy_k8s_to_sarif import convert_json_to_sarif +from scanners.generic.tools.convert_trivy_k8s_to_sarif import read_json_block TEST_DATA_DIR = "tests/scanners/generic/tools/test_data_convert_trivy_k8s/" @@ -19,6 +20,7 @@ def _assert_default_sarif_info(sarif): return True + def test_read_json_block(): json_file = TEST_DATA_DIR + "sample-single-result.json" json_assert = json.load(open(json_file)) diff --git a/tests/scanners/generic/tools/test_oobtkube.py b/tests/scanners/generic/tools/test_oobtkube.py index f71329ae..06c76b19 100644 --- a/tests/scanners/generic/tools/test_oobtkube.py +++ b/tests/scanners/generic/tools/test_oobtkube.py @@ -35,22 +35,15 @@ def test_find_leaf_keys_and_test(mock_system, test_data, caplog): total_leaf_keys = oobtkube.count_total_leaf_keys(test_data) - oobtkube.find_leaf_keys_and_test( - test_data, "cr_test_file", "10.10.10.10", "12345", total_leaf_keys - ) + oobtkube.find_leaf_keys_and_test(test_data, "cr_test_file", "10.10.10.10", "12345", total_leaf_keys) processed_count = 0 leaves = ["leaf1", "leaf2", "leaf3"] for leaf_key in leaves: processed_count += 1 - assert ( - f"Testing a leaf key: '{leaf_key}', ({processed_count} / {total_leaf_keys})" - in caplog.text - ) - - assert ( - mock_system.call_count == 6 - ) # Each leaf key runs `sed` and `kubectl` commands (2 calls per key) + assert f"Testing a leaf key: '{leaf_key}', ({processed_count} / {total_leaf_keys})" in caplog.text + + assert mock_system.call_count == 6 # Each leaf key runs `sed` and `kubectl` commands (2 calls per key) def test_parse_resource_yaml(): diff --git a/tests/scanners/test_downloaders.py b/tests/scanners/test_downloaders.py index 36ac95de..02bca5c4 100644 --- a/tests/scanners/test_downloaders.py +++ b/tests/scanners/test_downloaders.py @@ -1,12 +1,12 @@ -from unittest.mock import Mock +from collections import namedtuple from unittest.mock import MagicMock +from unittest.mock import Mock from unittest.mock import patch import pytest from scanners import downloaders -from collections import namedtuple @pytest.fixture(scope="function") def my_auth(): @@ -16,6 +16,7 @@ def my_auth(): "rtoken": "aut_rtoken", } + @pytest.fixture(scope="function") def my_proxy(): proxy = { @@ -23,13 +24,13 @@ def my_proxy(): "proxyPort": "proxyPort", } + @patch("scanners.downloaders.requests.get") def test_anonymous_download(mock_get, my_proxy): def request_get(url, allow_redirects=True, proxies=None): Response = namedtuple("Response", ["status_code", "content"]) return Response(status_code=200, content="content") - mock_get.side_effect = request_get ret = downloaders.anonymous_download("url", dest=None, proxy=my_proxy) @@ -37,8 +38,6 @@ def request_get(url, allow_redirects=True, proxies=None): assert ret == "content" - - @patch("scanners.downloaders.requests.Session") def test_oauth2_get_token_from_rtoken(mock_session, my_auth, my_proxy): def fake_Session(): @@ -55,6 +54,7 @@ def fake_post(url, **kwargs): assert rtoken == 123 + @patch("scanners.downloaders.requests.Session") @patch("scanners.downloaders.oauth2_get_token_from_rtoken") @patch("builtins.open") @@ -63,6 +63,7 @@ def fake_Session(): def fake_post(url, **kwargs): Post = namedtuple("Post", ["raise_for_status", "text"]) return Post(raise_for_status=lambda: None, text=b"{'access_token':123}") + def fake_get(url, **kwargs): Get = namedtuple("Get", ["status_code", "text"]) return Get(status_code=200, text="text") @@ -76,4 +77,3 @@ def fake_get(url, **kwargs): res = downloaders.authenticated_download_with_rtoken("url", "Nowhere", auth=my_auth, proxy=my_proxy) assert res == True - diff --git a/tests/scanners/test_path_translators.py b/tests/scanners/test_path_translators.py index 9f47418e..9b78a813 100644 --- a/tests/scanners/test_path_translators.py +++ b/tests/scanners/test_path_translators.py @@ -7,10 +7,5 @@ def test_path_translation(): id3 = ("id3", "/z/x/c/v", "/b/n/m") path_map = make_mapping_for_scanner("Test", id1, id2, id3) - assert ( - path_map.host_2_container("/a/s/d/f/g/subdir/myfile") - == "/h/j/k/l/subdir/myfile" - ) - assert ( - path_map.container_2_host("/b//n/m/subdir/myfile") == "/z/x/c/v/subdir/myfile" - ) + assert path_map.host_2_container("/a/s/d/f/g/subdir/myfile") == "/h/j/k/l/subdir/myfile" + assert path_map.container_2_host("/b//n/m/subdir/myfile") == "/z/x/c/v/subdir/myfile" diff --git a/tests/scanners/test_podman_wrapper.py b/tests/scanners/test_podman_wrapper.py index d6661cbc..7623a884 100644 --- a/tests/scanners/test_podman_wrapper.py +++ b/tests/scanners/test_podman_wrapper.py @@ -1,9 +1,9 @@ import shutil -import pytest import subprocess - from unittest.mock import patch +import pytest + from scanners.podman_wrapper import PodmanWrapper @@ -12,7 +12,7 @@ def test_change_user_id(mock_subprocess): wrap = PodmanWrapper(app_name="pytest", scan_name="pytest", image="nothing") version = '{"Client":{"APIVersion":"5.2.2","Version":"5.2.2","GoVersion":"go1.22.6","GitCommit":"","BuiltTime":"Wed Aug 21 02:00:00 2024","Built":1724198400,"OsArch":"linux/amd64","Os":"linux"}}' - run = subprocess.CompletedProcess(args=None, returncode=0, stdout=version.encode('utf-8')) + run = subprocess.CompletedProcess(args=None, returncode=0, stdout=version.encode("utf-8")) mock_subprocess.return_value = run @@ -21,6 +21,7 @@ def test_change_user_id(mock_subprocess): i = wrap.opts.index("--userns") assert wrap.opts[i + 1] == "keep-id:uid=1000,gid=1000" + @patch("scanners.podman_wrapper.subprocess.run") def test_change_user_id_workaround(mock_subprocess): wrap = PodmanWrapper(app_name="pytest", scan_name="pytest", image="nothing") @@ -58,8 +59,7 @@ def test_change_user_id_workaround(mock_subprocess): } """ - - run = subprocess.CompletedProcess(args=None, returncode=0, stdout=info.encode('utf-8')) + run = subprocess.CompletedProcess(args=None, returncode=0, stdout=info.encode("utf-8")) mock_subprocess.return_value = run diff --git a/tests/scanners/zap/test_setup.py b/tests/scanners/zap/test_setup.py index 8b3749a0..b9ddde13 100644 --- a/tests/scanners/zap/test_setup.py +++ b/tests/scanners/zap/test_setup.py @@ -16,9 +16,7 @@ @pytest.fixture(scope="function") def test_config(): - return configmodel.RapidastConfigModel( - {"application": {"url": "http://example.com"}} - ) + return configmodel.RapidastConfigModel({"application": {"url": "http://example.com"}}) ## Basic test @@ -29,10 +27,7 @@ def test_setup_openapi(test_config): test_zap.setup() # a '/' should have been appended - assert ( - test_zap.automation_config["env"]["contexts"][0]["urls"][0] - == "http://example.com/" - ) + assert test_zap.automation_config["env"]["contexts"][0]["urls"][0] == "http://example.com/" for item in test_zap.automation_config["jobs"]: if item["type"] == "openapi": @@ -85,9 +80,7 @@ def test_setup_authentication_invalid_auth_configured(test_config): test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -105,9 +98,7 @@ def test_setup_authentication_http_header(test_config): } test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -125,9 +116,7 @@ def test_setup_authentication_cookie(test_config): } test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -144,9 +133,7 @@ def test_setup_authentication_http_basic(test_config): } test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -168,9 +155,7 @@ def test_setup_authentication_auth_rtoken_configured(test_config): test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -179,10 +164,7 @@ def test_setup_authentication_auth_rtoken_configured(test_config): test_zap.setup() assert test_zap.authenticated == True # TODO: check "RTOKEN" - assert ( - test_zap.automation_config["jobs"][0]["parameters"]["name"] - == "add-bearer-token" - ) + assert test_zap.automation_config["jobs"][0]["parameters"]["name"] == "add-bearer-token" def test_setup_authentication_auth_rtoken_preauth(test_config): @@ -199,9 +181,7 @@ def test_setup_authentication_auth_rtoken_preauth(test_config): test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") test_zap = ZapNone(config=test_config) @@ -224,9 +204,7 @@ def test_setup_import_urls(test_config): def test_setup_exclude_urls(test_config): test_config.set("scanners.zap.urls.excludes", ["abc", "def"]) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") test_zap = ZapNone(config=test_config) test_zap.setup() @@ -237,9 +215,7 @@ def test_setup_exclude_urls(test_config): def test_setup_include_urls(test_config): test_config.set("scanners.zap.urls.includes", ["abc", "def"]) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") test_zap = ZapNone(config=test_config) test_zap.setup() @@ -302,10 +278,7 @@ def test_setup_graphql(test_config): if item["type"] == "graphql": assert item["parameters"]["endpoint"] == TEST_GRAPHQL_ENDPOINT assert item["parameters"]["schemaUrl"] == TEST_GRAPHQL_SCHEMA_URL - assert ( - item["parameters"]["schemaFile"] - == f"{test_zap.container_work_dir}/schema.graphql" - ) + assert item["parameters"]["schemaFile"] == f"{test_zap.container_work_dir}/schema.graphql" break else: assert False, "graphql job not found" @@ -405,18 +378,14 @@ def test_setup_override_cfg(test_config): override_cfg1 = "formhandler.fields.field(0).fieldId=namespace" override_cfg2 = "formhandler.fields.field(0).value=default" - test_config.set( - "scanners.zap.miscOptions.overrideConfigs", [override_cfg1, override_cfg2] - ) + test_config.set("scanners.zap.miscOptions.overrideConfigs", [override_cfg1, override_cfg2]) test_zap = ZapNone(config=test_config) test_zap.setup() assert f"{override_cfg1}" in test_zap.zap_cli assert f"{override_cfg2}" in test_zap.zap_cli - assert r"formhandler.fields.field\(0\)" in test_zap._zap_cli_list_to_str_for_sh( - test_zap.zap_cli - ) + assert r"formhandler.fields.field\(0\)" in test_zap._zap_cli_list_to_str_for_sh(test_zap.zap_cli) def test_setup_override_non_list_format(test_config): diff --git a/tests/scanners/zap/test_setup_none.py b/tests/scanners/zap/test_setup_none.py index b6069e86..bdb953a8 100644 --- a/tests/scanners/zap/test_setup_none.py +++ b/tests/scanners/zap/test_setup_none.py @@ -12,9 +12,7 @@ @pytest.fixture(scope="function") def test_config(): - return configmodel.RapidastConfigModel( - {"application": {"url": "http://example.com"}} - ) + return configmodel.RapidastConfigModel({"application": {"url": "http://example.com"}}) @patch("scanners.zap.zap_none.platform.system") @@ -37,9 +35,7 @@ def test_none_handling_ajax(mock_warning, mock_disk_usage, mock_system, test_con test_zap._setup_ajax_spider() mock_pidsmax.assert_called_once_with("/sys/fs/cgroup/pids.max", encoding="utf-8") - mock_warning.assert_any_call( - "Number of threads may be too low for SpiderAjax: cgroupv2 pids.max=42" - ) + mock_warning.assert_any_call("Number of threads may be too low for SpiderAjax: cgroupv2 pids.max=42") mock_warning.assert_any_call( "Insufficient shared memory to run an Ajax Spider correctly (67108864 bytes). " "Make sure that /dev/shm/ is at least 1GB in size [ideally at least 2GB]" @@ -56,11 +52,7 @@ def test_zap_none_postprocess(mock_tarfile, mock_copytree, mock_warning, test_co with patch("builtins.open", mock_open(read_data="max 2\n")) as mock_pidsevents: test_zap.postprocess() - mock_pidsevents.assert_called_once_with( - "/sys/fs/cgroup/pids.events", encoding="utf-8" - ) - mock_warning.assert_any_call( - "Scanner may have been throttled by CGroupv2 PID limits: pids.events reports max 2" - ) + mock_pidsevents.assert_called_once_with("/sys/fs/cgroup/pids.events", encoding="utf-8") + mock_warning.assert_any_call("Scanner may have been throttled by CGroupv2 PID limits: pids.events reports max 2") assert test_zap.state == State.PROCESSED diff --git a/tests/scanners/zap/test_setup_podman.py b/tests/scanners/zap/test_setup_podman.py index aebc24fe..550458ac 100644 --- a/tests/scanners/zap/test_setup_podman.py +++ b/tests/scanners/zap/test_setup_podman.py @@ -15,9 +15,7 @@ @pytest.fixture(scope="function") def test_config(): - return configmodel.RapidastConfigModel( - {"application": {"url": "http://example.com"}} - ) + return configmodel.RapidastConfigModel({"application": {"url": "http://example.com"}}) ## Testing Authentication methods ## @@ -29,9 +27,7 @@ def test_setup_podman_authentication_invalid_auth_configured(test_config): test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -49,9 +45,7 @@ def test_setup_podman_authentication_http_header(test_config): } test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -69,19 +63,14 @@ def test_setup_podman_authentication_cookie(test_config): } test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) test_zap = ZapPodman(config=test_config) test_zap.setup() assert test_zap.authenticated == False - assert ( - "ZAP_AUTH_HEADER_VALUE=mycookiename=mycookieval" - in test_zap.podman.get_complete_cli() - ) + assert "ZAP_AUTH_HEADER_VALUE=mycookiename=mycookieval" in test_zap.podman.get_complete_cli() def test_setup_podman_authentication_http_basic(test_config): @@ -91,19 +80,14 @@ def test_setup_podman_authentication_http_basic(test_config): } test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) test_zap = ZapPodman(config=test_config) test_zap.setup() assert test_zap.authenticated == False - assert ( - "ZAP_AUTH_HEADER_VALUE=Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==" - in test_zap.podman.get_complete_cli() - ) + assert "ZAP_AUTH_HEADER_VALUE=Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==" in test_zap.podman.get_complete_cli() def test_setup_podman_authentication_auth_rtoken_configured(test_config): @@ -123,9 +107,7 @@ def test_setup_podman_authentication_auth_rtoken_configured(test_config): test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -134,10 +116,7 @@ def test_setup_podman_authentication_auth_rtoken_configured(test_config): test_zap.setup() assert test_zap.authenticated == True assert "RTOKEN" in test_zap.podman.get_complete_cli() - assert ( - test_zap.automation_config["jobs"][0]["parameters"]["name"] - == "add-bearer-token" - ) + assert test_zap.automation_config["jobs"][0]["parameters"]["name"] == "add-bearer-token" def test_setup_podman_authentication_auth_rtoken_preauth(test_config): @@ -154,9 +133,7 @@ def test_setup_podman_authentication_auth_rtoken_preauth(test_config): test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") test_zap = ZapPodman(config=test_config) diff --git a/tests/test_defectdojo_integration.py b/tests/test_defectdojo_integration.py index c6b9e025..876ab868 100644 --- a/tests/test_defectdojo_integration.py +++ b/tests/test_defectdojo_integration.py @@ -36,8 +36,6 @@ def test_dd_parameters(): with pytest.raises(KeyError): defect_d.params["verify"] - defect_d = DefectDojo( - "https://127.0.0.1:12345", token="random_token", ssl="CAbundle" - ) + defect_d = DefectDojo("https://127.0.0.1:12345", token="random_token", ssl="CAbundle") assert defect_d.params["timeout"] == DefectDojo.DD_CONNECT_TIMEOUT assert defect_d.params["verify"] == "CAbundle" diff --git a/tools/updater_config.py b/tools/updater_config.py index 61206bda..4472c258 100755 --- a/tools/updater_config.py +++ b/tools/updater_config.py @@ -39,14 +39,10 @@ args.loglevel = args.loglevel.upper() add_logging_level("VERBOSE", logging.DEBUG + 5) logging.basicConfig(format="%(levelname)s:%(message)s", level=args.loglevel) - logging.debug( - f"log level set to debug. Config file: '{parser.parse_args().config_file.name}'" - ) + logging.debug(f"log level set to debug. Config file: '{parser.parse_args().config_file.name}'") try: - config = configmodel.RapidastConfigModel( - yaml.safe_load(parser.parse_args().config_file) - ) + config = configmodel.RapidastConfigModel(yaml.safe_load(parser.parse_args().config_file)) except yaml.YAMLError as exc: raise RuntimeError( f"Something went wrong while parsing one of the config '{parser.parse_args().config_file}':\n {str(exc)}" diff --git a/utils/remove_openapi_ref_recursion.py b/utils/remove_openapi_ref_recursion.py index 9520990c..fdff3148 100644 --- a/utils/remove_openapi_ref_recursion.py +++ b/utils/remove_openapi_ref_recursion.py @@ -57,9 +57,7 @@ def main(input_file, output_file, debug): if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Remove recursive $ref from OpenAPI JSON file." - ) + parser = argparse.ArgumentParser(description="Remove recursive $ref from OpenAPI JSON file.") parser.add_argument("-f", "--file", required=True, help="Input OpenAPI JSON file") parser.add_argument( "-o", @@ -67,9 +65,7 @@ def main(input_file, output_file, debug): default="cleaned_openapi.json", help="Output file for cleaned OpenAPI JSON (default: cleaned_openapi.json)", ) - parser.add_argument( - "-d", "--debug", action="store_true", help="Enable debug messages" - ) + parser.add_argument("-d", "--debug", action="store_true", help="Enable debug messages") args = parser.parse_args()