
Commit

Merge remote-tracking branch 'origin/master' into by/onboarding-in-app
Bianca Yang committed Feb 14, 2024
2 parents 19ddf6b + 9f838f9 commit 895e060
Showing 169 changed files with 5,105 additions and 2,326 deletions.
1 change: 0 additions & 1 deletion .github/actions/run-backend-tests/action.yml
@@ -145,7 +145,6 @@ runs:
run: echo "PYTEST_ARGS=--snapshot-update" >> $GITHUB_ENV # We can only update snapshots within the PostHog org

# Tests

- name: Run FOSS tests
if: ${{ inputs.segment == 'FOSS' }}
env:
7 changes: 1 addition & 6 deletions .github/workflows/container-images-cd.yml
@@ -33,11 +33,6 @@ jobs:
with:
fetch-depth: 2

- name: Override git.py
run: >
echo "def get_git_commit(): return '${GITHUB_SHA}'" > posthog/git.py
echo "def get_git_branch(): return '${GITHUB_REF_NAME}'" >> posthog/git.py
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2

@@ -107,7 +102,7 @@ jobs:
message: |
{
"image_tag": "${{ steps.build.outputs.digest }}"
}
}
- name: Check for changes in plugins directory
id: check_changes_plugins
7 changes: 4 additions & 3 deletions .run/Celery.run.xml
@@ -5,12 +5,13 @@
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
<env name="DEBUG" value="1" />
<env name="CLICKHOUSE_SECURE" value="False" />
<env name="KAFKA_HOSTS" value="localhost" />
<env name="DATABASE_URL" value="postgres://posthog:posthog@localhost:5432/posthog" />
<env name="DEBUG" value="1" />
<env name="KAFKA_HOSTS" value="localhost" />
<env name="PYTHONUNBUFFERED" value="1" />
<env name="SKIP_SERVICE_VERSION_REQUIREMENTS" value="1" />
<env name="REPLAY_EMBEDDINGS_ALLOWED_TEAM" value="1,2,3" />
</envs>
<option name="SDK_HOME" value="$PROJECT_DIR$/env/bin/python" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/env/bin" />
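The updated run configuration adds REPLAY_EMBEDDINGS_ALLOWED_TEAM=1,2,3, which reads like a comma-separated allow-list of team IDs. A minimal sketch of how such a value could be parsed and checked; the helper names and the gating call are assumptions for illustration, not code from this commit:

import os


def replay_embeddings_allowed_teams() -> set[int]:
    # Parse the comma-separated allow-list, e.g. "1,2,3" -> {1, 2, 3}
    raw = os.environ.get("REPLAY_EMBEDDINGS_ALLOWED_TEAM", "")
    return {int(part) for part in raw.split(",") if part.strip()}


def is_replay_embeddings_enabled(team_id: int) -> bool:
    # Hypothetical gate: only generate embeddings for explicitly allowed teams
    return team_id in replay_embeddings_allowed_teams()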
2 changes: 1 addition & 1 deletion bin/celery-queues.env
@@ -2,4 +2,4 @@
# Important: Add new queues to make Celery consume tasks from them.

# NOTE: Keep in sync with posthog/tasks/utils.py
CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery,usage_reports
CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery,usage_reports,session_replay_embeddings
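The worker only consumes from the new session_replay_embeddings queue if tasks are routed to it. A hedged sketch of that wiring using standard Celery APIs; the task name and its body are illustrative assumptions, while the imported helpers come from the new module added in this commit:

from celery import shared_task

from ee.session_recordings.ai.generate_embeddings import (
    embed_batch_of_recordings,
    fetch_recordings_without_embeddings,
)


@shared_task(ignore_result=True)
def generate_replay_embeddings(team_id: int) -> None:
    # Embed one batch of recent recordings that do not yet have embeddings
    recordings = fetch_recordings_without_embeddings(team_id)
    embed_batch_of_recordings(recordings, team_id)


# Route the task onto the new queue when enqueuing it:
generate_replay_embeddings.apply_async(kwargs={"team_id": 1}, queue="session_replay_embeddings")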
@@ -1,7 +1,7 @@
# serializer version: 1
# name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results
'''
/* user_id:127 celery:posthog.tasks.tasks.sync_insight_caching_state */
/* user_id:124 celery:posthog.tasks.tasks.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
Empty file.
192 changes: 192 additions & 0 deletions ee/session_recordings/ai/generate_embeddings.py
@@ -0,0 +1,192 @@
from openai import OpenAI

from typing import Dict, Any, List

from prometheus_client import Histogram, Counter

from posthog.models import Team

from posthog.session_recordings.queries.session_replay_events import SessionReplayEvents
from ee.session_recordings.ai.utils import (
SessionSummaryPromptData,
reduce_elements_chain,
simplify_window_id,
format_dates,
collapse_sequence_of_events,
)
from structlog import get_logger
from posthog.clickhouse.client import sync_execute
import datetime
import pytz

GENERATE_RECORDING_EMBEDDING_TIMING = Histogram(
"posthog_session_recordings_generate_recording_embedding",
"Time spent generating recording embeddings for a single session",
)
SESSION_SKIPPED_WHEN_GENERATING_EMBEDDINGS = Counter(
"posthog_session_recordings_skipped_when_generating_embeddings",
"Number of sessions skipped when generating embeddings",
)
SESSION_EMBEDDINGS_GENERATED = Counter(
"posthog_session_recordings_embeddings_generated",
"Number of session embeddings generated",
)
SESSION_EMBEDDINGS_WRITTEN_TO_CLICKHOUSE = Counter(
"posthog_session_recordings_embeddings_written_to_clickhouse",
"Number of session embeddings written to Clickhouse",
)

logger = get_logger(__name__)

# TODO move these to settings
BATCH_FLUSH_SIZE = 10
MIN_DURATION_INCLUDE_SECONDS = 120


def fetch_recordings_without_embeddings(team: Team | int, offset=0) -> List[str]:
if isinstance(team, int):
team = Team.objects.get(id=team)

query = """
WITH embedding_ids AS
(
SELECT
session_id
from
session_replay_embeddings
where
team_id = %(team_id)s
-- don't load all data for all time
and generation_timestamp > now() - INTERVAL 7 DAY
)
SELECT session_id
FROM
session_replay_events
WHERE
session_id NOT IN embedding_ids
AND team_id = %(team_id)s
-- must be a completed session
and min_first_timestamp < now() - INTERVAL 1 DAY
-- let's not load all data for all time
-- will definitely need to do something about this length of time
and min_first_timestamp > now() - INTERVAL 7 DAY
GROUP BY session_id
HAVING dateDiff('second', min(min_first_timestamp), max(max_last_timestamp)) > %(min_duration_include_seconds)s
LIMIT %(batch_flush_size)s
-- when running locally the offset is used for paging
-- when running in celery the offset is not used
OFFSET %(offset)s
"""

return [
x[0]
for x in sync_execute(
query,
{
"team_id": team.pk,
"batch_flush_size": BATCH_FLUSH_SIZE,
"offset": offset,
"min_duration_include_seconds": MIN_DURATION_INCLUDE_SECONDS,
},
)
]


def embed_batch_of_recordings(recordings: List[str], team: Team | int) -> None:
if isinstance(team, int):
team = Team.objects.get(id=team)

logger.info(f"processing {len(recordings)} recordings to embed for team {team.pk}")

    while len(recordings) > 0:
        # Slice off one batch at a time so the loop terminates once the list is exhausted
        batch, recordings = recordings[:BATCH_FLUSH_SIZE], recordings[BATCH_FLUSH_SIZE:]
        batched_embeddings = []
        for session_id in batch:
with GENERATE_RECORDING_EMBEDDING_TIMING.time():
embeddings = generate_recording_embeddings(session_id=session_id, team=team)

if embeddings:
SESSION_EMBEDDINGS_GENERATED.inc()
batched_embeddings.append(
{
"session_id": session_id,
"team_id": team.pk,
"embeddings": embeddings,
}
)

if len(batched_embeddings) > 0:
flush_embeddings_to_clickhouse(embeddings=batched_embeddings)


def flush_embeddings_to_clickhouse(embeddings: List[Dict[str, Any]]) -> None:
sync_execute("INSERT INTO session_replay_embeddings (session_id, team_id, embeddings) VALUES", embeddings)
SESSION_EMBEDDINGS_WRITTEN_TO_CLICKHOUSE.inc(len(embeddings))


def generate_recording_embeddings(session_id: str, team: Team | int) -> List[float] | None:
if isinstance(team, int):
team = Team.objects.get(id=team)

client = OpenAI()

session_metadata = SessionReplayEvents().get_metadata(session_id=str(session_id), team=team)
if not session_metadata:
logger.error(f"no session metadata found for session_id {session_id}")
SESSION_SKIPPED_WHEN_GENERATING_EMBEDDINGS.inc()
return None

session_events = SessionReplayEvents().get_events(
session_id=str(session_id),
team=team,
metadata=session_metadata,
events_to_ignore=[
"$feature_flag_called",
],
)

if not session_events or not session_events[0] or not session_events[1]:
logger.error(f"no events found for session_id {session_id}")
SESSION_SKIPPED_WHEN_GENERATING_EMBEDDINGS.inc()
return None

processed_sessions = collapse_sequence_of_events(
format_dates(
reduce_elements_chain(
simplify_window_id(SessionSummaryPromptData(columns=session_events[0], results=session_events[1]))
),
start=datetime.datetime(1970, 1, 1, tzinfo=pytz.UTC), # epoch timestamp
)
)

processed_sessions_index = processed_sessions.column_index("event")
current_url_index = processed_sessions.column_index("$current_url")
elements_chain_index = processed_sessions.column_index("elements_chain")

input = (
str(session_metadata)
+ "\n"
+ "\n".join(
compact_result(
event_name=result[processed_sessions_index] if processed_sessions_index is not None else "",
current_url=result[current_url_index] if current_url_index is not None else "",
elements_chain=result[elements_chain_index] if elements_chain_index is not None else "",
)
for result in processed_sessions.results
)
)

embeddings = (
client.embeddings.create(
input=input,
model="text-embedding-3-small",
)
.data[0]
.embedding
)

return embeddings


def compact_result(event_name: str, current_url: str, elements_chain: Dict[str, str] | str) -> str:
elements_string = elements_chain if isinstance(elements_chain, str) else ", ".join(str(e) for e in elements_chain)
return f"{event_name} {current_url} {elements_string}"
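For local experimentation, the OFFSET comment in fetch_recordings_without_embeddings suggests paging through candidate sessions. A minimal driver sketch under that assumption; the loop, the team ID, and the exit condition are illustrative and not part of the committed module (it also assumes OPENAI_API_KEY is set, since generate_recording_embeddings constructs an OpenAI() client):

from ee.session_recordings.ai.generate_embeddings import (
    BATCH_FLUSH_SIZE,
    embed_batch_of_recordings,
    fetch_recordings_without_embeddings,
)

team_id = 1  # illustrative team
offset = 0
while True:
    # Each call returns at most BATCH_FLUSH_SIZE session ids that still lack embeddings
    session_ids = fetch_recordings_without_embeddings(team_id, offset=offset)
    if not session_ids:
        break
    embed_batch_of_recordings(session_ids, team_id)
    offset += BATCH_FLUSH_SIZE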