diff --git a/rust/capture/src/limiters/redis.rs b/rust/capture/src/limiters/redis.rs
index c34d93ca1c37c..132f09df77513 100644
--- a/rust/capture/src/limiters/redis.rs
+++ b/rust/capture/src/limiters/redis.rs
@@ -26,6 +26,7 @@ use crate::redis::Client;
 use thiserror::Error;
 use time::{Duration, OffsetDateTime};
 use tokio::sync::RwLock;
+use tracing::instrument;
 
 // todo: fetch from env
 const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/";
@@ -89,6 +90,7 @@ impl RedisLimiter {
         })
     }
 
+    #[instrument(skip_all)]
     async fn fetch_limited(
         client: &Arc<dyn Client + Send + Sync>,
         key_prefix: &str,
@@ -101,6 +103,7 @@ impl RedisLimiter {
             .await
     }
 
+    #[instrument(skip_all, fields(key = key))]
     pub async fn is_limited(&self, key: &str, resource: QuotaResource) -> bool {
         // hold the read lock to clone it, very briefly. clone is ok because it's very small 🤏
         // rwlock can have many readers, but one writer. the writer will wait in a queue with all
@@ -123,6 +126,9 @@ impl RedisLimiter {
             let mut updated = self.updated.write().await;
             *updated = OffsetDateTime::now_utc();
 
+            let span = tracing::debug_span!("updating billing cache from redis");
+            let _span = span.enter();
+
             // a few requests might end up in here concurrently, but I don't think a few extra will
             // be a big problem. If it is, we can rework the concurrency a bit.
             // On prod atm we call this around 15 times per second at peak times, and it usually
diff --git a/rust/capture/src/sinks/kafka.rs b/rust/capture/src/sinks/kafka.rs
index 6930a39b9d2fc..2e008a900ccd6 100644
--- a/rust/capture/src/sinks/kafka.rs
+++ b/rust/capture/src/sinks/kafka.rs
@@ -10,6 +10,7 @@ use rdkafka::util::Timeout;
 use rdkafka::ClientConfig;
 use tokio::task::JoinSet;
 use tracing::log::{debug, error, info};
+use tracing::{info_span, instrument, Instrument};
 
 use crate::api::{CaptureError, DataType, ProcessedEvent};
 use crate::config::KafkaConfig;
@@ -279,12 +280,16 @@ impl KafkaSink {
 
 #[async_trait]
 impl Event for KafkaSink {
+    #[instrument(skip_all)]
     async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
         let ack = self.kafka_send(event).await?;
         histogram!("capture_event_batch_size").record(1.0);
-        Self::process_ack(ack).await
+        Self::process_ack(ack)
+            .instrument(info_span!("ack_wait_one"))
+            .await
     }
 
+    #[instrument(skip_all)]
     async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
         let mut set = JoinSet::new();
         let batch_size = events.len();
@@ -314,6 +319,7 @@ impl Event for KafkaSink {
             }
             Ok(())
         }
+        .instrument(info_span!("ack_wait_many"))
         .await?;
 
         histogram!("capture_event_batch_size").record(batch_size as f64);
diff --git a/rust/capture/src/sinks/print.rs b/rust/capture/src/sinks/print.rs
index 02b784248725b..7845a3d039b56 100644
--- a/rust/capture/src/sinks/print.rs
+++ b/rust/capture/src/sinks/print.rs
@@ -16,6 +16,9 @@ impl Event for PrintSink {
         Ok(())
     }
     async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
+        let span = tracing::span!(tracing::Level::INFO, "batch of events");
+        let _enter = span.enter();
+
         histogram!("capture_event_batch_size").record(events.len() as f64);
         counter!("capture_events_ingested_total").increment(events.len() as u64);
         for event in events {
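Note how the Kafka sink wraps its ack futures with `.instrument(info_span!(...))` rather than entering a span guard and then awaiting: `tracing`'s `Span::enter` guard should not be held across an `.await`, while an instrumented future enters the span on every poll and exits it when the future yields. A minimal sketch of that pattern, reusing the `ack_wait_one` span name from the diff (the `wait_for_ack` helper and the `tokio`/`tracing_subscriber` setup are illustrative, not part of this PR):

```rust
use std::time::Duration;
use tracing::{info_span, Instrument};

// Hypothetical stand-in for waiting on a Kafka delivery ack.
async fn wait_for_ack() {
    tokio::time::sleep(Duration::from_millis(10)).await;
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    // The span is entered each time the wrapped future is polled and exited
    // when it yields, so it stays attached correctly across .await points.
    wait_for_ack()
        .instrument(info_span!("ack_wait_one"))
        .await;
}
```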
diff --git a/rust/capture/src/v0_endpoint.rs b/rust/capture/src/v0_endpoint.rs
index 7f5f949114fb4..03b550cd9cdaf 100644
--- a/rust/capture/src/v0_endpoint.rs
+++ b/rust/capture/src/v0_endpoint.rs
@@ -11,6 +11,7 @@ use base64::Engine;
 use metrics::counter;
 use serde_json::json;
 use serde_json::Value;
+use tracing::instrument;
 
 use crate::limiters::redis::QuotaResource;
 use crate::prometheus::report_dropped_events;
@@ -132,6 +133,20 @@ async fn handle_common(
     Ok((context, events))
 }
 
+#[instrument(
+    skip_all,
+    fields(
+        path,
+        token,
+        batch_size,
+        user_agent,
+        content_encoding,
+        content_type,
+        version,
+        compression,
+        historical_migration
+    )
+)]
 #[debug_handler]
 pub async fn event(
     state: State<router::State>,
@@ -188,6 +203,20 @@ pub async fn event(
     }
 }
 
+#[instrument(
+    skip_all,
+    fields(
+        path,
+        token,
+        batch_size,
+        user_agent,
+        content_encoding,
+        content_type,
+        version,
+        compression,
+        historical_migration
+    )
+)]
 #[debug_handler]
 pub async fn recording(
     state: State<router::State>,
@@ -251,6 +280,7 @@ pub async fn options() -> Result<Json<CaptureResponse>, CaptureError> {
     }))
 }
 
+#[instrument(skip_all)]
 pub fn process_single_event(
     event: &RawEvent,
     context: &ProcessingContext,
@@ -285,6 +315,7 @@ pub fn process_single_event(
     })
 }
 
+#[instrument(skip_all, fields(events = events.len()))]
 pub async fn process_events<'a>(
     sink: Arc<dyn Event + Send + Sync>,
     events: &'a [RawEvent],
@@ -304,10 +335,11 @@
     }
 }
 
-pub async fn process_replay_events(
+#[instrument(skip_all, fields(events = events.len()))]
+pub async fn process_replay_events<'a>(
     sink: Arc<dyn Event + Send + Sync>,
     mut events: Vec<RawEvent>,
-    context: &ProcessingContext,
+    context: &'a ProcessingContext,
 ) -> Result<(), CaptureError> {
     // Grab metadata about the whole batch from the first event before
     // we drop all the events as we rip out the snapshot data
diff --git a/rust/capture/src/v0_request.rs b/rust/capture/src/v0_request.rs
index 8211f9d880d84..d25a1280b7538 100644
--- a/rust/capture/src/v0_request.rs
+++ b/rust/capture/src/v0_request.rs
@@ -7,6 +7,7 @@ use serde::{Deserialize, Deserializer, Serialize};
 use serde_json::Value;
 use time::format_description::well_known::Iso8601;
 use time::OffsetDateTime;
+use tracing::instrument;
 use uuid::Uuid;
 
 use crate::api::CaptureError;
@@ -123,6 +124,7 @@ impl RawRequest {
     ///    fail due to it being missing when the body is compressed.
     ///    Instead of trusting the parameter, we peek at the payload's first three bytes to
     ///    detect gzip, fallback to uncompressed utf8 otherwise.
+    #[instrument(skip_all)]
     pub fn from_bytes(bytes: Bytes, limit: usize) -> Result<RawRequest, CaptureError> {
         tracing::debug!(len = bytes.len(), "decoding new event");
 
@@ -214,6 +216,7 @@
     }
 }
 
+#[instrument(skip_all, fields(events = events.len()))]
 pub fn extract_token(events: &[RawEvent]) -> Result<String, CaptureError> {
     let distinct_tokens: HashSet<Option<String>> = HashSet::from_iter(
         events
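In the `event` and `recording` handlers, the `fields(...)` list on `#[instrument]` only declares span fields; with `skip_all` nothing is captured up front, so the fields start out empty and are meant to be recorded once the request has been parsed. A minimal sketch of that declare-then-record pattern (the `handle_batch` function, its arguments, and the field values are illustrative, not taken from this PR):

```rust
use tracing::{instrument, Span};

// `skip_all` keeps the raw arguments out of the span; the bare names in
// `fields(...)` declare empty fields that can be recorded later.
#[instrument(skip_all, fields(token, batch_size))]
fn handle_batch(events: &[String], token: &str) {
    // Fill in the declared fields once their values are known.
    Span::current().record("token", token);
    Span::current().record("batch_size", events.len() as u64);
    // ... process the batch ...
}

fn main() {
    tracing_subscriber::fmt().init();
    handle_batch(&["a".to_string(), "b".to_string()], "phc_example_token");
}
```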