Skip to content

Commit

Permalink
push embedding generation onto its own queue
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldambra committed Feb 12, 2024
1 parent 5aa3681 commit dba20b4
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 16 deletions.
13 changes: 13 additions & 0 deletions posthog/tasks/ee/session_recordings/ai/generate_embeddings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import List, Any

from posthog.models import Team


# stub - will be introduced in https://github.com/PostHog/posthog/pull/20046
def generate_recording_embedding(session_id: str, team_id: int) -> None:
    """Placeholder for generating the embedding of one session recording.

    Currently a no-op so that callers can already be wired up; the real
    implementation is expected to replace this stub.

    Args:
        session_id: Identifier of the recording to embed.
        team_id: Identifier of the team that owns the recording.
    """
    return None


# stub - will be introduced in https://github.com/PostHog/posthog/pull/20046
def fetch_recordings_without_embeddings(team: Team) -> List[Any]:
    """Placeholder for finding a team's recordings that have no embedding yet.

    Currently always reports that there is nothing to embed; the real
    implementation is expected to replace this stub.

    Args:
        team: The team whose recordings should be searched.

    Returns:
        An empty list.
    """
    return []
27 changes: 11 additions & 16 deletions posthog/tasks/replay_summaries.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,24 @@
from datetime import timedelta

import structlog
from celery import shared_task
from django.utils import timezone

from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.tasks.ee.session_recordings.ai.generate_embeddings import (
fetch_recordings_without_embeddings,
generate_recording_embedding,
)
from posthog.tasks.utils import CeleryQueue

logger = structlog.get_logger(__name__)


@shared_task(ignore_result=True, queue=CeleryQueue.SESSION_REPLAY_EMBEDDINGS.value)
def embed_single_recording(session_id: str, team_id: int) -> None:
    """Generate the embedding for a single session recording.

    The task is routed to the queue named by
    ``CeleryQueue.SESSION_REPLAY_EMBEDDINGS`` and its result is not stored.

    Args:
        session_id: Identifier of the recording to embed.
        team_id: Identifier of the team that owns the recording.
    """
    # all of the work is delegated; this task only exists to give each
    # recording its own unit of work on the embeddings queue
    generate_recording_embedding(session_id, team_id)


@shared_task(ignore_result=True)
def generate_recording_embeddings() -> None:
    """Queue one embedding task per recording that still lacks an embedding.

    Recordings are looked up team by team and each one is handed to
    ``embed_single_recording``, so the per-recording work is scheduled
    separately from this coordinating task.
    """
    # imported here because only this task needs the model
    from posthog.models import Team

    # fetch_recordings_without_embeddings requires the team to search in;
    # calling it with no argument fails type checking ('Missing positional
    # argument "team"') and would raise TypeError at runtime.
    # NOTE(review): iterating every team is the widest reading of the intent -
    # narrow this queryset if embeddings should only run for some teams.
    for team in Team.objects.all():
        for recording in fetch_recordings_without_embeddings(team):
            # push each embedding task to a separate queue
            embed_single_recording.delay(recording.session_id, recording.team_id)
2 changes: 2 additions & 0 deletions posthog/tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,8 @@ def check_data_import_row_limits() -> None:
check_synced_row_limits()


# This task runs a ClickHouse (CH) query and then triggers other tasks.
# It is lightweight itself, so it can run on the default queue.
@shared_task(ignore_result=True)
def calculate_replay_embeddings() -> None:
from posthog.tasks.replay_summaries import generate_recording_embeddings
Expand Down
1 change: 1 addition & 0 deletions posthog/tasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ class CeleryQueue(Enum):
EXPORTS = "exports"
SUBSCRIPTION_DELIVERY = "subscription_delivery"
USAGE_REPORTS = "usage_reports"
SESSION_REPLAY_EMBEDDINGS = "session_replay_embeddings"

0 comments on commit dba20b4

Please sign in to comment.