Skip to content

Commit

Permalink
Fix broken mtls test
Browse files Browse the repository at this point in the history
  • Loading branch information
averevki committed Nov 24, 2023
1 parent e1f30b8 commit f685990
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 40 deletions.
38 changes: 19 additions & 19 deletions testsuite/httpx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Union

import backoff
from httpx import Client, ConnectError
from httpx import Client, NetworkError

from testsuite.certificates import Certificate

Expand All @@ -30,27 +30,27 @@ def __init__(self, retry_codes, response=None, error=None):

def should_backoff(self):
"""True, if the Result can be considered an instability and should be retried"""
return self.has_dns_error() or (not self.has_error() and self.status_code in self.retry_codes)
return self.has_dns_error() or (self.error is None and self.status_code in self.retry_codes)

def has_error(self):
"""True, if the request failed and an error was returned"""
return self.error is not None
def has_error(self, error_msg: str) -> bool:
"""True, if the request failed and an error with message was returned"""
return self.error is not None and len(self.error.args) > 0 and any(error_msg in arg for arg in self.error.args)

def has_dns_error(self):
"""True, if the result failed due to DNS failure"""
return (
self.has_error()
and len(self.error.args) > 0
and any("Name or service not known" in arg for arg in self.error.args)
)

def has_tls_error(self):
"""True, if the result failed due to TLS failure"""
return (
self.has_error()
and len(self.error.args) > 0
and any("SSL: CERTIFICATE_VERIFY_FAILED" in arg for arg in self.error.args)
)
return self.has_error("Name or service not known")

def has_tls_cert_verify_error(self):
"""True, if the result failed due to TLS certificate verification failure"""
return self.has_error("SSL: CERTIFICATE_VERIFY_FAILED")

def has_tls_unknown_ca_error(self):
"""True, if the result failed due to TLS unknown certificate authority failure"""
return self.has_error("SSL: TLSV1_ALERT_UNKNOWN_CA")

def has_tls_cert_required_error(self):
"""True, if the result failed due to TLS certificate absense failure"""
return self.has_error("SSL: TLSV13_ALERT_CERTIFICATE_REQUIRED")

def __getattr__(self, item):
"""For backwards compatibility"""
Expand Down Expand Up @@ -127,7 +127,7 @@ def request(
extensions=extensions,
)
return Result(self.retry_codes, response=response)
except ConnectError as e:
except NetworkError as e:
return Result(self.retry_codes, error=e)

def get(self, *args, **kwargs) -> Result:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""mTLS authentication tests"""
from typing import Callable
import pytest
from httpx import ReadError, ConnectError

from testsuite.httpx import Result


def test_mtls_success(envoy_authority, valid_cert, hostname):
Expand All @@ -11,24 +13,22 @@ def test_mtls_success(envoy_authority, valid_cert, hostname):


@pytest.mark.parametrize(
"cert_authority, certificate, err, err_match",
"cert_authority, certificate, has_error",
[
pytest.param("envoy_authority", "self_signed_cert", ReadError, "unknown ca", id="Self-Signed Certificate"),
pytest.param("envoy_authority", "invalid_cert", ReadError, "unknown ca", id="Invalid certificate"),
pytest.param("envoy_authority", None, ReadError, "certificate required", id="Without certificate"),
pytest.param(
"invalid_authority", "valid_cert", ConnectError, "certificate verify failed", id="Unknown authority"
),
pytest.param("envoy_authority", "self_signed_cert", Result.has_tls_unknown_ca_error, id="Self-signed cert"),
pytest.param("envoy_authority", "invalid_cert", Result.has_tls_unknown_ca_error, id="Invalid certificate"),
pytest.param("envoy_authority", None, Result.has_tls_cert_required_error, id="Without certificate"),
pytest.param("invalid_authority", "valid_cert", Result.has_tls_cert_verify_error, id="Unknown authority"),
],
)
def test_mtls_fail(request, cert_authority, certificate, err, err_match: str, hostname):
def test_mtls_fail(request, cert_authority, certificate, has_error: Callable, hostname):
"""Test failed mtls verification"""
ca = request.getfixturevalue(cert_authority)
cert = request.getfixturevalue(certificate) if certificate else None

with pytest.raises(err, match=err_match):
with hostname.client(verify=ca, cert=cert) as client:
client.get("/get")
with hostname.client(verify=ca, cert=cert) as client:
result = client.get("/get")
assert has_error(result)


def test_mtls_unmatched_attributes(envoy_authority, custom_cert, hostname):
Expand Down
14 changes: 6 additions & 8 deletions testsuite/tests/kuadrant/authorino/operator/tls/test_tls.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""Tests that envoy deployed with TLS security works with Authorino"""
import pytest
from httpx import ReadError


def test_valid_certificate(envoy_authority, valid_cert, auth, hostname):
Expand All @@ -12,13 +10,13 @@ def test_valid_certificate(envoy_authority, valid_cert, auth, hostname):

def test_no_certificate(hostname, envoy_authority):
"""Test that request without certificate will be rejected"""
with pytest.raises(ReadError, match="certificate required"):
with hostname.client(verify=envoy_authority) as client:
client.get("/get")
with hostname.client(verify=envoy_authority) as client:
result = client.get("/get")
result.has_tls_cert_required_error()


def test_invalid_certificate(envoy_authority, invalid_cert, auth, hostname):
"""Tests that certificate with different CA will be rejeceted"""
with pytest.raises(ReadError, match="unknown ca"):
with hostname.client(verify=envoy_authority, cert=invalid_cert) as client:
client.get("/get", auth=auth)
with hostname.client(verify=envoy_authority, cert=invalid_cert) as client:
result = client.get("/get", auth=auth)
result.has_tls_unknown_ca_error()
2 changes: 1 addition & 1 deletion testsuite/tests/mgc/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ def test_smoke(client):

result = client.get("/get")
assert not result.has_dns_error()
assert not result.has_tls_error()
assert not result.has_tls_cert_verify_error()
assert result.status_code == 200

0 comments on commit f685990

Please sign in to comment.