diff --git a/ee/session_recordings/session_recording_extensions.py b/ee/session_recordings/session_recording_extensions.py index f5d48fb8b285f..cb67977b0c3ba 100644 --- a/ee/session_recordings/session_recording_extensions.py +++ b/ee/session_recordings/session_recording_extensions.py @@ -1,8 +1,8 @@ # EE extended functions for SessionRecording model - +import gzip import json -from datetime import timedelta -from typing import Optional +from datetime import timedelta, datetime +from typing import Optional, cast import structlog from django.utils import timezone @@ -26,6 +26,34 @@ MINIMUM_AGE_FOR_RECORDING = timedelta(hours=24) +# TODO rename this... +def save_recording_with_new_content(recording: SessionRecording, content: str) -> str: + if not settings.OBJECT_STORAGE_ENABLED: + return "" + + logger.info( + "re-saving recording file into 2023-08-01 LTS storage format", + recording_id=recording.session_id, + team_id=recording.team_id, + ) + + target_prefix = recording.build_object_storage_path("2023-08-01") + + start = int(cast(datetime, recording.start_time).timestamp() * 1000) + end = int(cast(datetime, recording.end_time).timestamp() * 1000) + new_path = f"{target_prefix}/{start}-{end}" + + zipped_content = gzip.compress(content.encode("utf-8")) + object_storage.write( + new_path, zipped_content, extras={"ContentType": "application/json", "ContentEncoding": "gzip"} + ) + + recording.storage_version = "2023-08-01" + recording.object_storage_path = target_prefix + recording.save() + + return new_path + def persist_recording(recording_id: str, team_id: int) -> None: """Persist a recording to the S3""" diff --git a/ee/session_recordings/test/test_session_recording_extensions.py b/ee/session_recordings/test/test_session_recording_extensions.py index e201e71a02563..86b5b5ba2134d 100644 --- a/ee/session_recordings/test/test_session_recording_extensions.py +++ b/ee/session_recordings/test/test_session_recording_extensions.py @@ -1,13 +1,18 @@ +import gzip from datetime import timedelta, datetime, timezone from secrets import token_urlsafe -from unittest.mock import patch +from unittest.mock import patch, MagicMock from uuid import uuid4 from boto3 import resource from botocore.config import Config from freezegun import freeze_time -from ee.session_recordings.session_recording_extensions import load_persisted_recording, persist_recording +from ee.session_recordings.session_recording_extensions import ( + load_persisted_recording, + persist_recording, + save_recording_with_new_content, +) from posthog.session_recordings.models.session_recording import SessionRecording from posthog.session_recordings.models.session_recording_playlist import SessionRecordingPlaylist from posthog.session_recordings.models.session_recording_playlist_item import SessionRecordingPlaylistItem @@ -232,3 +237,37 @@ def test_persist_tracks_correct_to_posthog(self, mock_capture): "total_time_ms", ]: assert mock_capture.call_args_list[0][0][2][x] > 0 + + @patch("ee.session_recordings.session_recording_extensions.object_storage.write") + def test_can_save_content_to_new_location(self, mock_write: MagicMock): + with self.settings(OBJECT_STORAGE_SESSION_RECORDING_BLOB_INGESTION_FOLDER=TEST_BUCKET): + session_id = f"{uuid4()}" + + recording = SessionRecording.objects.create( + team=self.team, + session_id=session_id, + start_time=datetime.fromtimestamp(12345), + end_time=datetime.fromtimestamp(12346), + object_storage_path="some_starting_value", + # None, but that would trigger the persistence behavior, and we don't want that + storage_version="None", + ) + + new_key = save_recording_with_new_content(recording, "the new content") + + recording.refresh_from_db() + + expected_path = f"session_recordings_lts/team_id/{self.team.pk}/session_id/{recording.session_id}/data" + assert new_key == f"{expected_path}/12345000-12346000" + + assert recording.object_storage_path == expected_path + assert recording.storage_version == "2023-08-01" + + mock_write.assert_called_with( + f"{expected_path}/12345000-12346000", + gzip.compress("the new content".encode("utf-8")), + extras={ + "ContentEncoding": "gzip", + "ContentType": "application/json", + }, + ) diff --git a/posthog/session_recordings/session_recording_api.py b/posthog/session_recordings/session_recording_api.py index ceae99a6d4d77..cf3505c556752 100644 --- a/posthog/session_recordings/session_recording_api.py +++ b/posthog/session_recordings/session_recording_api.py @@ -40,6 +40,7 @@ from posthog.session_recordings.queries.session_recording_properties import SessionRecordingProperties from posthog.rate_limit import ClickHouseBurstRateThrottle, ClickHouseSustainedRateThrottle from posthog.session_recordings.realtime_snapshots import get_realtime_snapshots +from posthog.session_recordings.snapshots.convert_legacy_snapshots import convert_original_version_lts_recording from posthog.storage import object_storage from posthog.utils import format_query_params_absolute_url from prometheus_client import Counter @@ -351,7 +352,8 @@ def _snapshots_v2(self, request: request.Request): if recording.storage_version == "2023-08-01": file_key = f"{recording.object_storage_path}/{blob_key}" else: - file_key = recording.object_storage_path + # this is a legacy recording, we need to load the file from the old path + file_key = convert_original_version_lts_recording(recording) else: file_key = ( f"session_recordings/team_id/{self.team.pk}/session_id/{recording.session_id}/data/{blob_key}" diff --git a/posthog/session_recordings/snapshots/convert_legacy_snapshots.py b/posthog/session_recordings/snapshots/convert_legacy_snapshots.py new file mode 100644 index 0000000000000..a60e1b74717e0 --- /dev/null +++ b/posthog/session_recordings/snapshots/convert_legacy_snapshots.py @@ -0,0 +1,82 @@ +import json +from typing import Dict + +import structlog +from prometheus_client import Histogram + +from posthog.session_recordings.models.session_recording import SessionRecording +from posthog.session_recordings.session_recording_helpers import decompress +from posthog.storage import object_storage + +logger = structlog.get_logger(__name__) + +RECORDING_CONVERSION_TIME_HISTOGRAM = Histogram( + "recording_conversion_time_seconds", + "We convert legacy recordings from LTS format to the latest format, how long does that take?", +) + + +def _save_converted_content_back_to_storage(converted_content: str, recording: SessionRecording) -> str: + try: + from ee.session_recordings.session_recording_extensions import save_recording_with_new_content + + return save_recording_with_new_content(recording, converted_content) + except ImportError: + # not running in EE context... shouldn't get here + logger.error("attempted_to_save_converted_content_back_to_storage_in_non_ee_context", recording_id=recording.id) + return "" + + +def convert_original_version_lts_recording(recording: SessionRecording) -> str: + # the original version of the LTS recording was a single file + # its contents were gzipped and then base64 encoded. + # we can't simply stream it back to the requester + + with RECORDING_CONVERSION_TIME_HISTOGRAM.time(): + content = object_storage.read(str(recording.object_storage_path)) + if not content: + # TODO and if there's no content is this right? + logger.error( + "attempted_to_convert_original_version_lts_recording_with_no_content", + recording_id=recording.id, + object_storage_path=recording.object_storage_path, + ) + return "" + + converted_content = _prepare_legacy_content(content) + + original_path = recording.object_storage_path + new_file_key = _save_converted_content_back_to_storage(converted_content, recording) + + # TODO we should delete the old recording from storage here, but might not have permissions + object_storage.tag(str(original_path), {"converted": "true"}) + + return new_file_key + + +def _prepare_legacy_content(content: str) -> str: + # historically we stored the recording as a single file with a base64 encoded gzipped json string + # using utf-16 encoding, this `decompress` method unwinds that back to a json string + decoded_content = decompress(content) + json_content = json.loads(decoded_content) + return _convert_legacy_format_from_lts_storage(json_content) + + +def _convert_legacy_format_from_lts_storage(lts_formatted_data: Dict) -> str: + """ + The latest version is JSONL formatted data. + Each line is json containing a window_id and a data array. + This is equivalent to the LTS format snapshot_data_by_window_id property dumped as a single line. + """ + if "snapshot_data_by_window_id" not in lts_formatted_data: + raise ValueError("Invalid LTS format: missing snapshot_data_by_window_id") + + if "version" not in lts_formatted_data or lts_formatted_data["version"] != "2022-12-22": + raise ValueError(f"Invalid LTS format: version is {lts_formatted_data.get('version', 'missing')}") + + snapshot_data_by_window_id = lts_formatted_data["snapshot_data_by_window_id"] + converted = "" + for window_id, data in snapshot_data_by_window_id.items(): + converted += json.dumps({"window_id": window_id, "data": data}, separators=(",", ":")) + "\n" + + return converted.rstrip("\n") diff --git a/posthog/session_recordings/test/test_lts_session_recordings.py b/posthog/session_recordings/test/test_lts_session_recordings.py index 7f8c82abe27bb..b98286e045f25 100644 --- a/posthog/session_recordings/test/test_lts_session_recordings.py +++ b/posthog/session_recordings/test/test_lts_session_recordings.py @@ -6,6 +6,10 @@ from posthog.session_recordings.models.session_recording import SessionRecording from posthog.test.base import APIBaseTest, ClickhouseTestMixin, QueryMatchingTest +# this is the urf-16 surrogate pass encoded, gzipped and base64 encoded version of the above +# see: https://github.com/PostHog/posthog/blob/8ff764bb573c6a98368b2ae3503890551a1c3842/posthog/session_recordings/session_recording_helpers.py#L277 +legacy_compressed_original = "H4sIAMHMCWUC/9VWbXPSQBDen+L0k44FkvBS4JPV0rG+1Pex08owKQmQSgGTUMRO/7r67HOXQBFa/dQ6N3fJ3e3tPrv73Ca/fl7KllxIKLEkEslYRpg30T1x0D0piMtR37dkGz2AXCIpxpF08ezgLbCnprKD/kOO5bvsy1DeoaXyTPZw4lBa8lF25UjeygQSLWh8KVWseXIGy8dYPcDskzyXBuSeQtc+pPvWbgJ7PmQSGUBa7QaYp+gdOZU5xhkxBdidLaFSD12pQ5sPn3oYXejvorsYfSnDOwf7PiSq9LOGeQPNQ1xqjEDAnSpmZalwpUb5HiQLa7WrXhejRwwnRJEC5QQ6daVmY2k8yHBOELMpPI7yPMRoM5w5lRK0an4SjEOsPIF+E5kJNMyxNsZz4bPKaGaHVtMMuzH1bhNLjHnXojmhpSLkfSII5YE8RJxTNI14E9ZLjP4E/ibErAzoYjbByTHsFvE25p7mp4+54j3HuWV59WIACyP5irMvEM3P8gH82EO+d3HmjNaqiKeOGvU64vko516RMYiBUH2d57pD6vWx17836Cvki5OjP5LX8grsNrjeA+c3xlotFKHzblG7QFzms4w3E/P2Bn4pX76gt6DT+KA9gAd6AyJyT2fxNR91d4w1s64MnOM9oud657SpVrV7hWZ4GsGf0PpzDixbxFgDL7RG6X3UUVmio55avWuVtXdtQBQ9ezvWx31zfDNtBcx8ViblnSIdYb3Eu5XaiprY/M9Yk1SX8aFCfm/Teoi9PlHoXp3V5m8j4MF35VwDM3dtBLy1ERiRQ2E+Xz7h8ITyRrMZoHob2WRDPXMpPyLCcCmm56w/hkVTVLEhGXmQfzGy2m5uskZwdS+r494NnqWM/+EN1n3mN4a2U+BIc09MpTR1w5wLWSOVf+1r9l2bD+VrxKxorXwDBvWgK7SZyypvz84di29s8+b8A7MXeXXJhrY9aU7E/Ab6/OJ1iFqfC633/6t4ae/En+juGttqlLOoLv8bGRQV/hs5qGAeq6eiaeJtB7WizlyauvaYY5Oj0b+asdt1m++K7hf5V+Zs1B0x/1kNurDae2SscvUqZ1II3mdVa/lu/8/e319O3Z4XveO/AS7WeNOWCwAA" + class TestSessionRecordings(APIBaseTest, ClickhouseTestMixin, QueryMatchingTest): def setUp(self): @@ -162,10 +166,19 @@ def list_objects_func(path: str) -> List[str]: assert response_data == "the file contents" @patch("posthog.session_recordings.session_recording_api.requests.get") + @patch("posthog.session_recordings.session_recording_api.object_storage.tag") + @patch("posthog.session_recordings.session_recording_api.object_storage.write") + @patch("posthog.session_recordings.session_recording_api.object_storage.read") @patch("posthog.session_recordings.session_recording_api.object_storage.get_presigned_url") @patch("posthog.session_recordings.session_recording_api.object_storage.list_objects") def test_original_version_stored_snapshots_can_be_loaded_without_upversion( - self, mock_list_objects: MagicMock, mock_get_presigned_url: MagicMock, mock_requests: MagicMock + self, + mock_list_objects: MagicMock, + mock_get_presigned_url: MagicMock, + mock_read: MagicMock, + mock_write: MagicMock, + mock_tag: MagicMock, + mock_requests: MagicMock, ) -> None: session_id = str(uuid.uuid4()) lts_storage_path = "purposefully/not/what/we/would/calculate/to/prove/this/is/used" @@ -175,6 +188,7 @@ def list_objects_func(path: str) -> List[str]: mock_list_objects.side_effect = list_objects_func mock_get_presigned_url.return_value = "https://example.com" + mock_read.return_value = legacy_compressed_original mock_response = Mock() mock_response.raise_for_status.return_value = None @@ -184,29 +198,44 @@ def list_objects_func(path: str) -> List[str]: mock_requests.return_value.__enter__.return_value = mock_response mock_requests.return_value.__exit__.return_value = None - SessionRecording.objects.create( + recording = SessionRecording.objects.create( team=self.team, session_id=session_id, # to avoid auto-persistence kicking in when this is None storage_version="not a know version", object_storage_path=lts_storage_path, + start_time="1970-01-01T00:00:00.001000Z", + end_time="1970-01-01T00:00:00.002000Z", ) + # something in the setup is triggering a path that saves the recording without the provided path so + recording.object_storage_path = lts_storage_path + recording.save() query_parameters = [ "source=blob", "version=2", f"blob_key={lts_storage_path}", ] + + mock_write.reset_mock() # reset the mock to remove setup calls response = self.client.get( f"/api/projects/{self.team.id}/session_recordings/{session_id}/snapshots?{'&'.join(query_parameters)}" ) - response_data = response.content.decode("utf-8") assert mock_list_objects.call_args_list == [] + expected_path = f"session_recordings_lts/team_id/{self.team.pk}/session_id/{session_id}/data/1-2" + + # the content was saved to the new location + assert mock_write.call_args_list[0][0][0] == expected_path + # the original location was tagged + assert mock_tag.call_args_list == [call(lts_storage_path, {"converted": "true"})] + + # the original saved path isn't loaded for reading the content assert mock_get_presigned_url.call_args_list == [ - call(lts_storage_path, expiration=60), + call(expected_path, expiration=60), ] - # TODO this wouldn't actually work as OG version files are stored in a different way + # and the mock content is returned + response_data = response.content.decode("utf-8") assert response_data == "the file contents" diff --git a/posthog/storage/object_storage.py b/posthog/storage/object_storage.py index 187a9c85e2905..79ea0c90ceb19 100644 --- a/posthog/storage/object_storage.py +++ b/posthog/storage/object_storage.py @@ -1,5 +1,5 @@ import abc -from typing import Optional, Union, List +from typing import Optional, Union, List, Dict import structlog from boto3 import client @@ -38,7 +38,11 @@ def read_bytes(self, bucket: str, key: str) -> Optional[bytes]: pass @abc.abstractmethod - def write(self, bucket: str, key: str, content: Union[str, bytes]) -> None: + def tag(self, bucket: str, key: str, tags: Dict[str, str]) -> None: + pass + + @abc.abstractmethod + def write(self, bucket: str, key: str, content: Union[str, bytes], extras: Dict | None) -> None: pass @abc.abstractmethod @@ -65,7 +69,10 @@ def read(self, bucket: str, key: str) -> Optional[str]: def read_bytes(self, bucket: str, key: str) -> Optional[bytes]: pass - def write(self, bucket: str, key: str, content: Union[str, bytes]) -> None: + def tag(self, bucket: str, key: str, tags: Dict[str, str]) -> None: + pass + + def write(self, bucket: str, key: str, content: Union[str, bytes], extras: Dict | None) -> None: pass def copy_objects(self, bucket: str, source_prefix: str, target_prefix: str) -> int | None: @@ -125,10 +132,22 @@ def read_bytes(self, bucket: str, key: str) -> Optional[bytes]: capture_exception(e) raise ObjectStorageError("read failed") from e - def write(self, bucket: str, key: str, content: Union[str, bytes]) -> None: + def tag(self, bucket: str, key: str, tags: Dict[str, str]) -> None: + try: + self.aws_client.put_object_tagging( + Bucket=bucket, + Key=key, + Tagging={"TagSet": [{"Key": k, "Value": v} for k, v in tags.items()]}, + ) + except Exception as e: + logger.error("object_storage.tag_failed", bucket=bucket, file_name=key, error=e) + capture_exception(e) + raise ObjectStorageError("tag failed") from e + + def write(self, bucket: str, key: str, content: Union[str, bytes], extras: Dict | None) -> None: s3_response = {} try: - s3_response = self.aws_client.put_object(Bucket=bucket, Body=content, Key=key) + s3_response = self.aws_client.put_object(Bucket=bucket, Body=content, Key=key, **(extras or {})) except Exception as e: logger.error("object_storage.write_failed", bucket=bucket, file_name=key, error=e, s3_response=s3_response) capture_exception(e) @@ -175,8 +194,14 @@ def object_storage_client() -> ObjectStorageClient: return _client -def write(file_name: str, content: Union[str, bytes]) -> None: - return object_storage_client().write(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name, content=content) +def write(file_name: str, content: Union[str, bytes], extras: Dict | None = None) -> None: + return object_storage_client().write( + bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name, content=content, extras=extras + ) + + +def tag(file_name: str, tags: Dict[str, str]) -> None: + return object_storage_client().tag(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name, tags=tags) def read(file_name: str) -> Optional[str]: