feat: allow configuring max.message.bytes for rust capture (#24599)
frankh authored Aug 27, 2024
1 parent fb1cb6f commit c50a791
Showing 3 changed files with 34 additions and 3 deletions.
2 changes: 2 additions & 0 deletions rust/capture/src/config.rs
@@ -68,6 +68,8 @@ pub struct KafkaConfig {
     pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes
     #[envconfig(default = "20000")]
     pub kafka_message_timeout_ms: u32, // Time before we stop retrying producing a message: 20 seconds
+    #[envconfig(default = "1000000")]
+    pub kafka_producer_message_max_bytes: u32, // message.max.bytes - max kafka message size we will produce
     #[envconfig(default = "none")]
     pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
     pub kafka_hosts: String,
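A rough usage sketch (not part of this commit): fields in this struct have no explicit `from` attribute, so envconfig derives the variable name by upper-casing the field name, meaning the new setting would be read from KAFKA_PRODUCER_MESSAGE_MAX_BYTES and fall back to the 1000000 default when unset. The module path and the init_from_env constructor below are assumptions for illustration, not code from the repository.

    use capture::config::KafkaConfig; // path assumed for illustration
    use envconfig::Envconfig;

    fn load_kafka_config() -> KafkaConfig {
        // Raise the producer's message size cap to 3MB before reading config from the environment.
        std::env::set_var("KAFKA_PRODUCER_MESSAGE_MAX_BYTES", "3000000");
        KafkaConfig::init_from_env().expect("failed to load Kafka config from environment")
    }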
34 changes: 31 additions & 3 deletions rust/capture/src/sinks/kafka.rs
@@ -130,6 +130,10 @@ impl KafkaSink {
             .set("statistics.interval.ms", "10000")
             .set("partitioner", "murmur2_random") // Compatibility with python-kafka
             .set("linger.ms", config.kafka_producer_linger_ms.to_string())
+            .set(
+                "message.max.bytes",
+                config.kafka_producer_message_max_bytes.to_string(),
+            )
             .set(
                 "message.timeout.ms",
                 config.kafka_message_timeout_ms.to_string(),
@@ -328,7 +332,9 @@ mod tests {
     use std::num::NonZeroU32;
     use time::Duration;
 
-    async fn start_on_mocked_sink() -> (MockCluster<'static, DefaultProducerContext>, KafkaSink) {
+    async fn start_on_mocked_sink(
+        message_max_bytes: Option<u32>,
+    ) -> (MockCluster<'static, DefaultProducerContext>, KafkaSink) {
         let registry = HealthRegistry::new("liveness");
         let handle = registry
             .register("one".to_string(), Duration::seconds(30))
@@ -343,6 +349,7 @@ mod tests {
             kafka_producer_linger_ms: 0,
             kafka_producer_queue_mib: 50,
             kafka_message_timeout_ms: 500,
+            kafka_producer_message_max_bytes: message_max_bytes.unwrap_or(1000000),
             kafka_compression_codec: "none".to_string(),
             kafka_hosts: cluster.bootstrap_servers(),
             kafka_topic: "events_plugin_ingestion".to_string(),
@@ -361,7 +368,7 @@
         // Uses a mocked Kafka broker that allows injecting write errors, to check error handling.
         // We test different cases in a single test to amortize the startup cost of the producer.
 
-        let (cluster, sink) = start_on_mocked_sink().await;
+        let (cluster, sink) = start_on_mocked_sink(Some(3000000)).await;
         let event: ProcessedEvent = ProcessedEvent {
             data_type: DataType::AnalyticsMain,
             uuid: uuid_v7(),
@@ -389,7 +396,7 @@
             .await
             .expect("failed to send initial event batch");
 
-        // Producer should reject a 2MB message, twice the default `message.max.bytes`
+        // Producer should accept a 2MB message as we set message.max.bytes to 3MB
        let big_data = rand::thread_rng()
             .sample_iter(Alphanumeric)
             .take(2_000_000)
@@ -406,6 +413,27 @@
             token: "token1".to_string(),
             session_id: None,
         };
+        sink.send(big_event)
+            .await
+            .expect("failed to send event larger than default max size");
+
+        // Producer should reject a 4MB message
+        let big_data = rand::thread_rng()
+            .sample_iter(Alphanumeric)
+            .take(4_000_000)
+            .map(char::from)
+            .collect();
+        let big_event: ProcessedEvent = ProcessedEvent {
+            data_type: DataType::AnalyticsMain,
+            uuid: uuid_v7(),
+            distinct_id: "id1".to_string(),
+            ip: "".to_string(),
+            data: big_data,
+            now: "".to_string(),
+            sent_at: None,
+            token: "token1".to_string(),
+            session_id: None,
+        };
         match sink.send(big_event).await {
             Err(CaptureError::EventTooBig) => {} // Expected
             Err(err) => panic!("wrong error code {}", err),
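For context, a minimal sketch of the behaviour the new test exercises (assumptions: rust-rdkafka's BaseProducer API, placeholder broker address and key; not code from this commit). librdkafka enforces message.max.bytes on the client, so an oversized payload fails at send() before anything reaches the broker, which the test above expects the sink to surface as CaptureError::EventTooBig.

    use rdkafka::config::ClientConfig;
    use rdkafka::producer::{BaseProducer, BaseRecord};

    fn oversized_send_demo() {
        let producer: BaseProducer = ClientConfig::new()
            .set("bootstrap.servers", "localhost:9092") // placeholder broker
            .set("message.max.bytes", "1000000") // 1MB, the librdkafka default
            .create()
            .expect("failed to create producer");

        // 2MB payload, over the 1MB cap: rejected synchronously by the client.
        let payload = vec![b'x'; 2_000_000];
        let result = producer.send(
            BaseRecord::to("events_plugin_ingestion")
                .key("id1")
                .payload(payload.as_slice()),
        );
        // Expected: Err((KafkaError::MessageProduction(MessageSizeTooLarge), _record))
        assert!(result.is_err());
    }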
1 change: 1 addition & 0 deletions rust/capture/tests/common.rs
@@ -41,6 +41,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     kafka_producer_linger_ms: 0, // Send messages as soon as possible
     kafka_producer_queue_mib: 10,
     kafka_message_timeout_ms: 10000, // 10s, ACKs can be slow on low volumes, should be tuned
+    kafka_producer_message_max_bytes: 1000000, // 1MB, rdkafka default
     kafka_compression_codec: "none".to_string(),
     kafka_hosts: "kafka:9092".to_string(),
     kafka_topic: "events_plugin_ingestion".to_string(),
