Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-32893][Connectors/Kafka] Allow the clientid to be fully configurable #45

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

bdbene
Copy link

@bdbene bdbene commented Aug 23, 2023

What is the purpose of the change

FLINK-32893
There's some use cases to use the same clientID across all of the low level Kafka consumers to

  1. Simplify how quotas are applied
  2. Lower the cardinality of metrics reported by the Kafka brokers

Brief Change Log

Support an optional configuration for the exact clientID to run Kafka consumers with

Verifying this change

It's difficult to write unit tests for this which is probably why there's no existing unit tests for the client ID. However I was able to run this locally and could see the client ID was logged exactly as I specified it (no prefix or suffix).

@boring-cyborg
Copy link

boring-cyborg bot commented Aug 23, 2023

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@MartijnVisser
Copy link
Contributor

@bdbene Can you please rebase your PR?

@bdbene
Copy link
Author

bdbene commented Jan 18, 2024

Done :)

@MartijnVisser
Copy link
Contributor

@bdbene Please rebase instead of merging, since we can't merge in merge commits

@bdbene bdbene force-pushed the FLINK-32893-fully-configurable-cliendid branch from ba3b9e6 to ed3d92f Compare January 18, 2024 21:47
@bdbene
Copy link
Author

bdbene commented Jan 18, 2024

Oops, fixed that

@@ -36,6 +36,14 @@ public class KafkaSourceOptions {
.noDefaultValue()
.withDescription("The prefix to use for the Kafka consumers.");

public static final ConfigOption<String> COMPLETE_CLIENT_ID =
ConfigOptions.key("client.id.complete")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ConfigOptions.key("client.id.complete")
ConfigOptions.key("client.id")

shall we just use the same config as kafka clients?

I agree this would NOT be backwards compatible in Flink source, but more align with Kafka clients

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants