revert(capture): rip out all tracing spans (#24768)
bretthoerner authored Sep 4, 2024
1 parent 73c1730 commit 59de5a4
Showing 5 changed files with 53 additions and 3 deletions.
6 changes: 6 additions & 0 deletions rust/capture/src/limiters/redis.rs
@@ -26,6 +26,7 @@ use crate::redis::Client;
use thiserror::Error;
use time::{Duration, OffsetDateTime};
use tokio::sync::RwLock;
use tracing::instrument;

// todo: fetch from env
const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/";
@@ -89,6 +90,7 @@ impl RedisLimiter {
})
}

#[instrument(skip_all)]
async fn fetch_limited(
client: &Arc<dyn Client + Send + Sync>,
key_prefix: &str,
@@ -101,6 +103,7 @@
.await
}

#[instrument(skip_all, fields(key = key))]
pub async fn is_limited(&self, key: &str, resource: QuotaResource) -> bool {
// hold the read lock to clone it, very briefly. clone is ok because it's very small 🤏
// rwlock can have many readers, but one writer. the writer will wait in a queue with all
@@ -123,6 +126,9 @@ impl RedisLimiter {
let mut updated = self.updated.write().await;
*updated = OffsetDateTime::now_utc();

let span = tracing::debug_span!("updating billing cache from redis");
let _span = span.enter();

// a few requests might end up in here concurrently, but I don't think a few extra will
// be a big problem. If it is, we can rework the concurrency a bit.
// On prod atm we call this around 15 times per second at peak times, and it usually
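This hunk restores two span styles in the limiter: the `#[instrument]` attribute (with `skip_all` and an explicit `key` field) and a manually entered `debug_span!`. Below is a minimal, self-contained sketch of both patterns, using a hypothetical `Limiter` type and assuming the usual `tracing`, `tracing-subscriber`, and `tokio` dependencies; it is an illustration, not the crate's actual code.

```rust
use tracing::instrument;

// Hypothetical limiter, only to illustrate the two span styles above.
struct Limiter;

impl Limiter {
    // Attribute form: a span named after the function, arguments skipped,
    // plus one explicit field copied from the `key` argument.
    #[instrument(skip_all, fields(key = key))]
    async fn is_limited(&self, key: &str) -> bool {
        // Manual form: a scoped span for a sub-step; the guard drops at the
        // end of the block (no `.await` is held under it here).
        let span = tracing::debug_span!("updating cache");
        let _enter = span.enter();
        key.is_empty()
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    let limited = Limiter.is_limited("team:123").await;
    println!("limited = {limited}");
}
```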
8 changes: 7 additions & 1 deletion rust/capture/src/sinks/kafka.rs
@@ -10,6 +10,7 @@ use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};

use crate::api::{CaptureError, DataType, ProcessedEvent};
use crate::config::KafkaConfig;
@@ -279,12 +280,16 @@ impl KafkaSink {

#[async_trait]
impl Event for KafkaSink {
#[instrument(skip_all)]
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
let ack = self.kafka_send(event).await?;
histogram!("capture_event_batch_size").record(1.0);
Self::process_ack(ack).await
Self::process_ack(ack)
.instrument(info_span!("ack_wait_one"))
.await
}

#[instrument(skip_all)]
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
let mut set = JoinSet::new();
let batch_size = events.len();
@@ -314,6 +319,7 @@ impl Event for KafkaSink {
}
Ok(())
}
.instrument(info_span!("ack_wait_many"))
.await?;

histogram!("capture_event_batch_size").record(batch_size as f64);
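The sink changes wrap the ack futures with the `Instrument` extension (`.instrument(info_span!("ack_wait_one"))` and `"ack_wait_many"`), which keeps the span entered across `.await` points. A rough sketch of that pattern with a hypothetical ack future; it assumes `tokio` and `tracing-subscriber` only for the runtime and output, and stands in for, rather than reproduces, the Kafka delivery path.

```rust
use tracing::{info_span, Instrument};

// Hypothetical ack future standing in for Kafka delivery confirmation.
async fn wait_for_ack(id: u64) -> Result<(), String> {
    tracing::info!(id, "ack received");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    tracing_subscriber::fmt::init();

    // Attaching the span to the future keeps it entered across await points,
    // which a plain `span.enter()` guard would not do safely.
    wait_for_ack(42)
        .instrument(info_span!("ack_wait_one"))
        .await?;

    // The same pattern works on an async block, mirroring the batch case.
    async {
        for id in 0..3u64 {
            wait_for_ack(id).await?;
        }
        Ok::<(), String>(())
    }
    .instrument(info_span!("ack_wait_many"))
    .await?;

    Ok(())
}
```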
3 changes: 3 additions & 0 deletions rust/capture/src/sinks/print.rs
@@ -16,6 +16,9 @@ impl Event for PrintSink {
Ok(())
}
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
let span = tracing::span!(tracing::Level::INFO, "batch of events");
let _enter = span.enter();

histogram!("capture_event_batch_size").record(events.len() as f64);
counter!("capture_events_ingested_total").increment(events.len() as u64);
for event in events {
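The print sink uses the explicit-level `tracing::span!` form with an `enter()` guard. A small sketch of that form with hypothetical names; note that in async code the guard should not be held across an `.await`, which is where the future-level `.instrument(...)` used in the Kafka sink above is the safer choice.

```rust
use tracing::Level;

// Hypothetical batch; only its length matters for the span field here.
fn print_batch(events: &[String]) {
    // Explicit-level form of the span macros; equivalent to info_span!(..).
    let span = tracing::span!(Level::INFO, "batch of events", size = events.len() as u64);
    let _enter = span.enter();

    for event in events {
        tracing::info!(%event, "printed");
    }
    // The guard drops here. This is fine in synchronous code; in async code,
    // prefer attaching the span to the future with `.instrument(...)` rather
    // than holding an `enter()` guard across an `.await`.
}

fn main() {
    tracing_subscriber::fmt::init();
    print_batch(&["pageview".to_string(), "click".to_string()]);
}
```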
36 changes: 34 additions & 2 deletions rust/capture/src/v0_endpoint.rs
@@ -11,6 +11,7 @@ use base64::Engine;
use metrics::counter;
use serde_json::json;
use serde_json::Value;
use tracing::instrument;

use crate::limiters::redis::QuotaResource;
use crate::prometheus::report_dropped_events;
@@ -132,6 +133,20 @@ async fn handle_common(
Ok((context, events))
}

#[instrument(
skip_all,
fields(
path,
token,
batch_size,
user_agent,
content_encoding,
content_type,
version,
compression,
historical_migration
)
)]
#[debug_handler]
pub async fn event(
state: State<router::State>,
@@ -188,6 +203,20 @@ pub async fn event(
}
}

#[instrument(
skip_all,
fields(
path,
token,
batch_size,
user_agent,
content_encoding,
content_type,
version,
compression,
historical_migration
)
)]
#[debug_handler]
pub async fn recording(
state: State<router::State>,
@@ -251,6 +280,7 @@ pub async fn options() -> Result<Json<CaptureResponse>, CaptureError> {
}))
}

#[instrument(skip_all)]
pub fn process_single_event(
event: &RawEvent,
context: &ProcessingContext,
@@ -285,6 +315,7 @@ pub fn process_single_event(
})
}

#[instrument(skip_all, fields(events = events.len()))]
pub async fn process_events<'a>(
sink: Arc<dyn sinks::Event + Send + Sync>,
events: &'a [RawEvent],
@@ -304,10 +335,11 @@
}
}

pub async fn process_replay_events(
#[instrument(skip_all, fields(events = events.len()))]
pub async fn process_replay_events<'a>(
sink: Arc<dyn sinks::Event + Send + Sync>,
mut events: Vec<RawEvent>,
context: &ProcessingContext,
context: &'a ProcessingContext,
) -> Result<(), CaptureError> {
// Grab metadata about the whole batch from the first event before
// we drop all the events as we rip out the snapshot data
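The handler attributes declare span fields (`path`, `token`, `batch_size`, and so on) without values, presumably so the request-handling code can fill them in once the payload has been parsed. A minimal sketch of that declare-then-record pattern, written with the explicit `field::Empty` form and a hypothetical `handle` function; it assumes a recent `tracing` 0.1 release where `Span::record` takes values directly.

```rust
use tracing::{field, instrument};

// Fields declared as `Empty` exist on the span but carry no value yet;
// the body records them once the values are known.
#[instrument(skip_all, fields(token = field::Empty, batch_size = field::Empty))]
async fn handle(body: &str) -> usize {
    // Hypothetical parsing step; the values only become known here.
    let batch_size = body.lines().count() as u64;

    let span = tracing::Span::current();
    span.record("token", "phc_example_token");
    span.record("batch_size", batch_size);

    batch_size as usize
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    let n = handle("event_a\nevent_b").await;
    println!("processed {n} events");
}
```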
3 changes: 3 additions & 0 deletions rust/capture/src/v0_request.rs
@@ -7,6 +7,7 @@ use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;
use time::format_description::well_known::Iso8601;
use time::OffsetDateTime;
use tracing::instrument;
use uuid::Uuid;

use crate::api::CaptureError;
@@ -123,6 +124,7 @@ impl RawRequest {
/// fail due to it being missing when the body is compressed.
/// Instead of trusting the parameter, we peek at the payload's first three bytes to
/// detect gzip, fallback to uncompressed utf8 otherwise.
#[instrument(skip_all)]
pub fn from_bytes(bytes: Bytes, limit: usize) -> Result<RawRequest, CaptureError> {
tracing::debug!(len = bytes.len(), "decoding new event");

@@ -214,6 +216,7 @@
}
}

#[instrument(skip_all, fields(events = events.len()))]
pub fn extract_token(events: &[RawEvent]) -> Result<String, CaptureError> {
let distinct_tokens: HashSet<Option<String>> = HashSet::from_iter(
events
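The doc comment on `from_bytes` mentions peeking at the payload's first three bytes to detect gzip before falling back to plain UTF-8. A tiny sketch of what such a check could look like (a hypothetical helper, not the crate's actual implementation): gzip streams begin with the magic bytes `0x1f 0x8b`, followed by the compression method byte `0x08` for DEFLATE.

```rust
// Returns true if the buffer starts with a gzip header: the two magic bytes
// 0x1f 0x8b plus the DEFLATE compression-method byte 0x08.
fn looks_like_gzip(bytes: &[u8]) -> bool {
    bytes.len() >= 3 && bytes[0] == 0x1f && bytes[1] == 0x8b && bytes[2] == 0x08
}

fn main() {
    assert!(looks_like_gzip(&[0x1f, 0x8b, 0x08, 0x00]));
    assert!(!looks_like_gzip(b"{\"event\":\"pageview\"}"));
    println!("gzip sniffing works as expected");
}
```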
