Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: cache key interpolation bug #23200

Merged
merged 8 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ee/billing/quota_limiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,27 @@ def replace_limited_team_tokens(
resource: QuotaResource, tokens: Mapping[str, int], cache_key: QuotaLimitingCaches
) -> None:
pipe = get_client().pipeline()
pipe.delete(f"{cache_key}{resource.value}")
pipe.delete(f"{cache_key.value}{resource.value}")
if tokens:
pipe.zadd(f"{cache_key}{resource.value}", tokens) # type: ignore # (zadd takes a Mapping[str, int] but the derived Union type is wrong)
pipe.zadd(f"{cache_key.value}{resource.value}", tokens) # type: ignore # (zadd takes a Mapping[str, int] but the derived Union type is wrong)
pipe.execute()


def add_limited_team_tokens(resource: QuotaResource, tokens: Mapping[str, int], cache_key: QuotaLimitingCaches) -> None:
redis_client = get_client()
redis_client.zadd(f"{cache_key}{resource.value}", tokens) # type: ignore # (zadd takes a Mapping[str, int] but the derived Union type is wrong)
redis_client.zadd(f"{cache_key.value}{resource.value}", tokens) # type: ignore # (zadd takes a Mapping[str, int] but the derived Union type is wrong)


def remove_limited_team_tokens(resource: QuotaResource, tokens: list[str], cache_key: QuotaLimitingCaches) -> None:
redis_client = get_client()
redis_client.zrem(f"{cache_key}{resource.value}", *tokens)
redis_client.zrem(f"{cache_key.value}{resource.value}", *tokens)


@cache_for(timedelta(seconds=30), background_refresh=True)
def list_limited_team_attributes(resource: QuotaResource, cache_key: QuotaLimitingCaches) -> list[str]:
now = timezone.now()
redis_client = get_client()
results = redis_client.zrangebyscore(f"{cache_key}{resource.value}", min=now.timestamp(), max="+inf")
results = redis_client.zrangebyscore(f"{cache_key.value}{resource.value}", min=now.timestamp(), max="+inf")
return [x.decode("utf-8") for x in results]


Expand Down
112 changes: 55 additions & 57 deletions ee/billing/test/test_quota_limiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ class TestQuotaLimiting(BaseTest):
def setUp(self) -> None:
super().setUp()
self.redis_client = get_client()
self.redis_client.delete(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}.{QuotaResource.EVENTS}")
self.redis_client.delete(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}.{QuotaResource.RECORDINGS}")
self.redis_client.delete(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}.{QuotaResource.ROWS_SYNCED}")
self.redis_client.delete(f"{QuotaLimitingCaches.QUOTA_LIMITING_SUSPENDED_KEY}.{QuotaResource.EVENTS}")
self.redis_client.delete(f"{QuotaLimitingCaches.QUOTA_LIMITING_SUSPENDED_KEY}.{QuotaResource.RECORDINGS}")
self.redis_client.delete(f"{QuotaLimitingCaches.QUOTA_LIMITING_SUSPENDED_KEY}.{QuotaResource.ROWS_SYNCED}")
self.redis_client.delete(f"@posthog/quota-limits/.events")
self.redis_client.delete(f"@posthog/quota-limits/.recordings")
self.redis_client.delete(f"@posthog/quota-limits/.rows_synced")
self.redis_client.delete(f"@posthog/quota-limiting-suspended/.events")
self.redis_client.delete(f"@posthog/quota-limiting-suspended/.recordings")
self.redis_client.delete(f"@posthog/quota-limiting-suspended/.rows_synced")
raquelmsmith marked this conversation as resolved.
Show resolved Hide resolved

