diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ClientIdFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ClientIdFactory.java
new file mode 100644
index 000000000..e1fa3defe
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ClientIdFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+/**
+ * Construct a Kafka ClientId using a prefix
+ * and a subtask id.
+ */
+public final class ClientIdFactory {
+    private static final String CLIENT_ID_DELIMITER = "-";
+
+    /** Non-instantiable utility class. */
+    private ClientIdFactory() {}
+
+    /**
+     * Construct a Kafka client id in the following format
+     * {@code clientIdPrefix-subtaskId}.
+     *
+     * @param clientIdPrefix prefix for the id
+     * @param subtaskId id of the Kafka producer subtask
+     * @return clientId
+     */
+    public static String buildClientId(
+            String clientIdPrefix,
+            int subtaskId) {
+        return clientIdPrefix
+                + CLIENT_ID_DELIMITER
+                + subtaskId;
+    }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index d5b1c3700..d7efcb96b 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -61,15 +61,18 @@ public class KafkaSink
     private final KafkaRecordSerializationSchema<IN> recordSerializer;
     private final Properties kafkaProducerConfig;
     private final String transactionalIdPrefix;
+    private final String clientIdPrefix;
 
     KafkaSink(
             DeliveryGuarantee deliveryGuarantee,
             Properties kafkaProducerConfig,
             String transactionalIdPrefix,
+            String clientIdPrefix,
             KafkaRecordSerializationSchema<IN> recordSerializer) {
         this.deliveryGuarantee = deliveryGuarantee;
         this.kafkaProducerConfig = kafkaProducerConfig;
         this.transactionalIdPrefix = transactionalIdPrefix;
+        this.clientIdPrefix = clientIdPrefix;
         this.recordSerializer = recordSerializer;
     }
 
@@ -102,6 +105,7 @@ public KafkaWriter<IN> createWriter(InitContext context) throws IOException {
                 deliveryGuarantee,
                 kafkaProducerConfig,
                 transactionalIdPrefix,
+                clientIdPrefix,
                 context,
                 recordSerializer,
                 context.asSerializationSchemaInitializationContext(),
@@ -116,6 +120,7 @@ public KafkaWriter<IN> restoreWriter(
                 deliveryGuarantee,
                 kafkaProducerConfig,
                 transactionalIdPrefix,
+                clientIdPrefix,
                 context,
                 recordSerializer,
                 context.asSerializationSchemaInitializationContext(),
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
index f0c20cfc0..59afd3bf0 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
@@ -71,6 +71,7 @@ public class KafkaSinkBuilder {
 
     private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
     private String transactionalIdPrefix = "kafka-sink";
+    private String clientIdPrefix = null;
     private final Properties kafkaProducerConfig;
     private KafkaRecordSerializationSchema<IN> recordSerializer;
 
@@ -190,6 +191,20 @@ public KafkaSinkBuilder<IN> setBootstrapServers(String bootstrapServers) {
         return setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     }
 
+    /**
+     * Set the prefix for all KafkaProducer `client.id` values.
+     * This will overwrite any value set for `client.id` in the provided Kafka producer
+     * configuration. Instead, a value for `client.id` will be derived from the prefix provided.
+     * Using a prefix will create a unique Kafka `client.id` for all producers.
+     *
+     * @param clientIdPrefix Prefix to use
+     * @return {@link KafkaSinkBuilder}
+     */
+    public KafkaSinkBuilder<IN> setClientIdPrefix(String clientIdPrefix) {
+        this.clientIdPrefix = checkNotNull(clientIdPrefix, "clientIdPrefix");
+        return this;
+    }
+
     private void sanityCheck() {
         checkNotNull(
                 kafkaProducerConfig.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
@@ -210,6 +225,6 @@ private void sanityCheck() {
     public KafkaSink<IN> build() {
         sanityCheck();
         return new KafkaSink<>(
-                deliveryGuarantee, kafkaProducerConfig, transactionalIdPrefix, recordSerializer);
+                deliveryGuarantee, kafkaProducerConfig, transactionalIdPrefix, clientIdPrefix, recordSerializer);
     }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 0cc16b219..d628d437a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -32,6 +32,7 @@
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Metric;
@@ -127,6 +128,7 @@ class KafkaWriter
             DeliveryGuarantee deliveryGuarantee,
             Properties kafkaProducerConfig,
             String transactionalIdPrefix,
+            String clientIdPrefix,
             Sink.InitContext sinkInitContext,
             KafkaRecordSerializationSchema<IN> recordSerializer,
             SerializationSchema.InitializationContext schemaContext,
@@ -136,6 +138,7 @@ class KafkaWriter
         this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix");
         this.recordSerializer = checkNotNull(recordSerializer, "recordSerializer");
         checkNotNull(sinkInitContext, "sinkInitContext");
+        overwriteClientId(kafkaProducerConfig, clientIdPrefix, sinkInitContext.getSubtaskId());
         this.deliveryCallback =
                 new WriterCallback(
                         sinkInitContext.getMailboxExecutor(),
@@ -299,6 +302,25 @@ void abortLingeringTransactions(
         }
     }
 
+    private static void overwriteClientId(Properties properties, String clientIdPrefix, int subtaskId) {
+        if (clientIdPrefix == null) {
+            return;
+        }
+        String updatedClientId = ClientIdFactory.buildClientId(clientIdPrefix, subtaskId);
+        overrideProperty(properties, ProducerConfig.CLIENT_ID_CONFIG, updatedClientId);
+    }
+
+    private static void overrideProperty(Properties properties, String key, String value) {
+        String userValue = properties.getProperty(key);
+        if (userValue != null) {
+            LOG.warn(
+                    String.format(
+                            "Property %s is provided but will be overridden from %s to %s",
+                            key, userValue, value));
+        }
+        properties.setProperty(key, value);
+    }
+
     /**
      * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new transactions
      * will not clash with transactions created during previous checkpoints ({@code