diff --git a/.github/workflows/build-and-push.yml b/.github/workflows/build-and-push.yml
new file mode 100644
index 00000000..f1974b03
--- /dev/null
+++ b/.github/workflows/build-and-push.yml
@@ -0,0 +1,41 @@
+name: Build and push container image
+
+env:
+  IMAGE_NAME: "rapidast"
+  IMAGE_TAGS: "${{ github.sha }}"
+  IMAGE_REGISTRY: quay.io/redhatproductsecurity
+  IMAGE_REGISTRY_USER: ${{ secrets.IMAGE_REGISTRY_USER }}
+  IMAGE_REGISTRY_PASSWORD: ${{ secrets.IMAGE_REGISTRY_PASSWORD }}
+
+on:
+  push:
+    branches: ["development", "main"]
+
+jobs:
+
+  build-and-push:
+
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+      # https://github.com/redhat-actions/buildah-build#readme
+      - name: Build container image
+        id: build-image
+        uses: redhat-actions/buildah-build@v2
+        with:
+          image: ${{ env.IMAGE_NAME }}
+          tags: ${{ env.IMAGE_TAGS }}
+          dockerfiles: |
+            ./containerize/Containerfile
+
+      # https://github.com/redhat-actions/push-to-registry#readme
+      - name: Push to registry
+        id: push-image
+        uses: redhat-actions/push-to-registry@v2
+        with:
+          image: ${{ steps.build-image.outputs.image }}
+          tags: ${{ steps.build-image.outputs.tags }}
+          registry: ${{ env.IMAGE_REGISTRY }}
+          username: ${{ env.IMAGE_REGISTRY_USER }}
+          password: ${{ env.IMAGE_REGISTRY_PASSWORD }}
diff --git a/.github/workflows/build-image.yml b/.github/workflows/build-image.yml
new file mode 100644
index 00000000..e1ce7f9e
--- /dev/null
+++ b/.github/workflows/build-image.yml
@@ -0,0 +1,26 @@
+name: Build container image
+
+env:
+  IMAGE_NAME: "rapidast"
+  IMAGE_TAGS: "${{ github.sha }}"
+
+on:
+  pull_request:
+    branches: ["development", "main"]
+
+jobs:
+
+  build-image:
+
+    runs-on: ubuntu-latest
+
+    # https://github.com/redhat-actions/buildah-build#readme
+    steps:
+      - uses: actions/checkout@v4
+      - name: Build container image
+        uses: redhat-actions/buildah-build@v2
+        with:
+          image: ${{ env.IMAGE_NAME }}
+          tags: ${{ env.IMAGE_TAGS }}
+          dockerfiles: |
+            ./containerize/Containerfile
diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml
new file mode 100644
index 00000000..96005c4c
--- /dev/null
+++ b/.github/workflows/run-tests.yml
@@ -0,0 +1,32 @@
+name: Run tests
+
+on:
+  push:
+    branches: ["development", "main"]
+  pull_request:
+    branches: ["development", "main"]
+
+permissions:
+  contents: read
+
+jobs:
+  test:
+
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+      - name: Set up Python 3.9
+        uses: actions/setup-python@v3
+        with:
+          python-version: "3.9"
+      - name: Install dependencies
+        run: |
+          python3 -m ensurepip --upgrade
+          pip install --no-cache-dir -r requirements.txt -r requirements-dev.txt
+      - name: Test with pytest
+        run: |
+          pytest
+      - name: Lint with pre-commit hook
+        run: |
+          pre-commit run --all-files --show-diff-on-failure
diff --git a/.github/workflows/tag-image.yml b/.github/workflows/tag-image.yml
new file mode 100644
index 00000000..ea6b1b1a
--- /dev/null
+++ b/.github/workflows/tag-image.yml
@@ -0,0 +1,33 @@
+name: Tag image on quay.io
+
+env:
+  IMAGE_NAME: "rapidast"
+  IMAGE_REGISTRY: quay.io/redhatproductsecurity
+  IMAGE_REGISTRY_USER: ${{ secrets.IMAGE_REGISTRY_USER }}
+  IMAGE_REGISTRY_PASSWORD: ${{ secrets.IMAGE_REGISTRY_PASSWORD }}
+
+on:
+  push:
+    tags: ["*"]
+
+jobs:
+
+  tag-image:
+
+    runs-on: ubuntu-latest
+
+    steps:
+      # https://github.com/redhat-actions/podman-login
+      - name: Log in to quay.io
+        uses: redhat-actions/podman-login@v1
+        with:
+          registry: ${{ env.IMAGE_REGISTRY }}
+          username: ${{ env.IMAGE_REGISTRY_USER }}
+          password: ${{ env.IMAGE_REGISTRY_PASSWORD }}
+
+      - name: Tag image
+        run: |
+          # tag existing image on quay.io that has :<sha> tag with :<gh tag>
+          SRC=${{ env.IMAGE_REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
+          DST=${{ env.IMAGE_REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.ref_name }}
+          skopeo copy docker://${SRC} docker://${DST}
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index f6bb264b..88488353 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -20,6 +20,8 @@ repos:
         args:
           - --safe
           - --quiet
+          - --line-length
+          - "120"  # same as pylint below
         language_version: python3
         require_serial: true
diff --git a/README.md b/README.md
index 5b704bac..d362a68b 100644
--- a/README.md
+++ b/README.md
@@ -619,6 +619,23 @@ Solutions:
 * Selenium, used to control Firefox, uses shared memory (`/dev/shm/`). When using the RapiDAST image or the ZAP image, the user needs to make sure that sufficient space is available in `/dev/shm/` (in podman, by default, its size is 64MB). A size of 2G is the recommended value by the Selenium community. In podman for example, the option would be `--shm-size=2g`.
 * Zap and Firefox can create a huge numbers of threads. Some container engines will default to 2048 concurrent pids, which is not sufficient for the Ajax Spider. Whenever possible, RapiDAST will check if that limit was reached, after the scan is finished, and prints a warning if this happened. In podman, increasing the maximum number of concurrent pids is done via the `--pids-limit=-1` option to prevent any limits.
 
+## Podman errors
+
+### subuid/subgid are not enabled
+
+If you see one of these errors:
+
+```
+Error: copying system image from manifest list: writing blob: adding layer with blob "sha256:82aabceedc2fbf89030cbb4ff98215b70d9ae35c780ade6c784d9b447b1109ed": processing tar file(potentially insufficient UIDs or GIDs available in user namespace (requested 0:42 for /etc/gshadow): Check /etc/subuid and /etc/subgid if configured locally and run "podman system migrate": lchown /etc/gshadow: invalid argument): exit status 1
+```
+ -or-
+```
+Error: parsing id map value "-1000": strconv.ParseUint: parsing "-1000": invalid syntax
+```
+
+Podman, in rootless mode (running as a regular user), needs subuid/subgid to be enabled: [rootless mode](https://docs.podman.io/en/latest/markdown/podman.1.html#rootless-mode)
+
+
 ## Caveats
 * Currently, RapiDAST does not clean up the temporary data when there is an error.
 The data may include:
diff --git a/config/config-template-trivy-k8s-scan.yaml b/config/config-template-trivy-k8s-scan.yaml
index 6a3627d2..b9aacaf2 100644
--- a/config/config-template-trivy-k8s-scan.yaml
+++ b/config/config-template-trivy-k8s-scan.yaml
@@ -40,7 +40,7 @@ scanners:
     # 'inline' is used when container.type is not 'podman'
     # 'toolDir' specifies the default directory where inline scripts are located
     #toolDir: scanners/generic/tools
-    inline: "trivy k8s --kubeconfig=/home/rapidast/.kube/config -n default pod --scanners=misconfig --report all --format json -o /tmp/k8s_result.json && python3 convert_trivy_k8s_to_sarif.py -f /tmp/k8s_result.json"
+    inline: "trivy k8s --kubeconfig=/home/rapidast/.kube/config -n default pod --scanners=misconfig --report all --format json | convert_trivy_k8s_to_sarif.py"
 
     container:
       parameters:
diff --git a/config/config-template-zap-long.yaml b/config/config-template-zap-long.yaml
index 4663a368..79051f56 100644
--- a/config/config-template-zap-long.yaml
+++ b/config/config-template-zap-long.yaml
@@ -15,6 +15,10 @@ config:
   # all the results of all scanners will be stored under that location
   base_results_dir: "./results"
 
+  # In RapiDAST only: should RapiDAST verify certificates
+  # possible values: true [default], false, /path/to/a/PEM/file
+  tls_verify_for_rapidast_downloads: true
+
   # Import a particular environment, and inject it for each scanner
   environ:
     envFile: "path/to/env/file"
@@ -66,7 +70,7 @@ general:
 
   #  # "browser" authentication will use firefox in the background to generate cookies
   #  #  - verifyUrl must return an error if the user is not logged in
-  #type: "browser" 
+  #type: "browser"
   #parameters:
   #  username: "user"
   #  password: "mypassw0rd"
@@ -135,6 +139,9 @@ scanners:
       url: ""  # url to start spidering from, default: application.url set above
 
     spiderAjax:
+      # The list of parameters: https://www.zaproxy.org/docs/desktop/addons/ajax-spider/automation/
+      #maxCrawlStates: 10  # this may be useful when running in a memory limited environment (default: 0 unlimited)
+      #maxCrawlDepth: 10  # default: unlimited
       maxDuration: 0  # in minutes, default: 0 unlimited
       url: ""  # url to start spidering from, default: application.url set above
       browserId: firefox-headless
@@ -145,6 +152,10 @@ scanners:
       disabledRules: "2,10015,10024,10027,10054,10096,10109,10112"
 
     activeScan:
+      # The list of parameters: see the ZAP Automation Framework activeScan job documentation
+      #maxRuleDurationInMins: max scan time for each Rule (default: unlimited)
+      #maxScanDurationInMins: max scan time for the entire scan. Useful for debugging automation
+      #
       # If no policy is chosen, a default ("API-scan-minimal") will be selected
       # The list of policies can be found in scanners/zap/policies/
       policy: "API-scan-minimal"
No deletion") return False def exists(self, path): @@ -122,9 +120,7 @@ def set(self, path, value, overwrite=True): tmp = walk[key] # case 3: not a "dictionary" type: warn and overwrite (if True) if not isinstance(tmp, dict): - logging.warning( - f"RapidastConfigModel.set: Incompatible {path} at {tmp}" - ) + logging.warning(f"RapidastConfigModel.set: Incompatible {path} at {tmp}") if not overwrite: logging.info("RapidastConfigModel.set: no overwrite: early return") return False @@ -162,9 +158,7 @@ def merge(self, merge, preserve=False, root=None): if not merge: return if not isinstance(merge, dict): - raise TypeError( - f"RapidastConfigModel.merge: merge must be a dict (was: {type(merge)})" - ) + raise TypeError(f"RapidastConfigModel.merge: merge must be a dict (was: {type(merge)})") root = path_to_list(root) @@ -176,8 +170,54 @@ def merge(self, merge, preserve=False, root=None): deep_dict_merge(sub_conf, merge, preserve) + def subtree_to_dict(self, path): + """Given a path, returns its subtree as a dictionary. + This includes applying all the `*_from_var` transformation. + e.g.: + "{'a_from_var': 'A_VAR'}" would return "{'a': ''}" + + Cases: + 1- path does not exist: return None + 2- path does not point to a dictionary: throw a KeyError instance + 3- path exist and is a dictionary: copy it, walk the copy apply all _from_var, return the copy + """ + + # recursively descend the tree, and apply all the _from_var + def descend(root): + if isinstance(root, dict): + # Dictionary: + # create a new dictionary, and apply the following logic: + # if key matches `_from_var`, assume value is a string, and apply replacement + # otherwise, copy key name and recursively descend on the value + new = {} + for key, val in root.items(): + if key.endswith("_from_var"): + new[key.removesuffix("_from_var")] = os.environ[val] + if not new[key.removesuffix("_from_var")]: + logging.warning(f"configuration {key} points to environment variable {val}, which is empty") + else: + new[key] = descend(val) + return new + elif isinstance(root, list): + # List: apply on each entry, and return a new List + return [descend(val) for val in root] + else: + # root is just a value (integer, string), assuming it's immutable + return root + + try: + subtree = self._get_from_conf(path_to_list(path)) + except KeyError: + logging.debug(f"subtree_to_dict(): path '{path}' does not exist") + return None + + if not isinstance(subtree, dict): + raise KeyError(f"subtree_to_dict(): '{path}' does not point to a dictionary in the config") + + return descend(subtree) + def get_official_app_name(self): - """ Shortcut: + """Shortcut: Return a string corresponding to how the application should be called Based on the configuratoin. Prefer the full product name, but defer to short name if unavailable diff --git a/configmodel/converter.py b/configmodel/converter.py index fcd66ab3..8177d888 100755 --- a/configmodel/converter.py +++ b/configmodel/converter.py @@ -45,9 +45,7 @@ def dispatch(version): def convert_configmodel(conf): """This is the base function, attached to error reporting""" version = conf.get("config.configVersion", 0) - raise RuntimeError( - f"There was an error in converting configuration. No convertion available for version {version}" - ) + raise RuntimeError(f"There was an error in converting configuration. 
diff --git a/configmodel/converter.py b/configmodel/converter.py
index fcd66ab3..8177d888 100755
--- a/configmodel/converter.py
+++ b/configmodel/converter.py
@@ -45,9 +45,7 @@ def dispatch(version):
 def convert_configmodel(conf):
     """This is the base function, attached to error reporting"""
     version = conf.get("config.configVersion", 0)
-    raise RuntimeError(
-        f"There was an error in converting configuration. No convertion available for version {version}"
-    )
+    raise RuntimeError(f"There was an error in converting configuration. No conversion available for version {version}")
 
 
 @convert_configmodel.register(4)
@@ -60,9 +58,7 @@ def convert_from_version_4_to_5(old):
     new = copy.deepcopy(old)
 
     for key in old.conf["scanners"]:
-        if key.startswith("zap") and old.exists(
-            f"scanners.{key}.miscOptions.oauth2OpenapiManualDownload"
-        ):
+        if key.startswith("zap") and old.exists(f"scanners.{key}.miscOptions.oauth2OpenapiManualDownload"):
             new.move(
                 f"scanners.{key}.miscOptions.oauth2OpenapiManualDownload",
                 f"scanners.{key}.miscOptions.oauth2ManualDownload",
@@ -174,8 +170,7 @@ def convert_from_version_0_to_1(old):
     auth_method = old.get("scan.auth_method", default=None)
     if (
         auth_method == "scriptBasedAuthentication"
-        and old.get("scan.scriptAuth.authScriptFilePath", default="")
-        == "scripts/offline-token.js"
+        and old.get("scan.scriptAuth.authScriptFilePath", default="") == "scripts/offline-token.js"
     ):
         # probably OAuth2
         new.set(
@@ -183,20 +178,14 @@ def convert_from_version_0_to_1(old):
             {
                 "type": "oauth2_rtoken",
                 "parameters": {
-                    "client_id": old.get(
-                        "scan.scriptAuth.authClientID", default="cloud-services"
-                    ),
-                    "token_endpoint": old.get(
-                        "scan.scriptAuth.authTokenEndpoint", default=""
-                    ),
+                    "client_id": old.get("scan.scriptAuth.authClientID", default="cloud-services"),
+                    "token_endpoint": old.get("scan.scriptAuth.authTokenEndpoint", default=""),
                     "rtoken_var_name": "RTOKEN",
                 },
             },
         )
     else:
-        logging.warning(
-            "The config version translator does not support this particular authentication"
-        )
+        logging.warning("The config version translator does not support this particular authentication")
 
     # "Scanners.Zap" section
     new.set(
@@ -206,13 +195,9 @@ def convert_from_version_0_to_1(old):
 
     ### OpenAPI
     if old.get("openapi.importFromUrl", default=False):
-        new.set(
-            "scanners.zap.apiScan.apis.apiUrl", old.get("openapi.url", default=None)
-        )
+        new.set("scanners.zap.apiScan.apis.apiUrl", old.get("openapi.url", default=None))
     elif old.get("openapi.directory", default=""):
-        logging.warning(
-            "The config version translator does not support Directory based OpenAPI"
-        )
+        logging.warning("The config version translator does not support Directory based OpenAPI")
 
     ## Passive scan
     new.set("scanners.zap.passiveScan", {})
@@ -225,9 +210,7 @@ def convert_from_version_0_to_1(old):
     ## Active scan
     # Active scanner was always enabled, so we do the same:
     new.set("scanners.zap.activeScan", {})
-    new.set(
-        "scanners.zap.activeScan.policy", old.get("scan.policies.scanPolicyName", None)
-    )
+    new.set("scanners.zap.activeScan.policy", old.get("scan.policies.scanPolicyName", None))
 
     # Finally, set the correct version number
     new.set("config.configVersion", 1)
diff --git a/containerize/Containerfile b/containerize/Containerfile
index 0fb18da4..b4f8bd73 100644
--- a/containerize/Containerfile
+++ b/containerize/Containerfile
@@ -55,6 +55,9 @@ COPY ./configmodel/ /opt/rapidast/configmodel/
 COPY ./utils/ /opt/rapidast/utils/
 COPY ./config/ /opt/rapidast/config/
 
+### Add generic tools in the PATH
+COPY ./scanners/generic/tools/convert_trivy_k8s_to_sarif.py /usr/local/bin/
+
 ### Overload default config (set 'none' as default container type)
 COPY ./containerize/container_default_config.yaml /opt/rapidast/rapidast-defaults.yaml
diff --git a/exports/defect_dojo.py b/exports/defect_dojo.py
index ee13c124..297c2506 100644
--- a/exports/defect_dojo.py
+++ b/exports/defect_dojo.py
@@ -11,9 +11,7 @@ class DefectDojo:
 
     def __init__(self, base_url, login=None, token=None, ssl=None):
         if not base_url:
-            raise ValueError(
-                "Defect Dojo invalid configuration: URL is a mandatory value"
-            )
+            raise ValueError("Defect Dojo invalid configuration: URL is a mandatory value")
         parsed = parse.urlparse(base_url)  # expects to raise exception on invalid URL
         if parsed.scheme not in ["http", "https"]:
             raise ValueError("Defect Dojo invalid configuration: URL is not correct")
@@ -27,9 +25,7 @@ def __init__(self, base_url, login=None, token=None, ssl=None):
             self.username = login["username"]
             self.password = login["password"]
         except KeyError:
-            logging.error(
-                "RapiDAST BUG: DefectDojo was created with invalid login information..."
-            )
+            logging.error("RapiDAST BUG: DefectDojo was created with invalid login information...")
             logging.error("RapiDAST BUG: ...[continuing without credentials]")
 
         self.token = token
@@ -47,9 +43,7 @@ def _auth_and_set_token(self):
         """Force a refresh of the token using the username/password"""
         logging.debug("Defect Dojo: refreshing token")
         if not self.username or not self.password:
-            raise ValueError(
-                "Defect Dojo invalid configuration: A username and a password are required to get a token"
-            )
+            raise ValueError("Defect Dojo invalid configuration: A username and a password are required to get a token")
 
         url = self.base_url + "/api/v2/api-token-auth/"
         data = {"username": self.username, "password": self.password}
@@ -63,9 +57,7 @@ def _auth_and_set_token(self):
             self.headers["Authorization"] = f"Token {self.token}"
             logging.debug("Defect Dojo: successfully refreshed token")
         except requests.exceptions.ConnectTimeout as e:
-            logging.error(
-                f"Getting token failed. Check the URL for defectDojo in config file. err details: {e}"
-            )
+            logging.error(f"Getting token failed. Check the URL for defectDojo in config file. err details: {e}")
             return 1
         except requests.exceptions.HTTPError as e:
             logging.error(
@@ -96,9 +88,7 @@ def engagement_exists(self, engagement_id=None, name=None):
             raise ValueError("Either an engagement name or ID must be provided")
 
         if resp.status_code >= 400:
-            logging.warning(
-                f"Error while looking for engagement ({resp.status_code}, {resp.get('message')})"
-            )
+            logging.warning(f"Error while looking for engagement ({resp.status_code}, {resp.get('message')})")
         counts = resp.json()["counts"]
         if counts > 1:
             logging.warning("Error while looking for engagement: too many hits")
@@ -134,9 +124,7 @@ def _private_import(self, endpoint, data, filename):
             logging.error(f"Error while exporting ({resp.status_code}, {err})")
 
             if "Invalid token" in err["detail"]:
-                logging.error(
-                    "Please check your token in 'config.defectDojo' of the config file"
-                )
+                logging.error("Please check your token in 'config.defectDojo' of the config file")
 
             return 1
 
@@ -146,31 +134,19 @@ def reimport_scan(self, data, filename):
         """Reimport to an existing engagement with an existing compatible scan."""
 
         if not data.get("test") and not (
-            data.get("engagement_name")
-            and data.get("product_name")
-            and data.get("test_title")
+            data.get("engagement_name") and data.get("product_name") and data.get("test_title")
         ):
-            raise ValueError(
-                "Reimport needs to identify an existing test (by ID or names of product+engagement+test)"
-            )
+            raise ValueError("Reimport needs to identify an existing test (by ID or names of product+engagement+test)")
 
-        return self._private_import(
-            f"{self.base_url}/api/v2/reimport-scan/", data, filename
-        )
+        return self._private_import(f"{self.base_url}/api/v2/reimport-scan/", data, filename)
 
     def import_scan(self, data, filename):
         """export to an existing engagement, via the `import-scan` endpoint."""
 
-        if not data.get("engagement") and not (
-            data.get("engagement_name") and data.get("product_name")
-        ):
-            raise ValueError(
-                "Import needs to identify an existing engagement (by ID or names of product+engagement)"
-            )
+        if not data.get("engagement") and not (data.get("engagement_name") and data.get("product_name")):
+            raise ValueError("Import needs to identify an existing engagement (by ID or names of product+engagement)")
 
-        return self._private_import(
-            f"{self.base_url}/api/v2/import-scan/", data, filename
-        )
+        return self._private_import(f"{self.base_url}/api/v2/import-scan/", data, filename)
 
     def export_scan(self, data, filename):
         """Decide wether to import or reimport. Based on:
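The validation guards above encode which identifiers each Defect Dojo endpoint needs. A quick standalone illustration of that rule, with fabricated payloads:

```python
def can_import(data: dict) -> bool:
    # import-scan: an engagement ID, or product+engagement names
    return bool(data.get("engagement") or (data.get("engagement_name") and data.get("product_name")))

def can_reimport(data: dict) -> bool:
    # reimport-scan: a test ID, or product+engagement+test names
    return bool(data.get("test") or (data.get("engagement_name") and data.get("product_name") and data.get("test_title")))

print(can_import({"engagement": 42}))                                   # True
print(can_reimport({"product_name": "app", "engagement_name": "e1"}))   # False: test_title missing
```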
data.get("engagement_name") and data.get("product_name") - ): - raise ValueError( - "Import needs to identify an existing engagement (by ID or names of product+engagement)" - ) + if not data.get("engagement") and not (data.get("engagement_name") and data.get("product_name")): + raise ValueError("Import needs to identify an existing engagement (by ID or names of product+engagement)") - return self._private_import( - f"{self.base_url}/api/v2/import-scan/", data, filename - ) + return self._private_import(f"{self.base_url}/api/v2/import-scan/", data, filename) def export_scan(self, data, filename): """Decide wether to import or reimport. Based on: diff --git a/exports/google_cloud_storage.py b/exports/google_cloud_storage.py index 2fe4883f..4cc7d684 100755 --- a/exports/google_cloud_storage.py +++ b/exports/google_cloud_storage.py @@ -56,9 +56,7 @@ def export_scan(self, data, filename): metadata = self.create_metadata(data) - logging.info( - f"GoogleCloudStorage: sending {filename}. UUID: {metadata['uuid']}" - ) + logging.info(f"GoogleCloudStorage: sending {filename}. UUID: {metadata['uuid']}") # export data as a metadata.json file json_stream = StringIO() @@ -82,11 +80,7 @@ def export_scan(self, data, filename): unique_id = "{}-RapiDAST-{}-{}.tgz".format( # pylint: disable=C0209 datetime.datetime.now(tz=datetime.timezone.utc).isoformat(), self.app_name, - "".join( - random.choices( - string.ascii_letters + string.ascii_uppercase + string.digits, k=6 - ) - ), + "".join(random.choices(string.ascii_letters + string.ascii_uppercase + string.digits, k=6)), ) blob_name = self.directory + "/" + unique_id diff --git a/rapidast.py b/rapidast.py index 5b05ba4e..a03d0372 100755 --- a/rapidast.py +++ b/rapidast.py @@ -108,9 +108,7 @@ def run_scanner(name, config, args, scan_exporter): # Part 5: cleanup if not scanner.state == scanners.State.PROCESSED: - logging.error( - f"Something is wrong. Scanner {name} is not in PROCESSED state: the workdir won't be cleaned up" - ) + logging.error(f"Something is wrong. Scanner {name} is not in PROCESSED state: the workdir won't be cleaned up") return 1 if not args.no_cleanup: @@ -155,19 +153,13 @@ def run(): args.loglevel = args.loglevel.upper() add_logging_level("VERBOSE", logging.DEBUG + 5) logging.basicConfig(format="%(levelname)s:%(message)s", level=args.loglevel) - logging.debug( - f"log level set to debug. Config file: '{parser.parse_args().config_file}'" - ) + logging.debug(f"log level set to debug. 
diff --git a/rapidast.py b/rapidast.py
index 5b05ba4e..a03d0372 100755
--- a/rapidast.py
+++ b/rapidast.py
@@ -108,9 +108,7 @@ def run_scanner(name, config, args, scan_exporter):
 
     # Part 5: cleanup
     if not scanner.state == scanners.State.PROCESSED:
-        logging.error(
-            f"Something is wrong. Scanner {name} is not in PROCESSED state: the workdir won't be cleaned up"
-        )
+        logging.error(f"Something is wrong. Scanner {name} is not in PROCESSED state: the workdir won't be cleaned up")
         return 1
 
     if not args.no_cleanup:
@@ -155,19 +153,13 @@ def run():
     args.loglevel = args.loglevel.upper()
     add_logging_level("VERBOSE", logging.DEBUG + 5)
     logging.basicConfig(format="%(levelname)s:%(message)s", level=args.loglevel)
-    logging.debug(
-        f"log level set to debug. Config file: '{parser.parse_args().config_file}'"
-    )
+    logging.debug(f"log level set to debug. Config file: '{parser.parse_args().config_file}'")
 
     # Load config file
     try:
-        config = configmodel.RapidastConfigModel(
-            yaml.safe_load(load_config_file(parser.parse_args().config_file))
-        )
+        config = configmodel.RapidastConfigModel(yaml.safe_load(load_config_file(parser.parse_args().config_file)))
     except yaml.YAMLError as exc:
-        raise RuntimeError(
-            f"YAML error in config {parser.parse_args().config_file}':\n {str(exc)}"
-        ) from exc
+        raise RuntimeError(f"YAML error in config {parser.parse_args().config_file}':\n {str(exc)}") from exc
 
     # Optionally adds default if file exists (will not overwrite existing entries)
     default_conf = os.path.join(os.path.dirname(__file__), "rapidast-defaults.yaml")
@@ -176,18 +168,14 @@ def run():
         try:
             config.merge(yaml.safe_load(load_config_file(default_conf)), preserve=True)
         except yaml.YAMLError as exc:
-            raise RuntimeError(
-                f"YAML error in config {default_conf}':\n {str(exc)}"
-            ) from exc
+            raise RuntimeError(f"YAML error in config {default_conf}':\n {str(exc)}") from exc
 
     # Update to latest config schema if need be
     config = configmodel.converter.update_to_latest_config(config)
 
     config.set("config.results_dir", get_full_result_dir_path(config))
 
-    logging.debug(
-        f"The entire loaded configuration is as follow:\n=====\n{pp.pformat(config)}\n====="
-    )
+    logging.debug(f"The entire loaded configuration is as follow:\n=====\n{pp.pformat(config)}\n=====")
 
     # Do early: load the environment file if one is there
     load_environment(config)
@@ -196,9 +184,7 @@ def run():
     scan_exporter = None
     if config.get("config.googleCloudStorage.bucketName"):
         scan_exporter = GoogleCloudStorage(
-            bucket_name=config.get(
-                "config.googleCloudStorage.bucketName", "default-bucket-name"
-            ),
+            bucket_name=config.get("config.googleCloudStorage.bucketName", "default-bucket-name"),
             app_name=config.get_official_app_name(),
             directory=config.get("config.googleCloudStorage.directory", None),
             keyfile=config.get("config.googleCloudStorage.keyFile", None),
@@ -207,12 +193,8 @@ def run():
         scan_exporter = DefectDojo(
             config.get("config.defectDojo.url"),
             {
-                "username": config.get(
-                    "config.defectDojo.authorization.username", default=""
-                ),
-                "password": config.get(
-                    "config.defectDojo.authorization.password", default=""
-                ),
+                "username": config.get("config.defectDojo.authorization.username", default=""),
+                "password": config.get("config.defectDojo.authorization.password", default=""),
             },
             config.get("config.defectDojo.authorization.token"),
            config.get("config.defectDojo.ssl", default=True),
diff --git a/scanners/__init__.py b/scanners/__init__.py
index 839335a6..c0f5df05 100644
--- a/scanners/__init__.py
+++ b/scanners/__init__.py
@@ -25,19 +25,24 @@ def __init__(self, config, ident):
         self.config = config
         self.state = State.UNCONFIGURED
 
-        self.results_dir = os.path.join(
-            self.config.get("config.results_dir", default="results"), self.ident
-        )
+        self.results_dir = os.path.join(self.config.get("config.results_dir", default="results"), self.ident)
 
         # When requested to create a temporary file or directory, it will be a subdir of
         # this temporary directory
         self.main_temp_dir = None
 
+    def absolute_conf_path(self, path):
+        """Handy shortcut to get an absolute path into a scanner's config parameter.
+        WARNING: currently, `path` MUST be in string form (e.g.: `spiderAjax.parameters.maxCrawlDepth`)
+        """
+        return f"scanners.{self.ident}.{path}"
+
     def my_conf(self, path, *args, **kwargs):
         """Handy shortcut to get the scanner's configuration. Only for within `scanners.`
+        WARNING: currently, `path` MUST be in string form (e.g.: `spiderAjax.parameters.maxCrawlDepth`)
         """
-        return self.config.get(f"scanners.{self.ident}.{path}", *args, **kwargs)
+        return self.config.get(self.absolute_conf_path(path), *args, **kwargs)
 
     def set_my_conf(self, path, *args, **kwargs):
         """Handy shortcut to set the scanner's configuration.
@@ -70,8 +75,7 @@ def _should_export_to_defect_dojo(self):
         - this particular scanner's export is not explicitely disabled (`defectDojoExport` is not False)
         """
         return self.my_conf("defectDojoExport") is not False and (
-            self.config.get("config.googleCloudStorage")
-            or self.config.get("config.defectDojo")
+            self.config.get("config.googleCloudStorage") or self.config.get("config.defectDojo")
         )
 
     def _fill_up_data_for_defect_dojo(self, data):
@@ -117,9 +121,7 @@ def _fill_up_data_for_defect_dojo(self, data):
         # A default product name was chosen as part of `self.get_default_defectdojo_data()`
         # Generate an engagement name if none are set
         if not data.get("engagement_name"):
-            data[
-                "engagement_name"
-            ] = f"RapiDAST-{data['product_name']}-{datetime.date.today()}"
+            data["engagement_name"] = f"RapiDAST-{data['product_name']}-{datetime.date.today()}"
 
         return data
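A standalone rendition of what the new helper computes; the scanner ident `zap` is assumed for the example:

```python
ident = "zap"  # a scanner's self.ident (assumed value)

def absolute_conf_path(path: str) -> str:
    # mirrors Scanner.absolute_conf_path: prefix the scanner's own config namespace
    return f"scanners.{ident}.{path}"

print(absolute_conf_path("spiderAjax.maxCrawlDepth"))
# -> scanners.zap.spiderAjax.maxCrawlDepth
```

With this in place, `my_conf("spiderAjax.maxCrawlDepth")` is simply `config.get()` applied to that absolute path, so both helpers stay in sync.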
err details: {e}") @@ -68,18 +69,18 @@ def oauth2_get_token_from_rtoken(auth, proxy=None, session=None): try: token = yaml.safe_load(resp.text)["access_token"] except KeyError as exc: - logging.error( - f"Unable to extract access token from OAuth2 authentication:\n {str(exc)}" - ) + logging.error(f"Unable to extract access token from OAuth2 authentication:\n {str(exc)}") return False return token -def authenticated_download_with_rtoken(url, dest, auth, proxy=None): +def authenticated_download_with_rtoken(url, dest, auth, proxy=None, verify=None): """Given a URL and Oauth2 authentication parameters, download the URL and store it at `dest`""" session = requests.Session() + if verify is not None: + session.verify = verify # get a token token = oauth2_get_token_from_rtoken(auth, proxy, session) @@ -96,9 +97,7 @@ def authenticated_download_with_rtoken(url, dest, auth, proxy=None): resp = session.get(url, proxies=proxy, headers=authenticated_headers) if resp.status_code >= 400: - logging.warning( - f"ERROR: download failed with {resp.status_code}. Aborting download for {url}" - ) + logging.warning(f"ERROR: download failed with {resp.status_code}. Aborting download for {url}") return False with open(dest, "w", encoding="utf-8") as file: diff --git a/scanners/generic/generic.py b/scanners/generic/generic.py index 74b304c1..6347e5fe 100644 --- a/scanners/generic/generic.py +++ b/scanners/generic/generic.py @@ -125,9 +125,7 @@ def _setup_generic_cli(self): # disabling these 2 rules only here since they might actually be useful else where # pylint: disable=unused-argument def _add_env(self, key, value=None): - logging.warning( - "_add_env() was called on the parent Generic class. This is likely a bug. No operation done" - ) + logging.warning("_add_env() was called on the parent Generic class. This is likely a bug. 
No operation done") ############################################################### # PRIVATE METHODS # diff --git a/scanners/generic/generic_none.py b/scanners/generic/generic_none.py index e881256b..c9673ef7 100644 --- a/scanners/generic/generic_none.py +++ b/scanners/generic/generic_none.py @@ -53,9 +53,7 @@ def setup(self): """ if self.state != State.UNCONFIGURED: - raise RuntimeError( - f"generic_none setup encountered an unexpected state: {self.state}" - ) + raise RuntimeError(f"generic_none setup encountered an unexpected state: {self.state}") self._setup_generic_cli() @@ -78,11 +76,7 @@ def run(self): cli = self.generic_cli # The result is stdout if "results" is undefined or `*stdout` - stdout_store = ( - subprocess.PIPE - if not self.my_conf("results") or self.my_conf("results") == "*stdout" - else None - ) + stdout_store = subprocess.PIPE if not self.my_conf("results") or self.my_conf("results") == "*stdout" else None # DO STUFF @@ -105,21 +99,13 @@ def run(self): for line in scanning.stdout: print(line, end="") scanning_stdout_results += line - logging.debug( - f"generic returned the following:\n=====\n{pp.pformat(scanning)}\n=====" - ) - - if scanning.returncode in self.my_conf( - "container.parameters.validReturns", [0] - ): - logging.info( - f"The generic process finished correctly, and exited with code {scanning.returncode}" - ) + logging.debug(f"generic returned the following:\n=====\n{pp.pformat(scanning)}\n=====") + + if scanning.returncode in self.my_conf("container.parameters.validReturns", [0]): + logging.info(f"The generic process finished correctly, and exited with code {scanning.returncode}") self.state = State.DONE else: - logging.warning( - f"The generic process did not finish correctly, and exited with code {scanning.returncode}" - ) + logging.warning(f"The generic process did not finish correctly, and exited with code {scanning.returncode}") self.state = State.ERROR # If we captured an output, let's save it into a temporary file, and use that as a new result parameter @@ -128,17 +114,13 @@ def run(self): with open(report_path, "w", encoding="utf-8") as results: results.write(scanning_stdout_results) # Now that the result is a file, change the config to point to it - logging.debug( - f"Overloading {self.ident} config result parameter to {report_path}" - ) + logging.debug(f"Overloading {self.ident} config result parameter to {report_path}") self.set_my_conf("results", value=report_path, overwrite=True) def postprocess(self): logging.info("Running postprocess for the generic environment") if not self.state == State.DONE: - raise RuntimeError( - "No post-processing as generic has not successfully run yet." 
diff --git a/scanners/generic/generic_none.py b/scanners/generic/generic_none.py
index e881256b..c9673ef7 100644
--- a/scanners/generic/generic_none.py
+++ b/scanners/generic/generic_none.py
@@ -53,9 +53,7 @@ def setup(self):
         """
 
         if self.state != State.UNCONFIGURED:
-            raise RuntimeError(
-                f"generic_none setup encountered an unexpected state: {self.state}"
-            )
+            raise RuntimeError(f"generic_none setup encountered an unexpected state: {self.state}")
 
         self._setup_generic_cli()
@@ -78,11 +76,7 @@ def run(self):
         cli = self.generic_cli
 
         # The result is stdout if "results" is undefined or `*stdout`
-        stdout_store = (
-            subprocess.PIPE
-            if not self.my_conf("results") or self.my_conf("results") == "*stdout"
-            else None
-        )
+        stdout_store = subprocess.PIPE if not self.my_conf("results") or self.my_conf("results") == "*stdout" else None
 
         # DO STUFF
@@ -105,21 +99,13 @@ def run(self):
             for line in scanning.stdout:
                 print(line, end="")
                 scanning_stdout_results += line
-        logging.debug(
-            f"generic returned the following:\n=====\n{pp.pformat(scanning)}\n====="
-        )
-
-        if scanning.returncode in self.my_conf(
-            "container.parameters.validReturns", [0]
-        ):
-            logging.info(
-                f"The generic process finished correctly, and exited with code {scanning.returncode}"
-            )
+        logging.debug(f"generic returned the following:\n=====\n{pp.pformat(scanning)}\n=====")
+
+        if scanning.returncode in self.my_conf("container.parameters.validReturns", [0]):
+            logging.info(f"The generic process finished correctly, and exited with code {scanning.returncode}")
             self.state = State.DONE
         else:
-            logging.warning(
-                f"The generic process did not finish correctly, and exited with code {scanning.returncode}"
-            )
+            logging.warning(f"The generic process did not finish correctly, and exited with code {scanning.returncode}")
             self.state = State.ERROR
 
         # If we captured an output, let's save it into a temporary file, and use that as a new result parameter
             with open(report_path, "w", encoding="utf-8") as results:
                 results.write(scanning_stdout_results)
             # Now that the result is a file, change the config to point to it
-            logging.debug(
-                f"Overloading {self.ident} config result parameter to {report_path}"
-            )
+            logging.debug(f"Overloading {self.ident} config result parameter to {report_path}")
             self.set_my_conf("results", value=report_path, overwrite=True)
 
     def postprocess(self):
         logging.info("Running postprocess for the generic environment")
         if not self.state == State.DONE:
-            raise RuntimeError(
-                "No post-processing as generic has not successfully run yet."
-            )
+            raise RuntimeError("No post-processing as generic has not successfully run yet.")
 
         super().postprocess()
diff --git a/scanners/generic/generic_podman.py b/scanners/generic/generic_podman.py
index be3d8e92..04ee2f6a 100644
--- a/scanners/generic/generic_podman.py
+++ b/scanners/generic/generic_podman.py
@@ -64,9 +64,7 @@ def setup(self):
         """
 
         if self.state != State.UNCONFIGURED:
-            raise RuntimeError(
-                f"generic_podman setup encountered an unexpected state: {self.state}"
-            )
+            raise RuntimeError(f"generic_podman setup encountered an unexpected state: {self.state}")
 
         self._setup_podman_cli()
         self._setup_generic_cli()
@@ -87,38 +85,24 @@ def run(self):
         cli = self.podman.get_complete_cli(self.generic_cli)
 
         # The result is stdout if "results" is undefined or `*stdout`
-        stdout_store = (
-            subprocess.PIPE
-            if not self.my_conf("results") or self.my_conf("results") == "*stdout"
-            else None
-        )
+        stdout_store = subprocess.PIPE if not self.my_conf("results") or self.my_conf("results") == "*stdout" else None
 
         # DO STUFF
         logging.info(f"Running generic with the following command:\n{cli}")
         scanning_stdout_results = ""
-        with subprocess.Popen(
-            cli, stdout=stdout_store, bufsize=1, universal_newlines=True
-        ) as scanning:
+        with subprocess.Popen(cli, stdout=stdout_store, bufsize=1, universal_newlines=True) as scanning:
             if stdout_store:
                 logging.debug("Storing podman's standard output")
                 for line in scanning.stdout:
                     print(line, end="")
                     scanning_stdout_results += line
-        logging.debug(
-            f"generic returned the following:\n=====\n{pp.pformat(scanning)}\n====="
-        )
+        logging.debug(f"generic returned the following:\n=====\n{pp.pformat(scanning)}\n=====")
 
-        if scanning.returncode in self.my_conf(
-            "container.parameters.validReturns", [0]
-        ):
-            logging.info(
-                f"The generic process finished correctly, and exited with code {scanning.returncode}"
-            )
+        if scanning.returncode in self.my_conf("container.parameters.validReturns", [0]):
+            logging.info(f"The generic process finished correctly, and exited with code {scanning.returncode}")
             self.state = State.DONE
         else:
-            logging.warning(
-                f"The generic process did not finish correctly, and exited with code {scanning.returncode}"
-            )
+            logging.warning(f"The generic process did not finish correctly, and exited with code {scanning.returncode}")
             self.state = State.ERROR
 
         # If we captured an output, let's save it into a temporary file, and use that as a new result parameter
             with open(report_path, "w", encoding="utf-8") as results:
                 results.write(scanning_stdout_results)
             # Now that the result is a file, change the config to point to it
-            logging.debug(
-                f"Overloading {self.ident} config result parameter to {report_path}"
-            )
+            logging.debug(f"Overloading {self.ident} config result parameter to {report_path}")
             self.set_my_conf("results", value=report_path, overwrite=True)
 
     def postprocess(self):
         logging.info("Running postprocess for the generic Podman environment")
         if not self.state == State.DONE:
-            raise RuntimeError(
-                "No post-processing as generic has not successfully run yet."
-            )
+            raise RuntimeError("No post-processing as generic has not successfully run yet.")
 
         super().postprocess()
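Both generic runners share the same capture pattern: when `results` is unset or `*stdout`, the child's stdout is echoed live and buffered for the results file. A condensed, runnable sketch of that pattern (the command is a placeholder):

```python
import subprocess

cli = ["echo", "hello"]  # placeholder standing in for the scanner command

captured = ""
with subprocess.Popen(cli, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True) as proc:
    for line in proc.stdout:
        print(line, end="")   # echo to the console in real time
        captured += line      # keep a copy to write to the results file later

print(f"captured {len(captured)} characters, exit code {proc.returncode}")
```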
diff --git a/scanners/generic/tools/convert_trivy_k8s_to_sarif.py b/scanners/generic/tools/convert_trivy_k8s_to_sarif.py
old mode 100644
new mode 100755
index 0e6ffcd0..15150560
--- a/scanners/generic/tools/convert_trivy_k8s_to_sarif.py
+++ b/scanners/generic/tools/convert_trivy_k8s_to_sarif.py
@@ -3,21 +3,28 @@
 #
 # Convert a Trivy k8s json result to SARIF format(stdout).
 # A usage example (see options in the code):
-# $ convert_trivy_k8s_to_sarify.py -f <json_file> [--log-level=DEBUG]
+# $ convert_trivy_k8s_to_sarif.py [-f <json_file>] [--log-level=DEBUG]
+# If `-f` is absent, or its value is `-`, JSON data will be read from STDIN
 #
 #
 import argparse
 import json
 import logging
+import sys
 
 
 def read_json_block(json_file):
     """
-    Read JSON data from a file.
+    Read JSON data from a file, or from STDIN.
     """
-    with open(json_file, "r", encoding="utf-8") as f:
-        json_data = json.load(f)
-    return json_data
+    if json_file is None or json_file == "-":
+        logging.debug("Reading input from STDIN")
+        data = sys.stdin.read()
+    else:
+        logging.debug(f"Reading input from '{json_file}'")
+        with open(json_file, "r", encoding="utf-8") as f:
+            data = f.read()
+    return json.loads(data)
@@ -29,9 +36,7 @@ def convert_json_to_sarif(json_data):
         "version": "2.1.0",
         "runs": [
             {
-                "tool": {
-                    "driver": {"name": "Trivy-k8s", "version": "0.49.1", "rules": []}
-                },
+                "tool": {"driver": {"name": "Trivy-k8s", "version": "0.49.1", "rules": []}},
                 "results": [],
             }
         ],
@@ -40,6 +45,8 @@ def convert_json_to_sarif(json_data):
     if "Resources" not in json_data:
         return sarif_template
 
+    rule_ids = set()
+
     for res in json_data["Resources"]:
         if "Results" not in res:
             continue
@@ -54,13 +61,7 @@ def convert_json_to_sarif(json_data):
                 "ruleId": misconf["ID"],
                 "level": misconf["Severity"],
                 "message": {"text": misconf["Message"]},
-                "locations": [
-                    {
-                        "physicalLocation": {
-                            "artifactLocation": {"uri": artifact_location}
-                        }
-                    }
-                ],
+                "locations": [{"physicalLocation": {"artifactLocation": {"uri": artifact_location}}}],
             }
 
             # It is observed there are no "StartLine" exists and "Code.Lines" is null in the result file
@@ -73,20 +74,19 @@ def convert_json_to_sarif(json_data):
                 new_report["locations"][0]["physicalLocation"]["region"] = {
                     "startLine": misconf["CauseMetadata"]["StartLine"],
                     "endLine": misconf["CauseMetadata"]["EndLine"],
-                    "snippet": {
-                        "text": json.dumps(
-                            misconf["CauseMetadata"]["Code"]["Lines"]
-                        )
-                    },
+                    "snippet": {"text": json.dumps(misconf["CauseMetadata"]["Code"]["Lines"])},
                 }
 
-            new_rule = {
-                "id": misconf["ID"],
-                "name": misconf["Title"],
-                "shortDescription": {"text": misconf["Description"]},
-            }
+            if misconf["ID"] not in rule_ids:
+                new_rule = {
+                    "id": misconf["ID"],
+                    "name": misconf["Title"],
+                    "shortDescription": {"text": misconf["Description"]},
+                }
+
+                sarif_template["runs"][0]["tool"]["driver"]["rules"].append(new_rule)
+                rule_ids.add(misconf["ID"])
 
-            sarif_template["runs"][0]["tool"]["driver"]["rules"].append(new_rule)
             sarif_template["runs"][0]["results"].append(new_report)
 
     return sarif_template
@@ -94,15 +94,14 @@ def convert_json_to_sarif(json_data):
 
 def main():
     # Parse command-line arguments
-    parser = argparse.ArgumentParser(
-        description="Convert JSON data to SARIF format with JSON block added to message."
-    )
+    parser = argparse.ArgumentParser(description="Convert JSON data to SARIF format with JSON block added to message.")
     parser.add_argument(
         "-f",
         "--filename",
         type=str,
-        required=True,
-        help="Path to JSON file",
+        required=False,
+        default=None,
+        help="Path to JSON file (if absent or '-': read from STDIN)",
     )
     parser.add_argument(
         "--log-level",
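The `rule_ids` set exists because SARIF expects each rule to be declared once in `tool.driver.rules`, even when many results reference the same rule. A standalone sketch of the dedup pattern, with fabricated findings:

```python
# Fabricated findings: two results share the rule ID "KSV001"
findings = [
    {"ID": "KSV001", "Title": "runAsNonRoot"},
    {"ID": "KSV001", "Title": "runAsNonRoot"},
    {"ID": "KSV014", "Title": "readOnlyRootFilesystem"},
]

rules, rule_ids = [], set()
for f in findings:
    if f["ID"] not in rule_ids:  # declare each rule only once
        rules.append({"id": f["ID"], "name": f["Title"]})
        rule_ids.add(f["ID"])

print([r["id"] for r in rules])  # ['KSV001', 'KSV014']
```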
diff --git a/scanners/generic/tools/oobtkube.py b/scanners/generic/tools/oobtkube.py
index be691fea..c02548fd 100644
--- a/scanners/generic/tools/oobtkube.py
+++ b/scanners/generic/tools/oobtkube.py
@@ -124,9 +124,7 @@ def count_total_leaf_keys(data):
 
 # pylint: disable=R0913
-def find_leaf_keys_and_test(
-    data, original_file, ipaddr, port, total_leaf_keys, processed_leaf_keys=0
-):
+def find_leaf_keys_and_test(data, original_file, ipaddr, port, total_leaf_keys, processed_leaf_keys=0):
     """
     Iterate the spec data and test each parameter by modifying the value with the attack payload.
     Test cases: appending 'curl' command, TBD
@@ -139,9 +137,7 @@ def find_leaf_keys_and_test(data, original_file, ipaddr, port, total_leaf_keys, processed_leaf_keys=0):
             )
         else:
             processed_leaf_keys += 1
-            logging.info(
-                f"Testing a leaf key: '{key}', ({processed_leaf_keys} / {total_leaf_keys})"
-            )
+            logging.info(f"Testing a leaf key: '{key}', ({processed_leaf_keys} / {total_leaf_keys})")
             cmd = f"sed 's/{key}:.*/{key}: \"echo oobt; curl {ipaddr}:{port}\\/{key}\"/g' {original_file} > {tmp_file}"
             logging.debug(f"Command run: {cmd}")
             os.system(cmd)
@@ -173,9 +169,7 @@ def scan_with_k8s_config(cfg_file_path: str, obj_data: dict, ipaddr: str, port: int):
     spec_data = obj_data.get("spec", {})
     total_leaf_keys = count_total_leaf_keys(spec_data)
     # Apply Kubernetes config (e.g. CR for Operator, or Pod/resource for webhook)
-    find_leaf_keys_and_test(
-        spec_data, cfg_file_path, ipaddr, port, total_leaf_keys
-    )
+    find_leaf_keys_and_test(spec_data, cfg_file_path, ipaddr, port, total_leaf_keys)
@@ -183,9 +177,7 @@ def start_socket_listener(port, shared_queue, data_received, stop_event, duration):
     try:
         server_socket.bind((SERVER_HOST, port))
     except OSError as e:
-        logging.error(
-            f"{e}. Stopping the server. It might take a few seconds. Please try again later."
-        )
+        logging.error(f"{e}. Stopping the server. It might take a few seconds. Please try again later.")
         stop_event.set()
         server_socket.close()
         return
@@ -215,9 +207,7 @@ def start_socket_listener(port, shared_queue, data_received, stop_event, duration):
             break
 
     except socket.timeout:
-        logging.info(
-            "Socket timeout reached as the test duration expired. Stopping the server."
-        )
+        logging.info("Socket timeout reached as the test duration expired. Stopping the server.")
 
     except Exception as e:
         raise RuntimeError("An error occurred. See logs for details.") from e
@@ -292,9 +282,7 @@ def check_can_create(obj_data: dict) -> bool:
 # pylint: disable=R0915
 def main():
     # Parse command-line arguments
-    parser = argparse.ArgumentParser(
-        description="Simulate a socket listener and respond to requests."
-    )
+    parser = argparse.ArgumentParser(description="Simulate a socket listener and respond to requests.")
     parser.add_argument(
         "-i",
         "--ip-addr",
@@ -316,9 +304,7 @@ def main():
         default=300,
         help="Duration for the listener thread to run in seconds (default: 300 seconds)",
     )
-    parser.add_argument(
-        "-f", "--filename", type=str, required=True, help="Kubernetes config file path"
-    )
+    parser.add_argument("-f", "--filename", type=str, required=True, help="Kubernetes config file path")
     # add argument for '-o' to output the result to a file
     parser.add_argument(
         "-o",
@@ -397,9 +383,7 @@ def main():
             time.sleep(1)  # Adjust the sleep duration as needed
             elapsed_time_main = time.time() - start_time_main
             if elapsed_time_main >= args.duration:
-                logging.debug(
-                    f"The duration of {args.duration} seconds has reached. Exiting..."
-                )
+                logging.debug(f"The duration of {args.duration} seconds has been reached. Exiting...")
                 stop_event.set()
 
             if data_received.is_set():
@@ -408,9 +392,7 @@ def main():
                 print_result(sarif_output, args.output, True)
 
                 vulnerability_count += 1
-                logging.info(
-                    f"A vulnerability has been found. Total: {vulnerability_count}"
-                )
+                logging.info(f"A vulnerability has been found. Total: {vulnerability_count}")
 
                 data_has_been_received = True
diff --git a/scanners/podman_wrapper.py b/scanners/podman_wrapper.py
index 271b82de..f29086b2 100644
--- a/scanners/podman_wrapper.py
+++ b/scanners/podman_wrapper.py
@@ -22,9 +22,7 @@ class PodmanWrapper:
     def __init__(self, app_name, scan_name, image):
         # First verify that "podman" can be called
         if not shutil.which("podman"):
-            raise OSError(
-                "Podman is not installed or not in the PATH. It is required to run a podman based scanner"
-            )
+            raise OSError("Podman is not installed or not in the PATH. It is required to run a podman based scanner")
 
         # Image to use
         self.image = image
@@ -52,9 +50,7 @@ def get_complete_cli(self, cmd=None):
 
     def delete_yourself(self):
         """Deletes the container image created by the run command"""
-        ret = subprocess.run(
-            ["podman", "rm", self.container_name], check=False
-        ).returncode
+        ret = subprocess.run(["podman", "rm", self.container_name], check=False).returncode
         if ret:
             logging.warning(f"Failed to delete container {self.container_name}")
         return ret
@@ -92,13 +88,46 @@ def add_volume_map(self, mapping):
         self.add_option("--volume", mapping)
 
     def change_user_id(self, runas_uid, runas_gid):
-        """Adds a specific user mapping between host user and user in the podman container.
-        Some containers, such as Zap, focused on docker require this to prevent UID mismatch.
-        This function aims as preparing a specific UID/GID mapping so that a particular UID/GID maps to the host user
+        """
+        Specify a container user ID to which the current user should be mapped to.
+        This is sometimes required because rootless podman uses Linux' subUIDs.
+
+        If podman version >= 4.3, use the `--userns keep-id:uid=X,gid=Y`
+        otherwise, call `change_user_id_workaround()` to manually create a user mapping
+        """
+        logging.info(f"Current user mapped to container user {runas_uid}:{runas_gid}")
+        try:
+            vers = json.loads(
+                subprocess.run(
+                    ["podman", "version", "--format", "json"],
+                    stdout=subprocess.PIPE,
+                    check=True,
+                ).stdout.decode("utf-8")
+            )
+            major, minor = map(int, vers["Client"]["Version"].split(".")[:2])
+            logging.debug(f"podman version: {vers}. Major: {major}, minor: {minor}")
+            if major < 4 or (major == 4 and minor < 3):
+                # podman < 4.3.0 : the option `keep-id:uid=1000,gid=1000` is not present, we need a workaround
+                self.change_user_id_workaround(runas_uid, runas_gid)
+            else:
+                # use option: keep-id:uid=1000,gid=1000
+                self.add_option("--userns", f"keep-id:uid={runas_uid},gid={runas_gid}")
+
+        except json.JSONDecodeError as exc:
+            raise RuntimeError(f"Unable to parse `podman version` output: {exc}") from exc
+        except (KeyError, AttributeError) as exc:
+            raise RuntimeError(f"Unexpected podman version output: Version not found: {exc}") from exc
+        except ValueError as exc:
+            raise RuntimeError(
+                f"Unexpected podman version output: unable to decode major/minor version: {exc}"
+            ) from exc
+
+    def change_user_id_workaround(self, runas_uid, runas_gid):
+        """This function aims as preparing a specific UID/GID mapping so that a particular UID/GID maps to the host user
+        Should be called only for podman < 4.3
         source of the hack : https://github.com/containers/podman/blob/main/troubleshooting.md#39-podman-run-fails-with-error-unrecognized-namespace-mode-keep-iduid1000gid1000-passed
         """
-
         subuid_size = self.DEFAULT_ID_MAPPING_MAP_SIZE - 1
         subgid_size = self.DEFAULT_ID_MAPPING_MAP_SIZE - 1
@@ -113,9 +142,7 @@ def change_user_id(self, runas_uid, runas_gid):
         logging.debug(f"podman UID mapping: {info['host']['idMappings']['uidmap']}")
 
         if info["host"]["idMappings"]["uidmap"] is not None:
-            subuid_size = (
-                sum(i["size"] for i in info["host"]["idMappings"]["uidmap"]) - 1
-            )
+            subuid_size = sum(i["size"] for i in info["host"]["idMappings"]["uidmap"]) - 1
         else:
             logging.warning(
                 f"the value of host.idMappings.uidmap in 'podman info' is null. \
                 DEFAULT_MAP_SIZE {self.DEFAULT_ID_MAPPING_MAP_SIZE} applied"
             )
         if info["host"]["idMappings"]["gidmap"] is not None:
-            subgid_size = (
-                sum(i["size"] for i in info["host"]["idMappings"]["gidmap"]) - 1
-            )
+            subgid_size = sum(i["size"] for i in info["host"]["idMappings"]["gidmap"]) - 1
         else:
             logging.warning(
                 f"the value of host.idMappings.gidmap in 'podman info' is null. \
                 DEFAULT_MAP_SIZE {self.DEFAULT_ID_MAPPING_MAP_SIZE} applied"
             )
@@ -136,25 +161,31 @@ def change_user_id(self, runas_uid, runas_gid):
         except json.JSONDecodeError as exc:
             raise RuntimeError(f"Unable to parse `podman info` output: {exc}") from exc
         except (KeyError, AttributeError) as exc:
-            raise RuntimeError(
-                f"Unexpected podman info output: entry not found: {exc}"
-            ) from exc
+            raise RuntimeError(f"Unexpected podman info output: entry not found: {exc}") from exc
         except Exception as exc:
             logging.error(f"change_user_id unexpected error: {exc}")
             raise RuntimeError(f"Unable to retrieve podman UID mapping: {exc}") from exc
 
         # UID mapping
-        self.add_option("--uidmap", f"0:1:{runas_uid}")
-        self.add_option("--uidmap", f"{runas_uid}:0:1")
-        self.add_option(
-            "--uidmap", f"{runas_uid+1}:{runas_uid+1}:{subuid_size-runas_uid}"
-        )
+        if subuid_size >= runas_uid:
+            self.add_option("--uidmap", f"0:1:{runas_uid}")
+            self.add_option("--uidmap", f"{runas_uid}:0:1")
+            self.add_option("--uidmap", f"{runas_uid+1}:{runas_uid+1}:{subuid_size-runas_uid}")
+            logging.debug("podman enabled UID mapping arguments (using uidmap workaround)")
+        else:
+            raise RuntimeError(
+                "subUIDs seem to be disabled/misconfigured for the current user. \
                Rootless podman can not run without subUIDs"
+            )
 
         # GID mapping
-        self.add_option("--gidmap", f"0:1:{runas_gid}")
-        self.add_option("--gidmap", f"{runas_gid}:0:1")
-        self.add_option(
-            "--gidmap", f"{runas_gid+1}:{runas_gid+1}:{subgid_size-runas_gid}"
-        )
-
-        logging.debug("podman enabled UID/GID mapping arguments")
+        if subgid_size >= runas_gid:
+            self.add_option("--gidmap", f"0:1:{runas_gid}")
+            self.add_option("--gidmap", f"{runas_gid}:0:1")
+            self.add_option("--gidmap", f"{runas_gid+1}:{runas_gid+1}:{subgid_size-runas_gid}")
+            logging.debug("podman enabled GID mapping arguments (using uidmap workaround)")
+        else:
+            raise RuntimeError(
+                "subGIDs seem to be disabled/misconfigured for the current user. \
                Rootless podman can not run without subGIDs"
+            )
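The version gate above reduces to a tuple comparison once the client version is extracted. A minimal standalone sketch (requires a local `podman` binary in PATH):

```python
import json
import subprocess

# Ask podman for its client version (requires podman installed)
out = subprocess.run(
    ["podman", "version", "--format", "json"],
    stdout=subprocess.PIPE,
    check=True,
).stdout.decode("utf-8")

major, minor = map(int, json.loads(out)["Client"]["Version"].split(".")[:2])

# `--userns keep-id:uid=...,gid=...` only exists from podman 4.3 onward
if (major, minor) >= (4, 3):
    print("use --userns keep-id:uid=1000,gid=1000")
else:
    print("fall back to the explicit --uidmap/--gidmap workaround")
```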
diff --git a/scanners/zap/zap.py b/scanners/zap/zap.py
index 7c0585d7..c4a9b5bc 100644
--- a/scanners/zap/zap.py
+++ b/scanners/zap/zap.py
@@ -124,10 +124,7 @@ def data_for_defect_dojo(self):
 
     def get_update_command(self):
         """Returns a list of all options required to update ZAP plugins"""
-        if not (
-            self.my_conf("miscOptions.updateAddons")
-            or self.my_conf("miscOptions.additionalAddons")
-        ):
+        if not (self.my_conf("miscOptions.updateAddons") or self.my_conf("miscOptions.additionalAddons")):
             return []
 
         command = [
@@ -142,9 +139,7 @@ def get_update_command(self):
         if isinstance(addons, str):
             addons = addons.split(",") if len(addons) else []
         if not isinstance(addons, list):
-            logging.warning(
-                "miscOptions.additionalAddons MUST be either a list or a string of comma-separated values"
-            )
+            logging.warning("miscOptions.additionalAddons MUST be either a list or a string of comma-separated values")
             addons = []
 
         for addon in addons:
@@ -178,9 +173,7 @@ def _setup_zap_cli(self):
         self.zap_cli.extend(self._get_standard_options())
 
         # Create a session, to store them as evidence
-        self.zap_cli.extend(
-            ["-newsession", f"{self.container_work_dir}/session_data/session"]
-        )
+        self.zap_cli.extend(["-newsession", f"{self.container_work_dir}/session_data/session"])
 
         if not self.my_conf("miscOptions.enableUI", default=False):
             # Disable UI
@@ -208,9 +201,7 @@ def _get_standard_options(self):
         standard = []
 
         # Proxy workaround (because it currently can't be configured from Automation Framework)
-        p_host, p_port = self.my_conf("proxy.proxyHost"), self.my_conf(
-            "proxy.proxyPort"
-        )
+        p_host, p_port = self.my_conf("proxy.proxyHost"), self.my_conf("proxy.proxyPort")
         if p_host and p_port:
             standard.extend(["-config", f"network.connection.httpProxy.host={p_host}"])
             standard.extend(["-config", f"network.connection.httpProxy.port={p_port}"])
@@ -222,9 +213,7 @@ def _get_standard_options(self):
         # Select a port that is unlikely to collide with anything else, but let the user able to
         # override it if need be
         local_port = self.my_conf("miscOptions.zapPort", 47691)
-        standard.extend(
-            ["-config", f"network.localServers.mainProxy.port={local_port}"]
-        )
+        standard.extend(["-config", f"network.localServers.mainProxy.port={local_port}"])
 
         # By default, ZAP allocates ¼ of the available RAM to the Java process.
         # This is not efficient when RapiDAST is executed in a dedicated environment.
@@ -241,9 +230,7 @@ def _get_standard_options(self):
     # disabling these 2 rules only here since they might actually be useful else where
     # pylint: disable=unused-argument
     def _add_env(self, key, value=None):
-        logging.warning(
-            "_add_env() was called on the parent ZAP class. This is likely a bug. No operation done"
-        )
+        logging.warning("_add_env() was called on the parent ZAP class. This is likely a bug. No operation done")
 
     def _include_file(self, host_path, dest_in_container=None):
         """Copies the file from host_path on the host to dest_in_container in the container
@@ -324,9 +311,7 @@ def _setup_zap_automation(self):
             with open(af_template, "r", encoding="utf-8") as stream:
                 self.automation_config = yaml.safe_load(stream)
         except yaml.YAMLError as exc:
-            raise RuntimeError(
-                f"Something went wrong while parsing the config '{af_template}':\n {str(exc)}"
-            ) from exc
+            raise RuntimeError(f"Something went wrong while parsing the config '{af_template}':\n {str(exc)}") from exc
 
         # Configure the basic environment target
         try:
@@ -404,15 +389,11 @@ def _setup_api(self):
             # copy the file in the container's result directory
             # This allows the OpenAPI to be kept as evidence
             container_openapi_file = f"{self.container_work_dir}/openapi.json"
-            self._include_file(
-                host_path=api_file, dest_in_container=container_openapi_file
-            )
+            self._include_file(host_path=api_file, dest_in_container=container_openapi_file)
 
             openapi["parameters"]["apiFile"] = container_openapi_file
         else:
-            raise ValueError(
-                "No apiUrl or apiFile is defined in the config, in apiScan.apis"
-            )
+            raise ValueError("No apiUrl or apiFile is defined in the config, in apiScan.apis")
 
         # default target: main URL, or can be overridden in apiScan
         openapi["parameters"]["targetUrl"] = self._append_slash_to_url(
@@ -452,54 +433,55 @@ def _setup_verify(self):
 
     def _setup_spider(self):
         """Prepare an spider job and append it to the job list"""
 
-        if self.my_conf("spider", default=False) is False:
+        params = self.config.subtree_to_dict(self.absolute_conf_path("spider"))
+        if params is None:
             return
 
-        af_spider = {
+        job = {
             "name": "spider",
             "type": "spider",
-            "parameters": {
-                "user": Zap.USER if self.authenticated else "",
-                "maxDuration": self.my_conf("spider.maxDuration", default=0),
-                "url": self.my_conf("spider.url", default=""),
-            },
+            "parameters": params,
         }
 
+        # Enforce user/context parameters
+        self._enforce_job_parameters(job)
+
         # Add to includePaths to the context
-        if self.my_conf("spider.url"):
-            new_include_path = self.my_conf("spider.url") + ".*"
+        if params.get("url"):
+            new_include_path = f"{params['url']}.*"
             af_context = find_context(self.automation_config)
             af_context["includePaths"].append(new_include_path)
 
-        self.automation_config["jobs"].append(af_spider)
+        self.automation_config["jobs"].append(job)
 
     def _setup_ajax_spider(self):
         """Prepare an spiderAjax job and append it to the job list"""
 
-        if self.my_conf("spiderAjax", default=False) is False:
+        params = self.config.subtree_to_dict(self.absolute_conf_path("spiderAjax"))
+        if params is None:
             return
 
-        af_spider_ajax = {
+        job = {
             "name": "spiderAjax",
             "type": "spiderAjax",
-            "parameters": {
-                "user": Zap.USER if self.authenticated else "",
-                "maxDuration": self.my_conf("spiderAjax.maxDuration", default=0),
-                "url": self.my_conf("spiderAjax.url", default=""),
-                "browserId": self.my_conf(
-                    "spiderAjax.browserId", default="firefox-headless"
-                ),
-            },
+            "parameters": params,
         }
 
+        # Enforce user/context parameters
+        self._enforce_job_parameters(job)
+
+        # Set some RapiDAST-centric defaults
+        # Unless overwritten, browser should be Firefox-headless, since RapiDAST only has that
+        if not job["parameters"].get("browserId"):
+            job["parameters"]["browserId"] = "firefox-headless"
+
         # Add to includePaths to the context
-        ajax_url = self.my_conf("spiderAjax.url")
-        if ajax_url:
-            new_include_path = f"{ajax_url}.*"
+        if params.get("url"):
+            new_include_path = f"{params['url']}.*"
             af_context = find_context(self.automation_config)
             af_context["includePaths"].append(new_include_path)
 
-        self.automation_config["jobs"].append(af_spider_ajax)
+        self.automation_config["jobs"].append(job)
 
     def _setup_graphql(self):
         """Prepare a graphql job and append it to the job list"""
@@ -567,22 +549,27 @@ def _setup_passive_wait(self):
         self.automation_config["jobs"].append(waitfor)
 
     def _setup_active_scan(self):
-        """Adds the active scan job list."""
+        """Adds an active scan job list, if there is one"""
 
-        if self.my_conf("activeScan", default=False) is False:
+        params = self.config.subtree_to_dict(self.absolute_conf_path("activeScan"))
+        if params is None:
             return
 
-        active = {
+        job = {
             "name": "activeScan",
             "type": "activeScan",
-            "parameters": {
-                "context": Zap.DEFAULT_CONTEXT,
-                "user": Zap.USER if self.authenticated else "",
-                "policy": self.my_conf("activeScan.policy", default="API-scan-minimal"),
-            },
+            "parameters": params,
         }
 
-        self.automation_config["jobs"].append(active)
+        # Enforce user/context parameters
+        self._enforce_job_parameters(job)
+
+        # Set some RapiDAST-centric defaults
+        # unless overwritten, policy should be "API-scan-minimal"
+        if not job["parameters"].get("policy"):
+            job["parameters"]["policy"] = "API-scan-minimal"
+
+        self.automation_config["jobs"].append(job)
 
     def _construct_report_af(self, report_format):
         report_af = {
@@ -627,22 +614,14 @@ def _setup_report(self):
         appended = 0
         for format_id in formats:
             try:
-                logging.debug(
-                    f"report {format_id}, filename: {reports[format_id].name}"
-                )
-                self.automation_config["jobs"].append(
-                    self._construct_report_af(reports[format_id])
-                )
+                logging.debug(f"report {format_id}, filename: {reports[format_id].name}")
+                self.automation_config["jobs"].append(self._construct_report_af(reports[format_id]))
                 appended += 1
             except KeyError as exc:
-                logging.warning(
-                    f"Reports: {exc.args[0]} is not a valid format. Ignoring"
-                )
+                logging.warning(f"Reports: {exc.args[0]} is not a valid format. Ignoring")
 
         if not appended:
             logging.warning("Creating a default report as no valid were found")
-            self.automation_config["jobs"].append(
-                self._construct_report_af(reports["json"])
-            )
+            self.automation_config["jobs"].append(self._construct_report_af(reports["json"]))
 
     def _setup_summary(self):
         """Adds a outputSummary job"""
@@ -664,6 +643,11 @@ def _save_automation_file(self):
             f.write(yaml.dump(self.automation_config))
         logging.info(f"Saved Automation Framework in {af_host_path}")
 
+    def _enforce_job_parameters(self, job):
+        """Enforce parameters `user` and `context` to a given job"""
+        job["parameters"]["user"] = Zap.USER if self.authenticated else ""
+        job["parameters"]["context"] = Zap.DEFAULT_CONTEXT
+
     # Building an authentication factory for ZAP
     # For every authentication methods:
     # - Will extract authentication parameters from config's `authentication.parameters`
@@ -673,9 +657,7 @@ def _save_automation_file(self):
     @generic_authentication_factory()
     def authentication_factory(self):
         """This is the default function, attached to error reporting"""
-        raise RuntimeError(
-            f"No valid authenticator found for ZAP. ZAP current config is: {self.config}"
-        )
+        raise RuntimeError(f"No valid authenticator found for ZAP. ZAP current config is: {self.config}")
 
     @authentication_factory.register(None)
     def authentication_set_anonymous(self):
ZAP current config is: {self.config}") @authentication_factory.register(None) def authentication_set_anonymous(self): @@ -827,12 +809,11 @@ def authentication_set_oauth2_rtoken(self): "rtoken": rtoken, "url": token_endpoint, } - token = oauth2_get_token_from_rtoken(auth, proxy=self.my_conf("proxy")) + verify = self.config.get("config.tls_verify_for_rapidast_downloads", True) + token = oauth2_get_token_from_rtoken(auth, proxy=self.my_conf("proxy"), verify=verify) if token: # Delete previous config, and creating a new one - logging.debug( - "successfully retrieved a token, hijacking authentication" - ) + logging.debug("successfully retrieved a token, hijacking authentication") self.set_my_conf("authentication.type", "http_header") self.set_my_conf(f"{params_path}", {}) self.set_my_conf(f"{params_path}.name", "Authorization") @@ -840,9 +821,7 @@ def authentication_set_oauth2_rtoken(self): # re-run authentication return self.authentication_factory() else: - logging.warning( - "Preauthentication failed, continuing with regular oauth2" - ) + logging.warning("Preauthentication failed, continuing with regular oauth2") # 1- complete the context: script, verification and user context_["authentication"] = { @@ -938,14 +917,11 @@ def _manual_oauth2_download(self, auth, proxy): for change in changes: url = self.my_conf(change.config_url) + verify = self.config.get("config.tls_verify_for_rapidast_downloads", True) if url: - if authenticated_download_with_rtoken(url, change.path, auth, proxy): - logging.info( - f"Successful download of scanner's {change.config_url}" - ) - self.config.set( - f"scanners.{self.ident}.{change.config_path}", change.path - ) + if authenticated_download_with_rtoken(url, change.path, auth, proxy, verify=verify): + logging.info(f"Successful download of scanner's {change.config_url}") + self.config.set(f"scanners.{self.ident}.{change.config_path}", change.path) self.config.delete(f"scanners.{self.ident}.{change.config_url}") else: logging.warning("Failed to download scanner's {change.config_url}") diff --git a/scanners/zap/zap_none.py b/scanners/zap/zap_none.py index 552b246b..bb171103 100644 --- a/scanners/zap/zap_none.py +++ b/scanners/zap/zap_none.py @@ -129,22 +129,16 @@ def run(self): cli = ["sh", "-c", self._zap_cli_list_to_str_for_sh(self.zap_cli)] result = subprocess.run(cli, check=False) - logging.debug( - f"ZAP returned the following:\n=====\n{pp.pformat(result)}\n=====" - ) + logging.debug(f"ZAP returned the following:\n=====\n{pp.pformat(result)}\n=====") # Zap's return codes : https://www.zaproxy.org/docs/desktop/addons/automation-framework/ if result.returncode in [0, 2]: # 0: ZAP returned correctly. 2: ZAP returned warning - logging.info( - f"The ZAP process finished with no errors, and exited with code {result.returncode}" - ) + logging.info(f"The ZAP process finished with no errors, and exited with code {result.returncode}") self.state = State.DONE else: # 1: Zap hit an error - logging.warning( - f"The ZAP process did not finish correctly, and exited with code {result.returncode}" - ) + logging.warning(f"The ZAP process did not finish correctly, and exited with code {result.returncode}") self.state = State.ERROR def postprocess(self): @@ -192,9 +186,7 @@ def _setup_ajax_spider(self): "Make sure that /dev/shm/ is at least 1GB in size [ideally at least 2GB]" ) except FileNotFoundError: - logging.warning( - "/dev/shm not present. Unable to calcuate shared memory size" - ) + logging.warning("/dev/shm not present. 
Unable to calcuate shared memory size") # Firefox tends to use _a lot_ of threads # Assume we're regulated by cgroup v2 @@ -202,13 +194,9 @@ def _setup_ajax_spider(self): with open("/sys/fs/cgroup/pids.max", encoding="utf-8") as f: pid_val = f.readline().rstrip() if pid_val == "max" or int(pid_val) > 10000: - logging.debug( - f"cgroup v2 has a sufficient pid limit: {pid_val}" - ) + logging.debug(f"cgroup v2 has a sufficient pid limit: {pid_val}") else: - logging.warning( - f"Number of threads may be too low for SpiderAjax: cgroupv2 pids.max={pid_val}" - ) + logging.warning(f"Number of threads may be too low for SpiderAjax: cgroupv2 pids.max={pid_val}") except FileNotFoundError: # open /sys/fs/cgroup/pids.max failed: root cgroup (unlimited pids) or no cgroup v2 at all. # assume the former @@ -298,10 +286,7 @@ def _check_plugin_status(self): result = subprocess.run(command, check=False, capture_output=True) if result.returncode == 0: logging.debug("ZAP appears to be in a correct state") - elif ( - result.stderr.find(bytes("The mandatory add-on was not found:", "ascii")) - > 0 - ): + elif result.stderr.find(bytes("The mandatory add-on was not found:", "ascii")) > 0: logging.info("Missing mandatory plugins. Fixing") url_root = "https://github.com/zaproxy/zap-extensions/releases/download" anonymous_download( @@ -326,9 +311,7 @@ def _check_plugin_status(self): result = subprocess.run(command, check=False) else: - logging.warning( - f"ZAP appears to be in a incorrect state. Error: {result.stderr}" - ) + logging.warning(f"ZAP appears to be in a incorrect state. Error: {result.stderr}") def _create_home_if_needed(self): """Some tools (most notably: ZAP's Ajax Spider with Firefox) require a writable home directory. diff --git a/scanners/zap/zap_podman.py b/scanners/zap/zap_podman.py index f7c08e79..131a97fe 100644 --- a/scanners/zap/zap_podman.py +++ b/scanners/zap/zap_podman.py @@ -109,30 +109,22 @@ def run(self): # DO STUFF logging.info(f"Running ZAP with the following command:\n{cli}") result = subprocess.run(cli, check=False) - logging.debug( - f"ZAP returned the following:\n=====\n{pp.pformat(result)}\n=====" - ) + logging.debug(f"ZAP returned the following:\n=====\n{pp.pformat(result)}\n=====") # Zap's return codes : https://www.zaproxy.org/docs/desktop/addons/automation-framework/ if result.returncode in [0, 2]: # 0: ZAP returned correctly. 2: ZAP returned warning - logging.info( - f"The ZAP process finished with no errors, and exited with code {result.returncode}" - ) + logging.info(f"The ZAP process finished with no errors, and exited with code {result.returncode}") self.state = State.DONE else: # 1: Zap hit an error, >125 : podman returned an error - logging.warning( - f"The ZAP process did not finish correctly, and exited with code {result.returncode}" - ) + logging.warning(f"The ZAP process did not finish correctly, and exited with code {result.returncode}") self.state = State.ERROR def postprocess(self): logging.info("Running postprocess for the ZAP Podman environment") if not self.state == State.DONE: - raise RuntimeError( - "No post-processing as ZAP has not successfully run yet." 
-            )
+            raise RuntimeError("No post-processing as ZAP has not successfully run yet.")

         super().postprocess()
diff --git a/tests/configmodel/test_configmodel.py b/tests/configmodel/test_configmodel.py
index 06f95f8a..3c9a473b 100644
--- a/tests/configmodel/test_configmodel.py
+++ b/tests/configmodel/test_configmodel.py
@@ -1,3 +1,4 @@
+import copy
 import os

 import pytest
@@ -31,6 +32,66 @@ def generate_some_nested_config():
     }


+@pytest.fixture(name="nested_with_var")
+def generate_some_nested_config_with_var():
+    os.environ["SECRETKEY"] = "ABC"
+    return {
+        "key1": "value1",
+        "key2": {"key21": "value21"},
+        "key3": "value3",
+        "key4": "value4",
+        "nested": {
+            "morenested": {
+                "key3": "nestedvalue",
+                "secretkey_from_var": "SECRETKEY",
+                "nestnest": {"leaf": "value"},
+            },
+            "list": [1, 2, 3, {"foo_from_var": "SECRETKEY"}, 4, 5],
+        },
+        "nothing": None,
+        "falsekey": False,
+    }
+
+
+def test_subtree_to_dict(nested_with_var):
+    myconf = RapidastConfigModel(nested_with_var)
+
+    # make a "backup" of the original config, to look for unexpected modification
+    original = copy.deepcopy(myconf.conf)
+
+    d = myconf.subtree_to_dict("nested.morenested")
+    expected = {
+        "key3": "nestedvalue",
+        "secretkey": "ABC",
+        "nestnest": {"leaf": "value"},
+    }
+    assert d == expected
+    # also verify that the original config dictionary was not modified
+    assert original == myconf.conf
+
+    # same test, one layer up
+    d = myconf.subtree_to_dict("nested")
+    expected = {
+        "morenested": {
+            "key3": "nestedvalue",
+            "secretkey": "ABC",
+            "nestnest": {"leaf": "value"},
+        },
+        "list": [1, 2, 3, {"foo": "ABC"}, 4, 5],
+    }
+    assert d == expected
+    # also verify that the original config dictionary was not modified
+    assert original == myconf.conf
+
+    # pointing to a non-dictionary generates a KeyError
+    with pytest.raises(KeyError):
+        myconf.subtree_to_dict("key1")
+
+    # pointing to a non-existing entry returns None
+    d = myconf.subtree_to_dict("nested.foo")
+    assert d is None
+
+
 def test_configmodel_exists(some_nested_config):
     myconf = RapidastConfigModel(some_nested_config)

diff --git a/tests/configmodel/test_convert.py b/tests/configmodel/test_convert.py
index bf8ff206..54b996ae 100644
--- a/tests/configmodel/test_convert.py
+++ b/tests/configmodel/test_convert.py
@@ -38,9 +38,7 @@ def test_v2_to_v3(config_v2):
     newconf = configmodel.converter.convert_from_version_2_to_3(oldconf)

     # Check that new path was created
-    assert newconf.get("scanners.zap.miscOptions.updateAddons", "x") == oldconf.get(
-        "scanners.zap.updateAddons", "y"
-    )
+    assert newconf.get("scanners.zap.miscOptions.updateAddons", "x") == oldconf.get("scanners.zap.updateAddons", "y")

     # Check that old path was deleted
     assert not newconf.exists("scanners.zap.updateAddons")
@@ -60,9 +58,9 @@ def test_v4_to_v5(config_v4):
     newconf = configmodel.converter.convert_from_version_4_to_5(oldconf)

     # Check that new path was created
-    assert newconf.get(
-        "scanners.zap.miscOptions.oauth2ManualDownload", "x"
-    ) == oldconf.get("scanners.zap.miscOptions.oauth2OpenapiManualDownload", "y")
+    assert newconf.get("scanners.zap.miscOptions.oauth2ManualDownload", "x") == oldconf.get(
+        "scanners.zap.miscOptions.oauth2OpenapiManualDownload", "y"
+    )

     # Check that old path was deleted
     assert not newconf.exists("scanners.zap.miscOptions.oauth2OpenapiManualDownload")
@@ -79,12 +77,8 @@ def test_v1_to_v2(config_v1):

 def test_v0_to_v1(config_v0):
     conf_v1 = configmodel.converter.convert_from_version_0_to_1(config_v0)

-    assert conf_v1.get("application.shortName", "x") ==
config_v0.get( - "general.serviceName", "y" - ) - assert conf_v1.get("scanners.zap.activeScan.policy", "x") == config_v0.get( - "scan.policies.scanPolicyName", "y" - ) + assert conf_v1.get("application.shortName", "x") == config_v0.get("general.serviceName", "y") + assert conf_v1.get("scanners.zap.activeScan.policy", "x") == config_v0.get("scan.policies.scanPolicyName", "y") def test_basic_config_updater(): @@ -97,10 +91,7 @@ def test_basic_config_updater(): oldest = configmodel.RapidastConfigModel({}) last = configmodel.converter.update_to_latest_config(oldest) - assert ( - int(last.get("config.configVersion")) - == configmodel.converter.CURR_CONFIG_VERSION - ) + assert int(last.get("config.configVersion")) == configmodel.converter.CURR_CONFIG_VERSION if __name__ == "__main__": diff --git a/tests/exports/test_google_cloud_storage.py b/tests/exports/test_google_cloud_storage.py index f438cf89..beabb5ea 100644 --- a/tests/exports/test_google_cloud_storage.py +++ b/tests/exports/test_google_cloud_storage.py @@ -1,11 +1,12 @@ -import pytest - -from unittest.mock import Mock, MagicMock, patch, mock_open - import datetime +from unittest.mock import MagicMock +from unittest.mock import Mock +from unittest.mock import mock_open +from unittest.mock import patch -from exports.google_cloud_storage import GoogleCloudStorage +import pytest +from exports.google_cloud_storage import GoogleCloudStorage @patch("exports.google_cloud_storage.storage.Client.from_service_account_json") @@ -21,6 +22,7 @@ def test_GCS_simple_init_keyfile(mock_from_json): mock_from_json.assert_called_once_with("/key/file.json") mock_client.get_bucket.assert_called_once_with("bucket_name") + @patch("exports.google_cloud_storage.storage.Client") def test_GCS_simple_init_no_keyfile(mock_client): gcs = GoogleCloudStorage("bucket_name", "app_name", "directory_name") @@ -33,7 +35,6 @@ def test_GCS_simple_init_no_keyfile(mock_client): @patch("exports.google_cloud_storage.storage.Client") @patch("exports.google_cloud_storage.uuid") def test_GCS_create_metadata(mock_uuid, mock_client): - mock_uuid.uuid1.return_value = 123 gcs = GoogleCloudStorage("bucket_name", "app_name", "directory_name") @@ -62,7 +63,7 @@ def test_GCS_export_scan(MockRandom, MockDateTime, MockClient): # Forcing the date mock_now = MagicMock() - mock_now.isoformat.return_value = '2024-01-31T00:00:00' + mock_now.isoformat.return_value = "2024-01-31T00:00:00" MockDateTime.now.return_value = mock_now # catching the Client @@ -83,10 +84,7 @@ def test_GCS_export_scan(MockRandom, MockDateTime, MockClient): gcs = GoogleCloudStorage("bucket_name", "app_name", "directory_name") - import_data = { - "scan_type": "ABC", - "foo": "bar" - } + import_data = {"scan_type": "ABC", "foo": "bar"} # hack: use the pytest file itself as a scan gcs.export_scan(import_data, __file__) diff --git a/tests/scanners/generic/test_generic.py b/tests/scanners/generic/test_generic.py index 242428ce..118896c0 100644 --- a/tests/scanners/generic/test_generic.py +++ b/tests/scanners/generic/test_generic.py @@ -19,9 +19,7 @@ def test_generic_podman_cli(test_config): scanner = GenericPodman(config=test_config) scanner.setup() - assert {"podman", "run", "--name", "myimage", "--pod", "myPod"}.issubset( - set(scanner.podman.get_complete_cli()) - ) + assert {"podman", "run", "--name", "myimage", "--pod", "myPod"}.issubset(set(scanner.podman.get_complete_cli())) def test_generic_podman_volume(test_config): diff --git a/tests/scanners/generic/tools/test_convert_trivy_k8s.py 
b/tests/scanners/generic/tools/test_convert_trivy_k8s.py index 2cbe3cde..c9ab63b3 100644 --- a/tests/scanners/generic/tools/test_convert_trivy_k8s.py +++ b/tests/scanners/generic/tools/test_convert_trivy_k8s.py @@ -4,6 +4,7 @@ import pytest from scanners.generic.tools.convert_trivy_k8s_to_sarif import convert_json_to_sarif +from scanners.generic.tools.convert_trivy_k8s_to_sarif import read_json_block TEST_DATA_DIR = "tests/scanners/generic/tools/test_data_convert_trivy_k8s/" @@ -20,6 +21,14 @@ def _assert_default_sarif_info(sarif): return True +def test_read_json_block(): + json_file = TEST_DATA_DIR + "sample-single-result.json" + json_assert = json.load(open(json_file)) + + json_test = read_json_block(json_file) + assert json_test == json_assert + + def test_convert_json_to_sarif(): json_file = TEST_DATA_DIR + "sample-single-result.json" json_data = json.load(open(json_file)) @@ -55,3 +64,20 @@ def test_empty_json(): json_data = json.loads("[]") assert _assert_default_sarif_info(convert_json_to_sarif(json_data)) + + +def test_convert_json_to_sarif_no_duplicate_rules_with_same_id(): + json_file = TEST_DATA_DIR + "sample-misconfig-findings-with-same-rule.json" + json_data = json.load(open(json_file, encoding="utf-8")) + + expected_rules = [ + {"id": "RULE001", "name": "First Rule Title", "shortDescription": {"text": "First rule description"}} + ] + + sarif_result = convert_json_to_sarif(json_data) + + assert sarif_result["runs"][0]["tool"]["driver"]["rules"] == expected_rules + + assert len(sarif_result["runs"][0]["results"]) == 2 + assert sarif_result["runs"][0]["results"][0]["ruleId"] == "RULE001" + assert sarif_result["runs"][0]["results"][1]["ruleId"] == "RULE001" diff --git a/tests/scanners/generic/tools/test_data_convert_trivy_k8s/sample-misconfig-findings-with-same-rule.json b/tests/scanners/generic/tools/test_data_convert_trivy_k8s/sample-misconfig-findings-with-same-rule.json new file mode 100644 index 00000000..0b8f6436 --- /dev/null +++ b/tests/scanners/generic/tools/test_data_convert_trivy_k8s/sample-misconfig-findings-with-same-rule.json @@ -0,0 +1,46 @@ +{ + "ClusterName": "cluster1", + "Resources": [ + { + "Results": [ + { + "Target": "resource1.yaml", + "Misconfigurations": [ + { + "ID": "RULE001", + "Severity": "HIGH", + "Message": "First message", + "Title": "First Rule Title", + "Description": "First rule description", + "CauseMetadata": { + "StartLine": 1, + "EndLine": 2, + "Code": { + "Lines": [ + "some code line" + ] + } + } + }, + { + "ID": "RULE001", + "Severity": "HIGH", + "Message": "Second message", + "Title": "First Rule Title", + "Description": "First rule description", + "CauseMetadata": { + "StartLine": 3, + "EndLine": 4, + "Code": { + "Lines": [ + "another code line" + ] + } + } + } + ] + } + ] + } + ] +} diff --git a/tests/scanners/generic/tools/test_oobtkube.py b/tests/scanners/generic/tools/test_oobtkube.py index f71329ae..06c76b19 100644 --- a/tests/scanners/generic/tools/test_oobtkube.py +++ b/tests/scanners/generic/tools/test_oobtkube.py @@ -35,22 +35,15 @@ def test_find_leaf_keys_and_test(mock_system, test_data, caplog): total_leaf_keys = oobtkube.count_total_leaf_keys(test_data) - oobtkube.find_leaf_keys_and_test( - test_data, "cr_test_file", "10.10.10.10", "12345", total_leaf_keys - ) + oobtkube.find_leaf_keys_and_test(test_data, "cr_test_file", "10.10.10.10", "12345", total_leaf_keys) processed_count = 0 leaves = ["leaf1", "leaf2", "leaf3"] for leaf_key in leaves: processed_count += 1 - assert ( - f"Testing a leaf key: '{leaf_key}', 
({processed_count} / {total_leaf_keys})" - in caplog.text - ) - - assert ( - mock_system.call_count == 6 - ) # Each leaf key runs `sed` and `kubectl` commands (2 calls per key) + assert f"Testing a leaf key: '{leaf_key}', ({processed_count} / {total_leaf_keys})" in caplog.text + + assert mock_system.call_count == 6 # Each leaf key runs `sed` and `kubectl` commands (2 calls per key) def test_parse_resource_yaml(): diff --git a/tests/scanners/test_downloaders.py b/tests/scanners/test_downloaders.py index 36ac95de..e49597a6 100644 --- a/tests/scanners/test_downloaders.py +++ b/tests/scanners/test_downloaders.py @@ -1,12 +1,12 @@ -from unittest.mock import Mock +from collections import namedtuple from unittest.mock import MagicMock +from unittest.mock import Mock from unittest.mock import patch import pytest from scanners import downloaders -from collections import namedtuple @pytest.fixture(scope="function") def my_auth(): @@ -16,6 +16,7 @@ def my_auth(): "rtoken": "aut_rtoken", } + @pytest.fixture(scope="function") def my_proxy(): proxy = { @@ -23,13 +24,13 @@ def my_proxy(): "proxyPort": "proxyPort", } + @patch("scanners.downloaders.requests.get") def test_anonymous_download(mock_get, my_proxy): - def request_get(url, allow_redirects=True, proxies=None): + def request_get(url, allow_redirects=True, proxies=None, verify=True): Response = namedtuple("Response", ["status_code", "content"]) return Response(status_code=200, content="content") - mock_get.side_effect = request_get ret = downloaders.anonymous_download("url", dest=None, proxy=my_proxy) @@ -37,43 +38,36 @@ def request_get(url, allow_redirects=True, proxies=None): assert ret == "content" - - @patch("scanners.downloaders.requests.Session") def test_oauth2_get_token_from_rtoken(mock_session, my_auth, my_proxy): - def fake_Session(): - def fake_post(url, **kwargs): + class fake_Session: + def post(self, url, **kwargs): Post = namedtuple("Post", ["raise_for_status", "text"]) return Post(raise_for_status=lambda: None, text=b"{'access_token':123}") - Session = namedtuple("Session", ["post"]) - return Session(post=fake_post) - mock_session.side_effect = fake_Session rtoken = downloaders.oauth2_get_token_from_rtoken(auth=my_auth, proxy=my_proxy, session=None) assert rtoken == 123 + @patch("scanners.downloaders.requests.Session") @patch("scanners.downloaders.oauth2_get_token_from_rtoken") @patch("builtins.open") def test_authenticated_download_with_rtoken(mock_open, mock_get_rtoken, mock_session, my_auth, my_proxy): - def fake_Session(): - def fake_post(url, **kwargs): + class fake_Session: + def post(self, url, **kwargs): Post = namedtuple("Post", ["raise_for_status", "text"]) return Post(raise_for_status=lambda: None, text=b"{'access_token':123}") - def fake_get(url, **kwargs): + + def get(self, url, **kwargs): Get = namedtuple("Get", ["status_code", "text"]) return Get(status_code=200, text="text") - Session = namedtuple("Session", ["post", "get"]) - return Session(post=fake_post, get=fake_get) - mock_session.side_effect = fake_Session mock_get_rtoken.return_value = "123" mock_open.return_value = MagicMock() res = downloaders.authenticated_download_with_rtoken("url", "Nowhere", auth=my_auth, proxy=my_proxy) assert res == True - diff --git a/tests/scanners/test_path_translators.py b/tests/scanners/test_path_translators.py index 9f47418e..9b78a813 100644 --- a/tests/scanners/test_path_translators.py +++ b/tests/scanners/test_path_translators.py @@ -7,10 +7,5 @@ def test_path_translation(): id3 = ("id3", "/z/x/c/v", "/b/n/m") path_map = 
make_mapping_for_scanner("Test", id1, id2, id3) - assert ( - path_map.host_2_container("/a/s/d/f/g/subdir/myfile") - == "/h/j/k/l/subdir/myfile" - ) - assert ( - path_map.container_2_host("/b//n/m/subdir/myfile") == "/z/x/c/v/subdir/myfile" - ) + assert path_map.host_2_container("/a/s/d/f/g/subdir/myfile") == "/h/j/k/l/subdir/myfile" + assert path_map.container_2_host("/b//n/m/subdir/myfile") == "/z/x/c/v/subdir/myfile" diff --git a/tests/scanners/test_podman_wrapper.py b/tests/scanners/test_podman_wrapper.py index 2077ed21..7623a884 100644 --- a/tests/scanners/test_podman_wrapper.py +++ b/tests/scanners/test_podman_wrapper.py @@ -1,18 +1,70 @@ import shutil +import subprocess +from unittest.mock import patch import pytest from scanners.podman_wrapper import PodmanWrapper -@pytest.mark.skipif( - shutil.which("podman") == False, reason="podman is required for this test" -) -def test_podman_mappings(): +@patch("scanners.podman_wrapper.subprocess.run") +def test_change_user_id(mock_subprocess): wrap = PodmanWrapper(app_name="pytest", scan_name="pytest", image="nothing") + version = '{"Client":{"APIVersion":"5.2.2","Version":"5.2.2","GoVersion":"go1.22.6","GitCommit":"","BuiltTime":"Wed Aug 21 02:00:00 2024","Built":1724198400,"OsArch":"linux/amd64","Os":"linux"}}' + run = subprocess.CompletedProcess(args=None, returncode=0, stdout=version.encode("utf-8")) + + mock_subprocess.return_value = run + wrap.change_user_id(1000, 1000) + i = wrap.opts.index("--userns") + assert wrap.opts[i + 1] == "keep-id:uid=1000,gid=1000" + + +@patch("scanners.podman_wrapper.subprocess.run") +def test_change_user_id_workaround(mock_subprocess): + wrap = PodmanWrapper(app_name="pytest", scan_name="pytest", image="nothing") + + info = """ +{ + "host": { + "idMappings": { + "gidmap": [ + { + "container_id": 0, + "host_id": 1000, + "size": 1 + }, + { + "container_id": 1, + "host_id": 524288, + "size": 65536 + } + ], + "uidmap": [ + { + "container_id": 0, + "host_id": 1000, + "size": 1 + }, + { + "container_id": 1, + "host_id": 524288, + "size": 65536 + } + ] + } + } +} +""" + + run = subprocess.CompletedProcess(args=None, returncode=0, stdout=info.encode("utf-8")) + + mock_subprocess.return_value = run + + wrap.change_user_id_workaround(1000, 1000) + assert "--uidmap" in wrap.opts assert "0:1:1000" in wrap.opts assert "--gidmap" in wrap.opts diff --git a/tests/scanners/zap/test_setup.py b/tests/scanners/zap/test_setup.py index c4ed74f0..b9ddde13 100644 --- a/tests/scanners/zap/test_setup.py +++ b/tests/scanners/zap/test_setup.py @@ -16,9 +16,7 @@ @pytest.fixture(scope="function") def test_config(): - return configmodel.RapidastConfigModel( - {"application": {"url": "http://example.com"}} - ) + return configmodel.RapidastConfigModel({"application": {"url": "http://example.com"}}) ## Basic test @@ -29,10 +27,7 @@ def test_setup_openapi(test_config): test_zap.setup() # a '/' should have been appended - assert ( - test_zap.automation_config["env"]["contexts"][0]["urls"][0] - == "http://example.com/" - ) + assert test_zap.automation_config["env"]["contexts"][0]["urls"][0] == "http://example.com/" for item in test_zap.automation_config["jobs"]: if item["type"] == "openapi": @@ -85,9 +80,7 @@ def test_setup_authentication_invalid_auth_configured(test_config): test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") 
print(test_config) @@ -105,9 +98,7 @@ def test_setup_authentication_http_header(test_config): } test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -125,9 +116,7 @@ def test_setup_authentication_cookie(test_config): } test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -144,9 +133,7 @@ def test_setup_authentication_http_basic(test_config): } test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -168,9 +155,7 @@ def test_setup_authentication_auth_rtoken_configured(test_config): test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -179,10 +164,7 @@ def test_setup_authentication_auth_rtoken_configured(test_config): test_zap.setup() assert test_zap.authenticated == True # TODO: check "RTOKEN" - assert ( - test_zap.automation_config["jobs"][0]["parameters"]["name"] - == "add-bearer-token" - ) + assert test_zap.automation_config["jobs"][0]["parameters"]["name"] == "add-bearer-token" def test_setup_authentication_auth_rtoken_preauth(test_config): @@ -199,9 +181,7 @@ def test_setup_authentication_auth_rtoken_preauth(test_config): test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") test_zap = ZapNone(config=test_config) @@ -224,9 +204,7 @@ def test_setup_import_urls(test_config): def test_setup_exclude_urls(test_config): test_config.set("scanners.zap.urls.excludes", ["abc", "def"]) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") test_zap = ZapNone(config=test_config) test_zap.setup() @@ -237,9 +215,7 @@ def test_setup_exclude_urls(test_config): def test_setup_include_urls(test_config): test_config.set("scanners.zap.urls.includes", ["abc", "def"]) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") test_zap = ZapNone(config=test_config) test_zap.setup() @@ -248,19 +224,40 @@ def test_setup_include_urls(test_config): assert "def" in find_context(test_zap.automation_config)["includePaths"] +def test_setup_active_scan(test_config): + test_config.set("scanners.zap.activeScan.maxRuleDurationInMins", 10) + + test_zap = ZapNone(config=test_config) + test_zap.setup() + + for item in test_zap.automation_config["jobs"]: + if item["type"] == "activeScan": + assert item["parameters"]["policy"] == 
"API-scan-minimal" + assert item["parameters"]["maxRuleDurationInMins"] == 10 + assert item["parameters"]["context"] == "Default Context" + assert item["parameters"]["user"] == "" + break + else: + assert False + + def test_setup_ajax(test_config): test_config.set("scanners.zap.spiderAjax.maxDuration", 10) test_config.set("scanners.zap.spiderAjax.url", "http://test.com") test_config.set("scanners.zap.spiderAjax.browserId", "chrome-headless") + test_config.set("scanners.zap.spiderAjax.maxCrawlState", 3) test_zap = ZapNone(config=test_config) test_zap.setup() for item in test_zap.automation_config["jobs"]: if item["type"] == "spiderAjax": + assert item["parameters"]["context"] == "Default Context" assert item["parameters"]["maxDuration"] == 10 + assert item["parameters"]["user"] == "" assert item["parameters"]["url"] == "http://test.com" assert item["parameters"]["browserId"] == "chrome-headless" + assert item["parameters"]["maxCrawlState"] == 3 break else: assert False @@ -281,10 +278,7 @@ def test_setup_graphql(test_config): if item["type"] == "graphql": assert item["parameters"]["endpoint"] == TEST_GRAPHQL_ENDPOINT assert item["parameters"]["schemaUrl"] == TEST_GRAPHQL_SCHEMA_URL - assert ( - item["parameters"]["schemaFile"] - == f"{test_zap.container_work_dir}/schema.graphql" - ) + assert item["parameters"]["schemaFile"] == f"{test_zap.container_work_dir}/schema.graphql" break else: assert False, "graphql job not found" @@ -384,18 +378,14 @@ def test_setup_override_cfg(test_config): override_cfg1 = "formhandler.fields.field(0).fieldId=namespace" override_cfg2 = "formhandler.fields.field(0).value=default" - test_config.set( - "scanners.zap.miscOptions.overrideConfigs", [override_cfg1, override_cfg2] - ) + test_config.set("scanners.zap.miscOptions.overrideConfigs", [override_cfg1, override_cfg2]) test_zap = ZapNone(config=test_config) test_zap.setup() assert f"{override_cfg1}" in test_zap.zap_cli assert f"{override_cfg2}" in test_zap.zap_cli - assert r"formhandler.fields.field\(0\)" in test_zap._zap_cli_list_to_str_for_sh( - test_zap.zap_cli - ) + assert r"formhandler.fields.field\(0\)" in test_zap._zap_cli_list_to_str_for_sh(test_zap.zap_cli) def test_setup_override_non_list_format(test_config): diff --git a/tests/scanners/zap/test_setup_none.py b/tests/scanners/zap/test_setup_none.py index b6069e86..bdb953a8 100644 --- a/tests/scanners/zap/test_setup_none.py +++ b/tests/scanners/zap/test_setup_none.py @@ -12,9 +12,7 @@ @pytest.fixture(scope="function") def test_config(): - return configmodel.RapidastConfigModel( - {"application": {"url": "http://example.com"}} - ) + return configmodel.RapidastConfigModel({"application": {"url": "http://example.com"}}) @patch("scanners.zap.zap_none.platform.system") @@ -37,9 +35,7 @@ def test_none_handling_ajax(mock_warning, mock_disk_usage, mock_system, test_con test_zap._setup_ajax_spider() mock_pidsmax.assert_called_once_with("/sys/fs/cgroup/pids.max", encoding="utf-8") - mock_warning.assert_any_call( - "Number of threads may be too low for SpiderAjax: cgroupv2 pids.max=42" - ) + mock_warning.assert_any_call("Number of threads may be too low for SpiderAjax: cgroupv2 pids.max=42") mock_warning.assert_any_call( "Insufficient shared memory to run an Ajax Spider correctly (67108864 bytes). 
" "Make sure that /dev/shm/ is at least 1GB in size [ideally at least 2GB]" @@ -56,11 +52,7 @@ def test_zap_none_postprocess(mock_tarfile, mock_copytree, mock_warning, test_co with patch("builtins.open", mock_open(read_data="max 2\n")) as mock_pidsevents: test_zap.postprocess() - mock_pidsevents.assert_called_once_with( - "/sys/fs/cgroup/pids.events", encoding="utf-8" - ) - mock_warning.assert_any_call( - "Scanner may have been throttled by CGroupv2 PID limits: pids.events reports max 2" - ) + mock_pidsevents.assert_called_once_with("/sys/fs/cgroup/pids.events", encoding="utf-8") + mock_warning.assert_any_call("Scanner may have been throttled by CGroupv2 PID limits: pids.events reports max 2") assert test_zap.state == State.PROCESSED diff --git a/tests/scanners/zap/test_setup_podman.py b/tests/scanners/zap/test_setup_podman.py index aebc24fe..550458ac 100644 --- a/tests/scanners/zap/test_setup_podman.py +++ b/tests/scanners/zap/test_setup_podman.py @@ -15,9 +15,7 @@ @pytest.fixture(scope="function") def test_config(): - return configmodel.RapidastConfigModel( - {"application": {"url": "http://example.com"}} - ) + return configmodel.RapidastConfigModel({"application": {"url": "http://example.com"}}) ## Testing Authentication methods ## @@ -29,9 +27,7 @@ def test_setup_podman_authentication_invalid_auth_configured(test_config): test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -49,9 +45,7 @@ def test_setup_podman_authentication_http_header(test_config): } test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -69,19 +63,14 @@ def test_setup_podman_authentication_cookie(test_config): } test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) test_zap = ZapPodman(config=test_config) test_zap.setup() assert test_zap.authenticated == False - assert ( - "ZAP_AUTH_HEADER_VALUE=mycookiename=mycookieval" - in test_zap.podman.get_complete_cli() - ) + assert "ZAP_AUTH_HEADER_VALUE=mycookiename=mycookieval" in test_zap.podman.get_complete_cli() def test_setup_podman_authentication_http_basic(test_config): @@ -91,19 +80,14 @@ def test_setup_podman_authentication_http_basic(test_config): } test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) test_zap = ZapPodman(config=test_config) test_zap.setup() assert test_zap.authenticated == False - assert ( - "ZAP_AUTH_HEADER_VALUE=Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==" - in test_zap.podman.get_complete_cli() - ) + assert "ZAP_AUTH_HEADER_VALUE=Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==" in test_zap.podman.get_complete_cli() def test_setup_podman_authentication_auth_rtoken_configured(test_config): @@ -123,9 +107,7 @@ def 
test_setup_podman_authentication_auth_rtoken_configured(test_config): test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") print(test_config) @@ -134,10 +116,7 @@ def test_setup_podman_authentication_auth_rtoken_configured(test_config): test_zap.setup() assert test_zap.authenticated == True assert "RTOKEN" in test_zap.podman.get_complete_cli() - assert ( - test_zap.automation_config["jobs"][0]["parameters"]["name"] - == "add-bearer-token" - ) + assert test_zap.automation_config["jobs"][0]["parameters"]["name"] == "add-bearer-token" def test_setup_podman_authentication_auth_rtoken_preauth(test_config): @@ -154,9 +133,7 @@ def test_setup_podman_authentication_auth_rtoken_preauth(test_config): test_config.set("general.authentication", authentication) - test_config.merge( - test_config.get("general", default={}), preserve=False, root=f"scanners.zap" - ) + test_config.merge(test_config.get("general", default={}), preserve=False, root=f"scanners.zap") test_zap = ZapPodman(config=test_config) diff --git a/tests/test_defectdojo_integration.py b/tests/test_defectdojo_integration.py index c6b9e025..876ab868 100644 --- a/tests/test_defectdojo_integration.py +++ b/tests/test_defectdojo_integration.py @@ -36,8 +36,6 @@ def test_dd_parameters(): with pytest.raises(KeyError): defect_d.params["verify"] - defect_d = DefectDojo( - "https://127.0.0.1:12345", token="random_token", ssl="CAbundle" - ) + defect_d = DefectDojo("https://127.0.0.1:12345", token="random_token", ssl="CAbundle") assert defect_d.params["timeout"] == DefectDojo.DD_CONNECT_TIMEOUT assert defect_d.params["verify"] == "CAbundle" diff --git a/tools/updater_config.py b/tools/updater_config.py index 61206bda..4472c258 100755 --- a/tools/updater_config.py +++ b/tools/updater_config.py @@ -39,14 +39,10 @@ args.loglevel = args.loglevel.upper() add_logging_level("VERBOSE", logging.DEBUG + 5) logging.basicConfig(format="%(levelname)s:%(message)s", level=args.loglevel) - logging.debug( - f"log level set to debug. Config file: '{parser.parse_args().config_file.name}'" - ) + logging.debug(f"log level set to debug. Config file: '{parser.parse_args().config_file.name}'") try: - config = configmodel.RapidastConfigModel( - yaml.safe_load(parser.parse_args().config_file) - ) + config = configmodel.RapidastConfigModel(yaml.safe_load(parser.parse_args().config_file)) except yaml.YAMLError as exc: raise RuntimeError( f"Something went wrong while parsing one of the config '{parser.parse_args().config_file}':\n {str(exc)}" diff --git a/utils/remove_openapi_ref_recursion.py b/utils/remove_openapi_ref_recursion.py index 9520990c..fdff3148 100644 --- a/utils/remove_openapi_ref_recursion.py +++ b/utils/remove_openapi_ref_recursion.py @@ -57,9 +57,7 @@ def main(input_file, output_file, debug): if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Remove recursive $ref from OpenAPI JSON file." 
- ) + parser = argparse.ArgumentParser(description="Remove recursive $ref from OpenAPI JSON file.") parser.add_argument("-f", "--file", required=True, help="Input OpenAPI JSON file") parser.add_argument( "-o", @@ -67,9 +65,7 @@ def main(input_file, output_file, debug): default="cleaned_openapi.json", help="Output file for cleaned OpenAPI JSON (default: cleaned_openapi.json)", ) - parser.add_argument( - "-d", "--debug", action="store_true", help="Enable debug messages" - ) + parser.add_argument("-d", "--debug", action="store_true", help="Enable debug messages") args = parser.parse_args()
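
Reviewer note (illustrative sketch, not part of the patch): the new `subtree_to_dict()` helper is the backbone of these changes. Its contract, as pinned down by `test_subtree_to_dict()` above, is summarized below; the import location is an assumption:

```python
import os

from configmodel import RapidastConfigModel  # assumed import path

os.environ["SECRETKEY"] = "ABC"
conf = RapidastConfigModel({"nested": {"morenested": {"key3": "nestedvalue", "secretkey_from_var": "SECRETKEY"}}})

# Returns a plain-dict copy of the subtree; "*_from_var" keys are resolved from
# the environment, and the original config object is left unmodified.
assert conf.subtree_to_dict("nested.morenested") == {"key3": "nestedvalue", "secretkey": "ABC"}

# A missing path yields None, which the _setup_*() methods use to skip optional jobs.
assert conf.subtree_to_dict("nested.absent") is None

# Pointing at a non-dictionary value raises KeyError, e.g.:
# conf.subtree_to_dict("nested.morenested.key3")
```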
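Reviewer note (illustrative sketch, not part of the patch): at config level, scanner job parameters are now copied verbatim from the config subtree into the ZAP Automation Framework job instead of being individually whitelisted. A minimal example in the style of the tests above (key names are taken from those tests; the values are hypothetical):

```python
import configmodel

config = configmodel.RapidastConfigModel({"application": {"url": "http://example.com"}})

# Any key under a job's subtree is forwarded as-is to the Automation Framework,
# so AF parameters that RapiDAST does not know about still reach ZAP:
config.set("scanners.zap.spiderAjax.maxDuration", 10)
config.set("scanners.zap.spiderAjax.maxCrawlState", 3)  # passed through untouched

# Defaults apply only when unset: activeScan.policy = "API-scan-minimal" and
# spiderAjax.browserId = "firefox-headless". The `user` and `context` parameters
# are always enforced by _enforce_job_parameters() and cannot be overridden.

# New knob controlling TLS verification for RapiDAST's own downloads
# (oauth2 rtoken exchange and manual oauth2 downloads); defaults to True:
config.set("config.tls_verify_for_rapidast_downloads", False)
```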