From 8b12ff08a0db9e6e708615558d4e1ca6b8ac258a Mon Sep 17 00:00:00 2001 From: David Newell Date: Tue, 29 Oct 2024 13:43:13 +0000 Subject: [PATCH] issue lookup --- rust/cymbal/src/main.rs | 28 ++++++++++++++++++++++++++- rust/cymbal/src/types/db.rs | 37 ++++++++++++++++++++++++++++++++++++ rust/cymbal/src/types/mod.rs | 1 + 3 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 rust/cymbal/src/types/db.rs diff --git a/rust/cymbal/src/main.rs b/rust/cymbal/src/main.rs index 29086defed700..277205ea83467 100644 --- a/rust/cymbal/src/main.rs +++ b/rust/cymbal/src/main.rs @@ -9,7 +9,7 @@ use cymbal::{ config::Config, error::Error, metric_consts::{ERRORS, EVENT_RECEIVED, MAIN_LOOP_TIME, PER_STACK_TIME, STACK_PROCESSED}, - types::{frames::RawFrame, ErrProps}, + types::{db::ErrorTrackingGroup, frames::RawFrame, ErrProps}, }; use envconfig::Envconfig; use tokio::task::JoinHandle; @@ -138,6 +138,32 @@ async fn main() -> Result<(), Error> { .fin(); whole_loop.label("had_frame", "true").fin(); + let transaction_time = common_metrics::timing_guard(UPDATE_TRANSACTION_TIME, &[]); + if !self.skip_writes && !self.skip_reads { + let mut tx = context.pool.begin().await?; + + let update = ErrorTrackingGroup { + fingerprint: String("123456"), + team_id: event.team_id, + }; + + match update.issue(&mut *tx).await { + Ok(_) => {} + Err(sqlx::Error::Database(e)) if e.constraint().is_some() => { + // If we hit a constraint violation, we just skip the update. We see + // this in production for group-type-indexes not being resolved, and it's + // not worth aborting the whole batch for. + warn!("Failed to issue update: {:?}", e); + } + Err(e) => { + tx.rollback().await?; + return Err(e); + } + } + tx.commit().await?; + } + transaction_time.fin(); + metrics::counter!(STACK_PROCESSED).increment(1); } } diff --git a/rust/cymbal/src/types/db.rs b/rust/cymbal/src/types/db.rs new file mode 100644 index 0000000000000..662c2ec8daa33 --- /dev/null +++ b/rust/cymbal/src/types/db.rs @@ -0,0 +1,37 @@ +use sqlx::{types::Uuid, Executor, Postgres}; +use std::hash::Hash; + +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)] +pub struct ErrorTrackingGroup { + pub fingerprint: String, + pub team_id: i32, +} + +impl Hash for ErrorTrackingGroup { + fn hash(&self, state: &mut H) { + self.team_id.hash(state); + self.fingerprint.hash(state); + } +} + +impl ErrorTrackingGroup { + pub async fn issue<'c, E>(&self, executor: E) -> Result<(), sqlx::Error> + where + E: Executor<'c, Database = Postgres>, + { + sqlx::query!( + r#" + INSERT INTO posthog_errortrackinggroup (id, fingerprint, team_id, created_at) + VALUES ($1, $2, $3, NOW()) ON CONFLICT + ON CONSTRAINT posthog_eventdefinition_team_id_name_80fa0b87_uniq + DO UPDATE SET created_at = $4 + "#, + Uuid::now_v7(), + self.fingerprint, + self.team_id, + ) + .execute(executor) + .await + .map(|_| ()) + } +} diff --git a/rust/cymbal/src/types/mod.rs b/rust/cymbal/src/types/mod.rs index 3a420e6410601..3e23fcde3818e 100644 --- a/rust/cymbal/src/types/mod.rs +++ b/rust/cymbal/src/types/mod.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; use serde_json::Value; +pub mod db; pub mod frames; #[derive(Debug, Deserialize, Serialize, Clone)]