diff --git a/ee/hogai/eval/conftest.py b/ee/hogai/eval/conftest.py
new file mode 100644
index 0000000000000..d0bc75348eeac
--- /dev/null
+++ b/ee/hogai/eval/conftest.py
@@ -0,0 +1,28 @@
+import pytest
+
+from posthog.test.base import run_clickhouse_statement_in_parallel
+
+
+@pytest.fixture(scope="module", autouse=True)
+def setup_kafka_tables(django_db_setup):
+    from posthog.clickhouse.client import sync_execute
+    from posthog.clickhouse.schema import (
+        CREATE_KAFKA_TABLE_QUERIES,
+        build_query,
+    )
+    from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE
+
+    kafka_queries = list(map(build_query, CREATE_KAFKA_TABLE_QUERIES))
+    run_clickhouse_statement_in_parallel(kafka_queries)
+
+    yield
+
+    kafka_tables = sync_execute(
+        f"""
+        SELECT name
+        FROM system.tables
+        WHERE database = '{CLICKHOUSE_DATABASE}' AND name LIKE 'kafka_%'
+        """,
+    )
+    kafka_truncate_queries = [f"DROP TABLE {table[0]} ON CLUSTER '{CLICKHOUSE_CLUSTER}'" for table in kafka_tables]
+    run_clickhouse_statement_in_parallel(kafka_truncate_queries)
diff --git a/ee/hogai/eval/test_eval_funnel_planner.py b/ee/hogai/eval/tests/test_eval_funnel_planner.py
similarity index 100%
rename from ee/hogai/eval/test_eval_funnel_planner.py
rename to ee/hogai/eval/tests/test_eval_funnel_planner.py
diff --git a/ee/hogai/eval/test_eval_router.py b/ee/hogai/eval/tests/test_eval_router.py
similarity index 100%
rename from ee/hogai/eval/test_eval_router.py
rename to ee/hogai/eval/tests/test_eval_router.py
diff --git a/ee/hogai/eval/test_eval_trends_planner.py b/ee/hogai/eval/tests/test_eval_trends_planner.py
similarity index 100%
rename from ee/hogai/eval/test_eval_trends_planner.py
rename to ee/hogai/eval/tests/test_eval_trends_planner.py
diff --git a/ee/hogai/eval/utils.py b/ee/hogai/eval/utils.py
index 473b47fe17a84..1e50a75daefa2 100644
--- a/ee/hogai/eval/utils.py
+++ b/ee/hogai/eval/utils.py
@@ -1,7 +1,7 @@
-import datetime as dt
 import os
 
 import pytest
+from django.test import override_settings
 from flaky import flaky
 
 from posthog.demo.matrix.manager import MatrixManager
@@ -17,12 +17,14 @@ def setUpTestData(cls):
         super().setUpTestData()
         matrix = HedgeboxMatrix(
             seed="b1ef3c66-5f43-488a-98be-6b46d92fbcef",  # this seed generates all events
-            now=dt.datetime.now(dt.UTC) - dt.timedelta(days=25),
-            days_past=60,
+            days_past=120,
             days_future=30,
-            n_clusters=60,
+            n_clusters=500,
             group_type_index_offset=0,
         )
         matrix_manager = MatrixManager(matrix, print_steps=True)
         existing_user = cls.team.organization.members.first()
-        matrix_manager.run_on_team(cls.team, existing_user)
+        with override_settings(TEST=False):
+            # Simulation saving should occur in non-test mode, so that Kafka isn't mocked. Normally in tests we don't
+            # want to ingest via Kafka, but simulation saving is specifically designed to use that route for speed
+            matrix_manager.run_on_team(cls.team, existing_user)
diff --git a/ee/hogai/taxonomy_agent/test/test_toolkit.py b/ee/hogai/taxonomy_agent/test/test_toolkit.py
index 078452225db28..32967d09165cb 100644
--- a/ee/hogai/taxonomy_agent/test/test_toolkit.py
+++ b/ee/hogai/taxonomy_agent/test/test_toolkit.py
@@ -10,7 +10,7 @@
 from posthog.test.base import APIBaseTest, ClickhouseTestMixin, _create_event, _create_person
 
 
