From f0d94b0ddf0020828e5fce1294cd65dec5410b5a Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Tue, 27 Aug 2024 08:35:04 +0300 Subject: [PATCH] feat: Pull event/property definitions into its own service, so we can get it out of the plugin server (#24166) --- .github/workflows/rust-docker-build.yml | 1 + rust/Cargo.lock | 36 +- rust/Cargo.toml | 3 +- rust/common/serve_metrics/Cargo.toml | 13 + rust/common/serve_metrics/src/lib.rs | 82 +++++ rust/property-defs-rs/Cargo.toml | 25 ++ rust/property-defs-rs/src/app_context.rs | 36 ++ rust/property-defs-rs/src/config.rs | 66 ++++ rust/property-defs-rs/src/lib.rs | 4 + rust/property-defs-rs/src/main.rs | 145 ++++++++ rust/property-defs-rs/src/metrics_consts.rs | 6 + rust/property-defs-rs/src/types.rs | 346 ++++++++++++++++++++ 12 files changed, 760 insertions(+), 3 deletions(-) create mode 100644 rust/common/serve_metrics/Cargo.toml create mode 100644 rust/common/serve_metrics/src/lib.rs create mode 100644 rust/property-defs-rs/Cargo.toml create mode 100644 rust/property-defs-rs/src/app_context.rs create mode 100644 rust/property-defs-rs/src/config.rs create mode 100644 rust/property-defs-rs/src/lib.rs create mode 100644 rust/property-defs-rs/src/main.rs create mode 100644 rust/property-defs-rs/src/metrics_consts.rs create mode 100644 rust/property-defs-rs/src/types.rs diff --git a/.github/workflows/rust-docker-build.yml b/.github/workflows/rust-docker-build.yml index 5d636c19a8901..078fac107a400 100644 --- a/.github/workflows/rust-docker-build.yml +++ b/.github/workflows/rust-docker-build.yml @@ -21,6 +21,7 @@ jobs: - hook-worker - cyclotron-janitor - cyclotron-fetch + - property-defs-rs runs-on: depot-ubuntu-22.04-4 permissions: id-token: write # allow issuing OIDC tokens for this workflow run diff --git a/rust/Cargo.lock b/rust/Cargo.lock index ad5ec97b9218f..59078d1b019cc 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -665,9 +665,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.33" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ "android-tzdata", "iana-time-zone", @@ -2805,6 +2805,28 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "property-defs-rs" +version = "0.1.0" +dependencies = [ + "axum 0.7.5", + "chrono", + "envconfig", + "futures", + "health", + "metrics", + "rdkafka", + "serde", + "serde_json", + "serve-metrics", + "sqlx", + "time", + "tokio", + "tracing", + "tracing-subscriber", + "uuid", +] + [[package]] name = "prost" version = "0.12.4" @@ -3398,6 +3420,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serve-metrics" +version = "0.1.0" +dependencies = [ + "axum 0.7.5", + "metrics", + "metrics-exporter-prometheus", + "tokio", +] + [[package]] name = "sha1" version = "0.10.6" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 712a5099b5b0e..347530f99d1bd 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ + "property-defs-rs", "capture", "common/health", "common/metrics", @@ -40,7 +41,7 @@ axum = { version = "0.7.5", features = ["http2", "macros", "matched-path"] } axum-client-ip = "0.6.0" base64 = "0.22.0" bytes = "1" -chrono = { version = "0.4", features = ["default", "serde"]} +chrono = { version = "0.4.38", features = ["default", "serde"] } envconfig = "0.10.0" eyre = "0.6.9" flate2 = "1.0" diff --git a/rust/common/serve_metrics/Cargo.toml b/rust/common/serve_metrics/Cargo.toml new file mode 100644 index 0000000000000..05eb90c0bd29c --- /dev/null +++ b/rust/common/serve_metrics/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "serve-metrics" +version = "0.1.0" +edition = "2021" + +[lints] +workspace = true + +[dependencies] +axum = { workspace = true } +tokio = { workspace = true } +metrics-exporter-prometheus = { workspace = true } +metrics = { workspace = true } \ No newline at end of file diff --git a/rust/common/serve_metrics/src/lib.rs b/rust/common/serve_metrics/src/lib.rs new file mode 100644 index 0000000000000..904aad3402b33 --- /dev/null +++ b/rust/common/serve_metrics/src/lib.rs @@ -0,0 +1,82 @@ +use std::time::{Instant, SystemTime}; + +use axum::{ + body::Body, extract::MatchedPath, http::Request, middleware::Next, response::IntoResponse, + routing::get, Router, +}; +use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; + +/// Bind a `TcpListener` on the provided bind address to serve a `Router` on it. +/// This function is intended to take a Router as returned by `setup_metrics_router`, potentially with more routes added by the caller. +pub async fn serve(router: Router, bind: &str) -> Result<(), std::io::Error> { + let listener = tokio::net::TcpListener::bind(bind).await?; + + axum::serve(listener, router).await?; + + Ok(()) +} + +/// Add the prometheus endpoint and middleware to a router, should be called last. +pub fn setup_metrics_routes(router: Router) -> Router { + let recorder_handle = setup_metrics_recorder(); + + router + .route( + "/metrics", + get(move || std::future::ready(recorder_handle.render())), + ) + .layer(axum::middleware::from_fn(track_metrics)) +} + +pub fn setup_metrics_recorder() -> PrometheusHandle { + const BUCKETS: &[f64] = &[ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 50.0, 100.0, 250.0, + ]; + + PrometheusBuilder::new() + .set_buckets(BUCKETS) + .unwrap() + .install_recorder() + .unwrap() +} + +/// Middleware to record some common HTTP metrics +/// Someday tower-http might provide a metrics middleware: https://github.com/tower-rs/tower-http/issues/57 +pub async fn track_metrics(req: Request, next: Next) -> impl IntoResponse { + let start = Instant::now(); + + let path = if let Some(matched_path) = req.extensions().get::() { + matched_path.as_str().to_owned() + } else { + req.uri().path().to_owned() + }; + + let method = req.method().clone(); + + // Run the rest of the request handling first, so we can measure it and get response + // codes. + let response = next.run(req).await; + + let latency = start.elapsed().as_secs_f64(); + let status = response.status().as_u16().to_string(); + + let labels = [ + ("method", method.to_string()), + ("path", path), + ("status", status), + ]; + + metrics::counter!("http_requests_total", &labels).increment(1); + metrics::histogram!("http_requests_duration_seconds", &labels).record(latency); + + response +} + +/// Returns the number of seconds since the Unix epoch, to use in prom gauges. +/// Saturates to zero if the system time is set before epoch. +pub fn get_current_timestamp_seconds() -> f64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as f64 +} diff --git a/rust/property-defs-rs/Cargo.toml b/rust/property-defs-rs/Cargo.toml new file mode 100644 index 0000000000000..177b159c9093b --- /dev/null +++ b/rust/property-defs-rs/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "property-defs-rs" +version = "0.1.0" +edition = "2021" + +[dependencies] +uuid = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +rdkafka = { workspace = true } +tokio = { workspace = true } +envconfig = {workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +sqlx = { workspace = true } +futures = { workspace = true } +health = { path = "../common/health" } +time = { workspace = true } +axum = { workspace = true } +serve-metrics = { path = "../common/serve_metrics" } +metrics = { workspace = true } +chrono = { workspace = true } + +[lints] +workspace = true diff --git a/rust/property-defs-rs/src/app_context.rs b/rust/property-defs-rs/src/app_context.rs new file mode 100644 index 0000000000000..317ff2c355925 --- /dev/null +++ b/rust/property-defs-rs/src/app_context.rs @@ -0,0 +1,36 @@ +use std::collections::HashSet; + +use health::{HealthHandle, HealthRegistry}; +use sqlx::{postgres::PgPoolOptions, PgPool}; + +use crate::{config::Config, metrics_consts::UPDATES_ISSUED, types::Update}; + +pub struct AppContext { + pub pool: PgPool, + pub liveness: HealthRegistry, + pub worker_liveness: HealthHandle, +} + +impl AppContext { + pub async fn new(config: &Config) -> Result { + let options = PgPoolOptions::new().max_connections(config.max_pg_connections); + + let pool = options.connect(&config.database_url).await?; + + let liveness: HealthRegistry = HealthRegistry::new("liveness"); + let worker_liveness = liveness + .register("worker".to_string(), time::Duration::seconds(60)) + .await; + + Ok(Self { + pool, + liveness, + worker_liveness, + }) + } + + pub async fn issue(&self, updates: HashSet) -> Result<(), sqlx::Error> { + metrics::counter!(UPDATES_ISSUED).increment(updates.len() as u64); + Ok(()) + } +} diff --git a/rust/property-defs-rs/src/config.rs b/rust/property-defs-rs/src/config.rs new file mode 100644 index 0000000000000..f74b3a90886e9 --- /dev/null +++ b/rust/property-defs-rs/src/config.rs @@ -0,0 +1,66 @@ +use envconfig::Envconfig; +use rdkafka::ClientConfig; + +#[derive(Envconfig, Clone)] +pub struct Config { + #[envconfig(default = "postgres://posthog:posthog@localhost:5432/posthog")] + pub database_url: String, + + #[envconfig(default = "10")] + pub max_pg_connections: u32, + + #[envconfig(nested = true)] + pub kafka: KafkaConfig, + + #[envconfig(default = "10")] + pub max_concurrent_transactions: usize, + + #[envconfig(default = "10000")] + pub max_batch_size: usize, + + // If a worker recieves a batch smaller than this, it will simply not commit the offset and + // sleep for a while, since DB ops/event scales inversely to batch size + #[envconfig(default = "1000")] + pub min_batch_size: usize, + + #[envconfig(default = "100")] + pub next_event_wait_timeout_ms: u64, + + #[envconfig(from = "BIND_HOST", default = "::")] + pub host: String, + + #[envconfig(from = "BIND_PORT", default = "3301")] + pub port: u16, +} + +#[derive(Envconfig, Clone)] +pub struct KafkaConfig { + #[envconfig(default = "kafka:9092")] + pub kafka_hosts: String, + #[envconfig(default = "clickhouse_events_json")] + pub event_topic: String, + #[envconfig(default = "false")] + pub kafka_tls: bool, + #[envconfig(default = "false")] + pub verify_ssl_certificate: bool, + #[envconfig(default = "autocomplete-rs")] + pub consumer_group: String, +} + +impl From<&KafkaConfig> for ClientConfig { + fn from(config: &KafkaConfig) -> Self { + let mut client_config = ClientConfig::new(); + client_config + .set("bootstrap.servers", &config.kafka_hosts) + .set("statistics.interval.ms", "10000") + .set("group.id", config.consumer_group.clone()); + + if config.kafka_tls { + client_config.set("security.protocol", "ssl").set( + "enable.ssl.certificate.verification", + config.verify_ssl_certificate.to_string(), + ); + }; + client_config + } +} diff --git a/rust/property-defs-rs/src/lib.rs b/rust/property-defs-rs/src/lib.rs new file mode 100644 index 0000000000000..7c639d72efa90 --- /dev/null +++ b/rust/property-defs-rs/src/lib.rs @@ -0,0 +1,4 @@ +pub mod app_context; +pub mod config; +pub mod metrics_consts; +pub mod types; diff --git a/rust/property-defs-rs/src/main.rs b/rust/property-defs-rs/src/main.rs new file mode 100644 index 0000000000000..e502daae26c9f --- /dev/null +++ b/rust/property-defs-rs/src/main.rs @@ -0,0 +1,145 @@ +use std::{collections::HashSet, sync::Arc, time::Duration}; + +use axum::{routing::get, Router}; +use envconfig::Envconfig; +use futures::future::ready; +use property_defs_rs::{ + app_context::AppContext, + config::Config, + metrics_consts::{BATCH_SKIPPED, EVENTS_RECEIVED, FORCED_SMALL_BATCH, SMALL_BATCH_SLEEP}, + types::{Event, Update}, +}; +use rdkafka::{ + consumer::{Consumer, StreamConsumer}, + message::BorrowedMessage, + ClientConfig, Message, +}; +use serve_metrics::{serve, setup_metrics_routes}; +use tokio::{select, task::JoinHandle, time::sleep}; +use tracing::{info, warn}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; + +fn setup_tracing() { + let log_layer: tracing_subscriber::filter::Filtered< + tracing_subscriber::fmt::Layer, + EnvFilter, + tracing_subscriber::Registry, + > = tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env()); + tracing_subscriber::registry().with(log_layer).init(); +} + +pub async fn index() -> &'static str { + "property definitions service" +} + +fn start_health_liveness_server(config: &Config, context: Arc) -> JoinHandle<()> { + let config = config.clone(); + let router = Router::new() + .route("/", get(index)) + .route("/_readiness", get(index)) + .route( + "/_liveness", + get(move || ready(context.liveness.get_status())), + ); + let router = setup_metrics_routes(router); + let bind = format!("{}:{}", config.host, config.port); + tokio::task::spawn(async move { + serve(router, &bind) + .await + .expect("failed to start serving metrics"); + }) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + setup_tracing(); + info!("Starting up..."); + + let config = Config::init_from_env()?; + + let kafka_config: ClientConfig = (&config.kafka).into(); + + let consumer: StreamConsumer = kafka_config.create()?; + + let context = Arc::new(AppContext::new(&config).await?); + + consumer.subscribe(&[config.kafka.event_topic.as_str()])?; + + info!("Subscribed to topic: {}", config.kafka.event_topic); + + start_health_liveness_server(&config, context.clone()); + + let mut batch = Vec::with_capacity(config.max_batch_size); + + let mut sleep_count = 0; + loop { + context.worker_liveness.report_healthy().await; + + while batch.len() < config.max_batch_size { + // Try to grab from the consumer, but use a select! to timeout if we'd block for more than some time + select! { + res = consumer.recv() => { + batch.push(res?); // Workers die on an kafka error + } + _ = sleep(Duration::from_millis(config.next_event_wait_timeout_ms)) => { + break; + } + } + } + + // We only process batches over a certain threshold, unless we haven't received anything in a while, to reduce DB load + if batch.len() < config.min_batch_size { + sleep_count += 1; + info!("Batch size is less than min_batch_size, sleeping for 2 seconds"); + metrics::counter!(BATCH_SKIPPED).increment(1); + sleep(Duration::from_millis(2000)).await; + if sleep_count > 10 { + warn!("Slept too many times, continuing with a small batch"); + metrics::counter!(FORCED_SMALL_BATCH).increment(1); + } else { + metrics::counter!(SMALL_BATCH_SLEEP).increment(1); + continue; + } + } + sleep_count = 0; + + metrics::counter!(EVENTS_RECEIVED).increment(batch.len() as u64); + + let updates: HashSet = batch + .drain(..) + .filter_map(message_to_event) + .flat_map(Event::into_updates) + .filter_map(filter_cached) + .collect(); + + context.issue(updates).await?; + } +} + +// This copies event properties, which means the total resident memory usage is higher than we'd like, and that constrains +// our batch size. serde_json provides no zero-copy way to parse a JSON object, so we're stuck with this for now. +fn message_to_event(msg: BorrowedMessage) -> Option { + let Some(payload) = msg.payload() else { + warn!("Received empty event"); + metrics::counter!("empty_event").increment(1); + return None; + }; + + let event = serde_json::from_slice::(payload); + let event = match event { + Ok(e) => e, + Err(e) => { + metrics::counter!("event_parse_error").increment(1); + warn!("Failed to parse event: {:?}", e); + return None; + } + }; + Some(event) +} + +// TODO: this is where caching would go, if we had any. Could probably use a bloom filter or something, +// rather than storing the entire update in memory, if we wanted to store some HUGE number of updates and +// be /really/ good about not hitting the DB when we don't need to. Right now this is just a no-op. +fn filter_cached(update: Update) -> Option { + Some(update) +} diff --git a/rust/property-defs-rs/src/metrics_consts.rs b/rust/property-defs-rs/src/metrics_consts.rs new file mode 100644 index 0000000000000..5cb4ba0091f61 --- /dev/null +++ b/rust/property-defs-rs/src/metrics_consts.rs @@ -0,0 +1,6 @@ +pub const UPDATES_ISSUED: &str = "prop_defs_issued_updates"; +pub const BATCH_SKIPPED: &str = "prop_defs_batch_skipped"; +pub const EVENTS_RECEIVED: &str = "prop_defs_events_received"; +pub const EVENTS_SKIPPED: &str = "prop_defs_events_skipped"; +pub const FORCED_SMALL_BATCH: &str = "prop_defs_forced_small_batch"; +pub const SMALL_BATCH_SLEEP: &str = "prop_defs_small_batch_sleep"; diff --git a/rust/property-defs-rs/src/types.rs b/rust/property-defs-rs/src/types.rs new file mode 100644 index 0000000000000..6312fc564d849 --- /dev/null +++ b/rust/property-defs-rs/src/types.rs @@ -0,0 +1,346 @@ +use std::{fmt, hash::Hash, str::FromStr}; + +use chrono::{DateTime, Duration, DurationRound, RoundingError, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use tracing::warn; +use uuid::Uuid; + +use crate::metrics_consts::EVENTS_SKIPPED; + +pub const SKIP_PROPERTIES: [&str; 9] = [ + "$set", + "$set_once", + "$unset", + "$group_0", + "$group_1", + "$group_2", + "$group_3", + "$group_4", + "$groups", +]; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +pub enum PropertyParentType { + Event = 1, + Person = 2, + Group = 3, + Session = 4, +} + +impl From for i32 { + fn from(parent_type: PropertyParentType) -> i32 { + match parent_type { + PropertyParentType::Event => 1, + PropertyParentType::Person => 2, + PropertyParentType::Group => 3, + PropertyParentType::Session => 4, + } + } +} + +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] +pub enum PropertyValueType { + DateTime, + String, + Numeric, + Boolean, + Duration, +} + +impl fmt::Display for PropertyValueType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + PropertyValueType::DateTime => write!(f, "DateTime"), + PropertyValueType::String => write!(f, "String"), + PropertyValueType::Numeric => write!(f, "Numeric"), + PropertyValueType::Boolean => write!(f, "Boolean"), + PropertyValueType::Duration => write!(f, "Duration"), + } + } +} + +// The grouptypemapping table uses i32's, but we get group types by name, so we have to resolve them before DB writes, sigh +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum GroupType { + Unresolved(String), + Resolved(String, i32), +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct PropertyDefinition { + pub id: Uuid, + pub team_id: i32, + pub name: String, + pub is_numerical: bool, + pub property_type: Option, + pub event_type: Option, + pub group_type_index: Option, + pub property_type_format: Option, // Deprecated + pub volume_30_day: Option, // Deprecated + pub query_usage_30_day: Option, // Deprecated +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct EventDefinition { + pub id: Uuid, + pub name: String, + pub team_id: i32, + pub last_seen_at: DateTime, +} + +// Derived hash since these are keyed on all fields in the DB +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub struct EventProperty { + team_id: i32, + event: String, + property: String, +} + +// Represents a generic update, but comparable, allowing us to dedupe and cache updates +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub enum Update { + Event(EventDefinition), + Property(PropertyDefinition), + EventProperty(EventProperty), +} + +#[derive(Clone, Debug, Deserialize)] +pub struct Event { + pub team_id: i32, + pub event: String, + pub properties: Option, +} + +impl From<&Event> for EventDefinition { + fn from(event: &Event) -> Self { + EventDefinition { + id: Uuid::now_v7(), + name: sanitize_event_name(&event.event), + team_id: event.team_id, + // We round last seen to the nearest day, as per the TS impl. Unwrap is safe here because we + // the duration is positive, non-zero, and smaller than time since epoch + last_seen_at: floor_datetime(Utc::now(), Duration::days(1)).unwrap(), + } + } +} + +impl Event { + pub fn into_updates(self) -> Vec { + let team_id = self.team_id; + let event = self.event.clone(); + + let updates = self.into_updates_inner(); + if updates.len() > 10_000 { + warn!( + "Event {} for team {} has more than 10,000 properties, skipping", + event, team_id + ); + metrics::counter!(EVENTS_SKIPPED).increment(1); + return vec![]; + } + + updates + } + + fn into_updates_inner(self) -> Vec { + let mut updates = vec![Update::Event(EventDefinition::from(&self))]; + let Some(props) = &self.properties else { + return updates; + }; + + let Ok(props) = Value::from_str(props) else { + return updates; + }; + + let Value::Object(props) = props else { + return updates; + }; + + // If this is a groupidentify event, we ONLY bubble up the group properties + if self.event == "$groupidentify" { + let Some(Value::String(group_type)) = props.get("$group_type") else { + return updates; + }; + let group_type = GroupType::Unresolved(group_type.clone()); + + let Some(group_properties) = props.get("$group_set") else { + return updates; + }; + + let Value::Object(group_properties) = group_properties else { + return updates; + }; + + self.get_props_from_object( + &mut updates, + group_properties, + PropertyParentType::Group, + Some(group_type), + ); + return updates; + } + + // Grab the "ordinary" (non-person) event properties + self.get_props_from_object(&mut updates, &props, PropertyParentType::Event, None); + + // If there are any person properties, also push those into the flat property map. + if let Some(Value::Object(set_props)) = props.get("$set") { + self.get_props_from_object(&mut updates, set_props, PropertyParentType::Person, None) + } + if let Some(Value::Object(set_once_props)) = props.get("$set_once") { + self.get_props_from_object( + &mut updates, + set_once_props, + PropertyParentType::Person, + None, + ) + } + + updates + } + + fn get_props_from_object( + &self, + updates: &mut Vec, + set: &Map, + parent_type: PropertyParentType, + group_type: Option, + ) { + updates.reserve(set.len() * 2); + for (key, value) in set { + if SKIP_PROPERTIES.contains(&key.as_str()) && parent_type == PropertyParentType::Event { + continue; + } + + updates.push(Update::EventProperty(EventProperty { + team_id: self.team_id, + event: self.event.clone(), + property: key.clone(), + })); + + let property_type = detect_property_type(key, value); + let is_numerical = matches!(property_type, Some(PropertyValueType::Numeric)); + + let def = PropertyDefinition { + id: Uuid::now_v7(), + team_id: self.team_id, + name: key.clone(), + is_numerical, + property_type, + event_type: Some(parent_type), + group_type_index: group_type.clone(), + property_type_format: None, + volume_30_day: None, + query_usage_30_day: None, + }; + updates.push(Update::Property(def)); + } + } +} + +fn detect_property_type(key: &str, value: &Value) -> Option { + // There are a whole set of special cases here, taken from the TS + if key.starts_with("utm_") { + // utm_ prefixed properties should always be detected as strings. + // Sometimes the first value sent looks like a number, event though + // subsequent values are not. See + // https://github.com/PostHog/posthog/issues/12529 for more context. + return Some(PropertyValueType::String); + } + if key.starts_with("$feature/") { + // $feature/ prefixed properties should always be detected as strings. + // These are feature flag values, and can be boolean or string. + // Sometimes the first value sent is boolean (because flag isn't enabled) while + // subsequent values are not. We don't want this to be misunderstood as a boolean. + return Some(PropertyValueType::String); + } + + if key == "$feature_flag_response" { + // $feature_flag_response properties should always be detected as strings. + // These are feature flag values, and can be boolean or string. + // Sometimes the first value sent is boolean (because flag isn't enabled) while + // subsequent values are not. We don't want this to be misunderstood as a boolean. + return Some(PropertyValueType::String); + } + + if key.starts_with("$survey_response") { + // NB: $survey_responses are collected in an interesting way, where the first + // response is called `$survey_response` and subsequent responses are called + // `$survey_response_2`, `$survey_response_3`, etc. So, this check should auto-cast + // all survey responses to strings, and $survey_response properties should always be detected as strings. + return Some(PropertyValueType::String); + } + + match value { + Value::String(s) => { + let s = &s.trim().to_lowercase(); + if s == "true" || s == "false" { + Some(PropertyValueType::Boolean) + } else { + // TODO - we should try to auto-detect datetime strings here, but I'm skipping the chunk of regex necessary to do it for v0 + Some(PropertyValueType::String) + } + } + Value::Number(_) => { + // TODO - this is a divergence from the TS impl - the TS also checks if the contained number is + // "likely" to be a unix timestamp on the basis of the number of characters. I have mixed feelings about this, + // so I'm going to leave it as just checking the key for now. This means we're being /less/ strict with datetime + // detection here than in the TS + if key.to_lowercase().contains("timestamp") || key.to_lowercase().contains("time") { + Some(PropertyValueType::DateTime) + } else { + Some(PropertyValueType::Numeric) + } + } + Value::Bool(_) => Some(PropertyValueType::Boolean), + _ => None, + } +} + +fn sanitize_event_name(event_name: &str) -> String { + event_name.replace('\u{0000}', "\u{FFFD}") +} + +// These hash impls correspond to DB uniqueness constraints, pulled from the TS + +impl Hash for PropertyDefinition { + fn hash(&self, state: &mut H) { + self.team_id.hash(state); + self.name.hash(state); + self.event_type.hash(state); + self.group_type_index.hash(state); + } +} + +impl Hash for EventDefinition { + fn hash(&self, state: &mut H) { + self.team_id.hash(state); + self.name.hash(state); + self.last_seen_at.hash(state) + } +} + +// Ensure group type hashes identically regardless of whether it's resolved or not. Note that if +// someone changes the name associated with a group type, all subsequent events will hash differently +// because of this, but that seems fine - it just means a few extra DB ops issued, we index on the i32 +// at write time anyway +impl Hash for GroupType { + fn hash(&self, state: &mut H) { + match self { + GroupType::Unresolved(name) => name.hash(state), + GroupType::Resolved(name, _) => name.hash(state), + } + } +} + +fn floor_datetime(dt: DateTime, duration: Duration) -> Result, RoundingError> { + let rounded = dt.duration_round(duration)?; + + // If we rounded up + if rounded > dt { + Ok(rounded - duration) + } else { + Ok(rounded) + } +}