Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: yeet CH recordings ingestion #17572

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
14c7942
feat: step 1 - remove recordings-ingestion capability from plugin server
pauldambra Sep 21, 2023
c715168
fix
pauldambra Sep 21, 2023
7d3baef
wat
pauldambra Sep 21, 2023
8fdb8bb
wat
pauldambra Sep 21, 2023
42ffeda
wat
pauldambra Sep 21, 2023
22e23c3
fi
pauldambra Sep 21, 2023
c761a2c
fy
pauldambra Sep 21, 2023
911e4b7
fo
pauldambra Sep 21, 2023
1e92dd2
fum
pauldambra Sep 21, 2023
a80784c
i
pauldambra Sep 21, 2023
8b309ab
smell
pauldambra Sep 21, 2023
7737077
the
pauldambra Sep 21, 2023
4ec0c9d
blood
pauldambra Sep 21, 2023
b2ce960
of
pauldambra Sep 21, 2023
c69487f
an
pauldambra Sep 21, 2023
92004c9
english
pauldambra Sep 21, 2023
1f6bad1
man
pauldambra Sep 21, 2023
9a4b1ad
be
pauldambra Sep 21, 2023
95e594f
Update query snapshots
github-actions[bot] Sep 21, 2023
36b59c8
he
pauldambra Sep 22, 2023
2fe9fd5
alive
pauldambra Sep 22, 2023
ef34e52
a little front end deleting
pauldambra Sep 22, 2023
93da94a
a little front end deleting
pauldambra Sep 22, 2023
4b7f773
fix one test
pauldambra Sep 22, 2023
b0ecb4a
fix tests
pauldambra Sep 22, 2023
3a9f62c
fix
pauldambra Sep 23, 2023
1ceb815
fi
pauldambra Sep 26, 2023
a58ad16
Update UI snapshots for `chromium` (1)
github-actions[bot] Sep 26, 2023
e9ff16b
Update UI snapshots for `chromium` (1)
github-actions[bot] Sep 26, 2023
1b64f72
Merge branch 'master' into feat/remove-legacy-ingestion-capability-fr…
pauldambra Sep 27, 2023
ffd20cc
first pass remove v1 snapshots
pauldambra Sep 27, 2023
f25a807
delete stuff
pauldambra Sep 27, 2023
bd316ed
remove recording debug info from UI
pauldambra Sep 27, 2023
3ef80d4
even more
pauldambra Sep 27, 2023
83663ec
Merge branch 'master' into feat/remove-legacy-ingestion-capability-fr…
pauldambra Sep 28, 2023
07b76de
better error when persisting
pauldambra Sep 28, 2023
d6e9cda
Merge branch 'master' into feat/remove-legacy-ingestion-capability-fr…
pauldambra Oct 3, 2023
b3cb010
Update query snapshots
github-actions[bot] Oct 3, 2023
706ef70
Merge branch 'master' into feat/remove-legacy-ingestion-capability-fr…
pauldambra Oct 4, 2023
b79dba4
there is a voided promise as part of flush when empty, let's wait to …
pauldambra Oct 4, 2023
7f54b72
Merge branch 'master' into feat/remove-legacy-ingestion-capability-fr…
pauldambra Oct 5, 2023
14f10fb
Merge branch 'master' into feat/remove-legacy-ingestion-capability-fr…
pauldambra Oct 11, 2023
dae426b
fix
pauldambra Oct 11, 2023
27a166e
fix
pauldambra Oct 11, 2023
392cbdc
fix
pauldambra Oct 11, 2023
26a7640
fix
pauldambra Oct 11, 2023
b1e1fe7
snapshot
pauldambra Oct 11, 2023
c6d541e
Merge branch 'master' into feat/remove-legacy-ingestion-capability-fr…
pauldambra Oct 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from posthog.constants import INSIGHT_FUNNELS
from posthog.models import Cohort, Filter
from posthog.models.person import Person
from posthog.session_recordings.test.test_factory import create_session_recording_events
from posthog.session_recordings.queries.test.session_replay_sql import produce_replay_summary
from posthog.tasks.calculate_cohort import insert_cohort_from_insight_filter
from posthog.test.base import (
APIBaseTest,
Expand Down Expand Up @@ -273,13 +273,13 @@ def test_funnel_correlation_on_event_with_recordings(self):
event_uuid="21111111-1111-1111-1111-111111111111",
)

