Skip to content

Commit

Permalink
chore: add app_metrics2 table (#22859)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
bretthoerner and github-actions[bot] authored Jul 16, 2024
1 parent efcb81c commit f787067
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 0 deletions.
19 changes: 19 additions & 0 deletions posthog/clickhouse/migrations/0072_app_metrics2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.models.app_metrics2.sql import (
APP_METRICS2_DATA_TABLE_SQL,
APP_METRICS2_MV_TABLE_SQL,
DISTRIBUTED_APP_METRICS2_TABLE_SQL,
KAFKA_APP_METRICS2_TABLE_SQL,
)
from posthog.settings import CLICKHOUSE_CLUSTER

operations = [
run_sql_with_exceptions(f"DROP TABLE IF EXISTS app_metrics2_mv ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
run_sql_with_exceptions(f"DROP TABLE IF EXISTS kafka_app_metrics2 ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
run_sql_with_exceptions(f"DROP TABLE IF EXISTS app_metrics2 ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
run_sql_with_exceptions(f"DROP TABLE IF EXISTS sharded_app_metrics2 ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
run_sql_with_exceptions(APP_METRICS2_DATA_TABLE_SQL()),
run_sql_with_exceptions(DISTRIBUTED_APP_METRICS2_TABLE_SQL()),
run_sql_with_exceptions(KAFKA_APP_METRICS2_TABLE_SQL()),
run_sql_with_exceptions(APP_METRICS2_MV_TABLE_SQL()),
]
10 changes: 10 additions & 0 deletions posthog/clickhouse/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
KAFKA_APP_METRICS_TABLE_SQL,
DISTRIBUTED_APP_METRICS_TABLE_SQL,
)
from posthog.models.app_metrics2.sql import (
APP_METRICS2_DATA_TABLE_SQL,
APP_METRICS2_MV_TABLE_SQL,
KAFKA_APP_METRICS2_TABLE_SQL,
DISTRIBUTED_APP_METRICS2_TABLE_SQL,
)
from posthog.models.channel_type.sql import (
CHANNEL_DEFINITION_TABLE_SQL,
CHANNEL_DEFINITION_DATA_SQL,
Expand Down Expand Up @@ -127,6 +133,7 @@
SESSION_RECORDING_EVENTS_TABLE_SQL,
INGESTION_WARNINGS_DATA_TABLE_SQL,
APP_METRICS_DATA_TABLE_SQL,
APP_METRICS2_DATA_TABLE_SQL,
PERFORMANCE_EVENTS_TABLE_SQL,
SESSION_REPLAY_EVENTS_TABLE_SQL,
CHANNEL_DEFINITION_TABLE_SQL,
Expand All @@ -141,6 +148,7 @@
DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL,
DISTRIBUTED_INGESTION_WARNINGS_TABLE_SQL,
DISTRIBUTED_APP_METRICS_TABLE_SQL,
DISTRIBUTED_APP_METRICS2_TABLE_SQL,
WRITABLE_PERFORMANCE_EVENTS_TABLE_SQL,
DISTRIBUTED_PERFORMANCE_EVENTS_TABLE_SQL,
DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL,
Expand All @@ -165,6 +173,7 @@
KAFKA_SESSION_RECORDING_EVENTS_TABLE_SQL,
KAFKA_INGESTION_WARNINGS_TABLE_SQL,
KAFKA_APP_METRICS_TABLE_SQL,
KAFKA_APP_METRICS2_TABLE_SQL,
KAFKA_PERFORMANCE_EVENTS_TABLE_SQL,
KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL,
KAFKA_HEATMAPS_TABLE_SQL,
Expand All @@ -183,6 +192,7 @@
SESSION_RECORDING_EVENTS_TABLE_MV_SQL,
INGESTION_WARNINGS_MV_TABLE_SQL,
APP_METRICS_MV_TABLE_SQL,
APP_METRICS2_MV_TABLE_SQL,
PERFORMANCE_EVENTS_TABLE_MV_SQL,
SESSION_REPLAY_EVENTS_TABLE_MV_SQL,
SESSIONS_TABLE_MV_SQL,
Expand Down
153 changes: 153 additions & 0 deletions posthog/clickhouse/test/__snapshots__/test_schema.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@

'''
# ---
# name: test_create_kafka_table_with_different_kafka_host[kafka_app_metrics2]
'''

CREATE TABLE IF NOT EXISTS kafka_app_metrics2 ON CLUSTER 'posthog'
(
team_id Int64,
timestamp DateTime64(6, 'UTC'),
app_source LowCardinality(String),
app_source_id String,
instance_id String,
metric_kind String,
metric_name String,
count Int64
)
ENGINE=Kafka('test.kafka.broker:9092', 'clickhouse_app_metrics2_test', 'group1', 'JSONEachRow')

'''
# ---
# name: test_create_kafka_table_with_different_kafka_host[kafka_app_metrics]
'''

Expand Down Expand Up @@ -416,6 +434,55 @@

'''
# ---
# name: test_create_table_query[app_metrics2]
'''

CREATE TABLE IF NOT EXISTS app_metrics2 ON CLUSTER 'posthog'
(
team_id Int64,
timestamp DateTime64(6, 'UTC'),
-- The name of the service or product that generated the metrics.
-- Examples: plugins, hog
app_source LowCardinality(String),
-- An id for the app source.
-- Set app_source to avoid collision with ids from other app sources if the id generation is not safe.
-- Examples: A plugin id, a hog application id
app_source_id String,
-- A secondary id e.g. for the instance of app_source that generated this metric.
-- This may be ommitted if app_source is a singleton.
-- Examples: A plugin config id, a hog application config id
instance_id String,
metric_kind LowCardinality(String),
metric_name LowCardinality(String),
count SimpleAggregateFunction(sum, Int64)

, _timestamp DateTime
, _offset UInt64
, _partition UInt64

)
ENGINE=Distributed('posthog', 'posthog_test', 'sharded_app_metrics2', rand())

'''
# ---
# name: test_create_table_query[app_metrics2_mv]
'''

CREATE MATERIALIZED VIEW IF NOT EXISTS app_metrics2_mv ON CLUSTER 'posthog'
TO posthog_test.sharded_app_metrics2
AS SELECT
team_id,
timestamp,
app_source,
app_source_id,
instance_id,
metric_kind,
metric_name,
count
FROM posthog_test.kafka_app_metrics2

'''
# ---
# name: test_create_table_query[app_metrics]
'''

Expand Down Expand Up @@ -767,6 +834,24 @@

'''
# ---
# name: test_create_table_query[kafka_app_metrics2]
'''

CREATE TABLE IF NOT EXISTS kafka_app_metrics2 ON CLUSTER 'posthog'
(
team_id Int64,
timestamp DateTime64(6, 'UTC'),
app_source LowCardinality(String),
app_source_id String,
instance_id String,
metric_kind String,
metric_name String,
count Int64
)
ENGINE=Kafka('kafka:9092', 'clickhouse_app_metrics2_test', 'group1', 'JSONEachRow')

'''
# ---
# name: test_create_table_query[kafka_app_metrics]
'''

Expand Down Expand Up @@ -1989,6 +2074,40 @@
GROUP BY `$session_id`, team_id


'''
# ---
# name: test_create_table_query[sharded_app_metrics2]
'''

CREATE TABLE IF NOT EXISTS sharded_app_metrics2 ON CLUSTER 'posthog'
(
team_id Int64,
timestamp DateTime64(6, 'UTC'),
-- The name of the service or product that generated the metrics.
-- Examples: plugins, hog
app_source LowCardinality(String),
-- An id for the app source.
-- Set app_source to avoid collision with ids from other app sources if the id generation is not safe.
-- Examples: A plugin id, a hog application id
app_source_id String,
-- A secondary id e.g. for the instance of app_source that generated this metric.
-- This may be ommitted if app_source is a singleton.
-- Examples: A plugin config id, a hog application config id
instance_id String,
metric_kind LowCardinality(String),
metric_name LowCardinality(String),
count SimpleAggregateFunction(sum, Int64)

, _timestamp DateTime
, _offset UInt64
, _partition UInt64

)
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.sharded_app_metrics2', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (team_id, app_source, app_source_id, instance_id, toStartOfHour(timestamp), metric_kind, metric_name)


'''
# ---
# name: test_create_table_query[sharded_app_metrics]
Expand Down Expand Up @@ -3049,6 +3168,40 @@

SETTINGS index_granularity=512

'''
# ---
# name: test_create_table_query_replicated_and_storage[sharded_app_metrics2]
'''

CREATE TABLE IF NOT EXISTS sharded_app_metrics2 ON CLUSTER 'posthog'
(
team_id Int64,
timestamp DateTime64(6, 'UTC'),
-- The name of the service or product that generated the metrics.
-- Examples: plugins, hog
app_source LowCardinality(String),
-- An id for the app source.
-- Set app_source to avoid collision with ids from other app sources if the id generation is not safe.
-- Examples: A plugin id, a hog application id
app_source_id String,
-- A secondary id e.g. for the instance of app_source that generated this metric.
-- This may be ommitted if app_source is a singleton.
-- Examples: A plugin config id, a hog application config id
instance_id String,
metric_kind LowCardinality(String),
metric_name LowCardinality(String),
count SimpleAggregateFunction(sum, Int64)

, _timestamp DateTime
, _offset UInt64
, _partition UInt64

)
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.sharded_app_metrics2', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (team_id, app_source, app_source_id, instance_id, toStartOfHour(timestamp), metric_kind, metric_name)


'''
# ---
# name: test_create_table_query_replicated_and_storage[sharded_app_metrics]
Expand Down
1 change: 1 addition & 0 deletions posthog/kafka_client/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
KAFKA_GROUPS = f"{KAFKA_PREFIX}clickhouse_groups{SUFFIX}"
KAFKA_INGESTION_WARNINGS = f"{KAFKA_PREFIX}clickhouse_ingestion_warnings{SUFFIX}"
KAFKA_APP_METRICS = f"{KAFKA_PREFIX}clickhouse_app_metrics{SUFFIX}"
KAFKA_APP_METRICS2 = f"{KAFKA_PREFIX}clickhouse_app_metrics2{SUFFIX}"
KAFKA_METRICS_TIME_TO_SEE_DATA = f"{KAFKA_PREFIX}clickhouse_metrics_time_to_see_data{SUFFIX}"
KAFKA_PERSON_OVERRIDE = f"{KAFKA_PREFIX}clickhouse_person_override{SUFFIX}"
KAFKA_LOG_ENTRIES = f"{KAFKA_PREFIX}log_entries{SUFFIX}"
Expand Down
Empty file.
131 changes: 131 additions & 0 deletions posthog/models/app_metrics2/sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from django.conf import settings

from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS_WITH_PARTITION, kafka_engine, ttl_period
from posthog.clickhouse.table_engines import (
AggregatingMergeTree,
Distributed,
ReplicationScheme,
)
from posthog.kafka_client.topics import KAFKA_APP_METRICS2

APP_METRICS2_TTL_DAYS = 90

SHARDED_APP_METRICS2_TABLE_ENGINE = lambda: AggregatingMergeTree(
"sharded_app_metrics2", replication_scheme=ReplicationScheme.SHARDED
)

BASE_APP_METRICS2_COLUMNS = """
team_id Int64,
timestamp DateTime64(6, 'UTC'),
-- The name of the service or product that generated the metrics.
-- Examples: plugins, hog
app_source LowCardinality(String),
-- An id for the app source.
-- Set app_source to avoid collision with ids from other app sources if the id generation is not safe.
-- Examples: A plugin id, a hog application id
app_source_id String,
-- A secondary id e.g. for the instance of app_source that generated this metric.
-- This may be ommitted if app_source is a singleton.
-- Examples: A plugin config id, a hog application config id
instance_id String,
metric_kind LowCardinality(String),
metric_name LowCardinality(String),
count SimpleAggregateFunction(sum, Int64)
""".strip()

# NOTE: We have producers that take advantage of the timestamp being truncated to the hour,
# i.e. they batch up metrics and send them pre-truncated. If we ever change this truncation
# we need to revisit producers (e.g. the webhook service currently known as rusty-hook or pgqueue).
APP_METRICS2_TIMESTAMP_TRUNCATION = "toStartOfHour(timestamp)"

APP_METRICS2_DATA_TABLE_SQL = (
lambda: f"""
CREATE TABLE IF NOT EXISTS sharded_app_metrics2 ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'
(
{BASE_APP_METRICS2_COLUMNS}
{KAFKA_COLUMNS_WITH_PARTITION}
)
ENGINE = {SHARDED_APP_METRICS2_TABLE_ENGINE()}
PARTITION BY toYYYYMM(timestamp)
ORDER BY (team_id, app_source, app_source_id, instance_id, {APP_METRICS2_TIMESTAMP_TRUNCATION}, metric_kind, metric_name)
{ttl_period("timestamp", APP_METRICS2_TTL_DAYS, unit="DAY")}
"""
)

DISTRIBUTED_APP_METRICS2_TABLE_SQL = (
lambda: f"""
CREATE TABLE IF NOT EXISTS app_metrics2 ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'
(
{BASE_APP_METRICS2_COLUMNS}
{KAFKA_COLUMNS_WITH_PARTITION}
)
ENGINE={Distributed(data_table="sharded_app_metrics2", sharding_key="rand()")}
"""
)

KAFKA_APP_METRICS2_TABLE_SQL = (
lambda: f"""
CREATE TABLE IF NOT EXISTS kafka_app_metrics2 ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'
(
team_id Int64,
timestamp DateTime64(6, 'UTC'),
app_source LowCardinality(String),
app_source_id String,
instance_id String,
metric_kind String,
metric_name String,
count Int64
)
ENGINE={kafka_engine(topic=KAFKA_APP_METRICS2)}
"""
)

APP_METRICS2_MV_TABLE_SQL = (
lambda: f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS app_metrics2_mv ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'
TO {settings.CLICKHOUSE_DATABASE}.sharded_app_metrics2
AS SELECT
team_id,
timestamp,
app_source,
app_source_id,
instance_id,
metric_kind,
metric_name,
count
FROM {settings.CLICKHOUSE_DATABASE}.kafka_app_metrics2
"""
)

TRUNCATE_APP_METRICS2_TABLE_SQL = f"TRUNCATE TABLE IF EXISTS sharded_app_metrics2"

INSERT_APP_METRICS2_SQL = """
INSERT INTO sharded_app_metrics2 (
team_id,
timestamp,
app_source,
app_source_id,
instance_id,
metric_kind,
metric_name,
count,
_timestamp,
_offset,
_partition
)
SELECT
%(team_id)s,
%(timestamp)s,
%(app_source)s,
%(app_source_id)s,
%(instance_id)s,
%(successes)s,
%(skipped)s,
%(failures)s,
%(metric_kind)s,
%(metric_name)s,
%(count)s,
now(),
0,
0
"""

0 comments on commit f787067

Please sign in to comment.