fix: LTS playback issues #17498

Closed · wants to merge 21 commits
33 changes: 30 additions & 3 deletions ee/session_recordings/session_recording_extensions.py
@@ -1,8 +1,8 @@
# EE extended functions for SessionRecording model

import gzip
import json
from datetime import timedelta
from typing import Optional
from datetime import timedelta, datetime
from typing import Optional, cast

import structlog
from django.utils import timezone
@@ -27,6 +27,33 @@
MINIMUM_AGE_FOR_RECORDING = timedelta(hours=24)


def save_recording_with_new_content(recording: SessionRecording, content: str) -> None:

if not settings.OBJECT_STORAGE_ENABLED:
return

logger.info(
"re-saving recording file into 2023-08-01 LTS storage format",
recording_id=recording.session_id,
team_id=recording.team_id,
)

target_prefix = recording.build_object_storage_path("2023-08-01")

start = int(cast(datetime, recording.start_time).timestamp() * 1000)
end = int(cast(datetime, recording.end_time).timestamp() * 1000)
new_path = f"{target_prefix}/{start}-{end}"

zipped_content = gzip.compress(content.encode("utf-8"))
object_storage.write(
new_path, zipped_content, extras={"ContentType": "application/json", "ContentEncoding": "gzip"}
)

recording.storage_version = "2023-08-01"
recording.object_storage_path = target_prefix
recording.save()


def persist_recording(recording_id: str, team_id: int) -> None:
"""Persist a recording to the S3"""

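For reference, a minimal sketch of what `save_recording_with_new_content` writes. The prefix below is hypothetical, standing in for `build_object_storage_path("2023-08-01")`, which is not shown in this diff; only the `{start_ms}-{end_ms}` key suffix and the gzipped write are taken from the function above:

```python
# Minimal sketch (not from the PR) of the 2023-08-01 blob key layout.
import gzip
from datetime import datetime, timezone

start_time = datetime(2023, 8, 1, 12, 0, 0, tzinfo=timezone.utc)
end_time = datetime(2023, 8, 1, 12, 5, 0, tzinfo=timezone.utc)

start = int(start_time.timestamp() * 1000)  # 1690891200000
end = int(end_time.timestamp() * 1000)  # 1690891500000

# hypothetical prefix; the real one comes from build_object_storage_path("2023-08-01")
target_prefix = "session_recordings/team_id/1/session_id/abc123/2023-08-01"
new_path = f"{target_prefix}/{start}-{end}"
# -> session_recordings/team_id/1/session_id/abc123/2023-08-01/1690891200000-1690891500000

# the body is stored gzipped, and ContentEncoding: gzip lets clients
# decompress transparently on read
zipped_content = gzip.compress('{"window_id":"w1","data":[]}'.encode("utf-8"))
```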
14 changes: 5 additions & 9 deletions frontend/src/lib/api.ts
@@ -61,9 +61,9 @@ import { ActivityLogItem, ActivityScope } from 'lib/components/ActivityLog/human
import { ActivityLogProps } from 'lib/components/ActivityLog/ActivityLog'
import { SavedSessionRecordingPlaylistsResult } from 'scenes/session-recordings/saved-playlists/savedSessionRecordingPlaylistsLogic'
import { QuerySchema } from '~/queries/schema'
import { decompressSync, strFromU8 } from 'fflate'
import { getCurrentExporterData } from '~/exporter/exporterViewLogic'
import { encodeParams } from 'kea-router'
import { captureException } from '@sentry/react'

export const ACTIVITY_PAGE_SIZE = 20

@@ -1246,16 +1246,12 @@ const api = {

try {
const textLines = await response.text()

if (textLines) {
return textLines.split('\n')
}
return textLines.split('\n')
} catch (e) {
// Must be gzipped
// we no longer support any response that requires "manual" decompressing in the browser
captureException(e, { extra: { response, blobKey }, tags: { sessionRecordingId: recordingId } })
throw e
}

const contentBuffer = new Uint8Array(await response.arrayBuffer())
return strFromU8(decompressSync(contentBuffer)).trim().split('\n')
},

async updateRecording(
@@ -44,7 +44,11 @@ const parseEncodedSnapshots = (items: (EncodedRecordingSnapshot | string)[]): Re
...d,
}))
} catch (e) {
captureException(e)
if (typeof l === 'string' && l.trim().length) {
// we don't care about the empty string in Sentry
captureException(e, { extra: { line: l } })
}
// return the empty array so that bad lines don't stop us processing good ones
return []
}
})
1 change: 0 additions & 1 deletion frontend/src/types.ts
@@ -684,7 +684,6 @@ export interface SessionPlayerSnapshotData {
snapshots?: RecordingSnapshot[]
sources?: SessionRecordingSnapshotSource[]
next?: string
blob_keys?: string[]
}

export interface SessionPlayerData {
126 changes: 22 additions & 104 deletions posthog/session_recordings/session_recording_api.py
@@ -1,16 +1,14 @@
from datetime import datetime, timedelta

import json
from typing import Any, List, Type, cast

import posthoganalytics
from dateutil import parser
import requests
from django.contrib.auth.models import AnonymousUser
from django.db.models import Count, Prefetch
from django.http import JsonResponse, HttpResponse
from django.http import JsonResponse
from drf_spectacular.utils import extend_schema
from loginas.utils import is_impersonated_session
from prometheus_client import Counter
from rest_framework import exceptions, request, serializers, viewsets, status
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
@@ -25,24 +23,26 @@
from posthog.models import Filter, User
from posthog.models.filters.session_recordings_filter import SessionRecordingsFilter
from posthog.models.person.person import PersonDistinctId
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.permissions import (
ProjectMembershipNecessaryPermissions,
SharingTokenPermission,
TeamMemberAccessPermission,
)
from posthog.rate_limit import ClickHouseBurstRateThrottle, ClickHouseSustainedRateThrottle
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.models.session_recording_event import SessionRecordingViewed

from posthog.session_recordings.queries.session_recording_list_from_replay_summary import (
SessionRecordingListFromReplaySummary,
SessionIdEventsQuery,
)
from posthog.session_recordings.queries.session_recording_properties import SessionRecordingProperties
from posthog.rate_limit import ClickHouseBurstRateThrottle, ClickHouseSustainedRateThrottle
from posthog.session_recordings.realtime_snapshots import get_realtime_snapshots
from posthog.storage import object_storage
from posthog.session_recordings.snapshots.load_snapshots import (
gather_snapshot_sources,
SnapshotLoadingContext,
load_snapshots_for,
)
from posthog.session_recordings.snapshots.serializer import SessionRecordingSnapshotsSerializer
from posthog.utils import format_query_params_absolute_url
from prometheus_client import Counter

DEFAULT_RECORDING_CHUNK_LIMIT = 20 # Should be tuned to find the best value

@@ -136,18 +136,6 @@ def to_representation(self, instance):
}


class SessionRecordingSnapshotsSourceSerializer(serializers.Serializer):
source = serializers.CharField() # type: ignore
start_timestamp = serializers.DateTimeField(allow_null=True)
end_timestamp = serializers.DateTimeField(allow_null=True)
blob_key = serializers.CharField(allow_null=True)


class SessionRecordingSnapshotsSerializer(serializers.Serializer):
sources = serializers.ListField(child=SessionRecordingSnapshotsSourceSerializer(), required=False)
snapshots = serializers.ListField(required=False)


class SessionRecordingViewSet(StructuredViewSetMixin, viewsets.GenericViewSet):
permission_classes = [IsAuthenticated, ProjectMembershipNecessaryPermissions, TeamMemberAccessPermission]
throttle_classes = [ClickHouseBurstRateThrottle, ClickHouseSustainedRateThrottle]
@@ -252,7 +240,6 @@ def _snapshots_v2(self, request: request.Request):
"""

recording = self.get_object()
response_data = {}
source = request.GET.get("source")

event_properties = {
@@ -272,89 +259,20 @@
SNAPSHOT_SOURCE_REQUESTED.labels(source=source).inc()

if not source:
Member Author commented:
I couldn't cope with everything going on here 😅

Tried to separate the arrangement and the work...

sources: List[dict] = []
blob_prefix = recording.build_blob_ingestion_storage_path()
blob_keys = object_storage.list_objects(blob_prefix)

if not blob_keys and recording.storage_version == "2023-08-01":
blob_prefix = recording.object_storage_path
blob_keys = object_storage.list_objects(cast(str, blob_prefix))

if blob_keys:
for full_key in blob_keys:
# Keys are like 1619712000-1619712060
blob_key = full_key.replace(blob_prefix.rstrip("/") + "/", "")
time_range = [datetime.fromtimestamp(int(x) / 1000) for x in blob_key.split("-")]

sources.append(
{
"source": "blob",
"start_timestamp": time_range[0],
"end_timestamp": time_range.pop(),
"blob_key": blob_key,
}
)

might_have_realtime = True
newest_timestamp = None

if sources:
sources = sorted(sources, key=lambda x: x["start_timestamp"])
oldest_timestamp = min(sources, key=lambda k: k["start_timestamp"])["start_timestamp"]
newest_timestamp = min(sources, key=lambda k: k["end_timestamp"])["end_timestamp"]

might_have_realtime = oldest_timestamp + timedelta(hours=24) > datetime.utcnow()

if might_have_realtime:
sources.append(
{
"source": "realtime",
"start_timestamp": newest_timestamp,
"end_timestamp": None,
}
)

response_data["sources"] = sources

elif source == "realtime":
snapshots = get_realtime_snapshots(team_id=self.team.pk, session_id=str(recording.session_id)) or []

event_properties["source"] = "realtime"
event_properties["snapshots_length"] = len(snapshots)
posthoganalytics.capture(
self._distinct_id_from_request(request), "session recording snapshots v2 loaded", event_properties
)

response_data["snapshots"] = snapshots

elif source == "blob":
blob_key = request.GET.get("blob_key", "")
if not blob_key:
raise exceptions.ValidationError("Must provide a snapshot file blob key")

# very short-lived pre-signed URL
file_key = f"session_recordings/team_id/{self.team.pk}/session_id/{recording.session_id}/data/{blob_key}"
url = object_storage.get_presigned_url(file_key, expiration=60)
if not url:
raise exceptions.NotFound("Snapshot file not found")

event_properties["source"] = "blob"
event_properties["blob_key"] = blob_key
posthoganalytics.capture(
self._distinct_id_from_request(request), "session recording snapshots v2 loaded", event_properties
)

with requests.get(url=url, stream=True) as r:
r.raise_for_status()
response = HttpResponse(content=r.raw, content_type="application/json")
response["Content-Disposition"] = "inline"
return response
response_data = gather_snapshot_sources(recording)
serializer = SessionRecordingSnapshotsSerializer(response_data)
return Response(serializer.data)
else:
raise exceptions.ValidationError("Invalid source must be one of [realtime, blob]")

serializer = SessionRecordingSnapshotsSerializer(response_data)
loader_context = SnapshotLoadingContext(
team_id=self.team.pk,
blob_key=request.GET.get("blob_key"),
recording=recording,
distinct_id=self._distinct_id_from_request(request),
event_properties=event_properties,
source=source,
)

return Response(serializer.data)
return load_snapshots_for(loader_context)

@action(methods=["GET"], detail=True)
def snapshots(self, request: request.Request, **kwargs):
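`gather_snapshot_sources`, `SnapshotLoadingContext`, and `load_snapshots_for` are imported from the new `posthog/session_recordings/snapshots/load_snapshots.py` module, whose contents are not part of this diff view. As a rough sketch of the source-listing logic that moved out of the view, based on the removed code above (function name and details here are illustrative, not from the PR):

```python
# Rough sketch, assuming behaviour mirrors the removed view code above.
from datetime import datetime, timedelta
from typing import List


def sources_from_blob_keys(blob_prefix: str, blob_keys: List[str]) -> List[dict]:
    """Turn raw object-storage keys into playback 'sources', oldest first."""
    sources = []
    for full_key in blob_keys:
        # keys look like "1690891200000-1690891500000": start/end in ms since epoch
        blob_key = full_key.replace(blob_prefix.rstrip("/") + "/", "")
        start_ms, end_ms = (int(x) for x in blob_key.split("-"))
        sources.append(
            {
                "source": "blob",
                "start_timestamp": datetime.fromtimestamp(start_ms / 1000),
                "end_timestamp": datetime.fromtimestamp(end_ms / 1000),
                "blob_key": blob_key,
            }
        )

    sources.sort(key=lambda s: s["start_timestamp"])

    # recordings younger than 24h may still be ingesting, so offer realtime too
    if sources and sources[0]["start_timestamp"] + timedelta(hours=24) > datetime.utcnow():
        sources.append(
            {
                "source": "realtime",
                "start_timestamp": sources[-1]["end_timestamp"],
                "end_timestamp": None,
            }
        )
    return sources
```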
73 changes: 73 additions & 0 deletions posthog/session_recordings/snapshots/convert_legacy_format.py
@@ -0,0 +1,73 @@
import json
from typing import Dict

import structlog
from django.http import HttpResponse
from prometheus_client import Histogram
from requests import Response

from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.session_recording_helpers import decompress
from posthog.storage import object_storage

logger = structlog.get_logger(__name__)

RECORDING_CONVERSION_TIME_HISTOGRAM = Histogram(
"recording_conversion_time_seconds",
"We convert legacy recordings from LTS format to the latest format, how long does that take?",
)


def _save_converted_content_back_to_storage(converted_content: str, recording: SessionRecording) -> None:
try:
from ee.session_recordings.session_recording_extensions import save_recording_with_new_content

save_recording_with_new_content(recording, converted_content)
except ImportError:
# not running in EE context... shouldn't get here
logger.error("attempted_to_save_converted_content_back_to_storage_in_non_ee_context", recording_id=recording.id)
return


def convert_original_version_lts_recording(r: Response, recording: SessionRecording, url: str) -> HttpResponse:
# the original version of the LTS recording was a single file
# its contents were gzipped and then base64 encoded.
# we can't simply stream it back to the requester

with RECORDING_CONVERSION_TIME_HISTOGRAM.time():
converted_content = _prepare_legacy_content(r.text)

original_path = recording.object_storage_path
_save_converted_content_back_to_storage(converted_content, recording)
# TODO we should delete the old recording from storage here, but might not have permissions
object_storage.tag(str(original_path), {"converted": "true"})

return HttpResponse(content=(converted_content.encode("utf-8")), content_type="application/json")


def _prepare_legacy_content(content: str) -> str:
# historically we stored the recording as a single file with a base64 encoded gzipped json string
# using utf-16 encoding, this `decompress` method unwinds that back to a json string
decoded_content = decompress(content)
json_content = json.loads(decoded_content)
return _convert_legacy_format_from_lts_storage(json_content)


def _convert_legacy_format_from_lts_storage(lts_formatted_data: Dict) -> str:
"""
The latest version is JSONL formatted data.
Each line is json containing a window_id and a data array.
This is equivalent to the LTS format snapshot_data_by_window_id property dumped as a single line.
"""
if "snapshot_data_by_window_id" not in lts_formatted_data:
raise ValueError("Invalid LTS format: missing snapshot_data_by_window_id")

if "version" not in lts_formatted_data or lts_formatted_data["version"] != "2022-12-22":
raise ValueError(f"Invalid LTS format: version is {lts_formatted_data.get('version', 'missing')}")

snapshot_data_by_window_id = lts_formatted_data["snapshot_data_by_window_id"]
converted = ""
for window_id, data in snapshot_data_by_window_id.items():
converted += json.dumps({"window_id": window_id, "data": data}, separators=(",", ":")) + "\n"

return converted.rstrip("\n")
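A quick usage sketch of the converter above, with toy snapshot data (the values are invented for illustration):

```python
# Toy input in the 2022-12-22 LTS format; the output is one JSONL line per window.
lts_data = {
    "version": "2022-12-22",
    "snapshot_data_by_window_id": {
        "window-1": [{"type": 4, "timestamp": 1690891200000}],
        "window-2": [{"type": 3, "timestamp": 1690891201000}],
    },
}

print(_convert_legacy_format_from_lts_storage(lts_data))
# {"window_id":"window-1","data":[{"type":4,"timestamp":1690891200000}]}
# {"window_id":"window-2","data":[{"type":3,"timestamp":1690891201000}]}
```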