Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Don't Label Users as Quota Limited for N Days After They Exceed Their Quota #19938

Merged
merged 13 commits into from
Feb 9, 2024
183 changes: 143 additions & 40 deletions ee/billing/quota_limiting.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import copy
from datetime import timedelta
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Mapping, Optional, Sequence, TypedDict, cast
from typing import (
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
TypedDict,
cast,
)

import dateutil.parser
import posthoganalytics
Expand All @@ -22,7 +31,11 @@
)
from posthog.utils import get_current_day

QUOTA_LIMITER_CACHE_KEY = "@posthog/quota-limits/"
# We are updating the quota limiting behavior so we retain data for DAYS_RETAIN_DATA days before
# permanently dropping it. This will allow us to recover data in the case of incidents
# where usage may have inadvertently exceeded a billing limit.
# Ref: INC-144
DAYS_RETAIN_DATA = 3

QUOTA_LIMIT_DATA_RETENTION_FLAG = "retain-data-past-quota-limit"

Expand All @@ -33,36 +46,45 @@ class QuotaResource(Enum):
ROWS_SYNCED = "rows_synced"


class QuotaLimitingRedisCaches(Enum):
QUOTA_OVERAGE_RETENTION_CACHE_KEY = "@posthog/quota-overage-retention/"
QUOTA_LIMITER_CACHE_KEY = "@posthog/quota-limits/"


OVERAGE_BUFFER = {
QuotaResource.EVENTS: 0,
QuotaResource.RECORDINGS: 1000,
QuotaResource.ROWS_SYNCED: 0,
}


def replace_limited_team_tokens(
    resource: QuotaResource, tokens: Mapping[str, int], cache_key: QuotaLimitingRedisCaches
) -> None:
    """Replace the sorted set stored for ``resource`` under ``cache_key`` with ``tokens``.

    The existing sorted set is deleted and, when ``tokens`` is non-empty, re-created
    from it. Both commands are queued on one Redis pipeline and sent together.

    Args:
        resource: The quota resource whose set is being rewritten.
        tokens: Mapping of team token -> score to store in the sorted set.
        cache_key: Which Redis key prefix (quota-limited vs. overage-retention) to write to.
    """
    # Use the enum's ``.value``: formatting a plain Enum member in an f-string yields
    # "QuotaLimitingRedisCaches.<NAME>", not the "@posthog/..." key prefix.
    redis_key = f"{cache_key.value}{resource.value}"

    pipe = get_client().pipeline()
    pipe.delete(redis_key)
    if tokens:
        pipe.zadd(redis_key, tokens)  # type: ignore # (zadd takes a Mapping[str, int] but the derived Union type is wrong)
    pipe.execute()


def add_limited_team_tokens(
    resource: QuotaResource, tokens: Mapping[str, int], cache_key: QuotaLimitingRedisCaches
) -> None:
    """Add (or update the score of) ``tokens`` in the sorted set for ``resource`` under ``cache_key``.

    Args:
        resource: The quota resource whose set is being updated.
        tokens: Mapping of team token -> score to add to the sorted set.
        cache_key: Which Redis key prefix (quota-limited vs. overage-retention) to write to.
    """
    # Nothing to add; redis-py's zadd rejects an empty mapping, so bail out early
    # (mirrors the ``if tokens:`` guard in replace_limited_team_tokens).
    if not tokens:
        return

    # Use the enum's ``.value``: formatting a plain Enum member in an f-string yields
    # "QuotaLimitingRedisCaches.<NAME>", not the "@posthog/..." key prefix.
    redis_key = f"{cache_key.value}{resource.value}"

    redis_client = get_client()
    redis_client.zadd(redis_key, tokens)  # type: ignore # (zadd takes a Mapping[str, int] but the derived Union type is wrong)


def remove_limited_team_tokens(resource: QuotaResource, tokens: List[str], cache_key: QuotaLimitingRedisCaches) -> None:
    """Remove ``tokens`` from the sorted set for ``resource`` under ``cache_key``.

    Args:
        resource: The quota resource whose set is being updated.
        tokens: Team tokens to remove from the sorted set.
        cache_key: Which Redis key prefix (quota-limited vs. overage-retention) to remove from.
    """
    # ZREM requires at least one member; with no tokens there is nothing to remove.
    if not tokens:
        return

    # Use the enum's ``.value``: formatting a plain Enum member in an f-string yields
    # "QuotaLimitingRedisCaches.<NAME>", not the "@posthog/..." key prefix.
    redis_key = f"{cache_key.value}{resource.value}"

    redis_client = get_client()
    redis_client.zrem(redis_key, *tokens)


