Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add a debug table for clickhouse_events_json #23377

Merged
merged 23 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b101422
feat: Add a debug table for clickhouse_events_json
fuziontech Jul 1, 2024
24215ed
get_create_table => get_create_view
fuziontech Jul 1, 2024
11a752c
use DateTime for _timestamp for TTL
fuziontech Jul 2, 2024
ff61bee
fix type issue
fuziontech Jul 2, 2024
d257ad2
commas
fuziontech Jul 2, 2024
03d036c
not nullable
fuziontech Jul 2, 2024
e7c14d9
reorder tables
fuziontech Jul 2, 2024
ba0740f
update sequencing for migrations
fuziontech Jul 2, 2024
32cc9b0
Merge branch 'master' into debug_events
fuziontech Jul 2, 2024
dc5195b
Update UI snapshots for `chromium` (1)
github-actions[bot] Jul 2, 2024
afbbaac
Update query snapshots
github-actions[bot] Jul 2, 2024
f3a062c
Update UI snapshots for `chromium` (1)
github-actions[bot] Jul 2, 2024
dfc4269
Update UI snapshots for `chromium` (1)
github-actions[bot] Jul 2, 2024
e18bbb9
Update UI snapshots for `chromium` (2)
github-actions[bot] Jul 2, 2024
652fb4c
fix(multi project flags): remove flag id from URL when switching proj…
jurajmajerik Jul 2, 2024
d47c704
Update UI snapshots for `chromium` (2)
github-actions[bot] Jul 2, 2024
9337ecd
Update UI snapshots for `chromium` (1)
github-actions[bot] Jul 2, 2024
78a6afc
Update UI snapshots for `chromium` (2)
github-actions[bot] Jul 2, 2024
57052ad
remove version and don't skip bad messages
fuziontech Jul 2, 2024
a9a7c98
Add _error and _raw_message and set kafka_handle_error_mode=stream
fuziontech Jul 2, 2024
2d1b3c3
Merge branch 'master' into debug_events
fuziontech Jul 2, 2024
675388e
Update UI snapshots for `chromium` (1)
github-actions[bot] Jul 2, 2024
46cedae
typos
fuziontech Jul 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -655,26 +655,97 @@
# ---
# name: ClickhouseTestFunnelExperimentResults.test_experiment_flow_with_event_results_with_hogql_aggregation.1
'''
/* celery:posthog.tasks.tasks.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
WHERE timestamp > date_sub(DAY, 3, now())
AND timestamp < now()
GROUP BY team_id
ORDER BY age;
/* user_id:0 request:_snapshot_ */
SELECT array(replaceRegexpAll(JSONExtractRaw(properties, '$feature/a-b-test'), '^"|"$', '')) AS value,
count(*) as count
FROM events e
WHERE team_id = 2
AND event IN ['$pageleave', '$pageview']
AND toTimeZone(timestamp, 'UTC') >= toDateTime('2020-01-01 00:00:00', 'UTC')
AND toTimeZone(timestamp, 'UTC') <= toDateTime('2020-01-06 00:00:00', 'UTC')
GROUP BY value
ORDER BY count DESC, value DESC
LIMIT 26
OFFSET 0
'''
# ---
# name: ClickhouseTestFunnelExperimentResults.test_experiment_flow_with_event_results_with_hogql_aggregation.2
'''
/* celery:posthog.tasks.tasks.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
WHERE timestamp > date_sub(DAY, 3, now())
AND timestamp < now()
GROUP BY team_id
ORDER BY age;
/* user_id:0 request:_snapshot_ */
SELECT countIf(steps = 1) step_1,
countIf(steps = 2) step_2,
avg(step_1_average_conversion_time_inner) step_1_average_conversion_time,
median(step_1_median_conversion_time_inner) step_1_median_conversion_time,
prop
FROM
(SELECT aggregation_target,
steps,
avg(step_1_conversion_time) step_1_average_conversion_time_inner,
median(step_1_conversion_time) step_1_median_conversion_time_inner ,
prop
FROM
(SELECT aggregation_target,
steps,
max(steps) over (PARTITION BY aggregation_target,
prop) as max_steps,
step_1_conversion_time ,
prop
FROM
(SELECT *,
if(latest_0 <= latest_1
AND latest_1 <= latest_0 + INTERVAL 14 DAY, 2, 1) AS steps ,
if(isNotNull(latest_1)
AND latest_1 <= latest_0 + INTERVAL 14 DAY, dateDiff('second', toDateTime(latest_0), toDateTime(latest_1)), NULL) step_1_conversion_time,
prop
FROM
(SELECT aggregation_target, timestamp, step_0,
latest_0,
step_1,
min(latest_1) over (PARTITION by aggregation_target,
prop
ORDER BY timestamp DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) latest_1 ,
if(has([['test'], ['control'], ['']], prop), prop, ['Other']) as prop
FROM
(SELECT *,
if(notEmpty(arrayFilter(x -> notEmpty(x), prop_vals)), prop_vals, ['']) as prop
FROM
(SELECT e.timestamp as timestamp,
replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(properties, '$account_id'), ''), 'null'), '^"|"$', '') as aggregation_target,
pdi.person_id as person_id,
if(event = '$pageview', 1, 0) as step_0,
if(step_0 = 1, timestamp, null) as latest_0,
if(event = '$pageleave', 1, 0) as step_1,
if(step_1 = 1, timestamp, null) as latest_1,
array(replaceRegexpAll(JSONExtractRaw(properties, '$feature/a-b-test'), '^"|"$', '')) AS prop_basic,
prop_basic as prop,
argMinIf(prop, timestamp, notEmpty(arrayFilter(x -> notEmpty(x), prop))) over (PARTITION by aggregation_target) as prop_vals
FROM events e
INNER JOIN
(SELECT distinct_id,
argMax(person_id, version) as person_id
FROM person_distinct_id2
WHERE team_id = 2
AND distinct_id IN
(SELECT distinct_id
FROM events
WHERE team_id = 2
AND event IN ['$pageleave', '$pageview']
AND toTimeZone(timestamp, 'UTC') >= toDateTime('2020-01-01 00:00:00', 'UTC')
AND toTimeZone(timestamp, 'UTC') <= toDateTime('2020-01-06 00:00:00', 'UTC') )
GROUP BY distinct_id
HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id
WHERE team_id = 2
AND event IN ['$pageleave', '$pageview']
AND toTimeZone(timestamp, 'UTC') >= toDateTime('2020-01-01 00:00:00', 'UTC')
AND toTimeZone(timestamp, 'UTC') <= toDateTime('2020-01-06 00:00:00', 'UTC')
AND (step_0 = 1
OR step_1 = 1) )))
WHERE step_0 = 1 ))
GROUP BY aggregation_target,
steps,
prop
HAVING steps = max_steps)
GROUP BY prop
'''
# ---
# name: ClickhouseTestFunnelExperimentResults.test_experiment_flow_with_event_results_with_hogql_aggregation.3
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 3 additions & 2 deletions frontend/src/layout/navigation/ProjectSwitcher.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { upgradeModalLogic } from 'lib/components/UpgradeModal/upgradeModalLogic
import { LemonButton } from 'lib/lemon-ui/LemonButton'
import { LemonDivider } from 'lib/lemon-ui/LemonDivider'
import { LemonSnack } from 'lib/lemon-ui/LemonSnack/LemonSnack'
import { removeProjectIdIfPresent } from 'lib/utils/router-utils'
import { removeFlagIdIfPresent, removeProjectIdIfPresent } from 'lib/utils/router-utils'
import { useMemo } from 'react'
import { organizationLogic } from 'scenes/organizationLogic'
import { isAuthenticatedTeam, teamLogic } from 'scenes/teamLogic'
Expand Down Expand Up @@ -91,7 +91,8 @@ function OtherProjectButton({ team }: { team: TeamBasicType; onClickInside?: ()
// project switch lands on something like insight/abc that won't exist.
// On the other hand, if we remove the ID, it could be that someone opens a page, realizes they're in the wrong project
// and after switching is on a different page than before.
const route = removeProjectIdIfPresent(location.pathname)
let route = removeProjectIdIfPresent(location.pathname)
route = removeFlagIdIfPresent(route)
return urls.project(team.id, route)
}, [location.pathname])

