diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc index 269b62e0..28a1bc5d 100644 --- a/docs/output-kafka.asciidoc +++ b/docs/output-kafka.asciidoc @@ -84,12 +84,14 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>, one of `["none", "gzip", "snappy", "lz4", "zstd"]`|No | <> |<>|No | <> |a valid filesystem path|No | <> |a valid filesystem path|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -197,6 +199,22 @@ The id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included with the request +[id="plugins-{type}s-{plugin}-enable_idempotence"] +===== `enable_idempotence` + + * Value type is <> + * There is no default value for this setting. + +When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. +If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. +Note that enabling idempotence requires `max.in.flight.requests.per.connection` to be less than or equal to 5 +(with message ordering preserved for any allowable value), `retries` to be greater than 0, and `acks` must be 'all'. + +Idempotence is enabled by default if no conflicting configurations are set. +If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. +If idempotence is explicitly enabled and conflicting configurations are set, +a https://kafka.apache.org/{kafka_client_doc}/javadoc/org/apache/kafka/common/config/ConfigException.html[ConfigException] is thrown. 
+ [id="plugins-{type}s-{plugin}-compression_type"] ===== `compression_type` @@ -267,6 +285,19 @@ This setting accomplishes this by adding a small amount of artificial delay—th rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. +[id="plugins-{type}s-{plugin}-max_in_flight_requests_per_connection"] +===== `max_in_flight_requests_per_connection` + + * Value type is <> + * Default value is `5`. + +The maximum number of unacknowledged requests the client will send on a single connection before blocking. +Note that if this configuration is set to be greater than 1 and `enable.idempotence` is set to false, +there is a risk of message reordering after a failed send due to retries (i.e., if retries are enabled); +if retries are disabled or if `enable.idempotence` is set to true, ordering will be preserved. +Additionally, enabling idempotence requires the value of this configuration to be less than or equal to 5. +If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. + [id="plugins-{type}s-{plugin}-max_request_size"] ===== `max_request_size` diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index a73b9b49..a0ac9bd3 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -92,6 +92,11 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base # The purpose of this is to be able to track the source of requests beyond just # ip/port by allowing a logical application name to be included with the request config :client_id, :validate => :string + # When set to ‘true’, the producer will ensure that exactly one copy of each message is written in the stream. + # If ‘false’, producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. 
+ # Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 + # (with message ordering preserved for any allowable value), retries to be greater than 0, and acks must be 'all'. + config :enable_idempotence, :validate => :boolean # Serializer class for the key of the message config :key_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer' # The producer groups together any records that arrive in between request @@ -102,6 +107,8 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base # rather than immediately sending out a record the producer will wait for up to the given delay # to allow other records to be sent so that the sends can be batched together. config :linger_ms, :validate => :number, :default => 0 # Kafka default + # The maximum number of unacknowledged requests the client will send on a single connection before blocking. + config :max_in_flight_requests_per_connection, :validate => :number, :default => 5 # Kafka default # The maximum size of a request config :max_request_size, :validate => :number, :default => 1_048_576 # (1MB) Kafka default # The key for the message @@ -334,8 +341,10 @@ def create_producer props.put(kafka::COMPRESSION_TYPE_CONFIG, compression_type) props.put(kafka::CLIENT_DNS_LOOKUP_CONFIG, client_dns_lookup) props.put(kafka::CLIENT_ID_CONFIG, client_id) unless client_id.nil? + props.put(kafka::ENABLE_IDEMPOTENCE_CONFIG, enable_idempotence.to_s) unless enable_idempotence.nil? props.put(kafka::KEY_SERIALIZER_CLASS_CONFIG, key_serializer) props.put(kafka::LINGER_MS_CONFIG, linger_ms.to_s) + props.put(kafka::MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, max_in_flight_requests_per_connection.to_s) props.put(kafka::MAX_REQUEST_SIZE_CONFIG, max_request_size.to_s) props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms.to_s) unless metadata_max_age_ms.nil? unless partitioner.nil?