-class TestToolkit(TaxonomyAgentToolkit):
+class DummyToolkit(TaxonomyAgentToolkit):
     def _get_tools(self) -> list[ToolkitTool]:
         return self._default_tools
 
@@ -69,7 +69,7 @@ def _create_taxonomy(self):
         )
 
     def test_retrieve_entity_properties(self):
-        toolkit = TestToolkit(self.team)
+        toolkit = DummyToolkit(self.team)
         PropertyDefinition.objects.create(
             team=self.team, type=PropertyDefinition.Type.PERSON, name="test", property_type="String"
         )
@@ -100,14 +100,14 @@
         )
 
     def test_retrieve_entity_properties_returns_descriptive_feedback_without_properties(self):
-        toolkit = TestToolkit(self.team)
+        toolkit = DummyToolkit(self.team)
         self.assertEqual(
             toolkit.retrieve_entity_properties("person"),
             "Properties do not exist in the taxonomy for the entity person.",
         )
 
     def test_retrieve_entity_property_values(self):
-        toolkit = TestToolkit(self.team)
+        toolkit = DummyToolkit(self.team)
         self.assertEqual(
             toolkit.retrieve_entity_property_values("session", "$session_duration"),
             "30, 146, 2 and many more distinct values.",
         )
@@ -148,7 +148,7 @@ def test_retrieve_entity_property_values(self):
             "5, 4, 3, 2, 1 and 1 more distinct value.",
         )
 
-        toolkit = TestToolkit(self.team)
+        toolkit = DummyToolkit(self.team)
         GroupTypeMapping.objects.create(
             team=self.team, project_id=self.team.project_id, group_type_index=0, group_type="proj"
         )
@@ -192,18 +192,18 @@ def test_group_names(self):
         GroupTypeMapping.objects.create(
             team=self.team, project_id=self.team.project_id, group_type_index=1, group_type="org"
         )
-        toolkit = TestToolkit(self.team)
+        toolkit = DummyToolkit(self.team)
         self.assertEqual(toolkit._entity_names, ["person", "session", "proj", "org"])
 
     def test_retrieve_event_properties_returns_descriptive_feedback_without_properties(self):
-        toolkit = TestToolkit(self.team)
+        toolkit = DummyToolkit(self.team)
         self.assertEqual(
             toolkit.retrieve_event_properties("pageview"),
             "Properties do not exist in the taxonomy for the event pageview.",
         )
 
     def test_empty_events(self):
-        toolkit = TestToolkit(self.team)
+        toolkit = DummyToolkit(self.team)
         self.assertEqual(
             toolkit.retrieve_event_properties("test"), "Properties do not exist in the taxonomy for the event test."
         )
@@ -220,7 +220,7 @@ def test_empty_events(self):
             team=self.team,
         )
 
-        toolkit = TestToolkit(self.team)
+        toolkit = DummyToolkit(self.team)
         self.assertEqual(
             toolkit.retrieve_event_properties("event1"),
             "Properties do not exist in the taxonomy for the event event1.",
         )
 
     def test_retrieve_event_properties(self):
         self._create_taxonomy()
-        toolkit = TestToolkit(self.team)
+        toolkit = DummyToolkit(self.team)
         prompt = toolkit.retrieve_event_properties("event1")
 
         self.assertIn(
@@ -250,7 +250,7 @@ def test_retrieve_event_properties(self):
 
     def test_retrieve_event_property_values(self):
         self._create_taxonomy()
-        toolkit = TestToolkit(self.team)
+        toolkit = DummyToolkit(self.team)
 
         self.assertIn('"Chrome"', toolkit.retrieve_event_property_values("event1", "$browser"))
         self.assertIn('"Firefox"', toolkit.retrieve_event_property_values("event1", "$browser"))
@@ -264,7 +264,7 @@ def test_retrieve_event_property_values(self):
         )
 
     def test_enrich_props_with_descriptions(self):
-        toolkit = TestToolkit(self.team)
+        toolkit = DummyToolkit(self.team)
         res = toolkit._enrich_props_with_descriptions("event", [("$geoip_city_name", "String")])
         self.assertEqual(len(res), 1)
         prop, type, description = res[0]
diff --git a/posthog/conftest.py b/posthog/conftest.py
index 938ae3028bfeb..ba0ab82e3a3b7 100644
--- a/posthog/conftest.py
+++ b/posthog/conftest.py
@@ -1,5 +1,3 @@
-from typing import Any
-
 import pytest
 from django.conf import settings
 from infi.clickhouse_orm import Database
