Skip to content

Commit

Permalink
Merge branch 'master' into fix-zero-decimal-places
Browse files Browse the repository at this point in the history
  • Loading branch information
Twixes authored Mar 13, 2024
2 parents c07d22a + 3b7288f commit 9cc6fe3
Show file tree
Hide file tree
Showing 11 changed files with 498 additions and 104 deletions.
1 change: 1 addition & 0 deletions mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ posthog/queries/trends/test/test_person.py:0: error: "str" has no attribute "get
posthog/queries/trends/test/test_person.py:0: error: Invalid index type "int" for "HttpResponse"; expected type "str | bytes" [index]
posthog/queries/trends/test/test_person.py:0: error: "str" has no attribute "get" [attr-defined]
posthog/queries/trends/test/test_person.py:0: error: Invalid index type "int" for "HttpResponse"; expected type "str | bytes" [index]
posthog/management/commands/migrate_team.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "BatchExport") [assignment]
posthog/hogql/test/test_query.py:0: error: Argument 1 to "len" has incompatible type "list[Any] | None"; expected "Sized" [arg-type]
posthog/hogql/test/test_query.py:0: error: Value of type "list[QueryTiming] | None" is not indexable [index]
posthog/hogql/test/test_query.py:0: error: Value of type "list[QueryTiming] | None" is not indexable [index]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,18 @@ function isAnyMouseActivity(event: RRWebEvent) {
)
}

/**
 * Pulls a usable href out of an RRWeb event, wherever it lives.
 *
 * Meta events (type = 4) carry `event.data.href`, while custom events
 * (type = 5) _might_ carry `event.data.payload.href`. We don't care which
 * kind of event this is — only whether a non-blank href is present.
 */
function hrefFrom(event: RRWebEvent): string | undefined {
    for (const candidate of [event.data?.href, event.data?.payload?.href]) {
        const trimmed = candidate?.trim()
        if (trimmed) {
            return trimmed
        }
    }
    return undefined
}

