diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java index f96cd3ea0..7b4a18678 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java @@ -36,6 +36,14 @@ public class KafkaSourceOptions { .noDefaultValue() .withDescription("The prefix to use for the Kafka consumers."); + public static final ConfigOption COMPLETE_CLIENT_ID = + ConfigOptions.key("client.id.complete") + .stringType() + .noDefaultValue() + .withDescription( + "The complete client ID to use for Kafka consumers. If provided this will " + + "take precedence over prefixes and subtask ID suffixes."); + public static final ConfigOption PARTITION_DISCOVERY_INTERVAL_MS = ConfigOptions.key("partition.discovery.interval.ms") .longType() diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java index 94940b8e7..6b3ae5f0a 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java @@ -423,6 +423,11 @@ private void unassignPartitions(Collection partitionsToUnassign) } private String createConsumerClientId(Properties props) { + String providedClientId = props.getProperty(KafkaSourceOptions.COMPLETE_CLIENT_ID.key()); + if (providedClientId != null) { + return providedClientId; + } + String prefix = props.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key()); return prefix + "-" + subtaskId; }