Skip to content

Commit

Permalink
chore(data-warehouse): dw billing (#18168)
Browse files Browse the repository at this point in the history
* create airbyte source api

* comment test

* rename and add connection

* comment out test

* frontend updates and additional API calls on airbyte

* some more UI and retrieve endpoint

* restore lock file

* connecting the dots

* add destination logic

* make destinatino parquet

* ui updates

* add task to billing usage report

* update task

* rename data

* more renaming

* migration

* remove test

* rename

* Update UI snapshots for `chromium` (2)

* missing file

* typing

* remove unneeded field

* remove 'billable' and unnecessary field

* add rollback deletions if one fo the related resources fails

* fix types

* change type

* remove fluff

* remove migration

* Update UI snapshots for `chromium` (1)

* new attempt

* add tests

* add comment

* move around ph_client

* Update query snapshots

* Update query snapshots

* Update query snapshots

* Update query snapshots

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* Update query snapshots

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* add job

* usage report task and tests

* test

* add limit checker task

* add conversion

* add tests

* address comments

* Update query snapshots

* Update query snapshots

* update test

* revised

* fix tests

* Update query snapshots

* Update query snapshots

* Update query snapshots

* Update query snapshots

* add typing

* update tests

* rename

* typing

* fix tests

* Update query snapshots

* Update query snapshots

* Update query snapshots

* fix tests and types

* fix typo

* naming

* verbose

* remove verbose

* typo

* restore main

* adjust timing

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
EDsCODE and github-actions[bot] authored Nov 16, 2023
1 parent 6c16a42 commit 820e6a7
Show file tree
Hide file tree
Showing 34 changed files with 973 additions and 253 deletions.
15 changes: 15 additions & 0 deletions ee/api/test/test_billing.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def create_missing_billing_customer(**kwargs) -> CustomerInfo:
usage_summary={
"events": {"limit": None, "usage": 0},
"recordings": {"limit": None, "usage": 0},
"rows_synced": {"limit": None, "usage": 0},
},
free_trial_until=None,
available_features=[],
Expand Down Expand Up @@ -96,6 +97,7 @@ def create_billing_customer(**kwargs) -> CustomerInfo:
usage_summary={
"events": {"limit": None, "usage": 0},
"recordings": {"limit": None, "usage": 0},
"rows_synced": {"limit": None, "usage": 0},
},
free_trial_until=None,
)
Expand Down Expand Up @@ -292,6 +294,7 @@ def mock_implementation(url: str, headers: Any = None, params: Any = None) -> Ma
"usage_summary": {
"events": {"limit": None, "usage": 0},
"recordings": {"limit": None, "usage": 0},
"rows_synced": {"limit": None, "usage": 0},
},
"free_trial_until": None,
}
Expand Down Expand Up @@ -363,6 +366,7 @@ def mock_implementation(url: str, headers: Any = None, params: Any = None) -> Ma
"usage_summary": {
"events": {"limit": None, "usage": 0},
"recordings": {"limit": None, "usage": 0},
"rows_synced": {"limit": None, "usage": 0},
},
"free_trial_until": None,
"current_total_amount_usd": "0.00",
Expand Down Expand Up @@ -521,6 +525,11 @@ def mock_implementation(url: str, headers: Any = None, params: Any = None) -> Ma
"todays_usage": 0,
"usage": 0,
},
"rows_synced": {
"limit": None,
"todays_usage": 0,
"usage": 0,
},
"period": ["2022-10-07T11:12:48", "2022-11-07T11:12:48"],
}

Expand Down Expand Up @@ -556,6 +565,11 @@ def mock_implementation_missing_customer(url: str, headers: Any = None, params:
"todays_usage": 0,
"usage": 0,
},
"rows_synced": {
"limit": None,
"todays_usage": 0,
"usage": 0,
},
"period": ["2022-10-07T11:12:48", "2022-11-07T11:12:48"],
}
assert self.organization.customer_id == "cus_123"
Expand Down Expand Up @@ -613,5 +627,6 @@ def mock_implementation(url: str, headers: Any = None, params: Any = None) -> Ma
assert self.organization.usage == {
"events": {"limit": None, "usage": 0, "todays_usage": 0},
"recordings": {"limit": None, "usage": 0, "todays_usage": 0},
"rows_synced": {"limit": None, "usage": 0, "todays_usage": 0},
"period": ["2022-10-07T11:12:48", "2022-11-07T11:12:48"],
}
1 change: 1 addition & 0 deletions ee/billing/billing_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ def update_org_details(self, organization: Organization, billing_status: Billing
usage_info = OrganizationUsageInfo(
events=usage_summary["events"],
recordings=usage_summary["recordings"],
rows_synced=usage_summary["rows_synced"],
period=[
data["billing_period"]["current_period_start"],
data["billing_period"]["current_period_end"],
Expand Down
55 changes: 40 additions & 15 deletions ee/billing/quota_limiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
convert_team_usage_rows_to_dict,
get_teams_with_billable_event_count_in_period,
get_teams_with_recording_count_in_period,
get_teams_with_rows_synced_in_period,
)
from posthog.utils import get_current_day

Expand All @@ -26,11 +27,13 @@
class QuotaResource(Enum):
EVENTS = "events"
RECORDINGS = "recordings"
ROWS_SYNCED = "rows_synced"


OVERAGE_BUFFER = {
QuotaResource.EVENTS: 0,
QuotaResource.RECORDINGS: 1000,
QuotaResource.ROWS_SYNCED: 0,
}


Expand All @@ -53,7 +56,7 @@ def remove_limited_team_tokens(resource: QuotaResource, tokens: List[str]) -> No


@cache_for(timedelta(seconds=30), background_refresh=True)
def list_limited_team_tokens(resource: QuotaResource) -> List[str]:
def list_limited_team_attributes(resource: QuotaResource) -> List[str]:
now = timezone.now()
redis_client = get_client()
results = redis_client.zrangebyscore(f"{QUOTA_LIMITER_CACHE_KEY}{resource.value}", min=now.timestamp(), max="+inf")
Expand All @@ -63,6 +66,7 @@ def list_limited_team_tokens(resource: QuotaResource) -> List[str]:
class UsageCounters(TypedDict):
events: int
recordings: int
rows_synced: int


def org_quota_limited_until(organization: Organization, resource: QuotaResource) -> Optional[int]:
Expand Down Expand Up @@ -93,19 +97,34 @@ def sync_org_quota_limits(organization: Organization):
if not organization.usage:
return None

team_tokens: List[str] = [x for x in list(organization.teams.values_list("api_token", flat=True)) if x]

if not team_tokens:
capture_exception(Exception(f"quota_limiting: No team tokens found for organization: {organization.id}"))
return

for resource in [QuotaResource.EVENTS, QuotaResource.RECORDINGS]:
for resource in [QuotaResource.EVENTS, QuotaResource.RECORDINGS, QuotaResource.ROWS_SYNCED]:
team_attributes = get_team_attribute_by_quota_resource(organization, resource)
quota_limited_until = org_quota_limited_until(organization, resource)

if quota_limited_until:
add_limited_team_tokens(resource, {x: quota_limited_until for x in team_tokens})
add_limited_team_tokens(resource, {x: quota_limited_until for x in team_attributes})
else:
remove_limited_team_tokens(resource, team_tokens)
remove_limited_team_tokens(resource, team_attributes)


def get_team_attribute_by_quota_resource(organization: Organization, resource: QuotaResource):
if resource in [QuotaResource.EVENTS, QuotaResource.RECORDINGS]:
team_tokens: List[str] = [x for x in list(organization.teams.values_list("api_token", flat=True)) if x]

if not team_tokens:
capture_exception(Exception(f"quota_limiting: No team tokens found for organization: {organization.id}"))
return

return team_tokens

if resource == QuotaResource.ROWS_SYNCED:
team_ids: List[str] = [x for x in list(organization.teams.values_list("id", flat=True)) if x]

if not team_ids:
capture_exception(Exception(f"quota_limiting: No team ids found for organization: {organization.id}"))
return

return team_ids


def set_org_usage_summary(
Expand All @@ -125,7 +144,7 @@ def set_org_usage_summary(

new_usage = copy.deepcopy(new_usage)

for field in ["events", "recordings"]:
for field in ["events", "recordings", "rows_synced"]:
resource_usage = new_usage[field] # type: ignore

if todays_usage:
Expand Down Expand Up @@ -155,6 +174,9 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Dict[str, Dict[str,
teams_with_recording_count_in_period=convert_team_usage_rows_to_dict(
get_teams_with_recording_count_in_period(period_start, period_end)
),
teams_with_rows_synced_in_period=convert_team_usage_rows_to_dict(
get_teams_with_rows_synced_in_period(period_start, period_end)
),
)

teams: Sequence[Team] = list(
Expand All @@ -171,6 +193,7 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Dict[str, Dict[str,
team_report = UsageCounters(
events=all_data["teams_with_event_count_in_period"].get(team.id, 0),
recordings=all_data["teams_with_recording_count_in_period"].get(team.id, 0),
rows_synced=all_data["teams_with_rows_synced_in_period"].get(team.id, 0),
)

org_id = str(team.organization.id)
Expand All @@ -183,7 +206,7 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Dict[str, Dict[str,
for field in team_report:
org_report[field] += team_report[field] # type: ignore

quota_limited_orgs: Dict[str, Dict[str, int]] = {"events": {}, "recordings": {}}
quota_limited_orgs: Dict[str, Dict[str, int]] = {"events": {}, "recordings": {}, "rows_synced": {}}

# We find all orgs that should be rate limited
for org_id, todays_report in todays_usage_report.items():
Expand All @@ -195,7 +218,7 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Dict[str, Dict[str,
if set_org_usage_summary(org, todays_usage=todays_report):
org.save(update_fields=["usage"])

for field in ["events", "recordings"]:
for field in ["events", "recordings", "rows_synced"]:
quota_limited_until = org_quota_limited_until(org, QuotaResource(field))

if quota_limited_until:
Expand All @@ -207,12 +230,13 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Dict[str, Dict[str,
previously_quota_limited_team_tokens: Dict[str, Dict[str, int]] = {
"events": {},
"recordings": {},
"rows_synced": {},
}

for field in quota_limited_orgs:
previously_quota_limited_team_tokens[field] = list_limited_team_tokens(QuotaResource(field))
previously_quota_limited_team_tokens[field] = list_limited_team_attributes(QuotaResource(field))

quota_limited_teams: Dict[str, Dict[str, int]] = {"events": {}, "recordings": {}}
quota_limited_teams: Dict[str, Dict[str, int]] = {"events": {}, "recordings": {}, "rows_synced": {}}

# Convert the org ids to team tokens
for team in teams:
Expand All @@ -233,6 +257,7 @@ def update_all_org_billing_quotas(dry_run: bool = False) -> Dict[str, Dict[str,
properties = {
"quota_limited_events": quota_limited_orgs["events"].get(org_id, None),
"quota_limited_recordings": quota_limited_orgs["events"].get(org_id, None),
"quota_limited_rows_synced": quota_limited_orgs["rows_synced"].get(org_id, None),
}

report_organization_action(
Expand Down
Loading

0 comments on commit 820e6a7

Please sign in to comment.