@patch("posthoganalytics.capture")
@patch("posthoganalytics.feature_enabled", return_value=True)
Expand Down Expand Up @@ -86,9 +86,9 @@ def test_quota_limiting_feature_flag_enabled(self, patch_feature_enabled, patch_
assert quota_limiting_suspended_orgs["events"] == {}
assert quota_limiting_suspended_orgs["recordings"] == {}
assert quota_limiting_suspended_orgs["rows_synced"] == {}
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []

patch_capture.reset_mock()
# Add this org to the redis cache.
Expand Down Expand Up @@ -116,11 +116,9 @@ def test_quota_limiting_feature_flag_enabled(self, patch_feature_enabled, patch_
assert quota_limiting_suspended_orgs["events"] == {}
assert quota_limiting_suspended_orgs["recordings"] == {}
assert quota_limiting_suspended_orgs["rows_synced"] == {}
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == [
self.team.api_token.encode("UTF-8")
]
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == [self.team.api_token.encode("UTF-8")]
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []

@patch("posthoganalytics.capture")
@patch("posthoganalytics.feature_enabled", return_value=True)
Expand Down Expand Up @@ -148,9 +146,9 @@ def test_quota_limit_feature_flag_not_on(self, patch_feature_enabled, patch_capt
assert quota_limiting_suspended_orgs["recordings"] == {}
assert quota_limiting_suspended_orgs["rows_synced"] == {}

assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []

def test_billing_rate_limit_not_set_if_missing_org_usage(self) -> None:
with self.settings(USE_TZ=False):
Expand Down Expand Up @@ -178,9 +176,9 @@ def test_billing_rate_limit_not_set_if_missing_org_usage(self) -> None:
assert quota_limiting_suspended_orgs["recordings"] == {}
assert quota_limiting_suspended_orgs["rows_synced"] == {}

assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []

@patch("posthoganalytics.capture")
def test_billing_rate_limit(self, patch_capture) -> None:
Expand Down Expand Up @@ -227,11 +225,11 @@ def test_billing_rate_limit(self, patch_capture) -> None:
groups={"instance": "http://localhost:8000", "organization": org_id},
)

assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == [
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == [
self.team.api_token.encode("UTF-8")
]
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []

self.organization.refresh_from_db()
assert self.organization.usage == {
Expand All @@ -252,28 +250,28 @@ def test_billing_rate_limit(self, patch_capture) -> None:
assert quota_limiting_suspended_orgs["events"] == {}
assert quota_limiting_suspended_orgs["recordings"] == {}
assert quota_limiting_suspended_orgs["rows_synced"] == {}
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == [
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == [
self.team.api_token.encode("UTF-8")
]
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []

# Reset the event limiting set so their limiting will be suspended for 1 day.
self.redis_client.delete(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events")
self.redis_client.delete(f"@posthog/quota-limits/events")
quota_limited_orgs, quota_limiting_suspended_orgs = update_all_org_billing_quotas()
assert quota_limited_orgs["events"] == {}
assert quota_limited_orgs["recordings"] == {}
assert quota_limited_orgs["rows_synced"] == {}
assert quota_limiting_suspended_orgs["events"] == {org_id: 1611705600}
assert quota_limiting_suspended_orgs["recordings"] == {}
assert quota_limiting_suspended_orgs["rows_synced"] == {}
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITING_SUSPENDED_KEY}events", 0, -1) == [
assert self.redis_client.zrange(f"@posthog/quota-limiting-suspended/events", 0, -1) == [
self.team.api_token.encode("UTF-8")
]

assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []

# Check that limiting still suspended 23 hrs later
with freeze_time("2021-01-25T23:00:00Z"):
Expand All @@ -284,13 +282,13 @@ def test_billing_rate_limit(self, patch_capture) -> None:
assert quota_limiting_suspended_orgs["events"] == {org_id: 1611705600}
assert quota_limiting_suspended_orgs["recordings"] == {}
assert quota_limiting_suspended_orgs["rows_synced"] == {}
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITING_SUSPENDED_KEY}events", 0, -1) == [
assert self.redis_client.zrange(f"@posthog/quota-limiting-suspended/events", 0, -1) == [
self.team.api_token.encode("UTF-8")
]

assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []
self.organization.refresh_from_db()
assert self.organization.usage == {
"events": {
Expand Down Expand Up @@ -320,17 +318,17 @@ def test_billing_rate_limit(self, patch_capture) -> None:
assert quota_limiting_suspended_orgs["events"] == {}
assert quota_limiting_suspended_orgs["recordings"] == {}
assert quota_limiting_suspended_orgs["rows_synced"] == {}
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITING_SUSPENDED_KEY}events", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limiting-suspended/events", 0, -1) == []

assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == [
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == [
self.team.api_token.encode("UTF-8")
]
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []

# Increase trust score. Their quota limiting suspension expiration should not update.
with freeze_time("2021-01-25T00:00:00Z"):
assert self.redis_client.delete(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events")
assert self.redis_client.delete(f"@posthog/quota-limits/events")

self.organization.customer_trust_scores = {"events": 10, "recordings": 0, "rows_synced": 0}
self.organization.usage = {
Expand All @@ -347,13 +345,13 @@ def test_billing_rate_limit(self, patch_capture) -> None:
assert quota_limiting_suspended_orgs["events"] == {org_id: 1611705600}
assert quota_limiting_suspended_orgs["recordings"] == {}
assert quota_limiting_suspended_orgs["rows_synced"] == {}
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITING_SUSPENDED_KEY}events", 0, -1) == [
assert self.redis_client.zrange(f"@posthog/quota-limiting-suspended/events", 0, -1) == [
self.team.api_token.encode("UTF-8")
]

assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []

# Reset, quota limiting should be suspended for 3 days.
self.organization.customer_trust_scores = {"events": 10, "recordings": 0, "rows_synced": 0}
Expand All @@ -371,13 +369,13 @@ def test_billing_rate_limit(self, patch_capture) -> None:
assert quota_limiting_suspended_orgs["events"] == {org_id: 1611878400}
assert quota_limiting_suspended_orgs["recordings"] == {}
assert quota_limiting_suspended_orgs["rows_synced"] == {}
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITING_SUSPENDED_KEY}events", 0, -1) == [
assert self.redis_client.zrange(f"@posthog/quota-limiting-suspended/events", 0, -1) == [
self.team.api_token.encode("UTF-8")
]

assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []

# Decrease the trust score to 0. Quota limiting should immediately take effect.
self.organization.customer_trust_scores = {"events": 0, "recordings": 0, "rows_synced": 0}
Expand All @@ -395,16 +393,16 @@ def test_billing_rate_limit(self, patch_capture) -> None:
assert quota_limiting_suspended_orgs["events"] == {}
assert quota_limiting_suspended_orgs["recordings"] == {}
assert quota_limiting_suspended_orgs["rows_synced"] == {}
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITING_SUSPENDED_KEY}events", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limiting-suspended/events", 0, -1) == []

assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == [
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == [
self.team.api_token.encode("UTF-8")
]
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []

with freeze_time("2021-01-28T00:00:00Z"):
self.redis_client.delete(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events")
self.redis_client.delete(f"@posthog/quota-limits/events")

# Quota limiting suspension date set in previous billing period, update to new suspension expiration
self.organization.customer_trust_scores = {"events": 10, "recordings": 0, "rows_synced": 0}
Expand All @@ -423,13 +421,13 @@ def test_billing_rate_limit(self, patch_capture) -> None:
assert quota_limiting_suspended_orgs["events"] == {org_id: 1612137600}
assert quota_limiting_suspended_orgs["recordings"] == {}
assert quota_limiting_suspended_orgs["rows_synced"] == {}
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITING_SUSPENDED_KEY}events", 0, -1) == [
assert self.redis_client.zrange(f"@posthog/quota-limiting-suspended/events", 0, -1) == [
self.team.api_token.encode("UTF-8")
]

assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}events", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}recordings", 0, -1) == []
assert self.redis_client.zrange(f"{QuotaLimitingCaches.QUOTA_LIMITER_CACHE_KEY}rows_synced", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/events", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/recordings", 0, -1) == []
assert self.redis_client.zrange(f"@posthog/quota-limits/rows_synced", 0, -1) == []

def test_set_org_usage_summary_updates_correctly(self):
self.organization.usage = {
Expand Down
Loading