Skip to content

Commit

Permalink
feat(capture): rip out all tracing spans (#24767)
Browse files Browse the repository at this point in the history
Co-authored-by: Brett Hoerner <[email protected]>
  • Loading branch information
oliverb123 and bretthoerner authored Sep 3, 2024
1 parent 1a32f2b commit 73c1730
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 56 deletions.
6 changes: 0 additions & 6 deletions rust/capture/src/limiters/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use crate::redis::Client;
use thiserror::Error;
use time::{Duration, OffsetDateTime};
use tokio::sync::RwLock;
use tracing::instrument;

// todo: fetch from env
const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/";
Expand Down Expand Up @@ -90,7 +89,6 @@ impl RedisLimiter {
})
}

#[instrument(skip_all)]
async fn fetch_limited(
client: &Arc<dyn Client + Send + Sync>,
key_prefix: &str,
Expand All @@ -103,7 +101,6 @@ impl RedisLimiter {
.await
}

#[instrument(skip_all, fields(key = key))]
pub async fn is_limited(&self, key: &str, resource: QuotaResource) -> bool {
// hold the read lock to clone it, very briefly. clone is ok because it's very small 🤏
// rwlock can have many readers, but one writer. the writer will wait in a queue with all
Expand All @@ -126,9 +123,6 @@ impl RedisLimiter {
let mut updated = self.updated.write().await;
*updated = OffsetDateTime::now_utc();

let span = tracing::debug_span!("updating billing cache from redis");
let _span = span.enter();

// a few requests might end up in here concurrently, but I don't think a few extra will
// be a big problem. If it is, we can rework the concurrency a bit.
// On prod atm we call this around 15 times per second at peak times, and it usually
Expand Down
8 changes: 1 addition & 7 deletions rust/capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};

use crate::api::{CaptureError, DataType, ProcessedEvent};
use crate::config::KafkaConfig;
Expand Down Expand Up @@ -280,16 +279,12 @@ impl KafkaSink {

#[async_trait]
impl Event for KafkaSink {
#[instrument(skip_all)]
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
let ack = self.kafka_send(event).await?;
histogram!("capture_event_batch_size").record(1.0);
Self::process_ack(ack)
.instrument(info_span!("ack_wait_one"))
.await
Self::process_ack(ack).await
}

#[instrument(skip_all)]
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
let mut set = JoinSet::new();
let batch_size = events.len();
Expand Down Expand Up @@ -319,7 +314,6 @@ impl Event for KafkaSink {
}
Ok(())
}
.instrument(info_span!("ack_wait_many"))
.await?;

histogram!("capture_event_batch_size").record(batch_size as f64);
Expand Down
3 changes: 0 additions & 3 deletions rust/capture/src/sinks/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ impl Event for PrintSink {
Ok(())
}
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
let span = tracing::span!(tracing::Level::INFO, "batch of events");
let _enter = span.enter();

histogram!("capture_event_batch_size").record(events.len() as f64);
counter!("capture_events_ingested_total").increment(events.len() as u64);
for event in events {
Expand Down
36 changes: 2 additions & 34 deletions rust/capture/src/v0_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use base64::Engine;
use metrics::counter;
use serde_json::json;
use serde_json::Value;
use tracing::instrument;

use crate::limiters::redis::QuotaResource;
use crate::prometheus::report_dropped_events;
Expand Down Expand Up @@ -133,20 +132,6 @@ async fn handle_common(
Ok((context, events))
}

#[instrument(
skip_all,
fields(
path,
token,
batch_size,
user_agent,
content_encoding,
content_type,
version,
compression,
historical_migration
)
)]
#[debug_handler]
pub async fn event(
state: State<router::State>,
Expand Down Expand Up @@ -203,20 +188,6 @@ pub async fn event(
}
}

#[instrument(
skip_all,
fields(
path,
token,
batch_size,
user_agent,
content_encoding,
content_type,
version,
compression,
historical_migration
)
)]
#[debug_handler]
pub async fn recording(
state: State<router::State>,
Expand Down Expand Up @@ -280,7 +251,6 @@ pub async fn options() -> Result<Json<CaptureResponse>, CaptureError> {
}))
}