create_session_recording_events(
self.team.pk,
datetime(2021, 1, 2, 0, 0, 0),
"user_1",
"s2",
use_recording_table=False,
use_replay_table=True,
timestamp = datetime(2021, 1, 2, 0, 0, 0)
produce_replay_summary(
team_id=self.team.pk,
session_id="s2",
distinct_id="user_1",
first_timestamp=timestamp,
last_timestamp=timestamp,
)

# Success filter
Expand Down Expand Up @@ -371,13 +371,13 @@ def test_funnel_correlation_on_properties_with_recordings(self):
event_uuid="21111111-1111-1111-1111-111111111111",
)

create_session_recording_events(
self.team.pk,
datetime(2021, 1, 2, 0, 0, 0),
"user_1",
"s2",
use_recording_table=False,
use_replay_table=True,
timestamp = datetime(2021, 1, 2, 0, 0, 0)
produce_replay_summary(
team_id=self.team.pk,
session_id="s2",
distinct_id="user_1",
first_timestamp=timestamp,
last_timestamp=timestamp,
)

# Success filter
Expand Down Expand Up @@ -444,13 +444,13 @@ def test_strict_funnel_correlation_with_recordings(self):
properties={"$session_id": "s2", "$window_id": "w2"},
event_uuid="41111111-1111-1111-1111-111111111111",
)
create_session_recording_events(
self.team.pk,
datetime(2021, 1, 2, 0, 0, 0),
"user_1",
"s2",
use_recording_table=False,
use_replay_table=True,
timestamp = datetime(2021, 1, 2, 0, 0, 0)
produce_replay_summary(
team_id=self.team.pk,
session_id="s2",
distinct_id="user_1",
first_timestamp=timestamp,
last_timestamp=timestamp,
)

# Second user with strict funnel drop off, but completed the step events for a normal funnel
Expand Down Expand Up @@ -479,13 +479,13 @@ def test_strict_funnel_correlation_with_recordings(self):
properties={"$session_id": "s3", "$window_id": "w2"},
event_uuid="71111111-1111-1111-1111-111111111111",
)
create_session_recording_events(
self.team.pk,
datetime(2021, 1, 2, 0, 0, 0),
"user_2",
"s3",
use_recording_table=False,
use_replay_table=True,
timestamp1 = datetime(2021, 1, 2, 0, 0, 0)
produce_replay_summary(
team_id=self.team.pk,
session_id="s3",
distinct_id="user_2",
first_timestamp=timestamp1,
last_timestamp=timestamp1,
)

# Success filter
Expand Down
64 changes: 30 additions & 34 deletions ee/clickhouse/queries/test/test_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from posthog.models.instance_setting import override_instance_config
from posthog.queries.paths import Paths, PathsActors
from posthog.queries.paths.paths_event_query import PathEventQuery
from posthog.session_recordings.test.test_factory import create_session_recording_events
from posthog.session_recordings.queries.test.session_replay_sql import produce_replay_summary
from posthog.test.base import (
APIBaseTest,
ClickhouseTestMixin,
Expand Down Expand Up @@ -3181,23 +3181,21 @@ def test_recording(self):
event_uuid="41111111-1111-1111-1111-111111111111",
),
]
create_session_recording_events(
self.team.pk,
timezone.now(),
"p1",
"s1",
window_id="w1",
use_recording_table=False,
use_replay_table=True,
)
create_session_recording_events(
self.team.pk,
timezone.now(),
"p1",
"s3",
window_id="w3",
use_recording_table=False,
use_replay_table=True,
timestamp = timezone.now()
produce_replay_summary(
team_id=self.team.pk,
session_id="s1",
distinct_id="p1",
first_timestamp=timestamp,
last_timestamp=timestamp,
)
timestamp1 = timezone.now()
produce_replay_summary(
team_id=self.team.pk,
session_id="s3",
distinct_id="p1",
first_timestamp=timestamp1,
last_timestamp=timestamp1,
)

# User with path matches, but no recordings
Expand Down Expand Up @@ -3328,14 +3326,13 @@ def test_recording_with_start_and_end(self):
event_uuid="31111111-1111-1111-1111-111111111111",
),

