From ed3d92ffb11a337f54e328afdcd285dd9d0cda2e Mon Sep 17 00:00:00 2001 From: bdbene Date: Tue, 22 Aug 2023 16:35:27 -0400 Subject: [PATCH] [FLINK-32893][Connectors/Kafka] Allow the clientid to be fully configurable --- .../flink/connector/kafka/source/KafkaSourceOptions.java | 8 ++++++++ .../kafka/source/reader/KafkaPartitionSplitReader.java | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java index f96cd3ea0..7b4a18678 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java @@ -36,6 +36,14 @@ public class KafkaSourceOptions { .noDefaultValue() .withDescription("The prefix to use for the Kafka consumers."); + public static final ConfigOption COMPLETE_CLIENT_ID = + ConfigOptions.key("client.id.complete") + .stringType() + .noDefaultValue() + .withDescription( + "The complete client ID to use for Kafka consumers. If provided this will " + + "take precedence over prefixes and subtask ID suffixes."); + public static final ConfigOption PARTITION_DISCOVERY_INTERVAL_MS = ConfigOptions.key("partition.discovery.interval.ms") .longType() diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java index 94940b8e7..6b3ae5f0a 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java @@ -423,6 +423,11 @@ private void unassignPartitions(Collection partitionsToUnassign) } private String createConsumerClientId(Properties props) { + String providedClientId = props.getProperty(KafkaSourceOptions.COMPLETE_CLIENT_ID.key()); + if (providedClientId != null) { + return providedClientId; + } + String prefix = props.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key()); return prefix + "-" + subtaskId; }