Skip to content

Commit

Permalink
main flow
Browse files Browse the repository at this point in the history
  • Loading branch information
daibhin committed Oct 29, 2024
1 parent 8b12ff0 commit f939251
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 52 deletions.
2 changes: 1 addition & 1 deletion rust/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[env]
# Force SQLX to run in offline mode for CI. Devs can change this if they want, to live code against the DB,
# but we use it at the workspace level here to allow use of sqlx macros across all crates
SQLX_OFFLINE = "true"
SQLX_OFFLINE = "false"
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/cymbal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ serde_json = { workspace = true }
serde = { workspace = true }
sourcemap = "9.0.0"
reqwest = { workspace = true }
uuid = { workspace = true }

[dev-dependencies]
httpmock = { workspace = true }
Expand Down
33 changes: 9 additions & 24 deletions rust/cymbal/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ use cymbal::{
config::Config,
error::Error,
metric_consts::{ERRORS, EVENT_RECEIVED, MAIN_LOOP_TIME, PER_STACK_TIME, STACK_PROCESSED},
types::{db::ErrorTrackingGroup, frames::RawFrame, ErrProps},
types::{
db::{find_error_tracking_issue_fingerprint, ErrorTrackingGroup},
frames::RawFrame,
ErrProps,
},
};
use envconfig::Envconfig;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -138,31 +142,12 @@ async fn main() -> Result<(), Error> {
.fin();
whole_loop.label("had_frame", "true").fin();

let transaction_time = common_metrics::timing_guard(UPDATE_TRANSACTION_TIME, &[]);
if !self.skip_writes && !self.skip_reads {
let mut tx = context.pool.begin().await?;
let fingerprint = String("12345678");

let update = ErrorTrackingGroup {
fingerprint: String("123456"),
team_id: event.team_id,
};
let mut connection = self.pool.acquire().await?;

match update.issue(&mut *tx).await {
Ok(_) => {}
Err(sqlx::Error::Database(e)) if e.constraint().is_some() => {
// If we hit a constraint violation, we just skip the update. We see
// this in production for group-type-indexes not being resolved, and it's
// not worth aborting the whole batch for.
warn!("Failed to issue update: {:?}", e);
}
Err(e) => {
tx.rollback().await?;
return Err(e);
}
}
tx.commit().await?;
}
transaction_time.fin();
let issue_id =
error_tracking_issue_for_fingerprint(connection.as_mut(), event.team_id, fingerprint);

metrics::counter!(STACK_PROCESSED).increment(1);
}
Expand Down
101 changes: 74 additions & 27 deletions rust/cymbal/src/types/db.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,84 @@
use sqlx::{types::Uuid, Executor, Postgres};
use std::hash::Hash;
use uuid::Uuid;

#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct ErrorTrackingGroup {
pub struct ErrorTrackingIssueFingerprint {
pub id: Uuid,
pub team_id: i32,
pub fingerprint: String,
pub issue_id: Uuid,
pub version: i64,
}

pub struct ErrorTrackingGroup {
pub id: Uuid,
pub team_id: i32,
pub fingerprint: String,
}

impl Hash for ErrorTrackingGroup {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.team_id.hash(state);
self.fingerprint.hash(state);
pub async fn error_tracking_issue_for_fingerprint<'c, E>(
executor: E,
team_id: i32,
fingerprint: String,
) -> Uuid
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let res = sqlx::query_as!(
ErrorTrackingIssueFingerprint,
r#"
SELECT *
FROM posthog_errortrackingissuefingerprint
WHERE team_id = $1 AND fingerprint = $2
ORDER BY version DESC
"#,
team_id,
fingerprint
)
.fetch_one(executor)
.await;

match res {
Ok(issue_fingerprint) => issue_fingerprint.issue_id,
Err(_) => {
return create_error_tracking_issue(executor, team_id, fingerprint).await;
}
}
}

impl ErrorTrackingGroup {
pub async fn issue<'c, E>(&self, executor: E) -> Result<(), sqlx::Error>
where
E: Executor<'c, Database = Postgres>,
{
sqlx::query!(
r#"
INSERT INTO posthog_errortrackinggroup (id, fingerprint, team_id, created_at)
VALUES ($1, $2, $3, NOW()) ON CONFLICT
ON CONSTRAINT posthog_eventdefinition_team_id_name_80fa0b87_uniq
DO UPDATE SET created_at = $4
pub async fn create_error_tracking_issue<'c, E>(
executor: E,
team_id: i32,
fingerprint: String,
) -> Uuid
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let issue_id = Uuid::now_v7();

sqlx::query!(
r#"
INSERT INTO posthog_errortrackinggroup (id, team_id, fingerprint)
VALUES ($1, $2, $3)
"#,
Uuid::now_v7(),
self.fingerprint,
self.team_id,
)
.execute(executor)
.await
.map(|_| ())
}
issue_id,
team_id,
fingerprint
)
.execute(executor)
.await;

sqlx::query!(
r#"
INSERT INTO posthog_errortrackingissuefingerprint (team_id, fingerprint, issue_id)
VALUES ($1, $2, $3)
"#,
team_id,
fingerprint,
issue_id
)
.execute(executor)
.await;

// TODO: write to Kafka

issue_id
}

0 comments on commit f939251

Please sign in to comment.