Skip to content

Commit

Permalink
test(backend): Make demo data gen fast in tests (#26287)
Browse files Browse the repository at this point in the history
Co-authored-by: Georgiy Tarasov <[email protected]>
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 26, 2024
1 parent 8cdc80a commit d1f7db9
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 34 deletions.
28 changes: 28 additions & 0 deletions ee/hogai/eval/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import pytest

from posthog.test.base import run_clickhouse_statement_in_parallel


@pytest.fixture(scope="module", autouse=True)
def setup_kafka_tables(django_db_setup):
    """Create the ClickHouse Kafka-engine tables for this test module, dropping them on teardown.

    Module-scoped and autouse, so it runs once per eval test module after Django's
    database setup. Teardown discovers tables by the ``kafka_`` name prefix rather
    than replaying the creation list, so it also cleans up any table the queries
    created that later drifted out of ``CREATE_KAFKA_TABLE_QUERIES``.
    """
    from posthog.clickhouse.client import sync_execute
    from posthog.clickhouse.schema import (
        CREATE_KAFKA_TABLE_QUERIES,
        build_query,
    )
    from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE

    kafka_queries = list(map(build_query, CREATE_KAFKA_TABLE_QUERIES))
    run_clickhouse_statement_in_parallel(kafka_queries)

    yield

    # Enumerate every Kafka-engine table in the test database by its naming convention.
    kafka_tables = sync_execute(
        f"""
        SELECT name
        FROM system.tables
        WHERE database = '{CLICKHOUSE_DATABASE}' AND name LIKE 'kafka_%'
        """,
    )
    # These are DROP statements (not truncates) — name the list accordingly.
    kafka_drop_queries = [f"DROP TABLE {table[0]} ON CLUSTER '{CLICKHOUSE_CLUSTER}'" for table in kafka_tables]
    run_clickhouse_statement_in_parallel(kafka_drop_queries)
File renamed without changes.
File renamed without changes.
File renamed without changes.
12 changes: 7 additions & 5 deletions ee/hogai/eval/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime as dt
import os

import pytest
from django.test import override_settings
from flaky import flaky

from posthog.demo.matrix.manager import MatrixManager
Expand All @@ -17,12 +17,14 @@ def setUpTestData(cls):
super().setUpTestData()
matrix = HedgeboxMatrix(
seed="b1ef3c66-5f43-488a-98be-6b46d92fbcef", # this seed generates all events
now=dt.datetime.now(dt.UTC) - dt.timedelta(days=25),
days_past=60,
days_past=120,
days_future=30,
n_clusters=60,
n_clusters=500,
group_type_index_offset=0,
)
matrix_manager = MatrixManager(matrix, print_steps=True)
existing_user = cls.team.organization.members.first()
matrix_manager.run_on_team(cls.team, existing_user)
with override_settings(TEST=False):
# Simulation saving should occur in non-test mode, so that Kafka isn't mocked. Normally in tests we don't
# want to ingest via Kafka, but simulation saving is specifically designed to use that route for speed
matrix_manager.run_on_team(cls.team, existing_user)
24 changes: 12 additions & 12 deletions ee/hogai/taxonomy_agent/test/test_toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from posthog.test.base import APIBaseTest, ClickhouseTestMixin, _create_event, _create_person


class TestToolkit(TaxonomyAgentToolkit):
class DummyToolkit(TaxonomyAgentToolkit):
    """Minimal concrete TaxonomyAgentToolkit used by these tests.

    Exposes exactly the toolkit's default tool set with no additions, so tests
    exercise the base toolkit behavior. (Named ``Dummy*`` rather than ``Test*``
    so pytest does not collect it as a test class.)
    """

    def _get_tools(self) -> list[ToolkitTool]:
        # Pass through the base class's default tools unchanged.
        return self._default_tools

Expand Down Expand Up @@ -69,7 +69,7 @@ def _create_taxonomy(self):
)

def test_retrieve_entity_properties(self):
toolkit = TestToolkit(self.team)
toolkit = DummyToolkit(self.team)

PropertyDefinition.objects.create(
team=self.team, type=PropertyDefinition.Type.PERSON, name="test", property_type="String"
Expand Down Expand Up @@ -100,14 +100,14 @@ def test_retrieve_entity_properties(self):
)

def test_retrieve_entity_properties_returns_descriptive_feedback_without_properties(self):
toolkit = TestToolkit(self.team)
toolkit = DummyToolkit(self.team)
self.assertEqual(
toolkit.retrieve_entity_properties("person"),
"Properties do not exist in the taxonomy for the entity person.",
)

def test_retrieve_entity_property_values(self):
toolkit = TestToolkit(self.team)
toolkit = DummyToolkit(self.team)
self.assertEqual(
toolkit.retrieve_entity_property_values("session", "$session_duration"),
"30, 146, 2 and many more distinct values.",
Expand Down Expand Up @@ -148,7 +148,7 @@ def test_retrieve_entity_property_values(self):
"5, 4, 3, 2, 1 and 1 more distinct value.",
)