@@ -22,14 +20,19 @@ def create_clickhouse_tables(num_tables: int):
         build_query,
     )
 
-    # REMEMBER TO ADD ANY NEW CLICKHOUSE TABLES TO THIS ARRAY!
-    CREATE_TABLE_QUERIES: tuple[Any, ...] = CREATE_MERGETREE_TABLE_QUERIES + CREATE_DISTRIBUTED_TABLE_QUERIES
+    total_tables = (
+        len(CREATE_MERGETREE_TABLE_QUERIES)
+        + len(CREATE_DISTRIBUTED_TABLE_QUERIES)
+        + len(CREATE_MV_TABLE_QUERIES)
+        + len(CREATE_VIEW_QUERIES)
+        + len(CREATE_DICTIONARY_QUERIES)
+    )
 
-    # Check if all the tables have already been created
-    if num_tables == len(CREATE_TABLE_QUERIES):
+    # Check if all the tables have already been created. Views, materialized views, and dictionaries also count
+    if num_tables == total_tables:
         return
 
-    table_queries = list(map(build_query, CREATE_TABLE_QUERIES))
+    table_queries = list(map(build_query, CREATE_MERGETREE_TABLE_QUERIES + CREATE_DISTRIBUTED_TABLE_QUERIES))
     run_clickhouse_statement_in_parallel(table_queries)
 
     mv_queries = list(map(build_query, CREATE_MV_TABLE_QUERIES))
@@ -38,12 +41,12 @@ def create_clickhouse_tables(num_tables: int):
     view_queries = list(map(build_query, CREATE_VIEW_QUERIES))
     run_clickhouse_statement_in_parallel(view_queries)
 
-    data_queries = list(map(build_query, CREATE_DATA_QUERIES))
-    run_clickhouse_statement_in_parallel(data_queries)
-
     dictionary_queries = list(map(build_query, CREATE_DICTIONARY_QUERIES))
     run_clickhouse_statement_in_parallel(dictionary_queries)
 
+    data_queries = list(map(build_query, CREATE_DATA_QUERIES))
+    run_clickhouse_statement_in_parallel(data_queries)
+
 
 def reset_clickhouse_tables():
     # Truncate clickhouse tables to default before running test
@@ -58,6 +61,7 @@ def reset_clickhouse_tables():
     from posthog.models.app_metrics.sql import TRUNCATE_APP_METRICS_TABLE_SQL
     from posthog.models.channel_type.sql import TRUNCATE_CHANNEL_DEFINITION_TABLE_SQL
     from posthog.models.cohort.sql import TRUNCATE_COHORTPEOPLE_TABLE_SQL
+    from posthog.models.error_tracking.sql import TRUNCATE_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES_TABLE_SQL
     from posthog.models.event.sql import TRUNCATE_EVENTS_TABLE_SQL
     from posthog.models.group.sql import TRUNCATE_GROUPS_TABLE_SQL
     from posthog.models.performance.sql import TRUNCATE_PERFORMANCE_EVENTS_TABLE_SQL
@@ -68,7 +72,6 @@ def reset_clickhouse_tables():
         TRUNCATE_PERSON_STATIC_COHORT_TABLE_SQL,
         TRUNCATE_PERSON_TABLE_SQL,
     )
