Skip to content

Commit

Permalink
Merge branch 'surveys-results-updates' of https://github.com/PostHog/…
Browse files Browse the repository at this point in the history
…posthog into surveys-results-updates
  • Loading branch information
liyiy committed Feb 14, 2024
2 parents ac7ac6c + fdbb9f6 commit e97c46d
Show file tree
Hide file tree
Showing 150 changed files with 4,673 additions and 2,142 deletions.
1 change: 0 additions & 1 deletion .github/actions/run-backend-tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ runs:
run: echo "PYTEST_ARGS=--snapshot-update" >> $GITHUB_ENV # We can only update snapshots within the PostHog org

# Tests

- name: Run FOSS tests
if: ${{ inputs.segment == 'FOSS' }}
env:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/container-images-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ jobs:
message: |
{
"image_tag": "${{ steps.build.outputs.digest }}"
}
}
- name: Check for changes in plugins directory
id: check_changes_plugins
Expand Down
7 changes: 4 additions & 3 deletions .run/Celery.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
<env name="DEBUG" value="1" />
<env name="CLICKHOUSE_SECURE" value="False" />
<env name="KAFKA_HOSTS" value="localhost" />
<env name="DATABASE_URL" value="postgres://posthog:posthog@localhost:5432/posthog" />
<env name="DEBUG" value="1" />
<env name="KAFKA_HOSTS" value="localhost" />
<env name="PYTHONUNBUFFERED" value="1" />
<env name="SKIP_SERVICE_VERSION_REQUIREMENTS" value="1" />
<env name="REPLAY_EMBEDDINGS_ALLOWED_TEAM" value="1,2,3" />
</envs>
<option name="SDK_HOME" value="$PROJECT_DIR$/env/bin/python" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/env/bin" />
Expand Down
2 changes: 1 addition & 1 deletion bin/celery-queues.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
# Important: Add new queues to make Celery consume tasks from them.