toolkit = TestToolkit(self.team)
toolkit = DummyToolkit(self.team)
GroupTypeMapping.objects.create(
team=self.team, project_id=self.team.project_id, group_type_index=0, group_type="proj"
)
Expand Down Expand Up @@ -192,18 +192,18 @@ def test_group_names(self):
GroupTypeMapping.objects.create(
team=self.team, project_id=self.team.project_id, group_type_index=1, group_type="org"
)
toolkit = TestToolkit(self.team)
toolkit = DummyToolkit(self.team)
self.assertEqual(toolkit._entity_names, ["person", "session", "proj", "org"])

def test_retrieve_event_properties_returns_descriptive_feedback_without_properties(self):
toolkit = TestToolkit(self.team)
toolkit = DummyToolkit(self.team)
self.assertEqual(
toolkit.retrieve_event_properties("pageview"),
"Properties do not exist in the taxonomy for the event pageview.",
)

def test_empty_events(self):
toolkit = TestToolkit(self.team)
toolkit = DummyToolkit(self.team)
self.assertEqual(
toolkit.retrieve_event_properties("test"), "Properties do not exist in the taxonomy for the event test."
)
Expand All @@ -220,15 +220,15 @@ def test_empty_events(self):
team=self.team,
)

toolkit = TestToolkit(self.team)
toolkit = DummyToolkit(self.team)
self.assertEqual(
toolkit.retrieve_event_properties("event1"),
"Properties do not exist in the taxonomy for the event event1.",
)

def test_retrieve_event_properties(self):
self._create_taxonomy()
toolkit = TestToolkit(self.team)
toolkit = DummyToolkit(self.team)
prompt = toolkit.retrieve_event_properties("event1")

self.assertIn(
Expand All @@ -250,7 +250,7 @@ def test_retrieve_event_properties(self):

def test_retrieve_event_property_values(self):
self._create_taxonomy()
toolkit = TestToolkit(self.team)
toolkit = DummyToolkit(self.team)

self.assertIn('"Chrome"', toolkit.retrieve_event_property_values("event1", "$browser"))
self.assertIn('"Firefox"', toolkit.retrieve_event_property_values("event1", "$browser"))
Expand All @@ -264,7 +264,7 @@ def test_retrieve_event_property_values(self):
)

def test_enrich_props_with_descriptions(self):
toolkit = TestToolkit(self.team)
toolkit = DummyToolkit(self.team)
res = toolkit._enrich_props_with_descriptions("event", [("$geoip_city_name", "String")])
self.assertEqual(len(res), 1)
prop, type, description = res[0]
Expand Down
25 changes: 14 additions & 11 deletions posthog/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from typing import Any

import pytest
from django.conf import settings
from infi.clickhouse_orm import Database
Expand All @@ -22,14 +20,19 @@ def create_clickhouse_tables(num_tables: int):
build_query,
)

# REMEMBER TO ADD ANY NEW CLICKHOUSE TABLES TO THIS ARRAY!
CREATE_TABLE_QUERIES: tuple[Any, ...] = CREATE_MERGETREE_TABLE_QUERIES + CREATE_DISTRIBUTED_TABLE_QUERIES
total_tables = (
len(CREATE_MERGETREE_TABLE_QUERIES)
+ len(CREATE_DISTRIBUTED_TABLE_QUERIES)
+ len(CREATE_MV_TABLE_QUERIES)
+ len(CREATE_VIEW_QUERIES)
+ len(CREATE_DICTIONARY_QUERIES)
)

# Check if all the tables have already been created
if num_tables == len(CREATE_TABLE_QUERIES):
# Check if all the tables have already been created. Views, materialized views, and dictionaries also count
if num_tables == total_tables:
return

table_queries = list(map(build_query, CREATE_TABLE_QUERIES))
table_queries = list(map(build_query, CREATE_MERGETREE_TABLE_QUERIES + CREATE_DISTRIBUTED_TABLE_QUERIES))
run_clickhouse_statement_in_parallel(table_queries)

mv_queries = list(map(build_query, CREATE_MV_TABLE_QUERIES))
Expand All @@ -38,12 +41,12 @@ def create_clickhouse_tables(num_tables: int):
view_queries = list(map(build_query, CREATE_VIEW_QUERIES))
run_clickhouse_statement_in_parallel(view_queries)

data_queries = list(map(build_query, CREATE_DATA_QUERIES))
run_clickhouse_statement_in_parallel(data_queries)

dictionary_queries = list(map(build_query, CREATE_DICTIONARY_QUERIES))
run_clickhouse_statement_in_parallel(dictionary_queries)

data_queries = list(map(build_query, CREATE_DATA_QUERIES))
run_clickhouse_statement_in_parallel(data_queries)


