diff --git a/posthog/session_recordings/realtime_snapshots.py b/posthog/session_recordings/realtime_snapshots.py index e1191c4ddb37e..5068fa8fa994a 100644 --- a/posthog/session_recordings/realtime_snapshots.py +++ b/posthog/session_recordings/realtime_snapshots.py @@ -14,7 +14,7 @@ PUBLISHED_REALTIME_SUBSCRIPTIONS_COUNTER = Counter( "realtime_snapshots_published_subscription_counter", "When the API is serving snapshot requests and wants to receive snapshots via a redis subscription.", - labelnames=["team_id", "session_id", "attempt_count"], + labelnames=["attempt_count"], ) REALTIME_SUBSCRIPTIONS_LOADED_COUNTER = Counter( @@ -23,27 +23,49 @@ labelnames=["attempt_count"], ) + SUBSCRIPTION_CHANNEL = "@posthog/replay/realtime-subscriptions" -ATTEMPT_MAX = 10 -ATTEMPT_TIMEOUT_SECONDS = 5 + +ATTEMPT_MAX = 6 +ATTEMPT_TIMEOUT_SECONDS = 0.1 def get_key(team_id: str, suffix: str) -> str: return f"@posthog/replay/snapshots/team-{team_id}/{suffix}" -def get_realtime_snapshots(team_id: str, session_id: str, attempt_count=0) -> Optional[List[Dict]]: +def publish_subscription(team_id: str, session_id: str) -> None: + """ + Publishing a subscription notifies each instance of Mr Blobby of the request for realtime playback + Only zero or one instances will be handling the session, if they are, they will start publishing + the snapshot data to Redis so that it can be played before the data has been sent to blob storage + """ try: redis = get_client(settings.SESSION_RECORDING_REDIS_URL) - key = get_key(team_id, session_id) - encoded_snapshots = redis.zrange(key, 0, -1, withscores=True) - - # We always publish as it could be that a rebalance has occured and the consumer doesn't know it should be - # sending data to redis redis.publish( SUBSCRIPTION_CHANNEL, json.dumps({"team_id": team_id, "session_id": session_id}), ) + except Exception as e: + capture_exception( + e, + extras={ + "operation": "publish_realtime_subscription", + }, + tags={"team_id": team_id, "session_id": session_id}, + ) + raise e + + +def get_realtime_snapshots(team_id: str, session_id: str, attempt_count=0) -> Optional[List[Dict]]: + try: + redis = get_client(settings.SESSION_RECORDING_REDIS_URL) + key = get_key(team_id, session_id) + encoded_snapshots = redis.zrange(key, 0, -1, withscores=True) + + # We always publish as it could be that a rebalance has occurred + # and the consumer doesn't know it should be sending data to redis + publish_subscription(team_id, session_id) if not encoded_snapshots and attempt_count < ATTEMPT_MAX: logger.info( @@ -52,16 +74,12 @@ def get_realtime_snapshots(team_id: str, session_id: str, attempt_count=0) -> Op session_id=session_id, attempt_count=attempt_count, ) - # If we don't have it we could be in the process of getting it and syncing it - redis.publish( - SUBSCRIPTION_CHANNEL, - json.dumps({"team_id": team_id, "session_id": session_id}), - ) - PUBLISHED_REALTIME_SUBSCRIPTIONS_COUNTER.labels( - team_id=team_id, session_id=session_id, attempt_count=attempt_count - ).inc() - sleep(ATTEMPT_TIMEOUT_SECONDS / ATTEMPT_MAX) + PUBLISHED_REALTIME_SUBSCRIPTIONS_COUNTER.labels(attempt_count=attempt_count).inc() + + # this means we'll sleep 0.1, 0.1, 0,1, 0.2, 0.2, 0.2 + # for a total of 0.9 seconds + sleep(ATTEMPT_TIMEOUT_SECONDS if attempt_count < 4 else ATTEMPT_TIMEOUT_SECONDS * 2) return get_realtime_snapshots(team_id, session_id, attempt_count + 1) if encoded_snapshots: diff --git a/posthog/session_recordings/session_recording_api.py b/posthog/session_recordings/session_recording_api.py index 915fbc83d68ba..31fe842a5ba2d 100644 --- a/posthog/session_recordings/session_recording_api.py +++ b/posthog/session_recordings/session_recording_api.py @@ -45,7 +45,7 @@ ClickHouseSustainedRateThrottle, ) from posthog.session_recordings.queries.session_replay_events import SessionReplayEvents -from posthog.session_recordings.realtime_snapshots import get_realtime_snapshots +from posthog.session_recordings.realtime_snapshots import get_realtime_snapshots, publish_subscription from posthog.session_recordings.snapshots.convert_legacy_snapshots import ( convert_original_version_lts_recording, ) @@ -379,6 +379,11 @@ def snapshots(self, request: request.Request, **kwargs): "end_timestamp": None, } ) + # the UI will use this to try to load realtime snapshots + # so, we can publish the request for Mr. Blobby to start syncing to Redis now + # it takes a short while for the subscription to be sync'd into redis + # let's use the network round trip time to get started + publish_subscription(team_id=str(self.team.pk), session_id=str(recording.session_id)) response_data["sources"] = sources