Skip to content

Commit

Permalink
Merge pull request #336 from jsmolar/rate_limit_auth
Browse files Browse the repository at this point in the history
Add test for rate limiting anonymous users
  • Loading branch information
pehala authored Feb 5, 2024
2 parents 36b5d1a + 3e1f537 commit efb12b9
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions testsuite/tests/kuadrant/test_rate_limit_anonymous.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Tests for authenticated rate limit, but only for anonymous users"""

import pytest

from testsuite.httpx.auth import HttpxOidcClientAuth
from testsuite.policy.authorization import Pattern, JsonResponse, ValueFrom
from testsuite.policy.rate_limit_policy import Limit


@pytest.fixture(scope="module")
def rate_limit(rate_limit):
"""Add limit to the policy only for anonymous users"""
rate_limit.add_limit(
"basic",
[Limit(5, 10)],
when=[
Pattern(
selector=r"metadata.filter_metadata.envoy\.filters\.http\.ext_authz.identity.anonymous",
operator="eq",
value="true",
)
],
)
return rate_limit


@pytest.fixture(scope="module")
def authorization(authorization, oidc_provider):
"""Add oidc and anonymous identity with low priority to the AuthConfig"""
authorization.identity.add_anonymous("anonymous", priority=1)
authorization.identity.add_oidc("rhsso", oidc_provider.well_known["issuer"])

# curly brackets are added to response as it stringifies the anonymous output.
authorization.responses.add_success_dynamic(
"identity", JsonResponse({"anonymous": ValueFrom("{auth.identity.anonymous}")})
)
return authorization


@pytest.fixture(scope="module")
def auth(oidc_provider):
"""Returns RHSSO authentication object for HTTPX"""
return HttpxOidcClientAuth(oidc_provider.get_token, "authorization")


def test_no_limit_for_auth_user(client, auth):
"""Test that no limit is not applied for authenticated user"""
responses = client.get_many("/get", 7, auth=auth)
assert all(
r.status_code == 200 for r in responses
), f"Rate Limited resource unexpectedly rejected requests {responses}"


def test_anonymous_identity(client, auth):
"""Test that an anonymous requests are correctly limited"""
assert client.get("/get", auth=auth).status_code == 200
responses = client.get_many("/get", 5)
assert all(
r.status_code == 200 for r in responses
), f"Rate Limited resource unexpectedly rejected requests {responses}"
assert client.get("/get").status_code == 429
assert client.get("/get", auth=auth).status_code == 200

0 comments on commit efb12b9

Please sign in to comment.