From 3e1f5377d4c06036959425b055bee7d26e0363ce Mon Sep 17 00:00:00 2001 From: Jakub Smolar Date: Mon, 5 Feb 2024 15:26:58 +0100 Subject: [PATCH] Add test for rate limiting anonymous users --- .../kuadrant/test_rate_limit_anonymous.py | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 testsuite/tests/kuadrant/test_rate_limit_anonymous.py diff --git a/testsuite/tests/kuadrant/test_rate_limit_anonymous.py b/testsuite/tests/kuadrant/test_rate_limit_anonymous.py new file mode 100644 index 00000000..1118a482 --- /dev/null +++ b/testsuite/tests/kuadrant/test_rate_limit_anonymous.py @@ -0,0 +1,62 @@ +"""Tests for authenticated rate limit, but only for anonymous users""" + +import pytest + +from testsuite.httpx.auth import HttpxOidcClientAuth +from testsuite.policy.authorization import Pattern, JsonResponse, ValueFrom +from testsuite.policy.rate_limit_policy import Limit + + +@pytest.fixture(scope="module") +def rate_limit(rate_limit): + """Add limit to the policy only for anonymous users""" + rate_limit.add_limit( + "basic", + [Limit(5, 10)], + when=[ + Pattern( + selector=r"metadata.filter_metadata.envoy\.filters\.http\.ext_authz.identity.anonymous", + operator="eq", + value="true", + ) + ], + ) + return rate_limit + + +@pytest.fixture(scope="module") +def authorization(authorization, oidc_provider): + """Add oidc and anonymous identity with low priority to the AuthConfig""" + authorization.identity.add_anonymous("anonymous", priority=1) + authorization.identity.add_oidc("rhsso", oidc_provider.well_known["issuer"]) + + # curly brackets are added to response as it stringifies the anonymous output. + authorization.responses.add_success_dynamic( + "identity", JsonResponse({"anonymous": ValueFrom("{auth.identity.anonymous}")}) + ) + return authorization + + +@pytest.fixture(scope="module") +def auth(oidc_provider): + """Returns RHSSO authentication object for HTTPX""" + return HttpxOidcClientAuth(oidc_provider.get_token, "authorization") + + +def test_no_limit_for_auth_user(client, auth): + """Test that no limit is not applied for authenticated user""" + responses = client.get_many("/get", 7, auth=auth) + assert all( + r.status_code == 200 for r in responses + ), f"Rate Limited resource unexpectedly rejected requests {responses}" + + +def test_anonymous_identity(client, auth): + """Test that an anonymous requests are correctly limited""" + assert client.get("/get", auth=auth).status_code == 200 + responses = client.get_many("/get", 5) + assert all( + r.status_code == 200 for r in responses + ), f"Rate Limited resource unexpectedly rejected requests {responses}" + assert client.get("/get").status_code == 429 + assert client.get("/get", auth=auth).status_code == 200