From 73c1730d53fc23e3090c37edb33774c1259c0e2d Mon Sep 17 00:00:00 2001
From: Oliver Browne
Date: Wed, 4 Sep 2024 00:59:37 +0300
Subject: [PATCH] feat(capture): rip out all tracing spans (#24767)

Co-authored-by: Brett Hoerner
---
 rust/capture/src/limiters/redis.rs      |  6 -----
 rust/capture/src/sinks/kafka.rs         |  8 +-----
 rust/capture/src/sinks/print.rs         |  3 ---
 rust/capture/src/v0_endpoint.rs         | 36 ++-----------------------
 rust/capture/src/v0_request.rs          |  3 ---
 rust/cyclotron-janitor/tests/janitor.rs | 20 +++++++++++---
 6 files changed, 20 insertions(+), 56 deletions(-)

diff --git a/rust/capture/src/limiters/redis.rs b/rust/capture/src/limiters/redis.rs
index 132f09df77513..c34d93ca1c37c 100644
--- a/rust/capture/src/limiters/redis.rs
+++ b/rust/capture/src/limiters/redis.rs
@@ -26,7 +26,6 @@ use crate::redis::Client;
 use thiserror::Error;
 use time::{Duration, OffsetDateTime};
 use tokio::sync::RwLock;
-use tracing::instrument;
 
 // todo: fetch from env
 const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/";
@@ -90,7 +89,6 @@ impl RedisLimiter {
         })
     }
 
-    #[instrument(skip_all)]
     async fn fetch_limited(
         client: &Arc,
         key_prefix: &str,
@@ -103,7 +101,6 @@ impl RedisLimiter {
             .await
     }
 
-    #[instrument(skip_all, fields(key = key))]
     pub async fn is_limited(&self, key: &str, resource: QuotaResource) -> bool {
         // hold the read lock to clone it, very briefly. clone is ok because it's very small 🤏
         // rwlock can have many readers, but one writer. the writer will wait in a queue with all
@@ -126,9 +123,6 @@ impl RedisLimiter {
         let mut updated = self.updated.write().await;
         *updated = OffsetDateTime::now_utc();
 
-        let span = tracing::debug_span!("updating billing cache from redis");
-        let _span = span.enter();
-
         // a few requests might end up in here concurrently, but I don't think a few extra will
         // be a big problem. If it is, we can rework the concurrency a bit.
         // On prod atm we call this around 15 times per second at peak times, and it usually
diff --git a/rust/capture/src/sinks/kafka.rs b/rust/capture/src/sinks/kafka.rs
index 2e008a900ccd6..6930a39b9d2fc 100644
--- a/rust/capture/src/sinks/kafka.rs
+++ b/rust/capture/src/sinks/kafka.rs
@@ -10,7 +10,6 @@ use rdkafka::util::Timeout;
 use rdkafka::ClientConfig;
 use tokio::task::JoinSet;
 use tracing::log::{debug, error, info};
-use tracing::{info_span, instrument, Instrument};
 
 use crate::api::{CaptureError, DataType, ProcessedEvent};
 use crate::config::KafkaConfig;
@@ -280,16 +279,12 @@ impl KafkaSink {
 
 #[async_trait]
 impl Event for KafkaSink {
-    #[instrument(skip_all)]
     async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
         let ack = self.kafka_send(event).await?;
         histogram!("capture_event_batch_size").record(1.0);
-        Self::process_ack(ack)
-            .instrument(info_span!("ack_wait_one"))
-            .await
+        Self::process_ack(ack).await
     }
 
-    #[instrument(skip_all)]
     async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
         let mut set = JoinSet::new();
         let batch_size = events.len();
@@ -319,7 +314,6 @@ impl Event for KafkaSink {
             }
             Ok(())
         }
-        .instrument(info_span!("ack_wait_many"))
         .await?;
 
         histogram!("capture_event_batch_size").record(batch_size as f64);
diff --git a/rust/capture/src/sinks/print.rs b/rust/capture/src/sinks/print.rs
index 7845a3d039b56..02b784248725b 100644
--- a/rust/capture/src/sinks/print.rs
+++ b/rust/capture/src/sinks/print.rs
@@ -16,9 +16,6 @@ impl Event for PrintSink {
         Ok(())
     }
     async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
-        let span = tracing::span!(tracing::Level::INFO, "batch of events");
-        let _enter = span.enter();
-
         histogram!("capture_event_batch_size").record(events.len() as f64);
         counter!("capture_events_ingested_total").increment(events.len() as u64);
         for event in events {
diff --git a/rust/capture/src/v0_endpoint.rs b/rust/capture/src/v0_endpoint.rs
index 03b550cd9cdaf..7f5f949114fb4 100644
--- a/rust/capture/src/v0_endpoint.rs
+++ b/rust/capture/src/v0_endpoint.rs
@@ -11,7 +11,6 @@ use base64::Engine;
 use metrics::counter;
 use serde_json::json;
 use serde_json::Value;
-use tracing::instrument;
 
 use crate::limiters::redis::QuotaResource;
 use crate::prometheus::report_dropped_events;
@@ -133,20 +132,6 @@ async fn handle_common(
     Ok((context, events))
 }
 
-#[instrument(
-    skip_all,
-    fields(
-        path,
-        token,
-        batch_size,
-        user_agent,
-        content_encoding,
-        content_type,
-        version,
-        compression,
-        historical_migration
-    )
-)]
 #[debug_handler]
 pub async fn event(
     state: State,
@@ -203,20 +188,6 @@
     }
 }
 
-#[instrument(
-    skip_all,
-    fields(
-        path,
-        token,
-        batch_size,
-        user_agent,
-        content_encoding,
-        content_type,
-        version,
-        compression,
-        historical_migration
-    )
-)]
 #[debug_handler]
 pub async fn recording(
     state: State,
@@ -280,7 +251,6 @@ pub async fn options() -> Result, CaptureError> {
     }))
 }
 
-#[instrument(skip_all)]
 pub fn process_single_event(
     event: &RawEvent,
     context: &ProcessingContext,
@@ -315,7 +285,6 @@
     })
 }
 
-#[instrument(skip_all, fields(events = events.len()))]
 pub async fn process_events<'a>(
     sink: Arc,
     events: &'a [RawEvent],
@@ -335,11 +304,10 @@
     }
 }
 
-#[instrument(skip_all, fields(events = events.len()))]
-pub async fn process_replay_events<'a>(
+pub async fn process_replay_events(
     sink: Arc,
     mut events: Vec,
-    context: &'a ProcessingContext,
+    context: &ProcessingContext,
 ) -> Result<(), CaptureError> {
     // Grab metadata about the whole batch from the first event before
     // we drop all the events as we rip out the snapshot data
diff --git a/rust/capture/src/v0_request.rs b/rust/capture/src/v0_request.rs
index d25a1280b7538..8211f9d880d84 100644
--- a/rust/capture/src/v0_request.rs
+++ b/rust/capture/src/v0_request.rs
@@ -7,7 +7,6 @@ use serde::{Deserialize, Deserializer, Serialize};
 use serde_json::Value;
 use time::format_description::well_known::Iso8601;
 use time::OffsetDateTime;
-use tracing::instrument;
 use uuid::Uuid;
 
 use crate::api::CaptureError;
@@ -124,7 +123,6 @@ impl RawRequest {
     /// fail due to it being missing when the body is compressed.
     /// Instead of trusting the parameter, we peek at the payload's first three bytes to
     /// detect gzip, fallback to uncompressed utf8 otherwise.
-    #[instrument(skip_all)]
     pub fn from_bytes(bytes: Bytes, limit: usize) -> Result {
         tracing::debug!(len = bytes.len(), "decoding new event");
 
@@ -216,7 +214,6 @@
     }
 }
 
-#[instrument(skip_all, fields(events = events.len()))]
 pub fn extract_token(events: &[RawEvent]) -> Result {
     let distinct_tokens: HashSet> = HashSet::from_iter(
         events
diff --git a/rust/cyclotron-janitor/tests/janitor.rs b/rust/cyclotron-janitor/tests/janitor.rs
index d213477613c78..7dceae4969c19 100644
--- a/rust/cyclotron-janitor/tests/janitor.rs
+++ b/rust/cyclotron-janitor/tests/janitor.rs
@@ -1,4 +1,4 @@
-use chrono::{DateTime, Duration, Utc};
+use chrono::{DateTime, Duration, Timelike, Utc};
 use common_kafka::kafka_messages::app_metrics2::{
     AppMetric2, Kind as AppMetric2Kind, Source as AppMetric2Source,
 };
@@ -66,6 +66,7 @@ async fn janitor_test(db: PgPool) {
     };
 
     // First test - if we mark a job as completed, the janitor will clean it up
+    let mut job_now = Utc::now();
     manager.create_job(job_init.clone()).await.unwrap();
     let job = worker
         .dequeue_jobs(&queue_name, 1)
@@ -92,7 +93,13 @@ async fn janitor_test(db: PgPool) {
         app_metric,
         AppMetric2 {
             team_id: 1,
-            timestamp: DateTime::<Utc>::from_str("2024-08-30T19:00:00Z").unwrap(),
+            timestamp: job_now
+                .with_minute(0)
+                .unwrap()
+                .with_second(0)
+                .unwrap()
+                .with_nanosecond(0)
+                .unwrap(),
             app_source: AppMetric2Source::Cyclotron,
             app_source_id: uuid.to_string(),
             instance_id: None,
@@ -104,6 +111,7 @@ async fn janitor_test(db: PgPool) {
     }
 
     // Second test - if we mark a job as failed, the janitor will clean it up
+    job_now = Utc::now();
     manager.create_job(job_init.clone()).await.unwrap();
     let job = worker
         .dequeue_jobs(&queue_name, 1)
@@ -130,7 +138,13 @@ async fn janitor_test(db: PgPool) {
         app_metric,
         AppMetric2 {
             team_id: 1,
-            timestamp: DateTime::<Utc>::from_str("2024-08-30T19:00:00Z").unwrap(),
+            timestamp: job_now
+                .with_minute(0)
+                .unwrap()
+                .with_second(0)
+                .unwrap()
+                .with_nanosecond(0)
+                .unwrap(),
             app_source: AppMetric2Source::Cyclotron,
             app_source_id: uuid.to_string(),
             instance_id: None,
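
Note on the janitor test change above: the hard-coded "2024-08-30T19:00:00Z" expectation is replaced by the job-creation time truncated to the top of the current hour via chrono's Timelike setters, which is the hour bucket the test expects on the emitted AppMetric2 row. Below is a minimal standalone sketch of that truncation under the same assumption (the hour_floor helper name is illustrative and not part of the patch):

use chrono::{DateTime, Timelike, Utc};

// Zero out minutes, seconds, and nanoseconds so the timestamp lands exactly
// on the hour boundary, matching the truncation done inline in the test.
fn hour_floor(ts: DateTime<Utc>) -> DateTime<Utc> {
    ts.with_minute(0)
        .unwrap()
        .with_second(0)
        .unwrap()
        .with_nanosecond(0)
        .unwrap()
}

fn main() {
    let now = Utc::now();
    let floored = hour_floor(now);
    assert_eq!(floored.minute(), 0);
    assert_eq!(floored.second(), 0);
    println!("{now} floors to {floored}");
}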