diff --git a/rust/capture/src/router.rs b/rust/capture/src/router.rs index efc98ae3c529e..e1ba579684585 100644 --- a/rust/capture/src/router.rs +++ b/rust/capture/src/router.rs @@ -27,6 +27,7 @@ pub struct State { pub timesource: Arc, pub redis: Arc, pub billing_limiter: RedisLimiter, + pub event_size_limit: usize, } async fn index() -> &'static str { @@ -47,12 +48,14 @@ pub fn router< metrics: bool, capture_mode: CaptureMode, concurrency_limit: Option, + event_size_limit: usize, ) -> Router { let state = State { sink: Arc::new(sink), timesource: Arc::new(timesource), redis, billing_limiter, + event_size_limit, }; // Very permissive CORS policy, as old SDK versions diff --git a/rust/capture/src/server.rs b/rust/capture/src/server.rs index 5d006df5282f4..93ff3f646c3bc 100644 --- a/rust/capture/src/server.rs +++ b/rust/capture/src/server.rs @@ -31,6 +31,8 @@ where ) .expect("failed to create billing limiter"); + let event_max_bytes = config.kafka.kafka_producer_message_max_bytes as usize; + let app = if config.print_sink { // Print sink is only used for local debug, don't allow a container with it to run on prod liveness @@ -48,6 +50,7 @@ where config.export_prometheus, config.capture_mode, config.concurrency_limit, + event_max_bytes, ) } else { let sink_liveness = liveness @@ -90,6 +93,7 @@ where config.export_prometheus, config.capture_mode, config.concurrency_limit, + event_max_bytes, ) }; diff --git a/rust/capture/src/v0_endpoint.rs b/rust/capture/src/v0_endpoint.rs index add976345d2c5..03b550cd9cdaf 100644 --- a/rust/capture/src/v0_endpoint.rs +++ b/rust/capture/src/v0_endpoint.rs @@ -77,12 +77,12 @@ async fn handle_common( tracing::error!("failed to decode form data: {}", e); CaptureError::RequestDecodingError(String::from("missing data field")) })?; - RawRequest::from_bytes(payload.into()) + RawRequest::from_bytes(payload.into(), state.event_size_limit) } ct => { tracing::Span::current().record("content_type", ct); - RawRequest::from_bytes(body) + RawRequest::from_bytes(body, state.event_size_limit) } }?; diff --git a/rust/capture/src/v0_request.rs b/rust/capture/src/v0_request.rs index ae0c80fece453..67bc722966f8c 100644 --- a/rust/capture/src/v0_request.rs +++ b/rust/capture/src/v0_request.rs @@ -124,22 +124,52 @@ impl RawRequest { /// Instead of trusting the parameter, we peek at the payload's first three bytes to /// detect gzip, fallback to uncompressed utf8 otherwise. #[instrument(skip_all)] - pub fn from_bytes(bytes: Bytes) -> Result { + pub fn from_bytes(bytes: Bytes, limit: usize) -> Result { tracing::debug!(len = bytes.len(), "decoding new event"); let payload = if bytes.starts_with(&GZIP_MAGIC_NUMBERS) { - let mut d = GzDecoder::new(bytes.reader()); - let mut s = String::new(); - d.read_to_string(&mut s).map_err(|e| { - tracing::error!("failed to decode gzip: {}", e); - CaptureError::RequestDecodingError(String::from("invalid gzip data")) - })?; - s + let len = bytes.len(); + let mut zipstream = GzDecoder::new(bytes.reader()); + let chunk = &mut [0; 1024]; + let mut buf = Vec::with_capacity(len); + loop { + let got = match zipstream.read(chunk) { + Ok(got) => got, + Err(e) => { + tracing::error!("failed to read gzip stream: {}", e); + return Err(CaptureError::RequestDecodingError(String::from( + "invalid gzip data", + ))); + } + }; + if got == 0 { + break; + } + buf.extend_from_slice(&chunk[..got]); + if buf.len() > limit { + tracing::error!("GZIP decompression limit reached"); + return Err(CaptureError::EventTooBig); + } + } + match String::from_utf8(buf) { + Ok(s) => s, + Err(e) => { + tracing::error!("failed to decode gzip: {}", e); + return Err(CaptureError::RequestDecodingError(String::from( + "invalid gzip data", + ))); + } + } } else { - String::from_utf8(bytes.into()).map_err(|e| { + let s = String::from_utf8(bytes.into()).map_err(|e| { tracing::error!("failed to decode body: {}", e); CaptureError::RequestDecodingError(String::from("invalid body encoding")) - })? + })?; + if s.len() > limit { + tracing::error!("Request size limit reached"); + return Err(CaptureError::EventTooBig); + } + s }; tracing::debug!(json = payload, "decoded event data"); @@ -286,7 +316,7 @@ mod tests { .expect("payload is not base64"), ); - let events = RawRequest::from_bytes(compressed_bytes) + let events = RawRequest::from_bytes(compressed_bytes, 1024) .expect("failed to parse") .events(); assert_eq!(1, events.len()); @@ -308,7 +338,7 @@ mod tests { .expect("payload is not base64"), ); - let events = RawRequest::from_bytes(compressed_bytes) + let events = RawRequest::from_bytes(compressed_bytes, 2048) .expect("failed to parse") .events(); assert_eq!(1, events.len()); @@ -325,7 +355,7 @@ mod tests { #[test] fn extract_distinct_id() { let parse_and_extract = |input: &'static str| -> Result { - let parsed = RawRequest::from_bytes(input.into()) + let parsed = RawRequest::from_bytes(input.into(), 2048) .expect("failed to parse") .events(); parsed[0].extract_distinct_id() @@ -393,7 +423,7 @@ mod tests { "distinct_id": distinct_id }]); - let parsed = RawRequest::from_bytes(input.to_string().into()) + let parsed = RawRequest::from_bytes(input.to_string().into(), 2048) .expect("failed to parse") .events(); assert_eq!( @@ -405,7 +435,7 @@ mod tests { #[test] fn extract_and_verify_token() { let parse_and_extract = |input: &'static str| -> Result { - RawRequest::from_bytes(input.into()) + RawRequest::from_bytes(input.into(), 2048) .expect("failed to parse") .extract_and_verify_token() }; diff --git a/rust/capture/tests/django_compat.rs b/rust/capture/tests/django_compat.rs index 55fa838aaef13..d08be11c7506c 100644 --- a/rust/capture/tests/django_compat.rs +++ b/rust/capture/tests/django_compat.rs @@ -113,6 +113,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> { false, CaptureMode::Events, None, + 25 * 1024 * 1024, ); let client = TestClient::new(app);