Skip to content

Commit

Permalink
start wiring up Celery task
Browse files — browse the repository at this point in the history
  • Loading branch information
pauldambra committed Feb 12, 2024
1 parent 1fbb4da commit 2158178
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 17 deletions.
5 changes: 4 additions & 1 deletion ee/session_recordings/ai/generate_embeddings.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ def flush_embeddings_to_clickhouse(embeddings: List[Dict[str, Any]]) -> None:
sync_execute("INSERT INTO session_replay_embeddings (session_id, team_id, embeddings) VALUES", embeddings)


def generate_recording_embeddings(session_id: str, team: Team) -> List[float] | None:
def generate_recording_embeddings(session_id: str, team: Team | int) -> List[float] | None:
if isinstance(team, int):
team = Team.objects.get(pk=team)

client = OpenAI()

session_metadata = SessionReplayEvents().get_metadata(session_id=str(session_id), team=team)
Expand Down
25 changes: 11 additions & 14 deletions ee/tasks/replay_summaries.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
import structlog
from celery import shared_task

# from ee.session_recordings.ai.generate_embeddings import (
# generate_recording_embedding,
# fetch_recordings_without_embeddings,
# )
from ee.session_recordings.ai.generate_embeddings import (
fetch_recordings_without_embeddings,
generate_recording_embeddings,
)
from posthog.tasks.utils import CeleryQueue

logger = structlog.get_logger(__name__)


# just so we can merge into another PR
@shared_task(ignore_result=True, queue=CeleryQueue.SESSION_REPLAY_EMBEDDINGS.value)
def embed_single_recording(session_id: str, team_id: int) -> None:
# generate_recording_embedding(session_id, team_id)
pass
generate_recording_embeddings(session_id, team_id)


# just so we can merge into another PR
@shared_task(ignore_result=True)
def generate_recording_embeddings() -> None:
# recordings = fetch_recordings_without_embeddings()
# for recording in []: # recordings:
# # push each embedding task to a separate queue
# embed_single_recording.delay(recording.session_id, recording.team_id)
pass
def generate_recordings_embeddings_batch() -> None:
for recording in fetch_recordings_without_embeddings():

Check failure on line 20 in ee/tasks/replay_summaries.py

View workflow job for this annotation

GitHub Actions / Python code quality checks

Missing positional argument "team" in call to "fetch_recordings_without_embeddings"
# push each embedding task to a separate queue
# TODO really we should be doing scatter and gather here
# so we can do one CH update at the end of a batch
embed_single_recording.delay(recording.session_id, recording.team_id)
4 changes: 2 additions & 2 deletions posthog/tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,8 @@ def check_data_import_row_limits() -> None:
@shared_task(ignore_result=True)
def calculate_replay_embeddings() -> None:
try:
from ee.tasks.replay_summaries import generate_recording_embeddings
from ee.tasks.replay_summaries import generate_recordings_embeddings_batch

generate_recording_embeddings()
generate_recordings_embeddings_batch()
except ImportError:
pass

0 comments on commit 2158178

Please sign in to comment.