-    from posthog.models.error_tracking.sql import TRUNCATE_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES_TABLE_SQL
     from posthog.models.sessions.sql import TRUNCATE_SESSIONS_TABLE_SQL
     from posthog.session_recordings.sql.session_recording_event_sql import (
         TRUNCATE_SESSION_RECORDING_EVENTS_TABLE_SQL,
diff --git a/posthog/demo/matrix/manager.py b/posthog/demo/matrix/manager.py
index 8c69ef06a8d24..0abc17f32ca08 100644
--- a/posthog/demo/matrix/manager.py
+++ b/posthog/demo/matrix/manager.py
@@ -112,8 +112,6 @@ def create_team(organization: Organization, **kwargs) -> Team:
         return team
 
     def run_on_team(self, team: Team, user: User):
-        if self.print_steps:
-            print(f"Saving simulated data...")
         does_clickhouse_data_need_saving = True
         if self.use_pre_save:
             does_clickhouse_data_need_saving = not self._is_demo_data_pre_saved()
@@ -122,14 +120,20 @@ def run_on_team(self, team: Team, user: User):
             source_team = team
         if does_clickhouse_data_need_saving:
             if self.matrix.is_complete is None:
+                if self.print_steps:
+                    print(f"Simulating data...")
                 self.matrix.simulate()
+            if self.print_steps:
+                print(f"Saving simulated data...")
             self._save_analytics_data(source_team)
         if self.use_pre_save:
+            if self.print_steps:
+                print(f"Copying simulated data from master team...")
             self._copy_analytics_data_from_master_team(team)
             self._sync_postgres_with_clickhouse_data(source_team.pk, team.pk)
         self.matrix.set_project_up(team, user)
         if self.print_steps:
-            print(f"Inferring taxonomy for Data Management...")
+            print(f"Inferring taxonomy for data management...")
         event_definition_count, property_definition_count, event_properties_count = infer_taxonomy_for_team(team.pk)
         if self.print_steps:
             print(
@@ -139,6 +143,7 @@ def run_on_team(self, team: Team, user: User):
             cohort.calculate_people_ch(pending_version=0)
         team.project.save()
         team.save()
+        print(f"Demo data ready for team ID {team.pk}.")
 
     def _save_analytics_data(self, data_team: Team):
         sim_persons = self.matrix.people
@@ -370,7 +375,7 @@ def _sleep_until_person_data_in_clickhouse(self, team_id: int):
             GET_PERSON_DISTINCT_ID2_COUNT_FOR_TEAM,
         )
 
-        while True:
+        for _ in range(120):
             person_count: int = sync_execute(GET_PERSON_COUNT_FOR_TEAM, {"team_id": team_id})[0][0]
             person_distinct_id_count: int = sync_execute(GET_PERSON_DISTINCT_ID2_COUNT_FOR_TEAM, {"team_id": team_id})[
                 0
@@ -393,6 +398,8 @@ def _sleep_until_person_data_in_clickhouse(self, team_id: int):
                 f"Persons: {persons_progress}. Person distinct IDs: {person_distinct_ids_progress}."
             )
             sleep(0.5)
+        else:
+            raise TimeoutError("Person data did not land in ClickHouse in time.")
 
     @classmethod
     def _is_demo_data_pre_saved(cls) -> bool:
diff --git a/posthog/kafka_client/client.py b/posthog/kafka_client/client.py
index b4bc9f1f74c88..021677ad86ed2 100644
--- a/posthog/kafka_client/client.py
+++ b/posthog/kafka_client/client.py
@@ -106,7 +106,7 @@ def _sasl_params():
 class _KafkaProducer:
     def __init__(
         self,
-        test=settings.TEST,
+        test=False,
         # the default producer uses these defaulted environment variables,
         # but the session recording producer needs to override them
         kafka_base64_keys=None,
@@ -115,6 +115,8 @@ def __init__(
         max_request_size=None,
         compression_type=None,
     ):
+        if settings.TEST:
+            test = True  # Set at runtime so that overridden settings.TEST is supported
         if kafka_security_protocol is None:
             kafka_security_protocol = settings.KAFKA_SECURITY_PROTOCOL
         if kafka_hosts is None:
@@ -219,10 +221,12 @@ def build_kafka_consumer(
     topic: Optional[str],
     value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="latest",
-    test=settings.TEST,
+    test=False,
     group_id=None,
     consumer_timeout_ms=float("inf"),
 ):
+    if settings.TEST:
+        test = True  # Set at runtime so that overridden settings.TEST is supported
     if test:
         consumer = KafkaConsumerForTests(
             topic=topic,
diff --git a/posthog/kafka_client/test/test_client.py b/posthog/kafka_client/test/test_client.py
index a4891e0d73403..8ae920a25b2b7 100644
--- a/posthog/kafka_client/test/test_client.py
+++ b/posthog/kafka_client/test/test_client.py
@@ -6,6 +6,7 @@
 from posthog.kafka_client.client import _KafkaProducer, build_kafka_consumer
 
 
+@override_settings(TEST=False)
 class KafkaClientTestCase(TestCase):
     def setUp(self):
         self.topic = "test_topic"