
Commit

be
pauldambra committed Sep 21, 2023
1 parent 1f6bad1 commit 9a4b1ad
Showing 1 changed file with 37 additions and 61 deletions.
98 changes: 37 additions & 61 deletions posthog/api/test/test_capture.py
@@ -37,7 +37,6 @@
 from posthog.kafka_client.client import KafkaProducer, sessionRecordingKafkaProducer
 from posthog.kafka_client.topics import (
     KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
-    KAFKA_SESSION_RECORDING_EVENTS,
     KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
 )
 from posthog.settings import (
@@ -1178,9 +1177,9 @@ def test_cors_allows_tracing_headers(self, _: str, path: str, headers: List[str]
     def test_legacy_recording_ingestion_data_sent_to_kafka(self, kafka_produce) -> None:
         session_id = "some_session_id"
         self._send_session_recording_event(session_id=session_id)
-        self.assertEqual(kafka_produce.call_count, 2)
+        self.assertEqual(kafka_produce.call_count, 1)
         kafka_topic_used = kafka_produce.call_args_list[0][1]["topic"]
-        self.assertEqual(kafka_topic_used, KAFKA_SESSION_RECORDING_EVENTS)
+        self.assertEqual(kafka_topic_used, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS)
         key = kafka_produce.call_args_list[0][1]["key"]
         self.assertEqual(key, session_id)

@@ -1205,48 +1204,28 @@ def test_legacy_recording_ingestion_compression_and_transformation(self, kafka_p
             window_id=window_id,
             event_data=event_data,
         )
-        self.assertEqual(kafka_produce.call_count, 2)
-        self.assertEqual(kafka_produce.call_args_list[0][1]["topic"], KAFKA_SESSION_RECORDING_EVENTS)
+        self.assertEqual(kafka_produce.call_count, 1)
+        self.assertEqual(kafka_produce.call_args_list[0][1]["topic"], KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS)
         key = kafka_produce.call_args_list[0][1]["key"]
         self.assertEqual(key, session_id)
         data_sent_to_kafka = json.loads(kafka_produce.call_args_list[0][1]["data"]["data"])

-        self.assertEqual(
-            data_sent_to_kafka,
-            {
-                "event": "$snapshot",
-                "properties": {
-                    "$snapshot_data": {
-                        "chunk_count": 1,
-                        "chunk_id": "fake-uuid",
-                        "chunk_index": 0,
-                        "data": "H4sIAIB3mGAC/42NSwqAQAxD31Fk1m4GUUavIi78ggtR/CxEvLoaPweQQJu2SXoeKRuGmZWBWizBw+GrGipyXfJve+smehZGyh/aRtr+mw2FbqP6LryOmZZOOdPj6/T/1VoiQuWGD4sFq8kRyJlxAaIGxIyyAAAA",
-                        "compression": "gzip-base64",
-                        "has_full_snapshot": False,
-                        "events_summary": [
-                            {
-                                "type": snapshot_type,
-                                "data": {"source": snapshot_source},
-                                "timestamp": timestamp,
-                            }
-                        ],
-                    },
-                    "$session_id": session_id,
-                    "$window_id": window_id,
-                    "distinct_id": distinct_id,
-                },
-                "offset": 1993,
-            },
-        )
+        assert data_sent_to_kafka == {
+            "event": "$snapshot_items",
+            "properties": {
+                "$snapshot_items": [
+                    {
+                        "type": snapshot_type,
+                        "timestamp": timestamp,
+                        "data": {"data": event_data, "source": snapshot_source},
+                    }
+                ],
+                "$session_id": session_id,
+                "$window_id": window_id,
+                "distinct_id": distinct_id,
+            },
+            "offset": 1993,
+        }

-    @patch("posthog.kafka_client.client._KafkaProducer.produce")
-    def test_legacy_recording_ingestion_large_is_split_into_multiple_messages(self, kafka_produce) -> None:
-        self._send_session_recording_event(event_data=large_data_array)
-        topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])
-
-        assert topic_counter == Counter(
-            {KAFKA_SESSION_RECORDING_EVENTS: 3, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1}
-        )
-
     @patch("posthog.kafka_client.client._KafkaProducer.produce")
     def test_recording_ingestion_can_write_to_blob_ingestion_topic_with_usual_size_limit(self, kafka_produce) -> None:
@@ -1256,10 +1235,7 @@ def test_recording_ingestion_can_write_to_blob_ingestion_topic_with_usual_size_l
         self._send_session_recording_event(event_data=large_data_array)
         topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])

-        # this fake data doesn't split, so we send one huge message to the item events topic
-        assert topic_counter == Counter(
-            {KAFKA_SESSION_RECORDING_EVENTS: 3, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1}
-        )
+        assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1})

     @patch("posthog.kafka_client.client._KafkaProducer.produce")
     def test_recording_ingestion_can_write_to_blob_ingestion_topic(self, kafka_produce) -> None:
@@ -1269,9 +1245,7 @@ def test_recording_ingestion_can_write_to_blob_ingestion_topic(self, kafka_produ
         self._send_session_recording_event(event_data=large_data_array)
         topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])

-        assert topic_counter == Counter(
-            {KAFKA_SESSION_RECORDING_EVENTS: 3, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1}
-        )
+        assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1})

     @patch("posthog.kafka_client.client.SessionRecordingKafkaProducer")
     def test_create_session_recording_kafka_with_expected_hosts(
@@ -1320,20 +1294,15 @@ def test_can_redirect_session_recordings_to_alternative_kafka(
data = "example"
session_id = "test_can_redirect_session_recordings_to_alternative_kafka"
self._send_session_recording_event(event_data=data, session_id=session_id)
default_kafka_producer_mock.assert_called()
# session events don't get routed through the default kafka producer
default_kafka_producer_mock.assert_not_called()
session_recording_producer_factory_mock.assert_called()

assert len(kafka_produce.call_args_list) == 2
assert len(kafka_produce.call_args_list) == 1

call_one = kafka_produce.call_args_list[0][1]
assert call_one["key"] == session_id
data_sent_to_default_kafka = json.loads(call_one["data"]["data"])
assert data_sent_to_default_kafka["event"] == "$snapshot"
assert data_sent_to_default_kafka["properties"]["$snapshot_data"]["chunk_count"] == 1

call_two = kafka_produce.call_args_list[1][1]
assert call_two["key"] == session_id
data_sent_to_recording_kafka = json.loads(call_two["data"]["data"])
data_sent_to_recording_kafka = json.loads(call_one["data"]["data"])
assert data_sent_to_recording_kafka["event"] == "$snapshot_items"
assert len(data_sent_to_recording_kafka["properties"]["$snapshot_items"]) == 1

@@ -1390,11 +1359,11 @@ def test_quota_limits_ignored_if_disabled(self, kafka_produce) -> None:
         replace_limited_team_tokens(QuotaResource.RECORDINGS, {self.team.api_token: timezone.now().timestamp() + 10000})
         replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() + 10000})
         self._send_session_recording_event()
-        self.assertEqual(kafka_produce.call_count, 2)
+        self.assertEqual(kafka_produce.call_count, 1)

     @patch("posthog.kafka_client.client._KafkaProducer.produce")
     @pytest.mark.ee
-    def test_quota_limits(self, kafka_produce) -> None:
+    def test_quota_limits(self, kafka_produce: MagicMock) -> None:
         from ee.billing.quota_limiting import QuotaResource, replace_limited_team_tokens

         def _produce_events():
@@ -1415,11 +1384,18 @@ def _produce_events():

         with self.settings(QUOTA_LIMITING_ENABLED=True):
             _produce_events()
-            self.assertEqual(kafka_produce.call_count, 4)
+            self.assertEqual(
+                [c[1]["topic"] for c in kafka_produce.call_args_list],
+                [
+                    "session_recording_snapshot_item_events_test",
+                    "events_plugin_ingestion_test",
+                    "events_plugin_ingestion_test",
+                ],
+            )

             replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() + 10000})
             _produce_events()
-            self.assertEqual(kafka_produce.call_count, 2)  # Only the recording event
+            self.assertEqual(kafka_produce.call_count, 1)  # Only the recording event

             replace_limited_team_tokens(
                 QuotaResource.RECORDINGS, {self.team.api_token: timezone.now().timestamp() + 10000}
@@ -1432,7 +1408,7 @@ def _produce_events():
             )
             replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() - 10000})
             _produce_events()
-            self.assertEqual(kafka_produce.call_count, 4)  # All events as limit-until timestamp is in the past
+            self.assertEqual(kafka_produce.call_count, 3)  # All events as limit-until timestamp is in the past

     @patch("posthog.kafka_client.client._KafkaProducer.produce")
     def test_capture_historical_analytics_events(self, kafka_produce) -> None:
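Note on the shape being tested (illustrative, not part of this commit): after this change each captured recording event is expected to produce a single Kafka message on the snapshot-item topic, keyed by the session id and carrying a "$snapshot_items" payload. The sketch below reconstructs that payload shape from the assertions above; the build_snapshot_items_message helper and its placeholder values are hypothetical, invented here for illustration.

# Illustrative sketch only -- not code from this commit. It mirrors the
# "$snapshot_items" payload shape asserted in the tests above; the helper
# name and the placeholder values are hypothetical.
import json
from typing import Any, Dict


def build_snapshot_items_message(
    distinct_id: str,
    session_id: str,
    window_id: str,
    snapshot_type: int,
    snapshot_source: int,
    event_data: str,
    timestamp: int,
) -> Dict[str, Any]:
    # One "$snapshot_items" event per capture call, carrying the raw snapshot
    # items rather than the old chunked "$snapshot_data" envelope.
    return {
        "event": "$snapshot_items",
        "properties": {
            "$snapshot_items": [
                {
                    "type": snapshot_type,
                    "timestamp": timestamp,
                    "data": {"data": event_data, "source": snapshot_source},
                }
            ],
            "$session_id": session_id,
            "$window_id": window_id,
            "distinct_id": distinct_id,
        },
    }


if __name__ == "__main__":
    message = build_snapshot_items_message(
        distinct_id="some_distinct_id",
        session_id="some_session_id",
        window_id="some_window_id",
        snapshot_type=3,
        snapshot_source=1,
        event_data="{}",
        timestamp=1_700_000_000_000,
    )
    # The tests above also check the produce call's "key": messages are keyed
    # by session_id so one session's snapshots land on a single partition.
    print(json.dumps(message, indent=2))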
