diff --git a/rust/.sqlx/query-16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87.json b/rust/.sqlx/query-16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87.json index 7a3a8b98d9da5..23b0665a2d357 100644 --- a/rust/.sqlx/query-16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87.json +++ b/rust/.sqlx/query-16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87.json @@ -4,7 +4,7 @@ "describe": { "columns": [], "parameters": { - "Left": ["Text", "Uuid", "Uuid"] + "Left": ["Bytea", "Uuid", "Uuid"] }, "nullable": [] }, diff --git a/rust/.sqlx/query-350983ef271029734aff70eb7e298bfe578ecaa8678268863bce917ced9d5d46.json b/rust/.sqlx/query-229c28c25aec24180c29e6ed636c165376f43116b85921c62b36b1b8e85562b0.json similarity index 87% rename from rust/.sqlx/query-350983ef271029734aff70eb7e298bfe578ecaa8678268863bce917ced9d5d46.json rename to rust/.sqlx/query-229c28c25aec24180c29e6ed636c165376f43116b85921c62b36b1b8e85562b0.json index d3a54ba7ef247..ffda6f4b70b26 100644 --- a/rust/.sqlx/query-350983ef271029734aff70eb7e298bfe578ecaa8678268863bce917ced9d5d46.json +++ b/rust/.sqlx/query-229c28c25aec24180c29e6ed636c165376f43116b85921c62b36b1b8e85562b0.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\nWITH available AS (\n SELECT\n id,\n state\n FROM cyclotron_jobs\n WHERE\n state = 'available'::JobState\n AND queue_name = $1\n AND scheduled <= NOW()\n ORDER BY\n priority ASC,\n scheduled ASC\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n)\nUPDATE cyclotron_jobs\nSET\n state = 'running'::JobState,\n lock_id = $3,\n last_heartbeat = NOW(),\n last_transition = NOW(),\n transition_count = transition_count + 1\nFROM available\nWHERE\n cyclotron_jobs.id = available.id\nRETURNING\n cyclotron_jobs.id,\n team_id,\n available.state as \"state: JobState\",\n queue_name,\n priority,\n function_id,\n created,\n last_transition,\n scheduled,\n transition_count,\n NULL as vm_state,\n metadata,\n parameters,\n lock_id,\n last_heartbeat,\n janitor_touch_count\n ", + "query": "\nWITH available AS (\n SELECT\n id,\n state\n FROM cyclotron_jobs\n WHERE\n state = 'available'::JobState\n AND queue_name = $1\n AND scheduled <= NOW()\n ORDER BY\n priority ASC,\n scheduled ASC\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n)\nUPDATE cyclotron_jobs\nSET\n state = 'running'::JobState,\n lock_id = $3,\n last_heartbeat = NOW(),\n last_transition = NOW(),\n transition_count = transition_count + 1\nFROM available\nWHERE\n cyclotron_jobs.id = available.id\nRETURNING\n cyclotron_jobs.id,\n team_id,\n available.state as \"state: JobState\",\n queue_name,\n priority,\n function_id,\n created,\n last_transition,\n scheduled,\n transition_count,\n NULL::bytea as vm_state,\n metadata,\n parameters,\n blob,\n lock_id,\n last_heartbeat,\n janitor_touch_count\n ", "describe": { "columns": [ { @@ -63,30 +63,35 @@ { "ordinal": 10, "name": "vm_state", - "type_info": "Text" + "type_info": "Bytea" }, { "ordinal": 11, "name": "metadata", - "type_info": "Text" + "type_info": "Bytea" }, { "ordinal": 12, "name": "parameters", - "type_info": "Text" + "type_info": "Bytea" }, { "ordinal": 13, + "name": "blob", + "type_info": "Bytea" + }, + { + "ordinal": 14, "name": "lock_id", "type_info": "Uuid" }, { - "ordinal": 14, + "ordinal": 15, "name": "last_heartbeat", "type_info": "Timestamptz" }, { - "ordinal": 15, + "ordinal": 16, "name": "janitor_touch_count", "type_info": "Int2" } @@ -110,8 +115,9 @@ true, true, true, + true, false ] }, - "hash": "350983ef271029734aff70eb7e298bfe578ecaa8678268863bce917ced9d5d46" + "hash": 
"229c28c25aec24180c29e6ed636c165376f43116b85921c62b36b1b8e85562b0" } diff --git a/rust/.sqlx/query-42e393046a686e6a69daa920dc2ab521aa6f393027c399a0c40139f5f8a0a45e.json b/rust/.sqlx/query-42e393046a686e6a69daa920dc2ab521aa6f393027c399a0c40139f5f8a0a45e.json new file mode 100644 index 0000000000000..aa1ff7e7cf3e1 --- /dev/null +++ b/rust/.sqlx/query-42e393046a686e6a69daa920dc2ab521aa6f393027c399a0c40139f5f8a0a45e.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO posthog_eventproperty (event, property, team_id) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", + "describe": { + "columns": [], + "parameters": { + "Left": ["Varchar", "Varchar", "Int4"] + }, + "nullable": [] + }, + "hash": "42e393046a686e6a69daa920dc2ab521aa6f393027c399a0c40139f5f8a0a45e" +} diff --git a/rust/.sqlx/query-58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050.json b/rust/.sqlx/query-58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050.json new file mode 100644 index 0000000000000..5a2231c7c2fdd --- /dev/null +++ b/rust/.sqlx/query-58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE cyclotron_jobs SET blob = $1 WHERE id = $2 AND lock_id = $3", + "describe": { + "columns": [], + "parameters": { + "Left": ["Bytea", "Uuid", "Uuid"] + }, + "nullable": [] + }, + "hash": "58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050" +} diff --git a/rust/.sqlx/query-8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c.json b/rust/.sqlx/query-8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c.json index 8c3a3dbde8b62..66ae665232405 100644 --- a/rust/.sqlx/query-8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c.json +++ b/rust/.sqlx/query-8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c.json @@ -4,7 +4,7 @@ "describe": { "columns": [], "parameters": { - "Left": ["Text", "Uuid", "Uuid"] + "Left": ["Bytea", "Uuid", "Uuid"] }, "nullable": [] }, diff --git a/rust/.sqlx/query-917e3d14c15558a1e0bb1d7015ed687eb545ee9d4ccbb8b69c958a357d49f687.json b/rust/.sqlx/query-917e3d14c15558a1e0bb1d7015ed687eb545ee9d4ccbb8b69c958a357d49f687.json new file mode 100644 index 0000000000000..188cd0be38966 --- /dev/null +++ b/rust/.sqlx/query-917e3d14c15558a1e0bb1d7015ed687eb545ee9d4ccbb8b69c958a357d49f687.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO posthog_eventdefinition (id, name, volume_30_day, query_usage_30_day, team_id, last_seen_at, created_at)\n VALUES ($1, $2, NULL, NULL, $3, $4, NOW()) ON CONFLICT\n ON CONSTRAINT posthog_eventdefinition_team_id_name_80fa0b87_uniq\n DO UPDATE SET last_seen_at = $4\n ", + "describe": { + "columns": [], + "parameters": { + "Left": ["Uuid", "Varchar", "Int4", "Timestamptz"] + }, + "nullable": [] + }, + "hash": "917e3d14c15558a1e0bb1d7015ed687eb545ee9d4ccbb8b69c958a357d49f687" +} diff --git a/rust/.sqlx/query-aa595eaf28c1f4b872c278be407b59cc00f3125413f4032ac3647a6b5ee1a632.json b/rust/.sqlx/query-aa595eaf28c1f4b872c278be407b59cc00f3125413f4032ac3647a6b5ee1a632.json index bd8a7cdd90282..51fb1b018120b 100644 --- a/rust/.sqlx/query-aa595eaf28c1f4b872c278be407b59cc00f3125413f4032ac3647a6b5ee1a632.json +++ b/rust/.sqlx/query-aa595eaf28c1f4b872c278be407b59cc00f3125413f4032ac3647a6b5ee1a632.json @@ -6,7 +6,7 @@ { "ordinal": 0, "name": "vm_state", - "type_info": "Text" + "type_info": "Bytea" } ], "parameters": { diff --git 
a/rust/.sqlx/query-b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7.json b/rust/.sqlx/query-b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7.json index ea9c7f8fceb06..4364f2fee8816 100644 --- a/rust/.sqlx/query-b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7.json +++ b/rust/.sqlx/query-b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7.json @@ -4,7 +4,7 @@ "describe": { "columns": [], "parameters": { - "Left": ["Text", "Uuid", "Uuid"] + "Left": ["Bytea", "Uuid", "Uuid"] }, "nullable": [] }, diff --git a/rust/.sqlx/query-c6ff00fcbbc77c8f5c1b3fe2f3352ea79485e403b9e17b6c37259ea0612065ee.json b/rust/.sqlx/query-c6ff00fcbbc77c8f5c1b3fe2f3352ea79485e403b9e17b6c37259ea0612065ee.json new file mode 100644 index 0000000000000..f6ad8e4e89247 --- /dev/null +++ b/rust/.sqlx/query-c6ff00fcbbc77c8f5c1b3fe2f3352ea79485e403b9e17b6c37259ea0612065ee.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT group_type_index FROM posthog_grouptypemapping WHERE group_type = $1 AND team_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "group_type_index", + "type_info": "Int4" + } + ], + "parameters": { + "Left": ["Text", "Int4"] + }, + "nullable": [false] + }, + "hash": "c6ff00fcbbc77c8f5c1b3fe2f3352ea79485e403b9e17b6c37259ea0612065ee" +} diff --git a/rust/.sqlx/query-c624261597b9356ff3e7c3e392a84bb0b551e91c503e8b21c29814f1eb660a8e.json b/rust/.sqlx/query-ce036f16a37a41b9dc5a164de0b52345454cd3323568c4bef5b8480380287068.json similarity index 89% rename from rust/.sqlx/query-c624261597b9356ff3e7c3e392a84bb0b551e91c503e8b21c29814f1eb660a8e.json rename to rust/.sqlx/query-ce036f16a37a41b9dc5a164de0b52345454cd3323568c4bef5b8480380287068.json index b94965873e7d6..fe174820c3a07 100644 --- a/rust/.sqlx/query-c624261597b9356ff3e7c3e392a84bb0b551e91c503e8b21c29814f1eb660a8e.json +++ b/rust/.sqlx/query-ce036f16a37a41b9dc5a164de0b52345454cd3323568c4bef5b8480380287068.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\nWITH available AS (\n SELECT\n id,\n state\n FROM cyclotron_jobs\n WHERE\n state = 'available'::JobState\n AND queue_name = $1\n AND scheduled <= NOW()\n ORDER BY\n priority ASC,\n scheduled ASC\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n)\nUPDATE cyclotron_jobs\nSET\n state = 'running'::JobState,\n lock_id = $3,\n last_heartbeat = NOW(),\n last_transition = NOW(),\n transition_count = transition_count + 1\nFROM available\nWHERE\n cyclotron_jobs.id = available.id\nRETURNING\n cyclotron_jobs.id,\n team_id,\n available.state as \"state: JobState\",\n queue_name,\n priority,\n function_id,\n created,\n last_transition,\n scheduled,\n transition_count,\n vm_state,\n metadata,\n parameters,\n lock_id,\n last_heartbeat,\n janitor_touch_count\n ", + "query": "\nWITH available AS (\n SELECT\n id,\n state\n FROM cyclotron_jobs\n WHERE\n state = 'available'::JobState\n AND queue_name = $1\n AND scheduled <= NOW()\n ORDER BY\n priority ASC,\n scheduled ASC\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n)\nUPDATE cyclotron_jobs\nSET\n state = 'running'::JobState,\n lock_id = $3,\n last_heartbeat = NOW(),\n last_transition = NOW(),\n transition_count = transition_count + 1\nFROM available\nWHERE\n cyclotron_jobs.id = available.id\nRETURNING\n cyclotron_jobs.id,\n team_id,\n available.state as \"state: JobState\",\n queue_name,\n priority,\n function_id,\n created,\n last_transition,\n scheduled,\n transition_count,\n vm_state,\n metadata,\n parameters,\n blob,\n lock_id,\n last_heartbeat,\n janitor_touch_count\n ", "describe": { 
"columns": [ { @@ -63,30 +63,35 @@ { "ordinal": 10, "name": "vm_state", - "type_info": "Text" + "type_info": "Bytea" }, { "ordinal": 11, "name": "metadata", - "type_info": "Text" + "type_info": "Bytea" }, { "ordinal": 12, "name": "parameters", - "type_info": "Text" + "type_info": "Bytea" }, { "ordinal": 13, + "name": "blob", + "type_info": "Bytea" + }, + { + "ordinal": 14, "name": "lock_id", "type_info": "Uuid" }, { - "ordinal": 14, + "ordinal": 15, "name": "last_heartbeat", "type_info": "Timestamptz" }, { - "ordinal": 15, + "ordinal": 16, "name": "janitor_touch_count", "type_info": "Int2" } @@ -110,8 +115,9 @@ true, true, true, + true, false ] }, - "hash": "c624261597b9356ff3e7c3e392a84bb0b551e91c503e8b21c29814f1eb660a8e" + "hash": "ce036f16a37a41b9dc5a164de0b52345454cd3323568c4bef5b8480380287068" } diff --git a/rust/.sqlx/query-eecef0ce664dfe65dff4452d92a29c948a291ea8218bbbb4e25cd1ad36dbe9f4.json b/rust/.sqlx/query-eecef0ce664dfe65dff4452d92a29c948a291ea8218bbbb4e25cd1ad36dbe9f4.json new file mode 100644 index 0000000000000..78ca221cb2f49 --- /dev/null +++ b/rust/.sqlx/query-eecef0ce664dfe65dff4452d92a29c948a291ea8218bbbb4e25cd1ad36dbe9f4.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO posthog_propertydefinition (id, name, type, group_type_index, is_numerical, volume_30_day, query_usage_30_day, team_id, property_type)\n VALUES ($1, $2, $3, $4, $5, NULL, NULL, $6, $7)\n ON CONFLICT (team_id, name, type, coalesce(group_type_index, -1))\n DO UPDATE SET property_type=EXCLUDED.property_type WHERE posthog_propertydefinition.property_type IS NULL\n ", + "describe": { + "columns": [], + "parameters": { + "Left": ["Uuid", "Varchar", "Int2", "Int2", "Bool", "Int4", "Varchar"] + }, + "nullable": [] + }, + "hash": "eecef0ce664dfe65dff4452d92a29c948a291ea8218bbbb4e25cd1ad36dbe9f4" +} diff --git a/rust/.sqlx/query-7217e766aeb53298238222c0c71a2ce446cac731845c53cb926fc47ace708dd6.json b/rust/.sqlx/query-f074766d1fc32df17f92667f412af30c682288988fc6f102e8a063be97c3e51c.json similarity index 72% rename from rust/.sqlx/query-7217e766aeb53298238222c0c71a2ce446cac731845c53cb926fc47ace708dd6.json rename to rust/.sqlx/query-f074766d1fc32df17f92667f412af30c682288988fc6f102e8a063be97c3e51c.json index 230374e98d610..6139be53026c1 100644 --- a/rust/.sqlx/query-7217e766aeb53298238222c0c71a2ce446cac731845c53cb926fc47ace708dd6.json +++ b/rust/.sqlx/query-f074766d1fc32df17f92667f412af30c682288988fc6f102e8a063be97c3e51c.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\nINSERT INTO cyclotron_jobs\n (\n id,\n team_id,\n function_id,\n created,\n lock_id,\n last_heartbeat,\n janitor_touch_count,\n transition_count,\n last_transition,\n queue_name,\n state,\n scheduled,\n priority,\n vm_state,\n metadata,\n parameters\n )\nVALUES\n ($1, $2, $3, NOW(), NULL, NULL, 0, 0, NOW(), $4, $5, $6, $7, $8, $9, $10)\n ", + "query": "\nINSERT INTO cyclotron_jobs\n (\n id,\n team_id,\n function_id,\n created,\n lock_id,\n last_heartbeat,\n janitor_touch_count,\n transition_count,\n last_transition,\n queue_name,\n state,\n scheduled,\n priority,\n vm_state,\n metadata,\n parameters,\n blob\n )\nVALUES\n ($1, $2, $3, NOW(), NULL, NULL, 0, 0, NOW(), $4, $5, $6, $7, $8, $9, $10, $11)\n ", "describe": { "columns": [], "parameters": { @@ -19,12 +19,13 @@ }, "Timestamptz", "Int2", - "Text", - "Text", - "Text" + "Bytea", + "Bytea", + "Bytea", + "Bytea" ] }, "nullable": [] }, - "hash": "7217e766aeb53298238222c0c71a2ce446cac731845c53cb926fc47ace708dd6" + "hash": 
"f074766d1fc32df17f92667f412af30c682288988fc6f102e8a063be97c3e51c" } diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8dc04e80b75b7..16cbcf2b3cfa8 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2839,6 +2839,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "uuid", ] [[package]] diff --git a/rust/property-defs-rs/Cargo.toml b/rust/property-defs-rs/Cargo.toml index 534977b5fedd8..6deb3bc0c22f7 100644 --- a/rust/property-defs-rs/Cargo.toml +++ b/rust/property-defs-rs/Cargo.toml @@ -22,6 +22,7 @@ chrono = { workspace = true } quick_cache = { workspace = true } common-metrics = { path = "../common/metrics" } ahash = { workspace = true } +uuid = { workspace = true } [lints] workspace = true diff --git a/rust/property-defs-rs/README.md b/rust/property-defs-rs/README.md new file mode 100644 index 0000000000000..b4fd40c273a05 --- /dev/null +++ b/rust/property-defs-rs/README.md @@ -0,0 +1,5 @@ +# Event and property definitions generator (propdefs for short) + +This consumes events from `clickhouse_events_json` and writes event and property definitions to postgres. It filters DB updates to avoid duplicate writes (writes are idempotent, but the DB load of writing every property definition every time would be too high). + +Hoglets should check out [the runbook](http://runbooks/ingestion/property-defs-rs) for a detailed breakdown of how it's tuned for our current scale, and what metrics to look at and levers to pull if responding to an incident. diff --git a/rust/property-defs-rs/src/app_context.rs b/rust/property-defs-rs/src/app_context.rs index 1ac2a97d4a842..cfd86d7839193 100644 --- a/rust/property-defs-rs/src/app_context.rs +++ b/rust/property-defs-rs/src/app_context.rs @@ -1,32 +1,128 @@ use health::{HealthHandle, HealthRegistry}; +use quick_cache::sync::Cache; +use sqlx::{postgres::PgPoolOptions, PgPool}; +use time::Duration; -use crate::{config::Config, metrics_consts::UPDATES_ISSUED, types::Update}; +use crate::{ + config::Config, + metrics_consts::{ + CACHE_WARMING_STATE, GROUP_TYPE_READS, GROUP_TYPE_RESOLVE_TIME, UPDATES_ISSUED, + UPDATE_TRANSACTION_TIME, + }, + types::{GroupType, Update}, +}; pub struct AppContext { - //pub pool: PgPool, + pub pool: PgPool, pub liveness: HealthRegistry, pub worker_liveness: HealthHandle, + pub cache_warming_delay: Duration, + pub cache_warming_cutoff: f64, + pub skip_writes: bool, + pub skip_reads: bool, + pub group_type_cache: Cache, } impl AppContext { - pub async fn new(_config: &Config) -> Result { - //let options = PgPoolOptions::new().max_connections(config.max_pg_connections); - //let pool = options.connect(&config.database_url).await?; + pub async fn new(config: &Config) -> Result { + let options = PgPoolOptions::new().max_connections(config.max_pg_connections); + let pool = options.connect(&config.database_url).await?; let liveness: HealthRegistry = HealthRegistry::new("liveness"); let worker_liveness = liveness - .register("worker".to_string(), time::Duration::seconds(60)) + .register("worker".to_string(), Duration::seconds(60)) .await; + let group_type_cache = Cache::new(config.group_type_cache_size); + Ok(Self { - //pool, + pool, liveness, worker_liveness, + cache_warming_delay: Duration::milliseconds(config.cache_warming_delay_ms as i64), + cache_warming_cutoff: 0.9, + skip_writes: config.skip_writes, + skip_reads: config.skip_reads, + group_type_cache, }) } - pub async fn issue(&self, updates: Vec) -> Result<(), sqlx::Error> { - metrics::counter!(UPDATES_ISSUED).increment(updates.len() as u64); + pub async fn issue( + 
&self, + mut updates: Vec<Update>, + cache_consumed: f64, + ) -> Result<(), sqlx::Error> { + if cache_consumed < self.cache_warming_cutoff { + metrics::gauge!(CACHE_WARMING_STATE, &[("state", "warming")]).set(cache_consumed); + let to_sleep = self.cache_warming_delay * (1.0 - cache_consumed); + tokio::time::sleep(to_sleep.try_into().unwrap()).await; + } else { + metrics::gauge!(CACHE_WARMING_STATE, &[("state", "hot")]).set(1.0); + } + + let update_count = updates.len(); + + let group_type_resolve_time = common_metrics::timing_guard(GROUP_TYPE_RESOLVE_TIME, &[]); + self.resolve_group_types_indexes(&mut updates).await?; + group_type_resolve_time.fin(); + + let transaction_time = common_metrics::timing_guard(UPDATE_TRANSACTION_TIME, &[]); + if !self.skip_writes && !self.skip_reads { + let mut tx = self.pool.begin().await?; + + for update in updates { + update.issue(&mut *tx).await?; + } + tx.commit().await?; + } + transaction_time.fin(); + + metrics::counter!(UPDATES_ISSUED).increment(update_count as u64); + Ok(()) + } + + async fn resolve_group_types_indexes(&self, updates: &mut [Update]) -> Result<(), sqlx::Error> { + if self.skip_reads { + return Ok(()); + } + + for update in updates { + // Only property definitions have group types + let Update::Property(update) = update else { + continue; + }; + // If there's no unresolved group type, there's nothing to resolve. + let Some(GroupType::Unresolved(name)) = &update.group_type_index else { + continue; + }; + + let cached = self.group_type_cache.get(name); + if let Some(index) = cached { + update.group_type_index = + update.group_type_index.take().map(|gti| gti.resolve(index)); + continue; + } + + metrics::counter!(GROUP_TYPE_READS).increment(1); + + let found = sqlx::query_scalar!( + "SELECT group_type_index FROM posthog_grouptypemapping WHERE group_type = $1 AND team_id = $2", + name, + update.team_id + ) + .fetch_optional(&self.pool) + .await?; + + if let Some(index) = found { + self.group_type_cache.insert(name.to_string(), index); + update.group_type_index = + update.group_type_index.take().map(|gti| gti.resolve(index)); + } else { + // If we fail to resolve a group type, we just don't write it + update.group_type_index = None; + } + } Ok(()) } } diff --git a/rust/property-defs-rs/src/config.rs b/rust/property-defs-rs/src/config.rs index 81864156a3c35..7afe4589944f9 100644 --- a/rust/property-defs-rs/src/config.rs +++ b/rust/property-defs-rs/src/config.rs @@ -36,6 +36,25 @@ pub struct Config { #[envconfig(default = "1000000")] pub cache_capacity: usize, + // We impose a slow-start, where each batch update operation is delayed by + // this many milliseconds, multiplied by the % of the cache currently unused. The idea + // is that we want to drip-feed updates to the DB during warmup, since + // cache fill rate is highest when it's most empty, and cache fill rate + // is exactly equivalent to the rate at which we can issue updates to the DB. + // The maths here is: + // max(writes/s) = max_concurrent_transactions * update_batch_size / transaction_seconds + // By artificially inflating transaction_time, we put a cap on writes/s. This cap is + // then loosened as the cache fills, until we're operating in "normal" mode and + // only presenting "true" DB backpressure (in the form of write time) to the main loop. + #[envconfig(default = "1000")] + pub cache_warming_delay_ms: u32, + + // This is the slow-start cutoff. Once the cache is this full, we + // don't delay the batch updates any more.
50% is fine for testing, + // in production you want to be using closer to 80-90% + #[envconfig(default = "0.5")] + pub cache_warming_cutoff: f64, + // Each worker maintains a small local batch of updates, which it // flushes to the main thread (updating/filtering by the // cross-thread cache while it does). This is that batch size. @@ -53,6 +72,21 @@ pub struct Config { #[envconfig(default = "10000")] pub update_count_skip_threshold: usize, + // Do everything except actually write to the DB + #[envconfig(default = "true")] + pub skip_writes: bool, + + // Do everything except actually read or write from the DB + #[envconfig(default = "true")] + pub skip_reads: bool, + + // We maintain a small cache for mapping from group names to group type indexes. + // You have very few reasons to ever change this... group type index resolution + // is done as a final step before writing an update, and is low-cost even without + // caching, compared to the rest of the process. + #[envconfig(default = "100000")] + pub group_type_cache_size: usize, + #[envconfig(from = "BIND_HOST", default = "::")] pub host: String, diff --git a/rust/property-defs-rs/src/main.rs b/rust/property-defs-rs/src/main.rs index b3155a4207aed..2fa7b94614081 100644 --- a/rust/property-defs-rs/src/main.rs +++ b/rust/property-defs-rs/src/main.rs @@ -108,6 +108,8 @@ async fn spawn_producer_loop( Err(TrySendError::Full(update)) => { warn!("Worker blocked"); metrics::counter!(WORKER_BLOCKED).increment(1); + // Workers should just die if the channel is dropped, since that indicates + // the main loop is dead. channel.send(update).await.unwrap(); } Err(e) => { @@ -187,16 +189,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { } batch_time.fin(); - metrics::gauge!(CACHE_CONSUMED).set(cache.len() as f64); - metrics::gauge!(TRANSACTION_LIMIT_SATURATION).set( (config.max_concurrent_transactions - transaction_limit.available_permits()) as f64, ); - // We unconditionally wait to acquire a transaction permit - this is our backpressure mechanism. If we + let cache_utilization = cache.len() as f64 / config.cache_capacity as f64; + metrics::gauge!(CACHE_CONSUMED).set(cache_utilization); + + // We unconditionally wait for a transaction permit - this is our backpressure mechanism. If we // fail to acquire a permit for long enough, we will fail liveness checks (but that implies our ongoing // transactions are halted, at which point DB health is a concern). let permit_acquire_time = common_metrics::timing_guard(PERMIT_WAIT_TIME, &[]); + // This semaphore will never be closed.
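+ // acquire_owned therefore errors only if the semaphore were closed, so the unwrap below is safe.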
let permit = transaction_limit.clone().acquire_owned().await.unwrap(); permit_acquire_time.fin(); @@ -204,7 +208,9 @@ async fn main() -> Result<(), Box> { tokio::spawn(async move { let _permit = permit; let issue_time = common_metrics::timing_guard(UPDATE_ISSUE_TIME, &[]); - context.issue(batch).await.unwrap(); + if let Err(e) = context.issue(batch, cache_utilization).await { + warn!("Issue failed: {:?}", e); + } issue_time.fin(); }); } diff --git a/rust/property-defs-rs/src/metrics_consts.rs b/rust/property-defs-rs/src/metrics_consts.rs index 0c4ac2f297ad3..e9920c0c6cb1c 100644 --- a/rust/property-defs-rs/src/metrics_consts.rs +++ b/rust/property-defs-rs/src/metrics_consts.rs @@ -15,3 +15,8 @@ pub const UPDATE_ISSUE_TIME: &str = "prop_defs_update_issue_time_ms"; pub const CACHE_CONSUMED: &str = "prop_defs_cache_space"; pub const RECV_DEQUEUED: &str = "prop_defs_recv_dequeued"; pub const COMPACTED_UPDATES: &str = "prop_defs_compaction_dropped_updates"; +pub const CACHE_WARMING_STATE: &str = "prop_defs_cache_state"; +pub const UPDATE_TRANSACTION_TIME: &str = "prop_defs_update_transaction_time_ms"; +pub const GROUP_TYPE_RESOLVE_TIME: &str = "prop_defs_group_type_resolve_time_ms"; +pub const UPDATES_SKIPPED: &str = "prop_defs_skipped_updates"; +pub const GROUP_TYPE_READS: &str = "prop_defs_group_type_reads"; diff --git a/rust/property-defs-rs/src/types.rs b/rust/property-defs-rs/src/types.rs index 5242ec921b67b..dcff22785af79 100644 --- a/rust/property-defs-rs/src/types.rs +++ b/rust/property-defs-rs/src/types.rs @@ -3,10 +3,16 @@ use std::{fmt, hash::Hash, str::FromStr}; use chrono::{DateTime, Duration, DurationRound, RoundingError, Utc}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; +use sqlx::{Executor, Postgres}; use tracing::warn; +use uuid::Uuid; -use crate::metrics_consts::EVENTS_SKIPPED; +use crate::metrics_consts::{EVENTS_SKIPPED, UPDATES_SKIPPED}; +// We skip updates for events we generate +pub const EVENTS_WITHOUT_PROPERTIES: [&str; 1] = ["$$plugin_metrics"]; + +// These properties have special meaning, and are ignored pub const SKIP_PROPERTIES: [&str; 9] = [ "$set", "$set_once", @@ -44,7 +50,7 @@ pub enum PropertyValueType { String, Numeric, Boolean, - Duration, + Duration, // Unused, but exists. 
} impl fmt::Display for PropertyValueType { @@ -66,13 +72,22 @@ pub enum GroupType { Resolved(String, i32), } +impl GroupType { + pub fn resolve(self, index: i32) -> Self { + match self { + GroupType::Unresolved(name) => GroupType::Resolved(name, index), + GroupType::Resolved(name, _) => GroupType::Resolved(name, index), + } + } +} + #[derive(Clone, Debug, Eq, PartialEq)] pub struct PropertyDefinition { pub team_id: i32, pub name: String, pub is_numerical: bool, pub property_type: Option<PropertyValueType>, - pub event_type: Option<PropertyParentType>, + pub event_type: PropertyParentType, pub group_type_index: Option<GroupType>, pub property_type_format: Option<String>, // Deprecated pub volume_30_day: Option<i64>, // Deprecated @@ -102,6 +117,19 @@ pub enum Update { EventProperty(EventProperty), } +impl Update { + pub async fn issue<'c, E>(&self, executor: E) -> Result<(), sqlx::Error> + where + E: Executor<'c, Database = Postgres>, + { + match self { + Update::Event(e) => e.issue(executor).await, + Update::Property(p) => p.issue(executor).await, + Update::EventProperty(ep) => ep.issue(executor).await, + } + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Event { pub team_id: i32, @@ -115,7 +143,9 @@ impl From<&Event> for EventDefinition { name: sanitize_event_name(&event.event), team_id: event.team_id, // We round last seen to the nearest day, as per the TS impl. Unwrap is safe here because - // the duration is positive, non-zero, and smaller than time since epoch + // the duration is positive, non-zero, and smaller than time since epoch. We use this + // in the hash value, so updates which would modify this in the DB are issued even + // if another otherwise-identical event definition is in the cache last_seen_at: floor_datetime(Utc::now(), Duration::days(1)).unwrap(), } } } impl Event { pub fn into_updates(self, skip_threshold: usize) -> Vec<Update> { + if EVENTS_WITHOUT_PROPERTIES.contains(&self.event.as_str()) { + metrics::counter!(EVENTS_SKIPPED, &[("reason", "no_properties_event")]).increment(1); + return vec![]; + } + + if !will_fit_in_postgres_column(&self.event) { + metrics::counter!(EVENTS_SKIPPED, &[("reason", "name_wont_fit_in_postgres")]) + .increment(1); + return vec![]; + } + let team_id = self.team_id; let event = self.event.clone(); @@ -132,7 +173,7 @@ impl Event { "Event {} for team {} has more than 10,000 properties, skipping", event, team_id ); - metrics::counter!(EVENTS_SKIPPED).increment(1); + metrics::counter!(EVENTS_SKIPPED, &[("reason", "too_many_properties")]).increment(1); return vec![]; } @@ -209,6 +250,15 @@ impl Event { continue; } + if !will_fit_in_postgres_column(key) { + metrics::counter!( + UPDATES_SKIPPED, + &[("reason", "property_name_wont_fit_in_postgres")] + ) + .increment(2); // We're skipping one EventProperty, and one PropertyDefinition + continue; + } + updates.push(Update::EventProperty(EventProperty { team_id: self.team_id, event: self.event.clone(), @@ -223,7 +273,7 @@ name: key.clone(), is_numerical, property_type, - event_type: Some(parent_type), + event_type: parent_type, group_type_index: group_type.clone(), property_type_format: None, volume_30_day: None, @@ -333,7 +383,10 @@ impl Hash for GroupType { } } -fn floor_datetime(dt: DateTime<Utc>, duration: Duration) -> Result<DateTime<Utc>, RoundingError> { +pub fn floor_datetime( + dt: DateTime<Utc>, + duration: Duration, +) -> Result<DateTime<Utc>, RoundingError> { let rounded = dt.duration_round(duration)?; // If we rounded up @@ -343,3 +396,86 @@ fn floor_datetime(dt: DateTime<Utc>, duration: Duration) -> Result<DateTime<Utc>, RoundingError> {
Ok(rounded) } } + +// We impose some limits on some fields for legacy reasons, and drop updates that don't conform to them +pub const DJANGO_MAX_CHARFIELD_LENGTH: usize = 400; +fn will_fit_in_postgres_column(str: &str) -> bool { + str.len() <= DJANGO_MAX_CHARFIELD_LENGTH / 2 +} + +// Postgres doesn't like nulls in strings, so we replace them with U+FFFD. +// This allocates, so we only do it right when hitting the DB - Rust itself +// handles nulls in strings just fine. +pub fn sanitize_string(s: &str) -> String { + s.replace('\u{0000}', "\u{FFFD}") +} + +// The queries below are pulled more-or-less exactly from the TS impl. + +impl EventDefinition { + pub async fn issue<'c, E>(&self, executor: E) -> Result<(), sqlx::Error> + where + E: Executor<'c, Database = Postgres>, + { + sqlx::query!( + r#" + INSERT INTO posthog_eventdefinition (id, name, volume_30_day, query_usage_30_day, team_id, last_seen_at, created_at) + VALUES ($1, $2, NULL, NULL, $3, $4, NOW()) ON CONFLICT + ON CONSTRAINT posthog_eventdefinition_team_id_name_80fa0b87_uniq + DO UPDATE SET last_seen_at = $4 + "#, + Uuid::now_v7(), + self.name, + self.team_id, + self.last_seen_at + ).execute(executor).await.map(|_| ()) + } +} + +impl PropertyDefinition { + pub async fn issue<'c, E>(&self, executor: E) -> Result<(), sqlx::Error> + where + E: Executor<'c, Database = Postgres>, + { + let group_type_index = match &self.group_type_index { + Some(GroupType::Resolved(_, i)) => Some(*i as i16), + Some(GroupType::Unresolved(_)) => { + warn!("Group type not resolved for property definition, writing NULL group_type_index"); + None + } + None => None, + }; + + sqlx::query!( + r#" + INSERT INTO posthog_propertydefinition (id, name, type, group_type_index, is_numerical, volume_30_day, query_usage_30_day, team_id, property_type) + VALUES ($1, $2, $3, $4, $5, NULL, NULL, $6, $7) + ON CONFLICT (team_id, name, type, coalesce(group_type_index, -1)) + DO UPDATE SET property_type=EXCLUDED.property_type WHERE posthog_propertydefinition.property_type IS NULL + "#, + Uuid::now_v7(), + self.name, + self.event_type as i16, + group_type_index, + self.is_numerical, + self.team_id, + self.property_type.as_ref().map(|t| t.to_string()) + ).execute(executor).await.map(|_| ()) + } +} + +impl EventProperty { + pub async fn issue<'c, E>(&self, executor: E) -> Result<(), sqlx::Error> + where + E: Executor<'c, Database = Postgres>, + { + sqlx::query!( + r#"INSERT INTO posthog_eventproperty (event, property, team_id) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"#, + self.event, + self.property, + self.team_id + ) + .execute(executor) + .await + .map(|_| ()) + } +} diff --git a/rust/property-defs-rs/tests/test_migrations/20240830124836_setup_propdefs_tables.sql b/rust/property-defs-rs/tests/test_migrations/20240830124836_setup_propdefs_tables.sql new file mode 100644 index 0000000000000..ec83e55a73dae --- /dev/null +++ b/rust/property-defs-rs/tests/test_migrations/20240830124836_setup_propdefs_tables.sql @@ -0,0 +1,41 @@ +-- These mimic the posthog main-db property, event and event-property tables, and are only used +-- for testing (so we can use `sqlx::test`) + +-- Create a unique constraint on posthog_eventdefinition for team_id and name, matching the django one + +CREATE TABLE IF NOT EXISTS posthog_eventdefinition ( + id UUID PRIMARY KEY, + name TEXT NOT NULL, + volume_30_day INTEGER, + query_usage_30_day INTEGER, + team_id INTEGER NOT NULL, + last_seen_at TIMESTAMPTZ NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + CONSTRAINT posthog_eventdefinition_team_id_name_80fa0b87_uniq UNIQUE (team_id, name) +); + + +CREATE TABLE IF NOT EXISTS
posthog_propertydefinition ( + id UUID PRIMARY KEY, + name VARCHAR(400) NOT NULL, + is_numerical BOOLEAN NOT NULL, + query_usage_30_day INTEGER, + property_type VARCHAR(50), + property_type_format VARCHAR(50), + volume_30_day INTEGER, + team_id INTEGER NOT NULL, + group_type_index SMALLINT, + type SMALLINT NOT NULL DEFAULT 1 +); + +CREATE UNIQUE INDEX posthog_propertydefinition_uniq ON posthog_propertydefinition (team_id, name, type, coalesce(group_type_index, -1)); + + +CREATE TABLE IF NOT EXISTS posthog_eventproperty ( + id SERIAL PRIMARY KEY, + event VARCHAR(400) NOT NULL, + property VARCHAR(400) NOT NULL, + team_id INTEGER NOT NULL +); + +CREATE UNIQUE INDEX posthog_event_property_unique_team_event_property ON posthog_eventproperty (team_id, event, property); \ No newline at end of file diff --git a/rust/property-defs-rs/tests/updates.rs b/rust/property-defs-rs/tests/updates.rs new file mode 100644 index 0000000000000..c907909c0e459 --- /dev/null +++ b/rust/property-defs-rs/tests/updates.rs @@ -0,0 +1,148 @@ +use chrono::{DateTime, Duration, Utc}; +use property_defs_rs::types::{floor_datetime, Event, PropertyParentType, PropertyValueType}; +use serde_json::json; +use sqlx::PgPool; + +#[sqlx::test(migrations = "./tests/test_migrations")] +async fn test_updates(db: PgPool) { + let properties = r#" + { + "TIMESTAMP": 5424245435435435, + "some_string": "some_value", + "some_int": 42, + "some_float": 3.14, + "some_bool": true, + "some_bool_as_string": "true" + } + "#; + + let event_src = json!({ + "team_id": 1, + "event": "update", + "properties": properties + }); + + // Test fanout limiting + let event = serde_json::from_value::<Event>(event_src.clone()).unwrap(); + let updates = event.into_updates(10); + assert_eq!(updates.len(), 0); + + // Test that the event is correctly split into updates + let event = serde_json::from_value::<Event>(event_src).unwrap(); + let updates = event.into_updates(1000); + assert_eq!(updates.len(), 13); + + // issue them, and then query the database to see that everything we expect to exist does + for update in updates { + update.issue(&db).await.unwrap(); + } + + let today = floor_datetime(Utc::now(), Duration::days(1)).unwrap(); + + assert_eventdefinition_exists(&db, "update", 1, today).await; + assert_propertydefinition_exists( + &db, + "TIMESTAMP", + PropertyParentType::Event, + false, + 1, + PropertyValueType::DateTime, + ) + .await; + assert_propertydefinition_exists( + &db, + "some_string", + PropertyParentType::Event, + false, + 1, + PropertyValueType::String, + ) + .await; + assert_propertydefinition_exists( + &db, + "some_int", + PropertyParentType::Event, + true, + 1, + PropertyValueType::Numeric, + ) + .await; + assert_propertydefinition_exists( + &db, + "some_float", + PropertyParentType::Event, + true, + 1, + PropertyValueType::Numeric, + ) + .await; + assert_propertydefinition_exists( + &db, + "some_bool", + PropertyParentType::Event, + false, + 1, + PropertyValueType::Boolean, + ) + .await; + assert_propertydefinition_exists( + &db, + "some_bool_as_string", + PropertyParentType::Event, + false, + 1, + PropertyValueType::Boolean, + ) + .await; +} + +async fn assert_eventdefinition_exists( + db: &PgPool, + name: &str, + team_id: i32, + last_seen_at: DateTime<Utc>, +) { + let count: Option<i64> = sqlx::query_scalar( + r#" + SELECT COUNT(*) + FROM posthog_eventdefinition + WHERE name = $1 AND team_id = $2 AND last_seen_at = $3 + "#, + ) + .bind(name) + .bind(team_id) + .bind(last_seen_at) + .fetch_one(db) + .await + .unwrap(); + + assert_eq!(count, Some(1)); +} + +async fn
assert_propertydefinition_exists( + db: &PgPool, + name: &str, + event_type: PropertyParentType, + is_numerical: bool, + team_id: i32, + property_type: PropertyValueType, +) { + println!("Checking property definition for {}", name); + let count: Option<i64> = sqlx::query_scalar( + r#" + SELECT COUNT(*) + FROM posthog_propertydefinition + WHERE name = $1 AND type = $2 AND is_numerical = $3 AND team_id = $4 AND property_type = $5 + "#, + ) + .bind(name) + .bind(event_type as i32) + .bind(is_numerical) + .bind(team_id) + .bind(property_type.to_string()) + .fetch_one(db) + .await + .unwrap(); + + assert_eq!(count, Some(1)); +}
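For intuition on the cache-warming backpressure introduced in config.rs and app_context.rs: the delay applied per batch in `AppContext::issue` scales linearly with the unused fraction of the cache and drops to zero once utilization passes `cache_warming_cutoff`. A minimal standalone sketch of that curve (the `warming_delay` helper is illustrative only, not code from this diff):

```rust
use std::time::Duration;

// Sketch of the slow-start described in config.rs: each batch write is delayed
// by the configured warming delay, scaled by the unused fraction of the cache,
// until utilization passes the cutoff and only real DB backpressure remains.
fn warming_delay(cache_consumed: f64, warming_delay_ms: u64, cutoff: f64) -> Duration {
    if cache_consumed >= cutoff {
        return Duration::ZERO; // cache is "hot": no artificial delay
    }
    Duration::from_millis((warming_delay_ms as f64 * (1.0 - cache_consumed)) as u64)
}

fn main() {
    // With the defaults above (1000ms delay, 0.5 cutoff):
    assert_eq!(warming_delay(0.0, 1000, 0.5), Duration::from_millis(1000)); // cold start
    assert_eq!(warming_delay(0.25, 1000, 0.5), Duration::from_millis(750));
    assert_eq!(warming_delay(0.75, 1000, 0.5), Duration::ZERO); // past the cutoff
}
```

Inflating per-batch time this way bounds throughput at roughly `max_concurrent_transactions * update_batch_size / (transaction_seconds + delay_seconds)`, which is the cap the config.rs comment describes loosening as the cache fills.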