diff --git a/README.md b/README.md index 745bdfaf..4ed58a1a 100644 --- a/README.md +++ b/README.md @@ -97,9 +97,10 @@ The scanners will communicate anonymously with the application This method describes required parameters needed to retrieve an access token, using a refresh token as a secret. + authentication type : `oauth2_rtoken` + parameters : - * `token_endpoint` : the URL to which send the refresh token + * `token_endpoint`: the URL to which send the refresh token * `client_id` : the client ID * `rtoken_var_name`: for practical reasons, the refresh token is provided using environment variables. This entry describes the name of the variable containing the secret refresh token + * `preauth`: Pre-generate a token and force ZAP to use it throughout the session (the session token will not be refreshed after it's expired). Default: False. This is only useful for scans sufficiently short that it will be finished before the token expires - HTTP Basic: This method describes the HTTP Basic Authorization Header. The username and password must be provided in plaintext and will be encoded by the scanners diff --git a/config/config-template-zap-long.yaml b/config/config-template-zap-long.yaml index 17ce0f08..9df37b2e 100644 --- a/config/config-template-zap-long.yaml +++ b/config/config-template-zap-long.yaml @@ -52,6 +52,7 @@ general: client_id: "cloud-services" token_endpoint: "" rtoken_from_var: "RTOKEN" # referring to a env defined in general.environ.envFile + #preauth: false # set to true to pregenerate a token, and stick to it (no refresh) # Other types of authentication: #type: "http_header" #parameters: diff --git a/scanners/downloaders.py b/scanners/downloaders.py index 08e534aa..45b0a7f9 100644 --- a/scanners/downloaders.py +++ b/scanners/downloaders.py @@ -29,12 +29,15 @@ def anonymous_download(url, dest=None, proxy=None): return resp.content -def authenticated_download_with_rtoken(url, dest, auth, proxy=None): - """Given a URL and Oauth2 authentication parameters, download the URL and store it at `dest`""" +def oauth2_get_token_from_rtoken(auth, proxy=None, session=None): + """Given a rtoken, retrieve and return a Bearer token + auth is in the form { url, client_id, rtoken } - session = requests.Session() + """ + + if session is None: + session = requests.Session() - # get a token headers = { "Accept": "application/json", "Content-Type": "application/x-www-form-urlencoded", @@ -50,20 +53,44 @@ def authenticated_download_with_rtoken(url, dest, auth, proxy=None): "http": f"http://{proxy['proxyHost']}:{proxy['proxyPort']}", } - resp = session.post(auth["url"], data=payload, headers=headers, proxies=proxy) - - if resp.status_code != 200: - logging.warning( - f"Unable to get a bearer token. Aborting manual download for {url}" + try: + resp = session.post(auth["url"], data=payload, headers=headers, proxies=proxy) + resp.raise_for_status() + except requests.exceptions.ConnectTimeout: + logging.error( + "Getting oauth2 token failed: server unresponsive. Check the Authentication URL parameters" ) return False + except requests.exceptions.HTTPError as e: + logging.error(f"Getting token failed: Check the RTOKEN. err details: {e}") + return False try: token = yaml.safe_load(resp.text)["access_token"] - except Exception as exc: - raise RuntimeError( + except KeyError as exc: + logging.error( f"Unable to extract access token from OAuth2 authentication:\n {str(exc)}" - ) from exc + ) + return False + + return token + + +def authenticated_download_with_rtoken(url, dest, auth, proxy=None): + """Given a URL and Oauth2 authentication parameters, download the URL and store it at `dest`""" + + session = requests.Session() + + # get a token + if proxy: + proxy = { + "https": f"http://{proxy['proxyHost']}:{proxy['proxyPort']}", + "http": f"http://{proxy['proxyHost']}:{proxy['proxyPort']}", + } + token = oauth2_get_token_from_rtoken(auth, proxy, session) + if not token: + return False + authenticated_headers = {"Authorization": f"Bearer {token}"} resp = session.get(url, proxies=proxy, headers=authenticated_headers) diff --git a/scanners/zap/zap.py b/scanners/zap/zap.py index 65d00915..da1e6a30 100644 --- a/scanners/zap/zap.py +++ b/scanners/zap/zap.py @@ -13,7 +13,7 @@ from scanners import RapidastScanner from scanners.authentication_factory import generic_authentication_factory from scanners.downloaders import authenticated_download_with_rtoken - +from scanners.downloaders import oauth2_get_token_from_rtoken CLASSNAME = "Zap" @@ -731,6 +731,9 @@ def authentication_set_oauth2_rtoken(self): - Sets a "script" (httpsender) job, which will inject the latest token retrieved + Except if `preauth` is set. In that case, generate a token, and + enforce its use (warning: it will not be regenerated after expiration) + Returns True as it creates a ZAP user """ @@ -741,6 +744,33 @@ def authentication_set_oauth2_rtoken(self): rtoken = self.my_conf(f"{params_path}.rtoken", None) scripts_dir = self.container_scripts_dir + # Sometimes, rtoken causes issues + # workaround: pre-generate 1 token, and enforce its use + # Downside: it will not be refreshed after expiring + if self.my_conf(f"{params_path}.preauth"): + logging.debug("Oauth2/rtoken: preauthenticating mode") + auth = { + "client_id": client_id, + "rtoken": rtoken, + "url": token_endpoint, + } + token = oauth2_get_token_from_rtoken(auth, proxy=self.my_conf("proxy")) + if token: + # Delete previous config, and creating a new one + logging.debug( + "successfully retrieved a token, hijacking authentication" + ) + self.set_my_conf("authentication.type", "http_header") + self.set_my_conf(f"{params_path}", {}) + self.set_my_conf(f"{params_path}.name", "Authorization") + self.set_my_conf(f"{params_path}.value", f"Bearer {token}") + # re-run authentication + return self.authentication_factory() + else: + logging.warning( + "Preauthentication failed, continuing with regular oauth2" + ) + # 1- complete the context: script, verification and user context_["authentication"] = { "method": "script", diff --git a/tests/scanners/zap/test_setup_podman.py b/tests/scanners/zap/test_setup_podman.py index 145a9771..f2518a6a 100644 --- a/tests/scanners/zap/test_setup_podman.py +++ b/tests/scanners/zap/test_setup_podman.py @@ -2,6 +2,7 @@ from pathlib import Path import pytest +import requests import configmodel.converter from scanners.zap.zap import find_context @@ -168,6 +169,31 @@ def test_setup_authentication_auth_rtoken_configured(test_config): ) +def test_setup_authentication_auth_rtoken_preauth(test_config): + # Verify that preauth changes the oauth2_rtoken to http_header + authentication = { + "type": "oauth2_rtoken", + "parameters": { + "client_id": "cloud-services", + "token_endpoint": "", + "rtoken_var_name": "RTOKEN", + "preauth": True, + }, + } + + test_config.set("general.authentication", authentication) + + test_config.merge( + test_config.get("general", default={}), preserve=False, root=f"scanners.zap" + ) + + test_zap = ZapPodman(config=test_config) + + with pytest.raises(requests.exceptions.MissingSchema) as e_info: + test_zap.setup() + assert "Invalid URL ''" in str(e_info.value) + + ## Testing APIs & URLs ##