@cache_for(timedelta(seconds=30), background_refresh=True)
def list_limited_team_attributes(resource: QuotaResource, cache_key: QuotaLimitingRedisCaches) -> List[str]:
    """Return the members of the sorted set for ``resource`` under ``cache_key`` that are still in effect.

    Only members whose score is at or after the current timestamp are returned, so
    entries whose score (an "until" timestamp) has already passed are ignored.

    Args:
        resource: The quota resource to look up.
        cache_key: Which Redis key prefix (quota-limited vs. overage-retention) to read from.

    Returns:
        The matching members decoded as UTF-8 strings.
    """
    # Use the enum's ``.value``: formatting a plain Enum member in an f-string yields
    # "QuotaLimitingRedisCaches.<NAME>", not the "@posthog/..." key prefix.
    redis_key = f"{cache_key.value}{resource.value}"

    now = timezone.now()
    redis_client = get_client()
    results = redis_client.zrangebyscore(redis_key, min=now.timestamp(), max="+inf")
    return [x.decode("utf-8") for x in results]


Expand All @@ -72,7 +94,16 @@ class UsageCounters(TypedDict):
rows_synced: int


def org_quota_limited_until(organization: Organization, resource: QuotaResource) -> Optional[int]:
class QuotaLimitingTracker(TypedDict):
data_retained_until: Optional[int] # timestamp
quota_limited_until: Optional[int] # timestamp
needs_save: Optional[bool]


def determine_org_quota_limit_or_data_retention(
organization: Organization, resource: QuotaResource
) -> Optional[QuotaLimitingTracker]:
today, _ = get_current_day()
if not organization.usage:
return None

Expand All @@ -81,18 +112,30 @@ def org_quota_limited_until(organization: Organization, resource: QuotaResource)
return None
usage = summary.get("usage", 0)
todays_usage = summary.get("todays_usage", 0)
data_retained_until = summary.get("retained_period_end", None)
limit = summary.get("limit")
needs_save = False

if limit is None:
return None

is_quota_limited = usage + todays_usage >= limit + OVERAGE_BUFFER[resource]
billing_period_start = round(dateutil.parser.isoparse(organization.usage["period"][0]).timestamp())
billing_period_end = round(dateutil.parser.isoparse(organization.usage["period"][1]).timestamp())

if is_quota_limited and organization.never_drop_data:
if not is_quota_limited:
# Reset data retention
if data_retained_until:
data_retained_until = None
del summary["retained_period_end"]
needs_save = True
return {"quota_limited_until": None, "data_retained_until": None, "needs_save": needs_save}
return None

if organization.never_drop_data:
return None

