Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional producer config #151

Conversation

hellopeera
Copy link

Release notes

Add Kafka producer configs enable_idempotence and max_in_flight_requests_per_connection to help ensure that exactly one copy of each message is written in the stream in face of producer retries

What does this PR do?

With producer idempotence, it ensures that duplicates are not introduced due to unexpected retries due to some intermittent issue e.g. network problem, etc.

This PR exposes the Kafka producer configurations: enable_idempotence and max_in_flight_requests_per_connection which is optional to Logstash users.

How to enable idempotence for Kafka producer

Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 (with message ordering preserved for any allowable value), retries to be greater than 0, and acks must be 'all'.

acks = -1
enable.idempotence = true
max.in.flight.requests.per.connection = 5

How is this PR test locally

Runs a local Kafka cluster

Launch the test Kafka script

./kafka_test_setup.sh

Connect a consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic logstash_integration_enable_idempotence_topic

Setup Logstash Kafka output & run

Install the plugin in development mode

bin/logstash-plugin install --no-verify /path/to/logstash_plugins/ogstash-integration-kafka/logstash-integration-kafka-*.gem

Run logstash

bin/logstash -e 'input { stdin { } } output { kafka { topic_id => "logstash_integration_enable_idempotence_topic" enable_idempotence => "true" acks => "all" max_in_flight_requests_per_connection => 5 } }'

Verify if Logstash producer config is according to what is configured

Observe Kafka producer config printed on Logstash console

acks = -1
enable.idempotence = true
max.in.flight.requests.per.connection = 5

Verify if Logstash is able to producing messages

From the Logstash console, enter some messages via stdin. Verify the logstash consumer console if the message is received.

Related issues

@cla-checker-service
Copy link

❌ Author of the following commits did not sign a Contributor Agreement:
5d49e5e

Please, read and sign the above mentioned agreement if you want to contribute to this project

@hellopeera
Copy link
Author

Commit code with wrong user credential. I'll close this PR and recreate again. Sorry for confusion.

@hellopeera hellopeera closed this Oct 20, 2023
@hellopeera hellopeera deleted the feature/additional_producer_configurations branch October 20, 2023 15:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants