Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix high memory usage in send_usage_reports #17447

Merged
merged 4 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions posthog/management/commands/send_usage_report.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import pprint

from django.core.management.base import BaseCommand

from posthog.tasks.usage_report import send_all_org_usage_reports
Expand All @@ -10,7 +8,6 @@ class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument("--dry-run", type=bool, help="Print information instead of sending it")
parser.add_argument("--print-reports", type=bool, help="Print the reports in full")
parser.add_argument("--date", type=str, help="The date to be ran in format YYYY-MM-DD")
parser.add_argument("--event-name", type=str, help="Override the event name to be sent - for testing")
parser.add_argument(
Expand All @@ -28,20 +25,14 @@ def handle(self, *args, **options):
run_async = options["async"]

if run_async:
results = send_all_org_usage_reports.delay(
send_all_org_usage_reports.delay(
dry_run, date, event_name, skip_capture_event=skip_capture_event, only_organization_id=organization_id
)
else:
results = send_all_org_usage_reports(
send_all_org_usage_reports(
dry_run, date, event_name, skip_capture_event=skip_capture_event, only_organization_id=organization_id
)
if options["print_reports"]:
print("") # noqa T201
pprint.pprint(results) # noqa T203
print("") # noqa T201

if dry_run:
print("Dry run so not sent.") # noqa T201
else:
print(f"{len(results)} Reports sent!") # noqa T201
print("Done!") # noqa T201
184 changes: 88 additions & 96 deletions posthog/tasks/test/test_usage_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,24 @@
from posthog.hogql.query import execute_hogql_query
from posthog.models import Organization, Plugin, Team
from posthog.models.dashboard import Dashboard
from posthog.models.event.util import create_event
from posthog.models.feature_flag import FeatureFlag
from posthog.models.group.util import create_group
from posthog.models.group_type_mapping import GroupTypeMapping
from posthog.models.plugin import PluginConfig
from posthog.models.sharing_configuration import SharingConfiguration
from posthog.schema import EventsQuery
from posthog.session_recordings.test.test_factory import create_snapshot
from posthog.tasks.usage_report import capture_event, send_all_org_usage_reports
from posthog.tasks.usage_report import (
_get_all_org_reports,
_get_all_usage_data_as_team_rows,
_get_full_org_usage_report,
_get_full_org_usage_report_as_dict,
_get_team_report,
capture_event,
get_instance_metadata,
send_all_org_usage_reports,
)
from posthog.test.base import (
APIBaseTest,
ClickhouseDestroyTablesMixin,
Expand All @@ -37,8 +47,7 @@
flush_persons_and_events,
snapshot_clickhouse_queries,
)
from posthog.models.event.util import create_event
from posthog.utils import get_machine_id
from posthog.utils import get_machine_id, get_previous_day

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -296,16 +305,20 @@ def _test_usage_report(self) -> List[dict]:
self._create_plugin("Installed but not enabled", False)
self._create_plugin("Installed and enabled", True)

all_reports = send_all_org_usage_reports(dry_run=False)
period = get_previous_day()
period_start, period_end = period
all_reports = _get_all_org_reports(period_start, period_end)
report = _get_full_org_usage_report_as_dict(
_get_full_org_usage_report(all_reports[str(self.organization.id)], get_instance_metadata(period))
)

report = all_reports[0]
assert report["table_sizes"]
assert report["table_sizes"]["posthog_event"] < 10**7 # <10MB
assert report["table_sizes"]["posthog_sessionrecordingevent"] < 10**7 # <10MB

assert len(all_reports) == 2

expectation = [
expectations = [
{
"deployment_infrastructure": "tests",
"realm": "hosted-clickhouse",
Expand All @@ -316,12 +329,12 @@ def _test_usage_report(self) -> List[dict]:
"site_url": "http://test.posthog.com",
"product": "open source",
"helm": {},
"clickhouse_version": all_reports[0]["clickhouse_version"],
"clickhouse_version": report["clickhouse_version"],
"users_who_logged_in": [],
"users_who_logged_in_count": 0,
"users_who_signed_up": [],
"users_who_signed_up_count": 0,
"table_sizes": all_reports[0]["table_sizes"],
"table_sizes": report["table_sizes"],
"plugins_installed": {"Installed and enabled": 1, "Installed but not enabled": 1},
"plugins_enabled": {"Installed and enabled": 1},
"instance_tag": "none",
Expand Down Expand Up @@ -441,12 +454,12 @@ def _test_usage_report(self) -> List[dict]:
"site_url": "http://test.posthog.com",
"product": "open source",
"helm": {},
"clickhouse_version": all_reports[1]["clickhouse_version"],
"clickhouse_version": report["clickhouse_version"],
"users_who_logged_in": [],
"users_who_logged_in_count": 0,
"users_who_signed_up": [],
"users_who_signed_up_count": 0,
"table_sizes": all_reports[1]["table_sizes"],
"table_sizes": report["table_sizes"],
"plugins_installed": {"Installed and enabled": 1, "Installed but not enabled": 1},
"plugins_enabled": {"Installed and enabled": 1},
"instance_tag": "none",
Expand Down Expand Up @@ -525,18 +538,22 @@ def _test_usage_report(self) -> List[dict]:
},
]

for item in expectation:
for item in expectations:
item.update(**self.expected_properties)

# tricky: list could be in different order
assert len(all_reports) == 2
for report in all_reports:
if report["organization_id"] == expectation[0]["organization_id"]:
assert report == expectation[0]
elif report["organization_id"] == expectation[1]["organization_id"]:
assert report == expectation[1]
full_reports = []
for expectation in expectations:
report = _get_full_org_usage_report_as_dict(
_get_full_org_usage_report(
all_reports[expectation["organization_id"]], get_instance_metadata(period)
)
)
assert report == expectation
full_reports.append(report)

return all_reports
return full_reports

@freeze_time("2022-01-10T00:01:00Z")
@patch("os.environ", {"DEPLOYMENT": "tests"})
Expand All @@ -552,6 +569,8 @@ def test_unlicensed_usage_report(self, mock_post: MagicMock, mock_client: MagicM
mock_client.return_value = mock_posthog

all_reports = self._test_usage_report()
with self.settings(SITE_URL="http://test.posthog.com"):
send_all_org_usage_reports()

# Check calls to other services
mock_post.assert_not_called()
Expand Down Expand Up @@ -597,20 +616,21 @@ def test_usage_report_hogql_queries(self) -> None:
run_events_query(query=EventsQuery(select=["event"], limit=50), team=self.team)
sync_execute("SYSTEM FLUSH LOGS")

all_reports = send_all_org_usage_reports(dry_run=False, at=str(now() + relativedelta(days=1)))
assert len(all_reports) == 1
period = get_previous_day(at=now() + relativedelta(days=1))
period_start, period_end = period
all_reports = _get_all_usage_data_as_team_rows(period_start, period_end)

report = all_reports[0]["teams"][str(self.team.pk)]
report = _get_team_report(all_reports, self.team)

# We selected 200 or 50 rows, but still read 100 rows to return the query
assert report["hogql_app_rows_read"] == 100
assert report["hogql_app_bytes_read"] > 0
assert report["event_explorer_app_rows_read"] == 100
assert report["event_explorer_app_bytes_read"] > 0
assert report.hogql_app_rows_read == 100
assert report.hogql_app_bytes_read > 0
assert report.event_explorer_app_rows_read == 100
assert report.event_explorer_app_bytes_read > 0

# Nothing was read via the API
assert report["hogql_api_rows_read"] == 0
assert report["event_explorer_api_rows_read"] == 0
assert report.hogql_api_rows_read == 0
assert report.event_explorer_api_rows_read == 0


@freeze_time("2022-01-10T00:01:00Z")
Expand Down Expand Up @@ -680,21 +700,19 @@ def test_usage_report_decide_requests(self, billing_task_mock: MagicMock, postho
flush_persons_and_events()

with self.settings(DECIDE_BILLING_ANALYTICS_TOKEN="correct"):
all_reports = send_all_org_usage_reports(dry_run=False, at=str(now() + relativedelta(days=1)))
period = get_previous_day(at=now() + relativedelta(days=1))
period_start, period_end = period
all_reports = _get_all_org_reports(period_start, period_end)

assert len(all_reports) == 3

all_reports = sorted(all_reports, key=lambda x: x["organization_name"])

assert [all_reports["organization_name"] for all_reports in all_reports] == [
"Org 1",
"Org 2",
"PostHog",
]

org_1_report = all_reports[0]
org_2_report = all_reports[1]
analytics_report = all_reports[2]
org_1_report = _get_full_org_usage_report_as_dict(
_get_full_org_usage_report(all_reports[str(self.org_1.id)], get_instance_metadata(period))
)
assert org_1_report["organization_name"] == "Org 1"
org_2_report = _get_full_org_usage_report_as_dict(
_get_full_org_usage_report(all_reports[str(self.org_2.id)], get_instance_metadata(period))
)

assert org_1_report["organization_name"] == "Org 1"
assert org_1_report["decide_requests_count_in_period"] == 11
Expand All @@ -721,26 +739,6 @@ def test_usage_report_decide_requests(self, billing_task_mock: MagicMock, postho
assert org_2_report["teams"]["5"]["billable_feature_flag_requests_count_in_period"] == 0
assert org_2_report["teams"]["5"]["billable_feature_flag_requests_count_in_month"] == 0

# billing service calls are made only for org1, which has decide requests, and analytics org - which has decide usage events.
calls = [
call(
org_1_report["organization_id"],
ANY,
),
call(
analytics_report["organization_id"],
ANY,
),
]
assert billing_task_mock.delay.call_count == 2
billing_task_mock.delay.assert_has_calls(
calls,
any_order=True,
)

# capture usage report calls are made for all orgs
assert posthog_capture_mock.return_value.capture.call_count == 3

@patch("posthog.tasks.usage_report.Client")
@patch("posthog.tasks.usage_report.send_report_to_billing_service")
def test_usage_report_local_evaluation_requests(
Expand Down Expand Up @@ -792,21 +790,19 @@ def test_usage_report_local_evaluation_requests(
flush_persons_and_events()

with self.settings(DECIDE_BILLING_ANALYTICS_TOKEN="correct"):
all_reports = send_all_org_usage_reports(dry_run=False, at=str(now() + relativedelta(days=1)))
period = get_previous_day(at=now() + relativedelta(days=1))
period_start, period_end = period
all_reports = _get_all_org_reports(period_start, period_end)

assert len(all_reports) == 3

all_reports = sorted(all_reports, key=lambda x: x["organization_name"])

assert [all_reports["organization_name"] for all_reports in all_reports] == [
"Org 1",
"Org 2",
"PostHog",
]

org_1_report = all_reports[0]
org_2_report = all_reports[1]
analytics_report = all_reports[2]
org_1_report = _get_full_org_usage_report_as_dict(
_get_full_org_usage_report(all_reports[str(self.org_1.id)], get_instance_metadata(period))
)
assert org_1_report["organization_name"] == "Org 1"
org_2_report = _get_full_org_usage_report_as_dict(
_get_full_org_usage_report(all_reports[str(self.org_2.id)], get_instance_metadata(period))
)

assert org_1_report["organization_name"] == "Org 1"
assert org_1_report["local_evaluation_requests_count_in_period"] == 11
Expand Down Expand Up @@ -837,26 +833,6 @@ def test_usage_report_local_evaluation_requests(
assert org_2_report["teams"]["5"]["billable_feature_flag_requests_count_in_period"] == 0
assert org_2_report["teams"]["5"]["billable_feature_flag_requests_count_in_month"] == 0

# billing service calls are made only for org1, which has decide requests, and analytics org - which has local evaluation usage events.
calls = [
call(
org_1_report["organization_id"],
ANY,
),
call(
analytics_report["organization_id"],
ANY,
),
]
assert billing_task_mock.delay.call_count == 2
billing_task_mock.delay.assert_has_calls(
calls,
any_order=True,
)

# capture usage report calls are made for all orgs
assert posthog_capture_mock.return_value.capture.call_count == 3


class SendUsageTest(LicensedTestMixin, ClickhouseDestroyTablesMixin, APIBaseTest):
def setUp(self) -> None:
Expand Down Expand Up @@ -907,18 +883,26 @@ def test_send_usage(self, mock_post: MagicMock, mock_client: MagicMock) -> None:
mock_posthog = MagicMock()
mock_client.return_value = mock_posthog

all_reports = send_all_org_usage_reports(dry_run=False)
period = get_previous_day()
period_start, period_end = period
all_reports = _get_all_org_reports(period_start, period_end)
full_report_as_dict = _get_full_org_usage_report_as_dict(
_get_full_org_usage_report(all_reports[str(self.organization.id)], get_instance_metadata(period))
)
send_all_org_usage_reports(dry_run=False)
license = License.objects.first()
assert license
token = build_billing_token(license, self.organization)
mock_post.assert_called_once_with(
f"{BILLING_SERVICE_URL}/api/usage", json=all_reports[0], headers={"Authorization": f"Bearer {token}"}
f"{BILLING_SERVICE_URL}/api/usage",
json=full_report_as_dict,
headers={"Authorization": f"Bearer {token}"},
)

mock_posthog.capture.assert_any_call(
get_machine_id(),
"organization usage report",
{**all_reports[0], "scope": "machine"},
{**full_report_as_dict, "scope": "machine"},
groups={"instance": ANY},
timestamp=None,
)
Expand All @@ -935,18 +919,26 @@ def test_send_usage_cloud(self, mock_post: MagicMock, mock_client: MagicMock) ->
mock_posthog = MagicMock()
mock_client.return_value = mock_posthog

all_reports = send_all_org_usage_reports(dry_run=False)
period = get_previous_day()
period_start, period_end = period
all_reports = _get_all_org_reports(period_start, period_end)
full_report_as_dict = _get_full_org_usage_report_as_dict(
_get_full_org_usage_report(all_reports[str(self.organization.id)], get_instance_metadata(period))
)
send_all_org_usage_reports(dry_run=False)
license = License.objects.first()
assert license
token = build_billing_token(license, self.organization)
mock_post.assert_called_once_with(
f"{BILLING_SERVICE_URL}/api/usage", json=all_reports[0], headers={"Authorization": f"Bearer {token}"}
f"{BILLING_SERVICE_URL}/api/usage",
json=full_report_as_dict,
headers={"Authorization": f"Bearer {token}"},
)

mock_posthog.capture.assert_any_call(
self.user.distinct_id,
"organization usage report",
{**all_reports[0], "scope": "user"},
{**full_report_as_dict, "scope": "user"},
groups={"instance": "http://localhost:8000", "organization": str(self.organization.id)},
timestamp=None,
)
Expand Down
Loading
Loading