From 8c18e2e155e26ae0ce140122852037703aa7a703 Mon Sep 17 00:00:00 2001 From: Neil Kakkar Date: Thu, 26 Dec 2024 13:49:59 +0000 Subject: [PATCH 1/2] feat(errors): Store error metadata for sorting --- .../models/error_tracking/error_tracking.py | 3 ++ posthog/tasks/error_tracking.py | 35 ++++++++++++++ posthog/tasks/scheduled.py | 7 +++ posthog/tasks/tasks.py | 7 +++ rust/Cargo.lock | 47 ++++++++++++++++--- rust/cymbal/Cargo.toml | 5 ++ rust/cymbal/src/app_context.rs | 4 ++ rust/cymbal/src/config.rs | 3 ++ rust/cymbal/src/error.rs | 2 + rust/cymbal/src/issue_resolution.rs | 38 +++++++++++++++ rust/cymbal/src/lib.rs | 7 ++- 11 files changed, 151 insertions(+), 7 deletions(-) create mode 100644 posthog/tasks/error_tracking.py diff --git a/posthog/models/error_tracking/error_tracking.py b/posthog/models/error_tracking/error_tracking.py index 47ae96334b5f7..b7d852e358d86 100644 --- a/posthog/models/error_tracking/error_tracking.py +++ b/posthog/models/error_tracking/error_tracking.py @@ -24,6 +24,9 @@ class Status(models.TextChoices): name = models.TextField(null=True, blank=True) description = models.TextField(null=True, blank=True) + last_seen = models.DateTimeField(null=True, blank=True, auto_now_add=True) + occurrences = models.BigIntegerField(default=0) + def merge(self, issue_ids: list[str]) -> None: fingerprints = resolve_fingerprints_for_issues(team_id=self.team.pk, issue_ids=issue_ids) diff --git a/posthog/tasks/error_tracking.py b/posthog/tasks/error_tracking.py new file mode 100644 index 0000000000000..20974aef76d93 --- /dev/null +++ b/posthog/tasks/error_tracking.py @@ -0,0 +1,35 @@ +from sentry_sdk import capture_exception +from posthog.models.error_tracking.error_tracking import ErrorTrackingIssue +from posthog.redis import get_client +from django.db.models import F + + +def populate_error_tracking_issue_metrics(): + client = get_client() + keys = client.scan_iter(match="issue_metadata:*", count=20) + for key in keys: + # check key is valid + parts = key.split(":") + if len(parts) != 3: + continue + + team_id = parts[1] + issue_id = parts[2] + + # Fetch data associated with the key + data = client.hgetall(key) + last_seen = data.get(b"last_seen") + occurrences = data.get(b"occurrences") + + try: + # update the issue and reset redis key + ErrorTrackingIssue.objects.filter(team=team_id, id=issue_id).update( + last_seen=last_seen, occurrences=F("occurrences") + occurrences + ) + # :TRICKY: Resetting the redis key here is prone to race conditions, + # but given we sync later and the data here is not critical, just an estimate for sorting, + # I'm skipping locking and letting this be. + client.delete(key) + except Exception as error: + capture_exception(error) + continue diff --git a/posthog/tasks/scheduled.py b/posthog/tasks/scheduled.py index 8912d1e362be3..fb9e2d51cca97 100644 --- a/posthog/tasks/scheduled.py +++ b/posthog/tasks/scheduled.py @@ -49,6 +49,7 @@ send_org_usage_reports, start_poll_query_performance, stop_surveys_reached_target, + populate_error_tracking_issue_metrics, sync_all_organization_available_product_features, update_event_partitions, update_quota_limiting, @@ -254,6 +255,12 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None: name="clickhouse clear deleted person data", ) + sender.add_periodic_task( + crontab(minute="*/10"), + populate_error_tracking_issue_metrics.s(), + name="populate error tracking issue metrics", + ) + sender.add_periodic_task( crontab(hour="*/12"), stop_surveys_reached_target.s(), diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index 836a89e97ab53..5bfe989bad449 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -918,3 +918,10 @@ def calculate_external_data_rows_synced() -> None: pass else: capture_external_data_rows_synced() + + +@shared_task(ignore_result=True) +def populate_error_tracking_issue_metrics() -> None: + from posthog.tasks.error_tracking import sync_error_tracking_issue_metrics + + sync_error_tracking_issue_metrics() diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 12ea18182d17e..95ab62145fc55 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -97,6 +97,12 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayvec" version = "0.7.6" @@ -356,9 +362,9 @@ checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" [[package]] name = "async-trait" -version = "0.1.77" +version = "0.1.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", @@ -1119,7 +1125,7 @@ dependencies = [ "opentelemetry_sdk", "rand", "rdkafka 0.36.2", - "redis", + "redis 0.23.3", "reqwest 0.12.3", "serde", "serde_json", @@ -1496,6 +1502,7 @@ dependencies = [ "mockall", "moka", "rdkafka 0.35.0", + "redis 0.27.6", "reqwest 0.12.3", "serde", "serde_json", @@ -1925,7 +1932,7 @@ dependencies = [ "once_cell", "petgraph", "rand", - "redis", + "redis 0.23.3", "regex", "reqwest 0.12.3", "serde", @@ -3429,9 +3436,9 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c165a9ab64cf766f73521c0dd2cfdff64f488b8f0b3e621face3462d3db536d7" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" dependencies = [ "num-integer", "num-traits", @@ -4274,6 +4281,34 @@ dependencies = [ "url", ] +[[package]] +name = "redis" +version = "0.27.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d8f99a4090c89cc489a94833c901ead69bfbf3877b4867d5482e321ee875bc" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "crc16", + "futures", + "futures-util", + "itertools 0.13.0", + "itoa", + "log", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "rand", + "ryu", + "sha1_smol", + "socket2 0.5.5", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.4.1" diff --git a/rust/cymbal/Cargo.toml b/rust/cymbal/Cargo.toml index 2cce1823e66c4..3936e36881af4 100644 --- a/rust/cymbal/Cargo.toml +++ b/rust/cymbal/Cargo.toml @@ -40,6 +40,11 @@ aws-sdk-s3 = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } moka = { workspace = true } +redis = { version = "0.27.6", features = [ + "tokio-comp", + "cluster", + "cluster-async", +] } [dev-dependencies] httpmock = { workspace = true } diff --git a/rust/cymbal/src/app_context.rs b/rust/cymbal/src/app_context.rs index 8b369a78440b4..462f8b0508231 100644 --- a/rust/cymbal/src/app_context.rs +++ b/rust/cymbal/src/app_context.rs @@ -26,6 +26,7 @@ pub struct AppContext { pub kafka_consumer: SingleTopicConsumer, pub kafka_producer: FutureProducer, pub pool: PgPool, + pub redis: Arc, pub catalog: Catalog, pub resolver: Resolver, pub config: Config, @@ -51,6 +52,8 @@ impl AppContext { let options = PgPoolOptions::new().max_connections(config.max_pg_connections); let pool = options.connect(&config.database_url).await?; + let redis = Arc::new(redis::Client::open(config.redis_url.clone())?); + let aws_credentials = aws_sdk_s3::config::Credentials::new( &config.object_storage_access_key_id, &config.object_storage_secret_access_key, @@ -100,6 +103,7 @@ impl AppContext { kafka_consumer, kafka_producer, pool, + redis, catalog, resolver, config: config.clone(), diff --git a/rust/cymbal/src/config.rs b/rust/cymbal/src/config.rs index 2b0cdaf20a50a..0bbd0e0124450 100644 --- a/rust/cymbal/src/config.rs +++ b/rust/cymbal/src/config.rs @@ -30,6 +30,9 @@ pub struct Config { #[envconfig(default = "postgres://posthog:posthog@localhost:5432/posthog")] pub database_url: String, + #[envconfig(default = "redis://localhost:6379/")] + pub redis_url: String, + // Rust service connect directly to postgres, not via pgbouncer, so we keep this low #[envconfig(default = "4")] pub max_pg_connections: u32, diff --git a/rust/cymbal/src/error.rs b/rust/cymbal/src/error.rs index 98de08f5fe75c..c79cb862fb3e0 100644 --- a/rust/cymbal/src/error.rs +++ b/rust/cymbal/src/error.rs @@ -28,6 +28,8 @@ pub enum UnhandledError { ByteStreamError(#[from] ByteStreamError), // AWS specific bytestream error. Idk #[error("Unhandled serde error: {0}")] SerdeError(#[from] serde_json::Error), + #[error("Redis error: {0}")] + RedisError(#[from] redis::RedisError), } // These are errors that occur during frame resolution. This excludes e.g. network errors, diff --git a/rust/cymbal/src/issue_resolution.rs b/rust/cymbal/src/issue_resolution.rs index b80c8d17d9b36..ef0f4f9a7c46c 100644 --- a/rust/cymbal/src/issue_resolution.rs +++ b/rust/cymbal/src/issue_resolution.rs @@ -1,6 +1,11 @@ +use std::sync::Arc; + use sqlx::postgres::any::AnyConnectionBackend; use uuid::Uuid; +use redis::{RedisError, AsyncCommands}; +use tracing::error; + use crate::{ error::UnhandledError, types::{FingerprintedErrProps, OutputErrProps}, @@ -183,3 +188,36 @@ where Ok(fingerprinted.to_output(issue_override.issue_id)) } + +pub async fn track_issue_metadata( + team_id: i32, + issue_id: Uuid, + redis: Arc, + // TODO: Confirm timestamp format + timestamp: String, +) -> Result<(), UnhandledError> +{ + + let mut conn = match redis.get_multiplexed_async_connection().await { + Ok(conn) => conn, + Err(e) => { + error!("Error tracking issue metadata: {:?}", e); + return Ok(()); + } + }; + + let redis_key = format!("issue_metadata:{}:{}", team_id, issue_id); + + let res: Result<(), RedisError> = redis::pipe() + .hset(redis_key.clone(), "last_seen", timestamp) + .hincr(redis_key, "occurrences", 1) + .query_async(&mut conn) + .await; + + // on error, log the error but don't propagate it + if let Err(e) = res { + error!("Error tracking issue metadata: {:?}", e); + } + + Ok(()) +} \ No newline at end of file diff --git a/rust/cymbal/src/lib.rs b/rust/cymbal/src/lib.rs index ed5720148fc9c..0d83dd846b642 100644 --- a/rust/cymbal/src/lib.rs +++ b/rust/cymbal/src/lib.rs @@ -4,7 +4,7 @@ use app_context::AppContext; use common_types::ClickHouseEvent; use error::{EventError, UnhandledError}; use fingerprinting::generate_fingerprint; -use issue_resolution::resolve_issue; +use issue_resolution::{resolve_issue, track_issue_metadata}; use tracing::warn; use types::{Exception, RawErrProps, Stacktrace}; @@ -55,6 +55,11 @@ pub async fn handle_event( let mut output = resolve_issue(&context.pool, event.team_id, fingerprinted).await?; + // TODO: Tracking issue metadata is currently a best-effort operation, and we don't want to + // fail the event processing if it fails. Also, unclear what should happen on merges with redis storage. + // Right now, I expect this to go out of sync so we should have some nightly celery task to keep it in sync. + track_issue_metadata(event.team_id, output.issue_id, context.redis.clone(), event.timestamp.clone()).await?; + // TODO - I'm not sure we actually want to do this? Maybe junk drawer stuff should end up in clickhouse, and // be directly queryable by users? Stripping it for now, so it only ends up in postgres output.strip_frame_junk(); From 765284b301e899ea4ae13ebb77560ee7bf5d74e1 Mon Sep 17 00:00:00 2001 From: Neil Kakkar Date: Thu, 26 Dec 2024 14:29:54 +0000 Subject: [PATCH 2/2] upd file --- posthog/tasks/error_tracking.py | 8 ++++++++ posthog/tasks/tasks.py | 4 ++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/posthog/tasks/error_tracking.py b/posthog/tasks/error_tracking.py index 20974aef76d93..08adc4d32a2b0 100644 --- a/posthog/tasks/error_tracking.py +++ b/posthog/tasks/error_tracking.py @@ -21,6 +21,14 @@ def populate_error_tracking_issue_metrics(): last_seen = data.get(b"last_seen") occurrences = data.get(b"occurrences") + if not last_seen or not occurrences: + # there should be no case where one is missing + try: + client.delete(key) + except Exception as error: + capture_exception(error) + continue + try: # update the issue and reset redis key ErrorTrackingIssue.objects.filter(team=team_id, id=issue_id).update( diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index 5bfe989bad449..bc75bc8d6ef95 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -922,6 +922,6 @@ def calculate_external_data_rows_synced() -> None: @shared_task(ignore_result=True) def populate_error_tracking_issue_metrics() -> None: - from posthog.tasks.error_tracking import sync_error_tracking_issue_metrics + from posthog.tasks.error_tracking import populate_error_tracking_issue_metrics - sync_error_tracking_issue_metrics() + populate_error_tracking_issue_metrics()