def reset_clickhouse_tables():
# Truncate clickhouse tables to default before running test
Expand All @@ -58,6 +61,7 @@ def reset_clickhouse_tables():
from posthog.models.app_metrics.sql import TRUNCATE_APP_METRICS_TABLE_SQL
from posthog.models.channel_type.sql import TRUNCATE_CHANNEL_DEFINITION_TABLE_SQL
from posthog.models.cohort.sql import TRUNCATE_COHORTPEOPLE_TABLE_SQL
from posthog.models.error_tracking.sql import TRUNCATE_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES_TABLE_SQL
from posthog.models.event.sql import TRUNCATE_EVENTS_TABLE_SQL
from posthog.models.group.sql import TRUNCATE_GROUPS_TABLE_SQL
from posthog.models.performance.sql import TRUNCATE_PERFORMANCE_EVENTS_TABLE_SQL
Expand All @@ -68,7 +72,6 @@ def reset_clickhouse_tables():
TRUNCATE_PERSON_STATIC_COHORT_TABLE_SQL,
TRUNCATE_PERSON_TABLE_SQL,
)
from posthog.models.error_tracking.sql import TRUNCATE_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES_TABLE_SQL
from posthog.models.sessions.sql import TRUNCATE_SESSIONS_TABLE_SQL
from posthog.session_recordings.sql.session_recording_event_sql import (
TRUNCATE_SESSION_RECORDING_EVENTS_TABLE_SQL,
Expand Down
15 changes: 11 additions & 4 deletions posthog/demo/matrix/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ def create_team(organization: Organization, **kwargs) -> Team:
return team

def run_on_team(self, team: Team, user: User):
if self.print_steps:
print(f"Saving simulated data...")
does_clickhouse_data_need_saving = True
if self.use_pre_save:
does_clickhouse_data_need_saving = not self._is_demo_data_pre_saved()
Expand All @@ -122,14 +120,20 @@ def run_on_team(self, team: Team, user: User):
source_team = team
if does_clickhouse_data_need_saving:
if self.matrix.is_complete is None:
if self.print_steps:
print(f"Simulating data...")
self.matrix.simulate()
if self.print_steps:
print(f"Saving simulated data...")
self._save_analytics_data(source_team)
if self.use_pre_save:
if self.print_steps:
print(f"Copying simulated data from master team...")
self._copy_analytics_data_from_master_team(team)
self._sync_postgres_with_clickhouse_data(source_team.pk, team.pk)
self.matrix.set_project_up(team, user)
if self.print_steps:
print(f"Inferring taxonomy for Data Management...")
print(f"Inferring taxonomy for data management...")
event_definition_count, property_definition_count, event_properties_count = infer_taxonomy_for_team(team.pk)
if self.print_steps:
print(
Expand All @@ -139,6 +143,7 @@ def run_on_team(self, team: Team, user: User):
cohort.calculate_people_ch(pending_version=0)
team.project.save()
team.save()
print(f"Demo data ready for team ID {team.pk}.")

def _save_analytics_data(self, data_team: Team):
sim_persons = self.matrix.people
Expand Down Expand Up @@ -370,7 +375,7 @@ def _sleep_until_person_data_in_clickhouse(self, team_id: int):
GET_PERSON_DISTINCT_ID2_COUNT_FOR_TEAM,
)

while True:
for _ in range(120):
person_count: int = sync_execute(GET_PERSON_COUNT_FOR_TEAM, {"team_id": team_id})[0][0]
person_distinct_id_count: int = sync_execute(GET_PERSON_DISTINCT_ID2_COUNT_FOR_TEAM, {"team_id": team_id})[
0
Expand All @@ -393,6 +398,8 @@ def _sleep_until_person_data_in_clickhouse(self, team_id: int):
f"Persons: {persons_progress}. Person distinct IDs: {person_distinct_ids_progress}."
)
sleep(0.5)
else:
raise TimeoutError("Person data did not land in ClickHouse in time.")

@classmethod
def _is_demo_data_pre_saved(cls) -> bool:
Expand Down
8 changes: 6 additions & 2 deletions posthog/kafka_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def _sasl_params():
class _KafkaProducer:
def __init__(
self,
test=settings.TEST,
test=False,
# the default producer uses these defaulted environment variables,
# but the session recording producer needs to override them
kafka_base64_keys=None,
Expand All @@ -115,6 +115,8 @@ def __init__(
max_request_size=None,
compression_type=None,
):
if settings.TEST:
test = True # Set at runtime so that overriden settings.TEST is supported
if kafka_security_protocol is None:
kafka_security_protocol = settings.KAFKA_SECURITY_PROTOCOL
if kafka_hosts is None:
Expand Down Expand Up @@ -219,10 +221,12 @@ def build_kafka_consumer(
topic: Optional[str],
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
auto_offset_reset="latest",
test=settings.TEST,
test=False,
group_id=None,
consumer_timeout_ms=float("inf"),
):
if settings.TEST:
test = True # Set at runtime so that overriden settings.TEST is supported
if test:
consumer = KafkaConsumerForTests(
topic=topic,
Expand Down
1 change: 1 addition & 0 deletions posthog/kafka_client/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from posthog.kafka_client.client import _KafkaProducer, build_kafka_consumer


@override_settings(TEST=False)
class KafkaClientTestCase(TestCase):
def setUp(self):
self.topic = "test_topic"
Expand Down

0 comments on commit d1f7db9

Please sign in to comment.