fix: revert #19938 (#20232)
benjackwhite authored Feb 9, 2024
1 parent c4fa55c commit 11c0506
Showing 6 changed files with 137 additions and 422 deletions.
183 changes: 40 additions & 143 deletions ee/billing/quota_limiting.py
@@ -1,16 +1,7 @@
import copy
from datetime import datetime, timedelta
from datetime import timedelta
from enum import Enum
from typing import (
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
TypedDict,
cast,
)
from typing import Dict, List, Mapping, Optional, Sequence, TypedDict, cast

import dateutil.parser
import posthoganalytics
@@ -31,11 +22,7 @@
)
from posthog.utils import get_current_day

# We are updating the quota limiting behavior so we retain data for DAYS_RETAIN_DATA days before
# permanently dropping it. This will allow us to recover data in the case of incidents
# where usage may have inadvertently exceeded a billing limit.
# Ref: INC-144
DAYS_RETAIN_DATA = 3
QUOTA_LIMITER_CACHE_KEY = "@posthog/quota-limits/"

QUOTA_LIMIT_DATA_RETENTION_FLAG = "retain-data-past-quota-limit"

@@ -46,45 +33,36 @@ class QuotaResource(Enum):
ROWS_SYNCED = "rows_synced"


class QuotaLimitingRedisCaches(Enum):
QUOTA_OVERAGE_RETENTION_CACHE_KEY = "@posthog/quota-overage-retention/"
QUOTA_LIMITER_CACHE_KEY = "@posthog/quota-limits/"


OVERAGE_BUFFER = {
QuotaResource.EVENTS: 0,
QuotaResource.RECORDINGS: 1000,
QuotaResource.ROWS_SYNCED: 0,
}


def replace_limited_team_tokens(
resource: QuotaResource, tokens: Mapping[str, int], cache_key: QuotaLimitingRedisCaches
) -> None:
def replace_limited_team_tokens(resource: QuotaResource, tokens: Mapping[str, int]) -> None:
pipe = get_client().pipeline()
pipe.delete(f"{cache_key}{resource.value}")
pipe.delete(f"{QUOTA_LIMITER_CACHE_KEY}{resource.value}")
if tokens:
pipe.zadd(f"{cache_key}{resource.value}", tokens) # type: ignore # (zadd takes a Mapping[str, int] but the derived Union type is wrong)
pipe.zadd(f"{QUOTA_LIMITER_CACHE_KEY}{resource.value}", tokens) # type: ignore # (zadd takes a Mapping[str, int] but the derived Union type is wrong)
pipe.execute()


def add_limited_team_tokens(
resource: QuotaResource, tokens: Mapping[str, int], cache_key: QuotaLimitingRedisCaches
) -> None:
def add_limited_team_tokens(resource: QuotaResource, tokens: Mapping[str, int]) -> None:
redis_client = get_client()
redis_client.zadd(f"{cache_key}{resource.value}", tokens) # type: ignore # (zadd takes a Mapping[str, int] but the derived Union type is wrong)
redis_client.zadd(f"{QUOTA_LIMITER_CACHE_KEY}{resource.value}", tokens) # type: ignore # (zadd takes a Mapping[str, int] but the derived Union type is wrong)


def remove_limited_team_tokens(resource: QuotaResource, tokens: List[str], cache_key: QuotaLimitingRedisCaches) -> None:
def remove_limited_team_tokens(resource: QuotaResource, tokens: List[str]) -> None:
redis_client = get_client()
redis_client.zrem(f"{cache_key}{resource.value}", *tokens)
redis_client.zrem(f"{QUOTA_LIMITER_CACHE_KEY}{resource.value}", *tokens)


@cache_for(timedelta(seconds=30), background_refresh=True)
def list_limited_team_attributes(resource: QuotaResource, cache_key: QuotaLimitingRedisCaches) -> List[str]:
def list_limited_team_attributes(resource: QuotaResource) -> List[str]:
now = timezone.now()
redis_client = get_client()
results = redis_client.zrangebyscore(f"{cache_key}{resource.value}", min=now.timestamp(), max="+inf")
results = redis_client.zrangebyscore(f"{QUOTA_LIMITER_CACHE_KEY}{resource.value}", min=now.timestamp(), max="+inf")
return [x.decode("utf-8") for x in results]


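After the revert, limited team tokens live in the single Redis sorted set keyed by QUOTA_LIMITER_CACHE_KEY plus the resource name, with each member's score holding the timestamp until which that team is limited. A minimal standalone sketch of that sorted-set pattern, assuming a local Redis and using redis-py directly instead of PostHog's get_client() wrapper (the team tokens and expiry below are made up):

import time

import redis

QUOTA_LIMITER_CACHE_KEY = "@posthog/quota-limits/"
r = redis.Redis()  # assumes a Redis instance on localhost:6379

# Mark two hypothetical team tokens as limited until an hour from now
# (the same zadd call add_limited_team_tokens makes).
limited_until = round(time.time()) + 3600
r.zadd(f"{QUOTA_LIMITER_CACHE_KEY}events", {"phc_team_a": limited_until, "phc_team_b": limited_until})

# List tokens whose limit has not yet expired, mirroring list_limited_team_attributes
# (a score >= now means the limit is still in force).
still_limited = [x.decode("utf-8") for x in r.zrangebyscore(f"{QUOTA_LIMITER_CACHE_KEY}events", min=time.time(), max="+inf")]

# Lift the limit for one token, mirroring remove_limited_team_tokens.
r.zrem(f"{QUOTA_LIMITER_CACHE_KEY}events", "phc_team_a")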
@@ -94,16 +72,7 @@ class UsageCounters(TypedDict):
rows_synced: int


class QuotaLimitingTracker(TypedDict):
data_retained_until: Optional[int] # timestamp
quota_limited_until: Optional[int] # timestamp
needs_save: Optional[bool]


def determine_org_quota_limit_or_data_retention(
organization: Organization, resource: QuotaResource
) -> Optional[QuotaLimitingTracker]:
today, _ = get_current_day()
def org_quota_limited_until(organization: Organization, resource: QuotaResource) -> Optional[int]:
if not organization.usage:
return None

@@ -112,30 +81,18 @@ def determine_org_quota_limit_or_data_retention(
return None
usage = summary.get("usage", 0)
todays_usage = summary.get("todays_usage", 0)
data_retained_until = summary.get("retained_period_end", None)
limit = summary.get("limit")
needs_save = False

if limit is None:
return None

is_quota_limited = usage + todays_usage >= limit + OVERAGE_BUFFER[resource]
billing_period_start = round(dateutil.parser.isoparse(organization.usage["period"][0]).timestamp())
billing_period_end = round(dateutil.parser.isoparse(organization.usage["period"][1]).timestamp())

if not is_quota_limited:
# Reset data retention
if data_retained_until:
data_retained_until = None
del summary["retained_period_end"]
needs_save = True
return {"quota_limited_until": None, "data_retained_until": None, "needs_save": needs_save}
return None

if organization.never_drop_data:
if is_quota_limited and organization.never_drop_data:
return None

if posthoganalytics.feature_enabled(
if is_quota_limited and posthoganalytics.feature_enabled(
QUOTA_LIMIT_DATA_RETENTION_FLAG,
organization.id,
groups={"organization": str(organization.id)},
@@ -148,22 +105,8 @@ def determine_org_quota_limit_or_data_retention(
)
return None

# Either wasn't set or was set in the previous biling period
if (
not data_retained_until
or round((datetime.fromtimestamp(data_retained_until) - timedelta(days=DAYS_RETAIN_DATA)).timestamp())
< billing_period_start
):
data_retained_until = round((today + timedelta(days=DAYS_RETAIN_DATA)).timestamp())
summary["retained_period_end"] = data_retained_until
needs_save = True

if billing_period_end:
return {
"quota_limited_until": billing_period_end,
"data_retained_until": data_retained_until,
"needs_save": needs_save,
}
if is_quota_limited and billing_period_end:
return billing_period_end

return None

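The reverted org_quota_limited_until boils the decision down to one threshold check: an organization is limited once billed usage plus today's usage reaches its limit plus the per-resource overage buffer, and the end of the billing period is then returned as the limited-until timestamp. A rough standalone restatement of that check with illustrative numbers (the helper name and its inputs are not part of the actual module):

import dateutil.parser

OVERAGE_BUFFER = {"events": 0, "recordings": 1000, "rows_synced": 0}

def limited_until(summary: dict, resource: str, period_end_iso: str):
    usage = summary.get("usage", 0)
    todays_usage = summary.get("todays_usage", 0)
    limit = summary.get("limit")
    if limit is None:
        return None
    if usage + todays_usage >= limit + OVERAGE_BUFFER[resource]:
        # Over quota: limited until the end of the current billing period.
        return round(dateutil.parser.isoparse(period_end_iso).timestamp())
    return None

# 950 billed + 100 today >= 1000 limit + 0 buffer, so the org is limited until period end.
print(limited_until({"usage": 950, "todays_usage": 100, "limit": 1000}, "events", "2024-02-29T23:59:59Z"))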
@@ -172,35 +115,14 @@ def sync_org_quota_limits(organization: Organization):
if not organization.usage:
return None

today_start, _ = get_current_day()
for resource in [QuotaResource.EVENTS, QuotaResource.RECORDINGS, QuotaResource.ROWS_SYNCED]:
team_attributes = get_team_attribute_by_quota_resource(organization, resource)
result = determine_org_quota_limit_or_data_retention(organization, resource)
if result:
quota_limited_until = result.get("quota_limited_until")
data_retained_until = result.get("data_retained_until")
needs_save = result.get("needs_save")

if needs_save:
organization.save()
if quota_limited_until and (data_retained_until and data_retained_until < round(today_start.timestamp())):
add_limited_team_tokens(
resource,
{x: quota_limited_until for x in team_attributes},
QuotaLimitingRedisCaches.QUOTA_LIMITER_CACHE_KEY,
)
continue
elif data_retained_until and data_retained_until >= today_start.timestamp():
add_limited_team_tokens(
resource,
{x: data_retained_until for x in team_attributes},
QuotaLimitingRedisCaches.QUOTA_OVERAGE_RETENTION_CACHE_KEY,
)
continue
remove_limited_team_tokens(resource, team_attributes, QuotaLimitingRedisCaches.QUOTA_LIMITER_CACHE_KEY)
remove_limited_team_tokens(
resource, team_attributes, QuotaLimitingRedisCaches.QUOTA_OVERAGE_RETENTION_CACHE_KEY
)
quota_limited_until = org_quota_limited_until(organization, resource)

if quota_limited_until:
add_limited_team_tokens(resource, {x: quota_limited_until for x in team_attributes})
else:
remove_limited_team_tokens(resource, team_attributes)


def get_team_attribute_by_quota_resource(organization: Organization, resource: QuotaResource):
@@ -260,7 +182,7 @@ def set_org_usage_summary(
return has_changed


def update_all_org_billing_quotas(dry_run: bool = False) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
def update_all_org_billing_quotas(dry_run: bool = False) -> Dict[str, Dict[str, int]]:
period = get_current_day()
period_start, period_end = period

@@ -311,8 +233,7 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Tuple[Dict[str, Dict
for field in team_report:
org_report[field] += team_report[field] # type: ignore

quota_limited_orgs: Dict[str, Dict[str, int]] = {key.value: {} for key in QuotaResource}
data_retained_orgs: Dict[str, Dict[str, int]] = {key.value: {} for key in QuotaResource}
quota_limited_orgs: Dict[str, Dict[str, int]] = {"events": {}, "recordings": {}, "rows_synced": {}}

# We find all orgs that should be rate limited
for org_id, todays_report in todays_usage_report.items():
@@ -325,35 +246,24 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Tuple[Dict[str, Dict
org.save(update_fields=["usage"])

for field in ["events", "recordings", "rows_synced"]:
result = determine_org_quota_limit_or_data_retention(org, QuotaResource(field))
if result:
quota_limited_until = result.get("quota_limited_until")
data_retained_until = result.get("data_retained_until")
needs_save = result.get("needs_save")
quota_limited_until = org_quota_limited_until(org, QuotaResource(field))

if needs_save:
org.save(update_fields=["usage"])
if quota_limited_until:
# TODO: Set this rate limit to the end of the billing period
quota_limited_orgs[field][org_id] = quota_limited_until

if data_retained_until and data_retained_until >= period_start.timestamp():
data_retained_orgs[field][org_id] = data_retained_until
elif quota_limited_until:
quota_limited_orgs[field][org_id] = quota_limited_until

# Get the current quota limits so we can track to posthog if it changes
# Get the current quota limits so we can track to poshog if it changes
orgs_with_changes = set()
previously_quota_limited_team_tokens: Dict[str, Dict[str, int]] = {key.value: {} for key in QuotaResource}
previously_data_retained_teams: Dict[str, Dict[str, int]] = {key.value: {} for key in QuotaResource}
previously_quota_limited_team_tokens: Dict[str, Dict[str, int]] = {
"events": {},
"recordings": {},
"rows_synced": {},
}

for field in quota_limited_orgs:
previously_quota_limited_team_tokens[field] = list_limited_team_attributes(
QuotaResource(field), QuotaLimitingRedisCaches.QUOTA_LIMITER_CACHE_KEY
)
previously_data_retained_teams[field] = list_limited_team_attributes(
QuotaResource(field), QuotaLimitingRedisCaches.QUOTA_OVERAGE_RETENTION_CACHE_KEY
)
previously_quota_limited_team_tokens[field] = list_limited_team_attributes(QuotaResource(field))

quota_limited_teams: Dict[str, Dict[str, int]] = {key.value: {} for key in QuotaResource}
data_retained_teams: Dict[str, Dict[str, int]] = {key.value: {} for key in QuotaResource}
quota_limited_teams: Dict[str, Dict[str, int]] = {"events": {}, "recordings": {}, "rows_synced": {}}

# Convert the org ids to team tokens
for team in teams:
@@ -365,14 +275,9 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Tuple[Dict[str, Dict
# If the team was not previously quota limited, we add it to the list of orgs that were added
if team.api_token not in previously_quota_limited_team_tokens[field]:
orgs_with_changes.add(org_id)
elif org_id in data_retained_orgs[field]:
data_retained_teams[field][team.api_token] = data_retained_orgs[field][org_id]
else:
# If the team was previously quota limited, we add it to the list of orgs that were removed
if (
team.api_token in previously_quota_limited_team_tokens[field]
or team.api_token in previously_data_retained_teams[field]
):
if team.api_token in previously_quota_limited_team_tokens[field]:
orgs_with_changes.add(org_id)

for org_id in orgs_with_changes:
Expand All @@ -390,15 +295,7 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Tuple[Dict[str, Dict
)

if not dry_run:
for field in data_retained_teams:
replace_limited_team_tokens(
QuotaResource(field),
data_retained_teams[field],
QuotaLimitingRedisCaches.QUOTA_OVERAGE_RETENTION_CACHE_KEY,
)
for field in quota_limited_teams:
replace_limited_team_tokens(
QuotaResource(field), quota_limited_teams[field], QuotaLimitingRedisCaches.QUOTA_LIMITER_CACHE_KEY
)
replace_limited_team_tokens(QuotaResource(field), quota_limited_teams[field])

return quota_limited_orgs, data_retained_orgs
return quota_limited_orgs
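
With the revert, update_all_org_billing_quotas again returns a single mapping of resource name to {org_id: limited_until_timestamp}, instead of the (quota limited, data retained) pair introduced by #19938. A hedged sketch of how a caller might consume that shape (the org id and timestamp below are invented):

from typing import Dict

# Illustrative post-revert return shape.
quota_limited_orgs: Dict[str, Dict[str, int]] = {
    "events": {"0182-some-org-uuid": 1709251199},
    "recordings": {},
    "rows_synced": {},
}

for resource, orgs in quota_limited_orgs.items():
    for org_id, until_ts in orgs.items():
        print(f"org {org_id} is over its {resource} quota until {until_ts}")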