#[instrument(skip_all)]
pub fn process_single_event(
event: &RawEvent,
context: &ProcessingContext,
Expand Down Expand Up @@ -315,7 +285,6 @@ pub fn process_single_event(
})
}

#[instrument(skip_all, fields(events = events.len()))]
pub async fn process_events<'a>(
sink: Arc<dyn sinks::Event + Send + Sync>,
events: &'a [RawEvent],
Expand All @@ -335,11 +304,10 @@ pub async fn process_events<'a>(
}
}

#[instrument(skip_all, fields(events = events.len()))]
pub async fn process_replay_events<'a>(
pub async fn process_replay_events(
sink: Arc<dyn sinks::Event + Send + Sync>,
mut events: Vec<RawEvent>,
context: &'a ProcessingContext,
context: &ProcessingContext,
) -> Result<(), CaptureError> {
// Grab metadata about the whole batch from the first event before
// we drop all the events as we rip out the snapshot data
Expand Down
3 changes: 0 additions & 3 deletions rust/capture/src/v0_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;
use time::format_description::well_known::Iso8601;
use time::OffsetDateTime;
use tracing::instrument;
use uuid::Uuid;

use crate::api::CaptureError;
Expand Down Expand Up @@ -124,7 +123,6 @@ impl RawRequest {
/// fail due to it being missing when the body is compressed.
/// Instead of trusting the parameter, we peek at the payload's first three bytes to
/// detect gzip, fallback to uncompressed utf8 otherwise.
#[instrument(skip_all)]
pub fn from_bytes(bytes: Bytes, limit: usize) -> Result<RawRequest, CaptureError> {
tracing::debug!(len = bytes.len(), "decoding new event");

Expand Down Expand Up @@ -216,7 +214,6 @@ impl RawRequest {
}
}

#[instrument(skip_all, fields(events = events.len()))]
pub fn extract_token(events: &[RawEvent]) -> Result<String, CaptureError> {
let distinct_tokens: HashSet<Option<String>> = HashSet::from_iter(
events
Expand Down
20 changes: 17 additions & 3 deletions rust/cyclotron-janitor/tests/janitor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chrono::{DateTime, Duration, Utc};
use chrono::{DateTime, Duration, Timelike, Utc};
use common_kafka::kafka_messages::app_metrics2::{
AppMetric2, Kind as AppMetric2Kind, Source as AppMetric2Source,
};
Expand Down Expand Up @@ -66,6 +66,7 @@ async fn janitor_test(db: PgPool) {
};

// First test - if we mark a job as completed, the janitor will clean it up
let mut job_now = Utc::now();
manager.create_job(job_init.clone()).await.unwrap();
let job = worker
.dequeue_jobs(&queue_name, 1)
Expand All @@ -92,7 +93,13 @@ async fn janitor_test(db: PgPool) {
app_metric,
AppMetric2 {
team_id: 1,
timestamp: DateTime::<Utc>::from_str("2024-08-30T19:00:00Z").unwrap(),
timestamp: job_now
.with_minute(0)
.unwrap()
.with_second(0)
.unwrap()
.with_nanosecond(0)
.unwrap(),
app_source: AppMetric2Source::Cyclotron,
app_source_id: uuid.to_string(),
instance_id: None,
Expand All @@ -104,6 +111,7 @@ async fn janitor_test(db: PgPool) {
}

// Second test - if we mark a job as failed, the janitor will clean it up
job_now = Utc::now();
manager.create_job(job_init.clone()).await.unwrap();
let job = worker
.dequeue_jobs(&queue_name, 1)
Expand All @@ -130,7 +138,13 @@ async fn janitor_test(db: PgPool) {
app_metric,
AppMetric2 {
team_id: 1,
timestamp: DateTime::<Utc>::from_str("2024-08-30T19:00:00Z").unwrap(),
timestamp: job_now
.with_minute(0)
.unwrap()
.with_second(0)
.unwrap()
.with_nanosecond(0)
.unwrap(),
app_source: AppMetric2Source::Cyclotron,
app_source_id: uuid.to_string(),
instance_id: None,
Expand Down

0 comments on commit 73c1730

Please sign in to comment.