feat: wait less for realtime snapshots (#19728)
* feat: wait less for realtime snapshots

* we can use the network round trip time to start getting the snapshot ready

* fix prom call

* don't need to publish again

* fix
pauldambra authored Jan 11, 2024
1 parent c448e58 commit a45aef4
Showing 2 changed files with 42 additions and 19 deletions.
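The headline change is the retry budget. Working it out from the constants changed in the diff below, as a rough back-of-envelope comparison in Python (numbers taken from the diff, not measured):

# old: up to 10 attempts, each sleeping ATTEMPT_TIMEOUT_SECONDS / ATTEMPT_MAX = 5 / 10 seconds
old_max_wait = 10 * (5 / 10)

# new: up to 6 attempts, 0.1s for the first four sleeps and 0.2s for the last two
new_max_wait = 4 * 0.1 + 2 * 0.2

print(old_max_wait, new_max_wait)  # 5.0 0.8

So a request that never finds realtime snapshots now gives up after well under a second instead of five.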
54 changes: 36 additions & 18 deletions posthog/session_recordings/realtime_snapshots.py
@@ -14,7 +14,7 @@
PUBLISHED_REALTIME_SUBSCRIPTIONS_COUNTER = Counter(
    "realtime_snapshots_published_subscription_counter",
    "When the API is serving snapshot requests and wants to receive snapshots via a redis subscription.",
-    labelnames=["team_id", "session_id", "attempt_count"],
+    labelnames=["attempt_count"],
)
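Dropping team_id and session_id from labelnames is presumably about Prometheus label cardinality (a label per session creates an unbounded number of time series), and it also explains the "fix prom call" bullet: prometheus_client rejects a .labels() call whose names do not match the declared labelnames, which is exactly what the old call removed later in this diff would have hit. A minimal sketch of that failure mode (the counter name here is illustrative):

from prometheus_client import Counter

demo = Counter("demo_subscriptions_published", "illustrative counter", labelnames=["attempt_count"])

demo.labels(attempt_count=1).inc()  # matches the declared labelnames

try:
    # the pre-fix call passed labels the counter no longer declares
    demo.labels(team_id="1", session_id="abc", attempt_count=1).inc()
except ValueError as err:
    print(f"rejected by prometheus_client: {err}")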

REALTIME_SUBSCRIPTIONS_LOADED_COUNTER = Counter(
@@ -23,27 +23,49 @@
labelnames=["attempt_count"],
)


SUBSCRIPTION_CHANNEL = "@posthog/replay/realtime-subscriptions"
-ATTEMPT_MAX = 10
-ATTEMPT_TIMEOUT_SECONDS = 5

+ATTEMPT_MAX = 6
+ATTEMPT_TIMEOUT_SECONDS = 0.1


def get_key(team_id: str, suffix: str) -> str:
return f"@posthog/replay/snapshots/team-{team_id}/{suffix}"


-def get_realtime_snapshots(team_id: str, session_id: str, attempt_count=0) -> Optional[List[Dict]]:
+def publish_subscription(team_id: str, session_id: str) -> None:
+    """
+    Publishing a subscription notifies each instance of Mr Blobby of the request for realtime playback.
+    At most one instance will be handling the session; if one is, it will start publishing
+    the snapshot data to Redis so that it can be played before the data has been sent to blob storage.
+    """
    try:
        redis = get_client(settings.SESSION_RECORDING_REDIS_URL)
-        key = get_key(team_id, session_id)
-        encoded_snapshots = redis.zrange(key, 0, -1, withscores=True)

-        # We always publish as it could be that a rebalance has occured and the consumer doesn't know it should be
-        # sending data to redis
        redis.publish(
            SUBSCRIPTION_CHANNEL,
            json.dumps({"team_id": team_id, "session_id": session_id}),
        )
+    except Exception as e:
+        capture_exception(
+            e,
+            extras={
+                "operation": "publish_realtime_subscription",
+            },
+            tags={"team_id": team_id, "session_id": session_id},
+        )
+        raise e


+def get_realtime_snapshots(team_id: str, session_id: str, attempt_count=0) -> Optional[List[Dict]]:
+    try:
+        redis = get_client(settings.SESSION_RECORDING_REDIS_URL)
+        key = get_key(team_id, session_id)
+        encoded_snapshots = redis.zrange(key, 0, -1, withscores=True)

+        # We always publish as it could be that a rebalance has occurred
+        # and the consumer doesn't know it should be sending data to redis
+        publish_subscription(team_id, session_id)

        if not encoded_snapshots and attempt_count < ATTEMPT_MAX:
            logger.info(
@@ -52,16 +74,12 @@ def get_realtime_snapshots(team_id: str, session_id: str, attempt_count=0) -> Optional[List[Dict]]:
                session_id=session_id,
                attempt_count=attempt_count,
            )
-            # If we don't have it we could be in the process of getting it and syncing it
-            redis.publish(
-                SUBSCRIPTION_CHANNEL,
-                json.dumps({"team_id": team_id, "session_id": session_id}),
-            )
-            PUBLISHED_REALTIME_SUBSCRIPTIONS_COUNTER.labels(
-                team_id=team_id, session_id=session_id, attempt_count=attempt_count
-            ).inc()

-            sleep(ATTEMPT_TIMEOUT_SECONDS / ATTEMPT_MAX)
+            PUBLISHED_REALTIME_SUBSCRIPTIONS_COUNTER.labels(attempt_count=attempt_count).inc()

+            # this means we'll sleep 0.1, 0.1, 0.1, 0.1, 0.2, 0.2
+            # for a total of 0.8 seconds
+            sleep(ATTEMPT_TIMEOUT_SECONDS if attempt_count < 4 else ATTEMPT_TIMEOUT_SECONDS * 2)
            return get_realtime_snapshots(team_id, session_id, attempt_count + 1)

        if encoded_snapshots:
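The function body is truncated here. As a side note on the storage the zrange call above reads: snapshots for a session live in a Redis sorted set, so they come back in score order. A small illustrative sketch of that shape with redis-py (the member encoding and the use of a timestamp as the score are assumptions, not taken from this diff):

import json

import redis

r = redis.Redis.from_url("redis://localhost:6379")
key = "@posthog/replay/snapshots/team-1/some-session-id"

# pretend to be the consumer for a moment: add one encoded snapshot,
# scored here by a timestamp so that zrange returns snapshots in order
r.zadd(key, {json.dumps({"window_id": "w1", "data": "..."}): 1704967200000})

for member, score in r.zrange(key, 0, -1, withscores=True):
    print(int(score), json.loads(member))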
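The subscriber side of SUBSCRIPTION_CHANNEL is not part of this commit: the "Mr Blobby" recordings consumer is a separate service. Purely as an assumption about what listening on that channel looks like, a minimal redis-py sketch:

import json

import redis

r = redis.Redis.from_url("redis://localhost:6379")
pubsub = r.pubsub()
pubsub.subscribe("@posthog/replay/realtime-subscriptions")

for message in pubsub.listen():
    if message["type"] != "message":
        continue  # skip subscribe confirmations
    payload = json.loads(message["data"])
    # a real consumer would now start copying this session's snapshots into Redis
    print("asked to sync", payload["team_id"], payload["session_id"])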
7 changes: 6 additions & 1 deletion posthog/session_recordings/session_recording_api.py
@@ -45,7 +45,7 @@
    ClickHouseSustainedRateThrottle,
)
from posthog.session_recordings.queries.session_replay_events import SessionReplayEvents
-from posthog.session_recordings.realtime_snapshots import get_realtime_snapshots
+from posthog.session_recordings.realtime_snapshots import get_realtime_snapshots, publish_subscription
from posthog.session_recordings.snapshots.convert_legacy_snapshots import (
    convert_original_version_lts_recording,
)
@@ -379,6 +379,11 @@ def snapshots(self, request: request.Request, **kwargs):
"end_timestamp": None,
}
)
# the UI will use this to try to load realtime snapshots
# so, we can publish the request for Mr. Blobby to start syncing to Redis now
# it takes a short while for the subscription to be sync'd into redis
# let's use the network round trip time to get started
publish_subscription(team_id=str(self.team.pk), session_id=str(recording.session_id))

response_data["sources"] = sources

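Putting the two files together: the API now kicks off the subscription while it is still answering the sources request, so the client's next round trip overlaps with Mr Blobby syncing to Redis. A hedged sketch of the client-side flow (endpoint paths are illustrative and auth is omitted; neither is taken from this diff):

import requests

base = "https://posthog.example.com/api/projects/1/session_recordings/some-session-id"

# 1. listing sources triggers publish_subscription on the server
sources = requests.get(f"{base}/snapshots").json()["sources"]

# 2. by the time the client follows up for the realtime source, Mr Blobby has had
#    roughly one network round trip to start syncing snapshots into Redis, so
#    get_realtime_snapshots should need fewer of its 0.1-0.2s retry sleeps
realtime = requests.get(f"{base}/snapshots", params={"source": "realtime"}).json()
print(len(sources), realtime)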
