diff --git a/rust/.cargo/config.toml b/rust/.cargo/config.toml index 2b5cb3c5910a0..e8a31361e9f70 100644 --- a/rust/.cargo/config.toml +++ b/rust/.cargo/config.toml @@ -1,4 +1,4 @@ [env] # Force SQLX to run in offline mode for CI. Devs can change this if they want, to live code against the DB, # but we use it at the workspace level here to allow use of sqlx macros across all crates -SQLX_OFFLINE = "true" +SQLX_OFFLINE = "false" diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 3f3c36157e36b..9fd4361c23b7c 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1002,6 +1002,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "uuid", ] [[package]] diff --git a/rust/cymbal/Cargo.toml b/rust/cymbal/Cargo.toml index 532730d12e36a..8fa1d3f46547c 100644 --- a/rust/cymbal/Cargo.toml +++ b/rust/cymbal/Cargo.toml @@ -23,6 +23,7 @@ serde_json = { workspace = true } serde = { workspace = true } sourcemap = "9.0.0" reqwest = { workspace = true } +uuid = { workspace = true } [dev-dependencies] httpmock = { workspace = true } diff --git a/rust/cymbal/src/main.rs b/rust/cymbal/src/main.rs index 277205ea83467..d07e455a470b9 100644 --- a/rust/cymbal/src/main.rs +++ b/rust/cymbal/src/main.rs @@ -9,7 +9,11 @@ use cymbal::{ config::Config, error::Error, metric_consts::{ERRORS, EVENT_RECEIVED, MAIN_LOOP_TIME, PER_STACK_TIME, STACK_PROCESSED}, - types::{db::ErrorTrackingGroup, frames::RawFrame, ErrProps}, + types::{ + db::{find_error_tracking_issue_fingerprint, ErrorTrackingGroup}, + frames::RawFrame, + ErrProps, + }, }; use envconfig::Envconfig; use tokio::task::JoinHandle; @@ -138,31 +142,12 @@ async fn main() -> Result<(), Error> { .fin(); whole_loop.label("had_frame", "true").fin(); - let transaction_time = common_metrics::timing_guard(UPDATE_TRANSACTION_TIME, &[]); - if !self.skip_writes && !self.skip_reads { - let mut tx = context.pool.begin().await?; + let fingerprint = String("12345678"); - let update = ErrorTrackingGroup { - fingerprint: String("123456"), - team_id: event.team_id, - }; + let mut connection = self.pool.acquire().await?; - match update.issue(&mut *tx).await { - Ok(_) => {} - Err(sqlx::Error::Database(e)) if e.constraint().is_some() => { - // If we hit a constraint violation, we just skip the update. We see - // this in production for group-type-indexes not being resolved, and it's - // not worth aborting the whole batch for. - warn!("Failed to issue update: {:?}", e); - } - Err(e) => { - tx.rollback().await?; - return Err(e); - } - } - tx.commit().await?; - } - transaction_time.fin(); + let issue_id = + error_tracking_issue_for_fingerprint(connection.as_mut(), event.team_id, fingerprint); metrics::counter!(STACK_PROCESSED).increment(1); } diff --git a/rust/cymbal/src/types/db.rs b/rust/cymbal/src/types/db.rs index 662c2ec8daa33..d4a8250f28a62 100644 --- a/rust/cymbal/src/types/db.rs +++ b/rust/cymbal/src/types/db.rs @@ -1,37 +1,84 @@ -use sqlx::{types::Uuid, Executor, Postgres}; -use std::hash::Hash; +use uuid::Uuid; -#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)] -pub struct ErrorTrackingGroup { +pub struct ErrorTrackingIssueFingerprint { + pub id: Uuid, + pub team_id: i32, pub fingerprint: String, + pub issue_id: Uuid, + pub version: i64, +} + +pub struct ErrorTrackingGroup { + pub id: Uuid, pub team_id: i32, + pub fingerprint: String, } -impl Hash for ErrorTrackingGroup { - fn hash(&self, state: &mut H) { - self.team_id.hash(state); - self.fingerprint.hash(state); +pub async fn error_tracking_issue_for_fingerprint<'c, E>( + executor: E, + team_id: i32, + fingerprint: String, +) -> Uuid +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let res = sqlx::query_as!( + ErrorTrackingIssueFingerprint, + r#" + SELECT * + FROM posthog_errortrackingissuefingerprint + WHERE team_id = $1 AND fingerprint = $2 + ORDER BY version DESC + "#, + team_id, + fingerprint + ) + .fetch_one(executor) + .await; + + match res { + Ok(issue_fingerprint) => issue_fingerprint.issue_id, + Err(_) => { + return create_error_tracking_issue(executor, team_id, fingerprint).await; + } } } -impl ErrorTrackingGroup { - pub async fn issue<'c, E>(&self, executor: E) -> Result<(), sqlx::Error> - where - E: Executor<'c, Database = Postgres>, - { - sqlx::query!( - r#" - INSERT INTO posthog_errortrackinggroup (id, fingerprint, team_id, created_at) - VALUES ($1, $2, $3, NOW()) ON CONFLICT - ON CONSTRAINT posthog_eventdefinition_team_id_name_80fa0b87_uniq - DO UPDATE SET created_at = $4 +pub async fn create_error_tracking_issue<'c, E>( + executor: E, + team_id: i32, + fingerprint: String, +) -> Uuid +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let issue_id = Uuid::now_v7(); + + sqlx::query!( + r#" + INSERT INTO posthog_errortrackinggroup (id, team_id, fingerprint) + VALUES ($1, $2, $3) "#, - Uuid::now_v7(), - self.fingerprint, - self.team_id, - ) - .execute(executor) - .await - .map(|_| ()) - } + issue_id, + team_id, + fingerprint + ) + .execute(executor) + .await; + + sqlx::query!( + r#" + INSERT INTO posthog_errortrackingissuefingerprint (team_id, fingerprint, issue_id) + VALUES ($1, $2, $3) + "#, + team_id, + fingerprint, + issue_id + ) + .execute(executor) + .await; + + // TODO: write to Kafka + + issue_id }