def embed_batch_of_recordings(recordings: List[str], team: Team | int) -> None:
    """Generate embeddings for a batch of session recordings and persist them.

    Each session id in ``recordings`` is passed to
    ``generate_recording_embeddings`` exactly once. Sessions that yield an
    embedding are collected and written in a single call to
    ``flush_embeddings_to_clickhouse``.

    Args:
        recordings: Session ids to embed.
        team: A ``Team`` instance, or a team id that is resolved to one here.

    Returns:
        None. Failures are logged and swallowed so that one bad batch does not
        propagate to the caller.
    """
    try:
        if isinstance(team, int):
            team = Team.objects.get(id=team)

        logger.info(
            f"processing {len(recordings)} recordings to embed for team {team.pk}", flow="embeddings", team_id=team.pk
        )

        # NOTE: the previous implementation wrapped this in
        # `while len(recordings) > 0`, but nothing ever shrank `recordings`,
        # so a non-empty batch looped forever. One pass over the batch is
        # what is needed.
        batched_embeddings = []
        for session_id in recordings:
            with GENERATE_RECORDING_EMBEDDING_TIMING.time():
                embeddings = generate_recording_embeddings(session_id=session_id, team=team)

            if embeddings:
                SESSION_EMBEDDINGS_GENERATED.inc()
                batched_embeddings.append(
                    {
                        "session_id": session_id,
                        "team_id": team.pk,
                        "embeddings": embeddings,
                    }
                )

        if batched_embeddings:
            flush_embeddings_to_clickhouse(embeddings=batched_embeddings)
    except Exception as e:
        # Best-effort boundary: keep swallowing the error as before, but use
        # `logger.exception` so the traceback is recorded alongside the message.
        logger.exception("embed recordings error", flow="embeddings", error=e)
flush_embeddings_to_clickhouse(embeddings: List[Dict[str, Any]]) -> None: def generate_recording_embeddings(session_id: str, team: Team | int) -> List[float] | None: + logger.error(f"generating embedding for session", flow="embeddings", session_id=session_id) if isinstance(team, int): team = Team.objects.get(id=team) @@ -149,7 +155,7 @@ def generate_recording_embeddings(session_id: str, team: Team | int) -> List[flo session_metadata = SessionReplayEvents().get_metadata(session_id=str(session_id), team=team) if not session_metadata: - logger.error(f"no session metadata found for session_id {session_id}") + logger.error(f"no session metadata found for session", flow="embeddings", session_id=session_id) SESSION_SKIPPED_WHEN_GENERATING_EMBEDDINGS.inc() return None @@ -163,7 +169,7 @@ def generate_recording_embeddings(session_id: str, team: Team | int) -> List[flo ) if not session_events or not session_events[0] or not session_events[1]: - logger.error(f"no events found for session_id {session_id}") + logger.error(f"no events found for session", flow="embeddings", session_id=session_id) SESSION_SKIPPED_WHEN_GENERATING_EMBEDDINGS.inc() return None @@ -176,6 +182,8 @@ def generate_recording_embeddings(session_id: str, team: Team | int) -> List[flo ) ) + logger.error(f"collapsed events for session", flow="embeddings", session_id=session_id) + processed_sessions_index = processed_sessions.column_index("event") current_url_index = processed_sessions.column_index("$current_url") elements_chain_index = processed_sessions.column_index("elements_chain") @@ -193,6 +201,8 @@ def generate_recording_embeddings(session_id: str, team: Team | int) -> List[flo ) ) + logger.error(f"generating embedding input for session", flow="embeddings", session_id=session_id) + embeddings = ( client.embeddings.create( input=input, @@ -202,6 +212,8 @@ def generate_recording_embeddings(session_id: str, team: Team | int) -> List[flo .embedding ) + logger.error(f"generated embedding input for session", 
flow="embeddings", session_id=session_id) + return embeddings