diff --git a/.github/workflows/rust-docker-build.yml b/.github/workflows/rust-docker-build.yml
index 5d636c19a8901..078fac107a400 100644
--- a/.github/workflows/rust-docker-build.yml
+++ b/.github/workflows/rust-docker-build.yml
@@ -21,6 +21,7 @@ jobs:
- hook-worker
- cyclotron-janitor
- cyclotron-fetch
+ - property-defs-rs
runs-on: depot-ubuntu-22.04-4
permissions:
id-token: write # allow issuing OIDC tokens for this workflow run
diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index ad5ec97b9218f..59078d1b019cc 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -665,9 +665,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
-version = "0.4.33"
+version = "0.4.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb"
+checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
dependencies = [
"android-tzdata",
"iana-time-zone",
@@ -2805,6 +2805,28 @@ dependencies = [
"unicode-ident",
]
+[[package]]
+name = "property-defs-rs"
+version = "0.1.0"
+dependencies = [
+ "axum 0.7.5",
+ "chrono",
+ "envconfig",
+ "futures",
+ "health",
+ "metrics",
+ "rdkafka",
+ "serde",
+ "serde_json",
+ "serve-metrics",
+ "sqlx",
+ "time",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+ "uuid",
+]
+
[[package]]
name = "prost"
version = "0.12.4"
@@ -3398,6 +3420,16 @@ dependencies = [
"serde",
]
+[[package]]
+name = "serve-metrics"
+version = "0.1.0"
+dependencies = [
+ "axum 0.7.5",
+ "metrics",
+ "metrics-exporter-prometheus",
+ "tokio",
+]
+
[[package]]
name = "sha1"
version = "0.10.6"
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 712a5099b5b0e..347530f99d1bd 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -2,6 +2,7 @@
resolver = "2"
members = [
+ "property-defs-rs",
"capture",
"common/health",
"common/metrics",
@@ -40,7 +41,7 @@ axum = { version = "0.7.5", features = ["http2", "macros", "matched-path"] }
axum-client-ip = "0.6.0"
base64 = "0.22.0"
bytes = "1"
-chrono = { version = "0.4", features = ["default", "serde"]}
+chrono = { version = "0.4.38", features = ["default", "serde"] }
envconfig = "0.10.0"
eyre = "0.6.9"
flate2 = "1.0"
diff --git a/rust/common/serve_metrics/Cargo.toml b/rust/common/serve_metrics/Cargo.toml
new file mode 100644
index 0000000000000..05eb90c0bd29c
--- /dev/null
+++ b/rust/common/serve_metrics/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "serve-metrics"
+version = "0.1.0"
+edition = "2021"
+
+[lints]
+workspace = true
+
+[dependencies]
+axum = { workspace = true }
+tokio = { workspace = true }
+metrics-exporter-prometheus = { workspace = true }
+metrics = { workspace = true }
\ No newline at end of file
diff --git a/rust/common/serve_metrics/src/lib.rs b/rust/common/serve_metrics/src/lib.rs
new file mode 100644
index 0000000000000..904aad3402b33
--- /dev/null
+++ b/rust/common/serve_metrics/src/lib.rs
@@ -0,0 +1,82 @@
+use std::time::{Instant, SystemTime};
+
+use axum::{
+ body::Body, extract::MatchedPath, http::Request, middleware::Next, response::IntoResponse,
+ routing::get, Router,
+};
+use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
+
+/// Bind a `TcpListener` on the provided bind address and serve a `Router` on it.
+/// This function is intended to take a Router as returned by `setup_metrics_routes`, potentially with more routes added by the caller.
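+///
+/// A minimal usage sketch (the route shown is hypothetical, not part of this crate):
+/// ```ignore
+/// let router = setup_metrics_routes(Router::new().route("/", get(|| async { "ok" })));
+/// serve(router, "0.0.0.0:3301").await?;
+/// ```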
+pub async fn serve(router: Router, bind: &str) -> Result<(), std::io::Error> {
+ let listener = tokio::net::TcpListener::bind(bind).await?;
+
+ axum::serve(listener, router).await?;
+
+ Ok(())
+}
+
+/// Add the prometheus endpoint and middleware to a router; this should be called last.
+pub fn setup_metrics_routes(router: Router) -> Router {
+ let recorder_handle = setup_metrics_recorder();
+
+ router
+ .route(
+ "/metrics",
+ get(move || std::future::ready(recorder_handle.render())),
+ )
+ .layer(axum::middleware::from_fn(track_metrics))
+}
+
+pub fn setup_metrics_recorder() -> PrometheusHandle {
+ const BUCKETS: &[f64] = &[
+ 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 50.0, 100.0, 250.0,
+ ];
+
+ PrometheusBuilder::new()
+ .set_buckets(BUCKETS)
+ .unwrap()
+ .install_recorder()
+ .unwrap()
+}
+
+/// Middleware to record some common HTTP metrics
+/// Someday tower-http might provide a metrics middleware: https://github.com/tower-rs/tower-http/issues/57
+pub async fn track_metrics(req: Request<Body>, next: Next) -> impl IntoResponse {
+ let start = Instant::now();
+
+    let path = if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
+ matched_path.as_str().to_owned()
+ } else {
+ req.uri().path().to_owned()
+ };
+
+ let method = req.method().clone();
+
+ // Run the rest of the request handling first, so we can measure it and get response
+ // codes.
+ let response = next.run(req).await;
+
+ let latency = start.elapsed().as_secs_f64();
+ let status = response.status().as_u16().to_string();
+
+ let labels = [
+ ("method", method.to_string()),
+ ("path", path),
+ ("status", status),
+ ];
+
+ metrics::counter!("http_requests_total", &labels).increment(1);
+ metrics::histogram!("http_requests_duration_seconds", &labels).record(latency);
+
+ response
+}
+
+/// Returns the number of seconds since the Unix epoch, to use in prom gauges.
+/// Saturates to zero if the system time is set before epoch.
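+///
+/// A hedged usage sketch (the gauge name is illustrative):
+/// ```ignore
+/// metrics::gauge!("worker_last_heartbeat_seconds").set(get_current_timestamp_seconds());
+/// ```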
+pub fn get_current_timestamp_seconds() -> f64 {
+ SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as f64
+}
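+
+// A tiny illustrative test (an addition, not part of the original change) for the helper above.
+#[cfg(test)]
+mod tests {
+    use super::get_current_timestamp_seconds;
+
+    #[test]
+    fn timestamp_is_after_epoch() {
+        assert!(get_current_timestamp_seconds() > 0.0);
+    }
+}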
diff --git a/rust/property-defs-rs/Cargo.toml b/rust/property-defs-rs/Cargo.toml
new file mode 100644
index 0000000000000..177b159c9093b
--- /dev/null
+++ b/rust/property-defs-rs/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "property-defs-rs"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+uuid = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+rdkafka = { workspace = true }
+tokio = { workspace = true }
+envconfig = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
+sqlx = { workspace = true }
+futures = { workspace = true }
+health = { path = "../common/health" }
+time = { workspace = true }
+axum = { workspace = true }
+serve-metrics = { path = "../common/serve_metrics" }
+metrics = { workspace = true }
+chrono = { workspace = true }
+
+[lints]
+workspace = true
diff --git a/rust/property-defs-rs/src/app_context.rs b/rust/property-defs-rs/src/app_context.rs
new file mode 100644
index 0000000000000..317ff2c355925
--- /dev/null
+++ b/rust/property-defs-rs/src/app_context.rs
@@ -0,0 +1,36 @@
+use std::collections::HashSet;
+
+use health::{HealthHandle, HealthRegistry};
+use sqlx::{postgres::PgPoolOptions, PgPool};
+
+use crate::{config::Config, metrics_consts::UPDATES_ISSUED, types::Update};
+
+pub struct AppContext {
+ pub pool: PgPool,
+ pub liveness: HealthRegistry,
+ pub worker_liveness: HealthHandle,
+}
+
+impl AppContext {
+    pub async fn new(config: &Config) -> Result<Self, sqlx::Error> {
+ let options = PgPoolOptions::new().max_connections(config.max_pg_connections);
+
+ let pool = options.connect(&config.database_url).await?;
+
+ let liveness: HealthRegistry = HealthRegistry::new("liveness");
+ let worker_liveness = liveness
+ .register("worker".to_string(), time::Duration::seconds(60))
+ .await;
+
+ Ok(Self {
+ pool,
+ liveness,
+ worker_liveness,
+ })
+ }
+
+    pub async fn issue(&self, updates: HashSet<Update>) -> Result<(), sqlx::Error> {
+ metrics::counter!(UPDATES_ISSUED).increment(updates.len() as u64);
+ Ok(())
+ }
+}
diff --git a/rust/property-defs-rs/src/config.rs b/rust/property-defs-rs/src/config.rs
new file mode 100644
index 0000000000000..f74b3a90886e9
--- /dev/null
+++ b/rust/property-defs-rs/src/config.rs
@@ -0,0 +1,66 @@
+use envconfig::Envconfig;
+use rdkafka::ClientConfig;
+
+#[derive(Envconfig, Clone)]
+pub struct Config {
+ #[envconfig(default = "postgres://posthog:posthog@localhost:5432/posthog")]
+ pub database_url: String,
+
+ #[envconfig(default = "10")]
+ pub max_pg_connections: u32,
+
+ #[envconfig(nested = true)]
+ pub kafka: KafkaConfig,
+
+ #[envconfig(default = "10")]
+ pub max_concurrent_transactions: usize,
+
+ #[envconfig(default = "10000")]
+ pub max_batch_size: usize,
+
+    // If a worker receives a batch smaller than this, it will simply not commit the offset and
+    // sleep for a while, since DB ops per event scale inversely with batch size
+ #[envconfig(default = "1000")]
+ pub min_batch_size: usize,
+
+ #[envconfig(default = "100")]
+ pub next_event_wait_timeout_ms: u64,
+
+ #[envconfig(from = "BIND_HOST", default = "::")]
+ pub host: String,
+
+ #[envconfig(from = "BIND_PORT", default = "3301")]
+ pub port: u16,
+}
+
+#[derive(Envconfig, Clone)]
+pub struct KafkaConfig {
+ #[envconfig(default = "kafka:9092")]
+ pub kafka_hosts: String,
+ #[envconfig(default = "clickhouse_events_json")]
+ pub event_topic: String,
+ #[envconfig(default = "false")]
+ pub kafka_tls: bool,
+ #[envconfig(default = "false")]
+ pub verify_ssl_certificate: bool,
+ #[envconfig(default = "autocomplete-rs")]
+ pub consumer_group: String,
+}
+
+impl From<&KafkaConfig> for ClientConfig {
+ fn from(config: &KafkaConfig) -> Self {
+ let mut client_config = ClientConfig::new();
+ client_config
+ .set("bootstrap.servers", &config.kafka_hosts)
+ .set("statistics.interval.ms", "10000")
+ .set("group.id", config.consumer_group.clone());
+
+ if config.kafka_tls {
+ client_config.set("security.protocol", "ssl").set(
+ "enable.ssl.certificate.verification",
+ config.verify_ssl_certificate.to_string(),
+ );
+ };
+ client_config
+ }
+}
diff --git a/rust/property-defs-rs/src/lib.rs b/rust/property-defs-rs/src/lib.rs
new file mode 100644
index 0000000000000..7c639d72efa90
--- /dev/null
+++ b/rust/property-defs-rs/src/lib.rs
@@ -0,0 +1,4 @@
+pub mod app_context;
+pub mod config;
+pub mod metrics_consts;
+pub mod types;
diff --git a/rust/property-defs-rs/src/main.rs b/rust/property-defs-rs/src/main.rs
new file mode 100644
index 0000000000000..e502daae26c9f
--- /dev/null
+++ b/rust/property-defs-rs/src/main.rs
@@ -0,0 +1,145 @@
+use std::{collections::HashSet, sync::Arc, time::Duration};
+
+use axum::{routing::get, Router};
+use envconfig::Envconfig;
+use futures::future::ready;
+use property_defs_rs::{
+ app_context::AppContext,
+ config::Config,
+ metrics_consts::{BATCH_SKIPPED, EVENTS_RECEIVED, FORCED_SMALL_BATCH, SMALL_BATCH_SLEEP},
+ types::{Event, Update},
+};
+use rdkafka::{
+ consumer::{Consumer, StreamConsumer},
+ message::BorrowedMessage,
+ ClientConfig, Message,
+};
+use serve_metrics::{serve, setup_metrics_routes};
+use tokio::{select, task::JoinHandle, time::sleep};
+use tracing::{info, warn};
+use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
+
+fn setup_tracing() {
+ let log_layer: tracing_subscriber::filter::Filtered<
+        tracing_subscriber::fmt::Layer<tracing_subscriber::Registry>,
+ EnvFilter,
+ tracing_subscriber::Registry,
+ > = tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env());
+ tracing_subscriber::registry().with(log_layer).init();
+}
+
+pub async fn index() -> &'static str {
+ "property definitions service"
+}
+
+fn start_health_liveness_server(config: &Config, context: Arc<AppContext>) -> JoinHandle<()> {
+ let config = config.clone();
+ let router = Router::new()
+ .route("/", get(index))
+ .route("/_readiness", get(index))
+ .route(
+ "/_liveness",
+ get(move || ready(context.liveness.get_status())),
+ );
+ let router = setup_metrics_routes(router);
+ let bind = format!("{}:{}", config.host, config.port);
+ tokio::task::spawn(async move {
+ serve(router, &bind)
+ .await
+ .expect("failed to start serving metrics");
+ })
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ setup_tracing();
+ info!("Starting up...");
+
+ let config = Config::init_from_env()?;
+
+ let kafka_config: ClientConfig = (&config.kafka).into();
+
+ let consumer: StreamConsumer = kafka_config.create()?;
+
+ let context = Arc::new(AppContext::new(&config).await?);
+
+ consumer.subscribe(&[config.kafka.event_topic.as_str()])?;
+
+ info!("Subscribed to topic: {}", config.kafka.event_topic);
+
+ start_health_liveness_server(&config, context.clone());
+
+ let mut batch = Vec::with_capacity(config.max_batch_size);
+
+ let mut sleep_count = 0;
+ loop {
+ context.worker_liveness.report_healthy().await;
+
+ while batch.len() < config.max_batch_size {
+ // Try to grab from the consumer, but use a select! to timeout if we'd block for more than some time
+ select! {
+ res = consumer.recv() => {
+                batch.push(res?); // Workers die on a Kafka error
+ }
+ _ = sleep(Duration::from_millis(config.next_event_wait_timeout_ms)) => {
+ break;
+ }
+ }
+ }
+
+ // We only process batches over a certain threshold, unless we haven't received anything in a while, to reduce DB load
+ if batch.len() < config.min_batch_size {
+ sleep_count += 1;
+ info!("Batch size is less than min_batch_size, sleeping for 2 seconds");
+ metrics::counter!(BATCH_SKIPPED).increment(1);
+ sleep(Duration::from_millis(2000)).await;
+ if sleep_count > 10 {
+ warn!("Slept too many times, continuing with a small batch");
+ metrics::counter!(FORCED_SMALL_BATCH).increment(1);
+ } else {
+ metrics::counter!(SMALL_BATCH_SLEEP).increment(1);
+ continue;
+ }
+ }
+ sleep_count = 0;
+
+ metrics::counter!(EVENTS_RECEIVED).increment(batch.len() as u64);
+
+        let updates: HashSet<Update> = batch
+ .drain(..)
+ .filter_map(message_to_event)
+ .flat_map(Event::into_updates)
+ .filter_map(filter_cached)
+ .collect();
+
+ context.issue(updates).await?;
+ }
+}
+
+// This copies event properties, which means the total resident memory usage is higher than we'd like, and that constrains
+// our batch size. serde_json provides no zero-copy way to parse a JSON object, so we're stuck with this for now.
+fn message_to_event(msg: BorrowedMessage) -> Option<Event> {
+ let Some(payload) = msg.payload() else {
+ warn!("Received empty event");
+ metrics::counter!("empty_event").increment(1);
+ return None;
+ };
+
+    let event = serde_json::from_slice::<Event>(payload);
+ let event = match event {
+ Ok(e) => e,
+ Err(e) => {
+ metrics::counter!("event_parse_error").increment(1);
+ warn!("Failed to parse event: {:?}", e);
+ return None;
+ }
+ };
+ Some(event)
+}
+
+// TODO: this is where caching would go, if we had any. Could probably use a bloom filter or something,
+// rather than storing the entire update in memory, if we wanted to store some HUGE number of updates and
+// be /really/ good about not hitting the DB when we don't need to. Right now this is just a no-op.
+fn filter_cached(update: Update) -> Option<Update> {
+ Some(update)
+}
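+
+// A hedged sketch of what a cache-backed variant of the above could look like. It is not
+// wired into the pipeline; the `cache` parameter and its eviction policy are assumptions
+// (a bloom filter or LRU would bound memory better than an unbounded HashSet).
+#[allow(dead_code)]
+fn filter_cached_with_set(cache: &mut HashSet<Update>, update: Update) -> Option<Update> {
+    if cache.contains(&update) {
+        // This update was issued recently, so skip the DB write entirely.
+        return None;
+    }
+    cache.insert(update.clone());
+    Some(update)
+}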
diff --git a/rust/property-defs-rs/src/metrics_consts.rs b/rust/property-defs-rs/src/metrics_consts.rs
new file mode 100644
index 0000000000000..5cb4ba0091f61
--- /dev/null
+++ b/rust/property-defs-rs/src/metrics_consts.rs
@@ -0,0 +1,6 @@
+pub const UPDATES_ISSUED: &str = "prop_defs_issued_updates";
+pub const BATCH_SKIPPED: &str = "prop_defs_batch_skipped";
+pub const EVENTS_RECEIVED: &str = "prop_defs_events_received";
+pub const EVENTS_SKIPPED: &str = "prop_defs_events_skipped";
+pub const FORCED_SMALL_BATCH: &str = "prop_defs_forced_small_batch";
+pub const SMALL_BATCH_SLEEP: &str = "prop_defs_small_batch_sleep";
diff --git a/rust/property-defs-rs/src/types.rs b/rust/property-defs-rs/src/types.rs
new file mode 100644
index 0000000000000..6312fc564d849
--- /dev/null
+++ b/rust/property-defs-rs/src/types.rs
@@ -0,0 +1,346 @@
+use std::{fmt, hash::Hash, str::FromStr};
+
+use chrono::{DateTime, Duration, DurationRound, RoundingError, Utc};
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+use tracing::warn;
+use uuid::Uuid;
+
+use crate::metrics_consts::EVENTS_SKIPPED;
+
+pub const SKIP_PROPERTIES: [&str; 9] = [
+ "$set",
+ "$set_once",
+ "$unset",
+ "$group_0",
+ "$group_1",
+ "$group_2",
+ "$group_3",
+ "$group_4",
+ "$groups",
+];
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
+pub enum PropertyParentType {
+ Event = 1,
+ Person = 2,
+ Group = 3,
+ Session = 4,
+}
+
+impl From<PropertyParentType> for i32 {
+ fn from(parent_type: PropertyParentType) -> i32 {
+ match parent_type {
+ PropertyParentType::Event => 1,
+ PropertyParentType::Person => 2,
+ PropertyParentType::Group => 3,
+ PropertyParentType::Session => 4,
+ }
+ }
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
+pub enum PropertyValueType {
+ DateTime,
+ String,
+ Numeric,
+ Boolean,
+ Duration,
+}
+
+impl fmt::Display for PropertyValueType {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ PropertyValueType::DateTime => write!(f, "DateTime"),
+ PropertyValueType::String => write!(f, "String"),
+ PropertyValueType::Numeric => write!(f, "Numeric"),
+ PropertyValueType::Boolean => write!(f, "Boolean"),
+ PropertyValueType::Duration => write!(f, "Duration"),
+ }
+ }
+}
+
+// The grouptypemapping table uses i32's, but we get group types by name, so we have to resolve them before DB writes, sigh
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum GroupType {
+ Unresolved(String),
+ Resolved(String, i32),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct PropertyDefinition {
+ pub id: Uuid,
+ pub team_id: i32,
+ pub name: String,
+ pub is_numerical: bool,
+    pub property_type: Option<PropertyValueType>,
+    pub event_type: Option<PropertyParentType>,
+    pub group_type_index: Option<GroupType>,
+    pub property_type_format: Option<String>, // Deprecated
+    pub volume_30_day: Option<i64>,           // Deprecated
+    pub query_usage_30_day: Option<i64>,      // Deprecated
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct EventDefinition {
+ pub id: Uuid,
+ pub name: String,
+ pub team_id: i32,
+    pub last_seen_at: DateTime<Utc>,
+}
+
+// Derived hash since these are keyed on all fields in the DB
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+pub struct EventProperty {
+ team_id: i32,
+ event: String,
+ property: String,
+}
+
+// Represents a generic update, but comparable, allowing us to dedupe and cache updates
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+pub enum Update {
+ Event(EventDefinition),
+ Property(PropertyDefinition),
+ EventProperty(EventProperty),
+}
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct Event {
+ pub team_id: i32,
+ pub event: String,
+    pub properties: Option<String>,
+}
+
+impl From<&Event> for EventDefinition {
+ fn from(event: &Event) -> Self {
+ EventDefinition {
+ id: Uuid::now_v7(),
+ name: sanitize_event_name(&event.event),
+ team_id: event.team_id,
+            // We round last seen to the nearest day, as per the TS impl. Unwrap is safe here because
+            // the duration is positive, non-zero, and smaller than the time since epoch
+ last_seen_at: floor_datetime(Utc::now(), Duration::days(1)).unwrap(),
+ }
+ }
+}
+
+impl Event {
+    pub fn into_updates(self) -> Vec<Update> {
+ let team_id = self.team_id;
+ let event = self.event.clone();
+
+ let updates = self.into_updates_inner();
+ if updates.len() > 10_000 {
+ warn!(
+ "Event {} for team {} has more than 10,000 properties, skipping",
+ event, team_id
+ );
+ metrics::counter!(EVENTS_SKIPPED).increment(1);
+ return vec![];
+ }
+
+ updates
+ }
+
+    fn into_updates_inner(self) -> Vec<Update> {
+ let mut updates = vec![Update::Event(EventDefinition::from(&self))];
+ let Some(props) = &self.properties else {
+ return updates;
+ };
+
+ let Ok(props) = Value::from_str(props) else {
+ return updates;
+ };
+
+ let Value::Object(props) = props else {
+ return updates;
+ };
+
+ // If this is a groupidentify event, we ONLY bubble up the group properties
+ if self.event == "$groupidentify" {
+ let Some(Value::String(group_type)) = props.get("$group_type") else {
+ return updates;
+ };
+ let group_type = GroupType::Unresolved(group_type.clone());
+
+ let Some(group_properties) = props.get("$group_set") else {
+ return updates;
+ };
+
+ let Value::Object(group_properties) = group_properties else {
+ return updates;
+ };
+
+ self.get_props_from_object(
+ &mut updates,
+ group_properties,
+ PropertyParentType::Group,
+ Some(group_type),
+ );
+ return updates;
+ }
+
+ // Grab the "ordinary" (non-person) event properties
+ self.get_props_from_object(&mut updates, &props, PropertyParentType::Event, None);
+
+ // If there are any person properties, also push those into the flat property map.
+ if let Some(Value::Object(set_props)) = props.get("$set") {
+ self.get_props_from_object(&mut updates, set_props, PropertyParentType::Person, None)
+ }
+ if let Some(Value::Object(set_once_props)) = props.get("$set_once") {
+ self.get_props_from_object(
+ &mut updates,
+ set_once_props,
+ PropertyParentType::Person,
+ None,
+ )
+ }
+
+ updates
+ }
+
+ fn get_props_from_object(
+ &self,
+        updates: &mut Vec<Update>,
+        set: &Map<String, Value>,
+ parent_type: PropertyParentType,
+ group_type: Option,
+ ) {
+ updates.reserve(set.len() * 2);
+ for (key, value) in set {
+ if SKIP_PROPERTIES.contains(&key.as_str()) && parent_type == PropertyParentType::Event {
+ continue;
+ }
+
+ updates.push(Update::EventProperty(EventProperty {
+ team_id: self.team_id,
+ event: self.event.clone(),
+ property: key.clone(),
+ }));
+
+ let property_type = detect_property_type(key, value);
+ let is_numerical = matches!(property_type, Some(PropertyValueType::Numeric));
+
+ let def = PropertyDefinition {
+ id: Uuid::now_v7(),
+ team_id: self.team_id,
+ name: key.clone(),
+ is_numerical,
+ property_type,
+ event_type: Some(parent_type),
+ group_type_index: group_type.clone(),
+ property_type_format: None,
+ volume_30_day: None,
+ query_usage_30_day: None,
+ };
+ updates.push(Update::Property(def));
+ }
+ }
+}
+
+fn detect_property_type(key: &str, value: &Value) -> Option<PropertyValueType> {
+ // There are a whole set of special cases here, taken from the TS
+ if key.starts_with("utm_") {
+ // utm_ prefixed properties should always be detected as strings.
+        // Sometimes the first value sent looks like a number, even though
+ // subsequent values are not. See
+ // https://github.com/PostHog/posthog/issues/12529 for more context.
+ return Some(PropertyValueType::String);
+ }
+ if key.starts_with("$feature/") {
+ // $feature/ prefixed properties should always be detected as strings.
+ // These are feature flag values, and can be boolean or string.
+ // Sometimes the first value sent is boolean (because flag isn't enabled) while
+ // subsequent values are not. We don't want this to be misunderstood as a boolean.
+ return Some(PropertyValueType::String);
+ }
+
+ if key == "$feature_flag_response" {
+ // $feature_flag_response properties should always be detected as strings.
+ // These are feature flag values, and can be boolean or string.
+ // Sometimes the first value sent is boolean (because flag isn't enabled) while
+ // subsequent values are not. We don't want this to be misunderstood as a boolean.
+ return Some(PropertyValueType::String);
+ }
+
+ if key.starts_with("$survey_response") {
+ // NB: $survey_responses are collected in an interesting way, where the first
+ // response is called `$survey_response` and subsequent responses are called
+ // `$survey_response_2`, `$survey_response_3`, etc. So, this check should auto-cast
+ // all survey responses to strings, and $survey_response properties should always be detected as strings.
+ return Some(PropertyValueType::String);
+ }
+
+ match value {
+ Value::String(s) => {
+ let s = &s.trim().to_lowercase();
+ if s == "true" || s == "false" {
+ Some(PropertyValueType::Boolean)
+ } else {
+ // TODO - we should try to auto-detect datetime strings here, but I'm skipping the chunk of regex necessary to do it for v0
+ Some(PropertyValueType::String)
+ }
+ }
+ Value::Number(_) => {
+ // TODO - this is a divergence from the TS impl - the TS also checks if the contained number is
+ // "likely" to be a unix timestamp on the basis of the number of characters. I have mixed feelings about this,
+ // so I'm going to leave it as just checking the key for now. This means we're being /less/ strict with datetime
+ // detection here than in the TS
+ if key.to_lowercase().contains("timestamp") || key.to_lowercase().contains("time") {
+ Some(PropertyValueType::DateTime)
+ } else {
+ Some(PropertyValueType::Numeric)
+ }
+ }
+ Value::Bool(_) => Some(PropertyValueType::Boolean),
+ _ => None,
+ }
+}
+
+fn sanitize_event_name(event_name: &str) -> String {
+ event_name.replace('\u{0000}', "\u{FFFD}")
+}
+
+// These hash impls correspond to DB uniqueness constraints, pulled from the TS
+
+impl Hash for PropertyDefinition {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.team_id.hash(state);
+ self.name.hash(state);
+ self.event_type.hash(state);
+ self.group_type_index.hash(state);
+ }
+}
+
+impl Hash for EventDefinition {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.team_id.hash(state);
+ self.name.hash(state);
+ self.last_seen_at.hash(state)
+ }
+}
+
+// Ensure group type hashes identically regardless of whether it's resolved or not. Note that if
+// someone changes the name associated with a group type, all subsequent events will hash differently
+// because of this, but that seems fine - it just means a few extra DB ops issued, we index on the i32
+// at write time anyway
+impl Hash for GroupType {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ match self {
+ GroupType::Unresolved(name) => name.hash(state),
+ GroupType::Resolved(name, _) => name.hash(state),
+ }
+ }
+}
+
+fn floor_datetime(dt: DateTime<Utc>, duration: Duration) -> Result<DateTime<Utc>, RoundingError> {
+ let rounded = dt.duration_round(duration)?;
+
+ // If we rounded up
+ if rounded > dt {
+ Ok(rounded - duration)
+ } else {
+ Ok(rounded)
+ }
+}
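+
+// Illustrative tests (an addition sketching expected behaviour, not part of the original
+// change): they cover floor_datetime, the utm_ special case, and the $groupidentify path.
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use chrono::TimeZone;
+
+    #[test]
+    fn floor_datetime_rounds_down_to_the_day() {
+        let dt = Utc.with_ymd_and_hms(2024, 5, 17, 13, 45, 0).unwrap();
+        let floored = floor_datetime(dt, Duration::days(1)).unwrap();
+        assert_eq!(floored, Utc.with_ymd_and_hms(2024, 5, 17, 0, 0, 0).unwrap());
+    }
+
+    #[test]
+    fn utm_properties_are_always_strings() {
+        let detected = detect_property_type("utm_source", &Value::Number(42.into()));
+        assert_eq!(detected, Some(PropertyValueType::String));
+    }
+
+    #[test]
+    fn groupidentify_only_bubbles_up_group_properties() {
+        let event = Event {
+            team_id: 1,
+            event: "$groupidentify".to_string(),
+            properties: Some(r#"{"$group_type": "org", "$group_set": {"plan": "free"}}"#.to_string()),
+        };
+        // One EventDefinition, plus one EventProperty and one PropertyDefinition for "plan".
+        assert_eq!(event.into_updates().len(), 3);
+    }
+}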