Skip to content

Commit

Permalink
Merge branch 'master' into lemon-input-select-v2.1
Browse files Browse the repository at this point in the history
  • Loading branch information
Twixes authored Sep 2, 2024
2 parents 25a4324 + b67e96d commit eb45cf2
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
16 changes: 12 additions & 4 deletions rust/capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,14 @@ impl KafkaSink {
CaptureError::NonRetryableSinkError
})?;

let token = event.token.clone();
let data_type = event.data_type;
let event_key = event.key();
let session_id = event.session_id.as_deref();
let session_id = event.session_id.clone();

let (topic, partition_key): (&str, Option<&str>) = match &event.data_type {
drop(event); // Events can be EXTREMELY memory hungry

let (topic, partition_key): (&str, Option<&str>) = match data_type {
DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
DataType::AnalyticsMain => {
// TODO: deprecate capture-led overflow or move logic in handler
Expand All @@ -212,7 +216,11 @@ impl KafkaSink {
DataType::ExceptionMain => (&self.exceptions_topic, Some(event_key.as_str())),
DataType::SnapshotMain => (
&self.main_topic,
Some(session_id.ok_or(CaptureError::MissingSessionId)?),
Some(
session_id
.as_deref()
.ok_or(CaptureError::MissingSessionId)?,
),
),
};

Expand All @@ -224,7 +232,7 @@ impl KafkaSink {
timestamp: None,
headers: Some(OwnedHeaders::new().insert(Header {
key: "token",
value: Some(&event.token),
value: Some(&token),
})),
}) {
Ok(ack) => Ok(ack),
Expand Down
3 changes: 3 additions & 0 deletions rust/capture/src/v0_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tracing::instrument;
use uuid::Uuid;

use crate::api::CaptureError;
use crate::prometheus::report_dropped_events;
use crate::token::validate_token;

#[derive(Deserialize, Default)]
Expand Down Expand Up @@ -148,6 +149,7 @@ impl RawRequest {
buf.extend_from_slice(&chunk[..got]);
if buf.len() > limit {
tracing::error!("GZIP decompression limit reached");
report_dropped_events("event_too_big", 1);
return Err(CaptureError::EventTooBig);
}
}
Expand All @@ -167,6 +169,7 @@ impl RawRequest {
})?;
if s.len() > limit {
tracing::error!("Request size limit reached");
report_dropped_events("event_too_big", 1);
return Err(CaptureError::EventTooBig);
}
s
Expand Down

0 comments on commit eb45cf2

Please sign in to comment.