export const createSessionReplayEvent = (
uuid: string,
team_id: number,
Expand Down Expand Up @@ -275,9 +287,12 @@ export const createSessionReplayEvent = (
keypressCount += 1
}
}
if (url === null && !!event.data?.href?.trim().length) {
url = event.data.href

const eventUrl: string | undefined = hrefFrom(event)
if (url === null && eventUrl) {
url = eventUrl
}

if (event.type === RRWebEventType.Plugin && event.data?.plugin === 'rrweb/console@1') {
const level = safeLevel(event.data.payload?.level)
if (level === 'info') {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,51 @@ describe('session recording process event', () => {
},
},
{
testDescription: 'first url detection',
testDescription: 'url can be detected in meta event',
snapshotData: {
events_summary: [
{
timestamp: 1682449093693,
type: 3,
data: {},
windowId: '1',
},
{
timestamp: 1682449093469,
type: 4,
data: {
href: 'http://127.0.0.1:8000/the/url',
},
windowId: '1',
},
],
},
expected: {
click_count: 0,
keypress_count: 0,
mouse_activity_count: 0,
first_url: 'http://127.0.0.1:8000/the/url',
first_timestamp: '2023-04-25 18:58:13.469',
last_timestamp: '2023-04-25 18:58:13.693',
active_milliseconds: 0, // no data.source, so no activity
console_log_count: 0,
console_warn_count: 0,
console_error_count: 0,
size: 163,
event_count: 2,
message_count: 1,
snapshot_source: 'web',
},
},
{
testDescription: 'first url detection takes the first url whether meta url or payload url',
snapshotData: {
events_summary: [
{
timestamp: 1682449093693,
type: 5,
data: {
payload: {
// matches: hrefFrom also reads an href nested inside a custom event's payload
href: 'http://127.0.0.1:8000/home',
},
},
Expand All @@ -209,7 +245,7 @@ describe('session recording process event', () => {
click_count: 0,
keypress_count: 0,
mouse_activity_count: 0,
first_url: 'http://127.0.0.1:8000/second/url',
first_url: 'http://127.0.0.1:8000/home',
first_timestamp: '2023-04-25 18:58:13.469',
last_timestamp: '2023-04-25 18:58:13.693',
active_milliseconds: 0, // no data.source, so no activity
Expand All @@ -222,6 +258,51 @@ describe('session recording process event', () => {
snapshot_source: 'web',
},
},
{
testDescription: 'first url detection can use payload url',
snapshotData: {
events_summary: [
{
timestamp: 1682449093469,
type: 5,
data: {
payload: {
// we don't read just any URL
'the-page-url': 'http://127.0.0.1:8000/second/url',
},
},
windowId: '1',
},
{
timestamp: 1682449093693,
type: 5,
data: {
payload: {
// matches href nested in payload
href: 'http://127.0.0.1:8000/my-spa',
},
},
windowId: '1',
},
],
},
expected: {
click_count: 0,
keypress_count: 0,
mouse_activity_count: 0,
first_url: 'http://127.0.0.1:8000/my-spa',
first_timestamp: '2023-04-25 18:58:13.469',
last_timestamp: '2023-04-25 18:58:13.693',
active_milliseconds: 0, // no data.source, so no activity
console_log_count: 0,
console_warn_count: 0,
console_error_count: 0,
size: 235,
event_count: 2,
message_count: 1,
snapshot_source: 'web',
},
},
{
testDescription: 'negative timestamps are not included when picking timestamps',
snapshotData: {
Expand Down
23 changes: 2 additions & 21 deletions posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@
BatchExportSchema,
BatchExportServiceError,
BatchExportServiceRPCError,
BatchExportServiceScheduleNotFound,
BatchExportWithNoEndNotAllowedError,
backfill_export,
batch_export_delete_schedule,
cancel_running_batch_export_backfill,
disable_and_delete_export,
pause_batch_export,
sync_batch_export,
unpause_batch_export,
Expand All @@ -43,7 +41,6 @@
from posthog.hogql.printer import prepare_ast_for_printing, print_prepared_ast
from posthog.models import (
BatchExport,
BatchExportBackfill,
BatchExportDestination,
BatchExportRun,
Team,
Expand Down Expand Up @@ -436,23 +433,7 @@ def perform_destroy(self, instance: BatchExport):
since we are deleting, we assume that we can recover from this state by finishing the delete operation by calling
instance.save().
"""
temporal = sync_connect()

instance.deleted = True

try:
batch_export_delete_schedule(temporal, str(instance.pk))
except BatchExportServiceScheduleNotFound as e:
logger.warning(
"The Schedule %s could not be deleted as it was not found",
e.schedule_id,
)

instance.save()

for backfill in BatchExportBackfill.objects.filter(batch_export=instance):
if backfill.status == BatchExportBackfill.Status.RUNNING:
cancel_running_batch_export_backfill(temporal, backfill.workflow_id)
disable_and_delete_export(instance)


class BatchExportOrganizationViewSet(BatchExportViewSet):
Expand Down
24 changes: 24 additions & 0 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dataclasses import asdict, dataclass, fields
from uuid import UUID

import structlog
import temporalio
from asgiref.sync import async_to_sync
from temporalio.client import (
Expand Down Expand Up @@ -32,6 +33,8 @@
update_schedule,
)

logger = structlog.get_logger(__name__)


class BatchExportField(typing.TypedDict):
"""A field to be queried from ClickHouse.
Expand Down Expand Up @@ -291,6 +294,27 @@ def unpause_batch_export(
backfill_export(temporal, batch_export_id, batch_export.team_id, start_at, end_at)


def disable_and_delete_export(instance: BatchExport):
    """Soft-delete a BatchExport and tear down its Temporal resources.

    The instance is flagged as deleted *before* we talk to Temporal, so that a
    failure mid-way still leaves us able to recover by finishing the delete
    (i.e. saving the instance again). A missing Schedule is tolerated — it may
    already have been removed — and any backfill still running for this export
    is cancelled afterwards.
    """
    temporal = sync_connect()

    instance.deleted = True

    try:
        batch_export_delete_schedule(temporal, str(instance.pk))
    except BatchExportServiceScheduleNotFound as not_found:
        # Nothing to delete on the Temporal side; log and carry on.
        logger.warning(
            "The Schedule %s could not be deleted as it was not found",
            not_found.schedule_id,
        )

    instance.save()

    running = BatchExportBackfill.Status.RUNNING
    for associated_backfill in BatchExportBackfill.objects.filter(batch_export=instance):
        if associated_backfill.status == running:
            cancel_running_batch_export_backfill(temporal, associated_backfill.workflow_id)


def batch_export_delete_schedule(temporal: Client, schedule_id: str) -> None:
"""Delete a Temporal Schedule."""
try:
Expand Down
46 changes: 32 additions & 14 deletions posthog/hogql/database/schema/channel_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,29 @@ def create_initial_domain_type(name: str):
def create_initial_channel_type(name: str):
return ExpressionField(
name=name,
expr=parse_expr(
"""
expr=create_channel_type_expr(
campaign=ast.Call(name="toString", args=[ast.Field(chain=["properties", "$initial_utm_campaign"])]),
medium=ast.Call(name="toString", args=[ast.Field(chain=["properties", "$initial_utm_medium"])]),
source=ast.Call(name="toString", args=[ast.Field(chain=["properties", "$initial_utm_source"])]),
referring_domain=ast.Call(
name="toString", args=[ast.Field(chain=["properties", "$initial_referring_domain"])]
),
gclid=ast.Call(name="toString", args=[ast.Field(chain=["properties", "$initial_gclid"])]),
gad_source=ast.Call(name="toString", args=[ast.Field(chain=["properties", "$initial_gad_source"])]),
),
)


def create_channel_type_expr(
campaign: ast.Expr,
medium: ast.Expr,
source: ast.Expr,
referring_domain: ast.Expr,
gclid: ast.Expr,
gad_source: ast.Expr,
) -> ast.Expr:
return parse_expr(
"""
multiIf(
match({campaign}, 'cross-network'),
'Cross Network',
Expand Down Expand Up @@ -99,16 +120,13 @@ def create_initial_channel_type(name: str):
)
)
)""",
start=None,
placeholders={
"campaign": ast.Call(name="toString", args=[ast.Field(chain=["properties", "$initial_utm_campaign"])]),
"medium": ast.Call(name="toString", args=[ast.Field(chain=["properties", "$initial_utm_medium"])]),
"source": ast.Call(name="toString", args=[ast.Field(chain=["properties", "$initial_utm_source"])]),
"referring_domain": ast.Call(
name="toString", args=[ast.Field(chain=["properties", "$initial_referring_domain"])]
),
"gclid": ast.Call(name="toString", args=[ast.Field(chain=["properties", "$initial_gclid"])]),
"gad_source": ast.Call(name="toString", args=[ast.Field(chain=["properties", "$initial_gad_source"])]),
},
),
start=None,
placeholders={
"campaign": campaign,
"medium": medium,
"source": source,
"referring_domain": referring_domain,
"gclid": gclid,
"gad_source": gad_source,
},
)
12 changes: 12 additions & 0 deletions posthog/hogql/database/schema/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
DatabaseField,
LazyTable,
)
from posthog.hogql.database.schema.channel_type import create_channel_type_expr
from posthog.schema import HogQLQueryModifiers


Expand Down Expand Up @@ -109,6 +110,16 @@ def select_from_sessions_table(requested_fields: Dict[str, List[str | int]]):
ast.Call(name="max", args=[ast.Field(chain=[table_name, "max_timestamp"])]),
],
),
"channel_type": create_channel_type_expr(
campaign=ast.Call(name="argMinMerge", args=[ast.Field(chain=[table_name, "initial_utm_campaign"])]),
medium=ast.Call(name="argMinMerge", args=[ast.Field(chain=[table_name, "initial_utm_medium"])]),
source=ast.Call(name="argMinMerge", args=[ast.Field(chain=[table_name, "initial_utm_source"])]),
referring_domain=ast.Call(
name="argMinMerge", args=[ast.Field(chain=[table_name, "initial_referring_domain"])]
),
gclid=ast.Call(name="argMinMerge", args=[ast.Field(chain=[table_name, "initial_gclid"])]),
gad_source=ast.Call(name="argMinMerge", args=[ast.Field(chain=[table_name, "initial_gad_source"])]),
),
}

select_fields: List[ast.Expr] = []
Expand All @@ -134,6 +145,7 @@ class SessionsTable(LazyTable):
fields: Dict[str, FieldOrTable] = {
**SESSIONS_COMMON_FIELDS,
"duration": IntegerDatabaseField(name="duration"),
"channel_type": StringDatabaseField(name="channel_type"),
}

def lazy_select(self, requested_fields: Dict[str, List[str | int]], modifiers: HogQLQueryModifiers):
Expand Down
32 changes: 30 additions & 2 deletions posthog/hogql/database/schema/test/test_sessions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from posthog.hogql import ast
from posthog.hogql.parser import parse_select
from posthog.hogql.query import execute_hogql_query
from posthog.test.base import (
Expand All @@ -9,16 +10,19 @@

class TestReferringDomainType(ClickhouseTestMixin, APIBaseTest):
def test_select_star(self):
session_id = "session_test_select_star"

_create_event(
event="$pageview",
team=self.team,
distinct_id="d1",
properties={"$current_url": "https://example.com", "$session_id": "s1"},
properties={"$current_url": "https://example.com", "$session_id": session_id},
)

response = execute_hogql_query(
parse_select(
"select * from sessions",
"select * from sessions where session_id = {session_id}",
placeholders={"session_id": ast.Constant(value=session_id)},
),
self.team,
)
Expand All @@ -27,3 +31,27 @@ def test_select_star(self):
len(response.results or []),
1,
)

def test_channel_type(self):
    """A session whose pageview carries gad_source=1 should be classified as Paid Search."""
    session_id = "session_test_channel_type"

    _create_event(
        distinct_id="d1",
        event="$pageview",
        team=self.team,
        properties={"gad_source": "1", "$session_id": session_id},
    )

    query = parse_select(
        "select channel_type from sessions where session_id = {session_id}",
        placeholders={"session_id": ast.Constant(value=session_id)},
    )
    response = execute_hogql_query(query, self.team)

    first_row = (response.results or [])[0]
    self.assertEqual(
        first_row[0],
        "Paid Search",
    )
Loading

0 comments on commit 9cc6fe3

Please sign in to comment.