chore: add app_metrics2 table #22859

Merged 6 commits on Jul 16, 2024
19 changes: 19 additions & 0 deletions posthog/clickhouse/migrations/0072_app_metrics2.py
@@ -0,0 +1,19 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.models.app_metrics2.sql import (
    APP_METRICS2_DATA_TABLE_SQL,
    APP_METRICS2_MV_TABLE_SQL,
    DISTRIBUTED_APP_METRICS2_TABLE_SQL,
    KAFKA_APP_METRICS2_TABLE_SQL,
)
from posthog.settings import CLICKHOUSE_CLUSTER

operations = [
    run_sql_with_exceptions(f"DROP TABLE IF EXISTS app_metrics2_mv ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
    run_sql_with_exceptions(f"DROP TABLE IF EXISTS kafka_app_metrics2 ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
    run_sql_with_exceptions(f"DROP TABLE IF EXISTS app_metrics2 ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
    run_sql_with_exceptions(f"DROP TABLE IF EXISTS sharded_app_metrics2 ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
    run_sql_with_exceptions(APP_METRICS2_DATA_TABLE_SQL()),
    run_sql_with_exceptions(DISTRIBUTED_APP_METRICS2_TABLE_SQL()),
    run_sql_with_exceptions(KAFKA_APP_METRICS2_TABLE_SQL()),
    run_sql_with_exceptions(APP_METRICS2_MV_TABLE_SQL()),
]
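
The drops run before the creates so the migration can be re-applied cleanly, and the creates follow dependency order: the sharded data table first, then the distributed table, the Kafka source table, and finally the materialized view that wires them together. As a rough sketch of what these operations execute, assuming the clickhouse_driver client (run_sql_with_exceptions itself is PostHog-internal tooling):

# Illustrative only: run equivalent DDL directly against ClickHouse.
# Hypothetical host value; the real migration goes through
# run_sql_with_exceptions rather than a raw client.
from clickhouse_driver import Client

client = Client(host="localhost")

statements = [
    "DROP TABLE IF EXISTS app_metrics2_mv ON CLUSTER 'posthog'",
    "DROP TABLE IF EXISTS kafka_app_metrics2 ON CLUSTER 'posthog'",
    "DROP TABLE IF EXISTS app_metrics2 ON CLUSTER 'posthog'",
    "DROP TABLE IF EXISTS sharded_app_metrics2 ON CLUSTER 'posthog'",
    # ...followed by the four CREATE statements in the order above
]

for sql in statements:
    client.execute(sql)  # each statement raises on failure, halting the run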
10 changes: 10 additions & 0 deletions posthog/clickhouse/schema.py
@@ -30,6 +30,12 @@
    KAFKA_APP_METRICS_TABLE_SQL,
    DISTRIBUTED_APP_METRICS_TABLE_SQL,
)
from posthog.models.app_metrics2.sql import (
    APP_METRICS2_DATA_TABLE_SQL,
    APP_METRICS2_MV_TABLE_SQL,
    KAFKA_APP_METRICS2_TABLE_SQL,
    DISTRIBUTED_APP_METRICS2_TABLE_SQL,
)
from posthog.models.channel_type.sql import (
    CHANNEL_DEFINITION_TABLE_SQL,
    CHANNEL_DEFINITION_DATA_SQL,
@@ -127,6 +133,7 @@
    SESSION_RECORDING_EVENTS_TABLE_SQL,
    INGESTION_WARNINGS_DATA_TABLE_SQL,
    APP_METRICS_DATA_TABLE_SQL,
    APP_METRICS2_DATA_TABLE_SQL,
    PERFORMANCE_EVENTS_TABLE_SQL,
    SESSION_REPLAY_EVENTS_TABLE_SQL,
    CHANNEL_DEFINITION_TABLE_SQL,
@@ -141,6 +148,7 @@
    DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL,
    DISTRIBUTED_INGESTION_WARNINGS_TABLE_SQL,
    DISTRIBUTED_APP_METRICS_TABLE_SQL,
    DISTRIBUTED_APP_METRICS2_TABLE_SQL,
    WRITABLE_PERFORMANCE_EVENTS_TABLE_SQL,
    DISTRIBUTED_PERFORMANCE_EVENTS_TABLE_SQL,
    DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
@@ -165,6 +173,7 @@
    KAFKA_SESSION_RECORDING_EVENTS_TABLE_SQL,
    KAFKA_INGESTION_WARNINGS_TABLE_SQL,
    KAFKA_APP_METRICS_TABLE_SQL,
    KAFKA_APP_METRICS2_TABLE_SQL,
    KAFKA_PERFORMANCE_EVENTS_TABLE_SQL,
    KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
    KAFKA_HEATMAPS_TABLE_SQL,
@@ -183,6 +192,7 @@
    SESSION_RECORDING_EVENTS_TABLE_MV_SQL,
    INGESTION_WARNINGS_MV_TABLE_SQL,
    APP_METRICS_MV_TABLE_SQL,
    APP_METRICS2_MV_TABLE_SQL,
    PERFORMANCE_EVENTS_TABLE_MV_SQL,
    SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
    SESSIONS_TABLE_MV_SQL,
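schema.py registers each new DDL callable alongside the existing app_metrics entries, so the snapshot tests below pick them up automatically. A minimal sketch of the registry pattern (the tuple name here is an assumption, based only on the imports above):

# Sketch: entries are zero-arg callables returning DDL strings, so settings
# such as CLICKHOUSE_CLUSTER are read at render time, not import time.
APP_METRICS2_QUERIES = (
    APP_METRICS2_DATA_TABLE_SQL,         # sharded_app_metrics2
    DISTRIBUTED_APP_METRICS2_TABLE_SQL,  # app_metrics2
    KAFKA_APP_METRICS2_TABLE_SQL,        # kafka_app_metrics2
    APP_METRICS2_MV_TABLE_SQL,           # app_metrics2_mv
)

for render in APP_METRICS2_QUERIES:
    print(render())  # emits the CREATE statements captured in the snapshots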
153 changes: 153 additions & 0 deletions posthog/clickhouse/test/__snapshots__/test_schema.ambr
@@ -35,6 +35,24 @@

'''
# ---
# name: test_create_kafka_table_with_different_kafka_host[kafka_app_metrics2]
'''

CREATE TABLE IF NOT EXISTS kafka_app_metrics2 ON CLUSTER 'posthog'
(
team_id Int64,
timestamp DateTime64(6, 'UTC'),
app_source LowCardinality(String),
app_source_id String,
instance_id String,
metric_kind String,
metric_name String,
count Int64
)
ENGINE=Kafka('test.kafka.broker:9092', 'clickhouse_app_metrics2_test', 'group1', 'JSONEachRow')

'''
# ---
# name: test_create_kafka_table_with_different_kafka_host[kafka_app_metrics]
'''

@@ -416,6 +434,55 @@

'''
# ---
# name: test_create_table_query[app_metrics2]
'''

CREATE TABLE IF NOT EXISTS app_metrics2 ON CLUSTER 'posthog'
(
team_id Int64,
timestamp DateTime64(6, 'UTC'),
-- The name of the service or product that generated the metrics.
-- Examples: plugins, hog
app_source LowCardinality(String),
-- An id for the app source.
-- Set app_source to avoid collision with ids from other app sources if the id generation is not safe.
-- Examples: A plugin id, a hog application id
app_source_id String,
-- A secondary id e.g. for the instance of app_source that generated this metric.
-- This may be omitted if app_source is a singleton.
-- Examples: A plugin config id, a hog application config id
instance_id String,
metric_kind LowCardinality(String),
metric_name LowCardinality(String),
count SimpleAggregateFunction(sum, Int64)

, _timestamp DateTime
, _offset UInt64
, _partition UInt64

)
ENGINE=Distributed('posthog', 'posthog_test', 'sharded_app_metrics2', rand())

'''
# ---
# name: test_create_table_query[app_metrics2_mv]
'''

CREATE MATERIALIZED VIEW IF NOT EXISTS app_metrics2_mv ON CLUSTER 'posthog'
TO posthog_test.sharded_app_metrics2
AS SELECT
team_id,
timestamp,
app_source,
app_source_id,
instance_id,
metric_kind,
metric_name,
count
FROM posthog_test.kafka_app_metrics2

'''
# ---
# name: test_create_table_query[app_metrics]
'''

@@ -767,6 +834,24 @@

'''
# ---
# name: test_create_table_query[kafka_app_metrics2]
'''

CREATE TABLE IF NOT EXISTS kafka_app_metrics2 ON CLUSTER 'posthog'
(
team_id Int64,
timestamp DateTime64(6, 'UTC'),
app_source LowCardinality(String),
app_source_id String,
instance_id String,
metric_kind String,
metric_name String,
count Int64
)
ENGINE=Kafka('kafka:9092', 'clickhouse_app_metrics2_test', 'group1', 'JSONEachRow')

'''
# ---
# name: test_create_table_query[kafka_app_metrics]
'''

@@ -1989,6 +2074,40 @@
GROUP BY `$session_id`, team_id


'''
# ---
# name: test_create_table_query[sharded_app_metrics2]
'''

CREATE TABLE IF NOT EXISTS sharded_app_metrics2 ON CLUSTER 'posthog'
(
team_id Int64,
timestamp DateTime64(6, 'UTC'),
-- The name of the service or product that generated the metrics.
-- Examples: plugins, hog
app_source LowCardinality(String),
-- An id for the app source.
-- Set app_source to avoid collision with ids from other app sources if the id generation is not safe.
-- Examples: A plugin id, a hog application id
app_source_id String,
-- A secondary id e.g. for the instance of app_source that generated this metric.
-- This may be omitted if app_source is a singleton.
-- Examples: A plugin config id, a hog application config id
instance_id String,
metric_kind LowCardinality(String),
metric_name LowCardinality(String),
count SimpleAggregateFunction(sum, Int64)

, _timestamp DateTime
, _offset UInt64
, _partition UInt64

)
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.sharded_app_metrics2', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (team_id, app_source, app_source_id, instance_id, toStartOfHour(timestamp), metric_kind, metric_name)


'''
# ---
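Since count is declared as SimpleAggregateFunction(sum, Int64) on an AggregatingMergeTree, rows sharing the ORDER BY key are summed away during background merges; reads should still aggregate explicitly because merges are asynchronous. An illustrative read-path query against the distributed table (not part of this PR, written in the SQL-in-Python style of sql.py):

# Illustrative only: sum explicitly rather than relying on merge state.
HOURLY_APP_METRICS2_SQL = """
SELECT
    team_id,
    toStartOfHour(timestamp) AS hour,
    metric_kind,
    metric_name,
    sum(count) AS count
FROM app_metrics2
WHERE team_id = %(team_id)s AND app_source = %(app_source)s
GROUP BY team_id, hour, metric_kind, metric_name
ORDER BY hour
"""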
# name: test_create_table_query[sharded_app_metrics]
@@ -3049,6 +3168,40 @@

SETTINGS index_granularity=512

'''
# ---
# name: test_create_table_query_replicated_and_storage[sharded_app_metrics2]
'''

CREATE TABLE IF NOT EXISTS sharded_app_metrics2 ON CLUSTER 'posthog'
(
team_id Int64,
timestamp DateTime64(6, 'UTC'),
-- The name of the service or product that generated the metrics.
-- Examples: plugins, hog
app_source LowCardinality(String),
-- An id for the app source.
-- Set app_source to avoid collision with ids from other app sources if the id generation is not safe.
-- Examples: A plugin id, a hog application id
app_source_id String,
-- A secondary id e.g. for the instance of app_source that generated this metric.
-- This may be omitted if app_source is a singleton.
-- Examples: A plugin config id, a hog application config id
instance_id String,
metric_kind LowCardinality(String),
metric_name LowCardinality(String),
count SimpleAggregateFunction(sum, Int64)

, _timestamp DateTime
, _offset UInt64
, _partition UInt64

)
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.sharded_app_metrics2', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (team_id, app_source, app_source_id, instance_id, toStartOfHour(timestamp), metric_kind, metric_name)


'''
# ---
# name: test_create_table_query_replicated_and_storage[sharded_app_metrics]
1 change: 1 addition & 0 deletions posthog/kafka_client/topics.py
@@ -16,6 +16,7 @@
KAFKA_GROUPS = f"{KAFKA_PREFIX}clickhouse_groups{SUFFIX}"
KAFKA_INGESTION_WARNINGS = f"{KAFKA_PREFIX}clickhouse_ingestion_warnings{SUFFIX}"
KAFKA_APP_METRICS = f"{KAFKA_PREFIX}clickhouse_app_metrics{SUFFIX}"
KAFKA_APP_METRICS2 = f"{KAFKA_PREFIX}clickhouse_app_metrics2{SUFFIX}"
KAFKA_METRICS_TIME_TO_SEE_DATA = f"{KAFKA_PREFIX}clickhouse_metrics_time_to_see_data{SUFFIX}"
KAFKA_PERSON_OVERRIDE = f"{KAFKA_PREFIX}clickhouse_person_override{SUFFIX}"
KAFKA_LOG_ENTRIES = f"{KAFKA_PREFIX}log_entries{SUFFIX}"
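The new topic constant follows the existing prefix/suffix convention; with the test suffix seen in the snapshots it resolves as below (the prefix and suffix values are assumptions, since those settings are not part of this diff):

# Hypothetical values: KAFKA_PREFIX and SUFFIX come from settings not shown here.
KAFKA_PREFIX = ""
SUFFIX = "_test"
KAFKA_APP_METRICS2 = f"{KAFKA_PREFIX}clickhouse_app_metrics2{SUFFIX}"
assert KAFKA_APP_METRICS2 == "clickhouse_app_metrics2_test"  # matches the snapshot Kafka engine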
Empty file: posthog/models/app_metrics2/__init__.py (package marker for the new module).
131 changes: 131 additions & 0 deletions posthog/models/app_metrics2/sql.py
@@ -0,0 +1,131 @@
from django.conf import settings

from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS_WITH_PARTITION, kafka_engine, ttl_period
from posthog.clickhouse.table_engines import (
    AggregatingMergeTree,
    Distributed,
    ReplicationScheme,
)
from posthog.kafka_client.topics import KAFKA_APP_METRICS2

APP_METRICS2_TTL_DAYS = 90

SHARDED_APP_METRICS2_TABLE_ENGINE = lambda: AggregatingMergeTree(
    "sharded_app_metrics2", replication_scheme=ReplicationScheme.SHARDED
)

BASE_APP_METRICS2_COLUMNS = """
team_id Int64,
timestamp DateTime64(6, 'UTC'),
-- The name of the service or product that generated the metrics.
-- Examples: plugins, hog
app_source LowCardinality(String),
-- An id for the app source.
-- Set app_source to avoid collision with ids from other app sources if the id generation is not safe.
-- Examples: A plugin id, a hog application id
app_source_id String,
-- A secondary id e.g. for the instance of app_source that generated this metric.
-- This may be omitted if app_source is a singleton.
-- Examples: A plugin config id, a hog application config id
instance_id String,
metric_kind LowCardinality(String),
metric_name LowCardinality(String),
count SimpleAggregateFunction(sum, Int64)
""".strip()

# NOTE: We have producers that take advantage of the timestamp being truncated to the hour,
# i.e. they batch up metrics and send them pre-truncated. If we ever change this truncation
# we need to revisit producers (e.g. the webhook service currently known as rusty-hook or pgqueue).
APP_METRICS2_TIMESTAMP_TRUNCATION = "toStartOfHour(timestamp)"

APP_METRICS2_DATA_TABLE_SQL = (
lambda: f"""
CREATE TABLE IF NOT EXISTS sharded_app_metrics2 ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'
(
{BASE_APP_METRICS2_COLUMNS}
{KAFKA_COLUMNS_WITH_PARTITION}
)
ENGINE = {SHARDED_APP_METRICS2_TABLE_ENGINE()}
PARTITION BY toYYYYMM(timestamp)
ORDER BY (team_id, app_source, app_source_id, instance_id, {APP_METRICS2_TIMESTAMP_TRUNCATION}, metric_kind, metric_name)
{ttl_period("timestamp", APP_METRICS2_TTL_DAYS, unit="DAY")}
"""
)

DISTRIBUTED_APP_METRICS2_TABLE_SQL = (
lambda: f"""
CREATE TABLE IF NOT EXISTS app_metrics2 ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'
(
{BASE_APP_METRICS2_COLUMNS}
{KAFKA_COLUMNS_WITH_PARTITION}
)
ENGINE={Distributed(data_table="sharded_app_metrics2", sharding_key="rand()")}
"""
)

KAFKA_APP_METRICS2_TABLE_SQL = (
lambda: f"""
CREATE TABLE IF NOT EXISTS kafka_app_metrics2 ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'
(
team_id Int64,
timestamp DateTime64(6, 'UTC'),
app_source LowCardinality(String),
app_source_id String,
instance_id String,
metric_kind String,
metric_name String,
count Int64
)
ENGINE={kafka_engine(topic=KAFKA_APP_METRICS2)}
"""
)

APP_METRICS2_MV_TABLE_SQL = (
lambda: f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS app_metrics2_mv ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'
TO {settings.CLICKHOUSE_DATABASE}.sharded_app_metrics2
AS SELECT
team_id,
timestamp,
app_source,
app_source_id,
instance_id,
metric_kind,
metric_name,
count
FROM {settings.CLICKHOUSE_DATABASE}.kafka_app_metrics2
"""
)

TRUNCATE_APP_METRICS2_TABLE_SQL = "TRUNCATE TABLE IF EXISTS sharded_app_metrics2"

INSERT_APP_METRICS2_SQL = """
INSERT INTO sharded_app_metrics2 (
team_id,
timestamp,
app_source,
app_source_id,
instance_id,
metric_kind,
metric_name,
count,
_timestamp,
_offset,
_partition
)
SELECT
%(team_id)s,
%(timestamp)s,
%(app_source)s,
%(app_source_id)s,
%(instance_id)s,
%(metric_kind)s,
%(metric_name)s,
%(count)s,
now(),
0,
0
"""