diff --git a/posthog/clickhouse/migrations/0072_app_metrics2.py b/posthog/clickhouse/migrations/0072_app_metrics2.py new file mode 100644 index 0000000000000..d04c2bfca91aa --- /dev/null +++ b/posthog/clickhouse/migrations/0072_app_metrics2.py @@ -0,0 +1,19 @@ +from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions +from posthog.models.app_metrics2.sql import ( + APP_METRICS2_DATA_TABLE_SQL, + APP_METRICS2_MV_TABLE_SQL, + DISTRIBUTED_APP_METRICS2_TABLE_SQL, + KAFKA_APP_METRICS2_TABLE_SQL, +) +from posthog.settings import CLICKHOUSE_CLUSTER + +operations = [ + run_sql_with_exceptions(f"DROP TABLE IF EXISTS app_metrics2_mv ON CLUSTER '{CLICKHOUSE_CLUSTER}'"), + run_sql_with_exceptions(f"DROP TABLE IF EXISTS kafka_app_metrics2 ON CLUSTER '{CLICKHOUSE_CLUSTER}'"), + run_sql_with_exceptions(f"DROP TABLE IF EXISTS app_metrics2 ON CLUSTER '{CLICKHOUSE_CLUSTER}'"), + run_sql_with_exceptions(f"DROP TABLE IF EXISTS sharded_app_metrics2 ON CLUSTER '{CLICKHOUSE_CLUSTER}'"), + run_sql_with_exceptions(APP_METRICS2_DATA_TABLE_SQL()), + run_sql_with_exceptions(DISTRIBUTED_APP_METRICS2_TABLE_SQL()), + run_sql_with_exceptions(KAFKA_APP_METRICS2_TABLE_SQL()), + run_sql_with_exceptions(APP_METRICS2_MV_TABLE_SQL()), +] diff --git a/posthog/clickhouse/schema.py b/posthog/clickhouse/schema.py index 50591b5bc1451..4602615cd1ed7 100644 --- a/posthog/clickhouse/schema.py +++ b/posthog/clickhouse/schema.py @@ -30,6 +30,12 @@ KAFKA_APP_METRICS_TABLE_SQL, DISTRIBUTED_APP_METRICS_TABLE_SQL, ) +from posthog.models.app_metrics2.sql import ( + APP_METRICS2_DATA_TABLE_SQL, + APP_METRICS2_MV_TABLE_SQL, + KAFKA_APP_METRICS2_TABLE_SQL, + DISTRIBUTED_APP_METRICS2_TABLE_SQL, +) from posthog.models.channel_type.sql import ( CHANNEL_DEFINITION_TABLE_SQL, CHANNEL_DEFINITION_DATA_SQL, @@ -127,6 +133,7 @@ SESSION_RECORDING_EVENTS_TABLE_SQL, INGESTION_WARNINGS_DATA_TABLE_SQL, APP_METRICS_DATA_TABLE_SQL, + APP_METRICS2_DATA_TABLE_SQL, PERFORMANCE_EVENTS_TABLE_SQL, SESSION_REPLAY_EVENTS_TABLE_SQL, CHANNEL_DEFINITION_TABLE_SQL, @@ -141,6 +148,7 @@ DISTRIBUTED_SESSION_RECORDING_EVENTS_TABLE_SQL, DISTRIBUTED_INGESTION_WARNINGS_TABLE_SQL, DISTRIBUTED_APP_METRICS_TABLE_SQL, + DISTRIBUTED_APP_METRICS2_TABLE_SQL, WRITABLE_PERFORMANCE_EVENTS_TABLE_SQL, DISTRIBUTED_PERFORMANCE_EVENTS_TABLE_SQL, DISTRIBUTED_SESSION_REPLAY_EVENTS_TABLE_SQL, @@ -165,6 +173,7 @@ KAFKA_SESSION_RECORDING_EVENTS_TABLE_SQL, KAFKA_INGESTION_WARNINGS_TABLE_SQL, KAFKA_APP_METRICS_TABLE_SQL, + KAFKA_APP_METRICS2_TABLE_SQL, KAFKA_PERFORMANCE_EVENTS_TABLE_SQL, KAFKA_SESSION_REPLAY_EVENTS_TABLE_SQL, KAFKA_HEATMAPS_TABLE_SQL, @@ -183,6 +192,7 @@ SESSION_RECORDING_EVENTS_TABLE_MV_SQL, INGESTION_WARNINGS_MV_TABLE_SQL, APP_METRICS_MV_TABLE_SQL, + APP_METRICS2_MV_TABLE_SQL, PERFORMANCE_EVENTS_TABLE_MV_SQL, SESSION_REPLAY_EVENTS_TABLE_MV_SQL, SESSIONS_TABLE_MV_SQL, diff --git a/posthog/clickhouse/test/__snapshots__/test_schema.ambr b/posthog/clickhouse/test/__snapshots__/test_schema.ambr index ef9dbf72329a8..8a71a18c954e4 100644 --- a/posthog/clickhouse/test/__snapshots__/test_schema.ambr +++ b/posthog/clickhouse/test/__snapshots__/test_schema.ambr @@ -35,6 +35,24 @@ ''' # --- +# name: test_create_kafka_table_with_different_kafka_host[kafka_app_metrics2] + ''' + + CREATE TABLE IF NOT EXISTS kafka_app_metrics2 ON CLUSTER 'posthog' + ( + team_id Int64, + timestamp DateTime64(6, 'UTC'), + app_source LowCardinality(String), + app_source_id String, + instance_id String, + metric_kind String, + metric_name String, + count Int64 + ) + ENGINE=Kafka('test.kafka.broker:9092', 'clickhouse_app_metrics2_test', 'group1', 'JSONEachRow') + + ''' +# --- # name: test_create_kafka_table_with_different_kafka_host[kafka_app_metrics] ''' @@ -416,6 +434,55 @@ ''' # --- +# name: test_create_table_query[app_metrics2] + ''' + + CREATE TABLE IF NOT EXISTS app_metrics2 ON CLUSTER 'posthog' + ( + team_id Int64, + timestamp DateTime64(6, 'UTC'), + -- The name of the service or product that generated the metrics. + -- Examples: plugins, hog + app_source LowCardinality(String), + -- An id for the app source. + -- Set app_source to avoid collision with ids from other app sources if the id generation is not safe. + -- Examples: A plugin id, a hog application id + app_source_id String, + -- A secondary id e.g. for the instance of app_source that generated this metric. + -- This may be ommitted if app_source is a singleton. + -- Examples: A plugin config id, a hog application config id + instance_id String, + metric_kind LowCardinality(String), + metric_name LowCardinality(String), + count SimpleAggregateFunction(sum, Int64) + + , _timestamp DateTime + , _offset UInt64 + , _partition UInt64 + + ) + ENGINE=Distributed('posthog', 'posthog_test', 'sharded_app_metrics2', rand()) + + ''' +# --- +# name: test_create_table_query[app_metrics2_mv] + ''' + + CREATE MATERIALIZED VIEW IF NOT EXISTS app_metrics2_mv ON CLUSTER 'posthog' + TO posthog_test.sharded_app_metrics2 + AS SELECT + team_id, + timestamp, + app_source, + app_source_id, + instance_id, + metric_kind, + metric_name, + count + FROM posthog_test.kafka_app_metrics2 + + ''' +# --- # name: test_create_table_query[app_metrics] ''' @@ -767,6 +834,24 @@ ''' # --- +# name: test_create_table_query[kafka_app_metrics2] + ''' + + CREATE TABLE IF NOT EXISTS kafka_app_metrics2 ON CLUSTER 'posthog' + ( + team_id Int64, + timestamp DateTime64(6, 'UTC'), + app_source LowCardinality(String), + app_source_id String, + instance_id String, + metric_kind String, + metric_name String, + count Int64 + ) + ENGINE=Kafka('kafka:9092', 'clickhouse_app_metrics2_test', 'group1', 'JSONEachRow') + + ''' +# --- # name: test_create_table_query[kafka_app_metrics] ''' @@ -1989,6 +2074,40 @@ GROUP BY `$session_id`, team_id + ''' +# --- +# name: test_create_table_query[sharded_app_metrics2] + ''' + + CREATE TABLE IF NOT EXISTS sharded_app_metrics2 ON CLUSTER 'posthog' + ( + team_id Int64, + timestamp DateTime64(6, 'UTC'), + -- The name of the service or product that generated the metrics. + -- Examples: plugins, hog + app_source LowCardinality(String), + -- An id for the app source. + -- Set app_source to avoid collision with ids from other app sources if the id generation is not safe. + -- Examples: A plugin id, a hog application id + app_source_id String, + -- A secondary id e.g. for the instance of app_source that generated this metric. + -- This may be ommitted if app_source is a singleton. + -- Examples: A plugin config id, a hog application config id + instance_id String, + metric_kind LowCardinality(String), + metric_name LowCardinality(String), + count SimpleAggregateFunction(sum, Int64) + + , _timestamp DateTime + , _offset UInt64 + , _partition UInt64 + + ) + ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.sharded_app_metrics2', '{replica}') + PARTITION BY toYYYYMM(timestamp) + ORDER BY (team_id, app_source, app_source_id, instance_id, toStartOfHour(timestamp), metric_kind, metric_name) + + ''' # --- # name: test_create_table_query[sharded_app_metrics] @@ -3049,6 +3168,40 @@ SETTINGS index_granularity=512 + ''' +# --- +# name: test_create_table_query_replicated_and_storage[sharded_app_metrics2] + ''' + + CREATE TABLE IF NOT EXISTS sharded_app_metrics2 ON CLUSTER 'posthog' + ( + team_id Int64, + timestamp DateTime64(6, 'UTC'), + -- The name of the service or product that generated the metrics. + -- Examples: plugins, hog + app_source LowCardinality(String), + -- An id for the app source. + -- Set app_source to avoid collision with ids from other app sources if the id generation is not safe. + -- Examples: A plugin id, a hog application id + app_source_id String, + -- A secondary id e.g. for the instance of app_source that generated this metric. + -- This may be ommitted if app_source is a singleton. + -- Examples: A plugin config id, a hog application config id + instance_id String, + metric_kind LowCardinality(String), + metric_name LowCardinality(String), + count SimpleAggregateFunction(sum, Int64) + + , _timestamp DateTime + , _offset UInt64 + , _partition UInt64 + + ) + ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.sharded_app_metrics2', '{replica}') + PARTITION BY toYYYYMM(timestamp) + ORDER BY (team_id, app_source, app_source_id, instance_id, toStartOfHour(timestamp), metric_kind, metric_name) + + ''' # --- # name: test_create_table_query_replicated_and_storage[sharded_app_metrics] diff --git a/posthog/kafka_client/topics.py b/posthog/kafka_client/topics.py index 4637a010df19a..20e83f218169f 100644 --- a/posthog/kafka_client/topics.py +++ b/posthog/kafka_client/topics.py @@ -16,6 +16,7 @@ KAFKA_GROUPS = f"{KAFKA_PREFIX}clickhouse_groups{SUFFIX}" KAFKA_INGESTION_WARNINGS = f"{KAFKA_PREFIX}clickhouse_ingestion_warnings{SUFFIX}" KAFKA_APP_METRICS = f"{KAFKA_PREFIX}clickhouse_app_metrics{SUFFIX}" +KAFKA_APP_METRICS2 = f"{KAFKA_PREFIX}clickhouse_app_metrics2{SUFFIX}" KAFKA_METRICS_TIME_TO_SEE_DATA = f"{KAFKA_PREFIX}clickhouse_metrics_time_to_see_data{SUFFIX}" KAFKA_PERSON_OVERRIDE = f"{KAFKA_PREFIX}clickhouse_person_override{SUFFIX}" KAFKA_LOG_ENTRIES = f"{KAFKA_PREFIX}log_entries{SUFFIX}" diff --git a/posthog/models/app_metrics2/__init__.py b/posthog/models/app_metrics2/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/posthog/models/app_metrics2/sql.py b/posthog/models/app_metrics2/sql.py new file mode 100644 index 0000000000000..0472180f542e2 --- /dev/null +++ b/posthog/models/app_metrics2/sql.py @@ -0,0 +1,131 @@ +from django.conf import settings + +from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS_WITH_PARTITION, kafka_engine, ttl_period +from posthog.clickhouse.table_engines import ( + AggregatingMergeTree, + Distributed, + ReplicationScheme, +) +from posthog.kafka_client.topics import KAFKA_APP_METRICS2 + +APP_METRICS2_TTL_DAYS = 90 + +SHARDED_APP_METRICS2_TABLE_ENGINE = lambda: AggregatingMergeTree( + "sharded_app_metrics2", replication_scheme=ReplicationScheme.SHARDED +) + +BASE_APP_METRICS2_COLUMNS = """ + team_id Int64, + timestamp DateTime64(6, 'UTC'), + -- The name of the service or product that generated the metrics. + -- Examples: plugins, hog + app_source LowCardinality(String), + -- An id for the app source. + -- Set app_source to avoid collision with ids from other app sources if the id generation is not safe. + -- Examples: A plugin id, a hog application id + app_source_id String, + -- A secondary id e.g. for the instance of app_source that generated this metric. + -- This may be ommitted if app_source is a singleton. + -- Examples: A plugin config id, a hog application config id + instance_id String, + metric_kind LowCardinality(String), + metric_name LowCardinality(String), + count SimpleAggregateFunction(sum, Int64) +""".strip() + +# NOTE: We have producers that take advantage of the timestamp being truncated to the hour, +# i.e. they batch up metrics and send them pre-truncated. If we ever change this truncation +# we need to revisit producers (e.g. the webhook service currently known as rusty-hook or pgqueue). +APP_METRICS2_TIMESTAMP_TRUNCATION = "toStartOfHour(timestamp)" + +APP_METRICS2_DATA_TABLE_SQL = ( + lambda: f""" +CREATE TABLE IF NOT EXISTS sharded_app_metrics2 ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' +( + {BASE_APP_METRICS2_COLUMNS} + {KAFKA_COLUMNS_WITH_PARTITION} +) +ENGINE = {SHARDED_APP_METRICS2_TABLE_ENGINE()} +PARTITION BY toYYYYMM(timestamp) +ORDER BY (team_id, app_source, app_source_id, instance_id, {APP_METRICS2_TIMESTAMP_TRUNCATION}, metric_kind, metric_name) +{ttl_period("timestamp", APP_METRICS2_TTL_DAYS, unit="DAY")} +""" +) + +DISTRIBUTED_APP_METRICS2_TABLE_SQL = ( + lambda: f""" +CREATE TABLE IF NOT EXISTS app_metrics2 ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' +( + {BASE_APP_METRICS2_COLUMNS} + {KAFKA_COLUMNS_WITH_PARTITION} +) +ENGINE={Distributed(data_table="sharded_app_metrics2", sharding_key="rand()")} +""" +) + +KAFKA_APP_METRICS2_TABLE_SQL = ( + lambda: f""" +CREATE TABLE IF NOT EXISTS kafka_app_metrics2 ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' +( + team_id Int64, + timestamp DateTime64(6, 'UTC'), + app_source LowCardinality(String), + app_source_id String, + instance_id String, + metric_kind String, + metric_name String, + count Int64 +) +ENGINE={kafka_engine(topic=KAFKA_APP_METRICS2)} +""" +) + +APP_METRICS2_MV_TABLE_SQL = ( + lambda: f""" +CREATE MATERIALIZED VIEW IF NOT EXISTS app_metrics2_mv ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}' +TO {settings.CLICKHOUSE_DATABASE}.sharded_app_metrics2 +AS SELECT +team_id, +timestamp, +app_source, +app_source_id, +instance_id, +metric_kind, +metric_name, +count +FROM {settings.CLICKHOUSE_DATABASE}.kafka_app_metrics2 +""" +) + +TRUNCATE_APP_METRICS2_TABLE_SQL = f"TRUNCATE TABLE IF EXISTS sharded_app_metrics2" + +INSERT_APP_METRICS2_SQL = """ +INSERT INTO sharded_app_metrics2 ( + team_id, + timestamp, + app_source, + app_source_id, + instance_id, + metric_kind, + metric_name, + count, + _timestamp, + _offset, + _partition +) +SELECT + %(team_id)s, + %(timestamp)s, + %(app_source)s, + %(app_source_id)s, + %(instance_id)s, + %(successes)s, + %(skipped)s, + %(failures)s, + %(metric_kind)s, + %(metric_name)s, + %(count)s, + now(), + 0, + 0 +"""