if is_quota_limited and posthoganalytics.feature_enabled(
if posthoganalytics.feature_enabled(
QUOTA_LIMIT_DATA_RETENTION_FLAG,
organization.id,
groups={"organization": str(organization.id)},
Expand All @@ -105,8 +148,22 @@ def org_quota_limited_until(organization: Organization, resource: QuotaResource)
)
return None

if is_quota_limited and billing_period_end:
return billing_period_end
# Either wasn't set or was set in the previous billing period
if (
not data_retained_until
or round((datetime.fromtimestamp(data_retained_until) - timedelta(days=DAYS_RETAIN_DATA)).timestamp())
< billing_period_start
):
data_retained_until = round((today + timedelta(days=DAYS_RETAIN_DATA)).timestamp())
summary["retained_period_end"] = data_retained_until
needs_save = True

if billing_period_end:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would we not have a billing period end?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't imagine why we wouldn't have one. This is here simply to be consistent with the previous check, `if is_quota_limited and billing_period_end:`.

return {
"quota_limited_until": billing_period_end,
"data_retained_until": data_retained_until,
"needs_save": needs_save,
}

return None

Expand All @@ -115,14 +172,35 @@ def sync_org_quota_limits(organization: Organization):
if not organization.usage:
return None

today_start, _ = get_current_day()
for resource in [QuotaResource.EVENTS, QuotaResource.RECORDINGS, QuotaResource.ROWS_SYNCED]:
team_attributes = get_team_attribute_by_quota_resource(organization, resource)
quota_limited_until = org_quota_limited_until(organization, resource)

if quota_limited_until:
add_limited_team_tokens(resource, {x: quota_limited_until for x in team_attributes})
else:
remove_limited_team_tokens(resource, team_attributes)
result = determine_org_quota_limit_or_data_retention(organization, resource)
if result:
quota_limited_until = result.get("quota_limited_until")
data_retained_until = result.get("data_retained_until")
needs_save = result.get("needs_save")

if needs_save:
organization.save()
if quota_limited_until and (data_retained_until and data_retained_until < round(today_start.timestamp())):
add_limited_team_tokens(
resource,
{x: quota_limited_until for x in team_attributes},
QuotaLimitingRedisCaches.QUOTA_LIMITER_CACHE_KEY,
)
continue
elif data_retained_until and data_retained_until >= today_start.timestamp():
add_limited_team_tokens(
resource,
{x: data_retained_until for x in team_attributes},
QuotaLimitingRedisCaches.QUOTA_OVERAGE_RETENTION_CACHE_KEY,
)
continue
remove_limited_team_tokens(resource, team_attributes, QuotaLimitingRedisCaches.QUOTA_LIMITER_CACHE_KEY)
remove_limited_team_tokens(
resource, team_attributes, QuotaLimitingRedisCaches.QUOTA_OVERAGE_RETENTION_CACHE_KEY
)


def get_team_attribute_by_quota_resource(organization: Organization, resource: QuotaResource):
Expand Down Expand Up @@ -182,7 +260,7 @@ def set_org_usage_summary(
return has_changed


def update_all_org_billing_quotas(dry_run: bool = False) -> Dict[str, Dict[str, int]]:
def update_all_org_billing_quotas(dry_run: bool = False) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
period = get_current_day()
period_start, period_end = period

Expand Down Expand Up @@ -233,7 +311,8 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Dict[str, Dict[str,
for field in team_report:
org_report[field] += team_report[field] # type: ignore

quota_limited_orgs: Dict[str, Dict[str, int]] = {"events": {}, "recordings": {}, "rows_synced": {}}
quota_limited_orgs: Dict[str, Dict[str, int]] = {key.value: {} for key in QuotaResource}
data_retained_orgs: Dict[str, Dict[str, int]] = {key.value: {} for key in QuotaResource}

# We find all orgs that should be rate limited
for org_id, todays_report in todays_usage_report.items():
Expand All @@ -246,24 +325,35 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Dict[str, Dict[str,
org.save(update_fields=["usage"])

for field in ["events", "recordings", "rows_synced"]:
quota_limited_until = org_quota_limited_until(org, QuotaResource(field))
result = determine_org_quota_limit_or_data_retention(org, QuotaResource(field))
if result:
quota_limited_until = result.get("quota_limited_until")
data_retained_until = result.get("data_retained_until")
needs_save = result.get("needs_save")

if quota_limited_until:
# TODO: Set this rate limit to the end of the billing period
quota_limited_orgs[field][org_id] = quota_limited_until
if needs_save:
org.save(update_fields=["usage"])

# Get the current quota limits so we can track to poshog if it changes
if data_retained_until and data_retained_until >= period_start.timestamp():
data_retained_orgs[field][org_id] = data_retained_until
elif quota_limited_until:
quota_limited_orgs[field][org_id] = quota_limited_until

# Get the current quota limits so we can track to posthog if it changes
orgs_with_changes = set()
previously_quota_limited_team_tokens: Dict[str, Dict[str, int]] = {
"events": {},
"recordings": {},
"rows_synced": {},
}
previously_quota_limited_team_tokens: Dict[str, Dict[str, int]] = {key.value: {} for key in QuotaResource}
previously_data_retained_teams: Dict[str, Dict[str, int]] = {key.value: {} for key in QuotaResource}

for field in quota_limited_orgs:
previously_quota_limited_team_tokens[field] = list_limited_team_attributes(QuotaResource(field))
previously_quota_limited_team_tokens[field] = list_limited_team_attributes(
QuotaResource(field), QuotaLimitingRedisCaches.QUOTA_LIMITER_CACHE_KEY
)
previously_data_retained_teams[field] = list_limited_team_attributes(
QuotaResource(field), QuotaLimitingRedisCaches.QUOTA_OVERAGE_RETENTION_CACHE_KEY
)

quota_limited_teams: Dict[str, Dict[str, int]] = {"events": {}, "recordings": {}, "rows_synced": {}}
quota_limited_teams: Dict[str, Dict[str, int]] = {key.value: {} for key in QuotaResource}
data_retained_teams: Dict[str, Dict[str, int]] = {key.value: {} for key in QuotaResource}

# Convert the org ids to team tokens
for team in teams:
Expand All @@ -275,9 +365,14 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Dict[str, Dict[str,
# If the team was not previously quota limited, we add it to the list of orgs that were added
if team.api_token not in previously_quota_limited_team_tokens[field]:
orgs_with_changes.add(org_id)
elif org_id in data_retained_orgs[field]:
data_retained_teams[field][team.api_token] = data_retained_orgs[field][org_id]
else:
# If the team was previously quota limited, we add it to the list of orgs that were removed
if team.api_token in previously_quota_limited_team_tokens[field]:
if (
team.api_token in previously_quota_limited_team_tokens[field]
or team.api_token in previously_data_retained_teams[field]
):
orgs_with_changes.add(org_id)

for org_id in orgs_with_changes:
Expand All @@ -295,7 +390,15 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Dict[str, Dict[str,
)

if not dry_run:
for field in data_retained_teams:
replace_limited_team_tokens(
QuotaResource(field),
data_retained_teams[field],
QuotaLimitingRedisCaches.QUOTA_OVERAGE_RETENTION_CACHE_KEY,
)
for field in quota_limited_teams:
replace_limited_team_tokens(QuotaResource(field), quota_limited_teams[field])
replace_limited_team_tokens(
QuotaResource(field), quota_limited_teams[field], QuotaLimitingRedisCaches.QUOTA_LIMITER_CACHE_KEY
)

return quota_limited_orgs
return quota_limited_orgs, data_retained_orgs
Loading
Loading