create_session_recording_events(
self.team.pk,
timezone.now(),
"p1",
"s1",
window_id="w1",
use_recording_table=False,
use_replay_table=True,
timestamp = timezone.now()
produce_replay_summary(
team_id=self.team.pk,
session_id="s1",
distinct_id="p1",
first_timestamp=timestamp,
last_timestamp=timestamp,
)

filter = PathFilter(
Expand Down Expand Up @@ -3400,14 +3397,13 @@ def test_recording_for_dropoff(self):
event_uuid="31111111-1111-1111-1111-111111111111",
),

create_session_recording_events(
self.team.pk,
timezone.now(),
"p1",
"s1",
window_id="w1",
use_recording_table=False,
use_replay_table=True,
timestamp = timezone.now()
produce_replay_summary(
team_id=self.team.pk,
session_id="s1",
distinct_id="p1",
first_timestamp=timestamp,
last_timestamp=timestamp,
)

# No matching events for dropoff
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results
'
/* user_id:125 celery:posthog.celery.sync_insight_caching_state */
/* user_id:128 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
Expand Down
64 changes: 15 additions & 49 deletions ee/session_recordings/session_recording_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
from sentry_sdk import capture_exception, capture_message

from posthog import settings
from posthog.event_usage import report_team_action
from posthog.session_recordings.models.metadata import PersistedRecordingV1
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.session_recording_helpers import compress_to_string, decompress
from posthog.session_recordings.session_recording_helpers import decompress
from posthog.storage import object_storage

logger = structlog.get_logger(__name__)
Expand Down Expand Up @@ -55,13 +54,15 @@ def save_recording_with_new_content(recording: SessionRecording, content: str) -
return new_path


class InvalidRecordingForPersisting(Exception):
pass


def persist_recording(recording_id: str, team_id: int) -> None:
"""Persist a recording to the S3"""

logger.info("Persisting recording: init", recording_id=recording_id, team_id=team_id)

start_time = timezone.now()

if not settings.OBJECT_STORAGE_ENABLED:
return

Expand Down Expand Up @@ -91,10 +92,10 @@ def persist_recording(recording_id: str, team_id: int) -> None:
recording.save()
return

target_prefix = recording.build_object_storage_path("2023-08-01")
source_prefix = recording.build_blob_ingestion_storage_path()
# if snapshots are already in blob storage, then we can just copy the files between buckets
with SNAPSHOT_PERSIST_TIME_HISTOGRAM.labels(source="S3").time():
target_prefix = recording.build_object_storage_path("2023-08-01")
source_prefix = recording.build_blob_ingestion_storage_path()
copied_count = object_storage.copy_objects(source_prefix, target_prefix)

if copied_count > 0:
Expand All @@ -104,49 +105,14 @@ def persist_recording(recording_id: str, team_id: int) -> None:
logger.info("Persisting recording: done!", recording_id=recording_id, team_id=team_id, source="s3")
return
else:
# TODO this can be removed when we're happy with the new storage version
with SNAPSHOT_PERSIST_TIME_HISTOGRAM.labels(source="ClickHouse").time():
recording.load_snapshots(100_000) # TODO: Paginate rather than hardcode a limit

content: PersistedRecordingV1 = {
"version": "2022-12-22",
"distinct_id": recording.distinct_id,
"snapshot_data_by_window_id": recording.snapshot_data_by_window_id,
}

string_content = json.dumps(content, default=str)
string_content = compress_to_string(string_content)

logger.info("Persisting recording: writing to S3...", recording_id=recording_id, team_id=team_id)

try:
object_path = recording.build_object_storage_path("2022-12-22")
object_storage.write(object_path, string_content.encode("utf-8"))
recording.object_storage_path = object_path
recording.save()

report_team_action(
recording.team,
"session recording persisted",
{"total_time_ms": (timezone.now() - start_time).total_seconds() * 1000},
)

logger.info(
"Persisting recording: done!", recording_id=recording_id, team_id=team_id, source="ClickHouse"
)
except object_storage.ObjectStorageError as ose:
capture_exception(ose)
report_team_action(
recording.team,
"session recording persist failed",
{"total_time_ms": (timezone.now() - start_time).total_seconds() * 1000, "error": str(ose)},
)
logger.error(
"session_recording.object-storage-error",
recording_id=recording.session_id,
exception=ose,
exc_info=True,
)
logger.error(
"No snapshots found to copy in S3 when persisting a recording",
recording_id=recording_id,
team_id=team_id,
target_prefix=target_prefix,
source_prefix=source_prefix,
)
raise InvalidRecordingForPersisting("Could not persist recording: " + recording_id)


def load_persisted_recording(recording: SessionRecording) -> Optional[PersistedRecordingV1]:
Expand Down
Loading