From a7b68a03fadbf7ae5f32ded9f44195181380b575 Mon Sep 17 00:00:00 2001 From: averevki Date: Fri, 24 Nov 2023 16:01:18 +0100 Subject: [PATCH] Fix broken mtls test --- testsuite/httpx/__init__.py | 38 +++++++++---------- .../operator/tls/mtls/test_mtls_identity.py | 24 ++++++------ .../authorino/operator/tls/test_tls.py | 14 +++---- 3 files changed, 37 insertions(+), 39 deletions(-) diff --git a/testsuite/httpx/__init__.py b/testsuite/httpx/__init__.py index 58adc191..cb53930f 100644 --- a/testsuite/httpx/__init__.py +++ b/testsuite/httpx/__init__.py @@ -5,7 +5,7 @@ from typing import Union import backoff -from httpx import Client, ConnectError +from httpx import Client, NetworkError from testsuite.certificates import Certificate @@ -30,27 +30,27 @@ def __init__(self, retry_codes, response=None, error=None): def should_backoff(self): """True, if the Result can be considered an instability and should be retried""" - return self.has_dns_error() or (not self.has_error() and self.status_code in self.retry_codes) + return self.has_dns_error() or (self.error is None and self.status_code in self.retry_codes) - def has_error(self): - """True, if the request failed and an error was returned""" - return self.error is not None + def has_error(self, error_msg: str) -> bool: + """True, if the request failed and an error with message was returned""" + return self.error is not None and len(self.error.args) > 0 and any(error_msg in arg for arg in self.error.args) def has_dns_error(self): """True, if the result failed due to DNS failure""" - return ( - self.has_error() - and len(self.error.args) > 0 - and any("Name or service not known" in arg for arg in self.error.args) - ) - - def has_tls_error(self): - """True, if the result failed due to TLS failure""" - return ( - self.has_error() - and len(self.error.args) > 0 - and any("SSL: CERTIFICATE_VERIFY_FAILED" in arg for arg in self.error.args) - ) + return self.has_error("Name or service not known") + + def has_tls_cert_verify_error(self): + """True, if the result failed due to TLS certificate verification failure""" + return self.has_error("SSL: CERTIFICATE_VERIFY_FAILED") + + def has_tls_unknown_ca_error(self): + """True, if the result failed due to TLS unknown certificate authority failure""" + return self.has_error("SSL: TLSV1_ALERT_UNKNOWN_CA") + + def has_tls_cert_required_error(self): + """True, if the result failed due to TLS certificate absense failure""" + return self.has_error("SSL: TLSV13_ALERT_CERTIFICATE_REQUIRED") def __getattr__(self, item): """For backwards compatibility""" @@ -127,7 +127,7 @@ def request( extensions=extensions, ) return Result(self.retry_codes, response=response) - except ConnectError as e: + except NetworkError as e: return Result(self.retry_codes, error=e) def get(self, *args, **kwargs) -> Result: diff --git a/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py b/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py index cafb3505..649efebf 100644 --- a/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py +++ b/testsuite/tests/kuadrant/authorino/operator/tls/mtls/test_mtls_identity.py @@ -1,6 +1,8 @@ """mTLS authentication tests""" +from typing import Callable import pytest -from httpx import ReadError, ConnectError + +from testsuite.httpx import Result def test_mtls_success(envoy_authority, valid_cert, hostname): @@ -11,24 +13,22 @@ def test_mtls_success(envoy_authority, valid_cert, hostname): @pytest.mark.parametrize( - "cert_authority, certificate, err, err_match", + "cert_authority, certificate, has_error", [ - pytest.param("envoy_authority", "self_signed_cert", ReadError, "unknown ca", id="Self-Signed Certificate"), - pytest.param("envoy_authority", "invalid_cert", ReadError, "unknown ca", id="Invalid certificate"), - pytest.param("envoy_authority", None, ReadError, "certificate required", id="Without certificate"), - pytest.param( - "invalid_authority", "valid_cert", ConnectError, "certificate verify failed", id="Unknown authority" - ), + pytest.param("envoy_authority", "self_signed_cert", Result.has_tls_unknown_ca_error, id="Self-signed cert"), + pytest.param("envoy_authority", "invalid_cert", Result.has_tls_unknown_ca_error, id="Invalid certificate"), + pytest.param("envoy_authority", None, Result.has_tls_cert_required_error, id="Without certificate"), + pytest.param("invalid_authority", "valid_cert", Result.has_tls_cert_verify_error, id="Unknown authority"), ], ) -def test_mtls_fail(request, cert_authority, certificate, err, err_match: str, hostname): +def test_mtls_fail(request, cert_authority, certificate, has_error: Callable, hostname): """Test failed mtls verification""" ca = request.getfixturevalue(cert_authority) cert = request.getfixturevalue(certificate) if certificate else None - with pytest.raises(err, match=err_match): - with hostname.client(verify=ca, cert=cert) as client: - client.get("/get") + with hostname.client(verify=ca, cert=cert) as client: + result = client.get("/get") + assert has_error(result) def test_mtls_unmatched_attributes(envoy_authority, custom_cert, hostname): diff --git a/testsuite/tests/kuadrant/authorino/operator/tls/test_tls.py b/testsuite/tests/kuadrant/authorino/operator/tls/test_tls.py index 96c7e23a..fda0aec8 100644 --- a/testsuite/tests/kuadrant/authorino/operator/tls/test_tls.py +++ b/testsuite/tests/kuadrant/authorino/operator/tls/test_tls.py @@ -1,6 +1,4 @@ """Tests that envoy deployed with TLS security works with Authorino""" -import pytest -from httpx import ReadError def test_valid_certificate(envoy_authority, valid_cert, auth, hostname): @@ -12,13 +10,13 @@ def test_valid_certificate(envoy_authority, valid_cert, auth, hostname): def test_no_certificate(hostname, envoy_authority): """Test that request without certificate will be rejected""" - with pytest.raises(ReadError, match="certificate required"): - with hostname.client(verify=envoy_authority) as client: - client.get("/get") + with hostname.client(verify=envoy_authority) as client: + result = client.get("/get") + result.has_tls_cert_required_error() def test_invalid_certificate(envoy_authority, invalid_cert, auth, hostname): """Tests that certificate with different CA will be rejeceted""" - with pytest.raises(ReadError, match="unknown ca"): - with hostname.client(verify=envoy_authority, cert=invalid_cert) as client: - client.get("/get", auth=auth) + with hostname.client(verify=envoy_authority, cert=invalid_cert) as client: + result = client.get("/get", auth=auth) + result.has_tls_unknown_ca_error()