Skip to content

Commit

Permalink
chore: expose more kafka settings to capture sink (#24739)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankh authored and MarconLP committed Sep 6, 2024
1 parent 020b531 commit c70c165
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 0 deletions.
4 changes: 4 additions & 0 deletions rust/capture/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,8 @@ pub struct KafkaConfig {
pub kafka_tls: bool,
#[envconfig(default = "")]
pub kafka_client_id: String,
#[envconfig(default = "60000")]
pub kafka_metadata_max_age_ms: u32,
#[envconfig(default = "2")]
pub kafka_producer_max_retries: u32,
}
10 changes: 10 additions & 0 deletions rust/capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ impl KafkaSink {
.set("bootstrap.servers", &config.kafka_hosts)
.set("statistics.interval.ms", "10000")
.set("partitioner", "murmur2_random") // Compatibility with python-kafka
.set(
"metadata.max.age.ms",
config.kafka_metadata_max_age_ms.to_string(),
)
.set(
"message.send.max.retries",
config.kafka_producer_max_retries.to_string(),
)
.set("linger.ms", config.kafka_producer_linger_ms.to_string())
.set(
"message.max.bytes",
Expand Down Expand Up @@ -371,6 +379,8 @@ mod tests {
kafka_heatmaps_topic: "events_plugin_ingestion".to_string(),
kafka_tls: false,
kafka_client_id: "".to_string(),
kafka_metadata_max_age_ms: 60000,
kafka_producer_max_retries: 2,
};
let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
(cluster, sink)
Expand Down
2 changes: 2 additions & 0 deletions rust/capture/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
kafka_heatmaps_topic: "events_plugin_ingestion".to_string(),
kafka_tls: false,
kafka_client_id: "".to_string(),
kafka_metadata_max_age_ms: 60000,
kafka_producer_max_retries: 2,
},
otel_url: None,
otel_sampling_rate: 0.0,
Expand Down

0 comments on commit c70c165

Please sign in to comment.