diff --git a/ee/session_recordings/ai/__init__.py b/ee/session_recordings/ai/__init__.py deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/ee/session_recordings/ai/embeddings_queries.py b/ee/session_recordings/ai/embeddings_queries.py deleted file mode 100644 index 2034a9f190152..0000000000000 --- a/ee/session_recordings/ai/embeddings_queries.py +++ /dev/null @@ -1,109 +0,0 @@ -from django.conf import settings - - -from posthog.models import Team -from posthog.clickhouse.client import sync_execute - -BATCH_FLUSH_SIZE = settings.REPLAY_EMBEDDINGS_BATCH_SIZE -MIN_DURATION_INCLUDE_SECONDS = settings.REPLAY_EMBEDDINGS_MIN_DURATION_SECONDS - - -def fetch_errors_by_session_without_embeddings(team_id: int, offset=0) -> list[str]: - query = """ - WITH embedded_sessions AS ( - SELECT - session_id - FROM - session_replay_embeddings - WHERE - team_id = %(team_id)s - -- don't load all data for all time - AND generation_timestamp > now() - INTERVAL 7 DAY - AND source_type = 'error' - ) - SELECT log_source_id, message - FROM log_entries - PREWHERE - team_id = %(team_id)s - AND level = 'error' - AND log_source = 'session_replay' - AND timestamp <= now() - AND timestamp >= now() - INTERVAL 7 DAY - AND log_source_id NOT IN embedded_sessions - LIMIT %(batch_flush_size)s - -- when running locally the offset is used for paging - -- when running in celery the offset is not used - OFFSET %(offset)s - """ - - return sync_execute( - query, - { - "team_id": team_id, - "batch_flush_size": BATCH_FLUSH_SIZE, - "offset": offset, - }, - ) - - -def fetch_recordings_without_embeddings(team_id: int, offset=0) -> list[str]: - team = Team.objects.get(id=team_id) - - query = """ - WITH embedding_ids AS - ( - SELECT - session_id - FROM - session_replay_embeddings - WHERE - team_id = %(team_id)s - -- don't load all data for all time - AND generation_timestamp > now() - INTERVAL 7 DAY - ), - replay_with_events AS - ( - SELECT - distinct $session_id - FROM - events - WHERE - team_id = %(team_id)s - -- don't load all data for all time - AND timestamp > now() - INTERVAL 7 DAY - AND timestamp < now() - AND $session_id IS NOT NULL AND $session_id != '' - ) - SELECT session_id - FROM - session_replay_events - WHERE - session_id NOT IN embedding_ids - AND team_id = %(team_id)s - -- must be a completed session - AND min_first_timestamp < now() - INTERVAL 1 DAY - -- let's not load all data for all time - -- will definitely need to do something about this length of time - AND min_first_timestamp > now() - INTERVAL 7 DAY - AND session_id IN replay_with_events - GROUP BY session_id - HAVING dateDiff('second', min(min_first_timestamp), max(max_last_timestamp)) > %(min_duration_include_seconds)s - ORDER BY rand() - LIMIT %(batch_flush_size)s - -- when running locally the offset is used for paging - -- when running in celery the offset is not used - OFFSET %(offset)s - """ - - return [ - x[0] - for x in sync_execute( - query, - { - "team_id": team.pk, - "batch_flush_size": BATCH_FLUSH_SIZE, - "offset": offset, - "min_duration_include_seconds": MIN_DURATION_INCLUDE_SECONDS, - }, - ) - ] diff --git a/ee/session_recordings/ai/embeddings_runner.py b/ee/session_recordings/ai/embeddings_runner.py deleted file mode 100644 index 5125934b60161..0000000000000 --- a/ee/session_recordings/ai/embeddings_runner.py +++ /dev/null @@ -1,297 +0,0 @@ -import json -import tiktoken -import datetime -import pytz - -from typing import Any, Optional - -from abc import ABC, abstractmethod -from prometheus_client import Histogram, Counter -from 
structlog import get_logger -from openai import OpenAI - -from posthog.models import Team -from posthog.clickhouse.client import sync_execute - -from posthog.session_recordings.queries.session_replay_events import SessionReplayEvents -from ee.session_recordings.ai.utils import ( - SessionSummaryPromptData, - simplify_window_id, - format_dates, - collapse_sequence_of_events, - only_pageview_urls, -) - -_encoding: Optional[tiktoken.Encoding] = None - - -def get_encoding() -> tiktoken.Encoding: - global _encoding - if not _encoding: - # NOTE: This does an API request so we want to ensure we load it lazily and not at startup - # tiktoken.encoding_for_model(model_name) specifies encoder - # model_name = "text-embedding-3-small" for this usecase - _encoding = tiktoken.get_encoding("cl100k_base") - return _encoding - - -MAX_TOKENS_FOR_MODEL = 8191 - -RECORDING_EMBEDDING_TOKEN_COUNT = Histogram( - "posthog_session_recordings_recording_embedding_token_count", - "Token count for individual recordings generated during embedding", - buckets=[0, 100, 500, 1000, 2000, 3000, 4000, 5000, 6000, 8000, 10000], - labelnames=["source_type"], -) - -GENERATE_RECORDING_EMBEDDING_TIMING = Histogram( - "posthog_session_recordings_generate_recording_embedding", - "Time spent generating recording embeddings for a single session", - buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20], - labelnames=["source_type"], -) - -SESSION_EMBEDDINGS_GENERATED = Counter( - "posthog_session_recordings_embeddings_generated", - "Number of session embeddings generated", - labelnames=["source_type"], -) - -SESSION_EMBEDDINGS_FAILED = Counter( - "posthog_session_recordings_embeddings_failed", - "Instance of an embedding request to open AI (and its surrounding work) failing and being swallowed", - labelnames=["source_type"], -) - -SESSION_EMBEDDINGS_FATAL_FAILED = Counter( - "posthog_session_recordings_embeddings_fatal_failed", - "Instance of the embeddings task failing and raising an exception", - labelnames=["source_type"], -) - -SESSION_EMBEDDINGS_WRITTEN_TO_CLICKHOUSE = Counter( - "posthog_session_recordings_embeddings_written_to_clickhouse", - "Number of session embeddings written to Clickhouse", - labelnames=["source_type"], -) - -SESSION_SKIPPED_WHEN_GENERATING_EMBEDDINGS = Counter( - "posthog_session_recordings_skipped_when_generating_embeddings", - "Number of sessions skipped when generating embeddings", - labelnames=["source_type", "reason"], -) - -SESSION_EMBEDDINGS_FAILED_TO_CLICKHOUSE = Counter( - "posthog_session_recordings_embeddings_failed_to_clickhouse", - "Number of session embeddings failed to Clickhouse", - labelnames=["source_type"], -) - - -logger = get_logger(__name__) - - -class EmbeddingPreparation(ABC): - source_type: str - - @staticmethod - @abstractmethod - def prepare(item, team) -> tuple[str, str]: - raise NotImplementedError() - - -class SessionEmbeddingsRunner(ABC): - team: Team - openai_client: Any - - def __init__(self, team: Team): - self.team = team - self.openai_client = OpenAI() - - def run(self, items: list[Any], embeddings_preparation: type[EmbeddingPreparation]) -> None: - source_type = embeddings_preparation.source_type - - try: - batched_embeddings = [] - - for item in items: - try: - logger.info( - f"generating embedding input for item", - flow="embeddings", - item=json.dumps(item), - source_type=source_type, - ) - - result = embeddings_preparation.prepare(item, self.team) - - if result: - session_id, input = result - - logger.info( - 
f"generating embedding for item", - flow="embeddings", - session_id=session_id, - source_type=source_type, - ) - - with GENERATE_RECORDING_EMBEDDING_TIMING.labels(source_type=source_type).time(): - embeddings = self._embed(input, source_type=source_type) - - logger.info( - f"generated embedding for item", - flow="embeddings", - session_id=session_id, - source_type=source_type, - ) - - if embeddings: - SESSION_EMBEDDINGS_GENERATED.labels(source_type=source_type).inc() - batched_embeddings.append( - { - "team_id": self.team.pk, - "session_id": session_id, - "embeddings": embeddings, - "source_type": source_type, - "input": input, - } - ) - # we don't want to fail the whole batch if only a single recording fails - except Exception as e: - SESSION_EMBEDDINGS_FAILED.labels(source_type=source_type).inc() - logger.exception( - f"embed individual item error", - flow="embeddings", - error=e, - source_type=source_type, - ) - # so we swallow errors here - - if len(batched_embeddings) > 0: - self._flush_embeddings_to_clickhouse(embeddings=batched_embeddings, source_type=source_type) - except Exception as e: - # but we don't swallow errors within the wider task itself - # if something is failing here then we're most likely having trouble with ClickHouse - SESSION_EMBEDDINGS_FATAL_FAILED.labels(source_type=source_type).inc() - logger.exception(f"embed items fatal error", flow="embeddings", error=e, source_type=source_type) - raise - - def _embed(self, input: str, source_type: str): - token_count = self._num_tokens_for_input(input) - RECORDING_EMBEDDING_TOKEN_COUNT.labels(source_type=source_type).observe(token_count) - if token_count > MAX_TOKENS_FOR_MODEL: - logger.error( - f"embedding input exceeds max token count for model", - flow="embeddings", - input=json.dumps(input), - source_type=source_type, - ) - SESSION_SKIPPED_WHEN_GENERATING_EMBEDDINGS.labels( - source_type=source_type, reason="token_count_too_high" - ).inc() - return None - - return ( - self.openai_client.embeddings.create( - input=input, - model="text-embedding-3-small", - ) - .data[0] - .embedding - ) - - def _num_tokens_for_input(self, string: str) -> int: - """Returns the number of tokens in a text string.""" - return len(get_encoding().encode(string)) - - def _flush_embeddings_to_clickhouse(self, embeddings: list[dict[str, Any]], source_type: str) -> None: - try: - sync_execute( - "INSERT INTO session_replay_embeddings (session_id, team_id, embeddings, source_type, input) VALUES", - embeddings, - ) - SESSION_EMBEDDINGS_WRITTEN_TO_CLICKHOUSE.labels(source_type=source_type).inc(len(embeddings)) - except Exception as e: - logger.exception(f"flush embeddings error", flow="embeddings", error=e, source_type=source_type) - SESSION_EMBEDDINGS_FAILED_TO_CLICKHOUSE.labels(source_type=source_type).inc(len(embeddings)) - raise - - -class ErrorEmbeddingsPreparation(EmbeddingPreparation): - source_type = "error" - - @staticmethod - def prepare(item: tuple[str, str], _): - session_id = item[0] - error_message = item[1] - return session_id, error_message - - -class SessionEventsEmbeddingsPreparation(EmbeddingPreparation): - source_type = "session" - - @staticmethod - def prepare(session_id: str, team: Team): - eight_days_ago = datetime.datetime.now(pytz.UTC) - datetime.timedelta(days=8) - session_metadata = SessionReplayEvents().get_metadata( - session_id=str(session_id), team=team, recording_start_time=eight_days_ago - ) - if not session_metadata: - logger.error(f"no session metadata found for session", flow="embeddings", session_id=session_id) - 
SESSION_SKIPPED_WHEN_GENERATING_EMBEDDINGS.labels( - source_type=SessionEventsEmbeddingsPreparation.source_type, reason="metadata_missing" - ).inc() - return None - - session_events = SessionReplayEvents().get_events( - session_id=str(session_id), - team=team, - metadata=session_metadata, - events_to_ignore=[ - "$feature_flag_called", - ], - ) - - if not session_events or not session_events[0] or not session_events[1]: - logger.error(f"no events found for session", flow="embeddings", session_id=session_id) - SESSION_SKIPPED_WHEN_GENERATING_EMBEDDINGS.labels( - source_type=SessionEventsEmbeddingsPreparation.source_type, reason="events_missing" - ).inc() - return None - - processed_sessions = collapse_sequence_of_events( - only_pageview_urls( - format_dates( - simplify_window_id(SessionSummaryPromptData(columns=session_events[0], results=session_events[1])), - start=datetime.datetime(1970, 1, 1, tzinfo=pytz.UTC), # epoch timestamp - ) - ) - ) - - logger.info(f"collapsed events for session", flow="embeddings", session_id=session_id) - - processed_sessions_index = processed_sessions.column_index("event") - current_url_index = processed_sessions.column_index("$current_url") - elements_chain_index = processed_sessions.column_index("elements_chain") - - input = ( - str(session_metadata) - + "\n" - + "\n".join( - SessionEventsEmbeddingsPreparation._compact_result( - event_name=result[processed_sessions_index] if processed_sessions_index is not None else "", - current_url=result[current_url_index] if current_url_index is not None else "", - elements_chain=result[elements_chain_index] if elements_chain_index is not None else "", - ) - for result in processed_sessions.results - ) - ) - - return session_id, input - - @staticmethod - def _compact_result(event_name: str, current_url: int, elements_chain: dict[str, str] | str) -> str: - elements_string = ( - elements_chain if isinstance(elements_chain, str) else ", ".join(str(e) for e in elements_chain) - ) - return f"{event_name} {current_url} {elements_string}" diff --git a/ee/session_recordings/ai/error_clustering.py b/ee/session_recordings/ai/error_clustering.py deleted file mode 100644 index 0e03a755f41e9..0000000000000 --- a/ee/session_recordings/ai/error_clustering.py +++ /dev/null @@ -1,99 +0,0 @@ -from prometheus_client import Histogram -from django.conf import settings -from posthog.clickhouse.client import sync_execute -from posthog.models import Team -from sklearn.cluster import DBSCAN -import pandas as pd -import numpy as np -from posthog.session_recordings.models.session_recording_event import SessionRecordingViewed -from datetime import date - -CLUSTER_REPLAY_ERRORS_TIMING = Histogram( - "posthog_session_recordings_cluster_replay_errors", - "Time spent clustering the embeddings of replay errors", - buckets=[0.5, 1, 2, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60], -) - -CLUSTER_REPLAY_ERRORS_CLUSTER_COUNT = Histogram( - "posthog_session_recordings_errors_cluster_count", - "Count of clusters identified from error messages per team", - buckets=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 35, 40, 45, 50], - labelnames=["team_id"], -) - -DBSCAN_EPS = settings.REPLAY_EMBEDDINGS_CLUSTERING_DBSCAN_EPS -DBSCAN_MIN_SAMPLES = settings.REPLAY_EMBEDDINGS_CLUSTERING_DBSCAN_MIN_SAMPLES - - -def error_clustering(team: Team): - results = fetch_error_embeddings(team.pk) - - if not results: - return [] - - df = pd.DataFrame(results, columns=["session_id", "error", "embeddings", "timestamp"]) - - df["cluster"] = 
cluster_embeddings(df["embeddings"].tolist()) - - CLUSTER_REPLAY_ERRORS_CLUSTER_COUNT.labels(team_id=team.pk).observe(df["cluster"].nunique()) - - return construct_response(df, team) - - -def fetch_error_embeddings(team_id: int): - query = """ - SELECT - session_id, input, embeddings, generation_timestamp - FROM - session_replay_embeddings - WHERE - team_id = %(team_id)s - -- don't load all data for all time - AND generation_timestamp > now() - INTERVAL 7 DAY - AND source_type = 'error' - AND input != '' - """ - - return sync_execute( - query, - {"team_id": team_id}, - ) - - -def cluster_embeddings(embeddings): - dbscan = DBSCAN(eps=DBSCAN_EPS, min_samples=DBSCAN_MIN_SAMPLES) - with CLUSTER_REPLAY_ERRORS_TIMING.time(): - dbscan.fit(embeddings) - return dbscan.labels_ - - -def construct_response(df: pd.DataFrame, team: Team): - viewed_session_ids = list( - SessionRecordingViewed.objects.filter(team=team, session_id__in=df["session_id"].unique()) - .values_list("session_id", flat=True) - .distinct() - ) - - clusters = [] - for cluster, rows in df.groupby("cluster"): - session_ids = rows["session_id"].unique() - sample = rows.sample(n=1)[["session_id", "error"]].to_dict("records")[0] - - date_series = ( - rows.groupby([rows["timestamp"].dt.date]) - .size() - .reindex(pd.date_range(end=date.today(), periods=7), fill_value=0) - ) - sparkline = dict(zip(date_series.index.astype(str), date_series)) - clusters.append( - { - "cluster": cluster, - "sample": sample.get("error"), - "session_ids": np.random.choice(session_ids, size=DBSCAN_MIN_SAMPLES - 1), - "occurrences": rows.size, - "sparkline": sparkline, - "unique_sessions": len(session_ids), - "viewed": len(np.intersect1d(session_ids, viewed_session_ids, assume_unique=True)), - } - ) - return clusters diff --git a/ee/session_recordings/ai/similar_recordings.py b/ee/session_recordings/ai/similar_recordings.py deleted file mode 100644 index a267459cc8087..0000000000000 --- a/ee/session_recordings/ai/similar_recordings.py +++ /dev/null @@ -1,57 +0,0 @@ -from prometheus_client import Histogram - -from posthog.clickhouse.client import sync_execute -from posthog.models.team import Team -from posthog.session_recordings.models.session_recording import SessionRecording - -FIND_RECORDING_NEIGHBOURS_TIMING = Histogram( - "posthog_session_recordings_find_recording_neighbours", - "Time spent finding the most similar recording embeddings for a single session", -) - - -def similar_recordings(recording: SessionRecording, team: Team): - with FIND_RECORDING_NEIGHBOURS_TIMING.time(): - similar_embeddings = closest_embeddings(session_id=recording.session_id, team_id=team.pk) - - # TODO: join session recording context (person, duration, etc) to show in frontend - - return similar_embeddings - - -def closest_embeddings(session_id: str, team_id: int): - query = """ - WITH ( - SELECT - argMax(embeddings, generation_timestamp) as target_embeddings - FROM - session_replay_embeddings - WHERE - team_id = %(team_id)s - -- don't load all data for all time - AND generation_timestamp > now() - INTERVAL 7 DAY - AND session_id = %(session_id)s - group by session_id - LIMIT 1 - ) as target_embeddings - SELECT - session_id, - -- distance function choice based on https://help.openai.com/en/articles/6824809-embeddings-frequently-asked-questions - -- OpenAI normalizes embeddings so L2 should produce the same score but is slightly slower - cosineDistance(embeddings, target_embeddings) AS similarity_score - FROM session_replay_embeddings - WHERE - team_id = %(team_id)s - -- don't load 
all data for all time - AND generation_timestamp > now() - INTERVAL 7 DAY - -- skip the target recording - AND session_id != %(session_id)s - ORDER BY similarity_score ASC - -- only return a max number of results - LIMIT %(limit)s; - """ - - return sync_execute( - query, - {"team_id": team_id, "session_id": session_id, "limit": 3}, - ) diff --git a/ee/session_recordings/ai/utils.py b/ee/session_recordings/ai/utils.py index 7345abb3183b0..38ef49cfdb190 100644 --- a/ee/session_recordings/ai/utils.py +++ b/ee/session_recordings/ai/utils.py @@ -3,8 +3,6 @@ from typing import Any -from hashlib import shake_256 - @dataclasses.dataclass class SessionSummaryPromptData: @@ -59,42 +57,6 @@ def simplify_window_id(session_events: SessionSummaryPromptData) -> SessionSumma return dataclasses.replace(session_events, results=simplified_results) -def only_pageview_urls(session_events: SessionSummaryPromptData) -> SessionSummaryPromptData: - """ - including the url with every event is a lot of duplication, - so we remove it from all events except pageviews - """ - if session_events.is_empty(): - return session_events - - # find url column index - url_index = session_events.column_index("$current_url") - event_index = session_events.column_index("event") - - pageview_results = [] - for result in session_events.results: - if url_index is None or event_index is None: - pageview_results.append(result) - continue - - url: str | None = result[url_index] - event: str | None = result[event_index] - if not url: - pageview_results.append(result) - continue - if event == "$pageview": - pageview_results.append(result) - continue - - # otherwise we hash the url, so we have ~one token per event - # this would mean sessions with multiple events that only - # differ by URL should still have some distance between them - result[url_index] = shake_256(url.encode("utf-8")).hexdigest(4) - pageview_results.append(result) - - return dataclasses.replace(session_events, results=pageview_results) - - def deduplicate_urls(session_events: SessionSummaryPromptData) -> SessionSummaryPromptData: if session_events.is_empty(): return session_events diff --git a/ee/session_recordings/session_summary/__init__.py b/ee/session_recordings/session_summary/__init__.py deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/ee/session_recordings/session_summary/test/__init__.py b/ee/session_recordings/session_summary/test/__init__.py deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/ee/tasks/__init__.py b/ee/tasks/__init__.py index cf44299a71f71..4bc793399424b 100644 --- a/ee/tasks/__init__.py +++ b/ee/tasks/__init__.py @@ -7,12 +7,6 @@ handle_subscription_value_change, schedule_all_subscriptions, ) -from .replay import ( - embed_batch_of_recordings_task, - generate_recordings_embeddings_batch, - generate_replay_embedding_error_clusters, - cluster_replay_error_embeddings, -) # As our EE tasks are not included at startup for Celery, we need to ensure they are declared here so that they are imported by posthog/settings/celery.py @@ -22,8 +16,4 @@ "schedule_all_subscriptions", "deliver_subscription_report", "handle_subscription_value_change", - "embed_batch_of_recordings_task", - "generate_recordings_embeddings_batch", - "generate_replay_embedding_error_clusters", - "cluster_replay_error_embeddings", ] diff --git a/ee/tasks/replay.py b/ee/tasks/replay.py deleted file mode 100644 index fcf57196c2dc5..0000000000000 --- a/ee/tasks/replay.py +++ /dev/null @@ -1,99 +0,0 @@ -from typing import Any - -import structlog 
-from celery import shared_task - -from ee.session_recordings.ai.embeddings_queries import ( - fetch_errors_by_session_without_embeddings, - fetch_recordings_without_embeddings, -) -from ee.session_recordings.ai.embeddings_runner import ( - SessionEmbeddingsRunner, - ErrorEmbeddingsPreparation, - SessionEventsEmbeddingsPreparation, -) -from ee.session_recordings.ai.error_clustering import error_clustering -from posthog import settings -from posthog.models import Team -from posthog.tasks.utils import CeleryQueue -from django.core.cache import cache - -logger = structlog.get_logger(__name__) - - -# rate limits are per worker, and this task makes multiple calls to open AI -# we currently are allowed 500 calls per minute, so let's rate limit each worker -# to much less than that -@shared_task(ignore_result=False, queue=CeleryQueue.SESSION_REPLAY_EMBEDDINGS.value, rate_limit="75/m") -def embed_batch_of_recordings_task(recordings: list[Any], team_id: int) -> None: - try: - team = Team.objects.get(id=team_id) - runner = SessionEmbeddingsRunner(team=team) - - runner.run(recordings, embeddings_preparation=SessionEventsEmbeddingsPreparation) - - results = fetch_errors_by_session_without_embeddings(team.pk) - runner.run(results, embeddings_preparation=ErrorEmbeddingsPreparation) - except Team.DoesNotExist: - logger.info(f"[embed_batch_of_recordings_task] Team {team} does not exist. Skipping.") - pass - - -@shared_task(ignore_result=True) -def generate_recordings_embeddings_batch() -> None: - # see https://docs.celeryq.dev/en/stable/userguide/canvas.html - # we have three jobs to do here - # 1. get a batch of recordings - # 2. for each recording - ideally in parallel - generate an embedding - # 3. update CH with the embeddings in one update operation - # in Celery that's a chain of tasks - # with step 2 being a group of tasks - # chord( - # embed_single_recording.si(recording.session_id, recording.team_id) - # for recording in fetch_recordings_without_embeddings(int(team)) - # )(generate_recordings_embeddings_batch_on_complete.si()) - # but even the docs call out performance impact of synchronising tasks - # - # so, for now, we'll do that naively - - for team_id in settings.REPLAY_EMBEDDINGS_ALLOWED_TEAMS: - try: - recordings = fetch_recordings_without_embeddings(int(team_id)) - logger.info( - f"[generate_recordings_embeddings_batch] Fetched {len(recordings)} recordings", - recordings=recordings, - flow="embeddings", - team_id=team_id, - ) - embed_batch_of_recordings_task.si(recordings, int(team_id)).apply_async() - except Exception as e: - logger.error(f"[generate_recordings_embeddings_batch] Error: {e}.", exc_info=True, error=e) - pass - - -@shared_task(ignore_result=True) -def generate_replay_embedding_error_clusters() -> None: - for team_id in settings.REPLAY_EMBEDDINGS_ALLOWED_TEAMS: - try: - cluster_replay_error_embeddings.si(int(team_id)).apply_async() - except Exception as e: - logger.error(f"[generate_replay_error_clusters] Error: {e}.", exc_info=True, error=e) - pass - - -@shared_task(ignore_result=True, queue=CeleryQueue.SESSION_REPLAY_EMBEDDINGS.value) -def cluster_replay_error_embeddings(team_id: int) -> None: - try: - team = Team.objects.get(id=team_id) - clusters = error_clustering(team) - - cache.set(f"cluster_errors_{team.pk}", clusters, settings.CACHED_RESULTS_TTL) - - logger.info( - f"[generate_replay_error_clusters] Completed for team", - flow="embeddings", - team_id=team_id, - ) - except Team.DoesNotExist: - logger.info(f"[generate_replay_error_clusters] Team {team} does not 
exist. Skipping.") - pass diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 1701d3232a439..3339c51d43e3d 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -41,7 +41,6 @@ import { DataWarehouseTable, DataWarehouseViewLink, EarlyAccessFeatureType, - ErrorClusterResponse, EventDefinition, EventDefinitionType, EventsListQueryParams, @@ -1931,10 +1930,6 @@ const api = { return await new ApiRequest().recording(recordingId).withAction('similar_sessions').get() }, - async errorClusters(refresh?: boolean): Promise { - return await new ApiRequest().recordings().withAction('error_clusters').withQueryString({ refresh }).get() - }, - async delete(recordingId: SessionRecordingType['id']): Promise<{ success: boolean }> { return await new ApiRequest().recording(recordingId).delete() }, diff --git a/frontend/src/lib/constants.tsx b/frontend/src/lib/constants.tsx index 3a0c3436faa9f..caaf7e7951298 100644 --- a/frontend/src/lib/constants.tsx +++ b/frontend/src/lib/constants.tsx @@ -179,10 +179,8 @@ export const FEATURE_FLAGS = { PRODUCT_INTRO_PAGES: 'product-intro-pages', // owner: @raquelmsmith SQL_EDITOR: 'sql-editor', // owner: @EDsCODE #team-data-warehouse SESSION_REPLAY_DOCTOR: 'session-replay-doctor', // owner: #team-replay - REPLAY_SIMILAR_RECORDINGS: 'session-replay-similar-recordings', // owner: #team-replay SAVED_NOT_PINNED: 'saved-not-pinned', // owner: #team-replay NEW_EXPERIMENTS_UI: 'new-experiments-ui', // owner: @jurajmajerik #team-feature-success - REPLAY_ERROR_CLUSTERING: 'session-replay-error-clustering', // owner: #team-replay AUDIT_LOGS_ACCESS: 'audit-logs-access', // owner: #team-growth SUBSCRIBE_FROM_PAYGATE: 'subscribe-from-paygate', // owner: #team-growth HEATMAPS_UI: 'heatmaps-ui', // owner: @benjackwhite diff --git a/frontend/src/scenes/session-recordings/SessionRecordings.tsx b/frontend/src/scenes/session-recordings/SessionRecordings.tsx index b720e5eff097a..db612745ab2a0 100644 --- a/frontend/src/scenes/session-recordings/SessionRecordings.tsx +++ b/frontend/src/scenes/session-recordings/SessionRecordings.tsx @@ -24,7 +24,6 @@ import { urls } from 'scenes/urls' import { sidePanelSettingsLogic } from '~/layout/navigation-3000/sidepanel/panels/sidePanelSettingsLogic' import { AvailableFeature, NotebookNodeType, ReplayTabs } from '~/types' -import { SessionRecordingErrors } from './errors/SessionRecordingErrors' import { createPlaylist } from './playlist/playlistUtils' import { SessionRecordingsPlaylist } from './playlist/SessionRecordingsPlaylist' import { SavedSessionRecordingPlaylists } from './saved-playlists/SavedSessionRecordingPlaylists' @@ -196,8 +195,6 @@ function MainPanel(): JSX.Element { ) : tab === ReplayTabs.Playlists ? ( - ) : tab === ReplayTabs.Errors ? ( - ) : tab === ReplayTabs.Templates ? 
( ) : null} diff --git a/frontend/src/scenes/session-recordings/errors/SessionRecordingErrors.tsx b/frontend/src/scenes/session-recordings/errors/SessionRecordingErrors.tsx deleted file mode 100644 index 75c96333ac580..0000000000000 --- a/frontend/src/scenes/session-recordings/errors/SessionRecordingErrors.tsx +++ /dev/null @@ -1,181 +0,0 @@ -import { IconFeatures } from '@posthog/icons' -import { LemonButton, LemonTable, LemonTabs } from '@posthog/lemon-ui' -import { captureException } from '@sentry/react' -import { useActions, useValues } from 'kea' -import { JSONViewer } from 'lib/components/JSONViewer' -import { Sparkline } from 'lib/components/Sparkline' -import { useState } from 'react' -import { urls } from 'scenes/urls' - -import { sessionPlayerModalLogic } from '../player/modal/sessionPlayerModalLogic' -import { sessionRecordingErrorsLogic } from './sessionRecordingErrorsLogic' - -const MAX_TITLE_LENGTH = 75 - -export function SessionRecordingErrors(): JSX.Element { - const { openSessionPlayer } = useActions(sessionPlayerModalLogic) - const { errors, errorsLoading } = useValues(sessionRecordingErrorsLogic) - const { loadErrorClusters, createPlaylist } = useActions(sessionRecordingErrorsLogic) - - if (!errors && !errorsLoading) { - return ( - } onClick={() => loadErrorClusters()}> - Automagically find errors - - ) - } - - return ( - <> - { - const displayTitle = parseTitle(cluster.sample) - return ( -
- {displayTitle} -
- ) - }, - width: '50%', - }, - { - title: '', - render: (_, cluster) => { - return ( - - ) - }, - }, - { - title: 'Occurrences', - dataIndex: 'occurrences', - sorter: (a, b) => a.occurrences - b.occurrences, - }, - { - title: 'Sessions', - dataIndex: 'unique_sessions', - sorter: (a, b) => a.unique_sessions - b.unique_sessions, - }, - { - title: 'Viewed', - tooltip: "How many of these you've already viewed", - dataIndex: 'viewed', - render: function Render(_, cluster) { - return `${((cluster.viewed / cluster.unique_sessions) * 100).toFixed(0)}%` - }, - sorter: (a, b) => a.viewed / a.unique_sessions - b.viewed / b.unique_sessions, - }, - { - title: 'Actions', - render: function Render(_, cluster) { - return ( -
- { - e.preventDefault() - openSessionPlayer({ id: cluster.session_ids[0] }) - }} - className="whitespace-nowrap" - type="primary" - > - Watch example - - { - createPlaylist( - `Examples of '${parseTitle(cluster.sample)}'`, - cluster.session_ids - ) - }} - className="whitespace-nowrap" - type="secondary" - tooltip="Create a playlist of recordings containing this issue" - > - Create playlist - -
- ) - }, - }, - ]} - loading={errorsLoading} - dataSource={errors || []} - expandable={{ - expandedRowRender: (cluster) => , - }} - /> - - ) -} - -const ExpandedError = ({ error }: { error: string }): JSX.Element => { - const hasJson = isJSON(error) - const [activeTab, setActiveTab] = useState(hasJson ? 'json' : 'raw') - - return hasJson ? ( -
- , - }, - { key: 'raw', label: 'Raw', content: {error} }, - ]} - /> -
- ) : ( -
-

Example error

-
{error}
-
- ) -} - -function isJSON(str: string): boolean { - try { - JSON.parse(str) - return true - } catch { - return false - } -} - -function parseTitle(error: string): string { - let input - try { - const parsedError = JSON.parse(error) - input = parsedError.error || error - } catch { - input = error - } - - if (!input) { - return error - } - - try { - // TRICKY - after json parsing we might not have a string, - // since the JSON parser will helpfully convert to other types too e.g. have seen objects here - if (typeof input !== 'string') { - input = JSON.stringify(input) - } - - return input.split('\n')[0].trim().substring(0, MAX_TITLE_LENGTH) || error - } catch (e) { - captureException(e, { extra: { error }, tags: { feature: 'replay/error-clustering' } }) - return error - } -} diff --git a/frontend/src/scenes/session-recordings/errors/sessionRecordingErrorsLogic.ts b/frontend/src/scenes/session-recordings/errors/sessionRecordingErrorsLogic.ts deleted file mode 100644 index 49de62c7bf5c4..0000000000000 --- a/frontend/src/scenes/session-recordings/errors/sessionRecordingErrorsLogic.ts +++ /dev/null @@ -1,44 +0,0 @@ -import { actions, afterMount, kea, listeners, path } from 'kea' -import { loaders } from 'kea-loaders' -import { router } from 'kea-router' -import api from 'lib/api' -import { urls } from 'scenes/urls' - -import { ErrorClusterResponse } from '~/types' - -import { createPlaylist } from '../playlist/playlistUtils' -import type { sessionRecordingErrorsLogicType } from './sessionRecordingErrorsLogicType' - -export const sessionRecordingErrorsLogic = kea([ - path(['scenes', 'session-recordings', 'detail', 'sessionRecordingErrorsLogic']), - actions({ - createPlaylist: (name: string, sessionIds: string[]) => ({ name, sessionIds }), - }), - loaders(() => ({ - errors: [ - null as ErrorClusterResponse, - { - loadErrorClusters: async (refresh: boolean = true) => { - const response = await api.recordings.errorClusters(refresh) - return response - }, - }, - ], - })), - listeners(() => ({ - createPlaylist: async ({ name, sessionIds }) => { - const playlist = await createPlaylist({ name: name }) - - if (playlist) { - const samples = sessionIds.slice(0, 10) - await Promise.all( - samples.map((sessionId) => api.recordings.addRecordingToPlaylist(playlist.short_id, sessionId)) - ) - router.actions.push(urls.replayPlaylist(playlist.short_id)) - } - }, - })), - afterMount(({ actions }) => { - actions.loadErrorClusters(false) - }), -]) diff --git a/frontend/src/scenes/session-recordings/player/PlayerFrameOverlay.tsx b/frontend/src/scenes/session-recordings/player/PlayerFrameOverlay.tsx index f869661481789..6337aafc995af 100644 --- a/frontend/src/scenes/session-recordings/player/PlayerFrameOverlay.tsx +++ b/frontend/src/scenes/session-recordings/player/PlayerFrameOverlay.tsx @@ -3,7 +3,6 @@ import './PlayerFrameOverlay.scss' import { IconPlay } from '@posthog/icons' import clsx from 'clsx' import { useActions, useValues } from 'kea' -import { useFeatureFlag } from 'lib/hooks/useFeatureFlag' import { IconErrorOutline, IconSync } from 'lib/lemon-ui/icons' import { LemonButton } from 'lib/lemon-ui/LemonButton' import { useState } from 'react' @@ -13,7 +12,6 @@ import { getCurrentExporterData } from '~/exporter/exporterViewLogic' import { SessionPlayerState } from '~/types' import { PlayerUpNext } from './PlayerUpNext' -import { SimilarRecordings } from './SimilarRecordings' const PlayerFrameOverlayContent = (): JSX.Element | null => { const { currentPlayerState, endReached } = 
useValues(sessionRecordingPlayerLogic) @@ -84,7 +82,6 @@ const PlayerFrameOverlayContent = (): JSX.Element | null => { export function PlayerFrameOverlay(): JSX.Element { const { playlistLogic } = useValues(sessionRecordingPlayerLogic) const { togglePlayPause } = useActions(sessionRecordingPlayerLogic) - const hasSimilarRecordings = useFeatureFlag('REPLAY_SIMILAR_RECORDINGS') const [interrupted, setInterrupted] = useState(false) @@ -96,7 +93,6 @@ export function PlayerFrameOverlay(): JSX.Element { onMouseOut={() => setInterrupted(false)} > - {hasSimilarRecordings && } {playlistLogic ? ( { const { logicProps } = useValues(sessionRecordingPlayerLogic) const { exportRecordingToFile, deleteRecording, setIsFullScreen } = useActions(sessionRecordingPlayerLogic) - const { fetchSimilarRecordings } = useActions(sessionRecordingDataLogic(logicProps)) const hasMobileExportFlag = useFeatureFlag('SESSION_REPLAY_EXPORT_MOBILE_DATA') const hasMobileExport = window.IMPERSONATED_SESSION || hasMobileExportFlag - const hasSimilarRecordings = useFeatureFlag('REPLAY_SIMILAR_RECORDINGS') const onDelete = (): void => { setIsFullScreen(false) @@ -208,12 +196,6 @@ const MenuActions = (): JSX.Element => { 'DEBUG ONLY - Export untransformed recording to a file. This can be loaded later into PostHog for playback.', icon: , }, - hasSimilarRecordings && { - label: 'Find similar recordings', - onClick: fetchSimilarRecordings, - icon: , - tooltip: 'DEBUG ONLY - Find similar recordings based on distance calculations via embeddings.', - }, logicProps.playerKey !== 'modal' && { label: 'Delete recording', status: 'danger', diff --git a/frontend/src/scenes/session-recordings/player/SimilarRecordings.tsx b/frontend/src/scenes/session-recordings/player/SimilarRecordings.tsx deleted file mode 100644 index 00d2bc58b5b3d..0000000000000 --- a/frontend/src/scenes/session-recordings/player/SimilarRecordings.tsx +++ /dev/null @@ -1,34 +0,0 @@ -import { LemonButton, Spinner } from '@posthog/lemon-ui' -import { useValues } from 'kea' -import { urls } from 'scenes/urls' - -import { sessionRecordingDataLogic } from './sessionRecordingDataLogic' -import { sessionRecordingPlayerLogic } from './sessionRecordingPlayerLogic' - -export function SimilarRecordings(): JSX.Element | null { - const { logicProps } = useValues(sessionRecordingPlayerLogic) - const { similarRecordings, similarRecordingsLoading } = useValues(sessionRecordingDataLogic(logicProps)) - - if (!similarRecordings && !similarRecordingsLoading) { - return null - } - - return ( -
- {similarRecordingsLoading ? ( - - ) : !!similarRecordings && similarRecordings?.length > 0 ? ( -
- Watch similar recordings - {similarRecordings?.map(([id, similarity]) => ( - - {similarity} - - ))} -
- ) : ( - No similar recordings found - )} -
- ) -} diff --git a/frontend/src/scenes/session-recordings/player/sessionRecordingDataLogic.ts b/frontend/src/scenes/session-recordings/player/sessionRecordingDataLogic.ts index 67871116cb224..10118ce5defdc 100644 --- a/frontend/src/scenes/session-recordings/player/sessionRecordingDataLogic.ts +++ b/frontend/src/scenes/session-recordings/player/sessionRecordingDataLogic.ts @@ -727,14 +727,6 @@ export const sessionRecordingDataLogic = kea([ }, }, ], - similarRecordings: [ - null as [string, number][] | null, - { - fetchSimilarRecordings: async () => { - return await api.recordings.similarRecordings(props.sessionRecordingId) - }, - }, - ], })), listeners(({ values, actions, cache, props }) => ({ loadSnapshots: () => { diff --git a/frontend/src/scenes/session-recordings/sessionReplaySceneLogic.ts b/frontend/src/scenes/session-recordings/sessionReplaySceneLogic.ts index c2dce12e7f9e0..5f1bee532fdaa 100644 --- a/frontend/src/scenes/session-recordings/sessionReplaySceneLogic.ts +++ b/frontend/src/scenes/session-recordings/sessionReplaySceneLogic.ts @@ -69,11 +69,8 @@ export const sessionReplaySceneLogic = kea([ tabs: [ (s) => [s.featureFlags], (featureFlags) => { - const hasErrorClustering = !!featureFlags[FEATURE_FLAGS.REPLAY_ERROR_CLUSTERING] const hasTemplates = !!featureFlags[FEATURE_FLAGS.REPLAY_TEMPLATES] - return Object.values(ReplayTabs).filter((tab) => - tab == ReplayTabs.Errors ? hasErrorClustering : tab == ReplayTabs.Templates ? hasTemplates : true - ) + return Object.values(ReplayTabs).filter((tab) => (tab == ReplayTabs.Templates ? hasTemplates : true)) }, ], breadcrumbs: [ diff --git a/frontend/src/types.ts b/frontend/src/types.ts index c4f4031a0e06e..498eb1f1f5a67 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -688,7 +688,6 @@ export enum ReplayTabs { Templates = 'templates', Home = 'home', Playlists = 'playlists', - Errors = 'errors', } export enum ExperimentsTabs { diff --git a/posthog/session_recordings/session_recording_api.py b/posthog/session_recordings/session_recording_api.py index 3961109fc365f..83aa9564b2acb 100644 --- a/posthog/session_recordings/session_recording_api.py +++ b/posthog/session_recordings/session_recording_api.py @@ -20,8 +20,6 @@ from rest_framework.response import Response from rest_framework.utils.encoders import JSONEncoder -from ee.session_recordings.ai.error_clustering import error_clustering -from ee.session_recordings.ai.similar_recordings import similar_recordings from ee.session_recordings.session_summary.summarize_session import summarize_recording from posthog.api.person import MinimalPersonSerializer from posthog.api.routing import TeamAndOrgViewSetMixin @@ -685,65 +683,6 @@ def summarize(self, request: request.Request, **kwargs): ) return r - @extend_schema(exclude=True) - @action(methods=["GET"], detail=True) - def similar_sessions(self, request: request.Request, **kwargs): - if not request.user.is_authenticated: - raise exceptions.NotAuthenticated() - - cache_key = f'similar_sessions_{self.team.pk}_{self.kwargs["pk"]}' - # Check if the response is cached - cached_response = cache.get(cache_key) - if cached_response: - return Response(cached_response) - - user = cast(User, request.user) - - if not posthoganalytics.feature_enabled("session-replay-similar-recordings", str(user.distinct_id)): - raise exceptions.ValidationError("similar recordings is not enabled for this user") - - recording = self.get_object() - - if not SessionReplayEvents().exists(session_id=str(recording.session_id), team=self.team): - raise 
exceptions.NotFound("Recording not found") - - recordings = similar_recordings(recording, self.team) - if recordings: - cache.set(cache_key, recordings, timeout=30) - - # let the browser cache for half the time we cache on the server - r = Response(recordings, headers={"Cache-Control": "max-age=15"}) - return r - - @extend_schema(exclude=True) - @action(methods=["GET"], detail=False) - def error_clusters(self, request: request.Request, **kwargs): - if not request.user.is_authenticated: - raise exceptions.NotAuthenticated() - - refresh_clusters = request.GET.get("refresh") - - cache_key = f"cluster_errors_{self.team.pk}" - # Check if the response is cached - cached_response = cache.get(cache_key) - if cached_response and not refresh_clusters: - return Response(cached_response) - - user = cast(User, request.user) - - if not posthoganalytics.feature_enabled("session-replay-error-clustering", str(user.distinct_id)): - raise exceptions.ValidationError("clustered errors is not enabled for this user") - - # Clustering will eventually be done during a scheduled background task - clusters = error_clustering(self.team) - - if clusters: - cache.set(cache_key, clusters, settings.CACHED_RESULTS_TTL) - - # let the browser cache for half the time we cache on the server - r = Response(clusters, headers={"Cache-Control": "max-age=15"}) - return r - def _stream_blob_to_client( self, recording: SessionRecording, request: request.Request, event_properties: dict ) -> HttpResponse: diff --git a/posthog/settings/session_replay.py b/posthog/settings/session_replay.py index 08ddbce6dcb65..9bb53a501e905 100644 --- a/posthog/settings/session_replay.py +++ b/posthog/settings/session_replay.py @@ -16,17 +16,6 @@ "REALTIME_SNAPSHOTS_FROM_REDIS_ATTEMPT_TIMEOUT_SECONDS", 0.2, type_cast=float ) -REPLAY_EMBEDDINGS_ALLOWED_TEAMS: list[str] = get_list(get_from_env("REPLAY_EMBEDDINGS_ALLOWED_TEAM", "", type_cast=str)) -REPLAY_EMBEDDINGS_BATCH_SIZE = get_from_env("REPLAY_EMBEDDINGS_BATCH_SIZE", 10, type_cast=int) -REPLAY_EMBEDDINGS_MIN_DURATION_SECONDS = get_from_env("REPLAY_EMBEDDINGS_MIN_DURATION_SECONDS", 30, type_cast=int) -REPLAY_EMBEDDINGS_CALCULATION_CELERY_INTERVAL_SECONDS = get_from_env( - "REPLAY_EMBEDDINGS_CALCULATION_CELERY_INTERVAL_SECONDS", 150, type_cast=int -) -REPLAY_EMBEDDINGS_CLUSTERING_DBSCAN_EPS = get_from_env("REPLAY_EMBEDDINGS_CLUSTERING_DBSCAN_EPS", 0.2, type_cast=float) -REPLAY_EMBEDDINGS_CLUSTERING_DBSCAN_MIN_SAMPLES = get_from_env( - "REPLAY_EMBEDDINGS_CLUSTERING_DBSCAN_MIN_SAMPLES", 10, type_cast=int -) - REPLAY_MESSAGE_TOO_LARGE_SAMPLE_RATE = get_from_env("REPLAY_MESSAGE_TOO_LARGE_SAMPLE_RATE", 0, type_cast=float) REPLAY_MESSAGE_TOO_LARGE_SAMPLE_BUCKET = get_from_env( "REPLAY_MESSAGE_TOO_LARGE_SAMPLE_BUCKET", "posthog-cloud-prod-us-east-1-k8s-replay-samples" diff --git a/posthog/tasks/scheduled.py b/posthog/tasks/scheduled.py index 0d9628490b788..5972857f7fd2b 100644 --- a/posthog/tasks/scheduled.py +++ b/posthog/tasks/scheduled.py @@ -19,7 +19,6 @@ calculate_cohort, calculate_decide_usage, calculate_external_data_rows_synced, - calculate_replay_embeddings, check_async_migration_health, check_flags_to_rollback, clean_stale_partials, @@ -288,16 +287,6 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None: ) if settings.EE_AVAILABLE: - # every interval seconds, we calculate N replay embeddings - # the goal is to process _enough_ every 24 hours that - # there is a meaningful playlist to test with - add_periodic_task_with_expiry( - sender, - 
settings.REPLAY_EMBEDDINGS_CALCULATION_CELERY_INTERVAL_SECONDS, - calculate_replay_embeddings.s(), - name="calculate replay embeddings", - ) - sender.add_periodic_task( crontab(hour="0", minute=str(randrange(0, 40))), clickhouse_send_license_usage.s(), diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index a7e291707914d..7cccde1b31249 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -910,34 +910,6 @@ def ee_persist_finished_recordings() -> None: persist_finished_recordings() -# this task runs a CH query and triggers other tasks -# it can run on the default queue -@shared_task(ignore_result=True) -def calculate_replay_embeddings() -> None: - try: - from ee.tasks.replay import generate_recordings_embeddings_batch - - generate_recordings_embeddings_batch() - except ImportError: - pass - except Exception as e: - logger.error("Failed to calculate replay embeddings", error=e, exc_info=True) - - -# this task triggers other tasks -# it can run on the default queue -@shared_task(ignore_result=True) -def calculate_replay_error_clusters() -> None: - try: - from ee.tasks.replay import generate_replay_embedding_error_clusters - - generate_replay_embedding_error_clusters() - except ImportError: - pass - except Exception as e: - logger.error("Failed to calculate replay error clusters", error=e, exc_info=True) - - @shared_task(ignore_result=True) def calculate_external_data_rows_synced() -> None: try:
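
The deleted `SessionEmbeddingsRunner._embed` counted tokens with tiktoken's `cl100k_base` encoding and skipped any input above the 8,191-token ceiling of `text-embedding-3-small` rather than truncating it. A minimal sketch of that guard on its own, without the runner's Prometheus metrics; the function name and the `__main__` usage are illustrative, not part of the PostHog codebase:

```python
import tiktoken
from openai import OpenAI

# text-embedding-3-small uses the cl100k_base encoding and accepts at most 8191 input tokens
MAX_TOKENS_FOR_MODEL = 8191
_encoding = tiktoken.get_encoding("cl100k_base")  # load once; fetching the encoding hits the network


def embed_if_within_limit(client: OpenAI, text: str) -> list[float] | None:
    """Return the embedding vector, or None when the input would exceed the model limit."""
    if len(_encoding.encode(text)) > MAX_TOKENS_FOR_MODEL:
        # the removed runner skipped (and counted) oversized sessions rather than truncating them
        return None
    response = client.embeddings.create(input=text, model="text-embedding-3-small")
    return response.data[0].embedding


if __name__ == "__main__":
    vector = embed_if_within_limit(OpenAI(), "user clicked 'export' and then saw a 500 error")
    print(None if vector is None else len(vector))  # 1536 dimensions for text-embedding-3-small
```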
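
The `only_pageview_urls` helper removed from `ee/session_recordings/ai/utils.py` kept full URLs only on `$pageview` events and replaced them everywhere else with a 4-byte `shake_256` digest, so each event costs roughly one token while sessions that differ only by URL still keep some distance between them. A small sketch of that digesting, assuming nothing beyond the standard library (the URLs are made up):

```python
from hashlib import shake_256


def compact_url(url: str) -> str:
    # shake_256 is an extendable-output hash, so hexdigest() takes the output length:
    # 4 bytes -> an 8-character hex string, short enough to cost about one token per event
    return shake_256(url.encode("utf-8")).hexdigest(4)


# different URLs map to different short digests, identical URLs to the same one
print(compact_url("https://app.example.com/insights?date_from=-7d"))
print(compact_url("https://app.example.com/replay/recent"))
```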
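
The deleted `error_clustering` pipeline pulled the last seven days of `source_type = 'error'` embeddings into a DataFrame and let scikit-learn's DBSCAN assign a cluster label per row, with `eps` and `min_samples` coming from the removed settings (defaults 0.2 and 10). A self-contained sketch of the same call on toy vectors; the toy data is illustrative only:

```python
import numpy as np
from sklearn.cluster import DBSCAN

rng = np.random.default_rng(0)

# toy stand-ins for error-message embeddings: two tight groups plus a lone outlier
group_a = rng.normal(loc=0.0, scale=0.01, size=(12, 8))
group_b = rng.normal(loc=1.0, scale=0.01, size=(12, 8))
outlier = rng.normal(loc=5.0, scale=0.01, size=(1, 8))
embeddings = np.vstack([group_a, group_b, outlier])

# eps is the neighbourhood radius and min_samples the density threshold;
# the removed settings defaulted these to 0.2 and 10 for the real embeddings
labels = DBSCAN(eps=0.2, min_samples=10).fit(embeddings).labels_

print(labels)                      # rows in the same dense region share a label
print(int((labels == -1).sum()))   # -1 marks noise: errors that belong to no cluster
```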
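
The nearest-neighbour query in the deleted `similar_recordings.py` orders by `cosineDistance` and notes, via the OpenAI FAQ, that L2 distance would rank identically because OpenAI embeddings arrive unit-normalised. That follows from the identity ||a - b||^2 = 2 * (1 - cos(a, b)) for unit vectors, which a few lines of NumPy can confirm; the 1536-dimensional random vectors below simply stand in for real embeddings:

```python
import numpy as np

rng = np.random.default_rng(1)

# two random vectors, unit-normalised the way OpenAI embeddings already come
a, b = rng.normal(size=(2, 1536))
a /= np.linalg.norm(a)
b /= np.linalg.norm(b)

cosine_distance = 1.0 - a @ b        # what the ClickHouse query orders by
l2_squared = np.sum((a - b) ** 2)    # ||a - b||^2 = 2 * (1 - cos) when both vectors are unit length

assert np.isclose(l2_squared, 2.0 * cosine_distance)
# the relationship is monotone, so ordering by either metric returns the same neighbours
```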
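
The removed `generate_recordings_embeddings_batch` keeps a commented-out Celery canvas sketch, a chord of per-recording tasks with a completion callback, and explains that it dispatched one batch task per allowed team instead because of the cost of synchronising tasks. For reference, a hedged sketch of what that chord shape looks like; the task names and bodies here are placeholders, not the deleted implementation:

```python
from celery import chord, shared_task


@shared_task
def embed_single_recording(session_id: str, team_id: int) -> str:
    # placeholder body: generate and store one embedding, then report which session was handled
    return session_id


@shared_task
def on_batch_complete(session_ids: list[str]) -> None:
    # the chord callback receives the list of results from the group once every task has finished
    print(f"embedded {len(session_ids)} recordings")


def schedule_batch(session_ids: list[str], team_id: int) -> None:
    # chord(group)(callback): run the group in parallel, then run the callback exactly once;
    # that synchronisation step is the performance concern the deleted comment calls out
    chord(embed_single_recording.s(session_id, team_id) for session_id in session_ids)(
        on_batch_complete.s()
    )
```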