# NOTE: Keep in sync with posthog/tasks/utils.py
CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery,usage_reports
CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery,usage_reports,session_replay_embeddings
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# serializer version: 1
# name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results
'''
/* user_id:127 celery:posthog.tasks.tasks.sync_insight_caching_state */
/* user_id:124 celery:posthog.tasks.tasks.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
Expand Down
Empty file.
192 changes: 192 additions & 0 deletions ee/session_recordings/ai/generate_embeddings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
from openai import OpenAI

from typing import Dict, Any, List

from prometheus_client import Histogram, Counter

from posthog.models import Team

from posthog.session_recordings.queries.session_replay_events import SessionReplayEvents
from ee.session_recordings.ai.utils import (
SessionSummaryPromptData,
reduce_elements_chain,
simplify_window_id,
format_dates,
collapse_sequence_of_events,
)
from structlog import get_logger
from posthog.clickhouse.client import sync_execute
import datetime
import pytz

GENERATE_RECORDING_EMBEDDING_TIMING = Histogram(
"posthog_session_recordings_generate_recording_embedding",
"Time spent generating recording embeddings for a single session",
)
SESSION_SKIPPED_WHEN_GENERATING_EMBEDDINGS = Counter(
"posthog_session_recordings_skipped_when_generating_embeddings",
"Number of sessions skipped when generating embeddings",
)
SESSION_EMBEDDINGS_GENERATED = Counter(
"posthog_session_recordings_embeddings_generated",
"Number of session embeddings generated",
)
SESSION_EMBEDDINGS_WRITTEN_TO_CLICKHOUSE = Counter(
"posthog_session_recordings_embeddings_written_to_clickhouse",
"Number of session embeddings written to Clickhouse",
)

logger = get_logger(__name__)

# TODO move these to settings
BATCH_FLUSH_SIZE = 10
MIN_DURATION_INCLUDE_SECONDS = 120


def fetch_recordings_without_embeddings(team: Team | int, offset=0) -> List[str]:
if isinstance(team, int):
team = Team.objects.get(id=team)

query = """
WITH embedding_ids AS
(
SELECT
session_id
from
session_replay_embeddings
where
team_id = %(team_id)s
-- don't load all data for all time
and generation_timestamp > now() - INTERVAL 7 DAY
)
SELECT session_id
FROM
session_replay_events
WHERE
session_id NOT IN embedding_ids
AND team_id = %(team_id)s
-- must be a completed session
and min_first_timestamp < now() - INTERVAL 1 DAY
-- let's not load all data for all time
-- will definitely need to do something about this length of time
and min_first_timestamp > now() - INTERVAL 7 DAY
GROUP BY session_id
HAVING dateDiff('second', min(min_first_timestamp), max(max_last_timestamp)) > %(min_duration_include_seconds)s
LIMIT %(batch_flush_size)s
-- when running locally the offset is used for paging
-- when running in celery the offset is not used
OFFSET %(offset)s
"""

return [
x[0]
for x in sync_execute(
query,
{
"team_id": team.pk,
"batch_flush_size": BATCH_FLUSH_SIZE,
"offset": offset,
"min_duration_include_seconds": MIN_DURATION_INCLUDE_SECONDS,
},
)
]


def embed_batch_of_recordings(recordings: List[str], team: Team | int) -> None:
if isinstance(team, int):
team = Team.objects.get(id=team)

logger.info(f"processing {len(recordings)} recordings to embed for team {team.pk}")

while len(recordings) > 0:
batched_embeddings = []
for session_id in recordings:
with GENERATE_RECORDING_EMBEDDING_TIMING.time():
embeddings = generate_recording_embeddings(session_id=session_id, team=team)

if embeddings:
SESSION_EMBEDDINGS_GENERATED.inc()
batched_embeddings.append(
{
"session_id": session_id,
"team_id": team.pk,
"embeddings": embeddings,
}
)

if len(batched_embeddings) > 0:
flush_embeddings_to_clickhouse(embeddings=batched_embeddings)


def flush_embeddings_to_clickhouse(embeddings: List[Dict[str, Any]]) -> None:
sync_execute("INSERT INTO session_replay_embeddings (session_id, team_id, embeddings) VALUES", embeddings)
SESSION_EMBEDDINGS_WRITTEN_TO_CLICKHOUSE.inc(len(embeddings))


def generate_recording_embeddings(session_id: str, team: Team | int) -> List[float] | None:
if isinstance(team, int):
team = Team.objects.get(id=team)

client = OpenAI()

session_metadata = SessionReplayEvents().get_metadata(session_id=str(session_id), team=team)
if not session_metadata:
logger.error(f"no session metadata found for session_id {session_id}")
SESSION_SKIPPED_WHEN_GENERATING_EMBEDDINGS.inc()
return None

session_events = SessionReplayEvents().get_events(
session_id=str(session_id),
team=team,
metadata=session_metadata,
events_to_ignore=[
"$feature_flag_called",
],
)

if not session_events or not session_events[0] or not session_events[1]:
logger.error(f"no events found for session_id {session_id}")
SESSION_SKIPPED_WHEN_GENERATING_EMBEDDINGS.inc()
return None

processed_sessions = collapse_sequence_of_events(
format_dates(
reduce_elements_chain(
simplify_window_id(SessionSummaryPromptData(columns=session_events[0], results=session_events[1]))
),
start=datetime.datetime(1970, 1, 1, tzinfo=pytz.UTC), # epoch timestamp
)
)

processed_sessions_index = processed_sessions.column_index("event")
current_url_index = processed_sessions.column_index("$current_url")
elements_chain_index = processed_sessions.column_index("elements_chain")

input = (
str(session_metadata)
+ "\n"
+ "\n".join(
compact_result(
event_name=result[processed_sessions_index] if processed_sessions_index is not None else "",
current_url=result[current_url_index] if current_url_index is not None else "",
elements_chain=result[elements_chain_index] if elements_chain_index is not None else "",
)
for result in processed_sessions.results
)
)

embeddings = (
client.embeddings.create(
input=input,
model="text-embedding-3-small",
)
.data[0]
.embedding
)

return embeddings


def compact_result(event_name: str, current_url: int, elements_chain: Dict[str, str] | str) -> str:
elements_string = elements_chain if isinstance(elements_chain, str) else ", ".join(str(e) for e in elements_chain)
return f"{event_name} {current_url} {elements_string}"
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,7 @@

from typing import List, Dict, Any

import openai

from prometheus_client import Histogram

from posthog.api.activity_log import ServerTimingsGathered
from posthog.models import User, Team
from posthog.models.element import chain_to_elements
from posthog.session_recordings.models.session_recording import SessionRecording

from posthog.session_recordings.queries.session_replay_events import SessionReplayEvents

from posthog.utils import get_instance_region

TOKENS_IN_PROMPT_HISTOGRAM = Histogram(
"posthog_session_summary_tokens_in_prompt_histogram",
"histogram of the number of tokens in the prompt used to generate a session summary",
buckets=[
0,
10,
50,
100,
500,
1000,
2000,
3000,
4000,
5000,
6000,
7000,
8000,
10000,
20000,
30000,
40000,
50000,
100000,
128000,
float("inf"),
],
)


@dataclasses.dataclass
Expand Down Expand Up @@ -243,102 +204,3 @@ def collapse_sequence_of_events(session_events: SessionSummaryPromptData) -> Ses
collapsed_results.append(result)

return dataclasses.replace(session_events, results=collapsed_results)


def summarize_recording(recording: SessionRecording, user: User, team: Team):
timer = ServerTimingsGathered()

with timer("get_metadata"):
session_metadata = SessionReplayEvents().get_metadata(session_id=str(recording.session_id), team=team)
if not session_metadata:
raise ValueError(f"no session metadata found for session_id {recording.session_id}")

with timer("get_events"):
session_events = SessionReplayEvents().get_events(
session_id=str(recording.session_id),
team=team,
metadata=session_metadata,
events_to_ignore=[
"$feature_flag_called",
],
)
if not session_events or not session_events[0] or not session_events[1]:
raise ValueError(f"no events found for session_id {recording.session_id}")

# convert session_metadata to a Dict from a TypedDict
# so that we can amend its values freely
session_metadata_dict = dict(session_metadata)

del session_metadata_dict["distinct_id"]
start_time = session_metadata["start_time"]
session_metadata_dict["start_time"] = start_time.isoformat()
session_metadata_dict["end_time"] = session_metadata["end_time"].isoformat()

with timer("generate_prompt"):
prompt_data = deduplicate_urls(
collapse_sequence_of_events(
format_dates(
reduce_elements_chain(
simplify_window_id(
SessionSummaryPromptData(columns=session_events[0], results=session_events[1])
)
),
start=start_time,
)
)
)

instance_region = get_instance_region() or "HOBBY"

with timer("openai_completion"):
result = openai.chat.completions.create(
# model="gpt-4-1106-preview", # allows 128k tokens
model="gpt-4", # allows 8k tokens
temperature=0.7,
messages=[
{
"role": "system",
"content": """
Session Replay is PostHog's tool to record visits to web sites and apps.
We also gather events that occur like mouse clicks and key presses.
You write two or three sentence concise and simple summaries of those sessions based on a prompt.
You are more likely to mention errors or things that look like business success such as checkout events.
You don't help with other knowledge.""",
},
{
"role": "user",
"content": f"""the session metadata I have is {session_metadata_dict}.
it gives an overview of activity and duration""",
},
{
"role": "user",
"content": f"""
URLs associated with the events can be found in this mapping {prompt_data.url_mapping}.
""",
},
{
"role": "user",
"content": f"""the session events I have are {prompt_data.results}.
with columns {prompt_data.columns}.
they give an idea of what happened and when,
if present the elements_chain extracted from the html can aid in understanding
but should not be directly used in your response""",
},
{
"role": "user",
"content": """
generate a two or three sentence summary of the session.
use as concise and simple language as is possible.
assume a reading age of around 12 years old.
generate no text other than the summary.""",
},
],
user=f"{instance_region}/{user.pk}", # allows 8k tokens
)

usage = result.usage.prompt_tokens if result.usage else None
if usage:
TOKENS_IN_PROMPT_HISTOGRAM.observe(usage)

content: str = result.choices[0].message.content or ""
return {"content": content, "timings": timer.get_all_timings()}
Empty file.
Loading

0 comments on commit e97c46d

Please sign in to comment.