fix: lts loading with in-place conversion (#17561)
* can save new content to a recording

* and in place convert

* correct test assertion
pauldambra authored Sep 21, 2023
1 parent 8552575 commit f72d910
Showing 6 changed files with 223 additions and 18 deletions.
34 changes: 31 additions & 3 deletions ee/session_recordings/session_recording_extensions.py
@@ -1,8 +1,8 @@
# EE extended functions for SessionRecording model

import gzip
import json
from datetime import timedelta
from typing import Optional
from datetime import timedelta, datetime
from typing import Optional, cast

import structlog
from django.utils import timezone
@@ -26,6 +26,34 @@

MINIMUM_AGE_FOR_RECORDING = timedelta(hours=24)

# TODO rename this...
def save_recording_with_new_content(recording: SessionRecording, content: str) -> str:
    if not settings.OBJECT_STORAGE_ENABLED:
        return ""

    logger.info(
        "re-saving recording file into 2023-08-01 LTS storage format",
        recording_id=recording.session_id,
        team_id=recording.team_id,
    )

    target_prefix = recording.build_object_storage_path("2023-08-01")

    start = int(cast(datetime, recording.start_time).timestamp() * 1000)
    end = int(cast(datetime, recording.end_time).timestamp() * 1000)
    new_path = f"{target_prefix}/{start}-{end}"

    zipped_content = gzip.compress(content.encode("utf-8"))
    object_storage.write(
        new_path, zipped_content, extras={"ContentType": "application/json", "ContentEncoding": "gzip"}
    )

    recording.storage_version = "2023-08-01"
    recording.object_storage_path = target_prefix
    recording.save()

    return new_path


def persist_recording(recording_id: str, team_id: int) -> None:
"""Persist a recording to the S3"""
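Aside: the new helper writes the whole recording as one gzipped blob whose key embeds the recording's start and end times as millisecond epochs. A minimal, self-contained sketch of that key scheme (build_lts_key is an illustrative name, not part of this commit):

import gzip
from datetime import datetime, timezone


def build_lts_key(prefix: str, start: datetime, end: datetime) -> str:
    # keys follow "<prefix>/<start_ms>-<end_ms>", matching the function above
    start_ms = int(start.timestamp() * 1000)
    end_ms = int(end.timestamp() * 1000)
    return f"{prefix}/{start_ms}-{end_ms}"


key = build_lts_key(
    "session_recordings_lts/team_id/1/session_id/abc/data",
    datetime.fromtimestamp(12345, tz=timezone.utc),
    datetime.fromtimestamp(12346, tz=timezone.utc),
)
assert key.endswith("/12345000-12346000")

# the body is gzipped before writing, and S3 is told so via ContentEncoding
payload = gzip.compress('{"window_id": "w1", "data": []}'.encode("utf-8"))
assert gzip.decompress(payload).decode("utf-8").startswith('{"window_id"')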
43 changes: 41 additions & 2 deletions ee/session_recordings/test/test_session_recording_extensions.py
@@ -1,13 +1,18 @@
import gzip
from datetime import timedelta, datetime, timezone
from secrets import token_urlsafe
from unittest.mock import patch
from unittest.mock import patch, MagicMock
from uuid import uuid4

from boto3 import resource
from botocore.config import Config
from freezegun import freeze_time

from ee.session_recordings.session_recording_extensions import load_persisted_recording, persist_recording
from ee.session_recordings.session_recording_extensions import (
    load_persisted_recording,
    persist_recording,
    save_recording_with_new_content,
)
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.models.session_recording_playlist import SessionRecordingPlaylist
from posthog.session_recordings.models.session_recording_playlist_item import SessionRecordingPlaylistItem
@@ -232,3 +237,37 @@ def test_persist_tracks_correct_to_posthog(self, mock_capture):
            "total_time_ms",
        ]:
            assert mock_capture.call_args_list[0][0][2][x] > 0

@patch("ee.session_recordings.session_recording_extensions.object_storage.write")
def test_can_save_content_to_new_location(self, mock_write: MagicMock):
with self.settings(OBJECT_STORAGE_SESSION_RECORDING_BLOB_INGESTION_FOLDER=TEST_BUCKET):
session_id = f"{uuid4()}"

recording = SessionRecording.objects.create(
team=self.team,
session_id=session_id,
start_time=datetime.fromtimestamp(12345),
end_time=datetime.fromtimestamp(12346),
object_storage_path="some_starting_value",
# None, but that would trigger the persistence behavior, and we don't want that
storage_version="None",
)

new_key = save_recording_with_new_content(recording, "the new content")

recording.refresh_from_db()

expected_path = f"session_recordings_lts/team_id/{self.team.pk}/session_id/{recording.session_id}/data"
assert new_key == f"{expected_path}/12345000-12346000"

assert recording.object_storage_path == expected_path
assert recording.storage_version == "2023-08-01"

mock_write.assert_called_with(
f"{expected_path}/12345000-12346000",
gzip.compress("the new content".encode("utf-8")),
extras={
"ContentEncoding": "gzip",
"ContentType": "application/json",
},
)
4 changes: 3 additions & 1 deletion posthog/session_recordings/session_recording_api.py
@@ -40,6 +40,7 @@
from posthog.session_recordings.queries.session_recording_properties import SessionRecordingProperties
from posthog.rate_limit import ClickHouseBurstRateThrottle, ClickHouseSustainedRateThrottle
from posthog.session_recordings.realtime_snapshots import get_realtime_snapshots
from posthog.session_recordings.snapshots.convert_legacy_snapshots import convert_original_version_lts_recording
from posthog.storage import object_storage
from posthog.utils import format_query_params_absolute_url
from prometheus_client import Counter
@@ -351,7 +352,8 @@ def _snapshots_v2(self, request: request.Request):
            if recording.storage_version == "2023-08-01":
                file_key = f"{recording.object_storage_path}/{blob_key}"
            else:
                # this is a legacy recording, so convert it in place and load the file from its new path
                file_key = convert_original_version_lts_recording(recording)
        else:
            file_key = (
                f"session_recordings/team_id/{self.team.pk}/session_id/{recording.session_id}/data/{blob_key}"
            )
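To make the new branching concrete, here is a self-contained sketch of how the file key is now chosen (convert_legacy is a stand-in for convert_original_version_lts_recording, which converts the recording in place and returns the new key):

from typing import Optional


def convert_legacy(object_storage_path: str) -> str:
    # stand-in: the real function reads the old blob, converts it, re-saves it, and returns the new key
    return f"{object_storage_path}-converted"


def choose_file_key(
    storage_version: Optional[str], object_storage_path: Optional[str], blob_key: str, team_id: int, session_id: str
) -> str:
    if object_storage_path:
        if storage_version == "2023-08-01":
            return f"{object_storage_path}/{blob_key}"
        # any other stored version is a legacy recording and gets converted on first read
        return convert_legacy(object_storage_path)
    return f"session_recordings/team_id/{team_id}/session_id/{session_id}/data/{blob_key}"


assert choose_file_key("2023-08-01", "lts/path", "1-2", 1, "s") == "lts/path/1-2"
assert choose_file_key("old", "lts/path", "1-2", 1, "s") == "lts/path-converted"
assert choose_file_key(None, None, "1-2", 1, "s") == "session_recordings/team_id/1/session_id/s/data/1-2"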
82 changes: 82 additions & 0 deletions posthog/session_recordings/snapshots/convert_legacy_snapshots.py
@@ -0,0 +1,82 @@
import json
from typing import Dict

import structlog
from prometheus_client import Histogram

from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.session_recording_helpers import decompress
from posthog.storage import object_storage

logger = structlog.get_logger(__name__)

RECORDING_CONVERSION_TIME_HISTOGRAM = Histogram(
    "recording_conversion_time_seconds",
    "We convert legacy recordings from LTS format to the latest format, how long does that take?",
)


def _save_converted_content_back_to_storage(converted_content: str, recording: SessionRecording) -> str:
    try:
        from ee.session_recordings.session_recording_extensions import save_recording_with_new_content

        return save_recording_with_new_content(recording, converted_content)
    except ImportError:
        # not running in EE context... shouldn't get here
        logger.error("attempted_to_save_converted_content_back_to_storage_in_non_ee_context", recording_id=recording.id)
        return ""


def convert_original_version_lts_recording(recording: SessionRecording) -> str:
    # the original version of the LTS recording was a single file
    # whose contents were gzipped and then base64 encoded,
    # so we can't simply stream it back to the requester

    with RECORDING_CONVERSION_TIME_HISTOGRAM.time():
        content = object_storage.read(str(recording.object_storage_path))
        if not content:
            # TODO is returning an empty string the right behaviour when there's no content?
            logger.error(
                "attempted_to_convert_original_version_lts_recording_with_no_content",
                recording_id=recording.id,
                object_storage_path=recording.object_storage_path,
            )
            return ""

        converted_content = _prepare_legacy_content(content)

        original_path = recording.object_storage_path
        new_file_key = _save_converted_content_back_to_storage(converted_content, recording)

        # TODO we should delete the old recording from storage here, but we might not have permissions
        object_storage.tag(str(original_path), {"converted": "true"})

        return new_file_key


def _prepare_legacy_content(content: str) -> str:
    # historically we stored the recording as a single file holding a base64 encoded, gzipped JSON string
    # in utf-16 encoding; this `decompress` call unwinds that back to a JSON string
    decoded_content = decompress(content)
    json_content = json.loads(decoded_content)
    return _convert_legacy_format_from_lts_storage(json_content)


def _convert_legacy_format_from_lts_storage(lts_formatted_data: Dict) -> str:
    """
    The latest version is JSONL formatted data.
    Each line is JSON containing a window_id and a data array.
    Each entry in the LTS format's snapshot_data_by_window_id property becomes one such line.
    """
    if "snapshot_data_by_window_id" not in lts_formatted_data:
        raise ValueError("Invalid LTS format: missing snapshot_data_by_window_id")

    if "version" not in lts_formatted_data or lts_formatted_data["version"] != "2022-12-22":
        raise ValueError(f"Invalid LTS format: version is {lts_formatted_data.get('version', 'missing')}")

    snapshot_data_by_window_id = lts_formatted_data["snapshot_data_by_window_id"]
    converted = ""
    for window_id, data in snapshot_data_by_window_id.items():
        converted += json.dumps({"window_id": window_id, "data": data}, separators=(",", ":")) + "\n"

    return converted.rstrip("\n")
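A runnable sketch of the conversion this function performs: each window_id in snapshot_data_by_window_id becomes one JSON line (sample payload invented for illustration):

import json

lts_formatted_data = {
    "version": "2022-12-22",
    "snapshot_data_by_window_id": {
        "window-1": [{"type": 2, "timestamp": 1}],
        "window-2": [{"type": 3, "timestamp": 2}],
    },
}

lines = [
    json.dumps({"window_id": window_id, "data": data}, separators=(",", ":"))
    for window_id, data in lts_formatted_data["snapshot_data_by_window_id"].items()
]
converted = "\n".join(lines)
print(converted)
# {"window_id":"window-1","data":[{"type":2,"timestamp":1}]}
# {"window_id":"window-2","data":[{"type":3,"timestamp":2}]}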
39 changes: 34 additions & 5 deletions posthog/session_recordings/test/test_lts_session_recordings.py
@@ -6,6 +6,10 @@
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.test.base import APIBaseTest, ClickhouseTestMixin, QueryMatchingTest

# this is a utf-16 surrogate-pass encoded, gzipped and base64 encoded legacy recording payload
# see: https://github.com/PostHog/posthog/blob/8ff764bb573c6a98368b2ae3503890551a1c3842/posthog/session_recordings/session_recording_helpers.py#L277
legacy_compressed_original = "H4sIAMHMCWUC/9VWbXPSQBDen+L0k44FkvBS4JPV0rG+1Pex08owKQmQSgGTUMRO/7r67HOXQBFa/dQ6N3fJ3e3tPrv73Ca/fl7KllxIKLEkEslYRpg30T1x0D0piMtR37dkGz2AXCIpxpF08ezgLbCnprKD/kOO5bvsy1DeoaXyTPZw4lBa8lF25UjeygQSLWh8KVWseXIGy8dYPcDskzyXBuSeQtc+pPvWbgJ7PmQSGUBa7QaYp+gdOZU5xhkxBdidLaFSD12pQ5sPn3oYXejvorsYfSnDOwf7PiSq9LOGeQPNQ1xqjEDAnSpmZalwpUb5HiQLa7WrXhejRwwnRJEC5QQ6daVmY2k8yHBOELMpPI7yPMRoM5w5lRK0an4SjEOsPIF+E5kJNMyxNsZz4bPKaGaHVtMMuzH1bhNLjHnXojmhpSLkfSII5YE8RJxTNI14E9ZLjP4E/ibErAzoYjbByTHsFvE25p7mp4+54j3HuWV59WIACyP5irMvEM3P8gH82EO+d3HmjNaqiKeOGvU64vko516RMYiBUH2d57pD6vWx17836Cvki5OjP5LX8grsNrjeA+c3xlotFKHzblG7QFzms4w3E/P2Bn4pX76gt6DT+KA9gAd6AyJyT2fxNR91d4w1s64MnOM9oud657SpVrV7hWZ4GsGf0PpzDixbxFgDL7RG6X3UUVmio55avWuVtXdtQBQ9ezvWx31zfDNtBcx8ViblnSIdYb3Eu5XaiprY/M9Yk1SX8aFCfm/Teoi9PlHoXp3V5m8j4MF35VwDM3dtBLy1ERiRQ2E+Xz7h8ITyRrMZoHob2WRDPXMpPyLCcCmm56w/hkVTVLEhGXmQfzGy2m5uskZwdS+r494NnqWM/+EN1n3mN4a2U+BIc09MpTR1w5wLWSOVf+1r9l2bD+VrxKxorXwDBvWgK7SZyypvz84di29s8+b8A7MXeXXJhrY9aU7E/Ab6/OJ1iFqfC633/6t4ae/En+juGttqlLOoLv8bGRQV/hs5qGAeq6eiaeJtB7WizlyauvaYY5Oj0b+asdt1m++K7hf5V+Zs1B0x/1kNurDae2SscvUqZ1II3mdVa/lu/8/e319O3Z4XveO/AS7WeNOWCwAA"
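For context, a fixture like this can be produced with the inverse of the `decompress` helper referenced above. A hedged sketch, assuming the encoding order the comment describes (JSON, then utf-16 with surrogatepass, then gzip, then base64); legacy_compress is an illustrative name:

import base64
import gzip
import json


def legacy_compress(payload: dict) -> str:
    # json -> utf-16 (surrogatepass) -> gzip -> base64, per the comment above
    json_bytes = json.dumps(payload).encode("utf-16", "surrogatepass")
    return base64.b64encode(gzip.compress(json_bytes)).decode("ascii")


fixture = legacy_compress({"version": "2022-12-22", "snapshot_data_by_window_id": {"w1": []}})
assert isinstance(fixture, str) and len(fixture) > 0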


class TestSessionRecordings(APIBaseTest, ClickhouseTestMixin, QueryMatchingTest):
    def setUp(self):
@@ -162,10 +166,19 @@ def list_objects_func(path: str) -> List[str]:
        assert response_data == "the file contents"

@patch("posthog.session_recordings.session_recording_api.requests.get")
@patch("posthog.session_recordings.session_recording_api.object_storage.tag")
@patch("posthog.session_recordings.session_recording_api.object_storage.write")
@patch("posthog.session_recordings.session_recording_api.object_storage.read")
@patch("posthog.session_recordings.session_recording_api.object_storage.get_presigned_url")
@patch("posthog.session_recordings.session_recording_api.object_storage.list_objects")
def test_original_version_stored_snapshots_can_be_loaded_without_upversion(
self, mock_list_objects: MagicMock, mock_get_presigned_url: MagicMock, mock_requests: MagicMock
self,
mock_list_objects: MagicMock,
mock_get_presigned_url: MagicMock,
mock_read: MagicMock,
mock_write: MagicMock,
mock_tag: MagicMock,
mock_requests: MagicMock,
) -> None:
session_id = str(uuid.uuid4())
lts_storage_path = "purposefully/not/what/we/would/calculate/to/prove/this/is/used"
@@ -175,6 +188,7 @@ def list_objects_func(path: str) -> List[str]:

        mock_list_objects.side_effect = list_objects_func
        mock_get_presigned_url.return_value = "https://example.com"
        mock_read.return_value = legacy_compressed_original

        mock_response = Mock()
        mock_response.raise_for_status.return_value = None
@@ -184,29 +198,44 @@ def list_objects_func(path: str) -> List[str]:
        mock_requests.return_value.__enter__.return_value = mock_response
        mock_requests.return_value.__exit__.return_value = None

        SessionRecording.objects.create(
        recording = SessionRecording.objects.create(
            team=self.team,
            session_id=session_id,
            # to avoid auto-persistence kicking in when this is None
            storage_version="not a known version",
            object_storage_path=lts_storage_path,
            start_time="1970-01-01T00:00:00.001000Z",
            end_time="1970-01-01T00:00:00.002000Z",
        )
        # something in the setup triggers a save that overwrites the provided path, so set it again explicitly
        recording.object_storage_path = lts_storage_path
        recording.save()

        query_parameters = [
            "source=blob",
            "version=2",
            f"blob_key={lts_storage_path}",
        ]

        mock_write.reset_mock()  # reset the mock to remove setup calls
        response = self.client.get(
            f"/api/projects/{self.team.id}/session_recordings/{session_id}/snapshots?{'&'.join(query_parameters)}"
        )
        response_data = response.content.decode("utf-8")

        assert mock_list_objects.call_args_list == []

        expected_path = f"session_recordings_lts/team_id/{self.team.pk}/session_id/{session_id}/data/1-2"

        # the content was saved to the new location
        assert mock_write.call_args_list[0][0][0] == expected_path
        # the original location was tagged
        assert mock_tag.call_args_list == [call(lts_storage_path, {"converted": "true"})]

        # the original saved path isn't loaded for reading the content
        assert mock_get_presigned_url.call_args_list == [
            call(lts_storage_path, expiration=60),
            call(expected_path, expiration=60),
        ]

        # TODO this wouldn't actually work as OG version files are stored in a different way
        # and the mock content is returned
        response_data = response.content.decode("utf-8")
        assert response_data == "the file contents"
39 changes: 32 additions & 7 deletions posthog/storage/object_storage.py
@@ -1,5 +1,5 @@
import abc
from typing import Optional, Union, List
from typing import Optional, Union, List, Dict

import structlog
from boto3 import client
@@ -38,7 +38,11 @@ def read_bytes(self, bucket: str, key: str) -> Optional[bytes]:
        pass

    @abc.abstractmethod
    def write(self, bucket: str, key: str, content: Union[str, bytes]) -> None:
    def tag(self, bucket: str, key: str, tags: Dict[str, str]) -> None:
        pass

    @abc.abstractmethod
    def write(self, bucket: str, key: str, content: Union[str, bytes], extras: Dict | None) -> None:
        pass

    @abc.abstractmethod
@@ -65,7 +69,10 @@ def read(self, bucket: str, key: str) -> Optional[str]:
    def read_bytes(self, bucket: str, key: str) -> Optional[bytes]:
        pass

    def write(self, bucket: str, key: str, content: Union[str, bytes]) -> None:
    def tag(self, bucket: str, key: str, tags: Dict[str, str]) -> None:
        pass

    def write(self, bucket: str, key: str, content: Union[str, bytes], extras: Dict | None) -> None:
        pass

    def copy_objects(self, bucket: str, source_prefix: str, target_prefix: str) -> int | None:
@@ -125,10 +132,22 @@ def read_bytes(self, bucket: str, key: str) -> Optional[bytes]:
            capture_exception(e)
            raise ObjectStorageError("read failed") from e

    def write(self, bucket: str, key: str, content: Union[str, bytes]) -> None:
    def tag(self, bucket: str, key: str, tags: Dict[str, str]) -> None:
        try:
            self.aws_client.put_object_tagging(
                Bucket=bucket,
                Key=key,
                Tagging={"TagSet": [{"Key": k, "Value": v} for k, v in tags.items()]},
            )
        except Exception as e:
            logger.error("object_storage.tag_failed", bucket=bucket, file_name=key, error=e)
            capture_exception(e)
            raise ObjectStorageError("tag failed") from e

    def write(self, bucket: str, key: str, content: Union[str, bytes], extras: Dict | None) -> None:
        s3_response = {}
        try:
            s3_response = self.aws_client.put_object(Bucket=bucket, Body=content, Key=key)
            s3_response = self.aws_client.put_object(Bucket=bucket, Body=content, Key=key, **(extras or {}))
        except Exception as e:
            logger.error("object_storage.write_failed", bucket=bucket, file_name=key, error=e, s3_response=s3_response)
            capture_exception(e)
@@ -175,8 +194,14 @@ def object_storage_client() -> ObjectStorageClient:
    return _client


def write(file_name: str, content: Union[str, bytes]) -> None:
    return object_storage_client().write(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name, content=content)
def write(file_name: str, content: Union[str, bytes], extras: Dict | None = None) -> None:
    return object_storage_client().write(
        bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name, content=content, extras=extras
    )


def tag(file_name: str, tags: Dict[str, str]) -> None:
    return object_storage_client().tag(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name, tags=tags)


def read(file_name: str) -> Optional[str]:
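For reference, the shapes these two new helpers hand to boto3 can be sanity-checked without S3. A minimal sketch (to_tag_set and put_object_kwargs are illustrative helpers, not part of this commit):

from typing import Dict, Optional


def to_tag_set(tags: Dict[str, str]) -> Dict:
    # put_object_tagging expects {"TagSet": [{"Key": ..., "Value": ...}, ...]}
    return {"TagSet": [{"Key": k, "Value": v} for k, v in tags.items()]}


def put_object_kwargs(bucket: str, key: str, body: bytes, extras: Optional[Dict]) -> Dict:
    # extras such as ContentType/ContentEncoding are splatted straight into put_object
    return {"Bucket": bucket, "Key": key, "Body": body, **(extras or {})}


assert to_tag_set({"converted": "true"}) == {"TagSet": [{"Key": "converted", "Value": "true"}]}
assert put_object_kwargs("b", "k", b"x", {"ContentEncoding": "gzip"}) == {
    "Bucket": "b",
    "Key": "k",
    "Body": b"x",
    "ContentEncoding": "gzip",
}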
