chore: run clustering in background task (#21080)
daibhin authored Mar 28, 2024
1 parent ddeedc7 commit 4e22252
Showing 6 changed files with 67 additions and 11 deletions.
10 changes: 5 additions & 5 deletions ee/session_recordings/ai/error_clustering.py
@@ -1,7 +1,7 @@
 from prometheus_client import Histogram
 from django.conf import settings
 from posthog.clickhouse.client import sync_execute
-from posthog.models import Team, User
+from posthog.models import Team
 from sklearn.cluster import DBSCAN
 import pandas as pd
 import numpy as np
@@ -25,7 +25,7 @@
 DBSCAN_MIN_SAMPLES = settings.REPLAY_EMBEDDINGS_CLUSTERING_DBSCAN_MIN_SAMPLES
 
 
-def error_clustering(team: Team, user: User):
+def error_clustering(team: Team):
     results = fetch_error_embeddings(team.pk)
 
     if not results:
@@ -37,7 +37,7 @@ def error_clustering(team: Team, user: User):
 
     CLUSTER_REPLAY_ERRORS_CLUSTER_COUNT.labels(team_id=team.pk).observe(df["cluster"].nunique())
 
-    return construct_response(df, team, user)
+    return construct_response(df, team)


 def fetch_error_embeddings(team_id: int):
@@ -67,9 +67,9 @@ def cluster_embeddings(embeddings):
     return dbscan.labels_
 
 
-def construct_response(df: pd.DataFrame, team: Team, user: User):
+def construct_response(df: pd.DataFrame, team: Team):
     viewed_session_ids = list(
-        SessionRecordingViewed.objects.filter(team=team, user=user, session_id__in=df["session_id"].unique())
+        SessionRecordingViewed.objects.filter(team=team, session_id__in=df["session_id"].unique())
         .values_list("session_id", flat=True)
         .distinct()
     )
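For context on what error_clustering computes: a minimal, self-contained sketch of DBSCAN over embedding vectors, using illustrative eps/min_samples values in place of the REPLAY_EMBEDDINGS_CLUSTERING_DBSCAN_* settings the file actually reads.

import numpy as np
from sklearn.cluster import DBSCAN

# Illustrative stand-ins for the settings-driven values (assumptions, not PostHog's defaults).
DBSCAN_EPS = 0.5
DBSCAN_MIN_SAMPLES = 2

def cluster_embeddings(embeddings: list[list[float]]) -> np.ndarray:
    # DBSCAN assigns a cluster label to each embedding; noise points get -1.
    dbscan = DBSCAN(eps=DBSCAN_EPS, min_samples=DBSCAN_MIN_SAMPLES)
    dbscan.fit(np.array(embeddings))
    return dbscan.labels_

print(cluster_embeddings([[0.0, 0.10], [0.0, 0.12], [0.90, 0.95], [0.91, 0.94]]))
# -> [0 0 1 1]: two tight groups of points, so two clusters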
9 changes: 8 additions & 1 deletion ee/tasks/__init__.py
@@ -7,7 +7,12 @@
     handle_subscription_value_change,
     schedule_all_subscriptions,
 )
-from .replay import embed_batch_of_recordings_task, generate_recordings_embeddings_batch
+from .replay import (
+    embed_batch_of_recordings_task,
+    generate_recordings_embeddings_batch,
+    generate_replay_embedding_error_clusters,
+    cluster_replay_error_embeddings,
+)
 
 # As our EE tasks are not included at startup for Celery, we need to ensure they are declared here so that they are imported by posthog/settings/celery.py
 
@@ -19,4 +24,6 @@
     "handle_subscription_value_change",
     "embed_batch_of_recordings_task",
     "generate_recordings_embeddings_batch",
+    "generate_replay_embedding_error_clusters",
+    "cluster_replay_error_embeddings",
 ]
33 changes: 30 additions & 3 deletions ee/tasks/replay.py
@@ -12,9 +12,11 @@
     ErrorEmbeddingsPreparation,
     SessionEventsEmbeddingsPreparation,
 )
+from ee.session_recordings.ai.error_clustering import error_clustering
 from posthog import settings
 from posthog.models import Team
 from posthog.tasks.utils import CeleryQueue
+from django.core.cache import cache
 
 logger = structlog.get_logger(__name__)
 
@@ -64,9 +66,34 @@ def generate_recordings_embeddings_batch() -> None:
                 team_id=team_id,
             )
             embed_batch_of_recordings_task.si(recordings, int(team_id)).apply_async()
         except Team.DoesNotExist:
             logger.info(f"[generate_recordings_embeddings_batch] Team {team_id} does not exist. Skipping.")
             pass
         except Exception as e:
             logger.error(f"[generate_recordings_embeddings_batch] Error: {e}.", exc_info=True, error=e)
             pass
+
+
+@shared_task(ignore_result=True)
+def generate_replay_embedding_error_clusters() -> None:
+    for team_id in settings.REPLAY_EMBEDDINGS_ALLOWED_TEAMS:
+        try:
+            cluster_replay_error_embeddings.si(int(team_id)).apply_async()
+        except Exception as e:
+            logger.error(f"[generate_replay_error_clusters] Error: {e}.", exc_info=True, error=e)
+            pass
+
+
+@shared_task(ignore_result=True, queue=CeleryQueue.SESSION_REPLAY_EMBEDDINGS.value)
+def cluster_replay_error_embeddings(team_id: int) -> None:
+    try:
+        team = Team.objects.get(id=team_id)
+        clusters = error_clustering(team)
+
+        cache.set(f"cluster_errors_{team.pk}", clusters, settings.CACHED_RESULTS_TTL)
+
+        logger.info(
+            "[generate_replay_error_clusters] Completed for team",
+            flow="embeddings",
+            team_id=team_id,
+        )
+    except Team.DoesNotExist:
+        logger.info(f"[generate_replay_error_clusters] Team {team_id} does not exist. Skipping.")
+        pass
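The two tasks above follow a common Celery fan-out shape: a lightweight scheduler task loops over the allowed teams and enqueues one per-team job. A minimal sketch of that pattern, assuming a locally running broker (the app name and broker URL are placeholders):

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")  # placeholder broker URL

@app.task(ignore_result=True)
def per_team_job(team_id: int) -> None:
    print(f"clustering errors for team {team_id}")

def fan_out(team_ids: list[int]) -> None:
    for team_id in team_ids:
        # .si() builds an immutable signature (no upstream result is injected),
        # and .apply_async() enqueues it for whichever worker consumes the queue.
        per_team_job.si(int(team_id)).apply_async()

fan_out([1, 2, 3])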
4 changes: 2 additions & 2 deletions posthog/session_recordings/session_recording_api.py
@@ -603,10 +603,10 @@ def error_clusters(self, request: request.Request, **kwargs):
             raise exceptions.ValidationError("clustered errors is not enabled for this user")
 
         # Clustering will eventually be done during a scheduled background task
-        clusters = error_clustering(self.team, user)
+        clusters = error_clustering(self.team)
 
         if clusters:
-            cache.set(cache_key, clusters, timeout=30)
+            cache.set(cache_key, clusters, settings.CACHED_RESULTS_TTL)
 
         # let the browser cache for half the time we cache on the server
         r = Response(clusters, headers={"Cache-Control": "max-age=15"})
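This endpoint and the background task form a read-through cache around the same per-team key. A hedged sketch of the combined flow, assuming a configured Django cache backend; compute_clusters and the TTL value are stand-ins, not PostHog's actual helpers:

from django.core.cache import cache

CACHED_RESULTS_TTL = 24 * 60 * 60  # stand-in; the real value comes from settings

def get_error_clusters(team_id: int):
    cache_key = f"cluster_errors_{team_id}"
    clusters = cache.get(cache_key)  # hit: the background task already wrote this
    if clusters is None:
        clusters = compute_clusters(team_id)  # hypothetical synchronous fallback
        cache.set(cache_key, clusters, CACHED_RESULTS_TTL)
    return clusters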
8 changes: 8 additions & 0 deletions posthog/tasks/scheduled.py
@@ -46,6 +46,7 @@
     update_quota_limiting,
     verify_persons_data_in_sync,
     calculate_replay_embeddings,
+    calculate_replay_error_clusters,
 )
 from posthog.utils import get_crontab
 
@@ -249,6 +250,13 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None:
         name="calculate replay embeddings",
     )
 
+    add_periodic_task_with_expiry(
+        sender,
+        crontab(hour="10", minute=str(randrange(0, 40))),
+        calculate_replay_error_clusters.s(),
+        name="calculate replay error clusters",
+    )  # every day at a random minute past 10am. Randomize to avoid overloading license.posthog.com
+
     sender.add_periodic_task(
         crontab(hour="0", minute=str(randrange(0, 40))),
         clickhouse_send_license_usage.s(),
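On the randomized schedule: randrange runs once when the beat schedule is built, so each deployment fires at a stable but different minute, spreading the daily load across instances. A small sketch of the idea:

from random import randrange
from celery.schedules import crontab

# Evaluated once at startup: this process always fires at the same minute,
# but different processes pick different minutes in [0, 40).
daily_at_ten = crontab(hour="10", minute=str(randrange(0, 40)))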
14 changes: 14 additions & 0 deletions posthog/tasks/tasks.py
@@ -757,3 +757,17 @@ def calculate_replay_embeddings() -> None:
         pass
     except Exception as e:
         logger.error("Failed to calculate replay embeddings", error=e, exc_info=True)
+
+
+# this task triggers other tasks
+# it can run on the default queue
+@shared_task(ignore_result=True)
+def calculate_replay_error_clusters() -> None:
+    try:
+        from ee.tasks.replay import generate_replay_embedding_error_clusters
+
+        generate_replay_embedding_error_clusters()
+    except ImportError:
+        pass
+    except Exception as e:
+        logger.error("Failed to calculate replay error clusters", error=e, exc_info=True)
