Skip to content

Commit

Permalink
chore: even more logging (#20612)
Browse files Browse the repository at this point in the history
* chore: even more embeddings logging

* and more settings

* fix

* fix
  • Loading branch information
pauldambra authored Feb 28, 2024
1 parent f59cf7e commit 5e89d91
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 11 deletions.
28 changes: 23 additions & 5 deletions ee/session_recordings/ai/generate_embeddings.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from django.conf import settings
from openai import OpenAI

from typing import Dict, Any, List
Expand Down Expand Up @@ -35,16 +36,26 @@
"Number of session embeddings generated",
)

SESSION_EMBEDDINGS_FAILED = Counter(
"posthog_session_recordings_embeddings_failed",
"Number of session embeddings failed",
)

SESSION_EMBEDDINGS_WRITTEN_TO_CLICKHOUSE = Counter(
"posthog_session_recordings_embeddings_written_to_clickhouse",
"Number of session embeddings written to Clickhouse",
)

SESSION_EMBEDDINGS_FAILED_TO_CLICKHOUSE = Counter(
"posthog_session_recordings_embeddings_failed_to_clickhouse",
"Number of session embeddings failed to Clickhouse",
)

logger = get_logger(__name__)

# TODO move these to settings
BATCH_FLUSH_SIZE = 10
MIN_DURATION_INCLUDE_SECONDS = 120

BATCH_FLUSH_SIZE = settings.REPLAY_EMBEDDINGS_BATCH_SIZE
MIN_DURATION_INCLUDE_SECONDS = settings.REPLAY_EMBEDDINGS_MIN_DURATION_SECONDS


def fetch_recordings_without_embeddings(team: Team | int, offset=0) -> List[str]:
Expand Down Expand Up @@ -138,12 +149,19 @@ def embed_batch_of_recordings(recordings: List[str], team: Team | int) -> None:
if len(batched_embeddings) > 0:
flush_embeddings_to_clickhouse(embeddings=batched_embeddings)
except Exception as e:
SESSION_EMBEDDINGS_FAILED.inc()
logger.error(f"embed recordings error", flow="embeddings", error=e)
raise e


def flush_embeddings_to_clickhouse(embeddings: List[Dict[str, Any]]) -> None:
sync_execute("INSERT INTO session_replay_embeddings (session_id, team_id, embeddings) VALUES", embeddings)
SESSION_EMBEDDINGS_WRITTEN_TO_CLICKHOUSE.inc(len(embeddings))
try:
sync_execute("INSERT INTO session_replay_embeddings (session_id, team_id, embeddings) VALUES", embeddings)
SESSION_EMBEDDINGS_WRITTEN_TO_CLICKHOUSE.inc(len(embeddings))
except Exception as e:
logger.error(f"flush embeddings error", flow="embeddings", error=e)
SESSION_EMBEDDINGS_FAILED_TO_CLICKHOUSE.inc(len(embeddings))
raise e


def generate_recording_embeddings(session_id: str, team: Team | int) -> List[float] | None:
Expand Down
6 changes: 6 additions & 0 deletions ee/tasks/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ def generate_recordings_embeddings_batch() -> None:
for team in settings.REPLAY_EMBEDDINGS_ALLOWED_TEAMS:
try:
recordings = fetch_recordings_without_embeddings(int(team))
logger.info(
f"[generate_recordings_embeddings_batch] Fetched {len(recordings)} recordings",
recordings=recordings,
flow="embeddings",
team_id=team,
)
embed_batch_of_recordings_task.si(recordings, int(team)).apply_async()
except Team.DoesNotExist:
logger.info(f"[generate_recordings_embeddings_batch] Team {team} does not exist. Skipping.")
Expand Down
14 changes: 8 additions & 6 deletions posthog/settings/session_replay.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import List

from posthog.settings import get_from_env, get_list, DEBUG
from posthog.settings import get_from_env, get_list
from posthog.utils import str_to_bool

# TRICKY: we saw unusual memory usage behavior in EU clickhouse cluster
Expand All @@ -18,10 +18,12 @@
"REALTIME_SNAPSHOTS_FROM_REDIS_ATTEMPT_TIMEOUT_SECONDS", 0.2, type_cast=float
)

REPLAY_EMBEDDINGS_CALCULATION_CELERY_INTERVAL_SECONDS = get_from_env(
"REPLAY_EMBEDDINGS_CALCULATION_CELERY_INTERVAL_SECONDS", 30 if DEBUG else 300, type_cast=int
)

REPLAY_EMBEDDINGS_ALLOWED_TEAMS: List[str] = get_list(get_from_env("REPLAY_EMBEDDINGS_ALLOWED_TEAM", "", type_cast=str))

RECORDINGS_INGESTER_URL = get_from_env("RECORDINGS_INGESTER_URL", "")

REPLAY_EMBEDDINGS_ALLOWED_TEAMS: List[str] = get_list(get_from_env("REPLAY_EMBEDDINGS_ALLOWED_TEAM", "", type_cast=str))
REPLAY_EMBEDDINGS_BATCH_SIZE = get_from_env("REPLAY_EMBEDDINGS_BATCH_SIZE", 10, type_cast=int)
REPLAY_EMBEDDINGS_MIN_DURATION_SECONDS = get_from_env("REPLAY_EMBEDDINGS_MIN_DURATION_SECONDS", 30, type_cast=int)
REPLAY_EMBEDDINGS_CALCULATION_CELERY_INTERVAL_SECONDS = get_from_env(
"REPLAY_EMBEDDINGS_CALCULATION_CELERY_INTERVAL_SECONDS", 150, type_cast=int
)

0 comments on commit 5e89d91

Please sign in to comment.