Skip to content

Commit

Permalink
issue lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
daibhin committed Oct 29, 2024
1 parent b238b34 commit 8b12ff0
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
28 changes: 27 additions & 1 deletion rust/cymbal/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use cymbal::{
config::Config,
error::Error,
metric_consts::{ERRORS, EVENT_RECEIVED, MAIN_LOOP_TIME, PER_STACK_TIME, STACK_PROCESSED},
types::{frames::RawFrame, ErrProps},
types::{db::ErrorTrackingGroup, frames::RawFrame, ErrProps},
};
use envconfig::Envconfig;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -138,6 +138,32 @@ async fn main() -> Result<(), Error> {
.fin();
whole_loop.label("had_frame", "true").fin();

let transaction_time = common_metrics::timing_guard(UPDATE_TRANSACTION_TIME, &[]);
if !self.skip_writes && !self.skip_reads {
let mut tx = context.pool.begin().await?;

let update = ErrorTrackingGroup {
fingerprint: String("123456"),
team_id: event.team_id,
};

match update.issue(&mut *tx).await {
Ok(_) => {}
Err(sqlx::Error::Database(e)) if e.constraint().is_some() => {
// If we hit a constraint violation, we just skip the update. We see
// this in production for group-type-indexes not being resolved, and it's
// not worth aborting the whole batch for.
warn!("Failed to issue update: {:?}", e);
}
Err(e) => {
tx.rollback().await?;
return Err(e);
}
}
tx.commit().await?;
}
transaction_time.fin();

metrics::counter!(STACK_PROCESSED).increment(1);
}
}
37 changes: 37 additions & 0 deletions rust/cymbal/src/types/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use sqlx::{types::Uuid, Executor, Postgres};
use std::hash::Hash;

#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct ErrorTrackingGroup {
pub fingerprint: String,
pub team_id: i32,
}

impl Hash for ErrorTrackingGroup {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.team_id.hash(state);
self.fingerprint.hash(state);
}
}

impl ErrorTrackingGroup {
pub async fn issue<'c, E>(&self, executor: E) -> Result<(), sqlx::Error>
where
E: Executor<'c, Database = Postgres>,
{
sqlx::query!(
r#"
INSERT INTO posthog_errortrackinggroup (id, fingerprint, team_id, created_at)
VALUES ($1, $2, $3, NOW()) ON CONFLICT
ON CONSTRAINT posthog_eventdefinition_team_id_name_80fa0b87_uniq
DO UPDATE SET created_at = $4
"#,
Uuid::now_v7(),
self.fingerprint,
self.team_id,
)
.execute(executor)
.await
.map(|_| ())
}
}
1 change: 1 addition & 0 deletions rust/cymbal/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;

pub mod db;
pub mod frames;

#[derive(Debug, Deserialize, Serialize, Clone)]
Expand Down

0 comments on commit 8b12ff0

Please sign in to comment.