Skip to content

Commit

Permalink
chore: yeet CH recordings ingestion (#17572)
Browse files Browse the repository at this point in the history
Removing ClickHouse based recordings

One big yeet for a man, a great yeet for humanity
  • Loading branch information
pauldambra authored Oct 11, 2023
1 parent 5e0eb33 commit 31c1cdf
Show file tree
Hide file tree
Showing 57 changed files with 1,099 additions and 4,989 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from posthog.constants import INSIGHT_FUNNELS
from posthog.models import Cohort, Filter
from posthog.models.person import Person
from posthog.session_recordings.test.test_factory import create_session_recording_events
from posthog.session_recordings.queries.test.session_replay_sql import produce_replay_summary
from posthog.tasks.calculate_cohort import insert_cohort_from_insight_filter
from posthog.test.base import (
APIBaseTest,
Expand Down Expand Up @@ -273,13 +273,13 @@ def test_funnel_correlation_on_event_with_recordings(self):
event_uuid="21111111-1111-1111-1111-111111111111",
)

create_session_recording_events(
self.team.pk,
datetime(2021, 1, 2, 0, 0, 0),
"user_1",
"s2",
use_recording_table=False,
use_replay_table=True,
timestamp = datetime(2021, 1, 2, 0, 0, 0)
produce_replay_summary(
team_id=self.team.pk,
session_id="s2",
distinct_id="user_1",
first_timestamp=timestamp,
last_timestamp=timestamp,
)

# Success filter
Expand Down Expand Up @@ -371,13 +371,13 @@ def test_funnel_correlation_on_properties_with_recordings(self):
event_uuid="21111111-1111-1111-1111-111111111111",
)

create_session_recording_events(
self.team.pk,
datetime(2021, 1, 2, 0, 0, 0),
"user_1",
"s2",
use_recording_table=False,
use_replay_table=True,
timestamp = datetime(2021, 1, 2, 0, 0, 0)
produce_replay_summary(
team_id=self.team.pk,
session_id="s2",
distinct_id="user_1",
first_timestamp=timestamp,
last_timestamp=timestamp,
)

# Success filter
Expand Down Expand Up @@ -444,13 +444,13 @@ def test_strict_funnel_correlation_with_recordings(self):
properties={"$session_id": "s2", "$window_id": "w2"},
event_uuid="41111111-1111-1111-1111-111111111111",
)
create_session_recording_events(
self.team.pk,
datetime(2021, 1, 2, 0, 0, 0),
"user_1",
"s2",
use_recording_table=False,
use_replay_table=True,
timestamp = datetime(2021, 1, 2, 0, 0, 0)
produce_replay_summary(
team_id=self.team.pk,
session_id="s2",
distinct_id="user_1",
first_timestamp=timestamp,
last_timestamp=timestamp,
)

# Second user with strict funnel drop off, but completed the step events for a normal funnel
Expand Down Expand Up @@ -479,13 +479,13 @@ def test_strict_funnel_correlation_with_recordings(self):
properties={"$session_id": "s3", "$window_id": "w2"},
event_uuid="71111111-1111-1111-1111-111111111111",
)
create_session_recording_events(
self.team.pk,
datetime(2021, 1, 2, 0, 0, 0),
"user_2",
"s3",
use_recording_table=False,
use_replay_table=True,
timestamp1 = datetime(2021, 1, 2, 0, 0, 0)
produce_replay_summary(
team_id=self.team.pk,
session_id="s3",
distinct_id="user_2",
first_timestamp=timestamp1,
last_timestamp=timestamp1,
)

# Success filter
Expand Down
64 changes: 30 additions & 34 deletions ee/clickhouse/queries/test/test_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from posthog.models.instance_setting import override_instance_config
from posthog.queries.paths import Paths, PathsActors
from posthog.queries.paths.paths_event_query import PathEventQuery
from posthog.session_recordings.test.test_factory import create_session_recording_events
from posthog.session_recordings.queries.test.session_replay_sql import produce_replay_summary
from posthog.test.base import (
APIBaseTest,
ClickhouseTestMixin,
Expand Down Expand Up @@ -3181,23 +3181,21 @@ def test_recording(self):
event_uuid="41111111-1111-1111-1111-111111111111",
),
]
create_session_recording_events(
self.team.pk,
timezone.now(),
"p1",
"s1",
window_id="w1",
use_recording_table=False,
use_replay_table=True,
)
create_session_recording_events(
self.team.pk,
timezone.now(),
"p1",
"s3",
window_id="w3",
use_recording_table=False,
use_replay_table=True,
timestamp = timezone.now()
produce_replay_summary(
team_id=self.team.pk,
session_id="s1",
distinct_id="p1",
first_timestamp=timestamp,
last_timestamp=timestamp,
)
timestamp1 = timezone.now()
produce_replay_summary(
team_id=self.team.pk,
session_id="s3",
distinct_id="p1",
first_timestamp=timestamp1,
last_timestamp=timestamp1,
)

# User with path matches, but no recordings
Expand Down Expand Up @@ -3328,14 +3326,13 @@ def test_recording_with_start_and_end(self):
event_uuid="31111111-1111-1111-1111-111111111111",
),

create_session_recording_events(
self.team.pk,
timezone.now(),
"p1",
"s1",
window_id="w1",
use_recording_table=False,
use_replay_table=True,
timestamp = timezone.now()
produce_replay_summary(
team_id=self.team.pk,
session_id="s1",
distinct_id="p1",
first_timestamp=timestamp,
last_timestamp=timestamp,
)

filter = PathFilter(
Expand Down Expand Up @@ -3400,14 +3397,13 @@ def test_recording_for_dropoff(self):
event_uuid="31111111-1111-1111-1111-111111111111",
),

create_session_recording_events(
self.team.pk,
timezone.now(),
"p1",
"s1",
window_id="w1",
use_recording_table=False,
use_replay_table=True,
timestamp = timezone.now()
produce_replay_summary(
team_id=self.team.pk,
session_id="s1",
distinct_id="p1",
first_timestamp=timestamp,
last_timestamp=timestamp,
)

# No matching events for dropoff
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results
'
/* user_id:125 celery:posthog.celery.sync_insight_caching_state */
/* user_id:128 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
Expand Down
64 changes: 15 additions & 49 deletions ee/session_recordings/session_recording_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
from sentry_sdk import capture_exception, capture_message

from posthog import settings
from posthog.event_usage import report_team_action
from posthog.session_recordings.models.metadata import PersistedRecordingV1
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.session_recording_helpers import compress_to_string, decompress
from posthog.session_recordings.session_recording_helpers import decompress
from posthog.storage import object_storage

logger = structlog.get_logger(__name__)
Expand Down Expand Up @@ -55,13 +54,15 @@ def save_recording_with_new_content(recording: SessionRecording, content: str) -
return new_path


class InvalidRecordingForPersisting(Exception):
pass


def persist_recording(recording_id: str, team_id: int) -> None:
"""Persist a recording to the S3"""

logger.info("Persisting recording: init", recording_id=recording_id, team_id=team_id)

start_time = timezone.now()

if not settings.OBJECT_STORAGE_ENABLED:
return

Expand Down Expand Up @@ -91,10 +92,10 @@ def persist_recording(recording_id: str, team_id: int) -> None:
recording.save()
return

target_prefix = recording.build_object_storage_path("2023-08-01")
source_prefix = recording.build_blob_ingestion_storage_path()
# if snapshots are already in blob storage, then we can just copy the files between buckets
with SNAPSHOT_PERSIST_TIME_HISTOGRAM.labels(source="S3").time():
target_prefix = recording.build_object_storage_path("2023-08-01")
source_prefix = recording.build_blob_ingestion_storage_path()
copied_count = object_storage.copy_objects(source_prefix, target_prefix)

if copied_count > 0:
Expand All @@ -104,49 +105,14 @@ def persist_recording(recording_id: str, team_id: int) -> None:
logger.info("Persisting recording: done!", recording_id=recording_id, team_id=team_id, source="s3")
return
else:
# TODO this can be removed when we're happy with the new storage version
with SNAPSHOT_PERSIST_TIME_HISTOGRAM.labels(source="ClickHouse").time():
recording.load_snapshots(100_000) # TODO: Paginate rather than hardcode a limit

content: PersistedRecordingV1 = {
"version": "2022-12-22",
"distinct_id": recording.distinct_id,
"snapshot_data_by_window_id": recording.snapshot_data_by_window_id,
}

string_content = json.dumps(content, default=str)
string_content = compress_to_string(string_content)

logger.info("Persisting recording: writing to S3...", recording_id=recording_id, team_id=team_id)

try:
object_path = recording.build_object_storage_path("2022-12-22")
object_storage.write(object_path, string_content.encode("utf-8"))
recording.object_storage_path = object_path
recording.save()

report_team_action(
recording.team,
"session recording persisted",
{"total_time_ms": (timezone.now() - start_time).total_seconds() * 1000},
)

logger.info(
"Persisting recording: done!", recording_id=recording_id, team_id=team_id, source="ClickHouse"
)
except object_storage.ObjectStorageError as ose:
capture_exception(ose)
report_team_action(
recording.team,
"session recording persist failed",
{"total_time_ms": (timezone.now() - start_time).total_seconds() * 1000, "error": str(ose)},
)
logger.error(
"session_recording.object-storage-error",
recording_id=recording.session_id,
exception=ose,
exc_info=True,
)
logger.error(
"No snapshots found to copy in S3 when persisting a recording",
recording_id=recording_id,
team_id=team_id,
target_prefix=target_prefix,
source_prefix=source_prefix,
)
raise InvalidRecordingForPersisting("Could not persist recording: " + recording_id)


def load_persisted_recording(recording: SessionRecording) -> Optional[PersistedRecordingV1]:
Expand Down
Loading

0 comments on commit 31c1cdf

Please sign in to comment.