Expand Down
7 changes: 7 additions & 0 deletions frontend/src/lib/utils/router-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ export function removeProjectIdIfPresent(path: string): string {
return path
}

export function removeFlagIdIfPresent(path: string): string {
if (path.match(/^\/feature_flags\/\d+/)) {
return path.replace(/(feature_flags).*$/, '$1/')
}
return path
}

export function addProjectIdIfMissing(path: string, teamId?: TeamType['id']): string {
return isPathWithoutProjectId(removeProjectIdIfPresent(path))
? removeProjectIdIfPresent(path)
Expand Down
20 changes: 20 additions & 0 deletions posthog/clickhouse/migrations/0067_event_kafka_debug_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.kafka_client.topics import KAFKA_EVENTS_JSON
from posthog.models.kafka_debug.sql import (
KafkaDebugKafkaTable,
KafkaDebugTable,
KafkaDebugMaterializedView,
)
from posthog.settings.data_stores import KAFKA_HOSTS


debug_table = KafkaDebugTable(topic=KAFKA_EVENTS_JSON)
kafka_table = KafkaDebugKafkaTable(brokers=KAFKA_HOSTS, topic=KAFKA_EVENTS_JSON)
materialized_view = KafkaDebugMaterializedView(to_table=debug_table, from_table=kafka_table)


operations = [
run_sql_with_exceptions(debug_table.get_create_table_sql()),
run_sql_with_exceptions(kafka_table.get_create_table_sql()),
run_sql_with_exceptions(materialized_view.get_create_view_sql()),
]
Empty file.
86 changes: 86 additions & 0 deletions posthog/models/kafka_debug/sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from dataclasses import dataclass
from posthog.clickhouse.kafka_engine import kafka_engine
from posthog.clickhouse.table_engines import MergeTreeEngine
from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE


@dataclass
class KafkaDebugKafkaTable:
brokers: list[str]
topic: str
consumer_group: str = "debug"

@property
def table_name(self) -> str:
return f"kafka_{self.topic}_debug"

def get_create_table_sql(self) -> str:
return f"""
CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
(
payload String
)
ENGINE={kafka_engine(kafka_host=",".join(self.brokers), topic=self.topic, group=self.consumer_group)}
SETTINGS input_format_values_interpret_expressions=0, kafka_handle_error_mode='stream'
"""

def get_drop_table_sql(self) -> str:
return f"""
DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
"""


@dataclass
class KafkaDebugTable:
topic: str

@property
def table_name(self) -> str:
return f"{self.topic}_debug"

def get_create_table_sql(self) -> str:
engine = MergeTreeEngine(self.table_name)
return f"""
CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' (
payload String,
_timestamp DateTime,
_timestamp_ms Nullable(DateTime64(3)),
_partition UInt64,
_offset UInt64,
_error String,
_raw_message String
)
ENGINE = {engine}
PARTITION BY toStartOfHour(_timestamp)
ORDER BY (_partition, _offset)
TTL _timestamp + INTERVAL 14 DAY
"""


@dataclass
class KafkaDebugMaterializedView:
to_table: KafkaDebugTable
from_table: KafkaDebugKafkaTable

@property
def view_name(self) -> str:
return f"{self.to_table.table_name}_mv"

def get_create_view_sql(self) -> str:
return f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.view_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' TO {self.to_table.table_name}
AS SELECT
payload,
_timestamp,
_timestamp_ms,
_partition,
_offset,
_error String,
_raw_message String
FROM `{CLICKHOUSE_DATABASE}`.{self.from_table.table_name}
"""

def get_drop_view_sql(self) -> str:
return f"""
DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.view_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
"""